You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
136 lines
4.8 KiB
Python
136 lines
4.8 KiB
Python
2 years ago
|
from typing import Any, Dict, NoReturn, Pattern, Tuple, Type, TypeVar, Union
|
||
|
|
||
|
# Public names of this module: this list is what a star-import exposes.
__all__ = [
    # Exception hierarchy
    "ProtocolError",
    "LocalProtocolError",
    "RemoteProtocolError",
    # Helper functions
    "validate",
    "bytesify",
]
|
||
|
|
||
|
|
||
|
class ProtocolError(Exception):
    """Exception indicating a violation of the HTTP/1.1 protocol.

    This is an abstract base class, with two concrete subclasses:
    :exc:`LocalProtocolError`, which indicates that you tried to do something
    that HTTP/1.1 says is illegal, and :exc:`RemoteProtocolError`, which
    indicates that the remote peer tried to do something that HTTP/1.1 says is
    illegal. See :ref:`error-handling` for details.

    In addition to the normal :exc:`Exception` features, it has one attribute:

    .. attribute:: error_status_hint

       This gives a suggestion as to what status code a server might use if
       this error occurred as part of a request.

       For a :exc:`RemoteProtocolError`, this is useful as a suggestion for
       how you might want to respond to a misbehaving peer, if you're
       implementing a server.

       For a :exc:`LocalProtocolError`, this can be taken as a suggestion for
       how your peer might have responded to *you* if h11 had allowed you to
       continue.

       The default is 400 Bad Request, a generic catch-all for protocol
       violations.

    """

    def __init__(self, msg: str, error_status_hint: int = 400) -> None:
        """Initialise the error with a message and a suggested status code.

        :param msg: Human-readable description of the violation; becomes the
            sole entry of ``args``.
        :param error_status_hint: Suggested HTTP status code, stored on the
            instance as ``error_status_hint`` (defaults to 400).
        :raises TypeError: If ``ProtocolError`` itself, rather than one of
            its subclasses, is being instantiated.
        """
        # Exact-type check (not isinstance) so that subclasses stay
        # instantiable while the base class itself remains abstract.
        if type(self) is ProtocolError:
            raise TypeError("tried to directly instantiate ProtocolError")
        super().__init__(msg)
        self.error_status_hint = error_status_hint
|
||
|
|
||
|
|
||
|
# Strategy: there are a number of public APIs where a LocalProtocolError can
|
||
|
# be raised (send(), all the different event constructors, ...), and only one
|
||
|
# public API where RemoteProtocolError can be raised
|
||
|
# (receive_data()). Therefore we always raise LocalProtocolError internally,
|
||
|
# and then receive_data will translate this into a RemoteProtocolError.
|
||
|
#
|
||
|
# Internally:
|
||
|
# LocalProtocolError is the generic "ProtocolError".
|
||
|
# Externally:
|
||
|
# LocalProtocolError is for local errors and RemoteProtocolError is for
|
||
|
# remote errors.
|
||
|
class LocalProtocolError(ProtocolError):
    """Raised when the local side attempts something HTTP/1.1 forbids."""

    def _reraise_as_remote_protocol_error(self) -> NoReturn:
        """Convert this exception into a RemoteProtocolError and raise it.

        Intended to be called from inside an ``except`` block that has just
        caught this exception. Never returns.
        """
        # Cheapest way to obtain an equivalent RemoteProtocolError: retag the
        # existing object in place instead of building a new exception.
        self.__class__ = RemoteProtocolError  # type: ignore
        # A bare 'raise' would not be enough here: Python records the
        # exception type (exc_info[0]) separately from the exception object
        # (exc_info[1]), and only the object was modified above. Raising the
        # object explicitly picks up the new type; on py3 the traceback is
        # stored on the exception object itself, so it survives the in-place
        # change.
        raise self
|
||
|
|
||
|
|
||
|
class RemoteProtocolError(ProtocolError):
    """Raised when the remote peer attempts something HTTP/1.1 forbids."""
|
||
|
|
||
|
|
||
|
def validate(
    regex: Pattern[bytes], data: bytes, msg: str = "malformed data", *format_args: Any
) -> Dict[str, bytes]:
    """Match *data* against *regex* in full and return its named groups.

    :param regex: Compiled bytes pattern; the whole of *data* must match it.
    :param data: The bytes to check.
    :param msg: Error message used on mismatch. It is passed through
        ``str.format`` only when *format_args* are supplied.
    :param format_args: Optional positional arguments for ``msg.format``.
    :returns: The match's ``groupdict()``.
    :raises LocalProtocolError: If *data* does not fully match *regex*.
    """
    matched = regex.fullmatch(data)
    if matched:
        return matched.groupdict()

    # Formatting is deferred to the failure path, and skipped entirely when
    # no arguments were given so that a literal message is used verbatim.
    error_text = msg.format(*format_args) if format_args else msg
    raise LocalProtocolError(error_text)
|
||
|
|
||
|
|
||
|
# Sentinel values
|
||
|
#
|
||
|
# - Inherit identity-based comparison and hashing from object
|
||
|
# - Have a nice repr
|
||
|
# - Have a *bonus property*: type(sentinel) is sentinel
|
||
|
#
|
||
|
# The bonus property is useful if you want to take the return value from
|
||
|
# next_event() and do some sort of dispatch based on type(event).
|
||
|
|
||
|
_T_Sentinel = TypeVar("_T_Sentinel", bound="Sentinel")


class Sentinel(type):
    """Metaclass (and mandatory sole base) for sentinel values.

    A sentinel is declared with ``Sentinel`` as both its only base class and
    its metaclass. The resulting object is its own type, i.e.
    ``type(sentinel) is sentinel``, and its repr is simply its name.
    """

    def __new__(
        cls: Type[_T_Sentinel],
        name: str,
        bases: Tuple[type, ...],
        namespace: Dict[str, Any],
        **kwds: Any
    ) -> _T_Sentinel:
        """Create the sentinel class and make it an instance of itself."""
        # Sanity check: Sentinel must be the one and only base class.
        assert bases == (Sentinel,)
        sentinel = super().__new__(cls, name, bases, namespace, **kwds)
        # Re-point the new class's type at itself; this is what provides the
        # ``type(sentinel) is sentinel`` property.
        sentinel.__class__ = sentinel  # type: ignore
        return sentinel

    def __repr__(self) -> str:
        """Return the bare class name."""
        return self.__name__
|
||
|
|
||
|
|
||
|
# Used for methods, request targets, HTTP versions, header names, and header
|
||
|
# values. Accepts ascii-strings, or bytes/bytearray/memoryview/..., and always
|
||
|
# returns bytes.
|
||
|
def bytesify(s: Union[bytes, bytearray, memoryview, int, str]) -> bytes:
    """Coerce *s* to ``bytes``.

    :param s: A ``str`` (encoded as ASCII) or a bytes-like object.
    :returns: *s* itself when it is exactly ``bytes``, otherwise a ``bytes``
        built from it.
    :raises TypeError: If *s* is an ``int``.
    :raises UnicodeEncodeError: If *s* is a ``str`` with non-ASCII characters.
    """
    # Fast-path: an exact bytes object is handed back untouched.
    if type(s) is bytes:
        return s
    # bytes(int) would quietly produce a run of zero bytes, so integers are
    # rejected explicitly instead.
    if isinstance(s, int):
        raise TypeError("expected bytes-like object, not int")
    if isinstance(s, str):
        return s.encode("ascii")
    return bytes(s)
|