-/* $Id: sel_thread.c,v 1.2 2007-04-20 10:15:19 adam Exp $
+/* $Id: sel_thread.c,v 1.3 2007-04-20 11:44:58 adam Exp $
Copyright (c) 2006-2007, Index Data.
This file is part of Pazpar2.
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
+#include <assert.h>
struct work_item {
void *data;
struct work_item *next;
};
+static struct work_item *queue_remove_last(struct work_item **q)
+{
+ struct work_item **work_p = q, *work_this = 0;
+
+ while (*work_p && (*work_p)->next)
+ work_p = &(*work_p)->next;
+ if (*work_p)
+ {
+ work_this = *work_p;
+ *work_p = 0;
+ }
+ return work_this;
+}
+
struct sel_thread {
int fd[2];
NMEM nmem;
while(1)
{
- struct work_item **work_p, *work_this;
+ struct work_item *work_this = 0;
/* wait for some work */
pthread_mutex_lock(&p->mutex);
while (!p->stop_flag && !p->input_queue)
if (p->stop_flag)
break;
/* got something. Take the last one out of input_queue */
- work_p = &p->input_queue;
- while ((*work_p)->next)
- work_p = &(*work_p)->next;
- work_this = *work_p;
- *work_p = 0;
+
+ assert(p->input_queue);
+ work_this = queue_remove_last(&p->input_queue);
+ assert(work_this);
+
pthread_mutex_unlock(&p->mutex);
/* work on this item */
p->work_handler(work_this->data);
-
+
/* put it back into output queue */
pthread_mutex_lock(&p->mutex);
work_this->next = p->output_queue;
void sel_thread_destroy(sel_thread_t p)
{
pthread_mutex_lock(&p->mutex);
-
p->stop_flag = 1;
pthread_cond_broadcast(&p->input_data);
pthread_mutex_unlock(&p->mutex);
struct work_item *work_p;
pthread_mutex_lock(&p->mutex);
- work_p = p->free_queue;
- if (!work_p)
+
+ if (p->free_queue)
+ {
+ work_p = p->free_queue;
+ p->free_queue = p->free_queue->next;
+ }
+ else
work_p = nmem_malloc(p->nmem, sizeof(*work_p));
work_p->data = data;
work_p->next = p->input_queue;
p->input_queue = work_p;
+
+ pthread_cond_signal(&p->input_data);
pthread_mutex_unlock(&p->mutex);
}
void *sel_thread_result(sel_thread_t p)
{
- struct work_item **work_p, *work_this;
- void *data;
+ struct work_item *work_this = 0;
+ void *data = 0;
char read_buf[1];
- /* got something. Take the last one out of output_queue */
- work_p = &p->output_queue;
- if (!*work_p)
- return 0;
-
- read(p->fd[0], read_buf, 1);
-
- while ((*work_p)->next)
- work_p = &(*work_p)->next;
- work_this = *work_p;
- *work_p = 0;
-
- /* put freed item in free list */
- work_this->next = p->free_queue;
- p->free_queue = work_this;
+ pthread_mutex_lock(&p->mutex);
- data = work_this->data;
+ /* got something. Take the last one out of output_queue */
+ work_this = queue_remove_last(&p->output_queue);
+ if (work_this)
+ {
+ /* put freed item in free list */
+ work_this->next = p->free_queue;
+ p->free_queue = work_this;
+
+ data = work_this->data;
+ read(p->fd[0], read_buf, 1);
+ }
pthread_mutex_unlock(&p->mutex);
-
return data;
}
-/* $Id: test_sel_thread.c,v 1.1 2007-04-20 10:06:52 adam Exp $
+/* $Id: test_sel_thread.c,v 1.2 2007-04-20 11:44:58 adam Exp $
Copyright (c) 2006-2007, Index Data.
This file is part of Pazpar2.
#endif
#include "sel_thread.h"
+#include "eventl.h"
#include <yaz/test.h>
+#include <yaz/xmalloc.h>
+/** \brief stuff we work on in separate thread */
struct my_work_data {
int x;
+ int y;
};
+/** \brief work to be carried out in separate thrad */
static void work_handler(void *vp)
{
struct my_work_data *p = vp;
- p->x += 2;
+ p->y = p->x * 2;
}
+/** \brief see if we can create and destroy without problems */
static void test_1(void)
{
int fd;
sel_thread_t p = sel_thread_create(work_handler, &fd);
YAZ_CHECK(p);
+ if (!p)
+ return;
sel_thread_destroy(p);
}
+
+void iochan_handler(struct iochan *i, int event)
+{
+ static int number = 0;
+ sel_thread_t p = iochan_getdata(i);
+
+ if (event & EVENT_INPUT)
+ {
+ struct my_work_data *work;
+
+ work = sel_thread_result(p);
+
+ YAZ_CHECK(work);
+ if (work)
+ {
+ YAZ_CHECK_EQ(work->x * 2, work->y);
+ /* stop work after a couple of iterations */
+ if (work->x > 10)
+ iochan_destroy(i);
+
+ xfree(work);
+ }
+
+ }
+ if (event & EVENT_TIMEOUT)
+ {
+ struct my_work_data *work;
+
+ work = xmalloc(sizeof(*work));
+ work->x = number;
+ sel_thread_add(p, work);
+
+ work = xmalloc(sizeof(*work));
+ work->x = number+1;
+ sel_thread_add(p, work);
+
+ number += 10;
+ }
+}
+
+/** brief use the fd for something */
+static void test_2(void)
+{
+ int thread_fd;
+ sel_thread_t p = sel_thread_create(work_handler, &thread_fd);
+ YAZ_CHECK(p);
+ if (p)
+ {
+ IOCHAN chan = iochan_create(thread_fd, iochan_handler,
+ EVENT_INPUT|EVENT_TIMEOUT);
+ iochan_settimeout(chan, 1);
+ iochan_setdata(chan, p);
+
+ event_loop(&chan);
+ }
+ sel_thread_destroy(p);
+}
+
int main(int argc, char **argv)
{
YAZ_CHECK_INIT(argc, argv);
YAZ_CHECK_LOG();
test_1();
+ test_2();
YAZ_CHECK_TERM;
}