From 46341b329d72736c1f1c47478f760ab8db76bbc8 Mon Sep 17 00:00:00 2001 From: Gustavo Sverzut Barbieri Date: Fri, 25 Nov 2016 01:27:33 -0200 Subject: [PATCH] efl_io_buffered_stream: wraps an I/O object and make it easy to use. Since all other efl.io objects are low-level, the recommended approach is to use an efl.io.copier. However when dealing with in-memory, bi-directional comms like talking to a socket, we always end with 2 queues, 2 copiers and the annoying setup that is being replicated in ecore_ipc, efl_debug and so on. This class is the base to make it simpler. Other classes such as Efl.Net.Socket.Simple, Efl.Net.Dialer.Simple and Efl.Net.Server.Simple will use it to provide simpler code to users. I guess we can call EFL+EO Java now? --- src/Makefile_Ecore.am | 5 +- src/examples/ecore/.gitignore | 1 + src/examples/ecore/Makefile.am | 5 + .../ecore/efl_io_buffered_stream_example.c | 312 ++++++++++ src/lib/ecore/Ecore_Eo.h | 1 + src/lib/ecore/efl_io_buffered_stream.c | 549 ++++++++++++++++++ src/lib/ecore/efl_io_buffered_stream.eo | 251 ++++++++ 7 files changed, 1123 insertions(+), 1 deletion(-) create mode 100644 src/examples/ecore/efl_io_buffered_stream_example.c create mode 100644 src/lib/ecore/efl_io_buffered_stream.c create mode 100644 src/lib/ecore/efl_io_buffered_stream.eo diff --git a/src/Makefile_Ecore.am b/src/Makefile_Ecore.am index 1eeab088b6..7ebef696ce 100644 --- a/src/Makefile_Ecore.am +++ b/src/Makefile_Ecore.am @@ -19,7 +19,9 @@ ecore_eolian_files_public = \ lib/ecore/efl_io_stdout.eo \ lib/ecore/efl_io_stderr.eo \ lib/ecore/efl_io_file.eo \ - lib/ecore/efl_io_copier.eo + lib/ecore/efl_io_copier.eo \ + lib/ecore/efl_io_buffered_stream.eo + ecore_eolian_files = \ $(ecore_eolian_files_public) \ @@ -82,6 +84,7 @@ lib/ecore/efl_io_stdout.c \ lib/ecore/efl_io_stderr.c \ lib/ecore/efl_io_file.c \ lib/ecore/efl_io_copier.c \ +lib/ecore/efl_io_buffered_stream.c \ lib/ecore/efl_promise.c \ lib/ecore/ecore_pipe.c \ lib/ecore/ecore_poller.c \ diff --git a/src/examples/ecore/.gitignore b/src/examples/ecore/.gitignore index e8e8aa9bdb..b353fa783a 100644 --- a/src/examples/ecore/.gitignore +++ b/src/examples/ecore/.gitignore @@ -51,6 +51,7 @@ /efl_io_copier_example /efl_io_copier_simple_example /efl_io_queue_example +/efl_io_buffered_stream_example /efl_net_server_example /efl_net_dialer_http_example /efl_net_dialer_websocket_example diff --git a/src/examples/ecore/Makefile.am b/src/examples/ecore/Makefile.am index bd5796a5ed..db23d30c17 100644 --- a/src/examples/ecore/Makefile.am +++ b/src/examples/ecore/Makefile.am @@ -84,6 +84,7 @@ ecore_con_eet_server_example \ efl_io_copier_example \ efl_io_copier_simple_example \ efl_io_queue_example \ +efl_io_buffered_stream_example \ efl_net_server_example \ efl_net_dialer_http_example \ efl_net_dialer_websocket_example \ @@ -317,6 +318,9 @@ efl_io_copier_simple_example_LDADD = $(ECORE_COMMON_LDADD) efl_io_queue_example_SOURCES = efl_io_queue_example.c efl_io_queue_example_LDADD = $(ECORE_CON_COMMON_LDADD) +efl_io_buffered_stream_example_SOURCES = efl_io_buffered_stream_example.c +efl_io_buffered_stream_example_LDADD = $(ECORE_CON_COMMON_LDADD) + efl_net_server_example_SOURCES = efl_net_server_example.c efl_net_server_example_LDADD = $(ECORE_CON_COMMON_LDADD) @@ -407,6 +411,7 @@ ecore_con_eet_descriptor_example.c \ efl_io_copier_example.c \ efl_io_copier_simple_example.c \ efl_io_queue_example.c \ +efl_io_buffered_stream_example.c \ efl_net_server_example.c \ efl_net_dialer_http_example.c \ efl_net_dialer_websocket_example.c \ diff --git a/src/examples/ecore/efl_io_buffered_stream_example.c b/src/examples/ecore/efl_io_buffered_stream_example.c new file mode 100644 index 0000000000..ec8ff1fd4d --- /dev/null +++ b/src/examples/ecore/efl_io_buffered_stream_example.c @@ -0,0 +1,312 @@ +#define EFL_BETA_API_SUPPORT 1 +#define EFL_EO_API_SUPPORT 1 +#include +#include +#include + +static int retval = EXIT_SUCCESS; +static Eina_List *commands = NULL; +static Eina_Slice line_delimiter; +static Eo *stream = NULL; + +static void +_command_next(void) +{ + Eina_Slice slice; + char *cmd; + + if (!commands) + { + efl_io_buffered_stream_eos_mark(stream); + return; + } + + cmd = commands->data; + commands = eina_list_remove_list(commands, commands); + + slice = (Eina_Slice)EINA_SLICE_STR(cmd); + efl_io_writer_write(stream, &slice, NULL); + fprintf(stderr, "INFO: sent '" EINA_SLICE_STR_FMT "'\n", + EINA_SLICE_STR_PRINT(slice)); + + /* don't use line_delimiter directly, 'len' may be changed! */ + slice = line_delimiter; + efl_io_writer_write(stream, &slice, NULL); + free(cmd); +} + +static void +_receiver_data(void *data EINA_UNUSED, const Efl_Event *event) +{ + Eina_Slice slice; + + if (!efl_io_buffered_stream_slice_get(event->object, &slice)) return; + + /* this will happen when we're called when we issue our own + * efl_io_buffered_stream_clear() below. + */ + if (slice.len == 0) return; + + /* + * If the server didn't send us the line terminator and closed the + * connection (ie: efl_io_reader_eos_get() == true) or if the + * efl_io_buffered_stream_max_queue_size_input_set() was reached, + * then we may have a line without a trailing delimiter. Check for + * that. + */ + if (!eina_slice_endswith(slice, line_delimiter)) + { + fprintf(stderr, "WARNING: received without line-delimiter '" + EINA_SLICE_STR_FMT "'\n", + EINA_SLICE_STR_PRINT(slice)); + } + else + { + slice.len -= line_delimiter.len; + fprintf(stderr, "INFO: received '" EINA_SLICE_STR_FMT "'\n", + EINA_SLICE_STR_PRINT(slice)); + } + + efl_io_buffered_stream_clear(event->object); + _command_next(); +} + +static void +_dialer_connected(void *data EINA_UNUSED, const Efl_Event *event) +{ + fprintf(stderr, "INFO: connected to %s (%s)\n", + efl_net_dialer_address_dial_get(event->object), + efl_net_socket_address_remote_get(event->object)); + + _command_next(); +} + +static void +_stream_write_finished(void *data EINA_UNUSED, const Efl_Event *event) +{ + fprintf(stderr, "INFO: %s done sending\n", efl_name_get(event->object)); +} + +static void +_stream_error(void *data EINA_UNUSED, const Efl_Event *event) +{ + const Eina_Error *perr = event->info; + fprintf(stderr, "INFO: %s error: #%d '%s'\n", + efl_name_get(event->object), *perr, eina_error_msg_get(*perr)); + retval = EXIT_FAILURE; + ecore_main_loop_quit(); +} + +static void +_stream_eos(void *data EINA_UNUSED, const Efl_Event *event) +{ + fprintf(stderr, "INFO: %s eos, quit\n", efl_name_get(event->object)); + ecore_main_loop_quit(); +} + +EFL_CALLBACKS_ARRAY_DEFINE(stream_cbs, + { EFL_IO_BUFFERED_STREAM_EVENT_LINE, _receiver_data }, + { EFL_IO_READER_EVENT_EOS, _stream_eos }, + { EFL_IO_BUFFERED_STREAM_EVENT_WRITE_FINISHED, _stream_write_finished }, + { EFL_IO_BUFFERED_STREAM_EVENT_ERROR, _stream_error }); + +static char * +_unescape(const char *str) +{ + char *ret = strdup(str); + char *c, *w; + Eina_Bool escaped = EINA_FALSE; + + for (c = ret, w = ret; *c != '\0'; c++) + { + if (escaped) + { + escaped = EINA_FALSE; + switch (*c) + { + case 'n': *w = '\n'; break; + case 'r': *w = '\r'; break; + case 't': *w = '\t'; break; + default: w++; /* no change */ + } + w++; + } + else + { + if (*c == '\\') + escaped = EINA_TRUE; + else + w++; + } + } + *w = '\0'; + return ret; +} + +static const Ecore_Getopt options = { + "efl_io_buffered_stream_example", /* program name */ + NULL, /* usage line */ + "1", /* version */ + "(C) 2016 Enlightenment Project", /* copyright */ + "BSD 2-Clause", /* license */ + /* long description, may be multiline and contain \n */ + "Example of Efl_Io_Buffered_Stream usage.\n" + "\n" + "This uses Efl_Io_Buffered_Stream to easily interface with Efl_Net_Dialer_Tcp.", + EINA_FALSE, + { + ECORE_GETOPT_STORE_STR('d', "line-delimiter", + "Changes the line delimiter to be used in both send and receive. Defaults to \\r\\n"), + ECORE_GETOPT_STORE_ULONG('l', "buffer-limit", + "If set will limit buffer size to this limit of bytes. If used alongside with --line-delimiter and that delimiter was not found but bffer limit was reached, the line event will be triggered without the delimiter at the end."), + ECORE_GETOPT_VERSION('V', "version"), + ECORE_GETOPT_COPYRIGHT('C', "copyright"), + ECORE_GETOPT_LICENSE('L', "license"), + ECORE_GETOPT_HELP('h', "help"), + + ECORE_GETOPT_STORE_METAVAR_STR(0, NULL, + "The server address as\n" + "IP:PORT to connect using TCP and an IPv4 (A.B.C.D:PORT) or IPv6 ([A:B:C:D::E]:PORT).\n", + "server_address"), + ECORE_GETOPT_APPEND_METAVAR(0, NULL, + "Commands to send", + "commands", + ECORE_GETOPT_TYPE_STR), + ECORE_GETOPT_SENTINEL + } +}; + +int +main(int argc, char **argv) +{ + char *address = NULL; + char *line_delimiter_str = NULL; + char *cmd; + unsigned long buffer_limit = 0; + Eina_Bool quit_option = EINA_FALSE; + Ecore_Getopt_Value values[] = { + ECORE_GETOPT_VALUE_STR(line_delimiter_str), + ECORE_GETOPT_VALUE_ULONG(buffer_limit), + + /* standard block to provide version, copyright, license and help */ + ECORE_GETOPT_VALUE_BOOL(quit_option), /* -V/--version quits */ + ECORE_GETOPT_VALUE_BOOL(quit_option), /* -C/--copyright quits */ + ECORE_GETOPT_VALUE_BOOL(quit_option), /* -L/--license quits */ + ECORE_GETOPT_VALUE_BOOL(quit_option), /* -h/--help quits */ + + /* positional argument */ + ECORE_GETOPT_VALUE_STR(address), + ECORE_GETOPT_VALUE_LIST(commands), + + ECORE_GETOPT_VALUE_NONE /* sentinel */ + }; + Eina_Error err; + int args; + Eo *dialer, *loop; + + ecore_init(); + ecore_con_init(); + + args = ecore_getopt_parse(&options, values, argc, argv); + if (args < 0) + { + fputs("ERROR: Could not parse command line options.\n", stderr); + retval = EXIT_FAILURE; + goto end; + } + + if (quit_option) goto end; + + args = ecore_getopt_parse_positional(&options, values, argc, argv, args); + if (args < 0) + { + fputs("ERROR: Could not parse positional arguments.\n", stderr); + retval = EXIT_FAILURE; + goto end; + } + + line_delimiter_str = _unescape(line_delimiter_str ? line_delimiter_str : "\\r\\n"); + + if (!commands) + { + fputs("ERROR: missing commands to send.\n", stderr); + retval = EXIT_FAILURE; + goto end; + } + + /* + * some objects such as the Efl.Io.Buffered_Stream and + * Efl.Net.Dialer.Tcp depend on main loop, thus their parent must + * be a loop provider. We use the loop itself. + */ + loop = ecore_main_loop_get(); + + /* The TCP client to use to send/receive network data */ + dialer = efl_add(EFL_NET_DIALER_TCP_CLASS, loop, + efl_name_set(efl_added, "dialer"), + efl_event_callback_add(efl_added, EFL_NET_DIALER_EVENT_CONNECTED, _dialer_connected, NULL)); + if (!dialer) + { + fprintf(stderr, "ERROR: could not create Efl_Net_Dialer_Tcp\n"); + retval = EXIT_FAILURE; + goto end; + } + + line_delimiter = (Eina_Slice)EINA_SLICE_STR(line_delimiter_str); + + /* + * Without the buffered stream we'd have to create two Efl.Io.Queue + * ourselves, as well as two Efl.Io.Copier to link them with the + * dialer. + * + * Our example's usage is to write each command at once followed by + * the line_delimiter, then wait for a reply from the server, then + * write another. + * + * On incoming data we peek at it with slice_get() and then clear(). + */ + stream = efl_add(EFL_IO_BUFFERED_STREAM_CLASS, loop, + efl_name_set(efl_added, "stream"), + efl_io_buffered_stream_inner_io_set(efl_added, dialer), /* mandatory! */ + efl_io_buffered_stream_line_delimiter_set(efl_added, &line_delimiter), + efl_io_buffered_stream_max_queue_size_input_set(efl_added, buffer_limit), + efl_io_buffered_stream_max_queue_size_output_set(efl_added, buffer_limit), + efl_event_callback_array_add(efl_added, stream_cbs(), NULL)); + if (!stream) + { + fprintf(stderr, "ERROR: could not create Efl_Io_Buffered_Stream\n"); + retval = EXIT_FAILURE; + goto error_stream; + } + + /* + * From here on it's mostly the same all Efl_Io_Copier would do, + * check efl_io_copier_simple_example.c and efl_io_copier_example.c + */ + err = efl_net_dialer_dial(dialer, address); + if (err) + { + fprintf(stderr, "ERROR: could not dial %s: %s\n", + address, eina_error_msg_get(err)); + goto error_dialing; + } + + ecore_main_loop_begin(); + + error_dialing: + efl_io_closer_close(stream); + efl_del(stream); + error_stream: + efl_del(dialer); + end: + EINA_LIST_FREE(commands, cmd) + { + fprintf(stderr, "ERROR: unsent command: %s\n", cmd); + free(cmd); + } + + ecore_con_shutdown(); + ecore_shutdown(); + + return retval; +} diff --git a/src/lib/ecore/Ecore_Eo.h b/src/lib/ecore/Ecore_Eo.h index fc46941fac..1387afb344 100644 --- a/src/lib/ecore/Ecore_Eo.h +++ b/src/lib/ecore/Ecore_Eo.h @@ -110,6 +110,7 @@ EAPI Efl_Future *efl_future_iterator_race(Eina_Iterator *it); #include "efl_io_stderr.eo.h" #include "efl_io_file.eo.h" #include "efl_io_copier.eo.h" +#include "efl_io_buffered_stream.eo.h" /** * @} diff --git a/src/lib/ecore/efl_io_buffered_stream.c b/src/lib/ecore/efl_io_buffered_stream.c new file mode 100644 index 0000000000..f75c033aea --- /dev/null +++ b/src/lib/ecore/efl_io_buffered_stream.c @@ -0,0 +1,549 @@ +#ifdef HAVE_CONFIG_H +# include +#endif + +#define EFL_IO_READER_PROTECTED 1 +#define EFL_IO_WRITER_PROTECTED 1 +#define EFL_IO_CLOSER_PROTECTED 1 + +#include +#include "ecore_private.h" + +typedef struct +{ + Eo *inner_io; + Eo *incoming; + Eo *outgoing; + Eo *sender; + Eo *receiver; + Eina_Bool closed; + Eina_Bool eos; + Eina_Bool can_read; + Eina_Bool can_write; + Eina_Bool is_closer; +} Efl_Io_Buffered_Stream_Data; + +#define MY_CLASS EFL_IO_BUFFERED_STREAM_CLASS + +static void +_efl_io_buffered_stream_error(void *data, const Efl_Event *event) +{ + Eo *o = data; + Eina_Error *perr = event->info; + DBG("%p %s error: %s", o, efl_name_get(event->object), eina_error_msg_get(*perr)); + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, event->info); +} + +static void +_efl_io_buffered_stream_incoming_can_read_changed(void *data, const Efl_Event *event) +{ + Eo *o = data; + if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */ + efl_io_reader_can_read_set(o, efl_io_reader_can_read_get(event->object)); +} + +static void +_efl_io_buffered_stream_incoming_slice_changed(void *data, const Efl_Event *event EINA_UNUSED) +{ + Eo *o = data; + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_SLICE_CHANGED, NULL); +} + +EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_incoming_cbs, + { EFL_IO_READER_EVENT_CAN_READ_CHANGED, _efl_io_buffered_stream_incoming_can_read_changed }, + { EFL_IO_QUEUE_EVENT_SLICE_CHANGED, _efl_io_buffered_stream_incoming_slice_changed }); + +static void +_efl_io_buffered_stream_receiver_line(void *data, const Efl_Event *event) +{ + Eo *o = data; + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_LINE, event->info); +} + +static void +_efl_io_buffered_stream_receiver_done(void *data, const Efl_Event *event EINA_UNUSED) +{ + Eo *o = data; + if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */ + efl_io_reader_eos_set(o, EINA_TRUE); +} + +EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_receiver_cbs, + { EFL_IO_COPIER_EVENT_DONE, _efl_io_buffered_stream_receiver_done }, + { EFL_IO_COPIER_EVENT_LINE, _efl_io_buffered_stream_receiver_line }, + { EFL_IO_COPIER_EVENT_ERROR, _efl_io_buffered_stream_error }); + + +static void +_efl_io_buffered_stream_outgoing_can_write_changed(void *data, const Efl_Event *event) +{ + Eo *o = data; + if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */ + efl_io_writer_can_write_set(o, efl_io_writer_can_write_get(event->object)); +} + +EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_outgoing_cbs, + { EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, _efl_io_buffered_stream_outgoing_can_write_changed }); + +static void +_efl_io_buffered_stream_sender_done(void *data, const Efl_Event *event EINA_UNUSED) +{ + Eo *o = data; + Efl_Io_Buffered_Stream_Data *pd = efl_data_scope_get(o, MY_CLASS); + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_WRITE_FINISHED, NULL); + if (efl_io_copier_done_get(pd->receiver)) + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_FINISHED, NULL); +} + +EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_sender_cbs, + { EFL_IO_COPIER_EVENT_DONE, _efl_io_buffered_stream_sender_done }, + { EFL_IO_COPIER_EVENT_ERROR, _efl_io_buffered_stream_error }); + +static void +_efl_io_buffered_stream_inner_io_del(void *data, const Efl_Event *event) +{ + Eo *o = data; + Efl_Io_Buffered_Stream_Data *pd = efl_data_scope_get(o, MY_CLASS); + DBG("%p the inner I/O %p was deleted", o, event->object); + if (pd->inner_io == event->object) + pd->inner_io = NULL; +} + +EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_inner_io_cbs, + { EFL_EVENT_DEL, _efl_io_buffered_stream_inner_io_del }); + + +EOLIAN static Efl_Object * +_efl_io_buffered_stream_efl_object_finalize(Eo *o, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->inner_io) + { + ERR("no valid I/O was set with efl_io_buffered_stream_inner_io_set()!"); + return NULL; + } + + return efl_finalize(efl_super(o, MY_CLASS)); +} + +EOLIAN static void +_efl_io_buffered_stream_efl_object_destructor(Eo *o, Efl_Io_Buffered_Stream_Data *pd) +{ + if (pd->incoming) + { + efl_del(pd->incoming); + pd->incoming = NULL; + } + if (pd->outgoing) + { + efl_del(pd->outgoing); + pd->outgoing = NULL; + } + if (pd->sender) + { + efl_del(pd->sender); + pd->sender = NULL; + } + if (pd->receiver) + { + efl_del(pd->receiver); + pd->receiver = NULL; + } + + if (pd->inner_io) + { + efl_event_callback_array_del(pd->inner_io, _efl_io_buffered_stream_inner_io_cbs(), o); + efl_unref(pd->inner_io); /* do not del, just take our ref */ + pd->inner_io = NULL; + } + + efl_destructor(efl_super(o, MY_CLASS)); +} + +EOLIAN static Eina_Error +_efl_io_buffered_stream_efl_io_closer_close(Eo *o, Efl_Io_Buffered_Stream_Data *pd) +{ + Eina_Error err = 0; + + EINA_SAFETY_ON_TRUE_RETURN_VAL(pd->closed, EINVAL); + + /* line delimiters may be holding a last chunk of data */ + if (pd->receiver) efl_io_copier_flush(pd->receiver, EINA_FALSE, EINA_TRUE); + + efl_io_writer_can_write_set(o, EINA_FALSE); + efl_io_reader_can_read_set(o, EINA_FALSE); + efl_io_reader_eos_set(o, EINA_TRUE); + + pd->closed = EINA_TRUE; + efl_event_callback_call(o, EFL_IO_CLOSER_EVENT_CLOSED, NULL); + + if (pd->sender && (!efl_io_closer_closed_get(pd->sender))) + efl_io_closer_close(pd->sender); + + if (pd->receiver && (!efl_io_closer_closed_get(pd->receiver))) + efl_io_closer_close(pd->receiver); + + return err; +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_closer_closed_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return pd->closed || efl_io_closer_closed_get(pd->inner_io); +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_closer_close_on_exec_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return efl_io_closer_close_on_exec_get(pd->inner_io); +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_closer_close_on_exec_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool value) +{ + return efl_io_closer_close_on_exec_set(pd->inner_io, value); +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_closer_close_on_destructor_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return efl_io_closer_close_on_destructor_get(pd->inner_io); +} + +EOLIAN static void +_efl_io_buffered_stream_efl_io_closer_close_on_destructor_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool value) +{ + efl_io_closer_close_on_destructor_set(pd->inner_io, value); +} + +EOLIAN static Eina_Error +_efl_io_buffered_stream_efl_io_reader_read(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Rw_Slice *rw_slice) +{ + Eina_Error err; + + if (!pd->incoming) + { + WRN("%p reading from inner_io %p (%s) that doesn't implement Efl.Io.Reader", + o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io))); + return EINVAL; + } + + err = efl_io_reader_read(pd->incoming, rw_slice); + if (err && (err != EAGAIN)) + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, &err); + return err; +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_reader_can_read_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return pd->can_read; +} + +EOLIAN static void +_efl_io_buffered_stream_efl_io_reader_can_read_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd EINA_UNUSED, Eina_Bool can_read) +{ + EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o)); + if (pd->can_read == can_read) return; + pd->can_read = can_read; + efl_event_callback_call(o, EFL_IO_READER_EVENT_CAN_READ_CHANGED, NULL); +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_reader_eos_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return pd->eos; +} + +EOLIAN static void +_efl_io_buffered_stream_efl_io_reader_eos_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool is_eos) +{ + EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o)); + if (pd->eos == is_eos) return; + pd->eos = is_eos; + if (!is_eos) return; + + efl_event_callback_call(o, EFL_IO_READER_EVENT_EOS, NULL); + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_READ_FINISHED, NULL); + if (efl_io_copier_done_get(pd->sender)) + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_FINISHED, NULL); +} + +EOLIAN static Eina_Error +_efl_io_buffered_stream_efl_io_writer_write(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Slice *slice, Eina_Slice *remaining) +{ + Eina_Error err; + + if (!pd->outgoing) + { + WRN("%p writing to inner_io %p (%s) that doesn't implement Efl.Io.Writer", + o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io))); + return EINVAL; + } + + err = efl_io_writer_write(pd->outgoing, slice, remaining); + if (err && (err != EAGAIN)) + efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, &err); + return err; +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_efl_io_writer_can_write_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return pd->can_write; +} + +EOLIAN static void +_efl_io_buffered_stream_efl_io_writer_can_write_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd EINA_UNUSED, Eina_Bool can_write) +{ + EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o)); + if (pd->can_write == can_write) return; + pd->can_write = can_write; + efl_event_callback_call(o, EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, NULL); +} + +EOLIAN static void +_efl_io_buffered_stream_inner_io_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Efl_Object *io) +{ + Eina_Bool is_reader, is_writer; + + EINA_SAFETY_ON_TRUE_RETURN(efl_finalized_get(o)); + EINA_SAFETY_ON_NULL_RETURN(io); + EINA_SAFETY_ON_TRUE_RETURN(pd->inner_io != NULL); + + pd->is_closer = efl_isa(io, EFL_IO_CLOSER_MIXIN); + is_reader = efl_isa(io, EFL_IO_READER_INTERFACE); + is_writer = efl_isa(io, EFL_IO_WRITER_INTERFACE); + + EINA_SAFETY_ON_TRUE_RETURN((!is_reader) && (!is_writer)); + + pd->inner_io = efl_ref(io); + efl_event_callback_array_add(io, _efl_io_buffered_stream_inner_io_cbs(), o); + + /* inner_io -> incoming */ + if (is_reader) + { + DBG("%p inner_io=%p (%s) is Efl.Io.Reader", o, io, efl_class_name_get(efl_class_get(io))); + pd->incoming = efl_add(EFL_IO_QUEUE_CLASS, o, + efl_name_set(efl_added, "incoming"), + efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_incoming_cbs(), o)); + EINA_SAFETY_ON_NULL_RETURN(pd->incoming); + + pd->receiver = efl_add(EFL_IO_COPIER_CLASS, o, + efl_name_set(efl_added, "receiver"), + efl_io_copier_buffer_limit_set(efl_added, 4096), + efl_io_copier_source_set(efl_added, io), + efl_io_copier_destination_set(efl_added, pd->incoming), + efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE), + efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_receiver_cbs(), o)); + EINA_SAFETY_ON_NULL_RETURN(pd->receiver); + } + else + { + DBG("%p inner_io=%p (%s) is not Efl.Io.Reader", o, io, efl_class_name_get(efl_class_get(io))); + efl_io_reader_eos_set(o, EINA_TRUE); + } + + + /* outgoing -> inner_io */ + if (is_writer) + { + DBG("%p inner_io=%p (%s) is Efl.Io.Writer", o, io, efl_class_name_get(efl_class_get(io))); + pd->outgoing = efl_add(EFL_IO_QUEUE_CLASS, o, + efl_name_set(efl_added, "outgoing"), + efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_outgoing_cbs(), o)); + EINA_SAFETY_ON_NULL_RETURN(pd->outgoing); + + pd->sender = efl_add(EFL_IO_COPIER_CLASS, o, + efl_name_set(efl_added, "sender"), + efl_io_copier_buffer_limit_set(efl_added, 4096), + efl_io_copier_source_set(efl_added, pd->outgoing), + efl_io_copier_destination_set(efl_added, io), + efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE), + efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_sender_cbs(), o)); + EINA_SAFETY_ON_NULL_RETURN(pd->sender); + } + else + DBG("%p inner_io=%p (%s) is not Efl.Io.Writer", o, io, efl_class_name_get(efl_class_get(io))); +} + +EOLIAN static Efl_Object * +_efl_io_buffered_stream_inner_io_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + return pd->inner_io; +} + +EOLIAN static void +_efl_io_buffered_stream_max_queue_size_input_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t max_queue_size_input) +{ + if (!pd->incoming) + { + DBG("%p inner_io=%p (%s) is not Efl.Io.Reader, limit=%zu ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), max_queue_size_input); + return; + } + efl_io_queue_limit_set(pd->incoming, max_queue_size_input); +} + +EOLIAN static size_t +_efl_io_buffered_stream_max_queue_size_input_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->incoming) return 0; + return efl_io_queue_limit_get(pd->incoming); +} + +EOLIAN static void +_efl_io_buffered_stream_max_queue_size_output_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t max_queue_size_output) +{ + if (!pd->outgoing) + { + DBG("%p inner_io=%p (%s) is not Efl.Io.Writer, limit=%zu ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), max_queue_size_output); + return; + } + efl_io_queue_limit_set(pd->outgoing, max_queue_size_output); +} + +EOLIAN static size_t +_efl_io_buffered_stream_max_queue_size_output_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->outgoing) return 0; + return efl_io_queue_limit_get(pd->outgoing); +} + +EOLIAN static void +_efl_io_buffered_stream_line_delimiter_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, const Eina_Slice *slice) +{ + if (!pd->receiver) + { + DBG("%p inner_io=%p (%s) is not Efl.Io.Reader, slice=%p ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), slice); + return; + } + efl_io_copier_line_delimiter_set(pd->receiver, slice); +} + +EOLIAN static const Eina_Slice * +_efl_io_buffered_stream_line_delimiter_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->receiver) return NULL; + return efl_io_copier_line_delimiter_get(pd->receiver); +} + +EOLIAN static void +_efl_io_buffered_stream_inactivity_timeout_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, double seconds) +{ + if (pd->receiver) + efl_io_copier_inactivity_timeout_set(pd->receiver, seconds); + if (pd->sender) + efl_io_copier_inactivity_timeout_set(pd->sender, seconds); +} + +EOLIAN static double +_efl_io_buffered_stream_inactivity_timeout_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (pd->receiver) + return efl_io_copier_inactivity_timeout_get(pd->receiver); + if (pd->sender) + return efl_io_copier_inactivity_timeout_get(pd->sender); + return 0.0; +} + +EOLIAN static void +_efl_io_buffered_stream_read_chunk_size_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t size) +{ + if (pd->sender) + { + efl_io_copier_buffer_limit_set(pd->sender, size); + efl_io_copier_read_chunk_size_set(pd->sender, size); + } + + if (!pd->receiver) + { + efl_io_copier_buffer_limit_set(pd->receiver, size); + efl_io_copier_read_chunk_size_set(pd->receiver, size); + } +} + +EOLIAN static size_t +_efl_io_buffered_stream_read_chunk_size_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->receiver) return 0; + return efl_io_copier_read_chunk_size_get(pd->receiver); +} + +EOLIAN static size_t +_efl_io_buffered_stream_pending_write_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->outgoing) return 0; + return efl_io_queue_usage_get(pd->outgoing); +} + +EOLIAN static size_t +_efl_io_buffered_stream_pending_read_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->incoming) return 0; + return efl_io_queue_usage_get(pd->incoming); +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_slice_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Slice *slice) +{ + if (!pd->incoming) + { + if (slice) + { + slice->mem = NULL; + slice->len = 0; + } + return EINA_FALSE; + } + return efl_io_queue_slice_get(pd->incoming, slice); +} + +EOLIAN static void +_efl_io_buffered_stream_discard(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t amount) +{ + if (!pd->incoming) return; + efl_io_queue_discard(pd->incoming, amount); +} + +EOLIAN static void +_efl_io_buffered_stream_clear(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->incoming) return; + efl_io_queue_clear(pd->incoming); +} + +EOLIAN static void +_efl_io_buffered_stream_eos_mark(Eo *o, Efl_Io_Buffered_Stream_Data *pd) +{ + if (!pd->incoming) return; + DBG("%p mark eos", o); + efl_io_queue_eos_mark(pd->outgoing); +} + +EOLIAN static Eina_Bool +_efl_io_buffered_stream_flush(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool may_block, Eina_Bool ignore_line_delimiter) +{ + size_t pending; + Eina_Bool ret; + + EINA_SAFETY_ON_TRUE_RETURN_VAL(efl_io_closer_closed_get(o), EINA_FALSE); + + if (!pd->outgoing) return EINA_TRUE; + + pending = efl_io_queue_usage_get(pd->outgoing); + if (!pending) + return EINA_TRUE; + + if (pd->is_closer && efl_io_closer_closed_get(pd->inner_io)) + { + DBG("%p the inner I/O %p is already closed", o, pd->inner_io); + return EINA_TRUE; + } + + DBG("%p attempt to flush %zu bytes, may_block=%hhu, ignore_line_delimiter=%hhu...", o, pending, may_block, ignore_line_delimiter); + ret = efl_io_copier_flush(pd->sender, may_block, ignore_line_delimiter); + DBG("%p flushed, ret=%hhu, still pending=%zu", o, ret, efl_io_queue_usage_get(pd->outgoing)); + + return ret; +} + +#include "efl_io_buffered_stream.eo.c" diff --git a/src/lib/ecore/efl_io_buffered_stream.eo b/src/lib/ecore/efl_io_buffered_stream.eo new file mode 100644 index 0000000000..28049c4078 --- /dev/null +++ b/src/lib/ecore/efl_io_buffered_stream.eo @@ -0,0 +1,251 @@ +class Efl.Io.Buffered_Stream (Efl.Loop_User, Efl.Io.Reader, Efl.Io.Writer, Efl.Io.Closer) { + [[A wrapper object offering an easy to use, buffered streams over existing I/O class. + + The buffered stream encapsulates an actual @Efl.Io.Reader or + @Efl.Io.Writer, an input @Efl.Io.Queue, an output @Efl.Io.Queue + and these are linked using a input and a output + @Efl.Io.Copier. + + The idea is that unlike traditional @Efl.Io.Writer that will + attempt to write directly and thus may take less data than + requested, this one will keep the pending data in its own + buffer, feeding to the actual output when it + @Efl.Io.Writer.can_write. That makes its operation much simpler + as @Efl.Io.Writer.write will always take the full data -- allows + "write and forget", if unlimited (see + @.max_queue_size_output). When finished writing data, the + @.eos_mark and then wait for "write,finished" event to know when all data + was sent. + + Reading is also much simpler since incoming data is kept in an + @Efl.Io.Queue, thus its size can be queried with @.pending_read + and read with @Efl.Io.Reader.read or peeked with @.slice_get, + then discarded with @.discard or @.clear. + + Then when waiting for a complete message, just peek at its + contents, if not complete do nothing and wait, if complete then + either @Efl.Io.Reader.read to get a copy or manipulate a + read-only reference from @.slice_get and then @.discard + + The actual I/O is set with the constructor method @.inner_io.set + and can be retrieved with @.inner_io.get, which should be used + with care -- calling @Efl.Io.Reader.read and + @Efl.Io.Writer.write on it may produce unexpected results. + + @since 1.19 + ]] + + methods { + @property inner_io { + [[The inner I/O this wrapper operates on.]] + get { + [[The internal input/output used for actual operations, use with care!]] + } + set { + [[Constructor-only property to set the inner_io.]] + } + values { + io: Efl.Object; [[The input (@Efl.Io.Reader) or output (@Efl.Io.Writer) instance]] + } + } + + @property max_queue_size_input { + [[Limit how big the input queue can grow, in bytes. + + If limited and @.line_delimiter is set, "line" events + may be emitted with partial contents, without the + trailing delimiter. + ]] + get { } + set { + [[Constructor-only property to set buffer limit. 0 is unlimited]] + } + values { + max_queue_size_input: size; [[Defines a maximum buffer size for incoming data, or 0 to allow unlimited amount of bytes]] + } + } + + @property max_queue_size_output { + [[Limit how big the output queue can grow, in bytes. + + + If limited, @Efl.Io.Writer.write will take less data than requested! + ]] + get { } + set { + [[Constructor-only property to set buffer limit. 0 is unlimited]] + } + values { + max_queue_size_output: size; [[Defines a maximum buffer size for output data, or 0 to allow unlimited amount of bytes. If limited, @Efl.Io.Writer.write will take less data than requested!]] + } + } + + @property line_delimiter { + [[If set, incoming data will be checked for the delimiter and "line" events are The line may include the delimiter, unless it's end-of-stream on @.max_queue_size_input was reached.]] + get { } + set { + [[Change line delimiter to use. If NULL or empty, no delimiter is to be used]] + } + values { + slice: ptr(const(Eina.Slice)); [[The contents may contain \0 and will be copied]] + } + } + + @property inactivity_timeout { + [[Error as ETIMEDOUT if it becomes inactive for some time. + + If no activity, that is no read or write in the given + amount of seconds, then the object will emit "error" + event with ETIMEDOUT value. + + This is specified in seconds and is only active for + greater-than zero. Defaults to inactive. + ]] + values { + seconds: double; [[Number inactive seconds to timeout this object. If zero or less, it will be disabled.]] + } + } + + @property read_chunk_size { + [[Read chunk size property, in bytes. + + When reading the @.inner_io for data to be placed in + input queue, use this as chunk size. + + Setting this value large enough may reduce number of + @Efl.Io.Reader.read, improving performance at the expense + of more memory consumption. + + This value is bounded by @.max_queue_size_input if it's set. + + By default it's 4096. + ]] + get { + } + set { + [[Set chunk size for each basic @Efl.Io.Reader.read operation.]] + } + values { + size: size; [[This is the chunk size to use for read operations]] + } + } + + @property pending_write { + [[How many bytes are pending write to @.inner_io]] + get { } + values { + usage: size; [[Bytes available to write]] + } + } + + @property pending_read { + [[How many bytes are pending (available) for read]] + get { } + values { + usage: size; [[Bytes available to read]] + } + } + + slice_get { // TODO: property and return of Eina.Slice (not pointer) + [[Get a temporary access to input queue's internal read memory. + + The memory pointed by slice may be changed by other + methods of this class. The event "slice,changed" will be + called in those situations. + ]] + params { + @out slice: Eina.Slice; [[Slice of the current buffer, may be invalidated if @Efl.Io.Writer.write, @Efl.Io.Closer.close or @Efl.Io.Reader.read are called. It is the full slice available for reading.]] + } + return: bool (false); [[$true on success, $false otherwise]] + } + + discard { + [[Discard the given number of bytes. + + This has the same effect as reading and discarding the + given amount of bytes, without executing the actual + copy. + + It's often paired with @.slice_get, if users read the + information from the slice and once they're done, that + data must be discarded. + + As an example, some protocols provide messages with a + "size" header, then @.slice_get is used to peek into the + available memory to see if there is a "size" and if the + rest of the slice is the full payload, in this case the + slice may be handled to some processing function. When + the function is done, that amount of data must be + discarded -- with this function. + ]] + params { + amount: size; [[Bytes to discard]] + } + } + + clear { + [[Clear the incoming queue. Same as reading all data. + + This is equivalent as calling @.discard with @.pending_read + amount of bytes. + ]] + } + + eos_mark { + [[Mark this end-of-stream, signals nothing else will be written. + + That will forbid any further writes. + + Unlike @Efl.Io.Closer.close, this won't clear anything. + + When all data is written, "write,finished" is emitted. + ]] + } + + flush { + [[Forces writing all pending data to destination. + + It will return $true if @.pending_read drops to zero, $false + otherwise and more calls to flush must be made. + + If the @.inner_io is implements @Efl.Io.Closer and it + was closed, or the wrapper itself was closed, this + function will do nothing and returns $true. + + \@note this function may block the main loop execution + until operations complete! This is bad for usability, as + user interface or other operations may freeze. A better + approach is to operate asynchronously and wait for + "write,finished" event. + ]] + params { + may_block: bool; [[If $true, then @Efl.Io.Reader.can_read and @Efl.Io.Writer.can_write are not checked and the call may block.]] + ignore_line_delimiter: bool; [[Force flush ignoring line delimiters]] + } + return: bool(true); [[$true if all data was sent, $false otherwise]] + } + } + + events { + write,finished; [[@.eos_mark was called and all available data was sent to destination]] + read,finished; [[Same as @Efl.Io.Reader "eos", for consistency.]] + finished; [[Both read and write are finished.]] + error: Eina.Error; [[An error happened and the I/O stopped]] + slice,changed; [[The read-slice returned by @.slice_get may have changed.]] + line: ptr(const(Eina.Slice)); [[If @.line_delimiter is set, will be emitted with current line. The memory is only valid during event callback dispatched and should not be modified. Note that the line slice may not be inside @.slice_get, don't assume that!]] + } + + implements { + Efl.Object.finalize; + Efl.Object.destructor; + Efl.Io.Closer.close; + Efl.Io.Closer.closed.get; + Efl.Io.Closer.close_on_exec; + Efl.Io.Closer.close_on_destructor; + Efl.Io.Reader.read; + Efl.Io.Reader.can_read; + Efl.Io.Reader.eos; + Efl.Io.Writer.write; + Efl.Io.Writer.can_write; + } +}