Monitoring & Scaling: Production-Grade LLM Operations
Published: November 2025 | 45 min read | Code on GitHub
Building a Robust and Scalable LLM Triage System
In this final installment, we'll implement comprehensive monitoring, alerting, and scaling strategies to ensure our LLM-powered triage system is production-ready and can handle real-world workloads.
System Architecture
graph TD
A[Services] -->|Metrics| B[Prometheus]
A -->|Logs| C[Loki]
A -->|Traces| D[OpenTelemetry]
B --> E[Grafana]
C --> E
D --> E
E --> F[Alerts]
subgraph "Monitoring Stack"
B
C
D
E
end
style A fill:#8f8,stroke:#333
style B fill:#e6522c,stroke:#333
style C fill:#f7a41d,stroke:#333
style D fill:#5a4fcf,stroke:#333
style E fill:#f15a29,stroke:#333
style F fill:#f88,stroke:#333
1. Performance Monitoring
1.1 Prometheus Metrics
from functools import wraps
from prometheus_client import start_http_server, Counter, Histogram, Gauge
import time

# Define metrics
REQUESTS = Counter(
    'llm_requests_total',
    'Total number of LLM requests',
    ['model', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'llm_request_duration_seconds',
    'LLM request duration in seconds',
    ['model', 'endpoint']
)

TOKENS_USED = Counter(
    'llm_tokens_used_total',
    'Total number of tokens used',
    ['model', 'type']  # type: 'input' or 'output'
)

EMBEDDING_DIMENSIONS = Gauge(
    'embedding_dimensions',
    'Dimensionality of embeddings',
    ['model']
)


class LLMMonitor:
    def __init__(self, model_name: str):
        self.model_name = model_name

    def track_request(self, endpoint: str):
        """Decorator to track LLM API calls."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start_time = time.time()
                result = None
                try:
                    result = await func(*args, **kwargs)
                    status = 'success'
                    return result
                except Exception:
                    status = 'error'
                    raise
                finally:
                    duration = time.time() - start_time

                    # Record request count and latency
                    REQUESTS.labels(
                        model=self.model_name,
                        endpoint=endpoint,
                        status=status
                    ).inc()
                    REQUEST_DURATION.labels(
                        model=self.model_name,
                        endpoint=endpoint
                    ).observe(duration)

                    # Record token usage if the response exposes it
                    # (result is still None when the call raised)
                    if result is not None and hasattr(result, 'usage'):
                        TOKENS_USED.labels(
                            model=self.model_name,
                            type='input'
                        ).inc(result.usage.prompt_tokens)
                        TOKENS_USED.labels(
                            model=self.model_name,
                            type='output'
                        ).inc(result.usage.completion_tokens)
            return wrapper
        return decorator
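To wire this in, start the Prometheus metrics endpoint at process startup and decorate each outbound LLM call. A minimal usage sketch; `call_llm` and the port number are illustrative placeholders, not part of the system above:

monitor = LLMMonitor(model_name="gpt-4")

@monitor.track_request(endpoint="triage")
async def call_llm(prompt: str):
    # Call your LLM client here; the decorator records request count,
    # latency, and (if the response has a `.usage` attribute) token counts.
    ...

# Expose /metrics on a dedicated port for Prometheus to scrape
# (port 8001 is illustrative).
start_http_server(8001)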
1.2 Logging Configuration
import logging
import json
from datetime import datetime
from pythonjsonlogger import jsonlogger


class JsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        if not log_record.get('timestamp'):
            now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
            log_record['timestamp'] = now
        if log_record.get('level'):
            log_record['level'] = log_record['level'].upper()
        else:
            log_record['level'] = record.levelname
        log_record['service'] = 'llm-triage'

        # Add request context if available
        if hasattr(record, 'request_id'):
            log_record['request_id'] = record.request_id
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id


def setup_logging(level=logging.INFO):
    """Configure structured JSON logging."""
    logger = logging.getLogger()
    logger.setLevel(level)

    # Remove existing handlers
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    # Add JSON formatter
    formatter = JsonFormatter(
        '%(timestamp)s %(level)s %(name)s %(message)s',
        timestamp=True
    )

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return logger
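With the formatter installed, request context can be attached via `extra` so it shows up as top-level JSON fields. A short usage sketch with illustrative IDs:

logger = setup_logging()

logger.info(
    "triage request completed",
    extra={"request_id": "req-123", "user_id": "user-456"},  # illustrative IDs
)
# Emits one JSON line per event with timestamp, level, service,
# request_id and user_id fields, ready for Loki to index.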
2. Quality Monitoring
2.1 Response Quality Metrics
import uuid
from dataclasses import dataclass
from typing import List, Dict, Any
import numpy as np
from datetime import datetime


@dataclass
class ResponseQuality:
    response_id: str
    request_id: str
    timestamp: datetime
    metrics: Dict[str, float]


class QualityMonitor:
    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.metrics_history = []

    def record_metrics(self, metrics: Dict[str, float]) -> str:
        """Record quality metrics for a response."""
        response_id = str(uuid.uuid4())
        quality = ResponseQuality(
            response_id=response_id,
            request_id=metrics.get('request_id', ''),
            timestamp=datetime.utcnow(),
            metrics=metrics
        )
        self.metrics_history.append(quality)

        # Keep only the most recent metrics
        if len(self.metrics_history) > self.window_size:
            self.metrics_history = self.metrics_history[-self.window_size:]

        return response_id

    def detect_drift(self, metric_name: str, threshold: float = 2.0) -> bool:
        """Detect statistical drift in a metric."""
        if not self.metrics_history:
            return False

        # Get recent values (last 10% of window)
        recent_values = [
            m.metrics.get(metric_name, 0)
            for m in self.metrics_history[-self.window_size // 10:]
        ]
        if not recent_values:
            return False

        # Compare with historical baseline
        historical_values = [
            m.metrics.get(metric_name, 0)
            for m in self.metrics_history[:-len(recent_values)]
        ]
        if not historical_values:
            return False

        # Simple z-score based drift detection
        recent_mean = np.mean(recent_values)
        hist_mean = np.mean(historical_values)
        hist_std = np.std(historical_values)
        if hist_std == 0:
            return False

        z_score = abs((recent_mean - hist_mean) / hist_std)
        return z_score > threshold
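Usage is straightforward: record per-response scores as they are produced, then check for drift from a periodic background task. A sketch with example metric names (the schema is up to you):

quality_monitor = QualityMonitor(window_size=1000)

# After each response, record whatever quality scores the pipeline computes.
quality_monitor.record_metrics({
    "request_id": "req-123",   # illustrative
    "groundedness": 0.91,      # example metric name, not a fixed schema
    "response_length": 412.0,
})

# From a scheduled task, compare the recent window against the baseline.
if quality_monitor.detect_drift("groundedness", threshold=2.0):
    print("Drift detected in groundedness scores")  # replace with real alerting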
2.2 Feedback Collection
import uuid
from pydantic import BaseModel
from typing import Any, Dict, Optional
from enum import Enum


class FeedbackType(str, Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


class UserFeedback(BaseModel):
    response_id: str
    feedback_type: FeedbackType
    rating: Optional[int] = None  # 1-5 scale
    comment: Optional[str] = None
    user_id: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


class FeedbackCollector:
    def __init__(self, db_connection):
        self.db = db_connection

    async def record_feedback(self, feedback: UserFeedback):
        """Store user feedback in the database."""
        query = """
            INSERT INTO feedback (
                id, response_id, feedback_type, rating,
                comment, user_id, metadata, created_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
        """
        await self.db.execute(
            query,
            str(uuid.uuid4()),
            feedback.response_id,
            feedback.feedback_type,
            feedback.rating,
            feedback.comment,
            feedback.user_id,
            feedback.metadata or {}
        )

    async def get_feedback_stats(self, time_window: str = '7d'):
        """Get feedback statistics for the given time window."""
        query = """
            SELECT
                feedback_type,
                COUNT(*) as count,
                AVG(rating) as avg_rating,
                COUNT(comment) as has_comment_count
            FROM feedback
            WHERE created_at > NOW() - $1::interval
            GROUP BY feedback_type
        """
        return await self.db.fetch(query, time_window)
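A usage sketch from inside an async handler, assuming `db` is an async driver that exposes positional-parameter `execute`/`fetch` (asyncpg-style); depending on the driver, the `metadata` dict may need explicit JSON encoding:

collector = FeedbackCollector(db_connection=db)  # `db` is assumed, provided by your app

feedback = UserFeedback(
    response_id="resp-789",  # illustrative ID
    feedback_type=FeedbackType.POSITIVE,
    rating=5,
    comment="Triage suggestion matched the clinician's assessment.",
)
await collector.record_feedback(feedback)

# Aggregate feedback over the last 7 days, grouped by feedback type.
stats = await collector.get_feedback_stats(time_window="7d")
for row in stats:
    print(row["feedback_type"], row["count"], row["avg_rating"])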
3. Scaling Strategies
3.1 Horizontal Scaling with Kubernetes
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-triage-api
  labels:
    app: llm-triage
    tier: api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-triage
      tier: api
  template:
    metadata:
      labels:
        app: llm-triage
        tier: api
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
    spec:
      containers:
        - name: api
          image: your-registry/llm-triage:latest
          ports:
            - containerPort: 8000
          env:
            - name: ENVIRONMENT
              value: production
            - name: LOG_LEVEL
              value: INFO
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-triage-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-triage-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: External
      external:
        metric:
          name: requests_per_second
          selector:
            matchLabels:
              app: llm-triage
        target:
          type: AverageValue
          averageValue: 1000
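The liveness and readiness probes above assume the API exposes `/health` and `/ready` endpoints. A minimal sketch, assuming a FastAPI app (the dependency check is a placeholder for real checks against Redis, the vector DB, and so on):

from fastapi import FastAPI, Response, status

app = FastAPI()

@app.get("/health")
async def health():
    # Liveness: the process is up and able to serve requests.
    return {"status": "ok"}

@app.get("/ready")
async def ready(response: Response):
    # Readiness: verify downstream dependencies before accepting traffic.
    dependencies_ok = True  # replace with real checks (Redis ping, vector DB heartbeat, ...)
    if not dependencies_ok:
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
        return {"status": "not ready"}
    return {"status": "ready"}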
3.2 Caching Layer
from typing import Optional, TypeVar, Callable, Any
import pickle
import hashlib
import aioredis

T = TypeVar('T')


class CacheManager:
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = aioredis.from_url(redis_url)
        self.ttl = ttl

    def _generate_key(self, func: Callable, *args, **kwargs) -> str:
        """Generate a cache key from function and arguments."""
        key_parts = [
            func.__module__,
            func.__name__,
            pickle.dumps(args),
            pickle.dumps(kwargs)
        ]
        key_bytes = pickle.dumps(key_parts)
        return f"cache:{hashlib.md5(key_bytes).hexdigest()}"

    async def cached(
        self,
        func: Callable[..., T],
        *args,
        **kwargs
    ) -> T:
        """Call `func`, serving the result from Redis when possible."""
        key = self._generate_key(func, *args, **kwargs)

        # Try to get from cache
        cached = await self.redis.get(key)
        if cached is not None:
            return pickle.loads(cached)

        # Call the function
        result = await func(*args, **kwargs)

        # Cache the result
        await self.redis.set(
            key,
            pickle.dumps(result),
            ex=self.ttl
        )
        return result

    async def invalidate_pattern(self, pattern: str) -> int:
        """Invalidate cache entries matching a pattern."""
        keys = []
        async for key in self.redis.scan_iter(match=pattern):
            keys.append(key)
        if keys:
            await self.redis.delete(*keys)
        return len(keys)
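Usage is explicit rather than decorator-based: from inside an async context, pass the coroutine and its arguments to `cached`. The `embed_text` function below is an illustrative stand-in:

cache = CacheManager(redis_url="redis://localhost:6379", ttl=3600)

async def embed_text(text: str) -> list[float]:
    # Illustrative stand-in for a call to the embedding service.
    ...

# First call hits the embedding service; repeats within the TTL come from Redis.
vector = await cache.cached(embed_text, "chest pain radiating to left arm")

# Drop all cached entries, e.g. when the underlying model changes.
await cache.invalidate_pattern("cache:*")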
4. Continuous Improvement
4.1 A/B Testing Framework
from typing import Dict, Any, Optional
import hashlib
from dataclasses import dataclass


@dataclass
class ExperimentVariant:
    name: str
    weight: float
    config: Dict[str, Any]


class Experiment:
    def __init__(self, name: str, variants: list[ExperimentVariant]):
        self.name = name
        self.variants = variants
        self._validate_weights()

    def _validate_weights(self):
        total_weight = sum(v.weight for v in self.variants)
        if not (0.99 <= total_weight <= 1.01):  # Allow for floating point imprecision
            raise ValueError(f"Weights must sum to 1, got {total_weight}")

    def assign_variant(self, user_id: str) -> ExperimentVariant:
        """Assign a variant using consistent hashing."""
        # Hash the experiment name and user ID with a stable hash so the same
        # user always lands in the same variant across processes and restarts
        # (Python's built-in hash() is salted per process).
        digest = hashlib.sha256(f"{self.name}:{user_id}".encode()).hexdigest()
        rand = int(digest, 16) / float(1 << 256)  # map the digest to [0, 1)

        # Select variant based on weights
        cumulative_weight = 0.0
        for variant in self.variants:
            cumulative_weight += variant.weight
            if rand <= cumulative_weight:
                return variant
        return self.variants[-1]  # Fallback

# Example usage
model_experiment = Experiment(
    name="llm_model_ab_test",
    variants=[
        ExperimentVariant(
            name="gpt-4",
            weight=0.5,
            config={"model": "gpt-4-1106-preview", "temperature": 0.7}
        ),
        ExperimentVariant(
            name="claude-2",
            weight=0.3,
            config={"model": "anthropic/claude-2", "temperature": 0.7}
        ),
        ExperimentVariant(
            name="llama2-70b",
            weight=0.2,
            config={"model": "meta-llama/Llama-2-70b-chat-hf", "temperature": 0.8}
        )
    ]
)
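At request time, resolve the user's variant and feed its config to the LLM client; because assignment is a pure function of the experiment name and user ID, the same user always sees the same arm. Log the variant name alongside the quality metrics from section 2 so results can be compared per arm:

variant = model_experiment.assign_variant(user_id="user-456")  # illustrative user ID

# Use the variant's config for this request.
model_name = variant.config["model"]
temperature = variant.config["temperature"]

print(f"Routing user-456 to {variant.name}: model={model_name}, temperature={temperature}")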
5. Incident Response
5.1 Alert Manager Configuration
# alertmanager/config.yml
global:
  resolve_timeout: 5m
  slack_api_url: 'https://hooks.slack.com/services/...'

route:
  group_by: ['alertname', 'service']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 3h
  receiver: 'slack-notifications'
  routes:
    - match:
        severity: 'critical'
      receiver: 'on-call-sms'

receivers:
  - name: 'slack-notifications'
    slack_configs:
      - channel: '#alerts'
        send_resolved: true
        title: '{{ template "slack.default.title" . }}'
        text: '{{ template "slack.default.text" . }}'
  - name: 'on-call-sms'
    webhook_configs:
      - url: 'https://api.pagerduty.com/integration/...'
        send_resolved: true

templates:
  - '/etc/alertmanager/templates/*.tmpl'
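Alertmanager only routes notifications; the alerts themselves are defined as Prometheus alerting rules. A sketch of one rule built on the `llm_requests_total` counter from section 1.1, with an illustrative threshold to tune for your traffic:

# prometheus/rules/llm-triage.yml (example; threshold and durations are illustrative)
groups:
  - name: llm-triage
    rules:
      - alert: HighLLMErrorRate
        expr: |
          sum(rate(llm_requests_total{status="error"}[5m]))
            / sum(rate(llm_requests_total[5m])) > 0.05
        for: 10m
        labels:
          severity: critical
          service: llm-triage
        annotations:
          summary: "LLM error rate above 5% for 10 minutes"

The `severity: critical` label matches the Alertmanager route above, so this alert pages the on-call receiver rather than only posting to Slack.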
6. Final Architecture
graph TD
A[Client] -->|HTTP| B[API Gateway]
B -->|Load Balance| C[API Instances]
C --> D[LLM Service]
C --> E[Vector DB]
C --> F[Feature Store]
C --> G[Cache]
H[Monitoring] -->|Metrics| I[Prometheus]
H -->|Logs| J[Loki]
H -->|Traces| K[Tempo]
L[Alert Manager] --> M[Slack]
L --> N[PagerDuty]
O[CI/CD] -->|Deploy| P[Kubernetes]
style A fill:#9f9,stroke:#333
style B fill:#99f,stroke:#333
style C fill:#f99,stroke:#333
style D fill:#f9f,stroke:#333
style E fill:#9ff,stroke:#333
style F fill:#ff9,stroke:#333
style G fill:#f9f,stroke:#333
style H fill:#ccc,stroke:#333
style I fill:#e6522c,stroke:#333
style J fill:#f7a41d,stroke:#333
style K fill:#5a4fcf,stroke:#333
style L fill:#f88,stroke:#333
style M fill:#4a154b,stroke:#333
style N fill:#08a5e1,stroke:#333
style O fill:#9c0,stroke:#333
style P fill:#326ce5,stroke:#333
7. Conclusion
In this comprehensive series, we've built a production-grade LLM-powered healthcare triage system from the ground up. Here's a quick recap of what we've covered:
- Problem Space & Architecture - Defined the requirements and high-level design
- Data Pipeline - Implemented robust data ingestion, transformation, and storage
- Vector Search - Built semantic search capabilities with Qdrant
- LLM Integration - Added RAG for accurate, context-aware responses
- Monitoring & Scaling - Ensured reliability and performance at scale