1 /* This file is part of Pazpar2.
2 Copyright (C) 2006-2012 Index Data
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "sel_thread.h"
34 #include <yaz/thread_create.h>
35 #include <yaz/mutex.h>
36 #include <yaz/spipe.h>
/* link to the following item; a null pointer ends the list */
41 struct work_item *next;
/* Walk the singly linked list *q to its final item, i.e. the one whose
 * next pointer is null. Only the traversal is visible in this excerpt.
 * NOTE(review): judging by the name and by the callers, the final item is
 * then unlinked and returned (null for an empty list) -- confirm against
 * the elided lines. */
44 static struct work_item *queue_remove_last(struct work_item **q)
46 struct work_item **work_p = q, *work_this = 0;
/* advance until work_p addresses the pointer to the final item, or the
 * null head pointer when the list is empty */
48 while (*work_p && (*work_p)->next)
49 work_p = &(*work_p)->next;
/* Visit every item of the list starting at q, following the next pointers
 * until a null pointer is reached.
 * NOTE(review): the loop body is not visible here; presumably f is called
 * with each item's data -- confirm. */
58 static void queue_trav(struct work_item *q, void (*f)(void *data))
60 for (; q; q = q->next)
/* Members of the thread-pool state reached through sel_thread_t; the
 * enclosing declaration and some members are not visible in this excerpt. */
/* array of worker thread handles, one entry per created thread */
69 yaz_thread_t *thread_id;
/* work items waiting for a worker: pushed at the head by sel_thread_add(),
 * taken from the tail by the worker threads */
74 struct work_item *input_queue;
/* work items whose handler has run; collected by sel_thread_result() */
75 struct work_item *output_queue;
/* spare list nodes: filled by sel_thread_result(), reused by sel_thread_add() */
76 struct work_item *free_queue;
/* called by a worker thread with the data of each work item */
77 void (*work_handler)(void *work_data);
/* passed to queue_trav() for items still queued at destroy time; may be NULL */
78 void (*work_destroy)(void *work_data);
/* Number of items currently on the input queue; incremented in
 * sel_thread_add(), decremented by the worker threads, and used only in
 * the debug log messages.
 * NOTE(review): this is a single file-level counter, while the updates
 * happen under the per-instance p->mutex -- with more than one sel_thread
 * instance it would be shared and not protected by a common lock; confirm
 * whether that matters. */
81 static int input_queue_length = 0;
/* Entry point of each worker thread; vp is the sel_thread_t that was passed
 * to yaz_thread_create(). It waits for input, runs the work handler on one
 * item and moves that item to the output queue, then writes one byte to the
 * write descriptor. The enclosing loop construct, the shutdown test and the
 * return statement are on lines not visible in this excerpt. */
83 static void *sel_thread_handler(void *vp)
85 sel_thread_t p = (sel_thread_t) vp;
89 struct work_item *work_this = 0;
90 /* wait for some work */
91 yaz_mutex_enter(p->mutex);
/* the condition is re-tested after every return from the wait */
92 while (!p->stop_flag && !p->input_queue)
93 yaz_cond_wait(p->input_data, p->mutex, 0);
94 /* see if we were woken up because we're shutting down */
97 /* got something. Take the last one out of input_queue */
99 assert(p->input_queue);
/* sel_thread_add() pushes at the head, so the last item is the oldest one */
100 work_this = queue_remove_last(&p->input_queue);
101 input_queue_length--;
103 yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
/* the mutex is not held while the work handler runs */
107 yaz_mutex_leave(p->mutex);
109 /* work on this item */
110 p->work_handler(work_this->data);
112 /* put it back into output queue */
113 yaz_mutex_enter(p->mutex);
114 work_this->next = p->output_queue;
115 p->output_queue = work_this;
116 yaz_mutex_leave(p->mutex);
118 /* wake up select/poll with a single byte */
/* NOTE(review): send() and write() are presumably alternatives selected by
 * preprocessor conditionals that are not shown here -- confirm. The return
 * value is deliberately discarded in both. */
120 (void) send(p->write_fd, "", 1, 0);
122 (void) write(p->write_fd, "", 1);
/* NOTE(review): presumably releases the mutex that is still held when the
 * loop is left for shutdown -- confirm against the elided lines */
125 yaz_mutex_leave(p->mutex);
/* Create a pool of worker threads.
 *
 * work_handler   called by a worker with the data of each added item;
 *                must not be NULL (asserted).
 * work_destroy   handed to queue_trav() for items still queued when the
 *                pool is destroyed; may be NULL.
 * read_fd        receives the read descriptor of the internal spipe; the
 *                workers write one byte to the other end per finished item.
 * no_of_threads  number of worker threads to start; must be >= 1 (asserted).
 *
 * NOTE(review): the return statements and the handling of a failed
 * yaz_spipe_create() are not visible in this excerpt; presumably the new
 * handle is returned, or a null pointer after a failure -- confirm. */
129 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
130 void (*work_destroy)(void *work_data),
131 int *read_fd, int no_of_threads)
/* the handle itself is allocated from the NMEM created here */
134 NMEM nmem = nmem_create();
135 sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
137 assert(work_handler);
138 /* work_destroy may be NULL */
140 assert(no_of_threads >= 1);
/* NOTE(review): the two yaz_spipe_create() calls are presumably platform
 * alternatives selected by preprocessor conditionals not shown here */
145 /* use port 12119 temporarily on Windows and hope for the best */
146 p->spipe = yaz_spipe_create(12119, 0);
148 p->spipe = yaz_spipe_create(0, 0);
156 *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe);
157 p->write_fd = yaz_spipe_get_write_fd(p->spipe);
162 p->work_handler = work_handler;
163 p->work_destroy = work_destroy;
164 p->no_threads = 0; /* zero for now, so that an early sel_thread_destroy() joins no threads */
167 yaz_mutex_create(&p->mutex);
168 yaz_cond_create(&p->input_data);
169 if (p->input_data == 0) /* condition variable could not be created? */
171 sel_thread_destroy(p);
175 p->no_threads = no_of_threads;
/* the thread handle array lives in the same NMEM as the pool itself */
176 p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
177 for (i = 0; i < p->no_threads; i++)
178 p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
/* Shut down the pool p: wake all workers, join them, pass the data of the
 * items left on the input and output queues to work_destroy, then release
 * the spipe, the condition variable, the mutex and the NMEM. */
182 void sel_thread_destroy(sel_thread_t p)
/* NOTE(review): the statement between enter and broadcast is not visible;
 * presumably it sets p->stop_flag, which the workers test -- confirm */
185 yaz_mutex_enter(p->mutex);
187 yaz_cond_broadcast(p->input_data);
188 yaz_mutex_leave(p->mutex);
/* no_threads is 0 when creation failed early, so nothing is joined then */
190 for (i = 0; i< p->no_threads; i++)
191 yaz_thread_join(&p->thread_id[i], 0);
/* NOTE(review): work_destroy may be NULL according to sel_thread_create();
 * presumably an elided line guards these two calls -- confirm */
195 queue_trav(p->input_queue, p->work_destroy);
196 queue_trav(p->output_queue, p->work_destroy);
199 yaz_spipe_destroy(p->spipe);
200 yaz_cond_destroy(&p->input_data);
201 yaz_mutex_destroy(&p->mutex);
/* p is not used after this call; sel_thread_create() allocates p from an
 * NMEM (presumably this one -- the assignment of p->nmem is not visible) */
202 nmem_destroy(p->nmem);
/* Queue one piece of work for the pool p and wake a waiting worker.
 * NOTE(review): the line storing data in the list node is not visible in
 * this excerpt; presumably data ends up as the node's data member, which
 * the worker passes to work_handler -- confirm. */
205 void sel_thread_add(sel_thread_t p, void *data)
207 struct work_item *work_p;
209 yaz_mutex_enter(p->mutex);
/* NOTE(review): the condition choosing between the two branches below is
 * elided; presumably a node is taken from free_queue when one is available
 * and a new one is allocated from the pool's NMEM otherwise -- confirm */
213 work_p = p->free_queue;
214 p->free_queue = p->free_queue->next;
217 work_p = nmem_malloc(p->nmem, sizeof(*work_p));
/* push at the head; the workers take items from the tail */
220 work_p->next = p->input_queue;
221 p->input_queue = work_p;
222 input_queue_length++;
224 yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
/* signalled while the mutex is still held */
226 yaz_cond_signal(p->input_data);
227 yaz_mutex_leave(p->mutex);
/* Collect one finished work item from the pool p.
 * NOTE(review): declarations of data and read_buf, the test for an empty
 * output queue and the return statement are not visible in this excerpt;
 * presumably the item's data is returned, or a null pointer when nothing
 * is ready -- confirm. */
230 void *sel_thread_result(sel_thread_t p)
232 struct work_item *work_this = 0;
236 yaz_mutex_enter(p->mutex);
238 /* got something. Take the last one out of output_queue */
239 work_this = queue_remove_last(&p->output_queue);
242 /* put freed item in free list */
243 work_this->next = p->free_queue;
244 p->free_queue = work_this;
/* data is read before the mutex is released, so the node cannot yet have
 * been reused by sel_thread_add() */
246 data = work_this->data;
/* read one byte from the read descriptor; the workers write one byte per
 * finished item.
 * NOTE(review): recv() and read() are presumably alternatives selected by
 * preprocessor conditionals not shown here -- confirm */
248 (void) recv(p->read_fd, read_buf, 1, 0);
250 (void) read(p->read_fd, read_buf, 1);
253 yaz_mutex_leave(p->mutex);
260 * c-file-style: "Stroustrup"
261 * indent-tabs-mode: nil
263 * vim: shiftwidth=4 tabstop=8 expandtab