"""The sessions-side tool runner — the managed-agents counterpart to ``client.beta.messages.tool_runner``. :class:`SessionToolRunner` attaches to a managed-agents session's event stream, reconciles against the events-list endpoint, dispatches every ``agent.tool_use`` *and* ``agent.custom_tool_use`` event against a local tool registry, posts the matching result event back (``user.tool_result`` / ``user.custom_tool_result``), and yields one :class:`DispatchedToolCall` per completed call. It also stops itself once the session has been idle (``stop_reason`` ``end_turn``) for ``max_idle`` seconds. It does **not** touch the work-item lease — wrap it in :class:`anthropic.lib.environments.EnvironmentWorker` if you need heartbeating / force-stop. """ from __future__ import annotations import json import math import time import logging import contextlib from typing import TYPE_CHECKING, Union, cast from dataclasses import dataclass from collections.abc import Sequence, AsyncIterator import anyio from .._retry import TRANSIENT_ERRORS, is_fatal_status_error from ..._types import Headers from ._tool_dispatch import tool_registry, run_runnable_tool, tool_error_content from .._scoped_client import HelperTag, _copy_client_with_bearer_auth from ._beta_functions import ( ToolError, BetaRunnableTool, BetaAsyncRunnableTool, BetaFunctionToolResultType, aclose_runnable_tool, ) from ...types.beta.sessions import BetaManagedAgentsAgentToolUseEvent, BetaManagedAgentsAgentCustomToolUseEvent from ...types.beta.sessions.beta_managed_agents_user_tool_result_event_params import ( Content as _SessionContent, BetaManagedAgentsUserToolResultEventParams, ) from ...types.beta.sessions.beta_managed_agents_user_custom_tool_result_event_params import ( BetaManagedAgentsUserCustomToolResultEventParams, ) if TYPE_CHECKING: from ..._client import AsyncAnthropic from ...resources.beta.sessions.events import AsyncEvents __all__ = [ "SessionToolRunner", "DispatchedToolCall", "DispatchedToolUseEvent", "DispatchedToolResultParams", "BetaAnyRunnableTool", "MANAGED_AGENTS_BETA", "DEFAULT_MAX_IDLE", # Re-exported for ``anthropic.lib.environments._worker``, which drives the # runner as an async context manager inside its own task group. "_run_session_tools", ] # Either sync or async runnable tool — the union the session-side runners # accept. ``Beta``-prefixed for consistency with the released # ``BetaRunnableTool`` (sync) / ``BetaAsyncRunnableTool`` (async) members it # unions; those two are unchanged. BetaAnyRunnableTool = Union[BetaRunnableTool, BetaAsyncRunnableTool] # The two tool-call event kinds the runner dispatches against the local tool # registry, and the matching result-event params it posts back for each: # # agent.tool_use -> user.tool_result (builtin agent_toolset tools) # agent.custom_tool_use -> user.custom_tool_result (custom, user-defined tools) # # ``agent.mcp_tool_use`` is intentionally absent — MCP tools run server-side and # the runner never sees a result to post for them. DispatchedToolUseEvent = Union[BetaManagedAgentsAgentToolUseEvent, BetaManagedAgentsAgentCustomToolUseEvent] DispatchedToolResultParams = Union[ BetaManagedAgentsUserToolResultEventParams, BetaManagedAgentsUserCustomToolResultEventParams, ] # anthropic-beta gating Sessions access to self-hosted environments. The Sessions # resource auto-injects this header on its own requests; this constant is kept # for the work-item ``stop`` call the worker issues against the Work resource. MANAGED_AGENTS_BETA = "managed-agents-2026-04-01" STREAM_BACKOFF_START = 0.5 STREAM_BACKOFF_CAP = 10.0 # Outer per-tool-call timeout. This MUST stay strictly greater than the bash # tool's own ``agent_toolset.BASH_DEFAULT_TIMEOUT`` (120s). The bash tool wraps # its read in its own ``anyio.fail_after(BASH_DEFAULT_TIMEOUT)`` and, on # ``TimeoutError``, tears down the subprocess. If this outer deadline equalled # the inner one, the *outer* fail_after could win the race; anyio then raises # the parent scope's cancel as a plain ``Cancelled`` (NOT ``TimeoutError``), so # the bash tool's ``except TimeoutError`` cleanup never runs and its subprocess # is left alive with the timed-out command still queued — the next bash call # then reads stale output. The 30s margin gives the inner fail_after room to # fire and clean up before this one. (BashSession also now closes on any # outer-scope cancel as a belt-and-braces backstop, but these two timeouts must # still never be equal.) Invariant covered by # tests/lib/tools/test_session_runner.py::test_tool_timeout_exceeds_bash_default. TOOL_TIMEOUT = 150.0 SEND_RETRIES = 3 # Grace period, in seconds, that the runner keeps running after the session goes # idle with stop_reason ``end_turn`` before it stops; any new event in that # window resets it. ``max_idle=None`` disables it (run until the session ends). DEFAULT_MAX_IDLE = 60.0 log = logging.getLogger(__name__) class _IdleClock: """Tracks how long the session has been idle after an ``end_turn`` stop. :attr:`end_turn_at` is the monotonic timestamp of the most recent ``session.status_idle`` event with ``stop_reason.type == "end_turn"`` for which no newer event has since arrived; ``None`` whenever the session is not in that state. :meth:`SessionToolRunner._idle_watchdog` stops the runner once it has been set for ``max_idle`` seconds. The clock is event-driven, not polled: every armed-state change signals the :attr:`wake` event so the watchdog wakes immediately instead of waiting out a poll interval. The watchdog captures :attr:`wake` *before* it reads :attr:`end_turn_at`, so a change landing between the read and the wait still wakes it. """ __slots__ = ("end_turn_at", "wake") def __init__(self) -> None: self.end_turn_at: float | None = None self.wake = anyio.Event() def _signal(self) -> None: # Wake any current waiter and arm a fresh event for the next wait. self.wake.set() self.wake = anyio.Event() def note_event(self, ev: object) -> None: """Arm the clock on an ``end_turn`` idle, disarm it on anything else.""" if ( getattr(ev, "type", None) == "session.status_idle" and getattr(getattr(ev, "stop_reason", None), "type", None) == "end_turn" ): self.arm() else: self.disarm() def arm(self) -> None: """(Re)start the idle countdown from now and wake the watchdog.""" self.end_turn_at = time.monotonic() self._signal() def disarm(self) -> None: """Cancel the idle countdown; only signals on an actual transition.""" if self.end_turn_at is not None: self.end_turn_at = None self._signal() @dataclass(frozen=True) class DispatchedToolCall: """One tool call observed by :class:`SessionToolRunner`. Covers both tool-call event kinds — a builtin ``agent.tool_use`` and a custom ``agent.custom_tool_use``. The originating event is in :attr:`event` (with its input) and the posted-back result in :attr:`result`; ``name`` and ``tool_use_id`` are flat conveniences mirroring ``event``. """ event: DispatchedToolUseEvent """The full ``agent.tool_use`` / ``agent.custom_tool_use`` event the agent emitted. The tool input is ``event.input``.""" result: DispatchedToolResultParams | None """The result event the runner computed and attempted to post back to the session — ``user.tool_result`` for an ``agent.tool_use`` call, ``user.custom_tool_result`` for an ``agent.custom_tool_use`` call. The computed content is ``result["content"]``. ``None`` when the runner deliberately posted nothing: the tool name is not one this runner owns, so the ``tool_use_id`` was left pending for its owner. ``posted`` is ``False`` in that case.""" tool_use_id: str """Convenience: the id of the originating tool-call event — the same value as ``event.id`` for both event kinds.""" name: str """Convenience: the tool name — the same value as ``event.name``.""" is_error: bool """Convenience: whether the result is an error — the same value as ``result["is_error"]``. Always ``False`` for a skipped unowned call (the runner reaches no verdict on a tool it does not own; ``result`` is ``None``).""" posted: bool = True """``True`` if the result event made it to the session. ``False`` if all retries were exhausted or the server returned a permanent 4xx — in which case the session-side agent will *not* see this result and the consumer may want to surface that or retry at a higher level — and also ``False``, with ``result`` left ``None``, when the tool name is not one this runner owns and it deliberately posted nothing, leaving the ``tool_use_id`` pending for its owner (the split-client partial-fulfilment behavior).""" _HELPER: HelperTag = "session-tool-runner" def _scoped_client(client: AsyncAnthropic, environment_key: str | None) -> AsyncAnthropic: """Build the runner's request client. With an environment key, defer to :func:`_copy_client_with_bearer_auth` for a Bearer-only sub-client. Without one, layer the helper-telemetry header onto the caller's client via ``with_options`` (parent is not mutated). """ if environment_key is not None: return _copy_client_with_bearer_auth(client, auth_token=environment_key, helper=_HELPER) return client.with_options(default_headers={"x-stainless-helper": _HELPER}) def _to_session_content(content: BetaFunctionToolResultType) -> list[_SessionContent]: """Bridge Messages-API tool-result content to the narrower Sessions-API content union. The two APIs share text/image/document/search_result block shapes but use distinct nominal TypedDicts; ToolReference blocks have no Sessions equivalent so they are stringified. """ if isinstance(content, str): return [{"type": "text", "text": content or "(no output)"}] out: list[_SessionContent] = [] for block in content: kind = block.get("type") if kind == "text": text = cast("str", block.get("text") or "(no output)") out.append({"type": "text", "text": text}) elif kind in ("image", "document", "search_result"): out.append(cast("_SessionContent", block)) else: out.append({"type": "text", "text": json.dumps(block)}) return out or [{"type": "text", "text": "(no output)"}] def _build_result_event( ev: DispatchedToolUseEvent, content: BetaFunctionToolResultType, is_error: bool, ) -> DispatchedToolResultParams: """Build the result-event params matching ``ev``'s tool-call kind. A custom tool call (``agent.custom_tool_use``) is answered with a ``user.custom_tool_result`` keyed by ``custom_tool_use_id``; a builtin tool call (``agent.tool_use``) with a ``user.tool_result`` keyed by ``tool_use_id``. Both use the codegen'd event-params TypedDicts. """ session_content = _to_session_content(content) if ev.type == "agent.custom_tool_use": custom_result: BetaManagedAgentsUserCustomToolResultEventParams = { "type": "user.custom_tool_result", "custom_tool_use_id": ev.id, "is_error": is_error, "content": session_content, } return custom_result builtin_result: BetaManagedAgentsUserToolResultEventParams = { "type": "user.tool_result", "tool_use_id": ev.id, "is_error": is_error, "content": session_content, } return builtin_result class SessionToolRunner: """Attach to a managed-agents session and dispatch its tool calls locally. The sessions-side counterpart to ``client.beta.messages.tool_runner``: an async iterable that, for each ``agent.tool_use`` or ``agent.custom_tool_use`` event the agent emits, executes the matching tool from ``tools``, posts the matching result event back (``user.tool_result`` for a builtin tool call, ``user.custom_tool_result`` for a custom one), and yields one :class:`DispatchedToolCall`. Internally drives event-stream reconnect (with capped backoff) and result posting via an ``anyio`` task group, so it works under both ``asyncio`` and ``trio``. Iteration ends when the session terminates (``session.status_terminated`` / ``session.deleted``), when the consumer breaks out of the loop, or — once the session has gone idle with ``stop_reason`` ``end_turn`` — when ``max_idle`` seconds elapse with no new event (any new event resets the countdown; it re-arms on the next ``end_turn`` idle). ``max_idle=None`` disables that last condition. On exit it runs each tool's optional cleanup: the ``close`` hook and, for tools defined as an (async) context manager, its ``__exit__`` / ``__aexit__``. It does **not** touch the work-item lease — wrap it in an :class:`~anthropic.lib.environments.EnvironmentWorker` for heartbeating / force-stop. Pass ``environment_key`` to authenticate the event stream / list / send calls with the self-hosted environment key (bearered, with the client's default ``x-api-key`` dropped); leave it unset to use the client's own credentials. A self-hosted session is commonly serviced by **two** clients at once: this runner inside the customer's sandbox (registered with the file/shell sandbox tools) and the customer's app backend (handling the agent's ``custom`` function tools). The Sessions API has a partial-fulfilment contract: when a session pauses on ``requires_action`` the pending tool-call ids can mix both kinds, and each client must post results **only** for the ids it owns and leave the rest pending for the other client. A tool-call event whose name is not in ``tools`` is therefore assumed to belong to the other client: the runner posts no result for it, does not mark it answered, and leaves the ``tool_use_id`` pending — but still yields a :class:`DispatchedToolCall` (``posted=False``, ``is_error=False``, ``result=None``) so the caller can observe the unowned dispatch. Usage:: from anthropic.lib.tools.agent_toolset import AgentToolContext, beta_agent_toolset_20260401 async with AgentToolContext(workdir="/workspace") as env: async for call in client.beta.sessions.events.tool_runner( work.data.id, tools=[*beta_agent_toolset_20260401(env), my_tool], ): print(f"{call.name} -> {'error' if call.is_error else 'ok'}") """ def __init__( self, client: AsyncAnthropic, session_id: str, *, tools: Sequence[BetaAnyRunnableTool], max_idle: float | None = DEFAULT_MAX_IDLE, environment_key: str | None = None, extra_headers: Headers | None = None, ) -> None: self.session_id = session_id self.tools: Sequence[BetaAnyRunnableTool] = tools self.max_idle = max_idle # All event stream / list / send requests are issued via this scoped # sub-client: Bearer-only when an environment key is set, otherwise the # caller's own client with the helper-telemetry header layered on. self._scoped = _scoped_client(client, environment_key) # Per-request passthrough headers: threaded into every event stream / # list / send via that call's ``extra_headers=`` (make_request_options) # — never assigned onto the client, so client state is not mutated. # Auth and ``x-stainless-helper`` come from the scoped sub-client and # the parent client's ``default_headers`` propagate via its # ``client.copy()``; per the SDK's standard ``extra_headers`` # precedence a caller header overrides the scoped client's same-named # default for that request, so this is for caller passthrough (trace # ids etc.), not auth. self.extra_headers = extra_headers async def __aiter__(self) -> AsyncIterator[DispatchedToolCall]: async with self._run() as calls: async for call in calls: yield call async def until_done(self) -> None: """Drive the runner to completion, discarding the per-call observations. Named to match ``BetaToolRunner.until_done`` (and to avoid colliding with :meth:`EnvironmentWorker.run`, which is a forever-loop): it returns once the session ends / goes idle, rather than running until cancelled. """ async for _ in self: pass # -- run lifecycle ------------------------------------------------------ @contextlib.asynccontextmanager async def _run(self) -> AsyncIterator[AsyncIterator[DispatchedToolCall]]: """Drive the session tool loop, yielding an iterator of :class:`DispatchedToolCall`. :meth:`__aiter__` (and the module-level :func:`_run_session_tools` shim used by ``EnvironmentWorker``) wrap this. Per-run state lives on ``self`` as private attributes so the loops below — :meth:`_stream_loop`, :meth:`_dispatch_loop`, :meth:`_reconcile`, :meth:`_idle_watchdog`, :meth:`_stop_watcher` — can mutate it as methods rather than threading a shared state object through free functions. """ self._events: AsyncEvents = self._scoped.beta.sessions.events log.info("session tool runner starting session_id=%s", self.session_id) self._tools_by_name: dict[str, BetaAnyRunnableTool] = tool_registry(self.tools) # ``_seen`` dedups tool-call events across the stream and the reconcile # pass (by event id); ``_answered`` holds the ids whose result post has # actually landed, so a failed post is retried on the next reconcile. self._seen: set[str] = set() self._answered: set[str] = set() self._stop = anyio.Event() self._idle_clock = _IdleClock() self._send_work, self._recv_work = anyio.create_memory_object_stream[DispatchedToolUseEvent]( max_buffer_size=100, ) self._send_results, self._recv_results = anyio.create_memory_object_stream[DispatchedToolCall]( max_buffer_size=math.inf, ) async def iterator() -> AsyncIterator[DispatchedToolCall]: # ``_recv_results`` is explicitly closed in the outer ``finally`` to # keep cleanup deterministic regardless of whether the consumer # iterated at all (e.g. ``async with runner._run(): pass``). async for call in self._recv_results: yield call try: # The outer ``CancelScope`` absorbs the task-group cancellation we # trigger in the ``finally`` below, so it doesn't surface to the # consumer as ``Cancelled``. with anyio.CancelScope(): async with anyio.create_task_group() as tg: # The stop watcher closes ``_send_work`` when ``_stop`` is # set so the dispatch loop's ``receive()`` raises # EndOfStream and the loop exits cleanly without us having # to inject a sentinel or race two awaitables. tg.start_soon(self._stop_watcher) tg.start_soon(self._stream_loop) tg.start_soon(self._dispatch_loop) if self.max_idle is not None: tg.start_soon(self._idle_watchdog) try: yield iterator() finally: # Signal every loop to exit. Most exit voluntarily on # ``_stop``; cancelling the task group's scope wakes # anything still blocked on an unrelated await (e.g. an # uncancellable test fake). anyio absorbs the resulting # cancel via the outer ``CancelScope``. self._stop.set() tg.cancel_scope.cancel() finally: # Explicitly close every stream so anyio doesn't warn on GC. # ``aclose`` is idempotent, so it's fine if the producer already # closed its end during normal shutdown. with anyio.CancelScope(shield=True): for stream in (self._recv_results, self._send_results, self._recv_work, self._send_work): try: await stream.aclose() except Exception: pass # Run each tool's optional cleanup (``close`` hook and, for # context-manager tools, ``__exit__`` / ``__aexit__``). Shielded so # the hooks survive the surrounding cancellation. with anyio.CancelScope(shield=True): for tool in self.tools: await aclose_runnable_tool(tool) # -- event-stream + reconcile ------------------------------------------ async def _reconcile(self) -> None: """Read full history and enqueue every tool-call event still unanswered. Two-pass: read the whole history before emitting so a tool-call whose result appears later in the same history is not re-dispatched. Pairs ``agent.tool_use`` with ``user.tool_result`` and ``agent.custom_tool_use`` with ``user.custom_tool_result`` when computing which calls are answered. """ pending: list[DispatchedToolUseEvent] = [] last_was_end_turn = False list_failed = False try: async for ev in self._events.list(self.session_id, limit=1000, extra_headers=self.extra_headers): if ev.type == "agent.tool_use" or ev.type == "agent.custom_tool_use": # Mark the event seen so the live stream doesn't re-enqueue it, but # decide whether it still needs executing from ``_answered``, not # ``_seen``: a call whose result post failed is seen-but-unanswered # and must be retried on the next reconcile pass rather than dropped. self._seen.add(ev.id) pending.append(ev) elif ev.type == "user.tool_result": self._answered.add(ev.tool_use_id) elif ev.type == "user.custom_tool_result": self._answered.add(ev.custom_tool_use_id) last_was_end_turn = ( ev.type == "session.status_idle" and getattr(getattr(ev, "stop_reason", None), "type", None) == "end_turn" ) except Exception as e: # Pagination may have failed partway through; the ``_answered`` set # could be incomplete, so dispatching ``pending`` now would risk # re-running a tool whose result was on a page we never reached. # The next reconnect will retry the reconcile. Leave ``_idle_clock`` # untouched since the history we read may be incomplete. log.warning("reconcile list failed; skipping pending enqueue error=%s", e) list_failed = True if list_failed: # Roll back the ids we added to ``_seen`` so the live stream can # re-process them rather than silently dedup what we never finished # reading. for ev in pending: self._seen.discard(ev.id) return unanswered = [ev for ev in pending if ev.id not in self._answered] # If the most recent event in history is an ``end_turn`` idle and there # is no outstanding tool work, the session is done — arm the idle clock # so the watchdog counts down even if that ``end_turn`` arrived during a # disconnect. if last_was_end_turn and not unanswered: self._idle_clock.arm() else: self._idle_clock.disarm() for ev in unanswered: await self._send_work.send(ev) async def _stream_loop(self) -> None: backoff = STREAM_BACKOFF_START while not self._stop.is_set(): try: # Open the stream *before* reconciling: with the stream already # attached, an event emitted in the gap between the list call # and the attach is delivered live instead of lost. ``_seen`` # dedups any overlap between the history and the live stream. async with await self._events.stream(self.session_id, extra_headers=self.extra_headers) as stream: await self._reconcile() async for ev in stream: backoff = STREAM_BACKOFF_START # Arm/disarm the idle clock: an ``end_turn`` idle starts # the grace countdown, any other event cancels it. self._idle_clock.note_event(ev) if ev.type == "agent.tool_use" or ev.type == "agent.custom_tool_use": if ev.id not in self._seen: self._seen.add(ev.id) await self._send_work.send(ev) elif ev.type == "user.tool_result": self._answered.add(ev.tool_use_id) elif ev.type == "user.custom_tool_result": self._answered.add(ev.custom_tool_use_id) elif ev.type in ("session.status_terminated", "session.deleted"): log.info("session terminated") self._stop.set() return except TRANSIENT_ERRORS as e: if self._stop.is_set(): return if is_fatal_status_error(e): # No amount of backoff will fix a 401/403; bail out so the # consumer sees the runner exit instead of looping silently. log.error("stream failed permanently error=%s", e) self._stop.set() return log.warning("stream disconnected, reconnecting backoff=%.1fs error=%s", backoff, e) with anyio.move_on_after(backoff): await self._stop.wait() backoff = min(backoff * 2, STREAM_BACKOFF_CAP) # -- tool dispatch ------------------------------------------------------ async def _dispatch_loop(self) -> None: try: while True: try: ev = await self._recv_work.receive() except anyio.EndOfStream: # Producer side closed — usually because ``_stop`` was set # (the idle watchdog or stream loop signalled it). return if ev.id in self._answered: continue # Shielded execute so consumer-side cancellation can't interrupt # an in-flight tool. The result will still be posted and the # DispatchedToolCall enqueued before the cancel propagates. with anyio.CancelScope(shield=True): await self._execute(ev) finally: # Closing the results stream signals the iterator that no more # results will arrive. Wrapped in a shield because we're often in a # cancellation path on the way out. with anyio.CancelScope(shield=True): await self._send_results.aclose() async def _execute(self, ev: DispatchedToolUseEvent) -> None: log.info("executing tool tool=%s tool_use_id=%s", ev.name, ev.id) tool = self._tools_by_name.get(ev.name) tool_result: DispatchedToolResultParams | None if tool is None: # Skip unowned (split-client partial fulfilment): a name this # runner is not registered for belongs to the other client # servicing this session (typically the customer's app backend # handling custom tools). Post NO result, do not mark it answered, # and leave the tool_use_id pending for its owner — claiming it # would corrupt the conversation (the model would read "not # implemented" as the tool output while the real result from the # other client arrives afterwards). Still yield the call so the # caller can observe the unowned dispatch; nothing was sent, so # ``posted`` and ``is_error`` stay False and ``result`` is None. # The id stays unanswered, so reconcile keeps it out of the # idle/end-turn accounting and re-surfaces it after a reconnect # until its owner answers it. log.info( "tool %r not owned by this runner; leaving tool_use_id=%s pending for its owner", ev.name, ev.id, ) tool_result = None is_error = False sent = False else: content: BetaFunctionToolResultType is_error = False input_ = dict(ev.input) try: with anyio.fail_after(TOOL_TIMEOUT): content = await run_runnable_tool(tool, input_) except TimeoutError: content = f"tool {ev.name!r} timed out" is_error = True except ToolError as e: content = tool_error_content(e) is_error = True except Exception as e: log.exception("tool %s raised", ev.name) content = tool_error_content(e) is_error = True tool_result = _build_result_event(ev, content, is_error) sent = await self._send_result(tool_result, ev.id) try: await self._send_results.send( DispatchedToolCall( event=ev, result=tool_result, tool_use_id=ev.id, name=ev.name, is_error=is_error, posted=sent, ) ) except anyio.BrokenResourceError: # The receiver closed early (consumer broke out of the iterator). # Result was still posted to the session; we just can't surface # the observability event. pass async def _send_result(self, tool_result: DispatchedToolResultParams, tool_use_id: str) -> bool: """Post ``tool_result`` back to the session, retrying transient failures. ``tool_use_id`` is the originating tool-call event id — passed explicitly because the result params key it differently (``tool_use_id`` vs ``custom_tool_use_id``) depending on the kind. """ last_err: Exception | None = None for i in range(SEND_RETRIES): try: await self._events.send( self.session_id, events=[tool_result], extra_headers=self.extra_headers, ) self._answered.add(tool_use_id) return True except TRANSIENT_ERRORS as e: last_err = e if is_fatal_status_error(e): break # Don't sleep after the final attempt — there is no retry to wait for. if i < SEND_RETRIES - 1: await anyio.sleep(i + 1) log.error("failed to send tool result tool_use_id=%s error=%s", tool_use_id, last_err) return False # -- background watchers ----------------------------------------------- async def _idle_watchdog(self) -> None: """Stop the runner once the session has been idle (``end_turn``) for ``max_idle`` seconds with no new events. Event-driven: it blocks on the idle clock's wake event rather than polling. Capturing ``clock.wake`` *before* reading ``clock.end_turn_at`` closes the race where the clock changes between the read and the wait. """ max_idle = self.max_idle assert max_idle is not None clock = self._idle_clock while not self._stop.is_set(): wake = clock.wake at = clock.end_turn_at if at is None: # Not armed: block until the clock arms (or the runner stops). await _wait_first(wake, self._stop) continue remaining = max_idle - (time.monotonic() - at) if remaining <= 0: log.info("session idle after end_turn for %.0fs; stopping", max_idle) self._stop.set() return # Armed: wait out the remaining grace, waking early if the clock # changes (disarmed, or re-armed with a fresh timestamp) or the # runner stops. Either way, loop back and re-evaluate. with anyio.move_on_after(remaining): await _wait_first(wake, self._stop) async def _stop_watcher(self) -> None: """When ``_stop`` is set, close the work stream so :meth:`_dispatch_loop` exits.""" await self._stop.wait() await self._send_work.aclose() async def _wait_first(*events: anyio.Event) -> None: """Return as soon as any of ``events`` is set.""" async with anyio.create_task_group() as tg: async def _waiter(ev: anyio.Event) -> None: await ev.wait() tg.cancel_scope.cancel() for ev in events: tg.start_soon(_waiter, ev) @contextlib.asynccontextmanager async def _run_session_tools( client: AsyncAnthropic, session_id: str, *, tools: Sequence[BetaAnyRunnableTool], max_idle: float | None = DEFAULT_MAX_IDLE, environment_key: str | None = None, extra_headers: Headers | None = None, ) -> AsyncIterator[AsyncIterator[DispatchedToolCall]]: """Internal: drive a :class:`SessionToolRunner` as an async context manager. Kept as a thin module-level shim because :class:`~anthropic.lib.environments.EnvironmentWorker` enters the runner inside its own task group and wants the context-manager shape for deterministic cleanup. New code should iterate :class:`SessionToolRunner` directly. """ runner = SessionToolRunner( client, session_id, tools=tools, max_idle=max_idle, environment_key=environment_key, extra_headers=extra_headers, ) async with runner._run() as calls: # noqa: SLF001 yield calls