diff --git a/cps/helper.py b/cps/helper.py index e3a0e5b1..2b2a0adb 100644 --- a/cps/helper.py +++ b/cps/helper.py @@ -723,7 +723,7 @@ def format_runtime(runtime): # helper function to apply localize status information in tasklist entries def render_task_status(tasklist): renderedtasklist = list() - for user, task in tasklist: + for user, added, task in tasklist: if user == current_user.nickname or current_user.role_admin(): ret = {} if task.start_time: diff --git a/cps/services/worker.py b/cps/services/worker.py index e4ccf988..bb3e3b07 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -1,16 +1,18 @@ from __future__ import division, print_function, unicode_literals import threading +import abc +import uuid try: import queue except ImportError: import Queue as queue from datetime import datetime +from collections import namedtuple from cps import calibre_db from cps import logger -import abc log = logger.create() @@ -20,6 +22,11 @@ STAT_FAIL = 1 STAT_STARTED = 2 STAT_FINISH_SUCCESS = 3 +# Only retain this many tasks in dequeued list +TASK_CLEANUP_TRIGGER = 20 + +QueuedTask = namedtuple('QueuedTask', 'user, added, task') + def _get_main_thread(): for t in threading.enumerate(): @@ -51,10 +58,7 @@ class WorkerThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) - self.finished = list() - - self.db_queue = queue.Queue() - calibre_db.add_queue(self.db_queue) + self.dequeued = list() self.doLock = threading.Lock() self.queue = ImprovedQueue() @@ -64,43 +68,41 @@ class WorkerThread(threading.Thread): @classmethod def add(cls, user, task): ins = cls.getInstance() - ins.queue.put((user, task)) + ins.queue.put(QueuedTask( + user=user, + added=datetime.now(), + task=task, + )) @property def tasks(self): with self.doLock: - tasks = list(self.queue.to_list()) + self.finished - return tasks # todo: order by data added + tasks = list(self.queue.to_list()) + self.dequeued + return sorted(tasks, key=lambda x: x.added) # Main thread loop starting the different tasks def run(self): main_thread = _get_main_thread() while main_thread.is_alive(): - user, item = self.queue.get() + item = self.queue.get() # add to list so that in-progress tasks show up with self.doLock: - self.finished.append((user, item)) + # Remove completed tasks if needed + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + # sort first (just to be certain), then lob off the extra + self.dequeued = sorted(self.dequeued, key=lambda x: x.added)[-1 * TASK_CLEANUP_TRIGGER:] + self.dequeued.append(item) + + user, added, task = item # sometimes tasks (like Upload) don't actually have work to do and are created as already finished - if item.stat is STAT_WAITING: + if task.stat is STAT_WAITING: # CalibreTask.start() should wrap all exceptions in it's own error handling - item.start(self) + task.start(self) self.queue.task_done() - def _delete_completed_tasks(self): - raise NotImplementedError() - # for index, task in reversed(list(enumerate(self.UIqueue))): - # if task['progress'] == "100 %": - # # delete tasks - # self.queue.pop(index) - # self.UIqueue.pop(index) - # # if we are deleting entries before the current index, adjust the index - # if index <= self.current and self.current: - # self.current -= 1 - # self.last = len(self.queue) - class CalibreTask: __metaclass__ = abc.ABCMeta @@ -111,6 +113,7 @@ class CalibreTask: self.start_time = None self.end_time = None self.message = message + self.id = uuid.uuid4() @abc.abstractmethod def run(self, worker_thread):