summaryrefslogtreecommitdiff
path: root/src/lib/ecore/efl_io_buffered_stream.c
diff options
context:
space:
mode:
authorGustavo Sverzut Barbieri <barbieri@profusion.mobi>2016-11-25 01:27:33 -0200
committerGustavo Sverzut Barbieri <barbieri@profusion.mobi>2016-11-25 17:27:32 -0200
commit46341b329d72736c1f1c47478f760ab8db76bbc8 (patch)
treef659749812561bd8aa1bb5e6ae386977ed6fe797 /src/lib/ecore/efl_io_buffered_stream.c
parent16be61c7e15d89ba9ba4529e0d280dcb5fb81da7 (diff)
efl_io_buffered_stream: wraps an I/O object and make it easy to use.
Since all other efl.io objects are low-level, the recommended approach is to use an efl.io.copier. However when dealing with in-memory, bi-directional comms like talking to a socket, we always end with 2 queues, 2 copiers and the annoying setup that is being replicated in ecore_ipc, efl_debug and so on. This class is the base to make it simpler. Other classes such as Efl.Net.Socket.Simple, Efl.Net.Dialer.Simple and Efl.Net.Server.Simple will use it to provide simpler code to users. I guess we can call EFL+EO Java now?
Diffstat (limited to '')
-rw-r--r--src/lib/ecore/efl_io_buffered_stream.c549
1 files changed, 549 insertions, 0 deletions
diff --git a/src/lib/ecore/efl_io_buffered_stream.c b/src/lib/ecore/efl_io_buffered_stream.c
new file mode 100644
index 0000000000..f75c033aea
--- /dev/null
+++ b/src/lib/ecore/efl_io_buffered_stream.c
@@ -0,0 +1,549 @@
1#ifdef HAVE_CONFIG_H
2# include <config.h>
3#endif
4
5#define EFL_IO_READER_PROTECTED 1
6#define EFL_IO_WRITER_PROTECTED 1
7#define EFL_IO_CLOSER_PROTECTED 1
8
9#include <Ecore.h>
10#include "ecore_private.h"
11
12typedef struct
13{
14 Eo *inner_io;
15 Eo *incoming;
16 Eo *outgoing;
17 Eo *sender;
18 Eo *receiver;
19 Eina_Bool closed;
20 Eina_Bool eos;
21 Eina_Bool can_read;
22 Eina_Bool can_write;
23 Eina_Bool is_closer;
24} Efl_Io_Buffered_Stream_Data;
25
26#define MY_CLASS EFL_IO_BUFFERED_STREAM_CLASS
27
28static void
29_efl_io_buffered_stream_error(void *data, const Efl_Event *event)
30{
31 Eo *o = data;
32 Eina_Error *perr = event->info;
33 DBG("%p %s error: %s", o, efl_name_get(event->object), eina_error_msg_get(*perr));
34 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, event->info);
35}
36
37static void
38_efl_io_buffered_stream_incoming_can_read_changed(void *data, const Efl_Event *event)
39{
40 Eo *o = data;
41 if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */
42 efl_io_reader_can_read_set(o, efl_io_reader_can_read_get(event->object));
43}
44
45static void
46_efl_io_buffered_stream_incoming_slice_changed(void *data, const Efl_Event *event EINA_UNUSED)
47{
48 Eo *o = data;
49 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_SLICE_CHANGED, NULL);
50}
51
52EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_incoming_cbs,
53 { EFL_IO_READER_EVENT_CAN_READ_CHANGED, _efl_io_buffered_stream_incoming_can_read_changed },
54 { EFL_IO_QUEUE_EVENT_SLICE_CHANGED, _efl_io_buffered_stream_incoming_slice_changed });
55
56static void
57_efl_io_buffered_stream_receiver_line(void *data, const Efl_Event *event)
58{
59 Eo *o = data;
60 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_LINE, event->info);
61}
62
63static void
64_efl_io_buffered_stream_receiver_done(void *data, const Efl_Event *event EINA_UNUSED)
65{
66 Eo *o = data;
67 if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */
68 efl_io_reader_eos_set(o, EINA_TRUE);
69}
70
71EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_receiver_cbs,
72 { EFL_IO_COPIER_EVENT_DONE, _efl_io_buffered_stream_receiver_done },
73 { EFL_IO_COPIER_EVENT_LINE, _efl_io_buffered_stream_receiver_line },
74 { EFL_IO_COPIER_EVENT_ERROR, _efl_io_buffered_stream_error });
75
76
77static void
78_efl_io_buffered_stream_outgoing_can_write_changed(void *data, const Efl_Event *event)
79{
80 Eo *o = data;
81 if (efl_io_closer_closed_get(o)) return; /* already closed (or closing) */
82 efl_io_writer_can_write_set(o, efl_io_writer_can_write_get(event->object));
83}
84
85EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_outgoing_cbs,
86 { EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, _efl_io_buffered_stream_outgoing_can_write_changed });
87
88static void
89_efl_io_buffered_stream_sender_done(void *data, const Efl_Event *event EINA_UNUSED)
90{
91 Eo *o = data;
92 Efl_Io_Buffered_Stream_Data *pd = efl_data_scope_get(o, MY_CLASS);
93 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_WRITE_FINISHED, NULL);
94 if (efl_io_copier_done_get(pd->receiver))
95 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_FINISHED, NULL);
96}
97
98EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_sender_cbs,
99 { EFL_IO_COPIER_EVENT_DONE, _efl_io_buffered_stream_sender_done },
100 { EFL_IO_COPIER_EVENT_ERROR, _efl_io_buffered_stream_error });
101
102static void
103_efl_io_buffered_stream_inner_io_del(void *data, const Efl_Event *event)
104{
105 Eo *o = data;
106 Efl_Io_Buffered_Stream_Data *pd = efl_data_scope_get(o, MY_CLASS);
107 DBG("%p the inner I/O %p was deleted", o, event->object);
108 if (pd->inner_io == event->object)
109 pd->inner_io = NULL;
110}
111
112EFL_CALLBACKS_ARRAY_DEFINE(_efl_io_buffered_stream_inner_io_cbs,
113 { EFL_EVENT_DEL, _efl_io_buffered_stream_inner_io_del });
114
115
116EOLIAN static Efl_Object *
117_efl_io_buffered_stream_efl_object_finalize(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
118{
119 if (!pd->inner_io)
120 {
121 ERR("no valid I/O was set with efl_io_buffered_stream_inner_io_set()!");
122 return NULL;
123 }
124
125 return efl_finalize(efl_super(o, MY_CLASS));
126}
127
128EOLIAN static void
129_efl_io_buffered_stream_efl_object_destructor(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
130{
131 if (pd->incoming)
132 {
133 efl_del(pd->incoming);
134 pd->incoming = NULL;
135 }
136 if (pd->outgoing)
137 {
138 efl_del(pd->outgoing);
139 pd->outgoing = NULL;
140 }
141 if (pd->sender)
142 {
143 efl_del(pd->sender);
144 pd->sender = NULL;
145 }
146 if (pd->receiver)
147 {
148 efl_del(pd->receiver);
149 pd->receiver = NULL;
150 }
151
152 if (pd->inner_io)
153 {
154 efl_event_callback_array_del(pd->inner_io, _efl_io_buffered_stream_inner_io_cbs(), o);
155 efl_unref(pd->inner_io); /* do not del, just take our ref */
156 pd->inner_io = NULL;
157 }
158
159 efl_destructor(efl_super(o, MY_CLASS));
160}
161
162EOLIAN static Eina_Error
163_efl_io_buffered_stream_efl_io_closer_close(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
164{
165 Eina_Error err = 0;
166
167 EINA_SAFETY_ON_TRUE_RETURN_VAL(pd->closed, EINVAL);
168
169 /* line delimiters may be holding a last chunk of data */
170 if (pd->receiver) efl_io_copier_flush(pd->receiver, EINA_FALSE, EINA_TRUE);
171
172 efl_io_writer_can_write_set(o, EINA_FALSE);
173 efl_io_reader_can_read_set(o, EINA_FALSE);
174 efl_io_reader_eos_set(o, EINA_TRUE);
175
176 pd->closed = EINA_TRUE;
177 efl_event_callback_call(o, EFL_IO_CLOSER_EVENT_CLOSED, NULL);
178
179 if (pd->sender && (!efl_io_closer_closed_get(pd->sender)))
180 efl_io_closer_close(pd->sender);
181
182 if (pd->receiver && (!efl_io_closer_closed_get(pd->receiver)))
183 efl_io_closer_close(pd->receiver);
184
185 return err;
186}
187
188EOLIAN static Eina_Bool
189_efl_io_buffered_stream_efl_io_closer_closed_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
190{
191 return pd->closed || efl_io_closer_closed_get(pd->inner_io);
192}
193
194EOLIAN static Eina_Bool
195_efl_io_buffered_stream_efl_io_closer_close_on_exec_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
196{
197 return efl_io_closer_close_on_exec_get(pd->inner_io);
198}
199
200EOLIAN static Eina_Bool
201_efl_io_buffered_stream_efl_io_closer_close_on_exec_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool value)
202{
203 return efl_io_closer_close_on_exec_set(pd->inner_io, value);
204}
205
206EOLIAN static Eina_Bool
207_efl_io_buffered_stream_efl_io_closer_close_on_destructor_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
208{
209 return efl_io_closer_close_on_destructor_get(pd->inner_io);
210}
211
212EOLIAN static void
213_efl_io_buffered_stream_efl_io_closer_close_on_destructor_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool value)
214{
215 efl_io_closer_close_on_destructor_set(pd->inner_io, value);
216}
217
218EOLIAN static Eina_Error
219_efl_io_buffered_stream_efl_io_reader_read(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Rw_Slice *rw_slice)
220{
221 Eina_Error err;
222
223 if (!pd->incoming)
224 {
225 WRN("%p reading from inner_io %p (%s) that doesn't implement Efl.Io.Reader",
226 o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)));
227 return EINVAL;
228 }
229
230 err = efl_io_reader_read(pd->incoming, rw_slice);
231 if (err && (err != EAGAIN))
232 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, &err);
233 return err;
234}
235
236EOLIAN static Eina_Bool
237_efl_io_buffered_stream_efl_io_reader_can_read_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
238{
239 return pd->can_read;
240}
241
242EOLIAN static void
243_efl_io_buffered_stream_efl_io_reader_can_read_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd EINA_UNUSED, Eina_Bool can_read)
244{
245 EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o));
246 if (pd->can_read == can_read) return;
247 pd->can_read = can_read;
248 efl_event_callback_call(o, EFL_IO_READER_EVENT_CAN_READ_CHANGED, NULL);
249}
250
251EOLIAN static Eina_Bool
252_efl_io_buffered_stream_efl_io_reader_eos_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
253{
254 return pd->eos;
255}
256
257EOLIAN static void
258_efl_io_buffered_stream_efl_io_reader_eos_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool is_eos)
259{
260 EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o));
261 if (pd->eos == is_eos) return;
262 pd->eos = is_eos;
263 if (!is_eos) return;
264
265 efl_event_callback_call(o, EFL_IO_READER_EVENT_EOS, NULL);
266 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_READ_FINISHED, NULL);
267 if (efl_io_copier_done_get(pd->sender))
268 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_FINISHED, NULL);
269}
270
271EOLIAN static Eina_Error
272_efl_io_buffered_stream_efl_io_writer_write(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Slice *slice, Eina_Slice *remaining)
273{
274 Eina_Error err;
275
276 if (!pd->outgoing)
277 {
278 WRN("%p writing to inner_io %p (%s) that doesn't implement Efl.Io.Writer",
279 o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)));
280 return EINVAL;
281 }
282
283 err = efl_io_writer_write(pd->outgoing, slice, remaining);
284 if (err && (err != EAGAIN))
285 efl_event_callback_call(o, EFL_IO_BUFFERED_STREAM_EVENT_ERROR, &err);
286 return err;
287}
288
289EOLIAN static Eina_Bool
290_efl_io_buffered_stream_efl_io_writer_can_write_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
291{
292 return pd->can_write;
293}
294
295EOLIAN static void
296_efl_io_buffered_stream_efl_io_writer_can_write_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd EINA_UNUSED, Eina_Bool can_write)
297{
298 EINA_SAFETY_ON_TRUE_RETURN(efl_io_closer_closed_get(o));
299 if (pd->can_write == can_write) return;
300 pd->can_write = can_write;
301 efl_event_callback_call(o, EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, NULL);
302}
303
304EOLIAN static void
305_efl_io_buffered_stream_inner_io_set(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Efl_Object *io)
306{
307 Eina_Bool is_reader, is_writer;
308
309 EINA_SAFETY_ON_TRUE_RETURN(efl_finalized_get(o));
310 EINA_SAFETY_ON_NULL_RETURN(io);
311 EINA_SAFETY_ON_TRUE_RETURN(pd->inner_io != NULL);
312
313 pd->is_closer = efl_isa(io, EFL_IO_CLOSER_MIXIN);
314 is_reader = efl_isa(io, EFL_IO_READER_INTERFACE);
315 is_writer = efl_isa(io, EFL_IO_WRITER_INTERFACE);
316
317 EINA_SAFETY_ON_TRUE_RETURN((!is_reader) && (!is_writer));
318
319 pd->inner_io = efl_ref(io);
320 efl_event_callback_array_add(io, _efl_io_buffered_stream_inner_io_cbs(), o);
321
322 /* inner_io -> incoming */
323 if (is_reader)
324 {
325 DBG("%p inner_io=%p (%s) is Efl.Io.Reader", o, io, efl_class_name_get(efl_class_get(io)));
326 pd->incoming = efl_add(EFL_IO_QUEUE_CLASS, o,
327 efl_name_set(efl_added, "incoming"),
328 efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_incoming_cbs(), o));
329 EINA_SAFETY_ON_NULL_RETURN(pd->incoming);
330
331 pd->receiver = efl_add(EFL_IO_COPIER_CLASS, o,
332 efl_name_set(efl_added, "receiver"),
333 efl_io_copier_buffer_limit_set(efl_added, 4096),
334 efl_io_copier_source_set(efl_added, io),
335 efl_io_copier_destination_set(efl_added, pd->incoming),
336 efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
337 efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_receiver_cbs(), o));
338 EINA_SAFETY_ON_NULL_RETURN(pd->receiver);
339 }
340 else
341 {
342 DBG("%p inner_io=%p (%s) is not Efl.Io.Reader", o, io, efl_class_name_get(efl_class_get(io)));
343 efl_io_reader_eos_set(o, EINA_TRUE);
344 }
345
346
347 /* outgoing -> inner_io */
348 if (is_writer)
349 {
350 DBG("%p inner_io=%p (%s) is Efl.Io.Writer", o, io, efl_class_name_get(efl_class_get(io)));
351 pd->outgoing = efl_add(EFL_IO_QUEUE_CLASS, o,
352 efl_name_set(efl_added, "outgoing"),
353 efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_outgoing_cbs(), o));
354 EINA_SAFETY_ON_NULL_RETURN(pd->outgoing);
355
356 pd->sender = efl_add(EFL_IO_COPIER_CLASS, o,
357 efl_name_set(efl_added, "sender"),
358 efl_io_copier_buffer_limit_set(efl_added, 4096),
359 efl_io_copier_source_set(efl_added, pd->outgoing),
360 efl_io_copier_destination_set(efl_added, io),
361 efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
362 efl_event_callback_array_add(efl_added, _efl_io_buffered_stream_sender_cbs(), o));
363 EINA_SAFETY_ON_NULL_RETURN(pd->sender);
364 }
365 else
366 DBG("%p inner_io=%p (%s) is not Efl.Io.Writer", o, io, efl_class_name_get(efl_class_get(io)));
367}
368
369EOLIAN static Efl_Object *
370_efl_io_buffered_stream_inner_io_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
371{
372 return pd->inner_io;
373}
374
375EOLIAN static void
376_efl_io_buffered_stream_max_queue_size_input_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t max_queue_size_input)
377{
378 if (!pd->incoming)
379 {
380 DBG("%p inner_io=%p (%s) is not Efl.Io.Reader, limit=%zu ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), max_queue_size_input);
381 return;
382 }
383 efl_io_queue_limit_set(pd->incoming, max_queue_size_input);
384}
385
386EOLIAN static size_t
387_efl_io_buffered_stream_max_queue_size_input_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
388{
389 if (!pd->incoming) return 0;
390 return efl_io_queue_limit_get(pd->incoming);
391}
392
393EOLIAN static void
394_efl_io_buffered_stream_max_queue_size_output_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t max_queue_size_output)
395{
396 if (!pd->outgoing)
397 {
398 DBG("%p inner_io=%p (%s) is not Efl.Io.Writer, limit=%zu ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), max_queue_size_output);
399 return;
400 }
401 efl_io_queue_limit_set(pd->outgoing, max_queue_size_output);
402}
403
404EOLIAN static size_t
405_efl_io_buffered_stream_max_queue_size_output_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
406{
407 if (!pd->outgoing) return 0;
408 return efl_io_queue_limit_get(pd->outgoing);
409}
410
411EOLIAN static void
412_efl_io_buffered_stream_line_delimiter_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, const Eina_Slice *slice)
413{
414 if (!pd->receiver)
415 {
416 DBG("%p inner_io=%p (%s) is not Efl.Io.Reader, slice=%p ignored", o, pd->inner_io, efl_class_name_get(efl_class_get(pd->inner_io)), slice);
417 return;
418 }
419 efl_io_copier_line_delimiter_set(pd->receiver, slice);
420}
421
422EOLIAN static const Eina_Slice *
423_efl_io_buffered_stream_line_delimiter_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
424{
425 if (!pd->receiver) return NULL;
426 return efl_io_copier_line_delimiter_get(pd->receiver);
427}
428
429EOLIAN static void
430_efl_io_buffered_stream_inactivity_timeout_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, double seconds)
431{
432 if (pd->receiver)
433 efl_io_copier_inactivity_timeout_set(pd->receiver, seconds);
434 if (pd->sender)
435 efl_io_copier_inactivity_timeout_set(pd->sender, seconds);
436}
437
438EOLIAN static double
439_efl_io_buffered_stream_inactivity_timeout_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
440{
441 if (pd->receiver)
442 return efl_io_copier_inactivity_timeout_get(pd->receiver);
443 if (pd->sender)
444 return efl_io_copier_inactivity_timeout_get(pd->sender);
445 return 0.0;
446}
447
448EOLIAN static void
449_efl_io_buffered_stream_read_chunk_size_set(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t size)
450{
451 if (pd->sender)
452 {
453 efl_io_copier_buffer_limit_set(pd->sender, size);
454 efl_io_copier_read_chunk_size_set(pd->sender, size);
455 }
456
457 if (!pd->receiver)
458 {
459 efl_io_copier_buffer_limit_set(pd->receiver, size);
460 efl_io_copier_read_chunk_size_set(pd->receiver, size);
461 }
462}
463
464EOLIAN static size_t
465_efl_io_buffered_stream_read_chunk_size_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
466{
467 if (!pd->receiver) return 0;
468 return efl_io_copier_read_chunk_size_get(pd->receiver);
469}
470
471EOLIAN static size_t
472_efl_io_buffered_stream_pending_write_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
473{
474 if (!pd->outgoing) return 0;
475 return efl_io_queue_usage_get(pd->outgoing);
476}
477
478EOLIAN static size_t
479_efl_io_buffered_stream_pending_read_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
480{
481 if (!pd->incoming) return 0;
482 return efl_io_queue_usage_get(pd->incoming);
483}
484
485EOLIAN static Eina_Bool
486_efl_io_buffered_stream_slice_get(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, Eina_Slice *slice)
487{
488 if (!pd->incoming)
489 {
490 if (slice)
491 {
492 slice->mem = NULL;
493 slice->len = 0;
494 }
495 return EINA_FALSE;
496 }
497 return efl_io_queue_slice_get(pd->incoming, slice);
498}
499
500EOLIAN static void
501_efl_io_buffered_stream_discard(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd, size_t amount)
502{
503 if (!pd->incoming) return;
504 efl_io_queue_discard(pd->incoming, amount);
505}
506
507EOLIAN static void
508_efl_io_buffered_stream_clear(Eo *o EINA_UNUSED, Efl_Io_Buffered_Stream_Data *pd)
509{
510 if (!pd->incoming) return;
511 efl_io_queue_clear(pd->incoming);
512}
513
514EOLIAN static void
515_efl_io_buffered_stream_eos_mark(Eo *o, Efl_Io_Buffered_Stream_Data *pd)
516{
517 if (!pd->incoming) return;
518 DBG("%p mark eos", o);
519 efl_io_queue_eos_mark(pd->outgoing);
520}
521
522EOLIAN static Eina_Bool
523_efl_io_buffered_stream_flush(Eo *o, Efl_Io_Buffered_Stream_Data *pd, Eina_Bool may_block, Eina_Bool ignore_line_delimiter)
524{
525 size_t pending;
526 Eina_Bool ret;
527
528 EINA_SAFETY_ON_TRUE_RETURN_VAL(efl_io_closer_closed_get(o), EINA_FALSE);
529
530 if (!pd->outgoing) return EINA_TRUE;
531
532 pending = efl_io_queue_usage_get(pd->outgoing);
533 if (!pending)
534 return EINA_TRUE;
535
536 if (pd->is_closer && efl_io_closer_closed_get(pd->inner_io))
537 {
538 DBG("%p the inner I/O %p is already closed", o, pd->inner_io);
539 return EINA_TRUE;
540 }
541
542 DBG("%p attempt to flush %zu bytes, may_block=%hhu, ignore_line_delimiter=%hhu...", o, pending, may_block, ignore_line_delimiter);
543 ret = efl_io_copier_flush(pd->sender, may_block, ignore_line_delimiter);
544 DBG("%p flushed, ret=%hhu, still pending=%zu", o, ret, efl_io_queue_usage_get(pd->outgoing));
545
546 return ret;
547}
548
549#include "efl_io_buffered_stream.eo.c"