"""
Backup and Restore Utilities

Handles creation and restoration of complete system backups.

A backup is a ``.tar.gz`` archive containing (depending on the backup type)
a ``pg_dump`` of the default database (``database.sql``), a ``files/``
directory with copies of selected application paths, and a
``backup_metadata.json`` file describing the backup.
"""

import json
import os
import shutil
import subprocess
import tarfile
import tempfile
from datetime import datetime

from django.conf import settings
from django.utils import timezone

from stiftung.models import BackupJob

# Substrings in pg_restore/psql stderr that indicate the session itself failed
# (authentication, connection, server abort). Lines containing them are never
# treated as ignorable, even when they also match an "ignorable" pattern such
# as "does not exist".
_FATAL_MARKERS = ("FATAL", "connection to server", "could not connect")


def get_backup_directory():
    """Return the backup directory path, creating the directory if needed."""
    backup_dir = "/app/backups"
    os.makedirs(backup_dir, exist_ok=True)
    return backup_dir


def run_backup(backup_job_id):
    """
    Run a backup job.

    This runs in a separate thread to avoid blocking the web interface.
    The outcome is recorded on the ``BackupJob`` row (status, timestamps,
    filename, size, or error message); nothing is returned and no exception
    is propagated for failures inside the backup steps.

    Args:
        backup_job_id: Primary key of the ``BackupJob`` to execute.
    """
    # Bound before the try so the failure handler can tell whether the job
    # row was ever loaded.
    backup_job = None
    try:
        backup_job = BackupJob.objects.get(id=backup_job_id)
        backup_job.status = "running"
        backup_job.started_at = timezone.now()
        backup_job.save()

        backup_dir = get_backup_directory()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"stiftung_backup_{timestamp}.tar.gz"
        backup_path = os.path.join(backup_dir, backup_filename)

        # Create temporary directory for backup staging
        with tempfile.TemporaryDirectory() as temp_dir:
            staging_dir = os.path.join(temp_dir, "backup_staging")
            os.makedirs(staging_dir)

            # 1. Database backup
            if backup_job.backup_type in ["full", "database"]:
                db_backup_path = create_database_backup(staging_dir)
                if not db_backup_path:
                    raise Exception("Database backup failed")

            # 2. Files backup
            if backup_job.backup_type in ["full", "files"]:
                files_backup_path = create_files_backup(staging_dir)
                if not files_backup_path:
                    raise Exception("Files backup failed")

            # 3. Create metadata file
            create_backup_metadata(staging_dir, backup_job)

            # 4. Create compressed archive
            create_compressed_backup(staging_dir, backup_path)

        # 5. Update job status
        backup_size = os.path.getsize(backup_path)
        backup_job.status = "completed"
        backup_job.completed_at = timezone.now()
        backup_job.backup_filename = backup_filename
        backup_job.backup_size = backup_size
        backup_job.save()

    except Exception as e:
        if backup_job is None:
            # The job row could not be loaded, so there is nothing to mark
            # as failed; report instead of raising NameError in the thread.
            print(f"Backup job {backup_job_id} could not be loaded: {e}")
            return
        backup_job.status = "failed"
        backup_job.error_message = str(e)
        backup_job.completed_at = timezone.now()
        backup_job.save()


def create_database_backup(staging_dir):
    """
    Create a database backup using pg_dump.

    Dumps the ``default`` Django database in pg_dump's custom format to
    ``database.sql`` inside ``staging_dir``.

    Args:
        staging_dir: Existing directory the dump file is written into.

    Returns:
        Path of the dump file, or ``None`` if the dump failed (the error is
        printed rather than raised).
    """
    try:
        db_backup_file = os.path.join(staging_dir, "database.sql")

        # Get database settings
        db_settings = settings.DATABASES["default"]

        # Build pg_dump command
        cmd = [
            "pg_dump",
            "--host",
            db_settings.get("HOST", "localhost"),
            "--port",
            str(db_settings.get("PORT", 5432)),
            "--username",
            db_settings.get("USER", "postgres"),
            "--format",
            "custom",
            "--no-owner",  # portability across environments
            "--no-privileges",  # skip GRANT/REVOKE
            "--no-password",
            "--file",
            db_backup_file,
            db_settings.get("NAME", "stiftung"),
        ]

        # Set environment variables for authentication. Environment values
        # must be strings, so a None password becomes an empty string.
        env = os.environ.copy()
        env["PGPASSWORD"] = db_settings.get("PASSWORD") or ""

        # Run pg_dump
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)

        if result.returncode != 0:
            raise Exception(f"pg_dump failed: {result.stderr}")

        return db_backup_file

    except Exception as e:
        print(f"Database backup failed: {e}")
        return None


def create_files_backup(staging_dir):
    """
    Create backup of application files.

    Copies each existing path from a fixed list into ``<staging_dir>/files``;
    paths that do not exist are skipped.

    Args:
        staging_dir: Existing directory that receives the ``files`` folder.

    Returns:
        Path of the ``files`` directory, or ``None`` if copying failed (the
        error is printed rather than raised).
    """
    try:
        files_dir = os.path.join(staging_dir, "files")
        os.makedirs(files_dir)

        # Files to backup
        backup_paths = [
            "/app/media",  # User uploads
            "/app/static",  # Static files
            "/app/.env",  # Environment configuration
        ]

        for source_path in backup_paths:
            if os.path.exists(source_path):
                basename = os.path.basename(source_path)
                dest_path = os.path.join(files_dir, basename)

                if os.path.isdir(source_path):
                    shutil.copytree(source_path, dest_path)
                else:
                    shutil.copy2(source_path, dest_path)

        return files_dir

    except Exception as e:
        print(f"Files backup failed: {e}")
        return None


def create_backup_metadata(staging_dir, backup_job):
    """
    Create metadata file with backup information.

    Writes ``backup_metadata.json`` into ``staging_dir``.

    Args:
        staging_dir: Existing directory the metadata file is written into.
        backup_job: The ``BackupJob`` being executed; its id, type, creation
            time and creator are recorded.
    """
    metadata = {
        "backup_id": str(backup_job.id),
        "backup_type": backup_job.backup_type,
        "created_at": backup_job.created_at.isoformat(),
        "created_by": (
            backup_job.created_by.username if backup_job.created_by else "system"
        ),
        # NOTE(review): the version values below are hard-coded, not detected
        # at runtime - confirm they are kept in sync with the deployment.
        "django_version": "5.0.6",
        "app_version": "1.0.0",
        "python_version": "3.12",
    }

    metadata_file = os.path.join(staging_dir, "backup_metadata.json")
    with open(metadata_file, "w") as f:
        json.dump(metadata, f, indent=2)


def create_compressed_backup(staging_dir, backup_path):
    """
    Create compressed tar.gz archive.

    Args:
        staging_dir: Directory whose contents become the archive root (".").
        backup_path: Destination path of the ``.tar.gz`` file.

    Raises:
        Exception: Whatever ``tarfile`` raised; a partially written archive
            is removed before the error is re-raised.
    """
    try:
        with tarfile.open(backup_path, "w:gz") as tar:
            tar.add(staging_dir, arcname=".")
    except Exception:
        # Do not leave a truncated archive behind: it would match the backup
        # naming scheme and be counted by cleanup_old_backups().
        try:
            if os.path.exists(backup_path):
                os.remove(backup_path)
        except OSError:
            pass
        raise


def _extract_backup_archive(tar, destination):
    """Extract every member of an open tar archive into destination, refusing unsafe members."""
    if hasattr(tarfile, "data_filter"):
        # The "data" filter rejects absolute paths, ".." traversal, links
        # pointing outside the destination and device files. It also
        # normalises ownership and some permission bits.
        tar.extractall(destination, filter="data")
        return

    # Older Python without extraction filters: validate members manually.
    root = os.path.realpath(destination)
    for member in tar.getmembers():
        if member.issym() or member.islnk() or member.isdev():
            raise Exception(f"Unsafe archive member: {member.name}")
        target = os.path.realpath(os.path.join(root, member.name))
        if os.path.commonpath([root, target]) != root:
            raise Exception(f"Unsafe archive member path: {member.name}")
    tar.extractall(destination)


def run_restore(restore_job_id, backup_file_path):
    """
    Run a restore job.

    This runs in a separate thread. The archive is extracted to a temporary
    directory, checked for a metadata file, and then the database dump and
    the ``files`` directory are restored when present. The outcome is
    recorded on the ``BackupJob`` row; failures are printed, not raised.

    Args:
        restore_job_id: Primary key of the ``BackupJob`` tracking the restore.
        backup_file_path: Path of the ``.tar.gz`` backup archive.
    """
    # Bound before the try so the failure handler can reuse the loaded job.
    restore_job = None
    try:
        restore_job = BackupJob.objects.get(id=restore_job_id)
        restore_job.status = "running"
        restore_job.started_at = timezone.now()
        restore_job.save()

        # Verify backup file exists
        if not os.path.exists(backup_file_path):
            raise Exception(f"Backup file not found: {backup_file_path}")

        # Extract backup
        with tempfile.TemporaryDirectory() as temp_dir:
            extract_dir = os.path.join(temp_dir, "restore")
            os.makedirs(extract_dir)

            # Extract tar.gz
            try:
                with tarfile.open(backup_file_path, "r:gz") as tar:
                    _extract_backup_archive(tar, extract_dir)
            except Exception as e:
                raise Exception(f"Failed to extract backup file: {e}") from e

            # Validate backup
            metadata_files = [
                name
                for name in os.listdir(extract_dir)
                if name.endswith('backup_metadata.json')
            ]
            if not metadata_files:
                raise Exception("Invalid backup: missing metadata file")

            # Read metadata (informational only; unreadable metadata is a
            # warning, not a failure)
            try:
                metadata_file = os.path.join(extract_dir, metadata_files[0])
                with open(metadata_file, "r") as f:
                    metadata = json.load(f)
                print(f"Restoring backup created at: {metadata.get('created_at', 'unknown')}")
            except Exception as e:
                print(f"Warning: Could not read backup metadata: {e}")

            # Restore database
            db_backup_file = os.path.join(extract_dir, "database.sql")
            if os.path.exists(db_backup_file):
                print("Restoring database...")
                restore_database(db_backup_file)
                print("Database restore completed")
            else:
                print("No database backup found in archive")

            # Restore files
            files_dir = os.path.join(extract_dir, "files")
            if os.path.exists(files_dir):
                print("Restoring files...")
                restore_files(files_dir)
                print("Files restore completed")
            else:
                print("No files backup found in archive")

        # Update job status
        restore_job.status = "completed"
        restore_job.completed_at = timezone.now()
        restore_job.save()
        print(f"Restore job {restore_job_id} completed successfully")

    except Exception as e:
        print(f"Restore job {restore_job_id} failed: {e}")
        if restore_job is None:
            # The initial lookup failed; try once more, but never let a
            # second lookup failure escape from the failure handler.
            try:
                restore_job = BackupJob.objects.get(id=restore_job_id)
            except Exception as lookup_error:
                print(
                    f"Could not record failure for restore job {restore_job_id}: {lookup_error}"
                )
                return
        # Reuse the in-memory job rather than re-querying: after a database
        # restore the row may no longer exist in the restored data.
        restore_job.status = "failed"
        restore_job.error_message = str(e)
        restore_job.completed_at = timezone.now()
        restore_job.save()


def _is_custom_format_dump(db_backup_file):
    """Return True if the dump file starts with pg_dump's custom-format magic bytes."""
    try:
        with open(db_backup_file, 'rb') as f:
            header = f.read(8)
        # Custom format files start with 'PGDMP' followed by version info
        if header.startswith(b'PGDMP'):
            print(f"Detected custom format backup (header: {header})")
            return True
        print(f"Detected SQL format backup (header: {header})")
    except Exception as e:
        print(f"Could not determine backup format, assuming SQL: {e}")
    return False


def _is_fatal_line(line):
    """Return True if a stderr line reports a connection/session-level failure."""
    return any(marker in line for marker in _FATAL_MARKERS)


def _check_restore_result(result):
    """Classify a finished pg_restore/psql run and raise if it reported serious errors."""
    if result.returncode == 0:
        print("Database restore completed successfully with no errors")
        return

    stderr = result.stderr or ""
    stderr_lines = stderr.split('\n')
    has_fatal = any(_is_fatal_line(line) for line in stderr_lines)

    # Check for known configuration parameter issues
    if "unrecognized configuration parameter" in stderr:
        print(f"Warning: Configuration parameter issues detected, but continuing: {stderr[:200]}...")
        # For configuration parameter issues, we'll consider this a warning,
        # not a fatal error, if there are no other serious errors
        serious_errors = [
            line
            for line in stderr_lines
            if _is_fatal_line(line)
            or ('ERROR' in line and 'unrecognized configuration parameter' not in line)
        ]
        if serious_errors:
            print(f"Serious errors found: {serious_errors}")
            raise Exception(f"pg_restore failed with serious errors: {'; '.join(serious_errors)}")
        print("Restore completed with configuration warnings (non-fatal)")
    elif "ERROR" in stderr.upper() or has_fatal:
        # Look for specific error patterns we can ignore
        ignorable_patterns = [
            "already exists",
            "does not exist",
            "unrecognized configuration parameter",
        ]
        serious_errors = []
        for line in stderr_lines:
            if _is_fatal_line(line):
                # Connection/authentication failures are never ignorable,
                # even if the text also says e.g. "does not exist".
                serious_errors.append(line)
            elif 'ERROR' in line and not any(
                pattern in line for pattern in ignorable_patterns
            ):
                serious_errors.append(line)

        if serious_errors:
            print(f"Serious errors found: {serious_errors}")
            raise Exception(f"Database restore failed with errors: {'; '.join(serious_errors)}")
        print("Restore completed with ignorable warnings")
    else:
        print("Restore completed with warnings but no errors")


def _report_restored_row_counts():
    """Print row counts of a few key tables as a best-effort sanity check."""
    try:
        print("Verifying data was restored...")
        from django.db import connection

        with connection.cursor() as cursor:
            # Check some key tables (fixed names, not user input)
            test_tables = ['stiftung_person', 'stiftung_land', 'stiftung_destinataer']
            for table in test_tables:
                try:
                    cursor.execute(f"SELECT COUNT(*) FROM {table}")
                    count = cursor.fetchone()[0]
                    print(f"Table {table}: {count} rows")
                except Exception as e:
                    print(f"Could not check table {table}: {e}")
    except Exception as e:
        print(f"Could not verify data restoration: {e}")


def restore_database(db_backup_file):
    """
    Restore database from backup.

    Detects whether the dump is in pg_dump custom format (restored with
    ``pg_restore``) or plain SQL (restored with ``psql``), runs the matching
    tool against the ``default`` Django database, and inspects its output.
    Known-benign messages are tolerated; other errors, and any connection
    or authentication failure, abort the restore.

    Args:
        db_backup_file: Path of the database dump file.

    Raises:
        Exception: If the restore tool could not be run or reported serious
            errors. The message is prefixed with "Database restore failed: ".
    """
    try:
        print(f"Starting database restore from: {db_backup_file}")

        # Get database settings
        db_settings = settings.DATABASES["default"]
        print(f"Database settings: {db_settings.get('NAME')} at {db_settings.get('HOST')}:{db_settings.get('PORT')}")

        if _is_custom_format_dump(db_backup_file):
            print("Using pg_restore for custom format")
            # Use pg_restore for custom format.
            # --single-transaction and --exit-on-error are deliberately not
            # used, so a partial restore can continue past configuration
            # warnings.
            cmd = [
                "pg_restore",
                "--host",
                db_settings.get("HOST", "localhost"),
                "--port",
                str(db_settings.get("PORT", 5432)),
                "--username",
                db_settings.get("USER", "postgres"),
                "--dbname",
                db_settings.get("NAME", "stiftung"),
                "--clean",  # Drop existing objects first
                "--if-exists",  # Don't error if objects don't exist
                "--no-owner",  # don't attempt to set original owners
                "--role",
                db_settings.get("USER", "postgres"),  # set target owner
                "--disable-triggers",  # avoid FK issues during data load
                "--no-password",
                "--verbose",
                db_backup_file,
            ]
        else:
            print("Using psql for SQL format")
            # Use psql for SQL format
            cmd = [
                "psql",
                "--host",
                db_settings.get("HOST", "localhost"),
                "--port",
                str(db_settings.get("PORT", 5432)),
                "--username",
                db_settings.get("USER", "postgres"),
                "--dbname",
                db_settings.get("NAME", "stiftung"),
                "--no-password",
                "--file",
                db_backup_file,
            ]

        print(f"Running command: {' '.join(cmd)}")

        # Set environment variables for authentication. Environment values
        # must be strings, so a None password becomes an empty string.
        env = os.environ.copy()
        env["PGPASSWORD"] = db_settings.get("PASSWORD") or ""

        # Run the restore command
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)

        print(f"Command exit code: {result.returncode}")
        print(f"STDOUT length: {len(result.stdout)} chars")
        print(f"STDERR length: {len(result.stderr)} chars")

        # Show first 500 chars of output for debugging
        if result.stdout:
            print(f"STDOUT (first 500 chars): {result.stdout[:500]}...")
        if result.stderr:
            print(f"STDERR (first 500 chars): {result.stderr[:500]}...")

        # Handle different error conditions more gracefully
        _check_restore_result(result)

        # Verify data was actually restored by checking table counts
        _report_restored_row_counts()

    except Exception as e:
        print(f"Database restore failed with exception: {e}")
        raise Exception(f"Database restore failed: {e}") from e


def restore_files(files_dir):
    """
    Restore application files.

    For each known entry found in ``files_dir``, the current destination is
    first preserved under ``<dest>.backup.<timestamp>`` (directories are
    moved, files are copied) and then replaced with the backed-up version.

    Args:
        files_dir: The extracted ``files`` directory of a backup archive.

    Raises:
        Exception: If any copy/move fails; the message is prefixed with
            "Files restore failed: ".
    """
    try:
        # Restore paths
        restore_mappings = {
            "media": "/app/media",
            "static": "/app/static",
            ".env": "/app/.env",
        }

        for source_name, dest_path in restore_mappings.items():
            source_path = os.path.join(files_dir, source_name)

            if not os.path.exists(source_path):
                continue

            # Backup existing files first
            if os.path.exists(dest_path):
                backup_path = (
                    f"{dest_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                )
                if os.path.isdir(dest_path):
                    # Moving frees the destination so copytree can create it.
                    shutil.move(dest_path, backup_path)
                else:
                    shutil.copy2(dest_path, backup_path)

            # Restore files
            if os.path.isdir(source_path):
                shutil.copytree(source_path, dest_path)
            else:
                shutil.copy2(source_path, dest_path)

    except Exception as e:
        raise Exception(f"Files restore failed: {e}") from e


def cleanup_old_backups(keep_count=10):
    """
    Clean up old backup files, keeping only the newest ones.

    Only files in the backup directory named ``stiftung_backup_*.tar.gz``
    are considered; age is determined by modification time. Errors are
    printed, not raised.

    Args:
        keep_count: Number of newest backups to keep. Must not be negative.
    """
    try:
        if keep_count < 0:
            # A negative value would slice from the end of the list and
            # delete an unintended subset of backups.
            raise ValueError(f"keep_count must be >= 0, got {keep_count}")

        backup_dir = get_backup_directory()
        backup_files = []

        for filename in os.listdir(backup_dir):
            if filename.startswith("stiftung_backup_") and filename.endswith(".tar.gz"):
                filepath = os.path.join(backup_dir, filename)
                backup_files.append((filepath, os.path.getmtime(filepath)))

        # Sort by modification time (newest first)
        backup_files.sort(key=lambda x: x[1], reverse=True)

        # Remove old backups
        for filepath, _ in backup_files[keep_count:]:
            os.remove(filepath)
            print(f"Removed old backup: {os.path.basename(filepath)}")

    except Exception as e:
        print(f"Cleanup failed: {e}")


def validate_backup_file(backup_file_path):
    """
    Validate that a backup file is valid and can be restored.

    Checks that the file exists, has a ``.tar.gz`` name, opens as a gzip tar
    archive, and contains a ``backup_metadata.json`` with a ``backup_type``
    entry. The metadata is read in memory; nothing is extracted to disk.

    Args:
        backup_file_path: Path of the archive to check.

    Returns:
        A ``(is_valid, message)`` tuple. This function does not raise;
        unexpected errors are reported as ``(False, "Validation failed: ...")``.
    """
    try:
        if not os.path.exists(backup_file_path):
            return False, "Backup file does not exist"

        if not backup_file_path.endswith('.tar.gz'):
            return False, "Invalid file format. Only .tar.gz files are supported"

        try:
            with tarfile.open(backup_file_path, "r:gz") as tar:
                # Look for metadata file (could be with or without ./ prefix)
                metadata_names = [
                    name
                    for name in tar.getnames()
                    if name.endswith('backup_metadata.json')
                ]
                if not metadata_names:
                    return False, "Invalid backup: missing metadata"

                # Read the member directly from the archive so an
                # attacker-chosen member name is never written to disk.
                member_file = tar.extractfile(metadata_names[0])
                if member_file is None:
                    # The member is a directory or other non-file entry.
                    return False, "Invalid backup metadata"
                with member_file:
                    metadata = json.load(member_file)
        except tarfile.TarError as e:
            return False, f"Corrupted backup file: {e}"
        except json.JSONDecodeError:
            return False, "Invalid backup metadata format"

        # Check metadata structure
        if not isinstance(metadata, dict) or "backup_type" not in metadata:
            return False, "Invalid backup metadata"

        created_at = metadata.get('created_at', 'unknown date')
        backup_type = metadata.get('backup_type', 'unknown type')

        return True, f"Valid {backup_type} backup from {created_at}"

    except Exception as e:
        return False, f"Validation failed: {e}"