ecore - efl threads - add thredio class

this adds a simple indata and outdata void ptr to begin that you can
set on efl.thread objects (set the indata) and get the outdata too to
get results. then on the efl.appthread side the indata is set on the
efl.appthread before it runs and on quit the thresad can set the
outdata on the appthread, and this appears back on the efl.thread
object in the parent thread.

so you can basically share pointers to anything in and out this way on
start/exit in addition to string args etc.

the reason i made it an extra class (mixin actually) is for future
expansion. sharing more complex data - eina values maybe or objects as
long as they are shared objects, and perhaps acting as an interface
for calling a function at the other end like ecore_thread_async_call
etc.
This commit is contained in:
Carsten Haitzler 2018-03-06 20:49:51 +09:00
parent 1ac60fe022
commit 6865ad1773
9 changed files with 305 additions and 26 deletions

View File

@ -17,6 +17,7 @@ ecore_eolian_files_public = \
lib/ecore/efl_loop_message_handler.eo \
lib/ecore/efl_exe.eo \
lib/ecore/efl_thread.eo \
lib/ecore/efl_threadio.eo \
lib/ecore/efl_appthread.eo \
lib/ecore/efl_task.eo \
lib/ecore/efl_io_closer_fd.eo \
@ -138,6 +139,7 @@ lib/ecore/efl_interpolator_cubic_bezier.c \
lib/ecore/efl_task.c \
lib/ecore/efl_exe.c \
lib/ecore/efl_thread.c \
lib/ecore/efl_threadio.c \
lib/ecore/efl_appthread.c \
lib/ecore/ecore_main_timechanges.c \
lib/ecore/ecore_pipe.c \

View File

@ -46,6 +46,7 @@
#include "efl_task.eo.h"
#include "efl_thread.eo.h"
#include "efl_threadio.eo.h"
#include "efl_exe.eo.h"
#include "efl_loop.eo.h"

View File

@ -197,6 +197,7 @@ struct _Efl_Task_Data
struct _Efl_Appthread_Data
{
int read_listeners;
struct {
int in, out;
Eo *in_handler, *out_handler;
@ -204,7 +205,7 @@ struct _Efl_Appthread_Data
Eina_Bool eos_read : 1;
Eina_Bool can_write : 1;
} fd, ctrl;
int read_listeners;
void *thdat;
};

View File

@ -240,6 +240,22 @@ _efl_appthread_efl_io_writer_can_write_get(Eo *obj EINA_UNUSED, Efl_Appthread_Da
return pd->fd.can_write;
}
void _appthread_threadio_call(Eo *obj, Efl_Appthread_Data *pd, void *func_data, EFlThreadIOCall func, Eina_Free_Cb func_free_cb);
EOLIAN static void
_efl_appthread_efl_threadio_call(Eo *obj, Efl_Appthread_Data *pd, void *func_data, EFlThreadIOCall func, Eina_Free_Cb func_free_cb)
{
_appthread_threadio_call(obj, pd, func_data, func, func_free_cb);
}
void *_appthread_threadio_call_sync(Eo *obj, Efl_Appthread_Data *pd, void *func_data, EFlThreadIOCallSync func, Eina_Free_Cb func_free_cb);
EOLIAN static void *
_efl_appthread_efl_threadio_call_sync(Eo *obj, Efl_Appthread_Data *pd, void *func_data, EFlThreadIOCallSync func, Eina_Free_Cb func_free_cb)
{
return _appthread_threadio_call_sync(obj, pd, func_data, func, func_free_cb);
}
//////////////////////////////////////////////////////////////////////////
#include "efl_appthread.eo.c"

View File

@ -1,7 +1,7 @@
import efl_types;
import eina_types;
class Efl.Appthread (Efl.Loop)
class Efl.Appthread (Efl.Loop, Efl.ThreadIO)
{
[[ ]]
methods {
@ -18,5 +18,7 @@ class Efl.Appthread (Efl.Loop)
Efl.Io.Reader.eos { get; set; }
Efl.Io.Writer.write;
Efl.Io.Writer.can_write { get; set; }
Efl.ThreadIO.call;
Efl.ThreadIO.call_sync;
}
}

View File

@ -16,6 +16,8 @@
#define MY_CLASS EFL_THREAD_CLASS
#define APPTHREAD_CLASS EFL_APPTHREAD_CLASS
typedef struct
{
const char *name;
@ -28,15 +30,30 @@ typedef struct
const char **argv;
} args;
Efl_Callback_Array_Item_Full *event_cb;
void *indata, *outdata;
} Thread_Data;
#define CMD_EXIT 1
#define CMD_EXITED 2
#define CMD_EXIT 1
#define CMD_EXITED 2
#define CMD_CALL 3
#define CMD_CALL_SYNC 4
typedef struct
{
int command;
int data;
Eina_Semaphore sem;
void *data;
} Control_Reply;
typedef struct
{
union {
struct {
int command;
int data;
void *ptr[4];
} d;
unsigned char b[64];
};
} Control_Data;
typedef struct _Efl_Thread_Data Efl_Thread_Data;
@ -84,17 +101,34 @@ _cb_thread_ctrl_out(void *data, const Efl_Event *event EINA_UNUSED)
ssize_t ret;
Efl_Appthread_Data *ad;
ad = efl_data_scope_get(obj, EFL_APPTHREAD_CLASS);
cmd.command = 0;
cmd.data = 0;
ad = efl_data_scope_get(obj, APPTHREAD_CLASS);
memset(&cmd, 0, sizeof(cmd));
ret = read(ad->ctrl.out, &cmd, sizeof(Control_Data));
if (ret == sizeof(Control_Data))
{
if (cmd.command == CMD_EXIT)
if (cmd.d.command == CMD_EXIT)
{
efl_event_callback_call(obj, EFL_TASK_EVENT_EXIT, NULL);
efl_loop_quit(obj, eina_value_int_init(0));
}
else if (cmd.d.command == CMD_CALL)
{
EFlThreadIOCall func = cmd.d.ptr[0];
void *data = cmd.d.ptr[1];
Eina_Free_Cb free_func = cmd.d.ptr[2];
if (func) func(data, obj);
if (free_func) free_func(data);
}
else if (cmd.d.command == CMD_CALL_SYNC)
{
EFlThreadIOCallSync func = cmd.d.ptr[0];
void *data = cmd.d.ptr[1];
Eina_Free_Cb free_func = cmd.d.ptr[2];
Control_Reply *rep = cmd.d.ptr[3];
if (func) rep->data = func(data, obj);
if (free_func) free_func(data);
eina_semaphore_release(&(rep->sem), 1);
}
}
}
@ -193,8 +227,10 @@ _efl_thread_main(void *data, Eina_Thread t)
if (thdat->name) eina_thread_name_set(t, thdat->name);
else eina_thread_name_set(t, "Eflthread");
obj = efl_add(EFL_APPTHREAD_CLASS, NULL);
ad = efl_data_scope_get(obj, EFL_APPTHREAD_CLASS);
obj = efl_add(APPTHREAD_CLASS, NULL);
ad = efl_data_scope_get(obj, APPTHREAD_CLASS);
efl_threadio_indata_set(obj, thdat->indata);
efl_threadio_outdata_set(obj, thdat->outdata);
efl_event_callback_array_add(obj, _appthread_event_callback_watch(), ad);
// add handlers for "stdio"
@ -227,6 +263,7 @@ _efl_thread_main(void *data, Eina_Thread t)
ad->fd.out_handler = thdat->fd.out_handler;
ad->ctrl.in_handler = thdat->ctrl.in_handler;
ad->ctrl.out_handler = thdat->ctrl.out_handler;
ad->thdat = thdat;
if (thdat->event_cb)
{
@ -248,9 +285,11 @@ _efl_thread_main(void *data, Eina_Thread t)
ret = efl_loop_begin(obj);
real = efl_loop_exit_code_process(ret);
thdat->outdata = efl_threadio_outdata_get(obj);
cmd.command = CMD_EXITED;
cmd.data = real;
memset(&cmd, 0, sizeof(cmd));
cmd.d.command = CMD_EXITED;
cmd.d.data = real;
write(thdat->ctrl.in, &cmd, sizeof(Control_Data));
efl_del(obj);
@ -292,6 +331,7 @@ _thread_exit_eval(Eo *obj, Efl_Thread_Data *pd)
pd->exit_called = EINA_TRUE;
efl_ref(obj);
if (pd->thdat) efl_threadio_outdata_set(obj, pd->thdat->outdata);
job = eina_future_then(efl_loop_job(loop), _efl_loop_task_exit, obj);
efl_future_Eina_FutureXXX_then(loop, job);
}
@ -321,22 +361,39 @@ _cb_thread_parent_ctrl_out(void *data, const Efl_Event *event EINA_UNUSED)
if (!pd) return;
cmd.command = 0;
cmd.data = 0;
memset(&cmd, 0, sizeof(cmd));
ret = read(pd->ctrl.out, &cmd, sizeof(Control_Data));
if (ret == sizeof(Control_Data))
{
if (cmd.command == CMD_EXITED)
if (cmd.d.command == CMD_EXITED)
{
if (!pd->exit_read)
{
Efl_Task_Data *td = efl_data_scope_get(obj, EFL_TASK_CLASS);
if (td) td->exit_code = cmd.data;
if (td) td->exit_code = cmd.d.data;
pd->exit_read = EINA_TRUE;
_thread_exit_eval(obj, pd);
}
}
else if (cmd.d.command == CMD_CALL)
{
EFlThreadIOCall func = cmd.d.ptr[0];
void *data = cmd.d.ptr[1];
Eina_Free_Cb free_func = cmd.d.ptr[2];
if (func) func(data, obj);
if (free_func) free_func(data);
}
else if (cmd.d.command == CMD_CALL_SYNC)
{
EFlThreadIOCallSync func = cmd.d.ptr[0];
void *data = cmd.d.ptr[1];
Eina_Free_Cb free_func = cmd.d.ptr[2];
Control_Reply *rep = cmd.d.ptr[3];
if (func) rep->data = func(data, obj);
if (free_func) free_func(data);
eina_semaphore_release(&(rep->sem), 1);
}
}
}
@ -461,18 +518,17 @@ _efl_thread_efl_object_destructor(Eo *obj, Efl_Thread_Data *pd)
// if it hasn't been asked to exit... ask it
if (!pd->end_sent) efl_task_end(obj);
cmd.command = 0;
cmd.data = 0;
memset(&cmd, 0, sizeof(cmd));
ret = read(pd->ctrl.out, &cmd, sizeof(Control_Data));
while (ret == sizeof(Control_Data))
{
if (cmd.command == CMD_EXITED)
if (cmd.d.command == CMD_EXITED)
{
if (!pd->exit_read)
{
Efl_Task_Data *td = efl_data_scope_get(obj, EFL_TASK_CLASS);
if (td) td->exit_code = cmd.data;
if (td) td->exit_code = cmd.d.data;
pd->exit_read = EINA_TRUE;
efl_event_callback_call(obj, EFL_TASK_EVENT_EXIT, NULL);
break;
@ -534,6 +590,9 @@ _efl_thread_efl_task_run(Eo *obj, Efl_Thread_Data *pd)
thdat->ctrl.in = -1;
thdat->ctrl.out = -1;
thdat->indata = efl_threadio_indata_get(obj);
thdat->outdata = efl_threadio_outdata_get(obj);
// input/output pipes
if (td->flags & EFL_TASK_FLAGS_USE_STDIN)
{
@ -625,7 +684,10 @@ _efl_thread_efl_task_run(Eo *obj, Efl_Thread_Data *pd)
thdat->ctrl.out = pipe_to_thread [0]; // read - output from parent
pd->ctrl.in = pipe_to_thread [1]; // write - input to child
pd->ctrl.out = pipe_from_thread[0]; // read - output from child
// yes - these are blocking because we write and read very little
fcntl(thdat->ctrl.in, F_SETFL, O_NONBLOCK);
fcntl(thdat->ctrl.out, F_SETFL, O_NONBLOCK);
fcntl(pd->ctrl.in, F_SETFL, O_NONBLOCK);
fcntl(pd->ctrl.out, F_SETFL, O_NONBLOCK);
eina_file_close_on_exec(pd->ctrl.in, EINA_TRUE);
eina_file_close_on_exec(pd->ctrl.out, EINA_TRUE);
eina_file_close_on_exec(thdat->ctrl.in, EINA_TRUE);
@ -733,8 +795,8 @@ _efl_thread_efl_task_end(Eo *obj EINA_UNUSED, Efl_Thread_Data *pd)
Control_Data cmd;
pd->end_sent = EINA_TRUE;
cmd.command = CMD_EXIT;
cmd.data = 0;
memset(&cmd, 0, sizeof(cmd));
cmd.d.command = CMD_EXIT;
write(pd->ctrl.in, &cmd, sizeof(Control_Data));
}
}
@ -928,6 +990,88 @@ _efl_thread_efl_io_writer_can_write_get(Eo *obj EINA_UNUSED, Efl_Thread_Data *pd
return pd->fd.can_write;
}
void
_appthread_threadio_call(Eo *obj EINA_UNUSED, Efl_Appthread_Data *pd,
void *func_data, EFlThreadIOCall func, Eina_Free_Cb func_free_cb)
{
Thread_Data *thdat = pd->thdat;
Control_Data cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.d.command = CMD_CALL;
cmd.d.ptr[0] = func;
cmd.d.ptr[1] = func_data;
cmd.d.ptr[2] = func_free_cb;
write(thdat->ctrl.in, &cmd, sizeof(Control_Data));
}
EOLIAN static void
_efl_thread_efl_threadio_call(Eo *obj EINA_UNUSED, Efl_Thread_Data *pd,
void *func_data, EFlThreadIOCall func, Eina_Free_Cb func_free_cb)
{
Thread_Data *thdat = pd->thdat;
Control_Data cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.d.command = CMD_CALL;
cmd.d.ptr[0] = func;
cmd.d.ptr[1] = func_data;
cmd.d.ptr[2] = func_free_cb;
write(pd->ctrl.in, &cmd, sizeof(Control_Data));
}
void *
_appthread_threadio_call_sync(Eo *obj EINA_UNUSED, Efl_Appthread_Data *pd,
void *func_data, EFlThreadIOCallSync func, Eina_Free_Cb func_free_cb)
{
Thread_Data *thdat = pd->thdat;
Control_Data cmd;
Control_Reply *rep;
void *data;
memset(&cmd, 0, sizeof(cmd));
cmd.d.command = CMD_CALL_SYNC;
cmd.d.ptr[0] = func;
cmd.d.ptr[1] = func_data;
cmd.d.ptr[2] = func_free_cb;
rep = malloc(sizeof(Control_Reply));
if (!rep) return NULL;
cmd.d.ptr[3] = rep;
rep->data = NULL;
eina_semaphore_new(&(rep->sem), 0);
write(thdat->ctrl.in, &cmd, sizeof(Control_Data));
eina_semaphore_lock(&(rep->sem));
data = rep->data;
free(rep);
return data;
}
EOLIAN static void *
_efl_thread_efl_threadio_call_sync(Eo *obj EINA_UNUSED, Efl_Thread_Data *pd,
void *func_data, EFlThreadIOCallSync func, Eina_Free_Cb func_free_cb)
{
Thread_Data *thdat = pd->thdat;
Control_Data cmd;
Control_Reply *rep;
void *data;
memset(&cmd, 0, sizeof(cmd));
cmd.d.command = CMD_CALL_SYNC;
cmd.d.ptr[0] = func;
cmd.d.ptr[1] = func_data;
cmd.d.ptr[2] = func_free_cb;
rep = malloc(sizeof(Control_Reply));
if (!rep) return NULL;
cmd.d.ptr[3] = rep;
rep->data = NULL;
eina_semaphore_new(&(rep->sem), 0);
write(pd->ctrl.in, &cmd, sizeof(Control_Data));
eina_semaphore_lock(&(rep->sem));
data = rep->data;
free(rep);
return data;
}
//////////////////////////////////////////////////////////////////////////
#include "efl_thread.eo.c"

View File

@ -1,7 +1,7 @@
import efl_types;
import eina_types;
class Efl.Thread (Efl.Task)
class Efl.Thread (Efl.Task, Efl.ThreadIO)
{
methods {
}
@ -18,5 +18,7 @@ class Efl.Thread (Efl.Task)
Efl.Io.Reader.eos { get; set; }
Efl.Io.Writer.write;
Efl.Io.Writer.can_write { get; set; }
Efl.ThreadIO.call;
Efl.ThreadIO.call_sync;
}
}

View File

@ -0,0 +1,58 @@
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <Ecore.h>
#include "ecore_private.h"
#define MY_CLASS EFL_THREADIO_CLASS
typedef struct _Efl_ThreadIO_Data Efl_ThreadIO_Data;
struct _Efl_ThreadIO_Data
{
void *indata, *outdata;
};
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
EOLIAN static void
_efl_threadio_indata_set(Eo *obj EINA_UNUSED, Efl_ThreadIO_Data *pd, void *data)
{
pd->indata = data;
}
EOLIAN static void *
_efl_threadio_indata_get(Eo *obj EINA_UNUSED, Efl_ThreadIO_Data *pd)
{
return pd->indata;
}
EOLIAN static void
_efl_threadio_outdata_set(Eo *obj EINA_UNUSED, Efl_ThreadIO_Data *pd, void *data)
{
pd->outdata = data;
}
EOLIAN static void *
_efl_threadio_outdata_get(Eo *obj EINA_UNUSED, Efl_ThreadIO_Data *pd)
{
return pd->outdata;
}
EOLIAN static void
_efl_threadio_call(Eo *obj EINA_UNUSED, Efl_ThreadIO_Data *pd EINA_UNUSED, void *func_data EINA_UNUSED, EFlThreadIOCall func EINA_UNUSED, Eina_Free_Cb func_free_cb EINA_UNUSED)
{
}
EOLIAN static void *
_efl_threadio_call_sync(Eo *obj EINA_UNUSED, Efl_ThreadIO_Data *pd EINA_UNUSED, void *func_data EINA_UNUSED, EFlThreadIOCallSync func EINA_UNUSED, Eina_Free_Cb func_free_cb EINA_UNUSED)
{
}
//////////////////////////////////////////////////////////////////////////
#include "efl_threadio.eo.c"

View File

@ -0,0 +1,53 @@
import efl_types;
import eina_types;
function EFlThreadIOCall {
[[ A Function to call on the "other end" of a thread obvject ]]
params {
obj: Efl.Object; [[ ]]
}
};
function EFlThreadIOCallSync {
[[ A Function to call on the "other end" of a thread obvject ]]
params {
obj: Efl.Object; [[ ]]
}
return: void_ptr; [[ ]]
};
mixin Efl.ThreadIO
{
[[ ]]
methods {
@property indata {
set { }
get { }
values {
data: void_ptr; [[ ]]
}
}
@property outdata {
set { }
get { }
values {
data: void_ptr; [[ ]]
}
}
call {
params {
func: EFlThreadIOCall; [[ ]]
}
}
call_sync {
params {
func: EFlThreadIOCallSync; [[ ]]
}
return: void_ptr; [[ ]]
}
}
events {
}
implements {
}
}