37769-vm/core/voter_import_logic.py
2026-05-30 08:01:02 +00:00

183 lines
9.7 KiB
Python

import io
import csv
import os
def _robust_decode(content):
if not content: return ""
for enc in ["utf-8-sig", "utf-8", "iso-8859-1", "windows-1252"]:
try: return content.decode(enc)
except UnicodeDecodeError: continue
return content.decode("utf-8", errors="replace")
def _read_csv_robust(file_path):
with open(file_path, "rb") as f:
return io.StringIO(_robust_decode(f.read()))
def import_voters(self, request):
if request.method == "POST":
if "_preview" in request.POST:
file_path = request.POST.get("file_path")
tenant_id = request.POST.get("tenant")
tenant = Tenant.objects.get(id=tenant_id)
mapping = {}
for field_name, _ in VOTER_MAPPABLE_FIELDS:
mapping[field_name] = request.POST.get(f"map_{field_name}")
try:
with _read_csv_robust(file_path) as f:
total_count = sum(1 for line in f) - 1
f.seek(0)
reader = csv.DictReader(f)
preview_rows = []
voter_ids_for_preview = []
for i, row in enumerate(reader):
if i < 10:
preview_rows.append(row)
v_id = row.get(mapping.get("voter_id"))
if v_id:
voter_ids_for_preview.append(v_id.strip())
else:
break
existing_preview_ids = set(Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids_for_preview).values_list("voter_id", flat=True))
preview_data = []
for row in preview_rows:
voter_id_val = row.get(mapping.get("voter_id"))
action = "update" if voter_id_val and voter_id_val.strip() in existing_preview_ids else "create"
preview_data.append({
"action": action,
"identifier": f"Voter ID: {voter_id_val}",
"details": f"Name: {row.get(mapping.get('first_name', ''))} {row.get(mapping.get('last_name', ''))}"
})
context = self.admin_site.each_context(request)
context.update({
"title": "Import Preview",
"total_count": total_count,
"create_count": sum(1 for d in preview_data if d['action'] == 'create'),
"update_count": sum(1 for d in preview_data if d['action'] == 'update'),
"preview_data": preview_data,
"mapping": mapping,
"file_path": file_path,
"tenant_id": tenant_id,
"action_url": request.path,
"opts": self.model._meta,
})
return render(request, "admin/import_preview.html", context)
except Exception as e:
self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
return redirect("../")
elif "_import" in request.POST:
file_path = request.POST.get("file_path")
tenant_id = request.POST.get("tenant")
tenant = Tenant.objects.get(id=tenant_id)
mapping = {}
for field_name, _ in VOTER_MAPPABLE_FIELDS:
mapping[field_name] = request.POST.get(f"map_{field_name}")
try:
created_count = 0
updated_count = 0
errors = 0
failed_rows = []
with _read_csv_robust(file_path) as f_read:
reader = csv.DictReader(f_read)
for row in reader:
try:
raw_voter_id = row.get(mapping.get("voter_id"))
voter_id = raw_voter_id.strip() if raw_voter_id else None
if not voter_id:
row["Import Error"] = "Voter ID is required."
failed_rows.append(row)
errors += 1
continue
defaults = {}
for field_name, _ in VOTER_MAPPABLE_FIELDS:
if field_name == "voter_id": continue
csv_column = mapping.get(field_name)
if csv_column and csv_column in row:
field_value = row[csv_column].strip()
if field_name in ["birthdate", "registration_date"]:
defaults[field_name] = parse_any_date(field_value)
elif field_name in ["is_targeted", "is_inactive", "target_door_visit", "door_visit", "voted"]:
if field_value.lower() in ['true', '1', 'yes']:
defaults[field_name] = True
elif field_value.lower() in ['false', '0', 'no']:
defaults[field_name] = False
elif field_name == "phone":
defaults[field_name] = format_phone_number(field_value)
elif field_name == "email":
defaults[field_name] = field_value.lower()
else:
defaults[field_name] = field_value
if defaults.get("voted") is True:
defaults["target_door_visit"] = False
defaults["call_queue_status"] = "no_call_required"
voter, created = Voter.objects.update_or_create(
tenant=tenant,
voter_id=voter_id,
defaults=defaults
)
if created: created_count += 1
else: updated_count += 1
except Exception as e:
row["Import Error"] = str(e)
failed_rows.append(row)
errors += 1
from django.db.models import Exists, OuterRef
subquery = Voter.objects.filter(address_street=OuterRef('address_street'), city=OuterRef('city'), state=OuterRef('state'), zip_code=OuterRef('zip_code'), tenant=tenant, is_targeted=True)
Voter.objects.filter(tenant=tenant, door_visit=False, target_door_visit=True).annotate(has_targeted=Exists(subquery)).filter(has_targeted=False).update(target_door_visit=False)
if os.path.exists(file_path): os.remove(file_path)
self.message_user(request, f"Import complete: {created_count + updated_count} voters processed. ({created_count} created, {updated_count} updated, {errors} errors)")
request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
request.session.modified = True
if errors > 0:
error_url = reverse("admin:voter-download-errors")
self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
return redirect("../")
except Exception as e:
self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
return redirect("../")
else:
form = VoterImportForm(request.POST, request.FILES)
if form.is_valid():
csv_file = request.FILES['file']
tenant = form.cleaned_data['tenant']
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
for chunk in csv_file.chunks(): tmp.write(chunk)
file_path = tmp.name
with _read_csv_robust(file_path) as f:
reader = csv.reader(f)
headers = next(reader)
context = self.admin_site.each_context(request)
context.update({
"title": "Map Voter Fields",
"headers": headers,
"model_fields": VOTER_MAPPABLE_FIELDS,
"tenant_id": tenant.id,
"file_path": file_path,
"action_url": request.path,
"opts": self.model._meta,
})
return render(request, "admin/import_mapping.html", context)
else:
form = VoterImportForm()
context = self.admin_site.each_context(request)
context.update({'form': form, 'title': "Import Voters", 'opts': self.model._meta})
return render(request, "admin/import_csv.html", context)