virtual ~Filter(){};
///sends Package off to next Filter, returns altered Package
- virtual Package & process(Package & package) const {
- return package;
+ virtual void process(Package & package) const {
};
virtual void configure(){};
public:
~P2_Session();
P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
- ThreadPoolSocketObserver *m_my_thread,
+ ThreadPoolSocketObserver *m_thread_pool_observer,
const Package *package);
int m_no_requests;
private:
void timeoutNotify();
void connectNotify();
private:
- ThreadPoolSocketObserver *m_my_thread;
+ ThreadPoolSocketObserver *m_thread_pool_observer;
Session m_session;
Origin m_origin;
bool m_delete_flag;
const Package *package)
: Z_Assoc(the_PDU_Observable)
{
- m_my_thread = my_thread_pool;
+ m_thread_pool_observer = my_thread_pool;
m_no_requests = 0;
m_delete_flag = false;
m_package = package;
Package *p = new Package(m_session, m_origin);
- ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
+ ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
p->copy_filter(*m_package);
p->request() = yazpp_1::GDU(z_pdu);
- m_my_thread->put(m);
+ m_thread_pool_observer->put(tp);
}
void P2_Session::failNotify()
Package *p = new Package(m_session, m_origin);
- ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
+ ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
p->copy_filter(*m_package);
- m_my_thread->put(m);
+ m_thread_pool_observer->put(tp);
}
void P2_Session::timeoutNotify()
public:
~P2_Server();
P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
- ThreadPoolSocketObserver *m_my_thread,
+ ThreadPoolSocketObserver *m_thread_pool_observer,
const Package *package);
private:
yazpp_1::IPDU_Observer* sessionNotify(
void timeoutNotify();
void connectNotify();
private:
- ThreadPoolSocketObserver *m_my_thread;
+ ThreadPoolSocketObserver *m_thread_pool_observer;
const Package *m_package;
};
P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
- ThreadPoolSocketObserver *my_thread,
+ ThreadPoolSocketObserver *thread_pool_observer,
const Package *package)
: Z_Assoc(the_PDU_Observable)
{
- m_my_thread = my_thread;
+ m_thread_pool_observer = thread_pool_observer;
m_package = package;
}
yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable
*the_PDU_Observable, int fd)
{
- P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread,
+ P2_Session *my = new P2_Session(the_PDU_Observable, m_thread_pool_observer,
m_package);
return my;
}
close(m_fd[1]);
}
-Package &FilterFrontendNet::process(Package &package) const {
+void FilterFrontendNet::process(Package &package) const {
yazpp_1::SocketManager mySocketManager;
My_Timer_Thread *tt = 0;
if (tt && tt->timeout())
break;
}
- return package;
+ delete tt;
}
std::string &FilterFrontendNet::listen_address()
class FilterFrontendNet : public yp2::Filter {
public:
FilterFrontendNet::FilterFrontendNet();
- yp2::Package & process(yp2::Package & package) const;
+ void process(yp2::Package & package) const;
private:
int m_no_threads;
std::string m_listen_address;
}
/// send Package to it's next Filter defined in Router
- Package & move() {
+ void move() {
m_filter = m_router->move(m_filter, this);
if (m_filter)
- return m_filter->process(*this);
- else
- return *this;
+ m_filter->process(*this);
}
/// access session - left val in assignment
class TFilter: public yp2::Filter {
public:
- yp2::Package & process(yp2::Package & package) const {
- return package;
- };
+ void process(yp2::Package & package) const {};
};
class FilterConstant: public yp2::Filter {
public:
- yp2::Package & process(yp2::Package & package) const {
+ void process(yp2::Package & package) const {
package.data() = 1234;
- return package.move();
+ package.move();
};
};
class FilterDouble: public yp2::Filter {
public:
- yp2::Package & process(yp2::Package & package) const {
+ void process(yp2::Package & package) const {
package.data() = package.data() * 2;
- return package.move();
+ package.move();
};
};
yp2::Session session;
yp2::Origin origin;
- yp2::Package pack_in(session, origin);
+ yp2::Package pack(session, origin);
- yp2::Package pack_out = pack_in.router(router1).move();
+ pack.router(router1).move();
- BOOST_CHECK (pack_out.data() == 2468);
+ BOOST_CHECK (pack.data() == 2468);
}
yp2::Session session;
yp2::Origin origin;
- yp2::Package pack_in(session, origin);
+ yp2::Package pack(session, origin);
- yp2::Package pack_out(session, origin);
-
- pack_out = pack_in.router(router2).move();
+ pack.router(router2).move();
- BOOST_CHECK (pack_out.data() == 1234);
+ BOOST_CHECK (pack.data() == 1234);
}
class FilterInit: public yp2::Filter {
public:
- yp2::Package & process(yp2::Package & package) const {
+ void process(yp2::Package & package) const {
- Z_GDU *gdu = package.request().get();
if (package.session().is_closed())
{
- // std::cout << "Got Close. Sending nothing\n";
+ // std::cout << "Got Close.\n";
}
+ Z_GDU *gdu = package.request().get();
if (gdu)
{
// std::cout << "Got PDU. Sending init response\n";
{
yp2::FilterFrontendNet nf;
}
- BOOST_CHECK(true);
}
catch ( ... ) {
BOOST_CHECK (false);
// Create package with Z39.50 init request in it
yp2::Session session;
yp2::Origin origin;
- yp2::Package pack_in(session, origin);
+ yp2::Package pack(session, origin);
ODR odr = odr_createmem(ODR_ENCODE);
Z_APDU *apdu = zget_APDU(odr, Z_APDU_initRequest);
- pack_in.request() = apdu;
+ pack.request() = apdu;
odr_destroy(odr);
// Done creating query.
// Put it in router
- pack_in.router(router).move();
+ pack.router(router).move();
// Inspect that we got Z39.50 init response
- yazpp_1::GDU *gdu = &pack_in.response();
+ yazpp_1::GDU *gdu = &pack.response();
Z_GDU *z_gdu = gdu->get();
BOOST_CHECK(z_gdu);
BOOST_CHECK_EQUAL(z_gdu->u.z3950->which, Z_APDU_initResponse);
}
}
- BOOST_CHECK(true);
}
catch ( ... ) {
BOOST_CHECK (false);
{
{
yp2::RouterChain router;
+
+ // put in frontend first
yp2::FilterFrontendNet filter_front;
filter_front.listen_address() = "unix:socket";
filter_front.listen_duration() = 2; // listen a short time only
router.rule(filter_front);
+ // put in a backend
FilterInit filter_init;
router.rule(filter_init);
yp2::Session session;
yp2::Origin origin;
- yp2::Package pack_in(session, origin);
+ yp2::Package pack(session, origin);
- pack_in.router(router).move();
+ pack.router(router).move();
}
BOOST_CHECK(true);
}
-/* $Id: thread_pool_observer.cpp,v 1.4 2005-10-13 20:06:45 adam Exp $
+/* $Id: thread_pool_observer.cpp,v 1.5 2005-10-14 10:08:40 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
}
-class worker {
-public:
- worker(ThreadPoolSocketObserver *s) : m_s(s) {};
- ThreadPoolSocketObserver *m_s;
- void operator() (void) {
- m_s->run(0);
- }
-};
-
ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads)
: m_SocketObservable(obs)
{
int i;
for (i = 0; i<no_threads; i++)
{
- worker w(this);
+ Worker w(this);
m_thrds.add_thread(new boost::thread(w));
}
}
-/* $Id: thread_pool_observer.hpp,v 1.1 2005-10-13 20:06:45 adam Exp $
+/* $Id: thread_pool_observer.hpp,v 1.2 2005-10-14 10:08:40 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
};
class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
- public:
+private:
+ class Worker {
+ public:
+ Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
+ ThreadPoolSocketObserver *m_s;
+ void operator() (void) {
+ m_s->run(0);
+ }
+ };
+public:
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, int no_threads);
virtual ~ThreadPoolSocketObserver();
void socketNotify(int event);
boost::condition m_cond_input_data;
boost::mutex m_mutex_output_data;
bool m_stop_flag;
+
+
};
#endif