Migrazioni Database
Versioning controllato del database con rollback sicuri e modifiche incrementali
Controllo Versione: Le migrazioni permettono di gestire la struttura del database
come codice, con versioning, rollback e deployment automatizzati.
Concetti Base
Le migrazioni sono file che descrivono modifiche al database:
# Creare una migrazione
flux make:migration create_users_table
# Creare migrazione per tabella esistente
flux make:migration add_email_to_users_table --table=users
# Eseguire migrazioni
flux migrate
# Rollback ultima migrazione
flux migrate:rollback
# Reset completo database
flux migrate:reset
# Refresh (reset + migrate)
flux migrate:refresh
# Status migrazioni
flux migrate:status
Struttura Migrazione
Ogni migrazione ha due metodi: up() e down():
# database/migrations/2024_12_25_100000_create_users_table.flux
from flux.db.migrations import Migration
from flux.db.schema import Schema
class CreateUsersTable(Migration):
def up(self):
"""Esegue la migrazione"""
Schema.create("users", lambda table:
table.id()
table.string("name")
table.string("email").unique()
table.timestamp("email_verified_at").nullable()
table.string("password")
table.boolean("active").default(True)
table.json("settings").nullable()
table.timestamps()
table.soft_deletes()
)
def down(self):
"""Rollback migrazione"""
Schema.drop("users")
Creare Tabelle
Definisci la struttura delle tabelle:
# Tabella base
Schema.create("posts", lambda table:
table.id() # Primary key auto-increment
table.string("title") # VARCHAR(255)
table.string("slug").unique() # VARCHAR(255) UNIQUE
table.text("content") # TEXT
table.text("excerpt").nullable() # TEXT NULL
table.enum("status", ["draft", "published", "archived"]).default("draft")
table.integer("views").default(0) # INT DEFAULT 0
table.decimal("price", 8, 2).nullable() # DECIMAL(8,2) NULL
table.boolean("featured").default(False) # BOOLEAN DEFAULT FALSE
table.date("published_at").nullable() # DATE NULL
table.timestamps() # created_at, updated_at
table.soft_deletes() # deleted_at
)
# Tabella con foreign keys
Schema.create("posts", lambda table:
table.id()
table.string("title")
table.text("content")
table.foreign_id("user_id").constrained() # user_id → users(id)
table.foreign_id("category_id").constrained("categories")
table.timestamps()
)
# Tabella pivot per Many-to-Many
Schema.create("post_tags", lambda table:
table.id()
table.foreign_id("post_id").constrained().on_delete("cascade")
table.foreign_id("tag_id").constrained().on_delete("cascade")
table.integer("sort_order").default(0)
table.timestamps()
# Indice composto per performance
table.index(["post_id", "tag_id"])
table.unique(["post_id", "tag_id"]) # Previene duplicati
)
# Tabella polimorfica
Schema.create("comments", lambda table:
table.id()
table.text("content")
table.morph("commentable") # commentable_id, commentable_type
table.foreign_id("user_id").constrained()
table.boolean("approved").default(False)
table.timestamps()
# Indice per relazione polimorfica
table.index(["commentable_type", "commentable_id"])
)
Tipi di Colonne
Tutti i tipi di colonne supportati:
# Tipi numerici
table.id() # BIGINT UNSIGNED AUTO_INCREMENT PRIMARY
table.big_integer("big_number") # BIGINT
table.integer("number") # INT
table.medium_integer("med_number") # MEDIUMINT
table.small_integer("small_number") # SMALLINT
table.tiny_integer("tiny_number") # TINYINT
table.unsigned_integer("positive") # INT UNSIGNED
table.decimal("price", 8, 2) # DECIMAL(8,2)
table.double("coordinates", 15, 8) # DOUBLE(15,8)
table.float("percentage", 8, 2) # FLOAT(8,2)
# Tipi stringa
table.string("name", 100) # VARCHAR(100)
table.char("code", 10) # CHAR(10)
table.text("description") # TEXT
table.medium_text("content") # MEDIUMTEXT
table.long_text("article") # LONGTEXT
table.binary("data") # BLOB
table.uuid("identifier") # CHAR(36) o UUID native
# Tipi data/ora
table.date("birth_date") # DATE
table.time("start_time") # TIME
table.datetime("event_datetime") # DATETIME
table.timestamp("created_at") # TIMESTAMP
table.year("birth_year") # YEAR
# Tipi speciali
table.boolean("active") # BOOLEAN/TINYINT(1)
table.enum("status", ["active", "inactive", "pending"])
table.set("permissions", ["read", "write", "delete"])
table.json("metadata") # JSON/TEXT
table.jsonb("data") # JSONB (PostgreSQL)
table.ip_address("client_ip") # Adatto per indirizzi IP
table.mac_address("device_mac") # Adatto per MAC address
# Colonne speciali
table.timestamps() # created_at, updated_at TIMESTAMP
table.soft_deletes("deleted_at") # deleted_at TIMESTAMP NULL
table.remember_token() # remember_token VARCHAR(100) NULL
# Foreign keys
table.foreign_id("user_id") # BIGINT UNSIGNED
table.morph("taggable") # taggable_id, taggable_type
# Modifiers comuni
table.string("email").nullable() # NULL allowed
table.string("name").default("Guest") # Valore default
table.integer("sort").unsigned() # UNSIGNED
table.string("title").unique() # UNIQUE constraint
table.text("content").full_text() # Full-text index
table.string("name").comment("User name") # Commento colonna
Indici e Constraint
Ottimizza performance e integrità dati:
Schema.create("users", lambda table:
table.id()
table.string("name")
table.string("email")
table.string("phone").nullable()
table.integer("age")
table.string("city")
table.timestamps()
# Indici semplici
table.index("email") # INDEX
table.unique("email") # UNIQUE INDEX
table.full_text("name") # FULLTEXT INDEX
# Indici composti
table.index(["city", "age"]) # INDEX su più colonne
table.unique(["email", "phone"]) # UNIQUE su più colonne
# Indici con nomi custom
table.index("email", "idx_users_email")
table.unique(["name", "city"], "unq_users_name_city")
# Primary key composta (raro)
table.primary(["user_id", "role_id"])
)
# Aggiungere indici a tabella esistente
Schema.table("users", lambda table:
table.index("last_login")
table.index(["status", "created_at"], "idx_active_users")
table.spatial_index("coordinates") # Per dati geografici
)
# Foreign key constraints
Schema.create("posts", lambda table:
table.id()
table.string("title")
table.foreign_id("user_id").constrained() # Auto-constraint a users(id)
table.foreign_id("category_id").constrained("categories")
# Foreign key con opzioni
table.foreign_id("author_id")\
.constrained("users")\
.on_update("cascade")\
.on_delete("set_null")
# Foreign key custom
table.foreign("user_id", "fk_posts_user")\
.references("id").on("users")\
.on_delete("cascade")
)
# Check constraints (dove supportati)
Schema.create("products", lambda table:
table.id()
table.string("name")
table.decimal("price", 8, 2)
table.integer("stock")
# Check constraints
table.check("price > 0", "chk_positive_price")
table.check("stock >= 0", "chk_non_negative_stock")
)
Modificare Tabelle
Altera tabelle esistenti in modo sicuro:
# Aggiungere colonne
Schema.table("users", lambda table:
table.string("phone").nullable().after("email")
table.date("birth_date").nullable()
table.json("preferences").default("{}")
table.boolean("newsletter").default(True)
)
# Modificare colonne esistenti
Schema.table("users", lambda table:
table.string("name", 150).change() # Cambia lunghezza
table.text("bio").nullable().change() # Rende nullable
table.integer("age").unsigned().change() # Aggiunge unsigned
)
# Rinominare colonne
Schema.table("users", lambda table:
table.rename_column("name", "full_name")
table.rename_column("phone", "phone_number")
)
# Eliminare colonne
Schema.table("users", lambda table:
table.drop_column("temporary_field")
table.drop_column(["old_field1", "old_field2"])
)
# Aggiungere/eliminare indici
Schema.table("posts", lambda table:
# Aggiungere indici
table.index("slug")
table.unique("slug")
table.index(["user_id", "created_at"])
# Eliminare indici
table.drop_index("posts_title_index")
table.drop_unique("posts_slug_unique")
table.drop_primary()
)
# Foreign keys
Schema.table("posts", lambda table:
# Aggiungere foreign key
table.foreign("user_id").references("id").on("users")
# Eliminare foreign key
table.drop_foreign("posts_user_id_foreign")
table.drop_foreign(["user_id"]) # Auto-detect nome
)
# Rinominare tabella
Schema.rename("posts", "articles")
# Cambiare engine/charset (MySQL)
Schema.table("users", lambda table:
table.engine = "InnoDB"
table.charset = "utf8mb4"
table.collation = "utf8mb4_unicode_ci"
)
Controllo Esistenza
Verifica esistenza prima di modificare:
# Verificare esistenza tabelle/colonne
if Schema.has_table("users"):
print("Table users exists")
if Schema.has_column("users", "email"):
print("Column email exists")
if Schema.has_columns("users", ["name", "email"]):
print("All columns exist")
# Migrazione condizionale
def up(self):
if not Schema.has_table("users"):
Schema.create("users", lambda table:
table.id()
table.string("name")
table.string("email").unique()
table.timestamps()
)
if not Schema.has_column("users", "phone"):
Schema.table("users", lambda table:
table.string("phone").nullable()
)
# Verifica indici
if Schema.has_index("users", "users_email_unique"):
Schema.table("users", lambda table:
table.drop_unique("users_email_unique")
)
# Database-specific checks
if Schema.get_connection().driver_name == "mysql":
# Logica specifica MySQL
pass
elif Schema.get_connection().driver_name == "postgresql":
# Logica specifica PostgreSQL
pass
Seeders
Popola database con dati iniziali:
# database/seeders/DatabaseSeeder.flux
from flux.db.seeders import Seeder
from .UserSeeder import UserSeeder
from .PostSeeder import PostSeeder
from .CategorySeeder import CategorySeeder
class DatabaseSeeder(Seeder):
def run(self):
"""Esegue tutti i seeders"""
self.call([
CategorySeeder,
UserSeeder,
PostSeeder
])
# database/seeders/UserSeeder.flux
class UserSeeder(Seeder):
def run(self):
# Admin user
User.create({
"name": "Admin User",
"email": "admin@example.com",
"password": hash_password("password"),
"role": "admin",
"email_verified_at": now()
})
# Test users con Factory
UserFactory.create_batch(50)
# VIP users
UserFactory.admin().create_batch(5)
# Bulk insert per performance
users_data = []
for i in range(1000):
users_data.append({
"name": f"User {i}",
"email": f"user{i}@example.com",
"password": hash_password("password"),
"created_at": now(),
"updated_at": now()
})
User.insert(users_data)
# database/seeders/CategorySeeder.flux
class CategorySeeder(Seeder):
def run(self):
categories = [
{"name": "Tecnologia", "slug": "tecnologia"},
{"name": "Sport", "slug": "sport"},
{"name": "Viaggi", "slug": "viaggi"},
{"name": "Cucina", "slug": "cucina"}
]
for category_data in categories:
category = Category.create(category_data)
# Sottocategorie
if category.slug == "tecnologia":
category.children().create_many([
{"name": "Programmazione", "slug": "programmazione"},
{"name": "Hardware", "slug": "hardware"},
{"name": "Software", "slug": "software"}
])
# Eseguire seeders
# flux db:seed
# flux db:seed --class=UserSeeder
Comandi CLI
Gestisci migrazioni da linea di comando:
# Creare migrazioni
flux make:migration create_users_table
flux make:migration add_phone_to_users --table=users
flux make:migration create_posts_table --create=posts
# Eseguire migrazioni
flux migrate # Esegue tutte le pending
flux migrate --step=1 # Esegue solo 1 migrazione
flux migrate --path=database/migrations/custom
flux migrate --database=mysql # Database specifico
# Rollback
flux migrate:rollback # Rollback ultimo batch
flux migrate:rollback --step=3 # Rollback 3 migrazioni
flux migrate:rollback --batch=5 # Rollback batch specifico
# Reset e refresh
flux migrate:reset # Rollback tutte le migrazioni
flux migrate:refresh # Reset + migrate
flux migrate:refresh --seed # Refresh + seed
# Status e info
flux migrate:status # Mostra status migrazioni
flux migrate:install # Crea tabella migrations
# Fresh (drop all + migrate)
flux migrate:fresh # Drop tutte le tabelle + migrate
flux migrate:fresh --seed # Fresh + seed
# Seeders
flux db:seed # Esegue DatabaseSeeder
flux db:seed --class=UserSeeder # Seeder specifico
flux make:seeder UserSeeder # Crea nuovo seeder
# Schema dump
flux schema:dump # Esporta schema corrente
flux schema:dump --path=schema.sql
Migrazioni Multi-Database
Gestisci più database contemporaneamente:
# config/database.flux
DATABASES = {
"default": {
"driver": "mysql",
"host": "localhost",
"database": "app_main",
"username": "root",
"password": ""
},
"analytics": {
"driver": "mysql",
"host": "localhost",
"database": "app_analytics",
"username": "root",
"password": ""
},
"logs": {
"driver": "postgresql",
"host": "localhost",
"database": "app_logs",
"username": "postgres",
"password": ""
}
}
# Migrazione per database specifico
class CreateAnalyticsTable(Migration):
connection = "analytics" # Specifica database
def up(self):
Schema.connection("analytics").create("page_views", lambda table:
table.id()
table.string("url")
table.string("user_agent").nullable()
table.ip_address("ip_address")
table.timestamp("viewed_at")
table.index(["url", "viewed_at"])
)
def down(self):
Schema.connection("analytics").drop("page_views")
# Uso nei seeders
class AnalyticsSeeder(Seeder):
connection = "analytics"
def run(self):
# Seed dati analytics
PageView = Model.connection("analytics").table("page_views")
PageView.insert([
{"url": "/", "ip_address": "127.0.0.1", "viewed_at": now()},
{"url": "/blog", "ip_address": "127.0.0.1", "viewed_at": now()}
])
# Comandi CLI per database specifico
# flux migrate --database=analytics
# flux migrate:rollback --database=logs
# flux db:seed --database=analytics --class=AnalyticsSeeder
Migrazioni Personalizzate
Operazioni avanzate e custom:
# Migrazione con raw SQL
class OptimizeDatabase(Migration):
def up(self):
# Ottimizzazioni MySQL
if Schema.get_connection().driver_name == "mysql":
DB.statement("OPTIMIZE TABLE users, posts, comments")
DB.statement("ANALYZE TABLE users, posts")
# Stored procedure
DB.statement("""
CREATE PROCEDURE get_user_stats(IN user_id INT)
BEGIN
SELECT
COUNT(p.id) as posts_count,
COUNT(c.id) as comments_count,
MAX(p.created_at) as last_post
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
LEFT JOIN comments c ON u.id = c.user_id
WHERE u.id = user_id;
END
""")
def down(self):
DB.statement("DROP PROCEDURE IF EXISTS get_user_stats")
# Migrazione per dati
class MigrateUserRoles(Migration):
def up(self):
# Migra da colonna 'role' a tabella roles
# Crea tabella roles se non exists
if not Schema.has_table("roles"):
Schema.create("roles", lambda table:
table.id()
table.string("name").unique()
table.timestamps()
)
# Crea tabella pivot
Schema.create("user_roles", lambda table:
table.foreign_id("user_id").constrained().on_delete("cascade")
table.foreign_id("role_id").constrained().on_delete("cascade")
table.timestamps()
table.primary(["user_id", "role_id"])
)
# Migra dati esistenti
admin_role = Role.create({"name": "admin"})
user_role = Role.create({"name": "user"})
# Assegna ruoli basandosi su colonna esistente
for user in User.where("role", "admin").get():
user.roles().attach(admin_role.id)
for user in User.where("role", "user").get():
user.roles().attach(user_role.id)
# Rimuovi vecchia colonna
Schema.table("users", lambda table:
table.drop_column("role")
)
def down(self):
# Ripristina colonna role
Schema.table("users", lambda table:
table.string("role").default("user")
)
# Migra dati indietro
for user in User.with("roles").get():
if user.roles().where("name", "admin").exists():
user.update({"role": "admin"})
else:
user.update({"role": "user"})
# Elimina tabelle
Schema.drop("user_roles")
Schema.drop("roles")
# Migrazione per performance
class AddSearchIndexes(Migration):
def up(self):
Schema.table("posts", lambda table:
# Full-text search
table.full_text(["title", "content"], "posts_search_idx")
# Indici per query comuni
table.index(["status", "published_at"], "posts_published_idx")
table.index(["user_id", "created_at"], "posts_user_timeline_idx")
)
Schema.table("users", lambda table:
table.index(["active", "last_login"], "users_active_idx")
)
def down(self):
Schema.table("posts", lambda table:
table.drop_full_text("posts_search_idx")
table.drop_index("posts_published_idx")
table.drop_index("posts_user_timeline_idx")
)
Schema.table("users", lambda table:
table.drop_index("users_active_idx")
)
Squashing Migrazioni
Ottimizza migrazioni per deployment:
# Squash più migrazioni in una sola
# flux migrate:squash 2024_01_01_000000 2024_12_31_999999
class SquashedMigrations(Migration):
"""
Squashed migrations from 2024_01_01 to 2024_12_31
Original migrations:
- 2024_01_15_create_users_table
- 2024_02_10_add_phone_to_users
- 2024_03_05_create_posts_table
- 2024_04_20_add_slug_to_posts
- 2024_05_12_create_comments_table
"""
def up(self):
# Users table (finale)
Schema.create("users", lambda table:
table.id()
table.string("name")
table.string("email").unique()
table.string("phone").nullable() # Aggiunto in feb
table.timestamp("email_verified_at").nullable()
table.string("password")
table.boolean("active").default(True)
table.timestamps()
table.soft_deletes()
)
# Posts table (finale)
Schema.create("posts", lambda table:
table.id()
table.string("title")
table.string("slug").unique() # Aggiunto in apr
table.text("content")
table.enum("status", ["draft", "published"]).default("draft")
table.foreign_id("user_id").constrained()
table.timestamps()
table.soft_deletes()
)
# Comments table
Schema.create("comments", lambda table:
table.id()
table.text("content")
table.foreign_id("post_id").constrained().on_delete("cascade")
table.foreign_id("user_id").constrained()
table.boolean("approved").default(False)
table.timestamps()
)
def down(self):
Schema.drop_if_exists("comments")
Schema.drop_if_exists("posts")
Schema.drop_if_exists("users")
# Schema snapshot per fresh installs
# flux schema:dump --squash
class SchemaSnapshot(Migration):
"""Current database schema snapshot"""
def up(self):
# Schema completo corrente
self.create_all_tables()
self.create_all_indexes()
self.create_all_foreign_keys()
def create_all_tables(self):
# Tutte le tabelle in ordine di dipendenza
pass
def create_all_indexes(self):
# Tutti gli indici
pass
def create_all_foreign_keys(self):
# Tutte le foreign keys
pass
Testing Migrazioni
Testa migrazioni in ambiente sicuro:
# tests/migrations/test_user_migration.flux
class TestUserMigration(TestCase):
def setUp(self):
# Setup database test
self.migrate_to("2024_01_14_999999") # Prima della migrazione users
def test_users_table_creation(self):
# Verifica tabella non esiste
self.assert_false(Schema.has_table("users"))
# Esegui migrazione
self.migrate_to("2024_01_15_000000")
# Verifica tabella creata
self.assert_true(Schema.has_table("users"))
self.assert_true(Schema.has_column("users", "email"))
self.assert_true(Schema.has_index("users", "users_email_unique"))
def test_users_rollback(self):
# Vai alla migrazione
self.migrate_to("2024_01_15_000000")
self.assert_true(Schema.has_table("users"))
# Rollback
self.migrate_to("2024_01_14_999999")
self.assert_false(Schema.has_table("users"))
def test_migration_with_data(self):
# Migrazione + dati
self.migrate_to("2024_01_15_000000")
# Inserisci dati test
User.create({
"name": "Test User",
"email": "test@example.com",
"password": "password"
})
# Testa migrazione successiva
self.migrate_to("2024_02_10_000000") # Add phone column
# Verifica dati preservati
user = User.where("email", "test@example.com").first()
self.assert_not_null(user)
self.assert_true(Schema.has_column("users", "phone"))
# Test performance migrazione
class TestMigrationPerformance(TestCase):
def test_large_table_migration(self):
# Crea molti dati
self.seed_large_dataset()
start_time = time.time()
self.migrate_to("2024_03_01_add_index_to_large_table")
end_time = time.time()
# Verifica performance accettabile
self.assert_less_than(end_time - start_time, 60) # Max 60 secondi
def seed_large_dataset(self):
# Inserisci 100k records per test
pass
# Comandi test
# flux test --filter=migration
# flux migrate:test --from=2024_01_01 --to=2024_12_31
Esempio Completo: E-commerce Schema
# database/migrations/2024_12_25_100000_create_ecommerce_schema.flux
class CreateEcommerceSchema(Migration):
def up(self):
# Users table
Schema.create("users", lambda table:
table.id()
table.string("name")
table.string("email").unique()
table.timestamp("email_verified_at").nullable()
table.string("password")
table.date("birth_date").nullable()
table.enum("gender", ["M", "F", "O"]).nullable()
table.boolean("active").default(True)
table.timestamps()
table.soft_deletes()
table.index("email")
table.index("active")
)
# Categories table
Schema.create("categories", lambda table:
table.id()
table.string("name")
table.string("slug").unique()
table.text("description").nullable()
table.string("image").nullable()
table.foreign_id("parent_id").nullable().constrained("categories")
table.integer("sort_order").default(0)
table.boolean("active").default(True)
table.timestamps()
table.index(["parent_id", "sort_order"])
table.index("slug")
)
# Products table
Schema.create("products", lambda table:
table.id()
table.string("name")
table.string("slug").unique()
table.text("description").nullable()
table.text("short_description").nullable()
table.string("sku").unique()
table.decimal("price", 10, 2)
table.decimal("sale_price", 10, 2).nullable()
table.integer("stock_quantity").default(0)
table.boolean("manage_stock").default(True)
table.boolean("in_stock").default(True)
table.decimal("weight", 8, 2).nullable()
table.json("dimensions").nullable() # {length, width, height}
table.foreign_id("category_id").constrained()
table.enum("status", ["active", "inactive", "draft"]).default("draft")
table.boolean("featured").default(False)
table.timestamps()
table.soft_deletes()
table.index(["status", "featured"])
table.index(["category_id", "status"])
table.index("sku")
table.full_text(["name", "description"])
)
# Product images
Schema.create("product_images", lambda table:
table.id()
table.foreign_id("product_id").constrained().on_delete("cascade")
table.string("path")
table.string("alt_text").nullable()
table.integer("sort_order").default(0)
table.boolean("is_primary").default(False)
table.timestamps()
table.index(["product_id", "sort_order"])
)
# Product attributes (size, color, etc.)
Schema.create("attributes", lambda table:
table.id()
table.string("name") # Color, Size, Material
table.string("slug")
table.enum("type", ["text", "number", "select", "multiselect"])
table.json("options").nullable() # Per select: ["Red", "Blue", "Green"]
table.timestamps()
table.unique("slug")
)
# Product attribute values
Schema.create("product_attributes", lambda table:
table.id()
table.foreign_id("product_id").constrained().on_delete("cascade")
table.foreign_id("attribute_id").constrained().on_delete("cascade")
table.string("value")
table.timestamps()
table.index(["product_id", "attribute_id"])
table.unique(["product_id", "attribute_id"])
)
# Addresses
Schema.create("addresses", lambda table:
table.id()
table.foreign_id("user_id").constrained().on_delete("cascade")
table.string("type").default("shipping") # shipping, billing
table.string("first_name")
table.string("last_name")
table.string("company").nullable()
table.string("address_line_1")
table.string("address_line_2").nullable()
table.string("city")
table.string("state").nullable()
table.string("postal_code")
table.string("country", 2) # ISO country code
table.string("phone").nullable()
table.boolean("is_default").default(False)
table.timestamps()
table.index("user_id")
)
# Carts
Schema.create("carts", lambda table:
table.id()
table.foreign_id("user_id").nullable().constrained().on_delete("cascade")
table.string("session_id").nullable() # Per guest users
table.timestamps()
table.index("user_id")
table.index("session_id")
)
# Cart items
Schema.create("cart_items", lambda table:
table.id()
table.foreign_id("cart_id").constrained().on_delete("cascade")
table.foreign_id("product_id").constrained().on_delete("cascade")
table.integer("quantity")
table.decimal("price", 10, 2) # Prezzo al momento dell'aggiunta
table.json("product_options").nullable() # Size, color selezionati
table.timestamps()
table.index("cart_id")
table.unique(["cart_id", "product_id"])
)
# Orders
Schema.create("orders", lambda table:
table.id()
table.string("order_number").unique()
table.foreign_id("user_id").nullable().constrained()
table.enum("status", [
"pending", "processing", "shipped",
"delivered", "cancelled", "refunded"
]).default("pending")
table.decimal("subtotal", 10, 2)
table.decimal("tax_amount", 10, 2).default(0)
table.decimal("shipping_amount", 10, 2).default(0)
table.decimal("discount_amount", 10, 2).default(0)
table.decimal("total", 10, 2)
table.string("currency", 3).default("EUR")
table.json("billing_address")
table.json("shipping_address")
table.string("payment_status").default("pending")
table.string("payment_method").nullable()
table.text("notes").nullable()
table.timestamp("shipped_at").nullable()
table.timestamp("delivered_at").nullable()
table.timestamps()
table.index(["user_id", "status"])
table.index("order_number")
table.index("status")
)
# Order items
Schema.create("order_items", lambda table:
table.id()
table.foreign_id("order_id").constrained().on_delete("cascade")
table.foreign_id("product_id").constrained()
table.string("product_name") # Snapshot
table.string("product_sku") # Snapshot
table.integer("quantity")
table.decimal("price", 10, 2)
table.decimal("total", 10, 2)
table.json("product_options").nullable()
table.timestamps()
table.index("order_id")
)
# Reviews
Schema.create("reviews", lambda table:
table.id()
table.foreign_id("product_id").constrained().on_delete("cascade")
table.foreign_id("user_id").constrained().on_delete("cascade")
table.foreign_id("order_id").nullable().constrained()
table.integer("rating") # 1-5
table.string("title").nullable()
table.text("content").nullable()
table.boolean("approved").default(False)
table.json("images").nullable() # Array di path immagini
table.timestamps()
table.index(["product_id", "approved"])
table.unique(["product_id", "user_id", "order_id"])
table.check("rating >= 1 AND rating <= 5", "valid_rating")
)
# Coupons
Schema.create("coupons", lambda table:
table.id()
table.string("code").unique()
table.enum("type", ["fixed", "percentage"])
table.decimal("value", 10, 2)
table.decimal("minimum_amount", 10, 2).nullable()
table.integer("usage_limit").nullable()
table.integer("used_count").default(0)
table.boolean("active").default(True)
table.date("starts_at").nullable()
table.date("expires_at").nullable()
table.timestamps()
table.index("code")
table.index(["active", "expires_at"])
)
def down(self):
Schema.drop_if_exists("coupons")
Schema.drop_if_exists("reviews")
Schema.drop_if_exists("order_items")
Schema.drop_if_exists("orders")
Schema.drop_if_exists("cart_items")
Schema.drop_if_exists("carts")
Schema.drop_if_exists("addresses")
Schema.drop_if_exists("product_attributes")
Schema.drop_if_exists("attributes")
Schema.drop_if_exists("product_images")
Schema.drop_if_exists("products")
Schema.drop_if_exists("categories")
Schema.drop_if_exists("users")
Best Practices
🔄 Versioning
- Una migrazione per modifica
- Nomi descrittivi e ordinati
- Sempre testare rollback
- Backup prima di modifiche rischiose
🚀 Performance
- Indici per query frequenti
- Foreign keys per integrità
- Batch insert per seeders
- Monitorare tempi esecuzione
🔒 Sicurezza
- Controlli esistenza prima modifiche
- Transazioni per operazioni multiple
- Validazione dati nei seeders
- Rollback testati in staging
🎉 Perfetto! Ora sai gestire completamente lo schema del database con le migrazioni.
Scopri le funzionalità Avanzate di Flux
per portare la tua applicazione al livello successivo.