-/* $Id: pazpar2.c,v 1.1 2006-11-14 20:44:38 quinn Exp $ */
-
-
-#define PAZPAR2_VERSION "0.1"
-#define MAX_DATABASES 512
+/* $Id: pazpar2.c,v 1.3 2006-11-21 18:46:43 quinn Exp $ */
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <signal.h>
+#include <ctype.h>
+#include <assert.h>
#include <yaz/comstack.h>
#include <yaz/tcpip.h>
#include "pazpar2.h"
#include "eventl.h"
#include "command.h"
+#include "http.h"
-char *myname;
-static long int runId=-1;
+#define PAZPAR2_VERSION "0.1"
+#define MAX_DATABASES 512
+#define MAX_CHUNK 10
struct target
{
PAZPAR2_VERSION,
{0,0},
100,
- 10
+ MAX_CHUNK
};
struct target *t = iochan_getdata(i);
Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
int toget;
+ int start = t->records + 1;
toget = global_parameters.chunk;
if (toget > t->hits - t->records)
yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
+ a->u.presentRequest->resultSetStartPoint = &start;
a->u.presentRequest->numberOfRecordsRequested = &toget;
a->u.presentRequest->resultSetId = "Default";
}
}
-#if 0
-static char *search_geterror(Z_SearchRequest *r)
-{
-#endif
-
-
static void do_searchResponse(IOCHAN i, Z_APDU *a)
{
struct target *t = iochan_getdata(i);
}
}
-// FIXME Catch present errors!!!!!!!
+const char *find_field(const char *rec, const char *field)
+{
+ const char *line = rec;
+
+ while (*line)
+ {
+ if (!strncmp(line, field, 3) && line[3] == ' ')
+ return line;
+ while (*(line++) != '\n')
+ ;
+ }
+ return 0;
+}
+
+const char *find_subfield(const char *field, char subfield)
+{
+ const char *p = field;
+
+ while (*p && *p != '\n')
+ {
+ while (*p != '\n' && *p != '\t')
+ p++;
+ if (*p == '\t' && *(++p) == subfield) {
+ if (*(++p) == ' ')
+ return ++p;
+ }
+ }
+ return 0;
+}
+
+// Extract 245 $a $b 100 $a
+char *extract_mergekey(struct session *s, const char *rec)
+{
+ const char *field, *subfield;
+ char *e, *ef;
+ char *out, *p, *pout;
+
+ wrbuf_rewind(s->wrbuf);
+
+ if (!(field = find_field(rec, "245")))
+ return 0;
+ if (!(subfield = find_subfield(field, 'a')))
+ return 0;
+ ef = index(subfield, '\n');
+ if ((e = index(subfield, '\t')) && e < ef)
+ ef = e;
+ if (ef)
+ {
+ wrbuf_write(s->wrbuf, subfield, ef - subfield);
+ if ((subfield = find_subfield(field, 'b')))
+ {
+ ef = index(subfield, '\n');
+ if ((e = index(subfield, '\t')) && e < ef)
+ ef = e;
+ if (ef)
+ {
+ wrbuf_puts(s->wrbuf, " field ");
+ wrbuf_write(s->wrbuf, subfield, ef - subfield);
+ }
+ }
+ }
+ if ((field = find_field(rec, "100")))
+ {
+ if ((subfield = find_subfield(field, 'a')))
+ {
+ ef = index(subfield, '\n');
+ if ((e = index(subfield, '\t')) && e < ef)
+ ef = e;
+ if (ef)
+ {
+ wrbuf_puts(s->wrbuf, " field ");
+ wrbuf_write(s->wrbuf, subfield, ef - subfield);
+ }
+ }
+ }
+ wrbuf_putc(s->wrbuf, '\0');
+ p = wrbuf_buf(s->wrbuf);
+ out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
+
+ while (*p)
+ {
+ while (isalnum(*p))
+ *(pout++) = tolower(*(p++));
+ while (*p && !isalnum(*p))
+ p++;
+ *(pout++) = ' ';
+ }
+ if (out != pout)
+ *(--pout) = '\0';
+
+ return out;
+}
+
+static void push_record(struct session *s, struct record *r)
+{
+ int p;
+ assert(s->recheap_max + 1 < s->recheap_size);
+
+ s->recheap[p = ++s->recheap_max] = r;
+ while (p > 0)
+ {
+ int parent = (p - 1) >> 1;
+ if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
+ {
+ struct record *tmp;
+ tmp = s->recheap[parent];
+ s->recheap[parent] = s->recheap[p];
+ s->recheap[p] = tmp;
+ p = parent;
+ }
+ else
+ break;
+ }
+}
+
+static struct record *top_record(struct session *s)
+{
+ return s-> recheap_max >= 0 ? s->recheap[0] : 0;
+}
+
+static struct record *pop_record(struct session *s)
+{
+ struct record *res = s->recheap[0];
+ int p = 0;
+ int lastnonleaf = (s->recheap_max - 1) >> 1;
+
+ if (s->recheap_max < 0)
+ return 0;
+
+ s->recheap[p] = s->recheap[s->recheap_max--];
+
+ while (p <= lastnonleaf)
+ {
+ int right = (p + 1) << 1;
+ int left = right - 1;
+ int min = left;
+
+ if (right < s->recheap_max &&
+ strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
+ min = right;
+ if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
+ {
+ struct record *tmp = s->recheap[min];
+ s->recheap[min] = s->recheap[p];
+ s->recheap[p] = tmp;
+ p = min;
+ }
+ else
+ break;
+ }
+ return res;
+}
+
+// Like pop_record but collapses identical (merge_key) records
+// The heap will contain multiple independent matching records and possibly
+// one cluster, created the last time the list was scanned
+static struct record *pop_mrecord(struct session *s)
+{
+ struct record *this;
+ struct record *next;
+
+ if (!(this = pop_record(s)))
+ return 0;
+
+ // Collapse identical records
+ while ((next = top_record(s)))
+ {
+ struct record *p, *tmpnext;
+ if (strcmp(this->merge_key, next->merge_key))
+ break;
+ // Absorb record (and clustersiblings) into a supercluster
+ for (p = next; p; p = tmpnext) {
+ tmpnext = p->next_cluster;
+ p->next_cluster = this->next_cluster;
+ this->next_cluster = p;
+ }
+
+ pop_record(s);
+ }
+ return this;
+}
+
+// Reads records in sort order. Store records in top of heapspace until rewind is called.
+static struct record *read_recheap(struct session *s)
+{
+ struct record *r = pop_mrecord(s);
+
+ if (r)
+ {
+ if (s->recheap_scratch < 0)
+ s->recheap_scratch = s->recheap_size;
+ s->recheap[--s->recheap_scratch] = r;
+ }
+
+ return r;
+}
+
+// Return records to heap after read
+static void rewind_recheap(struct session *s)
+{
+ while (s->recheap_scratch >= 0) {
+ push_record(s, s->recheap[s->recheap_scratch++]);
+ if (s->recheap_scratch >= s->recheap_size)
+ s->recheap_scratch = -1;
+ }
+}
+
+struct record *ingest_record(struct target *t, char *buf, int len)
+{
+ struct session *s = t->session;
+ struct record *res;
+ const char *recbuf;
+
+ wrbuf_rewind(s->wrbuf);
+ yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
+ if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
+ {
+ yaz_log(YLOG_WARN, "Failed to decode MARC record");
+ return 0;
+ }
+ wrbuf_putc(s->wrbuf, '\0');
+ recbuf = wrbuf_buf(s->wrbuf);
+
+ res = nmem_malloc(s->nmem, sizeof(struct record));
+
+ res->merge_key = extract_mergekey(s, recbuf);
+ if (!res->merge_key)
+ return 0;
+ res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
+ res->target = t;
+ res->next_cluster = 0;
+ res->target_offset = -1;
+
+ yaz_log(YLOG_DEBUG, "Key: %s", res->merge_key);
+
+ push_record(s, res);
+
+ return res;
+}
+
+void ingest_records(struct target *t, Z_Records *r)
+{
+ //struct session *s = t->session;
+ struct record *rec;
+ Z_NamePlusRecordList *rlist;
+ int i;
+
+ if (r->which != Z_Records_DBOSD)
+ return;
+ rlist = r->u.databaseOrSurDiagnostics;
+ for (i = 0; i < rlist->num_records; i++)
+ {
+ Z_NamePlusRecord *npr = rlist->records[i];
+ Z_External *e;
+ char *buf;
+ int len;
+
+ if (npr->which != Z_NamePlusRecord_databaseRecord)
+ {
+ yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
+ continue;
+ }
+ e = npr->u.databaseRecord;
+ if (e->which != Z_External_octet)
+ {
+ yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
+ continue;
+ }
+ buf = (char*) e->u.octet_aligned->buf;
+ len = e->u.octet_aligned->len;
+
+ rec = ingest_record(t, buf, len);
+ if (!rec)
+ continue;
+ yaz_log(YLOG_DEBUG, "Ingested a fooking record");
+ }
+}
+
static void do_presentResponse(IOCHAN i, Z_APDU *a)
{
struct target *t = iochan_getdata(i);
{
yaz_log(YLOG_DEBUG, "Good Present response");
t->records += *r->numberOfRecordsReturned;
+ ingest_records(t, r->records);
t->state = Idle;
}
else if (*r->presentStatus)
t->state = Failed;
return;
}
+ yaz_log(YLOG_DEBUG, "Successfully decoded %d oct PDU", len);
switch (a->which)
{
case Z_APDU_initResponse:
void search(struct session *s, char *query)
{
IOCHAN c;
+ int live_channels = 0;
yaz_log(YLOG_DEBUG, "Search");
if (t->state == Idle)
iochan_setflag(c, EVENT_OUTPUT);
+
+ live_channels++;
+ }
+ }
+ if (live_channels)
+ {
+ int maxrecs = live_channels * global_parameters.toget;
+ if (!s->recheap_size)
+ {
+ s->recheap = xmalloc(maxrecs * sizeof(struct record *));
+ s->recheap_size = maxrecs;
+ }
+ else if (s->recheap_size < maxrecs)
+ {
+ s->recheap = xrealloc(s->recheap, maxrecs * sizeof(struct record*));
+ s->recheap_size = maxrecs;
}
}
+ s->recheap_max = -1;
+ s->recheap_scratch = -1;
}
struct session *new_session()
session->pqf_parser = yaz_pqf_create();
session->query[0] = '\0';
session->nmem = nmem_create();
+ session->yaz_marc = yaz_marc_create();
+ yaz_marc_subfield_str(session->yaz_marc, "\t");
+ session->wrbuf = wrbuf_alloc();
+ session->recheap = 0;
+ session->recheap_size = 0;
return session;
}
+void session_destroy(struct session *s)
+{
+ // FIXME do some shit here!!!!
+}
+
struct hitsbytarget *hitsbytarget(struct session *s, int *count)
{
static struct hitsbytarget res[1000]; // FIXME MM
return res;
}
+struct record **show(struct session *s, int start, int *num)
+{
+ struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
+ int i;
+
+ // FIXME -- skip initial records
+
+ for (i = 0; i < *num; i++)
+ {
+ recs[i] = read_recheap(s);
+ if (!recs[i])
+ {
+ *num = i;
+ break;
+ }
+ }
+ rewind_recheap(s);
+ return recs;
+}
void statistics(struct session *s, struct statistics *stat)
{
if (signal(SIGPIPE, SIG_IGN) < 0)
yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
- myname = argv[0];
yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
- while ((ret = options("c:", argv, argc, &arg)) != -2)
+ while ((ret = options("c:h:", argv, argc, &arg)) != -2)
{
switch (ret) {
case 0:
case 'c':
command_init(atoi(arg));
break;
+ case 'h':
+ http_init(atoi(arg));
+ break;
default:
fprintf(stderr, "Usage: pazpar2 -d comport");
exit(1);