Middleware

Filtri HTTP potenti per elaborare richieste e risposte in modo elegante

Pipeline HTTP: I middleware in Flux creano una pipeline dove ogni richiesta passa attraverso una serie di filtri prima di raggiungere il controller.

Concetti Base

I middleware sono filtri che vengono eseguiti prima o dopo l'azione del controller:

# Flusso richiesta con middleware
Request → Middleware 1 → Middleware 2 → Controller → Response
                ↑                                        ↓
                ← Middleware 1 ← Middleware 2 ←━━━━━━━━━━┘

# Middleware base
from flux.http import Middleware

class ExampleMiddleware(Middleware):
    """Minimal middleware that reports on a request before and after it is handled."""

    def handle(self, request, next):
        """
        Pass the request down the pipeline, printing one line on each side.

        Args:
            request: HTTP request object
            next: Next middleware/controller in the pipeline

        Returns:
            The response produced by ``next``, unchanged.
        """
        # Runs BEFORE the controller
        print(f"Processing request to {request.url}")

        # Hand over to the rest of the pipeline
        outcome = next(request)

        # Runs AFTER the controller
        print(f"Sending response with status {outcome.status}")
        return outcome

Creare Middleware

Genera e implementa middleware personalizzati:

# Create new middleware
flux make:middleware AuthMiddleware
flux make:middleware ApiRateLimitMiddleware
flux make:middleware CorsMiddleware
# app/middleware/AuthMiddleware.flux
from flux.http import Middleware
from flux.auth import Auth

class AuthMiddleware(Middleware):
    """Lets authenticated requests through and turns everyone else away."""

    def handle(self, request, next):
        """
        Continue the pipeline for authenticated users only.

        Unauthenticated clients get a 401 JSON body when they ask for JSON,
        otherwise a redirect to "/login".
        """
        if Auth.check():
            # Expose the authenticated user to downstream handlers
            request.user = Auth.user()
            return next(request)

        # Not authenticated: answer in the format the client asked for
        if request.wants_json():
            return json_response({"error": "Unauthorized"}, 401)
        return redirect("/login")

# app/middleware/AdminMiddleware.flux
class AdminMiddleware(Middleware):
    """Restricts a route to users for whom ``is_admin()`` is true."""

    def handle(self, request, next):
        """
        Continue the pipeline only for administrators.

        Args:
            request: HTTP request object
            next: Next middleware/controller in the pipeline

        Returns:
            The downstream response for admins; otherwise a 403 JSON body when
            the client wants JSON, or a redirect to "/" carrying an error.
        """
        # `request.user` exists only when an earlier middleware assigned it.
        # Reading it with getattr keeps the Auth.user() fallback reachable
        # instead of failing with AttributeError when this middleware runs alone.
        user = getattr(request, "user", None) or Auth.user()

        if user and user.is_admin():
            return next(request)

        if request.wants_json():
            return json_response({"error": "Forbidden"}, 403)
        return redirect("/").with("error", "Accesso negato")

# app/middleware/GuestMiddleware.flux
class GuestMiddleware(Middleware):
    """Middleware for unauthenticated users"""

    def handle(self, request, next):
        """Redirect signed-in users to "/dashboard"; let everyone else continue."""
        already_signed_in = Auth.check()
        if not already_signed_in:
            return next(request)
        return redirect("/dashboard")

Registrare Middleware

Configura middleware globali, di gruppo e per route:

# config/middleware.flux
# Middleware registry with three sections: "global", "groups" and "aliases".
MIDDLEWARE = {
    # Global middleware (run for every request)
    "global": [
        "flux.middleware.TrimStrings",
        "flux.middleware.ConvertEmptyStringsToNull",
        "app.middleware.TrustProxies",
        "app.middleware.EncryptCookies",
        "app.middleware.StartSession",
        "app.middleware.VerifyCsrfToken",
    ],
    
    # Middleware groups
    "groups": {
        "web": [
            "app.middleware.ShareErrorsFromSession",
            "app.middleware.SubstituteBindings",
        ],
        
        "api": [
            "app.middleware.ThrottleRequests:60,1",  # 60 req/min
            "app.middleware.SubstituteBindings",
        ],
        
        "auth": [
            "app.middleware.AuthMiddleware",
        ],
        
        # Authentication is listed before the admin check
        "admin": [
            "app.middleware.AuthMiddleware", 
            "app.middleware.AdminMiddleware",
        ]
    },
    
    # Short alias -> middleware path
    "aliases": {
        "auth": "app.middleware.AuthMiddleware",
        "admin": "app.middleware.AdminMiddleware",
        "guest": "app.middleware.GuestMiddleware",
        "throttle": "app.middleware.ThrottleRequests",
        "cors": "app.middleware.CorsMiddleware",
        "cache": "app.middleware.CacheResponse",
    }
}

Applicare Middleware

Usa i middleware nelle route e nei controller:

# routes/web.flux

# Middleware on a single route: the "auth" and "admin" aliases, in that order
@route("/admin/dashboard", middleware=["auth", "admin"])
def admin_dashboard():
    return render("admin.dashboard")

# Middleware with parameters: everything after ":" is passed to the middleware
@route("/api/users", middleware=["throttle:30,1"])  # 30 req/min
def api_users():
    return json_response(User.all())

# Route group with middleware: both routes below are declared inside the "auth" block
with middleware(["auth"]):
    @route("/profile")
    def profile():
        return render("profile")
    
    @route("/settings")
    def settings():
        return render("settings")

# Conditional middleware: `only` names the actions each middleware applies to
# NOTE(review): the original comment on "cache:300" says 5 minutes, but
# CacheMiddleware names its first parameter `minutes` — confirm the unit.
@route("/posts/{id}")
@middleware("auth", only=["edit", "update", "delete"])
@middleware("cache:300", only=["show"])  # Cache 5 minutes
def posts_controller():
    pass

# In controllers
# NOTE(review): no base class is declared here; `self.middleware` presumably
# comes from the framework — confirm.
class UserController:
    
    def __init__(self):
        # Middleware for all methods
        self.middleware("auth")
        
        # Middleware for specific methods
        self.middleware("admin", only=["create", "store", "destroy"])
        self.middleware("throttle:10,1", only=["store", "update"])
        
        # Exclude methods
        self.middleware("cache:600", except=["store", "update"])
    
    def index(self):
        return render("users.index", {"users": User.all()})
    
    def store(self):
        # User creation logic
        pass

Middleware con Parametri

Passa parametri ai middleware per personalizzare il comportamento:

# app/middleware/ThrottleMiddleware.flux
class ThrottleMiddleware(Middleware):
    """Rate-limits requests, keyed by the request signature."""

    def handle(self, request, next, max_attempts=60, decay_minutes=1):
        """
        Throttling with configurable parameters

        Args:
            request: HTTP request object
            next: Next middleware/controller in the pipeline
            max_attempts: Maximum number of attempts
            decay_minutes: Time window in minutes
        """
        signature = self.resolve_request_signature(request)
        # Parameters may arrive as strings, so coerce before use
        limit = int(max_attempts)
        window_minutes = int(decay_minutes)

        if self.limiter.too_many_attempts(signature, limit):
            return self.build_exception_response(signature, limit)

        # The limiter is given the window in seconds
        self.limiter.hit(signature, window_minutes * 60)

        response = next(request)
        remaining = self.calculate_remaining(signature, limit)
        return self.add_headers(response, limit, remaining)

    def resolve_request_signature(self, request):
        """Build the throttle key from the client IP and the URL"""
        return f"{request.ip}:{request.url}"

# app/middleware/CacheMiddleware.flux
class CacheMiddleware(Middleware):
    """Caches whole responses and replays them for repeat requests."""

    def handle(self, request, next, minutes=60, tags=None, *more_tags):
        """
        Cache the response for the given time.

        Args:
            request: HTTP request object
            next: Next middleware/controller in the pipeline
            minutes: Lifetime handed to ``cache_put``; coerced with ``int``
            tags: Comma-separated tag string, or None for no tags
            *more_tags: Further tag strings. A declaration such as
                "cache:30,blog,posts" presumably yields one argument per
                comma, so later tags land here instead of raising TypeError.

        Returns:
            The cached response when one is stored, otherwise the fresh one.
        """
        # Only GET is answered from, or written to, the cache: replaying a
        # stored response for a state-changing method would skip the controller.
        if str(request.method).upper() != "GET":
            return next(request)

        cache_key = f"response:{request.url}:{request.query_string}"
        minutes = int(minutes)
        tag_list = [
            tag
            for chunk in (tags, *more_tags)
            if chunk
            for tag in chunk.split(",")
        ]

        # Serve from the cache when an entry exists
        cached_response = cache_get(cache_key)
        if cached_response:
            return self.build_cached_response(cached_response)

        # Run the request
        response = next(request)

        # Store 200 responses only
        if response.status == 200:
            cache_put(cache_key, {
                "content": response.content,
                "headers": response.headers,
                "status": response.status
            }, minutes, tag_list)

        return response

# Usage with parameters
# NOTE(review): both decorators are stacked on blog_posts, so "/api/posts"
# would also render "blog.index" — confirm whether it was meant to have its
# own handler.
@route("/api/posts", middleware=["throttle:100,1"])  # 100 req/min
@route("/blog", middleware=["cache:30,blog,posts"])  # 30 min, tags: blog,posts
def blog_posts():
    return render("blog.index")

Middleware Built-in

Flux include middleware pronti all'uso:

# Security middleware
"flux.middleware.TrustProxies"           # Proxy handling
"flux.middleware.EncryptCookies"         # Cookie encryption
"flux.middleware.VerifyCsrfToken"        # CSRF protection
"flux.middleware.PreventRequestsDuringMaintenance"

# Data-handling middleware
"flux.middleware.TrimStrings"            # Strips whitespace from input
"flux.middleware.ConvertEmptyStringsToNull"  # "" → null
"flux.middleware.SubstituteBindings"     # Route model binding

# Session middleware
"flux.middleware.StartSession"           # Starts sessions
"flux.middleware.ShareErrorsFromSession" # Shares errors with views

# API middleware
"flux.middleware.ThrottleRequests"       # Rate limiting
"flux.middleware.CorsMiddleware"         # Cross-Origin Resource Sharing
"flux.middleware.ValidateSignature"     # Request signature validation

# Cache middleware
"flux.middleware.CacheResponse"          # HTTP response caching
"flux.middleware.ETagMiddleware"         # ETag header handling

# Logging middleware
"flux.middleware.LogRequests"            # HTTP request logging
"flux.middleware.ProfileRequests"        # Performance profiling

Middleware Avanzati

Implementazioni complesse per casi d'uso specifici:

# app/middleware/MaintenanceMiddleware.flux
class MaintenanceMiddleware(Middleware):
    """Maintenance mode with an IP whitelist"""

    def handle(self, request, next):
        """
        Serve the maintenance page with status 503 while maintenance mode is on.

        Whitelisted IPs and authenticated admins still reach the application.
        """
        maintenance_on = config("app.maintenance_mode", False)
        if not maintenance_on:
            return next(request)

        # Whitelisted IPs first, then admin users; either one bypasses the page
        ip_is_whitelisted = request.ip in config("app.maintenance_allowed_ips", [])
        if ip_is_whitelisted or (Auth.check() and Auth.user().is_admin()):
            return next(request)

        # Everyone else sees the maintenance page
        return response(render("maintenance"), 503)

# app/middleware/SecurityHeadersMiddleware.flux
class SecurityHeadersMiddleware(Middleware):
    """Security headers"""

    def handle(self, request, next):
        """Let the request run, then add the security headers to the response."""
        response = next(request)

        hardening = {
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "X-XSS-Protection": "1; mode=block",
            "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
            "Content-Security-Policy": self.get_csp_policy(),
            "Referrer-Policy": "strict-origin-when-cross-origin"
        }
        response.headers.update(hardening)
        return response

    def get_csp_policy(self):
        """Return the Content-Security-Policy value as one header string."""
        directives = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
            "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com",
            "font-src 'self' https://fonts.gstatic.com",
            "img-src 'self' data: https:",
        ]
        # Directives are separated by "; " and the policy ends with ";"
        return "; ".join(directives) + ";"

# app/middleware/ApiVersioningMiddleware.flux
class ApiVersioningMiddleware(Middleware):
    """Automatic API versioning"""

    def handle(self, request, next):
        """
        Resolve the API version, pick the controller namespace for it and
        echo the version back in the "API-Version" response header.
        """
        # The header wins, then the URL segment, then the "v1" default
        version = request.header("Accept-Version")
        if not version:
            version = request.segment("v1", "v2")
        if not version:
            version = "v1"

        request.api_version = version

        # Anything other than "v2" is routed to the V1 controllers
        request.controller_namespace = (
            "App\\Http\\Controllers\\Api\\V2"
            if version == "v2"
            else "App\\Http\\Controllers\\Api\\V1"
        )

        response = next(request)

        # Version header on the response
        response.header("API-Version", version)
        return response

# app/middleware/RequestLoggingMiddleware.flux
class RequestLoggingMiddleware(Middleware):
    """Advanced request logging"""

    def handle(self, request, next):
        """
        Log the start and the outcome of a request under a fresh request id.

        The id is also stored on ``request.id``. Exceptions from the pipeline
        are logged and then re-raised.
        """
        started_at = time.time()
        request_id = str(uuid4())

        def elapsed_ms():
            # Milliseconds since the request entered this middleware
            return (time.time() - started_at) * 1000

        # Log the incoming request
        opening_entry = {
            "request_id": request_id,
            "method": request.method,
            "url": request.url,
            "ip": request.ip,
            "user_agent": request.header("User-Agent"),
            "user_id": Auth.id() if Auth.check() else None
        }
        log_info("Request started", opening_entry)

        # Attach the id to the request
        request.id = request_id

        try:
            response = next(request)

            # Log the response
            duration = elapsed_ms()
            log_info("Request completed", {
                "request_id": request_id,
                "status": response.status,
                "duration_ms": round(duration, 2),
                "memory_mb": round(memory_usage() / 1024 / 1024, 2)
            })
            return response

        except Exception as e:
            # Log the failure, then let it propagate
            duration = elapsed_ms()
            log_error("Request failed", {
                "request_id": request_id,
                "error": str(e),
                "duration_ms": round(duration, 2)
            })
            raise e

Middleware per API

Middleware specializzati per API REST:

# app/middleware/ApiAuthMiddleware.flux
class ApiAuthMiddleware(Middleware):
    """API authentication with a token"""

    def handle(self, request, next):
        """
        Authenticate the request from its bearer token.

        Returns 401 when the token is missing or invalid and 429 when the
        user is rate limited; otherwise sets the user and continues.
        """
        # Bearer token
        token = self.extract_token(request)
        if not token:
            return json_response({"error": "Token required"}, 401)

        # Validate the token
        user = self.validate_token(token)
        if not user:
            return json_response({"error": "Invalid token"}, 401)

        # Per-user rate limiting
        if self.is_rate_limited(user):
            limit_body = {
                "error": "Rate limit exceeded",
                "retry_after": self.get_retry_after(user)
            }
            return json_response(limit_body, 429)

        request.user = user
        Auth.set_user(user)
        return next(request)

    def extract_token(self, request):
        """Return the token from an "Authorization: Bearer ..." header, or None."""
        header_value = request.header("Authorization")
        if not header_value or not header_value.startswith("Bearer "):
            return None
        # Drop the 7-character "Bearer " prefix
        return header_value[7:]

    def validate_token(self, token):
        """Return the first active user whose api_token matches, if any."""
        matching = User.where("api_token", token)
        active_matching = matching.where("active", True)
        return active_matching.first()

# app/middleware/JsonMiddleware.flux
class JsonMiddleware(Middleware):
    """Force JSON responses for the API"""

    def handle(self, request, next):
        """
        Set the request's Accept header to JSON and give the response a JSON
        Content-Type when it has none.
        """
        # Force Accept: application/json
        request.headers["Accept"] = "application/json"

        response = next(request)

        # An existing Content-Type is left untouched
        if response.headers.get("Content-Type"):
            return response

        response.headers["Content-Type"] = "application/json"
        return response

# app/middleware/ApiResponseMiddleware.flux
class ApiResponseMiddleware(Middleware):
    """Standardise API responses"""

    def handle(self, request, next):
        """
        Wrap 2xx responses in a common JSON envelope.

        A ValidationException becomes a 422 JSON error and any other exception
        a 500 JSON error. Non-2xx responses are returned untouched.
        """
        def trailer():
            # Fields that close every envelope, evaluated when called
            return {
                "timestamp": now().isoformat(),
                "request_id": getattr(request, "id", None)
            }

        try:
            response = next(request)

            if not 200 <= response.status < 300:
                return response

            # Envelope for successful responses
            envelope = {
                "success": True,
                "data": response.get_original_content(),
                "message": "Request successful",
                **trailer()
            }
            return json_response(envelope, response.status)

        except ValidationException as e:
            return json_response({
                "success": False,
                "error": "Validation failed",
                "errors": e.errors,
                **trailer()
            }, 422)

        except Exception as e:
            # The real message is exposed only when app.debug is on
            return json_response({
                "success": False,
                "error": "Internal server error",
                "message": str(e) if config("app.debug") else "Something went wrong",
                **trailer()
            }, 500)

Middleware Condizionali

Middleware che vengono eseguiti solo a determinate condizioni:

# app/middleware/ConditionalMiddleware.flux
class ConditionalMiddleware(Middleware):
    """Applies its logic in the production environment only."""

    def handle(self, request, next):
        """Pass straight through unless app.env is "production"."""
        in_production = config("app.env") == "production"
        if not in_production:
            return next(request)

        # Production only
        return self.production_logic(request, next)

# Middleware driven by feature flags
class FeatureMiddleware(Middleware):
    """Answers 404 unless the named feature is enabled."""

    def __init__(self, feature_name):
        self.feature_name = feature_name

    def handle(self, request, next):
        """Continue when the feature is enabled; otherwise return a 404 response."""
        if self.is_feature_enabled(request):
            return next(request)
        return response("Feature not available", 404)

    def is_feature_enabled(self, request):
        """Check the per-user flag first, then fall back to the global config flag."""
        account = getattr(request, "user", None)

        # Per-user feature flag
        enabled_for_user = account.has_feature(self.feature_name) if account else False
        if enabled_for_user:
            return True

        # Global feature flag
        return config(f"features.{self.feature_name}", False)

# Middleware for A/B testing
class ABTestMiddleware(Middleware):
    """Assigns an A/B variant to requests that carry a user."""

    def handle(self, request, next):
        """
        Set ``request.ab_variant`` from the user id and store it in the session.

        Requests without a user pass through untouched.
        """
        current_user = getattr(request, "user", None)
        if not current_user:
            return next(request)

        # Even ids get variant "A", odd ids get variant "B"
        variant = "B" if current_user.id % 2 else "A"
        request.ab_variant = variant

        # Remember the variant in the session
        session_set("ab_variant", variant)

        return next(request)

# Usage in controllers
class HomeController:
    """Renders the home page for the request's A/B variant."""

    def index(self, request):
        """Render "home.variant_b" for variant "B", "home.index" otherwise."""
        # Requests without an assigned variant count as "A"
        chosen = getattr(request, "ab_variant", "A")
        template = "home.variant_b" if chosen == "B" else "home.index"
        return render(template)

Pipeline Middleware

Controlla l'ordine e il flusso dei middleware:

# Custom pipeline
class MiddlewarePipeline:
    """Ordered chain of middleware classes that a request is passed through."""

    def __init__(self):
        self.middleware = []

    def pipe(self, middleware_class):
        """Append a middleware class and return the pipeline for chaining."""
        self.middleware.append(middleware_class)
        return self

    def through(self, request):
        """Run the request through the pipeline, starting at the first stage."""
        return self.run_pipeline(request, 0)

    def run_pipeline(self, request, index):
        """
        Run the stage at ``index``, handing it a callable for the next stage.

        Each middleware class is instantiated without arguments only when its
        stage is reached. Past the last stage the request itself is returned.
        """
        stages = self.middleware
        if index < len(stages):
            def advance(request):
                return self.run_pipeline(request, index + 1)

            stage_class = stages[index]
            return stage_class().handle(request, advance)

        # End of pipeline
        return request

# Using the pipeline
def process_api_request(request):
    """Run the request through the CORS, throttle, API-auth and JSON middleware, in that order."""
    pipeline = MiddlewarePipeline()

    stages = (CorsMiddleware, ThrottleMiddleware, ApiAuthMiddleware, JsonMiddleware)
    for stage in stages:
        pipeline.pipe(stage)

    return pipeline.through(request)

# Middleware with a priority
class PriorityMiddleware(Middleware):
    """Pass-through middleware that declares a ``priority`` class attribute."""
    priority = 100  # Higher = runs earlier

    def handle(self, request, next):
        return next(request)

class HighPriorityMiddleware(Middleware):
    """Pass-through middleware with priority 200."""
    priority = 200

    def handle(self, request, next):
        return next(request)

# Automatic ordering by priority
def sort_middleware_by_priority(middleware_list):
    """
    Return a new list ordered from highest to lowest ``priority``.

    Entries without a ``priority`` attribute count as 0. The input is not
    modified.
    """
    def priority_of(entry):
        return getattr(entry, "priority", 0)

    ordered = list(middleware_list)
    ordered.sort(key=priority_of, reverse=True)
    return ordered

Testing Middleware

Testa i middleware in isolamento:

# tests/middleware/test_auth_middleware.flux
class TestAuthMiddleware(TestCase):
    """Unit tests that call AuthMiddleware.handle directly with a stub next handler."""

    def test_allows_authenticated_user(self):
        """An authenticated user reaches the next handler and is set on the request."""
        # Set up an authenticated user
        user = UserFactory.create()
        Auth.login(user)

        # Mock request
        request = self.create_request("/dashboard")
        middleware = AuthMiddleware()

        # Mock next handler
        def next_handler(req):
            return response("Success")

        # Run the middleware
        result = middleware.handle(request, next_handler)

        # Assertions
        self.assertEqual(result.content, "Success")
        self.assertEqual(request.user.id, user.id)

    def test_redirects_guest_user(self):
        """A guest on a non-JSON request is redirected to /login."""
        # No authenticated user
        Auth.logout()

        request = self.create_request("/dashboard")
        middleware = AuthMiddleware()

        def next_handler(req):
            return response("Should not reach here")

        result = middleware.handle(request, next_handler)

        # Check the redirect
        self.assertEqual(result.status, 302)
        self.assertTrue(result.headers["Location"].endswith("/login"))

    def test_returns_json_for_api_request(self):
        """A guest that asks for JSON gets a 401 JSON body."""
        # Log out explicitly so the test does not depend on the state left
        # behind by a test that logged a user in.
        Auth.logout()

        request = self.create_request("/api/users")
        request.headers["Accept"] = "application/json"

        middleware = AuthMiddleware()

        def next_handler(req):
            return response("Should not reach here")

        result = middleware.handle(request, next_handler)

        # Check the JSON response
        self.assertEqual(result.status, 401)
        data = json.loads(result.content)
        self.assertEqual(data["error"], "Unauthorized")

# Middleware integration tests
class TestMiddlewareIntegration(TestCase):
    """Exercises middleware through real requests rather than direct calls."""

    def test_middleware_pipeline(self):
        """An admin user gets a 200 from the admin dashboard."""
        # Full pipeline test
        admin = UserFactory.admin().create()

        response = self.acting_as(admin).get("/admin/dashboard")

        self.assertEqual(response.status, 200)

    def test_middleware_with_parameters(self):
        """The first three calls succeed; the fourth and fifth are throttled."""
        # Throttling test
        for attempt in range(5):
            response = self.get("/api/test")
            expected_status = 200 if attempt < 3 else 429  # 429 = Too Many Requests
            self.assertEqual(response.status, expected_status)

# Helper for building a mock request
def create_mock_request(url="/", method="GET", headers=None):
    """
    Build a MockRequest with the given URL, method and headers.

    Falsy ``headers`` are replaced by a new empty dict; the IP is always
    "127.0.0.1".
    """
    mock = MockRequest()
    mock.url, mock.method = url, method
    mock.headers = headers if headers else {}
    mock.ip = "127.0.0.1"
    return mock

Esempio Completo: Sistema di Logging

# app/middleware/ComprehensiveLoggingMiddleware.flux
class ComprehensiveLoggingMiddleware(Middleware):
    """
    Full request logging for debugging and monitoring.

    Logs the start and the success or failure of every request together with
    timing, memory and query-count metrics, and emits a warning when a request
    crosses the thresholds in ``check_performance_warnings``.
    """

    # Headers whose values must never reach the logs. Stored lowercase because
    # HTTP header names are case-insensitive.
    SENSITIVE_HEADERS = frozenset({
        "authorization",
        "proxy-authorization",
        "cookie",
        "set-cookie",
        "x-api-key",
    })

    def handle(self, request, next):
        """
        Log the request, run the pipeline and log its outcome.

        Args:
            request: HTTP request object
            next: Next middleware/controller in the pipeline

        Returns:
            The response produced by ``next``.

        Raises:
            Whatever ``next`` raises; the error is logged and re-raised.
        """
        # Set up the logging context
        start_time = time.time()
        request_id = str(uuid4())

        # Context shared by every log entry of this request
        log_context = {
            "request_id": request_id,
            "method": request.method,
            "url": request.url,
            "ip": self.get_real_ip(request),
            "user_agent": request.header("User-Agent", "Unknown"),
            "user_id": Auth.id() if Auth.check() else None,
            "session_id": session_id(),
            "timestamp": now().isoformat()
        }

        # Log the incoming request
        self.log_request_start(log_context, request)

        # Attach the context to the request
        request.log_context = log_context
        request.id = request_id

        try:
            # Baselines for the memory and query metrics
            initial_memory = memory_usage()
            initial_queries = DB.query_count()

            response = next(request)

            # Compute the metrics
            duration = (time.time() - start_time) * 1000
            memory_used = memory_usage() - initial_memory
            queries_executed = DB.query_count() - initial_queries

            # Log success
            self.log_request_success(log_context, response, {
                "duration_ms": round(duration, 2),
                "memory_mb": round(memory_used / 1024 / 1024, 2),
                "queries_count": queries_executed,
                "response_size": len(response.content) if hasattr(response, "content") else 0
            })

            # Log performance warnings
            self.check_performance_warnings(log_context, duration, memory_used, queries_executed)

            return response

        except Exception as e:
            # Log the error
            duration = (time.time() - start_time) * 1000

            self.log_request_error(log_context, e, {
                "duration_ms": round(duration, 2),
                "error_type": type(e).__name__,
                "stack_trace": self.get_stack_trace(e)
            })

            # Bare raise re-raises for normal error handling without adding
            # this frame to the traceback a second time
            raise

    def log_request_start(self, context, request):
        """Log the start of a request with sanitized headers, query params and body size."""
        log_info("REQUEST_START", {
            **context,
            "headers": self.sanitize_headers(request.headers),
            "query_params": dict(request.query),
            "body_size": len(request.body) if hasattr(request, "body") else 0
        })

    def log_request_success(self, context, response, metrics):
        """Log a request that completed successfully."""
        log_info("REQUEST_SUCCESS", {
            **context,
            "status": response.status,
            # Response headers can carry secrets too (e.g. Set-Cookie)
            "response_headers": self.sanitize_headers(dict(response.headers)),
            **metrics
        })

    def log_request_error(self, context, error, metrics):
        """Log a failed request."""
        log_error("REQUEST_ERROR", {
            **context,
            "error_message": str(error),
            **metrics
        })

    def check_performance_warnings(self, context, duration, memory, queries):
        """
        Log a PERFORMANCE_WARNING when any threshold is exceeded.

        Args:
            context: Log context of the request
            duration: Request duration in milliseconds
            memory: Memory used, in bytes
            queries: Number of queries executed
        """
        warnings = []

        if duration > 1000:  # > 1 second
            warnings.append(f"Slow request: {duration:.0f}ms")

        if memory > 50 * 1024 * 1024:  # > 50MB
            warnings.append(f"High memory usage: {memory/1024/1024:.1f}MB")

        if queries > 10:
            warnings.append(f"Many queries: {queries}")

        if warnings:
            log_warning("PERFORMANCE_WARNING", {
                **context,
                "warnings": warnings,
                "metrics": {
                    "duration_ms": duration,
                    "memory_mb": memory / 1024 / 1024,
                    "queries": queries
                }
            })

    def get_real_ip(self, request):
        """
        Return the client IP, preferring forwarded-for style headers.

        The first entry of the first header present wins; without any of them
        ``request.ip`` is returned.
        """
        # NOTE(review): these headers are set by the client unless a trusted
        # proxy overwrites them, so the logged IP can be spoofed — confirm the
        # deployment strips them at the edge.
        for header in ["X-Forwarded-For", "X-Real-IP", "X-Client-IP"]:
            ip = request.header(header)
            if ip:
                return ip.split(",")[0].strip()

        return request.ip

    def sanitize_headers(self, headers):
        """
        Return a copy of ``headers`` with sensitive values replaced by "[REDACTED]".

        Names are compared case-insensitively, so "authorization" is redacted
        just like "Authorization".
        """
        return {
            name: "[REDACTED]" if str(name).lower() in self.SENSITIVE_HEADERS else value
            for name, value in headers.items()
        }

    def get_stack_trace(self, error):
        """Return the formatted traceback of ``error``, cut to 1000 characters."""
        import traceback
        # Format the exception that was passed in rather than whichever one is
        # currently being handled, so this also works outside an except block
        formatted = traceback.format_exception(type(error), error, error.__traceback__)
        return "".join(formatted)[:1000]  # Limit to 1000 characters

# Logging configuration
# config/logging.flux
# One handler per log file; each has its own minimum level.
# NOTE(review): "days" presumably is the retention period — confirm.
LOGGING = {
    "handlers": {
        "requests": {
            "driver": "daily",
            "path": "storage/logs/requests.log",
            "level": "info",
            "days": 30
        },
        "performance": {
            "driver": "daily", 
            "path": "storage/logs/performance.log",
            "level": "warning",
            "days": 7
        },
        "errors": {
            "driver": "daily",
            "path": "storage/logs/errors.log", 
            "level": "error",
            "days": 90
        }
    }
}

# Dashboard for analysing logs
class LogAnalyticsDashboard:
    """Collects summary figures by delegating to one method per metric."""

    def get_request_stats(self, hours=24):
        """Request statistics for the last N hours"""
        stats = {}
        stats["total_requests"] = self.count_requests(hours)
        stats["avg_response_time"] = self.avg_response_time(hours)
        stats["error_rate"] = self.error_rate(hours)
        stats["top_endpoints"] = self.top_endpoints(hours)
        stats["slow_requests"] = self.slow_requests(hours)
        return stats

    def count_requests(self, hours):
        """Placeholder: returns None."""
        # Analyse log files or the database
        return None

    def identify_performance_issues(self):
        """Identify recurring performance problems"""
        slow_queries = self.find_slow_queries()
        memory_issues = self.find_memory_issues()
        traffic_spikes = self.find_traffic_spikes()
        return {
            "slow_queries": slow_queries,
            "memory_leaks": memory_issues,
            "high_traffic_endpoints": traffic_spikes
        }

Best Practices

🏗️ Architettura

  • Un middleware per una responsabilità
  • Mantieni middleware leggeri e veloci
  • Usa parametri per configurabilità
  • Documenta effetti sui controller

⚡ Performance

  • Minimizza logica nei middleware globali
  • Cache risultati costosi
  • Evita query database pesanti
  • Monitora impatto performance

🔒 Sicurezza

  • Valida input nei middleware
  • Non loggare dati sensibili
  • Gestisci le eccezioni in modo controllato
  • Usa HTTPS per i middleware di autenticazione
🚀 Ottimo! Ora padroneggi i middleware di Flux per creare pipeline HTTP potenti. Scopri come implementare un sistema di Autenticazione completo per proteggere la tua applicazione.