Source code for asyncstdlib.builtins

from typing import (
    Iterable,
    AsyncIterable,
    Union,
    AsyncIterator,
    TypeVar,
    Awaitable,
    Callable,
    Tuple,
    List,
    Set,
    Optional,
    Dict,
    Any,
    overload,
)
import builtins as _sync_builtins

from ._core import (
    aiter,
    AnyIterable,
    ScopedIter,
    awaitify as _awaitify,
    Sentinel,
)


# Generic type variables used by the annotations of this module:
# ``T`` for item types, ``K`` for mapping keys, ``R`` for call results.
T = TypeVar("T", contravariant=True)
K = TypeVar("K")
R = TypeVar("R", covariant=True)
# Variadic overloads
# ``T1`` to ``T5`` type the individual positional iterables of the
# ``zip`` and ``map`` overloads.
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")

# Marker used as the default of ``anext(..., default=...)``; it is compared
# by identity to detect that the caller did not pass a ``default``.
__ANEXT_DEFAULT = Sentinel("<no default>")


async def anext(iterator: AsyncIterator[T], default=__ANEXT_DEFAULT) -> T:
    """
    Fetch the next item of an async iterator

    :raises StopAsyncIteration: if ``iterator`` is exhausted
        and ``default`` is not set

    When ``default`` is passed, it is returned in place of raising
    :py:exc:`StopAsyncIteration` for an exhausted ``iterator``.

    .. note::

        This function is not :term:`async neutral`. The ``iterator`` must be an
        :term:`asynchronous iterator`, i.e. support the
        :py:meth:`~object.__anext__` method.
    """
    try:
        item = await iterator.__anext__()
    except StopAsyncIteration:
        # only swallow the exhaustion signal if the caller supplied a fallback
        if default is not __ANEXT_DEFAULT:
            return default
        raise
    else:
        return item
# Marker for "no ``sentinel`` was passed to ``iter``"; compared by identity.
__ITER_DEFAULT = Sentinel("<no default>")


@overload
def iter(subject: AnyIterable[T]) -> AsyncIterator[T]:
    pass


@overload
def iter(subject: Callable[[], Awaitable[T]], sentinel: T) -> AsyncIterator[T]:
    pass


def iter(
    subject: Union[AnyIterable[T], Callable[[], Awaitable[T]]],
    sentinel: Union[Sentinel, T] = __ITER_DEFAULT,
) -> AsyncIterator[T]:
    """
    Provide an async iterator over the elements of ``subject``

    :raises TypeError: if ``subject`` does not support any iteration protocol

    Without a ``sentinel``, the ``subject`` must support
    the async iteration protocol (the :py:meth:`~object.__aiter__` method),
    the regular iteration protocol (the :py:meth:`~object.__iter__` method),
    or the sequence protocol (the :py:meth:`~object.__getitem__`
    method with integer arguments starting at 0).
    In either case, an async iterator is returned.

    With a ``sentinel``, the ``subject`` must be an (async) callable. In this
    case, :py:func:`~.iter` provides an async iterator that uses
    ``await subject()`` to produce new values. Once a value equals
    ``sentinel``, the value is discarded and iteration stops.

    .. seealso::

        Use :py:func:`~.scoped_iter` to ensure an (async) iterable is eventually
        closed and only :term:`borrowed <borrowing>` until then.
    """
    # one-argument form: plain (async) iteration
    if sentinel is __ITER_DEFAULT:
        return aiter(subject)
    # two-argument form: repeatedly call ``subject`` until ``sentinel`` shows up
    if callable(subject):
        return acallable_iterator(subject, sentinel)
    raise TypeError("iter(v, w): v must be callable")
async def acallable_iterator(
    subject: Callable[[], Awaitable[T]], sentinel: T
) -> AsyncIterator[T]:
    """
    Async iterator over the results of ``await subject()`` up to ``sentinel``

    The ``subject`` is called without arguments for every item. Iteration
    ends, without yielding the value, as soon as a result no longer
    compares unequal (``!=``) to ``sentinel``.
    """
    produce = _awaitify(subject)
    while True:
        value = await produce()
        if value != sentinel:
            yield value
        else:
            break
async def all(iterable: AnyIterable[T]) -> bool:
    """
    Check that no element of the (async) ``iterable`` is false

    Returns :py:data:`True` for an empty ``iterable``. Iteration stops at
    the first false element.
    """
    verdict = True
    async with ScopedIter(iterable) as item_iter:
        async for element in item_iter:
            if not element:
                # short-circuit: one false element decides the result
                verdict = False
                break
    return verdict
async def any(iterable: AnyIterable[T]) -> bool:
    """
    Check whether some element of the (async) ``iterable`` is true

    Returns :py:data:`False` for an empty ``iterable``. Iteration stops at
    the first true element.
    """
    verdict = False
    async with ScopedIter(iterable) as item_iter:
        async for element in item_iter:
            if element:
                # short-circuit: one true element decides the result
                verdict = True
                break
    return verdict
@overload
def zip(
    __it1: AnyIterable[T1],
    *,
    strict=False,
) -> AsyncIterator[Tuple[T1]]:
    ...


@overload
def zip(
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    *,
    strict=False,
) -> AsyncIterator[Tuple[T1, T2]]:
    ...


@overload
def zip(
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    *,
    strict=False,
) -> AsyncIterator[Tuple[T1, T2, T3]]:
    ...


@overload
def zip(
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    __it4: AnyIterable[T4],
    *,
    strict=False,
) -> AsyncIterator[Tuple[T1, T2, T3, T4]]:
    ...


@overload
def zip(
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    __it4: AnyIterable[T4],
    __it5: AnyIterable[T5],
    *,
    strict=False,
) -> AsyncIterator[Tuple[T1, T2, T3, T4, T5]]:
    ...


@overload
def zip(
    __it1: AnyIterable[Any],
    __it2: AnyIterable[Any],
    __it3: AnyIterable[Any],
    __it4: AnyIterable[Any],
    __it5: AnyIterable[Any],
    *iterables: AnyIterable[Any],
    strict=False,
) -> AsyncIterator[Tuple[Any, ...]]:
    ...


async def zip(
    *iterables: AnyIterable[Any], strict=False
) -> AsyncIterator[Tuple[Any, ...]]:
    """
    Create an async iterator that aggregates elements from each of the (async) iterables

    :raises ValueError: if the ``iterables`` are not equal length and ``strict`` is set

    The next element of ``zip`` is a :py:class:`tuple` of the next element of
    each of its ``iterables``. As soon as any of its ``iterables`` is exhausted,
    ``zip`` is exhausted as well. This means that if ``zip`` receives *n* ``iterables``,
    with the shortest having *m* elements, it becomes a generator *m*-times producing
    an *n*-tuple.

    .. code:: python3

        async for va, vb, vc in zip(a, b, c):
            print(f'a => {va}, b => {vb}, c => {vc}')

    If ``iterables`` is empty, the ``zip`` iterator is empty as well.
    Multiple ``iterables`` may be mixed regular and async iterables.

    When called with ``strict=True``, all ``iterables`` must be of same length;
    in this mode ``zip`` raises :py:exc:`ValueError` if any ``iterables`` are not
    exhausted with the others.
    """
    if not iterables:
        return
    aiters = (*(aiter(it) for it in iterables),)
    # from here on only the iterators are used; drop the original references
    del iterables
    inner = _zip_inner(aiters) if not strict else _zip_inner_strict(aiters)
    try:
        async for items in inner:
            yield items
    finally:
        # Finalise the helper generator first: if ``zip`` itself is closed
        # early, ``inner`` is still suspended and must not be left dangling
        # while the iterators it reads from are closed underneath it.
        # This is a no-op if ``inner`` already finished or raised.
        await inner.aclose()
        for iterator in aiters:
            # not every async iterator provides ``aclose``
            try:
                aclose = iterator.aclose()
            except AttributeError:
                pass
            else:
                await aclose
async def _zip_inner(aiters):
    """Yield tuples of the next items of all ``aiters`` until any is exhausted"""
    try:
        while True:
            yield (*[await anext(it) for it in aiters],)
    except StopAsyncIteration:
        return


async def _zip_inner_strict(aiters):
    """
    Yield tuples of the next items of all ``aiters``, enforcing equal length

    :raises ValueError: if some of the ``aiters`` is exhausted before or after
        the others

    ``aiters`` must be an indexable sequence (``zip`` passes a tuple).
    """
    # index of the last iterator we tried to fetch an item from
    tried = 0
    try:
        while True:
            items = []
            for tried, _aiter in _sync_builtins.enumerate(aiters):  # noqa: B007
                items.append(await anext(_aiter))
            yield (*items,)
    except StopAsyncIteration:
        # after the first iterable provided an item, some later iterable was empty
        if tried > 0:
            plural = " " if tried == 1 else "s 1-"
            # ``from None``: the internal StopAsyncIteration is not useful context
            raise ValueError(
                f"zip() argument {tried+1} is shorter than argument{plural}{tried}"
            ) from None
        # after the first iterable was empty, some later iterable may be not;
        # the first iterable is known to be exhausted, so do not poll it again
        sentinel = object()
        for tried, _aiter in _sync_builtins.enumerate(aiters[1:], 1):
            if await anext(_aiter, sentinel) is not sentinel:
                plural = " " if tried == 1 else "s 1-"
                raise ValueError(
                    f"zip() argument {tried+1} is longer than argument{plural}{tried}"
                ) from None
        return


@overload
def map(
    function: Callable[[T1], Awaitable[R]],
    __it1: AnyIterable[T1],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1], R],
    __it1: AnyIterable[T1],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2], Awaitable[R]],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2], R], __it1: AnyIterable[T1], __it2: AnyIterable[T2]
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2, T3], Awaitable[R]],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2, T3], R],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2, T3, T4], Awaitable[R]],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    __it4: AnyIterable[T4],
) -> AsyncIterator[R]:
    ...
@overload
def map(
    function: Callable[[T1, T2, T3, T4], R],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    __it4: AnyIterable[T4],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2, T3, T4, T5], Awaitable[R]],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    __it4: AnyIterable[T4],
    __it5: AnyIterable[T5],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[[T1, T2, T3, T4, T5], R],
    __it1: AnyIterable[T1],
    __it2: AnyIterable[T2],
    __it3: AnyIterable[T3],
    __it4: AnyIterable[T4],
    __it5: AnyIterable[T5],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[..., Awaitable[R]],
    __it1: AnyIterable[Any],
    __it2: AnyIterable[Any],
    __it3: AnyIterable[Any],
    __it4: AnyIterable[Any],
    __it5: AnyIterable[Any],
    *iterable: AnyIterable[Any],
) -> AsyncIterator[R]:
    ...


@overload
def map(
    function: Callable[..., R],
    __it1: AnyIterable[Any],
    __it2: AnyIterable[Any],
    __it3: AnyIterable[Any],
    __it4: AnyIterable[Any],
    __it5: AnyIterable[Any],
    *iterable: AnyIterable[Any],
) -> AsyncIterator[R]:
    ...


async def map(
    function: Union[Callable[..., R], Callable[..., Awaitable[R]]],
    *iterable: AnyIterable[Any],
) -> AsyncIterator[R]:
    r"""
    An async iterator applying an (async) function to items of (async) iterables

    Every step takes the next item of each ``iterable`` and calls ``function``
    with all of these items; if ``function`` provides an awaitable,
    it is ``await``\ ed. The result is the next value of ``map``.

    Barring sync/async translation, ``map`` is equivalent to
    ``(await function(*args) async for args in zip(*iterable))``.

    It is important that ``function`` receives *one* item from *each* iterable
    at every step. For *n* ``iterable``, ``function`` must take *n* positional
    arguments. Similar to :py:func:`~.zip`, ``map`` is exhausted as soon as
    *any* of its ``iterable`` is exhausted.

    The ``function`` may be a regular or async callable.
    Multiple ``iterable`` may be mixed regular and async iterables.
    """
    call = _awaitify(function)
    # ``zip`` groups one item per iterable into the positional arguments
    async with ScopedIter(zip(*iterable)) as args_iter:
        async for args in args_iter:
            yield await call(*args)
# Marker for "no ``default`` was passed"; compared by identity.
__MAX_DEFAULT = Sentinel("<no default>")


async def max(
    iterable: AnyIterable[T],
    *,
    key: Optional[Callable[[T], Any]] = None,
    default: T = __MAX_DEFAULT,
) -> T:
    """
    Return the largest item from an (async) iterable

    :raises ValueError: if ``iterable`` is empty and ``default`` is not set

    The ``key`` argument specifies a one-argument ordering function like that used
    for :py:meth:`list.sort`. It may be a regular or async callable and defaults to
    the identity function. The ``default`` argument specifies an object to return
    if the provided ``iterable`` is empty. If the ``iterable`` is empty and
    ``default`` is not provided, a :py:exc:`ValueError` is raised.

    Of several equally large items, the first one encountered is returned.

    .. note::

        The two-or-more-arguments variant is not supported,
        as it does not benefit from being ``async``.
        Use the builtin :py:func:`max` function instead.
    """
    async with ScopedIter(iterable) as item_iter:
        largest = await anext(item_iter, default=__MAX_DEFAULT)
        if largest is __MAX_DEFAULT:
            # the iterable did not provide a single item
            if default is not __MAX_DEFAULT:
                return default
            raise ValueError("max() arg is an empty sequence")
        if key is not None:
            rank = _awaitify(key)
            # keep the key of the current best to evaluate ``key`` once per item
            largest_rank = await rank(largest)
            async for candidate in item_iter:
                candidate_rank = await rank(candidate)
                if candidate_rank > largest_rank:
                    largest, largest_rank = candidate, candidate_rank
        else:
            async for candidate in item_iter:
                if candidate > largest:
                    largest = candidate
        return largest
async def min(
    iterable: AnyIterable[T],
    *,
    key: Optional[Callable[[T], Any]] = None,
    default: T = __MAX_DEFAULT,
) -> T:
    """
    Return the smallest item from an (async) iterable

    :raises ValueError: if ``iterable`` is empty and ``default`` is not set

    The ``key`` argument specifies a one-argument ordering function like that used
    for :py:meth:`list.sort`. It may be a regular or async callable and defaults to
    the identity function. The ``default`` argument specifies an object to return
    if the provided ``iterable`` is empty. If the ``iterable`` is empty and
    ``default`` is not provided, a :py:exc:`ValueError` is raised.

    Of several equally small items, the first one encountered is returned.

    .. note::

        The two-or-more-arguments variant is not supported,
        as it does not benefit from being ``async``.
        Use the builtin :py:func:`min` function instead.
    """
    # NOTE: the "no default" marker is shared with ``max``
    async with ScopedIter(iterable) as item_iter:
        smallest = await anext(item_iter, default=__MAX_DEFAULT)
        if smallest is __MAX_DEFAULT:
            # the iterable did not provide a single item
            if default is not __MAX_DEFAULT:
                return default
            raise ValueError("min() arg is an empty sequence")
        if key is not None:
            rank = _awaitify(key)
            # keep the key of the current best to evaluate ``key`` once per item
            smallest_rank = await rank(smallest)
            async for candidate in item_iter:
                candidate_rank = await rank(candidate)
                if candidate_rank < smallest_rank:
                    smallest, smallest_rank = candidate, candidate_rank
        else:
            async for candidate in item_iter:
                if candidate < smallest:
                    smallest = candidate
        return smallest
async def filter(
    function: Union[Callable[[T], bool], Callable[[T], Awaitable[bool]], None],
    iterable: AnyIterable[T],
) -> AsyncIterator[T]:
    """
    An async iterator of the elements of an (async) iterable accepted by ``function``

    Barring sync/async translation, ``filter`` is equivalent to
    ``(element async for element in iterable if await function(element))``.

    The ``function`` may be a regular or async callable; if it is :py:data:`None`,
    the elements themselves are tested for truth.
    The ``iterable`` may be a regular or async iterable.
    """
    async with ScopedIter(iterable) as item_iter:
        if function is not None:
            predicate = _awaitify(function)
            async for item in item_iter:
                if await predicate(item):  # type: ignore
                    yield item
        else:
            # no predicate: keep the elements that are true themselves
            async for item in item_iter:
                if item:
                    yield item
async def enumerate(iterable: AnyIterable[T], start=0) -> AsyncIterator[Tuple[int, T]]:
    """
    An async iterator of pairs of a running count and the elements of an (async) iterable

    The first element of ``iterable`` is paired with ``start``;
    the count grows by ``1`` with each further element.

    The ``iterable`` may be a regular or async iterable.
    """
    async with ScopedIter(iterable) as item_iter:
        index = start
        async for item in item_iter:
            yield index, item
            index += 1
async def sum(iterable: AnyIterable[T], start: T = 0) -> T:
    """
    Sum of ``start`` and all elements in the (async) iterable

    The elements are added left to right onto ``start``.
    The ``start`` object itself is never modified, even if it is mutable;
    for an empty ``iterable``, ``start`` is returned as is.
    """
    total = start
    async for item in aiter(iterable):
        # deliberately not ``+=``: an in-place add would mutate a mutable
        # ``start`` (e.g. a list) that still belongs to the caller
        total = total + item
    return total
async def list(iterable: Union[Iterable[T], AsyncIterable[T]] = ()) -> List[T]:
    """
    Collect the elements of an (async) iterable into a new :py:class:`list`

    This is equivalent to ``[element async for element in iterable]``.
    Without an ``iterable``, an empty list is created.
    """
    collected = []
    async for element in aiter(iterable):
        collected.append(element)
    return collected
async def tuple(iterable: Union[Iterable[T], AsyncIterable[T]] = ()) -> Tuple[T, ...]:
    """
    Collect the elements of an (async) iterable into a new :py:class:`tuple`

    Without an ``iterable``, an empty tuple is created.
    """
    collected = []
    async for element in aiter(iterable):
        collected.append(element)
    # the name ``tuple`` is shadowed in this module, use the builtin explicitly
    return _sync_builtins.tuple(collected)
@overload
async def dict(  # noqa: F811
    iterable: Union[Iterable[Tuple[K, T]], AsyncIterable[Tuple[K, T]]] = (),
) -> Dict[K, T]:
    pass


@overload  # noqa: F811
async def dict(  # noqa: F811
    iterable: Union[Iterable[Tuple[K, T]], AsyncIterable[Tuple[K, T]]] = (),
    **kwargs: T,
) -> Dict[Union[K, str], T]:
    pass


async def dict(  # noqa: F811
    iterable: Union[Iterable[Tuple[K, T]], AsyncIterable[Tuple[K, T]]] = (),
    **kwargs: T,
) -> Dict[Union[K, str], T]:
    """
    Build a new :py:class:`dict` from an (async) iterable of pairs and keywords

    This is equivalent to ``{key: value async for key, value in iterable}``
    if no keywords are provided. Keywords are applied after the ``iterable``,
    so a keyword replaces an equal key from the ``iterable``.
    """
    if not iterable:
        # nothing to iterate: the keywords are the entire content
        return _sync_builtins.dict(kwargs)
    result: Dict[Union[K, str], T] = {}
    async for key, value in aiter(iterable):
        result[key] = value
    if kwargs:
        result.update(kwargs)
    return result
async def set(iterable: Union[Iterable[T], AsyncIterable[T]] = ()) -> Set[T]:
    """
    Collect the elements of an (async) iterable into a new :py:class:`set`

    This is equivalent to ``{element async for element in iterable}``.
    Without an ``iterable``, an empty set is created.
    """
    # the name ``set`` is shadowed in this module, use the builtin explicitly
    collected = _sync_builtins.set()
    async for element in aiter(iterable):
        collected.add(element)
    return collected
async def _identity(x: T) -> T:
    """Async no-op: hand back the argument ``x`` exactly as received"""
    return x
async def sorted(
    iterable: AnyIterable[T],
    *,
    key: Optional[Callable[[T], Any]] = None,
    reverse: bool = False,
) -> List[T]:
    """
    Sort items from an (async) iterable into a new list

    :raises TypeError: if the items (or their keys) cannot be compared

    The optional ``key`` argument specifies a one-argument (async) callable, which
    provides a substitute for determining the sort order of each item.
    The special value and default :py:data:`None` represents the identity functions,
    i.e. compares items directly.

    The default sort order is ascending, that is items with ``a < b``
    imply ``result.index(a) < result.index(b)``. Use ``reverse=True``
    for descending sort order.

    .. note::

        The actual sorting is synchronous,
        so a very large ``iterable`` or very slow comparison
        may block the event loop notably.
        It is guaranteed to be worst-case O(n log n) runtime.
    """
    if key is None:
        # Fast path: a regular iterable without key needs no async machinery.
        # Only the ``iter`` probe is guarded -- a TypeError raised while
        # *sorting* (incomparable items) must reach the caller, because a
        # one-shot iterator is already consumed by then and retrying below
        # would silently produce an empty result.
        try:
            sync_iterator = _sync_builtins.iter(iterable)
        except TypeError:
            pass  # not a regular iterable, use async iteration instead
        else:
            return _sync_builtins.sorted(sync_iterator, reverse=reverse)
    key = _awaitify(key) if key is not None else _identity
    # evaluate the (async) key once per item, then sort synchronously by it
    keyed_items = [(await key(item), item) async for item in aiter(iterable)]
    keyed_items.sort(key=lambda ki: ki[0], reverse=reverse)
    return [item for key, item in keyed_items]