Definire Models

ORM potente e intuitivo per interagire con il database in modo elegante

ActiveRecord Pattern: I Models Flux seguono il pattern ActiveRecord, combinando dati e logica di business in un'unica classe elegante e potente.

Model Base

Ogni Model estende la classe Model e rappresenta una tabella del database:

# app/models/User.flux
from flux.db import Model

class User(Model):
    # Nome tabella (opzionale, default: plurale minuscolo)
    table = "users"
    
    # Primary key (default: "id")
    primary_key = "id"
    
    # Campi fillable (mass assignment)
    fillable = ["name", "email", "password", "role"]
    
    # Campi protetti (non fillable)
    guarded = ["id", "created_at", "updated_at"]
    
    # Campi nascosti (non serializzati in JSON)
    hidden = ["password", "remember_token"]
    
    # Cast automatici
    casts = {
        "is_active": "boolean",
        "settings": "json",
        "last_login": "datetime"
    }
    
    # Timestamps automatici (default: true)
    timestamps = True
    
    # Soft deletes
    soft_deletes = True

Convenzioni di Naming

Flux segue convenzioni intuitive per mappare Models e tabelle:

# Convenzioni automatiche
User       β†’ users         # Plurale minuscolo
BlogPost   β†’ blog_posts    # Snake_case
Category   β†’ categories    # Plurale irregolare gestito

# Override manuale
class Product(Model):
    table = "inventory_products"  # Nome custom
    
class Person(Model):
    table = "people"              # Plurale irregolare

Attributi e ProprietΓ 

Fillable e Guarded

class User(Model):
                    # Whitelist: solo questi campi possono essere mass-assigned
                    fillable = ["name", "email", "bio", "avatar"]
                    
                    # Blacklist: questi campi sono sempre protetti
                    guarded = ["id", "role", "is_admin"]
                    
# Uso
user = User.create({
    "name": "Mario Rossi",
    "email": "mario@test.com",
    "role": "admin",        # Ignorato (guarded)
    "is_admin": True        # Ignorato (guarded)
})

# Solo name e email vengono salvati

Cast dei Tipi

class User(Model):
    casts = {
        "is_active": "boolean",      # 0/1 β†’ True/False
        "settings": "json",          # JSON string β†’ dict
        "tags": "array",             # CSV β†’ list
        "birth_date": "date",        # string β†’ date object
        "last_login": "datetime",    # string β†’ datetime object
        "salary": "decimal",         # string β†’ Decimal
        "score": "float",            # string β†’ float
        "count": "integer"           # string β†’ int
    }

# Uso automatico
user = User.find(1)
print(user.is_active)        # True (non "1")
print(user.settings["theme"]) # Accesso diretto al dict
print(user.birth_date.year)  # Metodi datetime disponibili

Attributi Nascosti

class User(Model):
    hidden = ["password", "remember_token", "api_key"]
    
    # Rendi visibile temporaneamente
    visible = ["password"]  # Override per questo model

# Serializzazione
user = User.find(1)
print(user.to_json())    # password non inclusa
print(user.to_dict())    # password non inclusa

# Mostra campi nascosti
user.make_visible(["password"])
print(user.to_dict())    # password inclusa

CRUD Operations

Create

# Nuovo record
user = User()
user.name = "Mario Rossi"
user.email = "mario@test.com"
user.save()

# Create con dati
user = User.create({
    "name": "Luigi Verdi",
    "email": "luigi@test.com",
    "password": hash_password("secret")
})

# Create o update
user = User.create_or_update(
    {"email": "mario@test.com"},  # Condizione
    {"name": "Mario Rossi Updated", "last_login": now()}  # Dati
)

# Bulk insert
User.insert([
    {"name": "User 1", "email": "user1@test.com"},
    {"name": "User 2", "email": "user2@test.com"},
    {"name": "User 3", "email": "user3@test.com"}
])

# Insert ignore (salta duplicati)
User.insert_or_ignore([
    {"email": "existing@test.com", "name": "Test"}
])

Read

# Find per primary key
user = User.find(1)
user = User.find([1, 2, 3])  # Multipli

# Find con fallback
user = User.find_or_fail(1)      # Exception se non trovato
user = User.find_or_new(1)       # Nuovo model se non trovato
user = User.find_or_create(1, {"name": "Default"})

# First
user = User.first()                    # Primo record
user = User.where("active", True).first()
user = User.first_or_fail()           # Exception se vuoto
user = User.first_or_create({"email": "test@test.com"})

# Get all
users = User.all()
users = User.where("active", True).get()

# Pagination
users = User.paginate(page=1, per_page=10)
users = User.where("role", "admin").paginate(15)

# Count
count = User.count()
count = User.where("active", True).count()

# Exists
exists = User.where("email", "test@test.com").exists()

# Aggregations
avg_age = User.avg("age")
max_score = User.max("score")
min_date = User.min("created_at")
sum_points = User.sum("points")

Update

# Update singolo
user = User.find(1)
user.name = "Nome Aggiornato"
user.save()

# Update con fill
user = User.find(1)
user.fill({"name": "Nuovo Nome", "email": "nuovo@email.com"})
user.save()

# Update mass
User.where("active", False).update({"status": "inactive"})

# Update o create
user = User.update_or_create(
    {"email": "test@test.com"},  # Condizione
    {"name": "Test User", "active": True}  # Dati
)

# Increment/Decrement
User.where("id", 1).increment("login_count")
User.where("id", 1).decrement("attempts", 1)
User.where("role", "premium").increment("credits", 100)

# Touch (aggiorna timestamps)
user.touch()
User.where("active", True).touch()

Delete

# Delete singolo
user = User.find(1)
user.delete()

# Delete per primary key
User.destroy(1)
User.destroy([1, 2, 3])

# Delete con condizioni
User.where("active", False).delete()

# Soft delete (se abilitato)
user.delete()        # Soft delete
user.force_delete()  # Hard delete

# Restore soft deleted
user.restore()
User.where("deleted_at", "!=", None).restore()

# Solo soft deleted
User.only_trashed().get()
User.with_trashed().get()    # Include soft deleted

Query Builder

Costruisci query complesse con una sintassi fluida:

# Where clauses
User.where("name", "Mario").get()
User.where("age", ">", 18).get()
User.where("email", "like", "%@gmail.com").get()
User.where("active", True).where("role", "admin").get()

# Or where
User.where("role", "admin").or_where("role", "moderator").get()

# Where in
User.where_in("id", [1, 2, 3]).get()
User.where_not_in("status", ["banned", "inactive"]).get()

# Where null
User.where_null("deleted_at").get()
User.where_not_null("email_verified_at").get()

# Where between
User.where_between("age", 18, 65).get()
User.where_not_between("score", 0, 50).get()

# Where date
User.where_date("created_at", "2024-01-15").get()
User.where_month("created_at", 12).get()
User.where_year("created_at", 2024).get()

# Complex where
User.where(lambda q: 
    q.where("age", ">", 18).or_where("verified", True)
).where("active", True).get()

# Order by
User.order_by("name").get()
User.order_by("created_at", "desc").get()
User.order_by("name").order_by("email", "desc").get()

# Group by & Having
User.select("role", "count(*) as total")\
    .group_by("role")\
    .having("total", ">", 5)\
    .get()

# Limit & Offset
User.limit(10).get()
User.offset(20).limit(10).get()
User.skip(20).take(10).get()  # Alias

# Select specific columns
User.select("name", "email").get()
User.select(["name", "email", "created_at"]).get()

# Distinct
User.select("role").distinct().get()

# Join (vedi sezione Relazioni per esempi avanzati)
User.join("posts", "users.id", "=", "posts.user_id")\
    .select("users.*", "posts.title")\
    .get()

Scope e Query Locali

Definisci query riutilizzabili come metodi del Model:

class User(Model):
    # Scope globali (sempre applicati)
    @staticmethod
    def global_scope(query):
        return query.where("deleted_at", None)
    
    # Scope locali
    def scope_active(self, query):
        return query.where("active", True)
    
    def scope_role(self, query, role):
        return query.where("role", role)
    
    def scope_recent(self, query, days=7):
        return query.where("created_at", ">=", 
                          date_sub(now(), days=days))
    
    def scope_with_posts(self, query):
        return query.has("posts")
    
    def scope_search(self, query, term):
        return query.where("name", "like", f"%{term}%")\
                   .or_where("email", "like", f"%{term}%")

# Uso degli scope
active_users = User.active().get()
admins = User.role("admin").get()
recent_users = User.recent(30).get()
search_results = User.search("mario").get()

# Combinazione scope
recent_admins = User.active().role("admin").recent(7).get()

# Scope dinamici
class Post(Model):
    def scope_status(self, query, status):
        return query.where("status", status)
    
    def scope_author(self, query, author_id):
        return query.where("author_id", author_id)

# Uso
published_posts = Post.status("published").get()
my_drafts = Post.author(current_user.id).status("draft").get()

Accessors e Mutators

Modifica automaticamente i dati quando vengono letti o scritti:

class User(Model):
    # Accessor - modifica quando leggi
    def get_name_attribute(self, value):
        return str_title(value)  # Sempre title case
    
    def get_full_name_attribute(self):
        return f"{self.first_name} {self.last_name}"
    
    def get_avatar_url_attribute(self):
        if self.avatar:
            return asset(f"avatars/{self.avatar}")
        return asset("images/default-avatar.png")
    
    def get_is_admin_attribute(self):
        return self.role == "admin"
    
    # Mutator - modifica quando scrivi
    def set_password_attribute(self, value):
        self.attributes["password"] = hash_password(value)
    
    def set_email_attribute(self, value):
        self.attributes["email"] = str_lower(str_trim(value))
    
    def set_name_attribute(self, value):
        self.attributes["name"] = str_title(str_trim(value))

# Uso automatico
user = User.find(1)
print(user.name)         # Sempre title case (accessor)
print(user.full_name)    # Computed attribute
print(user.avatar_url)   # URL completo

user.password = "plaintext"  # Automaticamente hashato
user.email = "  TEST@EXAMPLE.COM  "  # Pulito e lowercase
user.save()

Eventi del Model

Hook per eseguire codice durante il ciclo di vita del Model:

class User(Model):
    # Eventi disponibili
    def boot(self):
        # Inizializzazione del model
        pass
    
    def creating(self):
        # Prima della creazione
        self.uuid = uuid()
        self.created_by = current_user().id if current_user() else None
    
    def created(self):
        # Dopo la creazione
        log_info(f"Nuovo utente creato: {self.email}")
        send_welcome_email(self)
    
    def updating(self):
        # Prima dell'aggiornamento
        if self.is_dirty("email"):
            self.email_verified_at = None
    
    def updated(self):
        # Dopo l'aggiornamento
        if self.was_changed("role"):
            log_info(f"Ruolo cambiato per {self.email}: {self.role}")
    
    def saving(self):
        # Prima di save (create o update)
        self.updated_by = current_user().id if current_user() else None
    
    def saved(self):
        # Dopo save (create o update)
        cache_forget(f"user:{self.id}")
    
    def deleting(self):
        # Prima della cancellazione
        if self.posts().count() > 0:
            raise Exception("Cannot delete user with posts")
    
    def deleted(self):
        # Dopo la cancellazione
        log_info(f"Utente eliminato: {self.email}")

# Observer pattern (alternativo)
class UserObserver:
    def creating(self, user):
        user.uuid = uuid()
    
    def created(self, user):
        send_welcome_email(user)

# Registra observer
User.observe(UserObserver)

Validazione

Validazione automatica dei dati prima del salvataggio:

class User(Model):
    # Regole di validazione
    rules = {
        "name": "required|string|min:2|max:100",
        "email": "required|email|unique:users,email",
        "password": "required|min:8|confirmed",
        "age": "nullable|integer|min:13|max:120",
        "role": "required|in:user,admin,moderator"
    }
    
    # Messaggi custom
    messages = {
        "email.unique": "Questa email Γ¨ giΓ  registrata",
        "password.min": "La password deve essere di almeno 8 caratteri",
        "name.required": "Il nome Γ¨ obbligatorio"
    }
    
    # Validazione condizionale
    def get_rules(self):
        rules = self.rules.copy()
        
        # Skip unique check su update
        if self.exists:
            rules["email"] = "required|email|unique:users,email," + str(self.id)
        
        # Password opzionale su update
        if self.exists and not self.password:
            rules.pop("password", None)
            
        return rules
    
    # Validazione custom
    def validate_custom(self):
        errors = []
        
        if self.role == "admin" and self.age < 18:
            errors.append("Gli admin devono essere maggiorenni")
        
        if self.email.endswith("@temp.com"):
            errors.append("Email temporanee non ammesse")
            
        return errors

# Uso automatico
try:
    user = User.create({
        "name": "Mario",
        "email": "invalid-email",  # Errore
        "password": "123"          # Troppo corta
    })
except ValidationException as e:
    print(e.errors)  # Dict con errori per campo

# Validazione manuale
user = User()
user.name = "Test"
user.email = "test@test.com"

if user.is_valid():
    user.save()
else:
    print(user.errors)

Model Factory

Genera dati fake per testing e seeding:

# database/factories/UserFactory.flux
from flux.factory import Factory
from faker import Faker

class UserFactory(Factory):
    model = User
    
    def definition(self):
        fake = Faker("it_IT")
        
        return {
            "name": fake.name(),
            "email": fake.unique().email(),
            "password": hash_password("password"),
            "role": fake.random_element(["user", "admin", "moderator"]),
            "active": fake.boolean(80),  # 80% true
            "bio": fake.text(200),
            "birth_date": fake.date_between("-80y", "-18y"),
            "created_at": fake.date_time_between("-2y", "now")
        }
    
    # State methods
    def admin(self):
        return self.state({"role": "admin", "active": True})
    
    def inactive(self):
        return self.state({"active": False})
    
    def with_posts(self, count=5):
        return self.has("posts", count)

# Uso delle factory
# Un utente
user = UserFactory.create()

# Multipli utenti
users = UserFactory.create_batch(10)

# Con state
admin = UserFactory.admin().create()
inactive_users = UserFactory.inactive().create_batch(5)

# Con relazioni
user_with_posts = UserFactory.with_posts(3).create()

# Make (non salva nel DB)
user_data = UserFactory.make()
users_data = UserFactory.make_batch(10)

# Override attributi
user = UserFactory.create({"name": "Nome Specifico"})

# Seeders
class DatabaseSeeder:
    def run(self):
        # Crea admin
        admin = UserFactory.admin().create({
            "email": "admin@example.com"
        })
        
        # Crea utenti normali
        UserFactory.create_batch(50)
        
        # Crea utenti con posts
        UserFactory.with_posts(3).create_batch(10)

Serializzazione

Converti Models in array, JSON o XML:

class User(Model):
    # Campi sempre nascosti
    hidden = ["password", "remember_token"]
    
    # Append computed attributes
    appends = ["full_name", "avatar_url", "is_online"]
    
    # Serializzazione custom
    def to_array(self):
        data = super().to_array()
        
        # Aggiungi campi custom
        data["posts_count"] = self.posts().count()
        data["last_seen"] = date_human(self.last_login)
        
        return data
    
    def to_json_response(self):
        return {
            "id": self.id,
            "name": self.name,
            "email": self.email,
            "role": self.role,
            "avatar": self.avatar_url,
            "stats": {
                "posts": self.posts().count(),
                "followers": self.followers().count()
            }
        }

# Uso
user = User.find(1)

# Array
print(user.to_array())

# JSON
print(user.to_json())
json_response(user.to_json_response())

# Collection
users = User.all()
print(users.to_array())  # Array di array
print(users.to_json())   # JSON di array

# Con relazioni
user_with_posts = User.with("posts").find(1)
print(user_with_posts.to_json())  # Include posts

# Nascondere campi temporaneamente
user.make_hidden(["email"]).to_json()
user.make_visible(["password"]).to_json()

Esempio Completo: Blog System

# app/models/User.flux
class User(Model):
    table = "users"
    
    fillable = ["name", "email", "bio", "avatar"]
    hidden = ["password", "remember_token"]
    
    casts = {
        "email_verified_at": "datetime",
        "settings": "json",
        "is_active": "boolean"
    }
    
    # Validazione
    rules = {
        "name": "required|string|min:2|max:100",
        "email": "required|email|unique:users,email",
        "password": "required|min:8"
    }
    
    # Relazioni
    def posts(self):
        return self.has_many("Post")
    
    def comments(self):
        return self.has_many("Comment")
    
    def followers(self):
        return self.belongs_to_many("User", "followers", 
                                   "user_id", "follower_id")
    
    # Scope
    def scope_active(self, query):
        return query.where("is_active", True)
    
    def scope_authors(self, query):
        return query.has("posts")
    
    # Accessors
    def get_full_name_attribute(self):
        return f"{self.first_name} {self.last_name}"
    
    def get_avatar_url_attribute(self):
        return self.avatar ? asset(f"avatars/{self.avatar}") : asset("images/default-avatar.png")
    
    # Mutators
    def set_password_attribute(self, value):
        self.attributes["password"] = hash_password(value)
    
    # Eventi
    def created(self):
        # Invia email di benvenuto
        send_welcome_email(self)
        
        # Log
        log_info(f"Nuovo utente registrato: {self.email}")

# app/models/Post.flux
class Post(Model):
    fillable = ["title", "slug", "content", "excerpt", "status"]
    
    casts = {
        "published_at": "datetime",
        "is_featured": "boolean",
        "meta": "json"
    }
    
    # Relazioni
    def author(self):
        return self.belongs_to("User", "user_id")
    
    def category(self):
        return self.belongs_to("Category")
    
    def tags(self):
        return self.belongs_to_many("Tag")
    
    def comments(self):
        return self.has_many("Comment").order_by("created_at")
    
    # Scope
    def scope_published(self, query):
        return query.where("status", "published")\
                   .where("published_at", "<=", now())
    
    def scope_featured(self, query):
        return query.where("is_featured", True)
    
    def scope_recent(self, query, days=7):
        return query.where("created_at", ">=", date_sub(now(), days=days))
    
    # Accessors
    def get_read_time_attribute(self):
        words = str_word_count(strip_tags(self.content))
        return math_ceil(words / 200)  # 200 words per minute
    
    def get_excerpt_attribute(self, value):
        return value or str_limit(strip_tags(self.content), 160)
    
    # Mutators
    def set_title_attribute(self, value):
        self.attributes["title"] = str_title(value)
        if not self.slug:
            self.attributes["slug"] = str_slug(value)
    
    # Eventi
    def saving(self):
        # Auto-genera excerpt se vuoto
        if not self.excerpt:
            self.excerpt = str_limit(strip_tags(self.content), 160)
        
        # Auto-genera slug se vuoto
        if not self.slug:
            self.slug = str_slug(self.title)

# Uso del sistema
# Crea utente
user = User.create({
    "name": "Mario Rossi",
    "email": "mario@example.com",
    "password": "secretpassword"
})

# Crea post
post = user.posts().create({
    "title": "Il mio primo post",
    "content": "Contenuto del post...",
    "status": "published",
    "published_at": now()
})

# Query complesse
popular_posts = Post.published()\
                   .with("author", "category", "tags")\
                   .with_count("comments")\
                   .order_by("views", "desc")\
                   .limit(10)\
                   .get()

recent_authors = User.active()\
                    .authors()\
                    .with_count("posts")\
                    .has("posts", ">=", 1)\
                    .order_by("posts_count", "desc")\
                    .get()

Best Practices

πŸ—οΈ Struttura

  • Un Model per tabella
  • Usa convenzioni di naming
  • Organizza in sottocartelle se necessario
  • Mantieni Models focalizzati

πŸ”’ Sicurezza

  • Definisci sempre fillable/guarded
  • Valida input utente
  • Usa mutators per sanitizzazione
  • Nascondi campi sensibili

⚑ Performance

  • Usa eager loading per relazioni
  • Crea scope per query comuni
  • Implementa cache dove utile
  • Ottimizza query N+1
🎯 Ottimo lavoro! Ora sai creare Models potenti e sicuri. Scopri come costruire Query Avanzate per sfruttare al massimo il tuo database.