
Go Extension Development

Cirata Symphony provides a Go library for developing extensions. Go extensions are well suited to high-performance workloads, infrastructure tooling, and environments where a single compiled binary with minimal dependencies is preferred.

Getting Started

Installation

The Symphony Go library is a single Go module, cirata.com/symphony, containing the extension, client, types, and shared packages.

Download the source archive from the Languages and Libraries page, extract it, and point your module at the extracted directory with a single replace directive:

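# Extract the library into your module (tar -C needs the target directory to exist)
mkdir -p third_party/cirata-symphony
tar xf cirata-symphony-latest-go-src.tar -C third_party/cirata-symphony

Then add the following to your go.mod:
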
// v0.0.0 is a placeholder — the replace directive below resolves the
// module from the extracted directory, so the version is not used.
require cirata.com/symphony v0.0.0

replace cirata.com/symphony => ./third_party/cirata-symphony

Then import the packages you need:

import (
"cirata.com/symphony/extension"
"cirata.com/symphony/client"
"cirata.com/symphony/types"
)

One replace directive covers every package, because they all belong to the single cirata.com/symphony module. Once the module is published to a Go proxy, the archive and the replace directive are no longer needed: go get cirata.com/symphony@latest resolves the module directly, and a specific release can be pinned with, for example, go get cirata.com/symphony@v1.4.9.

The library requires Go 1.25 or later and depends on the NATS Go client (github.com/nats-io/nats.go) and the NATS micro package for microservice support.

Project Structure

A Go extension is typically a single module containing a main package and any supporting files:

my_extension/
├── main.go # Extension entry point
├── handlers.go # Optional: microservice handler functions
├── pages/ # Optional: UI template files
│ ├── home.tsx
│ └── help.md
├── go.mod # Module definition
└── go.sum # Dependency checksums

Minimal Extension

The simplest possible extension connects to Symphony and publishes its availability:

package main

import (
"log"
"os"

"cirata.com/symphony/extension"
)

func main() {
token := os.Getenv("SYMPHONY_TOKEN")

ext := extension.NewExtension(
"Hello World", // name
"hello", // prefix
"A simple Go extension", // description
token, // token
)

if err := ext.Run(); err != nil {
log.Fatalf("error: %v", err)
}
}

This extension will register with Symphony and remain connected until stopped with SIGINT or SIGTERM. If no token is available, the extension logs an error with instructions and exits.

Extension Initialization

NewExtension

The NewExtension function creates an extension instance:

ext := extension.NewExtension(
name, // Human-readable name displayed in the UI
prefix, // Unique prefix for NATS subjects and routes
description, // Description of the extension's purpose
token, // API token (falls back to SYMPHONY_TOKEN env var)
)

| Parameter | Required | Description |
|---|---|---|
| name | Yes | Human-readable name displayed in the Symphony UI |
| prefix | Yes | Unique identifier for NATS subjects and routes |
| description | No | Description of the extension's purpose |
| token | No | API token (empty string falls back to environment and credential store) |

After creation, use the builder methods to add capabilities, features, and services to the extension, then start it with Run().

Authentication

Extensions authenticate with a token obtained from an API Key. The token is resolved in the following order of precedence:

  1. The token parameter in NewExtension
  2. The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux) — from a previous provisioning run
  3. The SYMPHONY_TOKEN environment variable (also checked by NewClient)
  4. The REGISTRATION_TOKEN environment variable (one-time provisioning)

When REGISTRATION_TOKEN is set, the extension connects, automatically provisions a permanent API key with the extension's declared capabilities, and stores it in the OS credential store (or ~/.config/cirata/ as a fallback). Subsequent runs use the stored token automatically with no interaction required.

If SYMPHONY_TOKEN contains a registration token (detected by its JWT capabilities), it is treated as a registration token and provisioned in the same way.

If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
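The precedence order above can be sketched as a small pure function. This is an illustration only, not the library's implementation: tokenSources, resolveToken, and errNoToken are hypothetical names, and the credential store is modelled as a plain callback.

```go
package main

import (
	"errors"
	"fmt"
)

// tokenSources is a hypothetical stand-in for the places a token can come
// from; it is not a type from the Symphony library.
type tokenSources struct {
	Explicit string                // token argument passed to NewExtension
	Store    func() (string, bool) // OS credential store lookup
	Getenv   func(string) string   // environment lookup, e.g. os.Getenv
}

var errNoToken = errors.New("no token available from any source")

// resolveToken walks the documented precedence order and reports which
// source supplied the token.
func resolveToken(s tokenSources) (token, source string, err error) {
	if s.Explicit != "" {
		return s.Explicit, "parameter", nil
	}
	if s.Store != nil {
		if t, ok := s.Store(); ok && t != "" {
			return t, "credential store", nil
		}
	}
	if s.Getenv != nil {
		for _, name := range []string{"SYMPHONY_TOKEN", "REGISTRATION_TOKEN"} {
			if t := s.Getenv(name); t != "" {
				return t, name, nil
			}
		}
	}
	return "", "", errNoToken
}

func main() {
	env := map[string]string{"SYMPHONY_TOKEN": "env-token"}
	src := tokenSources{
		Store:  func() (string, bool) { return "", false }, // nothing provisioned yet
		Getenv: func(name string) string { return env[name] },
	}
	token, source, err := resolveToken(src)
	fmt.Println(token, source, err)
}
```

Modelling the sources as callbacks keeps the ordering easy to unit test without touching a real keychain or the process environment.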

Container and Kubernetes deployments

In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:

  • Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
  • Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.

Overriding the token-embedded address

When the hostname encoded in the token is not the right one to reach Symphony from where the extension runs, set the SYMPHONY_ADDRESS environment variable. It accepts a bare hostname, host:port, or a full URL with an explicit http or https scheme.

See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Go, Python, Java, and Rust SDKs.
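To make the three accepted forms concrete, here is a minimal sketch that classifies a value as a bare hostname, host:port, or a full URL. The function name classifyAddress and its checks are assumptions for illustration; the SDK's actual validation rules are described in the SYMPHONY_ADDRESS reference and may be stricter or looser.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
	"strings"
)

// classifyAddress reports which of the three documented forms s takes:
// "hostname", "host:port", or "url". It is a hypothetical helper.
func classifyAddress(s string) (string, error) {
	s = strings.TrimSpace(s)
	if s == "" {
		return "", fmt.Errorf("empty address")
	}
	if strings.Contains(s, "://") {
		u, err := url.Parse(s)
		if err != nil {
			return "", err
		}
		if u.Scheme != "http" && u.Scheme != "https" {
			return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
		}
		if u.Hostname() == "" {
			return "", fmt.Errorf("URL has no host")
		}
		return "url", nil
	}
	if host, port, err := net.SplitHostPort(s); err == nil {
		n, perr := strconv.Atoi(port)
		if host == "" || perr != nil || n < 1 || n > 65535 {
			return "", fmt.Errorf("invalid host:port %q", s)
		}
		return "host:port", nil
	}
	if strings.ContainsAny(s, ":/ ") {
		return "", fmt.Errorf("invalid hostname %q", s)
	}
	return "hostname", nil
}

func main() {
	for _, addr := range []string{
		"symphony.example.com",
		"symphony.example.com:8443",
		"https://symphony.example.com:8443",
		"ftp://symphony.example.com",
	} {
		form, err := classifyAddress(addr)
		fmt.Printf("%-36s form=%q err=%v\n", addr, form, err)
	}
}
```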

Lifecycle

The Run() method starts the extension and blocks until a shutdown signal is received:

err := ext.Run()

Run() performs the following steps:

  1. Resolves a token from the NewExtension parameter, the credential store, or the environment (see Authentication above)
  2. Establishes a NATS connection to Symphony
  3. Registers the extension in the Symphony registry
  4. Starts periodic status publishing (every 30 seconds)
  5. Subscribes to the commands channel for runtime directives
  6. Blocks until SIGINT or SIGTERM is received
  7. Publishes a disconnect status and drains the connection

The status publishing loop prevents TTL expiration of the extension's entry in Symphony.
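The publish-until-signalled part of this lifecycle can be sketched with the standard library alone. This is not the SDK's Run() implementation: runStatusLoop and publishN are hypothetical names, the publish and disconnect callbacks stand in for the real NATS calls, and the short intervals exist only so the demo finishes quickly.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// runStatusLoop publishes once immediately and then on every tick until ctx
// is cancelled. It calls disconnect on the way out and returns the number of
// status updates published.
func runStatusLoop(ctx context.Context, interval time.Duration, publish, disconnect func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer disconnect()

	count := 0
	for {
		publish()
		count++
		select {
		case <-ctx.Done():
			return count
		case <-ticker.C:
			// A tick and a cancellation can be ready at the same time;
			// re-check so cancellation always wins.
			if ctx.Err() != nil {
				return count
			}
		}
	}
}

// publishN drives the loop and cancels it from inside the nth publish.
func publishN(n int) (published int, disconnected bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	published = runStatusLoop(ctx, time.Millisecond,
		func() {
			calls++
			if calls == n {
				cancel()
			}
		},
		func() { disconnected = true })
	return published, disconnected
}

func main() {
	// Cancel on SIGINT or SIGTERM, as the text describes. The timeout only
	// exists so this demo terminates on its own.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, 250*time.Millisecond)
	defer cancel()

	n := runStatusLoop(ctx, 100*time.Millisecond,
		func() { fmt.Println("status: connected") },
		func() { fmt.Println("status: disconnected") })
	fmt.Println("status updates published:", n)
}
```

In a real extension the interval would be the 30 seconds mentioned above, which must stay comfortably below the TTL of the registry entry.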

Capabilities

Capabilities declare which NATS subjects the extension needs to publish to or subscribe to. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.

// Subscribe capabilities: subjects the extension listens on
ext.AddCapability("sub", "extensions.my_ext", "My Extension")
ext.AddCapability("sub", "extensions.my_ext.process", "Process data")
ext.AddCapability("sub", "extensions.my_ext.query", "Query data")

// Publish capabilities: subjects the extension sends to
ext.AddCapability("pub", "extensions.my_ext.processed", "Data processed event")

| Parameter | Description |
|---|---|
| capType | "pub" for publish or "sub" for subscribe |
| key | NATS subject pattern |
| description | Human-readable description |

Capability names use dot-separated hierarchies. Both subscribe and publish capabilities follow the extensions.<prefix> convention.

Wildcard capabilities using > can allow a range of subjects. For example, extensions.my_ext.> would cover all subjects under that prefix.
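The sketch below shows how NATS wildcard matching treats such a pattern. It follows the standard NATS rules (* matches exactly one token, > matches one or more trailing tokens); subjectMatches is a hypothetical helper written for illustration, not part of the Symphony library.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject is covered by pattern under NATS
// wildcard rules: "*" matches exactly one token, and ">" matches one or more
// trailing tokens and is only valid as the last token.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	pattern := "extensions.my_ext.>"
	for _, subject := range []string{
		"extensions.my_ext.process",
		"extensions.my_ext.jobs.cancel",
		"extensions.my_ext",
		"extensions.other.process",
	} {
		fmt.Printf("%-32s %v\n", subject, subjectMatches(pattern, subject))
	}
}
```

Note that > needs at least one token after the prefix, so extensions.my_ext.> does not cover the bare subject extensions.my_ext. That is why the earlier example declares the bare subject as its own capability.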

User Interface

Extensions add UI components by registering resources, mapping them to routes, and adding menu items to the Symphony navigation. For detailed documentation on each resource type, see User Interfaces.

Resources

A resource is a named piece of UI content identified by a URI and a MIME type:

ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, pageContent)
ext.AddResource("ui://my_ext/common", types.TEXT_SYMPHONY_MODULE, sharedCode)
ext.AddResource("ui://my_ext/dashboard", types.TEXT_HTMLSYMPHONY, htmlContent)
ext.AddHtmxResource("ui://my_ext/table", "pages/table")
ext.AddResource("ui://my_ext/help", types.TEXT_MARKDOWN, helpContent)

Module resources (types.TEXT_SYMPHONY_MODULE) are imported by JSX resources using import { ... } from '@symphony/extension/my_ext/common'.

HTMX resources use a JSON config instead of HTML content—Symphony provides the HTMX shell with auth injection and theme sync. See HTML + HTMX for details.

For resources that need additional metadata (name, description):

ext.AddResourceFull(
"ui://my_ext/app",
types.TEXT_HTMLPROFILEMCP_APP,
appHTML,
"My App", // name
"Interactive application", // description
)

See User Interfaces for a comparison of the available MIME types and when to use each one.

Including Content from Files

Use os.ReadFile to load UI content from external files:

content, err := os.ReadFile("pages/home.tsx")
if err != nil {
log.Fatalf("failed to read page: %v", err)
}
ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, string(content))

Or embed files at compile time with go:embed:

import _ "embed"

//go:embed pages/home.tsx
var homePage string

//go:embed pages/help.md
var helpPage string

ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, homePage)
ext.AddResource("ui://my_ext/help", types.TEXT_MARKDOWN, helpPage)

Routes

Routes map URL paths to resources so they appear as navigable pages:

ext.AddRoute("/my_ext", "ui://my_ext/home")
ext.AddRoute("/my_ext/dashboard", "ui://my_ext/dashboard")

Nested routes create a hierarchy of pages under a parent path:

ext.AddRouteWithChildren("/my_ext", "ui://my_ext/home", map[string]types.RouteMap{
"/settings": {Uri: "ui://my_ext/settings"},
"/detail": {Uri: "ui://my_ext/detail"},
})

Menu items appear in the Symphony navigation sidebar:

ext.AddMenu("main", "My Extension", "/my_ext", "fa-puzzle-piece")
ext.AddMenu("tools", "Data Tools", "/my_ext/tools", "fa-wrench")

| Parameter | Description |
|---|---|
| group | Menu group name (e.g., "main", "tools") |
| item | Display label for the menu entry |
| route | Target route path |
| icon | FontAwesome icon class |

Widgets

Register dashboard widgets for display in the widget picker:

ext.AddWidget("Status", "ui://my_ext/status")

With notes or a description:

ext.AddWidgetWithNotes("Status", "ui://my_ext/status", "Real-time status monitor")

Help Pages

Extension help pages should use the text/markdown MIME type and be routed under /help/extensions/<ExtensionName>. See Markdown Resources for details on frontmatter, template variables, and conventions.

helpContent, err := os.ReadFile("pages/help.md")
if err != nil {
log.Fatalf("failed to read help: %v", err)
}

ext.AddResource("ui://my_ext/help", types.TEXT_MARKDOWN, string(helpContent))
ext.AddRoute("/help/extensions/MyExtension", "ui://my_ext/help")

Microservice Endpoints

Extensions expose functionality as NATS microservices using the AddService method and the NATS micro package.

Creating a Service

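AddService creates a NATS microservice and must be called after Run() has established the connection. Because Run() blocks until shutdown, everything that can be declared up front (capabilities, UI, and storage) is configured before Run(), and services are added separately once the connection exists. The configuration half looks like this: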

package main

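import (
"log"
"os"

"cirata.com/symphony/extension"
"cirata.com/symphony/types"
)

// pageContent holds the JSX source for the home page, for example loaded
// with os.ReadFile or go:embed as shown under Including Content from Files.
var pageContent string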

func main() {
token := os.Getenv("SYMPHONY_TOKEN")

ext := extension.NewExtension(
"My Extension", "my_ext",
"Provides data processing", token,
)

// Configure capabilities, UI, and storage before Run()
ext.AddCapability("sub", "extensions.my_ext", "My Extension")
ext.AddResource("ui://my_ext/home", types.TEXT_SYMPHONY_JSX, pageContent)
ext.AddRoute("/my_ext", "ui://my_ext/home")
ext.AddMenu("main", "My Extension", "/my_ext", "fa-puzzle-piece")
ext.AddBucket("my_ext_data", "Extension data", 0)

if err := ext.Run(); err != nil {
log.Fatalf("error: %v", err)
}
}

For extensions that need microservice endpoints, use AddService after the connection is established inside the extension lifecycle:

svc, err := ext.AddService("my_ext", "1.0.0")
if err != nil {
log.Fatalf("failed to add service: %v", err)
}

// Create a group for subject namespacing
group := svc.AddGroup("cirata.extensions.my_ext")

// Add endpoint handlers
err = group.AddEndpoint("process", micro.HandlerFunc(processHandler))
if err != nil {
log.Fatalf("failed to add endpoint: %v", err)
}

The group name determines the NATS subject prefix. An endpoint named process in the group cirata.extensions.my_ext listens on subject cirata.extensions.my_ext.process.

Handler Functions

Handlers receive a micro.Request and must always respond:

func processHandler(req micro.Request) {
var input map[string]interface{}
if err := json.Unmarshal(req.Data(), &input); err != nil {
req.Error("400", "invalid JSON", nil)
return
}

result := map[string]interface{}{
"status": "success",
"processed": true,
"input": input,
}

response, _ := json.Marshal(result)
req.Respond(response)
}

Key rules for handlers:

  1. Always respond. Every handler must call req.Respond() or req.Error(). A handler that does not respond causes the caller to time out.
  2. Return appropriate empty values. Use "{}" for single-object endpoints and "[]" for list endpoints.
  3. Handle errors gracefully. Use req.Error() for structured error responses or respond with an error payload.
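Rule 2 is easy to get wrong in Go because encoding/json encodes a nil slice or nil map as null rather than [] or {}. The helpers below are a minimal sketch of one way to guarantee the documented empty values; listJSON and objectJSON are hypothetical names, not library functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// listJSON encodes items as a JSON array. A nil or empty slice, or a value
// that cannot be encoded, yields "[]".
func listJSON[T any](items []T) []byte {
	if len(items) == 0 {
		return []byte("[]")
	}
	data, err := json.Marshal(items)
	if err != nil {
		return []byte("[]")
	}
	return data
}

// objectJSON encodes v as JSON. A nil value, a nil map or pointer, or a
// value that cannot be encoded yields "{}".
func objectJSON(v any) []byte {
	if v == nil {
		return []byte("{}")
	}
	data, err := json.Marshal(v)
	if err != nil || string(data) == "null" {
		return []byte("{}")
	}
	return data
}

func main() {
	var none []string
	raw, _ := json.Marshal(none)
	fmt.Println("json.Marshal(nil slice):", string(raw))
	fmt.Println("listJSON(nil slice):    ", string(listJSON(none)))

	var missing map[string]int
	fmt.Println("objectJSON(nil map):    ", string(objectJSON(missing)))
	fmt.Println("objectJSON(value):      ", string(objectJSON(map[string]int{"count": 2})))
}
```

A handler would pass the result straight to req.Respond, so callers always receive a value of the shape they expect.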

Handler Patterns

JSON with typed structs:

type ProcessRequest struct {
Input string `json:"input"`
Mode string `json:"mode"`
}

type ProcessResponse struct {
Status string `json:"status"`
Result string `json:"result"`
}

func typedHandler(req micro.Request) {
var input ProcessRequest
if err := json.Unmarshal(req.Data(), &input); err != nil {
req.Error("400", "invalid request", nil)
return
}

result := ProcessResponse{
Status: "success",
Result: "processed: " + input.Input,
}

response, _ := json.Marshal(result)
req.Respond(response)
}

Returning a list:

func listHandler(req micro.Request) {
items := []map[string]interface{}{
{"id": "1", "name": "Item 1"},
{"id": "2", "name": "Item 2"},
}

response, err := json.Marshal(items)
if err != nil {
req.Respond([]byte("[]"))
return
}
req.Respond(response)
}

Outbound NATS Requests and Timeouts

Handlers can call other services using the NATS connection captured from the enclosing scope. All outbound requests require a timeout:

func makeEnrichHandler(nc *nats.Conn) micro.HandlerFunc {
return func(req micro.Request) {
// Make an outbound request to another extension's service
reply, err := nc.Request(
"cirata.extensions.catalog.lookup",
req.Data(),
5*time.Second,
)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
// No response received within the timeout window
req.Error("503", "catalog service timed out", nil)
} else if errors.Is(err, nats.ErrNoResponders) {
// No service is listening on that subject
req.Error("503", "catalog service unavailable", nil)
} else {
req.Error("500", err.Error(), nil)
}
return
}

req.Respond(reply.Data)
}
}

Pass the connection to the handler at setup time using a closure:

nc := ext.Client().GetSymphonyClient().GetNatsConnection()
group.AddEndpoint("enrich", makeEnrichHandler(nc))

For timeout control tied to a broader operation context, use nc.RequestMsgWithContext:

func makeContextHandler(nc *nats.Conn) micro.HandlerFunc {
return func(req micro.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

reply, err := nc.RequestMsgWithContext(ctx, &nats.Msg{
Subject: "cirata.extensions.catalog.lookup",
Data: req.Data(),
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, nats.ErrTimeout) {
req.Error("503", "catalog service timed out", nil)
} else {
req.Error("500", err.Error(), nil)
}
return
}

req.Respond(reply.Data)
}
}

context.WithTimeout is preferable when the outbound call is one step in a larger pipeline—cancelling the context cancels all downstream requests immediately.
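When a handler is one step in a larger pipeline, a common approach is to give each outbound call the smaller of its own limit and whatever time the parent context has left. The sketch below assumes that approach; capTimeout and requestTimeout are hypothetical helpers, not part of the NATS or Symphony APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// capTimeout picks the timeout for one outbound call. Without a deadline the
// per-call limit applies; with one, the result is the remaining time capped
// at limit. ok is false when the deadline has already passed.
func capTimeout(remaining time.Duration, hasDeadline bool, limit time.Duration) (d time.Duration, ok bool) {
	if !hasDeadline {
		return limit, true
	}
	if remaining <= 0 {
		return 0, false
	}
	if remaining < limit {
		return remaining, true
	}
	return limit, true
}

// requestTimeout applies capTimeout to the deadline carried by ctx.
func requestTimeout(ctx context.Context, limit time.Duration) (time.Duration, bool) {
	deadline, has := ctx.Deadline()
	return capTimeout(time.Until(deadline), has, limit)
}

func main() {
	// The pipeline as a whole has 2 seconds; a single call may use at most 5.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	d, ok := requestTimeout(ctx, 5*time.Second)
	fmt.Println("usable:", ok, "within pipeline budget:", d <= 2*time.Second)

	// Without a deadline the per-call limit is used unchanged.
	d, ok = requestTimeout(context.Background(), 5*time.Second)
	fmt.Println("usable:", ok, "timeout:", d)
}
```

The resulting duration can be passed to nc.Request, and when ok is false the handler can respond with an error immediately instead of issuing a request that cannot succeed.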

Choosing a timeout value:

| Scenario | Suggested timeout |
|---|---|
| KV read or single-hop NATS round-trip | 2–3 s |
| Extension-to-extension service call | 5 s |
| Long-running computation | 30–60 s |
| No reply expected | use nc.Publish() instead |

Error values to check:

| Error | Meaning |
|---|---|
| nats.ErrTimeout | No response within the timeout window |
| nats.ErrNoResponders | No service is subscribed to that subject |
| context.DeadlineExceeded | Context deadline fired before a reply |

Warning: nc.Request always requires a timeout. There is no overload without one and no package-level default. Passing 0 causes an immediate timeout. Always set a value that reflects the expected latency of the service being called, so the limit is explicit at the call site.

REST API Response Format

When endpoints are exposed as REST APIs (via an OpenAPI spec in the service metadata), handlers can respond in two ways:

Raw response (simplest)—respond with the content directly. Symphony auto-detects the content type: valid JSON is served as application/json, anything else as text/plain.

// JSON response
func listHandler(req micro.Request) {
items := []Item{{ID: "1", Name: "Widget"}}
data, _ := json.Marshal(items)
req.Respond(data)
}

// Plain text response
func pingHandler(req micro.Request) {
req.Respond([]byte("pong"))
}

Wrapped response (full control)—for control over the HTTP status code, content type, or response headers, wrap the response in a JSON object with a body field:

func createHandler(req micro.Request) {
var input CreateRequest
if err := json.Unmarshal(req.Data(), &input); err != nil {
errResp, _ := json.Marshal(map[string]interface{}{
"body": map[string]string{"error": err.Error()},
"statusCode": 400,
})
req.Respond(errResp)
return
}

item := createItem(input)
resp, _ := json.Marshal(map[string]interface{}{
"body": map[string]interface{}{"id": item.ID, "status": "created"},
"statusCode": 201,
"contentType": "application/json",
})
req.Respond(resp)
}

| Field | Required | Description |
|---|---|---|
| body | Yes | The response payload. For JSON content types, any JSON value. For other types (e.g. text/plain), a string. |
| statusCode | No | HTTP status code (100–599). Defaults to 200. |
| contentType | No | MIME type. Defaults to application/json. |
| headers | No | Additional HTTP headers as a string-to-string map. |

Symphony detects the format by checking whether the response is a JSON object with a body key. Both formats work identically whether the endpoint is called through the REST API or directly over NATS.
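Building the wrapped form from a typed struct avoids misspelled keys in hand-written maps. The sketch below uses the field names from the table above; the wrappedResponse type and the jsonReply and textReply helpers are hypothetical and not provided by the library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wrappedResponse mirrors the documented wrapped-response fields. Optional
// fields are omitted when unset so that the platform defaults apply.
type wrappedResponse struct {
	Body        any               `json:"body"`
	StatusCode  int               `json:"statusCode,omitempty"`
	ContentType string            `json:"contentType,omitempty"`
	Headers     map[string]string `json:"headers,omitempty"`
}

// encode serializes r, falling back to a fixed 500 envelope if the body
// cannot be marshalled.
func encode(r wrappedResponse) []byte {
	data, err := json.Marshal(r)
	if err != nil {
		return []byte(`{"body":{"error":"failed to encode response"},"statusCode":500}`)
	}
	return data
}

// jsonReply wraps a JSON body with an explicit status code.
func jsonReply(status int, body any) []byte {
	return encode(wrappedResponse{Body: body, StatusCode: status})
}

// textReply wraps a plain-text body; the body must be a string for
// non-JSON content types.
func textReply(status int, text string) []byte {
	return encode(wrappedResponse{Body: text, StatusCode: status, ContentType: "text/plain"})
}

func main() {
	fmt.Println(string(jsonReply(201, map[string]string{"id": "42", "status": "created"})))
	fmt.Println(string(jsonReply(400, map[string]string{"error": "invalid JSON"})))
	fmt.Println(string(textReply(200, "pong")))
}
```

A handler would pass the returned bytes to req.Respond exactly as in the createHandler example.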

Dynamic Updates

Extensions can update their resources at runtime without restarting. This is useful for dashboards that reflect changing state:

err := ext.UpdateResource(
"ui://my_ext/dashboard",
types.TEXT_SYMPHONY_JSX,
updatedContent,
)
if err != nil {
log.Printf("failed to update resource: %v", err)
}

The UpdateResource method replaces the resource content and re-registers the extension so that Symphony reflects the changes immediately.

To re-publish the extension info after modifying other fields (such as adding new routes or menus at runtime), use UpdateExtensionInfo:

ext.AddRoute("/my_ext/new-page", "ui://my_ext/new_page")
if err := ext.UpdateExtensionInfo(); err != nil {
log.Printf("failed to update extension info: %v", err)
}

Usage Reporting

Extensions that consume licensed resources should report their usage to Symphony for licensing and chargeback. The ReportUsage method sends dimension measurements to Symphony, which converts them into license units using the rates embedded in the active license.

Reporting Usage

units, err := ext.ReportUsage(map[string]float64{
"tables_replicated": 5,
"bytes_transferred_gb": 120.5,
})
if err != nil {
log.Printf("failed to report usage: %v", err)
} else {
log.Printf("consumed %.2f units", units)
}

The dimensions map contains named measurements specific to your extension. The keys must match the dimension names defined in the license's unit rates for your extension prefix. If any dimension key is not covered by a license rate, Symphony will add your extension to the disabled_extensions list (per-extension enforcement).

Return Values

ReportUsage returns:

| Value | Type | Description |
|---|---|---|
| units | float64 | The computed license units consumed by this report |
| err | error | Non-nil if the report failed |

Subscribing to Enforcement State

Symphony broadcasts the enforcement state on the cirata.symphony.enforcement NATS subject whenever it changes. Subscribe to this subject to receive real-time updates—including changes triggered by other extensions, license uploads, or administrative actions:

nc := ext.Client().GetSymphonyClient().GetNatsConnection()

_, err := nc.Subscribe("cirata.symphony.enforcement", func(msg *nats.Msg) {
var state types.EnforcementState
if err := json.Unmarshal(msg.Data, &state); err != nil {
log.Printf("failed to parse enforcement state: %v", err)
return
}

switch state.GetStatus() {
case types.ENFORCEMENT_ENFORCED:
log.Printf("Enforcement active: %s — pausing operations", state.GetMessage())
// Stop or reduce licensed operations
case types.ENFORCEMENT_WARNING:
log.Println("Approaching license limit")
case types.ENFORCEMENT_GRACE:
log.Println("Units exhausted — in grace period")
default:
log.Println("Enforcement OK")
}
})
if err != nil {
log.Printf("failed to subscribe to enforcement updates: %v", err)
}

You can also fetch the current enforcement state on demand without reporting usage—for example, when your extension first starts:

msg, err := nc.Request("cirata.services.usage.enforcement", []byte("{}"), 5*time.Second)
if err != nil {
log.Printf("failed to fetch enforcement state: %v", err)
} else {
var state types.EnforcementState
if err := json.Unmarshal(msg.Data, &state); err != nil {
log.Printf("failed to parse enforcement state: %v", err)
} else {
log.Printf("Current enforcement status: %s", state.GetStatus())
}
}

Enforcement States

| Status | Meaning |
|---|---|
| enforcement_ok | Normal operation: units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within vendor-set grace period |
| enforcement_enforced | Grace period expired or no valid license: stop licensed operations |

Extensions may also be individually disabled via the disabled_extensions field, or flagged as grace-only via the grace_extensions field, even when the global status is enforcement_ok. Check both lists for your extension prefix to determine your operational state.
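Checking the global status together with both lists can be folded into one decision function. The sketch below is an assumption-laden illustration: the local enforcementState struct and its JSON keys are guesses based on the field names in the text (the real type is types.EnforcementState), and the returned labels "stop", "grace", "warning", and "ok" are invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"slices"
)

// enforcementState is a local stand-in for the broadcast payload. The JSON
// keys are assumed from the field names used in the text.
type enforcementState struct {
	Status             string   `json:"status"`
	DisabledExtensions []string `json:"disabled_extensions"`
	GraceExtensions    []string `json:"grace_extensions"`
}

// operationalState combines the global status with the per-extension lists.
// The most restrictive condition wins.
func operationalState(s enforcementState, prefix string) string {
	switch {
	case slices.Contains(s.DisabledExtensions, prefix), s.Status == "enforcement_enforced":
		return "stop"
	case slices.Contains(s.GraceExtensions, prefix), s.Status == "enforcement_grace":
		return "grace"
	case s.Status == "enforcement_warning":
		return "warning"
	default:
		return "ok"
	}
}

// stateFromJSON decodes a payload and evaluates it for one extension prefix.
func stateFromJSON(raw, prefix string) (string, error) {
	var s enforcementState
	if err := json.Unmarshal([]byte(raw), &s); err != nil {
		return "", err
	}
	return operationalState(s, prefix), nil
}

func main() {
	payload := `{"status":"enforcement_ok","disabled_extensions":["my_ext"],"grace_extensions":[]}`
	for _, prefix := range []string{"my_ext", "other_ext"} {
		state, err := stateFromJSON(payload, prefix)
		fmt.Println(prefix, state, err)
	}
}
```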

Note: The Go ReportUsage method returns only the computed units and an error. To receive the full enforcement state (including disabled_extensions and grace_extensions), subscribe to the cirata.symphony.enforcement broadcast or query cirata.services.usage.enforcement. The Python and Java libraries return the full enforcement state inline with each usage report.

Attribution

When an extension reports usage, Symphony checks whether any attribution rules match the extension prefix and reporting account. If a rule matches, the usage record is automatically assigned to a business unit with attribution method auto. Administrators can also manually attribute records or override automatic attributions via the Attribution UI.

When to Report

Report usage at meaningful checkpoints—for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation if the volume is very high; instead, batch measurements and report periodically.

// Example: report after each replication batch
func onBatchComplete(ext *extension.Extension, results []TableResult) {
var totalBytes float64
for _, r := range results {
totalBytes += r.Bytes
}

units, err := ext.ReportUsage(map[string]float64{
"tables_replicated": float64(len(results)),
"bytes_transferred_gb": totalBytes / 1e9,
})
if err != nil {
log.Printf("usage report failed: %v", err)
return
}
log.Printf("batch consumed %.2f units", units)
}
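For high-volume workloads, the batching advice above can be implemented with a small accumulator that sums measurements in memory and sends them in one report. This is a sketch under assumptions: usageBuffer, reportFunc, and errReportFailed are hypothetical names, and reportFunc simply has the same shape as the ReportUsage method shown above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// reportFunc has the same shape as the ReportUsage method.
type reportFunc func(map[string]float64) (float64, error)

var errReportFailed = errors.New("usage report failed")

// usageBuffer sums measurements per dimension until they are flushed.
type usageBuffer struct {
	mu      sync.Mutex
	pending map[string]float64
}

// Add records amount against a dimension. It is safe for concurrent use.
func (b *usageBuffer) Add(dimension string, amount float64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.pending == nil {
		b.pending = make(map[string]float64)
	}
	b.pending[dimension] += amount
}

// Flush sends everything accumulated so far in a single report. If the
// report fails, the measurements are kept for the next attempt.
func (b *usageBuffer) Flush(report reportFunc) (float64, error) {
	b.mu.Lock()
	batch := b.pending
	b.pending = nil
	b.mu.Unlock()
	if len(batch) == 0 {
		return 0, nil
	}

	units, err := report(batch)
	if err != nil {
		b.mu.Lock()
		if b.pending == nil {
			b.pending = make(map[string]float64)
		}
		for dimension, amount := range batch {
			b.pending[dimension] += amount
		}
		b.mu.Unlock()
		return 0, err
	}
	return units, nil
}

func main() {
	var buf usageBuffer
	buf.Add("tables_replicated", 5)
	buf.Add("tables_replicated", 3)
	buf.Add("bytes_transferred_gb", 120.5)

	// A stand-in for ext.ReportUsage that just prints what it receives.
	units, err := buf.Flush(func(d map[string]float64) (float64, error) {
		fmt.Println("reporting:", d)
		return 2.5, nil
	})
	fmt.Println(units, err)
}
```

In an extension, Flush would typically be called from a ticker or at the end of each batch with ext.ReportUsage as the argument.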

See Usage Tracking for how administrators view and manage reported usage.

Storage

Go extensions use the NATS JetStream key-value API for persistent storage. Buckets are declared during extension setup and are created automatically by the platform.

Declaring Buckets

Declare buckets using the AddBucket builder method:

ext.AddBucket("my_ext_data", "Extension data", 0)
ext.AddBucket("my_ext_events", "Event log with 24h TTL", 86400)

| Parameter | Description |
|---|---|
| name | Bucket name |
| description | Human-readable description |
| ttl | Time-to-live in seconds. 0 means entries do not expire |

The platform creates or updates buckets when the extension registers. Re-registering with changed parameters updates the configuration without losing existing data. In a clustered deployment, the platform picks a replica count that matches its own buckets (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone), so declared buckets inherit the same durability guarantees as the rest of Symphony's state.
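The replica policy described above amounts to a small lookup. The function below only restates the numbers in the text; replicasFor is a hypothetical name, and the treatment of two-node clusters (not covered by the text) is an assumption. In real code, prefer extension.GetClusterReplicas so the value comes from the platform.

```go
package main

import "fmt"

// replicasFor restates the documented policy: R=5 on clusters of five or
// more nodes, R=3 on clusters of three or four, and R=1 standalone.
// Two-node clusters are not covered by the text; R=1 is assumed here.
func replicasFor(nodes int) int {
	switch {
	case nodes >= 5:
		return 5
	case nodes >= 3:
		return 3
	default:
		return 1
	}
}

func main() {
	for _, nodes := range []int{1, 3, 4, 5, 7} {
		fmt.Printf("%d node(s): R=%d\n", nodes, replicasFor(nodes))
	}
}
```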

Runtime Bucket Creation

If your extension needs to create buckets on demand rather than through the registration manifest, use the SDK helper so the replica count matches the platform's policy:

nc := ext.Client().GetSymphonyClient().GetNatsConnection()
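js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("failed to create JetStream context: %v", err)
}
ctx := context.Background()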

kv, err := extension.CreateOrUpdateKeyValue(ctx, js, nc, jetstream.KeyValueConfig{
Bucket: "runtime_data",
Description: "Created on demand",
// Replicas omitted — SDK populates it from cirata.services.cluster.info.
})

Existing code that calls js.CreateOrUpdateKeyValue directly continues to work, but will use the NATS default of R=1 unless you set the replica count yourself. Use extension.GetClusterReplicas(ctx, nc) to query the value directly if you need it for a stream or other non-KeyValue config.

Accessing Buckets

Use the NATS connection from the client to access JetStream key-value stores:

nc := ext.Client().GetSymphonyClient().GetNatsConnection()
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("failed to create JetStream context: %v", err)
}

ctx := context.Background()
kv, err := js.KeyValue(ctx, "my_ext_data")
if err != nil {
log.Fatalf("failed to access bucket: %v", err)
}

Basic Operations

ctx := context.Background()

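// Store a value. The NATS Go client only accepts keys made of letters,
// digits, and the characters - / _ = . (a colon is rejected as invalid).
_, err := kv.Put(ctx, "item.123",
[]byte(`{"name": "Widget"}`))

// Retrieve a value
entry, err := kv.Get(ctx, "item.123")
if err == nil {
log.Printf("value: %s", entry.Value())
}

// Delete a key
err = kv.Delete(ctx, "item.123")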

// List all keys
keys, err := kv.Keys(ctx)

Batch Write

When writing many keys at once, use BatchPut to publish all entries concurrently instead of calling Put() in a loop. Each individual Put() is a synchronous round-trip; the batch API fires all publishes via JetStream.PublishAsync and collects acks in parallel.

import "cirata.com/symphony/extension"

entries := []extension.BatchEntry{
{Key: "item.1", Value: []byte(`{"name":"Widget"}`)},
{Key: "item.2", Value: []byte(`{"name":"Gadget"}`)},
}

result := extension.BatchPut(js, "my_ext_data", entries)

log.Printf("Batch write: %d succeeded, %d failed out of %d",
result.Succeeded, result.Failed, result.Total)

for key, err := range result.Errors {
log.Printf("Failed key '%s': %v", key, err)
}

The batch API never fails fast—a single bad key does not prevent the remaining entries from being written. BatchPutResult.Errors maps failed key names to their errors. Values written this way are fully readable via standard kv.Get().
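The "collect every error instead of stopping at the first" behaviour can be illustrated without JetStream. The sketch below is not the SDK's BatchPut: entry, batchResult, putAll, and memStore are hypothetical stand-ins, and the writes go to an in-memory map through an injected put function.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type entry struct {
	Key   string
	Value []byte
}

type batchResult struct {
	Total, Succeeded, Failed int
	Errors                   map[string]error // failed key -> error
}

// putAll writes every entry concurrently through put and records each
// failure, so one bad key never blocks the others.
func putAll(entries []entry, put func(key string, value []byte) error) batchResult {
	res := batchResult{Total: len(entries), Errors: make(map[string]error)}
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	for _, e := range entries {
		wg.Add(1)
		go func(e entry) {
			defer wg.Done()
			err := put(e.Key, e.Value)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				res.Failed++
				res.Errors[e.Key] = err
				return
			}
			res.Succeeded++
		}(e)
	}
	wg.Wait()
	return res
}

var errEmptyKey = errors.New("empty key")

// memStore is an in-memory stand-in for a key-value bucket.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (m *memStore) Put(key string, value []byte) error {
	if key == "" {
		return errEmptyKey
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.data == nil {
		m.data = make(map[string][]byte)
	}
	m.data[key] = value
	return nil
}

func (m *memStore) Len() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.data)
}

func main() {
	store := &memStore{}
	res := putAll([]entry{
		{Key: "item.1", Value: []byte(`{"name":"Widget"}`)},
		{Key: "", Value: []byte(`{"name":"Nameless"}`)},
		{Key: "item.2", Value: []byte(`{"name":"Gadget"}`)},
	}, store.Put)
	fmt.Printf("%d succeeded, %d failed out of %d\n", res.Succeeded, res.Failed, res.Total)
}
```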

Using the Client Library

The Go client library can also be used independently to interact with Symphony and invoke services provided by other extensions:

package main

import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"cirata.com/symphony/client"
)

func main() {
// Empty string reads SYMPHONY_TOKEN from the environment
c, err := client.NewClient("")
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}

err = c.Connect()
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}

ctx, cancel := context.WithTimeout(
context.Background(), 5*time.Second)
defer cancel()

// Get Symphony info
info, err := c.Info(ctx)
if err != nil {
log.Fatalf("Failed to get info: %v", err)
}
fmt.Printf("Info: %s\n", string(info))

// Invoke an extension service
handle := c.Extension("my_ext")

var response map[string]interface{}
err = handle.InvokeJSON(ctx, "process",
map[string]string{"input": "data"}, &response)
if err != nil {
log.Fatalf("Failed to invoke: %v", err)
}
fmt.Printf("Response: %v\n", response)
}

The InvokeJSON helper marshals the request to JSON, sends a NATS request to the extension's service, and unmarshals the response into the provided variable.

Logging

Extensions should use NewSymphonyJSONHandler for structured JSON output to stderr with field names consistent across all Symphony SDKs (timestamp, level, message):

import (
"log/slog"
"os"

"cirata.com/symphony/extension/observability"
)

slog.SetDefault(slog.New(
observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
))

slog.Info("Extension started", "version", "1.0")

Do not write log files. stdout/stderr is captured automatically by journald (systemd) or the container runtime (Docker/Kubernetes).

Observability

Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.

obs, err := observability.Enable(ext, svc, "myext")
if err != nil {
log.Fatalf("failed to enable observability: %v", err)
}

// Feed logs into both stderr and the OTLP pipeline
slog.SetDefault(slog.New(observability.NewMultiHandler(
observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
obs.GetLogCapture().Handler(),
)))

// Instrument handlers with automatic trace propagation
group.AddEndpoint("process", obs.InstrumentedHandler("process",
func(req micro.Request) {
obs.GetMetricsCapture().IncrementCounter("requests.total", 1)
// ... process request ...
req.Respond(result)
},
))

slog.Info("Extension started")

The MultiHandler fans out every log record to both stderr (for journald) and the OTLP log capture buffer (for the Observability Extension). This is the same pattern the platform itself uses.

For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.

Multi-Instance Support

Go extensions can run as multiple instances of the same extension type for scalability, fault tolerance, and workload distribution. The SDK provides built-in support for work partitioning, queue-based command delivery, and instance-specific messaging.

Automatic Load Balancing

Microservice endpoints registered with AddService are automatically load-balanced across all instances by NATS micro. No code changes are needed—deploying additional instances immediately distributes incoming service requests across them.

The SDK also uses a queue subscription for the commands channel, ensuring that each command is processed by exactly one instance.

WorkPartitioner

For extensions that run background tasks (watchers, monitors, replicators), the WorkPartitioner distributes work items across instances using consistent hashing.

Enable via the extension builder:

ext.EnableWorkPartitioning(func(totalInstances, myItemCount int) {
log.Printf("Rebalanced: %d instances active, I own %d items",
totalInstances, myItemCount)
})

Or create a WorkPartitioner directly:

wp := extension.NewWorkPartitioner(ext)
wp.OnRebalance(func(totalInstances, myItemCount int) {
log.Printf("Rebalanced: %d instances, %d items mine",
totalInstances, myItemCount)
})
wp.Start()

Use IsMyWork to check whether the current instance should process a given work item:

// In a bucket watcher loop
for _, key := range discoveredItems {
if wp.IsMyWork(key) {
go processItem(key)
} else {
log.Printf("Skipping %s — owned by another instance", key)
}
}

The partitioner watches the status bucket for instance join and leave events. When the set of active instances changes, the rebalance callback fires and IsMyWork results may change. Extensions should re-evaluate their active work items on each watcher iteration rather than caching ownership decisions.
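To see why ownership can move only when membership changes, here is a self-contained sketch of one consistent-hashing scheme, rendezvous (highest-random-weight) hashing. The text does not say which algorithm the WorkPartitioner uses, so this is a stand-in for illustration; score, owner, and isMyWork are hypothetical functions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// score hashes an (instance, item) pair. A zero byte separates the two parts
// so that ("ab", "c") and ("a", "bc") hash differently.
func score(instance, item string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(instance))
	h.Write([]byte{0})
	h.Write([]byte(item))
	return h.Sum64()
}

// owner returns the instance with the highest score for item, or "" when
// there are no instances. Ties are broken by the smaller instance ID.
func owner(instances []string, item string) string {
	best, bestScore := "", uint64(0)
	for i, id := range instances {
		s := score(id, item)
		if i == 0 || s > bestScore || (s == bestScore && id < best) {
			best, bestScore = id, s
		}
	}
	return best
}

// isMyWork reports whether self owns item given the current instance set.
func isMyWork(self string, instances []string, item string) bool {
	return owner(instances, item) == self
}

func main() {
	all := []string{"inst-a", "inst-b", "inst-c"}
	afterLeave := []string{"inst-a", "inst-c"} // inst-b has left

	for _, item := range []string{"orders", "users", "events", "audit"} {
		fmt.Printf("%-8s before=%s after=%s\n", item, owner(all, item), owner(afterLeave, item))
	}
}
```

With this scheme, an item changes owner only if its previous owner left or a newly joined instance scores higher for it, which is why ownership should be re-evaluated on each iteration rather than cached.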

Instance-Specific Handlers

Register handlers that are targeted at a specific instance using AddInstanceHandler. Each instance subscribes to cirata.extensions.{prefix}.instance.{instance_id}.>, and only the targeted instance receives the request:

ext.AddInstanceHandler("local-status", func(req micro.Request) {
status := map[string]interface{}{
"instance_id": ext.InstanceID(),
"uptime": time.Since(startTime).String(),
"work_items": wp.MyItemCount(),
}
resp, _ := json.Marshal(status)
req.Respond(resp)
})

Callers can target a specific instance via the HTTP API by adding the ?instance=<id> query parameter to the service proxy URL.

Instance ID

The extension's instance ID is derived from the JWT identifier in the API token. Access it with:

id := ext.InstanceID()

Use the instance ID to tag operational records in shared storage for debugging and traceability.

For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.

Error Handling

Extension Lifecycle

Wrap the Run() call to handle initialization failures:

func main() {
token := os.Getenv("SYMPHONY_TOKEN")
ext := extension.NewExtension(
"My Extension", "my_ext", "Description", token)

if err := ext.Run(); err != nil {
log.Fatalf("Extension failed: %v", err)
}
}

The extension runtime handles SIGINT and SIGTERM signals for graceful shutdown, publishing a disconnect status and draining the NATS connection before exiting.

Handler Error Checklist

  1. Always call req.Respond() or req.Error(), even on error
  2. Return []byte("{}") for object endpoints and []byte("[]") for list endpoints
  3. Log errors for debugging with log.Printf
  4. Keep request structs tolerant of unexpected fields: json.Unmarshal ignores unknown JSON keys by default, so avoid strict decoding (json.Decoder.DisallowUnknownFields) unless rejecting them is intended
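The difference between lenient and strict decoding in the last item can be seen with the standard library alone. The processRequest type below repeats the shape used earlier in this guide; decodeLenient and decodeStrict are hypothetical helper names.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type processRequest struct {
	Input string `json:"input"`
	Mode  string `json:"mode"`
}

// decodeLenient uses json.Unmarshal, which silently ignores keys that have
// no matching struct field.
func decodeLenient(data []byte) (processRequest, error) {
	var r processRequest
	err := json.Unmarshal(data, &r)
	return r, err
}

// decodeStrict rejects payloads that contain unknown keys.
func decodeStrict(data []byte) (processRequest, error) {
	var r processRequest
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	err := dec.Decode(&r)
	return r, err
}

func main() {
	// "trace_id" is not part of processRequest.
	payload := []byte(`{"input":"data","trace_id":"abc123"}`)

	r, err := decodeLenient(payload)
	fmt.Printf("lenient: %+v err=%v\n", r, err)

	_, err = decodeStrict(payload)
	fmt.Printf("strict:  err=%v\n", err)
}
```

Lenient decoding lets callers add fields over time without breaking older handlers; strict decoding is better reserved for endpoints where a misspelled field must be reported.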