import inspect
import json as json_module
import logging
from functools import partialmethod
from functools import wraps
from http import client
from itertools import groupby
from re import Pattern
from threading import Lock as _ThreadingLock
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import Sized
from typing import Tuple
from typing import Type
from typing import Union
from typing import overload
from warnings import warn

import yaml
from requests.adapters import HTTPAdapter
from requests.adapters import MaxRetryError
from requests.exceptions import ConnectionError
from requests.exceptions import RetryError

from responses.matchers import json_params_matcher as _json_params_matcher
from responses.matchers import query_string_matcher as _query_string_matcher
from responses.matchers import urlencoded_params_matcher as _urlencoded_params_matcher
from responses.registries import FirstMatchRegistry

try:
    from typing_extensions import Literal
except ImportError:  # pragma: no cover
    from typing import Literal  # type: ignore  # pragma: no cover

from io import BufferedReader
from io import BytesIO
from unittest import mock as std_mock
from urllib.parse import parse_qsl
from urllib.parse import quote
from urllib.parse import urlsplit
from urllib.parse import urlunparse
from urllib.parse import urlunsplit

from urllib3.response import HTTPHeaderDict
from urllib3.response import HTTPResponse
from urllib3.util.url import parse_url

if TYPE_CHECKING:  # pragma: no cover
    # Everything in this branch is evaluated by static type checkers only, so the
    # names defined here are not available at runtime.
    # import only for linter run
    import os
    from typing import Protocol
    from unittest.mock import _patch as _mock_patcher

    from requests import PreparedRequest
    from requests import models
    from urllib3 import Retry as _Retry

    class UnboundSend(Protocol):
        """Callable that takes an adapter and a prepared request and returns a response."""

        def __call__(
            self,
            adapter: HTTPAdapter,
            request: PreparedRequest,
            *args: Any,
            **kwargs: Any,
        ) -> models.Response:
            ...

    # Block of type annotations
    # Values accepted as the body of a mocked response.
    _Body = Union[str, BaseException, "Response", BufferedReader, bytes, None]
    # Any callable; used for decorated functions.
    _F = Callable[..., Any]
    # Headers given either as a mapping or as a list of (name, value) pairs.
    _HeaderSet = Optional[Union[Mapping[str, str], List[Tuple[str, str]]]]
    # Iterable of matcher callables, each returning ``(is_match, reason)``.
    _MatcherIterable = Iterable[Callable[..., Tuple[bool, str]]]
    # Either an HTTP method name or an already built response object.
    _HTTPMethodOrResponse = Optional[Union[str, "BaseResponse"]]
    # A URL given as a plain string or as a compiled regular expression.
    _URLPatternType = Union["Pattern[str]", str]
    # NOTE(review): presumably mirrors the positional signature of
    # ``requests.adapters.HTTPAdapter.send`` (request, stream, timeout, verify,
    # cert, proxies) - confirm against the installed requests version.
    _HTTPAdapterSend = Callable[
        [
            HTTPAdapter,
            PreparedRequest,
            bool,
            Union[float, Tuple[float, float], Tuple[float, None], None],
            Union[bool, str],
            Union[bytes, str, Tuple[Union[bytes, str], Union[bytes, str]], None],
            Optional[Mapping[str, str]],
        ],
        models.Response,
    ]


class Call(NamedTuple):
    """Immutable record pairing a request with the response stored alongside it."""

    request: "PreparedRequest"
    response: "_Body"


# Reference to ``HTTPAdapter.send`` taken when this module is imported; it is the
# default value of the ``real_adapter_send`` argument of ``RequestsMock``.
_real_send = HTTPAdapter.send
# Sentinel used to tell "argument not supplied" apart from an explicit ``None``.
_UNSET = object()

# Logger shared by this package.
logger = logging.getLogger("responses")


class FalseBool:
    """Class to mock up built-in False boolean.

    Instances are falsy but can still be told apart from a literal ``False`` with
    ``isinstance``; this module uses that to recognise an untouched default value.

    Used for backwards compatibility, see
    https://github.com/getsentry/responses/issues/464
    """

    def __bool__(self) -> bool:
        """Always evaluate as ``False`` in a boolean context."""
        return False


def urlencoded_params_matcher(params: Optional[Dict[str, str]]) -> Callable[..., Any]:
    """Deprecated alias of ``responses.matchers.urlencoded_params_matcher``.

    Parameters
    ----------
    params : dict, optional
        Passed through unchanged to the matcher factory in ``responses.matchers``.

    Returns
    -------
    Callable
        The matcher built by ``responses.matchers.urlencoded_params_matcher``.

    """
    warn(
        "Function is deprecated. Use 'from responses.matchers import urlencoded_params_matcher'",
        DeprecationWarning,
        # attribute the warning to the caller's line, not to this shim
        stacklevel=2,
    )
    return _urlencoded_params_matcher(params)


def json_params_matcher(params: Optional[Dict[str, Any]]) -> Callable[..., Any]:
    """Deprecated alias of ``responses.matchers.json_params_matcher``.

    Parameters
    ----------
    params : dict, optional
        Passed through unchanged to the matcher factory in ``responses.matchers``.

    Returns
    -------
    Callable
        The matcher built by ``responses.matchers.json_params_matcher``.

    """
    warn(
        "Function is deprecated. Use 'from responses.matchers import json_params_matcher'",
        DeprecationWarning,
        # attribute the warning to the caller's line, not to this shim
        stacklevel=2,
    )
    return _json_params_matcher(params)


def _has_unicode(s: str) -> bool:
    return any(ord(char) > 128 for char in s)


def _clean_unicode(url: str) -> str:
    """Clean up URLs, which use punycode to handle unicode chars.

    Applies percent encoding to URL path and query if required.

    Parameters
    ----------
    url : str
        URL that should be cleaned from unicode

    Returns
    -------
    str
        Cleaned URL

    """
    urllist = list(urlsplit(url))
    netloc = urllist[1]
    if _has_unicode(netloc):
        domains = netloc.split(".")
        for i, d in enumerate(domains):
            if _has_unicode(d):
                d = "xn--" + d.encode("punycode").decode("ascii")
                domains[i] = d
        urllist[1] = ".".join(domains)
        url = urlunsplit(urllist)

    # Clean up path/query/params, which use url-encoding to handle unicode chars
    chars = list(url)
    for i, x in enumerate(chars):
        if ord(x) > 128:
            chars[i] = quote(x)

    return "".join(chars)


def get_wrapped(
    func: Callable[..., Any],
    responses: "RequestsMock",
    *,
    registry: Optional[Any] = None,
    assert_all_requests_are_fired: Optional[bool] = None,
) -> Callable[..., Any]:
    """Return ``func`` wrapped so that it runs inside the ``responses`` context.

    A coroutine function gets an ``async`` wrapper, any other callable a plain one.

    Parameters
    ----------
    func : Callable
        Function to wrap.
    responses : RequestsMock
        Mock object that is used as context manager.
    registry : FirstMatchRegistry, optional
        Registry class handed to ``responses._set_registry`` on every call of the
        wrapper. See ``responses.registries``
    assert_all_requests_are_fired : bool
        Value that ``responses.assert_all_requests_are_fired`` is patched to for the
        duration of each call.

    Returns
    -------
    Callable
        Wrapped function

    """
    # The attribute is patched only while the wrapped function runs, so the value
    # does not leak into other functions decorated with the same mock object.
    flag_patch = std_mock.patch.object(
        target=responses,
        attribute="assert_all_requests_are_fired",
        new=assert_all_requests_are_fired,
    )

    def _install_registry() -> None:
        if registry is not None:
            responses._set_registry(registry)

    if not inspect.iscoroutinefunction(func):

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            _install_registry()
            with flag_patch, responses:
                return func(*args, **kwargs)

        return sync_wrapper

    # the requestor function is asynchronous, so the wrapper has to be as well
    @wraps(func)
    async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
        _install_registry()
        with flag_patch, responses:
            return await func(*args, **kwargs)

    return async_wrapper


class CallList(Sequence[Any], Sized):
    """Sequence of recorded ``Call`` entries backed by a plain list."""

    def __init__(self) -> None:
        # recorded calls, in the order they were appended
        self._calls: List[Call] = []

    def __iter__(self) -> Iterator[Call]:
        """Iterate over the recorded calls in insertion order."""
        return iter(self._calls)

    def __len__(self) -> int:
        """Return the number of recorded calls."""
        return len(self._calls)

    @overload
    def __getitem__(self, idx: int) -> Call:
        ...

    @overload
    def __getitem__(self, idx: "slice[int, int, Optional[int]]") -> List[Call]:
        ...

    def __getitem__(self, idx: Union[int, slice]) -> Union[Call, List[Call]]:
        """Return one call for an integer index or a list of calls for a slice."""
        return self._calls[idx]

    def add(self, request: "PreparedRequest", response: "_Body") -> None:
        """Append a new ``Call`` built from ``request`` and ``response``."""
        self._calls.append(Call(request, response))

    def add_call(self, call: Call) -> None:
        """Append an already constructed ``Call``."""
        self._calls.append(call)

    def reset(self) -> None:
        """Drop all recorded calls by binding a fresh empty list."""
        self._calls = []


def _ensure_url_default_path(
    url: "_URLPatternType",
) -> "_URLPatternType":
    """Add empty URL path '/' if doesn't exist.

    Examples
    --------
    >>> _ensure_url_default_path("http://example.com")
    "http://example.com/"

    Parameters
    ----------
    url : str or re.Pattern
        URL to validate.

    Returns
    -------
    url : str or re.Pattern
        Modified URL if str or unchanged re.Pattern

    """
    if isinstance(url, str):
        url_parts = list(urlsplit(url))
        if url_parts[2] == "":
            url_parts[2] = "/"
            url = urlunsplit(url_parts)
    return url


def _get_url_and_path(url: str) -> str:
    """Reduce a URL to its scheme, netloc and path.

    Query string and fragment are dropped; the remainder is passed through
    urllib3's ``parse_url`` and its ``url`` attribute is returned.

    Examples
    --------
    >>> _get_url_and_path("http://example.com/path;segment?ab=xy&zed=qwe#test=1&foo=bar")
    'http://example.com/path;segment'

    Parameters
    ----------
    url : str
        URL to parse.

    Returns
    -------
    url : str
        URL with scheme, netloc and path

    """
    pieces = urlsplit(url)
    # params, query and fragment slots are deliberately left empty
    trimmed = urlunparse(
        (pieces.scheme, pieces.netloc, pieces.path, None, None, None)
    )
    return parse_url(trimmed).url


def _handle_body(
    body: Optional[Union[bytes, BufferedReader, str]]
) -> Union[BufferedReader, BytesIO]:
    """Generates `Response` body.

    Parameters
    ----------
    body : str or bytes or BufferedReader
        Input data to generate `Response` body.

    Returns
    -------
    body : BufferedReader or BytesIO
        `Response` body

    """
    if isinstance(body, str):
        body = body.encode("utf-8")
    if isinstance(body, BufferedReader):
        return body

    data = BytesIO(body)  # type: ignore[arg-type]

    def is_closed() -> bool:
        """
        Real Response uses HTTPResponse as body object.
        Thus, when method is_closed is called first to check if there is any more
        content to consume and the file-like object is still opened

        This method ensures stability to work for both:
        https://github.com/getsentry/responses/issues/438
        https://github.com/getsentry/responses/issues/394

        where file should be intentionally be left opened to continue consumption
        """
        if not data.closed and data.read(1):
            # if there is more bytes to read then keep open, but return pointer
            data.seek(-1, 1)
            return False
        else:
            if not data.closed:
                # close but return False to mock like is still opened
                data.close()
                return False

            # only if file really closed (by us) return True
            return True

    data.isclosed = is_closed  # type: ignore[attr-defined]
    return data


class BaseResponse:
    """Common base of registered responses: holds method, URL and matchers.

    Subclasses provide ``get_response``; this class only decides whether a request
    matches and keeps the list of calls made against the response.
    """

    passthrough: bool = False
    content_type: Optional[str] = None
    headers: Optional[Mapping[str, str]] = None
    stream: Optional[bool] = False

    def __init__(
        self,
        method: str,
        url: "_URLPatternType",
        match_querystring: Union[bool, object] = None,
        match: "_MatcherIterable" = (),
        *,
        passthrough: bool = False,
    ) -> None:
        """Store the matching criteria.

        Parameters
        ----------
        method : str
            HTTP method the request must use.
        url : str or re.Pattern
            URL to match; a string URL without a path gets the path '/'.
        match_querystring : bool or object, optional
            Deprecated switch, see ``_should_match_querystring``.
        match : iterable of callables
            Matchers applied to the request after method and URL matched.
        passthrough : bool
            Stored on the instance as ``passthrough``.
        """
        self.method: str = method
        # ensure the url has a default path set if the url is a string
        self.url: "_URLPatternType" = _ensure_url_default_path(url)

        if self._should_match_querystring(match_querystring):
            # append a matcher for the query string contained in the URL
            match = (
                *match,
                _query_string_matcher(urlsplit(self.url).query),  # type: ignore[arg-type]
            )

        self.match: "_MatcherIterable" = match
        self._calls: CallList = CallList()
        self.passthrough = passthrough

        self.status: int = 200
        self.body: "_Body" = ""

    def __eq__(self, other: Any) -> bool:
        """Two responses are equal when method and URL (or pattern text) agree."""
        if not isinstance(other, BaseResponse) or self.method != other.method:
            return False

        # Compiled patterns are compared through their source text: regex objects
        # do not implement __eq__, and relying on the regex cache handing out the
        # same instance does not work in all cases.
        def comparable(url: Any) -> Any:
            return url.pattern if isinstance(url, Pattern) else url

        return comparable(self.url) == comparable(other.url)

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)

    def _should_match_querystring(
        self, match_querystring_argument: Union[bool, object]
    ) -> Union[bool, object]:
        """Decide whether a query string matcher has to be added.

        Returns False for pattern URLs, the (deprecated) argument itself when it is
        not None, and otherwise whether the URL carries a query string.
        """
        if isinstance(self.url, Pattern):
            # the old default from <= 0.9.0
            return False

        if match_querystring_argument is None:
            return bool(urlsplit(self.url).query)

        # a FalseBool instance is the untouched default and must not warn
        if not isinstance(match_querystring_argument, FalseBool):
            warn(
                (
                    "Argument 'match_querystring' is deprecated. "
                    "Use 'responses.matchers.query_param_matcher' or "
                    "'responses.matchers.query_string_matcher'"
                ),
                DeprecationWarning,
            )
        return match_querystring_argument

    def _url_matches(self, url: "_URLPatternType", other: str) -> bool:
        """Compares two URLs.

        String URLs are compared on scheme, netloc and path only. If 'url' is a
        re.Pattern, 'other' has to match the pattern.

        Parameters
        ----------
        url : Union["Pattern[str]", str]
            Reference URL or Pattern to compare.

        other : str
            URL that should be compared.

        Returns
        -------
        bool
            True, if URLs are identical or 'other' matches the pattern.

        """
        if isinstance(url, str):
            reference = _clean_unicode(url) if _has_unicode(url) else url
            return _get_url_and_path(reference) == _get_url_and_path(other)

        return bool(isinstance(url, Pattern) and url.match(other))

    @staticmethod
    def _req_attr_matches(
        match: "_MatcherIterable", request: "PreparedRequest"
    ) -> Tuple[bool, str]:
        """Run the matchers in order and stop at the first one that rejects."""
        for check in match:
            accepted, why = check(request)
            if accepted:
                continue
            return False, why

        return True, ""

    def get_headers(self) -> HTTPHeaderDict:
        """Build the response headers from ``content_type`` and ``headers``."""
        merged = HTTPHeaderDict()  # Duplicate headers are legal
        declared = self.headers

        # Add Content-Type if it exists and is not already in headers
        if self.content_type and not (declared and "Content-Type" in declared):
            merged["Content-Type"] = self.content_type

        # Extend headers if they exist
        if declared:
            merged.extend(declared)

        return merged

    def get_response(self, request: "PreparedRequest") -> HTTPResponse:
        """Produce the response for ``request``; subclasses must override this."""
        raise NotImplementedError

    def matches(self, request: "PreparedRequest") -> Tuple[bool, str]:
        """Check method, URL and matchers; return ``(matched, reason)``."""
        if request.method != self.method:
            return False, "Method does not match"

        if not self._url_matches(self.url, str(request.url)):
            return False, "URL does not match"

        accepted, why = self._req_attr_matches(self.match, request)
        return (True, "") if accepted else (False, why)

    @property
    def call_count(self) -> int:
        """Number of calls recorded for this response."""
        return len(self._calls)

    @property
    def calls(self) -> CallList:
        """Calls recorded for this response."""
        return self._calls


def _form_response(
    body: Union[BufferedReader, BytesIO],
    headers: Optional[Mapping[str, str]],
    status: int,
    request_method: Optional[str],
) -> HTTPResponse:
    """
    Function to generate `urllib3.response.HTTPResponse` object.

    The cookie handling functionality of the `requests` library relies on the response object
    having an original response object with the headers stored in the `msg` attribute.
    Instead of supplying a file-like object of type `HTTPMessage` for the headers, we provide
    the headers directly. This approach eliminates the need to parse the headers into a file-like
    object and then rely on the library to unparse it back. These additional conversions can
    introduce potential errors.
    """
    # An already closed buffer serves as body of the stand-in original response;
    # it is required to avoid "ValueError: Unable to determine whether fp is closed."
    closed_stub = BytesIO()
    closed_stub.close()

    # The type `urllib3.response.HTTPResponse` is incorrect; we should
    # use `http.client.HTTPResponse` instead. However, changing this requires opening
    # a real socket to imitate the object. This may not be desired, as some users may
    # want to completely restrict network access in their tests.
    # See https://github.com/getsentry/responses/issues/691
    stand_in = HTTPResponse(
        body=closed_stub,
        msg=headers,  # type: ignore[arg-type]
        preload_content=False,
    )
    reason_phrase = client.responses.get(status)
    return HTTPResponse(
        status=status,
        reason=reason_phrase,
        body=body,
        headers=headers,
        original_response=stand_in,  # type: ignore[arg-type]  # See comment above
        preload_content=False,
        request_method=request_method,
    )


class Response(BaseResponse):
    """Response with a fixed body, status and headers."""

    def __init__(
        self,
        method: str,
        url: "_URLPatternType",
        body: "_Body" = "",
        json: Optional[Any] = None,
        status: int = 200,
        headers: Optional[Mapping[str, str]] = None,
        stream: Optional[bool] = None,
        content_type: Union[str, object] = _UNSET,
        auto_calculate_content_length: bool = False,
        **kwargs: Any,
    ) -> None:
        """Configure the canned response.

        Parameters
        ----------
        method, url
            Forwarded to ``BaseResponse``.
        body : str, bytes, BufferedReader or exception
            Response payload; must be empty when ``json`` is given.
        json : Any, optional
            Object serialised with ``json.dumps`` and used as body.
        status : int
            HTTP status code.
        headers : Mapping, optional
            Extra response headers.
        stream : bool, optional
            Deprecated; passing any value other than None emits a warning.
        content_type : str
            When not supplied it defaults to "application/json" for ``json``,
            "text/plain; charset=utf-8" for string bodies containing non-ASCII
            characters and "text/plain" otherwise.
        auto_calculate_content_length : bool
            Add a Content-Length header computed from the body.
        **kwargs
            Remaining keyword arguments for ``BaseResponse``.
        """
        super().__init__(method, url, **kwargs)

        # a `json` argument replaces the body
        json_given = json is not None
        if json_given:
            assert not body
            body = json_module.dumps(json)

        if content_type is _UNSET:
            if json_given:
                content_type = "application/json"
            elif isinstance(body, str) and _has_unicode(body):
                content_type = "text/plain; charset=utf-8"
            else:
                content_type = "text/plain"

        self.body: "_Body" = body
        self.status: int = status
        self.headers: Optional[Mapping[str, str]] = headers

        if stream is not None:
            warn(
                "stream argument is deprecated. Use stream parameter in request directly",
                DeprecationWarning,
            )

        self.stream: Optional[bool] = stream
        self.content_type: str = content_type  # type: ignore[assignment]
        self.auto_calculate_content_length: bool = auto_calculate_content_length

    def get_response(self, request: "PreparedRequest") -> HTTPResponse:
        """Build the urllib3 response, or raise the exception configured as body."""
        if self.body and isinstance(self.body, Exception):
            # let the raised exception carry the request that triggered it
            self.body.request = request  # type: ignore[attr-defined]
            raise self.body

        headers = self.get_headers()

        assert not isinstance(self.body, (Response, BaseException))
        payload = _handle_body(self.body)

        # the length can only be computed for in-memory bodies
        needs_length = (
            self.auto_calculate_content_length
            and isinstance(payload, BytesIO)
            and "Content-Length" not in headers
        )
        if needs_length:
            headers["Content-Length"] = str(len(payload.getvalue()))

        return _form_response(payload, headers, self.status, request.method)

    def __repr__(self) -> str:
        headers_json = json_module.dumps(self.headers)
        return (
            f"<Response(url='{self.url}' status={self.status} "
            f"content_type='{self.content_type}' headers='{headers_json}')>"
        )


class CallbackResponse(BaseResponse):
    """Response whose status, headers and body are computed by a callback."""

    def __init__(
        self,
        method: str,
        url: "_URLPatternType",
        callback: Callable[[Any], Any],
        stream: Optional[bool] = None,
        content_type: Optional[str] = "text/plain",
        **kwargs: Any,
    ) -> None:
        """Store the callback and the default content type.

        Parameters
        ----------
        method, url
            Forwarded to ``BaseResponse``.
        callback : Callable
            Called with the request; returns an exception or a
            ``(status, headers, body)`` triple.
        stream : bool, optional
            Deprecated; passing any value other than None emits a warning.
        content_type : str, optional
            Content type used unless the callback supplies its own.
        **kwargs
            Remaining keyword arguments for ``BaseResponse``.
        """
        super().__init__(method, url, **kwargs)

        self.callback = callback

        if stream is not None:
            warn(
                "stream argument is deprecated. Use stream parameter in request directly",
                DeprecationWarning,
            )
        self.stream: Optional[bool] = stream
        self.content_type: Optional[str] = content_type

    def get_response(self, request: "PreparedRequest") -> HTTPResponse:
        """Invoke the callback and turn its result into a urllib3 response.

        Raises
        ------
        Exception
            Whatever exception instance the callback returned, either as its whole
            result or as the body element of the triple.
        """
        headers = self.get_headers()

        result = self.callback(request)
        if isinstance(result, Exception):
            raise result

        status, r_headers, body = result
        if isinstance(body, Exception):
            raise body

        # If the callback set a content-type remove the one
        # set in add_callback() so that we don't have multiple
        # content type values. Header names are case-insensitive, so the lookup
        # ignores case for mappings as well as for lists of pairs.
        if isinstance(r_headers, Mapping):
            header_names = list(r_headers)
        elif isinstance(r_headers, list):
            header_names = [pair[0] for pair in r_headers if pair]
        else:
            header_names = []

        has_content_type = any(
            isinstance(name, str) and name.lower() == "content-type"
            for name in header_names
        )
        if has_content_type:
            headers.pop("Content-Type", None)

        body = _handle_body(body)
        headers.extend(r_headers)

        return _form_response(body, headers, status, request.method)


class PassthroughResponse(BaseResponse):
    """``BaseResponse`` that is always created with ``passthrough=True``."""

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, passthrough=True, **kwargs)


class RequestsMock:
    # HTTP method names, available as attributes of the class and its instances.
    DELETE: Literal["DELETE"] = "DELETE"
    GET: Literal["GET"] = "GET"
    HEAD: Literal["HEAD"] = "HEAD"
    OPTIONS: Literal["OPTIONS"] = "OPTIONS"
    PATCH: Literal["PATCH"] = "PATCH"
    POST: Literal["POST"] = "POST"
    PUT: Literal["PUT"] = "PUT"

    # The ``Response`` class, reachable through the mock object.
    Response: Type[Response] = Response

    # Make the `matchers` name available under a RequestsMock instance
    from responses import matchers

    # Class-level default; ``__init__`` assigns the per-instance value.
    response_callback: Optional[Callable[[Any], Any]] = None

    def __init__(
        self,
        assert_all_requests_are_fired: bool = True,
        response_callback: Optional[Callable[[Any], Any]] = None,
        passthru_prefixes: Tuple[str, ...] = (),
        target: str = "requests.adapters.HTTPAdapter.send",
        registry: Type[FirstMatchRegistry] = FirstMatchRegistry,
        *,
        real_adapter_send: "_HTTPAdapterSend" = _real_send,
    ) -> None:
        """Set up an inactive mock.

        Parameters
        ----------
        assert_all_requests_are_fired : bool
            Stored on the instance under the same name.
        response_callback : Callable, optional
            Stored on the instance under the same name.
        passthru_prefixes : tuple
            Stored on the instance as a tuple.
        target : str
            Stored on the instance; presumably the dotted path that gets patched
            when the mock is started (``start`` is not visible here).
        registry : Type[FirstMatchRegistry]
            Registry class; it is instantiated without arguments.
        real_adapter_send : Callable
            Stored as ``_real_send``; defaults to the ``HTTPAdapter.send`` captured
            at import time.
        """
        self._calls: CallList = CallList()
        # reset() binds a default registry and empties passthru_prefixes, so the
        # values passed by the caller have to be assigned after it
        self.reset()
        self._registry: FirstMatchRegistry = registry()  # call only after reset
        self.assert_all_requests_are_fired: bool = assert_all_requests_are_fired
        self.response_callback: Optional[Callable[[Any], Response]] = response_callback
        self.passthru_prefixes: Tuple[_URLPatternType, ...] = tuple(passthru_prefixes)
        self.target: str = target
        # no patcher exists until one is created elsewhere in the class
        self._patcher: Optional["_mock_patcher[Any]"] = None
        # held by _find_match around the registry lookup
        self._thread_lock = _ThreadingLock()
        self._real_send = real_adapter_send

    def get_registry(self) -> FirstMatchRegistry:
        """Return the registry instance that currently holds the responses.

        Returns
        -------
        FirstMatchRegistry
            The object stored in ``self._registry`` (not a copy).

        """
        return self._registry

    def _set_registry(self, new_registry: Type[FirstMatchRegistry]) -> None:
        """Swap the current registry for a fresh instance of `new_registry`.

        Parameters
        ----------
        new_registry : Type[FirstMatchRegistry]
            Class reference of the registry that should be set, eg OrderedRegistry

        Raises
        ------
        AttributeError
            If the current registry still has registered responses.

        """
        if not self.registered():
            self._registry = new_registry()
            return

        raise AttributeError(
            "Cannot replace Registry, current registry has responses.\n"
            "Run 'responses.registry.reset()' first"
        )

    def reset(self) -> None:
        """Resets registry (including type), calls, passthru_prefixes to default values."""
        # a custom registry class is replaced by the default FirstMatchRegistry
        self._registry = FirstMatchRegistry()
        self._calls.reset()
        self.passthru_prefixes = ()

    def add(
        self,
        method: "_HTTPMethodOrResponse" = None,
        url: "Optional[_URLPatternType]" = None,
        body: "_Body" = "",
        adding_headers: "_HeaderSet" = None,
        *args: Any,
        **kwargs: Any,
    ) -> BaseResponse:
        """Register a response and return the registered object.

        >>> import responses

        A basic request:
        >>> responses.add(responses.GET, 'http://example.com')

        You can also directly pass an object which implements the
        ``BaseResponse`` interface:

        >>> responses.add(Response(...))

        A JSON payload:

        >>> responses.add(
        >>>     method='GET',
        >>>     url='http://example.com',
        >>>     json={'foo': 'bar'},
        >>> )

        Custom headers:

        >>> responses.add(
        >>>     method='GET',
        >>>     url='http://example.com',
        >>>     headers={'X-Header': 'foo'},
        >>> )

        ``adding_headers`` is used as ``headers`` unless ``headers`` is given as
        keyword argument. Additional positional arguments are accepted but not used.

        Raises
        ------
        RuntimeError
            If ``content_type`` is passed together with headers that already
            contain a Content-Type entry.

        """
        if isinstance(method, BaseResponse):
            return self._registry.add(method)

        if adding_headers is not None:
            kwargs.setdefault("headers", adding_headers)

        headers = kwargs.get("headers")
        if "content_type" in kwargs and headers is not None:
            # Headers may be a mapping (iteration yields the names) or a list of
            # (name, value) pairs, in which case the name is the first element.
            header_names = (
                item[0] if isinstance(item, (tuple, list)) else item
                for item in headers
            )
            if any(name.lower() == "content-type" for name in header_names):
                raise RuntimeError(
                    "You cannot define both `content_type` and `headers[Content-Type]`."
                    " Using the `content_type` kwarg is recommended."
                )

        assert url is not None
        assert isinstance(method, str)
        response = Response(method=method, url=url, body=body, **kwargs)
        return self._registry.add(response)

    # Shortcuts that call ``add`` with the HTTP method already filled in,
    # e.g. ``mock.get(url, ...)`` is ``mock.add("GET", url, ...)``.
    delete = partialmethod(add, DELETE)
    get = partialmethod(add, GET)
    head = partialmethod(add, HEAD)
    options = partialmethod(add, OPTIONS)
    patch = partialmethod(add, PATCH)
    post = partialmethod(add, POST)
    put = partialmethod(add, PUT)

    def _parse_response_file(
        self, file_path: "Union[str, bytes, os.PathLike[Any]]"
    ) -> "Dict[str, Any]":
        """Load a YAML file and return the parsed document.

        Parameters
        ----------
        file_path : str, bytes or os.PathLike
            Location of the YAML file.

        Returns
        -------
        dict
            Whatever ``yaml.safe_load`` produced for the file content.

        """
        # Read as UTF-8 explicitly so the result does not depend on the locale
        # encoding of the machine running the tests.
        with open(file_path, encoding="utf-8") as file:
            data = yaml.safe_load(file)
        return data

    def _add_from_file(self, file_path: "Union[str, bytes, os.PathLike[Any]]") -> None:
        data = self._parse_response_file(file_path)

        for rsp in data["responses"]:
            rsp = rsp["response"]
            self.add(
                method=rsp["method"],
                url=rsp["url"],
                body=rsp["body"],
                status=rsp["status"],
                headers=rsp["headers"] if "headers" in rsp else None,
                content_type=rsp["content_type"],
                auto_calculate_content_length=rsp["auto_calculate_content_length"],
            )

    def add_passthru(self, prefix: "_URLPatternType") -> None:
        """
        Register a URL prefix or regex to passthru any non-matching mock requests to.

        For example, to allow any request to 'https://example.com', but require
        mocks for the remainder, you would add the prefix as so:

        >>> import responses
        >>> responses.add_passthru('https://example.com')

        Regex can be used like:

        >>> import re
        >>> responses.add_passthru(re.compile('https://example.com/\\w+'))

        String prefixes containing non-ASCII characters are normalised with
        ``_clean_unicode`` before they are stored; patterns are stored as given.
        """
        if not isinstance(prefix, Pattern) and _has_unicode(prefix):
            prefix = _clean_unicode(prefix)
        # append the new entry to the existing prefixes
        self.passthru_prefixes += (prefix,)

    def remove(
        self,
        method_or_response: "_HTTPMethodOrResponse" = None,
        url: "Optional[_URLPatternType]" = None,
    ) -> List[BaseResponse]:
        """
        Remove responses previously added using ``add()``.

        The responses are identified either by an object inheriting
        ``BaseResponse`` or by ``method`` and ``url``. Removes all matching
        responses and returns what the registry reports as removed.

        >>> import responses
        >>> responses.add(responses.GET, 'http://example.org')
        >>> responses.remove(responses.GET, 'http://example.org')
        """
        if isinstance(method_or_response, BaseResponse):
            return self._registry.remove(method_or_response)

        # identified by method and URL: build a throw-away response to compare with
        assert url is not None
        assert isinstance(method_or_response, str)
        probe = BaseResponse(method=method_or_response, url=url)
        return self._registry.remove(probe)

    def replace(
        self,
        method_or_response: "_HTTPMethodOrResponse" = None,
        url: "Optional[_URLPatternType]" = None,
        body: "_Body" = "",
        *args: Any,
        **kwargs: Any,
    ) -> BaseResponse:
        """
        Replace a response previously added using ``add()``.

        The signature is identical to ``add()``. The response is identified using
        ``method`` and ``url``, and the first matching response is replaced.
        Additional positional arguments are accepted but not used.

        >>> import responses
        >>> responses.add(responses.GET, 'http://example.org', json={'data': 1})
        >>> responses.replace(responses.GET, 'http://example.org', json={'data': 2})
        """
        if isinstance(method_or_response, BaseResponse):
            return self._registry.replace(method_or_response)

        assert url is not None
        assert isinstance(method_or_response, str)
        replacement = Response(
            method=method_or_response, url=url, body=body, **kwargs
        )
        return self._registry.replace(replacement)

    def upsert(
        self,
        method_or_response: "_HTTPMethodOrResponse" = None,
        url: "Optional[_URLPatternType]" = None,
        body: "_Body" = "",
        *args: Any,
        **kwargs: Any,
    ) -> BaseResponse:
        """
        Replace a response previously added using ``add()``, or add the response
        if no response exists.  Responses are matched using ``method`` and ``url``.
        The first matching response is replaced.

        ``add()`` is used as fallback whenever ``replace()`` raises ``ValueError``.

        >>> import responses
        >>> responses.add(responses.GET, 'http://example.org', json={'data': 1})
        >>> responses.upsert(responses.GET, 'http://example.org', json={'data': 2})
        """
        positional = (method_or_response, url, body, *args)
        try:
            return self.replace(*positional, **kwargs)
        except ValueError:
            return self.add(*positional, **kwargs)

    def add_callback(
        self,
        method: str,
        url: "_URLPatternType",
        callback: Callable[
            ["PreparedRequest"],
            Union[Exception, Tuple[int, Mapping[str, str], "_Body"]],
        ],
        match_querystring: Union[bool, FalseBool] = FalseBool(),
        content_type: Optional[str] = "text/plain",
        match: "_MatcherIterable" = (),
    ) -> None:
        """Register a ``CallbackResponse`` built from the given arguments.

        Parameters
        ----------
        method : str
            HTTP method to match.
        url : str or re.Pattern
            URL to match.
        callback : Callable
            Called with the request; returns an exception or a
            ``(status, headers, body)`` triple.
        match_querystring : bool or FalseBool
            Deprecated; the ``FalseBool`` default marks "not supplied".
        content_type : str, optional
            Content type handed to the ``CallbackResponse``.
        match : iterable of callables
            Additional request matchers.
        """
        self._registry.add(
            CallbackResponse(
                url=url,
                method=method,
                callback=callback,
                content_type=content_type,
                match_querystring=match_querystring,
                match=match,
            )
        )

    def registered(self) -> List["BaseResponse"]:
        """Return the ``registered`` attribute of the current registry."""
        return self._registry.registered

    @property
    def calls(self) -> CallList:
        """Calls recorded by this mock object."""
        return self._calls

    def __enter__(self) -> "RequestsMock":
        """Start the mock and return it for use in a ``with`` statement."""
        self.start()
        return self

    def __exit__(self, type: Any, value: Any, traceback: Any) -> bool:
        """Stop the mock and reset it when the ``with`` block is left.

        Returns True only when the block raised nothing, so an exception raised
        inside the block is never suppressed.
        """
        # assertions are only allowed in stop() when the block itself succeeded
        success = type is None
        try:
            self.stop(allow_assert=success)
        finally:
            # reset even if stop() raised
            self.reset()
        return success

    @overload
    def activate(self, func: "_F" = ...) -> "_F":
        """Overload for scenario when 'responses.activate' is used."""

    @overload
    def activate(  # type: ignore[misc]
        self,
        *,
        registry: Type[Any] = ...,
        assert_all_requests_are_fired: bool = ...,
    ) -> Callable[["_F"], "_F"]:
        """Overload for scenario when
        'responses.activate(registry=, assert_all_requests_are_fired=True)' is used.
        See https://github.com/getsentry/responses/pull/469 for more details
        """

    def activate(
        self,
        func: Optional["_F"] = None,
        *,
        registry: Optional[Type[Any]] = None,
        assert_all_requests_are_fired: bool = False,
    ) -> Union[Callable[["_F"], "_F"], "_F"]:
        """Decorator that runs a function inside this mock.

        Works both bare (``@activate``) and with keyword arguments
        (``@activate(registry=..., assert_all_requests_are_fired=...)``).

        Parameters
        ----------
        func : Callable, optional
            The function to wrap when the decorator is used bare.
        registry : type, optional
            Registry class forwarded to ``get_wrapped``.
        assert_all_requests_are_fired : bool
            Forwarded to ``get_wrapped``.

        Returns
        -------
        Callable
            The wrapped function, or a decorator producing it.
        """
        if func is not None:
            # bare usage: the keyword arguments are not forwarded here
            return get_wrapped(func, self)

        def deco_activate(function: "_F") -> Callable[..., Any]:
            return get_wrapped(
                function,
                self,
                registry=registry,
                assert_all_requests_are_fired=assert_all_requests_are_fired,
            )

        return deco_activate

    def _find_match(
        self, request: "PreparedRequest"
    ) -> Tuple[Optional["BaseResponse"], List[str]]:
        """
        Ask the registry for a response matching the request.

        The lookup is performed while holding ``self._thread_lock``. How a winner is
        picked among several candidates is decided by the registry implementation.

        :param request: (PreparedRequest), request object
        :return: the result of ``self._registry.find(request)``; per the annotation:
            (Response) the matched response, or None
            (list) strings describing why registered responses did not match
        """
        with self._thread_lock:
            lookup_result = self._registry.find(request)
        return lookup_result

    def _parse_request_params(
        self, url: str
    ) -> Dict[str, Union[str, int, float, List[Optional[Union[str, int, float]]]]]:
        """Parse the query string of ``url`` into a parameter mapping.

        A key that appears once maps to its single value. A key that appears
        several times maps to the list of all its values in order of
        appearance, whether or not the occurrences are adjacent in the query
        string (``a=1&b=2&a=3`` gives ``{"a": ["1", "3"], "b": "2"}``).

        Parsing is done with ``parse_qsl`` using its defaults.

        :param url: URL or path-with-query to parse
        :return: mapping of parameter name to value or list of values
        """
        # Collect per key rather than using itertools.groupby: groupby only
        # merges *consecutive* equal keys, so repeated keys separated by another
        # key would overwrite each other and lose values.
        collected: Dict[str, List[Any]] = {}
        for key, value in parse_qsl(urlsplit(url).query):
            collected.setdefault(key, []).append(value)

        params: Dict[str, Union[str, int, float, List[Any]]] = {}
        for key, values in collected.items():
            # single occurrences are unwrapped to a scalar
            params[key] = values[0] if len(values) == 1 else values
        return params

    def _read_filelike_body(
        self, body: Union[str, bytes, BufferedReader, None]
    ) -> Union[str, bytes, None]:
        """Return the request body, reading it first when it is file-like.

        Requests/urllib accept several body types, file-like objects among them.
        Reading the content here means the call list stores the data itself
        rather than a (possibly closed) file object, so users can compare
        against what was in the request. See GH #719.

        :param body: request body as str, bytes, a readable object, or None
        :return: ``body`` unchanged for str/bytes/None and for objects without
            ``read``; otherwise the result of ``body.read()``
        """
        if body is None or isinstance(body, (str, bytes)):
            return body

        # Detection of file-like bodies is based on
        # https://github.com/urllib3/urllib3/blob/abbfbcb1dd274fc54b4f0a7785fd04d59b634195/src/urllib3/util/request.py#L220
        is_readable = hasattr(body, "read") or isinstance(body, BufferedReader)
        return body.read() if is_readable else body

    def _on_request(
        self,
        adapter: "HTTPAdapter",
        request: "PreparedRequest",
        *,
        retries: Optional["_Retry"] = None,
        **kwargs: Any,
    ) -> "models.Response":
        """Handle an intercepted request and produce the response for it.

        Steps, in order:

        1. Attach ``params`` and ``req_kwargs`` to the request and replace a
           file-like body with the data read from it.
        2. Look for a registered match. Without one, either send the request
           for real (URL matches a passthru prefix) or record and raise a
           ``ConnectionError`` listing the registered responses.
        3. With a match, either send for real (``match.passthrough``) or build
           the response through ``adapter.build_response``.
        4. Apply ``self.response_callback`` if set, then record the call on both
           this mock and the matched response.
        5. If the retry policy says the response should be retried, increment
           the policy and call this method again.

        :param adapter: adapter used to build the response and as the source of
            the default retry policy (``adapter.max_retries``)
        :param request: the prepared request; it is mutated (see step 1)
        :param retries: retry state carried through recursive calls; when falsy,
            ``adapter.max_retries`` is used
        :param kwargs: stored on the request as ``req_kwargs`` and forwarded to
            ``self._real_send`` and to recursive calls
        :return: the built (or real) response, after ``response_callback``
        :raises ConnectionError: no match was found and no passthru prefix applies
        :raises RetryError: retries are exhausted and ``raise_on_status`` is truthy
        """
        # add attributes params and req_kwargs to 'request' object for further match comparison
        # original request object does not have these attributes
        request.params = self._parse_request_params(request.path_url)  # type: ignore[attr-defined]
        request.req_kwargs = kwargs  # type: ignore[attr-defined]
        request_url = str(request.url)
        # file-like bodies are read here so that the stored call holds the data itself
        request.body = self._read_filelike_body(request.body)

        match, match_failed_reasons = self._find_match(request)
        resp_callback = self.response_callback

        if match is None:
            # a passthru prefix is either a compiled pattern (matched from the start
            # of the URL) or a plain string prefix
            if any(
                [
                    p.match(request_url)
                    if isinstance(p, Pattern)
                    else request_url.startswith(p)
                    for p in self.passthru_prefixes
                ]
            ):
                logger.info("request.allowed-passthru", extra={"url": request_url})
                return self._real_send(adapter, request, **kwargs)  # type: ignore

            error_msg = (
                "Connection refused by Responses - the call doesn't "
                "match any registered mock.\n\n"
                "Request: \n"
                f"- {request.method} {request_url}\n\n"
                "Available matches:\n"
            )
            # reasons are looked up by position, i.e. this relies on the registry
            # returning one reason per registered response, in the same order
            for i, m in enumerate(self.registered()):
                error_msg += "- {} {} {}\n".format(
                    m.method, m.url, match_failed_reasons[i]
                )

            if self.passthru_prefixes:
                error_msg += "Passthru prefixes:\n"
                for p in self.passthru_prefixes:
                    error_msg += f"- {p}\n"

            response = ConnectionError(error_msg)
            response.request = request

            # the failed attempt is still recorded in the call list before raising
            self._calls.add(request, response)
            raise response

        if match.passthrough:
            logger.info("request.passthrough-response", extra={"url": request_url})
            response = self._real_send(adapter, request, **kwargs)  # type: ignore
        else:
            try:
                response = adapter.build_response(  # type: ignore[assignment]
                    request, match.get_response(request)
                )
            except BaseException as response:
                # anything raised while producing the response is recorded as the
                # call's outcome (on the mock and on the match), then re-raised
                call = Call(request, response)
                self._calls.add_call(call)
                match.calls.add_call(call)
                raise

        if resp_callback:
            # the callback's return value replaces the response
            response = resp_callback(response)  # type: ignore[misc]
        call = Call(request, response)  # type: ignore[misc]
        self._calls.add_call(call)
        match.calls.add_call(call)

        # retry state passed in by a recursive call wins over the adapter default
        retries = retries or adapter.max_retries
        # first validate that current request is eligible to be retried.
        # See ``urllib3.util.retry.Retry`` documentation.
        if retries.is_retry(
            method=response.request.method, status_code=response.status_code  # type: ignore[misc]
        ):
            try:
                retries = retries.increment(
                    method=response.request.method,  # type: ignore[misc]
                    url=response.url,  # type: ignore[misc]
                    response=response.raw,  # type: ignore[misc]
                )
                # replay the same request with the incremented retry state
                return self._on_request(adapter, request, retries=retries, **kwargs)
            except MaxRetryError as e:
                if retries.raise_on_status:
                    """Since we call 'retries.increment()' by ourselves, we always set "error"
                    argument equal to None, thus, MaxRetryError exception will be raised with
                    ResponseError as a 'reason'.

                    Here we're emulating the `if isinstance(e.reason, ResponseError):`
                    branch found at:
                    https://github.com/psf/requests/blob/
                    177dd90f18a8f4dc79a7d2049f0a3f4fcc5932a0/requests/adapters.py#L549
                    """
                    raise RetryError(e, request=request)

                # retries exhausted but raise_on_status is falsy: hand back the last response
                return response
        return response

    def unbound_on_send(self) -> "UnboundSend":
        """Build the ``send`` replacement that routes requests to ``_on_request``.

        The returned function takes the adapter as its first argument and
        forwards ``(adapter, request, **kwargs)`` to ``self._on_request``.

        :return: function with the signature ``send(adapter, request, *args, **kwargs)``
        """
        # order in which positional arguments are mapped onto keyword names
        positional_names = ("stream", "timeout", "verify", "cert", "proxies")

        def send(
            adapter: "HTTPAdapter",
            request: "PreparedRequest",
            *args: Any,
            **kwargs: Any,
        ) -> "models.Response":
            # Positional arguments most likely come from a custom adapter. That is
            # legitimate, although `requests` itself always passes keywords.
            # See for more info: https://github.com/getsentry/responses/issues/642
            # Not all of them are required: zip stops at the shorter sequence, and
            # anything beyond the known names is ignored.
            for name, value in zip(positional_names, args):
                kwargs[name] = value

            return self._on_request(adapter, request, **kwargs)

        return send

    def start(self) -> None:
        """Patch ``self.target`` with the mock's ``send`` replacement.

        Does nothing when a patcher is already active: its value must not be
        overridden, which prevents issues when one decorated function is called
        from another decorated function.
        """
        if not self._patcher:
            patcher = std_mock.patch(target=self.target, new=self.unbound_on_send())
            self._patcher = patcher
            patcher.start()

    def stop(self, allow_assert: bool = True) -> None:
        """Undo the patch and optionally assert that every response was used.

        :param allow_assert: when False, the unfired-response check is skipped
        :raises AssertionError: if ``self.assert_all_requests_are_fired`` is
            truthy, ``allow_assert`` is truthy and at least one registered
            response has ``call_count == 0``
        """
        active_patcher = self._patcher
        if active_patcher:
            # only a started patcher may be stopped
            active_patcher.stop()

            # drop the stopped patcher so that self.start() creates a fresh one
            self._patcher = None

        if not (self.assert_all_requests_are_fired and allow_assert):
            return

        unfired = [
            (response.method, response.url)
            for response in self.registered()
            if response.call_count == 0
        ]
        if unfired:
            raise AssertionError(
                "Not all requests have been executed {!r}".format(unfired)
            )

    def assert_call_count(self, url: str, count: int) -> bool:
        """Assert that ``url`` was requested exactly ``count`` times.

        The given URL is passed through ``_ensure_url_default_path`` before it
        is compared with the URL of each recorded call.

        :param url: URL to look for among the recorded calls
        :param count: expected number of calls
        :return: True when the number of matching calls equals ``count``
        :raises AssertionError: when the number of matching calls differs
        """
        # the normalised URL does not depend on the call, so compute it once
        expected_url = _ensure_url_default_path(url)
        call_count = sum(1 for call in self.calls if call.request.url == expected_url)
        if call_count == count:
            return True
        raise AssertionError(
            f"Expected URL '{url}' to be called {count} times. Called {call_count} times."
        )


# expose default mock namespace
# A single module-level RequestsMock instance backs the module-level API below:
# `mock` is its public name, `_default_mock` the internal alias used for the
# re-exports in this file.
mock = _default_mock = RequestsMock(assert_all_requests_are_fired=False)
# Names exported by `from responses import *`. The `_deprecated_*` entries are
# the backing values for the deprecated public names served by the module-level
# `__getattr__` at the bottom of this file.
__all__ = [
    "CallbackResponse",
    "Response",
    "RequestsMock",
    # Exposed by the RequestsMock class:
    "activate",
    "add",
    "_add_from_file",
    "add_callback",
    "add_passthru",
    "_deprecated_assert_all_requests_are_fired",
    "assert_call_count",
    "calls",
    "delete",
    "DELETE",
    "get",
    "GET",
    "head",
    "HEAD",
    "options",
    "OPTIONS",
    "_deprecated_passthru_prefixes",
    "patch",
    "PATCH",
    "post",
    "POST",
    "put",
    "PUT",
    "registered",
    "remove",
    "replace",
    "reset",
    "response_callback",
    "start",
    "stop",
    "_deprecated_target",
    "upsert",
]

# expose only methods and/or read-only attributes of the default mock
# NOTE: every name below is bound once, at import time, to whatever the
# attribute lookup on `_default_mock` returns at that moment. Rebinding an
# attribute on `_default_mock` afterwards is not reflected by these module-level
# names; in-place changes to a shared object remain visible.
activate = _default_mock.activate
add = _default_mock.add
_add_from_file = _default_mock._add_from_file
add_callback = _default_mock.add_callback
add_passthru = _default_mock.add_passthru
_deprecated_assert_all_requests_are_fired = _default_mock.assert_all_requests_are_fired
assert_call_count = _default_mock.assert_call_count
calls = _default_mock.calls
delete = _default_mock.delete
DELETE = _default_mock.DELETE
get = _default_mock.get
GET = _default_mock.GET
head = _default_mock.head
HEAD = _default_mock.HEAD
options = _default_mock.options
OPTIONS = _default_mock.OPTIONS
_deprecated_passthru_prefixes = _default_mock.passthru_prefixes
patch = _default_mock.patch
PATCH = _default_mock.PATCH
post = _default_mock.post
POST = _default_mock.POST
put = _default_mock.put
PUT = _default_mock.PUT
registered = _default_mock.registered
remove = _default_mock.remove
replace = _default_mock.replace
reset = _default_mock.reset
response_callback = _default_mock.response_callback
start = _default_mock.start
stop = _default_mock.stop
_deprecated_target = _default_mock.target
upsert = _default_mock.upsert


# Public names that are no longer module attributes; access to them is served
# (with a warning) by the module-level ``__getattr__`` below.
deprecated_names = ["assert_all_requests_are_fired", "passthru_prefixes", "target"]


def __getattr__(name: str) -> Any:
    """Module-level attribute hook (PEP 562) that serves deprecated names.

    Python calls this only when normal module attribute lookup fails.

    :param name: attribute being looked up on the module
    :return: the module global ``_deprecated_<name>`` for names listed in
        ``deprecated_names``
    :raises AttributeError: for any other name
    """
    if name in deprecated_names:
        warn(
            f"{name} is deprecated. Please use 'responses.mock.{name}'",
            DeprecationWarning,
            # attribute the warning to the code that accessed the name rather
            # than to this module, so it points at the line that needs changing
            stacklevel=2,
        )
        return globals()[f"_deprecated_{name}"]
    raise AttributeError(f"module {__name__} has no attribute {name}")
