from typing import (
Iterable,
AsyncIterable,
Union,
AsyncIterator,
Awaitable,
Callable,
Tuple,
List,
Set,
Optional,
Dict,
Any,
)
import builtins as _sync_builtins
from ._typing import T, R, HK, LT, AnyIterable
from ._core import (
aiter,
ScopedIter,
awaitify as _awaitify,
Sentinel,
)
__ANEXT_DEFAULT = Sentinel("<no default>")
[docs]
async def anext(
iterator: AsyncIterator[T], default: Union[Sentinel, T] = __ANEXT_DEFAULT
) -> T:
"""
Retrieve the next item from the async iterator
:raises StopAsyncIteration: if ``iterator`` is exhausted and ``default`` is not set
If ``default`` is given, it is returned if the ``iterator`` is exhausted.
Otherwise, :py:exc:`StopAsyncIteration` is raised for an exhausted ``iterator``.
.. note::
This function is not :term:`async neutral`.
The ``iterator`` must be an :term:`asynchronous iterator`,
i.e. support the :py:meth:`~object.__anext__` method.
"""
try:
return await iterator.__anext__()
except StopAsyncIteration:
if default is __ANEXT_DEFAULT:
raise
return default # type: ignore
__ITER_DEFAULT = Sentinel("<no default>")
[docs]
def iter(
subject: Union[AnyIterable[T], Callable[[], Awaitable[T]]],
sentinel: Union[Sentinel, T] = __ITER_DEFAULT,
) -> AsyncIterator[T]:
"""
An async iterator object yielding elements from ``subject``
:raises TypeError: if ``subject`` does not support any iteration protocol
If ``sentinel`` is not given, the ``subject`` must support
the async iteration protocol (the :py:meth:`~object.__aiter__` method),
the regular iteration protocol (the :py:meth:`~object.__iter__` method),
or it must support the sequence protocol (the :py:meth:`~object.__getitem__`
method with integer arguments starting at 0).
In either case, an async iterator is returned.
If ``sentinel`` is given, subject must be an (async) callable. In this case,
:py:func:`~.iter` provides an async iterator that uses ``await subject()``
to produce new values. Once a value equals ``sentinel``, the value is discarded
and iteration stops.
.. seealso:: Use :py:func:`~.scoped_iter` to ensure an (async) iterable
is eventually closed and only :term:`borrowed <borrowing>` until then.
"""
if sentinel is __ITER_DEFAULT:
return aiter(subject) # type: ignore
elif not callable(subject):
raise TypeError("iter(v, w): v must be callable")
else:
assert not isinstance(sentinel, Sentinel)
return acallable_iterator(subject, sentinel)
async def acallable_iterator(
subject: Callable[[], Awaitable[T]], sentinel: T
) -> AsyncIterator[T]:
subject = _awaitify(subject)
value = await subject()
while value != sentinel:
yield value
value = await subject()
[docs]
async def all(iterable: AnyIterable[Any]) -> bool:
"""
Return :py:data:`True` if none of the elements of the (async) ``iterable`` are false
"""
async with ScopedIter(iterable) as item_iter:
async for element in item_iter:
if not element:
return False
return True
[docs]
async def any(iterable: AnyIterable[Any]) -> bool:
"""
Return :py:data:`False` if none of the elements of the (async) ``iterable`` are true
"""
async with ScopedIter(iterable) as item_iter:
async for element in item_iter:
if element:
return True
return False
[docs]
async def zip(
*iterables: AnyIterable[Any], strict: bool = False
) -> AsyncIterator[Tuple[Any, ...]]:
"""
Create an async iterator that aggregates elements from each of the (async) iterables
:raises ValueError: if the ``iterables`` are not equal length and ``strict`` is set
The next element of ``zip`` is a :py:class:`tuple` of the next element of
each of its ``iterables``. As soon as any of its ``iterables`` is exhausted,
``zip`` is exhausted as well. This means that if ``zip`` receives *n* ``iterables``,
with the shortest having *m* elements, it becomes a generator *m*-times producing
an *n*-tuple.
.. code:: python3
async for va, vb, vc in zip(a, b, c):
print(f'a => {va}, b => {vb}, c => {vc}'
If ``iterables`` is empty, the ``zip`` iterator is empty as well.
Multiple ``iterables`` may be mixed regular and async iterables.
When called with ``strict=True``, all ``iterables`` must be of same length;
in this mode ``zip`` raises :py:exc:`ValueError` if any ``iterables`` are not
exhausted with the others.
"""
if not iterables:
return
aiters = (*(aiter(it) for it in iterables),)
del iterables
try:
inner = _zip_inner(aiters) if not strict else _zip_inner_strict(aiters)
async for items in inner:
yield items
finally:
for iterator in aiters:
try:
aclose = iterator.aclose # type: ignore
except AttributeError:
pass
else:
await aclose()
async def _zip_inner(
aiters: Tuple[AsyncIterator[T], ...]
) -> AsyncIterator[Tuple[T, ...]]:
"""Direct zip transposing tuple-of-iterators to iterator-of-tuples"""
try:
while True:
yield (*[await anext(it) for it in aiters],)
except StopAsyncIteration:
return
async def _zip_inner_strict(
aiters: Tuple[AsyncIterator[T], ...]
) -> AsyncIterator[Tuple[T, ...]]:
"""Length aware zip checking that all iterators are equal length"""
# track index of the last iterator we tried to anext
tried = 0
try:
while True:
items: _sync_builtins.list[T] = []
for tried, _aiter in _sync_builtins.enumerate(aiters): # noqa: B007
items.append(await anext(_aiter))
yield (*items,)
except StopAsyncIteration:
# after the first iterable provided an item, some later iterable was empty
if tried > 0:
plural = " " if tried == 1 else "s 1-"
raise ValueError(
f"zip() argument {tried + 1} is shorter than argument{plural}{tried}"
) from None
# after the first iterable was empty, some later iterable may be not
sentinel = object()
for tried, _aiter in _sync_builtins.enumerate(aiters):
if await anext(_aiter, sentinel) is not sentinel:
plural = " " if tried == 1 else "s 1-"
raise ValueError(
f"zip() argument {tried + 1} is longer than argument{plural}{tried}"
) from None
return
[docs]
async def map(
function: Union[Callable[..., R], Callable[..., Awaitable[R]]],
*iterable: AnyIterable[Any],
) -> AsyncIterator[R]:
r"""
An async iterator mapping an (async) function to items from (async) iterables
At each step, ``map`` collects the next item from each iterable and calls
``function`` with all items; if ``function`` provides an awaitable,
it is ``await``\ ed. The result is the next value of ``map``.
Barring sync/async translation, ``map`` is equivalent to
``(await function(*args) async for args in zip(iterables))``.
It is important that ``func`` receives *one* item from *each* iterable at
every step. For *n* ``iterable``, ``func`` must take *n* positional arguments.
Similar to :py:func:`~.zip`, ``map`` is exhausted as soon as its
*first* argument is exhausted.
The ``function`` may be a regular or async callable.
Multiple ``iterable`` may be mixed regular and async iterables.
"""
function = _awaitify(function)
async with ScopedIter(zip(*iterable)) as args_iter:
async for args in args_iter:
result = function(*args)
yield await result
__MIN_MAX_DEFAULT = Sentinel("<no default>")
[docs]
async def max(
iterable: AnyIterable[Any],
*,
key: Optional[Callable[[Any], Any]] = None,
default: Any = __MIN_MAX_DEFAULT,
) -> Any:
"""
Return the largest item from an (async) iterable or from two or more values
:raises ValueError: if ``iterable`` is empty and ``default`` is not set
The ``key`` argument specifies a one-argument ordering function like that used
for :py:meth:`list.sort`. It may be a regular or async callable and defaults to
the identity function. The ``default`` argument specifies an object to return
if the provided ``iterable`` is empty. If the ``iterable`` is empty and
``default`` is not provided, a :py:exc:`ValueError` is raised.
.. note::
The two-or-more-arguments variant is not supported,
as it does not benefit from being ``async``.
Use the builtin :py:func:`max` function instead.
"""
return await _min_max(iterable, key, True, default)
[docs]
async def min(
iterable: AnyIterable[Any],
*,
key: Optional[Callable[[Any], Any]] = None,
default: Any = __MIN_MAX_DEFAULT,
) -> Any:
"""
Return the smallest item from an (async) iterable or from two or more values
:raises ValueError: if ``iterable`` is empty and ``default`` is not set
The ``key`` argument specifies a one-argument ordering function like that used
for :py:meth:`list.sort`. It may be a regular or async callable and defaults to
the identity function. The ``default`` argument specifies an object to return
if the provided ``iterable`` is empty. If the ``iterable`` is empty and
``default`` is not provided, a :py:exc:`ValueError` is raised.
.. note::
The two-or-more-arguments variant is not supported,
as it does not benefit from being ``async``.
Use the builtin :py:func:`min` function instead.
"""
return await _min_max(iterable, key, False, default)
async def _min_max(
iterable: AnyIterable[LT],
key: Optional[Callable[[LT], Any]],
invert: bool,
default: LT = __MIN_MAX_DEFAULT, # type: ignore
) -> LT:
"""
Implementation of ``min``/``max``
:param invert: compute ``max`` if ``True`` and ``min`` otherwise
"""
async with ScopedIter(iterable) as item_iter:
best = await anext(item_iter, default=default)
# this implies that item_iter is empty and default is __MIN_MAX_DEFAULT
if best is __MIN_MAX_DEFAULT: # type: ignore
name = "max" if invert else "min"
raise ValueError(f"{name}() arg is an empty sequence")
elif key is None:
async for item in item_iter:
if invert ^ (item < best):
best = item
else:
key = _awaitify(key)
best_key = await key(best)
async for item in item_iter:
item_key = await key(item)
if invert ^ (item_key < best_key):
best = item
best_key = item_key
return best
[docs]
async def filter(
function: Union[Callable[[T], bool], Callable[[T], Awaitable[bool]], None],
iterable: AnyIterable[T],
) -> AsyncIterator[T]:
"""
An async iterator of elements in an (async) iterable filtered by an (async) callable
Barring sync/async translation, ``filter`` is equivalent to
``(element async for args in iterable if await func(element))``.
The ``function`` may be a regular or async callable.
The ``iterable`` may be a regular or async iterable.
"""
async with ScopedIter(iterable) as item_iter:
if function is None:
async for item in item_iter:
if item:
yield item
else:
function = _awaitify(function)
async for item in item_iter:
if await function(item):
yield item
[docs]
async def enumerate(
iterable: AnyIterable[T], start: int = 0
) -> AsyncIterator[Tuple[int, T]]:
"""
An async iterator of running count and element in an (async) iterable
The count begins at ``start`` for the first element of ``iterable``,
and is incremented by ``1`` for each further element.
The ``iterable`` may be a regular or async iterable.
"""
count = start
async with ScopedIter(iterable) as item_iter:
async for item in item_iter:
yield count, item
count += 1
[docs]
async def sum(iterable: AnyIterable[Any], start: Any = 0) -> Any:
"""
Sum of ``start`` and all elements in the (async) iterable
"""
total = start
async for item in aiter(iterable):
total += item
return total
[docs]
async def list(iterable: Union[Iterable[T], AsyncIterable[T]] = ()) -> List[T]:
"""
Create a :py:class:`list` from an (async) iterable
This is equivalent to ``[element async for element in iterable]``.
"""
return [element async for element in aiter(iterable)]
[docs]
async def tuple(iterable: Union[Iterable[T], AsyncIterable[T]] = ()) -> Tuple[T, ...]:
"""
Create a :py:class:`tuple` from an (async) iterable
"""
return (*[element async for element in aiter(iterable)],)
[docs]
async def dict( # noqa: F811
iterable: Union[Iterable[Tuple[HK, T]], AsyncIterable[Tuple[HK, T]]] = (),
**kwargs: T,
) -> Dict[Any, T]:
"""
Create a :py:class:`dict` from an (async) iterable and keywords
This is equivalent to ``{key: value async for key, value in iterable}``
if no keywords are provided.
"""
if not iterable:
return {**kwargs}
base_dict: Dict[Any, T] = {key: value async for key, value in aiter(iterable)}
if kwargs:
base_dict.update(kwargs)
return base_dict
[docs]
async def set(iterable: Union[Iterable[T], AsyncIterable[T]] = ()) -> Set[T]:
"""
Create a :py:class:`set` from an (async) iterable
This is equivalent to ``{element async for element in iterable}``.
"""
return {element async for element in aiter(iterable)}
[docs]
async def sorted(
iterable: AnyIterable[T],
*,
key: Optional[Callable[[T], LT]] = None,
reverse: bool = False,
) -> List[T]:
"""
Sort items from an (async) iterable into a new list
The optional ``key`` argument specifies a one-argument (async) callable, which
provides a substitute for determining the sort order of each item.
The special value and default :py:data:`None` represents the identity functions,
i.e. compares items directly.
The default sort order is ascending, that is items with ``a < b``
imply ``result.index(a) < result.index(b)``. Use ``reverse=True``
for descending sort order.
.. note::
The actual sorting is synchronous,
so a very large ``iterable`` or very slow comparison
may block the event loop notably.
It is guaranteed to be worst-case O(n log n) runtime.
"""
if key is None:
# TODO: is this a worthwhile optimisation?
try:
return _sync_builtins.sorted(iterable, reverse=reverse) # type: ignore
except TypeError:
items: _sync_builtins.list[Any] = [item async for item in aiter(iterable)]
items.sort(reverse=reverse)
return items
else:
async_key = _awaitify(key)
keyed_items = [(await async_key(item), item) async for item in aiter(iterable)]
keyed_items.sort(key=lambda ki: ki[0], reverse=reverse)
return [item for _, item in keyed_items]