Deployment
Professional deployment of Flux applications in production
Production Ready: Flux supports modern deployments with Docker, Kubernetes,
automated CI/CD, and full monitoring for scalable, reliable applications.
Production Preparation
Optimize the application for the production environment:
# config/app.flux
APP_CONFIG = {
    "name": "My Flux App",
    "env": "production",
    "debug": False,
    "url": "https://myapp.com",
    # Security
    "key": env("APP_KEY"),  # 32 random characters
    "cipher": "AES-256-CBC",
    # Timezone
    "timezone": "Europe/Rome",
    "locale": "it",
    # Logging
    "log_level": "warning",
    "log_channel": "stack",
    # Performance
    "cache_compiled_templates": True,
    "optimize_autoloader": True,
    "route_cache": True,
    "config_cache": True
}
# .env.production
APP_ENV=production
APP_DEBUG=false
APP_KEY=base64:your-32-character-random-key-here
# Database
DB_CONNECTION=mysql
DB_HOST=db.myapp.com
DB_PORT=3306
DB_DATABASE=myapp_prod
DB_USERNAME=myapp_user
DB_PASSWORD=secure_password
# Cache
CACHE_DRIVER=redis
REDIS_HOST=redis.myapp.com
REDIS_PASSWORD=redis_password
REDIS_PORT=6379
# Queue
QUEUE_CONNECTION=redis
QUEUE_FAILED_DRIVER=database
# Mail
MAIL_MAILER=smtp
MAIL_HOST=smtp.mailgun.org
MAIL_PORT=587
MAIL_USERNAME=postmaster@mg.myapp.com
MAIL_PASSWORD=mailgun_password
MAIL_ENCRYPTION=tls
# Monitoring
SENTRY_DSN=https://your-sentry-dsn@sentry.io/project
NEW_RELIC_LICENSE_KEY=your-newrelic-key
# Storage
AWS_ACCESS_KEY_ID=your-aws-key
AWS_SECRET_ACCESS_KEY=your-aws-secret
AWS_DEFAULT_REGION=eu-west-1
AWS_BUCKET=myapp-production
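The `APP_KEY` value in the file above is a placeholder. The sketch below shows one way to generate a real value, assuming the key is 32 random bytes (the key size AES-256 requires) encoded as base64 behind a `base64:` prefix, as the placeholder format suggests. The helper names `generate_app_key` and `decode_app_key` are hypothetical and not part of Flux.

```python
import base64
import secrets


def generate_app_key(num_bytes: int = 32) -> str:
    """Return a random key in the `base64:<payload>` form used by the placeholder."""
    raw = secrets.token_bytes(num_bytes)  # cryptographically secure random bytes
    return "base64:" + base64.b64encode(raw).decode("ascii")


def decode_app_key(key: str) -> bytes:
    """Strip the `base64:` prefix and return the raw key bytes."""
    prefix = "base64:"
    if not key.startswith(prefix):
        raise ValueError("expected a key starting with 'base64:'")
    return base64.b64decode(key[len(prefix):])


if __name__ == "__main__":
    # Paste the printed line into .env.production
    print(f"APP_KEY={generate_app_key()}")
```

Generate the key once per environment and keep it out of version control, since rotating it would invalidate anything encrypted with the previous value.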
Server Setup
Server configuration for Flux applications:
# Nginx configuration
# /etc/nginx/sites-available/myapp.com
server {
listen 80;
listen [::]:80;
server_name myapp.com www.myapp.com;
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name myapp.com www.myapp.com;
root /var/www/myapp/public;
index index.py;
# SSL Configuration
ssl_certificate /etc/letsencrypt/live/myapp.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/myapp.com/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
# Security Headers
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header X-Content-Type-Options "nosniff" always;
add_header Referrer-Policy "no-referrer-when-downgrade" always;
add_header Content-Security-Policy "default-src 'self' http: https: data: blob: 'unsafe-inline'" always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# Gzip Compression
gzip on;
gzip_vary on;
gzip_min_length 1024;
gzip_proxied expired no-cache no-store private must-revalidate auth;
gzip_types
text/plain
text/css
text/xml
text/javascript
application/json
application/javascript
application/xml
application/rss+xml
application/atom+xml
image/svg+xml;
# Static files
location ~* \.(css|js|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {
expires 1y;
add_header Cache-Control "public, immutable";
access_log off;
}
# Flux application
location / {
try_files $uri $uri/ /index.py$is_args$args;
}
# Security
location ~ /\.(?!well-known).* {
deny all;
}
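# Rate limiting (the "login" zone is declared below, outside the server block)
location /auth/login {
limit_req zone=login burst=3 nodelay;
try_files $uri $uri/ /index.py$is_args$args;
}
}
# limit_req_zone is only valid in the http context, so it cannot sit inside server { }
limit_req_zone $binary_remote_addr zone=login:10m rate=5r/m;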
# Systemd service
# /etc/systemd/system/myapp.service
[Unit]
Description=My Flux App
After=network.target
[Service]
Type=notify
User=www-data
Group=www-data
WorkingDirectory=/var/www/myapp
Environment=PATH=/var/www/myapp/venv/bin
ExecStart=/var/www/myapp/venv/bin/flux serve --host=127.0.0.1 --port=8000
ExecReload=/bin/kill -s HUP $MAINPID
KillMode=mixed
TimeoutStopSec=5
PrivateTmp=true
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
# Supervisor configuration (alternative)
# /etc/supervisor/conf.d/myapp.conf
[program:myapp]
directory=/var/www/myapp
command=/var/www/myapp/venv/bin/flux serve --host=127.0.0.1 --port=8000
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/myapp.log
environment=PATH="/var/www/myapp/venv/bin"
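The `limit_req` directive in the Nginx configuration above uses a leaky-bucket scheme: `rate=5r/m` drains one request every 12 seconds, `burst=3` tolerates three requests above that rate, and `nodelay` serves them immediately instead of queueing. The sketch below is a simplified model of that accounting, written to make the numbers concrete. It is not Nginx's implementation, and the `LeakyBucket` class is hypothetical.

```python
class LeakyBucket:
    """Simplified per-key model of a `rate` + `burst` + `nodelay` limiter."""

    def __init__(self, rate_per_minute: int, burst: int):
        self.rate_per_minute = rate_per_minute
        self.burst_milli = burst * 1000  # burst in thousandths of a request
        self._state = {}  # key -> (excess in thousandths, last accepted time in ms)

    def allow(self, key: str, now_ms: int) -> bool:
        """Return True if the request is accepted, False if it is rejected."""
        if key not in self._state:
            # First request from this key starts with an empty bucket
            self._state[key] = (0, now_ms)
            return True
        excess, last_ms = self._state[key]
        elapsed_ms = max(0, now_ms - last_ms)
        # Integer arithmetic avoids floating-point drift at the boundaries
        drained = self.rate_per_minute * 1000 * elapsed_ms // 60000
        excess = max(0, excess - drained + 1000)
        if excess > self.burst_milli:
            return False  # over the burst allowance; state is left unchanged
        self._state[key] = (excess, now_ms)
        return True


if __name__ == "__main__":
    bucket = LeakyBucket(rate_per_minute=5, burst=3)
    for i in range(5):
        print(i + 1, bucket.allow("203.0.113.7", now_ms=0))
```

Under this model a client can make four login attempts back to back (one plus the burst of three) and is then held to one attempt every 12 seconds.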
Docker Deployment
Containerization with Docker for modern deployments:
# Dockerfile
FROM python:3.11-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
DEBIAN_FRONTEND=noninteractive
# Install system dependencies
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
curl \
libpq-dev \
libmariadb-dev \
nginx \
supervisor \
&& rm -rf /var/lib/apt/lists/*
# Create app user
RUN useradd --create-home --shell /bin/bash app
# Set work directory
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY --chown=app:app . .
# Create necessary directories
RUN mkdir -p /app/storage/logs /app/storage/cache \
&& chown -R app:app /app/storage
# Copy configuration files
COPY docker/nginx.conf /etc/nginx/sites-available/default
COPY docker/supervisor.conf /etc/supervisor/conf.d/app.conf
# Expose port
EXPOSE 80
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost/health || exit 1
# Start supervisor
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]
# docker-compose.yml
version: '3.8'
services:
  app:
    build: .
    container_name: myapp
    restart: unless-stopped
    # Reachable only on the internal network; the nginx service publishes 80/443
    expose:
      - "80"
    environment:
      - APP_ENV=production
      - DB_HOST=db
      - REDIS_HOST=redis
    volumes:
      - ./storage:/app/storage
      - ./ssl:/etc/ssl/certs
    depends_on:
      - db
      - redis
    networks:
      - app-network
  db:
    image: mysql:8.0
    container_name: myapp-db
    restart: unless-stopped
    environment:
      MYSQL_DATABASE: myapp
      MYSQL_USER: myapp
      MYSQL_PASSWORD: secure_password
      MYSQL_ROOT_PASSWORD: root_password
    volumes:
      - db_data:/var/lib/mysql
      - ./docker/mysql:/docker-entrypoint-initdb.d
    networks:
      - app-network
  redis:
    image: redis:7-alpine
    container_name: myapp-redis
    restart: unless-stopped
    command: redis-server --appendonly yes --requirepass redis_password
    volumes:
      - redis_data:/data
    networks:
      - app-network
  nginx:
    image: nginx:alpine
    container_name: myapp-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./docker/nginx:/etc/nginx/conf.d
      - ./ssl:/etc/ssl/certs
      - ./storage/logs/nginx:/var/log/nginx
    depends_on:
      - app
    networks:
      - app-network
volumes:
  db_data:
  redis_data:
networks:
  app-network:
    driver: bridge
# docker-compose.override.yml (for development)
version: '3.8'
services:
  app:
    environment:
      - APP_ENV=local
      - APP_DEBUG=true
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    command: flux serve --host=0.0.0.0 --port=8000 --reload
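In the Compose files above, the short form of `depends_on` only controls start order. It does not wait for MySQL or Redis to accept connections. One common workaround is to poll the dependency's TCP port before starting the application. The sketch below uses only the standard library, and `wait_for_port` is a hypothetical helper, not a Flux or Docker API.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # Service names and ports taken from docker-compose.yml
    for name, port in (("db", 3306), ("redis", 6379)):
        print(name, "ready" if wait_for_port(name, port, timeout=5.0) else "not reachable")
```

An open port only shows that the process is listening. A stricter readiness check would also run a query or a `PING`.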
Kubernetes Deployment
Orchestration with Kubernetes for high availability:
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: myapp
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: myapp-config
  namespace: myapp
data:
  APP_ENV: "production"
  APP_DEBUG: "false"
  DB_CONNECTION: "mysql"
  DB_HOST: "myapp-mysql"
  DB_PORT: "3306"
  DB_DATABASE: "myapp"
  CACHE_DRIVER: "redis"
  REDIS_HOST: "myapp-redis"
  REDIS_PORT: "6379"
---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: myapp-secrets
  namespace: myapp
type: Opaque
stringData:
  APP_KEY: "base64:your-32-character-random-key-here"
  DB_USERNAME: "myapp"
  DB_PASSWORD: "secure_password"
  REDIS_PASSWORD: "redis_password"
---
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
  namespace: myapp
  labels:
    app: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      containers:
        - name: myapp
          image: myapp:latest
          ports:
            - containerPort: 8000
          envFrom:
            - configMapRef:
                name: myapp-config
            - secretRef:
                name: myapp-secrets
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: storage
              mountPath: /app/storage
      volumes:
        - name: storage
          persistentVolumeClaim:
            claimName: myapp-storage
---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: myapp-service
  namespace: myapp
spec:
  selector:
    app: myapp
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: ClusterIP
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: myapp-ingress
  namespace: myapp
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
    # ingress-nginx rate limit: 100 requests per minute per client IP
    nginx.ingress.kubernetes.io/limit-rpm: "100"
spec:
  tls:
    - hosts:
        - myapp.com
      secretName: myapp-tls
  rules:
    - host: myapp.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: myapp-service
                port:
                  number: 80
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: myapp-hpa
  namespace: myapp
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: myapp
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
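To see how the autoscaler above arrives at a replica count, it helps to work through the formula Kubernetes documents for the HPA: `desiredReplicas = ceil(currentReplicas * currentMetric / targetMetric)`. Changes within a tolerance of 10% by default are skipped, and the largest result across metrics wins. The sketch below applies that formula to the CPU (70%) and memory (80%) targets from the manifest. It is an illustration under those assumptions, not the controller's code, and it ignores stabilization windows and pod readiness.

```python
import math


def desired_replicas(current_replicas: int, current_utilization: float,
                     target_utilization: float, min_replicas: int = 3,
                     max_replicas: int = 10, tolerance: float = 0.1) -> int:
    """Apply the documented HPA formula for a single metric, then clamp to the bounds."""
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas  # within tolerance: no scaling
    else:
        desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


def desired_for_all(current_replicas: int, cpu: float, memory: float) -> int:
    """With several metrics, the HPA uses the largest proposed replica count."""
    return max(
        desired_replicas(current_replicas, cpu, target_utilization=70),
        desired_replicas(current_replicas, memory, target_utilization=80),
    )


if __name__ == "__main__":
    print(desired_for_all(current_replicas=4, cpu=105, memory=40))
```

Utilization is measured against the container's resource requests (`250m` CPU, `256Mi` memory in the Deployment), so changing the requests changes when scaling triggers.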
CI/CD Pipeline
Deployment automation with GitHub Actions:
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      mysql:
        image: mysql:8.0
        env:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: test_db
        options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
        ports:
          - 3306:3306
      redis:
        image: redis:7
        options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Cache dependencies
        uses: actions/cache@v3
        with:
          path: ~/.cache/pip
          key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
          restore-keys: |
            ${{ runner.os }}-pip-
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Copy environment file
        run: cp .env.testing .env
      - name: Run migrations
        run: flux migrate --env=testing
      - name: Run tests
        run: |
          flux test --coverage
          flux test --parallel
      - name: Code quality checks
        run: |
          flux lint
          flux security-check
          flux analyze --strict
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Assumes security-report.sarif was produced by a scanner step not shown here
      - name: Run security scan
        uses: securecodewarrior/github-action-add-sarif@v1
        with:
          sarif-file: 'security-report.sarif'
      - name: Dependency vulnerability scan
        run: |
          pip install safety
          safety check --json --output security-report.json
  build:
    needs: [test, security]
    runs-on: ubuntu-latest
    # Build on pushes to main and develop so both deploy jobs have an image
    if: github.event_name == 'push'
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix={{branch}}-
            type=raw,value=latest,enable={{is_default_branch}}
            # Full commit SHA, the tag that deploy-production rolls out
            type=raw,value=${{ github.sha }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: |
          echo "Deploying to staging environment"
          # kubectl apply -f k8s/staging/
          # helm upgrade --install myapp-staging ./helm-chart --values values-staging.yaml
  deploy-production:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBE_CONFIG }}
      - name: Deploy to production
        run: |
          kubectl set image deployment/myapp myapp=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} -n myapp
          kubectl rollout status deployment/myapp -n myapp
      - name: Run post-deployment tests
        run: |
          # Smoke tests
          curl -f https://myapp.com/health
          flux test:integration --env=production
      - name: Notify deployment
        if: always()
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          channel: '#deployments'
        env:
          # action-slack reads the webhook from this environment variable
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
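  rollback:
    needs: deploy-production
    runs-on: ubuntu-latest
    # Run only when the production deploy itself failed
    if: always() && needs.deploy-production.result == 'failure'
    environment: production
    steps:
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBE_CONFIG }}
      - name: Rollback deployment
        run: |
          kubectl rollout undo deployment/myapp -n myapp
          kubectl rollout status deployment/myapp -n myapp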
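The post-deployment step in the workflow above makes a single `curl -f` request, which can fail spuriously while new pods are still warming up. The sketch below shows a retrying smoke check that could run instead. It uses only the standard library, and `http_status` and `smoke_check` are hypothetical helper names. The `fetch` and `sleep` parameters exist so the retry logic can be exercised without a network.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_status(url: str, timeout: float = 5.0) -> int:
    """Return the HTTP status code for a GET request, or 0 if the request fails."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as exc:
        return exc.code  # server answered with an error status
    except (urllib.error.URLError, OSError):
        return 0  # DNS failure, refused connection, timeout, ...


def smoke_check(url: str, attempts: int = 5, delay: float = 2.0,
                fetch: Callable[[str], int] = http_status,
                sleep: Callable[[float], None] = time.sleep) -> bool:
    """Return True as soon as one attempt yields HTTP 200, False after all attempts fail."""
    for attempt in range(1, attempts + 1):
        if fetch(url) == 200:
            return True
        if attempt < attempts:
            sleep(delay)
    return False


if __name__ == "__main__":
    ok = smoke_check("https://myapp.com/health")
    raise SystemExit(0 if ok else 1)
```

Exiting non-zero on failure lets the workflow step fail, which is what would trigger a rollback job.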
Monitoring e Logging
A complete monitoring system for production:
# config/monitoring.flux
MONITORING_CONFIG = {
# Application Performance Monitoring
"apm": {
"enabled": True,
"service_name": "myapp",
"service_version": "1.0.0",
"environment": env("APP_ENV"),
"sample_rate": 0.1, # 10% of transactions
},
# Metrics
"metrics": {
"enabled": True,
"namespace": "myapp",
"default_tags": {
"service": "myapp",
"version": "1.0.0",
"environment": env("APP_ENV")
}
},
# Health checks
"health": {
"endpoint": "/health",
"checks": [
"database",
"redis",
"queue",
"storage",
"external_api"
]
},
# Alerts
"alerts": {
"channels": ["email", "slack", "pagerduty"],
"thresholds": {
"response_time": 2000, # ms
"error_rate": 5, # %
"cpu_usage": 80, # %
"memory_usage": 85, # %
"disk_usage": 90 # %
}
}
}
# Health check endpoint
import time
import requests
def _elapsed_ms(start):
    """Milliseconds elapsed since `start` (a time.perf_counter() value)."""
    return round((time.perf_counter() - start) * 1000, 2)
@route("/health")
def health_check():
    """Health check endpoint"""
    checks = {}
    overall_status = "healthy"
    # Database check (latency is measured rather than hard-coded)
    start = time.perf_counter()
    try:
        DB.select("SELECT 1")
        checks["database"] = {"status": "healthy", "latency_ms": _elapsed_ms(start)}
    except Exception as e:
        checks["database"] = {"status": "unhealthy", "error": str(e)}
        overall_status = "unhealthy"
    # Redis check
    start = time.perf_counter()
    try:
        Cache.put("health_check", "ok", seconds=1)
        Cache.get("health_check")
        checks["redis"] = {"status": "healthy", "latency_ms": _elapsed_ms(start)}
    except Exception as e:
        checks["redis"] = {"status": "unhealthy", "error": str(e)}
        overall_status = "unhealthy"
    # Queue check
    try:
        queue_size = Queue.size()
        checks["queue"] = {"status": "healthy", "pending_jobs": queue_size}
    except Exception as e:
        checks["queue"] = {"status": "unhealthy", "error": str(e)}
        overall_status = "unhealthy"
    # External API check (reported, but a failure here does not fail the endpoint)
    try:
        response = requests.get("https://api.external.com/health", timeout=5)
        if response.status_code == 200:
            checks["external_api"] = {"status": "healthy"}
        else:
            checks["external_api"] = {"status": "degraded", "status_code": response.status_code}
    except Exception as e:
        checks["external_api"] = {"status": "unhealthy", "error": str(e)}
    return json_response({
        "status": overall_status,
        "timestamp": now().isoformat(),
        "version": "1.0.0",
        "uptime_seconds": get_uptime(),
        "checks": checks
    }, status=200 if overall_status == "healthy" else 503)
# Metrics collection
import statsd
class MetricsCollector:
    # Note: `tags` and `histogram` are DogStatsD-style extensions; this assumes
    # the configured StatsD client supports them.
    def __init__(self):
        self.statsd = statsd.StatsClient(
            host=config("STATSD_HOST", "localhost"),
            port=config("STATSD_PORT", 8125),
            prefix="myapp"
        )
    def increment(self, metric, value=1, tags=None):
        """Increment a counter"""
        self.statsd.incr(metric, value, tags=tags)
    def gauge(self, metric, value, tags=None):
        """Set gauge value"""
        self.statsd.gauge(metric, value, tags=tags)
    def timing(self, metric, value, tags=None):
        """Record timing"""
        self.statsd.timing(metric, value, tags=tags)
    def histogram(self, metric, value, tags=None):
        """Record histogram value"""
        self.statsd.histogram(metric, value, tags=tags)
# Metrics middleware
import time
class MetricsMiddleware(Middleware):
    def __init__(self):
        self.metrics = MetricsCollector()
    def handle(self, request, next):
        start_time = time.time()
        # Increment the request counter
        self.metrics.increment("requests.total", tags=[
            f"method:{request.method}",
            f"endpoint:{request.route.name if request.route else 'unknown'}"
        ])
        try:
            response = next(request)
            # Response metrics
            duration = (time.time() - start_time) * 1000
            self.metrics.timing("requests.duration", duration, tags=[
                f"method:{request.method}",
                f"status:{response.status}"
            ])
            self.metrics.increment("requests.status", tags=[
                f"status:{response.status}",
                f"status_class:{response.status // 100}xx"
            ])
            return response
        except Exception as e:
            # Error metrics
            self.metrics.increment("requests.errors", tags=[
                f"method:{request.method}",
                f"error:{type(e).__name__}"
            ])
            raise  # re-raise with the original traceback
# Custom logging
import structlog
# Structured logging configuration
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
# Logging middleware
import time
from uuid import uuid4
class StructuredLoggingMiddleware(Middleware):
    def handle(self, request, next):
        start_time = time.time()
        request_id = str(uuid4())
        # Context logging
        logger.info("request_started",
                    request_id=request_id,
                    method=request.method,
                    url=request.url,
                    ip=request.ip,
                    user_agent=request.header("User-Agent"),
                    user_id=Auth.id() if Auth.check() else None)
        try:
            response = next(request)
            duration = (time.time() - start_time) * 1000
            logger.info("request_completed",
                        request_id=request_id,
                        status=response.status,
                        duration_ms=round(duration, 2),
                        response_size=len(response.content) if hasattr(response, 'content') else 0)
            return response
        except Exception as e:
            duration = (time.time() - start_time) * 1000
            logger.error("request_failed",
                         request_id=request_id,
                         error=str(e),
                         error_type=type(e).__name__,
                         duration_ms=round(duration, 2),
                         exc_info=True)
            raise  # re-raise with the original traceback
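The health endpoint above repeats the same try/except pattern for every dependency. The sketch below factors that pattern into a small runner that measures each check's latency and separates critical checks (which make the overall status unhealthy) from informational ones such as the external API. `run_checks` and its `critical` parameter are hypothetical names, not part of Flux.

```python
import time
from typing import Callable, Dict, Iterable, Tuple


def run_checks(checks: Dict[str, Callable[[], None]],
               critical: Iterable[str]) -> Tuple[str, Dict[str, dict]]:
    """Run each check; a check passes if it returns without raising."""
    critical = set(critical)
    results: Dict[str, dict] = {}
    overall = "healthy"
    for name, check in checks.items():
        start = time.perf_counter()
        try:
            check()
            latency_ms = round((time.perf_counter() - start) * 1000, 2)
            results[name] = {"status": "healthy", "latency_ms": latency_ms}
        except Exception as exc:
            results[name] = {"status": "unhealthy", "error": str(exc)}
            if name in critical:
                overall = "unhealthy"  # only critical failures fail the endpoint
    return overall, results


if __name__ == "__main__":
    def database_ok() -> None:
        pass  # stand-in for a real query such as SELECT 1

    def external_api_down() -> None:
        raise RuntimeError("connection refused")

    status, details = run_checks(
        {"database": database_ok, "external_api": external_api_down},
        critical={"database"},
    )
    print(status, details)
```

The overall status would then map to the HTTP code the same way as in the endpoint: 200 when healthy, 503 otherwise.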
Performance Optimization
Optimizations for maximum performance:
# Optimization service
import glob
import json
import os
class PerformanceOptimizer:
    def optimize_application(self):
        """Optimize the application for production"""
        # Compile templates
        self.compile_templates()
        # Cache routes
        self.cache_routes()
        # Cache configuration
        self.cache_config()
        # Optimize autoloader
        self.optimize_autoloader()
        # Minify assets
        self.minify_assets()
        # Generate sitemap
        self.generate_sitemap()
    def compile_templates(self):
        """Pre-compile all templates"""
        template_paths = glob.glob("resources/views/**/*.flux.html", recursive=True)
        for template_path in template_paths:
            # Compile template to bytecode
            compiled = compile_template(template_path)
            cache_path = template_path.replace(".flux.html", ".compiled")
            with open(cache_path, "wb") as f:
                f.write(compiled)
    def cache_routes(self):
        """Cache route definitions"""
        routes = self.discover_routes()
        route_cache = {
            "routes": routes,
            "compiled_at": now().isoformat(),
            "version": get_app_version()
        }
        with open("bootstrap/cache/routes.json", "w") as f:
            json.dump(route_cache, f)
    def cache_config(self):
        """Cache configuration files"""
        config_data = {}
        config_files = glob.glob("config/*.flux")
        for config_file in config_files:
            name = os.path.basename(config_file).replace(".flux", "")
            config_data[name] = load_config(config_file)
        with open("bootstrap/cache/config.json", "w") as f:
            json.dump(config_data, f)
    def optimize_autoloader(self):
        """Optimize the class autoloader"""
        # Generate the class map
        class_map = self.generate_class_map()
        with open("bootstrap/cache/classes.json", "w") as f:
            json.dump(class_map, f)
    def minify_assets(self):
        """Minify CSS and JavaScript"""
        # CSS minification (skip bundles generated by a previous run)
        css_files = [f for f in glob.glob("public/css/*.css") if not f.endswith(".min.css")]
        minified_css = self.minify_css_files(css_files)
        with open("public/css/app.min.css", "w") as f:
            f.write(minified_css)
        # JavaScript minification (skip bundles generated by a previous run)
        js_files = [f for f in glob.glob("public/js/*.js") if not f.endswith(".min.js")]
        minified_js = self.minify_js_files(js_files)
        with open("public/js/app.min.js", "w") as f:
            f.write(minified_js)
        # Generate manifest
        self.generate_asset_manifest()
    def generate_asset_manifest(self):
        """Generate the asset manifest for cache busting"""
        manifest = {}
        # CSS files
        css_files = glob.glob("public/css/*.css")
        for css_file in css_files:
            hash_value = self.get_file_hash(css_file)
            name = os.path.basename(css_file)
            manifest[f"css/{name}"] = f"css/{name}?v={hash_value}"
        # JS files
        js_files = glob.glob("public/js/*.js")
        for js_file in js_files:
            hash_value = self.get_file_hash(js_file)
            name = os.path.basename(js_file)
            manifest[f"js/{name}"] = f"js/{name}?v={hash_value}"
        with open("public/mix-manifest.json", "w") as f:
            json.dump(manifest, f)
# Database optimization
class DatabaseOptimizer:
    def optimize_queries(self):
        """Optimize frequent queries"""
        # Analyze slow queries
        slow_queries = self.analyze_slow_queries()
        for query in slow_queries:
            self.optimize_query(query)
    def create_indexes(self):
        """Create indexes for performance"""
        indexes = [
            # User indexes
            ("users", "email"),
            ("users", "created_at"),
            ("users", ["active", "created_at"]),
            # Post indexes
            ("posts", "published_at"),
            ("posts", ["status", "published_at"]),
            ("posts", "user_id"),
            # Comment indexes
            ("comments", "post_id"),
            ("comments", ["post_id", "created_at"]),
        ]
        for table, columns in indexes:
            self.create_index_if_not_exists(table, columns)
    def optimize_tables(self):
        """Optimize database tables"""
        tables = ["users", "posts", "comments", "sessions"]
        for table in tables:
            # Analyze table
            DB.statement(f"ANALYZE TABLE {table}")
            # Optimize table (MySQL)
            if DB.connection().driver_name == "mysql":
                DB.statement(f"OPTIMIZE TABLE {table}")
# CDN integration
import glob
import os
from uuid import uuid4
import boto3
class CDNService:
    def __init__(self):
        self.cdn_url = config("CDN_URL")
        self.s3_client = boto3.client('s3')
    def upload_assets(self):
        """Upload assets to CDN"""
        asset_files = [
            *glob.glob("public/css/**/*", recursive=True),
            *glob.glob("public/js/**/*", recursive=True),
            *glob.glob("public/images/**/*", recursive=True),
            *glob.glob("public/fonts/**/*", recursive=True)
        ]
        for asset_file in asset_files:
            if os.path.isfile(asset_file):
                self.upload_to_s3(asset_file)
    def upload_to_s3(self, file_path):
        """Upload file to S3"""
        # Strip only the leading "public/" directory and use "/" separators in the key
        key = os.path.relpath(file_path, "public").replace(os.sep, "/")
        self.s3_client.upload_file(
            file_path,
            config("AWS_BUCKET"),
            key,
            ExtraArgs={
                'ContentType': self.get_content_type(file_path),
                'CacheControl': 'max-age=31536000'  # 1 year
            }
        )
    def invalidate_cache(self, paths):
        """Invalidate the CloudFront cache"""
        cloudfront = boto3.client('cloudfront')
        response = cloudfront.create_invalidation(
            DistributionId=config("CLOUDFRONT_DISTRIBUTION_ID"),
            InvalidationBatch={
                'Paths': {
                    'Quantity': len(paths),
                    'Items': paths
                },
                'CallerReference': str(uuid4())
            }
        )
        return response
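The asset manifest above relies on a `get_file_hash` helper that is not shown. The sketch below is one possible implementation, assuming a truncated SHA-256 digest of the file contents is an acceptable version string. `file_hash` and `build_manifest` are hypothetical names, and the 8-character length is an arbitrary choice.

```python
import hashlib
from pathlib import Path
from typing import Dict, Iterable


def file_hash(path: Path, length: int = 8, chunk_size: int = 65536) -> str:
    """Return the first `length` hex characters of the file's SHA-256 digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in chunks so large assets are not loaded into memory at once
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()[:length]


def build_manifest(public_dir: str,
                   patterns: Iterable[str] = ("css/*.css", "js/*.js")) -> Dict[str, str]:
    """Map each asset path to the same path with a content-based `?v=` query string."""
    root = Path(public_dir)
    manifest: Dict[str, str] = {}
    for pattern in patterns:
        for path in sorted(root.glob(pattern)):
            rel = path.relative_to(root).as_posix()
            manifest[rel] = f"{rel}?v={file_hash(path)}"
    return manifest


if __name__ == "__main__":
    print(build_manifest("public"))
```

Because the version string depends only on file contents, an unchanged asset keeps its URL across deploys and stays cached. This matters with the one-year `Cache-Control` lifetime set on uploads.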
Backup and Recovery
Backup and disaster recovery strategies:
# Backup service
import gzip
import os
import shutil
import subprocess
import tarfile
import boto3
class BackupService:
    def __init__(self):
        self.s3_client = boto3.client('s3')
        self.backup_bucket = config("BACKUP_BUCKET")
    def create_full_backup(self):
        """Full backup of the application"""
        backup_id = f"backup_{now().strftime('%Y%m%d_%H%M%S')}"
        # Database backup
        db_backup = self.backup_database(backup_id)
        # Files backup
        files_backup = self.backup_files(backup_id)
        # Configuration backup
        config_backup = self.backup_configuration(backup_id)
        # Create backup manifest
        manifest = {
            "backup_id": backup_id,
            "timestamp": now().isoformat(),
            "type": "full",
            "database": db_backup,
            "files": files_backup,
            "configuration": config_backup,
            "app_version": get_app_version()
        }
        self.upload_manifest(backup_id, manifest)
        return backup_id
    def backup_database(self, backup_id):
        """Back up the database"""
        dump_file = f"/tmp/{backup_id}_database.sql"
        # Only MySQL is handled; fail early instead of compressing a missing dump
        if DB.connection().driver_name != "mysql":
            raise NotImplementedError("backup_database only supports MySQL")
        # MySQL dump
        cmd = [
            "mysqldump",
            f"--host={config('DB_HOST')}",
            f"--user={config('DB_USERNAME')}",
            f"--password={config('DB_PASSWORD')}",
            "--single-transaction",
            "--routines",
            "--triggers",
            config('DB_DATABASE')
        ]
        with open(dump_file, 'w') as f:
            subprocess.run(cmd, stdout=f, check=True)
        # Compress
        compressed_file = f"{dump_file}.gz"
        with open(dump_file, 'rb') as f_in:
            with gzip.open(compressed_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        # Upload to S3
        s3_key = f"backups/{backup_id}/database.sql.gz"
        self.s3_client.upload_file(compressed_file, self.backup_bucket, s3_key)
        # Record size and checksum before the local files are deleted
        result = {
            "file": s3_key,
            "size": os.path.getsize(compressed_file),
            "checksum": self.calculate_checksum(compressed_file)
        }
        # Cleanup
        os.remove(dump_file)
        os.remove(compressed_file)
        return result
    def backup_files(self, backup_id):
        """Backup storage files"""
        storage_dirs = ["storage/app", "storage/logs", "public/uploads"]
        backup_files = []
        for storage_dir in storage_dirs:
            if os.path.exists(storage_dir):
                tar_file = f"/tmp/{backup_id}_{storage_dir.replace('/', '_')}.tar.gz"
                with tarfile.open(tar_file, "w:gz") as tar:
                    tar.add(storage_dir, arcname=os.path.basename(storage_dir))
                # Upload to S3
                s3_key = f"backups/{backup_id}/{os.path.basename(tar_file)}"
                self.s3_client.upload_file(tar_file, self.backup_bucket, s3_key)
                backup_files.append({
                    "directory": storage_dir,
                    "file": s3_key,
                    "size": os.path.getsize(tar_file)
                })
                os.remove(tar_file)
        return backup_files
    def restore_from_backup(self, backup_id):
        """Restore from a backup"""
        # Download manifest
        manifest = self.download_manifest(backup_id)
        # Restore database
        self.restore_database(manifest["database"])
        # Restore files
        for file_backup in manifest["files"]:
            self.restore_files(file_backup)
        # Restore configuration
        self.restore_configuration(manifest["configuration"])
        log_info(f"Restore completed from backup {backup_id}")
    def schedule_backups(self):
        """Configure scheduled backups"""
        # Daily backup at 2 AM
        schedule.every().day.at("02:00").do(self.create_incremental_backup)
        # Weekly full backup on Sunday at 1 AM
        schedule.every().sunday.at("01:00").do(self.create_full_backup)
        # Archive roughly monthly (the schedule library has no month unit)
        schedule.every(4).weeks.do(self.archive_old_backups)
# Disaster recovery
class DisasterRecoveryService:
    def __init__(self):
        self.backup_service = BackupService()
        self.health_checker = HealthChecker()
    def create_recovery_plan(self):
        """Create the disaster recovery plan"""
        return {
            "rto": 4,  # Recovery Time Objective (hours)
            "rpo": 1,  # Recovery Point Objective (hours)
            "procedures": [
                "1. Assess the extent of the disaster",
                "2. Activate the disaster recovery team",
                "3. Set up backup infrastructure",
                "4. Restore from latest backup",
                "5. Redirect traffic to backup site",
                "6. Verify system functionality",
                "7. Monitor system performance",
                "8. Plan for primary site recovery"
            ],
            "contacts": [
                {"role": "Team Lead", "phone": "+39 xxx xxx xxxx"},
                {"role": "DevOps", "phone": "+39 xxx xxx xxxx"},
                {"role": "Database Admin", "phone": "+39 xxx xxx xxxx"}
            ],
            "resources": {
                "backup_site": "AWS eu-west-1",
                "dns_provider": "Cloudflare",
                "monitoring": "DataDog",
                "communication": "Slack #incidents"
            }
        }
    def failover_to_backup(self):
        """Fail over to the backup site (same order as steps 3-6 of the plan)"""
        # Start backup infrastructure
        self.start_backup_infrastructure()
        # Restore latest backup
        latest_backup = self.backup_service.get_latest_backup()
        self.backup_service.restore_from_backup(latest_backup)
        # Point DNS at the backup site only once it holds restored data
        self.update_dns_records()
        # Verify functionality
        if self.health_checker.check_all_systems():
            log_info("Failover completed successfully")
            self.notify_team("Failover completed")
        else:
            log_error("Failover failed - manual intervention required")
            self.alert_team("Failover failed")
# Backup monitoring
@task("daily")
def verify_backups():
    """Verify the integrity of recent backups daily"""
    backup_service = BackupService()
    recent_backups = backup_service.get_recent_backups(days=7)
    for backup in recent_backups:
        if backup_service.verify_backup_integrity(backup):
            log_info(f"Backup {backup['id']} verified successfully")
        else:
            log_error(f"Backup {backup['id']} integrity check failed")
            alert_admin(f"Backup integrity issue: {backup['id']}")
@task("weekly")
def test_restore_procedure():
    """Weekly test of the restore procedure"""
    backup_service = BackupService()
    # Create test environment
    test_env = create_test_environment()
    try:
        # Restore latest backup to test environment
        latest_backup = backup_service.get_latest_backup()
        backup_service.restore_to_environment(latest_backup, test_env)
        # Run smoke tests
        if run_smoke_tests(test_env):
            log_info("Restore test passed")
        else:
            log_error("Restore test failed")
            alert_admin("Restore procedure test failed")
    finally:
        # Cleanup test environment, even if the restore raised
        cleanup_test_environment(test_env)
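The recovery plan above states an RPO of 1 hour, while the schedule runs backups once a day, so the worst-case data loss from backups alone is closer to 24 hours. The sketch below shows how a monitoring task could check this explicitly. `rpo_satisfied` and `max_gap_hours` are hypothetical helpers, and the timestamps are illustrative.

```python
from datetime import datetime, timedelta, timezone
from typing import Sequence


def rpo_satisfied(latest_backup: datetime, rpo_hours: float, now: datetime) -> bool:
    """True if the newest backup is no older than the Recovery Point Objective."""
    return now - latest_backup <= timedelta(hours=rpo_hours)


def max_gap_hours(backup_times: Sequence[datetime]) -> float:
    """Largest interval, in hours, between consecutive backups (0.0 for fewer than two)."""
    ordered = sorted(backup_times)
    gaps = [(later - earlier).total_seconds() / 3600
            for earlier, later in zip(ordered, ordered[1:])]
    return max(gaps, default=0.0)


if __name__ == "__main__":
    first = datetime(2024, 1, 1, 2, 0, tzinfo=timezone.utc)
    daily = [first + timedelta(days=d) for d in range(3)]
    print("max gap (hours):", max_gap_hours(daily))
    print("RPO of 1h met 3h after last backup:",
          rpo_satisfied(daily[-1], rpo_hours=1, now=daily[-1] + timedelta(hours=3)))
```

Meeting a one-hour RPO would need either hourly backups or continuous replication such as binary log shipping. Otherwise the stated RPO should be raised to match the schedule.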
Best Practices
🔒 Security
- Always use HTTPS in production
- Configure security headers
- Implement rate limiting
- Scan for vulnerabilities regularly
⚡ Performance
- Optimize assets and the database
- Use a CDN for static content
- Implement effective caching
- Monitor key metrics
🛡️ Reliability
- Automated, tested backups
- Comprehensive health checks
- Auto-scaling configured
- Disaster recovery plan
🚀 Congratulations! You have completed the Flux documentation!
You now have all the tools to build modern, scalable, and secure web applications
with professional production deployment.