Source code for aiohttp.client

"""HTTP Client for asyncio."""

import asyncio
import base64
import hashlib
import json
import os
import sys
import traceback
import warnings
from collections.abc import (
    Awaitable,
    Callable,
    Coroutine,
    Generator,
    Iterable,
    Sequence,
)
from contextlib import suppress
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Final,
    Generic,
    Literal,
    TypedDict,
    TypeVar,
    overload,
)

import attr
from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
from yarl import URL

from . import hdrs, http, payload
from ._websocket.reader import WebSocketDataQueue
from .abc import AbstractCookieJar
from .client_exceptions import (
    ClientConnectionError,
    ClientConnectionResetError,
    ClientConnectorCertificateError,
    ClientConnectorDNSError,
    ClientConnectorError,
    ClientConnectorSSLError,
    ClientError,
    ClientHttpProxyError,
    ClientOSError,
    ClientPayloadError,
    ClientProxyConnectionError,
    ClientResponseError,
    ClientSSLError,
    ConnectionTimeoutError,
    ContentTypeError,
    InvalidURL,
    InvalidUrlClientError,
    InvalidUrlRedirectClientError,
    NonHttpUrlClientError,
    NonHttpUrlRedirectClientError,
    RedirectClientError,
    ServerConnectionError,
    ServerDisconnectedError,
    ServerFingerprintMismatch,
    ServerTimeoutError,
    SocketTimeoutError,
    TooManyRedirects,
    WSMessageTypeError,
    WSServerHandshakeError,
)
from .client_middlewares import ClientMiddlewareType, build_client_middlewares
from .client_reqrep import (
    ClientRequest as ClientRequest,
    ClientResponse as ClientResponse,
    Fingerprint as Fingerprint,
    RequestInfo as RequestInfo,
    _merge_ssl_params,
)
from .client_ws import (
    DEFAULT_WS_CLIENT_TIMEOUT,
    ClientWebSocketResponse as ClientWebSocketResponse,
    ClientWSTimeout as ClientWSTimeout,
)
from .connector import (
    HTTP_AND_EMPTY_SCHEMA_SET,
    BaseConnector as BaseConnector,
    NamedPipeConnector as NamedPipeConnector,
    TCPConnector as TCPConnector,
    UnixConnector as UnixConnector,
)
from .cookiejar import CookieJar
from .helpers import (
    _SENTINEL,
    DEBUG,
    DEFAULT_CHUNK_SIZE,
    EMPTY_BODY_METHODS,
    BasicAuth,
    TimeoutHandle,
    basicauth_from_netrc,
    get_env_proxy_for_url,
    netrc_from_env,
    sentinel,
    strip_auth_from_url,
)
from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse
from .tracing import Trace, TraceConfig
from .typedefs import (
    JSONBytesEncoder,
    JSONEncoder,
    LooseCookies,
    LooseHeaders,
    Query,
    StrOrURL,
)

__all__ = (
    # client_exceptions
    "ClientConnectionError",
    "ClientConnectionResetError",
    "ClientConnectorCertificateError",
    "ClientConnectorDNSError",
    "ClientConnectorError",
    "ClientConnectorSSLError",
    "ClientError",
    "ClientHttpProxyError",
    "ClientOSError",
    "ClientPayloadError",
    "ClientProxyConnectionError",
    "ClientResponseError",
    "ClientSSLError",
    "ConnectionTimeoutError",
    "ContentTypeError",
    "InvalidURL",
    "InvalidUrlClientError",
    "RedirectClientError",
    "NonHttpUrlClientError",
    "InvalidUrlRedirectClientError",
    "NonHttpUrlRedirectClientError",
    "ServerConnectionError",
    "ServerDisconnectedError",
    "ServerFingerprintMismatch",
    "ServerTimeoutError",
    "SocketTimeoutError",
    "TooManyRedirects",
    "WSServerHandshakeError",
    # client_reqrep
    "ClientRequest",
    "ClientResponse",
    "Fingerprint",
    "RequestInfo",
    # connector
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    # client_ws
    "ClientWebSocketResponse",
    # client
    "ClientSession",
    "ClientTimeout",
    "ClientWSTimeout",
    "request",
    "WSMessageTypeError",
)


if TYPE_CHECKING:
    from ssl import SSLContext
else:
    SSLContext = Any

if sys.version_info >= (3, 11) and TYPE_CHECKING:
    from typing import Unpack


class _RequestOptions(TypedDict, total=False):
    params: Query
    data: Any
    json: Any
    cookies: LooseCookies | None
    headers: LooseHeaders | None
    skip_auto_headers: Iterable[str] | None
    auth: BasicAuth | None
    allow_redirects: bool
    max_redirects: int
    compress: str | bool | None
    chunked: bool | None
    expect100: bool
    raise_for_status: None | bool | Callable[[ClientResponse], Awaitable[None]]
    read_until_eof: bool
    proxy: StrOrURL | None
    proxy_auth: BasicAuth | None
    timeout: "ClientTimeout | _SENTINEL | None"
    ssl: SSLContext | bool | Fingerprint
    server_hostname: str | None
    proxy_headers: LooseHeaders | None
    trace_request_ctx: object
    read_bufsize: int | None
    auto_decompress: bool | None
    max_line_size: int | None
    max_field_size: int | None
    max_headers: int | None
    middlewares: Sequence[ClientMiddlewareType] | None


class _WSConnectOptions(TypedDict, total=False):
    method: str
    protocols: Iterable[str]
    timeout: "ClientWSTimeout | _SENTINEL"
    receive_timeout: float | None
    autoclose: bool
    autoping: bool
    heartbeat: float | None
    auth: BasicAuth | None
    origin: str | None
    params: Query
    headers: LooseHeaders | None
    proxy: StrOrURL | None
    proxy_auth: BasicAuth | None
    ssl: SSLContext | bool | Fingerprint
    verify_ssl: bool | None
    fingerprint: bytes | None
    ssl_context: SSLContext | None
    server_hostname: str | None
    proxy_headers: LooseHeaders | None
    compress: int
    max_msg_size: int


[docs] @attr.s(auto_attribs=True, frozen=True, slots=True) class ClientTimeout: total: float | None = None connect: float | None = None sock_read: float | None = None sock_connect: float | None = None ceil_threshold: float = 5
# pool_queue_timeout: Optional[float] = None # dns_resolution_timeout: Optional[float] = None # socket_connect_timeout: Optional[float] = None # connection_acquiring_timeout: Optional[float] = None # new_connection_timeout: Optional[float] = None # http_header_timeout: Optional[float] = None # response_body_timeout: Optional[float] = None # to create a timeout specific for a single request, either # - create a completely new one to overwrite the default # - or use http://www.attrs.org/en/stable/api.html#attr.evolve # to overwrite the defaults # 5 Minute default read timeout DEFAULT_TIMEOUT: Final[ClientTimeout] = ClientTimeout(total=5 * 60, sock_connect=30) # https://www.rfc-editor.org/rfc/rfc9110#section-9.2.2 IDEMPOTENT_METHODS = frozenset({"GET", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE"}) _RetType_co = TypeVar( "_RetType_co", bound="ClientResponse | ClientWebSocketResponse[bool]", covariant=True, ) _CharsetResolver = Callable[[ClientResponse, bytes], str]
[docs] class ClientSession: """First-class interface for making HTTP requests.""" ATTRS = frozenset( [ "_base_url", "_base_url_origin", "_source_traceback", "_connector", "_loop", "_cookie_jar", "_connector_owner", "_default_auth", "_version", "_json_serialize", "_json_serialize_bytes", "_requote_redirect_url", "_timeout", "_raise_for_status", "_auto_decompress", "_trust_env", "_default_headers", "_skip_auto_headers", "_request_class", "_response_class", "_ws_response_class", "_trace_configs", "_read_bufsize", "_max_line_size", "_max_field_size", "_max_headers", "_resolve_charset", "_default_proxy", "_default_proxy_auth", "_retry_connection", "_middlewares", "requote_redirect_url", ] ) _source_traceback: traceback.StackSummary | None = None _connector: BaseConnector | None = None def __init__( self, base_url: StrOrURL | None = None, *, connector: BaseConnector | None = None, loop: asyncio.AbstractEventLoop | None = None, cookies: LooseCookies | None = None, headers: LooseHeaders | None = None, proxy: StrOrURL | None = None, proxy_auth: BasicAuth | None = None, skip_auto_headers: Iterable[str] | None = None, auth: BasicAuth | None = None, json_serialize: JSONEncoder = json.dumps, json_serialize_bytes: JSONBytesEncoder | None = None, request_class: type[ClientRequest] = ClientRequest, response_class: type[ClientResponse] = ClientResponse, ws_response_class: type[ClientWebSocketResponse] = ClientWebSocketResponse, version: HttpVersion = http.HttpVersion11, cookie_jar: AbstractCookieJar | None = None, connector_owner: bool = True, raise_for_status: bool | Callable[[ClientResponse], Awaitable[None]] = False, read_timeout: float | _SENTINEL = sentinel, conn_timeout: float | None = None, timeout: object | ClientTimeout = sentinel, auto_decompress: bool = True, trust_env: bool = False, requote_redirect_url: bool = True, trace_configs: list[TraceConfig] | None = None, read_bufsize: int = DEFAULT_CHUNK_SIZE, max_line_size: int = 8190, max_field_size: int = 8190, max_headers: int = 128, fallback_charset_resolver: _CharsetResolver = lambda r, b: "utf-8", middlewares: Sequence[ClientMiddlewareType] = (), ssl_shutdown_timeout: _SENTINEL | None | float = sentinel, ) -> None: # We initialise _connector to None immediately, as it's referenced in __del__() # and could cause issues if an exception occurs during initialisation. self._connector: BaseConnector | None = None if loop is None: if connector is not None: loop = connector._loop loop = loop or asyncio.get_running_loop() if base_url is None or isinstance(base_url, URL): self._base_url: URL | None = base_url self._base_url_origin = None if base_url is None else base_url.origin() else: self._base_url = URL(base_url) self._base_url_origin = self._base_url.origin() assert self._base_url.absolute, "Only absolute URLs are supported" if self._base_url is not None and not self._base_url.path.endswith("/"): raise ValueError("base_url must have a trailing '/'") if timeout is sentinel or timeout is None: self._timeout = DEFAULT_TIMEOUT if read_timeout is not sentinel: warnings.warn( "read_timeout is deprecated, use timeout argument instead", DeprecationWarning, stacklevel=2, ) self._timeout = attr.evolve(self._timeout, total=read_timeout) if conn_timeout is not None: self._timeout = attr.evolve(self._timeout, connect=conn_timeout) warnings.warn( "conn_timeout is deprecated, use timeout argument instead", DeprecationWarning, stacklevel=2, ) else: if not isinstance(timeout, ClientTimeout): raise ValueError( f"timeout parameter cannot be of {type(timeout)} type, " "please use 'timeout=ClientTimeout(...)'", ) self._timeout = timeout if read_timeout is not sentinel: raise ValueError( "read_timeout and timeout parameters " "conflict, please setup " "timeout.read" ) if conn_timeout is not None: raise ValueError( "conn_timeout and timeout parameters " "conflict, please setup " "timeout.connect" ) if ssl_shutdown_timeout is not sentinel: warnings.warn( "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0", DeprecationWarning, stacklevel=2, ) if connector is None: connector = TCPConnector( loop=loop, ssl_shutdown_timeout=ssl_shutdown_timeout ) if connector._loop is not loop: raise RuntimeError("Session and connector has to use same event loop") self._loop = loop if loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) if cookie_jar is None: cookie_jar = CookieJar(loop=loop) self._cookie_jar = cookie_jar if cookies: self._cookie_jar.update_cookies(cookies) if auth is not None: warnings.warn( "The 'auth' parameter is deprecated and will be removed in v4;" " pass headers={'Authorization': " "aiohttp.encode_basic_auth(login, password)} instead", DeprecationWarning, stacklevel=2, ) if proxy_auth is not None: warnings.warn( "The 'proxy_auth' parameter is deprecated and will be removed in v4;" " pass proxy_headers={'Proxy-Authorization': " "aiohttp.encode_basic_auth(login, password)} instead", DeprecationWarning, stacklevel=2, ) self._connector = connector self._connector_owner = connector_owner self._default_auth = auth self._version = version self._json_serialize = json_serialize self._json_serialize_bytes = json_serialize_bytes self._raise_for_status = raise_for_status self._auto_decompress = auto_decompress self._trust_env = trust_env self._requote_redirect_url = requote_redirect_url self._read_bufsize = read_bufsize self._max_line_size = max_line_size self._max_field_size = max_field_size self._max_headers = max_headers # Convert to list of tuples if headers: real_headers: CIMultiDict[str] = CIMultiDict(headers) else: real_headers = CIMultiDict() self._default_headers: CIMultiDict[str] = real_headers if skip_auto_headers is not None: self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers) else: self._skip_auto_headers = frozenset() self._request_class = request_class self._response_class = response_class self._ws_response_class = ws_response_class self._trace_configs = trace_configs or [] for trace_config in self._trace_configs: trace_config.freeze() self._resolve_charset = fallback_charset_resolver self._default_proxy = proxy self._default_proxy_auth = proxy_auth self._retry_connection: bool = True self._middlewares = middlewares def __init_subclass__(cls: type["ClientSession"]) -> None: warnings.warn( f"Inheritance class {cls.__name__} from ClientSession is discouraged", DeprecationWarning, stacklevel=2, ) if DEBUG: def __setattr__(self, name: str, val: Any) -> None: if name not in self.ATTRS: warnings.warn( f"Setting custom ClientSession.{name} attribute is discouraged", DeprecationWarning, stacklevel=2, ) super().__setattr__(name, val) def __del__(self, _warnings: Any = warnings) -> None: if not self.closed: kwargs = {"source": self} _warnings.warn( f"Unclosed client session {self!r}", ResourceWarning, **kwargs ) context = {"client_session": self, "message": "Unclosed client session"} if self._source_traceback is not None: context["source_traceback"] = self._source_traceback self._loop.call_exception_handler(context) if sys.version_info >= (3, 11) and TYPE_CHECKING: def request( self, method: str, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... else:
[docs] def request( self, method: str, url: StrOrURL, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP request.""" return _RequestContextManager(self._request(method, url, **kwargs))
def _build_url(self, str_or_url: StrOrURL) -> URL: url = URL(str_or_url) if self._base_url and not url.absolute: return self._base_url.join(url) return url async def _request( self, method: str, str_or_url: StrOrURL, *, params: Query = None, data: Any = None, json: Any = None, cookies: LooseCookies | None = None, headers: LooseHeaders | None = None, skip_auto_headers: Iterable[str] | None = None, auth: BasicAuth | None = None, allow_redirects: bool = True, max_redirects: int = 10, compress: str | bool | None = None, chunked: bool | None = None, expect100: bool = False, raise_for_status: ( None | bool | Callable[[ClientResponse], Awaitable[None]] ) = None, read_until_eof: bool = True, proxy: StrOrURL | None = None, proxy_auth: BasicAuth | None = None, timeout: ClientTimeout | _SENTINEL = sentinel, verify_ssl: bool | None = None, fingerprint: bytes | None = None, ssl_context: SSLContext | None = None, ssl: SSLContext | bool | Fingerprint = True, server_hostname: str | None = None, proxy_headers: LooseHeaders | None = None, trace_request_ctx: object = None, read_bufsize: int | None = None, auto_decompress: bool | None = None, max_line_size: int | None = None, max_field_size: int | None = None, max_headers: int | None = None, middlewares: Sequence[ClientMiddlewareType] | None = None, ) -> ClientResponse: # NOTE: timeout clamps existing connect and read timeouts. We cannot # set the default to None because we need to detect if the user wants # to use the existing timeouts by setting timeout to None. if self.closed: raise RuntimeError("Session is closed") ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) if auth is not None: warnings.warn( "The 'auth' parameter is deprecated and will be removed in v4;" " pass headers={'Authorization': " "aiohttp.encode_basic_auth(login, password)} instead", DeprecationWarning, stacklevel=3, ) if proxy_auth is not None: warnings.warn( "The 'proxy_auth' parameter is deprecated and will be removed in v4;" " pass proxy_headers={'Proxy-Authorization': " "aiohttp.encode_basic_auth(login, password)} instead", DeprecationWarning, stacklevel=3, ) if data is not None and json is not None: raise ValueError( "data and json parameters can not be used at the same time" ) elif json is not None: if self._json_serialize_bytes is not None: data = payload.JsonBytesPayload(json, dumps=self._json_serialize_bytes) else: data = payload.JsonPayload(json, dumps=self._json_serialize) if not isinstance(chunked, bool) and chunked is not None: warnings.warn("Chunk size is deprecated #1615", DeprecationWarning) redirects = 0 history: list[ClientResponse] = [] version = self._version params = params or {} # Merge with default headers and transform to CIMultiDict headers = self._prepare_headers(headers) try: url = self._build_url(str_or_url) except ValueError as e: raise InvalidUrlClientError(str_or_url) from e assert self._connector is not None if url.scheme not in self._connector.allowed_protocol_schema_set: raise NonHttpUrlClientError(url) skip_headers: Iterable[istr] | None if skip_auto_headers is not None: skip_headers = { istr(i) for i in skip_auto_headers } | self._skip_auto_headers elif self._skip_auto_headers: skip_headers = self._skip_auto_headers else: skip_headers = None if proxy is None: proxy = self._default_proxy if proxy_auth is None: proxy_auth = self._default_proxy_auth if proxy is None: proxy_headers = None else: proxy_headers = self._prepare_headers(proxy_headers) try: proxy = URL(proxy) except ValueError as e: raise InvalidURL(proxy) from e if timeout is sentinel: real_timeout: ClientTimeout = self._timeout else: if not isinstance(timeout, ClientTimeout): real_timeout = ClientTimeout(total=timeout) else: real_timeout = timeout # timeout is cumulative for all request operations # (request, redirects, responses, data consuming) tm = TimeoutHandle( self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold ) handle = tm.start() if read_bufsize is None: read_bufsize = self._read_bufsize if auto_decompress is None: auto_decompress = self._auto_decompress if max_line_size is None: max_line_size = self._max_line_size if max_field_size is None: max_field_size = self._max_field_size if max_headers is None: max_headers = self._max_headers traces = [ Trace( self, trace_config, trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx), ) for trace_config in self._trace_configs ] for trace in traces: await trace.send_request_start(method, url.update_query(params), headers) timer = tm.timer() try: with timer: # https://www.rfc-editor.org/rfc/rfc9112.html#name-retrying-requests retry_persistent_connection = ( self._retry_connection and method in IDEMPOTENT_METHODS ) while True: url, auth_from_url = strip_auth_from_url(url) if not url.raw_host: # NOTE: Bail early, otherwise, causes `InvalidURL` through # NOTE: `self._request_class()` below. err_exc_cls = ( InvalidUrlRedirectClientError if redirects else InvalidUrlClientError ) raise err_exc_cls(url) # If `auth` was passed for an already authenticated URL, # disallow only if this is the initial URL; this is to avoid issues # with sketchy redirects that are not the caller's responsibility if not history and (auth and auth_from_url): raise ValueError( "Cannot combine AUTH argument with " "credentials encoded in URL" ) # Override the auth with the one from the URL only if we # have no auth, or if we got an auth from a redirect URL if auth is None or (history and auth_from_url is not None): auth = auth_from_url if ( auth is None and self._default_auth and ( not self._base_url or self._base_url_origin == url.origin() ) ): auth = self._default_auth # Try netrc if auth is still None and trust_env is enabled. if auth is None and self._trust_env and url.host is not None: auth = await self._loop.run_in_executor( None, self._get_netrc_auth, url.host ) # It would be confusing if we support explicit # Authorization header with auth argument if ( headers is not None and auth is not None and hdrs.AUTHORIZATION in headers ): raise ValueError( "Cannot combine AUTHORIZATION header " "with AUTH argument or credentials " "encoded in URL" ) all_cookies = self._cookie_jar.filter_cookies(url) if cookies is not None: tmp_cookie_jar = CookieJar( unsafe=self._cookie_jar.unsafe, quote_cookie=self._cookie_jar.quote_cookie, ) tmp_cookie_jar.update_cookies(cookies) req_cookies = tmp_cookie_jar.filter_cookies(url) if req_cookies: all_cookies.load(req_cookies) proxy_: URL | None = None if proxy is not None: proxy_ = URL(proxy) elif self._trust_env: with suppress(LookupError): proxy_, proxy_auth = await asyncio.to_thread( get_env_proxy_for_url, url ) req = self._request_class( method, url, params=params, headers=headers, skip_auto_headers=skip_headers, data=data, cookies=all_cookies, auth=auth, version=version, compress=compress, chunked=chunked, expect100=expect100, loop=self._loop, response_class=self._response_class, proxy=proxy_, proxy_auth=proxy_auth, timer=timer, session=self, ssl=ssl if ssl is not None else True, server_hostname=server_hostname, proxy_headers=proxy_headers, traces=traces, trust_env=self.trust_env, ) async def _connect_and_send_request( req: ClientRequest, ) -> ClientResponse: # connection timeout assert self._connector is not None try: conn = await self._connector.connect( req, traces=traces, timeout=real_timeout ) except asyncio.TimeoutError as exc: raise ConnectionTimeoutError( f"Connection timeout to host {req.url}" ) from exc assert conn.protocol is not None conn.protocol.set_response_params( timer=timer, skip_payload=req.method in EMPTY_BODY_METHODS, read_until_eof=read_until_eof, auto_decompress=auto_decompress, read_timeout=real_timeout.sock_read, read_bufsize=read_bufsize, timeout_ceil_threshold=self._connector._timeout_ceil_threshold, max_line_size=max_line_size, max_field_size=max_field_size, max_headers=max_headers, ) try: resp = await req.send(conn) try: await resp.start(conn) except BaseException: resp.close() raise except BaseException: conn.close() raise return resp # Apply middleware (if any) - per-request middleware overrides session middleware effective_middlewares = ( self._middlewares if middlewares is None else middlewares ) if effective_middlewares: handler = build_client_middlewares( _connect_and_send_request, effective_middlewares ) else: handler = _connect_and_send_request try: resp = await handler(req) # Client connector errors should not be retried except ( ConnectionTimeoutError, ClientConnectorError, ClientConnectorCertificateError, ClientConnectorSSLError, ): raise except (ClientOSError, ServerDisconnectedError): if retry_persistent_connection: retry_persistent_connection = False continue raise except ClientError: raise except OSError as exc: if exc.errno is None and isinstance(exc, asyncio.TimeoutError): raise raise ClientOSError(*exc.args) from exc # Update cookies from raw headers to preserve duplicates if resp._raw_cookie_headers: self._cookie_jar.update_cookies_from_headers( resp._raw_cookie_headers, resp.url ) # redirects if resp.status in (301, 302, 303, 307, 308) and allow_redirects: for trace in traces: await trace.send_request_redirect( method, url.update_query(params), headers, resp ) redirects += 1 history.append(resp) if max_redirects and redirects >= max_redirects: if req._body is not None: await req._body.close() resp.close() raise TooManyRedirects( history[0].request_info, tuple(history) ) # For 301 and 302, mimic IE, now changed in RFC # https://github.com/kennethreitz/requests/pull/269 if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or ( resp.status in (301, 302) and resp.method == hdrs.METH_POST ): method = hdrs.METH_GET data = None if headers.get(hdrs.CONTENT_LENGTH): headers.pop(hdrs.CONTENT_LENGTH) else: # For 307/308, always preserve the request body # For 301/302 with non-POST methods, preserve the request body # https://www.rfc-editor.org/rfc/rfc9110#section-15.4.3-3.1 # Use the existing payload to avoid recreating it from # a potentially consumed file. # # If the payload is already consumed and cannot be replayed, # fail fast instead of silently sending an empty body. if req._body is not None and req._body.consumed: resp.close() raise ClientPayloadError( "Cannot follow redirect with a consumed request " "body. Use bytes, a seekable file-like object, " "or set allow_redirects=False." ) data = req._body r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get( hdrs.URI ) if r_url is None: # see github.com/aio-libs/aiohttp/issues/2022 break else: # reading from correct redirection # response is forbidden resp.release() try: parsed_redirect_url = URL( r_url, encoded=not self._requote_redirect_url ) except ValueError as e: if req._body is not None: await req._body.close() resp.close() raise InvalidUrlRedirectClientError( r_url, "Server attempted redirecting to a location that does not look like a URL", ) from e scheme = parsed_redirect_url.scheme if scheme not in HTTP_AND_EMPTY_SCHEMA_SET: if req._body is not None: await req._body.close() resp.close() raise NonHttpUrlRedirectClientError(r_url) elif not scheme: parsed_redirect_url = url.join(parsed_redirect_url) try: redirect_origin = parsed_redirect_url.origin() except ValueError as origin_val_err: if req._body is not None: await req._body.close() resp.close() raise InvalidUrlRedirectClientError( parsed_redirect_url, "Invalid redirect URL origin", ) from origin_val_err if url.origin() != redirect_origin: auth = None cookies = None headers.pop(hdrs.AUTHORIZATION, None) headers.pop(hdrs.COOKIE, None) headers.pop(hdrs.PROXY_AUTHORIZATION, None) url = parsed_redirect_url params = {} resp.release() continue break if req._body is not None: await req._body.close() # check response status if raise_for_status is None: raise_for_status = self._raise_for_status if raise_for_status is None: pass elif callable(raise_for_status): await raise_for_status(resp) elif raise_for_status: resp.raise_for_status() # register connection if handle is not None: if resp.connection is not None: resp.connection.add_callback(handle.cancel) else: handle.cancel() resp._history = tuple(history) for trace in traces: await trace.send_request_end( method, url.update_query(params), headers, resp ) return resp except BaseException as e: # cleanup timer tm.close() if handle: handle.cancel() handle = None for trace in traces: await trace.send_request_exception( method, url.update_query(params), headers, e ) raise if sys.version_info >= (3, 11) and TYPE_CHECKING: @overload def ws_connect( self, url: StrOrURL, *, decode_text: Literal[True] = ..., **kwargs: Unpack[_WSConnectOptions], ) -> "_BaseRequestContextManager[ClientWebSocketResponse[Literal[True]]]": ... @overload def ws_connect( self, url: StrOrURL, *, decode_text: Literal[False], **kwargs: Unpack[_WSConnectOptions], ) -> "_BaseRequestContextManager[ClientWebSocketResponse[Literal[False]]]": ... @overload def ws_connect( self, url: StrOrURL, *, decode_text: bool = ..., **kwargs: Unpack[_WSConnectOptions], ) -> "_BaseRequestContextManager[ClientWebSocketResponse[bool]]": ...
[docs] def ws_connect( self, url: StrOrURL, *, method: str = hdrs.METH_GET, protocols: Iterable[str] = (), timeout: ClientWSTimeout | _SENTINEL = sentinel, receive_timeout: float | None = None, autoclose: bool = True, autoping: bool = True, heartbeat: float | None = None, auth: BasicAuth | None = None, origin: str | None = None, params: Query = None, headers: LooseHeaders | None = None, proxy: StrOrURL | None = None, proxy_auth: BasicAuth | None = None, ssl: SSLContext | bool | Fingerprint = True, verify_ssl: bool | None = None, fingerprint: bytes | None = None, ssl_context: SSLContext | None = None, server_hostname: str | None = None, proxy_headers: LooseHeaders | None = None, compress: int = 0, max_msg_size: int = 4 * 1024 * 1024, decode_text: bool = True, ) -> "_BaseRequestContextManager[ClientWebSocketResponse[bool]]": """Initiate websocket connection.""" return _WSRequestContextManager( self._ws_connect( url, method=method, protocols=protocols, timeout=timeout, receive_timeout=receive_timeout, autoclose=autoclose, autoping=autoping, heartbeat=heartbeat, auth=auth, origin=origin, params=params, headers=headers, proxy=proxy, proxy_auth=proxy_auth, ssl=ssl, verify_ssl=verify_ssl, fingerprint=fingerprint, ssl_context=ssl_context, server_hostname=server_hostname, proxy_headers=proxy_headers, compress=compress, max_msg_size=max_msg_size, decode_text=decode_text, ) )
if sys.version_info >= (3, 11) and TYPE_CHECKING: @overload async def _ws_connect( self, url: StrOrURL, *, decode_text: Literal[True] = ..., **kwargs: Unpack[_WSConnectOptions], ) -> "ClientWebSocketResponse[Literal[True]]": ... @overload async def _ws_connect( self, url: StrOrURL, *, decode_text: Literal[False], **kwargs: Unpack[_WSConnectOptions], ) -> "ClientWebSocketResponse[Literal[False]]": ... @overload async def _ws_connect( self, url: StrOrURL, *, decode_text: bool = ..., **kwargs: Unpack[_WSConnectOptions], ) -> "ClientWebSocketResponse[bool]": ... async def _ws_connect( self, url: StrOrURL, *, method: str = hdrs.METH_GET, protocols: Iterable[str] = (), timeout: ClientWSTimeout | _SENTINEL = sentinel, receive_timeout: float | None = None, autoclose: bool = True, autoping: bool = True, heartbeat: float | None = None, auth: BasicAuth | None = None, origin: str | None = None, params: Query = None, headers: LooseHeaders | None = None, proxy: StrOrURL | None = None, proxy_auth: BasicAuth | None = None, ssl: SSLContext | bool | Fingerprint = True, verify_ssl: bool | None = None, fingerprint: bytes | None = None, ssl_context: SSLContext | None = None, server_hostname: str | None = None, proxy_headers: LooseHeaders | None = None, compress: int = 0, max_msg_size: int = 4 * 1024 * 1024, decode_text: bool = True, ) -> "ClientWebSocketResponse[bool]": if auth is not None: warnings.warn( "The 'auth' parameter is deprecated and will be removed in v4;" " pass headers={'Authorization': " "aiohttp.encode_basic_auth(login, password)} instead", DeprecationWarning, stacklevel=3, ) if proxy_auth is not None: warnings.warn( "The 'proxy_auth' parameter is deprecated and will be removed in v4;" " pass proxy_headers={'Proxy-Authorization': " "aiohttp.encode_basic_auth(login, password)} instead", DeprecationWarning, stacklevel=3, ) if timeout is not sentinel: if isinstance(timeout, ClientWSTimeout): ws_timeout = timeout else: warnings.warn( "parameter 'timeout' of type 'float' " "is deprecated, please use " "'timeout=ClientWSTimeout(ws_close=...)'", DeprecationWarning, stacklevel=2, ) ws_timeout = ClientWSTimeout(ws_close=timeout) else: ws_timeout = DEFAULT_WS_CLIENT_TIMEOUT if receive_timeout is not None: warnings.warn( "float parameter 'receive_timeout' " "is deprecated, please use parameter " "'timeout=ClientWSTimeout(ws_receive=...)'", DeprecationWarning, stacklevel=2, ) ws_timeout = attr.evolve(ws_timeout, ws_receive=receive_timeout) if headers is None: real_headers: CIMultiDict[str] = CIMultiDict() else: real_headers = CIMultiDict(headers) default_headers = { hdrs.UPGRADE: "websocket", hdrs.CONNECTION: "Upgrade", hdrs.SEC_WEBSOCKET_VERSION: "13", } for key, value in default_headers.items(): real_headers.setdefault(key, value) sec_key = base64.b64encode(os.urandom(16)) real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode() if protocols: real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols) if origin is not None: real_headers[hdrs.ORIGIN] = origin if compress: extstr = ws_ext_gen(compress=compress) real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr # For the sake of backward compatibility, if user passes in None, convert it to True if ssl is None: warnings.warn( "ssl=None is deprecated, please use ssl=True", DeprecationWarning, stacklevel=2, ) ssl = True ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) # send request resp = await self.request( method, url, params=params, headers=real_headers, read_until_eof=False, auth=auth, proxy=proxy, proxy_auth=proxy_auth, ssl=ssl, server_hostname=server_hostname, proxy_headers=proxy_headers, ) try: # check handshake if resp.status != 101: raise WSServerHandshakeError( resp.request_info, resp.history, message="Invalid response status", status=resp.status, headers=resp.headers, ) if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket": raise WSServerHandshakeError( resp.request_info, resp.history, message="Invalid upgrade header", status=resp.status, headers=resp.headers, ) if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade": raise WSServerHandshakeError( resp.request_info, resp.history, message="Invalid connection header", status=resp.status, headers=resp.headers, ) # key calculation r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "") match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode() if r_key != match: raise WSServerHandshakeError( resp.request_info, resp.history, message="Invalid challenge response", status=resp.status, headers=resp.headers, ) # websocket protocol protocol = None if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers: resp_protocols = [ proto.strip() for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",") ] for proto in resp_protocols: if proto in protocols: protocol = proto break # websocket compress notakeover = False if compress: compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS) if compress_hdrs: try: compress, notakeover = ws_ext_parse(compress_hdrs) except WSHandshakeError as exc: raise WSServerHandshakeError( resp.request_info, resp.history, message=exc.args[0], status=resp.status, headers=resp.headers, ) from exc else: compress = 0 notakeover = False conn = resp.connection assert conn is not None conn_proto = conn.protocol assert conn_proto is not None # For WS connection the read_timeout must be either receive_timeout or greater # None == no timeout, i.e. infinite timeout, so None is the max timeout possible if ws_timeout.ws_receive is None: # Reset regardless conn_proto.read_timeout = None elif conn_proto.read_timeout is not None: conn_proto.read_timeout = max( ws_timeout.ws_receive, conn_proto.read_timeout ) transport = conn.transport assert transport is not None reader = WebSocketDataQueue(conn_proto, DEFAULT_CHUNK_SIZE, loop=self._loop) writer = WebSocketWriter( conn_proto, transport, use_mask=True, compress=compress, notakeover=notakeover, ) except BaseException: resp.close() raise else: ws_resp = self._ws_response_class( reader, writer, protocol, resp, ws_timeout, autoclose, autoping, self._loop, heartbeat=heartbeat, compress=compress, client_notakeover=notakeover, ) parser = WebSocketReader(reader, max_msg_size, decode_text=decode_text) cb = None if heartbeat is None else ws_resp._on_data_received conn_proto.set_parser(parser, reader, data_received_cb=cb) return ws_resp def _prepare_headers(self, headers: LooseHeaders | None) -> "CIMultiDict[str]": """Add default headers and transform it to CIMultiDict""" # Convert headers to MultiDict result = CIMultiDict(self._default_headers) if headers: if not isinstance(headers, (MultiDictProxy, MultiDict)): headers = CIMultiDict(headers) added_names: set[str] = set() for key, value in headers.items(): if key in added_names: result.add(key, value) else: result[key] = value added_names.add(key) return result def _get_netrc_auth(self, host: str) -> BasicAuth | None: """ Get auth from netrc for the given host. This method is designed to be called in an executor to avoid blocking I/O in the event loop. """ netrc_obj = netrc_from_env() try: return basicauth_from_netrc(netrc_obj, host) except LookupError: return None if sys.version_info >= (3, 11) and TYPE_CHECKING: def get( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... def options( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... def head( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... def post( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... def put( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... def patch( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... def delete( self, url: StrOrURL, **kwargs: Unpack[_RequestOptions], ) -> "_RequestContextManager": ... else:
[docs] def get( self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP GET request.""" return _RequestContextManager( self._request( hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs ) )
[docs] def options( self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP OPTIONS request.""" return _RequestContextManager( self._request( hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs ) )
[docs] def head( self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP HEAD request.""" return _RequestContextManager( self._request( hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs ) )
[docs] def post( self, url: StrOrURL, *, data: Any = None, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP POST request.""" return _RequestContextManager( self._request(hdrs.METH_POST, url, data=data, **kwargs) )
[docs] def put( self, url: StrOrURL, *, data: Any = None, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP PUT request.""" return _RequestContextManager( self._request(hdrs.METH_PUT, url, data=data, **kwargs) )
[docs] def patch( self, url: StrOrURL, *, data: Any = None, **kwargs: Any ) -> "_RequestContextManager": """Perform HTTP PATCH request.""" return _RequestContextManager( self._request(hdrs.METH_PATCH, url, data=data, **kwargs) )
[docs] def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager": """Perform HTTP DELETE request.""" return _RequestContextManager( self._request(hdrs.METH_DELETE, url, **kwargs) )
[docs] async def close(self) -> None: """Close underlying connector. Release all acquired resources. """ if not self.closed: if self._connector is not None and self._connector_owner: await self._connector.close() self._connector = None
@property def closed(self) -> bool: """Is client session closed. A readonly property. """ return self._connector is None or self._connector.closed @property def connector(self) -> BaseConnector | None: """Connector instance used for the session.""" return self._connector @property def cookie_jar(self) -> AbstractCookieJar: """The session cookies.""" return self._cookie_jar @property def version(self) -> tuple[int, int]: """The session HTTP protocol version.""" return self._version @property def requote_redirect_url(self) -> bool: """Do URL requoting on redirection handling.""" return self._requote_redirect_url @requote_redirect_url.setter def requote_redirect_url(self, val: bool) -> None: """Do URL requoting on redirection handling.""" warnings.warn( "session.requote_redirect_url modification is deprecated #2778", DeprecationWarning, stacklevel=2, ) self._requote_redirect_url = val @property def loop(self) -> asyncio.AbstractEventLoop: """Session's loop.""" warnings.warn( "client.loop property is deprecated", DeprecationWarning, stacklevel=2 ) return self._loop @property def timeout(self) -> ClientTimeout: """Timeout for the session.""" return self._timeout @property def headers(self) -> "CIMultiDict[str]": """The default headers of the client session.""" return self._default_headers @property def skip_auto_headers(self) -> frozenset[istr]: """Headers for which autogeneration should be skipped""" return self._skip_auto_headers @property def auth(self) -> BasicAuth | None: """An object that represents HTTP Basic Authorization""" return self._default_auth @property def json_serialize(self) -> JSONEncoder: """Json serializer callable""" return self._json_serialize @property def connector_owner(self) -> bool: """Should connector be closed on session closing""" return self._connector_owner @property def raise_for_status( self, ) -> bool | Callable[[ClientResponse], Awaitable[None]]: """Should `ClientResponse.raise_for_status()` be called for each response.""" return self._raise_for_status @property def auto_decompress(self) -> bool: """Should the body response be automatically decompressed.""" return self._auto_decompress @property def trust_env(self) -> bool: """ Should proxies information from environment or netrc be trusted. Information is from HTTP_PROXY / HTTPS_PROXY environment variables or ~/.netrc file if present. """ return self._trust_env @property def trace_configs(self) -> list[TraceConfig]: """A list of TraceConfig instances used for client tracing""" return self._trace_configs
[docs] def detach(self) -> None: """Detach connector from session without closing the former. Session is switched to closed state anyway. """ self._connector = None
def __enter__(self) -> None: raise TypeError("Use async with instead") def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: # __exit__ should exist in pair with __enter__ but never executed pass # pragma: no cover async def __aenter__(self) -> "ClientSession": return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: await self.close()
class _BaseRequestContextManager( Coroutine[Any, Any, _RetType_co], Generic[_RetType_co] ): __slots__ = ("_coro", "_resp") def __init__(self, coro: Coroutine[asyncio.Future[Any], None, _RetType_co]) -> None: self._coro: Coroutine[asyncio.Future[Any], None, _RetType_co] = coro def send(self, arg: None) -> asyncio.Future[Any]: return self._coro.send(arg) def throw(self, *args: Any, **kwargs: Any) -> asyncio.Future[Any]: return self._coro.throw(*args, **kwargs) def close(self) -> None: return self._coro.close() def __await__(self) -> Generator[Any, None, _RetType_co]: ret = self._coro.__await__() return ret def __iter__(self) -> Generator[Any, None, _RetType_co]: return self.__await__() async def __aenter__(self) -> _RetType_co: self._resp: _RetType_co = await self._coro return await self._resp.__aenter__() # type: ignore[return-value] async def __aexit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None, ) -> None: await self._resp.__aexit__(exc_type, exc, tb) _RequestContextManager = _BaseRequestContextManager[ClientResponse] _WSRequestContextManager = _BaseRequestContextManager[ClientWebSocketResponse[bool]] class _SessionRequestContextManager: __slots__ = ("_coro", "_resp", "_session") def __init__( self, coro: Coroutine[asyncio.Future[Any], None, ClientResponse], session: ClientSession, ) -> None: self._coro = coro self._resp: ClientResponse | None = None self._session = session async def __aenter__(self) -> ClientResponse: try: self._resp = await self._coro except BaseException: await self._session.close() raise else: return self._resp async def __aexit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None, ) -> None: assert self._resp is not None self._resp.close() await self._session.close() if sys.version_info >= (3, 11) and TYPE_CHECKING: def request( method: str, url: StrOrURL, *, version: HttpVersion = http.HttpVersion11, connector: BaseConnector | None = None, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Unpack[_RequestOptions], ) -> _SessionRequestContextManager: ... else:
[docs] def request( method: str, url: StrOrURL, *, version: HttpVersion = http.HttpVersion11, connector: BaseConnector | None = None, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any, ) -> _SessionRequestContextManager: """Constructs and sends a request. Returns response object. method - HTTP method url - request url params - (optional) Dictionary or bytes to be sent in the query string of the new request data - (optional) Dictionary, bytes, or file-like object to send in the body of the request json - (optional) Any json compatible python object headers - (optional) Dictionary of HTTP Headers to send with the request cookies - (optional) Dict object to send with the request auth - (optional) BasicAuth named tuple represent HTTP Basic Auth auth - aiohttp.helpers.BasicAuth allow_redirects - (optional) If set to False, do not follow redirects version - Request HTTP version. compress - Set to True if request has to be compressed with deflate encoding. chunked - Set to chunk size for chunked transfer encoding. expect100 - Expect 100-continue response from server. connector - BaseConnector sub-class instance to support connection pooling. read_until_eof - Read response until eof if response does not have Content-Length header. loop - Optional event loop. timeout - Optional ClientTimeout settings structure, 5min total timeout by default. Usage:: >>> import aiohttp >>> async with aiohttp.request('GET', 'http://python.org/') as resp: ... print(resp) ... data = await resp.read() <ClientResponse(https://www.python.org/) [200 OK]> """ connector_owner = False if connector is None: connector_owner = True connector = TCPConnector(loop=loop, force_close=True) session = ClientSession( loop=loop, cookies=kwargs.pop("cookies", None), version=version, timeout=kwargs.pop("timeout", sentinel), connector=connector, connector_owner=connector_owner, ) return _SessionRequestContextManager( session._request(method, url, **kwargs), session, )