Python Extension Development
Cirata Symphony provides a Python library for developing extensions. Python extensions are lightweight, async-first, and well suited to rapid development, data processing, scripting, and integration with the Python ecosystem.
Getting Started
Prerequisites
- Python 3.10 or later is required
- Ensure pip is up to date: pip install --upgrade pip
Installation
The Symphony Python library is distributed as a wheel package. Download it from the Languages and Libraries page or install it from the Symphony instance:
pip install cirata_symphony-<version>-py3-none-any.whl
The library provides the cirata.symphony module, which contains
the Extension class used to build extensions.
Project Structure
A Python extension is typically a single directory containing a main script and any supporting files:
my_extension/
├── main.py # Extension entry point
├── handlers.py # Optional: microservice handler functions
├── pages/ # Optional: UI template files
│ ├── home.tsx
│ └── help.md
└── requirements.txt # Dependencies (includes cirata wheel)
Minimal Extension
The simplest possible extension connects to Symphony and publishes its availability:
import asyncio
from cirata import symphony
async def main():
    ext = symphony.Extension("Hello World", "hello")
    async with ext:
        await ext.operate()
if __name__ == '__main__':
    asyncio.run(main())
This extension will register with Symphony and remain connected until stopped with SIGINT (Ctrl-C). If no token is available, the extension logs an error with instructions and exits.
Extension Initialization
Constructor
The Extension constructor accepts the following parameters:
ext = symphony.Extension(
    name="My Extension",        # Required: human-readable name
    prefix="my_ext",            # Required: unique identifier for NATS subjects and routes
    token=None,                 # Optional: API token (falls back to SYMPHONY_TOKEN env var)
    description="Description"   # Optional: describes the extension's purpose
)
The prefix is used to namespace NATS subjects, UI resource URIs, and
route paths. It should be short, lowercase, and unique among extensions
operating on the same Symphony instance.
Authentication
Extensions authenticate with a token obtained from an API Key. The token is resolved in the following order of precedence:
- The token parameter in the constructor
- The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux), populated by a previous provisioning run
- The SYMPHONY_TOKEN environment variable
- The REGISTRATION_TOKEN environment variable (one-time provisioning)
When REGISTRATION_TOKEN is set, the extension connects, automatically
provisions a permanent API key with the extension's declared
capabilities, and stores it in the OS credential store (or
~/.config/cirata/ as a fallback). Subsequent runs use the stored
token automatically with no interaction required.
If SYMPHONY_TOKEN contains a registration token (detected by its JWT
capabilities), it is treated as a registration token and provisioned
in the same way.
If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
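The resolution order described in this section can be sketched as a small pure function. This is an illustration only, not the SDK's code: `resolve_token` and its parameters are hypothetical names, and the credential-store lookup is represented by a plain argument rather than a real keychain call.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_token(
    explicit: Optional[str],
    stored: Optional[str],
    env: Mapping[str, str] = os.environ,
) -> Optional[Tuple[str, str]]:
    """Return (source, token) for the first available token, or None.

    Hypothetical helper mirroring the documented precedence; `stored`
    stands in for whatever the OS credential store returned.
    """
    candidates = [
        ("constructor", explicit),
        ("credential_store", stored),
        ("SYMPHONY_TOKEN", env.get("SYMPHONY_TOKEN")),
        ("REGISTRATION_TOKEN", env.get("REGISTRATION_TOKEN")),
    ]
    for source, token in candidates:
        if token:  # skip None and empty strings
            return source, token
    return None
```

A `None` result corresponds to the case where the extension logs an error with instructions and exits.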
Container and Kubernetes deployments
In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:
- Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
- Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.
Overriding the token-embedded address
When the hostname encoded in the token is not the right one to reach
Symphony from where the extension runs, set the
SYMPHONY_ADDRESS environment variable. It accepts a bare hostname,
host:port, or a full URL with an explicit http or https scheme.
See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Python, Go, Java, and Rust SDKs.
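To make the three accepted forms concrete, the sketch below splits a value into scheme, host, and port using only the standard library. `Address` and `parse_address` are hypothetical names, and the checks here are an assumption for illustration; the authoritative validation rules are those in the SYMPHONY_ADDRESS reference.

```python
from typing import NamedTuple, Optional
from urllib.parse import urlsplit


class Address(NamedTuple):
    scheme: Optional[str]  # None when the value carried no scheme
    host: str
    port: Optional[int]    # None when the value carried no port


def parse_address(value: str) -> Address:
    """Split a SYMPHONY_ADDRESS-style value into scheme, host, and port."""
    value = value.strip()
    if not value:
        raise ValueError("address is empty")
    if "://" in value:
        parts = urlsplit(value)
        if parts.scheme not in ("http", "https"):
            raise ValueError(f"unsupported scheme: {parts.scheme!r}")
        scheme = parts.scheme
    else:
        # Prefix '//' so urlsplit treats the value as a network location.
        parts = urlsplit("//" + value)
        scheme = None
    if not parts.hostname:
        raise ValueError(f"no hostname in {value!r}")
    # parts.port raises ValueError itself when the port is not numeric.
    return Address(scheme, parts.hostname, parts.port)
```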
Context Manager and Lifecycle
Extensions use Python's async context manager pattern to manage their connection lifecycle:
async with ext:
    # Connection established, JetStream buckets initialized
    # Configure endpoints, then operate
    await ext.operate()
The async with block establishes the NATS connection and initializes
JetStream buckets. The operate() method runs the main event loop,
which:
- Registers the extension with the Symphony registry
- Publishes status updates every 30 seconds to prevent TTL expiration
- Waits for SIGINT to trigger a graceful shutdown
- Publishes a disconnect status on exit
All endpoint and storage setup that requires an active connection must
occur inside the async with block. UI resources, routes, menus, and
capabilities can be configured before it.
Capabilities
Capabilities declare which NATS subjects the extension needs to publish or subscribe to. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.
# Subscribe capabilities: subjects the extension listens on
ext.add_capability("sub", "extensions.my_ext", "My Extension")
ext.add_capability("sub", "extensions.my_ext.process", "Process data")
ext.add_capability("sub", "extensions.my_ext.query", "Query data")
# Publish capabilities: subjects the extension sends to
ext.add_capability("pub", "extensions.my_ext.processed", "Data processed event")
# Specific cross-extension service subjects are permitted when the
# registering user's RBAC role covers them.
ext.add_capability("pub", "extensions.intelligence.mcp", "Query Intelligence MCP tools")
Capability names use dot-separated hierarchies. Both subscribe and
publish capabilities follow the extensions.<prefix> convention.
Wildcard capabilities using > can allow a range of subjects within the
extension's own prefix (e.g. extensions.my_ext.>). Wildcards targeting
another extension's prefix are rejected at provisioning time. To call a
specific service on another extension, declare its full subject (e.g.
extensions.intelligence.mcp).
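The wildcard rule can be illustrated with a short sketch. `subject_matches` follows standard NATS subject semantics (`*` matches one token, `>` matches one or more trailing tokens). `wildcard_allowed` is a hypothetical helper that approximates the provisioning check described above; it is not Symphony's implementation, and it does not model the RBAC check that applies to cross-extension subjects.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match: '*' is one token, '>' is one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and must cover at least one token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def wildcard_allowed(capability: str, prefix: str) -> bool:
    """Hypothetical check: '>' is acceptable only under extensions.<prefix>."""
    if ">" not in capability.split("."):
        return True  # fully specified subject, nothing to restrict here
    return capability.startswith(f"extensions.{prefix}.")
```

For example, `wildcard_allowed("extensions.intelligence.>", "my_ext")` is false, while the fully specified `extensions.intelligence.mcp` passes this check.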
User Interface
Extensions add UI components by registering resources, mapping them to routes, and adding menu items to the Symphony navigation. For detailed documentation on each resource type, see User Interfaces.
Resources
A resource is a named piece of UI content identified by a URI and a MIME type:
ext.add_resource("ui://my_ext/home", "text/symphony-jsx", page_content)
ext.add_resource("ui://my_ext/common", "text/symphony-module", shared_code)
ext.add_resource("ui://my_ext/dashboard", "text/html+symphony", html_content)
ext.add_htmx_resource("ui://my_ext/table", "pages/table")
ext.add_resource("ui://my_ext/help", "text/markdown", help_content)
Module resources (text/symphony-module) are not rendered directly—they
are imported by text/symphony-jsx resources using
import { ... } from '@symphony/extension/my_ext/common'.
HTMX resources (text/html+htmx) use a JSON config instead of HTML
content—Symphony provides the HTMX shell with auth injection and
theme sync. See HTML + HTMX for
details.
See User Interfaces for a comparison of the available MIME types and when to use each one.
Routes
Routes map URL paths to resources so they appear as navigable pages:
ext.add_route("/my_ext", "ui://my_ext/home")
Nested routes create a hierarchy of pages under a parent path:
ext.add_route("/my_ext", "ui://my_ext/home", routes={
    "/settings": {"uri": "ui://my_ext/settings"},
    "/detail": {"uri": "ui://my_ext/detail"}
})
Menus
Menu items appear in the Symphony navigation sidebar:
ext.add_menu("main", "My Extension", "/my_ext", "fa-puzzle-piece")
ext.add_menu("tools", "Data Tools", "/my_ext/tools", "fa-wrench")
The parameters are the group name, display label, target route, and a FontAwesome icon class.
Widgets
Widgets are UI components available for use on the Symphony dashboard:
ext.add_widget("ui://my_ext/status", "System Status", "Real-time status")
Help Pages
Extension help pages should use the text/markdown MIME type and be
routed under /help/extensions/<ExtensionName>. See
Markdown Resources for details on
frontmatter, template variables, and conventions.
import os
help_path = os.path.join(os.path.dirname(__file__), "pages", "help.md")
with open(help_path) as f:
    help_content = f.read()
ext.add_resource("ui://my_ext/help", "text/markdown", help_content)
ext.add_route("/help/extensions/MyExtension", "ui://my_ext/help")
Microservice Endpoints
Endpoints expose functionality as NATS microservices that can be invoked by other extensions, the Symphony REST API, and the Symphony UI.
Adding Endpoints
Endpoints must be created inside the async with block after the
connection is established:
async with ext:
    if ext.token:
        endpoints = await ext.add_endpoints(
            name="MyService",
            version="1.0.0",
            description="My extension service"
        )
        group = endpoints.add_group(name="cirata.extensions.my_ext")
        await group.add_endpoint(name="process", handler=process_handler)
        await group.add_endpoint(name="query", handler=query_handler)
    await ext.operate()
The endpoint group name determines the NATS subject prefix. An endpoint
named process in the group cirata.extensions.my_ext listens on the
subject cirata.extensions.my_ext.process.
Handler Functions
Handlers are async functions that receive a request object and must respond:
async def process_handler(req):
    """Process incoming data and return a result."""
    try:
        # req.data contains the request payload as bytes
        input_data = req.data.decode('utf-8')
        result = f"Processed: {input_data}"
        return await req.respond(result.encode('utf-8'))
    except Exception as e:
        error = f"Error: {str(e)}"
        return await req.respond(error.encode('utf-8'))
JSON Request/Response
Most handlers work with JSON-encoded data:
import json
async def query_handler(req):
    """Handle a JSON query and return JSON results."""
    try:
        request = json.loads(req.data.decode('utf-8'))
        filter_value = request.get('filter', 'all')
        results = await fetch_data(filter_value)
        response = json.dumps({
            "status": "success",
            "count": len(results),
            "items": results
        })
        return await req.respond(response.encode('utf-8'))
    except json.JSONDecodeError:
        error = json.dumps({"error": "Invalid JSON"})
        return await req.respond(error.encode('utf-8'))
    except Exception as e:
        error = json.dumps({"error": str(e)})
        return await req.respond(error.encode('utf-8'))
Every handler must call req.respond(). If a handler fails to respond,
the caller will time out waiting for a reply.
OpenAPI Metadata
To expose services as REST APIs through the Symphony server, include an OpenAPI specification in the endpoint metadata:
import json
with open("openapi.json") as f:
    openapi_spec = f.read()
endpoints = await ext.add_endpoints(
    name="MyService",
    version="1.0.0",
    metadata={"openapi": openapi_spec},
    description="My extension service"
)
When an OpenAPI specification is present, Symphony routes HTTP requests received at the server to the corresponding NATS endpoint based on the path definitions in the specification.
REST API Response Format
When endpoints are exposed as REST APIs, handlers can respond in two ways:
Raw response (simplest)—respond with the content directly.
Symphony auto-detects the content type: valid JSON is served as
application/json, anything else as text/plain.
async def list_items(req):
    items = await fetch_items()
    await req.respond(json.dumps(items).encode("utf-8"))
async def ping(req):
    await req.respond(b"pong")
Wrapped response (full control)—for control over the HTTP status
code, content type, or response headers, wrap the response in a JSON
object with a body field:
async def create_item(req):
    request_data = json.loads(req.data.decode("utf-8"))
    item = await store_item(request_data)
    response = json.dumps({
        "body": {"id": item["id"], "status": "created"},
        "statusCode": 201,
        "contentType": "application/json"
    })
    await req.respond(response.encode("utf-8"))
| Field | Required | Description |
|---|---|---|
| body | Yes | The response payload. For JSON content types, any JSON value. For other types (e.g. text/plain), a string. |
| statusCode | No | HTTP status code (100–599). Defaults to 200. |
| contentType | No | MIME type. Defaults to application/json. |
| headers | No | Additional HTTP headers as a string-to-string object. |
Both camelCase (statusCode, contentType) and lowercase
(statuscode, contenttype) field names are accepted.
Symphony detects the format by checking whether the response is a
JSON object with a body key. Both formats work identically whether
the endpoint is called through the REST API or directly over NATS.
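The detection rules can be summarised in code. The sketch below is an approximation written from the description in this section, not Symphony's implementation; `HttpReply` and `classify_response` are hypothetical names.

```python
import json
from typing import Any, Dict, NamedTuple


class HttpReply(NamedTuple):
    status: int
    content_type: str
    headers: Dict[str, str]
    body: Any


def classify_response(payload: bytes) -> HttpReply:
    """Interpret a handler reply as raw text, raw JSON, or a wrapped response."""
    text = payload.decode("utf-8")
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON at all: raw text reply.
        return HttpReply(200, "text/plain", {}, text)
    if isinstance(parsed, dict) and "body" in parsed:
        # Wrapped reply: accept camelCase and lowercase field names.
        status = parsed.get("statusCode", parsed.get("statuscode", 200))
        ctype = parsed.get(
            "contentType", parsed.get("contenttype", "application/json")
        )
        return HttpReply(status, ctype, parsed.get("headers", {}), parsed["body"])
    # Valid JSON without a body key: raw JSON reply.
    return HttpReply(200, "application/json", {}, parsed)
```

One consequence of the documented rule is worth noting: a raw JSON object that happens to contain a top-level body key is read as a wrapped response, so such payloads are safer sent in the wrapped form.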
Storage
Extensions can use JetStream key-value stores for persistent data.
Buckets are declared during initialization and accessed inside the
async with block.
Declaring Buckets
ext.add_bucket("my_ext_data", "Extension data", ttl=0)
ext.add_bucket("my_ext_cache", "Temporary cache", ttl=300)
# With lifecycle properties
ext.add_bucket(
    "my_ext_events",
    "Event log",
    ttl=86400,
    max_history_per_key=10,
    max_bucket_size=1073741824  # 1 GB
)
| Parameter | Description |
|---|---|
| ttl | Time-to-live in seconds. 0 means entries do not expire. |
| max_history_per_key | Maximum number of historical revisions retained per key. 0 or omitted uses the server default (typically 1). |
| max_bucket_size | Maximum total size of the bucket in bytes. When exceeded, the oldest entries are discarded. 0 or omitted means no limit. |
The platform creates (or updates) buckets automatically when the extension registers, so you do not need to create them manually. Re-registering with changed parameters updates the bucket configuration without losing existing data. Declared buckets inherit the cluster's replica factor (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone) — no action required.
If you create buckets at runtime rather than via add_bucket, fetch
the current replica count from Symphony and pass it to
create_key_value explicitly. Otherwise nats-py defaults to R=1
and the bucket is not replicated:
import json
msg = await ext.nc.request("cirata.services.cluster.info", b"", timeout=2)
replicas = json.loads(msg.data).get("replicas", 1)
await ext.js.create_key_value(
bucket="runtime_data",
description="Created on demand",
replicas=replicas,
)
Key-Value Operations
async def storage_example(ext):
    js = ext.js
    kv = await js.key_value("my_ext_data")
    # Store a value
    await kv.put("config:theme", b'{"mode": "dark"}')
    # Retrieve a value
    entry = await kv.get("config:theme")
    value = json.loads(entry.value.decode('utf-8'))
    # List all keys
    keys = await kv.keys()
    for key in keys:
        print(f"Key: {key}")
    # Delete a key
    await kv.delete("config:theme")
Batch Write
When writing many keys at once, use the kv_batch utility to publish
all entries concurrently instead of calling put() in a loop. Each
individual put() is a synchronous round-trip; the batch API fires all
publishes via asyncio.gather and collects acks in parallel.
from cirata.symphony.kv_batch import put_all
entries = {
    item["id"]: json.dumps(item).encode("utf-8")
    for item in items
}
js = ext.js
result = await put_all(js, "my_ext_data", entries)
print(f"Batch write: {result.succeeded} succeeded, "
      f"{result.failed} failed out of {result.total}")
for key, error in result.errors.items():
    print(f"Failed key '{key}': {error}")
The batch API never fails fast—a single bad key does not prevent the
remaining entries from being written. BatchPutResult.errors maps
failed key names to their exceptions. Values written this way are fully
readable via standard kv.get().
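The "never fails fast" behaviour follows from how asyncio.gather treats exceptions when return_exceptions=True. The sketch below shows the general pattern against an in-memory stand-in; it is not the implementation of put_all, and `put_concurrently`, `BatchOutcome`, and `InMemoryKV` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Dict, NamedTuple


class BatchOutcome(NamedTuple):
    succeeded: int
    failed: int
    errors: Dict[str, Exception]


async def put_concurrently(kv, entries: Dict[str, bytes]) -> BatchOutcome:
    """Issue every put at once and collect per-key failures."""
    keys = list(entries)
    results = await asyncio.gather(
        *(kv.put(key, entries[key]) for key in keys),
        return_exceptions=True,  # keep going when one put raises
    )
    errors = {k: r for k, r in zip(keys, results) if isinstance(r, Exception)}
    return BatchOutcome(len(keys) - len(errors), len(errors), errors)


class InMemoryKV:
    """Stand-in for a key-value bucket, used only to exercise the sketch."""

    def __init__(self):
        self.data = {}

    async def put(self, key: str, value: bytes) -> None:
        if " " in key:
            raise ValueError(f"invalid key: {key!r}")
        self.data[key] = value
```

Because gather returns results in the order the awaitables were passed, zipping them with the key list is enough to attribute each failure to its key.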
JSON Storage Helpers
A common pattern is to wrap key-value operations with JSON serialization:
import json
async def store_json(kv, key, data):
    await kv.put(key, json.dumps(data).encode('utf-8'))
async def get_json(kv, key):
    entry = await kv.get(key)
    return json.loads(entry.value.decode('utf-8'))
Usage Reporting
Extensions that consume licensed resources should report their usage
to Symphony for licensing and chargeback. The report_usage
method sends dimension measurements to Symphony, which converts
them into license units using the rates embedded in the active license.
Reporting Usage
response = await ext.report_usage({
"tables_replicated": 5,
"bytes_transferred_gb": 120.5,
})
The dimensions dictionary contains named measurements specific to
your extension. The keys must match the dimension names defined in the
license's unit rates for your extension prefix. If any dimension key
is not covered by a license rate, Symphony will add your extension
to the disabled_extensions list (per-extension enforcement).
Response
The response is a dictionary containing:
| Field | Description |
|---|---|
| result | "ok" on success |
| units | The computed license units consumed by this report |
| enforcement | Current enforcement state object |
Checking Enforcement State
The enforcement state in the response tells your extension whether it should continue operating:
import logging
response = await ext.report_usage({"queries_executed": 100})
enforcement = response.get("enforcement", {})
status = enforcement.get("status", "enforcement_ok")
if status == "enforcement_enforced":
    logging.warning("License enforcement active; pausing operations")
    # Stop or reduce licensed operations
Subscribing to Enforcement State
In addition to checking the enforcement state from each report_usage
response, your extension can subscribe to the enforcement broadcast
subject to receive updates in real time—including changes triggered
by other extensions, license uploads, or administrative actions:
import json
import logging
ENFORCEMENT_SUBJECT = "cirata.symphony.enforcement"
async def on_enforcement(msg):
    state = json.loads(msg.data)
    status = state.get("status", "enforcement_ok")
    message = state.get("message", "")
    if status == "enforcement_enforced":
        logging.warning("Enforcement active: %s; pausing operations", message)
        # Stop or reduce licensed operations
    elif status == "enforcement_warning":
        logging.info("Enforcement warning: approaching license limit")
# Subscribe during extension setup
sub = await ext.conn.subscribe(ENFORCEMENT_SUBJECT, cb=on_enforcement)
You can also fetch the current enforcement state on demand without reporting usage—for example, when your extension first starts:
reply = await ext.conn.request(
"cirata.services.usage.enforcement",
json.dumps({}).encode("utf-8"),
timeout=5,
)
state = json.loads(reply.data)
logging.info("Current enforcement status: %s", state.get("status"))
Enforcement States
| Status | Meaning |
|---|---|
| enforcement_ok | Normal operation; units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within vendor-set grace period |
| enforcement_enforced | Grace period expired or no valid license; stop licensed operations |
Extensions may also be individually disabled via the disabled_extensions field, or flagged as grace-only via the grace_extensions field, even when the global status is enforcement_ok. Check both lists for your extension prefix to determine your operational state.
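Combining the global status with the two per-extension lists might look like the sketch below. It assumes that disabled_extensions and grace_extensions are lists of extension prefixes in the enforcement state object; `operational_state` and its return labels ("run", "grace", "stop") are hypothetical and exist only for illustration.

```python
from typing import Any, Dict


def operational_state(state: Dict[str, Any], prefix: str) -> str:
    """Return 'run', 'grace', or 'stop' for the given extension prefix."""
    status = state.get("status", "enforcement_ok")
    disabled = state.get("disabled_extensions") or []
    grace = state.get("grace_extensions") or []
    # The strictest applicable condition wins.
    if status == "enforcement_enforced" or prefix in disabled:
        return "stop"
    if status == "enforcement_grace" or prefix in grace:
        return "grace"
    # enforcement_ok and enforcement_warning both allow normal operation.
    return "run"
```

The same function can be applied to the enforcement object from a report_usage response and to messages received on the broadcast subject.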
Attribution
When an extension reports usage, Symphony checks whether any
attribution rules
match the extension prefix and reporting account. If a rule matches,
the usage record is automatically assigned to a business unit with
attribution method auto. Administrators can also manually attribute
records or override automatic attributions via the Attribution UI.
When to Report
Report usage at meaningful checkpoints—for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation if the volume is very high; instead, batch measurements and report periodically.
# Example: report after each replication batch
async def replicate_batch(ext, tables):
    results = await do_replication(tables)
    response = await ext.report_usage({
        "tables_replicated": len(results),
        "bytes_transferred_gb": sum(r.bytes for r in results) / 1e9,
    })
    # Check enforcement state after reporting
    enforcement = response.get("enforcement", {})
    if enforcement.get("status") == "enforcement_enforced":
        logging.warning("License limit reached; stopping replication")
        return
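For high-volume operations, one way to batch measurements is to accumulate them locally and flush on an interval. The sketch below is an assumption about how this could be structured, not part of the SDK: `UsageAccumulator` and `flush_periodically` are hypothetical names, and the `report` callable is expected to behave like ext.report_usage.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict, Optional


class UsageAccumulator:
    """Sum dimension measurements locally and report them in one call."""

    def __init__(self, report: Callable[[Dict[str, float]], Awaitable[dict]]):
        self._report = report  # e.g. ext.report_usage
        self._pending: Dict[str, float] = defaultdict(float)

    def add(self, dimension: str, amount: float) -> None:
        self._pending[dimension] += amount

    async def flush(self) -> Optional[dict]:
        """Send accumulated totals; return the response, or None if empty."""
        if not self._pending:
            return None
        batch, self._pending = dict(self._pending), defaultdict(float)
        try:
            return await self._report(batch)
        except Exception:
            # Put the totals back so a failed report is not lost.
            for dimension, amount in batch.items():
                self._pending[dimension] += amount
            raise


async def flush_periodically(acc: UsageAccumulator, interval_s: float) -> None:
    """Flush on a fixed interval until the task is cancelled."""
    while True:
        await asyncio.sleep(interval_s)
        await acc.flush()
```

The response returned by flush() can be inspected for the enforcement state in the same way as a direct report_usage call.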
See Usage Tracking for how administrators view and manage reported usage.
Dynamic Updates
Extensions can update their resources at runtime without restarting. This is useful for dashboards that reflect changing state:
async def update_status_page(ext, new_content):
    await ext.update_resource(
        "ui://my_ext/status",
        "text/symphony-jsx",
        new_content
    )
The update_resource method replaces the resource content and publishes
an extension status update so that Symphony reflects the changes
immediately.
Inter-Extension Communication
Extensions can invoke services provided by other extensions through the client interface:
async with ext:
    if ext.token:
        client = ext.common.client
        # Request-reply to another extension's service
        response = await client.request(
            subject="cirata.extensions.other.process",
            payload=json.dumps({"input": "data"}).encode('utf-8'),
            timeout=5.0
        )
        result = json.loads(response.data.decode('utf-8'))
        # Fire-and-forget event notification
        await client.publish(
            subject="extensions.my_ext.completed",
            payload=json.dumps({"job_id": "abc123"}).encode('utf-8')
        )
NATS Request Timeouts
The timeout parameter is optional and defaults to 0.5 seconds if omitted.
The default is intentionally short—asyncio.TimeoutError will be raised for
any call that takes more than half a second. Always set an explicit value
appropriate to the expected latency of the service you are calling.
Timeout parameter
response = await client.request(
subject="cirata.extensions.other_ext.query",
payload=json.dumps({"filter": "active"}).encode('utf-8'),
timeout=5.0 # seconds; optional, defaults to 0.5
)
result = json.loads(response.data.decode('utf-8'))
Outbound requests from inside a handler
Handlers have access to the NATS connection via req.nc, which can be used to
call other services:
import asyncio
import json
import nats.errors
async def enrich_handler(req):
    """Calls another service and merges the result."""
    try:
        # Make an outbound request to another extension's service
        reply = await req.nc.request(
            "cirata.extensions.catalog.lookup",
            req.data,
            timeout=5.0
        )
        enriched = json.loads(reply.data.decode('utf-8'))
        return await req.respond(json.dumps(enriched).encode('utf-8'))
    except nats.errors.NoRespondersError:
        # No service is listening on that subject
        error = json.dumps({"error": "catalog service unavailable"})
        return await req.respond(error.encode('utf-8'))
    except asyncio.TimeoutError:
        # Service did not respond within the timeout window
        error = json.dumps({"error": "catalog service timed out"})
        return await req.respond(error.encode('utf-8'))
Choosing a timeout value
| Scenario | Suggested timeout |
|---|---|
| In-process KV or NATS round-trip | 2–3 s |
| Extension-to-extension service call | 5 s |
| Long-running computation | 30–60 s |
| Fire-and-forget (no reply needed) | use publish() instead |
Handlers that call other services with req.nc.request() must always handle
asyncio.TimeoutError and nats.errors.NoRespondersError, or the caller will
receive a timeout when the inner call stalls. The outer request timeout is
always shorter than it looks—Symphony routes the HTTP request to the
extension over NATS, so the total round-trip budget includes both the inbound
and the outbound leg.
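One way to keep the inner call inside the outer budget is to track a deadline and derive each outbound timeout from what remains. This sketch assumes the handler knows, or conservatively estimates, the caller's timeout, since the source does not say that the request carries it. `Deadline` and `call_within` are hypothetical helpers, not SDK APIs.

```python
import asyncio
import time


class Deadline:
    """Track how much of an overall time budget is left."""

    def __init__(self, budget_s: float):
        self._expires = time.monotonic() + budget_s

    def remaining(self, reserve_s: float = 0.0) -> float:
        """Seconds left after holding back reserve_s for our own reply."""
        return max(0.0, self._expires - time.monotonic() - reserve_s)


async def call_within(deadline: Deadline, make_call, reserve_s: float = 0.5):
    """Run an outbound call, capped so the handler can still answer in time.

    make_call is a callable that takes a timeout in seconds and returns
    an awaitable, for example a lambda wrapping req.nc.request(...).
    """
    timeout = deadline.remaining(reserve_s)
    if timeout <= 0:
        raise asyncio.TimeoutError("no budget left for outbound call")
    return await make_call(timeout)
```

Inside a handler this could be used as `reply = await call_within(deadline, lambda t: req.nc.request(subject, req.data, timeout=t))`, with the existing asyncio.TimeoutError branch handling the exhausted-budget case.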
Error Handling
Extension Lifecycle
Wrap the extension lifecycle in a try/except to handle initialization failures:
async def main():
    ext = symphony.Extension("My Extension", "my_ext")
    # ... configure capabilities, resources, etc.
    try:
        async with ext:
            if ext.token:
                # ... add endpoints
                pass
            await ext.operate()
    except RuntimeError as e:
        logging.error(f"Extension failed: {e}")
    except KeyboardInterrupt:
        logging.info("Shutdown requested")
Handler Errors
Always handle exceptions in endpoint handlers and respond with an appropriate error. A handler that raises an unhandled exception will not send a response, causing the caller to time out.
async def safe_handler(req):
    try:
        data = json.loads(req.data.decode('utf-8'))
        result = process(data)
        return await req.respond(
            json.dumps({"status": "success", "result": result}).encode('utf-8')
        )
    except json.JSONDecodeError:
        return await req.respond(
            json.dumps({"error": "Invalid JSON input"}).encode('utf-8')
        )
    except Exception as e:
        return await req.respond(
            json.dumps({"error": str(e)}).encode('utf-8')
        )
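When many handlers share the same error policy, the try/except can be factored into a decorator. The following is a minimal sketch, assuming only that the request object exposes `data` and an awaitable `respond()` as shown above. `always_respond` and `FakeRequest` are hypothetical names; `FakeRequest` exists only so the example runs without a Symphony connection.

```python
import asyncio
import functools
import json


def always_respond(handler):
    """Wrap a handler so the caller gets a JSON error instead of a timeout."""

    @functools.wraps(handler)
    async def wrapper(req):
        try:
            return await handler(req)
        except Exception as exc:
            # Assumes the handler raised before it called req.respond().
            body = json.dumps({"error": str(exc)}).encode("utf-8")
            return await req.respond(body)

    return wrapper


class FakeRequest:
    """Minimal stand-in for a request object, for local testing only."""

    def __init__(self, data: bytes):
        self.data = data
        self.replies = []

    async def respond(self, payload: bytes) -> None:
        self.replies.append(payload)


@always_respond
async def parse_handler(req):
    value = json.loads(req.data.decode("utf-8"))
    reply = json.dumps({"status": "success", "result": value})
    await req.respond(reply.encode("utf-8"))


# Exercise the error path locally: invalid JSON still produces a reply.
demo = FakeRequest(b"not json")
asyncio.run(parse_handler(demo))
print(demo.replies[0].decode("utf-8"))
```

The decorator trades per-exception messages for uniformity; handlers that need distinct responses for specific errors can still catch those themselves before the wrapper sees them.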
Logging
Extensions should use the SDK's configure_logging() helper for
structured JSON output to stdout. Call it once at module level,
before any other logging:
from cirata.symphony.logging import configure_logging
configure_logging("myext")
This configures the root Python logger to emit JSON with consistent
fields (timestamp, level, extension, message, trace_id,
span_id) to stdout — captured automatically by journald or the
container runtime. Do not create log files or call
logging.basicConfig().
Use standard logging calls throughout your extension:
import logging
logger = logging.getLogger("myext")
logger.info("Processing batch", extra={"items": 42})
logger.error("Query failed", exc_info=True)
Observability
Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.
from cirata.symphony.observability import Observability, SpanKind
async with ext:
    svc = await ext.add_endpoints("myext", "1.0")
    obs = await Observability.enable(ext, svc, "myext")
    # Feed logs into the OTLP pipeline (in addition to stdout)
    import logging
    logging.getLogger().addHandler(obs.logs)
    # Instrument handlers with automatic trace propagation
    @obs.instrumented_handler("process")
    async def handle_process(msg):
        obs.increment_counter("requests.total")
        result = process(msg.data)
        await msg.respond(result)
Adding obs.logs as a handler on the root logger means every log
call flows to both stdout (via configure_logging) and the OTLP
buffer (via obs.logs). Do not call logger.setLevel() after
configure_logging() — the level is already set.
For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.
Multi-Instance Support
Python extensions can run as multiple instances of the same extension type for scalability, fault tolerance, and workload distribution. The SDK provides built-in support for work partitioning, queue-based command delivery, and instance-specific messaging.
Automatic Load Balancing
Microservice endpoints registered with add_endpoints are
automatically load-balanced across all instances by NATS micro. No
code changes are needed—deploying additional instances immediately
distributes incoming service requests across them.
The SDK also uses queue subscribe for the commands channel, ensuring that each command is processed by exactly one instance.
WorkPartitioner
For extensions that run background tasks (watchers, monitors, replicators), enable work partitioning to distribute work items across instances using consistent hashing.
def on_rebalance(total_instances, my_item_count):
    logging.info(
        f"Rebalanced: {total_instances} instances active, "
        f"I own {my_item_count} items"
    )
ext.enable_work_partitioning(on_rebalance)
Use is_my_work to check whether the current instance should process
a given work item:
# In a bucket watcher loop
for key in discovered_items:
    if ext.work_partitioner.is_my_work(key):
        await process_item(key)
    else:
        logging.debug(f"Skipping {key}: owned by another instance")
The partitioner watches the status bucket for instance join and leave
events. When the set of active instances changes, the rebalance
callback fires and is_my_work results may change. Extensions should
re-evaluate their active work items on each watcher iteration rather
than caching ownership decisions.
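To see why ownership can change when instances join or leave, and why most items nonetheless stay put, consider the sketch below. It uses rendezvous (highest-random-weight) hashing, a technique related to consistent hashing, purely as an illustration. The source does not describe the SDK's actual partitioning algorithm, and `owner_of` and `is_my_work` here are hypothetical free functions, not the work_partitioner methods.

```python
import hashlib
from typing import Iterable


def _score(instance_id: str, key: str) -> int:
    """Deterministic pseudo-random weight for an (instance, key) pair."""
    digest = hashlib.sha256(f"{instance_id}:{key}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def owner_of(key: str, instances: Iterable[str]) -> str:
    """Pick the instance with the highest score for this key."""
    # Sorting first makes the result independent of input order.
    return max(sorted(instances), key=lambda inst: _score(inst, key))


def is_my_work(key: str, my_id: str, instances: Iterable[str]) -> bool:
    """True when my_id is the owner of key among the active instances."""
    return owner_of(key, instances) == my_id
```

With this scheme, removing an instance reassigns only the keys that instance owned, and every other key keeps its owner. Any ownership decision is still only valid for the instance set it was computed against.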
Instance-Specific Handlers
Register handlers that are targeted at a specific instance using
add_instance_handler. Each instance subscribes to
cirata.extensions.{prefix}.instance.{instance_id}.>, and only the
targeted instance receives the request:
async def instance_status(req):
    status = {
        "instance_id": ext.instance_id,
        "work_items": ext.work_partitioner.my_item_count,
    }
    await req.respond(json.dumps(status).encode("utf-8"))
ext.add_instance_handler("local-status", instance_status)
Callers can target a specific instance via the HTTP API by adding the
?instance=<id> query parameter to the service proxy URL.
Instance ID
The extension's instance ID is derived from the JWT identifier in the API token. Access it with:
instance_id = ext.instance_id
Use the instance ID to tag operational records in shared storage for debugging and traceability.
For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.
Complete Example
The following example demonstrates a complete Python extension with capabilities, UI, microservice endpoints, and storage:
"""
Copyright 2025 Cirata.
"""
import asyncio
import hashlib
import json
import logging
import os
from cirata import symphony
from cirata.symphony.logging import configure_logging
configure_logging("catalog")
# --- Endpoint Handlers ---
async def list_handler(req):
    """Return all items from storage."""
    try:
        kv = await req.nc.jetstream().key_value("catalog_items")
        items = []
        for key in await kv.keys():
            entry = await kv.get(key)
            items.append(json.loads(entry.value.decode('utf-8')))
        return await req.respond(json.dumps(items).encode('utf-8'))
    except Exception as e:
        return await req.respond(
            json.dumps({"error": str(e)}).encode('utf-8')
        )
async def add_handler(req):
    """Add an item to storage."""
    try:
        item = json.loads(req.data.decode('utf-8'))
        # Fall back to a content-derived ID. The built-in hash() is salted
        # per process for strings, so it would differ between runs.
        digest = hashlib.sha256(
            json.dumps(item, sort_keys=True).encode('utf-8')
        ).hexdigest()[:16]
        item_id = item.get('id', digest)
        kv = await req.nc.jetstream().key_value("catalog_items")
        await kv.put(f"item:{item_id}", json.dumps(item).encode('utf-8'))
        return await req.respond(
            json.dumps({"status": "created", "id": item_id}).encode('utf-8')
        )
    except Exception as e:
        return await req.respond(
            json.dumps({"error": str(e)}).encode('utf-8')
        )
# --- UI Content ---
page_content = """
import { useContext, useEffect, useState } from 'react'
import { SymphonyContext } from '@symphony'
const TIMEOUT = 5000
export default function CatalogPage() {
  const symphonyContext = useContext(SymphonyContext)
  const [items, setItems] = useState([])
  useEffect(() => {
    symphonyContext.setTitle({
      "/": "Home",
      "/catalog": "Catalog"
    })
    loadItems()
  }, [])
  const loadItems = async () => {
    if (symphonyContext.nc === undefined) return
    try {
      const reply = await symphonyContext.nc.request(
        'cirata.extensions.catalog.list',
        '',
        { timeout: TIMEOUT }
      )
      const text = new TextDecoder('utf-8').decode(reply.data)
      setItems(JSON.parse(text))
    } catch (err) {
      console.error('Failed to load items:', err)
    }
  }
  return (
    <div>
      <h1>Catalog</h1>
      <button onClick={loadItems}>Refresh</button>
      <ul>
        {items.map((item, i) => (
          <li key={i}>{item.name || JSON.stringify(item)}</li>
        ))}
      </ul>
    </div>
  )
}
"""
# --- Main ---
async def main():
    ext = symphony.Extension(
        name="Catalog",
        prefix="catalog",
        description="Manages a catalog of items"
    )
    # Capabilities
    ext.add_capability("sub", "extensions.catalog", "Catalog Extension")
    ext.add_capability("sub", "extensions.catalog.list", "List items")
    ext.add_capability("sub", "extensions.catalog.add", "Add items")
    if ext.token:
        # UI resources and navigation
        ext.add_resource("ui://catalog/home", "text/symphony-jsx", page_content)
        ext.add_route("/catalog", "ui://catalog/home")
        ext.add_menu("main", "Catalog", "/catalog", "fa-book")
        # Help page
        help_path = os.path.join(os.path.dirname(__file__), "pages", "help.md")
        if os.path.exists(help_path):
            with open(help_path) as f:
                ext.add_resource("ui://catalog/help", "text/markdown", f.read())
            ext.add_route("/help/extensions/Catalog", "ui://catalog/help")
        # Storage buckets
        ext.add_bucket("catalog_items", "Catalog item storage")
    # Run extension
    try:
        async with ext:
            if ext.token:
                endpoints = await ext.add_endpoints(
                    name="catalog",
                    version="1.0.0",
                    description="Catalog service"
                )
                group = endpoints.add_group(name="cirata.extensions.catalog")
                await group.add_endpoint(name="list", handler=list_handler)
                await group.add_endpoint(name="add", handler=add_handler)
            await ext.operate()
    except RuntimeError as e:
        logging.error(f"Unable to initialize: {e}")
if __name__ == '__main__':
    asyncio.run(main())