Observability Guide
Introduction¶
This guide provides comprehensive standards for implementing distributed tracing and structured logging across microservices and distributed systems. It covers OpenTelemetry instrumentation, structured logging patterns, log aggregation strategies, and observability best practices.
Table of Contents¶
- Observability Philosophy
- Distributed Tracing
- Trace Sampling Strategies
- Structured Logging
- Log Aggregation
- Correlation and Context
- Error Tracking
- Metrics Integration
- Metric Naming Conventions
- Label and Tag Standards
- Grafana Dashboards
- CI/CD Integration
- Best Practices
Observability Philosophy¶
The Three Pillars of Observability¶
┌───────────────────────────────────────────────────────────┐
│                    Observability Stack                    │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐      │
│  │   TRACES    │   │    LOGS     │   │   METRICS   │      │
│  │             │   │             │   │             │      │
│  │  What path  │   │ What        │   │ What is     │      │
│  │  did the    │   │ happened    │   │ the system  │      │
│  │  request    │   │ at each     │   │ state over  │      │
│  │  take?      │   │ point?      │   │ time?       │      │
│  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘      │
│         │                 │                 │             │
│         └─────────────────┼─────────────────┘             │
│                           │                               │
│                    ┌──────▼──────┐                        │
│                    │ Correlation │                        │
│                    │     ID      │                        │
│                    └─────────────┘                        │
│                                                           │
└───────────────────────────────────────────────────────────┘
Key Principles:
- Traces show the journey of a request across services
- Logs provide detailed context at each processing point
- Metrics quantify system behavior over time
- Correlation IDs tie all three together
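As a minimal, framework-free sketch of the last principle, a correlation ID can be generated once per request and stamped onto every log record so that logs, traces, and metrics can later be joined. The `CorrelationFilter` class and the `correlation_id` field name are illustrative conventions, not part of any library or standard:

```python
import logging
import uuid


def new_correlation_id() -> str:
    """One opaque ID per inbound request (32 hex chars from a UUID4)."""
    return uuid.uuid4().hex


class CorrelationFilter(logging.Filter):
    """Attach the request's correlation ID to every log record it sees."""

    def __init__(self, correlation_id: str):
        super().__init__()
        self.correlation_id = correlation_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = self.correlation_id
        return True  # never suppress the record, only annotate it


# Wire the filter into a logger; the formatter can then emit the field.
correlation_id = new_correlation_id()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s correlation_id=%(correlation_id)s"))
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.addFilter(CorrelationFilter(correlation_id))
logger.warning("order created")
```

In a real service the same ID would also be set as a span attribute and forwarded in outgoing request headers.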
Observability Standards¶
┌─────────────────┬─────────────────────────────────────────┐
│ Standard        │ Description                             │
├─────────────────┼─────────────────────────────────────────┤
│ OpenTelemetry   │ Vendor-neutral instrumentation standard │
│ W3C Trace       │ HTTP trace context propagation          │
│ JSON Logs       │ Machine-parseable structured logs       │
│ Semantic Conv.  │ Consistent attribute naming             │
└─────────────────┴─────────────────────────────────────────┘
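The W3C Trace Context standard in the table defines a single `traceparent` HTTP header of the form `version-trace_id-parent_id-flags`. A small illustrative parser (field names follow the spec; the helper itself is not part of any library):

```python
import re
from typing import Optional

# traceparent: 2-hex version, 32-hex trace-id, 16-hex parent/span-id, 2-hex flags
_TRACEPARENT = re.compile(
    r"^(?P<version>[0-9a-f]{2})-"
    r"(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<parent_id>[0-9a-f]{16})-"
    r"(?P<flags>[0-9a-f]{2})$"
)


def parse_traceparent(header: str) -> Optional[dict]:
    """Split a traceparent header into its four fields, or None if malformed."""
    m = _TRACEPARENT.match(header.strip().lower())
    if not m:
        return None
    fields = m.groupdict()
    # All-zero trace IDs and span IDs are invalid per the spec.
    if set(fields["trace_id"]) == {"0"} or set(fields["parent_id"]) == {"0"}:
        return None
    return fields


header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
parsed = parse_traceparent(header)
```

In practice the OpenTelemetry propagators handle this header for you; the parser only makes the wire format concrete.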
Distributed Tracing¶
OpenTelemetry Setup (Python)¶
Installation:
pip install opentelemetry-api \
  opentelemetry-sdk \
  opentelemetry-exporter-otlp \
  opentelemetry-instrumentation-requests \
  opentelemetry-instrumentation-flask \
  opentelemetry-instrumentation-sqlalchemy \
  opentelemetry-instrumentation-redis \
  opentelemetry-instrumentation-celery \
  opentelemetry-instrumentation-psycopg2
Tracer configuration:
# src/observability/tracing.py
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio


def configure_tracer(
    service_name: str,
    service_version: str,
    otlp_endpoint: str = "localhost:4317",
    sample_rate: float = 1.0,
    debug: bool = False,
) -> trace.Tracer:
    """Configure OpenTelemetry tracer with OTLP exporter."""
    resource = Resource.create({
        SERVICE_NAME: service_name,
        SERVICE_VERSION: service_version,
        "deployment.environment": os.getenv("ENVIRONMENT", "development"),
        "host.name": os.getenv("HOSTNAME", "unknown"),
    })

    sampler = ParentBasedTraceIdRatio(sample_rate)

    provider = TracerProvider(
        resource=resource,
        sampler=sampler,
    )

    otlp_exporter = OTLPSpanExporter(
        endpoint=otlp_endpoint,
        insecure=True,
    )
    provider.add_span_processor(
        BatchSpanProcessor(otlp_exporter)
    )

    if debug:
        provider.add_span_processor(
            BatchSpanProcessor(ConsoleSpanExporter())
        )

    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name, service_version)


tracer = configure_tracer(
    service_name="order-service",
    service_version="1.2.3",
    otlp_endpoint=os.getenv("OTLP_ENDPOINT", "localhost:4317"),
    sample_rate=float(os.getenv("TRACE_SAMPLE_RATE", "0.1")),
)
Auto-instrumentation:
# src/observability/auto_instrument.py
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor


def instrument_all(app=None, engine=None):
    """Apply auto-instrumentation to common libraries."""
    RequestsInstrumentor().instrument()
    Psycopg2Instrumentor().instrument()
    RedisInstrumentor().instrument()
    CeleryInstrumentor().instrument()

    if app:
        FlaskInstrumentor().instrument_app(app)
    if engine:
        SQLAlchemyInstrumentor().instrument(engine=engine)
Manual span creation:
# src/services/order_service.py
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)


class OrderService:
    """Order processing service with tracing."""

    def create_order(self, user_id: str, items: list) -> Order:
        """Create a new order with full tracing."""
        with tracer.start_as_current_span(
            "create_order",
            attributes={
                "user.id": user_id,
                "order.item_count": len(items),
            }
        ) as span:
            try:
                order = self._validate_and_create(user_id, items)
                span.set_attribute("order.id", order.id)
                span.set_attribute("order.total", float(order.total))
                span.set_status(Status(StatusCode.OK))
                return order
            except ValidationError as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise

    def _validate_and_create(self, user_id: str, items: list) -> Order:
        """Validate items and create order."""
        with tracer.start_as_current_span("validate_items") as span:
            validated_items = []
            for item in items:
                validated = self._validate_item(item)
                validated_items.append(validated)
            span.set_attribute("validated_count", len(validated_items))

        with tracer.start_as_current_span("check_inventory") as span:
            inventory_status = self.inventory_client.check_availability(
                [i["product_id"] for i in validated_items]
            )
            span.set_attribute("all_available", inventory_status.all_available)

        with tracer.start_as_current_span("calculate_pricing") as span:
            pricing = self.pricing_service.calculate(validated_items)
            span.set_attribute("subtotal", float(pricing.subtotal))
            span.set_attribute("tax", float(pricing.tax))
            span.set_attribute("total", float(pricing.total))

        with tracer.start_as_current_span("persist_order") as span:
            order = Order(
                user_id=user_id,
                items=validated_items,
                pricing=pricing,
            )
            self.repository.save(order)
            span.set_attribute("order.id", order.id)
            return order

    def _validate_item(self, item: dict) -> dict:
        """Validate a single order item."""
        with tracer.start_as_current_span(
            "validate_item",
            attributes={"product.id": item.get("product_id")}
        ):
            product = self.product_client.get(item["product_id"])
            if not product:
                raise ValidationError(f"Product not found: {item['product_id']}")
            return {
                "product_id": product.id,
                "name": product.name,
                "price": product.price,
                "quantity": item["quantity"],
            }
Context propagation:
# src/observability/propagation.py
import functools

import requests
from flask import request  # assumes a Flask app; any framework's request object works
from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.propagate import inject, extract
from opentelemetry.trace import Status, StatusCode


def make_traced_request(method: str, url: str, **kwargs) -> requests.Response:
    """Make HTTP request with trace context propagation."""
    headers = kwargs.pop("headers", {})
    inject(headers)
    return requests.request(method, url, headers=headers, **kwargs)


def extract_context_from_request(request) -> Context:
    """Extract trace context from incoming HTTP request."""
    return extract(request.headers)


def traced_request_handler(func):
    """Decorator to extract trace context and create span for request handlers."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        context = extract(request.headers)
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span(
            f"{request.method} {request.path}",
            context=context,
            attributes={
                "http.method": request.method,
                "http.url": request.url,
                "http.route": request.path,
                "http.user_agent": request.headers.get("User-Agent", ""),
            }
        ) as span:
            try:
                response = func(*args, **kwargs)
                span.set_attribute("http.status_code", response.status_code)
                return response
            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise
    return wrapper
Async tracing:
# src/observability/async_tracing.py
import asyncio

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)


async def traced_async_operation(name: str, coro, **attributes):
    """Execute async operation with tracing."""
    with tracer.start_as_current_span(name, attributes=attributes) as span:
        try:
            result = await coro
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise


class AsyncOrderProcessor:
    """Async order processor with tracing."""

    async def process_order(self, order_id: str) -> dict:
        """Process order with parallel traced operations."""
        with tracer.start_as_current_span(
            "process_order",
            attributes={"order.id": order_id}
        ) as span:
            order = await self.repository.get(order_id)
            span.set_attribute("order.status", order.status)

            results = await asyncio.gather(
                traced_async_operation(
                    "validate_payment",
                    self.payment_service.validate(order.payment_id),
                    payment_id=order.payment_id,
                ),
                traced_async_operation(
                    "reserve_inventory",
                    self.inventory_service.reserve(order.items),
                    item_count=len(order.items),
                ),
                traced_async_operation(
                    "calculate_shipping",
                    self.shipping_service.calculate(order.shipping_address),
                    address_country=order.shipping_address.country,
                ),
            )

            payment_valid, inventory_reserved, shipping_cost = results
            span.set_attribute("payment.valid", payment_valid)
            span.set_attribute("inventory.reserved", inventory_reserved)
            span.set_attribute("shipping.cost", float(shipping_cost))

            return {
                "order_id": order_id,
                "payment_valid": payment_valid,
                "inventory_reserved": inventory_reserved,
                "shipping_cost": shipping_cost,
            }
OpenTelemetry Setup (TypeScript)¶
Installation:
npm install @opentelemetry/api \
  @opentelemetry/sdk-node \
  @opentelemetry/auto-instrumentations-node \
  @opentelemetry/exporter-trace-otlp-grpc \
  @opentelemetry/resources \
  @opentelemetry/semantic-conventions
Tracer configuration:
// src/observability/tracing.ts
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { Resource } from '@opentelemetry/resources';
import {
  SEMRESATTRS_SERVICE_NAME,
  SEMRESATTRS_SERVICE_VERSION,
  SEMRESATTRS_DEPLOYMENT_ENVIRONMENT,
} from '@opentelemetry/semantic-conventions';
import { ParentBasedSampler, TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-base';
import { diag, DiagConsoleLogger, DiagLogLevel, trace } from '@opentelemetry/api';

interface TracingConfig {
  serviceName: string;
  serviceVersion: string;
  otlpEndpoint?: string;
  sampleRate?: number;
  debug?: boolean;
}

export function initializeTracing(config: TracingConfig): NodeSDK {
  const {
    serviceName,
    serviceVersion,
    otlpEndpoint = 'localhost:4317',
    sampleRate = 1.0,
    debug = false,
  } = config;

  if (debug) {
    diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
  }

  const resource = new Resource({
    [SEMRESATTRS_SERVICE_NAME]: serviceName,
    [SEMRESATTRS_SERVICE_VERSION]: serviceVersion,
    [SEMRESATTRS_DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV || 'development',
    'host.name': process.env.HOSTNAME || 'unknown',
  });

  const traceExporter = new OTLPTraceExporter({
    url: `http://${otlpEndpoint}`,
  });

  const sampler = new ParentBasedSampler({
    root: new TraceIdRatioBasedSampler(sampleRate),
  });

  const sdk = new NodeSDK({
    resource,
    traceExporter,
    sampler,
    instrumentations: [
      getNodeAutoInstrumentations({
        '@opentelemetry/instrumentation-fs': { enabled: false },
        '@opentelemetry/instrumentation-http': {
          ignoreIncomingPaths: ['/health', '/ready', '/metrics'],
        },
      }),
    ],
  });

  sdk.start();

  process.on('SIGTERM', () => {
    sdk.shutdown()
      .then(() => console.log('Tracing terminated'))
      .catch((error) => console.error('Error terminating tracing', error))
      .finally(() => process.exit(0));
  });

  return sdk;
}

export const tracer = trace.getTracer('order-service', '1.0.0');
Manual span creation:
// src/services/order.service.ts
import { trace, Span, SpanStatusCode } from '@opentelemetry/api';
import { randomUUID } from 'crypto';

const tracer = trace.getTracer('order-service');

interface OrderItem {
  productId: string;
  quantity: number;
  price: number;
}

interface Order {
  id: string;
  userId: string;
  items: OrderItem[];
  total: number;
  status: string;
}

export class OrderService {
  // Collaborators are assumed to be injected elsewhere; typed loosely here.
  private inventoryClient: any;
  private pricingService: any;
  private productClient: any;
  private repository: any;

  async createOrder(userId: string, items: OrderItem[]): Promise<Order> {
    return tracer.startActiveSpan(
      'createOrder',
      {
        attributes: {
          'user.id': userId,
          'order.item_count': items.length,
        },
      },
      async (span: Span) => {
        try {
          const order = await this.validateAndCreate(userId, items);
          span.setAttributes({
            'order.id': order.id,
            'order.total': order.total,
          });
          span.setStatus({ code: SpanStatusCode.OK });
          return order;
        } catch (error) {
          span.setStatus({
            code: SpanStatusCode.ERROR,
            message: error instanceof Error ? error.message : 'Unknown error',
          });
          span.recordException(error as Error);
          throw error;
        } finally {
          span.end();
        }
      }
    );
  }

  private async validateAndCreate(userId: string, items: OrderItem[]): Promise<Order> {
    const validatedItems = await tracer.startActiveSpan(
      'validateItems',
      async (span: Span) => {
        try {
          const validated = await Promise.all(
            items.map((item) => this.validateItem(item))
          );
          span.setAttribute('validated_count', validated.length);
          return validated;
        } finally {
          span.end();
        }
      }
    );

    const inventoryStatus = await tracer.startActiveSpan(
      'checkInventory',
      async (span: Span) => {
        try {
          const status = await this.inventoryClient.checkAvailability(
            validatedItems.map((i) => i.productId)
          );
          span.setAttribute('all_available', status.allAvailable);
          return status;
        } finally {
          span.end();
        }
      }
    );

    const pricing = await tracer.startActiveSpan(
      'calculatePricing',
      async (span: Span) => {
        try {
          const result = await this.pricingService.calculate(validatedItems);
          span.setAttributes({
            subtotal: result.subtotal,
            tax: result.tax,
            total: result.total,
          });
          return result;
        } finally {
          span.end();
        }
      }
    );

    return tracer.startActiveSpan(
      'persistOrder',
      async (span: Span) => {
        try {
          const order: Order = {
            id: this.generateId(),
            userId,
            items: validatedItems,
            total: pricing.total,
            status: 'pending',
          };
          await this.repository.save(order);
          span.setAttribute('order.id', order.id);
          return order;
        } finally {
          span.end();
        }
      }
    );
  }

  private generateId(): string {
    return randomUUID();
  }

  private async validateItem(item: OrderItem): Promise<OrderItem> {
    return tracer.startActiveSpan(
      'validateItem',
      { attributes: { 'product.id': item.productId } },
      async (span: Span) => {
        try {
          const product = await this.productClient.get(item.productId);
          if (!product) {
            throw new Error(`Product not found: ${item.productId}`);
          }
          return { ...item, price: product.price };
        } finally {
          span.end();
        }
      }
    );
  }
}
Context propagation:
// src/observability/propagation.ts
import { context, propagation, trace, SpanStatusCode } from '@opentelemetry/api';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';
import type { Request, Response } from 'express'; // assumes Express handlers

propagation.setGlobalPropagator(new W3CTraceContextPropagator());

export async function tracedHttpRequest<T>(
  config: AxiosRequestConfig
): Promise<AxiosResponse<T>> {
  const headers: Record<string, string> = {
    ...(config.headers as Record<string, string>),
  };
  propagation.inject(context.active(), headers);
  return axios.request<T>({ ...config, headers });
}

export function extractContextFromHeaders(
  headers: Record<string, string | string[] | undefined>
): ReturnType<typeof propagation.extract> {
  const normalizedHeaders: Record<string, string> = {};
  for (const [key, value] of Object.entries(headers)) {
    if (value) {
      normalizedHeaders[key.toLowerCase()] = Array.isArray(value) ? value[0] : value;
    }
  }
  return propagation.extract(context.active(), normalizedHeaders);
}

export function tracedRequestHandler(handlerName: string) {
  return function (
    target: unknown,
    propertyKey: string,
    descriptor: PropertyDescriptor
  ): PropertyDescriptor {
    const originalMethod = descriptor.value;

    descriptor.value = async function (req: Request, res: Response, ...args: unknown[]) {
      const extractedContext = extractContextFromHeaders(
        req.headers as Record<string, string>
      );

      return context.with(extractedContext, async () => {
        const tracer = trace.getTracer('http-server');
        return tracer.startActiveSpan(
          `${req.method} ${handlerName}`,
          {
            attributes: {
              'http.method': req.method,
              'http.url': req.url,
              'http.route': handlerName,
              'http.user_agent': req.headers['user-agent'] || '',
            },
          },
          async (span) => {
            try {
              const result = await originalMethod.apply(this, [req, res, ...args]);
              span.setAttribute('http.status_code', res.statusCode);
              span.setStatus({ code: SpanStatusCode.OK });
              return result;
            } catch (error) {
              span.recordException(error as Error);
              span.setStatus({
                code: SpanStatusCode.ERROR,
                message: (error as Error).message,
              });
              throw error;
            } finally {
              span.end();
            }
          }
        );
      });
    };

    return descriptor;
  };
}
Span Naming Conventions¶
# Span naming standards
SPAN_NAMING_CONVENTIONS = {
    # HTTP spans
    "http_server": "{method} {route}",
    "http_client": "HTTP {method}",

    # Database spans
    "db_query": "{db.system} {db.operation}",
    "db_statement": "{db.system} {db.operation} {db.sql.table}",

    # Messaging spans
    "message_publish": "{messaging.system} publish",
    "message_receive": "{messaging.system} receive",
    "message_process": "{messaging.system} process",

    # RPC spans
    "rpc_client": "{rpc.system}/{rpc.service}/{rpc.method}",
    "rpc_server": "{rpc.system}/{rpc.service}/{rpc.method}",

    # Internal spans
    "internal": "{component}.{operation}",
}

# Examples
SPAN_NAMES = {
    "http_server": "GET /api/users/{id}",
    "http_client": "HTTP POST",
    "db_query": "postgresql SELECT",
    "db_statement": "postgresql SELECT users",
    "message_publish": "kafka publish",
    "message_receive": "rabbitmq receive",
    "rpc_client": "grpc/UserService/GetUser",
    "internal": "OrderService.validateItems",
}
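A small helper can render names from templates like the ones above. Because the placeholders contain dotted keys (`{db.system}`), plain `str.format` would treat the dot as attribute access, so substitution is done with a regex instead. The function is illustrative, not an OpenTelemetry API:

```python
import re

# A subset of the naming templates shown above.
SPAN_NAME_TEMPLATES = {
    "http_server": "{method} {route}",
    "db_query": "{db.system} {db.operation}",
    "rpc_client": "{rpc.system}/{rpc.service}/{rpc.method}",
}


def render_span_name(kind: str, attrs: dict) -> str:
    """Fill a span-name template by substituting each {dotted.key} placeholder."""
    template = SPAN_NAME_TEMPLATES[kind]
    return re.sub(r"\{([^}]+)\}", lambda m: str(attrs[m.group(1)]), template)
```

A missing attribute raises `KeyError`, which surfaces naming-convention violations at instrumentation time rather than in the tracing backend.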
Sampling Strategies¶
# src/observability/sampling.py
import threading
import time

from opentelemetry.sdk.trace.sampling import (
    Sampler,
    SamplingResult,
    Decision,
    TraceIdRatioBased,
)
from opentelemetry.trace import Link, SpanKind
from opentelemetry.util.types import Attributes


class PrioritySampler(Sampler):
    """Sample based on request priority and error status."""

    def __init__(self, default_rate: float = 0.1, high_priority_rate: float = 1.0):
        self.default_rate = default_rate
        self.high_priority_rate = high_priority_rate
        self.default_sampler = TraceIdRatioBased(default_rate)
        self.high_priority_sampler = TraceIdRatioBased(high_priority_rate)

    def should_sample(
        self,
        parent_context,
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: list[Link] = None,
    ) -> SamplingResult:
        attributes = attributes or {}

        if attributes.get("error", False):
            return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes)

        if attributes.get("priority") == "high":
            return self.high_priority_sampler.should_sample(
                parent_context, trace_id, name, kind, attributes, links
            )

        if name.startswith("health") or name.startswith("ready"):
            return SamplingResult(Decision.DROP, attributes)

        return self.default_sampler.should_sample(
            parent_context, trace_id, name, kind, attributes, links
        )

    def get_description(self) -> str:
        return f"PrioritySampler(default={self.default_rate}, high={self.high_priority_rate})"


class AdaptiveSampler(Sampler):
    """Adaptive sampler that adjusts rate based on traffic volume."""

    def __init__(
        self,
        target_traces_per_second: float = 10.0,
        min_rate: float = 0.001,
        max_rate: float = 1.0,
    ):
        self.target_tps = target_traces_per_second
        self.min_rate = min_rate
        self.max_rate = max_rate
        self._request_count = 0
        self._last_adjustment = time.time()
        self._current_rate = max_rate
        self._lock = threading.Lock()

    def should_sample(
        self,
        parent_context,
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: list[Link] = None,
    ) -> SamplingResult:
        with self._lock:
            self._request_count += 1
            self._maybe_adjust_rate()
            sampler = TraceIdRatioBased(self._current_rate)
        return sampler.should_sample(
            parent_context, trace_id, name, kind, attributes, links
        )

    def _maybe_adjust_rate(self):
        """Adjust sampling rate based on recent traffic."""
        now = time.time()
        elapsed = now - self._last_adjustment
        if elapsed >= 1.0:
            actual_tps = self._request_count / elapsed
            if actual_tps > 0:
                self._current_rate = min(
                    self.max_rate,
                    max(self.min_rate, self.target_tps / actual_tps)
                )
            self._request_count = 0
            self._last_adjustment = now

    def get_description(self) -> str:
        return f"AdaptiveSampler(target_tps={self.target_tps})"


SAMPLING_CONFIG = {
    "development": {
        "type": "always_on",
        "rate": 1.0,
    },
    "staging": {
        "type": "ratio",
        "rate": 0.5,
    },
    "production": {
        "type": "adaptive",
        "target_tps": 100,
        "min_rate": 0.001,
        "max_rate": 0.1,
    },
}
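A dependency-free sketch of how a config table like `SAMPLING_CONFIG` could be turned into a sampling decision function. The `always_on` and `ratio` branches mirror the dict above; the adaptive branch is reduced to its rate clamp for brevity (a real adaptive sampler recomputes the rate from observed traffic, as `AdaptiveSampler` does):

```python
import random


def make_decider(cfg: dict, rng=random.random):
    """Build a zero-argument callable returning True when a trace should be sampled."""
    kind = cfg["type"]
    if kind == "always_on":
        return lambda: True
    if kind == "ratio":
        rate = cfg["rate"]
        return lambda: rng() < rate
    if kind == "adaptive":
        # Simplified: clamp a nominal rate into [min_rate, max_rate].
        rate = min(cfg["max_rate"], max(cfg["min_rate"], cfg.get("rate", cfg["max_rate"])))
        return lambda: rng() < rate
    raise ValueError(f"unknown sampler type: {kind}")
```

In production you would construct the corresponding OpenTelemetry `Sampler` instead; this sketch only makes the config-to-behavior mapping explicit.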
Trace Sampling Strategies¶
Sampling Overview¶
┌─────────────────────────────────────────────────────────────┐
│                Trace Sampling Decision Tree                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Request Arrives                                           │
│          │                                                  │
│          ▼                                                  │
│   ┌─────────────┐   Yes    ┌─────────────┐                  │
│   │ Error/High  │ ───────► │   Always    │                  │
│   │  Priority?  │          │   Sample    │                  │
│   └──────┬──────┘          └─────────────┘                  │
│          │ No                                               │
│          ▼                                                  │
│   ┌─────────────┐   Yes    ┌─────────────┐                  │
│   │ Debug/Test  │ ───────► │   Sample    │                  │
│   │   Header?   │          │    100%     │                  │
│   └──────┬──────┘          └─────────────┘                  │
│          │ No                                               │
│          ▼                                                  │
│   ┌─────────────┐          ┌─────────────┐                  │
│   │ Apply Rate  │ ───────► │ Probabilis- │                  │
│   │  Limiting   │          │ tic Sample  │                  │
│   └─────────────┘          └─────────────┘                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
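The tree above reduces to a few ordered checks: priority overrides first, a probabilistic fallback last. A hedged sketch (the attribute names `error`, `priority`, and `debug` are illustrative conventions, not span attributes defined by any standard):

```python
import random


def head_sampling_decision(attrs: dict, base_rate: float, rng=random.random) -> bool:
    """Walk the sampling decision tree for one incoming request."""
    if attrs.get("error") or attrs.get("priority") == "high":
        return True  # error / high priority: always sample
    if attrs.get("debug"):
        return True  # debug/test header: sample 100%
    # after rate limiting, fall back to probabilistic sampling
    return rng() < base_rate
```

Passing a deterministic `rng` makes the fallback branch testable; the `PriorityBasedSampler` below implements the same ordering against the real OpenTelemetry `Sampler` interface.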
Sampling Strategy Comparison¶
┌──────────────────┬────────────────┬────────────────┬───────────────┐
│ Strategy         │ Use Case       │ Pros           │ Cons          │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Always On        │ Development    │ Full visibility│ High overhead │
│ (100%)           │ Debugging      │ Easy debugging │ Not scalable  │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Ratio-Based      │ Staging        │ Predictable    │ May miss edge │
│ (fixed %)        │ Low traffic    │ cost control   │ cases         │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Rate-Limited     │ Production     │ Cost control   │ May miss      │
│ (traces/sec)     │ High traffic   │ Consistent     │ bursts        │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Adaptive         │ Variable       │ Self-adjusting │ Complex to    │
│ (dynamic)        │ traffic        │ Cost efficient │ configure     │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Priority-Based   │ Mixed          │ Important      │ Requires      │
│ (rules)          │ workloads      │ traces kept    │ maintenance   │
└──────────────────┴────────────────┴────────────────┴───────────────┘
Head-Based Sampling¶
Decision made at trace start:
# src/observability/sampling/head_based.py
import hashlib
from typing import Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.sdk.trace.sampling import (
    Sampler,
    SamplingResult,
    Decision,
    ParentBased,
    TraceIdRatioBased,
)
from opentelemetry.trace import Link, SpanKind
from opentelemetry.util.types import Attributes


class PriorityBasedSampler(Sampler):
    """
    Head-based sampler with priority rules.

    Samples traces based on configurable priority rules:
    - Error/exception traces: Always sampled
    - Slow requests: Always sampled
    - High-value operations: Higher sample rate
    - Normal operations: Base sample rate
    """

    def __init__(
        self,
        base_rate: float = 0.1,
        high_priority_rate: float = 1.0,
        slow_threshold_ms: float = 1000.0,
    ):
        self.base_rate = base_rate
        self.high_priority_rate = high_priority_rate
        self.slow_threshold_ms = slow_threshold_ms
        self._high_priority_operations = {
            "payment.process",
            "order.create",
            "user.authenticate",
            "checkout.complete",
        }

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: Optional[SpanKind] = None,
        attributes: Attributes = None,
        links: Optional[Sequence[Link]] = None,
    ) -> SamplingResult:
        attributes = attributes or {}

        # Always sample errors
        if attributes.get("error", False):
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "error"},
            )

        # Always sample high-priority operations
        if name in self._high_priority_operations:
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "high_priority"},
            )

        # Always sample debug requests
        if attributes.get("debug", False):
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "debug"},
            )

        # Apply probabilistic sampling for normal requests
        trace_id_bytes = trace_id.to_bytes(16, byteorder="big")
        hash_value = int(hashlib.md5(trace_id_bytes).hexdigest(), 16)
        threshold = int(self.base_rate * (2**128 - 1))
        if hash_value < threshold:
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "probabilistic"},
            )

        return SamplingResult(decision=Decision.DROP)

    def get_description(self) -> str:
        return f"PriorityBasedSampler(base_rate={self.base_rate})"


def create_parent_based_sampler(
    root_sampler: Sampler,
    remote_parent_sampled: Optional[Sampler] = None,
    remote_parent_not_sampled: Optional[Sampler] = None,
) -> ParentBased:
    """
    Create a parent-based sampler that respects parent sampling decisions.

    This ensures trace continuity across service boundaries.
    """
    return ParentBased(
        root=root_sampler,
        remote_parent_sampled=remote_parent_sampled or TraceIdRatioBased(1.0),
        remote_parent_not_sampled=remote_parent_not_sampled or TraceIdRatioBased(0.0),
    )
Tail-Based Sampling¶
Decision made after trace completion:
# src/observability/sampling/tail_based.py
import random
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class SamplingDecision(Enum):
    SAMPLE = "sample"
    DROP = "drop"
    PENDING = "pending"


@dataclass
class SpanData:
    """Represents a span waiting for sampling decision."""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    name: str
    start_time: float
    end_time: Optional[float] = None
    status: str = "OK"
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)


@dataclass
class TraceBuffer:
    """Buffer for collecting spans before sampling decision."""
    trace_id: str
    spans: List[SpanData] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    decision: SamplingDecision = SamplingDecision.PENDING


class TailBasedSampler:
    """
    Tail-based sampler that makes decisions after trace completion.

    Benefits:
    - Can sample based on full trace context
    - Captures error traces that started normally
    - Better for debugging intermittent issues

    Tradeoffs:
    - Higher memory usage (must buffer spans)
    - Higher latency (wait for trace completion)
    - More complex implementation
    """

    def __init__(
        self,
        buffer_timeout_seconds: float = 30.0,
        max_traces_in_buffer: int = 10000,
        base_sample_rate: float = 0.1,
        error_sample_rate: float = 1.0,
        slow_trace_threshold_ms: float = 5000.0,
    ):
        self.buffer_timeout = buffer_timeout_seconds
        self.max_traces = max_traces_in_buffer
        self.base_rate = base_sample_rate
        self.error_rate = error_sample_rate
        self.slow_threshold = slow_trace_threshold_ms
        self._trace_buffers: Dict[str, TraceBuffer] = {}
        self._lock = threading.Lock()

        # Start cleanup thread
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_expired_traces,
            daemon=True,
        )
        self._cleanup_thread.start()

    def add_span(self, span: SpanData) -> None:
        """Add a span to the trace buffer."""
        with self._lock:
            if span.trace_id not in self._trace_buffers:
                if len(self._trace_buffers) >= self.max_traces:
                    self._evict_oldest_trace()
                self._trace_buffers[span.trace_id] = TraceBuffer(
                    trace_id=span.trace_id
                )
            self._trace_buffers[span.trace_id].spans.append(span)

            # Check if trace is complete (root span ended)
            if self._is_trace_complete(span.trace_id):
                self._make_sampling_decision(span.trace_id)

    def _is_trace_complete(self, trace_id: str) -> bool:
        """Check if all spans in trace have ended."""
        buffer = self._trace_buffers.get(trace_id)
        if not buffer:
            return False

        # Find root span (no parent)
        root_spans = [s for s in buffer.spans if s.parent_span_id is None]
        if not root_spans:
            return False
        return all(s.end_time is not None for s in root_spans)

    def _make_sampling_decision(self, trace_id: str) -> SamplingDecision:
        """Make final sampling decision for a complete trace."""
        buffer = self._trace_buffers.get(trace_id)
        if not buffer:
            return SamplingDecision.DROP

        # Always sample traces with errors
        has_error = any(
            s.status == "ERROR" or s.attributes.get("error", False)
            for s in buffer.spans
        )
        if has_error:
            buffer.decision = SamplingDecision.SAMPLE
            self._export_trace(buffer)
            return SamplingDecision.SAMPLE

        # Always sample slow traces
        trace_duration = self._calculate_trace_duration(buffer)
        if trace_duration > self.slow_threshold:
            buffer.decision = SamplingDecision.SAMPLE
            self._export_trace(buffer)
            return SamplingDecision.SAMPLE

        # Apply probabilistic sampling
        if random.random() < self.base_rate:
            buffer.decision = SamplingDecision.SAMPLE
            self._export_trace(buffer)
            return SamplingDecision.SAMPLE

        buffer.decision = SamplingDecision.DROP
        del self._trace_buffers[trace_id]
        return SamplingDecision.DROP

    def _calculate_trace_duration(self, buffer: TraceBuffer) -> float:
        """Calculate total trace duration in milliseconds."""
        if not buffer.spans:
            return 0.0
        start_times = [s.start_time for s in buffer.spans]
        end_times = [s.end_time for s in buffer.spans if s.end_time]
        if not end_times:
            return 0.0
        return (max(end_times) - min(start_times)) * 1000

    def _export_trace(self, buffer: TraceBuffer) -> None:
        """Export sampled trace to backend."""
        # Implementation depends on export destination
        # This is where you would send to Jaeger, Zipkin, etc.
        pass

    def _evict_oldest_trace(self) -> None:
        """Evict oldest trace when buffer is full."""
        if not self._trace_buffers:
            return
        oldest_id = min(
            self._trace_buffers.keys(),
            key=lambda tid: self._trace_buffers[tid].created_at,
        )
        del self._trace_buffers[oldest_id]

    def _cleanup_expired_traces(self) -> None:
        """Background thread to clean up expired trace buffers."""
        while True:
            time.sleep(5.0)
            now = time.time()
            with self._lock:
                expired = [
                    tid for tid, buf in self._trace_buffers.items()
                    if now - buf.created_at > self.buffer_timeout
                ]
                for tid in expired:
                    # Make decision on expired traces
                    self._make_sampling_decision(tid)
Sampling Configuration by Environment¶
# config/sampling.yaml
environments:
  development:
    strategy: always_on
    rate: 1.0
    description: "Sample all traces for debugging"

  staging:
    strategy: ratio_based
    rate: 0.5
    priority_rules:
      - name: errors
        condition: "status == ERROR"
        rate: 1.0
      - name: slow_requests
        condition: "duration > 2000ms"
        rate: 1.0
      - name: auth_operations
        condition: "operation.name contains 'auth'"
        rate: 1.0
    description: "50% base rate with priority overrides"

  production:
    strategy: adaptive
    target_traces_per_second: 100
    min_rate: 0.001
    max_rate: 0.1
    priority_rules:
      - name: errors
        condition: "status == ERROR"
        rate: 1.0
      - name: slow_requests
        condition: "duration > 5000ms"
        rate: 1.0
      - name: payments
        condition: "operation.name starts_with 'payment'"
        rate: 0.5
      - name: health_checks
        condition: "operation.name == 'health_check'"
        rate: 0.0
    tail_sampling:
      enabled: true
      buffer_timeout_seconds: 30
      policies:
        - name: error_traces
          type: status_code
          status_codes: [ERROR]
          sample_rate: 1.0
        - name: latency_traces
          type: latency
          threshold_ms: 3000
          sample_rate: 1.0
    description: "Adaptive rate with tail sampling for errors"
OpenTelemetry Collector Sampling¶
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
# Probabilistic sampling
probabilistic_sampler:
hash_seed: 22
sampling_percentage: 10
# Tail-based sampling
tail_sampling:
decision_wait: 10s
num_traces: 100000
expected_new_traces_per_sec: 1000
policies:
# Always sample error traces
- name: error-policy
type: status_code
status_code:
status_codes: [ERROR]
# Always sample slow traces
- name: latency-policy
type: latency
latency:
threshold_ms: 5000
# Sample specific operations at higher rate
- name: high-value-policy
type: string_attribute
string_attribute:
key: http.route
values:
- /api/v1/checkout
- /api/v1/payment
enabled_regex_matching: true
# Rate limit normal traces
- name: rate-limiting-policy
type: rate_limiting
rate_limiting:
spans_per_second: 500
# Probabilistic fallback
- name: probabilistic-policy
type: probabilistic
probabilistic:
sampling_percentage: 10
exporters:
jaeger:
endpoint: jaeger:14250
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [otlp]
processors: [tail_sampling]
exporters: [jaeger]
metrics:
receivers: [otlp]
processors: []
exporters: [prometheus]
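The `tail_sampling` processor buffers each trace for `decision_wait`, then evaluates its policies against the completed trace; if any keep-policy matches, the trace is exported. A rough stdlib sketch of that OR-semantics (policy predicates mirror the config above; rate limiting and buffering are deliberately omitted):

```python
def tail_sample(trace: dict) -> bool:
    """OR-combine the keep policies from the collector config:
    error traces, slow traces, and high-value routes are kept."""
    policies = [
        lambda t: t["status"] == "ERROR",               # error-policy
        lambda t: t["duration_ms"] >= 5000,             # latency-policy
        lambda t: t["route"] in ("/api/v1/checkout",    # high-value-policy
                                 "/api/v1/payment"),
    ]
    return any(policy(trace) for policy in policies)


assert tail_sample({"status": "ERROR", "duration_ms": 10, "route": "/x"})
assert tail_sample({"status": "OK", "duration_ms": 6000, "route": "/x"})
assert not tail_sample({"status": "OK", "duration_ms": 50, "route": "/x"})
```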
Structured Logging¶
Python (structlog)¶
Installation:
pip install structlog python-json-logger
Logger configuration:
# src/observability/logging.py
import logging
import os
import sys
from typing import Any, Dict, Optional
import structlog
from structlog.types import EventDict, Processor
def add_trace_context(
logger: logging.Logger,
method_name: str,
event_dict: EventDict
) -> EventDict:
"""Add OpenTelemetry trace context to log events."""
from opentelemetry import trace
span = trace.get_current_span()
if span.is_recording():
ctx = span.get_span_context()
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
event_dict["trace_flags"] = ctx.trace_flags
return event_dict
def add_service_context(
logger: logging.Logger,
method_name: str,
event_dict: EventDict
) -> EventDict:
"""Add service context to log events."""
import os
event_dict["service"] = os.getenv("SERVICE_NAME", "unknown")
event_dict["version"] = os.getenv("SERVICE_VERSION", "unknown")
event_dict["environment"] = os.getenv("ENVIRONMENT", "development")
event_dict["host"] = os.getenv("HOSTNAME", "unknown")
return event_dict
def configure_logging(
level: str = "INFO",
json_format: bool = True,
add_trace: bool = True,
) -> structlog.BoundLogger:
"""Configure structured logging with structlog."""
shared_processors: list[Processor] = [
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.UnicodeDecoder(),
add_service_context,
]
if add_trace:
shared_processors.append(add_trace_context)
if json_format:
renderer = structlog.processors.JSONRenderer()
else:
renderer = structlog.dev.ConsoleRenderer(colors=True)
structlog.configure(
processors=shared_processors + [
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
formatter = structlog.stdlib.ProcessorFormatter(
foreign_pre_chain=shared_processors,
processors=[
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
renderer,
],
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.handlers = []
root_logger.addHandler(handler)
root_logger.setLevel(getattr(logging, level.upper()))
for logger_name in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
logging.getLogger(logger_name).handlers = []
logging.getLogger(logger_name).propagate = True
return structlog.get_logger()
logger = configure_logging(
level=os.getenv("LOG_LEVEL", "INFO"),
json_format=os.getenv("LOG_FORMAT", "json") == "json",
)
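Each structlog processor is just a function from event dict to event dict, which makes the pipeline easy to exercise in isolation. A small stdlib-only illustration (the processor below mimics `add_service_context`, minus the `(logger, method_name)` arguments structlog passes in; `json.dumps` stands in for `JSONRenderer`):

```python
import json
import os


def add_service_context(event: dict) -> dict:
    # Same enrichment as the structlog processor above.
    event["service"] = os.getenv("SERVICE_NAME", "unknown")
    event["environment"] = os.getenv("ENVIRONMENT", "development")
    return event


event = add_service_context({"event": "login_attempt", "level": "info"})
line = json.dumps(event, sort_keys=True)
assert '"event": "login_attempt"' in line
assert '"service"' in line
```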
Structured logging usage:
# src/services/user_service.py
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
logger = structlog.get_logger()
class UserService:
"""User service with structured logging."""
def login(self, email: str, password: str, request_context: dict) -> dict:
"""Handle user login with comprehensive logging."""
bind_contextvars(
request_id=request_context.get("request_id"),
client_ip=request_context.get("client_ip"),
user_agent=request_context.get("user_agent"),
)
logger.info(
"login_attempt",
email=email,
auth_method="password",
)
try:
user = self.repository.find_by_email(email)
if not user:
logger.warning(
"login_failed",
email=email,
reason="user_not_found",
)
raise AuthenticationError("Invalid credentials")
if not self.verify_password(password, user.password_hash):
logger.warning(
"login_failed",
email=email,
user_id=user.id,
reason="invalid_password",
failed_attempts=user.failed_login_attempts + 1,
)
self._record_failed_attempt(user)
raise AuthenticationError("Invalid credentials")
if not user.is_active:
logger.warning(
"login_failed",
email=email,
user_id=user.id,
reason="account_inactive",
)
raise AuthenticationError("Account is inactive")
token = self.create_token(user)
logger.info(
"login_success",
user_id=user.id,
email=user.email,
role=user.role,
mfa_enabled=user.mfa_enabled,
)
return {
"user_id": user.id,
"token": token,
"expires_in": 3600,
}
except AuthenticationError:
raise
except Exception as e:
logger.error(
"login_error",
email=email,
error_type=type(e).__name__,
error_message=str(e),
exc_info=True,
)
raise
finally:
clear_contextvars()
def create_user(self, user_data: dict) -> User:
"""Create a new user with audit logging."""
logger.info(
"user_creation_started",
email=user_data.get("email"),
role=user_data.get("role", "user"),
)
try:
if self.repository.find_by_email(user_data["email"]):
logger.warning(
"user_creation_failed",
email=user_data["email"],
reason="email_exists",
)
raise ValidationError("Email already registered")
user = User(
email=user_data["email"],
name=user_data["name"],
role=user_data.get("role", "user"),
)
user.set_password(user_data["password"])
self.repository.save(user)
logger.info(
"user_created",
user_id=user.id,
email=user.email,
role=user.role,
)
return user
except ValidationError:
raise
except Exception as e:
logger.error(
"user_creation_error",
email=user_data.get("email"),
error_type=type(e).__name__,
error_message=str(e),
exc_info=True,
)
raise
Context manager for request logging:
# src/middleware/logging_middleware.py
import time
import uuid
from contextlib import contextmanager
from flask import request
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
logger = structlog.get_logger()
@contextmanager
def request_logging_context(request):
"""Context manager for request-scoped logging."""
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
start_time = time.time()
bind_contextvars(
request_id=request_id,
method=request.method,
path=request.path,
client_ip=request.remote_addr,
user_agent=request.headers.get("User-Agent", ""),
)
logger.info("request_started")
try:
yield request_id
except Exception as e:
logger.error(
"request_error",
error_type=type(e).__name__,
error_message=str(e),
exc_info=True,
)
raise
finally:
duration_ms = (time.time() - start_time) * 1000
logger.info(
"request_completed",
duration_ms=round(duration_ms, 2),
)
clear_contextvars()
def logging_middleware(app):
"""Flask middleware for request logging."""
@app.before_request
def before_request():
request.start_time = time.time()
request.request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
bind_contextvars(
request_id=request.request_id,
method=request.method,
path=request.path,
client_ip=request.remote_addr,
)
logger.info("request_started")
@app.after_request
def after_request(response):
duration_ms = (time.time() - request.start_time) * 1000
logger.info(
"request_completed",
status_code=response.status_code,
duration_ms=round(duration_ms, 2),
response_size=response.content_length,
)
response.headers["X-Request-ID"] = request.request_id
clear_contextvars()
return response
return app
TypeScript (pino)¶
Installation:
npm install pino pino-pretty pino-http
Logger configuration:
// src/observability/logging.ts
import pino, { Logger, LoggerOptions } from 'pino';
import { trace, context } from '@opentelemetry/api';
interface ServiceContext {
service: string;
version: string;
environment: string;
host: string;
}
function getServiceContext(): ServiceContext {
return {
service: process.env.SERVICE_NAME || 'unknown',
version: process.env.SERVICE_VERSION || 'unknown',
environment: process.env.NODE_ENV || 'development',
host: process.env.HOSTNAME || 'unknown',
};
}
function getTraceContext(): Record<string, string> | undefined {
const span = trace.getSpan(context.active());
if (!span) return undefined;
const spanContext = span.spanContext();
return {
trace_id: spanContext.traceId,
span_id: spanContext.spanId,
trace_flags: spanContext.traceFlags.toString(),
};
}
export function createLogger(name: string): Logger {
const isDevelopment = process.env.NODE_ENV === 'development';
const options: LoggerOptions = {
name,
level: process.env.LOG_LEVEL || 'info',
base: getServiceContext(),
timestamp: pino.stdTimeFunctions.isoTime,
formatters: {
level: (label) => ({ level: label }),
},
mixin: () => {
const traceContext = getTraceContext();
return traceContext ? { ...traceContext } : {};
},
};
if (isDevelopment) {
return pino({
...options,
transport: {
target: 'pino-pretty',
options: {
colorize: true,
translateTime: 'SYS:standard',
ignore: 'pid,hostname',
},
},
});
}
return pino(options);
}
export const logger = createLogger('app');
Structured logging usage:
// src/services/user.service.ts
import { createLogger } from '../observability/logging';
const logger = createLogger('UserService');
interface LoginContext {
requestId: string;
clientIp: string;
userAgent: string;
}
export class UserService {
async login(email: string, password: string, context: LoginContext): Promise<LoginResult> {
const childLogger = logger.child({
request_id: context.requestId,
client_ip: context.clientIp,
user_agent: context.userAgent,
});
childLogger.info({ email, auth_method: 'password' }, 'login_attempt');
try {
const user = await this.repository.findByEmail(email);
if (!user) {
childLogger.warn({ email, reason: 'user_not_found' }, 'login_failed');
throw new AuthenticationError('Invalid credentials');
}
const passwordValid = await this.verifyPassword(password, user.passwordHash);
if (!passwordValid) {
childLogger.warn(
{
email,
user_id: user.id,
reason: 'invalid_password',
failed_attempts: user.failedLoginAttempts + 1,
},
'login_failed'
);
await this.recordFailedAttempt(user);
throw new AuthenticationError('Invalid credentials');
}
if (!user.isActive) {
childLogger.warn(
{ email, user_id: user.id, reason: 'account_inactive' },
'login_failed'
);
throw new AuthenticationError('Account is inactive');
}
const token = await this.createToken(user);
childLogger.info(
{
user_id: user.id,
email: user.email,
role: user.role,
mfa_enabled: user.mfaEnabled,
},
'login_success'
);
return { userId: user.id, token, expiresIn: 3600 };
} catch (error) {
if (error instanceof AuthenticationError) {
throw error;
}
childLogger.error(
{
email,
error_type: error.constructor.name,
error_message: error.message,
stack: error.stack,
},
'login_error'
);
throw error;
}
}
async createUser(userData: CreateUserDto): Promise<User> {
logger.info(
{ email: userData.email, role: userData.role || 'user' },
'user_creation_started'
);
try {
const existingUser = await this.repository.findByEmail(userData.email);
if (existingUser) {
logger.warn({ email: userData.email, reason: 'email_exists' }, 'user_creation_failed');
throw new ValidationError('Email already registered');
}
const user = new User({
email: userData.email,
name: userData.name,
role: userData.role || 'user',
});
await user.setPassword(userData.password);
await this.repository.save(user);
logger.info(
{ user_id: user.id, email: user.email, role: user.role },
'user_created'
);
return user;
} catch (error) {
if (error instanceof ValidationError) {
throw error;
}
logger.error(
{
email: userData.email,
error_type: error.constructor.name,
error_message: error.message,
stack: error.stack,
},
'user_creation_error'
);
throw error;
}
}
}
HTTP request logging middleware:
// src/middleware/logging.middleware.ts
import pinoHttp from 'pino-http';
import { createLogger } from '../observability/logging';
import { v4 as uuidv4 } from 'uuid';
import { Request, Response, NextFunction } from 'express';
const logger = createLogger('http');
export const httpLoggingMiddleware = pinoHttp({
logger,
genReqId: (req) => req.headers['x-request-id'] || uuidv4(),
customProps: (req) => ({
request_id: req.id,
}),
customLogLevel: (req, res, err) => {
if (res.statusCode >= 500 || err) return 'error';
if (res.statusCode >= 400) return 'warn';
return 'info';
},
customSuccessMessage: (req, res) => {
return `${req.method} ${req.url} completed`;
},
customErrorMessage: (req, res, err) => {
return `${req.method} ${req.url} failed: ${err.message}`;
},
customAttributeKeys: {
req: 'request',
res: 'response',
err: 'error',
responseTime: 'duration_ms',
},
serializers: {
req: (req) => ({
method: req.method,
url: req.url,
path: req.path,
query: req.query,
headers: {
'user-agent': req.headers['user-agent'],
'content-type': req.headers['content-type'],
host: req.headers.host,
},
}),
res: (res) => ({
status_code: res.statusCode,
headers: {
'content-type': res.getHeader('content-type'),
'content-length': res.getHeader('content-length'),
},
}),
},
});
export function requestIdMiddleware(req: Request, res: Response, next: NextFunction): void {
const requestId = (req.headers['x-request-id'] as string) || uuidv4();
req.id = requestId;
res.setHeader('X-Request-ID', requestId);
next();
}
Log Level Standards¶
# Log level usage standards
LOG_LEVELS = {
"DEBUG": {
"description": "Detailed diagnostic information for debugging",
"examples": [
"Variable values during execution",
"Function entry/exit points",
"Cache hits/misses",
"Query parameters",
],
"production": False,
},
"INFO": {
"description": "General operational events",
"examples": [
"Request started/completed",
"User login/logout",
"Background job started/completed",
"Configuration loaded",
],
"production": True,
},
"WARNING": {
"description": "Unexpected but handled situations",
"examples": [
"Deprecated API usage",
"Retry after transient failure",
"Rate limit approaching",
"Authentication failure",
],
"production": True,
},
"ERROR": {
"description": "Error conditions that should be investigated",
"examples": [
"Unhandled exceptions",
"External service failures",
"Database connection errors",
"Invalid data received",
],
"production": True,
},
"CRITICAL": {
"description": "System-wide failures requiring immediate attention",
"examples": [
"Application startup failure",
"Data corruption detected",
"Security breach detected",
"Critical resource exhausted",
],
"production": True,
},
}
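A startup guard built from the table above can catch misconfigured deployments, such as DEBUG logging enabled in production. A sketch (the `production` flags are reproduced minimally here; the helper name is illustrative):

```python
LOG_LEVELS_PROD = {
    "DEBUG": False, "INFO": True, "WARNING": True,
    "ERROR": True, "CRITICAL": True,
}


def validate_level(level: str, environment: str) -> str:
    """Reject log levels that are not allowed in production."""
    level = level.upper()
    if level not in LOG_LEVELS_PROD:
        raise ValueError(f"unknown log level: {level}")
    if environment == "production" and not LOG_LEVELS_PROD[level]:
        raise ValueError(f"{level} is not allowed in production")
    return level


assert validate_level("info", "production") == "INFO"
assert validate_level("debug", "development") == "DEBUG"
```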
Log Field Standards¶
# Standard log field naming conventions
STANDARD_LOG_FIELDS = {
# Request context
"request_id": "Unique request identifier (UUID)",
"trace_id": "Distributed trace ID",
"span_id": "Current span ID",
"parent_span_id": "Parent span ID",
# HTTP context
"http.method": "HTTP method (GET, POST, etc.)",
"http.url": "Full request URL",
"http.path": "Request path",
"http.status_code": "Response status code",
"http.duration_ms": "Request duration in milliseconds",
"http.client_ip": "Client IP address",
"http.user_agent": "User agent string",
# User context
"user.id": "User identifier",
"user.email": "User email (if allowed by policy)",
"user.role": "User role or permission level",
# Service context
"service.name": "Service name",
"service.version": "Service version",
"service.environment": "Deployment environment",
"service.host": "Host name or container ID",
# Error context
"error.type": "Exception class name",
"error.message": "Error message",
"error.stack": "Stack trace",
"error.code": "Application error code",
# Business context
"order.id": "Order identifier",
"payment.id": "Payment identifier",
"product.id": "Product identifier",
"transaction.id": "Transaction identifier",
# Performance context
"db.query_time_ms": "Database query time",
"cache.hit": "Cache hit/miss boolean",
"external.service": "External service name",
"external.duration_ms": "External call duration",
}
LOG_FIELD_EXAMPLES = {
"login_success": {
"event": "login_success",
"level": "info",
"timestamp": "2024-01-15T10:30:00.000Z",
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"trace_id": "abcd1234567890abcdef1234567890ab",
"span_id": "1234567890abcdef",
"service": "auth-service",
"version": "1.2.3",
"environment": "production",
"user.id": "usr_123456",
"user.email": "user@example.com",
"user.role": "admin",
"http.client_ip": "192.168.1.100",
"http.user_agent": "Mozilla/5.0...",
"auth_method": "password",
"mfa_enabled": True,
},
"request_completed": {
"event": "request_completed",
"level": "info",
"timestamp": "2024-01-15T10:30:00.500Z",
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"trace_id": "abcd1234567890abcdef1234567890ab",
"http.method": "POST",
"http.path": "/api/orders",
"http.status_code": 201,
"http.duration_ms": 245.5,
"db.query_count": 3,
"db.total_time_ms": 45.2,
},
}
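Conventions like these are easiest to keep consistent when enforced mechanically, for example in a pre-commit hook or log-schema test. A small checker for the naming rules above — snake_case segments joined by dots, with dotted names anchored to a known namespace (the namespace set derives from the table; the regex is an assumption about the intended grammar):

```python
import re

KNOWN_NAMESPACES = {
    "http", "user", "service", "error",
    "order", "payment", "product", "transaction",
    "db", "cache", "external",
}
SEGMENT = re.compile(r"^[a-z][a-z0-9_]*$")


def is_standard_field(name: str) -> bool:
    """True if a field name follows the dotted snake_case convention."""
    segments = name.split(".")
    if not all(SEGMENT.match(s) for s in segments):
        return False
    # Dotted names must start with a known top-level namespace.
    return len(segments) == 1 or segments[0] in KNOWN_NAMESPACES


assert is_standard_field("http.duration_ms")
assert is_standard_field("request_id")
assert not is_standard_field("HTTP.Method")
assert not is_standard_field("foo.bar")
```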
Log Aggregation¶
ELK Stack Configuration¶
Filebeat configuration:
# filebeat.yml
filebeat.inputs:
- type: container
paths:
- '/var/lib/docker/containers/*/*.log'
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: '/var/lib/docker/containers/'
- type: log
paths:
- '/var/log/app/*.log'
json.keys_under_root: true
json.add_error_key: true
json.message_key: message
processors:
- decode_json_fields:
fields: ['message']
target: ''
overwrite_keys: true
add_error_key: true
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_fields:
target: ''
fields:
environment: ${ENVIRONMENT:development}
output.elasticsearch:
hosts: ['${ELASTICSEARCH_HOST:localhost:9200}']
index: 'logs-%{[service.name]}-%{+yyyy.MM.dd}'
pipeline: 'logs-pipeline'
setup.template:
name: 'logs'
pattern: 'logs-*'
settings:
index.number_of_shards: 1
index.number_of_replicas: 1
setup.ilm:
enabled: true
rollover_alias: 'logs'
pattern: '{now/d}-000001'
policy_name: 'logs-policy'
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
Elasticsearch index template:
{
"index_patterns": ["logs-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.lifecycle.name": "logs-policy",
"index.lifecycle.rollover_alias": "logs"
},
"mappings": {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword",
"ignore_above": 1024
}
}
}
],
"properties": {
"@timestamp": { "type": "date" },
"level": { "type": "keyword" },
"event": { "type": "keyword" },
"message": { "type": "text" },
"service": { "type": "keyword" },
"version": { "type": "keyword" },
"environment": { "type": "keyword" },
"host": { "type": "keyword" },
"request_id": { "type": "keyword" },
"trace_id": { "type": "keyword" },
"span_id": { "type": "keyword" },
"user": {
"properties": {
"id": { "type": "keyword" },
"email": { "type": "keyword" },
"role": { "type": "keyword" }
}
},
"http": {
"properties": {
"method": { "type": "keyword" },
"path": { "type": "keyword" },
"status_code": { "type": "integer" },
"duration_ms": { "type": "float" },
"client_ip": { "type": "ip" }
}
},
"error": {
"properties": {
"type": { "type": "keyword" },
"message": { "type": "text" },
"stack": { "type": "text" }
}
}
}
}
}
}
ILM Policy:
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_age": "1d",
"max_primary_shard_size": "50gb"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"set_priority": {
"priority": 0
},
"freeze": {}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
Grafana Loki Configuration¶
Promtail configuration:
# promtail-config.yml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
tenant_id: default
batchwait: 1s
batchsize: 1048576
timeout: 10s
scrape_configs:
- job_name: containers
static_configs:
- targets:
- localhost
labels:
job: containerlogs
__path__: /var/lib/docker/containers/*/*log
pipeline_stages:
- json:
expressions:
output: log
stream: stream
timestamp: time
- json:
expressions:
level: level
event: event
service: service
trace_id: trace_id
span_id: span_id
request_id: request_id
source: output
- labels:
level:
service:
event:
- timestamp:
source: timestamp
format: RFC3339Nano
- output:
source: output
- job_name: kubernetes-pods
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels:
- __meta_kubernetes_pod_controller_name
regex: ([0-9a-z-.]+?)(-[0-9a-f]{8,10})?
action: replace
target_label: __tmp_controller_name
- source_labels:
- __meta_kubernetes_pod_label_app_kubernetes_io_name
- __meta_kubernetes_pod_label_app
- __tmp_controller_name
- __meta_kubernetes_pod_name
regex: ^;*([^;]+)(;.*)?$
action: replace
target_label: app
- source_labels:
- __meta_kubernetes_pod_annotation_kubernetes_io_config_hash
action: replace
target_label: __tmp_pod_label_hash
- source_labels:
- __meta_kubernetes_namespace
action: replace
target_label: namespace
- action: replace
replacement: /var/log/pods/*$1/*.log
separator: /
source_labels:
- __meta_kubernetes_pod_uid
- __meta_kubernetes_pod_container_name
target_label: __path__
pipeline_stages:
- cri: {}
- json:
expressions:
level: level
event: event
trace_id: trace_id
- labels:
level:
event:
LogQL query examples:
# Find all errors in the last hour
{job="containerlogs", level="error"} | json | line_format "{{.event}}: {{.message}}"
# Count errors by service
sum by (service) (count_over_time({level="error"}[1h]))
# Find slow requests (> 1000ms)
{job="containerlogs"} | json | http_duration_ms > 1000
# Trace a specific request
{job="containerlogs"} | json | request_id="550e8400-e29b-41d4-a716-446655440000"
# Find all logs for a trace
{job="containerlogs"} | json | trace_id="abcd1234567890abcdef1234567890ab"
# Error rate by service over time
sum by (service) (rate({level="error"}[5m]))
# Top 10 slowest endpoints
topk(10, avg by (http_path) (
avg_over_time({job="containerlogs"} | json | unwrap http_duration_ms [1h])
))
# Login failures by reason
sum by (reason) (count_over_time({event="login_failed"}[1h]))
AWS CloudWatch Configuration¶
CloudWatch Logs agent configuration:
{
"agent": {
"metrics_collection_interval": 60,
"run_as_user": "cwagent"
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/var/log/app/*.log",
"log_group_name": "/app/${ENVIRONMENT}/${SERVICE_NAME}",
"log_stream_name": "{instance_id}",
"timezone": "UTC",
"timestamp_format": "%Y-%m-%dT%H:%M:%S.%fZ",
"multi_line_start_pattern": "{",
"encoding": "utf-8"
}
]
}
},
"log_stream_name": "default",
"force_flush_interval": 5
}
}
CloudWatch Logs Insights queries:
# Find all errors in the last hour
fields @timestamp, @message, service, error.type, error.message
| filter level = "error"
| sort @timestamp desc
| limit 100
# Count errors by service (filter before stats, so `level` is still available)
filter level = "error"
| stats count(*) as error_count by service
| sort error_count desc
# Find slow requests
fields @timestamp, http.path, http.duration_ms, trace_id
| filter http.duration_ms > 1000
| sort http.duration_ms desc
| limit 50
# Trace a specific request
fields @timestamp, @message
| filter request_id = "550e8400-e29b-41d4-a716-446655440000"
| sort @timestamp asc
# Error rate over time (5 minute buckets)
stats count(*) as total, sum(strcontains(level, "error")) as errors by bin(5m) as time_bucket
| sort time_bucket asc
# P99 latency by endpoint
stats pct(http.duration_ms, 99) as p99_latency by http.path
| sort p99_latency desc
| limit 20
# Login failures by reason (filter before stats, so `event` is still available)
filter event = "login_failed"
| stats count(*) as failures by reason
| sort failures desc
# User activity timeline
fields @timestamp, event, user.id, http.path
| filter user.id = "usr_123456"
| sort @timestamp desc
| limit 100
Terraform for CloudWatch Logs:
# cloudwatch.tf
resource "aws_cloudwatch_log_group" "app_logs" {
name = "/app/${var.environment}/${var.service_name}"
retention_in_days = var.log_retention_days
tags = {
Environment = var.environment
Service = var.service_name
}
}
resource "aws_cloudwatch_log_metric_filter" "error_count" {
name = "${var.service_name}-error-count"
pattern = "{ $.level = \"error\" }"
log_group_name = aws_cloudwatch_log_group.app_logs.name
metric_transformation {
name = "ErrorCount"
namespace = "App/${var.service_name}"
value = "1"
default_value = "0"
dimensions = {
Service = "$.service"
}
}
}
resource "aws_cloudwatch_log_metric_filter" "request_latency" {
name = "${var.service_name}-request-latency"
pattern = "{ $.event = \"request_completed\" }"
log_group_name = aws_cloudwatch_log_group.app_logs.name
metric_transformation {
name = "RequestLatency"
namespace = "App/${var.service_name}"
value = "$.http.duration_ms"
unit = "Milliseconds"
}
}
resource "aws_cloudwatch_metric_alarm" "high_error_rate" {
alarm_name = "${var.service_name}-high-error-rate"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "ErrorCount"
namespace = "App/${var.service_name}"
period = 300
statistic = "Sum"
threshold = 10
alarm_description = "High error rate detected"
alarm_actions = [aws_sns_topic.alerts.arn]
ok_actions = [aws_sns_topic.alerts.arn]
dimensions = {
Service = var.service_name
}
}
resource "aws_cloudwatch_metric_alarm" "high_latency" {
alarm_name = "${var.service_name}-high-latency"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "RequestLatency"
namespace = "App/${var.service_name}"
period = 300
extended_statistic = "p99"
threshold = 1000
alarm_description = "P99 latency exceeded 1 second"
alarm_actions = [aws_sns_topic.alerts.arn]
}
Correlation and Context¶
Correlation ID Implementation¶
Python middleware:
# src/middleware/correlation.py
import uuid
import aiohttp
from contextvars import ContextVar
from functools import wraps
from typing import Callable, Optional
from opentelemetry import trace
from opentelemetry.propagate import extract, inject
correlation_id_var: ContextVar[Optional[str]] = ContextVar(
"correlation_id", default=None
)
def get_correlation_id() -> Optional[str]:
"""Get current correlation ID."""
return correlation_id_var.get()
def set_correlation_id(correlation_id: str) -> None:
"""Set correlation ID for current context."""
correlation_id_var.set(correlation_id)
class CorrelationMiddleware:
"""Middleware to handle correlation ID propagation."""
HEADER_NAME = "X-Correlation-ID"
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
headers = dict(scope.get("headers", []))
correlation_id = headers.get(
self.HEADER_NAME.lower().encode(),
str(uuid.uuid4()).encode()
).decode()
set_correlation_id(correlation_id)
span = trace.get_current_span()
if span.is_recording():
span.set_attribute("correlation_id", correlation_id)
async def send_wrapper(message):
if message["type"] == "http.response.start":
headers = list(message.get("headers", []))
headers.append(
(self.HEADER_NAME.lower().encode(), correlation_id.encode())
)
message["headers"] = headers
await send(message)
await self.app(scope, receive, send_wrapper)
def propagate_correlation(func: Callable) -> Callable:
"""Decorator to propagate correlation ID to async tasks."""
@wraps(func)
async def wrapper(*args, **kwargs):
correlation_id = get_correlation_id()
if correlation_id:
set_correlation_id(correlation_id)
return await func(*args, **kwargs)
return wrapper
class CorrelatedHttpClient:
"""HTTP client that propagates correlation ID."""
def __init__(self, base_url: str):
self.base_url = base_url
self.session = None
async def request(
self,
method: str,
path: str,
**kwargs
) -> dict:
headers = kwargs.pop("headers", {})
correlation_id = get_correlation_id()
if correlation_id:
headers["X-Correlation-ID"] = correlation_id
inject(headers)
if self.session is None:
self.session = aiohttp.ClientSession()
async with self.session.request(
method,
f"{self.base_url}{path}",
headers=headers,
**kwargs
) as response:
return await response.json()
async def get(self, path: str, **kwargs) -> dict:
return await self.request("GET", path, **kwargs)
async def post(self, path: str, **kwargs) -> dict:
return await self.request("POST", path, **kwargs)
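The `ContextVar` approach above works because asyncio tasks capture a copy of the current contextvars context at creation time, so the correlation ID survives across awaits and into child tasks without being passed explicitly. A stdlib-only demonstration:

```python
import asyncio
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")


async def downstream_call() -> str:
    # Runs in a child task, yet still sees the parent's value.
    return correlation_id_var.get()


async def handle_request(cid: str) -> str:
    correlation_id_var.set(cid)
    return await asyncio.create_task(downstream_call())


seen = asyncio.run(handle_request("req-123"))
assert seen == "req-123"
```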
TypeScript middleware:
// src/middleware/correlation.ts
import { Request, Response, NextFunction } from 'express';
import { AsyncLocalStorage } from 'async_hooks';
import { v4 as uuidv4 } from 'uuid';
import { trace, context } from '@opentelemetry/api';
import axios, { AxiosInstance, InternalAxiosRequestConfig } from 'axios';
interface CorrelationContext {
correlationId: string;
requestId: string;
}
const correlationStorage = new AsyncLocalStorage<CorrelationContext>();
export function getCorrelationId(): string | undefined {
return correlationStorage.getStore()?.correlationId;
}
export function getRequestId(): string | undefined {
return correlationStorage.getStore()?.requestId;
}
export function correlationMiddleware(req: Request, res: Response, next: NextFunction): void {
const correlationId = (req.headers['x-correlation-id'] as string) || uuidv4();
const requestId = (req.headers['x-request-id'] as string) || uuidv4();
const correlationContext: CorrelationContext = {
correlationId,
requestId,
};
const span = trace.getSpan(context.active());
if (span) {
span.setAttribute('correlation_id', correlationId);
span.setAttribute('request_id', requestId);
}
res.setHeader('X-Correlation-ID', correlationId);
res.setHeader('X-Request-ID', requestId);
correlationStorage.run(correlationContext, () => {
next();
});
}
export function createCorrelatedAxiosClient(baseURL: string): AxiosInstance {
const client = axios.create({ baseURL });
client.interceptors.request.use((config: InternalAxiosRequestConfig) => {
const correlationId = getCorrelationId();
const requestId = getRequestId();
if (correlationId) {
config.headers.set('X-Correlation-ID', correlationId);
}
if (requestId) {
config.headers.set('X-Request-ID', requestId);
}
return config;
});
return client;
}
export function withCorrelation<T extends (...args: unknown[]) => Promise<unknown>>(
fn: T
): T {
return (async (...args: Parameters<T>) => {
const store = correlationStorage.getStore();
if (store) {
return correlationStorage.run(store, () => fn(...args));
}
return fn(...args);
}) as T;
}
Request Context Propagation¶
Python context manager:
# src/observability/context.py
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import uuid
@dataclass
class RequestContext:
"""Request context for observability."""
request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
correlation_id: Optional[str] = None
trace_id: Optional[str] = None
span_id: Optional[str] = None
user_id: Optional[str] = None
tenant_id: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert context to dictionary."""
result = {
"request_id": self.request_id,
}
if self.correlation_id:
result["correlation_id"] = self.correlation_id
if self.trace_id:
result["trace_id"] = self.trace_id
if self.span_id:
result["span_id"] = self.span_id
if self.user_id:
result["user_id"] = self.user_id
if self.tenant_id:
result["tenant_id"] = self.tenant_id
if self.metadata:
result["metadata"] = self.metadata
return result
def to_headers(self) -> Dict[str, str]:
"""Convert context to HTTP headers."""
headers = {
"X-Request-ID": self.request_id,
}
if self.correlation_id:
headers["X-Correlation-ID"] = self.correlation_id
if self.user_id:
headers["X-User-ID"] = self.user_id
if self.tenant_id:
headers["X-Tenant-ID"] = self.tenant_id
return headers
_request_context: ContextVar[Optional[RequestContext]] = ContextVar(
"request_context", default=None
)
def get_request_context() -> Optional[RequestContext]:
"""Get current request context."""
return _request_context.get()
def set_request_context(context: RequestContext) -> None:
"""Set request context."""
_request_context.set(context)
class RequestContextManager:
"""Context manager for request context."""
def __init__(self, context: RequestContext):
self.context = context
self.token = None
def __enter__(self) -> RequestContext:
self.token = _request_context.set(self.context)
return self.context
def __exit__(self, *args):
_request_context.reset(self.token)
def extract_context_from_request(request) -> RequestContext:
"""Extract request context from HTTP request."""
from opentelemetry import trace
span = trace.get_current_span()
span_context = span.get_span_context() if span.is_recording() else None
return RequestContext(
request_id=request.headers.get("X-Request-ID", str(uuid.uuid4())),
correlation_id=request.headers.get("X-Correlation-ID"),
trace_id=format(span_context.trace_id, "032x") if span_context else None,
span_id=format(span_context.span_id, "016x") if span_context else None,
user_id=getattr(request, "user_id", None),
tenant_id=request.headers.get("X-Tenant-ID"),
)
Error Tracking¶
Sentry Integration (Python)¶
Installation:
pip install "sentry-sdk[flask]"
Configuration:
# src/observability/sentry.py
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from werkzeug.exceptions import HTTPException
# ValidationError and AuthenticationError are application-defined exception classes.
def configure_sentry(
dsn: str,
environment: str,
release: str,
sample_rate: float = 1.0,
traces_sample_rate: float = 0.1,
):
"""Configure Sentry error tracking."""
sentry_sdk.init(
dsn=dsn,
environment=environment,
release=release,
sample_rate=sample_rate,
traces_sample_rate=traces_sample_rate,
integrations=[
FlaskIntegration(),
SqlalchemyIntegration(),
CeleryIntegration(),
RedisIntegration(),
],
before_send=before_send,
before_breadcrumb=before_breadcrumb,
send_default_pii=False,
attach_stacktrace=True,
max_breadcrumbs=50,
)
def before_send(event, hint):
"""Filter or modify events before sending to Sentry."""
if "exc_info" in hint:
exc_type, exc_value, tb = hint["exc_info"]
if isinstance(exc_value, (ValidationError, AuthenticationError)):
return None
        if isinstance(exc_value, HTTPException) and exc_value.code is not None and exc_value.code < 500:
return None
if event.get("request", {}).get("url", "").endswith("/health"):
return None
from .context import get_request_context
    ctx = get_request_context()
    if ctx:
        # Only set tags that have values; Sentry tags should not be None.
        tags = {}
        if ctx.correlation_id:
            tags["correlation_id"] = ctx.correlation_id
        if ctx.tenant_id:
            tags["tenant_id"] = ctx.tenant_id
        if tags:
            event.setdefault("tags", {}).update(tags)
        event.setdefault("extra", {}).update(ctx.to_dict())
return event
def before_breadcrumb(crumb, hint):
"""Filter breadcrumbs."""
if crumb.get("category") == "http" and "/health" in crumb.get("data", {}).get("url", ""):
return None
return crumb
def capture_exception_with_context(error: Exception, **extra):
"""Capture exception with additional context."""
from .context import get_request_context
with sentry_sdk.push_scope() as scope:
        ctx = get_request_context()
        if ctx:
            if ctx.correlation_id:
                scope.set_tag("correlation_id", ctx.correlation_id)
            scope.set_tag("request_id", ctx.request_id)
            if ctx.user_id:
                scope.set_user({"id": ctx.user_id})
for key, value in extra.items():
scope.set_extra(key, value)
sentry_sdk.capture_exception(error)
Sentry Integration (TypeScript)¶
Installation:
npm install @sentry/node @sentry/tracing @sentry/profiling-node
Configuration:
// src/observability/sentry.ts
import * as Sentry from '@sentry/node';
import { ProfilingIntegration } from '@sentry/profiling-node';
import { Express } from 'express';
import { getCorrelationId, getRequestId } from './correlation';
// ValidationError, AuthenticationError, and HttpError are application-defined error classes.
interface SentryConfig {
dsn: string;
environment: string;
release: string;
sampleRate?: number;
tracesSampleRate?: number;
}
export function configureSentry(app: Express, config: SentryConfig): void {
Sentry.init({
dsn: config.dsn,
environment: config.environment,
release: config.release,
sampleRate: config.sampleRate ?? 1.0,
tracesSampleRate: config.tracesSampleRate ?? 0.1,
integrations: [
new Sentry.Integrations.Http({ tracing: true }),
new Sentry.Integrations.Express({ app }),
new ProfilingIntegration(),
],
profilesSampleRate: 0.1,
beforeSend: (event, hint) => beforeSend(event, hint),
beforeBreadcrumb: (breadcrumb) => beforeBreadcrumb(breadcrumb),
sendDefaultPii: false,
attachStacktrace: true,
maxBreadcrumbs: 50,
});
app.use(Sentry.Handlers.requestHandler());
app.use(Sentry.Handlers.tracingHandler());
}
function beforeSend(
event: Sentry.Event,
hint: Sentry.EventHint
): Sentry.Event | null {
const error = hint.originalException;
if (error instanceof ValidationError || error instanceof AuthenticationError) {
return null;
}
if (error instanceof HttpError && error.statusCode < 500) {
return null;
}
if (event.request?.url?.endsWith('/health')) {
return null;
}
const correlationId = getCorrelationId();
const requestId = getRequestId();
if (correlationId || requestId) {
event.tags = {
...event.tags,
correlation_id: correlationId,
request_id: requestId,
};
}
return event;
}
function beforeBreadcrumb(breadcrumb: Sentry.Breadcrumb): Sentry.Breadcrumb | null {
if (
breadcrumb.category === 'http' &&
breadcrumb.data?.url?.includes('/health')
) {
return null;
}
return breadcrumb;
}
export function captureExceptionWithContext(
error: Error,
extra: Record<string, unknown> = {}
): void {
Sentry.withScope((scope) => {
const correlationId = getCorrelationId();
const requestId = getRequestId();
if (correlationId) {
scope.setTag('correlation_id', correlationId);
}
if (requestId) {
scope.setTag('request_id', requestId);
}
Object.entries(extra).forEach(([key, value]) => {
scope.setExtra(key, value);
});
Sentry.captureException(error);
});
}
export const sentryErrorHandler = Sentry.Handlers.errorHandler();
Metrics Integration¶
Prometheus Metrics (Python)¶
Installation:
pip install prometheus-client
Metrics configuration:
# src/observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info, CollectorRegistry
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from functools import wraps
import time
registry = CollectorRegistry()
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code"],
    registry=registry,
)
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
    registry=registry,
)
ACTIVE_REQUESTS = Gauge(
    "http_requests_active",
    "Active HTTP requests",
    ["method"],
    registry=registry,
)
DB_QUERY_LATENCY = Histogram(
    "db_query_duration_seconds",
    "Database query latency",
    ["operation", "table"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
    registry=registry,
)
CACHE_OPERATIONS = Counter(
    "cache_operations_total",
    "Cache operations",
    ["operation", "result"],
    registry=registry,
)
ERROR_COUNT = Counter(
    "errors_total",
    "Total errors",
    ["error_type", "service"],
    registry=registry,
)
SERVICE_INFO = Info(
    "service",
    "Service information",
    registry=registry,
)
def set_service_info(name: str, version: str, environment: str):
    """Set service information."""
    SERVICE_INFO.info({
        "name": name,
        "version": version,
        "environment": environment,
    })
def track_request_metrics(method: str, endpoint: str):
    """Decorator to track request metrics."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            ACTIVE_REQUESTS.labels(method=method).inc()
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                status_code = getattr(result, "status_code", 200)
                REQUEST_COUNT.labels(
                    method=method,
                    endpoint=endpoint,
                    status_code=status_code,
                ).inc()
                return result
            except Exception as e:
                REQUEST_COUNT.labels(
                    method=method,
                    endpoint=endpoint,
                    status_code=500,
                ).inc()
                ERROR_COUNT.labels(
                    error_type=type(e).__name__,
                    service="api",
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                REQUEST_LATENCY.labels(
                    method=method,
                    endpoint=endpoint,
                ).observe(duration)
                ACTIVE_REQUESTS.labels(method=method).dec()
        return wrapper
    return decorator
def track_db_query(operation: str, table: str):
"""Context manager to track database query metrics."""
class QueryTimer:
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, *args):
duration = time.time() - self.start_time
DB_QUERY_LATENCY.labels(
operation=operation,
table=table,
).observe(duration)
return QueryTimer()
def track_cache_operation(operation: str, hit: bool):
"""Track cache operation."""
CACHE_OPERATIONS.labels(
operation=operation,
result="hit" if hit else "miss",
).inc()
def get_metrics():
"""Get metrics in Prometheus format."""
return generate_latest(registry), CONTENT_TYPE_LATEST
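Stripped of the Prometheus client, the decorator above reduces to a count-and-time skeleton: increment the in-flight gauge on entry, record the outcome, and always observe the duration and decrement in `finally`. A synchronous sketch with plain dicts standing in for the metric objects (the names here are illustrative only):

```python
import time
from functools import wraps

# Plain dicts stand in for Counter/Gauge/Histogram objects.
metrics = {"requests": 0, "errors": 0, "active": 0, "latency_sum": 0.0}

def track_request(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        metrics["active"] += 1          # gauge: increment on entry
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            metrics["requests"] += 1    # counter: success path
            return result
        except Exception:
            metrics["requests"] += 1    # counter: error path still counts the request
            metrics["errors"] += 1
            raise
        finally:
            # Latency observation and gauge decrement run on every path.
            metrics["latency_sum"] += time.perf_counter() - start
            metrics["active"] -= 1
    return wrapper

@track_request
def handler() -> str:
    return "ok"

assert handler() == "ok"
assert metrics["requests"] == 1 and metrics["active"] == 0
```

The key design point is the `finally` block: latency and the in-flight gauge must be updated whether the handler returns or raises, otherwise the gauge drifts upward on every error.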
Metric Naming Conventions¶
Prometheus Metric Naming Rules¶
┌─────────────────────────────────────────────────────────────────────┐
│ Prometheus Metric Naming Format │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Format: <namespace>_<subsystem>_<name>_<unit> │
│ │
│ Examples: │
│ ├── http_server_requests_total │
│ ├── http_server_request_duration_seconds │
│ ├── database_connections_active │
│ ├── cache_hits_total │
│ └── queue_messages_pending │
│ │
│ Rules: │
│ ├── Use snake_case (lowercase with underscores) │
│ ├── Start with a letter (a-z) │
│ ├── Use only [a-zA-Z0-9_] │
│ ├── End with unit suffix (_seconds, _bytes, _total) │
│ └── Use _total suffix for counters │
│ │
└─────────────────────────────────────────────────────────────────────┘
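These rules lend themselves to a mechanical check. A sketch of a counter-name linter for this guide's convention (a hypothetical helper, deliberately stricter than Prometheus's own name grammar, which also permits uppercase letters, leading underscores, and colons):

```python
import re

# This guide's convention: lowercase snake_case starting with a letter.
_METRIC_NAME = re.compile(r"[a-z][a-z0-9_]*")

def check_counter_name(name: str) -> list[str]:
    """Return convention violations for a counter name (empty list = passes)."""
    problems = []
    if not _METRIC_NAME.fullmatch(name):
        problems.append("not lowercase snake_case starting with a letter")
    if not name.endswith("_total"):
        problems.append("counters must end with _total")
    return problems

assert check_counter_name("http_server_requests_total") == []
assert check_counter_name("HttpRequestsTotal") == [
    "not lowercase snake_case starting with a letter",
    "counters must end with _total",
]
```

A check like this fits naturally in a code-review lint step, catching naming drift before a metric ships and becomes hard to rename.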
Metric Type Guidelines¶
# src/observability/metrics/naming.py
"""
Prometheus metric naming conventions and examples.
@module prometheus_naming
@description Standards for Prometheus metric naming
@version 1.0.0
@author Tyler Dukes
"""
from prometheus_client import Counter, Histogram, Gauge, Summary, Info
# ============================================================================
# COUNTERS - Cumulative values that only increase
# ============================================================================
# Naming: <noun>_<verb>_total or <noun>_total
# Good - Clear noun + action + _total suffix
http_requests_total = Counter(
"http_requests_total",
"Total HTTP requests processed",
["method", "endpoint", "status_code"],
)
# Good - Specific subsystem prefix
database_queries_total = Counter(
"database_queries_total",
"Total database queries executed",
["operation", "table"],
)
# Good - Error counter
errors_total = Counter(
"errors_total",
"Total errors encountered",
["error_type", "service"],
)
# Bad - Missing _total suffix
# http_requests = Counter(...) # Don't do this
# Bad - Using verb form instead of noun
# request_count = Counter(...) # Don't do this
# ============================================================================
# GAUGES - Values that can increase or decrease
# ============================================================================
# Naming: <noun>_<state> or <noun>_<measurement>
# Good - Current state
http_requests_in_flight = Gauge(
"http_requests_in_flight",
"Number of HTTP requests currently being processed",
["method"],
)
# Good - Resource capacity
database_connections_active = Gauge(
"database_connections_active",
"Number of active database connections",
["pool_name"],
)
database_connections_max = Gauge(
"database_connections_max",
"Maximum database connections allowed",
["pool_name"],
)
# Good - Memory/resource usage
memory_usage_bytes = Gauge(
"memory_usage_bytes",
"Current memory usage in bytes",
["process"],
)
# Good - Queue depth
queue_messages_pending = Gauge(
"queue_messages_pending",
"Number of messages waiting in queue",
["queue_name"],
)
# ============================================================================
# HISTOGRAMS - Distributions of values
# ============================================================================
# Naming: <noun>_<unit> (generates _bucket, _sum, _count)
# Good - Duration in seconds
http_request_duration_seconds = Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
["method", "endpoint"],
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
# Good - Size in bytes
http_request_size_bytes = Histogram(
"http_request_size_bytes",
"HTTP request body size in bytes",
["method", "endpoint"],
buckets=[100, 1000, 10000, 100000, 1000000, 10000000],
)
# Good - Database query latency
database_query_duration_seconds = Histogram(
"database_query_duration_seconds",
"Database query duration in seconds",
["operation", "table"],
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)
# ============================================================================
# SUMMARIES - Quantile distributions
# ============================================================================
# Naming: Same as histograms
# Good - Response time with quantiles
http_response_time_seconds = Summary(
"http_response_time_seconds",
"HTTP response time in seconds",
["method", "endpoint"],
)
# ============================================================================
# INFO - Static metadata
# ============================================================================
# Naming: <noun>_info
# Good - Application metadata
app_info = Info(
"app",
"Application information",
)
app_info.info({
"version": "1.2.3",
"environment": "production",
"commit": "abc123",
})
# Good - Build information
build_info = Info(
"build",
"Build information",
)
Unit Suffixes¶
┌────────────────┬─────────────────────┬─────────────────────────────┐
│ Unit │ Suffix │ Example │
├────────────────┼─────────────────────┼─────────────────────────────┤
│ Time │ _seconds │ request_duration_seconds │
│ Bytes │ _bytes │ response_size_bytes │
│ Ratio/Percent │ _ratio or _percent │ cache_hit_ratio │
│ Count (total) │ _total │ requests_total │
│ Temperature │ _celsius │ cpu_temperature_celsius │
│ Timestamp │ _timestamp_seconds │ last_success_timestamp_seconds │
│ No unit │ (none) │ queue_length │
└────────────────┴─────────────────────┴─────────────────────────────┘
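The table can be expressed as a lookup for use in a metrics linter (a sketch; the mapping mirrors the table and is this guide's convention, not something Prometheus itself enforces):

```python
# Unit -> expected metric-name suffix, mirroring the table above.
UNIT_SUFFIXES = {
    "time": "_seconds",
    "bytes": "_bytes",
    "ratio": "_ratio",
    "percent": "_percent",
    "count": "_total",
    "temperature": "_celsius",
    "timestamp": "_timestamp_seconds",
}

def suffix_for(unit: str) -> str:
    """Expected metric-name suffix for a unit; '' for unitless metrics."""
    return UNIT_SUFFIXES.get(unit, "")

assert "request_duration" + suffix_for("time") == "request_duration_seconds"
assert suffix_for("none") == ""
```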
Naming Anti-Patterns¶
# ============================================================================
# ANTI-PATTERNS - Don't do these
# ============================================================================
# ❌ Bad: Inconsistent casing
# HttpRequestsTotal = Counter(...) # Use snake_case
# HTTP_REQUESTS_TOTAL = Counter(...) # Use lowercase
# ❌ Bad: Missing unit suffix
# request_duration = Histogram(...) # Use _seconds
# ❌ Bad: Wrong metric type
# request_count = Gauge(...) # Counters should use Counter type
# current_requests = Counter(...) # Gauges should use Gauge type
# ❌ Bad: Redundant labels
# requests_total with labels ["method", "http_method"] # Duplicated
# ❌ Bad: High-cardinality labels
# requests_total with labels ["user_id"] # Unbounded values
# ❌ Bad: Inconsistent naming across metrics
# http_requests_total vs http_request_duration_seconds
# ("requests" is plural, "request" is singular - pick one form)
# ❌ Bad: Using verbs instead of nouns
# process_requests_total # Use noun: requests_processed_total
# ❌ Bad: Abbreviations
# req_total # Use full words: requests_total
# db_conn # Use full words: database_connections
# ✅ Good: Consistent naming pattern
http_server_requests_total = Counter(...)
http_server_request_duration_seconds = Histogram(...)
http_server_request_size_bytes = Histogram(...)
http_server_response_size_bytes = Histogram(...)
Label and Tag Standards¶
Label Best Practices¶
┌─────────────────────────────────────────────────────────────────────┐
│ Label Cardinality Guidelines │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Low Cardinality (GOOD): │
│ ├── method: GET, POST, PUT, DELETE (~10 values) │
│ ├── status_code: 200, 201, 400, 404, 500 (~20 values) │
│ ├── environment: dev, staging, prod (3 values) │
│ └── service: api, worker, scheduler (~10 values) │
│ │
│ High Cardinality (BAD): │
│ ├── user_id: Unbounded (millions of values) │
│ ├── request_id: Unique per request (infinite) │
│ ├── email: Unbounded (millions of values) │
│ └── timestamp: Unique per event (infinite) │
│ │
│ Cardinality Impact (series count = product of per-label values): │
│ ├── 2 labels × 10 values each = 100 time series │
│ ├── 3 labels × 10 values each = 1,000 time series │
│ ├── 4 labels × 10 values each = 10,000 time series │
│ └── Memory usage grows linearly with time series count │
│ │
└─────────────────────────────────────────────────────────────────────┘
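The series count for a metric is the product of the distinct values each of its labels takes, which is why a single unbounded label dominates everything else. A quick sketch of the arithmetic (the `estimated_series` helper is illustrative):

```python
from math import prod

def estimated_series(label_values: dict[str, int]) -> int:
    """Worst-case time series for one metric: product of per-label value counts."""
    return prod(label_values.values())

# Two well-bounded labels stay cheap:
assert estimated_series({"method": 10, "status_category": 4}) == 40

# One unbounded label (say, user_id with a million values) explodes the count:
assert estimated_series({"method": 10, "status_category": 4, "user_id": 1_000_000}) == 40_000_000
```

In practice the realized count is lower (not every combination occurs), but capacity planning should assume the worst case.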
Standard Label Definitions¶
# src/observability/metrics/labels.py
"""
Standard label definitions for consistent metrics.
@module prometheus_labels
@description Standardized label names and values
@version 1.0.0
@author Tyler Dukes
"""
from enum import Enum
from typing import Dict, List, Optional
from dataclasses import dataclass
class HttpMethod(Enum):
"""Standard HTTP method labels."""
GET = "GET"
POST = "POST"
PUT = "PUT"
PATCH = "PATCH"
DELETE = "DELETE"
HEAD = "HEAD"
OPTIONS = "OPTIONS"
class HttpStatusCategory(Enum):
"""HTTP status code categories for reduced cardinality."""
SUCCESS = "2xx"
REDIRECT = "3xx"
CLIENT_ERROR = "4xx"
SERVER_ERROR = "5xx"
class Environment(Enum):
"""Standard environment labels."""
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
class DatabaseOperation(Enum):
"""Standard database operation labels."""
SELECT = "select"
INSERT = "insert"
UPDATE = "update"
DELETE = "delete"
TRANSACTION = "transaction"
class CacheOperation(Enum):
"""Standard cache operation labels."""
GET = "get"
SET = "set"
DELETE = "delete"
EXPIRE = "expire"
class CacheResult(Enum):
"""Standard cache result labels."""
HIT = "hit"
MISS = "miss"
ERROR = "error"
@dataclass
class LabelConfig:
"""Configuration for a metric label."""
name: str
description: str
allowed_values: Optional[List[str]] = None
max_cardinality: int = 100
# Standard label configurations
STANDARD_LABELS: Dict[str, LabelConfig] = {
"method": LabelConfig(
name="method",
description="HTTP method (GET, POST, PUT, DELETE, etc.)",
allowed_values=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
max_cardinality=10,
),
"status_code": LabelConfig(
name="status_code",
description="HTTP status code (200, 201, 400, 404, 500, etc.)",
max_cardinality=50,
),
"status_category": LabelConfig(
name="status_category",
description="HTTP status category (2xx, 3xx, 4xx, 5xx)",
allowed_values=["2xx", "3xx", "4xx", "5xx"],
max_cardinality=4,
),
"endpoint": LabelConfig(
name="endpoint",
description="API endpoint path (normalized, no IDs)",
max_cardinality=100,
),
"service": LabelConfig(
name="service",
description="Service name",
max_cardinality=50,
),
"environment": LabelConfig(
name="environment",
description="Deployment environment",
allowed_values=["development", "staging", "production"],
max_cardinality=5,
),
"error_type": LabelConfig(
name="error_type",
description="Error classification",
max_cardinality=50,
),
"database": LabelConfig(
name="database",
description="Database name",
max_cardinality=20,
),
"table": LabelConfig(
name="table",
description="Database table name",
max_cardinality=100,
),
"operation": LabelConfig(
name="operation",
description="Operation type (select, insert, update, delete)",
allowed_values=["select", "insert", "update", "delete", "transaction"],
max_cardinality=10,
),
"cache": LabelConfig(
name="cache",
description="Cache instance name",
max_cardinality=20,
),
"result": LabelConfig(
name="result",
description="Operation result (hit, miss, error)",
allowed_values=["hit", "miss", "error", "success", "failure"],
max_cardinality=10,
),
"queue": LabelConfig(
name="queue",
description="Message queue name",
max_cardinality=50,
),
}
def normalize_endpoint(path: str) -> str:
    """
    Normalize endpoint path to reduce cardinality.
    Examples:
        /users/123 -> /users/{id}
        /orders/550e8400-e29b-41d4-a716-446655440000/items/456 -> /orders/{uuid}/items/{id}
    """
import re
# Replace UUIDs
path = re.sub(
r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
'{uuid}',
path,
flags=re.IGNORECASE,
)
# Replace numeric IDs
path = re.sub(r'/\d+(?=/|$)', '/{id}', path)
# Replace alphanumeric IDs (common patterns)
path = re.sub(r'/[a-zA-Z0-9]{20,}(?=/|$)', '/{id}', path)
return path
def normalize_status_code(status_code: int) -> str:
"""
Normalize status code to category for reduced cardinality.
Examples:
200, 201, 204 -> "2xx"
400, 401, 404 -> "4xx"
500, 502, 503 -> "5xx"
"""
category = status_code // 100
return f"{category}xx"
def validate_label_value(label_name: str, value: str) -> bool:
    """
    Validate a label value against its allow-list, if one is defined.
    Labels without a configuration or without an allow-list pass by default.
    """
    config = STANDARD_LABELS.get(label_name)
    if not config:
        return True
    if config.allowed_values:
        return value in config.allowed_values
    return True
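A standalone check of the two normalizers' documented behavior (re-stated here in compact form so the snippet runs without the module above):

```python
import re

def normalize_endpoint(path: str) -> str:
    # UUIDs first, then purely numeric segments, then long opaque IDs.
    path = re.sub(
        r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
        "{uuid}",
        path,
        flags=re.IGNORECASE,
    )
    path = re.sub(r"/\d+(?=/|$)", "/{id}", path)
    path = re.sub(r"/[a-zA-Z0-9]{20,}(?=/|$)", "/{id}", path)
    return path

def normalize_status_code(status_code: int) -> str:
    return f"{status_code // 100}xx"

assert normalize_endpoint("/users/123") == "/users/{id}"
assert normalize_endpoint(
    "/orders/550e8400-e29b-41d4-a716-446655440000/items/7"
) == "/orders/{uuid}/items/{id}"
assert normalize_status_code(204) == "2xx"
assert normalize_status_code(503) == "5xx"
```

Note the ordering: the UUID pattern must run before the generic long-ID pattern, or hex UUID segments would be collapsed to `{id}` and lose their more specific placeholder.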
Label Usage Examples¶
# src/observability/metrics/usage_examples.py
"""
Examples of proper label usage in metrics.
"""
from prometheus_client import Counter, Histogram, Gauge
from .labels import normalize_endpoint, normalize_status_code
# ============================================================================
# GOOD: Proper label usage
# ============================================================================
# Good: Low-cardinality labels only
http_requests = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status_category"],
)
# Usage with normalized values
def record_request(method: str, path: str, status_code: int):
http_requests.labels(
method=method.upper(),
endpoint=normalize_endpoint(path), # /users/123 -> /users/{id}
status_category=normalize_status_code(status_code), # 200 -> 2xx
).inc()
# Good: Bounded set of label values
cache_operations = Counter(
"cache_operations_total",
"Cache operations",
["operation", "result"],
)
# Usage
def record_cache_operation(operation: str, hit: bool):
cache_operations.labels(
operation=operation, # get, set, delete
result="hit" if hit else "miss",
).inc()
# Good: Database metrics with bounded labels
db_queries = Histogram(
"database_query_duration_seconds",
"Database query duration",
["operation", "table"],
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)
# ============================================================================
# BAD: Anti-patterns to avoid
# ============================================================================
# ❌ Bad: User ID as label (high cardinality)
# user_requests = Counter(
# "user_requests_total",
# "Requests per user",
# ["user_id"], # Unbounded! Could be millions of users
# )
# ❌ Bad: Request ID as label (infinite cardinality)
# request_latency = Histogram(
# "request_latency_seconds",
# "Request latency",
# ["request_id"], # Every request is unique!
# )
# ❌ Bad: Full URL path as label (high cardinality)
# url_requests = Counter(
# "url_requests_total",
# "Requests per URL",
# ["url"], # /users/1, /users/2, /users/3... unbounded!
# )
# ❌ Bad: Timestamp as label
# events = Counter(
# "events_total",
# "Events",
# ["timestamp"], # Every second is unique!
# )
# ❌ Bad: Error message as label
# errors = Counter(
# "errors_total",
# "Errors",
# ["message"], # Error messages vary widely!
# )
# ✅ Instead, use error types:
errors = Counter(
"errors_total",
"Errors by type",
["error_type", "service"], # Bounded set of error types
)
Cardinality Monitoring¶
# src/observability/metrics/cardinality.py
"""
Tools for monitoring and controlling metric cardinality.
"""
from prometheus_client import Gauge, REGISTRY
from typing import Dict, List, Tuple
import logging
logger = logging.getLogger(__name__)
# Metric to track cardinality
metric_cardinality = Gauge(
"prometheus_metric_cardinality",
"Number of time series per metric",
["metric_name"],
)
def get_metric_cardinality() -> Dict[str, int]:
"""
Get the cardinality (number of time series) for each metric.
"""
cardinality: Dict[str, int] = {}
for metric in REGISTRY.collect():
count = len(list(metric.samples))
cardinality[metric.name] = count
return cardinality
def check_cardinality_limits(
limits: Dict[str, int],
action: str = "warn",
) -> List[Tuple[str, int, int]]:
"""
Check if any metrics exceed their cardinality limits.
Args:
limits: Dict mapping metric names to max cardinality
action: "warn" to log warning, "error" to raise exception
Returns:
List of (metric_name, current_cardinality, limit) for violations
"""
cardinality = get_metric_cardinality()
violations = []
for metric_name, limit in limits.items():
current = cardinality.get(metric_name, 0)
if current > limit:
violations.append((metric_name, current, limit))
if action == "warn":
logger.warning(
"Metric cardinality exceeded",
extra={
"metric": metric_name,
"current": current,
"limit": limit,
},
)
elif action == "error":
raise ValueError(
f"Metric {metric_name} cardinality {current} exceeds limit {limit}"
)
return violations
# Recommended cardinality limits
CARDINALITY_LIMITS = {
"http_requests_total": 1000,
"http_request_duration_seconds": 1000,
"database_queries_total": 500,
"cache_operations_total": 100,
"errors_total": 200,
}
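The limit check itself needs nothing from Prometheus: given any name-to-series-count mapping, it is a plain threshold scan. The same logic as a self-contained sketch:

```python
from typing import Dict, List, Tuple

def check_cardinality_limits(
    cardinality: Dict[str, int],
    limits: Dict[str, int],
) -> List[Tuple[str, int, int]]:
    """Return (metric, current, limit) for every metric over its budget."""
    return [
        (name, cardinality.get(name, 0), limit)
        for name, limit in limits.items()
        if cardinality.get(name, 0) > limit
    ]

observed = {"http_requests_total": 1200, "errors_total": 40}
limits = {"http_requests_total": 1000, "errors_total": 200}

assert check_cardinality_limits(observed, limits) == [
    ("http_requests_total", 1200, 1000)
]
```

Running a check like this on a schedule (or in CI against a staging registry) surfaces cardinality regressions before they reach the production Prometheus.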
Grafana Dashboards¶
Dashboard Structure Standards¶
┌─────────────────────────────────────────────────────────────────────┐
│ Dashboard Organization │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Row 1: Overview / Key Metrics │
│ ├── Total Requests (stat panel) │
│ ├── Error Rate (stat panel) │
│ ├── P99 Latency (stat panel) │
│ └── Active Users (stat panel) │
│ │
│ Row 2: Request Metrics │
│ ├── Requests per Second (time series) │
│ ├── Request Duration (heatmap) │
│ └── Status Code Distribution (pie chart) │
│ │
│ Row 3: Error Analysis │
│ ├── Error Rate by Endpoint (time series) │
│ ├── Error Types (bar chart) │
│ └── Recent Errors (table) │
│ │
│ Row 4: Infrastructure │
│ ├── CPU Usage (time series) │
│ ├── Memory Usage (time series) │
│ └── Database Connections (gauge) │
│ │
└─────────────────────────────────────────────────────────────────────┘
Dashboard Template (JSON)¶
{
"__inputs": [
{
"name": "DS_PROMETHEUS",
"label": "Prometheus",
"description": "Prometheus data source",
"type": "datasource",
"pluginId": "prometheus",
"pluginName": "Prometheus"
}
],
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "10.0.0"
},
{
"type": "datasource",
"id": "prometheus",
"name": "Prometheus",
"version": "1.0.0"
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
},
{
"datasource": "${DS_PROMETHEUS}",
"enable": true,
"expr": "ALERTS{alertstate=\"firing\", service=\"$service\"}",
"iconColor": "red",
"name": "Alerts",
"step": "60s",
"tagKeys": "alertname",
"textFormat": "{{alertname}}: {{message}}",
"titleFormat": "Alert"
}
]
},
"description": "Service overview dashboard with key metrics",
"editable": true,
"gnetId": null,
"graphTooltip": 1,
"id": null,
"iteration": 1700000000000,
"links": [
{
"asDropdown": true,
"icon": "external link",
"includeVars": true,
"keepTime": true,
"tags": ["service"],
"targetBlank": true,
"title": "Related Dashboards",
"type": "dashboards"
}
],
"panels": [
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
"id": 1,
"panels": [],
"title": "Overview",
"type": "row"
},
{
"datasource": "${DS_PROMETHEUS}",
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null }
]
},
"unit": "reqps"
}
},
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 1 },
"id": 2,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.0.0",
"targets": [
{
"expr": "sum(rate(http_requests_total{service=\"$service\"}[$__rate_interval]))",
"legendFormat": "Requests/sec",
"refId": "A"
}
],
"title": "Request Rate",
"type": "stat"
},
{
"datasource": "${DS_PROMETHEUS}",
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 1 },
{ "color": "red", "value": 5 }
]
},
"unit": "percent"
}
},
"gridPos": { "h": 4, "w": 6, "x": 6, "y": 1 },
"id": 3,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
"textMode": "auto"
},
"targets": [
{
"expr": "sum(rate(http_requests_total{service=\"$service\", status_code=~\"5..\"}[$__rate_interval])) / sum(rate(http_requests_total{service=\"$service\"}[$__rate_interval])) * 100",
"legendFormat": "Error Rate",
"refId": "A"
}
],
"title": "Error Rate",
"type": "stat"
},
{
"datasource": "${DS_PROMETHEUS}",
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 0.5 },
{ "color": "red", "value": 1 }
]
},
"unit": "s"
}
},
"gridPos": { "h": 4, "w": 6, "x": 12, "y": 1 },
"id": 4,
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{service=\"$service\"}[$__rate_interval])) by (le))",
"legendFormat": "P99 Latency",
"refId": "A"
}
],
"title": "P99 Latency",
"type": "stat"
}
],
"refresh": "30s",
"schemaVersion": 38,
"style": "dark",
"tags": ["service", "overview", "prometheus"],
"templating": {
"list": [
{
"current": {},
"datasource": "${DS_PROMETHEUS}",
"definition": "label_values(http_requests_total, service)",
"hide": 0,
"includeAll": false,
"label": "Service",
"multi": false,
"name": "service",
"options": [],
"query": {
"query": "label_values(http_requests_total, service)",
"refId": "StandardVariableQuery"
},
"refresh": 2,
"regex": "",
"skipUrlSync": false,
"sort": 1,
"type": "query"
},
{
"current": {},
"datasource": "${DS_PROMETHEUS}",
"definition": "label_values(http_requests_total{service=\"$service\"}, environment)",
"hide": 0,
"includeAll": true,
"label": "Environment",
"multi": true,
"name": "environment",
"options": [],
"query": {
"query": "label_values(http_requests_total{service=\"$service\"}, environment)",
"refId": "StandardVariableQuery"
},
"refresh": 2,
"regex": "",
"skipUrlSync": false,
"sort": 1,
"type": "query"
}
]
},
"time": { "from": "now-1h", "to": "now" },
"timepicker": {
"refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m", "30m", "1h"],
"time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d", "30d"]
},
"timezone": "browser",
"title": "Service Overview",
"uid": "service-overview",
"version": 1
}
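The P99 stat panel relies on PromQL's `histogram_quantile`, which linearly interpolates inside the bucket where the target rank falls. A simplified sketch of that interpolation (it ignores the `+Inf` bucket and PromQL's other edge cases):

```python
import math
from typing import List, Tuple

def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """
    Estimate the q-quantile from cumulative histogram buckets.
    buckets: (upper_bound, cumulative_count) pairs, sorted by upper bound.
    """
    total = buckets[-1][1]
    rank = q * total
    prev_le, prev_count = 0.0, 0.0
    for le, count in buckets:
        if count >= rank:
            # Linear interpolation within the bucket containing the rank.
            return prev_le + (le - prev_le) * (rank - prev_count) / (count - prev_count)
        prev_le, prev_count = le, count
    return buckets[-1][0]

# 90 of 100 requests finished under 0.5s, the rest under 1.0s:
buckets = [(0.1, 50.0), (0.5, 90.0), (1.0, 100.0)]
assert math.isclose(histogram_quantile(0.99, buckets), 0.95)
assert math.isclose(histogram_quantile(0.5, buckets), 0.1)
```

This is also why bucket boundaries matter: the estimate can never be more precise than the bucket containing the quantile, so buckets should be dense around the latencies you alert on.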
Dashboard Variables¶
# grafana/provisioning/dashboards/variables.yaml
# Standard variables for all dashboards
variables:
# Service selector
- name: service
type: query
datasource: prometheus
query: "label_values(up, service)"
refresh: 2 # On time range change
sort: 1 # Alphabetical ascending
multi: false
includeAll: false
# Environment selector
- name: environment
type: query
datasource: prometheus
query: "label_values(up{service=\"$service\"}, environment)"
refresh: 2
sort: 1
multi: true
includeAll: true
allValue: ".*"
# Instance selector
- name: instance
type: query
datasource: prometheus
query: "label_values(up{service=\"$service\", environment=~\"$environment\"}, instance)"
refresh: 2
sort: 1
multi: true
includeAll: true
# Rate interval (for rate() functions)
- name: rate_interval
type: interval
auto: true
auto_min: "1m"
options:
- "1m"
- "5m"
- "10m"
- "30m"
- "1h"
# Custom interval for aggregations
- name: interval
type: custom
options:
- value: "1m"
text: "1 minute"
- value: "5m"
text: "5 minutes"
- value: "1h"
text: "1 hour"
default: "5m"
Alert Rules¶
# grafana/provisioning/alerting/rules.yaml
apiVersion: 1
groups:
- orgId: 1
name: Service Alerts
folder: Alerts
interval: 1m
rules:
# High Error Rate Alert
- uid: high-error-rate
title: High Error Rate
condition: C
data:
- refId: A
relativeTimeRange:
from: 300
to: 0
datasourceUid: prometheus
model:
expr: |
sum(rate(http_requests_total{status_code=~"5.."}[5m])) by (service)
/
sum(rate(http_requests_total[5m])) by (service)
* 100
intervalMs: 1000
maxDataPoints: 43200
refId: A
- refId: B
relativeTimeRange:
from: 300
to: 0
datasourceUid: __expr__
model:
conditions:
- evaluator:
params: []
type: gt
operator:
type: and
query:
params:
- B
reducer:
type: last
refId: B
type: reduce
reducer: last
expression: A
- refId: C
relativeTimeRange:
from: 300
to: 0
datasourceUid: __expr__
model:
conditions:
- evaluator:
params:
- 5 # 5% error rate threshold
type: gt
operator:
type: and
query:
params:
- C
refId: C
type: threshold
expression: B
noDataState: NoData
execErrState: Error
for: 5m
annotations:
summary: "High error rate for {{ $labels.service }}"
description: |
Error rate is {{ $values.B.Value | printf "%.2f" }}% for service {{ $labels.service }}.
This exceeds the 5% threshold.
runbook_url: "https://runbooks.example.com/high-error-rate"
labels:
severity: critical
team: platform
# High Latency Alert
- uid: high-latency
title: High P99 Latency
condition: C
data:
- refId: A
datasourceUid: prometheus
model:
expr: |
histogram_quantile(0.99,
sum(rate(http_request_duration_seconds_bucket[5m])) by (le, service)
)
refId: A
- refId: B
datasourceUid: __expr__
model:
refId: B
type: reduce
reducer: last
expression: A
- refId: C
datasourceUid: __expr__
model:
refId: C
type: threshold
expression: B
conditions:
- evaluator:
params:
- 2 # 2 second threshold
type: gt
for: 5m
annotations:
summary: "High latency for {{ $labels.service }}"
description: |
P99 latency is {{ $values.B.Value | printf "%.3f" }}s for service {{ $labels.service }}.
labels:
severity: warning
team: platform
# Low Request Rate (potential outage)
- uid: low-request-rate
title: Abnormally Low Request Rate
condition: C
data:
- refId: A
datasourceUid: prometheus
model:
expr: |
sum(rate(http_requests_total[5m])) by (service)
- refId: B
datasourceUid: __expr__
model:
type: reduce
reducer: last
expression: A
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: B
conditions:
- evaluator:
params:
- 1 # Less than 1 req/s
type: lt
for: 10m
annotations:
summary: "Low request rate for {{ $labels.service }}"
description: |
Request rate is {{ $values.B.Value | printf "%.2f" }} req/s for {{ $labels.service }}.
This may indicate an outage or routing issue.
labels:
severity: warning
team: platform
# Database Connection Pool Exhaustion
- uid: db-connections-high
title: Database Connections Near Limit
condition: C
data:
- refId: A
datasourceUid: prometheus
model:
expr: |
database_connections_active / database_connections_max * 100
- refId: B
datasourceUid: __expr__
model:
type: reduce
reducer: last
expression: A
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: B
conditions:
- evaluator:
params:
- 80 # 80% threshold
type: gt
for: 5m
annotations:
summary: "Database connection pool near exhaustion"
description: |
{{ $values.B.Value | printf "%.1f" }}% of database connections are in use.
labels:
severity: warning
team: database
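All of the rules above share the same shape: a Prometheus query (A), a reduce step (B), and a threshold condition (C), plus a severity label and a summary annotation. A lightweight structural check along these lines can run in CI before rules are provisioned; this is a sketch operating on an already-parsed rule dict, and the example rule is abbreviated:

```python
def validate_alert_rule(rule):
    """Structural checks for the query -> reduce -> threshold rule shape."""
    errors = []
    if "condition" not in rule:
        errors.append("missing condition")
    ref_ids = {d["refId"] for d in rule.get("data", [])}
    if rule.get("condition") not in ref_ids:
        errors.append(f"condition {rule.get('condition')!r} has no matching refId")
    if rule.get("labels", {}).get("severity") not in {"critical", "warning", "info"}:
        errors.append("severity label missing or unknown")
    if "summary" not in rule.get("annotations", {}):
        errors.append("missing summary annotation")
    return errors

rule = {
    "uid": "high-error-rate",
    "condition": "C",
    "data": [{"refId": "A"}, {"refId": "B"}, {"refId": "C"}],
    "labels": {"severity": "critical"},
    "annotations": {"summary": "High error rate"},
}
assert validate_alert_rule(rule) == []
```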
Dashboard Provisioning¶
# grafana/provisioning/dashboards/dashboards.yaml
apiVersion: 1
providers:
- name: 'default'
orgId: 1
folder: 'Services'
folderUid: 'services'
type: file
disableDeletion: false
updateIntervalSeconds: 30
allowUiUpdates: true
options:
path: /var/lib/grafana/dashboards/services
- name: 'infrastructure'
orgId: 1
folder: 'Infrastructure'
folderUid: 'infrastructure'
type: file
disableDeletion: false
options:
path: /var/lib/grafana/dashboards/infrastructure
- name: 'alerts'
orgId: 1
folder: 'Alerts'
folderUid: 'alerts'
type: file
disableDeletion: false
options:
path: /var/lib/grafana/dashboards/alerts
Dashboard Design Patterns¶
# scripts/grafana/dashboard_generator.py
"""
Dashboard generator following design patterns.
@module dashboard_generator
@description Generate consistent Grafana dashboards
@version 1.0.0
@author Tyler Dukes
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any
from enum import Enum
import json
class PanelType(Enum):
STAT = "stat"
TIMESERIES = "timeseries"
TABLE = "table"
GAUGE = "gauge"
HEATMAP = "heatmap"
PIECHART = "piechart"
BARGAUGE = "bargauge"
TEXT = "text"
@dataclass
class Panel:
"""Dashboard panel configuration."""
title: str
type: PanelType
query: str
grid_pos: Dict[str, int]
unit: str = ""
thresholds: List[Dict[str, Any]] = field(default_factory=list)
legend_format: str = ""
description: str = ""
@dataclass
class Row:
"""Dashboard row configuration."""
title: str
panels: List[Panel]
collapsed: bool = False
@dataclass
class DashboardConfig:
"""Dashboard configuration."""
title: str
uid: str
description: str
tags: List[str]
refresh: str = "30s"
rows: List[Row] = field(default_factory=list)
variables: List[Dict[str, Any]] = field(default_factory=list)
class DashboardGenerator:
"""Generate Grafana dashboards from configuration."""
# Standard color schemes
COLORS = {
"green": "#73BF69",
"yellow": "#FADE2A",
"orange": "#FF9830",
"red": "#F2495C",
"blue": "#5794F2",
"purple": "#B877D9",
}
# Standard thresholds
ERROR_RATE_THRESHOLDS = [
{"color": "green", "value": None},
{"color": "yellow", "value": 1},
{"color": "orange", "value": 3},
{"color": "red", "value": 5},
]
LATENCY_THRESHOLDS = [
{"color": "green", "value": None},
{"color": "yellow", "value": 0.5},
{"color": "orange", "value": 1},
{"color": "red", "value": 2},
]
def __init__(self, config: DashboardConfig):
self.config = config
self._panel_id = 0
def _next_panel_id(self) -> int:
self._panel_id += 1
return self._panel_id
def generate(self) -> Dict[str, Any]:
"""Generate complete dashboard JSON."""
dashboard = {
"annotations": self._generate_annotations(),
"description": self.config.description,
"editable": True,
"gnetId": None,
"graphTooltip": 1, # Shared crosshair
"id": None,
"links": [],
"panels": self._generate_panels(),
"refresh": self.config.refresh,
"schemaVersion": 38,
"style": "dark",
"tags": self.config.tags,
"templating": {"list": self._generate_variables()},
"time": {"from": "now-1h", "to": "now"},
"timepicker": self._generate_timepicker(),
"timezone": "browser",
"title": self.config.title,
"uid": self.config.uid,
"version": 1,
}
return dashboard
def _generate_annotations(self) -> Dict[str, List]:
return {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": True,
"hide": True,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard",
}
]
}
def _generate_panels(self) -> List[Dict]:
panels = []
y_pos = 0
for row in self.config.rows:
# Add row panel
panels.append({
"collapsed": row.collapsed,
"gridPos": {"h": 1, "w": 24, "x": 0, "y": y_pos},
"id": self._next_panel_id(),
"panels": [],
"title": row.title,
"type": "row",
})
y_pos += 1
# Add panels in row
for panel in row.panels:
panels.append(self._generate_panel(panel, y_pos))
# Calculate next row position
if row.panels:
max_height = max(p.grid_pos.get("h", 8) for p in row.panels)
y_pos += max_height
return panels
def _generate_panel(self, panel: Panel, base_y: int) -> Dict:
grid_pos = panel.grid_pos.copy()
grid_pos["y"] = base_y
return {
"datasource": "${DS_PROMETHEUS}",
"description": panel.description,
"fieldConfig": self._generate_field_config(panel),
"gridPos": grid_pos,
"id": self._next_panel_id(),
"options": self._generate_panel_options(panel),
"pluginVersion": "10.0.0",
"targets": [
{
"expr": panel.query,
"legendFormat": panel.legend_format,
"refId": "A",
}
],
"title": panel.title,
"type": panel.type.value,
}
def _generate_field_config(self, panel: Panel) -> Dict:
return {
"defaults": {
"color": {"mode": "thresholds"},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": panel.thresholds or [{"color": "green", "value": None}],
},
"unit": panel.unit,
},
"overrides": [],
}
def _generate_panel_options(self, panel: Panel) -> Dict:
if panel.type == PanelType.STAT:
return {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": ["lastNotNull"],
"fields": "",
"values": False,
},
"textMode": "auto",
}
elif panel.type == PanelType.TIMESERIES:
return {
"legend": {"displayMode": "list", "placement": "bottom"},
"tooltip": {"mode": "multi", "sort": "desc"},
}
return {}
def _generate_variables(self) -> List[Dict]:
variables = [
{
"current": {},
"datasource": "${DS_PROMETHEUS}",
"definition": "label_values(up, service)",
"hide": 0,
"includeAll": False,
"label": "Service",
"multi": False,
"name": "service",
"options": [],
"query": {"query": "label_values(up, service)"},
"refresh": 2,
"sort": 1,
"type": "query",
},
{
"current": {},
"datasource": "${DS_PROMETHEUS}",
"definition": "label_values(up{service=\"$service\"}, environment)",
"hide": 0,
"includeAll": True,
"label": "Environment",
"multi": True,
"name": "environment",
"options": [],
"query": {"query": "label_values(up{service=\"$service\"}, environment)"},
"refresh": 2,
"sort": 1,
"type": "query",
},
]
return variables + self.config.variables
def _generate_timepicker(self) -> Dict:
return {
"refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m"],
"time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d"],
}
def create_service_dashboard(service_name: str) -> Dict:
"""Create a standard service dashboard."""
config = DashboardConfig(
title=f"{service_name} Service Dashboard",
uid=f"{service_name.lower()}-service",
description=f"Overview dashboard for {service_name} service",
tags=["service", service_name.lower(), "prometheus"],
rows=[
Row(
title="Overview",
panels=[
Panel(
title="Request Rate",
type=PanelType.STAT,
query=f'sum(rate(http_requests_total{{service="{service_name}"}}[$__rate_interval]))',
grid_pos={"h": 4, "w": 6, "x": 0},
unit="reqps",
legend_format="Requests/sec",
),
Panel(
title="Error Rate",
type=PanelType.STAT,
query=(
f'sum(rate(http_requests_total{{service="{service_name}",'
f' status_code=~"5.."}}[$__rate_interval])) / '
f'sum(rate(http_requests_total{{service="{service_name}"}}[$__rate_interval])) * 100'
),
grid_pos={"h": 4, "w": 6, "x": 6},
unit="percent",
thresholds=DashboardGenerator.ERROR_RATE_THRESHOLDS,
legend_format="Error Rate",
),
Panel(
title="P99 Latency",
type=PanelType.STAT,
query=f'histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{{service="{service_name}"}}[$__rate_interval])) by (le))',
grid_pos={"h": 4, "w": 6, "x": 12},
unit="s",
thresholds=DashboardGenerator.LATENCY_THRESHOLDS,
legend_format="P99",
),
],
),
Row(
title="Request Metrics",
panels=[
Panel(
title="Requests per Second",
type=PanelType.TIMESERIES,
query=f'sum(rate(http_requests_total{{service="{service_name}"}}[$__rate_interval])) by (endpoint)',
grid_pos={"h": 8, "w": 12, "x": 0},
unit="reqps",
legend_format="{{endpoint}}",
),
Panel(
title="Request Duration",
type=PanelType.HEATMAP,
query=f'sum(rate(http_request_duration_seconds_bucket{{service="{service_name}"}}[$__rate_interval])) by (le)',
grid_pos={"h": 8, "w": 12, "x": 12},
unit="s",
),
],
),
],
)
generator = DashboardGenerator(config)
return generator.generate()
if __name__ == "__main__":
dashboard = create_service_dashboard("api-gateway")
print(json.dumps(dashboard, indent=2))
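The generated JSON can be checked for layout invariants before committing: panel ids must be unique, and every panel must fit Grafana's 24-column grid. A standalone checker along these lines (a sketch; it works on any dashboard dict, shown here with a minimal inline example):

```python
def check_dashboard_layout(dashboard):
    """Check generated dashboard JSON for common layout mistakes."""
    errors = []
    ids = [p["id"] for p in dashboard.get("panels", []) if "id" in p]
    if len(ids) != len(set(ids)):
        errors.append("duplicate panel ids")
    for panel in dashboard.get("panels", []):
        pos = panel.get("gridPos", {})
        if pos.get("x", 0) + pos.get("w", 0) > 24:
            errors.append(f"panel {panel.get('title')!r} overflows the 24-column grid")
    return errors

dashboard = {
    "panels": [
        {"id": 1, "title": "Request Rate", "gridPos": {"h": 4, "w": 6, "x": 0, "y": 0}},
        {"id": 2, "title": "Error Rate", "gridPos": {"h": 4, "w": 6, "x": 6, "y": 0}},
    ]
}
assert check_dashboard_layout(dashboard) == []
```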
Dashboard Versioning¶
# grafana/dashboards/README.md
# Dashboard Version Management
## Versioning Strategy
1. **Git-based versioning**: All dashboards stored as JSON in version control
2. **Semantic versioning**: Major.Minor.Patch for dashboard changes
- Major: Breaking changes (removed panels, renamed variables)
- Minor: New panels or features
- Patch: Bug fixes, query optimizations
## Directory Structure
```text
grafana/dashboards/
├── services/
│ ├── api-gateway.json
│ ├── user-service.json
│ └── payment-service.json
├── infrastructure/
│ ├── kubernetes.json
│ ├── database.json
│ └── cache.json
├── alerts/
│ └── alert-overview.json
└── README.md
```
Change Process¶
- Export dashboard from Grafana UI
- Run `./scripts/format-dashboard.sh <file>` to normalize JSON
- Update version in dashboard JSON
- Commit with message: `dashboard(<name>): <description>`
- CI/CD automatically syncs to Grafana
Rollback¶
# Rollback to previous version
git checkout HEAD~1 -- grafana/dashboards/services/api-gateway.json
kubectl apply -k grafana/
CI/CD Integration¶
GitHub Actions Observability Validation¶
name: Observability Validation
on:
push:
paths:
- 'src/observability/**'
- 'src/middleware/**'
pull_request:
paths:
- 'src/observability/**'
- 'src/middleware/**'
jobs:
validate-observability:
runs-on: ubuntu-latest
services:
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- 6831:6831/udp
- 16686:16686
- 4317:4317
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -e ".[observability,test]"
- name: Validate tracing configuration
env:
OTLP_ENDPOINT: localhost:4317
run: |
python -c "
from src.observability.tracing import configure_tracer
tracer = configure_tracer('test-service', '1.0.0')
print('Tracing configuration valid')
"
- name: Validate logging configuration
run: |
python -c "
from src.observability.logging import configure_logging
logger = configure_logging(level='DEBUG', json_format=True)
logger.info('test_event', key='value')
print('Logging configuration valid')
"
- name: Run observability tests
run: pytest tests/observability/ -v
- name: Check log format compliance
run: |
python scripts/validate_log_format.py
validate-metrics:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -e ".[observability,test]"
- name: Validate metrics endpoint
run: |
python -c "
from src.observability.metrics import get_metrics, set_service_info
set_service_info('test', '1.0.0', 'test')
metrics, content_type = get_metrics()
assert 'service_info' in metrics.decode()
print('Metrics configuration valid')
"
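The workflow above calls `scripts/validate_log_format.py`, which is not shown here. A minimal sketch of what such a check might do is below: parse each line as JSON and require a set of mandatory fields. The field names are an assumed contract, so adjust them to your logging schema:

```python
import json

REQUIRED_FIELDS = {"timestamp", "level", "event"}  # assumed contract; adjust to your schema

def validate_log_line(line):
    """Return a list of problems with one JSON log line (empty list = valid)."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return ["not valid JSON"]
    if not isinstance(record, dict):
        return ["not a JSON object"]
    missing = REQUIRED_FIELDS - record.keys()
    return [f"missing field: {f}" for f in sorted(missing)]

good = '{"timestamp": "2024-01-01T00:00:00Z", "level": "info", "event": "order_created"}'
bad = '{"level": "info"}'
assert validate_log_line(good) == []
assert validate_log_line(bad) == ["missing field: event", "missing field: timestamp"]
```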
Docker Compose for Local Observability¶
# docker-compose.observability.yml
version: '3.8'
services:
jaeger:
image: jaegertracing/all-in-one:latest
environment:
COLLECTOR_OTLP_ENABLED: 'true'
ports:
- '6831:6831/udp'
- '16686:16686' # UI only; OTLP 4317/4318 stay internal so they don't clash with otel-collector's host ports
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- '9090:9090'
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
grafana:
image: grafana/grafana:latest
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
GF_AUTH_ANONYMOUS_ENABLED: 'true'
volumes:
- ./grafana/provisioning:/etc/grafana/provisioning
ports:
- '3000:3000'
depends_on:
- prometheus
- jaeger
- loki
loki:
image: grafana/loki:latest
ports:
- '3100:3100'
command: -config.file=/etc/loki/local-config.yaml
promtail:
image: grafana/promtail:latest
volumes:
- ./promtail-config.yml:/etc/promtail/config.yml
- /var/log:/var/log
command: -config.file=/etc/promtail/config.yml
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
volumes:
- ./otel-collector-config.yml:/etc/otel-collector-config.yml
command: ['--config=/etc/otel-collector-config.yml']
ports:
- '4317:4317'
- '4318:4318'
- '8888:8888'
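The compose file mounts a `./prometheus.yml` that is not shown here. A minimal example compatible with the stack above might look like the following; the job names and targets are illustrative, and the `otel-collector:8888` target scrapes the collector's own metrics endpoint:

```yaml
# prometheus.yml (minimal example for the compose stack above)
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
  - job_name: 'otel-collector'
    static_configs:
      - targets: ['otel-collector:8888']
```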
Best Practices¶
Observability Checklist¶
Distributed Tracing:
✅ Use OpenTelemetry for vendor-neutral instrumentation
✅ Propagate trace context across service boundaries
✅ Use semantic span naming conventions
✅ Set appropriate sampling rates per environment
✅ Add relevant attributes to spans
✅ Record exceptions with stack traces
Structured Logging:
✅ Use JSON format in production
✅ Include trace/span IDs in logs
✅ Follow consistent field naming
✅ Use appropriate log levels
✅ Include request context (correlation ID, user ID)
✅ Avoid logging sensitive data
Log Aggregation:
✅ Configure retention policies
✅ Create useful dashboards
✅ Set up alerting on error rates
✅ Index searchable fields
✅ Document query patterns
Correlation:
✅ Generate/propagate correlation IDs
✅ Link traces, logs, and metrics
✅ Include correlation in error reports
✅ Propagate context to async tasks
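One common way to generate a correlation ID and propagate it to async tasks is a `contextvars.ContextVar`, which survives `await` points and is inherited by tasks created from the current context. A minimal sketch (the helper names are assumptions, not a standard API):

```python
import contextvars
import uuid

# A ContextVar is inherited by tasks spawned from the current context
correlation_id = contextvars.ContextVar("correlation_id", default=None)

def new_correlation_id():
    """Generate and bind a correlation ID for the current context."""
    cid = uuid.uuid4().hex
    correlation_id.set(cid)
    return cid

def log_context(**fields):
    """Merge the current correlation ID into structured log fields."""
    cid = correlation_id.get()
    return {"correlation_id": cid, **fields} if cid else fields

cid = new_correlation_id()
record = log_context(event="order_created", order_id="ord-123")
assert record["correlation_id"] == cid
```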
Anti-Patterns to Avoid¶
# DON'T: Log without context
logger.error("Something went wrong")
# DO: Include relevant context
logger.error(
"order_creation_failed",
order_id=order_id,
user_id=user_id,
error_type=type(e).__name__,
error_message=str(e),
)
# DON'T: Create spans for trivial operations
with tracer.start_as_current_span("get_variable"):
x = self.config.get("key")
# DO: Create spans for meaningful operations
with tracer.start_as_current_span("fetch_user_preferences"):
prefs = await self.preferences_service.get(user_id)
# DON'T: Log sensitive data
logger.info("user_login", password=password, ssn=ssn)
# DO: Redact or omit sensitive data
logger.info("user_login", user_id=user.id, email=mask_email(user.email))
# DON'T: Use print statements
print(f"Processing order {order_id}")
# DO: Use structured logging
logger.info("processing_order", order_id=order_id)
# DON'T: Ignore errors silently
try:
process_order(order)
except Exception:
pass
# DO: Log and track errors
try:
process_order(order)
except Exception as e:
logger.error("order_processing_failed", order_id=order.id, exc_info=True)
capture_exception_with_context(e, order_id=order.id)
raise
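The "DO" example above calls `mask_email`, which is not defined in this guide. One possible implementation, keeping only the first character of the local part (a sketch, not a compliance-reviewed redaction scheme):

```python
def mask_email(email):
    """Mask the local part of an email, keeping the first character and the domain."""
    local, _, domain = email.partition("@")
    if not domain:
        return "***"
    return f"{local[:1]}***@{domain}"

assert mask_email("alice@example.com") == "a***@example.com"
assert mask_email("not-an-email") == "***"
```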
Resources¶
Tracing and Instrumentation¶
- OpenTelemetry Documentation
- OpenTelemetry Semantic Conventions
- Jaeger Documentation
- Zipkin Documentation
Metrics and Monitoring¶
- Prometheus Documentation
- Prometheus Best Practices
- Prometheus Naming Conventions
- PromQL Documentation
Dashboards and Visualization¶
Logging¶
Error Monitoring and APM¶
Next Steps:
- Review Testing Strategies for observability testing
- See Security Scanning Guide for log security
- Check GitHub Actions Guide for CI/CD observability