aboutsummaryrefslogblamecommitdiffstats
path: root/src/lib/ecore/efl_io_copier.c
blob: dbedfcf9591d22ae9515a6d7f8f7d0a344bb2177 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

                                 













                                    

                                 
                    





                                    
                             

                    
                            
                           
                                 












                                                                             
                                   








                                                                              
                                              

                                                           
                                                                  






                                                                            
                                                   
                                                                 
                                                                       




                                                                   
                 
                                                                                       
 

                                                               
            


           
                                                                           
 
                                                                      
                                             
 
                                                                                

                                                                   

 
                 
                                                                     
 
                                                            


                                               
 
                      
 
              






                                                                       




                                                                       
                                                         
                                                            
      




                                                                           
                                               
      

                
            






                                                          




                                               
                                           



                             
                                                         

                                                      
      























                                                                                              
                                                                          

                                                                                      














                                                  
                                                                               

                                                                                           




















                                                       
                  
                            


                                          
                                     




                                          




                                                        

                                         


           







                                                                    


                                                   

                                                                      


               

                                                                         
                                               

                     
                                                                    



                                     
                                         
 
                                                                      






                                                                   
           
                                                                                      







                                                   
                       


                                          
                                       
 
                                             

                         


               
                                                               






                                                                              







                                                               
                                                                      

               


                         
                                        


                                                                               
                                         









                                                                   
                                                                    






                                      
                                                                                      











                                                            
                                                                         




















                                                                       
                                                                  


           
                                                                             






                                                            
                                                                            














                                                                                                           
                                                                          
















                                                                                
                                                         





















                                                                                
                                                         
           
                                                                                             
                                                                                                         






                                                                           
                                                                                            











                                                            
                                                                                 









                                                            
                                               



                               
                                                                           

                                                                        
                                                                    






                                                                                                                   
                                                                               











                                                                                         
                                                              













                                                                                
                                                              
           
                                                                                                  
                                                                                                              



























                                                                                
                                                                                




                           
                                                                                              
 
                                           
      
                                           



                                        
                      





                                      
                                                       



                                                               
                        
                                                                                  
 
                             





                                                                                          


                                             



                              
                                                                                   












                                                                              



























                                                                                              
               
                                 
 
                            
                                              
 
                 
                                          
 








                                                                                
                                                           










                                                                            
                                                                









                                                                                
                                                                






                                  



                       
                                                                                        




                     
                                                                                                                                













                                                                      
                  
                                                                                    
 

                                                       


                    
                                                                                      
 
                                 

 
                       
                                                                                                         







                                                        



                                                                

                       
      
                                                                      














                                                                      





                                                                       
                                                                 
                                                            





                                                                           
                                               




                   
                    
                                                                                
 



                                                        
                                                            






                                                                                                  
                                                                                                                  

                                                                  
                                                                                                                                  
 








                                                                       
                                                                      



                                                              



                                                                    
                                 
                                       
                                











                                                                 
                        














                                                                         
                                                                               
 
                                                  
                                      




                               
 


                                          













                                                                   







                                          







                                             





                                                                                                                  
                                                                  

                                                                
                                                                            





                                                                     
                                                                                               




                            
                                                                                                                              
 

                                                              
 
                                                                  
                                                                            
 
                                                                            
                                                                                 


                       
                                                                                                     
 
                                  

 
                             
#define EFL_IO_COPIER_PROTECTED 1

#ifdef HAVE_CONFIG_H
# include <config.h>
#endif

#include <Ecore.h>
#include "ecore_private.h"

#define MY_CLASS EFL_IO_COPIER_CLASS
#define DEF_READ_CHUNK_SIZE 4096

typedef struct _Efl_Io_Copier_Data
{
   Efl_Io_Reader *source;
   Efl_Io_Writer *destination;
   Eina_Future *inactivity_timer;
   Eina_Future *job;
   Eina_Binbuf *buf;
   Eina_Slice line_delimiter;
   size_t buffer_limit;
   size_t read_chunk_size;
   struct {
      uint64_t read, written, total;
   } progress;
   double timeout_inactivity;
   Eina_Bool closed;
   Eina_Bool done;
   Eina_Bool force_dispatch;
   Eina_Bool close_on_exec;
   Eina_Bool close_on_invalidate;
} Efl_Io_Copier_Data;

static void _efl_io_copier_write(Eo *o, Efl_Io_Copier_Data *pd);
static void _efl_io_copier_read(Eo *o, Efl_Io_Copier_Data *pd);

#define _COPIER_DBG(o, pd) \
  do \
    { \
       if (eina_log_domain_level_check(_ecore_log_dom, EINA_LOG_LEVEL_DBG)) \
         { \
            DBG("copier={%p %s, refs=%d, closed=%d, done=%d, buf=%zd}", \
                o, \
                efl_class_name_get(efl_class_get(o)), \
                efl_ref_count(o), \
                efl_io_closer_closed_get(o), \
                pd->done, \
                pd->buf ? eina_binbuf_length_get(pd->buf): 0); \
            if (!pd->source) \
              DBG("source=NULL"); \
            else \
              DBG("source={%p %s, refs=%d, can_read=%d, eos=%d, closed=%d}", \
                  pd->source, \
                  efl_class_name_get(efl_class_get(pd->source)), \
                  efl_ref_count(pd->source), \
                  efl_io_reader_can_read_get(pd->source), \
                  efl_io_reader_eos_get(pd->source), \
                  efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE) ? \
                  efl_io_closer_closed_get(pd->source) : 0); \
            if (!pd->destination) \
              DBG("destination=NULL"); \
            else \
              DBG("destination={%p %s, refs=%d, can_write=%d, closed=%d}", \
                  pd->destination, \
                  efl_class_name_get(efl_class_get(pd->destination)), \
                  efl_ref_count(pd->destination), \
                  efl_io_writer_can_write_get(pd->destination), \
                  efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE) ? \
                  efl_io_closer_closed_get(pd->destination) : 0); \
         } \
    } \
  while (0)

static Eina_Value
_efl_io_copier_timeout_inactivity_cb(Eo *o, void *data EINA_UNUSED, const Eina_Value v)
{
   Eina_Error err = ETIMEDOUT;
   efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
   return v;
}

static void
_efl_io_copier_timeout_inactivity_reschedule(Eo *o, Efl_Io_Copier_Data *pd)
{
   if (pd->inactivity_timer) eina_future_cancel(pd->inactivity_timer);
   if (pd->timeout_inactivity <= 0.0) return;

   efl_future_then(o, efl_loop_timeout(efl_loop_get(o), pd->timeout_inactivity),
                   .success = _efl_io_copier_timeout_inactivity_cb,
                   .storage = &pd->inactivity_timer);
}

static Eina_Value
_efl_io_copier_job(Eo *o, void *data EINA_UNUSED, const Eina_Value v)
{
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   uint64_t old_read = pd->progress.read;
   uint64_t old_written = pd->progress.written;
   uint64_t old_total = pd->progress.total;

   _COPIER_DBG(o, pd);

   efl_ref(o);

   if (pd->source && efl_io_reader_can_read_get(pd->source))
     _efl_io_copier_read(o, pd);

   if (pd->destination && efl_io_writer_can_write_get(pd->destination))
     _efl_io_copier_write(o, pd);

   if ((old_read != pd->progress.read) ||
       (old_written != pd->progress.written) ||
       (old_total != pd->progress.total))
     {
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_PROGRESS, NULL);
        if (pd->closed) return v; /* cb may call close */
        _efl_io_copier_timeout_inactivity_reschedule(o, pd);
     }

   if (!pd->source || efl_io_reader_eos_get(pd->source))
     {
        if ((!pd->done) &&
            ((!pd->destination) || (eina_binbuf_length_get(pd->buf) == 0)))
          efl_io_copier_done_set(o, EINA_TRUE);
     }

   efl_unref(o);
   return v;
}

static void
_efl_io_copier_job_schedule(Eo *o, Efl_Io_Copier_Data *pd)
{
   if (pd->job) return;

   // When invalidated, no need to delay action
   if (efl_invalidated_get(o))
     {
        Eina_Value v = EINA_VALUE_EMPTY;

        v = _efl_io_copier_job(o, NULL, v);
        eina_value_flush(&v);
     }
   else
     {
        efl_future_then(o, efl_loop_job(efl_loop_get(o)),
                        .success = _efl_io_copier_job,
                        .storage = &pd->job);
     }
}

/* NOTE: the returned slice may be smaller than requested since the
 * internal binbuf may be modified from inside event calls.
 *
 * parameter slice_of_binbuf must have mem pointing to pd->binbuf
 */
static Eina_Slice
_efl_io_copier_dispatch_data_events(Eo *o, Efl_Io_Copier_Data *pd, Eina_Slice slice_of_binbuf)
{
   Eina_Slice tmp;
   size_t offset;

   tmp = eina_binbuf_slice_get(pd->buf);
   if ((slice_of_binbuf.bytes < tmp.bytes) ||
       (eina_slice_end_get(slice_of_binbuf) > eina_slice_end_get(tmp)))
     {
        CRI("slice_of_binbuf=" EINA_SLICE_FMT " must be inside binbuf=" EINA_SLICE_FMT,
            EINA_SLICE_PRINT(slice_of_binbuf), EINA_SLICE_PRINT(tmp));
        return (Eina_Slice){.mem = NULL, .len = 0};
     }

   offset = slice_of_binbuf.bytes - tmp.bytes;

   efl_event_callback_call(o, EFL_IO_COPIER_EVENT_DATA, &slice_of_binbuf);
   if (pd->closed) return (Eina_Slice){.mem = NULL, .len = 0}; /* cb may call close */

   /* user may have modified pd->buf, like calling
    * efl_io_copier_buffer_limit_set()
    */
   tmp = eina_binbuf_slice_get(pd->buf);
   if (offset <= tmp.len)
     {
        tmp.len -= offset;
        tmp.bytes += offset;
     }
   if (tmp.len > slice_of_binbuf.len)
     tmp.len = slice_of_binbuf.len;
   slice_of_binbuf = tmp;

   if (pd->line_delimiter.len > 0)
     {
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_LINE, &slice_of_binbuf);
        if (pd->closed) return (Eina_Slice){.mem = NULL, .len = 0}; /* cb may call close */

        /* user may have modified pd->buf, like calling
         * efl_io_copier_buffer_limit_set()
         */
        tmp = eina_binbuf_slice_get(pd->buf);
        if (offset <= tmp.len)
          {
             tmp.len -= offset;
             tmp.bytes += offset;
          }
        if (tmp.len > slice_of_binbuf.len)
          tmp.len = slice_of_binbuf.len;
        slice_of_binbuf = tmp;
     }

   return slice_of_binbuf;
}

static void
_efl_io_copier_read(Eo *o, Efl_Io_Copier_Data *pd)
{
   Eina_Rw_Slice rw_slice;
   Eina_Error err;
   size_t used, expand_size;

   EINA_SAFETY_ON_TRUE_RETURN(pd->closed);

   expand_size = pd->read_chunk_size;
   used = eina_binbuf_length_get(pd->buf);
   if (pd->buffer_limit > 0)
     {
        if (pd->buffer_limit <= used)
          {
             return;
          }
        else if (pd->buffer_limit > used)
          {
             size_t available = pd->buffer_limit - used;
             if (expand_size > available)
               expand_size = available;
          }
     }

   rw_slice = eina_binbuf_expand(pd->buf, expand_size);
   if (rw_slice.len == 0)
     {
        err = ENOMEM;
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
        return;
     }

   err = efl_io_reader_read(pd->source, &rw_slice);
   if (err)
     {
        if (err != EAGAIN)
          efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
        return;
     }

   if (pd->closed) return; /* read(source) triggers cb, may call close */

   if (!eina_binbuf_use(pd->buf, rw_slice.len))
     {
        err = ENOMEM;
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
        return;
     }

   pd->progress.read += rw_slice.len;
   efl_io_copier_done_set(o, EINA_FALSE);

   if ((!pd->destination) && (eina_binbuf_length_get(pd->buf) > used))
     {
        /* Note: if there is a destination, dispatch data and line
         * from write since it will remove from binbuf and make it
         * simple to not repeat data that was already sent.
         *
         * however, if there is no destination, then emit the event
         * here.
         */
        _efl_io_copier_dispatch_data_events(o, pd, eina_rw_slice_slice_get(rw_slice));
     }

   _efl_io_copier_job_schedule(o, pd);
}

static void
_efl_io_copier_write(Eo *o, Efl_Io_Copier_Data *pd)
{
   Eina_Slice ro_slice;
   Eina_Error err;

   EINA_SAFETY_ON_TRUE_RETURN(pd->closed);
   EINA_SAFETY_ON_NULL_RETURN(pd->buf);

   ro_slice = eina_binbuf_slice_get(pd->buf);
   if (ro_slice.len == 0)
     {
        return;
     }

   if ((pd->line_delimiter.len > 0) && (!pd->force_dispatch) &&
       (pd->source && !efl_io_reader_eos_get(pd->source)))
     {
        const uint8_t *p = eina_slice_find(ro_slice, pd->line_delimiter);
        if (p)
          ro_slice.len = p - ro_slice.bytes + pd->line_delimiter.len;
        else if ((pd->buffer_limit == 0) || (ro_slice.len < pd->buffer_limit))
          {
             return;
          }
     }

   err = efl_io_writer_write(pd->destination, &ro_slice, NULL);
   if (err)
     {
        if (err != EAGAIN)
          efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
        return;
     }
   if (ro_slice.len == 0)
     return;

   pd->progress.written += ro_slice.len;

   if (pd->closed) return; /* write(destination) triggers cb, may call close */

   efl_io_copier_done_set(o, EINA_FALSE);

   /* Note: dispatch data and line from write since it will remove
    * from binbuf and make it simple to not repeat data that was
    * already sent.
    */
   ro_slice = _efl_io_copier_dispatch_data_events(o, pd, ro_slice);

   if (!eina_binbuf_remove(pd->buf, 0, ro_slice.len))
     {
        err = ENOMEM;
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
        return;
     }

   _efl_io_copier_job_schedule(o, pd);
}

static void
_efl_io_copier_source_can_read_changed(void *data, const Efl_Event *event EINA_UNUSED)
{
   Eo *o = data;
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   if (pd->closed) return;

   _COPIER_DBG(o, pd);

   if (efl_io_reader_can_read_get(pd->source))
     _efl_io_copier_job_schedule(o, pd);
}

static void
_efl_io_copier_source_eos(void *data, const Efl_Event *event EINA_UNUSED)
{
   Eo *o = data;
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   if (pd->closed) return;

   _COPIER_DBG(o, pd);

   _efl_io_copier_job_schedule(o, pd);
}

static void
_efl_io_copier_source_size_apply(Eo *o, Efl_Io_Copier_Data *pd)
{
   if (pd->closed) return;
   pd->progress.total = efl_io_sizer_size_get(pd->source);

   _COPIER_DBG(o, pd);

   if (pd->destination && efl_isa(pd->destination, EFL_IO_SIZER_MIXIN))
     efl_io_sizer_resize(pd->destination, pd->progress.total);

   efl_event_callback_call(o, EFL_IO_COPIER_EVENT_PROGRESS, NULL);
}

static void
_efl_io_copier_source_resized(void *data, const Efl_Event *event EINA_UNUSED)
{
   Eo *o = data;
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   _efl_io_copier_source_size_apply(o, pd);
}

static void
_efl_io_copier_source_closed(void *data, const Efl_Event *event EINA_UNUSED)
{
   Eo *o = data;
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   if (pd->closed) return;

   _COPIER_DBG(o, pd);

   _efl_io_copier_job_schedule(o, pd);
}

EFL_CALLBACKS_ARRAY_DEFINE(source_cbs,
                          { EFL_IO_READER_EVENT_CAN_READ_CHANGED, _efl_io_copier_source_can_read_changed },
                          { EFL_IO_READER_EVENT_EOS, _efl_io_copier_source_eos });

EOLIAN static Efl_Io_Reader *
_efl_io_copier_source_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->source;
}

EOLIAN static void
_efl_io_copier_source_set(Eo *o, Efl_Io_Copier_Data *pd, Efl_Io_Reader *source)
{
   if (pd->source == source) return;

   if (pd->source)
     {
        if (efl_isa(pd->source, EFL_IO_SIZER_MIXIN))
          {
             efl_event_callback_del(pd->source, EFL_IO_SIZER_EVENT_SIZE_CHANGED,
                                    _efl_io_copier_source_resized, o);
             pd->progress.total = 0;
          }
        if (efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE))
          {
             efl_event_callback_del(pd->source, EFL_IO_CLOSER_EVENT_CLOSED,
                                    _efl_io_copier_source_closed, o);
          }
        efl_event_callback_array_del(pd->source, source_cbs(), o);
        efl_unref(pd->source);
        pd->source = NULL;
     }

   if (source)
     {
        EINA_SAFETY_ON_TRUE_RETURN(pd->closed);
        pd->source = efl_ref(source);
        efl_event_callback_array_add(pd->source, source_cbs(), o);

        if (efl_isa(pd->source, EFL_IO_SIZER_MIXIN))
          {
             efl_event_callback_add(pd->source, EFL_IO_SIZER_EVENT_SIZE_CHANGED,
                                    _efl_io_copier_source_resized, o);
             _efl_io_copier_source_size_apply(o, pd);
          }

        if (efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE))
          {
             efl_io_closer_close_on_exec_set(pd->source, efl_io_closer_close_on_exec_get(o));
             efl_io_closer_close_on_invalidate_set(pd->source, efl_io_closer_close_on_invalidate_get(o));
             efl_event_callback_add(pd->source, EFL_IO_CLOSER_EVENT_CLOSED,
                                     _efl_io_copier_source_closed, o);
          }
     }
}

static void
_efl_io_copier_destination_can_write_changed(void *data, const Efl_Event *event EINA_UNUSED)
{
   Eo *o = data;
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   if (pd->closed) return;

   _COPIER_DBG(o, pd);

   if (efl_io_writer_can_write_get(pd->destination))
     _efl_io_copier_job_schedule(o, pd);
}

static void
_efl_io_copier_destination_closed(void *data, const Efl_Event *event EINA_UNUSED)
{
   Eo *o = data;
   Efl_Io_Copier_Data *pd = efl_data_scope_get(o, MY_CLASS);
   if (pd->closed) return;

   _COPIER_DBG(o, pd);

   if (eina_binbuf_length_get(pd->buf) == 0)
     {
        if (!pd->done)
          efl_io_copier_done_set(o, EINA_TRUE);
     }
   else
     {
        Eina_Error err = EBADF;
        if (pd->inactivity_timer) eina_future_cancel(pd->inactivity_timer);
        WRN("copier %p destination %p closed with %zd bytes pending...",
            o, pd->destination, eina_binbuf_length_get(pd->buf));
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_ERROR, &err);
     }
}

EFL_CALLBACKS_ARRAY_DEFINE(destination_cbs,
                          { EFL_IO_WRITER_EVENT_CAN_WRITE_CHANGED, _efl_io_copier_destination_can_write_changed });

EOLIAN static Efl_Io_Writer *
_efl_io_copier_destination_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->destination;
}

EOLIAN static void
_efl_io_copier_destination_set(Eo *o, Efl_Io_Copier_Data *pd, Efl_Io_Writer *destination)
{
   if (pd->destination == destination) return;

   if (pd->destination)
     {
        efl_event_callback_array_del(pd->destination, destination_cbs(), o);
        if (efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE))
          {
             efl_event_callback_del(pd->destination, EFL_IO_CLOSER_EVENT_CLOSED,
                                    _efl_io_copier_destination_closed, o);
          }
        efl_unref(pd->destination);
        pd->destination = NULL;
     }

   if (destination)
     {
        EINA_SAFETY_ON_TRUE_RETURN(pd->closed);
        pd->destination = efl_ref(destination);
        efl_event_callback_array_add(pd->destination, destination_cbs(), o);

        if (efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE))
          {
             efl_io_closer_close_on_exec_set(pd->destination, efl_io_closer_close_on_exec_get(o));
             efl_io_closer_close_on_invalidate_set(pd->destination, efl_io_closer_close_on_invalidate_get(o));
             efl_event_callback_add(pd->destination, EFL_IO_CLOSER_EVENT_CLOSED,
                                     _efl_io_copier_destination_closed, o);
          }
        if (efl_isa(pd->destination, EFL_IO_SIZER_MIXIN) &&
            pd->source && efl_isa(pd->source, EFL_IO_SIZER_MIXIN))
          {
             efl_io_sizer_resize(pd->destination, pd->progress.total);
          }
     }
}

EOLIAN static void
_efl_io_copier_buffer_limit_set(Eo *o, Efl_Io_Copier_Data *pd, size_t size)
{
   size_t used;

   EINA_SAFETY_ON_TRUE_RETURN(pd->closed);

   if (pd->buffer_limit == size) return;
   pd->buffer_limit = size;
   if (size == 0) return;

   used = eina_binbuf_length_get(pd->buf);
   if (used > size) eina_binbuf_remove(pd->buf, size, used);
   if (pd->read_chunk_size > size) efl_io_copier_read_chunk_size_set(o, size);
}

EOLIAN static size_t
_efl_io_copier_buffer_limit_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->buffer_limit;
}

EOLIAN static void
_efl_io_copier_line_delimiter_set(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd, Eina_Slice slice)
{
   if (pd->line_delimiter.mem == slice.mem)
     {
        pd->line_delimiter.len = slice.len;
        return;
     }

   free((void *)pd->line_delimiter.mem);
   if (slice.len == 0)
     {
        pd->line_delimiter.mem = NULL;
        pd->line_delimiter.len = 0;
     }
   else
     {
        Eina_Rw_Slice rw_slice = eina_slice_dup(slice);
        pd->line_delimiter = eina_rw_slice_slice_get(rw_slice);
     }
}

EOLIAN static Eina_Slice
_efl_io_copier_line_delimiter_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->line_delimiter;
}


EOLIAN static void
_efl_io_copier_read_chunk_size_set(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd, size_t size)
{
   EINA_SAFETY_ON_TRUE_RETURN(pd->closed);

   if (size == 0) size = DEF_READ_CHUNK_SIZE;
   pd->read_chunk_size = size;
}

EOLIAN static size_t
_efl_io_copier_read_chunk_size_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->read_chunk_size > 0 ? pd->read_chunk_size : DEF_READ_CHUNK_SIZE;
}

EOLIAN static Eina_Error
_efl_io_copier_efl_io_closer_close(Eo *o, Efl_Io_Copier_Data *pd)
{
   Eina_Error err = 0, r;

   EINA_SAFETY_ON_TRUE_RETURN_VAL(pd->closed, EINVAL);

   _COPIER_DBG(o, pd);

   while (pd->buf)
     {
        size_t pending = eina_binbuf_length_get(pd->buf);
        if (pending == 0) break;
        else if (pd->destination && efl_io_writer_can_write_get(pd->destination))
          {
             DBG("copier %p destination %p closed with %zd bytes pending, do final write...",
                 o, pd->destination, pending);
             pd->force_dispatch = EINA_TRUE;
             _efl_io_copier_write(o, pd);
             pd->force_dispatch = EINA_FALSE;
          }
        else if (!pd->destination)
          {
             Eina_Slice binbuf_slice = eina_binbuf_slice_get(pd->buf);
             DBG("copier %p destination %p closed with %zd bytes pending, dispatch events...",
                 o, pd->destination, pending);
             _efl_io_copier_dispatch_data_events(o, pd, binbuf_slice);
             break;
          }
        else
          {
             DBG("copier %p destination %p closed with %zd bytes pending...",
                 o, pd->destination, pending);
             break;
          }
     }

   if (pd->job)
     eina_future_cancel(pd->job);

   if (pd->inactivity_timer)
     eina_future_cancel(pd->inactivity_timer);

   if (!pd->done)
     efl_io_copier_done_set(o, EINA_TRUE);

   if (pd->source)
     {
        if (efl_isa(pd->source, EFL_IO_SIZER_MIXIN))
          {
             efl_event_callback_del(pd->source, EFL_IO_SIZER_EVENT_SIZE_CHANGED,
                                    _efl_io_copier_source_resized, o);
             pd->progress.total = 0;
          }
        efl_event_callback_array_del(pd->source, source_cbs(), o);
        if (efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE) &&
            !efl_io_closer_closed_get(pd->source))
          {
             efl_event_callback_del(pd->source, EFL_IO_CLOSER_EVENT_CLOSED,
                                    _efl_io_copier_source_closed, o);
             err = efl_io_closer_close(pd->source);
          }
     }

   if (pd->destination)
     {
        efl_event_callback_array_del(pd->destination, destination_cbs(), o);
        if (efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE) &&
            !efl_io_closer_closed_get(pd->destination))
          {
             efl_event_callback_del(pd->destination, EFL_IO_CLOSER_EVENT_CLOSED,
                                    _efl_io_copier_destination_closed, o);
             r = efl_io_closer_close(pd->destination);
             if (!err) err = r;
          }
     }

   pd->closed = EINA_TRUE;
   efl_event_callback_call(o, EFL_IO_CLOSER_EVENT_CLOSED, NULL);

   if (pd->buf)
     {
        eina_binbuf_free(pd->buf);
        pd->buf = NULL;
     }

   return err;
}

EOLIAN static Eina_Bool
_efl_io_copier_efl_io_closer_closed_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->closed;
}

EOLIAN static void
_efl_io_copier_progress_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd, uint64_t *read, uint64_t *written, uint64_t *total)
{
   if (read) *read = pd->progress.read;
   if (written) *written = pd->progress.written;
   if (total) *total = pd->progress.total;
}

EOLIAN static Eina_Binbuf *
_efl_io_copier_binbuf_steal(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   Eina_Binbuf *ret = pd->buf;
   pd->buf = eina_binbuf_new();
   return ret;
}

EOLIAN static void
_efl_io_copier_timeout_inactivity_set(Eo *o, Efl_Io_Copier_Data *pd, double seconds)
{
   pd->timeout_inactivity = seconds;
   _efl_io_copier_timeout_inactivity_reschedule(o, pd);
}

EOLIAN static double
_efl_io_copier_timeout_inactivity_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->timeout_inactivity;
}

EOLIAN static Eina_Bool
_efl_io_copier_flush(Eo *o, Efl_Io_Copier_Data *pd, Eina_Bool may_block, Eina_Bool ignore_line_delimiter)
{
   uint64_t old_read = pd->progress.read;
   uint64_t old_written = pd->progress.written;
   uint64_t old_total = pd->progress.total;

   _COPIER_DBG(o, pd);

   if (pd->source && !efl_io_reader_eos_get(pd->source))
     {
        if (may_block || efl_io_reader_can_read_get(pd->source))
          _efl_io_copier_read(o, pd);
     }

   if (pd->destination)
     {
        if (may_block || efl_io_writer_can_write_get(pd->destination))
          {
             pd->force_dispatch = ignore_line_delimiter;
             _efl_io_copier_write(o, pd);
             pd->force_dispatch = EINA_FALSE;
          }
     }
   else if (ignore_line_delimiter && pd->buf)
     {
        size_t pending = eina_binbuf_length_get(pd->buf);
        if (pending)
          {
             Eina_Slice binbuf_slice = eina_binbuf_slice_get(pd->buf);
             _efl_io_copier_dispatch_data_events(o, pd, binbuf_slice);
          }
     }

   if ((old_read != pd->progress.read) ||
       (old_written != pd->progress.written) ||
       (old_total != pd->progress.total))
     {
        efl_event_callback_call(o, EFL_IO_COPIER_EVENT_PROGRESS, NULL);
        if (pd->closed) return EINA_TRUE; /* cb may call close */
        _efl_io_copier_timeout_inactivity_reschedule(o, pd);
     }

   if (!pd->source || efl_io_reader_eos_get(pd->source))
     {
        if ((!pd->done) &&
            ((!pd->destination) || (eina_binbuf_length_get(pd->buf) == 0)))
          efl_io_copier_done_set(o, EINA_TRUE);
     }

   return pd->done;
}

EOLIAN static size_t
_efl_io_copier_pending_size_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->buf ? eina_binbuf_length_get(pd->buf) : 0;
}

EOLIAN static Eina_Bool
_efl_io_copier_done_get(const Eo *o, Efl_Io_Copier_Data *pd)
{
   DBG("%p done=%d pending=%zd source={%p %s, eos=%d, closed=%d}, destination={%p %s, closed=%d}",
       o, pd->done,
       pd->buf ? eina_binbuf_length_get(pd->buf) : 0,
       pd->source,
       pd->source ? efl_class_name_get(pd->source) : "",
       pd->source ? efl_io_reader_eos_get(pd->source) : 1,
       pd->source ? (efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE) ? efl_io_closer_closed_get(pd->source) : 0) : 1,
       pd->destination,
       pd->destination ? efl_class_name_get(pd->destination) : "",
       pd->destination ? (efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE) ? efl_io_closer_closed_get(pd->destination) : 0) : 1);

   return pd->done;
}

EOLIAN static void
_efl_io_copier_done_set(Eo *o, Efl_Io_Copier_Data *pd, Eina_Bool value)
{
   if (pd->done == value) return;
   pd->done = value;
   if (!value) return;
   if (pd->inactivity_timer) eina_future_cancel(pd->inactivity_timer);
   efl_event_callback_call(o, EFL_IO_COPIER_EVENT_DONE, NULL);
}


EOLIAN static Eo *
_efl_io_copier_efl_object_constructor(Eo *o, Efl_Io_Copier_Data *pd)
{
   pd->buf = eina_binbuf_new();
   pd->close_on_exec = EINA_TRUE;
   pd->close_on_invalidate = EINA_TRUE;
   pd->timeout_inactivity = 0.0;

   EINA_SAFETY_ON_NULL_RETURN_VAL(pd->buf, NULL);

   return efl_constructor(efl_super(o, MY_CLASS));
}

EOLIAN static Eo *
_efl_io_copier_efl_object_finalize(Eo *o, Efl_Io_Copier_Data *pd)
{
   if (pd->read_chunk_size == 0)
     efl_io_copier_read_chunk_size_set(o, DEF_READ_CHUNK_SIZE);

   if (!efl_loop_get(o))
     {
        ERR("Set a loop provider as parent of this copier!");
        return NULL;
     }

   if ((pd->source && efl_io_reader_can_read_get(pd->source)) ||
       (pd->destination && efl_io_writer_can_write_get(pd->destination)))
     _efl_io_copier_job_schedule(o, pd);

   _COPIER_DBG(o, pd);

   return efl_finalize(efl_super(o, MY_CLASS));
}

EOLIAN static void
_efl_io_copier_efl_object_invalidate(Eo *o, Efl_Io_Copier_Data *pd EINA_UNUSED)
{
   if (efl_io_closer_close_on_invalidate_get(o) &&
       (!efl_io_closer_closed_get(o)))
     {
        efl_event_freeze(o);
        efl_io_closer_close(o);
        efl_event_thaw(o);
     }

   efl_io_copier_source_set(o, NULL);
   efl_io_copier_destination_set(o, NULL);

   efl_invalidate(efl_super(o, MY_CLASS));
}

EOLIAN static void
_efl_io_copier_efl_object_destructor(Eo *o, Efl_Io_Copier_Data *pd)
{
   _COPIER_DBG(o, pd);

   if (pd->job)
     eina_future_cancel(pd->job);

   if (pd->inactivity_timer)
     eina_future_cancel(pd->inactivity_timer);

   efl_destructor(efl_super(o, MY_CLASS));

   if (pd->buf)
     {
        eina_binbuf_free(pd->buf);
        pd->buf = NULL;
     }

   if (pd->line_delimiter.mem)
     {
        free((void *)pd->line_delimiter.mem);
        pd->line_delimiter.mem = NULL;
        pd->line_delimiter.len = 0;
     }
}

EOLIAN static Eina_Bool
_efl_io_copier_efl_io_closer_close_on_exec_set(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd, Eina_Bool close_on_exec)
{
   if (pd->close_on_exec == close_on_exec) return EINA_TRUE;
   pd->close_on_exec = close_on_exec;

   if (pd->source && efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE))
     efl_io_closer_close_on_exec_set(pd->source, close_on_exec);

   if (pd->destination && efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE))
     efl_io_closer_close_on_exec_set(pd->destination, close_on_exec);

   return EINA_TRUE;
}

EOLIAN static Eina_Bool
_efl_io_copier_efl_io_closer_close_on_exec_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->close_on_exec;
}

EOLIAN static void
_efl_io_copier_efl_io_closer_close_on_invalidate_set(Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd, Eina_Bool close_on_invalidate)
{
   if (pd->close_on_invalidate == close_on_invalidate) return;
   pd->close_on_invalidate = close_on_invalidate;

   if (pd->source && efl_isa(pd->source, EFL_IO_CLOSER_INTERFACE))
     efl_io_closer_close_on_invalidate_set(pd->source, close_on_invalidate);

   if (pd->destination && efl_isa(pd->destination, EFL_IO_CLOSER_INTERFACE))
     efl_io_closer_close_on_invalidate_set(pd->destination, close_on_invalidate);
}

EOLIAN static Eina_Bool
_efl_io_copier_efl_io_closer_close_on_invalidate_get(const Eo *o EINA_UNUSED, Efl_Io_Copier_Data *pd)
{
   return pd->close_on_invalidate;
}

#include "efl_io_copier.eo.c"