from typing import (
Any,
TypeVar,
AsyncIterator,
List,
Awaitable,
Union,
Callable,
Optional,
Deque,
Generic,
Iterator,
Tuple,
overload,
AsyncGenerator,
)
from collections import deque
from ._utility import public_module
from ._core import (
ScopedIter,
AnyIterable,
awaitify as _awaitify,
Sentinel,
borrow as _borrow,
)
from .builtins import anext, zip, enumerate as aenumerate, iter as aiter
T = TypeVar("T")
S = TypeVar("S")
R = TypeVar("R")
# Variadic overloads
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
async def cycle(iterable: AnyIterable[T]) -> AsyncIterator[T]:
"""
An :term:`asynchronous iterator` indefinitely iterating over ``iterable``
This lazily exhausts ``iterable`` on its first pass, and recalls items
from an internal buffer on subsequent passes. If ``iterable`` is empty,
the resulting iterator terminates immediately.
This means items from ``iterable`` are provided immediately as they become
available, even if later items are not ready yet. Subsequent passes directly
provide items, without replicating any delays of the original ``iterable``.
All items produced by ``iterable`` are stored internally, which may consume
significant memory.
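
For example, a minimal usage sketch, assuming this module is imported
as ``a`` (:py:func:`~.islice` merely bounds the otherwise infinite iterator):

.. code-block:: python3

    async def example():
        # "ab" is buffered on the first pass and replayed afterwards
        async for item in a.islice(a.cycle("ab"), 5):
            print(item)  # a, b, a, b, a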
"""
buffer: List[T] = []
async with ScopedIter(iterable) as async_iter:
async for item in async_iter: # type: T
buffer.append(item)
yield item
if not buffer:
return
while True:
for item in buffer:
yield item
__ACCUMULATE_SENTINEL = Sentinel("<no default>")
async def add(x, y):
"""The default reduction of :py:func:`~.accumulate`"""
return x + y
async def accumulate(
iterable: AnyIterable[T],
function: Union[Callable[[T, T], T], Callable[[T, T], Awaitable[T]]] = add,
*,
initial: T = __ACCUMULATE_SENTINEL,
) -> AsyncIterator[T]:
"""
An :term:`asynchronous iterator` on the running reduction of ``iterable``
:raises TypeError: if ``iterable`` is empty and ``initial`` is not given
This is conceptually equivalent to :py:func:`~asyncstdlib.functools.reduce`
in that it applies a reduction ``function`` iteratively on the ``iterable``.
However, the iterator yields the *running* reduction value as each value
is fetched from ``iterable``.
The ``function`` defaults to an asynchronous equivalent of ``operator.add``,
providing a running sum.
If an ``initial`` value is provided, it is the first value processed and yielded.
Provided that all parameters are given and valid, this is roughly equivalent to:
.. code-block:: python3
    async def accumulate(iterable, function, *, initial):
        current = initial
        yield current
        async for value in iterable:
            current = await function(current, value)
            yield current
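
For example, a usage sketch assuming this module is imported as ``a``
(the values are illustrative):

.. code-block:: python3

    async def example():
        # the default reduction is addition, giving a running sum
        async for value in a.accumulate([1, 2, 3, 4]):
            print(value)  # 1, 3, 6, 10
        # any (async) binary callable works, e.g. ``max`` for a running maximum
        async for value in a.accumulate([1, 5, 2], max, initial=3):
            print(value)  # 3, 3, 5, 5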
"""
async with ScopedIter(iterable) as item_iter:
try:
value = (
initial
if initial is not __ACCUMULATE_SENTINEL
else await anext(item_iter)
)
except StopAsyncIteration:
raise TypeError("accumulate() of empty sequence with no initial value")
function = _awaitify(function)
yield value
async for head in item_iter:
value = await function(value, head)
yield value
async def chain(*iterables: AnyIterable[T]) -> AsyncIterator[T]:
"""
An :term:`asynchronous iterator` flattening values from all ``iterables``
The resulting iterator consecutively iterates over and yields all values from
each of the ``iterables``. This is similar to converting all ``iterables`` to
sequences and concatenating them, but lazily exhausts each iterable.
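
For example, assuming this module is imported as ``a``:

.. code-block:: python3

    async def example():
        # regular and async iterables may be mixed freely
        async for item in a.chain("ab", ["c", "d"]):
            print(item)  # a, b, c, d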
"""
for iterable in iterables:
async with ScopedIter(iterable) as iterator:
async for item in iterator:
yield item
@public_module(__name__, "chain.from_iterable")
async def chain_from_iterable(
iterable: AnyIterable[AnyIterable[T]],
) -> AsyncIterator[T]:
"""
Alternate constructor for :py:func:`~.chain` that lazily exhausts iterables as well
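
For example, a sketch flattening an illustrative async source ``pages``,
assuming this module is imported as ``a``:

.. code-block:: python3

    async def pages():
        yield [1, 2]
        yield [3]

    async def example():
        async for item in a.chain.from_iterable(pages()):
            print(item)  # 1, 2, 3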
"""
async with ScopedIter(iterable) as iterables:
async for sub_iterable in iterables:
async with ScopedIter(sub_iterable) as iterator:
async for item in iterator:
yield item
chain.from_iterable = chain_from_iterable
async def compress(
data: AnyIterable[T], selectors: AnyIterable[bool]
) -> AsyncIterator[T]:
"""
An :term:`asynchronous iterator` for items of ``data`` with true ``selectors``
Lazily iterates both ``data`` and ``selectors`` pairwise, yielding only ``data``
items for which their paired ``selectors`` evaluate as true.
Roughly equivalent to:
.. code-block:: python3
    async def compress(data, selectors):
        return (item async for item, select in zip(data, selectors) if select)
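
For example, assuming this module is imported as ``a``
(the selector values are illustrative):

.. code-block:: python3

    async def example():
        async for item in a.compress("abcdef", [1, 0, 1, 0, 1, 1]):
            print(item)  # a, c, e, f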
"""
async with ScopedIter(data) as data_iter, ScopedIter(selectors) as selectors_iter:
async for item, keep in zip(data_iter, selectors_iter):
if keep:
yield item
async def dropwhile(
predicate: Union[Callable[[T], bool], Callable[[T], Awaitable[bool]]],
iterable: AnyIterable[T],
) -> AsyncIterator[T]:
"""
Yield items from ``iterable`` after ``predicate(item)`` is no longer true
This lazily iterates over ``iterable``, discarding items as long as evaluating
``predicate`` for the current item is true. As soon as ``predicate`` evaluates
as false for the current item, this item is yielded. All subsequent items are
yielded immediately as they become available, without evaluating ``predicate``
for them.
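
For example, assuming this module is imported as ``a``:

.. code-block:: python3

    async def example():
        # items are dropped while the predicate holds, then all pass through
        async for item in a.dropwhile(lambda x: x < 5, [1, 4, 6, 4, 1]):
            print(item)  # 6, 4, 1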
"""
async with ScopedIter(iterable) as async_iter:
predicate = _awaitify(predicate)
async for item in async_iter:
if not await predicate(item): # type: ignore
yield item
break
async for item in async_iter:
yield item
async def islice(iterable: AnyIterable[T], *args: Optional[int]) -> AsyncIterator[T]:
"""
An :term:`asynchronous iterator` over items from ``iterable`` in a :py:class:`slice`
Aside from the iterable, this function accepts one to three parameters as understood
by :py:class:`slice`:
a single parameter ``stop``, or up to three parameters ``start, stop [, step]``.
The first ``start`` items of ``iterable`` are discarded. Afterwards, every ``step``-th
item is yielded until a total of ``stop`` items have been fetched. This effectively
is a lazy, asynchronous version of ``iterable[start:stop:step]``.
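
For example, a sketch of the lazy counterpart of ``list(range(10))[2:8:2]``,
assuming this module is imported as ``a``:

.. code-block:: python3

    async def example():
        async for item in a.islice(range(10), 2, 8, 2):
            print(item)  # 2, 4, 6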
"""
s = slice(*args)
start, stop, step = s.start or 0, s.stop, s.step or 1
async with ScopedIter(iterable) as async_iter:
# always consume the first ``start`` items, even if the slice is empty
if start > 0:
async for _count, element in aenumerate(_borrow(async_iter), start=1):
if _count == start:
break
if stop is None:
async for idx, element in aenumerate(async_iter, start=0):
if not idx % step:
yield element
elif stop <= start:
return
else:
# We would actually check ``idx >= stop - 1`` later on.
# Since we do that for every ``idx``, we subtract ``1`` once here.
stop -= start + 1
async for idx, element in aenumerate(async_iter, start=0):
if not idx % step:
yield element
if idx >= stop:
return
async def starmap(
function: Union[Callable[..., T], Callable[..., Awaitable[T]]],
iterable: AnyIterable,
) -> AsyncIterator[T]:
"""
An :term:`asynchronous iterator` applying a function to arguments from an iterable
This is conceptually similar to :py:func:`~asyncstdlib.builtins.map`, but applies
a single iterable of multiple arguments instead of
multiple iterables of a single argument each.
The same way that
``function(a, b)`` can be generalized to ``map(function, iter_a, iter_b)``,
``function(*c)`` can be generalized to ``starmap(function, iter_c)``.
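
For example, assuming this module is imported as ``a``:

.. code-block:: python3

    async def example():
        # regular callables such as ``pow`` are wrapped to be awaitable
        async for value in a.starmap(pow, [(2, 5), (3, 2), (10, 3)]):
            print(value)  # 32, 9, 1000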
"""
function = _awaitify(function)
async with ScopedIter(iterable) as async_iter:
async for args in async_iter:
yield await function(*args)
async def takewhile(
predicate: Union[Callable[[T], bool], Callable[[T], Awaitable[bool]]],
iterable: AnyIterable[T],
) -> AsyncIterator[T]:
"""
Yield items from ``iterable`` as long as ``predicate(item)`` is true
This lazily iterates over ``iterable``, yielding items as long as evaluating
``predicate`` for the current item is true. As soon as ``predicate`` evaluates
as false for the current item, no more items are yielded. Note that if
``iterable`` is a single-use iterator, the first item failing ``predicate`` is
consumed: it is available neither from ``iterable`` nor ``takewhile`` and is
effectively discarded.
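
For example, assuming this module is imported as ``a``:

.. code-block:: python3

    async def example():
        # yields items until the predicate first fails
        async for item in a.takewhile(lambda x: x < 5, [1, 4, 6, 4, 1]):
            print(item)  # 1, 4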
"""
async with ScopedIter(iterable) as async_iter:
predicate = _awaitify(predicate)
async for item in async_iter:
if await predicate(item):
yield item
else:
break
async def tee_peer(
iterator: AsyncIterator[T], buffer: Deque[T], peers: List[Deque[T]], cleanup: bool
) -> AsyncGenerator[T, None]:
try:
while True:
if not buffer:
try:
item = await iterator.__anext__()
except StopAsyncIteration:
break
else:
# Append to all buffers, including our own. We'll fetch our
# item from the buffer again, instead of yielding it directly.
# This ensures the proper item ordering if any of our peers
# are fetching items concurrently. They may have buffered their
# item already.
for peer in peers:
peer.append(item)
yield buffer.popleft()
finally:
for idx, item in enumerate(peers): # pragma: no branch
if item is buffer:
peers.pop(idx)
break
if cleanup and not peers and hasattr(iterator, "aclose"):
await iterator.aclose()
@public_module(__name__, "tee")
class Tee(Generic[T]):
"""
Create ``n`` separate asynchronous iterators over ``iterable``
This splits a single ``iterable`` into multiple iterators, each providing
the same items in the same order.
All child iterators may advance separately but share the same items
from ``iterable`` -- when the most advanced iterator retrieves an item,
it is buffered until the least advanced iterator has yielded it as well.
A ``tee`` works lazily and can handle an infinite ``iterable``, provided
that all iterators advance.
.. code-block:: python3
    async def derivative(sensor_data):
        previous, current = a.tee(sensor_data, n=2)
        await a.anext(previous)  # advance one iterator
        return a.map(operator.sub, previous, current)
Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type instead
of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated and unpacked
to get the child iterators. In addition, its :py:meth:`~.tee.aclose` method
immediately closes all children, and it can be used in an ``async with`` context
for the same effect.
If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not*
provide these items. Also, ``tee`` must internally buffer each item until the
last iterator has yielded it; if the most and least advanced iterators differ
by most of the data, collecting the ``iterable`` into a :py:class:`list` first
is faster (but not lazy).
If the underlying iterable is concurrency safe (``anext`` may be awaited
concurrently) the resulting iterators are concurrency safe as well. Otherwise,
the iterators are safe if there is only ever one single "most advanced" iterator.
"""
def __init__(self, iterable: AnyIterable[T], n: int = 2):
self._iterator = aiter(iterable)
_cleanup = self._iterator is iterable
self._buffers = [deque() for _ in range(n)]
self._children = tuple(
tee_peer(
iterator=self._iterator,
buffer=buffer,
peers=self._buffers,
cleanup=_cleanup,
)
for buffer in self._buffers
)
def __len__(self) -> int:
return len(self._children)
def __getitem__(self, item) -> AsyncIterator[T]:
return self._children[item]
def __iter__(self) -> Iterator[AnyIterable[T]]:
yield from self._children
async def __aenter__(self) -> "Tee[T]":
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.aclose()
return False
async def aclose(self):
for child in self._children:
await child.aclose()
tee = Tee
async def pairwise(iterable: AnyIterable[T]) -> AsyncIterator[Tuple[T, T]]:
"""
Yield successive, overlapping pairs of items from ``iterable``
Pairs are yielded as the newest item is available from ``iterable``. No pair
is emitted if ``iterable`` has one or zero items; however, if there is exactly
one item, ``pairwise`` will wait for and consume it before finishing.
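
For example, assuming this module is imported as ``a``:

.. code-block:: python3

    async def example():
        async for pair in a.pairwise([1, 2, 3, 4]):
            print(pair)  # (1, 2), (2, 3), (3, 4)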
"""
async with ScopedIter(iterable) as async_iter:
prev = await anext(async_iter, None) # any default is fine – we never yield it
async for current in async_iter:
yield prev, current
prev = current
async def _repeat(value):
while True:
yield value
@overload
def zip_longest(
__it1: AnyIterable[T1],
*,
fillvalue: S = None,
) -> AsyncIterator[Tuple[T1]]:
...
@overload
def zip_longest(
__it1: AnyIterable[T1],
__it2: AnyIterable[T2],
*,
fillvalue: S = None,
) -> AsyncIterator[Tuple[Union[T1, S], Union[T2, S]]]:
...
@overload
def zip_longest(
__it1: AnyIterable[T1],
__it2: AnyIterable[T2],
__it3: AnyIterable[T3],
*,
fillvalue: S = None,
) -> AsyncIterator[Tuple[Union[T1, S], Union[T2, S], Union[T3, S]]]:
...
@overload
def zip_longest(
__it1: AnyIterable[T1],
__it2: AnyIterable[T2],
__it3: AnyIterable[T3],
__it4: AnyIterable[T4],
*,
fillvalue: S = None,
) -> AsyncIterator[Tuple[Union[T1, S], Union[T2, S], Union[T3, S], Union[T4, S]]]:
...
@overload
def zip_longest(
__it1: AnyIterable[T1],
__it2: AnyIterable[T2],
__it3: AnyIterable[T3],
__it4: AnyIterable[T4],
__it5: AnyIterable[T5],
*,
fillvalue: S = None,
) -> AsyncIterator[
Tuple[Union[T1, S], Union[T2, S], Union[T3, S], Union[T4, S], Union[T5, S]]
]:
...
@overload
def zip_longest(
__it1: AnyIterable[Any],
__it2: AnyIterable[Any],
__it3: AnyIterable[Any],
__it4: AnyIterable[Any],
__it5: AnyIterable[Any],
*iterables: AnyIterable[Any],
fillvalue: S = None,
) -> AsyncIterator[Tuple[Any, ...]]:
...
async def zip_longest(
*iterables: AnyIterable[Any], fillvalue: S = None
) -> AsyncIterator[Tuple[Any, ...]]:
"""
Create an async iterator that aggregates elements from each of the (async) iterables
The next element of ``zip_longest`` is a :py:class:`tuple` of the next element of
each of its ``iterables``. As soon as all of its ``iterables`` are exhausted,
``zip_longest`` is exhausted as well. Shorter iterables are padded with ``fillvalue``.
This means that if ``zip_longest`` receives *n* ``iterables``,
with the longest having *m* elements, it yields *m* *n*-tuples in total.
If ``iterables`` is empty, the ``zip_longest`` iterator is empty as well.
The ``iterables`` may be a mix of regular and asynchronous iterables.
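
For example, assuming this module is imported as ``a``
(the fill value is illustrative):

.. code-block:: python3

    async def example():
        async for item in a.zip_longest("ab", [1, 2, 3], fillvalue="?"):
            print(item)  # ('a', 1), ('b', 2), ('?', 3)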
"""
if not iterables:
return
fill_iter = aiter(_repeat(fillvalue))
async_iters = list(aiter(it) for it in iterables)
del iterables
try:
remaining = len(async_iters)
while True:
values = []
for index, aiterator in enumerate(async_iters):
try:
value = await anext(aiterator)
except StopAsyncIteration:
remaining -= 1
if not remaining:
return
async_iters[index] = fill_iter
values.append(fillvalue)
else:
values.append(value)
del value
yield tuple(values)
finally:
await fill_iter.aclose()
for iterator in async_iters:
try:
aclose = iterator.aclose()
except AttributeError:
pass
else:
await aclose
async def identity(x: T) -> T:
"""Asynchronous identity function, returns its argument unchanged"""
return x
@overload # noqa: F811
async def groupby( # noqa: F811
iterable: AnyIterable[T],
) -> AsyncIterator[Tuple[T, AsyncIterator[T]]]:
...
@overload # noqa: F811
async def groupby( # noqa: F811
iterable: AnyIterable[T], key: None
) -> AsyncIterator[Tuple[T, AsyncIterator[T]]]:
...
@overload # noqa: F811
async def groupby( # noqa: F811
iterable: AnyIterable[T], key: Union[Callable[[T], R], Callable[[T], Awaitable[R]]]
) -> AsyncIterator[Tuple[R, AsyncIterator[T]]]:
...
async def groupby(  # noqa: F811
iterable: AnyIterable[T],
key: Optional[Union[Callable[[T], R], Callable[[T], Awaitable[R]]]] = identity,
):
"""
Create an async iterator over consecutive keys and groups from the (async) iterable
The groups generated by ``groupby`` are consecutive with respect to the original
(async) iterable. That is, multiple groups may have the same key if they are
separated by an intermediate group with a different key. For example, the iterable
``1, 1, 1, 2, 2, 1`` is split into the groups ``1, 2, 1``.
The async iterator returned by ``groupby`` as well as the async iterators of
each group share the same underlying iterator. This means that previous groups
are no longer accessible if the ``groupby`` iterator advances to the next group.
In particular, it is not safe to concurrently advance both the ``groupby`` iterator
itself and any of its group iterators.
In contrast to the original :py:func:`itertools.groupby`, it is generally not
useful to sort ``iterable`` by ``key`` beforehand. Since both values and keys are
required up-front for sorting, this loses the advantage of asynchronous,
lazy iteration and evaluation.
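
For example, a sketch grouping consecutive equal characters, assuming this
module is imported as ``a`` (``a.list`` from :py:mod:`~asyncstdlib.builtins`
collects each group before advancing):

.. code-block:: python3

    async def example():
        # each group must be consumed before advancing to the next one
        async for key, group in a.groupby("AAAABBBCCD"):
            print(key, await a.list(group))
        # A ['A', 'A', 'A', 'A'], then the B, C and D groups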
"""
# whether the current group was exhausted and the next begins already
exhausted = False
# `current_*`: buffer for key/value the current group peeked beyond its end
current_key = current_value = nothing = object() # type: Any
make_key: Callable[[T], Awaitable[R]] = (
_awaitify(key) if key is not None else identity
)
async with ScopedIter(iterable) as async_iter:
# fast-forward mode: advance to the next group
async def seek_group() -> AsyncIterator[T]:
nonlocal current_value, current_key, exhausted
# Note: `value` always ends up being some T
# - value is something: we can never unset it
# - value is `nothing`: the previous group was not exhausted,
# and we scan at least one new value
value: T = current_value
if not exhausted:
previous_key = current_key
while previous_key == current_key:
value = await anext(async_iter)
current_key = await make_key(value)
current_value = nothing
exhausted = False
return group(current_key, value=value)
# the lazy iterable of all items with the same key
async def group(desired_key, value: T) -> AsyncIterator[T]:
nonlocal current_value, current_key, exhausted
yield value
async for value in async_iter:
next_key = await make_key(value)
if next_key == desired_key:
yield value
else:
exhausted = True
current_value = value
current_key = next_key
break
try:
while True:
next_group = await seek_group()
async with ScopedIter(next_group) as scoped_group:
yield current_key, scoped_group
except StopAsyncIteration:
return