2 * Copyright (C) 1994-2002, Index Data
5 * $Id: recindex.c,v 1.33 2002-07-15 11:50:01 adam Exp $
10 * Format of first block
15 * Format of subsequent blocks
19 * Format of each record
21 * (length, data) - pairs
22 * length = 0 if same as previous
/*
 * rec_write_head: stores the in-memory header p->head in the index file and
 * logs a fatal error naming the index file when the write is reported as
 * failed.
 * NOTE(review): this listing omits lines; the test on r that guards the log
 * call is not visible here.
 */
33 static void rec_write_head (Records p)
38 assert (p->index_BFile);
/* The whole header structure goes to block 0, offset 0 of the index file. */
40 r = bf_write (p->index_BFile, 0, 0, sizeof(p->head), &p->head);
43 logf (LOG_FATAL|LOG_ERRNO, "write head of %s", p->index_fname);
/*
 * rec_tmp_expand: makes sure the scratch buffer p->tmp_buf can hold `size`
 * bytes plus slack.
 *   p    - record handle owning tmp_buf / tmp_size
 *   size - number of payload bytes the caller is about to place in tmp_buf
 */
48 static void rec_tmp_expand (Records p, int size)
/*
 * Grow when the buffer is smaller than size + 2048, or smaller than twice
 * the block size of the last (REC_BLOCK_TYPES-1) block type.
 */
50 if (p->tmp_size < size + 2048 ||
51 p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
/* New capacity covers both conditions above at once. */
54 p->tmp_size = size + p->head.block_size[REC_BLOCK_TYPES-1]*2 + 2048;
/* NOTE(review): release of the previous buffer is not visible in this listing - confirm. */
55 p->tmp_buf = (char *) xmalloc (p->tmp_size);
/*
 * read_indx: reads the index entry for record number `sysno` from the index
 * file into buf.
 *   sysno    - record number; entry position is (sysno-1)*itemsize
 *   buf      - destination for itemsize bytes
 *   itemsize - size in bytes of one index entry
 * The remaining parameter(s) are on lines omitted from this listing; the
 * body uses a flag named ignoreError to suppress the fatal log on failure.
 */
59 static int read_indx (Records p, int sysno, void *buf, int itemsize,
/* Byte position of the entry, counted from the start of block 1. */
63 int pos = (sysno-1)*itemsize;
/*
 * Block 0 is used for the header (see rec_write_head), hence the 1+ bias;
 * 128 matches the block size the index file is opened with in rec_open.
 */
65 r = bf_read (p->index_BFile, 1+pos/128, pos%128, itemsize, buf);
/* Only a result of exactly 1 is treated as a successful read. */
66 if (r != 1 && !ignoreError)
68 logf (LOG_FATAL|LOG_ERRNO, "read in %s at pos %ld",
69 p->index_fname, (long) pos);
/*
 * write_indx: writes the index entry for record number `sysno` to the index
 * file, using the same position mapping as read_indx.
 *   sysno    - record number; entry position is (sysno-1)*itemsize
 *   buf      - source of itemsize bytes
 *   itemsize - size in bytes of one index entry
 */
75 static void write_indx (Records p, int sysno, void *buf, int itemsize)
77 int pos = (sysno-1)*itemsize;
/* NOTE(review): the result of bf_write is not examined here. */
79 bf_write (p->index_BFile, 1+pos/128, pos%128, itemsize, buf);
/*
 * rec_release_blocks: gives up the data blocks referenced by the index entry
 * of record `sysno` and updates the usage counters in p->head.
 * NOTE(review): loop headers, the change applied to `ref`, and the early
 * returns are on lines omitted from this listing; the comments below cover
 * the visible statements only.
 */
82 static void rec_release_blocks (Records p, int sysno)
84 struct record_index_entry entry;
/* Scratch space for the start of a data block: an int followed by a short. */
86 char block_and_ref[sizeof(short) + sizeof(int)];
/* Read the index entry with the ignore-error flag set, so a miss is not logged. */
90 if (read_indx (p, sysno, &entry, sizeof(entry), 1) != 1)
/*
 * entry.next packs two values: the low 3 bits select the block type (data
 * file) and the remaining bits are the block number within that file.
 */
93 freeblock = entry.next;
94 assert (freeblock > 0);
95 dst_type = freeblock & 7;
96 assert (dst_type < REC_BLOCK_TYPES);
97 freeblock = freeblock / 8;
/* Fetch the int + short that start the first block. */
100 if (bf_read (p->data_BFile[dst_type], freeblock, 0,
101 sizeof(block_and_ref), block_and_ref) != 1)
/* The message names rec_del_single although this function is rec_release_blocks. */
103 logf (LOG_FATAL|LOG_ERRNO, "read in rec_del_single");
/*
 * `ref` lives right after the leading int. It is copied out and copied back;
 * the statement that changes it in between is not visible in this listing.
 */
109 memcpy (&ref, block_and_ref + sizeof(int), sizeof(ref));
111 memcpy (block_and_ref + sizeof(int), &ref, sizeof(ref));
/*
 * NOTE(review): bf_write is tested for non-zero here, whereas bf_read above
 * is tested with != 1 - confirm bf_write returns 0 on success.
 */
114 if (bf_write (p->data_BFile[dst_type], freeblock, 0,
115 sizeof(block_and_ref), block_and_ref))
117 logf (LOG_FATAL|LOG_ERRNO, "write in rec_del_single");
/*
 * Push the block onto the free list of its type: the current list head is
 * stored at the start of the block, then the block becomes the new head.
 */
125 if (bf_write (p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
126 &p->head.block_free[dst_type]))
128 logf (LOG_FATAL|LOG_ERRNO, "write in rec_del_single");
131 p->head.block_free[dst_type] = freeblock;
/* The leading int of the buffered block start gives the next block number. */
132 memcpy (&freeblock, block_and_ref, sizeof(int));
134 p->head.block_used[dst_type]--;
/* The record's stored size no longer counts towards the running total. */
136 p->head.total_bytes -= entry.size;
/*
 * rec_delete_single: releases the data blocks of `rec` and links its index
 * slot into the list of free index entries kept in p->head.index_free.
 */
139 static void rec_delete_single (Records p, Record rec)
141 struct record_index_entry entry;
143 rec_release_blocks (p, rec->sysno);
/* The freed slot points at the previous free-list head ... */
145 entry.next = p->head.index_free;
/* ... and becomes the new head itself. */
147 p->head.index_free = rec->sysno;
/* NOTE(review): any assignment to entry.size is not visible in this listing. */
148 write_indx (p, rec->sysno, &entry, sizeof(entry));
/*
 * rec_write_tmp_buf: writes `size` bytes held in p->tmp_buf to a chain of
 * data blocks and points index entries at the first block.
 *   size   - number of bytes to store
 *   sysnos - array of record numbers whose index entries are written
 * NOTE(review): several lines (branch headers, dst_type selection, the loop
 * over sysnos) are omitted from this listing.
 */
151 static void rec_write_tmp_buf (Records p, int size, int *sysnos)
153 struct record_index_entry entry;
155 char *cptr = p->tmp_buf;
/* block_prev == -1 marks "no block chosen yet". */
156 int block_prev = -1, block_free;
/* The block type is picked by comparing size against the block_move thresholds. */
160 for (i = 1; i<REC_BLOCK_TYPES; i++)
161 if (size >= p->head.block_move[i])
163 while (no_written < size)
/* Candidate block: the head of the free list for this block type. */
165 block_free = p->head.block_free[dst_type];
/*
 * Reading the start of that block into head.block_free replaces the list
 * head with the next-free pointer stored in the block.
 */
168 if (bf_read (p->data_BFile[dst_type],
169 block_free, 0, sizeof(*p->head.block_free),
170 &p->head.block_free[dst_type]) != 1)
172 logf (LOG_FATAL|LOG_ERRNO, "read in %s at free block %d",
173 p->data_fname[dst_type], block_free);
/* Alternative source of a block: extend the file by one block. */
178 block_free = p->head.block_last[dst_type]++;
/* First block of the chain: build the index entry that refers to it. */
179 if (block_prev == -1)
/* Same packing that rec_release_blocks and rec_get_int decode: block*8 + type. */
181 entry.next = block_free*8 + dst_type;
183 p->head.total_bytes += size;
186 write_indx (p, *sysnos, &entry, sizeof(entry));
/*
 * Later blocks: the number of the new block is stored at the start of the
 * pending chunk, which is then written to the previous block.
 * NOTE(review): a byte count of 0 presumably means "whole block" - confirm
 * against bf_write.
 */
192 memcpy (cptr, &block_free, sizeof(int));
193 bf_write (p->data_BFile[dst_type], block_prev, 0, 0, cptr);
194 cptr = p->tmp_buf + no_written;
196 block_prev = block_free;
/* Each block carries block_size minus one int of payload. */
197 no_written += p->head.block_size[dst_type] - sizeof(int);
198 p->head.block_used[dst_type]++;
200 assert (block_prev != -1);
/* Final chunk: link int plus whatever payload remains up to tmp_buf+size. */
202 memcpy (cptr, &block_free, sizeof(int));
203 bf_write (p->data_BFile[dst_type], block_prev, 0,
204 sizeof(int) + (p->tmp_buf+size) - cptr, cptr);
/*
 * rec_open: allocates a Records handle, opens the index file and one data
 * file per block type, and sets up the record cache and mutex.
 *   bfs                - block-file set passed on to bf_open
 *   rw                 - passed on to bf_open for the index file
 *   compression_method - stored in the handle for later writes
 * NOTE(review): the branch that chooses between "initialise a new header"
 * and "validate an existing header" is on lines omitted from this listing,
 * as are the return statement and the error exits.
 */
207 Records rec_open (BFiles bfs, int rw, int compression_method)
213 p = (Records) xmalloc (sizeof(*p));
214 p->compression_method = compression_method;
217 p->tmp_buf = (char *) xmalloc (p->tmp_size);
/* The index file is named "reci" and uses 128-byte blocks. */
218 p->index_fname = "reci";
219 p->index_BFile = bf_open (bfs, p->index_fname, 128, rw);
220 if (p->index_BFile == NULL)
222 logf (LOG_FATAL|LOG_ERRNO, "open %s", p->index_fname);
/* Block 0 of the index file (the header block) is read into the scratch buffer. */
225 r = bf_read (p->index_BFile, 0, 0, 0, p->tmp_buf);
/* Header initialisation: magic, version string and zeroed counters. */
229 memcpy (p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
230 sprintf (p->head.version, "%3d", REC_VERSION);
231 p->head.index_free = 0;
232 p->head.index_last = 1;
233 p->head.no_records = 0;
234 p->head.total_bytes = 0;
235 for (i = 0; i<REC_BLOCK_TYPES; i++)
237 p->head.block_free[i] = 0;
238 p->head.block_last[i] = 1;
239 p->head.block_used[i] = 0;
/* Block sizes start at 128 bytes and grow by a factor of 4 per block type. */
241 p->head.block_size[0] = 128;
242 p->head.block_move[0] = 0;
243 for (i = 1; i<REC_BLOCK_TYPES; i++)
245 p->head.block_size[i] = p->head.block_size[i-1] * 4;
/* Threshold used by rec_write_tmp_buf when choosing a block type. */
246 p->head.block_move[i] = p->head.block_size[i] * 24;
/* Existing header: copy it from the scratch buffer, then check magic and version. */
252 memcpy (&p->head, p->tmp_buf, sizeof(p->head));
253 if (memcmp (p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
255 logf (LOG_FATAL, "file %s has bad format", p->index_fname);
258 version = atoi (p->head.version);
259 if (version != REC_VERSION)
261 logf (LOG_FATAL, "file %s is version %d, but version"
262 " %d is required", p->index_fname, version, REC_VERSION);
/* Data file names are "recd" followed by a letter: recdA, recdB, ... */
267 for (i = 0; i<REC_BLOCK_TYPES; i++)
270 sprintf (str, "recd%c", i + 'A');
271 p->data_fname[i] = (char *) xmalloc (strlen(str)+1);
272 strcpy (p->data_fname[i], str);
273 p->data_BFile[i] = NULL;
/* Each data file is opened with the block size recorded in the header. */
275 for (i = 0; i<REC_BLOCK_TYPES; i++)
277 if (!(p->data_BFile[i] = bf_open (bfs, p->data_fname[i],
278 p->head.block_size[i],
281 logf (LOG_FATAL|LOG_ERRNO, "bf_open %s", p->data_fname[i]);
/* The cache is an array of cache_max entries. */
287 p->record_cache = (struct record_cache_entry *)
288 xmalloc (sizeof(*p->record_cache)*p->cache_max);
289 zebra_mutex_init (&p->mutex);
/*
 * rec_encode_unsigned: encodes n into buf and reports the number of bytes
 * used through *len.
 * NOTE(review): the loop header and the final byte are on lines omitted
 * from this listing.
 */
293 static void rec_encode_unsigned (unsigned n, unsigned char *buf, int *len)
/* A byte carrying the low 7 bits of n with the top bit (128) set. */
298 buf[*len] = 128 + (n & 127);
/*
 * rec_decode_unsigned: decodes a value produced by rec_encode_unsigned from
 * buf; *np receives the value and *len the number of bytes consumed
 * (the assignments to *np and the handling of the last byte are on lines
 * omitted from this listing).
 */
306 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
/* Bytes above 127 have the top bit set and are followed by more bytes. */
312 while (buf[*len] > 127)
/* w is the weight of the current 7-bit group; its update is not visible here. */
314 n += w*(buf[*len] & 127);
/*
 * rec_cache_flush_block1: appends the serialised form of `rec` to the output
 * buffer, growing the buffer when needed.
 *   rec        - record to serialise
 *   last_rec   - previously serialised record, or NULL; used to avoid
 *                repeating identical info fields
 *   out_buf    - in/out: address of the output buffer
 *   out_size   - in/out: capacity of *out_buf
 *   out_offset - in/out: write position (declared on a line omitted from
 *                this listing, used below)
 */
323 static void rec_cache_flush_block1 (Records p, Record rec, Record last_rec,
324 char **out_buf, int *out_size,
330 for (i = 0; i<REC_NO_INFO; i++)
/* Keep at least 20 bytes of headroom beyond the field data for the encoded numbers. */
332 if (*out_offset + (int) rec->size[i] + 20 > *out_size)
/* Grow in steps of 64 KiB beyond what is needed now; existing bytes are copied over. */
334 int new_size = *out_offset + rec->size[i] + 65536;
335 char *np = (char *) xmalloc (new_size);
337 memcpy (np, *out_buf, *out_offset);
339 *out_size = new_size;
/* NOTE(review): the condition guarding the sysno output is not visible in this listing. */
344 rec_encode_unsigned (rec->sysno, *out_buf + *out_offset, &len);
345 (*out_offset) += len;
/*
 * Length prefix per field:
 *   1        - field is empty
 *   0        - field is identical to the same field of last_rec
 *   size + 1 - field data of `size` bytes follows
 */
347 if (rec->size[i] == 0)
349 rec_encode_unsigned (1, *out_buf + *out_offset, &len);
350 (*out_offset) += len;
352 else if (last_rec && rec->size[i] == last_rec->size[i] &&
353 !memcmp (rec->info[i], last_rec->info[i], rec->size[i]))
355 rec_encode_unsigned (0, *out_buf + *out_offset, &len);
356 (*out_offset) += len;
360 rec_encode_unsigned (rec->size[i]+1, *out_buf + *out_offset, &len);
361 (*out_offset) += len;
362 memcpy (*out_buf + *out_offset, rec->info[i], rec->size[i]);
363 (*out_offset) += rec->size[i];
/*
 * rec_write_multiple: serialises the cached records that are pending output
 * (all but the last saveCount cache entries), optionally compresses the
 * result, and stores it through rec_write_tmp_buf.
 * NOTE(review): the switch on e->flag, the first case label, the handling
 * of ref_count / last_rec and the cleanup are on lines omitted from this
 * listing.
 */
368 static void rec_write_multiple (Records p, int saveCount)
372 char compression_method;
376 char *out_buf = (char *) xmalloc (out_size);
/* One slot per cache entry plus one spare; sysnop walks the array as it is filled. */
377 int *sysnos = (int *) xmalloc (sizeof(*sysnos) * (p->cache_cur + 1));
378 int *sysnop = sysnos;
380 for (i = 0; i<p->cache_cur - saveCount; i++)
382 struct record_cache_entry *e = p->record_cache + i;
/* First visible branch: serialise the record and remember its sysno. */
386 rec_cache_flush_block1 (p, e->rec, last_rec, &out_buf,
387 &out_size, &out_offset);
388 *sysnop++ = e->rec->sysno;
390 e->flag = recordFlagNop;
/* Rewritten record: its old blocks are released before it is serialised again. */
393 case recordFlagWrite:
394 rec_release_blocks (p, e->rec->sysno);
395 rec_cache_flush_block1 (p, e->rec, last_rec, &out_buf,
396 &out_size, &out_offset);
397 *sysnop++ = e->rec->sysno;
399 e->flag = recordFlagNop;
/* Deleted record: blocks and index slot are freed, nothing is serialised. */
402 case recordFlagDelete:
403 rec_delete_single (p, e->rec);
404 e->flag = recordFlagNop;
414 int csize = 0; /* indicate compression "not performed yet" */
415 compression_method = p->compression_method;
416 switch (compression_method)
418 case REC_COMPRESS_BZIP2:
/* Output capacity offered to the compressor: input size plus 1/64 plus 620 bytes. */
420 csize = out_offset + (out_offset >> 6) + 620;
421 rec_tmp_expand (p, csize);
/* The name of the bzip2 entry point depends on whether BZ_CONFIG_ERROR is defined. */
422 #ifdef BZ_CONFIG_ERROR
423 i = BZ2_bzBuffToBuffCompress
425 i = bzBuffToBuffCompress
/* Compressed bytes are placed after the int + short (+ ...) prefix of tmp_buf. */
427 (p->tmp_buf+sizeof(int)+sizeof(short)+
429 &csize, out_buf, out_offset, 1, 0, 30);
432 logf (LOG_WARN, "bzBuffToBuffCompress error code=%d", i);
435 logf (LOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
439 case REC_COMPRESS_NONE:
444 /* either no compression or compression not supported ... */
/* Uncompressed path: the serialised bytes are copied behind the same prefix. */
446 rec_tmp_expand (p, csize);
447 memcpy (p->tmp_buf + sizeof(int) + sizeof(short) + sizeof(char),
448 out_buf, out_offset);
450 compression_method = REC_COMPRESS_NONE;
/*
 * Prefix layout in tmp_buf: an int (filled in by rec_write_tmp_buf as the
 * block link), then ref_count, then one byte naming the compression method.
 */
452 memcpy (p->tmp_buf + sizeof(int), &ref_count, sizeof(ref_count));
453 memcpy (p->tmp_buf + sizeof(int)+sizeof(short),
454 &compression_method, sizeof(compression_method));
456 /* -------- compression */
/* The stored size counts ref_count and the method byte but not the leading int. */
457 rec_write_tmp_buf (p, csize + sizeof(short) + sizeof(char), sysnos);
/*
 * rec_cache_flush: writes out cached records and shrinks the cache so that
 * only the last saveCount entries remain.
 * NOTE(review): the body of the first loop and the statement after the
 * saveCount test are on lines omitted from this listing.
 */
463 static void rec_cache_flush (Records p, int saveCount)
/* saveCount is compared against the number of cached entries first. */
467 if (saveCount >= p->cache_cur)
470 rec_write_multiple (p, saveCount);
/* Visits the entries that were just written (indices 0 .. cache_cur-saveCount-1). */
472 for (i = 0; i<p->cache_cur - saveCount; i++)
474 struct record_cache_entry *e = p->record_cache + i;
477 /* i still being used ... */
/* i now indexes the first kept entry; kept entries are moved to the front. */
478 for (j = 0; j<saveCount; j++, i++)
479 memcpy (p->record_cache+j, p->record_cache+i,
480 sizeof(*p->record_cache));
481 p->cache_cur = saveCount;
/*
 * rec_cache_lookup: searches the record cache linearly for `sysno`.
 *   flag - state the caller wants to apply to the entry
 * Returns a pointer to a Record (see the declared return type); the return
 * statements themselves are on lines omitted from this listing.
 */
484 static Record *rec_cache_lookup (Records p, int sysno,
485 enum recordCacheFlag flag)
488 for (i = 0; i<p->cache_cur; i++)
490 struct record_cache_entry *e = p->record_cache + i;
491 if (e->rec->sysno == sysno)
/*
 * Only entries whose flag is still recordFlagNop are affected by a
 * non-Nop request; the statement executed here is not visible.
 */
493 if (flag != recordFlagNop && e->flag == recordFlagNop)
/*
 * rec_cache_insert: stores a copy of `rec` in the record cache with the
 * given flag, flushing older entries first when necessary.
 * NOTE(review): the body of the inner loop and the condition guarding the
 * second flush are on lines omitted from this listing.
 */
501 static void rec_cache_insert (Records p, Record rec, enum recordCacheFlag flag)
503 struct record_cache_entry *e;
/* Cache full: flush everything except the most recent entry. */
505 if (p->cache_cur == p->cache_max)
506 rec_cache_flush (p, 1);
507 else if (p->cache_cur > 0)
/* Walks every info field of every cached record before deciding on the flush below. */
511 for (i = 0; i<p->cache_cur; i++)
513 Record r = (p->record_cache + i)->rec;
514 for (j = 0; j<REC_NO_INFO; j++)
518 rec_cache_flush (p, 1);
520 assert (p->cache_cur < p->cache_max);
/* Claim the next free cache slot. */
522 e = p->record_cache + (p->cache_cur)++;
/* The cache keeps its own copy, made with rec_cp, not the caller's record. */
524 e->rec = rec_cp (rec);
/*
 * rec_close: flushes all cached records, closes the index and data files
 * and frees the memory held by the handle.
 *   pp - address of the handle to close
 * NOTE(review): the header write-back, freeing of the handle itself and any
 * reset of *pp are on lines omitted from this listing.
 */
527 void rec_close (Records *pp)
/* NOTE(review): the mutex is destroyed before the final flush below. */
534 zebra_mutex_destroy (&p->mutex);
/* saveCount 0: nothing stays in the cache. */
535 rec_cache_flush (p, 0);
536 xfree (p->record_cache);
542 bf_close (p->index_BFile);
544 for (i = 0; i<REC_BLOCK_TYPES; i++)
/* rec_open sets data_BFile[i] to NULL before opening, so unopened files are skipped. */
546 if (p->data_BFile[i])
547 bf_close (p->data_BFile[i]);
548 xfree (p->data_fname[i]);
/*
 * rec_get_int: returns record `sysno`, from the cache when present and
 * otherwise by reading, decompressing and decoding its data blocks.
 * Returns NULL when the record does not exist or has been deleted.
 * NOTE(review): many lines (loop headers, conditions, the final return,
 * error exits) are omitted from this listing.
 */
555 static Record rec_get_int (Records p, int sysno)
559 struct record_index_entry entry;
560 int freeblock, dst_type;
567 char compression_method;
/* Cache hit: hand out a copy made with rec_cp. */
572 if ((recp = rec_cache_lookup (p, sysno, recordFlagNop)))
573 return rec_cp (*recp);
/* Read with the ignore-error flag set so that a missing entry is not logged. */
575 if (read_indx (p, sysno, &entry, sizeof(entry), 1) < 1)
576 return NULL; /* record is not there! */
579 return NULL; /* record is deleted */
/* Decode entry.next: low 3 bits = block type, remaining bits = block number. */
581 dst_type = entry.next & 7;
582 assert (dst_type < REC_BLOCK_TYPES);
583 freeblock = entry.next / 8;
585 assert (freeblock > 0);
587 rec_tmp_expand (p, entry.size);
/* First block of the chain; its leading int is the number of the next block. */
590 r = bf_read (p->data_BFile[dst_type], freeblock, 0, 0, cptr);
593 memcpy (&freeblock, cptr, sizeof(freeblock));
/*
 * Following blocks are read so that their leading link int lands on the
 * tail of the data already in the buffer: advance by block size minus the
 * link, save the bytes about to be overwritten, read the block, pick up
 * its link, then restore the saved bytes.
 */
599 cptr += p->head.block_size[dst_type] - sizeof(freeblock);
601 memcpy (&tmp, cptr, sizeof(tmp));
602 r = bf_read (p->data_BFile[dst_type], freeblock, 0, 0, cptr);
605 memcpy (&freeblock, cptr, sizeof(freeblock));
606 memcpy (cptr, &tmp, sizeof(tmp));
609 rec = (Record) xmalloc (sizeof(*rec));
/*
 * Prefix layout (as written by rec_write_multiple): int link, short ref
 * count, one byte compression method, then the payload.
 */
611 memcpy (&compression_method, p->tmp_buf + sizeof(int) + sizeof(short),
612 sizeof(compression_method));
613 in_buf = p->tmp_buf + sizeof(int) + sizeof(short) + sizeof(char);
614 in_size = entry.size - sizeof(short) - sizeof(char);
615 switch (compression_method)
617 case REC_COMPRESS_BZIP2:
/* Initial guess for the decompressed size: 20 times the stored size plus 100. */
619 bz_size = entry.size * 20 + 100;
622 bz_buf = (char *) xmalloc (bz_size);
/* The name of the bzip2 entry point depends on whether BZ_CONFIG_ERROR is defined. */
623 #ifdef BZ_CONFIG_ERROR
624 i = BZ2_bzBuffToBuffDecompress
626 i = bzBuffToBuffDecompress
628 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
629 logf (LOG_LOG, "decompress %5d %5d", in_size, bz_size);
632 logf (LOG_LOG, "failed");
639 logf (LOG_FATAL, "cannot decompress record(s) in BZIP2 format");
643 case REC_COMPRESS_NONE:
646 for (i = 0; i<REC_NO_INFO; i++)
/* Walk the serialised records in the payload until the requested sysno is found. */
649 nptr = in_buf; /* skip ref count */
650 while (nptr < in_buf + in_size)
654 rec_decode_unsigned (&this_sysno, nptr, &len);
657 for (i = 0; i < REC_NO_INFO; i++)
660 rec_decode_unsigned (&this_size, nptr, &len);
/* Stored lengths are size + 1 (see rec_cache_flush_block1), hence the -1. */
665 rec->size[i] = this_size-1;
670 nptr += rec->size[i];
675 if (this_sysno == sysno)
/*
 * Non-empty fields are copied into fresh buffers with one extra byte for a
 * terminating NUL.
 */
678 for (i = 0; i<REC_NO_INFO; i++)
680 if (rec->info[i] && rec->size[i])
682 char *np = xmalloc (rec->size[i]+1);
683 memcpy (np, rec->info[i], rec->size[i]);
684 np[rec->size[i]] = '\0';
689 assert (rec->info[i] == 0);
690 assert (rec->size[i] == 0);
/* The decoded record is cached with no pending action. */
694 rec_cache_insert (p, rec, recordFlagNop);
/*
 * rec_get: public wrapper that calls rec_get_int for `sysno` while holding
 * p->mutex.
 */
698 Record rec_get (Records p, int sysno)
701 zebra_mutex_lock (&p->mutex);
703 rec = rec_get_int (p, sysno);
704 zebra_mutex_unlock (&p->mutex);
/*
 * rec_new_int: allocates a new Record, assigns it a record number, counts it
 * in p->head.no_records and inserts it into the cache flagged as new.
 * NOTE(review): the else keyword, the field initialisation inside the final
 * loop and the return are on lines omitted from this listing.
 */
708 static Record rec_new_int (Records p)
714 rec = (Record) xmalloc (sizeof(*rec));
/*
 * NOTE(review): the leading "1 ||" makes this condition always true, so a
 * fresh number is always taken from index_last and the free-slot reuse code
 * that follows appears never to run.
 */
715 if (1 || p->head.index_free == 0)
716 sysno = (p->head.index_last)++;
719 struct record_index_entry entry;
/* Reuse path: pop the head of the free index list. */
721 read_indx (p, p->head.index_free, &entry, sizeof(entry), 0);
722 sysno = p->head.index_free;
723 p->head.index_free = entry.next;
725 (p->head.no_records)++;
727 for (i = 0; i < REC_NO_INFO; i++)
732 rec_cache_insert (p, rec, recordFlagNew);
/*
 * rec_new: public wrapper that calls rec_new_int while holding p->mutex.
 */
736 Record rec_new (Records p)
739 zebra_mutex_lock (&p->mutex);
741 rec = rec_new_int (p);
742 zebra_mutex_unlock (&p->mutex);
/*
 * rec_del: marks the record *recpp for deletion, under p->mutex.
 * NOTE(review): what happens to a cached copy and to *recpp afterwards is
 * on lines omitted from this listing.
 */
746 void rec_del (Records p, Record *recpp)
750 zebra_mutex_lock (&p->mutex);
/* The record count is decremented before the cache is consulted. */
751 (p->head.no_records)--;
/* Already cached: the lookup is asked to apply recordFlagDelete to the entry. */
752 if ((recp = rec_cache_lookup (p, (*recpp)->sysno, recordFlagDelete)))
/* Otherwise the record is inserted into the cache flagged for deletion. */
759 rec_cache_insert (p, *recpp, recordFlagDelete);
762 zebra_mutex_unlock (&p->mutex);
/*
 * rec_put: hands the record *recpp over for writing, under p->mutex.
 * NOTE(review): what happens to a cached copy and to *recpp afterwards is
 * on lines omitted from this listing.
 */
766 void rec_put (Records p, Record *recpp)
770 zebra_mutex_lock (&p->mutex);
/* Already cached: the lookup is asked to apply recordFlagWrite to the entry. */
771 if ((recp = rec_cache_lookup (p, (*recpp)->sysno, recordFlagWrite)))
/* Otherwise the record is inserted into the cache flagged for writing. */
778 rec_cache_insert (p, *recpp, recordFlagWrite);
781 zebra_mutex_unlock (&p->mutex);
/*
 * rec_rm: releases the memory of the record *recpp.
 * NOTE(review): freeing of the Record structure itself and any reset of
 * *recpp are on lines omitted from this listing.
 */
785 void rec_rm (Record *recpp)
/* Every info buffer of the record is freed. */
791 for (i = 0; i < REC_NO_INFO; i++)
792 xfree ((*recpp)->info[i]);
/*
 * rec_cp: builds a newly allocated copy of `rec`, duplicating the sysno and
 * the info buffers.
 * NOTE(review): the lines between the loop header and the copy statements
 * are omitted from this listing (handling of absent fields presumably lives
 * there - confirm), as is the return.
 */
797 Record rec_cp (Record rec)
802 n = (Record) xmalloc (sizeof(*n));
803 n->sysno = rec->sysno;
804 for (i = 0; i < REC_NO_INFO; i++)
/* The copy gets exactly size[i] bytes; no terminating byte is added here. */
812 n->size[i] = rec->size[i];
813 n->info[i] = (char *) xmalloc (rec->size[i]);
814 memcpy (n->info[i], rec->info[i], rec->size[i]);
/*
 * rec_strdup: string duplication helper that reports a length through *len.
 * NOTE(review): how *len is computed and how the copy is filled are on lines
 * omitted from this listing.
 */
820 char *rec_strdup (const char *s, size_t *len)
/* The new buffer is exactly *len bytes long. */
830 p = (char *) xmalloc (*len);