1 /* $Id: filter_load_balance.cpp,v 1.3 2007-01-03 16:25:24 marc Exp $
2 Copyright (c) 2005-2006, Index Data.
4 See the LICENSE file for details
11 #include "filter_load_balance.hpp"
14 #include <boost/thread/mutex.hpp>
15 #include <boost/date_time/posix_time/posix_time.hpp>
24 namespace mp = metaproxy_1;
25 namespace yf = mp::filter;
27 namespace metaproxy_1 {
// Private implementation of the LoadBalance filter; the public LoadBalance
// methods forward to it through m_p.
// NOTE(review): this listing omits several lines of the class (access
// specifiers, constructor/destructor declarations, parts of the nested
// TargetStat type, and members such as the m_mutex that process() locks).
29 class LoadBalance::Impl {
// entry points called by the LoadBalance wrapper
33 void process(metaproxy_1::Package & package);
34 void configure(const xmlNode * ptr);
36 // statistic manipulating functions,
// the add_*/remove_* definitions map session_id to a target name and then
// adjust that target's counters in m_target_stat
37 void add_dead(unsigned long session_id);
38 //void clear_dead(unsigned long session_id);
39 void add_package(unsigned long session_id);
40 void remove_package(unsigned long session_id);
41 void add_session(unsigned long session_id, std::string target);
42 void remove_session(unsigned long session_id);
// looks session_id up in m_session_target; callers treat an empty result
// (size() == 0) as "no target recorded"
43 std::string find_session_target(unsigned long session_id);
// cost(): fetches TargetStat::cost() for the entry stored under target
46 unsigned int cost(std::string target);
// dead(): only a trace print is visible in its definition
47 unsigned int dead(std::string target);
// per-target counters; a `deads` counter is also read by the sum below,
// but its declaration is not visible in this listing
52 unsigned int sessions;
53 unsigned int packages;
// presumably the body of TargetStat::cost(): the cost of a target is the
// plain sum of its three counters
56 unsigned int c = sessions + packages + deads;
57 std::cout << "stats c:" << c
66 // local protected databases
// target name -> counters for that target
68 std::map<std::string, TargetStat> m_target_stat;
// session id -> target name chosen for that session
69 std::map<unsigned long, std::string> m_session_target;
74 // define Pimpl wrapper forwarding to Impl
// Constructs the filter and heap-allocates its private Impl, which m_p
// takes over. The constructor body is not visible in this listing.
76 yf::LoadBalance::LoadBalance() : m_p(new Impl)
// Destructor defined out of line, after Impl has been declared above:
// boost::scoped_ptr can only delete a complete type, so the destructor
// must live where Impl is fully known.
80 yf::LoadBalance::~LoadBalance()
81 { // must have a destructor because of boost::scoped_ptr
// Public configuration hook: hands the XML node unchanged to
// Impl::configure().
84 void yf::LoadBalance::configure(const xmlNode *xmlnode)
86 m_p->configure(xmlnode);
// Public filter entry point: forwards the package to Impl::process().
// This method is const while Impl::process() is not; the call goes through
// the m_p pointer, so the Impl state it points at can still be modified.
89 void yf::LoadBalance::process(mp::Package &package) const
91 m_p->process(package);
95 // define Implementation stuff
// Impl constructor; no member initialisers appear on this line.
// NOTE(review): the body is not visible in this listing.
99 yf::LoadBalance::Impl::Impl()
// Impl destructor.
// NOTE(review): the body is not visible in this listing.
103 yf::LoadBalance::Impl::~Impl()
// Receives the filter's XML configuration node from LoadBalance::configure().
// NOTE(review): the body is not visible in this listing, so what (if
// anything) is read from xmlnode cannot be stated here.
107 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
// Filter entry point. On the request side it updates per-target statistics
// and, for a Z39.50 init request, selects the target for the session; on the
// response side it undoes the package count and records sessions/backends
// that closed.
// NOTE(review): this listing omits a number of lines of the function (the
// rest of the candidate loop and its comparison, the statement under
// "moving all package types", else-branches and closing braces). The
// comments below describe only what the visible lines establish.
111 void yf::LoadBalance::Impl::process(mp::Package &package)
// set when the frontend side is closing (session already closed on entry,
// or a Z39.50 close request); read again on the response side to decide
// whether a backend close counts as "dead"
114 bool is_closed_front = false;
116 // checking for closed front end packages
117 if (package.session().is_closed()){
118 is_closed_front = true;
121 Z_GDU *gdu_req = package.request().get();
123 // passing anything but z3950 packages
124 if (gdu_req && gdu_req->which == Z_GDU_Z3950){
126 // target selecting only on Z39.50 init request
127 if (gdu_req->u.z3950->which == Z_APDU_initRequest){
129 mp::odr odr_en(ODR_ENCODE);
130 Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
132 // extracting virtual hosts
// the vhost entries are taken out of the init request's otherInfo and
// collected in vhosts; they are the candidate targets
133 std::list<std::string> vhosts;
135 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
137 // choosing one target according to load-balancing algorithm
// starts at the largest unsigned value; presumably so that the first
// candidate's cost compares lower (the comparison is on lines not shown)
141 unsigned int cost = std::numeric_limits<unsigned int>::max();
143 { //locking scope for local databases
144 boost::mutex::scoped_lock scoped_lock(m_mutex);
146 // load-balancing algorithm goes here
147 //target = *vhosts.begin();
// walks the candidate vhosts, skipping empty names, and asks cost() for
// each one
148 for(std::list<std::string>::const_iterator ivh
152 if ((*ivh).size() != 0){
154 = yf::LoadBalance::Impl::cost(*ivh);
162 // updating local database
163 add_session(package.session().id(), target);
// return value is discarded; cost() also prints the value, so this call
// appears to serve only as a trace of the cost after add_session()
164 yf::LoadBalance::Impl::cost(target);
165 add_package(package.session().id());
168 // copying new target into init package
169 mp::util::set_vhost_otherinfo(&(org_init->otherInfo),
171 package.request() = gdu_req;
175 // frontend Z39.50 close request is added to statistics and marked
176 else if (gdu_req->u.z3950->which == Z_APDU_close){
177 is_closed_front = true;
178 boost::mutex::scoped_lock scoped_lock(m_mutex);
179 add_package(package.session().id());
181 // any other Z39.50 package is added to statistics
183 boost::mutex::scoped_lock scoped_lock(m_mutex);
184 add_package(package.session().id());
188 // moving all package types
// from here on the code looks at the session state and the response
192 // checking for closed back end packages
193 if (package.session().is_closed()) {
194 boost::mutex::scoped_lock scoped_lock(m_mutex);
196 // marking backend dead if backend closed without frontend close
197 if (is_closed_front == false)
198 add_dead(package.session().id());
// add_dead() resolves the target through the session map, so it has to
// run before the session entry is removed
200 remove_session(package.session().id());
203 Z_GDU *gdu_res = package.response().get();
205 // passing anything but z3950 packages
206 if (gdu_res && gdu_res->which == Z_GDU_Z3950){
208 // session closing only on Z39.50 close response
209 if (gdu_res->u.z3950->which == Z_APDU_close){
210 boost::mutex::scoped_lock scoped_lock(m_mutex);
211 remove_package(package.session().id());
213 // marking backend dead if backend closed without frontend close
214 if (is_closed_front == false)
215 add_dead(package.session().id());
// the session entry is deliberately left in place here (call disabled)
217 //remove_session(package.session().id());
219 // any other Z39.50 package is removed from statistics
221 boost::mutex::scoped_lock scoped_lock(m_mutex);
222 remove_package(package.session().id());
227 // getting timestamp for receiving of package
228 //boost::posix_time::ptime receive_time
229 // = boost::posix_time::microsec_clock::local_time();
230 // //<< receive_time << " "
231 // //<< to_iso_string(receive_time) << " "
232 //<< to_iso_extended_string(receive_time) << " "
235 // statistic manipulating functions,
// Adds one "dead" mark to the target that session_id is mapped to and
// prints a trace line. Does nothing when the session has no recorded target
// (empty string) or the target has no entry in m_target_stat.
// Takes no lock itself; the visible call sites in process() acquire m_mutex
// first.
236 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
239 std::string target = find_session_target(session_id);
241 if (target.size() != 0){
242 std::map<std::string, TargetStat>::iterator itarg;
243 itarg = m_target_stat.find(target);
244 if (itarg != m_target_stat.end()){
245 itarg->second.deads += 1;
246 std::cout << "add_dead " << session_id << " " << target
247 << " d:" << itarg->second.deads << "\n";
252 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
253 // std::cout << "clear_dead " << session_id << "\n";
// Counts one more package for the target that session_id is mapped to and
// prints a trace line. Does nothing when the session has no recorded target
// or the target has no entry in m_target_stat.
// Does no locking of its own.
// NOTE(review): callers in process() appear to hold m_mutex - confirm for
// the init-request path, whose scope end is not visible.
256 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
258 std::string target = find_session_target(session_id);
260 if (target.size() != 0){
261 std::map<std::string, TargetStat>::iterator itarg;
262 itarg = m_target_stat.find(target);
263 if (itarg != m_target_stat.end()){
264 itarg->second.packages += 1;
265 std::cout << "add_package " << session_id << " " << target
266 << " p:" << itarg->second.packages << "\n";
// Counts one package less for the target that session_id is mapped to and
// prints a trace line. Does nothing when the session has no recorded
// target, the target has no entry in m_target_stat, or the counter is
// already zero.
271 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
272 std::string target = find_session_target(session_id);
274 if (target.size() != 0){
275 std::map<std::string, TargetStat>::iterator itarg;
276 itarg = m_target_stat.find(target);
// the > 0 test keeps the unsigned counter from wrapping around
277 if (itarg != m_target_stat.end()
278 && itarg->second.packages > 0){
279 itarg->second.packages -= 1;
280 std::cout << "remove_package " << session_id << " " << target
281 << " p:" << itarg->second.packages << "\n";
// Records that session_id uses target and counts the session in the
// target's statistics, creating the statistics entry on first use.
// NOTE(review): this listing omits lines of the function (rest of the
// signature, the declaration of `stat`, the else-branch header, braces).
286 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
289 // finding and adding session
290 std::map<unsigned long, std::string>::iterator isess;
291 isess = m_session_target.find(session_id);
// an existing mapping for this session id is left as it is
292 if (isess == m_session_target.end()){
293 m_session_target.insert(std::make_pair(session_id, target));
296 // finding and adding target statistics
297 std::map<std::string, TargetStat>::iterator itarg;
298 itarg = m_target_stat.find(target);
// first session for this target: build a fresh TargetStat and insert it
299 if (itarg == m_target_stat.end()){
// NOTE(review): if TargetStat has no user-provided constructor and `stat`
// is declared without an initialiser, its unsigned members are
// default-initialised (indeterminate values), which would explain the
// observation below - confirm against the struct definition.
302 stat.packages = 0; // no idea why the default constructor TargetStat()
303 stat.deads = 0; // is not initializing this correctly to zero ??
304 m_target_stat.insert(std::make_pair(target, stat));
305 std::cout << "add_session " << session_id << " " << target
// target already known: just count one more session
308 itarg->second.sessions += 1;
309 std::cout << "add_session " << session_id << " " << target
310 << " s:" << itarg->second.sessions << "\n";
// Counts one session less for the target that session_id is mapped to and
// drops bookkeeping entries that are no longer needed.
// NOTE(review): this listing omits lines of the function (the declaration
// of `target`, the statements after the two "not found" tests - presumably
// early returns - and closing braces).
316 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
321 std::map<unsigned long, std::string>::iterator isess;
322 isess = m_session_target.find(session_id);
// unknown session: nothing to look up
323 if (isess == m_session_target.end())
326 target = isess->second;
328 // finding target statistics
329 std::map<std::string, TargetStat>::iterator itarg;
330 itarg = m_target_stat.find(target);
// session points at a target without statistics: drop the stale mapping
331 if (itarg == m_target_stat.end()){
332 m_session_target.erase(isess);
336 // counting session down
// the > 0 test keeps the unsigned counter from wrapping around
337 if (itarg->second.sessions > 0)
338 itarg->second.sessions -= 1;
340 std::cout << "remove_session " << session_id << " " << target
341 << " s:" << itarg->second.sessions << "\n";
343 // clearing empty sessions and targets
// a target that still carries dead marks is kept, so its deads value keeps
// contributing to its cost
// NOTE(review): whether the session-map erase below sits inside this `if`
// cannot be told from the listing - confirm.
344 if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
345 m_target_stat.erase(itarg);
346 m_session_target.erase(isess);
// Looks session_id up in m_session_target and, when found, copies the
// mapped target name into `target`.
// NOTE(review): the return type line, the declaration of `target` and the
// return statement are not visible in this listing. The class declares a
// std::string return, and callers treat size() == 0 as "not found", so an
// empty string is presumably returned for unknown sessions.
351 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
354 std::map<unsigned long, std::string>::iterator isess;
355 isess = m_session_target.find(session_id);
356 if (isess != m_session_target.end())
357 target = isess->second;
// Reports the current cost of target: for a non-empty name with an entry in
// m_target_stat, `cost` is taken from that entry's TargetStat::cost().
// Always prints a trace line with the resulting value.
// NOTE(review): the declaration/initial value of `cost` (used when the
// target is empty or unknown) and the return statement are not visible.
363 unsigned int yf::LoadBalance::Impl::cost(std::string target){
367 if (target.size() != 0){
368 std::map<std::string, TargetStat>::iterator itarg;
369 itarg = m_target_stat.find(target);
370 if (itarg != m_target_stat.end()){
371 cost = itarg->second.cost();
375 std::cout << "cost " << target << " c:" << cost << "\n";
// Prints a trace line naming target.
// NOTE(review): the rest of the body, including the value returned, is not
// visible in this listing.
379 unsigned int yf::LoadBalance::Impl::dead(std::string target){
380 std::cout << "dead " << target << "\n";
// File-local factory: allocates a new LoadBalance filter and returns it as
// a pointer to the filter base class. The object is created with plain
// `new`, so whoever receives the pointer is responsible for deleting it.
387 static mp::filter::Base* filter_creator()
389 return new mp::filter::LoadBalance;
// Filter descriptor object for this module.
// NOTE(review): the initialiser is not visible in this listing; presumably
// it is what exposes filter_creator to the rest of metaproxy - confirm.
393 struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
404 * indent-tabs-mode: nil
405 * c-file-style: "stroustrup"
407 * vim: shiftwidth=4 tabstop=8 expandtab