183 lines
9.7 KiB
Python
183 lines
9.7 KiB
Python
import io
|
|
import csv
|
|
import os
|
|
|
|
def _robust_decode(content):
|
|
if not content: return ""
|
|
for enc in ["utf-8-sig", "utf-8", "iso-8859-1", "windows-1252"]:
|
|
try: return content.decode(enc)
|
|
except UnicodeDecodeError: continue
|
|
return content.decode("utf-8", errors="replace")
|
|
|
|
def _read_csv_robust(file_path):
|
|
with open(file_path, "rb") as f:
|
|
return io.StringIO(_robust_decode(f.read()))
|
|
|
|
def import_voters(self, request):
|
|
if request.method == "POST":
|
|
if "_preview" in request.POST:
|
|
file_path = request.POST.get("file_path")
|
|
tenant_id = request.POST.get("tenant")
|
|
tenant = Tenant.objects.get(id=tenant_id)
|
|
|
|
mapping = {}
|
|
for field_name, _ in VOTER_MAPPABLE_FIELDS:
|
|
mapping[field_name] = request.POST.get(f"map_{field_name}")
|
|
|
|
try:
|
|
with _read_csv_robust(file_path) as f:
|
|
total_count = sum(1 for line in f) - 1
|
|
f.seek(0)
|
|
reader = csv.DictReader(f)
|
|
preview_rows = []
|
|
voter_ids_for_preview = []
|
|
for i, row in enumerate(reader):
|
|
if i < 10:
|
|
preview_rows.append(row)
|
|
v_id = row.get(mapping.get("voter_id"))
|
|
if v_id:
|
|
voter_ids_for_preview.append(v_id.strip())
|
|
else:
|
|
break
|
|
|
|
existing_preview_ids = set(Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids_for_preview).values_list("voter_id", flat=True))
|
|
|
|
preview_data = []
|
|
for row in preview_rows:
|
|
voter_id_val = row.get(mapping.get("voter_id"))
|
|
action = "update" if voter_id_val and voter_id_val.strip() in existing_preview_ids else "create"
|
|
preview_data.append({
|
|
"action": action,
|
|
"identifier": f"Voter ID: {voter_id_val}",
|
|
"details": f"Name: {row.get(mapping.get('first_name', ''))} {row.get(mapping.get('last_name', ''))}"
|
|
})
|
|
|
|
context = self.admin_site.each_context(request)
|
|
context.update({
|
|
"title": "Import Preview",
|
|
"total_count": total_count,
|
|
"create_count": sum(1 for d in preview_data if d['action'] == 'create'),
|
|
"update_count": sum(1 for d in preview_data if d['action'] == 'update'),
|
|
"preview_data": preview_data,
|
|
"mapping": mapping,
|
|
"file_path": file_path,
|
|
"tenant_id": tenant_id,
|
|
"action_url": request.path,
|
|
"opts": self.model._meta,
|
|
})
|
|
return render(request, "admin/import_preview.html", context)
|
|
except Exception as e:
|
|
self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
|
|
return redirect("../")
|
|
elif "_import" in request.POST:
|
|
file_path = request.POST.get("file_path")
|
|
tenant_id = request.POST.get("tenant")
|
|
tenant = Tenant.objects.get(id=tenant_id)
|
|
|
|
mapping = {}
|
|
for field_name, _ in VOTER_MAPPABLE_FIELDS:
|
|
mapping[field_name] = request.POST.get(f"map_{field_name}")
|
|
|
|
try:
|
|
created_count = 0
|
|
updated_count = 0
|
|
errors = 0
|
|
failed_rows = []
|
|
|
|
with _read_csv_robust(file_path) as f_read:
|
|
reader = csv.DictReader(f_read)
|
|
for row in reader:
|
|
try:
|
|
raw_voter_id = row.get(mapping.get("voter_id"))
|
|
voter_id = raw_voter_id.strip() if raw_voter_id else None
|
|
|
|
if not voter_id:
|
|
row["Import Error"] = "Voter ID is required."
|
|
failed_rows.append(row)
|
|
errors += 1
|
|
continue
|
|
|
|
defaults = {}
|
|
for field_name, _ in VOTER_MAPPABLE_FIELDS:
|
|
if field_name == "voter_id": continue
|
|
csv_column = mapping.get(field_name)
|
|
if csv_column and csv_column in row:
|
|
field_value = row[csv_column].strip()
|
|
if field_name in ["birthdate", "registration_date"]:
|
|
defaults[field_name] = parse_any_date(field_value)
|
|
elif field_name in ["is_targeted", "is_inactive", "target_door_visit", "door_visit", "voted"]:
|
|
if field_value.lower() in ['true', '1', 'yes']:
|
|
defaults[field_name] = True
|
|
elif field_value.lower() in ['false', '0', 'no']:
|
|
defaults[field_name] = False
|
|
elif field_name == "phone":
|
|
defaults[field_name] = format_phone_number(field_value)
|
|
elif field_name == "email":
|
|
defaults[field_name] = field_value.lower()
|
|
else:
|
|
defaults[field_name] = field_value
|
|
|
|
if defaults.get("voted") is True:
|
|
defaults["target_door_visit"] = False
|
|
defaults["call_queue_status"] = "no_call_required"
|
|
|
|
voter, created = Voter.objects.update_or_create(
|
|
tenant=tenant,
|
|
voter_id=voter_id,
|
|
defaults=defaults
|
|
)
|
|
if created: created_count += 1
|
|
else: updated_count += 1
|
|
|
|
except Exception as e:
|
|
row["Import Error"] = str(e)
|
|
failed_rows.append(row)
|
|
errors += 1
|
|
|
|
from django.db.models import Exists, OuterRef
|
|
subquery = Voter.objects.filter(address_street=OuterRef('address_street'), city=OuterRef('city'), state=OuterRef('state'), zip_code=OuterRef('zip_code'), tenant=tenant, is_targeted=True)
|
|
Voter.objects.filter(tenant=tenant, door_visit=False, target_door_visit=True).annotate(has_targeted=Exists(subquery)).filter(has_targeted=False).update(target_door_visit=False)
|
|
if os.path.exists(file_path): os.remove(file_path)
|
|
self.message_user(request, f"Import complete: {created_count + updated_count} voters processed. ({created_count} created, {updated_count} updated, {errors} errors)")
|
|
request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows[:1000]
|
|
request.session.modified = True
|
|
|
|
if errors > 0:
|
|
error_url = reverse("admin:voter-download-errors")
|
|
self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
|
|
return redirect("../")
|
|
except Exception as e:
|
|
self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
|
|
return redirect("../")
|
|
else:
|
|
form = VoterImportForm(request.POST, request.FILES)
|
|
if form.is_valid():
|
|
csv_file = request.FILES['file']
|
|
tenant = form.cleaned_data['tenant']
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
|
|
for chunk in csv_file.chunks(): tmp.write(chunk)
|
|
file_path = tmp.name
|
|
|
|
with _read_csv_robust(file_path) as f:
|
|
reader = csv.reader(f)
|
|
headers = next(reader)
|
|
|
|
context = self.admin_site.each_context(request)
|
|
context.update({
|
|
"title": "Map Voter Fields",
|
|
"headers": headers,
|
|
"model_fields": VOTER_MAPPABLE_FIELDS,
|
|
"tenant_id": tenant.id,
|
|
"file_path": file_path,
|
|
"action_url": request.path,
|
|
"opts": self.model._meta,
|
|
})
|
|
return render(request, "admin/import_mapping.html", context)
|
|
else:
|
|
form = VoterImportForm()
|
|
|
|
context = self.admin_site.each_context(request)
|
|
context.update({'form': form, 'title': "Import Voters", 'opts': self.model._meta})
|
|
return render(request, "admin/import_csv.html", context)
|