Python-EFL: make ecore Exe and FdHandler py3 friendly. svn ignore++

SVN revision: 84029
This commit is contained in:
Davide Andreoli 2013-02-17 20:53:51 +00:00
parent 53e832d0af
commit 50cf3b0a0a
5 changed files with 94 additions and 73 deletions

6
TODO
View File

@ -12,7 +12,6 @@ TODO:
* ecore.Poller
* ecore.FileMonitor
* alert on signal and subprocess module usage (was in python-ecore/ecore/__init__.py)
* Ecore test_08_exe.py : Use unittests
* evas.SmartObject
* edje.Edit
* edje: complete the unit tests
@ -28,11 +27,6 @@ TODO:
* elm.Notify align_set/get/prop
* cleanup elementary_object
TODO FOR PYTHON 3:
* ecore.Exe (use new buffer interface)
* ecore.FdHandler (use new buffer interface)
STUFF LEAVED OUT:

View File

@ -16,10 +16,10 @@
# along with this Python-EFL. If not, see <http://www.gnu.org/licenses/>.
# TODO: remove me after usage is update to new buffer api
cdef extern from "Python.h":
int PyObject_AsReadBuffer(obj, void **buffer, Py_ssize_t *buffer_len) except -1
object PyUnicode_FromStringAndSize(char *s, Py_ssize_t len)
int PyObject_GetBuffer(obj, Py_buffer *view, int flags)
void PyBuffer_Release(Py_buffer *view)
cdef exe_flags2str(int value):
@ -247,6 +247,7 @@ cdef class Exe(object):
def free(self):
self.delete()
"""
def send(self, buffer, long size=0):
cdef const_void *b_data
cdef Py_ssize_t b_size
@ -261,7 +262,27 @@ cdef class Exe(object):
(size, b_size))
return bool(ecore_exe_send(self.exe, b_data, size))
"""
def send(self, buf, long size=0):
cdef Py_buffer buf_view
if isinstance(buf, (str, unicode)):
buf = buf.encode("UTF-8")
PyObject_GetBuffer(buf, &buf_view, 0)
if size <= 0:
size = buf_view.len
elif size > buf_view.len:
raise ValueError(
"given size (%d) is larger than buffer size (%d)." %
(size, buf_view.len))
ret = bool(ecore_exe_send(self.exe, <const_void *>buf_view.buf, buf_view.len))
PyBuffer_Release(&buf_view)
return ret
def close_stdin(self):
ecore_exe_close_stdin(self.exe)
@ -417,11 +438,11 @@ cdef class Exe(object):
filter.callback_del(func, args, kargs)
def exe_run(char *exe_cmd, data=None):
def exe_run(exe_cmd, data=None):
return Exe(exe_cmd, data=data)
def exe_pipe_run(char *exe_cmd, int flags=0, data=None):
def exe_pipe_run(exe_cmd, int flags=0, data=None):
return Exe(exe_cmd, flags, data)

View File

@ -19,8 +19,7 @@ def pkg_config(name, require, min_vers=None):
try:
sys.stdout.write("Checking for " + name + ": ")
ver = subprocess.check_output(["pkg-config", "--modversion", require]).decode("utf-8").strip()
#if min_vers is not None:
if False:
if min_vers is not None:
assert 0 == subprocess.call(["pkg-config", "--atleast-version", min_vers, require])
cflags = subprocess.check_output(["pkg-config", "--cflags", require]).decode("utf-8").split()
libs = subprocess.check_output(["pkg-config", "--libs", require]).decode("utf-8").split()
@ -178,7 +177,7 @@ if __name__ == "__main__":
url = "http://www.enlightenment.org",
description = "Python bindings for the EFL stack",
license = "GNU Lesser General Public License (LGPL)",
packages = ["efl"],
packages = ["efl", "efl.elementary"],
cmdclass = {"build_ext": build_ext},
#ext_modules = modules
ext_modules = cythonize(modules, include_path=["include",], compiler_directives={"embedsignature": False}),

View File

@ -12,7 +12,7 @@ def cb_read(fd_handler, a, b):
def timer_write(wfd):
print("write to fd: %s" % wfd)
os.write(wfd, "[some data]")
os.write(wfd, b"[some data]")
return True

View File

@ -1,81 +1,88 @@
#!/usr/bin/env python
from efl import ecore
import unittest
# basic read flags with line support
flags = ecore.ECORE_EXE_PIPE_READ | ecore.ECORE_EXE_PIPE_READ_LINE_BUFFERED
class TestExe(unittest.TestCase):
def testInit(self):
# basic read flags with line support
flags = ecore.ECORE_EXE_PIPE_READ | ecore.ECORE_EXE_PIPE_READ_LINE_BUFFERED
# simple ls -la output, monitor for add, del and data (stdout)
def on_add(x, event, a, b, c):
print("ecore.Exe added:")
print(" exe : %s" % x)
print(" event: %s" % event)
print(" extra: arguments: a=%s, b=%s, c=%s" % (a, b, c))
print("")
# simple ls -la output, monitor for add, del and data (stdout)
def on_add(x, event, a, b, c):
print("ecore.Exe added:")
print(" exe : %s" % x)
print(" event: %s" % event)
print(" extra: arguments: a=%s, b=%s, c=%s" % (a, b, c))
print("")
self.assertEqual(x, exe)
self.assertIsInstance(event, ecore.EventExeAdd)
self.assertEqual(a, 1)
self.assertEqual(b, 2)
self.assertEqual(c, 3)
assert x == exe
assert isinstance(event, ecore.EventExeAdd)
assert a == 1
assert b == 2
assert c == 3
def on_del(x, event):
print("ecore.Exe deleted:")
print(" exe : %s" % x)
ecore.timer_add(1.0, ecore.main_loop_quit)
def on_del(x, event):
print("ecore.Exe deleted:")
print(" exe : %s" % x)
ecore.timer_add(1.0, ecore.main_loop_quit)
def on_data(x, event):
self.assertEqual(x, exe)
self.assertIsInstance(event, ecore.EventExeData)
print("ecore.Exe data on stdout:")
print(" Exe : %s" % repr(exe))
print(" Event : %s" % repr(event))
for l in event.lines:
print(" %s" % repr(l))
print
def on_data(x, event):
assert x == exe
assert isinstance(event, ecore.EventExeData)
print("ecore.Exe data on stdout:")
print(" Exe : %s" % repr(exe))
print(" Event : %s" % repr(event))
for l in event.lines:
print(" %s" % repr(l))
print
exe = ecore.Exe("ls -la", flags)
exe.on_add_event_add(on_add, 1, c=3, b=2)
exe.on_del_event_add(on_del)
exe.on_data_event_add(on_data)
exe = ecore.Exe("ls -la", flags)
exe.on_add_event_add(on_add, 1, c=3, b=2)
exe.on_del_event_add(on_del)
exe.on_data_event_add(on_data)
# use C-api like event handler, will catch all 3 Exe() instances
def catch_all_exe_add(event):
print("")
print(">>> EXE ADDED: %s" % event)
print("")
return 1
# use C-api like event handler, will catch all 3 Exe() instances
def catch_all_exe_add(event):
print("")
print(">>> EXE ADDED: %s" % event)
print("")
return 1
ecore.on_exe_add_event_add(catch_all_exe_add)
ecore.Exe("ls -l /", flags)
ecore.on_exe_add_event_add(catch_all_exe_add)
ecore.Exe("ls -l /", flags)
# start cat, send data to it, then read, then send again, then kill it
def on_cat_pipe_add(x, event):
x.send("123\nabc\nxyz\n")
# start cat, send data to it, then read, then send again, then kill it
def on_cat_pipe_add(x, event):
x.send(b"123\nabc\nxyz\n")
def on_cat_pipe_data(x, event):
assert event.lines == ["123", "abc", "xyz"]
x.on_data_event_del(on_cat_pipe_data)
def on_cat_pipe_data(x, event):
self.assertEqual(event.lines, ["123", "abc", "xyz"])
x.on_data_event_del(on_cat_pipe_data)
def on_cat_pipe_data2(x, event):
print("cat pipe output:")
print("\n".join(event.lines))
print("")
x.send("some\nmore\nlines!\n")
if event.lines and event.lines[-1] == "lines!":
x.on_data_event_del(on_cat_pipe_data2)
x.kill() # otherwise it will stay there forever (will be detached)
def on_cat_pipe_data2(x, event):
print("cat pipe output:")
print("\n".join(event.lines))
print("")
x.send("some\nmore\nlines!\n")
if event.lines and event.lines[-1] == "lines!":
x.on_data_event_del(on_cat_pipe_data2)
x.kill() # otherwise it will stay there forever (will be detached)
cat_pipe = ecore.Exe("cat", flags | ecore.ECORE_EXE_PIPE_WRITE)
cat_pipe.on_add_event_add(on_cat_pipe_add)
cat_pipe.on_data_event_add(on_cat_pipe_data)
cat_pipe.on_data_event_add(on_cat_pipe_data2)
cat_pipe = ecore.Exe("cat", flags | ecore.ECORE_EXE_PIPE_WRITE)
cat_pipe.on_add_event_add(on_cat_pipe_add)
cat_pipe.on_data_event_add(on_cat_pipe_data)
cat_pipe.on_data_event_add(on_cat_pipe_data2)
ecore.main_loop_begin()
ecore.main_loop_begin()
if __name__ == '__main__':
unittest.main(verbosity=2)
ecore.shutdown()