1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) Index Data and Mike Taylor
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
23 class PDU_Assoc_priv {
24 friend class PDU_Assoc;
36 PDU_Queue(const char *buf, int len);
42 PDU_Assoc *pdu_parent;
43 PDU_Assoc *pdu_children;
46 yazpp_1::ISocketObservable *m_socketObservable;
54 void init(yazpp_1::ISocketObservable *socketObservable);
55 COMSTACK comstack(const char *type_and_host, void **vp);
56 bool m_session_is_dead;
61 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
65 m_socketObservable = socketObservable;
76 m_session_is_dead = false;
80 PDU_Assoc::~PDU_Assoc()
82 xfree(m_p->cert_fname);
86 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
89 m_p = new PDU_Assoc_priv;
90 m_p->init(socketObservable);
93 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
97 m_p = new PDU_Assoc_priv;
98 m_p->init(socketObservable);
101 if (cs->io_pending & CS_WANT_WRITE)
102 mask |= SOCKET_OBSERVE_WRITE;
103 if (cs->io_pending & CS_WANT_READ)
104 mask |= SOCKET_OBSERVE_READ;
105 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
108 yaz_log(m_p->log, "new PDU_Assoc. Ready");
109 m_p->state = PDU_Assoc_priv::Ready;
114 yaz_log(m_p->log, "new PDU_Assoc. Accepting");
115 // assume comstack is accepting...
116 m_p->state = PDU_Assoc_priv::Accepting;
117 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
118 yaz_log(m_p->log, "maskObserver 1");
119 m_p->m_socketObservable->maskObserver(this,
120 mask|SOCKET_OBSERVE_EXCEPT);
125 IPDU_Observable *PDU_Assoc::clone()
127 PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
131 void PDU_Assoc::socketNotify(int event)
133 yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
134 this, m_p->state, event);
135 if (event & SOCKET_OBSERVE_EXCEPT)
138 m_PDU_Observer->failNotify();
141 else if (event & SOCKET_OBSERVE_TIMEOUT)
143 m_PDU_Observer->timeoutNotify();
148 case PDU_Assoc_priv::Accepting:
149 if (!cs_accept(m_p->cs))
151 yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
154 m_PDU_Observer->failNotify();
159 if (m_p->cs->io_pending & CS_WANT_WRITE)
160 mask |= SOCKET_OBSERVE_WRITE;
161 if (m_p->cs->io_pending & CS_WANT_READ)
162 mask |= SOCKET_OBSERVE_READ;
164 { // accept is complete. turn to ready state and write if needed
165 m_p->state = PDU_Assoc_priv::Ready;
169 { // accept still incomplete.
170 yaz_log(m_p->log, "maskObserver 2");
171 m_p->m_socketObservable->maskObserver(this,
172 mask|SOCKET_OBSERVE_EXCEPT);
176 case PDU_Assoc_priv::Connecting:
177 if (event & SOCKET_OBSERVE_READ &&
178 event & SOCKET_OBSERVE_WRITE)
180 // For Unix: if both read and write is set, then connect failed.
182 m_PDU_Observer->failNotify();
186 yaz_log(m_p->log, "cs_rcvconnect");
187 int res = cs_rcvconnect(m_p->cs);
190 unsigned mask = SOCKET_OBSERVE_EXCEPT;
191 if (m_p->cs->io_pending & CS_WANT_WRITE)
192 mask |= SOCKET_OBSERVE_WRITE;
193 if (m_p->cs->io_pending & CS_WANT_READ)
194 mask |= SOCKET_OBSERVE_READ;
195 yaz_log(m_p->log, "maskObserver 3");
196 m_p->m_socketObservable->maskObserver(this, mask);
200 m_p->state = PDU_Assoc_priv::Ready;
202 m_PDU_Observer->connectNotify();
207 case PDU_Assoc_priv::Listen:
208 if (event & SOCKET_OBSERVE_READ)
213 if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
217 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
220 if (!(new_line = cs_accept(m_p->cs)))
222 /* 1. create socket-manager
224 3. create top-level object
225 setup observer for child fileid in pdu-assoc
228 yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
229 cs_fileno(m_p->cs), cs_fileno(new_line));
230 childNotify(new_line);
233 case PDU_Assoc_priv::Writing:
234 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
237 case PDU_Assoc_priv::Ready:
238 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
242 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
245 unsigned mask = SOCKET_OBSERVE_EXCEPT;
246 if (m_p->cs->io_pending & CS_WANT_WRITE)
247 mask |= SOCKET_OBSERVE_WRITE;
248 if (m_p->cs->io_pending & CS_WANT_READ)
249 mask |= SOCKET_OBSERVE_READ;
250 yaz_log(m_p->log, "maskObserver 4");
251 m_p->m_socketObservable->maskObserver(this, mask);
256 yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
259 m_PDU_Observer->failNotify(); // problem here..
262 // lock it, so we know if recv_PDU deletes it.
264 m_p->destroyed = &destroyed;
269 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
273 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
275 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
277 if (destroyed) // it really was destroyed, return now.
280 } while (m_p->cs && cs_more(m_p->cs));
281 if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
283 yaz_log(m_p->log, "maskObserver 5");
284 m_p->m_socketObservable->maskObserver(this,
285 SOCKET_OBSERVE_EXCEPT|
286 SOCKET_OBSERVE_READ);
290 case PDU_Assoc_priv::Closed:
291 yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
294 m_PDU_Observer->failNotify();
297 yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
299 m_PDU_Observer->failNotify();
303 void PDU_Assoc::close_session()
305 m_p->m_session_is_dead = true;
309 m_PDU_Observer->failNotify();
313 void PDU_Assoc::shutdown()
316 for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
319 m_p->m_socketObservable->deleteObserver(this);
320 m_p->state = PDU_Assoc_priv::Closed;
323 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
327 while (m_p->queue_out)
329 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
330 m_p->queue_out = m_p->queue_out->m_next;
333 xfree(m_p->input_buf);
338 void PDU_Assoc::destroy()
346 // delete from parent's child list (if any)
349 c = &m_p->pdu_parent->m_p->pdu_children;
353 c = &(*c)->m_p->pdu_next;
355 *c = (*c)->m_p->pdu_next;
357 // delete all children ...
358 c = &m_p->pdu_children;
361 PDU_Assoc *here = *c;
362 *c = (*c)->m_p->pdu_next;
363 here->m_p->pdu_parent = 0;
366 yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
369 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
371 m_buf = (char *) xmalloc(len);
372 memcpy(m_buf, buf, len);
377 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
382 int PDU_Assoc::flush_PDU()
386 if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
388 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
391 PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
394 m_p->state = PDU_Assoc_priv::Ready;
395 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
396 yaz_log(m_p->log, "maskObserver 6");
397 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
398 SOCKET_OBSERVE_WRITE|
399 SOCKET_OBSERVE_EXCEPT);
400 if (m_p->m_session_is_dead)
403 m_PDU_Observer->failNotify();
407 r = cs_put(m_p->cs, q->m_buf, q->m_len);
410 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
412 m_PDU_Observer->failNotify();
417 unsigned mask = SOCKET_OBSERVE_EXCEPT;
418 m_p->state = PDU_Assoc_priv::Writing;
419 if (m_p->cs->io_pending & CS_WANT_WRITE)
420 mask |= SOCKET_OBSERVE_WRITE;
421 if (m_p->cs->io_pending & CS_WANT_READ)
422 mask |= SOCKET_OBSERVE_READ;
424 mask |= SOCKET_OBSERVE_WRITE;
425 yaz_log(m_p->log, "maskObserver 7");
426 m_p->m_socketObservable->maskObserver(this, mask);
427 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
428 q->m_len, cs_fileno(m_p->cs));
431 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
432 // whole packet sent... delete this and proceed to next ...
433 m_p->queue_out = q->m_next;
435 // don't select on write if queue is empty ...
438 m_p->state = PDU_Assoc_priv::Ready;
439 yaz_log(m_p->log, "maskObserver 8");
440 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
441 SOCKET_OBSERVE_EXCEPT);
442 if (m_p->m_session_is_dead)
448 int PDU_Assoc::send_PDU(const char *buf, int len)
450 yaz_log(m_p->log, "PDU_Assoc::send_PDU");
451 PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
452 int is_idle = (*pq ? 0 : 1);
456 yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
461 *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
465 yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
470 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
472 return cs_create_host(type_and_host, 2, vp);
475 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
479 m_p->m_socketObservable->deleteObserver(this);
480 m_p->state = PDU_Assoc_priv::Closed;
483 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
487 while (m_p->queue_out)
489 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
490 m_p->queue_out = m_p->queue_out->m_next;
493 xfree(m_p->input_buf);
502 m_PDU_Observer = observer;
504 m_p->cs = m_p->comstack(addr, &ap);
510 cs_set_ssl_certificate_file(m_p->cs, m_p->cert_fname);
512 if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
515 int fd = cs_fileno(m_p->cs);
517 int oldflags = fcntl(fd, F_GETFD, 0);
520 oldflags |= FD_CLOEXEC;
521 fcntl(fd, F_SETFD, oldflags);
524 m_p->m_socketObservable->addObserver(fd, this);
525 yaz_log(m_p->log, "maskObserver 9");
526 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
527 SOCKET_OBSERVE_EXCEPT);
528 yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
529 m_p->state = PDU_Assoc_priv::Listen;
533 COMSTACK PDU_Assoc::get_comstack()
538 void PDU_Assoc::idleTime(int idleTime)
540 m_p->idleTime = idleTime;
541 yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
542 m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
545 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
547 yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
549 m_PDU_Observer = observer;
551 m_p->cs = m_p->comstack(addr, &ap);
554 int res = cs_connect(m_p->cs, ap);
555 yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
557 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
560 { // Connect complete
561 m_p->state = PDU_Assoc_priv::Connecting;
562 unsigned mask = SOCKET_OBSERVE_EXCEPT;
563 mask |= SOCKET_OBSERVE_WRITE;
564 mask |= SOCKET_OBSERVE_READ;
565 yaz_log(m_p->log, "maskObserver 11");
566 m_p->m_socketObservable->maskObserver(this, mask);
570 m_p->state = PDU_Assoc_priv::Connecting;
571 unsigned mask = SOCKET_OBSERVE_EXCEPT;
572 if (m_p->cs->io_pending & CS_WANT_WRITE)
573 mask |= SOCKET_OBSERVE_WRITE;
574 if (m_p->cs->io_pending & CS_WANT_READ)
575 mask |= SOCKET_OBSERVE_READ;
576 yaz_log(m_p->log, "maskObserver 11");
577 m_p->m_socketObservable->maskObserver(this, mask);
580 { // Connect failed immediately
581 // Since m_state is Closed we can distinguish this case from
582 // normal connect in socketNotify handler
583 yaz_log(m_p->log, "maskObserver 12");
584 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
585 SOCKET_OBSERVE_EXCEPT);
590 // Single-threaded... Only useful for non-blocking handlers
591 void PDU_Assoc::childNotify(COMSTACK cs)
593 PDU_Assoc *new_observable =
594 new PDU_Assoc(m_p->m_socketObservable, cs);
596 // Clone PDU Observer
597 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
598 (new_observable, cs_fileno(cs));
600 if (!new_observable->m_PDU_Observer)
602 new_observable->shutdown();
603 delete new_observable;
606 new_observable->m_p->pdu_next = m_p->pdu_children;
607 m_p->pdu_children = new_observable;
608 new_observable->m_p->pdu_parent = this;
611 const char*PDU_Assoc::getpeername()
615 return cs_addrstr(m_p->cs);
618 void PDU_Assoc::set_cert_fname(const char *fname)
620 xfree(m_p->cert_fname);
623 m_p->cert_fname = xstrdup(fname);
629 * c-file-style: "Stroustrup"
630 * indent-tabs-mode: nil
632 * vim: shiftwidth=4 tabstop=8 expandtab