You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1994 lines
67 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# TODO:
# - IPv6
# - Proxy
# - Fix connman's PropertyChanged when changing off/manual -> dhcp,
# gateway is not updated.
import sys
import dbus
import dbus.service
import logging
import argparse
import os.path
try:
import efl.evas as evas
import efl.ecore as ecore
import efl.edje as edje # Class resolve hack for edje_get
from efl.dbus_mainloop import DBusEcoreMainLoop
import efl.elementary as elm
from efl.elementary import ELM_POLICY_QUIT, \
ELM_POLICY_QUIT_LAST_WINDOW_CLOSED
from efl.elementary.window import Window, ELM_WIN_BASIC, \
ELM_WIN_DIALOG_BASIC
from efl.elementary.background import Background
from efl.elementary.box import Box
from efl.elementary.label import Label
from efl.elementary.naviframe import Naviframe
from efl.elementary.popup import Popup
from efl.elementary.button import Button
from efl.elementary.scroller import Scroller, ELM_SCROLLER_POLICY_OFF, \
ELM_SCROLLER_POLICY_AUTO
from efl.elementary.check import Check
from efl.elementary.progressbar import Progressbar
from efl.elementary.genlist import Genlist, GenlistItemClass
from efl.elementary.segment_control import SegmentControl
from efl.elementary.frame import Frame
from efl.elementary.entry import Entry
from efl.elementary.icon import Icon
from efl.elementary.layout import Layout
from efl.elementary.theme import Theme
except:
import elementary as elm
import evas
import ecore
import edje # Class resolve hack for edje_get
from e_dbus import DBusEcoreMainLoop
from elementary import Window, Background, Box, Label, Naviframe, Popup, \
Button, Scroller, Check, Progressbar, Genlist, GenlistItemClass, \
SegmentControl, Frame, Entry, Icon, Layout, Theme, ELM_WIN_BASIC, \
ELM_WIN_DIALOG_BASIC, ELM_POLICY_QUIT, ELM_SCROLLER_POLICY_OFF, \
ELM_SCROLLER_POLICY_AUTO, ELM_POLICY_QUIT_LAST_WINDOW_CLOSED
# Hook D-Bus into the Ecore main loop and open the system bus connection
# that every proxy object in this file is created from.
dbus_ml = DBusEcoreMainLoop()
bus = dbus.SystemBus(mainloop=dbus_ml)
# Application logger: messages go to a stream handler using the format
# below (timestamp, logger name, level, message and source line number).
log = logging.getLogger("econnman-bin")
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
    "%(created)d %(name)s [%(levelname)s]:: %(message)s (@lineno %(lineno)d)"
)
log_handler.setFormatter(log_formatter)
log.addHandler(log_handler)
# Placeholder for the ConnMan manager proxy. The classes below call methods
# on it, so it is presumably assigned during startup -- TODO confirm where.
manager = None
# Shorthand size-hint tuples shared by the widget helpers.
EXPAND_BOTH = (evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND)
EXPAND_HORIZ = (evas.EVAS_HINT_EXPAND, 0.0)
FILL_BOTH = (evas.EVAS_HINT_FILL, evas.EVAS_HINT_FILL)
# python2 backwards compatibility
#
# On Python 3 the plain ConfigParser *is* the "safe" parser; the
# SafeConfigParser alias was deprecated in 3.2 and removed in 3.12, so it
# must not be imported by that name there.  Python 2 only ships the
# ConfigParser module, which still provides SafeConfigParser.
try:
    from configparser import ConfigParser as SafeConfigParser
except ImportError:
    from ConfigParser import SafeConfigParser
class PNACConfig(SafeConfigParser):
"""A custom config parser for IEEE802.1x (PNAC)
Section names are prefixed with service_
"""
CONF_FILE = "/var/lib/connman/econnman.config"
def __init__(self):
SafeConfigParser.__init__(self)
self.optionxform = str
def read(self):
args = self.CONF_FILE, 'r'
kwargs = {}
if sys.hexversion >= 0x03000000:
kwargs["encoding"] = 'utf8'
try:
with open(*args, **kwargs) as fd:
self.readfp(fd)
except IOError:
log.error(
"Econnman cannot read the configuration file \"%s\" used by "
"connman to configure your ieee802.1x networks. Make sure the "
"user running econnman is able to read/write it.",
self.CONF_FILE
)
#raise
# defaults()
# sections()
def add_section(self, service_name):
secname = 'service_' + service_name
SafeConfigParser.add_section(self, secname)
self.set(service_name, 'Type', 'wifi')
self.set(service_name, 'Name', service_name)
#self.write()
def has_section(self, service_name): # config_exists
return bool(self.section_get(service_name))
# options()
def has_option(self, service_name, key):
secname = self.section_get(service_name)
return SafeConfigParser.has_option(secname, key)
# read()
# readfp()
def get(self, service_name, key): # config_option_get
secname = self.section_get(service_name)
if self.has_option(service_name, key):
return SafeConfigParser.get(self, secname, key)
return None
# getint()
# getfloat()
# getboolean()
# items()
def set(self, service_name, key, value): # config_set
secname = self.section_get(service_name)
if not self.has_section(service_name):
self.add_section(service_name)
if value is not None:
SafeConfigParser.set(self, secname, key, value)
elif self.has_option(secname, key):
self.remove_option(secname, key)
#self.write()
def write(self):
# TODO: Copy owner and mask from existing config file, write to a temp
# file and request overwriting with sudo or polkit, then set the
# owner and mask like it was before. This is to avoid the
# requirement of running the entire econnman instance with root
# rights.
try:
with open(self.CONF_FILE, 'w', encoding='utf8') as configfile:
SafeConfigParser.write(self, configfile)
except IOError:
log.error(
"Econnman cannot write to the configuration file \"%s\" used "
"by connman to configure your ieee802.1x networks. Make sure "
"the user running econnman is able to read/write it.",
self.CONF_FILE
)
def remove_option(self, service_name, key):
secname = self.section_get(service_name)
SafeConfigParser.remove_option(self, secname, key)
#self.write()
def remove_section(self, service_name): # config_del
secname = self.section_get(service_name)
ret = SafeConfigParser.remove_section(self, secname)
#self.write()
return ret
def section_get(self, service_name): # config_get
#secname = 'service_' + service_name
for sec in self.sections():
if self.has_option(sec, 'Name') and \
self.get(sec, 'Name') == service_name:
return sec
else:
return None
########################################################################
# Debug helpers:
def dbus_variant_to_str(v):
    """Help debug by converting a D-Bus value to a compact printable form.

    Strings are quoted, dictionaries are rendered as ``{k=v, ...}``,
    arrays as ``[...]`` and structs as ``(...)``.  Integers and doubles
    are returned as plain int/float (callers format the result with
    ``%s``); anything unrecognised falls back to repr().
    """
    if isinstance(v, dbus.String):
        return '"%s"' % (str(v),)
    # dbus.Boolean must be tested before the integer types below.
    if isinstance(v, dbus.Boolean):
        return str(bool(v))
    if isinstance(v, dbus.Dictionary):
        return "{%s}" % (dbus_dict_to_str(v),)
    if isinstance(v, dbus.Struct):
        # A Struct is a tuple and has no .items(): format it as a sequence
        # rather than handing it to dbus_dict_to_str().
        return "(%s)" % (dbus_array_to_str(v),)
    if isinstance(v, dbus.Array):
        return "[%s]" % (dbus_array_to_str(v),)
    if isinstance(v, dbus.ObjectPath):
        return str(v)
    if isinstance(v, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
                      dbus.UInt16, dbus.UInt32, dbus.UInt64)):
        return int(v)
    if isinstance(v, dbus.Double):
        return float(v)
    return repr(v)
def dbus_dict_to_str(d):
    """Render a dbus.Dictionary as a short ``key=value, ...`` string.

    Debug helper; each value is formatted with dbus_variant_to_str().
    """
    return ", ".join(
        "%s=%s" % (key, dbus_variant_to_str(val)) for key, val in d.items()
    )
def dbus_array_to_str(a):
    """Render a sequence of D-Bus values as a short comma separated string.

    Debug helper; each element is formatted with dbus_variant_to_str().
    """
    parts = []
    for element in a:
        parts.append(str(dbus_variant_to_str(element)))
    return ", ".join(parts)
def dbus_array_of_dict_to_str(a):
    """Summarise a sequence of (key, value) pairs by listing only the keys.

    Debug helper that keeps log lines short: the values are ignored.
    """
    return ", ".join([str(key) for key, _value in a])
class ObjectView(object):
    """Common scaffolding for a page showing one remote ConnMan object.

    Subclasses provide:
     - bus_interface: D-Bus interface name used for self.bus_obj
     - create_view(properties): builds the page specific widgets
     - on_property_changed(name, value): refreshes those widgets

    This class supplies:
     - path: the object path
     - bus_obj: interface proxy for the remote object
     - obj: the toplevel scroller of the page
     - box: the vertical box holding the page widgets
    """

    bus_interface = None

    def __init__(self, parent, path, properties):
        self.path = path
        remote = bus.get_object("net.connman", path)
        self.bus_obj = dbus.Interface(remote, self.bus_interface)
        self.sig_ch = self.bus_obj.connect_to_signal(
            "PropertyChanged", self.on_property_changed)

        # Vertical-only scroller wrapping the page contents.
        self.obj = scroller = Scroller(parent)
        scroller.on_del_add(self._deleted)
        scroller.size_hint_weight = EXPAND_BOTH
        scroller.policy_set(ELM_SCROLLER_POLICY_OFF, ELM_SCROLLER_POLICY_AUTO)
        scroller.bounce_set(False, True)
        scroller.content_min_limit(True, False)

        self.box = content = Box(scroller)
        content.size_hint_weight = EXPAND_HORIZ
        content.horizontal = False

        self.create_view(properties)
        scroller.content = content

        # Feed the initial state through the regular update path.
        for key in properties:
            self.on_property_changed(key, properties[key])

    def _deleted(self, obj):
        """Stop listening and drop references once the scroller is gone."""
        log.debug("View deleted %s (%s)", self.__class__.__name__, self.path)
        self.sig_ch.remove()
        self.sig_ch = None
        self.bus_obj = None
        self.obj = None

    def create_view(self, properties):
        """Build the page widgets; subclasses must override this."""
        log.critical("must be implemented!")

    def on_property_changed(self, name, value):
        """React to a property update; subclasses must override this."""
        log.critical("must be implemented!")

    @staticmethod
    def _stretch(widget):
        """Let C{widget} expand and fill horizontally inside its box."""
        widget.size_hint_weight = EXPAND_HORIZ
        widget.size_hint_align = FILL_BOTH

    def add_check(self, box, label, callback=None):
        """Append a check box to C{box}; C{callback} fires on change."""
        check = Check(box)
        self._stretch(check)
        check.text = label
        check.show()
        box.pack_end(check)
        if callback:
            check.callback_changed_add(callback)
        return check

    def add_button(self, box, label, callback):
        """Append a button to C{box}; C{callback} fires on click."""
        button = Button(box)
        self._stretch(button)
        button.text = label
        button.show()
        button.callback_clicked_add(callback)
        box.pack_end(button)
        return button

    def add_label(self, box, label):
        """Append a text label to C{box}."""
        text = Label(box)
        self._stretch(text)
        text.text = label
        text.show()
        box.pack_end(text)
        return text

    def add_progress(self, box, label):
        """Append a labelled progress bar to C{box}."""
        bar = Progressbar(box)
        self._stretch(bar)
        bar.text = label
        bar.show()
        box.pack_end(bar)
        return bar

    def add_segment_control(self, box, options, callback):
        """Append a segment control with one item per entry of C{options}.

        Returns the control and a dict mapping each option to its item.
        """
        control = SegmentControl(box)
        self._stretch(control)
        items = {}
        for option in options:
            items[option] = control.item_add(None, option)
        control.show()
        box.pack_end(control)
        control.callback_changed_add(callback)
        return control, items

    def add_label_and_segment_control(self, box, options, callback, label):
        """Append a label followed by a segment control.

        Returns the label, the control and the option -> item dict.
        """
        caption = self.add_label(box, label)
        control = SegmentControl(box)
        self._stretch(control)
        items = {}
        for option in options:
            items[option] = control.item_add(None, option)
        control.show()
        box.pack_end(control)
        control.callback_changed_add(callback)
        return caption, control, items

    def add_frame_and_box(self, box, label):
        """Append a titled frame holding a vertical box; return both."""
        frame = Frame(box)
        self._stretch(frame)
        frame.text = label
        frame.show()
        box.pack_end(frame)

        inner = Box(frame)
        self._stretch(inner)
        inner.horizontal = False
        inner.show()
        frame.content = inner
        return frame, inner

    def add_label_and_entry(self, box, label, callback=None):
        """Append a label and a single-line entry; return both.

        C{callback}, when given, fires when the entry is activated.
        """
        caption = self.add_label(box, label)
        entry = Entry(box)
        self._stretch(entry)
        entry.single_line = True
        entry.scrollable = True
        entry.show()
        box.pack_end(entry)
        if callback:
            entry.callback_activated_add(callback)
        return caption, entry
########################################################################
# Views:
class OfflineModeMonitor(object):
    """Toggle widget mirroring the Manager's OfflineMode property.

    The toggle follows the state reported by the server, and flipping it
    asks the server to change the property.
    """

    def __init__(self, win):
        toggle = Check(win)
        toggle.style = "toggle"
        toggle.part_text_set("on", "Offline")
        toggle.part_text_set("off", "Online")
        toggle.callback_changed_add(self._on_user_changed)
        toggle.on_del_add(self._deleted)
        self.obj = toggle

        def got_properties(properties):
            for key, val in properties.items():
                log.debug("property %s: %s", key, val)
                self._property_changed(key, val)

        def failed(exc):
            popup_fatal(win, "Failed to get ConnMan Properties", str(exc))

        manager.GetProperties(reply_handler=got_properties,
                              error_handler=failed)
        self.sig_ch = manager.connect_to_signal("PropertyChanged",
                                                self._property_changed)

    def _deleted(self, obj):
        """Disconnect from the manager signal when the toggle is deleted."""
        self.sig_ch.remove()
        self.sig_ch = None
        self.obj = None

    def _property_changed(self, name, value):
        """Apply a manager property update; only OfflineMode matters."""
        log.debug("property %s: %s", name, value)
        if name != "OfflineMode":
            return
        self.obj.state = bool(value)

    def _on_user_changed(self, obj):
        """Send the user's choice to the server, reverting it on failure."""
        wanted = obj.state

        def applied():
            log.info("Set OfflineMode=%s", wanted)

        def rejected(exc):
            log.error("Failed to set OfflineMode=%s: %s", wanted, exc)
            # Put the toggle back to what the server still has.
            obj.state = not wanted
            popup_error(self.obj, "Failed to Apply Offline Mode",
                        exc.get_dbus_message())

        manager.SetProperty("OfflineMode", dbus.Boolean(wanted),
                            reply_handler=applied, error_handler=rejected)
class TechList(object):
    """Genlist showing the technologies known to ConnMan.

    The list is filled from the manager's GetTechnologies() reply and kept
    current through the TechnologyAdded / TechnologyRemoved signals plus
    each technology's PropertyChanged signal.

    Selecting a row calls C{on_selected(path, tech_properties)}.
    """

    def __init__(self, parent, on_selected=None):
        self.techs = {}   # path -> properties
        self.items = {}   # path -> genlist item
        self.on_selected = on_selected

        self.obj = Genlist(parent)
        self.obj.on_del_add(self._deleted)
        self.obj.callback_selected_add(self._tech_selected)

        self.sig_added = manager.connect_to_signal("TechnologyAdded",
                                                   self._tech_added)
        self.sig_removed = manager.connect_to_signal("TechnologyRemoved",
                                                     self._tech_removed)
        self.sig_propch = bus.add_signal_receiver(self._tech_changed,
                                                  "PropertyChanged",
                                                  "net.connman.Technology",
                                                  "net.connman",
                                                  path_keyword='path')
        self.itc = GenlistItemClass(item_style="default",
                                    text_get_func=self._item_text_get,
                                    content_get_func=self._item_content_get)
        manager.GetTechnologies(reply_handler=self._get_techs_reply,
                                error_handler=self._get_techs_error)

    def _deleted(self, lst):
        """Release every signal match and cached entry with the widget."""
        for match in (self.sig_added, self.sig_removed, self.sig_propch):
            match.remove()
        self.obj = None
        self.sig_added = self.sig_removed = self.sig_propch = None
        self.techs.clear()
        self.items.clear()

    def _get_techs_reply(self, techs):
        """Populate the list from the GetTechnologies() result."""
        log.debug("Got technologies: %s", dbus_array_of_dict_to_str(techs))
        for tech_path, tech_props in techs:
            self._tech_added(tech_path, tech_props)

    def _get_techs_error(self, exc):
        """Report a failed GetTechnologies() call to the user."""
        log.error("Failed to GetTechnologies(): %s", exc)
        popup_error(self.obj, "Failed to get Technologies",
                    exc.get_dbus_message())

    def _tech_added(self, path, properties):
        """Remember a technology and append a row for it."""
        key = str(path)
        log.debug("Added %s: %s", key, dbus_dict_to_str(properties))
        self.techs[key] = properties
        self.items[key] = self.obj.item_append(self.itc, key)

    def _tech_changed(self, name, value, path):
        """Store an updated property and redraw the matching row."""
        key = str(path)
        log.debug("Changed %s: %s=%s", key, name, value)
        props = self.techs.get(key)
        if not props:
            return
        props[name] = value
        row = self.items.get(key)
        if row:
            row.update()

    def _tech_removed(self, path):
        """Forget a technology and delete its row, if any."""
        key = str(path)
        log.debug("Removed %s", key)
        self.techs.pop(key, None)
        row = self.items.pop(key, None)
        if row is not None:
            row.delete()

    def _tech_selected(self, lst, item):
        """Forward a row selection to the C{on_selected} callback."""
        item.selected = False
        if not self.on_selected:
            return
        key = item.data
        props = self.techs.get(key)
        if props:
            self.on_selected(key, props)

    def _item_text_get(self, obj, part, item_data):
        """Genlist text provider: the technology name for "elm.text"."""
        if part != "elm.text":
            return None
        props = self.techs.get(item_data)
        if not props:
            return "Unknown"
        # Fall back to the last component of the object path.
        fallback = item_data[len("/net/connman/technology/"):]
        return props.get("Name", fallback)

    def _item_content_get(self, obj, part, item_data):
        """Genlist content provider: arrow at the end, state icon up front."""
        if part == "elm.swallow.end":
            arrow = Icon(obj)
            arrow.standard = "arrow_right"
            return arrow
        if part != "elm.swallow.icon":
            return None
        props = self.techs.get(item_data)
        if not props:
            return None
        if props.get("Connected", False):
            icon_name = "connman-tech-connected"
        elif props.get("Powered", False):
            icon_name = "connman-tech-powered"
        else:
            icon_name = "connman-tech-offline"
        state_icon = Icon(obj)
        state_icon.standard = icon_name
        return state_icon
class TechView(ObjectView):
    """Provides a detailed view of the technology given by C{path}.

    The C{properties} argument is used to populate the current state,
    which will be updated with net.connman.Technology.PropertyChanged
    signal that it will listen.

    User updates will be automatically applied to the server.
    """

    bus_interface = "net.connman.Technology"

    def create_view(self, properties):
        """Create the power/scan controls and the tethering frame."""
        self.powered = self.add_check(self.box, "Powered",
                                      self._on_user_powered)
        # "Connected" is informational only, so the user cannot toggle it.
        self.connected = self.add_check(self.box, "Connected")
        self.connected.disabled = True
        self.scan = self.add_button(self.box, "Scan", self._scan)

        fr, bx = self.add_frame_and_box(self.box, "Tethering")
        self.tethering = self.add_check(bx, "Enabled",
                                        self._on_user_tethering)
        lb, self.identifier = self.add_label_and_entry(bx, "Identifier:")
        lb, self.passphrase = self.add_label_and_entry(bx, "Passphrase:")
        self.tethering_apply = self.add_button(bx, "Apply Tethering",
                                               self._tethering_apply)

    def _on_user_powered(self, obj):
        """Send the Powered toggle to the server, reverting it on error."""
        state = bool(self.powered.state)

        def on_reply():
            log.info("Set %s Powered=%s", self.path, state)

        def on_error(exc):
            log.error("Could not set %s Powered=%s: %s",
                      self.path, state, exc)
            obj.state = not state
            popup_error(self.obj, "Failed to Apply Powered",
                        exc.get_dbus_message())

        self.bus_obj.SetProperty(
            "Powered", dbus.Boolean(state),
            reply_handler=on_reply, error_handler=on_error
        )

    def _scan(self, obj):
        """Start a scan and lock the button until the call completes."""
        def restore_button():
            self.scan.disabled = False
            self.scan.text = "Scan"

        def on_reply():
            log.debug("Scanned %s", self.path)
            restore_button()

        def on_error(exc):
            log.error("Could not scan %s", exc)
            restore_button()

        self.bus_obj.Scan(reply_handler=on_reply, error_handler=on_error)
        self.scan.disabled = True
        self.scan.text = "Scanning..."

    def _on_user_tethering(self, obj):
        """Enable the tethering entries only while tethering is ticked."""
        state = bool(obj.state)
        self.identifier.disabled = not state
        self.passphrase.disabled = not state

    def _tethering_apply(self, obj):
        """Push the tethering settings to the server, one property at a time.

        The identifier and passphrase are sent before the Tethering flag
        itself.  The button stays disabled until every queued property
        has been answered, whether with a reply or an error.
        """
        self.to_apply = [
            ("TetheringIdentifier", self.identifier.text),
            ("TetheringPassphrase", self.passphrase.text),
            ("Tethering", dbus.Boolean(self.tethering.state)),
        ]

        def restore_button():
            self.tethering_apply.disabled = False
            self.tethering_apply.text = "Apply Tethering"

        def apply_next():
            if not self.to_apply:
                # Queue drained: only now give the button back to the user.
                restore_button()
                return
            name, value = self.to_apply.pop(0)
            self.bus_obj.SetProperty(name, value,
                                     reply_handler=on_reply,
                                     error_handler=on_error)

        def on_reply():
            log.debug("Applied tethering %s", self.path)
            apply_next()

        def on_error(exc):
            log.error("Could not apply tethering %s", exc)
            popup_error(self.obj, "Failed to Apply Tethering",
                        exc.get_dbus_message())
            apply_next()

        self.tethering_apply.disabled = True
        self.tethering_apply.text = "Applying Tethering..."
        # Kick off the chain; without this call nothing is ever sent and
        # the button would remain disabled.
        apply_next()

    def on_property_changed(self, name, value):
        """Mirror a changed technology property into its widget."""
        log.debug("Changed %s: %s=%s", self.path, name, value)
        if name == "Powered":
            self.powered.state = bool(value)
        elif name == "Connected":
            self.connected.state = bool(value)
        elif name == "Tethering":
            state = bool(value)
            self.tethering.state = state
            self.identifier.disabled = not state
            self.passphrase.disabled = not state
        elif name == "TetheringIdentifier":
            self.identifier.text = str(value)
        elif name == "TetheringPassphrase":
            self.passphrase.text = str(value)
class ServicesList(object):
    """Provides a Genlist with the known Services.

    It will call manager's GetServices() and then keep it updated with
    ServicesChanged signal.

    Selecting an item will call C{on_selected(path, service_properties)}.
    The arrow button of a row calls C{on_disclosure(path,
    service_properties)}.
    """

    def __init__(self, parent, on_selected=None, on_disclosure=None):
        self.services = {}   # path -> properties
        self.items = {}      # path -> genlist item
        self.obj = Genlist(parent)
        self.on_selected = on_selected
        self.on_disclosure = on_disclosure
        self.obj.callback_selected_add(self._item_selected)
        self.obj.on_del_add(self._deleted)

        # Create the item class before any request that may append rows.
        self.itc = GenlistItemClass(
            item_style="default",
            text_get_func=self._item_text_get,
            content_get_func=self._item_content_get
        )
        self.sig_ch = manager.connect_to_signal(
            "ServicesChanged",
            self._services_changed
        )
        self.sig_propch = bus.add_signal_receiver(
            self._service_prop_changed,
            "PropertyChanged",
            "net.connman.Service",
            "net.connman",
            path_keyword='path'
        )
        manager.GetServices(
            reply_handler=self._get_services_reply,
            error_handler=self._get_services_error
        )

    def _deleted(self, obj):
        """Release both signal matches and the caches with the widget."""
        self.sig_ch.remove()
        # The per-service PropertyChanged receiver must be removed too,
        # otherwise it keeps firing into this dead list.
        self.sig_propch.remove()
        self.obj = None
        self.sig_ch = None
        self.sig_propch = None
        self.services.clear()
        self.items.clear()

    def _items_repopulate(self, paths):
        """Append one row per path, in the given order."""
        for path in paths:
            self.items[path] = self.obj.item_append(self.itc, path)

    def _get_services_reply(self, services):
        """Populate the list from the GetServices() result."""
        log.debug("Got services: %s", dbus_array_of_dict_to_str(services))
        for path, properties in services:
            self._service_added(path, properties)
        self._items_repopulate(str(path) for path, properties in services)

    def _get_services_error(self, exc):
        """Report a failed GetServices() call to the user."""
        log.critical("Failed to GetServices(): %s", exc)
        popup_fatal(self.obj, "Failed to get Services",
                    exc.get_dbus_message())

    def _service_added(self, path, properties):
        """Remember the properties of a newly seen service."""
        # Keys are plain strings everywhere else in this class.
        path = str(path)
        log.debug("Added %s: %s", path, dbus_dict_to_str(properties))
        self.services[path] = properties

    def _service_prop_changed(self, name, value, path):
        """Store an updated property and redraw the matching row."""
        path = str(path)
        log.debug("Changed %s: %s=%s", path, name, value)
        s = self.services.get(path)
        if not s:
            return
        s[name] = value
        it = self.items.get(path)
        if not it:
            return
        it.update()

    def _service_changed(self, path, properties):
        """Merge changed properties into an already known service."""
        log.debug("Changed %s: %s", path, dbus_dict_to_str(properties))
        self.services[path].update(properties)

    def _services_changed(self, changed, removed):
        """Rebuild the rows after a ServicesChanged signal.

        The rows are recreated from the entries of C{changed}, in order.
        """
        log.debug("Changed: %s, Removed: %s",
                  dbus_array_of_dict_to_str(changed),
                  removed)
        self.items.clear()
        self.obj.clear()
        for path in removed:
            self._service_removed(path)
        for path, properties in changed:
            path = str(path)
            if path in self.services:
                self._service_changed(path, properties)
            else:
                self._service_added(path, properties)
        self._items_repopulate(str(path) for path, properties in changed)

    def _service_removed(self, path):
        """Forget a service that no longer exists."""
        path = str(path)
        log.debug("Removed %s", path)
        self.services.pop(path, None)

    def _item_selected(self, lst, item):
        """Forward a row selection to the C{on_selected} callback."""
        item.selected = False
        if not self.on_selected:
            return
        path = item.data
        s = self.services.get(path)
        if s:
            self.on_selected(path, s)

    def _item_disclosure(self, bt, path):
        """Forward an arrow-button click to C{on_disclosure}."""
        if not self.on_disclosure:
            return
        s = self.services.get(path)
        if s:
            self.on_disclosure(path, s)

    def _item_text_get(self, obj, part, item_data):
        """Genlist text provider: the service name for "elm.text"."""
        if part != "elm.text":
            return None
        t = self.services.get(item_data)
        if not t:
            return "Unknown"
        return t.get("Name", item_data[len("/net/connman/service/"):])

    @staticmethod
    def _security_of(props):
        """Return the security methods as strings, without "none".

        A service without a Security property yields an empty list.
        """
        methods = [str(x) for x in (props.get("Security") or ())]
        return [m for m in methods if m != "none"]

    @staticmethod
    def _status_icon(parent, name):
        """Create a shown, fixed 32x32 standard icon."""
        ic = Icon(parent)
        ic.standard = name
        ic.size_hint_min = ic.size_hint_max = (32, 32)
        ic.show()
        return ic

    def _end_content(self, obj, item_data, props):
        """Build the row's right side: status icons and the arrow button."""
        connected = (str(props.get("State")) not in ("idle", "failure"))
        security = self._security_of(props)

        bx = Box(obj)
        bx.horizontal = True
        bx.homogeneous = True
        bx.padding = (2, 0)
        bx.align = (1.0, 0.5)

        if connected:
            bx.pack_end(self._status_icon(obj, "connman-connected"))
        if security:
            if props.get("Favorite"):
                name = "connman-security-favorite"
            else:
                name = "connman-security"
            bx.pack_end(self._status_icon(obj, name))

        ic = Icon(obj)
        ic.standard = "arrow_right"
        bt = Button(obj)
        bt.content = ic
        bt.callback_clicked_add(self._item_disclosure, item_data)
        # Keep the click from also selecting the row underneath.
        bt.propagate_events = False
        bt.show()
        bt.size_hint_min = bt.size_hint_max = (32, 32)
        bx.pack_end(bt)
        return bx

    def _icon_content(self, obj, props):
        """Build the row's themed icon and push the service state into it."""
        state = props.get("State")
        strength = props.get("Strength")
        connected = (str(state) not in ("idle", "failure"))
        security = self._security_of(props)

        def yesno(val):
            return ("no", "yes")[bool(val)]

        ly = Layout(obj)
        ly.theme_set("icon", props.get("Type"), "default")
        ly.size_hint_min_set(32, 32)

        if state is not None:
            ly.signal_emit("elm,state," + str(state), "elm")
        ly.signal_emit("elm,error," + (props.get("Error") or "none"), "elm")
        ly.signal_emit("elm,favorite," + yesno(props.get("Favorite")), "elm")
        ly.signal_emit("elm,roaming," + yesno(props.get("Roaming")), "elm")
        ly.signal_emit("elm,auto_connect," +
                       yesno(props.get("AutoConnect")), "elm")
        ly.signal_emit("elm,connected," + yesno(connected), "elm")

        for method in security:
            ly.signal_emit("elm,security," + method, "elm")
        if security:
            ly.signal_emit("elm,security,yes", "elm")
        else:
            ly.signal_emit("elm,security,none", "elm")

        if strength:
            ly.edje.message_send(1, strength)
        return ly

    def _item_content_get(self, obj, part, item_data):
        """Genlist content provider for the icon and end parts of a row."""
        s = self.services.get(item_data)
        if not s:
            return None
        if part == "elm.swallow.end":
            return self._end_content(obj, item_data, s)
        if part == "elm.swallow.icon":
            return self._icon_content(obj, s)
        return None

    def service_name_get(self, path):
        """Return the service's Name as a str, or None when unknown."""
        s = self.services.get(path)
        if not s:
            return None
        n = s.get("Name")
        if not n:
            return None
        return str(n)
class ServiceView(ObjectView):
    """Detailed page for the service found at C{path}.

    The initial state comes from the C{properties} argument and is then
    kept current by listening to net.connman.Service.PropertyChanged.

    Changes made by the user are applied to the server automatically.
    """

    bus_interface = "net.connman.Service"

    # Each table below pairs a property key with the name of the instance
    # attribute that holds the entry widget for it.
    ipv4_fields = (
        # ("Method", "ipv4_method"),
        ("Address", "ipv4_address"),
        ("Netmask", "ipv4_netmask"),
        ("Gateway", "ipv4_gateway"),
    )

    ipv6_fields = (
        # ("Method", "ipv6_method"),
        ("Address", "ipv6_address"),
        ("Prefix Length", "ipv6_prefix_length"),
        ("Gateway", "ipv6_gateway"),
        # ("Privacy", "ipv6_privacy"),
    )

    proxy_fields = (
        ("Method", "proxy_method"),
        ("URL", "proxy_url"),
        ("Servers", "proxy_servers"),
        ("Excludes", "proxy_excludes"),
    )

    # Named "Provider" in the spec.
    vpn_fields = (
        ("Host", "vpn_host"),
        ("Domain", "vpn_domain"),
        ("Name", "vpn_name"),
        ("Type", "vpn_type"),
    )

    eth_fields = (
        ("Method", "eth_method"),
        ("Interface", "eth_iface"),
        ("Address", "eth_addr"),
        ("MTU", "eth_mtu"),
        ("Speed", "eth_speed"),
        ("Duplex", "eth_duplex"),
    )

    # Attribute names of the widgets packed directly into the page box.
    top_widgets = (
        "connect",
        "disconnect",
        "forget",
        "error",
        "auto_connect",
        "roaming",
        "strength",
        "security",
        "state",
        "nameservers_label",
        "nameservers_entry",
        "timeservers_label",
        "timeservers_entry",
        "domains_label",
        "domains_entry",
        "ipv4_frame",
        "ipv6_frame",
        "proxy_frame",
        "ethernet_frame",
        "vpn_frame",
        "ieee8021x_frame",
    )
def create_view(self, properties):
    """Build all widgets of the service page from C{properties}.

    Widgets are packed top to bottom: the action buttons, status
    widgets, the editable server lists, the IPv4, IPv6 and Proxy frames,
    a read-only Ethernet or VPN frame and, for 802.1x wifi services, the
    EAP / Phase2 selectors.
    """
    self.type = str(properties.get("Type"))
    self.security_mode = properties.get("Security")
    self.immutable = bool(properties.get("Immutable"))
    self.readwrite_list_properties = {}
    self.readwrite_list_widget = {}
    self.name = str(properties.get("Name"))

    top = self.box

    # --- action buttons ---------------------------------------------
    self.connect = self.add_button(top, "Connect", self._connect)
    self.disconnect = self.add_button(top, "Disconnect", self._disconnect)

    if not self.immutable and self.type != "ethernet":
        self.forget = self.add_button(top, "Forget Network", self._forget)
    elif self.type == "wifi" and "ieee8021x" in self.security_mode:
        pnac_conf.read()
        self.forget = self.add_button(top, "Forget Network", self._forget)
        if not pnac_conf.has_section(self.name):
            self.forget.disabled = True

    # --- status widgets ---------------------------------------------
    self.error = self.add_label(top, "error here")
    self.auto_connect = self.add_check(top, "Auto connect",
                                       self._on_user_auto_connect)

    if self.type == "cellular":
        self.roaming = self.add_check(top, "Roaming")
        self.roaming.disabled = True

    if properties.get("Strength") is not None:
        self.strength = self.add_progress(top, "Strength:")

    if self.type == "wifi":
        self.security = self.add_label(top, "Security:")

    self.state = self.add_label(top, properties.get("State"))

    # --- editable lists ---------------------------------------------
    self.nameservers_label, self.nameservers_entry = \
        self.add_readwrite_list("Name Servers:", "Nameservers", properties)
    self.timeservers_label, self.timeservers_entry = \
        self.add_readwrite_list("Time Servers:", "Timeservers", properties)
    self.domains_label, self.domains_entry = \
        self.add_readwrite_list("Domain Names:", "Domains", properties)

    # --- section: IPv4 ----------------------------------------------
    self.ipv4_properties = {"IPv4": {}, "IPv4.Configuration": {}}
    self.ipv4_frame, self.ipv4_box = self.add_frame_and_box(top, "IPv4")
    self.ipv4_method, self.ipv4_method_items = self.add_segment_control(
        self.ipv4_box, ("Automatic", "Manual", "Off"), self._on_ipv4_method)
    for title, attr_name in self.ipv4_fields:
        entry = self.add_label_and_entry(self.ipv4_box, title)[1]
        entry.callback_activated_add(self._on_ipv4_property_changed)
        entry.callback_unfocused_add(self._on_ipv4_property_unfocused)
        setattr(self, attr_name, entry)

    # --- section: IPv6 ----------------------------------------------
    self.ipv6_properties = {"IPv6": {}, "IPv6.Configuration": {}}
    self.ipv6_frame, self.ipv6_box = self.add_frame_and_box(top, "IPv6")
    self.ipv6_method, self.ipv6_method_items = self.add_segment_control(
        self.ipv6_box, ("Automatic", "Manual", "Off"), self._on_ipv6_method)
    for title, attr_name in self.ipv6_fields:
        entry = self.add_label_and_entry(self.ipv6_box, title)[1]
        entry.callback_activated_add(self._on_ipv6_property_changed)
        entry.callback_unfocused_add(self._on_ipv6_property_unfocused)
        setattr(self, attr_name, entry)
    (self.ipv6_privacy_lb,
     self.ipv6_privacy,
     self.ipv6_privacy_items) = self.add_label_and_segment_control(
        self.ipv6_box, ("Disabled", "Enabled", "Prefered"),
        self._on_ipv6_privacy, "Privacy")

    # --- section: Proxy ---------------------------------------------
    # Custom contents are planned per method:
    #  - direct: nothing
    #  - auto: url
    #  - manual: servers, excludes
    self.proxy_properties = {"Proxy": {}, "Proxy.Configuration": {}}
    self.proxy_frame, self.proxy_box = self.add_frame_and_box(top, "Proxy")
    self.proxy_method, self.proxy_method_items = self.add_segment_control(
        self.proxy_box, ("Direct", "Automatic", "Manual"),
        self._on_proxy_method)
    self.add_label(self.proxy_box, "TODO")

    # --- section: Ethernet / VPN ------------------------------------
    if self.type == "vpn":
        self.vpn_frame = self.add_readonly_section(
            "VPN", self.vpn_fields)[0]
    elif self.type in ("wifi", "ethernet", "wimax", "bluetooth",
                       "cellular"):
        self.ethernet_frame = self.add_readonly_section(
            "Ethernet", self.eth_fields)[0]

    # --- section: Two Phase Authentication --------------------------
    if (self.type == "wifi") and ("ieee8021x" in self.security_mode):
        frame, inner = self.add_frame_and_box(top, "ieee8021x")
        self.ieee8021x_frame = frame

        def preselect(items, option, labels):
            # Select the segment matching the stored configuration value;
            # values without an entry in C{labels} select nothing.
            if not pnac_conf.has_section(self.name):
                return
            label = labels.get(pnac_conf.get(self.name, option))
            if label is not None:
                items[label].selected = True

        self.add_label(inner, "EAP:")
        self.eap_method, self.eap_method_items = self.add_segment_control(
            inner, ("PEAP", "TLS", "TTLS", "None"), self._on_eap_method
        )
        preselect(self.eap_method_items, 'EAP',
                  {"peap": "PEAP", "tls": "TLS", "ttls": "TTLS",
                   None: "None"})

        self.add_label(inner, "Phase2:")
        self.phase2, self.phase2_items = self.add_segment_control(
            inner, ("TLS", "MSCHAPv2", "None"), self._on_phase2
        )
        preselect(self.phase2_items, 'Phase2',
                  {"tls": "TLS", "MSCHAPV2": "MSCHAPv2", None: "None"})
def add_readonly_section(self, title, fields):
    """Add a frame titled C{title} with one read-only entry per field.

    Each entry is stored on the instance under the attribute name given
    in C{fields}.  Returns the frame and its inner box.
    """
    frame, inner = self.add_frame_and_box(self.box, title)
    for caption, attr_name in fields:
        entry = self.add_label_and_entry(inner, "%s:" % (caption,))[1]
        entry.editable = False
        setattr(self, attr_name, entry)
    return frame, inner
def populate_fields(self, fields, value):
    """Copy the values present in C{value} into their entry widgets.

    C{fields} pairs each key of C{value} with the attribute holding the
    widget; keys that are missing or have a false value are left alone.
    """
    for key, attr_name in fields:
        found = value.get(key)
        if not found:
            continue
        getattr(self, attr_name).text = str(found)
def _readwrite_list_conv(self, a):
if not a:
return ""
return ", ".join(str(x).strip() for x in a)
def add_readwrite_list(self, title, name, properties):
    """Add a label and an entry for editing a comma separated list.

    C{name} is the read-only property that holds the values in use;
    the user's edits are sent for "<name>.Configuration".  Both current
    values are cached as text in C{self.readwrite_list_properties}.

    Returns the label and the entry.
    """
    conf = "%s.Configuration" % (name,)
    self.readwrite_list_properties[name] = \
        self._readwrite_list_conv(properties.get(name))
    self.readwrite_list_properties[conf] = \
        self._readwrite_list_conv(properties.get(conf))

    def on_changed(obj):
        # Drop empty items so a stray or trailing comma never ends up as
        # an empty string in the array sent to the server.
        items = [x.strip() for x in obj.text.split(",")]
        value_array = [x for x in items if x]
        value = ", ".join(value_array)

        orig_value = self.readwrite_list_properties[name]
        conf_value = self.readwrite_list_properties[conf]
        # The entry shows the configured value when there is one and the
        # value in use otherwise; only a real difference is a change.
        if value == (conf_value or orig_value):
            return
        log.debug("User changed %s=%r (%r, %r)",
                  name, value, orig_value, conf_value)
        self._on_readwrite_changed(conf, value_array)

    def on_unfocused(obj):
        # Discard edits that were not confirmed.
        self.reload_readwrite_list(name)

    lb, en = self.add_label_and_entry(self.box, title, on_changed)
    en.callback_unfocused_add(on_unfocused)
    self.readwrite_list_widget[name] = en
    self.reload_readwrite_list(name)
    return lb, en
def reload_readwrite_list(self, name):
    """Reset the entry of list C{name} from the cached values.

    The configured value is shown when non-empty, else the one in use.
    """
    cache = self.readwrite_list_properties
    in_use = cache[name]
    configured = cache["%s.Configuration" % name]
    entry = self.readwrite_list_widget[name]
    log.debug("%s=%r, %r", name, in_use, configured)
    entry.text = configured if configured else in_use
def update_readwrite_list(self, name, value):
    """Cache a new list value for property C{name} and refresh its entry.

    C{name} may be the plain property or its ".Configuration" variant;
    nothing happens when the converted value equals the cached one.
    """
    text = self._readwrite_list_conv(value)
    if text != self.readwrite_list_properties[name]:
        self.readwrite_list_properties[name] = text
        suffix = ".Configuration"
        base = name[:-len(suffix)] if name.endswith(suffix) else name
        self.reload_readwrite_list(base)
def on_property_changed(self, name, value):
# Apply one changed service property (name, value) to this view:
# update cached state and the widgets that display it.  Properties
# that match no branch below are ignored.
log.debug("Changed %s: %s=%s", self.path, name, value)
# Set by the branches that show or hide one of the top widgets; the
# box is repacked at the end when this is True.
visibility_changed = False
if name == "Type":
self.type = str(value)
elif name == "Immutable":
# An immutable service gets every configuration widget disabled.
value = bool(value)
self.immutable = value
self.auto_connect.disabled = value
for w in self.readwrite_list_widget.values():
w.disabled = value
self.ipv4_method.disabled = value
self.ipv4_address.disabled = value
self.ipv4_netmask.disabled = value
self.ipv4_gateway.disabled = value
self.ipv6_method.disabled = value
self.ipv6_address.disabled = value
self.ipv6_prefix_length.disabled = value
self.ipv6_gateway.disabled = value
self.ipv6_privacy.disabled = value
self.proxy_method.disabled = value
elif name == "Favorite":
# The "forget" widget is only touched when the view has one; it is
# visible exactly when the service is a favorite.
value = bool(value)
if hasattr(self, "forget"):
if self.forget.visible != value:
self.forget.visible = value
visibility_changed = True
elif name == "State":
value = str(value)
# The error label is shown only in the "failure" state.
visible = (value == "failure")
self.state.text = "State: %s" % (value,)
if self.error.visible != visible:
self.error.visible = visible
visibility_changed = True
# Every state other than "idle" and "failure" shows "disconnect"
# and hides "connect"; those two states do the opposite.
connected = (value not in ("idle", "failure"))
if self.disconnect.visible != connected:
self.disconnect.visible = connected
visibility_changed = True
if self.connect.visible == connected:
self.connect.visible = not connected
visibility_changed = True
elif name == "Error":
self.error.text = "Error: %s" % value
elif name == "AutoConnect":
self.auto_connect.state = bool(value)
elif name == "Strength":
# Strength is divided by 100 before it is handed to the widget.
self.strength.value = float(value) / 100.0
elif self.type == "wifi" and name == "Security":
s = ", ".join(str(x) for x in value)
self.security.text = "Security: %s" % (s,)
elif name == "Roaming":
self.roaming.text = str(value)
elif name == "Ethernet":
self.populate_fields(self.eth_fields, value)
elif name == "Provider":
self.populate_fields(self.vpn_fields, value)
elif name in ("IPv4", "IPv4.Configuration"):
# Cache the new dict, then refresh the IPv4 widgets from both the
# value in use and the configuration.
self.ipv4_properties[name] = value
used = self.ipv4_properties["IPv4"]
conf = self.ipv4_properties["IPv4.Configuration"]
def get_val(name):
# Value in use first, configured value as fallback, "" if neither.
v = used.get(name) or conf.get(name)
if not v:
return ""
return str(v)
self.ipv4_address.text = get_val("Address")
self.ipv4_netmask.text = get_val("Netmask")
self.ipv4_gateway.text = get_val("Gateway")
# The fields can be edited only with the configured method "manual"
# on a service that is not immutable.
method = str(conf.get("Method", ""))
editable = (method == "manual") and (not self.immutable)
self.ipv4_address.editable = editable
self.ipv4_netmask.editable = editable
self.ipv4_gateway.editable = editable
# Select the method item matching the configured method; an empty
# method selects nothing and is not reported.
if method in ("dhcp", "fixed"):
self.ipv4_method_items["Automatic"].selected = True
elif method == "manual":
self.ipv4_method_items["Manual"].selected = True
elif method == "off":
self.ipv4_method_items["Off"].selected = True
elif method:
log.error("Unknown method: %s", method)
elif name in ("IPv6", "IPv6.Configuration"):
# Same scheme as IPv4, plus the privacy selector.
self.ipv6_properties[name] = value
used = self.ipv6_properties["IPv6"]
conf = self.ipv6_properties["IPv6.Configuration"]
def get_val(name):
# Value in use first, configured value as fallback, "" if neither.
v = used.get(name) or conf.get(name)
if not v:
return ""
return str(v)
self.ipv6_address.text = get_val("Address")
self.ipv6_prefix_length.text = get_val("PrefixLength")
self.ipv6_gateway.text = get_val("Gateway")
method = str(conf.get("Method", ""))
editable = (method == "manual") and (not self.immutable)
self.ipv6_address.editable = editable
self.ipv6_prefix_length.editable = editable
self.ipv6_gateway.editable = editable
# privacy has only meaning if Method is set to "auto"
editable = (method == "auto") and (not self.immutable)
self.ipv6_privacy.disabled = not editable
if method in ("auto", "fixed", "6to4"):
self.ipv6_method_items["Automatic"].selected = True
elif method == "manual":
self.ipv6_method_items["Manual"].selected = True
elif method == "off":
self.ipv6_method_items["Off"].selected = True
elif method:
log.error("Unknown method: %s", method)
# The privacy tokens handled here are "disabled", "enabled" and
# "prefered" (spelled with a single "r" in both token and item label).
privacy = str(conf.get("Privacy", ""))
if privacy == "disabled":
self.ipv6_privacy_items["Disabled"].selected = True
elif privacy == "enabled":
self.ipv6_privacy_items["Enabled"].selected = True
elif privacy == "prefered":
self.ipv6_privacy_items["Prefered"].selected = True
elif privacy:
log.error("Unknown privacy: %s", privacy)
elif name in ("Proxy", "Proxy.Configuration"):
# Only the proxy method selector is updated; get_val and editable
# are computed but not used yet in this branch.
self.proxy_properties[name] = value
used = self.proxy_properties["Proxy"]
conf = self.proxy_properties["Proxy.Configuration"]
def get_val(name):
v = used.get(name) or conf.get(name)
if not v:
return ""
return str(v)
# url, servers, excludes
method = str(conf.get("Method", ""))
editable = (method == "manual") and (not self.immutable)
# use editable...
if method == "direct":
self.proxy_method_items["Direct"].selected = True
elif method == "manual":
self.proxy_method_items["Manual"].selected = True
elif method == "auto":
self.proxy_method_items["Automatic"].selected = True
elif method:
log.error("Unknown method: %s", method)
elif name in self.readwrite_list_properties:
# Comma-separated list properties registered by add_readwrite_list.
self.update_readwrite_list(name, value)
# Repack the box so it holds exactly the visible top widgets, in the
# order given by self.top_widgets.
if visibility_changed:
self.box.unpack_all()
for attr in self.top_widgets:
if hasattr(self, attr):
wid = getattr(self, attr)
if wid.visible:
self.box.pack_end(wid)
def _disconnect(self, obj):
    """Ask the service to disconnect (handler of the disconnect button).

    ``Disconnect`` is called on ``self.bus_obj`` with reply and error
    handlers, then the button is disabled.  Whichever handler runs logs
    the outcome and enables the button again.  *obj* is unused.
    """
    def reenable():
        self.disconnect.disabled = False

    def disconnected():
        log.debug("Disconnected %s", self.path)
        reenable()

    def failed(error):
        log.error("Could not disconnect %s", error)
        reenable()

    self.bus_obj.Disconnect(reply_handler=disconnected, error_handler=failed)
    self.disconnect.disabled = True
def _connect(self, obj):
    """Ask the service to connect (handler of the connect button).

    ``Connect`` is called on ``self.bus_obj`` with reply and error
    handlers, then the button is disabled.  Whichever handler runs logs
    the outcome and enables the button again.  *obj* is unused.
    """
    def reenable():
        self.connect.disabled = False

    def connected():
        log.debug("Connected %s", self.path)
        reenable()

    def failed(error):
        log.error("Could not connect %s", error)
        reenable()

    self.bus_obj.Connect(reply_handler=connected, error_handler=failed)
    self.connect.disabled = True
def _forget(self, obj):
# Handler of the "forget" button.  obj is unused.
def on_reply():
# Reply handler of Remove: log and enable the button again.
log.debug("Removed %s", self.path)
self.forget.disabled = False
def on_error(exc):
# Error handler of Remove: log and enable the button again.
log.error("Could not remove %s", exc)
self.forget.disabled = False
# For wifi services whose security mode contains "ieee8021x", the
# section named after the service is dropped from pnac_conf and the
# phase2 / EAP method selections are cleared.
if self.type == "wifi" and "ieee8021x" in self.security_mode:
if pnac_conf.has_section(self.name):
pnac_conf.remove_section(self.name)
for it in self.phase2_items:
self.phase2_items[it].selected = False
for it in self.eap_method_items:
self.eap_method_items[it].selected = False
# NOTE(review): presumably this else pairs with the outer wifi/802.1x
# test, so every other service is removed through the bus -- confirm
# against the indented original.
else:
self.bus_obj.Remove(reply_handler=on_reply, error_handler=on_error)
# NOTE(review): only on_reply/on_error enable the button again; if this
# line also runs on the 802.1x path nothing re-enables it -- confirm.
self.forget.disabled = True
def _on_user_auto_connect(self, obj):
    """Send the state of the auto-connect toggle *obj* to the service.

    The state read from *obj* is sent as the "AutoConnect" property.  If
    the call fails the toggle is flipped back and an error popup with the
    D-Bus message is shown.
    """
    wanted = obj.state

    def applied():
        log.info("Set AutoConnect=%s", wanted)

    def rejected(error):
        log.error("Failed to set AutoConnect=%s: %s", wanted, error)
        obj.state = not wanted
        popup_error(
            self.obj,
            "Failed to Apply Auto Connect",
            error.get_dbus_message(),
        )

    new_value = dbus.Boolean(wanted)
    self.bus_obj.SetProperty(
        "AutoConnect", new_value,
        reply_handler=applied, error_handler=rejected
    )
def _on_readwrite_changed(self, name, value):
    """Send the list *value* as the string-array property *name*.

    *name* is expected to end in ".Configuration"; on failure that suffix
    is cut off to build the popup title and to pick the entry that is
    reset through ``reload_readwrite_list``.
    """
    def applied():
        log.info("Set %s=%s", name, value)

    def rejected(error):
        log.error("Failed to set %s=%s: %s", name, value, error)
        base = name[:-len(".Configuration")]
        title = "Failed to Apply %s" % (base,)
        popup_error(self.obj, title, error.get_dbus_message())
        self.reload_readwrite_list(base)

    payload = dbus.Array(value, signature="s")
    self.bus_obj.SetProperty(
        name, payload,
        reply_handler=applied, error_handler=rejected
    )
def _ipv4_apply(self):
    """Send the IPv4 settings shown in the UI as "IPv4.Configuration".

    The selected method item is translated to its ConnMan token
    ("Automatic" -> "dhcp", "Manual" -> "manual", "Off" -> "off").  For
    "manual" the non-empty address, netmask and gateway entries are
    included; if all three are empty nothing is sent.  Nothing is sent
    either when no value differs from the cached configuration.

    When no method item is selected, or its label is not one of the three
    above, the call is a no-op (this used to raise).
    """
    selected = self.ipv4_method.item_selected
    label = selected.text if selected is not None else None
    method = {
        "Automatic": "dhcp",
        "Manual": "manual",
        "Off": "off",
    }.get(label)
    if method is None:
        log.debug("No usable IPv4 method selected (%r), not applying", label)
        return

    def make_variant(s):
        return dbus.String(s, variant_level=1)

    new = {"Method": make_variant(method)}
    if method == "manual":
        entries = (
            ("Address", self.ipv4_address),
            ("Netmask", self.ipv4_netmask),
            ("Gateway", self.ipv4_gateway),
        )
        for key, entry in entries:
            text = entry.text
            if text:
                new[key] = make_variant(text)
        if len(new) == 1:  # no properties yet
            return

    conf = self.ipv4_properties["IPv4.Configuration"]
    changed = [k for k, v in new.items() if conf.get(k) != v]
    log.debug("Changed IPv4: %s", ", ".join(changed))
    if not changed:
        return

    def on_reply():
        log.info("Set IPv4=%s", new)

    def on_error(exc):
        log.error("Failed to set IPv4.Configuration=%s: %s", new, exc)
        popup_error(self.obj, "Failed to Apply IPv4",
                    exc.get_dbus_message())

    self.bus_obj.SetProperty(
        "IPv4.Configuration", new,
        reply_handler=on_reply, error_handler=on_error
    )
def _on_ipv4_method(self, obj, item):
    """React to the user picking the IPv4 method *item*.

    The address, netmask and gateway entries become editable only for
    "Manual" on a service that is not immutable.  ``_ipv4_apply`` is
    called unless the picked method equals the configured one.

    A configuration without a "Method" key counts as "different" instead
    of raising ``KeyError``; an item whose label is not "Automatic",
    "Manual" or "Off" is ignored.  *obj* is unused.
    """
    method = {
        "Automatic": "dhcp",
        "Manual": "manual",
        "Off": "off",
    }.get(item.text)
    if method is None:
        return

    conf = self.ipv4_properties["IPv4.Configuration"]
    editable = (method == "manual") and (not self.immutable)
    for entry in (self.ipv4_address, self.ipv4_netmask, self.ipv4_gateway):
        entry.editable = editable

    if method == conf.get("Method"):
        return
    self._ipv4_apply()
def _on_ipv4_property_changed(self, obj):
    """Callback for a changed IPv4 field: delegate to ``_ipv4_apply``.

    The widget *obj* is ignored; ``_ipv4_apply`` reads the entries itself.
    """
    apply_settings = self._ipv4_apply
    apply_settings()
def _on_ipv4_property_unfocused(self, obj):
    """Reset the IPv4 entries to the cached values.

    For each of "Address", "Netmask" and "Gateway" the value in use is
    shown, falling back to the configured value and finally to "".
    *obj* is unused.
    """
    cache = self.ipv4_properties
    in_use = cache["IPv4"]
    configured = cache["IPv4.Configuration"]
    targets = (
        ("Address", "ipv4_address"),
        ("Netmask", "ipv4_netmask"),
        ("Gateway", "ipv4_gateway"),
    )
    for key, attr_name in targets:
        current = in_use.get(key) or configured.get(key)
        getattr(self, attr_name).text = str(current) if current else ""
def _ipv6_apply(self):
    """Send the IPv6 settings shown in the UI as "IPv6.Configuration".

    The selected method item is translated to its ConnMan token
    ("Automatic" -> "auto", "Manual" -> "manual", "Off" -> "off").

    * "manual": the non-empty address, prefix length and gateway entries
      are included; if all three are empty nothing is sent.
    * "auto": the selected privacy item, if any, is included as its
      lowercase token ("disabled", "enabled", "prefered") -- the same
      tokens the rest of this class compares the configuration against.
      Previously the capitalised item label was sent, and never together
      with "auto", the only method for which the privacy selector is
      enabled.

    Nothing is sent when no value differs from the cached configuration,
    when no method item is selected or when its label is unknown (the
    latter two used to raise).
    """
    selected = self.ipv6_method.item_selected
    label = selected.text if selected is not None else None
    method = {
        "Automatic": "auto",
        "Manual": "manual",
        "Off": "off",
    }.get(label)
    if method is None:
        log.debug("No usable IPv6 method selected (%r), not applying", label)
        return

    def make_variant(s):
        return dbus.String(s, variant_level=1)

    new = {"Method": make_variant(method)}
    if method == "manual":
        # NOTE(review): PrefixLength is submitted as a string like the
        # other fields; confirm the type ConnMan expects for it.
        entries = (
            ("Address", self.ipv6_address),
            ("PrefixLength", self.ipv6_prefix_length),
            ("Gateway", self.ipv6_gateway),
        )
        for key, entry in entries:
            text = entry.text
            if text:
                new[key] = make_variant(text)
        if len(new) == 1:  # no properties yet
            return
    elif method == "auto":
        # privacy has only meaning if Method is set to "auto"
        picked = self.ipv6_privacy.item_selected
        privacy = {
            "Disabled": "disabled",
            "Enabled": "enabled",
            "Prefered": "prefered",
        }.get(picked.text if picked is not None else None)
        if privacy:
            new["Privacy"] = make_variant(privacy)

    conf = self.ipv6_properties["IPv6.Configuration"]
    changed = [k for k, v in new.items() if conf.get(k) != v]
    log.debug("Changed IPv6: %s", ", ".join(changed))
    if not changed:
        return

    def on_reply():
        log.info("Set IPv6=%s", new)

    def on_error(exc):
        log.error("Failed to set IPv6.Configuration=%s: %s", new, exc)
        popup_error(self.obj, "Failed to Apply IPv6",
                    exc.get_dbus_message())

    self.bus_obj.SetProperty(
        "IPv6.Configuration", new,
        reply_handler=on_reply, error_handler=on_error
    )
def _on_ipv6_method(self, obj, item):
    """React to the user picking the IPv6 method *item*.

    The address, prefix length and gateway entries become editable only
    for "Manual"; the privacy selector is enabled only for "Automatic".
    Both require a service that is not immutable.  ``_ipv6_apply`` is
    called unless the picked method equals the configured one.

    A configuration without a "Method" key counts as "different" instead
    of raising ``KeyError``; an item whose label is not "Automatic",
    "Manual" or "Off" is ignored.  *obj* is unused.
    """
    method = {
        "Automatic": "auto",
        "Manual": "manual",
        "Off": "off",
    }.get(item.text)
    if method is None:
        return

    conf = self.ipv6_properties["IPv6.Configuration"]
    editable = (method == "manual") and (not self.immutable)
    for entry in (self.ipv6_address, self.ipv6_prefix_length,
                  self.ipv6_gateway):
        entry.editable = editable

    # privacy has only meaning if Method is set to "auto"
    editable = (method == "auto") and (not self.immutable)
    self.ipv6_privacy.disabled = not editable

    if method == conf.get("Method"):
        return
    self._ipv6_apply()
def _on_ipv6_privacy(self, obj, item):
    """React to the user picking the IPv6 privacy *item*.

    The item label is translated to its lowercase token ("disabled",
    "enabled", "prefered") and ``_ipv6_apply`` is called unless that
    token equals the configured "Privacy".

    A configuration without a "Privacy" key counts as "different" instead
    of raising ``KeyError``; an item with any other label is ignored.
    *obj* is unused.
    """
    privacy = {
        "Disabled": "disabled",
        "Enabled": "enabled",
        "Prefered": "prefered",
    }.get(item.text)
    if privacy is None:
        return

    conf = self.ipv6_properties["IPv6.Configuration"]
    if privacy == conf.get("Privacy"):
        return
    self._ipv6_apply()
def _on_ipv6_property_changed(self, obj):
    """Callback for a changed IPv6 field: delegate to ``_ipv6_apply``.

    The widget *obj* is ignored; ``_ipv6_apply`` reads the entries itself.
    """
    apply_settings = self._ipv6_apply
    apply_settings()
def _on_ipv6_property_unfocused(self, obj):
    """Reset the IPv6 entries to the cached values.

    For each of "Address", "PrefixLength" and "Gateway" the value in use
    is shown, falling back to the configured value and finally to "".
    The privacy selector is not touched.  *obj* is unused.
    """
    cache = self.ipv6_properties
    in_use = cache["IPv6"]
    configured = cache["IPv6.Configuration"]
    targets = (
        ("Address", "ipv6_address"),
        ("PrefixLength", "ipv6_prefix_length"),
        ("Gateway", "ipv6_gateway"),
    )
    for key, attr_name in targets:
        current = in_use.get(key) or configured.get(key)
        getattr(self, attr_name).text = str(current) if current else ""
def _on_proxy_method(self, obj, item):
    """React to the user picking the proxy method *item* (not applied yet).

    The label is translated to its token ("Direct" -> "direct",
    "Manual" -> "manual", "Automatic" -> "auto") and compared with the
    configured "Method"; applying a different method is still to be
    implemented, so the handler currently has no effect.

    A configuration without a "Method" key or an item with any other
    label no longer raises.  *obj* is unused.
    """
    method = {
        "Direct": "direct",
        "Manual": "manual",
        "Automatic": "auto",
    }.get(item.text)
    conf = self.proxy_properties["Proxy.Configuration"]
    if method is None or method == conf.get("Method"):
        return
    # TODO: make the proxy fields editable for "manual" on a mutable
    # service and send the new configuration (self._proxy_apply()).
def _on_proxy_changed(self, obj):
    """Placeholder callback for a changed proxy field; does nothing yet."""
    # TODO: self._proxy_apply()
    return None
def _on_proxy_unfocused(self, obj):
    """Placeholder callback for an unfocused proxy field; does nothing yet."""
    # TODO: revert to configured values...
    return None
def _on_eap_method(self, obj, item):
    """Store the EAP method picked in the UI in ``pnac_conf`` and write it.

    "PEAP", "TLS" and "TTLS" are stored lowercase under the "EAP" option
    of the section named after this service; any other label (including
    "None") stores ``None``.  *obj* is unused.
    """
    tokens = {"PEAP": "peap", "TLS": "tls", "TTLS": "ttls"}
    pnac_conf.set(self.name, "EAP", tokens.get(item.text))
    pnac_conf.write()
def _on_phase2(self, obj, item):
    """Store the phase 2 method picked in the UI in ``pnac_conf`` and write it.

    "MSCHAPv2" is stored as "MSCHAPV2" and "TLS" as "tls" under the
    "Phase2" option of the section named after this service; any other
    label (including "None") stores ``None``.  *obj* is unused.
    """
    tokens = {"MSCHAPv2": "MSCHAPV2", "TLS": "tls"}
    pnac_conf.set(self.name, "Phase2", tokens.get(item.text))
    pnac_conf.write()
########################################################################
# Main Actions:
def show_techs(button, naviframe):
    """Push the "Technologies" page onto *naviframe*.

    The page is a ``TechList``; selecting one of its entries pushes a
    ``TechView`` page titled with the technology's "Name" property.
    *button* (the clicked widget) is unused.
    """
    def open_technology(path, properties):
        title = str(properties.get("Name"))
        log.debug("view technology: %r %s", title, path)
        view = TechView(naviframe, path, properties)
        naviframe.item_push(title, None, None, view.obj, "basic")

    listing = TechList(naviframe, open_technology)
    naviframe.item_push("Technologies", None, None, listing.obj, "basic")
def connect_service(path, properties):
    """Try to connect to the service at D-Bus *path*.

    *properties* is the service's property dict.  Services without a
    "Type", or of type "system", "gps" or "gadget", are refused with a
    log message.  A service whose "Security" contains "ieee8021x" and
    that has no section in ``pnac_conf`` is not connected; the user gets
    a popup and the service page is opened for configuration instead.

    Connection errors are shown in a popup, except "AlreadyConnected" and
    "InProgress", which are only logged at debug level.
    """
    # Not called "type" so the builtin stays usable in this function.
    service_type = properties.get("Type")
    if not service_type:
        log.error("cannot try to connect to service without type: %s", path)
        return
    if service_type in ("system", "gps", "gadget"):
        log.error("cannot connect to service with type: %s", service_type)
        return

    # "Security" may be missing from the dict; treat that as "no security
    # modes" rather than failing on `in None` below.
    sec = properties.get("Security") or ()
    name = properties.get("Name")
    if name:
        name = str(name)
    log.debug("connect to %s (%s): %s",
              name, path, dbus_dict_to_str(properties))

    # Connman only supports two phase auth via config files
    if ("ieee8021x" in sec) and not pnac_conf.has_section(name):
        popup_error(
            win,
            "This Network needs Configuration",
            "This network uses 802.1x authentication. "
            "Please configure the options in the section at the bottom."
        )
        show_service(path, properties)
        return

    def on_reply():
        log.info("Connected to %s (%s)", name, path)

    def on_error(exc):
        exc_name = exc.get_dbus_name()
        if exc_name in ("net.connman.Error.AlreadyConnected",
                        "net.connman.Error.InProgress"):
            log.debug("Failed to Connect to %s (%s): %s", name, path, exc)
            return
        log.error("Failed to Connect to %s (%s): %s", name, path, exc)
        if exc_name == "net.connman.Error.NotRegistered":
            popup_error(win, "Failed to Connect to %s" % name,
                        "Not registered. Try running \"$ econnman-bin -a\"")
            return
        popup_error(win, "Failed to Connect to %s" % name,
                    exc.get_dbus_message())

    service = dbus.Interface(bus.get_object("net.connman", path),
                             "net.connman.Service")
    service.Connect(reply_handler=on_reply,
                    error_handler=on_error)
def show_service(path, properties):
    """Push a ``ServiceView`` page for the service at *path* onto ``nf``.

    The page title is the service's "Name" property converted with
    ``str``.
    """
    title = str(properties.get("Name"))
    log.debug("view service: %r %s", title, path)
    view = ServiceView(nf, path, properties)
    nf.item_push(title, None, None, view.obj, "basic")
########################################################################
# Agent:
def agent_method(in_signature="", out_signature="", **kargs):
    """Return a ``dbus.service.method`` decorator bound to "net.connman.Agent".

    *in_signature* and *out_signature* default to the empty signature;
    any further keyword arguments are passed through unchanged.
    """
    options = dict(kargs)
    options["in_signature"] = in_signature
    options["out_signature"] = out_signature
    return dbus.service.method("net.connman.Agent", **options)
class Agent(dbus.service.Object):
    """D-Bus object implementing "net.connman.Agent" with Elementary dialogs.

    The object is exported on ``bus`` at ``Agent.path``.  At most one
    input dialog is tracked at a time in ``self.dialog``.
    """
    path = "/org/enlightenment/econnman/agent"
    # Converters applied to the text the user typed before it is sent
    # back; fields not listed here are sent as dbus.String.
    request_type_conv = {
        "SSID": dbus.ByteArray,
    }

    class Canceled(dbus.DBusException):
        """Error returned for a request whose dialog was closed."""
        _dbus_error_name = "net.connman.Agent.Error.Canceled"

    def __init__(self, serv_lst):
        """Export the agent; *serv_lst* is used to look up service names."""
        dbus.service.Object.__init__(self, bus, self.path)
        self.dialog = None
        self.serv_lst = serv_lst

    def _close_dialog(self):
        """Delete the input dialog, if one is open, and forget it."""
        if self.dialog:
            self.dialog.delete()
            self.dialog = None

    @agent_method()
    def Release(self):
        """Called when ConnMan releases the agent: close any open dialog."""
        log.info("Agent released by ConnMan")
        self._close_dialog()

    @agent_method(in_signature="os")
    def RequestBrowser(self, path, url):
        """Open *url* with ``xdg-open`` on behalf of the service at *path*."""
        log.info("Open browser for %s at %s", path, url)
        # The URL is received over D-Bus and ends up in a command line,
        # so it is quoted for the shell instead of being wrapped in
        # hand-written single quotes (which a "'" in the URL defeats).
        try:
            from shlex import quote
        except ImportError:  # Python 2
            from pipes import quote
        ecore.exe_run("xdg-open %s" % quote(url))

    @agent_method(in_signature="os")
    def ReportError(self, path, error):
        """Log *error* for the service at *path* and show it in a popup."""
        log.error("ConnMan error %s: %s", path, error)
        popup_error(win, "ConnMan Error", str(error))

    @agent_method()
    def Cancel(self):
        """Called when the pending request is canceled: close the dialog."""
        log.info("Canceled dialog")
        self._close_dialog()

    @agent_method(in_signature="oa{sv}", out_signature="a{sv}",
                  async_callbacks=("on_done", "on_error"))
    def RequestInput(self, path, fields, on_done, on_error):
        """Show a dialog asking for *fields* of the service at *path*.

        *fields* maps a field name to a description dict; its "Type",
        "Requirement" and "Value" keys are used.  The reply is delivered
        through *on_done* with the non-empty values when the user presses
        "Submit", or through *on_error* with ``Agent.Canceled`` when the
        dialog is deleted before that.
        """
        log.debug("Request Input for %s: %s", path, dbus_dict_to_str(fields))

        def on_deleted(obj):
            w = self.dialog
            self.dialog = None
            # self.dialog is cleared before a submitted dialog is
            # deleted, so this only fires for an unanswered request.
            if w:
                e = Agent.Canceled("user canceled")
                log.debug("User canceled agent request: %s", e)
                on_error(e)

        def on_clicked(obj):
            response = {}
            keys = []
            for name, en in widgets.items():
                conv = self.request_type_conv.get(name, dbus.String)
                v = conv(en.text)
                if v:
                    response[name] = v
                    keys.append(name)
            log.debug("User Replies with keys: %s", ", ".join(keys))
            w = self.dialog
            self.dialog = None
            on_done(response)
            w.delete()

        self.dialog = w = Window("econnman-agent", ELM_WIN_DIALOG_BASIC)
        w.title = "ConnMan Requested Input"
        w.icon_name = "econnman"
        w.autodel = True
        w.on_del_add(on_deleted)
        w.show()

        bg = Background(w)
        bg.size_hint_weight = EXPAND_BOTH
        bg.show()
        w.resize_object_add(bg)

        bx = Box(w)
        bx.size_hint_align = FILL_BOTH
        bx.horizontal = False
        bx.show()
        w.resize_object_add(bx)

        def add_label(text):
            # Every label of the dialog shares this setup.
            lb = Label(bx)
            lb.size_hint_weight = EXPAND_HORIZ
            lb.size_hint_align = FILL_BOTH
            lb.text = text
            lb.show()
            bx.pack_end(lb)

        add_label("<b>ConnMan needs your input</b>")

        name = self.serv_lst.service_name_get(path)
        if name:
            add_label("Service: %s" % (name,))

        widgets = {}
        for name, desc in fields.items():
            decos = ""
            t = desc.get("Type")
            if t and t != "informational":
                decos += " (type: %s)" % (t,)
            if desc.get("Requirement") == "mandatory":
                decos += " REQUIRED"
            add_label("%s:%s" % (name, decos))

            en = Entry(bx)
            en.size_hint_weight = EXPAND_HORIZ
            en.size_hint_align = FILL_BOTH
            en.single_line = True
            en.scrollable = True
            en.text = desc.get("Value", "")
            # Informational fields are displayed but cannot be edited.
            en.editable = (t != "informational")
            en.show()
            bx.pack_end(en)
            widgets[name] = en

        bt = Button(bx)
        bt.size_hint_weight = EXPAND_HORIZ
        bt.size_hint_align = FILL_BOTH
        bt.callback_clicked_add(on_clicked)
        bt.text = "Submit"
        bt.show()
        bx.pack_end(bt)
########################################################################
# GUI helpers:
def popup_fatal(obj, title, message):
    """Show a popup carrying a fatal message and a single Quit button.

    The popup is created on the top widget of *obj* and the message is
    also logged as critical.  Clicking Quit calls ``elm.exit()``.
    Returns the popup.
    """
    toplevel = obj.top_widget_get()
    log.critical("%s: %s", title, message)

    def quit_application(button):
        elm.exit()

    popup = Popup(toplevel)
    popup.size_hint_weight = EXPAND_BOTH
    popup.part_text_set("title,text", title)
    popup.text = message

    quit_button = Button(toplevel)
    quit_button.text = "Quit"
    quit_button.callback_clicked_add(quit_application)
    popup.part_content_set("button1", quit_button)

    popup.show()
    return popup
def popup_error(obj, title, message):
    """Show a popup carrying an error message and a single Close button.

    The popup is created on the top widget of *obj* and the message is
    also logged as an error.  Clicking Close deletes the popup.
    Returns the popup.
    """
    toplevel = obj.top_widget_get()
    log.error("%s: %s", title, message)

    popup = Popup(toplevel)

    def dismiss(button):
        popup.delete()

    popup.size_hint_weight = EXPAND_BOTH
    popup.part_text_set("title,text", title)
    popup.text = message

    close_button = Button(toplevel)
    close_button.text = "Close"
    close_button.callback_clicked_add(dismiss)
    popup.part_content_set("button1", close_button)

    popup.show()
    return popup
if __name__ == "__main__":
    # Command line: every -v lowers the log threshold by one level
    # (10), starting from WARNING; -a additionally registers the agent.
    parser = argparse.ArgumentParser(
        description="Connection Manager for Enlightenment")
    parser.add_argument("-v", "--verbose", action="count")
    parser.add_argument("-a", "--agent", action="store_true")
    args = parser.parse_args()

    level = logging.WARNING - 10 * (args.verbose or 0)
    log.setLevel(level)

    pnac_conf = PNACConfig()

    elm.init()
    elm.policy_set(ELM_POLICY_QUIT, ELM_POLICY_QUIT_LAST_WINDOW_CLOSED)

    # Register every theme file that exists: the one in the source tree
    # and the installed one.
    for td in ("./data/theme/default.edj", "@PKGDATADIR@/theme/default.edj"):
        if not os.path.exists(td):
            continue
        Theme(default=True).extension_add(td)

    # Main window with its background.
    win = Window("econnman", ELM_WIN_BASIC)
    win.title = "EConnMan"
    win.icon_name = "econnman"
    win.autodel = True
    win.size = (480, 700)
    win.show()

    bg = Background(win)
    bg.size_hint_weight = EXPAND_BOTH
    bg.show()
    win.resize_object_add(bg)

    # Without the ConnMan manager object there is nothing to show: tell
    # the user, run the main loop until the popup quits it, then
    # re-raise the D-Bus error.
    try:
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
    except dbus.exceptions.DBusException:
        popup_fatal(win, "Failed to find ConnMan",
                    "Check if ConnMan is running.")
        elm.run()
        elm.shutdown()
        raise

    # The naviframe fills the window; its first page is the list of
    # services, passed together with the offline-mode monitor and the
    # "Techs" button.
    nf = Naviframe(win)
    nf.size_hint_weight = EXPAND_BOTH
    nf.show()
    win.resize_object_add(nf)

    offline_mon = OfflineModeMonitor(win)

    techs = Button(win)
    techs.text = "Techs"
    techs.callback_clicked_add(show_techs, nf)

    serv_lst = ServicesList(win, connect_service, show_service)
    nf.item_push("EConnMan", offline_mon.obj, techs, serv_lst.obj, "basic")

    if args.agent:
        log.debug("create agent")
        agent = Agent(serv_lst)
        manager.RegisterAgent(agent.path)
        log.info("Registered agent at %s", agent.path)

    elm.run()
    # pnac_conf.write() is intentionally not called here.
    logging.shutdown()
    elm.shutdown()