Sistema di Cache
Cache intelligente e distribuita per prestazioni ottimali
Performance First: Il sistema di cache di Flux supporta Redis, Memcached,
cache distribuita e strategie avanzate per ottimizzare le prestazioni delle tue applicazioni.
Configurazione Cache
Setup dei driver di cache disponibili:
# config/cache.flux
# Cache-store configuration consumed by the Flux cache subsystem.
CACHE_CONFIG = {
    # Driver used when no store is named explicitly
    "default": "redis",
    # Available stores
    "stores": {
        "file": {
            "driver": "file",
            "path": "storage/cache",
            "prefix": "flux_cache"
        },
        "redis": {
            "driver": "redis",
            "connection": "cache",  # From config/database.flux
            "prefix": "flux_cache:",
            "serializer": "pickle"  # pickle, json, msgpack
        },
        "memcached": {
            "driver": "memcached",
            "servers": [
                {"host": "127.0.0.1", "port": 11211, "weight": 100}
            ],
            "prefix": "flux_cache:",
            "options": {
                "compression": True,
                "serializer": "pickle"
            }
        },
        "database": {
            "driver": "database",
            "table": "cache",
            "connection": None,  # Default DB connection
            "prefix": "flux_cache"
        },
        "memory": {
            "driver": "memory",
            "max_size": 1000  # Max items held in memory
        },
        "distributed": {
            "driver": "distributed",
            "stores": ["redis", "memcached"],
            "strategy": "replicate"  # replicate, sharding
        }
    },
    # Global key prefix
    "prefix": env("CACHE_PREFIX", "flux"),
    # Serialization options
    "serialize": {
        "compress": True,
        "algorithm": "gzip"
    }
}
Operazioni Cache Base
Operazioni fondamentali per gestire la cache:
# Import the cache facade
from flux.cache import Cache

# Get - retrieve a value (optionally with a fallback default)
value = Cache.get("key")
value = Cache.get("key", "default_value")

# Put - store a value with a TTL
Cache.put("key", "value", seconds=3600)  # 1 hour
Cache.put("key", "value", minutes=60)
Cache.put("key", "value", hours=1)
Cache.put("key", "value", days=1)

# Forever - store without expiration
Cache.forever("key", "value")

# Remember - get, or compute-and-store on a miss.
# The callback must come before keyword arguments: a positional
# argument after `seconds=3600` is a Python syntax error.
value = Cache.remember("key", lambda: expensive_operation(), seconds=3600)

# Remember forever
value = Cache.remember_forever("key", lambda: get_config_data())

# Has - existence check
if Cache.has("key"):
    print("Key exists")

# Forget - remove a key
Cache.forget("key")

# Flush - clear the entire cache
Cache.flush()

# Pull - get and remove in one call
value = Cache.pull("key", "default")

# Increment/Decrement numeric counters
Cache.increment("counter")
Cache.increment("counter", 5)
Cache.decrement("counter")
Cache.decrement("counter", 2)

# Batch operations
Cache.put_many({
    "key1": "value1",
    "key2": "value2",
    "key3": "value3"
}, seconds=3600)
# Missing keys come back as None (key3 was just stored above, so the
# original example's "key3: None" was wrong — use a truly absent key).
values = Cache.many(["key1", "key2", "missing_key"])
# {"key1": "value1", "key2": "value2", "missing_key": None}
Cache.forget_many(["key1", "key2", "key3"])
Cache Tags
Organizza e gestisci cache correlate con i tag:
# Tagged cache writes
Cache.tags(["users", "posts"]).put("user_posts_1", data, hours=2)
Cache.tags(["users"]).put("user_profile_1", profile, hours=1)
Cache.tags(["posts"]).put("post_comments_1", comments, minutes=30)

# Tagged reads
user_posts = Cache.tags(["users", "posts"]).get("user_posts_1")

# Remember with tags.
# The callback must come before keyword arguments: a positional
# argument after `hours=6` is a Python syntax error.
popular_posts = Cache.tags(["posts", "popular"]).remember(
    "popular_posts",
    lambda: Post.where("views", ">", 1000).order_by("views", "desc").limit(10).get(),
    hours=6,
)

# Flush by tag
Cache.tags(["users"]).flush()  # Removes everything tagged "users"
Cache.tags(["posts", "popular"]).flush()  # Removes entries carrying both tags

# Batch operations with tags
Cache.tags(["products"]).put_many({
    "product_1": product_1_data,
    "product_2": product_2_data,
    "featured_products": featured_list
}, hours=4)
# Helper per gestione tag
class CacheTagManager:
    """Helpers that flush groups of related, tagged cache entries."""

    @staticmethod
    def _flush_tags(tags):
        # Single flush path shared by every invalidation helper.
        Cache.tags(tags).flush()

    @staticmethod
    def invalidate_user_cache(user_id):
        """Invalidate every cache entry related to one user."""
        CacheTagManager._flush_tags([f"user_{user_id}", "users"])

    @staticmethod
    def invalidate_post_cache(post_id):
        """Invalidate the cache of a single post."""
        CacheTagManager._flush_tags([f"post_{post_id}", "posts"])

    @staticmethod
    def invalidate_category_cache(category_id):
        """Invalidate the cache of a single category."""
        CacheTagManager._flush_tags([f"category_{category_id}", "categories"])
# Usage inside Model events
class Post(Model):
    def saved(self):
        """Model hook: invalidate related caches after a post is saved."""
        # Invalidate this post's and its category's entries
        CacheTagManager.invalidate_post_cache(self.id)
        CacheTagManager.invalidate_category_cache(self.category_id)
        # Invalidate the broader listings as well
        Cache.tags(["posts", "popular"]).flush()
    def deleted(self):
        """Model hook: clear this post's caches after deletion."""
        # Drop caches once the post is removed
        CacheTagManager.invalidate_post_cache(self.id)
Cache Strategies
Strategie avanzate di caching:
# Cache-Aside Pattern
class UserService:
    """User lookups demonstrating cache-aside / write-through / write-around."""

    # Sentinel distinguishing "not cached" from a cached falsy value.
    _MISSING = object()

    def get_user(self, user_id):
        """Cache-aside: serve from cache, fall back to the DB on a miss.

        Uses a sentinel default so a legitimately cached falsy value is
        not mistaken for a cache miss (the original `if user:` check
        re-queried the DB whenever the cached value was falsy).
        """
        cache_key = f"user_{user_id}"
        cached = Cache.get(cache_key, self._MISSING)
        if cached is not self._MISSING:
            return cached
        # Cache miss - load from the database
        user = User.find(user_id)
        if user:
            # Store for the next reader
            Cache.put(cache_key, user, hours=1)
        return user

    def update_user(self, user_id, data):
        """Write-through: update the DB, then refresh the cache immediately."""
        user = User.find(user_id)
        user.update(data)
        cache_key = f"user_{user_id}"
        Cache.put(cache_key, user, hours=1)
        return user

    def delete_user(self, user_id):
        """Write-around: delete from the DB and drop the cached copy."""
        User.destroy(user_id)
        Cache.forget(f"user_{user_id}")
# Write-Behind Pattern
class WriteBackCache:
    """Caches writes immediately and persists them to the DB in batches."""

    def __init__(self):
        self.pending_writes = []
        self.max_batch_size = 100
        # NOTE(review): flush_interval is not referenced anywhere in this
        # class — presumably a periodic scheduler is expected to call
        # flush_to_database(); confirm against the task runner.
        self.flush_interval = 300  # 5 minutes

    def put(self, key, value, ttl=3600):
        """Write to the cache and queue the value for deferred DB write."""
        Cache.put(key, value, seconds=ttl)
        entry = {
            "key": key,
            "value": value,
            "timestamp": time.time()
        }
        self.pending_writes.append(entry)
        # Persist as soon as a full batch has accumulated
        if len(self.pending_writes) >= self.max_batch_size:
            self.flush_to_database()

    def flush_to_database(self):
        """Persist every pending entry to the database, then clear the queue."""
        if not self.pending_writes:
            return
        for pending in self.pending_writes:
            self.write_to_db(pending["key"], pending["value"])
        self.pending_writes.clear()

    def write_to_db(self, key, value):
        """Store-specific DB write; meant to be overridden."""
        pass
# Cache Warming
class CacheWarmer:
    """Pre-loads frequently requested data so first requests hit warm caches."""
    def warm_popular_content(self):
        """Pre-load popular posts, category counts and active-user stats."""
        # Top 20 most viewed posts
        popular_posts = Post.where("views", ">", 1000)\
            .order_by("views", "desc")\
            .limit(20).get()
        Cache.tags(["posts", "popular"]).put(
            "popular_posts",
            popular_posts,
            hours=6
        )
        # Categories with their post counts
        categories = Category.with_count("posts").get()
        Cache.tags(["categories"]).put(
            "categories_with_count",
            categories,
            hours=12
        )
        # Count of users active within the last 7 days
        active_users = User.where("last_login", ">", date_sub(now(), days=7))\
            .count()
        Cache.put("stats_active_users", active_users, hours=1)
    def warm_user_data(self, user_id):
        """Pre-load one user's record, posts and profile under a per-user tag."""
        # NOTE(review): `User.with(...)` is not valid Python (`with` is a
        # reserved keyword) — presumably Flux pseudo-code for eager
        # loading; confirm the framework's real API name.
        user = User.with(["posts", "profile"]).find(user_id)
        Cache.tags([f"user_{user_id}"]).put_many({
            f"user_{user_id}": user,
            f"user_posts_{user_id}": user.posts,
            f"user_profile_{user_id}": user.profile
        }, hours=2)
# Circuit Breaker Pattern
class CacheCircuitBreaker:
    """Stops hammering a failing cache backend.

    After `failure_threshold` consecutive failures the breaker opens and
    rejects calls; after `timeout` seconds it half-opens and lets a probe
    call through. Any success fully closes it again.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds to stay OPEN before probing
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Invoke `func` through the breaker.

        Raises Exception("Circuit breaker is OPEN") while open; otherwise
        propagates whatever `func` raises.
        """
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                # Cool-down elapsed: allow a single probe call
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.on_failure()
            # Bare `raise` preserves the original traceback
            # (the original `raise e` re-raised from this frame).
            raise
        self.on_success()
        return result

    def on_success(self):
        """Reset the failure count and close the breaker."""
        self.failure_count = 0
        self.state = "CLOSED"

    def on_failure(self):
        """Record a failure; open the breaker once the threshold is hit."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
Query Caching
Cache automatica per query database:
# Query caching on Models
class User(Model):
    def get_posts_cached(self):
        """This user's posts, cached for 30 minutes."""
        return self.posts().remember(minutes=30)

    def get_popular_posts(self):
        """This user's most viewed posts, cached for 2 hours under tags.

        The callback must precede keyword arguments — the original put
        the lambda after `hours=2`, which is a Python syntax error.
        """
        return Cache.tags(["posts", f"user_{self.id}"]).remember(
            f"user_popular_posts_{self.id}",
            lambda: self.posts()
                .where("views", ">", 100)
                .order_by("views", "desc")
                .limit(10)
                .get(),
            hours=2,
        )
# Query Builder with cache
users = User.where("active", True)\
    .cache(minutes=15)\
    .get()
# Tagged cache on queries
posts = Post.where("published", True)\
    .cache(minutes=30, tags=["posts", "published"])\
    .get()
# Conditional caching (applied only when should_cache is truthy)
users = User.when(should_cache, lambda q: q.cache(minutes=10))\
    .get()
# Custom cached-query wrapper
class CachedQuery:
    """Wraps a query builder so its results are served through the cache.

    In the original, every `remember(...)` call passed the lambda after
    `seconds=self.ttl` — a positional argument after a keyword argument
    is a Python syntax error; the callback now comes first.
    """

    def __init__(self, query, cache_key, ttl=3600, tags=None):
        self.query = query
        self.cache_key = cache_key
        self.ttl = ttl  # seconds
        self.tags = tags or []

    def _store(self):
        # Tagged cache when tags were given, the plain cache otherwise.
        return Cache.tags(self.tags) if self.tags else Cache

    def get(self):
        """Run the query with caching (full result set)."""
        return self._store().remember(
            self.cache_key,
            lambda: self.query.get(),
            seconds=self.ttl,
        )

    def first(self):
        """First row with caching (separate `_first` key)."""
        return self._store().remember(
            f"{self.cache_key}_first",
            lambda: self.query.first(),
            seconds=self.ttl,
        )

    def count(self):
        """Row count with caching (separate `_count` key)."""
        return self._store().remember(
            f"{self.cache_key}_count",
            lambda: self.query.count(),
            seconds=self.ttl,
        )
# Using the cached-query wrapper
def get_active_users():
    """Active users who logged in within the last 30 days, cached 30 min."""
    recent_login_query = (
        User.where("active", True)
        .where("last_login", ">", date_sub(now(), days=30))
    )
    return CachedQuery(
        recent_login_query,
        "active_users_last_30_days",
        ttl=1800,  # 30 minutes
        tags=["users", "active"],
    ).get()
# Automatic invalidation
# NOTE(review): this re-defines `User` — in the guide it is a second,
# standalone illustrative snippet, not meant to coexist with the class above.
class User(Model):
    def saved(self):
        # Flush everything tagged "users" whenever a user changes
        Cache.tags(["users"]).flush()
    def deleted(self):
        # Same invalidation on delete
        Cache.tags(["users"]).flush()
Response Caching
Cache delle response HTTP complete:
# Response caching middleware
class ResponseCacheMiddleware(Middleware):
    """Caches entire successful HTTP responses keyed by the request."""

    def handle(self, request, next, minutes=60, tags=None):
        """Serve a cached response when present, otherwise cache the fresh one."""
        cache_key = self.generate_cache_key(request)
        # Middleware parameters arrive as a comma-separated string
        tags = tags.split(",") if tags else []
        cache_instance = Cache.tags(tags) if tags else Cache
        cached_response = cache_instance.get(cache_key)
        if cached_response:
            return self.build_cached_response(cached_response)
        # Run the rest of the pipeline
        response = next(request)
        # Only successful responses are cached
        if response.status == 200:
            cache_data = {
                "content": response.content,
                "headers": dict(response.headers),
                "status": response.status
            }
            cache_instance.put(cache_key, cache_data, minutes=int(minutes))
        return response

    def generate_cache_key(self, request):
        """Build a stable cache key from the request identity.

        Uses SHA-256 instead of the builtin hash(): hash() is randomized
        per process (PYTHONHASHSEED), so its values cannot be shared
        across worker processes or survive restarts — which defeats a
        shared response cache.
        """
        import hashlib
        components = [
            request.method,
            request.path,
            request.query_string,
            request.header("Accept-Language", ""),
            str(Auth.id() if Auth.check() else "guest")
        ]
        digest = hashlib.sha256(":".join(components).encode("utf-8")).hexdigest()
        return f"response_cache:{digest}"

    def build_cached_response(self, cache_data):
        """Rebuild a Response from cached content/headers/status."""
        response = Response(cache_data["content"], cache_data["status"])
        for key, value in cache_data["headers"].items():
            response.headers[key] = value
        # Mark the response as served from cache
        response.headers["X-Cache"] = "HIT"
        return response
# Usage on routes
@route("/blog")
@middleware("cache:30,blog,posts")  # 30 min, tags: blog,posts
def blog_index():
    """Blog index page, response-cached for 30 minutes."""
    # NOTE(review): `.with("author")` is not valid Python (`with` is a
    # reserved keyword) — Flux pseudo-code; confirm the eager-loading API.
    posts = Post.published().with("author").order_by("created_at", "desc").get()
    return render("blog.index", {"posts": posts})
@route("/api/stats")
@middleware("cache:5,api,stats")  # 5 min, tags: api,stats
def api_stats():
    """Public counters endpoint, response-cached for 5 minutes."""
    return json_response({
        "users_count": User.count(),
        "posts_count": Post.count(),
        "comments_count": Comment.count()
    })
# Conditional caching
class ConditionalCacheMiddleware(Middleware):
    """Applies response caching only to anonymous, plain GET requests."""

    def handle(self, request, next, minutes=60):
        # Skip caching for authenticated users, for anything but GET,
        # and when the request explicitly opts out via ?no_cache.
        skip_cache = (
            Auth.check()
            or request.method != "GET"
            or request.has("no_cache")
        )
        if skip_cache:
            return next(request)
        # Otherwise delegate to the standard response cache
        return ResponseCacheMiddleware().handle(request, next, minutes)
# ETag caching
class ETagMiddleware(Middleware):
    """Adds an ETag to successful responses and answers 304 on a match."""

    def handle(self, request, next):
        response = next(request)
        if response.status == 200:
            # Derive the ETag from the body with MD5: the builtin hash()
            # is randomized per process, so its ETags would never match
            # across workers or restarts and clients would never get 304s.
            import hashlib
            body = response.content
            if not isinstance(body, bytes):
                # assumes non-bytes content stringifies to the sent body
                # — TODO confirm against the Response implementation
                body = str(body).encode("utf-8")
            etag = f'"{hashlib.md5(body).hexdigest()}"'
            response.headers["ETag"] = etag
            # Honor the client's cached copy
            if request.header("If-None-Match") == etag:
                return Response("", 304)  # Not Modified
        return response
# Fragment caching nei template
# {% cache "sidebar", minutes=10, tags=["sidebar", "widgets"] %}
# {# Contenuto costoso da generare #}
# {% include "partials.sidebar" %}
# {% endcache %}
Distributed Caching
Cache distribuita per applicazioni multi-server:
# Distributed cache configuration
class DistributedCache:
    """Routes cache operations across several stores.

    strategy="replicate": every store holds every key; reads fall back
    across stores. strategy="sharding": each key lives on one store
    chosen via consistent hashing.
    """

    def __init__(self, stores, strategy="replicate"):
        self.stores = stores
        self.strategy = strategy
        self.hash_ring = ConsistentHashRing() if strategy == "sharding" else None

    def get(self, key):
        """Read a key; returns None when it is nowhere to be found."""
        if self.strategy == "replicate":
            # Try each store in order, skipping failing ones
            for store in self.stores:
                try:
                    value = store.get(key)
                    if value is not None:
                        return value
                except Exception:
                    continue
            return None
        elif self.strategy == "sharding":
            store = self.hash_ring.get_node(key)
            # Guard: get_node returns None on an empty ring — the
            # original crashed with AttributeError here.
            return store.get(key) if store is not None else None
        return None  # unknown strategy (previously an implicit None)

    def put(self, key, value, ttl=3600):
        """Write according to the configured strategy."""
        if self.strategy == "replicate":
            # Best-effort replication to every store
            for store in self.stores:
                try:
                    store.put(key, value, ttl)
                except Exception as e:
                    log_error(f"Failed to replicate to store: {e}")
        elif self.strategy == "sharding":
            store = self.hash_ring.get_node(key)
            # Skip the write when the ring has no nodes yet
            if store is not None:
                store.put(key, value, ttl)

    def forget(self, key):
        """Remove the key from every store, ignoring store errors."""
        for store in self.stores:
            try:
                store.forget(key)
            except Exception:
                pass
# Consistent hash ring for sharding
class ConsistentHashRing:
    """Maps keys to nodes with consistent hashing (virtual replicas)."""

    def __init__(self, replicas=100):
        self.replicas = replicas  # virtual points per node
        self.ring = {}  # point hash -> node
        self.sorted_keys = []
        self.nodes = []

    @staticmethod
    def _stable_hash(value):
        """Process-independent hash of `value`.

        The builtin hash() is randomized per interpreter process
        (PYTHONHASHSEED), so two servers would shard the same key to
        different nodes; an MD5-based hash gives every process the
        identical ring.
        """
        import hashlib
        return int(hashlib.md5(str(value).encode("utf-8")).hexdigest(), 16)

    def add_node(self, node):
        """Add a node (and its virtual replicas) to the ring."""
        self.nodes.append(node)
        for i in range(self.replicas):
            self.ring[self._stable_hash(f"{node}:{i}")] = node
        self.sorted_keys = sorted(self.ring)

    def remove_node(self, node):
        """Remove a node and all of its virtual replicas."""
        self.nodes.remove(node)
        for i in range(self.replicas):
            del self.ring[self._stable_hash(f"{node}:{i}")]
        self.sorted_keys = sorted(self.ring)

    def get_node(self, key):
        """Return the node owning `key`, or None for an empty ring.

        bisect gives the first ring point >= the key's hash in O(log n),
        replacing the original linear scan over sorted_keys.
        """
        if not self.ring:
            return None
        import bisect
        idx = bisect.bisect_left(self.sorted_keys, self._stable_hash(key))
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around to the first point on the ring
        return self.ring[self.sorted_keys[idx]]
# Cache replication with conflict resolution
class ReplicatedCache:
    """Primary/replica cache: reads fall back to replicas, writes fan out."""

    def __init__(self, primary_store, replica_stores):
        self.primary = primary_store
        self.replicas = replica_stores

    def get(self, key):
        """Read from the primary; on a primary error, try each replica."""
        try:
            return self.primary.get(key)
        except Exception:
            pass
        for replica in self.replicas:
            try:
                value = replica.get(key)
            except Exception:
                continue
            if value is not None:
                # Heal the primary in the background with the found value
                self.async_repair(key, value)
                return value
        return None

    def put(self, key, value, ttl=3600):
        """Write to the primary synchronously, then replicate off-thread."""
        self.primary.put(key, value, ttl)
        self.async_replicate(key, value, ttl)

    def async_replicate(self, key, value, ttl):
        """Fan the write out to every replica on a background thread."""
        def _fan_out():
            for replica in self.replicas:
                try:
                    replica.put(key, value, ttl)
                except Exception as e:
                    log_error(f"Replication failed: {e}")
        threading.Thread(target=_fan_out).start()

    def async_repair(self, key, value):
        """Write a replica-sourced value back to the primary, off-thread."""
        def _repair():
            try:
                self.primary.put(key, value, 3600)  # Default TTL
            except Exception as e:
                log_error(f"Cache repair failed: {e}")
        threading.Thread(target=_repair).start()
# Cache cluster awareness
class ClusterAwareCache:
    """Local cache writes that broadcast invalidations to cluster peers."""

    def __init__(self):
        self.local_cache = {}
        self.cluster_nodes = self.discover_cluster_nodes()

    def put(self, key, value, ttl=3600):
        """Store locally, then ask the other nodes to drop the key."""
        Cache.put(key, value, ttl)
        self.notify_cluster_invalidation(key)

    def notify_cluster_invalidation(self, key):
        """Send an invalidation message to every known peer, best effort."""
        for node in self.cluster_nodes:
            try:
                # Delivery could be HTTP or a message queue
                self.send_invalidation_message(node, key)
            except Exception as e:
                log_error(f"Failed to notify node {node}: {e}")

    def handle_invalidation_message(self, key):
        """A peer asked us to drop this key from our cache."""
        Cache.forget(key)

    def discover_cluster_nodes(self):
        """Find the other cluster nodes (Consul, etcd, a DB table, ...)."""
        # Placeholder: service discovery is deployment-specific.
        return []
Cache Monitoring
Monitoraggio e analisi delle performance della cache:
# Cache monitoring and stats
class CacheMonitor:
    """Collects hit/miss/put counters and derives simple rates."""

    def __init__(self):
        self.stats = {
            "hits": 0,
            "misses": 0,
            "puts": 0,
            "deletes": 0,
            "flushes": 0
        }
        self.start_time = time.time()

    def record_hit(self, key):
        """Count a cache hit."""
        self.stats["hits"] += 1
        log_debug(f"Cache HIT: {key}")

    def record_miss(self, key):
        """Count a cache miss."""
        self.stats["misses"] += 1
        log_debug(f"Cache MISS: {key}")

    def record_put(self, key, size=0):
        """Count a cache write (payload size in bytes, when known)."""
        self.stats["puts"] += 1
        log_debug(f"Cache PUT: {key} ({size} bytes)")

    def get_hit_ratio(self):
        """Hit percentage over all reads (0 when nothing was read yet)."""
        total = self.stats["hits"] + self.stats["misses"]
        return (self.stats["hits"] / total * 100) if total > 0 else 0

    def get_stats(self):
        """Full snapshot: counters plus hit ratio and request rate."""
        uptime = time.time() - self.start_time
        # Guard against a zero uptime (coarse clocks can return the same
        # timestamp twice) — the original raised ZeroDivisionError when
        # stats were read immediately after construction/reset.
        safe_uptime = uptime if uptime > 0 else 1e-9
        reads = self.stats["hits"] + self.stats["misses"]
        return {
            **self.stats,
            "hit_ratio": round(self.get_hit_ratio(), 2),
            "uptime_seconds": round(uptime),
            "requests_per_second": round(reads / safe_uptime, 2)
        }

    def reset_stats(self):
        """Zero every counter and restart the uptime clock."""
        self.stats = {key: 0 for key in self.stats}
        self.start_time = time.time()
# Monitored cache wrapper
class MonitoredCache:
    """Decorates a cache store with hit/miss/put instrumentation."""

    def __init__(self, cache_store):
        self.cache = cache_store
        self.monitor = CacheMonitor()

    def get(self, key, default=None):
        """Read through the store, recording a hit or a miss."""
        value = self.cache.get(key)
        if value is None:
            self.monitor.record_miss(key)
            return default
        self.monitor.record_hit(key)
        return value

    def put(self, key, value, ttl=3600):
        """Write through the store, recording the payload size."""
        payload_bytes = len(str(value).encode('utf-8'))
        self.cache.put(key, value, ttl)
        self.monitor.record_put(key, payload_bytes)

    def remember(self, key, ttl, callback):
        """Get, or compute with `callback` and store, on a miss."""
        cached = self.get(key)
        if cached is not None:
            return cached
        fresh = callback()
        self.put(key, fresh, ttl)
        return fresh
# Performance analyzer
class CachePerformanceAnalyzer:
    """Analyzes recorded cache activity and suggests optimizations."""

    def __init__(self):
        self.slow_queries = []
        self.hot_keys = {}  # key -> access count
        self.memory_usage = []  # MB samples, oldest first
        # Recorded operations ({"duration_ms": ..., ...}). The original
        # called get_recent_operations() without ever defining it, which
        # raised AttributeError on first use.
        self.operations = []

    def get_recent_operations(self):
        """Operations recorded so far (callers append to `operations`)."""
        return self.operations

    def analyze_slow_operations(self, threshold_ms=100):
        """Operations slower than the threshold, slowest first."""
        slow_operations = [
            operation
            for operation in self.get_recent_operations()
            if operation["duration_ms"] > threshold_ms
        ]
        return sorted(slow_operations, key=lambda x: x["duration_ms"], reverse=True)

    def analyze_hot_keys(self, min_accesses=100):
        """Keys accessed at least `min_accesses` times."""
        return {k: v for k, v in self.hot_keys.items() if v >= min_accesses}

    def analyze_memory_usage(self):
        """Current/peak/average memory plus a coarse first-vs-last trend."""
        if not self.memory_usage:
            return {}
        return {
            "current_mb": self.memory_usage[-1],
            "peak_mb": max(self.memory_usage),
            "average_mb": sum(self.memory_usage) / len(self.memory_usage),
            "trend": "increasing" if self.memory_usage[-1] > self.memory_usage[0] else "decreasing"
        }

    def generate_report(self):
        """Full report: slow ops, hot keys, memory, recommendations."""
        return {
            "slow_operations": self.analyze_slow_operations(),
            "hot_keys": self.analyze_hot_keys(),
            "memory_analysis": self.analyze_memory_usage(),
            "recommendations": self.generate_recommendations()
        }

    def generate_recommendations(self):
        """Heuristic tuning suggestions based on the collected data."""
        recommendations = []
        # NOTE(review): a freshly constructed CacheMonitor always reports
        # a 0% hit ratio, so this first recommendation always fires —
        # presumably a shared/global monitor is intended; confirm.
        hit_ratio = CacheMonitor().get_hit_ratio()
        if hit_ratio < 70:
            recommendations.append("Consider increasing cache TTL or reviewing cache strategy")
        hot_keys = self.analyze_hot_keys()
        if len(hot_keys) > 10:
            recommendations.append("Consider implementing cache warming for hot keys")
        memory_analysis = self.analyze_memory_usage()
        if memory_analysis.get("trend") == "increasing":
            recommendations.append("Monitor memory usage - consider cache eviction policies")
        return recommendations
# Cache dashboard
@route("/admin/cache/dashboard")
@middleware("auth", "admin")
def cache_dashboard():
    """Admin view: cache stats, performance report, optional Redis info."""
    # NOTE(review): fresh monitor/analyzer instances start with empty
    # stats — presumably shared instances are intended here; confirm.
    monitor = CacheMonitor()
    analyzer = CachePerformanceAnalyzer()
    return render("admin.cache_dashboard", {
        "stats": monitor.get_stats(),
        "performance": analyzer.generate_report(),
        "redis_info": get_redis_info() if config("cache.default") == "redis" else None
    })
@route("/admin/cache/flush", methods=["POST"])
@middleware("auth", "admin")
def flush_cache():
    """Admin action: wipe the whole cache, then redirect with a flash message."""
    Cache.flush()
    # NOTE(review): `.with(...)` is not valid Python (`with` is a reserved
    # keyword) — Flux pseudo-code; confirm the real flash-message API.
    return redirect("/admin/cache/dashboard").with("success", "Cache flushed successfully")
def get_redis_info():
    """Return a small dict of Redis server stats, or None on any failure."""
    wanted = (
        "used_memory_human",
        "connected_clients",
        "total_commands_processed",
        "keyspace_hits",
        "keyspace_misses",
    )
    try:
        client = Cache.store("redis").get_redis()
        info = client.info()
        return {field: info.get(field) for field in wanted}
    except Exception:
        # Deliberately best-effort: the dashboard renders without Redis info
        return None
Esempio Completo: E-commerce Cache
# Sistema di cache completo per e-commerce
class EcommerceCacheService:
def __init__(self):
self.cache = Cache
self.monitor = CacheMonitor()
def get_product(self, product_id):
"""Prodotto con cache multi-layer"""
# Layer 1: Cache locale (in-memory)
local_key = f"product_local_{product_id}"
product = self.get_from_local_cache(local_key)
if product:
return product
# Layer 2: Cache distribuita (Redis)
redis_key = f"product_{product_id}"
product = self.cache.tags(["products"]).get(redis_key)
if product:
# Salva in cache locale
self.set_local_cache(local_key, product, 300) # 5 min
return product
# Layer 3: Database
product = Product.with(["category", "images", "reviews"]).find(product_id)
if product:
# Cache con TTL basato su popolarità
ttl = self.calculate_ttl(product)
self.cache.tags(["products", f"category_{product.category_id}"])\
.put(redis_key, product, seconds=ttl)
self.set_local_cache(local_key, product, 300)
return product
def get_category_products(self, category_id, page=1, filters=None):
"""Prodotti categoria con cache intelligente"""
# Genera chiave cache basata su filtri
cache_key = self.generate_category_cache_key(category_id, page, filters)
# Controlla cache
products = self.cache.tags(["products", f"category_{category_id}"])\
.get(cache_key)
if not products:
# Carica dal database
query = Product.where("category_id", category_id)\
.where("active", True)
# Applica filtri
if filters:
query = self.apply_filters(query, filters)
products = query.paginate(page=page, per_page=20)
# Cache con TTL variabile
ttl = 3600 if not filters else 1800 # Meno cache per filtri
self.cache.tags(["products", f"category_{category_id}"])\
.put(cache_key, products, seconds=ttl)
return products
def get_cart(self, user_id):
"""Carrello utente con cache ottimizzata"""
cache_key = f"cart_{user_id}"
# Cache molto breve per carrello (dati critici)
cart = self.cache.tags([f"user_{user_id}", "carts"])\
.get(cache_key)
if not cart:
cart = Cart.where("user_id", user_id)\
.with("items.product")\
.first()
if cart:
# Cache breve per dati transazionali
self.cache.tags([f"user_{user_id}", "carts"])\
.put(cache_key, cart, seconds=300) # 5 min
return cart
def get_user_recommendations(self, user_id):
"""Raccomandazioni utente con cache personalizzata"""
cache_key = f"recommendations_{user_id}"
recommendations = self.cache.tags([f"user_{user_id}", "recommendations"])\
.get(cache_key)
if not recommendations:
# Algoritmo raccomandazioni (costoso)
recommendations = self.calculate_recommendations(user_id)
# Cache lunga per raccomandazioni
self.cache.tags([f"user_{user_id}", "recommendations"])\
.put(cache_key, recommendations, hours=6)
return recommendations
def get_homepage_data(self):
"""Dati homepage con cache warming"""
cache_key = "homepage_data"
data = self.cache.tags(["homepage"]).get(cache_key)
if not data:
# Carica dati homepage
data = {
"featured_products": Product.where("featured", True)\
.limit(8).get(),
"categories": Category.where("show_homepage", True)\
.with_count("products")\
.get(),
"popular_products": Product.order_by("views", "desc")\
.limit(12).get(),
"recent_reviews": Review.with("user", "product")\
.order_by("created_at", "desc")\
.limit(6).get()
}
# Cache homepage per 1 ora
self.cache.tags(["homepage"]).put(cache_key, data, hours=1)
return data
def invalidate_product_cache(self, product_id):
"""Invalida cache prodotto e correlate"""
product = Product.find(product_id)
# Invalida cache specifiche
self.cache.forget(f"product_{product_id}")
self.cache.tags([f"category_{product.category_id}"]).flush()
# Invalida cache correlate
if product.featured:
self.cache.tags(["homepage"]).flush()
# Invalida raccomandazioni utenti che hanno visto questo prodotto
user_ids = ViewedProduct.where("product_id", product_id)\
.pluck("user_id")
for user_id in user_ids:
self.cache.tags([f"user_{user_id}", "recommendations"]).flush()
def warm_cache(self):
"""Pre-carica cache importante"""
# Homepage data
self.get_homepage_data()
# Prodotti popolari
popular_products = Product.order_by("views", "desc").limit(50).get()
for product in popular_products:
self.get_product(product.id)
# Categorie principali
main_categories = Category.where("level", 1).get()
for category in main_categories:
self.get_category_products(category.id)
log_info("Cache warming completed")
def calculate_ttl(self, product):
"""Calcola TTL basato su popolarità prodotto"""
if product.views > 10000:
return 7200 # 2 ore per prodotti molto popolari
elif product.views > 1000:
return 3600 # 1 ora per prodotti popolari
else:
return 1800 # 30 min per prodotti normali
def generate_category_cache_key(self, category_id, page, filters):
"""Genera chiave cache per categoria"""
components = [f"category_{category_id}", f"page_{page}"]
if filters:
# Ordina filtri per chiave consistente
sorted_filters = sorted(filters.items())
filter_string = "_".join([f"{k}_{v}" for k, v in sorted_filters])
components.append(filter_string)
return "_".join(components)
def apply_filters(self, query, filters):
"""Applica filtri alla query"""
if "price_min" in filters:
query = query.where("price", ">=", filters["price_min"])
if "price_max" in filters:
query = query.where("price", "<=", filters["price_max"])
if "brand" in filters:
query = query.where("brand", filters["brand"])
if "rating" in filters:
query = query.where("average_rating", ">=", filters["rating"])
return query
def calculate_recommendations(self, user_id):
"""Algoritmo raccomandazioni (placeholder)"""
# Implementa algoritmo collaborative filtering
# o content-based recommendations
return Product.limit(10).get()
# Integration with model events
class Product(Model):
    def saved(self):
        """Model hook: drop related caches after a product is saved."""
        # Invalidate caches whenever the product changes
        cache_service = EcommerceCacheService()
        cache_service.invalidate_product_cache(self.id)
    def deleted(self):
        """Model hook: drop related caches after deletion."""
        # Clean up caches once the product is removed
        cache_service = EcommerceCacheService()
        cache_service.invalidate_product_cache(self.id)
# Scheduled task for cache warming
@task("hourly")
def warm_ecommerce_cache():
    """Hourly job: pre-load homepage, popular products and categories."""
    cache_service = EcommerceCacheService()
    cache_service.warm_cache()
# Performance monitoring
@task("daily")
def analyze_cache_performance():
    """Daily job: build the cache performance report and mail it to admins."""
    analyzer = CachePerformanceAnalyzer()
    report = analyzer.generate_report()
    # E-mail the report to the configured admin address
    send_mail(
        config("admin.email"),
        "Daily Cache Performance Report",
        "emails.cache_report",
        {"report": report}
    )
Best Practices
🎯 Strategia
- Cache dati costosi da calcolare
- Usa TTL appropriati per tipo di dato
- Implementa cache warming per dati critici
- Monitora hit ratio e performance
🔧 Tecnica
- Usa tag per invalidazione intelligente
- Implementa cache multi-layer
- Previeni il cache stampede (richieste concorrenti che rigenerano lo stesso dato scaduto)
- Comprimi dati grandi
📊 Monitoring
- Traccia hit/miss ratio
- Monitora memory usage
- Identifica hot keys
- Analizza performance trends
⚡ Fantastico! Ora hai un sistema di cache completo e performante.
Completa il tuo percorso con le guide di Deployment
per mettere la tua applicazione in produzione.