Extension Observability
Cirata Symphony provides built-in observability for extensions through three telemetry signals: metrics, logs, and traces. When an extension enables observability, its telemetry is automatically collected by the Symphony Observability Extension and made available for querying, visualization, and export to external backends.
This guide covers how to instrument your extension to produce telemetry, how that telemetry flows through Symphony, and the SDK-specific APIs available in each language.
Contents
- How It Works
- Quick Start
- Telemetry Endpoints
- Traces
- Logs
- Metrics
- Distributed Trace Context
- Browser Telemetry
- SDK Reference: Go
- SDK Reference: Python
- SDK Reference: Java
- SDK Reference: Rust
- Serialization Format
- Buffer Limits and Data Lifecycle
- Best Practices
How It Works
- The browser UI automatically generates spans for page navigations, HTTP requests, and NATS requests, sending them to the platform via NATS
- Your extension uses the observability SDK to record spans, logs, and metrics
- The SDK buffers telemetry in memory and registers three NATS endpoints: .metrics, .logs, and .traces
- The Observability Extension periodically discovers your extension and polls these endpoints
- Collected data is stored in DuckDB and available via query endpoints, the Symphony UI, and OTLP export
No direct network connection to the Symphony Observability Extension is needed. All communication happens through NATS request-reply.
Quick Start
The fastest path to full observability is a single Enable call during extension startup. This registers the three telemetry endpoints and gives you an API for recording spans, logs, and metrics.
Go
ext := extension.NewExtension("myext", "myext", "My Extension", token)
svc, _ := ext.AddService("myext", "1.0")
obs, _ := observability.Enable(ext, svc, "myext")
// .metrics, .logs, .traces endpoints are now registered
// In a handler:
func handleQuery(req micro.Request) {
    span := obs.StartChildSpan("handle_query", observability.SpanKindServer, req)
    defer obs.EndSpan(span, nil)
    // ... handler logic ...
}
Python
ext = Extension("myext", "myext", token)
async with ext:
    svc = await ext.add_endpoints("myext", "1.0")
    obs = await Observability.enable(ext, svc, "myext")

    async def handle_query(msg):
        span = obs.start_child_span("handle_query", SpanKind.SERVER, msg)
        try:
            # ... handler logic ...
            obs.end_span(span)
        except Exception as e:
            obs.end_span_with_error(span, e)
Java
ExtensionRuntime runtime = ExtensionRuntime.builder()...build();
Observability obs = Observability.enable(runtime, "myext");
void handleQuery(ServiceMessage msg) {
    SpanHandle span = obs.startChildSpan("handle_query", SpanKind.SERVER, msg);
    try {
        // ... handler logic ...
        obs.endSpan(span, Map.of());
    } catch (Exception e) {
        obs.endSpanWithError(span, e);
    }
}
Rust
let ext = Extension::new("myext", "myext").with_token(token);
ext.run_with_setup(|ext| Box::pin(async move {
    let svc = ext.add_endpoints("myext", "1.0", "My Extension", None).await?;
    let obs = Observability::enable(&svc, "myext").await;
    // .metrics, .logs, .traces endpoints are now registered

    // In a handler:
    let span = obs.start_span("handle_query", SpanKind::Server);
    // ... handler logic ...
    obs.end_span(span, HashMap::new());
    Ok(())
})).await?;
Telemetry Endpoints
When observability is enabled, three NATS service endpoints are registered under your extension's prefix:
| Endpoint | Subject | Description |
|---|---|---|
| metrics | cirata.extensions.<prefix>.metrics | Returns buffered metrics as OTLP protobuf |
| logs | cirata.extensions.<prefix>.logs | Returns buffered log records as OTLP protobuf |
| traces | cirata.extensions.<prefix>.traces | Returns buffered spans as OTLP protobuf |
The Observability Extension discovers these endpoints using NATS service discovery and polls them on a configurable interval (default: 30 seconds). Each poll drains the buffer—the response contains all telemetry accumulated since the last poll.
If your extension is not yet discovered by the Symphony Observability Extension, telemetry is still buffered locally (up to the buffer limits) and will be collected on the next poll after discovery.
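The drain-on-poll behavior can be pictured with a small stand-alone model. This is only a sketch: `DrainBuffer` is a hypothetical class, not part of any Symphony SDK, and the real endpoints return OTLP protobuf bytes rather than a Python list.

```python
import threading
from typing import List


class DrainBuffer:
    """Accumulates records between polls; drain() returns everything and resets."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: List[str] = []

    def record(self, item: str) -> None:
        with self._lock:
            self._items.append(item)

    def drain(self) -> List[str]:
        # Swap the list out under the lock so records that arrive during a
        # poll land in the next batch instead of being lost or duplicated.
        with self._lock:
            items, self._items = self._items, []
        return items


buf = DrainBuffer()
buf.record("span-1")
buf.record("span-2")
first_poll = buf.drain()    # everything recorded so far
buf.record("span-3")
second_poll = buf.drain()   # only what arrived after the first poll
```

Because each poll empties the buffer, a record is returned by exactly one poll in this model, which is why telemetry produced before discovery simply waits for the first poll.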
Traces
Traces capture the timing and causal relationships of operations in your extension. Each trace is a tree of spans, where each span represents a unit of work.
Creating Spans
Root spans start a new trace with a fresh trace ID:
span := obs.StartSpan("aggregation_cycle", observability.SpanKindInternal)
// ... do work ...
obs.EndSpan(span, map[string]string{"items.count": "42"})
Child spans continue an existing trace, typically from an incoming NATS request that carries a traceparent header:
span := obs.StartChildSpan("handle_request", observability.SpanKindServer, req)
// ... do work ...
obs.EndSpan(span, nil)
If the incoming message has no trace context, StartChildSpan falls back to creating a root span.
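The root-versus-child decision can be sketched in a few lines of standard-library Python. The `Span` dataclass and both functions below are hypothetical stand-ins that mirror the behavior described here; they are not the SDK implementation.

```python
import secrets
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Span:
    trace_id: bytes                  # 16 bytes, shared by every span in a trace
    span_id: bytes                   # 8 bytes, unique per span
    parent_span_id: Optional[bytes]  # None marks a root span
    name: str


def start_span(name: str) -> Span:
    """Root span: fresh trace ID, no parent."""
    return Span(secrets.token_bytes(16), secrets.token_bytes(8), None, name)


def start_child_span(name: str, parent: Optional[Tuple[bytes, bytes]]) -> Span:
    """Child span if the caller supplied (trace_id, span_id), else a root span."""
    if parent is None:
        return start_span(name)
    trace_id, parent_span_id = parent
    return Span(trace_id, secrets.token_bytes(8), parent_span_id, name)


root = start_span("aggregation_cycle")
child = start_child_span("handle_request", (root.trace_id, root.span_id))
orphan = start_child_span("handle_request", None)  # no context: becomes a root
```

The child keeps the caller's trace ID and records the caller's span ID as its parent, which is what links the two spans into one tree.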
Span Kinds
| Kind | Value | Use When |
|---|---|---|
| INTERNAL | 1 | Internal operations not triggered by an external request |
| SERVER | 2 | Handling an incoming request from another component |
| CLIENT | 3 | Making an outgoing request to another component |
Ending Spans
Every started span must be ended. There are two ways:
- EndSpan(span, attributes): marks the span as successful (status OK). Pass a map of string key-value attributes to attach metadata, or nil/null for no attributes.
- EndSpanWithError(span, error): marks the span as failed (status ERROR) and records the error type and message as span attributes.
Attributes
Span attributes are arbitrary key-value string pairs that add context to a span. Common conventions:
| Attribute | Example | Description |
|---|---|---|
| nats.subject | cirata.extensions.myext.query | The NATS subject involved |
| db.operation | SELECT | Database operation type |
| items.count | 150 | Number of items processed |
| exception.type | java.lang.NullPointerException | Set automatically on error spans |
| exception.message | value was null | Set automatically on error spans |
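Since attribute values are strings, numeric values need converting before they are attached. The sketch below shows that conversion and one plausible way the two error attributes could be derived from an exception. `error_attributes` is a hypothetical helper, and the exact type-name format each SDK records is an assumption.

```python
from typing import Dict


def error_attributes(err: BaseException) -> Dict[str, str]:
    """Build exception.type and exception.message from an exception instance."""
    return {
        "exception.type": type(err).__name__,
        "exception.message": str(err),
    }


# Attribute values are strings, so the item count is converted explicitly.
attrs = {
    "nats.subject": "cirata.extensions.myext.query",
    "items.count": str(150),
}

try:
    raise ValueError("value was null")
except ValueError as exc:
    attrs.update(error_attributes(exc))
```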
Logs
The log capture system bridges your extension's native logging framework to the OTLP log format. Logs are buffered and drained alongside metrics and traces.
| SDK | Integration | Setup Required |
|---|---|---|
| Go | slog.Handler—call obs.GetLogCapture().Handler() | One-time: create slog.Logger with the handler |
| Python | logging.Handler—attach obs.logs to any logger | One-time: logger.addHandler(obs.logs) |
| Java | Automatic—Logback bridge attaches on enable() | None (Logback on classpath is sufficient) |
| Rust | Direct API—obs.log_capture().lock().unwrap().record() | Manual record() calls |
Go—slog Integration
The Go SDK provides an slog.Handler that captures log output into
the OTLP buffer. Combine it with the stderr JSON handler using
MultiHandler so that logs flow to both destinations:
obs, _ := observability.Enable(ext, svc, "myext")
// Fan out to stderr (for journald) AND the OTLP buffer
slog.SetDefault(slog.New(observability.NewMultiHandler(
    observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
    obs.GetLogCapture().Handler(),
)))
slog.Info("Extension started", "version", "1.0")
slog.Error("Query failed", "error", err.Error())
The LogCapture handler filters at INFO level or higher. Each log record includes the message, severity, any structured attributes, and trace/span IDs from the active context.
Direct recording is also available when you need to bypass slog:
obs.GetLogCapture().Record(9, "INFO", "Processing complete", map[string]string{
"duration_ms": "42",
})
Python—logging.Handler Integration
The Python SDK provides a standard logging.Handler that feeds log
records into the OTLP buffer. Add it to the root logger alongside
configure_logging() (which handles structured JSON to stdout):
import logging
from cirata.symphony.logging import configure_logging
from cirata.symphony.observability import Observability
configure_logging("myext") # Structured JSON to stdout
obs = await Observability.enable(ext, svc, "myext")
# Add OTLP capture as a second handler on the root logger
logging.getLogger().addHandler(obs.logs)
# All log calls now flow to both stdout and the OTLP buffer
logger = logging.getLogger("myext")
logger.info("Extension started")
logger.error("Query failed", exc_info=True)
Do not call logger.setLevel() after configure_logging() — the
level is already set. Exception information is automatically extracted
when exc_info=True is passed, recording exception.type and
exception.message attributes.
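To make the handler mechanics concrete, here is a stand-alone sketch of a logging.Handler that buffers records and extracts exception details. `BufferingHandler` is a hypothetical stand-in for obs.logs; the real handler's internals and output format are not shown in this guide. The sketch sets a level on its own private logger only because it runs without configure_logging().

```python
import logging
from typing import Dict, List


class BufferingHandler(logging.Handler):
    """Hypothetical stand-in for an OTLP log capture handler."""

    def __init__(self, level: int = logging.INFO) -> None:
        super().__init__(level)
        self.records: List[Dict[str, str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        entry = {"severity": record.levelname, "body": record.getMessage()}
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value, _ = record.exc_info
            entry["exception.type"] = exc_type.__name__
            entry["exception.message"] = str(exc_value)
        self.records.append(entry)


logger = logging.getLogger("myext.sketch")
logger.setLevel(logging.DEBUG)   # stand-alone sketch: no configure_logging() here
logger.propagate = False
handler = BufferingHandler()
logger.addHandler(handler)

logger.debug("not captured")     # below the handler's INFO threshold
logger.info("Extension started")
try:
    raise ValueError("city not found")
except ValueError:
    logger.error("Query failed", exc_info=True)
```

After these calls the buffer holds two entries, and only the second carries the exception.type and exception.message keys.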
Java—Automatic SLF4J/Logback Capture
The Java SDK automatically captures standard logging output when
Logback is on the classpath. When Observability.enable() is called,
a reflective bridge attaches to Logback's root logger and forwards all
INFO-level-and-above log events to the observability system—no
manual wiring required.
Observability obs = Observability.enable(runtime, "myext");
// Standard SLF4J logging is captured automatically
private static final Logger log = LoggerFactory.getLogger(MyExtension.class);
log.info("Extension started");
log.warn("Cache miss rate is high");
log.error("Query failed", exception); // stack trace is included
The bridge captures the formatted message, logger name, thread name, and full stack trace (when an exception is attached). Log records are automatically correlated with the active trace context via MDC, so logs emitted inside a span carry the correct trace ID and span ID.
The bridge uses reflection to avoid a compile-time dependency on Logback. If Logback is not on the classpath (e.g., when using a different SLF4J backend), the bridge is a silent no-op and you can use the direct recording API instead.
Direct recording is still available for programmatic use or when Logback is not present:
LogCapture logs = obs.getLogCapture();
logs.info("Extension started");
logs.warn("Cache miss rate is high");
logs.error("Query failed: " + e.getMessage());
// Full control over severity and attributes
logs.record(9, "INFO", "Processing batch", Map.of(
"batch.size", "100",
"duration_ms", "250"
));
Rust—Direct Recording
let obs = Observability::enable(&svc, "myext").await;
let logs = obs.log_capture();
logs.lock().unwrap().record(
Severity::Info,
"Extension started",
HashMap::from([("version".into(), "1.0".into())]),
);
Severity Levels
All SDKs use the standard OTLP severity numbers:
| Level | Number | Text |
|---|---|---|
| Debug | 5 | DEBUG |
| Info | 9 | INFO |
| Warn | 13 | WARN |
| Error | 17 | ERROR |
| Fatal | 21 | FATAL |
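As an illustration of how a native logging level might be translated into these numbers, the sketch below maps Python's standard logging levels onto the table. The `otlp_severity` function and its threshold rule are assumptions for illustration, not the SDK's actual mapping.

```python
import logging
from typing import Tuple

# (threshold, OTLP severity number, OTLP severity text), highest first
_LEVELS = [
    (logging.CRITICAL, 21, "FATAL"),
    (logging.ERROR, 17, "ERROR"),
    (logging.WARNING, 13, "WARN"),
    (logging.INFO, 9, "INFO"),
    (logging.DEBUG, 5, "DEBUG"),
]


def otlp_severity(levelno: int) -> Tuple[int, str]:
    """Map a Python logging level to the (number, text) pair from the table."""
    for threshold, number, text in _LEVELS:
        if levelno >= threshold:
            return number, text
    return 5, "DEBUG"  # anything below DEBUG is treated as DEBUG in this sketch


info = otlp_severity(logging.INFO)
warn = otlp_severity(logging.WARNING)
fatal = otlp_severity(logging.CRITICAL)
```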
Stdout Logging
The OTLP log capture described in the preceding sections is the observability pipeline: it buffers logs for the Observability Extension to poll. Independently, extensions must also write structured JSON to stdout (or to stderr, as the Go examples do) so that journald (or the container runtime) captures well-formed log lines.
| SDK | Stdout setup | OTLP capture |
|---|---|---|
| Python | configure_logging("prefix") | logging.getLogger().addHandler(obs.logs) |
| Java | Automatic (logback-symphony.xml ships in SDK) | Automatic (Logback bridge) |
| Go | NewSymphonyJSONHandler + NewMultiHandler | Included in the MultiHandler |
| Rust | tracing_subscriber::fmt::init() | obs.log_capture().lock().unwrap().record() |
Both paths should be active. The examples above show how to wire them together for each language.
Metrics
The metrics capture provides a simple counter and gauge API. Metrics are buffered and drained on each poll cycle.
Go
metrics := obs.GetMetricsCapture()
// Counters — monotonically increasing values
metrics.IncrementCounter("requests.total", 1)
metrics.IncrementCounter("bytes.processed", int64(len(data)))
// Gauges — point-in-time measurements
metrics.SetGauge("queue.depth", float64(queue.Len()))
metrics.SetGauge("cache.hit_rate", 0.85)
Java
Observability obs = Observability.enable(runtime, "myext");
MetricsCapture metrics = obs.getMetricsCapture();
// Counters — monotonically increasing values
metrics.incrementCounter("requests.total", 1);
metrics.incrementCounter("bytes.processed", 1024);
// Gauges — point-in-time measurements
metrics.setGauge("queue.depth", 42.0);
metrics.setGauge("cache.hit_rate", 0.85);
Python
obs = await Observability.enable(ext, svc, "myext")
# Counters — monotonically increasing values
obs.increment_counter("requests.total")
obs.increment_counter("bytes.processed", 1024)
# Gauges — point-in-time measurements
obs.set_gauge("queue.depth", 42.0)
obs.set_gauge("cache.hit_rate", 0.85)
Rust
let obs = Observability::enable(&svc, "myext").await;
// Counters — monotonically increasing values
obs.increment_counter("requests.total", 1);
obs.increment_counter("bytes.processed", 1024);
// Gauges — point-in-time measurements
obs.set_gauge("queue.depth", 42.0);
obs.set_gauge("cache.hit_rate", 0.85);
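The difference between the two metric types, and the reset on drain, can be modeled without any SDK. `MetricsSketch` below is a hypothetical class; in particular, resetting gauges together with counters on drain is an assumption based on the "Reset on drain" entry in the buffer limits table.

```python
from typing import Dict, Tuple


class MetricsSketch:
    def __init__(self) -> None:
        self.counters: Dict[str, int] = {}
        self.gauges: Dict[str, float] = {}

    def increment_counter(self, name: str, delta: int = 1) -> None:
        # Counters add up every delta seen since the last drain.
        self.counters[name] = self.counters.get(name, 0) + delta

    def set_gauge(self, name: str, value: float) -> None:
        # Gauges keep only the most recent value.
        self.gauges[name] = value

    def drain(self) -> Tuple[Dict[str, int], Dict[str, float]]:
        snapshot = (self.counters, self.gauges)
        self.counters, self.gauges = {}, {}
        return snapshot


m = MetricsSketch()
m.increment_counter("requests.total")
m.increment_counter("requests.total")
m.increment_counter("bytes.processed", 1024)
m.set_gauge("queue.depth", 42.0)
m.set_gauge("queue.depth", 7.0)   # overwrites the earlier reading
counters, gauges = m.drain()
```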
Distributed Trace Context
Cirata Symphony propagates W3C Trace Context across NATS message boundaries using the traceparent header. This connects spans from different components into a single trace tree.
How It Works
Whenever the Symphony Platform calls your extension via NATS—whether proxying an HTTP request, invoking a service directly, or relaying a CLI command—it injects a traceparent header into the NATS message:
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
             │  │                                │                │
             │  │                                │                └─ flags (sampled)
             │  │                                └─ parent span ID (8 bytes hex)
             │  └─ trace ID (16 bytes hex)
             └─ version
Your extension extracts this context and creates a child span, continuing the same trace.
The browser UI is the typical origin of traces. When a user navigates to a page or interacts with the UI, the browser creates a root span and propagates its trace context through HTTP and NATS requests to the platform and extensions.
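The traceparent layout can be parsed and re-emitted with the standard library alone. The sketch below follows the W3C Trace Context rules for version 00 (lowercase hex fields, all-zero IDs rejected). `ParsedContext`, `parse_traceparent`, and `format_traceparent` are illustrative names, not SDK functions.

```python
import re
from typing import NamedTuple, Optional

_TRACEPARENT = re.compile(
    r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})"
)


class ParsedContext(NamedTuple):
    trace_id: bytes   # 16 bytes
    span_id: bytes    # 8 bytes
    flags: int


def parse_traceparent(header: str) -> Optional[ParsedContext]:
    """Return the parsed context, or None if the header is malformed."""
    match = _TRACEPARENT.fullmatch(header.strip())
    if match is None:
        return None
    version, trace_hex, span_hex, flags_hex = match.groups()
    # W3C Trace Context: version ff and all-zero IDs are invalid.
    if version == "ff" or set(trace_hex) == {"0"} or set(span_hex) == {"0"}:
        return None
    return ParsedContext(
        bytes.fromhex(trace_hex), bytes.fromhex(span_hex), int(flags_hex, 16)
    )


def format_traceparent(ctx: ParsedContext) -> str:
    return f"00-{ctx.trace_id.hex()}-{ctx.span_id.hex()}-{ctx.flags:02x}"


header = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
ctx = parse_traceparent(header)
round_trip = format_traceparent(ctx)
```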
Extracting Context from Incoming Requests
Each SDK's StartChildSpan method (start_child_span_from_request in the Rust SDK) automatically extracts the traceparent header from the incoming NATS message. If the header is present, the new span becomes a child of the caller's span. If it is absent, a new root span is created.
Go:
// StartChildSpan extracts traceparent from req automatically
span := obs.StartChildSpan("handle_request", observability.SpanKindServer, req)
You can also extract the context manually:
tc, ok := extension.ExtractTraceContext(req)
if ok {
// tc.TraceID and tc.SpanID are available
}
Python:
span = obs.start_child_span("handle_request", SpanKind.SERVER, msg)
Or manually:
from cirata.symphony.observability import TracePropagator
ctx = TracePropagator.extract(msg)
if ctx is not None:
    # ctx.trace_id and ctx.span_id are available
    pass
Java:
SpanHandle span = obs.startChildSpan("handle_request", SpanKind.SERVER, msg);
Or manually:
TraceContext ctx = TracePropagator.extract(msg);
if (ctx.isValid()) {
// ctx.traceId() and ctx.spanId() are available
}
Injecting Context into Outgoing Requests
When your extension calls another extension, inject trace context so the downstream extension can continue the trace.
Go:
// Using the Symphony client library — context propagation is automatic
// when TraceContext is on the Go context
ctx = shared.WithTraceContext(ctx, tc)
resp, err := client.Extension("other-ext").Invoke(ctx, "method", payload)
Java:
// Manual injection for direct NATS calls
TraceContext ctx = new TraceContext(span.traceId(), span.spanId(), (byte) 0x01);
Headers headers = TracePropagator.inject(ctx);
// Use headers when making NATS request
Python:
from cirata.symphony.observability import TracePropagator, TraceContext
# Reuse the active span's IDs so the downstream span joins the same trace
ctx = TraceContext(span.trace_id, span.span_id)
headers = TracePropagator.inject(ctx)
# Use headers dict when making NATS request
Browser Telemetry
The Symphony UI automatically instruments all browser interactions. Page navigations, HTTP requests, and NATS requests generate spans under service name ui that appear in the collector alongside backend spans. No configuration is required—telemetry is built into the platform UI.
What is Instrumented
| Signal | Description | Automatic |
|---|---|---|
| Page navigations | Root span per page visit (page /dashboard, page /extensions, etc.) | Yes |
| HTTP requests | Span per axios call with method, URL, status code, duration | Yes |
| NATS requests | Span per NATS request with subject and duration | Yes |
| Extension iframe NATS | NATS requests from sandboxed extension UIs | Yes |
| W3C traceparent propagation | All HTTP and NATS calls carry traceparent headers | Yes |
Trace Hierarchy
Browser telemetry creates end-to-end traces that span the browser, the Symphony platform, and your extensions. A single user interaction produces one trace tree.
Each page navigation creates a root span. HTTP and NATS requests made during that page load become child spans. When those requests reach the Symphony platform or an extension, the server-side handler creates its own child span—continuing the same trace.
Service Names
Browser spans use service.name: "ui" and platform spans use service.name: "symphony". Extension spans use service.name: "<extension-prefix>". All appear in the same trace when correlated by trace ID. You can filter by service name in the Observability Extension's trace query interface to isolate browser-side, platform-side, or extension-side activity.
How It Works
The UI generates W3C-compliant trace IDs using crypto.getRandomValues(), injects traceparent headers into HTTP requests (via an axios interceptor) and NATS messages (via traced request helpers), and exports completed spans as JSON to the platform's cirata.symphony.traces NATS endpoint. The platform converts these browser spans to OTLP protobuf format for the Observability Extension to collect and store alongside backend telemetry.
Extension UI Telemetry
Extension UIs running in sandboxed iframes get automatic NATS trace propagation via the BridgeHandler. No extension code changes are needed—the bridge injects traceparent headers on every NATS publish and request originating from extension iframes. This means your extension's UI interactions appear as part of the same trace tree as the parent page navigation.
Go SDK
Package: cirata.com/symphony/extension/observability
Types
type SpanKind int32
const (
SpanKindInternal SpanKind = 1 // Internal operations
SpanKindServer SpanKind = 2 // Handling incoming requests
SpanKindClient SpanKind = 3 // Making outgoing requests
)
Observability
// Enable registers .metrics, .logs, .traces endpoints on the NATS service.
func Enable(ext *extension.Extension, svc micro.Service, prefix string) (*Observability, error)
func (o *Observability) StartSpan(name string, kind SpanKind) *SpanHandle
func (o *Observability) StartChildSpan(name string, kind SpanKind, req micro.Request) *SpanHandle
func (o *Observability) EndSpan(span *SpanHandle, attrs map[string]string)
func (o *Observability) EndSpanWithError(span *SpanHandle, err error)
func (o *Observability) GetLogCapture() *LogCapture
func (o *Observability) GetMetricsCapture() *MetricsCapture
SpanHandle
type SpanHandle struct {
TraceID [16]byte
SpanID [8]byte
ParentSpanID []byte // nil for root spans
Name string
Kind SpanKind
StartTime time.Time
}
LogCapture
func (lc *LogCapture) Handler() slog.Handler
func (lc *LogCapture) Record(severity int, severityText, body string, attrs map[string]string)
func (lc *LogCapture) Drain() []byte
MetricsCapture
func (mc *MetricsCapture) IncrementCounter(name string, delta int64)
func (mc *MetricsCapture) SetGauge(name string, value float64)
func (mc *MetricsCapture) Drain() []byte
Trace Context Utilities
These are in the cirata.com/symphony/shared package:
func NewRootTraceContext() TraceContext
func NewChildTraceContext(parent TraceContext) TraceContext
func FormatTraceparent(ctx TraceContext) string
func ParseTraceparent(header string) (TraceContext, error)
func WithTraceContext(ctx context.Context, tc TraceContext) context.Context
func TraceContextFromContext(ctx context.Context) (TraceContext, bool)
And in the extension package:
func ExtractTraceContext(req micro.Request) (shared.TraceContext, bool)
Full Example
package main

import (
    "log/slog"
    "os"

    "cirata.com/symphony/extension"
    "cirata.com/symphony/extension/observability"
    "github.com/nats-io/nats.go/micro"
)

func main() {
    ext := extension.NewExtension("weather", "weather", "Weather Service", token)
    svc, err := ext.AddService("weather", "1.0")
    if err != nil {
        panic(err)
    }
    obs, err := observability.Enable(ext, svc, "weather")
    if err != nil {
        panic(err)
    }

    // Structured JSON to stderr + OTLP capture via MultiHandler
    slog.SetDefault(slog.New(observability.NewMultiHandler(
        observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
        obs.GetLogCapture().Handler(),
    )))
    slog.Info("Weather extension starting")

    // Register a handler
    grp := svc.AddGroup("cirata.extensions.weather")
    grp.AddEndpoint("forecast", micro.HandlerFunc(func(req micro.Request) {
        span := obs.StartChildSpan("forecast", observability.SpanKindServer, req)
        city := string(req.Data())
        slog.Info("Fetching forecast", "city", city)
        obs.GetMetricsCapture().IncrementCounter("forecast.requests", 1)
        forecast, err := fetchForecast(city)
        if err != nil {
            obs.EndSpanWithError(span, err)
            req.Error("500", "forecast failed", nil)
            return
        }
        obs.EndSpan(span, map[string]string{"city": city})
        req.Respond(forecast)
    }))
    ext.Run()
}
Python SDK
Package: cirata.symphony.observability
Types
class SpanKind(IntEnum):
INTERNAL = 1
SERVER = 2
CLIENT = 3
Observability
class Observability:
@classmethod
async def enable(cls, ext, svc, prefix: str) -> "Observability"
def start_span(self, name: str, kind: SpanKind = SpanKind.INTERNAL) -> SpanHandle
def start_child_span(self, name: str, kind: SpanKind, msg) -> SpanHandle
def end_span(self, span: SpanHandle, attrs: Optional[dict] = None)
def end_span_with_error(self, span: SpanHandle, error: Exception)
SpanHandle
@dataclass
class SpanHandle:
trace_id: bytes # 16 bytes
span_id: bytes # 8 bytes
parent_span_id: Optional[bytes]
name: str
kind: int
start_time_ns: int
LogCapture
Extends logging.Handler:
class LogCapture(logging.Handler):
def __init__(self, prefix: str, level=logging.INFO)
def emit(self, record: logging.LogRecord) # Called automatically by logging
def drain(self) -> bytes
TracePropagator
class TracePropagator:
@staticmethod
def inject(ctx: TraceContext) -> dict # Returns {"traceparent": "00-..."}
@staticmethod
def extract(msg) -> Optional[TraceContext]
@staticmethod
def parse(header: str) -> Optional[TraceContext]
TraceContext
@dataclass(frozen=True)
class TraceContext:
trace_id: bytes # 16 bytes
span_id: bytes # 8 bytes
flags: int = 0x01
@property
def is_valid(self) -> bool
@staticmethod
def new_root() -> TraceContext
def new_child(self) -> TraceContext
Full Example
import asyncio
import logging

from cirata import symphony
from cirata.symphony.observability import Observability, SpanKind
from cirata.symphony.logging import configure_logging

configure_logging("weather")
logger = logging.getLogger("weather")


async def main():
    async with symphony.Extension("weather", "weather") as ext:
        svc = await ext.add_endpoints("weather", "1.0")
        obs = await Observability.enable(ext, svc, "weather")

        # Add OTLP capture alongside stdout (configured by configure_logging)
        logging.getLogger().addHandler(obs.logs)
        logger.info("Weather extension starting")

        async def handle_forecast(msg):
            span = obs.start_child_span("forecast", SpanKind.SERVER, msg)
            try:
                city = msg.data.decode()
                logger.info("Fetching forecast for %s", city)
                forecast = await fetch_forecast(city)
                obs.end_span(span, {"city": city})
                await msg.respond(forecast)
            except Exception as e:
                obs.end_span_with_error(span, e)
                logger.error("Forecast failed", exc_info=True)

        grp = svc.add_group("cirata.extensions.weather")
        await grp.add_endpoint("forecast", handle_forecast)
        await ext.operate()


asyncio.run(main())
Java SDK
Package: com.cirata.symphony.observability
Types
public enum SpanKind {
INTERNAL(1), SERVER(2), CLIENT(3), PRODUCER(4), CONSUMER(5);
public int getValue();
}
Observability
public class Observability {
public static Observability enable(ExtensionRuntime runtime, String prefix);
public SpanHandle startSpan(String name, SpanKind kind);
public SpanHandle startChildSpan(String name, SpanKind kind, ServiceMessage msg);
public void endSpan(SpanHandle span, Map<String, String> attributes);
public void endSpanWithError(SpanHandle span, Throwable error);
public TraceCapture getTraceCapture();
public LogCapture getLogCapture();
public MetricsCapture getMetricsCapture();
}
SpanHandle
public record SpanHandle(
byte[] traceId,
byte[] spanId,
byte[] parentSpanId, // null for root spans
String name,
SpanKind kind,
long startNano
) {}
LogCapture
public class LogCapture {
// Automatic: when Logback is on the classpath, INFO+ log events from all
// SLF4J loggers are captured automatically (including stack traces and
// trace context from MDC). No manual setup required.
// Direct recording API (also works without Logback):
public void record(int severityNumber, String severityText, String body,
Map<String, String> attributes);
public void info(String message);
public void warn(String message);
public void error(String message);
public byte[] drain();
}
MetricsCapture
public class MetricsCapture {
public void incrementCounter(String name, long delta);
public void setGauge(String name, double value);
public byte[] drain();
}
TracePropagator
public final class TracePropagator {
public static Headers inject(TraceContext ctx);
public static TraceContext extract(Headers headers);
public static TraceContext extract(ServiceMessage msg);
}
TraceContext
public record TraceContext(byte[] traceId, byte[] spanId, byte traceFlags) {
public static final TraceContext EMPTY;
public boolean isValid();
}
Full Example
public class WeatherExtension {
    // Standard SLF4J logger: output is captured automatically
    private static final Logger log = LoggerFactory.getLogger(WeatherExtension.class);

    public static void main(String[] args) {
        ExtensionRuntime runtime = ExtensionRuntime.builder()
            .name("weather").prefix("weather").token(token)
            .build();
        Observability obs = Observability.enable(runtime, "weather");
        // Logback bridge is now active; all SLF4J INFO+ logs are captured
        log.info("Weather extension starting");

        // Register endpoint handler
        runtime.addEndpoint("forecast", msg -> {
            SpanHandle span = obs.startChildSpan("forecast", SpanKind.SERVER, msg);
            try {
                String city = new String(msg.getData());
                log.info("Fetching forecast for {}", city);
                byte[] forecast = fetchForecast(city);
                obs.endSpan(span, Map.of("city", city));
                msg.respond(runtime.getUserConnection(), forecast);
            } catch (Exception e) {
                obs.endSpanWithError(span, e);
                log.error("Forecast failed", e); // stack trace included
            }
        });
        runtime.run();
    }
}
Rust SDK
Module: cirata_symphony::observability
Types
pub enum SpanKind {
Internal = 1,
Server = 2,
Client = 3,
}
Observability
impl Observability {
pub fn new(prefix: &str) -> Self;
pub async fn enable(svc: &async_nats::service::Service, prefix: &str) -> Arc<Self>;
pub fn start_span(&self, name: &str, kind: SpanKind) -> SpanHandle;
pub fn start_child_span(&self, name: &str, kind: SpanKind, parent: &TraceContext) -> SpanHandle;
pub fn start_child_span_from_request(&self, name: &str, kind: SpanKind, request: &async_nats::service::Request) -> SpanHandle;
pub fn start_child_span_from_header(&self, name: &str, kind: SpanKind, traceparent: &str) -> SpanHandle;
pub fn end_span(&self, span: SpanHandle, attrs: HashMap<String, String>);
pub fn end_span_with_error(&self, span: SpanHandle, err: &dyn Error);
pub fn ingest_raw(&self, data: Vec<u8>);
pub fn increment_counter(&self, name: &str, delta: i64);
pub fn set_gauge(&self, name: &str, value: f64);
pub fn drain_traces(&self) -> Vec<u8>;
pub fn drain_logs(&self) -> Vec<u8>;
pub fn drain_metrics(&self) -> Vec<u8>;
pub fn log_capture(&self) -> Arc<Mutex<LogCapture>>;
pub fn metrics_capture(&self) -> Arc<Mutex<MetricsCapture>>;
pub fn trace_capture(&self) -> Arc<Mutex<TraceCapture>>;
}
SpanHandle
pub struct SpanHandle {
pub trace_id: [u8; 16],
pub span_id: [u8; 8],
pub parent_span_id: Option<Vec<u8>>,
pub name: String,
pub kind: i32,
pub start_time_ns: u64,
}
TraceContext
pub struct TraceContext {
pub trace_id: [u8; 16],
pub span_id: [u8; 8],
pub flags: u8,
}
impl TraceContext {
pub fn new_root() -> Self;
pub fn new_child(&self) -> Self;
pub fn format(&self) -> String; // "00-{traceId}-{spanId}-{flags}"
pub fn parse(header: &str) -> Result<Self, String>;
}
LogCapture
pub enum Severity { Debug = 5, Info = 9, Warn = 13, Error = 17 }
impl LogCapture {
pub fn new(prefix: &str) -> Self;
pub fn record(&mut self, severity: Severity, body: &str, attrs: HashMap<String, String>);
pub fn drain(&mut self) -> Vec<u8>;
}
Full Example
use cirata_symphony::extension::Extension;
use cirata_symphony::observability::{Observability, SpanKind};
use std::collections::HashMap;
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Structured logging to stdout (captured by journald / container runtime)
    tracing_subscriber::fmt::init();

    let ext = Extension::new("weather", "weather").with_token(token);
    ext.run_with_setup(|ext| Box::pin(async move {
        let svc = ext.add_endpoints("weather", "1.0", "Weather Service", None).await?;
        let obs = Observability::enable(&svc, "weather").await;

        // OTLP log capture (in addition to stdout via tracing)
        {
            // Bind the Arc first so it outlives the MutexGuard borrowed from it
            let log_capture = obs.log_capture();
            let mut logs = log_capture.lock().unwrap();
            logs.record(
                cirata_symphony::observability::logcapture::Severity::Info,
                "Weather extension starting",
                HashMap::new(),
            );
        }

        // In a handler:
        let span = obs.start_span("forecast", SpanKind::Server);
        info!("Fetching forecast");
        // ... fetch forecast ...
        obs.increment_counter("forecast.requests", 1);
        obs.end_span(span, HashMap::from([
            ("city".into(), "London".into()),
        ]));
        Ok(())
    })).await?;
    Ok(())
}
Serialization Format
All four SDKs serialize telemetry as OTLP protobuf (ExportTraceServiceRequest, ExportLogsServiceRequest, ExportMetricsServiceRequest). This ensures consistent, compact payloads and eliminates format conversion overhead in the Observability Extension.
| SDK | Serialization | Dependency |
|---|---|---|
| Go | OTLP protobuf | go.opentelemetry.io/proto/otlp |
| Python | OTLP protobuf | opentelemetry-proto |
| Java | OTLP protobuf | io.opentelemetry.proto:opentelemetry-proto |
| Rust | OTLP protobuf | opentelemetry-proto + prost crates |
The Python SDK gracefully degrades to empty responses if opentelemetry-proto is not installed. For all other SDKs, the protobuf dependency is required.
Buffer Limits and Data Lifecycle
Each telemetry signal has a fixed-size in-memory buffer. When the buffer is full, the oldest entries are evicted:
| Signal | Buffer Size | Eviction |
|---|---|---|
| Traces (spans) | 500 | Oldest span dropped |
| Logs | 1,000 | Oldest log record dropped |
| Metrics (counters/gauges) | Unbounded map | Reset on drain |
The buffer is drained completely each time the Symphony Observability Extension polls your endpoint. With the default 30-second poll interval, your extension should not exceed these limits under normal operation. If you generate high volumes of telemetry, consider:
- Reducing span granularity (instrument significant operations, not every function call)
- Filtering log severity (INFO or higher is the default)
- Using fewer distinct metric names
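The oldest-first eviction can be demonstrated with a bounded deque from the Python standard library. This is a model of the described behavior, not the SDK's buffer implementation. With 500 slots and a 30-second poll interval, the span buffer holds at most about 16 spans per second of sustained output before entries start to drop.

```python
from collections import deque

MAX_SPANS = 500  # span buffer size from the table above

spans = deque(maxlen=MAX_SPANS)
for i in range(MAX_SPANS + 20):   # record 520 spans without a poll in between
    spans.append(f"span-{i}")     # appending to a full deque drops the oldest

oldest_kept = spans[0]
newest_kept = spans[-1]
```

In this run the first 20 spans are gone by the time a poll would arrive; only the most recent 500 remain.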
Best Practices
Span Naming
Use short, descriptive names that identify the operation, not the specific input:
| Good | Bad |
|---|---|
| handle_query | handle query for user 12345 |
| poll cirata.extensions.myext.metrics | poll |
| db.insert_batch | insert 42 records into spans table |
Put variable data in span attributes, not the span name. This keeps trace views readable and enables grouping.
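A short sketch shows why stable names matter for grouping. The sample spans are made-up data; with the user ID kept in an attribute, both handle_query spans fall into one group and can be summarized together.

```python
from collections import defaultdict
from statistics import mean

# (span name, duration in ms, attributes): hypothetical sample data
spans = [
    ("handle_query", 12.0, {"user.id": "12345"}),
    ("handle_query", 18.0, {"user.id": "67890"}),
    ("db.insert_batch", 40.0, {"items.count": "42"}),
]

durations = defaultdict(list)
for name, duration_ms, _attrs in spans:
    durations[name].append(duration_ms)

# name -> (span count, mean duration in ms)
summary = {name: (len(values), mean(values)) for name, values in durations.items()}
```

Had the user ID been embedded in the name, each span would form its own group of one and the per-operation average would be meaningless.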
Span Granularity
Instrument operations that are meaningful for understanding performance and behavior:
- Request handlers (SERVER spans)
- Outgoing calls to other services (CLIENT spans)
- Significant internal operations like batch processing, cache operations, or scheduled tasks (INTERNAL spans)
Avoid instrumenting trivial operations like field access, simple calculations, or logging calls.
Error Handling
Always end spans, even on error paths. Use defer (Go), try/finally (Python/Java), or scope guards (Rust) to ensure spans are closed:
span := obs.StartSpan("operation", observability.SpanKindInternal)
defer func() {
    if err != nil {
        obs.EndSpanWithError(span, err)
    } else {
        obs.EndSpan(span, nil)
    }
}()
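In Python, the same guarantee can be packaged as a context manager. The sketch below is one possible pattern, not an SDK feature: `traced` is a hypothetical helper, and `RecordingObs` is a stand-in that borrows the start_span, end_span, and end_span_with_error method names from the Python SDK reference so the example runs on its own.

```python
from contextlib import contextmanager


class RecordingObs:
    """Hypothetical stand-in that records how each span was ended."""

    def __init__(self):
        self.ended = []

    def start_span(self, name):
        return name  # a real SDK returns a span handle

    def end_span(self, span, attrs=None):
        self.ended.append((span, "OK"))

    def end_span_with_error(self, span, error):
        self.ended.append((span, "ERROR"))


@contextmanager
def traced(obs, name):
    span = obs.start_span(name)
    try:
        yield span
    except Exception as exc:
        obs.end_span_with_error(span, exc)
        raise  # re-raise so callers still see the failure
    else:
        obs.end_span(span)


obs = RecordingObs()
with traced(obs, "operation"):
    pass

try:
    with traced(obs, "failing_operation"):
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Every exit path from the with block ends the span exactly once, so handlers cannot forget the error branch.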
Log Levels
- INFO—State changes, startup/shutdown, periodic summaries
- WARN—Recoverable issues, degraded performance, approaching limits
- ERROR—Failed operations, unrecoverable errors
Avoid logging at DEBUG level through the observability capture—it generates high volume with limited diagnostic value in production.