eina/ecore: allow threads to be canceled, use in ecore_con.

As discussed in the mailing list, many people will use worker threads
to execute blocking syscalls and mandating ecore_thread_check() for
voluntary preemption reduces the ecore_thread usefulness a lot.

A clear example is ecore_con usage of connect() and getaddrinfo() in
threads. If the connect timeout expires, the thread will be cancelled,
but it was blocked on syscalls and they will hang around for long
time. If the application exits, ecore will print an error saying it
can SEGV.

Then enable access to pthread_setcancelstate(PTHREAD_CANCEL_ENABLE)
via eina_thread_cancellable_set(EINA_TRUE), to pthread_cancel() via
eina_thread_cancel(), to pthread_cleanup_push()/pthread_cleanup_pop()
via EINA_THREAD_CLEANUP_PUSH()/EINA_THREAD_CLEANUP_POP() and so on.

Ecore threads will enforce non-cancellable threads on its own code,
but the user may decide to enable that and allow cancellation, that's
not an issue since ecore_thread now plays well and use cleanup
functions.

Ecore con connect/resolve make use of that and enable cancellable
state, efl_net_dialer_tcp benefits a lot from that.

A good comparison of the benefit is to run:

   ./src/examples/ecore/efl_io_copier_example tcp://google.com:1234 :stdout:

before and after. It will timeout after 30s and with this patch the
thread is gone, no ecore error is printed about possible SEGV.
This commit is contained in:
Gustavo Sverzut Barbieri 2016-09-14 01:38:58 -03:00
parent 5df0b6765b
commit 960e1a1d16
6 changed files with 373 additions and 41 deletions

View File

@ -1794,6 +1794,17 @@ EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, Ecore_T
* ecore_thread_reschedule().
* @li The function is prepared to leave early by checking if
* ecore_thread_check() returns @c EINA_TRUE.
* @li The function marks the thread as cancellable using
* eina_thread_cancellable_set(), allowing the thread to be terminated
* at explicit cancellation points defined with
* eina_thread_cancel_checkpoint() or with syscalls mentioned at
* man:pthreads(7). This allows blocking operations such as network or
* disk access to be stopped without polling
* ecore_thread_check(). Note that a cancelled thread may leak
* resources if no cleanup function is registered with
* EINA_THREAD_CLEANUP_PUSH(). Consider running such code using
* eina_thread_cancellable_run().
*
* The user function can cancel itself by calling ecore_thread_cancel(), but
* it should always use the ::Ecore_Thread handle passed to it and never
@ -1804,6 +1815,7 @@ EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, Ecore_T
* returns @c EINA_TRUE or after the @c func_cancel callback returns.
*
* @see ecore_thread_check()
* @see eina_thread_cancellable_run()
*/
EAPI Eina_Bool ecore_thread_cancel(Ecore_Thread *thread);

View File

@ -202,6 +202,7 @@ _ecore_thread_data_free(void *data)
static void
_ecore_thread_join(PH(thread))
{
DBG("joining thread=%" PRIu64, (uint64_t)thread);
PHJ(thread);
}
@ -319,6 +320,31 @@ _ecore_message_notify_handler(void *data)
free(notify);
}
static void
_ecore_short_job_cleanup(void *data)
{
Ecore_Pthread_Worker *work = data;
DBG("cleanup work=%p, thread=%" PRIu64, work, (uint64_t)work->self);
SLKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_remove(_ecore_running_job, work);
SLKU(_ecore_running_job_mutex);
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
SLKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
SLKU(_ecore_pending_job_threads_mutex);
}
else
{
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
}
}
static void
_ecore_short_job(PH(thread))
{
@ -346,19 +372,31 @@ _ecore_short_job(PH(thread))
cancel = work->cancel;
SLKU(work->cancel_mutex);
work->self = thread;
EINA_THREAD_CLEANUP_PUSH(_ecore_short_job_cleanup, work);
if (!cancel)
work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
eina_thread_cancellable_set(EINA_FALSE, NULL);
EINA_THREAD_CLEANUP_POP(EINA_TRUE);
}
static void
_ecore_feedback_job_cleanup(void *data)
{
Ecore_Pthread_Worker *work = data;
DBG("cleanup work=%p, thread=%" PRIu64, work, (uint64_t)work->self);
SLKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_remove(_ecore_running_job, work);
SLKU(_ecore_running_job_mutex);
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
SLKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
_ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
SLKU(_ecore_pending_job_threads_mutex);
}
else
@ -393,52 +431,69 @@ _ecore_feedback_job(PH(thread))
cancel = work->cancel;
SLKU(work->cancel_mutex);
work->self = thread;
EINA_THREAD_CLEANUP_PUSH(_ecore_feedback_job_cleanup, work);
if (!cancel)
work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
eina_thread_cancellable_set(EINA_FALSE, NULL);
EINA_THREAD_CLEANUP_POP(EINA_TRUE);
}
SLKL(_ecore_running_job_mutex);
_ecore_running_job = eina_list_remove(_ecore_running_job, work);
SLKU(_ecore_running_job_mutex);
static void
_ecore_direct_worker_cleanup(void *data)
{
Ecore_Pthread_Worker *work = data;
if (work->reschedule)
{
work->reschedule = EINA_FALSE;
SLKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
SLKU(_ecore_pending_job_threads_mutex);
}
else
{
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
}
DBG("cleanup work=%p, thread=%" PRIu64 " (should join)", work, (uint64_t)work->self);
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join,
(void*)(intptr_t)PHS());
}
static void *
_ecore_direct_worker(Ecore_Pthread_Worker *work)
{
eina_thread_cancellable_set(EINA_FALSE, NULL);
eina_thread_name_set(eina_thread_self(), "Ethread-feedback");
work->self = PHS();
EINA_THREAD_CLEANUP_PUSH(_ecore_direct_worker_cleanup, work);
if (work->message_run)
work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work);
else
work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join,
(void*)(intptr_t)PHS());
eina_thread_cancellable_set(EINA_FALSE, NULL);
EINA_THREAD_CLEANUP_POP(EINA_TRUE);
return NULL;
}
static void
_ecore_thread_worker_cleanup(void *data EINA_UNUSED)
{
DBG("cleanup thread=%" PRIu64 " (should join)", PHS());
SLKL(_ecore_pending_job_threads_mutex);
_ecore_thread_count--;
SLKU(_ecore_pending_job_threads_mutex);
ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join,
(void*)(intptr_t)PHS());
}
static void *
_ecore_thread_worker(void *data EINA_UNUSED)
{
eina_thread_cancellable_set(EINA_FALSE, NULL);
EINA_THREAD_CLEANUP_PUSH(_ecore_thread_worker_cleanup, NULL);
restart:
/* these 2 are cancellation points as user cb may enable */
_ecore_short_job(PHS());
_ecore_feedback_job(PHS());
/* from here on, cancellations are guaranteed to be disabled */
/* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
eina_thread_name_set(eina_thread_self(), "Ethread-worker");
@ -463,12 +518,10 @@ restart:
SLKU(_ecore_pending_job_threads_mutex);
goto restart;
}
_ecore_thread_count--;
ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join,
(void*)(intptr_t)PHS());
SLKU(_ecore_pending_job_threads_mutex);
EINA_THREAD_CLEANUP_POP(EINA_TRUE);
return NULL;
}
@ -728,6 +781,7 @@ ecore_thread_cancel(Ecore_Thread *thread)
/* Delay the destruction */
on_exit:
eina_thread_cancel(work->self); /* noop unless eina_thread_cancellable_set() was used by user */
SLKL(work->cancel_mutex);
work->cancel = EINA_TRUE;
SLKU(work->cancel_mutex);

View File

@ -3126,11 +3126,19 @@ typedef struct _Efl_Net_Resolve_Async_Data
} Efl_Net_Resolve_Async_Data;
static void
_efl_net_resolve_async_run(void *data, Ecore_Thread *thread)
_efl_net_resolve_async_run(void *data, Ecore_Thread *thread EINA_UNUSED)
{
Efl_Net_Resolve_Async_Data *d = data;
while (!ecore_thread_check(thread))
/* allows ecore_thread_cancel() to cancel at some points, see
* man:pthreads(7).
*
* no need to set cleanup functions since the main thread will
* handle that with _efl_net_resolve_async_cancel().
*/
eina_thread_cancellable_set(EINA_TRUE, NULL);
while (EINA_TRUE)
{
DBG("resolving host='%s' port='%s'", d->host, d->port);
d->gai_error = getaddrinfo(d->host, d->port, d->hints, &d->result);
@ -3141,6 +3149,8 @@ _efl_net_resolve_async_run(void *data, Ecore_Thread *thread)
break;
}
eina_thread_cancellable_set(EINA_FALSE, NULL);
if (eina_log_domain_level_check(_ecore_con_log_dom, EINA_LOG_LEVEL_DBG))
{
char buf[INET6_ADDRSTRLEN + sizeof("[]:65536")] = "";
@ -3235,12 +3245,20 @@ typedef struct _Efl_Net_Connect_Async_Data
} Efl_Net_Connect_Async_Data;
static void
_efl_net_connect_async_run(void *data, Ecore_Thread *thread)
_efl_net_connect_async_run(void *data, Ecore_Thread *thread EINA_UNUSED)
{
Efl_Net_Connect_Async_Data *d = data;
char buf[INET6_ADDRSTRLEN + sizeof("[]:65536")] = "";
int r;
/* allows ecore_thread_cancel() to cancel at some points, see
* man:pthreads(7).
*
* no need to set cleanup functions since the main thread will
* handle that with _efl_net_connect_async_cancel().
*/
eina_thread_cancellable_set(EINA_TRUE, NULL);
d->error = 0;
d->sockfd = efl_net_socket4(d->addr->sa_family, d->type, d->protocol, d->close_on_exec);
@ -3251,14 +3269,6 @@ _efl_net_connect_async_run(void *data, Ecore_Thread *thread)
return;
}
if (ecore_thread_check(thread))
{
d->error = ECANCELED;
close(d->sockfd);
d->sockfd = -1;
return;
}
if (eina_log_domain_level_check(_ecore_con_log_dom, EINA_LOG_LEVEL_DBG))
efl_net_ip_port_fmt(buf, sizeof(buf), d->addr);
@ -3267,10 +3277,15 @@ _efl_net_connect_async_run(void *data, Ecore_Thread *thread)
r = connect(d->sockfd, d->addr, d->addrlen);
if (r < 0)
{
int fd = d->sockfd;
d->error = errno;
close(d->sockfd);
d->sockfd = -1;
DBG("connect(%d, %s) failed: %s", d->sockfd, buf, strerror(errno));
/* close() is a cancellation point, thus unset sockfd before
* closing, so the main thread _efl_net_connect_async_cancel()
* won't close it again.
*/
close(fd);
DBG("connect(%d, %s) failed: %s", fd, buf, strerror(errno));
return;
}

View File

@ -243,7 +243,6 @@ extern "C" {
#include <eina_cpu.h>
#include <eina_sched.h>
#include <eina_tiler.h>
#include <eina_thread.h>
#include <eina_hamster.h>
#include <eina_matrixsparse.h>
#include <eina_str.h>
@ -254,6 +253,7 @@ extern "C" {
#include <eina_quadtree.h>
#include <eina_simple_xml_parser.h>
#include <eina_lock.h>
#include <eina_thread.h> /* after eina_lock.h since it will include pthread.h with proper flags */
#include <eina_prefix.h>
#include <eina_refcount.h>
#include <eina_mmap.h>

View File

@ -23,6 +23,7 @@
#include <stdlib.h>
#include "eina_config.h"
#include "eina_lock.h" /* it will include pthread.h with proper flags */
#include "eina_thread.h"
#include "eina_sched.h"
#include "eina_cpu.h"
@ -226,6 +227,37 @@ eina_thread_name_set(Eina_Thread t, const char *name)
return EINA_FALSE;
}
EAPI Eina_Bool
eina_thread_cancel(Eina_Thread t)
{
return pthread_cancel((pthread_t)t) == 0;
}
EAPI Eina_Bool
eina_thread_cancellable_set(Eina_Bool cancellable, Eina_Bool *was_cancellable)
{
int state = cancellable ? PTHREAD_CANCEL_ENABLE : PTHREAD_CANCEL_DISABLE;
int old = 0;
int r;
/* enforce deferred in case users changed to asynchronous themselves */
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &old);
r = pthread_setcancelstate(state, &old);
if (was_cancellable && r == 0)
*was_cancellable = (old == PTHREAD_CANCEL_ENABLE);
return r == 0;
}
EAPI void
eina_thread_cancel_checkpoint(void)
{
pthread_testcancel();
}
EAPI const void *EINA_THREAD_JOIN_CANCELED = PTHREAD_CANCELED;
Eina_Bool
eina_thread_init(void)
{

View File

@ -99,6 +99,20 @@ EAPI Eina_Bool eina_thread_create(Eina_Thread *t,
Eina_Thread_Priority prio, int affinity,
Eina_Thread_Cb func, const void *data) EINA_ARG_NONNULL(1, 4) EINA_WARN_UNUSED_RESULT;
/**
* The return value of eina_thread_join() if it was canceled with
* eina_thread_cancel().
*
* A thread must be explicitly flagged as cancellable with
* eina_thread_cancellable_set(), by default it's not and this value
* shouldn't be returned.
*
* @see eina_thread_join()
*
* @since 1.19
*/
EAPI extern const void *EINA_THREAD_JOIN_CANCELED;
/**
* Join a currently running thread, waiting until it finishes.
*
@ -110,6 +124,8 @@ EAPI Eina_Bool eina_thread_create(Eina_Thread *t,
* @param t thread identifier to wait.
* @return value returned by @a t creation function @c func() or
* @c NULL on errors. Check error with @ref Eina_Error_Group.
* If the thread was canceled, it will return
* EINA_THREAD_JOIN_CANCELED.
* @since 1.8
*/
EAPI void *eina_thread_join(Eina_Thread t);
@ -131,6 +147,209 @@ EAPI void *eina_thread_join(Eina_Thread t);
*/
EAPI Eina_Bool eina_thread_name_set(Eina_Thread t, const char *name);
/**
* Attempt to cancel a running thread.
*
* This function sends a cancellation request to the thread, however
* that request is only fulfilled if the thread is cancellable
* (eina_thread_cancellable_set() with EINA_TRUE as first paramter)
* and it will wait for a cancellation point, be
* eina_thread_cancel_checkpoint() or some syscall as defined in
* man:pthreads(7).
*
* A thread that was canceled will return EINA_THREAD_JOIN_CANCELED
* when eina_thread_join() is called.
*
* @param t thread to cancel.
*
* @return EINA_FALSE if thread was not running, EINA_TRUE
* otherwise. Note that if a thread is not cancellable and it
* is running, this function will return EINA_TRUE!
*
* @since 1.19
*/
EAPI Eina_Bool eina_thread_cancel(Eina_Thread t);
/**
* Enable or disable if the current thread can be canceled.
*
* By default eina_thread_create() will return a thread with
* cancellation disabled. One can enable the cancellation by using
* EINA_TRUE in @a cancellable.
*
* Eina threads follow pthread_setcanceltype()
* PTHREAD_CANCEL_DEFERRED, that is, the actual termination will wait
* for a cancellation point, usually a syscall defined in
* man:pthreads(7) or an explicit cancellation point defined with
* eina_thread_cancel_checkpoint().
*
* In order to provide cleanup around critical blocks use
* EINA_THREAD_CLEANUP_PUSH() and EINA_THREAD_CLEANUP_POP() macros
* (which maps to pthread_cleanup_push() and pthread_cleanup_pop()),
* or the helper function eina_thread_cancellable_run() which does the
* pair for you.
*
* @param cancellable if EINA_TRUE, this thread will be accept
* cancellation requests. If EINA_FALSE -- the default, it will
* ignore cancellation requests.
* @param was_cancellable if non-NULL, will return the previous state,
* shall you want to restore.
*
* @return EINA_TRUE if it succeeds in setting the cancellable state
* or EINA_FALSE otherwise.
*
* @see eina_thread_cancel_checkpoint()
* @see EINA_THREAD_CLEANUP_PUSH()
* @see EINA_THREAD_CLEANUP_POP()
* @see eina_thread_cancellable_run()
* @see eina_thread_cancel()
*
* @since 1.19
*/
EAPI Eina_Bool eina_thread_cancellable_set(Eina_Bool cancellable, Eina_Bool *was_cancellable);
/**
* If the current thread is cancellable, this introduces a
* cancellation check point. Otherwise it's a no-operation.
*
* Eina threads follow pthread_setcanceltype()
* PTHREAD_CANCEL_DEFERRED, that is, the actual termination will wait
* for a cancellation point, usually a syscall defined in
* man:pthreads(7) or an explicit cancellation point defined with this
* function.
*
* @see eina_thread_cancel_checkpoint()
*
* @since 1.19
*/
EAPI void eina_thread_cancel_checkpoint(void);
/**
* @def EINA_THREAD_CLEANUP_PUSH(cleanup, data)
*
* @brief Push a cleanup function to be executed when the thread is
* canceled.
*
* This macro will schedule a function cleanup(data) to be executed if
* the thread is canceled with eina_thread_cancel() and the thread
* was previously marked as cancellable with
* eina_thread_cancellable_set().
*
* It @b must be paired with EINA_THREAD_CLEANUP_POP() in the same
* code block as they will expand to do {} while ()!
*
* The cleanup function may also be executed if
* EINA_THREAD_CLEANUP_POP(EINA_TRUE) is used.
*
* @note If the block within EINA_THREAD_CLEANUP_PUSH() and
* EINA_THREAD_CLEANUP_POP() returns, the cleanup callback will
* @b not be executed! To avoid problems prefer to use
* eina_thread_cancellable_run()!
*
* @param cleanup the function to execute on cancellation.
* @param data the context to give to cleanup function.
*
* @see eina_thread_cancellable_run()
*
* @since 1.19
*/
#define EINA_THREAD_CLEANUP_PUSH(cleanup, data) \
pthread_cleanup_push(cleanup, data)
/**
* @def EINA_THREAD_CLEANUP_POP(exec_cleanup)
*
* @brief Pop a cleanup function to be executed when the thread is
* canceled.
*
* This macro will remove a previously pushed cleanup function, thus
* if the thread is canceled with eina_thread_cancel() and the thread
* was previously marked as cancellable with
* eina_thread_cancellable_set(), that cleanup won't be executed
* anymore.
*
* It @b must be paired with EINA_THREAD_CLEANUP_PUSH() in the same
* code block as they will expand to do {} while ()!
*
* @note If the block within EINA_THREAD_CLEANUP_PUSH() and
* EINA_THREAD_CLEANUP_POP() returns, the cleanup callback will
* @b not be executed even if exec_cleanup is EINA_TRUE! To
* avoid problems prefer to use eina_thread_cancellable_run()!
*
* @param exec_cleanup if EINA_TRUE, the function registered with
* EINA_THREAD_CLEANUP_PUSH() will be executed.
*
* @see eina_thread_cancellable_run()
*
* @since 1.19
*/
#define EINA_THREAD_CLEANUP_POP(exec_cleanup) \
pthread_cleanup_pop(exec_cleanup)
/**
* @typedef Eina_Thread_Cancellable_Run_Cb
* Type for the definition of a cancellable callback to run.
*
* @since 1.19
*/
typedef void *(*Eina_Thread_Cancellable_Run_Cb)(void *data);
/**
* This function will setup cleanup callback, turn the thread
* cancellable, execute the given callback, reset the cancellable
* state to its old value, run the cleanup callback and then return
* the callback return value.
*
* @note cleanup_cb is configured @b before the thread is made
* cancellable, thus it @b will be executed while @a cb may not
* in the case the thread was already canceled and that was
* pending.
*
* This helper does exactly the following code. Shall you need a
* slightly different behavior, use the base calls yourself.
*
* @code
* Eina_Bool old = EINA_FALSE;
* void *ret;
*
* EINA_THREAD_CLEANUP_PUSH(cleanup_cb, data);
* eina_thread_cancellable_set(EINA_TRUE, &old); // is a cancellation point
* ret = cb(data); // may not run if was previously canceled
* EINA_THREAD_CLEANUP_POP(EINA_TRUE);
* eina_thread_cancellable_set(old, NULL);
* return ret;
* @endcode
*
* @param cb a cancellable callback to possibly run. The callback @b
* may not be executed if the thread had a pending cancellation
* request. During its execution the callback may be canceled
* at explicit cancellation points using
* eina_thread_cancel_checkpoint(), as well as some syscalls
* defined in man:pthreads(7).
* @param cleanup_cb a cleanup callback to be executed regardless of
* the thread being canceled or not. This function will be
* executed even if @a cb wasn't.
* @param data context to give to both @a cb and @a cleanup_cb.
*
* @return the return value of @a cb. If the thread was canceled,
* this function will not return.
*
* @since 1.19
*/
static inline void *
eina_thread_cancellable_run(Eina_Thread_Cancellable_Run_Cb cb, Eina_Free_Cb cleanup_cb, void *data)
{
Eina_Bool old = EINA_FALSE;
void *ret;
EINA_THREAD_CLEANUP_PUSH(cleanup_cb, data);
eina_thread_cancellable_set(EINA_TRUE, &old); // is a cancellation point
ret = cb(data); // may not run if was previously canceled
EINA_THREAD_CLEANUP_POP(EINA_TRUE);
eina_thread_cancellable_set(old, NULL);
return ret;
}
/**
* @}
*/