Middleware
Filtri HTTP potenti per elaborare richieste e risposte in modo elegante
Pipeline HTTP: I middleware in Flux creano una pipeline dove ogni richiesta
passa attraverso una serie di filtri prima di raggiungere il controller.
Concetti Base
I middleware sono filtri che si eseguono prima o dopo l'azione del controller:
# Flusso richiesta con middleware
Request → Middleware 1 → Middleware 2 → Controller → Response
↑ ↓
← Middleware 1 ← Middleware 2 ←━━━━━━━━━━┘
# Middleware base
from flux.http import Middleware
class ExampleMiddleware(Middleware):
def handle(self, request, next):
"""
Gestisce la richiesta
Args:
request: Oggetto richiesta HTTP
next: Prossimo middleware/controller nella pipeline
"""
# Logica PRIMA del controller
print(f"Processing request to {request.url}")
# Chiama il prossimo middleware/controller
response = next(request)
# Logica DOPO il controller
print(f"Sending response with status {response.status}")
return response
Creare Middleware
Genera e implementa middleware personalizzati:
# Creare nuovo middleware
flux make:middleware AuthMiddleware
flux make:middleware ApiRateLimitMiddleware
flux make:middleware CorsMiddleware
# app/middleware/AuthMiddleware.flux
from flux.http import Middleware
from flux.auth import Auth
class AuthMiddleware(Middleware):
def handle(self, request, next):
# Controlla se utente è autenticato
if not Auth.check():
if request.wants_json():
return json_response({"error": "Unauthorized"}, 401)
else:
return redirect("/login")
# Aggiunge utente alla richiesta
request.user = Auth.user()
return next(request)
# app/middleware/AdminMiddleware.flux
class AdminMiddleware(Middleware):
def handle(self, request, next):
user = request.user or Auth.user()
if not user or not user.is_admin():
if request.wants_json():
return json_response({"error": "Forbidden"}, 403)
else:
return redirect("/").with("error", "Accesso negato")
return next(request)
# app/middleware/GuestMiddleware.flux
class GuestMiddleware(Middleware):
"""Middleware per utenti non autenticati"""
def handle(self, request, next):
if Auth.check():
return redirect("/dashboard")
return next(request)
Registrare Middleware
Configura middleware globali, di gruppo e per route:
# config/middleware.flux
MIDDLEWARE = {
# Middleware globali (eseguiti per ogni richiesta)
"global": [
"flux.middleware.TrimStrings",
"flux.middleware.ConvertEmptyStringsToNull",
"app.middleware.TrustProxies",
"app.middleware.EncryptCookies",
"app.middleware.StartSession",
"app.middleware.VerifyCsrfToken",
],
# Gruppi di middleware
"groups": {
"web": [
"app.middleware.ShareErrorsFromSession",
"app.middleware.SubstituteBindings",
],
"api": [
"app.middleware.ThrottleRequests:60,1", # 60 req/min
"app.middleware.SubstituteBindings",
],
"auth": [
"app.middleware.AuthMiddleware",
],
"admin": [
"app.middleware.AuthMiddleware",
"app.middleware.AdminMiddleware",
]
},
# Middleware per alias
"aliases": {
"auth": "app.middleware.AuthMiddleware",
"admin": "app.middleware.AdminMiddleware",
"guest": "app.middleware.GuestMiddleware",
"throttle": "app.middleware.ThrottleRequests",
"cors": "app.middleware.CorsMiddleware",
"cache": "app.middleware.CacheResponse",
}
}
Applicare Middleware
Usa middleware nelle routes e nei controller:
# routes/web.flux
# Middleware su route singola
@route("/admin/dashboard", middleware=["auth", "admin"])
def admin_dashboard():
return render("admin.dashboard")
# Middleware con parametri
@route("/api/users", middleware=["throttle:30,1"]) # 30 req/min
def api_users():
return json_response(User.all())
# Gruppo di route con middleware
with middleware(["auth"]):
@route("/profile")
def profile():
return render("profile")
@route("/settings")
def settings():
return render("settings")
# Middleware condizionale
@route("/posts/{id}")
@middleware("auth", only=["edit", "update", "delete"])
@middleware("cache:300", only=["show"]) # Cache 5 minuti
def posts_controller():
pass
# Nei controller
class UserController:
def __init__(self):
# Middleware per tutti i metodi
self.middleware("auth")
# Middleware per metodi specifici
self.middleware("admin", only=["create", "store", "destroy"])
self.middleware("throttle:10,1", only=["store", "update"])
# Escludi metodi
self.middleware("cache:600", except=["store", "update"])
def index(self):
return render("users.index", {"users": User.all()})
def store(self):
# Logica creazione utente
pass
Middleware con Parametri
Passa parametri ai middleware per personalizzare il comportamento:
# app/middleware/ThrottleMiddleware.flux
class ThrottleMiddleware(Middleware):
def handle(self, request, next, max_attempts=60, decay_minutes=1):
"""
Throttling con parametri configurabili
Args:
max_attempts: Numero massimo tentativi
decay_minutes: Finestra temporale in minuti
"""
key = self.resolve_request_signature(request)
max_attempts = int(max_attempts)
decay_minutes = int(decay_minutes)
if self.limiter.too_many_attempts(key, max_attempts):
return self.build_exception_response(key, max_attempts)
self.limiter.hit(key, decay_minutes * 60)
response = next(request)
return self.add_headers(
response, max_attempts,
self.calculate_remaining(key, max_attempts)
)
def resolve_request_signature(self, request):
"""Crea chiave unica per la richiesta"""
return f"{request.ip}:{request.url}"
# app/middleware/CacheMiddleware.flux
class CacheMiddleware(Middleware):
def handle(self, request, next, minutes=60, tags=None):
"""
Cache response per tempo specificato
"""
cache_key = f"response:{request.url}:{request.query_string}"
minutes = int(minutes)
tags = tags.split(",") if tags else []
# Controlla cache esistente
cached_response = cache_get(cache_key)
if cached_response:
return self.build_cached_response(cached_response)
# Esegue richiesta
response = next(request)
# Cache solo risposte 200
if response.status == 200:
cache_put(cache_key, {
"content": response.content,
"headers": response.headers,
"status": response.status
}, minutes, tags)
return response
# Uso con parametri
@route("/api/posts", middleware=["throttle:100,1"]) # 100 req/min
@route("/blog", middleware=["cache:30,blog,posts"]) # 30 min, tags: blog,posts
def blog_posts():
return render("blog.index")
Middleware Built-in
Flux include middleware pronti all'uso:
# Middleware di sicurezza
"flux.middleware.TrustProxies" # Gestione proxy
"flux.middleware.EncryptCookies" # Crittografia cookies
"flux.middleware.VerifyCsrfToken" # Protezione CSRF
"flux.middleware.PreventRequestsDuringMaintenance"
# Middleware di gestione dati
"flux.middleware.TrimStrings" # Rimuove spazi da input
"flux.middleware.ConvertEmptyStringsToNull" # "" → null
"flux.middleware.SubstituteBindings" # Route model binding
# Middleware sessioni
"flux.middleware.StartSession" # Avvia sessioni
"flux.middleware.ShareErrorsFromSession" # Condivide errori con views
# Middleware API
"flux.middleware.ThrottleRequests" # Rate limiting
"flux.middleware.CorsMiddleware" # Cross-Origin Resource Sharing
"flux.middleware.ValidateSignature" # Validazione firma richieste
# Middleware cache
"flux.middleware.CacheResponse" # Cache response HTTP
"flux.middleware.ETagMiddleware" # Gestione ETag headers
# Middleware logging
"flux.middleware.LogRequests" # Log richieste HTTP
"flux.middleware.ProfileRequests" # Profiling performance
Middleware Avanzati
Implementazioni complesse per casi d'uso specifici:
# app/middleware/MaintenanceMiddleware.flux
class MaintenanceMiddleware(Middleware):
"""Modalità manutenzione con whitelist IP"""
def handle(self, request, next):
if not config("app.maintenance_mode", False):
return next(request)
# IP whitelisted
allowed_ips = config("app.maintenance_allowed_ips", [])
if request.ip in allowed_ips:
return next(request)
# Utenti admin
if Auth.check() and Auth.user().is_admin():
return next(request)
# Mostra pagina manutenzione
return response(render("maintenance"), 503)
# app/middleware/SecurityHeadersMiddleware.flux
class SecurityHeadersMiddleware(Middleware):
"""Headers di sicurezza"""
def handle(self, request, next):
response = next(request)
# Security headers
response.headers.update({
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"X-XSS-Protection": "1; mode=block",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"Content-Security-Policy": self.get_csp_policy(),
"Referrer-Policy": "strict-origin-when-cross-origin"
})
return response
def get_csp_policy(self):
return ("default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
"font-src 'self' https://fonts.gstatic.com; "
"img-src 'self' data: https:;")
# app/middleware/ApiVersioningMiddleware.flux
class ApiVersioningMiddleware(Middleware):
"""Versioning API automatico"""
def handle(self, request, next):
# Determina versione da header o URL
version = (request.header("Accept-Version") or
request.segment("v1", "v2") or
"v1")
request.api_version = version
# Routing per versione
if version == "v2":
request.controller_namespace = "App\\Http\\Controllers\\Api\\V2"
else:
request.controller_namespace = "App\\Http\\Controllers\\Api\\V1"
response = next(request)
# Header versione nella response
response.header("API-Version", version)
return response
# app/middleware/RequestLoggingMiddleware.flux
class RequestLoggingMiddleware(Middleware):
"""Logging avanzato richieste"""
def handle(self, request, next):
start_time = time.time()
request_id = str(uuid4())
# Log richiesta
log_info("Request started", {
"request_id": request_id,
"method": request.method,
"url": request.url,
"ip": request.ip,
"user_agent": request.header("User-Agent"),
"user_id": Auth.id() if Auth.check() else None
})
# Aggiungi ID alla richiesta
request.id = request_id
try:
response = next(request)
# Log risposta
end_time = time.time()
duration = (end_time - start_time) * 1000 # ms
log_info("Request completed", {
"request_id": request_id,
"status": response.status,
"duration_ms": round(duration, 2),
"memory_mb": round(memory_usage() / 1024 / 1024, 2)
})
return response
except Exception as e:
# Log errore
end_time = time.time()
duration = (end_time - start_time) * 1000
log_error("Request failed", {
"request_id": request_id,
"error": str(e),
"duration_ms": round(duration, 2)
})
raise e
Middleware per API
Middleware specializzati per API REST:
# app/middleware/ApiAuthMiddleware.flux
class ApiAuthMiddleware(Middleware):
"""Autenticazione API con token"""
def handle(self, request, next):
# Bearer token
token = self.extract_token(request)
if not token:
return json_response({"error": "Token required"}, 401)
# Valida token
user = self.validate_token(token)
if not user:
return json_response({"error": "Invalid token"}, 401)
# Rate limiting per utente
if self.is_rate_limited(user):
return json_response({
"error": "Rate limit exceeded",
"retry_after": self.get_retry_after(user)
}, 429)
request.user = user
Auth.set_user(user)
return next(request)
def extract_token(self, request):
auth_header = request.header("Authorization")
if auth_header and auth_header.startswith("Bearer "):
return auth_header[7:]
return None
def validate_token(self, token):
return User.where("api_token", token).where("active", True).first()
# app/middleware/JsonMiddleware.flux
class JsonMiddleware(Middleware):
"""Forza response JSON per API"""
def handle(self, request, next):
# Forza Accept: application/json
request.headers["Accept"] = "application/json"
response = next(request)
# Assicura Content-Type JSON
if not response.headers.get("Content-Type"):
response.headers["Content-Type"] = "application/json"
return response
# app/middleware/ApiResponseMiddleware.flux
class ApiResponseMiddleware(Middleware):
"""Standardizza response API"""
def handle(self, request, next):
try:
response = next(request)
# Wrapper per success response
if response.status >= 200 and response.status < 300:
data = response.get_original_content()
wrapped_response = {
"success": True,
"data": data,
"message": "Request successful",
"timestamp": now().isoformat(),
"request_id": getattr(request, "id", None)
}
return json_response(wrapped_response, response.status)
return response
except ValidationException as e:
return json_response({
"success": False,
"error": "Validation failed",
"errors": e.errors,
"timestamp": now().isoformat(),
"request_id": getattr(request, "id", None)
}, 422)
except Exception as e:
return json_response({
"success": False,
"error": "Internal server error",
"message": str(e) if config("app.debug") else "Something went wrong",
"timestamp": now().isoformat(),
"request_id": getattr(request, "id", None)
}, 500)
Middleware Condizionali
Middleware che si eseguono solo in certe condizioni:
# app/middleware/ConditionalMiddleware.flux
class ConditionalMiddleware(Middleware):
def handle(self, request, next):
# Esegue solo in produzione
if config("app.env") == "production":
return self.production_logic(request, next)
return next(request)
# Middleware basato su feature flags
class FeatureMiddleware(Middleware):
def __init__(self, feature_name):
self.feature_name = feature_name
def handle(self, request, next):
if not self.is_feature_enabled(request):
return response("Feature not available", 404)
return next(request)
def is_feature_enabled(self, request):
user = getattr(request, "user", None)
# Feature flag per utente
if user and user.has_feature(self.feature_name):
return True
# Feature flag globale
return config(f"features.{self.feature_name}", False)
# Middleware per A/B testing
class ABTestMiddleware(Middleware):
def handle(self, request, next):
if not hasattr(request, "user") or not request.user:
return next(request)
# Determina variante A/B test
user_id = request.user.id
test_variant = "A" if user_id % 2 == 0 else "B"
request.ab_variant = test_variant
# Memorizza in sessione
session_set("ab_variant", test_variant)
return next(request)
# Uso nei controller
class HomeController:
def index(self, request):
variant = getattr(request, "ab_variant", "A")
if variant == "B":
return render("home.variant_b")
else:
return render("home.index")
Pipeline Middleware
Controlla l'ordine e il flusso dei middleware:
# Pipeline personalizzata
class MiddlewarePipeline:
def __init__(self):
self.middleware = []
def pipe(self, middleware_class):
"""Aggiunge middleware alla pipeline"""
self.middleware.append(middleware_class)
return self
def through(self, request):
"""Esegue la pipeline"""
def pipeline_handler(request):
return self.run_pipeline(request, 0)
return pipeline_handler(request)
def run_pipeline(self, request, index):
if index >= len(self.middleware):
return request # Fine pipeline
middleware = self.middleware[index]
def next_handler(request):
return self.run_pipeline(request, index + 1)
return middleware().handle(request, next_handler)
# Uso della pipeline
def process_api_request(request):
pipeline = MiddlewarePipeline()
result = (pipeline
.pipe(CorsMiddleware)
.pipe(ThrottleMiddleware)
.pipe(ApiAuthMiddleware)
.pipe(JsonMiddleware)
.through(request))
return result
# Middleware con priorità
class PriorityMiddleware(Middleware):
priority = 100 # Alto = eseguito prima
def handle(self, request, next):
return next(request)
class HighPriorityMiddleware(Middleware):
priority = 200
def handle(self, request, next):
return next(request)
# Ordinamento automatico per priorità
def sort_middleware_by_priority(middleware_list):
return sorted(middleware_list,
key=lambda m: getattr(m, "priority", 0),
reverse=True)
Testing Middleware
Testa middleware in isolamento:
# tests/middleware/test_auth_middleware.flux
class TestAuthMiddleware(TestCase):
def test_allows_authenticated_user(self):
# Setup utente autenticato
user = UserFactory.create()
Auth.login(user)
# Mock request
request = self.create_request("/dashboard")
middleware = AuthMiddleware()
# Mock next handler
def next_handler(req):
return response("Success")
# Esegui middleware
result = middleware.handle(request, next_handler)
# Assertions
self.assertEqual(result.content, "Success")
self.assertEqual(request.user.id, user.id)
def test_redirects_guest_user(self):
# Nessun utente autenticato
Auth.logout()
request = self.create_request("/dashboard")
middleware = AuthMiddleware()
def next_handler(req):
return response("Should not reach here")
result = middleware.handle(request, next_handler)
# Verifica redirect
self.assertEqual(result.status, 302)
self.assertTrue(result.headers["Location"].endswith("/login"))
def test_returns_json_for_api_request(self):
request = self.create_request("/api/users")
request.headers["Accept"] = "application/json"
middleware = AuthMiddleware()
def next_handler(req):
return response("Should not reach here")
result = middleware.handle(request, next_handler)
# Verifica JSON response
self.assertEqual(result.status, 401)
data = json.loads(result.content)
self.assertEqual(data["error"], "Unauthorized")
# Test integrazione middleware
class TestMiddlewareIntegration(TestCase):
def test_middleware_pipeline(self):
# Test pipeline completa
user = UserFactory.admin().create()
response = self.acting_as(user)\
.get("/admin/dashboard")
self.assertEqual(response.status, 200)
def test_middleware_with_parameters(self):
# Test throttling
for i in range(5):
response = self.get("/api/test")
if i < 3:
self.assertEqual(response.status, 200)
else:
self.assertEqual(response.status, 429) # Too Many Requests
# Helper per creare mock request
def create_mock_request(url="/", method="GET", headers=None):
request = MockRequest()
request.url = url
request.method = method
request.headers = headers or {}
request.ip = "127.0.0.1"
return request
Esempio Completo: Sistema di Logging
# app/middleware/ComprehensiveLoggingMiddleware.flux
class ComprehensiveLoggingMiddleware(Middleware):
"""Sistema di logging completo per debugging e monitoring"""
def handle(self, request, next):
# Setup logging context
start_time = time.time()
request_id = str(uuid4())
# Context per tutti i log
log_context = {
"request_id": request_id,
"method": request.method,
"url": request.url,
"ip": self.get_real_ip(request),
"user_agent": request.header("User-Agent", "Unknown"),
"user_id": Auth.id() if Auth.check() else None,
"session_id": session_id(),
"timestamp": now().isoformat()
}
# Log richiesta iniziale
self.log_request_start(log_context, request)
# Aggiungi context alla richiesta
request.log_context = log_context
request.id = request_id
try:
# Monitora memory e query
initial_memory = memory_usage()
initial_queries = DB.query_count()
response = next(request)
# Calcola metriche
end_time = time.time()
duration = (end_time - start_time) * 1000
memory_used = memory_usage() - initial_memory
queries_executed = DB.query_count() - initial_queries
# Log successo
self.log_request_success(log_context, response, {
"duration_ms": round(duration, 2),
"memory_mb": round(memory_used / 1024 / 1024, 2),
"queries_count": queries_executed,
"response_size": len(response.content) if hasattr(response, "content") else 0
})
# Log performance warnings
self.check_performance_warnings(log_context, duration, memory_used, queries_executed)
return response
except Exception as e:
# Log errori
end_time = time.time()
duration = (end_time - start_time) * 1000
self.log_request_error(log_context, e, {
"duration_ms": round(duration, 2),
"error_type": type(e).__name__,
"stack_trace": self.get_stack_trace(e)
})
# Re-raise per error handling normale
raise e
def log_request_start(self, context, request):
"""Log inizio richiesta"""
log_info("REQUEST_START", {
**context,
"headers": self.sanitize_headers(request.headers),
"query_params": dict(request.query),
"body_size": len(request.body) if hasattr(request, "body") else 0
})
def log_request_success(self, context, response, metrics):
"""Log richiesta completata con successo"""
log_info("REQUEST_SUCCESS", {
**context,
"status": response.status,
"response_headers": dict(response.headers),
**metrics
})
def log_request_error(self, context, error, metrics):
"""Log errore richiesta"""
log_error("REQUEST_ERROR", {
**context,
"error_message": str(error),
**metrics
})
def check_performance_warnings(self, context, duration, memory, queries):
"""Controlla e logga warning performance"""
warnings = []
if duration > 1000: # > 1 secondo
warnings.append(f"Slow request: {duration:.0f}ms")
if memory > 50 * 1024 * 1024: # > 50MB
warnings.append(f"High memory usage: {memory/1024/1024:.1f}MB")
if queries > 10:
warnings.append(f"Many queries: {queries}")
if warnings:
log_warning("PERFORMANCE_WARNING", {
**context,
"warnings": warnings,
"metrics": {
"duration_ms": duration,
"memory_mb": memory / 1024 / 1024,
"queries": queries
}
})
def get_real_ip(self, request):
"""Ottieni IP reale considerando proxy"""
# Controlla headers comuni per IP forwarded
for header in ["X-Forwarded-For", "X-Real-IP", "X-Client-IP"]:
ip = request.header(header)
if ip:
return ip.split(",")[0].strip()
return request.ip
def sanitize_headers(self, headers):
"""Rimuovi headers sensibili dal log"""
sensitive = ["Authorization", "Cookie", "X-API-Key"]
sanitized = {}
for key, value in headers.items():
if key in sensitive:
sanitized[key] = "[REDACTED]"
else:
sanitized[key] = value
return sanitized
def get_stack_trace(self, error):
"""Ottieni stack trace limitato"""
import traceback
return traceback.format_exc()[:1000] # Limita a 1000 caratteri
# Configurazione logging
# config/logging.flux
LOGGING = {
"handlers": {
"requests": {
"driver": "daily",
"path": "storage/logs/requests.log",
"level": "info",
"days": 30
},
"performance": {
"driver": "daily",
"path": "storage/logs/performance.log",
"level": "warning",
"days": 7
},
"errors": {
"driver": "daily",
"path": "storage/logs/errors.log",
"level": "error",
"days": 90
}
}
}
# Dashboard per analizzare logs
class LogAnalyticsDashboard:
def get_request_stats(self, hours=24):
"""Statistiche richieste ultime N ore"""
return {
"total_requests": self.count_requests(hours),
"avg_response_time": self.avg_response_time(hours),
"error_rate": self.error_rate(hours),
"top_endpoints": self.top_endpoints(hours),
"slow_requests": self.slow_requests(hours)
}
def count_requests(self, hours):
# Analizza log files o database
pass
def identify_performance_issues(self):
"""Identifica problemi performance ricorrenti"""
return {
"slow_queries": self.find_slow_queries(),
"memory_leaks": self.find_memory_issues(),
"high_traffic_endpoints": self.find_traffic_spikes()
}
Best Practices
🏗️ Architettura
- Un middleware per una responsabilità
- Mantieni middleware leggeri e veloci
- Usa parametri per configurabilità
- Documenta effetti sui controller
⚡ Performance
- Minimizza logica nei middleware globali
- Cache risultati costosi
- Evita query database pesanti
- Monitora impatto performance
🔒 Sicurezza
- Valida input nei middleware
- Non loggare dati sensibili
- Gestisci eccezioni gracefully
- Usa HTTPS per middleware auth
🚀 Ottimo! Ora padroneggi i middleware di Flux per creare pipeline HTTP potenti.
Scopri come implementare Autenticazione completa
per proteggere la tua applicazione.