Java Extension Development

Cirata Symphony provides a Java library for developing extensions. Java extensions are well suited to enterprise workloads, complex data pipelines, and environments where the Java ecosystem is already established. The recommended approach uses Spring Boot with dependency injection, though a plain Java alternative is also supported.

Getting Started

Installation

The Symphony Java library is distributed as a JAR file. Download it from the Languages and Libraries page or obtain it from the Symphony instance. Add it as a dependency in your build system:

Maven:

<dependency>
<groupId>com.cirata</groupId>
<artifactId>cirata-symphony</artifactId>
<version>${symphony.version}</version>
<scope>system</scope>
<systemPath>${project.basedir}/libs/cirata-symphony.jar</systemPath>
</dependency>

The library requires the NATS Java client and Jackson for JSON serialization as runtime dependencies.

Project Structure (Spring Boot)

The recommended project structure separates concerns into adapters, endpoints, and a main application class:

my-extension/
├── src/main/java/com/example/myext/
│ ├── MyExtension.java # @SpringBootApplication main class
│ ├── MyConfiguration.java # @ConfigurationProperties for token
│ ├── adapters/
│ │ ├── ExtensionRuntimeAdapter.java # Builds and starts ExtensionRuntime
│ │ ├── CapabilitiesAdapter.java # Implements Capabilities interface
│ │ └── FeaturesAdapter.java # Implements Features interface
│ └── endpoints/
│ └── ExtensionEndpoints.java # NATS microservice handlers
├── src/main/resources/
│ ├── application.properties # Spring config + token
│ └── templates/ # UI templates (.tsx, .md files)
│ ├── home.tsx
│ ├── dashboard.tsx
│ └── help.md
└── pom.xml

The Adapter Pattern

Java extensions use three adapters to separate the runtime configuration from the business logic. Each adapter is a Spring @Component that the framework wires together automatically.

ExtensionRuntimeAdapter

This adapter builds and starts the ExtensionRuntime, which manages the connection to Symphony:

@Component
public class ExtensionRuntimeAdapter {
private final ExtensionRuntime extensionRuntime;

public ExtensionRuntimeAdapter(Capabilities capabilities,
Features features,
MyConfiguration config)
throws ApiKeyUnauthorizedException {
this.extensionRuntime = ExtensionRuntime.builder()
.name("My Extension")
.description("Provides data processing capabilities")
.capabilities(capabilities)
.features(features)
.token(config.getToken())
.prefix("my_ext")
.build();
this.extensionRuntime.start();
}

@Bean
public ExtensionRuntime getExtensionRuntime() {
return this.extensionRuntime;
}
}

The builder accepts the following parameters:

| Parameter | Required | Description |
| --- | --- | --- |
| name | Yes | Human-readable name displayed in the Symphony UI |
| description | No | Description of the extension's purpose |
| capabilities | No | Capabilities implementation declaring pub/sub subjects |
| features | No | Features implementation declaring UI resources and storage |
| token | No | API token (falls back to SYMPHONY_TOKEN env var) |
| prefix | No | Unique prefix for NATS subjects |

Calling start() connects to Symphony and begins the status publishing loop. The token is resolved in the following order:

  1. The token parameter in the builder
  2. The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux) — from a previous provisioning run
  3. The SYMPHONY_TOKEN environment variable
  4. The REGISTRATION_TOKEN environment variable (one-time provisioning)

When REGISTRATION_TOKEN is set, the extension connects, automatically provisions a permanent API key with the extension's declared capabilities, and stores it in the OS credential store (or ~/.config/cirata/ as a fallback). Subsequent runs use the stored token automatically with no interaction required.

If SYMPHONY_TOKEN contains a registration token (detected by its JWT capabilities), it is treated as a registration token and provisioned in the same way.

If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
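
The resolution order above can be pictured as a chain of lookups in which the first non-blank value wins. The following is an illustrative sketch only, not the SDK's implementation: TokenResolver, its Source enum, and the credentialStore supplier are hypothetical names, and the environment is passed in as a map so the example runs without real variables.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Illustrative only: mirrors the documented lookup order, not the SDK's code. */
final class TokenResolver {
    enum Source { BUILDER, CREDENTIAL_STORE, SYMPHONY_TOKEN, REGISTRATION_TOKEN }

    record Resolved(Source source, String token) {}

    /**
     * Returns the first non-blank token in the documented order, or empty
     * if no source supplies one (the real runtime logs an error and exits).
     */
    static Optional<Resolved> resolve(String builderToken,
                                      Supplier<String> credentialStore,
                                      Map<String, String> env) {
        if (present(builderToken)) {
            return Optional.of(new Resolved(Source.BUILDER, builderToken));
        }
        String stored = credentialStore.get();
        if (present(stored)) {
            return Optional.of(new Resolved(Source.CREDENTIAL_STORE, stored));
        }
        String symphony = env.get("SYMPHONY_TOKEN");
        if (present(symphony)) {
            return Optional.of(new Resolved(Source.SYMPHONY_TOKEN, symphony));
        }
        String registration = env.get("REGISTRATION_TOKEN");
        if (present(registration)) {
            return Optional.of(new Resolved(Source.REGISTRATION_TOKEN, registration));
        }
        return Optional.empty();
    }

    // Blank values count as absent, matching the empty default used in application.properties.
    private static boolean present(String value) {
        return value != null && !value.isBlank();
    }
}
```

Knowing which source supplied the token is mainly useful when diagnosing why a container keeps re-provisioning.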

Container and Kubernetes deployments

In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:

  • Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
  • Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.

Overriding the token-embedded address

When the hostname encoded in the token is not the right one to reach Symphony from where the extension runs, set the SYMPHONY_ADDRESS environment variable. It accepts a bare hostname, host:port, or a full URL with an explicit http or https scheme. The Java SDK applies the override across all three startup paths — the constructor, REGISTRATION_TOKEN provisioning, and provisioning reconnect.

See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Java, Python, Go, and Rust SDKs.
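
As a rough illustration of the three accepted forms, the sketch below classifies a candidate value before it is exported as SYMPHONY_ADDRESS. It is an assumption-laden example, not the SDK's validation logic: AddressForm is a hypothetical class, IPv6 literals are out of scope, and host parsing follows java.net.URI rather than the rules in the SYMPHONY_ADDRESS reference.

```java
import java.net.URI;

final class AddressForm {
    enum Kind { BARE_HOST, HOST_PORT, URL }

    /** Classifies a value as a bare hostname, host:port, or an http(s) URL. */
    static Kind classify(String value) {
        String v = value.trim();
        if (v.isEmpty()) {
            throw new IllegalArgumentException("address is empty");
        }
        if (v.contains("://")) {
            URI uri = URI.create(v); // throws IllegalArgumentException if malformed
            String scheme = uri.getScheme();
            boolean httpLike = "http".equalsIgnoreCase(scheme)
                    || "https".equalsIgnoreCase(scheme);
            if (uri.getHost() == null || !httpLike) {
                throw new IllegalArgumentException("expected http(s)://host[:port]");
            }
            return Kind.URL;
        }
        int colon = v.indexOf(':');
        if (colon < 0) {
            return Kind.BARE_HOST;
        }
        // Everything after the first colon must be a numeric port;
        // NumberFormatException is itself an IllegalArgumentException.
        int port = Integer.parseInt(v.substring(colon + 1));
        if (colon == 0 || port < 1 || port > 65535) {
            throw new IllegalArgumentException("expected host:port");
        }
        return Kind.HOST_PORT;
    }
}
```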

CapabilitiesAdapter

Declares which NATS subjects the extension publishes to and subscribes to. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.

@Component
public class CapabilitiesAdapter implements Capabilities {
@Override
public PubSubCapabilities getCapabilities() {
PubSubCapabilities caps = new PubSubCapabilities();

// Subscribe capabilities: services this extension provides
caps.putSubItem("extensions.my_ext", "My Extension");
caps.putSubItem("extensions.my_ext.process", "Process data");
caps.putSubItem("extensions.my_ext.query", "Query data");

// Publish capabilities: subjects this extension sends to
caps.putPubItem("extensions.my_ext.processed", "Data processed event");

// Specific cross-extension service subjects are permitted when
// the registering user's RBAC role covers them.
caps.putPubItem("extensions.intelligence.mcp", "Query Intelligence MCP tools");

return caps;
}
}

Both subscribe and publish capabilities follow the extensions.<prefix> convention. Wildcard subjects using > can cover a range of endpoints, but only within the extension's own prefix (e.g. extensions.my_ext.>). Wildcards targeting another extension's prefix are rejected at provisioning time. To call a specific service on another extension, declare its full subject (e.g. extensions.intelligence.mcp).
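
To make the wildcard rule concrete, the sketch below checks declared subjects against an extension prefix before they are added to PubSubCapabilities. It is a minimal illustration with stated assumptions: a subject counts as a wildcard when any token is * or >, and "within the extension's own prefix" is taken to mean extensions.<prefix> or anything below it. SubjectRules is a hypothetical helper, not part of the Symphony library, and the authoritative checks (including RBAC) happen on the server at provisioning time.

```java
import java.util.Arrays;

final class SubjectRules {
    /** True if any dot-separated token is a NATS wildcard (* or >). */
    static boolean isWildcard(String subject) {
        return Arrays.stream(subject.split("\\."))
                .anyMatch(token -> token.equals("*") || token.equals(">"));
    }

    /** True if the subject is extensions.<prefix> itself or sits below it. */
    static boolean isWithinPrefix(String subject, String prefix) {
        String root = "extensions." + prefix;
        return subject.equals(root) || subject.startsWith(root + ".");
    }

    /**
     * Wildcards are accepted only under the extension's own prefix;
     * fully specified subjects are left to the server-side RBAC check.
     */
    static boolean isDeclarable(String subject, String prefix) {
        return !isWildcard(subject) || isWithinPrefix(subject, prefix);
    }
}
```

A unit test over the subjects in CapabilitiesAdapter can use a check like this to catch a rejected wildcard before deployment.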

FeaturesAdapter

Declares the extension's UI components, routes, menus, and storage:

@Component
public class FeaturesAdapter implements Features {
private static final Logger log = LoggerFactory.getLogger(FeaturesAdapter.class);

@Override
public ExtensionFeatures getExtensionFeatures() {
// Resources
Resource home = createResource("home", "home.tsx");
Resource help = createResource("help", "help.md", MimeType.TEXT_MARKDOWN);

// Menus
MenuFeature mainMenu = new MenuFeature()
.route("/my_ext")
.icon("fa-puzzle-piece");

// UI feature
UiFeature ui = new UiFeature()
.resources(Map.of(
"home", home,
"help", help
))
.routes(Map.of(
"/my_ext", new RouteMap().uri("ui://my_ext/home"),
"/help/extensions/MyExtension",
new RouteMap().uri("ui://my_ext/help")
))
.menus(Map.of("main", Map.of("My Extension", mainMenu)));

// Storage
StorageFeature storage = new StorageFeature()
.buckets(List.of(
new ExtensionBucket()
.id("my_ext_data")
.description("Extension data"),
new ExtensionBucket()
.id("my_ext_events")
.description("Event log")
.ttl(86400)
.maxHistoryPerKey(10)
.maxBucketSize(1073741824L) // 1 GB
));

return new ExtensionFeatures().ui(ui).storage(storage);
}

private Resource createResource(String id, String file) {
return createResource(id, file, MimeType.TEXT_SYMPHONY_JSX);
}

private Resource createResource(String id, String file, MimeType mime) {
return new Resource()
.uri("ui://my_ext/" + id)
.mimeType(mime)
.text(getTemplate(file));
}

protected String getTemplate(String resourcePath) {
try (var in = Objects.requireNonNull(
getClass().getClassLoader()
.getResourceAsStream("templates/" + resourcePath))) {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
} catch (Exception ex) {
log.error("Unable to load template {}", resourcePath, ex);
return "";
}
}
}

Generated Types

The FeaturesAdapter uses types generated from the Symphony OpenAPI specification. The key types are:

| Type | Purpose |
| --- | --- |
| Resource | UI content with URI, MIME type, and text body |
| RouteMap | Maps a URL path to a resource URI, with optional nested routes |
| MenuFeature | A navigation menu item with route, icon, and sort order |
| Widget | A dashboard component with URI, name, and description |
| UiFeature | Container for resources, routes, menus, and widgets |
| StorageFeature | Declares JetStream buckets |
| ExtensionBucket | A single bucket with id, description, and optional TTL, history, and size limit |

MIME Types

The MimeType enum values correspond to the resource types documented in User Interfaces. The most common are MimeType.TEXT_SYMPHONY_JSX for React pages, MimeType.TEXT_SYMPHONY_MODULE for importable shared code, MimeType.TEXT_HTML_SYMPHONY for standard HTML with platform API access, MimeType.TEXT_HTMLHTMX for server-rendered HTMX pages (see HTML + HTMX), and MimeType.TEXT_MARKDOWN for help content.

Loading Templates

UI templates are stored as files in src/main/resources/templates/ and loaded at runtime via the classloader. This keeps UI content separate from Java code and makes it easy to edit without recompilation during development. These files are loaded as strings and registered as resources in the FeaturesAdapter.

Microservice Endpoints

Endpoints expose functionality as NATS microservices. They are built using the io.nats.service package and registered with the ExtensionRuntime.

Creating Endpoints

The endpoint class builds a NATS Service with named handlers organized into groups:

@Component
public class ExtensionEndpoints {
private static final Logger log =
LoggerFactory.getLogger(ExtensionEndpoints.class);
private final ExtensionRuntime runtime;
private final ObjectMapper mapper;
private final Service natsService;

public ExtensionEndpoints(ExtensionRuntime runtime) {
this.runtime = runtime;
this.mapper = new ObjectMapper();
mapper.configure(
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

// Build group hierarchy for NATS subject namespacing
Group group = new Group("cirata")
.appendGroup(new Group("extensions"))
.appendGroup(new Group("my_ext"));
// Produces subject prefix: cirata.extensions.my_ext

// Define endpoints with handlers
ServiceEndpoint processEp = ServiceEndpoint.builder()
.group(group)
.endpointName("process")
.handler(this::handleProcess)
.build();

ServiceEndpoint queryEp = ServiceEndpoint.builder()
.group(group)
.endpointName("query")
.handler(this::handleQuery)
.build();

// Build the NATS service
this.natsService = Service.builder()
.connection(runtime.getUserConnection())
.name("my_ext")
.version("1.0.0")
.description("My Extension Service")
.addServiceEndpoint(processEp)
.addServiceEndpoint(queryEp)
.build();

// Register with the runtime
runtime.setEndpoints(natsService);
}

public CompletableFuture<Boolean> startService() {
return this.natsService.startService();
}

// ... handler methods ...
}

Starting the Service

The NATS service must be started for it to respond to discovery queries. There are two approaches:

ApplicationRunner (recommended): Start the service after all Spring beans are initialized:

@Bean
public ApplicationRunner applicationRunner(
ExtensionEndpoints endpoints,
ExtensionRuntimeAdapter adapter) {
return args -> {
endpoints.startService();
adapter.getExtensionRuntime().waitForCompletion();
};
}

Constructor-based: Start the service immediately during bean initialization by calling startService() at the end of the constructor or from a driver component.

Warning: The startService() call is required. Without it, the NATS service will be registered with the runtime, but will not respond to $SRV.INFO and $SRV.STATS discovery queries. This means the service will be invisible in the Symphony UI, even though the extension itself is registered and visible.

Handler Methods

Handlers receive a ServiceMessage and must always respond:

private void handleProcess(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
Map<String, Object> request =
mapper.readValue(msg.getData(), Map.class);

Map<String, Object> result = Map.of(
"status", "success",
"processed", true,
"input", request
);
msg.respond(nc, mapper.writeValueAsString(result));
} catch (Exception ex) {
log.error("Process handler error", ex);
msg.respond(nc, "{}");
}
}

Key rules for handlers:

  1. Always respond. Every handler must call msg.respond(), even on error. A handler that does not respond causes the caller to time out.
  2. Return appropriate empty values. Use "{}" for single-object endpoints and "[]" for list endpoints.
  3. Configure the ObjectMapper. Set FAIL_ON_UNKNOWN_PROPERTIES = false so that unexpected fields in request payloads do not cause deserialization failures.
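
Rule 1 can be enforced structurally by routing every handler body through one wrapper that falls back to the empty value on any failure. The sketch below uses only JDK types so that it runs on its own: SafeResponder, ThrowingSupplier, and the Consumer<String> standing in for msg.respond(nc, ...) are hypothetical names, not part of the Symphony or NATS APIs.

```java
import java.util.function.Consumer;

final class SafeResponder {
    /** A supplier that may throw, so handler bodies need no try/catch of their own. */
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /**
     * Runs the handler body and always sends exactly one reply:
     * the body's result on success, or the fallback ("{}" or "[]") otherwise.
     */
    static void respond(ThrowingSupplier<String> body,
                        String fallback,
                        Consumer<String> reply) {
        String payload;
        try {
            payload = body.get();
            if (payload == null) {
                payload = fallback;
            }
        } catch (Exception ex) {
            // A real handler would log the exception here before replying.
            payload = fallback;
        }
        reply.accept(payload);
    }
}
```

Inside a handler this might be called as SafeResponder.respond(() -> mapper.writeValueAsString(getItems()), "[]", json -> msg.respond(nc, json)).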

Handler Patterns

JSON with typed DTOs:

private void handleTypedRequest(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
MyRequest request =
mapper.readValue(msg.getData(), MyRequest.class);
MyResponse response = processRequest(request);
msg.respond(nc, mapper.writeValueAsString(response));
} catch (IOException ex) {
log.error("Unable to parse request", ex);
msg.respond(nc, "{}");
}
}

Returning a list:

private void handleListItems(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
List<Map<String, Object>> items = getItems();
msg.respond(nc, mapper.writeValueAsString(items));
} catch (Exception ex) {
log.error("List handler error", ex);
msg.respond(nc, "[]");
}
}

Handler with key-value lookup:

private void handleGetItem(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
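Map<String, Object> request =
mapper.readValue(msg.getData(), Map.class);
Object itemId = request.get("id");
if (itemId == null) {
// No key supplied: reply with the empty object instead of calling get(null)
msg.respond(nc, "{}");
return;
}

KeyValue bucket = nc.keyValue("my_ext_data");
KeyValueEntry entry = bucket.get(itemId.toString());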

if (entry != null) {
msg.respond(nc, entry.getValueAsString());
} else {
msg.respond(nc, "{}");
}
} catch (Exception ex) {
log.error("Get item error", ex);
msg.respond(nc, "{}");
}
}

Outbound NATS Requests and Timeouts

Handlers can call other services using the NATS connection. The NATS Java client provides two families of request methods with different timeout behaviour:

Blocking (nc.request)—waits for a reply and returns a Message. A Duration timeout is required; there is no blocking overload without one. Returns null if no reply arrives within the timeout:

private void handleWithLookup(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
// Make an outbound request to another extension's service
Message reply = nc.request(
"cirata.extensions.catalog.lookup",
msg.getData(),
Duration.ofSeconds(5)
);

if (reply == null) {
// No response received within the timeout window
log.warn("Catalog lookup timed out");
msg.respond(nc, "{}");
return;
}

msg.respond(nc, reply.getData());

} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error("Catalog lookup interrupted", ex);
msg.respond(nc, "{}");
} catch (Exception ex) {
log.error("Catalog lookup error", ex);
msg.respond(nc, "{}");
}
}

Use Duration.ofSeconds, Duration.ofMillis, or any Duration factory method to express the limit.

Async (nc.request / nc.requestWithTimeout)—returns a CompletableFuture<Message> immediately without blocking the handler thread. The no-timeout overload nc.request(subject, data) returns a future that is never automatically cancelled; use nc.requestWithTimeout(subject, data, duration) to bound it:

// Bounded async request — future completes or is cancelled after 5 s
CompletableFuture<Message> future = nc.requestWithTimeout(
"cirata.extensions.catalog.lookup",
msg.getData(),
Duration.ofSeconds(5)
);
future.thenAccept(reply -> {
if (reply != null) { /* process reply */ }
}).exceptionally(ex -> { log.error("Lookup failed", ex); return null; });

Choosing a timeout value:

| Scenario | Suggested timeout |
| --- | --- |
| KV read or single-hop NATS round-trip | 2–3 s |
| Extension-to-extension service call | 5 s |
| Long-running computation | 30–60 s |
| No reply expected | use nc.publish() instead |

Warning: Always handle the null return and InterruptedException from blocking requests. For async requests without a timeout (nc.request(subject, data)), the returned future will never complete if no reply arrives; prefer nc.requestWithTimeout when a deadline is needed.
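
If code ends up holding a future with no deadline, the JDK's CompletableFuture.orTimeout (Java 9 and later) can bound it after the fact. The sketch below uses a plain CompletableFuture that never completes as a stand-in for an unanswered request; BoundedWait and awaitOrFallback are hypothetical helper names, and the NATS client is not involved.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class BoundedWait {
    /**
     * Waits for the future up to the given limit and returns the fallback
     * if it times out or fails, so the caller can still send a reply.
     * Note that orTimeout completes the supplied future exceptionally on expiry.
     */
    static <T> T awaitOrFallback(CompletableFuture<T> future,
                                 Duration limit,
                                 T fallback) {
        return future
                .orTimeout(limit.toMillis(), TimeUnit.MILLISECONDS)
                .exceptionally(ex -> fallback)
                .join();
    }
}
```

Because join() blocks the calling thread, this form suits code that must reply synchronously; fully asynchronous handlers can keep the orTimeout and exceptionally stages and drop the join.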

Usage Reporting

Extensions that consume licensed resources should report their usage to Symphony for licensing and chargeback. The reportUsage method on ExtensionRuntime sends dimension measurements to Symphony, which converts them into license units using the rates embedded in the active license.

Reporting Usage

Map<String, Double> dimensions = Map.of(
"tables_replicated", 5.0,
"bytes_transferred_gb", 120.5
);

Map<String, Object> response = runtime.reportUsage(dimensions);

The dimensions map contains named measurements specific to your extension. The keys must match the dimension names defined in the license's unit rates for your extension prefix. If any dimension key is not covered by a license rate, Symphony will add your extension to the disabled_extensions list (per-extension enforcement).

Response

The response map contains:

| Key | Type | Description |
| --- | --- | --- |
| result | String | "ok" on success |
| units | Double | The computed license units consumed by this report |
| enforcement | Map | Current enforcement state object |

Checking Enforcement State

The enforcement state in the response tells your extension whether it should continue operating:

Map<String, Object> response = runtime.reportUsage(dimensions);

@SuppressWarnings("unchecked")
Map<String, Object> enforcement =
(Map<String, Object>) response.get("enforcement");
String status = enforcement != null
? (String) enforcement.get("status")
: "enforcement_ok";

if ("enforcement_enforced".equals(status)) {
log.warn("License enforcement active — pausing operations");
// Stop or reduce licensed operations
}

Subscribing to Enforcement State

In addition to checking the enforcement state from each reportUsage response, your extension can subscribe to the enforcement broadcast subject to receive updates in real time—including changes triggered by other extensions, license uploads, or administrative actions:

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import com.fasterxml.jackson.databind.ObjectMapper;

Connection nc = runtime.getUserConnection();
ObjectMapper mapper = new ObjectMapper();
Dispatcher dispatcher = nc.createDispatcher(msg -> {
try {
@SuppressWarnings("unchecked")
Map<String, Object> state = mapper.readValue(msg.getData(), Map.class);
String status = (String) state.getOrDefault("status", "enforcement_ok");
String message = (String) state.getOrDefault("message", "");

if ("enforcement_enforced".equals(status)) {
log.warn("Enforcement active: {} — pausing operations", message);
// Stop or reduce licensed operations
} else if ("enforcement_warning".equals(status)) {
log.info("Enforcement warning: approaching license limit");
}
} catch (Exception e) {
log.error("Failed to parse enforcement broadcast", e);
}
});
dispatcher.subscribe("cirata.symphony.enforcement");

Enforcement States

| Status | Meaning |
| --- | --- |
| enforcement_ok | Normal operation; units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within vendor-set grace period |
| enforcement_enforced | Grace period expired or no valid license; stop licensed operations |

Extensions may also be individually disabled via the disabled_extensions field, or flagged as grace-only via the grace_extensions field, even when the global status is enforcement_ok. Check both lists for your extension prefix to determine your operational state.
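
A minimal sketch of that combined check follows. It assumes the enforcement object carries disabled_extensions and grace_extensions as collections of extension prefixes (the element type is an assumption; the text above only names the fields). EnforcementCheck and its State enum are hypothetical and not part of the SDK.

```java
import java.util.Collection;
import java.util.Map;

final class EnforcementCheck {
    enum State { OK, WARNING, GRACE, STOP }

    /** Combines the global status with the per-extension lists for one prefix. */
    static State stateFor(Map<String, Object> enforcement, String prefix) {
        if (enforcement == null) {
            return State.OK; // same default as the reportUsage example
        }
        if (listed(enforcement.get("disabled_extensions"), prefix)) {
            return State.STOP;
        }
        Object status = enforcement.get("status");
        if ("enforcement_enforced".equals(status)) {
            return State.STOP;
        }
        if ("enforcement_grace".equals(status)
                || listed(enforcement.get("grace_extensions"), prefix)) {
            return State.GRACE;
        }
        if ("enforcement_warning".equals(status)) {
            return State.WARNING;
        }
        return State.OK;
    }

    // Assumption: each list deserializes to a Collection of prefix strings.
    private static boolean listed(Object value, String prefix) {
        return value instanceof Collection<?> c && c.contains(prefix);
    }
}
```

The same method can serve both the reportUsage response and the broadcast payload, provided both deserialize to the same map shape.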

Attribution

When an extension reports usage, Symphony checks whether any attribution rules match the extension prefix and reporting account. If a rule matches, the usage record is automatically assigned to a business unit with attribution method auto. Administrators can also manually attribute records or override automatic attributions via the Attribution UI.

When to Report

Report usage at meaningful checkpoints—for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation if the volume is very high; instead, batch measurements and report periodically.

// Example: report after each replication batch
public void onBatchComplete(List<TableResult> results) throws Exception {
double totalBytes = results.stream()
.mapToDouble(TableResult::getBytes)
.sum();

Map<String, Object> response = runtime.reportUsage(Map.of(
"tables_replicated", (double) results.size(),
"bytes_transferred_gb", totalBytes / 1e9
));

// Check enforcement state after reporting
@SuppressWarnings("unchecked")
Map<String, Object> enforcement =
(Map<String, Object>) response.get("enforcement");
if (enforcement != null
&& "enforcement_enforced".equals(enforcement.get("status"))) {
log.warn("License limit reached — stopping replication");
return;
}
}
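
For high-volume operations, one way to batch measurements is to accumulate them in memory and drain a snapshot on a timer or at a checkpoint. The sketch below is an assumption about how an extension might structure this, not an SDK feature: UsageAccumulator is a hypothetical class, and the drained map has the Map<String, Double> shape that reportUsage accepts in the examples above.

```java
import java.util.HashMap;
import java.util.Map;

final class UsageAccumulator {
    private final Map<String, Double> totals = new HashMap<>();

    /** Adds a measurement to the running total for a dimension. */
    synchronized void add(String dimension, double amount) {
        totals.merge(dimension, amount, Double::sum);
    }

    /** Returns the accumulated totals and resets them, ready for one report. */
    synchronized Map<String, Double> drain() {
        Map<String, Double> snapshot = Map.copyOf(totals);
        totals.clear();
        return snapshot;
    }
}
```

A scheduled task could call drain() periodically and pass any non-empty result to runtime.reportUsage(...); if the report fails, the snapshot can be fed back through add() so the measurements are not lost.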

See Usage Tracking for how administrators view and manage reported usage.

OpenAPI and REST APIs

Extensions can expose their services as REST APIs through the Symphony server by including an OpenAPI specification in the service metadata.

Adding OpenAPI Metadata

Load the OpenAPI specification from the classpath and include it in the service builder:

String openApiSpec;
try {
var resource = resourceLoader.getResource(
"classpath:openapi/my-ext-openapi.json");
openApiSpec = new String(
resource.getInputStream().readAllBytes(),
StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("Failed to load OpenAPI spec", e);
}

Service service = Service.builder()
.connection(runtime.getUserConnection())
.name("my_ext")
.version("1.0.0")
.metadata(Map.of("openapi", openApiSpec))
.addServiceEndpoint(endpoint)
.build();

When the openapi metadata key is present, Symphony routes incoming HTTP requests to the extension's NATS endpoints based on the path definitions in the specification. Other extensions can also use the OpenAPI metadata to discover and describe the extension's capabilities, as the Cirata Intelligence extension does when exposing services as MCP tools.

Store the OpenAPI specification as a JSON file in src/main/resources/openapi/.

REST API Response Format

When endpoints are exposed as REST APIs, handlers can respond in two ways:

Raw response (simplest)—respond with the content directly. Symphony auto-detects the content type: valid JSON is served as application/json, anything else as text/plain.

// JSON response
private void handleListItems(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
List<Item> items = getItems();
msg.respond(nc, mapper.writeValueAsString(items));
} catch (Exception ex) {
log.error("Error listing items", ex);
msg.respond(nc, "[]");
}
}

// Plain text response
private void handlePing(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
msg.respond(nc, "pong".getBytes(StandardCharsets.UTF_8));
}

Wrapped response (full control)—for control over the HTTP status code, content type, or response headers, wrap the response in a JSON object with a body field:

private void handleCreateItem(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
Map<String, Object> request =
mapper.readValue(msg.getData(), Map.class);
Item item = createItem(request);

Map<String, Object> response = Map.of(
"body", Map.of("id", item.getId(), "status", "created"),
"statusCode", 201,
"contentType", "application/json"
);
msg.respond(nc, mapper.writeValueAsString(response));
} catch (Exception ex) {
log.error("Error creating item", ex);
Map<String, Object> error = Map.of(
"body", Map.of("error", ex.getMessage()),
"statusCode", 500
);
msg.respond(nc, mapper.writeValueAsString(error));
}
}

| Field | Required | Description |
| --- | --- | --- |
| body | Yes | The response payload. For JSON content types, any JSON value. For other types (e.g. text/plain), a string. |
| statusCode | No | HTTP status code (100–599). Defaults to 200. |
| contentType | No | MIME type. Defaults to application/json. |
| headers | No | Additional HTTP headers as a string-to-string object. |

Both camelCase (statusCode, contentType) and lowercase (statuscode, contenttype) field names are accepted.

Symphony detects the format by checking whether the response is a JSON object with a body key. Both formats work identically whether the endpoint is called through the REST API or directly over NATS.
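
The envelope can be assembled with a small helper that validates the status code before serialization. This is a sketch based on the field rules in the table above: WrappedResponse is a hypothetical helper, and the resulting map still has to be serialized (for example with the ObjectMapper used elsewhere on this page) before it is passed to msg.respond.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WrappedResponse {
    /** Builds the envelope using the camelCase field names. */
    static Map<String, Object> of(Object body, int statusCode,
                                  String contentType,
                                  Map<String, String> headers) {
        if (statusCode < 100 || statusCode > 599) {
            throw new IllegalArgumentException("statusCode must be 100-599");
        }
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("body", body);             // required key that marks the wrapped format
        envelope.put("statusCode", statusCode);
        if (contentType != null) {
            envelope.put("contentType", contentType); // omitted: application/json
        }
        if (headers != null && !headers.isEmpty()) {
            envelope.put("headers", Map.copyOf(headers));
        }
        return envelope;
    }
}
```

Centralizing the envelope keeps error paths consistent, since success and failure replies then differ only in body and statusCode.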

Storage

Java extensions use the NATS JetStream key-value API for persistent storage. Buckets declared via StorageFeature are created by the platform at the cluster's current replica factor (R=3 on 3–4 node clusters, R=5 on 5+ nodes, R=1 standalone). Declared buckets inherit the platform's durability guarantees with no additional configuration.

Extensions that create buckets at runtime instead of via StorageFeature should request the current replica count from Symphony and set it on the KeyValueConfiguration explicitly — jnats defaults to R=1 otherwise, and the bucket will not be replicated:

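Message reply = nc.request("cirata.services.cluster.info",
new byte[0], Duration.ofSeconds(2));
if (reply == null) {
// nc.request returns null when no reply arrives within the timeout
throw new IllegalStateException(
"No reply from cirata.services.cluster.info within 2 s");
}
int replicas = new ObjectMapper()
.readTree(reply.getData())
.get("replicas").asInt();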

nc.keyValueManagement().create(KeyValueConfiguration.builder()
.name("runtime_data")
.replicas(replicas)
.build());

Accessing Buckets

Connection nc = runtime.getUserConnection();
KeyValue kv = nc.keyValue("my_ext_data");

Basic Operations

// Store a value
kv.put("item:123", "{\"name\": \"Widget\"}".getBytes(
StandardCharsets.UTF_8));

// Retrieve a value
KeyValueEntry entry = kv.get("item:123");
if (entry != null) {
String value = entry.getValueAsString();
}

// Delete a key
kv.delete("item:123");

// List all keys
List<String> keys = kv.keys();

Batch Write

When writing many keys at once, use KeyValueBatch to publish all entries concurrently instead of calling put() in a loop. Each individual put() is a synchronous round-trip; the batch API fires all publishes via the async JetStream API and collects acks in parallel.

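import com.cirata.symphony.KeyValueBatch;
import com.cirata.symphony.KeyValueBatch.BatchEntry;
import com.cirata.symphony.KeyValueBatch.BatchPutResult;
import java.util.ArrayList;
import java.util.List;

// writeValueAsBytes throws a checked exception, so build the list in a loop
// rather than inside a stream lambda.
List<BatchEntry> entries = new ArrayList<>();
for (var item : items) {
entries.add(new BatchEntry(item.id(), mapper.writeValueAsBytes(item)));
}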

Connection nc = runtime.getUserConnection();
BatchPutResult result = KeyValueBatch.putAll(nc, "my_ext_data", entries);

log.info("Batch write: {} succeeded, {} failed out of {}",
result.succeeded(), result.failed(), result.total());

for (Map.Entry<String, Exception> error : result.errors().entrySet()) {
log.error("Failed key '{}': {}", error.getKey(), error.getValue().getMessage());
}

The batch API never fails fast—a single bad key does not prevent the remaining entries from being written. BatchPutResult.errors() maps failed key names to their exceptions. Values written this way are fully readable via standard KeyValue.get().
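
The fire-all-then-collect behaviour can be illustrated with plain CompletableFuture. This is not the KeyValueBatch implementation: ConcurrentBatch, its Result record, and the asyncPut function (a stand-in for one asynchronous publish) are hypothetical, and the sketch only shows how per-key failures can be gathered without aborting the other writes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;

final class ConcurrentBatch {
    record Result(int succeeded, Map<String, Throwable> errors) {}

    /** Starts every write first, then waits for each one and records failures by key. */
    static Result putAll(Map<String, byte[]> entries,
                         BiFunction<String, byte[], CompletableFuture<Void>> asyncPut) {
        Map<String, CompletableFuture<Void>> pending = new LinkedHashMap<>();
        entries.forEach((key, value) -> {
            try {
                pending.put(key, asyncPut.apply(key, value));
            } catch (RuntimeException ex) {
                // A synchronous failure (e.g. an invalid key) is recorded like any other.
                pending.put(key, CompletableFuture.failedFuture(ex));
            }
        });

        Map<String, Throwable> errors = new LinkedHashMap<>();
        int succeeded = 0;
        for (Map.Entry<String, CompletableFuture<Void>> p : pending.entrySet()) {
            try {
                p.getValue().join();
                succeeded++;
            } catch (RuntimeException ex) {
                // join() wraps failures in CompletionException; keep the underlying cause.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause() : ex;
                errors.put(p.getKey(), cause);
            }
        }
        return new Result(succeeded, errors);
    }
}
```

Because all writes are started before any is awaited, total latency is close to that of the slowest single write rather than the sum of all round-trips.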

BucketProvider Pattern

For extensions with multiple buckets, use a provider to manage bucket access:

public class BucketProvider {
public enum Bucket {
ITEMS("my_ext_items"),
CONFIG("my_ext_config");

private final String bucketName;
Bucket(String name) { this.bucketName = name; }
public String getBucketName() { return bucketName; }
}

private final Connection nc;
private final Map<Bucket, KeyValue> buckets =
new ConcurrentHashMap<>();

public BucketProvider(ExtensionRuntime runtime) {
this.nc = runtime.getUserConnection();
}

// Returns a cached handle to an existing bucket. Lookup failures surface as an
// unchecked RuntimeException, so no checked exceptions are declared.
public KeyValue getOrCreateBucket(Bucket bucket) {
return buckets.computeIfAbsent(bucket, b -> {
try {
return nc.keyValue(b.getBucketName());
} catch (Exception ex) {
throw new RuntimeException(
"Unable to access bucket: " + b.getBucketName(), ex);
}
});
}
}

Configuration

Spring Boot Configuration

Use @ConfigurationProperties to bind the extension token from application properties:

@Configuration
@ConfigurationProperties(prefix = "myext")
public class MyConfiguration {
private String token;

public String getToken() { return token; }
public void setToken(String token) { this.token = token; }
}

Set the token in application.properties:

myext.token=${SYMPHONY_TOKEN:}

This reads the SYMPHONY_TOKEN environment variable at startup. The trailing : provides an empty default, allowing the extension to fall back to the OS credential store or REGISTRATION_TOKEN if the variable is not set.

Main Application Class

The main class bootstraps Spring Boot and runs the extension until shutdown:

@SpringBootApplication
public class MyExtension {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(MyExtension.class);
app.setBannerMode(Banner.Mode.OFF);
app.run(args);
}

@Bean
public ApplicationRunner applicationRunner(
ExtensionEndpoints endpoints,
ExtensionRuntimeAdapter adapter) {
return args -> {
endpoints.startService();
adapter.getExtensionRuntime().waitForCompletion();
};
}
}

The waitForCompletion() call blocks the main thread until a shutdown signal (SIGINT or SIGHUP) is received. The runtime handles disconnection and status cleanup automatically.

Plain Java Alternative

For simple extensions that do not need a dependency injection framework, the main class can implement the Features interface directly:

public class SimpleExtension implements Features {
private final ExtensionRuntime runtime;

public SimpleExtension() throws ApiKeyUnauthorizedException {
this.runtime = ExtensionRuntime.builder()
.name("Simple Extension")
.features(this)
.token(System.getenv("SYMPHONY_TOKEN"))
.prefix("simple")
.build();
}

public static void main(String[] args) throws ApiKeyUnauthorizedException {
SimpleExtension ext = new SimpleExtension();
ext.runtime.start();
ext.runtime.waitForCompletion();
}

@Override
public ExtensionFeatures getExtensionFeatures() {
Resource home = new Resource()
.uri("ui://simple/home")
.mimeType(MimeType.TEXT_SYMPHONY_JSX)
.text("<h1>Simple Extension</h1>");

UiFeature ui = new UiFeature()
.resources(Map.of("home", home))
.routes(Map.of("/simple",
new RouteMap().uri("ui://simple/home")));

return new ExtensionFeatures().ui(ui);
}
}

This approach is suitable for lightweight extensions that do not require the full Spring Boot infrastructure.

Dynamic Resource Updates

Extensions can update their UI resources at runtime by modifying the features returned by getExtensionFeatures() and calling updateExtensionInfo():

// After modifying the features that getExtensionFeatures() returns
runtime.updateExtensionInfo();

This re-publishes the extension's metadata, causing Symphony to pick up the updated resources immediately.

Help Pages

Extension help pages use MimeType.TEXT_MARKDOWN and are routed under /help/extensions/<ExtensionName>:

Resource help = new Resource()
.uri("ui://my_ext/help")
.mimeType(MimeType.TEXT_MARKDOWN)
.text(getTemplate("help.md"));

// In the routes map:
"/help/extensions/MyExtension", new RouteMap().uri("ui://my_ext/help")

See Markdown Resources for details on frontmatter, template variables, and conventions.

Logging

The Java SDK ships a default Logback configuration (logback-symphony.xml) that outputs structured JSON to stdout via Logback's built-in JsonEncoder. If your extension uses SLF4J with Logback on the classpath, this is picked up automatically — no setup code is required.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger log = LoggerFactory.getLogger(MyExtension.class);

log.info("Extension started");
log.warn("Cache miss rate is high");
log.error("Query failed", exception); // stack trace included

Each log line is a JSON object with consistent fields (timestamp, level, message) matching the other Symphony SDKs. Do not write log files — stdout is captured automatically by journald (systemd) or the container runtime (Docker/Kubernetes).

To override the default configuration, place your own logback.xml on the classpath ahead of the SDK's resource.

Observability

Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.

Observability obs = Observability.enable(runtime, "myext");
// SLF4J/Logback logging is now captured automatically (INFO+)

// Instrument handlers with automatic trace propagation
runtime.addEndpoint("process",
obs.instrumentedHandler("process", msg -> {
obs.incrementCounter("requests.total", 1);
msg.respond(runtime.getUserConnection(), process(msg.getData()));
})
);

// Standard SLF4J logging — captured automatically when Logback is present
private static final Logger log = LoggerFactory.getLogger(MyExtension.class);
log.info("Extension started");
log.error("Query failed", exception); // stack trace included

// Metrics
obs.setGauge("queue.depth", 42.0);

When Logback is on the classpath, all SLF4J log output at INFO level or higher is captured automatically—including formatted messages, logger names, thread names, and full stack traces. Logs emitted inside an active span are automatically correlated via MDC. No manual handler attachment is needed (unlike Python and Go). If Logback is not present, use the direct API: obs.getLogCapture().info("message").

For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.

Multi-Instance Support

Java extensions can run as multiple instances of the same extension type for scalability, fault tolerance, and workload distribution. The SDK provides built-in support for work partitioning, queue-based command delivery, and instance-specific messaging.

Automatic Load Balancing

Microservice endpoints built with Service.builder() are automatically load-balanced across all instances by NATS micro. No code changes are needed—deploying additional instances immediately distributes incoming service requests across them.

The SDK also uses a queue subscription on the commands channel, so each command is delivered to exactly one instance.

WorkPartitioner

For extensions that run background tasks (watchers, monitors, replicators), the WorkPartitioner distributes work items across instances using consistent hashing.

WorkPartitioner wp = new WorkPartitioner(runtime);
wp.onRebalance((totalInstances, myItemCount) -> {
log.info("Rebalanced: {} instances active, I own {} items",
totalInstances, myItemCount);
});
wp.start();

Use isMyWork to check whether the current instance should process a given work item:

// In a bucket watcher loop
for (String key : discoveredItems) {
if (wp.isMyWork(key)) {
processItem(key);
} else {
log.debug("Skipping {} — owned by another instance", key);
}
}

The partitioner watches the status bucket for instance join and leave events. When the set of active instances changes, the rebalance callback fires and isMyWork results may change. Extensions should re-evaluate their active work items on each watcher iteration rather than caching ownership decisions.
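
To make the ownership rule concrete, here is a minimal, self-contained sketch of consistent hashing. It is not the SDK's WorkPartitioner implementation: the class name HashRingSketch, the CRC32 hash, and the virtual-node count are assumptions chosen for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Illustrative hash ring (hypothetical); NOT the SDK's WorkPartitioner. */
final class HashRingSketch {
    // Several ring points per instance smooth out the distribution of keys.
    private static final int VIRTUAL_NODES = 64;

    // Ring position -> instance ID that owns that position.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(Collection<String> instanceIds) {
        for (String id : instanceIds) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(id + "#" + i), id);
            }
        }
    }

    /** Owner is the first ring point at or after the key's hash, wrapping around. */
    String ownerOf(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no active instances");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Mirrors the shape of an isMyWork check for a given instance. */
    boolean isMyWork(String myInstanceId, String key) {
        return myInstanceId.equals(ownerOf(key));
    }

    private static long hash(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

In this sketch, ownership depends only on the key and the current set of instances. Removing an instance reassigns only the keys that instance owned, and any key's owner can change whenever membership changes, which is why ownership should be re-checked on each iteration rather than cached.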

Integration with Spring Watchers

In Spring Boot extensions such as Ice Flow, background watchers typically run as scheduled tasks or event-driven loops. Integrate the WorkPartitioner by checking ownership before processing each item:

@Component
public class ReplicationWatcher {
private final WorkPartitioner partitioner;
private final KeyValue configBucket;

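// Connection.keyValue(...) declares IOException, so the constructor propagates it
public ReplicationWatcher(ExtensionRuntime runtime,
WorkPartitioner partitioner) throws IOException {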
this.partitioner = partitioner;
this.configBucket = runtime.getUserConnection()
.keyValue("iceflow_replications");
}

@Scheduled(fixedDelay = 30000)
public void checkReplications() throws Exception {
for (String key : configBucket.keys()) {
if (partitioner.isMyWork(key)) {
runReplication(key);
}
}
}
}

Instance ID

The extension's instance ID is derived from the JWT identifier in the API token. Access it with:

String instanceId = runtime.getInstanceId();

Use the instance ID to tag operational records in shared storage for debugging and traceability.
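
When debugging which token an instance is running with, it can help to inspect the token's identifier directly. The sketch below assumes the API token is a standard three-segment JWT and that "JWT identifier" refers to the jti claim defined in RFC 7519. The class JwtIdSketch is hypothetical, and the SDK may derive the instance ID from this value differently. The regex lookup keeps the example dependency-free; a real JSON parser such as Jackson is the better choice in production code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reads the jti claim from a JWT without verifying it. */
final class JwtIdSketch {
    // Matches a string-valued "jti" member in the decoded payload JSON.
    private static final Pattern JTI =
            Pattern.compile("\"jti\"\\s*:\\s*\"([^\"]*)\"");

    static Optional<String> readJti(String token) {
        // A JWT is header.payload.signature; keep trailing empty segments.
        String[] parts = token.split("\\.", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        String payload;
        try {
            // JWT segments use unpadded URL-safe Base64.
            payload = new String(
                    Base64.getUrlDecoder().decode(parts[1]),
                    StandardCharsets.UTF_8);
        } catch (IllegalArgumentException ex) {
            return Optional.empty(); // payload was not valid Base64
        }
        Matcher matcher = JTI.matcher(payload);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

This reads the claim only for inspection and does not validate the signature, so it should never be used for authorization decisions.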

For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.

Error Handling

Runtime Errors

The ExtensionRuntime includes an UncaughtExceptionHandler that logs fatal errors and halts the JVM. This fail-fast behavior ensures that extensions do not continue operating in an inconsistent state.
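
The same fail-fast pattern can be applied to threads that an extension creates itself. The following is a sketch using only the standard library, not the SDK's internal handler. The class FailFastSketch and the injectable exit action are assumptions made so the behavior can be exercised in a test without stopping the JVM.

```java
import java.util.function.IntConsumer;

/** Hypothetical fail-fast handler factory; not part of the Symphony SDK. */
final class FailFastSketch {

    /** Builds a handler that reports the failure, then runs the exit action with code 1. */
    static Thread.UncaughtExceptionHandler failFast(IntConsumer exitAction) {
        return (thread, error) -> {
            System.err.println(
                    "Fatal error on thread " + thread.getName() + ": " + error);
            exitAction.accept(1);
        };
    }

    public static void main(String[] args) {
        // In production the exit action halts the JVM immediately,
        // skipping shutdown hooks that might run in an inconsistent state.
        Thread.setDefaultUncaughtExceptionHandler(
                failFast(code -> Runtime.getRuntime().halt(code)));
        // ... build and start the extension here ...
    }
}
```

Passing the exit action as a parameter is a design choice of this sketch: tests can substitute a recorder, while production code passes Runtime.getRuntime().halt.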

Handler Error Checklist

  1. Always call msg.respond(), even in catch blocks
  2. Return "{}" for object endpoints and "[]" for list endpoints
  3. Log errors with the exception object for stack traces
  4. Set FAIL_ON_UNKNOWN_PROPERTIES = false on the ObjectMapper
  5. Handle IOException, JetStreamApiException, and InterruptedException separately for clear error messages
  6. Check runtime.isShuttingDown() before expensive operations

private void handleRequest(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
if (runtime.isShuttingDown()) {
msg.respond(nc, "{}");
return;
}
// ... normal processing ...
} catch (IOException | JetStreamApiException ex) {
log.error("Unable to process request", ex);
msg.respond(nc, "{}");
} catch (InterruptedException ex) {
log.error("Handler interrupted", ex);
Thread.currentThread().interrupt();
msg.respond(nc, "{}");
}
}
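
One way to make the first two checklist items hard to forget is to route every handler through a small wrapper that sends exactly one reply, even for exceptions the handler did not anticipate. This is a sketch, not an SDK facility: SafeHandlerSketch, its Body interface, and the respond callback are hypothetical stand-ins for the handler logic and for msg.respond(nc, ...).

```java
import java.util.function.Consumer;

/** Hypothetical wrapper that guarantees a single response per request. */
final class SafeHandlerSketch {

    /** Stand-in for handler logic that may throw checked exceptions. */
    @FunctionalInterface
    interface Body {
        String handle(byte[] request) throws Exception;
    }

    /**
     * Runs the body and always sends one reply: the body's result on
     * success, or the fallback ("{}" or "[]") on any failure.
     */
    static void handle(byte[] request, Body body, String fallback,
                       Consumer<String> respond) {
        String reply = fallback;
        try {
            reply = body.handle(request);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            System.err.println("Handler interrupted: " + ex);
        } catch (Exception ex) {
            System.err.println("Unable to process request: " + ex);
        } finally {
            // Runs on every path, so the caller never waits for a timeout.
            respond.accept(reply);
        }
    }
}
```

Inside an endpoint, a call might look like SafeHandlerSketch.handle(msg.getData(), this::process, "{}", reply -> msg.respond(nc, reply)), where process is your own method. A real handler would log through SLF4J rather than System.err.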