Выделение БД и кэша как отдельные модули

This commit is contained in:
2024-12-04 16:38:30 +03:00
parent 76525f19d3
commit 580b7d07e5
15 changed files with 11 additions and 129 deletions

View File

@@ -13,7 +13,8 @@ class DBContextManager:
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['db']
db=self.db_config['db'],
charset=self.db_config['charset']
)
self.cursor = self.connection.cursor()
return self.cursor

View File

@@ -11,7 +11,7 @@ def select_list(db_config, sql) -> list:
lst = [dict(zip(schema, row)) for row in result]
return lst
def procedure(db_config, name, args: tuple):
def procedure(db_config, name, args: tuple) -> list:
with DBContextManager(db_config) as cursor:
if cursor is None:
raise ValueError("Cursor not created")
@@ -20,4 +20,8 @@ def procedure(db_config, name, args: tuple):
result = cursor.fetchall()[0]
schema = cursor.description[0]
lst = dict(zip(schema, result))
return lst
return lst
def transaction(cursor, sql):
cursor.execute(sql)
return True

View File

@@ -1,34 +0,0 @@
import pymysql
from pymysql.err import *
class DBContextManager:
def __init__(self, db_config : dict):
self.db_config = db_config
self.connection = None
self.cursor = None
def __enter__(self):
try:
self.connection = pymysql.connect(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['db']
)
self.cursor = self.connection.cursor()
return self.cursor
except (OperationalError, KeyError) as err:
print(err.args)
return None
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection and self.cursor:
if exc_type:
print(exc_type, '\n', exc_val)
self.connection.rollback()
else:
self.connection.commit()
self.cursor.close()
self.connection.close()
return True

View File

@@ -1,12 +0,0 @@
from .DBconnect import DBContextManager
def select_list(db_config, sql) -> list:
with DBContextManager(db_config) as cursor:
if cursor is None:
raise ValueError("Cursor not created")
else:
cursor.execute(sql)
result = cursor.fetchall()
schema = [item[0] for item in cursor.description]
lst = [dict(zip(schema, row)) for row in result]
return lst

View File

@@ -1,14 +0,0 @@
import os
from string import Template
class SQLProvider:
def __init__(self, file_path):
self.scripts = {}
for file in os.listdir(file_path):
_sql = open(f'{file_path}/{file}').read()
self.scripts[file] = Template(_sql)
def get(self, name, params) -> dict:
if name not in self.scripts:
raise ValueError(f'SQL template {name} not found')
return self.scripts[name].substitute(**params)

View File

@@ -1,34 +0,0 @@
import pymysql
from pymysql.err import *
class DBContextManager:
def __init__(self, db_config : dict):
self.db_config = db_config
self.connection = None
self.cursor = None
def __enter__(self):
try:
self.connection = pymysql.connect(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['db']
)
self.cursor = self.connection.cursor()
return self.cursor
except (OperationalError, KeyError) as err:
print(err.args)
return None
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection and self.cursor:
if exc_type:
print(exc_type, '\n', exc_val)
self.connection.rollback()
else:
self.connection.commit()
self.cursor.close()
self.connection.close()
return True

View File

@@ -1,14 +0,0 @@
import os
from string import Template
class SQLProvider:
def __init__(self, file_path):
self.scripts = {}
for file in os.listdir(file_path):
_sql = open(f'{file_path}/{file}').read()
self.scripts[file] = Template(_sql)
def get(self, name, params) -> dict:
if name not in self.scripts:
raise ValueError(f'SQL template {name} not found')
return self.scripts[name].substitute(**params)

View File

@@ -1,16 +0,0 @@
from .DBconnect import DBContextManager
def select_list(db_config, sql) -> list:
with DBContextManager(db_config) as cursor:
if cursor is None:
raise ValueError("Cursor not created")
else:
cursor.execute(sql)
result = cursor.fetchall()
schema = [item[0] for item in cursor.description]
lst = [dict(zip(schema, row)) for row in result]
return lst
def transaction(cursor, sql):
cursor.execute(sql)
return True

View File

@@ -1,5 +1,5 @@
from functools import wraps
from Waybill.cache import RedisCache
from . import RedisCache
def fetch_from_cache(cache_name: str, cache_config: dict):
cache_conn = RedisCache(cache_config['redis'])

View File

@@ -3,5 +3,6 @@
"port": 3306,
"user": "manager",
"password": "ilikepizza",
"db": "sklad"
"db": "sklad",
"charset": "utf8"
}