1 /* $Id: thread_pool_observer.cpp,v 1.19 2007-02-19 12:51:08 adam Exp $
2 Copyright (c) 2005-2007, Index Data.
4 See the LICENSE file for details
16 #include <sys/socket.h>
19 #include <boost/thread/thread.hpp>
20 #include <boost/thread/mutex.hpp>
21 #include <boost/thread/condition.hpp>
28 #include <yazpp/socket-observer.h>
31 #include "thread_pool_observer.hpp"
34 namespace metaproxy_1 {
// Small callable type nested in ThreadPoolSocketObserver. It keeps a pointer
// to the observer it was created for and declares operator()(void), so an
// instance can be invoked with no arguments.
// NOTE(review): presumably this is the object `w` passed to boost::thread in
// the ThreadPoolSocketObserver constructor -- the declaration of `w` is not
// visible in this excerpt; confirm against the full file.
35 class ThreadPoolSocketObserver::Worker {
// Single-argument constructor: only records the observer pointer.
37 Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
// Raw pointer to the observer supplied at construction.
38 ThreadPoolSocketObserver *m_s;
// Call operator; its body is not part of this excerpt.
39 void operator() (void) {
// Private state holder for ThreadPoolSocketObserver (reached through m_p
// elsewhere in this file). Derives from boost::noncopyable, so it cannot be
// copied.
// NOTE(review): m_pipe, m_stop_flag and m_no_threads are used by the member
// functions in this file, but their declarations are not visible in this
// excerpt.
44 class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
// Lets ThreadPoolSocketObserver access the members below directly.
45 friend class ThreadPoolSocketObserver;
// Takes the socket observable that is stored in m_socketObservable.
47 Rep(yazpp_1::ISocketObservable *obs);
// Observable given at construction; the observer's destructor calls
// deleteObserver() on it.
50 yazpp_1::ISocketObservable *m_socketObservable;
// Threads added in the observer's constructor and joined in its destructor.
52 boost::thread_group m_thrds;
// Locked while m_input is modified and while m_stop_flag is set or tested in
// put(), run() and the observer's destructor.
53 boost::mutex m_mutex_input_data;
// Waited on in run(); notified by put() (one waiter) and by the observer's
// destructor (all waiters).
54 boost::condition m_cond_input_data;
// Locked while m_output is modified in run() and socketNotify().
55 boost::mutex m_mutex_output_data;
// Messages queued by put() and taken from the front by run().
56 std::deque<IThreadPoolMsg *> m_input;
// Results of handle() queued by run() and taken from the front by
// socketNotify().
57 std::deque<IThreadPoolMsg *> m_output;
64 using namespace yazpp_1;
65 using namespace metaproxy_1;
// Rep constructor: stores the observable pointer and constructs m_pipe with
// the argument 9123. The constructor body is not part of this excerpt.
// NOTE(review): 9123 is hard-coded; it looks like a port number used by the
// pipe implementation -- confirm against the pipe class and consider making
// it configurable.
67 ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
68 : m_socketObservable(obs), m_pipe(9123)
// Out-of-line destructor of Rep; its body is not part of this excerpt.
72 ThreadPoolSocketObserver::Rep::~Rep()
// Out-of-line definition of the IThreadPoolMsg destructor; its body is not
// part of this excerpt.
// NOTE(review): messages are handled through IThreadPoolMsg* in this file, so
// the destructor is presumably declared virtual in the header -- confirm.
76 IThreadPoolMsg::~IThreadPoolMsg()
// Constructs the observer.
//   obs        - socket observable this object registers itself with.
//   no_threads - number of threads created and added to m_thrds.
// The visible steps are: register for events on the pipe's read descriptor,
// ask for read notifications, initialise the stop flag and thread count, and
// start the threads.
// NOTE(review): the allocation of m_p and the declaration of `w` are not
// visible in this excerpt.
81 ThreadPoolSocketObserver::ThreadPoolSocketObserver(
82 yazpp_1::ISocketObservable *obs, int no_threads)
// Watch the read end of the pipe; run() writes one byte to the write end for
// every result it queues.
85 obs->addObserver(m_p->m_pipe.read_fd(), this);
86 obs->maskObserver(this, SOCKET_OBSERVE_READ);
// Cleared here; the destructor sets it to true and run() tests it after
// waking up.
88 m_p->m_stop_flag = false;
89 m_p->m_no_threads = no_threads;
91 for (i = 0; i<no_threads; i++)
// Per Boost.Thread documentation, thread_group::add_thread takes ownership of
// the thread object allocated here.
94 m_p->m_thrds.add_thread(new boost::thread(w));
// Shuts the pool down: sets the stop flag while holding the input mutex,
// wakes every thread waiting on the input condition, joins all threads, and
// finally removes this object from the socket observable.
// NOTE(review): join_all() can only complete if input_lock has been released
// first, because woken threads must re-acquire the input mutex. The scope
// braces that would release the lock are not visible in this excerpt --
// confirm they enclose only the three statements that follow.
98 ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
101 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
102 m_p->m_stop_flag = true;
// notify_all rather than notify_one: every waiting thread has to see the flag.
103 m_p->m_cond_input_data.notify_all();
105 m_p->m_thrds.join_all();
107 m_p->m_socketObservable->deleteObserver(this);
// Event callback. When the event mask contains SOCKET_OBSERVE_READ, one byte
// is consumed from the pipe's read descriptor and the front element of the
// output queue is removed while holding the output mutex.
// NOTE(review): both recv() and read() appear below; presumably a platform
// #ifdef (not visible in this excerpt) selects one of them.
// NOTE(review): the return values of recv()/read() are not checked in the
// visible lines, and front()/pop_front() assume the queue is non-empty --
// that holds only if exactly one byte arrives per queued result (see run()).
// NOTE(review): what is done with `out` after it is dequeued is not visible
// in this excerpt.
110 void ThreadPoolSocketObserver::socketNotify(int event)
112 if (event & SOCKET_OBSERVE_READ)
116 recv(m_p->m_pipe.read_fd(), buf, 1, 0);
118 read(m_p->m_pipe.read_fd(), buf, 1);
122 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
123 out = m_p->m_output.front();
124 m_p->m_output.pop_front();
// Message-processing routine. Visible steps:
//   1. Under the input mutex, wait on the input condition until the stop flag
//      is set or the input queue is non-empty.
//   2. Test the stop flag (the statement executed when it is set is not
//      visible in this excerpt).
//   3. Take the front message from the input queue.
//   4. Call handle() on it and queue the returned message on the output queue
//      under the output mutex.
//   5. Write one byte to the pipe's write descriptor.
// The parameter p is not used in any visible line.
// NOTE(review): the enclosing loop and the scope braces that bound the two
// locks are not visible here -- confirm handle() runs with no lock held.
// NOTE(review): both send() and write() appear; presumably a platform #ifdef
// (not visible) selects one. Their return values are not checked in the
// visible lines.
131 void ThreadPoolSocketObserver::run(void *p)
135 IThreadPoolMsg *in = 0;
137 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
// The predicate is re-tested after every wake-up, so a wake-up with nothing
// queued and no stop request simply waits again.
138 while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
139 m_p->m_cond_input_data.wait(input_lock);
140 if (m_p->m_stop_flag)
143 in = m_p->m_input.front();
144 m_p->m_input.pop_front();
146 IThreadPoolMsg *out = in->handle();
148 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
149 m_p->m_output.push_back(out);
// Length 1 with an empty literal sends the terminating NUL byte; the matching
// one-byte read is in socketNotify().
151 send(m_p->m_pipe.write_fd(), "", 1, 0);
153 write(m_p->m_pipe.write_fd(), "", 1);
// Queues message m for processing: appends it to the input queue while
// holding the input mutex, then wakes one thread waiting on the input
// condition (the wait is in run()).
159 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
161 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
162 m_p->m_input.push_back(m);
// One message was added, so waking a single waiter is sufficient.
163 m_p->m_cond_input_data.notify_one();
168 * indent-tabs-mode: nil
169 * c-file-style: "stroustrup"
171 * vim: shiftwidth=4 tabstop=8 expandtab