import asyncio
import functools
from typing import Optional, Callable, Awaitable, List
from urllib.parse import quote
from aiohttp import web
from ._undefined import UNDEFINED
from ._middleware import Middleware
from ._page import Page
from ..events import DAZZLER_SETUP
def _default_page(
        default_redirect,
        page_title='Login',
        form_header='Sign In',
        page_url='/auth',
) -> Page:
    """
    Build the built-in login page.

    :param default_redirect: Value given to the form's ``next_url`` when the
        request carries no ``next_url`` query parameter.
    :param page_title: Title of the page.
    :param form_header: Text of the ``h2`` header shown above the form.
    :param page_url: Url the page is served from, the form is submitted
        to ``{page_url}/login``.
    :return: The login page.
    """
    from dazzler.components import core, auth

    # Page routes are addressed as ``<page url>/<route>`` elsewhere in this
    # module (logout url, register submit url). Deriving the form action
    # from ``page_url`` keeps the form working when the url is customised;
    # with the default ``/auth`` this is still ``/auth/login``.
    login_url = f'{page_url}/login'

    async def layout(request: web.Request):
        next_url = request.query.get('next_url') or default_redirect
        error = request.query.get('err')

        if request.get('user'):
            return core.Text('Already signed on!')

        # Only show the error footer when the ``err`` query param is set.
        footer = core.Container(
            'Invalid credentials',
            style={'color': 'red', 'padding': '0.5rem'},
            identity='login-error'
        ) if error else UNDEFINED

        return core.Container([
            auth.Login(
                login_url,
                method='post',
                identity='login-form',
                next_url=next_url,
                bordered=True,
                header=core.Html('h2', form_header),
                footer=footer,
                style={
                    'padding': '2rem',
                }
            ),
        ], style={
            'display': 'flex',
            'justifyContent': 'center',
            'alignItems': 'center',
            'width': '100%',
            'marginTop': '4rem'
        })

    page = Page(
        __name__, layout,
        packages=['dazzler_core', 'dazzler_auth'],
        title=page_title,
        url=page_url,
    )

    return page
def build_register_page(
        register_handler,
        title='New User',
        include_email=False,
        page_name='register',
        page_url='/auth/register',
        custom_fields=None,
        submit_label='Register',
        username_pattern=r'[\w\d\-_]+'
) -> Page:
    """
    Build a registration page made of a single form.

    :param register_handler: Handler bound to the ``post`` ``/submit`` route
        of the page.
    :param title: Page title, also used as the form header.
    :param include_email: Add a required email field to the form.
    :param page_name: Name of the page.
    :param page_url: Url of the page, the form posts to
        ``{page_url}/submit``.
    :param custom_fields: Extra field definitions appended after the
        built-in ones.
    :param submit_label: Label of the submit button.
    :param username_pattern: ``pattern`` input prop of the username field.
    :return: The register page.
    """
    from dazzler.components import core, html
    from dazzler.presets import PresetColor

    username_field = {
        'name': 'username',
        'label': 'Username',
        'input_props': {
            'required': True,
            'minLength': 2,
            'maxLength': 100,
            'pattern': username_pattern,
        }
    }
    password_field = {
        'name': 'password',
        'label': 'Password',
        'type': 'password',
        'input_props': {
            'required': True,
            'minLength': 8,
            'maxLength': 256
        }
    }
    form_fields = [username_field, password_field]

    if include_email:
        form_fields.append({
            'name': 'email',
            'label': 'Email',
            'type': 'email',
            'input_props': {
                'required': True,
            }
        })

    form_fields = form_fields + (custom_fields or [])
    submit_url = f'{page_url}/submit'

    def error_panel(message):
        # Panel displayed under the form with the error text.
        return core.Panel(
            content=core.Text(
                message,
                style={'color': 'red', 'padding': '0.5rem'}
            ),
            title_background=PresetColor.DANGER_DARK,
            title_color='neutral',
            title='Error',
            preset_background='neutral-light'
        )

    async def layout(request: web.Request):
        error = request.query.get('error')

        if request.get('user'):
            return core.Text(
                'Already registered!', font_weight='bold'
            )

        footer = error_panel(error) if error else UNDEFINED

        form = core.Form(
            fields=form_fields,
            header=html.H2(title),
            submit_label=submit_label,
            footer=footer,
            action=submit_url,
            method='post',
            preset_background=PresetColor.NEUTRAL,
            bordered=True,
            rounded=True,
            stacked=True,
        )
        container = core.Container(
            [form],
            width='80%',
            padding_top='4rem'
        )
        return core.Box(
            container,
            justify='center',
            min_height='100vh',
            height='100%'
        )

    register_page = Page(
        page_name,
        layout,
        packages=['dazzler_core', 'dazzler_html'],
        url=page_url,
        title=title,
    )
    # Form submissions are handled by the provided handler.
    register_page.route('/submit', method='post')(register_handler)

    return register_page
class User:
    """
    Base user of the dazzler auth system.

    :param username: Name identifying the user.
    :param roles: Roles of the user, stored as an empty list when omitted
        or empty.
    :param email: Email of the user, if any.
    :param metadata: Free form data attached to the user, if any.
    """
    username: str
    roles: Optional[List[str]]
    email: Optional[str]
    metadata: Optional[dict]

    def __init__(
        self,
        username: str,
        roles: Optional[List[str]] = None,
        email: Optional[str] = None,
        metadata: Optional[dict] = None,
    ):
        self.username = username
        # Always keep a list so roles can be iterated without a None check.
        self.roles = roles or []
        self.email = email
        self.metadata = metadata

    def __repr__(self) -> str:
        return (
            f'{type(self).__name__}('
            f'username={self.username!r}, '
            f'roles={self.roles!r}, '
            f'email={self.email!r})'
        )
class AuthBackend:
    """
    Interface for the request side of an authentication protocol.

    Every method raises ``NotImplementedError`` here and is meant to be
    provided by a subclass.
    """

    async def is_authenticated(self, request: web.Request) -> bool:
        """
        Tell if the request carries authentication credentials.

        :param request: The request to inspect.
        :return: Whether the user is authenticated.
        """
        raise NotImplementedError()

    async def login(
        self, user: User, request: web.Request, response: web.Response
    ):
        """
        Store the login info of a user that was successfully authenticated
        by the authenticator.

        :param user: The authenticated user.
        :param request: Incoming request.
        :param response: A redirect response, if needed to set cookies.
        :return:
        """
        raise NotImplementedError()

    async def logout(
        self, user: User, request: web.Request, response: web.Response
    ):
        """
        Remove what makes this user authenticated in the system.

        :param user: User to logout.
        :param request: Incoming request.
        :param response: A redirect response, if needed to clear cookies.
        :return:
        """
        raise NotImplementedError()

    async def get_username(self, request: web.Request) -> str:
        """
        Extract the username from the request.

        :param request: Incoming request.
        :return: Retrieved username.
        """
        raise NotImplementedError()
class AuthSessionBackend(AuthBackend):
    """
    Auth backend keeping the logged in username in the request session.

    The session is read from ``request['session']`` under the ``username``
    key.
    """

    async def is_authenticated(self, request: web.Request) -> bool:
        """Authenticated when the session holds a ``username`` value."""
        stored = await request['session'].get('username')
        return stored is not UNDEFINED

    async def login(
        self, user: User,
        request: web.Request,
        response: web.Response
    ):
        """Save the username of the user in the session."""
        await request['session'].set('username', user.username)

    async def logout(self, user, request, response):
        """Remove the username from the session."""
        await request['session'].delete('username')

    async def get_username(self, request: web.Request) -> str:
        """Return the ``username`` value of the session."""
        stored = await request['session'].get('username')
        return stored
class Authenticator:
    """
    Base authenticator.

    A subclass must be given to the DazzlerAuth init to provide the
    authentication of an app.

    :type app: dazzler.Dazzler
    """
    app = None

    def __init__(self, app):
        self.app = app

    async def authenticate(self, username: str, password: str) \
            -> Optional[User]:
        """
        Verify credentials on login.

        :param username: Submitted username.
        :param password: Submitted password.
        :return: The user; the login handler treats a falsy value as
            failed authentication.
        """
        raise NotImplementedError()

    async def get_user(self, username: str) -> User:
        """
        Retrieve a user by its username.

        :param username: Name of the user to get.
        :return: The user.
        """
        raise NotImplementedError()

    # pylint: disable=no-self-use
    async def authorize(self, user: User, page: Page) -> bool:
        """
        Authorize a user on a page basis, override to customize.

        The default grants access when the user has at least one role
        listed in the page authorizations.

        :param user: User requesting the page.
        :param page: Requested page.
        :return: Whether the user can access the page.
        """
        return not set(user.roles).isdisjoint(page.authorizations)

    async def register_user(
        self,
        username: str,
        password: str,
        email: str = None,
        fields: dict = None,
    ) -> Optional[str]:
        """
        Register a user on form submit.

        Return a string as an error, otherwise the operation is successful.

        The base implementation does nothing and returns ``None``.
        """
        return None
class AuthMiddleware(Middleware):
    """
    Attach the authenticated user to the request under the ``user`` key.
    """

    def __init__(self, app, auth):
        self.app = app
        self.auth = auth

    async def __call__(
        self, request: web.Request
    ) -> Optional[Callable[[web.Response], Awaitable]]:
        # Nothing to add for requests that are not authenticated.
        if not await self.auth.backend.is_authenticated(request):
            return None

        username = await self.auth.backend.get_username(request)
        request['user'] = await self.auth.authenticator.get_user(username)
        return None
class DazzlerAuth:
    """
    Handle the logic for page authentication.

    Requires an authenticator to provide the end user authentication method.
    Default to session backend, make sure a session middleware is present.

    Activates in the configs with::

        [authentication]
        enable = True
        authenticator = "module.submodule:AuthenticatorClass"
    """

    def __init__(
        self, app,
        authenticator: Authenticator,
        backend: AuthBackend = None,
        login_page: Page = None,
        default_redirect: str = None,
        register_page: Page = None
    ):
        """
        :param app: Application to protect.
        :type app: dazzler.Dazzler
        :param authenticator: Provides the users and credentials checks.
        :param backend: Stores the authentication state of requests,
            defaults to ``AuthSessionBackend``.
        :param login_page: Custom login page, the default page is built
            from the ``authentication.login`` config when omitted.
        :param default_redirect: Redirect target given to the default
            login page.
        :param register_page: Custom register page, only used when
            ``authentication.register.enable`` is set in the config.
        """
        self.app = app
        self.backend = backend or AuthSessionBackend()
        self.authenticator = authenticator
        self.authenticator.app = app
        self.app.middlewares.append(AuthMiddleware(app, self))

        self.login_page = login_page or _default_page(
            default_redirect,
            page_title=app.config.authentication.login.page_title,
            page_url=app.config.authentication.login.page_url,
            form_header=app.config.authentication.login.form_header,
        )

        # Only the page route redirects to the login page, the other
        # server routes are wrapped with ``redirect=False`` and answer
        # with an unauthorized error instead.
        app.server.route_page = self.require_page_login(app.server.route_page)
        app.server.route_page_json = self.require_page_login(
            app.server.route_page_json, redirect=False,
        )
        app.server.route_update = self.require_page_login(
            app.server.route_update, redirect=False,
        )
        app.server.route_call = self.require_page_login(
            app.server.route_call, redirect=False,
        )

        self.login_page.route('/login', method='post')(self.login)
        self.login_page.route('/logout', method='post')(self.logout)
        self.logout_url = f'{self.login_page.url}/logout'

        app.add_page(self.login_page)

        if app.config.authentication.register.enable:
            register = app.config.authentication.register
            self.register_page = register_page or build_register_page(
                self.register,
                page_name=register.page_name,
                page_url=register.page_url,
                custom_fields=self._get_custom_fields(),
                include_email=register.require_email,
            )
            app.add_page(self.register_page)

        app.events.subscribe(DAZZLER_SETUP, self._setup)

    async def login(self, request: web.Request):
        """
        Handle the login form submission.

        Reads ``username``, ``password`` and ``next_url`` from the posted
        data. On success the user is logged in on the backend and the
        request is redirected to ``next_url`` (config ``default_redirect``
        when missing). On failure the request is redirected back to the
        login page with ``err=1``.

        :param request: Incoming request.
        :raises web.HTTPSeeOther: Redirect after a successful login.
        :raises web.HTTPFound: Redirect to the login page on failure.
        """
        data = await request.post()
        next_url = data.get('next_url')
        username = data.get('username')
        password = data.get('password')
        login_url = request.app.router[self.login_page.name].url_for()

        if not next_url:
            next_url = self.app.config.authentication.login.default_redirect

        # NOTE(review): next_url comes from the submitted form and is used
        # as the redirect target unchecked - consider validating it.
        response = web.HTTPSeeOther(location=next_url)

        if not username or not password:
            raise web.HTTPFound(
                location=f'{login_url}?next_url={quote(next_url)}&err=1'
            )

        user = await self.authenticator.authenticate(username, password)

        if user:
            await self.backend.login(user, request, response)
            raise response

        # Cheap throttling on failures.
        await asyncio.sleep(0.25)

        if self.login_page:
            raise web.HTTPFound(
                location=f'{login_url}?next_url={quote(next_url)}&err=1'
            )
        raise web.HTTPUnauthorized()

    async def logout(self, request: web.Request):
        """
        Handle the logout form submission.

        Logs out the user on the backend and redirects to the posted
        ``next_url``, falling back on the config ``default_redirect`` when
        the form did not provide one.

        :param request: Incoming request.
        :raises web.HTTPSeeOther: Redirect after the logout.
        """
        data = await request.post()
        # A redirect cannot be created without a location, use the same
        # fallback as the login handler when next_url is missing.
        next_url = (
            data.get('next_url')
            or self.app.config.authentication.login.default_redirect
        )
        response = web.HTTPSeeOther(location=next_url)
        user = await self.authenticator.get_user(
            (await self.backend.get_username(request))
        )
        await self.backend.logout(user, request, response)
        raise response

    async def register(self, request: web.Request):
        """
        Handle the register form submission.

        Missing credentials, reserved usernames and errors returned by the
        authenticator redirect back to the register page with an ``error``
        query parameter. On success the new user is logged in on the
        backend and redirected to the config ``register.next_url``.

        :param request: Incoming request.
        :raises web.HTTPSeeOther: Always, every outcome is a redirect.
        """
        data = await request.post()
        username = data.get('username')
        password = data.get('password')

        if not username or not password:
            raise web.HTTPSeeOther(
                location=f'{self.register_page.url}?'
                f'error={quote("Please fill username and password")}',
            )

        email = data.get('email')
        fields = {
            field['name']: data.get(field['name'])
            for field in self._get_custom_fields()
        }

        if username.lower() in \
                self.app.config.authentication.register.reserved_usernames:
            raise web.HTTPSeeOther(
                location=f'{self.register_page.url}'
                f'?error={quote("Invalid username")}',
            )

        self.app.logger.debug(f'Register user: {username}')
        error = await self.authenticator.register_user(
            username, password, email, fields
        )

        if not error:
            response = web.HTTPSeeOther(
                location=self.app.config.authentication.register.next_url
            )
            # login on the backend after registering.
            await self.backend.login(User(username), request, response)
            raise response

        error_message = 'Invalid user information'
        if self.app.config.debug:
            # Only show error info if not in production.
            error_message = error

        raise web.HTTPSeeOther(
            location=f'{self.register_page.url}?error={quote(error_message)}',
        )

    def require_page_login(self, func, redirect=True, handle_page=True):
        """
        Wrap a handler to enforce the login and authorizations of a page.

        :param func: Handler to wrap.
        :param redirect: Redirect unauthenticated requests to the login
            page instead of raising an unauthorized error.
        :param handle_page: Call ``func`` with ``(request, page)`` when
            true, with ``(request)`` only otherwise.
        :return: The wrapped handler taking ``(request, page)``.
        """
        @functools.wraps(func)
        async def auth_page_wrapper(request: web.Request, page: Page):
            if page.require_login:
                if not await self.backend.is_authenticated(request):
                    if self.login_page and redirect:
                        url = str(
                            request.app.router[self.login_page.name].url_for()
                        )
                        next_url = str(request.url)
                        raise web.HTTPFound(
                            location=f'{url}?next_url={quote(next_url)}'
                        )
                    raise web.HTTPUnauthorized()

                # NOTE(review): authorizations are evaluated only for pages
                # requiring login, the same pages ``_setup`` wraps -
                # confirm this nesting against the upstream file.
                if page.authorizations:
                    authorized = False
                    user = request.get('user')
                    if user:
                        authorized = await self.authenticator.authorize(
                            user,
                            page
                        )
                    if not authorized:
                        raise web.HTTPForbidden()

            if handle_page:
                return await func(request, page)
            return await func(request)

        return auth_page_wrapper

    def _get_custom_fields(self):
        """
        Convert the ``register.custom_fields`` config entries, iterated as
        ``(name, label, type)``, into form field definitions.
        """
        return [
            {'name': name, 'label': label, 'type': field_type}
            for name, label, field_type
            in self.app.config.authentication.register.custom_fields
        ]

    async def _setup(self, _):
        """Handler of the ``DAZZLER_SETUP`` event."""
        # Wrap all pages routes with a login required.
        for page in self.app.pages.values():
            if not page.require_login:
                continue
            for route in page.routes:
                # Route handlers only take the request, the page is bound
                # here so the wrapper can check its requirements.
                route.handler = functools.partial(
                    self.require_page_login(route.handler, handle_page=False),
                    page=page,
                )