From Data Engineer to AI Engineer — Part 6: Production AI — From Notebook to Live System


Series: From Data/Software Engineer to AI Engineer — Part 6 of 7 (← Part 5: AI Agents)


The Gap Between Notebook and Production

Every data engineer has seen this: a model that works perfectly in a Jupyter notebook fails in unpredictable ways once it hits real traffic.

AI systems have the same problem — multiplied by the probabilistic nature of LLMs.

In production, your AI system needs to:

  • Handle 100 concurrent requests without slowing down
  • Detect when the model starts behaving differently
  • Track every decision for audit and debugging
  • Control costs that scale with usage
  • Recover gracefully from failures
  • Allow you to roll back a bad prompt update

This is LLMOps — the operational discipline for language model systems.


The Production Architecture

User Request
      ↓
┌─────────────────────────────┐
│        API Gateway          │ ← Rate limiting, auth
│   (FastAPI / Azure APIM)    │
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│      Input Validation       │ ← PII check, length, injection
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│        Prompt Cache         │ ← Exact + semantic cache
└──────────────┬──────────────┘
               ↓ cache miss
┌─────────────────────────────┐
│      RAG / Agent Logic      │ ← Your AI core
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│      Output Validation      │ ← Schema, confidence, safety
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│        Audit Logger         │ ← Every inference logged
└──────────────┬──────────────┘
               ↓
           Response

Background:
┌─────────────────────────────┐
│    Monitoring Dashboard     │ ← Latency, cost, drift
└─────────────────────────────┘

Step 1: Wrap Your AI Logic in a Proper API

The first step to production is a clean API around your AI logic.

# pip install fastapi uvicorn pydantic
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
import time
import uuid
import logging

app = FastAPI(title="AI Service", version="1.0.0")
security = HTTPBearer()
logger = logging.getLogger(__name__)

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2000)
    user_id: str
    session_id: str | None = None

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    confidence: float
    request_id: str
    latency_ms: int

@app.post("/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    # Auth check
    if credentials.credentials != "your-api-key":
        raise HTTPException(status_code=401, detail="Invalid API key")

    request_id = str(uuid.uuid4())
    start_time = time.time()
    try:
        # Your RAG or agent logic here
        result = rag_query(request.question)
        latency_ms = int((time.time() - start_time) * 1000)

        # Log for monitoring
        logger.info({
            "request_id": request_id,
            "user_id": request.user_id,
            "latency_ms": latency_ms,
            "confidence": result["confidence"],
            "question_length": len(request.question)
        })

        return QueryResponse(
            answer=result["answer"],
            sources=result["sources"],
            confidence=result["confidence"],
            request_id=request_id,
            latency_ms=latency_ms
        )
    except Exception as e:
        logger.error(f"Request {request_id} failed: {e}")
        raise HTTPException(status_code=500, detail="Internal error — please try again")

# Run: uvicorn app:app --host 0.0.0.0 --port 8000

Step 2: Experiment Tracking with MLflow

Before you can manage model versions, you need to track experiments. MLflow is the industry standard.

# pip install mlflow
import mlflow
import mlflow.pyfunc

# ── Track an experiment run ───────────────────────────────
mlflow.set_tracking_uri("http://localhost:5000")  # or Azure ML URI
mlflow.set_experiment("rag-qa-system-v2")

with mlflow.start_run(run_name="chunk-size-512-reranker-on"):
    # Log your configuration
    mlflow.log_params({
        "chunk_size": 512,
        "chunk_overlap": 64,
        "embedding_model": "all-MiniLM-L6-v2",
        "llm": "claude-sonnet-4-6",
        "top_k": 4,
        "reranker": "cross-encoder/ms-marco-MiniLM-L-6-v2",
        "temperature": 0.1,
    })

    # Evaluate on your golden dataset
    results = evaluate_on_golden_dataset()

    # Log your metrics
    mlflow.log_metrics({
        "faithfulness": results["faithfulness"],
        "answer_relevancy": results["answer_relevancy"],
        "context_precision": results["context_precision"],
        "avg_latency_ms": results["avg_latency_ms"],
        "avg_cost_usd": results["avg_cost_usd"],
    })

    # Log your prompt as an artifact (version control for prompts)
    with open("system_prompt.txt", "w") as f:
        f.write(SYSTEM_PROMPT)
    mlflow.log_artifact("system_prompt.txt")

    print(f"Run ID: {mlflow.active_run().info.run_id}")

Now when you change your chunk size, embedding model, or system prompt, you have a record of what changed and what effect it had. This is the difference between "it worked better after I changed something" and "chunk size 512 improved faithfulness by 8% vs 256."


Step 3: Monitoring — The Three Layers

You need to monitor at three levels simultaneously.

Layer 1: Infrastructure (Is it up?)

import prometheus_client as prom

# Metrics that Prometheus scrapes every 15 seconds
REQUEST_COUNT = prom.Counter("ai_requests_total", "Total requests", ["status"])
REQUEST_LATENCY = prom.Histogram("ai_request_duration_ms", "Request latency",
                                 buckets=[100, 250, 500, 1000, 2000, 5000])
TOKEN_USAGE = prom.Counter("ai_tokens_total", "Total tokens used", ["type"])

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start = time.time()
    response = await call_next(request)
    duration_ms = (time.time() - start) * 1000
    status = "success" if response.status_code < 400 else "error"
    REQUEST_COUNT.labels(status=status).inc()
    REQUEST_LATENCY.observe(duration_ms)
    return response
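Prometheus computes percentiles from those histogram buckets for you, but it is worth sanity-checking latency targets offline against raw durations (for example, pulled from your structured logs). A minimal sketch using only the standard library:

```python
import statistics

def latency_summary(durations_ms: list[float]) -> dict:
    """Compute p50/p95/max from raw per-request durations in milliseconds."""
    # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
    q = statistics.quantiles(durations_ms, n=20)
    return {
        "p50_ms": statistics.median(durations_ms),
        "p95_ms": q[18],
        "max_ms": max(durations_ms),
    }

# Example: mostly fast requests plus a few slow outliers
durations = [200.0] * 95 + [2500.0] * 5
summary = latency_summary(durations)
```

Note how a 5% slow tail barely moves the median but dominates p95, which is why the targets later in this post are stated as p95, not averages.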

Layer 2: Model Quality (Is it answering well?)

Run your golden dataset evaluation on a schedule:

import schedule
from datetime import date

def weekly_quality_check():
    results = evaluate_on_golden_dataset()

    # Alert if quality drops
    if results["faithfulness"] < 0.85:
        send_alert(
            f"⚠️ Faithfulness dropped to {results['faithfulness']:.2f} "
            f"(threshold: 0.85). Check for model drift or document changes."
        )
    if results["avg_latency_ms"] > 3000:
        send_alert(f"⚠️ Avg latency {results['avg_latency_ms']}ms — above 3s threshold")

    # Log to MLflow for trending
    with mlflow.start_run(run_name=f"weekly-eval-{date.today().isoformat()}"):
        mlflow.log_metrics(results)

schedule.every().monday.at("08:00").do(weekly_quality_check)

Layer 3: Business Metrics (Is it delivering value?)

from datetime import datetime

# Track what actually matters to the business
def log_user_feedback(request_id: str, feedback: str, rating: int):
    """Called when user thumbs up/down or provides feedback."""
    record = {
        "request_id": request_id,
        "feedback": feedback,
        "rating": rating,  # 1-5
        "timestamp": datetime.utcnow().isoformat()
    }
    # Save to your database for analysis
    save_to_db(record)

# Weekly: calculate % of queries that got positive feedback
# This is your real ground truth — not RAGAS scores
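The weekly aggregation mentioned in the comment is a few lines of code. A sketch, assuming a rating of 4 or 5 counts as positive feedback:

```python
def positive_feedback_rate(records: list[dict]) -> float:
    """Share of rated queries with a rating of 4 or 5 (0.0 if nothing rated yet)."""
    rated = [r for r in records if r.get("rating") is not None]
    if not rated:
        return 0.0
    positive = sum(1 for r in rated if r["rating"] >= 4)
    return positive / len(rated)

records = [
    {"request_id": "a", "rating": 5},
    {"request_id": "b", "rating": 2},
    {"request_id": "c", "rating": 4},
    {"request_id": "d", "rating": None},  # answered but never rated
]
rate = positive_feedback_rate(records)  # 2 of the 3 rated queries were positive
```

Unrated queries are excluded rather than counted as negative; whether that is the right call depends on how often your users bother to rate at all.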

Step 4: Cost Control

LLM costs scale with tokens. Uncontrolled, they can surprise you.

# Track tokens per request
class TokenTracker:
    def __init__(self, daily_budget_usd: float = 50.0):
        self.daily_budget_usd = daily_budget_usd
        self.today_cost = 0.0

    def record_usage(self, input_tokens: int, output_tokens: int, model: str):
        # Claude Sonnet pricing (check current pricing)
        costs = {
            "claude-sonnet-4-6": {"input": 0.000003, "output": 0.000015},
            "claude-haiku-4-5": {"input": 0.00000025, "output": 0.00000125},
        }
        price = costs.get(model, costs["claude-sonnet-4-6"])
        cost = (input_tokens * price["input"]) + (output_tokens * price["output"])
        self.today_cost += cost
        if self.today_cost > self.daily_budget_usd * 0.8:
            send_alert(f"⚠️ 80% of daily AI budget used: ${self.today_cost:.2f}")
        return cost

tracker = TokenTracker(daily_budget_usd=50.0)

# After each LLM call:
response = client.messages.create(...)
cost = tracker.record_usage(
    response.usage.input_tokens,
    response.usage.output_tokens,
    "claude-sonnet-4-6"
)
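To make the pricing arithmetic concrete, here is a worked example using the illustrative per-token prices from the tracker above (always check current pricing before budgeting):

```python
# Typical RAG request: large prompt (context chunks) in, short answer out
input_tokens, output_tokens = 2000, 500
input_price, output_price = 0.000003, 0.000015  # USD per token (illustrative)

cost = input_tokens * input_price + output_tokens * output_price
# 0.006 + 0.0075 = 0.0135 USD per request.
# At 10,000 requests/day that is roughly $135/day: this is why caching,
# model routing, and prompt compression matter at scale.
```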

Cost optimisation strategies (in order of impact):

  1. Cache repeated queries — identical questions should not re-hit the LLM
  2. Use a smaller model for simple tasks — Haiku for classification, Sonnet for reasoning
  3. Compress your prompts — every redundant sentence costs money at scale
  4. Reduce output tokens — set max_tokens appropriately, instruct the model to be concise
  5. Batch where possible — combine multiple small requests into one
# Simple exact-match cache
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_rag_query(normalised_question: str) -> str:
    return rag_query(normalised_question)["answer"]

def smart_query(question: str) -> str:
    # Normalise so trivially different forms of the same question share a cache entry
    return cached_rag_query(question.lower().strip())
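Strategy 2 (route simple tasks to a cheaper model) can start as a plain heuristic in front of your LLM call. A sketch with illustrative markers and thresholds; `pick_model` is a hypothetical helper, and the model names match those used earlier in this post:

```python
# Keywords that usually signal mechanical tasks a small model handles well
SIMPLE_TASK_MARKERS = ("classify", "extract", "yes or no", "translate")

def pick_model(question: str) -> str:
    """Route short, mechanical tasks to the cheap model; everything else to the strong one."""
    q = question.lower()
    if len(q) < 200 and any(marker in q for marker in SIMPLE_TASK_MARKERS):
        return "claude-haiku-4-5"
    return "claude-sonnet-4-6"

# The router only chooses the model name; the existing call site stays unchanged:
# response = client.messages.create(model=pick_model(question), ...)
```

A keyword heuristic is crude, but it is free and deterministic; some teams later replace it with a small classifier once they have labelled traffic.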

Step 5: Versioning Everything

In traditional software, you version your code. In AI systems, you need to version:

| Artefact | Why | Tool |
| --- | --- | --- |
| Code | Obvious | Git |
| Prompts | A prompt change can break everything | Git (prompt files) + MLflow |
| Models | Different versions behave differently | MLflow Model Registry |
| Embedding index | Document updates change retrieval | Version your vector DB |
| Golden dataset | Your evaluation benchmark | Git (JSON/CSV files) |
# Version your prompts as code — never hardcode them inline
# prompts/v2/system_prompt.txt
PROMPT_VERSION = "v2.3"
with open(f"prompts/{PROMPT_VERSION}/system_prompt.txt") as f:
    SYSTEM_PROMPT = f.read()

# Log which version was used with every request
logger.info({"prompt_version": PROMPT_VERSION, "request_id": request_id})
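The version string tells you which file was loaded; logging a content hash as well lets you prove the file had not drifted when you debug an old request. A small sketch (the temp file here stands in for your real prompt file):

```python
import hashlib
import tempfile
from pathlib import Path

def load_prompt(path: str) -> tuple[str, str]:
    """Return the prompt text and a short content hash for the audit log."""
    text = Path(path).read_text()
    digest = hashlib.sha256(text.encode()).hexdigest()[:12]
    return text, digest

# Demo with a temporary file standing in for prompts/v2/system_prompt.txt
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write("You are a helpful assistant.")
prompt, prompt_hash = load_prompt(f.name)
# logger.info({"prompt_version": PROMPT_VERSION, "prompt_hash": prompt_hash, ...})
```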

Step 6: Handling Failures Gracefully

LLMs fail. APIs have outages. Your system needs to handle this without crashing.

import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def call_llm_with_retry(messages: list, system: str) -> str:
    """Retry on transient failures with exponential backoff."""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        system=system,
        messages=messages
    )
    return response.content[0].text

def safe_rag_query(question: str) -> dict:
    """Production RAG with full error handling."""
    try:
        return rag_query(question)
    except anthropic.RateLimitError:
        logger.warning("Rate limit hit — returning fallback")
        return {
            "answer": "Our AI service is temporarily busy. Please try again in a moment.",
            "sources": [],
            "confidence": 0.0,
            "fallback": True
        }
    except anthropic.APIError as e:
        logger.error(f"API error: {e}")
        return {
            "answer": "Service temporarily unavailable. Your query has been logged for manual review.",
            "sources": [],
            "confidence": 0.0,
            "fallback": True
        }
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        raise  # Re-raise for FastAPI to handle

The Deployment Checklist

Before you push to production, tick every item:

Infrastructure
☐ API wrapped in FastAPI/Flask with proper error handling
☐ Authentication on all endpoints
☐ Rate limiting (per user and global)
☐ Health check endpoint (/health)
☐ Retry logic with exponential backoff
Observability
☐ Structured logging on every request (request_id, user_id, latency, tokens)
☐ Prometheus metrics (request count, latency histogram, error rate)
☐ Alerts: latency > 3s, error rate > 1%, cost > budget threshold
☐ Weekly evaluation job against golden dataset
Reliability
☐ Graceful fallback when LLM is unavailable
☐ max_tokens set on every LLM call (prevents runaway costs)
☐ Request timeout configured
☐ Circuit breaker for LLM API calls
Governance (covered in Part 7)
☐ PII masking on all inputs
☐ Audit log (every inference persisted)
☐ Confidence threshold with fallback behaviour
☐ Content safety filtering
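Of the reliability items, the circuit breaker is the only one without code in this post. The idea: after several consecutive failures, stop calling the LLM API for a cooldown window and serve the fallback immediately, then let a probe request through. A minimal sketch with illustrative thresholds:

```python
import time

class CircuitBreaker:
    """Open after `max_failures` consecutive errors; retry after `cooldown_s`."""
    def __init__(self, max_failures: int = 5, cooldown_s: float = 30.0):
        self.max_failures = max_failures
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.time() - self.opened_at >= self.cooldown_s:
            # Half-open: reset and let the next request probe the API
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.time()

breaker = CircuitBreaker()
# In the request path:
#   if not breaker.allow(): return fallback_response()
#   try: result = rag_query(q); breaker.record_success()
#   except anthropic.APIError: breaker.record_failure(); return fallback_response()
```

This version is not thread-safe; under real concurrency you would guard the counters with a lock or use a library-provided breaker.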

What Good Looks Like

A production AI system at steady state:

  • Latency p95 < 2 seconds for RAG queries
  • Cost < £0.01 per query for typical enterprise use
  • Faithfulness > 0.90 on weekly golden dataset eval
  • Error rate < 0.5% (mostly LLM API transient failures)
  • Zero PII in audit logs (masked before logging)
  • Every inference traceable via request_id from API call to LLM call to response

Summary

| Concern | Tool / Pattern |
| --- | --- |
| API layer | FastAPI + Pydantic validation |
| Experiment tracking | MLflow |
| Prompt versioning | Git + prompt files |
| Monitoring | Prometheus + Grafana + structured logs |
| Cost control | Token tracking + caching + model selection |
| Reliability | Retry + fallback + circuit breaker |
| Quality | Weekly golden dataset eval + RAGAS |

Next: Part 7 — AI Governance: Building AI Systems You Can Trust

In Part 7, the final part, we cover the governance layer — PII masking, audit logging, responsible AI, and the EU AI Act — the work that separates responsible engineers from the rest.