This repository has been archived on 2025-10-24. You can view files and clone it, but cannot push or open issues or pull requests.
Files
RIS/App/cache/__init__.py
2024-12-08 12:17:19 +03:00

45 lines
1.1 KiB
Python

import json
from redis import ConnectionError, DataError, Redis
class RedisCache:
    """Small JSON-over-Redis cache wrapper.

    Values are serialized with ``json.dumps`` before being stored and
    deserialized with ``json.loads`` when read back. The instance can be
    used as a context manager; the connection is closed on exit.
    """

    def __init__(self, config: dict):
        """Store *config* and build the client from it.

        Args:
            config: Keyword arguments forwarded verbatim to ``Redis(...)``.
        """
        self.config = config
        self.conn = self._connect()

    def _connect(self):
        """Create the Redis client from ``self.config``.

        Returns:
            The client object, or ``None`` when ``Redis(...)`` raises
            ``DataError`` (the error is printed).
        """
        try:
            return Redis(**self.config)
        except DataError as err:
            print(err)
            return None

    def set_value(self, name: str, value_dict: dict, ttl: int) -> bool:
        """Store *value_dict* under *name* as a JSON string.

        Args:
            name: Key to write.
            value_dict: JSON-serializable payload.
            ttl: Lifetime in seconds; a value <= 0 stores the key without
                an expiry.

        Returns:
            ``True`` on success, ``False`` when there is no client or the
            connection fails.

        Raises:
            TypeError: If *value_dict* is not JSON-serializable.
        """
        if self.conn is None:
            # _connect() failed earlier; report failure instead of crashing
            # with AttributeError.
            return False
        value_js = json.dumps(value_dict)
        try:
            # Passing the expiry to SET keeps write + TTL in a single
            # command, so a failure between the two cannot leave behind a
            # key that never expires.
            self.conn.set(name=name, value=value_js, ex=ttl if ttl > 0 else None)
        except ConnectionError as err:
            print(err)
            return False
        return True

    def get_value(self, name: str):
        """Fetch and decode the value stored under *name*.

        Returns:
            The decoded object, or ``None`` when the key is missing, the
            stored value is empty, there is no client, or the lookup fails.

        Raises:
            json.JSONDecodeError: If the stored value is not valid JSON.
        """
        if self.conn is None:
            return None
        try:
            # Exactly one round trip; a failed lookup is deliberately
            # treated as a cache miss so callers can fall back to the
            # source of truth.
            value_js = self.conn.get(name)
        except Exception as err:
            print(err)
            return None
        if not value_js:
            return None
        return json.loads(value_js)

    def __enter__(self):
        """Return the cache itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when leaving a ``with`` block.

        Returns ``False`` so that an exception raised inside the block
        propagates to the caller instead of being silently discarded.
        """
        if self.conn is not None:
            self.conn.close()
        return False