Python Extension Development

Cirata Symphony provides a Python library for developing extensions. Python extensions are lightweight, async-first, and well suited to rapid development, data processing, scripting, and integration with the Python ecosystem.

Getting Started

Prerequisites

  • Python 3.10 or later is required
  • Ensure pip is up to date: pip install --upgrade pip

Installation

The Symphony Python library is distributed as a wheel package. Download it from the Languages and Libraries page or install it from the Symphony instance:

pip install cirata_symphony-<version>-py3-none-any.whl

The library provides the cirata.symphony module, which contains the Extension class used to build extensions.

Project Structure

A Python extension is typically a single directory containing a main script and any supporting files:

my_extension/
├── main.py              # Extension entry point
├── handlers.py          # Optional: microservice handler functions
├── pages/               # Optional: UI template files
│   ├── home.tsx
│   └── help.md
└── requirements.txt     # Dependencies (includes cirata wheel)

Minimal Extension

The simplest possible extension connects to Symphony and publishes its availability:

import asyncio
from cirata import symphony

async def main():
    ext = symphony.Extension("Hello World", "hello")

    async with ext:
        await ext.operate()

if __name__ == '__main__':
    asyncio.run(main())

This extension will register with Symphony and remain connected until stopped with SIGINT (Ctrl-C). If no token is available, the extension logs an error with instructions and exits.

Extension Initialization

Constructor

The Extension constructor accepts the following parameters:

ext = symphony.Extension(
    name="My Extension",       # Required: human-readable name
    prefix="my_ext",           # Required: unique identifier for NATS subjects and routes
    token=None,                # Optional: API token (falls back to SYMPHONY_TOKEN env var)
    description="Description"  # Optional: describes the extension's purpose
)

The prefix is used to namespace NATS subjects, UI resource URIs, and route paths. It should be short, lowercase, and unique among extensions operating on the same Symphony instance.

Authentication

Extensions authenticate with a token obtained from an API Key. The token is resolved in the following order of precedence:

  1. The token parameter in the constructor
  2. The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux) — from a previous provisioning run
  3. The SYMPHONY_TOKEN environment variable
  4. The REGISTRATION_TOKEN environment variable (one-time provisioning)
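
The precedence above can be pictured as a first-match lookup. The following is only a sketch of that ordering, not the SDK's implementation: resolve_token, its parameters, and the source labels are hypothetical names chosen for illustration, and the credential store is stood in for by a plain callable.

```python
from typing import Callable, Mapping, Optional, Tuple


def resolve_token(
    explicit: Optional[str],
    read_store: Callable[[], Optional[str]],
    env: Mapping[str, str],
) -> Optional[Tuple[str, str]]:
    """Return (source, token) for the first source that yields a token, else None."""
    # Ordered exactly as in the list above; each entry is looked up lazily.
    candidates = (
        ("constructor", lambda: explicit),
        ("credential_store", read_store),
        ("SYMPHONY_TOKEN", lambda: env.get("SYMPHONY_TOKEN")),
        ("REGISTRATION_TOKEN", lambda: env.get("REGISTRATION_TOKEN")),
    )
    for source, lookup in candidates:
        token = lookup()
        if token:  # None and empty strings both count as "not set" (an assumption)
            return source, token
    return None
```

A caller would pass os.environ as env and treat a None result as the "no token available" case described below.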

When REGISTRATION_TOKEN is set, the extension connects, automatically provisions a permanent API key with the extension's declared capabilities, and stores it in the OS credential store (or ~/.config/cirata/ as a fallback). Subsequent runs use the stored token automatically with no interaction required.

If SYMPHONY_TOKEN contains a registration token (detected by its JWT capabilities), it is treated as a registration token and provisioned in the same way.

If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.

Container and Kubernetes deployments

In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:

  • Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
  • Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.

Overriding the token-embedded address

When the hostname encoded in the token is not the right one to reach Symphony from where the extension runs, set the SYMPHONY_ADDRESS environment variable. It accepts a bare hostname, host:port, or a full URL with an explicit http or https scheme.

See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Python, Go, Java, and Rust SDKs.
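
To make the three accepted forms concrete, here is a rough sketch that tells them apart using only the standard library. It is an illustration, not the SDK's parser: parse_address and Address are hypothetical names, and the authoritative validation rules are the ones in the SYMPHONY_ADDRESS reference.

```python
from typing import NamedTuple, Optional
from urllib.parse import urlsplit


class Address(NamedTuple):
    scheme: Optional[str]  # "http", "https", or None when not given
    host: str
    port: Optional[int]    # None when not given


def parse_address(value: str) -> Address:
    """Classify a value as bare hostname, host:port, or full http(s) URL."""
    value = value.strip()
    if not value:
        raise ValueError("address is empty")
    if "://" in value:
        parts = urlsplit(value)
        if parts.scheme not in ("http", "https"):
            raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    else:
        # Prefix "//" so urlsplit treats the value as a network location.
        parts = urlsplit("//" + value)
    if not parts.hostname:
        raise ValueError(f"no hostname in {value!r}")
    # parts.port raises ValueError itself for a non-numeric or out-of-range port.
    return Address(parts.scheme or None, parts.hostname, parts.port)
```

For example, parse_address("symphony.example.com:8443") yields a host and port with no scheme, while a value such as "ftp://symphony.example.com" is rejected (the hostname is a placeholder).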

Context Manager and Lifecycle

Extensions use Python's async context manager pattern to manage their connection lifecycle:

async with ext:
    # Connection established, JetStream buckets initialized
    # Configure endpoints, then operate
    await ext.operate()

The async with block establishes the NATS connection and initializes JetStream buckets. The operate() method runs the main event loop, which:

  1. Registers the extension with the Symphony registry
  2. Publishes status updates every 30 seconds to prevent TTL expiration
  3. Waits for SIGINT to trigger a graceful shutdown
  4. Publishes a disconnect status on exit

Note: All endpoint and storage setup that requires an active connection must occur inside the async with block. UI resources, routes, menus, and capabilities can be configured before it.
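
The loop that operate() runs can be approximated as follows. This is a sketch of the pattern only: run_heartbeat, the publish_status callable, and the "connected" and "disconnected" status strings are assumptions made for illustration, and the real method also handles registration and SIGINT.

```python
import asyncio
from typing import Awaitable, Callable


async def run_heartbeat(
    publish_status: Callable[[str], Awaitable[None]],
    stop: asyncio.Event,
    interval: float = 30.0,
) -> None:
    """Publish a status, refresh it every `interval` seconds, then disconnect."""
    await publish_status("connected")
    try:
        while not stop.is_set():
            try:
                # Sleep for one interval, waking early if shutdown is requested.
                await asyncio.wait_for(stop.wait(), timeout=interval)
            except asyncio.TimeoutError:
                # Interval elapsed without a stop request: refresh before the TTL lapses.
                await publish_status("connected")
    finally:
        # Runs on normal shutdown and on cancellation alike.
        await publish_status("disconnected")
```

Waiting on an event with a timeout, rather than calling asyncio.sleep(), lets a shutdown request interrupt the wait immediately instead of after up to 30 seconds.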

Capabilities

Capabilities declare which NATS subjects the extension needs to publish to or subscribe to. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.

# Subscribe capabilities: subjects the extension listens on
ext.add_capability("sub", "extensions.my_ext", "My Extension")
ext.add_capability("sub", "extensions.my_ext.process", "Process data")
ext.add_capability("sub", "extensions.my_ext.query", "Query data")

# Publish capabilities: subjects the extension sends to
ext.add_capability("pub", "extensions.my_ext.processed", "Data processed event")

# Specific cross-extension service subjects are permitted when the
# registering user's RBAC role covers them.
ext.add_capability("pub", "extensions.intelligence.mcp", "Query Intelligence MCP tools")

Capability names use dot-separated hierarchies. Both subscribe and publish capabilities follow the extensions.<prefix> convention.

Wildcard capabilities using > can allow a range of subjects within the extension's own prefix (e.g. extensions.my_ext.>). Wildcards targeting another extension's prefix are rejected at provisioning time. To call a specific service on another extension, declare its full subject (e.g. extensions.intelligence.mcp).
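
The wildcard rule can be sketched in a few lines. The matcher below follows standard NATS subject semantics (> matches one or more trailing tokens, * matches exactly one token); the prefix check is only an approximation of the provisioning-time rule described above, and both function names are hypothetical.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match: '*' is one token, '>' is one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def wildcard_within_prefix(capability: str, prefix: str) -> bool:
    """True if a capability has no wildcard, or its wildcard stays under extensions.<prefix>."""
    tokens = capability.split(".")
    if ">" not in tokens and "*" not in tokens:
        return True  # fully qualified subjects are governed by RBAC, not by this check
    return tokens[:2] == ["extensions", prefix]
```

Note that extensions.my_ext.> does not match the bare subject extensions.my_ext, which is why the earlier example declares that subject separately.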

User Interface

Extensions add UI components by registering resources, mapping them to routes, and adding menu items to the Symphony navigation. For detailed documentation on each resource type, see User Interfaces.

Resources

A resource is a named piece of UI content identified by a URI and a MIME type:

ext.add_resource("ui://my_ext/home", "text/symphony-jsx", page_content)
ext.add_resource("ui://my_ext/common", "text/symphony-module", shared_code)
ext.add_resource("ui://my_ext/dashboard", "text/html+symphony", html_content)
ext.add_htmx_resource("ui://my_ext/table", "pages/table")
ext.add_resource("ui://my_ext/help", "text/markdown", help_content)

Module resources (text/symphony-module) are not rendered directly—they are imported by text/symphony-jsx resources using import { ... } from '@symphony/extension/my_ext/common'.

HTMX resources (text/html+htmx) use a JSON config instead of HTML content—Symphony provides the HTMX shell with auth injection and theme sync. See HTML + HTMX for details.

See User Interfaces for a comparison of the available MIME types and when to use each one.

Routes

Routes map URL paths to resources so they appear as navigable pages:

ext.add_route("/my_ext", "ui://my_ext/home")

Nested routes create a hierarchy of pages under a parent path:

ext.add_route("/my_ext", "ui://my_ext/home", routes={
    "/settings": {"uri": "ui://my_ext/settings"},
    "/detail": {"uri": "ui://my_ext/detail"}
})

Menus

Menu items appear in the Symphony navigation sidebar:

ext.add_menu("main", "My Extension", "/my_ext", "fa-puzzle-piece")
ext.add_menu("tools", "Data Tools", "/my_ext/tools", "fa-wrench")

The parameters are the group name, display label, target route, and a FontAwesome icon class.

Widgets

Widgets are UI components available for use on the Symphony dashboard:

ext.add_widget("ui://my_ext/status", "System Status", "Real-time status")

Help Pages

Extension help pages should use the text/markdown MIME type and be routed under /help/extensions/<ExtensionName>. See Markdown Resources for details on frontmatter, template variables, and conventions.

import os

help_path = os.path.join(os.path.dirname(__file__), "pages", "help.md")
with open(help_path) as f:
    help_content = f.read()

ext.add_resource("ui://my_ext/help", "text/markdown", help_content)
ext.add_route("/help/extensions/MyExtension", "ui://my_ext/help")

Microservice Endpoints

Endpoints expose functionality as NATS microservices that can be invoked by other extensions, the Symphony REST API, and the Symphony UI.

Adding Endpoints

Endpoints must be created inside the async with block after the connection is established:

async with ext:
    if ext.token:
        endpoints = await ext.add_endpoints(
            name="MyService",
            version="1.0.0",
            description="My extension service"
        )
        group = endpoints.add_group(name="cirata.extensions.my_ext")
        await group.add_endpoint(name="process", handler=process_handler)
        await group.add_endpoint(name="query", handler=query_handler)

    await ext.operate()

The endpoint group name determines the NATS subject prefix. An endpoint named process in the group cirata.extensions.my_ext listens on the subject cirata.extensions.my_ext.process.

Handler Functions

Handlers are async functions that receive a request object and must respond:

async def process_handler(req):
    """Process incoming data and return a result."""
    try:
        # req.data contains the request payload as bytes
        input_data = req.data.decode('utf-8')
        result = f"Processed: {input_data}"
        return await req.respond(result.encode('utf-8'))
    except Exception as e:
        error = f"Error: {str(e)}"
        return await req.respond(error.encode('utf-8'))

JSON Request/Response

Most handlers work with JSON-encoded data:

import json

async def query_handler(req):
    """Handle a JSON query and return JSON results."""
    try:
        request = json.loads(req.data.decode('utf-8'))
        filter_value = request.get('filter', 'all')

        results = await fetch_data(filter_value)

        response = json.dumps({
            "status": "success",
            "count": len(results),
            "items": results
        })
        return await req.respond(response.encode('utf-8'))
    except json.JSONDecodeError:
        error = json.dumps({"error": "Invalid JSON"})
        return await req.respond(error.encode('utf-8'))
    except Exception as e:
        error = json.dumps({"error": str(e)})
        return await req.respond(error.encode('utf-8'))

Note: Every handler must call req.respond(). If a handler fails to respond, the caller will time out waiting for a reply.

OpenAPI Metadata

To expose services as REST APIs through the Symphony server, include an OpenAPI specification in the endpoint metadata:

import json

with open("openapi.json") as f:
    openapi_spec = f.read()

endpoints = await ext.add_endpoints(
    name="MyService",
    version="1.0.0",
    metadata={"openapi": openapi_spec},
    description="My extension service"
)

When an OpenAPI specification is present, Symphony routes HTTP requests received at the server to the corresponding NATS endpoint based on the path definitions in the specification.

REST API Response Format

When endpoints are exposed as REST APIs, handlers can respond in two ways:

Raw response (simplest)—respond with the content directly. Symphony auto-detects the content type: valid JSON is served as application/json, anything else as text/plain.

async def list_items(req):
    items = await fetch_items()
    await req.respond(json.dumps(items).encode("utf-8"))

async def ping(req):
    await req.respond(b"pong")

Wrapped response (full control)—for control over the HTTP status code, content type, or response headers, wrap the response in a JSON object with a body field:

async def create_item(req):
    request_data = json.loads(req.data.decode("utf-8"))
    item = await store_item(request_data)

    response = json.dumps({
        "body": {"id": item["id"], "status": "created"},
        "statusCode": 201,
        "contentType": "application/json"
    })
    await req.respond(response.encode("utf-8"))

| Field | Required | Description |
| --- | --- | --- |
| body | Yes | The response payload. For JSON content types, any JSON value. For other types (e.g. text/plain), a string. |
| statusCode | No | HTTP status code (100–599). Defaults to 200. |
| contentType | No | MIME type. Defaults to application/json. |
| headers | No | Additional HTTP headers as a string-to-string object. |

Both camelCase (statusCode, contentType) and lowercase (statuscode, contenttype) field names are accepted.

Symphony detects the format by checking whether the response is a JSON object with a body key. Both formats work identically whether the endpoint is called through the REST API or directly over NATS.
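
A small helper keeps wrapped responses consistent across handlers. The sketch below builds the envelope from the fields in the table and mirrors the detection rule as stated above; wrap_response and is_wrapped are hypothetical names, not part of the SDK.

```python
import json
from typing import Any, Dict, Optional


def wrap_response(
    body: Any,
    status_code: int = 200,
    content_type: str = "application/json",
    headers: Optional[Dict[str, str]] = None,
) -> bytes:
    """Build a wrapped response payload ready to pass to req.respond()."""
    if not 100 <= status_code <= 599:
        raise ValueError(f"status code out of range: {status_code}")
    envelope: Dict[str, Any] = {
        "body": body,
        "statusCode": status_code,
        "contentType": content_type,
    }
    if headers:
        envelope["headers"] = headers
    return json.dumps(envelope).encode("utf-8")


def is_wrapped(raw: bytes) -> bool:
    """Mirror the documented detection rule: a JSON object with a 'body' key."""
    try:
        parsed = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False
    return isinstance(parsed, dict) and "body" in parsed
```

A handler could then end with await req.respond(wrap_response({"id": 1}, status_code=201)). One consequence of the detection rule as described: a raw JSON object that happens to have a top-level body key would be read as a wrapped response, so such payloads are safer sent inside an explicit wrapper.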

Storage

Extensions can use JetStream key-value stores for persistent data. Buckets are declared during initialization and accessed inside the async with block.

Declaring Buckets

ext.add_bucket("my_ext_data", "Extension data", ttl=0)
ext.add_bucket("my_ext_cache", "Temporary cache", ttl=300)

# With lifecycle properties
ext.add_bucket(
    "my_ext_events",
    "Event log",
    ttl=86400,
    max_history_per_key=10,
    max_bucket_size=1073741824  # 1 GB
)

| Parameter | Description |
| --- | --- |
| ttl | Time-to-live in seconds. 0 means entries do not expire. |
| max_history_per_key | Maximum number of historical revisions retained per key. 0 or omitted uses the server default (typically 1). |
| max_bucket_size | Maximum total size of the bucket in bytes. When exceeded, the oldest entries are discarded. 0 or omitted means no limit. |

The platform creates (or updates) buckets automatically when the extension registers, so you do not need to create them manually. Re-registering with changed parameters updates the bucket configuration without losing existing data. Declared buckets inherit the cluster's replica factor (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone) — no action required.

If you create buckets at runtime rather than via add_bucket, fetch the current replica count from Symphony and pass it to create_key_value explicitly. Otherwise nats-py defaults to R=1 and the bucket is not replicated:

import json
msg = await ext.nc.request("cirata.services.cluster.info", b"", timeout=2)
replicas = json.loads(msg.data).get("replicas", 1)

await ext.js.create_key_value(
    bucket="runtime_data",
    description="Created on demand",
    replicas=replicas,
)

Key-Value Operations

import json

async def storage_example(ext):
    js = ext.js
    kv = await js.key_value("my_ext_data")

    # Store a value
    await kv.put("config:theme", b'{"mode": "dark"}')

    # Retrieve a value
    entry = await kv.get("config:theme")
    value = json.loads(entry.value.decode('utf-8'))

    # List all keys
    keys = await kv.keys()
    for key in keys:
        print(f"Key: {key}")

    # Delete a key
    await kv.delete("config:theme")

Batch Write

When writing many keys at once, use the kv_batch utility to publish all entries concurrently instead of calling put() in a loop. Each individual put() is a synchronous round-trip; the batch API fires all publishes via asyncio.gather and collects acks in parallel.

from cirata.symphony.kv_batch import put_all

entries = {
    item["id"]: json.dumps(item).encode("utf-8")
    for item in items
}

js = ext.js
result = await put_all(js, "my_ext_data", entries)

print(f"Batch write: {result.succeeded} succeeded, "
      f"{result.failed} failed out of {result.total}")

for key, error in result.errors.items():
    print(f"Failed key '{key}': {error}")

The batch API never fails fast—a single bad key does not prevent the remaining entries from being written. BatchPutResult.errors maps failed key names to their exceptions. Values written this way are fully readable via standard kv.get().
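
The "never fails fast" behaviour comes from gathering with return_exceptions=True. The sketch below shows that pattern in isolation; it is not the kv_batch source. put_all_sketch and BatchResult are hypothetical names, and the function takes an already-opened KV handle (any object with an async put(key, value) method) rather than the (js, bucket, entries) arguments of the real put_all.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class BatchResult:
    """Hypothetical result type, shaped like the fields described above."""
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    errors: Dict[str, BaseException] = field(default_factory=dict)


async def put_all_sketch(kv, entries: Dict[str, bytes]) -> BatchResult:
    """Write every entry concurrently and record failures per key."""
    keys = list(entries)
    outcomes = await asyncio.gather(
        *(kv.put(key, entries[key]) for key in keys),
        return_exceptions=True,  # collect errors instead of raising on the first one
    )
    result = BatchResult(total=len(keys))
    # gather preserves argument order, so outcomes line up with keys.
    for key, outcome in zip(keys, outcomes):
        if isinstance(outcome, BaseException):
            result.failed += 1
            result.errors[key] = outcome
        else:
            result.succeeded += 1
    return result
```

Without return_exceptions=True, the first failing put() would propagate out of gather and the caller would lose track of which of the other writes succeeded.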

JSON Storage Helpers

A common pattern is to wrap key-value operations with JSON serialization:

import json

async def store_json(kv, key, data):
    await kv.put(key, json.dumps(data).encode('utf-8'))

async def get_json(kv, key):
    entry = await kv.get(key)
    return json.loads(entry.value.decode('utf-8'))

Usage Reporting

Extensions that consume licensed resources should report their usage to Symphony for licensing and chargeback. The report_usage method sends dimension measurements to Symphony, which converts them into license units using the rates embedded in the active license.

Reporting Usage

response = await ext.report_usage({
    "tables_replicated": 5,
    "bytes_transferred_gb": 120.5,
})

The dimensions dictionary contains named measurements specific to your extension. The keys must match the dimension names defined in the license's unit rates for your extension prefix. If any dimension key is not covered by a license rate, Symphony will add your extension to the disabled_extensions list (per-extension enforcement).

Response

The response is a dictionary containing:

| Field | Description |
| --- | --- |
| result | "ok" on success |
| units | The computed license units consumed by this report |
| enforcement | Current enforcement state object |

Checking Enforcement State

The enforcement state in the response tells your extension whether it should continue operating:

import logging

response = await ext.report_usage({"queries_executed": 100})

enforcement = response.get("enforcement", {})
status = enforcement.get("status", "enforcement_ok")

if status == "enforcement_enforced":
    logging.warning("License enforcement active; pausing operations")
    # Stop or reduce licensed operations

Subscribing to Enforcement State

In addition to checking the enforcement state from each report_usage response, your extension can subscribe to the enforcement broadcast subject to receive updates in real time—including changes triggered by other extensions, license uploads, or administrative actions:

import json
import logging

ENFORCEMENT_SUBJECT = "cirata.symphony.enforcement"

async def on_enforcement(msg):
    state = json.loads(msg.data)
    status = state.get("status", "enforcement_ok")
    message = state.get("message", "")

    if status == "enforcement_enforced":
        logging.warning("Enforcement active: %s; pausing operations", message)
        # Stop or reduce licensed operations
    elif status == "enforcement_warning":
        logging.info("Enforcement warning: approaching license limit")

# Subscribe during extension setup
sub = await ext.conn.subscribe(ENFORCEMENT_SUBJECT, cb=on_enforcement)

You can also fetch the current enforcement state on demand without reporting usage—for example, when your extension first starts:

reply = await ext.conn.request(
    "cirata.services.usage.enforcement",
    json.dumps({}).encode("utf-8"),
    timeout=5,
)
state = json.loads(reply.data)
logging.info("Current enforcement status: %s", state.get("status"))

Enforcement States

| Status | Meaning |
| --- | --- |
| enforcement_ok | Normal operation; units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within vendor-set grace period |
| enforcement_enforced | Grace period expired or no valid license; stop licensed operations |

Extensions may also be individually disabled via the disabled_extensions field, or flagged as grace-only via the grace_extensions field, even when the global status is enforcement_ok. Check both lists for your extension prefix to determine your operational state.
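
Checking both lists alongside the global status can be folded into one function. This sketch makes several assumptions: the lists hold extension prefixes as plain strings, a disabled extension is treated the same as enforcement_enforced, and a grace-only extension is reported as enforcement_grace. The function name is hypothetical.

```python
from typing import Any, Mapping


def operational_state(state: Mapping[str, Any], prefix: str) -> str:
    """Combine the global status with the per-extension lists for one prefix."""
    # `or []` also covers a field that is present but null.
    disabled = state.get("disabled_extensions") or []
    grace = state.get("grace_extensions") or []
    status = state.get("status", "enforcement_ok")

    if status == "enforcement_enforced" or prefix in disabled:
        return "enforcement_enforced"
    if prefix in grace:
        return "enforcement_grace"
    return status
```

The same function works on the enforcement object from a report_usage response and on a message received from the broadcast subject, since both carry the enforcement state.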

Attribution

When an extension reports usage, Symphony checks whether any attribution rules match the extension prefix and reporting account. If a rule matches, the usage record is automatically assigned to a business unit with attribution method auto. Administrators can also manually attribute records or override automatic attributions via the Attribution UI.

When to Report

Report usage at meaningful checkpoints—for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation if the volume is very high; instead, batch measurements and report periodically.

# Example: report after each replication batch
async def replicate_batch(ext, tables):
    results = await do_replication(tables)

    response = await ext.report_usage({
        "tables_replicated": len(results),
        "bytes_transferred_gb": sum(r.bytes for r in results) / 1e9,
    })

    # Check enforcement state after reporting
    enforcement = response.get("enforcement", {})
    if enforcement.get("status") == "enforcement_enforced":
        logging.warning("License limit reached; stopping replication")
        return
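
For high-volume operations, the advice to batch measurements and report periodically could look like the sketch below. UsageAccumulator is a hypothetical helper, not an SDK class; it only assumes a coroutine with the same shape as ext.report_usage (takes a dimensions dictionary, returns the response dictionary).

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict, Optional


class UsageAccumulator:
    """Sum dimension measurements locally and send them in one report."""

    def __init__(self, report: Callable[[Dict[str, float]], Awaitable[dict]]):
        self._report = report
        self._pending: Dict[str, float] = defaultdict(float)

    def add(self, dimension: str, amount: float) -> None:
        """Record a measurement without any network traffic."""
        self._pending[dimension] += amount

    async def flush(self) -> Optional[dict]:
        """Report pending totals; keep them if the report call fails."""
        if not self._pending:
            return None
        batch = dict(self._pending)
        self._pending.clear()
        try:
            return await self._report(batch)
        except Exception:
            # Restore the batch so the measurements are retried on the next flush.
            for dimension, amount in batch.items():
                self._pending[dimension] += amount
            raise
```

An extension would create it once with UsageAccumulator(ext.report_usage), call add() per operation, and call flush() from a timer task or at a checkpoint, inspecting the returned enforcement state as in the example above.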

See Usage Tracking for how administrators view and manage reported usage.

Dynamic Updates

Extensions can update their resources at runtime without restarting. This is useful for dashboards that reflect changing state:

async def update_status_page(ext, new_content):
    await ext.update_resource(
        "ui://my_ext/status",
        "text/symphony-jsx",
        new_content
    )

The update_resource method replaces the resource content and publishes an extension status update so that Symphony reflects the changes immediately.

Inter-Extension Communication

Extensions can invoke services provided by other extensions through the client interface:

async with ext:
    if ext.token:
        client = ext.common.client

        # Request-reply to another extension's service
        response = await client.request(
            subject="cirata.extensions.other.process",
            payload=json.dumps({"input": "data"}).encode('utf-8'),
            timeout=5.0
        )
        result = json.loads(response.data.decode('utf-8'))

        # Fire-and-forget event notification
        await client.publish(
            subject="extensions.my_ext.completed",
            payload=json.dumps({"job_id": "abc123"}).encode('utf-8')
        )

NATS Request Timeouts

The timeout parameter is optional and defaults to 0.5 seconds if omitted. The default is intentionally short—asyncio.TimeoutError will be raised for any call that takes more than half a second. Always set an explicit value appropriate to the expected latency of the service you are calling.

Timeout parameter

response = await client.request(
    subject="cirata.extensions.other_ext.query",
    payload=json.dumps({"filter": "active"}).encode('utf-8'),
    timeout=5.0  # seconds; optional, defaults to 0.5
)
result = json.loads(response.data.decode('utf-8'))

Outbound requests from inside a handler

Handlers have access to the NATS connection via req.nc, which can be used to call other services:

import asyncio
import json

import nats.errors

async def enrich_handler(req):
    """Calls another service and merges the result."""
    try:
        # Make an outbound request to another extension's service
        reply = await req.nc.request(
            "cirata.extensions.catalog.lookup",
            req.data,
            timeout=5.0
        )
        enriched = json.loads(reply.data.decode('utf-8'))
        return await req.respond(json.dumps(enriched).encode('utf-8'))

    except nats.errors.NoRespondersError:
        # No service is listening on that subject
        error = json.dumps({"error": "catalog service unavailable"})
        return await req.respond(error.encode('utf-8'))

    except asyncio.TimeoutError:
        # Service did not respond within the timeout window
        error = json.dumps({"error": "catalog service timed out"})
        return await req.respond(error.encode('utf-8'))

Choosing a timeout value

| Scenario | Suggested timeout |
| --- | --- |
| In-process KV or NATS round-trip | 2–3 s |
| Extension-to-extension service call | 5 s |
| Long-running computation | 30–60 s |
| Fire-and-forget (no reply needed) | use publish() instead |

Warning: Handlers that call other services with req.nc.request() must always handle asyncio.TimeoutError and nats.errors.NoRespondersError, or the caller will receive a timeout when the inner call stalls. The outer request timeout is always shorter than it looks: Symphony routes the HTTP request to the extension over NATS, so the total round-trip budget includes both the inbound and the outbound leg.

Error Handling

Extension Lifecycle

Wrap the extension lifecycle in a try/except to handle initialization failures:

async def main():
    ext = symphony.Extension("My Extension", "my_ext")
    # ... configure capabilities, resources, etc.

    try:
        async with ext:
            if ext.token:
                # ... add endpoints
                pass
            await ext.operate()
    except RuntimeError as e:
        logging.error(f"Extension failed: {e}")
    except KeyboardInterrupt:
        logging.info("Shutdown requested")

Handler Errors

Always handle exceptions in endpoint handlers and respond with an appropriate error. A handler that raises an unhandled exception will not send a response, causing the caller to time out.

async def safe_handler(req):
    try:
        data = json.loads(req.data.decode('utf-8'))
        result = process(data)
        return await req.respond(
            json.dumps({"status": "success", "result": result}).encode('utf-8')
        )
    except json.JSONDecodeError:
        return await req.respond(
            json.dumps({"error": "Invalid JSON input"}).encode('utf-8')
        )
    except Exception as e:
        return await req.respond(
            json.dumps({"error": str(e)}).encode('utf-8')
        )
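
When many handlers share the same try/except shape, the pattern can be factored into a decorator. This is a sketch under the assumption that the request object exposes only the respond() coroutine used throughout this page; always_respond is a hypothetical helper, not an SDK feature.

```python
import functools
import json
import logging


def always_respond(handler):
    """Wrap a handler so the caller gets a reply even when the handler raises."""

    @functools.wraps(handler)
    async def wrapper(req):
        try:
            return await handler(req)
        except json.JSONDecodeError:
            payload = {"error": "Invalid JSON input"}
        except Exception as exc:
            # Log the traceback locally; only the message is sent to the caller.
            logging.getLogger(__name__).exception("handler %s failed", handler.__name__)
            payload = {"error": str(exc)}
        return await req.respond(json.dumps(payload).encode("utf-8"))

    return wrapper
```

Applied as @always_respond above an async def handler, this leaves the handler body free of boilerplate. It does not guard against a handler that responds and then raises, in which case a second reply would be attempted, so handlers should make respond() their last step.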

Logging

Extensions should use the SDK's configure_logging() helper for structured JSON output to stdout. Call it once at module level, before any other logging:

from cirata.symphony.logging import configure_logging

configure_logging("myext")

This configures the root Python logger to emit JSON with consistent fields (timestamp, level, extension, message, trace_id, span_id) to stdout — captured automatically by journald or the container runtime. Do not create log files or call logging.basicConfig().

Use standard logging calls throughout your extension:

import logging

logger = logging.getLogger("myext")
logger.info("Processing batch", extra={"items": 42})
logger.error("Query failed", exc_info=True)

Observability

Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.

import logging

from cirata.symphony.observability import Observability, SpanKind

async with ext:
    svc = await ext.add_endpoints("myext", "1.0")
    obs = await Observability.enable(ext, svc, "myext")

    # Feed logs into the OTLP pipeline (in addition to stdout)
    logging.getLogger().addHandler(obs.logs)

    # Instrument handlers with automatic trace propagation
    @obs.instrumented_handler("process")
    async def handle_process(msg):
        obs.increment_counter("requests.total")
        result = process(msg.data)
        await msg.respond(result)

Adding obs.logs as a handler on the root logger means every log call flows to both stdout (via configure_logging) and the OTLP buffer (via obs.logs). Do not call logger.setLevel() after configure_logging() — the level is already set.

For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.

Multi-Instance Support

Python extensions can run as multiple instances of the same extension type for scalability, fault tolerance, and workload distribution. The SDK provides built-in support for work partitioning, queue-based command delivery, and instance-specific messaging.

Automatic Load Balancing

Microservice endpoints registered with add_endpoints are automatically load-balanced across all instances by NATS micro. No code changes are needed—deploying additional instances immediately distributes incoming service requests across them.

The SDK also uses a queue subscription for the commands channel, ensuring that each command is processed by exactly one instance.

WorkPartitioner

For extensions that run background tasks (watchers, monitors, replicators), enable work partitioning to distribute work items across instances using consistent hashing.

def on_rebalance(total_instances, my_item_count):
    logging.info(
        f"Rebalanced: {total_instances} instances active, "
        f"I own {my_item_count} items"
    )

ext.enable_work_partitioning(on_rebalance)

Use is_my_work to check whether the current instance should process a given work item:

# In a bucket watcher loop
for key in discovered_items:
    if ext.work_partitioner.is_my_work(key):
        await process_item(key)
    else:
        logging.debug(f"Skipping {key} (owned by another instance)")

The partitioner watches the status bucket for instance join and leave events. When the set of active instances changes, the rebalance callback fires and is_my_work results may change. Extensions should re-evaluate their active work items on each watcher iteration rather than caching ownership decisions.
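
To see why ownership can change on rebalance, and why only some items move, here is a self-contained illustration. The SDK's actual algorithm is not shown on this page; the sketch uses rendezvous (highest-random-weight) hashing, a related technique that shares the key property of consistent hashing: when an instance leaves, only the items it owned are reassigned. All names are hypothetical.

```python
import hashlib
from typing import Iterable


def _score(instance_id: str, key: str) -> int:
    """Stable score for an (instance, key) pair, independent of PYTHONHASHSEED."""
    digest = hashlib.sha256(f"{instance_id}\x00{key}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def owner_of(key: str, instances: Iterable[str]) -> str:
    """Return the instance with the highest score for this key."""
    candidates = list(instances)
    if not candidates:
        raise ValueError("no active instances")
    # The instance id breaks the (very unlikely) tie between equal scores.
    return max(candidates, key=lambda instance_id: (_score(instance_id, key), instance_id))


def is_my_work(key: str, my_id: str, instances: Iterable[str]) -> bool:
    """True if `my_id` is the owner of `key` among the active instances."""
    return owner_of(key, instances) == my_id
```

Every instance computes the same owner from the same membership list without coordinating, which is why the result has to be re-evaluated whenever that list changes rather than cached.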

Instance-Specific Handlers

Register handlers that are targeted at a specific instance using add_instance_handler. Each instance subscribes to cirata.extensions.{prefix}.instance.{instance_id}.>, and only the targeted instance receives the request:

async def instance_status(req):
    status = {
        "instance_id": ext.instance_id,
        "work_items": ext.work_partitioner.my_item_count,
    }
    await req.respond(json.dumps(status).encode("utf-8"))

ext.add_instance_handler("local-status", instance_status)

Callers can target a specific instance via the HTTP API by adding the ?instance=<id> query parameter to the service proxy URL.

Instance ID

The extension's instance ID is derived from the JWT identifier in the API token. Access it with:

instance_id = ext.instance_id

Use the instance ID to tag operational records in shared storage for debugging and traceability.

For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.

Complete Example

The following example demonstrates a complete Python extension with capabilities, UI, microservice endpoints, and storage:

"""
Copyright 2025 Cirata.
"""

import asyncio
import json
import logging
import os
from cirata import symphony
from cirata.symphony.logging import configure_logging

configure_logging("catalog")

# --- Endpoint Handlers ---

async def list_handler(req):
    """Return all items from storage."""
    try:
        kv = await req.nc.jetstream().key_value("catalog_items")
        items = []
        for key in await kv.keys():
            entry = await kv.get(key)
            items.append(json.loads(entry.value.decode('utf-8')))
        return await req.respond(json.dumps(items).encode('utf-8'))
    except Exception as e:
        return await req.respond(
            json.dumps({"error": str(e)}).encode('utf-8')
        )

async def add_handler(req):
    """Add an item to storage."""
    try:
        item = json.loads(req.data.decode('utf-8'))
        # Note: hash() of a str is salted per process, so the fallback id is
        # not stable across restarts. Supply an explicit 'id' for stable keys.
        item_id = item.get('id', str(hash(json.dumps(item))))
        kv = await req.nc.jetstream().key_value("catalog_items")
        await kv.put(f"item:{item_id}", json.dumps(item).encode('utf-8'))
        return await req.respond(
            json.dumps({"status": "created", "id": item_id}).encode('utf-8')
        )
    except Exception as e:
        return await req.respond(
            json.dumps({"error": str(e)}).encode('utf-8')
        )

# --- UI Content ---

page_content = """
import { useContext, useEffect, useState } from 'react'
import { SymphonyContext } from '@symphony'

const TIMEOUT = 5000

export default function CatalogPage() {
  const symphonyContext = useContext(SymphonyContext)
  const [items, setItems] = useState([])

  useEffect(() => {
    symphonyContext.setTitle({
      "/": "Home",
      "/catalog": "Catalog"
    })
    loadItems()
  }, [])

  const loadItems = async () => {
    if (symphonyContext.nc === undefined) return
    try {
      const reply = await symphonyContext.nc.request(
        'cirata.extensions.catalog.list',
        '',
        { timeout: TIMEOUT }
      )
      const text = new TextDecoder('utf-8').decode(reply.data)
      setItems(JSON.parse(text))
    } catch (err) {
      console.error('Failed to load items:', err)
    }
  }

  return (
    <div>
      <h1>Catalog</h1>
      <button onClick={loadItems}>Refresh</button>
      <ul>
        {items.map((item, i) => (
          <li key={i}>{item.name || JSON.stringify(item)}</li>
        ))}
      </ul>
    </div>
  )
}
"""

# --- Main ---

async def main():
    ext = symphony.Extension(
        name="Catalog",
        prefix="catalog",
        description="Manages a catalog of items"
    )

    # Capabilities
    ext.add_capability("sub", "extensions.catalog", "Catalog Extension")
    ext.add_capability("sub", "extensions.catalog.list", "List items")
    ext.add_capability("sub", "extensions.catalog.add", "Add items")

    if ext.token:
        # UI resources and navigation
        ext.add_resource("ui://catalog/home", "text/symphony-jsx", page_content)
        ext.add_route("/catalog", "ui://catalog/home")
        ext.add_menu("main", "Catalog", "/catalog", "fa-book")

        # Help page
        help_path = os.path.join(os.path.dirname(__file__), "pages", "help.md")
        if os.path.exists(help_path):
            with open(help_path) as f:
                ext.add_resource("ui://catalog/help", "text/markdown", f.read())
            ext.add_route("/help/extensions/Catalog", "ui://catalog/help")

        # Storage buckets
        ext.add_bucket("catalog_items", "Catalog item storage")

    # Run extension
    try:
        async with ext:
            if ext.token:
                endpoints = await ext.add_endpoints(
                    name="catalog",
                    version="1.0.0",
                    description="Catalog service"
                )
                group = endpoints.add_group(name="cirata.extensions.catalog")
                await group.add_endpoint(name="list", handler=list_handler)
                await group.add_endpoint(name="add", handler=add_handler)

            await ext.operate()
    except RuntimeError as e:
        logging.error(f"Unable to initialize: {e}")

if __name__ == '__main__':
    asyncio.run(main())