diff --git a/core/tests.py b/core/tests.py
index 75c4e28..77aa620 100644
--- a/core/tests.py
+++ b/core/tests.py
@@ -2,6 +2,7 @@ from datetime import datetime
import json
from django.contrib.auth.models import User
+from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import Client, TestCase
from django.urls import reverse
from django.utils import timezone
@@ -47,6 +48,7 @@ class CalendarViewTests(TestCase):
self.assertEqual(response.status_code, 200)
self.assertContains(response, 'Manage your public event calendar securely')
self.assertContains(response, 'Generate a copy-ready widget snippet')
+ self.assertContains(response, 'Import a previous backup')
def test_embed_page_can_hide_header(self):
response = self.client.get(reverse('calendar_embed') + '?header=0')
@@ -65,6 +67,13 @@ class EventDashboardMutationTests(TestCase):
is_published=True,
)
+ def _backup_file(self, payload):
+ return SimpleUploadedFile(
+ 'events-backup.json',
+ json.dumps(payload).encode('utf-8'),
+ content_type='application/json',
+ )
+
def test_staff_can_export_event_backup(self):
self.client.login(username='staffer', password='pass12345')
response = self.client.get(reverse('event_export'))
@@ -77,6 +86,70 @@ class EventDashboardMutationTests(TestCase):
self.assertEqual(payload['events'][0]['slug'], self.event.slug)
self.assertEqual(payload['events'][0]['name'], self.event.name)
+ def test_staff_can_preview_event_backup_restore(self):
+ self.client.login(username='staffer', password='pass12345')
+ payload = {
+ 'version': 1,
+ 'project': 'Roadshow Calendar',
+ 'exported_at': '2026-04-01T10:30:00+00:00',
+ 'event_count': 1,
+ 'events': [
+ {
+ 'name': 'Recovered Market',
+ 'slug': 'recovered-market-2026-04-20',
+ 'location': 'Harbor Plaza',
+ 'start': '2026-04-20T10:00:00+00:00',
+ 'end': '2026-04-20T14:00:00+00:00',
+ 'event_url': 'https://example.com/recovered',
+ 'summary': 'Recovered from backup',
+ 'is_published': False,
+ }
+ ],
+ }
+
+ response = self.client.post(reverse('event_import_preview'), {'backup_file': self._backup_file(payload)})
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'Ready to replace the current calendar')
+ self.assertContains(response, 'Recovered Market')
+ self.assertContains(response, 'Confirm restore and replace events')
+ self.assertIn('event_backup_import_payload', self.client.session)
+
+ def test_staff_can_restore_event_backup(self):
+ self.client.login(username='staffer', password='pass12345')
+ payload = {
+ 'version': 1,
+ 'project': 'Roadshow Calendar',
+ 'exported_at': '2026-04-01T10:30:00+00:00',
+ 'event_count': 1,
+ 'events': [
+ {
+ 'name': 'Recovered Market',
+ 'slug': 'recovered-market-2026-04-20',
+ 'location': 'Harbor Plaza',
+ 'start': '2026-04-20T10:00:00+00:00',
+ 'end': '2026-04-20T14:00:00+00:00',
+ 'event_url': 'https://example.com/recovered',
+ 'summary': 'Recovered from backup',
+ 'is_published': False,
+ }
+ ],
+ }
+
+ preview_response = self.client.post(reverse('event_import_preview'), {'backup_file': self._backup_file(payload)})
+ self.assertEqual(preview_response.status_code, 200)
+
+ response = self.client.post(reverse('event_import_restore'))
+ self.assertRedirects(response, reverse('event_dashboard'))
+ self.assertEqual(Event.objects.count(), 1)
+
+ restored_event = Event.objects.get()
+ self.assertEqual(restored_event.name, 'Recovered Market')
+ self.assertEqual(restored_event.slug, 'recovered-market-2026-04-20')
+ self.assertEqual(restored_event.location, 'Harbor Plaza')
+ self.assertFalse(restored_event.is_published)
+ self.assertFalse(Event.objects.filter(pk=self.event.pk).exists())
+ self.assertNotIn('event_backup_import_payload', self.client.session)
+
def test_staff_can_open_edit_page(self):
self.client.login(username='staffer', password='pass12345')
response = self.client.get(reverse('event_edit', args=[self.event.slug]))
diff --git a/core/urls.py b/core/urls.py
index fd44bcf..0b6544c 100644
--- a/core/urls.py
+++ b/core/urls.py
@@ -9,6 +9,8 @@ from .views import (
event_dashboard_detail,
event_delete,
event_export,
+ event_import_preview,
+ event_import_restore,
event_detail,
event_edit,
event_list,
@@ -23,6 +25,8 @@ urlpatterns = [
path('events//', event_detail, name='event_detail'),
path('dashboard/events/', event_dashboard, name='event_dashboard'),
path('dashboard/events/export/', event_export, name='event_export'),
+ path('dashboard/events/import/preview/', event_import_preview, name='event_import_preview'),
+ path('dashboard/events/import/restore/', event_import_restore, name='event_import_restore'),
path('dashboard/events/new/', event_create, name='event_create'),
path('dashboard/events//edit/', event_edit, name='event_edit'),
path('dashboard/events//delete/', event_delete, name='event_delete'),
diff --git a/core/views.py b/core/views.py
index 615a255..c136494 100644
--- a/core/views.py
+++ b/core/views.py
@@ -5,8 +5,9 @@ import json
from django.contrib import messages
from django.contrib.auth.decorators import login_required
-from django.http import HttpResponse
from django.core.exceptions import PermissionDenied
+from django.db import transaction
+from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
@@ -17,6 +18,7 @@ from .models import Event
PROJECT_NAME = 'Roadshow Calendar'
DEFAULT_META_DESCRIPTION = 'Track where your business will be each day with a polished public calendar and secure staff-only event management.'
+BACKUP_IMPORT_SESSION_KEY = 'event_backup_import_payload'
def _parse_month(month_value: str | None):
@@ -141,22 +143,158 @@ def _build_event_backup_payload():
}
+def _parse_backup_datetime(value, label):
+ if not isinstance(value, str) or not value.strip():
+ raise ValueError(f'{label} is missing.')
+ try:
+ parsed = datetime.fromisoformat(value)
+ except ValueError as exc:
+ raise ValueError(f'{label} is invalid.') from exc
+ if timezone.is_naive(parsed):
+ parsed = timezone.make_aware(parsed, timezone.get_current_timezone())
+ return parsed
+
+
+def _parse_backup_bool(value, label):
+ if isinstance(value, bool):
+ return value
+ if isinstance(value, int) and value in (0, 1):
+ return bool(value)
+ if isinstance(value, str):
+ normalized = value.strip().lower()
+ if normalized in {'1', 'true', 'yes', 'on'}:
+ return True
+ if normalized in {'0', 'false', 'no', 'off'}:
+ return False
+ raise ValueError(f'{label} must be true or false.')
+
+
+def _normalize_event_backup_payload(payload):
+ if not isinstance(payload, dict):
+ raise ValueError('Backup file must contain one JSON object.')
+ if payload.get('version') != 1:
+ raise ValueError('Only backup version 1 is supported right now.')
+
+ raw_events = payload.get('events')
+ if not isinstance(raw_events, list):
+ raise ValueError('Backup file is missing a valid events list.')
+
+ normalized_events = []
+ seen_slugs = set()
+ for index, raw_event in enumerate(raw_events, start=1):
+ if not isinstance(raw_event, dict):
+ raise ValueError(f'Event #{index} is invalid.')
+
+ name = str(raw_event.get('name') or '').strip()
+ if not name:
+ raise ValueError(f'Event #{index} is missing a name.')
+
+ slug = str(raw_event.get('slug') or '').strip()
+ if slug:
+ if slug in seen_slugs:
+ raise ValueError(f'Backup contains a duplicate slug: {slug}.')
+ seen_slugs.add(slug)
+
+ start = _parse_backup_datetime(raw_event.get('start'), f'Event #{index} start')
+ end = _parse_backup_datetime(raw_event.get('end'), f'Event #{index} end')
+ if end <= start:
+ raise ValueError(f'Event #{index} ends before it starts.')
+
+ normalized_events.append(
+ {
+ 'name': name,
+ 'slug': slug,
+ 'location': str(raw_event.get('location') or '').strip(),
+ 'start': start,
+ 'end': end,
+ 'event_url': str(raw_event.get('event_url') or '').strip(),
+ 'summary': str(raw_event.get('summary') or '').strip(),
+ 'is_published': _parse_backup_bool(raw_event.get('is_published', True), f'Event #{index} publication flag'),
+ }
+ )
+
+ declared_count = payload.get('event_count')
+ if declared_count is not None and declared_count != len(normalized_events):
+ raise ValueError('Backup event count does not match the number of event records.')
+
+ return {
+ 'version': 1,
+ 'project': str(payload.get('project') or PROJECT_NAME),
+ 'exported_at': payload.get('exported_at'),
+ 'event_count': len(normalized_events),
+ 'events': normalized_events,
+ }
+
+
+def _format_backup_timestamp(value):
+ if not value:
+ return 'Unknown'
+ try:
+ parsed = datetime.fromisoformat(value)
+ except (TypeError, ValueError):
+ return str(value)
+ if timezone.is_naive(parsed):
+ parsed = timezone.make_aware(parsed, timezone.get_current_timezone())
+ return timezone.localtime(parsed).strftime('%b %d, %Y, %I:%M %p %Z')
+
+
+def _build_event_import_preview(payload):
+ normalized = _normalize_event_backup_payload(payload)
+ published_count = sum(1 for event in normalized['events'] if event['is_published'])
+ preview_events = normalized['events'][:5]
+ return {
+ 'version': normalized['version'],
+ 'project': normalized['project'],
+ 'event_count': normalized['event_count'],
+ 'published_count': published_count,
+ 'draft_count': normalized['event_count'] - published_count,
+ 'exported_at': normalized['exported_at'],
+ 'exported_at_display': _format_backup_timestamp(normalized['exported_at']),
+ 'preview_events': preview_events,
+ 'remaining_count': max(normalized['event_count'] - len(preview_events), 0),
+ }
+
+
+def _get_pending_event_import_preview(request):
+ payload = request.session.get(BACKUP_IMPORT_SESSION_KEY)
+ if not payload:
+ return None
+ try:
+ return _build_event_import_preview(payload)
+ except ValueError:
+ request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
+ request.session.modified = True
+ return None
+
+
+def _build_dashboard_context(request, import_preview=None):
+ events = Event.objects.all().order_by('start', 'name')
+ return _base_context(
+ page_title='Manage events',
+ events=events,
+ dashboard_count=events.count(),
+ embed_base_url=_embed_base_url(request),
+ default_embed_month=timezone.localdate().replace(day=1).strftime('%Y-%m'),
+ pending_import=import_preview if import_preview is not None else _get_pending_event_import_preview(request),
+ )
+
+
+def _restore_events_from_backup(payload):
+ normalized = _normalize_event_backup_payload(payload)
+ with transaction.atomic():
+ Event.objects.all().delete()
+ for event_data in normalized['events']:
+ event = Event(**event_data)
+ event.full_clean()
+ event.save()
+ return normalized['event_count']
+
+
@login_required(login_url='login')
def event_dashboard(request):
if not request.user.is_staff:
raise PermissionDenied
- events = Event.objects.all().order_by('start', 'name')
- return render(
- request,
- 'core/event_dashboard.html',
- _base_context(
- page_title='Manage events',
- events=events,
- dashboard_count=events.count(),
- embed_base_url=_embed_base_url(request),
- default_embed_month=timezone.localdate().replace(day=1).strftime('%Y-%m'),
- ),
- )
+ return render(request, 'core/event_dashboard.html', _build_dashboard_context(request))
@login_required(login_url='login')
@@ -174,6 +312,60 @@ def event_export(request):
return response
+@login_required(login_url='login')
+def event_import_preview(request):
+ if not request.user.is_staff:
+ raise PermissionDenied
+ if request.method != 'POST':
+ return redirect('event_dashboard')
+
+ uploaded_file = request.FILES.get('backup_file')
+ if not uploaded_file:
+ messages.error(request, 'Choose a backup JSON file before previewing the restore.')
+ return redirect('event_dashboard')
+
+ try:
+ payload = json.loads(uploaded_file.read().decode('utf-8'))
+ preview = _build_event_import_preview(payload)
+ except (UnicodeDecodeError, json.JSONDecodeError, ValueError) as exc:
+ request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
+ request.session.modified = True
+ messages.error(request, f'Backup import failed: {exc}')
+ return redirect('event_dashboard')
+
+ request.session[BACKUP_IMPORT_SESSION_KEY] = payload
+ request.session.modified = True
+ messages.success(request, 'Backup file loaded. Review the restore preview below before replacing live events.')
+ return render(request, 'core/event_dashboard.html', _build_dashboard_context(request, import_preview=preview))
+
+
+@login_required(login_url='login')
+def event_import_restore(request):
+ if not request.user.is_staff:
+ raise PermissionDenied
+ if request.method != 'POST':
+ return redirect('event_dashboard')
+
+ payload = request.session.get(BACKUP_IMPORT_SESSION_KEY)
+ if not payload:
+ messages.error(request, 'Upload a backup JSON and preview it before running a restore.')
+ return redirect('event_dashboard')
+
+ try:
+ restored_count = _restore_events_from_backup(payload)
+ except ValueError as exc:
+ request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
+ request.session.modified = True
+ messages.error(request, f'Restore failed: {exc}')
+ return redirect('event_dashboard')
+
+ request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
+ request.session.modified = True
+ event_label = 'event' if restored_count == 1 else 'events'
+ messages.success(request, f'Restore complete. Replaced the live calendar with {restored_count} {event_label} from the backup.')
+ return redirect('event_dashboard')
+
+
@login_required(login_url='login')
def event_create(request):
if not request.user.is_staff: