epour/epour/session.py

755 lines
23 KiB
Python

#
# Epour - A bittorrent client using EFL and libtorrent
#
# Copyright 2012-2017 Kai Huuhko <kai.huuhko@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
import os
import mimetypes
from urllib.parse import urlparse, urlsplit
import logging
import shutil
from distutils.version import LooseVersion
try:
import cPickle
except ImportError:
import pickle as cPickle
from collections import OrderedDict
import libtorrent as lt
from efl.ecore import Timer
from xdg.BaseDirectory import save_data_path, load_data_paths
lt_version = LooseVersion(lt.version)
lt_version_post_breaking_change = lt_version >= LooseVersion("1.2.0.0")
log = logging.getLogger("epour.session")
flags_t = lt.add_torrent_params_flags_t
default_flags = (
flags_t.flag_apply_ip_filter +
flags_t.flag_update_subscribe +
flags_t.flag_duplicate_is_error +
flags_t.flag_auto_managed)
def read_torrent_file(info_hash):
if not info_hash:
log.debug("Tried to read torrent with invalid info_hash.")
return
info_hash = str(info_hash)
log.debug("Reading torrent file %s.torrent", info_hash)
paths = load_data_paths("epour")
for p in paths:
t_path = os.path.join(p, "%s.torrent" % info_hash)
if os.path.exists(t_path):
ti = lt.torrent_info(t_path)
return ti
def read_resume_data(info_hash):
log.debug("Reading resume data for %s", info_hash)
data = None
paths = load_data_paths("epour")
for p in paths:
t_path = os.path.join(p, "%s.fastresume" % info_hash)
if os.path.exists(t_path):
with open(t_path, "rb") as fp:
data = fp.read()
break
if data:
return data
else:
raise ValueError("Fast Resume data not found")
def lt_compatibility_convert(params):
for k, v in params.items():
if type(v) == lt.sha1_hash:
params[k] = v.to_bytes()
def get_session_settings(session: lt.session) -> dict:
"""
Returns a dictionary containing all the settings pairs (key, value), regardless of the version of libtorrent.
"""
if lt_version_post_breaking_change:
settings = session.get_settings()
else:
settings = session.settings().__dict__
return settings
def save_settings(session: lt.session, settings: dict) -> None:
"""
Save the settings on a session object. It uses the right API according to the libtorrent version.
"""
if lt_version_post_breaking_change:
session.apply_settings(settings)
else:
session.set_settings(settings)
class Session(lt.session):
def __init__(self, conf, shutdown_cb):
self.conf = conf
self._shutdown_cb = shutdown_cb
self._shutdown_timer = None
self._torrents_changed = False
self.torrents = OrderedDict()
self._outstanding_resume_data = 0
from epour import __version__ as version
ver_ints = []
for s in version.split("."):
ver_ints.append(int(s))
ver_ints.append(0)
fp = lt.fingerprint("EP", *ver_ints)
log.debug("peer-id: %s", fp)
lt.session.__init__(
self,
fingerprint=fp,
# flags=
#lt.session_flags_t.add_default_plugins|
#lt.session_flags_t.start_default_features
)
log.info("Session started")
#rsdpipsdtsppe
#stheprtertoer
#satrboabaorer
#|t|flgtucrtro
#|s|oorugkam|r
#|||rces|ega||
#|||mks||rep||
mask = 0b0000001000001
self.set_alert_mask(mask)
self.listen_on(
conf.getint("Settings", "listen_low"),
conf.getint("Settings", "listen_high"))
self.alert_manager = AlertManager(self)
self.alert_manager.callback_add("add_torrent_alert", self._add_torrent_cb)
self.alert_manager.callback_add("metadata_received_alert", self._metadata_received_cb)
def save_resume_alert_cb(a):
handle = a.handle
if handle is None:
log.error("Tried to write resume data with invalid handle.")
return
info_hash = str(handle.info_hash())
data = a.resume_data
self.torrents[info_hash].write_resume_data(data)
self._outstanding_resume_data -= 1
log.debug("Resume data written for %s", info_hash)
def save_resume_failed_alert_cb(a):
log.error(a.message)
self._outstanding_resume_data -= 1
self.alert_manager.callback_add("save_resume_data_alert", save_resume_alert_cb)
self.alert_manager.callback_add("save_resume_data_failed_alert", save_resume_failed_alert_cb)
def torrent_finished_move_cb(a):
h = a.handle
if conf.getboolean("Settings", "move_completed_enabled"):
path = conf.get("Settings", "move_completed_path")
if h.save_path() == path:
return
h.move_storage(path)
self.alert_manager.callback_add(
"torrent_finished_alert", torrent_finished_move_cb)
def periodic_save_func():
self.save_resume_data()
if self._torrents_changed:
self.save_torrents()
self._torrents_changed = False
return True
self.periodic_save_timer = Timer(15.0, periodic_save_func)
def status_updates_timer_cb():
self.post_torrent_updates(64)
return True
self.status_updates_timer = Timer(1.0, status_updates_timer_cb)
def state_update_alert_cb(a):
statuses = a.status
for status in statuses:
info_hash = status.info_hash
self.torrents[str(info_hash)].status = status
self.alert_manager.callback_add("state_update_alert", state_update_alert_cb)
def _add_torrent_cb(self, a):
e = a.error
if e.value() > 0:
params = a.params
info_hash = str(params["info_hash"])
if info_hash in self.torrents:
del self.torrents[info_hash]
log.error("Adding torrent %s failed: %s", info_hash, e.message())
return
handle = a.handle
info_hash = handle.info_hash()
log.debug("Torrent %s added", info_hash)
if str(info_hash) in self.torrents:
log.debug("Torrent already in list, setting session")
self.torrents[str(info_hash)].set_session(self)
else:
torrent = Torrent(self, info_hash)
self.torrents[str(info_hash)] = torrent
self._torrents_changed = True
def _metadata_received_cb(self, a):
handle = a.handle
info_hash = handle.info_hash()
log.debug("Metadata received for %s", str(info_hash))
torrent = self.torrents[str(info_hash)]
torrent.add_metadata()
def load_state(self):
for p in load_data_paths("epour"):
path = os.path.join(p, "session")
if os.path.isfile(path):
try:
with open(path, 'rb') as f:
state = lt.bdecode(f.read())
lt.session.load_state(self, state)
except Exception as e:
log.debug("Could not load previous session state.")
log.debug(e)
else:
log.info("Session restored from disk.")
break
from epour import __version__ as version
version += ".0"
ver_s = "Epour/{} libtorrent/{}".format(version, lt.version)
settings = get_session_settings(self)
settings["user_agent"] = ver_s
log.debug("User agent: %s", ver_s)
save_settings(self, settings)
def save_state(self):
state = lt.session.save_state(self)
path = os.path.join(save_data_path("epour"), "session")
with open(path, 'wb') as f:
f.write(lt.bencode(state))
log.debug("Session state saved.")
def load_torrents(self):
for p in load_data_paths("epour"):
torrents_path = os.path.join(p, "torrents")
if os.path.isfile(torrents_path):
break
if not torrents_path:
log.debug("Previous list of torrents not found.")
return
try:
pkl_file = open(torrents_path, 'rb')
except IOError:
log.warning("Could not open the list of torrents.")
else:
try:
torrents = cPickle.load(pkl_file)
except Exception:
log.exception("Opening the list of torrents failed.")
else:
log.debug(
"List of torrents opened, "
"restoring %d torrents.", len(torrents)
)
for info_hash, torrent in torrents.items():
log.debug("Restoring torrent %s", info_hash)
self.torrents[info_hash] = torrent
params_dict = torrent.get_params()
params = None
ti = None
if "ti" in params_dict and params_dict["ti"]:
try:
ti = read_torrent_file(info_hash)
except Exception:
log.exception("Opening torrent %s failed", info_hash)
else:
params_dict["ti"] = ti
if ti:
if not lt_version_post_breaking_change:
try:
data = read_resume_data(info_hash)
except Exception:
log.exception("Reading resume data failed.")
else:
params_dict["resume_data"] = data
else:
try:
data = read_resume_data(info_hash)
params = lt.read_resume_data(data)
except Exception:
log.exception("Reading resume data failed.")
else:
params.trackers = list(set(params.trackers))
if not lt_version_post_breaking_change:
if params is None:
log.warn("Falling back to < lt 1.2 compatibility handling.")
lt_compatibility_convert(params_dict)
params = params_dict
else:
if params is None:
params = lt.add_torrent_params()
for k, v in params_dict.items():
setattr(params, k, v)
try:
self.async_add_torrent(params)
except Exception:
log.exception("Opening torrent %s failed", info_hash)
continue
finally:
pkl_file.close()
def save_resume_data(self):
for handle in self.get_torrents():
if not handle.is_valid():
log.error("Invalid handle while trying to save resume data")
continue
status = handle.status(0)
if not status.has_metadata:
continue
if not status.need_save_resume:
continue
handle.save_resume_data()
self._outstanding_resume_data += 1
def shutdown(self):
self.pause()
self.save_resume_data()
def _check_outstanding():
if self._outstanding_resume_data == 0:
self.save_torrents()
self.save_state()
self._shutdown_cb()
return False
else:
return True
self._shutdown_timer = Timer(1.0, _check_outstanding)
def save_torrents(self):
for info_hash, torrent in self.torrents.items():
info_hash2 = str(torrent.handle.info_hash())
assert info_hash == info_hash2, "%s is not %s" % (info_hash, info_hash2)
path = os.path.join(save_data_path("epour"), "torrents")
try:
data = cPickle.dumps(self.torrents, protocol=cPickle.HIGHEST_PROTOCOL)
except Exception:
log.exception("Failed to save torrents")
else:
with open(path, 'wb') as fp:
fp.write(data)
log.debug("List of torrents saved.")
def remove_torrent(self, h, with_data=False):
info_hash = str(h.info_hash())
torrent = self.torrents[info_hash]
torrent.delete_torrent_file()
torrent.delete_resume_data()
del self.torrents[info_hash]
lt.session.remove_torrent(self, h, option=with_data)
self._torrents_changed = True
def add_torrent_with_uri(self, uri):
storage_path = self.conf.get("Settings", "storage_path")
#default_flags = self.conf.get("Settings", "default_flags")
add_dict = {
"save_path": storage_path,
"flags": default_flags,
}
self.fill_add_dict_based_on_uri(add_dict, uri)
self.add_torrent_with_dict(add_dict)
def fill_add_dict_based_on_uri(self, add_dict, uri):
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "magnet":
add_dict["url"] = uri
elif parsed_uri.scheme == "file" or parsed_uri.scheme == "" and os.path.isfile(parsed_uri.path):
path = parsed_uri.path
mimetype = mimetypes.guess_type(path)[0]
if not mimetype == "application/x-bittorrent":
log.warning("%s is not of a known torrent file type", path)
with open(path, 'rb') as t:
t_raw = lt.bdecode(t.read())
info = lt.torrent_info(t_raw)
add_dict["ti"] = info
ihash = str(info.info_hash())
path_dir = save_data_path("epour")
new_path = os.path.join(path_dir, "{0}.torrent".format(ihash))
if path == new_path:
pass
else:
shutil.copy(path, new_path)
if self.conf.getboolean("Settings", "delete_original"):
log.debug(
"Deleting original torrent file %s", path)
os.remove(path)
path = new_path
elif len(uri) == 40: # looks like a sha1 string
add_dict["info_hash"] = uri
# elif uri.scheme == "http" or uri.scheme == "https":
# pass
else:
raise RuntimeError("Could not parse the torrent string.")
def add_torrent_with_dict(self, add_dict):
self.async_add_torrent(add_dict)
class AlertManager(object):
log = logging.getLogger("epour.alert")
update_interval = 0.2
alerts = {}
def __init__(self, session):
self.session = session
# TODO: Use the lt alert interval from session settings here?
self.timer = Timer(self.update_interval, self.update)
def callback_add(self, alert_type, cb, *args, **kwargs):
if alert_type not in self.alerts:
self.alerts[alert_type] = []
self.alerts[alert_type].append((cb, args, kwargs))
def callback_del(self, alert_type, cb, *args, **kwargs):
for i, a in enumerate(self.alerts):
if a == (cb, args, kwargs):
del(self.alerts[alert_type][i])
def signal(self, a):
a_name = type(a).__name__
if a_name not in self.alerts:
log.debug("No handler: %s | %s", a_name, a)
return
for cb, args, kwargs in self.alerts[a_name]:
try:
cb(a, *args, **kwargs)
except:
log.exception("Exception while handling alerts")
def update(self):
for a in self.session.pop_alerts():
self.signal(a)
return True
def status_to_flags(status):
#flags_t = lt.add_torrent_params_flags_t
flags = 0
flags += 1 if status.is_seeding else 0
#flags += 2 deprecated
flags += 4 if status.upload_mode else 0
flags += 8 if status.share_mode else 0
flags += 16 if status.ip_filter_applies else 0
flags += 32 if status.paused else 0
flags += 64 if status.auto_managed else 0
flags += 128 # duplicate_is_error
#flags += 256 deprecated
flags += 512 # update_subscribe
flags += 1024 if status.super_seeding else 0
flags += 2048 if status.sequential_download else 0
#flags += 4096 if pinned else 0
#flags += 8192 if stop_when_ready else 0
#flags += 16384 if override_trackers else 0
#flags += 32768 if override_web_seeds else 0
#flags += 65536 deprecated
#flags += 131072 if override_resume_data else 0
#flags += 262144 if merge_resume_trackers else 0
#flags += 524288 if use_resume_save_path else 0
#flags += 1048576 if merge_resume_http_seeds else 0
return flags
class Torrent(object):
def __init__(self, session, info_hash, options={}):
assert isinstance(session, lt.session)
assert isinstance(info_hash, lt.sha1_hash)
self.session = session
self.info_hash = info_hash
self.options = options
self._status = None
# @property
# def info_hash(self):
# return self.handle.info_hash()
@property
def handle(self):
return self.session.find_torrent(self.info_hash)
@property
def state(self):
if getattr(self, "_state", None):
return self._state
else:
status = self.handle.status(0)
state = status.state
self._state = state
return state
@state.setter
def state(self, value):
self._state = value
@property
def status(self):
if getattr(self, "_status", None):
return self._status
else:
status = self.handle.status(64)
self._status = status
return status
@status.setter
def status(self, value):
self._status = value
def get_params(self):
return self._params
def _get_params(self):
handle = self.handle
status = handle.status()
flags = status_to_flags(status)
# trackers = []
# for tracker in handle.trackers():
# trackers.append(tracker["url"])
# trackers = ",".join(trackers)
params = {
#"trackers": trackers,
"url_seeds": handle.url_seeds(),
#"dht_nodes":
"name": status.name,
"save_path": status.save_path,
"storage_mode": status.storage_mode,
#"storage":
#"userdata":
"file_priorities": handle.file_priorities(),
#"trackerid":
#"url":
#"uuid":
#"source_feed_url":
"flags": flags,
#"info_hash": handle.info_hash().to_bytes(),
"info_hash": self.info_hash,
"max_uploads": handle.max_uploads(),
"max_connections": handle.max_connections(),
"upload_limit": handle.upload_limit(),
"download_limit": handle.download_limit(),
}
if self.has_metadata:
params["ti"] = self.torrent_file_path
return params
def __getstate__(self):
state = self.__dict__.copy()
params = self._get_params()
info_hash1 = params["info_hash"].to_bytes()
params["info_hash"] = info_hash1
state["_params"] = params
info_hash2 = state["info_hash"].to_bytes()
state["info_hash"] = info_hash2
del state["session"]
del state["_status"]
del state["_state"]
return state
def __setstate__(self, state):
params = state["_params"]
info_hash1 = params["info_hash"]
info_hash1 = lt.sha1_hash(info_hash1)
params["info_hash"] = info_hash1
info_hash2 = state["info_hash"]
info_hash2 = lt.sha1_hash(info_hash2)
state["info_hash"] = info_hash1
self.__dict__.update(state)
def set_session(self, session):
self.session = session
@property
def has_metadata(self):
return self.handle.status(0).has_metadata
@property
def torrent_file_path(self):
paths = load_data_paths("epour")
for p in paths:
t_path = os.path.join(p, "{0}.torrent".format(self.info_hash))
if os.path.exists(t_path):
return t_path
return None
def write_torrent_file(self):
assert self.handle is not None
handle = self.handle
info_hash = str(handle.info_hash())
log.debug("Writing torrent file {0}.torrent".format(info_hash))
p = save_data_path("epour")
t_path = os.path.join(p, "{0}.torrent".format(info_hash))
if t_path:
t_info = handle.torrent_file()
metadata = lt.bdecode(t_info.metadata())
torrent_file = {"info": metadata}
with open(t_path, "wb") as fp:
fp.write(lt.bencode(torrent_file))
log.debug("Torrent file was written to %s" % t_path)
return t_path
def write_resume_data(self, data):
assert self.handle is not None
info_hash = self.handle.info_hash()
info_hash = str(info_hash)
log.debug("Writing resume data for {}".format(info_hash))
t_path = os.path.join(
save_data_path("epour"),
info_hash + ".fastresume")
if t_path:
with open(t_path, "wb") as f:
f.write(lt.bencode(data))
else:
return
return t_path
def delete_torrent_file(self):
assert self.handle is not None
info_hash = str(self.handle.info_hash())
for p in load_data_paths("epour"):
t_path = os.path.join(p, "{0}.torrent".format(info_hash))
try:
with open(t_path):
pass
except IOError:
continue
else:
os.remove(t_path)
break
def delete_resume_data(self):
assert self.handle is not None
info_hash = str(self.handle.info_hash())
for p in load_data_paths("epour"):
fr_path = os.path.join(p, "{0}.fastresume".format(info_hash))
try:
with open(fr_path):
pass
except IOError:
continue
else:
os.remove(fr_path)
def add_metadata(self):
self.write_torrent_file()