Query Builder
Costruisci query SQL complesse con una sintassi fluida, sicura e potente
Sicurezza Integrata: Il Query Builder di Flux previene automaticamente
SQL injection usando prepared statements e parametri vincolati.
Concetti Base
Il Query Builder può essere usato tramite Models o direttamente:
# Tramite Model
users = User.where("active", True).get()
# Direttamente dal DB
from flux.db import DB
users = DB.table("users").where("active", True).get()
# Query raw sicure
users = DB.select("SELECT * FROM users WHERE active = ?", [True])
# Connessione specifica
users = DB.connection("mysql").table("users").get()
Select e Proiezioni
Controllo completo sui campi da recuperare:
# Select tutto
users = DB.table("users").get()
# Select specifici campi
users = DB.table("users").select("name", "email").get()
users = DB.table("users").select(["name", "email", "created_at"]).get()
# Alias
users = DB.table("users")\
.select("name as full_name", "email as contact")\
.get()
# Funzioni aggregate
stats = DB.table("users")\
.select("role", "count(*) as total")\
.group_by("role")\
.get()
# Espressioni raw
users = DB.table("users")\
.select("name", DB.raw("UPPER(email) as email_upper"))\
.get()
# Distinct
roles = DB.table("users").select("role").distinct().get()
# Additive select
query = DB.table("users").select("name")
query = query.add_select("email") # Aggiunge email
result = query.get()
# Select con subquery
subquery = DB.table("posts").select("user_id").where("status", "published")
users = DB.table("users")\
.select("*")\
.where_in("id", subquery)\
.get()
Clausole Where
Filtri potenti e flessibili:
# Where base
users = DB.table("users").where("active", True).get()
users = DB.table("users").where("age", ">", 18).get()
users = DB.table("users").where("name", "like", "%mario%").get()
# Operatori supportati
users = DB.table("users").where("age", ">=", 18).get()
users = DB.table("users").where("score", "!=", 0).get()
users = DB.table("users").where("email", "not like", "%temp%").get()
# Where multipli (AND)
users = DB.table("users")\
.where("active", True)\
.where("role", "admin")\
.get()
# Or Where
users = DB.table("users")\
.where("role", "admin")\
.or_where("role", "moderator")\
.get()
# Where con array (AND)
users = DB.table("users").where([
["active", "=", True],
["age", ">", 18],
["role", "in", ["admin", "user"]]
]).get()
# Where con closure
users = DB.table("users").where(lambda q:
q.where("age", ">", 18).or_where("verified", True)
).get()
# Where In
users = DB.table("users").where_in("id", [1, 2, 3, 4]).get()
users = DB.table("users").where_not_in("status", ["banned", "deleted"]).get()
# Where Null
users = DB.table("users").where_null("deleted_at").get()
users = DB.table("users").where_not_null("email_verified_at").get()
# Where Between
users = DB.table("users").where_between("age", 18, 65).get()
users = DB.table("users").where_not_between("score", 0, 50).get()
# Where Exists
users = DB.table("users").where_exists(lambda q:
q.select(DB.raw(1))
.from_("posts")
.where_column("posts.user_id", "users.id")
).get()
# Where Column (confronta colonne)
users = DB.table("users")\
.where_column("created_at", "updated_at")\
.get()
# Where JSON (per campi JSON)
users = DB.table("users")\
.where_json("settings->theme", "dark")\
.get()
# Where Full Text (per ricerca full-text)
posts = DB.table("posts")\
.where_full_text(["title", "content"], "search term")\
.get()
Where con Date
Filtri specializzati per campi data:
# Where Date
posts = DB.table("posts").where_date("created_at", "2024-01-15").get()
posts = DB.table("posts").where_date("created_at", ">", "2024-01-01").get()
# Where Year/Month/Day
posts = DB.table("posts").where_year("created_at", 2024).get()
posts = DB.table("posts").where_month("created_at", 12).get()
posts = DB.table("posts").where_day("created_at", 25).get()
# Where Time
logs = DB.table("logs").where_time("created_at", ">", "10:00:00").get()
# Combinazioni
recent_posts = DB.table("posts")\
.where_year("created_at", 2024)\
.where_month("created_at", 12)\
.where("status", "published")\
.get()
# Where con helper date
today_posts = DB.table("posts").where_date("created_at", today()).get()
this_week = DB.table("posts")\
.where_date("created_at", ">=", start_of_week())\
.get()
# Range di date
posts = DB.table("posts")\
.where_between("created_at", "2024-01-01", "2024-12-31")\
.get()
# Date relative
recent = DB.table("posts")\
.where("created_at", ">=", date_sub(now(), days=7))\
.get()
Joins
Unisci tabelle per query complesse:
# Inner Join
users_with_posts = DB.table("users")\
.join("posts", "users.id", "=", "posts.user_id")\
.select("users.*", "posts.title")\
.get()
# Left Join
users = DB.table("users")\
.left_join("posts", "users.id", "=", "posts.user_id")\
.select("users.*", "posts.title")\
.get()
# Right Join
posts = DB.table("posts")\
.right_join("users", "posts.user_id", "=", "users.id")\
.select("posts.*", "users.name")\
.get()
# Cross Join
combinations = DB.table("colors")\
.cross_join("sizes")\
.get()
# Join con condizioni multiple
users = DB.table("users")\
.join("posts", lambda join:
join.on("users.id", "=", "posts.user_id")
.where("posts.status", "=", "published")
).get()
# Join con OR
users = DB.table("users")\
.join("contacts", lambda join:
join.on("users.id", "=", "contacts.user_id")
.or_on("users.email", "=", "contacts.email")
).get()
# Sub-query Join
latest_posts = DB.table("posts")\
.select("user_id", DB.raw("MAX(created_at) as latest"))\
.group_by("user_id")
users = DB.table("users")\
.join_sub(latest_posts, "latest_posts", "users.id", "=", "latest_posts.user_id")\
.get()
# Join con alias
users = DB.table("users as u")\
.join("posts as p", "u.id", "=", "p.user_id")\
.select("u.name", "p.title")\
.get()
Ordinamento e Limiti
Controlla l'ordine e la quantità di risultati:
# Order By
users = DB.table("users").order_by("name").get()
users = DB.table("users").order_by("created_at", "desc").get()
# Ordinamenti multipli
users = DB.table("users")\
.order_by("role")\
.order_by("name", "asc")\
.get()
# Order By Raw
users = DB.table("users")\
.order_by_raw("name COLLATE utf8_unicode_ci")\
.get()
# Random order
users = DB.table("users").in_random_order().get()
# Latest/Oldest (per timestamp)
users = DB.table("users").latest().get() # ORDER BY created_at DESC
users = DB.table("users").latest("updated_at").get()
users = DB.table("users").oldest().get() # ORDER BY created_at ASC
# Limit e Offset
users = DB.table("users").limit(10).get()
users = DB.table("users").offset(20).limit(10).get()
# Take e Skip (aliases)
users = DB.table("users").take(10).get()
users = DB.table("users").skip(20).take(10).get()
# First e Last
first_user = DB.table("users").order_by("created_at").first()
last_user = DB.table("users").order_by("created_at").last()
# Pagination
users = DB.table("users").paginate(page=1, per_page=15)
print(users.data) # I dati
print(users.total) # Totale record
print(users.last_page) # Ultima pagina
# Simple pagination (senza count)
users = DB.table("users").simple_paginate(15)
Raggruppamento e Aggregazione
Raggruppa dati e calcola statistiche:
# Group By
stats = DB.table("users")\
.select("role", DB.raw("count(*) as total"))\
.group_by("role")\
.get()
# Group By multipli
stats = DB.table("orders")\
.select("status", "payment_method", DB.raw("count(*) as total"))\
.group_by("status", "payment_method")\
.get()
# Having
popular_roles = DB.table("users")\
.select("role", DB.raw("count(*) as total"))\
.group_by("role")\
.having("total", ">", 10)\
.get()
# Having con raw
stats = DB.table("orders")\
.select("user_id", DB.raw("SUM(amount) as total"))\
.group_by("user_id")\
.having_raw("SUM(amount) > 1000")\
.get()
# Funzioni aggregate
total_users = DB.table("users").count()
avg_age = DB.table("users").avg("age")
max_score = DB.table("users").max("score")
min_date = DB.table("posts").min("created_at")
sum_amount = DB.table("orders").sum("amount")
# Aggregate con condizioni
active_users = DB.table("users").where("active", True).count()
admin_avg_age = DB.table("users").where("role", "admin").avg("age")
# Aggregate con Group By
role_stats = DB.table("users")\
.select("role")\
.select_raw("count(*) as total")\
.select_raw("avg(age) as avg_age")\
.select_raw("max(created_at) as newest")\
.group_by("role")\
.get()
# Conditional aggregates
stats = DB.table("orders")\
.select_raw("SUM(CASE WHEN status = 'completed' THEN amount ELSE 0 END) as completed_amount")\
.select_raw("SUM(CASE WHEN status = 'pending' THEN amount ELSE 0 END) as pending_amount")\
.first()
Subquery
Query annidate per logica complessa:
# Subquery in Where
high_spenders = DB.table("orders")\
.select("user_id")\
.where("amount", ">", 1000)
users = DB.table("users")\
.where_in("id", high_spenders)\
.get()
# Subquery come colonna
users = DB.table("users")\
.select("*")\
.add_select(DB.table("orders")
.select_raw("count(*)")
.where_column("user_id", "users.id")
.as_("orders_count")
).get()
# Subquery in From
subquery = DB.table("orders")\
.select("user_id", DB.raw("SUM(amount) as total"))\
.group_by("user_id")
big_spenders = DB.table(subquery.as_("totals"))\
.where("total", ">", 5000)\
.get()
# Exists subquery
users_with_posts = DB.table("users")\
.where_exists(lambda q:
q.select(DB.raw(1))
.from_("posts")
.where_column("posts.user_id", "users.id")
).get()
# Not Exists
users_without_posts = DB.table("users")\
.where_not_exists(lambda q:
q.select(DB.raw(1))
.from_("posts")
.where_column("posts.user_id", "users.id")
).get()
# Subquery con Join
latest_posts = DB.table("posts")\
.select("user_id", DB.raw("MAX(created_at) as latest"))\
.group_by("user_id")
users = DB.table("users")\
.join_sub(latest_posts, "latest", "users.id", "=", "latest.user_id")\
.select("users.*", "latest.latest")\
.get()
Union
Combina risultati di query diverse:
# Union base
admins = DB.table("users").where("role", "admin").select("name", "email")
moderators = DB.table("users").where("role", "moderator").select("name", "email")
staff = admins.union(moderators).get()
# Union All (include duplicati)
all_staff = admins.union_all(moderators).get()
# Union con ordinamento
combined = DB.table("users")\
.where("role", "admin")\
.select("name", "email", "role")\
.union(
DB.table("users")
.where("role", "moderator")
.select("name", "email", "role")
)\
.order_by("name")\
.get()
# Union complesso
current_users = DB.table("users")\
.where("active", True)\
.select("id", "name", "email", DB.raw("'current' as type"))
archived_users = DB.table("archived_users")\
.select("id", "name", "email", DB.raw("'archived' as type"))
all_users = current_users.union(archived_users).get()
Query Raw
Esegui SQL grezzo quando necessario:
# Select raw
users = DB.select("SELECT * FROM users WHERE active = ?", [True])
# Raw con named bindings
users = DB.select("""
SELECT * FROM users
WHERE role = :role AND age > :min_age
""", {"role": "admin", "min_age": 18})
# Insert raw
DB.insert("INSERT INTO users (name, email) VALUES (?, ?)",
["Mario Rossi", "mario@test.com"])
# Update raw
affected = DB.update("""
UPDATE users SET last_login = ? WHERE id = ?
""", [now(), 1])
# Delete raw
deleted = DB.delete("DELETE FROM users WHERE active = ?", [False])
# Statement generico
DB.statement("CREATE INDEX idx_users_email ON users(email)")
# Transazioni con raw
DB.transaction(lambda:
DB.statement("SET foreign_key_checks = 0")
DB.delete("DELETE FROM posts WHERE user_id = ?", [1])
DB.delete("DELETE FROM users WHERE id = ?", [1])
DB.statement("SET foreign_key_checks = 1")
)
# Procedure stored
result = DB.select("CALL get_user_stats(?)", [user_id])
# Raw expression in query builder
users = DB.table("users")\
.select("*", DB.raw("UPPER(name) as name_upper"))\
.where_raw("age > ? AND role = ?", [18, "admin"])\
.order_by_raw("RAND()")\
.get()
Transazioni
Gestisci operazioni atomiche:
# Transazione base
try:
DB.begin_transaction()
# Operazioni
user = DB.table("users").insert({"name": "Mario", "email": "mario@test.com"})
DB.table("profiles").insert({"user_id": user.id, "bio": "Ciao mondo"})
DB.commit()
except Exception as e:
DB.rollback()
raise e
# Transazione con context manager
with DB.transaction():
user = User.create({"name": "Mario", "email": "mario@test.com"})
Profile.create({"user_id": user.id, "bio": "Ciao mondo"})
# Transazione con callback
def transfer_funds(from_user, to_user, amount):
def transaction():
# Verifica saldo
if from_user.balance < amount:
raise Exception("Saldo insufficiente")
# Trasferimento
from_user.decrement("balance", amount)
to_user.increment("balance", amount)
# Log transazione
DB.table("transactions").insert({
"from_user_id": from_user.id,
"to_user_id": to_user.id,
"amount": amount,
"created_at": now()
})
DB.transaction(transaction)
# Savepoint (nested transactions)
try:
DB.begin_transaction()
# Operazione 1
user = User.create(user_data)
# Savepoint
DB.savepoint("user_created")
try:
# Operazione rischiosa
risky_operation()
except Exception:
# Rollback solo al savepoint
DB.rollback_to_savepoint("user_created")
DB.commit()
except Exception:
DB.rollback()
# Retry automatico
@DB.retry_on_deadlock(max_attempts=3)
def critical_operation():
with DB.transaction():
# Operazioni critiche
pass
Pessimistic Locking
Controlla l'accesso concorrente ai dati:
# Shared lock (read lock)
users = DB.table("users")\
.where("id", 1)\
.shared_lock()\
.get()
# Exclusive lock (write lock)
user = DB.table("users")\
.where("id", 1)\
.lock_for_update()\
.first()
# Lock in transazione
with DB.transaction():
# Blocca record per aggiornamento
user = User.where("id", 1).lock_for_update().first()
user.balance += 100
user.save()
# Skip locked records
available_jobs = DB.table("jobs")\
.where("status", "pending")\
.skip_locked()\
.lock_for_update()\
.get()
# No wait lock
try:
user = DB.table("users")\
.where("id", 1)\
.lock_for_update(no_wait=True)\
.first()
except LockWaitTimeoutException:
# Gestisci timeout
pass
Chunk Processing
Elabora grandi dataset in batch:
# Chunk base
def process_user(user):
# Elabora singolo utente
user.update({"processed": True})
# Processa 1000 utenti alla volta
DB.table("users")\
.where("processed", False)\
.chunk(1000, process_user)
# Chunk con indice
def process_batch(users, page):
print(f"Processing batch {page}")
for user in users:
process_user(user)
DB.table("users").chunk(500, process_batch)
# Chunk lazy (memoria efficiente)
for users in DB.table("users").lazy_chunk(100):
for user in users:
process_user(user)
# Chunk con order by
DB.table("users")\
.order_by("id")\
.chunk(1000, process_user)
# Chunk con where
DB.table("orders")\
.where("status", "pending")\
.where("created_at", "<", date_sub(now(), hours=1))\
.chunk(500, lambda orders: process_orders(orders))
# Chunk avanzato con controllo errori
def safe_chunk_processor(records, page):
try:
for record in records:
process_record(record)
except Exception as e:
log_error(f"Error processing chunk {page}: {e}")
# Continua con il prossimo chunk
return True # Continue processing
DB.table("large_table").chunk(1000, safe_chunk_processor)
Query Caching
Cache intelligente per query frequenti:
# Cache query result
users = DB.table("users")\
.where("active", True)\
.remember(minutes=10)\
.get()
# Cache con chiave custom
popular_posts = DB.table("posts")\
.where("views", ">", 1000)\
.remember(minutes=30, key="popular_posts")\
.get()
# Cache con tag
users = DB.table("users")\
.where("role", "admin")\
.remember(minutes=15, tags=["users", "admin"])\
.get()
# Cache forever (fino a invalidazione)
settings = DB.table("settings")\
.remember_forever()\
.get()
# Cache condizionale
users = DB.table("users")\
.when(should_cache, lambda q: q.remember(minutes=5))\
.get()
# Invalidazione cache
DB.forget_cache("popular_posts")
DB.flush_cache_tags(["users"])
# Cache con callback
def expensive_query():
return DB.table("complex_view")\
.join("other_table", "complex_view.id", "=", "other_table.ref_id")\
.where("complex_condition", True)\
.get()
result = DB.remember("expensive_query", minutes=60, callback=expensive_query)
Debugging e Profiling
Strumenti per ottimizzare le performance:
# Debug query
users = DB.table("users")\
.where("active", True)\
.debug() # Stampa SQL e bindings
# Log query
DB.enable_query_log()
users = User.where("role", "admin").get()
queries = DB.get_query_log()
# Explain query
explain = DB.table("users")\
.where("active", True)\
.explain()
# Profiling
with DB.profile():
users = User.with("posts", "comments").get()
# Mostra statistiche query
# Listen to queries
def log_query(query, bindings, time):
if time > 100: # Query lente > 100ms
log_warning(f"Slow query: {query} ({time}ms)")
DB.listen(log_query)
# Macro per query riutilizzabili
def search_users(query, term):
return query.where("name", "like", f"%{term}%")\
.or_where("email", "like", f"%{term}%")
DB.macro("search_users", search_users)
# Uso della macro
results = DB.table("users").search_users("mario").get()
Esempio Completo: Analytics Dashboard
# Analytics per dashboard amministratore
class AnalyticsService:
def get_user_stats(self, period="month"):
"""Statistiche utenti per periodo"""
# Base query con filtro periodo
base_query = DB.table("users")
if period == "day":
date_filter = date_sub(now(), days=1)
elif period == "week":
date_filter = date_sub(now(), weeks=1)
elif period == "month":
date_filter = date_sub(now(), months=1)
else:
date_filter = date_sub(now(), years=1)
# Nuovi utenti
new_users = base_query.where("created_at", ">=", date_filter).count()
# Utenti attivi
active_users = base_query.where("last_login", ">=", date_filter).count()
# Utenti per ruolo
users_by_role = DB.table("users")\
.select("role", DB.raw("count(*) as total"))\
.group_by("role")\
.remember(minutes=10, key=f"users_by_role_{period}")\
.get()
# Crescita utenti (query complessa)
growth_data = DB.select("""
SELECT
DATE(created_at) as date,
COUNT(*) as new_users,
SUM(COUNT(*)) OVER (ORDER BY DATE(created_at)) as total_users
FROM users
WHERE created_at >= ?
GROUP BY DATE(created_at)
ORDER BY date
""", [date_filter])
return {
"new_users": new_users,
"active_users": active_users,
"users_by_role": users_by_role,
"growth_data": growth_data
}
def get_content_stats(self):
"""Statistiche contenuti"""
# Posts per status
posts_by_status = DB.table("posts")\
.select("status", DB.raw("count(*) as total"))\
.group_by("status")\
.remember(minutes=5)\
.get()
# Top autori
top_authors = DB.table("users")\
.join("posts", "users.id", "=", "posts.user_id")\
.select("users.name", "users.email",
DB.raw("count(posts.id) as posts_count"))\
.group_by("users.id", "users.name", "users.email")\
.order_by("posts_count", "desc")\
.limit(10)\
.get()
# Contenuti più popolari
popular_content = DB.table("posts")\
.select("title", "views", "comments_count", "created_at")\
.where("status", "published")\
.order_by("views", "desc")\
.limit(20)\
.remember(minutes=15, key="popular_content")\
.get()
return {
"posts_by_status": posts_by_status,
"top_authors": top_authors,
"popular_content": popular_content
}
def get_engagement_metrics(self):
"""Metriche di engagement"""
# Commenti per giorno (ultimi 30 giorni)
comments_trend = DB.select("""
SELECT
DATE(created_at) as date,
COUNT(*) as comments_count
FROM comments
WHERE created_at >= ?
GROUP BY DATE(created_at)
ORDER BY date
""", [date_sub(now(), days=30)])
# Like per post (top 10)
most_liked = DB.table("posts")\
.left_join("likes", "posts.id", "=", "likes.post_id")\
.select("posts.title",
DB.raw("count(likes.id) as likes_count"))\
.group_by("posts.id", "posts.title")\
.order_by("likes_count", "desc")\
.limit(10)\
.get()
# Utenti più attivi
active_commenters = DB.table("users")\
.join("comments", "users.id", "=", "comments.user_id")\
.select("users.name",
DB.raw("count(comments.id) as comments_count"))\
.where("comments.created_at", ">=", date_sub(now(), days=30))\
.group_by("users.id", "users.name")\
.order_by("comments_count", "desc")\
.limit(10)\
.get()
return {
"comments_trend": comments_trend,
"most_liked": most_liked,
"active_commenters": active_commenters
}
def get_performance_insights(self):
"""Insights sulle performance"""
# Query lente
slow_queries = DB.get_slow_queries(threshold=100)
# Tabelle più utilizzate
table_usage = DB.get_table_stats()
# Indici mancanti
missing_indexes = DB.analyze_missing_indexes()
return {
"slow_queries": slow_queries,
"table_usage": table_usage,
"missing_indexes": missing_indexes
}
# Uso del servizio
analytics = AnalyticsService()
# Dashboard data
dashboard_data = {
"user_stats": analytics.get_user_stats("month"),
"content_stats": analytics.get_content_stats(),
"engagement": analytics.get_engagement_metrics()
}
# Report performance (admin only)
if current_user().is_admin():
dashboard_data["performance"] = analytics.get_performance_insights()
return json_response(dashboard_data)
Best Practices
⚡ Performance
- Usa indici appropriati
- Limita i risultati con LIMIT
- Evita SELECT * quando possibile
- Cache query frequenti
🔒 Sicurezza
- Usa sempre parametri vincolati
- Valida input utente
- Limita permessi database
- Evita query raw quando possibile
📊 Debugging
- Monitora query lente
- Usa EXPLAIN per ottimizzazione
- Log query in development
- Profila performance critical path
🚀 Fantastico! Ora padroneggi il Query Builder di Flux.
Scopri come creare Relazioni tra Models
per semplificare query complesse.