37769-vm/core/admin.py
2026-01-25 05:02:55 +00:00

859 lines
39 KiB
Python

import csv
import io
import logging
import tempfile
import os
from django.contrib import admin, messages
from django.urls import path
from django.shortcuts import render, redirect
from django.template.response import TemplateResponse
from .models import (
Tenant, TenantUserRole, InteractionType, DonationMethod, ElectionType, EventType, Voter,
VotingRecord, Event, EventParticipation, Donation, Interaction, VoterLikelihood, CampaignSettings
)
from .forms import (
VoterImportForm, EventImportForm, EventParticipationImportForm,
DonationImportForm, InteractionImportForm, VoterLikelihoodImportForm
)
# Module-level logger used by the CSV import views below.
logger = logging.getLogger(__name__)

# Each *_MAPPABLE_FIELDS list holds (field_key, human_label) pairs. The import
# views read the POST parameter ``map_<field_key>`` for every pair and pass the
# list to the mapping template as ``model_fields``.

# Fields that a voter CSV column can be mapped onto.
VOTER_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('first_name', 'First Name'),
    ('last_name', 'Last Name'),
    ('nickname', 'Nickname'),
    ('birthdate', 'Birthdate'),
    ('address_street', 'Street Address'),
    ('city', 'City'),
    ('state', 'State'),
    ('prior_state', 'Prior State'),
    ('zip_code', 'Zip Code'),
    ('county', 'County'),
    ('phone', 'Phone'),
    ('email', 'Email'),
    ('district', 'District'),
    ('precinct', 'Precinct'),
    ('registration_date', 'Registration Date'),
    ('is_targeted', 'Is Targeted'),
    ('candidate_support', 'Candidate Support'),
    ('yard_sign', 'Yard Sign'),
    ('window_sticker', 'Window Sticker'),
    ('latitude', 'Latitude'),
    ('longitude', 'Longitude'),
]

# Fields for the event import.
EVENT_MAPPABLE_FIELDS = [
    ('date', 'Date'),
    ('event_type', 'Event Type (Name)'),
    ('description', 'Description'),
]

# Fields for the event-participation import.
EVENT_PARTICIPATION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('event_id', 'Event ID'),
    ('event_date', 'Event Date'),
    ('event_type', 'Event Type (Name)'),
    ('participation_type', 'Participation Type'),
]

# Fields for the donation import.
DONATION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('date', 'Date'),
    ('amount', 'Amount'),
    ('method', 'Donation Method (Name)'),
]

# Fields for the interaction import.
INTERACTION_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('date', 'Date'),
    ('type', 'Interaction Type (Name)'),
    ('description', 'Description'),
    ('notes', 'Notes'),
]

# Fields for the voter-likelihood import.
VOTER_LIKELIHOOD_MAPPABLE_FIELDS = [
    ('voter_id', 'Voter ID'),
    ('election_type', 'Election Type (Name)'),
    ('likelihood', 'Likelihood'),
]
class TenantUserRoleInline(admin.TabularInline):
    """Tabular inline editor for TenantUserRole rows, with one extra blank form."""

    model = TenantUserRole
    extra = 1
class CampaignSettingsInline(admin.StackedInline):
    """Stacked inline editor for CampaignSettings; deletion from the inline is disabled."""

    model = CampaignSettings
    can_delete = False
@admin.register(Tenant)
class TenantAdmin(admin.ModelAdmin):
    """Admin for tenants, editing user roles and campaign settings inline."""

    list_display = (
        'name',
        'slug',
        'created_at',
    )
    search_fields = ('name',)
    inlines = [
        TenantUserRoleInline,
        CampaignSettingsInline,
    ]
@admin.register(TenantUserRole)
class TenantUserRoleAdmin(admin.ModelAdmin):
    """Admin for user-to-tenant role assignments."""

    list_display = (
        'user',
        'tenant',
        'role',
    )
    list_filter = (
        'tenant',
        'role',
    )
    # Search follows the relations to the username and the tenant name.
    search_fields = (
        'user__username',
        'tenant__name',
    )
@admin.register(InteractionType)
class InteractionTypeAdmin(admin.ModelAdmin):
    """Admin for per-tenant interaction types."""

    list_display = (
        'name',
        'tenant',
        'is_active',
    )
    list_filter = (
        'tenant',
        'is_active',
    )
    search_fields = ('name',)
@admin.register(DonationMethod)
class DonationMethodAdmin(admin.ModelAdmin):
    """Admin for per-tenant donation methods."""

    list_display = (
        'name',
        'tenant',
        'is_active',
    )
    list_filter = (
        'tenant',
        'is_active',
    )
    search_fields = ('name',)
@admin.register(ElectionType)
class ElectionTypeAdmin(admin.ModelAdmin):
    """Admin for per-tenant election types."""

    list_display = (
        'name',
        'tenant',
        'is_active',
    )
    list_filter = (
        'tenant',
        'is_active',
    )
    search_fields = ('name',)
@admin.register(EventType)
class EventTypeAdmin(admin.ModelAdmin):
    """Admin for per-tenant event types."""

    list_display = (
        'name',
        'tenant',
        'is_active',
    )
    list_filter = (
        'tenant',
        'is_active',
    )
    search_fields = ('name',)
class VotingRecordInline(admin.TabularInline):
    """Tabular inline editor for VotingRecord rows, with one extra blank form."""

    model = VotingRecord
    extra = 1
class DonationInline(admin.TabularInline):
    """Tabular inline editor for Donation rows, with one extra blank form."""

    model = Donation
    extra = 1
class InteractionInline(admin.TabularInline):
    """Tabular inline editor for Interaction rows, with one extra blank form."""

    model = Interaction
    extra = 1
class VoterLikelihoodInline(admin.TabularInline):
    """Tabular inline editor for VoterLikelihood rows, with one extra blank form."""

    model = VoterLikelihood
    extra = 1
@admin.register(Voter)
class VoterAdmin(admin.ModelAdmin):
    """Voter admin with inline related records and a two-step CSV import.

    Step one uploads a CSV, stores it in a temporary file and renders a
    column-mapping page. Step two (a POST containing ``_import``) reads that
    temporary file and upserts one voter per row for the selected tenant.
    """

    list_display = (
        'first_name', 'last_name', 'nickname', 'voter_id', 'tenant', 'district',
        'candidate_support', 'is_targeted', 'city', 'state', 'prior_state',
    )
    list_filter = (
        'tenant', 'candidate_support', 'is_targeted', 'yard_sign', 'district',
        'city', 'state', 'prior_state',
    )
    search_fields = (
        'first_name', 'last_name', 'nickname', 'voter_id', 'address', 'city',
        'state', 'prior_state', 'zip_code', 'county',
    )
    inlines = [VotingRecordInline, DonationInline, InteractionInline, VoterLikelihoodInline]
    change_list_template = "admin/voter_change_list.html"

    def get_urls(self):
        """Return the admin URLs with the voter import view placed first."""
        urls = super().get_urls()
        my_urls = [
            path('import-voters/', self.admin_site.admin_view(self.import_voters), name='import-voters'),
        ]
        return my_urls + urls

    @staticmethod
    def _is_uploaded_temp_csv(file_path):
        """Return True if *file_path* is a ``.csv`` located directly in the temp dir.

        The path travels through the browser between the two import steps, so
        it is untrusted input; the import step opens and then deletes it.
        """
        if not file_path:
            return False
        temp_dir = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        candidate = os.path.normcase(os.path.realpath(file_path))
        return candidate.endswith('.csv') and os.path.dirname(candidate) == temp_dir

    def import_voters(self, request):
        """Handle the voter CSV import view.

        GET renders the upload form. A POST with ``_import`` runs the import
        from the previously uploaded file; any other POST is treated as the
        upload step. An invalid upload form is re-rendered with its errors.
        """
        if request.method == "POST":
            if "_import" in request.POST:
                return self._import_mapped_csv(request)
            form = VoterImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._show_mapping_page(request, form)
        else:
            form = VoterImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Voters"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _show_mapping_page(self, request, form):
        """Store the uploaded CSV in a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file has to outlive this request so that the
        # mapping form can refer back to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        problem = "The uploaded CSV file is empty."
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as e:
            problem = f"Error processing file: {e}"
        if not headers:
            # Nothing to map: drop the temp file instead of leaking it.
            os.remove(file_path)
            self.message_user(request, problem, level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Voter Fields",
            'headers': headers,
            'model_fields': VOTER_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_mapped_csv(self, request):
        """Upsert voters from the uploaded CSV using the posted column mapping.

        Rows that raise are logged and counted as failures; the temp file is
        removed once processing ends, whether it succeeded or not.
        """
        file_path = request.POST.get('file_path')
        if not self._is_uploaded_temp_csv(file_path):
            self.message_user(request, "Invalid import file. Please upload the CSV again.", level=messages.ERROR)
            return redirect("..")
        mapping = {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in VOTER_MAPPABLE_FIELDS
        }
        try:
            # Looked up inside the try so an unknown tenant is reported to the
            # user rather than surfacing as a server error.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            errors = 0
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        voter_data = {}
                        for field_name, csv_col in mapping.items():
                            if not csv_col:
                                continue
                            val = row.get(csv_col)
                            if val is None:
                                continue
                            if field_name == 'is_targeted':
                                val = str(val).lower() in ['true', '1', 'yes']
                            voter_data[field_name] = val
                        # NOTE(review): rows with a blank or unmapped voter_id all
                        # share the key '' and therefore update the same record;
                        # confirm whether such rows should be rejected instead.
                        voter_id = voter_data.pop('voter_id', '')
                        # Unknown choice values fall back to a default choice.
                        if 'candidate_support' in voter_data:
                            if voter_data['candidate_support'] not in dict(Voter.SUPPORT_CHOICES):
                                voter_data['candidate_support'] = 'unknown'
                        if 'yard_sign' in voter_data:
                            if voter_data['yard_sign'] not in dict(Voter.YARD_SIGN_CHOICES):
                                voter_data['yard_sign'] = 'none'
                        if 'window_sticker' in voter_data:
                            if voter_data['window_sticker'] not in dict(Voter.WINDOW_STICKER_CHOICES):
                                voter_data['window_sticker'] = 'none'
                        # Blank values for these fields are dropped rather than written.
                        for d_field in ['registration_date', 'birthdate', 'latitude', 'longitude']:
                            if d_field in voter_data and not voter_data[d_field]:
                                del voter_data[d_field]
                        Voter.objects.update_or_create(
                            tenant=tenant,
                            voter_id=voter_id,
                            defaults=voter_data
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing voter row: {e}")
                        errors += 1
            self.message_user(request, f"Successfully imported {count} voters.")
            if errors > 0:
                self.message_user(request, f"Failed to import {errors} rows.", level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # The upload is single-use; remove it on every exit path.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove import file {file_path}: {e}")
        return redirect("..")
@admin.register(Event)
class EventAdmin(admin.ModelAdmin):
    """Event admin with a two-step CSV import.

    Step one uploads a CSV, stores it in a temporary file and renders a
    column-mapping page. Step two (a POST containing ``_import``) reads that
    temporary file and creates one event per valid row.
    """

    list_display = ('id', 'event_type', 'date', 'tenant')
    list_filter = ('tenant', 'date', 'event_type')
    change_list_template = "admin/event_change_list.html"

    def get_urls(self):
        """Return the admin URLs with the event import view placed first."""
        urls = super().get_urls()
        my_urls = [
            path('import-events/', self.admin_site.admin_view(self.import_events), name='import-events'),
        ]
        return my_urls + urls

    @staticmethod
    def _is_uploaded_temp_csv(file_path):
        """Return True if *file_path* is a ``.csv`` located directly in the temp dir.

        The path travels through the browser between the two import steps, so
        it is untrusted input; the import step opens and then deletes it.
        """
        if not file_path:
            return False
        temp_dir = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        candidate = os.path.normcase(os.path.realpath(file_path))
        return candidate.endswith('.csv') and os.path.dirname(candidate) == temp_dir

    @staticmethod
    def _cell(row, mapping, field_name, default=None):
        """Return the row value of a mapped field, or *default* when it is unmapped."""
        column = mapping.get(field_name)
        return row.get(column) if column else default

    def import_events(self, request):
        """Handle the event CSV import view.

        GET renders the upload form. A POST with ``_import`` runs the import
        from the previously uploaded file; any other POST is treated as the
        upload step. An invalid upload form is re-rendered with its errors.
        """
        if request.method == "POST":
            if "_import" in request.POST:
                return self._import_mapped_csv(request)
            form = EventImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._show_mapping_page(request, form)
        else:
            form = EventImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Events"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _show_mapping_page(self, request, form):
        """Store the uploaded CSV in a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file has to outlive this request so that the
        # mapping form can refer back to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        problem = "The uploaded CSV file is empty."
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as e:
            problem = f"Error processing file: {e}"
        if not headers:
            # Nothing to map: drop the temp file instead of leaking it.
            os.remove(file_path)
            self.message_user(request, problem, level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Event Fields",
            'headers': headers,
            'model_fields': EVENT_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_mapped_csv(self, request):
        """Create events from the uploaded CSV using the posted column mapping.

        Rows lacking a date or event type, and rows that raise, are counted as
        failures. The temp file is removed once processing ends.
        """
        file_path = request.POST.get('file_path')
        if not self._is_uploaded_temp_csv(file_path):
            self.message_user(request, "Invalid import file. Please upload the CSV again.", level=messages.ERROR)
            return redirect("..")
        mapping = {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in EVENT_MAPPABLE_FIELDS
        }
        try:
            # Looked up inside the try so an unknown tenant is reported to the
            # user rather than surfacing as a server error.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            errors = 0
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        date = self._cell(row, mapping, 'date')
                        event_type_name = self._cell(row, mapping, 'event_type')
                        description = self._cell(row, mapping, 'description', '')
                        if not date or not event_type_name:
                            errors += 1
                            continue
                        # Event types named in the CSV are created on demand.
                        event_type, _ = EventType.objects.get_or_create(
                            tenant=tenant,
                            name=event_type_name
                        )
                        Event.objects.create(
                            tenant=tenant,
                            date=date,
                            event_type=event_type,
                            description=description
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing event row: {e}")
                        errors += 1
            self.message_user(request, f"Successfully imported {count} events.")
            if errors > 0:
                self.message_user(request, f"Failed to import {errors} rows.", level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # The upload is single-use; remove it on every exit path.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove import file {file_path}: {e}")
        return redirect("..")
@admin.register(EventParticipation)
class EventParticipationAdmin(admin.ModelAdmin):
    """Event-participation admin with a two-step CSV import.

    Step one uploads a CSV, stores it in a temporary file and renders a
    column-mapping page. Step two (a POST containing ``_import``) reads that
    temporary file and upserts one participation per valid row.
    """

    list_display = ('voter', 'event', 'participation_type')
    list_filter = ('event__tenant', 'event', 'participation_type')
    change_list_template = "admin/eventparticipation_change_list.html"

    def get_urls(self):
        """Return the admin URLs with the participation import view placed first."""
        urls = super().get_urls()
        my_urls = [
            path('import-event-participations/', self.admin_site.admin_view(self.import_event_participations), name='import-event-participations'),
        ]
        return my_urls + urls

    @staticmethod
    def _is_uploaded_temp_csv(file_path):
        """Return True if *file_path* is a ``.csv`` located directly in the temp dir.

        The path travels through the browser between the two import steps, so
        it is untrusted input; the import step opens and then deletes it.
        """
        if not file_path:
            return False
        temp_dir = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        candidate = os.path.normcase(os.path.realpath(file_path))
        return candidate.endswith('.csv') and os.path.dirname(candidate) == temp_dir

    @staticmethod
    def _cell(row, mapping, field_name, default=None):
        """Return the row value of a mapped field, or *default* when it is unmapped."""
        column = mapping.get(field_name)
        return row.get(column) if column else default

    def import_event_participations(self, request):
        """Handle the event-participation CSV import view.

        GET renders the upload form. A POST with ``_import`` runs the import
        from the previously uploaded file; any other POST is treated as the
        upload step. An invalid upload form is re-rendered with its errors.
        """
        if request.method == "POST":
            if "_import" in request.POST:
                return self._import_mapped_csv(request)
            form = EventParticipationImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._show_mapping_page(request, form)
        else:
            form = EventParticipationImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Participations"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _show_mapping_page(self, request, form):
        """Store the uploaded CSV in a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file has to outlive this request so that the
        # mapping form can refer back to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        problem = "The uploaded CSV file is empty."
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as e:
            problem = f"Error processing file: {e}"
        if not headers:
            # Nothing to map: drop the temp file instead of leaking it.
            os.remove(file_path)
            self.message_user(request, problem, level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Participation Fields",
            'headers': headers,
            'model_fields': EVENT_PARTICIPATION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _find_event(self, row, mapping, tenant):
        """Return the tenant's event for *row*, by id or by date + type name; else None."""
        event_id = self._cell(row, mapping, 'event_id')
        if event_id:
            try:
                return Event.objects.get(id=event_id, tenant=tenant)
            except Event.DoesNotExist:
                pass  # fall back to the date / type lookup below
        event_date = self._cell(row, mapping, 'event_date')
        event_type_name = self._cell(row, mapping, 'event_type')
        if event_date and event_type_name:
            try:
                event_type = EventType.objects.get(tenant=tenant, name=event_type_name)
                return Event.objects.get(tenant=tenant, date=event_date, event_type=event_type)
            except (EventType.DoesNotExist, Event.DoesNotExist):
                pass
        return None

    def _import_mapped_csv(self, request):
        """Upsert participations from the uploaded CSV using the posted mapping.

        Rows with no voter id, an unknown voter, or no matching event, and
        rows that raise, are counted as failures. The temp file is removed
        once processing ends.
        """
        file_path = request.POST.get('file_path')
        if not self._is_uploaded_temp_csv(file_path):
            self.message_user(request, "Invalid import file. Please upload the CSV again.", level=messages.ERROR)
            return redirect("..")
        mapping = {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in EVENT_PARTICIPATION_MAPPABLE_FIELDS
        }
        try:
            # Looked up inside the try so an unknown tenant is reported to the
            # user rather than surfacing as a server error.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            errors = 0
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        voter_id = self._cell(row, mapping, 'voter_id')
                        participation_type = self._cell(row, mapping, 'participation_type', 'invited')
                        if not voter_id:
                            errors += 1
                            continue
                        try:
                            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
                        except Voter.DoesNotExist:
                            logger.error(f"Voter not found: {voter_id} in tenant {tenant.name}")
                            errors += 1
                            continue
                        event = self._find_event(row, mapping, tenant)
                        if not event:
                            logger.error("Event not found for row")
                            errors += 1
                            continue
                        # Unrecognised participation types fall back to 'invited'.
                        if participation_type not in dict(EventParticipation.PARTICIPATION_TYPE_CHOICES):
                            participation_type = 'invited'
                        EventParticipation.objects.update_or_create(
                            event=event,
                            voter=voter,
                            defaults={'participation_type': participation_type}
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing participation row: {e}")
                        errors += 1
            self.message_user(request, f"Successfully imported {count} participations.")
            if errors > 0:
                self.message_user(request, f"Failed to import {errors} rows.", level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # The upload is single-use; remove it on every exit path.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove import file {file_path}: {e}")
        return redirect("..")
@admin.register(Donation)
class DonationAdmin(admin.ModelAdmin):
    """Donation admin with a two-step CSV import.

    Step one uploads a CSV, stores it in a temporary file and renders a
    column-mapping page. Step two (a POST containing ``_import``) reads that
    temporary file and creates one donation per valid row.
    """

    list_display = ('id', 'voter', 'date', 'amount', 'method')
    list_filter = ('voter__tenant', 'date', 'method')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id')
    change_list_template = "admin/donation_change_list.html"

    def get_urls(self):
        """Return the admin URLs with the donation import view placed first."""
        urls = super().get_urls()
        my_urls = [
            path('import-donations/', self.admin_site.admin_view(self.import_donations), name='import-donations'),
        ]
        return my_urls + urls

    @staticmethod
    def _is_uploaded_temp_csv(file_path):
        """Return True if *file_path* is a ``.csv`` located directly in the temp dir.

        The path travels through the browser between the two import steps, so
        it is untrusted input; the import step opens and then deletes it.
        """
        if not file_path:
            return False
        temp_dir = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        candidate = os.path.normcase(os.path.realpath(file_path))
        return candidate.endswith('.csv') and os.path.dirname(candidate) == temp_dir

    @staticmethod
    def _cell(row, mapping, field_name, default=None):
        """Return the row value of a mapped field, or *default* when it is unmapped.

        Unmapped fields are never looked up in the row: ``csv.DictReader``
        stores overflow columns under the key ``None``.
        """
        column = mapping.get(field_name)
        return row.get(column) if column else default

    def import_donations(self, request):
        """Handle the donation CSV import view.

        GET renders the upload form. A POST with ``_import`` runs the import
        from the previously uploaded file; any other POST is treated as the
        upload step. An invalid upload form is re-rendered with its errors.
        """
        if request.method == "POST":
            if "_import" in request.POST:
                return self._import_mapped_csv(request)
            form = DonationImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._show_mapping_page(request, form)
        else:
            form = DonationImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Donations"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _show_mapping_page(self, request, form):
        """Store the uploaded CSV in a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file has to outlive this request so that the
        # mapping form can refer back to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        problem = "The uploaded CSV file is empty."
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as e:
            problem = f"Error processing file: {e}"
        if not headers:
            # Nothing to map: drop the temp file instead of leaking it.
            os.remove(file_path)
            self.message_user(request, problem, level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Donation Fields",
            'headers': headers,
            'model_fields': DONATION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_mapped_csv(self, request):
        """Create donations from the uploaded CSV using the posted column mapping.

        Rows with no voter id, an unknown voter, or a missing date or amount,
        and rows that raise, are counted as failures. The temp file is removed
        once processing ends.
        """
        file_path = request.POST.get('file_path')
        if not self._is_uploaded_temp_csv(file_path):
            self.message_user(request, "Invalid import file. Please upload the CSV again.", level=messages.ERROR)
            return redirect("..")
        mapping = {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in DONATION_MAPPABLE_FIELDS
        }
        try:
            # Looked up inside the try so an unknown tenant is reported to the
            # user rather than surfacing as a server error.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            errors = 0
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        voter_id = self._cell(row, mapping, 'voter_id')
                        if not voter_id:
                            errors += 1
                            continue
                        try:
                            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
                        except Voter.DoesNotExist:
                            errors += 1
                            continue
                        date = self._cell(row, mapping, 'date')
                        amount = self._cell(row, mapping, 'amount')
                        method_name = self._cell(row, mapping, 'method')
                        if not date or not amount:
                            errors += 1
                            continue
                        # The method is optional; named methods are created on demand.
                        method = None
                        if method_name:
                            method, _ = DonationMethod.objects.get_or_create(
                                tenant=tenant,
                                name=method_name
                            )
                        Donation.objects.create(
                            voter=voter,
                            date=date,
                            amount=amount,
                            method=method
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing donation row: {e}")
                        errors += 1
            self.message_user(request, f"Successfully imported {count} donations.")
            if errors > 0:
                self.message_user(request, f"Failed to import {errors} rows.", level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # The upload is single-use; remove it on every exit path.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove import file {file_path}: {e}")
        return redirect("..")
@admin.register(Interaction)
class InteractionAdmin(admin.ModelAdmin):
    """Interaction admin with a two-step CSV import.

    Step one uploads a CSV, stores it in a temporary file and renders a
    column-mapping page. Step two (a POST containing ``_import``) reads that
    temporary file and creates one interaction per valid row.
    """

    list_display = ('id', 'voter', 'type', 'date', 'description')
    list_filter = ('voter__tenant', 'type', 'date')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id', 'description')
    change_list_template = "admin/interaction_change_list.html"

    def get_urls(self):
        """Return the admin URLs with the interaction import view placed first."""
        urls = super().get_urls()
        my_urls = [
            path('import-interactions/', self.admin_site.admin_view(self.import_interactions), name='import-interactions'),
        ]
        return my_urls + urls

    @staticmethod
    def _is_uploaded_temp_csv(file_path):
        """Return True if *file_path* is a ``.csv`` located directly in the temp dir.

        The path travels through the browser between the two import steps, so
        it is untrusted input; the import step opens and then deletes it.
        """
        if not file_path:
            return False
        temp_dir = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        candidate = os.path.normcase(os.path.realpath(file_path))
        return candidate.endswith('.csv') and os.path.dirname(candidate) == temp_dir

    @staticmethod
    def _cell(row, mapping, field_name, default=None):
        """Return the row value of a mapped field, or *default* when it is unmapped.

        Unmapped fields are never looked up in the row: ``csv.DictReader``
        stores overflow columns under the key ``None``.
        """
        column = mapping.get(field_name)
        return row.get(column) if column else default

    def import_interactions(self, request):
        """Handle the interaction CSV import view.

        GET renders the upload form. A POST with ``_import`` runs the import
        from the previously uploaded file; any other POST is treated as the
        upload step. An invalid upload form is re-rendered with its errors.
        """
        if request.method == "POST":
            if "_import" in request.POST:
                return self._import_mapped_csv(request)
            form = InteractionImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._show_mapping_page(request, form)
        else:
            form = InteractionImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Interactions"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _show_mapping_page(self, request, form):
        """Store the uploaded CSV in a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file has to outlive this request so that the
        # mapping form can refer back to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        problem = "The uploaded CSV file is empty."
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as e:
            problem = f"Error processing file: {e}"
        if not headers:
            # Nothing to map: drop the temp file instead of leaking it.
            os.remove(file_path)
            self.message_user(request, problem, level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Interaction Fields",
            'headers': headers,
            'model_fields': INTERACTION_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_mapped_csv(self, request):
        """Create interactions from the uploaded CSV using the posted mapping.

        Rows with no voter id, an unknown voter, or a missing date or
        description, and rows that raise, are counted as failures. The temp
        file is removed once processing ends.
        """
        file_path = request.POST.get('file_path')
        if not self._is_uploaded_temp_csv(file_path):
            self.message_user(request, "Invalid import file. Please upload the CSV again.", level=messages.ERROR)
            return redirect("..")
        mapping = {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in INTERACTION_MAPPABLE_FIELDS
        }
        try:
            # Looked up inside the try so an unknown tenant is reported to the
            # user rather than surfacing as a server error.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            errors = 0
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        voter_id = self._cell(row, mapping, 'voter_id')
                        if not voter_id:
                            errors += 1
                            continue
                        try:
                            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
                        except Voter.DoesNotExist:
                            errors += 1
                            continue
                        date = self._cell(row, mapping, 'date')
                        type_name = self._cell(row, mapping, 'type')
                        description = self._cell(row, mapping, 'description')
                        notes = self._cell(row, mapping, 'notes', '')
                        if not date or not description:
                            errors += 1
                            continue
                        # The type is optional; named types are created on demand.
                        interaction_type = None
                        if type_name:
                            interaction_type, _ = InteractionType.objects.get_or_create(
                                tenant=tenant,
                                name=type_name
                            )
                        Interaction.objects.create(
                            voter=voter,
                            date=date,
                            type=interaction_type,
                            description=description,
                            notes=notes
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing interaction row: {e}")
                        errors += 1
            self.message_user(request, f"Successfully imported {count} interactions.")
            if errors > 0:
                self.message_user(request, f"Failed to import {errors} rows.", level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # The upload is single-use; remove it on every exit path.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove import file {file_path}: {e}")
        return redirect("..")
@admin.register(VoterLikelihood)
class VoterLikelihoodAdmin(admin.ModelAdmin):
    """Voter-likelihood admin with a two-step CSV import.

    Step one uploads a CSV, stores it in a temporary file and renders a
    column-mapping page. Step two (a POST containing ``_import``) reads that
    temporary file and upserts one likelihood per valid row.
    """

    list_display = ('id', 'voter', 'election_type', 'likelihood')
    list_filter = ('voter__tenant', 'election_type', 'likelihood')
    search_fields = ('voter__first_name', 'voter__last_name', 'voter__voter_id')
    change_list_template = "admin/voterlikelihood_change_list.html"

    def get_urls(self):
        """Return the admin URLs with the likelihood import view placed first."""
        urls = super().get_urls()
        my_urls = [
            path('import-likelihoods/', self.admin_site.admin_view(self.import_likelihoods), name='import-likelihoods'),
        ]
        return my_urls + urls

    @staticmethod
    def _is_uploaded_temp_csv(file_path):
        """Return True if *file_path* is a ``.csv`` located directly in the temp dir.

        The path travels through the browser between the two import steps, so
        it is untrusted input; the import step opens and then deletes it.
        """
        if not file_path:
            return False
        temp_dir = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        candidate = os.path.normcase(os.path.realpath(file_path))
        return candidate.endswith('.csv') and os.path.dirname(candidate) == temp_dir

    @staticmethod
    def _cell(row, mapping, field_name, default=None):
        """Return the row value of a mapped field, or *default* when it is unmapped.

        Unmapped fields are never looked up in the row: ``csv.DictReader``
        stores overflow columns under the key ``None``.
        """
        column = mapping.get(field_name)
        return row.get(column) if column else default

    @staticmethod
    def _normalize_likelihood(likelihood_val):
        """Map a CSV likelihood value to a choice key, or return None if unknown.

        The value matches either a choice key (after lower-casing and turning
        spaces into underscores) or a choice display name, case-insensitively.
        """
        likelihood_choices = dict(VoterLikelihood.LIKELIHOOD_CHOICES)
        as_key = likelihood_val.lower().replace(' ', '_')
        if as_key in likelihood_choices:
            return as_key
        wanted = likelihood_val.lower()
        for key, label in likelihood_choices.items():
            if label.lower() == wanted:
                return key
        return None

    def import_likelihoods(self, request):
        """Handle the voter-likelihood CSV import view.

        GET renders the upload form. A POST with ``_import`` runs the import
        from the previously uploaded file; any other POST is treated as the
        upload step. An invalid upload form is re-rendered with its errors.
        """
        if request.method == "POST":
            if "_import" in request.POST:
                return self._import_mapped_csv(request)
            form = VoterLikelihoodImportForm(request.POST, request.FILES)
            if form.is_valid():
                return self._show_mapping_page(request, form)
        else:
            form = VoterLikelihoodImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Likelihoods"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

    def _show_mapping_page(self, request, form):
        """Store the uploaded CSV in a temp file and render the mapping page."""
        csv_file = request.FILES['file']
        tenant = form.cleaned_data['tenant']
        if not csv_file.name.endswith('.csv'):
            self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
            return redirect("..")
        # delete=False: the file has to outlive this request so that the
        # mapping form can refer back to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
            for chunk in csv_file.chunks():
                tmp.write(chunk)
            file_path = tmp.name
        headers = None
        problem = "The uploaded CSV file is empty."
        try:
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                headers = next(csv.reader(f), None)
        except (UnicodeDecodeError, csv.Error) as e:
            problem = f"Error processing file: {e}"
        if not headers:
            # Nothing to map: drop the temp file instead of leaking it.
            os.remove(file_path)
            self.message_user(request, problem, level=messages.ERROR)
            return redirect("..")
        context = self.admin_site.each_context(request)
        context.update({
            'title': "Map Likelihood Fields",
            'headers': headers,
            'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS,
            'tenant_id': tenant.id,
            'file_path': file_path,
            'action_url': request.path,
            'opts': self.model._meta,
        })
        return render(request, "admin/import_mapping.html", context)

    def _import_mapped_csv(self, request):
        """Upsert likelihoods from the uploaded CSV using the posted column mapping.

        Rows with no voter id, an unknown voter, a missing election type or
        likelihood, or an unrecognised likelihood, and rows that raise, are
        counted as failures. The temp file is removed once processing ends.
        """
        file_path = request.POST.get('file_path')
        if not self._is_uploaded_temp_csv(file_path):
            self.message_user(request, "Invalid import file. Please upload the CSV again.", level=messages.ERROR)
            return redirect("..")
        mapping = {
            field_name: request.POST.get(f'map_{field_name}')
            for field_name, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS
        }
        try:
            # Looked up inside the try so an unknown tenant is reported to the
            # user rather than surfacing as a server error.
            tenant = Tenant.objects.get(id=request.POST.get('tenant'))
            count = 0
            errors = 0
            with open(file_path, 'r', encoding='UTF-8', newline='') as f:
                for row in csv.DictReader(f):
                    try:
                        voter_id = self._cell(row, mapping, 'voter_id')
                        if not voter_id:
                            errors += 1
                            continue
                        try:
                            voter = Voter.objects.get(tenant=tenant, voter_id=voter_id)
                        except Voter.DoesNotExist:
                            errors += 1
                            continue
                        election_type_name = self._cell(row, mapping, 'election_type')
                        likelihood_val = self._cell(row, mapping, 'likelihood')
                        if not election_type_name or not likelihood_val:
                            errors += 1
                            continue
                        # Election types named in the CSV are created on demand.
                        election_type, _ = ElectionType.objects.get_or_create(
                            tenant=tenant,
                            name=election_type_name
                        )
                        normalized_likelihood = self._normalize_likelihood(likelihood_val)
                        if not normalized_likelihood:
                            errors += 1
                            continue
                        VoterLikelihood.objects.update_or_create(
                            voter=voter,
                            election_type=election_type,
                            defaults={'likelihood': normalized_likelihood}
                        )
                        count += 1
                    except Exception as e:
                        logger.error(f"Error importing likelihood row: {e}")
                        errors += 1
            self.message_user(request, f"Successfully imported {count} likelihoods.")
            if errors > 0:
                self.message_user(request, f"Failed to import {errors} rows.", level=messages.WARNING)
        except Exception as e:
            self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
        finally:
            # The upload is single-use; remove it on every exit path.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove import file {file_path}: {e}")
        return redirect("..")
@admin.register(CampaignSettings)
class CampaignSettingsAdmin(admin.ModelAdmin):
    """Admin for per-tenant campaign settings."""

    list_display = (
        'tenant',
        'donation_goal',
    )
    list_filter = ('tenant',)