Relazioni tra Models
Connetti i tuoi dati con relazioni potenti e intuitive
Relazioni Intelligenti: Flux gestisce automaticamente foreign keys,
eager loading e query ottimizzate per eliminare il problema N+1.
Tipi di Relazioni
Flux supporta tutti i tipi di relazioni più comuni:
# One to One
User ↔ Profile
# One to Many
User → Posts
Category → Posts
# Many to Many
Posts ↔ Tags
Users ↔ Roles
# Polymorphic
Comments → Posts/Videos/Photos
Likes → Posts/Comments
# Has One Through
Country → User → Post
# Has Many Through
Country → Users → Posts
One-to-One
Relazione uno-a-uno tra due models:
# User Model
class User(Model):
def profile(self):
return self.has_one("Profile")
def phone(self):
return self.has_one("Phone", "user_id", "id")
# Profile Model
class Profile(Model):
def user(self):
return self.belongs_to("User")
# Phone Model
class Phone(Model):
def user(self):
return self.belongs_to("User")
# Uso delle relazioni
user = User.find(1)
profile = user.profile() # Lazy loading
print(profile.bio)
# Accesso inverso
profile = Profile.find(1)
user = profile.user()
print(user.name)
# Creare relazioni
user = User.find(1)
profile = user.profile().create({
"bio": "Ciao, sono Mario!",
"website": "https://mario.com"
})
# Associare record esistenti
user = User.find(1)
profile = Profile.find(1)
user.profile().associate(profile)
# Dissociare
user.profile().dissociate()
One-to-Many
Relazione uno-a-molti, la più comune:
# User Model (un utente ha molti post)
class User(Model):
def posts(self):
return self.has_many("Post")
def comments(self):
return self.has_many("Comment", "author_id", "id")
def published_posts(self):
return self.has_many("Post").where("status", "published")
# Post Model (un post appartiene a un utente)
class Post(Model):
def author(self):
return self.belongs_to("User", "user_id", "id")
def category(self):
return self.belongs_to("Category")
# Category Model
class Category(Model):
def posts(self):
return self.has_many("Post")
# Uso base
user = User.find(1)
posts = user.posts() # Lazy loading
print(f"{user.name} has {posts.count()} posts")
# Iterare sui posts
for post in user.posts():
print(post.title)
# Query sui posts
recent_posts = user.posts().where("created_at", ">", date_sub(now(), days=7)).get()
popular_posts = user.posts().where("views", ">", 1000).order_by("views", "desc").get()
# Creare post per un utente
user = User.find(1)
post = user.posts().create({
"title": "Nuovo Post",
"content": "Contenuto del post...",
"status": "published"
})
# Creare multipli
user.posts().create_many([
{"title": "Post 1", "content": "Contenuto 1"},
{"title": "Post 2", "content": "Contenuto 2"}
])
# Associare post esistente
user = User.find(1)
post = Post.find(1)
user.posts().save(post)
# Associare multipli
user.posts().save_many([post1, post2, post3])
# Aggiornare tutti i post
user.posts().update({"featured": True})
# Eliminare tutti i post
user.posts().delete()
Many-to-Many
Relazione molti-a-molti con tabella pivot:
# Post Model (un post ha molti tag)
class Post(Model):
def tags(self):
return self.belongs_to_many("Tag")
def categories(self):
return self.belongs_to_many("Category", "post_categories", "post_id", "category_id")
# Tag Model (un tag appartiene a molti post)
class Tag(Model):
def posts(self):
return self.belongs_to_many("Post")
# User Model (utenti e ruoli)
class User(Model):
def roles(self):
return self.belongs_to_many("Role", "user_roles")\
.with_pivot("assigned_at", "assigned_by")\
.with_timestamps()
# Role Model
class Role(Model):
def users(self):
return self.belongs_to_many("User", "user_roles")
def permissions(self):
return self.belongs_to_many("Permission")
# Uso base
post = Post.find(1)
tags = post.tags() # Collection di tag
# Iterare sui tag
for tag in post.tags():
print(tag.name)
# Query sui tag
popular_tags = post.tags().where("popularity", ">", 100).get()
# Attach - associa tag al post
post = Post.find(1)
post.tags().attach(1) # Attach tag con ID 1
post.tags().attach([1, 2, 3]) # Attach multipli tag
# Attach con dati pivot
post.tags().attach(1, {"created_by": current_user().id})
post.tags().attach([
1 => {"priority": "high"},
2 => {"priority": "medium"}
])
# Detach - rimuovi associazione
post.tags().detach(1) # Rimuovi tag 1
post.tags().detach([1, 2]) # Rimuovi multipli
post.tags().detach() # Rimuovi tutti
# Sync - sincronizza (sostituisce tutte le associazioni)
post.tags().sync([1, 2, 3]) # Solo questi tag rimangono
post.tags().sync([
1 => {"priority": "high"},
2 => {"priority": "low"}
])
# Sync senza detach
post.tags().sync_without_detaching([4, 5]) # Aggiunge senza rimuovere
# Toggle - cambia stato associazione
post.tags().toggle([1, 2]) # Se presente rimuove, se assente aggiunge
# Update pivot data
post.tags().update_existing_pivot(1, {"priority": "urgent"})
# Accedere ai dati pivot
post = Post.with("tags").find(1)
for tag in post.tags():
print(f"Tag: {tag.name}, Priority: {tag.pivot.priority}")
# Query con condizioni pivot
user = User.find(1)
admin_roles = user.roles().where_pivot("active", True).get()
recent_roles = user.roles().where_pivot("assigned_at", ">", date_sub(now(), days=30)).get()
# Contare relazioni
post_count = Tag.find(1).posts().count()
tag_count = Post.find(1).tags().count()
Has One Through
Relazione attraverso un model intermedio:
# Country → User → Post (l'ultimo post di un paese)
class Country(Model):
def users(self):
return self.has_many("User")
def latest_post(self):
return self.has_one_through("Post", "User", "country_id", "user_id")
# User Model
class User(Model):
def country(self):
return self.belongs_to("Country")
def posts(self):
return self.has_many("Post")
# Post Model
class Post(Model):
def author(self):
return self.belongs_to("User", "user_id")
# Uso
country = Country.find(1)
latest_post = country.latest_post()
print(f"Latest post from {country.name}: {latest_post.title}")
# Supplier → User → History (ultimo accesso del fornitore)
class Supplier(Model):
def users(self):
return self.has_many("User")
def latest_access(self):
return self.has_one_through("AccessHistory", "User", "supplier_id", "user_id")
def contact_person(self):
return self.has_one_through("User", "Department", "supplier_id", "head_user_id")
# Department Model
class Department(Model):
def supplier(self):
return self.belongs_to("Supplier")
def head(self):
return self.belongs_to("User", "head_user_id")
# Uso con vincoli
supplier = Supplier.find(1)
recent_access = supplier.latest_access().where("created_at", ">", date_sub(now(), days=7)).first()
active_contact = supplier.contact_person().where("active", True).first()
Has Many Through
Relazione molti-a-molti attraverso un model intermedio:
# Country → Users → Posts (tutti i post di un paese)
class Country(Model):
def users(self):
return self.has_many("User")
def posts(self):
return self.has_many_through("Post", "User", "country_id", "user_id")
def published_posts(self):
return self.has_many_through("Post", "User", "country_id", "user_id")\
.where("posts.status", "published")
# Project → Users → Tasks (tutti i task di un progetto)
class Project(Model):
def users(self):
return self.has_many("User", "current_project_id")
def tasks(self):
return self.has_many_through("Task", "User", "current_project_id", "assigned_to")
def completed_tasks(self):
return self.has_many_through("Task", "User", "current_project_id", "assigned_to")\
.where("tasks.status", "completed")
# Uso
country = Country.find(1)
all_posts = country.posts() # Tutti i post degli utenti di questo paese
print(f"{country.name} has {all_posts.count()} posts")
# Con filtri
italy = Country.where("name", "Italy").first()
recent_posts = italy.posts().where("posts.created_at", ">", date_sub(now(), days=7)).get()
# Progetto e task
project = Project.find(1)
pending_tasks = project.tasks().where("tasks.status", "pending").get()
overdue_tasks = project.tasks().where("tasks.due_date", "<", now()).get()
# Statistiche
stats = {
"total_posts": country.posts().count(),
"published_posts": country.published_posts().count(),
"authors_count": country.users().has("posts").count()
}
Relazioni Polimorfiche
Un model può appartenere a più types di models:
# Comment può appartenere a Post o Video
class Comment(Model):
def commentable(self):
return self.morph_to("commentable")
# Post Model
class Post(Model):
def comments(self):
return self.morph_many("Comment", "commentable")
# Video Model
class Video(Model):
def comments(self):
return self.morph_many("Comment", "commentable")
# Image Model
class Image(Model):
def comments(self):
return self.morph_many("Comment", "commentable")
# Uso
# Aggiungere commento a un post
post = Post.find(1)
comment = post.comments().create({
"content": "Ottimo post!",
"author_id": current_user().id
})
# Aggiungere commento a un video
video = Video.find(1)
comment = video.comments().create({
"content": "Video fantastico!",
"author_id": current_user().id
})
# Accedere al parent dal commento
comment = Comment.find(1)
parent = comment.commentable() # Può essere Post, Video, o Image
if isinstance(parent, Post):
print(f"Comment on post: {parent.title}")
elif isinstance(parent, Video):
print(f"Comment on video: {parent.title}")
# Like polimorfici
class Like(Model):
def likeable(self):
return self.morph_to("likeable")
class Post(Model):
def likes(self):
return self.morph_many("Like", "likeable")
class Comment(Model):
def likes(self):
return self.morph_many("Like", "likeable")
# Uso
post = Post.find(1)
like = post.likes().create({"user_id": current_user().id})
comment = Comment.find(1)
like = comment.likes().create({"user_id": current_user().id})
Many-to-Many Polimorfiche
Relazione molti-a-molti con types diversi:
# Tag può essere associato a Post, Video, etc.
class Tag(Model):
def posts(self):
return self.morph_to_many("Post", "taggable")
def videos(self):
return self.morph_to_many("Video", "taggable")
def images(self):
return self.morph_to_many("Image", "taggable")
class Post(Model):
def tags(self):
return self.morph_by_many("Tag", "taggable")
class Video(Model):
def tags(self):
return self.morph_by_many("Tag", "taggable")
class Image(Model):
def tags(self):
return self.morph_by_many("Tag", "taggable")
# Uso
post = Post.find(1)
post.tags().attach([1, 2, 3]) # Associa tag al post
video = Video.find(1)
video.tags().sync([2, 3, 4]) # Sincronizza tag del video
# Trovare tutti i contenuti con un tag
tag = Tag.find(1)
tagged_posts = tag.posts().get()
tagged_videos = tag.videos().get()
tagged_images = tag.images().get()
# Contare contenuti taggati
post_count = tag.posts().count()
video_count = tag.videos().count()
total_count = post_count + video_count
# Con dati pivot
post.tags().attach(1, {"featured": True, "position": 1})
# Accedere ai dati pivot
post = Post.with("tags").find(1)
for tag in post.tags():
if tag.pivot.featured:
print(f"Featured tag: {tag.name} at position {tag.pivot.position}")
Eager Loading
Carica relazioni in anticipo per evitare N+1 queries:
# Problema N+1
users = User.all() # 1 query
for user in users:
print(user.posts().count()) # N queries (una per user)
# Soluzione: Eager Loading
users = User.with("posts").get() # 2 queries totali
for user in users:
print(len(user.posts)) # Nessuna query aggiuntiva
# Eager loading multipli
users = User.with(["posts", "profile", "roles"]).get()
# Eager loading nested
users = User.with(["posts.comments", "posts.tags"]).get()
# Eager loading condizionale
users = User.with(["posts" => lambda q: q.where("published", True)]).get()
# Eager loading con count
users = User.with_count("posts").get()
for user in users:
print(f"{user.name} has {user.posts_count} posts")
# Eager loading con somma/media
users = User.with_sum("orders", "total").get()
users = User.with_avg("reviews", "rating").get()
# Lazy eager loading (dopo aver caricato i models)
users = User.all()
users.load("posts", "profile")
# Lazy eager loading condizionale
users.load(["posts" => lambda q: q.where("featured", True)])
# Eager loading con pivot data
posts = Post.with("tags").get()
for post in posts:
for tag in post.tags:
print(f"Tag: {tag.name}, Priority: {tag.pivot.priority}")
# Prevenire lazy loading
users = User.with("posts").without_relations("comments").get()
# Controllo se relazione è caricata
if user.relation_loaded("posts"):
print("Posts already loaded")
else:
user.load("posts")
Query delle Relazioni
Filtra models basandoti sulle loro relazioni:
# Has - utenti che hanno almeno un post
users_with_posts = User.has("posts").get()
# Has con count
prolific_users = User.has("posts", ">=", 10).get()
# Has con callback
active_users = User.has("posts", lambda q:
q.where("created_at", ">", date_sub(now(), days=30))
).get()
# Doesn't Have - utenti senza post
users_without_posts = User.doesnt_have("posts").get()
# Where Has - utenti con post pubblicati
users_with_published = User.where_has("posts", lambda q:
q.where("status", "published")
).get()
# Where Doesn't Have
users_without_published = User.where_doesnt_have("posts", lambda q:
q.where("status", "published")
).get()
# With Count condizionale
users = User.with_count(["posts", "comments as recent_comments_count" => lambda q:
q.where("created_at", ">", date_sub(now(), days=7))
]).get()
# Query relazioni esistenti
posts_with_tags = Post.whereHas("tags").get()
posts_with_many_comments = Post.has("comments", ">", 5).get()
# Query complesse
popular_authors = User.where_has("posts", lambda q:
q.where("views", ">", 1000)
).with_count("posts").order_by("posts_count", "desc").get()
# Relazioni nested
countries_with_active_users = Country.where_has("users.posts", lambda q:
q.where("status", "published")
.where("created_at", ">", date_sub(now(), days=30))
).get()
# Combinazione di condizioni
expert_users = User.where_has("posts", lambda q:
q.where("status", "published")
).where_has("comments", lambda q:
q.where("approved", True)
).has("posts", ">=", 5).get()
Inserimento e Aggiornamento
Crea e aggiorna records con le loro relazioni:
# Create con relazioni
user = User.create({
"name": "Mario Rossi",
"email": "mario@test.com"
})
# Crea post per l'utente
post = user.posts().create({
"title": "Primo Post",
"content": "Contenuto...",
"status": "published"
})
# Crea multipli
user.posts().create_many([
{"title": "Post 1", "content": "Contenuto 1"},
{"title": "Post 2", "content": "Contenuto 2"}
])
# First or Create
post = user.posts().first_or_create(
{"title": "Post Unico"}, # Condizioni ricerca
{"content": "Contenuto default"} # Valori se creato
)
# Update or Create
post = user.posts().update_or_create(
{"slug": "post-unico"},
{"title": "Post Aggiornato", "content": "Nuovo contenuto"}
)
# Associate/Dissociate (One-to-One, One-to-Many)
user = User.find(1)
profile = Profile.find(1)
user.profile().associate(profile) # Associa
user.profile().dissociate() # Dissocia
# Save/Save Many (One-to-Many)
post = Post.find(1)
user.posts().save(post) # Salva relazione
user.posts().save_many([post1, post2, post3])
# Attach/Detach/Sync (Many-to-Many)
post = Post.find(1)
post.tags().attach([1, 2, 3]) # Associa tag
post.tags().detach([1]) # Rimuovi tag
post.tags().sync([2, 3, 4]) # Sincronizza
# Con attributi pivot
post.tags().attach(1, {"priority": "high", "created_by": current_user().id})
# Sync con detach
post.tags().sync_without_detaching([5, 6]) # Aggiunge senza rimuovere
# Toggle
post.tags().toggle([1, 2]) # Cambia stato associazione
# Update attraverso relazioni
user = User.find(1)
user.posts().update({"featured": True}) # Aggiorna tutti i post
user.posts().where("status", "draft").update({"status": "published"})
# Delete attraverso relazioni
user.posts().delete() # Elimina tutti i post
user.posts().where("views", "<", 10).delete() # Elimina post con poche views
Accessors nelle Relazioni
Computed properties per le relazioni:
class User(Model):
def posts(self):
return self.has_many("Post")
def comments(self):
return self.has_many("Comment")
def followers(self):
return self.belongs_to_many("User", "followers", "user_id", "follower_id")
# Computed properties
def get_posts_count_attribute(self):
return self.posts().count()
def get_published_posts_count_attribute(self):
return self.posts().where("status", "published").count()
def get_recent_posts_attribute(self):
return self.posts().where("created_at", ">", date_sub(now(), days=7)).get()
def get_is_author_attribute(self):
return self.posts().exists()
def get_is_popular_attribute(self):
return self.followers().count() > 100
def get_engagement_score_attribute(self):
posts_score = self.posts().sum("views") or 0
comments_score = self.comments().count() * 10
followers_score = self.followers().count() * 5
return posts_score + comments_score + followers_score
class Post(Model):
def author(self):
return self.belongs_to("User", "user_id")
def comments(self):
return self.has_many("Comment")
def tags(self):
return self.belongs_to_many("Tag")
# Computed properties
def get_comments_count_attribute(self):
return self.comments().count()
def get_approved_comments_count_attribute(self):
return self.comments().where("approved", True).count()
def get_reading_time_attribute(self):
words = str_word_count(strip_tags(self.content))
return math_ceil(words / 200) # 200 words per minute
def get_is_popular_attribute(self):
return self.views > 1000 and self.comments_count > 10
def get_tag_names_attribute(self):
return self.tags().pluck("name").join(", ")
# Uso
user = User.find(1)
print(f"Posts: {user.posts_count}")
print(f"Published: {user.published_posts_count}")
print(f"Is author: {user.is_author}")
print(f"Engagement: {user.engagement_score}")
post = Post.find(1)
print(f"Comments: {post.comments_count}")
print(f"Reading time: {post.reading_time} minutes")
print(f"Tags: {post.tag_names}")
print(f"Is popular: {post.is_popular}")
Esempio Completo: Sistema E-commerce
# Models per sistema e-commerce
class User(Model):
def orders(self):
return self.has_many("Order")
def cart(self):
return self.has_one("Cart")
def addresses(self):
return self.has_many("Address")
def reviews(self):
return self.has_many("Review")
def wishlist(self):
return self.belongs_to_many("Product", "wishlists")
# Computed properties
def get_total_spent_attribute(self):
return self.orders().where("status", "completed").sum("total") or 0
def get_average_order_value_attribute(self):
return self.orders().where("status", "completed").avg("total") or 0
def get_is_vip_attribute(self):
return self.total_spent > 1000
class Product(Model):
def category(self):
return self.belongs_to("Category")
def orders(self):
return self.belongs_to_many("Order", "order_items")\
.with_pivot("quantity", "price")
def reviews(self):
return self.has_many("Review")
def images(self):
return self.morph_many("Image", "imageable")
def wishlisted_by(self):
return self.belongs_to_many("User", "wishlists")
# Computed properties
def get_average_rating_attribute(self):
return self.reviews().avg("rating") or 0
def get_reviews_count_attribute(self):
return self.reviews().count()
def get_times_ordered_attribute(self):
return self.orders().sum("pivot.quantity") or 0
class Order(Model):
def user(self):
return self.belongs_to("User")
def products(self):
return self.belongs_to_many("Product", "order_items")\
.with_pivot("quantity", "price", "subtotal")
def address(self):
return self.belongs_to("Address", "shipping_address_id")
def payment(self):
return self.has_one("Payment")
# Computed properties
def get_items_count_attribute(self):
return self.products().sum("pivot.quantity") or 0
def get_subtotal_attribute(self):
return self.products().sum("pivot.subtotal") or 0
class Category(Model):
def products(self):
return self.has_many("Product")
def parent(self):
return self.belongs_to("Category", "parent_id")
def children(self):
return self.has_many("Category", "parent_id")
# Recursive categories
def ancestors(self):
ancestors = []
current = self.parent()
while current:
ancestors.append(current)
current = current.parent()
return ancestors
def descendants(self):
descendants = []
children = self.children()
for child in children:
descendants.append(child)
descendants.extend(child.descendants())
return descendants
class Review(Model):
def user(self):
return self.belongs_to("User")
def product(self):
return self.belongs_to("Product")
def images(self):
return self.morph_many("Image", "imageable")
class Cart(Model):
def user(self):
return self.belongs_to("User")
def products(self):
return self.belongs_to_many("Product", "cart_items")\
.with_pivot("quantity", "added_at")
def get_total_attribute(self):
total = 0
for product in self.products():
total += product.price * product.pivot.quantity
return total
def get_items_count_attribute(self):
return self.products().sum("pivot.quantity") or 0
# Uso del sistema
def create_order_from_cart(user_id, address_id):
"""Crea ordine dal carrello"""
user = User.find(user_id)
cart = user.cart()
if not cart or cart.products().count() == 0:
raise Exception("Carrello vuoto")
with DB.transaction():
# Crea ordine
order = user.orders().create({
"status": "pending",
"shipping_address_id": address_id,
"total": cart.total
})
# Aggiungi prodotti all'ordine
for product in cart.products():
order.products().attach(product.id, {
"quantity": product.pivot.quantity,
"price": product.price,
"subtotal": product.price * product.pivot.quantity
})
# Svuota carrello
cart.products().detach()
return order
def get_user_recommendations(user_id):
"""Raccomandazioni prodotti per utente"""
user = User.find(user_id)
# Prodotti della stessa categoria degli acquisti precedenti
purchased_categories = user.orders()\
.with("products.category")\
.where("status", "completed")\
.get()\
.pluck("products.*.category.id")\
.flatten()\
.unique()
recommendations = Product.where_in("category_id", purchased_categories)\
.where_not_in("id", user.orders()
.with("products")
.get()
.pluck("products.*.id")
.flatten())\
.with_avg("reviews", "rating")\
.order_by("reviews_avg_rating", "desc")\
.limit(10)\
.get()
return recommendations
def get_product_analytics(product_id):
"""Analytics per prodotto"""
product = Product.with(["reviews", "orders"]).find(product_id)
return {
"total_orders": product.orders().count(),
"total_quantity_sold": product.times_ordered,
"revenue": product.orders().sum("pivot.subtotal"),
"average_rating": product.average_rating,
"reviews_count": product.reviews_count,
"wishlist_count": product.wishlisted_by().count(),
"rating_distribution": product.reviews()
.select("rating", DB.raw("count(*) as count"))
.group_by("rating")
.get()
}
# Query complesse
def get_dashboard_stats():
"""Statistiche dashboard admin"""
return {
"total_users": User.count(),
"vip_users": User.where_has("orders", lambda q:
q.where("status", "completed")
).with_sum("orders", "total").get().filter(lambda u: u.orders_sum_total > 1000).count(),
"top_products": Product.with_count("orders")
.order_by("orders_count", "desc")
.limit(10)
.get(),
"recent_orders": Order.with(["user", "products"])
.where("created_at", ">", date_sub(now(), days=7))
.order_by("created_at", "desc")
.get(),
"categories_performance": Category.with_count("products")
.with_sum("products.orders", "pivot.quantity")
.order_by("products_orders_sum_pivot_quantity", "desc")
.get()
}
Best Practices
⚡ Performance
- Usa eager loading per evitare N+1
- Carica solo le relazioni necessarie
- Usa with_count per conteggi
- Ottimizza query con has/whereHas
🏗️ Struttura
- Definisci relazioni inverse
- Usa nomi descrittivi per i metodi
- Organizza foreign keys coerentemente
- Documenta relazioni complesse
🔒 Integrità
- Usa transazioni per operazioni multiple
- Gestisci cascading deletes
- Valida dati nelle relazioni
- Controlla vincoli di integrità
🎯 Eccellente! Ora sai gestire tutte le tipologie di relazioni in Flux.
Scopri come strutturare il database con le Migrazioni.