efl/legacy/ecore/src/lib/ecore/ecore_thread.c


#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <sys/time.h>
#ifdef HAVE_EVIL
# include <Evil.h>
#endif
#include "Ecore.h"
#include "ecore_private.h"
#ifdef EFL_HAVE_THREADS
# ifdef EFL_HAVE_POSIX_THREADS
# include <pthread.h>
# ifdef __linux__
# include <sched.h>
# include <sys/resource.h>
# include <unistd.h>
# include <sys/syscall.h>
# include <errno.h>
# endif
# define PH(x) pthread_t x
# define PHE(x, y) pthread_equal(x, y)
# define PHS() pthread_self()
# define PHC(x, f, d) pthread_create(&(x), NULL, (void*) f, d)
# define PHJ(x, p) pthread_join(x, (void**)(&(p)))
# define PHA(x) pthread_cancel(x)
# define CD(x) pthread_cond_t x
# define CDI(x) pthread_cond_init(&(x), NULL);
# define CDD(x) pthread_cond_destroy(&(x));
# define CDB(x) pthread_cond_broadcast(&(x));
# define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
# define LK(x) pthread_mutex_t x
# define LKI(x) pthread_mutex_init(&(x), NULL);
# define LKD(x) pthread_mutex_destroy(&(x));
# define LKL(x) pthread_mutex_lock(&(x));
# define LKU(x) pthread_mutex_unlock(&(x));
# define LRWK(x) pthread_rwlock_t x
# define LRWKI(x) pthread_rwlock_init(&(x), NULL);
# define LRWKD(x) pthread_rwlock_destroy(&(x));
# define LRWKWL(x) pthread_rwlock_wrlock(&(x));
# define LRWKRL(x) pthread_rwlock_rdlock(&(x));
# define LRWKU(x) pthread_rwlock_unlock(&(x));
# else /* EFL_HAVE_WIN32_THREADS */
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
# undef WIN32_LEAN_AND_MEAN
typedef struct
{
HANDLE thread;
void *val;
} win32_thread;
# define PH(x) win32_thread *x
# define PHE(x, y) ((x) == (y))
# define PHS() (HANDLE)GetCurrentThreadId()
int _ecore_thread_win32_create(win32_thread **x, LPTHREAD_START_ROUTINE f, void *d)
{
win32_thread *t;
t = (win32_thread *)calloc(1, sizeof(win32_thread));
if (!t)
return -1;
(t)->thread = CreateThread(NULL, 0, f, d, 0, NULL);
if (!t->thread)
{
free(t);
return -1;
}
t->val = d;
*x = t;
return 0;
}
# define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)
int _ecore_thread_win32_join(win32_thread *x, void **res)
{
if (!PHE(x, PHS()))
{
WaitForSingleObject(x->thread, INFINITE);
CloseHandle(x->thread);
}
if (res) *res = x->val;
free(x);
return 0;
}
# define PHJ(x, p) _ecore_thread_win32_join(x, (void**)(&(p)))
# define PHA(x) TerminateThread(x->thread, 0)
# define LK(x) HANDLE x
# define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
# define LKD(x) CloseHandle(x)
# define LKL(x) WaitForSingleObject(x, INFINITE)
# define LKU(x) ReleaseMutex(x)
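/* Condition variables are emulated on Windows with a semaphore plus a
 * waiter count protected by a critical section: broadcast releases the
 * semaphore once per registered waiter. */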
typedef struct
{
HANDLE semaphore;
LONG threads_count;
CRITICAL_SECTION threads_count_lock;
} win32_cond;
# define CD(x) win32_cond *x
# define CDI(x) \
do { \
x = (win32_cond *)calloc(1, sizeof(win32_cond)); \
if (x) \
{ \
x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
if (x->semaphore) \
InitializeCriticalSection(&x->threads_count_lock); \
else \
{ \
free(x); \
x = NULL; \
} \
} \
} while (0)
# define CDD(x) \
do { \
CloseHandle(x->semaphore); \
free(x); \
x = NULL; \
} while (0)
# define CDB(x) \
do { \
EnterCriticalSection(&x->threads_count_lock); \
if (x->threads_count > 0) \
ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
LeaveCriticalSection (&x->threads_count_lock); \
} while (0)
int _ecore_thread_win32_cond_timedwait(win32_cond *c, HANDLE external_mutex, struct timeval *t)
{
DWORD res;
/* note: the timeout is interpreted as a relative interval in milliseconds */
DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
LKL(external_mutex);
EnterCriticalSection(&c->threads_count_lock);
c->threads_count++;
LeaveCriticalSection(&c->threads_count_lock);
LKU(external_mutex);
res = WaitForSingleObject(c->semaphore, val);
EnterCriticalSection(&c->threads_count_lock);
c->threads_count--;
LeaveCriticalSection(&c->threads_count_lock);
return (res == WAIT_OBJECT_0) ? 0 : -1;
}
# define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
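/* Read/write locks are likewise emulated: a mutex protects the state,
 * while the two condition emulations park pending readers and writers. */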
typedef struct
{
LONG readers_count;
LONG writers_count;
int readers;
int writers;
LK(mutex);
CD(cond_read);
CD(cond_write);
} win32_rwl;
# define LRWK(x) win32_rwl *x
# define LRWKI(x) \
do { \
x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
if (x) \
{ \
LKI(x->mutex); \
if (x->mutex) \
{ \
CDI(x->cond_read); \
if (x->cond_read) \
{ \
CDI(x->cond_write); \
if (!x->cond_write) \
{ \
CDD(x->cond_read); \
LKD(x->mutex); \
free(x); \
x = NULL; \
} \
} \
else \
{ \
LKD(x->mutex); \
free(x); \
x = NULL; \
} \
} \
else \
{ \
free(x); \
x = NULL; \
} \
} \
} while (0)
# define LRWKD(x) \
do { \
LKU(x->mutex); \
LKD(x->mutex); \
CDD(x->cond_write); \
CDD(x->cond_read); \
free(x); \
} while (0)
# define LRWKWL(x) \
do { \
DWORD res = 0; \
LKL(x->mutex); \
if (x->writers || x->readers > 0) \
{ \
x->writers_count++; \
while (x->writers || x->readers > 0) \
{ \
EnterCriticalSection(&x->cond_write->threads_count_lock); \
x->cond_write->threads_count++; \
LeaveCriticalSection(&x->cond_write->threads_count_lock); \
res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
if (res != WAIT_OBJECT_0) break; \
} \
x->writers_count--; \
} \
if (res == 0) x->writers = 1; \
LKU(x->mutex); \
} while (0)
# define LRWKRL(x) \
do { \
DWORD res = 0; \
LKL(x->mutex); \
if (x->writers) \
{ \
x->readers_count++; \
while (x->writers) \
{ \
EnterCriticalSection(&x->cond_read->threads_count_lock); \
x->cond_read->threads_count++; \
LeaveCriticalSection(&x->cond_read->threads_count_lock); \
res = WaitForSingleObject(x->cond_read->semaphore, INFINITE); \
if (res != WAIT_OBJECT_0) break; \
} \
x->readers_count--; \
} \
if (res == 0) \
x->readers++; \
LKU(x->mutex); \
} while (0)
# define LRWKU(x) \
do { \
LKL(x->mutex); \
if (x->writers) \
{ \
x->writers = 0; \
if (x->readers_count == 1) \
{ \
EnterCriticalSection(&x->cond_read->threads_count_lock); \
if (x->cond_read->threads_count > 0) \
ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \
LeaveCriticalSection(&x->cond_read->threads_count_lock); \
} \
else if (x->readers_count > 0) \
CDB(x->cond_read); \
else if (x->writers_count > 0) \
{ \
EnterCriticalSection (&x->cond_write->threads_count_lock); \
if (x->cond_write->threads_count > 0) \
ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
LeaveCriticalSection (&x->cond_write->threads_count_lock); \
} \
} \
else if (x->readers > 0) \
{ \
x->readers--; \
if (x->readers == 0 && x->writers_count > 0) \
{ \
EnterCriticalSection (&x->cond_write->threads_count_lock); \
if (x->cond_write->threads_count > 0) \
ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
LeaveCriticalSection (&x->cond_write->threads_count_lock); \
} \
} \
LKU(x->mutex); \
} while (0)
# endif
#endif
typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
typedef struct _Ecore_Pthread Ecore_Pthread;
typedef struct _Ecore_Thread_Data Ecore_Thread_Data;
struct _Ecore_Thread_Data
{
void *data;
Eina_Free_Cb cb;
};
struct _Ecore_Pthread_Worker
{
union {
struct {
Ecore_Thread_Cb func_blocking;
} short_run;
struct {
Ecore_Thread_Cb func_heavy;
Ecore_Thread_Notify_Cb func_notify;
Ecore_Pipe *notify;
Ecore_Pipe *direct_pipe;
Ecore_Pthread_Worker *direct_worker;
int send;
int received;
} feedback_run;
} u;
Ecore_Thread_Cb func_cancel;
Ecore_Thread_Cb func_end;
#ifdef EFL_HAVE_THREADS
PH(self);
Eina_Hash *hash;
CD(cond);
LK(mutex);
#endif
const void *data;
Eina_Bool cancel : 1;
Eina_Bool feedback_run : 1;
Eina_Bool kill : 1;
Eina_Bool reschedule : 1;
Eina_Bool no_queue : 1;
};
#ifdef EFL_HAVE_THREADS
typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
struct _Ecore_Pthread_Data
{
Ecore_Pthread_Worker *death_job;
Ecore_Pipe *p;
void *data;
PH(thread);
};
#endif
static void _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte);
static int _ecore_thread_count_max = 0;
static int ECORE_THREAD_PIPE_DEL = 0;
static Eina_Array *_ecore_thread_pipe = NULL;
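/* Pipes are recycled through this array to avoid repeatedly creating
 * and destroying them for every thread (see _ecore_thread_pipe_free). */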
static Ecore_Pipe*
_ecore_thread_pipe_get(void)
{
if (eina_array_count_get(_ecore_thread_pipe) > 0)
return eina_array_pop(_ecore_thread_pipe);
return ecore_pipe_add(_ecore_thread_handler, NULL);
}
#ifdef EFL_HAVE_THREADS
static int _ecore_thread_count = 0;
static Ecore_Event_Handler *del_handler = NULL;
static Eina_List *_ecore_active_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads_feedback = NULL;
static LK(_ecore_pending_job_threads_mutex);
static Eina_Hash *_ecore_thread_global_hash = NULL;
static LRWK(_ecore_thread_global_hash_lock);
static LK(_ecore_thread_global_hash_mutex);
static CD(_ecore_thread_global_hash_cond);
static Eina_Bool have_main_loop_thread = 0;
static Eina_Trash *_ecore_thread_worker_trash = NULL;
static int _ecore_thread_worker_count = 0;
static void *_ecore_thread_worker(Ecore_Pthread_Data *pth);
static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);
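/* Cache the main loop thread id, refreshing it when the pid changes so
 * that a fork()ed child picks up its own main loop thread. */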
static PH(get_main_loop_thread)(void)
{
static PH(main_loop_thread);
static pid_t main_loop_pid;
pid_t pid = getpid();
if (pid != main_loop_pid)
{
main_loop_pid = pid;
main_loop_thread = PHS();
have_main_loop_thread = 1;
}
return main_loop_thread;
}
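/* Keep a bounded trash of worker structures for reuse; beyond the cap
 * they are simply freed. */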
static void
_ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
{
if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
{
free(worker);
return ;
}
_ecore_thread_worker_count++;
eina_trash_push(&_ecore_thread_worker_trash, worker);
}
static void
_ecore_thread_data_free(void *data)
{
Ecore_Thread_Data *d = data;
if (d->cb) d->cb(d->data);
free(d);
}
static void
_ecore_thread_pipe_free(void *data __UNUSED__, void *event)
{
Ecore_Pipe *p = event;
if (eina_array_count_get(_ecore_thread_pipe) < 50)
eina_array_push(_ecore_thread_pipe, p);
else
ecore_pipe_del(p);
eina_threads_shutdown();
}
static Eina_Bool
_ecore_thread_pipe_del(void *data __UNUSED__, int type __UNUSED__, void *event __UNUSED__)
{
/* This is a hack to delay pipe destruction until we are out of its internal loop. */
return ECORE_CALLBACK_CANCEL;
}
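/* Called on the main loop when a worker thread dies: join it, respawn a
 * replacement if jobs are still pending, otherwise recycle its pipe. */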
static void
_ecore_thread_end(Ecore_Pthread_Data *pth, Ecore_Thread *work)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) work;
Ecore_Pipe *p;
if (!worker->feedback_run || !worker->no_queue)
_ecore_thread_count--;
if (PHJ(pth->thread, p) != 0)
return ;
if (eina_list_count(_ecore_pending_job_threads) > 0
&& (unsigned int) _ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
&& _ecore_thread_count < _ecore_thread_count_max)
{
/* One more thread should be created. */
INF("spawning threads because of still pending jobs.");
pth->death_job = _ecore_thread_worker_new();
if (!pth->p || !pth->death_job) goto end;
eina_threads_init();
if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
{
_ecore_thread_count++;
return ;
}
eina_threads_shutdown();
end:
if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
}
_ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
free(pth);
}
static void
_ecore_thread_kill(Ecore_Pthread_Worker *work)
{
if (work->cancel)
{
if (work->func_cancel)
work->func_cancel((void *) work->data, (Ecore_Thread *) work);
}
else
{
if (work->func_end)
work->func_end((void *) work->data, (Ecore_Thread *) work);
}
if (work->feedback_run)
{
ecore_pipe_del(work->u.feedback_run.notify);
if (work->u.feedback_run.direct_pipe)
eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
if (work->u.feedback_run.direct_worker)
_ecore_thread_worker_free(work->u.feedback_run.direct_worker);
}
CDD(work->cond);
LKD(work->mutex);
if (work->hash)
eina_hash_free(work->hash);
free(work);
}
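/* Main loop side handler: a worker wrote its pointer to the end pipe,
 * meaning the job finished (or was cancelled) and can be reaped. */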
static void
_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
{
Ecore_Pthread_Worker *work;
if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
work = *(Ecore_Pthread_Worker **)buffer;
if (work->feedback_run)
{
if (work->u.feedback_run.send != work->u.feedback_run.received)
{
work->kill = EINA_TRUE;
return ;
}
}
_ecore_thread_kill(work);
}
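/* Main loop side handler for ecore_thread_feedback() messages; also
 * reaps the worker once all sent notifications have been delivered. */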
static void
_ecore_notify_handler(void *data, void *buffer, unsigned int nbyte)
{
Ecore_Pthread_Worker *work = data;
void *user_data;
if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
user_data = *(void **)buffer;
work->u.feedback_run.received++;
if (work->u.feedback_run.func_notify)
work->u.feedback_run.func_notify((void *) work->data, (Ecore_Thread *) work, user_data);
/* Force reading all notify event before killing the thread */
if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
{
_ecore_thread_kill(work);
}
}
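/* Worker thread loop: drain the queue of short (blocking) jobs, writing
 * each finished worker back to the main loop through end_pipe. */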
static void
_ecore_short_job(Ecore_Pipe *end_pipe)
{
Ecore_Pthread_Worker *work;
while (_ecore_pending_job_threads)
{
LKL(_ecore_pending_job_threads_mutex);
if (!_ecore_pending_job_threads)
{
LKU(_ecore_pending_job_threads_mutex);
break;
}
work = eina_list_data_get(_ecore_pending_job_threads);
_ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
_ecore_pending_job_threads);
LKU(_ecore_pending_job_threads_mutex);
if (!work->cancel)
work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
LKU(_ecore_pending_job_threads_mutex);
}
else
{
ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
}
}
}
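/* Worker thread loop for feedback jobs; identical to _ecore_short_job
 * except that the worker records which thread runs it, so that
 * ecore_thread_feedback() can verify the caller. */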
static void
_ecore_feedback_job(Ecore_Pipe *end_pipe, PH(thread))
{
Ecore_Pthread_Worker *work;
while (_ecore_pending_job_threads_feedback)
{
LKL(_ecore_pending_job_threads_mutex);
if (!_ecore_pending_job_threads_feedback)
{
LKU(_ecore_pending_job_threads_mutex);
break;
}
work = eina_list_data_get(_ecore_pending_job_threads_feedback);
_ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
_ecore_pending_job_threads_feedback);
LKU(_ecore_pending_job_threads_mutex);
work->self = thread;
if (!work->cancel)
work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
LKU(_ecore_pending_job_threads_mutex);
}
else
{
ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
}
}
}
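/* Dedicated thread entry point used by ecore_thread_feedback_run() when
 * try_no_queue is set: run a single heavy job outside the shared pool,
 * then reuse direct_worker to notify the main loop of this thread's death. */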
static void *
_ecore_direct_worker(Ecore_Pthread_Worker *work)
{
Ecore_Pthread_Data *pth;
#ifdef EFL_HAVE_POSIX_THREADS
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif
eina_sched_prio_drop();
pth = malloc(sizeof (Ecore_Pthread_Data));
if (!pth) return NULL;
pth->p = work->u.feedback_run.direct_pipe;
if (!pth->p)
{
free(pth);
return NULL;
}
pth->thread = PHS();
work->self = pth->thread;
work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
work = work->u.feedback_run.direct_worker;
if (!work)
{
free(pth);
return NULL;
}
work->data = pth;
work->u.short_run.func_blocking = NULL;
work->func_end = (void *) _ecore_thread_end;
work->func_cancel = NULL;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
work->kill = EINA_FALSE;
work->hash = NULL;
CDI(work->cond);
LKI(work->mutex);
ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
return pth->p;
}
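/* Generic pooled worker entry point: alternate between the short and
 * feedback queues until both stay empty, then report this thread's death
 * through the preallocated death_job. */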
static void *
_ecore_thread_worker(Ecore_Pthread_Data *pth)
{
Ecore_Pthread_Worker *work;
#ifdef EFL_HAVE_POSIX_THREADS
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif
eina_sched_prio_drop();
restart:
if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
/* FIXME: Check whether there are feedback tasks still to do, and switch to the feedback run handler. */
LKL(_ecore_pending_job_threads_mutex);
if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
{
LKU(_ecore_pending_job_threads_mutex);
goto restart;
}
LKU(_ecore_pending_job_threads_mutex);
/* Sleep a little to prevent premature death */
#ifdef _WIN32
Sleep(1); /* at least one scheduler tick, typically 1-15ms */
#else
usleep(200);
#endif
LKL(_ecore_pending_job_threads_mutex);
if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
{
LKU(_ecore_pending_job_threads_mutex);
goto restart;
}
LKU(_ecore_pending_job_threads_mutex);
work = pth->death_job;
if (!work) return NULL;
work->data = pth;
work->u.short_run.func_blocking = NULL;
work->func_end = (void *) _ecore_thread_end;
work->func_cancel = NULL;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
work->kill = EINA_FALSE;
work->hash = NULL;
CDI(work->cond);
LKI(work->mutex);
ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
return pth->p;
}
#endif
static Ecore_Pthread_Worker *
_ecore_thread_worker_new(void)
{
Ecore_Pthread_Worker *result;
#ifdef EFL_HAVE_THREADS
result = eina_trash_pop(&_ecore_thread_worker_trash);
if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
else _ecore_thread_worker_count--;
return result;
#else
return malloc(sizeof (Ecore_Pthread_Worker));
#endif
}
void
_ecore_thread_init(void)
{
_ecore_thread_count_max = eina_cpu_count();
if (_ecore_thread_count_max <= 0)
_ecore_thread_count_max = 1;
ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
_ecore_thread_pipe = eina_array_new(8);
#ifdef EFL_HAVE_THREADS
del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
LKI(_ecore_pending_job_threads_mutex);
LRWKI(_ecore_thread_global_hash_lock);
LKI(_ecore_thread_global_hash_mutex);
CDI(_ecore_thread_global_hash_cond);
#endif
}
void
_ecore_thread_shutdown(void)
{
/* FIXME: If functions are still running in the background, should we kill them? */
Ecore_Pipe *p;
Eina_Array_Iterator it;
unsigned int i;
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *work;
Ecore_Pthread_Data *pth;
LKL(_ecore_pending_job_threads_mutex);
EINA_LIST_FREE(_ecore_pending_job_threads, work)
{
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *) work);
free(work);
}
EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
{
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *) work);
free(work);
}
LKU(_ecore_pending_job_threads_mutex);
/* Improve emergency shutdown */
EINA_LIST_FREE(_ecore_active_job_threads, pth)
{
Ecore_Pipe *ep;
PHA(pth->thread);
PHJ(pth->thread, ep);
ecore_pipe_del(pth->p);
}
if (_ecore_thread_global_hash)
eina_hash_free(_ecore_thread_global_hash);
ecore_event_handler_del(del_handler);
have_main_loop_thread = 0;
del_handler = NULL;
LKD(_ecore_pending_job_threads_mutex);
LRWKD(_ecore_thread_global_hash_lock);
LKD(_ecore_thread_global_hash_mutex);
CDD(_ecore_thread_global_hash_cond);
#endif
EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
ecore_pipe_del(p);
eina_array_free(_ecore_thread_pipe);
_ecore_thread_pipe = NULL;
}
void
_ecore_thread_assert_main_loop_thread(const char *function)
{
Eina_Bool good;
#ifdef EFL_HAVE_THREADS
good = PHE(get_main_loop_thread(), PHS());
#else
good = EINA_TRUE;
#endif
if (!good)
{
EINA_LOG_CRIT("Call to %s from wrong thread!", function);
abort();
}
}
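/* Typical usage (hypothetical callbacks, shown for illustration only):
 *
 *   static void _job(void *data, Ecore_Thread *th)  { // heavy work here }
 *   static void _done(void *data, Ecore_Thread *th) { // main loop side }
 *
 *   ecore_thread_run(_job, _done, NULL, my_data);
 *
 * func_blocking runs in a pooled thread; func_end/func_cancel run back in
 * the main loop. */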
EAPI Ecore_Thread *
ecore_thread_run(Ecore_Thread_Cb func_blocking,
Ecore_Thread_Cb func_end,
Ecore_Thread_Cb func_cancel,
const void *data)
{
Ecore_Pthread_Worker *work;
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Data *pth = NULL;
#endif
if (!func_blocking) return NULL;
work = _ecore_thread_worker_new();
if (!work)
{
if (func_cancel)
func_cancel((void *) data, NULL);
return NULL;
}
work->u.short_run.func_blocking = func_blocking;
work->func_end = func_end;
work->func_cancel = func_cancel;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
work->kill = EINA_FALSE;
work->reschedule = EINA_FALSE;
work->data = data;
#ifdef EFL_HAVE_THREADS
work->hash = NULL;
CDI(work->cond);
LKI(work->mutex);
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
if (_ecore_thread_count == _ecore_thread_count_max)
{
LKU(_ecore_pending_job_threads_mutex);
return (Ecore_Thread *) work;
}
LKU(_ecore_pending_job_threads_mutex);
/* One more thread could be created. */
pth = malloc(sizeof (Ecore_Pthread_Data));
if (!pth) goto on_error;
pth->p = _ecore_thread_pipe_get();
pth->death_job = _ecore_thread_worker_new();
if (!pth->p || !pth->death_job) goto on_error;
eina_threads_init();
if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
{
_ecore_thread_count++;
return (Ecore_Thread *) work;
}
eina_threads_shutdown();
on_error:
if (pth)
{
if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
free(pth);
}
if (_ecore_thread_count == 0)
{
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
LKU(_ecore_pending_job_threads_mutex);
if (work->func_cancel)
work->func_cancel((void *) work->data, (Ecore_Thread *) work);
free(work);
work = NULL;
}
return (Ecore_Thread *) work;
#else
/*
If threads are not available, and as we don't want to break apps that
rely on this facility, we block the caller until the work is done.
*/
do {
/* Handle reschedule by forcing it here. That blocks the app; an idler
* would be nicer, but is really too complex for a case where threads
* should exist anyway.
*/
work->reschedule = EINA_FALSE;
func_blocking((void *)data, (Ecore_Thread *) work);
if (work->cancel == EINA_FALSE) { if (func_end) func_end((void *)data, (Ecore_Thread *) work); }
else { if (func_cancel) func_cancel((void *)data, (Ecore_Thread *) work); }
} while (work->reschedule == EINA_TRUE);
free(work);
return NULL;
#endif
}
EAPI Eina_Bool
ecore_thread_cancel(Ecore_Thread *thread)
{
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
Eina_List *l;
if (!work)
return EINA_TRUE;
if (work->cancel)
return EINA_FALSE;
if (work->feedback_run)
{
if (work->kill)
return EINA_TRUE;
if (work->u.feedback_run.send != work->u.feedback_run.received)
goto on_exit;
}
LKL(_ecore_pending_job_threads_mutex);
if ((have_main_loop_thread) &&
(PHE(get_main_loop_thread(), PHS())))
{
if (!work->feedback_run)
EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
{
if ((void *) work == (void *) thread)
{
_ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);
LKU(_ecore_pending_job_threads_mutex);
if (work->func_cancel)
work->func_cancel((void *) work->data, (Ecore_Thread *) work);
free(work);
return EINA_TRUE;
}
}
else
EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
{
if ((void *) work == (void *) thread)
{
_ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);
LKU(_ecore_pending_job_threads_mutex);
if (work->func_cancel)
work->func_cancel((void *) work->data, (Ecore_Thread *) work);
free(work);
return EINA_TRUE;
}
}
}
LKU(_ecore_pending_job_threads_mutex);
/* Delay the destruction */
on_exit:
((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
return EINA_FALSE;
#else
return EINA_TRUE;
#endif
}
EAPI Eina_Bool
ecore_thread_check(Ecore_Thread *thread)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
if (!worker) return EINA_TRUE;
return worker->cancel;
}
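/* Long running jobs that must report partial results should use this
 * instead of ecore_thread_run(): func_heavy runs in a thread and calls
 * ecore_thread_feedback(), which queues func_notify back on the main loop. */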
EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
Ecore_Thread_Notify_Cb func_notify,
Ecore_Thread_Cb func_end,
Ecore_Thread_Cb func_cancel,
const void *data,
Eina_Bool try_no_queue)
{
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *worker;
Ecore_Pthread_Data *pth = NULL;
if (!func_heavy) return NULL;
worker = _ecore_thread_worker_new();
if (!worker) goto on_error;
worker->u.feedback_run.func_heavy = func_heavy;
worker->u.feedback_run.func_notify = func_notify;
worker->hash = NULL;
CDI(worker->cond);
LKI(worker->mutex);
worker->func_cancel = func_cancel;
worker->func_end = func_end;
worker->data = data;
worker->cancel = EINA_FALSE;
worker->feedback_run = EINA_TRUE;
worker->kill = EINA_FALSE;
worker->reschedule = EINA_FALSE;
worker->u.feedback_run.send = 0;
worker->u.feedback_run.received = 0;
worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
worker->u.feedback_run.direct_pipe = NULL;
worker->u.feedback_run.direct_worker = NULL;
if (!try_no_queue)
{
PH(t);
worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
worker->no_queue = EINA_TRUE;
eina_threads_init();
if (PHC(t, _ecore_direct_worker, worker) == 0)
return (Ecore_Thread *) worker;
eina_threads_shutdown();
}
worker->no_queue = EINA_FALSE;
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);
if (_ecore_thread_count == _ecore_thread_count_max)
{
LKU(_ecore_pending_job_threads_mutex);
return (Ecore_Thread *) worker;
}
LKU(_ecore_pending_job_threads_mutex);
/* One more thread could be created. */
pth = malloc(sizeof (Ecore_Pthread_Data));
if (!pth) goto on_error;
pth->p = _ecore_thread_pipe_get();
pth->death_job = _ecore_thread_worker_new();
if (!pth->p || !pth->death_job) goto on_error;
eina_threads_init();
if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
{
_ecore_thread_count++;
return (Ecore_Thread *) worker;
}
eina_threads_shutdown();
on_error:
if (pth)
{
if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
free(pth);
}
if (_ecore_thread_count == 0)
{
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
worker);
LKU(_ecore_pending_job_threads_mutex);
if (func_cancel) func_cancel((void *) data, NULL);
if (worker)
{
ecore_pipe_del(worker->u.feedback_run.notify);
free(worker);
worker = NULL;
}
}
return (Ecore_Thread *) worker;
#else
Ecore_Pthread_Worker worker;
(void) try_no_queue;
/*
If threads are not available, and as we don't want to break apps that
rely on this facility, we block the caller until the work is done.
*/
worker.u.feedback_run.func_heavy = func_heavy;
worker.u.feedback_run.func_notify = func_notify;
worker.u.feedback_run.notify = NULL;
worker.u.feedback_run.send = 0;
worker.u.feedback_run.received = 0;
worker.func_cancel = func_cancel;
worker.func_end = func_end;
worker.data = data;
worker.cancel = EINA_FALSE;
worker.feedback_run = EINA_TRUE;
worker.kill = EINA_FALSE;
do {
worker.reschedule = EINA_FALSE;
func_heavy((void *)data, (Ecore_Thread *) &worker);
if (worker.cancel) { if (func_cancel) func_cancel((void *)data, (Ecore_Thread *) &worker); }
else { if (func_end) func_end((void *)data, (Ecore_Thread *) &worker); }
} while (worker.reschedule == EINA_TRUE);
return NULL;
#endif
}
EAPI Eina_Bool
ecore_thread_feedback(Ecore_Thread *thread, const void *data)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
if (!worker) return EINA_FALSE;
if (!worker->feedback_run) return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return EINA_FALSE;
worker->u.feedback_run.send++;
ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
return EINA_TRUE;
#else
worker->u.feedback_run.func_notify((void*) worker->data, thread, (void*) data);
return EINA_TRUE;
#endif
}
EAPI Eina_Bool
ecore_thread_reschedule(Ecore_Thread *thread)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
if (!worker) return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return EINA_FALSE;
#endif
worker->reschedule = EINA_TRUE;
return EINA_TRUE;
}
EAPI int
ecore_thread_active_get(void)
{
#ifdef EFL_HAVE_THREADS
return _ecore_thread_count;
#else
return 0;
#endif
}
EAPI int
ecore_thread_pending_get(void)
{
int ret;
#ifdef EFL_HAVE_THREADS
LKL(_ecore_pending_job_threads_mutex);
ret = eina_list_count(_ecore_pending_job_threads);
LKU(_ecore_pending_job_threads_mutex);
return ret;
#else
return 0;
#endif
}
EAPI int
ecore_thread_pending_feedback_get(void)
{
int ret;
#ifdef EFL_HAVE_THREADS
LKL(_ecore_pending_job_threads_mutex);
ret = eina_list_count(_ecore_pending_job_threads_feedback);
LKU(_ecore_pending_job_threads_mutex);
return ret;
#else
return 0;
#endif
}
EAPI int
ecore_thread_pending_total_get(void)
{
int ret;
#ifdef EFL_HAVE_THREADS
LKL(_ecore_pending_job_threads_mutex);
ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
LKU(_ecore_pending_job_threads_mutex);
return ret;
#else
return 0;
#endif
}
EAPI int
ecore_thread_max_get(void)
{
return _ecore_thread_count_max;
}
EAPI void
ecore_thread_max_set(int num)
{
if (num < 1) return;
/* avoid doing something hilarious by blocking dumb users */
if (num >= (2 * eina_cpu_count())) return;
_ecore_thread_count_max = num;
}
EAPI void
ecore_thread_max_reset(void)
{
_ecore_thread_count_max = eina_cpu_count();
}
EAPI int
ecore_thread_available_get(void)
{
int ret;
#ifdef EFL_HAVE_THREADS
LKL(_ecore_pending_job_threads_mutex);
ret = _ecore_thread_count_max - _ecore_thread_count;
LKU(_ecore_pending_job_threads_mutex);
return ret;
#else
return 0;
#endif
}
EAPI Eina_Bool
ecore_thread_local_data_add(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
Ecore_Thread_Data *d;
Eina_Bool ret;
if ((!thread) || (!key) || (!value))
return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return EINA_FALSE;
if (!worker->hash)
worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
if (!worker->hash)
return EINA_FALSE;
if (!(d = malloc(sizeof(Ecore_Thread_Data))))
return EINA_FALSE;
d->data = value;
d->cb = cb;
if (direct)
ret = eina_hash_direct_add(worker->hash, key, d);
else
ret = eina_hash_add(worker->hash, key, d);
CDB(worker->cond);
return ret;
#else
return EINA_TRUE;
#endif
}
EAPI void *
ecore_thread_local_data_set(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
Ecore_Thread_Data *d, *r;
void *ret;
if ((!thread) || (!key) || (!value))
return NULL;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return NULL;
if (!worker->hash)
worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
if (!worker->hash)
return NULL;
if (!(d = malloc(sizeof(Ecore_Thread_Data))))
return NULL;
d->data = value;
d->cb = cb;
r = eina_hash_set(worker->hash, key, d);
CDB(worker->cond);
if (!r) return NULL;
ret = r->data;
free(r);
return ret;
#else
return NULL;
#endif
}
EAPI void *
ecore_thread_local_data_find(Ecore_Thread *thread, const char *key)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
Ecore_Thread_Data *d;
if ((!thread) || (!key))
return NULL;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return NULL;
if (!worker->hash)
return NULL;
d = eina_hash_find(worker->hash, key);
if (d)
return d->data;
return NULL;
#else
return NULL;
#endif
}
EAPI Eina_Bool
ecore_thread_local_data_del(Ecore_Thread *thread, const char *key)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
if ((!thread) || (!key))
return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return EINA_FALSE;
if (!worker->hash)
return EINA_FALSE;
return eina_hash_del_by_key(worker->hash, key);
#else
return EINA_TRUE;
#endif
}
EAPI Eina_Bool
ecore_thread_global_data_add(const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
{
Eina_Bool ret;
Ecore_Thread_Data *d;
if ((!key) || (!value))
return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
LRWKWL(_ecore_thread_global_hash_lock);
if (!_ecore_thread_global_hash)
_ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
LRWKU(_ecore_thread_global_hash_lock);
if (!(d = malloc(sizeof(Ecore_Thread_Data))))
return EINA_FALSE;
d->data = value;
d->cb = cb;
if (!_ecore_thread_global_hash)
{
free(d);
return EINA_FALSE;
}
LRWKWL(_ecore_thread_global_hash_lock);
if (direct)
ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
else
ret = eina_hash_add(_ecore_thread_global_hash, key, d);
LRWKU(_ecore_thread_global_hash_lock);
CDB(_ecore_thread_global_hash_cond);
return ret;
#else
return EINA_TRUE;
#endif
}
EAPI void *
ecore_thread_global_data_set(const char *key, void *value, Eina_Free_Cb cb)
{
Ecore_Thread_Data *d, *r;
void *ret;
if ((!key) || (!value))
return NULL;
#ifdef EFL_HAVE_THREADS
LRWKWL(_ecore_thread_global_hash_lock);
if (!_ecore_thread_global_hash)
_ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
LRWKU(_ecore_thread_global_hash_lock);
if (!_ecore_thread_global_hash)
return NULL;
if (!(d = malloc(sizeof(Ecore_Thread_Data))))
return NULL;
d->data = value;
d->cb = cb;
LRWKWL(_ecore_thread_global_hash_lock);
r = eina_hash_set(_ecore_thread_global_hash, key, d);
LRWKU(_ecore_thread_global_hash_lock);
CDB(_ecore_thread_global_hash_cond);
if (!r) return NULL;
ret = r->data;
free(r);
return ret;
#else
return NULL;
#endif
}
EAPI void *
ecore_thread_global_data_find(const char *key)
{
Ecore_Thread_Data *ret;
if (!key)
return NULL;
#ifdef EFL_HAVE_THREADS
if (!_ecore_thread_global_hash) return NULL;
LRWKRL(_ecore_thread_global_hash_lock);
ret = eina_hash_find(_ecore_thread_global_hash, key);
LRWKU(_ecore_thread_global_hash_lock);
if (ret)
return ret->data;
return NULL;
#else
return NULL;
#endif
}
EAPI Eina_Bool
ecore_thread_global_data_del(const char *key)
{
Eina_Bool ret;
if (!key)
return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
if (!_ecore_thread_global_hash)
return EINA_FALSE;
LRWKWL(_ecore_thread_global_hash_lock);
ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
LRWKU(_ecore_thread_global_hash_lock);
return ret;
#else
return EINA_TRUE;
#endif
}
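/* Block until some thread publishes `key` via ecore_thread_global_data_add()
 * or _set(), which broadcast _ecore_thread_global_hash_cond: up to `seconds`
 * if positive, a single check if zero, and indefinitely if negative. */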
EAPI void *
ecore_thread_global_data_wait(const char *key, double seconds)
{
double tm = 0;
Ecore_Thread_Data *ret = NULL;
if (!key)
return NULL;
#ifdef EFL_HAVE_THREADS
if (!_ecore_thread_global_hash)
return NULL;
if (seconds > 0)
tm = ecore_time_get() + seconds;
while (1)
{
#ifndef _WIN32
struct timespec t = { 0, 0 };
t.tv_sec = (long int)tm;
t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
#else
struct timeval t = { 0, 0 };
t.tv_sec = (long int)tm;
t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
#endif
LRWKRL(_ecore_thread_global_hash_lock);
ret = eina_hash_find(_ecore_thread_global_hash, key);
LRWKU(_ecore_thread_global_hash_lock);
if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
break;
LKL(_ecore_thread_global_hash_mutex);
CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
LKU(_ecore_thread_global_hash_mutex);
}
if (ret) return ret->data;
return NULL;
#else
return NULL;
#endif
}