Source code for asyncstdlib.contextlib

from typing import (
    TypeVar,
    Generic,
    AsyncGenerator,
    Callable,
    Optional,
    Union,
    Any,
    Awaitable,
    Deque,
    overload,
)
from functools import wraps
from collections import deque
from functools import partial
import sys

from ._typing import Protocol, AsyncContextManager, ContextManager, T
from ._core import awaitify
from ._utility import public_module, slot_get as _slot_get


# typing.AsyncContextManager uses contextlib.AbstractAsyncContextManager if available,
# and a custom implementation otherwise. No need to replicate it.
AbstractContextManager = AsyncContextManager


class ACloseable(Protocol):
    async def aclose(self):
        """Asynchronously close this object"""


AC = TypeVar("AC", bound=ACloseable)


[docs]def contextmanager( func: Callable[..., AsyncGenerator[T, None]] ) -> Callable[..., AsyncContextManager[T]]: """ Create an asynchronous context manager out of an asynchronous generator function This is intended as a decorator for an asynchronous generator function. The asynchronous generator should ``yield`` once, at which point the body of the context manager executes. If ``yield`` provides a value, this becomes the value of the context in the block. .. code-block:: python3 @contextmanager async def Context(*args, **kwargs): # __aenter__ yield # context value # __aexit__ Note that if an exception ends the context block, it gets re-raised at the ``yield`` inside the asynchronous generator (via :py:meth:`~agen.athrow`). In order to handle this exception, the ``yield`` should be wrapped in a ``try`` statement. """ @wraps(func) def helper(*args, **kwds): return _AsyncGeneratorContextManager(func, args, kwds) return helper
class _AsyncGeneratorContextManager: def __init__(self, func, args, kwds): self.gen = func(*args, **kwds) self.__doc__ = getattr(func, "__doc__", type(self).__doc__) async def __aenter__(self): try: return await self.gen.__anext__() except StopAsyncIteration: raise RuntimeError("generator did not yield to __aenter__") from None async def __aexit__(self, exc_type, exc_val, exc_tb): if exc_type is None: try: await self.gen.__anext__() except StopAsyncIteration: return else: raise RuntimeError("generator did not stop after __aexit__") else: try: await self.gen.athrow(exc_type, exc_val, exc_tb) except StopAsyncIteration as exc: return exc is not exc_tb except RuntimeError as exc: if exc is exc_val: return False # Handle promotion of unhandled Stop[Async]Iteration to RuntimeError if isinstance(exc_val, (StopIteration, StopAsyncIteration)): if exc.__cause__ is exc_val: return False raise except exc_type as exc: if exc is not exc_val: raise else: raise RuntimeError("generator did not stop after throw() in __aexit__") @public_module(__name__, "closing") class Closing(Generic[AC]): """ Create an :term:`asynchronous context manager` to ``aclose`` some ``thing`` on exit Once entered, the context manager guarantees to ``await thing.aclose()`` at the end of its block. This is useful for safe cleanup even if errors occur. Use :py:class:`~.closing` for objects that need reliable cleanup but do not support the context manager protocol. For example, it is advisable to prompty clean up any :term:`asynchronous iterator` that holds resources: .. code-block:: python3 import asyncstdlib as a async with a.closing(a.iter(something)) as async_iter: async for element in async_iter: ... .. seealso:: Use :py:func:`~.scoped_iter` to ensure an (async) iterable is eventually closed and only :term:`borrowed <borrowing>` until then. """ def __init__(self, thing: AC): self.thing = thing async def __aenter__(self) -> AC: return self.thing async def __aexit__(self, exc_type, exc_val, exc_tb): await self.thing.aclose() closing = Closing @public_module(__name__, "nullcontext") class NullContext(Generic[T]): """ Create an :term:`asynchronous context manager` that only returns ``enter_result`` Intended as a neutral element, a :py:class:`~.nullcontext` serves as a placeholder where an async context manager is semantically required but not meaningfull. This allows for an optional async context manager with a default :py:class:`~.nullcontext`, or to prevent closing of an existing context manager in an ``async with`` statement. .. code-block:: python3 async def safe_fetch(source): if not isinstance(source, AsyncIterator): # use a context manager if required ... acm = a.closing(iter(source)) else: # ... or a neutral placeholder acm = a.nullcontext(source) async with acm as async_iter: ... """ __slots__ = ("enter_result",) @overload def __init__(self: "NullContext[None]", enter_result: None = ...) -> None: ... @overload def __init__(self: "NullContext[T]", enter_result: T) -> None: ... def __init__(self, enter_result=None): self.enter_result = enter_result @overload def __aenter__(self: "NullContext[None]") -> None: ... @overload def __aenter__(self: "NullContext[T]") -> T: ... async def __aenter__(self): return self.enter_result async def __aexit__(self, exc_type, exc_val, exc_tb): pass nullcontext = NullContext SE = TypeVar( "SE", bound=Union[ AsyncContextManager, ContextManager, Callable[[Any, BaseException, Any], Optional[bool]], Callable[[Any, BaseException, Any], Awaitable[Optional[bool]]], ], )
[docs]class ExitStack: """ Context Manager emulating several nested Context Managers Once an :py:class:`~.ExitStack` is entered, :py:meth:`enter_context` can be used to emulate entering further context managers. When unwinding the stack, context managers are exited in LIFO order, effectively emulating nested context managers. The primary use-case is programmatically entering optional or a dynamically sized number of context managers. In addition, arbitrary cleanup functions and callbacks can be registered using :py:meth:`push` and :py:meth:`callback`. This allows running additional cleanup, similar to ``defer`` statements in other languages. .. note:: Unlike :py:class:`contextlib.AsyncExitStack`, this class provides an :term:`async neutral` version of the :py:class:`contextlib.ExitStack`. There are no separate methods to distinguish async and regular arguments. """ def __init__(self): self._exit_callbacks: Deque[Callable[..., Awaitable[Optional[bool]]]] = deque() @staticmethod async def _aexit_callback(callback, exc_type, exc_val, tb): """Invoke a callback as if it were an ``__aexit__`` method""" await callback() return False # callbacks never suppress exceptions
[docs] def pop_all(self) -> "ExitStack": """ Transfer all exit callbacks to a new :py:class:`~.ExitStack` :return: new :py:class:`~.ExitStack` owning all previously registered callbacks The responsibility of invoking previously registered handlers is fully transferred to the new :py:class:`~.ExitStack`. Neither calling this method, nor closing the original :py:class:`~.ExitStack` (via :py:meth:`~.aclose` or an ``async with`` statement) invokes these callbacks. Note that callbacks added after calling :py:meth:`~.pop_all` are not affected by this. """ new_stack = type(self)() new_stack._exit_callbacks, self._exit_callbacks = self._exit_callbacks, deque() return new_stack
[docs] def push(self, exit: SE) -> SE: """ Registers a callback with the standard ``__aexit__`` method signature :param exit: the exit callback to invoke on ``__aexit__`` :return: the ``exit`` parameter, unchanged When the stack is unwound, callbacks receive the current exception details, and are expected to return :py:data:`True` if the exception should be suppressed. Two normalizations are applied to match the ``__aexit__`` signature: * If ``exit`` has an ``__aexit__`` method, this method is used instead. * If ``exit`` has an ``__exit__`` method, this method is used instead. It is automatically treated as asynchronous. * If ``exit`` is not asynchronous, it is automatically treated as such. Note that ``exit`` is only treated as :term:`async neutral` when it does not have an ``__aexit__`` method. If an ``__aexit__`` method is found, it is expected to conform to the :py:meth:`object.__aexit__` signature. Regardless of internal normalizations, ``exit`` is always returned unchanged. This allows using ``push`` as a decorator. .. seealso:: When receiving a context manager, this method only sets up ``__aexit__`` or ``__exit__`` for being called. It does not *enter* the context manager. If a context manager must also be entered, use :py:meth:`~.enter_context` instead. """ try: aexit = _slot_get(exit, "__aexit__") except AttributeError: try: aexit = awaitify(_slot_get(exit, "__exit__")) except AttributeError: assert callable( exit ), f"Expected (async) context manager or callable, got {exit}" aexit = awaitify(exit) self._exit_callbacks.append(aexit) return exit
[docs] def callback(self, callback: Callable, *args, **kwargs): """ Registers an arbitrary callback to be called with arguments on unwinding :return: the ``callback`` parameter, unchanged The callback is invoked as ``await callback(*args, **kwargs)`` when the stack unwinds. It does not receive the current exception details and cannot suppress the exception handled by the stack. The callback is treated as :term:`async neutral`, i.e. it may be a synchronous function. This method does not change its argument, and can be used as a context manager. """ self._exit_callbacks.append( partial(self._aexit_callback, awaitify(partial(callback, *args, **kwargs))) ) return callback
[docs] async def enter_context(self, cm: AsyncContextManager): """ Enter the supplied context manager, and register it for exit if successful This method is equivalent to using ``cm`` in an ``async with`` statement; if ``cm`` can only be used in a ``with`` statement, it is silently promoted. The stack will enter ``cm`` and, if successful, ensure that ``cm`` is exited when the stack unwinds. The return value of this method is the value that ``cm`` provides in an ``async with`` statement. .. code-block:: python3 # explicitly enter context managers async with cm_a as value_a, cm_b as value_b: ... # programmatically enter context managers async with a.ExitStack() as exit_stack: value_a = exit_stack.enter_context(cm_a) value_b = exit_stack.enter_context(cm_b) ... When unwinding, the context manager is exited as if it were part of a regular stack of ``async with`` (that is, in LIFO order). It receives the current exception details and may suppress it as usual. As with the ``async with`` statement, if the context cannot be entered (that is, ``await cm.__aenter__()`` throws an exception) it is not exited either. """ try: aexit = _slot_get(cm, "__aexit__") except AttributeError: aexit = awaitify(_slot_get(cm, "__exit__")) context_value = _slot_get(cm, "__enter__")() else: context_value = await _slot_get(cm, "__aenter__")() self._exit_callbacks.append(aexit) return context_value
[docs] async def aclose(self): """ Immediately unwind the context stack .. note:: Unlike the regular :py:meth:`contextlib.ExitStack.close` method, this method is ``async`` and follows the ``aclose`` naming convention. """ await self.__aexit__(None, None, None)
@staticmethod def _stitch_context( exception: BaseException, context: BaseException, base_context: Optional[BaseException], ): """ Emulate that `exception` was caused by an unhandled `context` :param exception: the exception to adjust :param context: the context :param base_context: the latest context we would adjust """ # we may have receive a child exception of the one that needs stitching # walk the contexts until we reach a root exception (no context) or our # own base context exc_context = exception.__context__ while exc_context is not None and exc_context is not base_context: if exc_context is context: return exception = exc_context exc_context = exception.__context__ # Change the end of the chain to point to the exception # we expect it to reference exception.__context__ = context async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, tb): received_exc = exc_type is not None # Even if we don't handle an exception *right now*, we may be part # of an exception handler unwinding gracefully. This is our __context__. unwind_context = sys.exc_info()[1] suppress_exc = False reraise_exc = False # Callbacks are invoked in LIFO order to match nested context managers for callback in reversed(self._exit_callbacks): try: if await callback(exc_type, exc_val, tb): suppress_exc = True reraise_exc = False exc_type = exc_val = tb = None except BaseException as exc: # simulate the stack of exceptions by setting the context self._stitch_context(exc, exc_val, unwind_context) reraise_exc = True exc_type, exc_val, tb = type(exc), exc, exc.__traceback__ if reraise_exc: # The __context__ is replaced by a normal `raise`, and only # preserved by a bare `raise` in an except block. exc_context = exc_val.__context__ try: raise exc_val except BaseException: exc_val.__context__ = exc_context raise return received_exc and suppress_exc