"""One-off patch script for ``core/admin.py``.

Replaces everything from ``def import_likelihoods(self, request):`` inside
``VoterLikelihoodAdmin`` up to (but not including) the
``@admin.register(CampaignSettings)`` decorator with the method source held
in ``NEW_METHOD``.

Run it from the project root::

    python <this script>

Exit status is 1 (with a message on stdout) when the method or its end
marker cannot be located; the target file is left untouched in that case.
"""
import sys

ADMIN_FILE = 'core/admin.py'

# Markers used to locate the span that gets replaced.
METHOD_SIGNATURE = 'def import_likelihoods(self, request):'
CLASS_MARKER = 'class VoterLikelihoodAdmin'
REGISTER_PREFIX = '@admin.register'
OWNER_DECORATOR = '@admin.register(VoterLikelihood)'
END_MARKER = '@admin.register(CampaignSettings)'

# Replacement source for the method, indented for a class body.
#
# The leading backslash keeps the text starting directly on the ``def`` line,
# so re-running the script does not accumulate blank lines above the method.
#
# NOTE(review): the generated method opens and later deletes the path it
# receives in ``request.POST['file_path']``.  That value round-trips through
# the browser, so it is client-controlled -- consider keeping the temp path
# server-side (e.g. in the session) instead.  Left as-is here.
NEW_METHOD = """\
    def import_likelihoods(self, request):
        if request.method == "POST":
            if "_preview" in request.POST:
                file_path = request.POST.get('file_path')
                tenant_id = request.POST.get('tenant')
                tenant = Tenant.objects.get(id=tenant_id)
                mapping = {k: request.POST.get(f"map_{k}") for k, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS if request.POST.get(f"map_{k}")}

                try:
                    with open(file_path, 'r', encoding='utf-8-sig') as f:
                        # Fast count and partial preview
                        total_count = sum(1 for line in f) - 1
                        f.seek(0)
                        reader = csv.DictReader(f)
                        preview_rows = []
                        voter_ids_for_preview = set()
                        election_types_for_preview = set()

                        v_id_col = mapping.get('voter_id')
                        et_col = mapping.get('election_type')

                        if not v_id_col or not et_col:
                            raise ValueError("Missing mapping for Voter ID or Election Type")

                        for i, row in enumerate(reader):
                            if i < 10:
                                preview_rows.append(row)
                                v_id = row.get(v_id_col)
                                et_name = row.get(et_col)
                                if v_id:
                                    voter_ids_for_preview.add(str(v_id).strip())
                                if et_name:
                                    election_types_for_preview.add(str(et_name).strip())
                            else:
                                break

                        existing_likelihoods = set(VoterLikelihood.objects.filter(
                            voter__tenant=tenant,
                            voter__voter_id__in=voter_ids_for_preview,
                            election_type__name__in=election_types_for_preview
                        ).values_list("voter__voter_id", "election_type__name"))

                        preview_data = []
                        for row in preview_rows:
                            v_id = str(row.get(v_id_col, '')).strip()
                            et_name = str(row.get(et_col, '')).strip()
                            action = "update" if (v_id, et_name) in existing_likelihoods else "create"
                            preview_data.append({
                                "action": action,
                                "identifier": f"Voter: {v_id}, Election: {et_name}",
                                "details": f"Likelihood: {row.get(mapping.get('likelihood', '')) or ''}"
                            })

                        context = self.admin_site.each_context(request)
                        context.update({
                            "title": "Import Preview",
                            "total_count": total_count,
                            "create_count": "N/A",
                            "update_count": "N/A",
                            "preview_data": preview_data,
                            "mapping": mapping,
                            "file_path": file_path,
                            "tenant_id": tenant_id,
                            "action_url": request.path,
                            "opts": self.model._meta,
                        })
                        return render(request, "admin/import_preview.html", context)
                except Exception as e:
                    self.message_user(request, f"Error processing preview: {e}", level=messages.ERROR)
                    return redirect("..")

            elif "_import" in request.POST:
                file_path = request.POST.get('file_path')
                tenant_id = request.POST.get('tenant')
                tenant = Tenant.objects.get(id=tenant_id)
                mapping = {k: request.POST.get(f"map_{k}") for k, _ in VOTER_LIKELIHOOD_MAPPABLE_FIELDS if request.POST.get(f"map_{k}")}

                try:
                    count = 0
                    created_count = 0
                    updated_count = 0
                    skipped_no_change = 0
                    skipped_no_id = 0
                    errors = 0
                    failed_rows = []
                    batch_size = 500

                    likelihood_choices = dict(VoterLikelihood.LIKELIHOOD_CHOICES)
                    likelihood_reverse = {v.lower(): k for k, v in likelihood_choices.items()}

                    # Pre-fetch election types for this tenant
                    election_types = {et.name: et for et in ElectionType.objects.filter(tenant=tenant)}

                    def chunk_reader(reader, size):
                        chunk = []
                        for row in reader:
                            chunk.append(row)
                            if len(chunk) == size:
                                yield chunk
                                chunk = []
                        if chunk:
                            yield chunk

                    with open(file_path, "r", encoding="utf-8-sig") as f:
                        reader = csv.DictReader(f)
                        v_id_col = mapping.get("voter_id")
                        et_col = mapping.get("election_type")
                        l_col = mapping.get("likelihood")

                        if not v_id_col or not et_col or not l_col:
                            raise ValueError("Missing mapping for Voter ID, Election Type, or Likelihood")

                        print(f"DEBUG: Starting likelihood import. Tenant: {tenant.name}")

                        total_processed = 0
                        for chunk in chunk_reader(reader, batch_size):
                            with transaction.atomic():
                                voter_ids = [str(row.get(v_id_col)).strip() for row in chunk if row.get(v_id_col)]
                                et_names = [str(row.get(et_col)).strip() for row in chunk if row.get(et_col)]

                                # Fetch existing voters
                                voters = {v.voter_id: v for v in Voter.objects.filter(tenant=tenant, voter_id__in=voter_ids).only("id", "voter_id")}

                                # Fetch existing likelihoods
                                existing_likelihoods = {
                                    (vl.voter.voter_id, vl.election_type.name): vl
                                    for vl in VoterLikelihood.objects.filter(
                                        voter__tenant=tenant,
                                        voter__voter_id__in=voter_ids,
                                        election_type__name__in=et_names
                                    ).select_related("voter", "election_type")
                                }

                                to_create = []
                                to_update = []
                                processed_in_batch = set()

                                for row in chunk:
                                    total_processed += 1
                                    try:
                                        raw_v_id = row.get(v_id_col)
                                        raw_et_name = row.get(et_col)
                                        raw_l_val = row.get(l_col)

                                        if raw_v_id is None or raw_et_name is None or raw_l_val is None:
                                            skipped_no_id += 1
                                            continue

                                        v_id = str(raw_v_id).strip()
                                        et_name = str(raw_et_name).strip()
                                        l_val = str(raw_l_val).strip()

                                        if not v_id or not et_name or not l_val:
                                            skipped_no_id += 1
                                            continue

                                        if (v_id, et_name) in processed_in_batch:
                                            continue
                                        processed_in_batch.add((v_id, et_name))

                                        voter = voters.get(v_id)
                                        if not voter:
                                            print(f"DEBUG: Voter {v_id} not found for likelihood import")
                                            row["Import Error"] = f"Voter {v_id} not found"
                                            failed_rows.append(row)
                                            errors += 1
                                            continue

                                        # Get or create election type
                                        if et_name not in election_types:
                                            election_type, _ = ElectionType.objects.get_or_create(tenant=tenant, name=et_name)
                                            election_types[et_name] = election_type
                                        election_type = election_types[et_name]

                                        # Normalize likelihood
                                        normalized_l = None
                                        l_val_lower = l_val.lower().replace(' ', '_')
                                        if l_val_lower in likelihood_choices:
                                            normalized_l = l_val_lower
                                        elif l_val_lower in likelihood_reverse:
                                            normalized_l = likelihood_reverse[l_val_lower]
                                        else:
                                            # Try to find by display name more broadly
                                            for k, v in likelihood_choices.items():
                                                if v.lower() == l_val.lower():
                                                    normalized_l = k
                                                    break

                                        if not normalized_l:
                                            row["Import Error"] = f"Invalid likelihood value: {l_val}"
                                            failed_rows.append(row)
                                            errors += 1
                                            continue

                                        vl = existing_likelihoods.get((v_id, et_name))
                                        created = False
                                        if not vl:
                                            vl = VoterLikelihood(voter=voter, election_type=election_type, likelihood=normalized_l)
                                            created = True

                                        if not created and vl.likelihood == normalized_l:
                                            skipped_no_change += 1
                                            continue

                                        vl.likelihood = normalized_l

                                        if created:
                                            to_create.append(vl)
                                            created_count += 1
                                        else:
                                            to_update.append(vl)
                                            updated_count += 1

                                        count += 1
                                    except Exception as e:
                                        print(f"DEBUG: Error importing row {total_processed}: {e}")
                                        row["Import Error"] = str(e)
                                        failed_rows.append(row)
                                        errors += 1

                                if to_create:
                                    VoterLikelihood.objects.bulk_create(to_create)
                                if to_update:
                                    VoterLikelihood.objects.bulk_update(to_update, ["likelihood"], batch_size=250)

                            print(f"DEBUG: Likelihood import progress: {total_processed} processed. {count} created/updated. {skipped_no_change} skipped (no change). {skipped_no_id} skipped (no ID). {errors} errors.")

                    if os.path.exists(file_path):
                        os.remove(file_path)

                    success_msg = f"Import complete: {count} likelihoods created/updated. ({created_count} new, {updated_count} updated, {skipped_no_change} skipped with no changes, {skipped_no_id} skipped missing data, {errors} errors)"
                    self.message_user(request, success_msg)

                    request.session[f"{self.model._meta.model_name}_import_errors"] = failed_rows
                    request.session.modified = True
                    if errors > 0:
                        error_url = reverse("admin:voterlikelihood-download-errors")
                        self.message_user(request, mark_safe(f"Failed to import {errors} rows. <a href='{error_url}'>Download failed records</a>"), level=messages.WARNING)
                    return redirect("..")
                except Exception as e:
                    print(f"DEBUG: Likelihood import failed: {e}")
                    self.message_user(request, f"Error processing file: {e}", level=messages.ERROR)
                    return redirect("..")
            else:
                form = VoterLikelihoodImportForm(request.POST, request.FILES)
                if form.is_valid():
                    csv_file = request.FILES['file']
                    tenant = form.cleaned_data['tenant']

                    if not csv_file.name.endswith('.csv'):
                        self.message_user(request, "Please upload a CSV file.", level=messages.ERROR)
                        return redirect("..")

                    with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
                        for chunk in csv_file.chunks():
                            tmp.write(chunk)
                        file_path = tmp.name

                    with open(file_path, 'r', encoding='utf-8-sig') as f:
                        reader = csv.reader(f)
                        headers = next(reader)

                    context = self.admin_site.each_context(request)
                    context.update({
                        'title': "Map Likelihood Fields",
                        'headers': headers,
                        'model_fields': VOTER_LIKELIHOOD_MAPPABLE_FIELDS,
                        'tenant_id': tenant.id,
                        'file_path': file_path,
                        'action_url': request.path,
                        'opts': self.model._meta,
                    })
                    return render(request, "admin/import_mapping.html", context)
        else:
            form = VoterLikelihoodImportForm()

        context = self.admin_site.each_context(request)
        context['form'] = form
        context['title'] = "Import Likelihoods"
        context['opts'] = self.model._meta
        return render(request, "admin/import_csv.html", context)
"""


def belongs_to_likelihood_admin(lines, index):
    """Return True when the ``def`` at ``lines[index]`` sits in VoterLikelihoodAdmin.

    Two signals are accepted: the ``class VoterLikelihoodAdmin`` header is the
    line directly above, or the nearest ``@admin.register`` decorator found
    scanning upwards is ``@admin.register(VoterLikelihood)``.  Stopping at the
    nearest decorator prevents a same-named method in a different registered
    admin class from being attributed to VoterLikelihoodAdmin.
    """
    # Guard index 0: lines[-1] would silently wrap around to the last line.
    if index > 0 and CLASS_MARKER in lines[index - 1]:
        return True
    for j in range(index - 1, -1, -1):
        if OWNER_DECORATOR in lines[j]:
            return True
        if REGISTER_PREFIX in lines[j]:
            # A different admin registration comes first: not our class.
            return False
    return False


def find_method_start(lines):
    """Return the index of the ``import_likelihoods`` def line, or -1 if absent."""
    for i, line in enumerate(lines):
        if METHOD_SIGNATURE in line and belongs_to_likelihood_admin(lines, i):
            return i
    return -1


def find_method_end(lines, start_line):
    """Return the index of the first END_MARKER line at/after ``start_line``, or -1."""
    for i in range(start_line, len(lines)):
        if END_MARKER in lines[i]:
            return i
    return -1


def replace_method(lines, start_line, end_line):
    """Return a new line list with ``lines[start_line:end_line]`` replaced.

    The replaced span runs up to the next decorator and therefore includes the
    blank lines that separated the two classes; two blank lines are re-added
    after the new method so that separation is kept.
    """
    return lines[:start_line] + [NEW_METHOD, "\n", "\n"] + lines[end_line:]


def main():
    """Patch ADMIN_FILE in place; return the process exit status (0 ok, 1 failure)."""
    # Explicit UTF-8 so the rewrite does not depend on the platform locale.
    with open(ADMIN_FILE, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    start_line = find_method_start(lines)
    if start_line == -1:
        print("Could not find import_likelihoods in VoterLikelihoodAdmin")
        return 1

    # The method is assumed to run until the CampaignSettings admin registration.
    end_line = find_method_end(lines, start_line)
    if end_line == -1:
        print("Could not find end of import_likelihoods")
        return 1

    with open(ADMIN_FILE, 'w', encoding='utf-8') as f:
        f.writelines(replace_method(lines, start_line, end_line))

    print(f"Successfully patched {ADMIN_FILE}")
    return 0


if __name__ == "__main__":
    sys.exit(main())