From 6152601e1fcb28fa44f06519b41fb4639af3b088 Mon Sep 17 00:00:00 2001 From: Cedric BAIL Date: Mon, 20 Feb 2012 15:57:18 +0000 Subject: [PATCH] ecore: rewrite of Ecore_Thread internal to use Eina_Lock and ecore_main_loop_thread_safe_call_async. NOTES: It is now safer and faster. I doubt I will have more time before the release to finish ecore_thread_message_run, nor to make the shutdown nicer. SVN revision: 68164 --- legacy/ecore/ChangeLog | 3 + legacy/ecore/NEWS | 4 +- legacy/ecore/src/lib/ecore/ecore_thread.c | 716 ++++++++++------------ 3 files changed, 344 insertions(+), 379 deletions(-) diff --git a/legacy/ecore/ChangeLog b/legacy/ecore/ChangeLog index db8999cd84..070de1c277 100644 --- a/legacy/ecore/ChangeLog +++ b/legacy/ecore/ChangeLog @@ -505,3 +505,6 @@ * Improve callbacks in ecore_evas to use typedefs for readability. +2012-02-20 Cedric Bail + + * Rewrite internal of Ecore_Thread to use Eina_Lock and ecore_main_loop_thread_safe_call_async. diff --git a/legacy/ecore/NEWS b/legacy/ecore/NEWS index 05dc4f845e..42176112d0 100644 --- a/legacy/ecore/NEWS +++ b/legacy/ecore/NEWS @@ -31,7 +31,9 @@ Improvements: - certificates can now be added for STARTTTLS * ecore_win32: - fix modifiers value on Windows XP - + * ecore_thread: + - use eina_lock + - use Ecore thread safe async call Ecore 1.1.0 diff --git a/legacy/ecore/src/lib/ecore/ecore_thread.c b/legacy/ecore/src/lib/ecore/ecore_thread.c index 4444ad43b4..96b0b8a15e 100644 --- a/legacy/ecore/src/lib/ecore/ecore_thread.c +++ b/legacy/ecore/src/lib/ecore/ecore_thread.c @@ -17,6 +17,25 @@ #ifdef EFL_HAVE_THREADS +# define LK(x) Eina_Lock x +# define LKI(x) eina_lock_new(&(x)) +# define LKD(x) eina_lock_free(&(x)) +# define LKL(x) eina_lock_take(&(x)) +# define LKU(x) eina_lock_release(&(x)) + +# define CD(x) Eina_Condition x +# define CDI(x, m) eina_condition_new(&(x), &(m)) +# define CDD(x) eina_condition_free(&(x)) +# define CDB(x) eina_condition_broadcast(&(x)) +# define CDW(x, t) eina_condition_timedwait(&(x), t) + +# define LRWK(x) Eina_RWLock x +# define LRWKI(x) eina_rwlock_new(&(x)); +# define LRWKD(x) eina_rwlock_free(&(x)); +# define LRWKWL(x) eina_rwlock_take_write(&(x)); +# define LRWKRL(x) eina_rwlock_take_read(&(x)); +# define LRWKU(x) eina_rwlock_release(&(x)); + # ifdef EFL_HAVE_POSIX_THREADS # include # ifdef __linux__ @@ -31,28 +50,9 @@ # define PHE(x, y) pthread_equal(x, y) # define PHS() pthread_self() # define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d) -# define PHJ(x, p) pthread_join(x, (void **)(&(p))) +# define PHJ(x) pthread_join(x, NULL) # define PHA(x) pthread_cancel(x) -# define CD(x) pthread_cond_t x -# define CDI(x) pthread_cond_init(&(x), NULL); -# define CDD(x) pthread_cond_destroy(&(x)); -# define CDB(x) pthread_cond_broadcast(&(x)); -# define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t); - -# define LK(x) pthread_mutex_t x -# define LKI(x) pthread_mutex_init(&(x), NULL); -# define LKD(x) pthread_mutex_destroy(&(x)); -# define LKL(x) pthread_mutex_lock(&(x)); -# define LKU(x) pthread_mutex_unlock(&(x)); - -# define LRWK(x) pthread_rwlock_t x -# define LRWKI(x) pthread_rwlock_init(&(x), NULL); -# define LRWKD(x) pthread_rwlock_destroy(&(x)); -# define LRWKWL(x) pthread_rwlock_wrlock(&(x)); -# define LRWKRL(x) pthread_rwlock_rdlock(&(x)); -# define LRWKU(x) pthread_rwlock_unlock(&(x)); - # else /* EFL_HAVE_WIN32_THREADS */ # define WIN32_LEAN_AND_MEAN @@ -108,209 +108,9 @@ _ecore_thread_win32_join(win32_thread *x, return 0; } -# define PHJ(x, p) _ecore_thread_win32_join(x, (void **)(&(p))) +# define PHJ(x) _ecore_thread_win32_join(x, NULL) # define PHA(x) TerminateThread(x->thread, 0) -# define LK(x) HANDLE x -# define LKI(x) x = CreateMutex(NULL, FALSE, NULL) -# define LKD(x) CloseHandle(x) -# define LKL(x) WaitForSingleObject(x, INFINITE) -# define LKU(x) ReleaseMutex(x) - -typedef struct -{ - HANDLE semaphore; - LONG threads_count; - CRITICAL_SECTION threads_count_lock; -} win32_cond; - -# define CD(x) win32_cond * x - -# define CDI(x) \ - do { \ - x = (win32_cond *)calloc(1, sizeof(win32_cond)); \ - if (x) \ - { \ - x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \ - if (x->semaphore) \ - InitializeCriticalSection(&x->threads_count_lock); \ - else \ - { \ - free(x); \ - x = NULL; \ - } \ - } \ - } while (0) - -# define CDD(x) \ - do { \ - CloseHandle(x->semaphore); \ - free(x); \ - x = NULL; \ - } while (0) - -# define CDB(x) \ - do { \ - EnterCriticalSection(&x->threads_count_lock); \ - if (x->threads_count > 0) \ - ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \ - LeaveCriticalSection (&x->threads_count_lock); \ - } while (0) - -int -_ecore_thread_win32_cond_timedwait(win32_cond *c, - HANDLE *external_mutex, - struct timeval *t) -{ - DWORD res; - DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000); - LKL(external_mutex); - EnterCriticalSection (&c->threads_count_lock); - c->threads_count++; - LeaveCriticalSection (&c->threads_count_lock); - LKU(external_mutex); - res = WaitForSingleObject(c->semaphore, val); - if (res == WAIT_OBJECT_0) - return 0; - else - return -1; -} - -# define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t) - -typedef struct -{ - LONG readers_count; - LONG writers_count; - int readers; - int writers; - LK(mutex); - CD(cond_read); - CD(cond_write); -} win32_rwl; - -# define LRWK(x) win32_rwl * x -# define LRWKI(x) \ - do { \ - x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \ - if (x) \ - { \ - LKI(x->mutex); \ - if (x->mutex) \ - { \ - CDI(x->cond_read); \ - if (x->cond_read) \ - { \ - CDI(x->cond_write); \ - if (!x->cond_write) \ - { \ - CDD(x->cond_read); \ - LKD(x->mutex); \ - free(x); \ - x = NULL; \ - } \ - } \ - else \ - { \ - LKD(x->mutex); \ - free(x); \ - x = NULL; \ - } \ - } \ - else \ - { \ - free(x); \ - x = NULL; \ - } \ - } \ - } while (0) - -# define LRWKD(x) \ - do { \ - LKU(x->mutex); \ - LKD(x->mutex); \ - CDD(x->cond_write); \ - CDD(x->cond_read); \ - free(x); \ - } while (0) -# define LRWKWL(x) \ - do { \ - DWORD res; \ - LKU(x->mutex); \ - if (x->writers || x->readers > 0) \ - { \ - x->writers_count++; \ - while (x->writers || x->readers > 0) \ - { \ - EnterCriticalSection(&x->cond_write->threads_count_lock); \ - x->cond_read->threads_count++; \ - LeaveCriticalSection(&x->cond_write->threads_count_lock); \ - res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \ - if (res != WAIT_OBJECT_0) break; \ - } \ - x->writers_count--; \ - } \ - if (res == 0) x->writers_count = 1; \ - LKU(x->mutex); \ - } while (0) -# define LRWKRL(x) \ - do { \ - DWORD res; \ - LKL(x->mutex); \ - if (x->writers) \ - { \ - x->readers_count++; \ - while (x->writers) \ - { \ - EnterCriticalSection(&x->cond_write->threads_count_lock); \ - x->cond_read->threads_count++; \ - LeaveCriticalSection(&x->cond_write->threads_count_lock); \ - res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \ - if (res != WAIT_OBJECT_0) break; \ - } \ - x->readers_count--; \ - } \ - if (res == 0) \ - x->readers++; \ - LKU(x->mutex); \ - } while (0) -# define LRWKU(x) \ - do { \ - LKL(x->mutex); \ - if (x->writers) \ - { \ - x->writers = 0; \ - if (x->readers_count == 1) \ - { \ - EnterCriticalSection(&x->cond_read->threads_count_lock); \ - if (x->cond_read->threads_count > 0) \ - ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \ - LeaveCriticalSection(&x->cond_read->threads_count_lock); \ - } \ - else if (x->readers_count > 0) \ - CDB(x->cond_read); \ - else if (x->writers_count > 0) \ - { \ - EnterCriticalSection (&x->cond_write->threads_count_lock); \ - if (x->cond_write->threads_count > 0) \ - ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \ - LeaveCriticalSection (&x->cond_write->threads_count_lock); \ - } \ - } \ - else if (x->readers > 0) \ - { \ - x->readers--; \ - if (x->readers == 0 && x->writers_count > 0) \ - { \ - EnterCriticalSection (&x->cond_write->threads_count_lock); \ - if (x->cond_write->threads_count > 0) \ - ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \ - LeaveCriticalSection (&x->cond_write->threads_count_lock); \ - } \ - } \ - LKU(x->mutex); \ - } while (0) - # endif #endif @@ -336,14 +136,24 @@ struct _Ecore_Pthread_Worker { Ecore_Thread_Cb func_heavy; Ecore_Thread_Notify_Cb func_notify; - Ecore_Pipe *notify; - Ecore_Pipe *direct_pipe; Ecore_Pthread_Worker *direct_worker; int send; int received; } feedback_run; + struct { + Ecore_Thread_Cb func_main; + Ecore_Thread_Notify_Cb func_notify; + + Ecore_Pipe *send; + Ecore_Pthread_Worker *direct_worker; + + struct { + int send; + int received; + } from, to; + } message_run; } u; Ecore_Thread_Cb func_cancel; @@ -357,47 +167,63 @@ struct _Ecore_Pthread_Worker const void *data; - Eina_Bool cancel : 1; - Eina_Bool feedback_run : 1; - Eina_Bool kill : 1; - Eina_Bool reschedule : 1; - Eina_Bool no_queue : 1; + volatile int cancel; + +#ifdef EFL_HAVE_THREADS + LK(cancel_mutex); +#endif + + Eina_Bool message_run : 1; + Eina_Bool feedback_run : 1; + Eina_Bool kill : 1; + Eina_Bool reschedule : 1; + Eina_Bool no_queue : 1; }; #ifdef EFL_HAVE_THREADS typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data; - struct _Ecore_Pthread_Data { Ecore_Pthread_Worker *death_job; - Ecore_Pipe *p; void *data; PH(thread); }; + +typedef struct _Ecore_Pthread_Notify Ecore_Pthread_Notify; +struct _Ecore_Pthread_Notify +{ + Ecore_Pthread_Worker *work; + const void *user_data; +}; + +typedef void *(*Ecore_Thread_Sync_Cb)(void* data, Ecore_Thread *thread); + +typedef struct _Ecore_Pthread_Message Ecore_Pthread_Message; +struct _Ecore_Pthread_Message +{ + union { + Ecore_Thread_Cb async; + Ecore_Thread_Sync_Cb sync; + } u; + + const void *data; + + int code; + + Eina_Bool callback : 1; + Eina_Bool sync : 1; +}; + #endif static int _ecore_thread_count_max = 0; -static int ECORE_THREAD_PIPE_DEL = 0; -static Eina_Array *_ecore_thread_pipe = NULL; #ifdef EFL_HAVE_THREADS -static void _ecore_thread_handler(void *data __UNUSED__, - void *buffer, - unsigned int nbyte); - -static Ecore_Pipe * -_ecore_thread_pipe_get(void) -{ - if (eina_array_count(_ecore_thread_pipe) > 0) - return eina_array_pop(_ecore_thread_pipe); - - return ecore_pipe_add(_ecore_thread_handler, NULL); -} +static void _ecore_thread_handler(void *data); static int _ecore_thread_count = 0; -static Ecore_Event_Handler *del_handler = NULL; static Eina_List *_ecore_active_job_threads = NULL; static Eina_List *_ecore_pending_job_threads = NULL; static Eina_List *_ecore_pending_job_threads_feedback = NULL; @@ -453,39 +279,16 @@ _ecore_thread_data_free(void *data) free(d); } -static void -_ecore_thread_pipe_free(void *data __UNUSED__, - void *event) -{ - Ecore_Pipe *p = event; - - if (eina_array_count(_ecore_thread_pipe) < 50) - eina_array_push(_ecore_thread_pipe, p); - else - ecore_pipe_del(p); - eina_threads_shutdown(); -} - -static Eina_Bool -_ecore_thread_pipe_del(void *data __UNUSED__, - int type __UNUSED__, - void *event __UNUSED__) -{ - /* This is a hack to delay pipe destruction until we are out of its internal loop. */ - return ECORE_CALLBACK_CANCEL; -} - static void _ecore_thread_end(Ecore_Pthread_Data *pth, Ecore_Thread *work) { Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work; - Ecore_Pipe *p; - if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue)) + if (!worker->message_run || !worker->feedback_run || (worker->feedback_run && !worker->no_queue)) _ecore_thread_count--; - if (PHJ(pth->thread, p) != 0) + if (PHJ(pth->thread) != 0) return; if (eina_list_count(_ecore_pending_job_threads) > 0 @@ -496,7 +299,7 @@ _ecore_thread_end(Ecore_Pthread_Data *pth, INF("spawning threads because of still pending jobs."); pth->death_job = _ecore_thread_worker_new(); - if (!pth->p || !pth->death_job) goto end; + if (!pth->death_job) goto end; eina_threads_init(); @@ -514,7 +317,6 @@ end: _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth); - ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL); free(pth); } @@ -534,10 +336,6 @@ _ecore_thread_kill(Ecore_Pthread_Worker *work) if (work->feedback_run) { - ecore_pipe_del(work->u.feedback_run.notify); - - if (work->u.feedback_run.direct_pipe) - eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe); if (work->u.feedback_run.direct_worker) _ecore_thread_worker_free(work->u.feedback_run.direct_worker); } @@ -549,15 +347,9 @@ _ecore_thread_kill(Ecore_Pthread_Worker *work) } static void -_ecore_thread_handler(void *data __UNUSED__, - void *buffer, - unsigned int nbyte) +_ecore_thread_handler(void *data) { - Ecore_Pthread_Worker *work; - - if (nbyte != sizeof (Ecore_Pthread_Worker *)) return; - - work = *(Ecore_Pthread_Worker **)buffer; + Ecore_Pthread_Worker *work = data; if (work->feedback_run) { @@ -572,16 +364,17 @@ _ecore_thread_handler(void *data __UNUSED__, } static void -_ecore_notify_handler(void *data, - void *buffer, - unsigned int nbyte) +_ecore_nothing_handler(void *data __UNUSED__, void *buffer __UNUSED__, unsigned int nbyte __UNUSED__) { - Ecore_Pthread_Worker *work = data; - void *user_data; +} - if (nbyte != sizeof (Ecore_Pthread_Worker *)) return; +static void +_ecore_notify_handler(void *data) +{ + Ecore_Pthread_Notify *notify = data; + Ecore_Pthread_Worker *work = notify->work; + void *user_data = (void*) notify->user_data; - user_data = *(void **)buffer; work->u.feedback_run.received++; if (work->u.feedback_run.func_notify) @@ -595,13 +388,58 @@ _ecore_notify_handler(void *data, } static void -_ecore_short_job(Ecore_Pipe *end_pipe, - PH(thread)) +_ecore_message_notify_handler(void *data) +{ + Ecore_Pthread_Notify *notify = data; + Ecore_Pthread_Worker *work = notify->work; + Ecore_Pthread_Message *user_data = (void *) notify->user_data; + Eina_Bool delete = EINA_TRUE; + + work->u.message_run.from.received++; + + if (!user_data->callback) + { + if (work->u.message_run.func_notify) + work->u.message_run.func_notify((void *) work->data, (Ecore_Thread *) work, (void *) user_data->data); + } + else + { + if (user_data->sync) + { + user_data->data = user_data->u.sync((void*) user_data->data, (Ecore_Thread *) work); + user_data->callback = EINA_FALSE; + user_data->code = INT_MAX; + ecore_pipe_write(work->u.message_run.send, &user_data, sizeof (Ecore_Pthread_Message *)); + + delete = EINA_FALSE; + } + else + { + user_data->u.async((void*) user_data->data, (Ecore_Thread *) work); + } + } + + if (delete) + { + free(user_data); + } + + /* Force reading all notify event before killing the thread */ + if (work->kill && work->u.message_run.from.send == work->u.message_run.from.received) + { + _ecore_thread_kill(work); + } +} + +static void +_ecore_short_job(PH(thread)) { Ecore_Pthread_Worker *work; while (_ecore_pending_job_threads) { + int cancel; + LKL(_ecore_pending_job_threads_mutex); if (!_ecore_pending_job_threads) @@ -616,9 +454,12 @@ _ecore_short_job(Ecore_Pipe *end_pipe, LKU(_ecore_pending_job_threads_mutex); + LKL(work->cancel_mutex); + cancel = work->cancel; + LKU(work->cancel_mutex); work->self = thread; - if (!work->cancel) - work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work); + if (!cancel) + work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work); if (work->reschedule) { @@ -630,19 +471,20 @@ _ecore_short_job(Ecore_Pipe *end_pipe, } else { - ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *)); + ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work); } } } static void -_ecore_feedback_job(Ecore_Pipe *end_pipe, - PH(thread)) +_ecore_feedback_job(PH(thread)) { Ecore_Pthread_Worker *work; while (_ecore_pending_job_threads_feedback) { + int cancel; + LKL(_ecore_pending_job_threads_mutex); if (!_ecore_pending_job_threads_feedback) @@ -657,9 +499,12 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe, LKU(_ecore_pending_job_threads_mutex); + LKL(work->cancel_mutex); + cancel = work->cancel; + LKU(work->cancel_mutex); work->self = thread; - if (!work->cancel) - work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work); + if (!cancel) + work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work); if (work->reschedule) { @@ -671,7 +516,7 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe, } else { - ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *)); + ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work); } } } @@ -691,20 +536,20 @@ _ecore_direct_worker(Ecore_Pthread_Worker *work) pth = malloc(sizeof (Ecore_Pthread_Data)); if (!pth) return NULL; - pth->p = work->u.feedback_run.direct_pipe; - if (!pth->p) - { - free(pth); - return NULL; - } pth->thread = PHS(); work->self = pth->thread; - work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work); + if (work->message_run) + work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work); + else + work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work); - ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *)); + ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work); - work = work->u.feedback_run.direct_worker; + if (work->message_run) + work = work->u.message_run.direct_worker; + else + work = work->u.feedback_run.direct_worker; if (!work) { free(pth); @@ -717,14 +562,17 @@ _ecore_direct_worker(Ecore_Pthread_Worker *work) work->func_cancel = NULL; work->cancel = EINA_FALSE; work->feedback_run = EINA_FALSE; + work->message_run = EINA_FALSE; + work->no_queue = EINA_FALSE; work->kill = EINA_FALSE; work->hash = NULL; - CDI(work->cond); + LKI(work->cancel_mutex); LKI(work->mutex); + CDI(work->cond, work->mutex); - ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *)); + ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work); - return pth->p; + return NULL; } static void * @@ -740,8 +588,8 @@ _ecore_thread_worker(Ecore_Pthread_Data *pth) eina_sched_prio_drop(); restart: - if (_ecore_pending_job_threads) _ecore_short_job(pth->p, pth->thread); - if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread); + if (_ecore_pending_job_threads) _ecore_short_job(pth->thread); + if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->thread); /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */ @@ -777,14 +625,17 @@ restart: work->func_cancel = NULL; work->cancel = EINA_FALSE; work->feedback_run = EINA_FALSE; + work->message_run = EINA_FALSE; work->kill = EINA_FALSE; + work->no_queue = EINA_FALSE; work->hash = NULL; - CDI(work->cond); + LKI(work->cancel_mutex); LKI(work->mutex); + CDI(work->cond, work->mutex); - ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *)); + ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work); - return pth->p; + return NULL; } #endif @@ -813,16 +664,11 @@ _ecore_thread_init(void) if (_ecore_thread_count_max <= 0) _ecore_thread_count_max = 1; - ECORE_THREAD_PIPE_DEL = ecore_event_type_new(); - _ecore_thread_pipe = eina_array_new(8); - #ifdef EFL_HAVE_THREADS - del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL); - LKI(_ecore_pending_job_threads_mutex); LRWKI(_ecore_thread_global_hash_lock); LKI(_ecore_thread_global_hash_mutex); - CDI(_ecore_thread_global_hash_cond); + CDI(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex); #endif } @@ -830,10 +676,6 @@ void _ecore_thread_shutdown(void) { /* FIXME: If function are still running in the background, should we kill them ? */ - Ecore_Pipe *p; - Eina_Array_Iterator it; - unsigned int i; - #ifdef EFL_HAVE_THREADS Ecore_Pthread_Worker *work; Ecore_Pthread_Data *pth; @@ -843,46 +685,39 @@ _ecore_thread_shutdown(void) EINA_LIST_FREE(_ecore_pending_job_threads, work) { if (work->func_cancel) - work->func_cancel((void *)work->data, (Ecore_Thread *)work); + work->func_cancel((void *)work->data, (Ecore_Thread *) work); free(work); } EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work) { if (work->func_cancel) - work->func_cancel((void *)work->data, (Ecore_Thread *)work); + work->func_cancel((void *)work->data, (Ecore_Thread *) work); free(work); } LKU(_ecore_pending_job_threads_mutex); - /* Improve emergency shutdown */ + /* FIXME: Improve emergency shutdown, now that we use async call, we can do something */ EINA_LIST_FREE(_ecore_active_job_threads, pth) { - Ecore_Pipe *ep; - PHA(pth->thread); - PHJ(pth->thread, ep); - - ecore_pipe_del(pth->p); + PHJ(pth->thread); } if (_ecore_thread_global_hash) eina_hash_free(_ecore_thread_global_hash); - _ecore_event_handler_del(del_handler); have_main_loop_thread = 0; - del_handler = NULL; + + while ((work = eina_trash_pop(&_ecore_thread_worker_trash))) + { + free(work); + } LKD(_ecore_pending_job_threads_mutex); LRWKD(_ecore_thread_global_hash_lock); LKD(_ecore_thread_global_hash_mutex); CDD(_ecore_thread_global_hash_cond); #endif - - EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it) - ecore_pipe_del(p); - - eina_array_free(_ecore_thread_pipe); - _ecore_thread_pipe = NULL; } void @@ -927,15 +762,19 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking, work->func_cancel = func_cancel; work->cancel = EINA_FALSE; work->feedback_run = EINA_FALSE; + work->message_run = EINA_FALSE; work->kill = EINA_FALSE; work->reschedule = EINA_FALSE; + work->no_queue = EINA_FALSE; work->data = data; #ifdef EFL_HAVE_THREADS + LKI(work->cancel_mutex); + work->self = 0; work->hash = NULL; - CDI(work->cond); LKI(work->mutex); + CDI(work->cond, work->mutex); LKL(_ecore_pending_job_threads_mutex); _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work); @@ -952,9 +791,8 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking, pth = malloc(sizeof (Ecore_Pthread_Data)); if (!pth) goto on_error; - pth->p = _ecore_thread_pipe_get(); pth->death_job = _ecore_thread_worker_new(); - if (!pth->p || !pth->death_job) goto on_error; + if (!pth->death_job) goto on_error; eina_threads_init(); @@ -969,7 +807,6 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking, on_error: if (pth) { - if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p); if (pth->death_job) _ecore_thread_worker_free(pth->death_job); free(pth); } @@ -981,7 +818,11 @@ on_error: LKU(_ecore_pending_job_threads_mutex); if (work->func_cancel) - work->func_cancel((void *)work->data, (Ecore_Thread *)work); + work->func_cancel((void *) work->data, (Ecore_Thread *) work); + + CDD(work->cond); + LKD(work->mutex); + LKD(work->cancel_mutex); free(work); work = NULL; } @@ -1013,12 +854,16 @@ EAPI Eina_Bool ecore_thread_cancel(Ecore_Thread *thread) { #ifdef EFL_HAVE_THREADS - Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread; + Ecore_Pthread_Worker *volatile work = (Ecore_Pthread_Worker *)thread; Eina_List *l; + int cancel; if (!work) return EINA_TRUE; - if (work->cancel) + LKL(work->cancel_mutex); + cancel = work->cancel; + LKU(work->cancel_mutex); + if (cancel) return EINA_FALSE; if (work->feedback_run) @@ -1070,9 +915,14 @@ ecore_thread_cancel(Ecore_Thread *thread) LKU(_ecore_pending_job_threads_mutex); + work = (Ecore_Pthread_Worker *)thread; + /* Delay the destruction */ -on_exit: - ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE; + on_exit: + LKL(work->cancel_mutex); + work->cancel = EINA_TRUE; + LKU(work->cancel_mutex); + return EINA_FALSE; #else (void) thread; @@ -1083,10 +933,23 @@ on_exit: EAPI Eina_Bool ecore_thread_check(Ecore_Thread *thread) { - Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; + Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *) thread; + int cancel; if (!worker) return EINA_TRUE; - return worker->cancel; +#ifdef EFL_HAVE_THREADS + LKL(worker->cancel_mutex); +#endif + cancel = worker->cancel; + /* FIXME: there is an insane bug driving me nuts here. I don't know if + it's a race condition, some cache issue or some alien attack on our software. + But ecore_thread_check will only work correctly with a printf, all the volatile, + lock and even usleep don't help here... */ + /* fprintf(stderr, "wc: %i\n", cancel); */ +#ifdef EFL_HAVE_THREADS + LKU(worker->cancel_mutex); +#endif + return cancel; } EAPI Ecore_Thread * @@ -1109,12 +972,14 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, worker->u.feedback_run.func_heavy = func_heavy; worker->u.feedback_run.func_notify = func_notify; worker->hash = NULL; - CDI(worker->cond); LKI(worker->mutex); + CDI(worker->cond, worker->mutex); worker->func_cancel = func_cancel; worker->func_end = func_end; worker->data = data; + LKI(worker->cancel_mutex); worker->cancel = EINA_FALSE; + worker->message_run = EINA_FALSE; worker->feedback_run = EINA_TRUE; worker->kill = EINA_FALSE; worker->reschedule = EINA_FALSE; @@ -1123,15 +988,12 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, worker->u.feedback_run.send = 0; worker->u.feedback_run.received = 0; - worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker); - worker->u.feedback_run.direct_pipe = NULL; worker->u.feedback_run.direct_worker = NULL; if (!try_no_queue) { PH(t); - worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get(); worker->u.feedback_run.direct_worker = _ecore_thread_worker_new(); worker->no_queue = EINA_TRUE; @@ -1140,6 +1002,12 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, if (PHC(t, _ecore_direct_worker, worker) == 0) return (Ecore_Thread *)worker; + if (worker->u.feedback_run.direct_worker) + { + _ecore_thread_worker_free(worker->u.feedback_run.direct_worker); + worker->u.feedback_run.direct_worker = NULL; + } + eina_threads_shutdown(); } @@ -1160,9 +1028,8 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, pth = malloc(sizeof (Ecore_Pthread_Data)); if (!pth) goto on_error; - pth->p = _ecore_thread_pipe_get(); pth->death_job = _ecore_thread_worker_new(); - if (!pth->p || !pth->death_job) goto on_error; + if (!pth->death_job) goto on_error; eina_threads_init(); @@ -1177,7 +1044,6 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, on_error: if (pth) { - if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p); if (pth->death_job) _ecore_thread_worker_free(pth->death_job); free(pth); } @@ -1193,7 +1059,8 @@ on_error: if (worker) { - ecore_pipe_del(worker->u.feedback_run.notify); + CDD(worker->cond); + LKD(worker->mutex); free(worker); worker = NULL; } @@ -1219,6 +1086,7 @@ on_error: worker.data = data; worker.cancel = EINA_FALSE; worker.feedback_run = EINA_TRUE; + worker.message_run = EINA_FALSE; worker.kill = EINA_FALSE; do { @@ -1241,13 +1109,48 @@ ecore_thread_feedback(Ecore_Thread *thread, Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; if (!worker) return EINA_FALSE; - if (!worker->feedback_run) return EINA_FALSE; #ifdef EFL_HAVE_THREADS if (!PHE(worker->self, PHS())) return EINA_FALSE; - worker->u.feedback_run.send++; - ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *)); + if (worker->feedback_run) + { + Ecore_Pthread_Notify *notify; + + notify = malloc(sizeof (Ecore_Pthread_Notify)); + if (!notify) return EINA_FALSE; + + notify->user_data = data; + notify->work = worker; + worker->u.feedback_run.send++; + + ecore_main_loop_thread_safe_call_async(_ecore_notify_handler, notify); + } + else if (worker->message_run) + { + Ecore_Pthread_Message *msg; + Ecore_Pthread_Notify *notify; + + msg = malloc(sizeof (Ecore_Pthread_Message*)); + if (!msg) return EINA_FALSE; + msg->data = data; + msg->callback = EINA_FALSE; + msg->sync = EINA_FALSE; + + notify = malloc(sizeof (Ecore_Pthread_Notify)); + if (!notify) + { + free(msg); + return EINA_FALSE; + } + notify->work = worker; + notify->user_data = msg; + + worker->u.message_run.from.send++; + ecore_main_loop_thread_safe_call_async(_ecore_message_notify_handler, notify); + } + else + return EINA_FALSE; return EINA_TRUE; #else @@ -1257,6 +1160,74 @@ ecore_thread_feedback(Ecore_Thread *thread, #endif } +#if 0 +EAPI Ecore_Thread * +ecore_thread_message_run(Ecore_Thread_Cb func_main, + Ecore_Thread_Notify_Cb func_notify, + Ecore_Thread_Cb func_end, + Ecore_Thread_Cb func_cancel, + const void *data) +{ +#ifdef EFL_HAVE_THREADS + Ecore_Pthread_Worker *worker; + PH(t); + + if (!func_main) return NULL; + + worker = _ecore_thread_worker_new(); + if (!worker) return NULL; + + worker->u.message_run.func_main = func_main; + worker->u.message_run.func_notify = func_notify; + worker->u.message_run.direct_worker = _ecore_thread_worker_new(); + worker->u.message_run.send = ecore_pipe_add(_ecore_nothing_handler, worker); + worker->u.message_run.from.send = 0; + worker->u.message_run.from.received = 0; + worker->u.message_run.to.send = 0; + worker->u.message_run.to.received = 0; + + ecore_pipe_freeze(worker->u.message_run.send); + + worker->func_cancel = func_cancel; + worker->func_end = func_end; + worker->hash = NULL; + LKI(worker->mutex); + CDI(worker->cond, worker->mutex); + worker->data = data; + + LKI(worker->cancel_mutex); + worker->cancel = EINA_FALSE; + worker->message_run = EINA_TRUE; + worker->feedback_run = EINA_FALSE; + worker->kill = EINA_FALSE; + worker->reschedule = EINA_FALSE; + worker->no_queue = EINA_FALSE; + worker->self = 0; + + eina_threads_init(); + + if (PHC(t, _ecore_direct_worker, worker) == 0) + return (Ecore_Thread*) worker; + + eina_threads_shutdown(); + + if (worker->u.message_run.direct_worker) _ecore_thread_worker_free(worker->u.message_run.direct_worker); + if (worker->u.message_run.send) ecore_pipe_del(worker->u.message_run.send); + + CDD(worker->cond); + LKD(worker->mutex); +#else + /* Note: This type of thread can't and never will work without thread support */ + WRN("ecore_thread_message_run called, but threads disable in Ecore, things will go wrong. Starting now !"); +# warning "You disabled threads support in ecore, I hope you know what you are doing !" +#endif + + func_cancel((void *) data, NULL); + + return NULL; +} +#endif + EAPI Eina_Bool ecore_thread_reschedule(Ecore_Thread *thread) { @@ -1641,24 +1612,13 @@ ecore_thread_global_data_wait(const char *key, while (1) { -#ifndef _WIN32 - struct timespec t = { 0, 0 }; - - t.tv_sec = (long int)tm; - t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000); -#else - struct timeval t = { 0, 0 }; - - t.tv_sec = (long int)tm; - t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000); -#endif LRWKRL(_ecore_thread_global_hash_lock); ret = eina_hash_find(_ecore_thread_global_hash, key); LRWKU(_ecore_thread_global_hash_lock); if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get()))) break; LKL(_ecore_thread_global_hash_mutex); - CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t); + CDW(_ecore_thread_global_hash_cond, tm); LKU(_ecore_thread_global_hash_mutex); } if (ret) return ret->data;