diff --git a/cps/editbooks.py b/cps/editbooks.py index 7330572b..903b50e3 100644 --- a/cps/editbooks.py +++ b/cps/editbooks.py @@ -35,6 +35,8 @@ from sqlalchemy.exc import OperationalError from . import constants, logger, isoLanguages, gdriveutils, uploader, helper from . import config, get_locale, ub, worker, db from . import calibre_db +from .services.worker import WorkerThread +from .tasks.upload import TaskUpload from .web import login_required_if_no_ano, render_title_template, edit_required, upload_required @@ -509,8 +511,8 @@ def upload_single_file(request, book, book_id): # Queue uploader info uploadText=_(u"File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=book.title) - worker.add_upload(current_user.nickname, - "" + uploadText + "") + WorkerThread.add(current_user.nickname, TaskUpload( + "" + uploadText + "")) return uploader.process( saved_filename, *os.path.splitext(requested_file.filename), @@ -854,8 +856,8 @@ def upload(): if error: flash(error, category="error") uploadText=_(u"File %(file)s uploaded", file=title) - worker.add_upload(current_user.nickname, - "" + uploadText + "") + WorkerThread.add(current_user.nickname, TaskUpload( + "" + uploadText + "")) if len(request.files.getlist("btn-upload")) < 2: if current_user.role_edit() or current_user.role_admin(): diff --git a/cps/helper.py b/cps/helper.py index 1634ec63..cd6133dd 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -68,6 +68,7 @@ from .subproc_wrapper import process_wait from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY from .services.worker import WorkerThread +from .tasks.email import TaskEmail from . import tasks @@ -115,9 +116,9 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, def send_test_mail(kindle_mail, user_name): - worker.add_email(_(u'Calibre-Web test e-mail'), None, None, - config.get_mail_settings(), kindle_mail, user_name, - _(u"Test e-mail"), _(u'This e-mail has been sent via Calibre-Web.')) + WorkerThread.add(user_name, TaskEmail(_(u'Calibre-Web test e-mail'), None, None, + config.get_mail_settings(), kindle_mail, _(u"Test e-mail"), + _(u'This e-mail has been sent via Calibre-Web.'))) return @@ -132,9 +133,9 @@ def send_registration_mail(e_mail, user_name, default_password, resend=False): text += "Don't forget to change your password after first login.\r\n" text += "Sincerely\r\n\r\n" text += "Your Calibre-Web team" - worker.add_email(_(u'Get Started with Calibre-Web'), None, None, + WorkerThread.add(None, TaskEmail(_(u'Get Started with Calibre-Web'), None, None, config.get_mail_settings(), e_mail, None, - _(u"Registration e-mail for user: %(name)s", name=user_name), text) + _(u"Registration e-mail for user: %(name)s", name=user_name), text)) return @@ -226,9 +227,9 @@ def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id): for entry in iter(book.data): if entry.format.upper() == book_format.upper(): converted_file_name = entry.name + '.' + book_format.lower() - worker.add_email(_(u"Send to Kindle"), book.path, converted_file_name, - config.get_mail_settings(), kindle_mail, user_id, - _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.')) + WorkerThread.add(user_id, TaskEmail(_(u"Send to Kindle"), book.path, converted_file_name, + config.get_mail_settings(), kindle_mail, + _(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.'))) return return _(u"The requested file could not be read. Maybe wrong permissions?") diff --git a/cps/services/worker.py b/cps/services/worker.py index 61f199f9..61a4bae2 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -67,7 +67,7 @@ class ImprovedQueue(queue.Queue): #Class for all worker tasks in the background class WorkerThread(threading.Thread): - __instance = None + _instance = None @classmethod def getInstance(cls): @@ -112,11 +112,12 @@ class WorkerThread(threading.Thread): with self.doLock: self.finished.append((user, item)) - try: - item.start(self) - print(item) - except Exception as e: - log.exception(e) + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished + if item.stat is STAT_WAITING: + try: + item.start(self) + except Exception as e: + log.exception(e) self.queue.task_done() @@ -161,6 +162,7 @@ class CalibreTask(metaclass=abc.ABCMeta): def start(self, *args): self.start_time = datetime.now() + self.stat = STAT_STARTED self.run(*args) self.end_time = datetime.now() diff --git a/cps/tasks/email.py b/cps/tasks/email.py new file mode 100644 index 00000000..fbce103e --- /dev/null +++ b/cps/tasks/email.py @@ -0,0 +1,221 @@ +from __future__ import division, print_function, unicode_literals +import sys +import os +import smtplib +import threading +import socket + +try: + from StringIO import StringIO + from email.MIMEBase import MIMEBase + from email.MIMEMultipart import MIMEMultipart + from email.MIMEText import MIMEText +except ImportError: + from io import StringIO + from email.mime.base import MIMEBase + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + +from email import encoders +from email.utils import formatdate, make_msgid +from email.generator import Generator + +from cps.services.worker import CalibreTask +from cps import logger, config + +from cps import gdriveutils + +log = logger.create() + +chunksize = 8192 + + +# Class for sending email with ability to get current progress +class EmailBase(): + + transferSize = 0 + progress = 0 + + def data(self, msg): + self.transferSize = len(msg) + (code, resp) = smtplib.SMTP.data(self, msg) + self.progress = 0 + return (code, resp) + + def send(self, strg): + """Send `strg' to the server.""" + log.debug('send: %r', strg[:300]) + if hasattr(self, 'sock') and self.sock: + try: + if self.transferSize: + lock=threading.Lock() + lock.acquire() + self.transferSize = len(strg) + lock.release() + for i in range(0, self.transferSize, chunksize): + if isinstance(strg, bytes): + self.sock.send((strg[i:i+chunksize])) + else: + self.sock.send((strg[i:i + chunksize]).encode('utf-8')) + lock.acquire() + self.progress = i + lock.release() + else: + self.sock.sendall(strg.encode('utf-8')) + except socket.error: + self.close() + raise smtplib.SMTPServerDisconnected('Server not connected') + else: + raise smtplib.SMTPServerDisconnected('please run connect() first') + + @classmethod + def _print_debug(self, *args): + log.debug(args) + + def getTransferStatus(self): + if self.transferSize: + lock2 = threading.Lock() + lock2.acquire() + value = int((float(self.progress) / float(self.transferSize))*100) + lock2.release() + return str(value) + ' %' + else: + return "100 %" + + +# Class for sending email with ability to get current progress, derived from emailbase class +class Email(EmailBase, smtplib.SMTP): + + def __init__(self, *args, **kwargs): + smtplib.SMTP.__init__(self, *args, **kwargs) + + +# Class for sending ssl encrypted email with ability to get current progress, , derived from emailbase class +class EmailSSL(EmailBase, smtplib.SMTP_SSL): + + def __init__(self, *args, **kwargs): + smtplib.SMTP_SSL.__init__(self, *args, **kwargs) + + +class TaskEmail(CalibreTask): + def __init__(self, subject, filepath, attachment, settings, recipient, taskMessage, text, internal=False): + super().__init__(taskMessage) + self.subject = subject + self.attachment = attachment + self.settings = settings + self.filepath = filepath + self.recipent = recipient + self.text = text + + self.results = dict() + + def run(self, worker_thread): + # create MIME message + msg = MIMEMultipart() + msg['Subject'] = self.subject + msg['Message-Id'] = make_msgid('calibre-web') + msg['Date'] = formatdate(localtime=True) + text = self.text + msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) + if self.attachment: + result = self.get_attachment(self.filepath, self.attachment) + if result: + msg.attach(result) + else: + self._handleError(u"Attachment not found") + return + + msg['From'] = self.settings["mail_from"] + msg['To'] = self.recipent + + use_ssl = int(self.settings.get('mail_use_ssl', 0)) + try: + # convert MIME message to string + fp = StringIO() + gen = Generator(fp, mangle_from_=False) + gen.flatten(msg) + msg = fp.getvalue() + + # send email + timeout = 600 # set timeout to 5mins + + # redirect output to logfile on python2 pn python3 debugoutput is caught with overwritten + # _print_debug function + if sys.version_info < (3, 0): + org_smtpstderr = smtplib.stderr + smtplib.stderr = logger.StderrLogger('worker.smtp') + + if use_ssl == 2: + self.asyncSMTP = EmailSSL(self.settings["mail_server"], self.settings["mail_port"], + timeout=timeout) + else: + self.asyncSMTP = Email(self.settings["mail_server"], self.settings["mail_port"], timeout=timeout) + + # link to logginglevel + if logger.is_debug_enabled(): + self.asyncSMTP.set_debuglevel(1) + if use_ssl == 1: + self.asyncSMTP.starttls() + if self.settings["mail_password"]: + self.asyncSMTP.login(str(self.settings["mail_login"]), str(self.settings["mail_password"])) + self.asyncSMTP.sendmail(self.settings["mail_from"], self.recipent, msg) + self.asyncSMTP.quit() + self._handleSuccess() + + if sys.version_info < (3, 0): + smtplib.stderr = org_smtpstderr + + except (MemoryError) as e: + log.exception(e) + self._handleError(u'MemoryError sending email: ' + str(e)) + return None + except (smtplib.SMTPException, smtplib.SMTPAuthenticationError) as e: + if hasattr(e, "smtp_error"): + text = e.smtp_error.decode('utf-8').replace("\n", '. ') + elif hasattr(e, "message"): + text = e.message + else: + log.exception(e) + text = '' + self._handleError(u'Smtplib Error sending email: ' + text) + return None + except (socket.error) as e: + self._handleError(u'Socket Error sending email: ' + e.strerror) + return None + + def _get_attachment(bookpath, filename): + """Get file as MIMEBase message""" + calibrepath = config.config_calibre_dir + if config.config_use_google_drive: + df = gdriveutils.getFileFromEbooksFolder(bookpath, filename) + if df: + datafile = os.path.join(calibrepath, bookpath, filename) + if not os.path.exists(os.path.join(calibrepath, bookpath)): + os.makedirs(os.path.join(calibrepath, bookpath)) + df.GetContentFile(datafile) + else: + return None + file_ = open(datafile, 'rb') + data = file_.read() + file_.close() + os.remove(datafile) + else: + try: + file_ = open(os.path.join(calibrepath, bookpath, filename), 'rb') + data = file_.read() + file_.close() + except IOError as e: + log.exception(e) + log.error(u'The requested file could not be read. Maybe wrong permissions?') + return None + + attachment = MIMEBase('application', 'octet-stream') + attachment.set_payload(data) + encoders.encode_base64(attachment) + attachment.add_header('Content-Disposition', 'attachment', + filename=filename) + return attachment + + @property + def name(self): + return "Email" diff --git a/cps/tasks/upload.py b/cps/tasks/upload.py new file mode 100644 index 00000000..0e678b51 --- /dev/null +++ b/cps/tasks/upload.py @@ -0,0 +1,18 @@ +from __future__ import division, print_function, unicode_literals + +from datetime import datetime +from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS + +class TaskUpload(CalibreTask): + def __init__(self, taskMessage): + super().__init__(taskMessage) + self.start_time = self.end_time = datetime.now() + self.stat = STAT_FINISH_SUCCESS + + def run(self, worker_thread): + """Upload task doesn't have anything to do, it's simply a way to add information to the task list""" + pass + + @property + def name(self): + return "Upload" diff --git a/cps/web.py b/cps/web.py index 6064c1e9..e21068fa 100644 --- a/cps/web.py +++ b/cps/web.py @@ -41,6 +41,9 @@ from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError from sqlalchemy.sql.expression import text, func, true, false, not_, and_, or_ from werkzeug.exceptions import default_exceptions, InternalServerError from sqlalchemy.sql.functions import coalesce + +from .services.worker import WorkerThread + try: from werkzeug.exceptions import FailedDependency except ImportError: @@ -48,7 +51,7 @@ except ImportError: from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash -from . import constants, logger, isoLanguages, services, worker, worker2, cli +from . import constants, logger, isoLanguages, services, worker, cli from . import searched_ids, lm, babel, db, ub, config, get_locale, app from . import calibre_db from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download @@ -383,7 +386,8 @@ def import_ldap_users(): @web.route("/ajax/emailstat") @login_required def get_email_status_json(): - tasks = worker2._worker2.tasks + + tasks = WorkerThread.getInstance().tasks return jsonify(render_task_status(tasks))