37769-vm/core/views.py
2026-01-30 04:40:56 +00:00

1166 lines
48 KiB
Python

import base64
import re
import urllib.parse
import urllib.request
import csv
import io
from django.http import JsonResponse, HttpResponse
from django.urls import reverse
from django.shortcuts import render, redirect, get_object_or_404
from django.db.models import Q, Sum
from django.contrib import messages
from django.core.paginator import Paginator
from .models import Voter, Tenant, Interaction, Donation, VoterLikelihood, EventParticipation, Event, EventType, InteractionType, DonationMethod, ElectionType, CampaignSettings, Volunteer, ParticipationStatus, VolunteerEvent, Interest
from .forms import VoterForm, InteractionForm, DonationForm, VoterLikelihoodForm, EventParticipationForm, VoterImportForm, AdvancedVoterSearchForm, EventParticipantAddForm, EventForm, VolunteerForm, VolunteerEventForm, VolunteerEventAddForm
import logging
from django.utils import timezone
logger = logging.getLogger(__name__)
def index(request):
    """
    Render the landing page / dashboard of the Grassroots Campaign Manager.

    All campaigns (tenants) are always listed. When the session carries a
    ``tenant_id`` that still resolves to a tenant, headline metrics, the five
    most recent interactions and the next five upcoming events of that
    campaign are added to the template context.
    """
    session_tenant_id = request.session.get('tenant_id')
    active_tenant = None
    if session_tenant_id:
        # .first() yields None for a stale id instead of raising.
        active_tenant = Tenant.objects.filter(id=session_tenant_id).first()

    dashboard = {}
    latest_interactions = []
    next_events = []

    if active_tenant:
        tenant_voters = active_tenant.voters.all()
        raised = Donation.objects.filter(voter__tenant=active_tenant).aggregate(total=Sum('amount'))['total'] or 0

        # Settings rows are created lazily the first time a campaign is viewed.
        campaign_settings, _created = CampaignSettings.objects.get_or_create(tenant=active_tenant)
        goal = campaign_settings.donation_goal
        percent_of_goal = float(round((raised / goal) * 100, 1)) if goal > 0 else 0

        dashboard = {
            'total_registered_voters': tenant_voters.count(),
            'total_target_voters': tenant_voters.filter(is_targeted=True).count(),
            'total_supporting': tenant_voters.filter(candidate_support='supporting').count(),
            'total_target_households': (
                tenant_voters.filter(is_targeted=True)
                .exclude(address='')
                .values('address')
                .distinct()
                .count()
            ),
            'total_door_visits': Interaction.objects.filter(
                voter__tenant=active_tenant, type__name='Door Visit'
            ).count(),
            'total_signs': tenant_voters.filter(Q(yard_sign='wants') | Q(yard_sign='has')).count(),
            'total_window_stickers': tenant_voters.filter(
                Q(window_sticker='wants') | Q(window_sticker='has')
            ).count(),
            'total_donations': float(raised),
            'donation_goal': float(goal),
            'donation_percentage': percent_of_goal,
            'volunteers_count': Volunteer.objects.filter(tenant=active_tenant).count(),
            'interactions_count': Interaction.objects.filter(voter__tenant=active_tenant).count(),
            'events_count': Event.objects.filter(tenant=active_tenant).count(),
        }
        latest_interactions = Interaction.objects.filter(voter__tenant=active_tenant).order_by('-date')[:5]
        next_events = Event.objects.filter(
            tenant=active_tenant, date__gte=timezone.now().date()
        ).order_by('date')[:5]

    return render(request, 'core/index.html', {
        'tenants': Tenant.objects.all(),
        'selected_tenant': active_tenant,
        'metrics': dashboard,
        'recent_interactions': latest_interactions,
        'upcoming_events': next_events,
    })
def select_campaign(request, tenant_id):
    """
    Remember the chosen campaign in the session and return to the dashboard.

    Responds with 404 when ``tenant_id`` does not match a tenant.
    """
    chosen = get_object_or_404(Tenant, id=tenant_id)
    request.session['tenant_id'] = chosen.id
    confirmation = f"You are now managing: {chosen.name}"
    messages.success(request, confirmation)
    return redirect('index')
def voter_list(request):
    """
    Paginated voter listing for the selected campaign.

    Supports the dashboard drill-down flags passed as GET parameters and a
    free-text ``q`` search over first name, last name and voter id. ``q`` may
    also be "Last, First" or "First Last".
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=tenant_pk)

    params = request.GET
    query = params.get("q")
    voters = Voter.objects.filter(tenant=tenant).order_by("last_name", "first_name")

    # (GET parameter, value that switches it on, queryset narrowing) - applied in order.
    quick_filters = (
        ("is_targeted", "true", lambda qs: qs.filter(is_targeted=True)),
        ("support", "supporting", lambda qs: qs.filter(candidate_support="supporting")),
        ("has_address", "true", lambda qs: qs.exclude(address__isnull=True).exclude(address="")),
        ("visited", "true", lambda qs: qs.filter(interactions__type__name="Door Visit").distinct()),
        ("yard_sign", "true", lambda qs: qs.filter(Q(yard_sign="wants") | Q(yard_sign="has"))),
        ("window_sticker", "true", lambda qs: qs.filter(Q(window_sticker="wants") | Q(window_sticker="has"))),
        ("has_donations", "true", lambda qs: qs.filter(donations__isnull=False).distinct()),
    )
    for param, trigger, narrow in quick_filters:
        if params.get(param) == trigger:
            voters = narrow(voters)

    if query:
        query = query.strip()
        criteria = (
            Q(first_name__icontains=query)
            | Q(last_name__icontains=query)
            | Q(voter_id__icontains=query)
        )
        if "," in query:
            # "Last, First": splitting on a comma that is present always gives >= 2 pieces.
            last_part, first_part = [piece.strip() for piece in query.split(",")][:2]
            criteria |= Q(last_name__icontains=last_part, first_name__icontains=first_part)
        elif " " in query:
            # "First Last...": the stripped query has an inner space, so >= 2 tokens exist.
            first_part, *remainder = query.split()
            last_part = " ".join(remainder)
            criteria |= Q(first_name__icontains=first_part, last_name__icontains=last_part)
        voters = voters.filter(criteria).order_by("last_name", "first_name")

    page = Paginator(voters, 50).get_page(params.get('page'))
    return render(request, "core/voter_list.html", {
        "voters": page,
        "query": query,
        "selected_tenant": tenant,
    })
def voter_detail(request, voter_id):
    """
    Show the full profile of one voter of the selected campaign.

    The context carries the voter's related records (voting records,
    donations, interactions, event participations, likelihoods) plus unbound
    forms for editing the voter and adding each kind of related record.
    """
    tenant_pk = request.session.get('tenant_id')
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect('index')

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)

    context = {'voter': voter, 'selected_tenant': tenant}
    # Related records.
    context.update({
        'voting_records': voter.voting_records.all().order_by('-election_date'),
        'donations': voter.donations.all().order_by('-date'),
        'interactions': voter.interactions.all().order_by('-date'),
        'event_participations': voter.event_participations.all().order_by('-event__date'),
        'likelihoods': voter.likelihoods.all(),
    })
    # Forms rendered on the detail page.
    context.update({
        'voter_form': VoterForm(instance=voter),
        'interaction_form': InteractionForm(tenant=tenant),
        'donation_form': DonationForm(tenant=tenant),
        'likelihood_form': VoterLikelihoodForm(tenant=tenant),
        'event_participation_form': EventParticipationForm(tenant=tenant),
    })
    return render(request, 'core/voter_detail.html', context)
def _parse_coordinate(raw_value):
    """Return ``raw_value`` as a finite ``Decimal``, or ``None`` when it is not a usable number."""
    from decimal import Decimal, InvalidOperation

    try:
        parsed = Decimal(str(raw_value).strip())
    except InvalidOperation:
        return None
    # Decimal happily parses "NaN" and "Infinity"; neither is a coordinate.
    return parsed if parsed.is_finite() else None


def voter_edit(request, voter_id):
    """
    Update voter core demographics.

    On POST the submitted ``VoterForm`` is validated and saved. Posted
    ``latitude`` / ``longitude`` values are additionally applied straight to
    the instance; values that are not finite numbers are ignored (and
    logged) instead of being allowed to fail inside ``voter.save()``.
    Validation errors are reported through the messages framework.

    Always redirects to the voter's detail page. Responds with 404 when no
    campaign is selected or the voter is not part of it.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)
    if request.method == 'POST':
        # Log incoming coordinate data for debugging
        lat_raw = request.POST.get('latitude')
        lon_raw = request.POST.get('longitude')
        logger.info("Voter Edit POST: lat=%s, lon=%s", lat_raw, lon_raw)
        form = VoterForm(request.POST, instance=voter)
        if form.is_valid():
            # If coordinates were provided in POST, ensure they are applied to the instance.
            # This handles cases where readonly or other widget settings might interfere.
            voter = form.save(commit=False)
            for attr, raw in (('latitude', lat_raw), ('longitude', lon_raw)):
                if not raw:
                    continue
                coordinate = _parse_coordinate(raw)
                if coordinate is None:
                    logger.warning("Voter Edit: ignoring non-numeric %s %r", attr, raw)
                    continue
                setattr(voter, attr, coordinate)
            voter.save()
            messages.success(request, "Voter profile updated successfully.")
        else:
            logger.warning("Voter Edit Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect('voter_detail', voter_id=voter.id)
def add_interaction(request, voter_id):
    """
    Record a new interaction for a voter of the selected campaign.

    On POST a valid ``InteractionForm`` is saved against the voter; an
    invalid one is reported field by field through the messages framework
    rather than being dropped silently. Always redirects to the voter's
    detail page with the interactions tab active.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)
    if request.method == 'POST':
        form = InteractionForm(request.POST, tenant=tenant)
        if form.is_valid():
            interaction = form.save(commit=False)
            # The voter comes from the URL, not from the submitted form.
            interaction.voter = voter
            interaction.save()
            messages.success(request, "Interaction added.")
        else:
            logger.warning("Add Interaction Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect(reverse('voter_detail', kwargs={'voter_id': voter.id}) + '?active_tab=interactions')
def edit_interaction(request, interaction_id):
    """
    Update an existing interaction belonging to the selected campaign.

    On POST a valid ``InteractionForm`` is saved; an invalid one is reported
    field by field through the messages framework rather than being dropped
    silently. Always redirects to the owning voter's detail page with the
    interactions tab active.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    # Scoping by voter__tenant keeps interactions of other campaigns out of reach.
    interaction = get_object_or_404(Interaction, id=interaction_id, voter__tenant=tenant)
    if request.method == 'POST':
        form = InteractionForm(request.POST, instance=interaction, tenant=tenant)
        if form.is_valid():
            form.save()
            messages.success(request, "Interaction updated.")
        else:
            logger.warning("Edit Interaction Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect(reverse('voter_detail', kwargs={'voter_id': interaction.voter.id}) + '?active_tab=interactions')
def delete_interaction(request, interaction_id):
    """
    Delete an interaction of the selected campaign (POST only).

    Any other method leaves the record untouched. Always redirects to the
    owning voter's detail page with the interactions tab active.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get('tenant_id'))
    target = get_object_or_404(Interaction, id=interaction_id, voter__tenant=tenant)
    # Capture the owner before the row disappears.
    owner_id = target.voter.id
    if request.method == 'POST':
        target.delete()
        messages.success(request, "Interaction deleted.")
    detail_url = reverse('voter_detail', kwargs={'voter_id': owner_id})
    return redirect(detail_url + '?active_tab=interactions')
def add_donation(request, voter_id):
    """
    Record a donation for a voter of the selected campaign.

    On POST a valid ``DonationForm`` is saved against the voter; an invalid
    one is reported field by field through the messages framework rather
    than being dropped silently. Always redirects to the voter's detail page
    with the donations tab active.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)
    if request.method == 'POST':
        form = DonationForm(request.POST, tenant=tenant)
        if form.is_valid():
            donation = form.save(commit=False)
            # The voter comes from the URL, not from the submitted form.
            donation.voter = voter
            donation.save()
            messages.success(request, "Donation recorded.")
        else:
            logger.warning("Add Donation Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect(reverse('voter_detail', kwargs={'voter_id': voter.id}) + '?active_tab=donations')
def edit_donation(request, donation_id):
    """
    Update an existing donation belonging to the selected campaign.

    On POST a valid ``DonationForm`` is saved; an invalid one is reported
    field by field through the messages framework rather than being dropped
    silently. Always redirects to the owning voter's detail page with the
    donations tab active.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    # Scoping by voter__tenant keeps donations of other campaigns out of reach.
    donation = get_object_or_404(Donation, id=donation_id, voter__tenant=tenant)
    if request.method == 'POST':
        form = DonationForm(request.POST, instance=donation, tenant=tenant)
        if form.is_valid():
            form.save()
            messages.success(request, "Donation updated.")
        else:
            logger.warning("Edit Donation Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect(reverse('voter_detail', kwargs={'voter_id': donation.voter.id}) + '?active_tab=donations')
def delete_donation(request, donation_id):
    """
    Delete a donation of the selected campaign (POST only).

    Any other method leaves the record untouched. Always redirects to the
    owning voter's detail page with the donations tab active.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get('tenant_id'))
    target = get_object_or_404(Donation, id=donation_id, voter__tenant=tenant)
    # Capture the owner before the row disappears.
    owner_id = target.voter.id
    if request.method == 'POST':
        target.delete()
        messages.success(request, "Donation deleted.")
    detail_url = reverse('voter_detail', kwargs={'voter_id': owner_id})
    return redirect(detail_url + '?active_tab=donations')
def add_likelihood(request, voter_id):
    """
    Set a voter's likelihood for an election type, replacing any existing one.

    On POST a valid ``VoterLikelihoodForm`` replaces whatever likelihood the
    voter already had for the same election type. The delete and the insert
    run in one transaction so a failed save cannot leave the voter without
    the previous record. Invalid forms are reported through the messages
    framework. Always redirects to the voter's detail page.
    """
    from django.db import transaction

    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)
    if request.method == 'POST':
        form = VoterLikelihoodForm(request.POST, tenant=tenant)
        if form.is_valid():
            likelihood = form.save(commit=False)
            likelihood.voter = voter
            with transaction.atomic():
                # Handle potential duplicate election_type
                VoterLikelihood.objects.filter(voter=voter, election_type=likelihood.election_type).delete()
                likelihood.save()
            messages.success(request, "Likelihood updated.")
        else:
            logger.warning("Add Likelihood Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect('voter_detail', voter_id=voter.id)
def edit_likelihood(request, likelihood_id):
    """
    Update a voter likelihood record of the selected campaign.

    If the edit moves the record onto an election type for which the voter
    already has another likelihood, that other record is removed. The delete
    and the save run in one transaction so a failed save cannot discard the
    conflicting record on its own. Invalid forms are reported through the
    messages framework. Always redirects to the owning voter's detail page.
    """
    from django.db import transaction

    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    likelihood = get_object_or_404(VoterLikelihood, id=likelihood_id, voter__tenant=tenant)
    if request.method == 'POST':
        form = VoterLikelihoodForm(request.POST, instance=likelihood, tenant=tenant)
        if form.is_valid():
            election_type = form.cleaned_data['election_type']
            with transaction.atomic():
                # Deleting an empty queryset is a no-op, so no exists() pre-check is needed.
                VoterLikelihood.objects.filter(
                    voter=likelihood.voter, election_type=election_type
                ).exclude(id=likelihood.id).delete()
                form.save()
            messages.success(request, "Likelihood updated.")
        else:
            logger.warning("Edit Likelihood Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect('voter_detail', voter_id=likelihood.voter.id)
def delete_likelihood(request, likelihood_id):
    """
    Delete a voter likelihood record of the selected campaign (POST only).

    Any other method leaves the record untouched. Always redirects to the
    owning voter's detail page.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get('tenant_id'))
    target = get_object_or_404(VoterLikelihood, id=likelihood_id, voter__tenant=tenant)
    # Capture the owner before the row disappears.
    owner_id = target.voter.id
    if request.method == 'POST':
        target.delete()
        messages.success(request, "Likelihood record deleted.")
    return redirect('voter_detail', voter_id=owner_id)
def add_event_participation(request, voter_id):
    """
    Add a voter of the selected campaign to an event.

    On POST a valid ``EventParticipationForm`` is saved unless the voter
    already participates in that event, in which case a warning is shown.
    Invalid forms are reported field by field through the messages framework
    rather than being dropped silently. Always redirects to the voter's
    detail page with the events tab active.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)
    if request.method == 'POST':
        form = EventParticipationForm(request.POST, tenant=tenant)
        if form.is_valid():
            participation = form.save(commit=False)
            participation.voter = voter
            # Avoid duplicate participation
            if not EventParticipation.objects.filter(voter=voter, event=participation.event).exists():
                participation.save()
                messages.success(request, "Event participation added.")
            else:
                messages.warning(request, "Voter is already participating in this event.")
        else:
            logger.warning("Add Event Participation Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect(reverse('voter_detail', kwargs={'voter_id': voter.id}) + '?active_tab=events')
def edit_event_participation(request, participation_id):
    """
    Update an event participation of the selected campaign.

    On POST a valid ``EventParticipationForm`` is saved unless the change
    would duplicate another participation of the same voter in the chosen
    event, in which case a warning is shown. Invalid forms are reported
    field by field through the messages framework rather than being dropped
    silently. Always redirects to the owning voter's detail page with the
    events tab active.
    """
    selected_tenant_id = request.session.get('tenant_id')
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    participation = get_object_or_404(EventParticipation, id=participation_id, voter__tenant=tenant)
    if request.method == 'POST':
        form = EventParticipationForm(request.POST, instance=participation, tenant=tenant)
        if form.is_valid():
            event = form.cleaned_data['event']
            # Another row for the same voter and event would be a duplicate.
            if EventParticipation.objects.filter(voter=participation.voter, event=event).exclude(id=participation.id).exists():
                messages.warning(request, "Voter is already participating in that event.")
            else:
                form.save()
                messages.success(request, "Event participation updated.")
        else:
            logger.warning("Edit Event Participation Form Invalid: %s", form.errors)
            for field, errors in form.errors.items():
                for error in errors:
                    messages.error(request, f"Error in {field}: {error}")
    return redirect(reverse('voter_detail', kwargs={'voter_id': participation.voter.id}) + '?active_tab=events')
def delete_event_participation(request, participation_id):
    """
    Remove an event participation of the selected campaign (POST only).

    Any other method leaves the record untouched. Always redirects to the
    owning voter's detail page with the events tab active.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get('tenant_id'))
    target = get_object_or_404(EventParticipation, id=participation_id, voter__tenant=tenant)
    # Capture the owner before the row disappears.
    owner_id = target.voter.id
    if request.method == 'POST':
        target.delete()
        messages.success(request, "Event participation removed.")
    detail_url = reverse('voter_detail', kwargs={'voter_id': owner_id})
    return redirect(detail_url + '?active_tab=events')
def voter_geocode(request, voter_id):
    """
    Geocode an address for a voter on demand and answer with JSON.

    Address parts are taken from the POST data, falling back to the values
    stored on the voter. Geocoding runs on a throw-away ``Voter`` instance,
    so nothing is persisted here. Non-POST requests get a JSON error payload.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get('tenant_id'))
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)

    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid request method.'})

    posted = request.POST
    street = posted.get('address_street', voter.address_street)
    city = posted.get('city', voter.city)
    state = posted.get('state', voter.state)
    zip_code = posted.get('zip_code', voter.zip_code)
    # Empty parts are left out of the display address.
    full_address = ", ".join(filter(None, (street, city, state, zip_code)))

    # Unsaved instance: the real voter is only updated when the user saves the modal.
    probe = Voter(
        address_street=street,
        city=city,
        state=state,
        zip_code=zip_code,
        address=full_address,
    )
    success, error_msg = probe.geocode_address()
    if not success:
        return JsonResponse({
            'success': False,
            'error': f"Geocoding failed: {error_msg or 'No results found.'}"
        })
    return JsonResponse({
        'success': True,
        'latitude': str(probe.latitude),
        'longitude': str(probe.longitude),
        'address': full_address
    })
def voter_advanced_search(request):
    """
    Multi-criteria voter search for the selected campaign.

    The GET parameters are validated with ``AdvancedVoterSearchForm``; every
    non-empty field narrows the result set. Results are paginated, 50 per
    page. An invalid form leaves the campaign's voters unfiltered.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=tenant_pk)

    matches = Voter.objects.filter(tenant=tenant).order_by("last_name", "first_name")
    form = AdvancedVoterSearchForm(request.GET)

    # (form field, ORM lookup) in the order the filters are applied.
    field_lookups = (
        ('first_name', 'first_name__icontains'),
        ('last_name', 'last_name__icontains'),
        ('voter_id', 'voter_id__icontains'),
        ('birth_month', 'birthdate__month'),
        ('city', 'city__icontains'),
        ('zip_code', 'zip_code__icontains'),
        ('district', 'district__icontains'),
        ('precinct', 'precinct__icontains'),
        ('phone_type', 'phone_type'),
        ('is_targeted', 'is_targeted'),
        ('candidate_support', 'candidate_support'),
        ('yard_sign', 'yard_sign'),
        ('window_sticker', 'window_sticker'),
    )
    if form.is_valid():
        cleaned = form.cleaned_data
        for field, lookup in field_lookups:
            value = cleaned.get(field)
            if not value:
                continue
            if field == 'is_targeted':
                # A ticked box always means "targeted only".
                value = True
            matches = matches.filter(**{lookup: value})

    page = Paginator(matches, 50).get_page(request.GET.get('page'))
    return render(request, 'core/voter_advanced_search.html', {
        'form': form,
        'voters': page,
        'selected_tenant': tenant,
    })
def export_voters_csv(request):
    """
    Exports selected or filtered voters to a CSV file.

    POST only; other methods redirect to the advanced search. With
    ``action=export_selected`` the voters listed in ``selected_voters`` are
    exported. Otherwise the search filters posted as ``filter_<field>``
    hidden inputs are validated with ``AdvancedVoterSearchForm`` and
    re-applied. Only voters of the selected campaign are ever exported.

    Returns a ``text/csv`` attachment named with the current timestamp.
    """
    selected_tenant_id = request.session.get("tenant_id")
    if not selected_tenant_id:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    if request.method != 'POST':
        return redirect('voter_advanced_search')

    action = request.POST.get('action')
    voters = Voter.objects.filter(tenant=tenant)

    if action == 'export_selected':
        voter_ids = request.POST.getlist('selected_voters')
        voters = voters.filter(id__in=voter_ids)
    else:  # export_all
        # Re-apply filters from hidden inputs named filter_<fieldname>. Only the
        # leading prefix is stripped; str.replace() would also remove any later
        # occurrence of "filter_" inside the field name.
        prefix = 'filter_'
        filters = {
            key[len(prefix):]: value
            for key, value in request.POST.items()
            if key.startswith(prefix) and value
        }
        # (form field, ORM lookup) in the order the filters are applied.
        field_lookups = (
            ('first_name', 'first_name__icontains'),
            ('last_name', 'last_name__icontains'),
            ('voter_id', 'voter_id__icontains'),
            ('birth_month', 'birthdate__month'),
            ('city', 'city__icontains'),
            ('zip_code', 'zip_code__icontains'),
            ('district', 'district__icontains'),
            ('precinct', 'precinct__icontains'),
            ('phone_type', 'phone_type'),
            ('is_targeted', 'is_targeted'),
            ('candidate_support', 'candidate_support'),
            ('yard_sign', 'yard_sign'),
            ('window_sticker', 'window_sticker'),
        )
        form = AdvancedVoterSearchForm(filters)
        if form.is_valid():
            data = form.cleaned_data
            for field, lookup in field_lookups:
                value = data.get(field)
                if not value:
                    continue
                if field == 'is_targeted':
                    value = True
                voters = voters.filter(**{lookup: value})

    voters = voters.order_by('last_name', 'first_name')

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="voters_export_{timezone.now().strftime("%Y%m%d_%H%M%S")}.csv"'
    # NOTE(review): cell values are written verbatim; if exports are opened in a
    # spreadsheet, consider neutralising cells that start with "=", "+", "-" or "@".
    writer = csv.writer(response)
    writer.writerow([
        'Voter ID', 'First Name', 'Last Name', 'Nickname', 'Birthdate',
        'Address', 'City', 'State', 'Zip Code', 'Phone', 'Phone Type', 'Secondary Phone', 'Secondary Phone Type', 'Email',
        'District', 'Precinct', 'Is Targeted', 'Support', 'Yard Sign', 'Window Sticker', 'Notes'
    ])
    # iterator() streams rows instead of caching the whole result set on the queryset.
    for voter in voters.iterator():
        writer.writerow([
            voter.voter_id, voter.first_name, voter.last_name, voter.nickname, voter.birthdate,
            voter.address, voter.city, voter.state, voter.zip_code, voter.phone, voter.get_phone_type_display(), voter.secondary_phone, voter.get_secondary_phone_type_display(), voter.email,
            voter.district, voter.precinct, 'Yes' if voter.is_targeted else 'No',
            voter.get_candidate_support_display(), voter.get_yard_sign_display(), voter.get_window_sticker_display(), voter.notes
        ])
    return response
def voter_delete(request, voter_id):
    """
    Delete a voter profile of the selected campaign (POST only).

    A POST deletes the voter and redirects to the voter list; any other
    method redirects back to the voter's detail page without changes.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get('tenant_id'))
    voter = get_object_or_404(Voter, id=voter_id, tenant=tenant)

    if request.method != 'POST':
        return redirect('voter_detail', voter_id=voter.id)

    voter.delete()
    messages.success(request, "Voter profile deleted successfully.")
    return redirect('voter_list')
def _parse_client_time(raw_value):
    """Parse an ISO-8601 client timestamp into an aware datetime; ``None`` if unusable."""
    from datetime import datetime

    text = raw_value.strip()
    # JavaScript's Date.toISOString() ends in "Z", which datetime.fromisoformat()
    # only understands from Python 3.11 on; spell the UTC offset out instead.
    if text[-1:] in ('Z', 'z'):
        text = text[:-1] + '+00:00'
    try:
        parsed = datetime.fromisoformat(text)
        if timezone.is_naive(parsed):
            parsed = timezone.make_aware(parsed)
    except Exception as e:
        # Best effort: the caller falls back to the server time.
        logger.warning('Failed to parse client_time %s: %s', raw_value, e)
        return None
    return parsed


def _to_us_e164(raw_phone):
    """Format a phone number as E.164 assuming US (+1); ``None`` if it has the wrong length."""
    digits = re.sub(r'\D', '', str(raw_phone))
    if len(digits) == 10:
        return f"+1{digits}"
    if len(digits) == 11 and digits.startswith('1'):
        return f"+{digits}"
    return None


def _post_twilio_message(url, auth_header, to_number, from_number, body):
    """POST one message to the Twilio Messages endpoint; ``True`` on HTTP 200/201."""
    payload = urllib.parse.urlencode({
        'To': to_number,
        'From': from_number,
        'Body': body
    }).encode()
    req = urllib.request.Request(url, data=payload, method='POST')
    req.add_header("Authorization", f"Basic {auth_header}")
    with urllib.request.urlopen(req, timeout=10) as response:
        return response.status in (200, 201)


def bulk_send_sms(request):
    """
    Sends bulk SMS to selected voters using Twilio API.

    POST only. Requires a selected campaign whose settings hold a complete
    Twilio configuration, a non-empty ``message_body`` and at least one
    selected voter of the campaign with a non-empty cell phone number.
    Each successful send is logged as an "SMS Text" interaction dated with
    the optional ``client_time`` (ISO-8601), or the server time otherwise.

    Always redirects: to the index when no campaign is selected, otherwise
    to the advanced voter search with a message describing the outcome.
    """
    if request.method != 'POST':
        return redirect('voter_advanced_search')
    selected_tenant_id = request.session.get("tenant_id")
    if not selected_tenant_id:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=selected_tenant_id)

    campaign_settings = getattr(tenant, 'settings', None)
    if not campaign_settings:
        messages.error(request, "Campaign settings not found.")
        return redirect('voter_advanced_search')
    account_sid = campaign_settings.twilio_account_sid
    auth_token = campaign_settings.twilio_auth_token
    from_number = campaign_settings.twilio_from_number
    if not account_sid or not auth_token or not from_number:
        messages.error(request, "Twilio configuration is incomplete in Campaign Settings.")
        return redirect('voter_advanced_search')

    voter_ids = request.POST.getlist('selected_voters')
    message_body = request.POST.get('message_body')
    client_time_str = request.POST.get('client_time')

    interaction_date = timezone.now()
    if client_time_str:
        interaction_date = _parse_client_time(client_time_str) or interaction_date

    if not message_body:
        messages.error(request, "Message body cannot be empty.")
        return redirect('voter_advanced_search')

    voters = Voter.objects.filter(tenant=tenant, id__in=voter_ids, phone_type='cell').exclude(phone='')
    if not voters.exists():
        messages.warning(request, "No voters with a valid cell phone number were selected.")
        return redirect('voter_advanced_search')

    success_count = 0
    fail_count = 0
    auth_header = base64.b64encode(f"{account_sid}:{auth_token}".encode()).decode()
    url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    # Get or create interaction type for SMS
    interaction_type, _ = InteractionType.objects.get_or_create(tenant=tenant, name="SMS Text")

    for voter in voters:
        to_number = _to_us_e164(voter.phone)
        if to_number is None:
            # Skip invalid phone numbers
            fail_count += 1
            continue

        try:
            delivered = _post_twilio_message(url, auth_header, to_number, from_number, message_body)
        except Exception as e:
            # One bad recipient or a network hiccup must not abort the batch.
            logger.error("Error sending SMS to %s: %s", voter.phone, e)
            delivered = False

        if not delivered:
            fail_count += 1
            continue

        success_count += 1
        # Logging happens outside the send's try block so that a failure here
        # no longer counts an already-delivered message as failed as well.
        try:
            Interaction.objects.create(
                voter=voter,
                type=interaction_type,
                date=interaction_date,
                description='Mass SMS Text',
                notes=message_body
            )
        except Exception as e:
            logger.error("SMS sent to %s but logging the interaction failed: %s", voter.phone, e)

    messages.success(request, f"Bulk SMS process completed: {success_count} successful, {fail_count} failed/skipped.")
    return redirect('voter_advanced_search')
def event_list(request):
    """
    List the events of the selected campaign, newest date first.

    Redirects to the index with a warning when no campaign is selected.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    campaign_events = Event.objects.filter(tenant=tenant).order_by('-date')
    return render(request, 'core/event_list.html', {
        'tenant': tenant,
        'events': campaign_events,
        'selected_tenant': tenant,
    })
def event_detail(request, event_id):
    """
    Show one event of the selected campaign.

    The context carries the event's participants and assigned volunteers
    (both ordered by last then first name), the campaign's active
    participation statuses, and unbound forms for adding a participant or a
    volunteer.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    event = get_object_or_404(Event, id=event_id, tenant=tenant)

    participant_rows = (
        event.participations.all()
        .select_related('voter', 'participation_status')
        .order_by('voter__last_name', 'voter__first_name')
    )
    volunteer_rows = (
        event.volunteer_assignments.all()
        .select_related('volunteer')
        .order_by('volunteer__last_name', 'volunteer__first_name')
    )

    context = {
        'tenant': tenant,
        'selected_tenant': tenant,
        'event': event,
        'participations': participant_rows,
        'volunteers': volunteer_rows,
        # Unbound forms for the "add participant" / "add volunteer" widgets.
        'add_form': EventParticipantAddForm(tenant=tenant),
        'add_volunteer_form': VolunteerEventAddForm(tenant=tenant),
        'participation_statuses': ParticipationStatus.objects.filter(tenant=tenant, is_active=True),
    }
    return render(request, 'core/event_detail.html', context)
def event_add_participant(request, event_id):
    """
    Add a voter to an event of the selected campaign (POST only).

    A voter who already participates is not added twice; the outcome is
    reported through the messages framework. Always redirects to the event's
    detail page.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get("tenant_id"))
    event = get_object_or_404(Event, id=event_id, tenant=tenant)

    if request.method == 'POST':
        form = EventParticipantAddForm(request.POST, tenant=tenant)
        if not form.is_valid():
            messages.error(request, "Error adding participant.")
        else:
            new_row = form.save(commit=False)
            new_row.event = event
            already_there = EventParticipation.objects.filter(event=event, voter=new_row.voter).exists()
            if already_there:
                messages.warning(request, "Voter is already a participant.")
            else:
                new_row.save()
                messages.success(request, f"{new_row.voter} added to event.")

    return redirect('event_detail', event_id=event.id)
def event_edit_participant(request, participation_id):
    """
    Change the participation status of an event participant (POST only).

    The posted ``participation_status`` must be a status of the selected
    campaign (404 otherwise); a missing value is reported as an error.
    Always redirects to the event's detail page.
    """
    tenant = get_object_or_404(Tenant, id=request.session.get("tenant_id"))
    row = get_object_or_404(EventParticipation, id=participation_id, event__tenant=tenant)

    if request.method == 'POST':
        posted_status_id = request.POST.get('participation_status')
        if not posted_status_id:
            messages.error(request, "Invalid status.")
        else:
            row.participation_status = get_object_or_404(
                ParticipationStatus, id=posted_status_id, tenant=tenant
            )
            row.save()
            messages.success(request, f"Participation updated for {row.voter}.")

    return redirect('event_detail', event_id=row.event.id)
def event_delete_participant(request, participation_id):
    """
    Remove a participant from an event of the selected campaign.

    Redirects to the event's detail page afterwards.
    """
    # NOTE(review): the record is deleted whatever the HTTP method is, GET
    # included, whereas delete_event_participation in this module requires
    # POST - confirm whether that is intended.
    tenant = get_object_or_404(Tenant, id=request.session.get("tenant_id"))
    row = get_object_or_404(EventParticipation, id=participation_id, event__tenant=tenant)

    # Read everything needed for the response before the row is gone.
    parent_event_id = row.event.id
    removed_name = str(row.voter)

    row.delete()
    messages.success(request, f"{removed_name} removed from event.")
    return redirect('event_detail', event_id=parent_event_id)
def voter_search_json(request):
    """
    JSON voter lookup used by autocomplete / search widgets.

    Returns ``{"results": [...]}`` with at most 20 voters of the selected
    campaign whose first name, last name or voter id contains ``q`` (a
    "Last, First" query is also matched). The list is empty when no campaign
    is selected or ``q`` is shorter than two characters.
    """
    nothing = {"results": []}

    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        return JsonResponse(nothing)

    query = request.GET.get("q", "").strip()
    if len(query) < 2:
        return JsonResponse(nothing)

    tenant = get_object_or_404(Tenant, id=tenant_pk)

    criteria = (
        Q(first_name__icontains=query)
        | Q(last_name__icontains=query)
        | Q(voter_id__icontains=query)
    )
    if "," in query:
        # Splitting on a comma that is present always yields at least two pieces.
        last_part, first_part = [piece.strip() for piece in query.split(",")][:2]
        criteria |= Q(last_name__icontains=last_part, first_name__icontains=first_part)

    matches = (
        Voter.objects.filter(tenant=tenant)
        .filter(criteria)
        .order_by("last_name", "first_name")[:20]
    )
    payload = [
        {
            "id": match.id,
            "text": f"{match.last_name}, {match.first_name} ({match.voter_id})",
            "address": match.address,
            "phone": match.phone
        }
        for match in matches
    ]
    return JsonResponse({"results": payload})
def volunteer_list(request):
    """
    Paginated volunteer listing for the selected campaign.

    Optional GET parameters: ``q`` (matches first name, last name or email)
    and ``interest`` (id of an interest the volunteer must have).
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=tenant_pk)

    params = request.GET
    roster = Volunteer.objects.filter(tenant=tenant).order_by('last_name', 'first_name')

    # Simple search
    query = params.get("q")
    if query:
        name_or_email = (
            Q(first_name__icontains=query)
            | Q(last_name__icontains=query)
            | Q(email__icontains=query)
        )
        roster = roster.filter(name_or_email)

    # Interest filter
    interest_id = params.get("interest")
    if interest_id:
        roster = roster.filter(interests__id=interest_id)

    page = Paginator(roster, 50).get_page(params.get('page'))
    return render(request, 'core/volunteer_list.html', {
        'tenant': tenant,
        'selected_tenant': tenant,
        'volunteers': page,
        'query': query,
        'interests': Interest.objects.filter(tenant=tenant).order_by('name'),
        'selected_interest': interest_id,
    })
def volunteer_add(request):
    """
    Create a volunteer for the selected campaign.

    GET renders an empty ``VolunteerForm``. A valid POST saves the volunteer
    (including many-to-many data) and redirects to the volunteer's detail
    page; an invalid POST re-renders the bound form.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=tenant_pk)

    if request.method != 'POST':
        form = VolunteerForm(tenant=tenant)
    else:
        form = VolunteerForm(request.POST, tenant=tenant)
        if form.is_valid():
            new_volunteer = form.save(commit=False)
            new_volunteer.tenant = tenant
            new_volunteer.save()
            # commit=False defers many-to-many data (interests) until now.
            form.save_m2m()
            messages.success(request, f"Volunteer {new_volunteer} added successfully.")
            return redirect('volunteer_detail', volunteer_id=new_volunteer.id)

    # The detail template doubles as the "add" page.
    return render(request, 'core/volunteer_detail.html', {
        'form': form,
        'tenant': tenant,
        'selected_tenant': tenant,
    })
def volunteer_detail(request, volunteer_id):
    """
    Show and edit one volunteer of the selected campaign.

    A valid POST saves the ``VolunteerForm`` and redirects back to this
    page; otherwise the page is rendered with the (bound or unbound) form,
    the volunteer's event assignments and an unbound assignment form.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")
    tenant = get_object_or_404(Tenant, id=tenant_pk)
    volunteer = get_object_or_404(Volunteer, id=volunteer_id, tenant=tenant)

    if request.method != 'POST':
        form = VolunteerForm(instance=volunteer, tenant=tenant)
    else:
        form = VolunteerForm(request.POST, instance=volunteer, tenant=tenant)
        if form.is_valid():
            form.save()
            messages.success(request, f"Volunteer {volunteer} updated successfully.")
            return redirect('volunteer_detail', volunteer_id=volunteer.id)

    return render(request, 'core/volunteer_detail.html', {
        'volunteer': volunteer,
        'form': form,
        'assignments': volunteer.event_assignments.all().select_related('event'),
        'assign_form': VolunteerEventForm(tenant=tenant),
        'tenant': tenant,
        'selected_tenant': tenant,
    })
def volunteer_delete(request, volunteer_id):
    """
    Delete a volunteer of the session's campaign.

    Only a POST performs the deletion (then redirects to the volunteer
    list); any other method redirects back to the volunteer's detail page
    without changing anything.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    # Scoping the lookup by tenant returns 404 for other campaigns' volunteers.
    volunteer = get_object_or_404(Volunteer, id=volunteer_id, tenant=tenant)

    if request.method != 'POST':
        return redirect('volunteer_detail', volunteer_id=volunteer_id)

    volunteer.delete()
    messages.success(request, "Volunteer deleted.")
    return redirect('volunteer_list')
def volunteer_assign_event(request, volunteer_id):
    """
    Assign a volunteer of the session's campaign to an event.

    On POST the VolunteerEventForm is validated; a valid form creates the
    assignment unless the volunteer is already assigned to the chosen
    event, in which case a warning is flashed instead (the same rule the
    event-side ``event_add_volunteer`` view applies). Every request,
    whatever its method or outcome, ends with a redirect to the
    volunteer's detail page.
    """
    selected_tenant_id = request.session.get("tenant_id")
    if not selected_tenant_id:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    volunteer = get_object_or_404(Volunteer, id=volunteer_id, tenant=tenant)

    if request.method == 'POST':
        form = VolunteerEventForm(request.POST, tenant=tenant)
        if form.is_valid():
            # The volunteer comes from the URL, not the form.
            assignment = form.save(commit=False)
            assignment.volunteer = volunteer
            # Avoid a second row for the same (event, volunteer) pair when
            # the form is submitted twice.
            already_assigned = VolunteerEvent.objects.filter(
                event=assignment.event,
                volunteer=volunteer,
            ).exists()
            if already_assigned:
                messages.warning(request, "Volunteer is already assigned to this event.")
            else:
                assignment.save()
                messages.success(request, f"Assigned to {assignment.event}.")
        else:
            messages.error(request, "Error assigning to event.")

    return redirect('volunteer_detail', volunteer_id=volunteer.id)
def volunteer_remove_event(request, assignment_id):
    """
    Remove an event assignment and return to the volunteer's detail page.

    The assignment is looked up through its volunteer's tenant, so an
    assignment of another campaign yields a 404.
    """
    # NOTE(review): the deletion below runs for any HTTP method, unlike
    # volunteer_delete which requires POST - confirm this is intended.
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    assignment = get_object_or_404(
        VolunteerEvent,
        id=assignment_id,
        volunteer__tenant=tenant,
    )

    # Remember where to redirect before the row is deleted.
    owner_id = assignment.volunteer.id
    assignment.delete()

    messages.success(request, "Assignment removed.")
    return redirect('volunteer_detail', volunteer_id=owner_id)
def interest_add(request):
    """
    JSON endpoint that creates an Interest for the session's campaign.

    Responds with ``{'success': True, 'id': ..., 'name': ...}`` when a new
    interest was created, and with ``{'success': False, 'error': ...}``
    when no campaign is selected, the request is not a POST with a
    non-blank ``name``, or an interest with that name already exists.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        return JsonResponse({'success': False, 'error': 'No campaign selected.'})

    tenant = get_object_or_404(Tenant, id=tenant_pk)

    # Non-POST requests and blank names share the same generic error.
    label = ''
    if request.method == 'POST':
        label = request.POST.get('name', '').strip()
    if not label:
        return JsonResponse({'success': False, 'error': 'Invalid request.'})

    interest, was_created = Interest.objects.get_or_create(tenant=tenant, name=label)
    if not was_created:
        return JsonResponse({'success': False, 'error': 'Interest already exists.'})
    return JsonResponse({'success': True, 'id': interest.id, 'name': interest.name})
def interest_delete(request, interest_id):
    """
    JSON endpoint that deletes an Interest of the session's campaign.

    The interest is resolved (404 if it is missing or belongs to another
    tenant) before the method is checked; only a POST deletes it.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        return JsonResponse({'success': False, 'error': 'No campaign selected.'})

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    interest = get_object_or_404(Interest, id=interest_id, tenant=tenant)

    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid request.'})

    interest.delete()
    return JsonResponse({'success': True})
def event_create(request):
    """
    Create a new event for the campaign stored in the session.

    A valid POST saves the event under the current tenant and redirects to
    its detail page. Any other request (or an invalid POST) renders the
    event edit template with ``is_create`` set to True.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_pk)

    submitted = request.method == "POST"
    if submitted:
        form = EventForm(request.POST, tenant=tenant)
    else:
        form = EventForm(tenant=tenant)

    if submitted and form.is_valid():
        # The tenant is assigned here rather than taken from the form.
        new_event = form.save(commit=False)
        new_event.tenant = tenant
        new_event.save()
        messages.success(request, "Event created successfully.")
        return redirect("event_detail", event_id=new_event.id)

    return render(
        request,
        "core/event_edit.html",
        {
            "form": form,
            "tenant": tenant,
            "selected_tenant": tenant,
            "is_create": True,
        },
    )
def event_edit(request, event_id):
    """
    Edit an existing event of the session's campaign.

    The event is looked up by id AND tenant (404 otherwise). A valid POST
    saves the form and redirects to the event detail page; otherwise the
    edit template is rendered with the bound or pre-filled form.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_pk)
    event = get_object_or_404(Event, id=event_id, tenant=tenant)

    if request.method != 'POST':
        form = EventForm(instance=event, tenant=tenant)
    else:
        form = EventForm(request.POST, instance=event, tenant=tenant)
        if form.is_valid():
            form.save()
            messages.success(request, "Event updated successfully.")
            return redirect('event_detail', event_id=event.id)

    return render(
        request,
        'core/event_edit.html',
        {
            'form': form,
            'event': event,
            'tenant': tenant,
            'selected_tenant': tenant,
        },
    )
def volunteer_search_json(request):
    """
    JSON endpoint for volunteer search, used by autocomplete/search UI.

    Reads the search term from the ``q`` query parameter and matches it
    case-insensitively against first name, last name and email of the
    current tenant's volunteers. Returns at most 20 matches ordered by
    last then first name, each as ``{"id", "text", "phone"}``. An empty
    result list is returned when no campaign is selected or the trimmed
    term is shorter than two characters.
    """
    tenant_pk = request.session.get("tenant_id")
    if not tenant_pk:
        return JsonResponse({"results": []})

    term = request.GET.get("q", "").strip()
    if len(term) < 2:
        return JsonResponse({"results": []})

    tenant = get_object_or_404(Tenant, id=tenant_pk)

    name_or_email = (
        Q(first_name__icontains=term)
        | Q(last_name__icontains=term)
        | Q(email__icontains=term)
    )
    matches = (
        Volunteer.objects.filter(tenant=tenant)
        .filter(name_or_email)
        .order_by("last_name", "first_name")[:20]
    )

    payload = [
        {
            "id": person.id,
            "text": f"{person.first_name} {person.last_name} ({person.email})",
            "phone": person.phone,
        }
        for person in matches
    ]
    return JsonResponse({"results": payload})
def event_add_volunteer(request, event_id):
    """
    Add a volunteer to an event of the session's campaign.

    When no campaign is selected the user is redirected to the index with
    a warning, matching the other tenant-scoped views. On POST the
    VolunteerEventAddForm is validated; a valid form creates the
    assignment unless that volunteer is already assigned to the event.
    Every other outcome flashes a message, and the request always ends
    with a redirect to the event's detail page.
    """
    tenant_id = request.session.get("tenant_id")
    if not tenant_id:
        # Without this guard a missing session value fell through to
        # get_object_or_404 and produced a bare 404 page.
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_id)
    event = get_object_or_404(Event, id=event_id, tenant=tenant)

    if request.method == 'POST':
        form = VolunteerEventAddForm(request.POST, tenant=tenant)
        if form.is_valid():
            # The event comes from the URL; the form supplies the volunteer.
            assignment = form.save(commit=False)
            assignment.event = event
            if not VolunteerEvent.objects.filter(event=event, volunteer=assignment.volunteer).exists():
                assignment.save()
                messages.success(request, f"{assignment.volunteer} added as volunteer.")
            else:
                messages.warning(request, "Volunteer is already assigned to this event.")
        else:
            messages.error(request, "Error adding volunteer.")

    return redirect('event_detail', event_id=event.id)
def event_remove_volunteer(request, assignment_id):
    """
    Remove a volunteer assignment from an event and return to the event page.

    When no campaign is selected the user is redirected to the index with
    a warning, matching the other tenant-scoped views. The assignment is
    looked up through its event's tenant, so an assignment of another
    campaign yields a 404.
    """
    # NOTE(review): the deletion below runs for any HTTP method - confirm
    # whether it should be restricted to POST like volunteer_delete.
    tenant_id = request.session.get("tenant_id")
    if not tenant_id:
        # Without this guard a missing session value fell through to
        # get_object_or_404 and produced a bare 404 page.
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=tenant_id)
    assignment = get_object_or_404(VolunteerEvent, id=assignment_id, event__tenant=tenant)

    # Capture what the redirect and message need before the row is deleted.
    event_id = assignment.event.id
    volunteer_name = str(assignment.volunteer)
    assignment.delete()

    messages.success(request, f"{volunteer_name} removed from event volunteers.")
    return redirect('event_detail', event_id=event_id)
def _format_us_e164(raw_phone):
    """Return raw_phone as a US E.164 number (+1 and ten digits), or None if it cannot be normalized."""
    digits = re.sub(r'\D', '', str(raw_phone))
    if len(digits) == 10:
        return f"+1{digits}"
    if len(digits) == 11 and digits.startswith('1'):
        return f"+{digits}"
    return None


def _send_twilio_sms(url, auth_header, from_number, to_number, body):
    """POST one message to the Twilio Messages URL; return True when the response status is 200 or 201."""
    payload = urllib.parse.urlencode({
        'To': to_number,
        'From': from_number,
        'Body': body,
    }).encode()
    req = urllib.request.Request(url, data=payload, method='POST')
    req.add_header("Authorization", f"Basic {auth_header}")
    with urllib.request.urlopen(req, timeout=10) as response:
        return response.status in (200, 201)


def volunteer_bulk_send_sms(request):
    """
    Sends bulk SMS to selected volunteers using Twilio API.

    Only POST is processed; other methods redirect to the volunteer list.
    The Twilio account SID, auth token and sender number are read from the
    tenant's ``settings`` object. Volunteer ids come from the
    ``selected_volunteers`` POST list and the text from ``message_body``.
    Phone numbers are normalized to US E.164; numbers that cannot be
    normalized are counted as failed/skipped. A summary with the success
    and failure counts is flashed at the end (as a warning when at least
    one message failed), followed by a redirect to the volunteer list.
    """
    if request.method != 'POST':
        return redirect('volunteer_list')

    selected_tenant_id = request.session.get("tenant_id")
    if not selected_tenant_id:
        messages.warning(request, "Please select a campaign first.")
        return redirect("index")

    tenant = get_object_or_404(Tenant, id=selected_tenant_id)
    settings = getattr(tenant, 'settings', None)
    if not settings:
        messages.error(request, "Campaign settings not found.")
        return redirect('volunteer_list')

    account_sid = settings.twilio_account_sid
    auth_token = settings.twilio_auth_token
    from_number = settings.twilio_from_number
    if not account_sid or not auth_token or not from_number:
        messages.error(request, "Twilio configuration is incomplete in Campaign Settings.")
        return redirect('volunteer_list')

    volunteer_ids = request.POST.getlist('selected_volunteers')
    message_body = request.POST.get('message_body')
    # Reject whitespace-only text as well as a missing or empty body.
    if not message_body or not message_body.strip():
        messages.error(request, "Message body cannot be empty.")
        return redirect('volunteer_list')

    volunteers = Volunteer.objects.filter(tenant=tenant, id__in=volunteer_ids).exclude(phone='')
    if not volunteers.exists():
        messages.warning(request, "No volunteers with a valid phone number were selected.")
        return redirect('volunteer_list')

    success_count = 0
    fail_count = 0

    # Loop-invariant request pieces: HTTP Basic credentials and endpoint URL.
    auth_header = base64.b64encode(f"{account_sid}:{auth_token}".encode()).decode()
    url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"

    for volunteer in volunteers:
        # Format phone to E.164 (assume US +1); skip numbers that do not fit.
        to_number = _format_us_e164(volunteer.phone)
        if to_number is None:
            fail_count += 1
            continue

        try:
            delivered = _send_twilio_sms(url, auth_header, from_number, to_number, message_body)
        except Exception as e:
            # Best effort: one failed send must not abort the remaining ones.
            logger.error("Error sending SMS to volunteer %s: %s", volunteer.phone, e)
            delivered = False

        if delivered:
            success_count += 1
        else:
            fail_count += 1

    summary = f"Bulk SMS process completed: {success_count} successful, {fail_count} failed/skipped."
    # Do not present the run as a plain success when some messages failed.
    if fail_count:
        messages.warning(request, summary)
    else:
        messages.success(request, summary)
    return redirect('volunteer_list')