"""
This module contains a number of functions to handle AdES signature validation.


.. danger::
    This API is incubating, and not all features of the spec have been fully
    implemented at this stage. There will be bugs, and API changes may still
    occur.
"""

import asyncio
import dataclasses
import itertools
import logging
from copy import copy
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import (
    Any,
    Dict,
    FrozenSet,
    Generator,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    overload,
)

import tzlocal
from asn1crypto import cms, keys
from asn1crypto import pdf as asn1_pdf
from asn1crypto import tsp, x509
from asn1crypto.crl import CertificateList
from asn1crypto.ocsp import OCSPResponse
from pyhanko_certvalidator import ValidationContext
from pyhanko_certvalidator.authority import CertTrustAnchor, TrustAnchor
from pyhanko_certvalidator.context import (
    CertValidationPolicySpec,
    ValidationDataHandlers,
)
from pyhanko_certvalidator.errors import PathError
from pyhanko_certvalidator.ltv.ades_past import past_validate
from pyhanko_certvalidator.ltv.poe import (
    KnownPOE,
    POEManager,
    POEType,
    ValidationObject,
    ValidationObjectType,
    digest_for_poe,
)
from pyhanko_certvalidator.ltv.time_slide import ades_gather_prima_facie_revinfo
from pyhanko_certvalidator.ltv.types import ValidationTimingInfo
from pyhanko_certvalidator.path import ValidationPath
from pyhanko_certvalidator.policy_decl import (
    AlgorithmUsagePolicy,
    NonRevokedStatusAssertion,
    RevocationCheckingRule,
)
from pyhanko_certvalidator.registry import PathBuilder, TrustManager
from pyhanko_certvalidator.revinfo.archival import CRLContainer, OCSPContainer
from pyhanko_certvalidator.revinfo.validate_crl import CRLOfInterest
from pyhanko_certvalidator.revinfo.validate_ocsp import OCSPResponseOfInterest

from pyhanko.pdf_utils.reader import HistoricalResolver, PdfFileReader
from pyhanko.sign.ades.report import (
    AdESFailure,
    AdESIndeterminate,
    AdESPassed,
    AdESStatus,
    AdESSubIndic,
)
from pyhanko.sign.general import (
    CMSExtractionError,
    CMSStructuralError,
    MultivaluedAttributeError,
    NonexistentAttributeError,
    extract_certificate_info,
    find_cms_attribute,
    find_unique_cms_attribute,
)
from pyhanko.sign.validation import (
    DocumentSecurityStore,
    EmbeddedPdfSignature,
    errors,
    generic_cms,
)
from pyhanko.sign.validation.settings import KeyUsageConstraints
from pyhanko.sign.validation.status import (
    DocumentTimestampStatus,
    PdfSignatureStatus,
    RevocationDetails,
    SignatureCoverageLevel,
    SignatureStatus,
    SignerAttributeStatus,
    StandardCMSSignatureStatus,
    TimestampSignatureStatus,
)

from ..diff_analysis import DiffPolicy
from .dss import enumerate_ocsp_certs
from .errors import NoDSSFoundError
from .policy_decl import (
    LocalKnowledge,
    PdfSignatureValidationSpec,
    RevinfoOnlineFetchingRule,
    RevocationInfoGatheringSpec,
    SignatureValidationSpec,
    bootstrap_validation_data_handlers,
)
from .utils import CMSAlgorithmUsagePolicy

# Names exported by ``from <this module> import *``.
__all__ = [
    'ades_basic_validation',
    'ades_with_time_validation',
    'ades_lta_validation',
    'ades_timestamp_validation',
    'simulate_future_ades_lta_validation',
    'AdESBasicValidationResult',
    'AdESWithTimeValidationResult',
    'AdESLTAValidationResult',
    'derive_validation_object_identifier',
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable for the pyHanko status class carried by a validation result.
# It is bounded by SignatureStatus and declared covariant.
StatusType = TypeVar('StatusType', bound=SignatureStatus, covariant=True)


def derive_validation_object_binary_data(
    vo: ValidationObject,
) -> Optional[bytes]:
    """
    Produce the binary data that represents a validation object.

    Certificates, CRLs and OCSP responses are represented by the output of
    ``dump()`` on the underlying ASN.1 value. Signed data objects and
    timestamps are represented by the raw signature value of their first
    ``SignerInfo``.

    :param vo:
        The validation object to process.
    :return:
        The binary data, or ``None`` if the object's type is not one of
        the types handled here.
    """
    kind = vo.object_type
    if kind == ValidationObjectType.CERTIFICATE:
        return vo.value.dump()
    if kind == ValidationObjectType.CRL:
        return vo.value.crl_data.dump()
    if kind == ValidationObjectType.OCSP_RESPONSE:
        return vo.value.ocsp_response_data.dump()

    cms_like_kinds = (
        ValidationObjectType.SIGNED_DATA,
        ValidationObjectType.TIMESTAMP,
    )
    if kind in cms_like_kinds:
        # only the first signer's signature value is taken into account
        first_signer = vo.value['signer_infos'][0]
        return first_signer['signature'].native
    return None


def derive_validation_object_identifier(vo: ValidationObject) -> Optional[str]:
    """
    Derive a string identifier for a validation object.

    The identifier has the form ``vo-<object type value>-<hex digest>``,
    where the digest is computed with ``digest_for_poe`` over the binary
    data returned by :func:`derive_validation_object_binary_data`.
    Relying on that function keeps the per-type selection of the binary
    representation in a single place.

    :param vo:
        The validation object to identify.
    :return:
        The identifier, or ``None`` if no binary representation can be
        derived for this type of validation object.
    """
    # TODO for certs and signers, it could make sense to somehow encode
    #  a human-readable "slugified" representation of the common name
    #  to identify things at a glance.
    binary_data = derive_validation_object_binary_data(vo)
    if binary_data is None:
        return None

    marker = digest_for_poe(binary_data).hex()
    return f'vo-{vo.object_type.value}-{marker}'


class ValidationObjectSet:
    """
    Collection of validation objects, deduplicated by identifier.

    Objects are keyed by the result of
    :func:`derive_validation_object_identifier`. Objects for which no
    identifier can be derived are left out. When several objects share an
    identifier, the one encountered last is retained.
    """

    def __init__(self, *object_collections: Iterable[ValidationObject]):
        """
        :param object_collections:
            Any number of iterables of validation objects; they are
            consumed in the order given.
        """
        indexed = {}
        for candidate in itertools.chain.from_iterable(object_collections):
            key = derive_validation_object_identifier(candidate)
            if key:
                indexed[key] = candidate
        self._things = indexed

    def __iter__(self) -> Iterator[ValidationObject]:
        """
        Iterate over the retained validation objects.
        """
        return iter(self._things.values())

    @staticmethod
    def empty():
        """
        Create a set that contains no validation objects.
        """
        nothing = ()
        return ValidationObjectSet(nothing)


@dataclass(frozen=True)
class AdESBasicValidationResult(Generic[StatusType]):
    """
    Result of validation of basic signatures.

    ETSI EN 319 102-1, § 5.3

    This is a frozen dataclass, generic in the type of the pyHanko status
    object that it carries.
    """

    ades_subindic: AdESSubIndic
    """
    AdES subindication.
    """

    # Functions in this module set this to ``None`` when no status object
    # could be produced (e.g. when validation raised an error, or when there
    # was no timestamp token to validate).
    api_status: Optional[StatusType]
    """
    A status descriptor object from pyHanko's own validation API.
    Will be an instance of :class:`.SignatureStatus` or a subclass
    thereof.
    """

    failure_msg: Optional[str]
    """
    A string describing the reason why validation failed,
    if applicable.
    """

    validation_objects: ValidationObjectSet
    """
    Validation objects that were potentially relevant for the validation process.
    """


@dataclass
class _InternalBasicValidationResult:
    """
    Mutable intermediate state of a basic validation run.

    It carries everything needed to build a pyHanko status object later on;
    see :meth:`update`.
    """

    # AdES subindication determined so far
    ades_subindic: AdESSubIndic
    signature_poe_time: Optional[datetime]
    signature_not_before_time: Optional[datetime]
    validation_path: Optional[ValidationPath]
    # keyword arguments for the status class; note that update() writes
    # into this dictionary
    status_kwargs: dict = dataclasses.field(default_factory=dict)
    # if set, passed to the status class as ``trust_problem_indic``
    trust_subindic_update: Optional[AdESSubIndic] = None

    signature_ts_validity: Optional[TimestampSignatureStatus] = None
    content_ts_validity: Optional[TimestampSignatureStatus] = None

    signer_attr_status: Optional[SignerAttributeStatus] = None

    def update(self, status_cls, with_ts, with_attrs):
        """
        Instantiate a status object from the state accumulated so far.

        The entries are written into ``self.status_kwargs`` itself (no copy
        is made), so they remain present after this call.

        :param status_cls:
            The status class to instantiate.
        :param with_ts:
            Whether to pass along the timestamp validity information,
            if available.
        :param with_attrs:
            Whether to pass along the signer attribute information,
            if available.
        :return:
            An instance of ``status_cls``.
        """
        kwargs = self.status_kwargs
        kwargs['validation_path'] = self.validation_path
        if self.trust_subindic_update:
            kwargs['trust_problem_indic'] = self.trust_subindic_update

        if with_ts:
            if self.signature_ts_validity:
                kwargs['timestamp_validity'] = self.signature_ts_validity
            if self.content_ts_validity:
                kwargs['content_timestamp_validity'] = self.content_ts_validity

        attr_status = self.signer_attr_status
        if with_attrs and attr_status:
            kwargs['ac_attrs'] = attr_status.ac_attrs
            kwargs['cades_signer_attrs'] = attr_status.cades_signer_attrs
            kwargs['ac_validation_errs'] = attr_status.ac_validation_errs
        return status_cls(**kwargs)


@overload
async def ades_timestamp_validation(
    tst_signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    expected_tst_imprint: bytes,
    *,
    status_cls: Type[StatusType],
    timing_info: Optional[ValidationTimingInfo] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
) -> AdESBasicValidationResult: ...


@overload
async def ades_timestamp_validation(
    tst_signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    expected_tst_imprint: bytes,
    *,
    timing_info: Optional[ValidationTimingInfo] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
) -> AdESBasicValidationResult: ...


async def ades_timestamp_validation(
    tst_signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    expected_tst_imprint: bytes,
    timing_info: Optional[ValidationTimingInfo] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
    status_cls=TimestampSignatureStatus,
) -> AdESBasicValidationResult:
    """
    Validate a timestamp token according to ETSI EN 319 102-1 § 5.4.

    The certificate validation policy used is the timestamp-specific one
    from the validation spec if there is one, and the general certificate
    validation policy otherwise.

    :param tst_signed_data:
        The ``SignedData`` value of the timestamp.
    :param validation_spec:
        Validation settings to apply.
    :param expected_tst_imprint:
        The expected message imprint in the timestamp token.
    :param timing_info:
        Data object describing the timing of the validation.
        Defaults to :meth:`.ValidationTimingInfo.now`.
    :param validation_data_handlers:
        Data handlers to manage validation data. If not supplied, they are
        bootstrapped from the validation spec and the timing info.
    :param extra_status_kwargs:
        Extra keyword arguments to pass to the signature status object's
        ``__init__`` function.
    :param status_cls:
        The class of the resulting status object in pyHanko's internal
        validation API.
    :return:
        A :class:`.AdESBasicValidationResult`.
    """

    effective_timing = timing_info or ValidationTimingInfo.now()

    # a dedicated timestamp policy takes precedence over the general one
    tsa_policy = validation_spec.ts_cert_validation_policy
    chosen_policy = (
        tsa_policy if tsa_policy else validation_spec.cert_validation_policy
    )

    handlers = validation_data_handlers
    if handlers is None:
        handlers = bootstrap_validation_data_handlers(
            spec=validation_spec, timing_info=effective_timing
        )

    tsa_validation_context = chosen_policy.build_validation_context(
        timing_info=effective_timing, handlers=handlers
    )
    result = await _ades_timestamp_validation_from_context(
        tst_signed_data,
        tsa_validation_context,
        expected_tst_imprint,
        extra_status_kwargs=extra_status_kwargs,
        status_cls=status_cls,
    )
    return result


def _ades_signature_crypto_policy_check(
    signer_info: cms.SignerInfo,
    algo_policy: AlgorithmUsagePolicy,
    control_time: datetime,
    public_key: Optional[keys.PublicKeyInfo],
):
    """
    Check a signer's signature algorithm against an algorithm usage policy.

    :param signer_info:
        The ``SignerInfo`` value whose signature algorithm will be checked.
    :param algo_policy:
        The algorithm usage policy to apply.
    :param control_time:
        The time at which the algorithm must be allowed. The error message
        describes it as the time of the earliest PoE for the signature.
    :param public_key:
        The public key to pass on to the policy, if any.
    :raises errors.SignatureValidationError:
        If the policy does not allow the algorithm at the control time.
        The AdES subindication is ``CRYPTO_CONSTRAINTS_FAILURE`` when the
        policy's verdict has no ``not_allowed_after`` date, and
        ``CRYPTO_CONSTRAINTS_FAILURE_NO_POE`` otherwise.
    """
    sig_algo: cms.SignedDigestAlgorithm = signer_info['signature_algorithm']
    sig_allowed = algo_policy.signature_algorithm_allowed(
        sig_algo, control_time, public_key=public_key
    )
    if sig_allowed:
        return

    msg = (
        f"Signature algorithm {sig_algo.signature_algo} not allowed as "
        f"of {control_time}, which is "
        f"the time of the earliest PoE for the signature."
    )
    if sig_allowed.not_allowed_after is None:
        subindic = AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE
    else:
        subindic = AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE_NO_POE
    raise errors.SignatureValidationError(msg, ades_subindication=subindic)


def _enumerate_validation_objects(
    validation_context: Optional[ValidationContext],
) -> Generator[ValidationObject, None, None]:
    """
    Yield the revocation-related validation objects in a validation context.

    For every OCSP response in the context, this yields the OCSP response
    containers loaded from it, followed by the certificates enumerated from
    it. After that, every CRL in the context is yielded.

    :param validation_context:
        The validation context to inspect; if ``None``, nothing is yielded.
    """
    if validation_context is not None:
        for ocsp_resp in validation_context.ocsps:
            yield from (
                ValidationObject(ValidationObjectType.OCSP_RESPONSE, container)
                for container in OCSPContainer.load_multi(ocsp_resp)
            )
            yield from (
                ValidationObject(ValidationObjectType.CERTIFICATE, ocsp_cert)
                for ocsp_cert in enumerate_ocsp_certs(ocsp_resp)
            )
        yield from (
            ValidationObject(ValidationObjectType.CRL, CRLContainer(crl_data))
            for crl_data in validation_context.crls
        )


def _enumerate_certs_in_paths(
    status: Union[SignatureStatus, _InternalBasicValidationResult, None],
):
    """
    Yield the certificates in the validation paths attached to a status.

    The certificates of the status object's own validation path (root
    included) come first. After that, the function recurses into the
    signature timestamp and content timestamp statuses, in that order.

    :param status:
        A status object or intermediate validation result; if ``None``,
        nothing is yielded.
    """
    if status is None:
        return

    own_path = status.validation_path
    if own_path:
        yield from (
            ValidationObject(ValidationObjectType.CERTIFICATE, path_cert)
            for path_cert in own_path.iter_certs(include_root=True)
        )

    # the two kinds of status objects use different attribute names
    # for their timestamp statuses
    nested_statuses = []
    if isinstance(status, StandardCMSSignatureStatus):
        nested_statuses.append(status.timestamp_validity)
        nested_statuses.append(status.content_timestamp_validity)
    if isinstance(status, _InternalBasicValidationResult):
        nested_statuses.append(status.signature_ts_validity)
        nested_statuses.append(status.content_ts_validity)

    for nested in nested_statuses:
        yield from _enumerate_certs_in_paths(nested)


async def _ades_timestamp_validation_from_context(
    tst_signed_data: cms.SignedData,
    validation_context: ValidationContext,
    expected_tst_imprint: bytes,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
    status_cls=TimestampSignatureStatus,
) -> AdESBasicValidationResult:
    """
    Validate a timestamp token against a pre-built validation context.

    :param tst_signed_data:
        The ``SignedData`` value of the timestamp.
    :param validation_context:
        The validation context to use.
    :param expected_tst_imprint:
        The expected message imprint in the timestamp token.
    :param extra_status_kwargs:
        Extra keyword arguments for the status class; entries produced by
        the validation itself take precedence over these.
    :param status_cls:
        The status class to instantiate.
    :return:
        A :class:`.AdESBasicValidationResult`. The subindication is
        ``HASH_FAILURE`` if the status is not intact and
        ``SIG_CRYPTO_FAILURE`` if it is not valid.
    """
    # snapshot of the context's validation objects prior to validation
    context_objects = ValidationObjectSet(
        _enumerate_validation_objects(validation_context)
    )
    status_kwargs = dict(extra_status_kwargs or {})
    validation_output = await generic_cms.validate_tst_signed_data(
        tst_signed_data,
        validation_context=validation_context,
        expected_tst_imprint=expected_tst_imprint,
    )
    status_kwargs.update(validation_output)
    # noinspection PyArgumentList
    status = status_cls(**status_kwargs)

    early_failure = None
    if not status.intact:
        early_failure = AdESFailure.HASH_FAILURE
    elif not status.valid:
        early_failure = AdESFailure.SIG_CRYPTO_FAILURE
    if early_failure is not None:
        return AdESBasicValidationResult(
            ades_subindic=early_failure,
            api_status=status,
            failure_msg=None,
            validation_objects=context_objects,
        )

    interm_result = await _process_basic_validation(
        tst_signed_data,
        status,
        validation_context,
        ac_validation_context=None,
        signature_not_before_time=None,
    )
    interm_result.status_kwargs = status_kwargs
    # add the certificates from the validation path(s) to the objects
    # collected earlier
    all_objects = ValidationObjectSet(
        iter(context_objects), _enumerate_certs_in_paths(interm_result)
    )
    final_status = interm_result.update(
        status_cls, with_ts=False, with_attrs=False
    )
    return AdESBasicValidationResult(
        ades_subindic=interm_result.ades_subindic,
        api_status=final_status,
        failure_msg=None,
        validation_objects=all_objects,
    )


async def _ades_process_attached_ts(
    signer_info, validation_context, signed: bool, tst_digest: bytes
) -> AdESBasicValidationResult:
    """
    Validate a timestamp token attached to a ``SignerInfo`` value.

    :param signer_info:
        The ``SignerInfo`` value from which to extract the token.
    :param validation_context:
        The validation context to validate the token against.
    :param signed:
        Passed on to ``generic_cms.extract_tst_data`` to select which
        timestamp to extract.
    :param tst_digest:
        The expected message imprint in the timestamp token.
    :return:
        The result of validating the token. If no token was found, the
        result has subindication ``AdESIndeterminate.GENERIC``, no status
        object and no validation objects.
    """
    tst_signed_data = generic_cms.extract_tst_data(signer_info, signed=signed)
    if tst_signed_data is None:
        # nothing to validate
        return AdESBasicValidationResult(
            ades_subindic=AdESIndeterminate.GENERIC,
            failure_msg=None,
            api_status=None,
            validation_objects=ValidationObjectSet.empty(),
        )

    ts_result = await _ades_timestamp_validation_from_context(
        tst_signed_data,
        validation_context,
        tst_digest,
    )
    return ts_result


async def _process_basic_validation(
    signed_data: cms.SignedData,
    temp_status: SignatureStatus,
    ts_validation_context: ValidationContext,
    ac_validation_context: Optional[ValidationContext],
    signature_not_before_time: Optional[datetime],
):
    """
    Post-process a provisional signature status into an intermediate
    basic validation result.

    If the trust problem indication on the provisional status is
    ``REVOKED_NO_POE`` or ``OUT_OF_BOUNDS_NO_POE``, the content timestamp
    (if any) is validated. If that succeeds, its time is used as a lower
    bound on the signing time. When that lower bound is at or past the
    revocation date (resp. the end of the signer certificate's validity
    period), the indication is changed to ``REVOKED`` (resp. ``EXPIRED``).

    Afterwards, the signer attribute status is collected.

    :param signed_data:
        The ``SignedData`` value being validated.
    :param temp_status:
        The provisional status object resulting from the initial
        validation.
    :param ts_validation_context:
        Validation context used to validate the content timestamp.
    :param ac_validation_context:
        Validation context passed on when collecting the signer attribute
        status, if any.
    :param signature_not_before_time:
        Time before which the signature is known not to have existed,
        if available.
    :return:
        A :class:`._InternalBasicValidationResult`. Its
        ``signature_poe_time`` is ``None``, and its ``status_kwargs`` only
        contain the validation time.
    """
    validation_time = temp_status.validation_time
    ades_trust_status: Optional[AdESSubIndic] = temp_status.trust_problem_indic
    signer_info = generic_cms.extract_signer_info(signed_data)
    ts_status: Optional[TimestampSignatureStatus] = None
    # only these two "no POE" outcomes can be refined using a content timestamp
    if ades_trust_status in (
        AdESIndeterminate.REVOKED_NO_POE,
        AdESIndeterminate.OUT_OF_BOUNDS_NO_POE,
    ):
        # check content timestamp
        # TODO allow selecting one of multiple here
        # FIXME here and in a few other places we presume that the message
        #  imprint algorithm agrees with the TS's algorithm. This is a fairly
        #  safe assumption, but not an airtight one.
        content_ts_result = await _ades_process_attached_ts(
            signer_info,
            ts_validation_context,
            signed=True,
            tst_digest=generic_cms.find_unique_cms_attribute(
                signer_info['signed_attrs'], 'message_digest'
            ).native,
        )
        if content_ts_result.ades_subindic == AdESPassed.OK:
            ts_status = content_ts_result.api_status

            assert ts_status is not None
            # the later of the two lower bounds is the tighter one
            if signature_not_before_time is not None:
                signature_not_before_time = max(
                    ts_status.timestamp, signature_not_before_time
                )
            else:
                signature_not_before_time = ts_status.timestamp

            # now we potentially have POE to know for sure that the signer's
            # certificate was in fact revoked/expired.
            # HOWEVER, according to the spec it is _not_ within this
            # function's remit to check the signature timestamp to reverse
            # a positive X_NO_POE judgement!!
            perm_status: AdESSubIndic
            if ades_trust_status == AdESIndeterminate.REVOKED_NO_POE:
                revo_details = temp_status.revocation_details
                assert revo_details is not None
                cutoff = revo_details.revocation_date
                perm_status = AdESFailure.REVOKED
            else:
                cutoff = temp_status.signing_cert.not_valid_after
                perm_status = AdESIndeterminate.EXPIRED

            # help the typechecker
            assert signature_not_before_time is not None
            # the signature cannot predate the cutoff, so the provisional
            # judgement becomes definitive
            if signature_not_before_time >= cutoff:
                ades_trust_status = perm_status

    # TODO process signature policy attr once we can

    cert_info = generic_cms.extract_certificate_info(signed_data)

    # FIXME this is not entirely correct, we need past validation
    #  for attr certs as well
    attr_status_kwargs = await generic_cms.collect_signer_attr_status(
        sd_attr_certificates=cert_info.attribute_certs,
        signer_cert=cert_info.signer_cert,
        validation_context=ac_validation_context,
        sd_signed_attrs=signer_info['signed_attrs'],
    )
    # no trust problem indication means that validation passed
    ades_subindic = ades_trust_status or AdESPassed.OK
    return _InternalBasicValidationResult(
        ades_subindic=ades_subindic,
        trust_subindic_update=ades_trust_status,
        content_ts_validity=ts_status,
        signature_not_before_time=signature_not_before_time,
        signer_attr_status=SignerAttributeStatus(**attr_status_kwargs),
        signature_poe_time=None,
        validation_path=temp_status.validation_path,
        status_kwargs={'validation_time': validation_time},
    )


def _init_vcs(
    validation_spec: SignatureValidationSpec,
    timing_info: ValidationTimingInfo,
    validation_data_handlers: ValidationDataHandlers,
):
    """
    Set up the validation contexts for a validation run.

    :param validation_spec:
        Validation settings from which the policies are taken.
    :param timing_info:
        Timing info used to build every context.
    :param validation_data_handlers:
        Data handlers used to build every context.
    :return:
        A triple consisting of the main validation context, the timestamp
        validation context and the attribute certificate validation context.
        The timestamp context is the main context itself if the spec has no
        timestamp-specific policy; the attribute certificate context is
        ``None`` if the spec has no attribute certificate policy.
    """

    def _context_for(policy):
        return policy.build_validation_context(
            timing_info=timing_info, handlers=validation_data_handlers
        )

    main_vc = _context_for(validation_spec.cert_validation_policy)

    ts_policy = validation_spec.ts_cert_validation_policy
    ts_vc = main_vc if ts_policy is None else _context_for(ts_policy)

    ac_policy = validation_spec.ac_validation_policy
    ac_vc = None if ac_policy is None else _context_for(ac_policy)

    return main_vc, ts_vc, ac_vc


# ETSI EN 319 102-1 § 5.3


@overload
async def ades_basic_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    *,
    status_cls: Type[StatusType],
    timing_info: Optional[ValidationTimingInfo] = None,
    raw_digest: Optional[bytes] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    signature_not_before_time: Optional[datetime] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
) -> AdESBasicValidationResult: ...


@overload
async def ades_basic_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    *,
    timing_info: Optional[ValidationTimingInfo] = None,
    raw_digest: Optional[bytes] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    signature_not_before_time: Optional[datetime] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
) -> AdESBasicValidationResult: ...


async def ades_basic_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    timing_info: Optional[ValidationTimingInfo] = None,
    raw_digest: Optional[bytes] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    signature_not_before_time: Optional[datetime] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
    status_cls=StandardCMSSignatureStatus,
) -> AdESBasicValidationResult:
    """
    Validate a CMS signature according to ETSI EN 319 102-1 § 5.3.

    :param signed_data:
        The ``SignedData`` value.
    :param validation_spec:
        Validation settings to apply.
    :param raw_digest:
        The expected message digest attribute value.
    :param timing_info:
        Data object describing the timing of the validation.
        Defaults to :meth:`.ValidationTimingInfo.now`.
    :param validation_data_handlers:
        Data handlers to manage validation data. If not supplied, they are
        bootstrapped from the validation spec and the timing info.
    :param extra_status_kwargs:
        Extra keyword arguments to pass to the signature status object's
        ``__init__`` function.
    :param status_cls:
        The class of the resulting status object in pyHanko's internal
        validation API. This class is used both for the provisional status
        and for the status object in the final result.
    :param signature_not_before_time:
        Time when the signature was known _not_ to exist.
    :return:
        A :class:`.AdESBasicValidationResult`.
    """

    timing_info = timing_info or ValidationTimingInfo.now()
    if validation_data_handlers is None:
        validation_data_handlers = bootstrap_validation_data_handlers(
            spec=validation_spec, timing_info=timing_info
        )
    (
        validation_context,
        ts_validation_context,
        ac_validation_context,
    ) = _init_vcs(validation_spec, timing_info, validation_data_handlers)

    interm_result = await _ades_basic_validation(
        signed_data=signed_data,
        validation_context=validation_context,
        ts_validation_context=ts_validation_context,
        ac_validation_context=ac_validation_context,
        key_usage_settings=validation_spec.key_usage_settings,
        raw_digest=raw_digest,
        signature_not_before_time=signature_not_before_time,
        extra_status_kwargs=extra_status_kwargs,
        status_cls=status_cls,
        algorithm_policy=validation_spec.signature_algorithm_policy,
    )
    # validation already ended in a definitive result; pass it through
    if isinstance(interm_result, AdESBasicValidationResult):
        return interm_result

    # Build the final status object with the class requested by the caller.
    # Hard-coding StandardCMSSignatureStatus here would ignore ``status_cls``
    # and could choke on ``extra_status_kwargs`` meant for a subclass.
    status = interm_result.update(status_cls, with_ts=False, with_attrs=True)
    vos = ValidationObjectSet(
        _enumerate_validation_objects(validation_context),
        _enumerate_validation_objects(ac_validation_context),
        _enumerate_validation_objects(ts_validation_context),
        _enumerate_certs_in_paths(status),
    )

    return AdESBasicValidationResult(
        ades_subindic=interm_result.ades_subindic,
        api_status=status,
        failure_msg=None,
        validation_objects=vos,
    )


async def _ades_basic_validation(
    signed_data: cms.SignedData,
    validation_context: ValidationContext,
    ts_validation_context: ValidationContext,
    ac_validation_context: Optional[ValidationContext],
    key_usage_settings: KeyUsageConstraints,
    raw_digest: Optional[bytes],
    signature_not_before_time: Optional[datetime],
    extra_status_kwargs: Optional[Dict[str, Any]],
    algorithm_policy: Optional[CMSAlgorithmUsagePolicy],
    status_cls: Type[StatusType],
) -> Union[AdESBasicValidationResult, _InternalBasicValidationResult]:
    """
    Run the basic validation of a CMS signature against pre-built
    validation contexts.

    :param signed_data:
        The ``SignedData`` value.
    :param validation_context:
        Validation context for the signer's certificate.
    :param ts_validation_context:
        Validation context for timestamps.
    :param ac_validation_context:
        Validation context for attribute certificates, if any.
    :param key_usage_settings:
        Key usage constraints passed on to the CMS validation routine.
    :param raw_digest:
        The expected message digest attribute value.
    :param signature_not_before_time:
        Time when the signature was known _not_ to exist.
    :param extra_status_kwargs:
        Extra keyword arguments for the status class; entries produced by
        the validation itself take precedence over these.
    :param algorithm_policy:
        Algorithm usage policy passed on to the CMS validation routine.
    :param status_cls:
        The class used for the provisional status object.
    :return:
        A :class:`.AdESBasicValidationResult` if validation raised a
        ``SignatureValidationError`` or if the signature turned out not to
        be intact or not valid; otherwise an intermediate result for
        further processing.
    """
    status_kwargs = dict(extra_status_kwargs or {})
    # snapshot of the contexts' validation objects prior to validation
    context_objects = ValidationObjectSet(
        _enumerate_validation_objects(validation_context),
        _enumerate_validation_objects(ts_validation_context),
        _enumerate_validation_objects(ac_validation_context),
    )
    try:
        validation_output = await generic_cms.cms_basic_validation(
            signed_data,
            raw_digest=raw_digest,
            validation_context=validation_context,
            key_usage_settings=key_usage_settings,
            algorithm_policy=algorithm_policy,
        )
    except errors.SignatureValidationError as e:
        return AdESBasicValidationResult(
            ades_subindic=e.ades_subindication or AdESIndeterminate.GENERIC,
            failure_msg=e.failure_message,
            api_status=None,
            validation_objects=context_objects,
        )
    else:
        status_kwargs.update(validation_output)

    # put the temp status into a SignatureStatus object for convenience
    status: SignatureStatus = status_cls(**status_kwargs)

    early_failure = None
    if not status.intact:
        early_failure = AdESFailure.HASH_FAILURE
    elif not status.valid:
        early_failure = AdESFailure.SIG_CRYPTO_FAILURE
    if early_failure is not None:
        return AdESBasicValidationResult(
            ades_subindic=early_failure,
            api_status=status,
            failure_msg=None,
            validation_objects=context_objects,
        )

    interm_result = await _process_basic_validation(
        signed_data,
        status,
        ts_validation_context,
        ac_validation_context=ac_validation_context,
        signature_not_before_time=signature_not_before_time,
    )
    # hand over the full set of status arguments for later use
    interm_result.status_kwargs = status_kwargs
    return interm_result


@dataclass(frozen=True)
class AdESWithTimeValidationResult(AdESBasicValidationResult):
    """
    Extension of :class:`.AdESBasicValidationResult` that additionally
    carries timing information about the signature.
    """

    # NOTE(review): presumably the "best-signature-time" in the sense of
    # ETSI EN 319 102-1 -- confirm against the code that populates it.
    best_signature_time: datetime
    # NOTE(review): presumably the time before which the signature is known
    # not to have existed, if available -- confirm.
    signature_not_before_time: Optional[datetime]


# Immutable set of AdES (sub)indications.
# NOTE(review): judging by the name, these are the basic validation outcomes
# after which validation with time carries on processing; the code using this
# constant is not in view here -- confirm.
_WITH_TIME_FURTHER_PROC = frozenset(
    {
        AdESPassed.OK,
        AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE_NO_POE,
        AdESIndeterminate.REVOKED_NO_POE,
        AdESIndeterminate.REVOKED_CA_NO_POE,
        AdESIndeterminate.TRY_LATER,
        AdESIndeterminate.OUT_OF_BOUNDS_NO_POE,
    }
)


@overload
async def ades_with_time_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    *,
    timing_info: Optional[ValidationTimingInfo] = None,
    raw_digest: Optional[bytes] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    signature_not_before_time: Optional[datetime] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
) -> AdESWithTimeValidationResult: ...


@overload
async def ades_with_time_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    *,
    status_cls: Type[StatusType],
    timing_info: Optional[ValidationTimingInfo] = None,
    raw_digest: Optional[bytes] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    signature_not_before_time: Optional[datetime] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
) -> AdESWithTimeValidationResult: ...


async def ades_with_time_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    timing_info: Optional[ValidationTimingInfo] = None,
    raw_digest: Optional[bytes] = None,
    validation_data_handlers: Optional[ValidationDataHandlers] = None,
    signature_not_before_time: Optional[datetime] = None,
    extra_status_kwargs: Optional[Dict[str, Any]] = None,
    status_cls=StandardCMSSignatureStatus,
) -> AdESWithTimeValidationResult:
    """
    Validate a CMS signature with time according to ETSI EN 319 102-1 § 5.5.

    :param signed_data:
        The ``SignedData`` value.
    :param validation_spec:
        Validation settings to apply.
    :param timing_info:
        Data object describing the timing of the validation.
        Defaults to :meth:`.ValidationTimingInfo.now`.
    :param raw_digest:
        The expected message digest attribute value.
    :param validation_data_handlers:
        Data handlers to manage validation data.
    :param signature_not_before_time:
        Time when the signature was known _not_ to exist.
    :param extra_status_kwargs:
        Extra keyword arguments to pass to the signature status object's
        ``__init__`` function.
    :param status_cls:
        The class of the resulting status object in pyHanko's internal
        validation API.
    :return:
        A :class:`.AdESWithTimeValidationResult`.
    """

    timing_info = timing_info or ValidationTimingInfo.now()
    if validation_data_handlers is None:
        validation_data_handlers = bootstrap_validation_data_handlers(
            spec=validation_spec, timing_info=timing_info
        )

    (
        validation_context,
        ts_validation_context,
        ac_validation_context,
    ) = _init_vcs(validation_spec, timing_info, validation_data_handlers)

    def _collect_vos(*path_sources):
        # Validation objects from the three validation contexts, followed by
        # the certificates in the paths of the given status/result objects.
        return ValidationObjectSet(
            _enumerate_validation_objects(validation_context),
            _enumerate_validation_objects(ts_validation_context),
            _enumerate_validation_objects(ac_validation_context),
            *(_enumerate_certs_in_paths(src) for src in path_sources),
        )

    # POE time on record for the signature value before any signature
    # timestamp is taken into account
    sig_bytes = signed_data['signer_infos'][0]['signature'].native
    signature_poe_time = validation_data_handlers.poe_manager[sig_bytes]

    interm_result = await _ades_basic_validation(
        signed_data,
        validation_context=validation_context,
        ts_validation_context=ts_validation_context,
        ac_validation_context=ac_validation_context,
        key_usage_settings=validation_spec.key_usage_settings,
        raw_digest=raw_digest,
        signature_not_before_time=signature_not_before_time,
        extra_status_kwargs=extra_status_kwargs,
        status_cls=status_cls,
        algorithm_policy=validation_spec.signature_algorithm_policy,
    )

    if isinstance(interm_result, AdESBasicValidationResult):
        # basic validation already produced a final result: pass it on
        vos = _collect_vos(interm_result.api_status)
        return AdESWithTimeValidationResult(
            ades_subindic=interm_result.ades_subindic,
            api_status=interm_result.api_status,
            failure_msg=interm_result.failure_msg,
            best_signature_time=signature_poe_time,
            signature_not_before_time=signature_not_before_time,
            validation_objects=vos,
        )
    elif interm_result.ades_subindic not in _WITH_TIME_FURTHER_PROC:
        # sub-indication that timestamp processing cannot improve upon
        assert isinstance(interm_result, _InternalBasicValidationResult)
        signature_not_before_time = interm_result.signature_not_before_time
        api_status = interm_result.update(
            status_cls, with_ts=True, with_attrs=True
        )
        vos = _collect_vos(api_status)
        return AdESWithTimeValidationResult(
            ades_subindic=interm_result.ades_subindic,
            api_status=api_status,
            failure_msg=None,
            best_signature_time=signature_poe_time,
            signature_not_before_time=signature_not_before_time,
            validation_objects=vos,
        )

    signer_info = generic_cms.extract_signer_info(signed_data)
    temp_status = interm_result.update(
        status_cls, with_ts=False, with_attrs=True
    )

    # process signature timestamps
    # TODO allow selecting one of multiple timestamps here?
    tst_digest = generic_cms.compute_signature_tst_digest(signer_info)
    if tst_digest is None:
        # TODO conditionally enforce this based on policy params---
        #  for now we assume that someone calling this method actually cares
        #  about timestamps
        vos = _collect_vos(interm_result)
        return AdESWithTimeValidationResult(
            ades_subindic=AdESIndeterminate.SIG_CONSTRAINTS_FAILURE,
            api_status=temp_status,
            failure_msg="No signature timestamp present",
            best_signature_time=timing_info.best_signature_time,
            signature_not_before_time=signature_not_before_time,
            validation_objects=vos,
        )
    # The signature timestamp is a timestamp token, so it is validated
    # against the timestamp validation context rather than the signer's.
    sig_ts_result = await _ades_process_attached_ts(
        signer_info, ts_validation_context, signed=False, tst_digest=tst_digest
    )
    vos = _collect_vos(interm_result, sig_ts_result.api_status)
    if sig_ts_result.ades_subindic != AdESPassed.OK:
        return AdESWithTimeValidationResult(
            ades_subindic=sig_ts_result.ades_subindic,
            api_status=temp_status,
            failure_msg=None,
            best_signature_time=signature_poe_time,
            signature_not_before_time=signature_not_before_time,
            validation_objects=vos,
        )

    ts_status = sig_ts_result.api_status

    # note: we should always have a non-null status if the validation passes,
    assert isinstance(ts_status, TimestampSignatureStatus)

    # the earliest of the timestamp time and the pre-existing POE wins
    if signature_poe_time is not None:
        signature_poe_time = min(ts_status.timestamp, signature_poe_time)
    else:
        signature_poe_time = ts_status.timestamp
    interm_result.signature_ts_validity = ts_status
    interm_result.signature_poe_time = signature_poe_time

    if interm_result.ades_subindic == AdESIndeterminate.REVOKED_NO_POE:
        revo_details: RevocationDetails = temp_status.revocation_details
        if signature_poe_time >= revo_details.revocation_date:
            # nothing we can do
            return AdESWithTimeValidationResult(
                ades_subindic=interm_result.ades_subindic,
                api_status=temp_status,
                failure_msg=None,
                best_signature_time=signature_poe_time,
                signature_not_before_time=signature_not_before_time,
                validation_objects=vos,
            )
    elif interm_result.ades_subindic == AdESIndeterminate.OUT_OF_BOUNDS_NO_POE:
        # NOTE: we can't process expiration here since we don't have access
        #  to _timestamped_ revocation information
        if signature_poe_time < temp_status.signing_cert.not_valid_before:
            # A signature predating the validity period of the signing cert
            # is a definite failure (same classification as in the past
            # signature validation routine).
            # FIXME replace temp_status as well
            return AdESWithTimeValidationResult(
                ades_subindic=AdESFailure.NOT_YET_VALID,
                api_status=temp_status,
                failure_msg=None,
                best_signature_time=signature_poe_time,
                signature_not_before_time=signature_not_before_time,
                validation_objects=vos,
            )
    elif (
        interm_result.ades_subindic
        == AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE_NO_POE
        or interm_result.ades_subindic == AdESIndeterminate.TRY_LATER
    ):
        # in this case error_time_horizon is set to the point where the
        # constraint was triggered
        if signature_poe_time >= temp_status.error_time_horizon:
            return AdESWithTimeValidationResult(
                ades_subindic=interm_result.ades_subindic,
                api_status=temp_status,
                failure_msg=None,
                best_signature_time=signature_poe_time,
                signature_not_before_time=signature_not_before_time,
                validation_objects=vos,
            )

    # TODO TSTInfo ordering/comparison check
    if (
        signature_not_before_time is not None
        and signature_not_before_time > signature_poe_time
    ):
        return AdESWithTimeValidationResult(
            ades_subindic=AdESIndeterminate.TIMESTAMP_ORDER_FAILURE,
            api_status=temp_status,
            failure_msg=None,
            best_signature_time=signature_poe_time,
            signature_not_before_time=signature_not_before_time,
            validation_objects=vos,
        )
    # TODO handle time-stamp delay
    # the timestamp resolved the issue, so clear the recorded trust problem
    interm_result.trust_subindic_update = None
    interm_result.status_kwargs['trust_problem_indic'] = None

    status = interm_result.update(status_cls, with_ts=True, with_attrs=True)
    return AdESWithTimeValidationResult(
        ades_subindic=AdESPassed.OK,
        api_status=status,
        failure_msg=None,
        best_signature_time=signature_poe_time,
        signature_not_before_time=signature_not_before_time,
        validation_objects=vos,
    )


class _TrustNoOne(TrustManager):
    """
    Trust manager that does not trust any certificate.

    Used when looking up candidate issuers in a setting where trust is
    not relevant.
    """

    def is_root(self, cert: x509.Certificate) -> bool:
        # no certificate is ever treated as a root
        return False

    def find_potential_issuers(
        self, cert: x509.Certificate
    ) -> Iterator[TrustAnchor]:
        # never proposes a trust anchor as an issuer
        return iter(())


def _crl_issuer_cert_poe_boundary(
    crl: CRLOfInterest, cutoff: datetime, poe_manager: POEManager
):
    """
    Check whether the POE time for the leaf of at least one of the CRL's
    provenance paths is no later than ``cutoff``.

    :param crl:
        The CRL under consideration.
    :param cutoff:
        The latest acceptable POE time.
    :param poe_manager:
        Source of POE times, indexed by the leaf of each provenance path.
    :return:
        ``True`` if any provenance path qualifies, ``False`` otherwise
        (including when there are no provenance paths).
    """
    for provenance in crl.prov_paths:
        if poe_manager[provenance.path.leaf] <= cutoff:
            return True
    return False


def _ocsp_issuer_cert_poe_boundary(
    ocsp: OCSPResponseOfInterest, cutoff: datetime, poe_manager: POEManager
):
    """
    Check whether the POE time for the leaf of the OCSP response's
    provenance path is no later than ``cutoff``.

    :param ocsp:
        The OCSP response under consideration.
    :param cutoff:
        The latest acceptable POE time.
    :param poe_manager:
        Source of POE times, indexed by the leaf of the provenance path.
    :return:
        ``True`` if the POE time does not exceed the cutoff.
    """
    leaf_poe_time = poe_manager[ocsp.prov_path.leaf]
    return leaf_poe_time <= cutoff


async def _find_revinfo_data_for_leaf_in_past(
    cert: x509.Certificate,
    validation_data_handlers: ValidationDataHandlers,
    control_time: datetime,
    revocation_checking_rule: RevocationCheckingRule,
):
    """
    Gather revocation information for ``cert`` that is usable for past
    validation.

    We need a piece of revinfo for the signing cert for which we have POE
    for the issuer cert dated before the expiration date of the cert.
    (The standard is unclear as to which cert this refers to, but it's
    probably the date on the signing cert. Not much point in requiring
    PoE for a cert before its self-declared expiration date...)

    Since the revinfo gathering logic is based on paths, all candidate
    issuers are collected and turned into "truncated" candidate paths.
    Trust is not an issue at this stage.

    :param cert:
        The certificate to find revocation information for.
    :param validation_data_handlers:
        Handlers supplying the certificate registry, revinfo manager
        and POE manager.
    :param control_time:
        Control time passed on to the revinfo gathering routine.
    :param revocation_checking_rule:
        Revocation checking rule passed on to the revinfo gathering routine.
    :return:
        A tuple of two lists: the usable CRLs and the usable OCSP responses.
    """
    poe_manager = validation_data_handlers.poe_manager

    def _gather_via(issuer: x509.Certificate):
        # two-element path: candidate issuer as the anchor, cert as the leaf
        return ades_gather_prima_facie_revinfo(
            path=ValidationPath(
                trust_anchor=CertTrustAnchor(issuer), interm=[], leaf=cert
            ),
            revinfo_manager=validation_data_handlers.revinfo_manager,
            control_time=control_time,
            revocation_checking_rule=revocation_checking_rule,
        )

    issuers = validation_data_handlers.cert_registry.find_potential_issuers(
        cert=cert, trust_manager=_TrustNoOne()
    )
    pending_jobs = asyncio.as_completed(
        [_gather_via(issuer) for issuer in issuers]
    )

    usable_crls: List[CRLOfInterest] = []
    usable_ocsps: List[OCSPResponseOfInterest] = []
    eviction_digests: Set[bytes] = set()
    for next_done in pending_jobs:
        found_crls, found_ocsps = await next_done
        # Keep the revinfos for which we have POE for the issuer cert
        # predating the expiration of the signer cert; the others are
        # earmarked for eviction.
        for crl_candidate in found_crls:
            if _crl_issuer_cert_poe_boundary(
                crl_candidate, cert.not_valid_after, poe_manager
            ):
                usable_crls.append(crl_candidate)
                continue
            eviction_digests.add(
                digest_for_poe(crl_candidate.crl.crl_data.dump())
            )

        for ocsp_candidate in found_ocsps:
            if _ocsp_issuer_cert_poe_boundary(
                ocsp_candidate, cert.not_valid_after, poe_manager
            ):
                usable_ocsps.append(ocsp_candidate)
                continue
            eviction_digests.add(
                digest_for_poe(
                    ocsp_candidate.ocsp_response.ocsp_response_data.dump()
                )
            )

    # we only run the eviction logic if we found at least one piece of revinfo
    # that we can actually use (that's what the spec says, shouldn't change
    # validation result, but the reported error probably makes more sense)
    if usable_crls or usable_ocsps:
        revinfo_manager = validation_data_handlers.revinfo_manager
        revinfo_manager.evict_crls(eviction_digests)
        validation_data_handlers.revinfo_manager.evict_ocsps(eviction_digests)
    return usable_crls, usable_ocsps


async def _build_and_past_validate_cert(
    cert: x509.Certificate,
    validation_policy_spec: CertValidationPolicySpec,
    validation_data_handlers: ValidationDataHandlers,
) -> Tuple[ValidationPath, datetime]:
    """
    Build candidate paths for ``cert`` and run past validation on them,
    stopping at the first candidate that validates.

    :param cert:
        The certificate to build paths for.
    :param validation_policy_spec:
        Policy supplying the trust manager and past validation settings.
    :param validation_data_handlers:
        Handlers supplying the certificate registry and validation data.
    :return:
        The first path that passed, together with the time value reported
        by the past validation routine for it.
    :raise errors.SignatureValidationError:
        If no candidate path passes. The sub-indication is that of the last
        candidate tried, or ``NO_CERTIFICATE_CHAIN_FOUND`` if no candidate
        produced one.
    """
    builder = PathBuilder(
        trust_manager=validation_policy_spec.trust_manager,
        registry=validation_data_handlers.cert_registry,
    )

    last_subindication = None
    candidate_paths = builder.async_build_paths_lazy(cert)
    try:
        async for candidate in candidate_paths:
            outcome: generic_cms.CertvalidatorOperationResult[datetime]
            outcome = await generic_cms.handle_certvalidator_errors(
                past_validate(
                    path=candidate,
                    validation_policy_spec=validation_policy_spec,
                    validation_data_handlers=validation_data_handlers,
                    init_control_time=None,
                )
            )
            last_subindication = outcome.error_subindic
            validated_at = outcome.success_result
            if last_subindication is not None:
                # this candidate failed, move on to the next one
                continue
            assert validated_at is not None
            return candidate, validated_at
    finally:
        # always shut down the lazy path builder, also on early return
        await candidate_paths.cancel()

    msg = "Unable to construct plausible past validation path"
    if last_subindication is None:
        raise errors.SignatureValidationError(
            failure_message=f"{msg}: no prima facie paths constructed",
            ades_subindication=AdESIndeterminate.NO_CERTIFICATE_CHAIN_FOUND,
        )
    raise errors.SignatureValidationError(
        failure_message=msg, ades_subindication=last_subindication
    )


async def _ades_past_signature_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    poe_manager: POEManager,
    current_time_sub_indic: Optional[AdESIndeterminate],
    init_control_time: datetime,
    is_timestamp: bool,
) -> ValidationPath:
    """
    Run the past signature validation logic and return the validated path.

    :param signed_data:
        The ``SignedData`` value.
    :param validation_spec:
        Validation settings to apply.
    :param poe_manager:
        The POE manager from which to source existence proofs.
    :param current_time_sub_indic:
        The AdES subindication from validating the signature
        at the current time.
    :param init_control_time:
        Initial value for the control time parameter used when gathering
        revocation information for the signer's certificate.
    :param is_timestamp:
        Whether the signed data is a timestamp token; if so, the timestamp
        certificate validation policy is preferred when one is configured.
    :return:
        The certification path that passed past validation.
    :raise errors.SignatureValidationError:
        If the signer's certificate is missing, if the POE for the
        revocation information is insufficient, if the signature predates
        the certificate's validity period, or if past validation does not
        improve upon the current time result.
    """
    validation_data_handlers = bootstrap_validation_data_handlers(
        validation_spec, is_historical=True, poe_manager_override=poe_manager
    )

    signature_bytes = signed_data['signer_infos'][0]['signature'].native
    best_signature_time = poe_manager[signature_bytes]

    try:
        cert_info = extract_certificate_info(signed_data)
        cert = cert_info.signer_cert
        validation_data_handlers.cert_registry.register_multiple(
            cert_info.other_certs
        )
    except CMSExtractionError:
        raise errors.SignatureValidationError(
            'signer certificate not included in signature',
            ades_subindication=AdESIndeterminate.NO_SIGNING_CERTIFICATE_FOUND,
        )

    if is_timestamp:
        cert_validation_policy = (
            validation_spec.ts_cert_validation_policy
            or validation_spec.cert_validation_policy
        )
    else:
        cert_validation_policy = validation_spec.cert_validation_policy
    leaf_crls, leaf_ocsps = await _find_revinfo_data_for_leaf_in_past(
        cert,
        validation_data_handlers,
        control_time=init_control_time,
        revocation_checking_rule=(
            cert_validation_policy.revinfo_policy.revocation_checking_policy.ee_certificate_rule
        ),
    )

    # Key usage for the signer is not something that varies over time, so
    # we delegate that to the caller. This is justified both because it's
    # technically simpler, and because the past signature validation block
    # in AdES is predicated on delegating the basic integrity checks anyhow.
    cert_path, validation_time = await _build_and_past_validate_cert(
        cert,
        validation_policy_spec=cert_validation_policy,
        validation_data_handlers=validation_data_handlers,
    )

    # TODO revisit this once I have a clearer understanding of why this PoE
    #  issuance check is only applied to the EE cert.
    def _pass_contingent_on_revinfo_issuance_poe():
        # Passing requires at least one usable piece of revocation
        # information for the signer's certificate.
        if not bool(leaf_crls or leaf_ocsps):
            status = AdESIndeterminate.REVOCATION_OUT_OF_BOUNDS_NO_POE
            # NOTE: the message must be a plain string; a trailing comma
            # inside the parentheses would turn it into a tuple.
            raise errors.SignatureValidationError(
                failure_message=(
                    "POE for signature available, but could not obtain "
                    "sufficient POE for the issuance of the "
                    "revocation information"
                ),
                ades_subindication=status,
            )

    if best_signature_time <= validation_time:
        # TODO raise an issue with ESI about TRY_LATER here
        if (
            current_time_sub_indic == AdESIndeterminate.REVOKED_NO_POE
            or current_time_sub_indic == AdESIndeterminate.TRY_LATER
        ):
            _pass_contingent_on_revinfo_issuance_poe()
            return cert_path
        elif current_time_sub_indic in (
            AdESIndeterminate.REVOKED_CA_NO_POE,
            AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE_NO_POE,
        ):
            # This is an automatic pass given that certvalidator checks
            # these conditions for us as part of past_validate(...)
            return cert_path
        elif current_time_sub_indic in (
            AdESIndeterminate.OUT_OF_BOUNDS_NO_POE,
            AdESIndeterminate.OUT_OF_BOUNDS_NOT_REVOKED,
        ):
            if best_signature_time < cert.not_valid_before:
                raise errors.SignatureValidationError(
                    failure_message="Signature predates cert validity period",
                    ades_subindication=AdESFailure.NOT_YET_VALID,
                )
            elif best_signature_time <= cert.not_valid_after:
                _pass_contingent_on_revinfo_issuance_poe()
                return cert_path

    # TODO here, it would help to preserve more than the sub-indication
    #  from before
    # This is a generic validation failure carrying the original
    # sub-indication, so it is raised as a plain SignatureValidationError
    # like every other failure in this routine.
    raise errors.SignatureValidationError(
        failure_message=(
            "Past signature validation did not manage "
            "to improve current time result."
        ),
        ades_subindication=current_time_sub_indic,
    )


async def ades_past_signature_validation(
    signed_data: cms.SignedData,
    validation_spec: SignatureValidationSpec,
    poe_manager: POEManager,
    current_time_sub_indic: Optional[AdESIndeterminate],
    init_control_time: Optional[datetime] = None,
) -> AdESSubIndic:
    """
    Perform past validation of a CMS signature as described in
    ETSI EN 319 102-1 § 5.6.2.4.

    This is internal API.

    .. danger::
        "Past validation" is meant here strictly in the narrow technical
        sense in which AdES uses the term. This routine must _never_ be
        relied upon as a standalone validation routine.

    :param signed_data:
        The ``SignedData`` value.
    :param validation_spec:
        Validation settings to apply.
    :param poe_manager:
        The POE manager from which to source existence proofs.
    :param current_time_sub_indic:
        The AdES subindication obtained by validating the signature
        at the current time with the relevant settings.
    :param init_control_time:
        Initial value for the control time parameter. If not supplied,
        the current time in the local time zone is used.
    :return:
        The AdES subindication that results from the past validation
        process. Validation errors are logged as warnings and translated
        into their subindication.
    """

    content_type = signed_data['encap_content_info']['content_type'].native
    control_time = (
        init_control_time
        if init_control_time is not None
        else datetime.now(tz=tzlocal.get_localzone())
    )
    try:
        await _ades_past_signature_validation(
            signed_data=signed_data,
            validation_spec=validation_spec,
            poe_manager=poe_manager,
            current_time_sub_indic=current_time_sub_indic,
            init_control_time=control_time,
            # timestamp tokens encapsulate a TSTInfo structure
            is_timestamp=(content_type == 'tst_info'),
        )
    except errors.SignatureValidationError as validation_err:
        logger.warning(validation_err)
        return validation_err.ades_subindication or AdESIndeterminate.GENERIC
    except PathError as path_err:
        logger.warning(path_err)
        return AdESIndeterminate.CERTIFICATE_CHAIN_GENERAL_FAILURE
    return AdESPassed.OK


@dataclass(frozen=True)
class _PrimaFaciePOEItem:
    """
    A validation object together with the digest under which proof of
    existence for it would be registered.
    """

    # Digest identifying the object for POE purposes; the helpers in this
    # module compute it using digest_for_poe(...).
    digest: bytes
    # The validation object that the digest refers to.
    validation_object: ValidationObject


@dataclass(frozen=True)
class _PrimaFaciePOEFromTimeStamp:
    """
    Proof-of-existence claims derived from a single timestamp, before the
    timestamp itself has been validated.
    """

    # PDF revision that the timestamp was found in
    pdf_revision: int
    # time asserted by the timestamp token
    timestamp_dt: datetime
    # objects for which the timestamp would provide POE
    poes_implied: FrozenSet[_PrimaFaciePOEItem]
    # the timestamp token's SignedData value
    timestamp_token_signed_data: cms.SignedData
    # document digest that the timestamp is expected to cover
    doc_digest: bytes
    # include info from difference analysis as part of the status kwargs
    forensic_info: dict

    def add_to_poe_manager(self, manager: POEManager):
        """
        Register a POE entry dated at this timestamp's time for each of the
        implied items.

        :param manager:
            The POE manager to register the entries with.
        """
        poe_time = self.timestamp_dt
        for item in self.poes_implied:
            known_poe = KnownPOE(
                poe_type=POEType.VALIDATION,
                digest=item.digest,
                poe_time=poe_time,
                validation_object=item.validation_object,
            )
            manager.register_known_poe(known_poe)


def _extract_cert_digests_from_signed_data(
    sd: cms.SignedData,
) -> Generator[_PrimaFaciePOEItem, None, None]:
    """
    Yield POE items for the supported certificates embedded in
    a ``SignedData`` value.

    :param sd:
        The ``SignedData`` value whose ``certificates`` field is read.
    :return:
        A generator producing one item per supported certificate entry.
    """
    supported_types = {
        'certificate': ValidationObjectType.CERTIFICATE,
        # There is no separate type for an attribute cert in
        # ETSI TS 119 102-2, so we mark it as OTHER.
        'v2_attr_cert': ValidationObjectType.OTHER,
    }
    choice: cms.CertificateChoices
    for choice in sd['certificates']:
        chosen = choice.chosen
        encoded = chosen.dump()
        vo_type = supported_types.get(choice.name)
        if vo_type is None:
            # skip over unsupported certificate types
            # since we don't want to give the impression
            # in the validation report that we processed them.
            # TODO write test to verify that these don't end up in the report
            continue
        yield _PrimaFaciePOEItem(
            digest=digest_for_poe(encoded),
            validation_object=ValidationObject(
                object_type=vo_type, value=chosen
            ),
        )


def _get_tst_timestamp(sd: cms.SignedData) -> datetime:
    """
    Read the ``gen_time`` value from the parsed encapsulated content of
    a timestamp token.

    :param sd:
        The ``SignedData`` value of the timestamp token.
    :return:
        The native value of the ``gen_time`` field.
    """
    encapsulated = sd['encap_content_info']['content']
    gen_time = encapsulated.parsed['gen_time']
    return gen_time.native


def _read_validation_objects_from_revinfo_archival(
    revinfo_archival: asn1_pdf.RevocationInfoArchival,
) -> Generator[_PrimaFaciePOEItem, None, None]:
    """
    Yield POE items for the CRLs and OCSP responses in a revocation info
    archival attribute value (CRLs first, then OCSP responses).

    :param revinfo_archival:
        The attribute value to read from.
    :return:
        A generator producing one item per CRL and per OCSP response.
    """
    # (field name, validation object type, container class)
    sources = (
        ('crl', ValidationObjectType.CRL, CRLContainer),
        ('ocsp', ValidationObjectType.OCSP_RESPONSE, OCSPContainer),
    )
    for field_name, vo_type, container_cls in sources:
        for entry in revinfo_archival[field_name]:
            yield _PrimaFaciePOEItem(
                digest=digest_for_poe(entry.dump()),
                validation_object=ValidationObject(
                    object_type=vo_type,
                    value=container_cls(entry),
                ),
            )


def _read_validation_objects_from_dss(
    dss: DocumentSecurityStore,
) -> Generator[_PrimaFaciePOEItem, None, None]:
    """
    Yield POE items for everything stored in a document security store:
    CRLs first, then OCSP responses, then certificates.

    :param dss:
        The document security store to read from.
    :return:
        A generator producing one item per stored object.
    """

    def _as_item(obj_ref, vo_type, parse):
        # the digest is taken over the raw stream data, which is also
        # what gets parsed into the validation object's value
        raw = obj_ref.get_object().data
        return _PrimaFaciePOEItem(
            digest=digest_for_poe(raw),
            validation_object=ValidationObject(
                object_type=vo_type,
                value=parse(raw),
            ),
        )

    for crl_ref in dss.crls:
        yield _as_item(
            crl_ref,
            ValidationObjectType.CRL,
            lambda raw: CRLContainer(CertificateList.load(raw)),
        )
    for ocsp_ref in dss.ocsps:
        yield _as_item(
            ocsp_ref,
            ValidationObjectType.OCSP_RESPONSE,
            lambda raw: OCSPContainer(OCSPResponse.load(raw)),
        )
    for cert_ref in dss.certs.values():
        yield _as_item(
            cert_ref,
            ValidationObjectType.CERTIFICATE,
            x509.Certificate.load,
        )


def _build_prima_facie_poe_index_from_pdf_timestamps(
    r: PdfFileReader,
    include_content_ts: bool,
    diff_policy: Optional[DiffPolicy],
) -> List[_PrimaFaciePOEFromTimeStamp]:
    """
    Walk over the embedded signatures of a PDF document and collect, for
    each usable timestamp, the set of objects for which that timestamp
    would provide proof of existence.

    The timestamps are not validated for trust here; only the integrity
    information of each embedded signature is computed.

    :param r:
        The reader for the document being processed.
    :param include_content_ts:
        If ``True``, content timestamps in regular signatures are treated
        as POE sources in addition to document timestamps.
    :param diff_policy:
        Difference analysis policy passed on when computing integrity info.
        If ``None``, difference analysis is skipped.
    :return:
        A list with one entry per timestamp whose coverage level is at
        least ``ENTIRE_REVISION``, in the order in which the embedded
        signatures were traversed.
    """
    # This subroutine implements the POE gathering part of the evidence record
    # processing algorithm in AdES as applied to PDF. For the purposes of this
    # function, the chain of document timestamps is treated as a single evidence
    # record, and all document data in the revision in which a timestamp is
    # contained is considered fair game.
    # Signature timestamps are not processed as such, but POE for the timestamps
    # themselves will be accumulated.
    # Content timestamps can optionally be included. This is not standard
    # in AdES, but since there's no cryptographic difference (in PDF!) between
    # a content TS in a signature and a document timestamp signature, they
    # can be taken into account at the caller's discretion

    # TODO take algorithm usage policy into account?

    # TODO when ingesting OCSP responses, make an effort to register
    #  POE for the embedded certs as well? Esp. potential responder certs.

    # timestamp -> hashes index. We haven't validated the chain of trust
    # of the timestamps yet, so we can't put them in an actual
    # POE manager immediately

    # Since the embedded signature context is necessary to validate the POE's
    # integrity, we do run the integrity checker for the TST data at this stage.
    # The actual trust validation is delegated

    collected_so_far: Set[_PrimaFaciePOEItem] = set()
    # Holds all digests of objects contained in _document_ content so far
    # (note: this is why it's important to traverse the revisions in order)

    for_next_ts: Set[_PrimaFaciePOEItem] = set()
    # Holds digests of objects that will be registered with POE on the next
    # document TS or content TS encountered.

    prima_facie_poe_sets: List[_PrimaFaciePOEFromTimeStamp] = []
    # output array (to avoid having to work with async generators)

    embedded_sig: EmbeddedPdfSignature

    # NOTE(review): the index ``ix`` is not used in the loop body.
    for ix, embedded_sig in enumerate(r.embedded_signatures):
        embedded_sig.compute_integrity_info(
            # different None handling convention
            diff_policy,
            skip_diff=diff_policy is None,
        )

        hist_handler = HistoricalResolver(
            r, revision=embedded_sig.signed_revision
        )

        signed_data: cms.SignedData = embedded_sig.signed_data
        ts_signed_data: Optional[cms.SignedData] = None
        is_doc_ts = False
        if embedded_sig.sig_object_type == '/DocTimeStamp':
            # a document timestamp is itself the timestamp token
            ts_signed_data = signed_data
            is_doc_ts = True
        elif include_content_ts:
            # regular signature: look for a (signed) content timestamp
            ts_signed_data = generic_cms.extract_tst_data(
                embedded_sig.signer_info, signed=True
            )

        # Important remark: at this time, we do NOT consider signature
        # timestamps when evaluating POE data, only content timestamps &
        # document timestamps!
        # Rationale: the signature timestamp only indirectly protects
        # the document content, and wasn't designed for this purpose.
        # If we want to use signature TSes as well, we'd have to evaluate
        # the integrity of the signature, which requires selecting a certificate
        # (even if just for validation purposes), yada yada. Not doing any of
        # that for now.
        # (This approach might change in the future)

        if ts_signed_data is not None:
            # add DSS content
            try:
                dss = DocumentSecurityStore.read_dss(hist_handler)
                collected_so_far.update(_read_validation_objects_from_dss(dss))
            except NoDSSFoundError:
                pass
            # everything that was waiting for a timestamp is covered now
            collected_so_far.update(for_next_ts)
            doc_digest = embedded_sig.compute_digest()
            coverage_normal = (
                embedded_sig.evaluate_signature_coverage()
                >= SignatureCoverageLevel.ENTIRE_REVISION
            )
            if coverage_normal:
                # snapshot of everything collected up to this timestamp
                prima_facie_poe_sets.append(
                    _PrimaFaciePOEFromTimeStamp(
                        pdf_revision=embedded_sig.signed_revision,
                        timestamp_dt=_get_tst_timestamp(ts_signed_data),
                        poes_implied=frozenset(collected_so_far),
                        timestamp_token_signed_data=ts_signed_data,
                        doc_digest=doc_digest,
                        forensic_info=embedded_sig.summarise_integrity_info(),
                    )
                )
                # reset for_next_ts
                for_next_ts = set()
            # certs embedded in the timestamp token wait for the next TS
            for_next_ts.update(
                _extract_cert_digests_from_signed_data(ts_signed_data)
            )

        # the certs in the signature container itself are not part of the
        # signed data in that revision, but they're covered
        # by whatever the next (content) TS covers -> keep 'em
        for_next_ts.update(_extract_cert_digests_from_signed_data(signed_data))

        # same for revinfo embedded Adobe-style:
        # part of the signed data, but not directly timestamped
        # => save for next TS
        signed_attrs = embedded_sig.signer_info['signed_attrs']
        if not is_doc_ts:
            try:
                revinfo_attr: asn1_pdf.RevocationInfoArchival = (
                    find_unique_cms_attribute(
                        signed_attrs, 'adobe_revocation_info_archival'
                    )
                )

                for_next_ts.update(
                    _read_validation_objects_from_revinfo_archival(revinfo_attr)
                )
            except (MultivaluedAttributeError, NonexistentAttributeError):
                # attribute absent or multivalued: nothing to collect
                pass

        # Prepare a POE entry for the signature itself (to be processed
        # with the next timestamp)
        sig_bytes = embedded_sig.signer_info['signature'].native
        for_next_ts.add(
            _PrimaFaciePOEItem(
                digest=digest_for_poe(sig_bytes),
                validation_object=ValidationObject(
                    object_type=ValidationObjectType.SIGNED_DATA,
                    # For now, we put the entire signed data object here
                    # while we take the digest only over the signature.
                    # This was done for expediency & ease of reasoning
                    # given existing code, but may change in the future.
                    value=embedded_sig.signed_data,
                ),
            )
        )

        # add POE entries for the timestamp(s) attached to this signature
        try:
            content_tses = find_cms_attribute(
                signed_attrs, 'content_time_stamp'
            )
        except (NonexistentAttributeError, CMSStructuralError):
            content_tses = ()

        try:
            signature_tses = find_cms_attribute(
                embedded_sig.signer_info['unsigned_attrs'],
                'signature_time_stamp',
            )
        except (NonexistentAttributeError, CMSStructuralError):
            signature_tses = ()

        for ts_data in itertools.chain(signature_tses, content_tses):
            ts_data_content = ts_data['content']
            for ts_signer_info in ts_data_content['signer_infos']:
                ts_sig_bytes = ts_signer_info['signature'].native
                for_next_ts.add(
                    _PrimaFaciePOEItem(
                        digest=digest_for_poe(ts_sig_bytes),
                        # Same as for signedData: we take the digest over
                        # the signature part only.
                        # This was done for expediency & ease of reasoning
                        # given existing code, but may change in the future.
                        validation_object=ValidationObject(
                            object_type=ValidationObjectType.TIMESTAMP,
                            value=ts_data_content,
                        ),
                    )
                )

    return prima_facie_poe_sets


async def _validate_prima_facie_poe(
    prima_facie_poe_sets: List[_PrimaFaciePOEFromTimeStamp],
    validation_spec: SignatureValidationSpec,
    cur_timing_info: Optional[ValidationTimingInfo] = None,
) -> POEManager:
    """
    Validate the timestamps behind a list of prima facie POE sets, and
    gather the POEs implied by the timestamps that hold up.

    The candidates are handled in ascending order of PDF revision (!),
    which is a consequence of the way the ER validation algorithm works.
    Every timestamp is validated at the current time first; if that yields
    neither a pass nor a failure, the past validation procedure is tried.

    We assume that the validation info extracted from the DSS has been
    registered in the revinfo gathering policy object and/or the known cert
    list, respectively.

    :param prima_facie_poe_sets:
        The unvalidated POE sets, one per timestamp.
    :param validation_spec:
        Validation settings; the local knowledge in this spec seeds the
        resulting POE manager.
    :param cur_timing_info:
        Timing info for the current-time validation.
        Defaults to :meth:`.ValidationTimingInfo.now` in the local time zone.
    :return:
        A POE manager holding the local knowledge together with the POEs
        from all candidate sets.
    :raises errors.SignatureValidationError:
        If a timestamp fails permanently, or if it validates neither at the
        current time nor through past validation.
    """
    ordered = sorted(prima_facie_poe_sets, key=lambda p: p.pdf_revision)

    timing = cur_timing_info or ValidationTimingInfo.now(
        tz=tzlocal.get_localzone()
    )

    validated_poes = POEManager()
    validation_spec.local_knowledge.add_to_poe_manager(validated_poes)

    # pair every candidate with the one that follows it in the ordering
    # (None for the last candidate)
    for candidate, successor in itertools.zip_longest(ordered, ordered[1:]):
        scratch_poes = copy(validated_poes)
        if successor is not None:
            # perform temp POE initialisation as the AdES spec requires
            successor.add_to_poe_manager(scratch_poes)

        handlers = bootstrap_validation_data_handlers(
            validation_spec,
            timing_info=timing,
            poe_manager_override=scratch_poes,
        )
        ts_result = await ades_timestamp_validation(
            tst_signed_data=candidate.timestamp_token_signed_data,
            validation_spec=validation_spec,
            timing_info=timing,
            expected_tst_imprint=candidate.doc_digest,
            validation_data_handlers=handlers,
            extra_status_kwargs=candidate.forensic_info,
            status_cls=DocumentTimestampStatus,
        )
        sub_indic = ts_result.ades_subindic

        if sub_indic.status == AdESStatus.PASSED:
            # still valid at current time => ok
            # (AdES spec on ER validation is unclear about this, but this
            #  should mean that we can skip the past validation block)
            candidate.add_to_poe_manager(validated_poes)
            continue

        if sub_indic.status == AdESStatus.FAILED:
            # TODO more informative reporting?
            raise errors.SignatureValidationError(
                "Permanent failure while evaluating timestamp in PoE chain",
                ades_subindication=sub_indic,
            )

        # neither pass nor fail => try past validation procedure
        assert isinstance(sub_indic, AdESIndeterminate)
        past_result = await ades_past_signature_validation(
            signed_data=candidate.timestamp_token_signed_data,
            validation_spec=validation_spec,
            poe_manager=scratch_poes,
            current_time_sub_indic=sub_indic,
            init_control_time=timing.validation_time,
        )
        if past_result.status != AdESStatus.PASSED:
            raise errors.SignatureValidationError(
                "Could not validate timestamp in PoE chain at current "
                "time, and past validation also failed",
                ades_subindication=sub_indic,
            )
        candidate.add_to_poe_manager(validated_poes)

    return validated_poes


# Current-time sub-indications of a signature timestamp for which
# _process_signature_ts still attempts past validation.
_LTA_TS_FURTHER_PROC = frozenset(
    (
        AdESIndeterminate.REVOKED_CA_NO_POE,
        AdESIndeterminate.OUT_OF_BOUNDS_NO_POE,
        AdESIndeterminate.OUT_OF_BOUNDS_NOT_REVOKED,
        AdESIndeterminate.CRYPTO_CONSTRAINTS_FAILURE_NO_POE,
        AdESIndeterminate.REVOCATION_OUT_OF_BOUNDS_NO_POE,
    )
)

# Current-time sub-indications of the signature itself for which
# ades_lta_validation carries on past the with-time validation step.
# Spelled as the timestamp set plus the three extra members; the resulting
# set has exactly the same eight members as before.
_LTA_FURTHER_PROC = _LTA_TS_FURTHER_PROC | frozenset(
    (
        AdESPassed.OK,
        AdESIndeterminate.REVOKED_NO_POE,
        AdESIndeterminate.TRY_LATER,
    )
)


@dataclass(frozen=True)
class AdESLTAValidationResult(AdESWithTimeValidationResult):
    """
    Result of a PAdES validation for a signature providing long-term
    availability and integrity of validation material.
    See ETSI EN 319 102-1, § 5.6.3.

    Instances are produced by :func:`ades_lta_validation`. On top of the
    fields inherited from :class:`AdESWithTimeValidationResult`, the result
    reports on the evidence record (the document timestamp chain) and on the
    signature timestamp.
    """

    # Stays None when no document timestamp lives in a revision after the
    # signature's, or when the document timestamp chain failed to validate.
    oldest_evidence_record_timestamp: Optional[datetime]
    """
    The oldest timestamp in the evidence record, after validation.

    .. note::
        For PAdES, this refers to the chain of document timestamp signatures
        after signing.
    """

    # None when the signature has no attached timestamp token, and also when
    # validation stopped before the signature timestamp was processed.
    signature_timestamp_status: Optional[AdESBasicValidationResult]
    """
    The validation result for the signature time stamp, if applicable.
    """


async def _process_signature_ts(
    embedded_sig: EmbeddedPdfSignature,
    validation_spec: SignatureValidationSpec,
    poe_manager: POEManager,
    timing_info: ValidationTimingInfo,
) -> Optional[AdESBasicValidationResult]:
    """
    Validate the signature timestamp attached to a PDF signature, and record
    the proof of existence it provides for the signature.

    The timestamp is validated at the current time first. If the outcome is
    one of the indeterminate sub-indications in ``_LTA_TS_FURTHER_PROC``,
    past validation is attempted; on success the result is upgraded to
    :attr:`AdESPassed.OK` with the validation path found by past validation.

    A POE for the signature value, dated at the timestamp's ``gen_time``,
    is registered in ``poe_manager`` only if all of the following hold:

    * the timestamp ultimately validated (final status ``AdESPassed.OK``);
    * an algorithm usage policy is configured;
    * that policy allows the message imprint's digest algorithm at the
      validation time.

    :param embedded_sig:
        The PDF signature whose signature timestamp is to be processed.
    :param validation_spec:
        Validation settings. The algorithm usage policy is taken from the
        timestamp certificate validation policy if there is one, and from
        the general certificate validation policy otherwise.
    :param poe_manager:
        POE manager used during validation; updated in place when a POE for
        the signature is registered.
    :param timing_info:
        Timing info describing the validation time.
    :return:
        ``None`` if the signature has no attached timestamp token, otherwise
        the validation result for the timestamp.
    """
    signature_bytes = embedded_sig.signer_info['signature'].native
    signature_ts: cms.SignedData = embedded_sig.attached_timestamp_data
    cert_validation_policy = (
        validation_spec.ts_cert_validation_policy
        or validation_spec.cert_validation_policy
    )
    algo_policy = cert_validation_policy.algorithm_usage_policy
    if signature_ts is None:
        return None

    expected_tst_imprint = embedded_sig.compute_tst_digest()
    assert expected_tst_imprint is not None

    signature_ts_prelim_result = await ades_timestamp_validation(
        tst_signed_data=signature_ts,
        validation_spec=validation_spec,
        timing_info=timing_info,
        expected_tst_imprint=expected_tst_imprint,
        validation_data_handlers=bootstrap_validation_data_handlers(
            validation_spec,
            timing_info=timing_info,
            poe_manager_override=poe_manager,
        ),
    )

    ts_current_time_sub_indic = signature_ts_prelim_result.ades_subindic
    signature_ts_result: AdESBasicValidationResult
    if (
        isinstance(ts_current_time_sub_indic, AdESIndeterminate)
        and ts_current_time_sub_indic in _LTA_TS_FURTHER_PROC
    ):
        try:
            # TODO: in principle, we should also run this if the status is
            #  PASSED already. Ensure that that is possible.
            validation_path = await _ades_past_signature_validation(
                signed_data=signature_ts,
                validation_spec=validation_spec,
                poe_manager=poe_manager,
                current_time_sub_indic=ts_current_time_sub_indic,
                init_control_time=timing_info.validation_time,
                is_timestamp=True,
            )
            signature_ts_result = AdESBasicValidationResult(
                ades_subindic=AdESPassed.OK,
                # TODO update pyHanko status object as well
                api_status=(
                    dataclasses.replace(
                        signature_ts_prelim_result.api_status,
                        validation_path=validation_path,
                    )
                    if signature_ts_prelim_result.api_status
                    else None
                ),
                failure_msg=None,
                validation_objects=signature_ts_prelim_result.validation_objects,
            )
        except errors.SignatureValidationError as e:
            # past validation did not rescue the timestamp: report the
            # sub-indication from the error, or the current-time one if the
            # error carries none
            signature_ts_result = AdESBasicValidationResult(
                ades_subindic=e.ades_subindication or ts_current_time_sub_indic,
                failure_msg=e.failure_message,
                api_status=signature_ts_prelim_result.api_status,
                validation_objects=signature_ts_prelim_result.validation_objects,
            )
    else:
        signature_ts_result = signature_ts_prelim_result

    tst_info = signature_ts['encap_content_info']['content'].parsed
    # The time claimed by a token that did not validate is not evidence of
    # anything, so only a timestamp that ended up PASSED may yield a POE.
    ts_passed = signature_ts_result.ades_subindic == AdESPassed.OK
    if (
        ts_passed
        and algo_policy is not None
        and algo_policy.digest_algorithm_allowed(
            tst_info['message_imprint']['hash_algorithm'],
            moment=timing_info.validation_time,
        )
    ):
        signature_ts_dt = tst_info['gen_time'].native
        # Pass the POE type and time by keyword: a bare positional datetime
        # would be bound to whichever parameter follows the data argument.
        poe_manager.register(
            signature_bytes,
            poe_type=POEType.VALIDATION,
            dt=signature_ts_dt,
        )
    # TODO if/when we fully support signature policies, we should check
    #  whether the policy requires a valid signature timestamp
    return signature_ts_result


def _dss_to_local_knowledge(
    reader: PdfFileReader,
):
    """
    Collect the OCSP responses, CRLs and certificates stored in a document's
    DSS into a :class:`LocalKnowledge` object.

    :param reader:
        The reader for the PDF document to inspect.
    :return:
        The local knowledge gathered from the DSS; an empty
        :class:`LocalKnowledge` if the document has no DSS.
    """
    try:
        security_store = DocumentSecurityStore.read_dss(reader)

        # one OCSP response can give rise to several containers
        ocsp_containers = []
        for ocsp_ref in security_store.ocsps:
            ocsp_response = OCSPResponse.load(ocsp_ref.get_object().data)
            ocsp_containers.extend(OCSPContainer.load_multi(ocsp_response))

        crl_containers = []
        for crl_ref in security_store.crls:
            cert_list = CertificateList.load(crl_ref.get_object().data)
            crl_containers.append(CRLContainer(crl_data=cert_list))

        certs = list(security_store.load_certs())
        return LocalKnowledge(
            known_ocsps=ocsp_containers,
            known_crls=crl_containers,
            known_certs=certs,
        )
    except NoDSSFoundError:
        return LocalKnowledge()


async def ades_lta_validation(
    embedded_sig: EmbeddedPdfSignature,
    pdf_validation_spec: PdfSignatureValidationSpec,
    timing_info: Optional[ValidationTimingInfo] = None,
    signature_not_before_time: Optional[datetime] = None,
) -> AdESLTAValidationResult:
    """
    Validate a PAdES signature providing long-term availability and integrity
    of validation material. See ETSI EN 319 102-1, § 5.6.3.

    For the purposes of PAdES validation, the chain of document time stamps
    in the document serves as the unique Evidence Record (ER).

    The numbered comments in the body track the stages of the procedure:

    1. The document timestamps are validated as the evidence record, with
       the certificates, CRLs and OCSP responses from the DSS merged into
       the local knowledge of the validation spec. If the chain fails to
       validate, a warning is logged and validation proceeds with the POEs
       from the local knowledge only.
    2. (Skipped; automatic in this implementation.)
    3. The signature is validated "with time" at the current time. If the
       resulting sub-indication is not in ``_LTA_FURTHER_PROC``, that result
       is returned as-is with an explanatory failure message.
    4. A POE for the signature is registered at the best signature time.
    5. The signature timestamp, if any, is processed.
    6. If the current-time status is indeterminate, past signature
       validation is run; a failure there is returned immediately.
    7. The POE time for the signature is looked up.
    8. The signature's cryptographic algorithms are checked against the
       algorithm usage policy (if any) at that POE time.

    :param embedded_sig:
        The PDF signature to validate.
    :param pdf_validation_spec:
        PDF signature validation settings.
    :param timing_info:
        Data object describing the timing of the validation.
        Defaults to :meth:`.ValidationTimingInfo.now`
        (in the local time zone).
    :param signature_not_before_time:
        Time when the signature was known _not_ to exist.
    :return:
        A validation result.
    """

    timing_info = timing_info or ValidationTimingInfo.now(
        tz=tzlocal.get_localzone()
    )

    # (1) process DocTSes as ER
    poe_list = _build_prima_facie_poe_index_from_pdf_timestamps(
        embedded_sig.reader,
        include_content_ts=True,
        diff_policy=pdf_validation_spec.diff_policy,
    )

    validation_spec = pdf_validation_spec.signature_validation_spec
    init_local_knowledge = validation_spec.local_knowledge
    # Ingest CRLs, certs and OCSPs from the DSS
    # (POE info will be processed separately)
    dss_facts = _dss_to_local_knowledge(reader=embedded_sig.reader)
    # known POEs and non-revoked assertions come from the caller-supplied
    # local knowledge only; the DSS contributes revinfo and certs
    local_knowledge = LocalKnowledge(
        known_ocsps=init_local_knowledge.known_ocsps + dss_facts.known_ocsps,
        known_crls=init_local_knowledge.known_crls + dss_facts.known_crls,
        known_certs=init_local_knowledge.known_certs + dss_facts.known_certs,
        known_poes=init_local_knowledge.known_poes,
        nonrevoked_assertions=init_local_knowledge.nonrevoked_assertions,
    )

    augmented_validation_spec = dataclasses.replace(
        validation_spec, local_knowledge=local_knowledge
    )

    updated_poe_manager = None
    oldest_evidence_record_timestamp = None
    try:
        updated_poe_manager = await _validate_prima_facie_poe(
            poe_list,
            validation_spec=augmented_validation_spec,
            cur_timing_info=timing_info,
        )
        # The POE list has been validated at this point,
        # so we just pick out the oldest one
        # (only timestamps in revisions after the signature's are eligible)
        oldest_docts_record = min(
            filter(
                lambda poe: poe.pdf_revision > embedded_sig.signed_revision,
                poe_list,
            ),
            key=lambda poe: poe.pdf_revision,
            default=None,
        )
        if oldest_docts_record is not None:
            oldest_evidence_record_timestamp = oldest_docts_record.timestamp_dt
        elif not local_knowledge.known_poes:
            # do not show this warning if there are POEs in the local knowledge
            logger.warning(
                "No document timestamps after signature; proceeding "
                "without past proof of existence"
            )
    except errors.SignatureValidationError as e:
        logger.warning(
            "Document timestamp chain failed to validate; proceeding "
            "without past proof of existence.",
            exc_info=e,
        )
    # This covers both a failed chain validation and the case where no
    # document timestamp follows the signature: in either situation the POE
    # manager is (re)built from the local knowledge alone, replacing whatever
    # the chain validation may have returned.
    if oldest_evidence_record_timestamp is None:
        updated_poe_manager = POEManager()
        local_knowledge.add_to_poe_manager(updated_poe_manager)

    assert updated_poe_manager is not None

    # (2) skipped, is automatic in our implementation

    # (3) Run validation for signatures with time

    # the handlers receive a copy of the POE manager, not the manager itself
    with_time_data_handlers = bootstrap_validation_data_handlers(
        spec=augmented_validation_spec,
        timing_info=timing_info,
        poe_manager_override=copy(updated_poe_manager),
    )
    signature_prelim_result = await ades_with_time_validation(
        signed_data=embedded_sig.signed_data,
        validation_spec=augmented_validation_spec,
        timing_info=timing_info,
        validation_data_handlers=with_time_data_handlers,
        raw_digest=embedded_sig.compute_digest(),
        signature_not_before_time=signature_not_before_time,
        extra_status_kwargs=embedded_sig.summarise_integrity_info(),
        status_cls=PdfSignatureStatus,
    )

    # don't branch on policy here, we always continue as if archival info
    # is present
    current_time_sub_indic = signature_prelim_result.ades_subindic
    failure_msg: Optional[str]
    if current_time_sub_indic not in _LTA_FURTHER_PROC:
        # bail out: report the current-time result unchanged, without
        # looking at the signature timestamp
        failure_msg = (
            "Validation of signature at current time failed with "
            f"indication {current_time_sub_indic}. Past validation not "
            f"applicable."
        )
        return AdESLTAValidationResult(
            ades_subindic=current_time_sub_indic,
            api_status=signature_prelim_result.api_status,
            failure_msg=failure_msg,
            best_signature_time=signature_prelim_result.best_signature_time,
            signature_not_before_time=(
                signature_prelim_result.signature_not_before_time
            ),
            oldest_evidence_record_timestamp=oldest_evidence_record_timestamp,
            signature_timestamp_status=None,
            validation_objects=signature_prelim_result.validation_objects,
        )

    # (4) Register PoE for the signature based on best_signature_time
    signature_bytes = embedded_sig.signer_info['signature'].native
    updated_poe_manager.register(
        signature_bytes,
        poe_type=POEType.VALIDATION,
        dt=signature_prelim_result.best_signature_time,
    )

    # (5) process signature TS if present
    # (again working on a copy of the POE manager)
    signature_ts_result = await _process_signature_ts(
        embedded_sig,
        validation_spec=augmented_validation_spec,
        poe_manager=copy(updated_poe_manager),
        timing_info=timing_info,
    )

    # (6) past signature validation
    if isinstance(current_time_sub_indic, AdESIndeterminate):
        # TODO: in principle, we should also run this if the status is PASSED
        #  already. Ensure that that is possible.
        past_sig_poe_manager = copy(updated_poe_manager)
        try:
            await _ades_past_signature_validation(
                signed_data=embedded_sig.signed_data,
                validation_spec=augmented_validation_spec,
                poe_manager=past_sig_poe_manager,
                current_time_sub_indic=current_time_sub_indic,
                init_control_time=timing_info.validation_time,
                is_timestamp=False,
            )
            # past validation succeeded: continue with the POE manager
            # it worked on
            updated_poe_manager = past_sig_poe_manager
        except errors.SignatureValidationError as e:
            # report the signature POE time as known to the POE manager that
            # past validation worked on
            sig_poe = past_sig_poe_manager[signature_bytes]
            return AdESLTAValidationResult(
                ades_subindic=e.ades_subindication or current_time_sub_indic,
                failure_msg=e.failure_message,
                # FIXME rewrite api_status!
                api_status=signature_prelim_result.api_status,
                best_signature_time=sig_poe,
                signature_not_before_time=(
                    signature_prelim_result.signature_not_before_time
                ),
                signature_timestamp_status=signature_ts_result,
                oldest_evidence_record_timestamp=(
                    oldest_evidence_record_timestamp
                ),
                validation_objects=signature_prelim_result.validation_objects,
            )

    # (7) get the oldest PoE for the signature
    signature_poe_time = updated_poe_manager[signature_bytes]

    # (8) perform SVA (=> only crypto checks)
    algo_policy = (
        augmented_validation_spec.cert_validation_policy.algorithm_usage_policy
    )
    ades_subindic: AdESSubIndic
    try:
        cert: x509.Certificate = embedded_sig.signer_cert
        # without an algorithm usage policy, there is nothing to check and
        # the result is a pass
        if algo_policy is not None:
            _ades_signature_crypto_policy_check(
                embedded_sig.signer_info,
                algo_policy=algo_policy,
                control_time=signature_poe_time,
                public_key=cert.public_key,
            )
        ades_subindic = AdESPassed.OK
        failure_msg = None
    except errors.SignatureValidationError as e:
        ades_subindic = e.ades_subindication or current_time_sub_indic
        failure_msg = e.failure_message

    return AdESLTAValidationResult(
        ades_subindic=ades_subindic,
        api_status=signature_prelim_result.api_status,
        failure_msg=failure_msg,
        best_signature_time=signature_poe_time,
        signature_not_before_time=(
            signature_prelim_result.signature_not_before_time
        ),
        signature_timestamp_status=signature_ts_result,
        oldest_evidence_record_timestamp=oldest_evidence_record_timestamp,
        validation_objects=signature_prelim_result.validation_objects,
    )


async def simulate_future_ades_lta_validation(
    embedded_sig: EmbeddedPdfSignature,
    pdf_validation_spec: PdfSignatureValidationSpec,
    future_validation_time: datetime,
    current_reference_time: Optional[datetime] = None,
) -> AdESLTAValidationResult:
    """
    .. versionadded:: 0.21.0

    Simulate how a PDF signature would fare in an LTA validation at some
    future time, under the assumption that its timestamps are maintained
    perfectly up to that point.

    .. warning::
        This is experimental API.

    This utility is meant as a sanity check for signers and signature
    archivists. Given a validation spec, a future validation time and a
    current reference time (the current time by default), it declares, by
    fiat, that all relevant objects in the PDF are proven to exist at the
    reference time. The PAdES LTA validation algorithm is then run with
    those PoEs against the future validation time, with all remote fetching
    functionality disabled.

    The outcome tells the caller whether the signature is
    "LTA maintainable": does it carry the information needed to remain
    validatable, provided the timestamp chain is extended properly?
    A failure here for a signature that validates at the current time may
    point to a lack of contemporaneous revocation information.

    :param embedded_sig:
        The signature under scrutiny.
    :param pdf_validation_spec:
        The validation spec against which the simulated validation
        should be executed.
    :param future_validation_time:
        The future validation time at which the validation should be simulated.
    :param current_reference_time:
        The reference time at which all relevant objects in the PDF are
        presumed to have been proven to exist for the purposes of
        the (future) validation being simulated. Defaults to the current time.
    :return:
        An AdES LTA validation result.
    """
    reference_time = current_reference_time or datetime.now(tz=timezone.utc)
    simulated_timing = ValidationTimingInfo(
        validation_time=future_validation_time,
        best_signature_time=future_validation_time,
        point_in_time_validation=True,
    )
    poe_sets_from_file = _build_prima_facie_poe_index_from_pdf_timestamps(
        embedded_sig.reader, include_content_ts=True, diff_policy=None
    )
    base_sig_spec = pdf_validation_spec.signature_validation_spec
    base_knowledge = base_sig_spec.local_knowledge
    dss_knowledge = _dss_to_local_knowledge(embedded_sig.reader)

    # "Future" revinfo cannot be obtained, so the certificate of the last
    # timestamp in the document is simply asserted not to be revoked at the
    # simulated validation time.
    nonrevoked = list(base_knowledge.nonrevoked_assertions)
    try:
        final_ts = embedded_sig.reader.embedded_timestamp_signatures[-1]
        nonrevoked.append(
            NonRevokedStatusAssertion(
                final_ts.signer_cert.sha256, at=future_validation_time
            )
        )
    except IndexError:
        # no timestamps in the document => nothing to assert
        pass

    # Simulate perfect record keeping: everything we know about, including
    # the prima facie POEs gathered from the file, is asserted to exist at
    # the reference time. That way, no extra timestamp tokens need to be
    # introduced into the validation process.
    asserted_poes = list(
        base_knowledge.assert_existence_known_at(reference_time)
    )
    asserted_poes.extend(
        dss_knowledge.assert_existence_known_at(reference_time)
    )
    # The would-be POEs embedded in the document are not validated at this
    # point, so for those we only emit POEs for the reference time.
    asserted_poes.extend(
        KnownPOE(
            poe_type=POEType.PROVIDED,
            digest=implied.digest,
            poe_time=reference_time,
            validation_object=implied.validation_object,
        )
        for poe_set in poe_sets_from_file
        for implied in poe_set.poes_implied
    )

    simulated_knowledge = dataclasses.replace(
        base_knowledge,
        known_poes=asserted_poes,
        nonrevoked_assertions=nonrevoked,
    )
    # remote fetching is switched off for the simulation
    offline_sig_spec = dataclasses.replace(
        base_sig_spec,
        revinfo_gathering_policy=RevocationInfoGatheringSpec(
            RevinfoOnlineFetchingRule.LOCAL_ONLY
        ),
        local_knowledge=simulated_knowledge,
    )
    simulated_pdf_spec = dataclasses.replace(
        pdf_validation_spec,
        signature_validation_spec=offline_sig_spec,
    )
    return await ades_lta_validation(
        embedded_sig,
        simulated_pdf_spec,
        timing_info=simulated_timing,
    )
