import abc
import asyncio
import base64
import functools
import hashlib
import html
import inspect
import keyword
import os
import platform
import re
import sys
import warnings
from collections.abc import (
Awaitable,
Callable,
Container,
Generator,
Iterable,
Iterator,
Mapping,
Sized,
)
from functools import wraps
from pathlib import Path
from re import Pattern
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Final, NoReturn, Optional, TypedDict, cast
from yarl import URL, __version__ as yarl_version
from . import hdrs
from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
from .helpers import DEBUG, DEFAULT_CHUNK_SIZE
from .http import HttpVersion11
from .typedefs import Handler, PathLike
from .web_exceptions import (
HTTPException,
HTTPExpectationFailed,
HTTPForbidden,
HTTPMethodNotAllowed,
HTTPNotFound,
)
from .web_fileresponse import FileResponse
from .web_request import Request
from .web_response import Response, StreamResponse
from .web_routedef import AbstractRouteDef
__all__ = (
"UrlDispatcher",
"UrlMappingMatchInfo",
"AbstractResource",
"Resource",
"PlainResource",
"DynamicResource",
"AbstractRoute",
"ResourceRoute",
"StaticResource",
"View",
)
if TYPE_CHECKING:
from .web_app import Application
BaseDict = dict[str, str]
else:
BaseDict = dict
CIRCULAR_SYMLINK_ERROR = (RuntimeError,) if sys.version_info < (3, 13) else ()
YARL_VERSION: Final[tuple[int, ...]] = tuple(map(int, yarl_version.split(".")[:2]))
HTTP_METHOD_RE: Final[Pattern[str]] = re.compile(
r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$"
)
ROUTE_RE: Final[Pattern[str]] = re.compile(
r"(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})"
)
PATH_SEP: Final[str] = re.escape("/")
IS_WINDOWS: Final[bool] = platform.system() == "Windows"
_ExpectHandler = Callable[[Request], Awaitable[StreamResponse | None]]
_Resolve = tuple[Optional["UrlMappingMatchInfo"], set[str]]
html_escape = functools.partial(html.escape, quote=True)
class _InfoDict(TypedDict, total=False):
path: str
formatter: str
pattern: Pattern[str]
directory: Path
prefix: str
routes: Mapping[str, "AbstractRoute"]
app: "Application"
domain: str
rule: "AbstractRuleMatching"
http_exception: HTTPException
[docs]
class AbstractResource(Sized, Iterable["AbstractRoute"]):
def __init__(self, *, name: str | None = None) -> None:
self._name = name
@property
def name(self) -> str | None:
return self._name
@property
@abc.abstractmethod
def canonical(self) -> str:
"""Exposes the resource's canonical path.
For example '/foo/bar/{name}'
"""
[docs]
@abc.abstractmethod # pragma: no branch
def url_for(self, **kwargs: str) -> URL:
"""Construct url for resource with additional params."""
[docs]
@abc.abstractmethod # pragma: no branch
async def resolve(self, request: Request) -> _Resolve:
"""Resolve resource.
Return (UrlMappingMatchInfo, allowed_methods) pair.
"""
@abc.abstractmethod
def add_prefix(self, prefix: str) -> None:
"""Add a prefix to processed URLs.
Required for subapplications support.
"""
[docs]
@abc.abstractmethod
def get_info(self) -> _InfoDict:
"""Return a dict with additional info useful for introspection"""
def freeze(self) -> None:
pass
@abc.abstractmethod
def raw_match(self, path: str) -> bool:
"""Perform a raw match against path"""
[docs]
class AbstractRoute(abc.ABC):
def __init__(
self,
method: str,
handler: Handler | type[AbstractView],
*,
expect_handler: _ExpectHandler | None = None,
resource: AbstractResource | None = None,
) -> None:
if expect_handler is None:
expect_handler = _default_expect_handler
assert inspect.iscoroutinefunction(expect_handler) or (
sys.version_info < (3, 14) and asyncio.iscoroutinefunction(expect_handler)
), f"Coroutine is expected, got {expect_handler!r}"
method = method.upper()
if not HTTP_METHOD_RE.match(method):
raise ValueError(f"{method} is not allowed HTTP method")
assert callable(handler), handler
if inspect.iscoroutinefunction(handler) or (
sys.version_info < (3, 14) and asyncio.iscoroutinefunction(handler)
):
pass
elif inspect.isgeneratorfunction(handler):
if TYPE_CHECKING:
assert False
warnings.warn(
"Bare generators are deprecated, use @coroutine wrapper",
DeprecationWarning,
)
elif isinstance(handler, type) and issubclass(handler, AbstractView):
pass
else:
warnings.warn(
"Bare functions are deprecated, use async ones", DeprecationWarning
)
@wraps(handler)
async def handler_wrapper(request: Request) -> StreamResponse:
result = old_handler(request) # type: ignore[call-arg]
if asyncio.iscoroutine(result):
result = await result
assert isinstance(result, StreamResponse)
return result
old_handler = handler
handler = handler_wrapper
self._method = method
self._handler = handler
self._expect_handler = expect_handler
self._resource = resource
@property
def method(self) -> str:
return self._method
@property
def handler(self) -> Handler:
return self._handler
@property
@abc.abstractmethod
def name(self) -> str | None:
"""Optional route's name, always equals to resource's name."""
@property
def resource(self) -> AbstractResource | None:
return self._resource
@abc.abstractmethod
def get_info(self) -> _InfoDict:
"""Return a dict with additional info useful for introspection"""
[docs]
@abc.abstractmethod # pragma: no branch
def url_for(self, *args: str, **kwargs: str) -> URL:
"""Construct url for route with additional params."""
[docs]
async def handle_expect_header(self, request: Request) -> StreamResponse | None:
return await self._expect_handler(request)
[docs]
class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
__slots__ = ("_route", "_apps", "_current_app", "_frozen")
def __init__(self, match_dict: dict[str, str], route: AbstractRoute) -> None:
super().__init__(match_dict)
self._route = route
self._apps: list[Application] = []
self._current_app: Application | None = None
self._frozen = False
@property
def handler(self) -> Handler:
return self._route.handler
@property
def route(self) -> AbstractRoute:
return self._route
@property
def expect_handler(self) -> _ExpectHandler:
return self._route.handle_expect_header
@property
def http_exception(self) -> HTTPException | None:
return None
def get_info(self) -> _InfoDict: # type: ignore[override]
return self._route.get_info()
@property
def apps(self) -> tuple["Application", ...]:
return tuple(self._apps)
def add_app(self, app: "Application") -> None:
if self._frozen:
raise RuntimeError("Cannot change apps stack after .freeze() call")
if self._current_app is None:
self._current_app = app
self._apps.insert(0, app)
@property
def current_app(self) -> "Application":
app = self._current_app
assert app is not None
return app
@current_app.setter
def current_app(self, app: "Application") -> None:
if DEBUG: # pragma: no cover
if app not in self._apps:
raise RuntimeError(
f"Expected one of the following apps {self._apps!r}, got {app!r}"
)
self._current_app = app
def freeze(self) -> None:
self._frozen = True
def __repr__(self) -> str:
return f"<MatchInfo {super().__repr__()}: {self._route}>"
class MatchInfoError(UrlMappingMatchInfo):
__slots__ = ("_exception",)
def __init__(self, http_exception: HTTPException) -> None:
self._exception = http_exception
super().__init__({}, SystemRoute(self._exception))
@property
def http_exception(self) -> HTTPException:
return self._exception
def __repr__(self) -> str:
return f"<MatchInfoError {self._exception.status}: {self._exception.reason}>"
async def _default_expect_handler(request: Request) -> None:
"""Default handler for Expect header.
Just send "100 Continue" to client.
raise HTTPExpectationFailed if value of header is not "100-continue"
"""
expect = request.headers.get(hdrs.EXPECT, "")
if request.version == HttpVersion11:
if expect.lower() == "100-continue":
await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
# Reset output_size as we haven't started the main body yet.
request.writer.output_size = 0
else:
raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
[docs]
class Resource(AbstractResource):
def __init__(self, *, name: str | None = None) -> None:
super().__init__(name=name)
self._routes: dict[str, ResourceRoute] = {}
self._any_route: ResourceRoute | None = None
self._allowed_methods: set[str] = set()
[docs]
def add_route(
self,
method: str,
handler: type[AbstractView] | Handler,
*,
expect_handler: _ExpectHandler | None = None,
) -> "ResourceRoute":
if route := self._routes.get(method, self._any_route):
raise RuntimeError(
"Added route will never be executed, "
f"method {route.method} is already "
"registered"
)
route_obj = ResourceRoute(method, handler, self, expect_handler=expect_handler)
self.register_route(route_obj)
return route_obj
def register_route(self, route: "ResourceRoute") -> None:
assert isinstance(
route, ResourceRoute
), f"Instance of Route class is required, got {route!r}"
if route.method == hdrs.METH_ANY:
self._any_route = route
self._allowed_methods.add(route.method)
self._routes[route.method] = route
async def resolve(self, request: Request) -> _Resolve:
if (match_dict := self._match(request.rel_url.path_safe)) is None:
return None, set()
if route := self._routes.get(request.method, self._any_route):
return UrlMappingMatchInfo(match_dict, route), self._allowed_methods
return None, self._allowed_methods
@abc.abstractmethod
def _match(self, path: str) -> dict[str, str] | None:
pass # pragma: no cover
def __len__(self) -> int:
return len(self._routes)
def __iter__(self) -> Iterator["ResourceRoute"]:
return iter(self._routes.values())
# TODO: implement all abstract methods
[docs]
class PlainResource(Resource):
def __init__(self, path: str, *, name: str | None = None) -> None:
super().__init__(name=name)
assert not path or path.startswith("/")
self._path = path
@property
def canonical(self) -> str:
return self._path
def freeze(self) -> None:
if not self._path:
self._path = "/"
def add_prefix(self, prefix: str) -> None:
assert prefix.startswith("/")
assert not prefix.endswith("/")
assert len(prefix) > 1
self._path = prefix + self._path
def _match(self, path: str) -> dict[str, str] | None:
# string comparison is about 10 times faster than regexp matching
if self._path == path:
return {}
return None
def raw_match(self, path: str) -> bool:
return self._path == path
def get_info(self) -> _InfoDict:
return {"path": self._path}
[docs]
def url_for(self) -> URL: # type: ignore[override]
return URL.build(path=self._path, encoded=True)
def __repr__(self) -> str:
name = "'" + self.name + "' " if self.name is not None else ""
return f"<PlainResource {name} {self._path}>"
[docs]
class DynamicResource(Resource):
DYN = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}")
DYN_WITH_RE = re.compile(r"\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}")
GOOD = r"[^{}/]+"
def __init__(self, path: str, *, name: str | None = None) -> None:
super().__init__(name=name)
self._orig_path = path
pattern = ""
formatter = ""
for part in ROUTE_RE.split(path):
match = self.DYN.fullmatch(part)
if match:
pattern += "(?P<{}>{})".format(match.group("var"), self.GOOD)
formatter += "{" + match.group("var") + "}"
continue
match = self.DYN_WITH_RE.fullmatch(part)
if match:
pattern += "(?P<{var}>{re})".format(**match.groupdict())
formatter += "{" + match.group("var") + "}"
continue
if "{" in part or "}" in part:
raise ValueError(f"Invalid path '{path}'['{part}']")
part = _requote_path(part)
formatter += part
pattern += re.escape(part)
try:
compiled = re.compile(pattern)
except re.error as exc:
raise ValueError(f"Bad pattern '{pattern}': {exc}") from None
assert compiled.pattern.startswith(PATH_SEP)
assert formatter.startswith("/")
self._pattern = compiled
self._formatter = formatter
@property
def canonical(self) -> str:
return self._formatter
def add_prefix(self, prefix: str) -> None:
assert prefix.startswith("/")
assert not prefix.endswith("/")
assert len(prefix) > 1
self._pattern = re.compile(re.escape(prefix) + self._pattern.pattern)
self._formatter = prefix + self._formatter
def _match(self, path: str) -> dict[str, str] | None:
match = self._pattern.fullmatch(path)
if match is None:
return None
return {
key: _unquote_path_safe(value) for key, value in match.groupdict().items()
}
def raw_match(self, path: str) -> bool:
return self._orig_path == path
def get_info(self) -> _InfoDict:
return {"formatter": self._formatter, "pattern": self._pattern}
[docs]
def url_for(self, **parts: str) -> URL:
url = self._formatter.format_map({k: _quote_path(v) for k, v in parts.items()})
return URL.build(path=url, encoded=True)
def __repr__(self) -> str:
name = "'" + self.name + "' " if self.name is not None else ""
return f"<DynamicResource {name} {self._formatter}>"
class PrefixResource(AbstractResource):
def __init__(self, prefix: str, *, name: str | None = None) -> None:
assert not prefix or prefix.startswith("/"), prefix
assert prefix in ("", "/") or not prefix.endswith("/"), prefix
super().__init__(name=name)
self._prefix = _requote_path(prefix)
self._prefix2 = self._prefix + "/"
@property
def canonical(self) -> str:
return self._prefix
def add_prefix(self, prefix: str) -> None:
assert prefix.startswith("/")
assert not prefix.endswith("/")
assert len(prefix) > 1
self._prefix = prefix + self._prefix
self._prefix2 = self._prefix + "/"
def raw_match(self, prefix: str) -> bool:
return False
# TODO: impl missing abstract methods
[docs]
class StaticResource(PrefixResource):
VERSION_KEY = "v"
def __init__(
self,
prefix: str,
directory: PathLike,
*,
name: str | None = None,
expect_handler: _ExpectHandler | None = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
show_index: bool = False,
follow_symlinks: bool = False,
append_version: bool = False,
) -> None:
super().__init__(prefix, name=name)
try:
directory = Path(directory).expanduser().resolve(strict=True)
except FileNotFoundError as error:
raise ValueError(f"'{directory}' does not exist") from error
if not directory.is_dir():
raise ValueError(f"'{directory}' is not a directory")
self._directory = directory
self._show_index = show_index
self._chunk_size = chunk_size
self._follow_symlinks = follow_symlinks
self._expect_handler = expect_handler
self._append_version = append_version
self._routes = {
"GET": ResourceRoute(
"GET", self._handle, self, expect_handler=expect_handler
),
"HEAD": ResourceRoute(
"HEAD", self._handle, self, expect_handler=expect_handler
),
}
self._allowed_methods = set(self._routes)
[docs]
def url_for( # type: ignore[override]
self,
*,
filename: PathLike,
append_version: bool | None = None,
) -> URL:
if append_version is None:
append_version = self._append_version
filename = str(filename).lstrip("/")
url = URL.build(path=self._prefix, encoded=True)
# filename is not encoded
if YARL_VERSION < (1, 6):
url = url / filename.replace("%", "%25")
else:
url = url / filename
if append_version:
unresolved_path = self._directory.joinpath(filename)
try:
if self._follow_symlinks:
normalized_path = Path(os.path.normpath(unresolved_path))
normalized_path.relative_to(self._directory)
filepath = normalized_path.resolve()
else:
filepath = unresolved_path.resolve()
filepath.relative_to(self._directory)
except (ValueError, FileNotFoundError):
# ValueError for case when path point to symlink
# with follow_symlinks is False
return url # relatively safe
if filepath.is_file():
# TODO cache file content
# with file watcher for cache invalidation
with filepath.open("rb") as f:
file_bytes = f.read()
h = self._get_file_hash(file_bytes)
url = url.with_query({self.VERSION_KEY: h})
return url
return url
@staticmethod
def _get_file_hash(byte_array: bytes) -> str:
m = hashlib.sha256() # todo sha256 can be configurable param
m.update(byte_array)
b64 = base64.urlsafe_b64encode(m.digest())
return b64.decode("ascii")
def get_info(self) -> _InfoDict:
return {
"directory": self._directory,
"prefix": self._prefix,
"routes": self._routes,
}
def set_options_route(self, handler: Handler) -> None:
if "OPTIONS" in self._routes:
raise RuntimeError("OPTIONS route was set already")
self._routes["OPTIONS"] = ResourceRoute(
"OPTIONS", handler, self, expect_handler=self._expect_handler
)
self._allowed_methods.add("OPTIONS")
async def resolve(self, request: Request) -> _Resolve:
path = request.rel_url.path_safe
method = request.method
# We normalise here to avoid matches that traverse below the static root.
# e.g. /static/../../../../home/user/webapp/static/
norm_path = os.path.normpath(path)
if IS_WINDOWS:
norm_path = norm_path.replace("\\", "/")
if not norm_path.startswith(self._prefix2) and norm_path != self._prefix:
return None, set()
allowed_methods = self._allowed_methods
if method not in allowed_methods:
return None, allowed_methods
match_dict = {"filename": _unquote_path_safe(path[len(self._prefix) + 1 :])}
return (UrlMappingMatchInfo(match_dict, self._routes[method]), allowed_methods)
def __len__(self) -> int:
return len(self._routes)
def __iter__(self) -> Iterator[AbstractRoute]:
return iter(self._routes.values())
async def _handle(self, request: Request) -> StreamResponse:
filename = request.match_info["filename"]
if Path(filename).is_absolute():
# filename is an absolute path e.g. //network/share or D:\path
# which could be a UNC path leading to NTLM credential theft
raise HTTPNotFound()
unresolved_path = self._directory.joinpath(filename)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None, self._resolve_path_to_response, unresolved_path
)
def _resolve_path_to_response(self, unresolved_path: Path) -> StreamResponse:
"""Take the unresolved path and query the file system to form a response."""
# Check for access outside the root directory. For follow symlinks, URI
# cannot traverse out, but symlinks can. Otherwise, no access outside
# root is permitted.
try:
if self._follow_symlinks:
normalized_path = Path(os.path.normpath(unresolved_path))
normalized_path.relative_to(self._directory)
file_path = normalized_path.resolve()
else:
file_path = unresolved_path.resolve()
file_path.relative_to(self._directory)
except (ValueError, *CIRCULAR_SYMLINK_ERROR) as error:
# ValueError is raised for the relative check. Circular symlinks
# raise here on resolving for python < 3.13.
raise HTTPNotFound() from error
# if path is a directory, return the contents if permitted. Note the
# directory check will raise if a segment is not readable.
try:
if file_path.is_dir():
if self._show_index:
return Response(
text=self._directory_as_html(file_path),
content_type="text/html",
)
else:
raise HTTPForbidden()
except PermissionError as error:
raise HTTPForbidden() from error
# Return the file response, which handles all other checks.
return FileResponse(file_path, chunk_size=self._chunk_size)
def _directory_as_html(self, dir_path: Path) -> str:
"""returns directory's index as html."""
assert dir_path.is_dir()
relative_path_to_dir = dir_path.relative_to(self._directory).as_posix()
index_of = f"Index of /{html_escape(relative_path_to_dir)}"
h1 = f"<h1>{index_of}</h1>"
index_list = []
dir_index = dir_path.iterdir()
for _file in sorted(dir_index):
# show file url as relative to static path
rel_path = _file.relative_to(self._directory).as_posix()
quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}")
# if file is a directory, add '/' to the end of the name
if _file.is_dir():
file_name = f"{_file.name}/"
else:
file_name = _file.name
index_list.append(
f'<li><a href="{quoted_file_url}">{html_escape(file_name)}</a></li>'
)
ul = "<ul>\n{}\n</ul>".format("\n".join(index_list))
body = f"<body>\n{h1}\n{ul}\n</body>"
head_str = f"<head>\n<title>{index_of}</title>\n</head>"
html = f"<html>\n{head_str}\n{body}\n</html>"
return html
def __repr__(self) -> str:
name = "'" + self.name + "'" if self.name is not None else ""
return f"<StaticResource {name} {self._prefix} -> {self._directory!r}>"
[docs]
class PrefixedSubAppResource(PrefixResource):
def __init__(self, prefix: str, app: "Application") -> None:
super().__init__(prefix)
self._app = app
self._add_prefix_to_resources(prefix)
def add_prefix(self, prefix: str) -> None:
super().add_prefix(prefix)
self._add_prefix_to_resources(prefix)
def _add_prefix_to_resources(self, prefix: str) -> None:
router = self._app.router
for resource in router.resources():
# Since the canonical path of a resource is about
# to change, we need to unindex it and then reindex
router.unindex_resource(resource)
resource.add_prefix(prefix)
router.index_resource(resource)
[docs]
def url_for(self, *args: str, **kwargs: str) -> URL:
raise RuntimeError(".url_for() is not supported by sub-application root")
def get_info(self) -> _InfoDict:
return {"app": self._app, "prefix": self._prefix}
async def resolve(self, request: Request) -> _Resolve:
match_info = await self._app.router.resolve(request)
match_info.add_app(self._app)
if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
methods = match_info.http_exception.allowed_methods
else:
methods = set()
return match_info, methods
def __len__(self) -> int:
return len(self._app.router.routes())
def __iter__(self) -> Iterator[AbstractRoute]:
return iter(self._app.router.routes())
def __repr__(self) -> str:
return f"<PrefixedSubAppResource {self._prefix} -> {self._app!r}>"
class AbstractRuleMatching(abc.ABC):
@abc.abstractmethod # pragma: no branch
async def match(self, request: Request) -> bool:
"""Return bool if the request satisfies the criteria"""
@abc.abstractmethod # pragma: no branch
def get_info(self) -> _InfoDict:
"""Return a dict with additional info useful for introspection"""
@property
@abc.abstractmethod # pragma: no branch
def canonical(self) -> str:
"""Return a str"""
class Domain(AbstractRuleMatching):
re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")
def __init__(self, domain: str) -> None:
super().__init__()
self._domain = self.validation(domain)
@property
def canonical(self) -> str:
return self._domain
def validation(self, domain: str) -> str:
if not isinstance(domain, str):
raise TypeError("Domain must be str")
domain = domain.rstrip(".").lower()
if not domain:
raise ValueError("Domain cannot be empty")
elif "://" in domain:
raise ValueError("Scheme not supported")
url = URL("http://" + domain)
assert url.raw_host is not None
if not all(self.re_part.fullmatch(x) for x in url.raw_host.split(".")):
raise ValueError("Domain not valid")
if url.port == 80:
return url.raw_host
return f"{url.raw_host}:{url.port}"
async def match(self, request: Request) -> bool:
host = request.headers.get(hdrs.HOST)
if not host:
return False
return self.match_domain(host)
def match_domain(self, host: str) -> bool:
return host.lower() == self._domain
def get_info(self) -> _InfoDict:
return {"domain": self._domain}
class MaskDomain(Domain):
re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")
def __init__(self, domain: str) -> None:
super().__init__(domain)
mask = self._domain.replace(".", r"\.").replace("*", ".*")
self._mask = re.compile(mask)
@property
def canonical(self) -> str:
return self._mask.pattern
def match_domain(self, host: str) -> bool:
return self._mask.fullmatch(host) is not None
class MatchedSubAppResource(PrefixedSubAppResource):
def __init__(self, rule: AbstractRuleMatching, app: "Application") -> None:
AbstractResource.__init__(self)
self._prefix = ""
self._app = app
self._rule = rule
@property
def canonical(self) -> str:
return self._rule.canonical
def get_info(self) -> _InfoDict:
return {"app": self._app, "rule": self._rule}
async def resolve(self, request: Request) -> _Resolve:
if not await self._rule.match(request):
return None, set()
match_info = await self._app.router.resolve(request)
match_info.add_app(self._app)
if isinstance(match_info.http_exception, HTTPMethodNotAllowed):
methods = match_info.http_exception.allowed_methods
else:
methods = set()
return match_info, methods
def __repr__(self) -> str:
return f"<MatchedSubAppResource -> {self._app!r}>"
[docs]
class ResourceRoute(AbstractRoute):
"""A route with resource"""
def __init__(
self,
method: str,
handler: Handler | type[AbstractView],
resource: AbstractResource,
*,
expect_handler: _ExpectHandler | None = None,
) -> None:
super().__init__(
method, handler, expect_handler=expect_handler, resource=resource
)
def __repr__(self) -> str:
return f"<ResourceRoute [{self.method}] {self._resource} -> {self.handler!r}"
@property
def name(self) -> str | None:
if self._resource is None:
return None
return self._resource.name
def url_for(self, *args: str, **kwargs: str) -> URL:
"""Construct url for route with additional params."""
assert self._resource is not None
return self._resource.url_for(*args, **kwargs)
def get_info(self) -> _InfoDict:
assert self._resource is not None
return self._resource.get_info()
class SystemRoute(AbstractRoute):
def __init__(self, http_exception: HTTPException) -> None:
super().__init__(hdrs.METH_ANY, self._handle)
self._http_exception = http_exception
def url_for(self, *args: str, **kwargs: str) -> URL:
raise RuntimeError(".url_for() is not allowed for SystemRoute")
@property
def name(self) -> str | None:
return None
def get_info(self) -> _InfoDict:
return {"http_exception": self._http_exception}
async def _handle(self, request: Request) -> StreamResponse:
raise self._http_exception
@property
def status(self) -> int:
return self._http_exception.status
@property
def reason(self) -> str:
return self._http_exception.reason
def __repr__(self) -> str:
return f"<SystemRoute {self.status}: {self.reason}>"
[docs]
class View(AbstractView):
async def _iter(self) -> StreamResponse:
if self.request.method not in hdrs.METH_ALL:
self._raise_allowed_methods()
method: Callable[[], Awaitable[StreamResponse]] | None
method = getattr(self, self.request.method.lower(), None)
if method is None:
self._raise_allowed_methods()
ret = await method()
assert isinstance(ret, StreamResponse)
return ret
def __await__(self) -> Generator[None, None, StreamResponse]:
return self._iter().__await__()
def _raise_allowed_methods(self) -> NoReturn:
allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())}
raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
class ResourcesView(Sized, Iterable[AbstractResource], Container[AbstractResource]):
def __init__(self, resources: list[AbstractResource]) -> None:
self._resources = resources
def __len__(self) -> int:
return len(self._resources)
def __iter__(self) -> Iterator[AbstractResource]:
yield from self._resources
def __contains__(self, resource: object) -> bool:
return resource in self._resources
class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
def __init__(self, resources: list[AbstractResource]):
self._routes: list[AbstractRoute] = []
for resource in resources:
for route in resource:
self._routes.append(route)
def __len__(self) -> int:
return len(self._routes)
def __iter__(self) -> Iterator[AbstractRoute]:
yield from self._routes
def __contains__(self, route: object) -> bool:
return route in self._routes
[docs]
class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
NAME_SPLIT_RE = re.compile(r"[.:-]")
def __init__(self) -> None:
super().__init__()
self._resources: list[AbstractResource] = []
self._named_resources: dict[str, AbstractResource] = {}
self._resource_index: dict[str, list[AbstractResource]] = {}
self._matched_sub_app_resources: list[MatchedSubAppResource] = []
[docs]
async def resolve(self, request: Request) -> UrlMappingMatchInfo:
resource_index = self._resource_index
allowed_methods: set[str] = set()
# MatchedSubAppResource is primarily used to match on domain names
# (though custom rules could match on other things). This means that
# the traversal algorithm below can't be applied, and that we likely
# need to check these first so a sub app that defines the same path
# as a parent app will get priority if there's a domain match.
#
# For most cases we do not expect there to be many of these since
# currently they are only added by `.add_domain()`.
for resource in self._matched_sub_app_resources:
match_dict, allowed = await resource.resolve(request)
if match_dict is not None:
return match_dict
else:
allowed_methods |= allowed
# Walk the url parts looking for candidates. We walk the url backwards
# to ensure the most explicit match is found first. If there are multiple
# candidates for a given url part because there are multiple resources
# registered for the same canonical path, we resolve them in a linear
# fashion to ensure registration order is respected.
url_part = request.rel_url.path_safe
while url_part:
for candidate in resource_index.get(url_part, ()):
match_dict, allowed = await candidate.resolve(request)
if match_dict is not None:
return match_dict
else:
allowed_methods |= allowed
if url_part == "/":
break
url_part = url_part.rpartition("/")[0] or "/"
if allowed_methods:
return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))
return MatchInfoError(HTTPNotFound())
def __iter__(self) -> Iterator[str]:
return iter(self._named_resources)
def __len__(self) -> int:
return len(self._named_resources)
def __contains__(self, resource: object) -> bool:
return resource in self._named_resources
def __getitem__(self, name: str) -> AbstractResource:
return self._named_resources[name]
[docs]
def resources(self) -> ResourcesView:
return ResourcesView(self._resources)
[docs]
def routes(self) -> RoutesView:
return RoutesView(self._resources)
[docs]
def named_resources(self) -> Mapping[str, AbstractResource]:
return MappingProxyType(self._named_resources)
def register_resource(self, resource: AbstractResource) -> None:
assert isinstance(
resource, AbstractResource
), f"Instance of AbstractResource class is required, got {resource!r}"
if self.frozen:
raise RuntimeError("Cannot register a resource into frozen router.")
name = resource.name
if name is not None:
parts = self.NAME_SPLIT_RE.split(name)
for part in parts:
if keyword.iskeyword(part):
raise ValueError(
f"Incorrect route name {name!r}, "
"python keywords cannot be used "
"for route name"
)
if not part.isidentifier():
raise ValueError(
f"Incorrect route name {name!r}, "
"the name should be a sequence of "
"python identifiers separated "
"by dash, dot or column"
)
if name in self._named_resources:
raise ValueError(
f"Duplicate {name!r}, "
f"already handled by {self._named_resources[name]!r}"
)
self._named_resources[name] = resource
self._resources.append(resource)
if isinstance(resource, MatchedSubAppResource):
# We cannot index match sub-app resources because they have match rules
self._matched_sub_app_resources.append(resource)
else:
self.index_resource(resource)
def _get_resource_index_key(self, resource: AbstractResource) -> str:
"""Return a key to index the resource in the resource index."""
if "{" in (index_key := resource.canonical):
# strip at the first { to allow for variables, and than
# rpartition at / to allow for variable parts in the path
# For example if the canonical path is `/core/locations{tail:.*}`
# the index key will be `/core` since index is based on the
# url parts split by `/`
index_key = index_key.partition("{")[0].rpartition("/")[0]
return index_key.rstrip("/") or "/"
def index_resource(self, resource: AbstractResource) -> None:
"""Add a resource to the resource index."""
resource_key = self._get_resource_index_key(resource)
# There may be multiple resources for a canonical path
# so we keep them in a list to ensure that registration
# order is respected.
self._resource_index.setdefault(resource_key, []).append(resource)
def unindex_resource(self, resource: AbstractResource) -> None:
"""Remove a resource from the resource index."""
resource_key = self._get_resource_index_key(resource)
self._resource_index[resource_key].remove(resource)
[docs]
def add_resource(self, path: str, *, name: str | None = None) -> Resource:
if path and not path.startswith("/"):
raise ValueError("path should be started with / or be empty")
# Reuse last added resource if path and name are the same
if self._resources:
resource = self._resources[-1]
if resource.name == name and resource.raw_match(path):
return cast(Resource, resource)
if not ("{" in path or "}" in path or ROUTE_RE.search(path)):
resource = PlainResource(path, name=name)
self.register_resource(resource)
return resource
resource = DynamicResource(path, name=name)
self.register_resource(resource)
return resource
[docs]
def add_route(
self,
method: str,
path: str,
handler: Handler | type[AbstractView],
*,
name: str | None = None,
expect_handler: _ExpectHandler | None = None,
) -> AbstractRoute:
resource = self.add_resource(path, name=name)
return resource.add_route(method, handler, expect_handler=expect_handler)
[docs]
def add_static(
self,
prefix: str,
path: PathLike,
*,
name: str | None = None,
expect_handler: _ExpectHandler | None = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
show_index: bool = False,
follow_symlinks: bool = False,
append_version: bool = False,
) -> AbstractResource:
"""Add static files view.
prefix - url prefix
path - folder with files
"""
assert prefix.startswith("/")
if prefix.endswith("/"):
prefix = prefix[:-1]
resource = StaticResource(
prefix,
path,
name=name,
expect_handler=expect_handler,
chunk_size=chunk_size,
show_index=show_index,
follow_symlinks=follow_symlinks,
append_version=append_version,
)
self.register_resource(resource)
return resource
[docs]
def add_head(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
"""Shortcut for add_route with method HEAD."""
return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)
def add_options(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
"""Shortcut for add_route with method OPTIONS."""
return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)
[docs]
def add_get(
self,
path: str,
handler: Handler,
*,
name: str | None = None,
allow_head: bool = True,
**kwargs: Any,
) -> AbstractRoute:
"""Shortcut for add_route with method GET.
If allow_head is true, another
route is added allowing head requests to the same endpoint.
"""
resource = self.add_resource(path, name=name)
if allow_head:
resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
return resource.add_route(hdrs.METH_GET, handler, **kwargs)
[docs]
def add_post(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
"""Shortcut for add_route with method POST."""
return self.add_route(hdrs.METH_POST, path, handler, **kwargs)
[docs]
def add_put(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
"""Shortcut for add_route with method PUT."""
return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)
[docs]
def add_patch(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
"""Shortcut for add_route with method PATCH."""
return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)
[docs]
def add_delete(self, path: str, handler: Handler, **kwargs: Any) -> AbstractRoute:
"""Shortcut for add_route with method DELETE."""
return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)
[docs]
def add_view(
self, path: str, handler: type[AbstractView], **kwargs: Any
) -> AbstractRoute:
"""Shortcut for add_route with ANY methods for a class-based view."""
return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
def freeze(self) -> None:
super().freeze()
for resource in self._resources:
resource.freeze()
[docs]
def add_routes(self, routes: Iterable[AbstractRouteDef]) -> list[AbstractRoute]:
"""Append routes to route table.
Parameter should be a sequence of RouteDef objects.
Returns a list of registered AbstractRoute instances.
"""
registered_routes = []
for route_def in routes:
registered_routes.extend(route_def.register(self))
return registered_routes
def _quote_path(value: str) -> str:
if YARL_VERSION < (1, 6):
value = value.replace("%", "%25")
return URL.build(path=value, encoded=False).raw_path
def _unquote_path_safe(value: str) -> str:
if "%" not in value:
return value
return value.replace("%2F", "/").replace("%25", "%")
def _requote_path(value: str) -> str:
# Quote non-ascii characters and other characters which must be quoted,
# but preserve existing %-sequences.
result = _quote_path(value)
if "%" in value:
result = result.replace("%25", "%")
return result