from decimal import Decimal
from datetime import datetime, date
from django.db import transaction
from django.http import HttpResponse
from django.utils.safestring import mark_safe
import csv
import io
import logging
import tempfile
import os
from django.contrib import admin, messages
from django.urls import path, reverse
from django.shortcuts import render, redirect
from django.template.response import TemplateResponse
from .models import (
format_phone_number,
Tenant, TenantUserRole, InteractionType, DonationMethod, ElectionType, EventType, Voter,
VotingRecord, Event, EventParticipation, Donation, Interaction, VoterLikelihood, CampaignSettings,
Interest, Volunteer, VolunteerEvent, ParticipationStatus, VolunteerRole
)
from .forms import (
VoterImportForm, EventImportForm, EventParticipationImportForm,
DonationImportForm, InteractionImportForm, VoterLikelihoodImportForm,
VolunteerImportForm, VotingRecordImportForm
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Each *_MAPPABLE_FIELDS list holds (field key, human-readable label) pairs.
# The import views in this module read one "map_<field key>" POST value per
# entry to learn which CSV column feeds that field, and pass the list to the
# column-mapping template as "model_fields".

# Fields offered when mapping CSV columns for the voter import.
VOTER_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('first_name', 'First Name'),
    ('last_name', 'Last Name'),
    ('nickname', 'Nickname'),
    ('birthdate', 'Birthdate'),
    ('address_street', 'Street Address'),
    ('city', 'City'),
    ('state', 'State'),
    ('prior_state', 'Prior State'),
    ('zip_code', 'Zip Code'),
    ('county', 'County'),
    ('neighborhood', 'Neighborhood'),
    ('phone', 'Phone'),
    ('notes', 'Notes'),
    ('phone_type', 'Phone Type'),
    ('email', 'Email'),
    ('district', 'District'),
    ('precinct', 'Precinct'),
    ('registration_date', 'Registration Date'),
    ('is_targeted', 'Is Targeted'),
    ('candidate_support', 'Candidate Support'),
    ('yard_sign', 'Yard Sign'),
    ('window_sticker', 'Window Sticker'),
    ('latitude', 'Latitude'),
    ('longitude', 'Longitude'),
    ('secondary_phone', 'Secondary Phone'),
    ('secondary_phone_type', 'Secondary Phone Type'),
    ('door_visit', 'Door Visit'),
]
# Fields offered when mapping CSV columns for the event import.
EVENT_MAPPABLE_FIELDS = [
    ('name', 'Name'),
    ('date', 'Date'),
    ('start_time', 'Start Time'),
    ('end_time', 'End Time'),
    ('event_type', 'Event Type (Name)'),
    ('description', 'Description'),
    ('location_name', 'Location Name'),
    ('address', 'Address'),
    ('city', 'City'),
    ('state', 'State'),
    ('zip_code', 'Zip Code'),
    ('latitude', 'Latitude'),
    ('longitude', 'Longitude'),
]
# Fields offered when mapping CSV columns for the event participation import.
EVENT_PARTICIPATION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('first_name', 'First Name'),
    ('last_name', 'Last Name'),
    ('event_name', 'Event Name'),
    ('participation_status', 'Participation Status'),
]
# NOTE(review): not referenced in the part of the file reviewed here --
# presumably consumed by a donation import view; verify.
DONATION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('date', 'Date'),
    ('amount', 'Amount'),
    ('method', 'Donation Method (Name)'),
]
# NOTE(review): not referenced in the part of the file reviewed here --
# presumably consumed by an interaction import view; verify.
INTERACTION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('volunteer_email', 'Volunteer Email'),
    ('date', 'Date'),
    ('type', 'Interaction Type (Name)'),
    ('description', 'Description'),
    ('notes', 'Notes'),
]
# Fields offered when mapping CSV columns for the volunteer import.
VOLUNTEER_MAPPABLE_FIELDS = [
    ('first_name', 'First Name'),
    ('last_name', 'Last Name'),
    ('email', 'Email'),
    ('phone', 'Phone'),
    ('notes', 'Notes'),
]
# NOTE(review): not referenced in the part of the file reviewed here --
# presumably consumed by a voter likelihood import view; verify.
VOTER_LIKELIHOOD_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('election_type', 'Election Type (Name)'),
    ('likelihood', 'Likelihood'),
]
# NOTE(review): not referenced in the part of the file reviewed here --
# presumably consumed by a voting record import view; verify.
VOTING_RECORD_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('election_date', 'Election Date'),
    ('election_description', 'Election Description'),
    ('primary_party', 'Primary Party'),
]
class BaseImportAdminMixin:
    """Shared helpers for admin classes that import CSV files.

    The host class must provide ``self.model`` and ``self.message_user``;
    both are used by ``download_errors``.
    """

    @staticmethod
    def _normalize_error_rows(failed_rows):
        """Return ``(fieldnames, rows)`` that ``csv.DictWriter`` can always write.

        ``csv.DictReader`` stores surplus cells of an over-long row as a list
        under the key ``None``. Such a key cannot be sorted together with
        string keys and is not a usable column name, so it is renamed to
        ``"Extra Columns"``; list/tuple values are joined with ``", "``.
        All other keys are converted with ``str``.

        Args:
            failed_rows: iterable of dict-like rows.

        Returns:
            A tuple of the sorted list of column names and a list of new row
            dicts; the input rows are not modified.
        """
        fieldnames = set()
        rows = []
        for raw in failed_rows:
            row = {}
            for key, value in raw.items():
                column = "Extra Columns" if key is None else str(key)
                if isinstance(value, (list, tuple)):
                    value = ", ".join(str(item) for item in value)
                row[column] = value
                fieldnames.add(column)
            rows.append(row)
        return sorted(fieldnames), rows

    def download_errors(self, request):
        """Return the failed import rows stored in the session as a CSV download.

        Rows are read from the session key ``"<model_name>_import_errors"``.
        When nothing is stored, a warning message is queued and the user is
        redirected to ``"../"`` instead.
        """
        logger.info(f"download_errors called for {self.model._meta.model_name}")
        session_key = f"{self.model._meta.model_name}_import_errors"
        failed_rows = request.session.get(session_key, [])
        if not failed_rows:
            self.message_user(request, "No error log found in session.", level=messages.WARNING)
            return redirect("../")
        # Rows may have different keys (e.g. only some carry extra cells), so
        # the header is the union of all keys seen.
        fieldnames, rows = self._normalize_error_rows(failed_rows)
        response = HttpResponse(content_type="text/csv")
        response["Content-Disposition"] = f"attachment; filename={self.model._meta.model_name}_import_errors.csv"
        writer = csv.DictWriter(response, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
        return response

    def chunk_reader(self, reader, size):
        """Yield lists of up to ``size`` consecutive items from ``reader``.

        The final list may be shorter; nothing is yielded for empty input.
        """
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) == size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
class TenantUserRoleInline(admin.TabularInline):
    """Tabular inline for TenantUserRole rows; used by TenantAdmin."""
    model = TenantUserRole
    # One blank extra form is shown in addition to existing rows.
    extra = 1
class CampaignSettingsInline(admin.StackedInline):
    """Stacked inline for CampaignSettings; used by TenantAdmin."""
    model = CampaignSettings
    # Settings cannot be deleted through the inline.
    can_delete = False
@admin.register(Tenant)
class TenantAdmin(admin.ModelAdmin):
    """Admin for Tenant, with user roles and campaign settings edited inline."""
    list_display = ('name', 'created_at')
    search_fields = ('name',)
    inlines = [TenantUserRoleInline, CampaignSettingsInline]
@admin.register(TenantUserRole)
class TenantUserRoleAdmin(admin.ModelAdmin):
    """Admin for TenantUserRole, filterable by tenant and role."""
    list_display = ('user', 'tenant', 'role')
    list_filter = ('tenant', 'role')
    # Search spans the related user's username and the related tenant's name.
    search_fields = ('user__username', 'tenant__name')
@admin.register(InteractionType)
class InteractionTypeAdmin(admin.ModelAdmin):
    """Admin for InteractionType, filterable by tenant and active flag."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(DonationMethod)
class DonationMethodAdmin(admin.ModelAdmin):
    """Admin for DonationMethod, filterable by tenant and active flag."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(VolunteerRole)
class VolunteerRoleAdmin(admin.ModelAdmin):
    """Admin for VolunteerRole, filterable by tenant and active flag."""
    list_display = ("name", "tenant", "is_active")
    list_filter = ("tenant", "is_active")
    search_fields = ("name",)
@admin.register(ElectionType)
class ElectionTypeAdmin(admin.ModelAdmin):
    """Admin for ElectionType, filterable by tenant and active flag."""
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
@admin.register(EventType)
class EventTypeAdmin(admin.ModelAdmin):
    """Admin for EventType, filterable by tenant and active flag."""
    list_display = ('name', 'tenant', 'is_active', 'default_volunteer_role')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)
    # 'available_roles' is edited with the horizontal two-pane selector widget.
    filter_horizontal = ('available_roles',)
@admin.register(ParticipationStatus)
class ParticipationStatusAdmin(admin.ModelAdmin):
    """Admin for ParticipationStatus with a custom change-list template."""
    change_list_template = 'admin/participationstatus_change_list.html'
    list_display = ('name', 'tenant', 'is_active')
    list_filter = ('tenant', 'is_active')
    search_fields = ('name',)

    def changelist_view(self, request, extra_context=None):
        """Render the change list with every tenant exposed as ``tenants``."""
        from core.models import Tenant

        # A caller-supplied (non-empty) context dict is extended in place.
        context = extra_context if extra_context else {}
        context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=context)
@admin.register(Interest)
class InterestAdmin(admin.ModelAdmin):
    """Admin for Interest, filterable by tenant."""
    list_display = ('name', 'tenant')
    list_filter = ('tenant',)
    # Only these two fields are shown on the edit form, in this order.
    fields = ('tenant', 'name')
    search_fields = ('name',)
class VotingRecordInline(admin.TabularInline):
    """Tabular inline for VotingRecord rows; used by VoterAdmin."""
    model = VotingRecord
    # One blank extra form is shown in addition to existing rows.
    extra = 1
class DonationInline(admin.TabularInline):
    """Tabular inline for Donation rows; used by VoterAdmin."""
    model = Donation
    # One blank extra form is shown in addition to existing rows.
    extra = 1
class InteractionInline(admin.TabularInline):
    """Tabular inline for Interaction rows; used by VoterAdmin."""
    model = Interaction
    # One blank extra form is shown in addition to existing rows.
    extra = 1
    # These relations use the admin autocomplete widget instead of a plain select.
    autocomplete_fields = ['voter', 'type', 'volunteer']
class VoterLikelihoodInline(admin.TabularInline):
    """Tabular inline for VoterLikelihood rows; used by VoterAdmin."""
    model = VoterLikelihood
    # One blank extra form is shown in addition to existing rows.
    extra = 1
class VolunteerEventInline(admin.TabularInline):
    """Tabular inline for VolunteerEvent rows; used by EventAdmin."""
    model = VolunteerEvent
    # One blank extra form is shown in addition to existing rows.
    extra = 1
@admin.register(Voter)
class VoterAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for voters, including a multi-step CSV import.

    ``import_voters`` walks through upload, column mapping, preview and the
    final import. Rows that fail are stored in the session under
    ``"<model_name>_import_errors"`` (at most 1000) so that the
    ``download_errors`` view registered in ``get_urls`` can serve them.
    """
    list_display = ('first_name', 'last_name', 'nickname', 'voter_id', 'tenant', 'district', 'candidate_support', 'is_targeted', 'city', 'state', 'prior_state')
    list_filter = ('tenant', 'candidate_support', 'is_targeted', 'phone_type', 'yard_sign', 'district', 'city', 'state', 'prior_state')
    search_fields = ('first_name', 'last_name', 'nickname', 'voter_id', 'address', 'city', 'state', 'prior_state', 'zip_code', 'county')
    inlines = [VotingRecordInline, DonationInline, InteractionInline, VoterLikelihoodInline]
    readonly_fields = ('address',)
    change_list_template = "admin/voter_change_list.html"

    # Mapped fields that need dedicated conversion during import.
    _DATE_FIELDS = ("birthdate", "registration_date")
    _BOOLEAN_FIELDS = ("is_targeted", "yard_sign", "window_sticker", "door_visit")
    _CHOICE_FIELDS = ("candidate_support", "phone_type", "secondary_phone_type")
    _STATE_FIELDS = ("state", "prior_state")
    # strptime formats tried in order. Month-first stays the preferred reading;
    # day-first is only reached when the month-first reading is impossible.
    _SLASH_DATE_FORMATS = ('%m/%d/%Y', '%m/%d/%y', '%d/%m/%Y', '%d/%m/%y')
    _DASH_DATE_FORMATS = ('%Y-%m-%d', '%m-%d-%Y', '%m-%d-%y', '%d-%m-%Y', '%d-%m-%y')

    def changelist_view(self, request, extra_context=None):
        """Render the change list with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context["tenants"] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the error-download and import routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voter-download-errors'),
            path('import-voters/', self.admin_site.admin_view(self.import_voters), name='import-voters'),
        ]
        return my_urls + urls

    def import_voters(self, request):
        """Drive the voter CSV import wizard.

        * GET: show the upload form.
        * POST with ``_preview``: show counts and the first rows of the file.
        * POST with ``_import``: create/update voters and delete the upload.
        * any other POST: store the uploaded file and show the mapping page.
        """
        if request.method != "POST":
            return self._render_import_form(request, VoterImportForm())
        if "_preview" in request.POST:
            return self._import_preview_step(request)
        if "_import" in request.POST:
            return self._import_commit_step(request)
        form = VoterImportForm(request.POST, request.FILES)
        if form.is_valid():
            return self._import_upload_step(request, form)
        # Invalid upload form: show it again with its errors.
        return self._render_import_form(request, form)

    def _render_import_form(self, request, form):
        """Render the initial upload page for ``form``."""
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Voters"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _is_import_temp_file(file_path):
        """Return True if ``file_path`` is an existing ``.csv`` file in the temp directory.

        ``file_path`` comes back from the browser, so it must not be trusted:
        without this check a crafted POST could make the view read or delete
        an arbitrary file.
        """
        if not file_path:
            return False
        try:
            temp_root = os.path.realpath(tempfile.gettempdir())
            candidate = os.path.realpath(file_path)
            inside_temp = os.path.commonpath([temp_root, candidate]) == temp_root
        except (ValueError, OSError):
            return False
        return inside_temp and candidate.endswith('.csv') and os.path.isfile(candidate)

    @staticmethod
    def _discard_import_file(file_path):
        """Delete the uploaded temp file; a failure is logged, not raised."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError as exc:
            logger.warning(f"Could not remove temporary import file {file_path}: {exc}")

    @staticmethod
    def _read_mapping(request):
        """Return ``{field key: CSV column name or None}`` from the ``map_*`` POST values."""
        return {field_name: request.POST.get(f"map_{field_name}") for field_name, _ in VOTER_MAPPABLE_FIELDS}

    @classmethod
    def _parse_import_date(cls, value):
        """Parse ``value`` into a ``date`` or return None when no format matches."""
        if '/' in value:
            formats = cls._SLASH_DATE_FORMATS
        elif '-' in value:
            formats = cls._DASH_DATE_FORMATS
        else:
            return None
        for fmt in formats:
            try:
                return datetime.strptime(value, fmt).date()
            except ValueError:
                continue
        return None

    def _import_upload_step(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.lower().endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                # None instead of StopIteration when the file has no header row.
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as exc:
            logger.warning(f"Could not read uploaded CSV header: {exc}")
            headers = None
        if not headers:
            self._discard_import_file(file_path)
            self.message_user(request, "The uploaded CSV file is empty or could not be read.", level=messages.ERROR)
            return redirect("../")
        context = self.admin_site.each_context(request)
        context.update({
            "title": "Map Voter Fields",
            "headers": headers,
            "model_fields": VOTER_MAPPABLE_FIELDS,
            "tenant_id": tenant.id,
            "file_path": file_path,
            "action_url": request.path,
            "opts": self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_preview_step(self, request):
        """Render the preview page: total line count plus create/update counts for the first 10 rows."""
        file_path = request.POST.get("file_path")
        tenant_id = request.POST.get("tenant")
        if not self._is_import_temp_file(file_path):
            self.message_user(request, "Error processing preview: the uploaded file is missing or invalid. Please upload it again.", level=messages.ERROR)
            return redirect("../")
        try:
            # Looked up inside the try so an unknown tenant becomes a message, not a 500.
            tenant = Tenant.objects.get(id=tenant_id)
            mapping = self._read_mapping(request)
            with open(file_path, "r", encoding="utf-8-sig") as f:
                # Fast line count minus the header; never negative for an empty file.
                total_count = max(sum(1 for _ in f) - 1, 0)
                f.seek(0)
                preview_rows = []
                for index, row in enumerate(csv.DictReader(f)):
                    if index >= 10:
                        break
                    preview_rows.append(row)
            id_column = mapping.get("voter_id")
            preview_ids = []
            for row in preview_rows:
                raw_id = row.get(id_column)
                preview_ids.append(raw_id.strip() if raw_id else None)
            existing_preview_ids = set(
                Voter.objects.filter(
                    tenant=tenant,
                    voter_id__in=[v for v in preview_ids if v is not None],
                ).values_list("voter_id", flat=True)
            )
            update_count = sum(1 for v in preview_ids if v is not None and v in existing_preview_ids)
            create_count = len(preview_rows) - update_count
            context = self.admin_site.each_context(request)
            context.update({
                "title": "Import Preview",
                "total_count": total_count,
                "create_count": create_count,
                "update_count": update_count,
                "preview_data": preview_rows,  # This should be improved to show actual changes
                "mapping": mapping,
                "file_path": file_path,
                "tenant_id": tenant_id,
                "action_url": request.path,
                "opts": self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _voter_defaults_from_row(self, row, mapping):
        """Build the ``defaults`` dict for ``update_or_create`` from one CSV row.

        Only mapped columns present in ``row`` are considered. Values that
        cannot be converted or validated are logged and left out.
        """
        defaults = {}
        for field_name, _ in VOTER_MAPPABLE_FIELDS:
            csv_column = mapping.get(field_name)
            if not csv_column or csv_column not in row:
                continue
            raw_value = row[csv_column]
            if raw_value is None:
                # Short row: DictReader fills missing cells with None. Skip
                # the field instead of failing the whole row on .strip().
                continue
            field_value = raw_value.strip()
            if field_name in self._DATE_FIELDS:
                if field_value:
                    parsed = self._parse_import_date(field_value)
                    if parsed:
                        defaults[field_name] = parsed
                    else:
                        logger.warning(f"Could not parse date '{field_value}' for field {field_name}. Skipping.")
            elif field_name in self._BOOLEAN_FIELDS:
                if field_value.lower() == 'true' or field_value == '1':
                    defaults[field_name] = True
                elif field_value.lower() == 'false' or field_value == '0':
                    defaults[field_name] = False
                else:
                    defaults[field_name] = None  # Or sensible default/error
            elif field_name == "phone":
                defaults[field_name] = format_phone_number(field_value)
            elif field_name == "email":
                defaults[field_name] = field_value.lower()  # Store emails as lowercase
            elif field_name in self._CHOICE_FIELDS:
                if field_name == "candidate_support":
                    choices = Voter.CANDIDATE_SUPPORT_CHOICES
                else:
                    choices = Voter.PHONE_TYPE_CHOICES
                if field_value in [choice[0] for choice in choices]:
                    defaults[field_name] = field_value
                else:
                    logger.warning(f"Invalid {field_name} value: {field_value}. Skipping.")
            elif field_name in self._STATE_FIELDS:
                # Ensure state is uppercase and valid length
                if field_value and len(field_value) <= 2:
                    defaults[field_name] = field_value.upper()
                else:
                    logger.warning(f"Invalid state value: {field_value}. Skipping.")
            else:
                defaults[field_name] = field_value
        return defaults

    def _import_commit_step(self, request):
        """Create or update one voter per CSV row, then report the outcome.

        Rows without a voter ID or that raise during processing are collected
        as failed rows. The uploaded temp file is always deleted afterwards,
        including when the import aborts with an error.
        """
        file_path = request.POST.get("file_path")
        if not self._is_import_temp_file(file_path):
            self.message_user(request, "Error processing file: the uploaded file is missing or invalid. Please upload it again.", level=messages.ERROR)
            return redirect("../")
        try:
            # Looked up inside the try so an unknown tenant becomes a message, not a 500.
            tenant = Tenant.objects.get(id=request.POST.get("tenant"))
            mapping = self._read_mapping(request)
            created_count = 0
            updated_count = 0
            skipped_no_change = 0  # reported in the summary; nothing increments it yet
            skipped_no_id = 0
            errors = 0
            failed_rows = []
            total_processed = 0
            with open(file_path, "r", encoding="utf-8-sig") as f_read:
                for row in csv.DictReader(f_read):
                    total_processed += 1
                    try:
                        raw_voter_id = row.get(mapping.get("voter_id"))
                        voter_id = raw_voter_id.strip() if raw_voter_id else None
                        if not voter_id:
                            # Enhanced error message to guide the user
                            mapped_column_name = mapping.get("voter_id", "N/A")
                            error_detail = f"Raw value: '{raw_voter_id}'. " if raw_voter_id is not None else "Value was None."
                            row["Import Error"] = f"Voter ID is required. Please check if the '{mapped_column_name}' column is correctly mapped and contains values for all rows. {error_detail}"
                            failed_rows.append(row)
                            skipped_no_id += 1
                            errors += 1
                            continue
                        defaults = self._voter_defaults_from_row(row, mapping)
                        voter, created = Voter.objects.update_or_create(
                            tenant=tenant,
                            voter_id=voter_id,
                            defaults=defaults
                        )
                        if created:
                            created_count += 1
                        else:
                            updated_count += 1
                        # Special handling for interests - assuming a comma-separated list in CSV.
                        # NOTE(review): VOTER_MAPPABLE_FIELDS has no 'interests' entry, so the
                        # mapping never contains this key and the branch currently never runs.
                        if 'interests' in mapping and row.get(mapping['interests']):
                            interest_names = [name.strip() for name in row[mapping['interests']].split(',') if name.strip()]
                            for interest_name in interest_names:
                                interest, _ = Interest.objects.get_or_create(tenant=tenant, name=interest_name)
                                voter.interests.add(interest)
                        if total_processed % 100 == 0:
                            logger.info(f"Voter import progress: {total_processed} processed. {created_count} created. {updated_count} updated.")
                    except Exception as e:
                        row["Import Error"] = str(e)
                        failed_rows.append(row)
                        errors += 1
                        logger.error(f"Error importing row: {row}. Error: {e}")
            self.message_user(request, f"Import complete: {created_count + updated_count} voters created/updated. ({created_count} new, {updated_count} updated, {skipped_no_change} skipped with no changes, {skipped_no_id} skipped missing ID, {errors} errors)")
            # Store failed rows in session for download, limit to avoid session overflow
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:voter-download-errors")
                # Both interpolated values are trusted (an int and a reversed URL).
                self.message_user(request, mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'), level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # Clean up the temporary file
            self._discard_import_file(file_path)
        return redirect("../")
@admin.register(Event)
class EventAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for events, including a multi-step CSV import.

    ``import_events`` walks through upload, column mapping, preview and the
    final import. Rows that fail are stored in the session under
    ``"<model_name>_import_errors"`` (at most 1000) so that the
    ``download_errors`` view registered in ``get_urls`` can serve them.
    """
    list_display = ('name', 'date', 'event_type', 'tenant', 'location_name', 'address', 'city', 'state', 'zip_code')
    list_filter = ('tenant', 'event_type')
    search_fields = ('name', 'location_name', 'address', 'city', 'state', 'zip_code')
    inlines = [VolunteerEventInline]
    change_list_template = "admin/event_change_list.html"

    def changelist_view(self, request, extra_context=None):
        """Render the change list with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the error-download and import routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='event-download-errors'),
            path('import-events/', self.admin_site.admin_view(self.import_events), name='import-events'),
        ]
        return my_urls + urls

    def import_events(self, request):
        """Drive the event CSV import wizard.

        * GET: show the upload form.
        * POST with ``_preview``: show create/update counts and sample rows.
        * POST with ``_import``: create/update events and delete the upload.
        * any other POST: store the uploaded file and show the mapping page.
        """
        if request.method != "POST":
            return self._render_import_form(request, EventImportForm())
        if "_preview" in request.POST:
            return self._import_preview_step(request)
        if "_import" in request.POST:
            return self._import_commit_step(request)
        form = EventImportForm(request.POST, request.FILES)
        if form.is_valid():
            return self._import_upload_step(request, form)
        # Invalid upload form: show it again with its errors.
        return self._render_import_form(request, form)

    def _render_import_form(self, request, form):
        """Render the initial upload page for ``form``."""
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Events"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _is_import_temp_file(file_path):
        """Return True if ``file_path`` is an existing ``.csv`` file in the temp directory.

        ``file_path`` comes back from the browser, so it must not be trusted:
        without this check a crafted POST could make the view read or delete
        an arbitrary file.
        """
        if not file_path:
            return False
        try:
            temp_root = os.path.realpath(tempfile.gettempdir())
            candidate = os.path.realpath(file_path)
            inside_temp = os.path.commonpath([temp_root, candidate]) == temp_root
        except (ValueError, OSError):
            return False
        return inside_temp and candidate.endswith('.csv') and os.path.isfile(candidate)

    @staticmethod
    def _discard_import_file(file_path):
        """Delete the uploaded temp file; a failure is logged, not raised."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError as exc:
            logger.warning(f"Could not remove temporary import file {file_path}: {exc}")

    @staticmethod
    def _read_mapping(request):
        """Return ``{field key: CSV column name or None}`` from the ``map_*`` POST values."""
        return {field_name: request.POST.get(f'map_{field_name}') for field_name, _ in EVENT_MAPPABLE_FIELDS}

    @staticmethod
    def _parse_event_date(value):
        """Parse ``MM/DD/YYYY`` or ``YYYY-MM-DD`` into a ``date``; None if neither matches."""
        if '/' in value:
            fmt = '%m/%d/%Y'
        elif '-' in value:
            fmt = '%Y-%m-%d'
        else:
            return None
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            return None

    def _import_upload_step(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.lower().endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                # None instead of StopIteration when the file has no header row.
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as exc:
            logger.warning(f"Could not read uploaded CSV header: {exc}")
            headers = None
        if not headers:
            self._discard_import_file(file_path)
            self.message_user(request, "The uploaded CSV file is empty or could not be read.", level=messages.ERROR)
            return redirect("../")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Event Fields",
            'headers': headers,
            'model_fields': EVENT_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_preview_step(self, request):
        """Render the preview page with create/update counts and up to 10 sample rows."""
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        if not self._is_import_temp_file(file_path):
            self.message_user(request, "Error processing preview: the uploaded file is missing or invalid. Please upload it again.", level=messages.ERROR)
            return redirect("../")
        try:
            # Looked up inside the try so an unknown tenant becomes a message, not a 500.
            tenant = Tenant.objects.get(id=tenant_id)
            mapping = self._read_mapping(request)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    event_name = row.get(mapping.get('name'))
                    event_date = row.get(mapping.get('date'))
                    exists = False
                    if event_name and event_date:
                        # Assuming name and date uniquely identify an event
                        # This might need refinement based on actual data uniqueness requirements
                        dt = self._parse_event_date(event_date)
                        if dt:
                            exists = Event.objects.filter(tenant=tenant, name=event_name, date=dt).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Event: {event_name} (Date: {event_date})",
                            'details': f"Location: {row.get(mapping.get('location_name', '')) or ''}"
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _import_commit_step(self, request):
        """Create or update one event per CSV row, then report the outcome.

        Rows missing a name, date or type, rows with an unparseable date and
        rows that raise during processing are collected as failed rows. The
        uploaded temp file is always deleted afterwards, including when the
        import aborts with an error.
        """
        file_path = request.POST.get('file_path')
        if not self._is_import_temp_file(file_path):
            self.message_user(request, "Error processing file: the uploaded file is missing or invalid. Please upload it again.", level=messages.ERROR)
            return redirect("../")
        try:
            # Looked up inside the try so an unknown tenant becomes a message, not a 500.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            mapping = self._read_mapping(request)
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    try:
                        event_name = row.get(mapping.get('name'))
                        event_date = row.get(mapping.get('date'))
                        event_type_name = row.get(mapping.get('event_type'))
                        if not event_name or not event_date or not event_type_name:
                            row["Import Error"] = "Missing event name, date, or type"
                            failed_rows.append(row)
                            errors += 1
                            continue
                        parsed_date = self._parse_event_date(event_date)
                        if parsed_date is None:
                            row["Import Error"] = "Invalid date format"
                            failed_rows.append(row)
                            errors += 1
                            continue
                        event_type_obj, _ = EventType.objects.get_or_create(tenant=tenant, name=event_type_name)
                        defaults = {
                            'date': parsed_date,
                            'event_type': event_type_obj,
                            'description': row.get(mapping.get('description')) or '',
                            'location_name': row.get(mapping.get('location_name')) or '',
                            'address': row.get(mapping.get('address')) or '',
                            'city': row.get(mapping.get('city')) or '',
                            'state': row.get(mapping.get('state')) or '',
                            'zip_code': row.get(mapping.get('zip_code')) or '',
                            'latitude': row.get(mapping.get('latitude')) or None,
                            'longitude': row.get(mapping.get('longitude')) or None,
                        }
                        # Optional HH:MM times; an invalid value is logged and left out.
                        for time_field in ('start_time', 'end_time'):
                            time_str = row.get(mapping.get(time_field))
                            if time_str:
                                try:
                                    defaults[time_field] = datetime.strptime(time_str, '%H:%M').time()
                                except ValueError:
                                    logger.warning(f"Invalid {time_field} format: {time_str}. Skipping.")
                        # NOTE(review): the preview matches existing events on name AND
                        # date, while this lookup matches on name only, so an existing
                        # event with the same name has its date overwritten. Confirm
                        # which identity is intended before changing either side.
                        Event.objects.update_or_create(
                            tenant=tenant,
                            name=event_name,
                            defaults=defaults
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing: {e}")
                        row["Import Error"] = str(e)
                        failed_rows.append(row)
                        errors += 1
            self.message_user(request, f"Successfully imported {count} events.")
            # Keep at most 1000 failed rows in the session for the error download.
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:event-download-errors")
                # Both interpolated values are trusted (an int and a reversed URL).
                self.message_user(request, mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'), level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            self._discard_import_file(file_path)
        return redirect("../")
@admin.register(Volunteer)
class VolunteerAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for volunteers, including a multi-step CSV import.

    ``import_volunteers`` walks through upload, column mapping, preview and
    the final import. Rows that fail are stored in the session under
    ``"<model_name>_import_errors"`` (at most 1000) so that the
    ``download_errors`` view registered in ``get_urls`` can serve them.
    """
    list_display = ('first_name', 'last_name', 'email', 'phone', 'tenant')
    list_filter = ('tenant',)
    search_fields = ('first_name', 'last_name', 'email', 'phone')
    change_list_template = "admin/volunteer_change_list.html"

    def changelist_view(self, request, extra_context=None):
        """Render the change list with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the error-download and import routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='volunteer-download-errors'),
            path('import-volunteers/', self.admin_site.admin_view(self.import_volunteers), name='import-volunteers'),
        ]
        return my_urls + urls

    def import_volunteers(self, request):
        """Drive the volunteer CSV import wizard.

        * GET: show the upload form.
        * POST with ``_preview``: show create/update counts and sample rows.
        * POST with ``_import``: create/update volunteers and delete the upload.
        * any other POST: store the uploaded file and show the mapping page.
        """
        if request.method != "POST":
            return self._render_import_form(request, VolunteerImportForm())
        if "_preview" in request.POST:
            return self._import_preview_step(request)
        if "_import" in request.POST:
            return self._import_commit_step(request)
        form = VolunteerImportForm(request.POST, request.FILES)
        if form.is_valid():
            return self._import_upload_step(request, form)
        # Invalid upload form: show it again with its errors.
        return self._render_import_form(request, form)

    def _render_import_form(self, request, form):
        """Render the initial upload page for ``form``."""
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Volunteers"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    @staticmethod
    def _is_import_temp_file(file_path):
        """Return True if ``file_path`` is an existing ``.csv`` file in the temp directory.

        ``file_path`` comes back from the browser, so it must not be trusted:
        without this check a crafted POST could make the view read or delete
        an arbitrary file.
        """
        if not file_path:
            return False
        try:
            temp_root = os.path.realpath(tempfile.gettempdir())
            candidate = os.path.realpath(file_path)
            inside_temp = os.path.commonpath([temp_root, candidate]) == temp_root
        except (ValueError, OSError):
            return False
        return inside_temp and candidate.endswith('.csv') and os.path.isfile(candidate)

    @staticmethod
    def _discard_import_file(file_path):
        """Delete the uploaded temp file; a failure is logged, not raised."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError as exc:
            logger.warning(f"Could not remove temporary import file {file_path}: {exc}")

    @staticmethod
    def _read_mapping(request):
        """Return ``{field key: CSV column name or None}`` from the ``map_*`` POST values."""
        return {field_name: request.POST.get(f'map_{field_name}') for field_name, _ in VOLUNTEER_MAPPABLE_FIELDS}

    def _import_upload_step(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.lower().endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        try:
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                # None instead of StopIteration when the file has no header row.
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as exc:
            logger.warning(f"Could not read uploaded CSV header: {exc}")
            headers = None
        if not headers:
            self._discard_import_file(file_path)
            self.message_user(request, "The uploaded CSV file is empty or could not be read.", level=messages.ERROR)
            return redirect("../")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Volunteer Fields",
            'headers': headers,
            'model_fields': VOLUNTEER_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_preview_step(self, request):
        """Render the preview page with create/update counts and up to 10 sample rows."""
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        if not self._is_import_temp_file(file_path):
            self.message_user(request, "Error processing preview: the uploaded file is missing or invalid. Please upload it again.", level=messages.ERROR)
            return redirect("../")
        try:
            # Looked up inside the try so an unknown tenant becomes a message, not a 500.
            tenant = Tenant.objects.get(id=tenant_id)
            mapping = self._read_mapping(request)
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    email = row.get(mapping.get('email'))
                    # A volunteer is identified by (tenant, email).
                    exists = bool(email) and Volunteer.objects.filter(tenant=tenant, email=email).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Volunteer: {email}",
                            'details': f"Name: {row.get(mapping.get('first_name', '')) or ''} {row.get(mapping.get('last_name', '')) or ''}"
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _import_commit_step(self, request):
        """Create or update one volunteer per CSV row, then report the outcome.

        Rows without an email and rows that raise during processing are
        collected as failed rows. The uploaded temp file is always deleted
        afterwards, including when the import aborts with an error.
        """
        file_path = request.POST.get('file_path')
        if not self._is_import_temp_file(file_path):
            self.message_user(request, "Error processing file: the uploaded file is missing or invalid. Please upload it again.", level=messages.ERROR)
            return redirect("../")
        try:
            # Looked up inside the try so an unknown tenant becomes a message, not a 500.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            mapping = self._read_mapping(request)
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    try:
                        email = row.get(mapping.get('email'))
                        if not email:
                            row["Import Error"] = "Missing email"
                            failed_rows.append(row)
                            errors += 1
                            continue
                        defaults = {
                            'first_name': row.get(mapping.get('first_name')) or '',
                            'last_name': row.get(mapping.get('last_name')) or '',
                            'phone': format_phone_number(row.get(mapping.get('phone')) or ''),
                            'notes': row.get(mapping.get('notes')) or '',
                        }
                        Volunteer.objects.update_or_create(
                            tenant=tenant,
                            email=email,
                            defaults=defaults
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing volunteer: {e}")
                        row["Import Error"] = str(e)
                        failed_rows.append(row)
                        errors += 1
            self.message_user(request, f"Successfully imported {count} volunteers.")
            # Keep at most 1000 failed rows in the session for the error download.
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:volunteer-download-errors")
                # Both interpolated values are trusted (an int and a reversed URL).
                self.message_user(request, mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'), level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            self._discard_import_file(file_path)
        return redirect("../")
# Admin for EventParticipation rows; mixes in the shared CSV-import helpers
# from BaseImportAdminMixin.
@admin.register(EventParticipation)
class EventParticipationAdmin(BaseImportAdminMixin, admin.ModelAdmin):
# Columns shown in the change list.
list_display = ('event', 'voter', 'participation_status')
# Sidebar filters; 'voter__tenant' filters through the related voter.
list_filter = ('event', 'participation_status', 'voter__tenant')
# Search spans the related event's name and the related voter's name and ID.
search_fields = ('event__name', 'voter__first_name', 'voter__last_name', 'voter__voter_id')
# Custom change-list template for this model.
change_list_template = 'admin/eventparticipation_change_list.html'
def changelist_view(self, request, extra_context=None):
    """Render the change list with every tenant exposed as ``tenants``.

    Args:
        request: the current HTTP request.
        extra_context: optional dict of extra template context; a non-empty
            dict is extended in place.

    Returns:
        The response produced by the parent ``changelist_view``.
    """
    extra_context = extra_context or {}
    from core.models import Tenant
    extra_context['tenants'] = Tenant.objects.all()
    # The parent hook is ``changelist_view``; the previous call to
    # ``changelist_list`` named a method that does not exist and raised
    # AttributeError on every change-list request.
    return super().changelist_view(request, extra_context=extra_context)
def get_urls(self):
    """Return the admin URLs with the error-download and import routes listed first."""
    inherited_urls = super().get_urls()
    wrap = self.admin_site.admin_view
    download_route = path('download-errors/', wrap(self.download_errors), name='eventparticipation-download-errors')
    import_route = path('import-event-participations/', wrap(self.import_event_participations), name='import-event-participations')
    # Custom routes come first so they are matched before the default admin patterns.
    return [download_route, import_route] + inherited_urls
def import_event_participations(self, request):
if request.method == "POST":
if "_preview" in request.POST:
file_path = request.POST.get('file_path')
tenant_id = request.POST.get('tenant')
tenant = Tenant.objects.get(id=tenant_id)
mapping = {}
for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS:
mapping[field_name] = request.POST.get(f'map_{field_name}')
try:
with open(file_path, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
total_count = 0
create_count = 0
update_count = 0
preview_data = []
for row in reader:
total_count += 1
voter_id = row.get(mapping.get('voter_id'))
# Extract first_name and last_name from CSV based on mapping
csv_first_name = row.get(mapping.get('first_name'), '')
csv_last_name = row.get(mapping.get('last_name'), '')
csv_full_name = f"{csv_first_name} {csv_last_name}".strip()
exists = False
voter_full_name = "N/A" # Initialize voter_full_name
if voter_id:
try:
voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
voter_full_name = f"{voter.first_name} {voter.last_name}" # Get voter's full name
if event_name:
exists = EventParticipation.objects.filter(voter=voter, event__name=event_name).exists()
except Voter.DoesNotExist:
pass
if exists:
update_count += 1
action = 'update'
else:
create_count += 1
action = 'create'
if len(preview_data) < 10:
preview_data.append({
'action': action,
'csv_full_name': csv_full_name, # Add CSV name
'identifier': f"Voter: {voter_full_name} (ID: {voter_id})" if voter_id else "N/A", # Include full name
'details': f"Participation: {row.get(mapping.get('participation_status', '')) or ''}"
})
context = self.admin_site.each_context(request)
context.update({
'title': "Import Preview",
'total_count': total_count,
'create_count': create_count,
'update_count': update_count,
'preview_data': preview_data,
'mapping': mapping,
'file_path': file_path,
'tenant_id': tenant_id,
'action_url': request.path,
'opts': self.model._meta,
})
return render(request, "admin/import_preview.html", context)
except Exception as e:
self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
return redirect("../")
elif "_import" in request.POST:
file_path = request.POST.get('file_path')
tenant_id = request.POST.get('tenant')
tenant = Tenant.objects.get(id=tenant_id)
mapping = {}
for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS:
mapping[field_name] = request.POST.get(f'map_{field_name}')
try:
with open(file_path, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
count = 0
errors = 0
failed_rows = []
for row in reader:
try:
voter_id = row.get(mapping.get('voter_id')) if mapping.get('voter_id') else None
participation_status_val = row.get(mapping.get('participation_status')) if mapping.get('participation_status') else None
if voter_id: # Only strip if voter_id is not None
voter_id = voter_id.strip()
if not voter_id:
row["Import Error"] = "Missing voter ID"
failed_rows.append(row)
errors += 1
continue
try:
voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
except Voter.DoesNotExist:
error_msg = f"Voter with ID {voter_id} not found"
logger.error(error_msg)
row["Import Error"] = error_msg
failed_rows.append(row)
errors += 1
continue
event = None
event_name = row.get(mapping.get('event_name')) if mapping.get('event_name') else None
if event_name:
try:
event = Event.objects.get(tenant=tenant, name=event_name)
except Event.DoesNotExist:
pass
if not event:
error_msg = "Event not found (check Event Name)"
logger.error(error_msg)
row["Import Error"] = error_msg
failed_rows.append(row)
errors += 1
continue
defaults = {}
if participation_status_val and participation_status_val.strip():
status_obj, _ = ParticipationStatus.objects.get_or_create(tenant=tenant, name=participation_status_val.strip())
defaults['participation_status'] = status_obj
else:
# Default to 'Invited' if not specified
status_obj, _ = ParticipationStatus.objects.get_or_create(tenant=tenant, name='Invited')
defaults['participation_status'] = status_obj
EventParticipation.objects.update_or_create(
event=event,
voter=voter,
defaults=defaults
)
count += 1
except Exception as e:
logger.error(f"Error importing: {e}")
row["Import Error"] = str(e)
failed_rows.append(row)
errors += 1
if os.path.exists(file_path):
os.remove(file_path)
self.message_user(request, f"Successfully imported {count} participations.")
# Optimization: Limit error log size in session to avoid overflow
request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
request.session.modified = True
logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}")
if errors > 0:
error_url = reverse("admin:eventparticipation-download-errors")
self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING)
return redirect("../")
except Exception as e:
self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
return redirect("../")
else:
form = EventParticipationImportForm(request.POST, request.FILES)
if form.is_valid():
csv_file = request.FILES['file']
tenant = form.cleaned_data['tenant']
if not csv_file.name.endswith('.csv'):
self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
return redirect("../")
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
for chunk in csv_file.chunks():
tmp.write(chunk)
file_path = tmp.name
with open(file_path, 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
headers = next(reader)
context = self.admin_site.each_context(request)
context.update({
'title': "Map Participation Fields",
'headers': headers,
'model_fields': EVENT_PARTICIPATION_MAPPABLE_FIELDS,
'tenant_id': tenant.id,
'file_path': file_path,
'action_url': request.path,
'opts': self.model._meta,
})
return render(request, "admin/import_mapping.html", context)
else:
form = EventParticipationImportForm()
context = self.admin_site.each_context(request)
context['form'] = form
context['title'] = "Import Participations"
context['opts'] = self.model._meta
return render(request, "admin/import_csv.html", context)
@admin.register(Donation)
class DonationAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for donations with a staged CSV import.

    ``import_donations`` serves every stage of the import: the upload form,
    the column-mapping page, the preview page and the import itself. Rows
    that fail to import are stored in the session under
    ``<model_name>_import_errors``.
    """

    list_display = ('voter', 'date', 'amount', 'method')
    list_filter = ('voter__tenant', 'method')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'method__name')
    change_list_template = 'admin/donation_change_list.html'

    def changelist_view(self, request, extra_context=None):
        """Render the changelist with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='donation-download-errors'),
            path('import-donations/', self.admin_site.admin_view(self.import_donations), name='import-donations'),
        ]
        return my_urls + urls

    def import_donations(self, request):
        """Dispatch one request of the donation CSV import.

        * POST containing ``_preview``: render the preview page.
        * POST containing ``_import``: import the rows and redirect.
        * Any other POST: validate the upload form and, when valid, render
          the column-mapping page.
        * GET, or a POST with an invalid form: render the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_donations(request)
            if "_import" in request.POST:
                return self._commit_donations(request)
            form = DonationImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._map_donation_fields(request, form)
        else:
            form = DonationImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Donations"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _donation_mapping(self, request):
        """Return ``{model field: submitted CSV column}`` for every mappable field."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in DONATION_MAPPABLE_FIELDS
        }

    @staticmethod
    def _parse_donation_date(date_str):
        """Parse ``MM/DD/YYYY`` or ``YYYY-MM-DD``; return ``None`` when neither fits."""
        if '/' in date_str:
            date_format = '%m/%d/%Y'
        elif '-' in date_str:
            date_format = '%Y-%m-%d'
        else:
            return None
        try:
            return datetime.strptime(date_str, date_format).date()
        except ValueError:
            return None

    def _preview_donations(self, request):
        """Tally the stored CSV rows and render the preview page.

        A row counts as 'update' when its voter ID matches a voter of the
        tenant and as 'create' otherwise.
        """
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._donation_mapping(request)
        try:
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = row.get(mapping.get('voter_id'))
                    exists = bool(voter_id) and Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    # Only the first ten rows are shown on the preview page.
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter ID: {voter_id}",
                            'details': f"Amount: {row.get(mapping.get('amount', '')) or ''}, Method: {row.get(mapping.get('method', '')) or ''}",
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _import_donation_row(self, row, mapping, tenant):
        """Create the donation described by one CSV row.

        Returns ``None`` on success, otherwise the error message to record
        against the row. Unexpected exceptions propagate to the caller.
        """
        # Fixed: the module imports only Decimal from ``decimal``, so the
        # ``except InvalidOperation`` clause raised NameError and bad amounts
        # were reported with a misleading message.
        from decimal import InvalidOperation

        voter_id = row.get(mapping.get('voter_id'))
        date_str = row.get(mapping.get('date'))
        amount_str = row.get(mapping.get('amount'))
        method_name = row.get(mapping.get('method'))
        if voter_id:
            voter_id = voter_id.strip()
        if not voter_id:
            return "Missing voter ID"
        if not date_str or not amount_str:
            return "Missing date or amount"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        parsed_date = self._parse_donation_date(date_str)
        if parsed_date is None:
            return "Invalid date format"
        try:
            amount = Decimal(amount_str)
        except InvalidOperation:
            return "Invalid amount format"
        donation_method, _ = DonationMethod.objects.get_or_create(tenant=tenant, name=method_name)
        Donation.objects.create(
            voter=voter,
            date=parsed_date,
            amount=amount,
            method=donation_method,
        )
        return None

    def _commit_donations(self, request):
        """Import every row of the stored CSV, report the outcome and redirect."""
        # NOTE(review): file_path is taken directly from the POST body and is
        # later opened and removed; consider validating that it points into
        # the temp directory.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._donation_mapping(request)
        try:
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    try:
                        error_msg = self._import_donation_row(row, mapping, tenant)
                    except Exception as e:
                        logger.error(f"Error importing: {e}")
                        error_msg = str(e)
                    if error_msg is None:
                        count += 1
                    else:
                        row["Import Error"] = error_msg
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} donations.")
            # Cap the rows kept in the session to avoid overflowing it.
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:donation-download-errors")
                # Fixed: error_url was computed but never used, so the message
                # named a download without linking to it.
                self.message_user(
                    request,
                    mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'),
                    level=messages.WARNING,
                )
            return redirect("../")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("../")

    def _map_donation_fields(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        # delete=False: the file has to outlive this request so the preview
        # and import requests can reopen it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            headers = next(csv.reader(f))
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Donation Fields",
            'headers': headers,
            'model_fields': DONATION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)
@admin.register(Interaction)
class InteractionAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for voter interactions with a staged CSV import.

    ``import_interactions`` serves every stage of the import: the upload
    form, the column-mapping page, the preview page and the import itself.
    Rows that fail to import are stored in the session under
    ``<model_name>_import_errors``.
    """

    list_display = ('voter', 'date', 'type', 'description', 'volunteer')
    list_filter = ('voter__tenant', 'type', 'volunteer')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'description', 'volunteer__first_name', 'volunteer__last_name')
    autocomplete_fields = ['voter', 'volunteer']
    change_list_template = 'admin/interaction_change_list.html'

    def changelist_view(self, request, extra_context=None):
        """Render the changelist with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='interaction-download-errors'),
            path('import-interactions/', self.admin_site.admin_view(self.import_interactions), name='import-interactions'),
        ]
        return my_urls + urls

    def import_interactions(self, request):
        """Dispatch one request of the interaction CSV import.

        * POST containing ``_preview``: render the preview page.
        * POST containing ``_import``: import the rows and redirect.
        * Any other POST: validate the upload form and, when valid, render
          the column-mapping page.
        * GET, or a POST with an invalid form: render the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_interactions(request)
            if "_import" in request.POST:
                return self._commit_interactions(request)
            form = InteractionImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._map_interaction_fields(request, form)
        else:
            form = InteractionImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Interactions"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _interaction_mapping(self, request):
        """Return ``{model field: submitted CSV column}`` for every mappable field."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in INTERACTION_MAPPABLE_FIELDS
        }

    @staticmethod
    def _parse_interaction_date(date_str):
        """Parse ``MM/DD/YYYY`` or ``YYYY-MM-DD``; return ``None`` when neither fits."""
        if '/' in date_str:
            date_format = '%m/%d/%Y'
        elif '-' in date_str:
            date_format = '%Y-%m-%d'
        else:
            return None
        try:
            return datetime.strptime(date_str, date_format).date()
        except ValueError:
            return None

    def _preview_interactions(self, request):
        """Tally the stored CSV rows and render the preview page.

        A row counts as 'update' when its voter ID matches a voter of the
        tenant and as 'create' otherwise.
        """
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._interaction_mapping(request)
        try:
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = row.get(mapping.get('voter_id'))
                    volunteer_email = row.get(mapping.get('volunteer_email'))
                    exists = bool(voter_id) and Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    # Only the first ten rows are shown on the preview page.
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter ID: {voter_id}",
                            'details': f"Type: {row.get(mapping.get('type', '')) or ''}, Volunteer: {volunteer_email or ''}",
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _import_interaction_row(self, row, mapping, tenant):
        """Create the interaction described by one CSV row.

        Returns ``None`` on success, otherwise the error message to record
        against the row. Unexpected exceptions propagate to the caller.
        """
        voter_id = row.get(mapping.get('voter_id'))
        volunteer_email = row.get(mapping.get('volunteer_email'))
        date_str = row.get(mapping.get('date'))
        type_name = row.get(mapping.get('type'))
        if voter_id:
            voter_id = voter_id.strip()
        if not voter_id:
            return "Missing voter ID"
        if not date_str or not type_name:
            # Fixed: the check is on date and type, but the message used to
            # say "description".
            return "Missing date or type"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        volunteer = None
        if volunteer_email:
            try:
                volunteer = Volunteer.objects.get(tenant=tenant, email=volunteer_email)
            except Volunteer.DoesNotExist:
                pass  # Volunteer is optional
        parsed_date = self._parse_interaction_date(date_str)
        if parsed_date is None:
            return "Invalid date format"
        interaction_type, _ = InteractionType.objects.get_or_create(tenant=tenant, name=type_name)
        Interaction.objects.create(
            voter=voter,
            volunteer=volunteer,
            date=parsed_date,
            type=interaction_type,
            description=row.get(mapping.get('description')) or '',
            notes=row.get(mapping.get('notes')) or '',
        )
        return None

    def _commit_interactions(self, request):
        """Import every row of the stored CSV, report the outcome and redirect."""
        # NOTE(review): file_path is taken directly from the POST body and is
        # later opened and removed; consider validating that it points into
        # the temp directory.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._interaction_mapping(request)
        try:
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    try:
                        error_msg = self._import_interaction_row(row, mapping, tenant)
                    except Exception as e:
                        logger.error(f"Error importing: {e}")
                        error_msg = str(e)
                    if error_msg is None:
                        count += 1
                    else:
                        row["Import Error"] = error_msg
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} interactions.")
            # Cap the rows kept in the session to avoid overflowing it.
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:interaction-download-errors")
                # Fixed: error_url was computed but never used, so the message
                # named a download without linking to it.
                self.message_user(
                    request,
                    mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'),
                    level=messages.WARNING,
                )
            return redirect("../")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("../")

    def _map_interaction_fields(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        # delete=False: the file has to outlive this request so the preview
        # and import requests can reopen it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            headers = next(csv.reader(f))
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Interaction Fields",
            'headers': headers,
            'model_fields': INTERACTION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)
@admin.register(VoterLikelihood)
class VoterLikelihoodAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for voter likelihoods with a staged CSV import.

    ``import_likelihoods`` serves every stage of the import: the upload
    form, the column-mapping page, the preview page and the import itself.
    Rows that fail to import are stored in the session under
    ``<model_name>_import_errors``.
    """

    list_display = ('voter', 'election_type', 'likelihood')
    list_filter = ('voter__tenant', 'election_type', 'likelihood')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'election_type__name')
    change_list_template = 'admin/voterlikelihood_change_list.html'

    def changelist_view(self, request, extra_context=None):
        """Render the changelist with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voterlikelihood-download-errors'),
            path('import-likelihoods/', self.admin_site.admin_view(self.import_likelihoods), name='import-likelihoods'),
        ]
        return my_urls + urls

    def import_likelihoods(self, request):
        """Dispatch one request of the voter-likelihood CSV import.

        * POST containing ``_preview``: render the preview page.
        * POST containing ``_import``: import the rows and redirect.
        * Any other POST: validate the upload form and, when valid, render
          the column-mapping page.
        * GET, or a POST with an invalid form: render the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_likelihoods(request)
            if "_import" in request.POST:
                return self._commit_likelihoods(request)
            form = VoterLikelihoodImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._map_likelihood_fields(request, form)
        else:
            form = VoterLikelihoodImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Likelihoods"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _likelihood_mapping(self, request):
        """Return ``{model field: submitted CSV column}`` for every mappable field."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS
        }

    def _preview_likelihoods(self, request):
        """Tally the stored CSV rows and render the preview page.

        A row counts as 'update' when its voter ID matches a voter of the
        tenant and as 'create' otherwise.
        """
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._likelihood_mapping(request)
        try:
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = row.get(mapping.get('voter_id'))
                    exists = bool(voter_id) and Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    # Only the first ten rows are shown on the preview page.
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter ID: {voter_id}",
                            'details': f"Likelihood: {row.get(mapping.get('likelihood', '')) or ''}",
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _import_likelihood_row(self, row, mapping, tenant):
        """Create or update the likelihood described by one CSV row.

        Returns ``None`` on success, otherwise the error message to record
        against the row. Unexpected exceptions propagate to the caller.
        """
        voter_id = row.get(mapping.get('voter_id'))
        election_type_name = row.get(mapping.get('election_type'))
        likelihood_val = row.get(mapping.get('likelihood'))
        if voter_id:
            voter_id = voter_id.strip()
        if not voter_id:
            return "Missing voter ID"
        if not election_type_name or not likelihood_val:
            return "Missing election type or likelihood"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        election_type, _ = ElectionType.objects.get_or_create(tenant=tenant, name=election_type_name)
        VoterLikelihood.objects.update_or_create(
            voter=voter,
            election_type=election_type,
            defaults={'likelihood': likelihood_val},
        )
        return None

    def _commit_likelihoods(self, request):
        """Import every row of the stored CSV, report the outcome and redirect."""
        # NOTE(review): file_path is taken directly from the POST body and is
        # later opened and removed; consider validating that it points into
        # the temp directory.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._likelihood_mapping(request)
        try:
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    try:
                        error_msg = self._import_likelihood_row(row, mapping, tenant)
                    except Exception as e:
                        # Fixed: failures used to go to stdout through a
                        # leftover debug print; log them like the other
                        # importers do.
                        logger.error(f"Error importing: {e}")
                        error_msg = str(e)
                    if error_msg is None:
                        count += 1
                    else:
                        row["Import Error"] = error_msg
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Import complete: {count} likelihoods created/updated.")
            # Cap the rows kept in the session to avoid overflowing it.
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:voterlikelihood-download-errors")
                # Fixed: error_url was computed but never used, so the message
                # named a download without linking to it.
                self.message_user(
                    request,
                    mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'),
                    level=messages.WARNING,
                )
            return redirect("../")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("../")

    def _map_likelihood_fields(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        # delete=False: the file has to outlive this request so the preview
        # and import requests can reopen it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            headers = next(csv.reader(f))
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Likelihood Fields",
            'headers': headers,
            'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)
@admin.register(VotingRecord)
class VotingRecordAdmin(BaseImportAdminMixin, admin.ModelAdmin):
    """Admin for voting records with a staged CSV import.

    ``import_voting_records`` serves every stage of the import: the upload
    form, the column-mapping page, the preview page and the import itself.
    Rows that fail to import are stored in the session under
    ``<model_name>_import_errors``.
    """

    list_display = ('voter', 'election_date', 'election_description', 'primary_party')
    list_filter = ('voter__tenant', 'primary_party')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'election_description')
    change_list_template = 'admin/votingrecord_change_list.html'

    def changelist_view(self, request, extra_context=None):
        """Render the changelist with every tenant exposed as ``tenants``."""
        extra_context = extra_context or {}
        from core.models import Tenant
        extra_context['tenants'] = Tenant.objects.all()
        return super().changelist_view(request, extra_context=extra_context)

    def get_urls(self):
        """Prepend the import and error-download routes to the default admin URLs."""
        urls = super().get_urls()
        my_urls = [
            path('download-errors/', self.admin_site.admin_view(self.download_errors), name='votingrecord-download-errors'),
            path('import-voting-records/', self.admin_site.admin_view(self.import_voting_records), name='import-voting-records'),
        ]
        return my_urls + urls

    def import_voting_records(self, request):
        """Dispatch one request of the voting-record CSV import.

        * POST containing ``_preview``: render the create/update preview.
        * POST containing ``_import``: import the rows and redirect.
        * Any other POST: validate the upload form and, when valid, render
          the column-mapping page.
        * GET, or a POST with an invalid form: render the upload form.
        """
        if request.method == "POST":
            if "_preview" in request.POST:
                return self._preview_voting_records(request)
            if "_import" in request.POST:
                return self._commit_voting_records(request)
            form = VotingRecordImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._map_voting_record_fields(request, form)
        else:
            form = VotingRecordImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Voting Records"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _voting_record_mapping(self, request):
        """Return ``{model field: submitted CSV column}`` for every mappable field."""
        return {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in VOTING_RECORD_MAPPABLE_FIELDS
        }

    @staticmethod
    def _parse_election_date(date_str):
        """Parse ``MM/DD/YYYY`` or ``YYYY-MM-DD``; return ``None`` when neither fits."""
        if '/' in date_str:
            date_format = '%m/%d/%Y'
        elif '-' in date_str:
            date_format = '%Y-%m-%d'
        else:
            return None
        try:
            return datetime.strptime(date_str, date_format).date()
        except ValueError:
            return None

    def _preview_voting_records(self, request):
        """Count rows that would be created/updated and render the preview page.

        A row counts as 'update' when a voting record already exists for the
        same tenant, voter ID and parsed election date.
        """
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._voting_record_mapping(request)
        try:
            total_count = 0
            create_count = 0
            update_count = 0
            preview_data = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    total_count += 1
                    voter_id = row.get(mapping.get('voter_id'))
                    election_date = row.get(mapping.get('election_date'))
                    exists = False
                    if voter_id and election_date:
                        # An unparseable date simply leaves the row as 'create'.
                        dt = self._parse_election_date(election_date)
                        if dt:
                            exists = VotingRecord.objects.filter(
                                voter__tenant=tenant,
                                voter__voter_id=voter_id,
                                election_date=dt,
                            ).exists()
                    if exists:
                        update_count += 1
                        action = 'update'
                    else:
                        create_count += 1
                        action = 'create'
                    # Only the first ten rows are shown on the preview page.
                    if len(preview_data) < 10:
                        preview_data.append({
                            'action': action,
                            'identifier': f"Voter ID: {voter_id} (Election: {election_date})",
                            'details': f"Party: {row.get(mapping.get('primary_party', '')) or ''}",
                        })
            context = self.admin_site.each_context(request)
            context.update({
                'title': "Import Preview",
                'total_count': total_count,
                'create_count': create_count,
                'update_count': update_count,
                'preview_data': preview_data,
                'mapping': mapping,
                'file_path': file_path,
                'tenant_id': tenant_id,
                'action_url': request.path,
                'opts': self.model._meta,
            })
            return render(request, "admin/import_preview.html", context)
        except Exception as e:
            self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
            return redirect("../")

    def _import_voting_record_row(self, row, mapping, tenant):
        """Create or update the voting record described by one CSV row.

        Returns ``None`` on success, otherwise the error message to record
        against the row. Unexpected exceptions propagate to the caller.
        """
        voter_id = row.get(mapping.get('voter_id'))
        election_date_str = row.get(mapping.get('election_date'))
        election_description = row.get(mapping.get('election_description'))
        primary_party = row.get(mapping.get('primary_party'))
        if voter_id:
            voter_id = voter_id.strip()
        if not voter_id:
            return "Missing voter ID"
        if not election_date_str or not election_description:
            return "Missing election date or description"
        try:
            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
        except Voter.DoesNotExist:
            return f"Voter {voter_id} not found"
        parsed_election_date = self._parse_election_date(election_date_str)
        if parsed_election_date is None:
            return "Invalid date format"
        VotingRecord.objects.update_or_create(
            voter=voter,
            election_date=parsed_election_date,
            defaults={
                'election_description': election_description,
                'primary_party': primary_party or '',
            },
        )
        return None

    def _commit_voting_records(self, request):
        """Import every row of the stored CSV, report the outcome and redirect."""
        # NOTE(review): file_path is taken directly from the POST body and is
        # later opened and removed; consider validating that it points into
        # the temp directory.
        file_path = request.POST.get('file_path')
        tenant_id = request.POST.get('tenant')
        tenant = Tenant.objects.get(id=tenant_id)
        mapping = self._voting_record_mapping(request)
        try:
            count = 0
            errors = 0
            failed_rows = []
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    try:
                        error_msg = self._import_voting_record_row(row, mapping, tenant)
                    except Exception as e:
                        logger.error(f"Error importing: {e}")
                        error_msg = str(e)
                    if error_msg is None:
                        count += 1
                    else:
                        row["Import Error"] = error_msg
                        failed_rows.append(row)
                        errors += 1
            if os.path.exists(file_path):
                os.remove(file_path)
            self.message_user(request, f"Successfully imported {count} voting records.")
            # Cap the rows kept in the session to avoid overflowing it.
            request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
            request.session.modified = True
            if errors > 0:
                error_url = reverse("admin:votingrecord-download-errors")
                # Fixed: error_url was computed but never used, so the message
                # named a download without linking to it.
                self.message_user(
                    request,
                    mark_safe(f'Failed to import {errors} rows. <a href="{error_url}">Download failed records</a>'),
                    level=messages.WARNING,
                )
            return redirect("../")
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
            return redirect("../")

    def _map_voting_record_fields(self, request, form):
        """Save the uploaded CSV to a temp file and render the column-mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("../")
        # delete=False: the file has to outlive this request so the preview
        # and import requests can reopen it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            headers = next(csv.reader(f))
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Voting Record Fields",
            'headers': headers,
            'model_fields': VOTING_RECORD_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)