Skip to main content

Installation

pip install ctxprotocol
With optional FastAPI support:
pip install ctxprotocol[fastapi]

Requirements

  • Python 3.10+
  • httpx (async HTTP)
  • pydantic (type validation)
  • pyjwt[crypto] (JWT verification)

Prerequisites

Before using the API, complete setup at ctxprotocol.com:
1

Sign in

Creates your embedded wallet
2

Set spending cap

Approve USDC spending on the ContextRouter (one-time setup)
3

Fund wallet

Add USDC for tool execution fees
4

Generate API key

In Settings page

Quick Start

import asyncio
from ctxprotocol import ContextClient

async def main():
    async with ContextClient(api_key="sk_live_...") as client:
        answer = await client.query.run(
            query="What are the top whale movements on Base?",
            response_shape="answer_with_evidence",
        )
        print(answer.response)
        print(answer.summary)
        print(answer.evidence.facts if answer.evidence else None)

asyncio.run(main())
Want per-call pricing and spending limits? The SDK also supports Execute mode for direct method calls inside session budgets. See Two SDK Modes below.

Two SDK Modes

The SDK offers two payment models:
ModeMethodPayment ModelUse Case
Queryclient.query.run()Pay-per-responseComplex questions, multi-tool synthesis, curated intelligence
Executeclient.tools.execute()Per call (with spending limit)Deterministic pipelines, raw outputs, explicit cost control
You have access to both modes. Pick the one that fits your use case.
  • Use Query (client.query.run()) when you want a managed librarian contract where Context handles discovery/orchestration (up to 100 MCP calls per response turn) and returns answer_with_evidence or evidence_only. Pay-per-response (~$0.10).
  • Use Execute (client.tools.execute()) when your app/agent is the librarian and you want per-call pricing with spending limits (~$0.001/call).
Most developers start with Query and add Execute later for specific pipelines that need raw data or explicit cost control. You can use both in the same application.

Execute Quick Start

import asyncio
from ctxprotocol import ContextClient

async def main():
    async with ContextClient(api_key="sk_live_...") as client:
        tools = await client.discovery.search(
            "gas prices",
            mode="execute",
            surface="execute",
            require_execute_pricing=True,
        )
        session = await client.tools.start_session(max_spend_usd="1.00")
        method = tools[0].mcp_tools[0]
        result = await client.tools.execute(
            tool_id=tools[0].id,
            tool_name=method.name,
            args={"chainId": 1},
            session_id=session.session.session_id,
        )
        print(result.result)
        print(result.session)  # method_price, spent, remaining, max_spend, ...

asyncio.run(main())
Full working example: See examples/client/execute_client.py for a complete Execute-mode client with multi-call session management and spend tracking.
Mixed listings are first-class: one listing can expose methods to both modes. Methods without explicit execute pricing remain Query-only until pricing metadata is added.
Compatibility: payload fields like price and price_per_query are kept for backward compatibility. In Query mode, they represent listing-level price per response turn. A future major release can add response-named aliases (for example, price_per_response) before deprecating legacy names.

Configuration

Client Options

OptionTypeRequiredDefaultDescription
api_keystrYes-Your Context Protocol API key
base_urlstrNohttps://www.ctxprotocol.comAPI base URL (for development)
import os
from ctxprotocol import ContextClient

# Production
client = ContextClient(api_key=os.environ["CONTEXT_API_KEY"])

# Local development
client = ContextClient(
    api_key="sk_test_...",
    base_url="http://localhost:3000",
)
Always use async with context manager or call await client.close() when done to properly release resources.
The Python SDK automatically retries transient failures (HTTP 5xx, transport errors, and timeouts) with exponential backoff.

API Reference

Discovery

client.discovery.search(query, limit?)

Search for tools matching a query string, with optional mode-aware filters. Parameters:
ParameterTypeRequiredDescription
querystrYesSearch query
limitintNoMaximum results to return (1-50)
mode"query" | "execute"NoDiscovery mode with billing semantics
surface"answer" | "execute" | "both"NoMethod mode filter
query_eligibleboolNoRequire methods marked query eligible
require_execute_pricingboolNoRequire explicit method execute pricing
exclude_latency_classeslist["instant" | "fast" | "slow" | "streaming"]NoExclude by latency class
exclude_slowboolNoConvenience filter for query mode
favorites_onlyboolNoOverride the account-level Favorites-Only Auto Mode for this request
Returns: list[Tool]
tools = await client.discovery.search("ethereum gas", limit=10)

execute_tools = await client.discovery.search(
    "ethereum gas",
    mode="execute",
    surface="execute",
    require_execute_pricing=True,
)

favorite_only_tools = await client.discovery.search(
    "crypto",
    favorites_only=True,
)
If you enable Favorites-Only Auto Mode in Settings, SDK discovery and query requests made with the same API key inherit that account default automatically. Set favorites_only=True or favorites_only=False per request to override it. On Query requests with explicit tools, manual tool selection wins and favorites_only is ignored.

Get featured/popular tools. Parameters:
ParameterTypeRequiredDescription
limitintNoMaximum results to return
mode"query" | "execute"NoOptional discovery mode
surface"answer" | "execute" | "both"NoOptional method mode filter
query_eligibleboolNoOptional query-safe filter
require_execute_pricingboolNoOptional execute-price filter
Returns: list[Tool]
featured = await client.discovery.get_featured(limit=5)
featured_execute = await client.discovery.get_featured(
    limit=5,
    mode="execute",
    require_execute_pricing=True,
)

client.discovery.get(tool_id)

Fetch one tool listing by UUID. Parameters:
ParameterTypeRequiredDescription
tool_idstrYesMarketplace tool UUID
Returns: Tool
tool = await client.discovery.get("uuid-of-tool")
print(tool.name)
print(tool.mcp_tools)

Tools (Execute Mode)

client.tools.execute(tool_id, tool_name, args?)

Execute a single tool method. Execute calls can run inside a session budget (max_spend_usd) with automatic payment after delivery. Parameters:
OptionTypeRequiredDescription
tool_idstrYesUUID of the tool
tool_namestrYesName of the method to call
argsdictNoArguments matching the tool’s inputSchema
idempotency_keystrNoOptional idempotency key (UUID recommended)
mode"execute"NoExplicit mode label (defaults to "execute")
session_idstrNoExecute session ID to accrue spend against
max_spend_usdstrNoOptional inline session budget (if no session_id)
close_sessionboolNoRequest session closure after this call settles
Returns: ExecutionResult
session = await client.tools.start_session(max_spend_usd="2.50")

result = await client.tools.execute(
    tool_id="uuid-of-tool",
    tool_name="get_gas_prices",
    args={"chainId": 1},
    idempotency_key="2bb4bdcb-8609-43f6-af13-75279186de70",
    session_id=session.session.session_id,
)
print(result.method.execute_price_usd)  # explicit method price
print(result.session)  # method_price, spent, remaining, max_spend, ...

client.tools.start_session(max_spend_usd)

Start an execute session budget envelope.
started = await client.tools.start_session(max_spend_usd="5.00")
print(started.session.session_id)
print(started.session.max_spend)

client.tools.get_session(session_id)

Fetch current execute session status/spend.
status = await client.tools.get_session("sess_123")
print(status.session.status)  # open | closed | expired
print(status.session.spent)

client.tools.close_session(session_id)

Close an execute session and trigger final flush behavior.
closed = await client.tools.close_session("sess_123")
print(closed.session.status)  # closed

Developer

client.developer.update_tool(tool_id, ...)

Update contributor-owned tool metadata programmatically. This is for developers managing their own listings, not for buyer query execution. Parameters:
ParameterTypeRequiredDescription
tool_idstrYesTool UUID
namestrNoNew display name
descriptionstrNoNew description
suggested_promptslist[str]NoSuggested buyer prompts
categorystrNoListing category
Returns: dict[str, Any]
updated = await client.developer.update_tool(
    "uuid-of-tool",
    description="Updated coverage and data freshness notes.",
    suggested_prompts=["Which Base wallets had the largest inflows today?"],
)

print(updated)

Query (Pay-Per-Response)

The Query API is Context’s response marketplace. Instead of buying raw API calls, you’re buying curated intelligence. Ask a question, pay once, and get a grounded, managed answer.

client.query.run(query, tools?, favorites_only?, agent_model_id?, response_shape?, include_data?, include_data_url?, include_developer_trace?, idempotency_key?)

Run an agentic query. The managed runtime handles tool discovery, ambiguity resolution, multi-tool execution (up to 100 MCP calls per response turn as a safety cap), and grounding — and returns the selected Query response contract (answer_with_evidence or evidence_only, default answer_with_evidence). Query billing is pay-per-response with automatic payment after delivery. Parameters:
OptionTypeRequiredDescription
querystrYesNatural-language question
toolslist[str]NoTool IDs to use (auto-discover if omitted)
favorites_onlyboolNoOverride the account-level Favorites-Only Auto Mode for this request
agent_model_idstrNoMain librarian agent model ID (omit to use DEFAULT_AGENT_MODEL_ID)
response_shape"answer_with_evidence" | "evidence_only"NoStructured response mode (default: answer_with_evidence).
include_databoolNoInclude bounded execution data inline. Large payloads return a preview plus full-data references
include_data_urlboolNoPersist execution data to blob and return a URL
include_developer_traceboolNoInclude optional developer trace + orchestration diagnostics
idempotency_keystrNoOptional idempotency key (UUID recommended)
resume_fromQueryAttemptReferenceNoResume from a previous Query attempt handle
fork_fromQueryForkReferenceNoFork a previous Query attempt into a new branch
Returns: QueryResult
answer = await client.query.run("What are the top whale movements on Base?")
print(answer.response)      # response text or summary
print(answer.tools_used)    # [QueryToolUsage(id, name, skill_calls)]
print(answer.cost)          # QueryCost(model_cost_usd, tool_cost_usd, total_cost_usd)
# `orchestration_metrics` and `developer_trace` are only populated when
# `include_developer_trace=True` is passed — see the next example.
answer = await client.query.run(
    query="Analyze whale activity on Base",
    favorites_only=True,
    agent_model_id="kimi-k2.6-model",
    response_shape="answer_with_evidence",
    include_data=True,
    include_data_url=True,
    include_developer_trace=True,
    idempotency_key="6e7f1389-f72f-41d9-bf26-0608a4d8be87",
)

print(answer.response_shape)  # "answer_with_evidence"
print(answer.summary)  # short machine-friendly summary
print(answer.evidence.facts if answer.evidence else None)
print(answer.artifacts.data_url if answer.artifacts else None)
print(answer.freshness.as_of if answer.freshness else None)
print(answer.confidence.level if answer.confidence else None)
print(answer.developer_trace.summary if answer.developer_trace else None)
print(
    answer.developer_trace.diagnostics.selection
    if answer.developer_trace and answer.developer_trace.diagnostics
    else None
)
agent_model_id lets headless users choose the main librarian agent model explicitly. If omitted, the API uses its managed default agent model. Internal tool selection remains managed by the server. If response_shape is evidence_only, Context skips the extra prose synthesis layer, but the librarian agent still runs to fetch, compute, and ground the result.Import AGENT_MODEL_IDS and DEFAULT_AGENT_MODEL_ID from ctxprotocol to see the current supported slugs. Omit agent_model_id to use the managed default (DEFAULT_AGENT_MODEL_ID).
The query runtime now exposes a single managed executor surface. The server decides internal budgets, ambiguity handling, and exploration policy from the query itself instead of asking SDK callers to choose a lane.include_developer_trace and orchestration_metrics are optional diagnostic surfaces that are both gated on include_developer_trace=True — they are None on the response envelope when the flag is omitted. Their inner fields are typed but may evolve across rollouts as the managed runtime changes, so treat them as debugging signals rather than a stable execution contract.The developer_trace object mirrors the chat app’s Developer Logs card (same iterative-runtime signals): orchestration_mode="query", summary (tool calls, retries, loop steps), timeline, tool_call_history (with is_code_interpreter flags on Python sandbox calls), execution_trace, verification (with bounded_answer_reason / bounded_answer_data_gap when a safety guardrail stopped retries), and diagnostics (selection, execution contract, cost, contributor searches, tool-registry stats, retry budget, stage timing). The initial_code / final_code fields are the // iterative execution: no generated code sentinel kept for trace compatibility with the previous VM-based runtime — no JavaScript is generated or executed.

Structured Response Shapes

Query is Context’s managed librarian contract. You can choose how much structure you want back:
response_shapeBest forBehavior
answer_with_evidenceFirst-party chat, human-facing apps (default)Prose answer plus structured evidence, artifacts, freshness, confidence, and usage metadata
evidence_onlyExternal agents, downstream automationBounded evidence, computed artifacts, and full-data references without prose synthesis
The first-party chat app defaults to answer_with_evidence, but it is using the same Query contract you get in the SDK.

Query Envelope Fields

When response_shape is answer_with_evidence or evidence_only, the result may include:
FieldWhat it contains
summaryShort machine-friendly summary of the answer
evidenceCanonical facts, source refs, assumptions, known unknowns, retrieval reason codes, and optional wedge-specific market_intelligence
artifactsdata_url, canonical dataset metadata, and stage-artifact kinds
viewOptional UI/render hint such as table, leaderboard, heatmap, or timeseries, plus optional metrics, columns, and rows
outcomePublic outcome label, tone, stop_reason, and optional issue_class
controllerPublic bounded-controller summary including actions_taken, next_action, and patch-preservation flags
freshnessas_of, source timestamps, and freshness note
confidenceConfidence level, reason, fact counts, and gap signals
usageDuration, cost, tools used, outcome type, and optional orchestration metrics
result = await client.query.run(
    query="Which exchanges are seeing the largest BTC inflows and outflows over the last 24 hours?",
    response_shape="evidence_only",
)

print(result.response)  # machine-friendly summary for evidence_only
print(result.summary)
print(result.evidence.source_refs if result.evidence else None)
print(result.evidence.market_intelligence.venue_breakdown if result.evidence and result.evidence.market_intelligence else None)
print(result.stop_reason, result.issue_class, result.actions_taken)
print(result.usage.tools_used if result.usage else None)

High-Fidelity Rehydration (Retrieval-First Synthesis)

When retrieval-first rollout is enabled in the deployment, the query runtime can switch synthesis context assembly from baseline truncation to retrieval-first slices for full-data or truncation-sensitive requests.
  • Stage artifacts are emitted in request-scoped internal storage (selection, execution, synthesis). The scout stage is a legacy artifact slot — scout probe execution is disabled in the current iterative runtime, so no scout stage artifacts are produced.
  • Retrieval primitives (path lookup, array windows/sampling, keyword slices, top-K relevance) are used to build a bounded context pack from canonical execution data.
  • Final synthesis still passes through the existing synthesis safety contract.
  • include_data returns a bounded inline preview when needed, and include_data_url/artifacts.canonical_data_ref reference the same canonical execution dataset used by retrieval-first assembly.

Resume and Fork

Every Query response can include query_session handles. Use resume_from to continue from a previous attempt, or fork_from to branch from an attempt while preserving lineage.
first = await client.query.run(
    query="Find the biggest Base whale movements today",
    include_developer_trace=True,
)

handle = first.query_session
if handle:
    resumed = await client.query.run(
        query="Focus only on wallets with exchange interactions",
        resume_from={
            "sessionId": handle.session_id,
            "attemptId": handle.attempt_id,
        },
    )

    forked = await client.query.run(
        query="Try the same question with stricter evidence requirements",
        fork_from={
            "sessionId": handle.session_id,
            "attemptId": handle.attempt_id,
            "reason": "manual_fork",
        },
    )

    print(resumed.response)
    print(forked.response)

client.query.stream(query, tools?, favorites_only?, agent_model_id?, response_shape?, include_data?, include_data_url?, include_developer_trace?, idempotency_key?)

Same as run() but streams events in real-time via SSE. Returns: AsyncGenerator of stream events
async for event in client.query.stream(
    query="What are the top whale movements?",
):
    if event.type == "tool-status":
        print(f"Tool {event.tool.name}: {event.status}")
    elif event.type == "text-delta":
        print(event.delta, end="")
    elif event.type == "done":
        print(f"\nTotal cost: {event.result.cost.total_cost_usd}")
Use the same idempotency_key when retrying the same logical request after network or timeout errors.
If you stream with response_shape="evidence_only", expect the structured result on the final done event and few or no text-delta events.
query.run() uses the streaming path internally and has a default stream timeout of 600 seconds. Non-streaming SDK requests default to 300 seconds. For complex chart-heavy requests, use async jobs.

Async Query Jobs

Use async jobs when a query may exceed a single blocking SDK request.
job = await client.query.start(
    query="Build a chart-ready dataset of Base whale movements over the last 30 days",
    response_shape="evidence_only",
    include_data_url=True,
)

status = await client.query.get_status(job.job_id)
print(status.status)  # queued | running | completed | failed

completed = await client.query.poll(
    job.job_id,
    interval_ms=2000,
    timeout_ms=15 * 60_000,
)
print(completed.result)

Types

Import Types

from ctxprotocol import (
    # Auth utilities for tool contributors
    verify_context_request,
    is_protected_mcp_method,
    is_open_mcp_method,

    # Client types
    ContextClientOptions,
    Tool,
    McpTool,
    ExecuteOptions,
    ExecutionResult,
    QueryOptions,
    QueryResult,
    QueryDeveloperTrace,
    QueryOrchestrationMetrics,
    QueryResponseEnvelopeEvidence,
    QueryResponseEnvelopeArtifacts,
    QueryResponseEnvelopeView,
    QueryResponseEnvelopeFreshness,
    QueryResponseEnvelopeConfidence,
    QueryResponseEnvelopeUsage,
    QueryComputedArtifact,
    QueryCapabilityMissPayload,
    QueryAssumptionMetadata,
    ContextErrorCode,

    # Auth types (for MCP server contributors)
    VerifyRequestOptions,

    # Context types (for MCP server contributors receiving injected data)
    ContextRequirementType,
    HyperliquidContext,
    PolymarketContext,
    WalletContext,
    UserContext,
)

Tool

class Tool(BaseModel):
    id: str
    name: str
    description: str
    price: str  # listing-level response price metadata (legacy field name)
    category: str | None
    is_verified: bool | None
    mcp_tools: list[McpTool] | None

McpTool

class McpTool(BaseModel):
    name: str
    description: str
    input_schema: dict[str, Any] | None  # JSON Schema for arguments
    output_schema: dict[str, Any] | None  # JSON Schema for response
    meta: dict[str, Any] | None  # alias: "_meta" (mode/eligibility/pricing/context)
    execute_eligible: bool | None
    execute_price_usd: str | None
For argument guidance, use standard JSON Schema fields directly inside inputSchema properties. Put fallback values in default and sample invocations in examples. Do not rely on custom _meta.inputExamples.
TOOLS = [{
    "name": "get_price_history",
    "inputSchema": {
        "type": "object",
        "properties": {
            "symbol": {"type": "string", "default": "BTC", "examples": ["BTC", "ETH", "SOL"]},
            "interval": {"type": "string", "enum": ["1h", "4h", "1d"], "default": "1h", "examples": ["1h", "4h"]},
            "limit": {"type": "number", "default": 100, "examples": [50, 100, 200]},
        },
        "required": [],
    },
}]

ExecutionResult (Execute Mode)

class ExecutionResult(BaseModel):
    mode: Literal["execute"]
    result: Any
    tool: ToolInfo  # { id: str, name: str }
    method: ExecuteMethodInfo  # { name: str, execute_price_usd: str }
    session: ExecuteSessionSpend
    duration_ms: int

ExecuteSessionSpend

class ExecuteSessionSpend(BaseModel):
    mode: Literal["execute"] = "execute"
    session_id: str | None
    method_price: str
    spent: str
    remaining: str | None
    max_spend: str | None
    status: Literal["open", "closed", "expired"] | None
    expires_at: str | None
    close_requested: bool | None
    pending_accrued_count: int | None
    pending_accrued_usd: str | None

QueryResult (Pay-Per-Response)

class QueryResult(BaseModel):
    response: str                        # prose answer or machine-friendly summary
    tools_used: list[QueryToolUsage]     # [{ id, name, skill_calls }]
    cost: QueryCost                      # { model_cost_usd, tool_cost_usd, total_cost_usd }
    duration_ms: int
    data: Any | None                     # Bounded data/preview when include_data=True
    data_url: str | None                 # Optional blob URL (include_data_url=True)
    computed_artifacts: list[QueryComputedArtifact] | None  # Charts/dataframes from code_interpreter
    developer_trace: QueryDeveloperTrace | None  # Optional runtime trace + diagnostics
    orchestration_metrics: QueryOrchestrationMetrics | None  # Optional first-pass metrics
    response_shape: Literal["answer_with_evidence", "evidence_only"] | None
    summary: str | None
    evidence: QueryResponseEnvelopeEvidence | None
    artifacts: QueryResponseEnvelopeArtifacts | None    # Provenance: source refs + dataset handles
    view: QueryResponseEnvelopeView | None
    freshness: QueryResponseEnvelopeFreshness | None
    confidence: QueryResponseEnvelopeConfidence | None
    usage: QueryResponseEnvelopeUsage | None
    outcome_type: Literal["answer", "capability_miss"]
    capability_miss: QueryCapabilityMissPayload | None
    assumption_made: QueryAssumptionMetadata | None

Context Requirement Types

For MCP server contributors building tools that need user context:
Why Context Injection Matters:
  • No Auth Required: Public blockchain/user data is fetched by the platform
  • Security: Your MCP server never handles private keys
  • Simplicity: You receive structured, type-safe data
from ctxprotocol import CONTEXT_REQUIREMENTS_KEY

# Context types supported by the marketplace
ContextRequirementType = Literal["polymarket", "hyperliquid", "wallet"]

# Usage: Declare context requirements in _meta at the tool level
TOOLS = [{
    "name": "analyze_my_positions",
    "description": "Analyze your positions with personalized insights",
    "_meta": {
        "contextRequirements": ["hyperliquid"],
        "rateLimit": {
            "maxRequestsPerMinute": 30,
            "cooldownMs": 2000,
            "maxConcurrency": 1,
            "supportsBulk": True,
            "recommendedBatchTools": ["get_portfolio_snapshot"],
            "notes": "Hobby tier: prefer snapshot endpoints over loops.",
        },
    },
    "inputSchema": {
        "type": "object",
        "properties": {
            "portfolio": {
                "type": "object",
                "description": "Portfolio context (injected by platform)",
            },
        },
        "required": ["portfolio"],
    },
}]
Python reference implementation: Hummingbot contributor server.
For practical guidance on these pacing hints, see Tool Metadata.

Injected Context Types

HyperliquidContext

class HyperliquidContext(BaseModel):
    wallet_address: str
    perp_positions: list[HyperliquidPerpPosition]
    spot_balances: list[HyperliquidSpotBalance]
    open_orders: list[HyperliquidOrder]
    account_summary: HyperliquidAccountSummary
    fetched_at: str

PolymarketContext

class PolymarketContext(BaseModel):
    wallet_address: str
    positions: list[PolymarketPosition]
    open_orders: list[PolymarketOrder]
    total_value: float | None
    fetched_at: str

WalletContext

class WalletContext(BaseModel):
    address: str
    chain_id: int
    native_balance: str | None


class ERC20TokenBalance(BaseModel):
    address: str
    symbol: str
    decimals: int
    balance: str


class ERC20Context(BaseModel):
    tokens: list[ERC20TokenBalance]

Contributor Search Helpers

If you are building a contributor for a search-hard venue, the Python SDK ships an optional helper surface at ctxprotocol.contrib.search. Use it only when the venue’s upstream search is weak enough that deterministic retrieval plus a bounded model judge materially improves candidate resolution. Do not use it for venues that already expose reliable direct search.
from ctxprotocol.contrib.search import (
    build_contributor_search_validation_artifact,
    create_search_intent,
    extract_contributor_searches_from_developer_trace,
    merge_contributor_search_config,
    resolve_contributor_search,
)
What this module is for:
  • contributor-side intent shaping, candidate normalization, shortlist construction, and validated resolution
  • provider-agnostic judge injection with stable override knobs for provider, model, timeout, budget, and disabled
  • machine-readable artifact generation via build_contributor_search_validation_artifact(...)
  • runtime trace inspection via extract_contributor_searches_from_developer_trace(trace) or result.developer_trace.diagnostics.contributor_searches
Operational rules:
  • extra judge spend is contributor-owned in this rollout, so recover it through your own listing response price and/or execute pricing
  • keep deterministic validation around every judge result; malformed, timed-out, over-budget, or contradictory judgments must degrade honestly
  • save replayable validation artifacts alongside your contributor examples. Current reference fixtures live under examples/client/validation/

Error Handling

The SDK raises ContextError with specific error codes:
from ctxprotocol import ContextClient, ContextError

try:
    result = await client.tools.execute(...)
except ContextError as e:
    match e.code:
        case "no_wallet":
            # User needs to set up wallet
            print(f"Setup required: {e.help_url}")
        case "insufficient_allowance":
            # User needs to set a spending cap
            print(f"Set spending cap: {e.help_url}")
        case "payment_failed":
            # Insufficient USDC balance
            pass
        case "execution_failed":
            # Tool execution error
            pass

Error Codes

CodeDescriptionHandling
unauthorizedInvalid API keyCheck configuration
no_walletWallet not set upDirect user to help_url
insufficient_allowanceSpending cap not setDirect user to help_url
payment_failedUSDC payment failedCheck balance
execution_failedTool errorRetry with different args

Securing Your Tool (MCP Contributors)

If you’re building an MCP server, verify incoming requests using ctxprotocol.
If you wrap a search-hard venue whose upstream API cannot reliably resolve the right market or entity, follow Optional Contributor Search Helpers. That pattern stays contributor-side, keeps provider credentials contributor-owned, and does not change client.query.run() or _meta.
Free vs Paid Security Requirements:
Tool TypeSecurity MiddlewareRationale
Free Tools ($0.00)OptionalGreat for distribution and adoption
Paid Tools ($0.01+)MandatoryWe cannot route payments to insecure endpoints
FastMCP is the fastest way to build MCP servers. Use ctxprotocol middleware:
from fastmcp import FastMCP
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.server.dependencies import get_http_headers
from fastmcp.exceptions import ToolError
from ctxprotocol import verify_context_request, ContextError

mcp = FastMCP("my-tool")

class ContextProtocolAuth(Middleware):
    """Verify Context Protocol JWT on tool calls only."""
    
    async def on_call_tool(self, context: MiddlewareContext, call_next):
        headers = get_http_headers()
        try:
            await verify_context_request(
                authorization_header=headers.get("authorization", "")
            )
        except ContextError as e:
            raise ToolError(f"Unauthorized: {e.message}")
        return await call_next(context)

mcp.add_middleware(ContextProtocolAuth())

@mcp.tool
def get_data(query: str) -> dict:
    return {"result": "..."}

if __name__ == "__main__":
    mcp.run(transport="http", port=3000)
FastMCP auto-generates outputSchema from Pydantic return types and includes structuredContent in responses - both required by Context Protocol.

Option 2: Raw FastAPI

For more control, use FastAPI with our middleware:
from fastapi import FastAPI, Request, Depends, HTTPException
from ctxprotocol import create_context_middleware, ContextError

app = FastAPI()
verify_context = create_context_middleware(audience="https://your-tool.com/mcp")

@app.post("/mcp")
async def handle_mcp(request: Request, context: dict = Depends(verify_context)):
    # context contains verified JWT payload (on protected methods)
    # None for open methods like tools/list
    body = await request.json()
    # Handle MCP request...

Manual Verification

For more control, use the lower-level utilities:
from ctxprotocol import verify_context_request, is_protected_mcp_method, ContextError

# Check if a method requires auth
if is_protected_mcp_method(body["method"]):
    try:
        payload = await verify_context_request(
            authorization_header=request.headers.get("authorization"),
            audience="https://your-tool.com/mcp",  # optional
        )
        # payload contains verified JWT claims
    except ContextError:
        raise HTTPException(status_code=401, detail="Unauthorized")

Verification Options

OptionTypeRequiredDescription
authorization_headerstrYesFull Authorization header (e.g., "Bearer eyJ...")
audiencestrNoExpected audience claim for stricter validation

MCP Security Model

Critical for tool contributors: Not all MCP methods require authentication. The middleware selectively protects only execution methods.
MCP MethodAuth RequiredWhy
initialize❌ NoSession setup
tools/list❌ NoDiscovery - agents need to see your schemas
resources/list❌ NoDiscovery
prompts/list❌ NoDiscovery
tools/callYesExecution - costs money, runs your code
What this means in practice:
  • https://your-mcp.com/mcp + initialize → Works without auth
  • https://your-mcp.com/mcp + tools/list → Works without auth
  • https://your-mcp.com/mcp + tools/callRequires Context Protocol JWT
This matches standard API patterns (OpenAPI schemas are public, GraphQL introspection is open).

Payment Flow

Context supports two settlement timings:
  1. Query mode (client.query.*) uses deferred settlement after the response is delivered
  2. Execute mode (client.tools.execute) accrues per-call method spend into execute sessions with automatic batch payment
  3. In both modes, spending caps are enforced via ContextRouter allowance checks
  4. 90% goes to the tool developer, 10% goes to the protocol