Skip to content

Observability Guide

Introduction

This guide provides comprehensive standards for implementing distributed tracing and structured logging across microservices and distributed systems. It covers OpenTelemetry instrumentation, structured logging patterns, log aggregation strategies, and observability best practices.


Table of Contents

  1. Observability Philosophy
  2. Distributed Tracing
  3. Trace Sampling Strategies
  4. Structured Logging
  5. Log Aggregation
  6. Correlation and Context
  7. Error Tracking
  8. Metrics Integration
  9. Metric Naming Conventions
  10. Label and Tag Standards
  11. Grafana Dashboards
  12. CI/CD Integration
  13. Best Practices

Observability Philosophy

The Three Pillars of Observability

┌─────────────────────────────────────────────────────────────┐
│                  Observability Stack                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │
│   │   TRACES    │  │    LOGS     │  │   METRICS   │        │
│   │             │  │             │  │             │        │
│   │  What path  │  │  What       │  │  What is    │        │
│   │  did the    │  │  happened   │  │  the system │        │
│   │  request    │  │  at each    │  │  state over │        │
│   │  take?      │  │  point?     │  │  time?      │        │
│   └──────┬──────┘  └──────┬──────┘  └──────┬──────┘        │
│          │                │                │                │
│          └────────────────┼────────────────┘                │
│                           │                                 │
│                    ┌──────▼──────┐                         │
│                    │ Correlation │                         │
│                    │     ID      │                         │
│                    └─────────────┘                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Key Principles:

  • Traces show the journey of a request across services
  • Logs provide detailed context at each processing point
  • Metrics quantify system behavior over time
  • Correlation IDs tie all three together

Observability Standards

┌─────────────────┬─────────────────────────────────────────┐
│ Standard        │ Description                             │
├─────────────────┼─────────────────────────────────────────┤
│ OpenTelemetry   │ Vendor-neutral instrumentation standard │
│ W3C Trace       │ HTTP trace context propagation          │
│ JSON Logs       │ Machine-parseable structured logs       │
│ Semantic Conv.  │ Consistent attribute naming             │
└─────────────────┴─────────────────────────────────────────┘

Distributed Tracing

OpenTelemetry Setup (Python)

Installation:

pip install opentelemetry-api \
    opentelemetry-sdk \
    opentelemetry-exporter-otlp \
    opentelemetry-instrumentation-requests \
    opentelemetry-instrumentation-flask \
    opentelemetry-instrumentation-sqlalchemy \
    opentelemetry-instrumentation-redis

Tracer configuration:

# src/observability/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import (
    ParentBasedTraceIdRatio,
    TraceIdRatioBased,
)

def configure_tracer(
    service_name: str,
    service_version: str,
    otlp_endpoint: str = "localhost:4317",
    sample_rate: float = 1.0,
    debug: bool = False,
) -> trace.Tracer:
    """Configure OpenTelemetry tracer with OTLP exporter."""

    resource = Resource.create({
        SERVICE_NAME: service_name,
        SERVICE_VERSION: service_version,
        "deployment.environment": os.getenv("ENVIRONMENT", "development"),
        "host.name": os.getenv("HOSTNAME", "unknown"),
    })

    sampler = ParentBasedTraceIdRatio(sample_rate)

    provider = TracerProvider(
        resource=resource,
        sampler=sampler,
    )

    otlp_exporter = OTLPSpanExporter(
        endpoint=otlp_endpoint,
        insecure=True,
    )
    provider.add_span_processor(
        BatchSpanProcessor(otlp_exporter)
    )

    if debug:
        provider.add_span_processor(
            BatchSpanProcessor(ConsoleSpanExporter())
        )

    trace.set_tracer_provider(provider)

    return trace.get_tracer(service_name, service_version)

tracer = configure_tracer(
    service_name="order-service",
    service_version="1.2.3",
    otlp_endpoint=os.getenv("OTLP_ENDPOINT", "localhost:4317"),
    sample_rate=float(os.getenv("TRACE_SAMPLE_RATE", "0.1")),
)

Auto-instrumentation:

# src/observability/auto_instrument.py
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor

def instrument_all(app=None, engine=None):
    """Apply auto-instrumentation to common libraries."""

    RequestsInstrumentor().instrument()

    Psycopg2Instrumentor().instrument()

    RedisInstrumentor().instrument()

    CeleryInstrumentor().instrument()

    if app:
        FlaskInstrumentor().instrument_app(app)

    if engine:
        SQLAlchemyInstrumentor().instrument(engine=engine)

Manual span creation:

# src/services/order_service.py
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

class OrderService:
    """Order processing service with tracing."""

    def create_order(self, user_id: str, items: list) -> Order:
        """Create a new order with full tracing."""
        with tracer.start_as_current_span(
            "create_order",
            attributes={
                "user.id": user_id,
                "order.item_count": len(items),
            }
        ) as span:
            try:
                order = self._validate_and_create(user_id, items)

                span.set_attribute("order.id", order.id)
                span.set_attribute("order.total", float(order.total))
                span.set_status(Status(StatusCode.OK))

                return order

            except ValidationError as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise

    def _validate_and_create(self, user_id: str, items: list) -> Order:
        """Validate items and create order."""
        with tracer.start_as_current_span("validate_items") as span:
            validated_items = []
            for item in items:
                validated = self._validate_item(item)
                validated_items.append(validated)
            span.set_attribute("validated_count", len(validated_items))

        with tracer.start_as_current_span("check_inventory") as span:
            inventory_status = self.inventory_client.check_availability(
                [i["product_id"] for i in validated_items]
            )
            span.set_attribute("all_available", inventory_status.all_available)

        with tracer.start_as_current_span("calculate_pricing") as span:
            pricing = self.pricing_service.calculate(validated_items)
            span.set_attribute("subtotal", float(pricing.subtotal))
            span.set_attribute("tax", float(pricing.tax))
            span.set_attribute("total", float(pricing.total))

        with tracer.start_as_current_span("persist_order") as span:
            order = Order(
                user_id=user_id,
                items=validated_items,
                pricing=pricing,
            )
            self.repository.save(order)
            span.set_attribute("order.id", order.id)

        return order

    def _validate_item(self, item: dict) -> dict:
        """Validate a single order item."""
        with tracer.start_as_current_span(
            "validate_item",
            attributes={"product.id": item.get("product_id")}
        ):
            product = self.product_client.get(item["product_id"])
            if not product:
                raise ValidationError(f"Product not found: {item['product_id']}")
            return {
                "product_id": product.id,
                "name": product.name,
                "price": product.price,
                "quantity": item["quantity"],
            }

Context propagation:

# src/observability/propagation.py
from opentelemetry import trace
from opentelemetry.propagate import inject, extract
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import requests

propagator = TraceContextTextMapPropagator()

def make_traced_request(method: str, url: str, **kwargs) -> requests.Response:
    """Make HTTP request with trace context propagation."""
    headers = kwargs.pop("headers", {})

    inject(headers)

    return requests.request(method, url, headers=headers, **kwargs)

def extract_context_from_request(request) -> trace.Context:
    """Extract trace context from incoming HTTP request."""
    return extract(request.headers)

def traced_request_handler(func):
    """Decorator to extract trace context and create span for request handlers."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        context = extract(request.headers)

        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span(
            f"{request.method} {request.path}",
            context=context,
            attributes={
                "http.method": request.method,
                "http.url": request.url,
                "http.route": request.path,
                "http.user_agent": request.headers.get("User-Agent", ""),
            }
        ) as span:
            try:
                response = func(*args, **kwargs)
                span.set_attribute("http.status_code", response.status_code)
                return response
            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise

    return wrapper

Async tracing:

# src/observability/async_tracing.py
import asyncio
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def traced_async_operation(name: str, coro, **attributes):
    """Execute async operation with tracing."""
    with tracer.start_as_current_span(name, attributes=attributes) as span:
        try:
            result = await coro
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise

class AsyncOrderProcessor:
    """Async order processor with tracing."""

    async def process_order(self, order_id: str) -> dict:
        """Process order with parallel traced operations."""
        with tracer.start_as_current_span(
            "process_order",
            attributes={"order.id": order_id}
        ) as span:
            order = await self.repository.get(order_id)
            span.set_attribute("order.status", order.status)

            results = await asyncio.gather(
                traced_async_operation(
                    "validate_payment",
                    self.payment_service.validate(order.payment_id),
                    payment_id=order.payment_id,
                ),
                traced_async_operation(
                    "reserve_inventory",
                    self.inventory_service.reserve(order.items),
                    item_count=len(order.items),
                ),
                traced_async_operation(
                    "calculate_shipping",
                    self.shipping_service.calculate(order.shipping_address),
                    address_country=order.shipping_address.country,
                ),
            )

            payment_valid, inventory_reserved, shipping_cost = results

            span.set_attribute("payment.valid", payment_valid)
            span.set_attribute("inventory.reserved", inventory_reserved)
            span.set_attribute("shipping.cost", float(shipping_cost))

            return {
                "order_id": order_id,
                "payment_valid": payment_valid,
                "inventory_reserved": inventory_reserved,
                "shipping_cost": shipping_cost,
            }

OpenTelemetry Setup (TypeScript)

Installation:

npm install @opentelemetry/api \
    @opentelemetry/sdk-node \
    @opentelemetry/auto-instrumentations-node \
    @opentelemetry/exporter-trace-otlp-grpc \
    @opentelemetry/resources \
    @opentelemetry/semantic-conventions

Tracer configuration:

// src/observability/tracing.ts
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { Resource } from '@opentelemetry/resources';
import {
  SEMRESATTRS_SERVICE_NAME,
  SEMRESATTRS_SERVICE_VERSION,
  SEMRESATTRS_DEPLOYMENT_ENVIRONMENT,
} from '@opentelemetry/semantic-conventions';
import { ParentBasedSampler, TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-base';
import { diag, DiagConsoleLogger, DiagLogLevel, trace, Span, SpanStatusCode } from '@opentelemetry/api';

interface TracingConfig {
  serviceName: string;
  serviceVersion: string;
  otlpEndpoint?: string;
  sampleRate?: number;
  debug?: boolean;
}

export function initializeTracing(config: TracingConfig): NodeSDK {
  const {
    serviceName,
    serviceVersion,
    otlpEndpoint = 'localhost:4317',
    sampleRate = 1.0,
    debug = false,
  } = config;

  if (debug) {
    diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
  }

  const resource = new Resource({
    [SEMRESATTRS_SERVICE_NAME]: serviceName,
    [SEMRESATTRS_SERVICE_VERSION]: serviceVersion,
    [SEMRESATTRS_DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV || 'development',
    'host.name': process.env.HOSTNAME || 'unknown',
  });

  const traceExporter = new OTLPTraceExporter({
    url: `http://${otlpEndpoint}`,
  });

  const sampler = new ParentBasedSampler({
    root: new TraceIdRatioBasedSampler(sampleRate),
  });

  const sdk = new NodeSDK({
    resource,
    traceExporter,
    sampler,
    instrumentations: [
      getNodeAutoInstrumentations({
        '@opentelemetry/instrumentation-fs': { enabled: false },
        '@opentelemetry/instrumentation-http': {
          ignoreIncomingPaths: ['/health', '/ready', '/metrics'],
        },
      }),
    ],
  });

  sdk.start();

  process.on('SIGTERM', () => {
    sdk.shutdown()
      .then(() => console.log('Tracing terminated'))
      .catch((error) => console.error('Error terminating tracing', error))
      .finally(() => process.exit(0));
  });

  return sdk;
}

export const tracer = trace.getTracer('order-service', '1.0.0');

Manual span creation:

// src/services/order.service.ts
import { trace, Span, SpanStatusCode, context } from '@opentelemetry/api';

const tracer = trace.getTracer('order-service');

interface OrderItem {
  productId: string;
  quantity: number;
  price: number;
}

interface Order {
  id: string;
  userId: string;
  items: OrderItem[];
  total: number;
  status: string;
}

export class OrderService {
  async createOrder(userId: string, items: OrderItem[]): Promise<Order> {
    return tracer.startActiveSpan(
      'createOrder',
      {
        attributes: {
          'user.id': userId,
          'order.item_count': items.length,
        },
      },
      async (span: Span) => {
        try {
          const order = await this.validateAndCreate(userId, items);

          span.setAttributes({
            'order.id': order.id,
            'order.total': order.total,
          });
          span.setStatus({ code: SpanStatusCode.OK });

          return order;
        } catch (error) {
          span.setStatus({
            code: SpanStatusCode.ERROR,
            message: error instanceof Error ? error.message : 'Unknown error',
          });
          span.recordException(error as Error);
          throw error;
        } finally {
          span.end();
        }
      }
    );
  }

  private async validateAndCreate(userId: string, items: OrderItem[]): Promise<Order> {
    const validatedItems = await tracer.startActiveSpan(
      'validateItems',
      async (span: Span) => {
        try {
          const validated = await Promise.all(
            items.map((item) => this.validateItem(item))
          );
          span.setAttribute('validated_count', validated.length);
          return validated;
        } finally {
          span.end();
        }
      }
    );

    const inventoryStatus = await tracer.startActiveSpan(
      'checkInventory',
      async (span: Span) => {
        try {
          const status = await this.inventoryClient.checkAvailability(
            validatedItems.map((i) => i.productId)
          );
          span.setAttribute('all_available', status.allAvailable);
          return status;
        } finally {
          span.end();
        }
      }
    );

    const pricing = await tracer.startActiveSpan(
      'calculatePricing',
      async (span: Span) => {
        try {
          const result = await this.pricingService.calculate(validatedItems);
          span.setAttributes({
            subtotal: result.subtotal,
            tax: result.tax,
            total: result.total,
          });
          return result;
        } finally {
          span.end();
        }
      }
    );

    return tracer.startActiveSpan(
      'persistOrder',
      async (span: Span) => {
        try {
          const order: Order = {
            id: this.generateId(),
            userId,
            items: validatedItems,
            total: pricing.total,
            status: 'pending',
          };
          await this.repository.save(order);
          span.setAttribute('order.id', order.id);
          return order;
        } finally {
          span.end();
        }
      }
    );
  }

  private async validateItem(item: OrderItem): Promise<OrderItem> {
    return tracer.startActiveSpan(
      'validateItem',
      { attributes: { 'product.id': item.productId } },
      async (span: Span) => {
        try {
          const product = await this.productClient.get(item.productId);
          if (!product) {
            throw new Error(`Product not found: ${item.productId}`);
          }
          return { ...item, price: product.price };
        } finally {
          span.end();
        }
      }
    );
  }
}

Context propagation:

// src/observability/propagation.ts
import { context, propagation, trace, SpanStatusCode } from '@opentelemetry/api';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';

propagation.setGlobalPropagator(new W3CTraceContextPropagator());

export async function tracedHttpRequest<T>(
  config: AxiosRequestConfig
): Promise<AxiosResponse<T>> {
  const headers: Record<string, string> = {
    ...(config.headers as Record<string, string>),
  };

  propagation.inject(context.active(), headers);

  return axios.request<T>({ ...config, headers });
}

export function extractContextFromHeaders(
  headers: Record<string, string | string[] | undefined>
): ReturnType<typeof propagation.extract> {
  const normalizedHeaders: Record<string, string> = {};
  for (const [key, value] of Object.entries(headers)) {
    if (value) {
      normalizedHeaders[key.toLowerCase()] = Array.isArray(value) ? value[0] : value;
    }
  }
  return propagation.extract(context.active(), normalizedHeaders);
}

export function tracedRequestHandler(handlerName: string) {
  return function (
    target: unknown,
    propertyKey: string,
    descriptor: PropertyDescriptor
  ): PropertyDescriptor {
    const originalMethod = descriptor.value;

    descriptor.value = async function (req: Request, res: Response, ...args: unknown[]) {
      const extractedContext = extractContextFromHeaders(
        req.headers as Record<string, string>
      );

      return context.with(extractedContext, async () => {
        const tracer = trace.getTracer('http-server');
        return tracer.startActiveSpan(
          `${req.method} ${handlerName}`,
          {
            attributes: {
              'http.method': req.method,
              'http.url': req.url,
              'http.route': handlerName,
              'http.user_agent': req.headers['user-agent'] || '',
            },
          },
          async (span) => {
            try {
              const result = await originalMethod.apply(this, [req, res, ...args]);
              span.setAttribute('http.status_code', res.statusCode);
              span.setStatus({ code: SpanStatusCode.OK });
              return result;
            } catch (error) {
              span.recordException(error as Error);
              span.setStatus({
                code: SpanStatusCode.ERROR,
                message: (error as Error).message,
              });
              throw error;
            } finally {
              span.end();
            }
          }
        );
      });
    };

    return descriptor;
  };
}

Span Naming Conventions

# Span naming standards
SPAN_NAMING_CONVENTIONS = {
    # HTTP spans
    "http_server": "{method} {route}",
    "http_client": "HTTP {method}",

    # Database spans
    "db_query": "{db.system} {db.operation}",
    "db_statement": "{db.system} {db.operation} {db.sql.table}",

    # Messaging spans
    "message_publish": "{messaging.system} publish",
    "message_receive": "{messaging.system} receive",
    "message_process": "{messaging.system} process",

    # RPC spans
    "rpc_client": "{rpc.system}/{rpc.service}/{rpc.method}",
    "rpc_server": "{rpc.system}/{rpc.service}/{rpc.method}",

    # Internal spans
    "internal": "{component}.{operation}",
}

# Examples
SPAN_NAMES = {
    "http_server": "GET /api/users/{id}",
    "http_client": "HTTP POST",
    "db_query": "postgresql SELECT",
    "db_statement": "postgresql SELECT users",
    "message_publish": "kafka publish",
    "message_receive": "rabbitmq receive",
    "rpc_client": "grpc/UserService/GetUser",
    "internal": "OrderService.validateItems",
}

Sampling Strategies

# src/observability/sampling.py
from opentelemetry.sdk.trace.sampling import (
    Sampler,
    SamplingResult,
    Decision,
    ParentBased,
    TraceIdRatioBased,
    ALWAYS_ON,
    ALWAYS_OFF,
)
from opentelemetry.trace import Link, SpanKind
from opentelemetry.util.types import Attributes

class PrioritySampler(Sampler):
    """Sample based on request priority and error status."""

    def __init__(self, default_rate: float = 0.1, high_priority_rate: float = 1.0):
        self.default_rate = default_rate
        self.high_priority_rate = high_priority_rate
        self.default_sampler = TraceIdRatioBased(default_rate)
        self.high_priority_sampler = TraceIdRatioBased(high_priority_rate)

    def should_sample(
        self,
        parent_context,
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: list[Link] = None,
    ) -> SamplingResult:
        attributes = attributes or {}

        if attributes.get("error", False):
            return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes)

        if attributes.get("priority") == "high":
            return self.high_priority_sampler.should_sample(
                parent_context, trace_id, name, kind, attributes, links
            )

        if name.startswith("health") or name.startswith("ready"):
            return SamplingResult(Decision.DROP, attributes)

        return self.default_sampler.should_sample(
            parent_context, trace_id, name, kind, attributes, links
        )

    def get_description(self) -> str:
        return f"PrioritySampler(default={self.default_rate}, high={self.high_priority_rate})"

class AdaptiveSampler(Sampler):
    """Adaptive sampler that adjusts rate based on traffic volume."""

    def __init__(
        self,
        target_traces_per_second: float = 10.0,
        min_rate: float = 0.001,
        max_rate: float = 1.0,
    ):
        self.target_tps = target_traces_per_second
        self.min_rate = min_rate
        self.max_rate = max_rate
        self._request_count = 0
        self._last_adjustment = time.time()
        self._current_rate = max_rate
        self._lock = threading.Lock()

    def should_sample(
        self,
        parent_context,
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links: list[Link] = None,
    ) -> SamplingResult:
        with self._lock:
            self._request_count += 1
            self._maybe_adjust_rate()

        sampler = TraceIdRatioBased(self._current_rate)
        return sampler.should_sample(
            parent_context, trace_id, name, kind, attributes, links
        )

    def _maybe_adjust_rate(self):
        """Adjust sampling rate based on recent traffic."""
        now = time.time()
        elapsed = now - self._last_adjustment

        if elapsed >= 1.0:
            actual_tps = self._request_count / elapsed
            if actual_tps > 0:
                self._current_rate = min(
                    self.max_rate,
                    max(self.min_rate, self.target_tps / actual_tps)
                )
            self._request_count = 0
            self._last_adjustment = now

    def get_description(self) -> str:
        return f"AdaptiveSampler(target_tps={self.target_tps})"

SAMPLING_CONFIG = {
    "development": {
        "type": "always_on",
        "rate": 1.0,
    },
    "staging": {
        "type": "ratio",
        "rate": 0.5,
    },
    "production": {
        "type": "adaptive",
        "target_tps": 100,
        "min_rate": 0.001,
        "max_rate": 0.1,
    },
}

Trace Sampling Strategies

Sampling Overview

┌─────────────────────────────────────────────────────────────────────┐
│                     Trace Sampling Decision Tree                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Request Arrives                                                    │
│       │                                                             │
│       ▼                                                             │
│  ┌─────────────┐    Yes    ┌─────────────┐                         │
│  │ Error/High  │ ────────► │ Always      │                         │
│  │ Priority?   │           │ Sample      │                         │
│  └──────┬──────┘           └─────────────┘                         │
│         │ No                                                        │
│         ▼                                                           │
│  ┌─────────────┐    Yes    ┌─────────────┐                         │
│  │ Debug/Test  │ ────────► │ Sample      │                         │
│  │ Header?     │           │ 100%        │                         │
│  └──────┬──────┘           └─────────────┘                         │
│         │ No                                                        │
│         ▼                                                           │
│  ┌─────────────┐           ┌─────────────┐                         │
│  │ Apply Rate  │ ────────► │ Probabilis- │                         │
│  │ Limiting    │           │ tic Sample  │                         │
│  └─────────────┘           └─────────────┘                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Sampling Strategy Comparison

┌──────────────────┬────────────────┬────────────────┬───────────────┐
│ Strategy         │ Use Case       │ Pros           │ Cons          │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Always On        │ Development    │ Full visibility│ High overhead │
│ (100%)           │ Debugging      │ Easy debugging │ Not scalable  │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Ratio-Based      │ Staging        │ Predictable    │ May miss edge │
│ (fixed %)        │ Low traffic    │ cost control   │ cases         │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Rate-Limited     │ Production     │ Cost control   │ May miss      │
│ (traces/sec)     │ High traffic   │ Consistent     │ bursts        │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Adaptive         │ Variable       │ Self-adjusting │ Complex to    │
│ (dynamic)        │ traffic        │ Cost efficient │ configure     │
├──────────────────┼────────────────┼────────────────┼───────────────┤
│ Priority-Based   │ Mixed          │ Important      │ Requires      │
│ (rules)          │ workloads      │ traces kept    │ maintenance   │
└──────────────────┴────────────────┴────────────────┴───────────────┘

Head-Based Sampling

Decision made at trace start:

# src/observability/sampling/head_based.py
from opentelemetry.sdk.trace.sampling import (
    Sampler,
    SamplingResult,
    Decision,
    ParentBased,
    TraceIdRatioBased,
)
from opentelemetry.trace import Link, SpanKind
from opentelemetry.util.types import Attributes
from typing import Optional, Sequence
import hashlib


class PriorityBasedSampler(Sampler):
    """
    Head-based sampler with priority rules.

    Samples traces based on configurable priority rules:
    - Error/exception traces: Always sampled
    - Slow requests: Always sampled
    - High-value operations: Higher sample rate
    - Normal operations: Base sample rate
    """

    def __init__(
        self,
        base_rate: float = 0.1,
        high_priority_rate: float = 1.0,
        slow_threshold_ms: float = 1000.0,
    ):
        self.base_rate = base_rate
        self.high_priority_rate = high_priority_rate
        self.slow_threshold_ms = slow_threshold_ms
        self._high_priority_operations = {
            "payment.process",
            "order.create",
            "user.authenticate",
            "checkout.complete",
        }

    def should_sample(
        self,
        parent_context: Optional["Context"],
        trace_id: int,
        name: str,
        kind: Optional[SpanKind] = None,
        attributes: Attributes = None,
        links: Optional[Sequence[Link]] = None,
    ) -> SamplingResult:
        attributes = attributes or {}

        # Always sample errors
        if attributes.get("error", False):
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "error"},
            )

        # Always sample high-priority operations
        if name in self._high_priority_operations:
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "high_priority"},
            )

        # Always sample debug requests
        if attributes.get("debug", False):
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "debug"},
            )

        # Apply probabilistic sampling for normal requests
        trace_id_bytes = trace_id.to_bytes(16, byteorder="big")
        hash_value = int(hashlib.md5(trace_id_bytes).hexdigest(), 16)
        threshold = int(self.base_rate * (2**128 - 1))

        if hash_value < threshold:
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes={"sampling.reason": "probabilistic"},
            )

        return SamplingResult(decision=Decision.DROP)

    def get_description(self) -> str:
        return f"PriorityBasedSampler(base_rate={self.base_rate})"


def create_parent_based_sampler(
    root_sampler: Sampler,
    remote_parent_sampled: Optional[Sampler] = None,
    remote_parent_not_sampled: Optional[Sampler] = None,
) -> ParentBased:
    """
    Create a parent-based sampler that respects parent sampling decisions.

    This ensures trace continuity across service boundaries.
    """
    return ParentBased(
        root=root_sampler,
        remote_parent_sampled=remote_parent_sampled or TraceIdRatioBased(1.0),
        remote_parent_not_sampled=remote_parent_not_sampled or TraceIdRatioBased(0.0),
    )

Tail-Based Sampling

Decision made after trace completion:

# src/observability/sampling/tail_based.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import time
import threading
from collections import defaultdict


class SamplingDecision(Enum):
    SAMPLE = "sample"
    DROP = "drop"
    PENDING = "pending"


@dataclass
class SpanData:
    """Represents a span waiting for sampling decision."""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    name: str
    start_time: float
    end_time: Optional[float] = None
    status: str = "OK"
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)


@dataclass
class TraceBuffer:
    """Buffer for collecting spans before sampling decision."""
    trace_id: str
    spans: List[SpanData] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    decision: SamplingDecision = SamplingDecision.PENDING


class TailBasedSampler:
    """
    Tail-based sampler that makes decisions after trace completion.

    Benefits:
    - Can sample based on full trace context
    - Captures error traces that started normally
    - Better for debugging intermittent issues

    Tradeoffs:
    - Higher memory usage (must buffer spans)
    - Higher latency (wait for trace completion)
    - More complex implementation
    """

    def __init__(
        self,
        buffer_timeout_seconds: float = 30.0,
        max_traces_in_buffer: int = 10000,
        base_sample_rate: float = 0.1,
        error_sample_rate: float = 1.0,
        slow_trace_threshold_ms: float = 5000.0,
    ):
        self.buffer_timeout = buffer_timeout_seconds
        self.max_traces = max_traces_in_buffer
        self.base_rate = base_sample_rate
        self.error_rate = error_sample_rate
        self.slow_threshold = slow_trace_threshold_ms

        self._trace_buffers: Dict[str, TraceBuffer] = {}
        self._lock = threading.Lock()

        # Start cleanup thread
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_expired_traces,
            daemon=True,
        )
        self._cleanup_thread.start()

    def add_span(self, span: SpanData) -> None:
        """Add a span to the trace buffer."""
        with self._lock:
            if span.trace_id not in self._trace_buffers:
                if len(self._trace_buffers) >= self.max_traces:
                    self._evict_oldest_trace()
                self._trace_buffers[span.trace_id] = TraceBuffer(
                    trace_id=span.trace_id
                )

            self._trace_buffers[span.trace_id].spans.append(span)

            # Check if trace is complete (root span ended)
            if self._is_trace_complete(span.trace_id):
                self._make_sampling_decision(span.trace_id)

    def _is_trace_complete(self, trace_id: str) -> bool:
        """Check if all spans in trace have ended."""
        buffer = self._trace_buffers.get(trace_id)
        if not buffer:
            return False

        # Find root span (no parent)
        root_spans = [s for s in buffer.spans if s.parent_span_id is None]
        if not root_spans:
            return False

        return all(s.end_time is not None for s in root_spans)

    def _make_sampling_decision(self, trace_id: str) -> SamplingDecision:
        """Make final sampling decision for a complete trace."""
        buffer = self._trace_buffers.get(trace_id)
        if not buffer:
            return SamplingDecision.DROP

        # Always sample traces with errors
        has_error = any(
            s.status == "ERROR" or s.attributes.get("error", False)
            for s in buffer.spans
        )
        if has_error:
            buffer.decision = SamplingDecision.SAMPLE
            self._export_trace(buffer)
            return SamplingDecision.SAMPLE

        # Always sample slow traces
        trace_duration = self._calculate_trace_duration(buffer)
        if trace_duration > self.slow_threshold:
            buffer.decision = SamplingDecision.SAMPLE
            self._export_trace(buffer)
            return SamplingDecision.SAMPLE

        # Apply probabilistic sampling
        import random
        if random.random() < self.base_rate:
            buffer.decision = SamplingDecision.SAMPLE
            self._export_trace(buffer)
            return SamplingDecision.SAMPLE

        buffer.decision = SamplingDecision.DROP
        del self._trace_buffers[trace_id]
        return SamplingDecision.DROP

    def _calculate_trace_duration(self, buffer: TraceBuffer) -> float:
        """Calculate total trace duration in milliseconds."""
        if not buffer.spans:
            return 0.0

        start_times = [s.start_time for s in buffer.spans]
        end_times = [s.end_time for s in buffer.spans if s.end_time]

        if not end_times:
            return 0.0

        return (max(end_times) - min(start_times)) * 1000

    def _export_trace(self, buffer: TraceBuffer) -> None:
        """Export sampled trace to backend."""
        # Implementation depends on export destination
        # This is where you would send to Jaeger, Zipkin, etc.
        pass

    def _evict_oldest_trace(self) -> None:
        """Evict oldest trace when buffer is full."""
        if not self._trace_buffers:
            return

        oldest_id = min(
            self._trace_buffers.keys(),
            key=lambda tid: self._trace_buffers[tid].created_at,
        )
        del self._trace_buffers[oldest_id]

    def _cleanup_expired_traces(self) -> None:
        """Background thread to clean up expired trace buffers."""
        while True:
            time.sleep(5.0)
            now = time.time()

            with self._lock:
                expired = [
                    tid for tid, buf in self._trace_buffers.items()
                    if now - buf.created_at > self.buffer_timeout
                ]

                for tid in expired:
                    # Make decision on expired traces
                    self._make_sampling_decision(tid)

Sampling Configuration by Environment

# config/sampling.yaml
environments:
  development:
    strategy: always_on
    rate: 1.0
    description: "Sample all traces for debugging"

  staging:
    strategy: ratio_based
    rate: 0.5
    priority_rules:
      - name: errors
        condition: "status == ERROR"
        rate: 1.0
      - name: slow_requests
        condition: "duration > 2000ms"
        rate: 1.0
      - name: auth_operations
        condition: "operation.name contains 'auth'"
        rate: 1.0
    description: "50% base rate with priority overrides"

  production:
    strategy: adaptive
    target_traces_per_second: 100
    min_rate: 0.001
    max_rate: 0.1
    priority_rules:
      - name: errors
        condition: "status == ERROR"
        rate: 1.0
      - name: slow_requests
        condition: "duration > 5000ms"
        rate: 1.0
      - name: payments
        condition: "operation.name starts_with 'payment'"
        rate: 0.5
      - name: health_checks
        condition: "operation.name == 'health_check'"
        rate: 0.0
    tail_sampling:
      enabled: true
      buffer_timeout_seconds: 30
      policies:
        - name: error_traces
          type: status_code
          status_codes: [ERROR]
          sample_rate: 1.0
        - name: latency_traces
          type: latency
          threshold_ms: 3000
          sample_rate: 1.0
    description: "Adaptive rate with tail sampling for errors"

OpenTelemetry Collector Sampling

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Probabilistic sampling
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 10

  # Tail-based sampling
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 1000
    policies:
      # Always sample error traces
      - name: error-policy
        type: status_code
        status_code:
          status_codes: [ERROR]

      # Always sample slow traces
      - name: latency-policy
        type: latency
        latency:
          threshold_ms: 5000

      # Sample specific operations at higher rate
      - name: high-value-policy
        type: string_attribute
        string_attribute:
          key: http.route
          values:
            - /api/v1/checkout
            - /api/v1/payment
          enabled_regex_matching: true

      # Rate limit normal traces
      - name: rate-limiting-policy
        type: rate_limiting
        rate_limiting:
          spans_per_second: 500

      # Probabilistic fallback
      - name: probabilistic-policy
        type: probabilistic
        probabilistic:
          sampling_percentage: 10

exporters:
  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true

  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling]
      exporters: [jaeger]
    metrics:
      receivers: [otlp]
      processors: []
      exporters: [prometheus]

Structured Logging

Python (structlog)

Installation:

pip install structlog python-json-logger

Logger configuration:

# src/observability/logging.py
import logging
import sys
from typing import Any, Dict, Optional

import structlog
from structlog.types import EventDict, Processor

def add_trace_context(
    logger: logging.Logger,
    method_name: str,
    event_dict: EventDict
) -> EventDict:
    """Add OpenTelemetry trace context to log events."""
    from opentelemetry import trace

    span = trace.get_current_span()
    if span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
        event_dict["trace_flags"] = ctx.trace_flags

    return event_dict

def add_service_context(
    logger: logging.Logger,
    method_name: str,
    event_dict: EventDict
) -> EventDict:
    """Add service context to log events."""
    import os

    event_dict["service"] = os.getenv("SERVICE_NAME", "unknown")
    event_dict["version"] = os.getenv("SERVICE_VERSION", "unknown")
    event_dict["environment"] = os.getenv("ENVIRONMENT", "development")
    event_dict["host"] = os.getenv("HOSTNAME", "unknown")

    return event_dict

def configure_logging(
    level: str = "INFO",
    json_format: bool = True,
    add_trace: bool = True,
) -> structlog.BoundLogger:
    """Configure structured logging with structlog."""

    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.UnicodeDecoder(),
        add_service_context,
    ]

    if add_trace:
        shared_processors.append(add_trace_context)

    if json_format:
        renderer = structlog.processors.JSONRenderer()
    else:
        renderer = structlog.dev.ConsoleRenderer(colors=True)

    structlog.configure(
        processors=shared_processors + [
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    formatter = structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=shared_processors,
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            renderer,
        ],
    )

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    root_logger.handlers = []
    root_logger.addHandler(handler)
    root_logger.setLevel(getattr(logging, level.upper()))

    for logger_name in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
        logging.getLogger(logger_name).handlers = []
        logging.getLogger(logger_name).propagate = True

    return structlog.get_logger()

logger = configure_logging(
    level=os.getenv("LOG_LEVEL", "INFO"),
    json_format=os.getenv("LOG_FORMAT", "json") == "json",
)

Structured logging usage:

# src/services/user_service.py
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

logger = structlog.get_logger()

class UserService:
    """User service with structured logging."""

    def login(self, email: str, password: str, request_context: dict) -> dict:
        """Handle user login with comprehensive logging."""
        bind_contextvars(
            request_id=request_context.get("request_id"),
            client_ip=request_context.get("client_ip"),
            user_agent=request_context.get("user_agent"),
        )

        logger.info(
            "login_attempt",
            email=email,
            auth_method="password",
        )

        try:
            user = self.repository.find_by_email(email)
            if not user:
                logger.warning(
                    "login_failed",
                    email=email,
                    reason="user_not_found",
                )
                raise AuthenticationError("Invalid credentials")

            if not self.verify_password(password, user.password_hash):
                logger.warning(
                    "login_failed",
                    email=email,
                    user_id=user.id,
                    reason="invalid_password",
                    failed_attempts=user.failed_login_attempts + 1,
                )
                self._record_failed_attempt(user)
                raise AuthenticationError("Invalid credentials")

            if not user.is_active:
                logger.warning(
                    "login_failed",
                    email=email,
                    user_id=user.id,
                    reason="account_inactive",
                )
                raise AuthenticationError("Account is inactive")

            token = self.create_token(user)

            logger.info(
                "login_success",
                user_id=user.id,
                email=user.email,
                role=user.role,
                mfa_enabled=user.mfa_enabled,
            )

            return {
                "user_id": user.id,
                "token": token,
                "expires_in": 3600,
            }

        except AuthenticationError:
            raise
        except Exception as e:
            logger.error(
                "login_error",
                email=email,
                error_type=type(e).__name__,
                error_message=str(e),
                exc_info=True,
            )
            raise
        finally:
            clear_contextvars()

    def create_user(self, user_data: dict) -> User:
        """Create a new user with audit logging."""
        logger.info(
            "user_creation_started",
            email=user_data.get("email"),
            role=user_data.get("role", "user"),
        )

        try:
            if self.repository.find_by_email(user_data["email"]):
                logger.warning(
                    "user_creation_failed",
                    email=user_data["email"],
                    reason="email_exists",
                )
                raise ValidationError("Email already registered")

            user = User(
                email=user_data["email"],
                name=user_data["name"],
                role=user_data.get("role", "user"),
            )
            user.set_password(user_data["password"])
            self.repository.save(user)

            logger.info(
                "user_created",
                user_id=user.id,
                email=user.email,
                role=user.role,
            )

            return user

        except ValidationError:
            raise
        except Exception as e:
            logger.error(
                "user_creation_error",
                email=user_data.get("email"),
                error_type=type(e).__name__,
                error_message=str(e),
                exc_info=True,
            )
            raise

Context manager for request logging:

# src/middleware/logging_middleware.py
import time
import uuid
from contextlib import contextmanager

import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

logger = structlog.get_logger()

@contextmanager
def request_logging_context(request):
    """Context manager for request-scoped logging."""
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    start_time = time.time()

    bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.path,
        client_ip=request.remote_addr,
        user_agent=request.headers.get("User-Agent", ""),
    )

    logger.info("request_started")

    try:
        yield request_id
    except Exception as e:
        logger.error(
            "request_error",
            error_type=type(e).__name__,
            error_message=str(e),
            exc_info=True,
        )
        raise
    finally:
        duration_ms = (time.time() - start_time) * 1000
        logger.info(
            "request_completed",
            duration_ms=round(duration_ms, 2),
        )
        clear_contextvars()

def logging_middleware(app):
    """Flask middleware for request logging."""
    @app.before_request
    def before_request():
        request.start_time = time.time()
        request.request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))

        bind_contextvars(
            request_id=request.request_id,
            method=request.method,
            path=request.path,
            client_ip=request.remote_addr,
        )

        logger.info("request_started")

    @app.after_request
    def after_request(response):
        duration_ms = (time.time() - request.start_time) * 1000

        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
            response_size=response.content_length,
        )

        response.headers["X-Request-ID"] = request.request_id

        clear_contextvars()
        return response

    return app

TypeScript (pino)

Installation:

npm install pino pino-pretty pino-http

Logger configuration:

// src/observability/logging.ts
import pino, { Logger, LoggerOptions } from 'pino';
import { trace, context } from '@opentelemetry/api';

interface ServiceContext {
  service: string;
  version: string;
  environment: string;
  host: string;
}

function getServiceContext(): ServiceContext {
  return {
    service: process.env.SERVICE_NAME || 'unknown',
    version: process.env.SERVICE_VERSION || 'unknown',
    environment: process.env.NODE_ENV || 'development',
    host: process.env.HOSTNAME || 'unknown',
  };
}

function getTraceContext(): Record<string, string> | undefined {
  const span = trace.getSpan(context.active());
  if (!span) return undefined;

  const spanContext = span.spanContext();
  return {
    trace_id: spanContext.traceId,
    span_id: spanContext.spanId,
    trace_flags: spanContext.traceFlags.toString(),
  };
}

export function createLogger(name: string): Logger {
  const isDevelopment = process.env.NODE_ENV === 'development';

  const options: LoggerOptions = {
    name,
    level: process.env.LOG_LEVEL || 'info',
    base: getServiceContext(),
    timestamp: pino.stdTimeFunctions.isoTime,
    formatters: {
      level: (label) => ({ level: label }),
    },
    mixin: () => {
      const traceContext = getTraceContext();
      return traceContext ? { ...traceContext } : {};
    },
  };

  if (isDevelopment) {
    return pino({
      ...options,
      transport: {
        target: 'pino-pretty',
        options: {
          colorize: true,
          translateTime: 'SYS:standard',
          ignore: 'pid,hostname',
        },
      },
    });
  }

  return pino(options);
}

export const logger = createLogger('app');

Structured logging usage:

// src/services/user.service.ts
import { createLogger } from '../observability/logging';

const logger = createLogger('UserService');

interface LoginContext {
  requestId: string;
  clientIp: string;
  userAgent: string;
}

export class UserService {
  async login(email: string, password: string, context: LoginContext): Promise<LoginResult> {
    const childLogger = logger.child({
      request_id: context.requestId,
      client_ip: context.clientIp,
      user_agent: context.userAgent,
    });

    childLogger.info({ email, auth_method: 'password' }, 'login_attempt');

    try {
      const user = await this.repository.findByEmail(email);

      if (!user) {
        childLogger.warn({ email, reason: 'user_not_found' }, 'login_failed');
        throw new AuthenticationError('Invalid credentials');
      }

      const passwordValid = await this.verifyPassword(password, user.passwordHash);
      if (!passwordValid) {
        childLogger.warn(
          {
            email,
            user_id: user.id,
            reason: 'invalid_password',
            failed_attempts: user.failedLoginAttempts + 1,
          },
          'login_failed'
        );
        await this.recordFailedAttempt(user);
        throw new AuthenticationError('Invalid credentials');
      }

      if (!user.isActive) {
        childLogger.warn(
          { email, user_id: user.id, reason: 'account_inactive' },
          'login_failed'
        );
        throw new AuthenticationError('Account is inactive');
      }

      const token = await this.createToken(user);

      childLogger.info(
        {
          user_id: user.id,
          email: user.email,
          role: user.role,
          mfa_enabled: user.mfaEnabled,
        },
        'login_success'
      );

      return { userId: user.id, token, expiresIn: 3600 };
    } catch (error) {
      if (error instanceof AuthenticationError) {
        throw error;
      }

      childLogger.error(
        {
          email,
          error_type: error.constructor.name,
          error_message: error.message,
          stack: error.stack,
        },
        'login_error'
      );
      throw error;
    }
  }

  async createUser(userData: CreateUserDto): Promise<User> {
    logger.info(
      { email: userData.email, role: userData.role || 'user' },
      'user_creation_started'
    );

    try {
      const existingUser = await this.repository.findByEmail(userData.email);
      if (existingUser) {
        logger.warn({ email: userData.email, reason: 'email_exists' }, 'user_creation_failed');
        throw new ValidationError('Email already registered');
      }

      const user = new User({
        email: userData.email,
        name: userData.name,
        role: userData.role || 'user',
      });
      await user.setPassword(userData.password);
      await this.repository.save(user);

      logger.info(
        { user_id: user.id, email: user.email, role: user.role },
        'user_created'
      );

      return user;
    } catch (error) {
      if (error instanceof ValidationError) {
        throw error;
      }

      logger.error(
        {
          email: userData.email,
          error_type: error.constructor.name,
          error_message: error.message,
          stack: error.stack,
        },
        'user_creation_error'
      );
      throw error;
    }
  }
}

HTTP request logging middleware:

// src/middleware/logging.middleware.ts
import pinoHttp from 'pino-http';
import { createLogger } from '../observability/logging';
import { v4 as uuidv4 } from 'uuid';
import { Request, Response, NextFunction } from 'express';

const logger = createLogger('http');

export const httpLoggingMiddleware = pinoHttp({
  logger,
  genReqId: (req) => req.headers['x-request-id'] || uuidv4(),
  customProps: (req) => ({
    request_id: req.id,
  }),
  customLogLevel: (req, res, err) => {
    if (res.statusCode >= 500 || err) return 'error';
    if (res.statusCode >= 400) return 'warn';
    return 'info';
  },
  customSuccessMessage: (req, res) => {
    return `${req.method} ${req.url} completed`;
  },
  customErrorMessage: (req, res, err) => {
    return `${req.method} ${req.url} failed: ${err.message}`;
  },
  customAttributeKeys: {
    req: 'request',
    res: 'response',
    err: 'error',
    responseTime: 'duration_ms',
  },
  serializers: {
    req: (req) => ({
      method: req.method,
      url: req.url,
      path: req.path,
      query: req.query,
      headers: {
        'user-agent': req.headers['user-agent'],
        'content-type': req.headers['content-type'],
        host: req.headers.host,
      },
    }),
    res: (res) => ({
      status_code: res.statusCode,
      headers: {
        'content-type': res.getHeader('content-type'),
        'content-length': res.getHeader('content-length'),
      },
    }),
  },
});

export function requestIdMiddleware(req: Request, res: Response, next: NextFunction): void {
  const requestId = (req.headers['x-request-id'] as string) || uuidv4();
  req.id = requestId;
  res.setHeader('X-Request-ID', requestId);
  next();
}

Log Level Standards

# Log level usage standards
LOG_LEVELS = {
    "DEBUG": {
        "description": "Detailed diagnostic information for debugging",
        "examples": [
            "Variable values during execution",
            "Function entry/exit points",
            "Cache hits/misses",
            "Query parameters",
        ],
        "production": False,
    },
    "INFO": {
        "description": "General operational events",
        "examples": [
            "Request started/completed",
            "User login/logout",
            "Background job started/completed",
            "Configuration loaded",
        ],
        "production": True,
    },
    "WARNING": {
        "description": "Unexpected but handled situations",
        "examples": [
            "Deprecated API usage",
            "Retry after transient failure",
            "Rate limit approaching",
            "Authentication failure",
        ],
        "production": True,
    },
    "ERROR": {
        "description": "Error conditions that should be investigated",
        "examples": [
            "Unhandled exceptions",
            "External service failures",
            "Database connection errors",
            "Invalid data received",
        ],
        "production": True,
    },
    "CRITICAL": {
        "description": "System-wide failures requiring immediate attention",
        "examples": [
            "Application startup failure",
            "Data corruption detected",
            "Security breach detected",
            "Critical resource exhausted",
        ],
        "production": True,
    },
}

Log Field Standards

# Standard log field naming conventions
STANDARD_LOG_FIELDS = {
    # Request context
    "request_id": "Unique request identifier (UUID)",
    "trace_id": "Distributed trace ID",
    "span_id": "Current span ID",
    "parent_span_id": "Parent span ID",

    # HTTP context
    "http.method": "HTTP method (GET, POST, etc.)",
    "http.url": "Full request URL",
    "http.path": "Request path",
    "http.status_code": "Response status code",
    "http.duration_ms": "Request duration in milliseconds",
    "http.client_ip": "Client IP address",
    "http.user_agent": "User agent string",

    # User context
    "user.id": "User identifier",
    "user.email": "User email (if allowed by policy)",
    "user.role": "User role or permission level",

    # Service context
    "service.name": "Service name",
    "service.version": "Service version",
    "service.environment": "Deployment environment",
    "service.host": "Host name or container ID",

    # Error context
    "error.type": "Exception class name",
    "error.message": "Error message",
    "error.stack": "Stack trace",
    "error.code": "Application error code",

    # Business context
    "order.id": "Order identifier",
    "payment.id": "Payment identifier",
    "product.id": "Product identifier",
    "transaction.id": "Transaction identifier",

    # Performance context
    "db.query_time_ms": "Database query time",
    "cache.hit": "Cache hit/miss boolean",
    "external.service": "External service name",
    "external.duration_ms": "External call duration",
}

LOG_FIELD_EXAMPLES = {
    "login_success": {
        "event": "login_success",
        "level": "info",
        "timestamp": "2024-01-15T10:30:00.000Z",
        "request_id": "550e8400-e29b-41d4-a716-446655440000",
        "trace_id": "abcd1234567890abcdef1234567890ab",
        "span_id": "1234567890abcdef",
        "service": "auth-service",
        "version": "1.2.3",
        "environment": "production",
        "user.id": "usr_123456",
        "user.email": "user@example.com",
        "user.role": "admin",
        "http.client_ip": "192.168.1.100",
        "http.user_agent": "Mozilla/5.0...",
        "auth_method": "password",
        "mfa_enabled": True,
    },
    "request_completed": {
        "event": "request_completed",
        "level": "info",
        "timestamp": "2024-01-15T10:30:00.500Z",
        "request_id": "550e8400-e29b-41d4-a716-446655440000",
        "trace_id": "abcd1234567890abcdef1234567890ab",
        "http.method": "POST",
        "http.path": "/api/orders",
        "http.status_code": 201,
        "http.duration_ms": 245.5,
        "db.query_count": 3,
        "db.total_time_ms": 45.2,
    },
}

Log Aggregation

ELK Stack Configuration

Filebeat configuration:

# filebeat.yml
filebeat.inputs:
  - type: container
    paths:
      - '/var/lib/docker/containers/*/*.log'
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: '/var/lib/docker/containers/'

  - type: log
    paths:
      - '/var/log/app/*.log'
    json.keys_under_root: true
    json.add_error_key: true
    json.message_key: message

processors:
  - decode_json_fields:
      fields: ['message']
      target: ''
      overwrite_keys: true
      add_error_key: true

  - add_host_metadata:
      when.not.contains.tags: forwarded

  - add_cloud_metadata: ~

  - add_fields:
      target: ''
      fields:
        environment: ${ENVIRONMENT:development}

output.elasticsearch:
  hosts: ['${ELASTICSEARCH_HOST:localhost:9200}']
  index: 'logs-%{[service.name]}-%{+yyyy.MM.dd}'
  pipeline: 'logs-pipeline'

setup.template:
  name: 'logs'
  pattern: 'logs-*'
  settings:
    index.number_of_shards: 1
    index.number_of_replicas: 1

setup.ilm:
  enabled: true
  rollover_alias: 'logs'
  pattern: '{now/d}-000001'
  policy_name: 'logs-policy'

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644

Elasticsearch index template:

{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs"
    },
    "mappings": {
      "dynamic_templates": [
        {
          "strings_as_keywords": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        }
      ],
      "properties": {
        "@timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "event": { "type": "keyword" },
        "message": { "type": "text" },
        "service": { "type": "keyword" },
        "version": { "type": "keyword" },
        "environment": { "type": "keyword" },
        "host": { "type": "keyword" },
        "request_id": { "type": "keyword" },
        "trace_id": { "type": "keyword" },
        "span_id": { "type": "keyword" },
        "user": {
          "properties": {
            "id": { "type": "keyword" },
            "email": { "type": "keyword" },
            "role": { "type": "keyword" }
          }
        },
        "http": {
          "properties": {
            "method": { "type": "keyword" },
            "path": { "type": "keyword" },
            "status_code": { "type": "integer" },
            "duration_ms": { "type": "float" },
            "client_ip": { "type": "ip" }
          }
        },
        "error": {
          "properties": {
            "type": { "type": "keyword" },
            "message": { "type": "text" },
            "stack": { "type": "text" }
          }
        }
      }
    }
  }
}

ILM Policy:

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_age": "1d",
            "max_primary_shard_size": "50gb"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "set_priority": {
            "priority": 0
          },
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Grafana Loki Configuration

Promtail configuration:

# promtail-config.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push
    tenant_id: default
    batchwait: 1s
    batchsize: 1048576
    timeout: 10s

scrape_configs:
  - job_name: containers
    static_configs:
      - targets:
          - localhost
        labels:
          job: containerlogs
          __path__: /var/lib/docker/containers/*/*log

    pipeline_stages:
      - json:
          expressions:
            output: log
            stream: stream
            timestamp: time

      - json:
          expressions:
            level: level
            event: event
            service: service
            trace_id: trace_id
            span_id: span_id
            request_id: request_id
          source: output

      - labels:
          level:
          service:
          event:

      - timestamp:
          source: timestamp
          format: RFC3339Nano

      - output:
          source: output

  - job_name: kubernetes-pods
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels:
          - __meta_kubernetes_pod_controller_name
        regex: ([0-9a-z-.]+?)(-[0-9a-f]{8,10})?
        action: replace
        target_label: __tmp_controller_name
      - source_labels:
          - __meta_kubernetes_pod_label_app_kubernetes_io_name
          - __meta_kubernetes_pod_label_app
          - __tmp_controller_name
          - __meta_kubernetes_pod_name
        regex: ^;*([^;]+)(;.*)?$
        action: replace
        target_label: app
      - source_labels:
          - __meta_kubernetes_pod_annotation_kubernetes_io_config_hash
        action: replace
        target_label: __tmp_pod_label_hash
      - source_labels:
          - __meta_kubernetes_namespace
        action: replace
        target_label: namespace
      - action: replace
        replacement: /var/log/pods/*$1/*.log
        separator: /
        source_labels:
          - __meta_kubernetes_pod_uid
          - __meta_kubernetes_pod_container_name
        target_label: __path__

    pipeline_stages:
      - cri: {}
      - json:
          expressions:
            level: level
            event: event
            trace_id: trace_id
      - labels:
          level:
          event:

LogQL query examples:

# Find all errors in the last hour
{job="containerlogs", level="error"} | json | line_format "{{.event}}: {{.message}}"

# Count errors by service
sum by (service) (count_over_time({level="error"}[1h]))

# Find slow requests (> 1000ms)
{job="containerlogs"} | json | http_duration_ms > 1000

# Trace a specific request
{job="containerlogs"} | json | request_id="550e8400-e29b-41d4-a716-446655440000"

# Find all logs for a trace
{job="containerlogs"} | json | trace_id="abcd1234567890abcdef1234567890ab"

# Error rate by service over time
sum by (service) (rate({level="error"}[5m]))

# Top 10 slowest endpoints
topk(10, avg by (http_path) (
  avg_over_time({job="containerlogs"} | json | unwrap http_duration_ms [1h])
))

# Login failures by reason
sum by (reason) (count_over_time({event="login_failed"}[1h]))

AWS CloudWatch Configuration

CloudWatch Logs agent configuration:

{
  "agent": {
    "metrics_collection_interval": 60,
    "run_as_user": "cwagent"
  },
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/app/*.log",
            "log_group_name": "/app/${ENVIRONMENT}/${SERVICE_NAME}",
            "log_stream_name": "{instance_id}",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S.%fZ",
            "multi_line_start_pattern": "{",
            "encoding": "utf-8"
          }
        ]
      }
    },
    "log_stream_name": "default",
    "force_flush_interval": 5
  }
}

CloudWatch Logs Insights queries:

-- Find all errors in the last hour
fields @timestamp, @message, service, error.type, error.message
| filter level = "error"
| sort @timestamp desc
| limit 100

-- Count errors by service
stats count(*) as error_count by service
| filter level = "error"
| sort error_count desc

-- Find slow requests
fields @timestamp, http.path, http.duration_ms, trace_id
| filter http.duration_ms > 1000
| sort http.duration_ms desc
| limit 50

-- Trace a specific request
fields @timestamp, @message
| filter request_id = "550e8400-e29b-41d4-a716-446655440000"
| sort @timestamp asc

-- Error rate over time (5 minute buckets)
stats count(*) as total, sum(level = "error") as errors by bin(5m) as time_bucket
| sort time_bucket asc

-- P99 latency by endpoint
stats percentile(http.duration_ms, 99) as p99_latency by http.path
| sort p99_latency desc
| limit 20

-- Login failures by reason
stats count(*) as failures by reason
| filter event = "login_failed"
| sort failures desc

-- User activity timeline
fields @timestamp, event, user.id, http.path
| filter user.id = "usr_123456"
| sort @timestamp desc
| limit 100

Terraform for CloudWatch Logs:

# cloudwatch.tf
resource "aws_cloudwatch_log_group" "app_logs" {
  name              = "/app/${var.environment}/${var.service_name}"
  retention_in_days = var.log_retention_days

  tags = {
    Environment = var.environment
    Service     = var.service_name
  }
}

resource "aws_cloudwatch_log_metric_filter" "error_count" {
  name           = "${var.service_name}-error-count"
  pattern        = "{ $.level = \"error\" }"
  log_group_name = aws_cloudwatch_log_group.app_logs.name

  metric_transformation {
    name          = "ErrorCount"
    namespace     = "App/${var.service_name}"
    value         = "1"
    default_value = "0"
    dimensions = {
      Service = "$.service"
    }
  }
}

resource "aws_cloudwatch_log_metric_filter" "request_latency" {
  name           = "${var.service_name}-request-latency"
  pattern        = "{ $.event = \"request_completed\" }"
  log_group_name = aws_cloudwatch_log_group.app_logs.name

  metric_transformation {
    name      = "RequestLatency"
    namespace = "App/${var.service_name}"
    value     = "$.http.duration_ms"
    unit      = "Milliseconds"
  }
}

resource "aws_cloudwatch_metric_alarm" "high_error_rate" {
  alarm_name          = "${var.service_name}-high-error-rate"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "ErrorCount"
  namespace           = "App/${var.service_name}"
  period              = 300
  statistic           = "Sum"
  threshold           = 10
  alarm_description   = "High error rate detected"

  alarm_actions = [aws_sns_topic.alerts.arn]
  ok_actions    = [aws_sns_topic.alerts.arn]

  dimensions = {
    Service = var.service_name
  }
}

resource "aws_cloudwatch_metric_alarm" "high_latency" {
  alarm_name          = "${var.service_name}-high-latency"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "RequestLatency"
  namespace           = "App/${var.service_name}"
  period              = 300
  extended_statistic  = "p99"
  threshold           = 1000
  alarm_description   = "P99 latency exceeded 1 second"

  alarm_actions = [aws_sns_topic.alerts.arn]
}

Correlation and Context

Correlation ID Implementation

Python middleware:

# src/middleware/correlation.py
import uuid
from contextvars import ContextVar
from functools import wraps
from typing import Callable, Optional

from opentelemetry import trace
from opentelemetry.propagate import extract, inject

correlation_id_var: ContextVar[Optional[str]] = ContextVar(
    "correlation_id", default=None
)

def get_correlation_id() -> Optional[str]:
    """Get current correlation ID."""
    return correlation_id_var.get()

def set_correlation_id(correlation_id: str) -> None:
    """Set correlation ID for current context."""
    correlation_id_var.set(correlation_id)

class CorrelationMiddleware:
    """Middleware to handle correlation ID propagation."""

    HEADER_NAME = "X-Correlation-ID"

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        headers = dict(scope.get("headers", []))
        correlation_id = headers.get(
            self.HEADER_NAME.lower().encode(),
            str(uuid.uuid4()).encode()
        ).decode()

        set_correlation_id(correlation_id)

        span = trace.get_current_span()
        if span.is_recording():
            span.set_attribute("correlation_id", correlation_id)

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append(
                    (self.HEADER_NAME.lower().encode(), correlation_id.encode())
                )
                message["headers"] = headers
            await send(message)

        await self.app(scope, receive, send_wrapper)

def propagate_correlation(func: Callable) -> Callable:
    """Decorator to propagate correlation ID to async tasks."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        correlation_id = get_correlation_id()
        if correlation_id:
            set_correlation_id(correlation_id)
        return await func(*args, **kwargs)
    return wrapper

class CorrelatedHttpClient:
    """HTTP client that propagates correlation ID."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = None

    async def request(
        self,
        method: str,
        path: str,
        **kwargs
    ) -> dict:
        headers = kwargs.pop("headers", {})

        correlation_id = get_correlation_id()
        if correlation_id:
            headers["X-Correlation-ID"] = correlation_id

        inject(headers)

        if self.session is None:
            self.session = aiohttp.ClientSession()

        async with self.session.request(
            method,
            f"{self.base_url}{path}",
            headers=headers,
            **kwargs
        ) as response:
            return await response.json()

    async def get(self, path: str, **kwargs) -> dict:
        return await self.request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs) -> dict:
        return await self.request("POST", path, **kwargs)

TypeScript middleware:

// src/middleware/correlation.ts
import { Request, Response, NextFunction } from 'express';
import { AsyncLocalStorage } from 'async_hooks';
import { v4 as uuidv4 } from 'uuid';
import { trace, context } from '@opentelemetry/api';
import axios, { AxiosInstance, InternalAxiosRequestConfig } from 'axios';

interface CorrelationContext {
  correlationId: string;
  requestId: string;
}

const correlationStorage = new AsyncLocalStorage<CorrelationContext>();

export function getCorrelationId(): string | undefined {
  return correlationStorage.getStore()?.correlationId;
}

export function getRequestId(): string | undefined {
  return correlationStorage.getStore()?.requestId;
}

export function correlationMiddleware(req: Request, res: Response, next: NextFunction): void {
  const correlationId = (req.headers['x-correlation-id'] as string) || uuidv4();
  const requestId = (req.headers['x-request-id'] as string) || uuidv4();

  const correlationContext: CorrelationContext = {
    correlationId,
    requestId,
  };

  const span = trace.getSpan(context.active());
  if (span) {
    span.setAttribute('correlation_id', correlationId);
    span.setAttribute('request_id', requestId);
  }

  res.setHeader('X-Correlation-ID', correlationId);
  res.setHeader('X-Request-ID', requestId);

  correlationStorage.run(correlationContext, () => {
    next();
  });
}

export function createCorrelatedAxiosClient(baseURL: string): AxiosInstance {
  const client = axios.create({ baseURL });

  client.interceptors.request.use((config: InternalAxiosRequestConfig) => {
    const correlationId = getCorrelationId();
    const requestId = getRequestId();

    if (correlationId) {
      config.headers.set('X-Correlation-ID', correlationId);
    }
    if (requestId) {
      config.headers.set('X-Request-ID', requestId);
    }

    return config;
  });

  return client;
}

export function withCorrelation<T extends (...args: unknown[]) => Promise<unknown>>(
  fn: T
): T {
  return (async (...args: Parameters<T>) => {
    const store = correlationStorage.getStore();
    if (store) {
      return correlationStorage.run(store, () => fn(...args));
    }
    return fn(...args);
  }) as T;
}

Request Context Propagation

Python context manager:

# src/observability/context.py
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import uuid

@dataclass
class RequestContext:
    """Request context for observability."""
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    user_id: Optional[str] = None
    tenant_id: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert context to dictionary."""
        result = {
            "request_id": self.request_id,
        }
        if self.correlation_id:
            result["correlation_id"] = self.correlation_id
        if self.trace_id:
            result["trace_id"] = self.trace_id
        if self.span_id:
            result["span_id"] = self.span_id
        if self.user_id:
            result["user_id"] = self.user_id
        if self.tenant_id:
            result["tenant_id"] = self.tenant_id
        if self.metadata:
            result["metadata"] = self.metadata
        return result

    def to_headers(self) -> Dict[str, str]:
        """Convert context to HTTP headers."""
        headers = {
            "X-Request-ID": self.request_id,
        }
        if self.correlation_id:
            headers["X-Correlation-ID"] = self.correlation_id
        if self.user_id:
            headers["X-User-ID"] = self.user_id
        if self.tenant_id:
            headers["X-Tenant-ID"] = self.tenant_id
        return headers

_request_context: ContextVar[Optional[RequestContext]] = ContextVar(
    "request_context", default=None
)

def get_request_context() -> Optional[RequestContext]:
    """Get current request context."""
    return _request_context.get()

def set_request_context(context: RequestContext) -> None:
    """Set request context."""
    _request_context.set(context)

class RequestContextManager:
    """Context manager for request context."""

    def __init__(self, context: RequestContext):
        self.context = context
        self.token = None

    def __enter__(self) -> RequestContext:
        self.token = _request_context.set(self.context)
        return self.context

    def __exit__(self, *args):
        _request_context.reset(self.token)

def extract_context_from_request(request) -> RequestContext:
    """Extract request context from HTTP request."""
    from opentelemetry import trace

    span = trace.get_current_span()
    span_context = span.get_span_context() if span.is_recording() else None

    return RequestContext(
        request_id=request.headers.get("X-Request-ID", str(uuid.uuid4())),
        correlation_id=request.headers.get("X-Correlation-ID"),
        trace_id=format(span_context.trace_id, "032x") if span_context else None,
        span_id=format(span_context.span_id, "016x") if span_context else None,
        user_id=getattr(request, "user_id", None),
        tenant_id=request.headers.get("X-Tenant-ID"),
    )

Error Tracking

Sentry Integration (Python)

Installation:

pip install sentry-sdk[flask]

Configuration:

# src/observability/sentry.py
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.redis import RedisIntegration

def configure_sentry(
    dsn: str,
    environment: str,
    release: str,
    sample_rate: float = 1.0,
    traces_sample_rate: float = 0.1,
):
    """Configure Sentry error tracking."""
    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        release=release,
        sample_rate=sample_rate,
        traces_sample_rate=traces_sample_rate,
        integrations=[
            FlaskIntegration(),
            SqlalchemyIntegration(),
            CeleryIntegration(),
            RedisIntegration(),
        ],
        before_send=before_send,
        before_breadcrumb=before_breadcrumb,
        send_default_pii=False,
        attach_stacktrace=True,
        max_breadcrumbs=50,
    )

def before_send(event, hint):
    """Filter or modify events before sending to Sentry."""
    if "exc_info" in hint:
        exc_type, exc_value, tb = hint["exc_info"]

        if isinstance(exc_value, (ValidationError, AuthenticationError)):
            return None

        if isinstance(exc_value, HTTPException) and exc_value.code < 500:
            return None

    if event.get("request", {}).get("url", "").endswith("/health"):
        return None

    from .context import get_request_context
    ctx = get_request_context()
    if ctx:
        event.setdefault("tags", {}).update({
            "correlation_id": ctx.correlation_id,
            "tenant_id": ctx.tenant_id,
        })
        event.setdefault("extra", {}).update(ctx.to_dict())

    return event

def before_breadcrumb(crumb, hint):
    """Filter breadcrumbs."""
    if crumb.get("category") == "http" and "/health" in crumb.get("data", {}).get("url", ""):
        return None
    return crumb

def capture_exception_with_context(error: Exception, **extra):
    """Capture exception with additional context."""
    from .context import get_request_context

    with sentry_sdk.push_scope() as scope:
        ctx = get_request_context()
        if ctx:
            scope.set_tag("correlation_id", ctx.correlation_id)
            scope.set_tag("request_id", ctx.request_id)
            if ctx.user_id:
                scope.set_user({"id": ctx.user_id})

        for key, value in extra.items():
            scope.set_extra(key, value)

        sentry_sdk.capture_exception(error)

Sentry Integration (TypeScript)

Installation:

npm install @sentry/node @sentry/tracing

Configuration:

// src/observability/sentry.ts
import * as Sentry from '@sentry/node';
import { ProfilingIntegration } from '@sentry/profiling-node';
import { Express } from 'express';
import { getCorrelationId, getRequestId } from './correlation';

interface SentryConfig {
  dsn: string;
  environment: string;
  release: string;
  sampleRate?: number;
  tracesSampleRate?: number;
}

export function configureSentry(app: Express, config: SentryConfig): void {
  Sentry.init({
    dsn: config.dsn,
    environment: config.environment,
    release: config.release,
    sampleRate: config.sampleRate ?? 1.0,
    tracesSampleRate: config.tracesSampleRate ?? 0.1,
    integrations: [
      new Sentry.Integrations.Http({ tracing: true }),
      new Sentry.Integrations.Express({ app }),
      new ProfilingIntegration(),
    ],
    profilesSampleRate: 0.1,
    beforeSend: (event, hint) => beforeSend(event, hint),
    beforeBreadcrumb: (breadcrumb) => beforeBreadcrumb(breadcrumb),
    sendDefaultPii: false,
    attachStacktrace: true,
    maxBreadcrumbs: 50,
  });

  app.use(Sentry.Handlers.requestHandler());
  app.use(Sentry.Handlers.tracingHandler());
}

function beforeSend(
  event: Sentry.Event,
  hint: Sentry.EventHint
): Sentry.Event | null {
  const error = hint.originalException;

  if (error instanceof ValidationError || error instanceof AuthenticationError) {
    return null;
  }

  if (error instanceof HttpError && error.statusCode < 500) {
    return null;
  }

  if (event.request?.url?.endsWith('/health')) {
    return null;
  }

  const correlationId = getCorrelationId();
  const requestId = getRequestId();

  if (correlationId || requestId) {
    event.tags = {
      ...event.tags,
      correlation_id: correlationId,
      request_id: requestId,
    };
  }

  return event;
}

function beforeBreadcrumb(breadcrumb: Sentry.Breadcrumb): Sentry.Breadcrumb | null {
  if (
    breadcrumb.category === 'http' &&
    breadcrumb.data?.url?.includes('/health')
  ) {
    return null;
  }
  return breadcrumb;
}

export function captureExceptionWithContext(
  error: Error,
  extra: Record<string, unknown> = {}
): void {
  Sentry.withScope((scope) => {
    const correlationId = getCorrelationId();
    const requestId = getRequestId();

    if (correlationId) {
      scope.setTag('correlation_id', correlationId);
    }
    if (requestId) {
      scope.setTag('request_id', requestId);
    }

    Object.entries(extra).forEach(([key, value]) => {
      scope.setExtra(key, value);
    });

    Sentry.captureException(error);
  });
}

export const sentryErrorHandler = Sentry.Handlers.errorHandler();

Metrics Integration

Prometheus Metrics (Python)

Installation:

pip install prometheus-client

Metrics configuration:

# src/observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info, CollectorRegistry
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from functools import wraps
import time

registry = CollectorRegistry()

REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
    registry=registry,
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
    registry=registry,
)

ACTIVE_REQUESTS = Gauge(
    "http_requests_active",
    "Active HTTP requests",
    ["method"],
    registry=registry,
)

DB_QUERY_LATENCY = Histogram(
    "db_query_duration_seconds",
    "Database query latency",
    ["operation", "table"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
    registry=registry,
)

CACHE_OPERATIONS = Counter(
    "cache_operations_total",
    "Cache operations",
    ["operation", "result"],
    registry=registry,
)

ERROR_COUNT = Counter(
    "errors_total",
    "Total errors",
    ["type", "service"],
    registry=registry,
)

SERVICE_INFO = Info(
    "service",
    "Service information",
    registry=registry,
)

def set_service_info(name: str, version: str, environment: str):
    """Set service information."""
    SERVICE_INFO.info({
        "name": name,
        "version": version,
        "environment": environment,
    })

def track_request_metrics(method: str, endpoint: str):
    """Decorator to track request metrics."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            ACTIVE_REQUESTS.labels(method=method).inc()
            start_time = time.time()

            try:
                result = await func(*args, **kwargs)
                status = getattr(result, "status_code", 200)
                REQUEST_COUNT.labels(
                    method=method,
                    endpoint=endpoint,
                    status=status,
                ).inc()
                return result

            except Exception as e:
                REQUEST_COUNT.labels(
                    method=method,
                    endpoint=endpoint,
                    status=500,
                ).inc()
                ERROR_COUNT.labels(
                    type=type(e).__name__,
                    service="api",
                ).inc()
                raise

            finally:
                duration = time.time() - start_time
                REQUEST_LATENCY.labels(
                    method=method,
                    endpoint=endpoint,
                ).observe(duration)
                ACTIVE_REQUESTS.labels(method=method).dec()

        return wrapper
    return decorator

def track_db_query(operation: str, table: str):
    """Context manager to track database query metrics."""
    class QueryTimer:
        def __enter__(self):
            self.start_time = time.time()
            return self

        def __exit__(self, *args):
            duration = time.time() - self.start_time
            DB_QUERY_LATENCY.labels(
                operation=operation,
                table=table,
            ).observe(duration)

    return QueryTimer()

def track_cache_operation(operation: str, hit: bool):
    """Track cache operation."""
    CACHE_OPERATIONS.labels(
        operation=operation,
        result="hit" if hit else "miss",
    ).inc()

def get_metrics():
    """Get metrics in Prometheus format."""
    return generate_latest(registry), CONTENT_TYPE_LATEST

Metric Naming Conventions

Prometheus Metric Naming Rules

┌─────────────────────────────────────────────────────────────────────┐
│                    Prometheus Metric Naming Format                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Format: <namespace>_<subsystem>_<name>_<unit>                     │
│                                                                     │
│  Examples:                                                          │
│  ├── http_server_requests_total                                    │
│  ├── http_server_request_duration_seconds                          │
│  ├── database_connections_active                                    │
│  ├── cache_hits_total                                              │
│  └── queue_messages_pending                                         │
│                                                                     │
│  Rules:                                                             │
│  ├── Use snake_case (lowercase with underscores)                   │
│  ├── Start with a letter (a-z)                                     │
│  ├── Use only [a-zA-Z0-9_]                                         │
│  ├── End with unit suffix (_seconds, _bytes, _total)               │
│  └── Use _total suffix for counters                                │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Metric Type Guidelines

# src/observability/metrics/naming.py
"""
Prometheus metric naming conventions and examples.

@module prometheus_naming
@description Standards for Prometheus metric naming
@version 1.0.0
@author Tyler Dukes
"""

from prometheus_client import Counter, Histogram, Gauge, Summary, Info

# ============================================================================
# COUNTERS - Cumulative values that only increase
# ============================================================================
# Naming: <noun>_<verb>_total or <noun>_total

# Good - Clear noun + action + _total suffix
http_requests_total = Counter(
    "http_requests_total",
    "Total HTTP requests processed",
    ["method", "endpoint", "status_code"],
)

# Good - Specific subsystem prefix
database_queries_total = Counter(
    "database_queries_total",
    "Total database queries executed",
    ["operation", "table"],
)

# Good - Error counter
errors_total = Counter(
    "errors_total",
    "Total errors encountered",
    ["error_type", "service"],
)

# Bad - Missing _total suffix
# http_requests = Counter(...)  # Don't do this

# Bad - Using verb form instead of noun
# request_count = Counter(...)  # Don't do this

# ============================================================================
# GAUGES - Values that can increase or decrease
# ============================================================================
# Naming: <noun>_<state> or <noun>_<measurement>

# Good - Current state
http_requests_in_flight = Gauge(
    "http_requests_in_flight",
    "Number of HTTP requests currently being processed",
    ["method"],
)

# Good - Resource capacity
database_connections_active = Gauge(
    "database_connections_active",
    "Number of active database connections",
    ["pool_name"],
)

database_connections_max = Gauge(
    "database_connections_max",
    "Maximum database connections allowed",
    ["pool_name"],
)

# Good - Memory/resource usage
memory_usage_bytes = Gauge(
    "memory_usage_bytes",
    "Current memory usage in bytes",
    ["process"],
)

# Good - Queue depth
queue_messages_pending = Gauge(
    "queue_messages_pending",
    "Number of messages waiting in queue",
    ["queue_name"],
)

# ============================================================================
# HISTOGRAMS - Distributions of values
# ============================================================================
# Naming: <noun>_<unit> (generates _bucket, _sum, _count)

# Good - Duration in seconds
http_request_duration_seconds = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

# Good - Size in bytes
http_request_size_bytes = Histogram(
    "http_request_size_bytes",
    "HTTP request body size in bytes",
    ["method", "endpoint"],
    buckets=[100, 1000, 10000, 100000, 1000000, 10000000],
)

# Good - Database query latency
database_query_duration_seconds = Histogram(
    "database_query_duration_seconds",
    "Database query duration in seconds",
    ["operation", "table"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)

# ============================================================================
# SUMMARIES - Quantile distributions
# ============================================================================
# Naming: Same as histograms

# Good - Response time with quantiles
http_response_time_seconds = Summary(
    "http_response_time_seconds",
    "HTTP response time in seconds",
    ["method", "endpoint"],
)

# ============================================================================
# INFO - Static metadata
# ============================================================================
# Naming: <noun>_info

# Good - Application metadata
app_info = Info(
    "app",
    "Application information",
)
app_info.info({
    "version": "1.2.3",
    "environment": "production",
    "commit": "abc123",
})

# Good - Build information
build_info = Info(
    "build",
    "Build information",
)

Unit Suffixes

┌────────────────┬─────────────────────┬─────────────────────────────┐
│ Unit           │ Suffix              │ Example                     │
├────────────────┼─────────────────────┼─────────────────────────────┤
│ Time           │ _seconds            │ request_duration_seconds    │
│ Bytes          │ _bytes              │ response_size_bytes         │
│ Ratio/Percent  │ _ratio or _percent  │ cache_hit_ratio             │
│ Count (total)  │ _total              │ requests_total              │
│ Temperature    │ _celsius            │ cpu_temperature_celsius     │
│ Timestamp      │ _timestamp_seconds  │ last_success_timestamp_sec  │
│ No unit        │ (none)              │ queue_length                │
└────────────────┴─────────────────────┴─────────────────────────────┘

Naming Anti-Patterns

# ============================================================================
# ANTI-PATTERNS - Don't do these
# ============================================================================

# ❌ Bad: Inconsistent casing
# HttpRequestsTotal = Counter(...)  # Use snake_case
# HTTP_REQUESTS_TOTAL = Counter(...)  # Use lowercase

# ❌ Bad: Missing unit suffix
# request_duration = Histogram(...)  # Use _seconds

# ❌ Bad: Wrong metric type
# request_count = Gauge(...)  # Counters should use Counter type
# current_requests = Counter(...)  # Gauges should use Gauge type

# ❌ Bad: Redundant labels
# requests_total with labels ["method", "http_method"]  # Duplicated

# ❌ Bad: High-cardinality labels
# requests_total with labels ["user_id"]  # Unbounded values

# ❌ Bad: Inconsistent naming across metrics
# http_requests_total vs http_request_duration_seconds
# ^^^^ plural                ^^^^^^^ singular - pick one

# ❌ Bad: Using verbs instead of nouns
# process_requests_total  # Use noun: requests_processed_total

# ❌ Bad: Abbreviations
# req_total  # Use full words: requests_total
# db_conn  # Use full words: database_connections

# ✅ Good: Consistent naming pattern
http_server_requests_total = Counter(...)
http_server_request_duration_seconds = Histogram(...)
http_server_request_size_bytes = Histogram(...)
http_server_response_size_bytes = Histogram(...)

Label and Tag Standards

Label Best Practices

┌─────────────────────────────────────────────────────────────────────┐
│                     Label Cardinality Guidelines                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Low Cardinality (GOOD):                                           │
│  ├── method: GET, POST, PUT, DELETE (~10 values)                   │
│  ├── status_code: 200, 201, 400, 404, 500 (~20 values)            │
│  ├── environment: dev, staging, prod (3 values)                    │
│  └── service: api, worker, scheduler (~10 values)                  │
│                                                                     │
│  High Cardinality (BAD):                                           │
│  ├── user_id: Unbounded (millions of values)                       │
│  ├── request_id: Unique per request (infinite)                     │
│  ├── email: Unbounded (millions of values)                         │
│  └── timestamp: Unique per event (infinite)                        │
│                                                                     │
│  Cardinality Impact:                                               │
│  ├── 10 labels × 10 values = 100 time series                       │
│  ├── 10 labels × 100 values = 1,000 time series                    │
│  ├── 10 labels × 1000 values = 10,000 time series                  │
│  └── Memory usage grows linearly with time series count            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Standard Label Definitions

# src/observability/metrics/labels.py
"""
Standard label definitions for consistent metrics.

@module prometheus_labels
@description Standardized label names and values
@version 1.0.0
@author Tyler Dukes
"""

from enum import Enum
from typing import Dict, List, Optional
from dataclasses import dataclass


class HttpMethod(Enum):
    """Standard HTTP method labels."""
    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    PATCH = "PATCH"
    DELETE = "DELETE"
    HEAD = "HEAD"
    OPTIONS = "OPTIONS"


class HttpStatusCategory(Enum):
    """HTTP status code categories for reduced cardinality."""
    SUCCESS = "2xx"
    REDIRECT = "3xx"
    CLIENT_ERROR = "4xx"
    SERVER_ERROR = "5xx"


class Environment(Enum):
    """Standard environment labels."""
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"


class DatabaseOperation(Enum):
    """Standard database operation labels."""
    SELECT = "select"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    TRANSACTION = "transaction"


class CacheOperation(Enum):
    """Standard cache operation labels."""
    GET = "get"
    SET = "set"
    DELETE = "delete"
    EXPIRE = "expire"


class CacheResult(Enum):
    """Standard cache result labels."""
    HIT = "hit"
    MISS = "miss"
    ERROR = "error"


@dataclass
class LabelConfig:
    """Configuration for a metric label."""
    name: str
    description: str
    allowed_values: Optional[List[str]] = None
    max_cardinality: int = 100


# Standard label configurations
STANDARD_LABELS: Dict[str, LabelConfig] = {
    "method": LabelConfig(
        name="method",
        description="HTTP method (GET, POST, PUT, DELETE, etc.)",
        allowed_values=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
        max_cardinality=10,
    ),
    "status_code": LabelConfig(
        name="status_code",
        description="HTTP status code (200, 201, 400, 404, 500, etc.)",
        max_cardinality=50,
    ),
    "status_category": LabelConfig(
        name="status_category",
        description="HTTP status category (2xx, 3xx, 4xx, 5xx)",
        allowed_values=["2xx", "3xx", "4xx", "5xx"],
        max_cardinality=4,
    ),
    "endpoint": LabelConfig(
        name="endpoint",
        description="API endpoint path (normalized, no IDs)",
        max_cardinality=100,
    ),
    "service": LabelConfig(
        name="service",
        description="Service name",
        max_cardinality=50,
    ),
    "environment": LabelConfig(
        name="environment",
        description="Deployment environment",
        allowed_values=["development", "staging", "production"],
        max_cardinality=5,
    ),
    "error_type": LabelConfig(
        name="error_type",
        description="Error classification",
        max_cardinality=50,
    ),
    "database": LabelConfig(
        name="database",
        description="Database name",
        max_cardinality=20,
    ),
    "table": LabelConfig(
        name="table",
        description="Database table name",
        max_cardinality=100,
    ),
    "operation": LabelConfig(
        name="operation",
        description="Operation type (select, insert, update, delete)",
        allowed_values=["select", "insert", "update", "delete", "transaction"],
        max_cardinality=10,
    ),
    "cache": LabelConfig(
        name="cache",
        description="Cache instance name",
        max_cardinality=20,
    ),
    "result": LabelConfig(
        name="result",
        description="Operation result (hit, miss, error)",
        allowed_values=["hit", "miss", "error", "success", "failure"],
        max_cardinality=10,
    ),
    "queue": LabelConfig(
        name="queue",
        description="Message queue name",
        max_cardinality=50,
    ),
}


def normalize_endpoint(path: str) -> str:
    """
    Normalize endpoint path to reduce cardinality.

    Examples:
        /users/123 -> /users/{id}
        /orders/abc-def-ghi/items/456 -> /orders/{id}/items/{id}
    """
    import re

    # Replace UUIDs
    path = re.sub(
        r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
        '{uuid}',
        path,
        flags=re.IGNORECASE,
    )

    # Replace numeric IDs
    path = re.sub(r'/\d+(?=/|$)', '/{id}', path)

    # Replace alphanumeric IDs (common patterns)
    path = re.sub(r'/[a-zA-Z0-9]{20,}(?=/|$)', '/{id}', path)

    return path


def normalize_status_code(status_code: int) -> str:
    """
    Normalize status code to category for reduced cardinality.

    Examples:
        200, 201, 204 -> "2xx"
        400, 401, 404 -> "4xx"
        500, 502, 503 -> "5xx"
    """
    category = status_code // 100
    return f"{category}xx"


def validate_label_value(label_name: str, value: str) -> bool:
    """
    Validate that a label value is within expected bounds.
    """
    config = STANDARD_LABELS.get(label_name)
    if not config:
        return True

    if config.allowed_values:
        return value in config.allowed_values

    return True

Label Usage Examples

# src/observability/metrics/usage_examples.py
"""
Examples of proper label usage in metrics.
"""

from prometheus_client import Counter, Histogram, Gauge
from .labels import normalize_endpoint, normalize_status_code


# ============================================================================
# GOOD: Proper label usage
# ============================================================================

# Good: Low-cardinality labels only
http_requests = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_category"],
)

# Usage with normalized values
def record_request(method: str, path: str, status_code: int):
    http_requests.labels(
        method=method.upper(),
        endpoint=normalize_endpoint(path),  # /users/123 -> /users/{id}
        status_category=normalize_status_code(status_code),  # 200 -> 2xx
    ).inc()


# Good: Bounded set of label values
cache_operations = Counter(
    "cache_operations_total",
    "Cache operations",
    ["operation", "result"],
)

# Usage
def record_cache_operation(operation: str, hit: bool):
    cache_operations.labels(
        operation=operation,  # get, set, delete
        result="hit" if hit else "miss",
    ).inc()


# Good: Database metrics with bounded labels
db_queries = Histogram(
    "database_query_duration_seconds",
    "Database query duration",
    ["operation", "table"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)


# ============================================================================
# BAD: Anti-patterns to avoid
# ============================================================================

# ❌ Bad: User ID as label (high cardinality)
# user_requests = Counter(
#     "user_requests_total",
#     "Requests per user",
#     ["user_id"],  # Unbounded! Could be millions of users
# )

# ❌ Bad: Request ID as label (infinite cardinality)
# request_latency = Histogram(
#     "request_latency_seconds",
#     "Request latency",
#     ["request_id"],  # Every request is unique!
# )

# ❌ Bad: Full URL path as label (high cardinality)
# url_requests = Counter(
#     "url_requests_total",
#     "Requests per URL",
#     ["url"],  # /users/1, /users/2, /users/3... unbounded!
# )

# ❌ Bad: Timestamp as label
# events = Counter(
#     "events_total",
#     "Events",
#     ["timestamp"],  # Every second is unique!
# )

# ❌ Bad: Error message as label
# errors = Counter(
#     "errors_total",
#     "Errors",
#     ["message"],  # Error messages vary widely!
# )

# ✅ Instead, use error types:
errors = Counter(
    "errors_total",
    "Errors by type",
    ["error_type", "service"],  # Bounded set of error types
)

Cardinality Monitoring

# src/observability/metrics/cardinality.py
"""
Tools for monitoring and controlling metric cardinality.
"""

from prometheus_client import Gauge, REGISTRY
from typing import Dict, List, Tuple
import logging

logger = logging.getLogger(__name__)

# Metric to track cardinality
metric_cardinality = Gauge(
    "prometheus_metric_cardinality",
    "Number of time series per metric",
    ["metric_name"],
)


def get_metric_cardinality() -> Dict[str, int]:
    """
    Get the cardinality (number of time series) for each metric.
    """
    cardinality: Dict[str, int] = {}

    for metric in REGISTRY.collect():
        count = len(list(metric.samples))
        cardinality[metric.name] = count

    return cardinality


def check_cardinality_limits(
    limits: Dict[str, int],
    action: str = "warn",
) -> List[Tuple[str, int, int]]:
    """
    Check if any metrics exceed their cardinality limits.

    Args:
        limits: Dict mapping metric names to max cardinality
        action: "warn" to log warning, "error" to raise exception

    Returns:
        List of (metric_name, current_cardinality, limit) for violations
    """
    cardinality = get_metric_cardinality()
    violations = []

    for metric_name, limit in limits.items():
        current = cardinality.get(metric_name, 0)
        if current > limit:
            violations.append((metric_name, current, limit))

            if action == "warn":
                logger.warning(
                    "Metric cardinality exceeded",
                    extra={
                        "metric": metric_name,
                        "current": current,
                        "limit": limit,
                    },
                )
            elif action == "error":
                raise ValueError(
                    f"Metric {metric_name} cardinality {current} exceeds limit {limit}"
                )

    return violations


# Recommended cardinality limits
CARDINALITY_LIMITS = {
    "http_requests_total": 1000,
    "http_request_duration_seconds": 1000,
    "database_queries_total": 500,
    "cache_operations_total": 100,
    "errors_total": 200,
}

Grafana Dashboards

Dashboard Structure Standards

┌─────────────────────────────────────────────────────────────────────┐
│                    Dashboard Organization                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Row 1: Overview / Key Metrics                                      │
│  ├── Total Requests (stat panel)                                   │
│  ├── Error Rate (stat panel)                                       │
│  ├── P99 Latency (stat panel)                                      │
│  └── Active Users (stat panel)                                     │
│                                                                     │
│  Row 2: Request Metrics                                             │
│  ├── Requests per Second (time series)                             │
│  ├── Request Duration (heatmap)                                    │
│  └── Status Code Distribution (pie chart)                          │
│                                                                     │
│  Row 3: Error Analysis                                              │
│  ├── Error Rate by Endpoint (time series)                          │
│  ├── Error Types (bar chart)                                       │
│  └── Recent Errors (table)                                         │
│                                                                     │
│  Row 4: Infrastructure                                              │
│  ├── CPU Usage (time series)                                       │
│  ├── Memory Usage (time series)                                    │
│  └── Database Connections (gauge)                                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Dashboard Template (JSON)

{
  "__inputs": [
    {
      "name": "DS_PROMETHEUS",
      "label": "Prometheus",
      "description": "Prometheus data source",
      "type": "datasource",
      "pluginId": "prometheus",
      "pluginName": "Prometheus"
    }
  ],
  "__requires": [
    {
      "type": "grafana",
      "id": "grafana",
      "name": "Grafana",
      "version": "10.0.0"
    },
    {
      "type": "datasource",
      "id": "prometheus",
      "name": "Prometheus",
      "version": "1.0.0"
    }
  ],
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": "-- Grafana --",
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      },
      {
        "datasource": "${DS_PROMETHEUS}",
        "enable": true,
        "expr": "ALERTS{alertstate=\"firing\", service=\"$service\"}",
        "iconColor": "red",
        "name": "Alerts",
        "step": "60s",
        "tagKeys": "alertname",
        "textFormat": "{{alertname}}: {{message}}",
        "titleFormat": "Alert"
      }
    ]
  },
  "description": "Service overview dashboard with key metrics",
  "editable": true,
  "gnetId": null,
  "graphTooltip": 1,
  "id": null,
  "iteration": 1700000000000,
  "links": [
    {
      "asDropdown": true,
      "icon": "external link",
      "includeVars": true,
      "keepTime": true,
      "tags": ["service"],
      "targetBlank": true,
      "title": "Related Dashboards",
      "type": "dashboards"
    }
  ],
  "panels": [
    {
      "collapsed": false,
      "gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
      "id": 1,
      "panels": [],
      "title": "Overview",
      "type": "row"
    },
    {
      "datasource": "${DS_PROMETHEUS}",
      "fieldConfig": {
        "defaults": {
          "color": { "mode": "thresholds" },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              { "color": "green", "value": null }
            ]
          },
          "unit": "reqps"
        }
      },
      "gridPos": { "h": 4, "w": 6, "x": 0, "y": 1 },
      "id": 2,
      "options": {
        "colorMode": "value",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "reduceOptions": {
          "calcs": ["lastNotNull"],
          "fields": "",
          "values": false
        },
        "textMode": "auto"
      },
      "pluginVersion": "10.0.0",
      "targets": [
        {
          "expr": "sum(rate(http_requests_total{service=\"$service\"}[$__rate_interval]))",
          "legendFormat": "Requests/sec",
          "refId": "A"
        }
      ],
      "title": "Request Rate",
      "type": "stat"
    },
    {
      "datasource": "${DS_PROMETHEUS}",
      "fieldConfig": {
        "defaults": {
          "color": { "mode": "thresholds" },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              { "color": "green", "value": null },
              { "color": "yellow", "value": 1 },
              { "color": "red", "value": 5 }
            ]
          },
          "unit": "percent"
        }
      },
      "gridPos": { "h": 4, "w": 6, "x": 6, "y": 1 },
      "id": 3,
      "options": {
        "colorMode": "value",
        "graphMode": "area",
        "justifyMode": "auto",
        "orientation": "auto",
        "reduceOptions": {
          "calcs": ["lastNotNull"],
          "fields": "",
          "values": false
        },
        "textMode": "auto"
      },
      "targets": [
        {
          "expr": "sum(rate(http_requests_total{service=\"$service\", status_code=~\"5..\"}[$__rate_interval])) / sum(rate(http_requests_total{service=\"$service\"}[$__rate_interval])) * 100",
          "legendFormat": "Error Rate",
          "refId": "A"
        }
      ],
      "title": "Error Rate",
      "type": "stat"
    },
    {
      "datasource": "${DS_PROMETHEUS}",
      "fieldConfig": {
        "defaults": {
          "color": { "mode": "thresholds" },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              { "color": "green", "value": null },
              { "color": "yellow", "value": 0.5 },
              { "color": "red", "value": 1 }
            ]
          },
          "unit": "s"
        }
      },
      "gridPos": { "h": 4, "w": 6, "x": 12, "y": 1 },
      "id": 4,
      "targets": [
        {
          "expr": "histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{service=\"$service\"}[$__rate_interval])) by (le))",
          "legendFormat": "P99 Latency",
          "refId": "A"
        }
      ],
      "title": "P99 Latency",
      "type": "stat"
    }
  ],
  "refresh": "30s",
  "schemaVersion": 38,
  "style": "dark",
  "tags": ["service", "overview", "prometheus"],
  "templating": {
    "list": [
      {
        "current": {},
        "datasource": "${DS_PROMETHEUS}",
        "definition": "label_values(http_requests_total, service)",
        "hide": 0,
        "includeAll": false,
        "label": "Service",
        "multi": false,
        "name": "service",
        "options": [],
        "query": {
          "query": "label_values(http_requests_total, service)",
          "refId": "StandardVariableQuery"
        },
        "refresh": 2,
        "regex": "",
        "skipUrlSync": false,
        "sort": 1,
        "type": "query"
      },
      {
        "current": {},
        "datasource": "${DS_PROMETHEUS}",
        "definition": "label_values(http_requests_total{service=\"$service\"}, environment)",
        "hide": 0,
        "includeAll": true,
        "label": "Environment",
        "multi": true,
        "name": "environment",
        "options": [],
        "query": {
          "query": "label_values(http_requests_total{service=\"$service\"}, environment)",
          "refId": "StandardVariableQuery"
        },
        "refresh": 2,
        "regex": "",
        "skipUrlSync": false,
        "sort": 1,
        "type": "query"
      }
    ]
  },
  "time": { "from": "now-1h", "to": "now" },
  "timepicker": {
    "refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m", "30m", "1h"],
    "time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d", "30d"]
  },
  "timezone": "browser",
  "title": "Service Overview",
  "uid": "service-overview",
  "version": 1
}

Dashboard Variables

# grafana/provisioning/dashboards/variables.yaml
# Standard variables for all dashboards

variables:
  # Service selector
  - name: service
    type: query
    datasource: prometheus
    query: "label_values(up, service)"
    refresh: 2  # On time range change
    sort: 1  # Alphabetical ascending
    multi: false
    includeAll: false

  # Environment selector
  - name: environment
    type: query
    datasource: prometheus
    query: "label_values(up{service=\"$service\"}, environment)"
    refresh: 2
    sort: 1
    multi: true
    includeAll: true
    allValue: ".*"

  # Instance selector
  - name: instance
    type: query
    datasource: prometheus
    query: "label_values(up{service=\"$service\", environment=~\"$environment\"}, instance)"
    refresh: 2
    sort: 1
    multi: true
    includeAll: true

  # Rate interval (for rate() functions)
  - name: rate_interval
    type: interval
    auto: true
    auto_min: "1m"
    options:
      - "1m"
      - "5m"
      - "10m"
      - "30m"
      - "1h"

  # Custom interval for aggregations
  - name: interval
    type: custom
    options:
      - value: "1m"
        text: "1 minute"
      - value: "5m"
        text: "5 minutes"
      - value: "1h"
        text: "1 hour"
    default: "5m"

Alert Rules

# grafana/provisioning/alerting/rules.yaml
apiVersion: 1
groups:
  - orgId: 1
    name: Service Alerts
    folder: Alerts
    interval: 1m
    rules:
      # High Error Rate Alert
      - uid: high-error-rate
        title: High Error Rate
        condition: C
        data:
          - refId: A
            relativeTimeRange:
              from: 300
              to: 0
            datasourceUid: prometheus
            model:
              expr: |
                sum(rate(http_requests_total{status_code=~"5.."}[5m])) by (service)
                /
                sum(rate(http_requests_total[5m])) by (service)
                * 100
              intervalMs: 1000
              maxDataPoints: 43200
              refId: A
          - refId: B
            relativeTimeRange:
              from: 300
              to: 0
            datasourceUid: __expr__
            model:
              conditions:
                - evaluator:
                    params: []
                    type: gt
                  operator:
                    type: and
                  query:
                    params:
                      - B
                  reducer:
                    type: last
              refId: B
              type: reduce
              reducer: last
              expression: A
          - refId: C
            relativeTimeRange:
              from: 300
              to: 0
            datasourceUid: __expr__
            model:
              conditions:
                - evaluator:
                    params:
                      - 5  # 5% error rate threshold
                    type: gt
                  operator:
                    type: and
                  query:
                    params:
                      - C
              refId: C
              type: threshold
              expression: B
        noDataState: NoData
        execErrState: Error
        for: 5m
        annotations:
          summary: "High error rate for {{ $labels.service }}"
          description: |
            Error rate is {{ $values.B.Value | printf "%.2f" }}% for service {{ $labels.service }}.
            This exceeds the 5% threshold.
          runbook_url: "https://runbooks.example.com/high-error-rate"
        labels:
          severity: critical
          team: platform

      # High Latency Alert
      - uid: high-latency
        title: High P99 Latency
        condition: C
        data:
          - refId: A
            datasourceUid: prometheus
            model:
              expr: |
                histogram_quantile(0.99,
                  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, service)
                )
              refId: A
          - refId: B
            datasourceUid: __expr__
            model:
              refId: B
              type: reduce
              reducer: last
              expression: A
          - refId: C
            datasourceUid: __expr__
            model:
              refId: C
              type: threshold
              expression: B
              conditions:
                - evaluator:
                    params:
                      - 2  # 2 second threshold
                    type: gt
        for: 5m
        annotations:
          summary: "High latency for {{ $labels.service }}"
          description: |
            P99 latency is {{ $values.B.Value | printf "%.3f" }}s for service {{ $labels.service }}.
        labels:
          severity: warning
          team: platform

      # Low Request Rate (potential outage)
      - uid: low-request-rate
        title: Abnormally Low Request Rate
        condition: C
        data:
          - refId: A
            datasourceUid: prometheus
            model:
              expr: |
                sum(rate(http_requests_total[5m])) by (service)
          - refId: B
            datasourceUid: __expr__
            model:
              type: reduce
              reducer: last
              expression: A
          - refId: C
            datasourceUid: __expr__
            model:
              type: threshold
              expression: B
              conditions:
                - evaluator:
                    params:
                      - 1  # Less than 1 req/s
                    type: lt
        for: 10m
        annotations:
          summary: "Low request rate for {{ $labels.service }}"
          description: |
            Request rate is {{ $values.B.Value | printf "%.2f" }} req/s for {{ $labels.service }}.
            This may indicate an outage or routing issue.
        labels:
          severity: warning
          team: platform

      # Database Connection Pool Exhaustion
      - uid: db-connections-high
        title: Database Connections Near Limit
        condition: C
        data:
          - refId: A
            datasourceUid: prometheus
            model:
              expr: |
                database_connections_active / database_connections_max * 100
          - refId: B
            datasourceUid: __expr__
            model:
              type: reduce
              reducer: last
              expression: A
          - refId: C
            datasourceUid: __expr__
            model:
              type: threshold
              expression: B
              conditions:
                - evaluator:
                    params:
                      - 80  # 80% threshold
                    type: gt
        for: 5m
        annotations:
          summary: "Database connection pool near exhaustion"
          description: |
            {{ $values.B.Value | printf "%.1f" }}% of database connections are in use.
        labels:
          severity: warning
          team: database

Dashboard Provisioning

# grafana/provisioning/dashboards/dashboards.yaml
apiVersion: 1

providers:
  - name: 'default'
    orgId: 1
    folder: 'Services'
    folderUid: 'services'
    type: file
    disableDeletion: false
    updateIntervalSeconds: 30
    allowUiUpdates: true
    options:
      path: /var/lib/grafana/dashboards/services

  - name: 'infrastructure'
    orgId: 1
    folder: 'Infrastructure'
    folderUid: 'infrastructure'
    type: file
    disableDeletion: false
    options:
      path: /var/lib/grafana/dashboards/infrastructure

  - name: 'alerts'
    orgId: 1
    folder: 'Alerts'
    folderUid: 'alerts'
    type: file
    disableDeletion: false
    options:
      path: /var/lib/grafana/dashboards/alerts

Dashboard Design Patterns

# scripts/grafana/dashboard_generator.py
"""
Dashboard generator following design patterns.

@module dashboard_generator
@description Generate consistent Grafana dashboards
@version 1.0.0
@author Tyler Dukes
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import json


class PanelType(Enum):
    STAT = "stat"
    TIMESERIES = "timeseries"
    TABLE = "table"
    GAUGE = "gauge"
    HEATMAP = "heatmap"
    PIECHART = "piechart"
    BARGAUGE = "bargauge"
    TEXT = "text"


@dataclass
class Panel:
    """Dashboard panel configuration."""
    title: str
    type: PanelType
    query: str
    grid_pos: Dict[str, int]
    unit: str = ""
    thresholds: List[Dict[str, Any]] = field(default_factory=list)
    legend_format: str = ""
    description: str = ""


@dataclass
class Row:
    """Dashboard row configuration."""
    title: str
    panels: List[Panel]
    collapsed: bool = False


@dataclass
class DashboardConfig:
    """Dashboard configuration."""
    title: str
    uid: str
    description: str
    tags: List[str]
    refresh: str = "30s"
    rows: List[Row] = field(default_factory=list)
    variables: List[Dict[str, Any]] = field(default_factory=list)


class DashboardGenerator:
    """Generate Grafana dashboards from configuration."""

    # Standard color schemes
    COLORS = {
        "green": "#73BF69",
        "yellow": "#FADE2A",
        "orange": "#FF9830",
        "red": "#F2495C",
        "blue": "#5794F2",
        "purple": "#B877D9",
    }

    # Standard thresholds
    ERROR_RATE_THRESHOLDS = [
        {"color": "green", "value": None},
        {"color": "yellow", "value": 1},
        {"color": "orange", "value": 3},
        {"color": "red", "value": 5},
    ]

    LATENCY_THRESHOLDS = [
        {"color": "green", "value": None},
        {"color": "yellow", "value": 0.5},
        {"color": "orange", "value": 1},
        {"color": "red", "value": 2},
    ]

    def __init__(self, config: DashboardConfig):
        self.config = config
        self._panel_id = 0

    def _next_panel_id(self) -> int:
        self._panel_id += 1
        return self._panel_id

    def generate(self) -> Dict[str, Any]:
        """Generate complete dashboard JSON."""
        dashboard = {
            "annotations": self._generate_annotations(),
            "description": self.config.description,
            "editable": True,
            "gnetId": None,
            "graphTooltip": 1,  # Shared crosshair
            "id": None,
            "links": [],
            "panels": self._generate_panels(),
            "refresh": self.config.refresh,
            "schemaVersion": 38,
            "style": "dark",
            "tags": self.config.tags,
            "templating": {"list": self._generate_variables()},
            "time": {"from": "now-1h", "to": "now"},
            "timepicker": self._generate_timepicker(),
            "timezone": "browser",
            "title": self.config.title,
            "uid": self.config.uid,
            "version": 1,
        }
        return dashboard

    def _generate_annotations(self) -> Dict[str, List]:
        return {
            "list": [
                {
                    "builtIn": 1,
                    "datasource": "-- Grafana --",
                    "enable": True,
                    "hide": True,
                    "iconColor": "rgba(0, 211, 255, 1)",
                    "name": "Annotations & Alerts",
                    "type": "dashboard",
                }
            ]
        }

    def _generate_panels(self) -> List[Dict]:
        panels = []
        y_pos = 0

        for row in self.config.rows:
            # Add row panel
            panels.append({
                "collapsed": row.collapsed,
                "gridPos": {"h": 1, "w": 24, "x": 0, "y": y_pos},
                "id": self._next_panel_id(),
                "panels": [],
                "title": row.title,
                "type": "row",
            })
            y_pos += 1

            # Add panels in row
            for panel in row.panels:
                panels.append(self._generate_panel(panel, y_pos))

            # Calculate next row position
            if row.panels:
                max_height = max(p.grid_pos.get("h", 8) for p in row.panels)
                y_pos += max_height

        return panels

    def _generate_panel(self, panel: Panel, base_y: int) -> Dict:
        grid_pos = panel.grid_pos.copy()
        grid_pos["y"] = base_y

        return {
            "datasource": "${DS_PROMETHEUS}",
            "description": panel.description,
            "fieldConfig": self._generate_field_config(panel),
            "gridPos": grid_pos,
            "id": self._next_panel_id(),
            "options": self._generate_panel_options(panel),
            "pluginVersion": "10.0.0",
            "targets": [
                {
                    "expr": panel.query,
                    "legendFormat": panel.legend_format,
                    "refId": "A",
                }
            ],
            "title": panel.title,
            "type": panel.type.value,
        }

    def _generate_field_config(self, panel: Panel) -> Dict:
        return {
            "defaults": {
                "color": {"mode": "thresholds"},
                "mappings": [],
                "thresholds": {
                    "mode": "absolute",
                    "steps": panel.thresholds or [{"color": "green", "value": None}],
                },
                "unit": panel.unit,
            },
            "overrides": [],
        }

    def _generate_panel_options(self, panel: Panel) -> Dict:
        if panel.type == PanelType.STAT:
            return {
                "colorMode": "value",
                "graphMode": "area",
                "justifyMode": "auto",
                "orientation": "auto",
                "reduceOptions": {
                    "calcs": ["lastNotNull"],
                    "fields": "",
                    "values": False,
                },
                "textMode": "auto",
            }
        elif panel.type == PanelType.TIMESERIES:
            return {
                "legend": {"displayMode": "list", "placement": "bottom"},
                "tooltip": {"mode": "multi", "sort": "desc"},
            }
        return {}

    def _generate_variables(self) -> List[Dict]:
        variables = [
            {
                "current": {},
                "datasource": "${DS_PROMETHEUS}",
                "definition": "label_values(up, service)",
                "hide": 0,
                "includeAll": False,
                "label": "Service",
                "multi": False,
                "name": "service",
                "options": [],
                "query": {"query": "label_values(up, service)"},
                "refresh": 2,
                "sort": 1,
                "type": "query",
            },
            {
                "current": {},
                "datasource": "${DS_PROMETHEUS}",
                "definition": "label_values(up{service=\"$service\"}, environment)",
                "hide": 0,
                "includeAll": True,
                "label": "Environment",
                "multi": True,
                "name": "environment",
                "options": [],
                "query": {"query": "label_values(up{service=\"$service\"}, environment)"},
                "refresh": 2,
                "sort": 1,
                "type": "query",
            },
        ]
        return variables + self.config.variables

    def _generate_timepicker(self) -> Dict:
        return {
            "refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m"],
            "time_options": ["5m", "15m", "1h", "6h", "12h", "24h", "2d", "7d"],
        }


def create_service_dashboard(service_name: str) -> Dict:
    """Create a standard service dashboard."""
    config = DashboardConfig(
        title=f"{service_name} Service Dashboard",
        uid=f"{service_name.lower()}-service",
        description=f"Overview dashboard for {service_name} service",
        tags=["service", service_name.lower(), "prometheus"],
        rows=[
            Row(
                title="Overview",
                panels=[
                    Panel(
                        title="Request Rate",
                        type=PanelType.STAT,
                        query=f'sum(rate(http_requests_total{{service="{service_name}"}}[$__rate_interval]))',
                        grid_pos={"h": 4, "w": 6, "x": 0},
                        unit="reqps",
                        legend_format="Requests/sec",
                    ),
                    Panel(
                        title="Error Rate",
                        type=PanelType.STAT,
                        query=(
                            f'sum(rate(http_requests_total{{service="{service_name}",'
                            f' status_code=~"5.."}}[$__rate_interval])) / '
                            f'sum(rate(http_requests_total{{service="{service_name}"}}[$__rate_interval])) * 100'
                        ),
                        grid_pos={"h": 4, "w": 6, "x": 6},
                        unit="percent",
                        thresholds=DashboardGenerator.ERROR_RATE_THRESHOLDS,
                        legend_format="Error Rate",
                    ),
                    Panel(
                        title="P99 Latency",
                        type=PanelType.STAT,
                        query=f'histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{{service="{service_name}"}}[$__rate_interval])) by (le))',
                        grid_pos={"h": 4, "w": 6, "x": 12},
                        unit="s",
                        thresholds=DashboardGenerator.LATENCY_THRESHOLDS,
                        legend_format="P99",
                    ),
                ],
            ),
            Row(
                title="Request Metrics",
                panels=[
                    Panel(
                        title="Requests per Second",
                        type=PanelType.TIMESERIES,
                        query=f'sum(rate(http_requests_total{{service="{service_name}"}}[$__rate_interval])) by (endpoint)',
                        grid_pos={"h": 8, "w": 12, "x": 0},
                        unit="reqps",
                        legend_format="{{endpoint}}",
                    ),
                    Panel(
                        title="Request Duration",
                        type=PanelType.HEATMAP,
                        query=f'sum(rate(http_request_duration_seconds_bucket{{service="{service_name}"}}[$__rate_interval])) by (le)',
                        grid_pos={"h": 8, "w": 12, "x": 12},
                        unit="s",
                    ),
                ],
            ),
        ],
    )

    generator = DashboardGenerator(config)
    return generator.generate()


if __name__ == "__main__":
    dashboard = create_service_dashboard("api-gateway")
    print(json.dumps(dashboard, indent=2))

Dashboard Versioning

# grafana/dashboards/README.md
# Dashboard Version Management

## Versioning Strategy

1. **Git-based versioning**: All dashboards stored as JSON in version control
2. **Semantic versioning**: Major.Minor.Patch for dashboard changes
   - Major: Breaking changes (removed panels, renamed variables)
   - Minor: New panels or features
   - Patch: Bug fixes, query optimizations

## Directory Structure

```text
grafana/dashboards/
├── services/
│   ├── api-gateway.json
│   ├── user-service.json
│   └── payment-service.json
├── infrastructure/
│   ├── kubernetes.json
│   ├── database.json
│   └── cache.json
├── alerts/
│   └── alert-overview.json
└── README.md

Change Process

  1. Export dashboard from Grafana UI
  2. Run ./scripts/format-dashboard.sh <file> to normalize JSON
  3. Update version in dashboard JSON
  4. Commit with message: dashboard(<name>): <description>
  5. CI/CD automatically syncs to Grafana

Rollback

# Rollback to previous version
git checkout HEAD~1 -- grafana/dashboards/services/api-gateway.json
kubectl apply -k grafana/

CI/CD Integration

GitHub Actions Observability Validation

name: Observability Validation

on:
  push:
    paths:
      - 'src/observability/**'
      - 'src/middleware/**'
  pull_request:
    paths:
      - 'src/observability/**'
      - 'src/middleware/**'

jobs:
  validate-observability:
    runs-on: ubuntu-latest

    services:
      jaeger:
        image: jaegertracing/all-in-one:latest
        ports:
          - 6831:6831/udp
          - 16686:16686
          - 4317:4317

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -e .[observability,test]

      - name: Validate tracing configuration
        env:
          OTLP_ENDPOINT: localhost:4317
        run: |
          python -c "
          from src.observability.tracing import configure_tracer
          tracer = configure_tracer('test-service', '1.0.0')
          print('Tracing configuration valid')
          "

      - name: Validate logging configuration
        run: |
          python -c "
          from src.observability.logging import configure_logging
          logger = configure_logging(level='DEBUG', json_format=True)
          logger.info('test_event', key='value')
          print('Logging configuration valid')
          "

      - name: Run observability tests
        run: pytest tests/observability/ -v

      - name: Check log format compliance
        run: |
          python scripts/validate_log_format.py

  validate-metrics:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -e .[observability,test]

      - name: Validate metrics endpoint
        run: |
          python -c "
          from src.observability.metrics import get_metrics, set_service_info
          set_service_info('test', '1.0.0', 'test')
          metrics, content_type = get_metrics()
          assert 'service_info' in metrics.decode()
          print('Metrics configuration valid')
          "

Docker Compose for Local Observability

# docker-compose.observability.yml
version: '3.8'

services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    environment:
      COLLECTOR_OTLP_ENABLED: 'true'
    ports:
      - '6831:6831/udp'
      - '16686:16686'
      - '4317:4317'
      - '4318:4318'

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - '9090:9090'
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  grafana:
    image: grafana/grafana:latest
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
      GF_AUTH_ANONYMOUS_ENABLED: 'true'
    volumes:
      - ./grafana/provisioning:/etc/grafana/provisioning
    ports:
      - '3000:3000'
    depends_on:
      - prometheus
      - jaeger
      - loki

  loki:
    image: grafana/loki:latest
    ports:
      - '3100:3100'
    command: -config.file=/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./promtail-config.yml:/etc/promtail/config.yml
      - /var/log:/var/log
    command: -config.file=/etc/promtail/config.yml

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    volumes:
      - ./otel-collector-config.yml:/etc/otel-collector-config.yml
    command: ['--config=/etc/otel-collector-config.yml']
    ports:
      - '4317:4317'
      - '4318:4318'
      - '8888:8888'

Best Practices

Observability Checklist

Distributed Tracing:
✅ Use OpenTelemetry for vendor-neutral instrumentation
✅ Propagate trace context across service boundaries
✅ Use semantic span naming conventions
✅ Set appropriate sampling rates per environment
✅ Add relevant attributes to spans
✅ Record exceptions with stack traces

Structured Logging:
✅ Use JSON format in production
✅ Include trace/span IDs in logs
✅ Follow consistent field naming
✅ Use appropriate log levels
✅ Include request context (correlation ID, user ID)
✅ Avoid logging sensitive data

Log Aggregation:
✅ Configure retention policies
✅ Create useful dashboards
✅ Set up alerting on error rates
✅ Index searchable fields
✅ Document query patterns

Correlation:
✅ Generate/propagate correlation IDs
✅ Link traces, logs, and metrics
✅ Include correlation in error reports
✅ Propagate context to async tasks

Anti-Patterns to Avoid

# DON'T: Log without context
logger.error("Something went wrong")

# DO: Include relevant context
logger.error(
    "order_creation_failed",
    order_id=order_id,
    user_id=user_id,
    error_type=type(e).__name__,
    error_message=str(e),
)

# DON'T: Create spans for trivial operations
with tracer.start_as_current_span("get_variable"):
    x = self.config.get("key")

# DO: Create spans for meaningful operations
with tracer.start_as_current_span("fetch_user_preferences"):
    prefs = await self.preferences_service.get(user_id)

# DON'T: Log sensitive data
logger.info("user_login", password=password, ssn=ssn)

# DO: Redact or omit sensitive data
logger.info("user_login", user_id=user.id, email=mask_email(user.email))

# DON'T: Use print statements
print(f"Processing order {order_id}")

# DO: Use structured logging
logger.info("processing_order", order_id=order_id)

# DON'T: Ignore errors silently
try:
    process_order(order)
except Exception:
    pass

# DO: Log and track errors
try:
    process_order(order)
except Exception as e:
    logger.error("order_processing_failed", order_id=order.id, exc_info=True)
    capture_exception_with_context(e, order_id=order.id)
    raise

Resources

Tracing and Instrumentation

Metrics and Monitoring

Dashboards and Visualization

Logging

Error Monitoring and APM


Next Steps: