You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1994 lines
67 KiB
1994 lines
67 KiB
#!/usr/bin/python |
|
# -*- coding: utf-8 -*- |
|
|
|
# TODO: |
|
# - IPv6 |
|
# - Proxy |
|
# - Fix connman's PropertyChanged when changing off/manual -> dhcp, |
|
# gateway is not updated. |
|
|
|
|
|
import sys |
|
import dbus |
|
import dbus.service |
|
import logging |
|
import argparse |
|
import os.path |
|
|
|
try: |
|
import efl.evas as evas |
|
import efl.ecore as ecore |
|
import efl.edje as edje # Class resolve hack for edje_get |
|
from efl.dbus_mainloop import DBusEcoreMainLoop |
|
import efl.elementary as elm |
|
from efl.elementary import ELM_POLICY_QUIT, \ |
|
ELM_POLICY_QUIT_LAST_WINDOW_CLOSED |
|
from efl.elementary.window import Window, ELM_WIN_BASIC, \ |
|
ELM_WIN_DIALOG_BASIC |
|
from efl.elementary.background import Background |
|
from efl.elementary.box import Box |
|
from efl.elementary.label import Label |
|
from efl.elementary.naviframe import Naviframe |
|
from efl.elementary.popup import Popup |
|
from efl.elementary.button import Button |
|
from efl.elementary.scroller import Scroller, ELM_SCROLLER_POLICY_OFF, \ |
|
ELM_SCROLLER_POLICY_AUTO |
|
from efl.elementary.check import Check |
|
from efl.elementary.progressbar import Progressbar |
|
from efl.elementary.genlist import Genlist, GenlistItemClass |
|
from efl.elementary.segment_control import SegmentControl |
|
from efl.elementary.frame import Frame |
|
from efl.elementary.entry import Entry |
|
from efl.elementary.icon import Icon |
|
from efl.elementary.layout import Layout |
|
from efl.elementary.theme import Theme |
|
except: |
|
import elementary as elm |
|
import evas |
|
import ecore |
|
import edje # Class resolve hack for edje_get |
|
from e_dbus import DBusEcoreMainLoop |
|
from elementary import Window, Background, Box, Label, Naviframe, Popup, \ |
|
Button, Scroller, Check, Progressbar, Genlist, GenlistItemClass, \ |
|
SegmentControl, Frame, Entry, Icon, Layout, Theme, ELM_WIN_BASIC, \ |
|
ELM_WIN_DIALOG_BASIC, ELM_POLICY_QUIT, ELM_SCROLLER_POLICY_OFF, \ |
|
ELM_SCROLLER_POLICY_AUTO, ELM_POLICY_QUIT_LAST_WINDOW_CLOSED |
|
|
|
# D-Bus main loop glue object handed to the bus connection below
# (presumably dispatches D-Bus traffic from the Ecore main loop -- inferred
# from the class name, confirm against the binding).
dbus_ml = DBusEcoreMainLoop()
# System bus connection shared by every view in this module; the views look
# up "net.connman" objects on it.
bus = dbus.SystemBus(mainloop=dbus_ml)
# Application logger: a stream handler with a custom format that includes
# the timestamp, logger name, level, message and source line number.
log = logging.getLogger("econnman-bin")
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
    "%(created)d %(name)s [%(levelname)s]:: %(message)s (@lineno %(lineno)d)"
)
log_handler.setFormatter(log_formatter)
log.addHandler(log_handler)

# Module-global used by the views for Manager calls (GetProperties,
# GetTechnologies, GetServices, SetProperty).  It starts as None and must be
# assigned before any view is created -- the assignment is not visible in
# this part of the file.
manager = None

# Size-hint weights: expand on both axes / expand horizontally only.
EXPAND_BOTH = (evas.EVAS_HINT_EXPAND, evas.EVAS_HINT_EXPAND)
EXPAND_HORIZ = (evas.EVAS_HINT_EXPAND, 0.0)

# Size-hint alignment: fill the allocated space on both axes.
FILL_BOTH = (evas.EVAS_HINT_FILL, evas.EVAS_HINT_FILL)
|
|
|
|
|
# Config parser compatibility across Python versions:
# - Python 3.2 .. 3.11 ship ``configparser.SafeConfigParser``.
# - Python 3.12 removed it; ``configparser.ConfigParser`` is the same
#   implementation, so it is aliased under the old name.
# - Python 2 only has the ``ConfigParser`` module.
try:
    from configparser import SafeConfigParser
except ImportError:
    try:
        from configparser import ConfigParser as SafeConfigParser
    except ImportError:
        from ConfigParser import SafeConfigParser
|
|
|
|
|
class PNACConfig(SafeConfigParser): |
|
|
|
"""A custom config parser for IEEE802.1x (PNAC) |
|
|
|
Section names are prefixed with service_ |
|
|
|
""" |
|
|
|
CONF_FILE = "/var/lib/connman/econnman.config" |
|
|
|
def __init__(self): |
|
SafeConfigParser.__init__(self) |
|
self.optionxform = str |
|
|
|
def read(self): |
|
args = self.CONF_FILE, 'r' |
|
kwargs = {} |
|
if sys.hexversion >= 0x03000000: |
|
kwargs["encoding"] = 'utf8' |
|
try: |
|
with open(*args, **kwargs) as fd: |
|
self.readfp(fd) |
|
except IOError: |
|
log.error( |
|
"Econnman cannot read the configuration file \"%s\" used by " |
|
"connman to configure your ieee802.1x networks. Make sure the " |
|
"user running econnman is able to read/write it.", |
|
self.CONF_FILE |
|
) |
|
#raise |
|
|
|
# defaults() |
|
# sections() |
|
|
|
def add_section(self, service_name): |
|
secname = 'service_' + service_name |
|
SafeConfigParser.add_section(self, secname) |
|
self.set(service_name, 'Type', 'wifi') |
|
self.set(service_name, 'Name', service_name) |
|
#self.write() |
|
|
|
def has_section(self, service_name): # config_exists |
|
return bool(self.section_get(service_name)) |
|
|
|
# options() |
|
|
|
def has_option(self, service_name, key): |
|
secname = self.section_get(service_name) |
|
return SafeConfigParser.has_option(secname, key) |
|
|
|
# read() |
|
# readfp() |
|
|
|
def get(self, service_name, key): # config_option_get |
|
secname = self.section_get(service_name) |
|
if self.has_option(service_name, key): |
|
return SafeConfigParser.get(self, secname, key) |
|
return None |
|
|
|
# getint() |
|
# getfloat() |
|
# getboolean() |
|
# items() |
|
|
|
def set(self, service_name, key, value): # config_set |
|
secname = self.section_get(service_name) |
|
if not self.has_section(service_name): |
|
self.add_section(service_name) |
|
if value is not None: |
|
SafeConfigParser.set(self, secname, key, value) |
|
elif self.has_option(secname, key): |
|
self.remove_option(secname, key) |
|
#self.write() |
|
|
|
def write(self): |
|
# TODO: Copy owner and mask from existing config file, write to a temp |
|
# file and request overwriting with sudo or polkit, then set the |
|
# owner and mask like it was before. This is to avoid the |
|
# requirement of running the entire econnman instance with root |
|
# rights. |
|
try: |
|
with open(self.CONF_FILE, 'w', encoding='utf8') as configfile: |
|
SafeConfigParser.write(self, configfile) |
|
except IOError: |
|
log.error( |
|
"Econnman cannot write to the configuration file \"%s\" used " |
|
"by connman to configure your ieee802.1x networks. Make sure " |
|
"the user running econnman is able to read/write it.", |
|
self.CONF_FILE |
|
) |
|
|
|
def remove_option(self, service_name, key): |
|
secname = self.section_get(service_name) |
|
SafeConfigParser.remove_option(self, secname, key) |
|
#self.write() |
|
|
|
def remove_section(self, service_name): # config_del |
|
secname = self.section_get(service_name) |
|
ret = SafeConfigParser.remove_section(self, secname) |
|
#self.write() |
|
return ret |
|
|
|
def section_get(self, service_name): # config_get |
|
#secname = 'service_' + service_name |
|
for sec in self.sections(): |
|
if self.has_option(sec, 'Name') and \ |
|
self.get(sec, 'Name') == service_name: |
|
return sec |
|
else: |
|
return None |
|
|
|
|
|
######################################################################## |
|
# Debug helpers: |
|
def dbus_variant_to_str(v):
    """Convert a D-Bus value into a compact form for debug messages.

    Strings are quoted, booleans become "True"/"False", dictionaries are
    rendered as ``{key=value, ...}``, arrays as ``[a, b, ...]`` and structs
    as ``(a, b, ...)``.  Integer and floating point types are returned as
    plain ``int``/``float``; anything else falls back to ``repr()``.
    """
    if isinstance(v, dbus.String):
        return '"%s"' % (str(v),)
    if isinstance(v, dbus.Boolean):
        return str(bool(v))
    if isinstance(v, dbus.Dictionary):
        return "{%s}" % (dbus_dict_to_str(v),)
    if isinstance(v, dbus.Struct):
        # dbus.Struct is a tuple subclass without .items(), so it has to be
        # rendered element by element rather than as a dictionary.
        return "(%s)" % (dbus_array_to_str(v),)
    if isinstance(v, dbus.Array):
        return "[%s]" % (dbus_array_to_str(v),)
    if isinstance(v, dbus.ObjectPath):
        return str(v)
    if isinstance(v, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
                      dbus.UInt16, dbus.UInt32, dbus.UInt64)):
        return int(v)
    if isinstance(v, dbus.Double):
        return float(v)
    return repr(v)
|
|
|
|
|
def dbus_dict_to_str(d):
    """Render a dbus.Dictionary as ``key=value, key=value`` for debugging.

    Values are shortened with dbus_variant_to_str().
    """
    pairs = ("%s=%s" % (key, dbus_variant_to_str(val))
             for key, val in d.items())
    return ", ".join(pairs)
|
|
|
|
|
def dbus_array_to_str(a):
    """Render a sequence of D-Bus values as a comma separated debug string.

    Each element is shortened with dbus_variant_to_str().
    """
    rendered = []
    for element in a:
        rendered.append(dbus_variant_to_str(element))
    return ", ".join(rendered)
|
|
|
|
|
def dbus_array_of_dict_to_str(a):
    """Render a sequence of (key, value) pairs as a debug string.

    Only the keys are kept; the values are dropped to keep the output short.
    """
    return ", ".join(str(key) for key, _value in a)
|
|
|
|
|
class ObjectView(object):
    """Base class for a scrollable detail view of one ConnMan object.

    Subclasses must provide:
     - bus_interface: D-Bus interface name used to build self.bus_obj
     - create_view(properties): build the view specific widgets
     - on_property_changed(name, value): refresh widgets on change

    This class provides:
     - path: the object path
     - bus_obj: proxy to the remote object using bus_interface
     - obj: the toplevel Scroller
     - box: the vertical Box held by the scroller
     - add_*(): helpers that create a widget, pack it at the end of a box
       and return it
    """
    bus_interface = None

    def __init__(self, parent, path, properties):
        self.path = path
        remote = bus.get_object("net.connman", path)
        self.bus_obj = dbus.Interface(remote, self.bus_interface)
        self.sig_ch = self.bus_obj.connect_to_signal(
            "PropertyChanged", self.on_property_changed)

        self.obj = scroller = Scroller(parent)
        scroller.on_del_add(self._deleted)
        scroller.size_hint_weight = EXPAND_BOTH
        scroller.policy_set(ELM_SCROLLER_POLICY_OFF, ELM_SCROLLER_POLICY_AUTO)
        scroller.bounce_set(False, True)
        scroller.content_min_limit(True, False)

        self.box = Box(scroller)
        self.box.size_hint_weight = EXPAND_HORIZ
        self.box.horizontal = False

        self.create_view(properties)

        scroller.content = self.box
        # Push the initial state through the same path as live updates.
        for key, val in properties.items():
            self.on_property_changed(key, val)

    def _deleted(self, obj):
        """Scroller deletion callback: drop the signal match and references."""
        log.debug("View deleted %s (%s)", self.__class__.__name__, self.path)
        self.sig_ch.remove()
        self.sig_ch = None
        self.bus_obj = None
        self.obj = None

    def create_view(self, properties):
        """Build the view widgets; subclasses are expected to override."""
        log.critical("must be implemented!")

    def on_property_changed(self, name, value):
        """Handle a property update; subclasses are expected to override."""
        log.critical("must be implemented!")

    @staticmethod
    def _hint_fill_horizontal(widget):
        """Make widget expand horizontally and fill its allocated cell."""
        widget.size_hint_weight = EXPAND_HORIZ
        widget.size_hint_align = FILL_BOTH
        return widget

    def add_check(self, box, label, callback=None):
        """Append a Check labelled `label`; `callback` fires on "changed"."""
        check = self._hint_fill_horizontal(Check(box))
        check.text = label
        check.show()
        box.pack_end(check)
        if callback:
            check.callback_changed_add(callback)
        return check

    def add_button(self, box, label, callback):
        """Append a Button labelled `label`; `callback` fires on "clicked"."""
        button = self._hint_fill_horizontal(Button(box))
        button.text = label
        button.show()
        button.callback_clicked_add(callback)
        box.pack_end(button)
        return button

    def add_label(self, box, label):
        """Append a Label showing `label`."""
        widget = self._hint_fill_horizontal(Label(box))
        widget.text = label
        widget.show()
        box.pack_end(widget)
        return widget

    def add_progress(self, box, label):
        """Append a Progressbar labelled `label`."""
        bar = self._hint_fill_horizontal(Progressbar(box))
        bar.text = label
        bar.show()
        box.pack_end(bar)
        return bar

    def add_segment_control(self, box, options, callback):
        """Append a SegmentControl with one item per entry of `options`.

        Returns (control, items) where items maps each option to its item.
        """
        control = self._hint_fill_horizontal(SegmentControl(box))
        items = dict((opt, control.item_add(None, opt)) for opt in options)
        control.show()
        box.pack_end(control)
        control.callback_changed_add(callback)
        return control, items

    def add_label_and_segment_control(self, box, options, callback, label):
        """Append a Label followed by a SegmentControl.

        Returns (label widget, control, items).
        """
        caption = self.add_label(box, label)
        control, items = self.add_segment_control(box, options, callback)
        return caption, control, items

    def add_frame_and_box(self, box, label):
        """Append a Frame titled `label` containing a vertical Box.

        Returns (frame, inner box).
        """
        frame = self._hint_fill_horizontal(Frame(box))
        frame.text = label
        frame.show()
        box.pack_end(frame)

        inner = self._hint_fill_horizontal(Box(frame))
        inner.horizontal = False
        inner.show()
        frame.content = inner
        return frame, inner

    def add_label_and_entry(self, box, label, callback=None):
        """Append a Label followed by a single-line, scrollable Entry.

        `callback`, if given, fires on "activated".  Returns (label, entry).
        """
        caption = self.add_label(box, label)

        entry = self._hint_fill_horizontal(Entry(box))
        entry.single_line = True
        entry.scrollable = True
        entry.show()
        box.pack_end(entry)
        if callback:
            entry.callback_activated_add(callback)
        return caption, entry
|
|
|
|
|
######################################################################## |
|
# Views: |
|
class OfflineModeMonitor(object):
    """Toggle widget mirroring the Manager's OfflineMode property.

    The toggle follows the state reported by the server; when the user
    flips it, the new value is sent to the Manager and the toggle is
    reverted if the server refuses.
    """
    def __init__(self, win):
        toggle = Check(win)
        toggle.style = "toggle"
        toggle.part_text_set("on", "Offline")
        toggle.part_text_set("off", "Online")
        toggle.callback_changed_add(self._on_user_changed)
        toggle.on_del_add(self._deleted)
        self.obj = toggle

        def got_properties(properties):
            for key, val in properties.items():
                log.debug("property %s: %s", key, val)
                self._property_changed(key, val)

        def failed(exc):
            popup_fatal(win, "Failed to get ConnMan Properties", str(exc))

        # Fetch the current state asynchronously, then track later changes.
        manager.GetProperties(reply_handler=got_properties,
                              error_handler=failed)
        self.sig_ch = manager.connect_to_signal("PropertyChanged",
                                                self._property_changed)

    def _deleted(self, obj):
        """Widget deletion callback: stop listening and drop references."""
        self.sig_ch.remove()
        self.sig_ch = None
        self.obj = None

    def _property_changed(self, name, value):
        """Update the toggle when the OfflineMode property changes."""
        log.debug("property %s: %s", name, value)
        if name != "OfflineMode":
            return
        self.obj.state = bool(value)

    def _on_user_changed(self, obj):
        """Send the toggle state chosen by the user to the Manager."""
        requested = obj.state

        def applied():
            log.info("Set OfflineMode=%s", requested)

        def rejected(exc):
            log.error("Failed to set OfflineMode=%s: %s", requested, exc)
            # Put the toggle back to what the server still has.
            obj.state = not requested
            popup_error(self.obj, "Failed to Apply Offline Mode",
                        exc.get_dbus_message())

        manager.SetProperty("OfflineMode", dbus.Boolean(requested),
                            reply_handler=applied, error_handler=rejected)
|
|
|
|
|
class TechList(object):
    """Genlist of the technologies reported by ConnMan.

    The list is filled from the manager's GetTechnologies() reply and kept
    current through the TechnologyAdded / TechnologyRemoved signals and the
    net.connman.Technology PropertyChanged signal.

    Selecting an item calls C{on_selected(path, tech_properties)}.
    """
    def __init__(self, parent, on_selected=None):
        self.techs = {}   # path -> properties
        self.items = {}   # path -> genlist item
        self.obj = Genlist(parent)
        self.obj.on_del_add(self._deleted)
        self.on_selected = on_selected
        self.obj.callback_selected_add(self._tech_selected)
        self.sig_added = manager.connect_to_signal("TechnologyAdded",
                                                   self._tech_added)
        self.sig_removed = manager.connect_to_signal("TechnologyRemoved",
                                                     self._tech_removed)
        self.sig_propch = bus.add_signal_receiver(
            self._tech_changed,
            "PropertyChanged",
            "net.connman.Technology",
            "net.connman",
            path_keyword='path'
        )
        self.itc = GenlistItemClass(
            item_style="default",
            text_get_func=self._item_text_get,
            content_get_func=self._item_content_get
        )

        manager.GetTechnologies(reply_handler=self._get_techs_reply,
                                error_handler=self._get_techs_error)

    def _deleted(self, lst):
        """Genlist deletion callback: remove signal matches and state."""
        for match in (self.sig_added, self.sig_removed, self.sig_propch):
            match.remove()

        self.obj = None
        self.sig_added = None
        self.sig_removed = None
        self.sig_propch = None
        self.techs.clear()
        self.items.clear()

    def _get_techs_reply(self, techs):
        """GetTechnologies() reply: register every (path, properties)."""
        log.debug("Got technologies: %s", dbus_array_of_dict_to_str(techs))
        for tech_path, tech_props in techs:
            self._tech_added(tech_path, tech_props)

    def _get_techs_error(self, exc):
        """GetTechnologies() failure: log and show an error popup."""
        log.error("Failed to GetTechnologies(): %s", exc)
        popup_error(self.obj, "Failed to get Technologies",
                    exc.get_dbus_message())

    def _tech_added(self, path, properties):
        """Remember a technology and append a list item for it."""
        key = str(path)
        log.debug("Added %s: %s", key, dbus_dict_to_str(properties))
        self.techs[key] = properties
        self.items[key] = self.obj.item_append(self.itc, key)

    def _tech_changed(self, name, value, path):
        """Store a changed property and redraw the matching item."""
        key = str(path)
        log.debug("Changed %s: %s=%s", key, name, value)
        props = self.techs.get(key)
        if not props:
            return
        props[name] = value
        item = self.items.get(key)
        if item:
            item.update()

    def _tech_removed(self, path):
        """Forget a technology and delete its list item, if known."""
        key = str(path)
        log.debug("Removed %s", key)
        self.techs.pop(key, None)
        if key in self.items:
            self.items.pop(key).delete()

    def _tech_selected(self, lst, item):
        """Selection callback: deselect and forward to on_selected."""
        item.selected = False
        if not self.on_selected:
            return
        key = item.data
        props = self.techs.get(key)
        if props:
            self.on_selected(key, props)

    def _item_text_get(self, obj, part, item_data):
        """Item label: the technology Name, else the tail of its path."""
        if part != "elm.text":
            return None
        props = self.techs.get(item_data)
        if not props:
            return "Unknown"
        fallback = item_data[len("/net/connman/technology/"):]
        return props.get("Name", fallback)

    def _item_content_get(self, obj, part, item_data):
        """Item icons: an arrow at the end, a state icon at the start."""
        if part == "elm.swallow.end":
            arrow = Icon(obj)
            arrow.standard = "arrow_right"
            return arrow

        if part != "elm.swallow.icon":
            return None
        props = self.techs.get(item_data)
        if not props:
            return None

        if props.get("Connected", False):
            icon_name = "connman-tech-connected"
        elif props.get("Powered", False):
            icon_name = "connman-tech-powered"
        else:
            icon_name = "connman-tech-offline"
        state_icon = Icon(obj)
        state_icon.standard = icon_name
        return state_icon
|
|
|
|
|
class TechView(ObjectView):
    """Detailed view of the technology at C{path}.

    C{properties} seeds the widgets; afterwards they follow the
    net.connman.Technology.PropertyChanged signal.

    Changes made by the user are sent to the server.
    """
    bus_interface = "net.connman.Technology"

    def create_view(self, properties):
        """Create the power, connection, scan and tethering widgets."""
        self.powered = self.add_check(self.box, "Powered",
                                      self._on_user_powered)

        # Read-only indicator: the user cannot toggle it.
        self.connected = self.add_check(self.box, "Connected")
        self.connected.disabled = True

        self.scan = self.add_button(self.box, "Scan", self._scan)

        _frame, teth_box = self.add_frame_and_box(self.box, "Tethering")

        self.tethering = self.add_check(teth_box, "Enabled",
                                        self._on_user_tethering)

        _lb, self.identifier = self.add_label_and_entry(teth_box,
                                                        "Identifier:")
        _lb, self.passphrase = self.add_label_and_entry(teth_box,
                                                        "Passphrase:")
        self.tethering_apply = self.add_button(teth_box, "Apply Tethering",
                                               self._tethering_apply)

    def _on_user_powered(self, obj):
        """Send the Powered state chosen by the user; revert on failure."""
        wanted = bool(self.powered.state)

        def applied():
            log.info("Set %s Powered=%s", self.path, wanted)

        def rejected(exc):
            log.error("Could not set %s Powered=%s: %s",
                      self.path, wanted, exc)
            obj.state = not wanted
            popup_error(self.obj, "Failed to Apply Powered",
                        exc.get_dbus_message())

        self.bus_obj.SetProperty("Powered", dbus.Boolean(wanted),
                                 reply_handler=applied,
                                 error_handler=rejected)

    def _scan(self, obj):
        """Request a scan; the button is disabled until the call returns."""
        def restore_button():
            self.scan.disabled = False
            self.scan.text = "Scan"

        def finished():
            log.debug("Scanned %s", self.path)
            restore_button()

        def failed(exc):
            log.error("Could not scan %s", exc)
            restore_button()

        self.bus_obj.Scan(reply_handler=finished, error_handler=failed)
        self.scan.disabled = True
        self.scan.text = "Scanning..."

    def _on_user_tethering(self, obj):
        """Enable the tethering entries only while the check is on."""
        locked = not bool(obj.state)
        self.identifier.disabled = locked
        self.passphrase.disabled = locked

    def _tethering_apply(self, obj):
        """Apply identifier, passphrase and tethering state, one by one.

        Each SetProperty is sent only after the previous one succeeded; the
        first failure stops the chain and shows an error popup.
        """
        self.to_apply = [
            ("TetheringIdentifier", self.identifier.text),
            ("TetheringPassphrase", self.passphrase.text),
            ("Tethering", dbus.Boolean(self.tethering.state)),
        ]

        def restore_button():
            self.tethering_apply.disabled = False
            self.tethering_apply.text = "Apply Tethering"

        def send_next():
            if not self.to_apply:
                return
            prop, val = self.to_apply.pop(0)
            self.bus_obj.SetProperty(prop, val,
                                     reply_handler=applied,
                                     error_handler=rejected)

        def applied():
            log.debug("Applied tethering %s", self.path)
            restore_button()
            send_next()

        def rejected(exc):
            log.error("Could not apply tethering %s", exc)
            restore_button()
            popup_error(self.obj, "Failed to Apply Tethering",
                        exc.get_dbus_message())

        send_next()
        self.tethering_apply.disabled = True
        self.tethering_apply.text = "Applying Tethering..."

    def on_property_changed(self, name, value):
        """Reflect a changed technology property in the widgets."""
        log.debug("Changed %s: %s=%s", self.path, name, value)
        if name == "Powered":
            self.powered.state = bool(value)
            return
        if name == "Connected":
            self.connected.state = bool(value)
            return
        if name == "Tethering":
            enabled = bool(value)
            self.tethering.state = enabled
            self.identifier.disabled = not enabled
            self.passphrase.disabled = not enabled
            return
        if name == "TetheringIdentifier":
            self.identifier.text = str(value)
            return
        if name == "TetheringPassphrase":
            self.passphrase.text = str(value)
|
|
|
|
|
class ServicesList(object):
    """Provides a Genlist with the known Services.

    It will call manager's GetServices() and then keep it updated with
    the ServicesChanged signal, and with the net.connman.Service
    PropertyChanged signal for the properties of individual services.

    Selecting an item will call C{on_selected(path, service_properties)};
    clicking the arrow button of an item will call
    C{on_disclosure(path, service_properties)}.
    """
    ICON_SIZE = (32, 32)

    def __init__(self, parent, on_selected=None, on_disclosure=None):
        self.services = {}  # str(path) -> properties
        self.items = {}     # str(path) -> genlist item
        self.obj = Genlist(parent)
        self.on_selected = on_selected
        self.on_disclosure = on_disclosure
        self.obj.callback_selected_add(self._item_selected)
        self.obj.on_del_add(self._deleted)
        # The item class is created before any request is sent so that the
        # reply handlers can never observe a half-initialised list.
        self.itc = GenlistItemClass(
            item_style="default",
            text_get_func=self._item_text_get,
            content_get_func=self._item_content_get
        )
        self.sig_ch = manager.connect_to_signal(
            "ServicesChanged",
            self._services_changed
        )
        self.sig_propch = bus.add_signal_receiver(
            self._service_prop_changed,
            "PropertyChanged",
            "net.connman.Service",
            "net.connman",
            path_keyword='path'
        )
        manager.GetServices(
            reply_handler=self._get_services_reply,
            error_handler=self._get_services_error
        )

    def _deleted(self, obj):
        """Genlist deletion callback: remove signal matches and state."""
        self.sig_ch.remove()
        # Also drop the per-service receiver; otherwise it stays registered
        # on the bus after the list is gone.
        self.sig_propch.remove()

        self.obj = None
        self.sig_ch = None
        self.sig_propch = None
        self.services.clear()
        self.items.clear()

    def _items_repopulate(self, paths):
        """Append one list item per path, in the given order."""
        for path in paths:
            self.items[path] = self.obj.item_append(self.itc, path)

    def _get_services_reply(self, services):
        """GetServices() reply: store all services and build the list."""
        log.debug("Got services: %s", dbus_array_of_dict_to_str(services))
        for path, properties in services:
            self._service_added(path, properties)
        self._items_repopulate(str(path) for path, properties in services)

    def _get_services_error(self, exc):
        """GetServices() failure: log and show a fatal popup."""
        log.critical("Failed to GetServices(): %s", exc)
        popup_fatal(self.obj, "Failed to get Services",
                    exc.get_dbus_message())

    def _service_added(self, path, properties):
        """Remember the properties of a newly seen service."""
        # Normalise to str, the key type used by every other method here.
        path = str(path)
        log.debug("Added %s: %s", path, dbus_dict_to_str(properties))
        self.services[path] = properties

    def _service_prop_changed(self, name, value, path):
        """Store one changed property and redraw the matching item."""
        path = str(path)
        log.debug("Changed %s: %s=%s", path, name, value)
        s = self.services.get(path)
        if not s:
            return
        s[name] = value
        it = self.items.get(path)
        if not it:
            return
        it.update()

    def _service_changed(self, path, properties):
        """Merge updated properties into an already known service."""
        log.debug("Changed %s: %s", path, dbus_dict_to_str(properties))
        self.services[path].update(properties)

    def _services_changed(self, changed, removed):
        """ServicesChanged handler: rebuild the list from `changed`.

        All items are dropped, removed services are forgotten, and one item
        is appended per entry of `changed`, in the order received.
        """
        log.debug("Changed: %s, Removed: %s",
                  dbus_array_of_dict_to_str(changed),
                  removed)

        self.items.clear()
        self.obj.clear()

        for path in removed:
            self._service_removed(path)
        for path, properties in changed:
            path = str(path)
            if path in self.services:
                self._service_changed(path, properties)
            else:
                self._service_added(path, properties)
        self._items_repopulate(str(path) for path, properties in changed)

    def _service_removed(self, path):
        """Forget a service; unknown paths are ignored."""
        path = str(path)
        log.debug("Removed %s", path)
        self.services.pop(path, None)

    def _item_selected(self, lst, item):
        """Selection callback: deselect and forward to on_selected."""
        item.selected = False
        if not self.on_selected:
            return
        path = item.data
        s = self.services.get(path)
        if s:
            self.on_selected(path, s)

    def _item_disclosure(self, bt, path):
        """Arrow button callback: forward to on_disclosure."""
        if not self.on_disclosure:
            return
        s = self.services.get(path)
        if s:
            self.on_disclosure(path, s)

    def _item_text_get(self, obj, part, item_data):
        """Item label: the service Name, else the tail of its path."""
        if part != "elm.text":
            return None
        t = self.services.get(item_data)
        if not t:
            return "Unknown"
        return t.get("Name", item_data[len("/net/connman/service/"):])

    @staticmethod
    def _security_modes(security):
        """Return the security modes as a list of str, without "none".

        A missing or empty value yields an empty list, so callers can both
        iterate the result and test its truthiness.
        """
        if not security:
            return []
        return [str(x) for x in security if str(x) != "none"]

    def _status_icon(self, obj, box, name):
        """Create a fixed-size standard icon and pack it at the end of box."""
        ic = Icon(obj)
        ic.standard = name
        ic.size_hint_min = ic.size_hint_max = self.ICON_SIZE
        ic.show()
        box.pack_end(ic)
        return ic

    def _item_content_get(self, obj, part, item_data):
        """Build the content of an item part.

        "elm.swallow.end" gets a box holding the connected / security icons
        and the disclosure button; "elm.swallow.icon" gets a themed layout
        that is told about the service state through edje signals.  Other
        parts, and unknown services, get None.
        """
        s = self.services.get(item_data)
        if not s:
            return None
        svc_type = s.get("Type")
        state = s.get("State")
        error = s.get("Error")
        # The service may have no "Security" entry at all; normalise to a
        # list so the loop below never iterates None.
        security = self._security_modes(s.get("Security"))
        strength = s.get("Strength")
        favorite = s.get("Favorite")
        roaming = s.get("Roaming")
        auto_connect = s.get("AutoConnect")
        connected = (str(state) not in ("idle", "failure"))

        if part == "elm.swallow.end":
            bx = Box(obj)
            bx.horizontal = True
            bx.homogeneous = True
            bx.padding = (2, 0)
            bx.align = (1.0, 0.5)

            if connected:
                self._status_icon(obj, bx, "connman-connected")

            if security and favorite:
                self._status_icon(obj, bx, "connman-security-favorite")
            elif security:
                self._status_icon(obj, bx, "connman-security")

            ic = Icon(obj)
            ic.standard = "arrow_right"
            bt = Button(obj)
            bt.content = ic
            bt.callback_clicked_add(self._item_disclosure, item_data)
            # Keep a click on the button from also selecting the item.
            bt.propagate_events = False
            bt.show()
            bt.size_hint_min = bt.size_hint_max = self.ICON_SIZE

            bx.pack_end(bt)
            return bx

        if part != "elm.swallow.icon":
            return None

        ly = Layout(obj)
        ly.theme_set("icon", svc_type, "default")
        ly.size_hint_min_set(32, 32)

        def yesno(val):
            return ("no", "yes")[bool(val)]

        def ornone(val):
            return val or "none"

        if state is not None:
            ly.signal_emit("elm,state," + state, "elm")
        ly.signal_emit("elm,error," + ornone(error), "elm")
        ly.signal_emit("elm,favorite," + yesno(favorite), "elm")
        ly.signal_emit("elm,roaming," + yesno(roaming), "elm")
        ly.signal_emit("elm,auto_connect," + yesno(auto_connect), "elm")
        ly.signal_emit("elm,connected," + yesno(connected), "elm")

        for mode in security:
            ly.signal_emit("elm,security," + mode, "elm")
        if security:
            ly.signal_emit("elm,security,yes", "elm")
        else:
            ly.signal_emit("elm,security,none", "elm")

        if strength:
            ly.edje.message_send(1, strength)
        return ly

    def service_name_get(self, path):
        """Return the Name of the service at path as str, or None."""
        s = self.services.get(path)
        if not s:
            return None
        n = s.get("Name")
        if not n:
            return None
        return str(n)
|
|
|
|
|
class ServiceView(ObjectView): |
|
"""Provides a detailed view of the service given by C{path}. |
|
|
|
The C{properties} argument is used to populate the current state, |
|
which will be updated with net.connman.Service.PropertyChanged |
|
signal that it will listen. |
|
|
|
User updates will be automatically applied to the server. |
|
""" |
|
# D-Bus interface name; the inherited ObjectView.__init__ uses it to build
# self.bus_obj.
bus_interface = "net.connman.Service"

# Each *_fields tuple lists (label, attribute name) pairs.  create_view()
# and add_readonly_section() create one entry widget per pair and store it
# on the instance under the attribute name; populate_fields() uses the
# first element as the key to read from a properties mapping.
ipv4_fields = (#("Method", "ipv4_method"),
               ("Address", "ipv4_address"),
               ("Netmask", "ipv4_netmask"),
               ("Gateway", "ipv4_gateway"),
               )
# The commented-out "Privacy" row is handled by a separate segment control
# built in create_view().
ipv6_fields = (#("Method", "ipv6_method"),
               ("Address", "ipv6_address"),
               ("Prefix Length", "ipv6_prefix_length"),
               ("Gateway", "ipv6_gateway"),
               #("Privacy", "ipv6_privacy"),
               )
# NOTE(review): create_view() does not iterate proxy_fields (the Proxy
# section is still a "TODO" label); its use, if any, is elsewhere.
proxy_fields = (("Method", "proxy_method"),
                ("URL", "proxy_url"),
                ("Servers", "proxy_servers"),
                ("Excludes", "proxy_excludes"),
                )
vpn_fields = (  # named Provider in spec
    ("Host", "vpn_host"),
    ("Domain", "vpn_domain"),
    ("Name", "vpn_name"),
    ("Type", "vpn_type"),
)
eth_fields = (("Method", "eth_method"),
              ("Interface", "eth_iface"),
              ("Address", "eth_addr"),
              ("MTU", "eth_mtu"),
              ("Speed", "eth_speed"),
              ("Duplex", "eth_duplex"),
              )

# Names of instance attributes that create_view() may assign to toplevel
# widgets (several are only created for some service types).
# NOTE(review): the consumer of this tuple is not visible in this part of
# the file -- confirm how it is used before reordering.
top_widgets = (
    "connect",
    "disconnect",
    "forget",
    "error",
    "auto_connect",
    "roaming",
    "strength",
    "security",
    "state",
    "nameservers_label",
    "nameservers_entry",
    "timeservers_label",
    "timeservers_entry",
    "domains_label",
    "domains_entry",
    "ipv4_frame",
    "ipv6_frame",
    "proxy_frame",
    "ethernet_frame",
    "vpn_frame",
    "ieee8021x_frame",
)
|
|
|
def create_view(self, properties):
    """Build every widget of the service view from `properties`.

    Widgets are packed in creation order: connect / disconnect / forget
    buttons, status widgets, the editable server lists, then the IPv4,
    IPv6, Proxy, Ethernet-or-VPN and (for ieee8021x wifi) authentication
    frames.  Several widgets only exist for some service types.
    """
    self.type = str(properties.get("Type"))
    self.security_mode = properties.get("Security")
    self.immutable = bool(properties.get("Immutable"))
    self.readwrite_list_properties = {}
    self.readwrite_list_widget = {}
    self.name = str(properties.get("Name"))

    root = self.box

    self.connect = self.add_button(root, "Connect", self._connect)
    self.disconnect = self.add_button(root, "Disconnect", self._disconnect)

    if not self.immutable and self.type != "ethernet":
        self.forget = self.add_button(root, "Forget Network", self._forget)
    elif self.type == "wifi" and "ieee8021x" in self.security_mode:
        pnac_conf.read()
        self.forget = self.add_button(root, "Forget Network", self._forget)
        # Nothing to forget unless the PNAC config knows this network.
        if not pnac_conf.has_section(self.name):
            self.forget.disabled = True

    self.error = self.add_label(root, "error here")

    self.auto_connect = self.add_check(root, "Auto connect",
                                       self._on_user_auto_connect)

    if self.type == "cellular":
        self.roaming = self.add_check(root, "Roaming")
        self.roaming.disabled = True

    if properties.get("Strength") is not None:
        self.strength = self.add_progress(root, "Strength:")

    if self.type == "wifi":
        self.security = self.add_label(root, "Security:")

    self.state = self.add_label(root, properties.get("State"))

    # Editable lists: (title, property name, attribute name stem).
    editable_lists = (
        ("Name Servers:", "Nameservers", "nameservers"),
        ("Time Servers:", "Timeservers", "timeservers"),
        ("Domain Names:", "Domains", "domains"),
    )
    for title, prop, stem in editable_lists:
        label, entry = self.add_readwrite_list(title, prop, properties)
        setattr(self, stem + "_label", label)
        setattr(self, stem + "_entry", entry)

    # section: IPv4
    self.ipv4_properties = {"IPv4": {}, "IPv4.Configuration": {}}
    self.ipv4_frame, self.ipv4_box = self.add_frame_and_box(root, "IPv4")
    self.ipv4_method, self.ipv4_method_items = self.add_segment_control(
        self.ipv4_box, ("Automatic", "Manual", "Off"), self._on_ipv4_method)
    for field_label, attr_name in self.ipv4_fields:
        _caption, entry = self.add_label_and_entry(self.ipv4_box, field_label)
        entry.callback_activated_add(self._on_ipv4_property_changed)
        entry.callback_unfocused_add(self._on_ipv4_property_unfocused)
        setattr(self, attr_name, entry)

    # section: IPv6
    self.ipv6_properties = {"IPv6": {}, "IPv6.Configuration": {}}
    self.ipv6_frame, self.ipv6_box = self.add_frame_and_box(root, "IPv6")
    self.ipv6_method, self.ipv6_method_items = self.add_segment_control(
        self.ipv6_box, ("Automatic", "Manual", "Off"), self._on_ipv6_method)
    for field_label, attr_name in self.ipv6_fields:
        _caption, entry = self.add_label_and_entry(self.ipv6_box, field_label)
        entry.callback_activated_add(self._on_ipv6_property_changed)
        entry.callback_unfocused_add(self._on_ipv6_property_unfocused)
        setattr(self, attr_name, entry)
    privacy_widgets = self.add_label_and_segment_control(
        self.ipv6_box, ("Disabled", "Enabled", "Prefered"),
        self._on_ipv6_privacy, "Privacy")
    (self.ipv6_privacy_lb,
     self.ipv6_privacy,
     self.ipv6_privacy_items) = privacy_widgets

    # section: Proxy: custom contents for direct, auto and manual
    #  - direct: nothing
    #  - auto: url
    #  - manual: servers, excludes
    self.proxy_properties = {"Proxy": {}, "Proxy.Configuration": {}}
    self.proxy_frame, self.proxy_box = self.add_frame_and_box(root, "Proxy")
    self.proxy_method, self.proxy_method_items = self.add_segment_control(
        self.proxy_box, ("Direct", "Automatic", "Manual"),
        self._on_proxy_method)
    self.add_label(self.proxy_box, "TODO")

    # section: Ethernet / VPN
    if self.type in ("wifi", "ethernet", "wimax", "bluetooth", "cellular"):
        frame, _inner = self.add_readonly_section("Ethernet", self.eth_fields)
        self.ethernet_frame = frame
    elif self.type == "vpn":
        frame, _inner = self.add_readonly_section("VPN", self.vpn_fields)
        self.vpn_frame = frame

    # section: Two Phase Authentication
    if self.type != "wifi" or "ieee8021x" not in self.security_mode:
        return

    frame, auth_box = self.add_frame_and_box(root, "ieee8021x")
    self.ieee8021x_frame = frame

    self.add_label(auth_box, "EAP:")
    self.eap_method, self.eap_method_items = self.add_segment_control(
        auth_box, ("PEAP", "TLS", "TTLS", "None"), self._on_eap_method
    )
    if pnac_conf.has_section(self.name):
        # Stored value -> segment to preselect; unknown values select none.
        eap_choice = {
            "peap": "PEAP",
            "tls": "TLS",
            "ttls": "TTLS",
            None: "None",
        }.get(pnac_conf.get(self.name, 'EAP'))
        if eap_choice is not None:
            self.eap_method_items[eap_choice].selected = True

    phase2_options = ("TLS", "MSCHAPv2", "None")
    self.add_label(auth_box, "Phase2:")
    self.phase2, self.phase2_items = self.add_segment_control(
        auth_box, phase2_options, self._on_phase2
    )
    if pnac_conf.has_section(self.name):
        phase2_choice = {
            "tls": "TLS",
            "MSCHAPV2": "MSCHAPv2",
            None: "None",
        }.get(pnac_conf.get(self.name, 'Phase2'))
        if phase2_choice is not None:
            self.phase2_items[phase2_choice].selected = True
|
|
|
def add_readonly_section(self, title, fields):
    """Create a titled frame holding one non-editable entry per field.

    ``fields`` is an iterable of ``(name, attr)`` pairs.  ``name`` is used
    as the entry label (a colon is appended) and the created entry widget
    is stored on ``self`` under the attribute ``attr``.

    Returns the ``(frame, box)`` pair produced by ``add_frame_and_box``.
    """
    frame, container = self.add_frame_and_box(self.box, title)
    for label_text, attr_name in fields:
        _label, entry = self.add_label_and_entry(
            container, "%s:" % (label_text,))
        # Read-only section: the user may look but not type.
        entry.editable = False
        setattr(self, attr_name, entry)
    return frame, container
|
|
|
def populate_fields(self, fields, value):
    """Copy values from the mapping ``value`` into the matching entries.

    ``fields`` is an iterable of ``(name, attr)`` pairs: ``name`` is the
    key looked up in ``value`` and ``attr`` names the widget attribute on
    ``self`` whose ``text`` is set.  Missing or falsy values leave the
    widget untouched.
    """
    for key, attr_name in fields:
        item = value.get(key)
        if not item:
            continue
        getattr(self, attr_name).text = str(item)
|
|
|
def _readwrite_list_conv(self, a):
    """Render the sequence ``a`` as a ``", "``-joined string.

    Each element is converted with ``str`` and stripped of surrounding
    whitespace.  An empty or missing sequence yields ``""``.
    """
    if a:
        pieces = [str(item).strip() for item in a]
        return ", ".join(pieces)
    return ""
|
|
|
def add_readwrite_list(self, title, name, properties):
    """Add an editable entry bound to a comma-separated list property.

    ``name`` is the property currently in use and ``"<name>.Configuration"``
    its user-configurable counterpart.  Both are read from ``properties``,
    cached (as display strings) in ``self.readwrite_list_properties`` and
    the entry widget is registered in ``self.readwrite_list_widget[name]``.

    When the entry text differs from the cached value, the text is split
    on commas and handed to ``_on_readwrite_changed`` for the
    ``.Configuration`` property.  Losing focus reloads the cached value.

    Returns the ``(label, entry)`` pair from ``add_label_and_entry``.
    """
    conf = "%s.Configuration" % (name,)

    used_value = self._readwrite_list_conv(properties.get(name))
    conf_value = self._readwrite_list_conv(properties.get(conf))
    self.readwrite_list_properties[name] = used_value
    self.readwrite_list_properties[conf] = conf_value

    def on_changed(obj):
        value = obj.text.strip()
        orig_value = self.readwrite_list_properties[name]
        conf_value = self.readwrite_list_properties[conf]
        # Compare against the configured value when one exists, otherwise
        # against the value currently in use.
        reference = conf_value if conf_value else orig_value
        if value == reference:
            return
        # Lazy %-args: the message is only built if DEBUG is enabled.
        log.debug("User changed %s=%r (%r, %r)",
                  name, value, orig_value, conf_value)
        # Drop empty fragments ("a,,b", trailing commas, blank text) so no
        # empty string is ever submitted as a list element.
        fragments = (x.strip() for x in value.split(","))
        value_array = [x for x in fragments if x]
        self._on_readwrite_changed(conf, value_array)

    def on_unfocused(obj):
        # Discard whatever was typed but not applied.
        self.reload_readwrite_list(name)

    lb, en = self.add_label_and_entry(self.box, title, on_changed)
    en.callback_unfocused_add(on_unfocused)
    self.readwrite_list_widget[name] = en
    self.reload_readwrite_list(name)
    return lb, en
|
|
|
def reload_readwrite_list(self, name):
    """Reset the entry for ``name`` to its cached value.

    The cached ``"<name>.Configuration"`` string is shown when non-empty,
    otherwise the cached in-use value.
    """
    cache = self.readwrite_list_properties
    in_use = cache[name]
    configured = cache["%s.Configuration" % name]
    entry = self.readwrite_list_widget[name]
    log.debug("%s=%r, %r", name, in_use, configured)
    entry.text = configured or in_use
|
|
|
def update_readwrite_list(self, name, value):
    """Store a new value for a list property and refresh its entry.

    ``value`` is converted with ``_readwrite_list_conv``; nothing happens
    when the result equals the cached string.  ``name`` may be either the
    base property or its ``.Configuration`` variant; in both cases the
    widget registered under the base name is reloaded.
    """
    text = self._readwrite_list_conv(value)
    if text == self.readwrite_list_properties[name]:
        return
    self.readwrite_list_properties[name] = text

    suffix = ".Configuration"
    base = name[:-len(suffix)] if name.endswith(suffix) else name
    self.reload_readwrite_list(base)
|
|
|
def on_property_changed(self, name, value):
    """Update the widgets affected by a change of service property ``name``.

    ``value`` is the new property value.  Scalar properties update a
    single widget.  The ``IPv4``/``IPv6``/``Proxy`` dictionaries (and their
    ``.Configuration`` variants) are cached on ``self`` and drive the
    entry texts, their editability and the selected method segment.
    Names present in ``self.readwrite_list_properties`` are forwarded to
    ``update_readwrite_list``.

    When a change toggles the visibility of a top-level widget, the main
    box is repacked with the visible widgets listed in
    ``self.top_widgets``.
    """
    log.debug("Changed %s: %s=%s", self.path, name, value)

    def effective(used, conf, key):
        # Prefer the value in use; fall back to the configured one.
        v = used.get(key) or conf.get(key)
        if not v:
            return ""
        return str(v)

    visibility_changed = False

    if name == "Type":
        self.type = str(value)
    elif name == "Immutable":
        value = bool(value)
        self.immutable = value
        self.auto_connect.disabled = value
        for w in self.readwrite_list_widget.values():
            w.disabled = value
        # Every user-editable control follows the immutable flag.
        for attr in ("ipv4_method", "ipv4_address", "ipv4_netmask",
                     "ipv4_gateway", "ipv6_method", "ipv6_address",
                     "ipv6_prefix_length", "ipv6_gateway",
                     "ipv6_privacy", "proxy_method"):
            getattr(self, attr).disabled = value
    elif name == "Favorite":
        value = bool(value)
        # The "forget" widget may not exist; guard the access.
        if hasattr(self, "forget"):
            if self.forget.visible != value:
                self.forget.visible = value
                visibility_changed = True
    elif name == "State":
        value = str(value)
        visible = (value == "failure")
        self.state.text = "State: %s" % (value,)
        if self.error.visible != visible:
            self.error.visible = visible
            visibility_changed = True
        # Anything but "idle"/"failure" offers Disconnect instead of
        # Connect.
        connected = (value not in ("idle", "failure"))
        if self.disconnect.visible != connected:
            self.disconnect.visible = connected
            visibility_changed = True
        if self.connect.visible == connected:
            self.connect.visible = not connected
            visibility_changed = True
    elif name == "Error":
        self.error.text = "Error: %s" % value
    elif name == "AutoConnect":
        self.auto_connect.state = bool(value)
    elif name == "Strength":
        # Scale the received value down by 100 for the widget.
        self.strength.value = float(value) / 100.0
    elif self.type == "wifi" and name == "Security":
        s = ", ".join(str(x) for x in value)
        self.security.text = "Security: %s" % (s,)
    elif name == "Roaming":
        self.roaming.text = str(value)
    elif name == "Ethernet":
        self.populate_fields(self.eth_fields, value)
    elif name == "Provider":
        self.populate_fields(self.vpn_fields, value)
    elif name in ("IPv4", "IPv4.Configuration"):
        self.ipv4_properties[name] = value
        used = self.ipv4_properties["IPv4"]
        conf = self.ipv4_properties["IPv4.Configuration"]

        self.ipv4_address.text = effective(used, conf, "Address")
        self.ipv4_netmask.text = effective(used, conf, "Netmask")
        self.ipv4_gateway.text = effective(used, conf, "Gateway")

        method = str(conf.get("Method", ""))
        editable = (method == "manual") and (not self.immutable)
        self.ipv4_address.editable = editable
        self.ipv4_netmask.editable = editable
        self.ipv4_gateway.editable = editable

        if method in ("dhcp", "fixed"):
            self.ipv4_method_items["Automatic"].selected = True
        elif method == "manual":
            self.ipv4_method_items["Manual"].selected = True
        elif method == "off":
            self.ipv4_method_items["Off"].selected = True
        elif method:
            log.error("Unknown method: %s", method)
    elif name in ("IPv6", "IPv6.Configuration"):
        self.ipv6_properties[name] = value
        used = self.ipv6_properties["IPv6"]
        conf = self.ipv6_properties["IPv6.Configuration"]

        self.ipv6_address.text = effective(used, conf, "Address")
        self.ipv6_prefix_length.text = effective(used, conf, "PrefixLength")
        self.ipv6_gateway.text = effective(used, conf, "Gateway")

        method = str(conf.get("Method", ""))
        editable = (method == "manual") and (not self.immutable)
        self.ipv6_address.editable = editable
        self.ipv6_prefix_length.editable = editable
        self.ipv6_gateway.editable = editable
        # privacy has only meaning if Method is set to "auto"
        editable = (method == "auto") and (not self.immutable)
        self.ipv6_privacy.disabled = not editable

        if method in ("auto", "fixed", "6to4"):
            self.ipv6_method_items["Automatic"].selected = True
        elif method == "manual":
            self.ipv6_method_items["Manual"].selected = True
        elif method == "off":
            self.ipv6_method_items["Off"].selected = True
        elif method:
            log.error("Unknown method: %s", method)

        privacy = str(conf.get("Privacy", ""))
        if privacy == "disabled":
            self.ipv6_privacy_items["Disabled"].selected = True
        elif privacy == "enabled":
            self.ipv6_privacy_items["Enabled"].selected = True
        elif privacy == "prefered":
            self.ipv6_privacy_items["Prefered"].selected = True
        elif privacy:
            log.error("Unknown privacy: %s", privacy)
    elif name in ("Proxy", "Proxy.Configuration"):
        self.proxy_properties[name] = value
        conf = self.proxy_properties["Proxy.Configuration"]

        # TODO: url, servers and excludes are not shown yet; only the
        # method segment is kept in sync.
        method = str(conf.get("Method", ""))

        if method == "direct":
            self.proxy_method_items["Direct"].selected = True
        elif method == "manual":
            self.proxy_method_items["Manual"].selected = True
        elif method == "auto":
            self.proxy_method_items["Automatic"].selected = True
        elif method:
            log.error("Unknown method: %s", method)
    elif name in self.readwrite_list_properties:
        self.update_readwrite_list(name, value)

    if visibility_changed:
        # Repack from scratch so only visible widgets take up space, in
        # the order given by top_widgets.
        self.box.unpack_all()
        for attr in self.top_widgets:
            if hasattr(self, attr):
                wid = getattr(self, attr)
                if wid.visible:
                    self.box.pack_end(wid)
|
|
|
def _disconnect(self, obj):
    """Request ``Disconnect`` on the service via ``self.bus_obj``.

    The disconnect button is disabled once the call is issued and
    re-enabled by whichever of the reply/error handlers runs.
    """
    def reenable():
        self.disconnect.disabled = False

    def succeeded():
        log.debug("Disconnected %s", self.path)
        reenable()

    def failed(exc):
        log.error("Could not disconnect %s", exc)
        reenable()

    self.bus_obj.Disconnect(reply_handler=succeeded, error_handler=failed)
    self.disconnect.disabled = True
|
|
|
def _connect(self, obj):
    """Request ``Connect`` on the service via ``self.bus_obj``.

    The connect button is disabled once the call is issued and re-enabled
    by whichever of the reply/error handlers runs.
    """
    def reenable():
        self.connect.disabled = False

    def succeeded():
        log.debug("Connected %s", self.path)
        reenable()

    def failed(exc):
        log.error("Could not connect %s", exc)
        reenable()

    self.bus_obj.Connect(reply_handler=succeeded, error_handler=failed)
    self.connect.disabled = True
|
|
|
def _forget(self, obj):
    """Forget this service.

    For WiFi services whose security mode includes "ieee8021x", the
    matching section is removed from ``pnac_conf`` and the EAP/Phase2
    segment selections are cleared.  For every other service ``Remove``
    is called on ``self.bus_obj`` and the forget button is disabled until
    a reply or error arrives.
    """
    def on_reply():
        log.debug("Removed %s", self.path)
        self.forget.disabled = False

    def on_error(exc):
        log.error("Could not remove %s", exc)
        self.forget.disabled = False

    # NOTE(review): the indentation of this method was lost in the copy
    # under review.  The nesting below (loops inside the has_section()
    # check, ``else`` paired with the outer ``if``, and the final
    # ``disabled = True`` inside the ``else``) is the most plausible
    # reading -- confirm against the original file.
    # NOTE(review): the removed section is not explicitly written back
    # here (no pnac_conf.write()); presumably persisted elsewhere -- verify.
    if self.type == "wifi" and "ieee8021x" in self.security_mode:
        if pnac_conf.has_section(self.name):
            pnac_conf.remove_section(self.name)
            for it in self.phase2_items:
                self.phase2_items[it].selected = False
            for it in self.eap_method_items:
                self.eap_method_items[it].selected = False
    else:
        self.bus_obj.Remove(reply_handler=on_reply, error_handler=on_error)
        # Re-enabled by on_reply/on_error above.
        self.forget.disabled = True
|
|
|
def _on_user_auto_connect(self, obj):
    """Apply the state of the auto-connect check ``obj`` to the service.

    Sets the ``AutoConnect`` property through ``self.bus_obj``.  On
    failure the check is flipped back and an error popup is shown.
    """
    wanted = obj.state

    def applied():
        log.info("Set AutoConnect=%s", wanted)

    def rejected(exc):
        log.error("Failed to set AutoConnect=%s: %s", wanted, exc)
        # Undo the toggle so the widget reflects the real setting.
        obj.state = not wanted
        popup_error(self.obj, "Failed to Apply Auto Connect",
                    exc.get_dbus_message())

    self.bus_obj.SetProperty(
        "AutoConnect",
        dbus.Boolean(wanted),
        reply_handler=applied,
        error_handler=rejected,
    )
|
|
|
def _on_readwrite_changed(self, name, value):
    """Send the string list ``value`` as the new value of property ``name``.

    ``name`` is expected to end in ".Configuration"; on failure that
    suffix is cut off to title the error popup and to reload the
    matching entry with its cached value.
    """
    def applied():
        log.info("Set %s=%s", name, value)

    def rejected(exc):
        log.error("Failed to set %s=%s: %s", name, value, exc)
        base = name[:-len(".Configuration")]
        popup_error(self.obj, "Failed to Apply %s" % (base,),
                    exc.get_dbus_message())
        self.reload_readwrite_list(base)

    payload = dbus.Array(value, signature="s")
    self.bus_obj.SetProperty(
        name, payload, reply_handler=applied, error_handler=rejected)
|
|
|
def _ipv4_apply(self):
    """Send the IPv4 settings shown in the UI as ``IPv4.Configuration``.

    The selected method segment is translated to "dhcp", "manual" or
    "off".  For "manual" the non-empty address, netmask and gateway
    entries are included; if none is filled in, nothing is sent yet.
    Nothing is sent either when no value differs from the cached
    configuration, or when no known method segment is selected.

    A failure reported by the service is logged and shown in a popup.
    """
    selected = self.ipv4_method.item_selected
    label = selected.text if selected is not None else None
    method = {
        "Automatic": "dhcp",
        "Manual": "manual",
        "Off": "off",
    }.get(label)
    if method is None:
        # No segment (or an unexpected one) is selected: previously this
        # crashed with AttributeError / UnboundLocalError.
        log.debug("No IPv4 method selected (%r), nothing to apply", label)
        return

    def make_variant(s):
        return dbus.String(s, variant_level=1)

    new = {"Method": make_variant(method)}
    if method == "manual":
        for key, entry in (("Address", self.ipv4_address),
                           ("Netmask", self.ipv4_netmask),
                           ("Gateway", self.ipv4_gateway)):
            text = entry.text
            if text:
                new[key] = make_variant(text)
        if len(new) == 1:  # no properties yet
            return

    conf = self.ipv4_properties["IPv4.Configuration"]
    changed = [k for k, v in new.items() if conf.get(k) != v]
    log.debug("Changed IPv4: %s", ", ".join(changed))
    if not changed:
        return

    def on_reply():
        log.info("Set IPv4=%s", new)

    def on_error(exc):
        log.error("Failed to set IPv4.Configuration=%s: %s", new, exc)
        popup_error(self.obj, "Failed to Apply IPv4",
                    exc.get_dbus_message())

    self.bus_obj.SetProperty(
        "IPv4.Configuration", new,
        reply_handler=on_reply, error_handler=on_error
    )
|
|
|
def _on_ipv4_method(self, obj, item):
    """React to the user picking an IPv4 method segment.

    The address, netmask and gateway entries become editable only for
    "Manual" on a service that is not immutable.  ``_ipv4_apply`` is
    called when the chosen method differs from the cached configuration.
    """
    method = {
        "Automatic": "dhcp",
        "Manual": "manual",
        "Off": "off",
    }.get(item.text)
    if method is None:
        # Unknown segment label: nothing sensible to apply.
        return

    conf = self.ipv4_properties["IPv4.Configuration"]
    editable = (method == "manual") and (not self.immutable)
    self.ipv4_address.editable = editable
    self.ipv4_netmask.editable = editable
    self.ipv4_gateway.editable = editable

    # .get(): the cached configuration may still lack "Method" (e.g. it
    # has not been received yet); indexing raised KeyError in that case.
    if method == conf.get("Method"):
        return
    self._ipv4_apply()
|
|
|
def _on_ipv4_property_changed(self, obj):
    """Entry callback: apply the IPv4 settings currently in the UI."""
    self._ipv4_apply()
|
|
|
def _on_ipv4_property_unfocused(self, obj):
    """Reset the IPv4 entries to the cached values.

    For each of Address, Netmask and Gateway the in-use value is shown,
    falling back to the configured value, then to an empty string.
    """
    in_use = self.ipv4_properties["IPv4"]
    configured = self.ipv4_properties["IPv4.Configuration"]

    targets = (
        ("Address", self.ipv4_address),
        ("Netmask", self.ipv4_netmask),
        ("Gateway", self.ipv4_gateway),
    )
    for key, entry in targets:
        current = in_use.get(key) or configured.get(key)
        entry.text = str(current) if current else ""
|
|
|
def _ipv6_apply(self):
    """Send the IPv6 settings shown in the UI as ``IPv6.Configuration``.

    The selected method segment is translated to "auto", "manual" or
    "off".

    * "manual": the non-empty address, prefix length and gateway entries
      are included; if none is filled in, nothing is sent yet.  The
      prefix length is validated (integer in 0..128) and sent as a D-Bus
      byte, the type ConnMan documents for it; an invalid value is
      reported in a popup and nothing is sent.
    * "auto": the selected privacy segment is included, translated to
      the lowercase value ("disabled", "enabled", "prefered").  Privacy
      has only meaning when Method is "auto".

    Nothing is sent when no value differs from the cached configuration
    or when no known method segment is selected.  A failure reported by
    the service is logged and shown in a popup.
    """
    selected = self.ipv6_method.item_selected
    label = selected.text if selected is not None else None
    method = {
        "Automatic": "auto",
        "Manual": "manual",
        "Off": "off",
    }.get(label)
    if method is None:
        # No segment (or an unexpected one) is selected: previously this
        # crashed with AttributeError / UnboundLocalError.
        log.debug("No IPv6 method selected (%r), nothing to apply", label)
        return

    def make_variant(s):
        return dbus.String(s, variant_level=1)

    new = {"Method": make_variant(method)}
    if method == "manual":
        if self.ipv6_address.text:
            new["Address"] = make_variant(self.ipv6_address.text)
        prefix_text = self.ipv6_prefix_length.text
        if prefix_text:
            try:
                prefix = int(prefix_text)
            except ValueError:
                prefix = -1
            if not 0 <= prefix <= 128:
                popup_error(self.obj, "Failed to Apply IPv6",
                            "Invalid prefix length: %s" % (prefix_text,))
                return
            # PrefixLength is a uint8 on the wire, not a string.
            new["PrefixLength"] = dbus.Byte(prefix, variant_level=1)
        if self.ipv6_gateway.text:
            new["Gateway"] = make_variant(self.ipv6_gateway.text)
        if len(new) == 1:  # no properties yet
            return
    elif method == "auto":
        privacy_item = self.ipv6_privacy.item_selected
        if privacy_item is not None:
            # Translate the UI label to the wire value ("prefered" is
            # the spelling used by the service).
            privacy = {
                "Disabled": "disabled",
                "Enabled": "enabled",
                "Prefered": "prefered",
            }.get(privacy_item.text)
            if privacy:
                new["Privacy"] = make_variant(privacy)

    conf = self.ipv6_properties["IPv6.Configuration"]
    changed = [k for k, v in new.items() if conf.get(k) != v]
    log.debug("Changed IPv6: %s", ", ".join(changed))
    if not changed:
        return

    def on_reply():
        log.info("Set IPv6=%s", new)

    def on_error(exc):
        log.error("Failed to set IPv6.Configuration=%s: %s", new, exc)
        popup_error(self.obj, "Failed to Apply IPv6",
                    exc.get_dbus_message())

    self.bus_obj.SetProperty(
        "IPv6.Configuration", new,
        reply_handler=on_reply, error_handler=on_error
    )
|
|
|
def _on_ipv6_method(self, obj, item):
    """React to the user picking an IPv6 method segment.

    The address, prefix length and gateway entries become editable only
    for "Manual"; the privacy selector is enabled only for "Automatic"
    (both only when the service is not immutable).  ``_ipv6_apply`` is
    called when the chosen method differs from the cached configuration.
    """
    method = {
        "Automatic": "auto",
        "Manual": "manual",
        "Off": "off",
    }.get(item.text)
    if method is None:
        # Unknown segment label: nothing sensible to apply.
        return

    conf = self.ipv6_properties["IPv6.Configuration"]

    editable = (method == "manual") and (not self.immutable)
    self.ipv6_address.editable = editable
    self.ipv6_prefix_length.editable = editable
    self.ipv6_gateway.editable = editable
    # privacy has only meaning if Method is set to "auto"
    editable = (method == "auto") and (not self.immutable)
    self.ipv6_privacy.disabled = not editable

    # .get(): the cached configuration starts out as an empty dict, so
    # indexing raised KeyError before the first update arrived.
    if method == conf.get("Method"):
        return
    self._ipv6_apply()
|
|
|
def _on_ipv6_privacy(self, obj, item):
    """React to the user picking an IPv6 privacy segment.

    ``_ipv6_apply`` is called when the chosen privacy value differs from
    the one in the cached configuration.
    """
    privacy = {
        "Disabled": "disabled",
        "Enabled": "enabled",
        "Prefered": "prefered",
    }.get(item.text)
    if privacy is None:
        # Unknown segment label: nothing sensible to apply.
        return

    conf = self.ipv6_properties["IPv6.Configuration"]
    # .get(): "Privacy" may be absent from the cached configuration
    # (it starts out empty); indexing raised KeyError in that case.
    if privacy == conf.get("Privacy"):
        return
    self._ipv6_apply()
|
|
|
def _on_ipv6_property_changed(self, obj):
    """Entry callback: apply the IPv6 settings currently in the UI."""
    self._ipv6_apply()
|
|
|
def _on_ipv6_property_unfocused(self, obj):
    """Reset the IPv6 entries to the cached values.

    For each of Address, PrefixLength and Gateway the in-use value is
    shown, falling back to the configured value, then to an empty string.
    The privacy selector is not touched here.
    """
    in_use = self.ipv6_properties["IPv6"]
    configured = self.ipv6_properties["IPv6.Configuration"]

    targets = (
        ("Address", self.ipv6_address),
        ("PrefixLength", self.ipv6_prefix_length),
        ("Gateway", self.ipv6_gateway),
    )
    for key, entry in targets:
        current = in_use.get(key) or configured.get(key)
        entry.text = str(current) if current else ""
|
|
|
def _on_proxy_method(self, obj, item):
    """React to the user picking a proxy method segment.

    Proxy configuration is not implemented yet: the handler only works
    out whether the chosen method differs from the cached configuration
    and applies nothing.
    """
    method = {
        "Direct": "direct",
        "Manual": "manual",
        "Automatic": "auto",
    }.get(item.text)
    if method is None:
        # Unknown segment label: nothing to do.
        return

    conf = self.proxy_properties["Proxy.Configuration"]
    # .get(): the cached configuration starts out as an empty dict, so
    # indexing raised KeyError before the first update arrived.
    if method == conf.get("Method"):
        return
    # TODO: update the editability of the proxy fields and apply the new
    # method once proxy support (_proxy_apply) exists.
|
|
|
def _on_proxy_changed(self, obj):
    """Placeholder callback for edited proxy fields; does nothing yet."""
    pass
    # TODO: call self._proxy_apply() once proxy support is implemented.
|
|
|
def _on_proxy_unfocused(self, obj):
    """Placeholder callback for unfocused proxy fields; does nothing yet."""
    pass
    # TODO: revert the fields to the configured values.
|
|
|
def _on_eap_method(self, obj, item):
    """Store the EAP method chosen in the segment control.

    The selected label is translated to its lowercase configuration value
    ("None" and unknown labels map to ``None``), saved under the "EAP"
    key of this service's section in ``pnac_conf`` and written out.
    """
    eap_by_label = {
        "PEAP": "peap",
        "TLS": "tls",
        "TTLS": "ttls",
    }
    pnac_conf.set(self.name, "EAP", eap_by_label.get(item.text))
    pnac_conf.write()
|
|
|
def _on_phase2(self, obj, item):
    """Store the phase-2 authentication chosen in the segment control.

    The selected label is translated to its configuration value ("None"
    and unknown labels map to ``None``), saved under the "Phase2" key of
    this service's section in ``pnac_conf`` and written out.
    """
    phase2_by_label = {
        "MSCHAPv2": "MSCHAPV2",
        "TLS": "tls",
    }
    pnac_conf.set(self.name, "Phase2", phase2_by_label.get(item.text))
    pnac_conf.write()
|
|
|
|
|
######################################################################## |
|
# Main Actions: |
|
def show_techs(button, naviframe):
    """Push the technologies list onto ``naviframe``.

    Selecting an entry in the list pushes a ``TechView`` for it, titled
    with the technology's name.
    """
    def open_technology(path, properties):
        title = str(properties.get("Name"))
        log.debug("view technology: %r %s", title, path)
        view = TechView(naviframe, path, properties)
        naviframe.item_push(title, None, None, view.obj, "basic")

    listing = TechList(naviframe, open_technology)
    naviframe.item_push("Technologies", None, None, listing.obj, "basic")
|
|
|
|
|
def connect_service(path, properties):
    """Start connecting to the service at D-Bus object path ``path``.

    ``properties`` is the service's property dictionary.  Services
    without a type, or of type "system", "gps" or "gadget", are refused
    with a logged error.  Services using "ieee8021x" security that have
    no section in ``pnac_conf`` are not connected; instead a popup is
    shown and the service view is opened so they can be configured.

    Connection failures are reported in a popup, except for
    "already connected" / "in progress" errors which are only logged.
    """
    service_type = properties.get("Type")
    if not service_type:
        log.error("cannot try to connect to service without type: %s", path)
        return
    if service_type in ("system", "gps", "gadget"):
        log.error("cannot connect to service with type: %s", service_type)
        return
    # Not every service carries "Security"; treat a missing value as "no
    # security modes" instead of crashing on `in None` below.
    sec = properties.get("Security") or ()

    name = properties.get("Name")
    if name:
        name = str(name)
    log.debug("connect to %s (%s): %s",
              name, path, dbus_dict_to_str(properties))

    # Connman only supports two phase auth via config files
    if ("ieee8021x" in sec) and not pnac_conf.has_section(name):
        popup_error(
            win,
            "This Network needs Configuration",
            "This network uses 802.1x authentication. "
            "Please configure the options in the section at the bottom."
        )
        show_service(path, properties)
        return

    def on_reply():
        log.info("Connected to %s (%s)", name, path)

    def on_error(exc):
        exc_name = exc.get_dbus_name()
        # Benign races: not worth bothering the user.
        if exc_name in ("net.connman.Error.AlreadyConnected",
                        "net.connman.Error.InProgress"):
            log.debug("Failed to Connect to %s (%s): %s", name, path, exc)
            return
        log.error("Failed to Connect to %s (%s): %s", name, path, exc)
        if exc_name == "net.connman.Error.NotRegistered":
            popup_error(win, "Failed to Connect to %s" % name,
                        "Not registered. Try running \"$ econnman-bin -a\"")
            return
        popup_error(win, "Failed to Connect to %s" % name,
                    exc.get_dbus_message())

    service = dbus.Interface(bus.get_object("net.connman", path),
                             "net.connman.Service")
    service.Connect(reply_handler=on_reply,
                    error_handler=on_error)
|
|
|
|
|
def show_service(path, properties):
    """Push a ``ServiceView`` for the given service onto the naviframe.

    The page is titled with the service's "Name" property.
    """
    title = str(properties.get("Name"))
    log.debug("view service: %r %s", title, path)
    view = ServiceView(nf, path, properties)
    nf.item_push(title, None, None, view.obj, "basic")
|
|
|
|
|
######################################################################## |
|
# Agent: |
|
def agent_method(in_signature="", out_signature="", **kargs):
    """Return a ``dbus.service.method`` decorator for "net.connman.Agent".

    ``in_signature`` and ``out_signature`` default to empty signatures;
    any further keyword arguments are passed through unchanged.
    """
    options = dict(kargs)
    options["in_signature"] = in_signature
    options["out_signature"] = out_signature
    return dbus.service.method("net.connman.Agent", **options)
|
|
|
|
|
class Agent(dbus.service.Object):
    """D-Bus object implementing the "net.connman.Agent" interface.

    The object is exported on the module-level ``bus`` at ``Agent.path``.
    Input requests are answered through a dialog window; at most one
    dialog is tracked in ``self.dialog``.
    """
    path = "/org/enlightenment/econnman/agent"

    # Converters applied to the text typed for a requested field; fields
    # not listed here are sent back as dbus.String.
    request_type_conv = {
        "SSID": dbus.ByteArray,
    }

    class Canceled(dbus.DBusException):
        """Error returned when the user closes the dialog unanswered."""
        _dbus_error_name = "net.connman.Agent.Error.Canceled"

    def __init__(self, serv_lst):
        """Export the agent; ``serv_lst`` is used to look up service names."""
        dbus.service.Object.__init__(self, bus, self.path)
        self.dialog = None
        self.serv_lst = serv_lst

    @agent_method()
    def Release(self):
        """Close any open dialog when the agent is released."""
        log.info("Agent released by ConnMan")
        if self.dialog:
            self.dialog.delete()
            self.dialog = None

    @agent_method(in_signature="os")
    def RequestBrowser(self, path, url):
        """Open ``url`` with ``xdg-open``.

        The URL is supplied by the remote side, so it is shell-quoted
        before being placed on the command line.
        """
        try:
            from shlex import quote  # Python 3.3+
        except ImportError:
            from pipes import quote  # Python 2
        log.info("Open browser for %s at %s", path, url)
        ecore.exe_run("xdg-open %s" % (quote(url),))

    @agent_method(in_signature="os")
    def ReportError(self, path, error):
        """Log the reported error and show it in a popup."""
        log.error("ConnMan error %s: %s", path, error)
        popup_error(win, "ConnMan Error", str(error))

    @agent_method()
    def Cancel(self):
        """Close any open dialog when the request is canceled."""
        log.info("Canceled dialog")
        if self.dialog:
            self.dialog.delete()
            self.dialog = None

    @agent_method(in_signature="oa{sv}", out_signature="a{sv}",
                  async_callbacks=("on_done", "on_error"))
    def RequestInput(self, path, fields, on_done, on_error):
        """Ask the user for the requested ``fields`` in a dialog window.

        ``fields`` maps each field name to a description dictionary; its
        "Type", "Requirement" and "Value" entries are used for display.
        Pressing Submit answers via ``on_done`` with the non-empty
        values; closing the window answers via ``on_error`` with
        ``Agent.Canceled``.
        """
        log.debug("Request Input for %s: %s", path, dbus_dict_to_str(fields))

        def on_deleted(obj):
            w = self.dialog
            self.dialog = None
            # self.dialog is cleared before an answered dialog is
            # deleted, so a still-set dialog means the user closed it.
            if w:
                e = Agent.Canceled("user canceled")
                log.debug("User canceled agent request: %s", e)
                on_error(e)

        def on_clicked(obj):
            response = {}
            keys = []
            for name, en in widgets.items():
                conv = self.request_type_conv.get(name, dbus.String)
                v = conv(en.text)
                if v:
                    response[name] = v
                    keys.append(name)
            log.debug("User Replies with keys: %s", ", ".join(keys))
            w = self.dialog
            # Clear first so on_deleted does not report a cancellation.
            self.dialog = None
            on_done(response)
            w.delete()

        self.dialog = w = Window("econnman-agent", ELM_WIN_DIALOG_BASIC)
        w.title = "ConnMan Requested Input"
        w.icon_name = "econnman"
        w.autodel = True
        w.on_del_add(on_deleted)
        w.show()

        bg = Background(w)
        bg.size_hint_weight = EXPAND_BOTH
        bg.show()
        w.resize_object_add(bg)

        bx = Box(w)
        bx.size_hint_align = FILL_BOTH
        bx.horizontal = False
        bx.show()
        w.resize_object_add(bx)

        def add_label(text):
            # Append a full-width label to the dialog's box.
            lb = Label(bx)
            lb.size_hint_weight = EXPAND_HORIZ
            lb.size_hint_align = FILL_BOTH
            lb.text = text
            lb.show()
            bx.pack_end(lb)

        add_label("<b>ConnMan needs your input</b>")

        name = self.serv_lst.service_name_get(path)
        if name:
            add_label("Service: %s" % (name,))

        widgets = {}
        for name, desc in fields.items():
            decos = ""
            t = desc.get("Type")
            if t and t != "informational":
                decos += " (type: %s)" % (t,)
            if desc.get("Requirement") == "mandatory":
                decos += " REQUIRED"
            add_label("%s:%s" % (name, decos))

            en = Entry(bx)
            en.size_hint_weight = EXPAND_HORIZ
            en.size_hint_align = FILL_BOTH
            en.single_line = True
            en.scrollable = True
            en.text = desc.get("Value", "")
            # Informational fields are shown but cannot be edited.
            en.editable = (t != "informational")
            en.show()
            bx.pack_end(en)
            widgets[name] = en

        bt = Button(bx)
        bt.size_hint_weight = EXPAND_HORIZ
        bt.size_hint_align = FILL_BOTH
        bt.callback_clicked_add(on_clicked)
        bt.text = "Submit"
        bt.show()
        bx.pack_end(bt)
|
|
|
|
|
######################################################################## |
|
# GUI helpers: |
|
def popup_fatal(obj, title, message):
    """Show a popup with a fatal message and a Quit button.

    The popup is attached to the top-level widget of ``obj`` and the
    message is logged as critical.  Pressing Quit exits the application.
    Returns the popup.
    """
    toplevel = obj.top_widget_get()
    log.critical("%s: %s", title, message)

    popup = Popup(toplevel)
    popup.size_hint_weight = EXPAND_BOTH
    popup.part_text_set("title,text", title)
    popup.text = message

    def quit_app(_button):
        return elm.exit()

    button = Button(toplevel)
    button.text = "Quit"
    button.callback_clicked_add(quit_app)
    popup.part_content_set("button1", button)
    popup.show()
    return popup
|
|
|
|
|
def popup_error(obj, title, message):
    """Show a popup with an error message and a Close button.

    The popup is attached to the top-level widget of ``obj`` and the
    message is logged as an error.  Pressing Close deletes the popup.
    Returns the popup.
    """
    toplevel = obj.top_widget_get()
    log.error("%s: %s", title, message)

    popup = Popup(toplevel)
    popup.size_hint_weight = EXPAND_BOTH
    popup.part_text_set("title,text", title)
    popup.text = message

    def dismiss(_button):
        return popup.delete()

    button = Button(toplevel)
    button.text = "Close"
    button.callback_clicked_add(dismiss)
    popup.part_content_set("button1", button)
    popup.show()
    return popup
|
|
|
|
|
if __name__ == "__main__":
    # Command line: -v may be repeated to raise verbosity, -a registers
    # this process as an agent.
    parser = argparse.ArgumentParser(
        description="Connection Manager for Enlightenment")
    parser.add_argument("-v", "--verbose", action="count")
    parser.add_argument("-a", "--agent", action="store_true")
    args = parser.parse_args()

    # Each -v lowers the threshold by one logging level (10 units),
    # starting from WARNING.
    level = logging.WARNING
    if args.verbose:
        level -= 10 * args.verbose
    log.setLevel(level)

    # Module-level name used by the service view and connect_service().
    pnac_conf = PNACConfig()

    elm.init()
    elm.policy_set(ELM_POLICY_QUIT, ELM_POLICY_QUIT_LAST_WINDOW_CLOSED)

    # Register every theme file that exists: the one in the source tree
    # and the installed one.
    for td in ("./data/theme/default.edj", "@PKGDATADIR@/theme/default.edj"):
        if os.path.exists(td):
            Theme(default=True).extension_add(td)

    # Main window; `win` is also used as popup parent by other functions.
    win = Window("econnman", ELM_WIN_BASIC)
    win.title = "EConnMan"
    win.icon_name = "econnman"
    win.autodel = True
    win.size = (480, 700)
    win.show()

    bg = Background(win)
    bg.size_hint_weight = EXPAND_BOTH
    bg.show()
    win.resize_object_add(bg)

    try:
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
    except dbus.exceptions.DBusException:
        # Show the fatal popup, run the main loop until it returns,
        # then re-raise the original exception.
        popup_fatal(win, "Failed to find ConnMan",
                    "Check if ConnMan is running.")
        elm.run()
        elm.shutdown()
        raise

    # Naviframe holding the pages; `nf` is used by show_service().
    nf = Naviframe(win)
    nf.size_hint_weight = EXPAND_BOTH
    nf.show()
    win.resize_object_add(nf)

    offline_mon = OfflineModeMonitor(win)

    techs = Button(win)
    techs.text = "Techs"
    techs.callback_clicked_add(show_techs, nf)

    serv_lst = ServicesList(win, connect_service, show_service)

    # First page: the services list, with offline_mon.obj and the techs
    # button passed as the item's extra content.
    nf.item_push("EConnMan", offline_mon.obj, techs, serv_lst.obj, "basic")

    if args.agent:
        log.debug("create agent")
        agent = Agent(serv_lst)
        manager.RegisterAgent(agent.path)
        log.info("Registered agent at %s", agent.path)

    elm.run()
    #pnac_conf.write()
    logging.shutdown()
    elm.shutdown()
|
|
|