37769-vm/core/admin.py
2026-01-28 13:06:12 +00:00

1630 lines
81 KiB
Python

from decimal import Decimal
from datetime import datetime, date
from django.db import transaction
from django.http import HttpResponse
from django.utils.safestring import mark_safe
import csv
import io
import logging
import tempfile
import os
from django.contrib import admin, messages
from django.urls import path, reverse
from django.shortcuts import render, redirect
from django.template.response import TemplateResponse
from .models import (
format_phone_number,
Tenant, TenantUserRole, InteractionType, DonationMethod, ElectionType, EventType, Voter,
VotingRecord, Event, EventParticipation, Donation, Interaction, VoterLikelihood, CampaignSettings,
Interest, Volunteer, VolunteerEvent, ParticipationStatus
)
from .forms import (
VoterImportForm, EventImportForm, EventParticipationImportForm,
DonationImportForm, InteractionImportForm, VoterLikelihoodImportForm,
VolunteerImportForm
)
# Module-level logger shared by the admin import views in this file.
logger = logging.getLogger(__name__)
# Each *_MAPPABLE_FIELDS list holds (field_key, display_label) pairs. The
# import views read one "map_<field_key>" POST parameter per pair to learn
# which CSV column feeds that field, and hand the list to the mapping
# template as "model_fields".

# Voter fields a CSV column can be mapped onto (consumed by VoterAdmin).
VOTER_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('first_name', 'First Name'),
    ('last_name', 'Last Name'),
    ('nickname', 'Nickname'),
    ('birthdate', 'Birthdate'),
    ('address_street', 'Street Address'),
    ('city', 'City'),
    ('state', 'State'),
    ('prior_state', 'Prior State'),
    ('zip_code', 'Zip Code'),
    ('county', 'County'),
    ('phone', 'Phone'),
    ('phone_type', 'Phone Type'),
    ('email', 'Email'),
    ('district', 'District'),
    ('precinct', 'Precinct'),
    ('registration_date', 'Registration Date'),
    ('is_targeted', 'Is Targeted'),
    ('candidate_support', 'Candidate Support'),
    ('yard_sign', 'Yard Sign'),
    ('window_sticker', 'Window Sticker'),
    ('latitude', 'Latitude'),
    ('longitude', 'Longitude'),
]
# Event fields a CSV column can be mapped onto (consumed by EventAdmin).
EVENT_MAPPABLE_FIELDS = [
    ('name', 'Name'),
    ('date', 'Date'),
    ('start_time', 'Start Time'),
    ('end_time', 'End Time'),
    ('event_type', 'Event Type (Name)'),
    ('description', 'Description'),
]
# Columns used to link a voter to an event (consumed by
# EventParticipationAdmin). The event is identified either by 'event_id' or by
# the 'event_date' + 'event_type' pair.
EVENT_PARTICIPATION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('event_id', 'Event ID'),
    ('event_date', 'Event Date'),
    ('event_type', 'Event Type (Name)'),
    ('participation_status', 'Participation Type'),
]
# Donation fields a CSV column can be mapped onto (consumed by DonationAdmin).
DONATION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('date', 'Date'),
    ('amount', 'Amount'),
    ('method', 'Donation Method (Name)'),
]
# Interaction fields a CSV column can be mapped onto.
# NOTE(review): no consumer is visible in this part of the file; presumably an
# interaction import view further down uses it — verify.
INTERACTION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('date', 'Date'),
    ('type', 'Interaction Type (Name)'),
    ('description', 'Description'),
    ('notes', 'Notes'),
]
# Volunteer fields a CSV column can be mapped onto (consumed by VolunteerAdmin).
VOLUNTEER_MAPPABLE_FIELDS = [
    ('first_name', 'First Name'),
    ('last_name', 'Last Name'),
    ('email', 'Email'),
    ('phone', 'Phone'),
]
# Voter-likelihood fields a CSV column can be mapped onto.
# NOTE(review): no consumer is visible in this part of the file; presumably a
# voter-likelihood import view further down uses it — verify.
VOTER_LIKELIHOOD_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('election_type', 'Election Type (Name)'),
    ('likelihood', 'Likelihood'),
]
class BaseImportAdminMixin:
    """Mixin that lets an import-capable admin serve its failed rows as CSV.

    Import views store the rows they could not import in the session under
    ``"<model_name>_import_errors"``; ``download_errors`` reads that entry back.
    """

    def download_errors(self, request):
        """Return the failed rows of the last import as a CSV attachment.

        If the session holds no failed rows, a warning is queued for the user
        and the response is a redirect to the parent URL instead.
        """
        model_name = self.model._meta.model_name
        logger.info("download_errors called for %s", model_name)

        rows = request.session.get(f"{model_name}_import_errors", [])
        if not rows:
            self.message_user(request, "No error log found in session.", level=messages.WARNING)
            return redirect("..")

        response = HttpResponse(content_type="text/csv")
        response["Content-Disposition"] = f"attachment; filename={model_name}_import_errors.csv"

        # Rows may not all carry the same keys, so the header is the sorted
        # union of every key seen.
        columns = sorted(set().union(*(row.keys() for row in rows)))
        writer = csv.DictWriter(response, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)
        return response
class TenantUserRoleInline(admin.TabularInline):
    """Tabular inline for TenantUserRole rows; attached to TenantAdmin."""
    model = TenantUserRole
    extra = 1
class CampaignSettingsInline(admin.StackedInline):
    """Stacked inline for CampaignSettings; attached to TenantAdmin."""
    model = CampaignSettings
    # Deleting the settings row from the inline is disabled.
    can_delete = False
@admin.register(Tenant)
class TenantAdmin(admin.ModelAdmin):
    """Admin for tenants, with user roles and campaign settings edited inline."""
    list_display = ('name', 'created_at')
    search_fields = ('name',)
    inlines = [TenantUserRoleInline, CampaignSettingsInline]
@admin.register(TenantUserRole)
class TenantUserRoleAdmin(admin.ModelAdmin):
    """Admin for user-to-tenant role assignments."""
    list_display = ('user', 'tenant', 'role')
    list_filter = ('tenant', 'role')
    # Searches follow the relations to the user's username and the tenant's name.
    search_fields = ('user__username', 'tenant__name')
@admin.register(InteractionType)
class InteractionTypeAdmin(admin.ModelAdmin):
    """Admin for per-tenant interaction types."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(DonationMethod)
class DonationMethodAdmin(admin.ModelAdmin):
    """Admin for per-tenant donation methods."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(ElectionType)
class ElectionTypeAdmin(admin.ModelAdmin):
    """Admin for per-tenant election types."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(EventType)
class EventTypeAdmin(admin.ModelAdmin):
    """Admin for per-tenant event types."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(ParticipationStatus)
class ParticipationStatusAdmin(admin.ModelAdmin):
    """Admin for per-tenant participation statuses.

    Uses a custom change-list template that is given the list of tenants.
    """

    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
    change_list_template = 'admin/participationstatus_change_list.html'

    def changelist_view(self, request, extra_context=None):
        """Render the change list with all tenants exposed as ``tenants``."""
        extra_context = extra_context or {}
        # Tenant is already imported at module level; no local import needed.
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)
@admin.register(Interest)
class InterestAdmin(admin.ModelAdmin):
    """Admin for per-tenant interests."""
    list_display = ('name', 'tenant')
    list_filter = ('tenant',)
    search_fields = ('name',)
class VotingRecordInline(admin.TabularInline):
    """Tabular inline for VotingRecord rows; attached to VoterAdmin."""
    model = VotingRecord
    extra = 1
class DonationInline(admin.TabularInline):
    """Tabular inline for Donation rows; attached to VoterAdmin."""
    model = Donation
    extra = 1
class InteractionInline(admin.TabularInline):
    """Tabular inline for Interaction rows; attached to VoterAdmin and VolunteerAdmin."""
    model = Interaction
    extra = 1
class VoterLikelihoodInline(admin.TabularInline):
    """Tabular inline for VoterLikelihood rows; attached to VoterAdmin."""
    model = VoterLikelihood
    extra = 1
class VolunteerEventInline(admin.TabularInline):
    """Tabular inline for VolunteerEvent rows; attached to VolunteerAdmin."""
    model = VolunteerEvent
    extra = 1
@admin.register(Voter)
class VoterAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for voters, including a multi-step CSV import.

    ``import_voters`` drives the import: upload a CSV, map its columns onto
    ``VOTER_MAPPABLE_FIELDS``, preview the first rows, then create/update
    voters in batches.
    """

    list_display = ('first_name', 'last_name', 'nickname', 'voter_id', 'tenant', 'district', 'candidate_support', 'is_targeted', 'city', 'state', 'prior_state')
    list_filter = ('tenant', 'candidate_support', 'is_targeted', 'phone_type', 'yard_sign', 'district', 'city', 'state', 'prior_state')
    search_fields = ('first_name', 'last_name', 'nickname', 'voter_id', 'address', 'city', 'state', 'prior_state', 'zip_code', 'county')
    inlines = [VotingRecordInline, DonationInline, InteractionInline, VoterLikelihoodInline]
    readonly_fields = ('address',)
    change_list_template = "admin/voter_change_list.html"

    def changelist_view(self, request, extra_context=None):
        """Render the change list with all tenants exposed as ``tenants``."""
        extra_context = extra_context or {}
        extra_context["tenants"] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voter-download-errors'),
            path('import-voters/', self.admin_site.admin_view(self.import_voters), name='import-voters'),
        ]
        return my_urls + urls

    def import_voters(self, request):
        """Entry point of the voter CSV import.

        POST with ``_preview`` renders the preview, POST with ``_import`` runs
        the import, any other POST is treated as the file upload. A GET, or an
        upload whose form does not validate, renders the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_voter_import(request)
            if "_import" in request.POST:
                return self._run_voter_import(request)
            form = VoterImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._start_voter_mapping(request, form)
        else:
            form = VoterImportForm()
        # GET or invalid upload: show the upload form, as the other import
        # views in this file do, instead of returning None.
        context = self.admin_site.each_context(request)
        context["form"] = form
        context["title"] = "Import Voters"
        context["opts"] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _cell(row, column):
        """Return ``row[column]``, or None when the field is unmapped or absent."""
        return row.get(column) if column else None

    @staticmethod
    def _chunked(rows, size):
        """Yield successive lists of at most ``size`` items from ``rows``."""
        chunk = []
        for row in rows:
            chunk.append(row)
            if len(chunk) == size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

    @staticmethod
    def _coerce_voter_value(field_name, val, choice_defaults, phone_type_choices, phone_type_reverse):
        """Convert one raw CSV value to the value stored on the voter.

        ``choice_defaults`` maps a choice field name to ``(valid_values,
        fallback)``. Raises ValueError when a date column cannot be parsed, so
        the caller can record the row as failed.
        """
        if field_name == "is_targeted":
            return str(val).lower() in ["true", "1", "yes"]
        if field_name in ("birthdate", "registration_date"):
            if not isinstance(val, str):
                return val
            text = val.strip()
            try:
                return datetime.strptime(text, "%Y-%m-%d").date()
            except ValueError:
                pass
            try:
                # Accept other ISO spellings before rejecting the row.
                return date.fromisoformat(text)
            except ValueError as exc:
                raise ValueError(f"Invalid {field_name} '{val}': expected YYYY-MM-DD") from exc
        if field_name in choice_defaults:
            valid_values, fallback = choice_defaults[field_name]
            return val if val in valid_values else fallback
        if field_name == "phone_type":
            lowered = str(val).lower()
            if lowered in phone_type_choices:
                return lowered
            # Also accept the display label; otherwise fall back to "cell".
            return phone_type_reverse.get(lowered, "cell")
        return val

    def _start_voter_mapping(self, request, form):
        """Store the uploaded CSV in a temp file and render the column-mapping page."""
        csv_file = request.FILES["file"]
        tenant = form.cleaned_data["tenant"]
        if not csv_file.name.endswith(".csv"):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, "r", encoding="UTF-8") as f:
                headers = next(csv.reader(f), None)
        except (OSError, ValueError, csv.Error):
            # UnicodeDecodeError is a ValueError: the file is not valid UTF-8.
            headers = None
        if not headers:
            # Nothing to map; do not leave the temp file behind.
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, "Could not read a header row from the uploaded file. Please upload a non-empty UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            "title": "Map Voter Fields",
            "headers": headers,
            "model_fields": VOTER_MAPPABLE_FIELDS,
            "tenant_id": tenant.id,
            "file_path": file_path,
            "action_url": request.path,
            "opts": self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_voter_import(self, request):
        """Render a preview of the first 10 rows with a create/update guess per row."""
        # NOTE(security): file_path is supplied by the client and opened as-is;
        # consider validating that it points at a temp file this view created.
        file_path = request.POST.get("file_path")
        tenant_id = request.POST.get("tenant")
        mapping = {name: request.POST.get(f"map_{name}") for name, _ in VOTER_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            with open(file_path, "r", encoding="UTF-8") as f:
                # Fast count: physical lines minus the header line.
                total_count = max(sum(1 for _ in f) - 1, 0)
                f.seek(0)
                preview_rows = []
                for index, row in enumerate(csv.DictReader(f)):
                    if index >= 10:
                        break
                    preview_rows.append(row)
            id_col = mapping.get("voter_id")
            preview_ids = [self._cell(row, id_col) for row in preview_rows]
            existing_preview_ids = set(
                Voter.objects.filter(tenant=tenant, voter_id__in=[v for v in preview_ids if v])
                .values_list("voter_id", flat=True)
            )
            preview_data = []
            for row, v_id in zip(preview_rows, preview_ids):
                first = self._cell(row, mapping.get("first_name")) or ""
                last = self._cell(row, mapping.get("last_name")) or ""
                preview_data.append({
                    "action": "update" if v_id in existing_preview_ids else "create",
                    "identifier": v_id,
                    "details": f"{first} {last}".strip(),
                })
            context = self.admin_site.each_context(request)
            context.update({
                "title": "Import Preview",
                "total_count": total_count,
                # Exact create/update totals are not computed for voters.
                "create_count": "N/A",
                "update_count": "N/A",
                "preview_data": preview_data,
                "mapping": mapping,
                "file_path": file_path,
                "tenant_id": tenant_id,
                "action_url": request.path,
                "opts": self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _run_voter_import(self, request):
        """Create/update voters from the CSV in batches of 500 rows.

        Rows that raise are collected with an "Import Error" column and stored
        in the session for ``download_errors``.
        """
        # NOTE(security): file_path is supplied by the client and is both
        # opened and deleted below; consider validating it.
        file_path = request.POST.get("file_path")
        tenant_id = request.POST.get("tenant")
        mapping = {}
        for key, _label in VOTER_MAPPABLE_FIELDS:
            column = request.POST.get(f"map_{key}")
            if column:
                mapping[key] = column
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            v_id_col = mapping.get("voter_id")
            if not v_id_col:
                raise ValueError("Voter ID mapping is missing")
            count = 0
            errors = 0
            failed_rows = []
            batch_size = 500
            # Lookups built once, outside the row loop.
            choice_defaults = {
                "candidate_support": (dict(Voter.SUPPORT_CHOICES), "unknown"),
                "yard_sign": (dict(Voter.YARD_SIGN_CHOICES), "none"),
                "window_sticker": (dict(Voter.WINDOW_STICKER_CHOICES), "none"),
            }
            phone_type_choices = dict(Voter.PHONE_TYPE_CHOICES)
            phone_type_reverse = {v.lower(): k for k, v in phone_type_choices.items()}
            valid_fields = {f.name for f in Voter._meta.get_fields()}
            mapped_fields = {f for f in mapping if f in valid_fields}
            # The address is rebuilt from these parts for every row, so load
            # them up front instead of one deferred-field query per voter.
            address_parts = {"address_street", "city", "state", "zip_code"} & valid_fields
            fetch_fields = list(mapped_fields | address_parts | {"voter_id", "address", "phone", "longitude"})
            update_fields = list((mapped_fields | {"address", "phone"}) - {"voter_id"})
            with open(file_path, "r", encoding="UTF-8") as f:
                reader = csv.DictReader(f)
                for chunk_index, chunk in enumerate(self._chunked(reader, batch_size)):
                    with transaction.atomic():
                        voter_ids = [row.get(v_id_col) for row in chunk if row.get(v_id_col)]
                        existing_voters = {
                            v.voter_id: v
                            for v in Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids).only(*fetch_fields)
                        }
                        to_create = []
                        to_update = []
                        processed_in_batch = set()
                        for row in chunk:
                            try:
                                voter_id = row.get(v_id_col)
                                # Skip blank ids and repeats within the batch.
                                if not voter_id or voter_id in processed_in_batch:
                                    continue
                                processed_in_batch.add(voter_id)
                                voter = existing_voters.get(voter_id)
                                created = False
                                if not voter:
                                    voter = Voter(tenant=tenant, voter_id=voter_id)
                                    created = True
                                changed = created
                                for field_name, csv_col in mapping.items():
                                    if field_name == "voter_id":
                                        continue
                                    val = row.get(csv_col)
                                    # Blank cells never overwrite stored values.
                                    if val is None or str(val).strip() == "":
                                        continue
                                    val = self._coerce_voter_value(
                                        field_name, val, choice_defaults,
                                        phone_type_choices, phone_type_reverse,
                                    )
                                    if getattr(voter, field_name) != val:
                                        setattr(voter, field_name, val)
                                        changed = True
                                old_phone = voter.phone
                                voter.phone = format_phone_number(voter.phone)
                                if voter.phone != old_phone:
                                    changed = True
                                if voter.longitude:
                                    try:
                                        # Keep at most 12 characters of the longitude.
                                        new_lon = Decimal(str(voter.longitude)[:12])
                                        if voter.longitude != new_lon:
                                            voter.longitude = new_lon
                                            changed = True
                                    except (ArithmeticError, ValueError, TypeError):
                                        # Best effort: an unparseable longitude
                                        # is left as it is.
                                        pass
                                old_address = voter.address
                                parts = [voter.address_street, voter.city, voter.state, voter.zip_code]
                                voter.address = ", ".join([p for p in parts if p])
                                if voter.address != old_address:
                                    changed = True
                                if not changed:
                                    continue
                                if created:
                                    to_create.append(voter)
                                else:
                                    to_update.append(voter)
                                count += 1
                            except Exception as e:
                                logger.error("Error importing row: %s", e)
                                row["Import Error"] = str(e)
                                failed_rows.append(row)
                                errors += 1
                        if to_create:
                            Voter.objects.bulk_create(to_create)
                        if to_update:
                            Voter.objects.bulk_update(to_update, update_fields, batch_size=250)
                    logger.info("Voter import progress: Processed batch %s. Total successes: %s", chunk_index + 1, count)
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} voters.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:voter-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
            return redirect("..")
        except Exception as e:
            logger.exception("Voter import failed")
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("..")
class EventAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for events, including a multi-step CSV import.

    NOTE(review): unlike the sibling admins, no ``@admin.register(Event)`` is
    visible on this class — confirm that Event is registered elsewhere,
    otherwise the ``admin:event-download-errors`` URL used below cannot be
    reversed.
    """

    list_display = ('id', 'name', 'event_type', 'date', 'start_time', 'end_time', 'tenant')
    list_filter = ('tenant', 'date', 'event_type')
    search_fields = ('name', 'description')
    change_list_template = "admin/event_change_list.html"

    def changelist_view(self, request, extra_context=None):
        """Render the change list with all tenants exposed as ``tenants``."""
        extra_context = extra_context or {}
        extra_context["tenants"] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='event-download-errors'),
            path('import-events/', self.admin_site.admin_view(self.import_events), name='import-events'),
        ]
        return my_urls + urls

    def import_events(self, request):
        """Entry point of the event CSV import.

        POST with ``_preview`` renders the preview, POST with ``_import`` runs
        the import, any other POST is treated as the file upload. A GET, or an
        upload whose form does not validate, renders the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_event_import(request)
            if "_import" in request.POST:
                return self._run_event_import(request)
            form = EventImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._start_event_mapping(request, form)
        else:
            form = EventImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Events"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _cell(row, column):
        """Return ``row[column]``, or None when the field is unmapped or absent."""
        return row.get(column) if column else None

    def _start_event_mapping(self, request, form):
        """Store the uploaded CSV in a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, 'r', encoding='UTF-8') as f:
                headers = next(csv.reader(f), None)
        except (OSError, ValueError, csv.Error):
            # UnicodeDecodeError is a ValueError: the file is not valid UTF-8.
            headers = None
        if not headers:
            # Nothing to map; do not leave the temp file behind.
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, "Could not read a header row from the uploaded file. Please upload a non-empty UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Event Fields",
            'headers': headers,
            'model_fields': EVENT_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_event_import(self, request):
        """Count creates/updates over the whole file and preview the first 10 rows."""
        # NOTE(security): file_path is supplied by the client and opened as-is;
        # consider validating that it points at a temp file this view created.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        mapping = {name: request.POST.get(f'map_{name}') for name, _ in EVENT_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='UTF-8') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    event_date = self._cell(row, mapping.get('date'))
                    event_type_name = self._cell(row, mapping.get('event_type'))
                    event_name = self._cell(row, mapping.get('name'))
                    exists = False
                    if event_date and event_type_name:
                        # Same lookup key as the import step, which matches a
                        # blank name as name=''.
                        exists = Event.objects.filter(
                            tenant=tenant,
                            date=event_date,
                            event_type__name=event_type_name,
                            name=event_name or '',
                        ).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"{event_name or 'No Name'} ({event_date} - {event_type_name})",
                            'details': self._cell(row, mapping.get('description')) or '',
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _run_event_import(self, request):
        """Create or update one event per CSV row.

        Rows that fail are collected with an "Import Error" column and stored
        in the session for ``download_errors``.
        """
        # NOTE(security): file_path is supplied by the client and is both
        # opened and deleted below; consider validating it.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        mapping = {name: request.POST.get(f'map_{name}') for name, _ in EVENT_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='UTF-8') as f:
                for row in csv.DictReader(f):
                    try:
                        event_date = self._cell(row, mapping.get('date'))
                        event_type_name = self._cell(row, mapping.get('event_type'))
                        name = self._cell(row, mapping.get('name'))
                        if not event_date or not event_type_name:
                            row["Import Error"] = "Missing date or event type"
                            failed_rows.append(row)
                            errors += 1
                            continue
                        event_type, _ = EventType.objects.get_or_create(
                            tenant=tenant,
                            name=event_type_name,
                        )
                        # Only non-blank cells overwrite stored values.
                        defaults = {}
                        for key in ('description', 'name', 'start_time', 'end_time'):
                            value = self._cell(row, mapping.get(key))
                            if value and value.strip():
                                defaults[key] = value
                        Event.objects.update_or_create(
                            tenant=tenant,
                            date=event_date,
                            event_type=event_type,
                            name=name or '',
                            defaults=defaults,
                        )
                        count += 1
                    except Exception as e:
                        logger.error("Error importing: %s", e)
                        row["Import Error"] = str(e)
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} events.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            logger.info("Stored %s failed rows in session for %s", len(failed_rows), self.model._meta.model_name)
            if errors > 0:
                error_url = reverse("admin:event-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
            return redirect("..")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("..")
@admin.register(Volunteer)
class VolunteerAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for volunteers, including a multi-step CSV import keyed on email."""

    list_display = ('first_name', 'last_name', 'email', 'phone', 'tenant', 'user')
    list_filter = ('tenant',)
    search_fields = ('first_name', 'last_name', 'email', 'phone')
    inlines = [VolunteerEventInline, InteractionInline]
    filter_horizontal = ('interests',)
    change_list_template = "admin/volunteer_change_list.html"

    def changelist_view(self, request, extra_context=None):
        """Render the change list with all tenants exposed as ``tenants``."""
        extra_context = extra_context or {}
        extra_context["tenants"] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='volunteer-download-errors'),
            path('import-volunteers/', self.admin_site.admin_view(self.import_volunteers), name='import-volunteers'),
        ]
        return my_urls + urls

    def import_volunteers(self, request):
        """Entry point of the volunteer CSV import.

        POST with ``_preview`` renders the preview, POST with ``_import`` runs
        the import, any other POST is treated as the file upload. A GET, or an
        upload whose form does not validate, renders the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_volunteer_import(request)
            if "_import" in request.POST:
                return self._run_volunteer_import(request)
            form = VolunteerImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._start_volunteer_mapping(request, form)
        else:
            form = VolunteerImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Volunteers"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _cell(row, column):
        """Return ``row[column]``, or None when the field is unmapped or absent."""
        return row.get(column) if column else None

    def _start_volunteer_mapping(self, request, form):
        """Store the uploaded CSV in a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, 'r', encoding='UTF-8') as f:
                headers = next(csv.reader(f), None)
        except (OSError, ValueError, csv.Error):
            # UnicodeDecodeError is a ValueError: the file is not valid UTF-8.
            headers = None
        if not headers:
            # Nothing to map; do not leave the temp file behind.
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, "Could not read a header row from the uploaded file. Please upload a non-empty UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Volunteer Fields",
            'headers': headers,
            'model_fields': VOLUNTEER_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_volunteer_import(self, request):
        """Count creates/updates over the whole file and preview the first 10 rows."""
        # NOTE(security): file_path is supplied by the client and opened as-is;
        # consider validating that it points at a temp file this view created.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        mapping = {name: request.POST.get(f'map_{name}') for name, _ in VOLUNTEER_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='UTF-8') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    email = self._cell(row, mapping.get('email'))
                    # A row without an email cannot match an existing volunteer.
                    exists = bool(email) and Volunteer.objects.filter(tenant=tenant, email=email).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        first = self._cell(row, mapping.get('first_name')) or ''
                        last = self._cell(row, mapping.get('last_name')) or ''
                        preview_data.append({
                            'action': action,
                            'identifier': email,
                            'details': f"{first} {last}".strip(),
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _run_volunteer_import(self, request):
        """Create or update one volunteer per CSV row, matched on tenant + email.

        Rows that fail are collected with an "Import Error" column and stored
        in the session for ``download_errors``.
        """
        # NOTE(security): file_path is supplied by the client and is both
        # opened and deleted below; consider validating it.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        mapping = {name: request.POST.get(f'map_{name}') for name, _ in VOLUNTEER_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='UTF-8') as f:
                for row in csv.DictReader(f):
                    try:
                        email = self._cell(row, mapping.get('email'))
                        if not email:
                            row["Import Error"] = "Missing email"
                            failed_rows.append(row)
                            errors += 1
                            continue
                        volunteer_data = {}
                        for field_name, csv_col in mapping.items():
                            # Email is the lookup key, not an updated field.
                            if not csv_col or field_name == 'email':
                                continue
                            val = row.get(csv_col)
                            # Blank cells never overwrite stored values.
                            if val is not None and str(val).strip() != '':
                                volunteer_data[field_name] = val
                        Volunteer.objects.update_or_create(
                            tenant=tenant,
                            email=email,
                            defaults=volunteer_data,
                        )
                        count += 1
                    except Exception as e:
                        logger.error("Error importing volunteer: %s", e)
                        row["Import Error"] = str(e)
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} volunteers.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:volunteer-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
            return redirect("..")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("..")
@admin.register(VolunteerEvent)
class VolunteerEventAdmin(admin.ModelAdmin):
    """Admin for volunteer-to-event assignments."""
    list_display = ('volunteer', 'event', 'role')
    # The tenant filter goes through the related event.
    list_filter = ('event__tenant', 'event', 'role')
@admin.register(EventParticipation)
class EventParticipationAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for voter event participations, including a multi-step CSV import."""

    list_display = ('voter', 'event', 'participation_status')
    list_filter = ('event__tenant', 'event', 'participation_status')
    change_list_template = "admin/eventparticipation_change_list.html"

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='eventparticipation-download-errors'),
            path('import-event-participations/', self.admin_site.admin_view(self.import_event_participations), name='import-event-participations'),
        ]
        return my_urls + urls

    def import_event_participations(self, request):
        """Entry point of the participation CSV import.

        POST with ``_preview`` renders the preview, POST with ``_import`` runs
        the import, any other POST is treated as the file upload. A GET, or an
        upload whose form does not validate, renders the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_participation_import(request)
            if "_import" in request.POST:
                return self._run_participation_import(request)
            form = EventParticipationImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._start_participation_mapping(request, form)
        else:
            form = EventParticipationImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Participations"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _cell(row, column):
        """Return ``row[column]``, or None when the field is unmapped or absent."""
        return row.get(column) if column else None

    def _start_participation_mapping(self, request, form):
        """Store the uploaded CSV in a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, 'r', encoding='UTF-8') as f:
                headers = next(csv.reader(f), None)
        except (OSError, ValueError, csv.Error):
            # UnicodeDecodeError is a ValueError: the file is not valid UTF-8.
            headers = None
        if not headers:
            # Nothing to map; do not leave the temp file behind.
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, "Could not read a header row from the uploaded file. Please upload a non-empty UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Participation Fields",
            'headers': headers,
            'model_fields': EVENT_PARTICIPATION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_participation_import(self, request):
        """Count creates/updates over the whole file and preview the first 10 rows."""
        # NOTE(security): file_path is supplied by the client and opened as-is;
        # consider validating that it points at a temp file this view created.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        mapping = {name: request.POST.get(f'map_{name}') for name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='UTF-8') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = self._cell(row, mapping.get('voter_id'))
                    event_id = self._cell(row, mapping.get('event_id'))
                    event_date = self._cell(row, mapping.get('event_date'))
                    event_type_name = self._cell(row, mapping.get('event_type'))
                    exists = False
                    if voter_id:
                        try:
                            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
                            # Prefer the explicit event id; otherwise match on
                            # event date + event type name.
                            if event_id:
                                exists = EventParticipation.objects.filter(voter=voter, event_id=event_id).exists()
                            elif event_date and event_type_name:
                                exists = EventParticipation.objects.filter(voter=voter, event__date=event_date, event__event_type__name=event_type_name).exists()
                        except Voter.DoesNotExist:
                            # Unknown voter: shown as "create" here; the
                            # import step reports the row as failed.
                            pass
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        status = self._cell(row, mapping.get('participation_status')) or ''
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter: {voter_id}",
                            'details': f"Participation: {status}",
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _run_participation_import(self, request):
        """Create or update one participation per CSV row.

        Each row must resolve to an existing voter and an existing event of the
        tenant. Rows that fail are collected with an "Import Error" column and
        stored in the session for ``download_errors``.
        """
        # NOTE(security): file_path is supplied by the client and is both
        # opened and deleted below; consider validating it.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        mapping = {name: request.POST.get(f'map_{name}') for name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS}
        try:
            tenant = Tenant.objects.get(id=tenant_id)
            count = 0
            errors = 0
            failed_rows = []
            # Built on first use, then reused for every later row.
            # NOTE(review): a ParticipationStatus model is also registered in
            # this file — confirm EventParticipation.participation_status still
            # takes these choice strings.
            valid_statuses = None
            with open(file_path, 'r', encoding='UTF-8') as f:
                for row in csv.DictReader(f):
                    try:
                        voter_id = self._cell(row, mapping.get('voter_id'))
                        participation_status_val = self._cell(row, mapping.get('participation_status'))
                        if not voter_id:
                            row["Import Error"] = "Missing voter ID"
                            failed_rows.append(row)
                            errors += 1
                            continue
                        try:
                            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
                        except Voter.DoesNotExist:
                            error_msg = f"Voter with ID {voter_id} not found"
                            logger.error(error_msg)
                            row["Import Error"] = error_msg
                            failed_rows.append(row)
                            errors += 1
                            continue
                        event = None
                        event_id = self._cell(row, mapping.get('event_id'))
                        if event_id:
                            try:
                                event = Event.objects.get(id=event_id, tenant=tenant)
                            except Event.DoesNotExist:
                                # Fall back to the date + type lookup below.
                                pass
                        if not event:
                            event_date = self._cell(row, mapping.get('event_date'))
                            event_type_name = self._cell(row, mapping.get('event_type'))
                            if event_date and event_type_name:
                                try:
                                    event_type = EventType.objects.get(tenant=tenant, name=event_type_name)
                                    event = Event.objects.get(tenant=tenant, date=event_date, event_type=event_type)
                                except (EventType.DoesNotExist, Event.DoesNotExist):
                                    pass
                        if not event:
                            error_msg = "Event not found (check ID, date, or type)"
                            logger.error(error_msg)
                            row["Import Error"] = error_msg
                            failed_rows.append(row)
                            errors += 1
                            continue
                        defaults = {}
                        if participation_status_val and participation_status_val.strip():
                            if valid_statuses is None:
                                valid_statuses = dict(EventParticipation.PARTICIPATION_TYPE_CHOICES)
                            # Unknown values fall back to 'invited'.
                            if participation_status_val in valid_statuses:
                                defaults['participation_status'] = participation_status_val
                            else:
                                defaults['participation_status'] = 'invited'
                        EventParticipation.objects.update_or_create(
                            event=event,
                            voter=voter,
                            defaults=defaults,
                        )
                        count += 1
                    except Exception as e:
                        logger.error("Error importing: %s", e)
                        row["Import Error"] = str(e)
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} participations.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            logger.info("Stored %s failed rows in session for %s", len(failed_rows), self.model._meta.model_name)
            if errors > 0:
                error_url = reverse("admin:eventparticipation-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
            return redirect("..")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("..")
@admin.register(Donation)
class DonationAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for ``Donation`` with a staged CSV import.

    ``import_donations`` serves four requests on one URL: GET renders the
    upload form, a plain POST stages the uploaded CSV in a temp file and
    renders the column-mapping page, a POST containing ``_preview`` renders a
    summary of what the import would do, and a POST containing ``_import``
    performs it.
    """

    list_display = ('id', 'voter', 'date', 'amount', 'method')
    list_filter = ('voter__tenant', 'date', 'method')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id')
    change_list_template = "admin/donation_change_list.html"

    def get_urls(self):
        """Return the default admin URLs preceded by the import routes.

        ``self.download_errors`` is not defined in this class; it is expected
        to be inherited.
        """
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='donation-download-errors'),
            path('import-donations/', self.admin_site.admin_view(self.import_donations), name='import-donations'),
        ]
        return my_urls + urls

    @staticmethod
    def _read_mapping(request):
        """Return ``{model field: CSV column name}`` from the ``map_*`` POST fields."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in DONATION_MAPPABLE_FIELDS
        }

    @staticmethod
    def _mapped_value(row, mapping, field_name):
        """Return the row's value for ``field_name``, or None if no column is mapped.

        Guarding on the column name matters: ``csv.DictReader`` stores surplus
        cells under the key ``None``, so ``row.get(None)`` can return a list.
        """
        column = mapping.get(field_name)
        return row.get(column) if column else None

    @staticmethod
    def _staged_csv_path(file_path):
        """Validate a client-supplied staged-file path and return its real path.

        The path round-trips through a hidden form field, so it is untrusted.
        It is accepted only when it resolves to a ``.csv`` file directly inside
        the system temp directory, which is where the upload step creates it.

        Raises:
            ValueError: if the path is missing or points anywhere else.
        """
        if not file_path:
            raise ValueError("Missing import file path")
        resolved = os.path.realpath(file_path)
        temp_dir = os.path.realpath(tempfile.gettempdir())
        if os.path.dirname(resolved) != temp_dir or not resolved.endswith('.csv'):
            raise ValueError("Invalid import file path")
        return resolved

    @staticmethod
    def _discard_staged_csv(file_path):
        """Delete a staged upload; a missing file or falsy path is not an error."""
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone (e.g. a repeated submit); nothing left to clean up.
            pass
        except OSError:
            logger.warning("Could not remove staged import file %s", file_path, exc_info=True)

    def import_donations(self, request):
        """Dispatch one step of the donation CSV import (see class docstring)."""
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_donations(request)
            if "_import" in request.POST:
                return self._run_donation_import(request)
            form = DonationImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._stage_donation_upload(request, form)
            # Invalid form: fall through and re-render it with its errors.
        else:
            form = DonationImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Donations"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _stage_donation_upload(self, request, form):
        """Copy the uploaded CSV to a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.lower().endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file must outlive this request so the preview and
        # import steps can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                # Default of None: an empty file must not raise StopIteration.
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error):
            logger.warning("Could not read CSV header from upload %s", csv_file.name, exc_info=True)
        if not headers:
            self._discard_staged_csv(file_path)
            self.message_user(request, "The uploaded file is empty or is not a valid UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Donation Fields",
            'headers': headers,
            'model_fields': DONATION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_donations(self, request):
        """Render counts and the first 10 rows of what the import would do.

        A row counts as an update when a donation with the same tenant, voter
        ID, date and amount already exists; otherwise it counts as a create.
        The staged file is left in place for the import step.
        """
        mapping = self._read_mapping(request)
        tenant_id = request.POST.get('tenant')
        try:
            file_path = self._staged_csv_path(request.POST.get('file_path'))
            tenant = Tenant.objects.get(id=tenant_id)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = self._mapped_value(row, mapping, 'voter_id')
                    donation_date = self._mapped_value(row, mapping, 'date')
                    amount = self._mapped_value(row, mapping, 'amount')
                    exists = False
                    if voter_id and donation_date and amount:
                        exists = Donation.objects.filter(
                            voter__tenant=tenant,
                            voter__voter_id=voter_id,
                            date=donation_date,
                            amount=amount,
                        ).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter: {voter_id}",
                            'details': f"Date: {donation_date}, Amount: {amount}"
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _import_donation_row(self, tenant, row, mapping):
        """Create or update one donation from ``row``.

        Returns:
            None on success, or a message explaining why the row was skipped.
            Unexpected errors propagate to the caller.
        """
        voter_id = self._mapped_value(row, mapping, 'voter_id')
        if not voter_id:
            return "Missing voter ID"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        donation_date = self._mapped_value(row, mapping, 'date')
        amount = self._mapped_value(row, mapping, 'amount')
        method_name = self._mapped_value(row, mapping, 'method')
        if not donation_date or not amount:
            return "Missing date or amount"
        defaults = {}
        if method_name and method_name.strip():
            method, _ = DonationMethod.objects.get_or_create(
                tenant=tenant,
                name=method_name
            )
            defaults['method'] = method
        Donation.objects.update_or_create(
            voter=voter,
            date=donation_date,
            amount=amount,
            defaults=defaults
        )
        return None

    def _run_donation_import(self, request):
        """Import every row of the staged CSV, then redirect to the changelist.

        Rows that fail are annotated with an ``"Import Error"`` key and stored
        in the session under ``<model_name>_import_errors``. The staged file is
        always removed afterwards, whether or not the import succeeded.
        """
        mapping = self._read_mapping(request)
        # Stays None until validated, so an untrusted path is never deleted.
        file_path = None
        try:
            file_path = self._staged_csv_path(request.POST.get('file_path'))
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            failed_rows = []
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        problem = self._import_donation_row(tenant, row, mapping)
                    except Exception as e:
                        # One bad row must not abort the remaining rows.
                        logger.exception("Error importing: %s", e)
                        problem = str(e)
                    if problem is None:
                        count += 1
                    else:
                        row["Import Error"] = problem
                        failed_rows.append(row)
            errors = len(failed_rows)
            self.message_user(request, f"Successfully imported {count} donations.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}")
            if errors > 0:
                error_url = reverse("admin:donation-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            self._discard_staged_csv(file_path)
        return redirect("..")
@admin.register(Interaction)
class InteractionAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for ``Interaction`` with a staged CSV import.

    ``import_interactions`` serves four requests on one URL: GET renders the
    upload form, a plain POST stages the uploaded CSV in a temp file and
    renders the column-mapping page, a POST containing ``_preview`` renders a
    summary of what the import would do, and a POST containing ``_import``
    performs it.
    """

    list_display = ('id', 'voter', 'volunteer', 'type', 'date', 'description')
    list_filter = ('voter__tenant', 'type', 'date', 'volunteer')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'description', 'volunteer__first_name', 'volunteer__last_name')
    change_list_template = "admin/interaction_change_list.html"

    def get_urls(self):
        """Return the default admin URLs preceded by the import routes.

        ``self.download_errors`` is not defined in this class; it is expected
        to be inherited.
        """
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='interaction-download-errors'),
            path('import-interactions/', self.admin_site.admin_view(self.import_interactions), name='import-interactions'),
        ]
        return my_urls + urls

    @staticmethod
    def _read_mapping(request):
        """Return ``{model field: CSV column name}`` from the ``map_*`` POST fields."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in INTERACTION_MAPPABLE_FIELDS
        }

    @staticmethod
    def _mapped_value(row, mapping, field_name):
        """Return the row's value for ``field_name``, or None if no column is mapped.

        Guarding on the column name matters: ``csv.DictReader`` stores surplus
        cells under the key ``None``, so ``row.get(None)`` can return a list.
        """
        column = mapping.get(field_name)
        return row.get(column) if column else None

    @staticmethod
    def _staged_csv_path(file_path):
        """Validate a client-supplied staged-file path and return its real path.

        The path round-trips through a hidden form field, so it is untrusted.
        It is accepted only when it resolves to a ``.csv`` file directly inside
        the system temp directory, which is where the upload step creates it.

        Raises:
            ValueError: if the path is missing or points anywhere else.
        """
        if not file_path:
            raise ValueError("Missing import file path")
        resolved = os.path.realpath(file_path)
        temp_dir = os.path.realpath(tempfile.gettempdir())
        if os.path.dirname(resolved) != temp_dir or not resolved.endswith('.csv'):
            raise ValueError("Invalid import file path")
        return resolved

    @staticmethod
    def _discard_staged_csv(file_path):
        """Delete a staged upload; a missing file or falsy path is not an error."""
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone (e.g. a repeated submit); nothing left to clean up.
            pass
        except OSError:
            logger.warning("Could not remove staged import file %s", file_path, exc_info=True)

    def import_interactions(self, request):
        """Dispatch one step of the interaction CSV import (see class docstring)."""
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_interactions(request)
            if "_import" in request.POST:
                return self._run_interaction_import(request)
            form = InteractionImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._stage_interaction_upload(request, form)
            # Invalid form: fall through and re-render it with its errors.
        else:
            form = InteractionImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Interactions"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _stage_interaction_upload(self, request, form):
        """Copy the uploaded CSV to a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.lower().endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file must outlive this request so the preview and
        # import steps can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                # Default of None: an empty file must not raise StopIteration.
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error):
            logger.warning("Could not read CSV header from upload %s", csv_file.name, exc_info=True)
        if not headers:
            self._discard_staged_csv(file_path)
            self.message_user(request, "The uploaded file is empty or is not a valid UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Interaction Fields",
            'headers': headers,
            'model_fields': INTERACTION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_interactions(self, request):
        """Render counts and the first 10 rows of what the import would do.

        A row counts as an update when an interaction with the same tenant,
        voter ID and date already exists; otherwise it counts as a create.
        The staged file is left in place for the import step.
        """
        mapping = self._read_mapping(request)
        tenant_id = request.POST.get('tenant')
        try:
            file_path = self._staged_csv_path(request.POST.get('file_path'))
            tenant = Tenant.objects.get(id=tenant_id)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = self._mapped_value(row, mapping, 'voter_id')
                    interaction_date = self._mapped_value(row, mapping, 'date')
                    exists = False
                    if voter_id and interaction_date:
                        exists = Interaction.objects.filter(
                            voter__tenant=tenant,
                            voter__voter_id=voter_id,
                            date=interaction_date,
                        ).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter: {voter_id}",
                            'details': f"Date: {interaction_date}, Desc: {self._mapped_value(row, mapping, 'description') or ''}"
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _import_interaction_row(self, tenant, row, mapping):
        """Create or update one interaction from ``row``.

        Returns:
            None on success, or a message explaining why the row was skipped.
            Unexpected errors propagate to the caller.
        """
        voter_id = self._mapped_value(row, mapping, 'voter_id')
        if not voter_id:
            return "Missing voter ID"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        interaction_date = self._mapped_value(row, mapping, 'date')
        type_name = self._mapped_value(row, mapping, 'type')
        description = self._mapped_value(row, mapping, 'description')
        notes = self._mapped_value(row, mapping, 'notes')
        if not interaction_date or not description:
            return "Missing date or description"
        defaults = {}
        if type_name and type_name.strip():
            interaction_type, _ = InteractionType.objects.get_or_create(
                tenant=tenant,
                name=type_name
            )
            defaults['type'] = interaction_type
        # Whitespace-only text is treated as absent so it never overwrites
        # an existing value.
        if description.strip():
            defaults['description'] = description
        if notes and notes.strip():
            defaults['notes'] = notes
        Interaction.objects.update_or_create(
            voter=voter,
            date=interaction_date,
            defaults=defaults
        )
        return None

    def _run_interaction_import(self, request):
        """Import every row of the staged CSV, then redirect to the changelist.

        Rows that fail are annotated with an ``"Import Error"`` key and stored
        in the session under ``<model_name>_import_errors``. The staged file is
        always removed afterwards, whether or not the import succeeded.
        """
        mapping = self._read_mapping(request)
        # Stays None until validated, so an untrusted path is never deleted.
        file_path = None
        try:
            file_path = self._staged_csv_path(request.POST.get('file_path'))
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            failed_rows = []
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        problem = self._import_interaction_row(tenant, row, mapping)
                    except Exception as e:
                        # One bad row must not abort the remaining rows.
                        logger.exception("Error importing: %s", e)
                        problem = str(e)
                    if problem is None:
                        count += 1
                    else:
                        row["Import Error"] = problem
                        failed_rows.append(row)
            errors = len(failed_rows)
            self.message_user(request, f"Successfully imported {count} interactions.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}")
            if errors > 0:
                error_url = reverse("admin:interaction-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            self._discard_staged_csv(file_path)
        return redirect("..")
@admin.register(VoterLikelihood)
class VoterLikelihoodAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for ``VoterLikelihood`` with a staged CSV import.

    ``import_likelihoods`` serves four requests on one URL: GET renders the
    upload form, a plain POST stages the uploaded CSV in a temp file and
    renders the column-mapping page, a POST containing ``_preview`` renders a
    summary of what the import would do, and a POST containing ``_import``
    performs it.
    """

    list_display = ('id', 'voter', 'election_type', 'likelihood')
    list_filter = ('voter__tenant', 'election_type', 'likelihood')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id')
    change_list_template = "admin/voterlikelihood_change_list.html"

    def get_urls(self):
        """Return the default admin URLs preceded by the import routes.

        ``self.download_errors`` is not defined in this class; it is expected
        to be inherited.
        """
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voterlikelihood-download-errors'),
            path('import-likelihoods/', self.admin_site.admin_view(self.import_likelihoods), name='import-likelihoods'),
        ]
        return my_urls + urls

    @staticmethod
    def _read_mapping(request):
        """Return ``{model field: CSV column name}`` from the ``map_*`` POST fields."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS
        }

    @staticmethod
    def _mapped_value(row, mapping, field_name):
        """Return the row's value for ``field_name``, or None if no column is mapped.

        Guarding on the column name matters: ``csv.DictReader`` stores surplus
        cells under the key ``None``, so ``row.get(None)`` can return a list.
        """
        column = mapping.get(field_name)
        return row.get(column) if column else None

    @staticmethod
    def _staged_csv_path(file_path):
        """Validate a client-supplied staged-file path and return its real path.

        The path round-trips through a hidden form field, so it is untrusted.
        It is accepted only when it resolves to a ``.csv`` file directly inside
        the system temp directory, which is where the upload step creates it.

        Raises:
            ValueError: if the path is missing or points anywhere else.
        """
        if not file_path:
            raise ValueError("Missing import file path")
        resolved = os.path.realpath(file_path)
        temp_dir = os.path.realpath(tempfile.gettempdir())
        if os.path.dirname(resolved) != temp_dir or not resolved.endswith('.csv'):
            raise ValueError("Invalid import file path")
        return resolved

    @staticmethod
    def _discard_staged_csv(file_path):
        """Delete a staged upload; a missing file or falsy path is not an error."""
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone (e.g. a repeated submit); nothing left to clean up.
            pass
        except OSError:
            logger.warning("Could not remove staged import file %s", file_path, exc_info=True)

    @staticmethod
    def _normalize_likelihood(raw_value, likelihood_choices):
        """Map a CSV likelihood cell to a choice key, or return None if unknown.

        Accepts either the stored key (case-insensitive, spaces allowed in
        place of underscores) or the display label (case-insensitive).
        Surrounding whitespace is ignored.
        """
        cleaned = raw_value.strip()
        as_key = cleaned.lower().replace(' ', '_')
        if as_key in likelihood_choices:
            return as_key
        for key, label in likelihood_choices.items():
            if label.lower() == cleaned.lower():
                return key
        return None

    def import_likelihoods(self, request):
        """Dispatch one step of the likelihood CSV import (see class docstring)."""
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_likelihoods(request)
            if "_import" in request.POST:
                return self._run_likelihood_import(request)
            form = VoterLikelihoodImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._stage_likelihood_upload(request, form)
            # Invalid form: fall through and re-render it with its errors.
        else:
            form = VoterLikelihoodImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Likelihoods"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _stage_likelihood_upload(self, request, form):
        """Copy the uploaded CSV to a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.lower().endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file must outlive this request so the preview and
        # import steps can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                # Default of None: an empty file must not raise StopIteration.
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error):
            logger.warning("Could not read CSV header from upload %s", csv_file.name, exc_info=True)
        if not headers:
            self._discard_staged_csv(file_path)
            self.message_user(request, "The uploaded file is empty or is not a valid UTF-8 CSV.", level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Likelihood Fields",
            'headers': headers,
            'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _preview_likelihoods(self, request):
        """Render counts and the first 10 rows of what the import would do.

        A row counts as an update when a likelihood with the same tenant,
        voter ID and election type name already exists; otherwise it counts
        as a create. The staged file is left in place for the import step.
        """
        mapping = self._read_mapping(request)
        tenant_id = request.POST.get('tenant')
        try:
            file_path = self._staged_csv_path(request.POST.get('file_path'))
            tenant = Tenant.objects.get(id=tenant_id)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = self._mapped_value(row, mapping, 'voter_id')
                    election_type_name = self._mapped_value(row, mapping, 'election_type')
                    exists = False
                    if voter_id and election_type_name:
                        exists = VoterLikelihood.objects.filter(
                            voter__tenant=tenant,
                            voter__voter_id=voter_id,
                            election_type__name=election_type_name,
                        ).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter: {voter_id}",
                            'details': f"Election: {election_type_name}, Likelihood: {self._mapped_value(row, mapping, 'likelihood') or ''}"
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("..")

    def _import_likelihood_row(self, tenant, row, mapping, likelihood_choices):
        """Create or update one voter likelihood from ``row``.

        Returns:
            None on success, or a message explaining why the row was skipped.
            Unexpected errors propagate to the caller.
        """
        voter_id = self._mapped_value(row, mapping, 'voter_id')
        if not voter_id:
            return "Missing voter ID"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        election_type_name = self._mapped_value(row, mapping, 'election_type')
        likelihood_val = self._mapped_value(row, mapping, 'likelihood')
        if not election_type_name or not likelihood_val:
            return "Missing election type or likelihood value"
        election_type, _ = ElectionType.objects.get_or_create(
            tenant=tenant,
            name=election_type_name
        )
        normalized_likelihood = self._normalize_likelihood(likelihood_val, likelihood_choices)
        if not normalized_likelihood:
            return f"Invalid likelihood value: {likelihood_val}"
        VoterLikelihood.objects.update_or_create(
            voter=voter,
            election_type=election_type,
            defaults={'likelihood': normalized_likelihood}
        )
        return None

    def _run_likelihood_import(self, request):
        """Import every row of the staged CSV, then redirect to the changelist.

        Rows that fail are annotated with an ``"Import Error"`` key and stored
        in the session under ``<model_name>_import_errors``. The staged file is
        always removed afterwards, whether or not the import succeeded.
        """
        mapping = self._read_mapping(request)
        # Stays None until validated, so an untrusted path is never deleted.
        file_path = None
        try:
            file_path = self._staged_csv_path(request.POST.get('file_path'))
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            # Built once: the choices do not change between rows.
            likelihood_choices = dict(VoterLikelihood.LIKELIHOOD_CHOICES)
            count = 0
            failed_rows = []
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        problem = self._import_likelihood_row(tenant, row, mapping, likelihood_choices)
                    except Exception as e:
                        # One bad row must not abort the remaining rows.
                        logger.exception("Error importing: %s", e)
                        problem = str(e)
                    if problem is None:
                        count += 1
                    else:
                        row["Import Error"] = problem
                        failed_rows.append(row)
            errors = len(failed_rows)
            self.message_user(request, f"Successfully imported {count} likelihoods.")
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
            request.session.modified = True
            logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}")
            if errors > 0:
                error_url = reverse("admin:voterlikelihood-download-errors")
                self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            self._discard_staged_csv(file_path)
        return redirect("..")
@admin.register(CampaignSettings)
class CampaignSettingsAdmin(admin.ModelAdmin):
list_display = ('tenant', 'donation_goal')
list_filter = ('tenant',)