ecore: add Ecore_Coroutine.

That work clearly was possible thanks to Leandro. If you want more information
go to his blog : http://tia.mat.br/posts/async_io_with_coroutines/ .

The main difference with his implementation is more portable and not thread safe.
It does not have a custom swapcontext (would make sense as we don't need to save
the sigcontext) so it will be less fast. If people are ready to contribute asm
patch for that purpose I will be happy to apply them.

As for portability this code should work on all architecture we already support
thanks to a nice hack with setjmp/longjmp borowed from libcoroutine. We do use
Fiber for Windows support, but as 1.8 is completely borken in that regard, this
is theorical work only.

Thinks left to do :
- Eoify the API
- Documentation
- More tests
- Add support for coroutine in fd handler
- Add coroutine support to ecore_thread api
- Write some example
This commit is contained in:
Cedric BAIL 2013-02-18 14:32:51 +01:00
parent ceed029a84
commit 91711d8a69
9 changed files with 501 additions and 2 deletions

View File

@ -1805,6 +1805,51 @@ fi
EFL_ADD_LIBS([ECORE], [${LTLIBINTL}])
# coroutine function specific
AC_COMPILE_IFELSE(
[AC_LANG_PROGRAM(
[[
#include <ucontext.h>
]],
[[
ucontext_t test;
getcontext(&test);
]])],
[have_ucontext="yes"],
[have_ucontext="no"])
AC_MSG_CHECKING([for ucontext])
AC_MSG_RESULT([${have_ucontext}])
AC_COMPILE_IFELSE(
[AC_LANG_PROGRAM(
[[
#include <setjmp.h>
]],
[[
jmp_buf context;
setjmp(&context);
]])],
[have_setjmp="yes"],
[have_setjmp="no"])
AC_MSG_CHECKING([for setjmp])
AC_MSG_RESULT([${have_setjmp}])
if test "X${have_windows}" = "xyes"; then
AC_DEFINE(USE_FIBER, 1, [Define to 1 if you have Windows Fiber support.])
EFL_ADD_FEATURE([system], [coroutine], [fiber])
elif test "x${have_ucontext}" = "xyes"; then
AC_DEFINE(USE_UCONTEXT, 1, [Define to 1 if you have posix ucontext functions.])
EFL_ADD_FEATURE([system], [coroutine], [ucontext])
elif test "x${have_setjmp}" = "xyes"; then
AC_DEFINE(USE_SETJMP, 1, [Define to 1 if you have setjmp/longjmp functions.])
EFL_ADD_FEATURE([system], [coroutine], [setjmp])
else
AC_MSG_ERROR([You don't have a working way to implement coroutine. Exiting...])
fi
### Check availability
EFL_LIB_END([Ecore])

View File

@ -27,6 +27,7 @@ lib/ecore/ecore_time.c \
lib/ecore/ecore_timer.c \
lib/ecore/ecore_thread.c \
lib/ecore/ecore_throttle.c \
lib/ecore/ecore_coroutine.c \
lib/ecore/ecore_private.h
if HAVE_WIN32
@ -66,6 +67,7 @@ tests/ecore/ecore_test_ecore.c \
tests/ecore/ecore_test_ecore_con.c \
tests/ecore/ecore_test_ecore_x.c \
tests/ecore/ecore_test_ecore_imf.c \
tests/ecore/ecore_test_coroutine.c \
tests/ecore/ecore_suite.h
tests_ecore_ecore_suite_CPPFLAGS = \

View File

@ -2779,6 +2779,27 @@ EAPI void *ecore_thread_global_data_wait(const char *key, double seconds);
* @}
*/
typedef struct _Ecore_Coroutine Ecore_Coroutine;
typedef int (*Ecore_Coroutine_Cb)(void *data, Ecore_Coroutine *coro);
typedef enum {
ECORE_COROUTINE_NEW,
ECORE_COROUTINE_RUNNING,
ECORE_COROUTINE_FINISHED
} Ecore_Coroutine_State;
EAPI Ecore_Coroutine *ecore_coroutine_add(int stack_size, Ecore_Coroutine_Cb func, void *data);
EAPI void *ecore_coroutine_del(Ecore_Coroutine *coro);
EAPI int ecore_coroutine_resume(Ecore_Coroutine *coro);
EAPI void ecore_coroutine_yield(Ecore_Coroutine *coro, int value);
EAPI void *ecore_coroutine_data_get(Ecore_Coroutine *coro);
EAPI Ecore_Coroutine_State ecore_coroutine_state_get(Ecore_Coroutine *coro);
EAPI void ecore_coroutine_defer(Ecore_Coroutine *coro, Eina_Free_Cb func, void *data);
EAPI void *ecore_coroutine_alloc(Ecore_Coroutine *coro, size_t size);
/**
* @defgroup Ecore_Pipe_Group Pipe wrapper
*

View File

@ -166,6 +166,7 @@ ecore_init(void)
_ecore_glib_init();
_ecore_job_init();
_ecore_time_init();
_ecore_coroutine_init();
eina_lock_new(&_thread_mutex);
eina_condition_new(&_thread_cond, &_thread_mutex);
@ -234,8 +235,9 @@ ecore_shutdown(void)
}
if (--_ecore_init_count != 0)
goto unlock;
if (_ecore_fps_debug) _ecore_fps_debug_shutdown();
_ecore_coroutine_shutdown();
_ecore_poller_shutdown();
_ecore_animator_shutdown();
_ecore_glib_shutdown();

View File

@ -0,0 +1,339 @@
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <stdlib.h>
#include <sys/time.h>
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#if defined(USE_UCONTEXT)
# include <sys/time.h>
# include <sys/resource.h>
# include <ucontext.h>
#elif defined(USE_SETJMP)
# include <sys/time.h>
# include <sys/resource.h>
# include <setjmp.h>
#endif
#ifdef HAVE_EVIL
# include <Evil.h>
#endif
#ifdef _WIN32
# define USE_FIBERS
#endif
#include "Ecore.h"
#include "ecore_private.h"
typedef struct _Ecore_Coroutine_Defer Ecore_Coroutine_Defer;
struct _Ecore_Coroutine_Defer
{
Eina_Free_Cb func;
void *data;
};
struct _Ecore_Coroutine
{
#if defined(USE_FIBERS)
void *context;
#elif defined(USE_UCONTEXT)
ucontext_t context;
#elif defined(USE_SETJMP)
jmp_buf context;
#else
# error "No coroutine implementation !"
#endif
Eina_Inarray defer;
Ecore_Coroutine_Cb func;
void *data;
Ecore_Coroutine_State state;
#ifdef USE_VALGRIND
int vg_stack_id;
#endif
int yield_value;
unsigned char stack[1];
};
#ifdef __x86_64__
static const int const _ecore_coroutine_default_stack_size = 16 * 1024;
#else
static const int const _ecore_coroutine_default_stack_size = 12 * 1024;
#endif
static void
_ecore_coroutine_finish(Ecore_Coroutine *coro)
{
int return_value = coro->func(coro->data, coro);
coro->state = ECORE_COROUTINE_FINISHED;
ecore_coroutine_yield(coro, return_value);
}
#if defined(USE_UCONTEXT)
# ifdef __x86_64__
union ptr_splitter {
void *ptr;
uint32_t part[sizeof(void *) / sizeof(uint32_t)];
};
static void
_ecore_coroutine_entry_point(uint32_t part0, uint32_t part1)
{
union ptr_splitter p = {
.part = { part0, part1 }
};
Ecore_Coroutine *coro = p.ptr;
_ecore_coroutine_finish(coro);
}
# else
static void
_ecore_coroutine_entry_point(Ecore_Coroutine *coro)
{
_ecore_coroutine_finish(coro);
}
# endif
#else
static void
_ecore_coroutine_entry_point(Ecore_Coroutine *coro)
{
_ecore_coroutine_finish(coro);
}
# if defined(USE_SETJMP)
static void
_ecore_coroutine_setjmp(Ecore_Coroutine *coro)
{
setjmp(coro->context);
/* The idea of this trick come from libcoroutine */
/* __jmpbuf[6] == stack pointer */
/* __jmpbuf[7] == program counter */
self->env[0].__jmpbuf[6] = ((uintptr_t)(&coro->stack));
self->env[0].__jmpbuf[7] = ((uintptr_t)_ecore_coroutine_entry_point);
}
# endif
#endif
#if defined(USE_FIBERS)
static void *caller;
static void *callee;
#elif defined(USE_UCONTEXT)
static ucontext_t caller;
static ucontext_t callee;
#elif defined(USE_SETJMP)
static jmp_buf caller;
static jmp_buf callee;
#endif
void
_ecore_coroutine_init(void)
{
#if defined(USE_FIBERS)
caller = GetCurrentFiber();
if (caller == (LPVOID) 0x1e00)
{
caller = ConvertThreadToFiber(NULL);
}
#else
memset(&caller, 0, sizeof (caller));
memset(&callee, 0, sizeof (callee));
#endif
}
void
_ecore_coroutine_shutdown(void)
{
#ifdef USE_FIBERS
ConvertFiberToThread();
#endif
// FIXME: should we track lost coroutine ?
}
EAPI Ecore_Coroutine *
ecore_coroutine_add(int stack_size, Ecore_Coroutine_Cb func, void *data)
{
Ecore_Coroutine *coro;
unsigned char *stack;
if (stack_size <= 0)
{
#if defined(USE_UCONTEXT) || defined(USE_SETJMP)
struct rlimit check;
if (getrlimit(RLIMIT_STACK, &check))
check.rlim_cur = _ecore_coroutine_default_stack_size;
stack_size = check.rlim_cur;
#elif defined(USE_FIBERS)
stack_size = _ecore_coroutine_default_stack_size;
#endif
if (stack_size < _ecore_coroutine_default_stack_size)
stack_size = _ecore_coroutine_default_stack_size;
}
coro = malloc(sizeof (Ecore_Coroutine) + stack_size - 1);
if (!coro) return NULL;
stack = coro->stack;
#ifdef USE_VALGRIND
coro->vg_stack_id = VALGRIND_STACK_REGISTER(stack, stack + stack_size);
#endif
coro->state = ECORE_COROUTINE_NEW;
coro->func = func;
coro->data = data;
eina_inarray_step_set(&coro->defer,
sizeof (Eina_Inarray), sizeof (Ecore_Coroutine_Defer),
8);
#if defined(USE_UCONTEXT)
getcontext(&coro->context);
coro->context.uc_stack.ss_sp = stack;
coro->context.uc_stack.ss_size = stack_size;
coro->context.uc_stack.ss_flags = 0;
coro->context.uc_link = NULL;
# ifdef __x86_64__
union ptr_splitter p = { .ptr = coro };
makecontext(&coro->context, (void (*)())_ecore_coroutine_entry_point,
2, p.part[0], p.part[1]);
# else
makecontext(&coro->context, (void (*)())_ecore_coroutine_entry_point,
1, coro);
# endif
#elif defined(USE_FIBERS)
coro->context = CreateFiber(stack_size,
(LPFIBER_START_ROUTINE)_ecore_coroutine_entry_point,
coro);
if (!coro->context)
{
free(coro);
return NULL;
}
#elif defined(USE_SETJMP)
/* We use an intermediate function call to setup the stack with the right arguments */
_ecore_coroutine_setjmp(coro);
#endif
return coro;
}
EAPI void *
ecore_coroutine_del(Ecore_Coroutine *coro)
{
void *data;
data = coro->data;
while (eina_inarray_count(&coro->defer))
{
Ecore_Coroutine_Defer *defer;
defer = eina_inarray_pop(&coro->defer);
defer->func(defer->data);
}
eina_inarray_flush(&coro->defer);
#ifdef USE_VALGRIND
VALGRIND_STACK_DEREGISTER(coro->vg_stack_id);
#endif
#ifdef USE_FIBERS
DeleteFiber(coro->context);
#endif
free(coro);
return data;
}
EAPI int
ecore_coroutine_resume(Ecore_Coroutine *coro)
{
#if defined(USE_FIBERS)
void *prev_caller;
#elif defined(USE_UCONTEXT)
ucontext_t prev_caller;
#elif defined(USE_SETJMP)
jmp_buf prev_caller;
#endif
if (coro->state == ECORE_COROUTINE_FINISHED)
return 0;
coro->state = ECORE_COROUTINE_RUNNING;
prev_caller = caller;
#if defined(USE_FIBERS)
SwitchToFiber(coro->context);
#elif defined(USE_UCONTEXT)
swapcontext(&caller, &coro->context);
#elif defined(USE_SETJMP)
setjmp(caller);
longjmp(coro->context);
#endif
#ifndef USE_FIBERS
// As fiber do handle the callee stack for us, no need here
coro->context = callee;
#endif
caller = prev_caller;
return coro->yield_value;
}
EAPI void
ecore_coroutine_yield(Ecore_Coroutine *coro, int value)
{
coro->yield_value = value;
#if defined(USE_FIBERS)
SwitchToFiber(caller);
#elif defined(USE_UCONTEXT)
swapcontext(&callee, &caller);
#elif defined(USE_SETJMP)
setjmp(callee);
longjmp(caller);
#endif
}
EAPI void *
ecore_coroutine_data_get(Ecore_Coroutine *coro)
{
return coro->data;
}
EAPI Ecore_Coroutine_State
ecore_coroutine_state_get(Ecore_Coroutine *coro)
{
return coro->state;
}
EAPI void
ecore_coroutine_defer(Ecore_Coroutine *coro, Eina_Free_Cb func, void *data)
{
Ecore_Coroutine_Defer *defer;
defer = eina_inarray_grow(&coro->defer, 1);
defer->func = func;
defer->data = data;
}
EAPI void *
ecore_coroutine_alloc(Ecore_Coroutine *coro, size_t size)
{
void *data;
data = malloc(size);
ecore_coroutine_defer(coro, free, data);
return data;
}

View File

@ -238,6 +238,9 @@ void _ecore_job_shutdown(void);
void _ecore_main_loop_init(void);
void _ecore_main_loop_shutdown(void);
void _ecore_coroutine_init(void);
void _ecore_coroutine_shutdown(void);
void _ecore_throttle(void);
void _ecore_main_call_flush(void);

View File

@ -25,6 +25,7 @@ static const Ecore_Test_Case etc[] = {
#if HAVE_ECORE_AUDIO
{ "Ecore Audio", ecore_test_ecore_audio},
#endif
{ "Ecore_Coroutine", ecore_test_coroutine },
{ NULL, NULL }
};

View File

@ -8,6 +8,6 @@ void ecore_test_ecore_con(TCase *tc);
void ecore_test_ecore_x(TCase *tc);
void ecore_test_ecore_imf(TCase *tc);
void ecore_test_ecore_audio(TCase *tc);
void ecore_test_coroutine(TCase *tc);
#endif /* _ECORE_SUITE_H */

View File

@ -0,0 +1,86 @@
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <stdio.h>
#include <unistd.h>
#include <Eina.h>
#include <Ecore.h>
#include "ecore_suite.h"
typedef struct _Ecore_Coroutine_Test Ecore_Coroutine_Test;
struct _Ecore_Coroutine_Test
{
int v;
int s;
};
static Ecore_Coroutine_Test t1;
static int
_ecore_test_v1(void *data, Ecore_Coroutine *coro)
{
int *t = data;
ecore_coroutine_yield(coro, 7);
t1.v = *t;
ecore_coroutine_yield(coro, 42);
t1.s = *t;
return 0xDEADBEEF;
}
static int
_ecore_test_v2(void *data, Ecore_Coroutine *coro EINA_UNUSED)
{
int *t = data;
ecore_coroutine_yield(coro, 42);
t1.v = *t;
ecore_coroutine_yield(coro, 7);
t1.s = *t;
return 0xDEADBEEF;
}
START_TEST(ecore_test_coroutine_simple)
{
Ecore_Coroutine *coro1;
Ecore_Coroutine *coro2;
int value[] = { 7, 42, 0xDEADBEEF };
ecore_init();
t1.v = 0xDEADBEEF; t1.s = 0xDEADBEEF;
coro1 = ecore_coroutine_add(0, _ecore_test_v1, &value[0]);
coro2 = ecore_coroutine_add(4*1024*1024, _ecore_test_v2, &value[1]);
fail_if(ecore_coroutine_state_get(coro1) != ECORE_COROUTINE_NEW);
fail_if(ecore_coroutine_resume(coro1) != 7);
fail_if(t1.v != 0xDEADBEEF || t1.s != 0xDEADBEEF);
fail_if(ecore_coroutine_resume(coro2) != 42);
fail_if(t1.v != 0xDEADBEEF || t1.s != 0xDEADBEEF);
fail_if(ecore_coroutine_resume(coro1) != 42);
fail_if(t1.v != value[0] || t1.s != 0xDEADBEEF);
fail_if(ecore_coroutine_resume(coro1) != 0xDEADBEEF);
fail_if(t1.v != value[0] || t1.s != value[0]);
fail_if(ecore_coroutine_resume(coro2) != 7);
fail_if(t1.v != value[1] || t1.s != value[0]);
fail_if(ecore_coroutine_resume(coro2) != 0xDEADBEEF);
fail_if(t1.v != value[1] || t1.s != value[1]);
fail_if(ecore_coroutine_state_get(coro2) != ECORE_COROUTINE_FINISHED);
fail_if(ecore_coroutine_del(coro1) != &value[0]);
fail_if(ecore_coroutine_del(coro2) != &value[1]);
ecore_shutdown();
}
END_TEST
void ecore_test_coroutine(TCase *tc)
{
tcase_add_test(tc, ecore_test_coroutine_simple);
}