/* efl/src/lib/ecore/ecore_thread.c */

#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <stdlib.h>
#include <limits.h> /* INT_MAX, used by the sync-message reply code */
#include <sys/time.h>
#include <assert.h>
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif
#ifdef HAVE_EVIL
# include <Evil.h>
#endif
#include "Ecore.h"
#include "ecore_private.h"
#ifdef EFL_HAVE_THREADS
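/* Shorthand wrappers over the Eina lock, condition, rwlock and thread
 * primitives, so the rest of this file stays terse. */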
# define LK(x) Eina_Lock x
# define LKI(x) eina_lock_new(&(x))
# define LKD(x) eina_lock_free(&(x))
# define LKL(x) eina_lock_take(&(x))
# define LKU(x) eina_lock_release(&(x))
# define CD(x) Eina_Condition x
# define CDI(x, m) eina_condition_new(&(x), &(m))
# define CDD(x) eina_condition_free(&(x))
# define CDB(x) eina_condition_broadcast(&(x))
# define CDW(x, t) eina_condition_timedwait(&(x), t)
# define LRWK(x) Eina_RWLock x
# define LRWKI(x) eina_rwlock_new(&(x))
# define LRWKD(x) eina_rwlock_free(&(x))
# define LRWKWL(x) eina_rwlock_take_write(&(x))
# define LRWKRL(x) eina_rwlock_take_read(&(x))
# define LRWKU(x) eina_rwlock_release(&(x))
# define PH(x) Eina_Thread x
# define PHE(x, y) eina_thread_equal(x, y)
# define PHS() eina_thread_self()
# define PHC(x, f, d) eina_thread_create(&(x), EINA_THREAD_BACKGROUND, -1, (void *)f, d)
# define PHJ(x) eina_thread_join(x)
# ifdef EFL_HAVE_POSIX_THREADS
# include <pthread.h>
# ifdef __linux__
# include <sched.h>
# include <sys/resource.h>
# include <unistd.h>
# include <sys/syscall.h>
# include <errno.h>
# endif
# else /* EFL_HAVE_WIN32_THREADS */
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
# undef WIN32_LEAN_AND_MEAN
# endif
#endif
typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
typedef struct _Ecore_Pthread Ecore_Pthread;
typedef struct _Ecore_Thread_Data Ecore_Thread_Data;
struct _Ecore_Thread_Data
{
void *data;
Eina_Free_Cb cb;
};
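/* One queued unit of work. The union reflects the three run modes:
 * short_run (a plain blocking job), feedback_run (a heavy job that posts
 * notifications back to the main loop) and message_run (bidirectional
 * messaging). Exactly one mode is active per worker, selected by the
 * message_run/feedback_run flags below. */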
struct _Ecore_Pthread_Worker
{
union {
struct
{
Ecore_Thread_Cb func_blocking;
} short_run;
struct
{
Ecore_Thread_Cb func_heavy;
Ecore_Thread_Notify_Cb func_notify;
Ecore_Pthread_Worker *direct_worker;
int send;
int received;
} feedback_run;
struct {
Ecore_Thread_Cb func_main;
Ecore_Thread_Notify_Cb func_notify;
Ecore_Pipe *send;
Ecore_Pthread_Worker *direct_worker;
struct {
int send;
int received;
} from, to;
} message_run;
} u;
Ecore_Thread_Cb func_cancel;
Ecore_Thread_Cb func_end;
#ifdef EFL_HAVE_THREADS
PH(self);
Eina_Hash *hash;
CD(cond);
LK(mutex);
#endif
const void *data;
int cancel;
#ifdef EFL_HAVE_THREADS
LK(cancel_mutex);
#endif
Eina_Bool message_run : 1;
Eina_Bool feedback_run : 1;
Eina_Bool kill : 1;
Eina_Bool reschedule : 1;
Eina_Bool no_queue : 1;
};
#ifdef EFL_HAVE_THREADS
typedef struct _Ecore_Pthread_Notify Ecore_Pthread_Notify;
struct _Ecore_Pthread_Notify
{
Ecore_Pthread_Worker *work;
const void *user_data;
};
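/* A message exchanged between a message_run worker and the main loop:
 * either plain data handed to func_notify, or a callback (async, or sync
 * with its return value piped back to the worker). */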
typedef void *(*Ecore_Thread_Sync_Cb)(void* data, Ecore_Thread *thread);
typedef struct _Ecore_Pthread_Message Ecore_Pthread_Message;
struct _Ecore_Pthread_Message
{
union {
Ecore_Thread_Cb async;
Ecore_Thread_Sync_Cb sync;
} u;
const void *data;
int code;
Eina_Bool callback : 1;
Eina_Bool sync : 1;
};
#endif
static int _ecore_thread_count_max = 0;
#ifdef EFL_HAVE_THREADS
static void _ecore_thread_handler(void *data);
static int _ecore_thread_count = 0;
static Eina_List *_ecore_running_job = NULL;
static Eina_List *_ecore_pending_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads_feedback = NULL;
static LK(_ecore_pending_job_threads_mutex);
static LK(_ecore_running_job_mutex);
static Eina_Hash *_ecore_thread_global_hash = NULL;
static LRWK(_ecore_thread_global_hash_lock);
static LK(_ecore_thread_global_hash_mutex);
static CD(_ecore_thread_global_hash_cond);
static Eina_Bool have_main_loop_thread = EINA_FALSE;
static Eina_Trash *_ecore_thread_worker_trash = NULL;
static int _ecore_thread_worker_count = 0;
static void *_ecore_thread_worker(void *);
static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);
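/* Remember which thread runs the main loop. The cached value is keyed on
 * the pid so a fork() picks up the new main loop thread. */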
static PH(get_main_loop_thread) (void)
{
static PH(main_loop_thread);
static pid_t main_loop_pid;
pid_t pid = getpid();
if (pid != main_loop_pid)
{
main_loop_pid = pid;
main_loop_thread = PHS();
        have_main_loop_thread = EINA_TRUE;
}
return main_loop_thread;
}
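/* Tear a worker down; keep it in the trash cache unless more than
 * (_ecore_thread_count_max + 1) * 16 workers were allocated, in which
 * case it is really freed. */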
static void
_ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
{
LKD(worker->cancel_mutex);
CDD(worker->cond);
LKD(worker->mutex);
if (_ecore_thread_worker_count > ((_ecore_thread_count_max + 1) * 16))
{
_ecore_thread_worker_count--;
free(worker);
return;
}
eina_trash_push(&_ecore_thread_worker_trash, worker);
}
static void
_ecore_thread_data_free(void *data)
{
Ecore_Thread_Data *d = data;
if (d->cb) d->cb(d->data);
free(d);
}
static void
_ecore_thread_join(PH(thread))
{
PHJ(thread);
}
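/* Final disposal of a finished or cancelled job: invoke func_cancel or
 * func_end on the main loop side, then release the worker(s). */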
static void
_ecore_thread_kill(Ecore_Pthread_Worker *work)
{
if (work->cancel)
{
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *)work);
}
else
{
if (work->func_end)
work->func_end((void *)work->data, (Ecore_Thread *)work);
}
if (work->feedback_run)
{
if (work->u.feedback_run.direct_worker)
_ecore_thread_worker_free(work->u.feedback_run.direct_worker);
}
if (work->hash)
eina_hash_free(work->hash);
_ecore_thread_worker_free(work);
}
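/* Main-loop handler fired when a job ends. For feedback jobs we must not
 * free the worker while notifications are still in flight, so we only
 * mark it for killing and let the last notify finish the job. */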
static void
_ecore_thread_handler(void *data)
{
Ecore_Pthread_Worker *work = data;
if (work->feedback_run)
{
if (work->u.feedback_run.send != work->u.feedback_run.received)
{
work->kill = EINA_TRUE;
return;
}
}
_ecore_thread_kill(work);
}
#if 0
static void
_ecore_nothing_handler(void *data EINA_UNUSED, void *buffer EINA_UNUSED, unsigned int nbyte EINA_UNUSED)
{
}
#endif
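/* Main-loop delivery of one feedback notification; the received counter
 * lets the pending-kill logic above know when the pipe has drained. */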
static void
_ecore_notify_handler(void *data)
{
Ecore_Pthread_Notify *notify = data;
Ecore_Pthread_Worker *work = notify->work;
void *user_data = (void*) notify->user_data;
work->u.feedback_run.received++;
if (work->u.feedback_run.func_notify)
work->u.feedback_run.func_notify((void *)work->data, (Ecore_Thread *)work, user_data);
   /* Force reading all notify events before killing the thread */
if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
{
_ecore_thread_kill(work);
}
free(notify);
}
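/* Main-loop delivery for message_run workers: plain messages go to
 * func_notify, async callbacks run here, and sync callbacks write their
 * result back to the worker through the send pipe. */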
static void
_ecore_message_notify_handler(void *data)
{
Ecore_Pthread_Notify *notify = data;
Ecore_Pthread_Worker *work = notify->work;
Ecore_Pthread_Message *user_data = (void *) notify->user_data;
Eina_Bool delete = EINA_TRUE;
work->u.message_run.from.received++;
if (!user_data->callback)
{
if (work->u.message_run.func_notify)
work->u.message_run.func_notify((void *) work->data, (Ecore_Thread *) work, (void *) user_data->data);
}
else
{
if (user_data->sync)
{
user_data->data = user_data->u.sync((void*) user_data->data, (Ecore_Thread *) work);
user_data->callback = EINA_FALSE;
user_data->code = INT_MAX;
ecore_pipe_write(work->u.message_run.send, &user_data, sizeof (Ecore_Pthread_Message *));
delete = EINA_FALSE;
}
else
{
user_data->u.async((void*) user_data->data, (Ecore_Thread *) work);
}
}
if (delete)
{
free(user_data);
}
   /* Force reading all notify events before killing the thread */
if (work->kill && work->u.message_run.from.send == work->u.message_run.from.received)
{
_ecore_thread_kill(work);
}
free(notify);
}
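/* Pop one pending short job and run it on this thread, honouring the
 * cancel flag and a possible reschedule request. _ecore_feedback_job()
 * below mirrors this for the feedback queue. */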
static void
_ecore_short_job(PH(thread))
{
Ecore_Pthread_Worker *work;
int cancel;
LKL(_ecore_pending_job_threads_mutex);
if (!_ecore_pending_job_threads)
{
LKU(_ecore_pending_job_threads_mutex);
return;
}
work = eina_list_data_get(_ecore_pending_job_threads);
_ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
_ecore_pending_job_threads);
LKU(_ecore_pending_job_threads_mutex);
LKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_append(_ecore_running_job, work);
LKU(_ecore_running_job_mutex);
LKL(work->cancel_mutex);
cancel = work->cancel;
LKU(work->cancel_mutex);
work->self = thread;
if (!cancel)
work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
LKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_remove(_ecore_running_job, work);
LKU(_ecore_running_job_mutex);
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
LKU(_ecore_pending_job_threads_mutex);
}
else
{
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
}
}
static void
_ecore_feedback_job(PH(thread))
{
Ecore_Pthread_Worker *work;
int cancel;
LKL(_ecore_pending_job_threads_mutex);
if (!_ecore_pending_job_threads_feedback)
{
LKU(_ecore_pending_job_threads_mutex);
return;
}
work = eina_list_data_get(_ecore_pending_job_threads_feedback);
_ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
_ecore_pending_job_threads_feedback);
LKU(_ecore_pending_job_threads_mutex);
LKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_append(_ecore_running_job, work);
LKU(_ecore_running_job_mutex);
LKL(work->cancel_mutex);
cancel = work->cancel;
LKU(work->cancel_mutex);
work->self = thread;
if (!cancel)
work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
LKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_remove(_ecore_running_job, work);
LKU(_ecore_running_job_mutex);
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
LKU(_ecore_pending_job_threads_mutex);
}
else
{
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
}
}
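/* Dedicated thread entry used when a job bypasses the pool (no_queue
 * feedback and message workers): run the single job, then hand worker
 * teardown and the thread join back to the main loop. */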
static void *
_ecore_direct_worker(Ecore_Pthread_Worker *work)
{
work->self = PHS();
if (work->message_run)
work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work);
else
work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join,
(void*) PHS());
return NULL;
}
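/* Generic pool-thread entry: drain the short and feedback queues, linger
 * briefly in case new work arrives, then retire the thread. */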
static void *
_ecore_thread_worker(void *data EINA_UNUSED)
{
restart:
_ecore_short_job(PHS());
_ecore_feedback_job(PHS());
   /* FIXME: Check whether there are pending feedback tasks and, if so, switch to the feedback run handler. */
LKL(_ecore_pending_job_threads_mutex);
if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
{
LKU(_ecore_pending_job_threads_mutex);
goto restart;
}
LKU(_ecore_pending_job_threads_mutex);
/* Sleep a little to prevent premature death */
#ifdef _WIN32
   Sleep(1); /* around 1ms; Windows may round this up to the scheduler tick */
#else
usleep(50);
#endif
LKL(_ecore_pending_job_threads_mutex);
if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
{
LKU(_ecore_pending_job_threads_mutex);
goto restart;
}
_ecore_thread_count--;
ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join,
(void*) PHS());
LKU(_ecore_pending_job_threads_mutex);
return NULL;
}
#endif
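/* Allocate a worker, preferring the trash cache over a fresh calloc(). */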
static Ecore_Pthread_Worker *
_ecore_thread_worker_new(void)
{
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *result;
result = eina_trash_pop(&_ecore_thread_worker_trash);
if (!result)
{
result = calloc(1, sizeof(Ecore_Pthread_Worker));
_ecore_thread_worker_count++;
}
LKI(result->cancel_mutex);
LKI(result->mutex);
CDI(result->cond, result->mutex);
return result;
#else
   return calloc(1, sizeof (Ecore_Pthread_Worker)); /* zeroed, like the threaded path, so all flags start cleared */
#endif
}
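/* Module init/shutdown: init sizes the pool from the CPU count and sets
 * up the global locks; shutdown cancels what it can and waits up to 1s
 * for worker threads to exit. */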
void
_ecore_thread_init(void)
{
_ecore_thread_count_max = eina_cpu_count();
if (_ecore_thread_count_max <= 0)
_ecore_thread_count_max = 1;
#ifdef EFL_HAVE_THREADS
LKI(_ecore_pending_job_threads_mutex);
LRWKI(_ecore_thread_global_hash_lock);
LKI(_ecore_thread_global_hash_mutex);
LKI(_ecore_running_job_mutex);
CDI(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex);
#endif
}
void
_ecore_thread_shutdown(void)
{
   /* FIXME: If functions are still running in the background, should we kill them? */
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *work;
Eina_List *l;
Eina_Bool test;
int iteration = 0;
LKL(_ecore_pending_job_threads_mutex);
EINA_LIST_FREE(_ecore_pending_job_threads, work)
{
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *) work);
free(work);
}
EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
{
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *) work);
free(work);
}
LKU(_ecore_pending_job_threads_mutex);
LKL(_ecore_running_job_mutex);
EINA_LIST_FOREACH(_ecore_running_job, l, work)
ecore_thread_cancel((Ecore_Thread*) work);
LKU(_ecore_running_job_mutex);
do
{
LKL(_ecore_pending_job_threads_mutex);
if (_ecore_thread_count > 0)
{
test = EINA_TRUE;
}
else
{
test = EINA_FALSE;
}
LKU(_ecore_pending_job_threads_mutex);
iteration++;
if (test) usleep(50000);
}
while (test == EINA_TRUE && iteration < 20);
if (iteration == 20 && _ecore_thread_count > 0)
{
ERR("%i of the child thread are still running after 1s. This can lead to a segv. Sorry.", _ecore_thread_count);
}
if (_ecore_thread_global_hash)
eina_hash_free(_ecore_thread_global_hash);
have_main_loop_thread = 0;
while ((work = eina_trash_pop(&_ecore_thread_worker_trash)))
{
free(work);
}
LKD(_ecore_pending_job_threads_mutex);
LRWKD(_ecore_thread_global_hash_lock);
LKD(_ecore_thread_global_hash_mutex);
LKD(_ecore_running_job_mutex);
CDD(_ecore_thread_global_hash_cond);
#endif
}
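/* Schedule func_blocking on a pool thread; func_end (or func_cancel on
 * cancellation) then runs back in the main loop. A minimal usage sketch
 * (the _job/_job_end callbacks and the ctx pointer are hypothetical,
 * for illustration only):
 *
 *   static void _job(void *data, Ecore_Thread *th)
 *   {  // worker thread: poll ecore_thread_check(th) to abort early
 *   }
 *   static void _job_end(void *data, Ecore_Thread *th)
 *   {  // main loop: runs once _job has returned
 *   }
 *
 *   ecore_thread_run(_job, _job_end, NULL, ctx);
 */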
EAPI Ecore_Thread *
ecore_thread_run(Ecore_Thread_Cb func_blocking,
Ecore_Thread_Cb func_end,
Ecore_Thread_Cb func_cancel,
const void *data)
{
Ecore_Pthread_Worker *work;
#ifdef EFL_HAVE_THREADS
   Eina_Bool tried = EINA_FALSE;
   PH(thread);
#endif
EINA_MAIN_LOOP_CHECK_RETURN_VAL(NULL);
if (!func_blocking) return NULL;
work = _ecore_thread_worker_new();
if (!work)
{
if (func_cancel)
func_cancel((void *)data, NULL);
return NULL;
}
work->u.short_run.func_blocking = func_blocking;
work->func_end = func_end;
work->func_cancel = func_cancel;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
work->message_run = EINA_FALSE;
work->kill = EINA_FALSE;
work->reschedule = EINA_FALSE;
work->no_queue = EINA_FALSE;
work->data = data;
#ifdef EFL_HAVE_THREADS
work->self = 0;
work->hash = NULL;
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
if (_ecore_thread_count == _ecore_thread_count_max)
{
LKU(_ecore_pending_job_threads_mutex);
return (Ecore_Thread *)work;
}
LKU(_ecore_pending_job_threads_mutex);
/* One more thread could be created. */
eina_threads_init();
LKL(_ecore_pending_job_threads_mutex);
retry:
if (PHC(thread, _ecore_thread_worker, NULL))
{
_ecore_thread_count++;
LKU(_ecore_pending_job_threads_mutex);
return (Ecore_Thread *)work;
}
if (!tried)
{
_ecore_main_call_flush();
tried = EINA_TRUE;
goto retry;
}
if (_ecore_thread_count == 0)
{
_ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
if (work->func_cancel)
work->func_cancel((void *) work->data, (Ecore_Thread *) work);
_ecore_thread_worker_free(work);
work = NULL;
}
LKU(_ecore_pending_job_threads_mutex);
eina_threads_shutdown();
return (Ecore_Thread *)work;
#else
   /*
    * Without thread support, and since we don't want to break apps that
    * rely on this facility, we block the interface until we are done.
    */
do {
        /* Handle reschedule by forcing it here. That blocks the app; an
         * idler would be nicer, but that is too complex for a case where
         * threads should really exist.
         */
work->reschedule = EINA_FALSE;
func_blocking((void *)data, (Ecore_Thread *)work);
        if (work->cancel == EINA_FALSE)
          { if (func_end) func_end((void *)data, (Ecore_Thread *)work); }
        else if (func_cancel)
          func_cancel((void *)data, (Ecore_Thread *)work);
} while (work->reschedule == EINA_TRUE);
free(work);
return NULL;
#endif
}
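/* Request cancellation. Jobs still sitting in a pending queue are
 * cancelled and freed synchronously when called from the main loop;
 * a running job only gets its cancel flag set and must notice it via
 * ecore_thread_check(). */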
EAPI Eina_Bool
ecore_thread_cancel(Ecore_Thread *thread)
{
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *volatile work = (Ecore_Pthread_Worker *)thread;
Eina_List *l;
int cancel;
if (!work)
return EINA_TRUE;
LKL(work->cancel_mutex);
cancel = work->cancel;
LKU(work->cancel_mutex);
if (cancel)
return EINA_FALSE;
if (work->feedback_run)
{
if (work->kill)
return EINA_TRUE;
if (work->u.feedback_run.send != work->u.feedback_run.received)
goto on_exit;
}
LKL(_ecore_pending_job_threads_mutex);
if ((have_main_loop_thread) &&
(PHE(get_main_loop_thread(), PHS())))
{
if (!work->feedback_run)
EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
{
if ((void *)work == (void *)thread)
{
_ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);
LKU(_ecore_pending_job_threads_mutex);
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *)work);
free(work);
return EINA_TRUE;
}
}
else
EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
{
if ((void *)work == (void *)thread)
{
_ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);
LKU(_ecore_pending_job_threads_mutex);
if (work->func_cancel)
work->func_cancel((void *)work->data, (Ecore_Thread *)work);
free(work);
return EINA_TRUE;
}
}
}
LKU(_ecore_pending_job_threads_mutex);
work = (Ecore_Pthread_Worker *)thread;
/* Delay the destruction */
on_exit:
LKL(work->cancel_mutex);
work->cancel = EINA_TRUE;
LKU(work->cancel_mutex);
return EINA_FALSE;
#else
(void) thread;
return EINA_TRUE;
#endif
}
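/* Returns EINA_TRUE once the job has been cancelled (or thread is NULL);
 * long-running callbacks should poll this and return early. */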
EAPI Eina_Bool
ecore_thread_check(Ecore_Thread *thread)
{
Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *) thread;
int cancel;
if (!worker) return EINA_TRUE;
#ifdef EFL_HAVE_THREADS
LKL(worker->cancel_mutex);
#endif
cancel = worker->cancel;
   /* FIXME: There is an insane bug driving me nuts here. I don't know if
      it's a race condition, some cache issue or some alien attack on our
      software. But ecore_thread_check will only work correctly with a
      printf; all the volatile, locks and even usleep don't help here... */
   /* fprintf(stderr, "wc: %i\n", cancel); */
#ifdef EFL_HAVE_THREADS
LKU(worker->cancel_mutex);
#endif
return cancel;
}