1 /* This file is part of the YAZ toolkit.
2 * Copyright (C) Index Data
3 * See the file LICENSE for details.
6 * \file zoom-memcached.c
7 * \brief Implements query/record caching using memcached
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
21 #include <yaz/diagbib1.h>
23 #if HAVE_LIBMEMCACHED_MEMCACHED_H
24 #if HAVE_MEMCACHED_RETURN_T
26 typedef memcached_return memcached_return_t;
30 void ZOOM_memcached_init(ZOOM_connection c)
32 #if HAVE_LIBMEMCACHED_MEMCACHED_H
40 void ZOOM_memcached_destroy(ZOOM_connection c)
42 #if HAVE_LIBMEMCACHED_MEMCACHED_H
44 memcached_free(c->mc_st);
48 redisFree(c->redis_c);
52 #if HAVE_LIBMEMCACHED_MEMCACHED_H
53 /* memcached wrapper.. Because memcached function do not exist in older libs */
54 static memcached_st *yaz_memcached_wrap(const char *conf)
56 #if HAVE_MEMCACHED_FUNC
57 return memcached(conf, strlen(conf));
61 memcached_st *mc = memcached_create(0);
62 NMEM nmem = nmem_create();
63 memcached_return_t rc;
65 nmem_strsplit_blank(nmem, conf, &darray, &num);
66 for (i = 0; mc && i < num; i++)
68 if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
70 char *host = darray[i] + 9;
71 char *port = strchr(host, ':');
72 char *weight = strstr(host, "/?");
80 rc = memcached_server_add(mc, host, port ? atoi(port) : 11211);
81 yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
82 host, (unsigned) rc, memcached_strerror(mc, rc));
83 if (rc != MEMCACHED_SUCCESS)
103 static redisContext *create_redis(const char *conf)
107 NMEM nmem = nmem_create();
108 redisContext *context = 0;
110 nmem_strsplit_blank(nmem, conf, &darray, &num);
111 for (i = 0; i < num; i++)
113 if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
115 struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
116 char *host = darray[i] + 9;
117 char *port = strchr(host, ':');
118 char *weight = strstr(host, "/?");
127 context = redisConnectWithTimeout(host,
128 port ? atoi(port) : 6379,
137 int ZOOM_memcached_configure(ZOOM_connection c)
143 redisFree(c->redis_c);
147 #if HAVE_LIBMEMCACHED_MEMCACHED_H
150 memcached_free(c->mc_st);
155 val = ZOOM_options_get(c->options, "redis");
159 struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
161 c->redis_c = redisConnectWithTimeout(val, 6379, timeout);
162 if (c->redis_c == 0 || c->redis_c->err)
164 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
165 "could not create redis");
168 return 0; /* don't bother with memcached if redis is enabled */
170 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
174 val = ZOOM_options_get(c->options, "memcached");
177 #if HAVE_LIBMEMCACHED_MEMCACHED_H
178 c->mc_st = yaz_memcached_wrap(val);
181 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
182 "could not create memcached");
185 memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
188 c->redis_c = create_redis(val);
189 if (c->redis_c == 0 || c->redis_c->err)
191 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
192 "could not create redis");
196 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
205 static void wrbuf_vary_puts(WRBUF w, const char *v)
211 wrbuf_sha1_puts(w, v, 1);
221 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
224 ZOOM_connection c = r->connection;
226 r->mc_key = wrbuf_alloc();
227 wrbuf_puts(r->mc_key, "1;");
228 wrbuf_vary_puts(r->mc_key, c->host_port);
229 wrbuf_puts(r->mc_key, ";");
230 wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
231 wrbuf_puts(r->mc_key, ";");
232 wrbuf_vary_puts(r->mc_key, c->user);
233 wrbuf_puts(r->mc_key, ";");
234 wrbuf_vary_puts(r->mc_key, c->group);
235 wrbuf_puts(r->mc_key, ";");
237 wrbuf_sha1_puts(r->mc_key, c->password, 1);
238 wrbuf_puts(r->mc_key, ";");
240 WRBUF w = wrbuf_alloc();
241 ZOOM_query_get_hash(q, w);
242 wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
245 wrbuf_puts(r->mc_key, ";");
246 wrbuf_vary_puts(r->mc_key, r->req_facets);
250 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
253 if (c->redis_c && resultset->live_set == 0)
259 argv[1] = wrbuf_cstr(resultset->mc_key);
261 reply = redisCommandArgv(c->redis_c, 2, argv, 0);
262 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
263 if (reply && reply->type == REDIS_REPLY_STRING)
265 char *v = reply->str;
266 int v_len = reply->len;
268 size_t lead_len = strlen(v) + 1;
270 resultset->size = odr_atoi(v);
272 yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
273 wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
275 if (v_len > lead_len)
277 Z_OtherInformation *oi = 0;
278 int oi_len = v_len - lead_len;
279 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
280 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
282 yaz_log(YLOG_WARN, "oi decoding failed");
283 freeReplyObject(reply);
286 ZOOM_handle_search_result(c, resultset, oi);
287 ZOOM_handle_facet_result(c, resultset, oi);
289 event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
290 ZOOM_connection_put_event(c, event);
291 resultset->live_set = 1;
293 freeReplyObject(reply);
296 #if HAVE_LIBMEMCACHED_MEMCACHED_H
297 if (c->mc_st && resultset->live_set == 0)
301 memcached_return_t rc;
302 char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
303 wrbuf_len(resultset->mc_key),
304 &v_len, &flags, &rc);
305 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
309 size_t lead_len = strlen(v) + 1;
311 resultset->size = odr_atoi(v);
313 yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
314 wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
316 if (v_len > lead_len)
318 Z_OtherInformation *oi = 0;
319 int oi_len = v_len - lead_len;
320 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
321 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
323 yaz_log(YLOG_WARN, "oi decoding failed");
327 ZOOM_handle_search_result(c, resultset, oi);
328 ZOOM_handle_facet_result(c, resultset, oi);
331 event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
332 ZOOM_connection_put_event(c, event);
333 resultset->live_set = 1;
339 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
340 Z_OtherInformation *oi, const char *precision)
343 if (c->redis_c && resultset->live_set == 0)
346 ODR odr = odr_createmem(ODR_ENCODE);
351 str = odr_malloc(odr, 20 + strlen(precision));
352 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
353 sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
356 z_OtherInformation(odr, &oi, 0, 0);
357 oi_buf = odr_getbuf(odr, &oi_len, 0);
359 key = odr_malloc(odr, strlen(str) + 1 + oi_len);
362 memcpy(key + strlen(str) + 1, oi_buf, oi_len);
370 argv[1] = wrbuf_buf(resultset->mc_key);
371 argvlen[1] = wrbuf_len(resultset->mc_key);
373 argvlen[2] = strlen(str) + 1 + oi_len;
374 reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
375 freeReplyObject(reply);
380 #if HAVE_LIBMEMCACHED_MEMCACHED_H
381 if (c->mc_st && resultset->live_set == 0)
384 memcached_return_t rc;
385 time_t expiration = 36000;
387 ODR odr = odr_createmem(ODR_ENCODE);
392 str = odr_malloc(odr, 20 + strlen(precision));
393 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
394 sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
397 z_OtherInformation(odr, &oi, 0, 0);
398 oi_buf = odr_getbuf(odr, &oi_len, 0);
400 key = odr_malloc(odr, strlen(str) + 1 + oi_len);
403 memcpy(key + strlen(str) + 1, oi_buf, oi_len);
405 rc = memcached_set(c->mc_st,
406 wrbuf_buf(resultset->mc_key),
407 wrbuf_len(resultset->mc_key),
408 key, strlen(str) + 1 + oi_len, expiration, flags);
409 yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
410 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
411 memcached_strerror(c->mc_st, rc));
417 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
419 const char *syntax, const char *elementSetName,
421 Z_SRW_diagnostic *diag)
424 if (r->connection->redis_c &&
425 !diag && npr->which == Z_NamePlusRecord_databaseRecord)
427 WRBUF k = wrbuf_alloc();
428 WRBUF rec_sha1 = wrbuf_alloc();
429 ODR odr = odr_createmem(ODR_ENCODE);
436 z_NamePlusRecord(odr, &npr, 0, 0);
437 rec_buf = odr_getbuf(odr, &rec_len, 0);
439 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
440 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
441 syntax ? syntax : "",
442 elementSetName ? elementSetName : "",
443 schema ? schema : "");
445 wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
449 argv[1] = wrbuf_buf(k);
450 argvlen[1] = wrbuf_len(k);
451 argv[2] = wrbuf_buf(rec_sha1);
452 argvlen[2] = wrbuf_len(rec_sha1);
454 reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
455 yaz_log(YLOG_LOG, "Store record key=%s val=%s",
456 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
457 freeReplyObject(reply);
459 argv[1] = wrbuf_buf(rec_sha1);
460 argvlen[1] = wrbuf_len(rec_sha1);
462 argvlen[2] = rec_len;
464 reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
465 yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
466 wrbuf_cstr(rec_sha1), rec_len);
467 freeReplyObject(reply);
471 wrbuf_destroy(rec_sha1);
474 #if HAVE_LIBMEMCACHED_MEMCACHED_H
475 if (r->connection->mc_st &&
476 !diag && npr->which == Z_NamePlusRecord_databaseRecord)
478 WRBUF k = wrbuf_alloc();
479 WRBUF rec_sha1 = wrbuf_alloc();
481 memcached_return_t rc;
482 time_t expiration = 36000;
483 ODR odr = odr_createmem(ODR_ENCODE);
487 z_NamePlusRecord(odr, &npr, 0, 0);
488 rec_buf = odr_getbuf(odr, &rec_len, 0);
490 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
491 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
492 syntax ? syntax : "",
493 elementSetName ? elementSetName : "",
494 schema ? schema : "");
496 wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
498 rc = memcached_set(r->connection->mc_st,
499 wrbuf_buf(k), wrbuf_len(k),
500 wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
503 yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
504 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
505 memcached_strerror(r->connection->mc_st, rc));
507 rc = memcached_add(r->connection->mc_st,
508 wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
512 yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
513 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
514 memcached_strerror(r->connection->mc_st, rc));
518 wrbuf_destroy(rec_sha1);
523 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
525 const char *elementSetName,
529 if (r->connection && r->connection->redis_c)
531 WRBUF k = wrbuf_alloc();
536 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
537 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
538 syntax ? syntax : "",
539 elementSetName ? elementSetName : "",
540 schema ? schema : "");
542 yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
545 argv[1] = wrbuf_buf(k);
546 argvlen[1] = wrbuf_len(k);
547 reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
550 if (reply1 && reply1->type == REDIS_REPLY_STRING)
553 char *sha1_buf = reply1->str;
554 int sha1_len = reply1->len;
556 yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
561 argvlen[1] = sha1_len;
563 reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
564 if (reply2 && reply2->type == REDIS_REPLY_STRING)
566 Z_NamePlusRecord *npr = 0;
567 char *v_buf = reply2->str;
568 int v_len = reply2->len;
570 odr_setbuf(r->odr, v_buf, v_len, 0);
571 z_NamePlusRecord(r->odr, &npr, 0, 0);
573 yaz_log(YLOG_LOG, "returned redis copy");
574 freeReplyObject(reply2);
575 freeReplyObject(reply1);
578 freeReplyObject(reply2);
580 freeReplyObject(reply1);
583 #if HAVE_LIBMEMCACHED_MEMCACHED_H
584 if (r->connection && r->connection->mc_st)
586 WRBUF k = wrbuf_alloc();
590 memcached_return_t rc;
592 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
593 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
594 syntax ? syntax : "",
595 elementSetName ? elementSetName : "",
596 schema ? schema : "");
598 yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
599 sha1_buf = memcached_get(r->connection->mc_st,
600 wrbuf_buf(k), wrbuf_len(k),
601 &sha1_len, &flags, &rc);
609 yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
610 v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
611 &v_len, &flags, &rc);
615 Z_NamePlusRecord *npr = 0;
617 odr_setbuf(r->odr, v_buf, v_len, 0);
618 z_NamePlusRecord(r->odr, &npr, 0, 0);
621 yaz_log(YLOG_LOG, "returned memcached copy");
633 * c-file-style: "Stroustrup"
634 * indent-tabs-mode: nil
636 * vim: shiftwidth=4 tabstop=8 expandtab