Rust Extension Development
Cirata Symphony provides a Rust library for developing extensions. Rust extensions are well suited to high-performance, low-latency workloads, real-time data processing, and environments where memory safety and minimal runtime overhead are priorities.
Getting Started
Installation
The Symphony Rust library is distributed as a source archive.
Download it from the Languages and Libraries
page, then extract and reference it as a path dependency in your
Cargo.toml:
# Extract the library
tar xf cirata-symphony-<version>-rust-src.tar
[dependencies]
cirata-symphony = { path = "cirata_symphony" }
tokio = { version = "1", features = ["full"] }
serde_json = "1.0"
futures = "0.3"
tracing-subscriber = "0.3"
The library depends on async-nats for NATS connectivity, reqwest
for HTTP, and tokio as the async runtime. All transitive
dependencies are listed in the included Cargo.toml and pinned in
Cargo.lock.
The library requires Rust 1.96 or later.
When building within the Symphony source tree using Bazel,
depend on //libs/rust/cirata_symphony and reference crates from
the @crates repository.
Project Structure
A Rust extension is typically a single binary crate with any supporting files:
my_extension/
+-- src/
| +-- main.rs # Extension entry point
| +-- handlers.rs # Optional: handler functions
| +-- pages/ # Optional: UI template files
| +-- home.tsx
| +-- help.md
+-- Cargo.toml # Crate manifest
+-- BUILD.bazel # Bazel build configuration (optional)
Minimal Extension
The simplest possible extension connects to Symphony and publishes its availability:
use cirata_symphony::Extension;
use std::env;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let token = env::var("SYMPHONY_TOKEN")
.expect("SYMPHONY_TOKEN is required");
let ext = Extension::new("Hello World", "hello")
.with_description("A simple Rust extension")
.with_token(token);
ext.run().await?;
Ok(())
}
This extension registers with Symphony and remains connected until stopped with Ctrl+C (SIGINT). The status is published every 30 seconds to keep the extension's entry alive.
Extension Initialization
Builder Pattern
The Extension type uses a builder pattern for configuration:
let ext = Extension::new("My Extension", "my_ext")
.with_description("Provides data processing")
.with_token(token);
| Method | Required | Description |
|---|---|---|
new(name, prefix) | Yes | Human-readable name and unique prefix |
with_description(desc) | No | Description of the extension's purpose |
with_token(token) | Yes | API token (typically from SYMPHONY_TOKEN env var) |
Authentication
Extensions authenticate with a token obtained from an API Key. The token is resolved in the following order of precedence:
- The
with_token()builder method - The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux) — from a previous provisioning run
- The
REGISTRATION_TOKENenvironment variable (one-time provisioning)
let token = env::var("SYMPHONY_TOKEN").unwrap_or_default();
When REGISTRATION_TOKEN is set, the extension connects, automatically
provisions a permanent API key with the extension's declared
capabilities, and stores it in the OS credential store (or
~/.config/cirata/ as a fallback). Subsequent runs use the stored
token automatically with no interaction required.
If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
Container and Kubernetes deployments
In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:
- Recommended: inject a pre-provisioned token via the
SYMPHONY_TOKENenvironment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed. - Alternative: mount
~/.config/cirata/on a persistent volume so an auto-provisioned token survives pod restarts.
The library decodes the hostname from the token, exchanges it for NATS credentials via HTTPS, and establishes an authenticated connection using JWT + NKey signature authentication.
Overriding the token-embedded address
When the hostname encoded in the token is not the right one to reach
Symphony from where the extension runs, set the
SYMPHONY_ADDRESS environment variable. It accepts a bare hostname,
host:port, or a full URL with an explicit http or https scheme.
See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Rust, Python, Go, and Java SDKs.
Lifecycle
Simple Run
The run() method starts the extension and blocks until Ctrl+C:
ext.run().await?;
run() performs the following steps:
- Establishes a NATS connection to Symphony
- Registers the extension in the Symphony registry
- Starts periodic status publishing (every 30 seconds)
- Blocks until SIGINT is received
- Publishes a disconnect status and exits
Run with Setup
For extensions that need microservice endpoints, use
run_with_setup(). The callback is invoked after the connection is
established but before entering the event loop:
ext.run_with_setup(|ext| Box::pin(async move {
// Set up endpoints here (connection is available)
let svc = ext.add_endpoints("MyService", "1.0.0",
"My service", None).await?;
// ... register endpoint handlers ...
Ok(())
})).await?;
Capabilities
Capabilities declare what NATS subjects the extension needs to publish to or subscribe from. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.
ext.add_capability("sub", "extensions.my_ext", "My Extension").await;
ext.add_capability("sub", "extensions.my_ext.process", "Process data").await;
ext.add_capability("pub", "events.my_ext", "Emit events").await;
| Parameter | Description |
|---|---|
cap_type | "pub" for publish or "sub" for subscribe |
key | NATS subject pattern |
description | Human-readable description |
User Interface
Rust extensions register UI resources with add_resource, mapping
a URI to a MIME type and content string:
ext.add_resource(
"ui://my_ext/home",
MimeType::TextSlashSymphonyJsx,
jsx_content,
).await;
ext.add_resource(
"ui://my_ext/common",
MimeType::TextSlashSymphonyModule,
shared_code,
).await;
ext.add_resource(
"ui://my_ext/dashboard",
MimeType::TextSlashHtmlPlusSymphony,
html_content,
).await;
ext.add_resource(
"ui://my_ext/help",
MimeType::TextSlashMarkdown,
help_content,
).await;
// HTMX resource — Symphony provides shell with HTMX, auth, and theme
ext.add_htmx_resource("ui://my_ext/table", "pages/table").await;
Module resources (MimeType::TextSlashSymphonyModule) are imported by
JSX resources using
import { ... } from '@symphony/extension/my_ext/common'.
HTMX resources use a JSON config instead of HTML content—Symphony provides the HTMX shell with auth injection and theme sync. See HTML + HTMX for details.
For resources that need additional metadata (name, description, CSP):
ext.add_resource_full(
"ui://my_ext/app",
MimeType::TextSlashHtmlSemicolonProfileEqualMcpApp,
app_html,
Some("My App".to_string()),
Some("Interactive application".to_string()),
None, // optional ResourceBaseMeta
).await;
Routes map URL paths to resources:
ext.add_route("/my_ext", "ui://my_ext/home").await;
ext.add_route("/my_ext/dashboard", "ui://my_ext/dashboard").await;
For detailed documentation on each resource type and its capabilities, see User Interfaces.
Including Content from Files
Use Rust's include_str! macro to embed UI content at compile time:
const HOME_PAGE: &str = include_str!("pages/home.tsx");
const HELP_PAGE: &str = include_str!("pages/help.md");
ext.add_resource("ui://my_ext/home", MimeType::TextSlashSymphonyJsx, HOME_PAGE).await;
This keeps UI content in separate files while including them in the binary at build time.
Menus
Extensions add navigation menu items using the add_menu method:
ext.add_menu("main", "My Extension", "/my_ext", "fa-puzzle-piece").await;
ext.add_menu("tools", "Data Tools", "/my_ext/tools", "fa-wrench").await;
| Parameter | Description |
|---|---|
group | Menu group name (e.g., "main", "tools") |
item | Display label for the menu entry |
route | Target route path |
icon | FontAwesome icon class |
For menu items with external links:
ext.add_menu_with_link(
"main", "Docs", "/docs", "fa-book",
Some("https://docs.example.com".to_string()),
).await;
Widgets
Register dashboard widgets for display in the widget picker:
ext.add_widget("Status", "ui://my_ext/status").await;
// With notes/description
ext.add_widget_with_notes(
"Status",
"ui://my_ext/status",
Some("Real-time status monitor".to_string()),
).await;
Microservice Endpoints
Extensions expose functionality as NATS microservices using
add_endpoints inside run_with_setup.
Creating a Service
use cirata_symphony::Extension;
use futures::StreamExt;
use serde_json::json;
let ext = Extension::new("My Extension", "my_ext")
.with_token(token);
ext.run_with_setup(|ext| Box::pin(async move {
let svc = ext.add_endpoints(
"MyService", "1.0.0", "My data service", None,
).await?;
let group = svc.group("cirata.extensions.my_ext");
let mut endpoint = group.endpoint("process").await
.map_err(|e| cirata_symphony::Error::Other(e.to_string()))?;
tokio::spawn(async move {
while let Some(request) = endpoint.next().await {
let input: serde_json::Value =
serde_json::from_slice(&request.message.payload)
.unwrap_or(json!({}));
let result = json!({"status": "success", "input": input});
let _ = request.respond(
Ok(serde_json::to_vec(&result).unwrap().into())
).await;
}
});
Ok(())
})).await?;
The group name determines the NATS subject prefix. An endpoint named
process in group cirata.extensions.my_ext listens on subject
cirata.extensions.my_ext.process.
Handler Pattern
Endpoints are streams of requests. Each request must be responded to:
tokio::spawn(async move {
while let Some(request) = endpoint.next().await {
// Parse input
let input: serde_json::Value =
serde_json::from_slice(&request.message.payload)
.unwrap_or(json!({}));
// Process and respond
let result = json!({"status": "success", "data": input});
let _ = request.respond(
Ok(serde_json::to_vec(&result).unwrap().into())
).await;
}
});
Key rules for handlers:
- Always respond. Every request must receive a response via
request.respond(). A handler that does not respond causes the caller to time out. - Return appropriate empty values. Use
json!({})for single-object endpoints andjson!([])for list endpoints. - Handle errors gracefully. Respond with an error payload rather than silently dropping the request.
OpenAPI Metadata
Pass an OpenAPI spec as service metadata to enable REST API exposure and documentation:
use std::collections::HashMap;
let metadata: HashMap<String, String> = [(
"openapi".to_string(),
include_str!("openapi.yaml").to_string(),
)].into_iter().collect();
let svc = ext.add_endpoints(
"MyService", "1.0.0", "My service", Some(metadata),
).await?;
REST Request Parsing
The decorators module provides helpers for parsing REST API
requests, equivalent to Python's @rest decorator:
use cirata_symphony::decorators::{parse_rest_request, rest_response, rest_error};
tokio::spawn(async move {
while let Some(request) = endpoint.next().await {
let response = match parse_rest_request(&request.message.payload) {
Ok(rest_req) => {
let name = rest_req.body
.flatten()
.and_then(|b| b.get("name").and_then(|v| v.as_str()).map(String::from))
.unwrap_or_else(|| "World".to_string());
rest_response(200, json!({"message": format!("Hello, {}!", name)}))
}
Err(e) => rest_error(400, &format!("Invalid request: {}", e)),
};
let _ = request.respond(
Ok(serde_json::to_vec(&response).unwrap().into())
).await;
}
});
| Function | Description |
|---|---|
parse_rest_request(payload) | Parse bytes into RestApiRequest with method, url, body, path/query params |
rest_response(status, body) | Create a wrapped REST response with status code |
rest_error(status, message) | Create an error REST response |
REST API Response Format
When endpoints are exposed as REST APIs (via an OpenAPI spec in the service metadata), handlers can respond in two ways:
Raw response (simplest) -- respond with the content directly.
Symphony auto-detects the content type: valid JSON is served as
application/json, anything else as text/plain.
let result = json!({"items": [{"id": "1", "name": "Widget"}]});
let _ = request.respond(Ok(serde_json::to_vec(&result).unwrap().into())).await;
Wrapped response (full control) -- for control over the HTTP status code, content type, or response headers:
let response = json!({
"body": {"id": item_id, "status": "created"},
"statuscode": 201,
"contenttype": "application/json",
});
let _ = request.respond(Ok(serde_json::to_vec(&response).unwrap().into())).await;
| Field | Required | Description |
|---|---|---|
body | Yes | The response payload |
statuscode | No | HTTP status code (100-599). Defaults to 200 |
contenttype | No | MIME type. Defaults to application/json |
headers | No | Additional HTTP headers as a string-to-string map |
Storage
Rust extensions use the NATS JetStream key-value API for persistent storage.
Declaring Buckets
Declare buckets during extension setup:
ext.add_bucket("my_ext_data", "Extension data", 0).await;
ext.add_bucket("my_ext_events", "Event log with 24h TTL", 86400).await;
| Parameter | Description |
|---|---|
name | Bucket name |
description | Human-readable description |
ttl | Time-to-live in seconds. 0 means entries do not expire |
The platform creates or updates buckets when the extension registers. Declared buckets inherit the cluster's replica factor (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone) — no action required.
Rust extensions that create buckets at runtime via async-nats
directly should request cirata.services.cluster.info (JSON response
{"replicas": N}) and set the replica count on their
CreateKeyValue config. Without this, async-nats defaults to R=1
and the bucket is not replicated.
Accessing Buckets
Use the JetStream context from the extension to access key-value stores:
// Inside run_with_setup or after initialization
let js = ext.jetstream().await?;
let kv = js.get_key_value("my_ext_data").await
.map_err(|e| cirata_symphony::Error::Other(e.to_string()))?;
Basic Operations
use bytes::Bytes;
// Store a value
kv.put("item:123", Bytes::from(r#"{"name": "Widget"}"#)).await?;
// Retrieve a value
if let Some(entry) = kv.get("item:123").await? {
let value = String::from_utf8_lossy(&entry);
println!("Value: {}", value);
}
// Delete a key
kv.delete("item:123").await?;
Batch Write
When writing many keys at once, use the kv_batch module to publish
all entries concurrently instead of calling put() in a loop. Each
individual put() is a synchronous round-trip; the batch API fires all
publishes via futures::future::join_all and collects acks in parallel.
use cirata_symphony::kv_batch::{put_all, BatchEntry};
use bytes::Bytes;
let entries: Vec<BatchEntry> = items.iter().map(|item| BatchEntry {
key: item.id.clone(),
value: Bytes::from(serde_json::to_vec(item).unwrap()),
}).collect();
let js = ext.jetstream().await?;
let result = put_all(&js, "my_ext_data", entries).await;
println!("Batch write: {} succeeded, {} failed out of {}",
result.succeeded, result.failed, result.total);
for (key, error) in &result.errors {
eprintln!("Failed key '{}': {}", key, error);
}
The batch API never fails fast—a single bad key does not prevent the
remaining entries from being written. BatchPutResult.errors maps
failed key names to error strings. Values written this way are fully
readable via standard kv.get().
Usage Reporting
Extensions that consume licensed resources should report their usage to Symphony for licensing and chargeback:
Reporting Usage
use std::collections::HashMap;
let dimensions: HashMap<String, f64> = [
("tables_replicated".into(), 5.0),
("bytes_transferred_gb".into(), 120.5),
].into_iter().collect();
// The second argument is an optional trace context for correlating the
// usage report with a distributed trace; pass None when not tracing.
let result = ext.report_usage(dimensions, None).await?;
println!("Usage reported: {:?}", result);
The dimension keys must match the dimension names defined in the license's unit rates for your extension prefix.
Enforcement State
Read the current enforcement state:
if let Some(state) = ext.get_enforcement_state().await? {
println!("Enforcement: {:?}", state);
}
| Status | Meaning |
|---|---|
enforcement_ok | Normal operation -- units available |
enforcement_warning | 90% or more of licensed units consumed |
enforcement_grace | Units exhausted or license expired, within grace period |
enforcement_enforced | Grace period expired -- stop licensed operations |
When to Report
Report usage at meaningful checkpoints -- for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation; instead, batch measurements and report periodically.
See Usage Tracking for how administrators view and manage reported usage.
Connection Access
For advanced use cases, access the raw NATS connection and JetStream context:
// NATS connection (for raw pub/sub)
if let Some(conn) = ext.conn() {
let response = conn.request("some.subject", payload.into()).await?;
}
// JetStream context (for KV and object stores)
let js = ext.jetstream().await?;
// Extension identifier
if let Some(id) = ext.identifier() {
println!("Extension ID: {}", id);
}
Dynamic Resource Updates
Update resource content at runtime and re-publish to Symphony:
ext.update_resource(
"ui://my_ext/dashboard",
MimeType::TextSlashSymphonyJsx,
updated_content,
).await?;
This updates the resource in the extension info and re-registers the extension so the UI picks up the change immediately.
Using the Client Library
The Rust client can be used independently to interact with Symphony and invoke services provided by other extensions:
use cirata_symphony::Client;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new("your-token").await?;
// Get system info
let info = client.info().await?;
println!("Connected to: {:?}", info);
// List extensions
let extensions = client.extensions().await?;
println!("Extensions: {:?}", extensions);
// Call an extension service
let result = client.call_extension(
"my_ext", "process", json!({"input": "data"}),
).await?;
println!("Result: {:?}", result);
// Access storage
let value = client.storage(Some("my_bucket"), Some("my_key")).await?;
client.close().await?;
Ok(())
}
Logging
Use the tracing crate with the default tracing-subscriber for
structured log output to stdout. Call tracing_subscriber::fmt::init()
once at the start of main:
use tracing::{info, warn, error};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
info!(extension = "my_ext", "Extension started");
warn!("Cache miss rate is high");
error!(err = %e, "Query failed");
// ...
}
Each log line uses consistent field names (timestamp, level,
message) matching the other Symphony SDKs. Do not write log files
— stdout is captured automatically by journald (systemd) or the
container runtime (Docker/Kubernetes).
The OTLP log capture (via obs.log_capture()) is a separate concern
for the observability pipeline — see the next section.
Observability
Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.
use cirata_symphony::observability::{Observability, SpanKind};
// Inside run_with_setup:
let obs = Observability::enable(&svc, "myext").await;
// Instrument handlers with trace propagation
let span = obs.start_child_span_from_request(
"process", SpanKind::Server, &request
);
obs.increment_counter("requests.total", 1);
// ... process ...
obs.end_span(span, HashMap::new());
For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.
Multi-Instance Support
Rust extensions can run as multiple instances of the same extension type. The SDK includes built-in support for queue-based command delivery and instance-specific NATS subjects.
Automatic Load Balancing
Microservice endpoints registered with add_endpoints are
automatically load-balanced across all instances by NATS micro. No
code changes are needed—deploying additional instances immediately
distributes incoming service requests across them.
The SDK uses queue subscribe for the commands channel, ensuring that each command is processed by exactly one instance.
Instance-Specific Subjects
Each instance subscribes to
cirata.extensions.{prefix}.instance.{instance_id}.> for messages
targeted at that specific instance. The instance ID is derived from
the JWT identifier in the API token and can be accessed with:
if let Some(id) = ext.identifier() {
println!("Instance ID: {}", id);
}
WorkPartitioner
The Rust SDK provides a WorkPartitioner for distributing background
work across instances using consistent hashing. Each instance calls
is_my_work to check whether it owns a given work item.
Create a WorkPartitioner via the Extension convenience method:
use std::sync::Arc;
let partitioner = ext.enable_work_partitioning(Some(Arc::new(|| {
println!("Instances changed, re-evaluating work assignments");
})));
Or create one directly:
use cirata_symphony::WorkPartitioner;
use std::sync::Arc;
let partitioner = WorkPartitioner::new(instance_id, prefix, Some(Arc::new(|| {
println!("Rebalanced");
})));
Start watching for instance changes (requires a JetStream context,
available from within run_with_setup):
let js = ext.jetstream().await?;
let watcher_handle = partitioner.start(&js).await?;
Check work ownership and query active instances:
// In a watcher loop:
for key in &all_work_items {
if partitioner.is_my_work(key).await {
process_item(key).await;
}
}
// Get active instances:
let instances = partitioner.active_instances().await;
For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.
Error Handling
The library defines a comprehensive error type with variants for each failure mode:
use cirata_symphony::{Error, Result};
match ext.run().await {
Ok(()) => println!("Extension stopped normally"),
Err(Error::InvalidToken(msg)) => eprintln!("Bad token: {}", msg),
Err(Error::ConnectionFailed(msg)) => eprintln!("Connection failed: {}", msg),
Err(Error::AuthenticationFailed(msg)) => eprintln!("Auth failed: {}", msg),
Err(e) => eprintln!("Error: {}", e),
}
| Error Variant | Description |
|---|---|
InvalidToken | Token is malformed or cannot be decoded |
AuthenticationFailed | API key exchange failed |
ConnectionFailed | NATS connection could not be established |
Timeout | Request timed out |
BucketNotFound | JetStream bucket does not exist |
KeyNotFound | Key not found in bucket |
InvalidConfiguration | Missing required configuration |
Reference Examples
extensions/rust/helloworld/-- Tutorial progression (hello0-hello4)extensions/rust/weather/-- External API integration with React UIextensions/rust/clock/-- MCP App with time endpointextensions/rust/markdown_content/-- Widget with JetStream KV storageextensions/rust/demo_widgets/-- Multi-widget dashboard with LDM pollinglibs/rust/cirata_symphony/-- Library source