Source code for next.forms.dispatch

"""Shared POST pipeline and helpers for form action dispatch."""

from __future__ import annotations

import time
import types
from typing import TYPE_CHECKING, Any, cast

from django.http import (
    HttpResponse,
    HttpResponseBadRequest,
    HttpResponseNotAllowed,
    HttpResponseRedirect,
)

from next.deps import REQUEST_DEP_CACHE_ATTR, RESERVED_KEYS, resolver
from next.utils import caller_source_path

from .base import BaseModelForm
from .signals import action_dispatched, form_validation_failed
from .uid import validated_next_form_page_path


if TYPE_CHECKING:
    from collections.abc import Callable
    from pathlib import Path

    from django import forms as django_forms
    from django.http import HttpRequest

    from .backends import FormActionBackend


_FACTORY_TUPLE_LEN = 2


def _get_caller_path(back_count: int = 1) -> Path:
    """Return the path of the module that called into us, skipping frames here."""
    return caller_source_path(
        back_count=back_count,
        max_walk=15,
        skip_while_filename_endswith=("forms.py", "dispatch.py", "decorators.py"),
    )


def _filter_reserved_url_kwargs(url_kwargs: dict[str, object]) -> dict[str, object]:
    """Drop keys that collide with DI names used by `resolve_dependencies`."""
    return {k: v for k, v in url_kwargs.items() if k not in RESERVED_KEYS}


def _url_kwargs_from_post(request: HttpRequest) -> dict[str, object]:
    """Parse `_url_param_*` hidden fields from POST."""
    out: dict[str, object] = {}
    if not hasattr(request, "POST"):
        return out
    for key, value in request.POST.items():
        if not key.startswith("_url_param_"):
            continue
        param_name = key.replace("_url_param_", "")
        if param_name in RESERVED_KEYS:
            continue
        if isinstance(value, str):
            try:
                out[param_name] = int(value)
            except ValueError:
                out[param_name] = value
        else:
            out[param_name] = value
    return out


def _url_kwargs_from_resolver_or_post(request: HttpRequest) -> dict[str, object]:
    """Return URL kwargs from the resolver match, otherwise from POST hidden fields."""
    resolver_match = getattr(request, "resolver_match", None)
    if resolver_match and getattr(resolver_match, "kwargs", None):
        return _filter_reserved_url_kwargs(dict(resolver_match.kwargs))
    if getattr(request, "method", None) == "POST" and hasattr(request, "POST"):
        return _url_kwargs_from_post(request)
    return {}


def _is_model_instance(obj: object) -> bool:
    """Return True when `obj` quacks like a Django model instance."""
    meta = getattr(obj, "_meta", None)
    return meta is not None and hasattr(meta, "model")


def _build_form(
    form_class: type[django_forms.Form],
    initial_data: object,
    *,
    request: HttpRequest | None,
    init_kwargs: dict[str, Any] | None = None,
) -> django_forms.Form:
    """Build a form, bound to POST when `request` is given.

    Non-empty `init_kwargs` bypass `get_initial` and pass `**init_kwargs`
    straight to the form constructor.
    """
    post_data = request.POST if request is not None else None
    files = request.FILES if request is not None and hasattr(request, "FILES") else None
    bound = request is not None
    if init_kwargs:
        # Pass data/files by keyword so init kwargs can also bind names
        # that collide with the first positional slot.
        if bound:
            return form_class(data=post_data, files=files, **init_kwargs)
        return form_class(**init_kwargs)
    if _is_model_instance(initial_data):
        if not issubclass(form_class, BaseModelForm):
            msg = "instance parameter only supported for ModelForm"
            raise TypeError(msg)
        if bound:
            return form_class(post_data, files, instance=initial_data)
        return form_class(instance=initial_data)
    initial = cast("dict[str, Any] | None", initial_data)
    if bound:
        return form_class(post_data, files, initial=initial)
    return form_class(initial=initial)


def _form_from_initial_data(
    form_class: type[django_forms.Form],
    initial_data: object,
    *,
    init_kwargs: dict[str, Any] | None = None,
) -> django_forms.Form:
    """Build an unbound form from `get_initial` result (dict or model instance)."""
    return _build_form(form_class, initial_data, request=None, init_kwargs=init_kwargs)


def _form_action_context_callable(
    form_class: type[django_forms.Form],
) -> Callable[[HttpRequest], types.SimpleNamespace]:
    """Return a callable that builds a form instance for GET error rendering."""

    def context_func(request: HttpRequest) -> types.SimpleNamespace:
        url_kwargs = _url_kwargs_from_resolver_or_post(request)
        dep_cache: dict[str, Any] = {}
        dep_stack: list[str] = []
        resolved_form_class, init_kwargs = _resolve_form_class(
            form_class,
            request,
            url_kwargs,
            dep_cache,
            dep_stack,
        )
        if init_kwargs:
            form_instance = _form_from_initial_data(
                resolved_form_class, None, init_kwargs=init_kwargs
            )
            return types.SimpleNamespace(form=form_instance)
        if not hasattr(resolved_form_class, "get_initial"):
            msg = f"Form class {resolved_form_class} must have get_initial method"
            raise TypeError(msg)
        resolved = resolver.resolve_dependencies(
            resolved_form_class.get_initial,
            request=request,
            _cache=dep_cache,
            _stack=dep_stack,
            **url_kwargs,
        )
        initial_data = resolved_form_class.get_initial(**resolved)
        form_instance = _form_from_initial_data(resolved_form_class, initial_data)
        return types.SimpleNamespace(form=form_instance)

    return context_func


def _bind_form_for_post(
    form_class: type[django_forms.Form],
    request: HttpRequest,
    initial_data: object,
    *,
    init_kwargs: dict[str, Any] | None = None,
) -> django_forms.Form:
    """Return a bound form for POST validation."""
    return _build_form(
        form_class, initial_data, request=request, init_kwargs=init_kwargs
    )


def _resolve_form_class(
    form_class: object,
    request: HttpRequest,
    url_kwargs: dict[str, object],
    dep_cache: dict[str, Any] | None = None,
    dep_stack: list[str] | None = None,
) -> tuple[type[django_forms.Form], dict[str, Any]]:
    """Return `(form_class, init_kwargs)` for the dispatch.

    A factory may return a `Form` subclass or `(cls, init_kwargs)`; the
    latter bypasses `get_initial` and passes `**init_kwargs` to the form
    constructor.
    """
    if isinstance(form_class, type):
        return cast("type[django_forms.Form]", form_class), {}
    if not callable(form_class):
        msg = f"form_class must be a Form subclass or callable, got {form_class!r}"
        raise TypeError(msg)
    cache = dep_cache if dep_cache is not None else {}
    stack = dep_stack if dep_stack is not None else []
    resolved = resolver.resolve_dependencies(
        form_class,
        request=request,
        _cache=cache,
        _stack=stack,
        **url_kwargs,
    )
    produced = form_class(**resolved)
    if isinstance(produced, tuple) and len(produced) == _FACTORY_TUPLE_LEN:
        cls, init_kwargs = produced
        if isinstance(cls, type) and isinstance(init_kwargs, dict):
            return (
                cast("type[django_forms.Form]", cls),
                cast("dict[str, Any]", init_kwargs),
            )
    if not isinstance(produced, type):
        msg = f"form_class factory must return a Form subclass, got {produced!r}"
        raise TypeError(msg)
    return cast("type[django_forms.Form]", produced), {}


def _normalize_handler_response(
    raw: HttpResponse | str | None | object,
) -> HttpResponse | str | None:
    """Coerce handler output to a string, response, redirect, or `None`."""
    if raw is None or isinstance(raw, (HttpResponse, str)):
        return raw
    if hasattr(raw, "url") and (url := getattr(raw, "url", None)):
        return HttpResponseRedirect(url)
    return None


[docs] class FormActionDispatch: """Shared POST pipeline and response shaping for backends."""
[docs] @staticmethod def dispatch( backend: FormActionBackend, request: HttpRequest, action_name: str, meta: dict[str, Any], ) -> HttpResponse: """Validate the form, run the handler, or re-render errors.""" handler = meta["handler"] form_class = meta.get("form_class") if request.method != "POST": return HttpResponseNotAllowed(["POST"]) url_kwargs = _url_kwargs_from_post(request) dep_cache: dict[str, Any] = {} dep_stack: list[str] = [] # Attach the dispatch dep_cache to the request so a re-render after # validation failure (page context + component context) can rejoin # the same DI cache and reuse Depends("name") values resolved here. setattr(request, REQUEST_DEP_CACHE_ATTR, dep_cache) if form_class is None: return FormActionDispatch._dispatch_handler_only( handler, request, action_name, url_kwargs, dep_cache, dep_stack, ) resolved_form_class, init_kwargs = _resolve_form_class( form_class, request, url_kwargs, dep_cache, dep_stack, ) return FormActionDispatch._dispatch_with_form( backend, request, action_name, handler, resolved_form_class, url_kwargs, dep_cache, dep_stack, init_kwargs=init_kwargs, )
@staticmethod def _dispatch_handler_only( # noqa: PLR0913 handler: Callable[..., Any], request: HttpRequest, action_name: str, url_kwargs: dict[str, object], dep_cache: dict[str, Any], dep_stack: list[str], ) -> HttpResponse: resolved = resolver.resolve_dependencies( handler, request=request, _cache=dep_cache, _stack=dep_stack, **url_kwargs, ) start = time.perf_counter() raw = handler(**resolved) duration_ms = (time.perf_counter() - start) * 1000 response = FormActionDispatch.ensure_http_response( _normalize_handler_response(raw), request=request, ) action_dispatched.send( sender=FormActionDispatch, action_name=action_name, form=None, url_kwargs=dict(url_kwargs), duration_ms=duration_ms, response_status=response.status_code, dep_cache=dict(dep_cache), ) return response @staticmethod def _dispatch_with_form( # noqa: PLR0913 backend: FormActionBackend, request: HttpRequest, action_name: str, handler: Callable[..., Any], form_class: type[django_forms.Form], url_kwargs: dict[str, object], dep_cache: dict[str, Any], dep_stack: list[str], *, init_kwargs: dict[str, Any] | None = None, ) -> HttpResponse: if init_kwargs: form = _bind_form_for_post( form_class, request, None, init_kwargs=init_kwargs ) else: if not hasattr(form_class, "get_initial"): msg = f"Form class {form_class} must have get_initial method" raise TypeError(msg) resolved = resolver.resolve_dependencies( form_class.get_initial, request=request, _cache=dep_cache, _stack=dep_stack, **url_kwargs, ) initial_data = form_class.get_initial(**resolved) form = _bind_form_for_post(form_class, request, initial_data) if not form.is_valid(): if form_validation_failed.receivers: error_count = sum(len(errors) for errors in form.errors.values()) form_validation_failed.send( sender=FormActionDispatch, action_name=action_name, error_count=error_count, field_names=tuple(form.errors.keys()), ) return FormActionDispatch.form_response( backend, request, action_name, form, None ) resolved = resolver.resolve_dependencies( handler, request=request, form=form, _cache=dep_cache, _stack=dep_stack, **url_kwargs, ) start = time.perf_counter() raw = handler(**resolved) duration_ms = (time.perf_counter() - start) * 1000 response = FormActionDispatch.ensure_http_response( _normalize_handler_response(raw), request=request, action_name=action_name, backend=backend, ) action_dispatched.send( sender=FormActionDispatch, action_name=action_name, form=form, url_kwargs=dict(url_kwargs), duration_ms=duration_ms, response_status=response.status_code, dep_cache=dict(dep_cache), ) return response
[docs] @staticmethod def form_response( backend: FormActionBackend, request: HttpRequest, action_name: str, form: django_forms.Form | None, template_fragment: str | None, ) -> HttpResponse: """Return full-page HTML for an invalid form submission.""" page_path = validated_next_form_page_path(request) if page_path is None: return HttpResponseBadRequest("Missing or invalid _next_form_page") html = backend.render_form_fragment( request, action_name, form, template_fragment, page_file_path=page_path, ) return HttpResponse(html)
[docs] @staticmethod def render_form_fragment( # noqa: PLR0913 backend: FormActionBackend, request: HttpRequest, action_name: str, form: django_forms.Form | None, template_fragment: str | None, page_file_path: Path, ) -> str: """Delegate to `render_form_page_with_errors` for the given page file.""" from .rendering import render_form_page_with_errors # noqa: PLC0415 return render_form_page_with_errors( backend, request, action_name, form, template_fragment, page_file_path, )
[docs] @staticmethod def ensure_http_response( response: HttpResponse | str | None, request: HttpRequest | None = None, action_name: str | None = None, backend: FormActionBackend | None = None, ) -> HttpResponse: """Coerce `None`, `str`, or `HttpResponse` into an `HttpResponse`.""" response = _normalize_handler_response(response) if response is None: if request and action_name and backend: return FormActionDispatch.form_response( backend, request, action_name, None, None ) return HttpResponse(status=204) if isinstance(response, HttpResponse): return response return HttpResponse(response)
[docs] def build_form_namespace_for_action( action_name: str, request: HttpRequest, ) -> types.SimpleNamespace | None: """Build the `SimpleNamespace(form=...)` used by `{% form %}` when lazy.""" from .manager import form_action_manager # noqa: PLC0415 form_action_manager._ensure_backends() for backend in form_action_manager._backends: meta = backend.get_meta(action_name) if meta is None: continue fc = meta.get("form_class") if fc is None: return None return _form_action_context_callable(fc)(request) return None
__all__ = [ "FormActionDispatch", "_bind_form_for_post", "_filter_reserved_url_kwargs", "_form_action_context_callable", "_form_from_initial_data", "_get_caller_path", "_normalize_handler_response", "_url_kwargs_from_post", "_url_kwargs_from_resolver_or_post", "build_form_namespace_for_action", ]