From Data Engineer to AI Engineer — Part 6: Production AI — From Notebook to Live System
- 9 mins read
- Author: Vijay Anand Pandian (@vijayanandrp)
Series: From Data/Software Engineer to AI Engineer Part 6 of 7 — ← Part 5: AI Agents
The Gap Between Notebook and Production
Every data engineer has seen this: a model that works perfectly in a Jupyter notebook fails in unpredictable ways once it hits real traffic.
AI systems have the same problem — multiplied by the probabilistic nature of LLMs.
In production, your AI system needs to:
- Handle 100 concurrent requests without slowing down
- Detect when the model starts behaving differently
- Track every decision for audit and debugging
- Control costs that scale with usage
- Recover gracefully from failures
- Allow you to roll back a bad prompt update
This is LLMOps — the operational discipline for language model systems.
The Production Architecture
```
        User Request
              ↓
┌─────────────────────────────┐
│         API Gateway         │ ← Rate limiting, auth
│    (FastAPI / Azure APIM)   │
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│      Input Validation       │ ← PII check, length, injection
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│        Prompt Cache         │ ← Exact + semantic cache
└──────────────┬──────────────┘
               ↓ cache miss
┌─────────────────────────────┐
│      RAG / Agent Logic      │ ← Your AI core
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│      Output Validation      │ ← Schema, confidence, safety
└──────────────┬──────────────┘
               ↓
┌─────────────────────────────┐
│        Audit Logger         │ ← Every inference logged
└──────────────┬──────────────┘
               ↓
           Response

Background:
┌─────────────────────────────┐
│    Monitoring Dashboard     │ ← Latency, cost, drift
└─────────────────────────────┘
```

Step 1: Wrap Your AI Logic in a Proper API
The first step to production is a clean API around your AI logic.
```python
# pip install fastapi uvicorn pydantic

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
import time
import uuid
import logging

app = FastAPI(title="AI Service", version="1.0.0")
security = HTTPBearer()
logger = logging.getLogger(__name__)

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2000)
    user_id: str
    session_id: str | None = None

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    confidence: float
    request_id: str
    latency_ms: int

@app.post("/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    # Auth check
    if credentials.credentials != "your-api-key":
        raise HTTPException(status_code=401, detail="Invalid API key")

    request_id = str(uuid.uuid4())
    start_time = time.time()

    try:
        # Your RAG or agent logic here
        result = rag_query(request.question)
        latency_ms = int((time.time() - start_time) * 1000)

        # Log for monitoring
        logger.info({
            "request_id": request_id,
            "user_id": request.user_id,
            "latency_ms": latency_ms,
            "confidence": result["confidence"],
            "question_length": len(request.question)
        })

        return QueryResponse(
            answer=result["answer"],
            sources=result["sources"],
            confidence=result["confidence"],
            request_id=request_id,
            latency_ms=latency_ms
        )
    except Exception as e:
        logger.error(f"Request {request_id} failed: {e}")
        raise HTTPException(status_code=500, detail="Internal error — please try again")

# Run: uvicorn app:app --host 0.0.0.0 --port 8000
```

Step 2: Experiment Tracking with MLflow
Before you can manage model versions, you need to track experiments. MLflow is the industry standard.
pip install mlflow
```python
import mlflow
import mlflow.pyfunc

# ── Track an experiment run ───────────────────────────────
mlflow.set_tracking_uri("http://localhost:5000")  # or Azure ML URI
mlflow.set_experiment("rag-qa-system-v2")

with mlflow.start_run(run_name="chunk-size-512-reranker-on"):
    # Log your configuration
    mlflow.log_params({
        "chunk_size": 512,
        "chunk_overlap": 64,
        "embedding_model": "all-MiniLM-L6-v2",
        "llm": "claude-sonnet-4-6",
        "top_k": 4,
        "reranker": "cross-encoder/ms-marco-MiniLM-L-6-v2",
        "temperature": 0.1,
    })

    # Evaluate on your golden dataset
    results = evaluate_on_golden_dataset()

    # Log your metrics
    mlflow.log_metrics({
        "faithfulness": results["faithfulness"],
        "answer_relevancy": results["answer_relevancy"],
        "context_precision": results["context_precision"],
        "avg_latency_ms": results["avg_latency_ms"],
        "avg_cost_usd": results["avg_cost_usd"],
    })

    # Log your prompt as an artifact (version control for prompts)
    with open("system_prompt.txt", "w") as f:
        f.write(SYSTEM_PROMPT)
    mlflow.log_artifact("system_prompt.txt")

    print(f"Run ID: {mlflow.active_run().info.run_id}")
```

Now when you change your chunk size, embedding model, or system prompt, you have a record of what changed and what effect it had. This is the difference between "it worked better after I changed something" and "chunk size 512 improved faithfulness by 8% vs 256."
Step 3: Monitoring — The Three Layers
You need to monitor at three levels simultaneously.
Layer 1: Infrastructure (Is it up?)
```python
import time
import prometheus_client as prom

# Metrics that Prometheus scrapes every 15 seconds
REQUEST_COUNT = prom.Counter("ai_requests_total", "Total requests", ["status"])
REQUEST_LATENCY = prom.Histogram(
    "ai_request_duration_ms", "Request latency",
    buckets=[100, 250, 500, 1000, 2000, 5000]
)
TOKEN_USAGE = prom.Counter("ai_tokens_total", "Total tokens used", ["type"])

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start = time.time()
    response = await call_next(request)
    duration_ms = (time.time() - start) * 1000
    status = "success" if response.status_code < 400 else "error"
    REQUEST_COUNT.labels(status=status).inc()
    REQUEST_LATENCY.observe(duration_ms)
    return response
```

Layer 2: Model Quality (Is it answering well?)
Run your golden dataset evaluation on a schedule:
```python
import schedule
from datetime import date

def weekly_quality_check():
    results = evaluate_on_golden_dataset()

    # Alert if quality drops
    if results["faithfulness"] < 0.85:
        send_alert(
            f"⚠️ Faithfulness dropped to {results['faithfulness']:.2f} "
            f"(threshold: 0.85). Check for model drift or document changes."
        )
    if results["avg_latency_ms"] > 3000:
        send_alert(f"⚠️ Avg latency {results['avg_latency_ms']}ms — above 3s threshold")

    # Log to MLflow for trending
    with mlflow.start_run(run_name=f"weekly-eval-{date.today()}"):
        mlflow.log_metrics(results)

schedule.every().monday.at("08:00").do(weekly_quality_check)
```

Layer 3: Business Metrics (Is it delivering value?)
```python
from datetime import datetime

# Track what actually matters to the business
def log_user_feedback(request_id: str, feedback: str, rating: int):
    """Called when user thumbs up/down or provides feedback."""
    record = {
        "request_id": request_id,
        "feedback": feedback,
        "rating": rating,  # 1-5
        "timestamp": datetime.utcnow().isoformat()
    }
    # Save to your database for analysis
    save_to_db(record)

# Weekly: calculate % of queries that got positive feedback
# This is your real ground truth — not RAGAS scores
```

Step 4: Cost Control
LLM costs scale with tokens. Uncontrolled, they can surprise you.
```python
from datetime import date

# Track tokens per request
class TokenTracker:
    def __init__(self, daily_budget_usd: float = 50.0):
        self.daily_budget_usd = daily_budget_usd
        self.today_cost = 0.0
        self._day = date.today()

    def record_usage(self, input_tokens: int, output_tokens: int, model: str):
        # Reset the running total at the start of each new day
        if date.today() != self._day:
            self._day = date.today()
            self.today_cost = 0.0

        # Claude Sonnet pricing (check current pricing)
        costs = {
            "claude-sonnet-4-6": {"input": 0.000003, "output": 0.000015},
            "claude-haiku-4-5": {"input": 0.00000025, "output": 0.00000125},
        }
        price = costs.get(model, costs["claude-sonnet-4-6"])
        cost = (input_tokens * price["input"]) + (output_tokens * price["output"])
        self.today_cost += cost

        if self.today_cost > self.daily_budget_usd * 0.8:
            send_alert(f"⚠️ 80% of daily AI budget used: ${self.today_cost:.2f}")
        return cost

tracker = TokenTracker(daily_budget_usd=50.0)

# After each LLM call:
response = client.messages.create(...)
cost = tracker.record_usage(
    response.usage.input_tokens,
    response.usage.output_tokens,
    "claude-sonnet-4-6"
)
```

Cost optimisation strategies (in order of impact):
- Cache repeated queries — identical questions should not re-hit the LLM
- Use a smaller model for simple tasks — Haiku for classification, Sonnet for reasoning
- Compress your prompts — every redundant sentence costs money at scale
- Reduce output tokens — set max_tokens appropriately and instruct the model to be concise
- Batch where possible — combine multiple small requests into one
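The second strategy can be as simple as a routing function that picks the model per task type. A minimal sketch; the task categories (and the idea of keyed routing) are illustrative assumptions, not a prescribed taxonomy:

```python
# Hypothetical model router: send cheap, structured tasks to Haiku,
# reasoning-heavy tasks to Sonnet. Extend the set to match your workload.
def pick_model(task_type: str) -> str:
    cheap_tasks = {"classification", "extraction", "routing"}
    return "claude-haiku-4-5" if task_type in cheap_tasks else "claude-sonnet-4-6"
```

At typical price ratios this routes an order-of-magnitude cost reduction onto the tasks that least need the larger model.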
```python
# Simple exact-match cache
from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def cached_rag_query(question_hash: str, question: str) -> str:
    return rag_query(question)["answer"]

def smart_query(question: str) -> str:
    # Normalise first so trivial variations (case, whitespace) share a
    # cache entry: lru_cache keys on all arguments, so the normalised
    # text must be what we actually pass in
    normalised = question.lower().strip()
    q_hash = hashlib.md5(normalised.encode()).hexdigest()
    return cached_rag_query(q_hash, normalised)
```

Step 5: Versioning Everything
In traditional software, you version your code. In AI systems, you need to version:
| Artefact | Why | Tool |
|---|---|---|
| Code | Obvious | Git |
| Prompts | A prompt change can break everything | Git (prompt files) + MLflow |
| Models | Different versions behave differently | MLflow Model Registry |
| Embedding index | Document updates change retrieval | Version your vector DB |
| Golden dataset | Your evaluation benchmark | Git (JSON/CSV files) |
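For the embedding index row above, one lightweight convention is to bake the version into the collection name, so the old and new index can coexist during a rollout and you can roll back instantly. A sketch; the naming scheme and `INDEX_VERSION` constant are assumptions, not a vector-DB standard:

```python
# Versioned collection names: re-embedding documents creates docs_v4
# alongside docs_v3, and cutover is a one-line config change.
INDEX_VERSION = "v3"

def collection_name(base: str = "docs") -> str:
    return f"{base}_{INDEX_VERSION}"
```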
```python
# Version your prompts as code — never hardcode them inline
# prompts/v2.3/system_prompt.txt

PROMPT_VERSION = "v2.3"

with open(f"prompts/{PROMPT_VERSION}/system_prompt.txt") as f:
    SYSTEM_PROMPT = f.read()

# Log which version was used with every request
logger.info({"prompt_version": PROMPT_VERSION, "request_id": request_id})
```

Step 6: Handling Failures Gracefully
LLMs fail. APIs have outages. Your system needs to handle this without crashing.
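Retries cover transient blips; for a sustained outage you also want a circuit breaker that stops calling the failing API for a cooldown period instead of retrying every request. A minimal sketch; the threshold and cooldown values are illustrative assumptions:

```python
import time

class CircuitBreaker:
    """After N consecutive failures, reject calls for a cooldown period,
    then let one probe request through (half-open)."""

    def __init__(self, failure_threshold: int = 5, cooldown_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown_s:
            # Cooldown elapsed: half-open, allow a probe request
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()
```

Wrap your LLM call in `breaker.allow()` and return the same fallback response as below when the circuit is open.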
```python
import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def call_llm_with_retry(messages: list, system: str) -> str:
    """Retry on transient failures with exponential backoff."""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        system=system,
        messages=messages
    )
    return response.content[0].text

def safe_rag_query(question: str) -> dict:
    """Production RAG with full error handling."""
    try:
        return rag_query(question)
    except anthropic.RateLimitError:
        logger.warning("Rate limit hit — returning fallback")
        return {
            "answer": "Our AI service is temporarily busy. Please try again in a moment.",
            "sources": [],
            "confidence": 0.0,
            "fallback": True
        }
    except anthropic.APIError as e:
        logger.error(f"API error: {e}")
        return {
            "answer": "Service temporarily unavailable. Your query has been logged for manual review.",
            "sources": [],
            "confidence": 0.0,
            "fallback": True
        }
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        raise  # Re-raise for FastAPI to handle
```

The Deployment Checklist
Before you push to production, tick every item:
Infrastructure
- ☐ API wrapped in FastAPI/Flask with proper error handling
- ☐ Authentication on all endpoints
- ☐ Rate limiting (per user and global)
- ☐ Health check endpoint (/health)
- ☐ Retry logic with exponential backoff

Observability
- ☐ Structured logging on every request (request_id, user_id, latency, tokens)
- ☐ Prometheus metrics (request count, latency histogram, error rate)
- ☐ Alerts: latency > 3s, error rate > 1%, cost > budget threshold
- ☐ Weekly evaluation job against golden dataset

Reliability
- ☐ Graceful fallback when LLM is unavailable
- ☐ max_tokens set on every LLM call (prevents runaway costs)
- ☐ Request timeout configured
- ☐ Circuit breaker for LLM API calls

Governance (covered in Part 7)
- ☐ PII masking on all inputs
- ☐ Audit log (every inference persisted)
- ☐ Confidence threshold with fallback behaviour
- ☐ Content safety filtering

What Good Looks Like
A production AI system at steady state:
- Latency p95 < 2 seconds for RAG queries
- Cost < £0.01 per query for typical enterprise use
- Faithfulness > 0.90 on weekly golden dataset eval
- Error rate < 0.5% (mostly LLM API transient failures)
- Zero PII in audit logs (masked before logging)
- Every inference traceable via request_id from API call to LLM call to response
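Most of these targets can be checked straight from your structured logs. For instance, a quick p95 over a batch of logged latencies; a sketch only, since in production you would query your Prometheus histogram rather than raw values:

```python
# Nearest-rank p95 over logged latencies (illustrative; Prometheus
# histogram_quantile is the production path).
def p95(latencies_ms: list[float]) -> float:
    ordered = sorted(latencies_ms)
    idx = max(0, round(0.95 * len(ordered)) - 1)
    return float(ordered[idx])
```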
Summary
| Concern | Tool / Pattern |
|---|---|
| API layer | FastAPI + Pydantic validation |
| Experiment tracking | MLflow |
| Prompt versioning | Git + prompt files |
| Monitoring | Prometheus + Grafana + structured logs |
| Cost control | Token tracking + caching + model selection |
| Reliability | Retry + fallback + circuit breaker |
| Quality | Weekly golden dataset eval + RAGAS |
Next: Part 7 — AI Governance: Building AI Systems You Can Trust
In Part 7, the final part, we cover the governance layer — PII masking, audit logging, responsible AI, and the EU AI Act — the work that separates responsible engineers from the rest.