1 /* $Id: pazpar2.c,v 1.2 2006-11-18 05:00:38 quinn Exp $ */
8 #include <sys/socket.h>
13 #include <yaz/comstack.h>
14 #include <yaz/tcpip.h>
15 #include <yaz/proto.h>
16 #include <yaz/readconf.h>
17 #include <yaz/pquery.h>
18 #include <yaz/yaz-util.h>
/*
 * File-scope constants and globals.
 * NOTE(review): the embedded line numbers skip, so the struct headers and
 * the initializer of state_strings are not visible in this listing.
 */
24 #define PAZPAR2_VERSION "0.1"
/* Upper bound on the number of database-name slots in `databases` below. */
25 #define MAX_DATABASES 512
/*
 * The next three declarations look like members of a struct whose header
 * is not shown; code in this file reaches `session` and `databases`
 * through `struct target *` -- presumably they belong to struct target,
 * TODO confirm.
 */
30 struct session *session;
/* Database names, 128 bytes each; users of this array stop at the first empty string. */
35 char databases[MAX_DATABASES][128];
43 int requestid; // ID of current outstanding request
/* Printable names for target states; hitsbytarget() indexes this with (int) t->state. */
59 static char *state_strings[] = {
/* Head of the singly linked list of I/O channels; handed to event_loop() in main(). */
72 IOCHAN channel_list = 0;
/*
 * Process-wide settings. The three implementation* strings are copied
 * into the Z39.50 init request by send_init().
 * NOTE(review): other code in this file also reads `chunk` and `toget`
 * from global_parameters; those members, the closing brace and most of
 * the initializer are on lines not visible in this listing.
 */
74 static struct parameters {
75 int timeout; /* operations timeout, in seconds */
76 char implementationId[128];
77 char implementationName[128];
78 char implementationVersion[128];
79 struct timeval base_time;
/* One of the initializer values; presumably the implementation name -- TODO confirm. */
86 "Index Data PazPar2 (MasterKey)",
/*
 * send_apdu: encode APDU `a` with the target's output ODR stream
 * (t->odr_out) and write the encoded bytes to t->link with cs_put().
 * Callers in this file treat a result >= 0 as success.
 * NOTE(review): the embedded line numbers skip; the local declarations,
 * the return statements and the conditions guarding the two cs_put
 * messages are not visible in this listing.
 */
94 static int send_apdu(struct target *t, Z_APDU *a)
/* Encoding failure: report it through the ODR error printer. */
99 if (!z_APDU(t->odr_out, &a, 0, 0))
101 odr_perror(t->odr_out, "Encoding APDU");
/* Fetch the encoded buffer and its length, then hand it to the COMSTACK. */
104 buf = odr_getbuf(t->odr_out, &len, 0);
105 r = cs_put(t->link, buf, len);
/* Presumably reached when cs_put reports an error -- condition not shown. */
108 yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
/* Presumably reached on a partial write, which this code does not retry -- condition not shown. */
113 fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
115 odr_reset(t->odr_out); /* release the APDU structure */
/*
 * send_init: build a Z39.50 initRequest for the target attached to
 * channel `i` and send it with send_apdu().
 * The request advertises the implementation id/name/version from
 * global_parameters, the search, present and namedResultSets options,
 * and protocol versions 1 through 3.
 * NOTE(review): brace lines and whatever follows a failed send are not
 * visible in this listing.
 */
120 static void send_init(IOCHAN i)
122 struct target *t = iochan_getdata(i);
/* The APDU is allocated from the target's output ODR stream. */
123 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
/* These point at the global arrays; nothing is copied. */
125 a->u.initRequest->implementationId = global_parameters.implementationId;
126 a->u.initRequest->implementationName = global_parameters.implementationName;
127 a->u.initRequest->implementationVersion =
128 global_parameters.implementationVersion;
129 ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
130 ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
131 ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
133 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
134 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
135 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
/* After a successful send: wait for input and mark the target Initializing. */
136 if (send_apdu(t, a) >= 0)
138 iochan_setflags(i, EVENT_INPUT);
139 t->state = Initializing;
/*
 * send_search: build a Z39.50 searchRequest from the session's current
 * query and send it to the target attached to channel `i`.
 * The query is sent as Type-1, parsed from s->query by p_query_rpn().
 * The database list is taken from t->databases, which ends at the first
 * empty string. The result set is named "Default".
 * NOTE(review): local declarations, brace lines and the failure branch
 * are not visible in this listing; no check of p_query_rpn()'s result is
 * visible either -- confirm a malformed query is handled.
 */
149 static void send_search(IOCHAN i)
151 struct target *t = iochan_getdata(i);
152 struct session *s = t->session;
153 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
158 yaz_log(YLOG_DEBUG, "Sending search");
/* All request pieces are allocated from the output ODR stream. */
159 a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
160 zquery->which = Z_Query_type_1;
161 zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
/* First pass: count the database names (loop body is on a line not shown). */
163 for (ndb = 0; *t->databases[ndb]; ndb++)
165 databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
/* Second pass: point each slot at the target's own name string. */
166 for (ndb = 0; *t->databases[ndb]; ndb++)
167 databaselist[ndb] = t->databases[ndb];
169 a->u.searchRequest->resultSetName = "Default";
170 a->u.searchRequest->databaseNames = databaselist;
171 a->u.searchRequest->num_databaseNames = ndb;
/*
 * After a successful send: wait for input, mark the target Searching and
 * remember which session request this search belongs to.
 */
173 if (send_apdu(t, a) >= 0)
175 iochan_setflags(i, EVENT_INPUT);
176 t->state = Searching;
177 t->requestid = s->requestid;
185 odr_reset(t->odr_out);
/*
 * send_present: ask the target attached to channel `i` for the next
 * batch of records from the "Default" result set.
 * The batch starts at t->records + 1 and asks for at most
 * global_parameters.chunk records, capped at the number of hits not yet
 * fetched.
 * NOTE(review): the declaration of `toget`, brace lines and the failure
 * branch are not visible in this listing.
 */
188 static void send_present(IOCHAN i)
190 struct target *t = iochan_getdata(i);
191 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
193 int start = t->records + 1;
195 toget = global_parameters.chunk;
196 if (toget > t->hits - t->records)
197 toget = t->hits - t->records;
199 yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
/*
 * The request stores the addresses of the locals `start` and `toget`;
 * the APDU is passed to send_apdu() below, before this function returns.
 */
201 a->u.presentRequest->resultSetStartPoint = &start;
202 a->u.presentRequest->numberOfRecordsRequested = &toget;
204 a->u.presentRequest->resultSetId = "Default";
/* After a successful send: wait for input and mark the target Presenting. */
206 if (send_apdu(t, a) >= 0)
208 iochan_setflags(i, EVENT_INPUT);
209 t->state = Presenting;
217 odr_reset(t->odr_out);
/*
 * do_initResponse: handle an initResponse APDU for the target attached
 * to channel `i`.
 * NOTE(review): only the setup and the debug log are visible in this
 * listing; how the response `r` changes the target's state is on lines
 * not shown.
 */
220 static void do_initResponse(IOCHAN i, Z_APDU *a)
222 struct target *t = iochan_getdata(i);
223 Z_InitResponse *r = a->u.initResponse;
225 yaz_log(YLOG_DEBUG, "Received init response");
/*
 * do_searchResponse: handle a searchResponse APDU for the target
 * attached to channel `i`.
 * A true searchStatus stores the result count in t->hits. When the
 * response's records are a non-surrogate diagnostic, its condition code
 * is stored in t->diagnostic.
 * NOTE(review): brace lines and the state transitions are not visible in
 * this listing, so it is unclear which branch the diagnostic check
 * belongs to.
 */
239 static void do_searchResponse(IOCHAN i, Z_APDU *a)
241 struct target *t = iochan_getdata(i);
242 Z_SearchResponse *r = a->u.searchResponse;
244 yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
246 if (*r->searchStatus)
248 t->hits = *r->resultCount;
/* NOTE(review): no NULL check of r->records is visible before it is dereferenced. */
256 Z_Records *recs = r->records;
257 if (recs->which == Z_Records_NSD)
259 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
260 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
/*
 * find_field: look through the newline-separated text `rec` for a line
 * whose first three characters equal `field` and whose fourth character
 * is a space.
 * NOTE(review): the enclosing loop, its termination test and the return
 * statements are not visible in this listing.
 */
267 const char *find_field(const char *rec, const char *field)
269 const char *line = rec;
/* Only the first 3 characters of `field` take part in the comparison. */
273 if (!strncmp(line, field, 3) && line[3] == ' ')
/* Advance `line` to just past the next newline (loop body not shown). */
275 while (*(line++) != '\n')
/*
 * find_subfield: scan the line starting at `field` for a tab that is
 * immediately followed by the character `subfield`.
 * The outer scan stops at a newline or at the end of the string.
 * NOTE(review): loop bodies and return statements are not visible in
 * this listing.
 */
281 const char *find_subfield(const char *field, char subfield)
283 const char *p = field;
285 while (*p && *p != '\n')
/*
 * NOTE(review): this inner scan has no test for the terminating NUL;
 * confirm the input always contains a tab or newline ahead of it.
 */
287 while (*p != '\n' && *p != '\t')
/* A tab introduces a subfield; the character after it is the subfield code. */
289 if (*p == '\t' && *(++p) == subfield) {
/*
 * extract_mergekey: build a merge key for the line-format record `rec`
 * from field 245 subfields a and b and field 100 subfield a.
 * The pieces are collected in s->wrbuf (rewound first), joined with the
 * literal " field ", then copied into memory from s->nmem. The copy loop
 * lower-cases characters and skips over characters that are not
 * alphanumeric.
 * NOTE(review): declarations of e/ef, early returns, the loop around the
 * copy and the final return are not visible in this listing.
 * NOTE(review): tolower()/isalnum() receive plain `char` values here;
 * bytes above 0x7f need a cast to unsigned char to be well defined.
 */
297 // Extract 245 $a $b 100 $a
298 char *extract_mergekey(struct session *s, const char *rec)
300 const char *field, *subfield;
302 char *out, *p, *pout;
304 wrbuf_rewind(s->wrbuf);
306 if (!(field = find_field(rec, "245")))
308 if (!(subfield = find_subfield(field, 'a')))
/* The subfield value ends at the next newline, or at an earlier tab if there is one. */
310 ef = index(subfield, '\n');
311 if ((e = index(subfield, '\t')) && e < ef)
315 wrbuf_write(s->wrbuf, subfield, ef - subfield);
/* 245 subfield b is optional. */
316 if ((subfield = find_subfield(field, 'b')))
318 ef = index(subfield, '\n');
319 if ((e = index(subfield, '\t')) && e < ef)
323 wrbuf_puts(s->wrbuf, " field ");
324 wrbuf_write(s->wrbuf, subfield, ef - subfield);
/* Field 100 subfield a is optional as well. */
328 if ((field = find_field(rec, "100")))
330 if ((subfield = find_subfield(field, 'a')))
332 ef = index(subfield, '\n');
333 if ((e = index(subfield, '\t')) && e < ef)
337 wrbuf_puts(s->wrbuf, " field ");
338 wrbuf_write(s->wrbuf, subfield, ef - subfield);
/* Terminate the collected text so it can be read as a C string. */
342 wrbuf_putc(s->wrbuf, '\0');
343 p = wrbuf_buf(s->wrbuf);
/* The normalized key is never longer than the collected text. */
344 out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
349 *(pout++) = tolower(*(p++));
350 while (*p && !isalnum(*p))
/*
 * push_record: add record `r` to the session's record heap, which lives
 * in the array s->recheap and is ordered by strcmp() on merge_key.
 * s->recheap_max is the index of the last occupied slot.
 * NOTE(review): the loop header, the last step of the swap and the
 * update of `p` are not visible in this listing.
 */
360 static void push_record(struct session *s, struct record *r)
/* The heap must have room for one more entry. */
363 assert(s->recheap_max + 1 < s->recheap_size);
/* Append at the end, then move the entry up while it sorts before its parent. */
365 s->recheap[p = ++s->recheap_max] = r;
368 int parent = (p - 1) >> 1;
369 if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
372 tmp = s->recheap[parent];
373 s->recheap[parent] = s->recheap[p];
/*
 * top_record: return the record at the root of the session's heap
 * without removing it, or 0 when the heap is empty (recheap_max < 0).
 */
382 static struct record *top_record(struct session *s)
384 return s-> recheap_max >= 0 ? s->recheap[0] : 0;
/*
 * pop_record: remove the root record from the session's heap.
 * The last entry is moved to position `p` and then moved down: at each
 * step the child with the smaller merge_key is chosen and swapped with
 * `p` if it sorts before it.
 * NOTE(review): the declarations of `p` and `min`, the empty-heap return,
 * the last step of the swap, the loop exit and the final return are not
 * visible in this listing.
 */
387 static struct record *pop_record(struct session *s)
/* Read before the emptiness test below; only meaningful when recheap_max >= 0. */
389 struct record *res = s->recheap[0];
/* Computed from recheap_max before it is decremented below. */
391 int lastnonleaf = (s->recheap_max - 1) >> 1;
393 if (s->recheap_max < 0)
396 s->recheap[p] = s->recheap[s->recheap_max--];
398 while (p <= lastnonleaf)
400 int right = (p + 1) << 1;
401 int left = right - 1;
/*
 * NOTE(review): recheap_max is the index of the last live entry, so
 * `right < recheap_max` leaves out a right child stored exactly at
 * recheap_max -- confirm whether `<=` was intended, and that `left`
 * is within bounds.
 */
404 if (right < s->recheap_max &&
405 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
407 if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
409 struct record *tmp = s->recheap[min];
410 s->recheap[min] = s->recheap[p];
/*
 * pop_mrecord: like pop_record(), but also merges every following record
 * with the same merge_key into the popped one.
 * Each matching record, together with its own next_cluster chain, is
 * linked in at the front of this->next_cluster.
 * NOTE(review): the declarations of `this` and `next`, the exit on a key
 * mismatch, the removal of `next` from the heap and the final return are
 * not visible in this listing.
 */
420 // Like pop_record but collapses identical (merge_key) records
421 // The heap will contain multiple independent matching records and possibly
422 // one cluster, created the last time the list was scanned
423 static struct record *pop_mrecord(struct session *s)
428 if (!(this = pop_record(s)))
431 // Collapse identical records
432 while ((next = top_record(s)))
434 struct record *p, *tmpnext;
/* A non-zero strcmp() result means the keys differ. */
435 if (strcmp(this->merge_key, next->merge_key))
437 // Absorb record (and clustersiblings) into a supercluster
438 for (p = next; p; p = tmpnext) {
/* Save the successor first; p->next_cluster is overwritten on the next line. */
439 tmpnext = p->next_cluster;
440 p->next_cluster = this->next_cluster;
441 this->next_cluster = p;
/*
 * read_recheap: take the next (merged) record from the heap and park it
 * in the unused top end of s->recheap so rewind_recheap() can put it
 * back later.
 * s->recheap_scratch is the index of the lowest parked slot, or negative
 * when nothing is parked.
 * NOTE(review): the guard around the parking code (presumably a test that
 * `r` is non-null) and the return are not visible in this listing.
 */
449 // Reads records in sort order. Store records in top of heapspace until rewind is called.
450 static struct record *read_recheap(struct session *s)
452 struct record *r = pop_mrecord(s);
/* First parked record: start the scratch area at the end of the array. */
456 if (s->recheap_scratch < 0)
457 s->recheap_scratch = s->recheap_size;
458 s->recheap[--s->recheap_scratch] = r;
/*
 * rewind_recheap: push every record parked by read_recheap() back onto
 * the heap and mark the scratch area empty (recheap_scratch = -1).
 * NOTE(review): closing braces are not visible in this listing.
 */
464 // Return records to heap after read
465 static void rewind_recheap(struct session *s)
467 while (s->recheap_scratch >= 0) {
/* Walk upward from the lowest parked slot to the end of the array. */
468 push_record(s, s->recheap[s->recheap_scratch++]);
469 if (s->recheap_scratch >= s->recheap_size)
470 s->recheap_scratch = -1;
/*
 * ingest_record: decode one MARC record (`buf`, `len` bytes) into
 * line-format text and wrap it in a struct record allocated from the
 * session's nmem.
 * The new record gets a merge key from extract_mergekey(), a copy of the
 * text, an empty cluster chain and target_offset -1.
 * NOTE(review): local declarations, the failure return and the final
 * return are not visible in this listing.
 */
474 struct record *ingest_record(struct target *t, char *buf, int len)
476 struct session *s = t->session;
480 wrbuf_rewind(s->wrbuf);
481 yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
482 if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
484 yaz_log(YLOG_WARN, "Failed to decode MARC record");
487 wrbuf_putc(s->wrbuf, '\0');
488 recbuf = wrbuf_buf(s->wrbuf);
490 res = nmem_malloc(s->nmem, sizeof(struct record));
/*
 * NOTE(review): extract_mergekey() rewinds and writes to s->wrbuf, the
 * same buffer `recbuf` points into; confirm the record text and
 * wrbuf_len() are still what the copy below expects.
 */
492 res->merge_key = extract_mergekey(s, recbuf);
495 res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
497 res->next_cluster = 0;
498 res->target_offset = -1;
500 yaz_log(YLOG_DEBUG, "Key: %s", res->merge_key);
/*
 * ingest_records: walk the record list of a present response and pass
 * each octet-aligned database record to ingest_record().
 * Entries that are not database records, or whose external is not
 * octet-aligned, are logged with a warning.
 * NOTE(review): local declarations, the early return, the skip
 * statements and what is done with `rec` are not visible in this
 * listing.
 */
507 void ingest_records(struct target *t, Z_Records *r)
509 //struct session *s = t->session;
511 Z_NamePlusRecordList *rlist;
/* Only the "database or surrogate diagnostics" form carries a record list. */
514 if (r->which != Z_Records_DBOSD)
516 rlist = r->u.databaseOrSurDiagnostics;
517 for (i = 0; i < rlist->num_records; i++)
519 Z_NamePlusRecord *npr = rlist->records[i];
524 if (npr->which != Z_NamePlusRecord_databaseRecord)
526 yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
529 e = npr->u.databaseRecord;
530 if (e->which != Z_External_octet)
532 yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
535 buf = (char*) e->u.octet_aligned->buf;
536 len = e->u.octet_aligned->len;
538 rec = ingest_record(t, buf, len);
541 yaz_log(YLOG_DEBUG, "Ingested a fooking record");
/*
 * do_presentResponse: handle a presentResponse APDU for the target
 * attached to channel `i`.
 * A non-surrogate diagnostic stores its condition code in t->diagnostic.
 * A zero presentStatus with the target not in the Error state adds the
 * returned count to t->records and ingests the records; a non-zero
 * presentStatus is logged as a bad response.
 * NOTE(review): brace lines and the state transitions are not visible in
 * this listing.
 */
545 static void do_presentResponse(IOCHAN i, Z_APDU *a)
547 struct target *t = iochan_getdata(i);
548 Z_PresentResponse *r = a->u.presentResponse;
551 Z_Records *recs = r->records;
552 if (recs->which == Z_Records_NSD)
554 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
555 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
560 yaz_log(YLOG_DEBUG, "Got Records!");
564 if (!*r->presentStatus && t->state != Error)
566 yaz_log(YLOG_DEBUG, "Good Present response");
567 t->records += *r->numberOfRecordsReturned;
568 ingest_records(t, r->records);
571 else if (*r->presentStatus)
573 yaz_log(YLOG_WARN, "Bad Present response");
/*
 * handler: I/O channel callback for one Z39.50 target; `event` carries
 * the EVENT_* bits that fired.
 * It drives the target through its states: start a non-blocking connect,
 * confirm the connect result with SO_ERROR, read and decode incoming
 * APDUs and dispatch them to the do_*Response() functions.
 * NOTE(review): brace lines, error branches and the bodies of the
 * Connected and Idle branches are not visible in this listing; those
 * branches presumably issue the init, search and present requests --
 * TODO confirm.
 */
578 static void handler(IOCHAN i, int event)
580 struct target *t = iochan_getdata(i);
581 struct session *s = t->session;
582 //static int waiting = 0;
584 if (t->state == No_connection) /* Start connection */
586 int res = cs_connect(t->link, t->addr);
588 t->state = Connecting;
589 if (!res) /* we are go */
590 iochan_setevent(i, EVENT_OUTPUT);
592 iochan_setflags(i, EVENT_OUTPUT);
595 yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
/* Socket became writable while connecting: ask the socket for the connect result. */
602 else if (t->state == Connecting && event & EVENT_OUTPUT)
605 socklen_t errlen = sizeof(errcode);
607 if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
608 &errlen) < 0 || errcode != 0)
617 yaz_log(YLOG_DEBUG, "Connect OK");
618 t->state = Connected;
622 else if (event & EVENT_INPUT)
/* cs_get() fills t->ibuf and may grow it, updating t->ibufsize. */
624 int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
/*
 * Decode only when the target is working on the session's current
 * request, or is still initializing.
 */
642 if (t->requestid == s->requestid || t->state == Initializing)
646 odr_reset(t->odr_in);
647 odr_setbuf(t->odr_in, t->ibuf, len, 0);
648 if (!z_APDU(t->odr_in, &a, 0, 0))
655 yaz_log(YLOG_DEBUG, "Successfully decoded %d oct PDU", len);
658 case Z_APDU_initResponse:
659 do_initResponse(i, a);
661 case Z_APDU_searchResponse:
662 do_searchResponse(i, a);
664 case Z_APDU_presentResponse:
665 do_presentResponse(i, a);
668 yaz_log(YLOG_WARN, "Unexpected result from server");
674 // if (cs_more(t->link))
675 // iochan_setevent(i, EVENT_INPUT);
677 else // we throw away response and go to idle mode
680 /* if len==1 we do nothing but wait for more input */
683 else if (t->state == Connected) {
687 if (t->state == Idle)
/* The target's last request is older than the session's current one. */
689 if (t->requestid != s->requestid) {
/* More records are available and the per-target fetch limit is not reached. */
692 else if (t->hits > 0 && t->records < global_parameters.toget &&
693 t->records < t->hits) {
/*
 * load_targets: read target definitions from the text file `fn` and
 * append one struct target per definition to the session's target list.
 * Each target also gets an I/O channel that is linked into channel_list
 * and served by handler().
 * A definition line starts with "target "; the text before the first '/'
 * becomes the host:port and the text after it the single database name,
 * which defaults to "Default".
 * NOTE(review): local declarations, how `url` is derived from `line`, the
 * early returns, the closing of `f` and the return value are not visible
 * in this listing.
 * NOTE(review): every strcpy() below copies file-supplied text into a
 * fixed-size member with no visible length check.
 */
699 int load_targets(struct session *s, const char *fn)
701 FILE *f = fopen(fn, "r");
703 struct target **target_p;
707 yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
/* target_p always points at the link slot where the next target is stored. */
711 target_p = &s->targets;
712 while (fgets(line, 255, f))
715 struct target *target;
718 if (strncmp(line, "target ", 7))
/* Drops the last character, presumably the newline kept by fgets(). */
721 url[strlen(url) - 1] = '\0';
/* NOTE(review): LOG_DEBUG here, YLOG_DEBUG everywhere else in this file. */
722 yaz_log(LOG_DEBUG, "Target: %s", url);
724 *target_p = target = xmalloc(sizeof(**target_p));
726 target_p = &target->next;
727 target->state = No_connection;
729 target->ibufsize = 0;
730 target->odr_in = odr_createmem(ODR_DECODE);
731 target->odr_out = odr_createmem(ODR_ENCODE);
735 target->requestid = -1;
737 target->diagnostic = 0;
738 strcpy(target->fullname, url);
739 if ((p = strchr(url, '/')))
742 strcpy(target->hostport, url);
745 strcpy(target->databases[0], p);
/* An empty string ends the database list. */
746 target->databases[1][0] = '\0';
750 strcpy(target->hostport, url);
751 strcpy(target->databases[0], "Default");
752 target->databases[1][0] = '\0';
755 if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
757 yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
760 if (!(target->addr = cs_straddr(target->link, target->hostport)))
762 printf("ERROR %s bad-address", target->hostport);
763 target->state = Failed;
/* Register the target's socket with the event loop. */
766 new = iochan_create(cs_fileno(target->link), handler, 0);
767 iochan_setdata(new, target);
768 iochan_setevent(new, EVENT_EXCEPT);
769 new->next = channel_list;
/*
 * search: start a new search for `query` in session `s`.
 * The query text is copied into s->query, idle targets are flagged for
 * output, and the record heap is sized to hold
 * live_channels * global_parameters.toget record pointers.
 * NOTE(review): local declarations, the counting of live_channels, the
 * per-target resets and any reset of s->recheap_max are not visible in
 * this listing.
 * NOTE(review): the strcpy() into s->query has no visible length check.
 */
777 void search(struct session *s, char *query)
780 int live_channels = 0;
782 yaz_log(YLOG_DEBUG, "Search");
784 // Determine what iochans belong to this session
785 // It might have been better to have a list of them
787 strcpy(s->query, query);
790 for (c = channel_list; c; c = c->next)
794 if (iochan_getfun(c) != handler) // Not a Z target
796 t = iochan_getdata(c);
803 if (t->state == Error)
/* An idle target is flagged for output. */
806 if (t->state == Idle)
807 iochan_setflag(c, EVENT_OUTPUT);
814 int maxrecs = live_channels * global_parameters.toget;
/* Allocate the heap on first use; grow it when more room is needed. */
815 if (!s->recheap_size)
817 s->recheap = xmalloc(maxrecs * sizeof(struct record *));
818 s->recheap_size = maxrecs;
820 else if (s->recheap_size < maxrecs)
822 s->recheap = xrealloc(s->recheap, maxrecs * sizeof(struct record*));
823 s->recheap_size = maxrecs;
/* Nothing is parked in the scratch area. */
827 s->recheap_scratch = -1;
/*
 * new_session: allocate a session and initialize it.
 * The session starts with no targets, an empty query, request id -1, an
 * empty record heap, and its own PQF parser, nmem, MARC decoder and
 * wrbuf.
 * NOTE(review): the initialization of recheap_max / recheap_scratch and
 * the return are not visible in this listing.
 */
830 struct session *new_session()
832 struct session *session = xmalloc(sizeof(*session));
834 yaz_log(YLOG_DEBUG, "New pazpar2 session");
836 session->requestid = -1;
837 session->targets = 0;
838 session->pqf_parser = yaz_pqf_create();
839 session->query[0] = '\0';
840 session->nmem = nmem_create();
841 session->yaz_marc = yaz_marc_create();
/* A tab marks subfields in decoded records; find_subfield() scans for that character. */
842 yaz_marc_subfield_str(session->yaz_marc, "\t");
843 session->wrbuf = wrbuf_alloc();
844 session->recheap = 0;
845 session->recheap_size = 0;
/*
 * hitsbytarget: fill a table with one entry per Z39.50 target channel:
 * host:port, hit count, records fetched, diagnostic code and a printable
 * state name. *count is used as the running entry index.
 * The table is a static array, so its contents are overwritten by the
 * next call.
 * NOTE(review): the initialization and increment of *count, any filter
 * on session `s`, any check against the 1000-entry limit and the return
 * are not visible in this listing.
 */
850 struct hitsbytarget *hitsbytarget(struct session *s, int *count)
852 static struct hitsbytarget res[1000]; // FIXME MM
856 for (c = channel_list; c; c = c->next)
/* Only channels served by handler() are Z39.50 targets. */
857 if (iochan_getfun(c) == handler)
859 struct target *t = iochan_getdata(c);
862 strcpy(res[*count].id, t->hostport);
863 res[*count].hits = t->hits;
864 res[*count].records = t->records;
865 res[*count].diagnostic = t->diagnostic;
866 res[*count].state = state_strings[(int) t->state];
/*
 * show: read up to *num records from the session's heap in sort order
 * into an array allocated from s->nmem.
 * `start` is not used by any line visible here; see the FIXME below.
 * NOTE(review): handling of a null result from read_recheap(), the
 * update of *num, the rewind of the heap and the return are not visible
 * in this listing.
 */
874 struct record **show(struct session *s, int start, int *num)
876 struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
879 // FIXME -- skip initial records
881 for (i = 0; i < *num; i++)
883 recs[i] = read_recheap(s);
/*
 * statistics: zero *stat, then count the Z39.50 targets on channel_list
 * by state into the num_* members.
 * num_connections is set to the number of channels walked; `i` advances
 * for every channel, including ones that are not Z39.50 targets.
 * NOTE(review): local declarations, the switch header and any filter on
 * session `s` are not visible in this listing; no case for the Connected
 * state is visible either.
 */
894 void statistics(struct session *s, struct statistics *stat)
899 bzero(stat, sizeof(*stat));
900 for (i = 0, c = channel_list; c; i++, c = c->next)
/* Channels not served by handler() are not Z39.50 targets. */
903 if (iochan_getfun(c) != handler)
905 t = iochan_getdata(c);
908 case No_connection: stat->num_no_connection++; break;
909 case Connecting: stat->num_connecting++; break;
910 case Initializing: stat->num_initializing++; break;
911 case Searching: stat->num_searching++; break;
912 case Presenting: stat->num_presenting++; break;
913 case Idle: stat->num_idle++; break;
914 case Failed: stat->num_failed++; break;
915 case Error: stat->num_error++; break;
920 stat->num_connections = i;
/*
 * main: ignore SIGPIPE, set up YAZ logging, parse the command line,
 * start the command interface on the given port and run the event loop
 * over channel_list.
 * NOTE(review): local declarations, the option switch and the return are
 * not visible in this listing.
 */
923 int main(int argc, char **argv)
/*
 * NOTE(review): signal() reports failure by returning SIG_ERR; comparing
 * its result with `< 0` is not the documented check.
 */
928 if (signal(SIGPIPE, SIG_IGN) < 0)
929 yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
931 yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
/* The option string "c:" accepts -c with an argument. */
933 while ((ret = options("c:", argv, argc, &arg)) != -2)
939 command_init(atoi(arg));
/* NOTE(review): the usage text says -d, but the option string above is "c:". */
942 fprintf(stderr, "Usage: pazpar2 -d comport");
948 event_loop(&channel_list);
956 * indent-tabs-mode: nil
958 * vim: shiftwidth=4 tabstop=8 expandtab