589 lines
20 KiB
Python
589 lines
20 KiB
Python
import calendar
|
||
from collections import defaultdict
|
||
from datetime import datetime, time, timedelta
|
||
import json
|
||
|
||
from django.contrib import messages
|
||
from django.contrib.auth.decorators import login_required
|
||
from django.core.exceptions import PermissionDenied
|
||
from django.db import transaction
|
||
from django.http import HttpResponse
|
||
from django.shortcuts import get_object_or_404, redirect, render
|
||
from django.urls import reverse
|
||
from django.utils import timezone
|
||
|
||
from .forms import EventForm
|
||
from .models import Event
|
||
|
||
|
||
# Display name of the site; also written into exported backups as the "project" field.
PROJECT_NAME = 'Roadshow Calendar'

# Default meta description that _base_context() supplies to every page.
DEFAULT_META_DESCRIPTION = (
    'Track where your business will be each day with a polished public calendar '
    'and secure staff-only event management.'
)

# Session key holding an uploaded backup payload between the preview and restore steps.
BACKUP_IMPORT_SESSION_KEY = 'event_backup_import_payload'
|
||
|
||
|
||
def _parse_month(month_value: str | None):
    """Resolve a ``YYYY-MM`` string to the first day of that month.

    Args:
        month_value: Raw month value (the views pass the ``month`` query
            parameter); may be ``None`` or empty.

    Returns:
        A ``date`` on day 1 of the requested month. The current local month is
        returned instead when the value is missing, malformed, or falls in the
        first or last year that ``datetime`` can represent.
    """
    if month_value:
        try:
            parsed = datetime.strptime(month_value, '%Y-%m').date()
        except ValueError:
            parsed = None
        # Callers step one month backwards and forwards from the result, which
        # overflows at the edges of the supported range (e.g. ``0001-01`` or
        # ``9999-12``). Treat those years like any other unusable input.
        if parsed is not None and datetime.min.year < parsed.year < datetime.max.year:
            return parsed.replace(day=1)
    return timezone.localdate().replace(day=1)
|
||
|
||
|
||
def _next_month(month_start):
|
||
return (month_start.replace(day=28) + timedelta(days=4)).replace(day=1)
|
||
|
||
|
||
def _previous_month(month_start):
|
||
return (month_start - timedelta(days=1)).replace(day=1)
|
||
|
||
|
||
def _build_month_context(month_value=None):
    """Assemble the template context for one month of the calendar.

    Args:
        month_value: Optional ``YYYY-MM`` string, resolved by ``_parse_month``.

    Returns:
        A dict containing the week-by-week day grid (``calendar_weeks``), the
        published events grouped by ISO date (``calendar_events``), and the
        label and ``YYYY-MM`` values for the shown, previous, next and current
        months.
    """
    first_of_month = _parse_month(month_value)
    first_of_next = _next_month(first_of_month)
    current_tz = timezone.get_current_timezone()
    window_start = timezone.make_aware(datetime.combine(first_of_month, time.min), current_tz)
    window_end = timezone.make_aware(datetime.combine(first_of_next, time.min), current_tz)

    # Published events whose time span overlaps the month.
    overlapping = Event.objects.filter(
        is_published=True,
        start__lt=window_end,
        end__gte=window_start,
    ).order_by('start', 'name')

    events_by_day = defaultdict(list)
    for event in overlapping:
        begins = timezone.localtime(event.start)
        finishes = timezone.localtime(event.end)
        first_day = begins.date()
        last_day = finishes.date()

        # The label is the same for every day the event covers.
        if first_day == last_day:
            time_label = f"{begins:%I:%M %p} – {finishes:%I:%M %p}"
        else:
            time_label = f"{begins:%b %d, %I:%M %p} – {finishes:%b %d, %I:%M %p}"

        # List the event under every local date from its start to its end.
        for offset in range((last_day - first_day).days + 1):
            day_key = (first_day + timedelta(days=offset)).isoformat()
            events_by_day[day_key].append(
                {
                    'name': event.name,
                    'location': event.location,
                    'time': time_label,
                    'summary': event.summary,
                    'event_url': event.event_url,
                    'detail_url': reverse('event_detail', kwargs={'slug': event.slug}),
                }
            )

    today = timezone.localdate()

    def describe_day(day):
        # One grid cell; dates from adjacent months are flagged via 'in_month'.
        iso = day.isoformat()
        day_events = events_by_day.get(iso, [])
        return {
            'date': day,
            'iso': iso,
            'day': day.day,
            'in_month': day.month == first_of_month.month,
            'is_today': day == today,
            'event_count': len(day_events),
            'has_events': bool(day_events),
        }

    # firstweekday=6 makes each week row start on Sunday.
    grid = calendar.Calendar(firstweekday=6)
    calendar_weeks = []
    for week in grid.monthdatescalendar(first_of_month.year, first_of_month.month):
        calendar_weeks.append([describe_day(day) for day in week])

    return {
        'calendar_weeks': calendar_weeks,
        'calendar_events': dict(events_by_day),
        'month_label': first_of_month.strftime('%B %Y'),
        'month_value': first_of_month.strftime('%Y-%m'),
        'prev_month': _previous_month(first_of_month).strftime('%Y-%m'),
        'next_month': first_of_next.strftime('%Y-%m'),
        'today_value': today.strftime('%Y-%m'),
    }
|
||
|
||
|
||
def _base_context(**extra):
    """Return the context shared by all pages, overlaid with ``extra``.

    Keyword arguments win over the project-wide defaults when keys collide.
    """
    return {
        'project_name': PROJECT_NAME,
        'meta_description': DEFAULT_META_DESCRIPTION,
        **extra,
    }
|
||
|
||
|
||
def _embed_base_url(request):
    """Return the absolute URL of the ``calendar_embed`` view for this request."""
    embed_path = reverse('calendar_embed')
    return request.build_absolute_uri(embed_path)
|
||
|
||
|
||
def _show_embed_header(request):
|
||
return request.GET.get('header', '1') != '0'
|
||
|
||
|
||
def _embed_header_value(request):
    """Return the header setting as the string ``'1'`` (shown) or ``'0'`` (hidden)."""
    if _show_embed_header(request):
        return '1'
    return '0'
|
||
|
||
|
||
def _serialize_event(event):
|
||
return {
|
||
'name': event.name,
|
||
'slug': event.slug,
|
||
'location': event.location,
|
||
'start': event.start.isoformat(),
|
||
'end': event.end.isoformat(),
|
||
'event_url': event.event_url,
|
||
'summary': event.summary,
|
||
'is_published': event.is_published,
|
||
}
|
||
|
||
|
||
def _build_event_backup_payload():
    """Build the version-1 backup document covering every event, published or not.

    Events are ordered by start time, then name.
    """
    all_events = Event.objects.all().order_by('start', 'name')
    serialized = [_serialize_event(event) for event in all_events]
    return {
        'version': 1,
        'project': PROJECT_NAME,
        'exported_at': timezone.now().isoformat(),
        'event_count': len(serialized),
        'events': serialized,
    }
|
||
|
||
|
||
def _parse_backup_datetime(value, label):
    """Turn an ISO 8601 string from a backup record into an aware datetime.

    Args:
        value: Raw value read from the backup record.
        label: Field description used at the start of error messages.

    Returns:
        An aware ``datetime``. Naive inputs are interpreted in the current
        time zone.

    Raises:
        ValueError: If the value is not a non-blank string, or cannot be parsed.
    """
    has_text = isinstance(value, str) and bool(value.strip())
    if not has_text:
        raise ValueError(f'{label} is missing.')

    try:
        moment = datetime.fromisoformat(value)
    except ValueError as exc:
        raise ValueError(f'{label} is invalid.') from exc

    if timezone.is_aware(moment):
        return moment
    return timezone.make_aware(moment, timezone.get_current_timezone())
|
||
|
||
|
||
def _parse_backup_bool(value, label):
|
||
if isinstance(value, bool):
|
||
return value
|
||
if isinstance(value, int) and value in (0, 1):
|
||
return bool(value)
|
||
if isinstance(value, str):
|
||
normalized = value.strip().lower()
|
||
if normalized in {'1', 'true', 'yes', 'on'}:
|
||
return True
|
||
if normalized in {'0', 'false', 'no', 'off'}:
|
||
return False
|
||
raise ValueError(f'{label} must be true or false.')
|
||
|
||
|
||
def _normalize_event_backup_payload(payload):
    """Validate a decoded backup document and return a cleaned copy.

    Args:
        payload: The decoded JSON document.

    Returns:
        A dict with ``version``, ``project``, ``exported_at``, ``event_count``
        and ``events``. Each event has stripped text fields, aware ``start`` and
        ``end`` datetimes, and a boolean ``is_published`` (``True`` when the
        record omits it).

    Raises:
        ValueError: If the document is not a version-1 object with an events
            list, if any record lacks a name or valid start/end, if an event
            does not end after it starts, if a non-empty slug repeats, or if a
            declared ``event_count`` disagrees with the number of records.
    """
    if not isinstance(payload, dict):
        raise ValueError('Backup file must contain one JSON object.')
    if payload.get('version') != 1:
        raise ValueError('Only backup version 1 is supported right now.')

    records = payload.get('events')
    if not isinstance(records, list):
        raise ValueError('Backup file is missing a valid events list.')

    def text_of(record, key):
        # Missing, null and other falsy values all become an empty string.
        return str(record.get(key) or '').strip()

    cleaned = []
    slugs_so_far = set()
    for position, record in enumerate(records, start=1):
        if not isinstance(record, dict):
            raise ValueError(f'Event #{position} is invalid.')

        name = text_of(record, 'name')
        if not name:
            raise ValueError(f'Event #{position} is missing a name.')

        # Empty slugs are allowed to repeat; only real slugs must be unique.
        slug = text_of(record, 'slug')
        if slug and slug in slugs_so_far:
            raise ValueError(f'Backup contains a duplicate slug: {slug}.')
        if slug:
            slugs_so_far.add(slug)

        start = _parse_backup_datetime(record.get('start'), f'Event #{position} start')
        end = _parse_backup_datetime(record.get('end'), f'Event #{position} end')
        if start >= end:
            raise ValueError(f'Event #{position} ends before it starts.')

        cleaned.append(
            {
                'name': name,
                'slug': slug,
                'location': text_of(record, 'location'),
                'start': start,
                'end': end,
                'event_url': text_of(record, 'event_url'),
                'summary': text_of(record, 'summary'),
                'is_published': _parse_backup_bool(
                    record.get('is_published', True),
                    f'Event #{position} publication flag',
                ),
            }
        )

    # event_count is optional, but when present it must match the records.
    declared_count = payload.get('event_count')
    if declared_count is not None and declared_count != len(cleaned):
        raise ValueError('Backup event count does not match the number of event records.')

    return {
        'version': 1,
        'project': str(payload.get('project') or PROJECT_NAME),
        'exported_at': payload.get('exported_at'),
        'event_count': len(cleaned),
        'events': cleaned,
    }
|
||
|
||
|
||
def _format_backup_timestamp(value):
    """Render a backup's ``exported_at`` value for display.

    Returns ``'Unknown'`` for a falsy value and ``str(value)`` when it cannot
    be parsed as ISO 8601. Otherwise returns the local time formatted with
    ``'%b %d, %Y, %I:%M %p %Z'``; naive values are taken to be in the current
    time zone.
    """
    if not value:
        return 'Unknown'

    try:
        stamp = datetime.fromisoformat(value)
    except (TypeError, ValueError):
        return str(value)

    if not timezone.is_aware(stamp):
        stamp = timezone.make_aware(stamp, timezone.get_current_timezone())
    local_stamp = timezone.localtime(stamp)
    return local_stamp.strftime('%b %d, %Y, %I:%M %p %Z')
|
||
|
||
|
||
def _build_event_import_preview(payload):
    """Summarise a backup document for the restore preview.

    Returns:
        A dict with the backup's version, project and export time (raw and
        formatted), total/published/draft counts, the first five normalized
        events, and how many events are not included in that sample.

    Raises:
        ValueError: If ``_normalize_event_backup_payload`` rejects the payload.
    """
    normalized = _normalize_event_backup_payload(payload)
    all_events = normalized['events']
    total = normalized['event_count']
    exported_at = normalized['exported_at']

    published_count = len([event for event in all_events if event['is_published']])
    sample = all_events[:5]

    return {
        'version': normalized['version'],
        'project': normalized['project'],
        'event_count': total,
        'published_count': published_count,
        'draft_count': total - published_count,
        'exported_at': exported_at,
        'exported_at_display': _format_backup_timestamp(exported_at),
        'preview_events': sample,
        'remaining_count': max(total - len(sample), 0),
    }
|
||
|
||
|
||
def _get_pending_event_import_preview(request):
    """Return the preview for a backup stored in the session, or ``None``.

    A stored payload that no longer validates is removed from the session.
    """
    stored_payload = request.session.get(BACKUP_IMPORT_SESSION_KEY)
    if stored_payload:
        try:
            return _build_event_import_preview(stored_payload)
        except ValueError:
            request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
            request.session.modified = True
    return None
|
||
|
||
|
||
def _build_dashboard_context(request, import_preview=None):
    """Build the template context for the staff dashboard.

    Args:
        request: The current request.
        import_preview: A preview that was just built. When ``None``, any
            pending preview is loaded from the session instead.
    """
    events = Event.objects.all().order_by('start', 'name')

    if import_preview is None:
        pending_import = _get_pending_event_import_preview(request)
    else:
        pending_import = import_preview

    first_of_current_month = timezone.localdate().replace(day=1)
    return _base_context(
        page_title='Manage events',
        events=events,
        dashboard_count=events.count(),
        embed_base_url=_embed_base_url(request),
        default_embed_month=first_of_current_month.strftime('%Y-%m'),
        pending_import=pending_import,
    )
|
||
|
||
|
||
def _restore_events_from_backup(payload):
    """Replace every stored event with the contents of a backup document.

    The delete and all inserts run inside one ``transaction.atomic()`` block,
    so an exception part-way through rolls the whole restore back.

    Args:
        payload: The decoded backup document.

    Returns:
        The number of events written.

    Raises:
        ValueError: If the document is structurally invalid, or if any event is
            rejected by model validation.
    """
    # Imported here because the module-level import only brings in PermissionDenied.
    from django.core.exceptions import ValidationError

    normalized = _normalize_event_backup_payload(payload)
    with transaction.atomic():
        Event.objects.all().delete()
        for index, event_data in enumerate(normalized['events'], start=1):
            event = Event(**event_data)
            try:
                event.full_clean()
            except ValidationError as exc:
                # ValidationError is not a ValueError, and the restore view only
                # handles ValueError. Converting it lets the view report the
                # problem instead of failing with a server error.
                details = '; '.join(exc.messages)
                raise ValueError(
                    f'Event #{index} ({event_data["name"]}) failed validation: {details}'
                ) from exc
            event.save()
    return normalized['event_count']
|
||
|
||
|
||
@login_required(login_url='login')
def event_dashboard(request):
    """Render the event management dashboard.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if request.user.is_staff:
        context = _build_dashboard_context(request)
        return render(request, 'core/event_dashboard.html', context)
    raise PermissionDenied
|
||
|
||
|
||
@login_required(login_url='login')
def event_export(request):
    """Return every event as a JSON attachment with a timestamped filename.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    body = json.dumps(_build_event_backup_payload(), indent=2)
    # Local time, so the filename matches the clock the site is configured for.
    stamp = timezone.localtime().strftime('%Y%m%d-%H%M%S')

    response = HttpResponse(body, content_type='application/json')
    response['Content-Disposition'] = f'attachment; filename="roadshow-calendar-events-{stamp}.json"'
    return response
|
||
|
||
|
||
@login_required(login_url='login')
def event_import_preview(request):
    """Validate an uploaded backup file and show a restore preview.

    Non-POST requests redirect to the dashboard. On POST the ``backup_file``
    upload is decoded and validated. A valid payload is stored in the session
    for the restore step and the dashboard is rendered with the preview. A
    missing or invalid file clears any stored payload where applicable, flashes
    an error and redirects to the dashboard.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied
    if request.method != 'POST':
        return redirect('event_dashboard')

    uploaded_file = request.FILES.get('backup_file')
    if not uploaded_file:
        messages.error(request, 'Choose a backup JSON file before previewing the restore.')
        return redirect('event_dashboard')

    try:
        # 'utf-8-sig' reads plain UTF-8 as well, but drops a leading byte-order
        # mark. json.loads() rejects text that still begins with a BOM.
        payload = json.loads(uploaded_file.read().decode('utf-8-sig'))
        preview = _build_event_import_preview(payload)
    except (UnicodeDecodeError, json.JSONDecodeError, ValueError) as exc:
        request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
        request.session.modified = True
        messages.error(request, f'Backup import failed: {exc}')
        return redirect('event_dashboard')

    # Store the raw payload, not the normalized one: it is what json.loads
    # produced, and the restore step normalizes it again.
    request.session[BACKUP_IMPORT_SESSION_KEY] = payload
    request.session.modified = True
    messages.success(request, 'Backup file loaded. Review the restore preview below before replacing live events.')
    return render(request, 'core/event_dashboard.html', _build_dashboard_context(request, import_preview=preview))
|
||
|
||
|
||
@login_required(login_url='login')
def event_import_restore(request):
    """Replace all events with the backup previously stored in the session.

    Non-POST requests, and POSTs with no stored payload, redirect to the
    dashboard. Otherwise the restore runs, the stored payload is discarded
    whether or not it succeeded, and the outcome is flashed before redirecting.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied
    if request.method != 'POST':
        return redirect('event_dashboard')

    payload = request.session.get(BACKUP_IMPORT_SESSION_KEY)
    if not payload:
        messages.error(request, 'Upload a backup JSON and preview it before running a restore.')
        return redirect('event_dashboard')

    failure = None
    restored_count = 0
    try:
        restored_count = _restore_events_from_backup(payload)
    except ValueError as exc:
        failure = exc

    # The stored payload is single-use, whatever the outcome.
    request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
    request.session.modified = True

    if failure is not None:
        messages.error(request, f'Restore failed: {failure}')
    else:
        event_label = 'event' if restored_count == 1 else 'events'
        messages.success(request, f'Restore complete. Replaced the live calendar with {restored_count} {event_label} from the backup.')
    return redirect('event_dashboard')
|
||
|
||
|
||
def _build_weekly_recurrence_datetimes(start, end, recurrence_start_date, recurrence_end_date, recurrence_weekday):
    """Compute the start/end pair for each weekly occurrence of an event.

    Args:
        start: Start datetime of the template event; supplies the time of day.
        end: End datetime of the template event; ``end - start`` is the duration.
        recurrence_start_date: First date on which an occurrence may fall.
        recurrence_end_date: Last date on which an occurrence may fall (inclusive).
        recurrence_weekday: Weekday index, int or numeric string, using the
            ``date.weekday()`` numbering (0 is Monday).

    Returns:
        A list of ``(occurrence_start, occurrence_end)`` aware datetimes, one
        per matching week. Empty when no matching weekday is within the range.
    """
    tz = timezone.get_current_timezone()
    if timezone.is_aware(start):
        wall_clock = timezone.localtime(start, tz).time()
    else:
        wall_clock = start.time()
    wall_clock = wall_clock.replace(tzinfo=None)
    duration = end - start

    # Days from the range start to the first requested weekday (0-6).
    lead_days = (int(recurrence_weekday) - recurrence_start_date.weekday()) % 7
    first_date = recurrence_start_date + timedelta(days=lead_days)

    # Floor division gives a non-positive count when first_date is past the end.
    week_count = (recurrence_end_date - first_date).days // 7 + 1

    occurrences = []
    for week in range(week_count):
        occurrence_date = first_date + timedelta(days=7 * week)
        occurrence_start = timezone.make_aware(datetime.combine(occurrence_date, wall_clock), tz)
        occurrences.append((occurrence_start, occurrence_start + duration))
    return occurrences
|
||
|
||
|
||
@login_required(login_url='login')
def event_create(request):
    """Create one event, or a weekly series of events, from the event form.

    GET renders an empty form. A valid POST either saves a single event and
    redirects to its dashboard detail page, or, when a recurrence start date,
    end date and weekday are all supplied, creates one event per matching week
    inside a single transaction and redirects to the dashboard. An invalid
    POST, or a recurrence range that contains no matching weekday, re-renders
    the form.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    if request.method == 'POST':
        form = EventForm(request.POST)
        if form.is_valid():
            recurrence_start_date = form.cleaned_data.get('recurrence_start_date')
            recurrence_end_date = form.cleaned_data.get('recurrence_end_date')
            recurrence_weekday = form.cleaned_data.get('recurrence_weekday')

            # Compare the weekday against "no value" explicitly: index 0 is a
            # valid choice and would be falsy if the form yields an integer.
            wants_recurrence = (
                bool(recurrence_start_date)
                and bool(recurrence_end_date)
                and recurrence_weekday not in (None, '')
            )

            if not wants_recurrence:
                event = form.save()
                messages.success(request, 'Event saved and ready for the public calendar.')
                return redirect('event_dashboard_detail', slug=event.slug)

            # The unsaved instance only serves as a template for the occurrences.
            base_event = form.save(commit=False)
            occurrence_datetimes = _build_weekly_recurrence_datetimes(
                start=base_event.start,
                end=base_event.end,
                recurrence_start_date=recurrence_start_date,
                recurrence_end_date=recurrence_end_date,
                recurrence_weekday=recurrence_weekday,
            )

            if occurrence_datetimes:
                created_events = []
                # All occurrences are saved together or not at all.
                with transaction.atomic():
                    for occurrence_start, occurrence_end in occurrence_datetimes:
                        event = Event(
                            name=base_event.name,
                            location=base_event.location,
                            start=occurrence_start,
                            end=occurrence_end,
                            event_url=base_event.event_url,
                            summary=base_event.summary,
                            is_published=base_event.is_published,
                        )
                        event.full_clean()
                        event.save()
                        created_events.append(event)

                event_label = 'event' if len(created_events) == 1 else 'events'
                messages.success(request, f'Created {len(created_events)} recurring {event_label} and added them to the calendar.')
                return redirect('event_dashboard')

            # Nothing matched: say so and fall through to re-render the form,
            # rather than announcing "Created 0 recurring events" as a success.
            messages.error(request, 'No events were created: the recurrence range does not include the selected weekday.')
    else:
        form = EventForm()

    return render(
        request,
        'core/event_form.html',
        _base_context(
            page_title='Add event',
            form=form,
            form_mode='create',
            form_title='Add a new stop to the calendar',
            form_intro='Once saved, published events immediately power the landing page, public calendar, and embeddable widget.',
            submit_label='Save event',
        ),
    )
|
||
|
||
|
||
@login_required(login_url='login')
def event_edit(request, slug):
    """Edit the event identified by ``slug``.

    GET renders the form pre-filled from the event. A valid POST saves and
    redirects to the event's dashboard detail page. An invalid POST re-renders
    the bound form. An unknown slug yields a 404.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    event = get_object_or_404(Event, slug=slug)
    is_submission = request.method == 'POST'
    if is_submission:
        form = EventForm(request.POST, instance=event)
    else:
        form = EventForm(instance=event)

    if is_submission and form.is_valid():
        event = form.save()
        messages.success(request, 'Event updated successfully.')
        return redirect('event_dashboard_detail', slug=event.slug)

    context = _base_context(
        page_title=f'Edit {event.name}',
        form=form,
        event=event,
        form_mode='edit',
        form_title='Update this calendar stop',
        form_intro='Change dates, copy, publication status, or the event link without leaving the custom dashboard.',
        submit_label='Save changes',
    )
    return render(request, 'core/event_form.html', context)
|
||
|
||
|
||
@login_required(login_url='login')
def event_delete(request, slug):
    """Show a delete confirmation page, and delete the event on POST.

    An unknown slug yields a 404.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    event = get_object_or_404(Event, slug=slug)
    if request.method != 'POST':
        context = _base_context(page_title=f'Delete {event.name}', event=event)
        return render(request, 'core/event_confirm_delete.html', context)

    # Read the name before deleting so the message can still refer to it.
    deleted_name = event.name
    event.delete()
    messages.success(request, f'{deleted_name} was deleted.')
    return redirect('event_dashboard')
|
||
|
||
|
||
@login_required(login_url='login')
def event_dashboard_detail(request, slug):
    """Render the dashboard detail page for one event, published or not.

    An unknown slug yields a 404.

    Raises:
        PermissionDenied: If the logged-in user is not staff.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    event = get_object_or_404(Event, slug=slug)
    context = _base_context(page_title=event.name, event=event)
    return render(request, 'core/event_dashboard_detail.html', context)
|
||
|
||
|
||
def home(request):
    """Render the landing page.

    The context holds the month calendar, up to six published events that have
    not ended yet (the first three also as ``hero_events``), and a ready-made
    iframe snippet pointing at the embeddable calendar.
    """
    month_context = _build_month_context(request.GET.get('month'))

    not_yet_over = Event.objects.filter(is_published=True, end__gte=timezone.now())
    upcoming_events = list(not_yet_over.order_by('start', 'name')[:6])

    embed_url = _embed_base_url(request)
    iframe_snippet = (
        f'<iframe src="{embed_url}" title="Where to find us calendar" '
        'width="100%" height="760" '
        'style="border:0;border-radius:24px;overflow:hidden;"></iframe>'
    )

    context = _base_context(
        page_title=PROJECT_NAME,
        hero_events=upcoming_events[:3],
        upcoming_events=upcoming_events,
        embed_url=embed_url,
        iframe_snippet=iframe_snippet,
        **month_context,
    )
    return render(request, 'core/index.html', context)
|
||
|
||
|
||
def calendar_page(request):
    """Render the calendar page for the month named by the ``month`` query parameter."""
    month_context = _build_month_context(request.GET.get('month'))
    context = _base_context(page_title='Public calendar', **month_context)
    return render(request, 'core/calendar_page.html', context)
|
||
|
||
|
||
def calendar_embed(request):
    """Render the embeddable calendar.

    The ``month`` query parameter selects the month; ``header=0`` hides the header.
    """
    month_context = _build_month_context(request.GET.get('month'))
    context = _base_context(
        page_title='Embeddable calendar',
        embedded=True,
        show_embed_header=_show_embed_header(request),
        embed_header_value=_embed_header_value(request),
        **month_context,
    )
    return render(request, 'core/calendar_embed.html', context)
|
||
|
||
|
||
def event_list(request):
    """Render the list of published events, ordered by start time then name."""
    # NOTE(review): the page title says "Upcoming events", but the query does
    # not filter out events that have already ended. Confirm this is intended.
    published_events = Event.objects.filter(is_published=True).order_by('start', 'name')
    context = _base_context(page_title='Upcoming events', events=published_events)
    return render(request, 'core/event_list.html', context)
|
||
|
||
|
||
def event_detail(request, slug):
    """Render the detail page for a published event.

    An unknown slug, or an event that is not published, yields a 404.
    """
    event = get_object_or_404(Event, slug=slug, is_published=True)
    context = _base_context(page_title=event.name, event=event)
    return render(request, 'core/event_detail.html', context)
|