Rust Extension Development

Cirata Symphony provides a Rust library for developing extensions. Rust extensions are well suited to high-performance, low-latency workloads, real-time data processing, and environments where memory safety and minimal runtime overhead are priorities.

Getting Started

Installation

The Symphony Rust library is distributed as a source archive. Download it from the Languages and Libraries page, then extract and reference it as a path dependency in your Cargo.toml:

# Extract the library
tar xf cirata-symphony-<version>-rust-src.tar

# Cargo.toml
[dependencies]
cirata-symphony = { path = "cirata_symphony" }
tokio = { version = "1", features = ["full"] }
serde_json = "1.0"
futures = "0.3"
tracing-subscriber = "0.3"

The library depends on async-nats for NATS connectivity, reqwest for HTTP, and tokio as the async runtime. All transitive dependencies are listed in the included Cargo.toml and pinned in Cargo.lock.

The library requires Rust 1.96 or later.

When building within the Symphony source tree using Bazel, depend on //libs/rust/cirata_symphony and reference crates from the @crates repository.

Project Structure

A Rust extension is typically a single binary crate with any supporting files:

my_extension/
+-- src/
|   +-- main.rs          # Extension entry point
|   +-- handlers.rs      # Optional: handler functions
|   +-- pages/           # Optional: UI template files
|       +-- home.tsx
|       +-- help.md
+-- Cargo.toml           # Crate manifest
+-- BUILD.bazel          # Bazel build configuration (optional)

Minimal Extension

The simplest possible extension connects to Symphony and publishes its availability:

use cirata_symphony::Extension;
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Send log output to stdout
    tracing_subscriber::fmt::init();

    let token = env::var("SYMPHONY_TOKEN")
        .expect("SYMPHONY_TOKEN is required");

    let ext = Extension::new("Hello World", "hello")
        .with_description("A simple Rust extension")
        .with_token(token);

    // Blocks until the process receives SIGINT
    ext.run().await?;
    Ok(())
}

This extension registers with Symphony and remains connected until stopped with Ctrl+C (SIGINT). The status is published every 30 seconds to keep the extension's entry alive.

Extension Initialization

Builder Pattern

The Extension type uses a builder pattern for configuration:

let ext = Extension::new("My Extension", "my_ext")
    .with_description("Provides data processing")
    .with_token(token);

| Method | Required | Description |
|---|---|---|
| new(name, prefix) | Yes | Human-readable name and unique prefix |
| with_description(desc) | No | Description of the extension's purpose |
| with_token(token) | Yes | API token (typically from SYMPHONY_TOKEN env var) |

Authentication

Extensions authenticate with a token obtained from an API Key. The token is resolved in the following order of precedence:

  1. The with_token() builder method
  2. The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux) — from a previous provisioning run
  3. The REGISTRATION_TOKEN environment variable (one-time provisioning)

let token = env::var("SYMPHONY_TOKEN").unwrap_or_default();

When REGISTRATION_TOKEN is set, the extension connects, automatically provisions a permanent API key with the extension's declared capabilities, and stores it in the OS credential store (or ~/.config/cirata/ as a fallback). Subsequent runs use the stored token automatically with no interaction required.

If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
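
The order of precedence above can be sketched as a small std-only function. This is an illustration, not the SDK's implementation: TokenSource and resolve_token are hypothetical names, and treating an empty string as "not provided" is an assumption.

```rust
/// Where a token was found. Hypothetical type for illustration, not an SDK type.
#[derive(Debug, PartialEq)]
pub enum TokenSource {
    Builder,
    CredentialStore,
    RegistrationEnv,
}

/// Returns the first usable candidate in the documented order of precedence:
/// with_token(), then the OS credential store, then REGISTRATION_TOKEN.
pub fn resolve_token(
    builder: Option<&str>,
    credential_store: Option<&str>,
    registration_env: Option<&str>,
) -> Option<(TokenSource, String)> {
    let candidates = [
        (TokenSource::Builder, builder),
        (TokenSource::CredentialStore, credential_store),
        (TokenSource::RegistrationEnv, registration_env),
    ];
    for (source, value) in candidates {
        // Assumption: an empty string counts as "not provided".
        if let Some(v) = value.filter(|v| !v.is_empty()) {
            return Some((source, v.to_string()));
        }
    }
    // No source produced a token; the extension would log an error and exit.
    None
}
```

A None result corresponds to the case where the extension logs instructions and exits rather than prompting on stdin.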

Container and Kubernetes deployments

In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:

  • Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
  • Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.

The library decodes the Symphony hostname from the token, exchanges the token for NATS credentials over HTTPS, and establishes an authenticated connection using JWT + NKey signature authentication.

Overriding the token-embedded address

When the hostname encoded in the token is not the right one to reach Symphony from where the extension runs, set the SYMPHONY_ADDRESS environment variable. It accepts a bare hostname, host:port, or a full URL with an explicit http or https scheme.

See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Rust, Python, Go, and Java SDKs.
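
To make the three accepted forms concrete, here is a rough std-only classifier. It is a sketch under stated assumptions: AddressForm and classify_address are hypothetical names, the SDK's real validation rules live in the SYMPHONY_ADDRESS reference and may differ, and IPv6 literals are deliberately not handled.

```rust
/// The three documented shapes of SYMPHONY_ADDRESS (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum AddressForm {
    Host(String),
    HostPort(String, u16),
    Url { https: bool, rest: String },
}

/// Classifies a raw value as a bare hostname, host:port, or an http/https URL.
/// Returns None for empty input, other schemes, or an unparsable port.
pub fn classify_address(raw: &str) -> Option<AddressForm> {
    let raw = raw.trim();
    if raw.is_empty() {
        return None;
    }
    if let Some(rest) = raw.strip_prefix("https://") {
        return Some(AddressForm::Url { https: true, rest: rest.to_string() });
    }
    if let Some(rest) = raw.strip_prefix("http://") {
        return Some(AddressForm::Url { https: false, rest: rest.to_string() });
    }
    if raw.contains("://") {
        // Only explicit http and https schemes are accepted.
        return None;
    }
    match raw.rsplit_once(':') {
        Some((host, port)) if !host.is_empty() => port
            .parse::<u16>()
            .ok()
            .map(|p| AddressForm::HostPort(host.to_string(), p)),
        Some(_) => None,
        None => Some(AddressForm::Host(raw.to_string())),
    }
}
```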

Lifecycle

Simple Run

The run() method starts the extension and blocks until Ctrl+C:

ext.run().await?;

run() performs the following steps:

  1. Establishes a NATS connection to Symphony
  2. Registers the extension in the Symphony registry
  3. Starts periodic status publishing (every 30 seconds)
  4. Blocks until SIGINT is received
  5. Publishes a disconnect status and exits

Run with Setup

For extensions that need microservice endpoints, use run_with_setup(). The callback is invoked after the connection is established but before entering the event loop:

ext.run_with_setup(|ext| Box::pin(async move {
    // Set up endpoints here (connection is available)
    let svc = ext.add_endpoints("MyService", "1.0.0",
        "My service", None).await?;
    // ... register endpoint handlers ...
    Ok(())
})).await?;

Capabilities

Capabilities declare which NATS subjects the extension needs to publish to or subscribe to. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.

ext.add_capability("sub", "extensions.my_ext", "My Extension").await;
ext.add_capability("sub", "extensions.my_ext.process", "Process data").await;
ext.add_capability("pub", "events.my_ext", "Emit events").await;

| Parameter | Description |
|---|---|
| cap_type | "pub" for publish or "sub" for subscribe |
| key | NATS subject pattern |
| description | Human-readable description |
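
As a mental model for how declared capabilities could translate into default permissions, the std-only sketch below groups capabilities into publish and subscribe subject lists. Capability, SubjectPermissions, and permissions_from are hypothetical names; how Symphony actually derives API key permissions is not shown in this guide, and rejecting unknown cap_type values is an assumption.

```rust
/// One declared capability, mirroring the add_capability parameters.
#[derive(Debug, Clone, PartialEq)]
pub struct Capability {
    pub cap_type: String,
    pub key: String,
    pub description: String,
}

/// Subjects grouped by direction (hypothetical shape).
#[derive(Debug, Default, PartialEq)]
pub struct SubjectPermissions {
    pub publish: Vec<String>,
    pub subscribe: Vec<String>,
}

/// Small constructor to keep call sites short.
pub fn cap(cap_type: &str, key: &str, description: &str) -> Capability {
    Capability {
        cap_type: cap_type.to_string(),
        key: key.to_string(),
        description: description.to_string(),
    }
}

/// Groups capabilities by cap_type; anything other than "pub" or "sub" is an error.
pub fn permissions_from(caps: &[Capability]) -> Result<SubjectPermissions, String> {
    let mut perms = SubjectPermissions::default();
    for c in caps {
        match c.cap_type.as_str() {
            "pub" => perms.publish.push(c.key.clone()),
            "sub" => perms.subscribe.push(c.key.clone()),
            other => return Err(format!("unknown cap_type: {other}")),
        }
    }
    Ok(perms)
}
```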

User Interface

Rust extensions register UI resources with add_resource, mapping a URI to a MIME type and content string:

ext.add_resource(
    "ui://my_ext/home",
    MimeType::TextSlashSymphonyJsx,
    jsx_content,
).await;

ext.add_resource(
    "ui://my_ext/common",
    MimeType::TextSlashSymphonyModule,
    shared_code,
).await;

ext.add_resource(
    "ui://my_ext/dashboard",
    MimeType::TextSlashHtmlPlusSymphony,
    html_content,
).await;

ext.add_resource(
    "ui://my_ext/help",
    MimeType::TextSlashMarkdown,
    help_content,
).await;

// HTMX resource: Symphony provides the shell with HTMX, auth, and theme
ext.add_htmx_resource("ui://my_ext/table", "pages/table").await;

Module resources (MimeType::TextSlashSymphonyModule) are imported by JSX resources using import { ... } from '@symphony/extension/my_ext/common'.

HTMX resources use a JSON config instead of HTML content—Symphony provides the HTMX shell with auth injection and theme sync. See HTML + HTMX for details.

For resources that need additional metadata (name, description, CSP):

ext.add_resource_full(
    "ui://my_ext/app",
    MimeType::TextSlashHtmlSemicolonProfileEqualMcpApp,
    app_html,
    Some("My App".to_string()),
    Some("Interactive application".to_string()),
    None, // optional ResourceBaseMeta
).await;

Routes map URL paths to resources:

ext.add_route("/my_ext", "ui://my_ext/home").await;
ext.add_route("/my_ext/dashboard", "ui://my_ext/dashboard").await;

For detailed documentation on each resource type and its capabilities, see User Interfaces.

Including Content from Files

Use Rust's include_str! macro to embed UI content at compile time:

const HOME_PAGE: &str = include_str!("pages/home.tsx");
const HELP_PAGE: &str = include_str!("pages/help.md");

ext.add_resource("ui://my_ext/home", MimeType::TextSlashSymphonyJsx, HOME_PAGE).await;

This keeps UI content in separate files while including them in the binary at build time.

Extensions add navigation menu items using the add_menu method:

ext.add_menu("main", "My Extension", "/my_ext", "fa-puzzle-piece").await;
ext.add_menu("tools", "Data Tools", "/my_ext/tools", "fa-wrench").await;

| Parameter | Description |
|---|---|
| group | Menu group name (e.g., "main", "tools") |
| item | Display label for the menu entry |
| route | Target route path |
| icon | FontAwesome icon class |

For menu items with external links:

ext.add_menu_with_link(
    "main", "Docs", "/docs", "fa-book",
    Some("https://docs.example.com".to_string()),
).await;

Widgets

Register dashboard widgets for display in the widget picker:

ext.add_widget("Status", "ui://my_ext/status").await;

// With notes/description
ext.add_widget_with_notes(
    "Status",
    "ui://my_ext/status",
    Some("Real-time status monitor".to_string()),
).await;

Microservice Endpoints

Extensions expose functionality as NATS microservices using add_endpoints inside run_with_setup.

Creating a Service

use cirata_symphony::Extension;
use futures::StreamExt;
use serde_json::json;

let ext = Extension::new("My Extension", "my_ext")
    .with_token(token);

ext.run_with_setup(|ext| Box::pin(async move {
    let svc = ext.add_endpoints(
        "MyService", "1.0.0", "My data service", None,
    ).await?;

    // The group name becomes the NATS subject prefix
    let group = svc.group("cirata.extensions.my_ext");

    let mut endpoint = group.endpoint("process").await
        .map_err(|e| cirata_symphony::Error::Other(e.to_string()))?;

    // Serve requests in a background task so setup can return
    tokio::spawn(async move {
        while let Some(request) = endpoint.next().await {
            // Fall back to an empty object if the payload is not valid JSON
            let input: serde_json::Value =
                serde_json::from_slice(&request.message.payload)
                    .unwrap_or(json!({}));

            let result = json!({"status": "success", "input": input});

            let _ = request.respond(
                Ok(serde_json::to_vec(&result).unwrap().into())
            ).await;
        }
    });

    Ok(())
})).await?;

The group name determines the NATS subject prefix. An endpoint named process in group cirata.extensions.my_ext listens on subject cirata.extensions.my_ext.process.
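
The subject rule described above is a plain dot-join of group and endpoint name. A minimal sketch (endpoint_subject is a hypothetical helper, not part of the SDK):

```rust
/// Joins a service group and an endpoint name into the NATS subject
/// the endpoint listens on, following the rule described above.
pub fn endpoint_subject(group: &str, endpoint: &str) -> String {
    format!("{group}.{endpoint}")
}
```

Callers invoking the endpoint directly over NATS would send their request to the resulting subject.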

Handler Pattern

Endpoints are streams of requests. Each request must be responded to:

tokio::spawn(async move {
    while let Some(request) = endpoint.next().await {
        // Parse input
        let input: serde_json::Value =
            serde_json::from_slice(&request.message.payload)
                .unwrap_or(json!({}));

        // Process and respond
        let result = json!({"status": "success", "data": input});
        let _ = request.respond(
            Ok(serde_json::to_vec(&result).unwrap().into())
        ).await;
    }
});

Key rules for handlers:

  1. Always respond. Every request must receive a response via request.respond(). A handler that does not respond causes the caller to time out.
  2. Return appropriate empty values. Use json!({}) for single-object endpoints and json!([]) for list endpoints.
  3. Handle errors gracefully. Respond with an error payload rather than silently dropping the request.

OpenAPI Metadata

Pass an OpenAPI spec as service metadata to enable REST API exposure and documentation:

use std::collections::HashMap;

let metadata: HashMap<String, String> = [(
    "openapi".to_string(),
    include_str!("openapi.yaml").to_string(),
)].into_iter().collect();

let svc = ext.add_endpoints(
    "MyService", "1.0.0", "My service", Some(metadata),
).await?;

REST Request Parsing

The decorators module provides helpers for parsing REST API requests, equivalent to Python's @rest decorator:

use cirata_symphony::decorators::{parse_rest_request, rest_response, rest_error};

tokio::spawn(async move {
    while let Some(request) = endpoint.next().await {
        let response = match parse_rest_request(&request.message.payload) {
            Ok(rest_req) => {
                // Read "name" from the request body, defaulting to "World"
                let name = rest_req.body
                    .flatten()
                    .and_then(|b| b.get("name").and_then(|v| v.as_str()).map(String::from))
                    .unwrap_or_else(|| "World".to_string());

                rest_response(200, json!({"message": format!("Hello, {}!", name)}))
            }
            Err(e) => rest_error(400, &format!("Invalid request: {}", e)),
        };

        let _ = request.respond(
            Ok(serde_json::to_vec(&response).unwrap().into())
        ).await;
    }
});

| Function | Description |
|---|---|
| parse_rest_request(payload) | Parse bytes into RestApiRequest with method, url, body, path/query params |
| rest_response(status, body) | Create a wrapped REST response with status code |
| rest_error(status, message) | Create an error REST response |

REST API Response Format

When endpoints are exposed as REST APIs (via an OpenAPI spec in the service metadata), handlers can respond in two ways:

Raw response (simplest) -- respond with the content directly. Symphony auto-detects the content type: valid JSON is served as application/json, anything else as text/plain.

let result = json!({"items": [{"id": "1", "name": "Widget"}]});
let _ = request.respond(Ok(serde_json::to_vec(&result).unwrap().into())).await;

Wrapped response (full control) -- for control over the HTTP status code, content type, or response headers:

let response = json!({
    "body": {"id": item_id, "status": "created"},
    "statuscode": 201,
    "contenttype": "application/json",
});
let _ = request.respond(Ok(serde_json::to_vec(&response).unwrap().into())).await;

| Field | Required | Description |
|---|---|---|
| body | Yes | The response payload |
| statuscode | No | HTTP status code (100-599). Defaults to 200 |
| contenttype | No | MIME type. Defaults to application/json |
| headers | No | Additional HTTP headers as a string-to-string map |
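
The defaults in the table above can be modeled with a small std-only struct. This is only a sketch of the documented defaults: WrappedResponse and its methods are hypothetical names, the body is kept as an opaque string for brevity, and rejecting an out-of-range status code is an assumption about how such a value should be treated.

```rust
use std::collections::HashMap;

/// Mirrors the wrapped response fields (hypothetical type).
#[derive(Debug, Default)]
pub struct WrappedResponse {
    pub body: String,
    pub statuscode: Option<u16>,
    pub contenttype: Option<String>,
    pub headers: HashMap<String, String>,
}

impl WrappedResponse {
    /// Status code to use: 200 when omitted, otherwise it must lie in 100-599.
    pub fn effective_status(&self) -> Result<u16, String> {
        match self.statuscode {
            None => Ok(200),
            Some(code) if (100..=599).contains(&code) => Ok(code),
            Some(code) => Err(format!("status code {code} is outside 100-599")),
        }
    }

    /// Content type to use: application/json when omitted.
    pub fn effective_content_type(&self) -> &str {
        self.contenttype.as_deref().unwrap_or("application/json")
    }
}
```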

Storage

Rust extensions use the NATS JetStream key-value API for persistent storage.

Declaring Buckets

Declare buckets during extension setup:

ext.add_bucket("my_ext_data", "Extension data", 0).await;
ext.add_bucket("my_ext_events", "Event log with 24h TTL", 86400).await;

| Parameter | Description |
|---|---|
| name | Bucket name |
| description | Human-readable description |
| ttl | Time-to-live in seconds. 0 means entries do not expire |

The platform creates or updates buckets when the extension registers. Declared buckets inherit the cluster's replica factor (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone) — no action required.
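
The replica rule stated above can be written down as a lookup. This sketch only restates the documented sizes; replicas_for_cluster is a hypothetical helper, and cluster sizes the text does not mention (such as 2 nodes) return None rather than a guess.

```rust
/// Replication factor for a given cluster size, per the rule above:
/// standalone -> 1, 3 or 4 nodes -> 3, 5 or more nodes -> 5.
pub fn replicas_for_cluster(nodes: u32) -> Option<u32> {
    match nodes {
        1 => Some(1),
        3 | 4 => Some(3),
        n if n >= 5 => Some(5),
        // 0 or 2 nodes: not covered by the documented rule.
        _ => None,
    }
}
```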

Rust extensions that create buckets at runtime via async-nats directly should request cirata.services.cluster.info (JSON response {"replicas": N}) and set that replica count (the num_replicas field) on the key-value Config passed to create_key_value. Without this, async-nats defaults to R=1 and the bucket is not replicated.

Accessing Buckets

Use the JetStream context from the extension to access key-value stores:

// Inside run_with_setup or after initialization
let js = ext.jetstream().await?;
let kv = js.get_key_value("my_ext_data").await
    .map_err(|e| cirata_symphony::Error::Other(e.to_string()))?;

Basic Operations

use bytes::Bytes;

// Store a value
kv.put("item:123", Bytes::from(r#"{"name": "Widget"}"#)).await?;

// Retrieve a value
if let Some(entry) = kv.get("item:123").await? {
    let value = String::from_utf8_lossy(&entry);
    println!("Value: {}", value);
}

// Delete a key
kv.delete("item:123").await?;

Batch Write

When writing many keys at once, use the kv_batch module to publish all entries concurrently instead of calling put() in a loop. Each awaited put() completes a full round-trip to the server before the next one starts; the batch API fires all publishes via futures::future::join_all and collects acks in parallel.

use cirata_symphony::kv_batch::{put_all, BatchEntry};
use bytes::Bytes;

let entries: Vec<BatchEntry> = items.iter().map(|item| BatchEntry {
    key: item.id.clone(),
    value: Bytes::from(serde_json::to_vec(item).unwrap()),
}).collect();

let js = ext.jetstream().await?;
let result = put_all(&js, "my_ext_data", entries).await;

println!("Batch write: {} succeeded, {} failed out of {}",
    result.succeeded, result.failed, result.total);

for (key, error) in &result.errors {
    eprintln!("Failed key '{}': {}", key, error);
}

The batch API never fails fast—a single bad key does not prevent the remaining entries from being written. BatchPutResult.errors maps failed key names to error strings. Values written this way are fully readable via standard kv.get().
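
The "never fails fast" behavior amounts to collecting every per-key outcome before summarizing. The std-only sketch below shows that aggregation step in isolation; BatchSummary and summarize are hypothetical names that mirror the fields described above, not the library's BatchPutResult.

```rust
use std::collections::HashMap;

/// Aggregate outcome of a batch write (hypothetical type).
#[derive(Debug, Default)]
pub struct BatchSummary {
    pub total: usize,
    pub succeeded: usize,
    pub failed: usize,
    /// Failed key name -> error string.
    pub errors: HashMap<String, String>,
}

/// Folds per-key outcomes into a summary. A failure is recorded and
/// processing continues, so one bad key never hides the other results.
pub fn summarize(outcomes: Vec<(String, Result<(), String>)>) -> BatchSummary {
    let mut summary = BatchSummary {
        total: outcomes.len(),
        ..Default::default()
    };
    for (key, outcome) in outcomes {
        match outcome {
            Ok(()) => summary.succeeded += 1,
            Err(e) => {
                summary.failed += 1;
                summary.errors.insert(key, e);
            }
        }
    }
    summary
}
```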

Usage Reporting

Extensions that consume licensed resources should report their usage to Symphony for licensing and chargeback.

Reporting Usage

use std::collections::HashMap;

let dimensions: HashMap<String, f64> = [
    ("tables_replicated".into(), 5.0),
    ("bytes_transferred_gb".into(), 120.5),
].into_iter().collect();

// The second argument is an optional trace context for correlating the
// usage report with a distributed trace; pass None when not tracing.
let result = ext.report_usage(dimensions, None).await?;
println!("Usage reported: {:?}", result);

The dimension keys must match the dimension names defined in the license's unit rates for your extension prefix.

Enforcement State

Read the current enforcement state:

if let Some(state) = ext.get_enforcement_state().await? {
    println!("Enforcement: {:?}", state);
}

| Status | Meaning |
|---|---|
| enforcement_ok | Normal operation -- units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within grace period |
| enforcement_enforced | Grace period expired -- stop licensed operations |
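
One way an extension might act on these states is sketched below. The Enforcement enum and its methods are hypothetical, and parsing from the status strings is an assumption, since the type returned by get_enforcement_state is not shown here. Allowing work during the grace period is a reading of the table, which only says to stop once the state is enforcement_enforced.

```rust
/// The four documented enforcement states (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Enforcement {
    Ok,
    Warning,
    Grace,
    Enforced,
}

impl Enforcement {
    /// Maps a status string from the table above to a state.
    pub fn parse(status: &str) -> Option<Self> {
        match status {
            "enforcement_ok" => Some(Self::Ok),
            "enforcement_warning" => Some(Self::Warning),
            "enforcement_grace" => Some(Self::Grace),
            "enforcement_enforced" => Some(Self::Enforced),
            _ => None,
        }
    }

    /// Licensed operations must stop only once the grace period has expired.
    pub fn allows_licensed_work(self) -> bool {
        self != Self::Enforced
    }

    /// Anything other than Ok is worth surfacing to an operator.
    pub fn needs_attention(self) -> bool {
        self != Self::Ok
    }
}
```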

When to Report

Report usage at meaningful checkpoints -- for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation; instead, batch measurements and report periodically.
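
A simple way to batch measurements between checkpoints is an in-memory accumulator that is drained when it is time to report. The sketch below is std-only and illustrative: UsageAccumulator is a hypothetical helper, and the map it returns has the same shape as the dimensions argument shown for report_usage.

```rust
use std::collections::HashMap;

/// Collects usage per dimension until the next reporting checkpoint.
#[derive(Debug, Default)]
pub struct UsageAccumulator {
    pending: HashMap<String, f64>,
}

impl UsageAccumulator {
    /// Adds an amount to a dimension's running total.
    pub fn record(&mut self, dimension: &str, amount: f64) {
        *self.pending.entry(dimension.to_string()).or_insert(0.0) += amount;
    }

    /// Returns the accumulated totals and resets the accumulator,
    /// or None when there is nothing to report.
    pub fn take(&mut self) -> Option<HashMap<String, f64>> {
        if self.pending.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.pending))
        }
    }
}
```

At a checkpoint, the map returned by take() would be passed to ext.report_usage; skipping the call when take() returns None avoids sending empty reports.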

See Usage Tracking for how administrators view and manage reported usage.

Connection Access

For advanced use cases, access the raw NATS connection and JetStream context:

// NATS connection (for raw pub/sub)
if let Some(conn) = ext.conn() {
    let response = conn.request("some.subject", payload.into()).await?;
}

// JetStream context (for KV and object stores)
let js = ext.jetstream().await?;

// Extension identifier
if let Some(id) = ext.identifier() {
    println!("Extension ID: {}", id);
}

Dynamic Resource Updates

Update resource content at runtime and re-publish to Symphony:

ext.update_resource(
    "ui://my_ext/dashboard",
    MimeType::TextSlashSymphonyJsx,
    updated_content,
).await?;

This updates the resource in the extension info and re-registers the extension so the UI picks up the change immediately.

Using the Client Library

The Rust client can be used independently to interact with Symphony and invoke services provided by other extensions:

use cirata_symphony::Client;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new("your-token").await?;

    // Get system info
    let info = client.info().await?;
    println!("Connected to: {:?}", info);

    // List extensions
    let extensions = client.extensions().await?;
    println!("Extensions: {:?}", extensions);

    // Call an extension service
    let result = client.call_extension(
        "my_ext", "process", json!({"input": "data"}),
    ).await?;
    println!("Result: {:?}", result);

    // Access storage
    let value = client.storage(Some("my_bucket"), Some("my_key")).await?;

    client.close().await?;
    Ok(())
}

Logging

Use the tracing crate with the default tracing-subscriber for structured log output to stdout. Call tracing_subscriber::fmt::init() once at the start of main:

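use tracing::{info, warn, error};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    info!(extension = "my_ext", "Extension started");
    warn!("Cache miss rate is high");

    // `e` stands in for an error returned by your own code
    let e = std::io::Error::new(std::io::ErrorKind::Other, "query failed");
    error!(err = %e, "Query failed");
    // ...
    Ok(())
}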

Each log line uses consistent field names (timestamp, level, message) matching the other Symphony SDKs. Do not write log files — stdout is captured automatically by journald (systemd) or the container runtime (Docker/Kubernetes).

The OTLP log capture (via obs.log_capture()) is a separate concern for the observability pipeline — see the next section.

Observability

Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.

use cirata_symphony::observability::{Observability, SpanKind};
use std::collections::HashMap;

// Inside run_with_setup:
let obs = Observability::enable(&svc, "myext").await;

// Instrument handlers with trace propagation
let span = obs.start_child_span_from_request(
    "process", SpanKind::Server, &request
);
obs.increment_counter("requests.total", 1);
// ... process ...
obs.end_span(span, HashMap::new());

For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.

Multi-Instance Support

Rust extensions can run as multiple instances of the same extension type. The SDK includes built-in support for queue-based command delivery and instance-specific NATS subjects.

Automatic Load Balancing

Microservice endpoints registered with add_endpoints are automatically load-balanced across all instances by NATS micro. No code changes are needed—deploying additional instances immediately distributes incoming service requests across them.

The SDK uses a queue subscription for the commands channel, so each command is processed by exactly one instance.

Instance-Specific Subjects

Each instance subscribes to cirata.extensions.{prefix}.instance.{instance_id}.> for messages targeted at that specific instance. The instance ID is derived from the JWT identifier in the API token and can be accessed with:

if let Some(id) = ext.identifier() {
    println!("Instance ID: {}", id);
}
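
For reference, the subject pattern above can be built and checked with two small std-only helpers. Both function names are hypothetical. The matching helper follows the NATS rule that a trailing > wildcard matches one or more further tokens, and it is only a simplified check.

```rust
/// Subscription pattern for messages addressed to one instance.
pub fn instance_subject(prefix: &str, instance_id: &str) -> String {
    format!("cirata.extensions.{prefix}.instance.{instance_id}.>")
}

/// Returns true when `subject` falls under the instance pattern.
/// The `>` wildcard needs at least one token after the instance id.
pub fn is_for_instance(subject: &str, prefix: &str, instance_id: &str) -> bool {
    let base = format!("cirata.extensions.{prefix}.instance.{instance_id}.");
    match subject.strip_prefix(base.as_str()) {
        Some(rest) => !rest.is_empty(),
        None => false,
    }
}
```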

WorkPartitioner

The Rust SDK provides a WorkPartitioner for distributing background work across instances using consistent hashing. Each instance calls is_my_work to check whether it owns a given work item.

Create a WorkPartitioner via the Extension convenience method:

use std::sync::Arc;

let partitioner = ext.enable_work_partitioning(Some(Arc::new(|| {
    println!("Instances changed, re-evaluating work assignments");
})));

Or create one directly:

use cirata_symphony::WorkPartitioner;
use std::sync::Arc;

let partitioner = WorkPartitioner::new(instance_id, prefix, Some(Arc::new(|| {
    println!("Rebalanced");
})));

Start watching for instance changes (requires a JetStream context, available from within run_with_setup):

let js = ext.jetstream().await?;
let watcher_handle = partitioner.start(&js).await?;

Check work ownership and query active instances:

// In a watcher loop:
for key in &all_work_items {
    if partitioner.is_my_work(key).await {
        process_item(key).await;
    }
}

// Get active instances:
let instances = partitioner.active_instances().await;
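
To give a feel for how ownership checks of this kind can work, here is a std-only sketch using rendezvous (highest-random-weight) hashing, one member of the consistent-hashing family. It is not the WorkPartitioner implementation, whose algorithm is not described in this guide. The function names are hypothetical, and FNV-1a is used only because it is simple and gives the same result on every instance.

```rust
/// 64-bit FNV-1a: a small hash that is stable across processes and machines.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for b in bytes {
        hash ^= u64::from(*b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Rendezvous hashing: score every (instance, key) pair and pick the
/// instance with the highest score. Returns None when no instance is active.
pub fn owner_of<'a>(key: &str, instances: &'a [String]) -> Option<&'a str> {
    let mut best: Option<(u64, &'a str)> = None;
    for id in instances {
        let score = fnv1a(format!("{id}/{key}").as_bytes());
        let candidate = (score, id.as_str());
        // Equal scores fall back to comparing ids, keeping the choice deterministic.
        if best.map_or(true, |b| candidate > b) {
            best = Some(candidate);
        }
    }
    best.map(|(_, id)| id)
}

/// True when `my_id` is the owner of `key` among the active instances.
pub fn is_owned_by(key: &str, my_id: &str, instances: &[String]) -> bool {
    owner_of(key, instances) == Some(my_id)
}
```

Because every instance computes the same scores from the same instance list, exactly one instance claims each key, and removing an instance reassigns only the keys that instance owned.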

For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.

Error Handling

The library defines a comprehensive error type with variants for each failure mode:

use cirata_symphony::{Error, Result};

match ext.run().await {
    Ok(()) => println!("Extension stopped normally"),
    Err(Error::InvalidToken(msg)) => eprintln!("Bad token: {}", msg),
    Err(Error::ConnectionFailed(msg)) => eprintln!("Connection failed: {}", msg),
    Err(Error::AuthenticationFailed(msg)) => eprintln!("Auth failed: {}", msg),
    Err(e) => eprintln!("Error: {}", e),
}

| Error Variant | Description |
|---|---|
| InvalidToken | Token is malformed or cannot be decoded |
| AuthenticationFailed | API key exchange failed |
| ConnectionFailed | NATS connection could not be established |
| Timeout | Request timed out |
| BucketNotFound | JetStream bucket does not exist |
| KeyNotFound | Key not found in bucket |
| InvalidConfiguration | Missing required configuration |

Reference Examples

  • extensions/rust/helloworld/ -- Tutorial progression (hello0-hello4)
  • extensions/rust/weather/ -- External API integration with React UI
  • extensions/rust/clock/ -- MCP App with time endpoint
  • extensions/rust/markdown_content/ -- Widget with JetStream KV storage
  • extensions/rust/demo_widgets/ -- Multi-widget dashboard with LDM polling
  • libs/rust/cirata_symphony/ -- Library source