
worker-observability

GitHub: Stackbilt-dev/worker-observability · Apache-2.0 · v0.3.0

Not yet published to npm. Install directly from GitHub:

npm install github:Stackbilt-dev/worker-observability

Part of the Stackbilt ecosystem. Pairs naturally with @stackbilt/audit-chain for tamper-evident audit logs and @stackbilt/evidence-core for content quality signals. The hosted Pro product on Stackbilder wraps this library with D1-backed storage and a live dashboard.

A complete monitoring stack extracted from a production orchestration platform. Zero production dependencies — uses only the Web Crypto API and Cloudflare bindings.


Quick start

createMonitoring initializes all modules at once with a shared redaction policy:

import { createMonitoring } from '@stackbilt/worker-observability';

const monitoring = createMonitoring({
  service:         'my-worker',
  version:         '1.0.0',
  environment:     'production',
  analyticsEngine: env.ANALYTICS,  // optional: Cloudflare Analytics Engine binding
  enableTracing:   true,
  enableSLOs:      true,
  enableAlerting:  true,
  stackbilt: {
    endpoint: 'https://stackbilder.com/api/observe/ingest',
    token:    env.STACKBILT_TOKEN,  // optional: export to Stackbilder Pro dashboard
  },
});

monitoring.logger.info('Worker started');
monitoring.metrics.increment('requests.total');
monitoring.errorTracker.track(new Error('something broke'));

Modules

| Module | Class / factory | Purpose |
|---|---|---|
| Health Checks | HealthChecker, HealthAggregator | Dependency probes, D1/DO/service binding checks, weighted aggregate |
| Structured Logging | createLogger | JSON logging, child loggers, multiple outputs |
| Metrics | MetricsCollector, BusinessMetrics | Counters, gauges, histograms, timers, pluggable exporters |
| Tracing | Tracer | W3C Trace Context, span-based, context propagation |
| Error Tracking | InMemoryErrorTracker, CircuitBreaker | Aggregation, circuit breaker, exponential backoff |
| SLI/SLO | SLIMonitor | Availability, latency, and error-rate objectives plus error budget |
| Alerting | AlertManager | Threshold rules, Slack/webhook/email/PagerDuty channels |
| Redaction | Redactor | Credential and PII field masking, enabled by default |
| Dashboard | createHTMLDashboard, DashboardAggregator | Self-contained HTML monitoring page |

Health Checks

import { HealthChecker, commonChecks, createHealthEndpoint } from '@stackbilt/worker-observability';

const checker = new HealthChecker({ service: 'my-worker', version: '1.0.0', startTime: Date.now() });

checker.register('database',     commonChecks.database(env.DB));
checker.register('counter-do',   commonChecks.durableObject(env.COUNTER, 'health'));
checker.register('auth-service', commonChecks.serviceBinding(env.AUTH, 'auth'));
checker.register('cache', async () => ({ name: 'cache', status: 'healthy', timestamp: Date.now() }));

// Register an external dependency probe
checker.registerDependency({ name: 'stripe-api', type: 'api', url: 'https://api.stripe.com/v1', timeout: 3000, critical: true });

// Expose as a /health route handler
const healthHandler = createHealthEndpoint(checker);

HealthAggregator combines multiple checkers with weighted scoring:

import { HealthAggregator } from '@stackbilt/worker-observability';

const aggregator = new HealthAggregator();
aggregator.registerService('api',    apiChecker,    1.0);
aggregator.registerService('worker', workerChecker, 0.5);

const { status, score } = await aggregator.getAggregatedHealth();
// { status: 'healthy', score: 0.92, services: [...] }
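The exact weighting behind getAggregatedHealth() is not documented here. As a rough mental model, the sketch below assumes each status maps to a number (healthy = 1, degraded = 0.5, unhealthy = 0), takes the weight-normalized average, and uses hypothetical 0.9 / 0.5 cut-offs to turn the score back into a status. The aggregateHealth() function, its types, and its thresholds are illustrative assumptions, not the library's implementation.

```typescript
// Illustrative types; not the library's exported HealthAggregator types.
type HealthStatus = 'healthy' | 'degraded' | 'unhealthy';

interface WeightedService {
  name: string;
  status: HealthStatus;
  weight: number;
}

// Assumed numeric value for each status.
const STATUS_SCORE: Record<HealthStatus, number> = {
  healthy: 1,
  degraded: 0.5,
  unhealthy: 0,
};

function aggregateHealth(services: WeightedService[]): { status: HealthStatus; score: number } {
  const totalWeight = services.reduce((sum, s) => sum + s.weight, 0);
  // No weighted services at all: report unhealthy instead of dividing by zero.
  if (totalWeight === 0) return { status: 'unhealthy', score: 0 };

  const weightedSum = services.reduce((sum, s) => sum + STATUS_SCORE[s.status] * s.weight, 0);
  const score = weightedSum / totalWeight;

  // Hypothetical cut-offs for mapping the score back to a status.
  const status: HealthStatus = score >= 0.9 ? 'healthy' : score >= 0.5 ? 'degraded' : 'unhealthy';
  return { status, score };
}

// Same weights as the registerService() calls above, with the worker degraded.
const result = aggregateHealth([
  { name: 'api', status: 'healthy', weight: 1.0 },
  { name: 'worker', status: 'degraded', weight: 0.5 },
]);
console.log(result.status, result.score.toFixed(3));
```

Under these assumptions a degraded low-weight service pulls the score to about 0.83, so the whole system reports degraded even though the heavier service is healthy.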

Structured Logging

import { createLogger, JSONOutput } from '@stackbilt/worker-observability';

const logger = createLogger({ service: 'my-worker', minLevel: 'info', output: new JSONOutput() });

logger.info('Request received', { path: '/api/users' });
logger.error('Query failed', new Error('timeout'), { table: 'users' });

// Child loggers inherit parent context
const reqLogger = logger.child({ requestId: 'abc-123', userId: 'user-1' });
reqLogger.info('Processing request');

Available outputs: JSONOutput, ConsoleOutput, CloudflareAnalyticsOutput.
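Child loggers boil down to merging context objects. The sketch below is a minimal, self-contained logger (a hypothetical MiniLogger, not the library's createLogger) showing one way to do it: each child copies its parent's context and adds its own fields, every entry is written as one JSON line to an injectable sink, and entries below minLevel are dropped.

```typescript
type Level = 'debug' | 'info' | 'warn' | 'error';

const LEVEL_RANK: Record<Level, number> = { debug: 10, info: 20, warn: 30, error: 40 };

type LogContext = Record<string, unknown>;

// Hypothetical minimal logger for illustration only.
class MiniLogger {
  private readonly context: LogContext;
  private readonly minLevel: Level;
  private readonly sink: (line: string) => void;

  constructor(context: LogContext, minLevel: Level, sink: (line: string) => void = (line) => console.log(line)) {
    this.context = context;
    this.minLevel = minLevel;
    this.sink = sink;
  }

  // A child shares the sink and level, and layers extra fields over the parent's context.
  child(extra: LogContext): MiniLogger {
    return new MiniLogger({ ...this.context, ...extra }, this.minLevel, this.sink);
  }

  log(level: Level, message: string, data: LogContext = {}): void {
    if (LEVEL_RANK[level] < LEVEL_RANK[this.minLevel]) return;
    // Reserved fields are written last so context cannot overwrite them.
    this.sink(JSON.stringify({ ...this.context, ...data, timestamp: new Date().toISOString(), level, message }));
  }

  info(message: string, data?: LogContext): void {
    this.log('info', message, data);
  }
}

const lines: string[] = [];
const root = new MiniLogger({ service: 'my-worker' }, 'info', (line) => lines.push(line));
const reqLog = root.child({ requestId: 'abc-123', userId: 'user-1' });
reqLog.info('Processing request', { path: '/api/users' });
reqLog.log('debug', 'dropped because it is below minLevel');
```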


Metrics

import { MetricsCollector } from '@stackbilt/worker-observability';

const metrics = new MetricsCollector({ service: 'my-worker', flushInterval: 30000 });

metrics.increment('requests.total', 1, { method: 'GET' });
metrics.gauge('connections.active', 42);
metrics.histogram('request.duration', 150, 'milliseconds', { path: '/api' });

const end = metrics.startTimer('db.query', { table: 'users' });
await db.query('SELECT ...');
end();

const result = await metrics.measure('api.call', () => fetch('https://api.example.com'));

Available exporters: ConsoleExporter, CloudflareAnalyticsExporter, DatadogMetricsExporter.
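Histograms and timers record a distribution rather than a single number. How MetricsCollector summarizes them is not specified here; as an illustration, the sketch below summarizes raw samples with the nearest-rank percentile method. The summarize() and percentile() helpers and the HistogramSummary shape are hypothetical.

```typescript
interface HistogramSummary {
  count: number;
  sum: number;
  min: number;
  max: number;
  p50: number;
  p95: number;
}

// Nearest-rank percentile: the smallest sample with at least p% of samples at or below it.
function percentile(sorted: number[], p: number): number {
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}

function summarize(samples: number[]): HistogramSummary | null {
  if (samples.length === 0) return null;
  // Copy before sorting so the caller's array is left unchanged.
  const sorted = [...samples].sort((a, b) => a - b);
  return {
    count: sorted.length,
    sum: sorted.reduce((acc, v) => acc + v, 0),
    min: sorted[0],
    max: sorted[sorted.length - 1],
    p50: percentile(sorted, 50),
    p95: percentile(sorted, 95),
  };
}

// Twenty request durations in milliseconds: 10, 20, ..., 200.
const durations = Array.from({ length: 20 }, (_, i) => (i + 1) * 10);
const summary = summarize(durations);
```

Keeping raw samples is simple but grows with traffic; production collectors usually bucket values instead, trading exact percentiles for bounded memory.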

BusinessMetrics adds pre-built patterns for workflow tracking, agent token usage, and cost monitoring:

import { BusinessMetrics } from '@stackbilt/worker-observability';

const biz = new BusinessMetrics(metrics);
biz.workflowCreated('tenant-1', 'data-pipeline', 'wf-123');
biz.agentTokensUsed('tenant-1', 'claude-sonnet-4-6', 'anthropic', 1500);
biz.computeCost('tenant-1', 'inference', 0.003);

Distributed Tracing

W3C Trace Context propagation with span-based tracing:

import { Tracer, trace } from '@stackbilt/worker-observability';

const tracer = new Tracer({ service: 'my-worker', sampling: 1.0 });

// Async helper
const result = await trace(tracer, 'db-query', async (span) => {
  span.setAttributes({ 'db.statement': 'SELECT ...' });
  return await db.query('SELECT ...');
});

// Context propagation across service calls
const parentCtx = tracer.extract(request.headers);
const childSpan = tracer.startSpan('downstream-call', { parent: parentCtx });
const outgoingHeaders = new Headers();
tracer.inject(childSpan.getContext(), outgoingHeaders);

Available exporters: ConsoleTraceExporter, CloudflareTraceExporter.
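Context propagation rides on the W3C traceparent header, whose version-00 form is `00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>`. The sketch below parses and re-emits that header independently of Tracer. It is not the library's extract/inject code, it accepts only version 00 (the specification lets parsers treat future versions more leniently), and the child span id in the example is a hypothetical value.

```typescript
// Parsed form of a version-00 W3C traceparent header.
interface TraceParent {
  traceId: string;  // 32 lowercase hex chars, shared by every span in the trace
  parentId: string; // 16 lowercase hex chars, the caller's span id
  sampled: boolean; // bit 0 of trace-flags
}

const TRACEPARENT_V00 = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;
const ALL_ZEROS = /^0+$/;

function parseTraceparent(header: string | null): TraceParent | null {
  if (header === null) return null;
  const match = TRACEPARENT_V00.exec(header.trim());
  if (match === null) return null;
  const [, traceId, parentId, flags] = match;
  // All-zero trace ids and parent ids are invalid per the specification.
  if (ALL_ZEROS.test(traceId) || ALL_ZEROS.test(parentId)) return null;
  return { traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 0x01 };
}

// Outgoing header for a child span: keep the trace id, substitute our own span id.
function formatTraceparent(traceId: string, spanId: string, sampled: boolean): string {
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Example header from the W3C Trace Context specification.
const incoming = parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01');
// 'b7ad6b7169203331' stands in for a freshly generated child span id.
const outgoing = incoming && formatTraceparent(incoming.traceId, 'b7ad6b7169203331', incoming.sampled);
```

In a Worker, the input would typically come from `request.headers.get('traceparent')`, which already returns `string | null`.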


Error Tracking

import { InMemoryErrorTracker, CircuitBreaker, withErrorTracking } from '@stackbilt/worker-observability';

const tracker = new InMemoryErrorTracker();
tracker.track(new Error('connection reset'), { requestId: 'abc-123' });

// Circuit breaker: 5 failures → open for 60s, 2 successes to close
const breaker = new CircuitBreaker(5, 60000, 2);
const result = await breaker.execute(() => fetch('https://api.example.com'));
console.log(breaker.getState()); // 'closed' | 'open' | 'half-open'

// Wrap with automatic tracking
const data = await withErrorTracking(tracker, () => riskyOperation());
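The comment on `CircuitBreaker(5, 60000, 2)` describes a three-state machine. The sketch below models it with a hypothetical SimpleCircuitBreaker, not the library's class, under these assumptions: failures are counted consecutively while closed, the breaker moves from open to half-open lazily once the reset timeout has elapsed, any half-open failure reopens it, and the configured number of half-open successes closes it. The clock is injectable so the transitions can be checked without waiting.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class SimpleCircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly successThreshold: number;
  private readonly now: () => number;

  constructor(
    failureThreshold: number, // consecutive failures before opening
    resetTimeoutMs: number,   // how long to stay open
    successThreshold: number, // half-open successes needed to close
    now: () => number = () => Date.now(),
  ) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.successThreshold = successThreshold;
    this.now = now;
  }

  getState(): BreakerState {
    // Lazily move from open to half-open once the timeout has elapsed.
    if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open';
      this.successes = 0;
    }
    return this.state;
  }

  canAttempt(): boolean {
    return this.getState() !== 'open';
  }

  recordSuccess(): void {
    if (this.getState() === 'half-open') {
      this.successes += 1;
      if (this.successes >= this.successThreshold) {
        this.state = 'closed';
        this.failures = 0;
      }
    } else {
      this.failures = 0; // a success while closed resets the failure streak
    }
  }

  recordFailure(): void {
    const state = this.getState();
    if (state === 'half-open') {
      this.trip(); // one failed probe is enough to reopen
    } else if (state === 'closed') {
      this.failures += 1;
      if (this.failures >= this.failureThreshold) this.trip();
    }
  }

  // Async wrapper in the spirit of breaker.execute(...).
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (!this.canAttempt()) throw new Error('circuit open');
    try {
      const value = await fn();
      this.recordSuccess();
      return value;
    } catch (err) {
      this.recordFailure();
      throw err;
    }
  }

  private trip(): void {
    this.state = 'open';
    this.openedAt = this.now();
    this.failures = 0;
  }
}
```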

SLI/SLO Monitoring

import { SLIMonitor, AvailabilitySLI, LatencySLI, ErrorRateSLI, createStandardSLOs } from '@stackbilt/worker-observability';

const monitor = new SLIMonitor(metrics);
monitor.registerSLI(new AvailabilitySLI(metrics, 'my-worker'));
monitor.registerSLI(new LatencySLI(metrics, 'my-worker', 500));   // 500ms target
monitor.registerSLI(new ErrorRateSLI(metrics, 'my-worker', 1));   // 1% error budget

createStandardSLOs('my-worker').forEach(slo => monitor.registerSLO(slo));

const status = await monitor.getSLOStatus();
for (const [id, s] of status) {
  console.log(`${id}: ${s.status} — error budget remaining: ${s.errorBudget.remaining}%`);
}
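The remaining error budget follows from the SLO target. A common definition, assumed here rather than taken from SLIMonitor: a 99.9% target over N events allows (1 - 0.999) * N bad events, and the remaining budget is the share of that allowance not yet consumed. The errorBudget() helper below is hypothetical.

```typescript
interface ErrorBudget {
  allowedBadEvents: number;
  consumedPercent: number;  // may exceed 100 once the SLO is breached
  remainingPercent: number; // floored at 0
}

function errorBudget(target: number, totalEvents: number, badEvents: number): ErrorBudget {
  const allowedBadEvents = (1 - target) * totalEvents;
  // A 100% target leaves no allowance: any bad event exhausts the budget.
  const consumedPercent =
    allowedBadEvents > 0 ? (badEvents / allowedBadEvents) * 100 : badEvents > 0 ? Infinity : 0;
  return {
    allowedBadEvents,
    consumedPercent,
    remainingPercent: Math.max(0, 100 - consumedPercent),
  };
}

// 99.9% availability over 100,000 requests with 25 failures.
const budget = errorBudget(0.999, 100000, 25);
console.log(budget.remainingPercent.toFixed(1));
```

Here 100 failures are allowed, 25 have occurred, so roughly 75% of the budget remains.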

Alert Manager

import { AlertManager, createStandardAlerts } from '@stackbilt/worker-observability';

const alertManager = new AlertManager(metrics, { service: 'my-worker', environment: 'production' });
createStandardAlerts('my-worker').forEach(rule => alertManager.registerRule(rule));

// Register a Slack channel
alertManager.registerChannel({
  id: 'slack', name: 'Ops Slack', type: 'slack',
  config: { webhook_url: 'https://hooks.slack.com/...' }, enabled: true,
});

Supported channel types: slack, webhook, email, pagerduty.
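Threshold rules usually carry a little state so that one noisy sample does not page anyone. The sketch below, a hypothetical ThresholdEvaluator that is not AlertManager's implementation, fires once when a metric has breached its threshold for a configured number of consecutive evaluations and re-arms after the metric recovers. The rule shape is an assumption, and delivery to Slack, webhooks, and other channels is out of scope.

```typescript
// Hypothetical rule shape; not the rule type used by AlertManager.
interface ThresholdRule {
  id: string;
  metric: string;
  comparator: 'above' | 'below';
  threshold: number;
  consecutive: number; // evaluations in breach required before firing
}

class ThresholdEvaluator {
  private readonly streaks = new Map<string, number>();

  // Returns true exactly once per breach streak, when it reaches rule.consecutive.
  evaluate(rule: ThresholdRule, value: number): boolean {
    const breached = rule.comparator === 'above' ? value > rule.threshold : value < rule.threshold;
    const streak = breached ? (this.streaks.get(rule.id) ?? 0) + 1 : 0;
    this.streaks.set(rule.id, streak);
    return streak === rule.consecutive;
  }
}

const errorRateRule: ThresholdRule = {
  id: 'high-error-rate',
  metric: 'errors.rate',
  comparator: 'above',
  threshold: 5, // percent
  consecutive: 2,
};

const evaluator = new ThresholdEvaluator();
const fired = [6, 7, 8, 3, 6, 9].map((v) => evaluator.evaluate(errorRateRule, v));
```

Firing only on the transition avoids re-sending the same alert on every evaluation while the breach persists.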


Redaction

Enabled by default on all signal pathways (logs, traces, metric tags, egress payloads).

Default deny-list covers:

  • HTTP credential headers (authorization, cookie, x-api-key, x-auth-token, etc.)
  • Any field name containing token, secret, password, api_key, bearer, private_key
  • Session identifiers (session, session_id, session_token)
  • PII email fields — broad substring match with a keep-list for non-PII variants (email_verified, email_template_id, etc.)
  • Client IPs (GDPR Article 4(1)): client_ip, cf-connecting-ip, x-forwarded-for, true-client-ip, etc.

Matching is case-insensitive. Field names remain visible; only values are replaced with [REDACTED].

Known limitation: the redactor matches field names, not values, so a secret interpolated into a log message string is not caught. Use structured logging: keep the message text static and pass dynamic values as separate metadata fields, where their field names can be matched.
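To make the field-name rule and its limitation concrete, here is a minimal recursive redactor. It is a sketch, not the library's Redactor: the deny and keep patterns are a small, assumed subset of the defaults listed above. The `msg` value passes through untouched even though it contains a token, which is the limitation just described.

```typescript
// Assumed subset of the default rules; matching is case-insensitive on field names.
const DENY_PATTERNS: RegExp[] = [
  /token|secret|password|api_key|bearer|private_key/i,
  /^(authorization|cookie|x-api-key)$/i,
  /email/i,
];
const KEEP_PATTERNS: RegExp[] = [/^email_verified$/i, /^email_template_id$/i];
const MASK = '[REDACTED]';

function isSensitiveField(fieldName: string): boolean {
  if (KEEP_PATTERNS.some((re) => re.test(fieldName))) return false;
  return DENY_PATTERNS.some((re) => re.test(fieldName));
}

// Walks objects and arrays; values are judged only by the field name that holds them.
function redact(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(redact);
  if (value !== null && typeof value === 'object') {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      out[key] = isSensitiveField(key) ? MASK : redact(inner);
    }
    return out;
  }
  return value; // Strings are returned as-is, even if they embed a secret.
}

const cleaned = redact({
  headers: { Authorization: 'Bearer abc' },
  user: { email: 'a@example.com', email_verified: true },
  msg: 'retrying with token=abc',
}) as Record<string, any>;
```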

import { Redactor } from '@stackbilt/worker-observability';

// Default — covers common credentials and PII
const r = new Redactor();
r.redact({ headers: { authorization: 'Bearer abc' } });
// → { headers: { authorization: '[REDACTED]' } }

// Extend with service-specific fields
const custom = new Redactor({
  patterns:    [/^x-stripe-signature$/i, /customer[-_]?id/i],
  fieldNames:  ['X-Internal-Audit-Token'],
  keepPatterns: [/^email_campaign$/i],
});

Redaction is enforced at two layers: per-signal on generation, and again at egress in StackbiltCloudExporter before the HTTPS POST to stackbilder.com/api/observe/ingest.


Hono middleware

import { Hono } from 'hono';
import { metricsMiddleware, tracingMiddleware, createMonitoringMiddleware } from '@stackbilt/worker-observability';

const app = new Hono();
app.use('*', metricsMiddleware(metrics));
app.use('*', tracingMiddleware(tracer));
// or all at once:
const middlewares = createMonitoringMiddleware({ logger, metrics, tracer });

Cloudflare Analytics Engine

Several exporters write to Analytics Engine for zero-cost metric storage. Bind a dataset in wrangler.toml:

[[analytics_engine_datasets]]
binding = "ANALYTICS"

Then pass it to createMonitoring({ analyticsEngine: env.ANALYTICS }). The CloudflareAnalyticsOutput, CloudflareAnalyticsExporter, and CloudflareTraceExporter all consume this binding.
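For reference, an Analytics Engine binding exposes writeDataPoint(), which takes a single index plus arrays of string blobs and numeric doubles. The sketch below shows one hypothetical way a metric sample could be mapped onto that shape. The interfaces are a minimal local subset of the types in @cloudflare/workers-types (renamed to avoid clashing with them), and the column layout is an assumption, not the one the library's exporters use.

```typescript
// Minimal local subset of the Analytics Engine binding types.
interface AnalyticsDataPoint {
  indexes?: string[];
  blobs?: string[];
  doubles?: number[];
}

interface AnalyticsDataset {
  writeDataPoint(point: AnalyticsDataPoint): void;
}

// Hypothetical layout: index = service, blobs = metric name + sorted tags, doubles = value.
function writeMetric(
  dataset: AnalyticsDataset,
  service: string,
  name: string,
  value: number,
  tags: Record<string, string> = {},
): void {
  const tagBlobs = Object.keys(tags)
    .sort() // deterministic tag order across calls
    .map((key) => `${key}=${tags[key]}`);
  dataset.writeDataPoint({
    indexes: [service],
    blobs: [name, ...tagBlobs],
    doubles: [value],
  });
}
```

Inside a fetch handler you would pass `env.ANALYTICS` as the dataset; in a test, any object with a writeDataPoint() method works.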


Hosted dashboard (Stackbilder Pro)

The hosted product on stackbilder.com wraps this library with D1-backed telemetry storage and a live dashboard. Set the stackbilt config block in createMonitoring to export from your Worker to the hosted ingest endpoint. See API Reference for the ingest API.