* Copyright (c) 1998-2004, Index Data.
* See the file LICENSE for details.
*
- * $Id: yaz-proxy.cpp,v 1.97 2004-02-02 15:11:41 adam Exp $
+ * $Id: yaz-proxy.cpp,v 1.106 2004-03-01 17:01:52 adam Exp $
*/
+#include <unistd.h>
#include <assert.h>
#include <time.h>
+#include <sys/types.h>
+#include <fcntl.h>
#include <yaz/srw.h>
#include <yaz/marcdisp.h>
m_request_no = 0;
m_invalid_session = 0;
m_referenceId = 0;
+ m_referenceId_mem = nmem_create();
m_config = 0;
m_marcxml_flag = 0;
m_stylesheet_xsp = 0;
m_schema = 0;
m_initRequest_apdu = 0;
m_initRequest_mem = 0;
+ m_initRequest_preferredMessageSize = 0;
+ m_initRequest_maximumRecordSize = 0;
m_initRequest_options = 0;
m_initRequest_version = 0;
m_apdu_invalid_session = 0;
m_s2z_packing = Z_SRW_recordPacking_string;
m_time_tv.tv_sec = 0;
m_time_tv.tv_usec = 0;
+ if (!m_parent)
+ low_socket_open();
}
Yaz_Proxy::~Yaz_Proxy()
m_bytes_sent, m_bytes_recv);
nmem_destroy(m_initRequest_mem);
nmem_destroy(m_mem_invalid_session);
+ nmem_destroy(m_referenceId_mem);
+
xfree (m_proxyTarget);
xfree (m_default_target);
xfree (m_proxy_authentication);
odr_destroy(m_s2z_odr_init);
if (m_s2z_odr_search)
odr_destroy(m_s2z_odr_search);
+ if (!m_parent)
+ low_socket_close();
delete m_config;
}
const char *Yaz_Proxy::load_balance(const char **url)
{
int zurl_in_use[MAX_ZURL_PLEX];
+ int zurl_in_spare[MAX_ZURL_PLEX];
Yaz_ProxyClient *c;
int i;
for (i = 0; i<MAX_ZURL_PLEX; i++)
+ {
zurl_in_use[i] = 0;
+ zurl_in_spare[i] = 0;
+ }
for (c = m_parent->m_clientPool; c; c = c->m_next)
{
for (i = 0; url[i]; i++)
if (!strcmp(url[i], c->get_hostname()))
+ {
zurl_in_use[i]++;
+ if (c->m_cookie == 0 && c->m_server == 0 && c->m_waiting == 0)
+ zurl_in_spare[i]++;
+ }
}
- int min = 100000;
- const char *ret = 0;
+ int min_use = 100000;
+ int spare_for_min = 0;
+ int max_spare = 0;
+ const char *ret_min = 0;
+ const char *ret_spare = 0;
for (i = 0; url[i]; i++)
{
- yaz_log(LOG_DEBUG, "%szurl=%s use=%d",
- m_session_str, url[i], zurl_in_use[i]);
- if (min > zurl_in_use[i])
+ yaz_log(LOG_DEBUG, "%szurl=%s use=%d spare=%d",
+ m_session_str, url[i], zurl_in_use[i], zurl_in_spare[i]);
+ if (min_use > zurl_in_use[i])
{
- ret = url[i];
- min = zurl_in_use[i];
+ ret_min = url[i];
+ min_use = zurl_in_use[i];
+ spare_for_min = zurl_in_spare[i];
+ }
+ if (max_spare < zurl_in_spare[i])
+ {
+ ret_spare = url[i];
+ max_spare = zurl_in_spare[i];
}
}
- return ret;
+ // use the one with minimum connections if spare is > 3
+ if (spare_for_min > 3)
+ return ret_min;
+ // use one with most spares (if any)
+ if (max_spare > 0)
+ return ret_spare;
+ return ret_min;
}
Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
!strcmp(m_proxyTarget, c->get_hostname()))
{
// found it in cache
- yaz_log (LOG_LOG, "%sREUSE %s",
- m_session_str, c->get_hostname());
+ yaz_log (LOG_LOG, "%sREUSE %d %s",
+ m_session_str, parent->m_seqno, c->get_hostname());
c->m_seqno = parent->m_seqno;
assert(c->m_server == 0);
c = c_min;
if (c->m_waiting || strcmp(m_proxyTarget, c->get_hostname()))
{
- yaz_log (LOG_LOG, "%sMAXCLIENTS Destroy %d",
- m_session_str, c->m_seqno);
+ yaz_log (LOG_LOG, "%sMAXCLIENTS %d Destroy %d",
+ m_session_str, parent->m_max_clients, c->m_seqno);
if (c->m_server && c->m_server != this)
delete c->m_server;
c->m_server = 0;
}
else
{
- yaz_log (LOG_LOG, "%sMAXCLIENTS Reuse %d %d %s",
- m_session_str,
+ yaz_log (LOG_LOG, "%sMAXCLIENTS %d Reuse %d %d %s",
+ m_session_str, parent->m_max_clients,
c->m_seqno, parent->m_seqno, c->get_hostname());
xfree (c->m_cookie);
c->m_cookie = 0;
yaz_log(LOG_LOG, "%sXSLT convert %d",
m_session_str, m_stylesheet_offset);
res = xsltApplyStylesheet(m_stylesheet_xsp, doc, 0);
+
if (res)
{
xmlChar *out_buf;
xmlFree(out_buf);
xmlFreeDoc(res);
}
+
xmlFreeDoc(doc);
}
}
xsltFreeStylesheet(m_stylesheet_xsp);
m_stylesheet_xsp = 0;
timeout(m_client_idletime);
- int r = send_PDU_convert(m_stylesheet_apdu);
+ send_PDU_convert(m_stylesheet_apdu);
}
else
timeout(0);
er->record.recordData_buf = b;
er->record.recordData_len = len;
er->record.recordPacking = m_s2z_packing;
+ er->record.recordSchema = "http://explain.z3950.org/dtd/2.0/";
er->diagnostics = diagnostics;
er->num_diagnostics = num_diagnostics;
Z_ReferenceId **new_id = get_referenceIdP(apdu);
if (new_id && m_referenceId)
- *new_id = *m_referenceId;
+ *new_id = m_referenceId;
if (apdu->which == Z_APDU_searchResponse)
{
ODR_MASK_SET(nopt, i);
apdu->u.initResponse->protocolVersion = nopt;
}
+ apdu->u.initResponse->preferredMessageSize =
+ odr_intdup(odr_encode(),
+ m_client->m_initResponse_preferredMessageSize >
+ m_initRequest_preferredMessageSize ?
+ m_initRequest_preferredMessageSize :
+ m_client->m_initResponse_preferredMessageSize);
+ apdu->u.initResponse->maximumRecordSize =
+ odr_intdup(odr_encode(),
+ m_client->m_initResponse_maximumRecordSize >
+ m_initRequest_maximumRecordSize ?
+ m_initRequest_maximumRecordSize :
+ m_client->m_initResponse_maximumRecordSize);
}
int r = send_PDU_convert(apdu);
if (r)
&addinfo, &stylesheet_name, &m_schema);
if (stylesheet_name)
{
+ m_parent->low_socket_close();
+
if (m_stylesheet_xsp)
xsltFreeStylesheet(m_stylesheet_xsp);
+
m_stylesheet_xsp = xsltParseStylesheetFile((const xmlChar*)
stylesheet_name);
m_stylesheet_offset = 0;
xfree(stylesheet_name);
+
+ m_parent->low_socket_open();
}
if (err == -1)
{
&addinfo, &stylesheet_name, &m_schema);
if (stylesheet_name)
{
+ m_parent->low_socket_close();
+
if (m_stylesheet_xsp)
xsltFreeStylesheet(m_stylesheet_xsp);
+
m_stylesheet_xsp = xsltParseStylesheetFile((const xmlChar*)
stylesheet_name);
m_stylesheet_offset = 0;
xfree(stylesheet_name);
+
+ m_parent->low_socket_open();
}
if (err == -1)
{
void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu)
{
+ Z_ReferenceId **refid = get_referenceIdP(apdu);
+ nmem_reset(m_referenceId_mem);
+ if (refid && *refid)
+ {
+ m_referenceId = (Z_ReferenceId *)
+ nmem_malloc(m_referenceId_mem, sizeof(*m_referenceId));
+ m_referenceId->len = m_referenceId->size = (*refid)->len;
+ m_referenceId->buf = (unsigned char *)
+ nmem_malloc(m_referenceId_mem, (*refid)->len);
+ memcpy(m_referenceId->buf, (*refid)->buf, (*refid)->len);
+ }
+ else
+ m_referenceId = 0;
+
if (!m_client && m_invalid_session)
{
m_apdu_invalid_session = apdu;
apdu = m_initRequest_apdu;
}
- m_referenceId = get_referenceIdP(apdu);
-
// Determine our client.
Z_OtherInformation **oi;
get_otherInfoAPDU(apdu, &oi);
m_initRequest_apdu = apdu;
m_initRequest_mem = odr_extract_mem(odr_decode());
+ m_initRequest_preferredMessageSize = *apdu->u.initRequest->
+ preferredMessageSize;
+ *apdu->u.initRequest->preferredMessageSize = 1024*1024;
+ m_initRequest_maximumRecordSize = *apdu->u.initRequest->
+ maximumRecordSize;
+ *apdu->u.initRequest->maximumRecordSize = 1024*1024;
+
// save init options for the response..
m_initRequest_options = apdu->u.initRequest->options;
ODR_MASK_SET(apdu->u.initRequest->options, i);
ODR_MASK_CLEAR(apdu->u.initRequest->options,
Z_Options_negotiationModel);
+ ODR_MASK_CLEAR(apdu->u.initRequest->options,
+ Z_Options_concurrentOperations);
// make new version
m_initRequest_version = apdu->u.initRequest->protocolVersion;
{
if (handle_init_response_for_invalid_session(apdu))
return;
- Z_APDU *apdu2 = m_client->m_initResponse;
- apdu2->u.initResponse->otherInfo = 0;
- if (m_client->m_cookie && *m_client->m_cookie)
- set_otherInformationString(apdu2, VAL_COOKIE, 1,
- m_client->m_cookie);
- apdu2->u.initResponse->referenceId =
- apdu->u.initRequest->referenceId;
- apdu2->u.initResponse->options = m_client->m_initResponse_options;
- apdu2->u.initResponse->protocolVersion =
- m_client->m_initResponse_version;
-
- send_to_client(apdu2);
- return;
+ if (m_client->m_initResponse)
+ {
+ Z_APDU *apdu2 = m_client->m_initResponse;
+ apdu2->u.initResponse->otherInfo = 0;
+ if (m_client->m_cookie && *m_client->m_cookie)
+ set_otherInformationString(apdu2, VAL_COOKIE, 1,
+ m_client->m_cookie);
+ apdu2->u.initResponse->referenceId =
+ apdu->u.initRequest->referenceId;
+ apdu2->u.initResponse->options = m_client->m_initResponse_options;
+ apdu2->u.initResponse->protocolVersion =
+ m_client->m_initResponse_version;
+
+ send_to_client(apdu2);
+ return;
+ }
}
m_client->m_init_flag = 1;
}
ODR_MASK_SET(req->options, i);
ODR_MASK_CLEAR(apdu->u.initRequest->options,
Z_Options_negotiationModel);
+ ODR_MASK_CLEAR(apdu->u.initRequest->options,
+ Z_Options_concurrentOperations);
for (i = 0; i<= 10; i++)
ODR_MASK_SET(req->protocolVersion, i);
{
Yaz_ProxyClient *c;
int spare = 0;
+ int spare_waiting = 0;
int in_use = 0;
int other = 0;
for (c = m_clientPool; c; c = c->m_next)
if (c->m_cookie == 0)
{
if (c->m_server == 0)
- spare++;
+ if (c->m_waiting)
+ spare_waiting;
+ else
+ spare++;
else
in_use++;
}
}
}
yaz_log(LOG_LOG, "%spre-init %s %s use=%d other=%d spare=%d "
- "preinit=%d",m_session_str,
- name, zurl_in_use[j], in_use, other, spare, pre_init);
- if (spare < pre_init)
+ "sparew=%d preinit=%d",m_session_str,
+ name, zurl_in_use[j], in_use, other,
+ spare, spare_waiting, pre_init);
+ if (spare + spare_waiting < pre_init)
{
c = new Yaz_ProxyClient(m_PDU_Observable->clone(), this);
c->m_next = m_clientPool;
m_initResponse = 0;
m_initResponse_options = 0;
m_initResponse_version = 0;
+ m_initResponse_preferredMessageSize = 0;
+ m_initResponse_maximumRecordSize = 0;
m_resultSetStartPoint = 0;
m_bytes_sent = m_bytes_recv = 0;
m_pdu_recv = 0;
m_initResponse = apdu;
m_initResponse_options = apdu->u.initResponse->options;
m_initResponse_version = apdu->u.initResponse->protocolVersion;
+ m_initResponse_preferredMessageSize =
+ *apdu->u.initResponse->preferredMessageSize;
+ m_initResponse_maximumRecordSize =
+ *apdu->u.initResponse->maximumRecordSize;
Z_InitResponse *ir = apdu->u.initResponse;
char *im0 = ir->implementationName;
}
}
+void Yaz_Proxy::low_socket_close()
+{
+ int i;
+ for (i = 0; i<NO_SPARE_SOLARIS_FD; i++)
+ if (m_lo_fd[i] >= 0)
+ ::close(m_lo_fd[i]);
+}
+
+void Yaz_Proxy::low_socket_open()
+{
+ int i;
+ for (i = 0; i<NO_SPARE_SOLARIS_FD; i++)
+ m_lo_fd[i] = open("/dev/null", O_RDONLY);
+}
+
int Yaz_Proxy::server(const char *addr)
{
int r = Yaz_Z_Assoc::server(addr);