Efl.Loop.coro: easy to use coroutines attached to a main loop.

While Eina_Coro provides a solid base, to use the main loop to
schedule coroutines it needs some manual work we want to avoid.

Efl.Loop.coro method will take a function and schedule it using the
given priority, the returned value is then resolved in a promise for
the returned future.

Basically all users must do is write a function that looks like a
synchronous code and calls eina_coro_yield() (or helper macros), that
will go back to the main loop and then it will reschedule the
coroutine to run according to its priority.

This should reduce the number of callbacks in user's code.
This commit is contained in:
Gustavo Sverzut Barbieri 2017-08-27 00:42:22 -03:00
parent 109bf1b387
commit 306ec6937b
3 changed files with 339 additions and 0 deletions

View File

@ -3216,6 +3216,182 @@ _efl_loop_Eina_FutureXXX_timeout(Eo *obj, Efl_Loop_Data *pd EINA_UNUSED, double
return NULL;
}
typedef struct _Efl_Loop_Coro {
Eina_Promise *promise;
Eina_Coro *coro;
Efl_Loop *loop;
Eina_Future *scheduled;
Efl_Loop_Coro_Cb func;
const void *func_data;
Eina_Free_Cb func_free_cb;
Efl_Loop_Coro_Prio prio;
Eina_Value value;
} Efl_Loop_Coro;
static void
_efl_loop_coro_free(Efl_Loop_Coro *lc)
{
if (lc->func_free_cb) lc->func_free_cb((void *)lc->func_data);
if (lc->scheduled) eina_future_cancel(lc->scheduled);
eina_value_flush(&lc->value);
efl_unref(lc->loop);
free(lc);
}
static void _efl_loop_coro_reschedule(Efl_Loop_Coro *lc);
static Eina_Value
_efl_loop_coro_schedule_resolved(void *data, const Eina_Value value, const Eina_Future *dead_future EINA_UNUSED)
{
Efl_Loop_Coro *lc = data;
Eina_Future *awaiting = NULL;
if (value.type == EINA_VALUE_TYPE_ERROR)
{
Eina_Error err;
eina_value_get(&value, &err);
ERR("coro %p scheduled got error %s, try again.",
lc, eina_error_msg_get(err));
}
else if (!eina_coro_run(&lc->coro, NULL, &awaiting))
{
INF("coroutine %p finished with value type=%p (%s)",
lc, lc->value.type,
lc->value.type ? lc->value.type->name : "EMPTY");
eina_promise_resolve(lc->promise, lc->value);
lc->value = EINA_VALUE_EMPTY; // owned by promise
_efl_loop_coro_free(lc);
return value;
}
else if (awaiting)
{
DBG("coroutine %p is awaiting for future %p, do not reschedule", lc, awaiting);
eina_future_chain(awaiting,
{
.cb = _efl_loop_coro_schedule_resolved,
.data = lc,
.storage = &lc->scheduled,
},
efl_future_cb(lc->loop));
}
else _efl_loop_coro_reschedule(lc);
return value;
}
static void
_efl_loop_coro_reschedule(Efl_Loop_Coro *lc)
{
Eina_Future *f;
// high uses 0-timeout instead of job, since job
// is implemented using events and the Ecore implementation
// will never run timers or anything else, just the new jobs :-/
//
// TODO: bug report ecore_main loop bug.
if (lc->prio == EFL_LOOP_CORO_PRIO_HIGH)
f = efl_loop_Eina_FutureXXX_timeout(lc->loop, 0);
else
f = efl_loop_Eina_FutureXXX_idle(lc->loop);
DBG("coroutine %p rescheduled as future=%p", lc, f);
// NOTE: efl_future_cb() doesn't allow for extra 'data', so it matches
// methods more easily. However we need 'lc' and we can't store in
// loop since we'd not know the key for efl_key_data_get().
// Easy solution: use 2 futures, one to bind and another to resolve.
eina_future_chain(f,
{
.cb = _efl_loop_coro_schedule_resolved,
.data = lc,
.storage = &lc->scheduled,
},
efl_future_cb(lc->loop));
}
static void
_efl_loop_coro_cancel(void *data, const Eina_Promise *dead_promise EINA_UNUSED)
{
Efl_Loop_Coro *lc = data;
INF("canceled coroutine %p (coro=%p)", lc, lc->coro);
eina_coro_cancel(&lc->coro);
_efl_loop_coro_free(lc);
}
static const void *
_efl_loop_coro_cb(void *data, Eina_Bool canceled, Eina_Coro *coro)
{
Efl_Loop_Coro *lc = data;
if (canceled) lc->value = eina_value_error_init(ECANCELED);
else lc->value = lc->func((void *)lc->func_data, coro, lc->loop);
return lc;
}
static Eina_Future *
_efl_loop_coro(Eo *obj, Efl_Loop_Data *pd EINA_UNUSED, Efl_Loop_Coro_Prio prio, void *func_data, Efl_Loop_Coro_Cb func, Eina_Free_Cb func_free_cb)
{
Efl_Loop_Coro *lc;
Eina_Promise *p;
Eina_Future *f;
EINA_SAFETY_ON_NULL_RETURN_VAL(func, NULL);
lc = calloc(1, sizeof(Efl_Loop_Coro));
EINA_SAFETY_ON_NULL_GOTO(lc, calloc_failed);
lc->loop = efl_ref(obj);
lc->func = func;
lc->func_data = func_data;
lc->func_free_cb = func_free_cb;
lc->prio = prio;
lc->coro = eina_coro_new(_efl_loop_coro_cb, lc, EINA_CORO_STACK_SIZE_DEFAULT);
EINA_SAFETY_ON_NULL_GOTO(lc, coro_failed);
p = eina_promise_new(efl_loop_future_scheduler_get(obj),
_efl_loop_coro_cancel, lc);
// lc is dead if p is NULL
EINA_SAFETY_ON_NULL_GOTO(p, promise_failed);
lc->promise = p;
// must be done prior to reschedule, as it may resolve on errors
// and promises without futures are simply ignored, will remain
// alive.
f = eina_future_new(p);
_efl_loop_coro_reschedule(lc);
INF("new coroutine %p (coro=%p)", lc, lc->coro);
// NOTE: Eolian should do efl_future_then() to bind future to object.
return efl_future_Eina_FutureXXX_then(obj, f);
promise_failed:
// _efl_loop_coro_cancel() was called, func was run... just return.
// NOTE: Eolian should do efl_future_then() to bind future to object.
return efl_future_Eina_FutureXXX_then(obj,
eina_future_resolved(efl_loop_future_scheduler_get(obj),
eina_value_error_init(ENOMEM)));
coro_failed:
_efl_loop_coro_free(lc);
calloc_failed:
if (func_free_cb) func_free_cb((void *)func_data);
// NOTE: Eolian should do efl_future_then() to bind future to object.
return efl_future_Eina_FutureXXX_then(obj,
eina_future_resolved(efl_loop_future_scheduler_get(obj),
eina_value_error_init(ENOMEM)));
}
/* This event will be triggered when the main loop is destroyed and destroy its timers along */
static void _efl_loop_internal_cancel(Efl_Internal_Promise *p);

View File

@ -7,6 +7,41 @@ struct Efl.Loop.Arguments {
initialization: bool; [[Set to $true when the program should initialize its internal state. This happen once per process instance.]]
}
enum Efl.Loop.Coro.Prio {
[[Priority class for the coroutine.]]
high = 0, [[high priority coroutine, scheduled using zero-timers (will expire as soon as possible).]]
idle, [[low priority coroutine, scheduled when nothing else should run]]
}
function Efl.Loop.Coro.Cb {
params {
coro: ptr(Eina.Coro); [[The coroutine handle, used to $eina_coro_yield() and voluntarily give back control to the main loop until it's rescheduled.]]
loop: Efl.Loop; [[The loop that schedules this coroutine.]]
}
return: generic_value; [[Value that will resolve the promise,
being delivered to the future chain
attached to the coroutine. Note that the
value will be owned by the Efl_Loop_Coro
and Eina_Future subsystems and will be
flushed (eina_value_flush()) once
unused. Its contents must survive the
function return, that is, it shouldn't
keep pointers to the stack.
]]
}; [[Coroutine function, it will be called back from the
coroutine environment and when executed it's guaranteed that the
main loop will be paused, so shared resources are safe to access
(no locks are required).
\@note Eina_Coro may use threads, then take care to handle
thread-local-storage (TLS) details properly, eventually you
may consider eina_coro_hook_add() to be informed when the
main or coroutine will exit and enter. For instance this is
used by Efl_Object (handled transparently for the user).
]]
class Efl.Loop (Efl.Object)
{
[[The Efl Main Loop
@ -107,6 +142,35 @@ class Efl.Loop (Efl.Object)
}
return: own(ptr(Eina.Future)) /* future<void> */; [[The future handle.]]
}
coro {
[[A future promise that will be resolved using a coroutine.
A coroutine is a function that will be executed
cooperatively with the main loop. The main loop will
schedule the coroutine, explicitly giving control to it --
by then the main loop is paused. The coroutine must then
finish and return, or yield control back to the main loop
using $eina_coro_yield(). This allows for shared context
to be safely interchanged with the main loop -- it is
guaranteed that if the coroutine is running, the main loop
is pause; if the main loop is running the coroutine is
paused.
Coroutines are implemented with @Eina.Coro, see their API
and how it's exposed in your language -- it may be the
case that you don't need to worry and it will be managed
transparently by your language/binding.
Once finished the coroutine returns a value, that will be
used to resolve the promise, propagating thru the future
chain.
]]
params {
@in priority: Efl.Loop.Coro.Prio; [[The priority used to schedule the coroutine.]]
@in func: Efl.Loop.Coro.Cb @nonull; [[The function to run as a coroutine.]]
}
return: own(ptr(Eina.Future)) /* future<> */; [[The future handle, it provides the value returned by $func once it exits.]]
}
job {
[[Will execute that promise in the near future.]]
params {

View File

@ -893,6 +893,103 @@ START_TEST(efl_test_promise_eo_link)
}
END_TEST
#define CORO_COUNT 10
#define CORO_SLEEP 0.1
static Eina_Value
_coro(void *data, Eina_Coro *coro, Efl_Loop *loop EINA_UNUSED)
{
int *pi = data;
for (; *pi < CORO_COUNT; (*pi)++)
{
usleep(CORO_SLEEP * 1000000);
eina_coro_yield_or_return(coro, EINA_VALUE_EMPTY);
}
// returned value is an EINA_VALUE_TYPE_PROMISE
return eina_future_as_value(_str_future_get());
}
static Eina_Bool
_timer_test(void *data)
{
int *pi = data;
(*pi)++;
return EINA_TRUE;
}
START_TEST(efl_test_coro)
{
Eina_Future *f;
int coro_count = 0;
int timer_count = 0;
fail_if(!ecore_init());
f = eina_future_then(efl_loop_coro(ecore_main_loop_get(),
EFL_LOOP_CORO_PRIO_IDLE,
&coro_count, _coro, NULL),
.cb = _simple_ok);
fail_if(!f);
// timer is 2x faster so it will always expire
ecore_timer_add(CORO_SLEEP / 2, _timer_test, &timer_count);
ecore_main_loop_begin();
ecore_shutdown();
ck_assert_int_eq(coro_count, CORO_COUNT);
ck_assert_int_ge(timer_count, CORO_COUNT);
}
END_TEST
static Eina_Value
_await(void *data, Eina_Coro *coro, Efl_Loop *loop)
{
int *pi = data;
for (; *pi < CORO_COUNT; (*pi)++)
{
Eina_Future *f = eina_future_chain(efl_loop_Eina_FutureXXX_timeout(loop, CORO_SLEEP),
// convert to string so we don't get dummy EMPTY...
// happened to me during development :-)
eina_future_cb_convert_to(EINA_VALUE_TYPE_STRING));
// await will eina_coro_yield() internally.
Eina_Value v = eina_future_await(f, coro, NULL);
if (v.type == EINA_VALUE_TYPE_ERROR) return v;
ck_assert_ptr_eq(v.type, EINA_VALUE_TYPE_STRING); // job delivers EINA_VALUE_EMPTY
}
// returned value is an EINA_VALUE_TYPE_PROMISE
return eina_future_as_value(_str_future_get());
}
START_TEST(efl_test_promise_future_await)
{
Eina_Future *f;
int coro_count = 0;
int timer_count = 0;
fail_if(!ecore_init());
f = eina_future_then(efl_loop_coro(ecore_main_loop_get(),
EFL_LOOP_CORO_PRIO_IDLE,
&coro_count, _await, NULL),
.cb = _simple_ok);
fail_if(!f);
// timer is 2x faster so it will always expire
ecore_timer_add(CORO_SLEEP / 2, _timer_test, &timer_count);
ecore_main_loop_begin();
ecore_shutdown();
ck_assert_int_eq(coro_count, CORO_COUNT);
ck_assert_int_ge(timer_count, CORO_COUNT);
}
END_TEST
void ecore_test_ecore_promise2(TCase *tc)
{
tcase_add_test(tc, efl_test_timeout);
@ -913,4 +1010,6 @@ void ecore_test_ecore_promise2(TCase *tc)
//FIXME: We should move this to EO tests, however they depend on Ecore...
tcase_add_test(tc, efl_test_promise_eo);
tcase_add_test(tc, efl_test_promise_eo_link);
tcase_add_test(tc, efl_test_coro);
tcase_add_test(tc, efl_test_promise_future_await);
}