"""Aspects Binding"""
import functools
import typing
import uuid
import asyncio
from aiohttp import web
from .session import Session
from ._undefined import UNDEFINED
from ._component import prepare_aspects
from ._package import Package
from ..errors import TriggerLoopError, GetAspectError, BindingError
# Public names exported by a star-import of this module.
__all__ = [
    'Trigger', 'State', 'Target', 'BoundAspect', 'Binding', 'BindingContext',
    'CallContext', 'coerce_binding'
]
def is_component(aspect):
    """
    Tell whether a value has the shape of a serialized component.

    :param aspect: Any value.
    :return: ``True`` only when ``aspect`` is a dict that contains all of
        the ``identity``, ``aspects``, ``name`` and ``package`` keys.
    """
    if not isinstance(aspect, dict):
        return False
    required_keys = ('identity', 'aspects', 'name', 'package')
    return all(key in aspect for key in required_keys)
def hydrate(aspect):
    """
    Recursively turn serialized components back into component instances.

    A value recognised by ``is_component`` is replaced by an instance of
    the class found in ``Package.package_registry`` under its ``package``
    and ``name``; its ``aspects`` are hydrated first and passed as keyword
    arguments together with the ``identity``. Other dicts and lists are
    rebuilt with each of their values hydrated. Anything else is returned
    unchanged.

    :param aspect: The value to hydrate.
    :return: The hydrated value.
    """
    if is_component(aspect):
        package = Package.package_registry[aspect['package']]
        component_cls = package.components[aspect['name']]
        props = hydrate(aspect['aspects'])
        return component_cls(**props, identity=aspect['identity'])

    if isinstance(aspect, dict):
        return {key: hydrate(value) for key, value in aspect.items()}

    if isinstance(aspect, list):
        return [hydrate(item) for item in aspect]

    return aspect
class BoundValue(typing.NamedTuple):
    """Immutable record of one aspect value and where it comes from."""

    # Identity of the component owning the aspect.
    identity: str
    # Name of the aspect.
    aspect: str
    # Value of the aspect.
    value: typing.Any
class _Bind:
    """
    Base class addressing one aspect of one component.

    Instances compare and hash by their ``aspect@identity`` string form.
    """

    def __init__(self, identity: str, aspect: str, regex=False):
        """
        :param identity: The identity of the component to bind.
        :param aspect: Aspect name to trigger/state.
        :param regex: Identity and aspects are converted into regex and
            matched against for trigger and states.
        """
        self.identity, self.aspect, self.regex = identity, aspect, regex

    def __str__(self):
        return '{}@{}'.format(self.aspect, self.identity)

    def __repr__(self):
        class_name = self.__class__.__name__
        return f'<{class_name} {self}>'

    def __eq__(self, other):
        # Any two _Bind instances with the same string form are equal,
        # whatever their concrete class or ``regex`` flag.
        if not isinstance(other, _Bind):
            return False
        return str(self) == str(other)

    def __hash__(self):
        return hash(str(self))

    def prepare(self) -> dict:
        """
        Prepare the trigger or state for serialization.

        :return: Dict with the ``identity``, ``aspect`` and ``regex``
            attributes plus ``key``, the ``aspect@identity`` string.
        """
        prepared = {}
        prepared['identity'] = self.identity
        prepared['aspect'] = self.aspect
        prepared['regex'] = self.regex
        prepared['key'] = str(self)
        return prepared
# pylint: disable=too-few-public-methods
class Trigger(_Bind):
    """Trigger the bound aspect callbacks."""

    def __init__(
        self,
        identity: str,
        aspect: str,
        regex=False,
        once=None,
        skip_initial=False,
    ):
        """
        :param identity: The identity of the component to bind.
        :param aspect: Name of the aspect acting as trigger.
        :param regex: Forwarded unchanged to the base initializer.
        :param once: Stored and emitted by ``prepare``; presumably
            restricts the trigger to a single firing -- TODO confirm
            against the consumer of the prepared dict.
        :param skip_initial: Stored and emitted by ``prepare``; presumably
            skips the initial firing -- TODO confirm.
        """
        super().__init__(identity, aspect, regex)
        self.skip_initial = skip_initial
        self.once = once

    def prepare(self) -> dict:
        """
        Prepare the trigger for serialization.

        :return: The base ``prepare`` dict extended with the ``once``
            and ``skip_initial`` entries.
        """
        prepared = dict(super().prepare())
        prepared.update(once=self.once, skip_initial=self.skip_initial)
        return prepared
# Annotation alias for a list of Trigger objects.
TriggerList = typing.List[Trigger]
# pylint: disable=too-few-public-methods
class State(_Bind):
    """
    An aspect whose value is usable in a bound aspect, but whose change
    does not trigger it.
    """
# pylint: disable=too-few-public-methods
class Target(_Bind):
    """
    A connection target that will be updated when
    the trigger is updated on the frontend.

    Target has shorthand syntax ``aspect@identity`` to use with
    transforms.
    """

    def __init__(self,
                 shorthand: str = None,
                 identity: str = None,
                 aspect: str = None,
                 regex: bool = False):
        """
        :param shorthand: Optional ``aspect@identity`` string. When it
            contains an ``@`` it takes precedence over ``identity`` and
            ``aspect``.
        :param identity: Identity of the target component, required when
            no usable shorthand is given.
        :param aspect: Aspect name of the target, required when no usable
            shorthand is given.
        :param regex: Forwarded unchanged to the base initializer.
        :raises BindingError: If the shorthand is not exactly two
            non-empty parts around a single ``@``, or if neither a
            shorthand nor both ``identity`` and ``aspect`` are provided.
        """
        if shorthand and '@' in shorthand:
            parts = shorthand.split('@')
            # Reject extra ``@`` (which used to surface as a bare
            # ValueError from unpacking) and empty halves, which the
            # keyword form below already refuses.
            if len(parts) != 2 or not all(parts):
                raise BindingError(
                    f'Invalid Target arguments provided:'
                    f' short={shorthand} identity={identity} aspect={aspect}'
                )
            aspect, identity = parts
        elif not identity or not aspect:
            raise BindingError(
                f'Invalid Target arguments provided:'
                f' short={shorthand} identity={identity} aspect={aspect}'
            )
        super().__init__(identity, aspect, regex)
# Annotation alias for a list of State objects.
StateList = typing.List[State]
class BoundAspect:
    """
    A trigger aspect bound to a method intended to be called when changed.

    Data holder for the handler, trigger and state.
    """

    def __init__(
        self,
        handler,
        trigger: typing.Union[TriggerList, Trigger],
        states: StateList = None,
        call: bool = False,
    ):
        """
        :param handler: Awaitable callable invoked by ``__call__``.
        :param trigger: A single trigger or a list of triggers.
        :param states: States attached to every trigger; a falsy value
            is replaced by an empty list.
        :param call: Flag copied into every prepared entry.
        """
        self.handler = handler
        self.trigger = trigger
        self.states = states if states else []
        self.call = call

    def prepare(self) -> list:
        """
        Prepare the binding for serialization.

        :return: list of dict with trigger and states definitions,
            one entry per trigger.
        """
        prepared = []
        for trigger in self.triggers:
            entry = {
                'trigger': trigger.prepare(),
                # Built anew for each trigger so entries share no list.
                'states': [state.prepare() for state in self.states],
                'key': str(trigger),
                'regex': trigger.regex,
                'call': self.call,
            }
            prepared.append(entry)
        return prepared

    @property
    def triggers(self):
        """The trigger(s) as a list; a lone ``Trigger`` gets wrapped."""
        trigger = self.trigger
        return [trigger] if isinstance(trigger, Trigger) else trigger

    async def __call__(self, *args, **kwargs):
        result = await self.handler(*args, **kwargs)
        return result

    def __get__(self, instance, owner):
        if instance is not None:
            # Patch self if used in a class.
            return functools.partial(self.__call__, instance)
        return self

    def __str__(self):
        return str(self.trigger)

    def __repr__(self):
        return f'<BoundAspect {self}>'

    def __eq__(self, other):
        if not isinstance(other, BoundAspect):
            return False
        return str(self) == str(other)

    def __hash__(self):
        return hash(str(self))
class BaseContext:
    """
    The context in which the bound function execute.

    Exposes the request, the trigger and states values plus objects
    pulled from the request: ``dazzler`` (``request.app['dazzler']``),
    its ``auth`` and the request's ``user`` and ``session`` entries.

    :type auth: dazzler.system.auth.DazzlerAuth
    :type user: dazzler.system.auth.User
    """

    def __init__(
        self,
        identity: str,
        request: web.Request,
        trigger: BoundValue,
        states: typing.Dict[str, dict],
    ):
        """
        :param identity: Identity stored as ``self.identity``.
        :param request: The current request.
        :param trigger: Value of the trigger.
        :param states: Values of the states.
        """
        self.identity, self.request = identity, request
        self.trigger, self.states = trigger, states

        dazzler = request.app['dazzler']
        self.dazzler = dazzler
        self.auth = dazzler.auth
        # ``get`` is used, so both may be ``None`` when absent.
        self.user = request.get('user')
        self.session: Session = request.get('session')

    async def set_aspect(self, identity, **aspects):
        """
        Set aspects of a component; to be implemented by subclasses.

        :param identity: Identity of the component to update.
        :param aspects: The aspects to set on the component.
        """
        raise NotImplementedError
class BindingContext(BaseContext):
    """
    Context used by websockets binding (``@page.bind``).

    Can request aspects from the frontend and interact with
    the ``WebStorage`` api.
    """

    def __init__(
        self,
        identity: str,
        request: web.Request,
        trigger: BoundValue,
        states: typing.Dict[str, BoundValue],
        websocket: web.WebSocketResponse,
        request_queue: asyncio.Queue,
        create_task: typing.Callable
    ):
        """
        :param identity: Identity stored on the context.
        :param request: The current request.
        :param trigger: Value of the trigger.
        :param states: Values of the states.
        :param websocket: Websocket on which ``set-aspect`` messages
            are sent.
        :param request_queue: Queue receiving the ``get-aspect`` and
            storage requests issued by this context.
        :param create_task: Callable stored as ``self.create_task``.
        """
        super().__init__(identity, request, trigger, states)
        self.websocket = websocket
        self.create_task = create_task
        self._request_queue = request_queue
        self._response_queue = asyncio.Queue()

    async def set_aspect(self, identity, **aspects):
        """
        Update aspects of a component on the front end.

        :param identity: Identity of the component to update. A compiled
            pattern is sent as its pattern string with ``regex`` set.
        :param aspects: The aspects to set on the component.
        :raises TriggerLoopError: When setting, on the context's own
            identity, the aspect that triggered.
        :return:
        """
        if identity == self.identity and any(
            name == self.trigger.aspect for name in aspects
        ):
            trigger_key = f'{self.trigger.identity}.{self.trigger.aspect}'
            raise TriggerLoopError(
                f'Setting the same aspect that triggered: {trigger_key}'
            )

        is_pattern = isinstance(identity, typing.Pattern)
        target = identity.pattern if is_pattern else identity
        message = {
            'kind': 'set-aspect',
            'identity': str(target),
            'regex': is_pattern,
            'payload': prepare_aspects(aspects)
        }
        await self.websocket.send_json(message)

    async def get_aspect(self, identity: str, aspect: str):
        """
        Request the value of an aspect from the frontend.

        :param identity: Component to get aspect from.
        :param aspect: Name of the aspect property.
        :raises GetAspectError: When the received value is ``UNDEFINED``.
        :return: The hydrated value.
        """
        # The answer is awaited on a queue dedicated to this request.
        reply_queue = asyncio.Queue()
        pending = {
            'request_id': uuid.uuid4().hex,
            'queue': reply_queue,
            'identity': identity,
            'aspect': aspect,
            'kind': 'get-aspect'
        }
        await self._request_queue.put(pending)

        value, error = await reply_queue.get()
        if value is not UNDEFINED:
            return hydrate(value)
        raise GetAspectError(
            error or f'Undefined aspect {aspect}@{identity}'
        )

    async def _get_storage(self, storage, identity):
        # Queue a ``get-storage`` request and wait for its
        # ``(value, error)`` answer; an error other than UNDEFINED
        # is raised.
        reply_queue = asyncio.Queue()
        pending = {
            'request_id': uuid.uuid4().hex,
            'queue': reply_queue,
            'identity': identity,
            'storage': storage,
            'kind': 'get-storage'
        }
        await self._request_queue.put(pending)

        value, error = await reply_queue.get()
        if error is UNDEFINED:
            return value
        raise error

    async def _set_storage(self, storage, identity, payload):
        # Queue a ``set-storage`` request; no answer is awaited.
        pending = {
            'request_id': uuid.uuid4().hex,
            'identity': identity,
            'kind': 'set-storage',
            'storage': storage,
            'payload': payload
        }
        await self._request_queue.put(pending)

    async def get_local_storage(self, identity):
        """Get the ``local`` storage value stored under ``identity``."""
        value = await self._get_storage('local', identity)
        return value

    async def set_local_storage(self, identity, payload):
        """Set ``payload`` in the ``local`` storage under ``identity``."""
        await self._set_storage('local', identity, payload)

    async def get_session_storage(self, identity):
        """Get the ``session`` storage value stored under ``identity``."""
        value = await self._get_storage('session', identity)
        return value

    async def set_session_storage(self, identity, payload):
        """Set ``payload`` in the ``session`` storage under ``identity``."""
        await self._set_storage('session', identity, payload)
# Decorator
class Binding:
    """Bind a function to execute when the trigger aspect change."""

    def __init__(
        self,
        trigger: Trigger,
        states: StateList = None,
        call: bool = False,
    ):
        """
        :param trigger: Trigger handed to the created ``BoundAspect``.
        :param states: States handed to the created ``BoundAspect``;
            a falsy value is replaced by an empty list.
        :param call: When true the function receives a ``CallContext``,
            otherwise a ``BindingContext``.
        """
        self.trigger = trigger
        self.states = states if states else []
        self.call = call

    def __call__(self, func):
        """
        Wrap ``func`` so it is awaited with a freshly built context.

        :param func: Coroutine function taking the context.
        :return: A ``BoundAspect`` whose handler builds the context from
            the received data, awaits ``func`` and returns the context.
        """

        @functools.wraps(func)
        async def bound(request, data, ws, request_queue, create_task):
            raw_trigger = data['trigger']
            trigger = BoundValue(
                raw_trigger['identity'],
                raw_trigger['aspect'],
                hydrate(raw_trigger.get('value'))
            )

            # State values grouped as {identity: {aspect: value}}.
            states = {}
            for raw_state in data['states']:
                by_aspect = states.setdefault(raw_state['identity'], {})
                by_aspect[raw_state['aspect']] = hydrate(
                    raw_state.get('value', UNDEFINED)
                )

            if not self.call:
                context = BindingContext(
                    trigger.identity,
                    request,
                    trigger,
                    states,
                    ws,
                    request_queue,
                    create_task
                )
            else:
                context = CallContext(
                    trigger.identity,
                    request,
                    trigger,
                    states,
                )

            await func(context)
            return context

        return BoundAspect(bound, self.trigger, self.states, self.call)
def coerce_binding(value, binding_type: typing.Type = Trigger):
    """
    Convert a value into ``binding_type`` instance(s).

    :param value: An instance of ``binding_type`` (returned as is),
        an ``aspect@identity`` string, or a list of such values
        (coerced item by item).
    :param binding_type: Class to instantiate for strings.
    :raises BindingError: For a string not made of exactly two parts
        around ``@`` and for any other unsupported value.
    :return: A ``binding_type`` instance or a list of them.
    """
    if isinstance(value, list):
        return [coerce_binding(item, binding_type) for item in value]

    if isinstance(value, binding_type):
        return value

    if not isinstance(value, str):
        raise BindingError(f'Invalid {binding_type.__name__}: {value}')

    parts = value.split('@')
    if len(parts) != 2:
        raise BindingError(f'Invalid {binding_type.__name__}: {value}')

    aspect, identity = parts
    return binding_type(identity=identity, aspect=aspect)
class CallContext(BaseContext):
    """
    Context of a call binding is used to set aspects on a request.

    Aspects set through ``set_aspect`` are accumulated in ``_output``,
    a dict of ``{identity: {aspect: value}}``.
    """

    def __init__(
        self,
        identity: str,
        request: web.Request,
        trigger: BoundValue,
        states: typing.Dict[str, BoundValue],
    ):
        """
        :param identity: Identity stored on the context.
        :param request: The current request.
        :param trigger: Value of the trigger.
        :param states: Values of the states.
        """
        super().__init__(identity, request, trigger, states)
        self._output = {}

    async def set_aspect(self, identity, **aspects):
        """
        Record aspects to set on a component.

        :param identity: Identity of the component to update.
        :param aspects: The aspects to set on the component.
        :raises TriggerLoopError: When setting, on the trigger's own
            component, the aspect that triggered.
        """
        # Validate before touching ``_output`` so a rejected call does not
        # leave an empty entry behind for ``identity``.
        if identity == self.trigger.identity \
                and self.trigger.aspect in aspects:
            raise TriggerLoopError(
                'Setting the same aspect that triggered: '
                f'{self.trigger.aspect}@{identity}'
            )
        self._output.setdefault(identity, {}).update(aspects)