from django.http import HttpResponse from django.utils.safestring import mark_safe import csv import io import logging import tempfile import os from django.contrib import admin, messages from django.urls import path, reverse from django.shortcuts import render, redirect from django.template.response import TemplateResponse from .models import ( Tenant, TenantUserRole, InteractionType, DonationMethod, ElectionType, EventType, Voter, VotingRecord, Event, EventParticipation, Donation, Interaction, VoterLikelihood, CampaignSettings ) from .forms import ( VoterImportForm, EventImportForm, EventParticipationImportForm, DonationImportForm, InteractionImportForm, VoterLikelihoodImportForm ) logger = logging.getLogger(__name__) VOTER_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('first_name', 'First Name'), ('last_name', 'Last Name'), ('nickname', 'Nickname'), ('birthdate', 'Birthdate'), ('address_street', 'Street Address'), ('city', 'City'), ('state', 'State'), ('prior_state', 'Prior State'), ('zip_code', 'Zip Code'), ('county', 'County'), ('phone', 'Phone'), ('email', 'Email'), ('district', 'District'), ('precinct', 'Precinct'), ('registration_date', 'Registration Date'), ('is_targeted', 'Is Targeted'), ('candidate_support', 'Candidate Support'), ('yard_sign', 'Yard Sign'), ('window_sticker', 'Window Sticker'), ('latitude', 'Latitude'), ('longitude', 'Longitude'), ] EVENT_MAPPABLE_FIELDS = [ ('date', 'Date'), ('event_type', 'Event Type (Name)'), ('description', 'Description'), ] EVENT_PARTICIPATION_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('event_id', 'Event ID'), ('event_date', 'Event Date'), ('event_type', 'Event Type (Name)'), ('participation_type', 'Participation Type'), ] DONATION_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('date', 'Date'), ('amount', 'Amount'), ('method', 'Donation Method (Name)'), ] INTERACTION_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('date', 'Date'), ('type', 'Interaction Type (Name)'), ('description', 'Description'), ('notes', 'Notes'), ] VOTER_LIKELIHOOD_MAPPABLE_FIELDS = [ ('voter_id', 'Voter ID'), ('election_type', 'Election Type (Name)'), ('likelihood', 'Likelihood'), ] class BaseImportAdminMixin: def download_errors(self, request): logger.info(f"download_errors called for {self.model._meta.model_name}") session_key = f"{self.model._meta.model_name}_import_errors" failed_rows = request.session.get(session_key, []) if not failed_rows: self.message_user(request, "No error log found in session.", level=messages.WARNING) return redirect("..") response = HttpResponse(content_type="text/csv") response["Content-Disposition"] = f"attachment; filename={self.model._meta.model_name}_import_errors.csv" if failed_rows: all_keys = set() for r in failed_rows: all_keys.update(r.keys()) writer = csv.DictWriter(response, fieldnames=sorted(list(all_keys))) writer.writeheader() writer.writerows(failed_rows) return response class TenantUserRoleInline(admin.TabularInline): model = TenantUserRole extra = 1 class CampaignSettingsInline(admin.StackedInline): model = CampaignSettings can_delete = False @admin.register(Tenant) class TenantAdmin(admin.ModelAdmin): list_display = ('name', 'created_at') search_fields = ('name',) inlines = [TenantUserRoleInline, CampaignSettingsInline] @admin.register(TenantUserRole) class TenantUserRoleAdmin(admin.ModelAdmin): list_display = ('user', 'tenant', 'role') list_filter = ('tenant', 'role') search_fields = ('user__username', 'tenant__name') @admin.register(InteractionType) class InteractionTypeAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) @admin.register(DonationMethod) class DonationMethodAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) @admin.register(ElectionType) class ElectionTypeAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) @admin.register(EventType) class EventTypeAdmin(admin.ModelAdmin): list_display = ('name', 'tenant', 'is_active') list_filter = ('tenant', 'is_active') search_fields = ('name',) class VotingRecordInline(admin.TabularInline): model = VotingRecord extra = 1 class DonationInline(admin.TabularInline): model = Donation extra = 1 class InteractionInline(admin.TabularInline): model = Interaction extra = 1 class VoterLikelihoodInline(admin.TabularInline): model = VoterLikelihood extra = 1 @admin.register(Voter) class VoterAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('first_name', 'last_name', 'nickname', 'voter_id', 'tenant', 'district', 'candidate_support', 'is_targeted', 'city', 'state', 'prior_state') list_filter = ('tenant', 'candidate_support', 'is_targeted', 'yard_sign', 'district', 'city', 'state', 'prior_state') search_fields = ('first_name', 'last_name', 'nickname', 'voter_id', 'address', 'city', 'state', 'prior_state', 'zip_code', 'county') inlines = [VotingRecordInline, DonationInline, InteractionInline, VoterLikelihoodInline] readonly_fields = ('address',) change_list_template = "admin/voter_change_list.html" def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voter-download-errors'), path('import-voters/', self.admin_site.admin_view(self.import_voters), name='import-voters'), ] return my_urls + urls def import_voters(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) exists = Voter.objects.filter(tenant=tenant, voter_id=voter_id).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': voter_id, 'details': f"{row.get(mapping.get('first_name', '')) or ''} {row.get(mapping.get('last_name', '')) or ''}".strip() }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("..") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: voter_data = {} voter_id = '' for field_name, csv_col in mapping.items(): if csv_col: val = row.get(csv_col) if val is not None and str(val).strip() != '': if field_name == 'voter_id': voter_id = val continue if field_name == 'is_targeted': val = str(val).lower() in ['true', '1', 'yes'] voter_data[field_name] = val if 'candidate_support' in voter_data: if voter_data['candidate_support'] not in dict(Voter.SUPPORT_CHOICES): voter_data['candidate_support'] = 'unknown' if 'yard_sign' in voter_data: if voter_data['yard_sign'] not in dict(Voter.YARD_SIGN_CHOICES): voter_data['yard_sign'] = 'none' if 'window_sticker' in voter_data: if voter_data['window_sticker'] not in dict(Voter.WINDOW_STICKER_CHOICES): voter_data['window_sticker'] = 'none' Voter.objects.update_or_create( tenant=tenant, voter_id=voter_id, defaults=voter_data ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} voters.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:voter-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("..") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("..") else: form = VoterImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("..") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Voter Fields", 'headers': headers, 'model_fields': VOTER_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = VoterImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Voters" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Event) class EventAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('id', 'event_type', 'date', 'tenant') list_filter = ('tenant', 'date', 'event_type') change_list_template = "admin/event_change_list.html" def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='event-download-errors'), path('import-events/', self.admin_site.admin_view(self.import_events), name='import-events'), ] return my_urls + urls def import_events(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 date = row.get(mapping.get('date')) event_type_name = row.get(mapping.get('event_type')) exists = False if date and event_type_name: exists = Event.objects.filter(tenant=tenant, date=date, event_type__name=event_type_name).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"{date} - {event_type_name}", 'details': row.get(mapping.get('description', '')) or '' }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("..") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: date = row.get(mapping.get('date')) if mapping.get('date') else None event_type_name = row.get(mapping.get('event_type')) if mapping.get('event_type') else None description = row.get(mapping.get('description')) if mapping.get('description') else None if not date or not event_type_name: row["Import Error"] = "Missing date or event type" failed_rows.append(row) errors += 1 continue event_type, _ = EventType.objects.get_or_create( tenant=tenant, name=event_type_name ) defaults = {} if description and description.strip(): defaults['description'] = description Event.objects.update_or_create( tenant=tenant, date=date, event_type=event_type, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} events.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:event-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("..") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("..") else: form = EventImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("..") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Event Fields", 'headers': headers, 'model_fields': EVENT_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = EventImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Events" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(EventParticipation) class EventParticipationAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('voter', 'event', 'participation_type') list_filter = ('event__tenant', 'event', 'participation_type') change_list_template = "admin/eventparticipation_change_list.html" def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='eventparticipation-download-errors'), path('import-event-participations/', self.admin_site.admin_view(self.import_event_participations), name='import-event-participations'), ] return my_urls + urls def import_event_participations(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) event_id = row.get(mapping.get('event_id')) event_date = row.get(mapping.get('event_date')) event_type_name = row.get(mapping.get('event_type')) exists = False if voter_id: try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) if event_id: exists = EventParticipation.objects.filter(voter=voter, event_id=event_id).exists() elif event_date and event_type_name: exists = EventParticipation.objects.filter(voter=voter, event__date=event_date, event__event_type__name=event_type_name).exists() except Voter.DoesNotExist: pass if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter: {voter_id}", 'details': f"Participation: {row.get(mapping.get('participation_type', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("..") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: voter_id = row.get(mapping.get('voter_id')) if mapping.get('voter_id') else None participation_type_val = row.get(mapping.get('participation_type')) if mapping.get('participation_type') else None if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: error_msg = f"Voter with ID {voter_id} not found" logger.error(error_msg) row["Import Error"] = error_msg failed_rows.append(row) errors += 1 continue event = None event_id = row.get(mapping.get('event_id')) if mapping.get('event_id') else None if event_id: try: event = Event.objects.get(id=event_id, tenant=tenant) except Event.DoesNotExist: pass if not event: event_date = row.get(mapping.get('event_date')) if mapping.get('event_date') else None event_type_name = row.get(mapping.get('event_type')) if mapping.get('event_type') else None if event_date and event_type_name: try: event_type = EventType.objects.get(tenant=tenant, name=event_type_name) event = Event.objects.get(tenant=tenant, date=event_date, event_type=event_type) except (EventType.DoesNotExist, Event.DoesNotExist): pass if not event: error_msg = "Event not found (check ID, date, or type)" logger.error(error_msg) row["Import Error"] = error_msg failed_rows.append(row) errors += 1 continue defaults = {} if participation_type_val and participation_type_val.strip(): if participation_type_val in dict(EventParticipation.PARTICIPATION_TYPE_CHOICES): defaults['participation_type'] = participation_type_val else: defaults['participation_type'] = 'invited' EventParticipation.objects.update_or_create( event=event, voter=voter, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} participations.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:eventparticipation-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("..") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("..") else: form = EventParticipationImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("..") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Participation Fields", 'headers': headers, 'model_fields': EVENT_PARTICIPATION_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = EventParticipationImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Participations" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Donation) class DonationAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('id', 'voter', 'date', 'amount', 'method') list_filter = ('voter__tenant', 'date', 'method') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id') change_list_template = "admin/donation_change_list.html" def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='donation-download-errors'), path('import-donations/', self.admin_site.admin_view(self.import_donations), name='import-donations'), ] return my_urls + urls def import_donations(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in DONATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) date = row.get(mapping.get('date')) amount = row.get(mapping.get('amount')) exists = False if voter_id and date and amount: exists = Donation.objects.filter(voter__tenant=tenant, voter__voter_id=voter_id, date=date, amount=amount).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter: {voter_id}", 'details': f"Date: {date}, Amount: {amount}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("..") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in DONATION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: voter_id = row.get(mapping.get('voter_id')) if mapping.get('voter_id') else None if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue date = row.get(mapping.get('date')) amount = row.get(mapping.get('amount')) method_name = row.get(mapping.get('method')) if not date or not amount: row["Import Error"] = "Missing date or amount" failed_rows.append(row) errors += 1 continue method = None if method_name and method_name.strip(): method, _ = DonationMethod.objects.get_or_create( tenant=tenant, name=method_name ) defaults = {} if method: defaults['method'] = method Donation.objects.update_or_create( voter=voter, date=date, amount=amount, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} donations.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:donation-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("..") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("..") else: form = DonationImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("..") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Donation Fields", 'headers': headers, 'model_fields': DONATION_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = DonationImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Donations" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(Interaction) class InteractionAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('id', 'voter', 'type', 'date', 'description') list_filter = ('voter__tenant', 'type', 'date') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'description') change_list_template = "admin/interaction_change_list.html" def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='interaction-download-errors'), path('import-interactions/', self.admin_site.admin_view(self.import_interactions), name='import-interactions'), ] return my_urls + urls def import_interactions(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in INTERACTION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) date = row.get(mapping.get('date')) exists = False if voter_id and date: exists = Interaction.objects.filter(voter__tenant=tenant, voter__voter_id=voter_id, date=date).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter: {voter_id}", 'details': f"Date: {date}, Desc: {row.get(mapping.get('description', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("..") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in INTERACTION_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: voter_id = row.get(mapping.get('voter_id')) if mapping.get('voter_id') else None if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue date = row.get(mapping.get('date')) type_name = row.get(mapping.get('type')) description = row.get(mapping.get('description')) notes = row.get(mapping.get('notes')) if not date or not description: row["Import Error"] = "Missing date or description" failed_rows.append(row) errors += 1 continue interaction_type = None if type_name and type_name.strip(): interaction_type, _ = InteractionType.objects.get_or_create( tenant=tenant, name=type_name ) defaults = {} if interaction_type: defaults['type'] = interaction_type if description and description.strip(): defaults['description'] = description if notes and notes.strip(): defaults['notes'] = notes Interaction.objects.update_or_create( voter=voter, date=date, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} interactions.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:interaction-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("..") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("..") else: form = InteractionImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("..") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Interaction Fields", 'headers': headers, 'model_fields': INTERACTION_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = InteractionImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Interactions" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(VoterLikelihood) class VoterLikelihoodAdmin(BaseImportAdminMixin, admin.ModelAdmin): list_display = ('id', 'voter', 'election_type', 'likelihood') list_filter = ('voter__tenant', 'election_type', 'likelihood') search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id') change_list_template = "admin/voterlikelihood_change_list.html" def get_urls(self): urls = super().get_urls() my_urls = [ path('download-errors/', self.admin_site.admin_view(self.download_errors), name='voterlikelihood-download-errors'), path('import-likelihoods/', self.admin_site.admin_view(self.import_likelihoods), name='import-likelihoods'), ] return my_urls + urls def import_likelihoods(self, request): if request.method == "POST": if "_preview" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) total_count = 0 create_count = 0 update_count = 0 preview_data = [] for row in reader: total_count += 1 voter_id = row.get(mapping.get('voter_id')) election_type_name = row.get(mapping.get('election_type')) exists = False if voter_id and election_type_name: exists = VoterLikelihood.objects.filter(voter__tenant=tenant, voter__voter_id=voter_id, election_type__name=election_type_name).exists() if exists: update_count += 1 action = 'update' else: create_count += 1 action = 'create' if len(preview_data) < 10: preview_data.append({ 'action': action, 'identifier': f"Voter: {voter_id}", 'details': f"Election: {election_type_name}, Likelihood: {row.get(mapping.get('likelihood', '')) or ''}" }) context = self.admin_site.each_context(request) context.update({ 'title': "Import Preview", 'total_count': total_count, 'create_count': create_count, 'update_count': update_count, 'preview_data': preview_data, 'mapping': mapping, 'file_path': file_path, 'tenant_id': tenant_id, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_preview.html", context) except Exception as e: self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR) return redirect("..") elif "_import" in request.POST: file_path = request.POST.get('file_path') tenant_id = request.POST.get('tenant') tenant = Tenant.objects.get(id=tenant_id) mapping = {} for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS: mapping[field_name] = request.POST.get(f'map_{field_name}') try: with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.DictReader(f) count = 0 errors = 0 failed_rows = [] for row in reader: try: voter_id = row.get(mapping.get('voter_id')) if mapping.get('voter_id') else None if not voter_id: row["Import Error"] = "Missing voter ID" failed_rows.append(row) errors += 1 continue try: voter = Voter.objects.get(tenant=tenant, voter_id=voter_id) except Voter.DoesNotExist: row["Import Error"] = f"Voter {voter_id} not found" failed_rows.append(row) errors += 1 continue election_type_name = row.get(mapping.get('election_type')) likelihood_val = row.get(mapping.get('likelihood')) if not election_type_name or not likelihood_val: row["Import Error"] = "Missing election type or likelihood value" failed_rows.append(row) errors += 1 continue election_type, _ = ElectionType.objects.get_or_create( tenant=tenant, name=election_type_name ) # Normalize likelihood likelihood_choices = dict(VoterLikelihood.LIKELIHOOD_CHOICES) normalized_likelihood = None likelihood_val_lower = likelihood_val.lower().replace(' ', '_') if likelihood_val_lower in likelihood_choices: normalized_likelihood = likelihood_val_lower else: # Try to find by display name for k, v in likelihood_choices.items(): if v.lower() == likelihood_val.lower(): normalized_likelihood = k break if not normalized_likelihood: row["Import Error"] = f"Invalid likelihood value: {likelihood_val}" failed_rows.append(row) errors += 1 continue defaults = {} if normalized_likelihood and normalized_likelihood.strip(): defaults['likelihood'] = normalized_likelihood VoterLikelihood.objects.update_or_create( voter=voter, election_type=election_type, defaults=defaults ) count += 1 except Exception as e: logger.error(f"Error importing: {e}") row["Import Error"] = str(e) failed_rows.append(row) errors += 1 if os.path.exists(file_path): os.remove(file_path) self.message_user(request, f"Successfully imported {count} likelihoods.") request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows request.session.modified = True logger.info(f"Stored {len(failed_rows)} failed rows in session for {self.model._meta.model_name}") if errors > 0: error_url = reverse("admin:voterlikelihood-download-errors") self.message_user(request, mark_safe(f"Failed to import {errors} rows. Download failed records"), level=messages.WARNING) return redirect("..") except Exception as e: self.message_user(request, f"Error processing file: {e}", level=messages.ERROR) return redirect("..") else: form = VoterLikelihoodImportForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['file'] tenant = form.cleaned_data['tenant'] if not csv_file.name.endswith('.csv'): self.message_user(request, "Please upload a CSV file.", level=messages.ERROR) return redirect("..") with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp: for chunk in csv_file.chunks(): tmp.write(chunk) file_path = tmp.name with open(file_path, 'r', encoding='UTF-8') as f: reader = csv.reader(f) headers = next(reader) context = self.admin_site.each_context(request) context.update({ 'title': "Map Likelihood Fields", 'headers': headers, 'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS, 'tenant_id': tenant.id, 'file_path': file_path, 'action_url': request.path, 'opts': self.model._meta, }) return render(request, "admin/import_mapping.html", context) else: form = VoterLikelihoodImportForm() context = self.admin_site.each_context(request) context['form'] = form context['title'] = "Import Likelihoods" context['opts'] = self.model._meta return render(request, "admin/import_csv.html", context) @admin.register(CampaignSettings) class CampaignSettingsAdmin(admin.ModelAdmin): list_display = ('tenant', 'donation_goal') list_filter = ('tenant',)