Some example for Ecore_Thread

It has some issues that need debugging, I'm not sure if Ecore_Thread doesn't
like having stuff done before the main loop is running and valgrind was also
complaining about some invalid reads in ecore_thread.c, but at least the
example is there for people to look at and report problems, things not clear
or just about anything that may come up from it.


SVN revision: 61293
This commit is contained in:
Iván Briano 2011-07-12 13:38:25 +00:00
parent 4da2c481b5
commit 970c4e6757
4 changed files with 523 additions and 3 deletions

View File

@ -797,4 +797,126 @@
* @until }
*
* @example ecore_animator_example.c
*/
*/
/**
* @page ecore_thread_example_c Ecore_Thread - API overview
*
* Working with threads is hard. Ecore helps to do so a bit easier, but as
* the example in @ref ecore_thread_example.c "ecore_thread_example.c" shows,
* there's a lot to consider even when doing the most simple things.
*
* We'll be going through this thorough example now, showing how the differents
* aspects of @ref Ecore_Thread are used, but users are encourage to avoid
* threads unless it's really the only option, as they always add more
* complexity than the program usually requires.
*
* Ecore Threads come in two flavors, short jobs and feedback jobs. Short jobs
* just run the given function and are more commonly used for small tasks
* where the main loop does not need to know how the work is going in between.
* The short job in our example is so short we had to artifically enlarge it
* with @c sleep(). Other than that, it also uses threads local data to keep
* the data we are working with persistent across different jobs ran by the
* same system thread. This data will be freed when the no more jobs are
* pending and the thread is terminated. If the data doesn't exist in the
* thread's storage, we create it and save it there for future jobs to find
* it. If creation fails, we cancel ourselves, so the main loop knows that
* we didn't just exit normally, meaning the job could not be done. The main
* part of the function checks in each iteration if it was cancelled by the
* main loop, and if it was, it stops processing and clears the data from the
* storage (we assume @c cancel means no one else will need this, but this is
* really application dependant).
* @dontinclude ecore_thread_example.c
* @skip static void
* @until sleep(1)
* @until }
* @until }
*
* Feedback jobs, on the other hand, run tasks that will inform back to the
* main loop its progress, send partial data as is processed, just ping saying
* it's still alive and processing, or anything that needs the thread to talk
* back to the main loop.
* @skip static void
* @until the_end
* @until }
*
* Finally, one more feedback job, but this one will be running outside of
* Ecore's pool, so we can use the pool for real work and keep this very
* light function unchecked. All it does is check if some condition is met
* and send a message to the main loop telling it it's time to close.
* @skip static void
* @until }
* @until }
* @until }
*
* Every now and then the program prints its status, counting threads running
* and pending jobs.
* @skip static void
* @until }
*
* In our main loop, we'll be receiving messages from our feedback jobs using
* the same callback for both of them.
* @skip static void
* @until char *str
*
* The light job running out of the pool will let us know when we can exit our
* program.
* @until }
*
* Next comes the handling of data sent from the actual worker threads, always
* remembering that the data belongs to us now, and not the thread, so it's
* our responsibility to free it.
* @until }
* @until }
*
* Last, the condition to exit is given by how many messages we want to handle,
* so we need to count them and inform the condition checking thread that the
* value changed.
* @until }
*
* When a thread finishes its job or gets cancelled, the main loop is notified
* through the callbacks set when creating the task. In this case, we just
* print what happen and keep track of one of them used to exemplify cancelling.
* Here we are pretending one of our short jobs has a timeout, so if it doesn't
* finish before a timer is triggered, it will be cancelled.
* @skip static void
* @until _cancel_timer_cb
* @until }
*
* The main function does some setup that includes reading parameters from
* the command line to change its behaviour and test different results.
* These are:
* @li -t <some_num> maximum number of threads to run at the same time.
* @li -p <some_path> adds @c some_path to the list used by the feedback jobs.
* This parameter can be used multiple times.
* @li -m <some_num> the number of messages to process before the program is
* signalled to exit.
*
* Skipping some bits, we init Ecore and our application data.
* @skip ecore_init
* @until appdata.max_msgs
*
* If any paths for the feedback jobs were given, we use them, otherwise we
* fallback to some defaults. Always initting the proper mutexes used by the
* threaded job.
* @skip path_list
* @until EINA_LIST_FREE
* @until }
* @until }
*
* Initialize the mutex needed for the condition checking thread
* @skip appdata.mutex
* @until appdata.condition
*
* And start our tasks.
* @until appdata.thread_3
* @until EINA_FALSE
*
* To finalize, set a timer to cancel one of the tasks if it doesn't end
* before the timeout, one more timer for status report and get into the main
* loop. Once we are out, destroy our mutexes and finish the program.
* @until _status_timer_cb
* @until }
*
* @example ecore_thread_example.c
*/

View File

@ -37,7 +37,8 @@ SRCS = \
ecore_fd_handler_gnutls_example.c \
ecore_file_download_example.c \
ecore_pipe_simple_example.c \
ecore_pipe_gstreamer_example.c
ecore_pipe_gstreamer_example.c \
ecore_thread_example.c
EXTRA_DIST = $(SRCS)
@ -61,7 +62,8 @@ pkglib_PROGRAMS += \
ecore_pipe_simple_example \
ecore_con_lookup_example \
ecore_con_url_headers_example \
ecore_con_url_download_example
ecore_con_url_download_example \
ecore_thread_example
ecore_animator_example_LDADD = $(ECOREBASELDADD) @EVAS_LIBS@ $(top_builddir)/src/lib/ecore_evas/libecore_evas.la
ecore_con_lookup_example_LDADD = $(ECOREBASELDADD) $(top_builddir)/src/lib/ecore_con/libecore_con.la

View File

@ -0,0 +1,394 @@
/*
* gcc -o ecore_thread_example ecore_thread_example.c `pkg-config --cflags --libs ecore`
*/
#include <stdio.h>
#include <stdlib.h>
#include <dirent.h>
#include <Ecore.h>
#include <Ecore_Getopt.h>
typedef struct
{
Ecore_Thread *thread_3;
int msgs_received;
int max_msgs;
Eina_Lock mutex;
Eina_Condition condition;
} App_Data;
typedef struct
{
Eina_List *list;
} Thread_Data;
typedef struct
{
char *name;
char *base;
Eina_Lock mutex;
} Feedback_Thread_Data;
typedef struct
{
int all_done;
Eina_List *list;
} App_Msg;
static void
_local_data_free(void *data)
{
Thread_Data *td = data;
char *str;
EINA_LIST_FREE(td->list, str)
{
printf("Freeing string: %s\n", str);
free(str);
}
free(td);
}
static void
_short_job(void *data, Ecore_Thread *th)
{
Thread_Data *td;
int i;
td = ecore_thread_local_data_find(th, "data");
if (!td)
{
td = calloc(1, sizeof(Thread_Data));
if (!td)
{
ecore_thread_cancel(th);
return;
}
ecore_thread_local_data_add(th, "data", td, _local_data_free,
EINA_FALSE);
}
for (i = 0; i < 10; i++)
{
char buf[200];
if (ecore_thread_check(th))
{
ecore_thread_local_data_del(th, "data");
break;
}
snprintf(buf, sizeof(buf), "Thread %p: String number %d", th, i);
td->list = eina_list_append(td->list, strdup(buf));
sleep(1);
}
}
static void
_feedback_job(void *data, Ecore_Thread *th)
{
time_t t;
int i, count;
Feedback_Thread_Data *ftd = NULL;
DIR *dir;
App_Msg *msg;
count = (int)ecore_thread_global_data_find("count");
for (i = 0; i < count; i++)
{
char buf[32];
snprintf(buf, sizeof(buf), "data%d", i);
ftd = ecore_thread_global_data_find(buf);
if (!ftd)
continue;
if (eina_lock_take_try(&ftd->mutex))
break;
else
ftd = NULL;
}
if (!ftd)
return;
dir = opendir(ftd->base);
if (!dir)
goto the_end;
msg = calloc(1, sizeof(App_Msg));
t = time(NULL);
while (time(NULL) < t + 2)
{
struct dirent entry, *result;
if (readdir_r(dir, &entry, &result))
break;
if (!result)
break;
if (strlen(result->d_name) >= 10)
msg->list = eina_list_append(msg->list,
strdup(result->d_name));
}
closedir(dir);
ecore_thread_feedback(th, msg);
the_end:
ecore_thread_global_data_del(ftd->name);
free(ftd->name);
free(ftd->base);
eina_lock_release(&ftd->mutex);
eina_lock_free(&ftd->mutex);
free(ftd);
ecore_thread_reschedule(th);
}
static void
_out_of_pool_job(void *data, Ecore_Thread *th)
{
App_Data *ad = data;
App_Msg *msg;
while (1)
{
int msgs;
eina_condition_wait(&ad->condition);
msgs = ad->msgs_received;
eina_lock_release(&ad->mutex);
if (msgs == ad->max_msgs)
{
msg = calloc(1, sizeof(App_Msg));
msg->all_done = 1;
ecore_thread_feedback(th, msg);
return;
}
}
}
static void
_print_status(void)
{
int active, pending_total, pending_feedback, pending_short, available;
active = ecore_thread_active_get();
pending_total = ecore_thread_pending_total_get();
pending_feedback = ecore_thread_pending_feedback_get();
pending_short = ecore_thread_pending_get();
available = ecore_thread_available_get();
printf("Status:\n\t* Active threads: %d\n"
"\t* Available threads: %d\n"
"\t* Pending short jobs: %d\n"
"\t* Pending feedback jobs: %d\n"
"\t* Pending total: %d\n", active, available, pending_short,
pending_feedback, pending_total);
}
static void
_feedback_job_msg_cb(void *data, Ecore_Thread *th, void *msg_data)
{
App_Data *ad = data;
App_Msg *msg = msg_data;
char *str;
if (msg->all_done)
{
ecore_main_loop_quit();
free(msg);
return;
}
_print_status();
if (!msg->list)
printf("Received an empty list from thread %p\n", th);
else
{
int i = 0;
printf("Received %d elements from threads %p (printing first 5):\n",
eina_list_count(msg->list), th);
EINA_LIST_FREE(msg->list, str)
{
if (i <= 5)
printf("\t%s\n", str);
free(str);
i++;
}
}
eina_lock_take(&ad->mutex);
ad->msgs_received++;
eina_condition_signal(&ad->condition);
eina_lock_release(&ad->mutex);
free(msg);
}
static void
_thread_end_cb(void *data, Ecore_Thread *th)
{
App_Data *ad = data;
printf("Normal termination for thread %p.\n", th);
if (th == ad->thread_3)
ad->thread_3 = NULL;
}
static void
_thread_cancel_cb(void *data, Ecore_Thread *th)
{
App_Data *ad = data;
printf("Thread %p got cancelled.\n", th);
if (th == ad->thread_3)
ad->thread_3 = NULL;
}
static Eina_Bool
_cancel_timer_cb(void *data)
{
App_Data *ad = data;
if (ad->thread_3 && !ecore_thread_check(ad->thread_3))
ecore_thread_cancel(ad->thread_3);
return EINA_FALSE;
}
static Eina_Bool
_status_timer_cb(void *data)
{
_print_status();
return EINA_TRUE;
}
static const Ecore_Getopt optdesc = {
"ecore_thread_example",
NULL,
"0.0",
"(C) 2011 Enlightenment",
"Public domain?",
"Example program for Ecore_Thread",
0,
{
ECORE_GETOPT_STORE_INT('t', "threads", "Max number of threads to run"),
ECORE_GETOPT_STORE_INT('m', "msgs", "Max number of messages to receive"),
ECORE_GETOPT_APPEND_METAVAR('p', "path", "Add path for feedback job",
"STRING", ECORE_GETOPT_TYPE_STR),
ECORE_GETOPT_HELP('h', "help"),
ECORE_GETOPT_SENTINEL
}
};
int
main(int argc, char *argv[])
{
int i, max_threads = 0, max_msgs = 0;
Eina_Bool opt_quit = EINA_FALSE;
Eina_List *path_list = NULL;
App_Data appdata;
Ecore_Getopt_Value values[] = {
ECORE_GETOPT_VALUE_INT(max_threads),
ECORE_GETOPT_VALUE_INT(max_msgs),
ECORE_GETOPT_VALUE_LIST(path_list),
ECORE_GETOPT_VALUE_BOOL(opt_quit),
ECORE_GETOPT_VALUE_NONE
};
ecore_init();
i = ecore_thread_max_get();
printf("Initial max threads: %d\n", i);
memset(&appdata, 0, sizeof(App_Data));
appdata.max_msgs = 1;
if (ecore_getopt_parse(&optdesc, values, argc, argv) < 0)
{
printf("Argument parsing failed\n");
return 1;
}
if (opt_quit)
return 0;
if (max_threads)
{
ecore_thread_max_set(max_threads);
printf("Max threads: %d\n", ecore_thread_max_get());
}
if (max_msgs)
appdata.max_msgs = max_msgs;
if (!path_list)
{
Feedback_Thread_Data *ftd;
ecore_thread_global_data_add("count", (void *)3, NULL, EINA_FALSE);
ftd = calloc(1, sizeof(Feedback_Thread_Data));
ftd->name = strdup("data0");
ftd->base = strdup("/usr/bin");
eina_lock_new(&ftd->mutex);
ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
ftd = calloc(1, sizeof(Feedback_Thread_Data));
ftd->name = strdup("data1");
ftd->base = strdup("/usr/lib");
eina_lock_new(&ftd->mutex);
ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
ftd = calloc(1, sizeof(Feedback_Thread_Data));
ftd->name = strdup("data2");
ftd->base = strdup("/usr/share");
eina_lock_new(&ftd->mutex);
ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
}
else
{
Feedback_Thread_Data *ftd;
char *str;
ecore_thread_global_data_add("count",
(void *)eina_list_count(path_list), NULL,
EINA_FALSE);
i = 0;
EINA_LIST_FREE(path_list, str)
{
char buf[32];
snprintf(buf, sizeof(buf), "data%d", i);
ftd = calloc(1, sizeof(Feedback_Thread_Data));
ftd->name = strdup(buf);
ftd->base = strdup(str);
eina_lock_new(&ftd->mutex);
ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
free(str);
i++;
}
}
eina_lock_new(&appdata.mutex);
eina_condition_new(&appdata.condition, &appdata.mutex);
ecore_thread_feedback_run(_out_of_pool_job, _feedback_job_msg_cb, NULL,
NULL, &appdata, EINA_TRUE);
ecore_thread_run(_short_job, _thread_end_cb, _thread_cancel_cb, &appdata);
ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb,
_thread_end_cb, _thread_cancel_cb, &appdata,
EINA_FALSE);
appdata.thread_3 = ecore_thread_run(_short_job, _thread_end_cb,
_thread_cancel_cb, &appdata);
ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb,
_thread_end_cb, _thread_cancel_cb, &appdata,
EINA_FALSE);
ecore_timer_add(1.0, _cancel_timer_cb, &appdata);
ecore_timer_add(2.0, _status_timer_cb, NULL);
_print_status();
ecore_main_loop_begin();
eina_condition_free(&appdata.condition);
eina_lock_free(&appdata.mutex);
ecore_shutdown();
return 0;
}

View File

@ -826,6 +826,8 @@ extern "C" {
* for the function running in the thread to send messages to the main
* thread.
*
* See an overview example in @ref ecore_thread_example_c.
*
* @ingroup Ecore_Group
*
* @{