Hora de colocar sua IA no ar!
Fazer deploy de modelos de IA em produção envolve muito mais que só código - é sobre criar sistemas robustos, escaláveis e confiáveis.
Nesta aula, você vai aprender a transformar seus projetos locais em serviços profissionais rodando em servidores reais, acessíveis para milhões de usuários!
Para: Máximo controle e flexibilidade
Containerize seu modelo e exponha via API REST profissional.
Para: Deploy rápido e escalável
Use AWS, Google Cloud, Azure para deploy automatizado.
Para: Custo-benefício otimizado
Deploy usando AWS Lambda, Vercel, ou similar.
Para: Baixa latência
Deploy próximo aos usuários finais.
# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
transformers==4.35.0
torch==2.1.0
numpy==1.24.3
pandas==2.0.3
python-multipart==0.0.6
aiofiles==23.2.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
scikit-learn==1.3.0
pillow==10.1.0
redis==5.0.1
prometheus-client==0.19.0
psutil==5.9.6
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import uvicorn
import torch
import numpy as np
import pandas as pd
from transformers import pipeline
import json
import logging
import time
import asyncio
from datetime import datetime
import os
import redis
from prometheus_client import Counter, Histogram, generate_latest
import psutil
from typing import Optional, List, Dict, Any
import pickle
import joblib
from PIL import Image
import io
# Configuração de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Métricas Prometheus
REQUEST_COUNT = Counter('ai_requests_total', 'Total AI requests', ['model', 'status'])
REQUEST_DURATION = Histogram('ai_request_duration_seconds', 'Request duration')
MODEL_LOAD_TIME = Histogram('model_load_duration_seconds', 'Model loading time')
class AIModelService:
"""Serviço principal para gerenciar modelos de IA"""
def __init__(self):
self.models = {}
self.redis_client = None
self.setup_redis()
self.load_models()
def setup_redis(self):
"""Configura Redis para cache"""
try:
self.redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6379)),
db=0,
decode_responses=True
)
self.redis_client.ping()
logger.info("Redis conectado com sucesso")
except Exception as e:
logger.warning(f"Redis não disponível: {e}")
self.redis_client = None
@MODEL_LOAD_TIME.time()
def load_models(self):
"""Carrega todos os modelos necessários"""
logger.info("Carregando modelos de IA...")
try:
# Modelo de sentiment analysis
self.models['sentiment'] = pipeline(
"sentiment-analysis",
model="cardiffnlp/twitter-roberta-base-sentiment-latest",
device=0 if torch.cuda.is_available() else -1
)
# Modelo de classificação de texto
self.models['text_classifier'] = pipeline(
"zero-shot-classification",
device=0 if torch.cuda.is_available() else -1
)
# Modelo personalizado (exemplo)
if os.path.exists('custom_model.pkl'):
with open('custom_model.pkl', 'rb') as f:
self.models['custom'] = pickle.load(f)
logger.info(f"Modelos carregados: {list(self.models.keys())}")
except Exception as e:
logger.error(f"Erro ao carregar modelos: {e}")
raise
def get_cached_prediction(self, cache_key: str) -> Optional[Dict]:
"""Busca predição no cache"""
if not self.redis_client:
return None
try:
cached = self.redis_client.get(cache_key)
if cached:
return json.loads(cached)
except Exception as e:
logger.warning(f"Erro ao buscar cache: {e}")
return None
def cache_prediction(self, cache_key: str, prediction: Dict, ttl: int = 3600):
"""Armazena predição no cache"""
if not self.redis_client:
return
try:
self.redis_client.setex(
cache_key,
ttl,
json.dumps(prediction)
)
except Exception as e:
logger.warning(f"Erro ao salvar cache: {e}")
async def predict_sentiment(self, text: str) -> Dict:
"""Análise de sentimento com cache"""
cache_key = f"sentiment:{hash(text)}"
# Verificar cache
cached = self.get_cached_prediction(cache_key)
if cached:
cached['from_cache'] = True
return cached
# Fazer predição
start_time = time.time()
try:
result = self.models['sentiment'](text)
prediction = {
'text': text[:100] + '...' if len(text) > 100 else text,
'sentiment': result[0]['label'],
'confidence': round(result[0]['score'], 4),
'processing_time': round(time.time() - start_time, 4),
'from_cache': False,
'timestamp': datetime.now().isoformat()
}
# Salvar no cache
self.cache_prediction(cache_key, prediction)
REQUEST_COUNT.labels(model='sentiment', status='success').inc()
return prediction
except Exception as e:
REQUEST_COUNT.labels(model='sentiment', status='error').inc()
raise HTTPException(status_code=500, detail=f"Erro na predição: {str(e)}")
async def classify_text(self, text: str, categories: List[str]) -> Dict:
"""Classificação de texto zero-shot"""
cache_key = f"classify:{hash(text + str(categories))}"
cached = self.get_cached_prediction(cache_key)
if cached:
cached['from_cache'] = True
return cached
start_time = time.time()
try:
result = self.models['text_classifier'](text, categories)
prediction = {
'text': text[:100] + '...' if len(text) > 100 else text,
'categories': categories,
'predictions': [
{
'label': label,
'confidence': round(score, 4)
}
for label, score in zip(result['labels'], result['scores'])
],
'top_category': result['labels'][0],
'top_confidence': round(result['scores'][0], 4),
'processing_time': round(time.time() - start_time, 4),
'from_cache': False,
'timestamp': datetime.now().isoformat()
}
self.cache_prediction(cache_key, prediction)
REQUEST_COUNT.labels(model='classifier', status='success').inc()
return prediction
except Exception as e:
REQUEST_COUNT.labels(model='classifier', status='error').inc()
raise HTTPException(status_code=500, detail=f"Erro na classificação: {str(e)}")
# Inicializar serviço
ai_service = AIModelService()
# FastAPI App
app = FastAPI(
title="AI Production API",
description="API profissional para modelos de IA em produção",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# CORS Middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Em produção, especifique domínios
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Security
security = HTTPBearer()
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verificação básica de token"""
# Em produção, implemente verificação JWT real
if credentials.credentials != os.getenv('API_TOKEN', 'seu_token_secreto'):
raise HTTPException(
status_code=401,
detail="Token inválido"
)
return credentials.credentials
# Modelos Pydantic
class SentimentRequest(BaseModel):
text: str
class Config:
schema_extra = {
"example": {
"text": "Estou muito feliz com os resultados!"
}
}
class ClassificationRequest(BaseModel):
text: str
categories: List[str]
class Config:
schema_extra = {
"example": {
"text": "Este é um produto incrível que recomendo",
"categories": ["positivo", "neutro", "negativo"]
}
}
class BatchRequest(BaseModel):
texts: List[str]
max_concurrent: Optional[int] = 5
class HealthResponse(BaseModel):
status: str
timestamp: str
models_loaded: List[str]
system_info: Dict[str, Any]
# Rotas de Health Check
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Verificação de saúde do serviço"""
system_info = {
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent,
"python_version": os.sys.version,
"torch_cuda_available": torch.cuda.is_available(),
"redis_connected": ai_service.redis_client is not None
}
return HealthResponse(
status="healthy",
timestamp=datetime.now().isoformat(),
models_loaded=list(ai_service.models.keys()),
system_info=system_info
)
@app.get("/metrics")
async def get_metrics():
"""Métricas Prometheus"""
return generate_latest()
# Rotas da API
@app.post("/predict/sentiment")
@REQUEST_DURATION.time()
async def predict_sentiment(
request: SentimentRequest,
token: str = Depends(verify_token)
):
"""Análise de sentimento"""
if not request.text.strip():
raise HTTPException(status_code=400, detail="Texto não pode estar vazio")
return await ai_service.predict_sentiment(request.text)
@app.post("/predict/classify")
@REQUEST_DURATION.time()
async def classify_text(
request: ClassificationRequest,
token: str = Depends(verify_token)
):
"""Classificação de texto"""
if not request.text.strip():
raise HTTPException(status_code=400, detail="Texto não pode estar vazio")
if len(request.categories) < 2:
raise HTTPException(status_code=400, detail="Forneça pelo menos 2 categorias")
return await ai_service.classify_text(request.text, request.categories)
@app.post("/predict/batch/sentiment")
async def batch_sentiment(
request: BatchRequest,
background_tasks: BackgroundTasks,
token: str = Depends(verify_token)
):
"""Análise de sentimento em lote"""
if len(request.texts) > 100:
raise HTTPException(status_code=400, detail="Máximo 100 textos por lote")
# Processar em paralelo com limite
semaphore = asyncio.Semaphore(request.max_concurrent)
async def process_single(text: str) -> Dict:
async with semaphore:
return await ai_service.predict_sentiment(text)
tasks = [process_single(text) for text in request.texts]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Processar resultados
successful = []
errors = []
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({
'index': i,
'text': request.texts[i][:50] + '...',
'error': str(result)
})
else:
successful.append({
'index': i,
'result': result
})
return {
'total_processed': len(request.texts),
'successful': len(successful),
'errors': len(errors),
'results': successful,
'error_details': errors
}
@app.post("/predict/image")
async def analyze_image(
file: UploadFile = File(...),
token: str = Depends(verify_token)
):
"""Análise de imagem (placeholder)"""
if not file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="Apenas imagens são aceitas")
# Ler imagem
contents = await file.read()
image = Image.open(io.BytesIO(contents))
# Placeholder para análise real
# Em produção, use modelos como CLIP, ResNet, etc.
return {
'filename': file.filename,
'size': image.size,
'format': image.format,
'mode': image.mode,
'analysis': 'Análise de imagem não implementada nesta versão',
'timestamp': datetime.now().isoformat()
}
# Rota de monitoramento avançado
@app.get("/admin/stats")
async def get_stats(token: str = Depends(verify_token)):
"""Estatísticas avançadas do sistema"""
stats = {
'uptime': time.time() - startup_time,
'requests_processed': sum([
REQUEST_COUNT.labels(model=m, status=s)._value._value
for m in ['sentiment', 'classifier']
for s in ['success', 'error']
]),
'cache_stats': {},
'model_info': {},
'system_resources': {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else [0, 0, 0]
}
}
# Cache stats
if ai_service.redis_client:
try:
cache_info = ai_service.redis_client.info()
stats['cache_stats'] = {
'total_keys': cache_info.get('db0', {}).get('keys', 0),
'memory_usage': cache_info.get('used_memory_human', '0B'),
'hits': cache_info.get('keyspace_hits', 0),
'misses': cache_info.get('keyspace_misses', 0)
}
except Exception as e:
stats['cache_stats'] = {'error': str(e)}
return stats
# Startup event
startup_time = time.time()
@app.on_event("startup")
async def startup_event():
"""Inicialização da aplicação"""
logger.info("🚀 AI Production API iniciando...")
logger.info(f"Modelos carregados: {list(ai_service.models.keys())}")
logger.info("✅ API pronta para receber requisições")
@app.on_event("shutdown")
async def shutdown_event():
"""Limpeza no shutdown"""
logger.info("🛑 Desligando AI Production API...")
if ai_service.redis_client:
ai_service.redis_client.close()
logger.info("✅ Shutdown completo")
# Rota principal
@app.get("/")
async def root():
return {
"message": "AI Production API",
"version": "1.0.0",
"status": "operational",
"docs": "/docs",
"health": "/health",
"metrics": "/metrics"
}
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=int(os.getenv('PORT', 8000)),
workers=int(os.getenv('WORKERS', 1)),
reload=False, # False em produção
access_log=True
)
# Dockerfile
FROM python:3.10-slim
# Instalar dependências do sistema
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Definir diretório de trabalho
WORKDIR /app
# Copiar requirements
COPY requirements.txt .
# Instalar dependências Python
RUN pip install --no-cache-dir -r requirements.txt
# Copiar código da aplicação
COPY . .
# Criar usuário não-root
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Expor porta
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Comando para executar
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
ai-api:
build: .
ports:
- "8000:8000"
environment:
- REDIS_HOST=redis
- API_TOKEN=seu_token_desenvolvimento
depends_on:
- redis
volumes:
- ./models:/app/models
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
restart: unless-stopped
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
restart: unless-stopped
volumes:
redis_data:
grafana_data:
# Deploy usando AWS ECS + Fargate
# 1. Criar repositório ECR
aws ecr create-repository --repository-name ai-production-api
# 2. Build e push da imagem
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin YOUR_ACCOUNT.dkr.ecr.us-east-1.amazonaws.com
docker build -t ai-production-api .
docker tag ai-production-api:latest YOUR_ACCOUNT.dkr.ecr.us-east-1.amazonaws.com/ai-production-api:latest
docker push YOUR_ACCOUNT.dkr.ecr.us-east-1.amazonaws.com/ai-production-api:latest
# 3. Usar CloudFormation ou Terraform para infraestrutura
# Deploy usando Cloud Run
# 1. Configurar projeto
gcloud config set project SEU_PROJETO_ID
# 2. Build e deploy
gcloud run deploy ai-production-api \
--source . \
--platform managed \
--region us-central1 \
--allow-unauthenticated \
--memory 2Gi \
--cpu 2 \
--max-instances 100
# Deploy usando Azure Container Instances
# 1. Criar grupo de recursos
az group create --name ai-production-rg --location eastus
# 2. Criar container registry
az acr create --resource-group ai-production-rg --name aiproductionacr --sku Basic
# 3. Deploy da aplicação
az container create \
--resource-group ai-production-rg \
--name ai-production-api \
--image aiproductionacr.azurecr.io/ai-api:latest \
--cpu 2 --memory 4 \
--ports 8000
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'ai-api'
static_configs:
- targets: ['ai-api:8000']
scrape_interval: 5s
metrics_path: /metrics
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
# alert_rules.yml
groups:
- name: ai-api-alerts
rules:
- alert: HighErrorRate
expr: rate(ai_requests_total{status="error"}[5m]) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "Alta taxa de erro na API de IA"
description: "Taxa de erro acima de 10% nos últimos 5 minutos"
- alert: HighResponseTime
expr: histogram_quantile(0.95, rate(ai_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "Tempo de resposta alto"
description: "95% das requisições levam mais de 2 segundos"
- alert: APIDown
expr: up{job="ai-api"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "API de IA fora do ar"
description: "A API não está respondendo aos health checks"
Você acabou de aprender como transformar projetos locais em serviços profissionais de IA rodando em produção!
Na próxima aula, vamos abordar os aspectos éticos e de segurança essenciais para sistemas de IA em produção!
"Deploy bem feito é a ponte entre ideias e impacto real"
- Isaque Victor