feat: email notifications + deadline reminder system
Database: - notification_preferences table (user_id, tenant_id, reminder days, email/digest toggles) - notifications table (type, entity link, read/sent tracking, dedup index) Backend: - NotificationService with background goroutine checking reminders hourly - CheckDeadlineReminders: finds deadlines due in N days per user prefs, creates notifications - Overdue deadline detection and notification - Daily digest at 8am: compiles pending notifications into one email - SendEmail via `m mail send` CLI command - Deduplication: same notification type + entity + day = skip - API: GET/PATCH notifications, unread count, mark read/all-read - API: GET/PUT notification-preferences with upsert Frontend: - NotificationBell in header with unread count badge (polls every 30s) - Dropdown panel with notification list, type-colored dots, time-ago, entity links - Mark individual/all as read - NotificationSettings in Einstellungen page: reminder day toggles, email toggle, digest toggle
This commit is contained in:
501
backend/internal/services/notification_service.go
Normal file
501
backend/internal/services/notification_service.go
Normal file
@@ -0,0 +1,501 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/lib/pq"
|
||||
|
||||
"mgit.msbls.de/m/KanzlAI-mGMT/internal/models"
|
||||
)
|
||||
|
||||
// NotificationService handles notification CRUD, deadline reminders, and email sending.
|
||||
type NotificationService struct {
|
||||
db *sqlx.DB
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewNotificationService creates a new notification service.
|
||||
func NewNotificationService(db *sqlx.DB) *NotificationService {
|
||||
return &NotificationService{
|
||||
db: db,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the background reminder checker (every hour) and daily digest (8am).
|
||||
func (s *NotificationService) Start() {
|
||||
s.wg.Add(1)
|
||||
go s.backgroundLoop()
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down background workers.
|
||||
func (s *NotificationService) Stop() {
|
||||
close(s.stopCh)
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *NotificationService) backgroundLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
// Check reminders on startup
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
s.CheckDeadlineReminders(ctx)
|
||||
cancel()
|
||||
|
||||
reminderTicker := time.NewTicker(1 * time.Hour)
|
||||
defer reminderTicker.Stop()
|
||||
|
||||
// Digest ticker: check every 15 minutes, send at 8am
|
||||
digestTicker := time.NewTicker(15 * time.Minute)
|
||||
defer digestTicker.Stop()
|
||||
|
||||
var lastDigestDate string
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.stopCh:
|
||||
return
|
||||
case <-reminderTicker.C:
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
s.CheckDeadlineReminders(ctx)
|
||||
cancel()
|
||||
case now := <-digestTicker.C:
|
||||
today := now.Format("2006-01-02")
|
||||
hour := now.Hour()
|
||||
if hour >= 8 && lastDigestDate != today {
|
||||
lastDigestDate = today
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
s.SendDailyDigests(ctx)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CheckDeadlineReminders finds deadlines due in N days matching user preferences and creates notifications.
|
||||
func (s *NotificationService) CheckDeadlineReminders(ctx context.Context) {
|
||||
slog.Info("checking deadline reminders")
|
||||
|
||||
// Get all user preferences with email enabled
|
||||
var prefs []models.NotificationPreferences
|
||||
err := s.db.SelectContext(ctx, &prefs,
|
||||
`SELECT user_id, tenant_id, deadline_reminder_days, email_enabled, daily_digest, created_at, updated_at
|
||||
FROM notification_preferences`)
|
||||
if err != nil {
|
||||
slog.Error("failed to load notification preferences", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(prefs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Collect all unique reminder day values across all users
|
||||
daySet := make(map[int64]bool)
|
||||
for _, p := range prefs {
|
||||
for _, d := range p.DeadlineReminderDays {
|
||||
daySet[d] = true
|
||||
}
|
||||
}
|
||||
if len(daySet) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Build array of target dates
|
||||
today := time.Now().Truncate(24 * time.Hour)
|
||||
var targetDates []string
|
||||
dayToDate := make(map[string]int64)
|
||||
for d := range daySet {
|
||||
target := today.AddDate(0, 0, int(d))
|
||||
dateStr := target.Format("2006-01-02")
|
||||
targetDates = append(targetDates, dateStr)
|
||||
dayToDate[dateStr] = d
|
||||
}
|
||||
|
||||
// Also check overdue deadlines
|
||||
todayStr := today.Format("2006-01-02")
|
||||
|
||||
// Find pending deadlines matching target dates
|
||||
type deadlineRow struct {
|
||||
models.Deadline
|
||||
CaseTitle string `db:"case_title"`
|
||||
CaseNumber string `db:"case_number"`
|
||||
}
|
||||
|
||||
// Reminder deadlines (due in N days)
|
||||
var reminderDeadlines []deadlineRow
|
||||
query, args, err := sqlx.In(
|
||||
`SELECT d.*, c.title AS case_title, c.case_number
|
||||
FROM deadlines d
|
||||
JOIN cases c ON c.id = d.case_id
|
||||
WHERE d.status = 'pending' AND d.due_date IN (?)`,
|
||||
targetDates)
|
||||
if err == nil {
|
||||
query = s.db.Rebind(query)
|
||||
err = s.db.SelectContext(ctx, &reminderDeadlines, query, args...)
|
||||
}
|
||||
if err != nil {
|
||||
slog.Error("failed to query reminder deadlines", "error", err)
|
||||
}
|
||||
|
||||
// Overdue deadlines
|
||||
var overdueDeadlines []deadlineRow
|
||||
err = s.db.SelectContext(ctx, &overdueDeadlines,
|
||||
`SELECT d.*, c.title AS case_title, c.case_number
|
||||
FROM deadlines d
|
||||
JOIN cases c ON c.id = d.case_id
|
||||
WHERE d.status = 'pending' AND d.due_date < $1`, todayStr)
|
||||
if err != nil {
|
||||
slog.Error("failed to query overdue deadlines", "error", err)
|
||||
}
|
||||
|
||||
// Create notifications for each user based on their tenant and preferences
|
||||
for _, pref := range prefs {
|
||||
// Reminder notifications
|
||||
for _, dl := range reminderDeadlines {
|
||||
if dl.TenantID != pref.TenantID {
|
||||
continue
|
||||
}
|
||||
daysUntil := dayToDate[dl.DueDate]
|
||||
// Check if this user cares about this many days
|
||||
if !containsDay(pref.DeadlineReminderDays, daysUntil) {
|
||||
continue
|
||||
}
|
||||
|
||||
title := fmt.Sprintf("Frist in %d Tagen: %s", daysUntil, dl.Title)
|
||||
body := fmt.Sprintf("Akte %s — %s\nFällig am %s", dl.CaseNumber, dl.CaseTitle, dl.DueDate)
|
||||
entityType := "deadline"
|
||||
|
||||
s.CreateNotification(ctx, CreateNotificationInput{
|
||||
TenantID: pref.TenantID,
|
||||
UserID: pref.UserID,
|
||||
Type: "deadline_reminder",
|
||||
EntityType: &entityType,
|
||||
EntityID: &dl.ID,
|
||||
Title: title,
|
||||
Body: &body,
|
||||
SendEmail: pref.EmailEnabled && !pref.DailyDigest,
|
||||
})
|
||||
}
|
||||
|
||||
// Overdue notifications
|
||||
for _, dl := range overdueDeadlines {
|
||||
if dl.TenantID != pref.TenantID {
|
||||
continue
|
||||
}
|
||||
|
||||
title := fmt.Sprintf("Frist überfällig: %s", dl.Title)
|
||||
body := fmt.Sprintf("Akte %s — %s\nFällig seit %s", dl.CaseNumber, dl.CaseTitle, dl.DueDate)
|
||||
entityType := "deadline"
|
||||
|
||||
s.CreateNotification(ctx, CreateNotificationInput{
|
||||
TenantID: pref.TenantID,
|
||||
UserID: pref.UserID,
|
||||
Type: "deadline_overdue",
|
||||
EntityType: &entityType,
|
||||
EntityID: &dl.ID,
|
||||
Title: title,
|
||||
Body: &body,
|
||||
SendEmail: pref.EmailEnabled && !pref.DailyDigest,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SendDailyDigests compiles pending notifications into one email per user.
|
||||
func (s *NotificationService) SendDailyDigests(ctx context.Context) {
|
||||
slog.Info("sending daily digests")
|
||||
|
||||
// Find users with daily_digest enabled
|
||||
var prefs []models.NotificationPreferences
|
||||
err := s.db.SelectContext(ctx, &prefs,
|
||||
`SELECT user_id, tenant_id, deadline_reminder_days, email_enabled, daily_digest, created_at, updated_at
|
||||
FROM notification_preferences
|
||||
WHERE daily_digest = true AND email_enabled = true`)
|
||||
if err != nil {
|
||||
slog.Error("failed to load digest preferences", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pref := range prefs {
|
||||
// Get unsent notifications for this user from the last 24 hours
|
||||
var notifications []models.Notification
|
||||
err := s.db.SelectContext(ctx, ¬ifications,
|
||||
`SELECT id, tenant_id, user_id, type, entity_type, entity_id, title, body, sent_at, read_at, created_at
|
||||
FROM notifications
|
||||
WHERE user_id = $1 AND tenant_id = $2 AND sent_at IS NULL
|
||||
AND created_at > now() - interval '24 hours'
|
||||
ORDER BY created_at DESC`,
|
||||
pref.UserID, pref.TenantID)
|
||||
if err != nil {
|
||||
slog.Error("failed to load unsent notifications", "error", err, "user_id", pref.UserID)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(notifications) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Get user email
|
||||
email := s.getUserEmail(ctx, pref.UserID)
|
||||
if email == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Build digest
|
||||
var lines []string
|
||||
lines = append(lines, fmt.Sprintf("Guten Morgen! Hier ist Ihre Tagesübersicht mit %d Benachrichtigungen:\n", len(notifications)))
|
||||
for _, n := range notifications {
|
||||
body := ""
|
||||
if n.Body != nil {
|
||||
body = " — " + *n.Body
|
||||
}
|
||||
lines = append(lines, fmt.Sprintf("• %s%s", n.Title, body))
|
||||
}
|
||||
lines = append(lines, "\n---\nKanzlAI Kanzleimanagement")
|
||||
|
||||
subject := fmt.Sprintf("KanzlAI Tagesübersicht — %d Benachrichtigungen", len(notifications))
|
||||
bodyText := strings.Join(lines, "\n")
|
||||
|
||||
if err := SendEmail(email, subject, bodyText); err != nil {
|
||||
slog.Error("failed to send digest email", "error", err, "user_id", pref.UserID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Mark all as sent
|
||||
ids := make([]uuid.UUID, len(notifications))
|
||||
for i, n := range notifications {
|
||||
ids[i] = n.ID
|
||||
}
|
||||
query, args, err := sqlx.In(
|
||||
`UPDATE notifications SET sent_at = now() WHERE id IN (?)`, ids)
|
||||
if err == nil {
|
||||
query = s.db.Rebind(query)
|
||||
_, err = s.db.ExecContext(ctx, query, args...)
|
||||
}
|
||||
if err != nil {
|
||||
slog.Error("failed to mark digest notifications sent", "error", err)
|
||||
}
|
||||
|
||||
slog.Info("sent daily digest", "user_id", pref.UserID, "count", len(notifications))
|
||||
}
|
||||
}
|
||||
|
||||
// CreateNotificationInput holds the data for creating a notification.
|
||||
type CreateNotificationInput struct {
|
||||
TenantID uuid.UUID
|
||||
UserID uuid.UUID
|
||||
Type string
|
||||
EntityType *string
|
||||
EntityID *uuid.UUID
|
||||
Title string
|
||||
Body *string
|
||||
SendEmail bool
|
||||
}
|
||||
|
||||
// CreateNotification stores a notification in the DB and optionally sends an email.
|
||||
func (s *NotificationService) CreateNotification(ctx context.Context, input CreateNotificationInput) (*models.Notification, error) {
|
||||
// Dedup: check if we already sent this notification today
|
||||
if input.EntityID != nil {
|
||||
var count int
|
||||
err := s.db.GetContext(ctx, &count,
|
||||
`SELECT COUNT(*) FROM notifications
|
||||
WHERE user_id = $1 AND entity_id = $2 AND type = $3
|
||||
AND created_at::date = CURRENT_DATE`,
|
||||
input.UserID, input.EntityID, input.Type)
|
||||
if err == nil && count > 0 {
|
||||
return nil, nil // Already notified today
|
||||
}
|
||||
}
|
||||
|
||||
var n models.Notification
|
||||
err := s.db.QueryRowxContext(ctx,
|
||||
`INSERT INTO notifications (tenant_id, user_id, type, entity_type, entity_id, title, body)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
RETURNING id, tenant_id, user_id, type, entity_type, entity_id, title, body, sent_at, read_at, created_at`,
|
||||
input.TenantID, input.UserID, input.Type, input.EntityType, input.EntityID,
|
||||
input.Title, input.Body).StructScan(&n)
|
||||
if err != nil {
|
||||
slog.Error("failed to create notification", "error", err)
|
||||
return nil, fmt.Errorf("create notification: %w", err)
|
||||
}
|
||||
|
||||
// Send email immediately if requested (non-digest users)
|
||||
if input.SendEmail {
|
||||
email := s.getUserEmail(ctx, input.UserID)
|
||||
if email != "" {
|
||||
go func() {
|
||||
if err := SendEmail(email, input.Title, derefStr(input.Body)); err != nil {
|
||||
slog.Error("failed to send notification email", "error", err, "user_id", input.UserID)
|
||||
} else {
|
||||
// Mark as sent
|
||||
_, _ = s.db.Exec(`UPDATE notifications SET sent_at = now() WHERE id = $1`, n.ID)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
return &n, nil
|
||||
}
|
||||
|
||||
// ListForUser returns notifications for a user in a tenant, paginated.
|
||||
func (s *NotificationService) ListForUser(ctx context.Context, tenantID, userID uuid.UUID, limit, offset int) ([]models.Notification, int, error) {
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
if limit > 200 {
|
||||
limit = 200
|
||||
}
|
||||
|
||||
var total int
|
||||
err := s.db.GetContext(ctx, &total,
|
||||
`SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND tenant_id = $2`,
|
||||
userID, tenantID)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("count notifications: %w", err)
|
||||
}
|
||||
|
||||
var notifications []models.Notification
|
||||
err = s.db.SelectContext(ctx, ¬ifications,
|
||||
`SELECT id, tenant_id, user_id, type, entity_type, entity_id, title, body, sent_at, read_at, created_at
|
||||
FROM notifications
|
||||
WHERE user_id = $1 AND tenant_id = $2
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $3 OFFSET $4`,
|
||||
userID, tenantID, limit, offset)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("list notifications: %w", err)
|
||||
}
|
||||
|
||||
return notifications, total, nil
|
||||
}
|
||||
|
||||
// UnreadCount returns the number of unread notifications for a user.
|
||||
func (s *NotificationService) UnreadCount(ctx context.Context, tenantID, userID uuid.UUID) (int, error) {
|
||||
var count int
|
||||
err := s.db.GetContext(ctx, &count,
|
||||
`SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND tenant_id = $2 AND read_at IS NULL`,
|
||||
userID, tenantID)
|
||||
return count, err
|
||||
}
|
||||
|
||||
// MarkRead marks a single notification as read.
|
||||
func (s *NotificationService) MarkRead(ctx context.Context, tenantID, userID, notificationID uuid.UUID) error {
|
||||
result, err := s.db.ExecContext(ctx,
|
||||
`UPDATE notifications SET read_at = now()
|
||||
WHERE id = $1 AND user_id = $2 AND tenant_id = $3 AND read_at IS NULL`,
|
||||
notificationID, userID, tenantID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("mark notification read: %w", err)
|
||||
}
|
||||
rows, _ := result.RowsAffected()
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("notification not found or already read")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkAllRead marks all notifications as read for a user.
|
||||
func (s *NotificationService) MarkAllRead(ctx context.Context, tenantID, userID uuid.UUID) error {
|
||||
_, err := s.db.ExecContext(ctx,
|
||||
`UPDATE notifications SET read_at = now()
|
||||
WHERE user_id = $1 AND tenant_id = $2 AND read_at IS NULL`,
|
||||
userID, tenantID)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetPreferences returns notification preferences for a user, creating defaults if needed.
|
||||
func (s *NotificationService) GetPreferences(ctx context.Context, tenantID, userID uuid.UUID) (*models.NotificationPreferences, error) {
|
||||
var pref models.NotificationPreferences
|
||||
err := s.db.GetContext(ctx, &pref,
|
||||
`SELECT user_id, tenant_id, deadline_reminder_days, email_enabled, daily_digest, created_at, updated_at
|
||||
FROM notification_preferences
|
||||
WHERE user_id = $1 AND tenant_id = $2`,
|
||||
userID, tenantID)
|
||||
if err != nil {
|
||||
// Return defaults if no preferences set
|
||||
return &models.NotificationPreferences{
|
||||
UserID: userID,
|
||||
TenantID: tenantID,
|
||||
DeadlineReminderDays: pq.Int64Array{7, 3, 1},
|
||||
EmailEnabled: true,
|
||||
DailyDigest: false,
|
||||
}, nil
|
||||
}
|
||||
return &pref, nil
|
||||
}
|
||||
|
||||
// UpdatePreferences upserts notification preferences for a user.
|
||||
func (s *NotificationService) UpdatePreferences(ctx context.Context, tenantID, userID uuid.UUID, input UpdatePreferencesInput) (*models.NotificationPreferences, error) {
|
||||
var pref models.NotificationPreferences
|
||||
err := s.db.QueryRowxContext(ctx,
|
||||
`INSERT INTO notification_preferences (user_id, tenant_id, deadline_reminder_days, email_enabled, daily_digest)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (user_id, tenant_id)
|
||||
DO UPDATE SET deadline_reminder_days = $3, email_enabled = $4, daily_digest = $5, updated_at = now()
|
||||
RETURNING user_id, tenant_id, deadline_reminder_days, email_enabled, daily_digest, created_at, updated_at`,
|
||||
userID, tenantID, pq.Int64Array(input.DeadlineReminderDays), input.EmailEnabled, input.DailyDigest).StructScan(&pref)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("update preferences: %w", err)
|
||||
}
|
||||
return &pref, nil
|
||||
}
|
||||
|
||||
// UpdatePreferencesInput holds the data for updating notification preferences.
|
||||
type UpdatePreferencesInput struct {
|
||||
DeadlineReminderDays []int64 `json:"deadline_reminder_days"`
|
||||
EmailEnabled bool `json:"email_enabled"`
|
||||
DailyDigest bool `json:"daily_digest"`
|
||||
}
|
||||
|
||||
// SendEmail sends an email using the `m mail send` CLI command.
|
||||
func SendEmail(to, subject, body string) error {
|
||||
cmd := exec.Command("m", "mail", "send",
|
||||
"--to", to,
|
||||
"--subject", subject,
|
||||
"--body", body,
|
||||
"--yes")
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("m mail send failed: %w (output: %s)", err, string(output))
|
||||
}
|
||||
slog.Info("email sent", "to", to, "subject", subject)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getUserEmail looks up the email for a user from Supabase auth.users.
|
||||
func (s *NotificationService) getUserEmail(ctx context.Context, userID uuid.UUID) string {
|
||||
var email string
|
||||
err := s.db.GetContext(ctx, &email,
|
||||
`SELECT email FROM auth.users WHERE id = $1`, userID)
|
||||
if err != nil {
|
||||
slog.Error("failed to get user email", "error", err, "user_id", userID)
|
||||
return ""
|
||||
}
|
||||
return email
|
||||
}
|
||||
|
||||
func containsDay(arr pq.Int64Array, day int64) bool {
|
||||
for _, d := range arr {
|
||||
if d == day {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func derefStr(s *string) string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return *s
|
||||
}
|
||||
Reference in New Issue
Block a user