From 9f64a965020efc87c73c0d43ecb52666f18424e5 Mon Sep 17 00:00:00 2001 From: Ozzieisaacs Date: Mon, 22 Jul 2019 18:28:45 +0200 Subject: [PATCH] Fix for #890 --- cps/worker.py | 100 +++++++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/cps/worker.py b/cps/worker.py index 9744b91f..23432e00 100644 --- a/cps/worker.py +++ b/cps/worker.py @@ -195,15 +195,18 @@ class WorkerThread(threading.Thread): doLock = threading.Lock() doLock.acquire() if self.current != self.last: + index = self.current doLock.release() - if self.queue[self.current]['taskType'] == TASK_EMAIL: + if self.queue[index]['taskType'] == TASK_EMAIL: self._send_raw_email() - if self.queue[self.current]['taskType'] == TASK_CONVERT: + if self.queue[index]['taskType'] == TASK_CONVERT: self._convert_any_format() - if self.queue[self.current]['taskType'] == TASK_CONVERT_ANY: + if self.queue[index]['taskType'] == TASK_CONVERT_ANY: self._convert_any_format() # TASK_UPLOAD is handled implicitly + doLock.acquire() self.current += 1 + doLock.release() else: doLock.release() if main_thread.is_alive(): @@ -226,6 +229,8 @@ class WorkerThread(threading.Thread): self.last = len(self.queue) def get_taskstatus(self): + doLock = threading.Lock() + doLock.acquire() if self.current < len(self.queue): if self.UIqueue[self.current]['stat'] == STAT_STARTED: if self.queue[self.current]['taskType'] == TASK_EMAIL: @@ -234,30 +239,39 @@ class WorkerThread(threading.Thread): self.UIqueue[self.current]['rt'] = self.UIqueue[self.current]['formRuntime'].days*24*60 \ + self.UIqueue[self.current]['formRuntime'].seconds \ + self.UIqueue[self.current]['formRuntime'].microseconds + doLock.release() return self.UIqueue def _convert_any_format(self): # convert book, and upload in case of google drive - self.UIqueue[self.current]['stat'] = STAT_STARTED - self.queue[self.current]['starttime'] = datetime.now() - self.UIqueue[self.current]['formStarttime'] = self.queue[self.current]['starttime'] - curr_task = self.queue[self.current]['taskType'] + doLock = threading.Lock() + doLock.acquire() + index = self.current + doLock.release() + self.UIqueue[index]['stat'] = STAT_STARTED + self.queue[index]['starttime'] = datetime.now() + self.UIqueue[index]['formStarttime'] = self.queue[self.current]['starttime'] + curr_task = self.queue[index]['taskType'] filename = self._convert_ebook_format() if filename: if config.config_use_google_drive: gdriveutils.updateGdriveCalibreFromLocal() if curr_task == TASK_CONVERT: - self.add_email(self.queue[self.current]['settings']['subject'], self.queue[self.current]['path'], - filename, self.queue[self.current]['settings'], self.queue[self.current]['kindle'], - self.UIqueue[self.current]['user'], self.queue[self.current]['title'], - self.queue[self.current]['settings']['body']) + self.add_email(self.queue[index]['settings']['subject'], self.queue[index]['path'], + filename, self.queue[index]['settings'], self.queue[index]['kindle'], + self.UIqueue[index]['user'], self.queue[index]['title'], + self.queue[index]['settings']['body']) def _convert_ebook_format(self): error_message = None - file_path = self.queue[self.current]['file_path'] - bookid = self.queue[self.current]['bookid'] - format_old_ext = u'.' + self.queue[self.current]['settings']['old_book_format'].lower() - format_new_ext = u'.' + self.queue[self.current]['settings']['new_book_format'].lower() + doLock = threading.Lock() + doLock.acquire() + index = self.current + doLock.release() + file_path = self.queue[index]['file_path'] + bookid = self.queue[index]['bookid'] + format_old_ext = u'.' + self.queue[index]['settings']['old_book_format'].lower() + format_new_ext = u'.' + self.queue[index]['settings']['new_book_format'].lower() # check to see if destination format already exists - # if it does - mark the conversion task as complete and return a success @@ -265,8 +279,8 @@ class WorkerThread(threading.Thread): if os.path.isfile(file_path + format_new_ext): log.info("Book id %d already converted to %s", bookid, format_new_ext) cur_book = db.session.query(db.Books).filter(db.Books.id == bookid).first() - self.queue[self.current]['path'] = file_path - self.queue[self.current]['title'] = cur_book.title + self.queue[index]['path'] = file_path + self.queue[index]['title'] = cur_book.title self._handleSuccess() return file_path + format_new_ext else: @@ -304,13 +318,13 @@ class WorkerThread(threading.Thread): else:''' command = [config.config_converterpath, (file_path + format_old_ext), (file_path + format_new_ext)] - index = 3 + quotes_index = 3 if config.config_calibre: parameters = config.config_calibre.split(" ") for param in parameters: command.append(param) - quotes.append(index) - index += 1 + quotes.append(quotes_index) + quotes_index += 1 p = process_open(command, quotes) # p = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) except OSError as e: @@ -338,7 +352,7 @@ class WorkerThread(threading.Thread): # parse progress string from calibre-converter progress = re.search(r"(\d+)%\s.*", nextline) if progress: - self.UIqueue[self.current]['progress'] = progress.group(1) + ' %' + self.UIqueue[index]['progress'] = progress.group(1) + ' %' # process returncode check = p.returncode @@ -359,12 +373,12 @@ class WorkerThread(threading.Thread): cur_book = db.session.query(db.Books).filter(db.Books.id == bookid).first() if os.path.isfile(file_path + format_new_ext): new_format = db.Data(name=cur_book.data[0].name, - book_format=self.queue[self.current]['settings']['new_book_format'].upper(), + book_format=self.queue[index]['settings']['new_book_format'].upper(), book=bookid, uncompressed_size=os.path.getsize(file_path + format_new_ext)) cur_book.data.append(new_format) db.session.commit() - self.queue[self.current]['path'] = cur_book.path - self.queue[self.current]['title'] = cur_book.title + self.queue[index]['path'] = cur_book.path + self.queue[index]['title'] = cur_book.title if config.config_use_google_drive: os.remove(file_path + format_old_ext) self._handleSuccess() @@ -430,16 +444,20 @@ class WorkerThread(threading.Thread): def _send_raw_email(self): - self.queue[self.current]['starttime'] = datetime.now() - self.UIqueue[self.current]['formStarttime'] = self.queue[self.current]['starttime'] - self.UIqueue[self.current]['stat'] = STAT_STARTED - obj=self.queue[self.current] + doLock = threading.Lock() + doLock.acquire() + index = self.current + doLock.release() + self.queue[index]['starttime'] = datetime.now() + self.UIqueue[index]['formStarttime'] = self.queue[index]['starttime'] + self.UIqueue[index]['stat'] = STAT_STARTED + obj=self.queue[index] # create MIME message msg = MIMEMultipart() - msg['Subject'] = self.queue[self.current]['subject'] + msg['Subject'] = self.queue[index]['subject'] msg['Message-Id'] = make_msgid('calibre-web') msg['Date'] = formatdate(localtime=True) - text = self.queue[self.current]['text'] + text = self.queue[index]['text'] msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8')) if obj['attachment']: result = get_attachment(obj['filepath'], obj['attachment']) @@ -506,15 +524,23 @@ class WorkerThread(threading.Thread): def _handleError(self, error_message): log.error(error_message) - self.UIqueue[self.current]['stat'] = STAT_FAIL - self.UIqueue[self.current]['progress'] = "100 %" - self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] - self.UIqueue[self.current]['message'] = error_message + doLock = threading.Lock() + doLock.acquire() + index = self.current + doLock.release() + self.UIqueue[index]['stat'] = STAT_FAIL + self.UIqueue[index]['progress'] = "100 %" + self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] + self.UIqueue[index]['message'] = error_message def _handleSuccess(self): - self.UIqueue[self.current]['stat'] = STAT_FINISH_SUCCESS - self.UIqueue[self.current]['progress'] = "100 %" - self.UIqueue[self.current]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] + doLock = threading.Lock() + doLock.acquire() + index = self.current + doLock.release() + self.UIqueue[index]['stat'] = STAT_FINISH_SUCCESS + self.UIqueue[index]['progress'] = "100 %" + self.UIqueue[index]['formRuntime'] = datetime.now() - self.queue[self.current]['starttime'] _worker = WorkerThread()