# Patch script: replaces VoterLikelihoodAdmin.import_likelihoods in core/admin.py.
import sys

# Django admin module that this script rewrites in place.
file_path = 'core/admin.py'
with open(file_path, 'r') as f:
    lines = f.readlines()

# Locate the ``def`` line of VoterLikelihoodAdmin.import_likelihoods.
# start_line stays -1 until a definition inside that admin class is found.
start_line = -1
for i, line in enumerate(lines):
    if 'def import_likelihoods(self, request):' not in line:
        continue

    # Easy case: the method sits directly under the class header.
    # The ``i > 0`` guard keeps lines[i - 1] from wrapping to the last line.
    if i > 0 and 'class VoterLikelihoodAdmin' in lines[i - 1]:
        start_line = i
        break

    # Otherwise walk upwards (down to and including line 0) to the closest
    # @admin.register decorator. Only the nearest one decides the match:
    # walking past a different registration could attribute a same-named
    # method of another admin class to VoterLikelihoodAdmin.
    for j in range(i - 1, -1, -1):
        if '@admin.register' in lines[j]:
            if '@admin.register(VoterLikelihood)' in lines[j]:
                start_line = i
            break

    if start_line != -1:
        break

if start_line == -1:
    print("Could not find import_likelihoods in VoterLikelihoodAdmin")
    sys.exit(1)

# Find the end of the method.
# The decorator of the next admin class, @admin.register(CampaignSettings),
# is the hard upper bound of the region that may be replaced; the script
# aborts when that marker is missing.
end_line = -1
for i in range(start_line, len(lines)):
    if '@admin.register(CampaignSettings)' in lines[i]:
        end_line = i
        break

if end_line == -1:
    print("Could not find end of import_likelihoods")
    sys.exit(1)

# Tighten the bound: if another definition or decorator starts at the
# method's own indentation (or shallower) before the marker, the method ends
# there. Without this, sibling methods defined after import_likelihoods
# would be deleted by the splice. Nested helpers are indented deeper than
# the method and therefore never match.
method_indent = len(lines[start_line]) - len(lines[start_line].lstrip())
for i in range(start_line + 1, end_line):
    stripped = lines[i].lstrip()
    indent = len(lines[i]) - len(stripped)
    if indent <= method_indent and stripped.startswith(('def ', 'async def ', 'class ', '@')):
        end_line = i
        break

# Replacement source for the import_likelihoods admin view. The text is
# spliced into the admin module as-is, so it must carry the method-level
# indentation (4 spaces for ``def``, 8 for the body) and end with a newline.
#
# The view it defines has three POST stages plus a GET form:
#   * plain upload  -> stores the CSV in a temp file and shows column mapping
#   * "_preview"    -> shows the first 10 rows with a create/update guess
#   * "_import"     -> imports in batches of 500 rows, one transaction each
#
# NOTE(review): the "_preview" and "_import" stages open -- and "_import"
# finally deletes -- whatever ``file_path`` the POST carries. Nothing in the
# method checks that the path is the temp file created by the upload stage;
# confirm the endpoint is restricted to trusted staff or add a path check.
new_method = """    def import_likelihoods(self, request):
        if request.method == "POST":
            if "_preview" in request.POST:
                file_path = request.POST.get('file_path')
                tenant_id = request.POST.get('tenant')
                tenant = Tenant.objects.get(id=tenant_id)
                mapping = {k: request.POST.get(f"map_{k}") for k, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS if request.POST.get(f"map_{k}")}

                try:
                    with open(file_path, 'r', encoding='utf-8-sig') as f:
                        # Fast count and partial preview
                        total_count = sum(1 for line in f) - 1
                        f.seek(0)
                        reader = csv.DictReader(f)
                        preview_rows = []
                        voter_ids_for_preview = set()
                        election_types_for_preview = set()

                        v_id_col = mapping.get('voter_id')
                        et_col = mapping.get('election_type')

                        if not v_id_col or not et_col:
                            raise ValueError("Missing mapping for Voter ID or Election Type")

                        for i, row in enumerate(reader):
                            if i < 10:
                                preview_rows.append(row)
                                v_id = row.get(v_id_col)
                                et_name = row.get(et_col)
                                if v_id: voter_ids_for_preview.add(str(v_id).strip())
                                if et_name: election_types_for_preview.add(str(et_name).strip())
                            else:
                                break

                    existing_likelihoods = set(VoterLikelihood.objects.filter(
                        voter__tenant=tenant,
                        voter__voter_id__in=voter_ids_for_preview,
                        election_type__name__in=election_types_for_preview
                    ).values_list("voter__voter_id", "election_type__name"))

                    preview_data = []
                    for row in preview_rows:
                        v_id = str(row.get(v_id_col, '')).strip()
                        et_name = str(row.get(et_col, '')).strip()
                        action = "update" if (v_id, et_name) in existing_likelihoods else "create"
                        preview_data.append({
                            "action": action,
                            "identifier": f"Voter: {v_id}, Election: {et_name}",
                            "details": f"Likelihood: {row.get(mapping.get('likelihood', '')) or ''}"
                        })

                    context = self.admin_site.each_context(request)
                    context.update({
                        "title": "Import Preview",
                        "total_count": total_count,
                        "create_count": "N/A",
                        "update_count": "N/A",
                        "preview_data": preview_data,
                        "mapping": mapping,
                        "file_path": file_path,
                        "tenant_id": tenant_id,
                        "action_url": request.path,
                        "opts": self.model._meta,
                    })
                    return render(request, "admin/import_preview.html", context)
                except Exception as e:
                    self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
                    return redirect("..")

            elif "_import" in request.POST:
                file_path = request.POST.get('file_path')
                tenant_id = request.POST.get('tenant')
                tenant = Tenant.objects.get(id=tenant_id)
                mapping = {k: request.POST.get(f"map_{k}") for k, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS if request.POST.get(f"map_{k}")}

                try:
                    count = 0
                    created_count = 0
                    updated_count = 0
                    skipped_no_change = 0
                    skipped_no_id = 0
                    errors = 0
                    failed_rows = []
                    batch_size = 500

                    likelihood_choices = dict(VoterLikelihood.LIKELIHOOD_CHOICES)
                    likelihood_reverse = {v.lower(): k for k, v in likelihood_choices.items()}

                    # Pre-fetch election types for this tenant
                    election_types = {et.name: et for et in ElectionType.objects.filter(tenant=tenant)}

                    def chunk_reader(reader, size):
                        chunk = []
                        for row in reader:
                            chunk.append(row)
                            if len(chunk) == size:
                                yield chunk
                                chunk = []
                        if chunk:
                            yield chunk

                    with open(file_path, "r", encoding="utf-8-sig") as f:
                        reader = csv.DictReader(f)
                        v_id_col = mapping.get("voter_id")
                        et_col = mapping.get("election_type")
                        l_col = mapping.get("likelihood")

                        if not v_id_col or not et_col or not l_col:
                            raise ValueError("Missing mapping for Voter ID, Election Type, or Likelihood")

                        print(f"DEBUG: Starting likelihood import. Tenant: {tenant.name}")

                        total_processed = 0
                        for chunk in chunk_reader(reader, batch_size):
                            with transaction.atomic():
                                voter_ids = [str(row.get(v_id_col)).strip() for row in chunk if row.get(v_id_col)]
                                et_names = [str(row.get(et_col)).strip() for row in chunk if row.get(et_col)]

                                # Fetch existing voters
                                voters = {v.voter_id: v for v in Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids).only("id", "voter_id")}

                                # Fetch existing likelihoods
                                existing_likelihoods = {
                                    (vl.voter.voter_id, vl.election_type.name): vl
                                    for vl in VoterLikelihood.objects.filter(
                                        voter__tenant=tenant,
                                        voter__voter_id__in=voter_ids,
                                        election_type__name__in=et_names
                                    ).select_related("voter", "election_type")
                                }

                                to_create = []
                                to_update = []
                                processed_in_batch = set()

                                for row in chunk:
                                    total_processed += 1
                                    try:
                                        raw_v_id = row.get(v_id_col)
                                        raw_et_name = row.get(et_col)
                                        raw_l_val = row.get(l_col)

                                        if raw_v_id is None or raw_et_name is None or raw_l_val is None:
                                            skipped_no_id += 1
                                            continue

                                        v_id = str(raw_v_id).strip()
                                        et_name = str(raw_et_name).strip()
                                        l_val = str(raw_l_val).strip()

                                        if not v_id or not et_name or not l_val:
                                            skipped_no_id += 1
                                            continue

                                        if (v_id, et_name) in processed_in_batch:
                                            continue
                                        processed_in_batch.add((v_id, et_name))

                                        voter = voters.get(v_id)
                                        if not voter:
                                            print(f"DEBUG: Voter {v_id} not found for likelihood import")
                                            row["Import Error"] = f"Voter {v_id} not found"
                                            failed_rows.append(row)
                                            errors += 1
                                            continue

                                        # Get or create election type
                                        if et_name not in election_types:
                                            election_type, _ = ElectionType.objects.get_or_create(tenant=tenant, name=et_name)
                                            election_types[et_name] = election_type
                                        election_type = election_types[et_name]

                                        # Normalize likelihood
                                        normalized_l = None
                                        l_val_lower = l_val.lower().replace(' ', '_')
                                        if l_val_lower in likelihood_choices:
                                            normalized_l = l_val_lower
                                        elif l_val_lower in likelihood_reverse:
                                            normalized_l = likelihood_reverse[l_val_lower]
                                        else:
                                            # Try to find by display name more broadly
                                            for k, v in likelihood_choices.items():
                                                if v.lower() == l_val.lower():
                                                    normalized_l = k
                                                    break

                                        if not normalized_l:
                                            row["Import Error"] = f"Invalid likelihood value: {l_val}"
                                            failed_rows.append(row)
                                            errors += 1
                                            continue

                                        vl = existing_likelihoods.get((v_id, et_name))
                                        created = False
                                        if not vl:
                                            vl = VoterLikelihood(voter=voter, election_type=election_type, likelihood=normalized_l)
                                            created = True

                                        if not created and vl.likelihood == normalized_l:
                                            skipped_no_change += 1
                                            continue

                                        vl.likelihood = normalized_l

                                        if created:
                                            to_create.append(vl)
                                            created_count += 1
                                        else:
                                            to_update.append(vl)
                                            updated_count += 1

                                        count += 1
                                    except Exception as e:
                                        print(f"DEBUG: Error importing row {total_processed}: {e}")
                                        row["Import Error"] = str(e)
                                        failed_rows.append(row)
                                        errors += 1

                                if to_create:
                                    VoterLikelihood.objects.bulk_create(to_create)
                                if to_update:
                                    VoterLikelihood.objects.bulk_update(to_update, ["likelihood"], batch_size=250)

                            print(f"DEBUG: Likelihood import progress: {total_processed} processed. {count} created/updated. {skipped_no_change} skipped (no change). {skipped_no_id} skipped (no ID). {errors} errors.")

                    if os.path.exists(file_path):
                        os.remove(file_path)

                    success_msg = f"Import complete: {count} likelihoods created/updated. ({created_count} new, {updated_count} updated, {skipped_no_change} skipped with no changes, {skipped_no_id} skipped missing data, {errors} errors)"
                    self.message_user(request, success_msg)

                    request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
                    request.session.modified = True
                    if errors > 0:
                        error_url = reverse("admin:voterlikelihood-download-errors")
                        self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}' download>Download failed records</a>"), level=messages.WARNING)
                    return redirect("..")
                except Exception as e:
                    print(f"DEBUG: Likelihood import failed: {e}")
                    self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
                    return redirect("..")
            else:
                form = VoterLikelihoodImportForm(request.POST, request.FILES)
                if form.is_valid():
                    csv_file = request.FILES['file']
                    tenant = form.cleaned_data['tenant']

                    if not csv_file.name.endswith('.csv'):
                        self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
                        return redirect("..")

                    with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
                        for chunk in csv_file.chunks():
                            tmp.write(chunk)
                        file_path = tmp.name

                    with open(file_path, 'r', encoding='utf-8-sig') as f:
                        reader = csv.reader(f)
                        headers = next(reader)

                    context = self.admin_site.each_context(request)
                    context.update({
                        'title': "Map Likelihood Fields",
                        'headers': headers,
                        'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS,
                        'tenant_id': tenant.id,
                        'file_path': file_path,
                        'action_url': request.path,
                        'opts': self.model._meta,
                    })
                    return render(request, "admin/import_mapping.html", context)
        else:
            form = VoterLikelihoodImportForm()

        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Likelihoods"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)

"""

# Swap the old method's lines (start_line up to, not including, end_line)
# for the single replacement string, then write the module back out.
lines = lines[:start_line] + [new_method] + lines[end_line:]

with open(file_path, 'w') as f:
    f.write("".join(lines))

print(f"Successfully patched {file_path}")