From 970c4e6757593f8abf90e9e663602cac2fdbf223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Briano?= Date: Tue, 12 Jul 2011 13:38:25 +0000 Subject: [PATCH] Some example for Ecore_Thread It has some issues that need debugging, I'm not sure if Ecore_Thread doesn't like having stuff done before the main loop is running and valgrind was also complaining about some invalid reads in ecore_thread.c, but at least the example is there for people to look at and report problems, things not clear or just about anything that may come up from it. SVN revision: 61293 --- legacy/ecore/doc/examples.dox | 124 +++++- legacy/ecore/src/examples/Makefile.am | 6 +- .../ecore/src/examples/ecore_thread_example.c | 394 ++++++++++++++++++ legacy/ecore/src/lib/ecore/Ecore.h | 2 + 4 files changed, 523 insertions(+), 3 deletions(-) create mode 100644 legacy/ecore/src/examples/ecore_thread_example.c diff --git a/legacy/ecore/doc/examples.dox b/legacy/ecore/doc/examples.dox index 2dbd6d64f5..09984f2f93 100644 --- a/legacy/ecore/doc/examples.dox +++ b/legacy/ecore/doc/examples.dox @@ -797,4 +797,126 @@ * @until } * * @example ecore_animator_example.c - */ \ No newline at end of file + */ + +/** + * @page ecore_thread_example_c Ecore_Thread - API overview + * + * Working with threads is hard. Ecore helps to do so a bit easier, but as + * the example in @ref ecore_thread_example.c "ecore_thread_example.c" shows, + * there's a lot to consider even when doing the most simple things. + * + * We'll be going through this thorough example now, showing how the differents + * aspects of @ref Ecore_Thread are used, but users are encourage to avoid + * threads unless it's really the only option, as they always add more + * complexity than the program usually requires. + * + * Ecore Threads come in two flavors, short jobs and feedback jobs. Short jobs + * just run the given function and are more commonly used for small tasks + * where the main loop does not need to know how the work is going in between. + * The short job in our example is so short we had to artifically enlarge it + * with @c sleep(). Other than that, it also uses threads local data to keep + * the data we are working with persistent across different jobs ran by the + * same system thread. This data will be freed when the no more jobs are + * pending and the thread is terminated. If the data doesn't exist in the + * thread's storage, we create it and save it there for future jobs to find + * it. If creation fails, we cancel ourselves, so the main loop knows that + * we didn't just exit normally, meaning the job could not be done. The main + * part of the function checks in each iteration if it was cancelled by the + * main loop, and if it was, it stops processing and clears the data from the + * storage (we assume @c cancel means no one else will need this, but this is + * really application dependant). + * @dontinclude ecore_thread_example.c + * @skip static void + * @until sleep(1) + * @until } + * @until } + * + * Feedback jobs, on the other hand, run tasks that will inform back to the + * main loop its progress, send partial data as is processed, just ping saying + * it's still alive and processing, or anything that needs the thread to talk + * back to the main loop. + * @skip static void + * @until the_end + * @until } + * + * Finally, one more feedback job, but this one will be running outside of + * Ecore's pool, so we can use the pool for real work and keep this very + * light function unchecked. All it does is check if some condition is met + * and send a message to the main loop telling it it's time to close. + * @skip static void + * @until } + * @until } + * @until } + * + * Every now and then the program prints its status, counting threads running + * and pending jobs. + * @skip static void + * @until } + * + * In our main loop, we'll be receiving messages from our feedback jobs using + * the same callback for both of them. + * @skip static void + * @until char *str + * + * The light job running out of the pool will let us know when we can exit our + * program. + * @until } + * + * Next comes the handling of data sent from the actual worker threads, always + * remembering that the data belongs to us now, and not the thread, so it's + * our responsibility to free it. + * @until } + * @until } + * + * Last, the condition to exit is given by how many messages we want to handle, + * so we need to count them and inform the condition checking thread that the + * value changed. + * @until } + * + * When a thread finishes its job or gets cancelled, the main loop is notified + * through the callbacks set when creating the task. In this case, we just + * print what happen and keep track of one of them used to exemplify cancelling. + * Here we are pretending one of our short jobs has a timeout, so if it doesn't + * finish before a timer is triggered, it will be cancelled. + * @skip static void + * @until _cancel_timer_cb + * @until } + * + * The main function does some setup that includes reading parameters from + * the command line to change its behaviour and test different results. + * These are: + * @li -t maximum number of threads to run at the same time. + * @li -p adds @c some_path to the list used by the feedback jobs. + * This parameter can be used multiple times. + * @li -m the number of messages to process before the program is + * signalled to exit. + * + * Skipping some bits, we init Ecore and our application data. + * @skip ecore_init + * @until appdata.max_msgs + * + * If any paths for the feedback jobs were given, we use them, otherwise we + * fallback to some defaults. Always initting the proper mutexes used by the + * threaded job. + * @skip path_list + * @until EINA_LIST_FREE + * @until } + * @until } + * + * Initialize the mutex needed for the condition checking thread + * @skip appdata.mutex + * @until appdata.condition + * + * And start our tasks. + * @until appdata.thread_3 + * @until EINA_FALSE + * + * To finalize, set a timer to cancel one of the tasks if it doesn't end + * before the timeout, one more timer for status report and get into the main + * loop. Once we are out, destroy our mutexes and finish the program. + * @until _status_timer_cb + * @until } + * + * @example ecore_thread_example.c + */ diff --git a/legacy/ecore/src/examples/Makefile.am b/legacy/ecore/src/examples/Makefile.am index 2b154d0575..a6614faf5a 100644 --- a/legacy/ecore/src/examples/Makefile.am +++ b/legacy/ecore/src/examples/Makefile.am @@ -37,7 +37,8 @@ SRCS = \ ecore_fd_handler_gnutls_example.c \ ecore_file_download_example.c \ ecore_pipe_simple_example.c \ - ecore_pipe_gstreamer_example.c + ecore_pipe_gstreamer_example.c \ + ecore_thread_example.c EXTRA_DIST = $(SRCS) @@ -61,7 +62,8 @@ pkglib_PROGRAMS += \ ecore_pipe_simple_example \ ecore_con_lookup_example \ ecore_con_url_headers_example \ - ecore_con_url_download_example + ecore_con_url_download_example \ + ecore_thread_example ecore_animator_example_LDADD = $(ECOREBASELDADD) @EVAS_LIBS@ $(top_builddir)/src/lib/ecore_evas/libecore_evas.la ecore_con_lookup_example_LDADD = $(ECOREBASELDADD) $(top_builddir)/src/lib/ecore_con/libecore_con.la diff --git a/legacy/ecore/src/examples/ecore_thread_example.c b/legacy/ecore/src/examples/ecore_thread_example.c new file mode 100644 index 0000000000..7028b25910 --- /dev/null +++ b/legacy/ecore/src/examples/ecore_thread_example.c @@ -0,0 +1,394 @@ +/* + * gcc -o ecore_thread_example ecore_thread_example.c `pkg-config --cflags --libs ecore` + */ +#include +#include +#include +#include +#include + +typedef struct +{ + Ecore_Thread *thread_3; + int msgs_received; + int max_msgs; + Eina_Lock mutex; + Eina_Condition condition; +} App_Data; + +typedef struct +{ + Eina_List *list; +} Thread_Data; + +typedef struct +{ + char *name; + char *base; + Eina_Lock mutex; +} Feedback_Thread_Data; + +typedef struct +{ + int all_done; + Eina_List *list; +} App_Msg; + +static void +_local_data_free(void *data) +{ + Thread_Data *td = data; + char *str; + + EINA_LIST_FREE(td->list, str) + { + printf("Freeing string: %s\n", str); + free(str); + } + free(td); +} + +static void +_short_job(void *data, Ecore_Thread *th) +{ + Thread_Data *td; + int i; + + td = ecore_thread_local_data_find(th, "data"); + if (!td) + { + td = calloc(1, sizeof(Thread_Data)); + if (!td) + { + ecore_thread_cancel(th); + return; + } + ecore_thread_local_data_add(th, "data", td, _local_data_free, + EINA_FALSE); + } + + for (i = 0; i < 10; i++) + { + char buf[200]; + + if (ecore_thread_check(th)) + { + ecore_thread_local_data_del(th, "data"); + break; + } + + snprintf(buf, sizeof(buf), "Thread %p: String number %d", th, i); + td->list = eina_list_append(td->list, strdup(buf)); + sleep(1); + } +} + +static void +_feedback_job(void *data, Ecore_Thread *th) +{ + time_t t; + int i, count; + Feedback_Thread_Data *ftd = NULL; + DIR *dir; + App_Msg *msg; + + count = (int)ecore_thread_global_data_find("count"); + for (i = 0; i < count; i++) + { + char buf[32]; + snprintf(buf, sizeof(buf), "data%d", i); + ftd = ecore_thread_global_data_find(buf); + if (!ftd) + continue; + if (eina_lock_take_try(&ftd->mutex)) + break; + else + ftd = NULL; + } + if (!ftd) + return; + + dir = opendir(ftd->base); + if (!dir) + goto the_end; + + msg = calloc(1, sizeof(App_Msg)); + + t = time(NULL); + while (time(NULL) < t + 2) + { + struct dirent entry, *result; + + if (readdir_r(dir, &entry, &result)) + break; + if (!result) + break; + + if (strlen(result->d_name) >= 10) + msg->list = eina_list_append(msg->list, + strdup(result->d_name)); + } + + closedir(dir); + ecore_thread_feedback(th, msg); + +the_end: + ecore_thread_global_data_del(ftd->name); + free(ftd->name); + free(ftd->base); + eina_lock_release(&ftd->mutex); + eina_lock_free(&ftd->mutex); + free(ftd); + ecore_thread_reschedule(th); +} + +static void +_out_of_pool_job(void *data, Ecore_Thread *th) +{ + App_Data *ad = data; + App_Msg *msg; + + while (1) + { + int msgs; + eina_condition_wait(&ad->condition); + msgs = ad->msgs_received; + eina_lock_release(&ad->mutex); + if (msgs == ad->max_msgs) + { + msg = calloc(1, sizeof(App_Msg)); + msg->all_done = 1; + ecore_thread_feedback(th, msg); + return; + } + } +} + +static void +_print_status(void) +{ + int active, pending_total, pending_feedback, pending_short, available; + + active = ecore_thread_active_get(); + pending_total = ecore_thread_pending_total_get(); + pending_feedback = ecore_thread_pending_feedback_get(); + pending_short = ecore_thread_pending_get(); + available = ecore_thread_available_get(); + + printf("Status:\n\t* Active threads: %d\n" + "\t* Available threads: %d\n" + "\t* Pending short jobs: %d\n" + "\t* Pending feedback jobs: %d\n" + "\t* Pending total: %d\n", active, available, pending_short, + pending_feedback, pending_total); +} + +static void +_feedback_job_msg_cb(void *data, Ecore_Thread *th, void *msg_data) +{ + App_Data *ad = data; + App_Msg *msg = msg_data; + char *str; + + if (msg->all_done) + { + ecore_main_loop_quit(); + free(msg); + return; + } + + _print_status(); + + if (!msg->list) + printf("Received an empty list from thread %p\n", th); + else + { + int i = 0; + printf("Received %d elements from threads %p (printing first 5):\n", + eina_list_count(msg->list), th); + EINA_LIST_FREE(msg->list, str) + { + if (i <= 5) + printf("\t%s\n", str); + free(str); + i++; + } + } + + eina_lock_take(&ad->mutex); + ad->msgs_received++; + eina_condition_signal(&ad->condition); + eina_lock_release(&ad->mutex); + + free(msg); +} + +static void +_thread_end_cb(void *data, Ecore_Thread *th) +{ + App_Data *ad = data; + + printf("Normal termination for thread %p.\n", th); + if (th == ad->thread_3) + ad->thread_3 = NULL; +} + +static void +_thread_cancel_cb(void *data, Ecore_Thread *th) +{ + App_Data *ad = data; + + printf("Thread %p got cancelled.\n", th); + if (th == ad->thread_3) + ad->thread_3 = NULL; +} + +static Eina_Bool +_cancel_timer_cb(void *data) +{ + App_Data *ad = data; + + if (ad->thread_3 && !ecore_thread_check(ad->thread_3)) + ecore_thread_cancel(ad->thread_3); + + return EINA_FALSE; +} + +static Eina_Bool +_status_timer_cb(void *data) +{ + _print_status(); + + return EINA_TRUE; +} + +static const Ecore_Getopt optdesc = { + "ecore_thread_example", + NULL, + "0.0", + "(C) 2011 Enlightenment", + "Public domain?", + "Example program for Ecore_Thread", + 0, + { + ECORE_GETOPT_STORE_INT('t', "threads", "Max number of threads to run"), + ECORE_GETOPT_STORE_INT('m', "msgs", "Max number of messages to receive"), + ECORE_GETOPT_APPEND_METAVAR('p', "path", "Add path for feedback job", + "STRING", ECORE_GETOPT_TYPE_STR), + ECORE_GETOPT_HELP('h', "help"), + ECORE_GETOPT_SENTINEL + } +}; + +int +main(int argc, char *argv[]) +{ + int i, max_threads = 0, max_msgs = 0; + Eina_Bool opt_quit = EINA_FALSE; + Eina_List *path_list = NULL; + App_Data appdata; + Ecore_Getopt_Value values[] = { + ECORE_GETOPT_VALUE_INT(max_threads), + ECORE_GETOPT_VALUE_INT(max_msgs), + ECORE_GETOPT_VALUE_LIST(path_list), + ECORE_GETOPT_VALUE_BOOL(opt_quit), + ECORE_GETOPT_VALUE_NONE + }; + + ecore_init(); + + i = ecore_thread_max_get(); + printf("Initial max threads: %d\n", i); + + memset(&appdata, 0, sizeof(App_Data)); + appdata.max_msgs = 1; + + if (ecore_getopt_parse(&optdesc, values, argc, argv) < 0) + { + printf("Argument parsing failed\n"); + return 1; + } + + if (opt_quit) + return 0; + + if (max_threads) + { + ecore_thread_max_set(max_threads); + printf("Max threads: %d\n", ecore_thread_max_get()); + } + if (max_msgs) + appdata.max_msgs = max_msgs; + + if (!path_list) + { + Feedback_Thread_Data *ftd; + ecore_thread_global_data_add("count", (void *)3, NULL, EINA_FALSE); + ftd = calloc(1, sizeof(Feedback_Thread_Data)); + ftd->name = strdup("data0"); + ftd->base = strdup("/usr/bin"); + eina_lock_new(&ftd->mutex); + ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); + ftd = calloc(1, sizeof(Feedback_Thread_Data)); + ftd->name = strdup("data1"); + ftd->base = strdup("/usr/lib"); + eina_lock_new(&ftd->mutex); + ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); + ftd = calloc(1, sizeof(Feedback_Thread_Data)); + ftd->name = strdup("data2"); + ftd->base = strdup("/usr/share"); + eina_lock_new(&ftd->mutex); + ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); + } + else + { + Feedback_Thread_Data *ftd; + char *str; + ecore_thread_global_data_add("count", + (void *)eina_list_count(path_list), NULL, + EINA_FALSE); + i = 0; + EINA_LIST_FREE(path_list, str) + { + char buf[32]; + snprintf(buf, sizeof(buf), "data%d", i); + ftd = calloc(1, sizeof(Feedback_Thread_Data)); + ftd->name = strdup(buf); + ftd->base = strdup(str); + eina_lock_new(&ftd->mutex); + ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); + free(str); + i++; + } + } + + eina_lock_new(&appdata.mutex); + eina_condition_new(&appdata.condition, &appdata.mutex); + + ecore_thread_feedback_run(_out_of_pool_job, _feedback_job_msg_cb, NULL, + NULL, &appdata, EINA_TRUE); + + ecore_thread_run(_short_job, _thread_end_cb, _thread_cancel_cb, &appdata); + ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb, + _thread_end_cb, _thread_cancel_cb, &appdata, + EINA_FALSE); + appdata.thread_3 = ecore_thread_run(_short_job, _thread_end_cb, + _thread_cancel_cb, &appdata); + ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb, + _thread_end_cb, _thread_cancel_cb, &appdata, + EINA_FALSE); + + ecore_timer_add(1.0, _cancel_timer_cb, &appdata); + ecore_timer_add(2.0, _status_timer_cb, NULL); + + _print_status(); + + ecore_main_loop_begin(); + + eina_condition_free(&appdata.condition); + eina_lock_free(&appdata.mutex); + + ecore_shutdown(); + + return 0; +} diff --git a/legacy/ecore/src/lib/ecore/Ecore.h b/legacy/ecore/src/lib/ecore/Ecore.h index 3f7ec7340b..c83bfd4971 100644 --- a/legacy/ecore/src/lib/ecore/Ecore.h +++ b/legacy/ecore/src/lib/ecore/Ecore.h @@ -826,6 +826,8 @@ extern "C" { * for the function running in the thread to send messages to the main * thread. * + * See an overview example in @ref ecore_thread_example_c. + * * @ingroup Ecore_Group * * @{