from typing import (
Callable,
Coroutine,
Iterable,
AsyncIterator,
TypeVar,
Awaitable,
Deque,
Tuple,
Any,
)
from functools import wraps
from collections import deque
from random import randint
T = TypeVar("T")
async def asyncify(iterable: Iterable[T]) -> AsyncIterator[T]:
"""
Convert an iterable into an async iterable
This is intended to sequence literals like lists to `async` iterators
in order to force usage of `async` code paths. There is no functional
or other advantage otherwise.
"""
for value in iterable:
yield value
def awaitify(call: Callable[..., T]) -> Callable[..., Awaitable[T]]:
"""
Convert a callable (`foo()`) into an async callable (`await foo()`)
This is intended to convert `lambda` expressions to `async` functions
in order to force usage of `async` code paths. There is no functional
or other advantage otherwise.
"""
async def await_wrapper(*args: Any, **kwargs: Any) -> T:
return call(*args, **kwargs)
return await_wrapper
[docs]
class Schedule:
r"""
Signal to the event loop to adopt and run new coroutines
:param coros: The coroutines to start running
In order to communicate with the event loop and start the coroutines,
the :py:class:`Schedule` must be `await`\ ed.
"""
def __init__(self, *coros: Coroutine[Any, Any, Any]):
self.coros = coros
def __await__(self):
yield self
[docs]
class Switch:
"""
Signal to the event loop to run another coroutine
Pauses the coroutine but immediately continues after
all other runnable coroutines of the event loop.
This is similar to the common ``sleep(0)`` function
of regular event loop frameworks.
If a single argument is given, this specifies how many
turns should be skipped. The default corresponds to `0`.
If two arguments are given, this is interpreted as an
inclusive interval to randomly select the skip count.
"""
def __init__(self, skip: int = 0, limit: int = 0, /) -> None:
if limit <= 0:
self._idle_count = skip
else:
self._idle_count = randint(skip, limit)
def __await__(self):
yield self
for _ in range(self._idle_count):
yield self
[docs]
class Lock:
"""Simple lock for exclusive access"""
def __init__(self):
self._owned = False
self._waiting: list[object] = []
async def __aenter__(self):
if self._owned:
# wait until it is our turn to take the lock
token = object()
self._waiting.append(token)
# a spin-lock should be fine since tests are short anyways
while self._owned or self._waiting[0] is not token:
await Switch()
# we will take the lock now, remove our wait claim
self._waiting.pop(0)
self._owned = True
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
self._owned = False
[docs]
def sync(test_case: Callable[..., Coroutine[None, Any, Any]], /) -> Callable[..., None]:
"""
Mark an ``async def`` test case to be run synchronously with children
This provides a primitive "event loop" which only responds
to :py:class:`Schedule`, :py:class:`Switch` and :py:class:`Lock`.
It should be applied as a decorator on an ``async def`` function, which
is then turned into a synchronous callable that will run the ``async def``
function and all tasks it spawns.
Other decorators, most prominently :py:func:`pytest.mark.parametrize`,
can be applied around it.
"""
@wraps(test_case)
def run_sync(*args: Any, **kwargs: Any):
run_queue: Deque[Tuple[Coroutine[Any, Any, Any], Any]] = deque()
run_queue.append((test_case(*args, **kwargs), None))
while run_queue:
coro, event = run_queue.popleft()
try:
event = coro.send(event)
except StopIteration as e:
result = e.args[0] if e.args else None
assert result is None, f"got '{result!r}' expected 'None'"
else:
if isinstance(event, Schedule):
run_queue.extend((new_coro, None) for new_coro in event.coros)
run_queue.append((coro, event))
elif isinstance(event, Switch):
run_queue.append((coro, event))
else: # pragma: no cover
raise RuntimeError(
f"test case {test_case} yielded an unexpected event {event}"
)
return run_sync