Registries
Tool, resource, and prompt lookup, plus session storage and SSE infrastructure.
ToolBinding wraps a ServiceSpec (mutation tools);
SelectorToolBinding wraps a SelectorSpec and carries the read-shaped
pipeline knobs (filter_set, ordering_fields, paginate). The shared
ToolRegistry accepts either kind and is what tools/list and
tools/call iterate.
ToolBinding
dataclass
Bases: Generic[InputT, ResultT, ExtraT]
All wiring for a single MCP tool, derived from a ServiceSpec.
A tool is the projection of a service callable plus its declared input
and output serializers. The MCP server invokes spec.service directly
via resolve_callable_kwargs + run_service — there is no view or
viewset in the dispatch path.
The Generic[InputT, ResultT, ExtraT] parameters mirror
ServiceSpec's generics and are purely informational for type
checkers. They default to Any when omitted, so existing call sites
keep working unchanged.
SelectorToolBinding
dataclass
Bases: Generic[ResultT, ExtraT]
All wiring for a single MCP read-shaped tool, derived from a SelectorSpec.
Mirrors :class:ToolBinding (which wraps a ServiceSpec for
mutations), but the dispatch pipeline is read-shaped. The shape is
chosen by :attr:kind:
kind=LIST runs the full pipeline:
.. code-block:: text

    arguments → validate(merged inputSchema) → run_selector
              → FilterSet(data=...).qs       (if ``filter_set`` set)
              → order_by(...)                (if ``ordering_fields`` set)
              → paginate                     (if ``paginate=True``)
              → output_serializer(many=True)
              → ToolResult

kind=RETRIEVE skips the post-fetch pipeline entirely — the
selector's single-instance return goes straight to
output_serializer(many=False). Combining RETRIEVE with
filter_set / ordering_fields / paginate is rejected at
construction (those knobs only make sense on a collection).
Selectors return raw, unscoped data (a queryset for LIST, a
single instance for RETRIEVE) — the tool layer owns shape
decisions. A LIST binding with none of filter_set /
ordering_fields / paginate set behaves like a plain RPC
read that calls the selector and renders its return value verbatim.
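As a rough illustration of the LIST pipeline order, the sketch below runs the same stages over a plain list of dicts instead of a queryset. The function name, the `filter_fields` parameter, and the `default_limit` value are hypothetical and not part of the library; only the stage order (filter, then order, then paginate, then render) is taken from the description above.

```python
def run_list_pipeline(rows, arguments, *, filter_fields=(), ordering_fields=(),
                      paginate=False, default_limit=2):
    """Apply filter -> order -> paginate -> render to a list of dict rows."""
    # Filter stage: only runs for fields the binding declared as filterable.
    for name in filter_fields:
        if name in arguments:
            rows = [row for row in rows if row.get(name) == arguments[name]]

    # Ordering stage: "-field" means descending, "field" means ascending.
    ordering = arguments.get("ordering")
    if ordering_fields and ordering:
        key = ordering.lstrip("-")
        if key in ordering_fields:
            rows = sorted(rows, key=lambda row: row[key],
                          reverse=ordering.startswith("-"))

    # Pagination stage: 1-based page numbers (an assumption of this sketch).
    if paginate:
        page = int(arguments.get("page", 1))
        limit = int(arguments.get("limit", default_limit))
        start = (page - 1) * limit
        rows = rows[start:start + limit]

    # Render stage: stands in for output_serializer(many=True).
    return [dict(row) for row in rows]
```

With none of the knobs set, the function returns the selector's rows unchanged, which matches the "plain RPC read" behaviour described above.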
The Generic[ResultT, ExtraT] parameters mirror
SelectorSpec's generics and are purely informational for type
checkers.
kind
property
Shape discriminator — derived from the spec's required kind field.
djangorestframework-services 0.13+ made kind a required field on
:class:SelectorSpec, so the binding doesn't store an
independent copy; a second copy would only give the two a chance to
drift. Exposed as a property so the dispatch layer can keep
reading binding.kind unchanged.
ToolRegistry
Name → tool binding lookup.
Holds both :class:ToolBinding (service tools, mutations) and
:class:SelectorToolBinding (selector tools, reads). Names share a
namespace — duplicates are rejected loudly so a misconfigured project
surfaces the conflict at discovery time rather than silently shadowing
a tool.
ResourceBinding
dataclass
Bases: Generic[ResultT]
All wiring for a single MCP resource (or resource template).
A resource is a selector callable plus a URI template. The MCP server
invokes the selector directly via resolve_callable_kwargs +
run_selector — there is no view or viewset in the dispatch path.
output_serializer is consulted by resources/read to render the
selector's return value. mime_type advertises the encoding we will
return — usually "application/json".
kwargs_provider mirrors SelectorSpec.kwargs from
djangorestframework-services >= 0.6: when set, the handler invokes it
once per request and merges the returned dict into the kwarg pool. The
provider receives a synthesised :class:MCPServiceView (URI-template
variables exposed as view.kwargs, the binding name as
view.action).
The Generic[ResultT] parameter is purely informational — it lets
callers pin the selector's return type for IDE / type-checker help.
Defaults to Any when omitted.
ResourceRegistry
URI / URI-template → :class:ResourceBinding lookup.
Concrete resources (no template variables) are matched by exact URI;
templates are matched by regex derived from the template. resolve
returns the binding plus the variable bindings extracted from the URI.
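The lookup rule can be sketched with the standard library alone. This is a minimal illustration, assuming `{name}`-style template variables that match a single path segment; the helper names `template_to_regex` and `resolve` and the two-dict storage are hypothetical and do not describe the registry's actual internals.

```python
from __future__ import annotations

import re


def template_to_regex(template: str) -> re.Pattern[str]:
    """Compile a URI template such as 'orders://{order_id}' into a regex."""
    # re.split with a capturing group alternates literal text and variable names.
    parts = re.split(r"\{(\w+)\}", template)
    pattern = ""
    for index, part in enumerate(parts):
        if index % 2 == 0:
            pattern += re.escape(part)            # literal text
        else:
            pattern += f"(?P<{part}>[^/]+)"       # one variable, one segment
    return re.compile(f"^{pattern}$")


def resolve(uri, concrete, templates):
    """Return (binding, variables) for a URI, or None when nothing matches."""
    # Concrete resources are matched by exact URI first.
    if uri in concrete:
        return concrete[uri], {}
    # Templates are matched by the regex derived from each template.
    for template, binding in templates.items():
        match = template_to_regex(template).match(uri)
        if match:
            return binding, match.groupdict()
    return None
```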
PromptBinding
dataclass
All wiring for a single MCP prompt.
A prompt is a server-defined message-template the client invokes by
name. The render callable receives the client-supplied arguments as
kwargs and returns either:
- a list of :class:PromptMessage instances (full control), or
- a list of strings (each becomes a user text message), or
- a single string (becomes one user text message), or
- a coroutine yielding any of the above.
The handler normalises whatever shape the callable returns into the spec's
messages list at dispatch time.
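The normalisation step might look roughly like the sketch below. The `PromptMessage` dataclass here is a local stand-in with made-up fields (`role`, `text`), and `normalise` is a hypothetical helper; the real class and handler may differ.

```python
import asyncio
import inspect
from dataclasses import dataclass


@dataclass
class PromptMessage:
    """Local stand-in for the library's message type (fields are assumed)."""
    role: str
    text: str


async def normalise(result):
    """Coerce any supported render() return shape into a list of messages."""
    # A coroutine (or other awaitable) is resolved first.
    if inspect.isawaitable(result):
        result = await result
    # A single string becomes a one-element list.
    if isinstance(result, str):
        result = [result]
    # Strings become user text messages; PromptMessage instances pass through.
    return [
        item if isinstance(item, PromptMessage) else PromptMessage("user", item)
        for item in result
    ]
```

Calling `asyncio.run(normalise("hi"))` yields a single user message, and a list of strings yields one user message per string.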
PromptRegistry
Name → :class:PromptBinding lookup.
Mirrors :class:ToolRegistry exactly — names are unique, duplicates
raise loudly at registration time.
Bulk registration
register_tools(server, definitions, *, selector_defaults=None, service_defaults=None)
is an additive entry point for registering many tools in one call. Pass
a list of ToolDefinition.service(...) / ToolDefinition.selector(...)
instances plus per-kind ServiceDefaults / SelectorDefaults that fill
in fields each definition leaves as None. Returns the resulting
bindings in input order.
register_tools
register_tools(
server: MCPServer,
definitions: Iterable[ToolDefinition],
*,
selector_defaults: SelectorDefaults | None = None,
service_defaults: ServiceDefaults | None = None,
) -> list[ToolBinding | SelectorToolBinding]
Register every :class:ToolDefinition against server.
Defaults dataclasses supply per-kind kwarg defaults that are merged
underneath each definition's own values (definition wins on
conflict — every field on the definition that is not None is
considered "set by the author").
Returns the list of resulting bindings in the same order as
definitions, so test harnesses and observability code can
introspect what landed.
Raises :class:TypeError if a definition's kind is unrecognised
(the discriminator is internal, so this can only happen via direct
:class:ToolDefinition construction with an unsupported value).
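The "definition wins, None means no override" rule can be illustrated with a small stand-alone sketch. `SelectorDefaultsSketch` and `merge_kwargs` are hypothetical names used only for this example; they are not the library's classes or functions.

```python
from dataclasses import dataclass, fields
from typing import Optional, Tuple


@dataclass
class SelectorDefaultsSketch:
    """Hypothetical stand-in for a defaults dataclass; None = no override."""
    paginate: Optional[bool] = None
    ordering_fields: Optional[Tuple[str, ...]] = None


def merge_kwargs(definition_kwargs, defaults):
    """Layer defaults underneath the definition's own non-None values."""
    merged = {}
    # Defaults go in first, skipping every field left as None.
    if defaults is not None:
        for field in fields(defaults):
            value = getattr(defaults, field.name)
            if value is not None:
                merged[field.name] = value
    # Any non-None definition value overwrites the default.
    for key, value in definition_kwargs.items():
        if value is not None:
            merged[key] = value
    return merged
```

Note that an explicit `False` on the definition is not `None`, so it still overrides a `True` default. Keys absent from the merged dict fall through to the registration method's own default.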
ToolDefinition
dataclass
Declarative description of a single tool, fed to :func:register_tools.
ToolDefinition is a transport-agnostic container — it holds the
kwargs that would otherwise be passed to
:meth:MCPServer.register_service_tool or
:meth:MCPServer.register_selector_tool, plus a :class:ToolKind
discriminator that selects between them at dispatch time.
Construct via the classmethods, not the dataclass constructor: the
classmethods enforce the per-kind kwarg surface (a service definition
can't set filter_set, a selector definition can't quietly omit
input_serializer, and so on). Direct construction is available
for tests and tooling but bypasses the type-shape guarantees.
Every per-call kwarg defaults to None; downstream
:func:register_tools treats None as "no override", which lets
a :class:SelectorDefaults / :class:ServiceDefaults instance
supply the value, falling back to the registration method's own
default if neither is set.
service
classmethod
service(
*,
name: str,
spec: ServiceSpec,
description: str | None = None,
title: str | None = None,
display_name: str | None = None,
display_description: str | None = None,
output_format: OutputFormat | None = None,
permissions: Sequence[Any] | None = None,
rate_limits: Sequence[Any] | None = None,
annotations: dict[str, Any] | None = None,
include_structured_content: bool | None = None,
include_output_schema: bool | None = None,
argument_binding: ArgumentBinding | None = None,
unknown_arguments: UnknownArguments | None = None,
always_listed: bool | None = None,
spec_kwargs_provides: Sequence[str] | None = None,
) -> ToolDefinition
Typed entry point for service-tool definitions.
selector
classmethod
selector(
*,
name: str,
spec: SelectorSpec,
description: str | None = None,
title: str | None = None,
display_name: str | None = None,
display_description: str | None = None,
input_serializer: type | None = None,
output_format: OutputFormat | None = None,
permissions: Sequence[Any] | None = None,
rate_limits: Sequence[Any] | None = None,
annotations: dict[str, Any] | None = None,
filter_set: Any | None = None,
ordering_fields: Sequence[str] | None = None,
paginate: bool | None = None,
include_structured_content: bool | None = None,
include_output_schema: bool | None = None,
argument_binding: ArgumentBinding | None = None,
unknown_arguments: UnknownArguments | None = None,
always_listed: bool | None = None,
spec_kwargs_provides: Sequence[str] | None = None,
) -> ToolDefinition
Typed entry point for selector-tool definitions.
The selector's LIST / RETRIEVE shape lives on the spec
(SelectorSpec.kind, required in
djangorestframework-services 0.13+), not here — the bulk
registration loop reads it from there.
ServiceDefaults
dataclass
Per-kind defaults for :func:register_tools over service definitions.
Every field is Optional and None is the "no override"
sentinel — only non-None values are applied as defaults to the
matching :meth:MCPServer.register_service_tool call. A
per-definition value always wins over the default.
Because include_structured_content and include_output_schema
are tri-state on the registration method (None = inherit global
setting, True/False = force), the same None-as-sentinel
convention applies here: ServiceDefaults(include_structured_content=None)
is identical to "no override" — if you want to force every binding
to inherit the global, leave it unset; if you want to force
True/False, pass that explicitly.
SelectorDefaults
dataclass
Per-kind defaults for :func:register_tools over selector definitions.
Sister of :class:ServiceDefaults. Same conventions:
- Every field is Optional. None = "no override; use the per-definition or the :meth:MCPServer.register_selector_tool default".
- Per-definition kwargs always win on conflict.
Selector-only knobs (input_serializer, filter_set,
ordering_fields, paginate) live here too so a project that
wants every selector tool to paginate by default can express that
in one place.
ToolKind
Bases: Enum
Discriminator for :class:ToolDefinition and the
:func:register_tools dispatch table.
Internal-only — never appears on the wire. Members map directly to
the two registration entry points on :class:MCPServer:
- SERVICE → :meth:MCPServer.register_service_tool
- SELECTOR → :meth:MCPServer.register_selector_tool
Use :meth:ToolDefinition.service / :meth:ToolDefinition.selector
instead of constructing :class:ToolDefinition with this kwarg
directly — the classmethods are the typed entry points.
UnknownArguments
Bases: Enum
How a binding handles MCP arguments keys not declared in its inputSchema.
Decouples "fields I want strictly validated" from "fields the client
may pass through anyway". Built on top of input_serializer, not as
a replacement — DRF validation still runs on the declared fields in
every mode.
Members:
- REJECT (default): the merged inputSchema advertises "additionalProperties": false and the validator rejects any key that isn't part of the binding's known field set. Failure surfaces as -32602 with the offending key names in data.detail["non_field_errors"].
- PASSTHROUGH: the merged inputSchema advertises "additionalProperties": true; unknown keys survive validation and are merged onto the validated dict before binding to the callable. Useful when the client sends evolving query args (q / cursor / since) that the spec author wants to forward to the callable without restating each in the serializer.
- IGNORE: the merged inputSchema advertises "additionalProperties": true, but unknown keys are dropped after validation. Forward-compatibility mode: older clients can still send fields newer servers haven't formalised yet, and the server silently accepts and ignores them.
Reserved transport-controlled keys (request / user / data)
and selector-tool post-fetch keys (ordering / page /
limit) are never considered "unknown" in any mode — they're
handled by the dispatch pipeline, not the validator.
Plain :class:Enum, same discipline as :class:ArgumentBinding:
internal-only value, no string coercion at API boundaries.
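The three modes can be compared side by side in a small sketch. The enum below is a local stand-in (its values are invented), `apply_unknown_policy` is a hypothetical helper, and a plain `ValueError` takes the place of the -32602 error; reserved keys and DRF validation are not modelled.

```python
from enum import Enum


class UnknownArguments(Enum):
    """Local stand-in for the library enum; member values are invented."""
    REJECT = "reject"
    PASSTHROUGH = "passthrough"
    IGNORE = "ignore"


def apply_unknown_policy(arguments, declared, mode):
    """Split arguments into declared/unknown keys and apply the chosen mode."""
    known = {k: v for k, v in arguments.items() if k in declared}
    unknown = {k: v for k, v in arguments.items() if k not in declared}
    if unknown and mode is UnknownArguments.REJECT:
        # The real transport reports this as a JSON-RPC invalid-params error.
        raise ValueError(f"unknown arguments: {sorted(unknown)}")
    if mode is UnknownArguments.PASSTHROUGH:
        # Unknown keys are merged onto the validated dict.
        return {**known, **unknown}
    # IGNORE (or REJECT with nothing unknown): only declared keys survive.
    return known
```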
ArgumentBinding
Bases: Enum
How MCP arguments flow into the kwarg pool of the dispatched callable.
The pool always carries request and user. This enum controls
whether MCP arguments show up as a single data= key (the
historical behavior, well-suited to mutation-shaped services that take
an input_serializer-validated dict) or are spread as top-level
pool keys so the callable can declare them as individual parameters
(the natural shape for selectors and parametric reads).
Members:
- DATA_ONLY: MCP arguments enter the pool only as data=<validated-or-raw>. Default for :class:ToolBinding (service tools), since mutation services typically take a single validated data payload.
- MERGE: every key from the validated MCP arguments (or, when no input_serializer is declared, the raw arguments minus the pipeline-reserved keys ordering / page / limit) is added to the pool. spec.kwargs(...) output overrides on conflict: author-declared kwargs win over client-supplied ones, a critical invariant for project-scoping selectors. Default for :class:SelectorToolBinding (selector tools).
- REPLACE: like MERGE, but spec.kwargs(...) loses on conflict. Useful only when the kwargs provider supplies defaults the client is allowed to override.
The value is internal — never appears on the wire, never coerced
from a string. ArgumentBinding is a plain :class:Enum (not
str, Enum); pass the member directly when registering tools.
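A minimal sketch of how the three modes could shape the kwarg pool follows. The enum is a local stand-in with invented values and `build_pool` is a hypothetical helper. Two details are assumptions of the sketch rather than documented behaviour: `spec_kwargs` is still applied in DATA_ONLY mode, and `request` / `user` are written last so nothing can overwrite them.

```python
from enum import Enum


class ArgumentBinding(Enum):
    """Local stand-in for the library enum; member values are invented."""
    DATA_ONLY = 1
    MERGE = 2
    REPLACE = 3


def build_pool(request, user, arguments, spec_kwargs, mode):
    """Assemble the kwarg pool handed to the dispatched callable."""
    pool = {}
    if mode is ArgumentBinding.DATA_ONLY:
        pool["data"] = arguments       # arguments stay bundled under data=
        pool.update(spec_kwargs)       # assumption: provider kwargs still apply
    elif mode is ArgumentBinding.MERGE:
        pool.update(arguments)
        pool.update(spec_kwargs)       # author-declared kwargs win on conflict
    else:  # ArgumentBinding.REPLACE
        pool.update(spec_kwargs)
        pool.update(arguments)         # client-supplied values win on conflict
    # Transport-controlled keys are set last (an assumption of this sketch).
    pool["request"] = request
    pool["user"] = user
    return pool
```

The MERGE ordering is what keeps a project-scoping kwarg such as a hypothetical `project_id` from being overridden by the client.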
Selector-tool schema
Helpers that build the merged inputSchema for selector tools — exposed
for projects that want to introspect filter / ordering / pagination
property generation outside of the registration flow.
filterset_to_schema_properties
Map a django-filter FilterSet class to JSON Schema properties.
Walks FilterSet.declared_filters plus any auto-generated filters
(from a Meta declaration) and returns a property dict shaped like
the "properties" key of a JSON Schema object — ready to merge into
a tool's inputSchema.
Filter properties are always optional from MCP's perspective: they
narrow the queryset but are not required to call the tool. The merger
in :mod:rest_framework_mcp.schema.input_schema does not add filter
names to the required array.
Common filter classes get accurate JSON Schema mappings; exotic
classes fall back to {} (the JSON Schema "any value" shape) so a
custom filter never breaks tool discovery. Tests document which
classes are precisely mapped.
Raises ImportError when django-filter isn't installed — this
only fires when a binding is actually constructed with
filter_set=..., so projects that don't use the integration are
unaffected.
build_selector_tool_input_schema
Build the JSON Schema for a selector tool's inputSchema.
Merges four sources, in order of precedence (later sources override earlier ones on key collision):
- spec.input_serializer: any explicit input shape declared by the consumer (e.g. for tool-specific args that aren't filter params). All required-marked fields stay required.
- filter_set: properties derived from django-filter filter declarations. All optional.
- ordering_fields: adds an ordering property as an enum of "<field>" and "-<field>" values. Optional.
- paginate=True: adds optional page (positive integer) and limit (positive integer) properties.
The final schema is always an object with "type": "object",
"properties": {...}, and "required": [...] only when at
least one required field exists.
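The merge order can be sketched over plain dicts. `build_input_schema` is a hypothetical function that takes already-derived property dicts rather than a serializer or FilterSet class; the `"type": "string"` on the ordering property and the `"minimum": 1` encoding of "positive integer" are assumptions of this sketch.

```python
def build_input_schema(serializer_properties, required, filter_properties,
                       ordering_fields, paginate):
    """Merge the four sources; later sources override earlier ones."""
    properties = {}
    properties.update(serializer_properties)   # 1. explicit input shape
    properties.update(filter_properties)       # 2. filter-derived, all optional

    # 3. ordering: one enum listing "<field>" and "-<field>" for each field.
    if ordering_fields:
        values = []
        for name in ordering_fields:
            values += [name, f"-{name}"]
        properties["ordering"] = {"type": "string", "enum": values}

    # 4. pagination: optional positive-integer page and limit.
    if paginate:
        properties["page"] = {"type": "integer", "minimum": 1}
        properties["limit"] = {"type": "integer", "minimum": 1}

    schema = {"type": "object", "properties": properties}
    # "required" is emitted only when at least one required field exists.
    if required:
        schema["required"] = list(required)
    return schema
```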
Session stores
SessionStore
Bases: Protocol
Pluggable persistence for MCP-Session-Id lifecycle.
The transport calls :meth:create after a successful initialize —
binding the new session to the authenticated principal — and
:meth:owner on every subsequent request to enforce both that clients
re-initialize after a server restart and that a session minted under
one principal cannot be presented by another. :meth:destroy is
invoked on HTTP DELETE (after the same ownership check).
principal_id is an opaque string the transport derives from the
authenticated token (see
:func:rest_framework_mcp.transport.utils.principal_for_token);
stores persist and return it verbatim.
.. versionchanged:: 0.7
:meth:create takes a required keyword-only principal_id and
:meth:owner joined the protocol. Custom store implementations
must add both; storing the principal alongside the session id is
the only new obligation.
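For a custom store, the obligations described above amount to remembering which principal owns each session. The sketch below is a dict-backed illustration only: the method signatures (synchronous methods, `create` returning a generated id, `owner` returning `None` for unknown sessions) are assumptions, since the exact protocol signatures are not shown here, and `is_allowed` is a hypothetical helper standing in for the transport's ownership check.

```python
import uuid


class DictSessionStore:
    """Illustrative store; method signatures are assumed, not authoritative."""

    def __init__(self):
        self._owners = {}

    def create(self, *, principal_id):
        # Persist the owning principal alongside the new session id.
        session_id = uuid.uuid4().hex
        self._owners[session_id] = principal_id
        return session_id

    def owner(self, session_id):
        # Return the stored principal verbatim, or None for unknown sessions.
        return self._owners.get(session_id)

    def destroy(self, session_id):
        self._owners.pop(session_id, None)


def is_allowed(store, session_id, principal_id):
    """Reject unknown sessions and sessions minted under another principal."""
    owner = store.owner(session_id)
    return owner is not None and owner == principal_id
```

A fresh store instance knows nothing about earlier sessions, which is how a restart forces clients to re-initialize.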
InMemorySessionStore
Process-local session store. Useful for tests and single-process dev servers.
State lives on the instance, so each store is isolated. Multi-process
deployments should use :class:DjangoCacheSessionStore instead — this
class will not see sessions created in another process.
DjangoCacheSessionStore
Session store backed by django.core.cache.
Works across processes — the production-suitable default. TTL is fixed
at 24 hours; for stricter pinning, projects can subclass and override
:meth:create.
The cached value is the owning principal id, so :meth:owner is a
single cache read. Sessions written by pre-0.7 versions stored True
instead of a principal — those fail the ownership comparison and the
client transparently re-initializes.
Server-initiated push
SSEBroker
Bases: Protocol
Pluggable pub/sub for server-pushed MCP messages.
The transport calls :meth:subscribe when a client opens
GET /mcp/, :meth:publish from app code that wants to push a
payload to a specific session, and :meth:unsubscribe when the
streaming generator unwinds.
Two concrete implementations ship today:
- :class:InMemorySSEBroker is single-process and needs no extra infrastructure. Suitable for development and single-worker ASGI deployments.
- :class:RedisSSEBroker uses Redis pub/sub. Required for multi-worker deployments where any worker can serve the streaming GET. Pulled in via the [redis] optional extra.
The contract is intentionally narrow: a session has at most one live
subscriber; publish returns True if a delivery was attempted,
False if no subscriber was attached. Implementations decide whether
publish is fire-and-forget or awaits delivery confirmation; the MCP
transport treats it as best-effort either way.
InMemorySSEBroker
In-process per-session pub/sub for server-pushed MCP messages.
Each subscribed session gets a private :class:asyncio.Queue. App code
running in the same Python process publishes to it via :meth:publish;
the streaming GET generator pulls off the queue and emits SSE frames.
State is instance-scoped — the :class:MCPServer owns one broker, so
multiple servers in the same process don't share state. Multi-process
deployments need an out-of-process backend; see
:class:RedisSSEBroker (in the [redis] extra) for the production
choice.
The broker enforces a single subscriber per session — if a client
re-subscribes (e.g. after a dropped connection), the previous queue is
replaced and the old generator will eventually error out on its next
await. There is no replay; clients that need durability should call
tools/call directly rather than relying on SSE.
unsubscribe
Remove queue from the registry if it's still the live subscriber.
Compares by identity so a re-subscribed session doesn't accidentally unregister the new queue when the old generator shuts down.
publish
async
Enqueue payload for session_id if a subscriber exists.
Returns True if delivery was attempted, False if the session
had no subscriber. The caller decides how to react to a miss — most
callers will ignore it (the client will catch up via a fresh
tools/call round-trip).
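The identity check in unsubscribe and the True/False result of publish can be illustrated with a small queue-per-session broker. `QueueBroker` is a hypothetical class written for this example, with assumed method signatures; it is not the shipped implementation.

```python
import asyncio


class QueueBroker:
    """Illustrative single-subscriber-per-session broker."""

    def __init__(self):
        self._queues = {}

    def subscribe(self, session_id):
        # A re-subscribe replaces whatever queue was registered before.
        queue = asyncio.Queue()
        self._queues[session_id] = queue
        return queue

    def unsubscribe(self, session_id, queue):
        # Compare by identity so a stale generator cannot remove a newer queue.
        if self._queues.get(session_id) is queue:
            del self._queues[session_id]

    async def publish(self, session_id, payload):
        queue = self._queues.get(session_id)
        if queue is None:
            return False          # no subscriber attached
        await queue.put(payload)
        return True               # delivery was attempted


async def demo():
    broker = QueueBroker()
    old = broker.subscribe("s1")
    new = broker.subscribe("s1")      # client reconnected
    broker.unsubscribe("s1", old)     # old generator unwinds; new queue survives
    delivered = await broker.publish("s1", {"n": 1})
    return delivered, new.qsize(), old.qsize()
```

Running `asyncio.run(demo())` shows the message landing on the new queue only, even though the old subscriber unsubscribed afterwards.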
RedisSSEBroker
Cross-process SSE broker backed by Redis pub/sub.
Drop-in replacement for :class:InMemorySSEBroker when running
multiple ASGI workers behind a load balancer. The streaming GET handler
can land on any worker; await server.notify(...) from a different
worker still reaches the right session because the publish is routed
through Redis to the worker that holds that session's subscription.
Each session subscribes to its own Redis channel (<prefix>:<session_id>)
and runs a background asyncio.Task that pulls messages off the
Redis pub/sub stream and pushes them onto a local
:class:asyncio.Queue — the same queue shape the SSE response generator
expects. JSON encode/decode happens at the broker boundary so app code
pushes Python dicts and the streaming generator sees them as dicts too.
Wire it into :class:MCPServer:
.. code-block:: python

    from redis.asyncio import Redis

    from rest_framework_mcp import MCPServer
    from rest_framework_mcp.transport.redis_sse_broker import RedisSSEBroker

    broker = RedisSSEBroker(Redis.from_url("redis://localhost:6379/0"))
    server = MCPServer(name="my-app", sse_broker=broker)

Caveats:
- Same single-subscriber-per-session contract as the in-memory broker (re-subscribing replaces the old subscriber's queue).
- Message replay is a separate, opt-in collaborator: pair this with :class:RedisSSEReplayBuffer (passed as MCPServer(sse_replay_buffer=...)) for cross-worker Last-Event-ID resume.
- The Redis client's lifecycle is the consumer's responsibility: close it during ASGI lifespan shutdown.
publish
async
Publish to the session's channel and report whether anyone received it.
redis.publish returns the number of subscribers that got the
message; we surface True when at least one listener was attached
(typical case), False otherwise. Note that "0 subscribers" can
also mean the streaming task hasn't connected yet — callers that
require strict at-least-once delivery should layer their own retry.
has_subscriber
Local-only check.
Reflects whether this worker has an active subscriber. Cross-process visibility would require an extra Redis round-trip and isn't useful for the typical caller (the streaming generator only cares about its own queue).
SSE replay (resume)
SSEReplayBuffer
Bases: Protocol
Pluggable per-session ring buffer for SSE event replay.
Pair this with an :class:SSEBroker to support
Last-Event-ID
resume — when a client reconnects with that header, the SSE response
generator drains every event past the supplied ID from the buffer
before entering live mode, so the client sees no gap from the
server's POV.
The buffer is the single source of truth for event IDs:
:meth:record assigns a new monotonic ID per session and returns it,
so the live frame and any future replayed frame agree on the ID. The
transport wraps that ID into the broker payload as
{"_mcp_event_id", "_mcp_payload"} and the SSE response generator
unwraps it to emit id: lines.
Implementations should bound their per-session storage — replay
buffers without a cap leak when clients never reconnect. The shipped
in-memory variant uses a fixed-size :class:collections.deque; the
Redis variant uses XADD MAXLEN ~ N for capped streams.
Resume is opt-in: pass sse_replay_buffer=... to
:class:MCPServer to enable it. When omitted, the SSE wire shape is
unchanged (no id: lines) and Last-Event-ID from clients is
silently ignored.
record
async
Persist payload for session_id and return its event ID.
The returned ID is what the SSE response emits as the id: line
and what the client echoes back via Last-Event-ID on resume.
IDs must be monotonic within a session; cross-session ordering
is not required.
replay
Yield (event_id, payload) pairs strictly after after_id.
after_id=None (no header sent) yields nothing — fresh subscribe
is the no-replay path. An after_id that's older than the
buffer's oldest retained event yields whatever is still in the
ring (best-effort delivery; the client knows it lost some events
only by counting). An after_id newer than the latest recorded
event yields nothing — the client is already up to date.
forget
async
Drop all retained events for session_id.
Called when a session is explicitly destroyed (DELETE) so dead sessions don't accumulate buffer state. Implementations that rely on TTL-based eviction can no-op this.
InMemorySSEReplayBuffer
In-process bounded replay buffer for SSE event resume.
Each session holds its own :class:collections.deque capped at
max_events; the oldest event is evicted when a new one arrives.
Event IDs are zero-padded monotonic integers per session, string-valued
because the SSE wire format is string-only and clients echo
them back verbatim via Last-Event-ID.
Suitable for single-process ASGI deployments. Multi-worker
deployments must use :class:RedisSSEReplayBuffer because the
streaming GET that handles a resume can land on a different worker
than the one that recorded the events.
State is instance-scoped — :class:MCPServer owns one buffer, so
multiple servers in the same process don't share replay history.
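The record / replay / forget contract can be sketched with a capped deque per session. `DequeReplayBuffer` is a hypothetical class for illustration: its methods are synchronous for brevity (the protocol's record and forget are async), and the ten-digit padding width is an arbitrary choice. Zero-padding is what lets plain string comparison agree with numeric order.

```python
from collections import deque


class DequeReplayBuffer:
    """Illustrative bounded replay buffer; not the shipped implementation."""

    def __init__(self, max_events=3):
        self._max_events = max_events
        self._events = {}     # session_id -> deque of (event_id, payload)
        self._counters = {}   # session_id -> last assigned integer id

    def record(self, session_id, payload):
        next_id = self._counters.get(session_id, 0) + 1
        self._counters[session_id] = next_id
        event_id = f"{next_id:010d}"   # zero-padded, so strings sort numerically
        ring = self._events.setdefault(session_id, deque(maxlen=self._max_events))
        ring.append((event_id, payload))   # oldest entry is evicted when full
        return event_id

    def replay(self, session_id, after_id):
        # No Last-Event-ID header means a fresh subscribe: nothing to replay.
        if after_id is None:
            return
        for event_id, payload in self._events.get(session_id, ()):
            if event_id > after_id:
                yield event_id, payload

    def forget(self, session_id):
        self._events.pop(session_id, None)
        self._counters.pop(session_id, None)
```

With `max_events=3` and five recorded events, replaying from the first ID returns only the three events still in the ring, which is the best-effort behaviour described for an `after_id` older than the retained history.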
RedisSSEReplayBuffer
Cross-process replay buffer backed by Redis Streams.
Drop-in replacement for :class:InMemorySSEReplayBuffer when running
multiple ASGI workers. The streaming GET that handles a reconnect
can land on any worker; reading from a shared Redis Stream means the
replay is the same regardless of which worker recorded the events.
Stream IDs are auto-assigned by Redis (ms-seq format) and are
monotonic within a session — they double as the SSE event IDs the
client echoes back via Last-Event-ID. MAXLEN ~ N caps the
retained history per session; the ~ makes trimming approximate
(Redis trims when convenient) which is fine for replay buffers.
Wire it into :class:MCPServer::

    from redis.asyncio import Redis

    from rest_framework_mcp import MCPServer
    from rest_framework_mcp.transport.redis_sse_replay_buffer import (
        RedisSSEReplayBuffer,
    )

    client = Redis.from_url("redis://localhost:6379/0")
    buffer = RedisSSEReplayBuffer(client, max_events=2048)
    server = MCPServer(name="my-app", sse_broker=..., sse_replay_buffer=buffer)

The Redis client is the consumer's responsibility — close it during ASGI lifespan shutdown.
record
async
Append payload to the session's stream and return the assigned ID.
XADD <key> MAXLEN ~ N * data <json> — the * lets Redis
choose a monotonic ID; ~ makes trimming approximate (Redis
trims at internal node boundaries, which is faster than exact
trimming and bounds memory in the same shape).