diff --git a/cps/services/worker.py b/cps/services/worker.py index 92c5fa3d..e434528f 100644 --- a/cps/services/worker.py +++ b/cps/services/worker.py @@ -3,6 +3,7 @@ from __future__ import division, print_function, unicode_literals import threading import abc import uuid +import time try: import queue @@ -59,7 +60,7 @@ class WorkerThread(threading.Thread): threading.Thread.__init__(self) self.dequeued = list() - self.daemon = True + self.doLock = threading.Lock() self.queue = ImprovedQueue() self.num = 0 @@ -101,23 +102,33 @@ class WorkerThread(threading.Thread): # Main thread loop starting the different tasks def run(self): - # this blocks until something is available - item = self.queue.get() - - with self.doLock: - # add to list so that in-progress tasks show up - self.dequeued.append(item) - - # once we hit our trigger, start cleaning up dead tasks - if len(self.dequeued) > TASK_CLEANUP_TRIGGER: - self.cleanup_tasks() - - # sometimes tasks (like Upload) don't actually have work to do and are created as already finished - if item.task.stat is STAT_WAITING: - # CalibreTask.start() should wrap all exceptions in it's own error handling - item.task.start(self) - - self.queue.task_done() + main_thread = _get_main_thread() + while main_thread.is_alive(): + try: + # this blocks until something is available. This can cause issues when the main thread dies - this + # thread will remain alive. We implement a timeout to unblock every second which allows us to check if + # the main thread is still alive. + # We don't use a daemon here because we don't want the tasks to just be abruptly halted, leading to + # possible file / database corruption + item = self.queue.get(timeout=1) + except queue.Empty as ex: + time.sleep(1) + continue + + with self.doLock: + # add to list so that in-progress tasks show up + self.dequeued.append(item) + + # once we hit our trigger, start cleaning up dead tasks + if len(self.dequeued) > TASK_CLEANUP_TRIGGER: + self.cleanup_tasks() + + # sometimes tasks (like Upload) don't actually have work to do and are created as already finished + if item.task.stat is STAT_WAITING: + # CalibreTask.start() should wrap all exceptions in it's own error handling + item.task.start(self) + + self.queue.task_done() class CalibreTask: