1 /* This file is part of the Zebra server.
2 Copyright (C) 1994-2011 Index Data
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 5000
52 /* max page size for _any_ isamb use */
53 #define ISAMB_MAX_PAGE 32768
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
62 /* CAT_MAX: _must_ be power of 2 */
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
71 #define ISAMB_FAC_SIZE 4
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
76 struct ISAMB_cache_entry {
81 struct ISAMB_cache_entry *next;
87 struct ISAMB_head head;
88 struct ISAMB_cache_entry *cache_entries;
95 struct ISAMB_file *file;
97 int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98 int log_io; /* log level for bf_read/bf_write calls */
99 int log_freelist; /* log level for freelist handling */
100 zint skipped_numbers; /* on a leaf node */
101 zint returned_numbers;
102 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104 zint number_of_int_splits;
105 zint number_of_leaf_splits;
106 int enable_int_count; /* whether we count nodes (or not) */
107 int cache_size; /* size of blocks to cache (if cache=1) */
120 zint no_items; /* number of nodes in this + children */
124 void *decodeClientData;
132 int maxlevel; /* total depth */
135 zint skipped_numbers; /* on a leaf node */
136 zint returned_numbers;
137 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139 struct ISAMB_block **block;
140 int scope; /* on what level we forward */
144 #define encode_item_len encode_ptr
146 static void encode_ptr(char **dst, zint pos)
148 unsigned char *bp = (unsigned char*) *dst;
152 *bp++ = (unsigned char) (128 | (pos & 127));
155 *bp++ = (unsigned char) pos;
159 static void encode_ptr(char **dst, zint pos)
161 memcpy(*dst, &pos, sizeof(pos));
162 (*dst) += sizeof(pos);
166 #define decode_item_len decode_ptr
168 static void decode_ptr(const char **src, zint *pos)
174 while (((c = *(const unsigned char *)((*src)++)) & 128))
176 d += ((zint) (c & 127) << r);
179 d += ((zint) c << r);
183 static void decode_ptr(const char **src, zint *pos)
185 memcpy(pos, *src, sizeof(*pos));
186 (*src) += sizeof(*pos);
191 void isamb_set_int_count(ISAMB b, int v)
193 b->enable_int_count = v;
196 void isamb_set_cache_size(ISAMB b, int v)
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202 int cache, int no_cat, int *sizes, int use_root_ptr)
204 ISAMB isamb = xmalloc(sizeof(*isamb));
207 assert(no_cat <= CAT_MAX);
210 isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211 memcpy(isamb->method, method, sizeof(*method));
212 isamb->no_cat = no_cat;
214 isamb->log_freelist = 0;
215 isamb->cache = cache;
216 isamb->skipped_numbers = 0;
217 isamb->returned_numbers = 0;
218 isamb->number_of_int_splits = 0;
219 isamb->number_of_leaf_splits = 0;
220 isamb->enable_int_count = 1;
221 isamb->cache_size = 40;
224 isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
226 isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
230 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231 isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
235 yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
239 assert(cache == 0 || cache == 1);
241 isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
243 for (i = 0; i < isamb->no_cat; i++)
245 isamb->file[i].bf = 0;
246 isamb->file[i].head_dirty = 0;
247 isamb->file[i].cache_entries = 0;
250 for (i = 0; i < isamb->no_cat; i++)
252 char fname[DST_BUF_SIZE];
253 char hbuf[DST_BUF_SIZE];
255 sprintf(fname, "%s%c", name, i+'A');
257 isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
260 isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
262 if (!isamb->file[i].bf)
268 /* fill-in default values (for empty isamb) */
269 isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270 isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271 isamb->file[i].head.block_size = sizes[i];
272 assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
274 if (i == isamb->no_cat-1 || sizes[i] > 128)
275 isamb->file[i].head.block_offset = 8;
277 isamb->file[i].head.block_offset = 4;
279 isamb->file[i].head.block_offset = 11;
281 isamb->file[i].head.block_max =
282 sizes[i] - isamb->file[i].head.block_offset;
283 isamb->file[i].head.free_list = 0;
284 if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
286 /* got header assume "isamb"major minor len can fit in 16 bytes */
288 int major, minor, len, pos = 0;
291 if (memcmp(hbuf, "isamb", 5))
293 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
297 if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
299 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
303 if (major != ISAMB_MAJOR_VERSION)
305 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306 fname, major, ISAMB_MAJOR_VERSION);
310 for (left = len - sizes[i]; left > 0; left = left - sizes[i])
313 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
315 yaz_log(YLOG_WARN, "truncated isamb header for "
316 "file=%s len=%d pos=%d",
323 decode_ptr(&src, &isamb->file[i].head.first_block);
324 decode_ptr(&src, &isamb->file[i].head.last_block);
325 decode_ptr(&src, &zint_tmp);
326 isamb->file[i].head.block_size = (int) zint_tmp;
327 decode_ptr(&src, &zint_tmp);
328 isamb->file[i].head.block_max = (int) zint_tmp;
329 decode_ptr(&src, &isamb->file[i].head.free_list);
330 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331 decode_ptr(&src, &isamb->root_ptr);
333 assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334 /* must rewrite the header if root ptr is in use (bug #1017) */
335 if (use_root_ptr && writeflag)
336 isamb->file[i].head_dirty = 1;
338 isamb->file[i].head_dirty = 0;
339 assert(isamb->file[i].head.block_size == sizes[i]);
342 yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
351 int i, b_size = ISAMB_MIN_SIZE;
353 for (i = 0; i<CAT_NO; i++)
356 b_size = b_size * ISAMB_FAC_SIZE;
358 return isamb_open2(bfs, name, writeflag, method, cache,
362 static void flush_blocks(ISAMB b, int cat)
364 while (b->file[cat].cache_entries)
366 struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367 b->file[cat].cache_entries = ce_this->next;
371 yaz_log(b->log_io, "bf_write: flush_blocks");
372 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
379 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
381 int cat = (int) (pos&CAT_MASK);
382 int off = (int) (((pos/CAT_MAX) &
383 (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384 * b->file[cat].head.block_size);
385 zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
387 struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
392 assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393 for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
396 if ((*ce)->pos == norm)
399 *ce = (*ce)->next; /* remove from list */
401 ce_this->next = b->file[cat].cache_entries; /* move to front */
402 b->file[cat].cache_entries = ce_this;
406 memcpy(ce_this->buf + off, userbuf,
407 b->file[cat].head.block_size);
411 memcpy(userbuf, ce_this->buf + off,
412 b->file[cat].head.block_size);
416 if (no >= b->cache_size)
418 assert(ce_last && *ce_last);
420 *ce_last = 0; /* remove the last entry from list */
423 yaz_log(b->log_io, "bf_write: cache_block");
424 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
429 ce_this = xmalloc(sizeof(*ce_this));
430 ce_this->next = b->file[cat].cache_entries;
431 b->file[cat].cache_entries = ce_this;
432 ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
434 yaz_log(b->log_io, "bf_read: cache_block");
435 if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436 memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
439 memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
445 memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
451 void isamb_close(ISAMB isamb)
454 for (i = 0; isamb->accessed_nodes[i]; i++)
455 yaz_log(YLOG_DEBUG, "isamb_close level leaf-%d: "ZINT_FORMAT" read, "
456 ZINT_FORMAT" skipped",
457 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458 yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459 "skipped "ZINT_FORMAT,
460 isamb->skipped_numbers, isamb->returned_numbers);
462 for (i = 0; i<isamb->no_cat; i++)
464 flush_blocks(isamb, i);
465 if (isamb->file[i].head_dirty)
467 char hbuf[DST_BUF_SIZE];
468 int major = ISAMB_MAJOR_VERSION;
470 char *dst = hbuf + 16;
472 int b_size = isamb->file[i].head.block_size;
474 encode_ptr(&dst, isamb->file[i].head.first_block);
475 encode_ptr(&dst, isamb->file[i].head.last_block);
476 encode_ptr(&dst, isamb->file[i].head.block_size);
477 encode_ptr(&dst, isamb->file[i].head.block_max);
478 encode_ptr(&dst, isamb->file[i].head.free_list);
480 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481 encode_ptr(&dst, isamb->root_ptr);
483 memset(dst, '\0', b_size); /* ensure no random bytes are written */
487 /* print exactly 16 bytes (including trailing 0) */
488 sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489 isamb->minor_version, len);
491 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
493 for (left = len - b_size; left > 0; left = left - b_size)
496 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
499 if (isamb->file[i].bf)
500 bf_close (isamb->file[i].bf);
503 xfree(isamb->method);
507 /* open_block: read one block at pos.
508 Decode leading sys bytes .. consisting of
510 0: leader byte, != 0 leaf, == 0, non-leaf
511 1-2: used size of block
512 3-7*: number of items and all children
514 * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515 of items. We can thus have at most 2^40 nodes.
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
519 int cat = (int) (pos&CAT_MASK);
521 int offset = b->file[cat].head.block_offset;
522 struct ISAMB_block *p;
525 p = xmalloc(sizeof(*p));
527 p->cat = (int) (pos & CAT_MASK);
528 p->buf = xmalloc(b->file[cat].head.block_size);
531 if (!cache_block (b, pos, p->buf, 0))
533 yaz_log(b->log_io, "bf_read: open_block");
534 if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
536 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537 (long) pos, (long) pos/CAT_MAX);
538 zebra_exit("isamb:open_block");
541 p->bytes = (char *)p->buf + offset;
543 p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
546 yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
549 assert(p->size >= 0);
550 src = (char*) p->buf + 3;
551 decode_ptr(&src, &p->no_items);
556 p->decodeClientData = (*b->method->codec.start)();
560 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
562 struct ISAMB_block *p;
564 p = xmalloc(sizeof(*p));
565 p->buf = xmalloc(b->file[cat].head.block_size);
567 if (!b->file[cat].head.free_list)
570 block_no = b->file[cat].head.last_block++;
571 p->pos = block_no * CAT_MAX + cat;
573 yaz_log(b->log_freelist, "got block "
574 ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
575 cat, p->pos/CAT_MAX);
579 p->pos = b->file[cat].head.free_list;
580 assert((p->pos & CAT_MASK) == cat);
581 if (!cache_block(b, p->pos, p->buf, 0))
583 yaz_log(b->log_io, "bf_read: new_block");
584 if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
586 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
587 (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
588 zebra_exit("isamb:new_block");
592 yaz_log(b->log_freelist, "got block "
593 ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
594 cat, p->pos/CAT_MAX);
595 memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
598 b->file[cat].head_dirty = 1;
599 memset(p->buf, 0, b->file[cat].head.block_size);
600 p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
607 p->decodeClientData = (*b->method->codec.start)();
611 struct ISAMB_block *new_leaf(ISAMB b, int cat)
613 return new_block(b, 1, cat);
617 struct ISAMB_block *new_int(ISAMB b, int cat)
619 return new_block(b, 0, cat);
622 static void check_block(ISAMB b, struct ISAMB_block *p)
624 assert(b); /* mostly to make the compiler shut up about unused b */
632 char *startp = p->bytes;
633 const char *src = startp;
634 char *endp = p->bytes + p->size;
636 void *c1 = (*b->method->codec.start)();
638 decode_ptr(&src, &pos);
639 assert((pos&CAT_MASK) == p->cat);
643 char file_item_buf[DST_ITEM_MAX];
644 char *file_item = file_item_buf;
645 (*b->method->codec.reset)(c1);
646 (*b->method->codec.decode)(c1, &file_item, &src);
649 decode_item_len(&src, &item_len);
650 assert(item_len > 0 && item_len < 128);
653 decode_ptr(&src, &pos);
654 if ((pos&CAT_MASK) != p->cat)
656 assert((pos&CAT_MASK) == p->cat);
659 (*b->method->codec.stop)(c1);
663 void close_block(ISAMB b, struct ISAMB_block *p)
669 yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
670 p->pos, p->cat, p->pos/CAT_MAX);
671 memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
672 b->file[p->cat].head.free_list = p->pos;
673 b->file[p->cat].head_dirty = 1;
674 if (!cache_block(b, p->pos, p->buf, 1))
676 yaz_log(b->log_io, "bf_write: close_block (deleted)");
677 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
682 int offset = b->file[p->cat].head.block_offset;
683 int size = p->size + offset;
684 char *dst = (char*)p->buf + 3;
685 assert(p->size >= 0);
687 /* memset becuase encode_ptr usually does not write all bytes */
688 memset(p->buf, 0, b->file[p->cat].head.block_offset);
690 p->buf[1] = size & 255;
691 p->buf[2] = size >> 8;
692 encode_ptr(&dst, p->no_items);
694 if (!cache_block(b, p->pos, p->buf, 1))
696 yaz_log(b->log_io, "bf_write: close_block");
697 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
700 (*b->method->codec.stop)(p->decodeClientData);
705 int insert_sub(ISAMB b, struct ISAMB_block **p,
706 void *new_item, int *mode,
708 struct ISAMB_block **sp,
709 void *sub_item, int *sub_size,
710 const void *max_item);
712 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
714 ISAMC_I *stream, struct ISAMB_block **sp,
715 void *split_item, int *split_size, const void *last_max_item)
717 char *startp = p->bytes;
718 const char *src = startp;
719 char *endp = p->bytes + p->size;
721 struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
722 char sub_item[DST_ITEM_MAX];
726 void *c1 = (*b->method->codec.start)();
730 assert(p->size >= 0);
731 decode_ptr(&src, &pos);
735 const char *src0 = src;
737 char file_item_buf[DST_ITEM_MAX];
738 char *file_item = file_item_buf;
739 (*b->method->codec.reset)(c1);
740 (*b->method->codec.decode)(c1, &file_item, &src);
741 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
744 sub_p1 = open_block(b, pos);
746 diff_terms -= sub_p1->no_items;
747 more = insert_sub(b, &sub_p1, lookahead_item, mode,
749 sub_item, &sub_size, file_item_buf);
750 diff_terms += sub_p1->no_items;
756 decode_item_len(&src, &item_len);
757 d = (*b->method->compare_item)(src, lookahead_item);
760 sub_p1 = open_block(b, pos);
762 diff_terms -= sub_p1->no_items;
763 more = insert_sub(b, &sub_p1, lookahead_item, mode,
765 sub_item, &sub_size, src);
766 diff_terms += sub_p1->no_items;
772 decode_ptr(&src, &pos);
776 /* we reached the end. So lookahead > last item */
777 sub_p1 = open_block(b, pos);
779 diff_terms -= sub_p1->no_items;
780 more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
781 sub_item, &sub_size, last_max_item);
782 diff_terms += sub_p1->no_items;
785 diff_terms += sub_p2->no_items;
789 p->no_items += diff_terms;
793 /* there was a split - must insert pointer in this one */
794 char dst_buf[DST_BUF_SIZE];
797 const char *sub_item_ptr = sub_item;
799 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
801 memcpy(dst, startp, src - startp);
806 (*b->method->codec.reset)(c1);
807 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
809 encode_item_len(&dst, sub_size); /* sub length and item */
810 memcpy(dst, sub_item, sub_size);
814 encode_ptr(&dst, sub_p2->pos); /* pos */
816 if (endp - src) /* remaining data */
818 memcpy(dst, src, endp - src);
821 p->size = dst - dst_buf;
822 assert(p->size >= 0);
824 if (p->size <= b->file[p->cat].head.block_max)
826 /* it fits OK in this block */
827 memcpy(startp, dst_buf, dst - dst_buf);
829 close_block(b, sub_p2);
833 /* must split _this_ block as well .. */
834 struct ISAMB_block *sub_p3;
836 char file_item_buf[DST_ITEM_MAX];
837 char *file_item = file_item_buf;
841 zint no_items_first_half = 0;
847 b->number_of_int_splits++;
850 close_block(b, sub_p2);
852 half = src + b->file[p->cat].head.block_size/2;
853 decode_ptr(&src, &pos);
855 if (b->enable_int_count)
857 /* read sub block so we can get no_items for it */
858 sub_p3 = open_block(b, pos);
859 no_items_first_half += sub_p3->no_items;
860 close_block(b, sub_p3);
866 file_item = file_item_buf;
867 (*b->method->codec.reset)(c1);
868 (*b->method->codec.decode)(c1, &file_item, &src);
870 decode_item_len(&src, &split_size_tmp);
871 *split_size = (int) split_size_tmp;
874 decode_ptr(&src, &pos);
876 if (b->enable_int_count)
878 /* read sub block so we can get no_items for it */
879 sub_p3 = open_block(b, pos);
880 no_items_first_half += sub_p3->no_items;
881 close_block(b, sub_p3);
884 /* p is first half */
885 p_new_size = src - dst_buf;
886 memcpy(p->bytes, dst_buf, p_new_size);
889 file_item = file_item_buf;
890 (*b->method->codec.reset)(c1);
891 (*b->method->codec.decode)(c1, &file_item, &src);
892 *split_size = file_item - file_item_buf;
893 memcpy(split_item, file_item_buf, *split_size);
895 decode_item_len(&src, &split_size_tmp);
896 *split_size = (int) split_size_tmp;
897 memcpy(split_item, src, *split_size);
900 /* *sp is second half */
901 *sp = new_int(b, p->cat);
902 (*sp)->size = endp - src;
903 memcpy((*sp)->bytes, src, (*sp)->size);
905 p->size = p_new_size;
907 /* adjust no_items in first&second half */
908 (*sp)->no_items = p->no_items - no_items_first_half;
909 p->no_items = no_items_first_half;
913 close_block(b, sub_p1);
914 (*b->method->codec.stop)(c1);
918 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
919 int *lookahead_mode, ISAMC_I *stream,
920 struct ISAMB_block **sp2,
921 void *sub_item, int *sub_size,
922 const void *max_item)
924 struct ISAMB_block *p = *sp1;
927 char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
929 void *c1 = (*b->method->codec.start)();
930 void *c2 = (*b->method->codec.start)();
932 int quater = b->file[b->no_cat-1].head.block_max / 4;
933 char *mid_cut = dst_buf + quater * 2;
934 char *tail_cut = dst_buf + quater * 3;
935 char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
938 char cut_item_buf[DST_ITEM_MAX];
939 int cut_item_size = 0;
940 int no_items = 0; /* number of items (total) */
941 int no_items_1 = 0; /* number of items (first half) */
942 int inserted_dst_bytes = 0;
946 char file_item_buf[DST_ITEM_MAX];
947 char *file_item = file_item_buf;
950 endp = p->bytes + p->size;
951 (*b->method->codec.decode)(c1, &file_item, &src);
954 const char *dst_item = 0; /* resulting item to be inserted */
955 char *lookahead_next;
960 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
962 /* d now holds comparison between existing file item and
965 d > 0: lookahead before file
966 d < 0: lookahead after file
970 /* lookahead must be inserted */
971 dst_item = lookahead_item;
972 /* if this is not an insertion, it's really bad .. */
973 if (!*lookahead_mode)
975 yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
976 assert(*lookahead_mode);
979 else if (d == 0 && *lookahead_mode == 2)
981 /* For mode == 2, we insert the new key anyway - even
982 though the comparison is 0. */
983 dst_item = lookahead_item;
987 dst_item = file_item_buf;
989 if (d == 0 && !*lookahead_mode)
991 /* it's a deletion and they match so there is nothing
992 to be inserted anyway .. But mark the thing dirty
993 (file item was part of input.. The item will not be
997 else if (!half1 && dst > mid_cut)
999 /* we have reached the splitting point for the first time */
1000 const char *dst_item_0 = dst_item;
1001 half1 = dst; /* candidate for splitting */
1003 /* encode the resulting item */
1004 (*b->method->codec.encode)(c2, &dst, &dst_item);
1006 cut_item_size = dst_item - dst_item_0;
1007 assert(cut_item_size > 0);
1008 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1011 no_items_1 = no_items;
1016 /* encode the resulting item */
1017 (*b->method->codec.encode)(c2, &dst, &dst_item);
1021 /* now move "pointers" .. result has been encoded .. */
1024 /* we must move the lookahead pointer */
1026 inserted_dst_bytes += (dst - dst_0);
1027 if (inserted_dst_bytes >= quater)
1028 /* no more room. Mark lookahead as "gone".. */
1032 /* move it really.. */
1033 lookahead_next = lookahead_item;
1034 if (!(*stream->read_item)(stream->clientData,
1038 /* end of stream reached: no "more" and no lookahead */
1042 if (lookahead_item && max_item &&
1043 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1045 /* the lookahead goes beyond what we allow in this
1046 leaf. Mark it as "gone" */
1055 /* exact match .. move both pointers */
1057 lookahead_next = lookahead_item;
1058 if (!(*stream->read_item)(stream->clientData,
1059 &lookahead_next, lookahead_mode))
1065 break; /* end of file stream reached .. */
1066 file_item = file_item_buf; /* move file pointer */
1067 (*b->method->codec.decode)(c1, &file_item, &src);
1071 /* file pointer must be moved */
1074 file_item = file_item_buf;
1075 (*b->method->codec.decode)(c1, &file_item, &src);
1080 /* this loop runs when we are "appending" to a leaf page. That is
1081 either it's empty (new) or all file items have been read in
1084 maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1085 while (lookahead_item)
1088 const char *src = lookahead_item;
1091 /* if we have a lookahead item, we stop if we exceed the value of it */
1093 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1095 /* stop if we have reached the value of max item */
1098 if (!*lookahead_mode)
1100 /* this is append. So a delete is bad */
1101 yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1102 assert(*lookahead_mode);
1104 else if (!half1 && dst > tail_cut)
1106 const char *src_0 = src;
1107 half1 = dst; /* candidate for splitting */
1109 (*b->method->codec.encode)(c2, &dst, &src);
1111 cut_item_size = src - src_0;
1112 assert(cut_item_size > 0);
1113 memcpy(cut_item_buf, src_0, cut_item_size);
1115 no_items_1 = no_items;
1119 (*b->method->codec.encode)(c2, &dst, &src);
1129 dst_item = lookahead_item;
1130 if (!(*stream->read_item)(stream->clientData, &dst_item,
1137 new_size = dst - dst_buf;
1138 if (p && p->cat != b->no_cat-1 &&
1139 new_size > b->file[p->cat].head.block_max)
1141 /* non-btree block will be removed */
1144 /* delete it too!! */
1145 p = 0; /* make a new one anyway */
1148 { /* must create a new one */
1150 for (i = 0; i < b->no_cat; i++)
1151 if (new_size <= b->file[i].head.block_max)
1157 if (new_size > b->file[p->cat].head.block_max)
1160 const char *cut_item = cut_item_buf;
1165 assert(cut_item_size > 0);
1168 p->size = half1 - dst_buf;
1169 assert(p->size <= b->file[p->cat].head.block_max);
1170 memcpy(p->bytes, dst_buf, half1 - dst_buf);
1171 p->no_items = no_items_1;
1174 *sp2 = new_leaf(b, p->cat);
1176 (*b->method->codec.reset)(c2);
1178 b->number_of_leaf_splits++;
1180 first_dst = (*sp2)->bytes;
1182 (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1184 memcpy(first_dst, half2, dst - half2);
1186 (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1187 assert((*sp2)->size <= b->file[p->cat].head.block_max);
1188 (*sp2)->no_items = no_items - no_items_1;
1191 memcpy(sub_item, cut_item_buf, cut_item_size);
1192 *sub_size = cut_item_size;
1196 memcpy(p->bytes, dst_buf, dst - dst_buf);
1198 p->no_items = no_items;
1200 (*b->method->codec.stop)(c1);
1201 (*b->method->codec.stop)(c2);
1206 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1209 struct ISAMB_block **sp,
1210 void *sub_item, int *sub_size,
1211 const void *max_item)
1213 if (!*p || (*p)->leaf)
1214 return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
1215 sub_size, max_item);
1217 return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1218 sub_size, max_item);
1221 int isamb_unlink(ISAMB b, ISAM_P pos)
1223 struct ISAMB_block *p1;
1227 p1 = open_block(b, pos);
1232 const char *src = p1->bytes + p1->offset;
1234 void *c1 = (*b->method->codec.start)();
1236 decode_ptr(&src, &sub_p);
1237 isamb_unlink(b, sub_p);
1239 while (src != p1->bytes + p1->size)
1242 char file_item_buf[DST_ITEM_MAX];
1243 char *file_item = file_item_buf;
1244 (*b->method->codec.reset)(c1);
1245 (*b->method->codec.decode)(c1, &file_item, &src);
1248 decode_item_len(&src, &item_len);
1251 decode_ptr(&src, &sub_p);
1252 isamb_unlink(b, sub_p);
1255 (*b->method->codec.stop)(c1);
1262 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1264 char item_buf[DST_ITEM_MAX];
1268 int must_delete = 0;
1275 item_ptr = item_buf;
1277 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1282 item_ptr = item_buf;
1283 more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1286 struct ISAMB_block *p = 0, *sp = 0;
1287 char sub_item[DST_ITEM_MAX];
1291 p = open_block(b, *pos);
1292 more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1293 sub_item, &sub_size, 0);
1295 { /* increase level of tree by one */
1296 struct ISAMB_block *p2 = new_int(b, p->cat);
1297 char *dst = p2->bytes + p2->size;
1299 void *c1 = (*b->method->codec.start)();
1300 const char *sub_item_ptr = sub_item;
1303 encode_ptr(&dst, p->pos);
1304 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1306 (*b->method->codec.reset)(c1);
1307 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1309 encode_item_len(&dst, sub_size);
1310 memcpy(dst, sub_item, sub_size);
1313 encode_ptr(&dst, sp->pos);
1315 p2->size = dst - p2->bytes;
1316 p2->no_items = p->no_items + sp->no_items;
1317 *pos = p2->pos; /* return new super page */
1321 (*b->method->codec.stop)(c1);
1326 *pos = p->pos; /* return current one (again) */
1328 if (p->no_items == 0)
1336 isamb_unlink(b, *pos);
1341 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1343 ISAMB_PP pp = xmalloc(sizeof(*pp));
1349 pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1356 pp->skipped_numbers = 0;
1357 pp->returned_numbers = 0;
1359 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1360 pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1363 struct ISAMB_block *p = open_block(isamb, pos);
1364 const char *src = p->bytes + p->offset;
1365 pp->block[pp->level] = p;
1367 pp->total_size += p->size;
1371 decode_ptr(&src, &pos);
1372 p->offset = src - p->bytes;
1374 pp->accessed_nodes[pp->level]++;
1376 pp->block[pp->level+1] = 0;
1377 pp->maxlevel = pp->level;
1383 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1385 return isamb_pp_open_x(isamb, pos, 0, scope);
1388 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1393 yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
1394 "skipped "ZINT_FORMAT,
1395 pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1396 for (i = pp->maxlevel; i>=0; i--)
1397 if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1398 yaz_log(YLOG_DEBUG, "isamb_pp_close level leaf-%d: "
1399 ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1400 pp->accessed_nodes[i], pp->skipped_nodes[i]);
1401 pp->isamb->skipped_numbers += pp->skipped_numbers;
1402 pp->isamb->returned_numbers += pp->returned_numbers;
1403 for (i = pp->maxlevel; i>=0; i--)
1405 pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1406 pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1409 *size = pp->total_size;
1411 *blocks = pp->no_blocks;
1412 for (i = 0; i <= pp->level; i++)
1413 close_block(pp->isamb, pp->block[i]);
1418 int isamb_block_info(ISAMB isamb, int cat)
1420 if (cat >= 0 && cat < isamb->no_cat)
1421 return isamb->file[cat].head.block_size;
1425 void isamb_pp_close(ISAMB_PP pp)
1427 isamb_pp_close_x(pp, 0, 0);
1430 /* simple recursive dumper .. */
1431 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1435 char prefix_str[1024];
1438 struct ISAMB_block *p = open_block(b, pos);
1439 sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1440 ZINT_FORMAT, level*2, "",
1441 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1444 sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1447 while (p->offset < p->size)
1449 const char *src = p->bytes + p->offset;
1451 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1452 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1453 p->offset = src - (char*) p->bytes;
1455 assert(p->offset == p->size);
1459 const char *src = p->bytes + p->offset;
1462 decode_ptr(&src, &sub);
1463 p->offset = src - (char*) p->bytes;
1465 isamb_dump_r(b, sub, pr, level+1);
1467 while (p->offset < p->size)
1470 char file_item_buf[DST_ITEM_MAX];
1471 char *file_item = file_item_buf;
1472 void *c1 = (*b->method->codec.start)();
1473 (*b->method->codec.decode)(c1, &file_item, &src);
1474 (*b->method->codec.stop)(c1);
1475 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1478 decode_item_len(&src, &item_len);
1479 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1482 decode_ptr(&src, &sub);
1484 p->offset = src - (char*) p->bytes;
1486 isamb_dump_r(b, sub, pr, level+1);
1493 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1495 isamb_dump_r(b, pos, pr, 0);
1498 int isamb_pp_read(ISAMB_PP pp, void *buf)
1500 return isamb_pp_forward(pp, buf, 0);
1504 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1505 { /* return an estimate of the current position and of the total number of */
1506 /* occureences in the isam tree, based on the current leaf */
1510 /* if end-of-stream PP may not be leaf */
1512 *total = (double) (pp->block[0]->no_items);
1513 *current = (double) pp->returned_numbers;
1515 yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1516 ZINT_FORMAT, *current, *total, pp->returned_numbers);
1520 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1524 struct ISAMB_block *p = pp->block[pp->level];
1525 ISAMB b = pp->isamb;
1529 while (p->offset == p->size)
1535 char file_item_buf[DST_ITEM_MAX];
1536 char *file_item = file_item_buf;
1540 while (p->offset == p->size)
1544 close_block(pp->isamb, pp->block[pp->level]);
1545 pp->block[pp->level] = 0;
1547 p = pp->block[pp->level];
1552 src = p->bytes + p->offset;
1555 c1 = (*b->method->codec.start)();
1556 (*b->method->codec.decode)(c1, &file_item, &src);
1558 decode_ptr(&src, &item_len);
1561 decode_ptr(&src, &pos);
1562 p->offset = src - (char*) p->bytes;
1564 src = p->bytes + p->offset;
1568 if (!untilb || p->offset == p->size)
1570 assert(p->offset < p->size);
1573 file_item = file_item_buf;
1574 (*b->method->codec.reset)(c1);
1575 (*b->method->codec.decode)(c1, &file_item, &src);
1576 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1582 decode_item_len(&src, &item_len);
1583 if ((*b->method->compare_item)(untilb, src) < pp->scope)
1587 decode_ptr(&src, &pos);
1588 p->offset = src - (char*) p->bytes;
1595 pp->block[pp->level] = p = open_block(pp->isamb, pos);
1597 pp->total_size += p->size;
1605 src = p->bytes + p->offset;
1608 decode_ptr(&src, &pos);
1609 p->offset = src - (char*) p->bytes;
1611 if (!untilb || p->offset == p->size)
1613 assert(p->offset < p->size);
1616 file_item = file_item_buf;
1617 (*b->method->codec.reset)(c1);
1618 (*b->method->codec.decode)(c1, &file_item, &src);
1619 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1625 decode_ptr(&src, &item_len);
1626 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1634 (*b->method->codec.stop)(c1);
1637 assert(p->offset < p->size);
1642 src = p->bytes + p->offset;
1643 (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1644 p->offset = src - (char*) p->bytes;
1645 if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1648 if (p->offset == p->size) goto again;
1650 pp->returned_numbers++;
1654 zint isamb_get_int_splits(ISAMB b)
1656 return b->number_of_int_splits;
1659 zint isamb_get_leaf_splits(ISAMB b)
1661 return b->number_of_leaf_splits;
1664 zint isamb_get_root_ptr(ISAMB b)
1669 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1671 b->root_ptr = root_ptr;
1678 * c-file-style: "Stroustrup"
1679 * indent-tabs-mode: nil
1681 * vim: shiftwidth=4 tabstop=8 expandtab