Skip to content

Registries

Tool, resource, and prompt lookup, plus session storage and SSE infrastructure.

ToolBinding wraps a ServiceSpec (mutation tools); SelectorToolBinding wraps a SelectorSpec and carries the read-shaped pipeline knobs (filter_set, ordering_fields, paginate). The shared ToolRegistry accepts either kind and is what tools/list and tools/call iterate.

ToolBinding dataclass

Bases: Generic[InputT, ResultT, ExtraT]

All wiring for a single MCP tool, derived from a ServiceSpec.

A tool is the projection of a service callable plus its declared input and output serializers. The MCP server invokes spec.service directly via resolve_callable_kwargs + run_service — there is no view or viewset in the dispatch path.

The Generic[InputT, ResultT, ExtraT] parameters mirror ServiceSpec's generics and are purely informational for type checkers. They default to Any when omitted, so existing call sites keep working unchanged.

SelectorToolBinding dataclass

Bases: Generic[ResultT, ExtraT]

All wiring for a single MCP read-shaped tool, derived from a SelectorSpec.

Mirrors :class:ToolBinding (which wraps a ServiceSpec for mutations), but the dispatch pipeline is read-shaped. The shape is chosen by :attr:kind:

kind=LIST runs the full pipeline:

.. code-block:: text

arguments → validate(merged inputSchema) → run_selector
          → FilterSet(data=...).qs    (if ``filter_set`` set)
          → order_by(...)             (if ``ordering_fields`` set)
          → paginate                  (if ``paginate=True``)
          → output_serializer(many=True)
          → ToolResult

kind=RETRIEVE skips the post-fetch pipeline entirely — the selector's single-instance return goes straight to output_serializer(many=False). Combining RETRIEVE with filter_set / ordering_fields / paginate is rejected at construction (those knobs only make sense on a collection).

Selectors return raw, unscoped data (a queryset for LIST, a single instance for RETRIEVE) — the tool layer owns shape decisions. A LIST binding with none of filter_set / ordering_fields / paginate set behaves like a plain RPC read that calls the selector and renders its return value verbatim.

The Generic[InputT, ResultT, ExtraT] parameters mirror SelectorSpec's generics and are purely informational for type checkers.

kind property

kind: SelectorKind

Shape discriminator — derived from the spec's required kind field.

Sister-repo 0.13+ made kind a required field on :class:SelectorSpec, so the binding doesn't store an independent copy — it would only be a chance for the two to drift. Exposed as a property so the dispatch layer can keep reading binding.kind unchanged.

ToolRegistry

Name → tool binding lookup.

Holds both :class:ToolBinding (service tools, mutations) and :class:SelectorToolBinding (selector tools, reads). Names share a namespace — duplicates are rejected loudly so a misconfigured project surfaces the conflict at discovery time rather than silently shadowing a tool.

ResourceBinding dataclass

Bases: Generic[ResultT]

All wiring for a single MCP resource (or resource template).

A resource is a selector callable plus a URI template. The MCP server invokes the selector directly via resolve_callable_kwargs + run_selector — there is no view or viewset in the dispatch path.

output_serializer is consulted by resources/read to render the selector's return value. mime_type advertises the encoding we will return — usually "application/json".

kwargs_provider mirrors SelectorSpec.kwargs from djangorestframework-services >= 0.6: when set, the handler invokes it once per request and merges the returned dict into the kwarg pool. The provider receives a synthesised :class:MCPServiceView (URI-template variables exposed as view.kwargs, the binding name as view.action).

The Generic[ResultT] parameter is purely informational — it lets callers pin the selector's return type for IDE / type-checker help. Defaults to Any when omitted.

ResourceRegistry

URI / URI-template → :class:ResourceBinding lookup.

Concrete resources (no template variables) are matched by exact URI; templates are matched by regex derived from the template. resolve returns the binding plus the variable bindings extracted from the URI.

PromptBinding dataclass

All wiring for a single MCP prompt.

A prompt is a server-defined message-template the client invokes by name. The render callable receives the client-supplied arguments as kwargs and returns either:

  • a list of :class:PromptMessage instances (full control), or
  • a list of strings (each becomes a user text message), or
  • a single string (becomes one user text message), or
  • a coroutine yielding any of the above.

The handler normalises whatever shape the callable returns into the spec's messages list at dispatch time.

PromptRegistry

Name → :class:PromptBinding lookup.

Mirrors :class:ToolRegistry exactly — names are unique, duplicates raise loudly at registration time.

Bulk registration

register_tools(server, definitions, *, selector_defaults=None, service_defaults=None) is an additive entry point for registering many tools in one call. Pass a list of ToolDefinition.service(...) / ToolDefinition.selector(...) instances plus per-kind ServiceDefaults / SelectorDefaults that fill in fields each definition leaves as None. Returns the resulting bindings in input order.

register_tools

register_tools(
    server: MCPServer,
    definitions: Iterable[ToolDefinition],
    *,
    selector_defaults: SelectorDefaults | None = None,
    service_defaults: ServiceDefaults | None = None,
) -> list[ToolBinding | SelectorToolBinding]

Register every :class:ToolDefinition against server.

Defaults dataclasses supply per-kind kwarg defaults that are merged underneath each definition's own values (definition wins on conflict — every field on the definition that is not None is considered "set by the author").

Returns the list of resulting bindings in the same order as definitions, so test harnesses and observability code can introspect what landed.

Raises :class:TypeError if a definition's kind is unrecognised (the discriminator is internal, so this can only happen via direct :class:ToolDefinition construction with an unsupported value).

ToolDefinition dataclass

Declarative description of a single tool, fed to :func:register_tools.

ToolDefinition is a transport-agnostic container — it holds the kwargs that would otherwise be passed to :meth:MCPServer.register_service_tool or :meth:MCPServer.register_selector_tool, plus a :class:ToolKind discriminator that selects between them at dispatch time.

Construct via the classmethods, not the dataclass constructor — the methods enforce the per-kind kwarg surface (a service definition can't set filter_set; a selector definition can't omit input_serializer quietly etc.). Direct construction is available for tests and tooling but bypasses the type-shape guarantees.

Every per-call kwarg defaults to None; downstream :func:register_tools treats None as "no override", which lets a :class:SelectorDefaults / :class:ServiceDefaults instance supply the value, falling back to the registration method's own default if neither is set.

service classmethod

service(
    *,
    name: str,
    spec: ServiceSpec,
    description: str | None = None,
    title: str | None = None,
    display_name: str | None = None,
    display_description: str | None = None,
    output_format: OutputFormat | None = None,
    permissions: Sequence[Any] | None = None,
    rate_limits: Sequence[Any] | None = None,
    annotations: dict[str, Any] | None = None,
    include_structured_content: bool | None = None,
    include_output_schema: bool | None = None,
    argument_binding: ArgumentBinding | None = None,
    unknown_arguments: UnknownArguments | None = None,
    always_listed: bool | None = None,
    spec_kwargs_provides: Sequence[str] | None = None,
) -> ToolDefinition

Typed entry point for service-tool definitions.

selector classmethod

selector(
    *,
    name: str,
    spec: SelectorSpec,
    description: str | None = None,
    title: str | None = None,
    display_name: str | None = None,
    display_description: str | None = None,
    input_serializer: type | None = None,
    output_format: OutputFormat | None = None,
    permissions: Sequence[Any] | None = None,
    rate_limits: Sequence[Any] | None = None,
    annotations: dict[str, Any] | None = None,
    filter_set: Any | None = None,
    ordering_fields: Sequence[str] | None = None,
    paginate: bool | None = None,
    include_structured_content: bool | None = None,
    include_output_schema: bool | None = None,
    argument_binding: ArgumentBinding | None = None,
    unknown_arguments: UnknownArguments | None = None,
    always_listed: bool | None = None,
    spec_kwargs_provides: Sequence[str] | None = None,
) -> ToolDefinition

Typed entry point for selector-tool definitions.

The selector's LIST / RETRIEVE shape lives on the spec (SelectorSpec.kind, required in djangorestframework-services 0.13+), not here — the bulk registration loop reads it from there.

ServiceDefaults dataclass

Per-kind defaults for :func:register_tools over service definitions.

Every field is Optional and None is the "no override" sentinel — only non-None values are applied as defaults to the matching :meth:MCPServer.register_service_tool call. A per- definition value always wins over the default.

Because include_structured_content and include_output_schema are tri-state on the registration method (None = inherit global setting, True/False = force), the same None-as-sentinel convention applies here: ServiceDefaults(include_structured_content=None) is identical to "no override" — if you want to force every binding to inherit the global, leave it unset; if you want to force True/False, pass that explicitly.

SelectorDefaults dataclass

Per-kind defaults for :func:register_tools over selector definitions.

Sister of :class:ServiceDefaults. Same conventions:

  • Every field is Optional.
  • None = "no override; use the per-definition or the :meth:MCPServer.register_selector_tool default".
  • Per-definition kwargs always win on conflict.

Selector-only knobs (input_serializer, filter_set, ordering_fields, paginate) live here too so a project that wants every selector tool to paginate by default can express that in one place.

ToolKind

Bases: Enum

Discriminator for :class:ToolDefinition and the :func:register_tools dispatch table.

Internal-only — never appears on the wire. Members map directly to the two registration entry points on :class:MCPServer:

  • SERVICE → :meth:MCPServer.register_service_tool
  • SELECTOR → :meth:MCPServer.register_selector_tool

Use :meth:ToolDefinition.service / :meth:ToolDefinition.selector instead of constructing :class:ToolDefinition with this kwarg directly — the classmethods are the typed entry points.

UnknownArguments

Bases: Enum

How a binding handles MCP arguments keys not declared in its inputSchema.

Decouples "fields I want strictly validated" from "fields the client may pass through anyway". Built on top of input_serializer, not as a replacement — DRF validation still runs on the declared fields in every mode.

Members:

  • REJECT (default) — the merged inputSchema advertises "additionalProperties": false and the validator rejects any key that isn't part of the binding's known field set. Failure surfaces as -32602 with the offending key names in data.detail["non_field_errors"].
  • PASSTHROUGH — the merged inputSchema advertises "additionalProperties": true; unknown keys survive validation and are merged onto the validated dict before binding to the callable. Useful when the client sends evolving query args (q / cursor / since) that the spec author wants to forward to the callable without restating each in the serializer.
  • IGNORE — the merged inputSchema advertises "additionalProperties": true, but unknown keys are dropped after validation. Forward-compatibility mode: older clients can still send fields newer servers haven't formalised yet, and the server silently accepts and ignores them.

Reserved transport-controlled keys (request / user / data) and selector-tool post-fetch keys (ordering / page / limit) are never considered "unknown" in any mode — they're handled by the dispatch pipeline, not the validator.

Plain :class:Enum, same discipline as :class:ArgumentBinding: internal-only value, no string coercion at API boundaries.

ArgumentBinding

Bases: Enum

How MCP arguments flow into the kwarg pool of the dispatched callable.

The pool always carries request and user. This enum controls whether MCP arguments show up as a single data= key (the historical behavior, well-suited to mutation-shaped services that take an input_serializer-validated dict) or are spread as top-level pool keys so the callable can declare them as individual parameters (the natural shape for selectors and parametric reads).

Members:

  • DATA_ONLY — MCP arguments enter the pool only as data=<validated-or-raw>. Default for :class:ToolBinding (service tools), since mutation services typically take a single validated data payload.
  • MERGE — every key from the validated MCP arguments (or, when no input_serializer is declared, the raw arguments minus the pipeline-reserved keys ordering / page / limit) is added to the pool. spec.kwargs(...) output overrides on conflict — author-declared kwargs win over client-supplied ones, a critical invariant for project-scoping selectors. Default for :class:SelectorToolBinding (selector tools).
  • REPLACE — like MERGE, but spec.kwargs(...) loses on conflict. Useful only when the kwargs provider supplies defaults the client is allowed to override.

The value is internal — never appears on the wire, never coerced from a string. ArgumentBinding is a plain :class:Enum (not str, Enum); pass the member directly when registering tools.

Selector-tool schema

Helpers that build the merged inputSchema for selector tools — exposed for projects that want to introspect filter / ordering / pagination property generation outside of the registration flow.

filterset_to_schema_properties

filterset_to_schema_properties(filter_set_class: Any) -> dict[str, dict[str, Any]]

Map a django-filter FilterSet class to JSON Schema properties.

Walks FilterSet.declared_filters plus any auto-generated filters (from a Meta declaration) and returns a property dict shaped like the "properties" key of a JSON Schema object — ready to merge into a tool's inputSchema.

Filter properties are always optional from MCP's perspective: they narrow the queryset but are not required to call the tool. The merger in :mod:rest_framework_mcp.schema.input_schema does not add filter names to the required array.

Common filter classes get accurate JSON Schema mappings; exotic classes fall back to {} (the JSON Schema "any value" shape) so a custom filter never breaks tool discovery. Tests document which classes are precisely mapped.

Raises ImportError when django-filter isn't installed — this only fires when a binding is actually constructed with filter_set=..., so projects that don't use the integration are unaffected.

build_selector_tool_input_schema

build_selector_tool_input_schema(binding: SelectorToolBinding) -> dict[str, Any]

Build the JSON Schema for a selector tool's inputSchema.

Merges four sources, in order of precedence (later sources override earlier ones on key collision):

  1. spec.input_serializer — any explicit input shape declared by the consumer (e.g. for tool-specific args that aren't filter params). All required-marked fields stay required.
  2. filter_set — properties derived from django-filter filter declarations. All optional.
  3. ordering_fields — adds an ordering property as an enum of "<field>" and "-<field>" values. Optional.
  4. paginate=True — adds optional page (positive integer) and limit (positive integer) properties.

The final schema is always an object with "type": "object", "properties": {...}, and "required": [...] only when at least one required field exists.

Session stores

SessionStore

Bases: Protocol

Pluggable persistence for MCP-Session-Id lifecycle.

The transport calls :meth:create after a successful initialize — binding the new session to the authenticated principal — and :meth:owner on every subsequent request to enforce both that clients re-initialize after a server restart and that a session minted under one principal cannot be presented by another. :meth:destroy is invoked on HTTP DELETE (after the same ownership check).

principal_id is an opaque string the transport derives from the authenticated token (see :func:rest_framework_mcp.transport.utils.principal_for_token); stores persist and return it verbatim.

.. versionchanged:: 0.7 :meth:create takes a required keyword-only principal_id and :meth:owner joined the protocol. Custom store implementations must add both; storing the principal alongside the session id is the only new obligation.

InMemorySessionStore

Process-local session store. Useful for tests and single-process dev servers.

State lives on the instance, so each store is isolated. Multi-process deployments should use :class:DjangoCacheSessionStore instead — this class will not see sessions created in another process.

DjangoCacheSessionStore

Session store backed by django.core.cache.

Works across processes — the production-suitable default. TTL is fixed at 24 hours; for stricter pinning, projects can subclass and override :meth:create.

The cached value is the owning principal id, so :meth:owner is a single cache read. Sessions written by pre-0.7 versions stored True instead of a principal — those fail the ownership comparison and the client transparently re-initializes.

Server-initiated push

SSEBroker

Bases: Protocol

Pluggable pub/sub for server-pushed MCP messages.

The transport calls :meth:subscribe when a client opens GET /mcp/, :meth:publish from app code that wants to push a payload to a specific session, and :meth:unsubscribe when the streaming generator unwinds.

Two concrete implementations ship today:

  • :class:InMemorySSEBroker — single-process, no infra. Suitable for development and single-worker ASGI deployments.
  • :class:RedisSSEBroker — Redis pub/sub. Required for multi-worker deployments where any worker can serve the streaming GET. Pulled in via the [redis] optional extra.

The contract is intentionally narrow: a session has at most one live subscriber; publish returns True if a delivery was attempted, False if no subscriber was attached. Implementations decide whether publish is fire-and-forget or awaits delivery confirmation; the MCP transport treats it as best-effort either way.

InMemorySSEBroker

In-process per-session pub/sub for server-pushed MCP messages.

Each subscribed session gets a private :class:asyncio.Queue. App code running in the same Python process publishes to it via :meth:publish; the streaming GET generator pulls off the queue and emits SSE frames.

State is instance-scoped — the :class:MCPServer owns one broker, so multiple servers in the same process don't share state. Multi-process deployments need an out-of-process backend; see :class:RedisSSEBroker (in the [redis] extra) for the production choice.

The broker enforces a single subscriber per session — if a client re-subscribes (e.g. after a dropped connection), the previous queue is replaced and the old generator will eventually error out on its next await. There is no replay; clients that need durability should call tools/call directly rather than relying on SSE.

unsubscribe

unsubscribe(session_id: str, queue: Queue[Any]) -> None

Remove queue from the registry if it's still the live subscriber.

Compares by identity so a re-subscribed session doesn't accidentally unregister the new queue when the old generator shuts down.

publish async

publish(session_id: str, payload: Any) -> bool

Enqueue payload for session_id if a subscriber exists.

Returns True if delivery was attempted, False if the session had no subscriber. The caller decides how to react to a miss — most callers will ignore it (the client will catch up via a fresh tools/call round-trip).

RedisSSEBroker

Cross-process SSE broker backed by Redis pub/sub.

Drop-in replacement for :class:InMemorySSEBroker when running multiple ASGI workers behind a load balancer. The streaming GET handler can land on any worker; await server.notify(...) from a different worker reaches the right session because every worker subscribes to the same Redis channel.

Each session subscribes to its own Redis channel (<prefix>:<session_id>) and runs a background asyncio.Task that pulls messages off the Redis pub/sub stream and pushes them onto a local :class:asyncio.Queue — the same queue shape the SSE response generator expects. JSON encode/decode happens at the broker boundary so app code pushes Python dicts and the streaming generator sees them as dicts too.

Wire it into :class:MCPServer:

.. code-block:: python

from redis.asyncio import Redis
from rest_framework_mcp import MCPServer
from rest_framework_mcp.transport.redis_sse_broker import RedisSSEBroker

broker = RedisSSEBroker(Redis.from_url("redis://localhost:6379/0"))
server = MCPServer(name="my-app", sse_broker=broker)

Caveats:

  • Same single-subscriber-per-session contract as the in-memory broker (re-subscribing replaces the old subscriber's queue).
  • Message replay is a separate, opt-in collaborator — pair this with :class:RedisSSEReplayBuffer (passed as MCPServer(sse_replay_buffer=...)) for cross-worker Last-Event-ID resume.
  • The Redis client's lifecycle is the consumer's responsibility — close it during ASGI lifespan shutdown.

publish async

publish(session_id: str, payload: Any) -> bool

Publish to the session's channel and report whether anyone received it.

redis.publish returns the number of subscribers that got the message; we surface True when at least one listener was attached (typical case), False otherwise. Note that "0 subscribers" can also mean the streaming task hasn't connected yet — callers that require strict at-least-once delivery should layer their own retry.

has_subscriber

has_subscriber(session_id: str) -> bool

Local-only check.

Reflects whether this worker has an active subscriber. Across- process visibility would require an extra Redis round-trip and isn't useful for the typical caller (the streaming generator only cares about its own queue).

SSE replay (resume)

SSEReplayBuffer

Bases: Protocol

Pluggable per-session ring buffer for SSE event replay.

Pair this with an :class:SSEBroker to support Last-Event-ID resume — when a client reconnects with that header, the SSE response generator drains every event past the supplied ID from the buffer before entering live mode, so the client sees no gap from the server's POV.

The buffer is the single source of truth for event IDs: :meth:record assigns a new monotonic ID per session and returns it, so the live frame and any future replayed frame agree on the ID. The transport wraps that ID into the broker payload as {"_mcp_event_id", "_mcp_payload"} and the SSE response generator unwraps it to emit id: lines.

Implementations should bound their per-session storage — replay buffers without a cap leak when clients never reconnect. The shipped in-memory variant uses a fixed-size :class:collections.deque; the Redis variant uses XADD MAXLEN ~ N for capped streams.

Resume is opt-in: pass sse_replay_buffer=... to :class:MCPServer to enable it. When omitted, the SSE wire shape is unchanged (no id: lines) and Last-Event-ID from clients is silently ignored.

record async

record(session_id: str, payload: Any) -> str

Persist payload for session_id and return its event ID.

The returned ID is what the SSE response emits as the id: line and what the client echoes back via Last-Event-ID on resume. IDs must be monotonic within a session; cross-session ordering is not required.

replay

replay(session_id: str, after_id: str | None) -> AsyncIterator[tuple[str, Any]]

Yield (event_id, payload) pairs strictly after after_id.

after_id=None (no header sent) yields nothing — fresh subscribe is the no-replay path. An after_id that's older than the buffer's oldest retained event yields whatever is still in the ring (best-effort delivery; the client knows it lost some events only by counting). An after_id newer than the latest recorded event yields nothing — the client is already up to date.

forget async

forget(session_id: str) -> None

Drop all retained events for session_id.

Called when a session is explicitly destroyed (DELETE) so dead sessions don't accumulate buffer state. Implementations that rely on TTL-based eviction can no-op this.

InMemorySSEReplayBuffer

In-process bounded replay buffer for SSE event resume.

Each session holds its own :class:collections.deque capped at max_events; the oldest event is evicted when a new one arrives. Event IDs are zero-padded monotonic integers per session — string- valued because the SSE wire format is string-only and clients echo them back verbatim via Last-Event-ID.

Suitable for single-process ASGI deployments. Multi-worker deployments must use :class:RedisSSEReplayBuffer because the streaming GET that handles a resume can land on a different worker than the one that recorded the events.

State is instance-scoped — :class:MCPServer owns one buffer, so multiple servers in the same process don't share replay history.

RedisSSEReplayBuffer

Cross-process replay buffer backed by Redis Streams.

Drop-in replacement for :class:InMemorySSEReplayBuffer when running multiple ASGI workers. The streaming GET that handles a reconnect can land on any worker; reading from a shared Redis Stream means the replay is the same regardless of which worker recorded the events.

Stream IDs are auto-assigned by Redis (ms-seq format) and are monotonic within a session — they double as the SSE event IDs the client echoes back via Last-Event-ID. MAXLEN ~ N caps the retained history per session; the ~ makes trimming approximate (Redis trims when convenient) which is fine for replay buffers.

Wire it into :class:MCPServer::

from redis.asyncio import Redis
from rest_framework_mcp import MCPServer
from rest_framework_mcp.transport.redis_sse_replay_buffer import (
    RedisSSEReplayBuffer,
)

client = Redis.from_url("redis://localhost:6379/0")
buffer = RedisSSEReplayBuffer(client, max_events=2048)
server = MCPServer(name="my-app", sse_broker=..., sse_replay_buffer=buffer)

The Redis client is the consumer's responsibility — close it during ASGI lifespan shutdown.

record async

record(session_id: str, payload: Any) -> str

Append payload to the session's stream and return the assigned ID.

XADD <key> MAXLEN ~ N * data <json> — the * lets Redis choose a monotonic ID; ~ makes trimming approximate (Redis trims at internal node boundaries, which is faster than exact trimming and bounds memory in the same shape).