import json from redis import ConnectionError, DataError, Redis class RedisCache: def __init__(self, config: dict): self.config = config self.conn = self._connect() def _connect(self): try: conn = Redis(**self.config) return conn except DataError as err: print(err) return None def set_value(self, name: str, value_dict: dict, ttl: int): try: value_js = json.dumps(value_dict) self.conn.set(name=name, value=value_js) if ttl > 0: self.conn.expire(name, ttl) return True except ConnectionError as err: print(err) return False def get_value(self, name: str): try: value_js = self.conn.get(name) except: return None value_js = self.conn.get(name) if value_js: value_dict = json.loads(value_js) return value_dict else: return None def __exit__(self, exc_type, exc_val, exc_tb): self.conn.close() return True