@ -82,33 +82,37 @@ class WorkerThread(threading.Thread):
tasks = self . queue . to_list ( ) + self . dequeued
return sorted ( tasks , key = lambda x : x . num )
def cleanup_tasks ( self ) :
with self . doLock :
dead = [ ]
alive = [ ]
for x in self . dequeued :
( dead if x . task . dead else alive ) . append ( x )
# if the ones that we need to keep are within the trigger, do nothing else
delta = len ( self . dequeued ) - len ( dead )
if delta > TASK_CLEANUP_TRIGGER :
ret = alive
else :
# otherwise, lop off the oldest dead tasks until we hit the target trigger
ret = sorted ( dead , key = lambda x : x . task . end_time ) [ - TASK_CLEANUP_TRIGGER : ] + alive
self . dequeued = sorted ( ret , key = lambda x : x . num )
# Main thread loop starting the different tasks
def run ( self ) :
main_thread = _get_main_thread ( )
while main_thread . is_alive ( ) :
# this blocks until something is available
item = self . queue . get ( )
with self . doLock :
# once we hit our trigger, start cleaning up dead tasks
if len ( self . dequeued ) > TASK_CLEANUP_TRIGGER :
dead = [ ]
alive = [ ]
for x in self . dequeued :
( dead if x . task . dead else alive ) . append ( x )
# if the ones that we need to keep are within the trigger, do nothing else
delta = len ( self . dequeued ) - len ( dead )
if delta > TASK_CLEANUP_TRIGGER :
ret = alive
else :
# otherwise, lop off the oldest dead tasks until we hit the target trigger
ret = sorted ( dead , key = lambda x : x . task . end_time ) [ - TASK_CLEANUP_TRIGGER : ] + alive
self . dequeued = sorted ( ret , key = lambda x : x . num )
# add to list so that in-progress tasks show up
self . dequeued . append ( item )
# once we hit our trigger, start cleaning up dead tasks
if len ( self . dequeued ) > TASK_CLEANUP_TRIGGER :
self . cleanup_tasks ( )
# sometimes tasks (like Upload) don't actually have work to do and are created as already finished
if item . task . stat is STAT_WAITING :
# CalibreTask.start() should wrap all exceptions in it's own error handling