39435-vm/core/views.py
2026-04-25 19:43:38 +00:00

589 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import calendar
from collections import defaultdict
from datetime import datetime, time, timedelta
import json
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.exceptions import PermissionDenied
from django.db import transaction
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
from .forms import EventForm
from .models import Event
# Site name: used as the home page title, injected into every template
# context, and recorded in exported backup payloads.
PROJECT_NAME = 'Roadshow Calendar'
# Default meta description placed in every template context by _base_context().
DEFAULT_META_DESCRIPTION = 'Track where your business will be each day with a polished public calendar and secure staff-only event management.'
# Session key holding an uploaded backup payload between the preview step and
# the restore step.
BACKUP_IMPORT_SESSION_KEY = 'event_backup_import_payload'
def _parse_month(month_value: str | None):
today = timezone.localdate()
if month_value:
try:
parsed = datetime.strptime(month_value, '%Y-%m').date()
return parsed.replace(day=1)
except ValueError:
pass
return today.replace(day=1)
def _next_month(month_start):
return (month_start.replace(day=28) + timedelta(days=4)).replace(day=1)
def _previous_month(month_start):
return (month_start - timedelta(days=1)).replace(day=1)
def _build_month_context(month_value=None):
    """Build the template context for one month of the public calendar.

    Args:
        month_value: Optional ``YYYY-MM`` string; resolved by ``_parse_month``.

    Returns:
        A dict with:
        * ``calendar_weeks``: list of weeks, each a list of day dicts
          (``date``, ``iso``, ``day``, ``in_month``, ``is_today``,
          ``event_count``, ``has_events``) for a Sunday-first grid.
        * ``calendar_events``: mapping of ISO date string to the list of
          event dicts shown on that day.
        * ``month_label``, ``month_value``, ``prev_month``, ``next_month``,
          ``today_value``: display and navigation strings.
    """
    month_start = _parse_month(month_value)
    next_month = _next_month(month_start)
    today = timezone.localdate()

    # Sunday-first grid; monthdatescalendar pads the first and last week with
    # days from the adjacent months.
    grid_weeks = calendar.Calendar(firstweekday=6).monthdatescalendar(month_start.year, month_start.month)
    grid_first_day = grid_weeks[0][0]
    grid_last_day = grid_weeks[-1][-1]

    tz = timezone.get_current_timezone()
    range_start = timezone.make_aware(datetime.combine(month_start, time.min), tz)
    range_end = timezone.make_aware(datetime.combine(next_month, time.min), tz)
    # Published events that overlap the requested month.
    # NOTE(review): padded grid days outside the month only show events that
    # also overlap the month itself - confirm this is intended.
    events = Event.objects.filter(
        is_published=True, start__lt=range_end, end__gte=range_start
    ).order_by('start', 'name')

    event_map = defaultdict(list)
    for event in events:
        start_local = timezone.localtime(event.start)
        end_local = timezone.localtime(event.end)
        # NOTE(review): start and end are separated by a single space only;
        # confirm no dash separator was lost from these format strings.
        if start_local.date() == end_local.date():
            time_label = f"{start_local:%I:%M %p} {end_local:%I:%M %p}"
        else:
            time_label = f"{start_local:%b %d, %I:%M %p} {end_local:%b %d, %I:%M %p}"
        # Everything in the entry is the same for each day the event covers,
        # so build it (and resolve the URL) once per event.
        entry = {
            'name': event.name,
            'location': event.location,
            'time': time_label,
            'summary': event.summary,
            'event_url': event.event_url,
            'detail_url': reverse('event_detail', kwargs={'slug': event.slug}),
        }
        # Only expand the event over days that can actually be rendered; a
        # very long event would otherwise add one entry per day of its span.
        current_day = max(start_local.date(), grid_first_day)
        final_day = min(end_local.date(), grid_last_day)
        while current_day <= final_day:
            event_map[current_day.isoformat()].append(dict(entry))
            current_day += timedelta(days=1)

    calendar_weeks = []
    for week in grid_weeks:
        week_days = []
        for day in week:
            iso = day.isoformat()
            day_events = event_map.get(iso, [])
            week_days.append(
                {
                    'date': day,
                    'iso': iso,
                    'day': day.day,
                    'in_month': day.month == month_start.month,
                    'is_today': day == today,
                    'event_count': len(day_events),
                    'has_events': bool(day_events),
                }
            )
        calendar_weeks.append(week_days)

    return {
        'calendar_weeks': calendar_weeks,
        'calendar_events': dict(event_map),
        'month_label': month_start.strftime('%B %Y'),
        'month_value': month_start.strftime('%Y-%m'),
        'prev_month': _previous_month(month_start).strftime('%Y-%m'),
        'next_month': next_month.strftime('%Y-%m'),
        'today_value': today.strftime('%Y-%m'),
    }
def _base_context(**extra):
    """Return the shared template context, overlaid with ``extra``.

    The project name and default meta description are always present;
    keyword arguments are added on top and win on key collisions.
    """
    return {
        'project_name': PROJECT_NAME,
        'meta_description': DEFAULT_META_DESCRIPTION,
        **extra,
    }
def _embed_base_url(request):
    """Return the absolute URL of the ``calendar_embed`` route for this request."""
    embed_path = reverse('calendar_embed')
    return request.build_absolute_uri(embed_path)
def _show_embed_header(request):
return request.GET.get('header', '1') != '0'
def _embed_header_value(request):
    """Return ``'1'`` when the embed header should be shown, otherwise ``'0'``."""
    if _show_embed_header(request):
        return '1'
    return '0'
def _serialize_event(event):
return {
'name': event.name,
'slug': event.slug,
'location': event.location,
'start': event.start.isoformat(),
'end': event.end.isoformat(),
'event_url': event.event_url,
'summary': event.summary,
'is_published': event.is_published,
}
def _build_event_backup_payload():
    """Return a version-1 backup payload containing every event.

    Events are ordered by start time and then name; the payload also
    records the project name, the export timestamp and the event count.
    """
    ordered_events = Event.objects.all().order_by('start', 'name')
    serialized = [_serialize_event(event) for event in ordered_events]
    return {
        'version': 1,
        'project': PROJECT_NAME,
        'exported_at': timezone.now().isoformat(),
        'event_count': len(serialized),
        'events': serialized,
    }
def _parse_backup_datetime(value, label):
if not isinstance(value, str) or not value.strip():
raise ValueError(f'{label} is missing.')
try:
parsed = datetime.fromisoformat(value)
except ValueError as exc:
raise ValueError(f'{label} is invalid.') from exc
if timezone.is_naive(parsed):
parsed = timezone.make_aware(parsed, timezone.get_current_timezone())
return parsed
def _parse_backup_bool(value, label):
if isinstance(value, bool):
return value
if isinstance(value, int) and value in (0, 1):
return bool(value)
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {'1', 'true', 'yes', 'on'}:
return True
if normalized in {'0', 'false', 'no', 'off'}:
return False
raise ValueError(f'{label} must be true or false.')
def _normalize_event_backup_payload(payload):
if not isinstance(payload, dict):
raise ValueError('Backup file must contain one JSON object.')
if payload.get('version') != 1:
raise ValueError('Only backup version 1 is supported right now.')
raw_events = payload.get('events')
if not isinstance(raw_events, list):
raise ValueError('Backup file is missing a valid events list.')
normalized_events = []
seen_slugs = set()
for index, raw_event in enumerate(raw_events, start=1):
if not isinstance(raw_event, dict):
raise ValueError(f'Event #{index} is invalid.')
name = str(raw_event.get('name') or '').strip()
if not name:
raise ValueError(f'Event #{index} is missing a name.')
slug = str(raw_event.get('slug') or '').strip()
if slug:
if slug in seen_slugs:
raise ValueError(f'Backup contains a duplicate slug: {slug}.')
seen_slugs.add(slug)
start = _parse_backup_datetime(raw_event.get('start'), f'Event #{index} start')
end = _parse_backup_datetime(raw_event.get('end'), f'Event #{index} end')
if end <= start:
raise ValueError(f'Event #{index} ends before it starts.')
normalized_events.append(
{
'name': name,
'slug': slug,
'location': str(raw_event.get('location') or '').strip(),
'start': start,
'end': end,
'event_url': str(raw_event.get('event_url') or '').strip(),
'summary': str(raw_event.get('summary') or '').strip(),
'is_published': _parse_backup_bool(raw_event.get('is_published', True), f'Event #{index} publication flag'),
}
)
declared_count = payload.get('event_count')
if declared_count is not None and declared_count != len(normalized_events):
raise ValueError('Backup event count does not match the number of event records.')
return {
'version': 1,
'project': str(payload.get('project') or PROJECT_NAME),
'exported_at': payload.get('exported_at'),
'event_count': len(normalized_events),
'events': normalized_events,
}
def _format_backup_timestamp(value):
if not value:
return 'Unknown'
try:
parsed = datetime.fromisoformat(value)
except (TypeError, ValueError):
return str(value)
if timezone.is_naive(parsed):
parsed = timezone.make_aware(parsed, timezone.get_current_timezone())
return timezone.localtime(parsed).strftime('%b %d, %Y, %I:%M %p %Z')
def _build_event_import_preview(payload):
    """Summarize a backup payload for the restore preview.

    Returns a dict with the backup's version, project, total/published/draft
    counts, raw and display-formatted export timestamps, the first five
    normalized events, and how many events are not shown.

    Raises:
        ValueError: If the payload fails normalization.
    """
    normalized = _normalize_event_backup_payload(payload)
    all_events = normalized['events']
    total = normalized['event_count']
    exported_at = normalized['exported_at']

    published_count = len([event for event in all_events if event['is_published']])
    shown_events = all_events[:5]

    return {
        'version': normalized['version'],
        'project': normalized['project'],
        'event_count': total,
        'published_count': published_count,
        'draft_count': total - published_count,
        'exported_at': exported_at,
        'exported_at_display': _format_backup_timestamp(exported_at),
        'preview_events': shown_events,
        'remaining_count': max(total - len(shown_events), 0),
    }
def _get_pending_event_import_preview(request):
    """Return a preview of the backup stored in the session, if there is one.

    Returns ``None`` when no payload is stored. A stored payload that no
    longer validates is removed from the session and ``None`` is returned.
    """
    stored_payload = request.session.get(BACKUP_IMPORT_SESSION_KEY)
    if stored_payload:
        try:
            return _build_event_import_preview(stored_payload)
        except ValueError:
            # Discard the unusable payload so it is not retried on every load.
            request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
            request.session.modified = True
    return None
def _build_dashboard_context(request, import_preview=None):
    """Build the template context for the staff event dashboard.

    Args:
        request: The current request.
        import_preview: A ready-made restore preview; when ``None`` the
            preview is derived from any payload stored in the session.
    """
    all_events = Event.objects.all().order_by('start', 'name')
    event_total = all_events.count()
    embed_url = _embed_base_url(request)
    current_month = timezone.localdate().replace(day=1).strftime('%Y-%m')
    if import_preview is not None:
        pending = import_preview
    else:
        pending = _get_pending_event_import_preview(request)
    return _base_context(
        page_title='Manage events',
        events=all_events,
        dashboard_count=event_total,
        embed_base_url=embed_url,
        default_embed_month=current_month,
        pending_import=pending,
    )
def _restore_events_from_backup(payload):
    """Replace every stored event with the events from a backup payload.

    The delete and all inserts run in a single transaction, so a failure
    leaves the existing events untouched.

    Args:
        payload: The decoded backup object.

    Returns:
        The number of events restored.

    Raises:
        ValueError: If the payload fails normalization or any event fails
            model validation.
    """
    # Local import: ValidationError is not among this module's top-level imports.
    from django.core.exceptions import ValidationError

    normalized = _normalize_event_backup_payload(payload)
    with transaction.atomic():
        Event.objects.all().delete()
        for index, event_data in enumerate(normalized['events'], start=1):
            event = Event(**event_data)
            try:
                event.full_clean()
            except ValidationError as exc:
                # Model validation errors are not ValueError subclasses.
                # Convert them so the caller's existing ValueError handling
                # reports the problem; raising here also rolls back the delete.
                details = '; '.join(exc.messages)
                raise ValueError(f'Event #{index} failed validation: {details}') from exc
            event.save()
    return normalized['event_count']
@login_required(login_url='login')
def event_dashboard(request):
    """Render the staff dashboard; non-staff users get a 403."""
    if not request.user.is_staff:
        raise PermissionDenied
    context = _build_dashboard_context(request)
    return render(request, 'core/event_dashboard.html', context)
@login_required(login_url='login')
def event_export(request):
    """Return every event as a downloadable JSON backup (staff only)."""
    if not request.user.is_staff:
        raise PermissionDenied
    payload = _build_event_backup_payload()
    # The local timestamp keeps successive downloads from sharing a filename.
    timestamp = timezone.localtime().strftime('%Y%m%d-%H%M%S')
    body = json.dumps(payload, indent=2)
    response = HttpResponse(body, content_type='application/json')
    response['Content-Disposition'] = f'attachment; filename="roadshow-calendar-events-{timestamp}.json"'
    return response
@login_required(login_url='login')
def event_import_preview(request):
    """Accept an uploaded backup file and show a restore preview (staff only).

    Non-POST requests redirect to the dashboard. A valid upload is stored in
    the session for the later restore step and the dashboard is rendered
    with the preview. An invalid upload clears any stored payload and
    redirects to the dashboard with an error message.
    """
    if not request.user.is_staff:
        raise PermissionDenied
    if request.method != 'POST':
        return redirect('event_dashboard')
    uploaded_file = request.FILES.get('backup_file')
    if not uploaded_file:
        messages.error(request, 'Choose a backup JSON file before previewing the restore.')
        return redirect('event_dashboard')
    try:
        # 'utf-8-sig' decodes plain UTF-8 identically but also strips a
        # leading byte-order mark, which json.loads would otherwise reject.
        payload = json.loads(uploaded_file.read().decode('utf-8-sig'))
        preview = _build_event_import_preview(payload)
    except ValueError as exc:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses, so this also covers undecodable or malformed files.
        request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
        request.session.modified = True
        messages.error(request, f'Backup import failed: {exc}')
        return redirect('event_dashboard')
    # Store the raw (JSON-decoded) payload; the restore step re-validates it.
    request.session[BACKUP_IMPORT_SESSION_KEY] = payload
    request.session.modified = True
    messages.success(request, 'Backup file loaded. Review the restore preview below before replacing live events.')
    return render(request, 'core/event_dashboard.html', _build_dashboard_context(request, import_preview=preview))
@login_required(login_url='login')
def event_import_restore(request):
    """Replace all events with the previewed backup (staff only, POST only).

    The stored payload is removed from the session whether the restore
    succeeds or fails; the user is always redirected to the dashboard.
    """
    if not request.user.is_staff:
        raise PermissionDenied
    if request.method != 'POST':
        return redirect('event_dashboard')

    payload = request.session.get(BACKUP_IMPORT_SESSION_KEY)
    if not payload:
        messages.error(request, 'Upload a backup JSON and preview it before running a restore.')
        return redirect('event_dashboard')

    failure = None
    restored_count = 0
    try:
        restored_count = _restore_events_from_backup(payload)
    except ValueError as exc:
        failure = exc

    # The payload is single-use regardless of the outcome.
    request.session.pop(BACKUP_IMPORT_SESSION_KEY, None)
    request.session.modified = True

    if failure is not None:
        messages.error(request, f'Restore failed: {failure}')
    else:
        event_label = 'event' if restored_count == 1 else 'events'
        messages.success(request, f'Restore complete. Replaced the live calendar with {restored_count} {event_label} from the backup.')
    return redirect('event_dashboard')
def _build_weekly_recurrence_datetimes(start, end, recurrence_start_date, recurrence_end_date, recurrence_weekday):
    """Return ``(start, end)`` pairs for a weekly recurrence.

    Each occurrence begins at the same local wall-clock time as ``start``
    and lasts ``end - start``. Occurrences fall on ``recurrence_weekday``
    (compared against ``date.weekday()``, so 0 is Monday) from the first
    such day on or after ``recurrence_start_date`` through
    ``recurrence_end_date`` inclusive.
    """
    current_tz = timezone.get_current_timezone()
    if timezone.is_aware(start):
        wall_clock_start = timezone.localtime(start, current_tz)
    else:
        wall_clock_start = start
    time_of_day = wall_clock_start.time().replace(tzinfo=None)
    span = end - start

    # Days to skip forward to reach the first requested weekday (0-6).
    offset_days = (int(recurrence_weekday) - recurrence_start_date.weekday()) % 7
    occurrence_day = recurrence_start_date + timedelta(days=offset_days)
    one_week = timedelta(days=7)

    occurrences = []
    while occurrence_day <= recurrence_end_date:
        begins = timezone.make_aware(datetime.combine(occurrence_day, time_of_day), current_tz)
        occurrences.append((begins, begins + span))
        occurrence_day += one_week
    return occurrences
@login_required(login_url='login')
def event_create(request):
    """Create a single event or a weekly series of events (staff only).

    When the form supplies a recurrence start date, end date and weekday,
    one event is created per matching week inside a single transaction and
    the user is redirected to the dashboard. Otherwise the form is saved as
    one event and the user is redirected to its dashboard detail page.
    Invalid submissions re-render the form with errors.
    """
    # Local import: ValidationError is not among this module's top-level imports.
    from django.core.exceptions import ValidationError

    if not request.user.is_staff:
        raise PermissionDenied

    if request.method == 'POST':
        form = EventForm(request.POST)
        if form.is_valid():
            recurrence_start_date = form.cleaned_data.get('recurrence_start_date')
            recurrence_end_date = form.cleaned_data.get('recurrence_end_date')
            recurrence_weekday = form.cleaned_data.get('recurrence_weekday')
            # Test the weekday against "not provided" explicitly: if the form
            # yields an integer, weekday 0 (Monday) is falsy and a plain
            # truthiness check would silently skip the recurrence.
            wants_recurrence = bool(
                recurrence_start_date
                and recurrence_end_date
                and recurrence_weekday not in (None, '')
            )
            if not wants_recurrence:
                event = form.save()
                messages.success(request, 'Event saved and ready for the public calendar.')
                return redirect('event_dashboard_detail', slug=event.slug)

            base_event = form.save(commit=False)
            occurrence_datetimes = _build_weekly_recurrence_datetimes(
                start=base_event.start,
                end=base_event.end,
                recurrence_start_date=recurrence_start_date,
                recurrence_end_date=recurrence_end_date,
                recurrence_weekday=recurrence_weekday,
            )
            if not occurrence_datetimes:
                # Creating nothing must not be reported as a success.
                form.add_error(None, 'No dates in the selected recurrence range fall on the chosen weekday.')
            else:
                created_events = []
                try:
                    with transaction.atomic():
                        for occurrence_start, occurrence_end in occurrence_datetimes:
                            event = Event(
                                name=base_event.name,
                                location=base_event.location,
                                start=occurrence_start,
                                end=occurrence_end,
                                event_url=base_event.event_url,
                                summary=base_event.summary,
                                is_published=base_event.is_published,
                            )
                            event.full_clean()
                            event.save()
                            created_events.append(event)
                except ValidationError as exc:
                    # The transaction has been rolled back; show the problem
                    # on the form instead of failing with a server error.
                    form.add_error(None, exc.messages)
                else:
                    event_label = 'event' if len(created_events) == 1 else 'events'
                    messages.success(request, f'Created {len(created_events)} recurring {event_label} and added them to the calendar.')
                    return redirect('event_dashboard')
    else:
        form = EventForm()

    return render(
        request,
        'core/event_form.html',
        _base_context(
            page_title='Add event',
            form=form,
            form_mode='create',
            form_title='Add a new stop to the calendar',
            form_intro='Once saved, published events immediately power the landing page, public calendar, and embeddable widget.',
            submit_label='Save event',
        ),
    )
@login_required(login_url='login')
def event_edit(request, slug):
    """Edit the event identified by ``slug`` (staff only).

    A valid POST saves the event and redirects to its dashboard detail
    page; otherwise the edit form is rendered.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    event = get_object_or_404(Event, slug=slug)
    is_submission = request.method == 'POST'
    if is_submission:
        form = EventForm(request.POST, instance=event)
    else:
        form = EventForm(instance=event)

    if is_submission and form.is_valid():
        event = form.save()
        messages.success(request, 'Event updated successfully.')
        return redirect('event_dashboard_detail', slug=event.slug)

    context = _base_context(
        page_title=f'Edit {event.name}',
        form=form,
        event=event,
        form_mode='edit',
        form_title='Update this calendar stop',
        form_intro='Change dates, copy, publication status, or the event link without leaving the custom dashboard.',
        submit_label='Save changes',
    )
    return render(request, 'core/event_form.html', context)
@login_required(login_url='login')
def event_delete(request, slug):
    """Confirm and delete the event identified by ``slug`` (staff only).

    Any non-POST request shows the confirmation page; POST deletes the
    event and redirects to the dashboard.
    """
    if not request.user.is_staff:
        raise PermissionDenied

    event = get_object_or_404(Event, slug=slug)
    if request.method != 'POST':
        context = _base_context(
            page_title=f'Delete {event.name}',
            event=event,
        )
        return render(request, 'core/event_confirm_delete.html', context)

    # Capture the name before deleting so the message can still show it.
    event_name = event.name
    event.delete()
    messages.success(request, f'{event_name} was deleted.')
    return redirect('event_dashboard')
@login_required(login_url='login')
def event_dashboard_detail(request, slug):
    """Show the staff detail page for one event, published or not."""
    if not request.user.is_staff:
        raise PermissionDenied
    event = get_object_or_404(Event, slug=slug)
    context = _base_context(page_title=event.name, event=event)
    return render(request, 'core/event_dashboard_detail.html', context)
def home(request):
    """Render the landing page.

    The context contains the month calendar, up to six published events
    that have not yet ended (the first three also exposed as
    ``hero_events``), and a ready-to-copy iframe snippet for the embed.
    """
    month_context = _build_month_context(request.GET.get('month'))
    not_yet_ended = Event.objects.filter(is_published=True, end__gte=timezone.now())
    upcoming_events = list(not_yet_ended.order_by('start', 'name')[:6])
    embed_url = _embed_base_url(request)
    iframe_snippet = f'<iframe src="{embed_url}" title="Where to find us calendar" width="100%" height="760" style="border:0;border-radius:24px;overflow:hidden;"></iframe>'
    context = _base_context(
        page_title=PROJECT_NAME,
        hero_events=upcoming_events[:3],
        upcoming_events=upcoming_events,
        embed_url=embed_url,
        iframe_snippet=iframe_snippet,
        **month_context,
    )
    return render(request, 'core/index.html', context)
def calendar_page(request):
    """Render the public full-page calendar for the requested month."""
    requested_month = request.GET.get('month')
    context = _base_context(
        page_title='Public calendar',
        **_build_month_context(requested_month),
    )
    return render(request, 'core/calendar_page.html', context)
def calendar_embed(request):
    """Render the embeddable calendar for the requested month.

    The ``header`` query parameter controls whether the embed header is
    shown; its normalized value is passed to the template as well.
    """
    month_context = _build_month_context(request.GET.get('month'))
    show_header = _show_embed_header(request)
    header_value = _embed_header_value(request)
    context = _base_context(
        page_title='Embeddable calendar',
        embedded=True,
        show_embed_header=show_header,
        embed_header_value=header_value,
        **month_context,
    )
    return render(request, 'core/calendar_embed.html', context)
def event_list(request):
    """Render the public list of published events, ordered by start then name."""
    # NOTE(review): the page is titled "Upcoming events" but the query does
    # not exclude events that have already ended - confirm this is intended.
    published_events = Event.objects.filter(is_published=True).order_by('start', 'name')
    context = _base_context(
        page_title='Upcoming events',
        events=published_events,
    )
    return render(request, 'core/event_list.html', context)
def event_detail(request, slug):
    """Render the public page for one published event; 404 otherwise."""
    event = get_object_or_404(Event, slug=slug, is_published=True)
    context = _base_context(page_title=event.name, event=event)
    return render(request, 'core/event_detail.html', context)