Spurious files deleted.
[pazpar2-moved-to-github.git] / pazpar2.c
index b003664..bcedea6 100644 (file)
--- a/pazpar2.c
+++ b/pazpar2.c
@@ -1,8 +1,4 @@
-/* $Id: pazpar2.c,v 1.1 2006-11-14 20:44:38 quinn Exp $ */
-
-
-#define PAZPAR2_VERSION "0.1"
-#define MAX_DATABASES 512
+/* $Id: pazpar2.c,v 1.2 2006-11-18 05:00:38 quinn Exp $ */
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -11,6 +7,8 @@
 #include <unistd.h>
 #include <sys/socket.h>
 #include <signal.h>
+#include <ctype.h>
+#include <assert.h>
 
 #include <yaz/comstack.h>
 #include <yaz/tcpip.h>
@@ -23,8 +21,9 @@
 #include "eventl.h"
 #include "command.h"
 
-char *myname;
-static long int runId=-1;
+#define PAZPAR2_VERSION "0.1"
+#define MAX_DATABASES 512
+#define MAX_CHUNK 10
 
 struct target
 {
@@ -88,7 +87,7 @@ static struct parameters {
     PAZPAR2_VERSION,
     {0,0},
     100,
-    10
+    MAX_CHUNK
 };
 
 
@@ -191,6 +190,7 @@ static void send_present(IOCHAN i)
     struct target *t = iochan_getdata(i);
     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
     int toget;
+    int start = t->records + 1;
 
     toget = global_parameters.chunk;
     if (toget > t->hits - t->records)
@@ -198,6 +198,7 @@ static void send_present(IOCHAN i)
 
     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
 
+    a->u.presentRequest->resultSetStartPoint = &start;
     a->u.presentRequest->numberOfRecordsRequested = &toget;
 
     a->u.presentRequest->resultSetId = "Default";
@@ -235,12 +236,6 @@ static void do_initResponse(IOCHAN i, Z_APDU *a)
     }
 }
 
-#if 0
-static char *search_geterror(Z_SearchRequest *r)
-{
-#endif
-    
-
 static void do_searchResponse(IOCHAN i, Z_APDU *a)
 {
     struct target *t = iochan_getdata(i);
@@ -269,7 +264,284 @@ static void do_searchResponse(IOCHAN i, Z_APDU *a)
     }
 }
 
-// FIXME Catch present errors!!!!!!!
+const char *find_field(const char *rec, const char *field)
+{
+    const char *line = rec;
+
+    while (*line)
+    {
+        if (!strncmp(line, field, 3) && line[3] == ' ')
+            return line;
+        while (*(line++) != '\n')
+            ;
+    }
+    return 0;
+}
+
+const char *find_subfield(const char *field, char subfield)
+{
+    const char *p = field;
+
+    while (*p && *p != '\n')
+    {
+        while (*p != '\n' && *p != '\t')
+            p++;
+        if (*p == '\t' && *(++p) == subfield) {
+            if (*(++p) == ' ')
+                return ++p;
+        }
+    }
+    return 0;
+}
+
+// Extract 245 $a $b 100 $a
+char *extract_mergekey(struct session *s, const char *rec)
+{
+    const char *field, *subfield;
+    char *e, *ef;
+    char *out, *p, *pout;
+
+    wrbuf_rewind(s->wrbuf);
+
+    if (!(field = find_field(rec, "245")))
+        return 0;
+    if (!(subfield = find_subfield(field, 'a')))
+        return 0;
+    ef = index(subfield, '\n');
+    if ((e = index(subfield, '\t')) && e < ef)
+        ef = e;
+    if (ef)
+    {
+        wrbuf_write(s->wrbuf, subfield, ef - subfield);
+        if ((subfield = find_subfield(field, 'b'))) 
+        {
+            ef = index(subfield, '\n');
+            if ((e = index(subfield, '\t')) && e < ef)
+                ef = e;
+            if (ef)
+            {
+                wrbuf_puts(s->wrbuf, " field "); 
+                wrbuf_write(s->wrbuf, subfield, ef - subfield);
+            }
+        }
+    }
+    if ((field = find_field(rec, "100")))
+    {
+        if ((subfield = find_subfield(field, 'a')))
+        {
+            ef = index(subfield, '\n');
+            if ((e = index(subfield, '\t')) && e < ef)
+                ef = e;
+            if (ef)
+            {
+                wrbuf_puts(s->wrbuf, " field "); 
+                wrbuf_write(s->wrbuf, subfield, ef - subfield);
+            }
+        }
+    }
+    wrbuf_putc(s->wrbuf, '\0');
+    p = wrbuf_buf(s->wrbuf);
+    out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
+
+    while (*p)
+    {
+        while (isalnum(*p))
+            *(pout++) = tolower(*(p++));
+        while (*p && !isalnum(*p))
+            p++;
+        *(pout++) = ' ';
+    }
+    if (out != pout)
+        *(--pout) = '\0';
+
+    return out;
+}
+
+static void push_record(struct session *s, struct record *r)
+{
+    int p;
+    assert(s->recheap_max + 1 < s->recheap_size);
+
+    s->recheap[p = ++s->recheap_max] = r;
+    while (p > 0)
+    {
+        int parent = (p - 1) >> 1;
+        if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
+        {
+            struct record *tmp;
+            tmp = s->recheap[parent];
+            s->recheap[parent] = s->recheap[p];
+            s->recheap[p] = tmp;
+            p = parent;
+        }
+        else
+            break;
+    }
+}
+
+static struct record *top_record(struct session *s)
+{
+    return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
+}
+
+static struct record *pop_record(struct session *s)
+{
+    struct record *res = s->recheap[0];
+    int p = 0;
+    int lastnonleaf = (s->recheap_max - 1) >> 1;
+
+    if (s->recheap_max < 0)
+        return 0;
+
+    s->recheap[p] = s->recheap[s->recheap_max--];
+
+    while (p <= lastnonleaf)
+    {
+        int right = (p + 1) << 1;
+        int left = right - 1;
+        int min = left;
+
+        if (right < s->recheap_max &&
+                strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
+            min = right;
+        if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
+        {
+            struct record *tmp = s->recheap[min];
+            s->recheap[min] = s->recheap[p];
+            s->recheap[p] = tmp;
+            p = min;
+        }
+        else
+            break;
+    }
+    return res;
+}
+
+// Like pop_record but collapses identical (merge_key) records
+// The heap will contain multiple independent matching records and possibly
+// one cluster, created the last time the list was scanned
+static struct record *pop_mrecord(struct session *s)
+{
+    struct record *this;
+    struct record *next;
+
+    if (!(this = pop_record(s)))
+        return 0;
+
+    // Collapse identical records
+    while ((next = top_record(s)))
+    {
+        struct record *p, *tmpnext;
+        if (strcmp(this->merge_key, next->merge_key))
+            break;
+        // Absorb record (and clustersiblings) into a supercluster
+        for (p = next; p; p = tmpnext) {
+            tmpnext = p->next_cluster;
+            p->next_cluster = this->next_cluster;
+            this->next_cluster = p;
+        }
+
+        pop_record(s);
+    }
+    return this;
+}
+
+// Reads records in sort order. Store records in top of heapspace until rewind is called.
+static struct record *read_recheap(struct session *s)
+{
+    struct record *r = pop_mrecord(s);
+
+    if (r)
+    {
+        if (s->recheap_scratch < 0)
+            s->recheap_scratch = s->recheap_size;
+        s->recheap[--s->recheap_scratch] = r;
+    }
+
+    return r;
+}
+
+// Return records to heap after read
+static void rewind_recheap(struct session *s)
+{
+    while (s->recheap_scratch >= 0) {
+        push_record(s, s->recheap[s->recheap_scratch++]);
+        if (s->recheap_scratch >= s->recheap_size)
+            s->recheap_scratch = -1;
+    }
+}
+
+struct record *ingest_record(struct target *t, char *buf, int len)
+{
+    struct session *s = t->session;
+    struct record *res;
+    const char *recbuf;
+
+    wrbuf_rewind(s->wrbuf);
+    yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
+    if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
+    {
+        yaz_log(YLOG_WARN, "Failed to decode MARC record");
+        return 0;
+    }
+    wrbuf_putc(s->wrbuf, '\0');
+    recbuf = wrbuf_buf(s->wrbuf);
+
+    res = nmem_malloc(s->nmem, sizeof(struct record));
+
+    res->merge_key = extract_mergekey(s, recbuf);
+    if (!res->merge_key)
+        return 0;
+    res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
+    res->target = t;
+    res->next_cluster = 0;
+    res->target_offset = -1;
+
+    yaz_log(YLOG_DEBUG, "Key: %s", res->merge_key);
+
+    push_record(s, res);
+
+    return res;
+}
+
+void ingest_records(struct target *t, Z_Records *r)
+{
+    //struct session *s = t->session;
+    struct record *rec;
+    Z_NamePlusRecordList *rlist;
+    int i;
+
+    if (r->which != Z_Records_DBOSD)
+        return;
+    rlist = r->u.databaseOrSurDiagnostics;
+    for (i = 0; i < rlist->num_records; i++)
+    {
+        Z_NamePlusRecord *npr = rlist->records[i];
+        Z_External *e;
+        char *buf;
+        int len;
+
+        if (npr->which != Z_NamePlusRecord_databaseRecord)
+        {
+            yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
+            continue;
+        }
+        e = npr->u.databaseRecord;
+        if (e->which != Z_External_octet)
+        {
+            yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
+            continue;
+        }
+        buf = (char*) e->u.octet_aligned->buf;
+        len = e->u.octet_aligned->len;
+
+        rec = ingest_record(t, buf, len);
+        if (!rec)
+            continue;
+        yaz_log(YLOG_DEBUG, "Ingested a fooking record");
+    }
+}
+
 static void do_presentResponse(IOCHAN i, Z_APDU *a)
 {
     struct target *t = iochan_getdata(i);
@@ -293,6 +565,7 @@ static void do_presentResponse(IOCHAN i, Z_APDU *a)
     {
         yaz_log(YLOG_DEBUG, "Good Present response");
         t->records += *r->numberOfRecordsReturned;
+        ingest_records(t, r->records);
         t->state = Idle;
     }
     else if (*r->presentStatus) 
@@ -379,6 +652,7 @@ static void handler(IOCHAN i, int event)
                     t->state = Failed;
                     return;
                 }
+                yaz_log(YLOG_DEBUG, "Successfully decoded %d oct PDU", len);
                 switch (a->which)
                 {
                     case Z_APDU_initResponse:
@@ -503,6 +777,7 @@ int load_targets(struct session *s, const char *fn)
 void search(struct session *s, char *query)
 {
     IOCHAN c;
+    int live_channels = 0;
 
     yaz_log(YLOG_DEBUG, "Search");
 
@@ -530,8 +805,26 @@ void search(struct session *s, char *query)
 
             if (t->state == Idle) 
                 iochan_setflag(c, EVENT_OUTPUT);
+
+            live_channels++;
+        }
+    }
+    if (live_channels)
+    {
+        int maxrecs = live_channels * global_parameters.toget;
+        if (!s->recheap_size)
+        {
+            s->recheap = xmalloc(maxrecs * sizeof(struct record *));
+            s->recheap_size = maxrecs;
+        }
+        else if (s->recheap_size < maxrecs)
+        {
+            s->recheap = xrealloc(s->recheap, maxrecs * sizeof(struct record*));
+            s->recheap_size = maxrecs;
         }
     }
+    s->recheap_max = -1;
+    s->recheap_scratch = -1;
 }
 
 struct session *new_session() 
@@ -545,6 +838,11 @@ struct session *new_session()
     session->pqf_parser = yaz_pqf_create();
     session->query[0] = '\0';
     session->nmem = nmem_create();
+    session->yaz_marc = yaz_marc_create();
+    yaz_marc_subfield_str(session->yaz_marc, "\t");
+    session->wrbuf = wrbuf_alloc();
+    session->recheap = 0;
+    session->recheap_size = 0;
 
     return session;
 }
@@ -573,6 +871,25 @@ struct hitsbytarget *hitsbytarget(struct session *s, int *count)
     return res;
 }
 
+struct record **show(struct session *s, int start, int *num)
+{
+    struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
+    int i;
+
+    // FIXME -- skip initial records
+
+    for (i = 0; i < *num; i++)
+    {
+        recs[i] = read_recheap(s);
+        if (!recs[i])
+        {
+            *num = i;
+            break;
+        }
+    }
+    rewind_recheap(s);
+    return recs;
+}
 
 void statistics(struct session *s, struct statistics *stat)
 {
@@ -611,7 +928,6 @@ int main(int argc, char **argv)
     if (signal(SIGPIPE, SIG_IGN) < 0)
         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
 
-    myname = argv[0];
     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
 
     while ((ret = options("c:", argv, argc, &arg)) != -2)