new eina api/object - eina thread queues

@feature

This is a new feature for eina (and EFL) - a zero-copy thread message
queue for sending messages from one thread to another or from the
ecore mainloop to or back to the mainloop from threads. It has a
complete test suite too.
This commit is contained in:
Carsten Haitzler 2014-07-15 17:02:07 +09:00
parent 5c3cc82bdd
commit 3c130836ad
9 changed files with 1368 additions and 3 deletions

View File

@ -188,6 +188,7 @@ tests/ecore/ecore_test_ecore_imf.c \
tests/ecore/ecore_test_timer.c \
tests/ecore/ecore_test_ecore_evas.c \
tests/ecore/ecore_test_animator.c \
tests/ecore/ecore_test_ecore_thread_eina_thread_queue.c \
tests/ecore/ecore_suite.h
tests_ecore_ecore_suite_CPPFLAGS = -I$(top_builddir)/src/lib/efl \

View File

@ -81,7 +81,8 @@ lib/eina/eina_inline_lock_barrier.x \
lib/eina/eina_tmpstr.h \
lib/eina/eina_alloca.h \
lib/eina/eina_cow.h \
lib/eina/eina_inline_unicode.x
lib/eina/eina_inline_unicode.x \
lib/eina/eina_thread_queue.h
# Will be back for developper after 1.2.
# lib/eina/eina_model.h
@ -144,7 +145,8 @@ lib/eina/eina_value.c \
lib/eina/eina_xattr.c \
lib/eina/eina_share_common.h \
lib/eina/eina_private.h \
lib/eina/eina_strbuf_common.h
lib/eina/eina_strbuf_common.h \
lib/eina/eina_thread_queue.c
# Will be back for developper after 1.2
# lib/eina/eina_model.c \

View File

@ -263,6 +263,7 @@ extern "C" {
#include "eina_xattr.h"
#include "eina_value.h"
#include "eina_cow.h"
#include "eina_thread_queue.h"
#ifdef __cplusplus
}

View File

@ -158,6 +158,7 @@ EAPI Eina_Inlist *_eina_tracking = NULL;
S(thread);
S(cow);
S(cpu);
S(thread_queue);
/* no model for now
S(model);
*/
@ -201,7 +202,8 @@ static const struct eina_desc_setup _eina_desc_setup[] = {
S(tmpstr),
S(thread),
S(cow),
S(cpu)
S(cpu),
S(thread_queue)
/* no model for now
S(model)
*/

View File

@ -0,0 +1,512 @@
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <unistd.h>
#include "Eina.h"
#include "eina_thread_queue.h"
#include "eina_safety_checks.h"
#ifdef __ATOMIC_RELAXED
#define ATOMIC 1
#endif
// use spinlocks for read/write locks as they lead to more throughput and
// these locks are meant to be held very temporarily, if there is any
// contention at all
#define RW_SPINLOCK 1
#ifdef RW_SPINLOCK
#define RWLOCK Eina_Spinlock
#define RWLOCK_NEW(x) eina_spinlock_new(x)
#define RWLOCK_FREE(x) eina_spinlock_free(x)
#define RWLOCK_LOCK(x) eina_spinlock_take(x)
#define RWLOCK_UNLOCK(x) eina_spinlock_release(x)
#else
#define RWLOCK Eina_Lock
#define RWLOCK_NEW(x) eina_lock_new(x)
#define RWLOCK_FREE(x) eina_lock_free(x)
#define RWLOCK_LOCK(x) eina_lock_take(x)
#define RWLOCK_UNLOCK(x) eina_lock_release(x)
#endif
typedef struct _Eina_Thread_Queue_Msg_Block Eina_Thread_Queue_Msg_Block;
struct _Eina_Thread_Queue
{
Eina_Thread_Queue_Msg_Block *data; // all the data being written to
Eina_Thread_Queue_Msg_Block *last; // the last block where new data goes
Eina_Thread_Queue_Msg_Block *read; // block when reading starts from data
Eina_Thread_Queue *parent; // parent queue to wake on send
RWLOCK lock_read; // a lock for when doing reads
RWLOCK lock_write; // a lock for doing writes
Eina_Semaphore sem; // signalling - one per message
#ifndef ATOMIC
Eina_Spinlock lock_pending; // lock for pending field
#endif
int pending; // how many messages left to read
int fd; // optional fd to write byte to on msg
};
struct _Eina_Thread_Queue_Msg_Block
{
Eina_Thread_Queue_Msg_Block *next; // next block in the list
Eina_Lock lock_non_0_ref; // block non-0 ref state
#ifndef ATOMIC
Eina_Spinlock lock_ref; // lock for ref field
#endif
int ref; // the number of open reads/writes
int size; // the total allocated bytes of data[]
int first; // the byte pos of the first msg
int last; // the byte pos just after the last msg
int full : 1; // is this block full yet?
Eina_Thread_Queue_Msg data[1]; // data in memory beyond struct end
};
// the minimum size of any message block holding 1 or more messages
#define MIN_SIZE ((int)(4096 - sizeof(Eina_Thread_Queue_Msg_Block) + sizeof(Eina_Thread_Queue_Msg)))
// a pool of spare message blocks that are only of the minimum size so we
// avoid reallocation via malloc/free etc. to avoid free memory pages and
// pressure on the malloc subsystem
static int _eina_thread_queue_block_pool_count = 0;
static Eina_Spinlock _eina_thread_queue_block_pool_lock;
static Eina_Thread_Queue_Msg_Block *_eina_thread_queue_block_pool = NULL;
// api's to get message blocks from the pool or put them back in
static Eina_Thread_Queue_Msg_Block *
_eina_thread_queue_msg_block_new(int size)
{
Eina_Thread_Queue_Msg_Block *blk;
eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
if (_eina_thread_queue_block_pool)
{
blk = _eina_thread_queue_block_pool;
if (blk->size >= size)
{
blk->first = 0;
blk->last = 0;
blk->ref = 0;
blk->full = 0;
_eina_thread_queue_block_pool = blk->next;
blk->next = NULL;
_eina_thread_queue_block_pool_count--;
eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
return blk;
}
blk = NULL;
}
eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
blk = malloc(sizeof(Eina_Thread_Queue_Msg_Block) -
sizeof(Eina_Thread_Queue_Msg) +
size);
if (!blk) return NULL;
blk->next = NULL;
#ifndef ATOMIC
eina_spinlock_new(&(blk->lock_ref));
#endif
eina_lock_new(&(blk->lock_non_0_ref));
blk->size = size;
blk->first = 0;
blk->last = 0;
blk->ref = 0;
blk->full = 0;
return blk;
}
static void
_eina_thread_queue_msg_block_real_free(Eina_Thread_Queue_Msg_Block *blk)
{
eina_lock_free(&(blk->lock_non_0_ref));
#ifndef ATOMIC
eina_spinlock_free(&(blk->lock_ref));
#endif
free(blk);
}
static void
_eina_thread_queue_msg_block_free(Eina_Thread_Queue_Msg_Block *blk)
{
if (blk->size == MIN_SIZE)
{
eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
if (_eina_thread_queue_block_pool_count < 20)
{
_eina_thread_queue_block_pool_count++;
blk->next = _eina_thread_queue_block_pool;
_eina_thread_queue_block_pool = blk;
eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
}
else
{
eina_spinlock_release(&(_eina_thread_queue_block_pool_lock));
_eina_thread_queue_msg_block_real_free(blk);
}
}
else _eina_thread_queue_msg_block_real_free(blk);
}
static void
_eina_thread_queue_msg_block_pool_init(void)
{
eina_spinlock_new(&_eina_thread_queue_block_pool_lock);
}
static void
_eina_thread_queue_msg_block_pool_shutdown(void)
{
eina_spinlock_take(&(_eina_thread_queue_block_pool_lock));
while (_eina_thread_queue_block_pool)
{
Eina_Thread_Queue_Msg_Block *blk;
for (;;)
{
blk = _eina_thread_queue_block_pool;
if (!blk) break;
_eina_thread_queue_msg_block_real_free(blk);
_eina_thread_queue_block_pool = blk->next;
}
}
eina_spinlock_free(&_eina_thread_queue_block_pool_lock);
}
// utility functions for waiting/waking threads
static void
_eina_thread_queue_wait(Eina_Thread_Queue *thq)
{
eina_semaphore_lock(&(thq->sem));
}
static void
_eina_thread_queue_wake(Eina_Thread_Queue *thq)
{
eina_semaphore_release(&(thq->sem), 1);
}
// how to allocate or release memory within one of the message blocks for
// an arbitrary sized bit of message data. the size always includes the
// message header which tells you the size of that message
static Eina_Thread_Queue_Msg *
_eina_thread_queue_msg_alloc(Eina_Thread_Queue *thq, int size, Eina_Thread_Queue_Msg_Block **blkret)
{
Eina_Thread_Queue_Msg_Block *blk;
Eina_Thread_Queue_Msg *msg = NULL;
// round up to nearest 8
size = ((size + 7) >> 3) << 3;
if (!thq->data)
{
if (size < MIN_SIZE)
thq->data = _eina_thread_queue_msg_block_new(MIN_SIZE);
else
thq->data = _eina_thread_queue_msg_block_new(size);
thq->last = thq->data;
}
blk = thq->last;
if (blk->full)
{
if (size < MIN_SIZE)
blk->next = _eina_thread_queue_msg_block_new(MIN_SIZE);
else
blk->next = _eina_thread_queue_msg_block_new(size);
blk = blk->next;
thq->last = blk;
}
if ((blk->size - blk->last) >= size)
{
blk->last += size;
if (blk->last == blk->size) blk->full = 1;
msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + (blk->last - size));
}
else
{
if (size < MIN_SIZE)
blk->next = _eina_thread_queue_msg_block_new(MIN_SIZE);
else
blk->next = _eina_thread_queue_msg_block_new(size);
blk = blk->next;
thq->last = blk;
blk->last += size;
if (blk->last == blk->size) blk->full = 1;
msg = (Eina_Thread_Queue_Msg *)(&(blk->data[0]));
}
msg->size = size;
#ifdef ATOMIC
{
int ref = __atomic_add_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
if (ref == 1) eina_lock_take(&(blk->lock_non_0_ref));
}
#else
eina_spinlock_take(&(blk->lock_ref));
blk->ref++;
if (blk->ref == 1) eina_lock_take(&(blk->lock_non_0_ref));
eina_spinlock_release(&(blk->lock_ref));
#endif
*blkret = blk;
return msg;
}
static void
_eina_thread_queue_msg_alloc_done(Eina_Thread_Queue_Msg_Block *blk)
{
#ifdef ATOMIC
{
int ref = __atomic_sub_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
if (ref == 0) eina_lock_release(&(blk->lock_non_0_ref));
}
#else
eina_spinlock_take(&(blk->lock_ref));
blk->ref--;
if (blk->ref == 0) eina_lock_release(&(blk->lock_non_0_ref));
eina_spinlock_release(&(blk->lock_ref));
#endif
}
static Eina_Thread_Queue_Msg *
_eina_thread_queue_msg_fetch(Eina_Thread_Queue *thq, Eina_Thread_Queue_Msg_Block **blkret)
{
Eina_Thread_Queue_Msg_Block *blk;
Eina_Thread_Queue_Msg *msg;
int ref;
if (!thq->read)
{
RWLOCK_LOCK(&(thq->lock_write));
blk = thq->data;
if (!blk)
{
RWLOCK_UNLOCK(&(thq->lock_write));
return NULL;
}
#ifdef ATOMIC
__atomic_load(&(blk->ref), &ref, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(blk->lock_ref));
ref = blk->ref;
eina_spinlock_release(&(blk->lock_ref));
#endif
if (ref > 0) eina_lock_take(&(blk->lock_non_0_ref));
thq->read = blk;
if (thq->last == blk) thq->last = blk->next;
thq->data = blk->next;
blk->next = NULL;
eina_lock_release(&(blk->lock_non_0_ref));
RWLOCK_UNLOCK(&(thq->lock_write));
}
blk = thq->read;
msg = (Eina_Thread_Queue_Msg *)((char *)(&(blk->data[0])) + blk->first);
blk->first += msg->size;
if (blk->first >= blk->last) thq->read = NULL;
*blkret = blk;
#ifdef ATOMIC
__atomic_add_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(blk->lock_ref));
blk->ref++;
eina_spinlock_release(&(blk->lock_ref));
#endif
return msg;
}
static void
_eina_thread_queue_msg_fetch_done(Eina_Thread_Queue_Msg_Block *blk)
{
int ref;
#ifdef ATOMIC
ref = __atomic_sub_fetch(&(blk->ref), 1, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(blk->lock_ref));
blk->ref--;
ref = blk->ref;
eina_spinlock_release(&(blk->lock_ref));
#endif
if ((blk->first >= blk->last) && (ref == 0))
_eina_thread_queue_msg_block_free(blk);
}
//////////////////////////////////////////////////////////////////////////////
Eina_Bool
eina_thread_queue_init(void)
{
_eina_thread_queue_msg_block_pool_init();
return EINA_TRUE;
}
Eina_Bool
eina_thread_queue_shutdown(void)
{
_eina_thread_queue_msg_block_pool_shutdown();
return EINA_TRUE;
}
EAPI Eina_Thread_Queue *
eina_thread_queue_new(void)
{
Eina_Thread_Queue *thq;
thq = calloc(1, sizeof(Eina_Thread_Queue));
if (!thq) return NULL;
thq->fd = -1;
eina_semaphore_new(&(thq->sem), 0);
RWLOCK_NEW(&(thq->lock_read));
RWLOCK_NEW(&(thq->lock_write));
#ifndef ATOMIC
eina_spinlock_new(&(thq->lock_pending));
#endif
return thq;
}
EAPI void
eina_thread_queue_free(Eina_Thread_Queue *thq)
{
#ifndef ATOMIC
eina_spinlock_free(&(thq->lock_pending));
#endif
RWLOCK_FREE(&(thq->lock_read));
RWLOCK_FREE(&(thq->lock_write));
eina_semaphore_free(&(thq->sem));
free(thq);
}
EAPI void *
eina_thread_queue_send(Eina_Thread_Queue *thq, int size, void **allocref)
{
Eina_Thread_Queue_Msg *msg;
Eina_Thread_Queue_Msg_Block *blk;
RWLOCK_LOCK(&(thq->lock_write));
msg = _eina_thread_queue_msg_alloc(thq, size, &blk);
RWLOCK_UNLOCK(&(thq->lock_write));
*allocref = blk;
#ifdef ATOMIC
__atomic_add_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(thq->lock_pending));
thq->pending++;
eina_spinlock_release(&(thq->lock_pending));
#endif
return msg;
}
EAPI void
eina_thread_queue_send_done(Eina_Thread_Queue *thq, void *allocref)
{
_eina_thread_queue_msg_alloc_done(allocref);
_eina_thread_queue_wake(thq);
if (thq->parent)
{
void *ref;
Eina_Thread_Queue_Msg_Sub *msg;
msg = eina_thread_queue_send(thq->parent,
sizeof(Eina_Thread_Queue_Msg_Sub), &ref);
if (msg)
{
msg->queue = thq;
eina_thread_queue_send_done(thq->parent, ref);
}
}
if (thq->fd >= 0)
{
char dummy = 0;
write(thq->fd, &dummy, 1);
}
}
EAPI void *
eina_thread_queue_wait(Eina_Thread_Queue *thq, void **allocref)
{
Eina_Thread_Queue_Msg *msg;
Eina_Thread_Queue_Msg_Block *blk;
_eina_thread_queue_wait(thq);
RWLOCK_LOCK(&(thq->lock_read));
msg = _eina_thread_queue_msg_fetch(thq, &blk);
RWLOCK_UNLOCK(&(thq->lock_read));
*allocref = blk;
#ifdef ATOMIC
__atomic_sub_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(thq->lock_pending));
thq->pending--;
eina_spinlock_release(&(thq->lock_pending));
#endif
return msg;
}
EAPI void
eina_thread_queue_wait_done(Eina_Thread_Queue *thq EINA_UNUSED, void *allocref)
{
_eina_thread_queue_msg_fetch_done(allocref);
}
EAPI void *
eina_thread_queue_poll(Eina_Thread_Queue *thq, void **allocref)
{
Eina_Thread_Queue_Msg *msg;
Eina_Thread_Queue_Msg_Block *blk;
RWLOCK_LOCK(&(thq->lock_read));
msg = _eina_thread_queue_msg_fetch(thq, &blk);
if (msg)
{
_eina_thread_queue_wait(thq);
*allocref = blk;
}
RWLOCK_UNLOCK(&(thq->lock_read));
if (msg)
{
#ifdef ATOMIC
__atomic_sub_fetch(&(thq->pending), 1, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(thq->lock_pending));
thq->pending--;
eina_spinlock_release(&(thq->lock_pending));
#endif
}
return msg;
}
EAPI int
eina_thread_queue_pending_get(const Eina_Thread_Queue *thq)
{
int pending;
#ifdef ATOMIC
__atomic_load(&(thq->pending), &pending, __ATOMIC_RELAXED);
#else
eina_spinlock_take(&(thq->lock_pending));
pending = thq->pending;
eina_spinlock_release(&(thq->lock_pending));
#endif
return pending;
}
EAPI void
eina_thread_queue_parent_set(Eina_Thread_Queue *thq, Eina_Thread_Queue *thq_parent)
{
thq->parent = thq_parent;
}
EAPI Eina_Thread_Queue *
eina_thread_queue_parent_get(const Eina_Thread_Queue *thq)
{
return thq->parent;
}
EAPI void
eina_thread_queue_fd_set(Eina_Thread_Queue *thq, int fd)
{
thq->fd = fd;
}
EAPI int
eina_thread_queue_fd_get(const Eina_Thread_Queue *thq)
{
return thq->fd;
}

View File

@ -0,0 +1,262 @@
#ifndef EINA_THREAD_QUEUE_H_
#define EINA_THREAD_QUEUE_H_
/**
* @addtogroup Eina_Thread_Queue_Group Thread Queue Group
* @ingroup Eina
*
* @brief These functions provide simple zero-copy message queues for threads
*
* @{
*
* @since 1.11
*/
/**
* @typedef Eina_Thread_Queue
*
* This is a uni-directional zero-copy thread message queue specifically
* designed with the idea of sending large volumes of messages with no
* copies from one thread to another (or from/to the mainloop). The ides
* is that a thread queue is created and then one or more threads send
* messages in one end and fetch messages on the other end. If you set a
* parent message queue to 1 or more queues, then this parent will wake up
* with a sub queue message, indicating which child queue woke up. This can
* be used to implement the ability to listen to multiple queues at once.
*
* @since 1.11
*/
typedef struct _Eina_Thread_Queue Eina_Thread_Queue;
/**
* @typedef Eina_Thread_Queue_Msg
*
* This is the minimal header of every message to be put into an Eina
* Thread Queue. Every message has at least this header at the start of the
* message data, with payload following. You would put this structure as
* the first struct member of every message type you have, like
* Eina_Thread_Queue_Msg_Sub does. Messages are always 8 byte aligned within
* message memory to ensure alignment of all types.
*
* @since 1.11
*/
typedef struct _Eina_Thread_Queue_Msg Eina_Thread_Queue_Msg;
/**
* @typedef Eina_Thread_Queue_Msg_Sub
*
* This is a special message type for Eina Thread Queues that have child
* queues. This is the only Message type for a parent message queue and
* it indicates which child queue was woken up with a new message to read.
* When this message is retrieved, the caller should then also fetch the
* message from the inidcated child queue too.
*
* @since 1.11
*/
typedef struct _Eina_Thread_Queue_Msg_Sub Eina_Thread_Queue_Msg_Sub;
struct _Eina_Thread_Queue_Msg
{
int size; /*< The size of the message in bytes, including this header */
};
struct _Eina_Thread_Queue_Msg_Sub
{
Eina_Thread_Queue_Msg head; /*< Standard header on all messages */
Eina_Thread_Queue *queue; /*< The child queue that woke up and needs a message fetched from it */
};
/**
* @brief Create a new thread queue
*
* @return A valid new thread queue, or NULL on failure
*
* @since 1.11
*/
EAPI Eina_Thread_Queue *
eina_thread_queue_new(void);
/**
* @brief Free a thread queue
*
* This frees a thread queue. It must no longer be in use by anything waiting
* on messages or sending them. Any pending messages will be freed without
* being processed by a listener.
*
* @param thq The thread queue to free
*
* @since 1.11
*/
EAPI void
eina_thread_queue_free(Eina_Thread_Queue *thq) EINA_ARG_NONNULL(1);
/**
* @brief Allocate a message to send down a thread queue
*
* @param thq The thred queue to allocate the message on
* @param size The size, in bytes, of the message, including standard header
* @param allocref A pointer to store a general reference handle for the message
* @return A pointer to the message data to fill in
*
* This allocates space for a new message on the message queue, but does not
* actually trigger the send. For that you will need to call
* eina_thread_queue_send_done() to complete the send and trigger the other
* side.
*
* @since 1.11
*/
EAPI void *
eina_thread_queue_send(Eina_Thread_Queue *thq, int size, void **allocref) EINA_ARG_NONNULL(1, 3);
/**
* @brief Finish sending the allocated message
*
* @param thq The thread queue the message was placed on
* @param allocref The allocref returned by eina_thread_queue_send()
*
* This completes the send and triggers the thread queue to wake up any
* listeners.
*
* @since 1.11
*/
EAPI void
eina_thread_queue_send_done(Eina_Thread_Queue *thq, void *allocref) EINA_ARG_NONNULL(1, 2);
/**
* @brief Fetch a message from a thread queue
*
* @param thq The thread queue to fetch the message from
* @param allocref A pointer to store a general reference handle for the message
* @return A pointer to the message data
*
* This will fetch the next message to read from the thread queue and return
* a pointer to it. The message is guaranteed to have an initial
* Eina_Thread_Queue_Msg member that will indicate size of the message as
* a whole. This function will wait, if no messages are available to read
* and block until a new message comes in, then return. When the message is
* finished with, the caller must use eina_thread_queue_wait_done() to indicate
* they are done.
*
* @since 1.11
*/
EAPI void *
eina_thread_queue_wait(Eina_Thread_Queue *thq, void **allocref) EINA_ARG_NONNULL(1, 2);
/**
* @brief Finish fetching a message from a thread queue
*
* @param thq The thread queue the message was fetched from
* @param allocref The allocref returned by eina_thread_queue_wait()
*
* This should be used after eina_thread_queue_wait() or
* eina_thread_queue_poll() to indicate the caller is done with the message.
*
* @since 1.11
*/
EAPI void
eina_thread_queue_wait_done(Eina_Thread_Queue *thq, void *allocref) EINA_ARG_NONNULL(1, 2);
/**
* @brief Fetch a message from a thread queue, but return immediately if there is none with NULL
*
* @param thq The thread queue to fetch the message from
* @param allocref A pointer to store a general reference handle for the message
* @return A pointer to the message data
*
* This is the same as eina_thread_queue_wait(), but if no messages are
* available for reading, it immediately returns NULL tot he caller, without
* waiting for a new message to arrive.
*
* @see eina_thread_queue_wait()
*
* @since 1.11
*/
EAPI void *
eina_thread_queue_poll(Eina_Thread_Queue *thq, void **allocref) EINA_ARG_NONNULL(1, 2);
/**
* @brief Get the number of messages on a queue as yet unfetched
*
* @param thq The thread queue to query for pending count
* @return The number of messages waiting to be fetched
*
* This returns the number of messages waiting to be fetched with
* eina_thread_queue_wait() or eina_thread_queue_poll().
*
* @since 1.11
*/
EAPI int
eina_thread_queue_pending_get(const Eina_Thread_Queue *thq) EINA_ARG_NONNULL(1);
/**
* @brief Set the parent of a thread queue (make this one a child)
*
* @param thq The thread queue to alter the parent of
* @param thq_parent The new parent to set
*
* This sets the parent queue where messages will be reported to. This is
* how you can listen to multiple queues at once - set multiple queues to
* have the same parent and then just wait on that one parent. This should
* be done before any messages are read from or written to the queue. To unset
* a parent, just set the parent to NULL.
*
* @since 1.11
*/
EAPI void
eina_thread_queue_parent_set(Eina_Thread_Queue *thq, Eina_Thread_Queue *thq_parent) EINA_ARG_NONNULL(1);
/**
* @brief Get the parent of a thread queue
*
* @param thq The thread queue to get the parent of
* @return The parent thread queue
*
* This gets the paren set by eina_thread_queue_parent_get(). If no parent
* is set, NULL is returned.
*
* @see eina_thread_queue_parent_set()
*
* @since 1.11
*/
EAPI Eina_Thread_Queue *
eina_thread_queue_parent_get(const Eina_Thread_Queue *thq) EINA_ARG_NONNULL(1);
/**
* @brief Set a file descriptor to write a byte to on a message send
*
* @param thq The thread queue to set the file descriptor of
* @param fd The fd to set, or -1 to unset it
*
* This sets a file descriptor to write to when a message is written to the
* thread queue. This can be used to glue a thread queue to something like
* an Ecore_Pipe that can wake up the mainloop and call a callbck whenever
* data is available on the pipe. The number of bytes available will be
* the number of messages to fetch from the associated thread queue.
*
* You should set this up before anything writes to or reads from this
* thread queue.
*
* @since 1.11
*/
EAPI void
eina_thread_queue_fd_set(Eina_Thread_Queue *thq, int fd) EINA_ARG_NONNULL(1);
/**
* @brief Get the file descriptor written to on message sends
*
* @param thq The thread queue to get the file descriptor of
* @return The file descriptor set (or -1 if none is set).
*
* This returns the file descriptor set by eina_thread_queue_fd_set() and
* by default returns -1 (no fd set).
*
* @see eina_thread_queue_fd_set()
*
* @since 1.11
*/
EAPI int
eina_thread_queue_fd_get(const Eina_Thread_Queue *thq) EINA_ARG_NONNULL(1);
/**
* @}
*/
#endif

View File

@ -28,6 +28,7 @@ static const Ecore_Test_Case etc[] = {
{ "Ecore_Timers", ecore_test_timer },
{ "Ecore_Evas", ecore_test_ecore_evas },
{ "Ecore_Animators", ecore_test_animator },
{ "Ecore_Test_Ccore_Thread_Eina_Thread_Queue", ecore_test_ecore_thread_eina_thread_queue },
{ NULL, NULL }
};

View File

@ -11,5 +11,6 @@ void ecore_test_ecore_audio(TCase *tc);
void ecore_test_timer(TCase *tc);
void ecore_test_ecore_evas(TCase *tc);
void ecore_test_animator(TCase *tc);
void ecore_test_ecore_thread_eina_thread_queue(TCase *tc);
#endif /* _ECORE_SUITE_H */

View File

@ -0,0 +1,583 @@
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <Ecore.h>
#include <Eina.h>
#include <unistd.h>
#include <stdio.h>
#include "ecore_suite.h"
/////////////////////////////////////////////////////////////////////////////
static Eina_Thread_Queue *thq1, *thq2, *thqmaster;
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int value;
char pad[10];
} Msg;
static void
th1_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 100;
for (;;)
{
Msg *msg;
void *ref;
usleep((rand() % 10) * 1000);
msg = eina_thread_queue_send(thq1, sizeof(Msg), &ref);
msg->value = val;
eina_thread_queue_send_done(thq1, ref);
if (val == 1000) break;
val++;
}
}
static void
th2_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val;
for (;;)
{
Msg *msg;
void *ref;
msg = eina_thread_queue_wait(thq1, &ref);
val = msg->value;
usleep((rand() % 20) * 1000);
eina_thread_queue_wait_done(thq1, ref);
msg = eina_thread_queue_send(thq2, sizeof(Msg), &ref);
msg->value = val;
eina_thread_queue_send_done(thq2, ref);
if (val == 1000) break;
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t1)
{
int val = 99;
eina_init();
ecore_init();
thq1 = eina_thread_queue_new();
thq2 = eina_thread_queue_new();
ecore_thread_feedback_run(th1_do, NULL, NULL, NULL, NULL, EINA_TRUE);
ecore_thread_feedback_run(th2_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (;;)
{
Msg *msg;
void *ref;
msg = eina_thread_queue_wait(thq2, &ref);
printf("V: %i [%i]\n", msg->value, eina_thread_queue_pending_get(thq2));
if (msg->value != (val + 1))
{
printf("ERRR %i not next after %i\n", msg->value, val);
fail();
}
val = msg->value;
usleep((rand() % 30) * 1000);
eina_thread_queue_wait_done(thq2, ref);
if (val == 1000) break;
}
}
END_TEST
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int ops[1];
} Msg2;
static int msgs = 0;
static void
thspeed1_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
Msg2 *msg;
void *ref;
for (;;)
{
msg = eina_thread_queue_wait(thq1, &ref);
if (msg)
{
eina_thread_queue_wait_done(thq1, ref);
msgs++;
}
if (msgs == 10000000)
{
printf("msgs done\n");
break;
}
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t2)
{
Msg2 *msg;
void *ref;
int i;
eina_init();
ecore_init();
thq1 = eina_thread_queue_new();
ecore_thread_feedback_run(thspeed1_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (i = 0; i < 10000000; i++)
{
msg = eina_thread_queue_send(thq1, sizeof(Msg2), &ref);
if (msg) eina_thread_queue_send_done(thq1, ref);
}
if (msgs < 1000000)
{
printf("ERR: not enough messages recieved\n");
fail();
}
printf("%i messages sent\n", i);
}
END_TEST
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int value;
} Msg3;
static void
th21_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 100;
for (;;)
{
Msg3 *msg;
void *ref;
msg = eina_thread_queue_send(thq1, sizeof(Msg3), &ref);
msg->value = val;
eina_thread_queue_send_done(thq1, ref);
val++;
if (val == 1000100) break;
}
}
static void
th22_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 100;
for (;;)
{
Msg3 *msg;
void *ref;
msg = eina_thread_queue_send(thq2, sizeof(Msg3), &ref);
msg->value = val;
eina_thread_queue_send_done(thq2, ref);
val++;
if (val == 1000100) break;
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t3)
{
int val1 = 99, val2 = 99, cnt = 0;
eina_init();
ecore_init();
thq1 = eina_thread_queue_new();
thq2 = eina_thread_queue_new();
thqmaster = eina_thread_queue_new();
eina_thread_queue_parent_set(thq1, thqmaster);
eina_thread_queue_parent_set(thq2, thqmaster);
ecore_thread_feedback_run(th21_do, NULL, NULL, NULL, NULL, EINA_TRUE);
ecore_thread_feedback_run(th22_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (;;)
{
Eina_Thread_Queue_Msg_Sub *sub;
Eina_Thread_Queue *thq;
Msg3 *msg;
void *ref;
sub = eina_thread_queue_wait(thqmaster, &ref);
thq = sub->queue;
eina_thread_queue_wait_done(thqmaster, ref);
msg = eina_thread_queue_wait(thq, &ref);
printf("V %09i: %p - %i [%i]\n", cnt, thq, msg->value, eina_thread_queue_pending_get(thqmaster));
if (thq == thq1)
{
if ((val1 + 1) != msg->value)
{
printf("ERR: thq1 val wrong %i -> %i\n", val1, msg->value);
fail();
}
val1 = msg->value;
}
else if (thq == thq2)
{
if ((val2 + 1) != msg->value)
{
printf("ERR: thq2 val wrong %i -> %i\n", val2, msg->value);
fail();
}
val2 = msg->value;
}
else
{
printf("ERRR: unknown thq\n");
fail();
}
eina_thread_queue_wait_done(thq, ref);
cnt++;
if (cnt == 2000000) break;
}
printf("enough msgs\n");
}
END_TEST
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int value;
} Msg4;
static void
th31_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 100;
for (;;)
{
Msg4 *msg;
void *ref;
msg = eina_thread_queue_send(thq1, sizeof(Msg4), &ref);
msg->value = val;
eina_thread_queue_send_done(thq1, ref);
val++;
if (val == 1000100) break;
}
}
static void
th32_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 10000000;
for (;;)
{
Msg4 *msg;
void *ref;
msg = eina_thread_queue_send(thq1, sizeof(Msg4), &ref);
msg->value = val;
eina_thread_queue_send_done(thq1, ref);
val++;
if (val == 11000000) break;
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t4)
{
int cnt = 0;
int val1 = 99, val2 = 9999999;
eina_init();
ecore_init();
thq1 = eina_thread_queue_new();
ecore_thread_feedback_run(th31_do, NULL, NULL, NULL, NULL, EINA_TRUE);
ecore_thread_feedback_run(th32_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (;;)
{
Msg4 *msg;
void *ref;
msg = eina_thread_queue_wait(thq1, &ref);
printf("V %08i: %i [%i]\n", cnt, msg->value, eina_thread_queue_pending_get(thq1));
if (msg->value >= 10000000)
{
if ((val2 + 1) != msg->value)
{
printf("ERR: val wrong %i -> %i\n", val2, msg->value);
fail();
}
val2 = msg->value;
}
else
{
if ((val1 + 1) != msg->value)
{
printf("ERR: val wrong %i -> %i\n", val1, msg->value);
fail();
}
val1 = msg->value;
}
eina_thread_queue_wait_done(thq1, ref);
cnt++;
if (cnt == 2000000) break;
}
printf("msgs ok\n");
}
END_TEST
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int value;
char pad[10];
} Msg5;
static void
th41_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 100;
for (;;)
{
Msg5 *msg;
void *ref;
usleep((rand() % 10) * 1000);
msg = eina_thread_queue_send(thq1, sizeof(Msg5), &ref);
msg->value = val;
eina_thread_queue_send_done(thq1, ref);
if (val == 1100) break;
val++;
}
}
static void
th42_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val;
for (;;)
{
Msg5 *msg;
void *ref;
msg = eina_thread_queue_wait(thq1, &ref);
val = msg->value;
usleep((rand() % 20) * 1000);
eina_thread_queue_wait_done(thq1, ref);
msg = eina_thread_queue_send(thq2, sizeof(Msg5), &ref);
msg->value = val;
eina_thread_queue_send_done(thq2, ref);
if (val == 1100) break;
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t5)
{
int val = 99;
eina_init();
ecore_init();
thq1 = eina_thread_queue_new();
thq2 = eina_thread_queue_new();
ecore_thread_feedback_run(th41_do, NULL, NULL, NULL, NULL, EINA_TRUE);
ecore_thread_feedback_run(th42_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (;;)
{
Msg5 *msg;
void *ref;
msg = eina_thread_queue_poll(thq2, &ref);
if (msg)
{
printf("V: %i [%i]\n", msg->value, eina_thread_queue_pending_get(thq2));
if (msg->value != (val + 1))
{
printf("ERRR %i not next after %i\n", msg->value, val);
fail();
}
val = msg->value;
usleep((rand() % 10) * 1000);
eina_thread_queue_wait_done(thq2, ref);
if (val == 1100) break;
}
else
{
printf("V: none!\n");
usleep((rand() % 10) * 1000);
}
}
}
END_TEST
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int value;
} Msg6;
static Eina_Spinlock msgnum_lock;
static int msgnum = 0;
static void
th51_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int val = 100;
for (;;)
{
Msg6 *msg;
void *ref;
msg = eina_thread_queue_send(thq1, sizeof(Msg6), &ref);
msg->value = val;
eina_thread_queue_send_done(thq1, ref);
val++;
if (val == 10100) break;
usleep(1);
}
}
static void
th52_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int cnt = 0;
for (;;)
{
Msg6 *msg;
void *ref;
msg = eina_thread_queue_wait(thq1, &ref);
printf("v %08i: %i [%i]\n", cnt, msg->value, eina_thread_queue_pending_get(thq1));
eina_thread_queue_wait_done(thq1, ref);
cnt++;
eina_spinlock_take(&msgnum_lock);
msgnum++;
eina_spinlock_release(&msgnum_lock);
}
}
static void
th53_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
int cnt = 0;
for (;;)
{
Msg6 *msg;
void *ref;
msg = eina_thread_queue_wait(thq1, &ref);
printf("v %08i: %i [%i]\n", cnt, msg->value, eina_thread_queue_pending_get(thq1));
eina_thread_queue_wait_done(thq1, ref);
cnt++;
eina_spinlock_take(&msgnum_lock);
msgnum++;
eina_spinlock_release(&msgnum_lock);
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t6)
{
eina_init();
ecore_init();
eina_spinlock_new(&msgnum_lock);
thq1 = eina_thread_queue_new();
ecore_thread_feedback_run(th51_do, NULL, NULL, NULL, NULL, EINA_TRUE);
ecore_thread_feedback_run(th52_do, NULL, NULL, NULL, NULL, EINA_TRUE);
ecore_thread_feedback_run(th53_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (;;)
{
eina_spinlock_take(&msgnum_lock);
printf("msgnum %i\n", msgnum);
if (msgnum == 10000)
{
eina_spinlock_release(&msgnum_lock);
break;
}
eina_spinlock_release(&msgnum_lock);
}
printf("msg multi to 1 ok\n");
}
END_TEST
/////////////////////////////////////////////////////////////////////////////
typedef struct
{
Eina_Thread_Queue_Msg head;
int ops[1];
} Msg7;
int p[2];
static void
thspeed21_do(void *data EINA_UNUSED, Ecore_Thread *th EINA_UNUSED)
{
Msg7 *msg;
void *ref;
int i;
for (i = 0; i < 1000000; i++)
{
msg = eina_thread_queue_send(thq1, sizeof(Msg7), &ref);
if (msg) eina_thread_queue_send_done(thq1, ref);
}
}
START_TEST(ecore_test_ecore_thread_eina_thread_queue_t7)
{
Msg7 *msg;
void *ref;
int msgcnt = 0;
eina_init();
ecore_init();
thq1 = eina_thread_queue_new();
if (pipe(p) != 0)
{
printf("ERR: pipe create fail\n");
fail();
}
eina_thread_queue_fd_set(thq1, p[1]);
ecore_thread_feedback_run(thspeed21_do, NULL, NULL, NULL, NULL, EINA_TRUE);
for (;;)
{
char buf;
read(p[0], &buf, 1);
msg = eina_thread_queue_wait(thq1, &ref);
if (msg)
{
eina_thread_queue_wait_done(thq1, ref);
msgcnt++;
printf("msgcnt: %i\n", msgcnt);
}
if (msgcnt == 1000000) break;
}
printf("msg fd ok\n");
}
END_TEST
void ecore_test_ecore_thread_eina_thread_queue(TCase *tc EINA_UNUSED)
{
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t1);
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t2);
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t3);
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t4);
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t5);
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t6);
tcase_add_test(tc, ecore_test_ecore_thread_eina_thread_queue_t7);
}