1 /* This file is part of the Zebra server.
2 Copyright (C) 2004-2013 Index Data
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION_NO_ROOT 0
37 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
49 /* if 1, upper nodes items are encoded; 0 if not encoded */
52 /* maximum size of encoded buffer */
53 #define DST_ITEM_MAX 5000
55 /* max page size for _any_ isamb use */
56 #define ISAMB_MAX_PAGE 32768
58 #define ISAMB_MAX_LEVEL 10
59 /* approx 2*max page + max size of item */
60 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
62 /* should be maximum block size of multiple thereof */
63 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
65 /* CAT_MAX: _must_ be power of 2 */
67 #define CAT_MASK (CAT_MAX-1)
68 /* CAT_NO: <= CAT_MAX */
71 /* Smallest block size */
72 #define ISAMB_MIN_SIZE 32
74 #define ISAMB_FAC_SIZE 4
76 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
77 #define ISAMB_PTR_CODEC 1
79 struct ISAMB_cache_entry {
84 struct ISAMB_cache_entry *next;
90 struct ISAMB_head head;
91 struct ISAMB_cache_entry *cache_entries;
98 struct ISAMB_file *file;
100 int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
101 int log_io; /* log level for bf_read/bf_write calls */
102 int log_freelist; /* log level for freelist handling */
103 zint skipped_numbers; /* on a leaf node */
104 zint returned_numbers;
105 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
106 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
107 zint number_of_int_splits;
108 zint number_of_leaf_splits;
109 int enable_int_count; /* whether we count nodes (or not) */
110 int cache_size; /* size of blocks to cache (if cache=1) */
123 zint no_items; /* number of nodes in this + children */
127 void *decodeClientData;
135 int maxlevel; /* total depth */
138 zint skipped_numbers; /* on a leaf node */
139 zint returned_numbers;
140 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
141 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
142 struct ISAMB_block **block;
143 int scope; /* on what level we forward */
147 #define encode_item_len encode_ptr
149 static void encode_ptr(char **dst, zint pos)
151 unsigned char *bp = (unsigned char*) *dst;
155 *bp++ = (unsigned char) (128 | (pos & 127));
158 *bp++ = (unsigned char) pos;
162 static void encode_ptr(char **dst, zint pos)
164 memcpy(*dst, &pos, sizeof(pos));
165 (*dst) += sizeof(pos);
169 #define decode_item_len decode_ptr
171 static void decode_ptr(const char **src, zint *pos)
177 while (((c = *(const unsigned char *)((*src)++)) & 128))
179 d += ((zint) (c & 127) << r);
182 d += ((zint) c << r);
186 static void decode_ptr(const char **src, zint *pos)
188 memcpy(pos, *src, sizeof(*pos));
189 (*src) += sizeof(*pos);
194 void isamb_set_int_count(ISAMB b, int v)
196 b->enable_int_count = v;
199 void isamb_set_cache_size(ISAMB b, int v)
204 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
205 int cache, int no_cat, int *sizes, int use_root_ptr)
207 ISAMB isamb = xmalloc(sizeof(*isamb));
210 assert(no_cat <= CAT_MAX);
213 isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
214 memcpy(isamb->method, method, sizeof(*method));
215 isamb->no_cat = no_cat;
217 isamb->log_freelist = 0;
218 isamb->cache = cache;
219 isamb->skipped_numbers = 0;
220 isamb->returned_numbers = 0;
221 isamb->number_of_int_splits = 0;
222 isamb->number_of_leaf_splits = 0;
223 isamb->enable_int_count = 1;
224 isamb->cache_size = 40;
227 isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
229 isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
233 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
234 isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
238 yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
242 assert(cache == 0 || cache == 1);
244 isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
246 for (i = 0; i < isamb->no_cat; i++)
248 isamb->file[i].bf = 0;
249 isamb->file[i].head_dirty = 0;
250 isamb->file[i].cache_entries = 0;
253 for (i = 0; i < isamb->no_cat; i++)
255 char fname[DST_BUF_SIZE];
256 char hbuf[DST_BUF_SIZE];
258 sprintf(fname, "%s%c", name, i+'A');
260 isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
263 isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
265 if (!isamb->file[i].bf)
271 /* fill-in default values (for empty isamb) */
272 isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
273 isamb->file[i].head.last_block = isamb->file[i].head.first_block;
274 isamb->file[i].head.block_size = sizes[i];
275 assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
277 if (i == isamb->no_cat-1 || sizes[i] > 128)
278 isamb->file[i].head.block_offset = 8;
280 isamb->file[i].head.block_offset = 4;
282 isamb->file[i].head.block_offset = 11;
284 isamb->file[i].head.block_max =
285 sizes[i] - isamb->file[i].head.block_offset;
286 isamb->file[i].head.free_list = 0;
287 if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
289 /* got header assume "isamb"major minor len can fit in 16 bytes */
291 int major, minor, len, pos = 0;
294 if (memcmp(hbuf, "isamb", 5))
296 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300 if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
302 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
306 if (major != ISAMB_MAJOR_VERSION)
308 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
309 fname, major, ISAMB_MAJOR_VERSION);
313 for (left = len - sizes[i]; left > 0; left = left - sizes[i])
316 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
318 yaz_log(YLOG_WARN, "truncated isamb header for "
319 "file=%s len=%d pos=%d",
326 decode_ptr(&src, &isamb->file[i].head.first_block);
327 decode_ptr(&src, &isamb->file[i].head.last_block);
328 decode_ptr(&src, &zint_tmp);
329 isamb->file[i].head.block_size = (int) zint_tmp;
330 decode_ptr(&src, &zint_tmp);
331 isamb->file[i].head.block_max = (int) zint_tmp;
332 decode_ptr(&src, &isamb->file[i].head.free_list);
333 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
334 decode_ptr(&src, &isamb->root_ptr);
336 assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
337 /* must rewrite the header if root ptr is in use (bug #1017) */
338 if (use_root_ptr && writeflag)
339 isamb->file[i].head_dirty = 1;
341 isamb->file[i].head_dirty = 0;
342 assert(isamb->file[i].head.block_size == sizes[i]);
345 yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
350 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
354 int i, b_size = ISAMB_MIN_SIZE;
356 for (i = 0; i<CAT_NO; i++)
359 b_size = b_size * ISAMB_FAC_SIZE;
361 return isamb_open2(bfs, name, writeflag, method, cache,
365 static void flush_blocks(ISAMB b, int cat)
367 while (b->file[cat].cache_entries)
369 struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
370 b->file[cat].cache_entries = ce_this->next;
374 yaz_log(b->log_io, "bf_write: flush_blocks");
375 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
382 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
384 int cat = (int) (pos&CAT_MASK);
385 int off = (int) (((pos/CAT_MAX) &
386 (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
387 * b->file[cat].head.block_size);
388 zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
390 struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
395 assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
396 for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
399 if ((*ce)->pos == norm)
402 *ce = (*ce)->next; /* remove from list */
404 ce_this->next = b->file[cat].cache_entries; /* move to front */
405 b->file[cat].cache_entries = ce_this;
409 memcpy(ce_this->buf + off, userbuf,
410 b->file[cat].head.block_size);
414 memcpy(userbuf, ce_this->buf + off,
415 b->file[cat].head.block_size);
419 if (no >= b->cache_size)
421 assert(ce_last && *ce_last);
423 *ce_last = 0; /* remove the last entry from list */
426 yaz_log(b->log_io, "bf_write: cache_block");
427 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
432 ce_this = xmalloc(sizeof(*ce_this));
433 ce_this->next = b->file[cat].cache_entries;
434 b->file[cat].cache_entries = ce_this;
435 ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
437 yaz_log(b->log_io, "bf_read: cache_block");
438 if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
439 memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
442 memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
448 memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
454 void isamb_close(ISAMB isamb)
457 for (i = 0; isamb->accessed_nodes[i]; i++)
458 yaz_log(YLOG_DEBUG, "isamb_close level leaf-%d: "ZINT_FORMAT" read, "
459 ZINT_FORMAT" skipped",
460 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
461 yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
462 "skipped "ZINT_FORMAT,
463 isamb->skipped_numbers, isamb->returned_numbers);
465 for (i = 0; i<isamb->no_cat; i++)
467 flush_blocks(isamb, i);
468 if (isamb->file[i].head_dirty)
470 char hbuf[DST_BUF_SIZE];
471 int major = ISAMB_MAJOR_VERSION;
473 char *dst = hbuf + 16;
475 int b_size = isamb->file[i].head.block_size;
477 encode_ptr(&dst, isamb->file[i].head.first_block);
478 encode_ptr(&dst, isamb->file[i].head.last_block);
479 encode_ptr(&dst, isamb->file[i].head.block_size);
480 encode_ptr(&dst, isamb->file[i].head.block_max);
481 encode_ptr(&dst, isamb->file[i].head.free_list);
483 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
484 encode_ptr(&dst, isamb->root_ptr);
486 memset(dst, '\0', b_size); /* ensure no random bytes are written */
490 /* print exactly 16 bytes (including trailing 0) */
491 sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
492 isamb->minor_version, len);
494 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
496 for (left = len - b_size; left > 0; left = left - b_size)
499 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
502 if (isamb->file[i].bf)
503 bf_close (isamb->file[i].bf);
506 xfree(isamb->method);
510 /* open_block: read one block at pos.
511 Decode leading sys bytes .. consisting of
513 0: leader byte, != 0 leaf, == 0, non-leaf
514 1-2: used size of block
515 3-7*: number of items and all children
517 * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
518 of items. We can thus have at most 2^40 nodes.
520 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
522 int cat = (int) (pos&CAT_MASK);
524 int offset = b->file[cat].head.block_offset;
525 struct ISAMB_block *p;
528 p = xmalloc(sizeof(*p));
530 p->cat = (int) (pos & CAT_MASK);
531 p->buf = xmalloc(b->file[cat].head.block_size);
534 if (!cache_block (b, pos, p->buf, 0))
536 yaz_log(b->log_io, "bf_read: open_block");
537 if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
539 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
540 (long) pos, (long) pos/CAT_MAX);
541 zebra_exit("isamb:open_block");
544 p->bytes = (char *)p->buf + offset;
546 p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
549 yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
552 assert(p->size >= 0);
553 src = (char*) p->buf + 3;
554 decode_ptr(&src, &p->no_items);
559 p->decodeClientData = (*b->method->codec.start)();
563 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
565 struct ISAMB_block *p;
567 p = xmalloc(sizeof(*p));
568 p->buf = xmalloc(b->file[cat].head.block_size);
570 if (!b->file[cat].head.free_list)
573 block_no = b->file[cat].head.last_block++;
574 p->pos = block_no * CAT_MAX + cat;
576 yaz_log(b->log_freelist, "got block "
577 ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
578 cat, p->pos/CAT_MAX);
582 p->pos = b->file[cat].head.free_list;
583 assert((p->pos & CAT_MASK) == cat);
584 if (!cache_block(b, p->pos, p->buf, 0))
586 yaz_log(b->log_io, "bf_read: new_block");
587 if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
589 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
590 (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
591 zebra_exit("isamb:new_block");
595 yaz_log(b->log_freelist, "got block "
596 ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
597 cat, p->pos/CAT_MAX);
598 memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
601 b->file[cat].head_dirty = 1;
602 memset(p->buf, 0, b->file[cat].head.block_size);
603 p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
610 p->decodeClientData = (*b->method->codec.start)();
614 struct ISAMB_block *new_leaf(ISAMB b, int cat)
616 return new_block(b, 1, cat);
620 struct ISAMB_block *new_int(ISAMB b, int cat)
622 return new_block(b, 0, cat);
625 static void check_block(ISAMB b, struct ISAMB_block *p)
627 assert(b); /* mostly to make the compiler shut up about unused b */
635 char *startp = p->bytes;
636 const char *src = startp;
637 char *endp = p->bytes + p->size;
639 void *c1 = (*b->method->codec.start)();
641 decode_ptr(&src, &pos);
642 assert((pos&CAT_MASK) == p->cat);
646 char file_item_buf[DST_ITEM_MAX];
647 char *file_item = file_item_buf;
648 (*b->method->codec.reset)(c1);
649 (*b->method->codec.decode)(c1, &file_item, &src);
652 decode_item_len(&src, &item_len);
653 assert(item_len > 0 && item_len < 128);
656 decode_ptr(&src, &pos);
657 if ((pos&CAT_MASK) != p->cat)
659 assert((pos&CAT_MASK) == p->cat);
662 (*b->method->codec.stop)(c1);
666 void close_block(ISAMB b, struct ISAMB_block *p)
672 yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
673 p->pos, p->cat, p->pos/CAT_MAX);
674 memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
675 b->file[p->cat].head.free_list = p->pos;
676 b->file[p->cat].head_dirty = 1;
677 if (!cache_block(b, p->pos, p->buf, 1))
679 yaz_log(b->log_io, "bf_write: close_block (deleted)");
680 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
685 int offset = b->file[p->cat].head.block_offset;
686 int size = p->size + offset;
687 char *dst = (char*)p->buf + 3;
688 assert(p->size >= 0);
690 /* memset becuase encode_ptr usually does not write all bytes */
691 memset(p->buf, 0, b->file[p->cat].head.block_offset);
693 p->buf[1] = size & 255;
694 p->buf[2] = size >> 8;
695 encode_ptr(&dst, p->no_items);
697 if (!cache_block(b, p->pos, p->buf, 1))
699 yaz_log(b->log_io, "bf_write: close_block");
700 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
703 (*b->method->codec.stop)(p->decodeClientData);
708 int insert_sub(ISAMB b, struct ISAMB_block **p,
709 void *new_item, int *mode,
711 struct ISAMB_block **sp,
712 void *sub_item, int *sub_size,
713 const void *max_item);
715 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
717 ISAMC_I *stream, struct ISAMB_block **sp,
718 void *split_item, int *split_size, const void *last_max_item)
720 char *startp = p->bytes;
721 const char *src = startp;
722 char *endp = p->bytes + p->size;
724 struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
725 char sub_item[DST_ITEM_MAX];
729 void *c1 = (*b->method->codec.start)();
733 assert(p->size >= 0);
734 decode_ptr(&src, &pos);
738 const char *src0 = src;
740 char file_item_buf[DST_ITEM_MAX];
741 char *file_item = file_item_buf;
742 (*b->method->codec.reset)(c1);
743 (*b->method->codec.decode)(c1, &file_item, &src);
744 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
747 sub_p1 = open_block(b, pos);
749 diff_terms -= sub_p1->no_items;
750 more = insert_sub(b, &sub_p1, lookahead_item, mode,
752 sub_item, &sub_size, file_item_buf);
753 diff_terms += sub_p1->no_items;
759 decode_item_len(&src, &item_len);
760 d = (*b->method->compare_item)(src, lookahead_item);
763 sub_p1 = open_block(b, pos);
765 diff_terms -= sub_p1->no_items;
766 more = insert_sub(b, &sub_p1, lookahead_item, mode,
768 sub_item, &sub_size, src);
769 diff_terms += sub_p1->no_items;
775 decode_ptr(&src, &pos);
779 /* we reached the end. So lookahead > last item */
780 sub_p1 = open_block(b, pos);
782 diff_terms -= sub_p1->no_items;
783 more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
784 sub_item, &sub_size, last_max_item);
785 diff_terms += sub_p1->no_items;
788 diff_terms += sub_p2->no_items;
792 p->no_items += diff_terms;
796 /* there was a split - must insert pointer in this one */
797 char dst_buf[DST_BUF_SIZE];
800 const char *sub_item_ptr = sub_item;
802 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
804 memcpy(dst, startp, src - startp);
809 (*b->method->codec.reset)(c1);
810 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
812 encode_item_len(&dst, sub_size); /* sub length and item */
813 memcpy(dst, sub_item, sub_size);
817 encode_ptr(&dst, sub_p2->pos); /* pos */
819 if (endp - src) /* remaining data */
821 memcpy(dst, src, endp - src);
824 p->size = dst - dst_buf;
825 assert(p->size >= 0);
827 if (p->size <= b->file[p->cat].head.block_max)
829 /* it fits OK in this block */
830 memcpy(startp, dst_buf, dst - dst_buf);
832 close_block(b, sub_p2);
836 /* must split _this_ block as well .. */
837 struct ISAMB_block *sub_p3;
839 char file_item_buf[DST_ITEM_MAX];
840 char *file_item = file_item_buf;
844 zint no_items_first_half = 0;
850 b->number_of_int_splits++;
853 close_block(b, sub_p2);
855 half = src + b->file[p->cat].head.block_size/2;
856 decode_ptr(&src, &pos);
858 if (b->enable_int_count)
860 /* read sub block so we can get no_items for it */
861 sub_p3 = open_block(b, pos);
862 no_items_first_half += sub_p3->no_items;
863 close_block(b, sub_p3);
869 file_item = file_item_buf;
870 (*b->method->codec.reset)(c1);
871 (*b->method->codec.decode)(c1, &file_item, &src);
873 decode_item_len(&src, &split_size_tmp);
874 *split_size = (int) split_size_tmp;
877 decode_ptr(&src, &pos);
879 if (b->enable_int_count)
881 /* read sub block so we can get no_items for it */
882 sub_p3 = open_block(b, pos);
883 no_items_first_half += sub_p3->no_items;
884 close_block(b, sub_p3);
887 /* p is first half */
888 p_new_size = src - dst_buf;
889 memcpy(p->bytes, dst_buf, p_new_size);
892 file_item = file_item_buf;
893 (*b->method->codec.reset)(c1);
894 (*b->method->codec.decode)(c1, &file_item, &src);
895 *split_size = file_item - file_item_buf;
896 memcpy(split_item, file_item_buf, *split_size);
898 decode_item_len(&src, &split_size_tmp);
899 *split_size = (int) split_size_tmp;
900 memcpy(split_item, src, *split_size);
903 /* *sp is second half */
904 *sp = new_int(b, p->cat);
905 (*sp)->size = endp - src;
906 memcpy((*sp)->bytes, src, (*sp)->size);
908 p->size = p_new_size;
910 /* adjust no_items in first&second half */
911 (*sp)->no_items = p->no_items - no_items_first_half;
912 p->no_items = no_items_first_half;
916 close_block(b, sub_p1);
917 (*b->method->codec.stop)(c1);
921 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
922 int *lookahead_mode, ISAMC_I *stream,
923 struct ISAMB_block **sp2,
924 void *sub_item, int *sub_size,
925 const void *max_item)
927 struct ISAMB_block *p = *sp1;
930 char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
932 void *c1 = (*b->method->codec.start)();
933 void *c2 = (*b->method->codec.start)();
935 int quater = b->file[b->no_cat-1].head.block_max / 4;
936 char *mid_cut = dst_buf + quater * 2;
937 char *tail_cut = dst_buf + quater * 3;
938 char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
941 char cut_item_buf[DST_ITEM_MAX];
942 int cut_item_size = 0;
943 int no_items = 0; /* number of items (total) */
944 int no_items_1 = 0; /* number of items (first half) */
945 int inserted_dst_bytes = 0;
949 char file_item_buf[DST_ITEM_MAX];
950 char *file_item = file_item_buf;
953 endp = p->bytes + p->size;
954 (*b->method->codec.decode)(c1, &file_item, &src);
957 const char *dst_item = 0; /* resulting item to be inserted */
958 char *lookahead_next;
963 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
965 /* d now holds comparison between existing file item and
968 d > 0: lookahead before file
969 d < 0: lookahead after file
973 /* lookahead must be inserted */
974 dst_item = lookahead_item;
975 /* if this is not an insertion, it's really bad .. */
976 if (!*lookahead_mode)
978 yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
979 assert(*lookahead_mode);
982 else if (d == 0 && *lookahead_mode == 2)
984 /* For mode == 2, we insert the new key anyway - even
985 though the comparison is 0. */
986 dst_item = lookahead_item;
990 dst_item = file_item_buf;
992 if (d == 0 && !*lookahead_mode)
994 /* it's a deletion and they match so there is nothing
995 to be inserted anyway .. But mark the thing dirty
996 (file item was part of input.. The item will not be
1000 else if (!half1 && dst > mid_cut)
1002 /* we have reached the splitting point for the first time */
1003 const char *dst_item_0 = dst_item;
1004 half1 = dst; /* candidate for splitting */
1006 /* encode the resulting item */
1007 (*b->method->codec.encode)(c2, &dst, &dst_item);
1009 cut_item_size = dst_item - dst_item_0;
1010 assert(cut_item_size > 0);
1011 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1014 no_items_1 = no_items;
1019 /* encode the resulting item */
1020 (*b->method->codec.encode)(c2, &dst, &dst_item);
1024 /* now move "pointers" .. result has been encoded .. */
1027 /* we must move the lookahead pointer */
1029 inserted_dst_bytes += (dst - dst_0);
1030 if (inserted_dst_bytes >= quater)
1031 /* no more room. Mark lookahead as "gone".. */
1035 /* move it really.. */
1036 lookahead_next = lookahead_item;
1037 if (!(*stream->read_item)(stream->clientData,
1041 /* end of stream reached: no "more" and no lookahead */
1045 if (lookahead_item && max_item &&
1046 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1048 /* the lookahead goes beyond what we allow in this
1049 leaf. Mark it as "gone" */
1058 /* exact match .. move both pointers */
1060 lookahead_next = lookahead_item;
1061 if (!(*stream->read_item)(stream->clientData,
1062 &lookahead_next, lookahead_mode))
1068 break; /* end of file stream reached .. */
1069 file_item = file_item_buf; /* move file pointer */
1070 (*b->method->codec.decode)(c1, &file_item, &src);
1074 /* file pointer must be moved */
1077 file_item = file_item_buf;
1078 (*b->method->codec.decode)(c1, &file_item, &src);
1083 /* this loop runs when we are "appending" to a leaf page. That is
1084 either it's empty (new) or all file items have been read in
1087 maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1088 while (lookahead_item)
1091 const char *src = lookahead_item;
1094 /* if we have a lookahead item, we stop if we exceed the value of it */
1096 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1098 /* stop if we have reached the value of max item */
1101 if (!*lookahead_mode)
1103 /* this is append. So a delete is bad */
1104 yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1105 assert(*lookahead_mode);
1107 else if (!half1 && dst > tail_cut)
1109 const char *src_0 = src;
1110 half1 = dst; /* candidate for splitting */
1112 (*b->method->codec.encode)(c2, &dst, &src);
1114 cut_item_size = src - src_0;
1115 assert(cut_item_size > 0);
1116 memcpy(cut_item_buf, src_0, cut_item_size);
1118 no_items_1 = no_items;
1122 (*b->method->codec.encode)(c2, &dst, &src);
1132 dst_item = lookahead_item;
1133 if (!(*stream->read_item)(stream->clientData, &dst_item,
1140 new_size = dst - dst_buf;
1141 if (p && p->cat != b->no_cat-1 &&
1142 new_size > b->file[p->cat].head.block_max)
1144 /* non-btree block will be removed */
1147 /* delete it too!! */
1148 p = 0; /* make a new one anyway */
1151 { /* must create a new one */
1153 for (i = 0; i < b->no_cat; i++)
1154 if (new_size <= b->file[i].head.block_max)
1160 if (new_size > b->file[p->cat].head.block_max)
1163 const char *cut_item = cut_item_buf;
1168 assert(cut_item_size > 0);
1171 p->size = half1 - dst_buf;
1172 assert(p->size <= b->file[p->cat].head.block_max);
1173 memcpy(p->bytes, dst_buf, half1 - dst_buf);
1174 p->no_items = no_items_1;
1177 *sp2 = new_leaf(b, p->cat);
1179 (*b->method->codec.reset)(c2);
1181 b->number_of_leaf_splits++;
1183 first_dst = (*sp2)->bytes;
1185 (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1187 memcpy(first_dst, half2, dst - half2);
1189 (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1190 assert((*sp2)->size <= b->file[p->cat].head.block_max);
1191 (*sp2)->no_items = no_items - no_items_1;
1194 memcpy(sub_item, cut_item_buf, cut_item_size);
1195 *sub_size = cut_item_size;
1199 memcpy(p->bytes, dst_buf, dst - dst_buf);
1201 p->no_items = no_items;
1203 (*b->method->codec.stop)(c1);
1204 (*b->method->codec.stop)(c2);
1209 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1212 struct ISAMB_block **sp,
1213 void *sub_item, int *sub_size,
1214 const void *max_item)
1216 if (!*p || (*p)->leaf)
1217 return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
1218 sub_size, max_item);
1220 return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1221 sub_size, max_item);
1224 int isamb_unlink(ISAMB b, ISAM_P pos)
1226 struct ISAMB_block *p1;
1230 p1 = open_block(b, pos);
1235 const char *src = p1->bytes + p1->offset;
1237 void *c1 = (*b->method->codec.start)();
1239 decode_ptr(&src, &sub_p);
1240 isamb_unlink(b, sub_p);
1242 while (src != p1->bytes + p1->size)
1245 char file_item_buf[DST_ITEM_MAX];
1246 char *file_item = file_item_buf;
1247 (*b->method->codec.reset)(c1);
1248 (*b->method->codec.decode)(c1, &file_item, &src);
1251 decode_item_len(&src, &item_len);
1254 decode_ptr(&src, &sub_p);
1255 isamb_unlink(b, sub_p);
1258 (*b->method->codec.stop)(c1);
1265 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1267 char item_buf[DST_ITEM_MAX];
1271 int must_delete = 0;
1278 item_ptr = item_buf;
1280 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1285 item_ptr = item_buf;
1286 more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1289 struct ISAMB_block *p = 0, *sp = 0;
1290 char sub_item[DST_ITEM_MAX];
1294 p = open_block(b, *pos);
1295 more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1296 sub_item, &sub_size, 0);
1298 { /* increase level of tree by one */
1299 struct ISAMB_block *p2 = new_int(b, p->cat);
1300 char *dst = p2->bytes + p2->size;
1302 void *c1 = (*b->method->codec.start)();
1303 const char *sub_item_ptr = sub_item;
1306 encode_ptr(&dst, p->pos);
1307 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1309 (*b->method->codec.reset)(c1);
1310 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1312 encode_item_len(&dst, sub_size);
1313 memcpy(dst, sub_item, sub_size);
1316 encode_ptr(&dst, sp->pos);
1318 p2->size = dst - p2->bytes;
1319 p2->no_items = p->no_items + sp->no_items;
1320 *pos = p2->pos; /* return new super page */
1324 (*b->method->codec.stop)(c1);
1329 *pos = p->pos; /* return current one (again) */
1331 if (p->no_items == 0)
1339 isamb_unlink(b, *pos);
1344 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1346 ISAMB_PP pp = xmalloc(sizeof(*pp));
1352 pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1359 pp->skipped_numbers = 0;
1360 pp->returned_numbers = 0;
1362 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1363 pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1366 struct ISAMB_block *p = open_block(isamb, pos);
1367 const char *src = p->bytes + p->offset;
1368 pp->block[pp->level] = p;
1370 pp->total_size += p->size;
1374 decode_ptr(&src, &pos);
1375 p->offset = src - p->bytes;
1377 pp->accessed_nodes[pp->level]++;
1379 pp->block[pp->level+1] = 0;
1380 pp->maxlevel = pp->level;
1386 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1388 return isamb_pp_open_x(isamb, pos, 0, scope);
1391 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1396 yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
1397 "skipped "ZINT_FORMAT,
1398 pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1399 for (i = pp->maxlevel; i>=0; i--)
1400 if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1401 yaz_log(YLOG_DEBUG, "isamb_pp_close level leaf-%d: "
1402 ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1403 pp->accessed_nodes[i], pp->skipped_nodes[i]);
1404 pp->isamb->skipped_numbers += pp->skipped_numbers;
1405 pp->isamb->returned_numbers += pp->returned_numbers;
1406 for (i = pp->maxlevel; i>=0; i--)
1408 pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1409 pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1412 *size = pp->total_size;
1414 *blocks = pp->no_blocks;
1415 for (i = 0; i <= pp->level; i++)
1416 close_block(pp->isamb, pp->block[i]);
1421 int isamb_block_info(ISAMB isamb, int cat)
1423 if (cat >= 0 && cat < isamb->no_cat)
1424 return isamb->file[cat].head.block_size;
1428 void isamb_pp_close(ISAMB_PP pp)
1430 isamb_pp_close_x(pp, 0, 0);
1433 /* simple recursive dumper .. */
1434 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1438 char prefix_str[1024];
1441 struct ISAMB_block *p = open_block(b, pos);
1442 sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1443 ZINT_FORMAT, level*2, "",
1444 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1447 sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1450 while (p->offset < p->size)
1452 const char *src = p->bytes + p->offset;
1454 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1455 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1456 p->offset = src - (char*) p->bytes;
1458 assert(p->offset == p->size);
1462 const char *src = p->bytes + p->offset;
1465 decode_ptr(&src, &sub);
1466 p->offset = src - (char*) p->bytes;
1468 isamb_dump_r(b, sub, pr, level+1);
1470 while (p->offset < p->size)
1473 char file_item_buf[DST_ITEM_MAX];
1474 char *file_item = file_item_buf;
1475 void *c1 = (*b->method->codec.start)();
1476 (*b->method->codec.decode)(c1, &file_item, &src);
1477 (*b->method->codec.stop)(c1);
1478 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1481 decode_item_len(&src, &item_len);
1482 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1485 decode_ptr(&src, &sub);
1487 p->offset = src - (char*) p->bytes;
1489 isamb_dump_r(b, sub, pr, level+1);
1496 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1498 isamb_dump_r(b, pos, pr, 0);
1501 int isamb_pp_read(ISAMB_PP pp, void *buf)
1503 return isamb_pp_forward(pp, buf, 0);
1507 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1508 { /* return an estimate of the current position and of the total number of */
1509 /* occureences in the isam tree, based on the current leaf */
1513 /* if end-of-stream PP may not be leaf */
1515 *total = (double) (pp->block[0]->no_items);
1516 *current = (double) pp->returned_numbers;
1518 yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1519 ZINT_FORMAT, *current, *total, pp->returned_numbers);
1523 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1527 struct ISAMB_block *p = pp->block[pp->level];
1528 ISAMB b = pp->isamb;
1532 while (p->offset == p->size)
1538 char file_item_buf[DST_ITEM_MAX];
1539 char *file_item = file_item_buf;
1543 while (p->offset == p->size)
1547 close_block(pp->isamb, pp->block[pp->level]);
1548 pp->block[pp->level] = 0;
1550 p = pp->block[pp->level];
1555 src = p->bytes + p->offset;
1558 c1 = (*b->method->codec.start)();
1559 (*b->method->codec.decode)(c1, &file_item, &src);
1561 decode_ptr(&src, &item_len);
1564 decode_ptr(&src, &pos);
1565 p->offset = src - (char*) p->bytes;
1567 src = p->bytes + p->offset;
1571 if (!untilb || p->offset == p->size)
1573 assert(p->offset < p->size);
1576 file_item = file_item_buf;
1577 (*b->method->codec.reset)(c1);
1578 (*b->method->codec.decode)(c1, &file_item, &src);
1579 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1585 decode_item_len(&src, &item_len);
1586 if ((*b->method->compare_item)(untilb, src) < pp->scope)
1590 decode_ptr(&src, &pos);
1591 p->offset = src - (char*) p->bytes;
1598 pp->block[pp->level] = p = open_block(pp->isamb, pos);
1600 pp->total_size += p->size;
1608 src = p->bytes + p->offset;
1611 decode_ptr(&src, &pos);
1612 p->offset = src - (char*) p->bytes;
1614 if (!untilb || p->offset == p->size)
1616 assert(p->offset < p->size);
1619 file_item = file_item_buf;
1620 (*b->method->codec.reset)(c1);
1621 (*b->method->codec.decode)(c1, &file_item, &src);
1622 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1628 decode_ptr(&src, &item_len);
1629 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1637 (*b->method->codec.stop)(c1);
1640 assert(p->offset < p->size);
1645 src = p->bytes + p->offset;
1646 (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1647 p->offset = src - (char*) p->bytes;
1648 if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1651 if (p->offset == p->size) goto again;
1653 pp->returned_numbers++;
1657 zint isamb_get_int_splits(ISAMB b)
1659 return b->number_of_int_splits;
1662 zint isamb_get_leaf_splits(ISAMB b)
1664 return b->number_of_leaf_splits;
1667 zint isamb_get_root_ptr(ISAMB b)
1672 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1674 b->root_ptr = root_ptr;
1681 * c-file-style: "Stroustrup"
1682 * indent-tabs-mode: nil
1684 * vim: shiftwidth=4 tabstop=8 expandtab