diff --git a/cps/__init__.py b/cps/__init__.py index d557649c..a879da0a 100644 --- a/cps/__init__.py +++ b/cps/__init__.py @@ -102,7 +102,6 @@ def create_app(): web_server.init_app(app, config) calibre_db.setup_db(config, cli.settingspath) - calibre_db.start() babel.init_app(app) _BABEL_TRANSLATIONS.update(str(item) for item in babel.list_translations()) diff --git a/cps/db.py b/cps/db.py index 9d06b149..a5a01aca 100644 --- a/cps/db.py +++ b/cps/db.py @@ -24,7 +24,6 @@ import re import ast import json from datetime import datetime -import threading from sqlalchemy import create_engine from sqlalchemy import Table, Column, ForeignKey, CheckConstraint @@ -32,7 +31,6 @@ from sqlalchemy import String, Integer, Boolean, TIMESTAMP, Float from sqlalchemy.orm import relationship, sessionmaker, scoped_session from sqlalchemy.orm.collections import InstrumentedList from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta -from sqlalchemy.exc import OperationalError from sqlalchemy.pool import StaticPool from flask_login import current_user from sqlalchemy.sql.expression import and_, true, false, text, func, or_ @@ -400,43 +398,14 @@ class AlchemyEncoder(json.JSONEncoder): return json.JSONEncoder.default(self, obj) -class CalibreDB(threading.Thread): +class CalibreDB(): def __init__(self): - threading.Thread.__init__(self) self.engine = None self.session = None - self.queue = None self.log = None self.config = None - def add_queue(self,queue): - self.queue = queue - self.log = logger.create() - - def run(self): - while True: - i = self.queue.get() - if i == 'dummy': - self.queue.task_done() - break - if i['task'] == 'add_format': - cur_book = self.session.query(Books).filter(Books.id == i['id']).first() - cur_book.data.append(i['format']) - try: - # db.session.merge(cur_book) - self.session.commit() - except OperationalError as e: - self.session.rollback() - self.log.error("Database error: %s", e) - # self._handleError(_(u"Database error: %(error)s.", error=e)) - # return - self.queue.task_done() - - - def stop(self): - self.queue.put('dummy') - def setup_db(self, config, app_db_path): self.config = config self.dispose() diff --git a/cps/editbooks.py b/cps/editbooks.py index 1474c837..1de44d98 100644 --- a/cps/editbooks.py +++ b/cps/editbooks.py @@ -34,8 +34,10 @@ from flask_login import current_user, login_required from sqlalchemy.exc import OperationalError from . import constants, logger, isoLanguages, gdriveutils, uploader, helper -from . import config, get_locale, ub, worker, db +from . import config, get_locale, ub, db from . import calibre_db +from .services.worker import WorkerThread +from .tasks.upload import TaskUpload from .web import login_required_if_no_ano, render_title_template, edit_required, upload_required @@ -544,8 +546,8 @@ def upload_single_file(request, book, book_id): # Queue uploader info uploadText=_(u"File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=book.title) - worker.add_upload(current_user.nickname, - "" + uploadText + "") + WorkerThread.add(current_user.nickname, TaskUpload( + "" + uploadText + "")) return uploader.process( saved_filename, *os.path.splitext(requested_file.filename), @@ -889,8 +891,8 @@ def upload(): if error: flash(error, category="error") uploadText=_(u"File %(file)s uploaded", file=title) - worker.add_upload(current_user.nickname, - "" + uploadText + "") + WorkerThread.add(current_user.nickname, TaskUpload( + "" + uploadText + "")) if len(request.files.getlist("btn-upload")) < 2: if current_user.role_edit() or current_user.role_admin(): diff --git a/cps/helper.py b/cps/helper.py index d40128d7..fe938127 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -39,6 +39,7 @@ from sqlalchemy.sql.expression import true, false, and_, text, func from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash from . import calibre_db +from .tasks.convert import TaskConvert try: from urllib.parse import quote @@ -58,12 +59,12 @@ try: except ImportError: use_PIL = False -from . import logger, config, get_locale, db, ub, worker +from . import logger, config, get_locale, db, ub from . import gdriveutils as gd from .constants import STATIC_DIR as _STATIC_DIR from .subproc_wrapper import process_wait -from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS -from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY +from .services.worker import WorkerThread, STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS +from .tasks.email import TaskEmail log = logger.create() @@ -101,7 +102,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, txt = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title)) settings['old_book_format'] = old_book_format settings['new_book_format'] = new_book_format - worker.add_convert(file_path, book.id, user_id, txt, settings, kindle_mail) + WorkerThread.add(user_id, TaskConvert(file_path, book.id, txt, settings, kindle_mail)) return None else: error_message = _(u"%(format)s not found: %(fn)s", @@ -110,9 +111,9 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, def send_test_mail(kindle_mail, user_name): - worker.add_email(_(u'Calibre-Web test e-mail'), None, None, - config.get_mail_settings(), kindle_mail, user_name, - _(u"Test e-mail"), _(u'This e-mail has been sent via Calibre-Web.')) + WorkerThread.add(user_name, TaskEmail(_(u'Calibre-Web test e-mail'), None, None, + config.get_mail_settings(), kindle_mail, _(u"Test e-mail"), + _(u'This e-mail has been sent via Calibre-Web.'))) return @@ -127,9 +128,16 @@ def send_registration_mail(e_mail, user_name, default_password, resend=False): text += "Don't forget to change your password after first login.\r\n" text += "Sincerely\r\n\r\n" text += "Your Calibre-Web team" - worker.add_email(_(u'Get Started with Calibre-Web'), None, None, - config.get_mail_settings(), e_mail, None, - _(u"Registration e-mail for user: %(name)s", name=user_name), text) + WorkerThread.add(None, TaskEmail( + subject=_(u'Get Started with Calibre-Web'), + filepath=None, + attachment=None, + settings=config.get_mail_settings(), + recipient=e_mail, + taskMessage=_(u"Registration e-mail for user: %(name)s", name=user_name), + text=text + )) + return @@ -221,9 +229,9 @@ def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id): for entry in iter(book.data): if entry.format.upper() == book_format.upper(): converted_file_name = entry.name + '.' + book_format.lower() - worker.add_email(_(u"Send to Kindle"), book.path, converted_file_name, - config.get_mail_settings(), kindle_mail, user_id, - _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.')) + WorkerThread.add(user_id, TaskEmail(_(u"Send to Kindle"), book.path, converted_file_name, + config.get_mail_settings(), kindle_mail, + _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.'))) return return _(u"The requested file could not be read. Maybe wrong permissions?") @@ -722,47 +730,30 @@ def format_runtime(runtime): # helper function to apply localize status information in tasklist entries def render_task_status(tasklist): renderedtasklist = list() - for task in tasklist: - if task['user'] == current_user.nickname or current_user.role_admin(): - if task['formStarttime']: - task['starttime'] = format_datetime(task['formStarttime'], format='short', locale=get_locale()) - # task2['formStarttime'] = "" - else: - if 'starttime' not in task: - task['starttime'] = "" - - if 'formRuntime' not in task: - task['runtime'] = "" - else: - task['runtime'] = format_runtime(task['formRuntime']) + for num, user, added, task in tasklist: + if user == current_user.nickname or current_user.role_admin(): + ret = {} + if task.start_time: + ret['starttime'] = format_datetime(task.start_time, format='short', locale=get_locale()) + ret['runtime'] = format_runtime(task.runtime) # localize the task status - if isinstance(task['stat'], int): - if task['stat'] == STAT_WAITING: - task['status'] = _(u'Waiting') - elif task['stat'] == STAT_FAIL: - task['status'] = _(u'Failed') - elif task['stat'] == STAT_STARTED: - task['status'] = _(u'Started') - elif task['stat'] == STAT_FINISH_SUCCESS: - task['status'] = _(u'Finished') - else: - task['status'] = _(u'Unknown Status') - - # localize the task type - if isinstance(task['taskType'], int): - if task['taskType'] == TASK_EMAIL: - task['taskMessage'] = _(u'E-mail: ') + task['taskMess'] - elif task['taskType'] == TASK_CONVERT: - task['taskMessage'] = _(u'Convert: ') + task['taskMess'] - elif task['taskType'] == TASK_UPLOAD: - task['taskMessage'] = _(u'Upload: ') + task['taskMess'] - elif task['taskType'] == TASK_CONVERT_ANY: - task['taskMessage'] = _(u'Convert: ') + task['taskMess'] + if isinstance(task.stat, int): + if task.stat == STAT_WAITING: + ret['status'] = _(u'Waiting') + elif task.stat == STAT_FAIL: + ret['status'] = _(u'Failed') + elif task.stat == STAT_STARTED: + ret['status'] = _(u'Started') + elif task.stat == STAT_FINISH_SUCCESS: + ret['status'] = _(u'Finished') else: - task['taskMessage'] = _(u'Unknown Task: ') + task['taskMess'] + ret['status'] = _(u'Unknown Status') - renderedtasklist.append(task) + ret['taskMessage'] = "{}: {}".format(_(task.name), task.message) + ret['progress'] = "{} %".format(int(task.progress * 100)) + ret['user'] = user + renderedtasklist.append(ret) return renderedtasklist diff --git a/cps/server.py b/cps/server.py index 7c2d321d..d27fe239 100644 --- a/cps/server.py +++ b/cps/server.py @@ -200,9 +200,6 @@ class WebServer(object): def stop(self, restart=False): from . import updater_thread updater_thread.stop() - from . import calibre_db - calibre_db.stop() - log.info("webserver stop (restart=%s)", restart) self.restart = restart diff --git a/cps/services/worker.py b/cps/services/worker.py new file mode 100644 index 00000000..05307b52 --- /dev/null +++ b/cps/services/worker.py @@ -0,0 +1,207 @@ + +from __future__ import division, print_function, unicode_literals +import threading +import abc +import uuid + +try: + import queue +except ImportError: + import Queue as queue +from datetime import datetime, timedelta +from collections import namedtuple + +from cps import calibre_db +from cps import logger + +log = logger.create() + +# task 'status' consts +STAT_WAITING = 0 +STAT_FAIL = 1 +STAT_STARTED = 2 +STAT_FINISH_SUCCESS = 3 + +# Only retain this many tasks in dequeued list +TASK_CLEANUP_TRIGGER = 20 + +QueuedTask = namedtuple('QueuedTask', 'num, user, added, task') + + +def _get_main_thread(): + for t in threading.enumerate(): + if t.__class__.__name__ == '_MainThread': + return t + raise Exception("main thread not found?!") + + + +class ImprovedQueue(queue.Queue): + def to_list(self): + """ + Returns a copy of all items in the queue without removing them. + """ + + with self.mutex: + return list(self.queue) + +#Class for all worker tasks in the background +class WorkerThread(threading.Thread): + _instance = None + + @classmethod + def getInstance(cls): + if cls._instance is None: + cls._instance = WorkerThread() + return cls._instance + + def __init__(self): + threading.Thread.__init__(self) + + self.dequeued = list() + + self.doLock = threading.Lock() + self.queue = ImprovedQueue() + self.num = 0 + self.start() + + @classmethod + def add(cls, user, task): + ins = cls.getInstance() + ins.num += 1 + ins.queue.put(QueuedTask( + num=ins.num, + user=user, + added=datetime.now(), + task=task, + )) + + @property + def tasks(self): + with self.doLock: + tasks = self.queue.to_list() + self.dequeued + return sorted(tasks, key=lambda x: x.num) + + + # Main thread loop starting the different tasks + def run(self): + main_thread = _get_main_thread() + while main_thread.is_alive(): + item = self.queue.get() + + with self.doLock: + # once we hit our trigger, start cleaning up dead tasks + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + dead = [] + alive = [] + for x in self.dequeued: + (dead if x.task.dead else alive).append(x) + + # if the ones that we need to keep are within the trigger, do nothing else + delta = len(self.dequeued) - len(dead) + if delta > TASK_CLEANUP_TRIGGER: + ret = alive + else: + # otherwise, lop off the oldest dead tasks until we hit the target trigger + ret = sorted(dead, key=lambda x: x.task.end_time)[-TASK_CLEANUP_TRIGGER:] + alive + + self.dequeued = sorted(ret, key=lambda x: x.num) + # add to list so that in-progress tasks show up + self.dequeued.append(item) + + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished + if item.task.stat is STAT_WAITING: + # CalibreTask.start() should wrap all exceptions in it's own error handling + item.task.start(self) + + self.queue.task_done() + + +class CalibreTask: + __metaclass__ = abc.ABCMeta + + def __init__(self, message): + self._progress = 0 + self.stat = STAT_WAITING + self.error = None + self.start_time = None + self.end_time = None + self.message = message + self.id = uuid.uuid4() + + @abc.abstractmethod + def run(self, worker_thread): + """Provides the caller some human-readable name for this class""" + raise NotImplementedError + + @abc.abstractmethod + def name(self): + """Provides the caller some human-readable name for this class""" + raise NotImplementedError + + def start(self, *args): + self.start_time = datetime.now() + self.stat = STAT_STARTED + + # catch any unhandled exceptions in a task and automatically fail it + try: + self.run(*args) + except Exception as e: + self._handleError(str(e)) + log.exception(e) + + self.end_time = datetime.now() + + @property + def stat(self): + return self._stat + + @stat.setter + def stat(self, x): + self._stat = x + + @property + def progress(self): + return self._progress + + @progress.setter + def progress(self, x): + if not 0 <= x <= 1: + raise ValueError("Task progress should within [0, 1] range") + self._progress = x + + @property + def error(self): + return self._error + + @error.setter + def error(self, x): + self._error = x + + @property + def runtime(self): + return (self.end_time or datetime.now()) - self.start_time + + @property + def dead(self): + """Determines whether or not this task can be garbage collected + + We have a separate dictating this because there may be certain tasks that want to override this + """ + # By default, we're good to clean a task if it's "Done" + return self.stat in (STAT_FINISH_SUCCESS, STAT_FAIL) + + @progress.setter + def progress(self, x): + # todo: throw error if outside of [0,1] + self._progress = x + + def _handleError(self, error_message): + log.error(error_message) + self.stat = STAT_FAIL + self.progress = 1 + self.error = error_message + + def _handleSuccess(self): + self.stat = STAT_FINISH_SUCCESS + self.progress = 1 diff --git a/cps/tasks/__init__.py b/cps/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cps/tasks/convert.py b/cps/tasks/convert.py new file mode 100644 index 00000000..6a0ac402 --- /dev/null +++ b/cps/tasks/convert.py @@ -0,0 +1,201 @@ +from __future__ import division, print_function, unicode_literals +import sys +import os +import re + +from glob import glob +from shutil import copyfile + +from sqlalchemy.exc import SQLAlchemyError + +from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS +from cps import calibre_db, db +from cps import logger, config +from cps.subproc_wrapper import process_open +from flask_babel import gettext as _ + +from cps.tasks.email import TaskEmail +from cps import gdriveutils +log = logger.create() + + +class TaskConvert(CalibreTask): + def __init__(self, file_path, bookid, taskMessage, settings, kindle_mail): + super(TaskConvert, self).__init__(taskMessage) + self.file_path = file_path + self.bookid = bookid + self.settings = settings + self.kindle_mail = kindle_mail + + self.results = dict() + + def run(self, worker_thread): + self.worker_thread = worker_thread + filename = self._convert_ebook_format() + + if filename: + if config.config_use_google_drive: + gdriveutils.updateGdriveCalibreFromLocal() + if self.kindle_mail: + # if we're sending to kindle after converting, create a one-off task and run it immediately + # todo: figure out how to incorporate this into the progress + try: + task = TaskEmail(self.settings['subject'], self.results["path"], + filename, self.settings, self.kindle_mail, + self.settings['subject'], self.settings['body'], internal=True) + task.start(worker_thread) + + # even though the convert task might be finished, if this task fails, fail the whole thing + if task.stat != STAT_FINISH_SUCCESS: + raise Exception(task.error) + except Exception as e: + return self._handleError(str(e)) + + def _convert_ebook_format(self): + error_message = None + file_path = self.file_path + book_id = self.bookid + format_old_ext = u'.' + self.settings['old_book_format'].lower() + format_new_ext = u'.' + self.settings['new_book_format'].lower() + + # check to see if destination format already exists - + # if it does - mark the conversion task as complete and return a success + # this will allow send to kindle workflow to continue to work + if os.path.isfile(file_path + format_new_ext): + log.info("Book id %d already converted to %s", book_id, format_new_ext) + cur_book = calibre_db.get_book(book_id) + self.results['path'] = file_path + self.results['title'] = cur_book.title + self._handleSuccess() + return file_path + format_new_ext + else: + log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", + book_id, + format_new_ext) + + if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': + check, error_message = self._convert_kepubify(file_path, + format_old_ext, + format_new_ext) + else: + # check if calibre converter-executable is existing + if not os.path.exists(config.config_converterpath): + # ToDo Text is not translated + self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) + return + check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext) + + if check == 0: + cur_book = calibre_db.get_book(book_id) + if os.path.isfile(file_path + format_new_ext): + # self.db_queue.join() + new_format = db.Data(name=cur_book.data[0].name, + book_format=self.settings['new_book_format'].upper(), + book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) + + cur_book.data.append(new_format) + + try: + # db.session.merge(cur_book) + calibre_db.session.commit() + except SQLAlchemyError as e: + calibre_db.session.rollback() + log.error("Database error: %s", e) + return + + self.results['path'] = cur_book.path + self.results['title'] = cur_book.title + if config.config_use_google_drive: + os.remove(file_path + format_old_ext) + self._handleSuccess() + return file_path + format_new_ext + else: + error_message = _('%(format)s format not found on disk', format=format_new_ext.upper()) + log.info("ebook converter failed with error while converting book") + if not error_message: + error_message = _('Ebook converter failed with unknown error') + self._handleError(error_message) + return + + def _convert_kepubify(self, file_path, format_old_ext, format_new_ext): + quotes = [1, 3] + command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] + try: + p = process_open(command, quotes) + except OSError as e: + return 1, _(u"Kepubify-converter failed: %(error)s", error=e) + self.progress = 0.01 + while True: + nextline = p.stdout.readlines() + nextline = [x.strip('\n') for x in nextline if x != '\n'] + if sys.version_info < (3, 0): + nextline = [x.decode('utf-8') for x in nextline] + for line in nextline: + log.debug(line) + if p.poll() is not None: + break + + # ToD Handle + # process returncode + check = p.returncode + + # move file + if check == 0: + converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) + if len(converted_file) == 1: + copyfile(converted_file[0], (file_path + format_new_ext)) + os.unlink(converted_file[0]) + else: + return 1, _(u"Converted file not found or more than one file in folder %(folder)s", + folder=os.path.dirname(file_path)) + return check, None + + def _convert_calibre(self, file_path, format_old_ext, format_new_ext): + try: + # Linux py2.7 encode as list without quotes no empty element for parameters + # linux py3.x no encode and as list without quotes no empty element for parameters + # windows py2.7 encode as string with quotes empty element for parameters is okay + # windows py 3.x no encode and as string with quotes empty element for parameters is okay + # separate handling for windows and linux + quotes = [1, 2] + command = [config.config_converterpath, (file_path + format_old_ext), + (file_path + format_new_ext)] + quotes_index = 3 + if config.config_calibre: + parameters = config.config_calibre.split(" ") + for param in parameters: + command.append(param) + quotes.append(quotes_index) + quotes_index += 1 + + p = process_open(command, quotes) + except OSError as e: + return 1, _(u"Ebook-converter failed: %(error)s", error=e) + + while p.poll() is None: + nextline = p.stdout.readline() + if os.name == 'nt' and sys.version_info < (3, 0): + nextline = nextline.decode('windows-1252') + elif os.name == 'posix' and sys.version_info < (3, 0): + nextline = nextline.decode('utf-8') + log.debug(nextline.strip('\r\n')) + # parse progress string from calibre-converter + progress = re.search(r"(\d+)%\s.*", nextline) + if progress: + self.progress = int(progress.group(1)) / 100 + + # process returncode + check = p.returncode + calibre_traceback = p.stderr.readlines() + error_message = "" + for ele in calibre_traceback: + if sys.version_info < (3, 0): + ele = ele.decode('utf-8') + log.debug(ele.strip('\n')) + if not ele.startswith('Traceback') and not ele.startswith(' File'): + error_message = _("Calibre failed with error: %(error)s", ele.strip('\n')) + return check, error_message + + @property + def name(self): + return "Convert" diff --git a/cps/tasks/email.py b/cps/tasks/email.py new file mode 100644 index 00000000..3b2db6eb --- /dev/null +++ b/cps/tasks/email.py @@ -0,0 +1,238 @@ +from __future__ import division, print_function, unicode_literals +import sys +import os +import smtplib +import threading +import socket + +try: + from StringIO import StringIO + from email.MIMEBase import MIMEBase + from email.MIMEMultipart import MIMEMultipart + from email.MIMEText import MIMEText +except ImportError: + from io import StringIO + from email.mime.base import MIMEBase + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + +from email import encoders +from email.utils import formatdate, make_msgid +from email.generator import Generator + +from cps.services.worker import CalibreTask +from cps import logger, config + +from cps import gdriveutils + +log = logger.create() + +CHUNKSIZE = 8192 + + +# Class for sending email with ability to get current progress +class EmailBase: + + transferSize = 0 + progress = 0 + + def data(self, msg): + self.transferSize = len(msg) + (code, resp) = smtplib.SMTP.data(self, msg) + self.progress = 0 + return (code, resp) + + def send(self, strg): + """Send `strg' to the server.""" + log.debug('send: %r', strg[:300]) + if hasattr(self, 'sock') and self.sock: + try: + if self.transferSize: + lock=threading.Lock() + lock.acquire() + self.transferSize = len(strg) + lock.release() + for i in range(0, self.transferSize, CHUNKSIZE): + if isinstance(strg, bytes): + self.sock.send((strg[i:i + CHUNKSIZE])) + else: + self.sock.send((strg[i:i + CHUNKSIZE]).encode('utf-8')) + lock.acquire() + self.progress = i + lock.release() + else: + self.sock.sendall(strg.encode('utf-8')) + except socket.error: + self.close() + raise smtplib.SMTPServerDisconnected('Server not connected') + else: + raise smtplib.SMTPServerDisconnected('please run connect() first') + + @classmethod + def _print_debug(cls, *args): + log.debug(args) + + def getTransferStatus(self): + if self.transferSize: + lock2 = threading.Lock() + lock2.acquire() + value = int((float(self.progress) / float(self.transferSize))*100) + lock2.release() + return value / 100 + else: + return 1 + + +# Class for sending email with ability to get current progress, derived from emailbase class +class Email(EmailBase, smtplib.SMTP): + + def __init__(self, *args, **kwargs): + smtplib.SMTP.__init__(self, *args, **kwargs) + + +# Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class +class EmailSSL(EmailBase, smtplib.SMTP_SSL): + + def __init__(self, *args, **kwargs): + smtplib.SMTP_SSL.__init__(self, *args, **kwargs) + + +class TaskEmail(CalibreTask): + def __init__(self, subject, filepath, attachment, settings, recipient, taskMessage, text, internal=False): + super(TaskEmail, self).__init__(taskMessage) + self.subject = subject + self.attachment = attachment + self.settings = settings + self.filepath = filepath + self.recipent = recipient + self.text = text + self.asyncSMTP = None + + self.results = dict() + + def run(self, worker_thread): + # create MIME message + msg = MIMEMultipart() + msg['Subject'] = self.subject + msg['Message-Id'] = make_msgid('calibre-web') + msg['Date'] = formatdate(localtime=True) + text = self.text + msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) + if self.attachment: + result = self._get_attachment(self.filepath, self.attachment) + if result: + msg.attach(result) + else: + self._handleError(u"Attachment not found") + return + + msg['From'] = self.settings["mail_from"] + msg['To'] = self.recipent + + use_ssl = int(self.settings.get('mail_use_ssl', 0)) + try: + # convert MIME message to string + fp = StringIO() + gen = Generator(fp, mangle_from_=False) + gen.flatten(msg) + msg = fp.getvalue() + + # send email + timeout = 600 # set timeout to 5mins + + # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten + # _print_debug function + if sys.version_info < (3, 0): + org_smtpstderr = smtplib.stderr + smtplib.stderr = logger.StderrLogger('worker.smtp') + + if use_ssl == 2: + self.asyncSMTP = EmailSSL(self.settings["mail_server"], self.settings["mail_port"], + timeout=timeout) + else: + self.asyncSMTP = Email(self.settings["mail_server"], self.settings["mail_port"], timeout=timeout) + + # link to logginglevel + if logger.is_debug_enabled(): + self.asyncSMTP.set_debuglevel(1) + if use_ssl == 1: + self.asyncSMTP.starttls() + if self.settings["mail_password"]: + self.asyncSMTP.login(str(self.settings["mail_login"]), str(self.settings["mail_password"])) + self.asyncSMTP.sendmail(self.settings["mail_from"], self.recipent, msg) + self.asyncSMTP.quit() + self._handleSuccess() + + if sys.version_info < (3, 0): + smtplib.stderr = org_smtpstderr + + except (MemoryError) as e: + log.exception(e) + self._handleError(u'MemoryError sending email: ' + str(e)) + return None + except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: + if hasattr(e, "smtp_error"): + text = e.smtp_error.decode('utf-8').replace("\n", '. ') + elif hasattr(e, "message"): + text = e.message + else: + log.exception(e) + text = '' + self._handleError(u'Smtplib Error sending email: ' + text) + return None + except (socket.error) as e: + self._handleError(u'Socket Error sending email: ' + e.strerror) + return None + + @property + def progress(self): + if self.asyncSMTP is not None: + return self.asyncSMTP.getTransferStatus() + else: + return self._progress + + @progress.setter + def progress(self, x): + """This gets explicitly set when handle(Success|Error) are called. In this case, remove the SMTP connection""" + if x == 1: + self.asyncSMTP = None + self._progress = x + + + @classmethod + def _get_attachment(cls, bookpath, filename): + """Get file as MIMEBase message""" + calibrepath = config.config_calibre_dir + if config.config_use_google_drive: + df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) + if df: + datafile = os.path.join(calibrepath, bookpath, filename) + if not os.path.exists(os.path.join(calibrepath, bookpath)): + os.makedirs(os.path.join(calibrepath, bookpath)) + df.GetContentFile(datafile) + else: + return None + file_ = open(datafile, 'rb') + data = file_.read() + file_.close() + os.remove(datafile) + else: + try: + file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') + data = file_.read() + file_.close() + except IOError as e: + log.exception(e) + log.error(u'The requested file could not be read. Maybe wrong permissions?') + return None + + attachment = MIMEBase('application', 'octet-stream') + attachment.set_payload(data) + encoders.encode_base64(attachment) + attachment.add_header('Content-Disposition', 'attachment', + filename=filename) + return attachment + + @property + def name(self): + return "Email" diff --git a/cps/tasks/upload.py b/cps/tasks/upload.py new file mode 100644 index 00000000..ce2cb07b --- /dev/null +++ b/cps/tasks/upload.py @@ -0,0 +1,19 @@ +from __future__ import division, print_function, unicode_literals + +from datetime import datetime +from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS + +class TaskUpload(CalibreTask): + def __init__(self, taskMessage): + super(TaskUpload, self).__init__(taskMessage) + self.start_time = self.end_time = datetime.now() + self.stat = STAT_FINISH_SUCCESS + self.progress = 1 + + def run(self, worker_thread): + """Upload task doesn't have anything to do, it's simply a way to add information to the task list""" + pass + + @property + def name(self): + return "Upload" diff --git a/cps/web.py b/cps/web.py index a2ac4e3a..7f508576 100644 --- a/cps/web.py +++ b/cps/web.py @@ -33,7 +33,7 @@ import re from babel.dates import format_date from babel import Locale as LC from babel.core import UnknownLocaleError -from flask import Blueprint +from flask import Blueprint, jsonify from flask import render_template, request, redirect, send_from_directory, make_response, g, flash, abort, url_for from flask_babel import gettext as _ from flask_login import login_user, logout_user, login_required, current_user, confirm_login @@ -42,6 +42,9 @@ from sqlalchemy.sql.expression import text, func, true, false, not_, and_, or_ from sqlalchemy.orm.attributes import flag_modified from werkzeug.exceptions import default_exceptions, InternalServerError from sqlalchemy.sql.functions import coalesce + +from .services.worker import WorkerThread + try: from werkzeug.exceptions import FailedDependency except ImportError: @@ -49,11 +52,11 @@ except ImportError: from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash -from . import constants, logger, isoLanguages, services, worker, cli +from . import constants, logger, isoLanguages, services from . import searched_ids, lm, babel, db, ub, config, get_locale, app from . import calibre_db from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download -from .helper import check_valid_domain, render_task_status, json_serial, \ +from .helper import check_valid_domain, render_task_status, \ get_cc_columns, get_book_cover, get_download_link, send_mail, generate_random_password, \ send_registration_mail, check_send_to_kindle, check_read_formats, tags_filters, reset_password from .pagination import Pagination @@ -376,12 +379,8 @@ def import_ldap_users(): @web.route("/ajax/emailstat") @login_required def get_email_status_json(): - tasks = worker.get_taskstatus() - answer = render_task_status(tasks) - js = json.dumps(answer, default=json_serial) - response = make_response(js) - response.headers["Content-Type"] = "application/json; charset=utf-8" - return response + tasks = WorkerThread.getInstance().tasks + return jsonify(render_task_status(tasks)) @web.route("/ajax/bookmark//", methods=['POST']) @@ -1173,7 +1172,7 @@ def category_list(): @login_required def get_tasks_status(): # if current user admin, show all email, otherwise only own emails - tasks = worker.get_taskstatus() + tasks = WorkerThread.getInstance().tasks answer = render_task_status(tasks) return render_title_template('tasks.html', entries=answer, title=_(u"Tasks"), page="tasks") @@ -1686,6 +1685,7 @@ def profile(): title=_(u"%(name)s's profile", name=current_user.nickname), page="me", kobo_support=kobo_support, registered_oauth=local_oauth_check, oauth_status=oauth_status) + current_user.email = to_save["email"] if "nickname" in to_save and to_save["nickname"] != current_user.nickname: # Query User nickname, if not existing, change if not ub.session.query(ub.User).filter(ub.User.nickname == to_save["nickname"]).scalar(): @@ -1702,7 +1702,6 @@ def profile(): title=_(u"Edit User %(nick)s", nick=current_user.nickname), page="edituser") - current_user.email = to_save["email"] if "show_random" in to_save and to_save["show_random"] == "on": current_user.random_books = 1 if "default_language" in to_save: diff --git a/cps/worker.py b/cps/worker.py deleted file mode 100644 index 6d17f470..00000000 --- a/cps/worker.py +++ /dev/null @@ -1,602 +0,0 @@ -# -*- coding: utf-8 -*- - -# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web) -# Copyright (C) 2018-2019 OzzieIsaacs, bodybybuddha, janeczku -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from __future__ import division, print_function, unicode_literals -import sys -import os -import re -import smtplib -import socket -import time -import threading -try: - import queue -except ImportError: - import Queue as queue -from glob import glob -from shutil import copyfile -from datetime import datetime - -try: - from StringIO import StringIO - from email.MIMEBase import MIMEBase - from email.MIMEMultipart import MIMEMultipart - from email.MIMEText import MIMEText -except ImportError: - from io import StringIO - from email.mime.base import MIMEBase - from email.mime.multipart import MIMEMultipart - from email.mime.text import MIMEText - -from email import encoders -from email.utils import formatdate -from email.utils import make_msgid -from email.generator import Generator -from flask_babel import gettext as _ - -from . import calibre_db, db -from . import logger, config -from .subproc_wrapper import process_open -from . import gdriveutils - -log = logger.create() - -chunksize = 8192 -# task 'status' consts -STAT_WAITING = 0 -STAT_FAIL = 1 -STAT_STARTED = 2 -STAT_FINISH_SUCCESS = 3 -#taskType consts -TASK_EMAIL = 1 -TASK_CONVERT = 2 -TASK_UPLOAD = 3 -TASK_CONVERT_ANY = 4 - -RET_FAIL = 0 -RET_SUCCESS = 1 - - -def _get_main_thread(): - for t in threading.enumerate(): - if t.__class__.__name__ == '_MainThread': - return t - raise Exception("main thread not found?!") - - -# For gdrive download book from gdrive to calibredir (temp dir for books), read contents in both cases and append -# it in MIME Base64 encoded to -def get_attachment(bookpath, filename): - """Get file as MIMEBase message""" - calibrepath = config.config_calibre_dir - if config.config_use_google_drive: - df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) - if df: - datafile = os.path.join(calibrepath, bookpath, filename) - if not os.path.exists(os.path.join(calibrepath, bookpath)): - os.makedirs(os.path.join(calibrepath, bookpath)) - df.GetContentFile(datafile) - else: - return None - file_ = open(datafile, 'rb') - data = file_.read() - file_.close() - os.remove(datafile) - else: - try: - file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') - data = file_.read() - file_.close() - except IOError as e: - log.exception(e) - log.error(u'The requested file could not be read. Maybe wrong permissions?') - return None - - attachment = MIMEBase('application', 'octet-stream') - attachment.set_payload(data) - encoders.encode_base64(attachment) - attachment.add_header('Content-Disposition', 'attachment', - filename=filename) - return attachment - - -# Class for sending email with ability to get current progress -class emailbase(): - - transferSize = 0 - progress = 0 - - def data(self, msg): - self.transferSize = len(msg) - (code, resp) = smtplib.SMTP.data(self, msg) - self.progress = 0 - return (code, resp) - - def send(self, strg): - """Send `strg' to the server.""" - log.debug('send: %r', strg[:300]) - if hasattr(self, 'sock') and self.sock: - try: - if self.transferSize: - lock=threading.Lock() - lock.acquire() - self.transferSize = len(strg) - lock.release() - for i in range(0, self.transferSize, chunksize): - if isinstance(strg, bytes): - self.sock.send((strg[i:i+chunksize])) - else: - self.sock.send((strg[i:i + chunksize]).encode('utf-8')) - lock.acquire() - self.progress = i - lock.release() - else: - self.sock.sendall(strg.encode('utf-8')) - except socket.error: - self.close() - raise smtplib.SMTPServerDisconnected('Server not connected') - else: - raise smtplib.SMTPServerDisconnected('please run connect() first') - - @classmethod - def _print_debug(self, *args): - log.debug(args) - - def getTransferStatus(self): - if self.transferSize: - lock2 = threading.Lock() - lock2.acquire() - value = int((float(self.progress) / float(self.transferSize))*100) - lock2.release() - return str(value) + ' %' - else: - return "100 %" - - -# Class for sending email with ability to get current progress, derived from emailbase class -class email(emailbase, smtplib.SMTP): - - def __init__(self, *args, **kwargs): - smtplib.SMTP.__init__(self, *args, **kwargs) - - -# Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class -class email_SSL(emailbase, smtplib.SMTP_SSL): - - def __init__(self, *args, **kwargs): - smtplib.SMTP_SSL.__init__(self, *args, **kwargs) - - -#Class for all worker tasks in the background -class WorkerThread(threading.Thread): - - def __init__(self): - threading.Thread.__init__(self) - self.status = 0 - self.current = 0 - self.last = 0 - self.queue = list() - self.UIqueue = list() - self.asyncSMTP = None - self.id = 0 - self.db_queue = queue.Queue() - calibre_db.add_queue(self.db_queue) - self.doLock = threading.Lock() - - # Main thread loop starting the different tasks - def run(self): - main_thread = _get_main_thread() - while main_thread.is_alive(): - try: - self.doLock.acquire() - if self.current != self.last: - index = self.current - log.info(index) - log.info(len(self.queue)) - self.doLock.release() - if self.queue[index]['taskType'] == TASK_EMAIL: - self._send_raw_email() - elif self.queue[index]['taskType'] in (TASK_CONVERT, TASK_CONVERT_ANY): - self._convert_any_format() - # TASK_UPLOAD is handled implicitly - self.doLock.acquire() - self.current += 1 - if self.current > self.last: - self.current = self.last - self.doLock.release() - else: - self.doLock.release() - except Exception as e: - log.exception(e) - self.doLock.release() - if main_thread.is_alive(): - time.sleep(1) - - def get_send_status(self): - if self.asyncSMTP: - return self.asyncSMTP.getTransferStatus() - else: - return "0 %" - - def _delete_completed_tasks(self): - for index, task in reversed(list(enumerate(self.UIqueue))): - if task['progress'] == "100 %": - # delete tasks - self.queue.pop(index) - self.UIqueue.pop(index) - # if we are deleting entries before the current index, adjust the index - if index <= self.current and self.current: - self.current -= 1 - self.last = len(self.queue) - - def get_taskstatus(self): - self.doLock.acquire() - if self.current < len(self.queue): - if self.UIqueue[self.current]['stat'] == STAT_STARTED: - if self.queue[self.current]['taskType'] == TASK_EMAIL: - self.UIqueue[self.current]['progress'] = self.get_send_status() - self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] - self.UIqueue[self.current]['rt'] = self.UIqueue[self.current]['formRuntime'].days*24*60 \ - + self.UIqueue[self.current]['formRuntime'].seconds \ - + self.UIqueue[self.current]['formRuntime'].microseconds - self.doLock.release() - return self.UIqueue - - def _convert_any_format(self): - # convert book, and upload in case of google drive - self.doLock.acquire() - index = self.current - self.doLock.release() - self.UIqueue[index]['stat'] = STAT_STARTED - self.queue[index]['starttime'] = datetime.now() - self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] - curr_task = self.queue[index]['taskType'] - filename = self._convert_ebook_format() - if filename: - if config.config_use_google_drive: - gdriveutils.updateGdriveCalibreFromLocal() - if curr_task == TASK_CONVERT: - self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], - filename, self.queue[index]['settings'], self.queue[index]['kindle'], - self.UIqueue[index]['user'], self.queue[index]['title'], - self.queue[index]['settings']['body'], internal=True) - - - def _convert_ebook_format(self): - error_message = None - self.doLock.acquire() - index = self.current - self.doLock.release() - file_path = self.queue[index]['file_path'] - book_id = self.queue[index]['bookid'] - format_old_ext = u'.' + self.queue[index]['settings']['old_book_format'].lower() - format_new_ext = u'.' + self.queue[index]['settings']['new_book_format'].lower() - - # check to see if destination format already exists - - # if it does - mark the conversion task as complete and return a success - # this will allow send to kindle workflow to continue to work - if os.path.isfile(file_path + format_new_ext): - log.info("Book id %d already converted to %s", book_id, format_new_ext) - cur_book = calibre_db.get_book(book_id) - self.queue[index]['path'] = file_path - self.queue[index]['title'] = cur_book.title - self._handleSuccess() - return file_path + format_new_ext - else: - log.info("Book id %d - target format of %s does not exist. Moving forward with convert.", - book_id, - format_new_ext) - - if config.config_kepubifypath and format_old_ext == '.epub' and format_new_ext == '.kepub': - check, error_message = self._convert_kepubify(file_path, - format_old_ext, - format_new_ext, - index) - else: - # check if calibre converter-executable is existing - if not os.path.exists(config.config_converterpath): - # ToDo Text is not translated - self._handleError(_(u"Calibre ebook-convert %(tool)s not found", tool=config.config_converterpath)) - return - check, error_message = self._convert_calibre(file_path, format_old_ext, format_new_ext, index) - - if check == 0: - cur_book = calibre_db.get_book(book_id) - if os.path.isfile(file_path + format_new_ext): - # self.db_queue.join() - new_format = db.Data(name=cur_book.data[0].name, - book_format=self.queue[index]['settings']['new_book_format'].upper(), - book=book_id, uncompressed_size=os.path.getsize(file_path + format_new_ext)) - task = {'task':'add_format','id': book_id, 'format': new_format} - self.db_queue.put(task) - # To Do how to handle error? - - '''cur_book.data.append(new_format) - try: - # db.session.merge(cur_book) - calibre_db.session.commit() - except OperationalError as e: - calibre_db.session.rollback() - log.error("Database error: %s", e) - self._handleError(_(u"Database error: %(error)s.", error=e)) - return''' - - self.queue[index]['path'] = cur_book.path - self.queue[index]['title'] = cur_book.title - if config.config_use_google_drive: - os.remove(file_path + format_old_ext) - self._handleSuccess() - return file_path + format_new_ext - else: - error_message = format_new_ext.upper() + ' format not found on disk' - log.info("ebook converter failed with error while converting book") - if not error_message: - error_message = 'Ebook converter failed with unknown error' - self._handleError(error_message) - return - - - def _convert_calibre(self, file_path, format_old_ext, format_new_ext, index): - try: - # Linux py2.7 encode as list without quotes no empty element for parameters - # linux py3.x no encode and as list without quotes no empty element for parameters - # windows py2.7 encode as string with quotes empty element for parameters is okay - # windows py 3.x no encode and as string with quotes empty element for parameters is okay - # separate handling for windows and linux - quotes = [1, 2] - command = [config.config_converterpath, (file_path + format_old_ext), - (file_path + format_new_ext)] - quotes_index = 3 - if config.config_calibre: - parameters = config.config_calibre.split(" ") - for param in parameters: - command.append(param) - quotes.append(quotes_index) - quotes_index += 1 - - p = process_open(command, quotes) - except OSError as e: - return 1, _(u"Ebook-converter failed: %(error)s", error=e) - - while p.poll() is None: - nextline = p.stdout.readline() - if os.name == 'nt' and sys.version_info < (3, 0): - nextline = nextline.decode('windows-1252') - elif os.name == 'posix' and sys.version_info < (3, 0): - nextline = nextline.decode('utf-8') - log.debug(nextline.strip('\r\n')) - # parse progress string from calibre-converter - progress = re.search(r"(\d+)%\s.*", nextline) - if progress: - self.UIqueue[index]['progress'] = progress.group(1) + ' %' - - # process returncode - check = p.returncode - calibre_traceback = p.stderr.readlines() - error_message = "" - for ele in calibre_traceback: - if sys.version_info < (3, 0): - ele = ele.decode('utf-8') - log.debug(ele.strip('\n')) - if not ele.startswith('Traceback') and not ele.startswith(' File'): - error_message = "Calibre failed with error: %s" % ele.strip('\n') - return check, error_message - - - def _convert_kepubify(self, file_path, format_old_ext, format_new_ext, index): - quotes = [1, 3] - command = [config.config_kepubifypath, (file_path + format_old_ext), '-o', os.path.dirname(file_path)] - try: - p = process_open(command, quotes) - except OSError as e: - return 1, _(u"Kepubify-converter failed: %(error)s", error=e) - self.UIqueue[index]['progress'] = '1 %' - while True: - nextline = p.stdout.readlines() - nextline = [x.strip('\n') for x in nextline if x != '\n'] - if sys.version_info < (3, 0): - nextline = [x.decode('utf-8') for x in nextline] - for line in nextline: - log.debug(line) - if p.poll() is not None: - break - - # ToD Handle - # process returncode - check = p.returncode - - # move file - if check == 0: - converted_file = glob(os.path.join(os.path.dirname(file_path), "*.kepub.epub")) - if len(converted_file) == 1: - copyfile(converted_file[0], (file_path + format_new_ext)) - os.unlink(converted_file[0]) - else: - return 1, _(u"Converted file not found or more than one file in folder %(folder)s", - folder=os.path.dirname(file_path)) - return check, None - - - def add_convert(self, file_path, bookid, user_name, taskMessage, settings, kindle_mail=None): - self.doLock.acquire() - if self.last >= 20: - self._delete_completed_tasks() - # progress, runtime, and status = 0 - self.id += 1 - task = TASK_CONVERT_ANY - if kindle_mail: - task = TASK_CONVERT - self.queue.append({'file_path':file_path, 'bookid':bookid, 'starttime': 0, 'kindle': kindle_mail, - 'taskType': task, 'settings':settings}) - self.UIqueue.append({'user': user_name, 'formStarttime': '', 'progress': " 0 %", 'taskMess': taskMessage, - 'runtime': '0 s', 'stat': STAT_WAITING,'id': self.id, 'taskType': task } ) - - self.last=len(self.queue) - self.doLock.release() - - def add_email(self, subject, filepath, attachment, settings, recipient, user_name, taskMessage, - text, internal=False): - # if more than 20 entries in the list, clean the list - self.doLock.acquire() - if self.last >= 20: - self._delete_completed_tasks() - if internal: - self.current-= 1 - # progress, runtime, and status = 0 - self.id += 1 - self.queue.append({'subject':subject, 'attachment':attachment, 'filepath':filepath, - 'settings':settings, 'recipent':recipient, 'starttime': 0, - 'taskType': TASK_EMAIL, 'text':text}) - self.UIqueue.append({'user': user_name, 'formStarttime': '', 'progress': " 0 %", 'taskMess': taskMessage, - 'runtime': '0 s', 'stat': STAT_WAITING,'id': self.id, 'taskType': TASK_EMAIL }) - self.last=len(self.queue) - self.doLock.release() - - def add_upload(self, user_name, taskMessage): - # if more than 20 entries in the list, clean the list - self.doLock.acquire() - - - if self.last >= 20: - self._delete_completed_tasks() - # progress=100%, runtime=0, and status finished - self.id += 1 - starttime = datetime.now() - self.queue.append({'starttime': starttime, 'taskType': TASK_UPLOAD}) - self.UIqueue.append({'user': user_name, 'formStarttime': starttime, 'progress': "100 %", 'taskMess': taskMessage, - 'runtime': '0 s', 'stat': STAT_FINISH_SUCCESS,'id': self.id, 'taskType': TASK_UPLOAD}) - self.last=len(self.queue) - self.doLock.release() - - def _send_raw_email(self): - self.doLock.acquire() - index = self.current - self.doLock.release() - self.queue[index]['starttime'] = datetime.now() - self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] - self.UIqueue[index]['stat'] = STAT_STARTED - obj=self.queue[index] - # create MIME message - msg = MIMEMultipart() - msg['Subject'] = self.queue[index]['subject'] - msg['Message-Id'] = make_msgid('calibre-web') - msg['Date'] = formatdate(localtime=True) - text = self.queue[index]['text'] - msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) - if obj['attachment']: - result = get_attachment(obj['filepath'], obj['attachment']) - if result: - msg.attach(result) - else: - self._handleError(u"Attachment not found") - return - - msg['From'] = obj['settings']["mail_from"] - msg['To'] = obj['recipent'] - - use_ssl = int(obj['settings'].get('mail_use_ssl', 0)) - try: - # convert MIME message to string - fp = StringIO() - gen = Generator(fp, mangle_from_=False) - gen.flatten(msg) - msg = fp.getvalue() - - # send email - timeout = 600 # set timeout to 5mins - - # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten - # _print_debug function - if sys.version_info < (3, 0): - org_smtpstderr = smtplib.stderr - smtplib.stderr = logger.StderrLogger('worker.smtp') - - if use_ssl == 2: - self.asyncSMTP = email_SSL(obj['settings']["mail_server"], obj['settings']["mail_port"], timeout=timeout) - else: - self.asyncSMTP = email(obj['settings']["mail_server"], obj['settings']["mail_port"], timeout=timeout) - - # link to logginglevel - if logger.is_debug_enabled(): - self.asyncSMTP.set_debuglevel(1) - if use_ssl == 1: - self.asyncSMTP.starttls() - if obj['settings']["mail_password"]: - self.asyncSMTP.login(str(obj['settings']["mail_login"]), str(obj['settings']["mail_password"])) - self.asyncSMTP.sendmail(obj['settings']["mail_from"], obj['recipent'], msg) - self.asyncSMTP.quit() - self._handleSuccess() - - if sys.version_info < (3, 0): - smtplib.stderr = org_smtpstderr - - except (MemoryError) as e: - log.exception(e) - self._handleError(u'MemoryError sending email: ' + str(e)) - return None - except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: - if hasattr(e, "smtp_error"): - text = e.smtp_error.decode('utf-8').replace("\n",'. ') - elif hasattr(e, "message"): - text = e.message - else: - log.exception(e) - text = '' - self._handleError(u'Smtplib Error sending email: ' + text) - return None - except (socket.error) as e: - self._handleError(u'Socket Error sending email: ' + e.strerror) - return None - - def _handleError(self, error_message): - log.error(error_message) - self.doLock.acquire() - index = self.current - self.doLock.release() - self.UIqueue[index]['stat'] = STAT_FAIL - self.UIqueue[index]['progress'] = "100 %" - self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[index]['starttime'] - self.UIqueue[index]['message'] = error_message - - def _handleSuccess(self): - self.doLock.acquire() - index = self.current - self.doLock.release() - self.UIqueue[index]['stat'] = STAT_FINISH_SUCCESS - self.UIqueue[index]['progress'] = "100 %" - self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[index]['starttime'] - - -def get_taskstatus(): - return _worker.get_taskstatus() - - -def add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text): - return _worker.add_email(subject, filepath, attachment, settings, recipient, user_name, taskMessage, text) - - -def add_upload(user_name, taskMessage): - return _worker.add_upload(user_name, taskMessage) - - -def add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail=None): - return _worker.add_convert(file_path, bookid, user_name, taskMessage, settings, kindle_mail) - - -_worker = WorkerThread() -_worker.start()