Java Extension Development
Cirata Symphony provides a Java library for developing extensions. Java extensions are well suited to enterprise workloads, complex data pipelines, and environments where the Java ecosystem is already established. The recommended approach uses Spring Boot with dependency injection, though a plain Java alternative is also supported.
Getting Started
Installation
The Symphony Java library is distributed as a JAR file. Download it from the Languages and Libraries page or obtain it from the Symphony instance. Add it as a dependency in your build system:
Maven:
<dependency>
<groupId>com.cirata</groupId>
<artifactId>cirata-symphony</artifactId>
<version>${symphony.version}</version>
<scope>system</scope>
<systemPath>${project.basedir}/libs/cirata-symphony.jar</systemPath>
</dependency>
The library requires the NATS Java client and Jackson for JSON serialization as runtime dependencies.
Project Structure (Spring Boot)
The recommended project structure separates concerns into adapters, endpoints, and a main application class:
my-extension/
├── src/main/java/com/example/myext/
│ ├── MyExtension.java # @SpringBootApplication main class
│ ├── MyConfiguration.java # @ConfigurationProperties for token
│ ├── adapters/
│ │ ├── ExtensionRuntimeAdapter.java # Builds and starts ExtensionRuntime
│ │ ├── CapabilitiesAdapter.java # Implements Capabilities interface
│ │ └── FeaturesAdapter.java # Implements Features interface
│ └── endpoints/
│ └── ExtensionEndpoints.java # NATS microservice handlers
├── src/main/resources/
│ ├── application.properties # Spring config + token
│ └── templates/ # UI templates (.tsx, .md files)
│ ├── home.tsx
│ ├── dashboard.tsx
│ └── help.md
└── pom.xml
The Adapter Pattern
Java extensions use three adapters to separate the runtime
configuration from the business logic. Each adapter is a Spring
@Component that the framework wires together automatically.
ExtensionRuntimeAdapter
This adapter builds and starts the ExtensionRuntime, which manages
the connection to Symphony:
@Component
public class ExtensionRuntimeAdapter {
private final ExtensionRuntime extensionRuntime;
public ExtensionRuntimeAdapter(Capabilities capabilities,
Features features,
MyConfiguration config)
throws ApiKeyUnauthorizedException {
this.extensionRuntime = ExtensionRuntime.builder()
.name("My Extension")
.description("Provides data processing capabilities")
.capabilities(capabilities)
.features(features)
.token(config.getToken())
.prefix("my_ext")
.build();
this.extensionRuntime.start();
}
@Bean
public ExtensionRuntime getExtensionRuntime() {
return this.extensionRuntime;
}
}
The builder accepts the following parameters:
| Parameter | Required | Description |
|---|---|---|
| name | Yes | Human-readable name displayed in the Symphony UI |
| description | No | Description of the extension's purpose |
| capabilities | No | Capabilities implementation declaring pub/sub subjects |
| features | No | Features implementation declaring UI resources and storage |
| token | No | API token (falls back to SYMPHONY_TOKEN env var) |
| prefix | No | Unique prefix for NATS subjects |
Calling start() connects to Symphony and begins the status
publishing loop. The token is resolved in the following order:
- The token parameter in the builder
- The OS credential store (Keychain on macOS, Credential Manager on Windows, Secret Service on Linux), populated by a previous provisioning run
- The SYMPHONY_TOKEN environment variable
- The REGISTRATION_TOKEN environment variable (one-time provisioning)
When REGISTRATION_TOKEN is set, the extension connects, automatically
provisions a permanent API key with the extension's declared
capabilities, and stores it in the OS credential store (or
~/.config/cirata/ as a fallback). Subsequent runs use the stored
token automatically with no interaction required.
If SYMPHONY_TOKEN contains a registration token (detected by its JWT
capabilities), it is treated as a registration token and provisioned
in the same way.
If no token is available from any source, the extension logs an error with instructions and exits. Extensions never prompt for input on stdin.
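The lookup order described above can be sketched in plain Java. This is an illustration only, not the SDK's implementation: TokenResolver, Source, and resolve are hypothetical names, and the credential store lookup is passed in as a value rather than read from the OS. Treating a blank value as "not set" is an assumption.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of the documented lookup order; not SDK code. */
final class TokenResolver {

    /** Token sources, listed in the documented priority order. */
    enum Source { BUILDER, CREDENTIAL_STORE, SYMPHONY_TOKEN, REGISTRATION_TOKEN }

    record Resolved(Source source, String token) {}

    /**
     * Returns the first non-blank token, checking sources in priority order.
     * An empty result corresponds to the "log an error and exit" case.
     */
    static Optional<Resolved> resolve(String builderToken,
                                      String storedToken,
                                      Map<String, String> env) {
        if (present(builderToken)) {
            return Optional.of(new Resolved(Source.BUILDER, builderToken));
        }
        if (present(storedToken)) {
            return Optional.of(new Resolved(Source.CREDENTIAL_STORE, storedToken));
        }
        String symphony = env.get("SYMPHONY_TOKEN");
        if (present(symphony)) {
            return Optional.of(new Resolved(Source.SYMPHONY_TOKEN, symphony));
        }
        String registration = env.get("REGISTRATION_TOKEN");
        if (present(registration)) {
            return Optional.of(new Resolved(Source.REGISTRATION_TOKEN, registration));
        }
        return Optional.empty();
    }

    // Assumption: a null or blank value counts as "not set"
    private static boolean present(String value) {
        return value != null && !value.isBlank();
    }
}
```

Calling TokenResolver.resolve(null, null, System.getenv()) mirrors a first run with no builder token and an empty credential store.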
Container and Kubernetes deployments
In Docker containers and Kubernetes pods the OS credential store is not available (no keychain daemon). The SDK detects this automatically and falls back to file-based storage, but the file is ephemeral and lost when the container restarts. To avoid re-provisioning on every restart:
- Recommended: inject a pre-provisioned token via the SYMPHONY_TOKEN environment variable (e.g. from a Kubernetes Secret). No file is written and no credential store is needed.
- Alternative: mount ~/.config/cirata/ on a persistent volume so an auto-provisioned token survives pod restarts.
Overriding the token-embedded address
When the hostname encoded in the token is not the right one to reach
Symphony from where the extension runs, set the
SYMPHONY_ADDRESS environment variable. It accepts a bare hostname,
host:port, or a full URL with an explicit http or https scheme.
The Java SDK applies the override across all three startup paths —
the constructor, REGISTRATION_TOKEN provisioning, and provisioning
reconnect.
See SYMPHONY_ADDRESS for the full reference: when to use it, the three accepted forms, how scheme and port route to the HTTPS leg only, TLS SAN considerations, the plaintext-HTTP warning, and validation rules. The same variable behaves identically across the Java, Python, Go, and Rust SDKs.
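As a rough illustration of the three accepted forms, the sketch below classifies a candidate value before it is exported. It is a hypothetical pre-flight check, not the SDK's validation. The AddressForm class is an invented name, IPv6 literals are not handled, and the authoritative rules remain those in the SYMPHONY_ADDRESS reference.

```java
import java.net.URI;

/** Hypothetical pre-flight check for SYMPHONY_ADDRESS values; not SDK code. */
final class AddressForm {

    enum Kind { BARE_HOST, HOST_PORT, URL }

    /** Classifies a value into one of the three documented forms. */
    static Kind classify(String raw) {
        String value = raw == null ? "" : raw.trim();
        if (value.isEmpty()) {
            throw new IllegalArgumentException("address is empty");
        }
        if (value.contains("://")) {
            // URI.create throws IllegalArgumentException on malformed input
            URI uri = URI.create(value);
            String scheme = uri.getScheme();
            boolean http = "http".equalsIgnoreCase(scheme)
                    || "https".equalsIgnoreCase(scheme);
            if (!http || uri.getHost() == null) {
                throw new IllegalArgumentException(
                        "expected http(s)://host[:port]: " + value);
            }
            return Kind.URL;
        }
        int colon = value.lastIndexOf(':');
        if (colon < 0) {
            return Kind.BARE_HOST;
        }
        // NumberFormatException (an IllegalArgumentException) if not numeric
        int port = Integer.parseInt(value.substring(colon + 1));
        if (colon == 0 || port < 1 || port > 65535) {
            throw new IllegalArgumentException("invalid host:port: " + value);
        }
        return Kind.HOST_PORT;
    }
}
```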
CapabilitiesAdapter
Declares what NATS subjects the extension needs to publish to or subscribe from. During auto-provisioning, these capabilities determine the default permissions embedded in the extension's API key. Users can later adjust these permissions in the Symphony UI on the API Keys page.
@Component
public class CapabilitiesAdapter implements Capabilities {
@Override
public PubSubCapabilities getCapabilities() {
PubSubCapabilities caps = new PubSubCapabilities();
// Subscribe capabilities: services this extension provides
caps.putSubItem("extensions.my_ext", "My Extension");
caps.putSubItem("extensions.my_ext.process", "Process data");
caps.putSubItem("extensions.my_ext.query", "Query data");
// Publish capabilities: subjects this extension sends to
caps.putPubItem("extensions.my_ext.processed", "Data processed event");
// Specific cross-extension service subjects are permitted when
// the registering user's RBAC role covers them.
caps.putPubItem("extensions.intelligence.mcp", "Query Intelligence MCP tools");
return caps;
}
}
Subscribe and publish capabilities both follow the extensions.<prefix>
convention. Wildcard subjects using > can cover a range of endpoints, but
only within the extension's own prefix (e.g. extensions.my_ext.>).
Wildcards targeting another extension's prefix are rejected at
provisioning time. To call a specific service on another extension,
declare its full subject (e.g. extensions.intelligence.mcp).
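The wildcard rule can be checked locally before provisioning. The sketch below is a hypothetical helper (SubjectRules is not part of the SDK) that covers only the > wildcard discussed above. The real check happens in Symphony at provisioning time, and non-wildcard subjects are still subject to the registering user's RBAC role.

```java
/** Hypothetical local check of the wildcard rule; Symphony enforces the real one. */
final class SubjectRules {

    /**
     * Returns true if the subject passes the wildcard rule for an extension
     * with the given prefix. Subjects without a trailing ">" always pass here.
     */
    static boolean isAllowed(String subject, String ownPrefix) {
        boolean wildcard = subject.equals(">") || subject.endsWith(".>");
        if (!wildcard) {
            return true; // full subjects are governed by RBAC, not by this rule
        }
        // The trailing dot stops "my_ext" from matching "my_ext_other"
        String ownRoot = "extensions." + ownPrefix + ".";
        return subject.startsWith(ownRoot);
    }
}
```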
FeaturesAdapter
Declares the extension's UI components, routes, menus, and storage:
@Component
public class FeaturesAdapter implements Features {
private static final Logger log = LoggerFactory.getLogger(FeaturesAdapter.class);
@Override
public ExtensionFeatures getExtensionFeatures() {
// Resources
Resource home = createResource("home", "home.tsx");
Resource help = createResource("help", "help.md", MimeType.TEXT_MARKDOWN);
// Menus
MenuFeature mainMenu = new MenuFeature()
.route("/my_ext")
.icon("fa-puzzle-piece");
// UI feature
UiFeature ui = new UiFeature()
.resources(Map.of(
"home", home,
"help", help
))
.routes(Map.of(
"/my_ext", new RouteMap().uri("ui://my_ext/home"),
"/help/extensions/MyExtension",
new RouteMap().uri("ui://my_ext/help")
))
.menus(Map.of("main", Map.of("My Extension", mainMenu)));
// Storage
StorageFeature storage = new StorageFeature()
.buckets(List.of(
new ExtensionBucket()
.id("my_ext_data")
.description("Extension data"),
new ExtensionBucket()
.id("my_ext_events")
.description("Event log")
.ttl(86400)
.maxHistoryPerKey(10)
.maxBucketSize(1073741824L) // 1 GB
));
return new ExtensionFeatures().ui(ui).storage(storage);
}
private Resource createResource(String id, String file) {
return createResource(id, file, MimeType.TEXT_SYMPHONY_JSX);
}
private Resource createResource(String id, String file, MimeType mime) {
return new Resource()
.uri("ui://my_ext/" + id)
.mimeType(mime)
.text(getTemplate(file));
}
protected String getTemplate(String resourcePath) {
try (var in = Objects.requireNonNull(
getClass().getClassLoader()
.getResourceAsStream("templates/" + resourcePath))) {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
} catch (Exception ex) {
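// Pass the exception as the final argument without a placeholder so
// SLF4J logs the full stack trace
log.error("Unable to load template {}", resourcePath, ex);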
return "";
}
}
}
Generated Types
The FeaturesAdapter uses types generated from the Symphony
OpenAPI specification. The key types are:
| Type | Purpose |
|---|---|
| Resource | UI content with URI, MIME type, and text body |
| RouteMap | Maps a URL path to a resource URI, with optional nested routes |
| MenuFeature | A navigation menu item with route, icon, and sort order |
| Widget | A dashboard component with URI, name, and description |
| UiFeature | Container for resources, routes, menus, and widgets |
| StorageFeature | Declares JetStream buckets |
| ExtensionBucket | A single bucket with id, description, and optional TTL, history, and size limit |
MIME Types
The MimeType enum values correspond to the resource types documented
in User Interfaces. The most common are MimeType.TEXT_SYMPHONY_JSX for
React pages, MimeType.TEXT_SYMPHONY_MODULE for importable shared code,
MimeType.TEXT_HTML_SYMPHONY for standard HTML with platform API access,
MimeType.TEXT_HTMLHTMX for server-rendered HTMX pages (see HTML + HTMX),
and MimeType.TEXT_MARKDOWN for help content.
Loading Templates
UI templates are stored as files in src/main/resources/templates/
and loaded at runtime via the classloader. This keeps UI content
separate from Java code and makes it easy to edit without
recompilation during development. These files are loaded as strings
and registered as resources in the FeaturesAdapter.
Microservice Endpoints
Endpoints expose functionality as NATS microservices. They are built
using the io.nats.service package and registered with the
ExtensionRuntime.
Creating Endpoints
The endpoint class builds a NATS Service with named handlers
organized into groups:
@Component
public class ExtensionEndpoints {
private static final Logger log =
LoggerFactory.getLogger(ExtensionEndpoints.class);
private final ExtensionRuntime runtime;
private final ObjectMapper mapper;
private final Service natsService;
public ExtensionEndpoints(ExtensionRuntime runtime) {
this.runtime = runtime;
this.mapper = new ObjectMapper();
mapper.configure(
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// Build group hierarchy for NATS subject namespacing
Group group = new Group("cirata")
.appendGroup(new Group("extensions"))
.appendGroup(new Group("my_ext"));
// Produces subject prefix: cirata.extensions.my_ext
// Define endpoints with handlers
ServiceEndpoint processEp = ServiceEndpoint.builder()
.group(group)
.endpointName("process")
.handler(this::handleProcess)
.build();
ServiceEndpoint queryEp = ServiceEndpoint.builder()
.group(group)
.endpointName("query")
.handler(this::handleQuery)
.build();
// Build the NATS service
this.natsService = Service.builder()
.connection(runtime.getUserConnection())
.name("my_ext")
.version("1.0.0")
.description("My Extension Service")
.addServiceEndpoint(processEp)
.addServiceEndpoint(queryEp)
.build();
// Register with the runtime
runtime.setEndpoints(natsService);
}
public CompletableFuture<Boolean> startService() {
return this.natsService.startService();
}
// ... handler methods ...
}
Starting the Service
The NATS service must be started for it to respond to discovery queries. There are two approaches:
ApplicationRunner (recommended): Start the service after all Spring beans are initialized:
@Bean
public ApplicationRunner applicationRunner(
ExtensionEndpoints endpoints,
ExtensionRuntimeAdapter adapter) {
return args -> {
endpoints.startService();
adapter.getExtensionRuntime().waitForCompletion();
};
}
Constructor-based: Start the service immediately during bean
initialization by calling startService() at the end of the
constructor or from a driver component.
The startService() call is required. Without it, the NATS service
will be registered with the runtime, but will not respond to
$SRV.INFO and $SRV.STATS discovery queries. This means the
service will be invisible in the Symphony UI, even though the
extension itself is registered and visible.
Handler Methods
Handlers receive a ServiceMessage and must always respond:
private void handleProcess(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
Map<String, Object> request =
mapper.readValue(msg.getData(), Map.class);
Map<String, Object> result = Map.of(
"status", "success",
"processed", true,
"input", request
);
msg.respond(nc, mapper.writeValueAsString(result));
} catch (Exception ex) {
log.error("Process handler error", ex);
msg.respond(nc, "{}");
}
}
Key rules for handlers:
- Always respond. Every handler must call msg.respond(), even on error. A handler that does not respond causes the caller to time out.
- Return appropriate empty values. Use "{}" for single-object endpoints and "[]" for list endpoints.
- Configure the ObjectMapper. Set FAIL_ON_UNKNOWN_PROPERTIES = false so that unexpected fields in request payloads do not cause deserialization failures.
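One way to make the "always respond" rule hard to break is to route every handler through a small wrapper. The sketch below is one possible shape, written against plain JDK types so it stands alone. SafeResponder and respondAlways are hypothetical names, and the reply callback stands in for msg.respond(nc, payload).

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical helper that guarantees exactly one reply per request. */
final class SafeResponder {

    /**
     * Runs the handler body and always delivers a reply: the body's result
     * on success, or the fallback ("{}" or "[]") if it throws or returns null.
     */
    static void respondAlways(Callable<String> body,
                              String fallback,
                              Consumer<String> reply) {
        String payload;
        try {
            payload = body.call();
            if (payload == null) {
                payload = fallback;
            }
        } catch (Exception ex) {
            // A real handler would log the exception here
            payload = fallback;
        }
        reply.accept(payload);
    }
}
```

Inside a handler this might be called as SafeResponder.respondAlways(() -> mapper.writeValueAsString(getItems()), "[]", payload -> msg.respond(nc, payload)).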
Handler Patterns
JSON with typed DTOs:
private void handleTypedRequest(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
MyRequest request =
mapper.readValue(msg.getData(), MyRequest.class);
MyResponse response = processRequest(request);
msg.respond(nc, mapper.writeValueAsString(response));
} catch (IOException ex) {
log.error("Unable to parse request", ex);
msg.respond(nc, "{}");
}
}
Returning a list:
private void handleListItems(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
List<Map<String, Object>> items = getItems();
msg.respond(nc, mapper.writeValueAsString(items));
} catch (Exception ex) {
log.error("List handler error", ex);
msg.respond(nc, "[]");
}
}
Handler with key-value lookup:
private void handleGetItem(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
Map<String, String> request =
mapper.readValue(msg.getData(), Map.class);
String itemId = request.get("id");
KeyValue bucket = nc.keyValue("my_ext_data");
KeyValueEntry entry = bucket.get(itemId);
if (entry != null) {
msg.respond(nc, entry.getValueAsString());
} else {
msg.respond(nc, "{}");
}
} catch (Exception ex) {
log.error("Get item error", ex);
msg.respond(nc, "{}");
}
}
Outbound NATS Requests and Timeouts
Handlers can call other services using the NATS connection. The NATS Java client provides two families of request methods with different timeout behaviour:
Blocking (nc.request)—waits for a reply and returns a Message. A
Duration timeout is required; there is no blocking overload without one. Returns
null if no reply arrives within the timeout:
private void handleWithLookup(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
// Make an outbound request to another extension's service
Message reply = nc.request(
"cirata.extensions.catalog.lookup",
msg.getData(),
Duration.ofSeconds(5)
);
if (reply == null) {
// No response received within the timeout window
log.warn("Catalog lookup timed out");
msg.respond(nc, "{}");
return;
}
msg.respond(nc, reply.getData());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error("Catalog lookup interrupted", ex);
msg.respond(nc, "{}");
} catch (Exception ex) {
log.error("Catalog lookup error", ex);
msg.respond(nc, "{}");
}
}
Use Duration.ofSeconds, Duration.ofMillis, or any Duration factory method
to express the limit.
Async (nc.request / nc.requestWithTimeout)—returns a
CompletableFuture<Message> immediately without blocking the handler thread.
The no-timeout overload nc.request(subject, data) returns a future that is
never automatically cancelled; use nc.requestWithTimeout(subject, data, duration)
to bound it:
// Bounded async request — future completes or is cancelled after 5 s
CompletableFuture<Message> future = nc.requestWithTimeout(
"cirata.extensions.catalog.lookup",
msg.getData(),
Duration.ofSeconds(5)
);
future.thenAccept(reply -> {
if (reply != null) { /* process reply */ }
}).exceptionally(ex -> { log.error("Lookup failed", ex); return null; });
Choosing a timeout value:
| Scenario | Suggested timeout |
|---|---|
| KV read or single-hop NATS round-trip | 2–3 s |
| Extension-to-extension service call | 5 s |
| Long-running computation | 30–60 s |
| No reply expected | use nc.publish() instead |
Always handle the null return and InterruptedException from blocking
requests. For async requests without a timeout (nc.request(subject, data)),
the returned future will never complete if no reply arrives—prefer
nc.requestWithTimeout when a deadline is needed.
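If code only has access to an unbounded CompletableFuture, the JDK itself (Java 9 and later) can impose a deadline on it. The sketch below uses a plain future as a stand-in for the NATS reply. BoundedFutures is a hypothetical helper, and nc.requestWithTimeout remains the more direct route when calling NATS.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers that bound an otherwise unbounded future. */
final class BoundedFutures {

    /** On expiry the future completes exceptionally with a TimeoutException. */
    static <T> CompletableFuture<T> withDeadline(CompletableFuture<T> future,
                                                 long millis) {
        return future.orTimeout(millis, TimeUnit.MILLISECONDS);
    }

    /** On expiry the future completes normally with the fallback value. */
    static <T> CompletableFuture<T> withFallback(CompletableFuture<T> future,
                                                 T fallback,
                                                 long millis) {
        return future.completeOnTimeout(fallback, millis, TimeUnit.MILLISECONDS);
    }
}
```

Both JDK methods act on the future they are given and return that same future, so downstream stages attached earlier also observe the timeout.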
Usage Reporting
Extensions that consume licensed resources should report their usage
to Symphony for licensing and chargeback. The reportUsage
method on ExtensionRuntime sends dimension measurements to
Symphony, which converts them into license units using the rates
embedded in the active license.
Reporting Usage
Map<String, Double> dimensions = Map.of(
"tables_replicated", 5.0,
"bytes_transferred_gb", 120.5
);
Map<String, Object> response = runtime.reportUsage(dimensions);
The dimensions map contains named measurements specific to your
extension. The keys must match the dimension names defined in the
license's unit rates for your extension prefix. If any dimension key
is not covered by a license rate, Symphony will add your extension
to the disabled_extensions list (per-extension enforcement).
Response
The response map contains:
| Key | Type | Description |
|---|---|---|
| result | String | "ok" on success |
| units | Double | The computed license units consumed by this report |
| enforcement | Map | Current enforcement state object |
Checking Enforcement State
The enforcement state in the response tells your extension whether it should continue operating:
Map<String, Object> response = runtime.reportUsage(dimensions);
@SuppressWarnings("unchecked")
Map<String, Object> enforcement =
(Map<String, Object>) response.get("enforcement");
String status = enforcement != null
? (String) enforcement.get("status")
: "enforcement_ok";
if ("enforcement_enforced".equals(status)) {
log.warn("License enforcement active — pausing operations");
// Stop or reduce licensed operations
}
Subscribing to Enforcement State
In addition to checking the enforcement state from each reportUsage
response, your extension can subscribe to the enforcement broadcast
subject to receive updates in real time—including changes triggered
by other extensions, license uploads, or administrative actions:
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import com.fasterxml.jackson.databind.ObjectMapper;
Connection nc = runtime.getUserConnection();
ObjectMapper mapper = new ObjectMapper();
Dispatcher dispatcher = nc.createDispatcher(msg -> {
try {
@SuppressWarnings("unchecked")
Map<String, Object> state = mapper.readValue(msg.getData(), Map.class);
String status = (String) state.getOrDefault("status", "enforcement_ok");
String message = (String) state.getOrDefault("message", "");
if ("enforcement_enforced".equals(status)) {
log.warn("Enforcement active: {} — pausing operations", message);
// Stop or reduce licensed operations
} else if ("enforcement_warning".equals(status)) {
log.info("Enforcement warning: approaching license limit");
}
} catch (Exception e) {
log.error("Failed to parse enforcement broadcast", e);
}
});
dispatcher.subscribe("cirata.symphony.enforcement");
Enforcement States
| Status | Meaning |
|---|---|
| enforcement_ok | Normal operation; units available |
| enforcement_warning | 90% or more of licensed units consumed |
| enforcement_grace | Units exhausted or license expired, within vendor-set grace period |
| enforcement_enforced | Grace period expired or no valid license; stop licensed operations |
Extensions may also be individually disabled via the disabled_extensions field, or flagged as grace-only via the grace_extensions field, even when the global status is enforcement_ok. Check both lists for your extension prefix to determine your operational state.
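Combining the global status with the two per-extension lists might look like the sketch below. It assumes the state has been deserialized into a Map and that disabled_extensions and grace_extensions are lists of prefix strings. EnforcementCheck and Decision are hypothetical names, and the mapping from states to decisions is one reasonable reading of the table, not a rule taken from the SDK.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical interpretation of an enforcement state for one extension. */
final class EnforcementCheck {

    enum Decision { RUN, RUN_WITH_WARNING, GRACE, STOP }

    static Decision decide(Map<String, Object> state, String prefix) {
        // Per-extension disablement applies even when the global status is ok
        if (listContains(state.get("disabled_extensions"), prefix)) {
            return Decision.STOP;
        }
        Object status = state.getOrDefault("status", "enforcement_ok");
        if ("enforcement_enforced".equals(status)) {
            return Decision.STOP;
        }
        if ("enforcement_grace".equals(status)
                || listContains(state.get("grace_extensions"), prefix)) {
            return Decision.GRACE;
        }
        if ("enforcement_warning".equals(status)) {
            return Decision.RUN_WITH_WARNING;
        }
        return Decision.RUN; // assumption: unknown statuses are treated as ok
    }

    // Assumption: the lists arrive as JSON arrays of extension prefixes
    private static boolean listContains(Object value, String prefix) {
        return value instanceof List<?> list && list.contains(prefix);
    }
}
```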
Attribution
When an extension reports usage, Symphony checks whether any
attribution rules
match the extension prefix and reporting account. If a rule matches,
the usage record is automatically assigned to a business unit with
attribution method auto. Administrators can also manually attribute
records or override automatic attributions via the Attribution UI.
When to Report
Report usage at meaningful checkpoints—for example, after completing a replication batch, processing a set of queries, or transferring a block of data. Avoid reporting on every individual operation if the volume is very high; instead, batch measurements and report periodically.
// Example: report after each replication batch
public void onBatchComplete(List<TableResult> results) throws Exception {
double totalBytes = results.stream()
.mapToDouble(TableResult::getBytes)
.sum();
Map<String, Object> response = runtime.reportUsage(Map.of(
"tables_replicated", (double) results.size(),
"bytes_transferred_gb", totalBytes / 1e9
));
// Check enforcement state after reporting
@SuppressWarnings("unchecked")
Map<String, Object> enforcement =
(Map<String, Object>) response.get("enforcement");
if (enforcement != null
&& "enforcement_enforced".equals(enforcement.get("status"))) {
log.warn("License limit reached — stopping replication");
return;
}
}
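For high-volume workloads, measurements can be accumulated in memory and reported on a timer rather than per operation. The sketch below shows one way to do that with plain JDK types. UsageAccumulator is a hypothetical class, and the caller is assumed to pass the drained map to runtime.reportUsage.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory accumulator for usage dimensions. */
final class UsageAccumulator {

    private final Map<String, Double> totals = new HashMap<>();

    /** Adds an amount to a dimension; safe to call from multiple threads. */
    synchronized void add(String dimension, double amount) {
        totals.merge(dimension, amount, Double::sum);
    }

    /**
     * Returns an immutable snapshot of the totals and resets them, so each
     * measurement is reported exactly once.
     */
    synchronized Map<String, Double> drain() {
        Map<String, Double> snapshot = Map.copyOf(totals);
        totals.clear();
        return snapshot;
    }
}
```

A scheduled task could call drain() every few minutes and skip the report when the returned map is empty.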
See Usage Tracking for how administrators view and manage reported usage.
OpenAPI and REST APIs
Extensions can expose their services as REST APIs through the Symphony server by including an OpenAPI specification in the service metadata.
Adding OpenAPI Metadata
Load the OpenAPI specification from the classpath and include it in the service builder:
String openApiSpec;
try {
var resource = resourceLoader.getResource(
"classpath:openapi/my-ext-openapi.json");
openApiSpec = new String(
resource.getInputStream().readAllBytes(),
StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("Failed to load OpenAPI spec", e);
}
Service service = Service.builder()
.connection(runtime.getUserConnection())
.name("my_ext")
.version("1.0.0")
.metadata(Map.of("openapi", openApiSpec))
.addServiceEndpoint(endpoint)
.build();
When the openapi metadata key is present, Symphony routes
incoming HTTP requests to the extension's NATS endpoints based on the
path definitions in the specification. Other extensions can also use
the OpenAPI metadata to discover and describe the extension's
capabilities, as the Cirata Intelligence extension does when
exposing services as MCP tools.
Store the OpenAPI specification as a JSON file in
src/main/resources/openapi/.
REST API Response Format
When endpoints are exposed as REST APIs, handlers can respond in two ways:
Raw response (simplest)—respond with the content directly.
Symphony auto-detects the content type: valid JSON is served as
application/json, anything else as text/plain.
// JSON response
private void handleListItems(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
List<Item> items = getItems();
msg.respond(nc, mapper.writeValueAsString(items));
} catch (Exception ex) {
log.error("Error listing items", ex);
msg.respond(nc, "[]");
}
}
// Plain text response
private void handlePing(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
msg.respond(nc, "pong".getBytes(StandardCharsets.UTF_8));
}
Wrapped response (full control)—for control over the HTTP status
code, content type, or response headers, wrap the response in a JSON
object with a body field:
private void handleCreateItem(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
Map<String, Object> request =
mapper.readValue(msg.getData(), Map.class);
Item item = createItem(request);
Map<String, Object> response = Map.of(
"body", Map.of("id", item.getId(), "status", "created"),
"statusCode", 201,
"contentType", "application/json"
);
msg.respond(nc, mapper.writeValueAsString(response));
} catch (Exception ex) {
log.error("Error creating item", ex);
Map<String, Object> error = Map.of(
"body", Map.of("error", ex.getMessage()),
"statusCode", 500
);
msg.respond(nc, mapper.writeValueAsString(error));
}
}
| Field | Required | Description |
|---|---|---|
| body | Yes | The response payload. For JSON content types, any JSON value. For other types (e.g. text/plain), a string. |
| statusCode | No | HTTP status code (100–599). Defaults to 200. |
| contentType | No | MIME type. Defaults to application/json. |
| headers | No | Additional HTTP headers as a string-to-string object. |
Both camelCase (statusCode, contentType) and lowercase
(statuscode, contenttype) field names are accepted.
Symphony detects the format by checking whether the response is a
JSON object with a body key. Both formats work identically whether
the endpoint is called through the REST API or directly over NATS.
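Handlers that return wrapped responses often benefit from a small factory that builds the envelope and rejects out-of-range status codes early. The sketch below is one possible helper. WrappedResponse is a hypothetical name, and the resulting map is assumed to be serialized with the handler's ObjectMapper before calling msg.respond.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical factory for the wrapped response envelope. */
final class WrappedResponse {

    /**
     * Builds an envelope with the documented field names.
     * Pass null for contentType to rely on the application/json default.
     */
    static Map<String, Object> of(Object body, int statusCode, String contentType) {
        if (statusCode < 100 || statusCode > 599) {
            throw new IllegalArgumentException(
                    "statusCode must be in 100..599: " + statusCode);
        }
        Map<String, Object> response = new LinkedHashMap<>();
        response.put("body", body);
        response.put("statusCode", statusCode);
        if (contentType != null) {
            response.put("contentType", contentType);
        }
        return response;
    }
}
```

For example, WrappedResponse.of(Map.of("error", "not found"), 404, null) yields an envelope that Symphony would recognize by its body key.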
Storage
Java extensions use the NATS JetStream key-value API for persistent
storage. Buckets declared via StorageFeature are created by the
platform at the cluster's current replica factor (R=3 on 3–4 node
clusters, R=5 on 5+ nodes, R=1 standalone). Declared buckets inherit
the platform's durability guarantees with no additional configuration.
Extensions that create buckets at runtime instead of via
StorageFeature should request the current replica count from
Symphony and set it on the KeyValueConfiguration explicitly — jnats
defaults to R=1 otherwise, and the bucket will not be replicated:
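Message reply = nc.request("cirata.services.cluster.info",
    new byte[0], Duration.ofSeconds(2));
if (reply == null) {
    // The blocking request returns null when no reply arrives in time
    throw new IllegalStateException(
        "No reply from cirata.services.cluster.info within 2 s");
}
int replicas = new ObjectMapper()
    .readTree(reply.getData())
    .get("replicas").asInt();
nc.keyValueManagement().create(KeyValueConfiguration.builder()
    .name("runtime_data")
    .replicas(replicas)
    .build());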
Accessing Buckets
Connection nc = runtime.getUserConnection();
KeyValue kv = nc.keyValue("my_ext_data");
Basic Operations
// Store a value
kv.put("item:123", "{\"name\": \"Widget\"}".getBytes(
StandardCharsets.UTF_8));
// Retrieve a value
KeyValueEntry entry = kv.get("item:123");
if (entry != null) {
String value = entry.getValueAsString();
}
// Delete a key
kv.delete("item:123");
// List all keys
List<String> keys = kv.keys();
Batch Write
When writing many keys at once, use KeyValueBatch to publish all
entries concurrently instead of calling put() in a loop. Each
individual put() is a synchronous round-trip; the batch API fires all
publishes via the async JetStream API and collects acks in parallel.
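import java.util.ArrayList;
import com.cirata.symphony.KeyValueBatch;
import com.cirata.symphony.KeyValueBatch.BatchEntry;
import com.cirata.symphony.KeyValueBatch.BatchPutResult;
// writeValueAsBytes throws a checked JsonProcessingException, which a stream
// lambda cannot propagate, so build the list in a loop instead
List<BatchEntry> entries = new ArrayList<>();
for (var item : items) {
    entries.add(new BatchEntry(item.id(), mapper.writeValueAsBytes(item)));
}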
Connection nc = runtime.getUserConnection();
BatchPutResult result = KeyValueBatch.putAll(nc, "my_ext_data", entries);
log.info("Batch write: {} succeeded, {} failed out of {}",
result.succeeded(), result.failed(), result.total());
for (Map.Entry<String, Exception> error : result.errors().entrySet()) {
log.error("Failed key '{}': {}", error.getKey(), error.getValue().getMessage());
}
The batch API never fails fast—a single bad key does not prevent the
remaining entries from being written. BatchPutResult.errors() maps
failed key names to their exceptions. Values written this way are fully
readable via standard KeyValue.get().
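The "fire everything, then collect" behaviour can be illustrated without the SDK. The sketch below is not the KeyValueBatch source. BatchSketch and its Result record are hypothetical, and the asyncPut function stands in for an asynchronous JetStream publish.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;

/** Hypothetical illustration of a batch write that never fails fast. */
final class BatchSketch {

    record Result(int total, int succeeded, Map<String, Throwable> errors) {
        int failed() { return errors.size(); }
    }

    static Result putAll(Map<String, byte[]> entries,
                         BiFunction<String, byte[], CompletableFuture<Void>> asyncPut) {
        // Phase 1: start every write without waiting for earlier ones
        Map<String, CompletableFuture<Void>> pending = new LinkedHashMap<>();
        entries.forEach((key, value) -> {
            CompletableFuture<Void> future;
            try {
                future = asyncPut.apply(key, value);
            } catch (RuntimeException ex) {
                future = CompletableFuture.failedFuture(ex);
            }
            pending.put(key, future);
        });
        // Phase 2: collect acks; a failed key is recorded, not rethrown
        Map<String, Throwable> errors = new LinkedHashMap<>();
        pending.forEach((key, future) -> {
            try {
                future.join();
            } catch (CompletionException | CancellationException ex) {
                errors.put(key, ex.getCause() != null ? ex.getCause() : ex);
            }
        });
        return new Result(entries.size(), entries.size() - errors.size(), errors);
    }
}
```

Because failures are only recorded in the second phase, one rejected key cannot stop the remaining writes from being attempted.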
BucketProvider Pattern
For extensions with multiple buckets, use a provider to manage bucket access:
public class BucketProvider {
public enum Bucket {
ITEMS("my_ext_items"),
CONFIG("my_ext_config");
private final String bucketName;
Bucket(String name) { this.bucketName = name; }
public String getBucketName() { return bucketName; }
}
private final Connection nc;
private final Map<Bucket, KeyValue> buckets =
new ConcurrentHashMap<>();
public BucketProvider(ExtensionRuntime runtime) {
this.nc = runtime.getUserConnection();
}
// Opens the bucket handle on first use and caches it. Failures surface
// as an unchecked RuntimeException, so no checked exceptions are declared.
public KeyValue getOrCreateBucket(Bucket bucket) {
return buckets.computeIfAbsent(bucket, b -> {
try {
return nc.keyValue(b.getBucketName());
} catch (Exception ex) {
throw new RuntimeException(
"Unable to access bucket: " + b.getBucketName(), ex);
}
});
}
}
Configuration
Spring Boot Configuration
Use @ConfigurationProperties to bind the extension token from
application properties:
@Configuration
@ConfigurationProperties(prefix = "myext")
public class MyConfiguration {
private String token;
public String getToken() { return token; }
public void setToken(String token) { this.token = token; }
}
Set the token in application.properties:
myext.token=${SYMPHONY_TOKEN:}
This reads the SYMPHONY_TOKEN environment variable at startup. The
trailing : provides an empty default, allowing the extension to
fall back to the OS credential store or REGISTRATION_TOKEN if the
variable is not set.
Main Application Class
The main class bootstraps Spring Boot and runs the extension until shutdown:
@SpringBootApplication
public class MyExtension {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(MyExtension.class);
app.setBannerMode(Banner.Mode.OFF);
app.run(args);
}
@Bean
public ApplicationRunner applicationRunner(
ExtensionEndpoints endpoints,
ExtensionRuntimeAdapter adapter) {
return args -> {
endpoints.startService();
adapter.getExtensionRuntime().waitForCompletion();
};
}
}
The waitForCompletion() call blocks the main thread until a shutdown
signal (SIGINT or SIGHUP) is received. The runtime handles
disconnection and status cleanup automatically.
Plain Java Alternative
For simple extensions that do not need a dependency injection
framework, the main class can implement the Features interface
directly:
public class SimpleExtension implements Features {
private final ExtensionRuntime runtime;
public SimpleExtension() throws ApiKeyUnauthorizedException {
this.runtime = ExtensionRuntime.builder()
.name("Simple Extension")
.features(this)
.token(System.getenv("SYMPHONY_TOKEN"))
.prefix("simple")
.build();
}
public static void main(String[] args) throws ApiKeyUnauthorizedException {
SimpleExtension ext = new SimpleExtension();
ext.runtime.start();
ext.runtime.waitForCompletion();
}
@Override
public ExtensionFeatures getExtensionFeatures() {
Resource home = new Resource()
.uri("ui://simple/home")
.mimeType(MimeType.TEXT_SYMPHONY_JSX)
.text("<h1>Simple Extension</h1>");
UiFeature ui = new UiFeature()
.resources(Map.of("home", home))
.routes(Map.of("/simple",
new RouteMap().uri("ui://simple/home")));
return new ExtensionFeatures().ui(ui);
}
}
This approach is suitable for lightweight extensions that do not require the full Spring Boot infrastructure.
Dynamic Resource Updates
Extensions can update their UI resources at runtime by modifying the
features returned by getExtensionFeatures() and calling
updateExtensionInfo():
// After modifying the features that getExtensionFeatures() returns
runtime.updateExtensionInfo();
This re-publishes the extension's metadata, causing Symphony to pick up the updated resources immediately.
Help Pages
Extension help pages use MimeType.TEXT_MARKDOWN and are routed under
/help/extensions/<ExtensionName>:
Resource help = new Resource()
.uri("ui://my_ext/help")
.mimeType(MimeType.TEXT_MARKDOWN)
.text(getTemplate("help.md"));
// In the routes map:
"/help/extensions/MyExtension", new RouteMap().uri("ui://my_ext/help")
See Markdown Resources for details on frontmatter, template variables, and conventions.
Logging
The Java SDK ships a default Logback configuration
(logback-symphony.xml) that outputs structured JSON to stdout via
Logback's built-in JsonEncoder. If your extension uses SLF4J with
Logback on the classpath, this is picked up automatically — no setup
code is required.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger log = LoggerFactory.getLogger(MyExtension.class);
log.info("Extension started");
log.warn("Cache miss rate is high");
log.error("Query failed", exception); // stack trace included
Each log line is a JSON object with consistent fields (timestamp,
level, message) matching the other Symphony SDKs. Do not write
log files — stdout is captured automatically by journald (systemd)
or the container runtime (Docker/Kubernetes).
To override the default configuration, place your own logback.xml
on the classpath ahead of the SDK's resource.
Observability
Extensions can participate in Symphony's unified telemetry system by enabling observability. This registers NATS endpoints for metrics, logs, and traces that the Observability Extension collects automatically.
Observability obs = Observability.enable(runtime, "myext");
// SLF4J/Logback logging is now captured automatically (INFO+)
// Instrument handlers with automatic trace propagation
runtime.addEndpoint("process",
obs.instrumentedHandler("process", msg -> {
obs.incrementCounter("requests.total", 1);
msg.respond(runtime.getUserConnection(), process(msg.getData()));
})
);
// Standard SLF4J logging — captured automatically when Logback is present
private static final Logger log = LoggerFactory.getLogger(MyExtension.class);
log.info("Extension started");
log.error("Query failed", exception); // stack trace included
// Metrics
obs.setGauge("queue.depth", 42.0);
When Logback is on the classpath, all SLF4J log output at INFO level
or higher is captured automatically—including formatted messages,
logger names, thread names, and full stack traces. Logs emitted inside
an active span are automatically correlated via MDC. No manual handler
attachment is needed (unlike Python and Go). If Logback is not present,
use the direct API: obs.getLogCapture().info("message").
For complete coverage of traces, metrics, logs, and distributed tracing patterns, see Observability.
Multi-Instance Support
Java extensions can run as multiple instances of the same extension type for scalability, fault tolerance, and workload distribution. The SDK provides built-in support for work partitioning, queue-based command delivery, and instance-specific messaging.
Automatic Load Balancing
Microservice endpoints built with Service.builder() are
automatically load-balanced across all instances by NATS micro. No
code changes are needed—deploying additional instances immediately
distributes incoming service requests across them.
The SDK also subscribes to the commands channel through a NATS queue group, so each command is delivered to only one instance rather than to all of them.
WorkPartitioner
For extensions that run background tasks (watchers, monitors, replicators), the WorkPartitioner distributes work items across instances using consistent hashing.
WorkPartitioner wp = new WorkPartitioner(runtime);
wp.onRebalance((totalInstances, myItemCount) -> {
log.info("Rebalanced: {} instances active, I own {} items",
totalInstances, myItemCount);
});
wp.start();
Use isMyWork to check whether the current instance should process
a given work item:
// In a bucket watcher loop
for (String key : discoveredItems) {
if (wp.isMyWork(key)) {
processItem(key);
} else {
log.debug("Skipping {} — owned by another instance", key);
}
}
The partitioner watches the status bucket for instance join and leave
events. When the set of active instances changes, the rebalance
callback fires and isMyWork results may change. Extensions should
re-evaluate their active work items on each watcher iteration rather
than caching ownership decisions.
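To see why ownership can move when an instance joins, the following minimal hash-ring sketch may help. It is not the SDK's WorkPartitioner: the class name, the MD5-based hash, and the virtual-node count are all assumptions chosen for illustration, and the SDK's actual hashing scheme is not described here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative consistent-hash ring; NOT the SDK's WorkPartitioner implementation. */
final class HashRingSketch {
    private static final int VIRTUAL_NODES = 64; // ring points per instance, smooths the split
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(Collection<String> instanceIds) {
        for (String id : instanceIds) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(id + "#" + i), id);
            }
        }
    }

    /** Owner is the instance at the first ring point at or after the key's hash, wrapping around. */
    String ownerOf(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no active instances");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    boolean isMyWork(String myInstanceId, String key) {
        return myInstanceId.equals(ownerOf(key));
    }

    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getLong(); // first 8 bytes of the digest
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }
}
```

In a ring like this, adding an instance only moves keys to the newcomer; keys never shuffle between the existing instances. That is the property that makes rebalancing cheap, and it is also why a cached ownership decision can silently become stale after a join or leave.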
Integration with Spring Watchers
In Spring Boot extensions such as Ice Flow, background watchers typically run as scheduled tasks or event-driven loops. Integrate the WorkPartitioner by checking ownership before processing each item:
@Component
public class ReplicationWatcher {
private final WorkPartitioner partitioner;
private final KeyValue configBucket;
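// Connection.keyValue(...) declares IOException, so the constructor propagates it.
public ReplicationWatcher(ExtensionRuntime runtime,
WorkPartitioner partitioner) throws IOException {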
this.partitioner = partitioner;
this.configBucket = runtime.getUserConnection()
.keyValue("iceflow_replications");
}
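// Runs 30 seconds after the previous run finishes; requires @EnableScheduling on a configuration class.
@Scheduled(fixedDelay = 30000)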
public void checkReplications() throws Exception {
for (String key : configBucket.keys()) {
if (partitioner.isMyWork(key)) {
runReplication(key);
}
}
}
}
Instance ID
The extension's instance ID is derived from the JWT identifier in the API token. Access it with:
String instanceId = runtime.getInstanceId();
Use the instance ID to tag operational records in shared storage for debugging and traceability.
For a comprehensive guide to multi-instance design patterns, opt-in levels, and best practices, see Multi-Instance Extensions.
Error Handling
Runtime Errors
The ExtensionRuntime includes an UncaughtExceptionHandler that
logs fatal errors and halts the JVM. This fail-fast behavior ensures
that extensions do not continue operating in an inconsistent state.
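The fail-fast idea can be illustrated with plain JDK calls. The sketch below is not the SDK's handler; the class and method names are hypothetical, and the exit action is injected so the behavior can be exercised without stopping the JVM.

```java
import java.util.function.IntConsumer;

/** Illustrative fail-fast handler; the SDK installs its own, this only shows the idea. */
final class FailFastSketch {

    /** Builds a handler that reports the error and then invokes the supplied exit action. */
    static Thread.UncaughtExceptionHandler failFast(IntConsumer exitAction) {
        return (thread, error) -> {
            System.err.println("Fatal error on thread " + thread.getName() + ": " + error);
            error.printStackTrace();
            exitAction.accept(1); // non-zero status signals abnormal termination
        };
    }

    /** Installs the handler process-wide; Runtime.halt stops the JVM without running shutdown hooks. */
    static void install() {
        Thread.setDefaultUncaughtExceptionHandler(failFast(Runtime.getRuntime()::halt));
    }
}
```

Halting rather than exiting normally skips shutdown hooks, which suits a process whose state can no longer be trusted; a supervisor such as systemd or Kubernetes is then expected to restart it.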
Handler Error Checklist
- Always call msg.respond(), even in catch blocks
- Return "{}" for object endpoints and "[]" for list endpoints
- Log errors with the exception object for stack traces
- Set FAIL_ON_UNKNOWN_PROPERTIES = false on the ObjectMapper
- Handle IOException, JetStreamApiException, and InterruptedException separately for clear error messages
- Check runtime.isShuttingDown() before expensive operations
private void handleRequest(ServiceMessage msg) {
Connection nc = runtime.getUserConnection();
try {
if (runtime.isShuttingDown()) {
msg.respond(nc, "{}");
return;
}
// ... normal processing ...
} catch (IOException | JetStreamApiException ex) {
log.error("Unable to process request", ex);
msg.respond(nc, "{}");
} catch (InterruptedException ex) {
log.error("Handler interrupted", ex);
Thread.currentThread().interrupt();
msg.respond(nc, "{}");
}
}
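When many handlers repeat the same try/catch shape, the "always respond" rule can be factored into a helper. The sketch below is a hypothetical utility, not part of the SDK; the reply is modeled as a Consumer so the example stays independent of the NATS types.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical helper illustrating the "always respond" rule; not part of the SDK. */
final class SafeResponder {
    static final String EMPTY_OBJECT = "{}"; // fallback for object endpoints
    static final String EMPTY_LIST = "[]";   // fallback for list endpoints

    /**
     * Runs the handler body and sends exactly one reply: the body's result on success,
     * or the fallback payload if the body throws.
     */
    static void respondSafely(Callable<String> body, String fallback, Consumer<String> reply) {
        String payload;
        try {
            payload = body.call();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            payload = fallback;
        } catch (Exception ex) {
            System.err.println("Unable to process request: " + ex);
            payload = fallback;
        }
        reply.accept(payload);
    }
}
```

Inside a handler this might be called as respondSafely(() -> loadItems(), SafeResponder.EMPTY_LIST, json -> msg.respond(nc, json)), where loadItems() is a placeholder for the endpoint's own logic. A real extension would log through SLF4J rather than System.err.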