
Multi-Instance Extensions

Symphony supports running multiple instances of the same extension type simultaneously. This enables horizontal scaling for high-throughput workloads, fault tolerance through redundancy, and workload distribution across nodes or availability zones. Multiple instances of an extension share the same prefix, storage buckets, and service subjects—the platform handles routing, load balancing, and work distribution automatically.

API Key Requirements

Each instance of an extension must have its own API key. The API key contains a unique identifier derived from the JWT that Symphony uses to distinguish instances. This identifier is used for status tracking, instance-specific messaging, and work partitioning. If two instances share the same API key, they will have the same identifier and the platform will treat them as a single instance—work partitioning, status tracking, and instance-targeted messaging will not function correctly.

To set up multiple instances:

  1. Navigate to the API Keys page in the Symphony UI.
  2. Create a separate API key for each instance you plan to run.
  3. Configure each instance with its own token using the method appropriate for your extension (e.g., SYMPHONY_TOKEN environment variable, --token command-line flag, or a configuration file).

All instances share the same extension prefix, capabilities, features, and storage buckets. Only the API key (and therefore the token) differs between instances.
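
Because a shared key silently collapses two instances into one, it can be worth checking the tokens before deployment. The sketch below decodes each token's payload and reports identifiers that appear more than once. It assumes the token is a standard three-part JWT and that the identifier is carried in the `jti` claim; neither detail is confirmed on this page, and the function names are hypothetical. The signature is not verified, so this is for inspection only.

```python
import base64
import json


def jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a three-part JWT (header.payload.signature)")
    payload = parts[1]
    # JWTs use unpadded base64url; restore the padding before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def find_shared_identifiers(tokens: dict[str, str], claim: str = "jti") -> dict[str, list[str]]:
    """Return identifiers used by more than one instance, with the labels that share them.

    `tokens` maps a label of your choosing (for example a hostname) to its token.
    The claim name "jti" is an assumption, not something this page documents.
    """
    seen: dict[str, list[str]] = {}
    for label, token in tokens.items():
        ident = str(jwt_claims(token)[claim])  # KeyError if the claim is absent
        seen.setdefault(ident, []).append(label)
    return {ident: labels for ident, labels in seen.items() if len(labels) > 1}
```

An empty result means every instance has a distinct identifier; any entry in the result lists the instances that Symphony would treat as one.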

How It Works

When multiple instances of the same extension connect to Symphony, several mechanisms work together to distribute requests and background work:

NATS micro load balancing. Stateless service requests (microservice endpoints) are automatically load-balanced across all instances by the NATS micro framework. Each instance subscribes to the same service subjects, and NATS delivers each incoming request to exactly one of them. No code changes are needed for this behavior.

WorkPartitioner for background tasks. Background work items (such as monitors, replicators, and scheduled jobs) are distributed across instances using consistent hashing. Each instance watches the extension's status bucket to discover other instances, computes hash(key) % len(instances) to determine ownership, and only processes work items it owns. When instances join or leave, a rebalance callback fires and work is redistributed.

Queue subscribe for commands. The SDK subscribes to the extension's commands channel using a NATS queue group ({prefix}-commands), so only one instance handles each command message. This prevents duplicate processing of administrative directives.

Instance discovery. Each instance publishes its status to the Symphony status bucket with a TTL of 60 seconds. All instances watch this bucket and maintain a sorted list of active peers. This list is used by the WorkPartitioner to compute ownership.

Opt-In Levels

Multi-instance support is designed as a progressive opt-in. Extensions can operate at different levels depending on their needs.

Level 0: Passive (No Changes)

Extensions that only serve stateless microservice endpoints work with multiple instances without any code changes. NATS micro automatically distributes requests across all connected instances. This level is appropriate for query services, data formatters, and other extensions that do not maintain background tasks or local state.

When you deploy additional instances of a Level 0 extension, each instance registers with Symphony and begins receiving a share of incoming requests immediately.

Level 1: Partitioned Work

Extensions that run background tasks—watching buckets, polling external systems, running scheduled jobs—should use the WorkPartitioner to distribute those tasks across instances. Without partitioning, every instance would run every background task, leading to duplicate work.

The WorkPartitioner assigns work items to instances based on a consistent hash of the item key. Each instance calls IsMyWork (Go), is_my_work (Python and Rust), or isMyWork (Java) to check whether it owns a given work item.

Go:

wp := extension.NewWorkPartitioner(ext)
wp.OnRebalance(func(total, mine int) {
	log.Printf("Rebalance: %d total instances, I own %d items", total, mine)
})
wp.Start()

// In a watcher loop:
for _, key := range allWorkItems {
	if wp.IsMyWork(key) {
		processItem(key)
	}
}

To enable the WorkPartitioner via the extension builder:

ext.EnableWorkPartitioning(func(total, mine int) {
	log.Printf("Rebalanced: %d instances, %d items mine", total, mine)
})

Python:

import logging

from cirata.symphony import Extension

ext = Extension("My Extension", "my_ext")

def on_rebalance(total, mine):
    logging.info(f"Rebalance: {total} instances, {mine} items mine")

ext.enable_work_partitioning(on_rebalance)

# In a watcher loop (inside an async function):
for key in all_work_items:
    if ext.work_partitioner.is_my_work(key):
        await process_item(key)

Java:

WorkPartitioner wp = new WorkPartitioner(runtime);
wp.onRebalance((total, mine) -> {
    log.info("Rebalance: {} total instances, I own {} items", total, mine);
});
wp.start();

// In a watcher loop:
for (String key : allWorkItems) {
    if (wp.isMyWork(key)) {
        processItem(key);
    }
}

Rust:

use cirata_symphony::WorkPartitioner;
use std::sync::Arc;

let partitioner = ext.enable_work_partitioning(Some(Arc::new(|| {
    println!("Instances changed, re-evaluating work assignments");
})));

let js = ext.jetstream().await?;
let watcher_handle = partitioner.start(&js).await?;

// In a watcher loop:
for key in &all_work_items {
    if partitioner.is_my_work(key).await {
        process_item(key).await;
    }
}

Level 2: Instance-Targeted Messaging

For advanced coordination scenarios—such as querying a specific instance's local state or implementing leader election—extensions can register instance-specific handlers. Each instance subscribes to cirata.extensions.{prefix}.instance.{instance_id}.> for messages targeted at that specific instance.

Go:

ext.AddInstanceHandler("status", func(req micro.Request) {
	// This handler only runs on the targeted instance
	status := getLocalStatus()
	resp, _ := json.Marshal(status)
	req.Respond(resp)
})

Python:

import json

async def instance_status(req):
    # This handler only runs on the targeted instance
    status = get_local_status()
    await req.respond(json.dumps(status).encode("utf-8"))

ext.add_instance_handler("status", instance_status)

Targeting a specific instance via the HTTP API:

The Symphony service proxy accepts an instance query parameter to route a request to a specific extension instance:

curl -H "Authorization: Bearer <token>" \
"https://your-symphony-instance.com/api/v1/extensions/my_ext/service/status?instance=abc123"

The instance parameter value is the instance ID, which is derived from the JWT identifier of the extension's API token.
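
The same request can be built programmatically. The sketch below uses only the Python standard library and follows the URL shape of the curl example above; the helper name and its parameters are hypothetical, and the request is constructed but not sent.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request


def instance_request(base_url: str, prefix: str, endpoint: str,
                     instance_id: str, token: str) -> Request:
    """Build (but do not send) a request routed to one extension instance."""
    # Path layout copied from the curl example: /api/v1/extensions/{prefix}/service/{endpoint}
    path = f"/api/v1/extensions/{quote(prefix, safe='')}/service/{quote(endpoint, safe='')}"
    # urlencode escapes the instance ID so unusual characters cannot break the query string.
    query = urlencode({"instance": instance_id})
    return Request(
        f"{base_url.rstrip('/')}{path}?{query}",
        headers={"Authorization": f"Bearer {token}"},
    )
```

Passing the returned object to urllib.request.urlopen would send it; omitting the instance parameter leaves routing to the normal load-balanced path.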

Work Distribution

The WorkPartitioner uses consistent hashing to assign work items to instances. The algorithm is:

  1. All active instances are discovered from the status bucket and sorted by instance ID.
  2. For each work item key, the hash is computed as hash(key) % len(instances).
  3. If the result equals the current instance's index in the sorted list, the item belongs to this instance.

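As long as every instance sees the same instance list, each work item is owned by exactly one instance. Ownership is computed with a plain modulo over the instance count, so when the count changes, items can move between surviving instances as well as to or from the instance that joined or left. Rebalance handlers should therefore re-evaluate every work item rather than assume that only about 1/N of them changed owner.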
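
The three steps above can be sketched in a few lines of Python. This is an illustration of the described algorithm, not the SDK's code: it uses FNV-1a 32-bit (the hash this page names for the SDKs) and assumes the key's UTF-8 bytes are hashed directly, which is not confirmed here. All function names are hypothetical. A helper that counts how many keys change owner between two instance lists is included, since that is a practical way to see the effect of scaling on a real key set.

```python
def fnv1a_32(data: bytes) -> int:
    """Standard FNV-1a 32-bit hash."""
    h = 0x811C9DC5  # offset basis
    for byte in data:
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF  # multiply by the FNV prime, keep 32 bits
    return h


def owner_of(key: str, instance_ids: list[str]) -> str:
    """Return the instance ID that owns `key` under hash(key) % len(instances)."""
    if not instance_ids:
        raise ValueError("no active instances")
    ordered = sorted(instance_ids)  # step 1: sort by instance ID
    index = fnv1a_32(key.encode("utf-8")) % len(ordered)  # step 2: hash modulo count
    return ordered[index]  # step 3: the instance at that index owns the key


def is_my_work(key: str, my_id: str, instance_ids: list[str]) -> bool:
    """True if this instance owns `key`."""
    return owner_of(key, instance_ids) == my_id


def moved_fraction(keys: list[str], before: list[str], after: list[str]) -> float:
    """Fraction of keys whose owner differs between two instance lists."""
    moved = sum(1 for k in keys if owner_of(k, before) != owner_of(k, after))
    return moved / len(keys)
```

Because the list is sorted before indexing, every instance arrives at the same owner regardless of the order in which it discovered its peers. An item keeps its owner across a membership change only if its hash selects the same instance under both lists, which moved_fraction makes easy to measure.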

The IsMyWork / is_my_work / isMyWork method performs this computation and returns a boolean. It is designed to be called frequently (for example, on every iteration of a watcher loop) with negligible overhead.

Graceful Shutdown and Failover

Each instance publishes its status to the status bucket with a TTL of 60 seconds. As long as the instance is running, it refreshes this entry every 30 seconds. When an instance stops:

  • If the instance shuts down gracefully (SIGINT/SIGTERM), it publishes a disconnect status immediately.
  • If the instance crashes or loses connectivity, its status entry expires after the 60-second TTL.

In both cases, the remaining instances detect the departure through their status bucket watch, update their instance list, and trigger a rebalance. Work items that were owned by the departed instance are redistributed among the survivors.
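
The two departure paths can be modelled with a small in-memory table. The sketch below is a local simulation of the behaviour described above, not the NATS key-value API; the class and method names are hypothetical, and the clock is injectable so that expiry can be exercised without waiting.

```python
import time


class PeerTable:
    """In-memory model of a status bucket whose entries expire after a TTL."""

    def __init__(self, ttl_seconds: float = 60.0, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._last_seen: dict[str, float] = {}

    def heartbeat(self, instance_id: str) -> None:
        """Record a status refresh (this page describes one every 30 seconds)."""
        self._last_seen[instance_id] = self.clock()

    def disconnect(self, instance_id: str) -> None:
        """Graceful shutdown: the instance leaves the list immediately."""
        self._last_seen.pop(instance_id, None)

    def active(self) -> list[str]:
        """Sorted IDs whose last refresh is younger than the TTL."""
        now = self.clock()
        return sorted(i for i, seen in self._last_seen.items() if now - seen < self.ttl)
```

With a 30-second refresh and a 60-second TTL, a healthy instance can miss one refresh without dropping out of the list, while a crashed instance disappears at most 60 seconds after its last refresh.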

There is no single point of failure in this design. Any instance can depart without coordination, and the remaining instances will automatically pick up its work within the TTL window.

Best Practices

  • Keep service endpoints stateless. Microservice handlers should not rely on local state that varies between instances. NATS routes requests to any available instance, so a handler that depends on local state may produce inconsistent results.

  • Use WorkPartitioner for background watchers. Any background loop that iterates over work items should gate processing with IsMyWork. This prevents duplicate work and ensures even distribution.

  • Tag operational records with instance_id. When writing operational data (logs, status entries, progress records) to shared storage, include the instance ID so that administrators can trace which instance performed each action.

  • Do not cache state that should be shared. If multiple instances need access to the same data, store it in a JetStream bucket rather than in local memory. Local caches are invisible to other instances and will diverge.

  • Handle rebalance gracefully. The rebalance callback fires when instances join or leave. Use it to cancel work that is no longer owned and to start processing newly assigned items.

  • Test with multiple instances locally. Run two or three instances of your extension during development to verify that work is distributed correctly and that no instance-local assumptions are violated.
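
For the rebalance practice above, one simple approach is to compare what is currently running with what is now owned and act only on the difference. The sketch below is a minimal illustration; the function name is hypothetical and the ownership test is passed in as a plain callable rather than tied to any SDK type.

```python
from typing import Callable, Iterable


def plan_rebalance(
    running: set[str],
    all_items: Iterable[str],
    is_my_work: Callable[[str], bool],
) -> tuple[set[str], set[str]]:
    """Return (to_stop, to_start) for a rebalance.

    `running` holds the work items this instance is processing right now.
    `is_my_work` is whatever ownership check the instance uses after the change.
    """
    now_owned = {item for item in all_items if is_my_work(item)}
    to_stop = running - now_owned   # no longer ours: cancel these
    to_start = now_owned - running  # newly assigned: start these
    return to_stop, to_start
```

Items present in both sets are left untouched, so work that stays with the same owner is not restarted on every membership change.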

Complete Example: Alert Monitor

The following example ties together all three levels of multi-instance support in a single extension. The Alert Monitor watches a set of alert rules and evaluates them periodically. Each rule is evaluated by exactly one instance.

With one instance running, all rules are evaluated by that instance. When a second instance starts, the WorkPartitioner redistributes rules automatically. When an instance stops, survivors pick up its rules within 60 seconds.

The example demonstrates:

Feature            How it's used
Service endpoint   check_alert: any instance handles a one-off check (load-balanced)
WorkPartitioner    Distributes rules so each is evaluated by exactly one instance
Instance handler   status: query a specific instance for its assigned rules

Go:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"github.com/cirata-io/symphony/libs/go/extension"
	"github.com/nats-io/nats.go"
)

func main() {
	ext := extension.NewExtension()

	// 1. Register a load-balanced service endpoint.
	ext.AddEndpoint("check_alert", func(msg *nats.Msg) {
		ruleName := string(msg.Data)
		result := evaluateRule(ruleName)
		msg.Respond([]byte(result))
	})

	// 2. Register an instance-specific handler.
	ext.AddInstanceHandler("status", func(msg *nats.Msg) {
		resp, _ := json.Marshal(myRules)
		msg.Respond(resp)
	})

	// 3. Enable work partitioning.
	var wp *extension.WorkPartitioner
	wp, _ = ext.EnableWorkPartitioning(func() {
		slog.Info("Rebalanced", "instances", len(wp.ActiveInstances()))
		reassignRules(wp)
	})

	// 4. Connect and run.
	ext.Run(context.Background())

	// 5. Start the partitioner and evaluate rules.
	wp.Start(context.Background())
	reassignRules(wp)

	for {
		for _, rule := range myRules {
			slog.Info("Alert check", "rule", rule, "result", evaluateRule(rule))
		}
		time.Sleep(30 * time.Second)
	}
}

var myRules []string

func reassignRules(wp *extension.WorkPartitioner) {
	allRules := []string{"cpu-high", "disk-full", "memory-low", "net-latency"}
	myRules = nil
	for _, rule := range allRules {
		if wp.IsMyWork(rule) {
			myRules = append(myRules, rule)
		}
	}
	slog.Info("Assigned rules", "rules", myRules)
}

func evaluateRule(name string) string {
	return fmt.Sprintf("rule %s: OK", name)
}

Python:

import asyncio
import json
import logging

from cirata.symphony import Extension
from cirata.symphony.logging import configure_logging

configure_logging("alertmonitor")
log = logging.getLogger(__name__)

my_rules: list[str] = []
ALL_RULES = ["cpu-high", "disk-full", "memory-low", "net-latency"]


def evaluate_rule(name: str) -> str:
    return f"rule {name}: OK"


async def main():
    ext = Extension()

    # 1. Register a load-balanced service endpoint.
    @ext.endpoint("check_alert")
    async def check_alert(msg):
        rule_name = msg.data.decode()
        await msg.respond(evaluate_rule(rule_name).encode())

    # 2. Register an instance-specific handler.
    async def status_handler(msg):
        await msg.respond(json.dumps(my_rules).encode())

    ext.add_instance_handler("status", status_handler)

    # 3. Enable work partitioning.
    def on_rebalance():
        reassign_rules(wp)

    wp = ext.enable_work_partitioning(on_rebalance=on_rebalance)

    # 4. Connect and run.
    await ext.operate()

    # 5. Start the partitioner and evaluate rules.
    await wp.start()
    reassign_rules(wp)

    while True:
        for rule in my_rules:
            log.info("Alert check: %s -> %s", rule, evaluate_rule(rule))
        await asyncio.sleep(30)


def reassign_rules(wp):
    global my_rules
    my_rules = [r for r in ALL_RULES if wp.is_my_work(r)]
    log.info("Assigned rules: %s", my_rules)


if __name__ == "__main__":
    asyncio.run(main())

Java:

package com.example.alertmonitor;

import com.cirata.symphony.ExtensionRuntime;
import com.cirata.symphony.WorkPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AlertMonitor {
    private static final Logger log = LoggerFactory.getLogger(AlertMonitor.class);

    private static final List<String> ALL_RULES =
        Arrays.asList("cpu-high", "disk-full", "memory-low", "net-latency");

    // Replaced as a whole on rebalance and read by the scheduler thread.
    private static volatile List<String> myRules = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        ExtensionRuntime runtime = new ExtensionRuntime();

        // 1. Register a load-balanced service endpoint.
        runtime.addEndpoint("check_alert", msg -> {
            String ruleName = new String(msg.getData(), StandardCharsets.UTF_8);
            String result = evaluateRule(ruleName);
            msg.getConnection().publish(msg.getReplyTo(), result.getBytes(StandardCharsets.UTF_8));
        });

        // 2. Connect and run.
        runtime.run();

        // 3. Enable work partitioning. A local variable cannot be referenced
        //    inside its own initializer, so the callback reads the partitioner
        //    through a holder that is set once construction has finished.
        AtomicReference<WorkPartitioner> wpRef = new AtomicReference<>();
        WorkPartitioner wp = new WorkPartitioner(
            runtime.getExtensionId(),
            runtime.getPrefix(),
            runtime.getConnection(),
            () -> {
                WorkPartitioner current = wpRef.get();
                if (current != null) {
                    reassignRules(current);
                }
            }
        );
        wpRef.set(wp);
        wp.start();

        // 4. Initial rule assignment.
        reassignRules(wp);

        // 5. Periodically evaluate assigned rules.
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            for (String rule : myRules) {
                log.info("Alert check: {} -> {}", rule, evaluateRule(rule));
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    private static void reassignRules(WorkPartitioner wp) {
        List<String> assigned = new ArrayList<>();
        for (String rule : ALL_RULES) {
            if (wp.isMyWork(rule)) {
                assigned.add(rule);
            }
        }
        myRules = assigned;
        log.info("Assigned rules: {}", myRules);
    }

    private static String evaluateRule(String name) {
        return String.format("rule %s: OK", name);
    }
}

Rust:

use cirata_symphony::{Extension, Result};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, Level};

static ALL_RULES: &[&str] = &["cpu-high", "disk-full", "memory-low", "net-latency"];

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt().with_max_level(Level::INFO).init();

    let mut ext = Extension::new();
    let my_rules: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(Vec::new()));

    // 1. Register a load-balanced service endpoint.
    ext.add_endpoint("check_alert", |msg| {
        let rule_name = String::from_utf8_lossy(&msg.payload);
        let result = format!("rule {}: OK", rule_name);
        async move { Ok(result.into_bytes()) }
    });

    // 2. Register an instance-specific handler.
    let rules_for_handler = my_rules.clone();
    ext.add_instance_handler("status", move |msg| {
        let rules = rules_for_handler.clone();
        async move {
            let r = rules.read().await;
            let json = serde_json::to_vec(&*r).unwrap_or_default();
            Ok(json)
        }
    });

    // 3. Enable work partitioning.
    let wp = ext.enable_work_partitioning(Some(Arc::new(move || {
        info!("Rebalance triggered");
    })));

    // 4. Connect and run.
    let js = ext.run().await?;

    // 5. Start the partitioner and evaluate rules.
    //    Propagate a start failure instead of silently discarding it.
    let _handle = wp.start(&js).await?;
    reassign_rules(&wp, &my_rules).await;

    loop {
        {
            let rules = my_rules.read().await;
            for rule in rules.iter() {
                info!("Alert check: {} -> rule {}: OK", rule, rule);
            }
        }
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
        // The rebalance callback above only logs, so ownership is re-evaluated here.
        reassign_rules(&wp, &my_rules).await;
    }
}

async fn reassign_rules(
    wp: &cirata_symphony::WorkPartitioner,
    my_rules: &Arc<RwLock<Vec<String>>>,
) {
    let mut assigned = Vec::new();
    for rule in ALL_RULES {
        if wp.is_my_work(rule).await {
            assigned.push(rule.to_string());
        }
    }
    info!("Assigned rules: {:?}", assigned);
    *my_rules.write().await = assigned;
}

How it behaves at runtime

Single instance: WorkPartitioner sees one entry in the status bucket. Every isMyWork call returns true—the sole instance evaluates all four rules.

Second instance starts: Both instances detect the change and fire their rebalance callbacks. Rules are redistributed, for example (the hash results shown are illustrative):

hash("cpu-high")    % 2 = 0  → Instance A
hash("disk-full")   % 2 = 1  → Instance B
hash("memory-low")  % 2 = 0  → Instance A
hash("net-latency") % 2 = 1  → Instance B

The hash function (FNV-1a 32-bit) is identical across all four SDKs, so any combination of languages will partition consistently.
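
When porting the partitioning logic or debugging a mismatch between languages, a quick sanity check is to compare a hash implementation against known FNV-1a 32-bit reference values. The sketch below is a plain Python implementation of the standard algorithm with a small self-check; it is not taken from any of the SDKs, and the helper names are hypothetical.

```python
def fnv1a_32(data: bytes) -> int:
    """Standard FNV-1a 32-bit: XOR each byte in, then multiply by the FNV prime."""
    h = 0x811C9DC5  # offset basis
    for byte in data:
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF  # truncate to 32 bits
    return h


# Reference values for the standard algorithm.
REFERENCE_VECTORS = {
    b"": 0x811C9DC5,
    b"a": 0xE40C292C,
    b"ab": 0x4D2505CA,
}


def matches_reference(hash_fn) -> bool:
    """True if `hash_fn` reproduces every reference vector."""
    return all(hash_fn(data) == expected for data, expected in REFERENCE_VECTORS.items())
```

An implementation that multiplies before XOR-ing (FNV-1 rather than FNV-1a) or that keeps more than 32 bits fails this check, which is exactly the kind of difference that would make two instances disagree about ownership.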

Instance stops: Its status entry is withdrawn immediately on a graceful shutdown, or expires after the 60-second TTL following a crash. Survivors rebalance and pick up the orphaned rules.

Service requests: A call to check_alert is load-balanced by NATS micro—any instance can handle it without partitioning.

Targeting a specific instance: To query a particular instance's status, use the instance query parameter:

curl -H "Authorization: Bearer <token>" \
"https://your-symphony-instance.com/api/v1/extensions/alert_monitor/service/status?instance=<instance_id>"

Storage Considerations

All instances of an extension share the same JetStream buckets. Configuration data stored in buckets is visible to every instance. This is by design—shared configuration ensures consistency.

Operational data (such as per-item progress or status) should be tagged with the instance ID to allow debugging and to avoid write conflicts. A common pattern is to use composite keys:

progress:{work_item_key}:{instance_id}

When an instance departs and its work items are reassigned, the new owner should be prepared to read and continue from the previous instance's progress records.
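
A small pair of helpers can keep the key format in one place and let a new owner locate records left by a previous one. The sketch below follows the composite-key pattern above; the function names are hypothetical, it works on a plain list of key strings rather than any bucket API, and it assumes that neither work item keys nor instance IDs contain a colon.

```python
from typing import Iterable

PREFIX = "progress"


def progress_key(work_item_key: str, instance_id: str) -> str:
    """Build the composite key progress:{work_item_key}:{instance_id}."""
    return f"{PREFIX}:{work_item_key}:{instance_id}"


def previous_progress_keys(all_keys: Iterable[str], work_item_key: str,
                           my_instance_id: str) -> list[str]:
    """Keys written for `work_item_key` by instances other than this one."""
    head = f"{PREFIX}:{work_item_key}:"
    return sorted(
        key for key in all_keys
        if key.startswith(head)
        and ":" not in key[len(head):]        # remainder must be a bare instance ID
        and key[len(head):] != my_instance_id  # skip this instance's own record
    )
```

After a rebalance, the new owner can read the records returned by previous_progress_keys, resume from the most recent one, and then write its own record under progress_key.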

See Also