# Source code for dazzler.contrib.postgresql

import functools
import hashlib
import os
import secrets
import sys
import traceback
import typing
from typing import Any

from aiohttp import web
from precept import Config, ConfigProperty, Nestable

from dazzler.events import DAZZLER_SETUP, DAZZLER_STOP

from dazzler.pages.user_admin import AdminRole, UserAdminPage, AdminUser
from dazzler.system import Middleware as DMiddleware, UNDEFINED
from dazzler.system.auth import Authenticator, User
from dazzler.system.session import SessionBackEnd
from dazzler.tools import replace_all


# Formatter used for every SQL template in this module: ``replace_all`` with
# the delimiters bound to ``${`` and ``}``. It is called as
# ``_sql_formatter(template, schema=..., table=...)`` to fill in identifiers
# (schema/table names); presumably each ``${name}`` is substituted by the
# matching keyword -- confirm against ``dazzler.tools.replace_all``.
# The ``%s`` markers in the templates are not touched here, they are bound by
# the parameter list given to ``cursor.execute``.
_sql_formatter = functools.partial(
    replace_all, open_bracket='${', end_bracket='}'
)

# DDL for the session table: one row per session, keyed by ``session_id``,
# with the whole session payload stored in the JSONB ``data`` column.
_create_session_table_statement = '''
CREATE TABLE  ${schema}.${table} (
    session_id VARCHAR (32) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    data JSONB NOT NULL,
    PRIMARY KEY (session_id)
)
'''
# Create a session row holding an empty JSON object.
# Parameters: (session_id).
_insert_session_statement = '''
INSERT INTO ${schema}.${table} (session_id, data) VALUES (%s, '{}')
'''

# Set one entry of the session payload with ``jsonb_set``; the trailing
# ``true`` asks PostgreSQL to create the entry when it is missing.
# Parameters: (path, value, session_id).
_update_session_statement = '''
UPDATE ${schema}.${table}
SET data = jsonb_set(data, %s, %s, true)
WHERE session_id = %s;
'''

# plpgsql trigger function that stamps ``updated_at`` with ``NOW()``.
# ``CREATE OR REPLACE`` makes this statement safe to run more than once.
_trigger_func_statement = '''
CREATE OR REPLACE FUNCTION trigger_set_timestamp()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = NOW();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql
'''
# Attach the function above as a row-level BEFORE UPDATE trigger on a table.
# The trigger name ``set_timestamp`` is a fixed literal.
_trigger_statement = '''
CREATE TRIGGER set_timestamp
BEFORE UPDATE ON ${schema}.${table}
FOR EACH ROW
EXECUTE PROCEDURE trigger_set_timestamp();
'''

# Single boolean row telling whether a table exists in a schema.
# Parameters: (schema, table) -- bound as values, not formatted in.
_table_exists_statement = '''
SELECT EXISTS(
    SELECT * FROM information_schema.tables
    WHERE table_schema = %s AND table_name = %s
);
'''

# Read one entry of the session payload (``->`` JSONB operator).
# Parameters: (key, session_id).
_get_session_value_statement = '''
SELECT data -> %s
FROM ${schema}.${table}
WHERE session_id = %s
'''

# Remove one entry from the session payload (``-`` JSONB operator).
# Parameters: (key, session_id).
_delete_session_value_statement = '''
UPDATE ${schema}.${table}
SET data = data - %s
WHERE session_id = %s;
'''

# Credentials of an active user, used to verify a password.
# Parameters: (username).
_user_pw_select_statement = '''
select username, password, salt
from ${schema}.${table}
where username = %s and active = true
'''

# Profile of a user with the names of its roles aggregated in an array.
# Parameters: (username).
# NOTE(review): the second join is an inner join on the role table, which
# looks like it drops users that have no role at all (no row returned) --
# confirm whether a ``left join`` was intended.
_get_user_statement = '''
select u.username, u.email, u.metadata, array_agg(r.name) roles
from ${schema}.${user} u
left join ${schema}.${user_roles} ur on u.user_id = ur.user_id
join ${schema}.${role} r on ur.role_id = r.role_id
where u.username = %s
group by u.username, u.email, u.active, u.metadata
'''

# Insert a new user.
# Parameters: (username, email, password, salt, metadata).
_insert_user_statement = '''
insert into ${schema}.${table}
    (username, email, password, salt, metadata)
values (%s, %s, %s, %s, %s);
'''

# DDL for the users table, plus unique indexes on username and email.
# The index names are fixed literals, they do not follow the table name.
_create_user_table_statement = '''
create table ${schema}.${table} (
    user_id serial primary key,
    username varchar(100) not null,
    email varchar(256),
    password bytea not null,
    salt bytea not null,
    created_at timestamp default now(),
    updated_at timestamp default now(),
    active bool default true,
    metadata jsonb default '{}'
);
create unique index users_username_idx on ${schema}.${table} (username);
create unique index users_email_idx on ${schema}.${table} (email);
'''

# DDL for the role table, with a unique index on the role name.
_create_role_table_statement = '''
create table ${schema}.${table} (
    role_id serial primary key,
    name varchar(100) not null,
    description text
);
create unique index role_name_idx on ${schema}.${table} (name);
'''

# DDL for the user <-> role association table; rows are removed with
# their user or role (``on delete cascade``).
_create_user_roles_table_statement = '''
create table ${schema}.${user_roles} (
    role_id int not null,
    user_id int not null,

    primary key (role_id, user_id),
    constraint role_id_fk foreign key (role_id)
        references ${schema}.${role}(role_id) on delete cascade,
    constraint username_fk foreign key (user_id)
        references ${schema}.${users}(user_id) on delete cascade
);
'''
# Insert a role.
# Parameters: (name, description).
_insert_role_statement = '''
insert into ${schema}.${table} (name, description) values (%s, %s)
'''
# Give a role to a user, both looked up by name.
# Parameters: (role name, username).
_insert_user_role_statement = '''
insert into ${schema}.${user_roles} (role_id, user_id)
values (
    (select role_id from ${schema}.${role} where name = %s),
    (select user_id from ${schema}.${users} where username = %s)
)
'''

# One page of users with their active flag and aggregated role names,
# ordered by creation date. ``${where}`` is filled in at query time with an
# optional filter clause (or an empty string).
# Parameters: (filter values..., limit, offset).
_admin_get_users_statement = '''
select u.username, u.active, array_agg(r.name)
from ${schema}.${users} u
left join ${schema}.${user_roles} ur on u.user_id= ur.user_id
left join ${schema}.${role} r on ur.role_id = r.role_id
${where}
group by u.username, u.created_at, u.active
order by u.created_at
limit %s offset %s
'''

# Number of distinct users matching the same optional ``${where}`` filter.
# Parameters: (filter values...).
_admin_get_users_count_statement = '''
select count(distinct u.user_id)
from ${schema}.${users} u
left join ${schema}.${user_roles} ur on u.user_id = ur.user_id
left join ${schema}.${role} r on ur.role_id = r.role_id
${where}
'''

# Name and description of every role.
_admin_get_roles_statement = '''
select name, description from ${schema}.${table}
'''
# Insert a role and return its generated id.
# Parameters: (name, description).
_admin_insert_role_statement = '''
insert into ${schema}.${table} (name, description)
values (%s, %s)
returning role_id
'''
# Delete a role by name.
# Parameters: (name).
_admin_delete_role_statement = '''
delete from ${schema}.${table}
where name = %s
'''
# Activate or deactivate a user.
# Parameters: (active, username).
_admin_update_active_user_statement = '''
update ${schema}.${table} set active = %s where username  = %s
'''

# Remove from a user every role that is NOT in the given list of names.
# Parameters: (role names, username).
_admin_delete_user_roles_statement = '''
delete from ${schema}.${user_roles}
where role_id not in (
    select role_id
    from ${schema}.${role}
    where name = any(%s)
)
and user_id = (select user_id from ${schema}.${users} where username = %s)
'''
# Give a user the roles from the given list of names that it does not
# have yet.
# Parameters: (username, role names) -- note the order is the reverse of
# the delete statement above.
_admin_insert_user_roles_statement = '''
with user_to_edit (cur_user_id) as (
    select user_id
    from ${schema}.${users}
    where username = %s
)
insert into ${schema}.${user_roles}
    select r.role_id, cur_user_id
    from ${schema}.${role} r, user_to_edit
    where r.role_id in (
        select sr.role_id
        from ${schema}.${role} sr
        where sr.name = any (%s)
    ) and r.role_id not in (
        select role_id
        from ${schema}.${user_roles}
        where user_id = cur_user_id
    )
;
'''
# Delete a user by username.
# Parameters: (username).
_admin_delete_user_statement = '''
delete from ${schema}.${table} where username = %s
'''
# Change the description of a role.
# Parameters: (description, role name).
_admin_update_role_description_statement = '''
update ${schema}.${table} set description = %s where name = %s
'''


def _add_where(statement):
    if not statement.startswith('WHERE'):
        return 'WHERE'
    return ' AND'


async def _table_exists(cursor, schema, table):
    """
    Tell whether ``schema.table`` is listed in ``information_schema``.

    :param cursor: Open database cursor used to run the query.
    :param schema: Name of the schema to look into.
    :param table: Name of the table to look for.
    :return: First column of the result row, ``None`` if no row came back.
    """
    await cursor.execute(_table_exists_statement, [schema, table])
    row = await cursor.fetchone()
    return row[0] if row else None


class PostgresConfig(Config):
    # Configuration of the PostgreSQL integrations; every option lives
    # under the ``postgres`` section declared at the bottom of the class.

    class Postgres(Nestable):
        # Connection string given to ``aiopg.create_pool``.
        dsn = ConfigProperty(
            default='', auto_environ=True, environ_name='POSTGRES_DSN',
        )

        class Session(Nestable):
            # Location of the table holding the session data.
            table_name = ConfigProperty(
                default='session',
                auto_environ=True,
                environ_name='POSTGRES_SESSION_TABLE',
            )
            schema_name = ConfigProperty(
                default='public',
                auto_environ=True,
                environ_name='POSTGRES_SESSION_SCHEMA',
            )

        session: Session

        class Pool(Nestable):
            # Bounds of the connection pool.
            minsize = ConfigProperty(default=1)
            maxsize = ConfigProperty(default=10)

        pool: Pool

        class Middleware(Nestable):
            # Keys under which the pool is stored on each request and on
            # the server application.
            request_key = ConfigProperty(default='postgres')
            app_key = ConfigProperty(default='postgres')

        middleware: Middleware

        class Auth(Nestable):
            # Location of the authentication tables.
            schema_name = ConfigProperty(
                default='public',
                auto_environ=True,
                environ_name='POSTGRES_AUTH_SCHEMA',
            )
            user_table_name = ConfigProperty(
                default='users',
                auto_environ=True,
                environ_name='POSTGRES_USER_TABLE',
            )
            role_table_name = ConfigProperty(
                default='role',
                auto_environ=True,
                environ_name='POSTGRES_ROLE_TABLE',
            )
            user_roles_table_name = ConfigProperty(
                default='user_roles',
                auto_environ=True,
                environ_name='POSTGRES_USER_ROLES_TABLE',
            )
            # Roles inserted when the auth tables are first created.
            roles = ConfigProperty(
                default=['admin', 'user'], config_type=list,
            )
            # Roles given to every newly registered user.
            default_user_roles = ConfigProperty(
                default=['user'], config_type=list,
            )

            class Encryption(Nestable):
                # Arguments of ``hashlib.scrypt`` used to hash passwords:
                # cost_factor -> n, blocksize -> r, parallelism -> p.
                cost_factor = ConfigProperty(default=128)
                blocksize = ConfigProperty(default=64)
                parallelism = ConfigProperty(default=1)
                maxmem = ConfigProperty(default=0)
                dklen = ConfigProperty(default=64)

            encryption: Encryption

        auth: Auth

    postgres: Postgres
async def get_postgres_pool(config: PostgresConfig):
    """
    Create a new aiopg connection pool from the configuration.

    :param config: Configuration providing the dsn and the pool bounds.
    :return: The pool created by ``aiopg.create_pool``.
    """
    # Imported lazily so the module can be imported without aiopg.
    import aiopg

    postgres = config.postgres
    return await aiopg.create_pool(
        dsn=postgres.dsn,
        minsize=postgres.pool.minsize,
        maxsize=postgres.pool.maxsize,
    )
async def _get_pool_from_app(app, config: PostgresConfig):
    """
    Get the pool registered on the server application, or a new one.

    :param app: Dazzler application, its ``server.app`` mapping is searched.
    :param config: Configuration providing the key and the pool settings.
    :return: The registered pool if the key is present, otherwise a pool
        freshly created with :func:`get_postgres_pool`.
    """
    registry = app.server.app
    key = config.postgres.middleware.app_key
    if key not in registry:
        return await get_postgres_pool(config)
    return registry[key]
class PostgresMiddleware(DMiddleware):
    """
    Middleware giving every request access to the connection pool.

    The pool is created on ``DAZZLER_SETUP`` unless one was given, stored on
    the server application under ``postgres.middleware.app_key`` and closed
    on ``DAZZLER_STOP``.

    :type pool: aiopg.Pool
    """

    def __init__(self, app, config: PostgresConfig, pool=None):
        """
        :param app: Dazzler application to subscribe the events on.
        :param config: Configuration with the pool and middleware keys.
        :param pool: Existing pool to use instead of creating one.
        """
        self.app = app
        self.config = config
        self.pool = pool
        app.events.subscribe(DAZZLER_SETUP, self._setup)
        app.events.subscribe(DAZZLER_STOP, self._cleanup)

    async def _setup(self, _):
        """Create the pool if needed and register it on the server app."""
        if not self.pool:
            self.pool = await get_postgres_pool(self.config)
        app_key = self.config.postgres.middleware.app_key
        self.app.server.app[app_key] = self.pool

    async def _cleanup(self, _):
        """Close the pool and wait for its connections to be released."""
        if not self.pool:
            # Setup never created a pool (not run, or it failed): there is
            # nothing to close and the stop event must not raise.
            return
        self.pool.close()
        await self.pool.wait_closed()

    async def __call__(self, request: web.Request):
        """Store the pool on the request under the configured key."""
        request[self.config.postgres.middleware.request_key] = self.pool
class PostgresSessionBackend(SessionBackEnd):
    """
    Session backend storing each session as a JSONB row in PostgreSQL.

    :Tables:
        - ``session``

    :Configuration:

    .. code-block:: toml

        [session]
        backend = 'PostgreSQL'

    :type pool: aiopg.Pool
    """

    def __init__(self, app, config: PostgresConfig = None, pool=None):
        """
        :param app: Dazzler application.
        :param config: Configuration to use; when omitted a default one is
            created and the application config file is read into it.
        :param pool: Existing pool, otherwise resolved on setup.
        """
        super().__init__(app)
        if not config:
            config = PostgresConfig()
            if app.config_path:
                config.read_file(app.config_path)
        self.config = config
        self.pool = pool

        # Imported lazily so the module can be imported without psycopg2.
        from psycopg2.extras import Json
        self._json = Json

        # All the session statements target the same schema and table.
        session_conf = config.postgres.session
        fill = functools.partial(
            _sql_formatter,
            schema=session_conf.schema_name,
            table=session_conf.table_name,
        )
        self._insert_statement = fill(_insert_session_statement)
        self._update_statement = fill(_update_session_statement)
        self._get_statement = fill(_get_session_value_statement)
        self._delete_statement = fill(_delete_session_value_statement)

        app.events.subscribe(DAZZLER_SETUP, self._setup)

    async def _setup(self, _):
        """Resolve the pool and create the session table if it is missing."""
        conf = self.config
        schema = conf.postgres.session.schema_name
        table = conf.postgres.session.table_name

        if not self.pool:
            self.pool = await _get_pool_from_app(self.app, conf)

        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(_table_exists_statement, [schema, table])
            exists = await cursor.fetchone()
            if exists[0]:
                return

            self.app.logger.debug('Create session table.')
            await cursor.execute(
                _sql_formatter(
                    _create_session_table_statement,
                    schema=schema,
                    table=table,
                )
            )
            await cursor.execute(_trigger_func_statement)
            await cursor.execute(
                _sql_formatter(
                    _trigger_statement, schema=schema, table=table,
                )
            )

    async def set(self, session_id: str, key: str, value: Any):
        """
        Store ``value`` under ``key`` in the session.

        :param session_id: Id of the session row to update.
        :param key: Entry of the session payload to set.
        :param value: Value to store, wrapped as JSON.
        :raises Exception: Any database error is logged then re-raised.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            try:
                self.app.logger.debug(
                    f'Set session {session_id}: {key}: {value}')
                # The key is wrapped in a list: jsonb_set takes a path.
                params = [[key], self._json(value), session_id]
                await cursor.execute(self._update_statement, params)
            except Exception as err:
                self.app.logger.exception(err)
                raise err

    async def get(self, session_id: str, key: str):
        """
        Read the value stored under ``key`` in the session.

        :param session_id: Id of the session row to read.
        :param key: Entry of the session payload to get.
        :return: The stored value, ``UNDEFINED`` when there is no row or
            the value comes back as ``None``.
        :raises Exception: Any database error is logged then re-raised.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            try:
                await cursor.execute(self._get_statement, [key, session_id])
                row = await cursor.fetchone()
            except Exception as err:
                self.app.logger.exception(err)
                raise err
            if row and row[0] is not None:
                return row[0]
        return UNDEFINED

    async def delete(self, session_id: str, key: str):
        """
        Remove ``key`` from the session.

        :param session_id: Id of the session row to update.
        :param key: Entry of the session payload to remove.
        :raises Exception: Any database error is logged then re-raised.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            try:
                await cursor.execute(
                    self._delete_statement, [key, session_id])
            except Exception as err:
                self.app.logger.exception(err)
                raise err

    async def on_new_session(self, session_id: str):
        """
        Insert the row of a newly created session.

        :param session_id: Id of the new session.
        :raises Exception: Any database error is logged then re-raised.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            try:
                await cursor.execute(self._insert_statement, [session_id])
            except Exception as err:
                self.app.logger.exception(err)
                raise err
class PostgresAuthenticator(Authenticator):
    """
    Authenticator for PostgreSQL.

    Encryption of user passwords with scrypt, using the parameters of the
    ``postgres.auth.encryption`` configuration both when registering and
    when authenticating.

    :Tables:
        - ``users``
        - ``role``
        - ``user_roles``

    :Configuration:

    .. code-block:: toml

        [authentication]
        authenticator = 'dazzler.contrib.postgresql:PostgresAuthenticator'

        [postgres]
        dsn = 'host=localhost port=5432 dbname=mydb'

        [postgres.auth]
        schema_name = 'public'
        user_table_name = 'users'

    :type pool: aiopg.Pool
    """

    def __init__(self, app, pool=None):
        """
        :param app: Dazzler application.
        :param pool: Existing pool, otherwise resolved on setup.
        """
        super().__init__(app)
        config = PostgresConfig()
        # ``config_path`` may be unset: ``os.path.exists(None)`` raises a
        # TypeError, so check for a value first.
        if app.config_path and os.path.exists(app.config_path):
            config.read_file(app.config_path)

        has_middleware = any(
            isinstance(x, PostgresMiddleware) for x in app.middlewares
        )
        if not has_middleware:
            # Using another session backend, need to insert the middleware.
            app.middlewares.insert(
                0, PostgresMiddleware(app, config, pool)
            )

        self.config = config
        self.pool = pool

        # Imported lazily so the module can be imported without psycopg2.
        from psycopg2.extras import Json, NamedTupleCursor
        self._json = Json
        self._cursor_factory = NamedTupleCursor

        auth = config.postgres.auth
        self._get_user_statement = _sql_formatter(
            _get_user_statement,
            schema=auth.schema_name,
            user=auth.user_table_name,
            user_roles=auth.user_roles_table_name,
            role=auth.role_table_name,
        )
        self._insert_user_statement = _sql_formatter(
            _insert_user_statement,
            schema=auth.schema_name,
            table=auth.user_table_name,
        )
        self._get_user_pw_statement = _sql_formatter(
            _user_pw_select_statement,
            schema=auth.schema_name,
            table=auth.user_table_name,
        )
        self._insert_role_statement = _sql_formatter(
            _insert_role_statement,
            schema=auth.schema_name,
            table=auth.role_table_name,
        )
        self._insert_user_roles_statement = _sql_formatter(
            _insert_user_role_statement,
            schema=auth.schema_name,
            user_roles=auth.user_roles_table_name,
            role=auth.role_table_name,
            users=auth.user_table_name,
        )

        app.events.subscribe(DAZZLER_SETUP, self.setup)

    async def setup(self, _):
        """
        Resolve the pool and create the auth tables on first run.

        Nothing is created when the users table already exists. Otherwise
        the users, role and user_roles tables are created and the
        configured roles are inserted.
        """
        conf = self.config.postgres
        if not self.pool:
            self.pool = await _get_pool_from_app(self.app, self.config)

        # setup tables
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                exists = await _table_exists(
                    cursor,
                    conf.auth.schema_name,
                    conf.auth.user_table_name
                )
                if exists:
                    return

                self.app.logger.debug(
                    f'Creating auth tables, schema: {conf.auth.schema_name}'
                )
                await cursor.execute(_trigger_func_statement)
                await cursor.execute(
                    _sql_formatter(
                        _create_user_table_statement,
                        schema=conf.auth.schema_name,
                        table=conf.auth.user_table_name,
                    ),
                )
                await cursor.execute(
                    _sql_formatter(
                        _trigger_statement,
                        schema=conf.auth.schema_name,
                        table=conf.auth.user_table_name,
                    ),
                )
                await cursor.execute(
                    _sql_formatter(
                        _create_role_table_statement,
                        schema=conf.auth.schema_name,
                        table=conf.auth.role_table_name
                    )
                )
                await cursor.execute(
                    _sql_formatter(
                        _create_user_roles_table_statement,
                        schema=conf.auth.schema_name,
                        user_roles=conf.auth.user_roles_table_name,
                        users=conf.auth.user_table_name,
                        role=conf.auth.role_table_name,
                    )
                )
                self.app.logger.debug(
                    f'Creating user roles: {conf.auth.roles}'
                )
                # Seems there is no support for executemany in aiopg
                for role in conf.auth.roles:
                    await cursor.execute(
                        self._insert_role_statement,
                        [role, f'default {role} role']
                    )

    async def authenticate(self, username: str, password: str):
        """
        Verify the password of an active user.

        :param username: Name of the user to authenticate.
        :param password: Clear text password to verify.
        :return: A ``User`` holding only the username when the password
            matches, ``None`` for an unknown/inactive user or a mismatch.
        """
        async with self.pool.acquire() as conn:
            async with conn.cursor(
                cursor_factory=self._cursor_factory
            ) as cursor:
                await cursor.execute(
                    self._get_user_pw_statement, [username]
                )
                userdata = await cursor.fetchone()

        if not userdata:
            return None

        # Hash with the same configured parameters as ``register_user``;
        # hard-coded values would reject every password as soon as the
        # encryption configuration differs from the defaults.
        encrypted = await self._encrypt(password.encode(), userdata.salt)
        if secrets.compare_digest(encrypted, userdata.password):
            # this only set the session,
            # no need for the other attributes
            return User(username)
        return None

    async def get_user(self, username: str):
        """
        Load a user with its email, metadata and roles.

        :param username: Name of the user to load.
        :return: The ``User``, or ``None`` when the username is empty or
            the query returns no row.
        """
        if not username:
            return None
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(self._get_user_statement, [username])
                userdata = await cursor.fetchone()

        if not userdata:
            # Unknown username. The user query inner-joins the role table,
            # so a user without any role also ends up here.
            return None
        return User(
            username,
            email=userdata[1],
            metadata=userdata[2],
            roles=userdata[3],
        )

    async def register_user(
        self,
        username: str,
        password: str,
        email: str = None,
        fields: dict = None,
    ):
        """
        Create a user and give it the configured default roles.

        :param username: Name of the new user.
        :param password: Clear text password, stored hashed with a new salt.
        :param email: Optional email of the user.
        :param fields: Optional extra data stored as the user metadata.
        :return: ``None`` on success, the formatted traceback as a string
            when anything failed (the error is logged, not raised).
        """
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                try:
                    salt = self._gen_salt()
                    encrypted = await self._encrypt(password.encode(), salt)
                    await cursor.execute(
                        self._insert_user_statement,
                        [
                            username, email, encrypted, salt,
                            self._json(fields or {})
                        ]
                    )
                    for role in self.config.postgres.auth.default_user_roles:
                        await cursor.execute(
                            self._insert_user_roles_statement,
                            [role, username]
                        )
                    return None
                except Exception as err:  # pylint: disable=broad-except
                    self.app.logger.exception(err)
                    return traceback.format_exc()

    @staticmethod
    def _gen_salt():
        """Return 16 random bytes to use as a password salt."""
        return secrets.token_bytes(16)

    async def _encrypt(
        self, value: bytes, salt: bytes
    ):
        """
        Hash ``value`` with scrypt on the application executor.

        :param value: Bytes to hash, the encoded password.
        :param salt: Salt of the user.
        :return: The derived key, using the configured scrypt parameters.
        """
        encryption = self.config.postgres.auth.encryption
        return await self.app.executor.execute(
            hashlib.scrypt, value,
            salt=salt,
            n=encryption.cost_factor,
            r=encryption.blocksize,
            p=encryption.parallelism,
            maxmem=encryption.maxmem,
            dklen=encryption.dklen,
        )
class PostgresUserAdminPage(UserAdminPage):
    """
    User administration page backed by the PostgreSQL auth tables.

    :Configuration:

    .. code-block:: toml

        [authentication.admin]
        enable = true
        page_ref = 'dazzler.contrib.postgresql:PostgresUserAdminPage'

    :type pool: aiopg.Pool
    """

    def __init__(self, app, config: PostgresConfig = None, **kwargs):
        """
        :param app: Dazzler application.
        :param config: Configuration to use; when omitted a default one is
            created and the application config file is read into it.
        :param kwargs: Forwarded to ``UserAdminPage``.
        """
        super().__init__(app, **kwargs)
        self.config = config
        if not config:
            self.config = config = PostgresConfig()
            if app.config_path:
                self.config.read_file(app.config_path)
        self.pool = None

        auth = config.postgres.auth

        def on_users(template):
            # Statement working on the users table only.
            return _sql_formatter(
                template,
                schema=auth.schema_name,
                table=auth.user_table_name,
            )

        def on_roles(template):
            # Statement working on the role table only.
            return _sql_formatter(
                template,
                schema=auth.schema_name,
                table=auth.role_table_name,
            )

        def on_all(template):
            # Statement joining users, roles and their association.
            return _sql_formatter(
                template,
                schema=auth.schema_name,
                users=auth.user_table_name,
                role=auth.role_table_name,
                user_roles=auth.user_roles_table_name,
            )

        self._get_users_statement = on_all(_admin_get_users_statement)
        self._get_user_count_statement = on_all(
            _admin_get_users_count_statement
        )
        self._get_roles_statement = on_roles(_admin_get_roles_statement)
        self._update_active_user_statement = on_users(
            _admin_update_active_user_statement
        )
        self._insert_role_statement = on_roles(_admin_insert_role_statement)
        self._delete_role_statement = on_roles(_admin_delete_role_statement)
        self._delete_user_roles_statement = on_all(
            _admin_delete_user_roles_statement
        )
        self._insert_user_roles_statement = on_all(
            _admin_insert_user_roles_statement
        )
        self._delete_user_statement = on_users(_admin_delete_user_statement)
        self._update_role_description_statement = on_roles(
            _admin_update_role_description_statement
        )

    async def setup(self, _):
        """Resolve the connection pool if none is set yet."""
        if self.pool:
            return
        self.pool = await _get_pool_from_app(self.app, self.config)

    @staticmethod
    def _build_user_filter(filters):
        """
        Build the WHERE clause and its parameters for the user queries.

        :param filters: Optional mapping with the keys ``username``
            (matched with ``~*``), ``user_roles`` (list of role names) and
            ``active``.
        :return: Tuple of the clause (empty string when there is nothing to
            filter) and the list of parameters, in clause order.
        """
        conditions = []
        args = []
        if filters:
            username = filters.get('username')
            if username:
                conditions.append('u.username ~* %s')
                args.append(username)

            user_roles = filters.get('user_roles')
            if user_roles:
                conditions.append('r.name = any(%s)')
                args.append(user_roles)

            # UNDEFINED is the "not given" marker: False is a valid filter.
            active = filters.get('active', UNDEFINED)
            if active is not UNDEFINED:
                conditions.append('u.active = %s')
                args.append(active)

        where = 'WHERE ' + ' AND '.join(conditions) if conditions else ''
        return where, args

    async def get_users(self, offset, limit, filters=None):
        """
        Fetch one page of users.

        :param offset: Number of users to skip.
        :param limit: Maximum number of users to return.
        :param filters: Optional filters, see ``_build_user_filter``.
        :return: List of ``AdminUser`` (username, active, role names).
        """
        where, args = self._build_user_filter(filters)
        statement = _sql_formatter(self._get_users_statement, where=where)
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(statement, args + [limit, offset])
            rows = await cursor.fetchall()
            # Empty role entries (users without role) are left out.
            return [
                AdminUser(row[0], row[1], [name for name in row[2] if name])
                for row in rows
            ]

    async def get_user_count(self, filters=None):
        """
        Count the users matching the filters.

        :param filters: Optional filters, see ``_build_user_filter``.
        :return: The number of distinct users.
        """
        where, args = self._build_user_filter(filters)
        statement = _sql_formatter(
            self._get_user_count_statement, where=where
        )
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(statement, args)
            row = await cursor.fetchone()
            return row[0]

    async def get_roles(self):
        """
        Fetch every role.

        :return: List of ``AdminRole`` built from (name, description).
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(self._get_roles_statement)
            rows = await cursor.fetchall()
            return [AdminRole(*row) for row in rows]

    async def toggle_active_user(self, username: str, active: bool):
        """
        Set the active flag of a user.

        :param username: Name of the user to update.
        :param active: New value of the flag.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(
                self._update_active_user_statement, [active, username]
            )

    async def change_user_roles(self, username: str, roles: typing.List[str]):
        """
        Make the roles of a user match the given list.

        :param username: Name of the user to update.
        :param roles: Names of the roles the user must end up with.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            # First drop the roles that are not wanted anymore ...
            await cursor.execute(
                self._delete_user_roles_statement, [roles, username]
            )
            # ... then add the missing ones (parameter order is reversed).
            await cursor.execute(
                self._insert_user_roles_statement, [username, roles]
            )

    async def delete_user(self, username: str):
        """
        Delete a user.

        :param username: Name of the user to delete.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(self._delete_user_statement, [username])

    async def create_role(self, role: str, description: str):
        """
        Create a role.

        :param role: Name of the new role.
        :param description: Description, stored as NULL when empty.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(
                self._insert_role_statement, [role, description or None]
            )

    async def delete_role(self, role: str):
        """
        Delete a role.

        :param role: Name of the role to delete.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(self._delete_role_statement, [role])

    async def update_role_description(self, role: str, description: str):
        """
        Change the description of a role.

        :param role: Name of the role to update.
        :param description: New description.
        """
        async with self.pool.acquire() as conn, conn.cursor() as cursor:
            await cursor.execute(
                self._update_role_description_statement, [description, role]
            )