from decimal import Decimal from datetime import datetime, date from django.db import transaction from django.http import HttpResponse from django.utils.safestring import mark_safe import csv import io import logging import tempfile import os from django.contrib import admin, messages from django.urls import path, reverse from django.shortcuts import render, redirect from django.template.response import TemplateResponse from .models import ( format_phone_number, Tenant, TenantUserRole, InteractionType, DonationMethod, ElectionType, EventType, Voter, VotingRecord, Event, EventParticipation, Donation, Interaction, VoterLikelihood, CampaignSettings, Interest, Volunteer, VolunteerEvent, ParticipationStatus, VolunteerRole ) from .forms import ( VoterImportForm, EventImportForm, EventParticipationImportForm, DonationImportForm, InteractionImportForm, VoterLikelihoodImportForm, VolunteerImportForm, VotingRecordImportForm ) logger = logging.getLogger(__name__) VOTER_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('first_name', 'First Name'), ('last_name', 'Last Name'), ('nickname', 'Nickname'), ('birthdate', 'Birthdate'), ('address_street', 'Street Address'), ('city', 'City'), ('state', 'State'), ('prior_state', 'Prior State'), ('zip_code', 'Zip Code'), ('county', 'County'), ('neighborhood', 'Neighborhood'), ('phone', 'Phone'), ('notes', 'Notes'), ('phone_type', 'Phone Type'), ('email', 'Email'), ('district', 'District'), ('precinct', 'Precinct'), ('registration_date', 'Registration Date'), ('is_targeted', 'Is Targeted'), ('candidate_support', 'Candidate Support'), ('yard_sign', 'Yard Sign'), ('window_sticker', 'Window Sticker'), ('latitude', 'Latitude'), ('longitude', 'Longitude'), ('secondary_phone', 'Secondary Phone'), ('secondary_phone_type', 'Secondary Phone Type'), ('door_visit', 'Door Visit'), ] EVENT_MAPPABLE_FIELDS = [ ('name', 'Name'), ('date', 'Date'), ('start_time', 'Start Time'), ('end_time', 'End Time'), ('event_type', 'Event Type (Name)'), ('description', 'Description'), ('location_name', 'Location Name'), ('address', 'Address'), ('city', 'City'), ('state', 'State'), ('zip_code', 'Zip Code'), ('latitude', 'Latitude'), ('longitude', 'Longitude'), ] EVENT_PARTICIPATION_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('first_name', 'First Name'), ('last_name', 'Last Name'), ('event_name', 'Event Name'), ('participation_status', 'Participation Status'), ] DONATION_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('date', 'Date'), ('amount', 'Amount'), ('method', 'Donation Method (Name)'), ] INTERACTION_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('volunteer_email', 'Volunteer Email'), ('date', 'Date'), ('type', 'Interaction Type (Name)'), ('description', 'Description'), ('notes', 'Notes'), ] VOLUNTEER_MAPPABLE_FIELDS = [ ('first_name', 'First Name'), ('last_name', 'Last Name'), ('email', 'Email'), ('phone', 'Phone'), ('notes', 'Notes'), ] VOTER_LIKELIHOOD_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('election_type', 'Election Type (Name)'), ('likelihood', 'Likelihood'), ] VOTING_RECORD_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('election_date', 'Election Date'), ('election_description', 'Election Description'), ('primary_party', 'Primary Party'), ] class BaseImportAdminMixin: def download_errors(self, request): logger.info(f"download_errors called for {self.model._meta.model_name}") session_key = f"{self.model._meta.model_name}_import_errors" failed_rows = request.session.get(session_key, []) if not failed_rows: self.message_user(request, "No error log found in session.", level=messages.WARNING) return redirect("../") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f"attachment; filename={self.model._meta.model_name}_import_errors.csv" if failed_rows: all_keys = set() for r in failed_rows: all_keys.update(r.keys()) writer = csv.DictWriter(response, fieldnames=sorted(list(all_keys))) writer.writeheader() writer.writerows(failed_rows) return response def chunk_reader(self, reader, size): chunk = [] for row in reader: chunk.append(row) if len(chunk) == size: yield chunk chunk = [] if chunk: yield chunk class TenantUserRoleInline(admin.TabularInline): model = TenantUserRole extra = 1 class CampaignSettingsInline(admin.StackedInline): model = CampaignSettings can_delete = False @admin.register(Tenant) class TenantAdmin(admin.ModelAdmin): list_display = ('name', 'created_at') search_fields = ('name',) inlines = [TenantUserRoleInline, CampaignSettingsInline] @admin.register(TenantUserRole) class TenantUserRoleAdmin(admin.ModelAdmin): list_display = ('user', 'tenant', 'role') list_filter = ('tenant', 'role') search_fields = ('user__username', 'tenant__name') @admin.register(InteractionType) class InteractionTypeAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) @admin.register(DonationMethod) class DonationMethodAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) @admin.register(VolunteerRole) class VolunteerRoleAdmin(admin.ModelAdmin): list_display = ("name", "tenant", "is_active") list_filter = ("tenant", "is_active") search_fields = ("name",) @admin.register(ElectionType) class ElectionTypeAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) @admin.register(EventType) class EventTypeAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active', 'default_volunteer_role') list_filter = ('tenant', 'is_active') search_fields = ('name',) filter_horizontal = ('available_roles',) @admin.register(ParticipationStatus) class ParticipationStatusAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) change_list_template = 'admin/participationstatus_change_list.html' def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) @admin.register(Interest) class InterestAdmin(admin.ModelAdmin): list_display = ('name', 'tenant') list_filter = ('tenant',) fields = ('tenant', 'name') search_fields = ('name',) class VotingRecordInline(admin.TabularInline): model = VotingRecord extra = 1 class DonationInline(admin.TabularInline): model = Donation extra = 1 class InteractionInline(admin.TabularInline): model = Interaction extra = 1 autocomplete_fields = ['voter', 'type', 'volunteer'] class VoterLikelihoodInline(admin.TabularInline): model = VoterLikelihood extra = 1 class VolunteerEventInline(admin.TabularInline): model = VolunteerEvent extra = 1 @admin.register(Voter) class VoterAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('first_name', 'last_name', 'nickname', 'voter_id', 'tenant', 'district', 'candidate_support', 'is_targeted', 'city', 'state', 'prior_state') list_filter = ('tenant', 'candidate_support', 'is_targeted', 'phone_type', 'yard_sign', 'district', 'city', 'state', 'prior_state') search_fields = ('first_name', 'last_name', 'nickname', 'voter_id', 'address', 'city', 'state', 'prior_state', 'zip_code', 'county') inlines = [VotingRecordInline, DonationInline, InteractionInline, VoterLikelihoodInline] readonly_fields = ('address',) change_list_template = "admin/voter_change_list.html" def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context["tenants"] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voter-download-errors'), path('import-voters/', self.admin_site.admin_view(self.import_voters), name='import-voters'), ] return my_urls + urls def import_voters(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get("file_path") tenant_id = request.POST.get("tenant") tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f"map_{field_name}") try: with open(file_path, "r", encoding="utf-8-sig") as f: # Optimization: Skip full count for very large files in preview if needed, # but here we'll keep it for accuracy unless it's a known bottleneck. # For now, let's just do a fast line count. total_count = sum(1 for line in f) - 1 f.seek(0) reader = csv.DictReader(f) preview_rows = [] voter_ids_for_preview = [] for i, row in enumerate(reader): if i < 10: preview_rows.append(row) v_id = row.get(mapping.get("voter_id")) if v_id: voter_ids_for_preview.append(v_id.strip()) else: break existing_preview_ids = set(Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids_for_preview).values_list("voter_id", flat=True)) create_count = 0 update_count = 0 for row in preview_rows: voter_id_val = row.get(mapping.get("voter_id")) if voter_id_val and voter_id_val.strip() in existing_preview_ids: update_count += 1 else: create_count += 1 context = self.admin_site.each_context(request) context.update({ "title": "Import Preview", "total_count": total_count, "create_count": create_count, "update_count": update_count, "preview_data": preview_rows, # This should be improved to show actual changes "mapping": mapping, "file_path": file_path, "tenant_id": tenant_id, "action_url": request.path, "opts": self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get("file_path") tenant_id = request.POST.get("tenant") tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f"map_{field_name}") try: created_count = 0 updated_count = 0 skipped_no_change = 0 skipped_no_id = 0 errors = 0 failed_rows = [] total_processed = 0 # Temporary storage for error rows to avoid holding large file in memory temp_error_file = None temp_error_file_path = None # Process in chunks to reduce memory usage for very large files with open(file_path, "r", encoding="utf-8-sig") as f_read: reader = csv.DictReader(f_read) for i, row in enumerate(reader): total_processed += 1 try: raw_voter_id = row.get(mapping.get("voter_id")) voter_id = raw_voter_id.strip() if raw_voter_id else None if not voter_id: # Enhanced error message to guide the user mapped_column_name = mapping.get("voter_id", "N/A") error_detail = f"Raw value: '{raw_voter_id}'. " if raw_voter_id is not None else "Value was None." row["Import Error"] = f"Voter ID is required. Please check if the '{mapped_column_name}' column is correctly mapped and contains values for all rows. {error_detail}" failed_rows.append(row) skipped_no_id += 1 errors += 1 continue defaults = {} # Map other fields dynamically for field_name, _ in VOTER_MAPPABLE_FIELDS: csv_column = mapping.get(field_name) if csv_column and csv_column in row: field_value = row[csv_column].strip() if field_name == "birthdate" or field_name == "registration_date": # Handle date conversions if field_value: try: # Attempt to parse common date formats if '/' in field_value: # Try MM/DD/YYYY or DD/MM/YYYY if len(field_value.split('/')[2]) == 2: # YY format dt = datetime.strptime(field_value, '%m/%d/%y').date() if len(field_value.split('/')[0]) < 3 else datetime.strptime(field_value, '%d/%m/%y').date() # noqa else: dt = datetime.strptime(field_value, '%m/%d/%Y').date() if len(field_value.split('/')[0]) < 3 else datetime.strptime(field_value, '%d/%m/%Y').date() # noqa elif '-' in field_value: # Try YYYY-MM-DD or DD-MM-YYYY or MM-DD-YYYY if len(field_value.split('-')[0]) == 4: # YYYY format dt = datetime.strptime(field_value, '%Y-%m-%d').date() elif len(field_value.split('-')[2]) == 4: # YYYY format dt = datetime.strptime(field_value, '%m-%d-%Y').date() if len(field_value.split('-')[0]) < 3 else datetime.strptime(field_value, '%d-%m-%Y').date() # noqa else: # Default to MM-DD-YY dt = datetime.strptime(field_value, '%m-%d-%y').date() else: dt = None if dt: defaults[field_name] = dt else: logger.warning(f"Could not parse date '{field_value}' for field {field_name}. Skipping.") except ValueError as ve: logger.warning(f"Date parsing error for '{field_value}' in field {field_name}: {ve}") except Exception as ex: logger.error(f"Unexpected error parsing date '{field_value}' for field {field_name}: {ex}") elif field_name == "is_targeted" or field_name == "yard_sign" or field_name == "window_sticker" or field_name == "door_visit": # Handle boolean fields if field_value.lower() == 'true' or field_value == '1': defaults[field_name] = True elif field_value.lower() == 'false' or field_value == '0': defaults[field_name] = False else: defaults[field_name] = None # Or sensible default/error elif field_name == "phone": defaults[field_name] = format_phone_number(field_value) elif field_name == "email": defaults[field_name] = field_value.lower() # Store emails as lowercase elif field_name == "candidate_support": if field_value in [choice[0] for choice in Voter.CANDIDATE_SUPPORT_CHOICES]: defaults[field_name] = field_value else: logger.warning(f"Invalid candidate_support value: {field_value}. Skipping.") elif field_name == "phone_type": if field_value in [choice[0] for choice in Voter.PHONE_TYPE_CHOICES]: defaults[field_name] = field_value else: logger.warning(f"Invalid phone_type value: {field_value}. Skipping.") elif field_name == "secondary_phone_type": if field_value in [choice[0] for choice in Voter.PHONE_TYPE_CHOICES]: defaults[field_name] = field_value else: logger.warning(f"Invalid secondary_phone_type value: {field_value}. Skipping.") elif field_name == "state" or field_name == "prior_state": # Ensure state is uppercase and valid length if field_value and len(field_value) <= 2: defaults[field_name] = field_value.upper() else: logger.warning(f"Invalid state value: {field_value}. Skipping.") else: defaults[field_name] = field_value # Try to get voter. If not found, create new. Update if found. voter, created = Voter.objects.update_or_create( tenant=tenant, voter_id=voter_id, defaults=defaults ) if created: created_count += 1 else: updated_count += 1 # Special handling for interests - assuming a comma-separated list in CSV if 'interests' in mapping and row.get(mapping['interests']): interest_names = [name.strip() for name in row[mapping['interests']].split(',') if name.strip()] for interest_name in interest_names: interest, _ = Interest.objects.get_or_create(tenant=tenant, name=interest_name) voter.interests.add(interest) if (i + 1) % 100 == 0: print(f"DEBUG: Voter import progress: {total_processed} processed. {created_count} created. {updated_count} updated.") except Exception as e: row["Import Error"] = str(e) failed_rows.append(row) errors += 1 logger.error(f"Error importing row: {row}. Error: {e}") # Clean up the temporary file if os.path.exists(file_path): os.remove(file_path) if temp_error_file_path and os.path.exists(temp_error_file_path): os.remove(temp_error_file_path) self.message_user(request, f"Import complete: {created_count + updated_count} voters created/updated. ({created_count} new, {updated_count} updated, {skipped_no_change} skipped with no changes, {skipped_no_id} skipped missing ID, {errors} errors)") # Store failed rows in session for download, limit to avoid session overflow request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:voter-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = VoterImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ "title": "Map Voter Fields", "headers": headers, "model_fields": VOTER_MAPPABLE_FIELDS, "tenant_id": tenant.id, "file_path": file_path, "action_url": request.path, "opts": self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = VoterImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Voters" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Event) class EventAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('name', 'date', 'event_type', 'tenant', 'location_name', 'address', 'city', 'state', 'zip_code') list_filter = ('tenant', 'event_type') search_fields = ('name', 'location_name', 'address', 'city', 'state', 'zip_code') inlines = [VolunteerEventInline] change_list_template = "admin/event_change_list.html" def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='event-download-errors'), path('import-events/', self.admin_site.admin_view(self.import_events), name='import-events'), ] return my_urls + urls def import_events(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 event_name = row.get(mapping.get('name')) event_date = row.get(mapping.get('date')) exists = False if event_name and event_date: try: # Assuming name and date uniquely identify an event # This might need refinement based on actual data uniqueness requirements if '/' in event_date: dt = datetime.strptime(event_date, '%m/%d/%Y').date() elif '-' in event_date: dt = datetime.strptime(event_date, '%Y-%m-%d').date() else: dt = None if dt: exists = Event.objects.filter(tenant=tenant, name=event_name, date=dt).exists() except ValueError: # Handle cases where date parsing fails pass if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Event: {event_name} (Date: {event_date})", 'details': f"Location: {row.get(mapping.get('location_name', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: count = 0 errors = 0 failed_rows = [] with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: event_name = row.get(mapping.get('name')) event_date = row.get(mapping.get('date')) event_type_name = row.get(mapping.get('event_type')) if not event_name or not event_date or not event_type_name: row["Import Error"] = "Missing event name, date, or type" failed_rows.append(row) errors += 1 continue # Date parsing for event_date try: if '/' in event_date: parsed_date = datetime.strptime(event_date, '%m/%d/%Y').date() elif '-' in event_date: parsed_date = datetime.strptime(event_date, '%Y-%m-%d').date() else: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue except ValueError: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue event_type_obj, _ = EventType.objects.get_or_create(tenant=tenant, name=event_type_name) defaults = { 'date': parsed_date, 'event_type': event_type_obj, 'description': row.get(mapping.get('description')) or '', 'location_name': row.get(mapping.get('location_name')) or '', 'address': row.get(mapping.get('address')) or '', 'city': row.get(mapping.get('city')) or '', 'state': row.get(mapping.get('state')) or '', 'zip_code': row.get(mapping.get('zip_code')) or '', 'latitude': row.get(mapping.get('latitude')) or None, 'longitude': row.get(mapping.get('longitude')) or None, } # Handle start_time and end_time start_time_str = row.get(mapping.get('start_time')) if start_time_str: try: defaults['start_time'] = datetime.strptime(start_time_str, '%H:%M').time() except ValueError: logger.warning(f"Invalid start_time format: {start_time_str}. Skipping.") end_time_str = row.get(mapping.get('end_time')) if end_time_str: try: defaults['end_time'] = datetime.strptime(end_time_str, '%H:%M').time() except ValueError: logger.warning(f"Invalid end_time format: {end_time_str}. Skipping.") Event.objects.update_or_create( tenant=tenant, name=event_name, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} events.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:event-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = EventImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Event Fields", 'headers': headers, 'model_fields': EVENT_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = EventImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Events" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Volunteer) class VolunteerAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('first_name', 'last_name', 'email', 'phone', 'tenant') list_filter = ('tenant',) search_fields = ('first_name', 'last_name', 'email', 'phone') change_list_template = "admin/volunteer_change_list.html" def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='volunteer-download-errors'), path('import-volunteers/', self.admin_site.admin_view(self.import_volunteers), name='import-volunteers'), ] return my_urls + urls def import_volunteers(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOLUNTEER_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 email = row.get(mapping.get('email')) exists = False if email: exists = Volunteer.objects.filter(tenant=tenant, email=email).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Volunteer: {email}", 'details': f"Name: {row.get(mapping.get('first_name', '')) or ''} {row.get(mapping.get('last_name', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOLUNTEER_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: count = 0 errors = 0 failed_rows = [] with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: email = row.get(mapping.get('email')) if not email: row["Import Error"] = "Missing email" failed_rows.append(row) errors += 1 continue defaults = { 'first_name': row.get(mapping.get('first_name')) or '', 'last_name': row.get(mapping.get('last_name')) or '', 'phone': format_phone_number(row.get(mapping.get('phone')) or ''), 'notes': row.get(mapping.get('notes')) or '', } Volunteer.objects.update_or_create( tenant=tenant, email=email, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing volunteer: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} volunteers.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:volunteer-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = VolunteerImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Volunteer Fields", 'headers': headers, 'model_fields': VOLUNTEER_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = VolunteerImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Volunteers" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(EventParticipation) class EventParticipationAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('event', 'voter', 'participation_status') list_filter = ('event', 'participation_status', 'voter__tenant') search_fields = ('event__name', 'voter__first_name', 'voter__last_name', 'voter__voter_id') change_list_template = 'admin/eventparticipation_change_list.html' def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_list(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='eventparticipation-download-errors'), path('import-event-participations/', self.admin_site.admin_view(self.import_event_participations), name='import-event-participations'), ] return my_urls + urls def import_event_participations(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) # Extract first_name and last_name from CSV based on mapping csv_first_name = row.get(mapping.get('first_name'), '') csv_last_name = row.get(mapping.get('last_name'), '') csv_full_name = f"{csv_first_name} {csv_last_name}".strip() exists = False voter_full_name = "N/A" # Initialize voter_full_name if voter_id: try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) voter_full_name = f"{voter.first_name} {voter.last_name}" # Get voter's full name if event_name: exists = EventParticipation.objects.filter(voter=voter, event__name=event_name).exists() except Voter.DoesNotExist: pass if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'csv_full_name': csv_full_name, # Add CSV name 'identifier': f"Voter: {voter_full_name} (ID: {voter_id})" if voter_id else "N/A", # Include full name 'details': f"Participation: {row.get(mapping.get('participation_status', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: voter_id = row.get(mapping.get('voter_id')) if mapping.get('voter_id') else None participation_status_val = row.get(mapping.get('participation_status')) if mapping.get('participation_status') else None if voter_id: # Only strip if voter_id is not None voter_id = voter_id.strip() if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: error_msg = f"Voter with ID {voter_id} not found" logger.error(error_msg) row["Import Error"] = error_msg failed_rows.append(row) errors += 1 continue event = None event_name = row.get(mapping.get('event_name')) if mapping.get('event_name') else None if event_name: try: event = Event.objects.get(tenant=tenant, name=event_name) except Event.DoesNotExist: pass if not event: error_msg = "Event not found (check Event Name)" logger.error(error_msg) row["Import Error"] = error_msg failed_rows.append(row) errors += 1 continue defaults = {} if participation_status_val and participation_status_val.strip(): status_obj, _ = ParticipationStatus.objects.get_or_create(tenant=tenant, name=participation_status_val.strip()) defaults['participation_status'] = status_obj else: # Default to 'Invited' if not specified status_obj, _ = ParticipationStatus.objects.get_or_create(tenant=tenant, name='Invited') defaults['participation_status'] = status_obj EventParticipation.objects.update_or_create( event=event, voter=voter, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} participations.") # Optimization: Limit error log size in session to avoid overflow request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:eventparticipation-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = EventParticipationImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Participation Fields", 'headers': headers, 'model_fields': EVENT_PARTICIPATION_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = EventParticipationImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Participations" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Donation) class DonationAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('voter', 'date', 'amount', 'method') list_filter = ('voter__tenant', 'method') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'method__name') change_list_template = 'admin/donation_change_list.html' def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='donation-download-errors'), path('import-donations/', self.admin_site.admin_view(self.import_donations), name='import-donations'), ] return my_urls + urls def import_donations(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in DONATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) exists = False if voter_id: exists = Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter ID: {voter_id}", 'details': f"Amount: {row.get(mapping.get('amount', '')) or ''}, Method: {row.get(mapping.get('method', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in DONATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: count = 0 errors = 0 failed_rows = [] with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: voter_id = row.get(mapping.get('voter_id')) date_str = row.get(mapping.get('date')) amount_str = row.get(mapping.get('amount')) method_name = row.get(mapping.get('method')) if voter_id: # Only strip if voter_id is not None voter_id = voter_id.strip() if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue if not date_str or not amount_str: row["Import Error"] = "Missing date or amount" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue try: if '/' in date_str: parsed_date = datetime.strptime(date_str, '%m/%d/%Y').date() elif '-' in date_str: parsed_date = datetime.strptime(date_str, '%Y-%m-%d').date() else: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue except ValueError: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue try: amount = Decimal(amount_str) except InvalidOperation: row["Import Error"] = "Invalid amount format" failed_rows.append(row) errors += 1 continue donation_method, _ = DonationMethod.objects.get_or_create(tenant=tenant, name=method_name) Donation.objects.create( voter=voter, date=parsed_date, amount=amount, method=donation_method ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} donations.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:donation-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = DonationImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Donation Fields", 'headers': headers, 'model_fields': DONATION_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = DonationImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Donations" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Interaction) class InteractionAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('voter', 'date', 'type', 'description', 'volunteer') list_filter = ('voter__tenant', 'type', 'volunteer') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'description', 'volunteer__first_name', 'volunteer__last_name') autocomplete_fields = ['voter', 'volunteer'] change_list_template = 'admin/interaction_change_list.html' def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='interaction-download-errors'), path('import-interactions/', self.admin_site.admin_view(self.import_interactions), name='import-interactions'), ] return my_urls + urls def import_interactions(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in INTERACTION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) volunteer_email = row.get(mapping.get('volunteer_email')) exists = False if voter_id: exists = Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter ID: {voter_id}", 'details': f"Type: {row.get(mapping.get('type', '')) or ''}, Volunteer: {volunteer_email or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in INTERACTION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: count = 0 errors = 0 failed_rows = [] with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: voter_id = row.get(mapping.get('voter_id')) volunteer_email = row.get(mapping.get('volunteer_email')) date_str = row.get(mapping.get('date')) type_name = row.get(mapping.get('type')) if voter_id: # Only strip if voter_id is not None voter_id = voter_id.strip() if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue if not date_str or not type_name: row["Import Error"] = "Missing date or description" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue volunteer = None if volunteer_email: try: volunteer = Volunteer.objects.get(tenant=tenant, email=volunteer_email) except Volunteer.DoesNotExist: pass # Volunteer is optional try: if '/' in date_str: parsed_date = datetime.strptime(date_str, '%m/%d/%Y').date() elif '-' in date_str: parsed_date = datetime.strptime(date_str, '%Y-%m-%d').date() else: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue except ValueError: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue interaction_type, _ = InteractionType.objects.get_or_create(tenant=tenant, name=type_name) Interaction.objects.create( voter=voter, volunteer=volunteer, date=parsed_date, type=interaction_type, description=row.get(mapping.get('description')) or '', notes=row.get(mapping.get('notes')) or '' ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} interactions.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:interaction-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = InteractionImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Interaction Fields", 'headers': headers, 'model_fields': INTERACTION_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = InteractionImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Interactions" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(VoterLikelihood) class VoterLikelihoodAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('voter', 'election_type', 'likelihood') list_filter = ('voter__tenant', 'election_type', 'likelihood') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'election_type__name') change_list_template = 'admin/voterlikelihood_change_list.html' def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voterlikelihood-download-errors'), path('import-likelihoods/', self.admin_site.admin_view(self.import_likelihoods), name='import-likelihoods'), ] return my_urls + urls def import_likelihoods(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) exists = False if voter_id: exists = Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter ID: {voter_id}", 'details': f"Likelihood: {row.get(mapping.get('likelihood', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: count = 0 errors = 0 failed_rows = [] with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: voter_id = row.get(mapping.get('voter_id')) election_type_name = row.get(mapping.get('election_type')) likelihood_val = row.get(mapping.get('likelihood')) if voter_id: # Only strip if voter_id is not None voter_id = voter_id.strip() if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue if not election_type_name or not likelihood_val: row["Import Error"] = "Missing election type or likelihood" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue election_type, _ = ElectionType.objects.get_or_create(tenant=tenant, name=election_type_name) VoterLikelihood.objects.update_or_create( voter=voter, election_type=election_type, defaults={'likelihood': likelihood_val} ) count += 1 except Exception as e: print(f"DEBUG: Likelihood import failed: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Import complete: {count} likelihoods created/updated.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:voterlikelihood-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = VoterLikelihoodImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Likelihood Fields", 'headers': headers, 'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = VoterLikelihoodImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Likelihoods" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(VotingRecord) class VotingRecordAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('voter', 'election_date', 'election_description', 'primary_party') list_filter = ('voter__tenant', 'primary_party') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'election_description') change_list_template = 'admin/votingrecord_change_list.html' def changelist_view(self, request, extra_context=None): extra_context = extra_context or {} from core.models import Tenant extra_context['tenants'] = Tenant.objects.all() return super().changelist_view(request, extra_context=extra_context) def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='votingrecord-download-errors'), path('import-voting-records/', self.admin_site.admin_view(self.import_voting_records), name='import-voting-records'), ] return my_urls + urls def import_voting_records(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTING_RECORD_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) election_date = row.get(mapping.get('election_date')) exists = False if voter_id and election_date: try: # Assuming voter_id and election_date uniquely identify a voting record # This might need refinement based on actual data uniqueness requirements if '/' in election_date: dt = datetime.strptime(election_date, '%m/%d/%Y').date() elif '-' in election_date: dt = datetime.strptime(election_date, '%Y-%m-%d').date() else: dt = None if dt: exists = VotingRecord.objects.filter(voter__tenant=tenant, voter__voter_id=voter_id, election_date=dt).exists() except ValueError: # Handle cases where date parsing fails pass if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter ID: {voter_id} (Election: {election_date})", 'details': f"Party: {row.get(mapping.get('primary_party', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("../") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTING_RECORD_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: count = 0 errors = 0 failed_rows = [] with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: voter_id = row.get(mapping.get('voter_id')) election_date_str = row.get(mapping.get('election_date')) election_description = row.get(mapping.get('election_description')) primary_party = row.get(mapping.get('primary_party')) if voter_id: # Only strip if voter_id is not None voter_id = voter_id.strip() if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue if not election_date_str or not election_description: row["Import Error"] = "Missing election date or description" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue try: if '/' in election_date_str: parsed_election_date = datetime.strptime(election_date_str, '%m/%d/%Y').date() elif '-' in election_date_str: parsed_election_date = datetime.strptime(election_date_str, '%Y-%m-%d').date() else: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue except ValueError: row["Import Error"] = "Invalid date format" failed_rows.append(row) errors += 1 continue VotingRecord.objects.update_or_create( voter=voter, election_date=parsed_election_date, defaults={ 'election_description': election_description, 'primary_party': primary_party or '' } ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} voting records.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000] request.session.modified = True if errors > 0: error_url = reverse("admin:votingrecord-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("../") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("../") else: form = VotingRecordImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("../") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Voting Record Fields", 'headers': headers, 'model_fields': VOTING_RECORD_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = VotingRecordImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Voting Records" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context)