From e8aedeac36ebc363c601ed936791ac7bf9f48adc Mon Sep 17 00:00:00 2001 From: Ozzieisaacs Date: Sat, 9 May 2020 15:44:53 +0200 Subject: [PATCH] Refactored admin.py --- cps/admin.py | 542 +++++++++++++++++++++++++++------------------------ 1 file changed, 286 insertions(+), 256 deletions(-) diff --git a/cps/admin.py b/cps/admin.py index 340a4251..a546e518 100644 --- a/cps/admin.py +++ b/cps/admin.py @@ -429,7 +429,6 @@ def delete_restriction(res_type): return "" -#@admi.route("/ajax/listrestriction//", defaults={'user_id': '0'}) @admi.route("/ajax/listrestriction/") @login_required @admin_required @@ -475,6 +474,7 @@ def list_restriction(res_type): response.headers["Content-Type"] = "application/json; charset=utf-8" return response + @admi.route("/config", methods=["GET", "POST"]) @unconfigured def basic_configuration(): @@ -484,19 +484,23 @@ def basic_configuration(): return _configuration_result() -def _configuration_update_helper(): - reboot_required = False - db_change = False - to_save = request.form.to_dict() +def _config_int(to_save, x): + return config.set_from_dictionary(to_save, x, int) - _config_string = lambda x: config.set_from_dictionary(to_save, x, lambda y: y.strip() if y else y) - _config_int = lambda x: config.set_from_dictionary(to_save, x, int) - _config_checkbox = lambda x: config.set_from_dictionary(to_save, x, lambda y: y == "on", False) - _config_checkbox_int = lambda x: config.set_from_dictionary(to_save, x, lambda y: 1 if (y == "on") else 0, 0) - db_change |= _config_string("config_calibre_dir") +def _config_checkbox(to_save, x): + return config.set_from_dictionary(to_save, x, lambda y: y == "on", False) - # Google drive setup + +def _config_checkbox_int(to_save, x): + return config.set_from_dictionary(to_save, x, lambda y: 1 if (y == "on") else 0, 0) + + +def _config_string(to_save, x): + return config.set_from_dictionary(to_save, x, lambda y: y.strip() if y else y) + + +def _configuration_gdrive_helper(to_save): if not os.path.isfile(gdriveutils.SETTINGS_YAML): config.config_use_google_drive = False @@ -515,138 +519,162 @@ def _configuration_update_helper(): # always show google drive settings, but in case of error deny support config.config_use_google_drive = (not gdriveError) and ("config_use_google_drive" in to_save) - if _config_string("config_google_drive_folder"): + if _config_string(to_save, "config_google_drive_folder"): gdriveutils.deleteDatabaseOnChange() + return gdriveError + +def _configuration_oauth_helper(to_save): + active_oauths = 0 + for element in oauthblueprints: + if to_save["config_" + str(element['id']) + "_oauth_client_id"] != element['oauth_client_id'] \ + or to_save["config_" + str(element['id']) + "_oauth_client_secret"] != element['oauth_client_secret']: + reboot_required = True + element['oauth_client_id'] = to_save["config_" + str(element['id']) + "_oauth_client_id"] + element['oauth_client_secret'] = to_save["config_" + str(element['id']) + "_oauth_client_secret"] + if to_save["config_" + str(element['id']) + "_oauth_client_id"] \ + and to_save["config_" + str(element['id']) + "_oauth_client_secret"]: + active_oauths += 1 + element["active"] = 1 + else: + element["active"] = 0 + ub.session.query(ub.OAuthProvider).filter(ub.OAuthProvider.id == element['id']).update( + {"oauth_client_id": to_save["config_" + str(element['id']) + "_oauth_client_id"], + "oauth_client_secret": to_save["config_" + str(element['id']) + "_oauth_client_secret"], + "active": element["active"]}) + +def _configuration_logfile_helper(gdriveError, to_save): + reboot_required = False + reboot_required |= _config_int(to_save, "config_log_level") + reboot_required |= _config_string(to_save, "config_logfile") + if not logger.is_valid_logfile(config.config_logfile): + return _configuration_result(_('Logfile Location is not Valid, Please Enter Correct Path'), gdriveError) + + reboot_required |= _config_checkbox_int(to_save, "config_access_log") + reboot_required |= _config_string(to_save, "config_access_logfile") + if not logger.is_valid_logfile(config.config_access_logfile): + return _configuration_result(_('Access Logfile Location is not Valid, Please Enter Correct Path'), gdriveError) + return reboot_required + +def _configuration_ldap_helper(gdriveError, to_save): + reboot_required = False + reboot_required |= _config_string(to_save, "config_ldap_provider_url") + reboot_required |= _config_int(to_save, "config_ldap_port") + reboot_required |= _config_int(to_save, "config_ldap_authentication") + reboot_required |= _config_string(to_save, "config_ldap_dn") + reboot_required |= _config_string(to_save, "config_ldap_serv_username") + reboot_required |= _config_string(to_save, "config_ldap_user_object") + reboot_required |= _config_string(to_save, "config_ldap_group_object_filter") + reboot_required |= _config_string(to_save, "config_ldap_group_members_field") + reboot_required |= _config_checkbox(to_save, "config_ldap_openldap") + reboot_required |= _config_int(to_save, "config_ldap_encryption") + reboot_required |= _config_string(to_save, "config_ldap_cert_path") + _config_string(to_save, "config_ldap_group_name") + if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "": + reboot_required |= 1 + config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8') + config.save() + + if not config.config_ldap_provider_url \ + or not config.config_ldap_port \ + or not config.config_ldap_dn \ + or not config.config_ldap_user_object: + return _configuration_result(_('Please Enter a LDAP Provider, ' + 'Port, DN and User Object Identifier'), gdriveError) + + if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS: + if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE: + if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password): + return _configuration_result('Please Enter a LDAP Service Account and Password', gdriveError) + else: + if not config.config_ldap_serv_username: + return _configuration_result('Please Enter a LDAP Service Account', gdriveError) + + if config.config_ldap_group_object_filter: + if config.config_ldap_group_object_filter.count("%s") != 1: + return _configuration_result(_('LDAP Group Object Filter Needs to Have One "%s" Format Identifier'), + gdriveError) + if config.config_ldap_group_object_filter.count("(") != config.config_ldap_group_object_filter.count(")"): + return _configuration_result(_('LDAP Group Object Filter Has Unmatched Parenthesis'), + gdriveError) + + if config.config_ldap_user_object.count("%s") != 1: + return _configuration_result(_('LDAP User Object Filter needs to Have One "%s" Format Identifier'), + gdriveError) + if config.config_ldap_user_object.count("(") != config.config_ldap_user_object.count(")"): + return _configuration_result(_('LDAP User Object Filter Has Unmatched Parenthesis'), + gdriveError) + + if config.config_ldap_cert_path and not os.path.isdir(config.config_ldap_cert_path): + return _configuration_result(_('LDAP Certificate Location is not Valid, Please Enter Correct Path'), + gdriveError) + return reboot_required + + +def _configuration_update_helper(): + reboot_required = False + db_change = False + to_save = request.form.to_dict() + + + db_change |= _config_string(to_save, "config_calibre_dir") + + # Google drive setup + gdriveError = _configuration_gdrive_helper(to_save) + - reboot_required |= _config_int("config_port") + reboot_required |= _config_int(to_save, "config_port") - reboot_required |= _config_string("config_keyfile") + reboot_required |= _config_string(to_save, "config_keyfile") if config.config_keyfile and not os.path.isfile(config.config_keyfile): return _configuration_result(_('Keyfile Location is not Valid, Please Enter Correct Path'), gdriveError) - reboot_required |= _config_string("config_certfile") + reboot_required |= _config_string(to_save, "config_certfile") if config.config_certfile and not os.path.isfile(config.config_certfile): return _configuration_result(_('Certfile Location is not Valid, Please Enter Correct Path'), gdriveError) - _config_checkbox_int("config_uploading") - _config_checkbox_int("config_anonbrowse") - _config_checkbox_int("config_public_reg") - reboot_required |= _config_checkbox_int("config_kobo_sync") - _config_checkbox_int("config_kobo_proxy") + _config_checkbox_int(to_save, "config_uploading") + _config_checkbox_int(to_save, "config_anonbrowse") + _config_checkbox_int(to_save, "config_public_reg") + reboot_required |= _config_checkbox_int(to_save, "config_kobo_sync") + _config_checkbox_int(to_save, "config_kobo_proxy") - _config_string("config_calibre") - _config_string("config_converterpath") - _config_string("config_kepubifypath") + _config_string(to_save, "config_calibre") + _config_string(to_save, "config_converterpath") + _config_string(to_save, "config_kepubifypath") - reboot_required |= _config_int("config_login_type") + reboot_required |= _config_int(to_save, "config_login_type") #LDAP configurator, if config.config_login_type == constants.LOGIN_LDAP: - reboot_required |= _config_string("config_ldap_provider_url") - reboot_required |= _config_int("config_ldap_port") - reboot_required |= _config_int("config_ldap_authentication") - reboot_required |= _config_string("config_ldap_dn") - reboot_required |= _config_string("config_ldap_serv_username") - reboot_required |= _config_string("config_ldap_user_object") - reboot_required |= _config_string("config_ldap_group_object_filter") - reboot_required |= _config_string("config_ldap_group_members_field") - reboot_required |= _config_checkbox("config_ldap_openldap") - reboot_required |= _config_int("config_ldap_encryption") - reboot_required |= _config_string("config_ldap_cert_path") - _config_string("config_ldap_group_name") - if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "": - reboot_required |= 1 - config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8') - config.save() - - if not config.config_ldap_provider_url \ - or not config.config_ldap_port \ - or not config.config_ldap_dn \ - or not config.config_ldap_user_object: - return _configuration_result(_('Please Enter a LDAP Provider, ' - 'Port, DN and User Object Identifier'), gdriveError) - - if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS: - if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE: - if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password): - return _configuration_result('Please Enter a LDAP Service Account and Password', gdriveError) - else: - if not config.config_ldap_serv_username: - return _configuration_result('Please Enter a LDAP Service Account', gdriveError) - - if config.config_ldap_group_object_filter: - if config.config_ldap_group_object_filter.count("%s") != 1: - return _configuration_result(_('LDAP Group Object Filter Needs to Have One "%s" Format Identifier'), - gdriveError) - if config.config_ldap_group_object_filter.count("(") != config.config_ldap_group_object_filter.count(")"): - return _configuration_result(_('LDAP Group Object Filter Has Unmatched Parenthesis'), - gdriveError) - - if config.config_ldap_user_object.count("%s") != 1: - return _configuration_result(_('LDAP User Object Filter needs to Have One "%s" Format Identifier'), - gdriveError) - if config.config_ldap_user_object.count("(") != config.config_ldap_user_object.count(")"): - return _configuration_result(_('LDAP User Object Filter Has Unmatched Parenthesis'), - gdriveError) - - if config.config_ldap_cert_path and not os.path.isdir(config.config_ldap_cert_path): - return _configuration_result(_('LDAP Certificate Location is not Valid, Please Enter Correct Path'), - gdriveError) + reboot_required |= _configuration_ldap_helper(to_save, gdriveError) # Remote login configuration - _config_checkbox("config_remote_login") + _config_checkbox(to_save, "config_remote_login") if not config.config_remote_login: ub.session.query(ub.RemoteAuthToken).filter(ub.RemoteAuthToken.token_type==0).delete() # Goodreads configuration - _config_checkbox("config_use_goodreads") - _config_string("config_goodreads_api_key") - _config_string("config_goodreads_api_secret") + _config_checkbox(to_save, "config_use_goodreads") + _config_string(to_save, "config_goodreads_api_key") + _config_string(to_save, "config_goodreads_api_secret") if services.goodreads_support: services.goodreads_support.connect(config.config_goodreads_api_key, config.config_goodreads_api_secret, config.config_use_goodreads) - _config_int("config_updatechannel") + _config_int(to_save, "config_updatechannel") # Reverse proxy login configuration - _config_checkbox("config_allow_reverse_proxy_header_login") - _config_string("config_reverse_proxy_login_header_name") + _config_checkbox(to_save, "config_allow_reverse_proxy_header_login") + _config_string(to_save, "config_reverse_proxy_login_header_name") - # GitHub OAuth configuration + # OAuth configuration if config.config_login_type == constants.LOGIN_OAUTH: - active_oauths = 0 - - for element in oauthblueprints: - if to_save["config_" + str(element['id']) + "_oauth_client_id"] != element['oauth_client_id'] \ - or to_save["config_" + str(element['id']) + "_oauth_client_secret"] != element['oauth_client_secret']: - reboot_required = True - element['oauth_client_id'] = to_save["config_" + str(element['id']) + "_oauth_client_id"] - element['oauth_client_secret'] = to_save["config_" + str(element['id']) + "_oauth_client_secret"] - if to_save["config_"+str(element['id'])+"_oauth_client_id"] \ - and to_save["config_"+str(element['id'])+"_oauth_client_secret"]: - active_oauths += 1 - element["active"] = 1 - else: - element["active"] = 0 - ub.session.query(ub.OAuthProvider).filter(ub.OAuthProvider.id == element['id']).update( - {"oauth_client_id":to_save["config_"+str(element['id'])+"_oauth_client_id"], - "oauth_client_secret":to_save["config_"+str(element['id'])+"_oauth_client_secret"], - "active":element["active"]}) - - - reboot_required |= _config_int("config_log_level") - reboot_required |= _config_string("config_logfile") - if not logger.is_valid_logfile(config.config_logfile): - return _configuration_result(_('Logfile Location is not Valid, Please Enter Correct Path'), gdriveError) - - reboot_required |= _config_checkbox_int("config_access_log") - reboot_required |= _config_string("config_access_logfile") - if not logger.is_valid_logfile(config.config_access_logfile): - return _configuration_result(_('Access Logfile Location is not Valid, Please Enter Correct Path'), gdriveError) + _configuration_oauth_helper(to_save) + reboot_required |= _configuration_logfile_helper(to_save, gdriveError) # Rarfile Content configuration - _config_string("config_rarfile_location") + _config_string(to_save, "config_rarfile_location") unrar_status = helper.check_unrar(config.config_rarfile_location) if unrar_status: return _configuration_result(unrar_status, gdriveError) @@ -660,7 +688,6 @@ def _configuration_update_helper(): return _configuration_result('%s' % e, gdriveError) if db_change: - # reload(db) if not db.setup_db(config): return _configuration_result(_('DB Location is not Valid, Please Enter Correct Path'), gdriveError) @@ -698,6 +725,144 @@ def _configuration_result(error_flash=None, gdriveError=None): title=_(u"Basic Configuration"), page="config") +def _handle_new_user(to_save, content,languages, translations, kobo_support): + content.default_language = to_save["default_language"] + # content.mature_content = "Show_mature_content" in to_save + content.locale = to_save.get("locale", content.locale) + + content.sidebar_view = sum(int(key[5:]) for key in to_save if key.startswith('show_')) + if "show_detail_random" in to_save: + content.sidebar_view |= constants.DETAIL_RANDOM + + content.role = constants.selected_roles(to_save) + + if not to_save["nickname"] or not to_save["email"] or not to_save["password"]: + flash(_(u"Please fill out all fields!"), category="error") + return render_title_template("user_edit.html", new_user=1, content=content, translations=translations, + registered_oauth=oauth_check, kobo_support=kobo_support, + title=_(u"Add new user")) + content.password = generate_password_hash(to_save["password"]) + existing_user = ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == to_save["nickname"].lower()) \ + .first() + existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \ + .first() + if not existing_user and not existing_email: + content.nickname = to_save["nickname"] + if config.config_public_reg and not check_valid_domain(to_save["email"]): + flash(_(u"E-mail is not from valid domain"), category="error") + return render_title_template("user_edit.html", new_user=1, content=content, translations=translations, + registered_oauth=oauth_check, kobo_support=kobo_support, + title=_(u"Add new user")) + else: + content.email = to_save["email"] + else: + flash(_(u"Found an existing account for this e-mail address or nickname."), category="error") + return render_title_template("user_edit.html", new_user=1, content=content, translations=translations, + languages=languages, title=_(u"Add new user"), page="newuser", + kobo_support=kobo_support, registered_oauth=oauth_check) + try: + content.allowed_tags = config.config_allowed_tags + content.denied_tags = config.config_denied_tags + content.allowed_column_value = config.config_allowed_column_value + content.denied_column_value = config.config_denied_column_value + ub.session.add(content) + ub.session.commit() + flash(_(u"User '%(user)s' created", user=content.nickname), category="success") + return redirect(url_for('admin.admin')) + except IntegrityError: + ub.session.rollback() + flash(_(u"Found an existing account for this e-mail address or nickname."), category="error") + + +def _handle_edit_user(to_save, content,languages, translations, kobo_support, downloads): + if "delete" in to_save: + if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, + ub.User.id != content.id).count(): + ub.session.query(ub.User).filter(ub.User.id == content.id).delete() + ub.session.commit() + flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success") + return redirect(url_for('admin.admin')) + else: + flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error") + return redirect(url_for('admin.admin')) + else: + if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, + ub.User.id != content.id).count() and \ + not 'admin_role' in to_save: + flash(_(u"No admin user remaining, can't remove admin role", nick=content.nickname), category="error") + return redirect(url_for('admin.admin')) + + if "password" in to_save and to_save["password"]: + content.password = generate_password_hash(to_save["password"]) + anonymous = content.is_anonymous + content.role = constants.selected_roles(to_save) + if anonymous: + content.role |= constants.ROLE_ANONYMOUS + else: + content.role &= ~constants.ROLE_ANONYMOUS + + val = [int(k[5:]) for k in to_save if k.startswith('show_')] + sidebar = ub.get_sidebar_config() + for element in sidebar: + value = element['visibility'] + if value in val and not content.check_visibility(value): + content.sidebar_view |= value + elif not value in val and content.check_visibility(value): + content.sidebar_view &= ~value + + if "Show_detail_random" in to_save: + content.sidebar_view |= constants.DETAIL_RANDOM + else: + content.sidebar_view &= ~constants.DETAIL_RANDOM + + if "default_language" in to_save: + content.default_language = to_save["default_language"] + if "locale" in to_save and to_save["locale"]: + content.locale = to_save["locale"] + if to_save["email"] and to_save["email"] != content.email: + existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \ + .first() + if not existing_email: + content.email = to_save["email"] + else: + flash(_(u"Found an existing account for this e-mail address."), category="error") + return render_title_template("user_edit.html", + translations=translations, + languages=languages, + mail_configured=config.get_mail_server_configured(), + kobo_support=kobo_support, + new_user=0, + content=content, + downloads=downloads, + registered_oauth=oauth_check, + title=_(u"Edit User %(nick)s", nick=content.nickname), page="edituser") + if "nickname" in to_save and to_save["nickname"] != content.nickname: + # Query User nickname, if not existing, change + if not ub.session.query(ub.User).filter(ub.User.nickname == to_save["nickname"]).scalar(): + content.nickname = to_save["nickname"] + else: + flash(_(u"This username is already taken"), category="error") + return render_title_template("user_edit.html", + translations=translations, + languages=languages, + mail_configured=config.get_mail_server_configured(), + new_user=0, content=content, + downloads=downloads, + registered_oauth=oauth_check, + kobo_support=kobo_support, + title=_(u"Edit User %(nick)s", nick=content.nickname), + page="edituser") + + if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail: + content.kindle_mail = to_save["kindle_mail"] + try: + ub.session.commit() + flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success") + except IntegrityError: + ub.session.rollback() + flash(_(u"An unknown error occured."), category="error") + + @admi.route("/admin/user/new", methods=["GET", "POST"]) @login_required @admin_required @@ -708,52 +873,7 @@ def new_user(): kobo_support = feature_support['kobo'] and config.config_kobo_sync if request.method == "POST": to_save = request.form.to_dict() - content.default_language = to_save["default_language"] - # content.mature_content = "Show_mature_content" in to_save - content.locale = to_save.get("locale", content.locale) - - content.sidebar_view = sum(int(key[5:]) for key in to_save if key.startswith('show_')) - if "show_detail_random" in to_save: - content.sidebar_view |= constants.DETAIL_RANDOM - - content.role = constants.selected_roles(to_save) - - if not to_save["nickname"] or not to_save["email"] or not to_save["password"]: - flash(_(u"Please fill out all fields!"), category="error") - return render_title_template("user_edit.html", new_user=1, content=content, translations=translations, - registered_oauth=oauth_check, kobo_support=kobo_support, - title=_(u"Add new user")) - content.password = generate_password_hash(to_save["password"]) - existing_user = ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == to_save["nickname"].lower())\ - .first() - existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower())\ - .first() - if not existing_user and not existing_email: - content.nickname = to_save["nickname"] - if config.config_public_reg and not check_valid_domain(to_save["email"]): - flash(_(u"E-mail is not from valid domain"), category="error") - return render_title_template("user_edit.html", new_user=1, content=content, translations=translations, - registered_oauth=oauth_check, kobo_support=kobo_support, - title=_(u"Add new user")) - else: - content.email = to_save["email"] - else: - flash(_(u"Found an existing account for this e-mail address or nickname."), category="error") - return render_title_template("user_edit.html", new_user=1, content=content, translations=translations, - languages=languages, title=_(u"Add new user"), page="newuser", - kobo_support=kobo_support, registered_oauth=oauth_check) - try: - content.allowed_tags = config.config_allowed_tags - content.denied_tags = config.config_denied_tags - content.allowed_column_value = config.config_allowed_column_value - content.denied_column_value = config.config_denied_column_value - ub.session.add(content) - ub.session.commit() - flash(_(u"User '%(user)s' created", user=content.nickname), category="success") - return redirect(url_for('admin.admin')) - except IntegrityError: - ub.session.rollback() - flash(_(u"Found an existing account for this e-mail address or nickname."), category="error") + _handle_new_user(to_save, content, languages, translations, kobo_support) else: content.role = config.config_default_role content.sidebar_view = config.config_default_show @@ -778,15 +898,12 @@ def update_mailsettings(): to_save = request.form.to_dict() log.debug("update_mailsettings %r", to_save) - _config_string = lambda x: config.set_from_dictionary(to_save, x, lambda y: y.strip() if y else y) - _config_int = lambda x: config.set_from_dictionary(to_save, x, int) - - _config_string("mail_server") - _config_int("mail_port") - _config_int("mail_use_ssl") - _config_string("mail_login") - _config_string("mail_password") - _config_string("mail_from") + _config_string(to_save, "mail_server") + _config_int(to_save, "mail_port") + _config_int(to_save, "mail_use_ssl") + _config_string(to_save, "mail_login") + _config_string(to_save, "mail_password") + _config_string(to_save, "mail_from") config.save() if to_save.get("test"): @@ -823,96 +940,9 @@ def edit_user(user_id): downloads.append(downloadbook) else: ub.delete_download(book.book_id) - # ub.session.query(ub.Downloads).filter(book.book_id == ub.Downloads.book_id).delete() - # ub.session.commit() if request.method == "POST": to_save = request.form.to_dict() - if "delete" in to_save: - if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, - ub.User.id != content.id).count(): - ub.session.query(ub.User).filter(ub.User.id == content.id).delete() - ub.session.commit() - flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success") - return redirect(url_for('admin.admin')) - else: - flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error") - return redirect(url_for('admin.admin')) - else: - if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN, - ub.User.id != content.id).count() and \ - not 'admin_role' in to_save: - flash(_(u"No admin user remaining, can't remove admin role", nick=content.nickname), category="error") - return redirect(url_for('admin.admin')) - - if "password" in to_save and to_save["password"]: - content.password = generate_password_hash(to_save["password"]) - anonymous = content.is_anonymous - content.role = constants.selected_roles(to_save) - if anonymous: - content.role |= constants.ROLE_ANONYMOUS - else: - content.role &= ~constants.ROLE_ANONYMOUS - - val = [int(k[5:]) for k in to_save if k.startswith('show_')] - sidebar = ub.get_sidebar_config() - for element in sidebar: - value = element['visibility'] - if value in val and not content.check_visibility(value): - content.sidebar_view |= value - elif not value in val and content.check_visibility(value): - content.sidebar_view &= ~value - - if "Show_detail_random" in to_save: - content.sidebar_view |= constants.DETAIL_RANDOM - else: - content.sidebar_view &= ~constants.DETAIL_RANDOM - - if "default_language" in to_save: - content.default_language = to_save["default_language"] - if "locale" in to_save and to_save["locale"]: - content.locale = to_save["locale"] - if to_save["email"] and to_save["email"] != content.email: - existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \ - .first() - if not existing_email: - content.email = to_save["email"] - else: - flash(_(u"Found an existing account for this e-mail address."), category="error") - return render_title_template("user_edit.html", - translations=translations, - languages=languages, - mail_configured = config.get_mail_server_configured(), - kobo_support=kobo_support, - new_user=0, - content=content, - downloads=downloads, - registered_oauth=oauth_check, - title=_(u"Edit User %(nick)s", nick=content.nickname), page="edituser") - if "nickname" in to_save and to_save["nickname"] != content.nickname: - # Query User nickname, if not existing, change - if not ub.session.query(ub.User).filter(ub.User.nickname == to_save["nickname"]).scalar(): - content.nickname = to_save["nickname"] - else: - flash(_(u"This username is already taken"), category="error") - return render_title_template("user_edit.html", - translations=translations, - languages=languages, - mail_configured=config.get_mail_server_configured(), - new_user=0, content=content, - downloads=downloads, - registered_oauth=oauth_check, - kobo_support=kobo_support, - title=_(u"Edit User %(nick)s", nick=content.nickname), - page="edituser") - - if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail: - content.kindle_mail = to_save["kindle_mail"] - try: - ub.session.commit() - flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success") - except IntegrityError: - ub.session.rollback() - flash(_(u"An unknown error occured."), category="error") + _handle_edit_user(to_save, content, languages, translations, kobo_support, downloads) return render_title_template("user_edit.html", translations=translations, languages=languages,