Skip to content

Registries

Tool, resource, and prompt lookup, plus session storage.

ToolBinding wraps a ServiceSpec (mutation tools); SelectorToolBinding wraps a SelectorSpec and carries the read-shaped pipeline knobs (filter_set, ordering_fields, paginate). The shared ToolRegistry accepts either kind and is what tools/list and tools/call iterate.

ToolBinding dataclass

Bases: Generic[InputT, ResultT, ExtraT]

All wiring for a single MCP tool, derived from a ServiceSpec.

A tool is the projection of a service callable plus its declared input and output serializers. The MCP server invokes spec.service directly via resolve_callable_kwargs + run_service — there is no view or viewset in the dispatch path.

The Generic[InputT, ResultT, ExtraT] parameters mirror ServiceSpec's generics and are purely informational for type checkers. They default to Any when omitted, so existing call sites keep working unchanged.

SelectorToolBinding dataclass

Bases: Generic[ResultT, ExtraT]

All wiring for a single MCP read-shaped tool, derived from a SelectorSpec.

Mirrors :class:ToolBinding (which wraps a ServiceSpec for mutations), but the dispatch pipeline is read-shaped:

.. code-block:: text

arguments → validate(merged inputSchema) → run_selector
          → FilterSet(data=...).qs    (if ``filter_set`` set)
          → order_by(...)             (if ``ordering_fields`` set)
          → paginate                  (if ``paginate=True``)
          → output_serializer(many=True)
          → ToolResult

Selectors return raw, unscoped querysets — the tool layer owns the post-fetch pipeline. A binding with none of filter_set / ordering_fields / paginate set behaves like a plain RPC read that calls the selector and renders its return value verbatim.

The Generic[InputT, ResultT, ExtraT] parameters mirror SelectorSpec's generics and are purely informational for type checkers.

ToolRegistry

Name → tool binding lookup.

Holds both :class:ToolBinding (service tools, mutations) and :class:SelectorToolBinding (selector tools, reads). Names share a namespace — duplicates are rejected loudly so a misconfigured project surfaces the conflict at discovery time rather than silently shadowing a tool.

ResourceBinding dataclass

Bases: Generic[ResultT]

All wiring for a single MCP resource (or resource template).

A resource is a selector callable plus a URI template. The MCP server invokes the selector directly via resolve_callable_kwargs + run_selector — there is no view or viewset in the dispatch path.

output_serializer is consulted by resources/read to render the selector's return value. mime_type advertises the encoding we will return — usually "application/json".

kwargs_provider mirrors SelectorSpec.kwargs from djangorestframework-services >= 0.6: when set, the handler invokes it once per request and merges the returned dict into the kwarg pool. The provider receives a synthesised :class:MCPServiceView (URI-template variables exposed as view.kwargs, the binding name as view.action).

The Generic[ResultT] parameter is purely informational — it lets callers pin the selector's return type for IDE / type-checker help. Defaults to Any when omitted.

ResourceRegistry

URI / URI-template → :class:ResourceBinding lookup.

Concrete resources (no template variables) are matched by exact URI; templates are matched by regex derived from the template. resolve returns the binding plus the variable bindings extracted from the URI.

PromptBinding dataclass

All wiring for a single MCP prompt.

A prompt is a server-defined message-template the client invokes by name. The render callable receives the client-supplied arguments as kwargs and returns either:

  • a list of :class:PromptMessage instances (full control), or
  • a list of strings (each becomes a user text message), or
  • a single string (becomes one user text message), or
  • a coroutine yielding any of the above.

The handler normalises whatever shape the callable returns into the spec's messages list at dispatch time.

PromptRegistry

Name → :class:PromptBinding lookup.

Mirrors :class:ToolRegistry exactly — names are unique, duplicates raise loudly at registration time.

Selector-tool schema

Helpers that build the merged inputSchema for selector tools — exposed for projects that want to introspect filter / ordering / pagination property generation outside of the registration flow.

filterset_to_schema_properties

filterset_to_schema_properties(filter_set_class: Any) -> dict[str, dict[str, Any]]

Map a django-filter FilterSet class to JSON Schema properties.

Walks FilterSet.declared_filters plus any auto-generated filters (from a Meta declaration) and returns a property dict shaped like the "properties" key of a JSON Schema object — ready to merge into a tool's inputSchema.

Filter properties are always optional from MCP's perspective: they narrow the queryset but are not required to call the tool. The merger in :mod:rest_framework_mcp.schema.input_schema does not add filter names to the required array.

Common filter classes get accurate JSON Schema mappings; exotic classes fall back to {} (the JSON Schema "any value" shape) so a custom filter never breaks tool discovery. Tests document which classes are precisely mapped.

Raises ImportError when django-filter isn't installed — this only fires when a binding is actually constructed with filter_set=..., so projects that don't use the integration are unaffected.

build_selector_tool_input_schema

build_selector_tool_input_schema(binding: SelectorToolBinding) -> dict[str, Any]

Build the JSON Schema for a selector tool's inputSchema.

Merges four sources, in order of precedence (later sources override earlier ones on key collision):

  1. spec.input_serializer — any explicit input shape declared by the consumer (e.g. for tool-specific args that aren't filter params). All required-marked fields stay required.
  2. filter_set — properties derived from django-filter filter declarations. All optional.
  3. ordering_fields — adds an ordering property as an enum of "<field>" and "-<field>" values. Optional.
  4. paginate=True — adds optional page (positive integer) and limit (positive integer) properties.

The final schema is always an object with "type": "object", "properties": {...}, and "required": [...] only when at least one required field exists.

Session stores

SessionStore

Bases: Protocol

Pluggable persistence for MCP-Session-Id lifecycle.

The transport calls :meth:create after a successful initialize and :meth:exists on every subsequent request to enforce that clients re-initialize after a server restart. :meth:destroy is invoked on HTTP DELETE.

Stores need not retain rich state today (the MCP spec only demands the ID is recognised), but the interface leaves room for future extension such as protocol-version pinning per session.

InMemorySessionStore

Process-local session store. Useful for tests and single-process dev servers.

State lives on the instance, so each store is isolated. Multi-process deployments should use :class:DjangoCacheSessionStore instead — this class will not see sessions created in another process.

DjangoCacheSessionStore

Session store backed by django.core.cache.

Works across processes — the production-suitable default. TTL is fixed at 24 hours; for stricter pinning, projects can subclass and override :meth:create.

Server-initiated push

SSEBroker

Bases: Protocol

Pluggable pub/sub for server-pushed MCP messages.

The transport calls :meth:subscribe when a client opens GET /mcp/, :meth:publish from app code that wants to push a payload to a specific session, and :meth:unsubscribe when the streaming generator unwinds.

Two concrete implementations ship today:

  • :class:InMemorySSEBroker — single-process, no infra. Suitable for development and single-worker ASGI deployments.
  • :class:RedisSSEBroker — Redis pub/sub. Required for multi-worker deployments where any worker can serve the streaming GET. Pulled in via the [redis] optional extra.

The contract is intentionally narrow: a session has at most one live subscriber; publish returns True if a delivery was attempted, False if no subscriber was attached. Implementations decide whether publish is fire-and-forget or awaits delivery confirmation; the MCP transport treats it as best-effort either way.

InMemorySSEBroker

In-process per-session pub/sub for server-pushed MCP messages.

Each subscribed session gets a private :class:asyncio.Queue. App code running in the same Python process publishes to it via :meth:publish; the streaming GET generator pulls off the queue and emits SSE frames.

State is instance-scoped — the :class:MCPServer owns one broker, so multiple servers in the same process don't share state. Multi-process deployments need an out-of-process backend; see :class:RedisSSEBroker (in the [redis] extra) for the production choice.

The broker enforces a single subscriber per session — if a client re-subscribes (e.g. after a dropped connection), the previous queue is replaced and the old generator will eventually error out on its next await. There is no replay; clients that need durability should call tools/call directly rather than relying on SSE.

unsubscribe

unsubscribe(session_id: str, queue: Queue[Any]) -> None

Remove queue from the registry if it's still the live subscriber.

Compares by identity so a re-subscribed session doesn't accidentally unregister the new queue when the old generator shuts down.

publish async

publish(session_id: str, payload: Any) -> bool

Enqueue payload for session_id if a subscriber exists.

Returns True if delivery was attempted, False if the session had no subscriber. The caller decides how to react to a miss — most callers will ignore it (the client will catch up via a fresh tools/call round-trip).

RedisSSEBroker

Cross-process SSE broker backed by Redis pub/sub.

Drop-in replacement for :class:InMemorySSEBroker when running multiple ASGI workers behind a load balancer. The streaming GET handler can land on any worker; await server.notify(...) from a different worker reaches the right session because every worker subscribes to the same Redis channel.

Each session subscribes to its own Redis channel (<prefix>:<session_id>) and runs a background asyncio.Task that pulls messages off the Redis pub/sub stream and pushes them onto a local :class:asyncio.Queue — the same queue shape the SSE response generator expects. JSON encode/decode happens at the broker boundary so app code pushes Python dicts and the streaming generator sees them as dicts too.

Wire it into :class:MCPServer:

.. code-block:: python

from redis.asyncio import Redis
from rest_framework_mcp import MCPServer
from rest_framework_mcp.transport.redis_sse_broker import RedisSSEBroker

broker = RedisSSEBroker(Redis.from_url("redis://localhost:6379/0"))
server = MCPServer(name="my-app", sse_broker=broker)

Caveats:

  • Same single-subscriber-per-session contract as the in-memory broker (re-subscribing replaces the old subscriber's queue).
  • No message replay; Last-Event-ID resume is a separate feature tracked in Phase 7c.
  • The Redis client's lifecycle is the consumer's responsibility — close it during ASGI lifespan shutdown.

publish async

publish(session_id: str, payload: Any) -> bool

Publish to the session's channel and report whether anyone received it.

redis.publish returns the number of subscribers that got the message; we surface True when at least one listener was attached (typical case), False otherwise. Note that "0 subscribers" can also mean the streaming task hasn't connected yet — callers that require strict at-least-once delivery should layer their own retry.

has_subscriber

has_subscriber(session_id: str) -> bool

Local-only check.

Reflects whether this worker has an active subscriber. Across- process visibility would require an extra Redis round-trip and isn't useful for the typical caller (the streaming generator only cares about its own queue).

SSE replay (resume)

SSEReplayBuffer

Bases: Protocol

Pluggable per-session ring buffer for SSE event replay.

Pair this with an :class:SSEBroker to support Last-Event-ID resume — when a client reconnects with that header, the SSE response generator drains every event past the supplied ID from the buffer before entering live mode, so the client sees no gap from the server's POV.

The buffer is the single source of truth for event IDs: :meth:record assigns a new monotonic ID per session and returns it, so the live frame and any future replayed frame agree on the ID. The transport wraps that ID into the broker payload as {"_mcp_event_id", "_mcp_payload"} and the SSE response generator unwraps it to emit id: lines.

Implementations should bound their per-session storage — replay buffers without a cap leak when clients never reconnect. The shipped in-memory variant uses a fixed-size :class:collections.deque; the Redis variant uses XADD MAXLEN ~ N for capped streams.

Resume is opt-in: pass sse_replay_buffer=... to :class:MCPServer to enable it. When omitted, the SSE wire shape is unchanged (no id: lines) and Last-Event-ID from clients is silently ignored.

record async

record(session_id: str, payload: Any) -> str

Persist payload for session_id and return its event ID.

The returned ID is what the SSE response emits as the id: line and what the client echoes back via Last-Event-ID on resume. IDs must be monotonic within a session; cross-session ordering is not required.

replay

replay(session_id: str, after_id: str | None) -> AsyncIterator[tuple[str, Any]]

Yield (event_id, payload) pairs strictly after after_id.

after_id=None (no header sent) yields nothing — fresh subscribe is the no-replay path. An after_id that's older than the buffer's oldest retained event yields whatever is still in the ring (best-effort delivery; the client knows it lost some events only by counting). An after_id newer than the latest recorded event yields nothing — the client is already up to date.

forget async

forget(session_id: str) -> None

Drop all retained events for session_id.

Called when a session is explicitly destroyed (DELETE) so dead sessions don't accumulate buffer state. Implementations that rely on TTL-based eviction can no-op this.

InMemorySSEReplayBuffer

In-process bounded replay buffer for SSE event resume.

Each session holds its own :class:collections.deque capped at max_events; the oldest event is evicted when a new one arrives. Event IDs are zero-padded monotonic integers per session — string- valued because the SSE wire format is string-only and clients echo them back verbatim via Last-Event-ID.

Suitable for single-process ASGI deployments. Multi-worker deployments must use :class:RedisSSEReplayBuffer because the streaming GET that handles a resume can land on a different worker than the one that recorded the events.

State is instance-scoped — :class:MCPServer owns one buffer, so multiple servers in the same process don't share replay history.

RedisSSEReplayBuffer

Cross-process replay buffer backed by Redis Streams.

Drop-in replacement for :class:InMemorySSEReplayBuffer when running multiple ASGI workers. The streaming GET that handles a reconnect can land on any worker; reading from a shared Redis Stream means the replay is the same regardless of which worker recorded the events.

Stream IDs are auto-assigned by Redis (ms-seq format) and are monotonic within a session — they double as the SSE event IDs the client echoes back via Last-Event-ID. MAXLEN ~ N caps the retained history per session; the ~ makes trimming approximate (Redis trims when convenient) which is fine for replay buffers.

Wire it into :class:MCPServer::

from redis.asyncio import Redis
from rest_framework_mcp import MCPServer
from rest_framework_mcp.transport.redis_sse_replay_buffer import (
    RedisSSEReplayBuffer,
)

client = Redis.from_url("redis://localhost:6379/0")
buffer = RedisSSEReplayBuffer(client, max_events=2048)
server = MCPServer(name="my-app", sse_broker=..., sse_replay_buffer=buffer)

The Redis client is the consumer's responsibility — close it during ASGI lifespan shutdown.

record async

record(session_id: str, payload: Any) -> str

Append payload to the session's stream and return the assigned ID.

XADD <key> MAXLEN ~ N * data <json> — the * lets Redis choose a monotonic ID; ~ makes trimming approximate (Redis trims at internal node boundaries, which is faster than exact trimming and bounds memory in the same shape).