Improve the ecore test suite

no more print and lots more asserts
This commit is contained in:
Davide Andreoli 2016-08-14 13:02:19 +02:00
parent d6fc38ea59
commit 4bd421562d
11 changed files with 483 additions and 290 deletions

View File

@ -1,24 +1,35 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import logging
def cb(n, t, a):
print("cb data: %s %s %s" % (n, t, a))
return True
def cb_false(n, t, a):
print("cb data false: %s %s %s" % (n, t, a))
return False
from efl import ecore
class TestTimer(unittest.TestCase):
def cb_renew1(self, n, t, a):
self.assertEqual(n, 123)
self.assertEqual(t, "teste")
self.assertEqual(a, 456)
return ecore.ECORE_CALLBACK_RENEW
def cb_renew2(self, n, t, a):
self.assertEqual(n, 789)
self.assertEqual(t, "bla")
self.assertEqual(a, "something in a")
return ecore.ECORE_CALLBACK_RENEW
def cb_cancel(self, n, t, a):
self.assertEqual(n, 666)
self.assertEqual(t, "bla")
self.assertEqual(a, "something else in a")
return ecore.ECORE_CALLBACK_CANCEL
def testInit(self):
t1 = ecore.timer_add(0.2, cb, 123, "teste", a=456)
t2 = ecore.Timer(0.5, cb, 789, "bla", a="something in a")
t3 = ecore.Timer(0.4, cb_false, 666, "bla", a="something else in a")
t1 = ecore.timer_add(0.2, self.cb_renew1, 123, "teste", a=456)
t2 = ecore.Timer(0.5, self.cb_renew2, 789, "bla", a="something in a")
t3 = ecore.Timer(0.4, self.cb_cancel, 666, "bla", a="something else in a")
t4 = ecore.timer_add(1, ecore.main_loop_quit)
self.assertIsInstance(t1, ecore.Timer)
@ -49,8 +60,10 @@ class TestTimer(unittest.TestCase):
self.assertEqual(t4.is_deleted(), True)
t1.delete()
self.assertEqual(t1.is_deleted(), True)
del t1
t2.delete()
self.assertEqual(t2.is_deleted(), True)
del t2
del t3 # already deleted since returned false
del t4 # already deleted since returned false

View File

@ -1,26 +1,31 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import logging
def cb_true(n, t, a):
print("cb_true: %s %s %s" % (n, t, a))
return True
def cb_false(n, t, a):
print("cb_false: %s %s %s" % (n, t, a))
return False
from efl import ecore
class TestAnimator(unittest.TestCase):
def cb_renew(self, n, t, a):
self.assertEqual(n, 123)
self.assertEqual(t, "teste")
self.assertEqual(a, 456)
return ecore.ECORE_CALLBACK_RENEW
def cb_cancel(self, n, t, a):
self.assertEqual(n, 789)
self.assertEqual(t, "bla")
self.assertEqual(a, "something in a")
return ecore.ECORE_CALLBACK_CANCEL
def testInit(self):
ecore.animator_frametime_set(1.0/24.0)
self.assertEqual(ecore.animator_frametime_get(), 1.0/24.0)
a1 = ecore.animator_add(cb_true, 123, "teste", a=456)
a2 = ecore.Animator(cb_false, 789, "bla", a="something in a")
a1 = ecore.animator_add(self.cb_renew, 123, "teste", a=456)
a2 = ecore.Animator(self.cb_cancel, 789, "bla", a="something in a")
a3 = ecore.animator_add(ecore.main_loop_quit)
self.assertIsInstance(a1, ecore.Animator)
@ -46,6 +51,7 @@ class TestAnimator(unittest.TestCase):
self.assertEqual(a3.is_deleted(), True)
a1.delete()
self.assertEqual(a1.is_deleted(), True)
del a1
del a2 # already deleted since returned false
del a3 # already deleted since returned false

View File

@ -1,31 +1,30 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import logging
counters = [0, 0]
params = None
def poller_cb():
counters[0] += 1
return True
def poller_cb2(one, two, three, test):
global params
params = (one, two, three)
counters[1] += 1
return False
from efl import ecore
class TestPoller(unittest.TestCase):
def testInit(self):
p1 = ecore.Poller(4, poller_cb)
p2 = ecore.Poller(2, poller_cb2, ecore.ECORE_POLLER_CORE,
def cb_renew(self):
self.counters[0] += 1
return ecore.ECORE_CALLBACK_RENEW
def cb_cancel(self, one, two, three, test):
self.assertEqual(one, "uno")
self.assertEqual(two, "due")
self.assertEqual(three, "tre")
self.assertEqual(test, self)
self.counters[1] += 1
return ecore.ECORE_CALLBACK_CANCEL
def testInit(self):
self.counters = [0, 0]
p1 = ecore.Poller(4, self.cb_renew)
p2 = ecore.Poller(2, self.cb_cancel, ecore.ECORE_POLLER_CORE,
"uno", "due", three="tre", test=self)
p3 = ecore.Poller(16, lambda: ecore.main_loop_quit())
@ -35,9 +34,20 @@ class TestPoller(unittest.TestCase):
ecore.main_loop_begin()
self.assertEqual(counters, [4, 1])
self.assertEqual(params, ("uno", "due", "tre"))
# all the callback has been called?
self.assertEqual(self.counters, [4, 1])
# not yet deleted since returned true
self.assertEqual(p1.is_deleted(), False)
p1.delete()
self.assertEqual(p1.is_deleted(), True)
del p1
# already deleted since returned false
self.assertEqual(p2.is_deleted(), True)
self.assertEqual(p3.is_deleted(), True)
del p2
del p3
if __name__ == '__main__':

View File

@ -1,48 +1,54 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import logging
def cb_true(n, t, a):
print("cb_true: %s %s %s" % (n, t, a))
return True
def cb_false(n, t, a):
print("cb_false: %s %s %s" % (n, t, a))
return False
from efl import ecore
class TestIdler(unittest.TestCase):
def cb_renew(self, n, t, a):
self.assertEqual(n, 123)
self.assertEqual(t, "teste")
self.assertEqual(a, 456)
self.counters[0] += 1
return ecore.ECORE_CALLBACK_RENEW
def cb_cancel(self, n, t, a):
self.assertEqual(n, 789)
self.assertEqual(t, "bla")
self.assertEqual(a, "something in a")
self.counters[1] += 1
return ecore.ECORE_CALLBACK_CANCEL
def testInit(self):
i1 = ecore.idler_add(cb_true, 123, "teste", a=456)
i2 = ecore.Idler(cb_false, 789, "bla", a="something in a")
self.counters = [0, 0]
i1 = ecore.idler_add(self.cb_renew, 123, "teste", a=456)
i2 = ecore.Idler(self.cb_cancel, 789, "bla", a="something in a")
self.assertIsInstance(i1, ecore.Idler)
self.assertIsInstance(i2, ecore.Idler)
before1 = i1.__repr__()
before2 = i2.__repr__()
t = ecore.timer_add(1, ecore.main_loop_quit)
ecore.main_loop_begin()
after1 = i1.__repr__()
after2 = i2.__repr__()
# all the callback has been called?
self.assertTrue(self.counters[0] > 1)
self.assertTrue(self.counters[1] == 1)
self.assertEqual(before1, after1)
self.assertNotEqual(before2, after2) # already deleted
self.assertEqual(t.is_deleted(), True)
# not yet deleted since returned true
self.assertEqual(i1.is_deleted(), False)
self.assertEqual(i2.is_deleted(), True)
i1.delete()
del t
self.assertEqual(i1.is_deleted(), True)
del i1
del i2 # already deleted since returned false
# already deleted since returned false
self.assertEqual(i2.is_deleted(), True)
self.assertEqual(t.is_deleted(), True)
del i2
del t
if __name__ == '__main__':

View File

@ -1,65 +1,84 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import logging
import time
def cb_true(n, t, a):
print("cb_true: %s %s %s" % (n, t, a))
return True
def cb_false(n, t, a):
print("cb_false: %s %s %s" % (n, t, a))
return False
def cb_idle():
print("idle...")
return True
def sleeper():
import time
print("sleep 0.1s")
time.sleep(0.1)
return True
from efl import ecore
class TestIdleEnterer(unittest.TestCase):
def testInit(self):
i1 = ecore.IdleEnterer(cb_true, 123, "teste", a=456)
i2 = ecore.IdleEnterer(cb_false, 789, "bla", a="something in a")
def cb_renew(self, n, t, a):
self.assertEqual(n, 123)
self.assertEqual(t, "teste")
self.assertEqual(a, 456)
self.counters[0] += 1
return ecore.ECORE_CALLBACK_RENEW
def cb_cancel(self, n, t, a):
self.assertEqual(n, 789)
self.assertEqual(t, "bla")
self.assertEqual(a, "something in a")
self.counters[1] += 1
return ecore.ECORE_CALLBACK_CANCEL
def cb_idle(self):
self.counters[2] += 1
return ecore.ECORE_CALLBACK_RENEW
def cb_sleeper(self):
time.sleep(0.1)
self.counters[3] += 1
return ecore.ECORE_CALLBACK_RENEW
def testInit(self):
self.counters = [0, 0, 0, 0]
i1 = ecore.IdleEnterer(self.cb_renew, 123, "teste", a=456)
i2 = ecore.IdleEnterer(self.cb_cancel, 789, "bla", a="something in a")
self.assertIsInstance(i1, ecore.IdleEnterer)
self.assertIsInstance(i2, ecore.IdleEnterer)
before1 = i1.__repr__()
before2 = i2.__repr__()
t = ecore.Timer(1, ecore.main_loop_quit)
timer = ecore.Timer(0.1, sleeper)
idler = ecore.Idler(cb_idle)
timer = ecore.Timer(0.1, self.cb_sleeper)
idler = ecore.Idler(self.cb_idle)
ecore.main_loop_begin()
timer.delete()
idler.delete()
after1 = i1.__repr__()
after2 = i2.__repr__()
# all the callback has been called?
self.assertTrue(self.counters[0] > 1)
self.assertTrue(self.counters[1] == 1)
self.assertTrue(self.counters[2] > 1)
self.assertTrue(self.counters[3] > 1)
self.assertEqual(before1, after1)
self.assertNotEqual(before2, after2) # already deleted
self.assertEqual(t.is_deleted(), True)
# "i1" not yet deleted since returned true
self.assertEqual(i1.is_deleted(), False)
self.assertEqual(i2.is_deleted(), True)
i1.delete()
self.assertEqual(i1.is_deleted(), True)
del i1
del i2 # already deleted since returned false
# "i2" already deleted since returned false
self.assertEqual(i2.is_deleted(), True)
del i2
# "t" already deleted since returned false
self.assertEqual(t.is_deleted(), True)
del t
del timer
# "idler" not yet deleted since returned true
self.assertEqual(idler.is_deleted(), False)
idler.delete()
self.assertEqual(idler.is_deleted(), True)
del idler
# "timer" not yet deleted since returned true
self.assertEqual(timer.is_deleted(), False)
timer.delete()
self.assertEqual(timer.is_deleted(), True)
del timer
if __name__ == '__main__':

View File

@ -1,65 +1,84 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import logging
import time
def cb_true(n, t, a):
print("cb_true: %s %s %s" % (n, t, a))
return True
def cb_false(n, t, a):
print("cb_false: %s %s %s" % (n, t, a))
return False
def cb_idle():
print("idle...")
return True
def sleeper():
import time
print("sleep 0.1s")
time.sleep(0.1)
return True
from efl import ecore
class TestIdleExiter(unittest.TestCase):
def cb_renew(self, n, t, a):
self.assertEqual(n, 123)
self.assertEqual(t, "teste")
self.assertEqual(a, 456)
self.counters[0] += 1
return ecore.ECORE_CALLBACK_RENEW
def cb_cancel(self, n, t, a):
self.assertEqual(n, 789)
self.assertEqual(t, "bla")
self.assertEqual(a, "something in a")
self.counters[1] += 1
return ecore.ECORE_CALLBACK_CANCEL
def cb_idle(self):
self.counters[2] += 1
return ecore.ECORE_CALLBACK_RENEW
def cb_sleeper(self):
time.sleep(0.1)
self.counters[3] += 1
return ecore.ECORE_CALLBACK_RENEW
def testInit(self):
i1 = ecore.idle_exiter_add(cb_true, 123, "teste", a=456)
i2 = ecore.IdleExiter(cb_false, 789, "bla", a="something in a")
self.counters = [0, 0, 0, 0]
i1 = ecore.idle_exiter_add(self.cb_renew, 123, "teste", a=456)
i2 = ecore.IdleExiter(self.cb_cancel, 789, "bla", a="something in a")
self.assertIsInstance(i1, ecore.IdleExiter)
self.assertIsInstance(i2, ecore.IdleExiter)
before1 = i1.__repr__()
before2 = i2.__repr__()
timer = ecore.timer_add(0.1, self.cb_sleeper)
idler = ecore.idler_add(self.cb_idle)
t = ecore.timer_add(1, ecore.main_loop_quit)
timer = ecore.timer_add(0.1, sleeper)
idler = ecore.idler_add(cb_idle)
ecore.main_loop_begin()
timer.delete()
idler.delete()
after1 = i1.__repr__()
after2 = i2.__repr__()
# all the callback has been called?
self.assertTrue(self.counters[0] > 1)
self.assertTrue(self.counters[1] == 1)
self.assertTrue(self.counters[2] > 1)
self.assertTrue(self.counters[3] > 1)
self.assertEqual(before1, after1)
self.assertNotEqual(before2, after2) # already deleted
self.assertEqual(t.is_deleted(), True)
# "i1" not yet deleted since returned true
self.assertEqual(i1.is_deleted(), False)
self.assertEqual(i2.is_deleted(), True)
i1.delete()
self.assertEqual(i1.is_deleted(), True)
del i1
del i2 # already deleted since returned false
# "i2" already deleted since returned false
self.assertEqual(i2.is_deleted(), True)
del i2
# "t" already deleted since returned false
self.assertEqual(t.is_deleted(), True)
del t
del timer
# "idler" not yet deleted since returned true
self.assertEqual(idler.is_deleted(), False)
idler.delete()
self.assertEqual(idler.is_deleted(), True)
del idler
# "timer" not yet deleted since returned true
self.assertEqual(timer.is_deleted(), False)
timer.delete()
self.assertEqual(timer.is_deleted(), True)
del timer
if __name__ == '__main__':
formatter = logging.Formatter("[%(levelname)s] %(name)s (%(filename)s: %(lineno)d) --- %(message)s")

View File

@ -7,33 +7,59 @@ import logging
from efl import ecore
def cb_read(fd_handler, a, b):
data = os.read(fd_handler.fd, 50)
print("ready for read: %s, params: a=%s, b=%s " % (fd_handler, a, b))
return True
def timer_write(wfd):
print("write to fd: %s" % wfd)
os.write(wfd, b"[some data]")
return True
NUM_WRITE = 30
class TestFdHandler(unittest.TestCase):
def cb_read(self, fd_handler, a, b):
self.assertEqual(a, 123)
self.assertEqual(b, "xyz")
self.assertTrue(fd_handler.can_read())
self.assertFalse(fd_handler.can_write())
self.assertFalse(fd_handler.has_error())
self.assertEqual(fd_handler.fd, self.rfd)
data = os.read(fd_handler.fd, 32)
# print("READ:", data)
self.assertEqual(data, self.sent_data)
self.num_read += 1
if self.num_read >= NUM_WRITE:
ecore.main_loop_quit()
return ecore.ECORE_CALLBACK_RENEW
def write_timer(self, wfd):
self.sent_data = b"data for the reader #%d" % self.num_write
# print("WRIT:", self.sent_data)
os.write(wfd, self.sent_data)
self.num_write += 1
return ecore.ECORE_CALLBACK_RENEW
def testInit(self):
rfd, wfd = os.pipe()
fdh = ecore.fd_handler_add(rfd, ecore.ECORE_FD_READ, cb_read, 123, b="xyz")
self.num_write = 0
self.num_read = 0
self.sent_data = None
ecore.timer_add(0.2, timer_write, wfd)
self.rfd, wfd = os.pipe()
fdh = ecore.fd_handler_add(self.rfd, ecore.ECORE_FD_READ,
self.cb_read, 123, b="xyz")
print("before: fdh=%s" % fdh)
self.assertIsInstance(fdh, ecore.FdHandler)
ecore.timer_add(1, ecore.main_loop_quit)
ecore.timer_add(0.01, self.write_timer, wfd)
t = ecore.timer_add(1.0, ecore.main_loop_quit) # just a timeout (should quit earlier)
ecore.main_loop_begin()
print("main loop stopped")
t.delete()
print("after: fdh=%s" % fdh)
self.assertEqual(self.num_read, NUM_WRITE)
self.assertEqual(self.num_write, NUM_WRITE)
self.assertEqual(fdh.is_deleted(), False)
fdh.delete()
self.assertEqual(fdh.is_deleted(), True)
del fdh

View File

@ -1,86 +1,130 @@
#!/usr/bin/env python
from efl import ecore
import os
import unittest
import logging
from efl import ecore
script_path = os.path.dirname(os.path.realpath(__file__))
class TestExe(unittest.TestCase):
def testInit(self):
self.num_add = 0
self.num_del = 0
self.num_data = 0
# basic read flags with line support
flags = ecore.ECORE_EXE_PIPE_READ | ecore.ECORE_EXE_PIPE_READ_LINE_BUFFERED
flags = ecore.ECORE_EXE_PIPE_READ | \
ecore.ECORE_EXE_PIPE_READ_LINE_BUFFERED | \
ecore.ECORE_EXE_TERM_WITH_PARENT
# simple ls -la output, monitor for add, del and data (stdout)
# EXE 1: simple ls -la output, monitor for add, del and data (stdout)
def on_add(x, event, a, b, c):
print("ecore.Exe added:")
print(" exe : %s" % x)
print(" event: %s" % event)
print(" extra: arguments: a=%s, b=%s, c=%s" % (a, b, c))
print("")
self.assertEqual(x, exe)
self.assertEqual(x, exe1)
self.assertIsInstance(event, ecore.EventExeAdd)
self.assertEqual(a, 1)
self.assertEqual(b, 2)
self.assertEqual(c, 3)
self.num_add += 1
return ecore.ECORE_CALLBACK_RENEW
def on_del(x, event):
print("ecore.Exe deleted:")
print(" exe : %s" % x)
ecore.timer_add(1.0, ecore.main_loop_quit)
self.assertEqual(x, exe1)
self.assertIsInstance(event, ecore.EventExeDel)
self.num_del += 1
return ecore.ECORE_CALLBACK_RENEW
def on_data(x, event):
self.assertEqual(x, exe)
self.assertEqual(x, exe1)
self.assertIsInstance(event, ecore.EventExeData)
print("ecore.Exe data on stdout:")
print(" Exe : %s" % repr(exe))
print(" Event : %s" % repr(event))
for l in event.lines:
print(" %s" % repr(l))
print()
self.assertTrue(len(event.lines) >= 13) #we have at least 13 files here
self.num_data += 1
return ecore.ECORE_CALLBACK_RENEW
exe = ecore.Exe("ls -la", flags)
exe.on_add_event_add(on_add, 1, c=3, b=2)
exe.on_del_event_add(on_del)
exe.on_data_event_add(on_data)
exe1 = ecore.Exe('ls -l "%s"' % script_path, flags)
exe1.on_add_event_add(on_add, 1, c=3, b=2)
exe1.on_del_event_add(on_del)
exe1.on_data_event_add(on_data)
# EXE 2: use C-api like event handler, will catch all 3 Exe() instances
def catch_all_exe_add(event, pos1, pos2, karg1):
self.assertIsInstance(event, ecore.EventExeAdd)
self.assertEqual(pos1, "pos1")
self.assertEqual(pos2, "pos2")
self.assertEqual(karg1, "k")
self.num_add += 1
return ecore.ECORE_CALLBACK_RENEW
def catch_all_exe_del(event):
self.assertIsInstance(event, ecore.EventExeDel)
self.num_del += 1
return ecore.ECORE_CALLBACK_RENEW
def catch_all_exe_data(event):
self.assertIsInstance(event, ecore.EventExeData)
self.num_data += 1
return ecore.ECORE_CALLBACK_RENEW
ecore.on_exe_add_event_add(catch_all_exe_add, "pos1", "pos2", karg1="k")
ecore.on_exe_del_event_add(catch_all_exe_del)
ecore.on_exe_data_event_add(catch_all_exe_data)
exe2 = ecore.Exe('uname -a', flags)
# use C-api like event handler, will catch all 3 Exe() instances
def catch_all_exe_add(event):
print("")
print(">>> EXE ADDED: %s" % event)
print("")
return 1
ecore.on_exe_add_event_add(catch_all_exe_add)
ecore.Exe("ls -l /", flags)
# start cat, send data to it, then read, then send again, then kill it
# EXE 3: start "cat", send data to it, then read, send again and kill
def on_cat_pipe_add(x, event):
self.assertEqual(x, exe3)
self.assertIsInstance(event, ecore.EventExeAdd)
x.send(b"123\nabc\nxyz\n")
self.num_add += 1
return ecore.ECORE_CALLBACK_RENEW
def on_cat_pipe_del(x, event):
self.assertEqual(x, exe3)
self.assertIsInstance(event, ecore.EventExeDel)
self.num_del += 1
return ecore.ECORE_CALLBACK_RENEW
def on_cat_pipe_data(x, event):
self.assertEqual(x, exe3)
self.assertIsInstance(event, ecore.EventExeData)
self.assertEqual(event.lines, ["123", "abc", "xyz"])
x.on_data_event_del(on_cat_pipe_data)
self.num_data += 1
return ecore.ECORE_CALLBACK_CANCEL
def on_cat_pipe_data2(x, event):
print("cat pipe output:")
print("\n".join(event.lines))
print("")
self.assertEqual(x, exe3)
self.assertIsInstance(event, ecore.EventExeData)
x.send("some\nmore\nlines!\n")
if event.lines and event.lines[-1] == "lines!":
x.on_data_event_del(on_cat_pipe_data2)
x.kill() # otherwise it will stay there forever (will be detached)
x.kill()
self.num_data += 1
return ecore.ECORE_CALLBACK_RENEW
cat_pipe = ecore.Exe("cat", flags | ecore.ECORE_EXE_PIPE_WRITE)
cat_pipe.on_add_event_add(on_cat_pipe_add)
cat_pipe.on_data_event_add(on_cat_pipe_data)
cat_pipe.on_data_event_add(on_cat_pipe_data2)
exe3 = ecore.Exe("cat", flags | ecore.ECORE_EXE_PIPE_WRITE)
exe3.on_add_event_add(on_cat_pipe_add)
exe3.on_del_event_add(on_cat_pipe_del)
exe3.on_data_event_add(on_cat_pipe_data)
exe3.on_data_event_add(on_cat_pipe_data2)
# start everything (assuming 2 secs is enough for ls)
t = ecore.timer_add(2, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
# all the callback has been called?
self.assertEqual(self.num_add, 5)
self.assertEqual(self.num_del, 5)
self.assertEqual(self.num_data, 8)
if __name__ == '__main__':

View File

@ -1,35 +1,58 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import os
import logging
from efl import ecore
def _completion_cb(dest, status):
print("Download Finished: file=%s, status=%s" % (dest, status))
ecore.main_loop_quit()
def _progress_cb(dest, dltotal, dlnow, ultotal, ulnow):
print("Download Progress: file=%s, dltotal=%d, dlnow=%d, ultotal=%d, ulnow=%d" %
(dest, dltotal, dlnow, ultotal, ulnow))
return ecore.ECORE_FILE_PROGRESS_CONTINUE
URL = "http://www.google.com"
DST = "/tmp/ecore_dwnl_test.html"
class TestFileDownload(unittest.TestCase):
def progress_cb(self, dest, dltotal, dlnow, ultotal, ulnow):
# print("Download Progress: file=%s, dltotal=%d, dlnow=%d, ultotal=%d, ulnow=%d" %
# (dest, dltotal, dlnow, ultotal, ulnow))
self.assertEqual(dest, DST)
self.dltotal = dltotal
return ecore.ECORE_FILE_PROGRESS_CONTINUE
def completion_cb(self, dest, status):
# print("Download Finished: file=%s, status=%s" % (dest, status))
self.assertEqual(dest, DST)
self.assertEqual(status, 200)
self.completed = True
self.complete_status = status
ecore.main_loop_quit()
def testInit(self):
dst = "/tmp/ecore_dwnl_test.html"
if os.path.exists(dst):
os.remove(dst)
self.dltotal = 0
self.completed = False
self.complete_status = 0
if os.path.exists(DST):
os.remove(DST)
self.assertEqual(ecore.file_download_protocol_available('http://'), True)
ecore.FileDownload("http://www.google.com", dst,
completion_cb = _completion_cb,
progress_cb = _progress_cb)
ecore.FileDownload(URL, DST,
completion_cb = self.completion_cb,
progress_cb = self.progress_cb)
t = ecore.timer_add(5, ecore.main_loop_quit) # just a timeout (should be faster)
ecore.main_loop_begin()
t.delete()
self.assertTrue(self.completed)
self.assertTrue(self.dltotal > 0)
self.assertEqual(self.complete_status, 200)
self.assertTrue(os.path.exists(DST))
# This doesn't work ... should it ??
# self.assertEqual(os.path.getsize(DST), self.dltotal)
self.assertTrue(os.path.getsize(DST) > 0)
os.remove(DST)
if __name__ == '__main__':

View File

@ -1,84 +1,88 @@
#!/usr/bin/env python
from efl import ecore
import unittest
import os
import tempfile
import logging
import os
counters = [0, 0, 0, 0, 0, 0 , 0, 0]
def monitor_cb(event, path, tmp_path):
""" for reference:
if event == ecore.ECORE_FILE_EVENT_MODIFIED:
print("EVENT_MODIFIED: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_CLOSED:
print("EVENT_CLOSED: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_CREATED_FILE:
print("ECORE_FILE_EVENT_CREATED_FILE: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_CREATED_DIRECTORY:
print("ECORE_FILE_EVENT_CREATED_DIRECTORY: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_DELETED_FILE:
print("ECORE_FILE_EVENT_DELETED_FILE: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_DELETED_DIRECTORY:
print("ECORE_FILE_EVENT_DELETED_DIRECTORY: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_DELETED_SELF:
print("ECORE_FILE_EVENT_DELETED_SELF: '%s'" % path)
"""
counters[event] += 1
def do_stuff(tmp_path, fm):
folder1 = os.path.join(tmp_path, "folder1")
folder2 = os.path.join(tmp_path, "folder2")
file1 = os.path.join(tmp_path, "file1")
file2 = os.path.join(tmp_path, "file2")
# this should trigger two ECORE_FILE_EVENT_CREATED_DIRECTORY
os.mkdir(folder1)
os.mkdir(folder2)
# this should trigger two ECORE_FILE_EVENT_DELETED_DIRECTORY
os.rmdir(folder1)
os.rmdir(folder2)
# this should trigger two ECORE_FILE_EVENT_CREATED_FILE
fp1 = open(file1, 'a')
fp2 = open(file2, 'a')
# this should trigger two ECORE_FILE_EVENT_MODIFIED
fp1.write("nothing to say")
fp2.write("nothing to say")
# this should trigger two ECORE_FILE_EVENT_CLOSED
fp1.close()
fp2.close()
# this should trigger two ECORE_FILE_EVENT_DELETED_FILE
os.remove(file1)
os.remove(file2)
# this should trigger one ECORE_FILE_EVENT_DELETED_SELF !!! we get two
os.rmdir(tmp_path)
from efl import ecore
class TestFileMonitor(unittest.TestCase):
def monitor_cb(self, event, path, tmp_path):
""" for reference:
if event == ecore.ECORE_FILE_EVENT_MODIFIED:
print("EVENT_MODIFIED: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_CLOSED:
print("EVENT_CLOSED: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_CREATED_FILE:
print("ECORE_FILE_EVENT_CREATED_FILE: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_CREATED_DIRECTORY:
print("ECORE_FILE_EVENT_CREATED_DIRECTORY: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_DELETED_FILE:
print("ECORE_FILE_EVENT_DELETED_FILE: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_DELETED_DIRECTORY:
print("ECORE_FILE_EVENT_DELETED_DIRECTORY: '%s'" % path)
elif event == ecore.ECORE_FILE_EVENT_DELETED_SELF:
print("ECORE_FILE_EVENT_DELETED_SELF: '%s'" % path)
"""
self.counters[event] += 1
def do_stuff(self, tmp_path, fm):
folder1 = os.path.join(tmp_path, "folder1")
folder2 = os.path.join(tmp_path, "folder2")
file1 = os.path.join(tmp_path, "file1")
file2 = os.path.join(tmp_path, "file2")
# this should trigger two ECORE_FILE_EVENT_CREATED_DIRECTORY
os.mkdir(folder1)
os.mkdir(folder2)
# this should trigger two ECORE_FILE_EVENT_DELETED_DIRECTORY
os.rmdir(folder1)
os.rmdir(folder2)
# this should trigger two ECORE_FILE_EVENT_CREATED_FILE
fp1 = open(file1, 'a')
fp2 = open(file2, 'a')
# this should trigger two ECORE_FILE_EVENT_MODIFIED
fp1.write("nothing to say")
fp2.write("nothing to say")
# this should trigger two ECORE_FILE_EVENT_CLOSED
fp1.close()
fp2.close()
# this should trigger two ECORE_FILE_EVENT_DELETED_FILE
os.remove(file1)
os.remove(file2)
# this should trigger one ECORE_FILE_EVENT_DELETED_SELF !!! we get two
os.rmdir(tmp_path)
return ecore.ECORE_CALLBACK_CANCEL
def testInit(self):
self. counters = [0, 0, 0, 0, 0, 0 , 0, 0]
path = tempfile.mkdtemp()
fm = ecore.FileMonitor(path, monitor_cb, path)
fm = ecore.FileMonitor(path, self.monitor_cb, path)
self.assertIsInstance(fm, ecore.FileMonitor)
self.assertEqual(fm.path, path)
ecore.Timer(0.1, do_stuff, path, fm)
ecore.Timer(1.0, lambda: ecore.main_loop_quit())
ecore.Timer(0.01, self.do_stuff, path, fm)
t = ecore.Timer(1.0, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
self.assertEqual(fm.path, path)
fm.delete()
# FIXME: we receive two ECORE_FILE_EVENT_DELETED_SELF, it's wrong
# should be [0, 2, 2, 2, 2, 1, 2, 2]
self.assertEqual(counters, [0, 2, 2, 2, 2, 2, 2, 2])
self.assertEqual(self.counters, [0, 2, 2, 2, 2, 2, 2, 2])
if __name__ == '__main__':

View File

@ -8,6 +8,8 @@ import logging
from efl import ecore, ecore_con
TIMEOUT = 5.0 # seconds
class TestCon(unittest.TestCase):
def testLookup(self):
@ -16,11 +18,18 @@ class TestCon(unittest.TestCase):
self.assertEqual(ip, '8.8.8.8')
self.assertEqual(arg1, "arg1")
self.assertEqual(my_karg, 1234)
self.complete = True
ecore.main_loop_quit()
self.complete = False
ecore_con.Lookup('google-public-dns-a.google.com',
_dns_complete, "arg1", my_karg=1234)
t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
self.assertTrue(self.complete)
def testUrl(self):
self.complete_counter = 0
@ -64,7 +73,9 @@ class TestCon(unittest.TestCase):
self.assertTrue(u.get()) #perform the GET request
t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
self.assertEqual(u.status_code, 200) # assume net is ok
self.assertEqual(self.complete_counter, 16)
@ -76,7 +87,6 @@ class TestCon(unittest.TestCase):
u.delete()
def testUrlDelete(self):
self.test_url1 = 'http://www.example.com'
self.test_url2 = 'http://www.google.com'
self.complete_counter = 0
@ -85,11 +95,15 @@ class TestCon(unittest.TestCase):
self.assertIsInstance(event, ecore_con.EventUrlComplete)
self.assertEqual(event.url.url, self.test_url1)
self.complete_counter += 1
if self.complete_counter >= 11:
ecore.main_loop_quit()
def _on_complete2(event):
self.assertIsInstance(event, ecore_con.EventUrlComplete)
self.assertEqual(event.url.url, self.test_url2)
self.complete_counter += 10
if self.complete_counter >= 11:
ecore.main_loop_quit()
u1 = ecore_con.Url(self.test_url1)
u1.on_complete_event_add(_on_complete1)
@ -105,8 +119,9 @@ class TestCon(unittest.TestCase):
self.assertTrue(u1.get()) #perform the GET request
self.assertTrue(u2.get()) #perform the GET request
ecore.Timer(2.5, lambda: ecore.main_loop_quit())
t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
self.assertEqual(u1.status_code, 200) # assume net is ok
self.assertEqual(u2.status_code, 200) # assume net is ok
@ -117,8 +132,10 @@ class TestCon(unittest.TestCase):
def testUrlToFile(self):
self.test_url = 'http://www.example.com'
self.complete = False
def _on_complete(event):
self.complete = True
ecore.main_loop_quit()
fd, path = tempfile.mkstemp()
@ -126,7 +143,11 @@ class TestCon(unittest.TestCase):
u.on_complete_event_add(_on_complete)
u.get()
t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
self.assertEqual(self.complete, True)
self.assertEqual(u.status_code, 200) # assume net is ok
self.assertEqual(os.path.getsize(path), u.received_bytes)
os.unlink(path)
@ -147,10 +168,12 @@ class TestCon(unittest.TestCase):
u.on_complete_event_add(_on_complete)
# u.post(self.data_to_post, "multipart/form-data")
u.post(self.data_to_post, "text/txt")
ecore.main_loop_begin()
self.assertEqual(u.status_code, 200) # assume net is ok
t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)
ecore.main_loop_begin()
t.delete()
self.assertEqual(u.status_code, 200) # assume net is ok
u.delete()
if __name__ == '__main__':