configure
libtool
autom4te.cache
+Doxyfile
--- /dev/null
+.libs
+.deps
+*.lo
+*.la
+Makefile
+Makefile.in
+test_filter1
+test_filter2
+design
+p2
+test_thread_pool_observer
-## $Id: Makefile.am,v 1.3 2005-10-06 12:55:20 adam Exp $
+## $Id: Makefile.am,v 1.4 2005-10-06 19:33:58 adam Exp $
AM_CXXFLAGS = $(YAZPPINC) $(XSLT_CFLAGS) $(USEMARCONINC)
bin_PROGRAMS =
-check_PROGRAMS = test_filter1 test_filter2
+check_PROGRAMS = test_filter1 test_filter2 test_thread_pool_observer
noinst_PROGRAMS = p2 design
TESTS=$(check_PROGRAMS)
p2_config.cpp p2_config.h \
p2_backend.h p2_backend_dummy.cpp \
p2_modules.cpp p2_modules.h \
- p2_xmlerror.cpp p2_xmlerror.h msg-thread.h msg-thread.cpp
+ p2_xmlerror.cpp p2_xmlerror.h \
+ thread_pool_observer.cpp thread_pool_observer.h
+
+test_thread_pool_observer_SOURCES = test_thread_pool_observer.cpp \
+ thread_pool_observer.cpp thread_pool_observer.h
LDADD= $(YAZPPLALIB) $(XSLT_LIBS) $(USEMARCONLALIB)
+++ /dev/null
-/* $Id: msg-thread.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
- Copyright (c) 1998-2005, Index Data.
-
-This file is part of the yaz-proxy.
-
-YAZ proxy is free software; you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free
-Software Foundation; either version 2, or (at your option) any later
-version.
-
-YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
-WARRANTY; without even the implied warranty of MERCHANTABILITY or
-FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
-for more details.
-
-You should have received a copy of the GNU General Public License
-along with YAZ proxy; see the file LICENSE. If not, write to the
-Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
-02111-1307, USA.
- */
-#include <pthread.h>
-#include <unistd.h>
-#include <ctype.h>
-#include <stdio.h>
-
-#include <yaz++/socket-observer.h>
-#include <yaz/log.h>
-
-#include "msg-thread.h"
-
-using namespace yazpp_1;
-
-IMsg_Thread::~IMsg_Thread()
-{
-
-}
-
-Msg_Thread_Queue::Msg_Thread_Queue()
-{
- m_list = 0;
-}
-
-int Msg_Thread_Queue::size()
-{
- int no = 0;
- Msg_Thread_Queue_List *l;
- for (l = m_list; l; l = l->m_next)
- no++;
- return no;
-}
-
-void Msg_Thread_Queue::enqueue(IMsg_Thread *m)
-{
- Msg_Thread_Queue_List *l = new Msg_Thread_Queue_List;
- l->m_next = m_list;
- l->m_item = m;
- m_list = l;
-}
-
-IMsg_Thread *Msg_Thread_Queue::dequeue()
-{
- Msg_Thread_Queue_List **l = &m_list;
- if (!*l)
- return 0;
- while ((*l)->m_next)
- l = &(*l)->m_next;
- IMsg_Thread *m = (*l)->m_item;
- delete *l;
- *l = 0;
- return m;
-}
-
-static void *tfunc(void *p)
-{
- Msg_Thread *pt = (Msg_Thread *) p;
- pt->run(0);
- return 0;
-}
-
-
-Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads)
- : m_SocketObservable(obs)
-{
- pipe(m_fd);
- obs->addObserver(m_fd[0], this);
- obs->maskObserver(this, SOCKET_OBSERVE_READ);
-
- m_stop_flag = false;
- pthread_mutex_init(&m_mutex_input_data, 0);
- pthread_cond_init(&m_cond_input_data, 0);
- pthread_mutex_init(&m_mutex_output_data, 0);
-
- m_no_threads = no_threads;
- m_thread_id = new pthread_t[no_threads];
- int i;
- for (i = 0; i<m_no_threads; i++)
- pthread_create(&m_thread_id[i], 0, tfunc, this);
-}
-
-Msg_Thread::~Msg_Thread()
-{
- pthread_mutex_lock(&m_mutex_input_data);
- m_stop_flag = true;
- pthread_cond_broadcast(&m_cond_input_data);
- pthread_mutex_unlock(&m_mutex_input_data);
-
- int i;
- for (i = 0; i<m_no_threads; i++)
- pthread_join(m_thread_id[i], 0);
- delete [] m_thread_id;
-
- m_SocketObservable->deleteObserver(this);
-
- pthread_cond_destroy(&m_cond_input_data);
- pthread_mutex_destroy(&m_mutex_input_data);
- pthread_mutex_destroy(&m_mutex_output_data);
- close(m_fd[0]);
- close(m_fd[1]);
-}
-
-void Msg_Thread::socketNotify(int event)
-{
- if (event & SOCKET_OBSERVE_READ)
- {
- char buf[2];
- read(m_fd[0], buf, 1);
- pthread_mutex_lock(&m_mutex_output_data);
- IMsg_Thread *out = m_output.dequeue();
- pthread_mutex_unlock(&m_mutex_output_data);
- if (out)
- out->result();
- }
-}
-
-void Msg_Thread::run(void *p)
-{
- while(1)
- {
- pthread_mutex_lock(&m_mutex_input_data);
- while (!m_stop_flag && m_input.size() == 0)
- pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
- if (m_stop_flag)
- {
- pthread_mutex_unlock(&m_mutex_input_data);
- break;
- }
- IMsg_Thread *in = m_input.dequeue();
- pthread_mutex_unlock(&m_mutex_input_data);
-
- IMsg_Thread *out = in->handle();
- pthread_mutex_lock(&m_mutex_output_data);
- m_output.enqueue(out);
-
- write(m_fd[1], "", 1);
- pthread_mutex_unlock(&m_mutex_output_data);
- }
-}
-
-void Msg_Thread::put(IMsg_Thread *m)
-{
- pthread_mutex_lock(&m_mutex_input_data);
- m_input.enqueue(m);
- pthread_cond_signal(&m_cond_input_data);
- pthread_mutex_unlock(&m_mutex_input_data);
-}
-/*
- * Local variables:
- * c-basic-offset: 4
- * indent-tabs-mode: nil
- * End:
- * vim: shiftwidth=4 tabstop=8 expandtab
- */
-
+++ /dev/null
-/* $Id: msg-thread.h,v 1.1 2005-10-06 09:37:25 marc Exp $
- Copyright (c) 1998-2005, Index Data.
-
-This file is part of the yaz-proxy.
-
-YAZ proxy is free software; you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free
-Software Foundation; either version 2, or (at your option) any later
-version.
-
-YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
-WARRANTY; without even the implied warranty of MERCHANTABILITY or
-FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
-for more details.
-
-You should have received a copy of the GNU General Public License
-along with YAZ proxy; see the file LICENSE. If not, write to the
-Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
-02111-1307, USA.
- */
-
-#include <pthread.h>
-#include <unistd.h>
-#include <ctype.h>
-
-#if HAVE_DLFCN_H
-#include <dlfcn.h>
-#endif
-
-#include <yaz++/socket-observer.h>
-#include <yaz/yconfig.h>
-
-class IMsg_Thread {
-public:
- virtual IMsg_Thread *handle() = 0;
- virtual void result() = 0;
- virtual ~IMsg_Thread();
-};
-
-class Msg_Thread_Queue_List {
- friend class Msg_Thread_Queue;
- private:
- IMsg_Thread *m_item;
- Msg_Thread_Queue_List *m_next;
-};
-
-class Msg_Thread_Queue {
- public:
- Msg_Thread_Queue();
- void enqueue(IMsg_Thread *in);
- IMsg_Thread *dequeue();
- int size();
- private:
- Msg_Thread_Queue_List *m_list;
-};
-
-class Msg_Thread : public yazpp_1::ISocketObserver {
- public:
- Msg_Thread(yazpp_1::ISocketObservable *obs, int no_threads);
- virtual ~Msg_Thread();
- void socketNotify(int event);
- void put(IMsg_Thread *m);
- IMsg_Thread *get();
- void run(void *p);
- int m_fd[2];
-private:
- yazpp_1::ISocketObservable *m_SocketObservable;
- int m_no_threads;
- pthread_t *m_thread_id;
- Msg_Thread_Queue m_input;
- Msg_Thread_Queue m_output;
- pthread_mutex_t m_mutex_input_data;
- pthread_cond_t m_cond_input_data;
- pthread_mutex_t m_mutex_output_data;
- bool m_stop_flag;
-};
-
-/*
- * Local variables:
- * c-basic-offset: 4
- * indent-tabs-mode: nil
- * End:
- * vim: shiftwidth=4 tabstop=8 expandtab
- */
-
-/* $Id: p2.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2.cpp,v 1.2 2005-10-06 19:33:58 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
}
P2_Server::P2_Server(IPDU_Observable *the_PDU_Observable,
- Msg_Thread *my_thread,
+ ThreadPoolSocketObserver *my_thread,
P2_Config *config,
P2_ModuleFactory *modules)
: Z_Assoc(the_PDU_Observable)
PDU_Assoc *my_PDU_Assoc = 0;
- Msg_Thread my_thread(&mySocketManager, config.m_no_threads);
+ ThreadPoolSocketObserver my_thread(&mySocketManager, config.m_no_threads);
my_PDU_Assoc = new PDU_Assoc(&mySocketManager);
-/* $Id: p2_frontend.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2_frontend.cpp,v 1.2 2005-10-06 19:33:58 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
using namespace std;
P2_Frontend::P2_Frontend(IPDU_Observable *the_PDU_Observable,
- Msg_Thread *my_thread, P2_Server *server)
+ ThreadPoolSocketObserver
+ *my_thread, P2_Server *server)
: Z_Assoc(the_PDU_Observable)
{
m_my_thread = my_thread;
-/* $Id: p2_frontend.h,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2_frontend.h,v 1.2 2005-10-06 19:33:58 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
#include <vector>
#include <string>
-#include "msg-thread.h"
+#include "thread_pool_observer.h"
#include <yaz++/z-assoc.h>
#include <yaz++/pdu-assoc.h>
#include <yaz++/gdu.h>
public:
~P2_Server();
P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
- Msg_Thread *m_my_thread,
+ ThreadPoolSocketObserver *m_my_thread,
P2_Config *config,
P2_ModuleFactory *modules);
P2_Config *lockConfig();
void connectNotify();
private:
P2_Config *m_config;
- Msg_Thread *m_my_thread;
+ ThreadPoolSocketObserver *m_my_thread;
pthread_mutex_t m_mutex_config;
};
yazpp_1::Yaz_Z_Query m_query;
};
-class P2_Msg : public IMsg_Thread {
+class P2_Msg : public IThreadPoolMsg {
public:
int m_close_flag;
yazpp_1::GDU *m_gdu;
yazpp_1::GDU *m_output;
P2_Frontend *m_front;
P2_Server *m_server;
- IMsg_Thread *handle();
+ IThreadPoolMsg *handle();
void result();
P2_Msg(yazpp_1::GDU *gdu, P2_Frontend *front, P2_Server *server);
virtual ~P2_Msg();
public:
~P2_Frontend();
P2_Frontend(yazpp_1::IPDU_Observable *the_PDU_Observable,
- Msg_Thread *m_my_thread, P2_Server *server);
+ ThreadPoolSocketObserver *m_my_thread, P2_Server *server);
IPDU_Observer* sessionNotify(yazpp_1::IPDU_Observable *the_PDU_Observable,
int fd);
private:
yazpp_1::GDUQueue m_in_queue;
- Msg_Thread *m_my_thread;
+ ThreadPoolSocketObserver *m_my_thread;
P2_Server *m_server;
private:
bool P2_Frontend::search(Z_GDU *z_gdu);
-/* $Id: p2_msg.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2_msg.cpp,v 1.2 2005-10-06 19:33:58 adam Exp $
Copyright (c) 1998-2005, Index Data.
This file is part of the yaz-proxy.
return zget_APDU(odr, Z_APDU_presentResponse);
}
-IMsg_Thread *P2_Msg::handle()
+IThreadPoolMsg *P2_Msg::handle()
{
ODR odr = odr_createmem(ODR_ENCODE);
yaz_log(YLOG_LOG, "P2_Msg:handle begin");
--- /dev/null
+/* $Id: test_thread_pool_observer.cpp,v 1.1 2005-10-06 19:33:58 adam Exp $
+ Copyright (c) 1998-2005, Index Data.
+
+This file is part of the yaz-proxy.
+
+YAZ proxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with YAZ proxy; see the file LICENSE. If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
+
+#include <stdlib.h>
+#include <ctype.h>
+
+#include <yaz++/pdu-assoc.h>
+#include <yaz++/socket-manager.h>
+#include <yaz/log.h>
+#include "thread_pool_observer.h"
+
+using namespace yazpp_1;
+
+class My_Msg : public IThreadPoolMsg {
+public:
+ IThreadPoolMsg *handle();
+ void result();
+ int m_val;
+};
+
+IThreadPoolMsg *My_Msg::handle()
+{
+ My_Msg *res = new My_Msg;
+ int sl = rand() % 5;
+
+ res->m_val = m_val;
+ printf("My_Msg::handle val=%d sleep=%d\n", m_val, sl);
+ sleep(sl);
+ return res;
+}
+
+void My_Msg::result()
+{
+ printf("My_Msg::result val=%d\n", m_val);
+}
+
+class My_Timer_Thread : public ISocketObserver {
+private:
+ ISocketObservable *m_obs;
+ int m_fd[2];
+ ThreadPoolSocketObserver *m_t;
+public:
+ My_Timer_Thread(ISocketObservable *obs, ThreadPoolSocketObserver *t);
+ void socketNotify(int event);
+};
+
+My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
+ ThreadPoolSocketObserver *t) : m_obs(obs)
+{
+ pipe(m_fd);
+ m_t = t;
+ obs->addObserver(m_fd[0], this);
+ obs->maskObserver(this, SOCKET_OBSERVE_READ);
+ obs->timeoutObserver(this, 1);
+}
+
+void My_Timer_Thread::socketNotify(int event)
+{
+ static int seq = 1;
+ printf("Add %d\n", seq);
+ My_Msg *m = new My_Msg;
+ m->m_val = seq++;
+ m_t->put(m);
+}
+
+int main(int argc, char **argv)
+{
+ SocketManager mySocketManager;
+
+ ThreadPoolSocketObserver m(&mySocketManager, 3);
+ My_Timer_Thread t(&mySocketManager, &m) ;
+ int i = 0;
+ while (++i < 5 && mySocketManager.processEvent() > 0)
+ ;
+ return 0;
+}
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
--- /dev/null
+/* $Id: thread_pool_observer.cpp,v 1.1 2005-10-06 19:33:58 adam Exp $
+ Copyright (c) 1998-2005, Index Data.
+
+This file is part of the yaz-proxy.
+
+YAZ proxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with YAZ proxy; see the file LICENSE. If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
+#include <pthread.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <stdio.h>
+
+#include <yaz++/socket-observer.h>
+#include <yaz/log.h>
+
+#include "thread_pool_observer.h"
+
+using namespace yazpp_1;
+
+IThreadPoolMsg::~IThreadPoolMsg()
+{
+
+}
+
+static void *tfunc(void *p)
+{
+ ThreadPoolSocketObserver *pt = (ThreadPoolSocketObserver *) p;
+ pt->run(0);
+ return 0;
+}
+
+
+ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads)
+ : m_SocketObservable(obs)
+{
+ pipe(m_fd);
+ obs->addObserver(m_fd[0], this);
+ obs->maskObserver(this, SOCKET_OBSERVE_READ);
+
+ m_stop_flag = false;
+ pthread_mutex_init(&m_mutex_input_data, 0);
+ pthread_cond_init(&m_cond_input_data, 0);
+ pthread_mutex_init(&m_mutex_output_data, 0);
+
+ m_no_threads = no_threads;
+ m_thread_id = new pthread_t[no_threads];
+ int i;
+ for (i = 0; i<m_no_threads; i++)
+ pthread_create(&m_thread_id[i], 0, tfunc, this);
+}
+
+ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
+{
+ pthread_mutex_lock(&m_mutex_input_data);
+ m_stop_flag = true;
+ pthread_cond_broadcast(&m_cond_input_data);
+ pthread_mutex_unlock(&m_mutex_input_data);
+
+ int i;
+ for (i = 0; i<m_no_threads; i++)
+ pthread_join(m_thread_id[i], 0);
+ delete [] m_thread_id;
+
+ m_SocketObservable->deleteObserver(this);
+
+ pthread_cond_destroy(&m_cond_input_data);
+ pthread_mutex_destroy(&m_mutex_input_data);
+ pthread_mutex_destroy(&m_mutex_output_data);
+ close(m_fd[0]);
+ close(m_fd[1]);
+}
+
+void ThreadPoolSocketObserver::socketNotify(int event)
+{
+ if (event & SOCKET_OBSERVE_READ)
+ {
+ char buf[2];
+ read(m_fd[0], buf, 1);
+ pthread_mutex_lock(&m_mutex_output_data);
+ IThreadPoolMsg *out = m_output.front();
+ m_output.pop_front();
+ pthread_mutex_unlock(&m_mutex_output_data);
+ if (out)
+ out->result();
+ }
+}
+
+void ThreadPoolSocketObserver::run(void *p)
+{
+ while(1)
+ {
+ pthread_mutex_lock(&m_mutex_input_data);
+ while (!m_stop_flag && m_input.size() == 0)
+ pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
+ if (m_stop_flag)
+ {
+ pthread_mutex_unlock(&m_mutex_input_data);
+ break;
+ }
+ IThreadPoolMsg *in = m_input.front();
+ m_input.pop_front();
+ pthread_mutex_unlock(&m_mutex_input_data);
+
+ IThreadPoolMsg *out = in->handle();
+ pthread_mutex_lock(&m_mutex_output_data);
+
+ m_output.push_back(out);
+
+ write(m_fd[1], "", 1);
+ pthread_mutex_unlock(&m_mutex_output_data);
+ }
+}
+
+void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
+{
+ pthread_mutex_lock(&m_mutex_input_data);
+ m_input.push_back(m);
+ pthread_cond_signal(&m_cond_input_data);
+ pthread_mutex_unlock(&m_mutex_input_data);
+}
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
--- /dev/null
+/* $Id: thread_pool_observer.h,v 1.1 2005-10-06 19:33:58 adam Exp $
+ Copyright (c) 1998-2005, Index Data.
+
+This file is part of the yaz-proxy.
+
+YAZ proxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with YAZ proxy; see the file LICENSE. If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
+
+#include <pthread.h>
+#include <unistd.h>
+#include <ctype.h>
+
+#if HAVE_DLFCN_H
+#include <dlfcn.h>
+#endif
+
+#include <deque>
+#include <yaz++/socket-observer.h>
+#include <yaz/yconfig.h>
+
+class IThreadPoolMsg {
+public:
+ virtual IThreadPoolMsg *handle() = 0;
+ virtual void result() = 0;
+ virtual ~IThreadPoolMsg();
+};
+
+class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
+ public:
+ ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, int no_threads);
+ virtual ~ThreadPoolSocketObserver();
+ void socketNotify(int event);
+ void put(IThreadPoolMsg *m);
+ IThreadPoolMsg *get();
+ void run(void *p);
+ int m_fd[2];
+private:
+ yazpp_1::ISocketObservable *m_SocketObservable;
+ int m_no_threads;
+ pthread_t *m_thread_id;
+
+ std::deque<IThreadPoolMsg *> m_input;
+ std::deque<IThreadPoolMsg *> m_output;
+ pthread_mutex_t m_mutex_input_data;
+ pthread_cond_t m_cond_input_data;
+ pthread_mutex_t m_mutex_output_data;
+ bool m_stop_flag;
+};
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+