1 /* $Id: sel_thread.c,v 1.4 2007-04-23 08:06:21 adam Exp $
2 Copyright (c) 2006-2007, Index Data.
4 This file is part of Pazpar2.
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE. If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
26 #include "sel_thread.h"
/* Link to the following item; the queues in this file are singly linked
   lists chained through this member. */
36 struct work_item *next;
/*
 * Locate the tail of the list rooted at *q.
 *
 * The loop below moves work_p forward until it addresses the pointer to
 * the final item (the one whose next is null). For an empty or
 * single-item list work_p stays equal to q.
 *
 * NOTE(review): the statements that unlink the item and return it are not
 * visible in this excerpt. Callers treat the result as the item removed
 * from the end of the queue; confirm against the complete file.
 */
39 static struct work_item *queue_remove_last(struct work_item **q)
/* work_p addresses the link being inspected; work_this starts out null */
41 struct work_item **work_p = q, *work_this = 0;
/* keep stepping while the current item exists and has a successor */
43 while (*work_p && (*work_p)->next)
44 work_p = &(*work_p)->next;
/*
 * Traverse list q from its head, following next until a null link.
 *
 * q: first item of the list (may be null, in which case nothing is visited)
 * f: callback taking a void pointer
 *
 * NOTE(review): the loop body is not visible in this excerpt; presumably
 * f is invoked on each item's data. Confirm against the complete file.
 */
53 static void queue_trav(struct work_item *q, void (*f)(void *data))
55 for (; q; q = q->next)
/* Members of the shared pool state that are visible in this excerpt. */
/* Locked in sel_thread_handler, sel_thread_add and sel_thread_result
   around their queue updates. */
63 pthread_mutex_t mutex;
/* Condition the workers wait on; signalled by sel_thread_add and
   broadcast by sel_thread_destroy. */
64 pthread_cond_t input_data;
/* Items pushed by sel_thread_add that a worker has not yet taken. */
67 struct work_item *input_queue;
/* Items a worker has finished; drained by sel_thread_result. */
68 struct work_item *output_queue;
/* Spare list nodes: sel_thread_result pushes nodes here and
   sel_thread_add takes nodes from here. */
69 struct work_item *free_queue;
/* Callback a worker runs on an item's data. */
70 void (*work_handler)(void *work_data);
/* Callback handed to queue_trav for both queues in sel_thread_destroy;
   sel_thread_create notes that it may be NULL. */
71 void (*work_destroy)(void *work_data);
/*
 * Thread start routine; sel_thread_create passes it to pthread_create
 * together with the shared sel_thread_t.
 *
 * Visible flow per item:
 *   1. with the mutex held, wait on input_data until stop_flag is set or
 *      input_queue is non-empty;
 *   2. take an item from the end of input_queue;
 *   3. release the mutex and run work_handler on the item's data;
 *   4. re-take the mutex, push the item on the head of output_queue,
 *      release the mutex;
 *   5. write one byte to fd[1].
 *
 * NOTE(review): the enclosing loop, the stop_flag test that leaves it and
 * the return statement are not visible in this excerpt.
 */
74 static void *sel_thread_handler(void *vp)
76 sel_thread_t p = (sel_thread_t) vp;
80 struct work_item *work_this = 0;
81 /* wait for some work */
82 pthread_mutex_lock(&p->mutex);
/* the predicate is re-tested after every wake-up */
83 while (!p->stop_flag && !p->input_queue)
84 pthread_cond_wait(&p->input_data, &p->mutex);
85 /* see if we were woken up because we're shutting down */
88 /* got something. Take the last one out of input_queue */
90 assert(p->input_queue);
91 work_this = queue_remove_last(&p->input_queue);
/* the handler runs without the mutex held */
94 pthread_mutex_unlock(&p->mutex);
96 /* work on this item */
97 p->work_handler(work_this->data);
99 /* put it back into output queue */
100 pthread_mutex_lock(&p->mutex);
101 work_this->next = p->output_queue;
102 p->output_queue = work_this;
103 pthread_mutex_unlock(&p->mutex);
105 /* wake up select/poll with a single byte */
/* NOTE(review): the result of write() is discarded, so a failed or
   short write goes unnoticed here. */
106 write(p->fd[1], "", 1);
/* NOTE(review): presumably the shutdown path, reached with the mutex
   still held after the wait above; confirm against the complete file. */
108 pthread_mutex_unlock(&p->mutex);
/*
 * Create the pool state and start the worker threads.
 *
 * work_handler:  callback stored for the workers; must be non-null
 *                (asserted).
 * work_destroy:  callback stored for use at destroy time; may be NULL.
 * read_fd:       NOTE(review): its use is not visible in this excerpt;
 *                presumably it receives the descriptor that
 *                sel_thread_result reads from. Confirm.
 * no_of_threads: number of workers to start; must be at least 1
 *                (asserted).
 *
 * NOTE(review): creation of the fd pair, initialisation of the queues and
 * stop_flag, and the return statement are not visible in this excerpt.
 */
112 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
113 void (*work_destroy)(void *work_data),
114 int *read_fd, int no_of_threads)
/* the pool structure itself is allocated from this NMEM */
117 NMEM nmem = nmem_create();
118 sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
120 assert(work_handler);
121 /* work_destroy may be NULL */
123 assert(no_of_threads >= 1);
135 p->work_handler = work_handler;
136 p->work_destroy = work_destroy;
139 p->no_threads = no_of_threads;
/* default attributes for both the mutex and the condition variable */
140 pthread_mutex_init(&p->mutex, 0);
141 pthread_cond_init(&p->input_data, 0);
/* one pthread_t slot per worker, from the same NMEM */
143 p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
144 for (i = 0; i < p->no_threads; i++)
/* NOTE(review): the result of pthread_create is discarded, so a worker
   that failed to start is not detected here. */
145 pthread_create (p->thread_id + i, 0, sel_thread_handler, p);
/*
 * Shut the pool down and release its resources.
 *
 * Visible flow: broadcast input_data under the mutex so every waiting
 * worker wakes, join all worker threads, pass both queues to queue_trav
 * with work_destroy, destroy the condition variable and the mutex, then
 * destroy p->nmem.
 *
 * NOTE(review): the assignment that sets stop_flag (presumably between
 * the lock and the broadcast) is not visible in this excerpt.
 */
149 void sel_thread_destroy(sel_thread_t p)
152 pthread_mutex_lock(&p->mutex);
154 pthread_cond_broadcast(&p->input_data);
155 pthread_mutex_unlock(&p->mutex);
/* wait for every worker to finish before touching the queues */
157 for (i = 0; i< p->no_threads; i++)
158 pthread_join(p->thread_id[i], 0);
/* NOTE(review): sel_thread_create notes that work_destroy may be NULL;
   whether these two calls are guarded is not visible in this excerpt. */
162 queue_trav(p->input_queue, p->work_destroy);
163 queue_trav(p->output_queue, p->work_destroy);
168 pthread_cond_destroy(&p->input_data);
169 pthread_mutex_destroy(&p->mutex);
/* NOTE(review): sel_thread_create allocates p from an NMEM; if p->nmem is
   that same handle, p is no longer valid after this call. Confirm. */
170 nmem_destroy(p->nmem);
/*
 * Submit one piece of work to the pool.
 *
 * p:    the pool
 * data: caller's work pointer. NOTE(review): the statement storing it in
 *       the list node is not visible in this excerpt.
 *
 * Under the mutex, a list node is obtained (from free_queue or freshly
 * allocated from p->nmem), pushed on the head of input_queue, and one
 * waiter on input_data is signalled.
 */
173 void sel_thread_add(sel_thread_t p, void *data)
175 struct work_item *work_p;
177 pthread_mutex_lock(&p->mutex);
/* NOTE(review): the condition choosing between reuse and allocation is
   not visible; presumably reuse happens when free_queue is non-empty. */
181 work_p = p->free_queue;
182 p->free_queue = p->free_queue->next;
185 work_p = nmem_malloc(p->nmem, sizeof(*work_p));
/* new work goes on the head; the worker takes from the other end */
188 work_p->next = p->input_queue;
189 p->input_queue = work_p;
191 pthread_cond_signal(&p->input_data);
192 pthread_mutex_unlock(&p->mutex);
/*
 * Collect one finished piece of work from the pool.
 *
 * Under the mutex, an item is taken from the end of output_queue, its
 * list node is pushed on free_queue for reuse, its data pointer is copied
 * out, and one byte is read from fd[0] (the worker writes one byte to
 * fd[1] per finished item).
 *
 * NOTE(review): the declarations of data and read_buf, any guard for an
 * empty output_queue, and the return statement are not visible in this
 * excerpt; presumably data is what gets returned. Confirm.
 */
195 void *sel_thread_result(sel_thread_t p)
197 struct work_item *work_this = 0;
201 pthread_mutex_lock(&p->mutex);
203 /* got something. Take the last one out of output_queue */
204 work_this = queue_remove_last(&p->output_queue);
207 /* put freed item in free list */
208 work_this->next = p->free_queue;
209 p->free_queue = work_this;
211 data = work_this->data;
/* NOTE(review): the result of read() is discarded, so an error or a
   zero-byte read is not detected here. */
212 read(p->fd[0], read_buf, 1);
214 pthread_mutex_unlock(&p->mutex);
221 * indent-tabs-mode: nil
223 * vim: shiftwidth=4 tabstop=8 expandtab