efl_io_buffered_stream: wraps an I/O object and make it easy to use.

Since all other efl.io objects are low-level, the recommended approach
is to use an efl.io.copier. However when dealing with in-memory,
bi-directional comms like talking to a socket, we always end with 2
queues, 2 copiers and the annoying setup that is being replicated in
ecore_ipc, efl_debug and so on.

This class is the base to make it simpler. Other classes such as
Efl.Net.Socket.Simple, Efl.Net.Dialer.Simple and Efl.Net.Server.Simple
will use it to provide simpler code to users.

I guess we can call EFL+EO Java now?
This commit is contained in:
Gustavo Sverzut Barbieri 2016-11-25 01:27:33 -02:00
parent 16be61c7e1
commit 46341b329d
7 changed files with 1123 additions and 1 deletions

View File

@ -19,7 +19,9 @@ ecore_eolian_files_public = \
lib/ecore/efl_io_stdout.eo \
lib/ecore/efl_io_stderr.eo \
lib/ecore/efl_io_file.eo \
lib/ecore/efl_io_copier.eo
lib/ecore/efl_io_copier.eo \
lib/ecore/efl_io_buffered_stream.eo
ecore_eolian_files = \
$(ecore_eolian_files_public) \
@ -82,6 +84,7 @@ lib/ecore/efl_io_stdout.c \
lib/ecore/efl_io_stderr.c \
lib/ecore/efl_io_file.c \
lib/ecore/efl_io_copier.c \
lib/ecore/efl_io_buffered_stream.c \
lib/ecore/efl_promise.c \
lib/ecore/ecore_pipe.c \
lib/ecore/ecore_poller.c \

View File

@ -51,6 +51,7 @@
/efl_io_copier_example
/efl_io_copier_simple_example
/efl_io_queue_example
/efl_io_buffered_stream_example
/efl_net_server_example
/efl_net_dialer_http_example
/efl_net_dialer_websocket_example

View File

@ -84,6 +84,7 @@ ecore_con_eet_server_example \
efl_io_copier_example \
efl_io_copier_simple_example \
efl_io_queue_example \
efl_io_buffered_stream_example \
efl_net_server_example \
efl_net_dialer_http_example \
efl_net_dialer_websocket_example \
@ -317,6 +318,9 @@ efl_io_copier_simple_example_LDADD = $(ECORE_COMMON_LDADD)
efl_io_queue_example_SOURCES = efl_io_queue_example.c
efl_io_queue_example_LDADD = $(ECORE_CON_COMMON_LDADD)
efl_io_buffered_stream_example_SOURCES = efl_io_buffered_stream_example.c
efl_io_buffered_stream_example_LDADD = $(ECORE_CON_COMMON_LDADD)
efl_net_server_example_SOURCES = efl_net_server_example.c
efl_net_server_example_LDADD = $(ECORE_CON_COMMON_LDADD)
@ -407,6 +411,7 @@ ecore_con_eet_descriptor_example.c \
efl_io_copier_example.c \
efl_io_copier_simple_example.c \
efl_io_queue_example.c \
efl_io_buffered_stream_example.c \
efl_net_server_example.c \
efl_net_dialer_http_example.c \
efl_net_dialer_websocket_example.c \

View File

@ -0,0 +1,312 @@
#define EFL_BETA_API_SUPPORT 1
#define EFL_EO_API_SUPPORT 1
#include <Ecore.h>
#include <Ecore_Getopt.h>
#include <Ecore_Con.h>
static int retval = EXIT_SUCCESS;
static Eina_List *commands = NULL;
static Eina_Slice line_delimiter;
static Eo *stream = NULL;
static void
_command_next(void)
{
Eina_Slice slice;
char *cmd;
if (!commands)
{
efl_io_buffered_stream_eos_mark(stream);
return;
}
cmd = commands->data;
commands = eina_list_remove_list(commands, commands);
slice = (Eina_Slice)EINA_SLICE_STR(cmd);
efl_io_writer_write(stream, &slice, NULL);
fprintf(stderr, "INFO: sent '" EINA_SLICE_STR_FMT "'\n",
EINA_SLICE_STR_PRINT(slice));
/* don't use line_delimiter directly, 'len' may be changed! */
slice = line_delimiter;
efl_io_writer_write(stream, &slice, NULL);
free(cmd);
}
static void
_receiver_data(void *data EINA_UNUSED, const Efl_Event *event)
{
Eina_Slice slice;
if (!efl_io_buffered_stream_slice_get(event->object, &slice)) return;
/* this will happen when we're called when we issue our own
* efl_io_buffered_stream_clear() below.
*/
if (slice.len == 0) return;
/*
* If the server didn't send us the line terminator and closed the
* connection (ie: efl_io_reader_eos_get() == true) or if the
* efl_io_buffered_stream_max_queue_size_input_set() was reached,
* then we may have a line without a trailing delimiter. Check for
* that.
*/
if (!eina_slice_endswith(slice, line_delimiter))
{
fprintf(stderr, "WARNING: received without line-delimiter '"
EINA_SLICE_STR_FMT "'\n",
EINA_SLICE_STR_PRINT(slice));
}
else
{
slice.len -= line_delimiter.len;
fprintf(stderr, "INFO: received '" EINA_SLICE_STR_FMT "'\n",
EINA_SLICE_STR_PRINT(slice));
}
efl_io_buffered_stream_clear(event->object);
_command_next();
}
static void
_dialer_connected(void *data EINA_UNUSED, const Efl_Event *event)
{
fprintf(stderr, "INFO: connected to %s (%s)\n",
efl_net_dialer_address_dial_get(event->object),
efl_net_socket_address_remote_get(event->object));
_command_next();
}
static void
_stream_write_finished(void *data EINA_UNUSED, const Efl_Event *event)
{
fprintf(stderr, "INFO: %s done sending\n", efl_name_get(event->object));
}
static void
_stream_error(void *data EINA_UNUSED, const Efl_Event *event)
{
const Eina_Error *perr = event->info;
fprintf(stderr, "INFO: %s error: #%d '%s'\n",
efl_name_get(event->object), *perr, eina_error_msg_get(*perr));
retval = EXIT_FAILURE;
ecore_main_loop_quit();
}
static void
_stream_eos(void *data EINA_UNUSED, const Efl_Event *event)
{
fprintf(stderr, "INFO: %s eos, quit\n", efl_name_get(event->object));
ecore_main_loop_quit();
}
EFL_CALLBACKS_ARRAY_DEFINE(stream_cbs,
{ EFL_IO_BUFFERED_STREAM_EVENT_LINE, _receiver_data },
{ EFL_IO_READER_EVENT_EOS, _stream_eos },
{ EFL_IO_BUFFERED_STREAM_EVENT_WRITE_FINISHED, _stream_write_finished },
{ EFL_IO_BUFFERED_STREAM_EVENT_ERROR, _stream_error });
static char *
_unescape(const char *str)
{
char *ret = strdup(str);
char *c, *w;
Eina_Bool escaped = EINA_FALSE;
for (c = ret, w = ret; *c != '\0'; c++)
{
if (escaped)
{
escaped = EINA_FALSE;
switch (*c)
{
case 'n': *w = '\n'; break;
case 'r': *w = '\r'; break;
case 't': *w = '\t'; break;
default: w++; /* no change */
}
w++;
}
else
{
if (*c == '\\')
escaped = EINA_TRUE;
else
w++;
}
}
*w = '\0';
return ret;
}
static const Ecore_Getopt options = {
"efl_io_buffered_stream_example", /* program name */
NULL, /* usage line */
"1", /* version */
"(C) 2016 Enlightenment Project", /* copyright */
"BSD 2-Clause", /* license */
/* long description, may be multiline and contain \n */
"Example of Efl_Io_Buffered_Stream usage.\n"
"\n"
"This uses Efl_Io_Buffered_Stream to easily interface with Efl_Net_Dialer_Tcp.",
EINA_FALSE,
{
ECORE_GETOPT_STORE_STR('d', "line-delimiter",
"Changes the line delimiter to be used in both send and receive. Defaults to \\r\\n"),
ECORE_GETOPT_STORE_ULONG('l', "buffer-limit",
"If set will limit buffer size to this limit of bytes. If used alongside with --line-delimiter and that delimiter was not found but bffer limit was reached, the line event will be triggered without the delimiter at the end."),
ECORE_GETOPT_VERSION('V', "version"),
ECORE_GETOPT_COPYRIGHT('C', "copyright"),
ECORE_GETOPT_LICENSE('L', "license"),
ECORE_GETOPT_HELP('h', "help"),
ECORE_GETOPT_STORE_METAVAR_STR(0, NULL,
"The server address as\n"
"IP:PORT to connect using TCP and an IPv4 (A.B.C.D:PORT) or IPv6 ([A:B:C:D::E]:PORT).\n",
"server_address"),
ECORE_GETOPT_APPEND_METAVAR(0, NULL,
"Commands to send",
"commands",
ECORE_GETOPT_TYPE_STR),
ECORE_GETOPT_SENTINEL
}
};
int
main(int argc, char **argv)
{
char *address = NULL;
char *line_delimiter_str = NULL;
char *cmd;
unsigned long buffer_limit = 0;
Eina_Bool quit_option = EINA_FALSE;
Ecore_Getopt_Value values[] = {
ECORE_GETOPT_VALUE_STR(line_delimiter_str),
ECORE_GETOPT_VALUE_ULONG(buffer_limit),
/* standard block to provide version, copyright, license and help */
ECORE_GETOPT_VALUE_BOOL(quit_option), /* -V/--version quits */
ECORE_GETOPT_VALUE_BOOL(quit_option), /* -C/--copyright quits */
ECORE_GETOPT_VALUE_BOOL(quit_option), /* -L/--license quits */
ECORE_GETOPT_VALUE_BOOL(quit_option), /* -h/--help quits */
/* positional argument */
ECORE_GETOPT_VALUE_STR(address),
ECORE_GETOPT_VALUE_LIST(commands),
ECORE_GETOPT_VALUE_NONE /* sentinel */
};
Eina_Error err;
int args;
Eo *dialer, *loop;
ecore_init();
ecore_con_init();
args = ecore_getopt_parse(&options, values, argc, argv);
if (args < 0)
{
fputs("ERROR: Could not parse command line options.\n", stderr);
retval = EXIT_FAILURE;
goto end;
}
if (quit_option) goto end;
args = ecore_getopt_parse_positional(&options, values, argc, argv, args);
if (args < 0)
{
fputs("ERROR: Could not parse positional arguments.\n", stderr);
retval = EXIT_FAILURE;
goto end;
}
line_delimiter_str = _unescape(line_delimiter_str ? line_delimiter_str : "\\r\\n");
if (!commands)
{
fputs("ERROR: missing commands to send.\n", stderr);
retval = EXIT_FAILURE;
goto end;
}
/*
* some objects such as the Efl.Io.Buffered_Stream and
* Efl.Net.Dialer.Tcp depend on main loop, thus their parent must
* be a loop provider. We use the loop itself.
*/
loop = ecore_main_loop_get();
/* The TCP client to use to send/receive network data */
dialer = efl_add(EFL_NET_DIALER_TCP_CLASS, loop,
efl_name_set(efl_added, "dialer"),
efl_event_callback_add(efl_added, EFL_NET_DIALER_EVENT_CONNECTED, _dialer_connected, NULL));
if (!dialer)
{
fprintf(stderr, "ERROR: could not create Efl_Net_Dialer_Tcp\n");
retval = EXIT_FAILURE;
goto end;
}
line_delimiter = (Eina_Slice)EINA_SLICE_STR(line_delimiter_str);
/*
* Without the buffered stream we'd have to create two Efl.Io.Queue
* ourselves, as well as two Efl.Io.Copier to link them with the
* dialer.
*
* Our example's usage is to write each command at once followed by
* the line_delimiter, then wait for a reply from the server, then
* write another.
*
* On incoming data we peek at it with slice_get() and then clear().
*/
stream = efl_add(EFL_IO_BUFFERED_STREAM_CLASS, loop,
efl_name_set(efl_added, "stream"),
efl_io_buffered_stream_inner_io_set(efl_added, dialer), /* mandatory! */
efl_io_buffered_stream_line_delimiter_set(efl_added, &line_delimiter),
efl_io_buffered_stream_max_queue_size_input_set(efl_added, buffer_limit),
efl_io_buffered_stream_max_queue_size_output_set(efl_added, buffer_limit),
efl_event_callback_array_add(efl_added, stream_cbs(), NULL));
if (!stream)
{
fprintf(stderr, "ERROR: could not create Efl_Io_Buffered_Stream\n");
retval = EXIT_FAILURE;
goto error_stream;
}
/*
* From here on it's mostly the same all Efl_Io_Copier would do,
* check efl_io_copier_simple_example.c and efl_io_copier_example.c
*/
err = efl_net_dialer_dial(dialer, address);
if (err)
{
fprintf(stderr, "ERROR: could not dial %s: %s\n",
address, eina_error_msg_get(err));
goto error_dialing;
}
ecore_main_loop_begin();
error_dialing:
efl_io_closer_close(stream);
efl_del(stream);
error_stream:
efl_del(dialer);
end:
EINA_LIST_FREE(commands, cmd)
{
fprintf(stderr, "ERROR: unsent command: %s\n", cmd);
free(cmd);
}
ecore_con_shutdown();
ecore_shutdown();
return retval;
}

View File

@ -110,6 +110,7 @@ EAPI Efl_Future *efl_future_iterator_race(Eina_Iterator *it);
#include "efl_io_stderr.eo.h"
#include "efl_io_file.eo.h"
#include "efl_io_copier.eo.h"
#include "efl_io_buffered_stream.eo.h"
/**
* @}

View File

@ -0,0 +1,549 @@
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#define EFL_IO_READER_PROTECTED 1
#define EFL_IO_WRITER_PROTECTED 1
#define EFL_IO_CLOSER_PROTECTED 1
#include <Ecore.h>
#include "ecore_private.h"
typedef struct
{
Eo *inner_io;
Eo *incoming;
Eo *outgoing;
Eo *sender;
Eo *receiver;
Eina_Bool closed;
Eina_Bool eos;
Eina_Bool can_read;
Eina_Bool can_write;
Eina_Bool is_closer;
} Efl_Io_Buffered_Stream_Data;
#define MY_CLASS EFL_IO_BUFFERED_STREAM_CLASS
static void
_efl_io_buffered_stream_error(void *data, const Efl_Event *event)
{
Eo *o = data;
Eina_Error *perr = event->info;
DBG("%p %s error: %s", o, efl_name_get(event->object), eina_error_msg_get(*perr));
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, event->info);
}
static void
_efl_io_buffered_stream_incoming_can_read_changed(void *data, const Efl_Event *event)
{
Eo *o = data;
if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */
efl_io_reader_can_read_set(o, efl_io_reader_can_read_get(event->object));
}
static void
_efl_io_buffered_stream_incoming_slice_changed(void *data, const Efl_Event *event EINA_UNUSED)
{
Eo *o = data;
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_SLICE_CHANGED, NULL);
}
EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_incoming_cbs,
{ EFL_IO_READER_EVENT_CAN_READ_CHANGED, _efl_io_buffered_stream_incoming_can_read_changed },
{ EFL_IO_QUEUE_EVENT_SLICE_CHANGED, _efl_io_buffered_stream_incoming_slice_changed });
static void
_efl_io_buffered_stream_receiver_line(void *data, const Efl_Event *event)
{
Eo *o = data;
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_LINE, event->info);
}
static void
_efl_io_buffered_stream_receiver_done(void *data, const Efl_Event *event EINA_UNUSED)
{
Eo *o = data;
if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */
efl_io_reader_eos_set(o, EINA_TRUE);
}
EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_receiver_cbs,
{ EFL_IO_COPIER_EVENT_DONE, _efl_io_buffered_stream_receiver_done },
{ EFL_IO_COPIER_EVENT_LINE, _efl_io_buffered_stream_receiver_line },
{ EFL_IO_COPIER_EVENT_ERROR, _efl_io_buffered_stream_error });
static void
_efl_io_buffered_stream_outgoing_can_write_changed(void *data, const Efl_Event *event)
{
Eo *o = data;
if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */
efl_io_writer_can_write_set(o, efl_io_writer_can_write_get(event->object));
}
EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_outgoing_cbs,
{ EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, _efl_io_buffered_stream_outgoing_can_write_changed });
static void
_efl_io_buffered_stream_sender_done(void *data, const Efl_Event *event EINA_UNUSED)
{
Eo *o = data;
Efl_Io_Buffered_Stream_Data *pd = efl_data_scope_get(o, MY_CLASS);
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_WRITE_FINISHED, NULL);
if (efl_io_copier_done_get(pd->receiver))
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_FINISHED, NULL);
}
EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_sender_cbs,
{ EFL_IO_COPIER_EVENT_DONE, _efl_io_buffered_stream_sender_done },
{ EFL_IO_COPIER_EVENT_ERROR, _efl_io_buffered_stream_error });
static void
_efl_io_buffered_stream_inner_io_del(void *data, const Efl_Event *event)
{
Eo *o = data;
Efl_Io_Buffered_Stream_Data *pd = efl_data_scope_get(o, MY_CLASS);
DBG("%p the inner I/O %p was deleted", o, event->object);
if (pd->inner_io == event->object)
pd->inner_io = NULL;
}
EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_inner_io_cbs,
{ EFL_EVENT_DEL, _efl_io_buffered_stream_inner_io_del });
EOLIAN static Efl_Object *
_efl_io_buffered_stream_efl_object_finalize(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->inner_io)
{
ERR("no valid I/O was set with efl_io_buffered_stream_inner_io_set()!");
return NULL;
}
return efl_finalize(efl_super(o, MY_CLASS));
}
EOLIAN static void
_efl_io_buffered_stream_efl_object_destructor(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
{
if (pd->incoming)
{
efl_del(pd->incoming);
pd->incoming = NULL;
}
if (pd->outgoing)
{
efl_del(pd->outgoing);
pd->outgoing = NULL;
}
if (pd->sender)
{
efl_del(pd->sender);
pd->sender = NULL;
}
if (pd->receiver)
{
efl_del(pd->receiver);
pd->receiver = NULL;
}
if (pd->inner_io)
{
efl_event_callback_array_del(pd->inner_io, _efl_io_buffered_stream_inner_io_cbs(), o);
efl_unref(pd->inner_io); /* do not del, just take our ref */
pd->inner_io = NULL;
}
efl_destructor(efl_super(o, MY_CLASS));
}
EOLIAN static Eina_Error
_efl_io_buffered_stream_efl_io_closer_close(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
{
Eina_Error err = 0;
EINA_SAFETY_ON_TRUE_RETURN_VAL(pd->closed, EINVAL);
/* line delimiters may be holding a last chunk of data */
if (pd->receiver) efl_io_copier_flush(pd->receiver, EINA_FALSE, EINA_TRUE);
efl_io_writer_can_write_set(o, EINA_FALSE);
efl_io_reader_can_read_set(o, EINA_FALSE);
efl_io_reader_eos_set(o, EINA_TRUE);
pd->closed = EINA_TRUE;
efl_event_callback_call(o, EFL_IO_CLOSER_EVENT_CLOSED, NULL);
if (pd->sender && (!efl_io_closer_closed_get(pd->sender)))
efl_io_closer_close(pd->sender);
if (pd->receiver && (!efl_io_closer_closed_get(pd->receiver)))
efl_io_closer_close(pd->receiver);
return err;
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_closer_closed_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return pd->closed || efl_io_closer_closed_get(pd->inner_io);
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_closer_close_on_exec_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return efl_io_closer_close_on_exec_get(pd->inner_io);
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_closer_close_on_exec_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool value)
{
return efl_io_closer_close_on_exec_set(pd->inner_io, value);
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_closer_close_on_destructor_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return efl_io_closer_close_on_destructor_get(pd->inner_io);
}
EOLIAN static void
_efl_io_buffered_stream_efl_io_closer_close_on_destructor_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool value)
{
efl_io_closer_close_on_destructor_set(pd->inner_io, value);
}
EOLIAN static Eina_Error
_efl_io_buffered_stream_efl_io_reader_read(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Rw_Slice *rw_slice)
{
Eina_Error err;
if (!pd->incoming)
{
WRN("%p reading from inner_io %p (%s) that doesn't implement Efl.Io.Reader",
o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)));
return EINVAL;
}
err = efl_io_reader_read(pd->incoming, rw_slice);
if (err && (err != EAGAIN))
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, &err);
return err;
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_reader_can_read_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return pd->can_read;
}
EOLIAN static void
_efl_io_buffered_stream_efl_io_reader_can_read_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd EINA_UNUSED, Eina_Bool can_read)
{
EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o));
if (pd->can_read == can_read) return;
pd->can_read = can_read;
efl_event_callback_call(o, EFL_IO_READER_EVENT_CAN_READ_CHANGED, NULL);
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_reader_eos_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return pd->eos;
}
EOLIAN static void
_efl_io_buffered_stream_efl_io_reader_eos_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool is_eos)
{
EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o));
if (pd->eos == is_eos) return;
pd->eos = is_eos;
if (!is_eos) return;
efl_event_callback_call(o, EFL_IO_READER_EVENT_EOS, NULL);
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_READ_FINISHED, NULL);
if (efl_io_copier_done_get(pd->sender))
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_FINISHED, NULL);
}
EOLIAN static Eina_Error
_efl_io_buffered_stream_efl_io_writer_write(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Slice *slice, Eina_Slice *remaining)
{
Eina_Error err;
if (!pd->outgoing)
{
WRN("%p writing to inner_io %p (%s) that doesn't implement Efl.Io.Writer",
o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)));
return EINVAL;
}
err = efl_io_writer_write(pd->outgoing, slice, remaining);
if (err && (err != EAGAIN))
efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, &err);
return err;
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_efl_io_writer_can_write_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return pd->can_write;
}
EOLIAN static void
_efl_io_buffered_stream_efl_io_writer_can_write_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd EINA_UNUSED, Eina_Bool can_write)
{
EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o));
if (pd->can_write == can_write) return;
pd->can_write = can_write;
efl_event_callback_call(o, EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, NULL);
}
EOLIAN static void
_efl_io_buffered_stream_inner_io_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Efl_Object *io)
{
Eina_Bool is_reader, is_writer;
EINA_SAFETY_ON_TRUE_RETURN(efl_finalized_get(o));
EINA_SAFETY_ON_NULL_RETURN(io);
EINA_SAFETY_ON_TRUE_RETURN(pd->inner_io != NULL);
pd->is_closer = efl_isa(io, EFL_IO_CLOSER_MIXIN);
is_reader = efl_isa(io, EFL_IO_READER_INTERFACE);
is_writer = efl_isa(io, EFL_IO_WRITER_INTERFACE);
EINA_SAFETY_ON_TRUE_RETURN((!is_reader) && (!is_writer));
pd->inner_io = efl_ref(io);
efl_event_callback_array_add(io, _efl_io_buffered_stream_inner_io_cbs(), o);
/* inner_io -> incoming */
if (is_reader)
{
DBG("%p inner_io=%p (%s) is Efl.Io.Reader", o, io, efl_class_name_get(efl_class_get(io)));
pd->incoming = efl_add(EFL_IO_QUEUE_CLASS, o,
efl_name_set(efl_added, "incoming"),
efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_incoming_cbs(), o));
EINA_SAFETY_ON_NULL_RETURN(pd->incoming);
pd->receiver = efl_add(EFL_IO_COPIER_CLASS, o,
efl_name_set(efl_added, "receiver"),
efl_io_copier_buffer_limit_set(efl_added, 4096),
efl_io_copier_source_set(efl_added, io),
efl_io_copier_destination_set(efl_added, pd->incoming),
efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_receiver_cbs(), o));
EINA_SAFETY_ON_NULL_RETURN(pd->receiver);
}
else
{
DBG("%p inner_io=%p (%s) is not Efl.Io.Reader", o, io, efl_class_name_get(efl_class_get(io)));
efl_io_reader_eos_set(o, EINA_TRUE);
}
/* outgoing -> inner_io */
if (is_writer)
{
DBG("%p inner_io=%p (%s) is Efl.Io.Writer", o, io, efl_class_name_get(efl_class_get(io)));
pd->outgoing = efl_add(EFL_IO_QUEUE_CLASS, o,
efl_name_set(efl_added, "outgoing"),
efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_outgoing_cbs(), o));
EINA_SAFETY_ON_NULL_RETURN(pd->outgoing);
pd->sender = efl_add(EFL_IO_COPIER_CLASS, o,
efl_name_set(efl_added, "sender"),
efl_io_copier_buffer_limit_set(efl_added, 4096),
efl_io_copier_source_set(efl_added, pd->outgoing),
efl_io_copier_destination_set(efl_added, io),
efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_sender_cbs(), o));
EINA_SAFETY_ON_NULL_RETURN(pd->sender);
}
else
DBG("%p inner_io=%p (%s) is not Efl.Io.Writer", o, io, efl_class_name_get(efl_class_get(io)));
}
EOLIAN static Efl_Object *
_efl_io_buffered_stream_inner_io_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
return pd->inner_io;
}
EOLIAN static void
_efl_io_buffered_stream_max_queue_size_input_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t max_queue_size_input)
{
if (!pd->incoming)
{
DBG("%p inner_io=%p (%s) is not Efl.Io.Reader, limit=%zu ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), max_queue_size_input);
return;
}
efl_io_queue_limit_set(pd->incoming, max_queue_size_input);
}
EOLIAN static size_t
_efl_io_buffered_stream_max_queue_size_input_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->incoming) return 0;
return efl_io_queue_limit_get(pd->incoming);
}
EOLIAN static void
_efl_io_buffered_stream_max_queue_size_output_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t max_queue_size_output)
{
if (!pd->outgoing)
{
DBG("%p inner_io=%p (%s) is not Efl.Io.Writer, limit=%zu ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), max_queue_size_output);
return;
}
efl_io_queue_limit_set(pd->outgoing, max_queue_size_output);
}
EOLIAN static size_t
_efl_io_buffered_stream_max_queue_size_output_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->outgoing) return 0;
return efl_io_queue_limit_get(pd->outgoing);
}
EOLIAN static void
_efl_io_buffered_stream_line_delimiter_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, const Eina_Slice *slice)
{
if (!pd->receiver)
{
DBG("%p inner_io=%p (%s) is not Efl.Io.Reader, slice=%p ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), slice);
return;
}
efl_io_copier_line_delimiter_set(pd->receiver, slice);
}
EOLIAN static const Eina_Slice *
_efl_io_buffered_stream_line_delimiter_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->receiver) return NULL;
return efl_io_copier_line_delimiter_get(pd->receiver);
}
EOLIAN static void
_efl_io_buffered_stream_inactivity_timeout_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, double seconds)
{
if (pd->receiver)
efl_io_copier_inactivity_timeout_set(pd->receiver, seconds);
if (pd->sender)
efl_io_copier_inactivity_timeout_set(pd->sender, seconds);
}
EOLIAN static double
_efl_io_buffered_stream_inactivity_timeout_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (pd->receiver)
return efl_io_copier_inactivity_timeout_get(pd->receiver);
if (pd->sender)
return efl_io_copier_inactivity_timeout_get(pd->sender);
return 0.0;
}
EOLIAN static void
_efl_io_buffered_stream_read_chunk_size_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t size)
{
if (pd->sender)
{
efl_io_copier_buffer_limit_set(pd->sender, size);
efl_io_copier_read_chunk_size_set(pd->sender, size);
}
if (!pd->receiver)
{
efl_io_copier_buffer_limit_set(pd->receiver, size);
efl_io_copier_read_chunk_size_set(pd->receiver, size);
}
}
EOLIAN static size_t
_efl_io_buffered_stream_read_chunk_size_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->receiver) return 0;
return efl_io_copier_read_chunk_size_get(pd->receiver);
}
EOLIAN static size_t
_efl_io_buffered_stream_pending_write_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->outgoing) return 0;
return efl_io_queue_usage_get(pd->outgoing);
}
EOLIAN static size_t
_efl_io_buffered_stream_pending_read_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->incoming) return 0;
return efl_io_queue_usage_get(pd->incoming);
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_slice_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Slice *slice)
{
if (!pd->incoming)
{
if (slice)
{
slice->mem = NULL;
slice->len = 0;
}
return EINA_FALSE;
}
return efl_io_queue_slice_get(pd->incoming, slice);
}
EOLIAN static void
_efl_io_buffered_stream_discard(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t amount)
{
if (!pd->incoming) return;
efl_io_queue_discard(pd->incoming, amount);
}
EOLIAN static void
_efl_io_buffered_stream_clear(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->incoming) return;
efl_io_queue_clear(pd->incoming);
}
EOLIAN static void
_efl_io_buffered_stream_eos_mark(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
{
if (!pd->incoming) return;
DBG("%p mark eos", o);
efl_io_queue_eos_mark(pd->outgoing);
}
EOLIAN static Eina_Bool
_efl_io_buffered_stream_flush(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool may_block, Eina_Bool ignore_line_delimiter)
{
size_t pending;
Eina_Bool ret;
EINA_SAFETY_ON_TRUE_RETURN_VAL(efl_io_closer_closed_get(o), EINA_FALSE);
if (!pd->outgoing) return EINA_TRUE;
pending = efl_io_queue_usage_get(pd->outgoing);
if (!pending)
return EINA_TRUE;
if (pd->is_closer && efl_io_closer_closed_get(pd->inner_io))
{
DBG("%p the inner I/O %p is already closed", o, pd->inner_io);
return EINA_TRUE;
}
DBG("%p attempt to flush %zu bytes, may_block=%hhu, ignore_line_delimiter=%hhu...", o, pending, may_block, ignore_line_delimiter);
ret = efl_io_copier_flush(pd->sender, may_block, ignore_line_delimiter);
DBG("%p flushed, ret=%hhu, still pending=%zu", o, ret, efl_io_queue_usage_get(pd->outgoing));
return ret;
}
#include "efl_io_buffered_stream.eo.c"

View File

@ -0,0 +1,251 @@
class Efl.Io.Buffered_Stream (Efl.Loop_User, Efl.Io.Reader, Efl.Io.Writer, Efl.Io.Closer) {
[[A wrapper object offering an easy to use, buffered streams over existing I/O class.
The buffered stream encapsulates an actual @Efl.Io.Reader or
@Efl.Io.Writer, an input @Efl.Io.Queue, an output @Efl.Io.Queue
and these are linked using a input and a output
@Efl.Io.Copier.
The idea is that unlike traditional @Efl.Io.Writer that will
attempt to write directly and thus may take less data than
requested, this one will keep the pending data in its own
buffer, feeding to the actual output when it
@Efl.Io.Writer.can_write. That makes its operation much simpler
as @Efl.Io.Writer.write will always take the full data -- allows
"write and forget", if unlimited (see
@.max_queue_size_output). When finished writing data, the
@.eos_mark and then wait for "write,finished" event to know when all data
was sent.
Reading is also much simpler since incoming data is kept in an
@Efl.Io.Queue, thus its size can be queried with @.pending_read
and read with @Efl.Io.Reader.read or peeked with @.slice_get,
then discarded with @.discard or @.clear.
Then when waiting for a complete message, just peek at its
contents, if not complete do nothing and wait, if complete then
either @Efl.Io.Reader.read to get a copy or manipulate a
read-only reference from @.slice_get and then @.discard
The actual I/O is set with the constructor method @.inner_io.set
and can be retrieved with @.inner_io.get, which should be used
with care -- calling @Efl.Io.Reader.read and
@Efl.Io.Writer.write on it may produce unexpected results.
@since 1.19
]]
methods {
@property inner_io {
[[The inner I/O this wrapper operates on.]]
get {
[[The internal input/output used for actual operations, use with care!]]
}
set {
[[Constructor-only property to set the inner_io.]]
}
values {
io: Efl.Object; [[The input (@Efl.Io.Reader) or output (@Efl.Io.Writer) instance]]
}
}
@property max_queue_size_input {
[[Limit how big the input queue can grow, in bytes.
If limited and @.line_delimiter is set, "line" events
may be emitted with partial contents, without the
trailing delimiter.
]]
get { }
set {
[[Constructor-only property to set buffer limit. 0 is unlimited]]
}
values {
max_queue_size_input: size; [[Defines a maximum buffer size for incoming data, or 0 to allow unlimited amount of bytes]]
}
}
@property max_queue_size_output {
[[Limit how big the output queue can grow, in bytes.
If limited, @Efl.Io.Writer.write will take less data than requested!
]]
get { }
set {
[[Constructor-only property to set buffer limit. 0 is unlimited]]
}
values {
max_queue_size_output: size; [[Defines a maximum buffer size for output data, or 0 to allow unlimited amount of bytes. If limited, @Efl.Io.Writer.write will take less data than requested!]]
}
}
@property line_delimiter {
[[If set, incoming data will be checked for the delimiter and "line" events are The line may include the delimiter, unless it's end-of-stream on @.max_queue_size_input was reached.]]
get { }
set {
[[Change line delimiter to use. If NULL or empty, no delimiter is to be used]]
}
values {
slice: ptr(const(Eina.Slice)); [[The contents may contain \0 and will be copied]]
}
}
@property inactivity_timeout {
[[Error as ETIMEDOUT if it becomes inactive for some time.
If no activity, that is no read or write in the given
amount of seconds, then the object will emit "error"
event with ETIMEDOUT value.
This is specified in seconds and is only active for
greater-than zero. Defaults to inactive.
]]
values {
seconds: double; [[Number inactive seconds to timeout this object. If zero or less, it will be disabled.]]
}
}
@property read_chunk_size {
[[Read chunk size property, in bytes.
When reading the @.inner_io for data to be placed in
input queue, use this as chunk size.
Setting this value large enough may reduce number of
@Efl.Io.Reader.read, improving performance at the expense
of more memory consumption.
This value is bounded by @.max_queue_size_input if it's set.
By default it's 4096.
]]
get {
}
set {
[[Set chunk size for each basic @Efl.Io.Reader.read operation.]]
}
values {
size: size; [[This is the chunk size to use for read operations]]
}
}
@property pending_write {
[[How many bytes are pending write to @.inner_io]]
get { }
values {
usage: size; [[Bytes available to write]]
}
}
@property pending_read {
[[How many bytes are pending (available) for read]]
get { }
values {
usage: size; [[Bytes available to read]]
}
}
slice_get { // TODO: property and return of Eina.Slice (not pointer)
[[Get a temporary access to input queue's internal read memory.
The memory pointed by slice may be changed by other
methods of this class. The event "slice,changed" will be
called in those situations.
]]
params {
@out slice: Eina.Slice; [[Slice of the current buffer, may be invalidated if @Efl.Io.Writer.write, @Efl.Io.Closer.close or @Efl.Io.Reader.read are called. It is the full slice available for reading.]]
}
return: bool (false); [[$true on success, $false otherwise]]
}
discard {
[[Discard the given number of bytes.
This has the same effect as reading and discarding the
given amount of bytes, without executing the actual
copy.
It's often paired with @.slice_get, if users read the
information from the slice and once they're done, that
data must be discarded.
As an example, some protocols provide messages with a
"size" header, then @.slice_get is used to peek into the
available memory to see if there is a "size" and if the
rest of the slice is the full payload, in this case the
slice may be handled to some processing function. When
the function is done, that amount of data must be
discarded -- with this function.
]]
params {
amount: size; [[Bytes to discard]]
}
}
clear {
[[Clear the incoming queue. Same as reading all data.
This is equivalent as calling @.discard with @.pending_read
amount of bytes.
]]
}
eos_mark {
[[Mark this end-of-stream, signals nothing else will be written.
That will forbid any further writes.
Unlike @Efl.Io.Closer.close, this won't clear anything.
When all data is written, "write,finished" is emitted.
]]
}
flush {
[[Forces writing all pending data to destination.
It will return $true if @.pending_read drops to zero, $false
otherwise and more calls to flush must be made.
If the @.inner_io is implements @Efl.Io.Closer and it
was closed, or the wrapper itself was closed, this
function will do nothing and returns $true.
\@note this function may block the main loop execution
until operations complete! This is bad for usability, as
user interface or other operations may freeze. A better
approach is to operate asynchronously and wait for
"write,finished" event.
]]
params {
may_block: bool; [[If $true, then @Efl.Io.Reader.can_read and @Efl.Io.Writer.can_write are not checked and the call may block.]]
ignore_line_delimiter: bool; [[Force flush ignoring line delimiters]]
}
return: bool(true); [[$true if all data was sent, $false otherwise]]
}
}
events {
write,finished; [[@.eos_mark was called and all available data was sent to destination]]
read,finished; [[Same as @Efl.Io.Reader "eos", for consistency.]]
finished; [[Both read and write are finished.]]
error: Eina.Error; [[An error happened and the I/O stopped]]
slice,changed; [[The read-slice returned by @.slice_get may have changed.]]
line: ptr(const(Eina.Slice)); [[If @.line_delimiter is set, will be emitted with current line. The memory is only valid during event callback dispatched and should not be modified. Note that the line slice may not be inside @.slice_get, don't assume that!]]
}
implements {
Efl.Object.finalize;
Efl.Object.destructor;
Efl.Io.Closer.close;
Efl.Io.Closer.closed.get;
Efl.Io.Closer.close_on_exec;
Efl.Io.Closer.close_on_destructor;
Efl.Io.Reader.read;
Efl.Io.Reader.can_read;
Efl.Io.Reader.eos;
Efl.Io.Writer.write;
Efl.Io.Writer.can_write;
}
}