Go Extension Development
Cirata Symphony provides a Go library for developing extensions. Go extensions are well suited to high-performance workloads, infrastructure tooling, and environments where a single compiled binary with minimal dependencies is preferred.
Getting Started
Installation
The Symphony Go library is a single Go module, cirata.com/symphony,
containing the extension, client, types, and shared packages.
Download the source archive from the
Languages and Libraries page, extract it,
and point your module at the extracted directory with a single replace
directive:
# Extract the library into your module (tar -C needs the directory to exist)
mkdir -p third_party/cirata-symphony
tar xf cirata-symphony-latest-go-src.tar -C third_party/cirata-symphony
// v0.0.0 is a placeholder — the replace directive below resolves the
// module from the extracted directory, so the version is not used.
require cirata.com/symphony v0.0.0
replace cirata.com/symphony => ./third_party/cirata-symphony
Then import the packages you need:
import (
"cirata.com/symphony/extension"
"cirata.com/symphony/client"
"cirata.com/symphony/types"
)
One replace directive covers every package, because they all belong to
the single cirata.com/symphony module. Once the module is published to a
Go proxy, go get cirata.com/symphony@latest resolves it directly, and
neither the archive nor the replace directive is needed. To pin a specific
release, request its version instead, e.g. go get cirata.com/symphony@v1.4.9.
The library requires Go 1.25 or later and depends on the NATS Go client
(github.com/nats-io/nats.go) and the NATS micro package for microservice
support.
Project Structure
A Go extension is typically a single module containing a main package and any supporting files:
my_extension/
├── main.go # Extension entry point
├── handlers.go # Optional: microservice handler functions
├── pages/ # Optional: UI template files
│ ├── home.tsx
│ └── help.md
├── go.mod # Module definition
└── go.sum # Dependency checksums
Minimal Extension
The simplest possible extension connects to Symphony and publishes its availability:
package main
import (
"log"
"os"
"cirata.com/symphony/extension"
)
func main() {
token := os.Getenv("SYMPHONY_TOKEN")
ext := extension.NewExtension(
"Hello World", // name
"hello", // prefix
"A simple Go extension", // description
token, // token
)
if err := ext.Run(); err != nil {
log.Fatalf("error: %v", err)
}
}
This extension will register with Symphony and remain connected until stopped with SIGINT or SIGTERM. If no token is available, the extension logs an error with instructions and exits.
Extension Initialization
NewExtension
The NewExtension function creates an extension instance:
ext := extension.NewExtension(
name, // Human-readable name displayed in the UI
prefix, // Unique prefix for NATS subjects and routes
description, // Description of the extension's purpose
token, // API token (empty string falls back to the credential store and environment)
)
| Parameter | Required | Description |
|---|---|---|
| name | Yes | Human-readable name displayed in the Symphony UI |
| prefix | Yes | Unique identifier for NATS subjects and routes |
| description | No | Description of the extension's purpose |
| token | No | API token (empty string falls back to environment and credential store) |
After creation, use the builder methods to add capabilities,
features, and services to the extension, then start it with Run().
Authentication
Extensions authenticate with a token obtained from an API Key. The token is resolved in the following order of precedence:
- The token parameter in NewExtension
- The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux), populated by a previous provisioning run
- The SYMPHONY_TOKEN environment variable (also checked by NewClient)
- The REGISTRATION_TOKEN environment variable (one-time provisioning)
When REGISTRATION_TOKEN is set, the extension connects, automatically
provisions a permanent API key with the extension's declared
capabilities, and stores it in the OS credential store (or
~/.config/cirata/ as a fallback). Subsequent runs use the stored
token automatically with no interaction required.
If SYMPHONY_TOKEN contains a registration token (detected by its JWT
capabilities), it is treated as a registration token and provisioned
in the same way.
If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
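The precedence described above can be pictured as a simple first-match lookup. The sketch below is not SDK code: resolveToken is a hypothetical helper, the credential store is reduced to a plain string, and the environment is passed in as a map so the logic can be exercised without touching the real process environment.

```go
package main

import "fmt"

// resolveToken is a hypothetical helper (not part of the Symphony SDK) that
// mirrors the documented precedence. It returns the token and a label naming
// the source it came from; both are empty when nothing is configured.
func resolveToken(param, stored string, env map[string]string) (token, source string) {
	switch {
	case param != "":
		return param, "parameter"
	case stored != "":
		return stored, "credential store"
	case env["SYMPHONY_TOKEN"] != "":
		return env["SYMPHONY_TOKEN"], "SYMPHONY_TOKEN"
	case env["REGISTRATION_TOKEN"] != "":
		return env["REGISTRATION_TOKEN"], "REGISTRATION_TOKEN"
	}
	return "", ""
}

func main() {
	env := map[string]string{"SYMPHONY_TOKEN": "env-token"}
	token, source := resolveToken("", "", env)
	fmt.Printf("using %q from %s\n", token, source)
}
```

An empty result corresponds to the case where the extension logs an error and exits.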
Container and Kubernetes deployments
In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:
- Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
- Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.
Overriding the token-embedded address
When the hostname encoded in the token is not the right one to reach
Symphony from where the extension runs, set the
SYMPHONY_ADDRESS environment variable. It accepts a bare hostname,
host:port, or a full URL with an explicit http or https scheme.
See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Go, Python, Java, and Rust SDKs.
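To make the three accepted forms concrete, here is a rough sketch of how a value could be split into scheme, host, and port using only the standard library. parseAddress and the address type are hypothetical names, and the checks shown are assumptions for illustration; the SDK's actual validation rules are the ones in the SYMPHONY_ADDRESS reference.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
	"strings"
)

// address holds the parts recovered from an override value.
type address struct {
	Scheme string // "http", "https", or "" when not given
	Host   string
	Port   string // "" when not given
}

// parseAddress is a hypothetical helper that accepts a bare hostname,
// host:port, or a full http/https URL.
func parseAddress(raw string) (address, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return address{}, errors.New("address is empty")
	}
	// Full URL with an explicit scheme.
	if strings.Contains(raw, "://") {
		u, err := url.Parse(raw)
		if err != nil {
			return address{}, fmt.Errorf("invalid URL: %w", err)
		}
		if u.Scheme != "http" && u.Scheme != "https" {
			return address{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
		}
		if u.Hostname() == "" {
			return address{}, errors.New("URL has no host")
		}
		return address{Scheme: u.Scheme, Host: u.Hostname(), Port: u.Port()}, nil
	}
	// host:port.
	if host, port, err := net.SplitHostPort(raw); err == nil {
		if host == "" || port == "" {
			return address{}, errors.New("host and port must both be set")
		}
		return address{Host: host, Port: port}, nil
	}
	// Bare hostname.
	if strings.ContainsAny(raw, "/ ") {
		return address{}, fmt.Errorf("invalid hostname %q", raw)
	}
	return address{Host: raw}, nil
}

func main() {
	for _, raw := range []string{
		"symphony.example.com",
		"symphony.example.com:8443",
		"https://symphony.example.com:8443",
	} {
		a, err := parseAddress(raw)
		fmt.Printf("%-36s -> %+v (err: %v)\n", raw, a, err)
	}
}
```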
Lifecycle
The Run() method starts the extension and blocks until a shutdown
signal is received:
err := ext.Run()
Run() performs the following steps:
- Resolves a token from the credential store or environment (see Authentication above)
- Establishes a NATS connection to Symphony
- Registers the extension in the Symphony registry
- Starts periodic status publishing (every 30 seconds)
- Subscribes to the commands channel for runtime directives
- Blocks until SIGINT or SIGTERM is received
- Publishes a disconnect status and drains the connection
The status publishing loop prevents TTL expiration of the extension's entry in Symphony.
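The shape of this lifecycle (publish periodically, stop on a signal, then send a final disconnect) is a common Go pattern. The following is a standard-library sketch of that pattern only, not the SDK's implementation: runStatusLoop and runFor are hypothetical names, the publish callback stands in for the SDK's internal status publisher, and the demo adds a short timeout so it exits on its own.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runStatusLoop publishes a heartbeat on every tick until ctx is cancelled,
// then publishes a final disconnect status.
func runStatusLoop(ctx context.Context, interval time.Duration, publish func(status string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	publish("connected")
	for {
		select {
		case <-ctx.Done():
			publish("disconnected")
			return
		case <-ticker.C:
			publish("alive")
		}
	}
}

// runFor drives the loop until SIGINT/SIGTERM arrives or d elapses, and
// returns every status that was published.
func runFor(d, interval time.Duration) []string {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()

	var published []string
	runStatusLoop(ctx, interval, func(s string) { published = append(published, s) })
	return published
}

func main() {
	// The SDK publishes every 30 seconds; the demo uses milliseconds.
	fmt.Println(runFor(50*time.Millisecond, 10*time.Millisecond))
}
```

Because the heartbeat keeps refreshing the registry entry, an instance that dies without sending the final status simply ages out when the TTL expires.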
Capabilities
Capabilities declare what NATS subjects the extension needs to publish to or subscribe from. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.
// Subscribe capabilities: subjects the extension listens on
ext.AddCapability("sub", "extensions.my_ext", "My Extension")
ext.AddCapability("sub", "extensions.my_ext.process", "Process data")
ext.AddCapability("sub", "extensions.my_ext.query", "Query data")
// Publish capabilities: subjects the extension sends to
ext.AddCapability("pub", "extensions.my_ext.processed", "Data processed event")
| Parameter | Description |
|---|---|
| capType | "pub" for publish or "sub" for subscribe |
| key | NATS subject pattern |
| description | Human-readable description |
Capability names use dot-separated hierarchies. Both subscribe and
publish capabilities follow the extensions.<prefix> convention.
Wildcard capabilities using > can allow a range of subjects. For
example, extensions.my_ext.> would cover all subjects under that
prefix.
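NATS wildcard semantics are easy to misread, so a small matcher helps show what a pattern does and does not cover. The sketch below implements the standard NATS rules (* matches exactly one token, > matches one or more trailing tokens); subjectMatches is a hypothetical helper written for illustration and is not part of the SDK or the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject is covered by pattern under NATS
// wildcard rules: "*" matches one token, ">" matches one or more tokens and
// is only valid as the last token.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	pattern := "extensions.my_ext.>"
	for _, subject := range []string{
		"extensions.my_ext.process",
		"extensions.my_ext.jobs.created",
		"extensions.my_ext",
		"extensions.other.process",
	} {
		fmt.Printf("%-32s %v\n", subject, subjectMatches(pattern, subject))
	}
}
```

Note that extensions.my_ext.> does not cover the bare subject extensions.my_ext, which is why the earlier example declares that subject as its own capability.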
User Interface
Extensions add UI components by registering resources, mapping them to routes, and adding menu items to the Symphony navigation. For detailed documentation on each resource type, see User Interfaces.
Resources
A resource is a named piece of UI content identified by a URI and a MIME type:
ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, pageContent)
ext.AddResource("ui://my_ext/common", types.TEXT_SYMPHONY_MODULE, sharedCode)
ext.AddResource("ui://my_ext/dashboard", types.TEXT_HTMLSYMPHONY, htmlContent)
ext.AddHtmxResource("ui://my_ext/table", "pages/table")
ext.AddResource("ui://my_ext/help", types.TEXT_MARKDOWN, helpContent)
Module resources (types.TEXT_SYMPHONY_MODULE) are imported by JSX
resources using import { ... } from '@symphony/extension/my_ext/common'.
HTMX resources use a JSON config instead of HTML content—Symphony provides the HTMX shell with auth injection and theme sync. See HTML + HTMX for details.
For resources that need additional metadata (name, description):
ext.AddResourceFull(
"ui://my_ext/app",
types.TEXT_HTMLPROFILEMCP_APP,
appHTML,
"My App", // name
"Interactive application", // description
)
See User Interfaces for a comparison of the available MIME types and when to use each one.
Including Content from Files
Use os.ReadFile to load UI content from external files:
content, err := os.ReadFile("pages/home.tsx")
if err != nil {
log.Fatalf("failed to read page: %v", err)
}
ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, string(content))
Or embed files at compile time with go:embed:
import _ "embed"
//go:embed pages/home.tsx
var homePage string
//go:embed pages/help.md
var helpPage string
ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, homePage)
ext.AddResource("ui://my_ext/help", types.TEXT_MARKDOWN, helpPage)
Routes
Routes map URL paths to resources so they appear as navigable pages:
ext.AddRoute("/my_ext", "ui://my_ext/home")
ext.AddRoute("/my_ext/dashboard", "ui://my_ext/dashboard")
Nested routes create a hierarchy of pages under a parent path:
ext.AddRouteWithChildren("/my_ext", "ui://my_ext/home", map[string]types.RouteMap{
"/settings": {Uri: "ui://my_ext/settings"},
"/detail": {Uri: "ui://my_ext/detail"},
})
Menus
Menu items appear in the Symphony navigation sidebar:
ext.AddMenu("main", "My Extension", "/my_ext", "fa-puzzle-piece")
ext.AddMenu("tools", "Data Tools", "/my_ext/tools", "fa-wrench")
| Parameter | Description |
|---|---|
| group | Menu group name (e.g., "main", "tools") |
| item | Display label for the menu entry |
| route | Target route path |
| icon | FontAwesome icon class |
Widgets
Register dashboard widgets for display in the widget picker:
ext.AddWidget("Status", "ui://my_ext/status")
With notes or a description:
ext.AddWidgetWithNotes("Status", "ui://my_ext/status", "Real-time status monitor")
Help Pages
Extension help pages should use the text/markdown MIME type and be
routed under /help/extensions/<ExtensionName>. See
Markdown Resources for details on
frontmatter, template variables, and conventions.
helpContent, err := os.ReadFile("pages/help.md")
if err != nil {
log.Fatalf("failed to read help: %v", err)
}
ext.AddResource("ui://my_ext/help", types.TEXT_MARKDOWN, string(helpContent))
ext.AddRoute("/help/extensions/MyExtension", "ui://my_ext/help")
Microservice Endpoints
Extensions expose functionality as NATS microservices using the
AddService method and the NATS micro package.
Creating a Service
AddService creates a NATS microservice and must be called after
Run() has established the connection. Because Run() blocks,
services are typically set up using a pattern that separates
configuration from execution:
package main
import (
"encoding/json"
"log"
"os"
"cirata.com/symphony/extension"
"cirata.com/symphony/types"
"github.com/nats-io/nats.go/micro"
)
func main() {
token := os.Getenv("SYMPHONY_TOKEN")
ext := extension.NewExtension(
"My Extension", "my_ext",
"Provides data processing", token,
)
// Configure capabilities, UI, and storage before Run()
ext.AddCapability("sub", "extensions.my_ext", "My Extension")
ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, pageContent)
ext.AddRoute("/my_ext", "ui://my_ext/home")
ext.AddMenu("main", "My Extension", "/my_ext", "fa-puzzle-piece")
ext.AddBucket("my_ext_data", "Extension data", 0)
if err := ext.Run(); err != nil {
log.Fatalf("error: %v", err)
}
}
For extensions that need microservice endpoints, use AddService
after the connection is established inside the extension lifecycle:
svc, err := ext.AddService("my_ext", "1.0.0")
if err != nil {
log.Fatalf("failed to add service: %v", err)
}
// Create a group for subject namespacing
group := svc.AddGroup("cirata.extensions.my_ext")
// Add endpoint handlers
err = group.AddEndpoint("process", micro.HandlerFunc(processHandler))
if err != nil {
log.Fatalf("failed to add endpoint: %v", err)
}
The group name determines the NATS subject prefix. An endpoint
named process in the group cirata.extensions.my_ext listens on
subject cirata.extensions.my_ext.process.
Handler Functions
Handlers receive a micro.Request and must always respond:
func processHandler(req micro.Request) {
var input map[string]interface{}
if err := json.Unmarshal(req.Data(), &input); err != nil {
req.Error("400", "invalid JSON", nil)
return
}
result := map[string]interface{}{
"status": "success",
"processed": true,
"input": input,
}
response, _ := json.Marshal(result)
req.Respond(response)
}
Key rules for handlers:
- Always respond. Every handler must call req.Respond() or req.Error(). A handler that does not respond causes the caller to time out.
- Return appropriate empty values. Use "{}" for single-object endpoints and "[]" for list endpoints.
- Handle errors gracefully. Use req.Error() for structured error responses or respond with an error payload.
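One way to enforce the "always respond" rule is to route every handler through a small wrapper that replies on every path, including a panic. The sketch below uses a local responder interface as a simplified stand-in for the two micro.Request methods involved (the real methods take extra optional arguments and return an error); alwaysRespond, marshalList, and recorder are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// responder is a local stand-in for the micro.Request methods used here.
type responder interface {
	Respond(data []byte)
	Error(code, description string)
}

// recorder is a fake request that records how many replies were sent.
type recorder struct {
	replies int
	last    string
}

func (r *recorder) Respond(data []byte) { r.replies++; r.last = string(data) }
func (r *recorder) Error(code, description string) {
	r.replies++
	r.last = code + ": " + description
}

// alwaysRespond runs fn and guarantees exactly one reply, even if fn panics.
func alwaysRespond(req responder, fn func() ([]byte, error)) {
	defer func() {
		if p := recover(); p != nil {
			req.Error("500", fmt.Sprintf("handler panicked: %v", p))
		}
	}()
	data, err := fn()
	if err != nil {
		req.Error("500", err.Error())
		return
	}
	req.Respond(data)
}

// marshalList encodes items for a list endpoint. A nil slice would marshal
// to "null", so it is mapped to "[]" explicitly.
func marshalList(items []map[string]string) []byte {
	if items == nil {
		return []byte("[]")
	}
	data, err := json.Marshal(items)
	if err != nil {
		return []byte("[]")
	}
	return data
}

func main() {
	req := &recorder{}
	alwaysRespond(req, func() ([]byte, error) { return marshalList(nil), nil })
	fmt.Println(req.replies, req.last)
}
```

The nil check in marshalList matters because encoding/json renders a nil slice as null rather than an empty array.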
Handler Patterns
JSON with typed structs:
type ProcessRequest struct {
Input string `json:"input"`
Mode string `json:"mode"`
}
type ProcessResponse struct {
Status string `json:"status"`
Result string `json:"result"`
}
func typedHandler(req micro.Request) {
var input ProcessRequest
if err := json.Unmarshal(req.Data(), &input); err != nil {
req.Error("400", "invalid request", nil)
return
}
result := ProcessResponse{
Status: "success",
Result: "processed: " + input.Input,
}
response, _ := json.Marshal(result)
req.Respond(response)
}
Returning a list:
func listHandler(req micro.Request) {
items := []map[string]interface{}{
{"id": "1", "name": "Item 1"},
{"id": "2", "name": "Item 2"},
}
response, err := json.Marshal(items)
if err != nil {
req.Respond([]byte("[]"))
return
}
req.Respond(response)
}
Outbound NATS Requests and Timeouts
Handlers can call other services using the NATS connection captured from the enclosing scope. All outbound requests require a timeout:
func makeEnrichHandler(nc *nats.Conn) micro.HandlerFunc {
return func(req micro.Request) {
// Make an outbound request to another extension's service
reply, err := nc.Request(
"cirata.extensions.catalog.lookup",
req.Data(),
5*time.Second,
)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
// No response received within the timeout window
req.Error("503", "catalog service timed out", nil)
} else if errors.Is(err, nats.ErrNoResponders) {
// No service is listening on that subject
req.Error("503", "catalog service unavailable", nil)
} else {
req.Error("500", err.Error(), nil)
}
return
}
req.Respond(reply.Data)
}
}
Pass the connection to the handler at setup time using a closure:
nc := ext.Client().GetSymphonyClient().GetNatsConnection()
group.AddEndpoint("enrich", makeEnrichHandler(nc))
For timeout control tied to a broader operation context, use
nc.RequestMsgWithContext:
func makeContextHandler(nc *nats.Conn) micro.HandlerFunc {
return func(req micro.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
reply, err := nc.RequestMsgWithContext(ctx, &nats.Msg{
Subject: "cirata.extensions.catalog.lookup",
Data: req.Data(),
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, nats.ErrTimeout) {
req.Error("503", "catalog service timed out", nil)
} else {
req.Error("500", err.Error(), nil)
}
return
}
req.Respond(reply.Data)
}
}
context.WithTimeout is preferable when the outbound call is one step in a
larger pipeline—cancelling the context cancels all downstream requests
immediately.
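The effect of sharing one context across several calls can be seen without NATS at all. In the sketch below, call simulates an outbound request with a given latency and runPipeline runs several of them under a single deadline; both are hypothetical helpers, and the simulated latency replaces a real nc.RequestMsgWithContext call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// call simulates one outbound request that takes latency to answer and
// gives up as soon as ctx is done.
func call(ctx context.Context, latency time.Duration) error {
	timer := time.NewTimer(latency)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runPipeline runs the calls in order under one shared deadline. Time spent
// in early steps is subtracted from what later steps may use.
func runPipeline(budget time.Duration, latencies ...time.Duration) string {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	for i, latency := range latencies {
		if err := call(ctx, latency); err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				return fmt.Sprintf("step %d timed out", i+1)
			}
			return fmt.Sprintf("step %d failed: %v", i+1, err)
		}
	}
	return "ok"
}

func main() {
	fmt.Println(runPipeline(2*time.Second, time.Millisecond, time.Millisecond))
	fmt.Println(runPipeline(20*time.Millisecond, time.Millisecond, time.Hour))
}
```

With a per-call timeout, each step would get its own full allowance; with a shared context, the whole pipeline is bounded by one budget.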
Choosing a timeout value:
| Scenario | Suggested timeout |
|---|---|
| KV read or single-hop NATS round-trip | 2–3 s |
| Extension-to-extension service call | 5 s |
| Long-running computation | 30–60 s |
| No reply expected | use nc.Publish() instead |
Error values to check:
| Error | Meaning |
|---|---|
| nats.ErrTimeout | No response within the timeout window |
| nats.ErrNoResponders | No service is subscribed to that subject |
| context.DeadlineExceeded | Context deadline fired before a reply |
nc.Request requires a timeout—there is no overload without one and no
package-level default. Passing 0 causes an immediate timeout. Always set a
value that reflects the expected latency of the service being called, so the
limit is explicit at the call site.
REST API Response Format
When endpoints are exposed as REST APIs (via an OpenAPI spec in the service metadata), handlers can respond in two ways:
Raw response (simplest)—respond with the content directly.
Symphony auto-detects the content type: valid JSON is served as
application/json, anything else as text/plain.
// JSON response
func listHandler(req micro.Request) {
items := []Item{{ID: "1", Name: "Widget"}}
data, _ := json.Marshal(items)
req.Respond(data)
}
// Plain text response
func pingHandler(req micro.Request) {
req.Respond([]byte("pong"))
}
Wrapped response (full control)—for control over the HTTP status
code, content type, or response headers, wrap the response in a JSON
object with a body field:
func createHandler(req micro.Request) {
var input CreateRequest
if err := json.Unmarshal(req.Data(), &input); err != nil {
errResp, _ := json.Marshal(map[string]interface{}{
"body": map[string]string{"error": err.Error()},
"statusCode": 400,
})
req.Respond(errResp)
return
}
item := createItem(input)
resp, _ := json.Marshal(map[string]interface{}{
"body": map[string]interface{}{"id": item.ID, "status": "created"},
"statusCode": 201,
"contentType": "application/json",
})
req.Respond(resp)
}
| Field | Required | Description |
|---|---|---|
| body | Yes | The response payload. For JSON content types, any JSON value. For other types (e.g. text/plain), a string. |
| statusCode | No | HTTP status code (100–599). Defaults to 200. |
| contentType | No | MIME type. Defaults to application/json. |
| headers | No | Additional HTTP headers as a string-to-string map. |
Symphony detects the format by checking whether the response is a
JSON object with a body key. Both formats work identically whether
the endpoint is called through the REST API or directly over NATS.
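The detection rules above can be summarised in a few lines of code. This is a reader's sketch of the documented behaviour, not Symphony's implementation: interpret, envelope, and httpReply are hypothetical names, and the "content type contains json" test used to decide whether to unquote the body is an assumption. Response headers are decoded but not applied here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// httpReply is what the REST layer would send back.
type httpReply struct {
	Status      int
	ContentType string
	Body        string
}

// envelope lists the wrapped-response fields from the table above.
type envelope struct {
	Body        json.RawMessage   `json:"body"`
	StatusCode  int               `json:"statusCode"`
	ContentType string            `json:"contentType"`
	Headers     map[string]string `json:"headers"`
}

// interpret applies the documented rules: a JSON object with a body key is
// a wrapped response; otherwise valid JSON is served as application/json
// and anything else as text/plain.
func interpret(raw []byte) httpReply {
	var probe map[string]json.RawMessage
	if err := json.Unmarshal(raw, &probe); err == nil {
		if _, wrapped := probe["body"]; wrapped {
			var env envelope
			if err := json.Unmarshal(raw, &env); err == nil {
				return fromEnvelope(env)
			}
		}
	}
	if json.Valid(raw) {
		return httpReply{200, "application/json", string(raw)}
	}
	return httpReply{200, "text/plain", string(raw)}
}

// fromEnvelope fills in the documented defaults.
func fromEnvelope(env envelope) httpReply {
	reply := httpReply{Status: 200, ContentType: "application/json", Body: string(env.Body)}
	if env.StatusCode != 0 {
		reply.Status = env.StatusCode
	}
	if env.ContentType != "" {
		reply.ContentType = env.ContentType
	}
	// For non-JSON content types the body is a JSON string; unquote it.
	if !strings.Contains(reply.ContentType, "json") {
		var text string
		if err := json.Unmarshal(env.Body, &text); err == nil {
			reply.Body = text
		}
	}
	return reply
}

func main() {
	fmt.Printf("%+v\n", interpret([]byte("pong")))
	fmt.Printf("%+v\n", interpret([]byte(`{"body":{"id":"7"},"statusCode":201}`)))
}
```

One consequence worth noting: a raw JSON object that happens to contain a top-level body key is treated as a wrapped response, so endpoints returning such objects should wrap them explicitly.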
Dynamic Updates
Extensions can update their resources at runtime without restarting. This is useful for dashboards that reflect changing state:
err := ext.UpdateResource(
"ui://my_ext/dashboard",
types.TEXT_SYMPHONY_JSX,
updatedContent,
)
if err != nil {
log.Printf("failed to update resource: %v", err)
}
The UpdateResource method replaces the resource content and
re-registers the extension so that Symphony reflects the changes
immediately.
To re-publish the extension info after modifying other fields (such
as adding new routes or menus at runtime), use UpdateExtensionInfo:
ext.AddRoute("/my_ext/new-page", "ui://my_ext/new_page")
if err := ext.UpdateExtensionInfo(); err != nil {
log.Printf("failed to update extension info: %v", err)
}
Usage Reporting
Extensions that consume licensed resources should report their usage
to Symphony for licensing and chargeback. The ReportUsage
method sends dimension measurements to Symphony, which converts
them into license units using the rates embedded in the active license.
Reporting Usage
units, err := ext.ReportUsage(map[string]float64{
"tables_replicated": 5,
"bytes_transferred_gb": 120.5,
})
if err != nil {
log.Printf("failed to report usage: %v", err)
}
log.Printf("consumed %.2f units", units)
The dimensions map contains named measurements specific to your
extension. The keys must match the dimension names defined in the
license's unit rates for your extension prefix. If any dimension key
is not covered by a license rate, Symphony will add your extension
to the disabled_extensions list (per-extension enforcement).
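As a mental model for how dimensions relate to units, the sketch below multiplies each reported dimension by a per-dimension rate and flags any dimension that has no rate. The linear rate model, the rate values, and the computeUnits helper are all assumptions for illustration; the real conversion happens inside Symphony using the rates in the active license.

```go
package main

import (
	"fmt"
	"sort"
)

// computeUnits multiplies each reported dimension by its rate and sums the
// results. Dimensions with no matching rate are returned in uncovered
// (sorted), since those are the ones that would trigger enforcement.
func computeUnits(dimensions, rates map[string]float64) (units float64, uncovered []string) {
	for name, value := range dimensions {
		rate, ok := rates[name]
		if !ok {
			uncovered = append(uncovered, name)
			continue
		}
		units += value * rate
	}
	sort.Strings(uncovered)
	return units, uncovered
}

func main() {
	// Hypothetical rates; real rates come from the license.
	rates := map[string]float64{
		"tables_replicated":    2,
		"bytes_transferred_gb": 0.5,
	}
	units, uncovered := computeUnits(map[string]float64{
		"tables_replicated":    5,
		"bytes_transferred_gb": 120.5,
		"rows_scanned":         1000,
	}, rates)
	fmt.Printf("units=%.2f uncovered=%v\n", units, uncovered)
}
```

Checking dimension names against the license before the first report is a cheap way to avoid being disabled by a typo in a key.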
Return Values
ReportUsage returns:
| Value | Type | Description |
|---|---|---|
| units | float64 | The computed license units consumed by this report |
| err | error | Non-nil if the report failed |
Subscribing to Enforcement State
Symphony broadcasts the enforcement state on the
cirata.symphony.enforcement NATS subject whenever it changes.
Subscribe to this subject to receive real-time updates—including
changes triggered by other extensions, license uploads, or
administrative actions:
nc := ext.Client().GetSymphonyClient().GetNatsConnection()
_, err := nc.Subscribe("cirata.symphony.enforcement", func(msg *nats.Msg) {
var state types.EnforcementState
if err := json.Unmarshal(msg.Data, &state); err != nil {
log.Printf("failed to parse enforcement state: %v", err)
return
}
switch state.GetStatus() {
case types.ENFORCEMENT_ENFORCED:
log.Printf("Enforcement active: %s — pausing operations", state.GetMessage())
// Stop or reduce licensed operations
case types.ENFORCEMENT_WARNING:
log.Println("Approaching license limit")
case types.ENFORCEMENT_GRACE:
log.Println("Units exhausted — in grace period")
default:
log.Println("Enforcement OK")
}
})
You can also fetch the current enforcement state on demand without reporting usage—for example, when your extension first starts:
msg, err := nc.Request("cirata.services.usage.enforcement", []byte("{}"), 5*time.Second)
if err != nil {
log.Printf("failed to fetch enforcement state: %v", err)
} else {
var state types.EnforcementState
if err := json.Unmarshal(msg.Data, &state); err != nil {
log.Printf("failed to parse enforcement state: %v", err)
} else {
log.Printf("Current enforcement status: %s", state.GetStatus())
}
}
Enforcement States
| Status | Meaning |
|---|---|
| enforcement_ok | Normal operation, units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within vendor-set grace period |
| enforcement_enforced | Grace period expired or no valid license; stop licensed operations |
Extensions may also be individually disabled via the disabled_extensions field, or flagged as grace-only via the grace_extensions field, even when the global status is enforcement_ok. Check both lists for your extension prefix to determine your operational state.
The Go ReportUsage method returns only the computed units and an error. To receive the full enforcement state (including disabled_extensions and grace_extensions), subscribe to the cirata.symphony.enforcement broadcast or query cirata.services.usage.enforcement. The Python and Java libraries return the full enforcement state inline with each usage report.
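Combining the global status with the two per-extension lists can be reduced to one decision function. The sketch below works on a plain struct rather than types.EnforcementState, whose accessors for the two lists are not shown in this guide; the enforcement struct, stateFor, and the returned labels are hypothetical, and the precedence chosen (stopped over grace over warning) is an assumption.

```go
package main

import "fmt"

// enforcement holds the pieces of enforcement state discussed above.
type enforcement struct {
	Status             string
	DisabledExtensions []string
	GraceExtensions    []string
}

// contains reports whether list includes s.
func contains(list []string, s string) bool {
	for _, item := range list {
		if item == s {
			return true
		}
	}
	return false
}

// stateFor returns "stopped", "grace", "warning", or "ok" for the extension
// with the given prefix, taking the most restrictive applicable state.
func stateFor(e enforcement, prefix string) string {
	switch {
	case e.Status == "enforcement_enforced" || contains(e.DisabledExtensions, prefix):
		return "stopped"
	case e.Status == "enforcement_grace" || contains(e.GraceExtensions, prefix):
		return "grace"
	case e.Status == "enforcement_warning":
		return "warning"
	}
	return "ok"
}

func main() {
	e := enforcement{
		Status:             "enforcement_ok",
		DisabledExtensions: []string{"other_ext"},
		GraceExtensions:    []string{"my_ext"},
	}
	fmt.Println(stateFor(e, "my_ext"), stateFor(e, "other_ext"), stateFor(e, "third_ext"))
}
```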
Attribution
When an extension reports usage, Symphony checks whether any
attribution rules
match the extension prefix and reporting account. If a rule matches,
the usage record is automatically assigned to a business unit with
attribution method auto. Administrators can also manually attribute
records or override automatic attributions via the Attribution UI.
When to Report
Report usage at meaningful checkpoints—for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation if the volume is very high; instead, batch measurements and report periodically.
// Example: report after each replication batch
func onBatchComplete(ext *extension.Extension, results []TableResult) {
var totalBytes float64
for _, r := range results {
totalBytes += r.Bytes
}
units, err := ext.ReportUsage(map[string]float64{
"tables_replicated": float64(len(results)),
"bytes_transferred_gb": totalBytes / 1e9,
})
if err != nil {
log.Printf("usage report failed: %v", err)
return
}
log.Printf("batch consumed %.2f units", units)
}
See Usage Tracking for how administrators view and manage reported usage.
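For high-volume extensions, the batching advice above can be implemented with a small accumulator that is safe to call from many goroutines. usageBatcher is a hypothetical helper, not an SDK type; the map it returns from Flush has the same shape that ReportUsage accepts, so a periodic task could pass it straight through.

```go
package main

import (
	"fmt"
	"sync"
)

// usageBatcher accumulates measurements between reports.
type usageBatcher struct {
	mu      sync.Mutex
	pending map[string]float64
}

func newUsageBatcher() *usageBatcher {
	return &usageBatcher{pending: map[string]float64{}}
}

// Add records amount against dimension. Safe for concurrent use.
func (b *usageBatcher) Add(dimension string, amount float64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending[dimension] += amount
}

// Flush returns the accumulated totals and resets the batcher.
func (b *usageBatcher) Flush() map[string]float64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.pending
	b.pending = map[string]float64{}
	return out
}

func main() {
	b := newUsageBatcher()
	for i := 0; i < 1000; i++ {
		b.Add("tables_replicated", 1) // one call per operation
	}
	b.Add("bytes_transferred_gb", 0.5)

	report := b.Flush() // in an extension: ext.ReportUsage(report)
	fmt.Println(report, len(b.Flush()))
}
```

If a report fails, the flushed totals can be added back with Add so that no usage is lost before the next attempt.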
Storage
Go extensions use the NATS JetStream key-value API for persistent storage. Buckets are declared during extension setup and are created automatically by the platform.
Declaring Buckets
Declare buckets using the AddBucket builder method:
ext.AddBucket("my_ext_data", "Extension data", 0)
ext.AddBucket("my_ext_events", "Event log with 24h TTL", 86400)
| Parameter | Description |
|---|---|
| name | Bucket name |
| description | Human-readable description |
| ttl | Time-to-live in seconds. 0 means entries do not expire |
The platform creates or updates buckets when the extension registers. Re-registering with changed parameters updates the configuration without losing existing data. In a clustered deployment, the platform picks a replica count that matches its own buckets (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone), so declared buckets inherit the same durability guarantees as the rest of Symphony's state.
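The replica policy described above amounts to a three-way threshold. The function below is only a restatement of that sentence in code; replicasFor is a hypothetical name, and treating a 2-node cluster like a standalone node is an assumption, since the text does not cover that case.

```go
package main

import "fmt"

// replicasFor returns the replica count for a cluster of the given size:
// R=5 on 5+ nodes, R=3 on 3-4 nodes, otherwise R=1.
// Assumption: clusters with fewer than 3 nodes are treated as standalone.
func replicasFor(nodes int) int {
	switch {
	case nodes >= 5:
		return 5
	case nodes >= 3:
		return 3
	default:
		return 1
	}
}

func main() {
	for _, n := range []int{1, 3, 4, 5, 7} {
		fmt.Printf("%d node(s) -> R=%d\n", n, replicasFor(n))
	}
}
```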
Runtime Bucket Creation
If your extension needs to create buckets on demand rather than through the registration manifest, use the SDK helper so the replica count matches the platform's policy:
nc := ext.Client().GetSymphonyClient().GetNatsConnection()
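js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("failed to create JetStream context: %v", err)
}
ctx := context.Background()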
kv, err := extension.CreateOrUpdateKeyValue(ctx, js, nc, jetstream.KeyValueConfig{
Bucket: "runtime_data",
Description: "Created on demand",
// Replicas omitted — SDK populates it from cirata.services.cluster.info.
})
Existing code that calls js.CreateOrUpdateKeyValue directly continues
to work, but will use the NATS default of R=1 unless you set the
replica count yourself. Use extension.GetClusterReplicas(ctx, nc) to
query the value directly if you need it for a stream or other
non-KeyValue config.
Accessing Buckets
Use the NATS connection from the client to access JetStream key-value stores:
nc := ext.Client().GetSymphonyClient().GetNatsConnection()
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("failed to create JetStream context: %v", err)
}
ctx := context.Background()
kv, err := js.KeyValue(ctx, "my_ext_data")
if err != nil {
log.Fatalf("failed to access bucket: %v", err)
}
Basic Operations
ctx := context.Background()
// Store a value
_, err := kv.Put(ctx, "item:123",
[]byte(`{"name": "Widget"}`))
// Retrieve a value
entry, err := kv.Get(ctx, "item:123")
if err == nil {
value := string(entry.Value())
}
// Delete a key
err = kv.Delete(ctx, "item:123")
// List all keys
keys, err := kv.Keys(ctx)
Batch Write
When writing many keys at once, use BatchPut to publish all entries
concurrently instead of calling Put() in a loop. Each individual
Put() is a synchronous round-trip; the batch API fires all publishes
via JetStream.PublishAsync and collects acks in parallel.
import "cirata.com/symphony/extension"
entries := []extension.BatchEntry{
{Key: "item:1", Value: []byte(`{"name":"Widget"}`)},
{Key: "item:2", Value: []byte(`{"name":"Gadget"}`)},
}
result := extension.BatchPut(js, "my_ext_data", entries)
log.Printf("Batch write: %d succeeded, %d failed out of %d",
result.Succeeded, result.Failed, result.Total)
for key, err := range result.Errors {
log.Printf("Failed key '%s': %v", key, err)
}
The batch API never fails fast—a single bad key does not prevent the
remaining entries from being written. BatchPutResult.Errors maps
failed key names to their errors. Values written this way are fully
readable via standard kv.Get().
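The "never fails fast" behaviour is easiest to see in a stripped-down version. The sketch below is not the SDK's BatchPut: it fans writes out with goroutines against an in-memory store instead of JetStream.PublishAsync, and batchPut, memStore, entry, and batchResult are hypothetical names chosen to echo the SDK's result fields.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type entry struct {
	Key   string
	Value []byte
}

type batchResult struct {
	Total, Succeeded, Failed int
	Errors                   map[string]error
}

// memStore is an in-memory stand-in for a key-value bucket.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} }

// put rejects empty keys so the demo has something to fail on.
func (s *memStore) put(key string, value []byte) error {
	if key == "" {
		return errors.New("empty key")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
	return nil
}

// batchPut writes all entries concurrently and collects per-key errors
// instead of stopping at the first failure.
func batchPut(put func(string, []byte) error, entries []entry) batchResult {
	res := batchResult{Total: len(entries), Errors: map[string]error{}}
	var wg sync.WaitGroup
	var mu sync.Mutex // guards res
	for _, e := range entries {
		wg.Add(1)
		go func(e entry) {
			defer wg.Done()
			err := put(e.Key, e.Value)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				res.Failed++
				res.Errors[e.Key] = err
				return
			}
			res.Succeeded++
		}(e)
	}
	wg.Wait()
	return res
}

func main() {
	store := newMemStore()
	res := batchPut(store.put, []entry{
		{"item:1", []byte(`{"name":"Widget"}`)},
		{"", []byte(`{"name":"Nameless"}`)},
		{"item:3", []byte(`{"name":"Gadget"}`)},
	})
	fmt.Printf("%d ok, %d failed, %d stored\n", res.Succeeded, res.Failed, len(store.data))
}
```

Callers should always inspect the error map after a batch, because a non-empty map is the only signal that some entries were not written.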
Using the Client Library
The Go client library can also be used independently to interact with Symphony and invoke services provided by other extensions:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"cirata.com/symphony/client"
)
func main() {
// Empty string reads SYMPHONY_TOKEN from the environment
c, err := client.NewClient("")
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
err = c.Connect()
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
ctx, cancel := context.WithTimeout(
context.Background(), 5*time.Second)
defer cancel()
// Get Symphony info
info, err := c.Info(ctx)
if err != nil {
log.Fatalf("Failed to get info: %v", err)
}
fmt.Printf("Info: %s\n", string(info))
// Invoke an extension service
handle := c.Extension("my_ext")
var response map[string]interface{}
err = handle.InvokeJSON(ctx, "process",
map[string]string{"input": "data"}, &response)
if err != nil {
log.Fatalf("Failed to invoke: %v", err)
}
fmt.Printf("Response: %v\n", response)
}
The InvokeJSON helper marshals the request to JSON, sends a NATS
request to the extension's service, and unmarshals the response into
the provided variable.
Logging
Extensions should use NewSymphonyJSONHandler for structured JSON
output to stderr with field names consistent across all Symphony SDKs
(timestamp, level, message):
import (
"log/slog"
"os"
"cirata.com/symphony/extension/observability"
)
slog.SetDefault(slog.New(
observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
))
slog.Info("Extension started", "version", "1.0")
Do not write log files. stdout/stderr is captured automatically by journald (systemd) or the container runtime (Docker/Kubernetes).
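For readers curious how the shared field names relate to Go's defaults, the standard library's JSON handler emits time, level, and msg, and those keys can be renamed with HandlerOptions.ReplaceAttr. The sketch below shows that mechanism only; newJSONHandler and logFields are hypothetical helpers, and this is not a description of how NewSymphonyJSONHandler is implemented.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
)

// newJSONHandler returns a slog JSON handler that emits "timestamp" and
// "message" instead of the default "time" and "msg" keys.
func newJSONHandler(w io.Writer, level slog.Level) slog.Handler {
	return slog.NewJSONHandler(w, &slog.HandlerOptions{
		Level: level,
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) > 0 {
				return a // leave attributes inside groups untouched
			}
			// Note: a top-level user attribute named "time" or "msg"
			// would be renamed as well.
			switch a.Key {
			case slog.TimeKey:
				a.Key = "timestamp"
			case slog.MessageKey:
				a.Key = "message"
			}
			return a
		},
	})
}

// logFields logs one record into a buffer and decodes it back into a map,
// which makes the emitted field names easy to inspect.
func logFields(msg string) map[string]any {
	var buf bytes.Buffer
	slog.New(newJSONHandler(&buf, slog.LevelInfo)).Info(msg, "version", "1.0")
	fields := map[string]any{}
	_ = json.Unmarshal(buf.Bytes(), &fields)
	return fields
}

func main() {
	slog.SetDefault(slog.New(newJSONHandler(os.Stderr, slog.LevelInfo)))
	slog.Info("Extension started", "version", "1.0")
	fmt.Println(logFields("hello")["message"])
}
```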
Observability
Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.
obs, _ := observability.Enable(ext, svc, "myext")
// Feed logs into both stderr and the OTLP pipeline
slog.SetDefault(slog.New(observability.NewMultiHandler(
observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
obs.GetLogCapture().Handler(),
)))
// Instrument handlers with automatic trace propagation
group.AddEndpoint("process", obs.InstrumentedHandler("process",
func(req micro.Request) {
obs.GetMetricsCapture().IncrementCounter("requests.total", 1)
// ... process request ...
req.Respond(result)
},
))
slog.Info("Extension started")
The MultiHandler fans out every log record to both stderr (for
journald) and the OTLP log capture buffer (for the Observability
Extension). This is the same pattern the platform itself uses.
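The fan-out idea itself is small enough to sketch with the standard library. The fanout type below is a hypothetical, minimal slog.Handler that forwards each record to every child handler; it illustrates the pattern and makes no claim about how observability.NewMultiHandler is written. Two in-memory buffers stand in for stderr and the log capture buffer.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
)

// fanout forwards every record to all of its child handlers.
type fanout struct{ handlers []slog.Handler }

// Enabled is true if any child wants records at this level.
func (f fanout) Enabled(ctx context.Context, level slog.Level) bool {
	for _, h := range f.handlers {
		if h.Enabled(ctx, level) {
			return true
		}
	}
	return false
}

// Handle gives each enabled child its own copy of the record.
func (f fanout) Handle(ctx context.Context, r slog.Record) error {
	var errs []error
	for _, h := range f.handlers {
		if h.Enabled(ctx, r.Level) {
			errs = append(errs, h.Handle(ctx, r.Clone()))
		}
	}
	return errors.Join(errs...) // nil when every child succeeded
}

func (f fanout) WithAttrs(attrs []slog.Attr) slog.Handler {
	next := make([]slog.Handler, len(f.handlers))
	for i, h := range f.handlers {
		next[i] = h.WithAttrs(attrs)
	}
	return fanout{next}
}

func (f fanout) WithGroup(name string) slog.Handler {
	next := make([]slog.Handler, len(f.handlers))
	for i, h := range f.handlers {
		next[i] = h.WithGroup(name)
	}
	return fanout{next}
}

// demo logs n records through a two-way fan-out and returns how many lines
// each destination received.
func demo(n int) (first, second int) {
	var a, b bytes.Buffer
	logger := slog.New(fanout{[]slog.Handler{
		slog.NewJSONHandler(&a, nil),
		slog.NewTextHandler(&b, nil),
	}})
	for i := 0; i < n; i++ {
		logger.Info("tick", "i", i)
	}
	newline := []byte("\n")
	return bytes.Count(a.Bytes(), newline), bytes.Count(b.Bytes(), newline)
}

func main() {
	first, second := demo(3)
	fmt.Println(first, second)
}
```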
For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.
Multi-Instance Support
Go extensions can run as multiple instances of the same extension type for scalability, fault tolerance, and workload distribution. The SDK provides built-in support for work partitioning, queue-based command delivery, and instance-specific messaging.
Automatic Load Balancing
Microservice endpoints registered with AddService are automatically
load-balanced across all instances by NATS micro. No code changes are
needed—deploying additional instances immediately distributes
incoming service requests across them.
The SDK also uses queue subscribe for the commands channel, ensuring that each command is processed by exactly one instance.
WorkPartitioner
For extensions that run background tasks (watchers, monitors, replicators), the WorkPartitioner distributes work items across instances using consistent hashing.
Enable via the extension builder:
ext.EnableWorkPartitioning(func(totalInstances, myItemCount int) {
log.Printf("Rebalanced: %d instances active, I own %d items",
totalInstances, myItemCount)
})
Or create a WorkPartitioner directly:
wp := extension.NewWorkPartitioner(ext)
wp.OnRebalance(func(totalInstances, myItemCount int) {
log.Printf("Rebalanced: %d instances, %d items mine",
totalInstances, myItemCount)
})
wp.Start()
Use IsMyWork to check whether the current instance should process
a given work item:
// In a bucket watcher loop
for _, key := range discoveredItems {
if wp.IsMyWork(key) {
go processItem(key)
} else {
log.Printf("Skipping %s — owned by another instance", key)
}
}
The partitioner watches the status bucket for instance join and leave
events. When the set of active instances changes, the rebalance
callback fires and IsMyWork results may change. Extensions should
re-evaluate their active work items on each watcher iteration rather
than caching ownership decisions.
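To see why ownership can shift when instances join or leave, and why it usually does not, consider the following sketch. It uses rendezvous (highest-random-weight) hashing as a simple stand-in; the guide says only that the SDK uses consistent hashing, so the SDK's actual algorithm may differ. owner, isMyWork, and score are hypothetical helpers.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// score hashes an (instance, key) pair to a pseudo-random weight.
func score(instance, key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(instance))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") differ
	h.Write([]byte(key))
	return h.Sum64()
}

// owner returns the instance with the highest score for key, or "" when
// there are no instances. Ties are broken by instance name so the result
// does not depend on slice order.
func owner(instances []string, key string) string {
	best, bestScore := "", uint64(0)
	for _, inst := range instances {
		s := score(inst, key)
		if best == "" || s > bestScore || (s == bestScore && inst < best) {
			best, bestScore = inst, s
		}
	}
	return best
}

// isMyWork reports whether instance me owns key among the active instances.
func isMyWork(me string, instances []string, key string) bool {
	return owner(instances, key) == me
}

func main() {
	active := []string{"inst-a", "inst-b", "inst-c"}
	for _, key := range []string{"table:orders", "table:users", "table:events"} {
		fmt.Printf("%-14s owned by %s\n", key, owner(active, key))
	}
}
```

With this scheme, removing an instance only reassigns the keys that instance owned; every other key keeps its owner, which keeps rebalancing churn low.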
Instance-Specific Handlers
Register handlers that are targeted at a specific instance using
AddInstanceHandler. Each instance subscribes to
cirata.extensions.{prefix}.instance.{instance_id}.>, and only the
targeted instance receives the request:
ext.AddInstanceHandler("local-status", func(req micro.Request) {
status := map[string]interface{}{
"instance_id": ext.InstanceID(),
"uptime": time.Since(startTime).String(),
"work_items": wp.MyItemCount(),
}
resp, _ := json.Marshal(status)
req.Respond(resp)
})
Callers can target a specific instance via the HTTP API by adding the
?instance=<id> query parameter to the service proxy URL.
Instance ID
The extension's instance ID is derived from the JWT identifier in the API token. Access it with:
id := ext.InstanceID()
Use the instance ID to tag operational records in shared storage for debugging and traceability.
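For debugging, it can be handy to read the identifier out of a token by hand. The sketch below decodes the payload segment of a JWT and returns its jti claim, the standard "JWT ID" claim from RFC 7519. That the SDK's instance ID is exactly this claim is an assumption; jwtID and makeToken are hypothetical helpers, and the code does not verify the token's signature.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// jwtID returns the jti claim from a JWT without verifying its signature.
// It is for inspection only and must not be used for authentication.
func jwtID(token string) (string, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return "", errors.New("token is not a three-part JWT")
	}
	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return "", fmt.Errorf("decode payload: %w", err)
	}
	var claims struct {
		ID string `json:"jti"`
	}
	if err := json.Unmarshal(payload, &claims); err != nil {
		return "", fmt.Errorf("parse claims: %w", err)
	}
	if claims.ID == "" {
		return "", errors.New("token has no jti claim")
	}
	return claims.ID, nil
}

// makeToken builds an unsigned sample token for the demo.
func makeToken(jti string) string {
	enc := base64.RawURLEncoding.EncodeToString
	header := enc([]byte(`{"alg":"none","typ":"JWT"}`))
	payload, _ := json.Marshal(map[string]string{"jti": jti})
	return header + "." + enc(payload) + ".sig"
}

func main() {
	id, err := jwtID(makeToken("inst-7f3a"))
	fmt.Println(id, err)
}
```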
For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.
Error Handling
Extension Lifecycle
Wrap the Run() call to handle initialization failures:
func main() {
token := os.Getenv("SYMPHONY_TOKEN")
ext := extension.NewExtension(
"My Extension", "my_ext", "Description", token)
if err := ext.Run(); err != nil {
log.Fatalf("Extension failed: %v", err)
}
}
The extension runtime handles SIGINT and SIGTERM signals for graceful shutdown, publishing a disconnect status and draining the NATS connection before exiting.
Handler Error Checklist
- Always call req.Respond() or req.Error(), even on error
- Return []byte("{}") for object endpoints and []byte("[]") for list endpoints
- Log errors for debugging with log.Printf
- Decode with json.Unmarshal, which ignores unknown fields by default, and avoid json.Decoder.DisallowUnknownFields so that unexpected fields do not cause deserialization failures