diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..5f2aa90 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,11 @@ +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.8.2 + hooks: + - id: ruff + args: [ --fix ] + - id: ruff-format + - repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort \ No newline at end of file diff --git a/App/Auth/__init__.py b/App/Auth/__init__.py index ed7e55a..27f07cb 100644 --- a/App/Auth/__init__.py +++ b/App/Auth/__init__.py @@ -1,23 +1,28 @@ -from flask import Blueprint, render_template, redirect, url_for, request, session +from flask import (Blueprint, redirect, render_template, request, session, + url_for) + from .auth_model import auth_model -auth_bp = Blueprint('auth_bp', __name__, template_folder='templates') +auth_bp = Blueprint("auth_bp", __name__, template_folder="templates") -@auth_bp.route('/', methods=['GET', 'POST']) + +@auth_bp.route("/", methods=["GET", "POST"]) def auth(): - if request.method == 'GET': - return render_template('auth.html') + if request.method == "GET": + return render_template("auth.html") else: data = request.form.to_dict() auth_data = auth_model(data) if auth_data.status: - session.update({ - 'user_id': auth_data.result[0]['user_ID'], - 'login': auth_data.result[0]['login'], - 'access_user': data['access'], - 'role': auth_data.result[0]['user_role'], - 'permanent': True - }) - return redirect(url_for('index')) + session.update( + { + "user_id": auth_data.result[0]["user_ID"], + "login": auth_data.result[0]["login"], + "access_user": data["access"], + "role": auth_data.result[0]["user_role"], + "permanent": True, + } + ) + return redirect(url_for("index")) else: - return render_template('error.html', error_message=auth_data.error_message) + return render_template("error.html", error_message=auth_data.error_message) diff --git a/App/Auth/auth_model.py b/App/Auth/auth_model.py index b3cab0b..91027dd 100644 --- a/App/Auth/auth_model.py +++ b/App/Auth/auth_model.py @@ -1,29 +1,29 @@ +import os from dataclasses import dataclass -from Database.work import select_list from Database.sql_provider import SQLProvider - +from Database.work import select_list from flask import current_app -import os -sql_provider = SQLProvider(os.path.join(os.path.dirname(__file__), 'sql')) +sql_provider = SQLProvider(os.path.join(os.path.dirname(__file__), "sql")) + + @dataclass class InfoRespronse: result: tuple error_message: str status: bool -def auth_model(input_data) -> InfoRespronse: - db_config = current_app.config['db_config'] - _sql = sql_provider.get('auth.sql', input_data) +def auth_model(input_data) -> InfoRespronse: + db_config = current_app.config["db_config"] + + _sql = sql_provider.get("auth.sql", input_data) user = select_list(db_config, _sql) if user is None: - return InfoRespronse((), - error_message = 'Ошибка при подключении к БД', - status=False) + return InfoRespronse( + (), error_message="Ошибка при подключении к БД", status=False + ) elif len(user) == 0: - return InfoRespronse((), - error_message = 'Пользователь не найден', - status=False) - return InfoRespronse(user, error_message='', status=True) \ No newline at end of file + return InfoRespronse((), error_message="Пользователь не найден", status=False) + return InfoRespronse(user, error_message="", status=True) diff --git a/App/Database/DBconnect.py b/App/Database/DBconnect.py index 2280c13..989665e 100644 --- a/App/Database/DBconnect.py +++ b/App/Database/DBconnect.py @@ -1,7 +1,9 @@ import pymysql from pymysql.err import * + + class DBContextManager: - def __init__(self, db_config : dict): + def __init__(self, db_config: dict): self.db_config = db_config self.connection = None self.cursor = None @@ -9,27 +11,26 @@ class DBContextManager: def __enter__(self): try: self.connection = pymysql.connect( - host=self.db_config['host'], - port=self.db_config['port'], - user=self.db_config['user'], - password=self.db_config['password'], - db=self.db_config['db'], - charset=self.db_config['charset'] + host=self.db_config["host"], + port=self.db_config["port"], + user=self.db_config["user"], + password=self.db_config["password"], + db=self.db_config["db"], + charset=self.db_config["charset"], ) self.cursor = self.connection.cursor() return self.cursor except (OperationalError, KeyError) as err: print(err.args) return None - + def __exit__(self, exc_type, exc_val, exc_tb): if self.connection and self.cursor: if exc_type: - print(exc_type, '\n', exc_val) + print(exc_type, "\n", exc_val) self.connection.rollback() else: self.connection.commit() self.cursor.close() self.connection.close() return True - \ No newline at end of file diff --git a/App/Database/sql_provider.py b/App/Database/sql_provider.py index a3a1855..97e3fcc 100644 --- a/App/Database/sql_provider.py +++ b/App/Database/sql_provider.py @@ -1,14 +1,15 @@ import os from string import Template + class SQLProvider: def __init__(self, file_path): self.scripts = {} for file in os.listdir(file_path): - _sql = open(f'{file_path}/{file}').read() + _sql = open(f"{file_path}/{file}").read() self.scripts[file] = Template(_sql) def get(self, name, params) -> dict: if name not in self.scripts: - raise ValueError(f'SQL template {name} not found') - return self.scripts[name].substitute(**params) \ No newline at end of file + raise ValueError(f"SQL template {name} not found") + return self.scripts[name].substitute(**params) diff --git a/App/Database/work.py b/App/Database/work.py index 6437858..e0105c3 100644 --- a/App/Database/work.py +++ b/App/Database/work.py @@ -1,5 +1,6 @@ from .DBconnect import DBContextManager + def select_list(db_config, sql) -> list: with DBContextManager(db_config) as cursor: if cursor is None: @@ -11,17 +12,15 @@ def select_list(db_config, sql) -> list: lst = [dict(zip(schema, row)) for row in result] return lst + def procedure(db_config, name, args: tuple) -> list: with DBContextManager(db_config) as cursor: if cursor is None: raise ValueError("Cursor not created") else: cursor.callproc(name, args) - result = cursor.fetchall()[0] - schema = cursor.description[0] - lst = dict(zip(schema, result)) - return lst + def transaction(cursor, sql): cursor.execute(sql) - return True \ No newline at end of file + return True diff --git a/App/Report/__init__.py b/App/Report/__init__.py index ee25b68..015b5f9 100644 --- a/App/Report/__init__.py +++ b/App/Report/__init__.py @@ -1,44 +1,38 @@ -from flask import request, Blueprint, render_template, url_for -from checker import check_auth -from os import path -from datetime import date -from .report_model import view_report_model, create_report_model import json +from datetime import date +from os import path -with open(path.join(path.dirname(__file__), 'reports.json'), encoding='utf-8') as f: +from checker import check_auth +from flask import Blueprint, render_template, request, url_for + +from .report_model import create_report_model, view_report_model + +with open(path.join(path.dirname(__file__), "reports.json"), encoding="utf-8") as f: report_list = json.load(f) -report_bp = Blueprint('report_bp', __name__, template_folder='templates', static_folder='static') +report_bp = Blueprint( + "report_bp", __name__, template_folder="templates", static_folder="static" +) -@report_bp.route('/menu') + +@report_bp.route("/menu") @check_auth def menu(): - if request.method == 'GET': - return render_template('report_menu.html') - -# Рекомендации от ИС -# @report_bp.route('/test', methods=['GET']) -# def get_test(): -# return render_template('report_basic.html', -# is_write=True, -# title='Создание отчетов', -# items=report_list, -# date_today=date.today()) + if request.method == "GET": + return render_template("report_menu.html") -# @report_bp.route('/test', methods=['POST']) -# def post_test(): -# report_response = model(request, report_list) -# return view(report_response) - -@report_bp.route('/create', methods=['GET', 'POST']) + +@report_bp.route("/create", methods=["GET", "POST"]) @check_auth def create(): - if request.method == 'GET': - return render_template('report_basic.html', - is_write=True, - title='Создание отчетов', - items=report_list, - date_today=date.today()) + if request.method == "GET": + return render_template( + "report_basic.html", + is_write=True, + title="Создание отчетов", + items=report_list, + date_today=date.today(), + ) else: result = create_report_model(request, report_list) if result.status: @@ -46,20 +40,26 @@ def create(): else: return render_template("error.html", error_message=result.error_message) -@report_bp.route('/view', methods=['GET', 'POST']) + +@report_bp.route("/view", methods=["GET", "POST"]) @check_auth def view(): - if request.method == 'GET': - return render_template('report_basic.html', - is_write=False, - title='Просмотр отчетов', - items=report_list, - date_today=date.today()) + if request.method == "GET": + return render_template( + "report_basic.html", + is_write=False, + title="Просмотр отчетов", + items=report_list, + date_today=date.today(), + ) else: result = view_report_model(request, report_list) if result.status: - return render_template("output.html", items=result.result, - header='Результаты отчёта', - link = url_for('report_bp.menu')) + return render_template( + "output.html", + items=result.result, + header="Результаты отчёта", + link=url_for("report_bp.menu"), + ) else: - return render_template("error.html", error_message=result.error_message) \ No newline at end of file + return render_template("error.html", error_message=result.error_message) diff --git a/App/Report/report_model.py b/App/Report/report_model.py index 9125413..8578ebd 100644 --- a/App/Report/report_model.py +++ b/App/Report/report_model.py @@ -1,81 +1,74 @@ from dataclasses import dataclass - -from Database.work import select_list, procedure -from Database.sql_provider import SQLProvider - -from flask import current_app, session from os import path -sql_provider = SQLProvider(path.join(path.dirname(__file__), 'sql')) +from Database.sql_provider import SQLProvider +from Database.work import procedure, select_list +from flask import current_app, session + +sql_provider = SQLProvider(path.join(path.dirname(__file__), "sql")) + + @dataclass class InfoRespronse: result: tuple error_message: str status: bool -def check_report(input_data: dict) -> bool: - db_config = current_app.config['db_config'] - result = procedure(db_config, 'check_report', tuple(input_data.values())) - if result is None or result['exist'] == 0: +def check_report(input_data: dict) -> bool: + db_config = current_app.config["db_config"] + + result = procedure(db_config, "check_report", tuple(input_data.values())) + if result is None or result["exist"] == 0: return False return True def view_report_model(request, report_list: dict) -> InfoRespronse: - id = request.form.get('category') - month = request.form.get('month') - year = request.form.get('year') - + id = request.form.get("category") + month = request.form.get("month") + year = request.form.get("year") + data = dict(id=id, month=month, year=year) - if session['role'] in report_list[id]['data']['read']: + if session["role"] in report_list[id]["data"]["read"]: status = check_report(data) if not status: - return InfoRespronse((), - error_message = 'Отчет не найден', - status=False) - - db_config = current_app.config['db_config'] - view_script = report_list[id]['data']['view'] - _sql = sql_provider.get(f'{view_script}.sql', data) + return InfoRespronse((), error_message="Отчет не найден", status=False) + + db_config = current_app.config["db_config"] + view_script = report_list[id]["data"]["view"] + _sql = sql_provider.get(f"{view_script}.sql", data) result = select_list(db_config, _sql) - if result is None: - return InfoRespronse((), - error_message = 'Ошибка в подключении к базе данных. Свяжитесь с администратором', - status=False) - return InfoRespronse(result, error_message='', status=True) + return InfoRespronse(result, error_message="", status=True) else: - return InfoRespronse((), - error_message='Недостаточно прав для чтения данного отчета!', - status=False) - + return InfoRespronse( + (), + error_message="Недостаточно прав для чтения данного отчета!", + status=False, + ) + + def create_report_model(request, report_list: dict) -> InfoRespronse: - id = request.form.get('category') - month = request.form.get('month') - year = request.form.get('year') + id = request.form.get("category") + month = request.form.get("month") + year = request.form.get("year") data = dict(id=id, month=month, year=year) - if session['role'] in report_list[id]['data']['write']: - + if session["role"] in report_list[id]["data"]["write"]: status = check_report(data) if status: - return InfoRespronse((), - error_message = 'Отчет уже существует', - status=False) - - db_config = current_app.config['db_config'] - proc_name = report_list[id]['data']['procedure'] - args = tuple(data.values()) - result = procedure(db_config, proc_name, args) + return InfoRespronse((), error_message="Отчет уже существует", status=False) - if result is None: - return InfoRespronse((), - error_message = 'Ошибка в подключении к базе данных. Свяжитесь с администратором', - status=False) - return InfoRespronse((), error_message='', status=True) + db_config = current_app.config["db_config"] + proc_name = report_list[id]["data"]["procedure"] + args = tuple(data.values()) + procedure(db_config, proc_name, args) + return InfoRespronse((), error_message="", status=True) else: - return InfoRespronse((), - error_message='Недостаточно прав для создания данного отчета!', - status=False) \ No newline at end of file + return InfoRespronse( + (), + error_message="Недостаточно прав для создания данного отчета!", + status=False, + ) diff --git a/App/Requests/__init__.py b/App/Requests/__init__.py index f40799e..6c6e471 100644 --- a/App/Requests/__init__.py +++ b/App/Requests/__init__.py @@ -1,44 +1,55 @@ -from flask import request, Blueprint, render_template, url_for -from os import path -from checker import check_auth -from .requests_model import sklad, materials_per_seller, sellers_names, materials_names import json +from os import path -with open(path.join(path.dirname(__file__), 'zapros_menu.json'), encoding='utf-8') as f: +from checker import check_auth +from flask import Blueprint, render_template, request, url_for + +from .requests_model import (materials_names, materials_per_seller, + sellers_names, sklad) + +with open(path.join(path.dirname(__file__), "zapros_menu.json"), encoding="utf-8") as f: requests_list = json.load(f) -requests_bp = Blueprint('requests_bp', __name__, template_folder='templates') +requests_bp = Blueprint("requests_bp", __name__, template_folder="templates") -@requests_bp.route('/', methods=['GET', 'POST']) + +@requests_bp.route("/", methods=["GET", "POST"]) @check_auth def requests(): - if request.method == 'GET': - return render_template('zapros_menu.html', options=requests_list) + if request.method == "GET": + return render_template("zapros_menu.html", options=requests_list) -@requests_bp.route('/sklad', methods=['GET', 'POST']) + +@requests_bp.route("/sklad", methods=["GET", "POST"]) @check_auth def sklad_zapros(): - if request.method == 'GET': - return render_template('zagotovki.html') + if request.method == "GET": + return render_template("zagotovki.html") else: zagotovki = sklad(request) if zagotovki.status: material = dict(request.form) - header = f'Заготовки на складе из материала \"{material["material"]}\"' - return render_template('output.html', items=zagotovki.result, header=header) + header = f'Заготовки на складе из материала "{material["material"]}"' + return render_template("output.html", items=zagotovki.result, header=header) else: - return render_template('error.html', error_message=zagotovki.error_message) - -@requests_bp.route('/shipments', methods=['GET', 'POST']) + return render_template("error.html", error_message=zagotovki.error_message) + + +@requests_bp.route("/shipments", methods=["GET", "POST"]) @check_auth def sellers_ship(): - if request.method == 'GET': - return render_template('sellers_ship.html') + if request.method == "GET": + return render_template("sellers_ship.html") else: zagotovki = materials_per_seller(request) if zagotovki.status: seller = dict(request.form) - header = f'Поставки от поставщика \"{seller["seller"]}\"' - return render_template('output.html', items=zagotovki.result, header=header, link=url_for('requests_bp.requests')) + header = f'Поставки от поставщика "{seller["seller"]}"' + return render_template( + "output.html", + items=zagotovki.result, + header=header, + link=url_for("requests_bp.requests"), + ) else: - return render_template('error.html', error_message=zagotovki.error_message) \ No newline at end of file + return render_template("error.html", error_message=zagotovki.error_message) diff --git a/App/Requests/requests_model.py b/App/Requests/requests_model.py index e23513b..5f38c03 100644 --- a/App/Requests/requests_model.py +++ b/App/Requests/requests_model.py @@ -1,61 +1,73 @@ from dataclasses import dataclass - -from Database.work import select_list -from Database.sql_provider import SQLProvider - -from flask import current_app from os import path -sql_provider = SQLProvider(path.join(path.dirname(__file__), 'sql')) +from Database.sql_provider import SQLProvider +from Database.work import select_list +from flask import current_app + +sql_provider = SQLProvider(path.join(path.dirname(__file__), "sql")) + + @dataclass class InfoRespronse: result: tuple error_message: str status: bool -def sellers_names() -> InfoRespronse: - db_config = current_app.config['db_config'] - _sql = sql_provider.get('sellers_names.sql', {}) +def sellers_names() -> InfoRespronse: + db_config = current_app.config["db_config"] + + _sql = sql_provider.get("sellers_names.sql", {}) result = select_list(db_config, _sql) if result is None: - return InfoRespronse((), - error_message = 'Ошибка в подключении к базе данных. Свяжитесь с администратором', - status=False) - return InfoRespronse(result, error_message='', status=True) + return InfoRespronse( + (), + error_message="Ошибка в подключении к базе данных. Свяжитесь с администратором", + status=False, + ) + return InfoRespronse(result, error_message="", status=True) + def materials_names() -> InfoRespronse: - db_config = current_app.config['db_config'] - - _sql = sql_provider.get('materials_names.sql', {}) + db_config = current_app.config["db_config"] + + _sql = sql_provider.get("materials_names.sql", {}) result = select_list(db_config, _sql) if result is None: - return InfoRespronse((), - error_message = 'Ошибка в подключении к базе данных. Свяжитесь с администратором', - status=False) - return InfoRespronse(result, error_message='', status=True) + return InfoRespronse( + (), + error_message="Ошибка в подключении к базе данных. Свяжитесь с администратором", + status=False, + ) + return InfoRespronse(result, error_message="", status=True) + def sklad(request) -> InfoRespronse: - db_config = current_app.config['db_config'] + db_config = current_app.config["db_config"] material = dict(request.form) - _sql = sql_provider.get('sklad_material.sql', material) + _sql = sql_provider.get("sklad_material.sql", material) result = select_list(db_config, _sql) if result is None: - return InfoRespronse((), - error_message = 'Ошибка в подключении к базе данных. Свяжитесь с администратором', - status=False) - return InfoRespronse(result, error_message='', status=True) + return InfoRespronse( + (), + error_message="Ошибка в подключении к базе данных. Свяжитесь с администратором", + status=False, + ) + return InfoRespronse(result, error_message="", status=True) def materials_per_seller(request) -> InfoRespronse: - db_config = current_app.config['db_config'] + db_config = current_app.config["db_config"] seller = dict(request.form) - _sql = sql_provider.get('ship_seller.sql', seller) + _sql = sql_provider.get("ship_seller.sql", seller) result = select_list(db_config, _sql) if result is None: - return InfoRespronse((), - error_message = 'Ошибка в подключении к базе данных. Свяжитесь с администратором', - status=False) - return InfoRespronse(result, error_message='', status=True) \ No newline at end of file + return InfoRespronse( + (), + error_message="Ошибка в подключении к базе данных. Свяжитесь с администратором", + status=False, + ) + return InfoRespronse(result, error_message="", status=True) diff --git a/App/Requests/sql/ship_seller.sql b/App/Requests/sql/ship_seller.sql index e0c182c..11edb10 100644 --- a/App/Requests/sql/ship_seller.sql +++ b/App/Requests/sql/ship_seller.sql @@ -1,6 +1,6 @@ SELECT w.waybill_date AS 'Дата поставки', - SUM(w.total) AS 'Общая сумма', - SUM(wl.amount) as 'Количество' + SUM(w.total) AS 'Общая сумма (в рублях)', + SUM(wl.amount) as 'Общее количество заготовок' FROM waybill w JOIN (SELECT waybill_id, SUM(amount) AS amount FROM waybill_lines wl GROUP BY waybill_id)wl USING (waybill_id) JOIN (SELECT user_id, sel_id FROM external_users) eu USING(user_id) diff --git a/App/Requests/sql/sklad_material.sql b/App/Requests/sql/sklad_material.sql index 2a154a5..10de13c 100644 --- a/App/Requests/sql/sklad_material.sql +++ b/App/Requests/sql/sklad_material.sql @@ -1,4 +1,4 @@ -SELECT weight AS 'Вес', price AS 'Цена', +SELECT weight AS 'Вес', price AS 'Цена (в рублях)', count AS 'Количество', last_update AS 'Дата последнего обновления' FROM workpiece WHERE material = '$material' \ No newline at end of file diff --git a/App/Waybill/__init__.py b/App/Waybill/__init__.py index 2701981..406918d 100644 --- a/App/Waybill/__init__.py +++ b/App/Waybill/__init__.py @@ -1,47 +1,61 @@ -from flask import Blueprint, render_template, redirect, url_for, session, request -from checker import check_auth -from .model import index_waybill, form_waybill, clear, button_click, transaction_order_model from datetime import datetime -waybill_bp = Blueprint('waybill_bp', __name__, template_folder='templates', static_folder='static') +from checker import check_auth +from flask import (Blueprint, redirect, render_template, request, session, + url_for) -@waybill_bp.route('/', methods=['GET']) +from .model import (button_click, clear, form_waybill, index_waybill, + transaction_order_model) + +waybill_bp = Blueprint( + "waybill_bp", __name__, template_folder="templates", static_folder="static" +) + + +@waybill_bp.route("/", methods=["GET"]) @check_auth def index(): lst = index_waybill() if lst is not None: waybill = form_waybill() - return render_template('waybill.html', items=lst, waybill=waybill) + return render_template("waybill.html", items=lst, waybill=waybill) else: - return render_template('error.html', error_message="Ошибка в подключении к СУБД") + return render_template( + "error.html", error_message="Ошибка в подключении к СУБД" + ) -@waybill_bp.route('/', methods=['POST']) + +@waybill_bp.route("/", methods=["POST"]) @check_auth def waybill_main(): status = button_click(request) if status: - return redirect(url_for('waybill_bp.index')) + return redirect(url_for("waybill_bp.index")) else: - return render_template("error.html", error_message="Товар не был добавлен в корзину") + return render_template( + "error.html", error_message="Товар не был добавлен в корзину" + ) -@waybill_bp.route('/clear', methods=['GET']) + +@waybill_bp.route("/clear", methods=["GET"]) @check_auth def clear_waybill(): clear() - return redirect(url_for('waybill_bp.index')) + return redirect(url_for("waybill_bp.index")) -@waybill_bp.route('/save_order') + +@waybill_bp.route("/save_order") @check_auth def save_order(): - if not session.get('waybill',{}): - return redirect(url_for('waybill_bp.index')) - - user_id = session.get('user_id',"") + if not session.get("waybill", {}): + return redirect(url_for("waybill_bp.index")) + + user_id = session.get("user_id", "") current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") result = transaction_order_model(user_id, current_date) if result.status: print("Order success") - return render_template("order_finish.html", order_id = result.result[0]) + return render_template("order_finish.html", order_id=result.result[0]) else: - return render_template("error.html", error_message=result.error_message) \ No newline at end of file + return render_template("error.html", error_message=result.error_message) diff --git a/App/Waybill/model.py b/App/Waybill/model.py index 075f385..4335e07 100644 --- a/App/Waybill/model.py +++ b/App/Waybill/model.py @@ -1,12 +1,13 @@ -from Database.sql_provider import SQLProvider -from Database.work import select_list -from Database.DBconnect import DBContextManager - -from flask import current_app, session +import os from dataclasses import dataclass from datetime import date + from cache.wrapper import fetch_from_cache -import os +from Database.DBconnect import DBContextManager +from Database.sql_provider import SQLProvider +from Database.work import select_list +from flask import current_app, session + @dataclass class InfoRespronse: @@ -14,107 +15,114 @@ class InfoRespronse: error_message: str status: bool -sql_provider = SQLProvider(os.path.join(os.path.dirname(__file__), 'sql')) + +sql_provider = SQLProvider(os.path.join(os.path.dirname(__file__), "sql")) + def clear(): - if session.get('waybill',{}): - session.pop('waybill') + if session.get("waybill", {}): + session.pop("waybill") + def form_waybill() -> list: - db_config = current_app.config['db_config'] - cache_config = current_app.config['cache_config'] + db_config = current_app.config["db_config"] + cache_config = current_app.config["cache_config"] - current_waybill = session.get('waybill',{}) + current_waybill = session.get("waybill", {}) waybill = [] - for k,v in current_waybill.items(): - _sql = sql_provider.get('one_good.sql', dict(work_id=k)) - cache_select = fetch_from_cache(f'product_{k}', cache_config)(select_list) + for k, v in current_waybill.items(): + _sql = sql_provider.get("one_good.sql", dict(work_id=k)) + cache_select = fetch_from_cache(f"product_{k}", cache_config)(select_list) product = cache_select(db_config, _sql)[0] - product['amount'] = v + product["amount"] = v waybill.append(product) return waybill -def index_waybill() -> list: - db_config = current_app.config['db_config'] - cache_config = current_app.config['cache_config'] - cache_select = fetch_from_cache('items_cached', cache_config)(select_list) - _sql = sql_provider.get('goods.sql', {}) +def index_waybill() -> list: + db_config = current_app.config["db_config"] + cache_config = current_app.config["cache_config"] + + cache_select = fetch_from_cache("items_cached", cache_config)(select_list) + _sql = sql_provider.get("goods.sql", {}) products = cache_select(db_config, _sql) - if products == None: - return [] - + return products -def button_click(request): - db_config = current_app.config['db_config'] - data = dict(work_id=int(request.form['product_display'])) - _sql = sql_provider.get('one_good.sql', data) +def button_click(request): + db_config = current_app.config["db_config"] + data = dict(work_id=int(request.form["product_display"])) + + _sql = sql_provider.get("one_good.sql", data) result = select_list(db_config, _sql) - if result == None: + if result is None: return False - + product = result[0] - if request.form.get('add'): - if 'waybill' not in session: - session['waybill'] = dict() - session['total'] = '0' - - if str(product['work_id']) in session['waybill']: - pr_id = product['work_id'] - price = product['price'] - amount = int(session['waybill'][str(pr_id)]) - session['waybill'][str(pr_id)] = str(amount+1) - session['total'] = str(int(session['total']) + price) + if request.form.get("add"): + if "waybill" not in session: + session["waybill"] = dict() + session["total"] = "0" + + if str(product["work_id"]) in session["waybill"]: + pr_id = product["work_id"] + price = product["price"] + amount = int(session["waybill"][str(pr_id)]) + session["waybill"][str(pr_id)] = str(amount + 1) + session["total"] = str(int(session["total"]) + price) session.modified = True else: print("NEW WORKPIECE") - pr_id = product['work_id'] - price = product['price'] - session['waybill'][str(pr_id)] = '1' - session['total'] = str(int(session['total']) + price) - print(session['waybill']) + pr_id = product["work_id"] + price = product["price"] + session["waybill"][str(pr_id)] = "1" + session["total"] = str(int(session["total"]) + price) + print(session["waybill"]) session.modified = True - elif request.form.get('product_display_minus'): + elif request.form.get("product_display_minus"): # decreasing count in waybill - - amount = int(session['waybill'][str(product['work_id'])]) + + amount = int(session["waybill"][str(product["work_id"])]) if amount == 1: - session['waybill'].pop(str(product['work_id'])) + session["waybill"].pop(str(product["work_id"])) else: - session['waybill'][str(product['work_id'])] = str(amount-1) - session['total'] = str(int(session['total']) - product['price']) + session["waybill"][str(product["work_id"])] = str(amount - 1) + session["total"] = str(int(session["total"]) - product["price"]) session.modified = True return True -def transaction_order_model(user_id: int, current_date: date): - db_config = current_app.config['db_config'] - waybill = session.get('waybill',{}) - total = session.get('total', 0) +def transaction_order_model(user_id: int, current_date: date) -> InfoRespronse: + db_config = current_app.config["db_config"] + waybill = session.get("waybill", {}) + total = session.get("total", 0) + result = None # Чтобы всё это шло как одна транзакция with DBContextManager(db_config) as cursor: - + if cursor is None: + raise ValueError("Cursor not created") data = dict(e_user_id=user_id, e_order_date=current_date, e_total=total) - try: - _sql = sql_provider.get('create_order.sql', data) - cursor.execute(_sql) - - order_id = cursor.lastrowid - for key, value in waybill.items(): - _sql = sql_provider.get('insert_order_line.sql', - dict(e_order_id = order_id, - e_price = 0, - e_prod_id = int(key), - e_amount = int(value))) - cursor.execute(_sql) - except: - return InfoRespronse((), error_message="Заказ не был создан", status=False) + _sql = sql_provider.get("create_order.sql", data) + cursor.execute(_sql) - result = tuple([order_id]) + order_id = cursor.lastrowid + for key, value in waybill.items(): + _sql = sql_provider.get( + "insert_order_line.sql", + dict( + e_order_id=order_id, + e_price=0, + e_prod_id=int(key), + e_amount=int(value), + ), + ) + cursor.execute(_sql) + result = tuple([order_id]) + if result is None: + return InfoRespronse((), error_message="Заказ не был создан", status=False) clear() - return InfoRespronse(result, error_message="", status=True) \ No newline at end of file + return InfoRespronse(result, error_message="", status=True) diff --git a/App/app.py b/App/app.py index 7f96e0e..97f561d 100644 --- a/App/app.py +++ b/App/app.py @@ -1,32 +1,52 @@ +import json +import os + +from Auth import auth_bp from flask import Flask, render_template, session +from Report import report_bp from Requests import requests_bp from Waybill import waybill_bp -from Auth import auth_bp -from Report import report_bp -import os, json app = Flask(__name__) -app.secret_key = 'suplex' +app.secret_key = "suplex" app.config.update( - db_config=json.load(open(os.path.join(os.path.dirname(__file__), 'data/config.json'), encoding='utf-8')), - db_access=json.load(open(os.path.join(os.path.dirname(__file__), 'data/db_access.json'), encoding='utf-8')), - cache_config=json.load(open(os.path.join(os.path.dirname(__file__), 'data/redis_config.json'), encoding='utf-8')) + db_config=json.load( + open( + os.path.join(os.path.dirname(__file__), "data/config.json"), + encoding="utf-8", + ) + ), + db_access=json.load( + open( + os.path.join(os.path.dirname(__file__), "data/db_access.json"), + encoding="utf-8", + ) + ), + cache_config=json.load( + open( + os.path.join(os.path.dirname(__file__), "data/redis_config.json"), + encoding="utf-8", + ) + ), ) -app.register_blueprint(requests_bp, url_prefix='/requests') -app.register_blueprint(auth_bp, url_prefix='/auth') -app.register_blueprint(report_bp, url_prefix='/report') -app.register_blueprint(waybill_bp, url_prefix='/waybill') +app.register_blueprint(requests_bp, url_prefix="/requests") +app.register_blueprint(auth_bp, url_prefix="/auth") +app.register_blueprint(report_bp, url_prefix="/report") +app.register_blueprint(waybill_bp, url_prefix="/waybill") -@app.route('/') + +@app.route("/") def index(): - return render_template('main_menu.html', ses=session) + return render_template("main_menu.html", ses=session) -@app.route('/logout') + +@app.route("/logout") def logout(): session.clear() - return render_template('main_menu.html', ses=session) + return render_template("main_menu.html", ses=session) -if __name__ == '__main__': - app.run(port=5002, host='0.0.0.0') \ No newline at end of file + +if __name__ == "__main__": + app.run(port=5002, host="0.0.0.0") diff --git a/App/cache/__init__.py b/App/cache/__init__.py index d902592..cee4562 100644 --- a/App/cache/__init__.py +++ b/App/cache/__init__.py @@ -1,5 +1,7 @@ import json -from redis import Redis, ConnectionError, DataError + +from redis import ConnectionError, DataError, Redis + class RedisCache: def __init__(self, config: dict): @@ -36,7 +38,7 @@ class RedisCache: return value_dict else: return None - + def __exit__(self, exc_type, exc_val, exc_tb): self.conn.close() - return True \ No newline at end of file + return True diff --git a/App/cache/wrapper.py b/App/cache/wrapper.py index 11cd216..ab4e96e 100644 --- a/App/cache/wrapper.py +++ b/App/cache/wrapper.py @@ -1,15 +1,18 @@ from functools import wraps + from . import RedisCache + def fetch_from_cache(cache_name: str, cache_config: dict): - cache_conn = RedisCache(cache_config['redis']) - ttl = cache_config['ttl'] + cache_conn = RedisCache(cache_config["redis"]) + ttl = cache_config["ttl"] """ It checks if a cached value exists for a given cache_name. If a cached value exists, it returns the cached value. If no cached value exists, it calls the original function f with the provided arguments, caches the result, and then returns the result. """ + def decorator(f): @wraps(f) def wrapper(*args, **kwargs): @@ -19,7 +22,9 @@ def fetch_from_cache(cache_name: str, cache_config: dict): return cached_value response = f(*args, **kwargs) print("response=", response) - cache_conn.set_value(cache_name,response,ttl) + cache_conn.set_value(cache_name, response, ttl) return response + return wrapper - return decorator \ No newline at end of file + + return decorator diff --git a/App/checker.py b/App/checker.py index 343d9b2..ef3473c 100644 --- a/App/checker.py +++ b/App/checker.py @@ -1,19 +1,22 @@ -from flask import redirect, url_for, session, request, current_app, render_template from functools import wraps +from flask import (current_app, redirect, render_template, request, session, + url_for) + def check_auth(func): @wraps(func) def wrapper(*args, **kwargs): - if 'login' not in session: - return redirect(url_for('auth_bp.auth')) - user_role = session.get('role') + if "login" not in session: + return redirect(url_for("auth_bp.auth")) + user_role = session.get("role") user_request = request.endpoint - print('request_endpoint=', user_request) - user_bp = user_request.split('.')[0] - access = current_app.config['db_access'] + print("request_endpoint=", user_request) + user_bp = user_request.split(".")[0] + access = current_app.config["db_access"] if user_role in access and user_bp in access[user_role]: return func(*args, **kwargs) else: - return render_template('error.html', error_message='Недостаточно прав') - return wrapper \ No newline at end of file + return render_template("error.html", error_message="Недостаточно прав") + + return wrapper