Registries¶
Tool, resource, and prompt lookup, plus session storage.
ToolBinding wraps a ServiceSpec (mutation tools);
SelectorToolBinding wraps a SelectorSpec and carries the read-shaped
pipeline knobs (filter_set, ordering_fields, paginate). The shared
ToolRegistry accepts either kind and is what tools/list and
tools/call iterate.
ToolBinding
dataclass
¶
Bases: Generic[InputT, ResultT, ExtraT]
All wiring for a single MCP tool, derived from a ServiceSpec.
A tool is the projection of a service callable plus its declared input
and output serializers. The MCP server invokes spec.service directly
via resolve_callable_kwargs + run_service — there is no view or
viewset in the dispatch path.
The Generic[InputT, ResultT, ExtraT] parameters mirror
ServiceSpec's generics and are purely informational for type
checkers. They default to Any when omitted, so existing call sites
keep working unchanged.
SelectorToolBinding
dataclass
¶
Bases: Generic[ResultT, ExtraT]
All wiring for a single MCP read-shaped tool, derived from a SelectorSpec.
Mirrors :class:ToolBinding (which wraps a ServiceSpec for
mutations), but the dispatch pipeline is read-shaped:
.. code-block:: text
arguments → validate(merged inputSchema) → run_selector
→ FilterSet(data=...).qs (if ``filter_set`` set)
→ order_by(...) (if ``ordering_fields`` set)
→ paginate (if ``paginate=True``)
→ output_serializer(many=True)
→ ToolResult
Selectors return raw, unscoped querysets — the tool layer owns the
post-fetch pipeline. A binding with none of filter_set /
ordering_fields / paginate set behaves like a plain RPC read
that calls the selector and renders its return value verbatim.
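A condensed sketch of that pipeline in plain Django/DRF terms. This is not the library's actual dispatch code: the binding/spec attribute names and the default page size are assumptions, and the kwarg plumbing that resolve_callable_kwargs / run_selector perform is stood in for by a direct call.
.. code-block:: python
from typing import Any


def dispatch_selector_tool(binding: Any, arguments: dict[str, Any]) -> list[dict]:
    # resolve_callable_kwargs / run_selector do the kwarg plumbing in the
    # real pipeline; a direct call stands in for them here
    qs = binding.spec.selector(**arguments)  # raw, unscoped queryset
    if binding.filter_set is not None:
        # django-filter narrows the queryset from the tool arguments
        qs = binding.filter_set(data=arguments, queryset=qs).qs
    if binding.ordering_fields and arguments.get("ordering"):
        qs = qs.order_by(arguments["ordering"])
    if binding.paginate:
        page = int(arguments.get("page", 1))
        limit = int(arguments.get("limit", 50))  # default page size assumed
        qs = qs[(page - 1) * limit : page * limit]
    return binding.spec.output_serializer(qs, many=True).data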
The Generic[ResultT, ExtraT] parameters mirror
SelectorSpec's generics and are purely informational for type
checkers.
ToolRegistry ¶
Name → tool binding lookup.
Holds both :class:ToolBinding (service tools, mutations) and
:class:SelectorToolBinding (selector tools, reads). Names share a
namespace — duplicates are rejected loudly so a misconfigured project
surfaces the conflict at discovery time rather than silently shadowing
a tool.
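Illustrative only (the registration method name is an assumption; the duplicate-rejection behaviour is the documented contract):
.. code-block:: python
registry = ToolRegistry()
registry.register(create_article)   # ToolBinding, name="create_article"
registry.register(list_articles)    # SelectorToolBinding, name="list_articles"

# registering another binding named "create_article" raises immediately,
# at discovery time, instead of silently shadowing the first one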
ResourceBinding
dataclass
¶
Bases: Generic[ResultT]
All wiring for a single MCP resource (or resource template).
A resource is a selector callable plus a URI template. The MCP server
invokes the selector directly via resolve_callable_kwargs +
run_selector — there is no view or viewset in the dispatch path.
output_serializer is consulted by resources/read to render the
selector's return value. mime_type advertises the encoding we will
return — usually "application/json".
kwargs_provider mirrors SelectorSpec.kwargs from
djangorestframework-services >= 0.6: when set, the handler invokes it
once per request and merges the returned dict into the kwarg pool. The
provider receives a synthesised :class:MCPServiceView (URI-template
variables exposed as view.kwargs, the binding name as
view.action).
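A hypothetical provider, assuming a resource template such as app://articles/{article_id}; the function name and the kwarg it injects are made up:
.. code-block:: python
def article_kwargs(view) -> dict:
    # view is the synthesised MCPServiceView: URI-template variables land
    # in view.kwargs, the binding name in view.action
    return {"article_id": int(view.kwargs["article_id"])}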
The Generic[ResultT] parameter is purely informational — it lets
callers pin the selector's return type for IDE / type-checker help.
Defaults to Any when omitted.
ResourceRegistry ¶
URI / URI-template → :class:ResourceBinding lookup.
Concrete resources (no template variables) are matched by exact URI;
templates are matched by regex derived from the template. resolve
returns the binding plus the variable bindings extracted from the URI.
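A sketch of how a template could be compiled to a matching regex. This is for illustration only: the library's actual derivation may differ, the URI scheme is made up, and literal template parts would need escaping in real code.
.. code-block:: python
import re

template = "app://articles/{article_id}"  # hypothetical template
pattern = re.compile("^" + re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", template) + "$")

match = pattern.match("app://articles/42")
assert match is not None
assert match.groupdict() == {"article_id": "42"}  # the variable bindings resolve returns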
PromptBinding
dataclass
¶
All wiring for a single MCP prompt.
A prompt is a server-defined message-template the client invokes by
name. The render callable receives the client-supplied arguments as
kwargs and returns either:
- a list of :class:PromptMessage instances (full control), or
- a list of strings (each becomes a user text message), or
- a single string (becomes one user text message), or
- a coroutine yielding any of the above.
The handler normalises whatever shape the callable returns into the spec's
messages list at dispatch time.
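Hypothetical render callables covering the accepted shapes (names and arguments are made up):
.. code-block:: python
def summarise(text: str) -> str:
    return f"Summarise the following:\n\n{text}"  # one user text message


def review(diff: str, style: str = "strict") -> list[str]:
    # each string becomes its own user text message
    return [f"Review this diff in {style} mode.", diff]


async def plan(goal: str) -> str:
    # coroutines are awaited, then normalised like the sync shapes
    return f"Draft a step-by-step plan for: {goal}"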
PromptRegistry ¶
Name → :class:PromptBinding lookup.
Mirrors :class:ToolRegistry exactly — names are unique, duplicates
raise loudly at registration time.
Selector-tool schema¶
Helpers that build the merged inputSchema for selector tools — exposed
for projects that want to introspect filter / ordering / pagination
property generation outside of the registration flow.
filterset_to_schema_properties ¶
Map a django-filter FilterSet class to JSON Schema properties.
Walks FilterSet.declared_filters plus any auto-generated filters
(from a Meta declaration) and returns a property dict shaped like
the "properties" key of a JSON Schema object — ready to merge into
a tool's inputSchema.
Filter properties are always optional from MCP's perspective: they
narrow the queryset but are not required to call the tool. The merger
in :mod:rest_framework_mcp.schema.input_schema does not add filter
names to the required array.
Common filter classes get accurate JSON Schema mappings; exotic
classes fall back to {} (the JSON Schema "any value" shape) so a
custom filter never breaks tool discovery. Tests document which
classes are precisely mapped.
Raises ImportError when django-filter isn't installed — this
only fires when a binding is actually constructed with
filter_set=..., so projects that don't use the integration are
unaffected.
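A sketch of the mapping on a small FilterSet, assuming the helper is imported from the package's schema module; the exact property shapes shown in the comment are illustrative (the tests are authoritative):
.. code-block:: python
import django_filters

# filterset_to_schema_properties imported from the package's schema
# helpers (exact path omitted)


class ArticleFilter(django_filters.FilterSet):
    title = django_filters.CharFilter(lookup_expr="icontains")
    published = django_filters.BooleanFilter()


props = filterset_to_schema_properties(ArticleFilter)
# plausibly: {"title": {"type": "string"}, "published": {"type": "boolean"}}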
build_selector_tool_input_schema ¶
Build the JSON Schema for a selector tool's inputSchema.
Merges four sources, in order of precedence (later sources override earlier ones on key collision):
- spec.input_serializer — any explicit input shape declared by the consumer (e.g. for tool-specific args that aren't filter params). All required-marked fields stay required.
- filter_set — properties derived from django-filter filter declarations. All optional.
- ordering_fields — adds an ordering property as an enum of "<field>" and "-<field>" values. Optional.
- paginate=True — adds optional page (positive integer) and limit (positive integer) properties.
The final schema is always an object with "type": "object",
"properties": {...}, and "required": [...] only when at
least one required field exists.
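For a binding with a title filter, ordering on created, and paginate=True, the merged result might look like this (an illustrative sketch, not captured output):
.. code-block:: python
{
    "type": "object",
    "properties": {
        "title": {"type": "string"},                    # from filter_set
        "ordering": {"enum": ["created", "-created"]},  # from ordering_fields
        "page": {"type": "integer", "minimum": 1},      # from paginate=True
        "limit": {"type": "integer", "minimum": 1},     # from paginate=True
    },
}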
Session stores¶
SessionStore ¶
Bases: Protocol
Pluggable persistence for MCP-Session-Id lifecycle.
The transport calls :meth:create after a successful initialize and
:meth:exists on every subsequent request to enforce that clients
re-initialize after a server restart. :meth:destroy is invoked on
HTTP DELETE.
Stores need not retain rich state today (the MCP spec only demands the ID is recognised), but the interface leaves room for future extension such as protocol-version pinning per session.
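The protocol surface as described above, in sketch form; the exact signatures (sync vs async, who mints the ID) are assumptions:
.. code-block:: python
from typing import Protocol


class SessionStore(Protocol):
    def create(self, session_id: str) -> None:
        """Record a session after a successful initialize."""

    def exists(self, session_id: str) -> bool:
        """Is this session ID known? Checked on every request."""

    def destroy(self, session_id: str) -> None:
        """Forget the session; invoked on HTTP DELETE."""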
InMemorySessionStore ¶
Process-local session store. Useful for tests and single-process dev servers.
State lives on the instance, so each store is isolated. Multi-process
deployments should use :class:DjangoCacheSessionStore instead — this
class will not see sessions created in another process.
DjangoCacheSessionStore ¶
Session store backed by django.core.cache.
Works across processes — the production-suitable default. TTL is fixed
at 24 hours; for stricter pinning, projects can subclass and override
:meth:create.
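A hypothetical subclass pinning a stricter TTL; the import path, cache key format, and create signature are all assumptions:
.. code-block:: python
from django.core.cache import cache

from rest_framework_mcp.sessions import DjangoCacheSessionStore  # path assumed


class ShortLivedSessionStore(DjangoCacheSessionStore):
    TTL = 60 * 60  # one hour instead of the fixed 24

    def create(self, session_id: str) -> None:
        cache.set(f"mcp-session:{session_id}", True, timeout=self.TTL)  # key format assumed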
Server-initiated push¶
SSEBroker ¶
Bases: Protocol
Pluggable pub/sub for server-pushed MCP messages.
The transport calls :meth:subscribe when a client opens
GET /mcp/, :meth:publish from app code that wants to push a
payload to a specific session, and :meth:unsubscribe when the
streaming generator unwinds.
Two concrete implementations ship today:
- :class:InMemorySSEBroker — single-process, no infra. Suitable for development and single-worker ASGI deployments.
- :class:RedisSSEBroker — Redis pub/sub. Required for multi-worker deployments where any worker can serve the streaming GET. Pulled in via the [redis] optional extra.
The contract is intentionally narrow: a session has at most one live
subscriber; publish returns True if a delivery was attempted,
False if no subscriber was attached. Implementations decide whether
publish is fire-and-forget or awaits delivery confirmation; the MCP
transport treats it as best-effort either way.
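A hedged app-side push, assuming a publish(session_id, payload) argument order and a JSON-RPC notification payload (both plausible but not confirmed by this page):
.. code-block:: python
async def push_progress(broker, session_id: str) -> None:
    delivered = await broker.publish(
        session_id,
        {"jsonrpc": "2.0", "method": "notifications/progress", "params": {"progress": 0.5}},
    )
    if not delivered:
        # best-effort: no subscriber attached; the client catches up via a
        # fresh tools/call round-trip
        pass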
InMemorySSEBroker ¶
In-process per-session pub/sub for server-pushed MCP messages.
Each subscribed session gets a private :class:asyncio.Queue. App code
running in the same Python process publishes to it via :meth:publish;
the streaming GET generator pulls off the queue and emits SSE frames.
State is instance-scoped — the :class:MCPServer owns one broker, so
multiple servers in the same process don't share state. Multi-process
deployments need an out-of-process backend; see
:class:RedisSSEBroker (in the [redis] extra) for the production
choice.
The broker enforces a single subscriber per session — if a client
re-subscribes (e.g. after a dropped connection), the previous queue is
replaced and the old generator will eventually error out on its next
await. There is no replay; clients that need durability should call
tools/call directly rather than relying on SSE.
unsubscribe ¶
Remove queue from the registry if it's still the live subscriber.
Compares by identity so a re-subscribed session doesn't accidentally unregister the new queue when the old generator shuts down.
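The identity check in sketch form; the _queues attribute name is an assumption:
.. code-block:: python
import asyncio


class _BrokerSketch:
    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}  # attribute name assumed

    def unsubscribe(self, session_id: str, queue: asyncio.Queue) -> None:
        # only drop the mapping if this exact queue is still the live one;
        # a re-subscribed session has already replaced it with a new queue
        if self._queues.get(session_id) is queue:
            del self._queues[session_id]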
publish
async
¶
Enqueue payload for session_id if a subscriber exists.
Returns True if delivery was attempted, False if the session
had no subscriber. The caller decides how to react to a miss — most
callers will ignore it (the client will catch up via a fresh
tools/call round-trip).
RedisSSEBroker ¶
Cross-process SSE broker backed by Redis pub/sub.
Drop-in replacement for :class:InMemorySSEBroker when running
multiple ASGI workers behind a load balancer. The streaming GET handler
can land on any worker; await server.notify(...) from a different
worker reaches the right session because every worker subscribes to the
same Redis channel.
Each session subscribes to its own Redis channel (<prefix>:<session_id>)
and runs a background asyncio.Task that pulls messages off the
Redis pub/sub stream and pushes them onto a local
:class:asyncio.Queue — the same queue shape the SSE response generator
expects. JSON encode/decode happens at the broker boundary so app code
pushes Python dicts and the streaming generator sees them as dicts too.
Wire it into :class:MCPServer:
.. code-block:: python
from redis.asyncio import Redis
from rest_framework_mcp import MCPServer
from rest_framework_mcp.transport.redis_sse_broker import RedisSSEBroker
broker = RedisSSEBroker(Redis.from_url("redis://localhost:6379/0"))
server = MCPServer(name="my-app", sse_broker=broker)
Caveats:
- Same single-subscriber-per-session contract as the in-memory broker (re-subscribing replaces the old subscriber's queue).
- No message replay by itself; Last-Event-ID resume is opt-in via the replay buffers described under "SSE replay (resume)" below.
- The Redis client's lifecycle is the consumer's responsibility — close it during ASGI lifespan shutdown.
publish
async
¶
Publish to the session's channel and report whether anyone received it.
redis.publish returns the number of subscribers that got the
message; we surface True when at least one listener was attached
(typical case), False otherwise. Note that "0 subscribers" can
also mean the streaming task hasn't connected yet — callers that
require strict at-least-once delivery should layer their own retry.
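Roughly that logic via redis-py's asyncio client; the channel naming and JSON encoding happen elsewhere in the broker, so this sketch takes them as given:
.. code-block:: python
import json

from redis.asyncio import Redis


async def publish(client: Redis, channel: str, payload: dict) -> bool:
    # redis.publish returns the subscriber count; >= 1 means some worker's
    # streaming task had the channel open when the message went out
    received = await client.publish(channel, json.dumps(payload))
    return received > 0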
has_subscriber ¶
Local-only check.
Reflects whether this worker has an active subscriber. Cross-process visibility would require an extra Redis round-trip and isn't useful for the typical caller (the streaming generator only cares about its own queue).
SSE replay (resume)¶
SSEReplayBuffer ¶
Bases: Protocol
Pluggable per-session ring buffer for SSE event replay.
Pair this with an :class:SSEBroker to support Last-Event-ID resume — when
a client reconnects with that header, the SSE response generator drains
every event past the supplied ID from the buffer before entering live
mode, so the client sees no gap from the server's POV.
The buffer is the single source of truth for event IDs:
:meth:record assigns a new monotonic ID per session and returns it,
so the live frame and any future replayed frame agree on the ID. The
transport wraps that ID into the broker payload as a
{"_mcp_event_id": ..., "_mcp_payload": ...} envelope and the SSE response
generator unwraps it to emit id: lines.
Implementations should bound their per-session storage — replay
buffers without a cap leak when clients never reconnect. The shipped
in-memory variant uses a fixed-size :class:collections.deque; the
Redis variant uses XADD MAXLEN ~ N for capped streams.
Resume is opt-in: pass sse_replay_buffer=... to
:class:MCPServer to enable it. When omitted, the SSE wire shape is
unchanged (no id: lines) and Last-Event-ID from clients is
silently ignored.
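How the transport could pair record with publish, per the envelope described above (the wiring and call order here are assumptions):
.. code-block:: python
async def push_with_replay(broker, buffer, session_id: str, payload: dict) -> bool:
    # the buffer mints the monotonic event ID first, so the live frame and
    # any future replayed frame agree on it
    event_id = await buffer.record(session_id, payload)
    return await broker.publish(
        session_id, {"_mcp_event_id": event_id, "_mcp_payload": payload}
    )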
record
async
¶
Persist payload for session_id and return its event ID.
The returned ID is what the SSE response emits as the id: line
and what the client echoes back via Last-Event-ID on resume.
IDs must be monotonic within a session; cross-session ordering
is not required.
replay ¶
Yield (event_id, payload) pairs strictly after after_id.
after_id=None (no header sent) yields nothing — fresh subscribe
is the no-replay path. An after_id that's older than the
buffer's oldest retained event yields whatever is still in the
ring (best-effort delivery; the client knows it lost some events
only by counting). An after_id newer than the latest recorded
event yields nothing — the client is already up to date.
forget
async
¶
Drop all retained events for session_id.
Called when a session is explicitly destroyed (DELETE) so dead sessions don't accumulate buffer state. Implementations that rely on TTL-based eviction can no-op this.
InMemorySSEReplayBuffer ¶
In-process bounded replay buffer for SSE event resume.
Each session holds its own :class:collections.deque capped at
max_events; the oldest event is evicted when a new one arrives.
Event IDs are zero-padded monotonic integers per session — string-valued
because the SSE wire format is string-only and clients echo them back
verbatim via Last-Event-ID.
Suitable for single-process ASGI deployments. Multi-worker
deployments must use :class:RedisSSEReplayBuffer because the
streaming GET that handles a resume can land on a different worker
than the one that recorded the events.
State is instance-scoped — :class:MCPServer owns one buffer, so
multiple servers in the same process don't share replay history.
RedisSSEReplayBuffer ¶
Cross-process replay buffer backed by Redis Streams.
Drop-in replacement for :class:InMemorySSEReplayBuffer when running
multiple ASGI workers. The streaming GET that handles a reconnect
can land on any worker; reading from a shared Redis Stream means the
replay is the same regardless of which worker recorded the events.
Stream IDs are auto-assigned by Redis (ms-seq format) and are
monotonic within a session — they double as the SSE event IDs the
client echoes back via Last-Event-ID. MAXLEN ~ N caps the
retained history per session; the ~ makes trimming approximate
(Redis trims when convenient) which is fine for replay buffers.
Wire it into :class:MCPServer::
from redis.asyncio import Redis
from rest_framework_mcp import MCPServer
from rest_framework_mcp.transport.redis_sse_replay_buffer import (
RedisSSEReplayBuffer,
)
client = Redis.from_url("redis://localhost:6379/0")
buffer = RedisSSEReplayBuffer(client, max_events=2048)
server = MCPServer(name="my-app", sse_broker=..., sse_replay_buffer=buffer)
The Redis client is the consumer's responsibility — close it during ASGI lifespan shutdown.
record
async
¶
Append payload to the session's stream and return the assigned ID.
XADD <key> MAXLEN ~ N * data <json> — the * lets Redis choose a
monotonic ID; ~ makes trimming approximate (Redis trims at internal node
boundaries, which is faster than exact trimming while still bounding
memory).
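Roughly that call via redis-py's asyncio client; the stream key format and field name are assumptions:
.. code-block:: python
import json

from redis.asyncio import Redis


async def record(client: Redis, session_id: str, payload: dict, max_events: int = 2048) -> str:
    entry_id = await client.xadd(
        f"mcp-sse-replay:{session_id}",   # stream key format assumed
        {"data": json.dumps(payload)},
        maxlen=max_events,
        approximate=True,                 # the "~" in MAXLEN ~ N
    )
    # redis-py returns bytes unless decode_responses=True
    return entry_id.decode() if isinstance(entry_id, bytes) else entry_id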