From d1fcb71f843b3e334c71698234447a4c92b03c9b Mon Sep 17 00:00:00 2001 From: Cedric BAIL Date: Wed, 30 Jun 2010 13:25:28 +0000 Subject: [PATCH] * ecore: add ecore_long_run facility with notify to main loop. SVN revision: 49948 --- legacy/ecore/src/lib/ecore/Ecore.h | 8 + legacy/ecore/src/lib/ecore/ecore_thread.c | 404 ++++++++++++++++++---- 2 files changed, 343 insertions(+), 69 deletions(-) diff --git a/legacy/ecore/src/lib/ecore/Ecore.h b/legacy/ecore/src/lib/ecore/Ecore.h index 610b1affa4..4e514b9c48 100644 --- a/legacy/ecore/src/lib/ecore/Ecore.h +++ b/legacy/ecore/src/lib/ecore/Ecore.h @@ -336,7 +336,15 @@ extern "C" { EAPI void ecore_pipe_read_close(Ecore_Pipe *p); EAPI Ecore_Thread *ecore_thread_run(void (*func_heavy)(void *data), void (*func_end)(void *data), void (*func_cancel)(void *data), const void *data); + EAPI Ecore_Thread *ecore_long_run(void (*func_heavy)(Ecore_Thread *thread, void *data), + void (*func_notify)(Ecore_Thread *thread, void *data), + void (*func_end)(void *data), + void (*func_cancel)(void *data), + const void *data, + Eina_Bool try_no_queue); EAPI Eina_Bool ecore_thread_cancel(Ecore_Thread *thread); + EAPI Eina_Bool ecore_thread_check(Ecore_Thread *thread); + EAPI Eina_Bool ecore_thread_notify(Ecore_Thread *thread, void *data); EAPI double ecore_time_get(void); EAPI double ecore_loop_time_get(void); diff --git a/legacy/ecore/src/lib/ecore/ecore_thread.c b/legacy/ecore/src/lib/ecore/ecore_thread.c index 561d0ccce4..8a9229df64 100644 --- a/legacy/ecore/src/lib/ecore/ecore_thread.c +++ b/legacy/ecore/src/lib/ecore/ecore_thread.c @@ -13,25 +13,43 @@ #include "Ecore.h" #include "ecore_private.h" -#ifdef EFL_HAVE_PTHREAD typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker; -typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data; typedef struct _Ecore_Pthread Ecore_Pthread; struct _Ecore_Pthread_Worker { - void (*func_heavy)(void *data); - void (*func_end)(void *data); + union { + struct { + void (*func_heavy)(void *data); + } short_run; + struct { + void (*func_heavy)(Ecore_Thread *thread, void *data); + void (*func_notify)(Ecore_Thread *thread, void *data); + + Ecore_Pipe *notify; + +#ifdef EFL_HAVE_PTHREAD + pthread_t self; +#endif + } long_run; + } u; + void (*func_cancel)(void *data); + void (*func_end)(void *data); const void *data; Eina_Bool cancel : 1; + Eina_Bool long_run : 1; }; +#ifdef EFL_HAVE_PTHREAD +typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data; + struct _Ecore_Pthread_Data { Ecore_Pipe *p; + void *data; pthread_t thread; }; #endif @@ -41,8 +59,9 @@ static int ECORE_THREAD_PIPE_DEL = 0; #ifdef EFL_HAVE_PTHREAD static int _ecore_thread_count = 0; -static Eina_List *_ecore_thread_data = NULL; static Eina_List *_ecore_thread = NULL; +static Eina_List *_ecore_thread_data = NULL; +static Eina_List *_ecore_long_thread_data = NULL; static Ecore_Event_Handler *del_handler = NULL; static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; @@ -75,64 +94,6 @@ _ecore_thread_end(Ecore_Pthread_Data *pth) ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL); } -static void * -_ecore_thread_worker(Ecore_Pthread_Data *pth) -{ - Ecore_Pthread_Worker *work; - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - - pthread_mutex_lock(&_mutex); - _ecore_thread_count++; - pthread_mutex_unlock(&_mutex); - - on_error: - - while (_ecore_thread_data) - { - pthread_mutex_lock(&_mutex); - - if (!_ecore_thread_data) - { - pthread_mutex_unlock(&_mutex); - break; - } - - work = eina_list_data_get(_ecore_thread_data); - _ecore_thread_data = eina_list_remove_list(_ecore_thread_data, _ecore_thread_data); - - pthread_mutex_unlock(&_mutex); - - work->func_heavy((void*) work->data); - - ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*)); - } - - pthread_mutex_lock(&_mutex); - if (_ecore_thread_data) - { - pthread_mutex_unlock(&_mutex); - goto on_error; - } - _ecore_thread_count--; - - pthread_mutex_unlock(&_mutex); - - work = malloc(sizeof (Ecore_Pthread_Worker)); - if (!work) return NULL; - - work->data = pth; - work->func_heavy = NULL; - work->func_end = (void*) _ecore_thread_end; - work->func_cancel = NULL; - work->cancel = EINA_FALSE; - - ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*)); - - return pth->p; -} - static void _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte) { @@ -149,11 +110,173 @@ _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte) } else { - work->func_end((void*) work->data); + if (work->func_end) + work->func_end((void*) work->data); } + if (work->long_run) ecore_pipe_del(work->u.long_run.notify); free(work); } + +static void +_ecore_notify_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte) +{ + Ecore_Pthread_Worker *work; + + if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ; + + work = *(Ecore_Pthread_Worker**)buffer; + + if (work->u.long_run.func_notify) + work->u.long_run.func_notify((Ecore_Thread *) work, (void*) work->data); +} + +static void +_ecore_short_job(Ecore_Pipe *end_pipe) +{ + Ecore_Pthread_Worker *work; + + while (_ecore_thread_data) + { + pthread_mutex_lock(&_mutex); + + if (!_ecore_thread_data) + { + pthread_mutex_unlock(&_mutex); + break; + } + + work = eina_list_data_get(_ecore_thread_data); + _ecore_thread_data = eina_list_remove_list(_ecore_thread_data, _ecore_thread_data); + + pthread_mutex_unlock(&_mutex); + + work->u.short_run.func_heavy((void*) work->data); + + ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker*)); + } +} + +static void +_ecore_long_job(Ecore_Pipe *end_pipe, pthread_t thread) +{ + Ecore_Pthread_Worker *work; + + while (_ecore_long_thread_data) + { + pthread_mutex_lock(&_mutex); + + if (!_ecore_long_thread_data) + { + pthread_mutex_unlock(&_mutex); + break; + } + + work = eina_list_data_get(_ecore_long_thread_data); + _ecore_long_thread_data = eina_list_remove_list(_ecore_long_thread_data, _ecore_long_thread_data); + + pthread_mutex_unlock(&_mutex); + + work->u.long_run.self = thread; + work->u.long_run.func_heavy((Ecore_Thread *) work, (void*) work->data); + + ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker*)); + } +} + +static void * +_ecore_direct_worker(Ecore_Pthread_Worker *work) +{ + Ecore_Pthread_Data *pth; + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + pth = malloc(sizeof (Ecore_Pthread_Data)); + if (!pth) return NULL; + + pth->p = ecore_pipe_add(_ecore_thread_handler, NULL); + if (!pth->p) + { + free(pth); + return NULL; + } + pth->thread = pthread_self(); + + work->u.long_run.self = pth->thread; + work->u.long_run.func_heavy((Ecore_Thread *) work, (void*) work->data); + + ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*)); + + work = malloc(sizeof (Ecore_Pthread_Worker)); + if (!work) + { + ecore_pipe_del(pth->p); + free(pth); + return NULL; + } + + work->data = pth; + work->u.short_run.func_heavy = NULL; + work->func_end = (void*) _ecore_thread_end; + work->func_cancel = NULL; + work->cancel = EINA_FALSE; + work->long_run = EINA_FALSE; + + ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*)); + + return pth->p; +} + +static void * +_ecore_thread_worker(Ecore_Pthread_Data *pth) +{ + Ecore_Pthread_Worker *work; + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + pthread_mutex_lock(&_mutex); + _ecore_thread_count++; + pthread_mutex_unlock(&_mutex); + + on_error: + if (_ecore_thread_data) _ecore_short_job(pth->p); + if (_ecore_long_thread_data) _ecore_long_job(pth->p, pth->thread); + + /* FIXME: Check if there is long running task todo, and switch to long run handler. */ + + pthread_mutex_lock(&_mutex); + if (_ecore_thread_data) + { + pthread_mutex_unlock(&_mutex); + goto on_error; + } + if (_ecore_long_thread_data) + { + pthread_mutex_unlock(&_mutex); + goto on_error; + } + + _ecore_thread_count--; + + pthread_mutex_unlock(&_mutex); + + work = malloc(sizeof (Ecore_Pthread_Worker)); + if (!work) return NULL; + + work->data = pth; + work->u.short_run.func_heavy = NULL; + work->func_end = (void*) _ecore_thread_end; + work->func_cancel = NULL; + work->cancel = EINA_FALSE; + work->long_run = EINA_FALSE; + + ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*)); + + return pth->p; +} + #endif void @@ -223,7 +346,9 @@ ecore_thread_run(void (*func_heavy)(void *data), { #ifdef EFL_HAVE_PTHREAD Ecore_Pthread_Worker *work; - Ecore_Pthread_Data *pth; + Ecore_Pthread_Data *pth = NULL; + + if (!func_heavy) return NULL; work = malloc(sizeof (Ecore_Pthread_Worker)); if (!work) @@ -232,10 +357,11 @@ ecore_thread_run(void (*func_heavy)(void *data), return NULL; } - work->func_heavy = func_heavy; + work->u.short_run.func_heavy = func_heavy; work->func_end = func_end; work->func_cancel = func_cancel; work->cancel = EINA_FALSE; + work->long_run = EINA_FALSE; work->data = data; pthread_mutex_lock(&_mutex); @@ -251,22 +377,29 @@ ecore_thread_run(void (*func_heavy)(void *data), /* One more thread could be created. */ pth = malloc(sizeof (Ecore_Pthread_Data)); - if (!pth) - goto on_error; + if (!pth) goto on_error; pth->p = ecore_pipe_add(_ecore_thread_handler, NULL); + if (!pth->p) goto on_error; if (pthread_create(&pth->thread, NULL, (void*) _ecore_thread_worker, pth) == 0) return (Ecore_Thread*) work; on_error: + if (pth) + { + if (pth->p) ecore_pipe_del(pth->p); + free(pth); + } + if (_ecore_thread_count == 0) { if (work->func_cancel) work->func_cancel((void*) work->data); free(work); + work = NULL; } - return NULL; + return (Ecore_Thread*) work; #else /* If no thread and as we don't want to break app that rely on this @@ -321,3 +454,136 @@ ecore_thread_cancel(Ecore_Thread *thread) return EINA_TRUE; #endif } + +EAPI Eina_Bool +ecore_thread_check(Ecore_Thread *thread) +{ + Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread; + + if (!worker) return EINA_FALSE; + return worker->cancel; +} + +EAPI Ecore_Thread * +ecore_long_run(void (*func_heavy)(Ecore_Thread *thread, void *data), + void (*func_notify)(Ecore_Thread *thread, void *data), + void (*func_end)(void *data), + void (*func_cancel)(void *data), + const void *data, + Eina_Bool try_no_queue) +{ + +#ifdef EFL_HAVE_PTHREAD + Ecore_Pthread_Worker *worker; + Ecore_Pthread_Data *pth = NULL; + + if (!func_heavy) return NULL; + + worker = malloc(sizeof (Ecore_Pthread_Worker)); + if (!worker) goto on_error; + + worker->u.long_run.func_heavy = func_heavy; + worker->u.long_run.func_notify = func_notify; + worker->func_cancel = func_cancel; + worker->func_end = func_end; + worker->data = data; + worker->cancel = EINA_FALSE; + worker->long_run = EINA_TRUE; + + worker->u.long_run.notify = ecore_pipe_add(_ecore_notify_handler, NULL); + + if (!try_no_queue) + { + pthread_t t; + + if (pthread_create(&t, NULL, (void*) _ecore_direct_worker, worker) == 0) + return (Ecore_Thread*) worker; + } + + pthread_mutex_lock(&_mutex); + _ecore_long_thread_data = eina_list_append(_ecore_long_thread_data, worker); + + if (_ecore_thread_count == _ecore_thread_count_max) + { + pthread_mutex_unlock(&_mutex); + return (Ecore_Thread*) worker; + } + + pthread_mutex_unlock(&_mutex); + + /* One more thread could be created. */ + pth = malloc(sizeof (Ecore_Pthread_Data)); + if (!pth) goto on_error; + + pth->p = ecore_pipe_add(_ecore_thread_handler, NULL); + if (pth->p) goto on_error; + + if (pthread_create(&pth->thread, NULL, (void*) _ecore_thread_worker, pth) == 0) + return (Ecore_Thread*) worker; + + on_error: + if (pth) + { + if (pth->p) ecore_pipe_del(pth->p); + free(pth); + } + + if (_ecore_thread_count == 0) + { + if (func_cancel) func_cancel((void*) data); + + if (worker) + { + ecore_pipe_del(worker->u.long_run.notify); + free(worker); + worker = NULL; + } + } + + return (Ecore_Thread*) worker; +#else + Ecore_Pthread_Worker worker; + + /* + If no thread and as we don't want to break app that rely on this + facility, we will lock the interface until we are done. + */ + worker.u.long_run.func_heavy = func_heavy; + worker.u.long_run.func_notify = func_notify; + worker.u.long_run.notify = NULL; + worker.func_cancel = func_cancel; + worker.func_end = func_end; + worker.data = data; + worker.cancel = EINA_FALSE; + worker.long_run = EINA_TRUE; + + func_heavy((Ecore_Thread*) &worker, data); + + if (worker.cancel) func_cancel(data); + else func_end(data); + + return NULL; +#endif +} + +EAPI Eina_Bool +ecore_thread_notify(Ecore_Thread *thread, void *data) +{ + Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread; + + if (!worker) return EINA_FALSE; + if (!worker->long_run) return EINA_FALSE; + +#ifdef EFL_HAVE_PTHREAD + if (worker->u.long_run.self != pthread_self()) return EINA_FALSE; + + ecore_pipe_write(worker->u.long_run.notify, data, sizeof (void*)); + + return EINA_TRUE; +#else + worker->u.long_run.func_notify(thread, data); + + return EINA_TRUE; +#endif +} +