# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. from __future__ import annotations import json from typing import Mapping, cast from ..._models import construct_type from ..._resource import SyncAPIResource, AsyncAPIResource from ..._exceptions import AnthropicError from ...types.beta.unwrap_webhook_event import UnwrapWebhookEvent __all__ = ["Webhooks", "AsyncWebhooks"] class Webhooks(SyncAPIResource): def unwrap(self, payload: str, *, headers: Mapping[str, str], key: str | bytes | None = None) -> UnwrapWebhookEvent: try: from standardwebhooks import Webhook except ImportError as exc: raise AnthropicError("You need to install `anthropic[webhooks]` to use this method") from exc if key is None: key = self._client.webhook_key if key is None: raise ValueError( "Cannot verify a webhook without a key on either the client's webhook_key or passed in as an argument" ) if not isinstance(headers, dict): headers = dict(headers) Webhook(key).verify(payload, headers) return cast( UnwrapWebhookEvent, construct_type( type_=UnwrapWebhookEvent, value=json.loads(payload), ), ) class AsyncWebhooks(AsyncAPIResource): def unwrap(self, payload: str, *, headers: Mapping[str, str], key: str | bytes | None = None) -> UnwrapWebhookEvent: try: from standardwebhooks import Webhook except ImportError as exc: raise AnthropicError("You need to install `anthropic[webhooks]` to use this method") from exc if key is None: key = self._client.webhook_key if key is None: raise ValueError( "Cannot verify a webhook without a key on either the client's webhook_key or passed in as an argument" ) if not isinstance(headers, dict): headers = dict(headers) Webhook(key).verify(payload, headers) return cast( UnwrapWebhookEvent, construct_type( type_=UnwrapWebhookEvent, value=json.loads(payload), ), )