""" Backup and Restore Utilities Handles creation and restoration of complete system backups """ import os import shutil import subprocess import tarfile import tempfile from datetime import datetime from django.conf import settings from django.utils import timezone from stiftung.models import BackupJob def get_backup_directory(): """Get or create the backup directory""" backup_dir = "/app/backups" os.makedirs(backup_dir, exist_ok=True) return backup_dir def run_backup(backup_job_id): """ Run a backup job This runs in a separate thread to avoid blocking the web interface """ try: backup_job = BackupJob.objects.get(id=backup_job_id) backup_job.status = "running" backup_job.started_at = timezone.now() backup_job.save() backup_dir = get_backup_directory() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_filename = f"stiftung_backup_{timestamp}.tar.gz" backup_path = os.path.join(backup_dir, backup_filename) # Create temporary directory for backup staging with tempfile.TemporaryDirectory() as temp_dir: staging_dir = os.path.join(temp_dir, "backup_staging") os.makedirs(staging_dir) # 1. Database backup if backup_job.backup_type in ["full", "database"]: db_backup_path = create_database_backup(staging_dir) if not db_backup_path: raise Exception("Database backup failed") # 2. Files backup if backup_job.backup_type in ["full", "files"]: files_backup_path = create_files_backup(staging_dir) if not files_backup_path: raise Exception("Files backup failed") # 3. Create metadata file create_backup_metadata(staging_dir, backup_job) # 4. Create compressed archive create_compressed_backup(staging_dir, backup_path) # 5. Update job status backup_size = os.path.getsize(backup_path) backup_job.status = "completed" backup_job.completed_at = timezone.now() backup_job.backup_filename = backup_filename backup_job.backup_size = backup_size backup_job.save() except Exception as e: backup_job.status = "failed" backup_job.error_message = str(e) backup_job.completed_at = timezone.now() backup_job.save() def create_database_backup(staging_dir): """Create a database backup using pg_dump""" try: db_backup_file = os.path.join(staging_dir, "database.sql") # Get database settings db_settings = settings.DATABASES["default"] # Build pg_dump command cmd = [ "pg_dump", "--host", db_settings.get("HOST", "localhost"), "--port", str(db_settings.get("PORT", 5432)), "--username", db_settings.get("USER", "postgres"), "--format", "custom", "--no-owner", # portability across environments "--no-privileges", # skip GRANT/REVOKE "--no-password", "--file", db_backup_file, db_settings.get("NAME", "stiftung"), ] # Set environment variables for authentication env = os.environ.copy() env["PGPASSWORD"] = db_settings.get("PASSWORD", "") # Run pg_dump result = subprocess.run(cmd, env=env, capture_output=True, text=True) if result.returncode != 0: raise Exception(f"pg_dump failed: {result.stderr}") return db_backup_file except Exception as e: print(f"Database backup failed: {e}") return None def create_files_backup(staging_dir): """Create backup of application files""" try: files_dir = os.path.join(staging_dir, "files") os.makedirs(files_dir) # Files to backup backup_paths = [ "/app/media", # User uploads "/app/static", # Static files "/app/.env", # Environment configuration ] for source_path in backup_paths: if os.path.exists(source_path): basename = os.path.basename(source_path) dest_path = os.path.join(files_dir, basename) if os.path.isdir(source_path): shutil.copytree(source_path, dest_path) else: shutil.copy2(source_path, dest_path) return files_dir except Exception as e: print(f"Files backup failed: {e}") return None def create_backup_metadata(staging_dir, backup_job): """Create metadata file with backup information""" import json metadata = { "backup_id": str(backup_job.id), "backup_type": backup_job.backup_type, "created_at": backup_job.created_at.isoformat(), "created_by": ( backup_job.created_by.username if backup_job.created_by else "system" ), "django_version": "5.0.6", "app_version": "1.0.0", "python_version": "3.12", } metadata_file = os.path.join(staging_dir, "backup_metadata.json") with open(metadata_file, "w") as f: json.dump(metadata, f, indent=2) def create_compressed_backup(staging_dir, backup_path): """Create compressed tar.gz archive""" with tarfile.open(backup_path, "w:gz") as tar: tar.add(staging_dir, arcname=".") def run_restore(restore_job_id, backup_file_path): """ Run a restore job This runs in a separate thread """ try: restore_job = BackupJob.objects.get(id=restore_job_id) restore_job.status = "running" restore_job.started_at = timezone.now() restore_job.save() # Extract backup with tempfile.TemporaryDirectory() as temp_dir: extract_dir = os.path.join(temp_dir, "restore") os.makedirs(extract_dir) # Extract tar.gz with tarfile.open(backup_file_path, "r:gz") as tar: tar.extractall(extract_dir) # Validate backup metadata_file = os.path.join(extract_dir, "backup_metadata.json") if not os.path.exists(metadata_file): raise Exception("Invalid backup: missing metadata") # Read metadata import json with open(metadata_file, "r") as f: metadata = json.load(f) # Restore database db_backup_file = os.path.join(extract_dir, "database.sql") if os.path.exists(db_backup_file): restore_database(db_backup_file) # Restore files files_dir = os.path.join(extract_dir, "files") if os.path.exists(files_dir): restore_files(files_dir) # Update job status restore_job.status = "completed" restore_job.completed_at = timezone.now() restore_job.save() except Exception as e: restore_job.status = "failed" restore_job.error_message = str(e) restore_job.completed_at = timezone.now() restore_job.save() def restore_database(db_backup_file): """Restore database from backup""" try: # Get database settings db_settings = settings.DATABASES["default"] # Build pg_restore command cmd = [ "pg_restore", "--host", db_settings.get("HOST", "localhost"), "--port", str(db_settings.get("PORT", 5432)), "--username", db_settings.get("USER", "postgres"), "--dbname", db_settings.get("NAME", "stiftung"), "--clean", # Drop existing objects first "--if-exists", # Don't error if objects don't exist "--no-owner", # don't attempt to set original owners "--role", db_settings.get("USER", "postgres"), # set target owner "--single-transaction", # restore atomically when possible "--disable-triggers", # avoid FK issues during data load "--no-password", "--verbose", db_backup_file, ] # Set environment variables for authentication env = os.environ.copy() env["PGPASSWORD"] = db_settings.get("PASSWORD", "") # Run pg_restore result = subprocess.run(cmd, env=env, capture_output=True, text=True) # Fail if there are real errors if result.returncode != 0: stderr = result.stderr or "" # escalate only if we see ERROR if "ERROR" in stderr.upper(): raise Exception(f"pg_restore failed: {stderr}") else: print(f"pg_restore completed with warnings: {stderr}") except Exception as e: raise Exception(f"Database restore failed: {e}") def restore_files(files_dir): """Restore application files""" try: # Restore paths restore_mappings = { "media": "/app/media", "static": "/app/static", ".env": "/app/.env", } for source_name, dest_path in restore_mappings.items(): source_path = os.path.join(files_dir, source_name) if os.path.exists(source_path): # Backup existing files first if os.path.exists(dest_path): backup_path = ( f"{dest_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}" ) if os.path.isdir(dest_path): shutil.move(dest_path, backup_path) else: shutil.copy2(dest_path, backup_path) # Restore files if os.path.isdir(source_path): shutil.copytree(source_path, dest_path) else: shutil.copy2(source_path, dest_path) except Exception as e: raise Exception(f"Files restore failed: {e}") def cleanup_old_backups(keep_count=10): """Clean up old backup files, keeping only the newest ones""" try: backup_dir = get_backup_directory() backup_files = [] for filename in os.listdir(backup_dir): if filename.startswith("stiftung_backup_") and filename.endswith(".tar.gz"): filepath = os.path.join(backup_dir, filename) backup_files.append((filepath, os.path.getmtime(filepath))) # Sort by modification time (newest first) backup_files.sort(key=lambda x: x[1], reverse=True) # Remove old backups for filepath, _ in backup_files[keep_count:]: os.remove(filepath) print(f"Removed old backup: {os.path.basename(filepath)}") except Exception as e: print(f"Cleanup failed: {e}")