Skip to content

guides

Intended Documentation

Python ML Pipelines

Gate perception → policy pipelines (PyTorch / JAX) on Intended Authority Tokens using the Python SDK or the raw HTTP API.

Integrating Intended in Python ML Pipelines#

Audience: ML engineers running perception / planning pipelines for physical agents in Python — typically PyTorch / JAX models that consume sensor streams and emit structured commands.

Prereqs: Python 3.10+, an Intended tenant API key (mrt_live_… / mrt_test_…).

Where Intended fits in an ML pipeline#

sensors ──▶ perception model ──▶ planner / policy ──▶ structured command ──▶ controller
                                                          │
                                                          ▼
                                          classify + evaluate + (maybe) mint token
                                                          │
                                                          ▼
                                          Authority Token  OR  safe-default
                                                          │
                                                          ▼
                                          controller dispatch (gated on token)

Every time your model produces a command that becomes real-world motion: classify the command, snapshot the relevant state, and ask for an Authority Token. The token is the credential the controller checks. Without a valid token, the controller falls back to the DAG node's declared safe-default.

This is the same loop as the ROS2 guide without the ROS2 plumbing — useful for standalone perception→command pipelines (AVs outside ROS2, PX4/ArduPilot wrappers), surgical platforms whose planner is a Python service, and agriculture / mining / construction stacks where ROS is not the dominant integration.

Install#

bash
pip install intended

The Python SDK is a thin convenience wrapper over the HTTP API documented in the Authority API reference. Anything you can do with the SDK you can do with requests; the field names and semantics are identical.

Register the actor first#

Token issuance for an unregistered actorIdentity returns 403 actor_not_registered. Register each robot once:

python
import os, requests

BASE = os.environ["INTENDED_API_URL"]
HEADERS = {
    "Authorization": f"Bearer {os.environ['INTENDED_API_KEY']}",
    "x-tenant-id": os.environ["INTENDED_TENANT_ID"],
    "Content-Type": "application/json",
}

requests.post(
    f"{BASE}/v1/physical/actors",
    headers=HEADERS,
    json={"actorIdentity": "drone-fleet-3", "actorKind": "drone"},
).raise_for_status()

The classify → issue loop#

python
import os, requests

BASE = os.environ["INTENDED_API_URL"]
HEADERS = {
    "Authorization": f"Bearer {os.environ['INTENDED_API_KEY']}",
    "x-tenant-id": os.environ["INTENDED_TENANT_ID"],
    "Content-Type": "application/json",
}

def authorize(goal: dict, dag: dict, physical_state: dict, idempotency_key: str):
    # 1. Classify (optional — the issuer will classify if you omit oiCode).
    classify = requests.post(
        f"{BASE}/v1/physical/classify", headers=HEADERS, json={"goal": goal}
    ).json()
    if classify["failClosed"]:
        return {"decision": "DENY", "safeDefault": dag["safeDefault"]}

    # 2. Issue a token bound to this actor + this DAG node.
    resp = requests.post(
        f"{BASE}/v1/physical/authority-tokens",
        headers={**HEADERS, "Idempotency-Key": idempotency_key},
        json={
            "intent": {"oiCode": classify["oiCode"], "structuredGoal": goal},
            "dagNode": dag,
            "claims": {
                "actorIdentity": goal["actor"]["identifier"],
                "deadlineMs": dag["deadlineMs"],
                "safeDefault": dag["safeDefault"],
                "safetyBit": classify["safetyBit"],
                "safetyCitations": classify["safetyCitations"],
            },
            "physicalState": physical_state,
        },
    )

    if resp.status_code == 200 and resp.json().get("decision") == "ALLOW":
        return resp.json()                       # has token, expiresAtMs, tokenJti
    if resp.status_code == 200:                  # ESCALATE
        return resp.json()                       # has escalationTicketId, approvalUrl
    if resp.status_code in (422, 423):           # policy denied / audit-gap
        return resp.json()["error"]["details"]   # has decision, clauseId, safeDefault
    resp.raise_for_status()


goal = {
    "schema": "px4:command/MAV_CMD_NAV_WAYPOINT",
    "verb": "navigate-to-waypoint",
    "object": "waypoint-7",
    "parameters": {"lat": 37.7749, "lon": -122.4194, "alt_m": 80},
    "actor": {"kind": "drone", "identifier": "drone-fleet-3"},
}
dag = {
    "nodeId": "nav-step-1",
    "oiCode": "OI-1605",
    "deadlineMs": 500,
    "safeDefault": "hold-position",
    "realTimeTier": "rt-soft",
}
result = authorize(goal, dag, snapshot_state(), idempotency_key="nav-step-1-0001")
if result.get("decision") == "ALLOW":
    controller.dispatch_with_token(result["token"])
else:
    controller.actuate_safe_default(result["safeDefault"])

The token lifetime is the deadline

dagNode.deadlineMs is the token's lifetime — expiresAtMs = issuedAt + deadlineMs. A 500ms deadline yields a 500ms token. Request a fresh token per DAG node; do not treat one token as a session credential.

Pattern: per-frame perception → policy gate#

For perception-heavy pipelines (autonomous driving, agricultural row following, surgical autonomy) you may run the classifier every frame and re-issue tokens only when the cited OI category changes. This caps cloud QPS without sacrificing correctness:

python
def now_ms() -> int:
    import time
    return int(time.time() * 1000)

class GatedPolicyHead:
    def __init__(self, dag, actor_identity):
        self._dag = dag
        self._actor = actor_identity
        self._cached = None       # last ALLOW response
        self._cached_oil = None

    def authorize(self, goal, physical_state):
        classify = requests.post(
            f"{BASE}/v1/physical/classify", headers=HEADERS, json={"goal": goal}
        ).json()
        if classify["failClosed"]:
            return None

        # Reuse if OI category is unchanged and the token still outlives the deadline.
        if (
            self._cached
            and classify["oiCode"] == self._cached_oil
            and self._cached["expiresAtMs"] - now_ms() > self._dag["deadlineMs"]
        ):
            return self._cached["token"]

        result = authorize(goal, self._dag, physical_state,
                           idempotency_key=f"{self._dag['nodeId']}-{now_ms()}")
        if result.get("decision") != "ALLOW":
            return None
        self._cached, self._cached_oil = result, classify["oiCode"]
        return result["token"]

When the classifier shifts the OI category — e.g. OI-2001 (lane change) replacing OI-2003 (mission planning) — a fresh token is required by design: the new category may carry a different policy, safe-default, or approval requirement.

Pattern: vision-grounded intent#

For agents whose intent is perception-determined, carry a fingerprint of the relevant tensor as a parameter so the decision is replayable:

python
goal = {
    "schema": "custom:perception/agricultural-row-follow",
    "verb": "follow-row",
    "parameters": {
        "row_id": detected_row.id,
        "perception_confidence": float(detection.confidence),
        "perception_fingerprint": tensor_hash(latest_frame),  # for audit replay
    },
    "actor": {"kind": "agv", "identifier": "tractor-12"},
}

The fingerprint lands in the audit chain so you can replay the decision later — critical for safety reviews of ML-driven autonomy.

Vision is not a safety-rated input

A camera-derived predicate is not safety-rated in the IEC 61508 sense. Mark such predicates safetyRated: false and protocol: "untrusted". If you need redundant human detection, pair a safety-rated curtain/scanner and aggregate with a consensus clause, or downgrade the action to a non-safety OI category. Do not claim a safety rating you do not have — auditors will catch it.

Snapshotting state#

Submit live world state as a predicate → PhysicalStateValue map on each issuance (or stand it up separately with POST /v1/physical/snapshots). The discriminated union and attestation protocols are in the API reference wire format.

python
def snapshot_state() -> dict:
    return {
        "perception/operator_in_cabin": {
            "kind": "boolean",
            "value": world_model.latest_cabin_occupancy().has_human,
            "asOfTimestampMs": now_ms(),
            "attestation": {
                "channel": "cabin-cam-stack",
                "safetyRated": False,        # vision is not a safety-rated bus
                "protocol": "untrusted",
            },
        },
    }

Real-time considerations#

The cloud round-trip is the gate today — there is no shipped edge verifier. For pipelines running at ≥30Hz, do not issue per frame; use the cached pattern above and re-issue only on OI transitions. For genuinely real-time loops (≤10ms decision budgets) Python is the wrong language for the hot path: mint tokens from a non-RT Python service and verify them downstream against the cloud-served JWKS. A native Rust verifier is roadmap (see Rust safety-critical firmware).

Testing#

  • Point INTENDED_API_URL at the sandbox base and use an mrt_test_… key. The signer in non-prod is an ephemeral per-process keypair — tokens do not survive a server restart and are not valid against production verifiers.
  • For replay tests, capture historical perception traces + state snapshots and feed them through your gating wrapper, asserting the decision shape.

See also#

Python ML Pipelines | Intended