ecore_ipc: implement ecore_con_server_connect() using Efl.Net.Dialer.

Use the new Efl.Net.Dialer classes to implement
ecore_con_server_connect() scenario.

Note that since Windows still doesn't provide any equivalent to
Efl.Net.Dialer.Unix, we keep the legacy code for it.
This commit is contained in:
Gustavo Sverzut Barbieri 2016-11-23 12:17:14 -02:00
parent 715c882073
commit e34b07e6e2
2 changed files with 318 additions and 3 deletions

View File

@ -404,9 +404,118 @@ ecore_ipc_server_add(Ecore_Ipc_Type compl_type, const char *name, int port, cons
return svr;
}
/* FIXME: need to add protocol type parameter */
EAPI Ecore_Ipc_Server *
ecore_ipc_server_connect(Ecore_Ipc_Type compl_type, char *name, int port, const void *data)
static void
_ecore_ipc_dialer_del(Ecore_Ipc_Server *svr)
{
DBG("dialer %p del", svr);
if (svr->dialer.recv_copier)
{
efl_del(svr->dialer.recv_copier);
svr->dialer.recv_copier = NULL;
}
if (svr->dialer.send_copier)
{
efl_del(svr->dialer.send_copier);
svr->dialer.send_copier = NULL;
}
if (svr->dialer.input)
{
efl_del(svr->dialer.input);
svr->dialer.input = NULL;
}
if (svr->dialer.dialer)
{
efl_del(svr->dialer.dialer);
svr->dialer.dialer = NULL;
}
}
static void
_ecore_ipc_dialer_eos(void *data, const Efl_Event *event EINA_UNUSED)
{
Ecore_Ipc_Server *svr = data;
DBG("dialer %p %p eos", svr, svr->dialer.dialer);
_ecore_ipc_dialer_del(svr);
ecore_ipc_post_event_server_del(svr);
}
static void
_ecore_ipc_dialer_error(void *data, const Efl_Event *event)
{
Ecore_Ipc_Server *svr = data;
Eina_Error *perr = event->info;
WRN("dialer %p %p error %s", svr, svr->dialer.dialer, eina_error_msg_get(*perr));
if (!efl_io_closer_closed_get(svr->dialer.dialer))
efl_io_closer_close(svr->dialer.dialer); /* triggers EOS */
}
static void
_ecore_ipc_dialer_connected(void *data, const Efl_Event *event EINA_UNUSED)
{
Ecore_Ipc_Server *svr = data;
DBG("connected to %s %s",
efl_class_name_get(efl_class_get(svr->dialer.dialer)),
efl_net_dialer_address_dial_get(svr->dialer.dialer));
ecore_ipc_post_event_server_add(svr);
}
EFL_CALLBACKS_ARRAY_DEFINE(_ecore_ipc_dialer_cbs,
{ EFL_IO_READER_EVENT_EOS, _ecore_ipc_dialer_eos },
{ EFL_NET_DIALER_EVENT_ERROR, _ecore_ipc_dialer_error },
{ EFL_NET_DIALER_EVENT_CONNECTED, _ecore_ipc_dialer_connected });
static Eina_Bool ecore_ipc_server_data_process(Ecore_Ipc_Server *svr, void *data, int size, Eina_Bool *stolen);
static void
_ecore_ipc_dialer_copier_data(void *data, const Efl_Event *event EINA_UNUSED)
{
Ecore_Ipc_Server *svr = data;
Eina_Binbuf *binbuf;
uint8_t *mem;
int size;
Eina_Bool stolen;
DBG("dialer %p recv_copier %p data", svr, svr->dialer.recv_copier);
binbuf = efl_io_copier_binbuf_steal(svr->dialer.recv_copier);
EINA_SAFETY_ON_NULL_RETURN(binbuf);
size = eina_binbuf_length_get(binbuf);
mem = eina_binbuf_string_steal(binbuf);
eina_binbuf_free(binbuf);
ecore_ipc_server_data_process(svr, mem, size, &stolen);
if (!stolen) free(mem);
}
static void
_ecore_ipc_dialer_copier_error(void *data, const Efl_Event *event)
{
Ecore_Ipc_Server *svr = data;
Eina_Error *perr = event->info;
WRN("dialer %p %p copier %p error %s", svr, svr->dialer.dialer, event->object, eina_error_msg_get(*perr));
if (!efl_io_closer_closed_get(svr->dialer.dialer))
efl_io_closer_close(svr->dialer.dialer);
}
EFL_CALLBACKS_ARRAY_DEFINE(_ecore_ipc_dialer_copier_cbs,
{ EFL_IO_COPIER_EVENT_ERROR, _ecore_ipc_dialer_copier_error });
#ifndef EFL_NET_DIALER_UNIX_CLASS
static Ecore_Ipc_Server *
ecore_ipc_server_connect_legacy(Ecore_Ipc_Type compl_type, char *name, int port, const void *data)
{
Ecore_Ipc_Server *svr;
Ecore_Ipc_Type type;
@ -447,6 +556,133 @@ ecore_ipc_server_connect(Ecore_Ipc_Type compl_type, char *name, int port, const
ECORE_MAGIC_SET(svr, ECORE_MAGIC_IPC_SERVER);
return svr;
}
#endif
/* FIXME: need to add protocol type parameter */
EAPI Ecore_Ipc_Server *
ecore_ipc_server_connect(Ecore_Ipc_Type type, char *name, int port, const void *data)
{
Ecore_Ipc_Server *svr;
Eo *loop = ecore_main_loop_get();
char *address = NULL;
Eina_Error err;
EINA_SAFETY_ON_NULL_RETURN_VAL(name, NULL);
#ifndef EFL_NET_DIALER_UNIX_CLASS
if (((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_USER) ||
((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_SYSTEM))
{
DBG("no 'local' Efl.Net.Dialer for your system yet, use legacy Ecore_Con");
return ecore_ipc_server_connect_legacy(type, name, port, data);
}
#endif
svr = calloc(1, sizeof(Ecore_Ipc_Server));
EINA_SAFETY_ON_NULL_RETURN_VAL(svr, NULL);
if (0) { }
#ifdef EFL_NET_DIALER_UNIX_CLASS
if ((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_USER)
{
address = ecore_con_local_path_new(EINA_FALSE, name, port);
EINA_SAFETY_ON_NULL_GOTO(address, error_dialer);
svr->dialer.dialer = efl_add(EFL_NET_DIALER_UNIX_CLASS, ecore_main_loop_get());
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.dialer, error_dialer);
}
else if ((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_SYSTEM)
{
address = ecore_con_local_path_new(EINA_TRUE, name, port);
EINA_SAFETY_ON_NULL_GOTO(address, error_dialer);
svr->dialer.dialer = efl_add(EFL_NET_DIALER_UNIX_CLASS, ecore_main_loop_get());
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.dialer, error_dialer);
}
#endif /* EFL_NET_DIALER_UNIX_CLASS */
else if ((type & ECORE_IPC_TYPE) == ECORE_IPC_REMOTE_SYSTEM)
{
char buf[4096];
if (port <= 0)
{
ERR("remote system requires port>=0, got %d", port);
goto error_dialer;
}
snprintf(buf, sizeof(buf), "%s:%d", name, port);
address = strdup(buf);
EINA_SAFETY_ON_NULL_GOTO(address, error_dialer);
if ((type & ECORE_IPC_USE_SSL) == ECORE_IPC_USE_SSL)
{
svr->dialer.dialer = efl_add(EFL_NET_DIALER_SSL_CLASS, loop);
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.dialer, error_dialer);
}
else
{
svr->dialer.dialer = efl_add(EFL_NET_DIALER_TCP_CLASS, loop);
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.dialer, error_dialer);
}
if ((type & ECORE_IPC_NO_PROXY) == ECORE_IPC_NO_PROXY)
efl_net_dialer_proxy_set(svr->dialer.dialer, "");
}
else
{
ERR("IPC Type must be one of: local_user, local_system or remote_system");
goto error_dialer;
}
efl_io_closer_close_on_destructor_set(svr->dialer.dialer, EINA_TRUE);
efl_event_callback_array_add(svr->dialer.dialer, _ecore_ipc_dialer_cbs(), svr);
svr->dialer.input = efl_add(EFL_IO_QUEUE_CLASS, loop);
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.input, error);
svr->dialer.send_copier = efl_add(EFL_IO_COPIER_CLASS, loop,
efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
efl_io_copier_source_set(efl_added, svr->dialer.input),
efl_io_copier_destination_set(efl_added, svr->dialer.dialer),
efl_event_callback_array_add(efl_added, _ecore_ipc_dialer_copier_cbs(), svr));
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.send_copier, error);
svr->dialer.recv_copier = efl_add(EFL_IO_COPIER_CLASS, loop,
efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
efl_io_copier_source_set(efl_added, svr->dialer.dialer),
efl_event_callback_array_add(efl_added, _ecore_ipc_dialer_copier_cbs(), svr),
efl_event_callback_add(efl_added, EFL_IO_COPIER_EVENT_DATA, _ecore_ipc_dialer_copier_data, svr));
EINA_SAFETY_ON_NULL_GOTO(svr->dialer.recv_copier, error);
err = efl_net_dialer_dial(svr->dialer.dialer, address);
if (err)
{
WRN("Could not reach %s %s: %s",
efl_class_name_get(efl_class_get(svr->dialer.dialer)),
address, eina_error_msg_get(err));
goto error;
}
DBG("connecting %p %s address='%s'",
svr->dialer.dialer,
efl_class_name_get(efl_class_get(svr->dialer.dialer)),
address);
svr->data = (void *)data;
servers = eina_list_append(servers, svr);
ECORE_MAGIC_SET(svr, ECORE_MAGIC_IPC_SERVER);
free(address);
return svr;
error:
free(address);
_ecore_ipc_dialer_del(svr);
free(svr);
return NULL; /* dialer will trigger all cleanup on its own callbacks */
error_dialer:
free(svr);
return NULL;
}
EAPI void *
ecore_ipc_server_del(Ecore_Ipc_Server *svr)
@ -473,13 +709,17 @@ ecore_ipc_server_del(Ecore_Ipc_Server *svr)
cl->svr = NULL;
ecore_ipc_client_del(cl);
}
if (svr->dialer.dialer) _ecore_ipc_dialer_del(svr);
if (svr->server) ecore_con_server_del(svr->server);
servers = eina_list_remove(servers, svr);
if (svr->buf) free(svr->buf);
ECORE_MAGIC_SET(svr, ECORE_MAGIC_NONE);
DBG("server %p freed", svr);
free(svr);
}
else DBG("server %p has %d events pending, postpone deletion", svr, svr->event_count);
return data;
}
@ -504,6 +744,11 @@ ecore_ipc_server_connected_get(Ecore_Ipc_Server *svr)
"ecore_ipc_server_connected_get");
return EINA_FALSE;
}
if (svr->dialer.dialer)
return efl_net_dialer_connected_get(svr->dialer.dialer);
else if (!svr->server) return EINA_FALSE;
return ecore_con_server_connected_get(svr->server);
}
@ -589,6 +834,48 @@ ecore_ipc_server_send(Ecore_Ipc_Server *svr, int major, int minor, int ref, int
*head |= md << (4 * 5);
*head = htonl(*head);
svr->prev.o = msg;
if (svr->dialer.input)
{
Eina_Slice slice;
Eina_Error err;
slice.mem = dat;
slice.len = s;
err = efl_io_writer_write(svr->dialer.input, &slice, NULL);
if (err)
{
ERR("could not write queue=%p %zd bytes: %s",
svr->dialer.input, slice.len, eina_error_msg_get(err));
return 0;
}
if (slice.len < (size_t)s)
{
ERR("only wrote %zd of %d bytes to queue %p",
slice.len, s, svr->dialer.input);
return 0;
}
slice.mem = data;
slice.len = size;
err = efl_io_writer_write(svr->dialer.input, &slice, NULL);
if (err)
{
ERR("could not write queue=%p %zd bytes: %s",
svr->dialer.input, slice.len, eina_error_msg_get(err));
return 0;
}
if (slice.len < (size_t)size)
{
ERR("only wrote %zd of %d bytes to queue %p",
slice.len, size, svr->dialer.input);
return 0;
}
return s + size;
}
else if (!svr->server) return 0;
ret = ecore_con_server_send(svr->server, dat, s);
if (size > 0) ret += ecore_con_server_send(svr->server, data, size);
return ret;
@ -639,6 +926,17 @@ ecore_ipc_server_ip_get(Ecore_Ipc_Server *svr)
"ecore_ipc_server_ip_get");
return NULL;
}
if (svr->dialer.dialer)
{
if (efl_isa(svr->dialer.dialer, EFL_NET_DIALER_TCP_CLASS) ||
efl_isa(svr->dialer.dialer, EFL_NET_DIALER_SSL_CLASS))
return efl_net_dialer_address_dial_get(svr->dialer.dialer);
/* original IPC just returned IP for remote connections */
return NULL;
}
else if (!svr->server) return NULL;
return ecore_con_server_ip_get(svr->server);
}
@ -651,6 +949,14 @@ ecore_ipc_server_flush(Ecore_Ipc_Server *svr)
"ecore_ipc_server_server_flush");
return;
}
if (svr->dialer.input)
{
while (efl_io_queue_usage_get(svr->dialer.input) > 0)
efl_io_copier_flush(svr->dialer.send_copier);
return;
}
else if (!svr->server) return;
ecore_con_server_flush(svr->server);
}

View File

@ -84,6 +84,15 @@ struct _Ecore_Ipc_Client
struct _Ecore_Ipc_Server
{
ECORE_MAGIC;
/* when used as dialer: ecore_ipc_server_connect() */
struct {
Eo *input;
Eo *dialer;
Eo *recv_copier;
Eo *send_copier;
} dialer;
Ecore_Con_Server *server;
Eina_List *clients;
void *data;