1 /* $Id: thread_pool_observer.cpp,v 1.21 2007-05-09 21:23:09 adam Exp $
2 Copyright (c) 2005-2007, Index Data.
4 This file is part of Metaproxy.
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE. If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
32 #include <sys/socket.h>
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/mutex.hpp>
37 #include <boost/thread/condition.hpp>
44 #include <yazpp/socket-observer.h>
47 #include "thread_pool_observer.hpp"
50 namespace metaproxy_1 {
// Small function object that carries a pointer back to the
// ThreadPoolSocketObserver it was created for.
// NOTE(review): presumably this is the callable passed to boost::thread in
// the observer's constructor -- confirm against the full file.
51 class ThreadPoolSocketObserver::Worker {
// Remember the owning observer so operator() can reach it later.
53 Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
// Non-owning pointer to the observer supplied at construction.
54 ThreadPoolSocketObserver *m_s;
// Function-call operator taking no arguments; its body is not visible in
// this excerpt.
55 void operator() (void) {
// Private implementation state of ThreadPoolSocketObserver. Derives from
// boost::noncopyable, and ThreadPoolSocketObserver is a friend so that it can
// access the members directly.
// NOTE(review): m_pipe and m_stop_flag are used elsewhere in this file but
// their declarations are not visible in this excerpt.
60 class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
61 friend class ThreadPoolSocketObserver;
// Takes the observable that the owning observer registers with.
63 Rep(yazpp_1::ISocketObservable *obs);
// Observable given to the constructor; the observer's destructor calls
// deleteObserver() on it.
66 yazpp_1::ISocketObservable *m_socketObservable;
// Threads created by the observer's constructor and joined in its destructor.
68 boost::thread_group m_thrds;
// Held while m_input and m_stop_flag are accessed.
69 boost::mutex m_mutex_input_data;
// Notified by put() after queuing a message and by the destructor when
// stopping; run() waits on it while m_input is empty.
70 boost::condition m_cond_input_data;
// Notified by run() after it removes a message; put() waits on it while the
// input queue is at capacity.
71 boost::condition m_cond_input_full;
// Held while m_output is accessed.
72 boost::mutex m_mutex_output_data;
// Messages queued by put() and consumed by run().
73 std::deque<IThreadPoolMsg *> m_input;
// Results of handle() pushed by run() and popped by socketNotify().
74 std::deque<IThreadPoolMsg *> m_output;
// Thread count recorded by the observer's constructor; also scales the
// input queue capacity checked in put().
76 unsigned m_no_threads;
// Input-queue slots allowed per worker thread: put() blocks while m_input
// holds m_no_threads * queue_size_per_thread messages or more.
78 const unsigned int queue_size_per_thread = 64;
// Make names from the yazpp_1 and metaproxy_1 namespaces usable unqualified
// in the definitions that follow.
83 using namespace yazpp_1;
84 using namespace metaproxy_1;
// Stores the observable and constructs m_pipe with the argument 9123.
// NOTE(review): the meaning of 9123 depends on the pipe class, which is not
// visible here -- presumably a port hint for a socket-based pipe; confirm.
// The constructor body is not visible in this excerpt.
86 ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
87 : m_socketObservable(obs), m_pipe(9123)
// Out-of-line destructor for Rep; its body is not visible in this excerpt.
91 ThreadPoolSocketObserver::Rep::~Rep()
// Out-of-line destructor for the IThreadPoolMsg message interface; its body
// is not visible in this excerpt.
95 IThreadPoolMsg::~IThreadPoolMsg()
// Sets up the pool: registers this object on the pipe's read descriptor and
// starts the threads.
//   obs        - observable this object registers itself with.
//   no_threads - number of threads to create.
// NOTE(review): the creation of m_p and the declarations of i and w are not
// visible in this excerpt.
100 ThreadPoolSocketObserver::ThreadPoolSocketObserver(
101 yazpp_1::ISocketObservable *obs, int no_threads)
// Observe the read end of the pipe, for read events only; run() writes one
// byte to the pipe for each result it queues.
104 obs->addObserver(m_p->m_pipe.read_fd(), this);
105 obs->maskObserver(this, SOCKET_OBSERVE_READ);
// Clear the stop flag and record the thread count before any thread starts.
107 m_p->m_stop_flag = false;
108 m_p->m_no_threads = no_threads;
// Create no_threads threads and add each one to the thread group.
110 for (i = 0; i<no_threads; i++)
113 m_p->m_thrds.add_thread(new boost::thread(w));
// Shuts the pool down: raises the stop flag, wakes the waiting threads, joins
// them, then removes this object from the observable.
117 ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
// Set the stop flag under the input mutex and wake every thread waiting on
// m_cond_input_data so each can see the flag.
// NOTE(review): the lock is presumably confined to a nested scope so it is
// released before join_all(); the braces are not visible here -- confirm.
120 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
121 m_p->m_stop_flag = true;
122 m_p->m_cond_input_data.notify_all();
// Wait for all threads in the group to finish.
124 m_p->m_thrds.join_all();
// Remove this observer from the observable.
126 m_p->m_socketObservable->deleteObserver(this);
// Socket-event callback. On a read event it consumes one byte from the pipe
// and removes one message from the front of the output queue.
//   event - bit mask of events; only SOCKET_OBSERVE_READ is tested here.
// NOTE(review): the declarations of buf and out, and what is done with out
// after it is dequeued, are not visible in this excerpt.
129 void ThreadPoolSocketObserver::socketNotify(int event)
131 if (event & SOCKET_OBSERVE_READ)
// Consume one byte from the read end of the pipe. Both a recv() and a read()
// form appear; presumably they are alternatives chosen by a platform
// preprocessor guard that is not visible here -- confirm.
// NOTE(review): the return value is ignored in both forms.
135 recv(m_p->m_pipe.read_fd(), buf, 1, 0);
137 read(m_p->m_pipe.read_fd(), buf, 1);
// Take the oldest result off the output queue under the output mutex.
// front() is called without an emptiness check, so this relies on run()
// having pushed one result for each byte it wrote to the pipe.
141 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
142 out = m_p->m_output.front();
143 m_p->m_output.pop_front();
// Worker routine: waits for an input message, calls handle() on it, queues
// the result and writes one byte to the pipe.
//   p - not referenced in any of the lines visible here.
// NOTE(review): the enclosing loop, the action taken when the stop flag is
// set, and the extent of each lock scope are not visible in this excerpt.
150 void ThreadPoolSocketObserver::run(void *p)
154 IThreadPoolMsg *in = 0;
// Wait on m_cond_input_data until the stop flag is set or m_input is
// non-empty.
156 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
157 while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
158 m_p->m_cond_input_data.wait(input_lock);
// Stop requested: the statement executed here is not visible (presumably it
// leaves the worker loop -- confirm).
159 if (m_p->m_stop_flag)
// Dequeue the oldest message, then wake any put() caller blocked on a full
// input queue.
162 in = m_p->m_input.front();
163 m_p->m_input.pop_front();
164 m_p->m_cond_input_full.notify_all();
// Let the message do its work; handle() returns the message to queue as the
// result.
166 IThreadPoolMsg *out = in->handle();
// Queue the result under the output mutex.
168 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
169 m_p->m_output.push_back(out);
// Write a single byte to the pipe; socketNotify() reads one byte from the
// other end for each result it dequeues. As with the read side, send() and
// write() are presumably platform alternatives behind a guard not visible
// here -- confirm.
// NOTE(review): the return value is ignored in both forms.
171 send(m_p->m_pipe.write_fd(), "", 1, 0);
173 write(m_p->m_pipe.write_fd(), "", 1);
// Queues a message on the input queue, blocking the caller while the queue
// is at capacity.
//   m - message to queue; run() later dequeues it and calls handle() on it.
179 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
181 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
// Capacity is m_no_threads * queue_size_per_thread entries; wait until run()
// signals that it has removed a message.
182 while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
183 m_p->m_cond_input_full.wait(input_lock);
// Append the message and wake one thread waiting on m_cond_input_data.
184 m_p->m_input.push_back(m);
185 m_p->m_cond_input_data.notify_one();
190 * indent-tabs-mode: nil
191 * c-file-style: "stroustrup"
193 * vim: shiftwidth=4 tabstop=8 expandtab