epour/epour/session.py

401 lines
13 KiB
Python

#
# Epour - A bittorrent client using EFL and libtorrent
#
# Copyright 2012-2015 Kai Huuhko <kai.huuhko@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
import sys
import os
import mimetypes
import urllib
try:
import urlparse
except ImportError:
from urllib import parse as urlparse
import logging
import shutil
try:
import cPickle
except ImportError:
import pickle as cPickle
from collections import OrderedDict
import libtorrent as lt
from efl.ecore import Timer
from xdg.BaseDirectory import save_data_path, load_data_paths
class Session(lt.session):
def __init__(self, conf):
self.conf = conf
self.log = logging.getLogger("epour.session")
from epour import __version__ as version
ver_ints = []
for s in version.split("."):
ver_ints.append(int(s))
ver_ints.append(0)
fp = lt.fingerprint("EP", *ver_ints)
self.log.debug("peer-id: {}".format(fp))
lt.session.__init__(
self,
fingerprint=fp,
# flags=
#lt.session_flags_t.add_default_plugins|
#lt.session_flags_t.start_default_features
)
self.log.info("Session started")
self.torrents = OrderedDict()
#rsdpipsdtsppe
#stheprtertoer
#satrboabaorer
#|t|flgtucrtro
#|s|oorugkam|r
#|||rces|ega||
#|||mks||rep||
mask = 0b0000001000001
self.set_alert_mask(mask)
self.listen_on(
conf.getint("Settings", "listen_low"),
conf.getint("Settings", "listen_high")
)
self.alert_manager = AlertManager(self)
self.alert_manager.callback_add(
"add_torrent_alert", self._add_torrent_cb)
self.alert_manager.callback_add(
"metadata_received_alert", self._metadata_received_cb)
def torrent_finished_move_cb(a):
h = a.handle
if conf.getboolean("Settings", "move_completed_enabled"):
path = conf.get("Settings", "move_completed_path")
if h.save_path() == path:
return
h.move_storage(path)
self.alert_manager.callback_add(
"torrent_finished_alert", torrent_finished_move_cb)
def _add_torrent_cb(self, a):
e = a.error
if e.value() > 0:
self.log.error("Adding torrent failed: %r" % (e.message()))
return
h = a.handle
ihash = str(h.info_hash())
self.torrents[ihash] = a.params
self.log.debug("Torrent added.")
def _metadata_received_cb(self, a):
h = a.handle
ihash = str(h.info_hash())
self.log.debug("Metadata received.")
t_info = h.get_torrent_info()
self.torrents[ihash]["ti"] = t_info
def load_state(self):
for p in load_data_paths("epour"):
path = os.path.join(p, "session")
if os.path.isfile(path):
try:
with open(path, 'rb') as f:
state = lt.bdecode(f.read())
lt.session.load_state(self, state)
except Exception as e:
self.log.debug("Could not load previous session state.")
self.log.debug(e)
else:
self.log.info("Session restored from disk.")
break
settings = self.settings()
from epour import __version__ as version
version += ".0"
ver_s = "Epour/{} libtorrent/{}".format(version, lt.version)
settings.user_agent = ver_s
self.log.debug("User agent: {}".format(ver_s))
self.set_settings(settings)
def save_state(self):
state = lt.session.save_state(self)
path = os.path.join(save_data_path("epour"), "session")
with open(path, 'wb') as f:
f.write(lt.bencode(state))
self.log.debug("Session state saved.")
def load_torrents(self):
for p in load_data_paths("epour"):
torrents_path = os.path.join(p, "torrents")
if os.path.isfile(torrents_path):
break
if not torrents_path:
self.info.debug("No previous list of torrents found.")
return
try:
pkl_file = open(torrents_path, 'rb')
except IOError:
self.log.warning("Could not open the list of torrents.")
else:
try:
torrents = cPickle.load(pkl_file)
except Exception:
self.log.exception("Opening the list of torrents failed.")
else:
self.log.debug(
"List of torrents opened, "
"restoring {} torrents.".format(len(torrents))
)
for i, t in torrents.items():
try:
for k, v in t.items():
if v is None:
continue
elif k == "ti":
# Epour <= 0.6 compat
if isinstance(v, dict):
t[k] = lt.torrent_info(lt.bdecode(v))
else:
t[k] = lt.bdecode(v)
# elif k == "info_hash":
# torrents[i][k] = lt.big_number(v)
except Exception:
self.log.exception("Opening torrent %s failed", i)
continue
self.async_add_torrent(t)
finally:
pkl_file.close()
def save_torrents(self):
self.log.debug("Saving {} torrents.".format(len(self.torrents)))
for i, t in self.torrents.items():
for k, v in t.items():
if k == "info_hash":
if v.is_all_zeros():
del self.torrents[i][k]
else:
self.torrents[i][k] = v.to_bytes()
handles = self.get_torrents()
for h in handles:
if h.is_valid():
i = str(h.info_hash())
t_dict = self.torrents[i]
t_dict["save_path"] = h.save_path()
s = h.status(0)
if s.has_metadata:
resume_data = lt.bencode(h.write_resume_data())
t_dict["resume_data"] = resume_data
t_info = h.get_torrent_info()
t_dict["ti"] = lt.bencode(t_info)
else:
self.log.debug("Handle is invalid, skipping")
path = os.path.join(save_data_path("epour"), "torrents")
with open(path, 'wb') as f:
cPickle.dump(self.torrents, f, protocol=cPickle.HIGHEST_PROTOCOL)
self.log.debug("List of torrents saved.")
# def write_torrent(self, h):
# if h is None:
# self.log.debug("Tried to write torrent while handle was empty.")
# return
# t_info = h.get_torrent_info()
# ihash = str(h.info_hash())
# self.log.debug("Writing torrent file {}".format(ihash))
# md = lt.bdecode(t_info.metadata())
# t = {}
# t["info"] = md
# p = save_data_path("epour")
# t_path = os.path.join(p, "{0}.torrent".format(ihash))
# if t_path:
# with open(t_path, "wb") as f:
# f.write(lt.bencode(t))
# return t_path
def remove_torrent(self, h, with_data=False):
ihash = str(h.info_hash())
del self.torrents[ihash]
lt.session.remove_torrent(self, h, option=with_data)
for p in load_data_paths("epour"):
fr_path = os.path.join(
p, "{0}.fastresume".format(ihash)
)
try:
with open(fr_path):
pass
except IOError:
self.log.debug("Could not remove %s", fr_path)
else:
os.remove(fr_path)
t_path = None
for p in load_data_paths("epour"):
t_path = os.path.join(p, "{0}.torrent".format(ihash))
break
if t_path:
try:
with open(t_path):
pass
except IOError:
self.log.debug("Could not remove torrent file.")
else:
os.remove(t_path)
if not hasattr(lt, "torrent_removed_alert"):
class torrent_removed_alert(object):
def __init__(self, h, info_hash):
self.handle = h
self.info_hash = info_hash
a = torrent_removed_alert(h, ihash)
self.alert_manager.signal(a)
return ihash
def add_torrent_from_file(self, add_dict, t_uri):
mimetype = mimetypes.guess_type(t_uri)[0]
if not mimetype == "application/x-bittorrent":
self.log.error("Invalid file")
return
if t_uri.startswith("file://"):
t_uri = urllib.unquote(urlparse.urlsplit(t_uri).path)
with open(t_uri, 'rb') as t:
t_raw = lt.bdecode(t.read())
info = lt.torrent_info(t_raw)
add_dict["ti"] = info
rd = None
fr_file_name = "{}.fastresume".format(info.info_hash())
for p in load_data_paths("epour"):
path = os.path.join(p, fr_file_name)
if os.path.isfile(path):
try:
with open(path, "rb") as f:
rd = f.read()
except Exception:
self.log.debug("Invalid resume data")
else:
add_dict["resume_data"] = rd
break
ihash = str(info.info_hash())
path = save_data_path("epour")
new_uri = os.path.join(path, "{0}.torrent".format(ihash))
if t_uri == new_uri:
pass
else:
shutil.copy(t_uri, new_uri)
if self.conf.getboolean("Settings", "delete_original"):
self.log.debug(
"Deleting original torrent file {}".format(t_uri))
os.remove(t_uri)
t_uri = new_uri
self.async_add_torrent(add_dict)
def add_torrent_from_magnet(self, add_dict, t_uri):
self.log.debug("Adding %r", t_uri)
t_uri = t_uri.encode("ascii")
tmp_dict = lt.parse_magnet_uri(t_uri)
tmp_dict.update(add_dict)
tmp_dict["info_hash"] = tmp_dict["info_hash"].to_bytes()
self.async_add_torrent(tmp_dict)
def add_torrent_from_hash(self, add_dict, t_uri):
t_uri = t_uri.encode("ascii")
add_dict["info_hash"] = t_uri
self.log.debug("Adding %s", t_uri)
self.async_add_torrent(add_dict)
class AlertManager(object):
log = logging.getLogger("epour.alert")
update_interval = 0.2
alerts = {}
def __init__(self, session):
self.session = session
# TODO: Use the lt alert interval from session settings here?
self.timer = Timer(self.update_interval, self.update)
def callback_add(self, alert_type, cb, *args, **kwargs):
if alert_type not in self.alerts:
self.alerts[alert_type] = []
self.alerts[alert_type].append((cb, args, kwargs))
def callback_del(self, alert_type, cb, *args, **kwargs):
for i, a in enumerate(self.alerts):
if a == (cb, args, kwargs):
del(self.alerts[alert_type][i])
def signal(self, a):
a_name = type(a).__name__
if a_name not in self.alerts:
self.log.debug("No handler: {} | {}".format(a_name, a))
return
for cb, args, kwargs in self.alerts[a_name]:
try:
cb(a, *args, **kwargs)
except:
self.log.exception("Exception while handling alerts")
def update(self):
#self.log.debug("Alerts TICK")
for a in self.session.pop_alerts():
self.signal(a)
return True