ROS2 Integration

Layer Intended Authority Tokens onto a ROS2 agent — cobot, AMR, drone, surgical — using the intended-ros2 package over the cloud authority API.

Integrating Intended on ROS2

Audience: robotics engineers shipping ROS2 agents (cobots, AMRs, drones, surgical robots) who want to gate state-changing actions on an Intended Authority Token.

Prereqs: ROS2 Humble or newer, Python 3.10+, an Intended tenant API key.

ROS2 has good primitives for what a robot is doing. It has no primitive for whether the robot is allowed to do it. The moment an LLM, planner, or operator command becomes the source of an action, you want an auditable record of which OI category it falls under, a short-lived signed credential the controller checks before commanding motors, and a safe-default fallback if that credential is denied or expires. Intended is that layer.

Architecture

                       ┌──────────────────────┐
                       │   Intended Cloud     │
                       │  classify · policy · │
                       │  sign · audit        │
                       └──────────▲───────────┘
                                  │ HTTPS (sub-second)
┌────────────────────────────────────────────────────────────────┐
│  Your robot compute                                            │
│                                                                │
│  ┌─────────────┐    ┌────────────────────────┐   ┌──────────┐  │
│  │ Your agent  │───▶│  intended_ros2 node    │──▶│ Verify   │──▶ motors
│  │  (BT, FSM,  │    │  (mediates cloud calls)│   │ token    │  │
│  │   LLM, …)   │    └────────────────────────┘   └──────────┘  │
│  └─────────────┘               │                               │
│        │                       ▼                               │
│  ┌──────────────────────────────────┐                          │
│  │ Your PhysicalStateProvider       │                          │
│  │  (FSoE / PROFIsafe / DDS reads —  │                         │
│  │   you implement; you submit)     │                          │
│  └──────────────────────────────────┘                          │
└────────────────────────────────────────────────────────────────┘

The node mediates: your agent never calls the cloud directly. Your safety-bus data never leaves your network; you read it and submit a structured snapshot. Token verification today requires the cloud round-trip; a dedicated edge verifier is on the roadmap.

Install

bash
# 1. Clone the package into your colcon workspace.
cd ~/colcon_ws/src
git clone https://github.com/intended-so/intended.git
ln -s intended/packages/intended-ros2 intended_ros2

# 2. Install the Python SDK.
pip install intended

# 3. Build.
cd ~/colcon_ws
colcon build --packages-select intended_ros2
source install/setup.bash

Configure

bash
export INTENDED_API_URL=https://api.intended.so
export INTENDED_API_KEY=mrt_live_…
export INTENDED_TENANT_ID=tenant_acme_prod
export INTENDED_ACTOR_IDENTITY=cobot-east-3     # your DevID once provisioned
export INTENDED_SAFE_DEFAULT=hold-position      # or stop, request-operator
export INTENDED_DEADLINE_MS=200

INTENDED_ACTOR_IDENTITY should be the IEEE 802.1AR DevID of your robot once provisioned; until then any stable identifier works. It must be registered before it can be authorized.
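A small loader can catch a missing or malformed variable at startup rather than at the first issuance call. The sketch below is one way to do that, assuming the six variables above. The `IntendedConfig` class and its field names are hypothetical and not part of the intended SDK, and treating the three safe-default values named in the comment above as the complete set is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = (
    "INTENDED_API_URL",
    "INTENDED_API_KEY",
    "INTENDED_TENANT_ID",
    "INTENDED_ACTOR_IDENTITY",
    "INTENDED_SAFE_DEFAULT",
    "INTENDED_DEADLINE_MS",
)

# Assumption: these are the only accepted values; the API may allow others.
KNOWN_SAFE_DEFAULTS = frozenset({"hold-position", "stop", "request-operator"})


@dataclass(frozen=True)
class IntendedConfig:
    """Hypothetical container for the environment settings shown above."""
    api_url: str
    api_key: str
    tenant_id: str
    actor_identity: str
    safe_default: str
    deadline_ms: int

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "IntendedConfig":
        missing = [name for name in REQUIRED_VARS if not env.get(name)]
        if missing:
            raise RuntimeError("missing environment variables: " + ", ".join(missing))
        deadline_ms = int(env["INTENDED_DEADLINE_MS"])  # ValueError if not an integer
        if deadline_ms <= 0:
            raise ValueError("INTENDED_DEADLINE_MS must be positive")
        safe_default = env["INTENDED_SAFE_DEFAULT"]
        if safe_default not in KNOWN_SAFE_DEFAULTS:
            raise ValueError(f"unrecognized INTENDED_SAFE_DEFAULT: {safe_default!r}")
        return cls(
            api_url=env["INTENDED_API_URL"].rstrip("/"),
            api_key=env["INTENDED_API_KEY"],
            tenant_id=env["INTENDED_TENANT_ID"],
            actor_identity=env["INTENDED_ACTOR_IDENTITY"],
            safe_default=safe_default,
            deadline_ms=deadline_ms,
        )
```

Calling `IntendedConfig.from_env()` once in your node's constructor keeps the later request code free of repeated `os.environ` lookups.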

Register the actor (once)

python
import os, requests

requests.post(
    f"{os.environ['INTENDED_API_URL']}/v1/physical/actors",
    headers={
        "Authorization": f"Bearer {os.environ['INTENDED_API_KEY']}",
        "x-tenant-id": os.environ["INTENDED_TENANT_ID"],
        "Content-Type": "application/json",
    },
    json={"actorIdentity": os.environ["INTENDED_ACTOR_IDENTITY"], "actorKind": "cobot"},
    timeout=10,  # seconds; illustrative value so a stalled connection cannot hang the script
).raise_for_status()

Calling from your agent

The structured goal is a ROS2 action shape, not English. From a Python node:

python
import os, requests
from rclpy.node import Node

BASE = os.environ["INTENDED_API_URL"]
HEADERS = {
    "Authorization": f"Bearer {os.environ['INTENDED_API_KEY']}",
    "x-tenant-id": os.environ["INTENDED_TENANT_ID"],
    "Content-Type": "application/json",
}

class PickPlacePlanner(Node):
    def request_authority_for_pick(self, target_pose, seq: int):
        goal = {
            "schema": "ros2:action:moveit_msgs/Pick",
            "verb": "pick",
            "object": "workpiece-A4-7",
            "parameters": {"target_pose": target_pose},
            "actor": {"kind": "cobot", "identifier": os.environ["INTENDED_ACTOR_IDENTITY"]},
        }
        dag = {
            "nodeId": "pick-step-1",
            "oiCode": "OI-1502",           # the issuer may re-classify
            "deadlineMs": int(os.environ["INTENDED_DEADLINE_MS"]),
            "safeDefault": os.environ["INTENDED_SAFE_DEFAULT"],
            "realTimeTier": "rt-soft",
        }
        resp = requests.post(
            f"{BASE}/v1/physical/authority-tokens",
            headers={**HEADERS, "Idempotency-Key": f"pick-step-1-{seq}"},
            json={
                "intent": {"structuredGoal": goal},
                "dagNode": dag,
                "claims": {
                    "actorIdentity": os.environ["INTENDED_ACTOR_IDENTITY"],
                    "deadlineMs": dag["deadlineMs"],
                    "safeDefault": dag["safeDefault"],
                    "safetyBit": True,
                },
                "physicalState": self._snapshot_required_predicates(),
            },
            timeout=2.0,  # seconds; illustrative value so a stalled request cannot block the node
        )
        if resp.status_code == 200 and resp.json().get("decision") == "ALLOW":
            return resp.json()["token"]          # dispatcher verifies before motion
        if resp.status_code in (422, 423):
            detail = resp.json()["error"]["details"]
            self.get_logger().error(f"DENIED → safe-default {detail['safeDefault']}")
            self._actuate_safe_default(detail["safeDefault"])
        return None
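The branching on the response can also be factored into a pure function, which makes it unit-testable without a network or a robot. The sketch below is one possible shape: `Outcome` and `interpret_response` are hypothetical names, the top-level location of `escalationTicketId` in an ESCALATE body is an assumption, and falling back to the configured safe default for any unrecognized response is a design choice of this sketch rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Outcome:
    """Hypothetical result type: what the agent should do next."""
    action: str                          # "proceed", "wait_for_operator", or "safe_default"
    token: Optional[Any] = None
    ticket_id: Optional[str] = None
    safe_default: Optional[str] = None


def interpret_response(status_code: int, body: dict, configured_safe_default: str) -> Outcome:
    decision = body.get("decision")
    if status_code == 200 and decision == "ALLOW" and "token" in body:
        return Outcome("proceed", token=body["token"])
    if status_code == 200 and decision == "ESCALATE":
        # Assumption: the ticket id sits at the top level of the response body.
        return Outcome("wait_for_operator", ticket_id=body.get("escalationTicketId"))
    # Denials (422/423) carry the safe default in error.details; anything else
    # unexpected fails closed to the locally configured safe default.
    details = (body.get("error") or {}).get("details") or {}
    return Outcome("safe_default",
                   safe_default=details.get("safeDefault", configured_safe_default))
```

A caller would pass `resp.status_code`, the parsed JSON body, and the value of `INTENDED_SAFE_DEFAULT`, then act on `outcome.action`.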

Snapshot, don't read

Intended never reads your safety bus. Your _snapshot_required_predicates() builds a predicate → PhysicalStateValue map from your bus/DDS reads and submits it. The PhysicalStateValue discriminated union and the attestation protocols are described under wire format in the API reference.
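As a rough illustration of what `_snapshot_required_predicates()` might do, the sketch below builds the map from a set of per-predicate reader callables. The function signature, the `Reader` type, and the predicate names in the usage note are hypothetical; the `unavailable` shape follows the one used elsewhere in this guide.

```python
import time
from typing import Callable, Dict

# A reader takes (predicate, deadline_ms) and returns a PhysicalStateValue dict.
Reader = Callable[[str, int], dict]


def snapshot_required_predicates(readers: Dict[str, Reader], deadline_ms: int) -> Dict[str, dict]:
    """Read every required predicate; never raise, report failures as 'unavailable'."""
    snapshot: Dict[str, dict] = {}
    for predicate, read in readers.items():
        try:
            snapshot[predicate] = read(predicate, deadline_ms)
        except Exception as exc:  # a failed read must not abort the whole snapshot
            snapshot[predicate] = {
                "kind": "unavailable",
                "reason": f"read failed: {exc}",
                "asOfTimestampMs": int(time.time() * 1000),
            }
    return snapshot
```

With a bridge object exposing `read(predicate, deadline_ms)`, a call could look like `snapshot_required_predicates({"estop_released": bridge.read, "light_curtain_clear": bridge.read}, 200)`. Reads here are sequential and each gets the full deadline, so the total can exceed it; splitting the budget or reading in parallel is left to your implementation.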

Implementing the state bridge

Bridge each safety-bus signal into a structured predicate with honest attestation:

python
import time

def now_ms() -> int:
    """Current wall-clock time in milliseconds since the Unix epoch."""
    return int(time.time() * 1000)

class FsoeBridge:
    """Reads predicates from a Beckhoff FSoE master via your DDS bridge."""
    def __init__(self, dds_subscriber):
        self._dds = dds_subscriber

    def read(self, predicate: str, deadline_ms: int) -> dict:
        msg = self._dds.read_with_timeout(predicate, timeout_ms=deadline_ms)
        if msg is None:
            return {"kind": "unavailable",
                    "reason": f"timeout reading {predicate}",
                    "asOfTimestampMs": now_ms()}
        return {
            "kind": "boolean",
            "value": msg.value,
            "asOfTimestampMs": msg.publish_timestamp_ms,
            "attestation": {"channel": msg.topic, "safetyRated": True, "protocol": "fsoe"},
        }

safetyRated: true and protocol: "fsoe" are the load-bearing claims. If your policy requires safety-rated input and the bus drops to an untrusted channel (or returns unavailable), the cloud denies on attestation grounds.
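Because a denial on attestation grounds costs a cloud round-trip, it can help to spot the obvious gaps locally before submitting. The sketch below is a hypothetical pre-flight check, not a replacement for the cloud policy: the function name, the list of required predicates, and the staleness threshold are all assumptions introduced for illustration.

```python
from typing import Dict, Iterable


def attestation_gaps(snapshot: Dict[str, dict],
                     required_safety_rated: Iterable[str],
                     max_age_ms: int,
                     now_ms: int) -> Dict[str, str]:
    """Return {predicate: reason} for every predicate likely to fail a safety-rated policy."""
    gaps: Dict[str, str] = {}
    for predicate in required_safety_rated:
        value = snapshot.get(predicate)
        if value is None:
            gaps[predicate] = "missing from snapshot"
        elif value.get("kind") == "unavailable":
            gaps[predicate] = value.get("reason", "unavailable")
        elif not value.get("attestation", {}).get("safetyRated", False):
            gaps[predicate] = "not safety-rated"
        elif now_ms - value.get("asOfTimestampMs", 0) > max_age_ms:
            # Assumption: a local freshness limit; the real policy may differ.
            gaps[predicate] = "stale reading"
    return gaps
```

An empty result means nothing obviously wrong was found locally; the cloud still makes the actual decision. A non-empty result is a reasonable trigger for going straight to the safe default and logging the reasons.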

Verifying the token

Today: the controller treats the token as valid if decision == "ALLOW", expiresAtMs > now(), and the signature checks out against /.well-known/jwks.json.

Set the verifier audience to intended-edge-verifier

The cloud signs tokens with aud: "intended-edge-verifier". The sample verifier in the intended-ros2 package defaults to expected_audience="intended-edge", so set your verifier's expected audience to intended-edge-verifier to match what the issuer actually emits; otherwise valid tokens are rejected. The dedicated edge verifier binary that would standardize this is on the roadmap.
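The checks described in this section can be sketched as a single function that returns a rejection reason. This is a minimal illustration, not the package's verifier: the function name is hypothetical, the layout of the decoded claims (`aud`, `sub`, `expiresAtMs` in one mapping) is an assumption, and signature verification against the JWKS is deliberately left to whatever JOSE library you use and passed in as a boolean.

```python
import time
from typing import Mapping, Optional

EXPECTED_AUDIENCE = "intended-edge-verifier"  # what the issuer emits, per this guide


def rejection_reason(decision: str,
                     claims: Mapping[str, object],
                     signature_valid: bool,
                     actor_identity: str,
                     expected_audience: str = EXPECTED_AUDIENCE,
                     now_ms: Optional[int] = None) -> Optional[str]:
    """Return None if the token may be used, otherwise a short reason string."""
    now = int(time.time() * 1000) if now_ms is None else now_ms
    if decision != "ALLOW":
        return "decision is not ALLOW"
    if not signature_valid:
        return "signature check failed"
    # Assumption: 'aud' is a single string, not a list.
    if claims.get("aud") != expected_audience:
        return f"audience mismatch: {claims.get('aud')!r}"
    if claims.get("sub") != actor_identity:
        return "token is bound to a different actor"
    expires_at = claims.get("expiresAtMs")
    if not isinstance(expires_at, (int, float)) or expires_at <= now:
        return "token expired or missing expiresAtMs"
    return None
```

Returning a reason rather than a bare boolean makes the audience mismatch easy to distinguish from an expired token in logs.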

Common patterns

  • Behavior trees (BehaviorTree.CPP). Wrap the issuance call so an ESCALATE decision (HTTP 200 with escalationTicketId) pauses the tree; resume when an operator approves via /approvals/{ticketId}/decide, then re-issue with operatorTicketId set.
  • Real-time controllers (ros2_control). Never issue from the controller update loop — the cloud round-trip starves the deadline. Pre-issue from a non-RT planner and pass the token down via the command interface.
  • Multi-robot fleets. One issuance path per robot; tokens bind to actorIdentity, so a token for cobot-east-3 carries that sub and cannot be presented for cobot-east-4.
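The pre-issue pattern for real-time controllers can be illustrated with a small handoff slot: the non-RT planner deposits a token, and the consumer takes it only if enough lifetime remains to dispatch the motion. This is a Python sketch of the idea under stated assumptions: `TokenSlot` is a hypothetical name, treating a token as single-use is an assumption, and a Python lock is not real-time safe, so an actual ros2_control controller would use a C++ mechanism such as a real-time buffer behind the command interface.

```python
import threading
import time
from typing import Any, Optional, Tuple


class TokenSlot:
    """Holds at most one pre-issued token for handoff from planner to dispatcher."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._entry: Optional[Tuple[Any, int]] = None

    def put(self, token: Any, expires_at_ms: int) -> None:
        """Called by the non-RT planner after a successful issuance."""
        with self._lock:
            self._entry = (token, expires_at_ms)

    def take_if_fresh(self, min_remaining_ms: int, now_ms: Optional[int] = None) -> Optional[Any]:
        """Consume the token if it will still be valid for min_remaining_ms."""
        now = int(time.time() * 1000) if now_ms is None else now_ms
        with self._lock:
            entry, self._entry = self._entry, None  # assumption: tokens are single-use
        if entry is None:
            return None
        token, expires_at_ms = entry
        return token if expires_at_ms - now >= min_remaining_ms else None
```

Setting `min_remaining_ms` to your measured trajectory-dispatch latency guards against a token that expires between handoff and the start of motion.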

Troubleshooting

| Symptom | Likely cause | Fix |
| --- | --- | --- |
| 403 actor_not_registered | Actor not on the roster | POST /v1/physical/actors for this actorIdentity. |
| Every valid token rejected at verify | Verifier expects intended-edge, issuer mints intended-edge-verifier | Set verifier audience to intended-edge-verifier. |
| 422 policy_denied with an attestation clause | Predicates marked protocol: untrusted but policy requires safety-rated | Wire the safety bus; set safetyRated: true. |
| failClosed: true on a novel goal | No classifier rule matched | Supply an explicit oiCode, or extend the corpus / use a learned-model fallback. |
| Token expires before motion starts | deadlineMs shorter than trajectory dispatch | Issue closer to motion, or widen the deadline. |
