From: Adam Dickmeiss Date: Mon, 19 Feb 2007 12:51:08 +0000 (+0000) Subject: Fixed bug #895: Metaproxy fails some 2 tests on flurry. The reason X-Git-Tag: METAPROXY.1.0.10~39 X-Git-Url: http://lists.indexdata.com/cgi-bin?a=commitdiff_plain;ds=inline;h=2b84829b69bc668745a1fed29e8a0447eecce811;p=metaproxy-moved-to-github.git Fixed bug #895: Metaproxy fails some 2 tests on flurry. The reason for tests failing was due to a an exisiting service on port 9123 which is used temporarily for making a "pipe" for ThreadPoolSocketObserver class implementation. The code now uses a regular pipe on Unix and only a real socket on Windows. However, the code is updated to use write/read on Unix in this case, since send/recv does not work on pipes. OTOH, on Windows only send/recv are supported on sockets. --- diff --git a/src/pipe.cpp b/src/pipe.cpp index 3a05896..bf54df5 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,4 +1,4 @@ -/* $Id: pipe.cpp,v 1.9 2007-01-25 14:05:54 adam Exp $ +/* $Id: pipe.cpp,v 1.10 2007-02-19 12:51:08 adam Exp $ Copyright (c) 2005-2007, Index Data. See the LICENSE file for details @@ -97,6 +97,8 @@ Pipe::Pipe(int port_to_use) : m_p(new Rep) WORD wVersionRequested = MAKEWORD(2, 0); if (WSAStartup( wVersionRequested, &wsaData )) throw Pipe::Error("WSAStartup failed"); +#else + port_to_use = 0; // we'll just use pipe on Unix #endif if (port_to_use) { @@ -172,7 +174,13 @@ Pipe::Pipe(int port_to_use) : m_p(new Rep) else { #ifndef WIN32 - pipe(m_p->m_fd); + if (pipe(m_p->m_fd)) + throw Pipe::Error("pipe failed"); + else + { + assert(m_p->m_fd[0] >= 0); + assert(m_p->m_fd[1] >= 0); + } #endif } } diff --git a/src/test_pipe.cpp b/src/test_pipe.cpp index 282c50c..162e673 100644 --- a/src/test_pipe.cpp +++ b/src/test_pipe.cpp @@ -1,13 +1,25 @@ -/* $Id: test_pipe.cpp,v 1.8 2007-01-25 14:05:54 adam Exp $ +/* $Id: test_pipe.cpp,v 1.9 2007-02-19 12:51:08 adam Exp $ Copyright (c) 2005-2007, Index Data. See the LICENSE file for details */ #include "config.hpp" - +#include #include +#if HAVE_UNISTD_H +#include +#endif + +#ifdef WIN32 +#include +#endif + +#if HAVE_SYS_SOCKET_H +#include +#endif + #include #include @@ -24,39 +36,69 @@ class Timer : public yazpp_1::ISocketObserver { private: yazpp_1::ISocketObservable *m_obs; mp::Pipe m_pipe; + bool m_data; bool m_timeout; public: Timer(yazpp_1::ISocketObservable *obs, int duration); void socketNotify(int event); bool timeout() { return m_timeout; }; + bool data() { return m_data; }; }; Timer::Timer(yazpp_1::ISocketObservable *obs, int duration) : - m_obs(obs), m_pipe(9122), m_timeout(false) + m_obs(obs), m_pipe(9122), m_data(false), m_timeout(false) { obs->addObserver(m_pipe.read_fd(), this); obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ); obs->timeoutObserver(this, duration); +#ifdef WIN32 + int r = send(m_pipe.write_fd(), "", 1, 0); +#else + int r = write(m_pipe.write_fd(), "", 1); +#endif + if (r == -1) + { + std::cout << "Error write: "<< strerror(errno) << std::endl; + } + BOOST_CHECK_EQUAL(write(m_pipe.write_fd(), "", 1), 1); } void Timer::socketNotify(int event) { - m_timeout = true; - m_obs->deleteObserver(this); + if (event & yazpp_1::SOCKET_OBSERVE_READ) + { + m_data = true; + char buf[3]; +#ifdef WIN32 + int r = recv(m_pipe.read_fd(), buf, 1, 0); +#else + int r = read(m_pipe.read_fd(), buf, 1); +#endif + if (r == -1) + { + std::cout << "Error read: "<< strerror(errno) << std::endl; + } + } + else if (event && yazpp_1::SOCKET_OBSERVE_TIMEOUT) + { + m_timeout = true; + m_obs->deleteObserver(this); + } } BOOST_AUTO_UNIT_TEST( test_pipe_1 ) { yazpp_1::SocketManager mySocketManager; - Timer t(&mySocketManager, 0); + Timer t(&mySocketManager, 1); while (mySocketManager.processEvent() > 0) if (t.timeout()) break; BOOST_CHECK(t.timeout()); + BOOST_CHECK(t.data()); } /* diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index d8a2fbd..ab72255 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,4 +1,4 @@ -/* $Id: thread_pool_observer.cpp,v 1.18 2007-01-25 14:05:54 adam Exp $ +/* $Id: thread_pool_observer.cpp,v 1.19 2007-02-19 12:51:08 adam Exp $ Copyright (c) 2005-2007, Index Data. See the LICENSE file for details @@ -112,7 +112,11 @@ void ThreadPoolSocketObserver::socketNotify(int event) if (event & SOCKET_OBSERVE_READ) { char buf[2]; +#ifdef WIN32 recv(m_p->m_pipe.read_fd(), buf, 1, 0); +#else + read(m_p->m_pipe.read_fd(), buf, 1); +#endif IThreadPoolMsg *out; { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); @@ -143,7 +147,11 @@ void ThreadPoolSocketObserver::run(void *p) { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); m_p->m_output.push_back(out); +#ifdef WIN32 send(m_p->m_pipe.write_fd(), "", 1, 0); +#else + write(m_p->m_pipe.write_fd(), "", 1); +#endif } } }