37769-vm/patch_voter_likelihood.py
2026-01-28 21:21:09 +00:00


import sys
file_path = 'core/admin.py'
with open(file_path, 'r') as f:
    lines = f.readlines()
start_line = -1
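# Locate the start of the existing import_likelihoods method, making sure it belongs to VoterLikelihoodAdmin.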
for i, line in enumerate(lines):
    if 'def import_likelihoods(self, request):' in line and 'class VoterLikelihoodAdmin' in lines[i-1]:
        start_line = i
        break
    # Also check if it's just after search_fields or something
    if 'def import_likelihoods(self, request):' in line:
        # Check if we are inside VoterLikelihoodAdmin
        # We can look back for @admin.register(VoterLikelihood)
        j = i
        while j > 0:
            if '@admin.register(VoterLikelihood)' in lines[j]:
                start_line = i
                break
            if '@admin.register' in lines[j] and j < i - 50:  # Too far back
                break
            j -= 1
        if start_line != -1:
            break
if start_line == -1:
print("Could not find import_likelihoods in VoterLikelihoodAdmin")
sys.exit(1)
# Find the end of the method
# The method ends before @admin.register(CampaignSettings)
end_line = -1
for i in range(start_line, len(lines)):
    if '@admin.register(CampaignSettings)' in lines[i]:
        end_line = i
        break
if end_line == -1:
print("Could not find end of import_likelihoods")
sys.exit(1)
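# Replacement method body; it is indented four spaces so it drops straight into the VoterLikelihoodAdmin class.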
new_method = """ def import_likelihoods(self, request):
if request.method == "POST":
if "_preview" in request.POST:
file_path = request.POST.get('file_path')
tenant_id = request.POST.get('tenant')
tenant = Tenant.objects.get(id=tenant_id)
mapping = {k: request.POST.get(f"map_{k}") for k, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS if request.POST.get(f"map_{k}")}
try:
with open(file_path, 'r', encoding='utf-8-sig') as f:
# Fast count and partial preview
total_count = sum(1 for line in f) - 1
f.seek(0)
reader = csv.DictReader(f)
preview_rows = []
voter_ids_for_preview = set()
election_types_for_preview = set()
v_id_col = mapping.get('voter_id')
et_col = mapping.get('election_type')
if not v_id_col or not et_col:
raise ValueError("Missing mapping for Voter ID or Election Type")
for i, row in enumerate(reader):
if i < 10:
preview_rows.append(row)
v_id = row.get(v_id_col)
et_name = row.get(et_col)
if v_id: voter_ids_for_preview.add(str(v_id).strip())
if et_name: election_types_for_preview.add(str(et_name).strip())
else:
break
existing_likelihoods = set(VoterLikelihood.objects.filter(
voter__tenant=tenant,
voter__voter_id__in=voter_ids_for_preview,
election_type__name__in=election_types_for_preview
).values_list("voter__voter_id", "election_type__name"))
preview_data = []
for row in preview_rows:
v_id = str(row.get(v_id_col, '')).strip()
et_name = str(row.get(et_col, '')).strip()
action = "update" if (v_id, et_name) in existing_likelihoods else "create"
preview_data.append({
"action": action,
"identifier": f"Voter: {v_id}, Election: {et_name}",
"details": f"Likelihood: {row.get(mapping.get('likelihood', '')) or ''}"
})
context = self.admin_site.each_context(request)
context.update({
"title": "Import Preview",
"total_count": total_count,
"create_count": "N/A",
"update_count": "N/A",
"preview_data": preview_data,
"mapping": mapping,
"file_path": file_path,
"tenant_id": tenant_id,
"action_url": request.path,
"opts": self.model._meta,
})
return render(request, "admin/import_preview.html", context)
except Exception as e:
self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
return redirect("..")
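            # Second phase: the admin confirmed the preview, so perform the actual import.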
elif "_import" in request.POST:
file_path = request.POST.get('file_path')
tenant_id = request.POST.get('tenant')
tenant = Tenant.objects.get(id=tenant_id)
mapping = {k: request.POST.get(f"map_{k}") for k, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS if request.POST.get(f"map_{k}")}
try:
count = 0
created_count = 0
updated_count = 0
skipped_no_change = 0
skipped_no_id = 0
errors = 0
failed_rows = []
batch_size = 500
likelihood_choices = dict(VoterLikelihood.LIKELIHOOD_CHOICES)
likelihood_reverse = {v.lower(): k for k, v in likelihood_choices.items()}
# Pre-fetch election types for this tenant
election_types = {et.name: et for et in ElectionType.objects.filter(tenant=tenant)}
def chunk_reader(reader, size):
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) == size:
yield chunk
chunk = []
if chunk:
yield chunk
with open(file_path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
v_id_col = mapping.get("voter_id")
et_col = mapping.get("election_type")
l_col = mapping.get("likelihood")
if not v_id_col or not et_col or not l_col:
raise ValueError("Missing mapping for Voter ID, Election Type, or Likelihood")
print(f"DEBUG: Starting likelihood import. Tenant: {tenant.name}")
total_processed = 0
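                        # Stream the CSV in batches of batch_size rows, each committed in its own transaction.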
                        for chunk in chunk_reader(reader, batch_size):
                            with transaction.atomic():
                                voter_ids = [str(row.get(v_id_col)).strip() for row in chunk if row.get(v_id_col)]
                                et_names = [str(row.get(et_col)).strip() for row in chunk if row.get(et_col)]
                                # Fetch existing voters
                                voters = {v.voter_id: v for v in Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids).only("id", "voter_id")}
                                # Fetch existing likelihoods
                                existing_likelihoods = {
                                    (vl.voter.voter_id, vl.election_type.name): vl
                                    for vl in VoterLikelihood.objects.filter(
                                        voter__tenant=tenant,
                                        voter__voter_id__in=voter_ids,
                                        election_type__name__in=et_names
                                    ).select_related("voter", "election_type")
                                }
                                to_create = []
                                to_update = []
                                processed_in_batch = set()
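                                # Per-row work: validate the mapped columns, resolve the voter, and normalize the likelihood value.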
                                for row in chunk:
                                    total_processed += 1
                                    try:
                                        raw_v_id = row.get(v_id_col)
                                        raw_et_name = row.get(et_col)
                                        raw_l_val = row.get(l_col)
                                        if raw_v_id is None or raw_et_name is None or raw_l_val is None:
                                            skipped_no_id += 1
                                            continue
                                        v_id = str(raw_v_id).strip()
                                        et_name = str(raw_et_name).strip()
                                        l_val = str(raw_l_val).strip()
                                        if not v_id or not et_name or not l_val:
                                            skipped_no_id += 1
                                            continue
                                        if (v_id, et_name) in processed_in_batch:
                                            continue
                                        processed_in_batch.add((v_id, et_name))
                                        voter = voters.get(v_id)
                                        if not voter:
                                            print(f"DEBUG: Voter {v_id} not found for likelihood import")
                                            row["Import Error"] = f"Voter {v_id} not found"
                                            failed_rows.append(row)
                                            errors += 1
                                            continue
                                        # Get or create election type
                                        if et_name not in election_types:
                                            election_type, _ = ElectionType.objects.get_or_create(tenant=tenant, name=et_name)
                                            election_types[et_name] = election_type
                                        election_type = election_types[et_name]
                                        # Normalize likelihood
                                        normalized_l = None
                                        l_val_lower = l_val.lower().replace(' ', '_')
                                        if l_val_lower in likelihood_choices:
                                            normalized_l = l_val_lower
                                        elif l_val_lower in likelihood_reverse:
                                            normalized_l = likelihood_reverse[l_val_lower]
                                        else:
                                            # Try to find by display name more broadly
                                            for k, v in likelihood_choices.items():
                                                if v.lower() == l_val.lower():
                                                    normalized_l = k
                                                    break
                                        if not normalized_l:
                                            row["Import Error"] = f"Invalid likelihood value: {l_val}"
                                            failed_rows.append(row)
                                            errors += 1
                                            continue
                                        vl = existing_likelihoods.get((v_id, et_name))
                                        created = False
                                        if not vl:
                                            vl = VoterLikelihood(voter=voter, election_type=election_type, likelihood=normalized_l)
                                            created = True
                                        if not created and vl.likelihood == normalized_l:
                                            skipped_no_change += 1
                                            continue
                                        vl.likelihood = normalized_l
                                        if created:
                                            to_create.append(vl)
                                            created_count += 1
                                        else:
                                            to_update.append(vl)
                                            updated_count += 1
                                        count += 1
                                    except Exception as e:
                                        print(f"DEBUG: Error importing row {total_processed}: {e}")
                                        row["Import Error"] = str(e)
                                        failed_rows.append(row)
                                        errors += 1
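                                # Flush the batch with bulk writes to keep the query count low.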
                                if to_create:
                                    VoterLikelihood.objects.bulk_create(to_create)
                                if to_update:
                                    VoterLikelihood.objects.bulk_update(to_update, ["likelihood"], batch_size=250)
                            print(f"DEBUG: Likelihood import progress: {total_processed} processed. {count} created/updated. {skipped_no_change} skipped (no change). {skipped_no_id} skipped (no ID). {errors} errors.")
                    if os.path.exists(file_path):
                        os.remove(file_path)
                    success_msg = f"Import complete: {count} likelihoods created/updated. ({created_count} new, {updated_count} updated, {skipped_no_change} skipped with no changes, {skipped_no_id} skipped missing data, {errors} errors)"
                    self.message_user(request, success_msg)
                    request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
                    request.session.modified = True
                    if errors > 0:
                        error_url = reverse("admin:voterlikelihood-download-errors")
                        self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
                    return redirect("..")
                except Exception as e:
                    print(f"DEBUG: Likelihood import failed: {e}")
                    self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
                    return redirect("..")
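            # Initial POST: the upload form was submitted, so save the file and show the column-mapping screen.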
            else:
                form = VoterLikelihoodImportForm(request.POST, request.FILES)
                if form.is_valid():
                    csv_file = request.FILES['file']
                    tenant = form.cleaned_data['tenant']
                    if not csv_file.name.endswith('.csv'):
                        self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
                        return redirect("..")
                    with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
                        for chunk in csv_file.chunks():
                            tmp.write(chunk)
                        file_path = tmp.name
                    with open(file_path, 'r', encoding='utf-8-sig') as f:
                        reader = csv.reader(f)
                        headers = next(reader)
                    context = self.admin_site.each_context(request)
                    context.update({
                        'title': "Map Likelihood Fields",
                        'headers': headers,
                        'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS,
                        'tenant_id': tenant.id,
                        'file_path': file_path,
                        'action_url': request.path,
                        'opts': self.model._meta,
                    })
                    return render(request, "admin/import_mapping.html", context)
        else:
            form = VoterLikelihoodImportForm()
        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Likelihoods"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)
"""
lines[start_line:end_line] = [new_method]
with open(file_path, 'w') as f:
    f.writelines(lines)
print(f"Successfully patched {file_path}")