Query Builder

Costruisci query SQL complesse con una sintassi fluida, sicura e potente

Sicurezza Integrata: Il Query Builder di Flux previene automaticamente SQL injection usando prepared statements e parametri vincolati.

Concetti Base

Il Query Builder può essere usato tramite Models o direttamente:

# Tramite Model
users = User.where("active", True).get()

# Direttamente dal DB
from flux.db import DB
users = DB.table("users").where("active", True).get()

# Query raw sicure
users = DB.select("SELECT * FROM users WHERE active = ?", [True])

# Connessione specifica
users = DB.connection("mysql").table("users").get()

Select e Proiezioni

Controllo completo sui campi da recuperare:

# Select tutto
users = DB.table("users").get()

# Select specifici campi
users = DB.table("users").select("name", "email").get()
users = DB.table("users").select(["name", "email", "created_at"]).get()

# Alias
users = DB.table("users")\
          .select("name as full_name", "email as contact")\
          .get()

# Funzioni aggregate
stats = DB.table("users")\
          .select("role", "count(*) as total")\
          .group_by("role")\
          .get()

# Espressioni raw
users = DB.table("users")\
          .select("name", DB.raw("UPPER(email) as email_upper"))\
          .get()

# Distinct
roles = DB.table("users").select("role").distinct().get()

# Additive select
query = DB.table("users").select("name")
query = query.add_select("email")  # Aggiunge email
result = query.get()

# Select con subquery
subquery = DB.table("posts").select("user_id").where("status", "published")
users = DB.table("users")\
          .select("*")\
          .where_in("id", subquery)\
          .get()

Clausole Where

Filtri potenti e flessibili:

# Where base
users = DB.table("users").where("active", True).get()
users = DB.table("users").where("age", ">", 18).get()
users = DB.table("users").where("name", "like", "%mario%").get()

# Operatori supportati
users = DB.table("users").where("age", ">=", 18).get()
users = DB.table("users").where("score", "!=", 0).get()
users = DB.table("users").where("email", "not like", "%temp%").get()

# Where multipli (AND)
users = DB.table("users")\
          .where("active", True)\
          .where("role", "admin")\
          .get()

# Or Where
users = DB.table("users")\
          .where("role", "admin")\
          .or_where("role", "moderator")\
          .get()

# Where con array (AND)
users = DB.table("users").where([
    ["active", "=", True],
    ["age", ">", 18],
    ["role", "in", ["admin", "user"]]
]).get()

# Where con closure
users = DB.table("users").where(lambda q:
    q.where("age", ">", 18).or_where("verified", True)
).get()

# Where In
users = DB.table("users").where_in("id", [1, 2, 3, 4]).get()
users = DB.table("users").where_not_in("status", ["banned", "deleted"]).get()

# Where Null
users = DB.table("users").where_null("deleted_at").get()
users = DB.table("users").where_not_null("email_verified_at").get()

# Where Between
users = DB.table("users").where_between("age", 18, 65).get()
users = DB.table("users").where_not_between("score", 0, 50).get()

# Where Exists
users = DB.table("users").where_exists(lambda q:
    q.select(DB.raw(1))
     .from_("posts")
     .where_column("posts.user_id", "users.id")
).get()

# Where Column (confronta colonne)
users = DB.table("users")\
          .where_column("created_at", "updated_at")\
          .get()

# Where JSON (per campi JSON)
users = DB.table("users")\
          .where_json("settings->theme", "dark")\
          .get()

# Where Full Text (per ricerca full-text)
posts = DB.table("posts")\
          .where_full_text(["title", "content"], "search term")\
          .get()

Where con Date

Filtri specializzati per campi data:

# Where Date
posts = DB.table("posts").where_date("created_at", "2024-01-15").get()
posts = DB.table("posts").where_date("created_at", ">", "2024-01-01").get()

# Where Year/Month/Day
posts = DB.table("posts").where_year("created_at", 2024).get()
posts = DB.table("posts").where_month("created_at", 12).get()
posts = DB.table("posts").where_day("created_at", 25).get()

# Where Time
logs = DB.table("logs").where_time("created_at", ">", "10:00:00").get()

# Combinazioni
recent_posts = DB.table("posts")\
                 .where_year("created_at", 2024)\
                 .where_month("created_at", 12)\
                 .where("status", "published")\
                 .get()

# Where con helper date
today_posts = DB.table("posts").where_date("created_at", today()).get()
this_week = DB.table("posts")\
              .where_date("created_at", ">=", start_of_week())\
              .get()

# Range di date
posts = DB.table("posts")\
          .where_between("created_at", "2024-01-01", "2024-12-31")\
          .get()

# Date relative
recent = DB.table("posts")\
           .where("created_at", ">=", date_sub(now(), days=7))\
           .get()

Joins

Unisci tabelle per query complesse:

# Inner Join
users_with_posts = DB.table("users")\
                     .join("posts", "users.id", "=", "posts.user_id")\
                     .select("users.*", "posts.title")\
                     .get()

# Left Join
users = DB.table("users")\
          .left_join("posts", "users.id", "=", "posts.user_id")\
          .select("users.*", "posts.title")\
          .get()

# Right Join
posts = DB.table("posts")\
          .right_join("users", "posts.user_id", "=", "users.id")\
          .select("posts.*", "users.name")\
          .get()

# Cross Join
combinations = DB.table("colors")\
                 .cross_join("sizes")\
                 .get()

# Join con condizioni multiple
users = DB.table("users")\
          .join("posts", lambda join:
              join.on("users.id", "=", "posts.user_id")
                  .where("posts.status", "=", "published")
          ).get()

# Join con OR
users = DB.table("users")\
          .join("contacts", lambda join:
              join.on("users.id", "=", "contacts.user_id")
                  .or_on("users.email", "=", "contacts.email")
          ).get()

# Sub-query Join
latest_posts = DB.table("posts")\
                 .select("user_id", DB.raw("MAX(created_at) as latest"))\
                 .group_by("user_id")

users = DB.table("users")\
          .join_sub(latest_posts, "latest_posts", "users.id", "=", "latest_posts.user_id")\
          .get()

# Join con alias
users = DB.table("users as u")\
          .join("posts as p", "u.id", "=", "p.user_id")\
          .select("u.name", "p.title")\
          .get()

Ordinamento e Limiti

Controlla l'ordine e la quantità di risultati:

# Order By
users = DB.table("users").order_by("name").get()
users = DB.table("users").order_by("created_at", "desc").get()

# Ordinamenti multipli
users = DB.table("users")\
          .order_by("role")\
          .order_by("name", "asc")\
          .get()

# Order By Raw
users = DB.table("users")\
          .order_by_raw("name COLLATE utf8_unicode_ci")\
          .get()

# Random order
users = DB.table("users").in_random_order().get()

# Latest/Oldest (per timestamp)
users = DB.table("users").latest().get()        # ORDER BY created_at DESC
users = DB.table("users").latest("updated_at").get()
users = DB.table("users").oldest().get()        # ORDER BY created_at ASC

# Limit e Offset
users = DB.table("users").limit(10).get()
users = DB.table("users").offset(20).limit(10).get()

# Take e Skip (aliases)
users = DB.table("users").take(10).get()
users = DB.table("users").skip(20).take(10).get()

# First e Last
first_user = DB.table("users").order_by("created_at").first()
last_user = DB.table("users").order_by("created_at").last()

# Pagination
users = DB.table("users").paginate(page=1, per_page=15)
print(users.data)        # I dati
print(users.total)       # Totale record
print(users.last_page)   # Ultima pagina

# Simple pagination (senza count)
users = DB.table("users").simple_paginate(15)

Raggruppamento e Aggregazione

Raggruppa dati e calcola statistiche:

# Group By
stats = DB.table("users")\
          .select("role", DB.raw("count(*) as total"))\
          .group_by("role")\
          .get()

# Group By multipli
stats = DB.table("orders")\
          .select("status", "payment_method", DB.raw("count(*) as total"))\
          .group_by("status", "payment_method")\
          .get()

# Having
popular_roles = DB.table("users")\
                  .select("role", DB.raw("count(*) as total"))\
                  .group_by("role")\
                  .having("total", ">", 10)\
                  .get()

# Having con raw
stats = DB.table("orders")\
          .select("user_id", DB.raw("SUM(amount) as total"))\
          .group_by("user_id")\
          .having_raw("SUM(amount) > 1000")\
          .get()

# Funzioni aggregate
total_users = DB.table("users").count()
avg_age = DB.table("users").avg("age")
max_score = DB.table("users").max("score")
min_date = DB.table("posts").min("created_at")
sum_amount = DB.table("orders").sum("amount")

# Aggregate con condizioni
active_users = DB.table("users").where("active", True).count()
admin_avg_age = DB.table("users").where("role", "admin").avg("age")

# Aggregate con Group By
role_stats = DB.table("users")\
               .select("role")\
               .select_raw("count(*) as total")\
               .select_raw("avg(age) as avg_age")\
               .select_raw("max(created_at) as newest")\
               .group_by("role")\
               .get()

# Conditional aggregates
stats = DB.table("orders")\
          .select_raw("SUM(CASE WHEN status = 'completed' THEN amount ELSE 0 END) as completed_amount")\
          .select_raw("SUM(CASE WHEN status = 'pending' THEN amount ELSE 0 END) as pending_amount")\
          .first()

Subquery

Query annidate per logica complessa:

# Subquery in Where
high_spenders = DB.table("orders")\
                  .select("user_id")\
                  .where("amount", ">", 1000)

users = DB.table("users")\
          .where_in("id", high_spenders)\
          .get()

# Subquery come colonna
users = DB.table("users")\
          .select("*")\
          .add_select(DB.table("orders")
                        .select_raw("count(*)")
                        .where_column("user_id", "users.id")
                        .as_("orders_count")
          ).get()

# Subquery in From
subquery = DB.table("orders")\
             .select("user_id", DB.raw("SUM(amount) as total"))\
             .group_by("user_id")

big_spenders = DB.table(subquery.as_("totals"))\
                 .where("total", ">", 5000)\
                 .get()

# Exists subquery
users_with_posts = DB.table("users")\
                     .where_exists(lambda q:
                         q.select(DB.raw(1))
                          .from_("posts")
                          .where_column("posts.user_id", "users.id")
                     ).get()

# Not Exists
users_without_posts = DB.table("users")\
                        .where_not_exists(lambda q:
                            q.select(DB.raw(1))
                             .from_("posts")
                             .where_column("posts.user_id", "users.id")
                        ).get()

# Subquery con Join
latest_posts = DB.table("posts")\
                 .select("user_id", DB.raw("MAX(created_at) as latest"))\
                 .group_by("user_id")

users = DB.table("users")\
          .join_sub(latest_posts, "latest", "users.id", "=", "latest.user_id")\
          .select("users.*", "latest.latest")\
          .get()

Union

Combina risultati di query diverse:

# Union base
admins = DB.table("users").where("role", "admin").select("name", "email")
moderators = DB.table("users").where("role", "moderator").select("name", "email")

staff = admins.union(moderators).get()

# Union All (include duplicati)
all_staff = admins.union_all(moderators).get()

# Union con ordinamento
combined = DB.table("users")\
             .where("role", "admin")\
             .select("name", "email", "role")\
             .union(
                 DB.table("users")
                   .where("role", "moderator")
                   .select("name", "email", "role")
             )\
             .order_by("name")\
             .get()

# Union complesso
current_users = DB.table("users")\
                  .where("active", True)\
                  .select("id", "name", "email", DB.raw("'current' as type"))

archived_users = DB.table("archived_users")\
                   .select("id", "name", "email", DB.raw("'archived' as type"))

all_users = current_users.union(archived_users).get()

Query Raw

Esegui SQL grezzo quando necessario:

# Select raw
users = DB.select("SELECT * FROM users WHERE active = ?", [True])

# Raw con named bindings
users = DB.select("""
    SELECT * FROM users 
    WHERE role = :role AND age > :min_age
""", {"role": "admin", "min_age": 18})

# Insert raw
DB.insert("INSERT INTO users (name, email) VALUES (?, ?)", 
          ["Mario Rossi", "mario@test.com"])

# Update raw
affected = DB.update("""
    UPDATE users SET last_login = ? WHERE id = ?
""", [now(), 1])

# Delete raw
deleted = DB.delete("DELETE FROM users WHERE active = ?", [False])

# Statement generico
DB.statement("CREATE INDEX idx_users_email ON users(email)")

# Transazioni con raw
DB.transaction(lambda:
    DB.statement("SET foreign_key_checks = 0")
    DB.delete("DELETE FROM posts WHERE user_id = ?", [1])
    DB.delete("DELETE FROM users WHERE id = ?", [1])
    DB.statement("SET foreign_key_checks = 1")
)

# Procedure stored
result = DB.select("CALL get_user_stats(?)", [user_id])

# Raw expression in query builder
users = DB.table("users")\
          .select("*", DB.raw("UPPER(name) as name_upper"))\
          .where_raw("age > ? AND role = ?", [18, "admin"])\
          .order_by_raw("RAND()")\
          .get()

Transazioni

Gestisci operazioni atomiche:

# Transazione base
try:
    DB.begin_transaction()
    
    # Operazioni
    user = DB.table("users").insert({"name": "Mario", "email": "mario@test.com"})
    DB.table("profiles").insert({"user_id": user.id, "bio": "Ciao mondo"})
    
    DB.commit()
except Exception as e:
    DB.rollback()
    raise e

# Transazione con context manager
with DB.transaction():
    user = User.create({"name": "Mario", "email": "mario@test.com"})
    Profile.create({"user_id": user.id, "bio": "Ciao mondo"})

# Transazione con callback
def transfer_funds(from_user, to_user, amount):
    def transaction():
        # Verifica saldo
        if from_user.balance < amount:
            raise Exception("Saldo insufficiente")
        
        # Trasferimento
        from_user.decrement("balance", amount)
        to_user.increment("balance", amount)
        
        # Log transazione
        DB.table("transactions").insert({
            "from_user_id": from_user.id,
            "to_user_id": to_user.id,
            "amount": amount,
            "created_at": now()
        })
    
    DB.transaction(transaction)

# Savepoint (nested transactions)
try:
    DB.begin_transaction()
    
    # Operazione 1
    user = User.create(user_data)
    
    # Savepoint
    DB.savepoint("user_created")
    
    try:
        # Operazione rischiosa
        risky_operation()
    except Exception:
        # Rollback solo al savepoint
        DB.rollback_to_savepoint("user_created")
    
    DB.commit()
except Exception:
    DB.rollback()

# Retry automatico
@DB.retry_on_deadlock(max_attempts=3)
def critical_operation():
    with DB.transaction():
        # Operazioni critiche
        pass

Pessimistic Locking

Controlla l'accesso concorrente ai dati:

# Shared lock (read lock)
users = DB.table("users")\
          .where("id", 1)\
          .shared_lock()\
          .get()

# Exclusive lock (write lock)
user = DB.table("users")\
         .where("id", 1)\
         .lock_for_update()\
         .first()

# Lock in transazione
with DB.transaction():
    # Blocca record per aggiornamento
    user = User.where("id", 1).lock_for_update().first()
    user.balance += 100
    user.save()

# Skip locked records
available_jobs = DB.table("jobs")\
                   .where("status", "pending")\
                   .skip_locked()\
                   .lock_for_update()\
                   .get()

# No wait lock
try:
    user = DB.table("users")\
             .where("id", 1)\
             .lock_for_update(no_wait=True)\
             .first()
except LockWaitTimeoutException:
    # Gestisci timeout
    pass

Chunk Processing

Elabora grandi dataset in batch:

# Chunk base
def process_user(user):
    # Elabora singolo utente
    user.update({"processed": True})

# Processa 1000 utenti alla volta
DB.table("users")\
  .where("processed", False)\
  .chunk(1000, process_user)

# Chunk con indice
def process_batch(users, page):
    print(f"Processing batch {page}")
    for user in users:
        process_user(user)

DB.table("users").chunk(500, process_batch)

# Chunk lazy (memoria efficiente)
for users in DB.table("users").lazy_chunk(100):
    for user in users:
        process_user(user)

# Chunk con order by
DB.table("users")\
  .order_by("id")\
  .chunk(1000, process_user)

# Chunk con where
DB.table("orders")\
  .where("status", "pending")\
  .where("created_at", "<", date_sub(now(), hours=1))\
  .chunk(500, lambda orders: process_orders(orders))

# Chunk avanzato con controllo errori
def safe_chunk_processor(records, page):
    try:
        for record in records:
            process_record(record)
    except Exception as e:
        log_error(f"Error processing chunk {page}: {e}")
        # Continua con il prossimo chunk
        return True  # Continue processing
    
DB.table("large_table").chunk(1000, safe_chunk_processor)

Query Caching

Cache intelligente per query frequenti:

# Cache query result
users = DB.table("users")\
          .where("active", True)\
          .remember(minutes=10)\
          .get()

# Cache con chiave custom
popular_posts = DB.table("posts")\
                  .where("views", ">", 1000)\
                  .remember(minutes=30, key="popular_posts")\
                  .get()

# Cache con tag
users = DB.table("users")\
          .where("role", "admin")\
          .remember(minutes=15, tags=["users", "admin"])\
          .get()

# Cache forever (fino a invalidazione)
settings = DB.table("settings")\
             .remember_forever()\
             .get()

# Cache condizionale
users = DB.table("users")\
          .when(should_cache, lambda q: q.remember(minutes=5))\
          .get()

# Invalidazione cache
DB.forget_cache("popular_posts")
DB.flush_cache_tags(["users"])

# Cache con callback
def expensive_query():
    return DB.table("complex_view")\
             .join("other_table", "complex_view.id", "=", "other_table.ref_id")\
             .where("complex_condition", True)\
             .get()

result = DB.remember("expensive_query", minutes=60, callback=expensive_query)

Debugging e Profiling

Strumenti per ottimizzare le performance:

# Debug query
users = DB.table("users")\
          .where("active", True)\
          .debug()  # Stampa SQL e bindings

# Log query
DB.enable_query_log()
users = User.where("role", "admin").get()
queries = DB.get_query_log()

# Explain query
explain = DB.table("users")\
            .where("active", True)\
            .explain()

# Profiling
with DB.profile():
    users = User.with("posts", "comments").get()
    # Mostra statistiche query

# Listen to queries
def log_query(query, bindings, time):
    if time > 100:  # Query lente > 100ms
        log_warning(f"Slow query: {query} ({time}ms)")

DB.listen(log_query)

# Macro per query riutilizzabili
def search_users(query, term):
    return query.where("name", "like", f"%{term}%")\
                .or_where("email", "like", f"%{term}%")

DB.macro("search_users", search_users)

# Uso della macro
results = DB.table("users").search_users("mario").get()

Esempio Completo: Analytics Dashboard

# Analytics per dashboard amministratore
class AnalyticsService:
    
    def get_user_stats(self, period="month"):
        """Statistiche utenti per periodo"""
        
        # Base query con filtro periodo
        base_query = DB.table("users")
        
        if period == "day":
            date_filter = date_sub(now(), days=1)
        elif period == "week":
            date_filter = date_sub(now(), weeks=1)
        elif period == "month":
            date_filter = date_sub(now(), months=1)
        else:
            date_filter = date_sub(now(), years=1)
        
        # Nuovi utenti
        new_users = base_query.where("created_at", ">=", date_filter).count()
        
        # Utenti attivi
        active_users = base_query.where("last_login", ">=", date_filter).count()
        
        # Utenti per ruolo
        users_by_role = DB.table("users")\
                          .select("role", DB.raw("count(*) as total"))\
                          .group_by("role")\
                          .remember(minutes=10, key=f"users_by_role_{period}")\
                          .get()
        
        # Crescita utenti (query complessa)
        growth_data = DB.select("""
            SELECT 
                DATE(created_at) as date,
                COUNT(*) as new_users,
                SUM(COUNT(*)) OVER (ORDER BY DATE(created_at)) as total_users
            FROM users 
            WHERE created_at >= ?
            GROUP BY DATE(created_at)
            ORDER BY date
        """, [date_filter])
        
        return {
            "new_users": new_users,
            "active_users": active_users,
            "users_by_role": users_by_role,
            "growth_data": growth_data
        }
    
    def get_content_stats(self):
        """Statistiche contenuti"""
        
        # Posts per status
        posts_by_status = DB.table("posts")\
                            .select("status", DB.raw("count(*) as total"))\
                            .group_by("status")\
                            .remember(minutes=5)\
                            .get()
        
        # Top autori
        top_authors = DB.table("users")\
                        .join("posts", "users.id", "=", "posts.user_id")\
                        .select("users.name", "users.email", 
                               DB.raw("count(posts.id) as posts_count"))\
                        .group_by("users.id", "users.name", "users.email")\
                        .order_by("posts_count", "desc")\
                        .limit(10)\
                        .get()
        
        # Contenuti più popolari
        popular_content = DB.table("posts")\
                            .select("title", "views", "comments_count", "created_at")\
                            .where("status", "published")\
                            .order_by("views", "desc")\
                            .limit(20)\
                            .remember(minutes=15, key="popular_content")\
                            .get()
        
        return {
            "posts_by_status": posts_by_status,
            "top_authors": top_authors,
            "popular_content": popular_content
        }
    
    def get_engagement_metrics(self):
        """Metriche di engagement"""
        
        # Commenti per giorno (ultimi 30 giorni)
        comments_trend = DB.select("""
            SELECT 
                DATE(created_at) as date,
                COUNT(*) as comments_count
            FROM comments 
            WHERE created_at >= ?
            GROUP BY DATE(created_at)
            ORDER BY date
        """, [date_sub(now(), days=30)])
        
        # Like per post (top 10)
        most_liked = DB.table("posts")\
                       .left_join("likes", "posts.id", "=", "likes.post_id")\
                       .select("posts.title", 
                              DB.raw("count(likes.id) as likes_count"))\
                       .group_by("posts.id", "posts.title")\
                       .order_by("likes_count", "desc")\
                       .limit(10)\
                       .get()
        
        # Utenti più attivi
        active_commenters = DB.table("users")\
                              .join("comments", "users.id", "=", "comments.user_id")\
                              .select("users.name", 
                                     DB.raw("count(comments.id) as comments_count"))\
                              .where("comments.created_at", ">=", date_sub(now(), days=30))\
                              .group_by("users.id", "users.name")\
                              .order_by("comments_count", "desc")\
                              .limit(10)\
                              .get()
        
        return {
            "comments_trend": comments_trend,
            "most_liked": most_liked,
            "active_commenters": active_commenters
        }
    
    def get_performance_insights(self):
        """Insights sulle performance"""
        
        # Query lente
        slow_queries = DB.get_slow_queries(threshold=100)
        
        # Tabelle più utilizzate
        table_usage = DB.get_table_stats()
        
        # Indici mancanti
        missing_indexes = DB.analyze_missing_indexes()
        
        return {
            "slow_queries": slow_queries,
            "table_usage": table_usage,
            "missing_indexes": missing_indexes
        }

# Uso del servizio
analytics = AnalyticsService()

# Dashboard data
dashboard_data = {
    "user_stats": analytics.get_user_stats("month"),
    "content_stats": analytics.get_content_stats(),
    "engagement": analytics.get_engagement_metrics()
}

# Report performance (admin only)
if current_user().is_admin():
    dashboard_data["performance"] = analytics.get_performance_insights()

return json_response(dashboard_data)

Best Practices

⚡ Performance

  • Usa indici appropriati
  • Limita i risultati con LIMIT
  • Evita SELECT * quando possibile
  • Cache query frequenti

🔒 Sicurezza

  • Usa sempre parametri vincolati
  • Valida input utente
  • Limita permessi database
  • Evita query raw quando possibile

📊 Debugging

  • Monitora query lente
  • Usa EXPLAIN per ottimizzazione
  • Log query in development
  • Profila performance critical path
🚀 Fantastico! Ora padroneggi il Query Builder di Flux. Scopri come creare Relazioni tra Models per semplificare query complesse.