import typing as t from datetime import datetime from datetime import timedelta from datetime import timezone from http import HTTPStatus from .._internal import _to_str from ..datastructures import Headers from ..datastructures import HeaderSet from ..http import dump_cookie from ..http import HTTP_STATUS_CODES from ..utils import get_content_type from werkzeug.datastructures import CallbackDict from werkzeug.datastructures import ContentRange from werkzeug.datastructures import ContentSecurityPolicy from werkzeug.datastructures import ResponseCacheControl from werkzeug.datastructures import WWWAuthenticate from werkzeug.http import COEP from werkzeug.http import COOP from werkzeug.http import dump_age from werkzeug.http import dump_header from werkzeug.http import dump_options_header from werkzeug.http import http_date from werkzeug.http import parse_age from werkzeug.http import parse_cache_control_header from werkzeug.http import parse_content_range_header from werkzeug.http import parse_csp_header from werkzeug.http import parse_date from werkzeug.http import parse_options_header from werkzeug.http import parse_set_header from werkzeug.http import parse_www_authenticate_header from werkzeug.http import quote_etag from werkzeug.http import unquote_etag from werkzeug.utils import header_property def _set_property(name: str, doc: t.Optional[str] = None) -> property: def fget(self: "Response") -> HeaderSet: def on_update(header_set: HeaderSet) -> None: if not header_set and name in self.headers: del self.headers[name] elif header_set: self.headers[name] = header_set.to_header() return parse_set_header(self.headers.get(name), on_update) def fset( self: "Response", value: t.Optional[ t.Union[str, t.Dict[str, t.Union[str, int]], t.Iterable[str]] ], ) -> None: if not value: del self.headers[name] elif isinstance(value, str): self.headers[name] = value else: self.headers[name] = dump_header(value) return property(fget, fset, doc=doc) class Response: """Represents the non-IO parts of an HTTP response, specifically the status and headers but not the body. This class is not meant for general use. It should only be used when implementing WSGI, ASGI, or another HTTP application spec. Werkzeug provides a WSGI implementation at :cls:`werkzeug.wrappers.Response`. :param status: The status code for the response. Either an int, in which case the default status message is added, or a string in the form ``{code} {message}``, like ``404 Not Found``. Defaults to 200. :param headers: A :class:`~werkzeug.datastructures.Headers` object, or a list of ``(key, value)`` tuples that will be converted to a ``Headers`` object. :param mimetype: The mime type (content type without charset or other parameters) of the response. If the value starts with ``text/`` (or matches some other special cases), the charset will be added to create the ``content_type``. :param content_type: The full content type of the response. Overrides building the value from ``mimetype``. .. versionadded:: 2.0 """ #: the charset of the response. charset = "utf-8" #: the default status if none is provided. default_status = 200 #: the default mimetype if none is provided. default_mimetype = "text/plain" #: Warn if a cookie header exceeds this size. The default, 4093, should be #: safely `supported by most browsers `_. A cookie larger than #: this size will still be sent, but it may be ignored or handled #: incorrectly by some browsers. Set to 0 to disable this check. #: #: .. versionadded:: 0.13 #: #: .. _`cookie`: http://browsercookielimits.squawky.net/ max_cookie_size = 4093 # A :class:`Headers` object representing the response headers. headers: Headers def __init__( self, status: t.Optional[t.Union[int, str, HTTPStatus]] = None, headers: t.Optional[ t.Union[ t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]], t.Iterable[t.Tuple[str, t.Union[str, int]]], ] ] = None, mimetype: t.Optional[str] = None, content_type: t.Optional[str] = None, ) -> None: if isinstance(headers, Headers): self.headers = headers elif not headers: self.headers = Headers() else: self.headers = Headers(headers) if content_type is None: if mimetype is None and "content-type" not in self.headers: mimetype = self.default_mimetype if mimetype is not None: mimetype = get_content_type(mimetype, self.charset) content_type = mimetype if content_type is not None: self.headers["Content-Type"] = content_type if status is None: status = self.default_status self.status = status # type: ignore def __repr__(self) -> str: return f"<{type(self).__name__} [{self.status}]>" @property def status_code(self) -> int: """The HTTP status code as a number.""" return self._status_code @status_code.setter def status_code(self, code: int) -> None: self.status = code # type: ignore @property def status(self) -> str: """The HTTP status code as a string.""" return self._status @status.setter def status(self, value: t.Union[str, int, HTTPStatus]) -> None: if not isinstance(value, (str, bytes, int, HTTPStatus)): raise TypeError("Invalid status argument") self._status, self._status_code = self._clean_status(value) def _clean_status(self, value: t.Union[str, int, HTTPStatus]) -> t.Tuple[str, int]: if isinstance(value, HTTPStatus): value = int(value) status = _to_str(value, self.charset) split_status = status.split(None, 1) if len(split_status) == 0: raise ValueError("Empty status argument") if len(split_status) > 1: if split_status[0].isdigit(): # code and message return status, int(split_status[0]) # multi-word message return f"0 {status}", 0 if split_status[0].isdigit(): # code only status_code = int(split_status[0]) try: status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}" except KeyError: status = f"{status_code} UNKNOWN" return status, status_code # one-word message return f"0 {status}", 0 def set_cookie( self, key: str, value: str = "", max_age: t.Optional[t.Union[timedelta, int]] = None, expires: t.Optional[t.Union[str, datetime, int, float]] = None, path: t.Optional[str] = "/", domain: t.Optional[str] = None, secure: bool = False, httponly: bool = False, samesite: t.Optional[str] = None, ) -> None: """Sets a cookie. A warning is raised if the size of the cookie header exceeds :attr:`max_cookie_size`, but the header will still be set. :param key: the key (name) of the cookie to be set. :param value: the value of the cookie. :param max_age: should be a number of seconds, or `None` (default) if the cookie should last only as long as the client's browser session. :param expires: should be a `datetime` object or UNIX timestamp. :param path: limits the cookie to a given path, per default it will span the whole domain. :param domain: if you want to set a cross-domain cookie. For example, ``domain=".example.com"`` will set a cookie that is readable by the domain ``www.example.com``, ``foo.example.com`` etc. Otherwise, a cookie will only be readable by the domain that set it. :param secure: If ``True``, the cookie will only be available via HTTPS. :param httponly: Disallow JavaScript access to the cookie. :param samesite: Limit the scope of the cookie to only be attached to requests that are "same-site". """ self.headers.add( "Set-Cookie", dump_cookie( key, value=value, max_age=max_age, expires=expires, path=path, domain=domain, secure=secure, httponly=httponly, charset=self.charset, max_size=self.max_cookie_size, samesite=samesite, ), ) def delete_cookie( self, key: str, path: str = "/", domain: t.Optional[str] = None, secure: bool = False, httponly: bool = False, samesite: t.Optional[str] = None, ) -> None: """Delete a cookie. Fails silently if key doesn't exist. :param key: the key (name) of the cookie to be deleted. :param path: if the cookie that should be deleted was limited to a path, the path has to be defined here. :param domain: if the cookie that should be deleted was limited to a domain, that domain has to be defined here. :param secure: If ``True``, the cookie will only be available via HTTPS. :param httponly: Disallow JavaScript access to the cookie. :param samesite: Limit the scope of the cookie to only be attached to requests that are "same-site". """ self.set_cookie( key, expires=0, max_age=0, path=path, domain=domain, secure=secure, httponly=httponly, samesite=samesite, ) @property def is_json(self) -> bool: """Check if the mimetype indicates JSON data, either :mimetype:`application/json` or :mimetype:`application/*+json`. """ mt = self.mimetype return mt is not None and ( mt == "application/json" or mt.startswith("application/") and mt.endswith("+json") ) # Common Descriptors @property def mimetype(self) -> t.Optional[str]: """The mimetype (content type without charset etc.)""" ct = self.headers.get("content-type") if ct: return ct.split(";")[0].strip() else: return None @mimetype.setter def mimetype(self, value: str) -> None: self.headers["Content-Type"] = get_content_type(value, self.charset) @property def mimetype_params(self) -> t.Dict[str, str]: """The mimetype parameters as dict. For example if the content type is ``text/html; charset=utf-8`` the params would be ``{'charset': 'utf-8'}``. .. versionadded:: 0.5 """ def on_update(d: CallbackDict) -> None: self.headers["Content-Type"] = dump_options_header(self.mimetype, d) d = parse_options_header(self.headers.get("content-type", ""))[1] return CallbackDict(d, on_update) location = header_property[str]( "Location", doc="""The Location response-header field is used to redirect the recipient to a location other than the Request-URI for completion of the request or identification of a new resource.""", ) age = header_property( "Age", None, parse_age, dump_age, # type: ignore doc="""The Age response-header field conveys the sender's estimate of the amount of time since the response (or its revalidation) was generated at the origin server. Age values are non-negative decimal integers, representing time in seconds.""", ) content_type = header_property[str]( "Content-Type", doc="""The Content-Type entity-header field indicates the media type of the entity-body sent to the recipient or, in the case of the HEAD method, the media type that would have been sent had the request been a GET.""", ) content_length = header_property( "Content-Length", None, int, str, doc="""The Content-Length entity-header field indicates the size of the entity-body, in decimal number of OCTETs, sent to the recipient or, in the case of the HEAD method, the size of the entity-body that would have been sent had the request been a GET.""", ) content_location = header_property[str]( "Content-Location", doc="""The Content-Location entity-header field MAY be used to supply the resource location for the entity enclosed in the message when that entity is accessible from a location separate from the requested resource's URI.""", ) content_encoding = header_property[str]( "Content-Encoding", doc="""The Content-Encoding entity-header field is used as a modifier to the media-type. When present, its value indicates what additional content codings have been applied to the entity-body, and thus what decoding mechanisms must be applied in order to obtain the media-type referenced by the Content-Type header field.""", ) content_md5 = header_property[str]( "Content-MD5", doc="""The Content-MD5 entity-header field, as defined in RFC 1864, is an MD5 digest of the entity-body for the purpose of providing an end-to-end message integrity check (MIC) of the entity-body. (Note: a MIC is good for detecting accidental modification of the entity-body in transit, but is not proof against malicious attacks.)""", ) date = header_property( "Date", None, parse_date, http_date, doc="""The Date general-header field represents the date and time at which the message was originated, having the same semantics as orig-date in RFC 822. .. versionchanged:: 2.0 The datetime object is timezone-aware. """, ) expires = header_property( "Expires", None, parse_date, http_date, doc="""The Expires entity-header field gives the date/time after which the response is considered stale. A stale cache entry may not normally be returned by a cache. .. versionchanged:: 2.0 The datetime object is timezone-aware. """, ) last_modified = header_property( "Last-Modified", None, parse_date, http_date, doc="""The Last-Modified entity-header field indicates the date and time at which the origin server believes the variant was last modified. .. versionchanged:: 2.0 The datetime object is timezone-aware. """, ) @property def retry_after(self) -> t.Optional[datetime]: """The Retry-After response-header field can be used with a 503 (Service Unavailable) response to indicate how long the service is expected to be unavailable to the requesting client. Time in seconds until expiration or date. .. versionchanged:: 2.0 The datetime object is timezone-aware. """ value = self.headers.get("retry-after") if value is None: return None elif value.isdigit(): return datetime.now(timezone.utc) + timedelta(seconds=int(value)) return parse_date(value) @retry_after.setter def retry_after(self, value: t.Optional[t.Union[datetime, int, str]]) -> None: if value is None: if "retry-after" in self.headers: del self.headers["retry-after"] return elif isinstance(value, datetime): value = http_date(value) else: value = str(value) self.headers["Retry-After"] = value vary = _set_property( "Vary", doc="""The Vary field value indicates the set of request-header fields that fully determines, while the response is fresh, whether a cache is permitted to use the response to reply to a subsequent request without revalidation.""", ) content_language = _set_property( "Content-Language", doc="""The Content-Language entity-header field describes the natural language(s) of the intended audience for the enclosed entity. Note that this might not be equivalent to all the languages used within the entity-body.""", ) allow = _set_property( "Allow", doc="""The Allow entity-header field lists the set of methods supported by the resource identified by the Request-URI. The purpose of this field is strictly to inform the recipient of valid methods associated with the resource. An Allow header field MUST be present in a 405 (Method Not Allowed) response.""", ) # ETag @property def cache_control(self) -> ResponseCacheControl: """The Cache-Control general-header field is used to specify directives that MUST be obeyed by all caching mechanisms along the request/response chain. """ def on_update(cache_control: ResponseCacheControl) -> None: if not cache_control and "cache-control" in self.headers: del self.headers["cache-control"] elif cache_control: self.headers["Cache-Control"] = cache_control.to_header() return parse_cache_control_header( self.headers.get("cache-control"), on_update, ResponseCacheControl ) def set_etag(self, etag: str, weak: bool = False) -> None: """Set the etag, and override the old one if there was one.""" self.headers["ETag"] = quote_etag(etag, weak) def get_etag(self) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]: """Return a tuple in the form ``(etag, is_weak)``. If there is no ETag the return value is ``(None, None)``. """ return unquote_etag(self.headers.get("ETag")) accept_ranges = header_property[str]( "Accept-Ranges", doc="""The `Accept-Ranges` header. Even though the name would indicate that multiple values are supported, it must be one string token only. The values ``'bytes'`` and ``'none'`` are common. .. versionadded:: 0.7""", ) @property def content_range(self) -> ContentRange: """The ``Content-Range`` header as a :class:`~werkzeug.datastructures.ContentRange` object. Available even if the header is not set. .. versionadded:: 0.7 """ def on_update(rng: ContentRange) -> None: if not rng: del self.headers["content-range"] else: self.headers["Content-Range"] = rng.to_header() rv = parse_content_range_header(self.headers.get("content-range"), on_update) # always provide a content range object to make the descriptor # more user friendly. It provides an unset() method that can be # used to remove the header quickly. if rv is None: rv = ContentRange(None, None, None, on_update=on_update) return rv @content_range.setter def content_range(self, value: t.Optional[t.Union[ContentRange, str]]) -> None: if not value: del self.headers["content-range"] elif isinstance(value, str): self.headers["Content-Range"] = value else: self.headers["Content-Range"] = value.to_header() # Authorization @property def www_authenticate(self) -> WWWAuthenticate: """The ``WWW-Authenticate`` header in a parsed form.""" def on_update(www_auth: WWWAuthenticate) -> None: if not www_auth and "www-authenticate" in self.headers: del self.headers["www-authenticate"] elif www_auth: self.headers["WWW-Authenticate"] = www_auth.to_header() header = self.headers.get("www-authenticate") return parse_www_authenticate_header(header, on_update) # CSP @property def content_security_policy(self) -> ContentSecurityPolicy: """The ``Content-Security-Policy`` header as a :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available even if the header is not set. The Content-Security-Policy header adds an additional layer of security to help detect and mitigate certain types of attacks. """ def on_update(csp: ContentSecurityPolicy) -> None: if not csp: del self.headers["content-security-policy"] else: self.headers["Content-Security-Policy"] = csp.to_header() rv = parse_csp_header(self.headers.get("content-security-policy"), on_update) if rv is None: rv = ContentSecurityPolicy(None, on_update=on_update) return rv @content_security_policy.setter def content_security_policy( self, value: t.Optional[t.Union[ContentSecurityPolicy, str]] ) -> None: if not value: del self.headers["content-security-policy"] elif isinstance(value, str): self.headers["Content-Security-Policy"] = value else: self.headers["Content-Security-Policy"] = value.to_header() @property def content_security_policy_report_only(self) -> ContentSecurityPolicy: """The ``Content-Security-policy-report-only`` header as a :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available even if the header is not set. The Content-Security-Policy-Report-Only header adds a csp policy that is not enforced but is reported thereby helping detect certain types of attacks. """ def on_update(csp: ContentSecurityPolicy) -> None: if not csp: del self.headers["content-security-policy-report-only"] else: self.headers["Content-Security-policy-report-only"] = csp.to_header() rv = parse_csp_header( self.headers.get("content-security-policy-report-only"), on_update ) if rv is None: rv = ContentSecurityPolicy(None, on_update=on_update) return rv @content_security_policy_report_only.setter def content_security_policy_report_only( self, value: t.Optional[t.Union[ContentSecurityPolicy, str]] ) -> None: if not value: del self.headers["content-security-policy-report-only"] elif isinstance(value, str): self.headers["Content-Security-policy-report-only"] = value else: self.headers["Content-Security-policy-report-only"] = value.to_header() # CORS @property def access_control_allow_credentials(self) -> bool: """Whether credentials can be shared by the browser to JavaScript code. As part of the preflight request it indicates whether credentials can be used on the cross origin request. """ return "Access-Control-Allow-Credentials" in self.headers @access_control_allow_credentials.setter def access_control_allow_credentials(self, value: t.Optional[bool]) -> None: if value is True: self.headers["Access-Control-Allow-Credentials"] = "true" else: self.headers.pop("Access-Control-Allow-Credentials", None) access_control_allow_headers = header_property( "Access-Control-Allow-Headers", load_func=parse_set_header, dump_func=dump_header, doc="Which headers can be sent with the cross origin request.", ) access_control_allow_methods = header_property( "Access-Control-Allow-Methods", load_func=parse_set_header, dump_func=dump_header, doc="Which methods can be used for the cross origin request.", ) access_control_allow_origin = header_property[str]( "Access-Control-Allow-Origin", doc="The origin or '*' for any origin that may make cross origin requests.", ) access_control_expose_headers = header_property( "Access-Control-Expose-Headers", load_func=parse_set_header, dump_func=dump_header, doc="Which headers can be shared by the browser to JavaScript code.", ) access_control_max_age = header_property( "Access-Control-Max-Age", load_func=int, dump_func=str, doc="The maximum age in seconds the access control settings can be cached for.", ) cross_origin_opener_policy = header_property[COOP]( "Cross-Origin-Opener-Policy", load_func=lambda value: COOP(value), dump_func=lambda value: value.value, default=COOP.UNSAFE_NONE, doc="""Allows control over sharing of browsing context group with cross-origin documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""", ) cross_origin_embedder_policy = header_property[COEP]( "Cross-Origin-Embedder-Policy", load_func=lambda value: COEP(value), dump_func=lambda value: value.value, default=COEP.UNSAFE_NONE, doc="""Prevents a document from loading any cross-origin resources that do not explicitly grant the document permission. Values must be a member of the :class:`werkzeug.http.COEP` enum.""", )