Definire Models
ORM potente e intuitivo per interagire con il database in modo elegante
ActiveRecord Pattern: I Models Flux seguono il pattern ActiveRecord,
combinando dati e logica di business in un'unica classe elegante e potente.
Model Base
Ogni Model estende la classe Model e rappresenta una tabella del database:
# app/models/User.flux
from flux.db import Model
class User(Model):
# Nome tabella (opzionale, default: plurale minuscolo)
table = "users"
# Primary key (default: "id")
primary_key = "id"
# Campi fillable (mass assignment)
fillable = ["name", "email", "password", "role"]
# Campi protetti (non fillable)
guarded = ["id", "created_at", "updated_at"]
# Campi nascosti (non serializzati in JSON)
hidden = ["password", "remember_token"]
# Cast automatici
casts = {
"is_active": "boolean",
"settings": "json",
"last_login": "datetime"
}
# Timestamps automatici (default: true)
timestamps = True
# Soft deletes
soft_deletes = True
Convenzioni di Naming
Flux segue convenzioni intuitive per mappare Models e tabelle:
# Convenzioni automatiche
User β users # Plurale minuscolo
BlogPost β blog_posts # Snake_case
Category β categories # Plurale irregolare gestito
# Override manuale
class Product(Model):
table = "inventory_products" # Nome custom
class Person(Model):
table = "people" # Plurale irregolare
Attributi e ProprietΓ
Fillable e Guarded
class User(Model):
# Whitelist: solo questi campi possono essere mass-assigned
fillable = ["name", "email", "bio", "avatar"]
# Blacklist: questi campi sono sempre protetti
guarded = ["id", "role", "is_admin"]
# Uso
user = User.create({
"name": "Mario Rossi",
"email": "mario@test.com",
"role": "admin", # Ignorato (guarded)
"is_admin": True # Ignorato (guarded)
})
# Solo name e email vengono salvati
Cast dei Tipi
class User(Model):
casts = {
"is_active": "boolean", # 0/1 β True/False
"settings": "json", # JSON string β dict
"tags": "array", # CSV β list
"birth_date": "date", # string β date object
"last_login": "datetime", # string β datetime object
"salary": "decimal", # string β Decimal
"score": "float", # string β float
"count": "integer" # string β int
}
# Uso automatico
user = User.find(1)
print(user.is_active) # True (non "1")
print(user.settings["theme"]) # Accesso diretto al dict
print(user.birth_date.year) # Metodi datetime disponibili
Attributi Nascosti
class User(Model):
hidden = ["password", "remember_token", "api_key"]
# Rendi visibile temporaneamente
visible = ["password"] # Override per questo model
# Serializzazione
user = User.find(1)
print(user.to_json()) # password non inclusa
print(user.to_dict()) # password non inclusa
# Mostra campi nascosti
user.make_visible(["password"])
print(user.to_dict()) # password inclusa
CRUD Operations
Create
# Nuovo record
user = User()
user.name = "Mario Rossi"
user.email = "mario@test.com"
user.save()
# Create con dati
user = User.create({
"name": "Luigi Verdi",
"email": "luigi@test.com",
"password": hash_password("secret")
})
# Create o update
user = User.create_or_update(
{"email": "mario@test.com"}, # Condizione
{"name": "Mario Rossi Updated", "last_login": now()} # Dati
)
# Bulk insert
User.insert([
{"name": "User 1", "email": "user1@test.com"},
{"name": "User 2", "email": "user2@test.com"},
{"name": "User 3", "email": "user3@test.com"}
])
# Insert ignore (salta duplicati)
User.insert_or_ignore([
{"email": "existing@test.com", "name": "Test"}
])
Read
# Find per primary key
user = User.find(1)
user = User.find([1, 2, 3]) # Multipli
# Find con fallback
user = User.find_or_fail(1) # Exception se non trovato
user = User.find_or_new(1) # Nuovo model se non trovato
user = User.find_or_create(1, {"name": "Default"})
# First
user = User.first() # Primo record
user = User.where("active", True).first()
user = User.first_or_fail() # Exception se vuoto
user = User.first_or_create({"email": "test@test.com"})
# Get all
users = User.all()
users = User.where("active", True).get()
# Pagination
users = User.paginate(page=1, per_page=10)
users = User.where("role", "admin").paginate(15)
# Count
count = User.count()
count = User.where("active", True).count()
# Exists
exists = User.where("email", "test@test.com").exists()
# Aggregations
avg_age = User.avg("age")
max_score = User.max("score")
min_date = User.min("created_at")
sum_points = User.sum("points")
Update
# Update singolo
user = User.find(1)
user.name = "Nome Aggiornato"
user.save()
# Update con fill
user = User.find(1)
user.fill({"name": "Nuovo Nome", "email": "nuovo@email.com"})
user.save()
# Update mass
User.where("active", False).update({"status": "inactive"})
# Update o create
user = User.update_or_create(
{"email": "test@test.com"}, # Condizione
{"name": "Test User", "active": True} # Dati
)
# Increment/Decrement
User.where("id", 1).increment("login_count")
User.where("id", 1).decrement("attempts", 1)
User.where("role", "premium").increment("credits", 100)
# Touch (aggiorna timestamps)
user.touch()
User.where("active", True).touch()
Delete
# Delete singolo
user = User.find(1)
user.delete()
# Delete per primary key
User.destroy(1)
User.destroy([1, 2, 3])
# Delete con condizioni
User.where("active", False).delete()
# Soft delete (se abilitato)
user.delete() # Soft delete
user.force_delete() # Hard delete
# Restore soft deleted
user.restore()
User.where("deleted_at", "!=", None).restore()
# Solo soft deleted
User.only_trashed().get()
User.with_trashed().get() # Include soft deleted
Query Builder
Costruisci query complesse con una sintassi fluida:
# Where clauses
User.where("name", "Mario").get()
User.where("age", ">", 18).get()
User.where("email", "like", "%@gmail.com").get()
User.where("active", True).where("role", "admin").get()
# Or where
User.where("role", "admin").or_where("role", "moderator").get()
# Where in
User.where_in("id", [1, 2, 3]).get()
User.where_not_in("status", ["banned", "inactive"]).get()
# Where null
User.where_null("deleted_at").get()
User.where_not_null("email_verified_at").get()
# Where between
User.where_between("age", 18, 65).get()
User.where_not_between("score", 0, 50).get()
# Where date
User.where_date("created_at", "2024-01-15").get()
User.where_month("created_at", 12).get()
User.where_year("created_at", 2024).get()
# Complex where
User.where(lambda q:
q.where("age", ">", 18).or_where("verified", True)
).where("active", True).get()
# Order by
User.order_by("name").get()
User.order_by("created_at", "desc").get()
User.order_by("name").order_by("email", "desc").get()
# Group by & Having
User.select("role", "count(*) as total")\
.group_by("role")\
.having("total", ">", 5)\
.get()
# Limit & Offset
User.limit(10).get()
User.offset(20).limit(10).get()
User.skip(20).take(10).get() # Alias
# Select specific columns
User.select("name", "email").get()
User.select(["name", "email", "created_at"]).get()
# Distinct
User.select("role").distinct().get()
# Join (vedi sezione Relazioni per esempi avanzati)
User.join("posts", "users.id", "=", "posts.user_id")\
.select("users.*", "posts.title")\
.get()
Scope e Query Locali
Definisci query riutilizzabili come metodi del Model:
class User(Model):
# Scope globali (sempre applicati)
@staticmethod
def global_scope(query):
return query.where("deleted_at", None)
# Scope locali
def scope_active(self, query):
return query.where("active", True)
def scope_role(self, query, role):
return query.where("role", role)
def scope_recent(self, query, days=7):
return query.where("created_at", ">=",
date_sub(now(), days=days))
def scope_with_posts(self, query):
return query.has("posts")
def scope_search(self, query, term):
return query.where("name", "like", f"%{term}%")\
.or_where("email", "like", f"%{term}%")
# Uso degli scope
active_users = User.active().get()
admins = User.role("admin").get()
recent_users = User.recent(30).get()
search_results = User.search("mario").get()
# Combinazione scope
recent_admins = User.active().role("admin").recent(7).get()
# Scope dinamici
class Post(Model):
def scope_status(self, query, status):
return query.where("status", status)
def scope_author(self, query, author_id):
return query.where("author_id", author_id)
# Uso
published_posts = Post.status("published").get()
my_drafts = Post.author(current_user.id).status("draft").get()
Accessors e Mutators
Modifica automaticamente i dati quando vengono letti o scritti:
class User(Model):
# Accessor - modifica quando leggi
def get_name_attribute(self, value):
return str_title(value) # Sempre title case
def get_full_name_attribute(self):
return f"{self.first_name} {self.last_name}"
def get_avatar_url_attribute(self):
if self.avatar:
return asset(f"avatars/{self.avatar}")
return asset("images/default-avatar.png")
def get_is_admin_attribute(self):
return self.role == "admin"
# Mutator - modifica quando scrivi
def set_password_attribute(self, value):
self.attributes["password"] = hash_password(value)
def set_email_attribute(self, value):
self.attributes["email"] = str_lower(str_trim(value))
def set_name_attribute(self, value):
self.attributes["name"] = str_title(str_trim(value))
# Uso automatico
user = User.find(1)
print(user.name) # Sempre title case (accessor)
print(user.full_name) # Computed attribute
print(user.avatar_url) # URL completo
user.password = "plaintext" # Automaticamente hashato
user.email = " TEST@EXAMPLE.COM " # Pulito e lowercase
user.save()
Eventi del Model
Hook per eseguire codice durante il ciclo di vita del Model:
class User(Model):
# Eventi disponibili
def boot(self):
# Inizializzazione del model
pass
def creating(self):
# Prima della creazione
self.uuid = uuid()
self.created_by = current_user().id if current_user() else None
def created(self):
# Dopo la creazione
log_info(f"Nuovo utente creato: {self.email}")
send_welcome_email(self)
def updating(self):
# Prima dell'aggiornamento
if self.is_dirty("email"):
self.email_verified_at = None
def updated(self):
# Dopo l'aggiornamento
if self.was_changed("role"):
log_info(f"Ruolo cambiato per {self.email}: {self.role}")
def saving(self):
# Prima di save (create o update)
self.updated_by = current_user().id if current_user() else None
def saved(self):
# Dopo save (create o update)
cache_forget(f"user:{self.id}")
def deleting(self):
# Prima della cancellazione
if self.posts().count() > 0:
raise Exception("Cannot delete user with posts")
def deleted(self):
# Dopo la cancellazione
log_info(f"Utente eliminato: {self.email}")
# Observer pattern (alternativo)
class UserObserver:
def creating(self, user):
user.uuid = uuid()
def created(self, user):
send_welcome_email(user)
# Registra observer
User.observe(UserObserver)
Validazione
Validazione automatica dei dati prima del salvataggio:
class User(Model):
# Regole di validazione
rules = {
"name": "required|string|min:2|max:100",
"email": "required|email|unique:users,email",
"password": "required|min:8|confirmed",
"age": "nullable|integer|min:13|max:120",
"role": "required|in:user,admin,moderator"
}
# Messaggi custom
messages = {
"email.unique": "Questa email Γ¨ giΓ registrata",
"password.min": "La password deve essere di almeno 8 caratteri",
"name.required": "Il nome Γ¨ obbligatorio"
}
# Validazione condizionale
def get_rules(self):
rules = self.rules.copy()
# Skip unique check su update
if self.exists:
rules["email"] = "required|email|unique:users,email," + str(self.id)
# Password opzionale su update
if self.exists and not self.password:
rules.pop("password", None)
return rules
# Validazione custom
def validate_custom(self):
errors = []
if self.role == "admin" and self.age < 18:
errors.append("Gli admin devono essere maggiorenni")
if self.email.endswith("@temp.com"):
errors.append("Email temporanee non ammesse")
return errors
# Uso automatico
try:
user = User.create({
"name": "Mario",
"email": "invalid-email", # Errore
"password": "123" # Troppo corta
})
except ValidationException as e:
print(e.errors) # Dict con errori per campo
# Validazione manuale
user = User()
user.name = "Test"
user.email = "test@test.com"
if user.is_valid():
user.save()
else:
print(user.errors)
Model Factory
Genera dati fake per testing e seeding:
# database/factories/UserFactory.flux
from flux.factory import Factory
from faker import Faker
class UserFactory(Factory):
model = User
def definition(self):
fake = Faker("it_IT")
return {
"name": fake.name(),
"email": fake.unique().email(),
"password": hash_password("password"),
"role": fake.random_element(["user", "admin", "moderator"]),
"active": fake.boolean(80), # 80% true
"bio": fake.text(200),
"birth_date": fake.date_between("-80y", "-18y"),
"created_at": fake.date_time_between("-2y", "now")
}
# State methods
def admin(self):
return self.state({"role": "admin", "active": True})
def inactive(self):
return self.state({"active": False})
def with_posts(self, count=5):
return self.has("posts", count)
# Uso delle factory
# Un utente
user = UserFactory.create()
# Multipli utenti
users = UserFactory.create_batch(10)
# Con state
admin = UserFactory.admin().create()
inactive_users = UserFactory.inactive().create_batch(5)
# Con relazioni
user_with_posts = UserFactory.with_posts(3).create()
# Make (non salva nel DB)
user_data = UserFactory.make()
users_data = UserFactory.make_batch(10)
# Override attributi
user = UserFactory.create({"name": "Nome Specifico"})
# Seeders
class DatabaseSeeder:
def run(self):
# Crea admin
admin = UserFactory.admin().create({
"email": "admin@example.com"
})
# Crea utenti normali
UserFactory.create_batch(50)
# Crea utenti con posts
UserFactory.with_posts(3).create_batch(10)
Serializzazione
Converti Models in array, JSON o XML:
class User(Model):
# Campi sempre nascosti
hidden = ["password", "remember_token"]
# Append computed attributes
appends = ["full_name", "avatar_url", "is_online"]
# Serializzazione custom
def to_array(self):
data = super().to_array()
# Aggiungi campi custom
data["posts_count"] = self.posts().count()
data["last_seen"] = date_human(self.last_login)
return data
def to_json_response(self):
return {
"id": self.id,
"name": self.name,
"email": self.email,
"role": self.role,
"avatar": self.avatar_url,
"stats": {
"posts": self.posts().count(),
"followers": self.followers().count()
}
}
# Uso
user = User.find(1)
# Array
print(user.to_array())
# JSON
print(user.to_json())
json_response(user.to_json_response())
# Collection
users = User.all()
print(users.to_array()) # Array di array
print(users.to_json()) # JSON di array
# Con relazioni
user_with_posts = User.with("posts").find(1)
print(user_with_posts.to_json()) # Include posts
# Nascondere campi temporaneamente
user.make_hidden(["email"]).to_json()
user.make_visible(["password"]).to_json()
Esempio Completo: Blog System
# app/models/User.flux
class User(Model):
table = "users"
fillable = ["name", "email", "bio", "avatar"]
hidden = ["password", "remember_token"]
casts = {
"email_verified_at": "datetime",
"settings": "json",
"is_active": "boolean"
}
# Validazione
rules = {
"name": "required|string|min:2|max:100",
"email": "required|email|unique:users,email",
"password": "required|min:8"
}
# Relazioni
def posts(self):
return self.has_many("Post")
def comments(self):
return self.has_many("Comment")
def followers(self):
return self.belongs_to_many("User", "followers",
"user_id", "follower_id")
# Scope
def scope_active(self, query):
return query.where("is_active", True)
def scope_authors(self, query):
return query.has("posts")
# Accessors
def get_full_name_attribute(self):
return f"{self.first_name} {self.last_name}"
def get_avatar_url_attribute(self):
return self.avatar ? asset(f"avatars/{self.avatar}") : asset("images/default-avatar.png")
# Mutators
def set_password_attribute(self, value):
self.attributes["password"] = hash_password(value)
# Eventi
def created(self):
# Invia email di benvenuto
send_welcome_email(self)
# Log
log_info(f"Nuovo utente registrato: {self.email}")
# app/models/Post.flux
class Post(Model):
fillable = ["title", "slug", "content", "excerpt", "status"]
casts = {
"published_at": "datetime",
"is_featured": "boolean",
"meta": "json"
}
# Relazioni
def author(self):
return self.belongs_to("User", "user_id")
def category(self):
return self.belongs_to("Category")
def tags(self):
return self.belongs_to_many("Tag")
def comments(self):
return self.has_many("Comment").order_by("created_at")
# Scope
def scope_published(self, query):
return query.where("status", "published")\
.where("published_at", "<=", now())
def scope_featured(self, query):
return query.where("is_featured", True)
def scope_recent(self, query, days=7):
return query.where("created_at", ">=", date_sub(now(), days=days))
# Accessors
def get_read_time_attribute(self):
words = str_word_count(strip_tags(self.content))
return math_ceil(words / 200) # 200 words per minute
def get_excerpt_attribute(self, value):
return value or str_limit(strip_tags(self.content), 160)
# Mutators
def set_title_attribute(self, value):
self.attributes["title"] = str_title(value)
if not self.slug:
self.attributes["slug"] = str_slug(value)
# Eventi
def saving(self):
# Auto-genera excerpt se vuoto
if not self.excerpt:
self.excerpt = str_limit(strip_tags(self.content), 160)
# Auto-genera slug se vuoto
if not self.slug:
self.slug = str_slug(self.title)
# Uso del sistema
# Crea utente
user = User.create({
"name": "Mario Rossi",
"email": "mario@example.com",
"password": "secretpassword"
})
# Crea post
post = user.posts().create({
"title": "Il mio primo post",
"content": "Contenuto del post...",
"status": "published",
"published_at": now()
})
# Query complesse
popular_posts = Post.published()\
.with("author", "category", "tags")\
.with_count("comments")\
.order_by("views", "desc")\
.limit(10)\
.get()
recent_authors = User.active()\
.authors()\
.with_count("posts")\
.has("posts", ">=", 1)\
.order_by("posts_count", "desc")\
.get()
Best Practices
ποΈ Struttura
- Un Model per tabella
- Usa convenzioni di naming
- Organizza in sottocartelle se necessario
- Mantieni Models focalizzati
π Sicurezza
- Definisci sempre fillable/guarded
- Valida input utente
- Usa mutators per sanitizzazione
- Nascondi campi sensibili
β‘ Performance
- Usa eager loading per relazioni
- Crea scope per query comuni
- Implementa cache dove utile
- Ottimizza query N+1
π― Ottimo lavoro! Ora sai creare Models potenti e sicuri.
Scopri come costruire Query Avanzate
per sfruttare al massimo il tuo database.