From 4bd421562d2c2403ef2000ce3d903b991eae7f6e Mon Sep 17 00:00:00 2001 From: Dave Andreoli Date: Sun, 14 Aug 2016 13:02:19 +0200 Subject: [PATCH] Improve the ecore test suite no more print and lots more asserts --- tests/ecore/test_01_timer.py | 37 +++++--- tests/ecore/test_02_animator.py | 28 +++--- tests/ecore/test_03_poller.py | 52 +++++----- tests/ecore/test_04_idler.py | 56 ++++++----- tests/ecore/test_05_idle_enterer.py | 97 +++++++++++-------- tests/ecore/test_06_idle_exiter.py | 95 +++++++++++-------- tests/ecore/test_07_fd_handler.py | 60 ++++++++---- tests/ecore/test_08_exe.py | 136 ++++++++++++++++++--------- tests/ecore/test_09_file_download.py | 53 ++++++++--- tests/ecore/test_10_file_monitor.py | 126 +++++++++++++------------ tests/ecore/test_11_con.py | 33 ++++++- 11 files changed, 483 insertions(+), 290 deletions(-) diff --git a/tests/ecore/test_01_timer.py b/tests/ecore/test_01_timer.py index 81709aa..d1fbfcf 100644 --- a/tests/ecore/test_01_timer.py +++ b/tests/ecore/test_01_timer.py @@ -1,24 +1,35 @@ #!/usr/bin/env python -from efl import ecore import unittest import logging - -def cb(n, t, a): - print("cb data: %s %s %s" % (n, t, a)) - return True - -def cb_false(n, t, a): - print("cb data false: %s %s %s" % (n, t, a)) - return False +from efl import ecore class TestTimer(unittest.TestCase): + + def cb_renew1(self, n, t, a): + self.assertEqual(n, 123) + self.assertEqual(t, "teste") + self.assertEqual(a, 456) + return ecore.ECORE_CALLBACK_RENEW + + def cb_renew2(self, n, t, a): + self.assertEqual(n, 789) + self.assertEqual(t, "bla") + self.assertEqual(a, "something in a") + return ecore.ECORE_CALLBACK_RENEW + + def cb_cancel(self, n, t, a): + self.assertEqual(n, 666) + self.assertEqual(t, "bla") + self.assertEqual(a, "something else in a") + return ecore.ECORE_CALLBACK_CANCEL + def testInit(self): - t1 = ecore.timer_add(0.2, cb, 123, "teste", a=456) - t2 = ecore.Timer(0.5, cb, 789, "bla", a="something in a") - t3 = ecore.Timer(0.4, cb_false, 666, "bla", a="something else in a") + t1 = ecore.timer_add(0.2, self.cb_renew1, 123, "teste", a=456) + t2 = ecore.Timer(0.5, self.cb_renew2, 789, "bla", a="something in a") + t3 = ecore.Timer(0.4, self.cb_cancel, 666, "bla", a="something else in a") t4 = ecore.timer_add(1, ecore.main_loop_quit) self.assertIsInstance(t1, ecore.Timer) @@ -49,8 +60,10 @@ class TestTimer(unittest.TestCase): self.assertEqual(t4.is_deleted(), True) t1.delete() + self.assertEqual(t1.is_deleted(), True) del t1 t2.delete() + self.assertEqual(t2.is_deleted(), True) del t2 del t3 # already deleted since returned false del t4 # already deleted since returned false diff --git a/tests/ecore/test_02_animator.py b/tests/ecore/test_02_animator.py index 36aa33c..e699ed7 100644 --- a/tests/ecore/test_02_animator.py +++ b/tests/ecore/test_02_animator.py @@ -1,26 +1,31 @@ #!/usr/bin/env python -from efl import ecore import unittest import logging - -def cb_true(n, t, a): - print("cb_true: %s %s %s" % (n, t, a)) - return True - -def cb_false(n, t, a): - print("cb_false: %s %s %s" % (n, t, a)) - return False +from efl import ecore class TestAnimator(unittest.TestCase): + + def cb_renew(self, n, t, a): + self.assertEqual(n, 123) + self.assertEqual(t, "teste") + self.assertEqual(a, 456) + return ecore.ECORE_CALLBACK_RENEW + + def cb_cancel(self, n, t, a): + self.assertEqual(n, 789) + self.assertEqual(t, "bla") + self.assertEqual(a, "something in a") + return ecore.ECORE_CALLBACK_CANCEL + def testInit(self): ecore.animator_frametime_set(1.0/24.0) self.assertEqual(ecore.animator_frametime_get(), 1.0/24.0) - a1 = ecore.animator_add(cb_true, 123, "teste", a=456) - a2 = ecore.Animator(cb_false, 789, "bla", a="something in a") + a1 = ecore.animator_add(self.cb_renew, 123, "teste", a=456) + a2 = ecore.Animator(self.cb_cancel, 789, "bla", a="something in a") a3 = ecore.animator_add(ecore.main_loop_quit) self.assertIsInstance(a1, ecore.Animator) @@ -46,6 +51,7 @@ class TestAnimator(unittest.TestCase): self.assertEqual(a3.is_deleted(), True) a1.delete() + self.assertEqual(a1.is_deleted(), True) del a1 del a2 # already deleted since returned false del a3 # already deleted since returned false diff --git a/tests/ecore/test_03_poller.py b/tests/ecore/test_03_poller.py index cdfeadb..c85eada 100644 --- a/tests/ecore/test_03_poller.py +++ b/tests/ecore/test_03_poller.py @@ -1,31 +1,30 @@ #!/usr/bin/env python -from efl import ecore import unittest import logging - -counters = [0, 0] -params = None - - -def poller_cb(): - counters[0] += 1 - return True - -def poller_cb2(one, two, three, test): - global params - - params = (one, two, three) - counters[1] += 1 - return False +from efl import ecore class TestPoller(unittest.TestCase): - def testInit(self): - p1 = ecore.Poller(4, poller_cb) - p2 = ecore.Poller(2, poller_cb2, ecore.ECORE_POLLER_CORE, + def cb_renew(self): + self.counters[0] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def cb_cancel(self, one, two, three, test): + self.assertEqual(one, "uno") + self.assertEqual(two, "due") + self.assertEqual(three, "tre") + self.assertEqual(test, self) + self.counters[1] += 1 + return ecore.ECORE_CALLBACK_CANCEL + + def testInit(self): + self.counters = [0, 0] + + p1 = ecore.Poller(4, self.cb_renew) + p2 = ecore.Poller(2, self.cb_cancel, ecore.ECORE_POLLER_CORE, "uno", "due", three="tre", test=self) p3 = ecore.Poller(16, lambda: ecore.main_loop_quit()) @@ -35,9 +34,20 @@ class TestPoller(unittest.TestCase): ecore.main_loop_begin() - self.assertEqual(counters, [4, 1]) - self.assertEqual(params, ("uno", "due", "tre")) + # all the callback has been called? + self.assertEqual(self.counters, [4, 1]) + # not yet deleted since returned true + self.assertEqual(p1.is_deleted(), False) + p1.delete() + self.assertEqual(p1.is_deleted(), True) + del p1 + + # already deleted since returned false + self.assertEqual(p2.is_deleted(), True) + self.assertEqual(p3.is_deleted(), True) + del p2 + del p3 if __name__ == '__main__': diff --git a/tests/ecore/test_04_idler.py b/tests/ecore/test_04_idler.py index dc9b100..7480df7 100644 --- a/tests/ecore/test_04_idler.py +++ b/tests/ecore/test_04_idler.py @@ -1,48 +1,54 @@ #!/usr/bin/env python -from efl import ecore import unittest import logging - -def cb_true(n, t, a): - print("cb_true: %s %s %s" % (n, t, a)) - return True - -def cb_false(n, t, a): - print("cb_false: %s %s %s" % (n, t, a)) - return False +from efl import ecore class TestIdler(unittest.TestCase): + + def cb_renew(self, n, t, a): + self.assertEqual(n, 123) + self.assertEqual(t, "teste") + self.assertEqual(a, 456) + self.counters[0] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def cb_cancel(self, n, t, a): + self.assertEqual(n, 789) + self.assertEqual(t, "bla") + self.assertEqual(a, "something in a") + self.counters[1] += 1 + return ecore.ECORE_CALLBACK_CANCEL + def testInit(self): - i1 = ecore.idler_add(cb_true, 123, "teste", a=456) - i2 = ecore.Idler(cb_false, 789, "bla", a="something in a") + self.counters = [0, 0] + + i1 = ecore.idler_add(self.cb_renew, 123, "teste", a=456) + i2 = ecore.Idler(self.cb_cancel, 789, "bla", a="something in a") self.assertIsInstance(i1, ecore.Idler) self.assertIsInstance(i2, ecore.Idler) - before1 = i1.__repr__() - before2 = i2.__repr__() - t = ecore.timer_add(1, ecore.main_loop_quit) ecore.main_loop_begin() - after1 = i1.__repr__() - after2 = i2.__repr__() + # all the callback has been called? + self.assertTrue(self.counters[0] > 1) + self.assertTrue(self.counters[1] == 1) - self.assertEqual(before1, after1) - self.assertNotEqual(before2, after2) # already deleted - - self.assertEqual(t.is_deleted(), True) + # not yet deleted since returned true self.assertEqual(i1.is_deleted(), False) - self.assertEqual(i2.is_deleted(), True) - - i1.delete() - del t + self.assertEqual(i1.is_deleted(), True) del i1 - del i2 # already deleted since returned false + + # already deleted since returned false + self.assertEqual(i2.is_deleted(), True) + self.assertEqual(t.is_deleted(), True) + del i2 + del t if __name__ == '__main__': diff --git a/tests/ecore/test_05_idle_enterer.py b/tests/ecore/test_05_idle_enterer.py index e71863c..23d50ff 100644 --- a/tests/ecore/test_05_idle_enterer.py +++ b/tests/ecore/test_05_idle_enterer.py @@ -1,65 +1,84 @@ #!/usr/bin/env python -from efl import ecore import unittest import logging +import time - -def cb_true(n, t, a): - print("cb_true: %s %s %s" % (n, t, a)) - return True - -def cb_false(n, t, a): - print("cb_false: %s %s %s" % (n, t, a)) - return False - -def cb_idle(): - print("idle...") - return True - -def sleeper(): - import time - print("sleep 0.1s") - time.sleep(0.1) - return True +from efl import ecore class TestIdleEnterer(unittest.TestCase): - def testInit(self): - i1 = ecore.IdleEnterer(cb_true, 123, "teste", a=456) - i2 = ecore.IdleEnterer(cb_false, 789, "bla", a="something in a") + def cb_renew(self, n, t, a): + self.assertEqual(n, 123) + self.assertEqual(t, "teste") + self.assertEqual(a, 456) + self.counters[0] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def cb_cancel(self, n, t, a): + self.assertEqual(n, 789) + self.assertEqual(t, "bla") + self.assertEqual(a, "something in a") + self.counters[1] += 1 + return ecore.ECORE_CALLBACK_CANCEL + + def cb_idle(self): + self.counters[2] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def cb_sleeper(self): + time.sleep(0.1) + self.counters[3] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def testInit(self): + self.counters = [0, 0, 0, 0] + + i1 = ecore.IdleEnterer(self.cb_renew, 123, "teste", a=456) + i2 = ecore.IdleEnterer(self.cb_cancel, 789, "bla", a="something in a") self.assertIsInstance(i1, ecore.IdleEnterer) self.assertIsInstance(i2, ecore.IdleEnterer) - before1 = i1.__repr__() - before2 = i2.__repr__() - t = ecore.Timer(1, ecore.main_loop_quit) - timer = ecore.Timer(0.1, sleeper) - idler = ecore.Idler(cb_idle) + timer = ecore.Timer(0.1, self.cb_sleeper) + idler = ecore.Idler(self.cb_idle) ecore.main_loop_begin() - timer.delete() - idler.delete() - after1 = i1.__repr__() - after2 = i2.__repr__() + # all the callback has been called? + self.assertTrue(self.counters[0] > 1) + self.assertTrue(self.counters[1] == 1) + self.assertTrue(self.counters[2] > 1) + self.assertTrue(self.counters[3] > 1) - self.assertEqual(before1, after1) - self.assertNotEqual(before2, after2) # already deleted - - self.assertEqual(t.is_deleted(), True) + # "i1" not yet deleted since returned true self.assertEqual(i1.is_deleted(), False) - self.assertEqual(i2.is_deleted(), True) - i1.delete() + self.assertEqual(i1.is_deleted(), True) del i1 - del i2 # already deleted since returned false + + # "i2" already deleted since returned false + self.assertEqual(i2.is_deleted(), True) + del i2 + + # "t" already deleted since returned false + self.assertEqual(t.is_deleted(), True) del t - del timer + + # "idler" not yet deleted since returned true + self.assertEqual(idler.is_deleted(), False) + idler.delete() + self.assertEqual(idler.is_deleted(), True) del idler + + # "timer" not yet deleted since returned true + self.assertEqual(timer.is_deleted(), False) + timer.delete() + self.assertEqual(timer.is_deleted(), True) + del timer + if __name__ == '__main__': diff --git a/tests/ecore/test_06_idle_exiter.py b/tests/ecore/test_06_idle_exiter.py index 4eddbaa..399b020 100644 --- a/tests/ecore/test_06_idle_exiter.py +++ b/tests/ecore/test_06_idle_exiter.py @@ -1,65 +1,84 @@ #!/usr/bin/env python -from efl import ecore import unittest import logging +import time - -def cb_true(n, t, a): - print("cb_true: %s %s %s" % (n, t, a)) - return True - -def cb_false(n, t, a): - print("cb_false: %s %s %s" % (n, t, a)) - return False - -def cb_idle(): - print("idle...") - return True - -def sleeper(): - import time - print("sleep 0.1s") - time.sleep(0.1) - return True +from efl import ecore class TestIdleExiter(unittest.TestCase): + + def cb_renew(self, n, t, a): + self.assertEqual(n, 123) + self.assertEqual(t, "teste") + self.assertEqual(a, 456) + self.counters[0] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def cb_cancel(self, n, t, a): + self.assertEqual(n, 789) + self.assertEqual(t, "bla") + self.assertEqual(a, "something in a") + self.counters[1] += 1 + return ecore.ECORE_CALLBACK_CANCEL + + def cb_idle(self): + self.counters[2] += 1 + return ecore.ECORE_CALLBACK_RENEW + + def cb_sleeper(self): + time.sleep(0.1) + self.counters[3] += 1 + return ecore.ECORE_CALLBACK_RENEW + def testInit(self): - i1 = ecore.idle_exiter_add(cb_true, 123, "teste", a=456) - i2 = ecore.IdleExiter(cb_false, 789, "bla", a="something in a") + self.counters = [0, 0, 0, 0] + + i1 = ecore.idle_exiter_add(self.cb_renew, 123, "teste", a=456) + i2 = ecore.IdleExiter(self.cb_cancel, 789, "bla", a="something in a") self.assertIsInstance(i1, ecore.IdleExiter) self.assertIsInstance(i2, ecore.IdleExiter) - before1 = i1.__repr__() - before2 = i2.__repr__() + timer = ecore.timer_add(0.1, self.cb_sleeper) + idler = ecore.idler_add(self.cb_idle) t = ecore.timer_add(1, ecore.main_loop_quit) - timer = ecore.timer_add(0.1, sleeper) - idler = ecore.idler_add(cb_idle) - ecore.main_loop_begin() - timer.delete() - idler.delete() - after1 = i1.__repr__() - after2 = i2.__repr__() + # all the callback has been called? + self.assertTrue(self.counters[0] > 1) + self.assertTrue(self.counters[1] == 1) + self.assertTrue(self.counters[2] > 1) + self.assertTrue(self.counters[3] > 1) - self.assertEqual(before1, after1) - self.assertNotEqual(before2, after2) # already deleted - - self.assertEqual(t.is_deleted(), True) + # "i1" not yet deleted since returned true self.assertEqual(i1.is_deleted(), False) - self.assertEqual(i2.is_deleted(), True) - i1.delete() + self.assertEqual(i1.is_deleted(), True) del i1 - del i2 # already deleted since returned false + + # "i2" already deleted since returned false + self.assertEqual(i2.is_deleted(), True) + del i2 + + # "t" already deleted since returned false + self.assertEqual(t.is_deleted(), True) del t - del timer + + # "idler" not yet deleted since returned true + self.assertEqual(idler.is_deleted(), False) + idler.delete() + self.assertEqual(idler.is_deleted(), True) del idler + # "timer" not yet deleted since returned true + self.assertEqual(timer.is_deleted(), False) + timer.delete() + self.assertEqual(timer.is_deleted(), True) + del timer + if __name__ == '__main__': formatter = logging.Formatter("[%(levelname)s] %(name)s (%(filename)s: %(lineno)d) --- %(message)s") diff --git a/tests/ecore/test_07_fd_handler.py b/tests/ecore/test_07_fd_handler.py index 0dcdd85..76127be 100644 --- a/tests/ecore/test_07_fd_handler.py +++ b/tests/ecore/test_07_fd_handler.py @@ -7,33 +7,59 @@ import logging from efl import ecore -def cb_read(fd_handler, a, b): - data = os.read(fd_handler.fd, 50) - print("ready for read: %s, params: a=%s, b=%s " % (fd_handler, a, b)) - return True - -def timer_write(wfd): - print("write to fd: %s" % wfd) - os.write(wfd, b"[some data]") - return True - +NUM_WRITE = 30 class TestFdHandler(unittest.TestCase): + + def cb_read(self, fd_handler, a, b): + self.assertEqual(a, 123) + self.assertEqual(b, "xyz") + + self.assertTrue(fd_handler.can_read()) + self.assertFalse(fd_handler.can_write()) + self.assertFalse(fd_handler.has_error()) + self.assertEqual(fd_handler.fd, self.rfd) + + data = os.read(fd_handler.fd, 32) + # print("READ:", data) + self.assertEqual(data, self.sent_data) + + self.num_read += 1 + if self.num_read >= NUM_WRITE: + ecore.main_loop_quit() + + return ecore.ECORE_CALLBACK_RENEW + + def write_timer(self, wfd): + self.sent_data = b"data for the reader #%d" % self.num_write + # print("WRIT:", self.sent_data) + os.write(wfd, self.sent_data) + + self.num_write += 1 + return ecore.ECORE_CALLBACK_RENEW + def testInit(self): - rfd, wfd = os.pipe() - fdh = ecore.fd_handler_add(rfd, ecore.ECORE_FD_READ, cb_read, 123, b="xyz") + self.num_write = 0 + self.num_read = 0 + self.sent_data = None - ecore.timer_add(0.2, timer_write, wfd) + self.rfd, wfd = os.pipe() + fdh = ecore.fd_handler_add(self.rfd, ecore.ECORE_FD_READ, + self.cb_read, 123, b="xyz") - print("before: fdh=%s" % fdh) + self.assertIsInstance(fdh, ecore.FdHandler) - ecore.timer_add(1, ecore.main_loop_quit) + ecore.timer_add(0.01, self.write_timer, wfd) + t = ecore.timer_add(1.0, ecore.main_loop_quit) # just a timeout (should quit earlier) ecore.main_loop_begin() - print("main loop stopped") + t.delete() - print("after: fdh=%s" % fdh) + self.assertEqual(self.num_read, NUM_WRITE) + self.assertEqual(self.num_write, NUM_WRITE) + self.assertEqual(fdh.is_deleted(), False) fdh.delete() + self.assertEqual(fdh.is_deleted(), True) del fdh diff --git a/tests/ecore/test_08_exe.py b/tests/ecore/test_08_exe.py index b5a8635..adde85f 100644 --- a/tests/ecore/test_08_exe.py +++ b/tests/ecore/test_08_exe.py @@ -1,86 +1,130 @@ #!/usr/bin/env python -from efl import ecore + +import os import unittest import logging +from efl import ecore + + +script_path = os.path.dirname(os.path.realpath(__file__)) + class TestExe(unittest.TestCase): def testInit(self): + self.num_add = 0 + self.num_del = 0 + self.num_data = 0 + # basic read flags with line support - flags = ecore.ECORE_EXE_PIPE_READ | ecore.ECORE_EXE_PIPE_READ_LINE_BUFFERED + flags = ecore.ECORE_EXE_PIPE_READ | \ + ecore.ECORE_EXE_PIPE_READ_LINE_BUFFERED | \ + ecore.ECORE_EXE_TERM_WITH_PARENT - # simple ls -la output, monitor for add, del and data (stdout) + # EXE 1: simple ls -la output, monitor for add, del and data (stdout) def on_add(x, event, a, b, c): - print("ecore.Exe added:") - print(" exe : %s" % x) - print(" event: %s" % event) - print(" extra: arguments: a=%s, b=%s, c=%s" % (a, b, c)) - print("") - - self.assertEqual(x, exe) + self.assertEqual(x, exe1) self.assertIsInstance(event, ecore.EventExeAdd) self.assertEqual(a, 1) self.assertEqual(b, 2) self.assertEqual(c, 3) + self.num_add += 1 + return ecore.ECORE_CALLBACK_RENEW def on_del(x, event): - print("ecore.Exe deleted:") - print(" exe : %s" % x) - ecore.timer_add(1.0, ecore.main_loop_quit) + self.assertEqual(x, exe1) + self.assertIsInstance(event, ecore.EventExeDel) + self.num_del += 1 + return ecore.ECORE_CALLBACK_RENEW def on_data(x, event): - self.assertEqual(x, exe) + self.assertEqual(x, exe1) self.assertIsInstance(event, ecore.EventExeData) - print("ecore.Exe data on stdout:") - print(" Exe : %s" % repr(exe)) - print(" Event : %s" % repr(event)) - for l in event.lines: - print(" %s" % repr(l)) - print() + self.assertTrue(len(event.lines) >= 13) #we have at least 13 files here + self.num_data += 1 + return ecore.ECORE_CALLBACK_RENEW - exe = ecore.Exe("ls -la", flags) - exe.on_add_event_add(on_add, 1, c=3, b=2) - exe.on_del_event_add(on_del) - exe.on_data_event_add(on_data) + exe1 = ecore.Exe('ls -l "%s"' % script_path, flags) + exe1.on_add_event_add(on_add, 1, c=3, b=2) + exe1.on_del_event_add(on_del) + exe1.on_data_event_add(on_data) + + # EXE 2: use C-api like event handler, will catch all 3 Exe() instances + def catch_all_exe_add(event, pos1, pos2, karg1): + self.assertIsInstance(event, ecore.EventExeAdd) + self.assertEqual(pos1, "pos1") + self.assertEqual(pos2, "pos2") + self.assertEqual(karg1, "k") + self.num_add += 1 + return ecore.ECORE_CALLBACK_RENEW + + def catch_all_exe_del(event): + self.assertIsInstance(event, ecore.EventExeDel) + self.num_del += 1 + return ecore.ECORE_CALLBACK_RENEW + + def catch_all_exe_data(event): + self.assertIsInstance(event, ecore.EventExeData) + self.num_data += 1 + return ecore.ECORE_CALLBACK_RENEW + + ecore.on_exe_add_event_add(catch_all_exe_add, "pos1", "pos2", karg1="k") + ecore.on_exe_del_event_add(catch_all_exe_del) + ecore.on_exe_data_event_add(catch_all_exe_data) + exe2 = ecore.Exe('uname -a', flags) - - # use C-api like event handler, will catch all 3 Exe() instances - def catch_all_exe_add(event): - print("") - print(">>> EXE ADDED: %s" % event) - print("") - return 1 - - ecore.on_exe_add_event_add(catch_all_exe_add) - ecore.Exe("ls -l /", flags) - - - - # start cat, send data to it, then read, then send again, then kill it + # EXE 3: start "cat", send data to it, then read, send again and kill def on_cat_pipe_add(x, event): + self.assertEqual(x, exe3) + self.assertIsInstance(event, ecore.EventExeAdd) x.send(b"123\nabc\nxyz\n") + self.num_add += 1 + return ecore.ECORE_CALLBACK_RENEW + + def on_cat_pipe_del(x, event): + self.assertEqual(x, exe3) + self.assertIsInstance(event, ecore.EventExeDel) + + self.num_del += 1 + return ecore.ECORE_CALLBACK_RENEW def on_cat_pipe_data(x, event): + self.assertEqual(x, exe3) + self.assertIsInstance(event, ecore.EventExeData) self.assertEqual(event.lines, ["123", "abc", "xyz"]) x.on_data_event_del(on_cat_pipe_data) + self.num_data += 1 + return ecore.ECORE_CALLBACK_CANCEL def on_cat_pipe_data2(x, event): - print("cat pipe output:") - print("\n".join(event.lines)) - print("") + self.assertEqual(x, exe3) + self.assertIsInstance(event, ecore.EventExeData) x.send("some\nmore\nlines!\n") if event.lines and event.lines[-1] == "lines!": x.on_data_event_del(on_cat_pipe_data2) - x.kill() # otherwise it will stay there forever (will be detached) + x.kill() + self.num_data += 1 + return ecore.ECORE_CALLBACK_RENEW - cat_pipe = ecore.Exe("cat", flags | ecore.ECORE_EXE_PIPE_WRITE) - cat_pipe.on_add_event_add(on_cat_pipe_add) - cat_pipe.on_data_event_add(on_cat_pipe_data) - cat_pipe.on_data_event_add(on_cat_pipe_data2) + exe3 = ecore.Exe("cat", flags | ecore.ECORE_EXE_PIPE_WRITE) + exe3.on_add_event_add(on_cat_pipe_add) + exe3.on_del_event_add(on_cat_pipe_del) + exe3.on_data_event_add(on_cat_pipe_data) + exe3.on_data_event_add(on_cat_pipe_data2) + + # start everything (assuming 2 secs is enough for ls) + t = ecore.timer_add(2, ecore.main_loop_quit) ecore.main_loop_begin() + t.delete() + + + # all the callback has been called? + self.assertEqual(self.num_add, 5) + self.assertEqual(self.num_del, 5) + self.assertEqual(self.num_data, 8) if __name__ == '__main__': diff --git a/tests/ecore/test_09_file_download.py b/tests/ecore/test_09_file_download.py index edd88b0..8e60928 100644 --- a/tests/ecore/test_09_file_download.py +++ b/tests/ecore/test_09_file_download.py @@ -1,35 +1,58 @@ #!/usr/bin/env python -from efl import ecore import unittest import os import logging +from efl import ecore -def _completion_cb(dest, status): - print("Download Finished: file=%s, status=%s" % (dest, status)) - ecore.main_loop_quit() - -def _progress_cb(dest, dltotal, dlnow, ultotal, ulnow): - print("Download Progress: file=%s, dltotal=%d, dlnow=%d, ultotal=%d, ulnow=%d" % - (dest, dltotal, dlnow, ultotal, ulnow)) - return ecore.ECORE_FILE_PROGRESS_CONTINUE +URL = "http://www.google.com" +DST = "/tmp/ecore_dwnl_test.html" class TestFileDownload(unittest.TestCase): + + def progress_cb(self, dest, dltotal, dlnow, ultotal, ulnow): + # print("Download Progress: file=%s, dltotal=%d, dlnow=%d, ultotal=%d, ulnow=%d" % + # (dest, dltotal, dlnow, ultotal, ulnow)) + self.assertEqual(dest, DST) + self.dltotal = dltotal + return ecore.ECORE_FILE_PROGRESS_CONTINUE + + def completion_cb(self, dest, status): + # print("Download Finished: file=%s, status=%s" % (dest, status)) + self.assertEqual(dest, DST) + self.assertEqual(status, 200) + self.completed = True + self.complete_status = status + ecore.main_loop_quit() + def testInit(self): - dst = "/tmp/ecore_dwnl_test.html" - if os.path.exists(dst): - os.remove(dst) + self.dltotal = 0 + self.completed = False + self.complete_status = 0 + + if os.path.exists(DST): + os.remove(DST) self.assertEqual(ecore.file_download_protocol_available('http://'), True) - ecore.FileDownload("http://www.google.com", dst, - completion_cb = _completion_cb, - progress_cb = _progress_cb) + ecore.FileDownload(URL, DST, + completion_cb = self.completion_cb, + progress_cb = self.progress_cb) + t = ecore.timer_add(5, ecore.main_loop_quit) # just a timeout (should be faster) ecore.main_loop_begin() + t.delete() + self.assertTrue(self.completed) + self.assertTrue(self.dltotal > 0) + self.assertEqual(self.complete_status, 200) + self.assertTrue(os.path.exists(DST)) + # This doesn't work ... should it ?? + # self.assertEqual(os.path.getsize(DST), self.dltotal) + self.assertTrue(os.path.getsize(DST) > 0) + os.remove(DST) if __name__ == '__main__': diff --git a/tests/ecore/test_10_file_monitor.py b/tests/ecore/test_10_file_monitor.py index 05af9eb..e147da0 100644 --- a/tests/ecore/test_10_file_monitor.py +++ b/tests/ecore/test_10_file_monitor.py @@ -1,84 +1,88 @@ #!/usr/bin/env python -from efl import ecore import unittest -import os import tempfile import logging +import os - -counters = [0, 0, 0, 0, 0, 0 , 0, 0] - -def monitor_cb(event, path, tmp_path): - """ for reference: - if event == ecore.ECORE_FILE_EVENT_MODIFIED: - print("EVENT_MODIFIED: '%s'" % path) - elif event == ecore.ECORE_FILE_EVENT_CLOSED: - print("EVENT_CLOSED: '%s'" % path) - elif event == ecore.ECORE_FILE_EVENT_CREATED_FILE: - print("ECORE_FILE_EVENT_CREATED_FILE: '%s'" % path) - elif event == ecore.ECORE_FILE_EVENT_CREATED_DIRECTORY: - print("ECORE_FILE_EVENT_CREATED_DIRECTORY: '%s'" % path) - elif event == ecore.ECORE_FILE_EVENT_DELETED_FILE: - print("ECORE_FILE_EVENT_DELETED_FILE: '%s'" % path) - elif event == ecore.ECORE_FILE_EVENT_DELETED_DIRECTORY: - print("ECORE_FILE_EVENT_DELETED_DIRECTORY: '%s'" % path) - elif event == ecore.ECORE_FILE_EVENT_DELETED_SELF: - print("ECORE_FILE_EVENT_DELETED_SELF: '%s'" % path) - """ - counters[event] += 1 - -def do_stuff(tmp_path, fm): - folder1 = os.path.join(tmp_path, "folder1") - folder2 = os.path.join(tmp_path, "folder2") - file1 = os.path.join(tmp_path, "file1") - file2 = os.path.join(tmp_path, "file2") - - # this should trigger two ECORE_FILE_EVENT_CREATED_DIRECTORY - os.mkdir(folder1) - os.mkdir(folder2) - - # this should trigger two ECORE_FILE_EVENT_DELETED_DIRECTORY - os.rmdir(folder1) - os.rmdir(folder2) - - # this should trigger two ECORE_FILE_EVENT_CREATED_FILE - fp1 = open(file1, 'a') - fp2 = open(file2, 'a') - - # this should trigger two ECORE_FILE_EVENT_MODIFIED - fp1.write("nothing to say") - fp2.write("nothing to say") - - # this should trigger two ECORE_FILE_EVENT_CLOSED - fp1.close() - fp2.close() - - # this should trigger two ECORE_FILE_EVENT_DELETED_FILE - os.remove(file1) - os.remove(file2) - - # this should trigger one ECORE_FILE_EVENT_DELETED_SELF !!! we get two - os.rmdir(tmp_path) +from efl import ecore class TestFileMonitor(unittest.TestCase): + + def monitor_cb(self, event, path, tmp_path): + """ for reference: + if event == ecore.ECORE_FILE_EVENT_MODIFIED: + print("EVENT_MODIFIED: '%s'" % path) + elif event == ecore.ECORE_FILE_EVENT_CLOSED: + print("EVENT_CLOSED: '%s'" % path) + elif event == ecore.ECORE_FILE_EVENT_CREATED_FILE: + print("ECORE_FILE_EVENT_CREATED_FILE: '%s'" % path) + elif event == ecore.ECORE_FILE_EVENT_CREATED_DIRECTORY: + print("ECORE_FILE_EVENT_CREATED_DIRECTORY: '%s'" % path) + elif event == ecore.ECORE_FILE_EVENT_DELETED_FILE: + print("ECORE_FILE_EVENT_DELETED_FILE: '%s'" % path) + elif event == ecore.ECORE_FILE_EVENT_DELETED_DIRECTORY: + print("ECORE_FILE_EVENT_DELETED_DIRECTORY: '%s'" % path) + elif event == ecore.ECORE_FILE_EVENT_DELETED_SELF: + print("ECORE_FILE_EVENT_DELETED_SELF: '%s'" % path) + """ + self.counters[event] += 1 + + def do_stuff(self, tmp_path, fm): + folder1 = os.path.join(tmp_path, "folder1") + folder2 = os.path.join(tmp_path, "folder2") + file1 = os.path.join(tmp_path, "file1") + file2 = os.path.join(tmp_path, "file2") + + # this should trigger two ECORE_FILE_EVENT_CREATED_DIRECTORY + os.mkdir(folder1) + os.mkdir(folder2) + + # this should trigger two ECORE_FILE_EVENT_DELETED_DIRECTORY + os.rmdir(folder1) + os.rmdir(folder2) + + # this should trigger two ECORE_FILE_EVENT_CREATED_FILE + fp1 = open(file1, 'a') + fp2 = open(file2, 'a') + + # this should trigger two ECORE_FILE_EVENT_MODIFIED + fp1.write("nothing to say") + fp2.write("nothing to say") + + # this should trigger two ECORE_FILE_EVENT_CLOSED + fp1.close() + fp2.close() + + # this should trigger two ECORE_FILE_EVENT_DELETED_FILE + os.remove(file1) + os.remove(file2) + + # this should trigger one ECORE_FILE_EVENT_DELETED_SELF !!! we get two + os.rmdir(tmp_path) + + return ecore.ECORE_CALLBACK_CANCEL + def testInit(self): + self. counters = [0, 0, 0, 0, 0, 0 , 0, 0] + path = tempfile.mkdtemp() - fm = ecore.FileMonitor(path, monitor_cb, path) + fm = ecore.FileMonitor(path, self.monitor_cb, path) self.assertIsInstance(fm, ecore.FileMonitor) self.assertEqual(fm.path, path) - ecore.Timer(0.1, do_stuff, path, fm) - ecore.Timer(1.0, lambda: ecore.main_loop_quit()) - + ecore.Timer(0.01, self.do_stuff, path, fm) + t = ecore.Timer(1.0, ecore.main_loop_quit) ecore.main_loop_begin() + t.delete() + self.assertEqual(fm.path, path) fm.delete() # FIXME: we receive two ECORE_FILE_EVENT_DELETED_SELF, it's wrong # should be [0, 2, 2, 2, 2, 1, 2, 2] - self.assertEqual(counters, [0, 2, 2, 2, 2, 2, 2, 2]) + self.assertEqual(self.counters, [0, 2, 2, 2, 2, 2, 2, 2]) if __name__ == '__main__': diff --git a/tests/ecore/test_11_con.py b/tests/ecore/test_11_con.py index 1a6208f..d0f9b21 100644 --- a/tests/ecore/test_11_con.py +++ b/tests/ecore/test_11_con.py @@ -8,6 +8,8 @@ import logging from efl import ecore, ecore_con +TIMEOUT = 5.0 # seconds + class TestCon(unittest.TestCase): def testLookup(self): @@ -16,11 +18,18 @@ class TestCon(unittest.TestCase): self.assertEqual(ip, '8.8.8.8') self.assertEqual(arg1, "arg1") self.assertEqual(my_karg, 1234) + self.complete = True ecore.main_loop_quit() + self.complete = False ecore_con.Lookup('google-public-dns-a.google.com', _dns_complete, "arg1", my_karg=1234) + + t = ecore.Timer(TIMEOUT, ecore.main_loop_quit) ecore.main_loop_begin() + t.delete() + + self.assertTrue(self.complete) def testUrl(self): self.complete_counter = 0 @@ -64,7 +73,9 @@ class TestCon(unittest.TestCase): self.assertTrue(u.get()) #perform the GET request + t = ecore.Timer(TIMEOUT, ecore.main_loop_quit) ecore.main_loop_begin() + t.delete() self.assertEqual(u.status_code, 200) # assume net is ok self.assertEqual(self.complete_counter, 16) @@ -76,7 +87,6 @@ class TestCon(unittest.TestCase): u.delete() def testUrlDelete(self): - self.test_url1 = 'http://www.example.com' self.test_url2 = 'http://www.google.com' self.complete_counter = 0 @@ -85,11 +95,15 @@ class TestCon(unittest.TestCase): self.assertIsInstance(event, ecore_con.EventUrlComplete) self.assertEqual(event.url.url, self.test_url1) self.complete_counter += 1 + if self.complete_counter >= 11: + ecore.main_loop_quit() def _on_complete2(event): self.assertIsInstance(event, ecore_con.EventUrlComplete) self.assertEqual(event.url.url, self.test_url2) self.complete_counter += 10 + if self.complete_counter >= 11: + ecore.main_loop_quit() u1 = ecore_con.Url(self.test_url1) u1.on_complete_event_add(_on_complete1) @@ -105,8 +119,9 @@ class TestCon(unittest.TestCase): self.assertTrue(u1.get()) #perform the GET request self.assertTrue(u2.get()) #perform the GET request - ecore.Timer(2.5, lambda: ecore.main_loop_quit()) + t = ecore.Timer(TIMEOUT, ecore.main_loop_quit) ecore.main_loop_begin() + t.delete() self.assertEqual(u1.status_code, 200) # assume net is ok self.assertEqual(u2.status_code, 200) # assume net is ok @@ -117,8 +132,10 @@ class TestCon(unittest.TestCase): def testUrlToFile(self): self.test_url = 'http://www.example.com' + self.complete = False def _on_complete(event): + self.complete = True ecore.main_loop_quit() fd, path = tempfile.mkstemp() @@ -126,7 +143,11 @@ class TestCon(unittest.TestCase): u.on_complete_event_add(_on_complete) u.get() + t = ecore.Timer(TIMEOUT, ecore.main_loop_quit) ecore.main_loop_begin() + t.delete() + + self.assertEqual(self.complete, True) self.assertEqual(u.status_code, 200) # assume net is ok self.assertEqual(os.path.getsize(path), u.received_bytes) os.unlink(path) @@ -147,10 +168,12 @@ class TestCon(unittest.TestCase): u.on_complete_event_add(_on_complete) # u.post(self.data_to_post, "multipart/form-data") u.post(self.data_to_post, "text/txt") - - ecore.main_loop_begin() - self.assertEqual(u.status_code, 200) # assume net is ok + t = ecore.Timer(TIMEOUT, ecore.main_loop_quit) + ecore.main_loop_begin() + t.delete() + + self.assertEqual(u.status_code, 200) # assume net is ok u.delete() if __name__ == '__main__':