Format code with Black and isort for CI/CD compliance

- Apply Black formatting to all Python files in core and stiftung modules
- Fix import statement ordering with isort
- Ensure all code meets automated quality standards
- Resolve CI/CD pipeline formatting failures
- Maintain consistent code style across the entire codebase
This commit is contained in:
Stiftung Development
2025-09-06 21:04:07 +02:00
parent c7c790ee09
commit e0c7d0e351
54 changed files with 11004 additions and 6423 deletions

View File

import os
import shutil
import subprocess
import tarfile
import tempfile
from datetime import datetime

from django.conf import settings
from django.utils import timezone

from stiftung.models import BackupJob
def get_backup_directory(backup_dir="/app/backups"):
    """Return the backup directory path, creating it if necessary.

    Args:
        backup_dir: Target directory; defaults to the in-container backup
            path used by the scheduled jobs.

    Returns:
        str: The directory path, guaranteed to exist.
    """
    os.makedirs(backup_dir, exist_ok=True)
    return backup_dir
def run_backup(backup_job_id):
    """Run a backup job end-to-end and persist its outcome on the BackupJob row.

    Stages the requested content (database dump and/or application files) in
    a temporary directory, adds a metadata file, compresses everything into a
    timestamped ``.tar.gz`` under the backup directory, and marks the job
    completed. On any failure the job is marked failed with the error message.

    Args:
        backup_job_id: Primary key of the BackupJob row to execute.
    """
    # Fetch outside the try/except: if the id is invalid we want the lookup
    # error to propagate, instead of reaching the failure handler with
    # `backup_job` unbound (which raised NameError in the original).
    backup_job = BackupJob.objects.get(id=backup_job_id)
    try:
        backup_job.status = "running"
        backup_job.started_at = timezone.now()
        backup_job.save()

        backup_dir = get_backup_directory()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"stiftung_backup_{timestamp}.tar.gz"
        backup_path = os.path.join(backup_dir, backup_filename)

        # Stage everything in a temp dir so a failed run leaves no partial tree.
        with tempfile.TemporaryDirectory() as temp_dir:
            staging_dir = os.path.join(temp_dir, "backup_staging")
            os.makedirs(staging_dir)

            # 1. Database backup
            if backup_job.backup_type in ["full", "database"]:
                db_backup_path = create_database_backup(staging_dir)
                if not db_backup_path:
                    raise Exception("Database backup failed")

            # 2. Files backup
            if backup_job.backup_type in ["full", "files"]:
                files_backup_path = create_files_backup(staging_dir)
                if not files_backup_path:
                    raise Exception("Files backup failed")

            # 3. Create metadata file
            create_backup_metadata(staging_dir, backup_job)

            # 4. Create compressed archive
            create_compressed_backup(staging_dir, backup_path)

            # 5. Update job status
            backup_job.status = "completed"
            backup_job.completed_at = timezone.now()
            backup_job.backup_filename = backup_filename
            backup_job.backup_size = os.path.getsize(backup_path)
            backup_job.save()
    except Exception as e:
        backup_job.status = "failed"
        backup_job.error_message = str(e)
        backup_job.completed_at = timezone.now()
        backup_job.save()
@@ -78,37 +80,42 @@ def run_backup(backup_job_id):
def create_database_backup(staging_dir):
    """Dump the default PostgreSQL database into the staging directory.

    Runs ``pg_dump`` in custom format with ownership and privilege info
    stripped so the dump restores cleanly across environments.

    Args:
        staging_dir: Directory the dump file is written into.

    Returns:
        str | None: Path to the dump file, or None on failure. Errors are
        printed rather than raised (best-effort contract: the caller checks
        for None and decides how to fail).
    """
    try:
        db_backup_file = os.path.join(staging_dir, "database.sql")
        db_settings = settings.DATABASES["default"]

        cmd = [
            "pg_dump",
            "--host",
            db_settings.get("HOST", "localhost"),
            "--port",
            str(db_settings.get("PORT", 5432)),
            "--username",
            db_settings.get("USER", "postgres"),
            "--format",
            "custom",
            "--no-owner",  # portability across environments
            "--no-privileges",  # skip GRANT/REVOKE
            "--no-password",
            "--file",
            db_backup_file,
            db_settings.get("NAME", "stiftung"),
        ]

        # Authenticate via the environment so the password never appears in
        # the process's command-line arguments.
        env = os.environ.copy()
        env["PGPASSWORD"] = db_settings.get("PASSWORD", "")

        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"pg_dump failed: {result.stderr}")
        return db_backup_file
    except Exception as e:
        print(f"Database backup failed: {e}")
        return None
@@ -117,28 +124,28 @@ def create_database_backup(staging_dir):
def create_files_backup(staging_dir):
    """Copy application file trees (media, static, .env) into the staging dir.

    Source paths that do not exist are skipped silently, so partial
    deployments still back up whatever is present.

    Args:
        staging_dir: Directory to stage copies under (a ``files/`` subdir is
            created inside it).

    Returns:
        str | None: Path of the populated files directory, or None on
        failure (best-effort: errors are printed, not raised).
    """
    try:
        files_dir = os.path.join(staging_dir, "files")
        os.makedirs(files_dir)

        # Files to backup
        backup_paths = [
            "/app/media",  # User uploads
            "/app/static",  # Static files
            "/app/.env",  # Environment configuration
        ]

        for source_path in backup_paths:
            if os.path.exists(source_path):
                basename = os.path.basename(source_path)
                dest_path = os.path.join(files_dir, basename)
                if os.path.isdir(source_path):
                    shutil.copytree(source_path, dest_path)
                else:
                    shutil.copy2(source_path, dest_path)
        return files_dir
    except Exception as e:
        print(f"Files backup failed: {e}")
        return None
@@ -147,26 +154,28 @@ def create_files_backup(staging_dir):
def create_backup_metadata(staging_dir, backup_job):
    """Write ``backup_metadata.json`` describing this backup into staging_dir.

    The metadata records the job id/type, creation time, creator, and pinned
    stack versions so a restore can validate compatibility.

    Args:
        staging_dir: Directory the JSON file is written into.
        backup_job: BackupJob row providing id, backup_type, created_at and
            (optionally) created_by.
    """
    import json

    metadata = {
        "backup_id": str(backup_job.id),
        "backup_type": backup_job.backup_type,
        "created_at": backup_job.created_at.isoformat(),
        "created_by": (
            backup_job.created_by.username if backup_job.created_by else "system"
        ),
        # NOTE(review): versions are hard-coded — keep in sync with the
        # deployed stack, or derive them at runtime.
        "django_version": "5.0.6",
        "app_version": "1.0.0",
        "python_version": "3.12",
    }

    metadata_file = os.path.join(staging_dir, "backup_metadata.json")
    with open(metadata_file, "w") as f:
        json.dump(metadata, f, indent=2)
def create_compressed_backup(staging_dir, backup_path):
    """Compress the staging directory into a gzip'd tar archive.

    Args:
        staging_dir: Directory whose contents are archived.
        backup_path: Destination ``.tar.gz`` file path.
    """
    with tarfile.open(backup_path, "w:gz") as tar:
        # arcname="." keeps archive member paths relative to the staging root.
        tar.add(staging_dir, arcname=".")
def run_restore(restore_job_id, backup_file_path):
    """Restore the system from a backup archive and record the outcome.

    Extracts the archive into a temporary directory, validates that the
    metadata file is present, restores the database dump and/or file trees
    found inside, and marks the job completed or failed.

    Args:
        restore_job_id: Primary key of the BackupJob row tracking the restore.
        backup_file_path: Path to the ``.tar.gz`` backup archive.
    """
    # Fetch outside the try/except so a bad id doesn't reach the failure
    # handler with `restore_job` unbound (NameError in the original).
    restore_job = BackupJob.objects.get(id=restore_job_id)
    try:
        restore_job.status = "running"
        restore_job.started_at = timezone.now()
        restore_job.save()

        # Extract backup
        with tempfile.TemporaryDirectory() as temp_dir:
            extract_dir = os.path.join(temp_dir, "restore")
            os.makedirs(extract_dir)

            # SECURITY: extractall on an untrusted archive permits path
            # traversal ("tar slip"); pass filter="data" (Python 3.12+) or
            # validate member names if backups can come from outside.
            with tarfile.open(backup_file_path, "r:gz") as tar:
                tar.extractall(extract_dir)

            # The metadata file is the marker of a valid backup archive.
            metadata_file = os.path.join(extract_dir, "backup_metadata.json")
            if not os.path.exists(metadata_file):
                raise Exception("Invalid backup: missing metadata")

            # Read metadata
            import json

            with open(metadata_file, "r") as f:
                metadata = json.load(f)

            # Restore database dump when present.
            db_backup_file = os.path.join(extract_dir, "database.sql")
            if os.path.exists(db_backup_file):
                restore_database(db_backup_file)

            # Restore file trees when present.
            files_dir = os.path.join(extract_dir, "files")
            if os.path.exists(files_dir):
                restore_files(files_dir)

            # Update job status
            restore_job.status = "completed"
            restore_job.completed_at = timezone.now()
            restore_job.save()
    except Exception as e:
        restore_job.status = "failed"
        restore_job.error_message = str(e)
        restore_job.completed_at = timezone.now()
        restore_job.save()
def restore_database(db_backup_file):
    """Restore the default PostgreSQL database from a pg_dump custom dump.

    Runs ``pg_restore`` with ``--clean --if-exists`` so existing objects are
    replaced, in a single transaction where possible.

    Args:
        db_backup_file: Path to the custom-format dump file.

    Raises:
        Exception: Wrapping any failure. A non-zero exit whose stderr
            contains no ERROR lines is treated as warnings only.
    """
    try:
        # Get database settings
        db_settings = settings.DATABASES["default"]

        cmd = [
            "pg_restore",
            "--host",
            db_settings.get("HOST", "localhost"),
            "--port",
            str(db_settings.get("PORT", 5432)),
            "--username",
            db_settings.get("USER", "postgres"),
            "--dbname",
            db_settings.get("NAME", "stiftung"),
            "--clean",  # Drop existing objects first
            "--if-exists",  # Don't error if objects don't exist
            "--no-owner",  # don't attempt to set original owners
            "--role",
            db_settings.get("USER", "postgres"),  # set target owner
            "--single-transaction",  # restore atomically when possible
            "--disable-triggers",  # avoid FK issues during data load
            "--no-password",
            "--verbose",
            db_backup_file,
        ]

        # Authenticate via the environment, not command-line arguments.
        env = os.environ.copy()
        env["PGPASSWORD"] = db_settings.get("PASSWORD", "")

        result = subprocess.run(cmd, env=env, capture_output=True, text=True)

        # pg_restore exits non-zero for harmless warnings too; only treat
        # stderr lines containing ERROR as fatal.
        if result.returncode != 0:
            stderr = result.stderr or ""
            if "ERROR" in stderr.upper():
                raise Exception(f"pg_restore failed: {stderr}")
            else:
                print(f"pg_restore completed with warnings: {stderr}")
    except Exception as e:
        raise Exception(f"Database restore failed: {e}")
def restore_files(files_dir):
    """Restore application file trees from an extracted backup.

    Existing destinations are preserved under a timestamped
    ``<dest>.backup.<ts>`` name before being replaced.

    Args:
        files_dir: Extracted ``files/`` directory from a backup archive.

    Raises:
        Exception: Wrapping any copy/move failure.
    """
    try:
        # Archive entry name -> live destination path.
        restore_mappings = {
            "media": "/app/media",
            "static": "/app/static",
            ".env": "/app/.env",
        }

        for source_name, dest_path in restore_mappings.items():
            source_path = os.path.join(files_dir, source_name)
            if not os.path.exists(source_path):
                continue

            # Keep the current contents as a timestamped safety copy first.
            if os.path.exists(dest_path):
                backup_path = (
                    f"{dest_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                )
                if os.path.isdir(dest_path):
                    shutil.move(dest_path, backup_path)
                else:
                    shutil.copy2(dest_path, backup_path)

            # Restore files
            if os.path.isdir(source_path):
                shutil.copytree(source_path, dest_path)
            else:
                shutil.copy2(source_path, dest_path)
    except Exception as e:
        raise Exception(f"Files restore failed: {e}")
def cleanup_old_backups(keep_count=10):
    """Delete all but the newest ``keep_count`` backup archives.

    Best-effort: failures are printed, never raised, so cleanup can run
    after a backup without endangering the job itself.

    Args:
        keep_count: Number of most-recent archives to retain.
    """
    try:
        backup_dir = get_backup_directory()

        # Collect (path, mtime) for every archive this app produced.
        backup_files = []
        for filename in os.listdir(backup_dir):
            if filename.startswith("stiftung_backup_") and filename.endswith(".tar.gz"):
                filepath = os.path.join(backup_dir, filename)
                backup_files.append((filepath, os.path.getmtime(filepath)))

        # Sort by modification time (newest first), then drop everything
        # past the retention window.
        backup_files.sort(key=lambda x: x[1], reverse=True)
        for filepath, _ in backup_files[keep_count:]:
            os.remove(filepath)
            print(f"Removed old backup: {os.path.basename(filepath)}")
    except Exception as e:
        print(f"Cleanup failed: {e}")