pre-commit changes

This commit is contained in:
2024-12-08 12:17:19 +03:00
parent 2852ce1f96
commit 87e9029b09
15 changed files with 400 additions and 293 deletions

View File

@@ -1,7 +1,9 @@
import pymysql
from pymysql.err import *
class DBContextManager:
def __init__(self, db_config : dict):
def __init__(self, db_config: dict):
self.db_config = db_config
self.connection = None
self.cursor = None
@@ -9,27 +11,26 @@ class DBContextManager:
def __enter__(self):
try:
self.connection = pymysql.connect(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['db'],
charset=self.db_config['charset']
host=self.db_config["host"],
port=self.db_config["port"],
user=self.db_config["user"],
password=self.db_config["password"],
db=self.db_config["db"],
charset=self.db_config["charset"],
)
self.cursor = self.connection.cursor()
return self.cursor
except (OperationalError, KeyError) as err:
print(err.args)
return None
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection and self.cursor:
if exc_type:
print(exc_type, '\n', exc_val)
print(exc_type, "\n", exc_val)
self.connection.rollback()
else:
self.connection.commit()
self.cursor.close()
self.connection.close()
return True

View File

@@ -1,14 +1,15 @@
import os
from string import Template
class SQLProvider:
def __init__(self, file_path):
self.scripts = {}
for file in os.listdir(file_path):
_sql = open(f'{file_path}/{file}').read()
_sql = open(f"{file_path}/{file}").read()
self.scripts[file] = Template(_sql)
def get(self, name, params) -> dict:
if name not in self.scripts:
raise ValueError(f'SQL template {name} not found')
return self.scripts[name].substitute(**params)
raise ValueError(f"SQL template {name} not found")
return self.scripts[name].substitute(**params)

View File

@@ -1,5 +1,6 @@
from .DBconnect import DBContextManager
def select_list(db_config, sql) -> list:
with DBContextManager(db_config) as cursor:
if cursor is None:
@@ -11,6 +12,7 @@ def select_list(db_config, sql) -> list:
lst = [dict(zip(schema, row)) for row in result]
return lst
def procedure(db_config, name, args: tuple) -> list:
with DBContextManager(db_config) as cursor:
if cursor is None:
@@ -22,6 +24,7 @@ def procedure(db_config, name, args: tuple) -> list:
lst = dict(zip(schema, result))
return lst
def transaction(cursor, sql):
cursor.execute(sql)
return True
return True