1 /* $Id: filter_load_balance.cpp,v 1.10 2008-02-20 15:07:52 adam Exp $
2 Copyright (c) 2005-2007, Index Data.
4 This file is part of Metaproxy.
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE. If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
23 #include "session.hpp"
24 #include "package.hpp"
26 #include "filter_load_balance.hpp"
30 #include <boost/thread/mutex.hpp>
31 #include <boost/date_time/posix_time/posix_time.hpp>
35 // remove max macro if already defined (defined later in <limits>)
45 namespace mp = metaproxy_1;
46 namespace yf = mp::filter;
48 namespace metaproxy_1 {
50 class LoadBalance::Impl {
54 void process(metaproxy_1::Package & package);
55 void configure(const xmlNode * ptr);
57 // statistic manipulating functions,
58 void add_dead(unsigned long session_id);
59 //void clear_dead(unsigned long session_id);
60 void add_package(unsigned long session_id);
61 void remove_package(unsigned long session_id);
62 void add_session(unsigned long session_id, std::string target);
63 void remove_session(unsigned long session_id);
64 std::string find_session_target(unsigned long session_id);
67 unsigned int cost(std::string target);
68 unsigned int dead(std::string target);
73 unsigned int sessions;
74 unsigned int packages;
77 unsigned int c = sessions + packages + deads;
78 //std::cout << "stats c:" << c
79 // << " s:" << sessions
80 // << " p:" << packages
87 // local protected databases
89 std::map<std::string, TargetStat> m_target_stat;
90 std::map<unsigned long, std::string> m_session_target;
95 // define Pimpl wrapper forwarding to Impl
97 yf::LoadBalance::LoadBalance() : m_p(new Impl)
101 yf::LoadBalance::~LoadBalance()
102 { // must have a destructor because of boost::scoped_ptr
105 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only)
107 m_p->configure(xmlnode);
110 void yf::LoadBalance::process(mp::Package &package) const
112 m_p->process(package);
116 // define Implementation stuff
120 yf::LoadBalance::Impl::Impl()
124 yf::LoadBalance::Impl::~Impl()
128 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
132 void yf::LoadBalance::Impl::process(mp::Package &package)
135 bool is_closed_front = false;
136 bool is_closed_back = false;
138 // checking for closed front end packages
139 if (package.session().is_closed()){
140 is_closed_front = true;
143 Z_GDU *gdu_req = package.request().get();
145 // passing anything but z3950 packages
146 if (gdu_req && gdu_req->which == Z_GDU_Z3950){
148 // target selecting only on Z39.50 init request
149 if (gdu_req->u.z3950->which == Z_APDU_initRequest){
151 mp::odr odr_en(ODR_ENCODE);
152 Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
154 // extracting virtual hosts
155 std::list<std::string> vhosts;
157 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
159 // choosing one target according to load-balancing algorithm
163 unsigned int cost = std::numeric_limits<unsigned int>::max();
165 { //locking scope for local databases
166 boost::mutex::scoped_lock scoped_lock(m_mutex);
168 // load-balancing algorithm goes here
169 //target = *vhosts.begin();
170 for(std::list<std::string>::const_iterator ivh
174 if ((*ivh).size() != 0){
176 = yf::LoadBalance::Impl::cost(*ivh);
184 // updating local database
185 add_session(package.session().id(), target);
186 yf::LoadBalance::Impl::cost(target);
187 add_package(package.session().id());
190 // copying new target into init package
191 mp::util::set_vhost_otherinfo(&(org_init->otherInfo),
193 package.request() = gdu_req;
197 // frontend Z39.50 close request is added to statistics and marked
198 else if (gdu_req->u.z3950->which == Z_APDU_close){
199 is_closed_front = true;
200 boost::mutex::scoped_lock scoped_lock(m_mutex);
201 add_package(package.session().id());
203 // any other Z39.50 package is added to statistics
205 boost::mutex::scoped_lock scoped_lock(m_mutex);
206 add_package(package.session().id());
210 // moving all package types
214 // checking for closed back end packages
215 if (package.session().is_closed())
216 is_closed_back = true;
218 Z_GDU *gdu_res = package.response().get();
220 // passing anything but z3950 packages
221 if (gdu_res && gdu_res->which == Z_GDU_Z3950){
223 // session closing only on Z39.50 close response
224 if (gdu_res->u.z3950->which == Z_APDU_close){
225 is_closed_back = true;
226 boost::mutex::scoped_lock scoped_lock(m_mutex);
227 remove_package(package.session().id());
229 // any other Z39.50 package is removed from statistics
231 boost::mutex::scoped_lock scoped_lock(m_mutex);
232 remove_package(package.session().id());
236 // finally removing sessions and marking deads
237 if (is_closed_back || is_closed_front){
238 boost::mutex::scoped_lock scoped_lock(m_mutex);
240 // marking backend dead if backend closed without fronted close
241 if (is_closed_front == false)
242 add_dead(package.session().id());
244 remove_session(package.session().id());
246 // making sure that package is closed
247 package.session().close();
251 // getting timestamp for receiving of package
252 //boost::posix_time::ptime receive_time
253 // = boost::posix_time::microsec_clock::local_time();
254 // //<< receive_time << " "
255 // //<< to_iso_string(receive_time) << " "
256 //<< to_iso_extended_string(receive_time) << " "
259 // statistic manipulating functions,
260 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
263 std::string target = find_session_target(session_id);
265 if (target.size() != 0){
266 std::map<std::string, TargetStat>::iterator itarg;
267 itarg = m_target_stat.find(target);
268 if (itarg != m_target_stat.end()
269 && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
270 itarg->second.deads += 1;
271 // std:.cout << "add_dead " << session_id << " " << target
272 // << " d:" << itarg->second.deads << "\n";
277 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
278 // std::cout << "clear_dead " << session_id << "\n";
281 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
283 std::string target = find_session_target(session_id);
285 if (target.size() != 0){
286 std::map<std::string, TargetStat>::iterator itarg;
287 itarg = m_target_stat.find(target);
288 if (itarg != m_target_stat.end()
289 && itarg->second.packages
290 < std::numeric_limits<unsigned int>::max()){
291 itarg->second.packages += 1;
292 // std:.cout << "add_package " << session_id << " " << target
293 // << " p:" << itarg->second.packages << "\n";
298 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
299 std::string target = find_session_target(session_id);
301 if (target.size() != 0){
302 std::map<std::string, TargetStat>::iterator itarg;
303 itarg = m_target_stat.find(target);
304 if (itarg != m_target_stat.end()
305 && itarg->second.packages > 0){
306 itarg->second.packages -= 1;
307 // std:.cout << "remove_package " << session_id << " " << target
308 // << " p:" << itarg->second.packages << "\n";
313 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
316 // finding and adding session
317 std::map<unsigned long, std::string>::iterator isess;
318 isess = m_session_target.find(session_id);
319 if (isess == m_session_target.end()){
320 m_session_target.insert(std::make_pair(session_id, target));
323 // finding and adding target statistics
324 std::map<std::string, TargetStat>::iterator itarg;
325 itarg = m_target_stat.find(target);
326 if (itarg == m_target_stat.end()){
329 stat.packages = 0; // no idea why the defaut constructor TargetStat()
330 stat.deads = 0; // is not initializig this correctly to zero ??
331 m_target_stat.insert(std::make_pair(target, stat));
332 // std:.cout << "add_session " << session_id << " " << target
335 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
337 itarg->second.sessions += 1;
338 // std:.cout << "add_session " << session_id << " " << target
339 // << " s:" << itarg->second.sessions << "\n";
343 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
348 std::map<unsigned long, std::string>::iterator isess;
349 isess = m_session_target.find(session_id);
350 if (isess == m_session_target.end())
353 target = isess->second;
355 // finding target statistics
356 std::map<std::string, TargetStat>::iterator itarg;
357 itarg = m_target_stat.find(target);
358 if (itarg == m_target_stat.end()){
359 m_session_target.erase(isess);
363 // counting session down
364 if (itarg->second.sessions > 0)
365 itarg->second.sessions -= 1;
367 // std:.cout << "remove_session " << session_id << " " << target
368 // << " s:" << itarg->second.sessions << "\n";
370 // clearing empty sessions and targets
371 if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
372 m_target_stat.erase(itarg);
373 m_session_target.erase(isess);
378 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
381 std::map<unsigned long, std::string>::iterator isess;
382 isess = m_session_target.find(session_id);
383 if (isess != m_session_target.end())
384 target = isess->second;
390 unsigned int yf::LoadBalance::Impl::cost(std::string target){
394 if (target.size() != 0){
395 std::map<std::string, TargetStat>::iterator itarg;
396 itarg = m_target_stat.find(target);
397 if (itarg != m_target_stat.end()){
398 cost = itarg->second.cost();
402 //std::cout << "cost " << target << " c:" << cost << "\n";
406 unsigned int yf::LoadBalance::Impl::dead(std::string target){
410 if (target.size() != 0){
411 std::map<std::string, TargetStat>::iterator itarg;
412 itarg = m_target_stat.find(target);
413 if (itarg != m_target_stat.end()){
414 dead = itarg->second.deads;
418 //std::cout << "dead " << target << " d:" << dead << "\n";
425 static mp::filter::Base* filter_creator()
427 return new mp::filter::LoadBalance;
431 struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
442 * indent-tabs-mode: nil
443 * c-file-style: "stroustrup"
445 * vim: shiftwidth=4 tabstop=8 expandtab