Source code for asyncstdlib.asynctools

from asyncio import iscoroutinefunction
from functools import wraps
from typing import (
    Union,
    AsyncIterator,
    TypeVar,
    AsyncGenerator,
    Iterable,
    Awaitable,
    AsyncIterable,
    Callable,
    Any,
    overload,
    Optional,
)

from ._typing import AsyncContextManager, T, T1, T2, T3, T4, T5, AnyIterable
from ._core import aiter
from .contextlib import nullcontext


S = TypeVar("S")


class _BorrowedAsyncIterator(AsyncGenerator[T, S]):
    """
    Borrowed async iterator/generator, preventing to ``aclose`` the ``iterable``
    """

    # adding special methods such as `__aiter__` as `__slots__` allows to set them
    # on the instance: the interpreter expects *descriptors* not methods, and
    # `__slots__` are descriptors just like methods.
    __slots__ = "__wrapped__", "__aiter__", "__anext__", "asend", "athrow"

    # Type checker does not understand `__slot__` definitions
    __aiter__: Callable[[Any], AsyncGenerator[T, S]]
    __anext__: Callable[[Any], Awaitable[T]]
    asend: Any
    athrow: Any

    def __init__(self, iterator: Union[AsyncIterator[T], AsyncGenerator[T, S]]):
        self.__wrapped__ = iterator
        # iterator.__aiter__ is likely to return iterator (e.g. for async def: yield)
        # We wrap it in a separate async iterator/generator to hide its __aiter__.
        try:
            wrapped_iterator: AsyncGenerator[T, S] = self._wrapped_iterator(iterator)
            self.__anext__ = iterator.__anext__  # type: ignore
            self.__aiter__ = wrapped_iterator.__aiter__  # type: ignore
        except (AttributeError, TypeError):
            raise TypeError(
                "borrowing requires an async iterator "
                + f"with __aiter__ and __anext__ method, got {type(iterator).__name__}"
            ) from None
        self.__anext__ = wrapped_iterator.__anext__  # type: ignore
        # Our wrapper cannot pass on asend/athrow without getting much heavier.
        # Since interleaving anext/asend/athrow is not allowed, and the wrapper holds
        # no internal state other than the iterator, circumventing it should be fine.
        if hasattr(iterator, "asend"):
            self.asend = iterator.asend  # type: ignore
        if hasattr(iterator, "athrow"):
            self.athrow = iterator.athrow  # type: ignore

    # Py3.6 compatibility
    # Use ``(item async for item in iterator)`` inside
    # ``__init__`` when no longer needed.
    @staticmethod
    async def _wrapped_iterator(
        iterator: Union[AsyncIterator[T], AsyncGenerator[T, S]]
    ) -> AsyncGenerator[T, S]:
        async for item in iterator:
            yield item

    def __repr__(self):
        return f"<asyncstdlib.borrow of {self.__wrapped__!r} at 0x{(id(self)):x}>"

    async def _aclose_wrapper(self):
        wrapper_iterator = self.__aiter__()
        # allow closing the intermediate wrapper
        # this prevents a resource warning if the wrapper is GC'd
        # the underlying iterator is NOT affected by this
        await wrapper_iterator.aclose()
        # disable direct asend/athrow to the underlying iterator
        if hasattr(self, "asend"):
            self.asend = wrapper_iterator.asend
        if hasattr(self, "athrow"):
            self.athrow = wrapper_iterator.athrow

    def aclose(self):
        return self._aclose_wrapper()


class _ScopedAsyncIterator(_BorrowedAsyncIterator[T, S]):
    __slots__ = ()

    def __repr__(self):
        return f"<asyncstdlib.scoped_iter of {self.__wrapped__!r} at 0x{(id(self)):x}>"

    async def aclose(self):
        pass


class _ScopedAsyncIteratorContext(AsyncContextManager[AsyncIterator[T]]):
    """
    Context restricting the lifetime of ``iterator`` to the context scope

    This is an internal helper that relies on ``iterator`` belonging to the scope
    and having an ``aclose`` method.
    """

    __slots__ = "_borrowed_iter", "_iterator"

    def __init__(self, iterator: AsyncIterator[T]):
        self._iterator: AsyncIterator[T] = iterator
        self._borrowed_iter: Optional[_ScopedAsyncIterator[T, Any]] = None

    async def __aenter__(self) -> AsyncIterator[T]:
        if self._borrowed_iter is not None:
            raise RuntimeError("scoped_iter is not re-entrant")
        borrowed_iter = self._borrowed_iter = _ScopedAsyncIterator(self._iterator)
        return borrowed_iter

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._borrowed_iter._aclose_wrapper()  # type: ignore
        await self._iterator.aclose()  # type: ignore
        return False

    def __repr__(self):
        return f"<{self.__class__.__name__} of {self._iterator!r} at 0x{(id(self)):x}>"


[docs]def borrow(iterator: AsyncIterator[T]) -> _BorrowedAsyncIterator[T, None]: """ Borrow an async iterator, preventing to ``aclose`` it When :term:`borrowing` an async iterator, the original owner assures to close the iterator as needed. In turn, the borrowed iterator does not allow closing the underlying iterator. The borrowed iterator supports :py:meth:`~agen.asend` and :py:meth:`~agen.athrow` if the underlying iterator supports them as well; this allows borrowing either an :py:class:`~collections.abc.AsyncIterator` or :py:class:`~collections.abc.AsyncGenerator`. Regardless of iterator type, :py:meth:`~agen.aclose` is always provided; it closes only the borrowed iterator, not the underlying iterator. .. seealso:: Use :py:func:`~.scoped_iter` to ensure an (async) iterable is eventually closed and only :term:`borrowed <borrowing>` until then. """ return _BorrowedAsyncIterator(iterator)
[docs]def scoped_iter(iterable: AnyIterable[T]): """ Context manager that provides an async iterator for an (async) ``iterable`` Roughly equivalent to combining :py:func:`~asyncstdlib.builtins.iter` with :py:class:`~asyncstdlib.contextlib.closing`. The resulting :term:`asynchronous iterator` is automatically :term:`borrowed <borrowing>` to prevent premature closing when passing the iterator around. .. code-block:: python3 from collections import deque import asyncstdlib as a async def head_tail(iterable, leading=5, trailing=5): '''Provide the first ``leading`` and last ``trailing`` items''' # create async iterator valid for the entire block async with scoped_iter(iterable) as async_iter: # ... safely pass it on without it being closed ... async for item in a.isclice(async_iter, leading): yield item tail = deque(maxlen=trailing) # ... and use it again in the block async for item in async_iter: tail.append(item) for item in tail: yield item Nested scoping of the same iterator is safe: inner scopes automatically forfeit closing the underlying iterator in favour of the outermost scope. This allows passing the scoped iterator to other functions that use :py:func:`scoped_iter`. """ # The iterable has already been borrowed. # Do not unwrap it to preserve method forwarding. if isinstance(iterable, (_BorrowedAsyncIterator, _ScopedAsyncIterator)): return _ScopedAsyncIteratorContext(iterable) iterator = aiter(iterable) # The iterable cannot be closed. # We do not need to take care of it. if not hasattr(iterator, "aclose"): return nullcontext(iterator) return _ScopedAsyncIteratorContext(iterator)
[docs]async def await_each(awaitables: Iterable[Awaitable[T]]) -> AsyncIterable[T]: """ Iterate through ``awaitables`` and await each item This converts an *iterable of async* into an *async iterator* of awaited values. Consequently, we can apply various functions made for ``AsyncIterable[T]`` to ``Iterable[Awaitable[T]]`` as well. Example: .. code-block:: python3 import asyncstdlib as a async def check1() -> bool: ... async def check2() -> bool: ... async def check3() -> bool: ... okay = await a.all( a.await_each([check1(), check2(), check3()]) ) """ for awaitable in awaitables: yield await awaitable
@overload async def apply( __func: Callable[[T1], T], __arg1: Awaitable[T1], ) -> T: ... @overload async def apply( __func: Callable[[T1, T2], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], ) -> T: ... @overload async def apply( __func: Callable[[T1, T2, T3], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], ) -> T: ... @overload async def apply( __func: Callable[[T1, T2, T3, T4], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], __arg4: Awaitable[T4], ) -> T: ... @overload async def apply( __func: Callable[[T1, T2, T3, T4, T5], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], __arg4: Awaitable[T4], __arg5: Awaitable[T5], ) -> T: ... @overload async def apply( __func: Callable[..., T], __arg1: Awaitable[Any], __arg2: Awaitable[Any], __arg3: Awaitable[Any], __arg4: Awaitable[Any], __arg5: Awaitable[Any], *args: Awaitable[Any], **kwargs: Awaitable[Any], ) -> T: ... @overload async def apply( __func: Callable[..., T], **kwargs: Awaitable[Any], ) -> T: ...
[docs]async def apply( __func: Callable[..., T], *args: Awaitable[Any], **kwargs: Awaitable[Any] ) -> T: """ Await the arguments and keyword arguments and then apply ``func`` on them Example: .. code-block:: python3 async def compute_something() -> float: ... async def compute_something_else() -> float: ... result = await apply( lambda x, y: x ** y, compute_something(), compute_something_else()) The function ``apply`` serves, for example, a practical use case when you want to chain operations on awaitables and need to pass around the final awaitable for further operations. """ return __func( *[await arg for arg in args], **{k: await arg for k, arg in kwargs.items()} )
@overload def sync(function: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: ... @overload def sync(function: Callable[..., T]) -> Callable[..., Awaitable[T]]: ...
[docs]def sync(function: Callable) -> Callable[..., Awaitable[T]]: r""" Wraps a callable to ensure its result can be ``await``\ ed Useful to write :term:`async neutral` functions by wrapping callable arguments, or to use synchronous functions where asynchronous ones are expected. Wrapping a regular function defined using ``def`` or ``lambda`` makes it behave roughly as if it were defined using ``async def`` instead. Example: .. code-block:: python3 import asyncstdlib as a def test1_sync(x, y): ... async def test1_async(x): ... async def main(): await a.sync(test1_sync)(x=1, y=2) await a.sync(test1_async)(x=8) await a.sync(lambda x: x ** 3)(x=5) if __name__ == "__main__": asyncio.run(main()) """ if not callable(function): raise TypeError("function argument should be Callable") if iscoroutinefunction(function): return function @wraps(function) async def async_wrapped(*args, **kwargs): result = function(*args, **kwargs) if isinstance(result, Awaitable): return await result return result return async_wrapped