1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) 1998-2012 Index Data and Mike Taylor
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
// Common initialisation used by both constructors: remembers the socket
// observable this association registers with and marks the session as
// not dead.
22 void PDU_Assoc::init(ISocketObservable *socketObservable)
26 m_socketObservable = socketObservable;
38 m_session_is_dead = false;
// Constructs an association attached to the given socket observable.
// The set-up is delegated to init().
41 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
43 init (socketObservable);
// Constructs an association for a comstack named cs (the statements below
// read cs->io_pending and cs_fileno(cs)). After the common init(), this
// object is registered as observer of the comstack's file descriptor and
// the observed events are derived from the comstack's pending-I/O flags.
46 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
49 init(socketObservable);
// Translate the comstack's pending-I/O flags into socket-observer events.
52 if (cs->io_pending & CS_WANT_WRITE)
53 mask |= SOCKET_OBSERVE_WRITE;
54 if (cs->io_pending & CS_WANT_READ)
55 mask |= SOCKET_OBSERVE_READ;
56 m_socketObservable->addObserver(cs_fileno(cs), this);
// The "Ready" and "Accepting" log messages mark the two start-up paths.
// NOTE(review): presumably chosen by whether any I/O is pending - confirm.
59 yaz_log (m_log, "new PDU_Assoc. Ready");
65 yaz_log (m_log, "new PDU_Assoc. Accepting");
66 // assume comstack is accepting...
// NOTE(review): addObserver was already called above with the same
// arguments; confirm that the second registration is intended.
68 m_socketObservable->addObserver(cs_fileno(cs), this);
69 yaz_log(m_log, "maskObserver 1");
// Exception events are always observed in addition to the pending I/O.
70 m_socketObservable->maskObserver(this,
71 mask |SOCKET_OBSERVE_EXCEPT);
// Creates a new PDU_Assoc that shares this object's socket observable.
// NOTE(review): presumably the copy is what gets returned - confirm.
76 IPDU_Observable *PDU_Assoc::clone()
78 PDU_Assoc *copy = new PDU_Assoc(m_socketObservable);
// Socket event callback. Logs the object, its state and the event, then
// reacts: exception and timeout events are reported to the PDU observer
// through failNotify() and timeoutNotify(); other events drive the accept,
// connect, listen and read/write handling marked by the comments and log
// messages below.
// NOTE(review): which m_state value selects each section should be
// confirmed against the state checks.
82 void PDU_Assoc::socketNotify(int event)
84 yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
85 this, m_state, event);
86 if (event & SOCKET_OBSERVE_EXCEPT)
89 m_PDU_Observer->failNotify();
92 else if (event & SOCKET_OBSERVE_TIMEOUT)
94 m_PDU_Observer->timeoutNotify();
// Accept handling: a zero result from cs_accept is logged and reported
// to the observer as a failure.
100 if (!cs_accept (m_cs))
102 yaz_log (m_log, "PDU_Assoc::cs_accept failed");
105 m_PDU_Observer->failNotify();
// Rebuild the event mask from the comstack's pending-I/O flags.
110 if (m_cs->io_pending & CS_WANT_WRITE)
111 mask |= SOCKET_OBSERVE_WRITE;
112 if (m_cs->io_pending & CS_WANT_READ)
113 mask |= SOCKET_OBSERVE_READ;
115 { // accept is complete. turn to ready state and write if needed
120 { // accept still incomplete.
121 yaz_log(m_log, "maskObserver 2");
122 m_socketObservable->maskObserver(this,
123 mask|SOCKET_OBSERVE_EXCEPT);
// Connect handling: simultaneous read and write readiness is treated as
// a failed connect and reported to the observer.
128 if (event & SOCKET_OBSERVE_READ &&
129 event & SOCKET_OBSERVE_WRITE)
131 // For Unix: if both read and write is set, then connect failed.
133 m_PDU_Observer->failNotify();
// Otherwise let the comstack continue the connect.
137 yaz_log (m_log, "cs_rcvconnect");
138 int res = cs_rcvconnect (m_cs);
// Keep observing exceptions plus whatever I/O the comstack still wants.
141 unsigned mask = SOCKET_OBSERVE_EXCEPT;
142 if (m_cs->io_pending & CS_WANT_WRITE)
143 mask |= SOCKET_OBSERVE_WRITE;
144 if (m_cs->io_pending & CS_WANT_READ)
145 mask |= SOCKET_OBSERVE_READ;
146 yaz_log(m_log, "maskObserver 3");
147 m_socketObservable->maskObserver(this, mask);
// Report the connect to the PDU observer.
153 m_PDU_Observer->connectNotify();
// Listen handling on a read event: the comstack returned by cs_accept is
// handed to childNotify(), which sets up the child association.
159 if (event & SOCKET_OBSERVE_READ)
164 if ((res = cs_listen(m_cs, 0, 0)) == 1)
168 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
171 if (!(new_line = cs_accept(m_cs)))
173 /* 1. create socket-manager
175 3. create top-level object
176 setup observer for child fileid in pdu-assoc
179 yaz_log (m_log, "new session: parent fd=%d child fd=%d",
180 cs_fileno(m_cs), cs_fileno(new_line));
181 childNotify (new_line);
185 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
189 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
// Read handling: cs_get fills m_input_buf / m_input_len.
// NOTE(review): how the result value distinguishes a complete PDU, a
// partial read and a closed connection should be checked against the
// COMSTACK documentation.
193 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
196 unsigned mask = SOCKET_OBSERVE_EXCEPT;
197 if (m_cs->io_pending & CS_WANT_WRITE)
198 mask |= SOCKET_OBSERVE_WRITE;
199 if (m_cs->io_pending & CS_WANT_READ)
200 mask |= SOCKET_OBSERVE_READ;
201 yaz_log(m_log, "maskObserver 4");
202 m_socketObservable->maskObserver(this, mask);
207 yaz_log (m_log, "PDU_Assoc::Connection closed by peer");
210 m_PDU_Observer->failNotify(); // problem here..
213 // lock it, so we know if recv_PDU deletes it.
// m_destroyed points at a local flag that is tested after recv_PDU.
215 m_destroyed = &destroyed;
// A copy of the received data is stored in a new element of the incoming
// queue and the same buffer is delivered to the observer.
220 PDU_Queue **pq = &m_queue_in;
224 *pq = new PDU_Queue(m_input_buf, res);
226 m_PDU_Observer->recv_PDU(m_input_buf, res);
228 if (destroyed) // it really was destroyed, return now.
// Repeat while the comstack still exists and cs_more reports more data.
231 } while (m_cs && cs_more (m_cs));
// Still connected and Ready: go back to observing read and exception.
232 if (m_cs && m_state == Ready)
234 yaz_log(m_log, "maskObserver 5");
235 m_socketObservable->maskObserver(this,
236 SOCKET_OBSERVE_EXCEPT|
237 SOCKET_OBSERVE_READ);
// Closing and unknown states are logged and reported as failures.
242 yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
244 m_PDU_Observer->failNotify();
247 yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
249 m_PDU_Observer->failNotify();
// Flags the session as dead; m_session_is_dead is also consulted by
// flush_PDU(). A failNotify() call on the PDU observer follows.
// NOTE(review): confirm whether that notification is unconditional.
253 void PDU_Assoc::close_session()
255 m_session_is_dead = true;
259 m_PDU_Observer->failNotify();
// Shuts the association down: walks the list of child associations,
// removes this object from the socket observable, logs the close of the
// comstack's file descriptor and unlinks elements from the outgoing queue.
263 void PDU_Assoc::shutdown()
// Visit every child linked through m_next.
266 for (ch = m_children; ch; ch = ch->m_next)
269 m_socketObservable->deleteObserver(this);
273 yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs));
// Detach the head of the outgoing queue and advance to the next element.
279 PDU_Queue *q_this = m_queue_out;
280 m_queue_out = m_queue_out->m_next;
// Destroys this association: it is removed from its parent's list of
// children (when it has a parent), its own children are deleted, and the
// event is logged.
288 void PDU_Assoc::destroy()
296 // delete from parent's child list (if any)
// c addresses the link being examined, starting at the parent's list head.
299 c = &m_parent->m_children;
307 // delete all children ...
311 PDU_Assoc *here = *c;
316 yaz_log (m_log, "PDU_Assoc::destroy this=%p", this);
// Queue element constructor: allocates len bytes with xmalloc and copies
// buf into them, so the element holds its own copy of the PDU data.
319 PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
321 m_buf = (char *) xmalloc (len);
322 memcpy (m_buf, buf, len);
// Queue element destructor.
// NOTE(review): presumably releases the m_buf allocated with xmalloc in
// the constructor - confirm.
327 PDU_Assoc::PDU_Queue::~PDU_Queue()
// Tries to send the element at the head of the outgoing queue with cs_put
// and adjusts the observed socket events to match the outcome.
332 int PDU_Assoc::flush_PDU()
// Flushing is only attempted in the Ready and Writing states.
336 if (m_state != Ready && m_state != Writing)
338 yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
341 PDU_Queue *q = m_queue_out;
// Empty queue: observe read, write and exception events; a session that
// was marked dead is reported to the observer as failed.
345 yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
346 yaz_log(m_log, "maskObserver 6");
347 m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
348 SOCKET_OBSERVE_WRITE|
349 SOCKET_OBSERVE_EXCEPT);
350 if (m_session_is_dead)
353 m_PDU_Observer->failNotify();
// Hand the head element's buffer to the comstack.
357 r = cs_put (m_cs, q->m_buf, q->m_len);
360 yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed");
362 m_PDU_Observer->failNotify();
// Incomplete write (logged as "(inc)"): observe exceptions, the
// comstack's pending I/O, and write readiness so that sending continues.
367 unsigned mask = SOCKET_OBSERVE_EXCEPT;
369 if (m_cs->io_pending & CS_WANT_WRITE)
370 mask |= SOCKET_OBSERVE_WRITE;
371 if (m_cs->io_pending & CS_WANT_READ)
372 mask |= SOCKET_OBSERVE_READ;
374 mask |= SOCKET_OBSERVE_WRITE;
375 yaz_log(m_log, "maskObserver 7");
376 m_socketObservable->maskObserver(this, mask);
377 yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
378 q->m_len, cs_fileno(m_cs));
381 yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
382 // whole packet sent... delete this and proceed to next ...
383 m_queue_out = q->m_next;
385 // don't select on write if queue is empty ...
389 yaz_log(m_log, "maskObserver 8");
390 m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
391 SOCKET_OBSERVE_EXCEPT);
// The dead-session flag is checked again once the queue has drained.
392 if (m_session_is_dead)
// Queues a PDU for sending: a new PDU_Queue element holding a copy of
// buf/len is stored through pq, which starts at the head of the outgoing
// queue.
398 int PDU_Assoc::send_PDU(const char *buf, int len)
400 yaz_log (m_log, "PDU_Assoc::send_PDU");
401 PDU_Queue **pq = &m_queue_out;
// is_idle is 1 when the outgoing queue was empty on entry, otherwise 0.
402 int is_idle = (*pq ? 0 : 1);
// Sending without a comstack is logged as a failure.
406 yaz_log (m_log, "PDU_Assoc::send_PDU failed, m_cs == 0");
// NOTE(review): presumably pq has been advanced to the end of the queue
// before this assignment - confirm.
411 *pq = new PDU_Queue(buf, len);
415 yaz_log (m_log, "PDU_Assoc::cannot send_PDU fd=%d",
// Creates a COMSTACK for type_and_host by calling cs_create_host; vp is
// forwarded to that call unchanged.
// NOTE(review): the meaning of the literal 2 passed as second argument is
// not visible here - check the cs_create_host documentation.
420 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
422 return cs_create_host(type_and_host, 2, vp);
// Starts listening on addr on behalf of observer. Existing state is torn
// down first (observer registration removed, outgoing queue unlinked);
// then a comstack is created and bound as a server, its descriptor is
// marked close-on-exec, and this object registers for read and exception
// events on it.
425 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
429 m_socketObservable->deleteObserver(this);
433 yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs));
// Detach the head of the outgoing queue and advance to the next element.
439 PDU_Queue *q_this = m_queue_out;
440 m_queue_out = m_queue_out->m_next;
452 m_PDU_Observer = observer;
// ap receives the address that is subsequently passed to cs_bind.
454 m_cs = comstack(addr, &ap);
// A negative result from cs_bind is the failure case.
458 if (cs_bind(m_cs, ap, CS_SERVER) < 0)
461 int fd = cs_fileno(m_cs);
// Add FD_CLOEXEC to the descriptor flags of the listening socket.
463 int oldflags = fcntl(fd, F_GETFD, 0);
466 oldflags |= FD_CLOEXEC;
467 fcntl(fd, F_SETFD, oldflags);
470 m_socketObservable->addObserver(fd, this);
471 yaz_log(m_log, "maskObserver 9");
472 m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
473 SOCKET_OBSERVE_EXCEPT);
474 yaz_log (m_log, "PDU_Assoc::listen ok fd=%d", fd);
// Stores the idle time in m_idleTime, logs it, and passes it to the socket
// observable as the timeout for this observer.
479 void PDU_Assoc::idleTime(int idleTime)
481 m_idleTime = idleTime;
482 yaz_log (m_log, "PDU_Assoc::idleTime(%d)", idleTime);
483 m_socketObservable->timeoutObserver(this, m_idleTime);
// Initiates a connection to addr on behalf of observer: creates the
// comstack, calls cs_connect, registers this object for the comstack's
// file descriptor, and chooses the observed events from the connect
// result.
486 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
488 yaz_log (m_log, "PDU_Assoc::connect %s", addr);
490 m_PDU_Observer = observer;
// ap receives the address that is subsequently passed to cs_connect.
492 m_cs = comstack(addr, &ap);
495 int res = cs_connect (m_cs, ap);
496 yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
498 m_socketObservable->addObserver(cs_fileno(m_cs), this);
// Even a completed connect enters the Connecting state and observes
// read, write and exception events.
501 { // Connect complete
502 m_state = Connecting;
503 unsigned mask = SOCKET_OBSERVE_EXCEPT;
504 mask |= SOCKET_OBSERVE_WRITE;
505 mask |= SOCKET_OBSERVE_READ;
506 yaz_log(m_log, "maskObserver 11");
507 m_socketObservable->maskObserver(this, mask);
// Second Connecting path: observe exceptions plus the comstack's
// pending I/O.
511 m_state = Connecting;
512 unsigned mask = SOCKET_OBSERVE_EXCEPT;
513 if (m_cs->io_pending & CS_WANT_WRITE)
514 mask |= SOCKET_OBSERVE_WRITE;
515 if (m_cs->io_pending & CS_WANT_READ)
516 mask |= SOCKET_OBSERVE_READ;
// NOTE(review): this log label repeats "maskObserver 11" from the branch
// above, so the two paths cannot be told apart in the log.
517 yaz_log(m_log, "maskObserver 11");
518 m_socketObservable->maskObserver(this, mask);
521 { // Connect failed immediately
522 // Since m_state is Closed we can distinguish this case from
523 // normal connect in socketNotify handler
524 yaz_log(m_log, "maskObserver 12");
525 m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
526 SOCKET_OBSERVE_EXCEPT);
531 // Single-threaded... Only useful for non-blocking handlers
// Sets up a child association for the comstack cs: a new PDU_Assoc is
// created on the same socket observable, the PDU observer's
// sessionNotify() supplies the child's observer, and the child is linked
// into this object's list of children.
532 void PDU_Assoc::childNotify(COMSTACK cs)
534 PDU_Assoc *new_observable =
535 new PDU_Assoc (m_socketObservable, cs);
537 // Clone PDU Observer
538 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
539 (new_observable, cs_fileno(cs));
// No observer was supplied for the session: shut the child down and
// delete it.
541 if (!new_observable->m_PDU_Observer)
543 new_observable->shutdown();
544 delete new_observable;
// Insert the child at the head of the children list and set its parent.
547 new_observable->m_next = m_children;
548 m_children = new_observable;
549 new_observable->m_parent = this;
// Returns the address string that cs_addrstr reports for this
// association's comstack.
// NOTE(review): confirm how a missing comstack is handled before the call.
552 const char*PDU_Assoc::getpeername()
556 return cs_addrstr(m_cs);
561 * c-file-style: "Stroustrup"
562 * indent-tabs-mode: nil
564 * vim: shiftwidth=4 tabstop=8 expandtab