Source code for dazzler.contrib.redis
import json
import os
from typing import Any
from aiohttp import web
from dazzler.events import DAZZLER_SETUP, DAZZLER_STOP
from dazzler.system import Middleware, UNDEFINED
from dazzler.system.session import SessionBackEnd
[docs]async def get_redis_pool():
import aioredis
return await aioredis.create_redis_pool(
os.getenv('REDIS_URL', 'redis://localhost:6379')
)
[docs]class RedisMiddleware(Middleware):
"""
Insert the aioredis pool instance into the aiohttp app and requests.
:type redis: aioredis.Redis
:type app: dazzler.Dazzler
"""
[docs] def __init__(self, app, redis=None):
self.app = app
self.redis = redis
app.events.subscribe('dazzler_setup', self._setup)
async def _setup(self, _):
if not self.redis:
self.redis = await get_redis_pool()
self.app.server.app['redis'] = self.redis
async def __call__(self, request: web.Request):
request['redis'] = self.redis
[docs]class RedisSessionBackend(SessionBackEnd):
"""
Backed by aioredis.
Values are serialized to json first to keep the types.
Install with ``pip install dazzler[redis]``
:seealso: https://aioredis.readthedocs.io/
:type redis: aioredis.Redis
"""
[docs] def __init__(self, app, redis=None):
super().__init__(app)
self.redis = redis
app.events.subscribe(DAZZLER_SETUP, self._setup)
app.events.subscribe(DAZZLER_STOP, self._cleanup)
async def _cleanup(self, _):
self.redis.close()
await self.redis.wait_closed()
async def _setup(self, _):
if not self.redis and 'redis' in self.app.server.app:
self.redis = self.app.server.app['redis']
else:
self.redis = await get_redis_pool()
[docs] async def set(self, session_id: str, key: str, value: Any):
transaction = self.redis.multi_exec()
# Serialize to keep the type.
transaction.hset(session_id, key, json.dumps({'v': value}))
transaction.expire(
session_id, self.app.config.session.duration
)
await transaction.execute()
[docs] async def get(self, session_id: str, key: str):
if not await self.redis.hexists(session_id, key):
return UNDEFINED
data = await self.redis.hget(session_id, key)
await self.redis.expire(
session_id, self.app.config.session.duration
)
data = json.loads(data)
return data['v']
[docs] async def delete(self, session_id: str, key: str):
await self.redis.hdel(session_id, key)