import functools
import hashlib
import os
import secrets
import sys
import traceback
import typing
from typing import Any

from aiohttp import web
from precept import Config, ConfigProperty, Nestable


from dazzler.pages.user_admin import AdminRole, UserAdminPage, AdminUser
from dazzler.system import Middleware as DMiddleware, UNDEFINED
from dazzler.system.auth import Authenticator, User
from dazzler.system.session import SessionBackEnd
from import replace_all

_sql_formatter = functools.partial(
    replace_all, open_bracket='${', end_bracket='}'

_create_session_table_statement = '''
CREATE TABLE  ${schema}.${table} (
    session_id VARCHAR (32) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    data JSONB NOT NULL,
    PRIMARY KEY (session_id)
_insert_session_statement = '''
INSERT INTO ${schema}.${table} (session_id, data) VALUES (%s, '{}')

_update_session_statement = '''
UPDATE ${schema}.${table}
SET data = jsonb_set(data, %s, %s, true)
WHERE session_id = %s;

_trigger_func_statement = '''
CREATE OR REPLACE FUNCTION trigger_set_timestamp()
  NEW.updated_at = NOW();
$$ LANGUAGE plpgsql
_trigger_statement = '''
CREATE TRIGGER set_timestamp
BEFORE UPDATE ON ${schema}.${table}
EXECUTE PROCEDURE trigger_set_timestamp();

_table_exists_statement = '''
    SELECT * FROM information_schema.tables
    WHERE table_schema = %s AND table_name = %s

_get_session_value_statement = '''
SELECT data -> %s
FROM ${schema}.${table}
WHERE session_id = %s

_delete_session_value_statement = '''
UPDATE ${schema}.${table}
SET data = data - %s
WHERE session_id = %s;

_user_pw_select_statement = '''
select username, password, salt
from ${schema}.${table}
where username = %s and active = true

_get_user_statement = '''
select u.username,, u.metadata, array_agg( roles
from ${schema}.${user} u
left join ${schema}.${user_roles} ur on u.user_id = ur.user_id
join ${schema}.${role} r on ur.role_id = r.role_id
where u.username = %s
group by u.username,,, u.metadata

_insert_user_statement = '''
insert into ${schema}.${table}
    (username, email, password, salt, metadata)
values (%s, %s, %s, %s, %s);

_create_user_table_statement = '''
create table ${schema}.${table} (
    user_id serial primary key,
    username varchar(100) not null,
    email varchar(256),
    password bytea not null,
    salt bytea not null,
    created_at timestamp default now(),
    updated_at timestamp default now(),
    active bool default true,
    metadata jsonb default '{}'
create unique index users_username_idx on ${schema}.${table} (username);
create unique index users_email_idx on ${schema}.${table} (email);

_create_role_table_statement = '''
create table ${schema}.${table} (
    role_id serial primary key,
    name varchar(100) not null,
    description text
create unique index role_name_idx on ${schema}.${table} (name);

_create_user_roles_table_statement = '''
create table ${schema}.${user_roles} (
    role_id int not null,
    user_id int not null,

    primary key (role_id, user_id),
    constraint role_id_fk foreign key (role_id)
        references ${schema}.${role}(role_id) on delete cascade,
    constraint username_fk foreign key (user_id)
        references ${schema}.${users}(user_id) on delete cascade
_insert_role_statement = '''
insert into ${schema}.${table} (name, description) values (%s, %s)
_insert_user_role_statement = '''
insert into ${schema}.${user_roles} (role_id, user_id)
values (
    (select role_id from ${schema}.${role} where name = %s),
    (select user_id from ${schema}.${users} where username = %s)

_admin_get_users_statement = '''
select u.username,, array_agg(
from ${schema}.${users} u
left join ${schema}.${user_roles} ur on u.user_id= ur.user_id
left join ${schema}.${role} r on ur.role_id = r.role_id
group by u.username, u.created_at,
order by u.created_at
limit %s offset %s

_admin_get_users_count_statement = '''
select count(distinct u.user_id)
from ${schema}.${users} u
left join ${schema}.${user_roles} ur on u.user_id = ur.user_id
left join ${schema}.${role} r on ur.role_id = r.role_id

_admin_get_roles_statement = '''
select name, description from ${schema}.${table}
_admin_insert_role_statement = '''
insert into ${schema}.${table} (name, description)
values (%s, %s)
returning role_id
_admin_delete_role_statement = '''
delete from ${schema}.${table}
where name = %s
_admin_update_active_user_statement = '''
update ${schema}.${table} set active = %s where username  = %s

_admin_delete_user_roles_statement = '''
delete from ${schema}.${user_roles}
where role_id not in (
    select role_id
    from ${schema}.${role}
    where name = any(%s)
and user_id = (select user_id from ${schema}.${users} where username = %s)
_admin_insert_user_roles_statement = '''
with user_to_edit (cur_user_id) as (
    select user_id
    from ${schema}.${users}
    where username = %s
insert into ${schema}.${user_roles}
    select r.role_id, cur_user_id
    from ${schema}.${role} r, user_to_edit
    where r.role_id in (
        select sr.role_id
        from ${schema}.${role} sr
        where = any (%s)
    ) and r.role_id not in (
        select role_id
        from ${schema}.${user_roles}
        where user_id = cur_user_id
_admin_delete_user_statement = '''
delete from ${schema}.${table} where username = %s
_admin_update_role_description_statement = '''
update ${schema}.${table} set description = %s where name = %s

def _add_where(statement):
    if not statement.startswith('WHERE'):
        return 'WHERE'
    return ' AND'

async def _table_exists(cursor, schema, table):
    await cursor.execute(_table_exists_statement, [schema, table])
    result = await cursor.fetchone()
    if result:
        return result[0]

[docs]class PostgresConfig(Config):
[docs] class Postgres(Nestable): dsn = ConfigProperty( default='', auto_environ=True, environ_name='POSTGRES_DSN' )
[docs] class Session(Nestable): table_name = ConfigProperty( default='session', auto_environ=True, environ_name='POSTGRES_SESSION_TABLE' ) schema_name = ConfigProperty( default='public', auto_environ=True, environ_name='POSTGRES_SESSION_SCHEMA' )
session: Session
[docs] class Pool(Nestable): minsize = ConfigProperty( default=1, ) maxsize = ConfigProperty( default=10, )
pool: Pool
[docs] class Middleware(Nestable): request_key = ConfigProperty(default='postgres') app_key = ConfigProperty(default='postgres')
middleware: Middleware
[docs] class Auth(Nestable): schema_name = ConfigProperty( default='public', auto_environ=True, environ_name='POSTGRES_AUTH_SCHEMA' ) user_table_name = ConfigProperty( default='users', auto_environ=True, environ_name='POSTGRES_USER_TABLE' ) role_table_name = ConfigProperty( default='role', auto_environ=True, environ_name='POSTGRES_ROLE_TABLE' ) user_roles_table_name = ConfigProperty( default='user_roles', auto_environ=True, environ_name='POSTGRES_USER_ROLES_TABLE' ) roles = ConfigProperty( default=['admin', 'user'], config_type=list, ) default_user_roles = ConfigProperty( default=['user'], config_type=list, )
[docs] class Encryption(Nestable): cost_factor = ConfigProperty(default=128) blocksize = ConfigProperty(default=64) parallelism = ConfigProperty(default=1) maxmem = ConfigProperty(default=0) dklen = ConfigProperty(default=64)
encryption: Encryption
auth: Auth
postgres: Postgres
[docs]async def get_postgres_pool(config: PostgresConfig): import aiopg pool = await aiopg.create_pool( dsn=config.postgres.dsn, minsize=config.postgres.pool.minsize, maxsize=config.postgres.pool.maxsize, ) return pool
async def _get_pool_from_app(app, config: PostgresConfig): if config.postgres.middleware.app_key in return[ config.postgres.middleware.app_key ] return await get_postgres_pool(config)
[docs]class PostgresMiddleware(DMiddleware): """ :type pool: aiopg.Pool """
[docs] def __init__(self, app, config: PostgresConfig, pool=None): = app self.config = config self.pool = pool, self._setup), self._cleanup)
async def _setup(self, _): if not self.pool: self.pool = await get_postgres_pool(self.config)[ self.config.postgres.middleware.app_key ] = self.pool async def _cleanup(self, _): self.pool.close() await self.pool.wait_closed() async def __call__(self, request: web.Request): request[self.config.postgres.middleware.request_key] = self.pool
[docs]class PostgresSessionBackend(SessionBackEnd): """ Session backend for PostgreSQL. :Tables: - ``session`` :Configuration: .. code-block:: toml [session] backend = 'PostgreSQL' :type pool: aiopg.Pool """
[docs] def __init__(self, app, config: PostgresConfig = None, pool=None): super().__init__(app) if not config: config = PostgresConfig() if app.config_path: config.read_file(app.config_path) self.config = config self.pool = pool from psycopg2.extras import Json self._json = Json self._insert_statement = _sql_formatter( _insert_session_statement, schema=config.postgres.session.schema_name, table=config.postgres.session.table_name, ) self._update_statement = _sql_formatter( _update_session_statement, schema=config.postgres.session.schema_name, table=config.postgres.session.table_name, ) self._get_statement = _sql_formatter( _get_session_value_statement, schema=config.postgres.session.schema_name, table=config.postgres.session.table_name, ) self._delete_statement = _sql_formatter( _delete_session_value_statement, schema=config.postgres.session.schema_name, table=config.postgres.session.table_name, ), self._setup)
async def _setup(self, _): conf = self.config if not self.pool: self.pool = await _get_pool_from_app(, conf) async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( _table_exists_statement, [ conf.postgres.session.schema_name, conf.postgres.session.table_name, ] ) exists = await cursor.fetchone() if not exists[0]:'Create session table.') await cursor.execute( _sql_formatter( _create_session_table_statement, schema=conf.postgres.session.schema_name, table=conf.postgres.session.table_name, ), ) await cursor.execute(_trigger_func_statement) await cursor.execute( _sql_formatter( _trigger_statement, schema=conf.postgres.session.schema_name, table=conf.postgres.session.table_name, ), )
[docs] async def set(self, session_id: str, key: str, value: Any): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: try: f'Set session {session_id}: {key}: {value}') await cursor.execute( self._update_statement, [ [key], self._json(value), session_id ] ) except Exception as err: raise err
[docs] async def get(self, session_id: str, key: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: try: await cursor.execute( self._get_statement, [key, session_id] ) value = await cursor.fetchone() if value: v = value[0] if v is None: return UNDEFINED return v except Exception as err: raise err return UNDEFINED
[docs] async def delete(self, session_id: str, key: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: try: await cursor.execute( self._delete_statement, [key, session_id] ) except Exception as err: raise err
[docs] async def on_new_session(self, session_id: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: try: await cursor.execute( self._insert_statement, [session_id] ) except Exception as err: raise err
[docs]class PostgresAuthenticator(Authenticator): """ Authenticator for PostgreSQL. Encryption of user passwords with scrypt. :Tables: - ``users`` - ``role`` - ``user_roles`` :Configuration: .. code-block:: toml [authentication] authenticator = 'dazzler.contrib.postgresql:PostgresAuthenticator' [postgres] dsn = 'host=localhost port=5432 dbname=mydb' [postgres.auth] schema_name = 'public' user_table_name = 'users' :type pool: aiopg.Pool """
[docs] def __init__(self, app, pool=None): super().__init__(app) config = PostgresConfig() if os.path.exists(app.config_path): config.read_file(app.config_path) if not any(isinstance(x, PostgresMiddleware) for x in app.middlewares): # Using another session backend, need to insert the middleware. app.middlewares.insert( 0, PostgresMiddleware(app, config, pool) ) self.config = config self.pool = pool from psycopg2.extras import Json, NamedTupleCursor self._json = Json self._cursor_factory = NamedTupleCursor self._get_user_statement = _sql_formatter( _get_user_statement, schema=config.postgres.auth.schema_name, user=config.postgres.auth.user_table_name, user_roles=config.postgres.auth.user_roles_table_name, role=config.postgres.auth.role_table_name, ) self._insert_user_statement = _sql_formatter( _insert_user_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.user_table_name, ) self._get_user_pw_statement = _sql_formatter( _user_pw_select_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.user_table_name, ) self._insert_role_statement = _sql_formatter( _insert_role_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.role_table_name, ) self._insert_user_roles_statement = _sql_formatter( _insert_user_role_statement, schema=config.postgres.auth.schema_name, user_roles=config.postgres.auth.user_roles_table_name, role=config.postgres.auth.role_table_name, users=config.postgres.auth.user_table_name, ), self.setup)
[docs] async def setup(self, _): conf = self.config.postgres if not self.pool: self.pool = await _get_pool_from_app(, self.config) # setup tables async with self.pool.acquire() as conn: async with conn.cursor() as cursor: exists = await _table_exists( cursor, conf.auth.schema_name, conf.auth.user_table_name ) if exists: return f'Creating auth tables, schema: {conf.auth.schema_name}' ) await cursor.execute(_trigger_func_statement) await cursor.execute( _sql_formatter( _create_user_table_statement, schema=conf.auth.schema_name, table=conf.auth.user_table_name, ), ) await cursor.execute( _sql_formatter( _trigger_statement, schema=conf.auth.schema_name, table=conf.auth.user_table_name, ), ) await cursor.execute( _sql_formatter( _create_role_table_statement, schema=conf.auth.schema_name, table=conf.auth.role_table_name ) ) await cursor.execute( _sql_formatter( _create_user_roles_table_statement, schema=conf.auth.schema_name, user_roles=conf.auth.user_roles_table_name, users=conf.auth.user_table_name, role=conf.auth.role_table_name, ) ) f'Creating user roles: {conf.auth.roles}' ) # Seems there is no support for executemany in aiopg for role in conf.auth.roles: await cursor.execute( self._insert_role_statement, [role, f'default {role} role'] )
[docs] async def authenticate(self, username: str, password: str): async with self.pool.acquire() as conn: async with conn.cursor( cursor_factory=self._cursor_factory ) as cursor: await cursor.execute( self._get_user_pw_statement, [username] ) userdata = await cursor.fetchone() if not userdata: return encrypted = await hashlib.scrypt, password.encode(), n=128, r=64, p=1, salt=userdata.salt ) valid = secrets.compare_digest(encrypted, userdata.password) if valid: # this only set the session, # no need for the other attributes return User(username)
[docs] async def get_user(self, username: str): if not username: return async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(self._get_user_statement, [username]) userdata = await cursor.fetchone() return User( username, email=userdata[1], metadata=userdata[2], roles=userdata[3], )
[docs] async def register_user( self, username: str, password: str, email: str = None, fields: dict = None, ): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: try: salt = self._gen_salt() encrypted = await self._encrypt(password.encode(), salt) await cursor.execute( self._insert_user_statement, [ username, email, encrypted, salt, self._json(fields or {}) ] ) for role in self.config.postgres.auth.default_user_roles: await cursor.execute( self._insert_user_roles_statement, [role, username] ) return None except Exception as err: # pylint: disable=broad-except return traceback.format_exc()
@staticmethod def _gen_salt(): return secrets.randbits(128).to_bytes(16, sys.byteorder) async def _encrypt( self, value: bytes, salt: bytes ): return await hashlib.scrypt, value, salt=salt, n=self.config.postgres.auth.encryption.cost_factor, r=self.config.postgres.auth.encryption.blocksize, p=self.config.postgres.auth.encryption.parallelism, maxmem=self.config.postgres.auth.encryption.maxmem, dklen=self.config.postgres.auth.encryption.dklen, )
[docs]class PostgresUserAdminPage(UserAdminPage): """ Implementation of UserAdminPage for PostgreSQL. :Configuration: .. code-block:: toml [authentication.admin] enable = true page_ref = 'dazzler.contrib.postgresql:PostgresUserAdminPage' :type pool: aiopg.Pool """
[docs] def __init__(self, app, config: PostgresConfig = None, **kwargs): super().__init__(app, **kwargs) self.config = config if not config: self.config = config = PostgresConfig() if app.config_path: self.config.read_file(app.config_path) self.pool = None self._get_users_statement = _sql_formatter( _admin_get_users_statement, schema=config.postgres.auth.schema_name, users=config.postgres.auth.user_table_name, role=config.postgres.auth.role_table_name, user_roles=config.postgres.auth.user_roles_table_name, ) self._get_user_count_statement = _sql_formatter( _admin_get_users_count_statement, schema=config.postgres.auth.schema_name, users=config.postgres.auth.user_table_name, role=config.postgres.auth.role_table_name, user_roles=config.postgres.auth.user_roles_table_name, ) self._get_roles_statement = _sql_formatter( _admin_get_roles_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.role_table_name, ) self._update_active_user_statement = _sql_formatter( _admin_update_active_user_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.user_table_name, ) self._insert_role_statement = _sql_formatter( _admin_insert_role_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.role_table_name, ) self._delete_role_statement = _sql_formatter( _admin_delete_role_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.role_table_name, ) self._delete_user_roles_statement = _sql_formatter( _admin_delete_user_roles_statement, schema=config.postgres.auth.schema_name, user_roles=config.postgres.auth.user_roles_table_name, role=config.postgres.auth.role_table_name, users=config.postgres.auth.user_table_name, ) self._insert_user_roles_statement = _sql_formatter( _admin_insert_user_roles_statement, schema=config.postgres.auth.schema_name, user_roles=config.postgres.auth.user_roles_table_name, role=config.postgres.auth.role_table_name, users=config.postgres.auth.user_table_name, ) self._delete_user_statement = _sql_formatter( _admin_delete_user_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.user_table_name, ) self._update_role_description_statement = _sql_formatter( _admin_update_role_description_statement, schema=config.postgres.auth.schema_name, table=config.postgres.auth.role_table_name, )
[docs] async def setup(self, _): if not self.pool: self.pool = await _get_pool_from_app(, self.config)
@staticmethod def _build_user_filter(filters): where = '' args = [] if filters: username = filters.get('username') if username: where = 'WHERE u.username ~* %s' args.append(username) user_roles = filters.get('user_roles') if user_roles and len(user_roles): where += _add_where(where) + ' = any(%s)' args.append(user_roles) active = filters.get('active', UNDEFINED) if active is not UNDEFINED: where += _add_where(where) + ' = %s' args.append(active) return where, args
[docs] async def get_users(self, offset, limit, filters=None): where, args = self._build_user_filter(filters) statement = _sql_formatter(self._get_users_statement, where=where) async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( statement, args + [limit, offset] ) return [ AdminUser(x[0], x[1], [y for y in x[2] if y]) for x in await cursor.fetchall() ]
[docs] async def get_user_count(self, filters=None): where, args = self._build_user_filter(filters) statement = _sql_formatter(self._get_user_count_statement, where=where) async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(statement, args) result = await cursor.fetchone() return result[0]
[docs] async def get_roles(self): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(self._get_roles_statement) return [ AdminRole(*role) for role in await cursor.fetchall() ]
[docs] async def toggle_active_user(self, username: str, active: bool): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( self._update_active_user_statement, [active, username] )
[docs] async def change_user_roles(self, username: str, roles: typing.List[str]): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( self._delete_user_roles_statement, [roles, username] ) await cursor.execute( self._insert_user_roles_statement, [username, roles] )
[docs] async def delete_user(self, username: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( self._delete_user_statement, [username] )
[docs] async def create_role(self, role: str, description: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( self._insert_role_statement, [role, description or None] )
[docs] async def delete_role(self, role: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( self._delete_role_statement, [role] )
[docs] async def update_role_description(self, role: str, description: str): async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( self._update_role_description_statement, [description, role] )