Autenticazione

Sistema completo di autenticazione e autorizzazione per applicazioni sicure

Sicurezza Integrata: Flux include un sistema di autenticazione completo con hashing sicuro, session management, JWT, OAuth e controllo granulare dei permessi.

Setup Base

Configura il sistema di autenticazione:

# config/auth.flux
# Authentication configuration: default guard, guard definitions, user
# providers and password-reset settings.
AUTH_CONFIG = {
    # Default guard
    "default": "web",
    
    # Available guards
    "guards": {
        "web": {
            "driver": "session",
            "provider": "users"
        },
        "api": {
            "driver": "token",
            "provider": "users",
            # NOTE(review): plaintext tokens are readable by anyone with DB
            # access; consider enabling hashing together with the code that
            # stores the token.
            "hash": False  # Tokens stored in plaintext in the DB
        },
        "jwt": {
            "driver": "jwt",
            "provider": "users",
            "secret": env("JWT_SECRET"),
            "algo": "HS256",
            "expire": 3600  # 1 hour
        }
    },
    
    # User providers
    "providers": {
        "users": {
            "driver": "flux_orm",
            "model": "User"
        },
        "admin": {
            "driver": "flux_orm", 
            "model": "Admin"
        }
    },
    
    # Password settings
    "passwords": {
        "users": {
            "provider": "users",
            "table": "password_resets",
            "expire": 60,  # minutes
            "throttle": 60  # seconds between resets
        }
    }
}

User Model

Configura il model User per l'autenticazione:

# app/models/User.flux
from flux.auth import Authenticatable
from flux.db import Model

class User(Model, Authenticatable):
    """User model used by the authentication guards."""

    # Presumably the mass-assignable attributes -- confirm against the ORM.
    fillable = ["name", "email", "password"]
    # Presumably excluded from serialized output -- confirm against the ORM.
    hidden = ["password", "remember_token", "api_token"]

    # Attribute name -> cast type.
    casts = {
        "email_verified_at": "datetime",
        "last_login": "datetime",
        "settings": "json"
    }

    def set_password_attribute(self, value):
        """Store the hash of ``value`` as the ``password`` attribute."""
        hashed = hash_password(value)
        self.attributes["password"] = hashed

    def check_password(self, password):
        """Return the result of verifying ``password`` against the stored hash."""
        stored_hash = self.password
        return verify_password(password, stored_hash)

    def generate_api_token(self):
        """Generate a new API token, persist it on the model and return it."""
        fresh_token = generate_token(60)
        self.api_token = fresh_token
        self.save()
        return self.api_token

    def get_jwt_custom_claims(self):
        """Return the extra JWT claims: the user's role and permission names."""
        role = self.role
        permission_names = self.permissions.pluck("name").tolist()
        return {"role": role, "permissions": permission_names}

    def set_remember_token(self, token):
        """Set the remember-me token on the model (does not save)."""
        self.remember_token = token

    def get_remember_token(self):
        """Return the current remember-me token."""
        return self.remember_token

    def get_auth_identifier(self):
        """Return the value that identifies this user (its ``id``)."""
        return self.id

    def get_auth_identifier_name(self):
        """Return the name of the identifier attribute."""
        return "id"

Login e Logout

Implementa autenticazione base:

# routes/auth.flux
@route("/login", methods=["GET", "POST"])
def login():
    """Show the login form on GET; authenticate the submitted credentials on POST."""
    if request.method == "GET":
        return render("auth.login")

    # Validation
    rules = {
        "email": "required|email",
        "password": "required|min:6",
        "remember": "boolean"
    }
    data = validate_request(rules)

    credentials = {field: data[field] for field in ("email", "password")}
    remember = data.get("remember", False)

    # Failed login: back to the form with an error and the previous input.
    if not Auth.attempt(credentials, remember):
        failure = redirect("/login").with_errors({
            "email": "Credenziali non valide"
        })
        return failure.with_input()

    # Successful login: record the timestamp and log the event.
    user = Auth.user()
    user.update({"last_login": now()})

    message = f"User {user.email} logged in"
    context = {
        "user_id": user.id,
        "ip": request.ip,
        "user_agent": request.header("User-Agent")
    }
    log_info(message, context)

    # Redirect to the intended URL, falling back to the dashboard.
    return redirect().intended("/dashboard")

@route("/logout", methods=["POST"])
@middleware("auth")
def logout():
    user = Auth.user()
    
    # Log logout
    log_info(f"User {user.email} logged out", {"user_id": user.id})
    
    Auth.logout()
    
    return redirect("/").with("success", "Logout effettuato")

# Controller approach
class AuthController:
    """Controller-style login flow with rate limiting.

    NOTE(review): ``limiter``, ``send_lockout_response``,
    ``clear_login_attempts``, ``send_login_response``,
    ``increment_login_attempts`` and ``send_failed_login_response`` are called
    here but not defined in this class; presumably a mixin or base class
    provides them -- confirm.
    """

    def show_login(self):
        """Render the login form."""
        return render("auth.login")

    def login(self):
        """Authenticate the request, enforcing the login rate limit first."""
        # Rate limiting
        if self.has_too_many_login_attempts():
            return self.send_lockout_response()

        data = validate_request({
            "email": "required|email",
            "password": "required"
        })

        if Auth.attempt(data, request.filled("remember")):
            self.clear_login_attempts()
            return self.send_login_response()

        self.increment_login_attempts()
        return self.send_failed_login_response()

    def has_too_many_login_attempts(self):
        """Return whether the limiter reports too many attempts for this key."""
        return self.limiter().too_many_attempts(
            self.throttle_key(), 5, 1  # 5 attempts per minute
        )

    def throttle_key(self):
        """Build the rate-limit key from the normalized email and client IP.

        The email is trimmed and lower-cased so that case or whitespace
        variations of the same address share one throttle bucket instead of
        each getting a fresh attempt budget.
        """
        email = str(request.input("email") or "").strip().lower()
        return f"login.{email}.{request.ip}"

Registrazione

Sistema di registrazione con validazione:

# routes/auth.flux
@route("/register", methods=["GET", "POST"])
@middleware("guest")  # Solo per non autenticati
def register():
    if request.method == "GET":
        return render("auth.register")
    
    # Validazione registrazione
    data = validate_request({
        "name": "required|string|min:2|max:255",
        "email": "required|email|unique:users,email",
        "password": "required|min:8|confirmed",
        "terms": "required|accepted"
    })
    
    # Crea utente
    user = User.create({
        "name": data["name"],
        "email": data["email"],
        "password": data["password"],  # Auto-hashed dal mutator
        "email_verification_token": generate_token()
    })
    
    # Invia email verifica
    send_email_verification(user)
    
    # Login automatico (opzionale)
    Auth.login(user)
    
    # Log registrazione
    log_info(f"New user registered: {user.email}", {
        "user_id": user.id,
        "ip": request.ip
    })
    
    return redirect("/dashboard").with("success", 
        "Registrazione completata! Controlla la tua email per verificare l'account.")

# Email verification
@route("/email/verify/{id}/{hash}")
def verify_email(id, hash):
    """Mark the user's email as verified when the link hash matches.

    NOTE(review): the expected hash is ``sha1(user.email)`` with no secret, and
    the stored ``email_verification_token`` is never compared; anyone who knows
    the id and the email could presumably build a valid link -- confirm how
    ``send_email_verification`` builds the URL before tightening this.
    """
    user = User.find_or_fail(id)

    # Compare the hash from the URL with the one derived from the email.
    expected_hash = sha1(user.email)
    link_is_valid = hash_equals(hash, expected_hash)
    if not link_is_valid:
        return redirect("/").with("error", "Link non valido")

    # Already verified: nothing to do.
    if user.email_verified_at:
        return redirect("/dashboard").with("info", "Email già verificata")

    # Complete the verification and clear the stored token.
    verified_fields = {
        "email_verified_at": now(),
        "email_verification_token": None
    }
    user.update(verified_fields)

    return redirect("/dashboard").with("success", "Email verificata con successo!")

@route("/email/resend")
@middleware("auth")
def resend_verification():
    user = Auth.user()
    
    if user.email_verified_at:
        return redirect("/dashboard")
    
    # Throttling
    if user.verification_sent_at and user.verification_sent_at > date_sub(now(), minutes=1):
        return redirect()->back().with("error", "Email già inviata di recente")
    
    send_email_verification(user)
    user.update({"verification_sent_at": now()})
    
    return redirect()->back().with("success", "Email di verifica inviata")

Password Reset

Sistema completo per reset password:

# Password reset request
@route("/password/reset", methods=["GET", "POST"])
def forgot_password():
    if request.method == "GET":
        return render("auth.forgot_password")
    
    data = validate_request({
        "email": "required|email"
    })
    
    user = User.where("email", data["email"]).first()
    
    if user:
        # Genera token reset
        token = generate_token(60)
        
        # Salva in tabella password_resets
        DB.table("password_resets").insert({
            "email": user.email,
            "token": hash_password(token),
            "created_at": now()
        })
        
        # Invia email
        send_password_reset_email(user, token)
    
    # Sempre successo per sicurezza
    return redirect("/password/reset").with("success", 
        "Se l'email esiste, riceverai le istruzioni per il reset")

# Password reset form
@route("/password/reset/{token}", methods=["GET", "POST"])
def reset_password(token):
    """Show the reset form on GET; verify the token and set the password on POST."""
    if request.method == "GET":
        return render("auth.reset_password", {"token": token})

    data = validate_request({
        "email": "required|email",
        "password": "required|min:8|confirmed",
        "token": "required"
    })

    # Token lifetime comes from the auth configuration (60 minutes by default)
    # instead of a second hard-coded copy of the value.
    expire_minutes = config("auth.passwords.users.expire", 60)

    # Verify the token against the most recent unexpired record for the email.
    reset_record = DB.table("password_resets")\
                     .where("email", data["email"])\
                     .where("created_at", ">", date_sub(now(), minutes=expire_minutes))\
                     .order_by("created_at", "desc")\
                     .first()

    if not reset_record or not verify_password(data["token"], reset_record["token"]):
        return redirect("/password/reset").with("error", "Token non valido o scaduto")

    # Reset the password. The account may have been removed after the token was
    # issued; answer with the same generic error rather than crashing on None.
    user = User.where("email", data["email"]).first()
    if not user:
        return redirect("/password/reset").with("error", "Token non valido o scaduto")
    user.update({"password": data["password"]})

    # Remove the used token
    DB.table("password_resets").where("email", data["email"]).delete()

    # Automatic login
    Auth.login(user)

    return redirect("/dashboard").with("success", "Password aggiornata con successo!")

# Change password (utente loggato)
@route("/password/change", methods=["GET", "POST"])
@middleware("auth")
def change_password():
    if request.method == "GET":
        return render("auth.change_password")
    
    data = validate_request({
        "current_password": "required",
        "password": "required|min:8|confirmed"
    })
    
    user = Auth.user()
    
    # Verifica password corrente
    if not user.check_password(data["current_password"]):
        return redirect()->back().with_errors({
            "current_password": "Password corrente non corretta"
        })
    
    # Aggiorna password
    user.update({"password": data["password"]})
    
    # Log cambio password
    log_info(f"Password changed for user {user.email}", {"user_id": user.id})
    
    return redirect()->back().with("success", "Password aggiornata!")

JWT Authentication

Autenticazione stateless con JSON Web Tokens:

# JWT Service
from flux.auth.jwt import JWT

class JWTService:
    """Issue, validate and refresh JWTs using the ``jwt`` guard settings."""

    def __init__(self):
        """Load secret, algorithm and lifetime from configuration.

        Raises:
            ValueError: if no signing secret is configured.
        """
        self.secret = config("auth.guards.jwt.secret")
        self.algo = config("auth.guards.jwt.algo", "HS256")
        self.expire = config("auth.guards.jwt.expire", 3600)

        # Fail fast: signing with an empty secret would produce forgeable tokens.
        if not self.secret:
            raise ValueError("JWT secret is not configured (auth.guards.jwt.secret)")

    def generate_token(self, user):
        """Return a signed JWT for ``user``."""
        # Imported here because the snippet does not import them at file level.
        import time
        from uuid import uuid4

        issued_at = int(time.time())  # NumericDate as whole seconds

        payload = {
            # Custom claims
            "email": user.email,
            "name": user.name,
            **user.get_jwt_custom_claims(),

            # Registered claims come last so custom claims cannot override them.
            "iss": config("app.url"),  # Issuer
            "sub": user.get_auth_identifier(),  # Subject
            "iat": issued_at,  # Issued at
            "exp": issued_at + self.expire,  # Expiration
            "jti": str(uuid4()),  # JWT ID
        }

        return JWT.encode(payload, self.secret, self.algo)

    def validate_token(self, token):
        """Return the decoded payload, or ``None`` if invalid or expired."""
        import time

        try:
            payload = JWT.decode(token, self.secret, [self.algo])
        except JWT.InvalidTokenError:
            return None

        # A token without "exp" is rejected instead of raising KeyError.
        expires_at = payload.get("exp")
        if expires_at is None or expires_at < time.time():
            return None

        return payload

    def refresh_token(self, token):
        """Return a new token for the user of a still-valid ``token``, else ``None``."""
        payload = self.validate_token(token)
        if not payload:
            return None

        # Look up the user named by the subject claim
        user = User.find(payload["sub"])
        if not user:
            return None

        # Generate a new token
        return self.generate_token(user)

# API Authentication
@route("/api/login", methods=["POST"])
def api_login():
    """Authenticate via the ``api`` guard and answer with a bearer JWT."""
    credentials = validate_request({
        "email": "required|email",
        "password": "required"
    })

    authenticated = Auth.guard("api").attempt(credentials)
    if not authenticated:
        return json_response({"error": "Invalid credentials"}, 401)

    user = Auth.guard("api").user()
    token = JWTService().generate_token(user)

    body = {
        "user": user.to_dict(),
        "token": token,
        "token_type": "Bearer",
        "expires_in": config("auth.guards.jwt.expire")
    }
    return json_response(body)

@route("/api/me")
@middleware("auth:jwt")
def api_me():
    """Return the authenticated user's dictionary form as a JSON response."""
    current_user = Auth.user()
    return json_response(current_user.to_dict())

@route("/api/refresh", methods=["POST"])
def api_refresh():
    """Exchange the request's bearer token for a freshly issued one."""
    current_token = request.bearer_token()
    if not current_token:
        return json_response({"error": "Token required"}, 401)

    new_token = JWTService().refresh_token(current_token)
    if not new_token:
        return json_response({"error": "Invalid token"}, 401)

    body = {
        "token": new_token,
        "token_type": "Bearer",
        "expires_in": config("auth.guards.jwt.expire")
    }
    return json_response(body)

# JWT Middleware
class JWTMiddleware(Middleware):
    """Authenticate a request from its bearer JWT before passing it on."""

    def handle(self, request, next):
        """Validate the bearer token, attach the user and call ``next``.

        Answers with a 401 JSON error when the token is missing or invalid, or
        when the user named by its ``sub`` claim cannot be found.
        """
        bearer = request.bearer_token()
        if not bearer:
            return json_response({"error": "Token required"}, 401)

        payload = JWTService().validate_token(bearer)
        if not payload:
            return json_response({"error": "Invalid token"}, 401)

        # Load the user named by the subject claim
        subject = payload["sub"]
        user = User.find(subject)
        if not user:
            return json_response({"error": "User not found"}, 401)

        Auth.set_user(user)
        request.user = user

        return next(request)

OAuth Integration

Autenticazione con provider esterni (Google, Facebook, GitHub):

# config/oauth.flux
# OAuth provider settings keyed by provider name. Each entry carries the
# client credentials read from the environment, the callback URL built from
# APP_URL, and the scopes to request.
# NOTE(review): the redirect_uri concatenation assumes APP_URL is set and has
# no trailing slash -- confirm.
OAUTH_PROVIDERS = {
    "google": {
        "client_id": env("GOOGLE_CLIENT_ID"),
        "client_secret": env("GOOGLE_CLIENT_SECRET"),
        "redirect_uri": env("APP_URL") + "/auth/google/callback",
        "scopes": ["openid", "email", "profile"]
    },
    "facebook": {
        "client_id": env("FACEBOOK_CLIENT_ID"),
        "client_secret": env("FACEBOOK_CLIENT_SECRET"),
        "redirect_uri": env("APP_URL") + "/auth/facebook/callback",
        "scopes": ["email", "public_profile"]
    },
    "github": {
        "client_id": env("GITHUB_CLIENT_ID"),
        "client_secret": env("GITHUB_CLIENT_SECRET"),
        "redirect_uri": env("APP_URL") + "/auth/github/callback",
        "scopes": ["user:email"]
    }
}

# OAuth Controller
class OAuthController:
    """OAuth login flow: redirect to the provider, then handle its callback."""

    @route("/auth/{provider}")
    def redirect_to_provider(self, provider):
        """Redirect the browser to the provider's authorization URL."""
        if provider not in OAUTH_PROVIDERS:
            return redirect("/login").with("error", "Provider non supportato")

        oauth = self.get_oauth_client(provider)
        auth_url = oauth.get_authorization_url()

        # Remember the state so the callback can verify it
        session_set("oauth_state", oauth.state)

        return redirect(auth_url)

    @route("/auth/{provider}/callback")
    def handle_provider_callback(self, provider):
        """Handle the OAuth callback: verify state, fetch the user, log in."""
        import hmac

        if provider not in OAUTH_PROVIDERS:
            return redirect("/login").with("error", "Provider non supportato")

        # Verify state. Both values must be present: comparing two missing
        # values would otherwise succeed and skip the CSRF check. The stored
        # state is single-use, so it is cleared before the comparison.
        expected_state = session_get("oauth_state")
        received_state = request.input("state")
        session_forget("oauth_state")

        state_matches = bool(expected_state) and bool(received_state) and \
            hmac.compare_digest(
                str(received_state).encode("utf-8"),
                str(expected_state).encode("utf-8"),
            )
        if not state_matches:
            return redirect("/login").with("error", "Richiesta non valida")

        oauth = self.get_oauth_client(provider)

        try:
            # Exchange the authorization code for an access token
            token = oauth.get_access_token(request.input("code"))

            # Fetch the user's data
            user_data = oauth.get_user_data(token)

            # Find or create the local user
            user = self.find_or_create_user(provider, user_data)

            # Login
            Auth.login(user)

            return redirect("/dashboard").with("success", 
                f"Login con {provider.title()} completato!")

        except OAuthException as e:
            log_error(f"OAuth error for {provider}: {str(e)}")
            return redirect("/login").with("error", "Errore durante l'autenticazione")

    def get_oauth_client(self, provider):
        """Return the OAuth client for ``provider``.

        Raises ``KeyError`` if the provider has no entry in ``OAUTH_PROVIDERS``;
        returns ``None`` for a configured provider without a client class here.
        """
        # Named "settings" so the local does not shadow the config() helper.
        settings = OAUTH_PROVIDERS[provider]

        if provider == "google":
            return GoogleOAuthClient(settings)
        elif provider == "facebook":
            return FacebookOAuthClient(settings)
        elif provider == "github":
            return GitHubOAuthClient(settings)

    def find_or_create_user(self, provider, user_data):
        """Return the local user for the OAuth identity, creating it if needed.

        Lookup order: existing social account for (provider, provider_id), then
        an existing user with the same email, otherwise a new user.

        NOTE(review): linking by email and stamping ``email_verified_at``
        assume the provider has verified the address -- confirm per provider.
        """
        # Look up by provider id
        social_account = SocialAccount.where("provider", provider)\
                                     .where("provider_id", user_data["id"])\
                                     .first()

        if social_account:
            return social_account.user

        # Look up by email
        user = User.where("email", user_data["email"]).first()

        if not user:
            # Create a new user
            user = User.create({
                "name": user_data["name"],
                "email": user_data["email"],
                "email_verified_at": now(),  # OAuth email is treated as verified
                "avatar": user_data.get("avatar")
            })

        # Create the social link
        user.social_accounts().create({
            "provider": provider,
            "provider_id": user_data["id"],
            "provider_token": user_data["token"],
            "provider_refresh_token": user_data.get("refresh_token")
        })

        return user

# Social Account Model
class SocialAccount(Model):
    """Link between a local user and an external OAuth identity."""

    fillable = [
        "provider",
        "provider_id",
        "provider_token",
        "provider_refresh_token",
    ]

    def user(self):
        """Return the ``belongs_to`` relation to ``User``."""
        owner = self.belongs_to("User")
        return owner

Two-Factor Authentication

Autenticazione a due fattori con TOTP:

# Two-Factor Service
import pyotp
import qrcode
from io import BytesIO

class TwoFactorService:
    """TOTP two-factor helpers: secret, QR code, code and recovery-code checks."""

    def generate_secret(self, user):
        """Create a new TOTP secret, store it encrypted and return it in clear.

        Also resets ``two_factor_confirmed_at`` to ``None``.
        """
        secret = pyotp.random_base32()

        user.update({
            "two_factor_secret": encrypt(secret),
            "two_factor_confirmed_at": None
        })

        return secret

    def get_qr_code(self, user):
        """Return a ``data:image/png;base64,...`` URI with the provisioning QR code."""
        # Imported here because the snippet does not import base64 at file level.
        import base64

        secret = decrypt(user.two_factor_secret)

        totp = pyotp.TOTP(secret)
        provisioning_uri = totp.provisioning_uri(
            name=user.email,
            issuer_name=config("app.name")
        )

        # Build the QR code
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)

        img = qr.make_image(fill_color="black", back_color="white")

        # Encode the PNG as base64
        buffer = BytesIO()
        img.save(buffer, format="PNG")
        img_str = base64.b64encode(buffer.getvalue()).decode()

        return f"data:image/png;base64,{img_str}"

    def verify_code(self, user, code):
        """Return whether ``code`` is a valid TOTP code for ``user``.

        Returns ``False`` when the user has no secret.
        """
        if not user.two_factor_secret:
            return False

        secret = decrypt(user.two_factor_secret)
        totp = pyotp.TOTP(secret)

        # valid_window=1 also accepts the adjacent time steps (clock drift).
        return totp.verify(code, valid_window=1)

    def generate_recovery_codes(self, user):
        """Generate 8 recovery codes, store their hashes and return the codes."""
        # Imported here because the snippet does not import json at file level.
        import json

        codes = [generate_token(8) for _ in range(8)]

        # Only the hashes are stored
        hashed_codes = [hash_password(code) for code in codes]
        user.update({"two_factor_recovery_codes": json.dumps(hashed_codes)})

        return codes

    def verify_recovery_code(self, user, code):
        """Check ``code`` against the stored hashes and consume it on success.

        Returns ``False`` when the user has no recovery codes or none match.
        """
        # Imported here because the snippet does not import json at file level.
        import json

        if not user.two_factor_recovery_codes:
            return False

        hashed_codes = json.loads(user.two_factor_recovery_codes)

        for index, hashed_code in enumerate(hashed_codes):
            if verify_password(code, hashed_code):
                # Remove the used code so it cannot be replayed
                remaining = hashed_codes[:index] + hashed_codes[index + 1:]
                user.update({"two_factor_recovery_codes": json.dumps(remaining)})
                return True

        return False

# 2FA Setup
@route("/2fa/setup", methods=["GET", "POST"])
@middleware("auth")
def setup_2fa():
    user = Auth.user()
    
    if request.method == "GET":
        if user.two_factor_confirmed_at:
            return redirect("/2fa/manage")
        
        # Genera secret se non esiste
        if not user.two_factor_secret:
            TwoFactorService().generate_secret(user)
        
        qr_code = TwoFactorService().get_qr_code(user)
        
        return render("auth.2fa_setup", {"qr_code": qr_code})
    
    # Conferma setup
    data = validate_request({"code": "required|digits:6"})
    
    if not TwoFactorService().verify_code(user, data["code"]):
        return redirect()->back().with("error", "Codice non valido")
    
    # Conferma 2FA
    user.update({"two_factor_confirmed_at": now()})
    
    # Genera recovery codes
    recovery_codes = TwoFactorService().generate_recovery_codes(user)
    
    return render("auth.2fa_recovery_codes", {"codes": recovery_codes})

# 2FA Challenge
class TwoFactorMiddleware(Middleware):
    """Send users with confirmed 2FA to the challenge until verified this session."""

    def handle(self, request, next):
        """Pass the request on, or redirect to ``/2fa/challenge`` when 2FA is pending."""
        user = Auth.user()

        # Only users with confirmed 2FA can be challenged.
        if user and user.two_factor_confirmed_at:
            # Pending when the session marker does not hold this user's id.
            if session_get("2fa_verified") != user.id:
                return redirect("/2fa/challenge")

        return next(request)

@route("/2fa/challenge", methods=["GET", "POST"])
@middleware("auth")
def two_factor_challenge():
    user = Auth.user()
    
    if request.method == "GET":
        return render("auth.2fa_challenge")
    
    data = validate_request({
        "code": "required",
        "recovery": "boolean"
    })
    
    if data.get("recovery"):
        # Usa recovery code
        if not TwoFactorService().verify_recovery_code(user, data["code"]):
            return redirect()->back().with("error", "Codice di recovery non valido")
    else:
        # Usa TOTP
        if not TwoFactorService().verify_code(user, data["code"]):
            return redirect()->back().with("error", "Codice non valido")
    
    # Marca come verificato
    session_set("2fa_verified", user.id)
    
    return redirect().intended("/dashboard")

# Disable 2FA
@route("/2fa/disable", methods=["POST"])
@middleware("auth")
def disable_2fa():
    data = validate_request({
        "password": "required",
        "code": "required"
    })
    
    user = Auth.user()
    
    # Verifica password
    if not user.check_password(data["password"]):
        return redirect()->back().with("error", "Password non corretta")
    
    # Verifica codice 2FA
    if not TwoFactorService().verify_code(user, data["code"]):
        return redirect()->back().with("error", "Codice 2FA non valido")
    
    # Disabilita 2FA
    user.update({
        "two_factor_secret": None,
        "two_factor_confirmed_at": None,
        "two_factor_recovery_codes": None
    })
    
    # Rimuovi verifica sessione
    session_forget("2fa_verified")
    
    return redirect("/profile").with("success", "2FA disabilitato")

Role-Based Access Control

Sistema di ruoli e permessi granulari:

# Models per RBAC
class Role(Model):
    """Role with many-to-many links to users and permissions."""

    fillable = ["name", "display_name", "description"]

    def users(self):
        """Return the many-to-many relation to ``User`` via ``user_roles``."""
        relation = self.belongs_to_many("User", "user_roles")
        return relation

    def permissions(self):
        """Return the many-to-many relation to ``Permission`` via ``role_permissions``."""
        relation = self.belongs_to_many("Permission", "role_permissions")
        return relation

    def give_permission_to(self, permission):
        """Attach a permission (object or name) unless missing or already held."""
        if isinstance(permission, str):
            permission = Permission.where("name", permission).first()

        # Nothing to do for an unknown permission or one the role already has.
        if not permission or self.has_permission(permission):
            return

        self.permissions().attach(permission.id)

    def revoke_permission_to(self, permission):
        """Detach a permission (object or name) from the role if it resolves."""
        if isinstance(permission, str):
            permission = Permission.where("name", permission).first()

        if not permission:
            return

        self.permissions().detach(permission.id)

    def has_permission(self, permission):
        """Return whether the role has the permission, given by name or object."""
        if isinstance(permission, str):
            column, value = "name", permission
        else:
            column, value = "id", permission.id

        return self.permissions().where(column, value).exists()

class Permission(Model):
    """Permission that can be granted to roles."""

    fillable = [
        "name",
        "display_name",
        "description",
        "category",
    ]

    def roles(self):
        """Return the many-to-many relation to ``Role`` via ``role_permissions``."""
        relation = self.belongs_to_many("Role", "role_permissions")
        return relation

# Estendi User Model
class User(Model, Authenticatable):
    """User model methods for role-based access control."""

    def roles(self):
        """Return the many-to-many relation to ``Role`` via ``user_roles``."""
        return self.belongs_to_many("Role", "user_roles")

    def permissions(self):
        """Return direct permissions merged with role permissions, unique by id."""
        # Direct permissions
        direct = self.belongs_to_many("Permission", "user_permissions")

        # Permissions granted through the user's roles
        role_permissions = Permission.where_in("id",
            DB.table("role_permissions")
              .where_in("role_id", self.roles().pluck("id"))
              .pluck("permission_id")
        )

        # Merge both sets
        all_permissions = direct.get().merge(role_permissions.get())
        return all_permissions.unique("id")

    def assign_role(self, role):
        """Attach a role (object or name) unless missing or already assigned."""
        if isinstance(role, str):
            role = Role.where("name", role).first()

        if role and not self.has_role(role):
            self.roles().attach(role.id)

    def remove_role(self, role):
        """Detach a role (object or name) from the user if it resolves."""
        if isinstance(role, str):
            role = Role.where("name", role).first()

        if role:
            self.roles().detach(role.id)

    def has_role(self, role):
        """Return whether the user has the role, given by name or object."""
        if isinstance(role, str):
            return self.roles().where("name", role).exists()

        return self.roles().where("id", role.id).exists()

    def has_permission(self, permission):
        """Return whether the user has the permission directly or through a role."""
        if isinstance(permission, str):
            permission_name = permission
        else:
            permission_name = permission.name

        # Check direct permissions
        direct = self.belongs_to_many("Permission", "user_permissions")
        if direct.where("name", permission_name).exists():
            return True

        # Check role permissions. The relation is materialized with .get(), as
        # elsewhere in the file, instead of iterating the relation builder.
        return any(
            role.has_permission(permission_name)
            for role in self.roles().get()
        )

    def can(self, permission):
        """Alias for ``has_permission``."""
        return self.has_permission(permission)

    def cannot(self, permission):
        """Return the negation of ``can``."""
        return not self.can(permission)

    def is_admin(self):
        """Return whether the user has the ``admin`` role."""
        return self.has_role("admin")

# Authorization Middleware
class RoleMiddleware(Middleware):
    """Let the request through only when the current user has the given role."""

    def handle(self, request, next, role):
        """Call ``next`` for users with ``role``; otherwise deny with 403 or a redirect."""
        user = Auth.user()

        if user and user.has_role(role):
            return next(request)

        # Denied: JSON clients get a 403 body, browsers a redirect with a flash.
        if request.wants_json():
            return json_response({"error": "Insufficient permissions"}, 403)

        return redirect("/").with("error", "Accesso negato")

class PermissionMiddleware(Middleware):
    """Let the request through only when the current user has the permission."""

    def handle(self, request, next, permission):
        """Call ``next`` for users who ``can(permission)``; otherwise deny."""
        user = Auth.user()

        if user and user.can(permission):
            return next(request)

        # Denied: JSON clients get a 403 body, browsers a redirect with a flash.
        if request.wants_json():
            return json_response({"error": "Insufficient permissions"}, 403)

        return redirect("/").with("error", "Permesso negato")

# Uso nei controller
@route("/admin/users")
@middleware("auth", "role:admin")
def admin_users():
    """Render the admin users view (behind the ``auth`` and ``role:admin`` middleware)."""
    view_name = "admin.users"
    return render(view_name)

@route("/posts/create")
@middleware("auth", "permission:create_posts")
def create_post():
    """Render the post creation view (behind ``auth`` and ``permission:create_posts``)."""
    view_name = "posts.create"
    return render(view_name)

# Gate system
class Gate:
    """Registry of named authorization callbacks ("gates")."""

    # Ability name -> callback(user, *args); shared by every caller.
    _gates = {}

    @staticmethod
    def define(ability, callback):
        """Register ``callback`` as the gate for ``ability`` (replaces any previous one)."""
        Gate._gates[ability] = callback

    @staticmethod
    def allows(ability, *args):
        """Return the gate's verdict for the current user.

        Returns ``False`` when no gate is registered for ``ability`` or when
        there is no authenticated user, so callbacks are never invoked with
        ``None`` (the example gates read ``user.id`` and would crash).
        """
        callback = Gate._gates.get(ability)
        if callback is None:
            return False

        user = Auth.user()
        if user is None:
            return False

        return callback(user, *args)

    @staticmethod
    def denies(ability, *args):
        """Return the negation of ``allows``."""
        return not Gate.allows(ability, *args)

# Define gates
# update_post: allowed when the user owns the post (user.id == post.user_id)
# or is an admin.
Gate.define("update_post", lambda user, post: user.id == post.user_id or user.is_admin())
# delete_user: allowed only for admins, and never on their own account.
Gate.define("delete_user", lambda user, target: user.is_admin() and user.id != target.id)

# Using gates
# NOTE(review): `post` is not defined in this snippet; presumably supplied by
# the surrounding code.
if Gate.allows("update_post", post):
    # The user may edit the post
    pass

# Helper template
def can(ability, *args):
    """Template helper: delegate to ``Gate.allows`` with the same arguments."""
    verdict = Gate.allows(ability, *args)
    return verdict

# Nel template
# {% if can("update_post", post) %}
#     <a href="{{ route('posts.edit', post.id) }}">Modifica</a>
# {% endif %}

Session Management

Gestione avanzata delle sessioni utente:

# Session tracking
class UserSession(Model):
    """Tracked login session of a user."""

    fillable = [
        "user_id",
        "ip_address",
        "user_agent",
        "last_activity",
        "is_current",
    ]

    # Attribute name -> cast type.
    casts = {
        "last_activity": "datetime",
        "is_current": "boolean"
    }

    def user(self):
        """Return the ``belongs_to`` relation to ``User``."""
        owner = self.belongs_to("User")
        return owner

    def is_active(self):
        """Return whether the last activity is within the past 5 minutes."""
        last_seen = self.last_activity
        cutoff = date_sub(now(), minutes=5)
        return last_seen > cutoff

# Middleware per tracking sessioni
class SessionTrackingMiddleware(Middleware):
    """Record the authenticated user's session activity on each request."""

    def handle(self, request, next):
        """Track the session when a user is authenticated, then call ``next``."""
        if Auth.check():
            self.track_user_session(request)

        return next(request)

    def track_user_session(self, request):
        """Upsert the current session row, demote the others and prune old rows."""
        user = Auth.user()
        # Must not be named ``session_id``: assigning that name would make it a
        # local variable and the call to session_id() would raise
        # UnboundLocalError.
        current_session_id = session_id()

        # Update or create the current session
        UserSession.update_or_create(
            {"id": current_session_id},
            {
                "user_id": user.id,
                "ip_address": request.ip,
                "user_agent": request.header("User-Agent"),
                "last_activity": now(),
                "is_current": True
            }
        )

        # Mark the user's other sessions as not current
        UserSession.where("user_id", user.id)\
                   .where("id", "!=", current_session_id)\
                   .update({"is_current": False})

        # Prune sessions with no activity in the last 30 days.
        # NOTE(review): this runs on every tracked request; a scheduled job may
        # be a better place for it.
        UserSession.where("last_activity", "<", date_sub(now(), days=30)).delete()

# Gestione sessioni multiple
@route("/account/sessions")
@middleware("auth")
def active_sessions():
    """Render the current user's sessions, most recent activity first."""
    current_user = Auth.user()
    query = current_user.sessions().order_by("last_activity", "desc")
    sessions = query.get()

    context = {"sessions": sessions}
    return render("account.sessions", context)

@route("/account/sessions/{session_id}/revoke", methods=["POST"])
@middleware("auth")
def revoke_session(session_id):
    user = Auth.user()
    session = user.sessions().where("id", session_id).first()
    
    if not session:
        return redirect()->back().with("error", "Sessione non trovata")
    
    # Non permettere revoca sessione corrente
    if session.is_current:
        return redirect()->back().with("error", "Non puoi revocare la sessione corrente")
    
    session.delete()
    
    return redirect()->back().with("success", "Sessione revocata")

@route("/account/sessions/revoke-all", methods=["POST"])
@middleware("auth")
def revoke_all_sessions():
    user = Auth.user()
    current_session = session_id()
    
    # Revoca tutte le altre sessioni
    user.sessions().where("id", "!=", current_session).delete()
    
    return redirect()->back().with("success", "Tutte le altre sessioni sono state revocate")

Best Practices

🔐 Sicurezza

  • Hash password con algoritmi sicuri
  • Implementa rate limiting
  • Usa HTTPS in produzione
  • Valida e sanitizza input

🎯 UX

  • Remember me per comodità
  • Reset password semplice
  • Feedback chiari su errori
  • OAuth per registrazione veloce

📊 Monitoring

  • Log tentativi di login
  • Monitora sessioni attive
  • Notifica login sospetti
  • Audit trail per admin
🛡️ Eccellente! Ora hai un sistema di autenticazione completo e sicuro. Scopri come ottimizzare le performance con il Sistema di Cache per applicazioni scalabili.