
Extension Observability

Cirata Symphony provides built-in observability for extensions through three telemetry signals: metrics, logs, and traces. When an extension enables observability, its telemetry is automatically collected by the Symphony Observability Extension and made available for querying, visualization, and export to external backends.

This guide covers how to instrument your extension to produce telemetry, how that telemetry flows through Symphony, and the SDK-specific APIs available in each language.



How It Works

  1. The browser UI automatically generates spans for page navigations, HTTP requests, and NATS requests, sending them to the platform via NATS
  2. Your extension uses the observability SDK to record spans, logs, and metrics
  3. The SDK buffers telemetry in memory and registers three NATS endpoints: .metrics, .logs, .traces
  4. The Observability Extension periodically discovers your extension and polls these endpoints
  5. Collected data is stored in DuckDB and available via query endpoints, the Symphony UI, and OTLP export

No direct network connection to the Symphony Observability Extension is needed. All communication happens through NATS request-reply.

Quick Start

The fastest path to full observability is a single Enable call during extension startup. This registers the three telemetry endpoints and gives you an API for recording spans, logs, and metrics.

Go

```go
ext := extension.NewExtension("myext", "myext", "My Extension", token)
svc, _ := ext.AddService("myext", "1.0")

obs, _ := observability.Enable(ext, svc, "myext")
// .metrics, .logs, .traces endpoints are now registered

// In a handler:
func handleQuery(req micro.Request) {
	span := obs.StartChildSpan("handle_query", observability.SpanKindServer, req)
	defer obs.EndSpan(span, nil)
	// ... handler logic ...
}
```

Python

```python
# Inside an async function:
ext = Extension("myext", "myext", token)
async with ext:
    svc = await ext.add_endpoints("myext", "1.0")
    obs = await Observability.enable(ext, svc, "myext")

    async def handle_query(msg):
        span = obs.start_child_span("handle_query", SpanKind.SERVER, msg)
        try:
            # ... handler logic ...
            obs.end_span(span)
        except Exception as e:
            obs.end_span_with_error(span, e)
```

Java

```java
ExtensionRuntime runtime = ExtensionRuntime.builder()...build();
Observability obs = Observability.enable(runtime, "myext");

void handleQuery(ServiceMessage msg) {
    SpanHandle span = obs.startChildSpan("handle_query", SpanKind.SERVER, msg);
    try {
        // ... handler logic ...
        obs.endSpan(span, Map.of());
    } catch (Exception e) {
        obs.endSpanWithError(span, e);
    }
}
```

Rust

```rust
let ext = Extension::new("myext", "myext").with_token(token);

ext.run_with_setup(|ext| Box::pin(async move {
    let svc = ext.add_endpoints("myext", "1.0", "My Extension", None).await?;
    let obs = Observability::enable(&svc, "myext").await;
    // .metrics, .logs, .traces endpoints are now registered

    // In a handler:
    let span = obs.start_span("handle_query", SpanKind::Server);
    // ... handler logic ...
    obs.end_span(span, HashMap::new());
    Ok(())
})).await?;
```

Telemetry Endpoints

When observability is enabled, three NATS service endpoints are registered under your extension's prefix:

| Endpoint | Subject | Description |
|---|---|---|
| metrics | cirata.extensions.<prefix>.metrics | Returns buffered metrics as OTLP protobuf |
| logs | cirata.extensions.<prefix>.logs | Returns buffered log records as OTLP protobuf |
| traces | cirata.extensions.<prefix>.traces | Returns buffered spans as OTLP protobuf |

The Observability Extension discovers these endpoints using NATS service discovery and polls them on a configurable interval (default: 30 seconds). Each poll drains the buffer—the response contains all telemetry accumulated since the last poll.

If the Symphony Observability Extension has not yet discovered your extension, telemetry is still buffered locally (up to the buffer limits) and is collected on the first poll after discovery.
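The following minimal Python sketch shows the drain semantics. The buffer returns everything accumulated since the previous poll and then starts empty. This is an illustration under assumptions, not the SDK's code: the DrainBuffer class is hypothetical, and a real endpoint would encode the drained items as OTLP protobuf before replying.

```python
import threading
from typing import Any, List


class DrainBuffer:
    """Hypothetical in-memory telemetry buffer with drain-on-poll semantics."""

    def __init__(self) -> None:
        self._items: List[Any] = []
        # Handlers append while the poll reply drains, so guard both with a lock.
        self._lock = threading.Lock()

    def append(self, item: Any) -> None:
        with self._lock:
            self._items.append(item)

    def drain(self) -> List[Any]:
        # Swap the contents out so each poll sees only data recorded
        # since the previous poll.
        with self._lock:
            drained, self._items = self._items, []
        return drained


buf = DrainBuffer()
buf.append("span-1")
buf.append("span-2")
first_poll = buf.drain()   # both spans
second_poll = buf.drain()  # empty: nothing new since the last poll
```

Swapping the list under the lock keeps the critical section short. Serialization of the drained batch can then happen without blocking handlers that are still recording telemetry.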


Traces

Traces capture the timing and causal relationships of operations in your extension. Each trace is a tree of spans, where each span represents a unit of work.

Creating Spans

Root spans start a new trace with a fresh trace ID:

```go
span := obs.StartSpan("aggregation_cycle", observability.SpanKindInternal)
// ... do work ...
obs.EndSpan(span, map[string]string{"items.count": "42"})
```

Child spans continue an existing trace, typically from an incoming NATS request that carries a traceparent header:

```go
span := obs.StartChildSpan("handle_request", observability.SpanKindServer, req)
// ... do work ...
obs.EndSpan(span, nil)
```

If the incoming message has no trace context, StartChildSpan falls back to creating a root span.
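The root-versus-child decision can be sketched as follows. This is a simplified, hypothetical model: SketchSpan and start_child_or_root are illustrative names, not SDK API. It assumes the incoming trace ID and parent span ID have already been extracted from the traceparent header. A child inherits the trace ID and records its parent, while the fallback generates a fresh 16-byte trace ID.

```python
import secrets
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchSpan:
    name: str
    trace_id: bytes                  # 16 bytes, shared by every span in one trace
    span_id: bytes                   # 8 bytes, unique to this span
    parent_span_id: Optional[bytes]  # None marks a root span


def start_child_or_root(name: str,
                        parent_trace_id: Optional[bytes] = None,
                        parent_span_id: Optional[bytes] = None) -> SketchSpan:
    """Continue the caller's trace if context was supplied, else start a new trace."""
    span_id = secrets.token_bytes(8)
    if parent_trace_id is not None and parent_span_id is not None:
        # Child span: same trace ID, caller's span recorded as the parent.
        return SketchSpan(name, parent_trace_id, span_id, parent_span_id)
    # Fallback: no incoming context, so this span is the root of a fresh trace.
    return SketchSpan(name, secrets.token_bytes(16), span_id, None)


root = start_child_or_root("aggregation_cycle")
child = start_child_or_root("handle_request", root.trace_id, root.span_id)
```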

Span Kinds

| Kind | Value | Use When |
|---|---|---|
| INTERNAL | 1 | Internal operations not triggered by an external request |
| SERVER | 2 | Handling an incoming request from another component |
| CLIENT | 3 | Making an outgoing request to another component |

Ending Spans

Every started span must be ended. There are two ways:

  • EndSpan(span, attributes): marks the span as successful (status OK). Pass a map of string key-value attributes to attach metadata, or nil (Go), null (Java), or None (Python) for no attributes; the Rust SDK takes an empty HashMap.
  • EndSpanWithError(span, error): marks the span as failed (status ERROR) and records the error type and message as the span attributes exception.type and exception.message.

Attributes

Span attributes are arbitrary key-value string pairs that add context to a span. Common conventions:

| Attribute | Example | Description |
|---|---|---|
| nats.subject | cirata.extensions.myext.query | The NATS subject involved |
| db.operation | SELECT | Database operation type |
| items.count | 150 | Number of items processed |
| exception.type | java.lang.NullPointerException | Set automatically on error spans |
| exception.message | value was null | Set automatically on error spans |

Logs

The log capture system bridges your extension's native logging framework to the OTLP log format. Logs are buffered and drained alongside metrics and traces.

| SDK | Integration | Setup Required |
|---|---|---|
| Go | slog.Handler: call obs.GetLogCapture().Handler() | One-time: create slog.Logger with the handler |
| Python | logging.Handler: attach obs.logs to any logger | One-time: logger.addHandler(obs.logs) |
| Java | Automatic: Logback bridge attaches on enable() | None (Logback on classpath is sufficient) |
| Rust | Direct API: obs.log_capture().lock().unwrap().record() | Manual record() calls |

Go—slog Integration

The Go SDK provides an slog.Handler that captures log output into the OTLP buffer. Combine it with the stderr JSON handler using MultiHandler so that logs flow to both destinations:

```go
obs, _ := observability.Enable(ext, svc, "myext")

// Fan out to stderr (for journald) AND the OTLP buffer
slog.SetDefault(slog.New(observability.NewMultiHandler(
	observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
	obs.GetLogCapture().Handler(),
)))

slog.Info("Extension started", "version", "1.0")
slog.Error("Query failed", "error", err.Error())
```

The LogCapture handler filters at INFO level or higher. Each log record includes the message, severity, any structured attributes, and trace/span IDs from the active context.

Direct recording is also available when you need to bypass slog:

```go
obs.GetLogCapture().Record(9, "INFO", "Processing complete", map[string]string{
	"duration_ms": "42",
})
```

Python—logging.Handler Integration

The Python SDK provides a standard logging.Handler that feeds log records into the OTLP buffer. Add it to the root logger alongside configure_logging() (which handles structured JSON to stdout):

```python
import logging
from cirata.symphony.logging import configure_logging
from cirata.symphony.observability import Observability

configure_logging("myext")  # Structured JSON to stdout

# Inside your async setup code, once ext and svc exist:
obs = await Observability.enable(ext, svc, "myext")

# Add OTLP capture as a second handler on the root logger
logging.getLogger().addHandler(obs.logs)

# All log calls now flow to both stdout and the OTLP buffer
logger = logging.getLogger("myext")
logger.info("Extension started")
logger.error("Query failed", exc_info=True)
```

Do not call logger.setLevel() after configure_logging(); the level is already set. When exc_info=True is passed, the handler extracts the exception and records it as the exception.type and exception.message attributes.
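As a rough illustration of what a logging.Handler-based capture does, the sketch below keeps records at INFO and above and pulls the exception type and message out of exc_info. SketchLogCapture is a hypothetical stand-in, not the SDK's LogCapture, and it stores plain dicts instead of OTLP records. The logger setup at the bottom is for a standalone demo only. In an extension, configure_logging() already sets the level.

```python
import logging
from typing import Dict, List


class SketchLogCapture(logging.Handler):
    """Hypothetical stand-in for an OTLP log-capture handler (records kept as dicts)."""

    def __init__(self, level: int = logging.INFO) -> None:
        # The logging module skips this handler for records below `level`.
        super().__init__(level=level)
        self.records: List[Dict[str, str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        entry = {
            "severity_text": record.levelname,
            "body": record.getMessage(),
            "logger": record.name,
        }
        # exc_info=True stores (type, value, traceback) on the record.
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value, _ = record.exc_info
            entry["exception.type"] = exc_type.__name__
            entry["exception.message"] = str(exc_value)
        self.records.append(entry)


# Standalone demo wiring (configure_logging() handles levels in a real extension).
capture = SketchLogCapture()
demo_logger = logging.getLogger("sketch.demo")
demo_logger.addHandler(capture)
demo_logger.setLevel(logging.DEBUG)  # let the handler's INFO threshold do the filtering
demo_logger.propagate = False

demo_logger.debug("dropped: below INFO")
demo_logger.info("Extension started")
try:
    raise ValueError("value was null")
except ValueError:
    demo_logger.error("Query failed", exc_info=True)
```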

Java—Automatic SLF4J/Logback Capture

The Java SDK automatically captures standard logging output when Logback is on the classpath. When Observability.enable() is called, a reflective bridge attaches to Logback's root logger and forwards all INFO-level-and-above log events to the observability system—no manual wiring required.

```java
// Standard SLF4J logging is captured automatically
private static final Logger log = LoggerFactory.getLogger(MyExtension.class);

// During startup:
Observability obs = Observability.enable(runtime, "myext");

log.info("Extension started");
log.warn("Cache miss rate is high");
log.error("Query failed", exception); // stack trace is included
```

The bridge captures the formatted message, logger name, thread name, and full stack trace (when an exception is attached). Log records are automatically correlated with the active trace context via MDC, so logs emitted inside a span carry the correct trace ID and span ID.

Note: The bridge uses reflection to avoid a compile-time dependency on Logback. If Logback is not on the classpath (e.g., when using a different SLF4J backend), the bridge is a silent no-op and you can use the direct recording API instead.

Direct recording is still available for programmatic use or when Logback is not present:

```java
LogCapture logs = obs.getLogCapture();
logs.info("Extension started");
logs.warn("Cache miss rate is high");
logs.error("Query failed: " + e.getMessage()); // e: an exception caught by the caller

// Full control over severity and attributes
logs.record(9, "INFO", "Processing batch", Map.of(
        "batch.size", "100",
        "duration_ms", "250"
));
```

Rust—Direct Recording

```rust
let obs = Observability::enable(&svc, "myext").await;
let logs = obs.log_capture();

logs.lock().unwrap().record(
    Severity::Info,
    "Extension started",
    HashMap::from([("version".into(), "1.0".into())]),
);
```

Severity Levels

All SDKs use the standard OTLP severity numbers:

| Level | Number | Text |
|---|---|---|
| Debug | 5 | DEBUG |
| Info | 9 | INFO |
| Warn | 13 | WARN |
| Error | 17 | ERROR |
| Fatal | 21 | FATAL |
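A Python capture has to translate logging levels into these numbers. The mapping below is an assumption that follows the table. It takes the lowest number of each OTLP severity range, and custom levels fall into the nearest standard level below them.

```python
import logging
from typing import Tuple

# Assumed mapping: Python level threshold -> (OTLP severity number, text).
# Ordered from most to least severe so the first match wins.
OTLP_SEVERITY = [
    (logging.CRITICAL, 21, "FATAL"),
    (logging.ERROR, 17, "ERROR"),
    (logging.WARNING, 13, "WARN"),
    (logging.INFO, 9, "INFO"),
    (logging.DEBUG, 5, "DEBUG"),
]


def to_otlp_severity(levelno: int) -> Tuple[int, str]:
    """Return (severity_number, severity_text) for a Python logging level."""
    for threshold, number, text in OTLP_SEVERITY:
        if levelno >= threshold:
            return number, text
    return 1, "TRACE"  # below DEBUG: the OTLP TRACE range starts at 1
```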

Stdout Logging

The OTLP log capture described in the preceding sections is the observability pipeline — it buffers logs for the Observability Extension to poll. Independently, extensions must also configure their stdout output to use structured JSON so that journald (or the container runtime) captures well-formed log lines.

| SDK | Stdout setup | OTLP capture |
|---|---|---|
| Python | configure_logging("prefix") | logging.getLogger().addHandler(obs.logs) |
| Java | Automatic (logback-symphony.xml ships in SDK) | Automatic (Logback bridge) |
| Go | NewSymphonyJSONHandler + NewMultiHandler | Included in the MultiHandler |
| Rust | tracing_subscriber::fmt::init() | obs.log_capture().lock().unwrap().record() |

Both paths should be active. The examples above show how to wire them together for each language.


Metrics

The metrics capture provides a simple counter and gauge API. Metrics are buffered and drained on each poll cycle.

Go

```go
metrics := obs.GetMetricsCapture()

// Counters: monotonically increasing values
metrics.IncrementCounter("requests.total", 1)
metrics.IncrementCounter("bytes.processed", int64(len(data)))

// Gauges: point-in-time measurements
metrics.SetGauge("queue.depth", float64(queue.Len()))
metrics.SetGauge("cache.hit_rate", 0.85)
```

Java

```java
Observability obs = Observability.enable(runtime, "myext");
MetricsCapture metrics = obs.getMetricsCapture();

// Counters: monotonically increasing values
metrics.incrementCounter("requests.total", 1);
metrics.incrementCounter("bytes.processed", 1024);

// Gauges: point-in-time measurements
metrics.setGauge("queue.depth", 42.0);
metrics.setGauge("cache.hit_rate", 0.85);
```

Python

```python
obs = await Observability.enable(ext, svc, "myext")

# Counters: monotonically increasing values
obs.increment_counter("requests.total")
obs.increment_counter("bytes.processed", 1024)

# Gauges: point-in-time measurements
obs.set_gauge("queue.depth", 42.0)
obs.set_gauge("cache.hit_rate", 0.85)
```

Rust

```rust
let obs = Observability::enable(&svc, "myext").await;

// Counters: monotonically increasing values
obs.increment_counter("requests.total", 1);
obs.increment_counter("bytes.processed", 1024);

// Gauges: point-in-time measurements
obs.set_gauge("queue.depth", 42.0);
obs.set_gauge("cache.hit_rate", 0.85);
```
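Across the SDKs, the counter and gauge semantics can be modeled with two dictionaries. This is a hypothetical sketch (SketchMetrics is not an SDK class). Counters accumulate, gauges keep only the last value, and drain() returns both and resets them, matching the "Reset on drain" behavior listed under Buffer Limits. Because of that reset, each drained counter value in this model is the increase since the previous poll.

```python
import threading
from typing import Dict, Tuple


class SketchMetrics:
    """Hypothetical counter/gauge store: counters add up, gauges overwrite, drain resets."""

    def __init__(self) -> None:
        self._counters: Dict[str, int] = {}
        self._gauges: Dict[str, float] = {}
        self._lock = threading.Lock()

    def increment_counter(self, name: str, delta: int = 1) -> None:
        with self._lock:
            self._counters[name] = self._counters.get(name, 0) + delta

    def set_gauge(self, name: str, value: float) -> None:
        with self._lock:
            self._gauges[name] = value  # last write wins: a point-in-time reading

    def drain(self) -> Tuple[Dict[str, int], Dict[str, float]]:
        with self._lock:
            snapshot = (self._counters, self._gauges)
            self._counters, self._gauges = {}, {}
        return snapshot


m = SketchMetrics()
m.increment_counter("requests.total")
m.increment_counter("requests.total")
m.increment_counter("bytes.processed", 1024)
m.set_gauge("queue.depth", 42.0)
m.set_gauge("queue.depth", 7.0)
counters, gauges = m.drain()
```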

Distributed Trace Context

Cirata Symphony propagates W3C Trace Context across NATS message boundaries using the traceparent header. This connects spans from different components into a single trace tree.

How It Works

Whenever the Symphony Platform calls your extension via NATS—whether proxying an HTTP request, invoking a service directly, or relaying a CLI command—it injects a traceparent header into the NATS message:

```text
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
             │  │                                │                │
             │  │                                │                └─ flags (01 = sampled)
             │  │                                └─ parent span ID (8 bytes, 16 hex digits)
             │  └─ trace ID (16 bytes, 32 hex digits)
             └─ version
```

Your extension extracts this context and creates a child span, continuing the same trace.

The browser UI is the typical origin of traces. When a user navigates to a page or interacts with the UI, the browser creates a root span and propagates its trace context through HTTP and NATS requests to the platform and extensions.
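Parsing and re-formatting a traceparent header needs only a regular expression and hex conversion. The sketch below is a simplified reading of W3C Trace Context version 00, and its names are hypothetical rather than SDK API. It rejects malformed headers, the reserved version ff, and all-zero IDs. It does not handle extra fields that future versions may append.

```python
import re
from dataclasses import dataclass
from typing import Optional

# version-traceid-parentid-flags, lowercase hex only (W3C Trace Context, version 00)
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


@dataclass(frozen=True)
class ParsedTraceparent:
    trace_id: bytes  # 16 bytes, identifies the whole trace
    span_id: bytes   # 8 bytes, the caller's span (becomes the new span's parent)
    flags: int       # bit 0x01 = sampled


def parse_traceparent(header: str) -> Optional[ParsedTraceparent]:
    """Return the parsed context, or None if the header is unusable."""
    m = _TRACEPARENT.fullmatch(header.strip())
    if m is None or m.group(1) == "ff":
        return None  # malformed, or the reserved invalid version
    trace_id = bytes.fromhex(m.group(2))
    span_id = bytes.fromhex(m.group(3))
    if trace_id == bytes(16) or span_id == bytes(8):
        return None  # all-zero IDs are invalid
    return ParsedTraceparent(trace_id, span_id, int(m.group(4), 16))


def format_traceparent(ctx: ParsedTraceparent) -> str:
    """Serialize back to the version-00 header form."""
    return f"00-{ctx.trace_id.hex()}-{ctx.span_id.hex()}-{ctx.flags:02x}"


ctx = parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
```

Returning None rather than raising mirrors the fallback described below. An unusable header simply means the handler starts a new root span.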

Extracting Context from Incoming Requests

Each SDK's child-span method (StartChildSpan in Go, start_child_span in Python, startChildSpan in Java, start_child_span_from_request in Rust) automatically extracts the traceparent header from the incoming NATS message. If the header is present, the new span becomes a child of the caller's span; if it is absent, a new root span is created.

Go:

```go
// StartChildSpan extracts traceparent from req automatically
span := obs.StartChildSpan("handle_request", observability.SpanKindServer, req)
```

You can also extract the context manually:

```go
tc, ok := extension.ExtractTraceContext(req)
if ok {
	// tc.TraceID and tc.SpanID are available
}
```

Python:

```python
span = obs.start_child_span("handle_request", SpanKind.SERVER, msg)
```

Or manually:

```python
from cirata.symphony.observability import TracePropagator

ctx = TracePropagator.extract(msg)
if ctx is not None:
    # ctx.trace_id and ctx.span_id (raw bytes) are available here
    trace_id_hex = ctx.trace_id.hex()
```

Java:

```java
SpanHandle span = obs.startChildSpan("handle_request", SpanKind.SERVER, msg);
```

Or manually:

```java
TraceContext ctx = TracePropagator.extract(msg);
if (ctx.isValid()) {
    // ctx.traceId() and ctx.spanId() are available
}
```

Injecting Context into Outgoing Requests

When your extension calls another extension, inject trace context so the downstream extension can continue the trace.

Go:

```go
// Using the Symphony client library: context propagation is automatic
// when a TraceContext is attached to the Go context.
// ctx is an existing context.Context; tc is the current TraceContext.
ctx = shared.WithTraceContext(ctx, tc)
resp, err := client.Extension("other-ext").Invoke(ctx, "method", payload)
```

Java:

```java
// Manual injection for direct NATS calls
TraceContext ctx = new TraceContext(span.traceId(), span.spanId(), (byte) 0x01);
Headers headers = TracePropagator.inject(ctx);
// Use headers when making NATS request
```

Python:

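```python
from cirata.symphony.observability import TracePropagator, TraceContext

# Use the current span's IDs so the downstream extension continues this trace.
# (TraceContext.new_root() would start a new, unrelated trace instead.)
ctx = TraceContext(trace_id=span.trace_id, span_id=span.span_id)
headers = TracePropagator.inject(ctx)
# Use headers dict when making NATS request
```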
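Whatever helper builds the header, the outgoing traceparent must keep the current trace ID and carry a span ID from the current trace. The downstream extension's span then joins the same tree instead of starting a new one. The following minimal sketch uses hypothetical names. The returned dict is the kind of value nats-py's Client.request accepts through its headers argument.

```python
import secrets
from typing import Dict


def outgoing_headers(trace_id: bytes, span_id: bytes, sampled: bool = True) -> Dict[str, str]:
    """Build headers that let the downstream handler continue this trace."""
    flags = "01" if sampled else "00"
    return {"traceparent": f"00-{trace_id.hex()}-{span_id.hex()}-{flags}"}


# Hypothetical values standing in for the current span's IDs.
current_trace_id = bytes.fromhex("0af7651916cd43dd8448eb211c80319c")
client_span_id = secrets.token_bytes(8)  # e.g. a CLIENT span opened for this call

headers = outgoing_headers(current_trace_id, client_span_id)
# The downstream span records client_span_id as its parent, within the same trace.
```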

Browser Telemetry

The Symphony UI automatically instruments all browser interactions. Page navigations, HTTP requests, and NATS requests generate spans under service name ui that appear in the collector alongside backend spans. No configuration is required—telemetry is built into the platform UI.

What is Instrumented

| Signal | Description | Automatic |
|---|---|---|
| Page navigations | Root span per page visit (page /dashboard, page /extensions, etc.) | Yes |
| HTTP requests | Span per axios call with method, URL, status code, duration | Yes |
| NATS requests | Span per NATS request with subject and duration | Yes |
| Extension iframe NATS | NATS requests from sandboxed extension UIs | Yes |
| W3C traceparent propagation | All HTTP and NATS calls carry traceparent headers | Yes |

Trace Hierarchy

Browser telemetry creates end-to-end traces that span the browser, the Symphony platform, and your extensions, so a single user interaction produces one connected trace tree.

Each page navigation creates a root span. HTTP and NATS requests made during that page load become child spans. When those requests reach the Symphony platform or an extension, the server-side handler creates its own child span—continuing the same trace.

Service Names

Browser spans use service.name: "ui" and platform spans use service.name: "symphony". Extension spans use service.name: "<extension-prefix>". All appear in the same trace when correlated by trace ID. You can filter by service name in the Observability Extension's trace query interface to isolate browser-side, platform-side, or extension-side activity.

How It Works

The UI generates W3C-compliant trace IDs using crypto.getRandomValues(), injects traceparent headers into HTTP requests (via an axios interceptor) and NATS messages (via traced request helpers), and exports completed spans as JSON to the platform's cirata.symphony.traces NATS endpoint. The platform converts these browser spans to OTLP protobuf format for the Observability Extension to collect and store alongside backend telemetry.

Extension UI Telemetry

Extension UIs running in sandboxed iframes get automatic NATS trace propagation via the BridgeHandler. No extension code changes are needed—the bridge injects traceparent headers on every NATS publish and request originating from extension iframes. This means your extension's UI interactions appear as part of the same trace tree as the parent page navigation.


Go SDK

Package: cirata.com/symphony/extension/observability

Types

```go
type SpanKind int32

const (
	SpanKindInternal SpanKind = 1 // Internal operations
	SpanKindServer   SpanKind = 2 // Handling incoming requests
	SpanKindClient   SpanKind = 3 // Making outgoing requests
)
```

Observability

```go
// Enable registers .metrics, .logs, .traces endpoints on the NATS service.
func Enable(ext *extension.Extension, svc micro.Service, prefix string) (*Observability, error)

func (o *Observability) StartSpan(name string, kind SpanKind) *SpanHandle
func (o *Observability) StartChildSpan(name string, kind SpanKind, req micro.Request) *SpanHandle
func (o *Observability) EndSpan(span *SpanHandle, attrs map[string]string)
func (o *Observability) EndSpanWithError(span *SpanHandle, err error)
func (o *Observability) GetLogCapture() *LogCapture
func (o *Observability) GetMetricsCapture() *MetricsCapture
```

SpanHandle

```go
type SpanHandle struct {
	TraceID      [16]byte
	SpanID       [8]byte
	ParentSpanID []byte // nil for root spans
	Name         string
	Kind         SpanKind
	StartTime    time.Time
}
```

LogCapture

```go
func (lc *LogCapture) Handler() slog.Handler
func (lc *LogCapture) Record(severity int, severityText, body string, attrs map[string]string)
func (lc *LogCapture) Drain() []byte
```

MetricsCapture

```go
func (mc *MetricsCapture) IncrementCounter(name string, delta int64)
func (mc *MetricsCapture) SetGauge(name string, value float64)
func (mc *MetricsCapture) Drain() []byte
```

Trace Context Utilities

These are in the cirata.com/symphony/shared package:

```go
func NewRootTraceContext() TraceContext
func NewChildTraceContext(parent TraceContext) TraceContext
func FormatTraceparent(ctx TraceContext) string
func ParseTraceparent(header string) (TraceContext, error)
func WithTraceContext(ctx context.Context, tc TraceContext) context.Context
func TraceContextFromContext(ctx context.Context) (TraceContext, bool)
```

And in the extension package:

```go
func ExtractTraceContext(req micro.Request) (shared.TraceContext, bool)
```

Full Example

```go
package main

import (
	"log/slog"
	"os"

	"cirata.com/symphony/extension"
	"cirata.com/symphony/extension/observability"
	"github.com/nats-io/nats.go/micro"
)

// token and fetchForecast are assumed to be defined elsewhere in your extension.

func main() {
	ext := extension.NewExtension("weather", "weather", "Weather Service", token)

	svc, err := ext.AddService("weather", "1.0")
	if err != nil {
		panic(err)
	}

	obs, err := observability.Enable(ext, svc, "weather")
	if err != nil {
		panic(err)
	}

	// Structured JSON to stderr + OTLP capture via MultiHandler
	slog.SetDefault(slog.New(observability.NewMultiHandler(
		observability.NewSymphonyJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}),
		obs.GetLogCapture().Handler(),
	)))
	slog.Info("Weather extension starting")

	// Register a handler
	grp := svc.AddGroup("cirata.extensions.weather")
	err = grp.AddEndpoint("forecast", micro.HandlerFunc(func(req micro.Request) {
		span := obs.StartChildSpan("forecast", observability.SpanKindServer, req)

		city := string(req.Data())
		slog.Info("Fetching forecast", "city", city)
		obs.GetMetricsCapture().IncrementCounter("forecast.requests", 1)

		forecast, err := fetchForecast(city)
		if err != nil {
			obs.EndSpanWithError(span, err)
			req.Error("500", "forecast failed", nil)
			return
		}

		obs.EndSpan(span, map[string]string{"city": city})
		req.Respond(forecast)
	}))
	if err != nil {
		panic(err)
	}

	ext.Run()
}
```

Python SDK

Package: cirata.symphony.observability

Types

```python
class SpanKind(IntEnum):
    INTERNAL = 1
    SERVER = 2
    CLIENT = 3
```

Observability

```python
class Observability:
    @classmethod
    async def enable(cls, ext, svc, prefix: str) -> "Observability"

    def start_span(self, name: str, kind: SpanKind = SpanKind.INTERNAL) -> SpanHandle
    def start_child_span(self, name: str, kind: SpanKind, msg) -> SpanHandle
    def end_span(self, span: SpanHandle, attrs: Optional[dict] = None)
    def end_span_with_error(self, span: SpanHandle, error: Exception)
```

SpanHandle

```python
@dataclass
class SpanHandle:
    trace_id: bytes  # 16 bytes
    span_id: bytes   # 8 bytes
    parent_span_id: Optional[bytes]
    name: str
    kind: int
    start_time_ns: int
```

LogCapture

Extends logging.Handler:

```python
class LogCapture(logging.Handler):
    def __init__(self, prefix: str, level=logging.INFO)
    def emit(self, record: logging.LogRecord)  # Called automatically by logging
    def drain(self) -> bytes
```

TracePropagator

```python
class TracePropagator:
    @staticmethod
    def inject(ctx: TraceContext) -> dict  # Returns {"traceparent": "00-..."}

    @staticmethod
    def extract(msg) -> Optional[TraceContext]

    @staticmethod
    def parse(header: str) -> Optional[TraceContext]
```

TraceContext

```python
@dataclass(frozen=True)
class TraceContext:
    trace_id: bytes  # 16 bytes
    span_id: bytes   # 8 bytes
    flags: int = 0x01

    @property
    def is_valid(self) -> bool

    @staticmethod
    def new_root() -> TraceContext

    def new_child(self) -> TraceContext
```

Full Example

```python
import asyncio
import logging
from cirata import symphony
from cirata.symphony.observability import Observability, SpanKind
from cirata.symphony.logging import configure_logging

configure_logging("weather")
logger = logging.getLogger("weather")

# fetch_forecast is assumed to be defined elsewhere and to return bytes.

async def main():
    async with symphony.Extension("weather", "weather") as ext:
        svc = await ext.add_endpoints("weather", "1.0")
        obs = await Observability.enable(ext, svc, "weather")

        # Add OTLP capture alongside stdout (configured by configure_logging)
        logging.getLogger().addHandler(obs.logs)
        logger.info("Weather extension starting")

        async def handle_forecast(msg):
            span = obs.start_child_span("forecast", SpanKind.SERVER, msg)
            try:
                city = msg.data.decode()
                logger.info("Fetching forecast for %s", city)
                forecast = await fetch_forecast(city)
                obs.end_span(span, {"city": city})
                await msg.respond(forecast)
            except Exception as e:
                obs.end_span_with_error(span, e)
                logger.error("Forecast failed", exc_info=True)

        grp = svc.add_group("cirata.extensions.weather")
        await grp.add_endpoint("forecast", handle_forecast)
        await ext.operate()

asyncio.run(main())
```

Java SDK

Package: com.cirata.symphony.observability

Types

```java
public enum SpanKind {
    INTERNAL(1), SERVER(2), CLIENT(3), PRODUCER(4), CONSUMER(5);
    public int getValue();
}
```

Observability

```java
public class Observability {
    public static Observability enable(ExtensionRuntime runtime, String prefix);

    public SpanHandle startSpan(String name, SpanKind kind);
    public SpanHandle startChildSpan(String name, SpanKind kind, ServiceMessage msg);
    public void endSpan(SpanHandle span, Map<String, String> attributes);
    public void endSpanWithError(SpanHandle span, Throwable error);
    public TraceCapture getTraceCapture();
    public LogCapture getLogCapture();
    public MetricsCapture getMetricsCapture();
}
```

SpanHandle

```java
public record SpanHandle(
    byte[] traceId,
    byte[] spanId,
    byte[] parentSpanId, // null for root spans
    String name,
    SpanKind kind,
    long startNano
) {}
```

LogCapture

```java
public class LogCapture {
    // Automatic: when Logback is on the classpath, INFO+ log events from all
    // SLF4J loggers are captured automatically (including stack traces and
    // trace context from MDC). No manual setup required.

    // Direct recording API (also works without Logback):
    public void record(int severityNumber, String severityText, String body,
                       Map<String, String> attributes);
    public void info(String message);
    public void warn(String message);
    public void error(String message);
    public byte[] drain();
}
```

MetricsCapture

```java
public class MetricsCapture {
    public void incrementCounter(String name, long delta);
    public void setGauge(String name, double value);
    public byte[] drain();
}
```

TracePropagator

```java
public final class TracePropagator {
    public static Headers inject(TraceContext ctx);
    public static TraceContext extract(Headers headers);
    public static TraceContext extract(ServiceMessage msg);
}
```

TraceContext

```java
public record TraceContext(byte[] traceId, byte[] spanId, byte traceFlags) {
    public static final TraceContext EMPTY;
    public boolean isValid();
}
```

Full Example

```java
import com.cirata.symphony.observability.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ExtensionRuntime imports omitted; token and fetchForecast are assumed to be defined elsewhere.
public class WeatherExtension {
    // Standard SLF4J logger: output is captured automatically
    private static final Logger log = LoggerFactory.getLogger(WeatherExtension.class);

    public static void main(String[] args) {
        ExtensionRuntime runtime = ExtensionRuntime.builder()
                .name("weather").prefix("weather").token(token)
                .build();

        Observability obs = Observability.enable(runtime, "weather");
        // Logback bridge is now active: all SLF4J INFO+ logs are captured
        log.info("Weather extension starting");

        // Register endpoint handler
        runtime.addEndpoint("forecast", msg -> {
            SpanHandle span = obs.startChildSpan("forecast", SpanKind.SERVER, msg);
            try {
                String city = new String(msg.getData(), StandardCharsets.UTF_8);
                log.info("Fetching forecast for {}", city);
                byte[] forecast = fetchForecast(city);
                obs.endSpan(span, Map.of("city", city));
                msg.respond(runtime.getUserConnection(), forecast);
            } catch (Exception e) {
                obs.endSpanWithError(span, e);
                log.error("Forecast failed", e); // stack trace included
            }
        });

        runtime.run();
    }
}
```

Rust SDK

Module: cirata_symphony::observability

Types

```rust
pub enum SpanKind {
    Internal = 1,
    Server = 2,
    Client = 3,
}
```

Observability

```rust
impl Observability {
    pub fn new(prefix: &str) -> Self;
    pub async fn enable(svc: &async_nats::service::Service, prefix: &str) -> Arc<Self>;
    pub fn start_span(&self, name: &str, kind: SpanKind) -> SpanHandle;
    pub fn start_child_span(&self, name: &str, kind: SpanKind, parent: &TraceContext) -> SpanHandle;
    pub fn start_child_span_from_request(&self, name: &str, kind: SpanKind, request: &async_nats::service::Request) -> SpanHandle;
    pub fn start_child_span_from_header(&self, name: &str, kind: SpanKind, traceparent: &str) -> SpanHandle;
    pub fn end_span(&self, span: SpanHandle, attrs: HashMap<String, String>);
    pub fn end_span_with_error(&self, span: SpanHandle, err: &dyn Error);
    pub fn ingest_raw(&self, data: Vec<u8>);
    pub fn increment_counter(&self, name: &str, delta: i64);
    pub fn set_gauge(&self, name: &str, value: f64);
    pub fn drain_traces(&self) -> Vec<u8>;
    pub fn drain_logs(&self) -> Vec<u8>;
    pub fn drain_metrics(&self) -> Vec<u8>;
    pub fn log_capture(&self) -> Arc<Mutex<LogCapture>>;
    pub fn metrics_capture(&self) -> Arc<Mutex<MetricsCapture>>;
    pub fn trace_capture(&self) -> Arc<Mutex<TraceCapture>>;
}
```

SpanHandle

```rust
pub struct SpanHandle {
    pub trace_id: [u8; 16],
    pub span_id: [u8; 8],
    pub parent_span_id: Option<Vec<u8>>,
    pub name: String,
    pub kind: i32,
    pub start_time_ns: u64,
}
```

TraceContext

```rust
pub struct TraceContext {
    pub trace_id: [u8; 16],
    pub span_id: [u8; 8],
    pub flags: u8,
}

impl TraceContext {
    pub fn new_root() -> Self;
    pub fn new_child(&self) -> Self;
    pub fn format(&self) -> String; // "00-{traceId}-{spanId}-{flags}"
    pub fn parse(header: &str) -> Result<Self, String>;
}
```

LogCapture

```rust
pub enum Severity { Debug = 5, Info = 9, Warn = 13, Error = 17 }

impl LogCapture {
    pub fn new(prefix: &str) -> Self;
    pub fn record(&mut self, severity: Severity, body: &str, attrs: HashMap<String, String>);
    pub fn drain(&mut self) -> Vec<u8>;
}
```

Full Example

```rust
use cirata_symphony::extension::Extension;
use cirata_symphony::observability::{Observability, SpanKind};
use std::collections::HashMap;
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Structured logging to stdout (captured by journald / container runtime)
    tracing_subscriber::fmt::init();

    // `token` is assumed to be loaded elsewhere.
    let ext = Extension::new("weather", "weather").with_token(token);

    ext.run_with_setup(|ext| Box::pin(async move {
        let svc = ext.add_endpoints("weather", "1.0", "Weather Service", None).await?;
        let obs = Observability::enable(&svc, "weather").await;

        // OTLP log capture (in addition to stdout via tracing).
        // Bind the Arc first: locking a temporary Arc would drop it while the guard is alive.
        {
            let log_capture = obs.log_capture();
            let mut logs = log_capture.lock().unwrap();
            logs.record(
                cirata_symphony::observability::logcapture::Severity::Info,
                "Weather extension starting",
                HashMap::new(),
            );
        }

        // In a handler:
        let span = obs.start_span("forecast", SpanKind::Server);
        info!("Fetching forecast");
        // ... fetch forecast ...
        obs.increment_counter("forecast.requests", 1);
        obs.end_span(span, HashMap::from([
            ("city".into(), "London".into()),
        ]));

        Ok(())
    })).await?;

    Ok(())
}
```

Serialization Format

All four SDKs serialize telemetry as OTLP protobuf (ExportTraceServiceRequest, ExportLogsServiceRequest, ExportMetricsServiceRequest). This ensures consistent, compact payloads and eliminates format conversion overhead in the Observability Extension.

| SDK | Serialization | Dependency |
|---|---|---|
| Go | OTLP protobuf | go.opentelemetry.io/proto/otlp |
| Python | OTLP protobuf | opentelemetry-proto |
| Java | OTLP protobuf | io.opentelemetry.proto:opentelemetry-proto |
| Rust | OTLP protobuf | opentelemetry-proto + prost crates |

The Python SDK gracefully degrades to empty responses if opentelemetry-proto is not installed. For all other SDKs, the protobuf dependency is required.


Buffer Limits and Data Lifecycle

Traces and logs each have a fixed-size in-memory buffer; when a buffer is full, the oldest entries are evicted. Metrics are kept in a map that is reset on each drain:

| Signal | Buffer Size | Eviction |
|---|---|---|
| Traces (spans) | 500 | Oldest span dropped |
| Logs | 1,000 | Oldest log record dropped |
| Metrics (counters/gauges) | Unbounded map | Reset on drain |

The buffer is drained completely each time the Symphony Observability Extension polls your endpoint. With the default 30-second poll interval, your extension should not exceed these limits under normal operation. If you generate high volumes of telemetry, consider:

  • Reducing span granularity (instrument significant operations, not every function call)
  • Filtering log severity (INFO or higher is the default)
  • Using fewer distinct metric names
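A short calculation shows the eviction policy and the headroom it leaves. This sketch models the span buffer with collections.deque(maxlen=500), which drops the oldest element when a full deque is appended to. Whether the SDKs actually use a deque is an assumption. Dividing each buffer size by the 30-second default poll interval gives the average rate that fits without eviction: about 16.7 spans/s and 33.3 log records/s. Bursts above that average can still overflow.

```python
from collections import deque

SPAN_BUFFER_SIZE = 500   # from the Buffer Limits table
LOG_BUFFER_SIZE = 1_000
POLL_INTERVAL_S = 30     # default poll interval

# A full deque(maxlen=N) discards its oldest element on each append.
spans = deque(maxlen=SPAN_BUFFER_SIZE)
for i in range(600):     # 100 more spans than fit between two polls
    spans.append(f"span-{i}")

oldest_kept = spans[0]   # spans 0..99 were evicted

# Highest average rate that fits in one poll interval without eviction.
max_spans_per_s = SPAN_BUFFER_SIZE / POLL_INTERVAL_S
max_logs_per_s = LOG_BUFFER_SIZE / POLL_INTERVAL_S
```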

Best Practices

Span Naming

Use short, descriptive names that identify the operation, not the specific input:

| Good | Bad |
|---|---|
| handle_query | handle query for user 12345 |
| poll cirata.extensions.myext.metrics | poll |
| db.insert_batch | insert 42 records into spans table |

Put variable data in span attributes, not the span name. This keeps trace views readable and enables grouping.

Span Granularity

Instrument operations that are meaningful for understanding performance and behavior:

  • Request handlers (SERVER spans)
  • Outgoing calls to other services (CLIENT spans)
  • Significant internal operations like batch processing, cache operations, or scheduled tasks (INTERNAL spans)

Avoid instrumenting trivial operations like field access, simple calculations, or logging calls.

Error Handling

Always end spans, even on error paths. Use defer (Go), try/finally (Python/Java), or scope guards (Rust) to ensure spans are closed:

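```go
func runOperation() (err error) {
	span := obs.StartSpan("operation", observability.SpanKindInternal)
	// The deferred closure reads the named return value err, so it sees
	// the final error no matter which return path is taken.
	defer func() {
		if err != nil {
			obs.EndSpanWithError(span, err)
		} else {
			obs.EndSpan(span, nil)
		}
	}()

	// ... work that may set err ...
	return nil
}
```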
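In Python, a context manager built on try/finally gives the same guarantee. This is a sketch, not an SDK helper. traced is a hypothetical name, and it relies only on the start_span, end_span, and end_span_with_error methods listed in the Python SDK reference. The FakeObs class exists only to make the example runnable without Symphony.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional, Tuple


@contextmanager
def traced(obs: Any, name: str, kind: Any,
           attrs: Optional[Dict[str, str]] = None) -> Iterator[Any]:
    """Start a span and end it exactly once, with ERROR status if the body raises."""
    span = obs.start_span(name, kind)
    error: Optional[BaseException] = None
    try:
        yield span
    except BaseException as exc:  # also covers asyncio.CancelledError
        error = exc
        raise  # re-raise: ending the span must not swallow the failure
    finally:
        if error is None:
            obs.end_span(span, attrs)
        else:
            obs.end_span_with_error(span, error)


class FakeObs:
    """Test double that records how each span was ended."""

    def __init__(self) -> None:
        self.ended: List[Tuple[str, str]] = []

    def start_span(self, name, kind):
        return name

    def end_span(self, span, attrs=None):
        self.ended.append((span, "OK"))

    def end_span_with_error(self, span, error):
        self.ended.append((span, "ERROR"))


fake = FakeObs()
with traced(fake, "ok_op", 1):  # 1 = INTERNAL
    pass
try:
    with traced(fake, "bad_op", 1):
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Re-raising after recording the error keeps the span bookkeeping invisible to the caller's control flow. The handler still sees the exception and can respond with an error.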

Log Levels

  • INFO—State changes, startup/shutdown, periodic summaries
  • WARN—Recoverable issues, degraded performance, approaching limits
  • ERROR—Failed operations, unrecoverable errors

Avoid logging at DEBUG level through the observability capture—it generates high volume with limited diagnostic value in production.