python-efl/examples/dbus/test_dbus_spy.py

602 lines
20 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import dbus
import json
import xml.etree.ElementTree as ElementTree
from efl import evas
from efl import elementary as elm
from efl.elementary.window import StandardWindow
from efl.elementary.background import Background
from efl.elementary.box import Box
from efl.elementary.button import Button
from efl.elementary.check import Check
from efl.elementary.entry import Entry, \
Entry_markup_to_utf8 as markup_to_utf8, \
Entry_utf8_to_markup as utf8_to_markup
from efl.elementary.flipselector import FlipSelector
from efl.elementary.label import Label
from efl.elementary.panes import Panes
from efl.elementary.popup import Popup
from efl.elementary.separator import Separator
from efl.elementary.genlist import Genlist, GenlistItem, GenlistItemClass
from efl.dbus_mainloop import DBusEcoreMainLoop
class Options(object):
"""class to contain application options"""
def __init__(self):
self.hide_introspect_stuff = False
self.pretty_output = True
### connect to session and system buses, and set session as the current one
session_bus = dbus.SessionBus(mainloop=DBusEcoreMainLoop())
system_bus = dbus.SystemBus(mainloop=DBusEcoreMainLoop())
bus = session_bus
options = Options()
### Classes to describe various DBus nodes
class DBusNode(object):
"""base object for the others DBus nodes"""
def __init__(self, name, parent):
self._name = name
self._parent = parent
@property
def name(self):
return self._name
@property
def parent(self):
return self._parent
class DBusObject(DBusNode):
"""object to represent a DBus Object """
def __init__(self, name, parent_service):
DBusNode.__init__(self, name, parent_service)
self._interfaces = []
@property
def interfaces(self):
return self._interfaces
class DBusInterface(DBusNode):
"""object to represent a DBus Interface"""
def __init__(self, name, parent_obj):
DBusNode.__init__(self, name, parent_obj)
self._properties = []
self._methods = []
self._signals = []
parent_obj.interfaces.append(self)
@property
def properties(self):
return self._properties
@property
def methods(self):
return self._methods
@property
def signals(self):
return self._signals
class DBusProperty(DBusNode):
"""object to represent a DBus Property"""
def __init__(self, name, parent_iface, typ = 'unknown', access = 'unknown'):
DBusNode.__init__(self, name, parent_iface)
parent_iface.properties.append(self)
self._type = typ
self._access = access
self._value = None
def fetch_value(self):
named_service = self.parent.parent.parent
object_path = self.parent.parent.name
iface_name = self.parent.name
obj = bus.get_object(named_service, object_path)
iface = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
self._value = iface.Get(iface_name, self.name)
@property
def value(self):
return self._value
@property
def type(self):
return self._type
@property
def access(self):
return self._access
class DBusMethod(DBusNode):
"""object to represent a DBus Method"""
def __init__(self, name, parent_iface):
DBusNode.__init__(self, name, parent_iface)
parent_iface.methods.append(self)
self._params = []
self._returns = []
@property
def params(self):
return self._params
@property
def params_str(self):
return ', '.join([(ty+' '+name).strip() for name, ty in self._params])
@property
def returns(self):
return self._returns
@property
def returns_str(self):
return ', '.join([(ty+' '+name).strip() for name, ty in self._returns])
class DBusSignal(DBusNode):
"""object to represent a DBus Signal"""
def __init__(self, name, parent_iface):
DBusNode.__init__(self, name, parent_iface)
parent_iface.signals.append(self)
self._params = []
@property
def params(self):
return self._params
@property
def params_str(self):
return ', '.join([(ty+' '+name).strip() for name, ty in self._params])
### Introspect a named service and return a list of DBusObjects
def recursive_introspect(bus, named_service, object_path, ret_data=None):
# first recursion, create an empty list
if ret_data is None:
ret_data = []
# parse the xml string from the Introspectable interface
obj = bus.get_object(named_service, object_path)
iface = dbus.Interface(obj, 'org.freedesktop.DBus.Introspectable')
xml_data = iface.Introspect()
xml_root = ElementTree.fromstring(xml_data)
# debug
# print('=' * 80)
# print("Introspecting path:'%s' on service:'%s'" % (object_path, named_service))
# print('=' * 80)
# print(xml_data)
# print('=' * 80)
# traverse the xml tree
if xml_root.find('interface'):
# found a new object
obj = DBusObject(object_path, named_service)
ret_data.append(obj)
for xml_node in xml_root:
# found an interface
if xml_node.tag == 'interface':
iface = DBusInterface(xml_node.attrib['name'], obj)
for child in xml_node:
if child.tag == 'property':
typ = child.attrib['type']
access = child.attrib['access']
prop = DBusProperty(child.attrib['name'], iface, typ, access)
if child.tag == 'method':
meth = DBusMethod(child.attrib['name'], iface)
for arg in child:
if arg.tag == 'arg':
if arg.attrib['direction'] == 'out':
L = meth.returns
else:
L = meth.params
L.append((
arg.attrib['name'] if 'name' in arg.attrib else '',
arg.attrib['type'] if 'type' in arg.attrib else ''))
if child.tag == 'signal':
sig = DBusSignal(child.attrib['name'], iface)
for arg in child:
if arg.tag == 'arg':
sig.params.append((
arg.attrib['name'] if 'name' in arg.attrib else '',
arg.attrib['type'] if 'type' in arg.attrib else ''))
# found another node, introspect it...
if xml_node.tag == 'node':
if object_path == '/':
object_path = ''
new_path = '/'.join((object_path, xml_node.attrib['name']))
recursive_introspect(bus, named_service, new_path, ret_data)
return ret_data
### Names genlist (the one on the left)
class NamesListGroupItemClass(GenlistItemClass):
def __init__(self):
GenlistItemClass.__init__(self, item_style="group_index")
def text_get(self, gl, part, name):
return name
class NamesListItemClass(GenlistItemClass):
def __init__(self):
GenlistItemClass.__init__(self, item_style="default")
def text_get(self, gl, part, name):
return name
class NamesList(Genlist):
def __init__(self, parent):
self.win = parent
self.sig1 = None
# create the genlist
Genlist.__init__(self, parent)
self.itc = NamesListItemClass()
self.itc_g = NamesListGroupItemClass()
self.callback_selected_add(self.item_selected_cb)
# add private & public group items
self.public_group = self.item_append(self.itc_g, "Public Services",
flags=elm.ELM_GENLIST_ITEM_GROUP)
self.public_group.select_mode_set(elm.ELM_OBJECT_SELECT_MODE_DISPLAY_ONLY)
self.private_group = self.item_append(self.itc_g, "Private Services",
flags=elm.ELM_GENLIST_ITEM_GROUP)
self.private_group.select_mode_set(elm.ELM_OBJECT_SELECT_MODE_DISPLAY_ONLY)
# populate the genlist
self.populate()
def populate(self):
for name in bus.list_names():
self.service_add(name)
# keep the list updated when a name changes
if self.sig1: self.sig1.remove()
self.sig1 = bus.add_signal_receiver(self.name_owner_changed_cb,
"NameOwnerChanged")
# bus.add_signal_receiver(self.name_acquired_cb, "NameAcquired")
# bus.add_signal_receiver(self.name_lost_cb, "NameLost")
def clear(self):
self.public_group.subitems_clear()
self.private_group.subitems_clear()
def item_selected_cb(self, gl, item):
name = item.data
self.win.detail_list.populate(name)
def sort_cb(self, it1, it2):
return 1 if it1.data.lower() < it2.data.lower() else -1
def service_add(self, name):
print("service_add('%s')" % name)
if name.startswith(":"):
self.item_sorted_insert(self.itc, name, self.sort_cb,
self.private_group, 0, None)
else:
self.item_sorted_insert(self.itc, name, self.sort_cb,
self.public_group, 0, None)
def service_del(self, name):
print("service_del('%s')" % name)
item = self.first_item
while item:
if item.data == name:
item.delete()
return
item = item.next
def name_owner_changed_cb(self, name, old_owner, new_owner):
print("NameOwnerChanged(name='%s', old_owner='%s', new_owner='%s')" %
(name, old_owner, new_owner))
if old_owner == '':
self.service_add(name)
elif new_owner == '':
self.service_del(name)
### Detail genlist (the one on the right)
class ObjectItemClass(GenlistItemClass):
def __init__(self):
GenlistItemClass.__init__(self, item_style="group_index")
def text_get(self, gl, part, obj):
return '[OBJ] ' + obj.name
class NodeItemClass(GenlistItemClass):
def __init__(self):
GenlistItemClass.__init__(self, item_style="default")
def text_get(self, gl, part, obj):
if isinstance(obj, DBusInterface):
return '[IFACE] ' + obj.name
if isinstance(obj, DBusProperty):
if obj.value:
return '[PROP] %s %s (%s) = %s' % \
(obj.type, obj.name, obj.access, obj.value)
else:
return '[PROP] %s %s (%s)' % (obj.type, obj.name, obj.access)
if isinstance(obj, DBusMethod):
params = obj.params_str
rets = obj.returns_str
if rets:
return '𝆑 %s(%s) → (%s)' % (obj.name, params, rets)
else:
return '𝆑 %s(%s)' % (obj.name, params)
if isinstance(obj, DBusSignal):
return '[SIG] %s(%s)' % (obj.name, obj.params_str)
class DetailList(Genlist):
def __init__(self, parent):
Genlist.__init__(self, parent)
self._parent = parent
self.itc = NodeItemClass()
self.itc_g = ObjectItemClass()
self.callback_expand_request_add(self.expand_request_cb)
self.callback_expanded_add(self.expanded_cb)
self.callback_contract_request_add(self.contract_request_cb)
self.callback_contracted_add(self.contracted_cb)
self.callback_clicked_double_add(self.double_click_cb)
def populate(self, name):
print("populate: %s" % name)
self.clear()
# objects
for obj in recursive_introspect(bus, name, '/'):
obj_item = self.item_append(self.itc_g, obj,
flags=elm.ELM_GENLIST_ITEM_GROUP)
obj_item.select_mode_set(elm.ELM_OBJECT_SELECT_MODE_DISPLAY_ONLY)
# interfaces
for iface in obj.interfaces:
if options.hide_introspect_stuff and \
iface.name.startswith("org.freedesktop.DBus"):
continue
iface_item = self.item_append(self.itc, iface,
parent_item=obj_item,
flags=elm.ELM_GENLIST_ITEM_TREE)
def sort_cb(self, it1, it2):
pri1 = pri2 = 0
if isinstance(it1.data, DBusProperty): pri1 = 3
elif isinstance(it1.data, DBusMethod): pri1 = 2
elif isinstance(it1.data, DBusSignal): pri1 = 1
if isinstance(it2.data, DBusProperty): pri2 = 3
elif isinstance(it2.data, DBusMethod): pri2 = 2
elif isinstance(it2.data, DBusSignal): pri2 = 1
if pri1 > pri2: return 1
elif pri1 < pri2: return -1
return 1 if it1.data.name.lower() < it2.data.name.lower() else -1
def expand_request_cb(self, genlist, item):
item.expanded = True
def expanded_cb(self, genlist, item):
iface = item.data
for obj in iface.properties + iface.methods + iface.signals:
self.item_sorted_insert(self.itc, obj, self.sort_cb, parent_item=item)
def contract_request_cb(self, genlist, item):
item.expanded = False
def contracted_cb(self, genlist, item):
item.subitems_clear()
def double_click_cb(self, genlist, item):
if isinstance(item.data, DBusMethod):
MethodRunner(self._parent, item.data)
elif isinstance(item.data, DBusProperty):
item.data.fetch_value()
item.update()
### Methods runner
class MethodRunner(Popup):
def __init__(self, parent, method):
Popup.__init__(self, parent)
self._method = method
self._param_entry = None
self._return_entry = None
# title
self.part_text_set('title,text', 'Method: %s()' % method.name)
self.show()
# content is vbox
vbox = Box(parent)
vbox.show()
self.content = vbox
# params label + entry
if len(method.params) > 0:
label = Label(parent)
label.size_hint_align = 0.0, 0.5
label.text = 'Params: ' + method.params_str
label.show()
vbox.pack_end(label)
en = Entry(parent)
self._param_entry = en
en.editable = True
en.scrollable = True
en.single_line = True
en.entry = ''
en.size_hint_weight = evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND
en.size_hint_align = evas.EVAS_HINT_FILL, evas.EVAS_HINT_FILL
en.show()
vbox.pack_end(en)
sp = Separator(win)
sp.horizontal = True
sp.show()
vbox.pack_end(sp)
# returns label + entry
label = Label(parent)
label.size_hint_align = 0.0, 0.5
label.text = 'Returns: '
label.text += method.returns_str if method.returns_str else 'None'
label.show()
vbox.pack_end(label)
en = Entry(parent)
self._return_entry = en
en.size_hint_weight = evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND
en.size_hint_align = evas.EVAS_HINT_FILL, evas.EVAS_HINT_FILL
en.editable = False
en.scrollable = True
en.disabled = True
en.single_line = True # TODO this is wrong, but the only way to show the entry :/
en.entry = '<br> <br> <br>'
en.show()
vbox.pack_end(en)
# pretty print check button
def pretty_output_clicked_cb(chk):
options.pretty_output = chk.state
ch = Check(parent)
ch.size_hint_align = 0.0, 0.5
ch.text = "Prettify output (loosing type infos)"
ch.state = options.pretty_output
ch.callback_changed_add(pretty_output_clicked_cb)
ch.show()
vbox.pack_end(ch)
# popup buttons
btn = Button(parent)
btn.text = 'Close'
btn.callback_clicked_add(lambda b: self.delete())
self.part_content_set('button1', btn)
btn = Button(parent)
btn.text = 'Clear output'
btn.callback_clicked_add(lambda b: self._return_entry.entry_set(''))
self.part_content_set('button2', btn)
btn = Button(parent)
btn.text = 'Run method'
btn.callback_clicked_add(self.run_clicked_cb)
self.part_content_set('button3', btn)
def run_clicked_cb(self, btn):
# collect method infos
named_service = self._method.parent.parent.parent
object_path = self._method.parent.parent.name
iface_name = self._method.parent.name
method_name = self._method.name
if self._param_entry:
user_params = markup_to_utf8(self._param_entry.entry)
else:
user_params = None
# create the dbus proxy
obj = bus.get_object(named_service, object_path)
iface = dbus.Interface(obj, iface_name)
meth = iface.get_dbus_method(method_name)
# async method call # TODO make another example for this
try:
if user_params:
meth(eval(user_params),
reply_handler = self.reply_handler,
error_handler = self.error_handler)
else:
meth(reply_handler = self.reply_handler,
error_handler = self.error_handler)
except Exception as e:
s = "Error running method<br>Exception: "
self._return_entry.entry = s + utf8_to_markup(str(e))
# TODO find a way to catch errors after this point
# wrong params for example are raised later :/
def reply_handler(self, *rets):
if rets:
if options.pretty_output:
s = utf8_to_markup(json.dumps(rets, indent=2))
else:
s = utf8_to_markup(str(rets))
else:
s = "Method executed successfully.<br>Nothing returned."
self._return_entry.entry = s
def error_handler(self, *rets):
self._return_entry.entry = 'Error executing method'
### The main window
class MyWin(StandardWindow):
def __init__(self):
StandardWindow.__init__(self, "efl-dbus-spy", "EFL DBus Spy - Espionage")
self.autodel_set(True)
self.callback_delete_request_add(lambda o: elm.exit())
bg = Background(self)
self.resize_object_add(bg)
bg.size_hint_weight_set(evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND)
bg.show()
box = Box(self)
self.resize_object_add(box)
box.size_hint_weight_set(evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND)
box.show()
flip = FlipSelector(self)
flip.item_append("Session Bus", self.flip_selected_cb, session_bus)
flip.item_append("System Bus", self.flip_selected_cb, system_bus)
box.pack_end(flip)
flip.show()
panes = Panes(self)
panes.size_hint_weight = (evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND)
panes.size_hint_align = (evas.EVAS_HINT_FILL, evas.EVAS_HINT_FILL)
# panes.content_left_size = 0.333
# panes.fixed = True
box.pack_end(panes)
panes.show()
self.names_list = NamesList(self)
panes.part_content_set("left", self.names_list)
self.names_list.show()
self.detail_list = DetailList(self)
panes.part_content_set("right", self.detail_list)
self.detail_list.show()
self.resize(700, 500)
self.show()
def flip_selected_cb(self, flipselector, item, selected_bus):
global bus
bus = selected_bus
self.detail_list.clear()
self.names_list.clear()
self.names_list.populate()
if __name__ == "__main__":
elm.init()
win = MyWin()
elm.run()
elm.shutdown()