883 lines
38 KiB
Python
883 lines
38 KiB
Python
import zoneinfo
|
|
from django.db.models.signals import pre_save, post_save, post_delete
|
|
from django.dispatch import receiver
|
|
from django.db import models
|
|
from django.contrib.auth.models import User
|
|
import json
|
|
import urllib.parse
|
|
import urllib.request
|
|
import logging
|
|
from decimal import Decimal
|
|
from django.conf import settings
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def format_phone_number(phone):
|
|
"""Formats a phone number to (xxx) xxx-xxxx if it has 10 digits or 11 starting with 1."""
|
|
if not phone:
|
|
return phone
|
|
digits = re.sub(r'\D', '', str(phone))
|
|
if len(digits) == 10:
|
|
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
|
|
elif len(digits) == 11 and digits.startswith('1'):
|
|
return f"({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
|
|
return phone
|
|
|
|
class Tenant(models.Model):
|
|
name = models.CharField(max_length=100)
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class TenantUserRole(models.Model):
|
|
ROLE_CHOICES = [
|
|
('system_admin', 'System Administrator'),
|
|
('campaign_admin', 'Campaign Administrator'),
|
|
('campaign_staff', 'Campaign Staff'),
|
|
]
|
|
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='tenant_roles')
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='user_roles')
|
|
role = models.CharField(max_length=20, choices=ROLE_CHOICES)
|
|
|
|
class Meta:
|
|
unique_together = ('user', 'tenant', 'role')
|
|
|
|
def __str__(self):
|
|
return f"{self.user.username} - {self.tenant.name} ({self.role})"
|
|
|
|
class InteractionType(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='interaction_types')
|
|
name = models.CharField(max_length=100)
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class DonationMethod(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='donation_methods')
|
|
name = models.CharField(max_length=100)
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class ElectionType(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='election_types')
|
|
name = models.CharField(max_length=100)
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class ParticipationStatus(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='participation_statuses')
|
|
name = models.CharField(max_length=100)
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
verbose_name_plural = 'Participation Statuses'
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class VolunteerRole(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='volunteer_roles')
|
|
name = models.CharField(max_length=100)
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class EventType(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='event_types')
|
|
name = models.CharField(max_length=100)
|
|
available_roles = models.ManyToManyField(VolunteerRole, blank=True, related_name='event_types')
|
|
default_volunteer_role = models.ForeignKey(VolunteerRole, on_delete=models.SET_NULL, null=True, blank=True, related_name="default_for_event_types")
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class Interest(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='interests')
|
|
name = models.CharField(max_length=100)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class Voter(models.Model):
|
|
CANDIDATE_SUPPORT_CHOICES = [
|
|
('unknown', 'Unknown'),
|
|
('supporting', 'Supporting'),
|
|
('not_supporting', 'Not Supporting'),
|
|
]
|
|
YARD_SIGN_CHOICES = [
|
|
('none', 'None'),
|
|
('wants', 'Wants a yard sign'),
|
|
('wants_large', 'Wants a Large Sign'),
|
|
('has', 'Has a yard sign'),
|
|
('has_large', 'Has a Large Sign'),
|
|
]
|
|
WINDOW_STICKER_CHOICES = [
|
|
('none', 'None'),
|
|
('wants', 'Wants Sticker'),
|
|
('has', 'Has Sticker'),
|
|
]
|
|
PHONE_TYPE_CHOICES = [
|
|
('home', 'Home Phone'),
|
|
('cell', 'Cell Phone'),
|
|
('work', 'Work Phone'),
|
|
]
|
|
CALL_QUEUE_STATUS_CHOICES = [
|
|
('no_call_required', 'No Call Required'),
|
|
('to_be_called', 'To Be Called'),
|
|
('in_call_queue', 'In Call Queue'),
|
|
('called', 'Called'),
|
|
]
|
|
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='voters')
|
|
voter_id = models.CharField(max_length=50, blank=True, db_index=True)
|
|
first_name = models.CharField(max_length=100, db_index=True)
|
|
last_name = models.CharField(max_length=100, db_index=True)
|
|
nickname = models.CharField(max_length=100, blank=True)
|
|
birthdate = models.DateField(null=True, blank=True, db_index=True)
|
|
address = models.TextField(blank=True)
|
|
address_street = models.CharField(max_length=255, blank=True, db_index=True)
|
|
city = models.CharField(max_length=100, blank=True, db_index=True)
|
|
state = models.CharField(max_length=2, blank=True, db_index=True)
|
|
prior_state = models.CharField(max_length=2, blank=True)
|
|
zip_code = models.CharField(max_length=20, blank=True, db_index=True)
|
|
county = models.CharField(max_length=100, blank=True)
|
|
latitude = models.DecimalField(max_digits=12, decimal_places=9, null=True, blank=True)
|
|
longitude = models.DecimalField(max_digits=12, decimal_places=9, null=True, blank=True)
|
|
phone = models.CharField(max_length=20, blank=True)
|
|
phone_type = models.CharField(max_length=10, choices=PHONE_TYPE_CHOICES, default='cell')
|
|
secondary_phone = models.CharField(max_length=20, blank=True)
|
|
secondary_phone_type = models.CharField(max_length=10, choices=PHONE_TYPE_CHOICES, default="cell")
|
|
email = models.EmailField(blank=True)
|
|
district = models.CharField(max_length=100, blank=True, db_index=True)
|
|
precinct = models.CharField(max_length=100, blank=True, db_index=True)
|
|
registration_date = models.DateField(null=True, blank=True)
|
|
is_targeted = models.BooleanField(default=False, db_index=True)
|
|
target_door_visit = models.BooleanField(default=False, db_index=True)
|
|
candidate_support = models.CharField(max_length=20, choices=CANDIDATE_SUPPORT_CHOICES, default='unknown', db_index=True)
|
|
yard_sign = models.CharField(max_length=20, choices=YARD_SIGN_CHOICES, default='none', db_index=True)
|
|
ever_had_yard_sign = models.BooleanField(default=False, db_index=True)
|
|
ever_had_large_sign = models.BooleanField(default=False, db_index=True)
|
|
window_sticker = models.CharField(max_length=20, choices=WINDOW_STICKER_CHOICES, default='none', verbose_name='Window Sticker Status', db_index=True)
|
|
notes = models.TextField(blank=True)
|
|
door_visit = models.BooleanField(default=False, db_index=True)
|
|
neighborhood = models.CharField(max_length=100, blank=True, db_index=True)
|
|
is_inactive = models.BooleanField(default=False, db_index=True)
|
|
call_queue_status = models.CharField(max_length=20, choices=CALL_QUEUE_STATUS_CHOICES, default='no_call_required', db_index=True)
|
|
voted = models.BooleanField(default=False, db_index=True)
|
|
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
|
|
class Meta:
|
|
indexes = [
|
|
models.Index(fields=['tenant', 'address_street', 'city', 'state', 'zip_code']),
|
|
models.Index(fields=['tenant', 'is_inactive', 'door_visit', 'target_door_visit']),
|
|
models.Index(fields=['tenant', 'last_name', 'first_name']),
|
|
]
|
|
|
|
def geocode_address(self, use_fallback=True):
|
|
"""
|
|
Attempts to geocode the voter's address using Google Maps API.
|
|
Returns (success, error_message).
|
|
"""
|
|
if not self.address:
|
|
return False, "No address provided."
|
|
|
|
api_key = getattr(settings, 'GOOGLE_MAPS_API_KEY', None)
|
|
if not api_key:
|
|
return False, "Google Maps API Key not configured."
|
|
|
|
def _fetch(addr):
|
|
try:
|
|
query = urllib.parse.quote(addr)
|
|
url = f"https://maps.googleapis.com/maps/api/geocode/json?address={query}&key={api_key}"
|
|
req = urllib.request.Request(url)
|
|
with urllib.request.urlopen(req, timeout=10) as response:
|
|
data = json.loads(response.read().decode())
|
|
if data.get('status') == 'OK':
|
|
result = data['results'][0]
|
|
return result['geometry']['location']['lat'], result['geometry']['location']['lng'], None
|
|
elif data.get('status') == 'ZERO_RESULTS':
|
|
return None, None, "No results found."
|
|
elif data.get('status') == 'OVER_QUERY_LIMIT':
|
|
return None, None, "Query limit exceeded."
|
|
elif data.get('status') == 'REQUEST_DENIED':
|
|
return None, None, f"Request denied: {data.get('error_message', 'No message')}"
|
|
elif data.get('status') == 'INVALID_REQUEST':
|
|
return None, None, "Invalid request."
|
|
else:
|
|
return None, None, f"Google Maps Error: {data.get('status')}"
|
|
except Exception as e:
|
|
return None, None, str(e)
|
|
|
|
logger.info(f"Geocoding with Google Maps: {self.address}")
|
|
lat, lon, err = _fetch(self.address)
|
|
|
|
if not lat and use_fallback:
|
|
# Try fallback: City, State, Zip
|
|
fallback_parts = [self.city, self.state, self.zip_code]
|
|
fallback_addr = ", ".join([p for p in fallback_parts if p])
|
|
if fallback_addr and fallback_addr != self.address:
|
|
logger.info(f"Geocoding fallback: {fallback_addr}")
|
|
lat, lon, fallback_err = _fetch(fallback_addr)
|
|
if lat:
|
|
err = None # Clear previous error if fallback works
|
|
|
|
if lat and lon:
|
|
# Truncate coordinates to 12 characters as requested
|
|
self.latitude = Decimal(str(lat)[:12])
|
|
self.longitude = Decimal(str(lon)[:12])
|
|
logger.info(f"Geocoding success: {self.latitude}, {self.longitude}")
|
|
return True, None
|
|
|
|
logger.warning(f"Geocoding failed for {self.address}: {err}")
|
|
return False, err
|
|
|
|
def save(self, *args, **kwargs):
|
|
if self.yard_sign in ['has', 'wants']:
|
|
self.ever_had_yard_sign = True
|
|
elif self.yard_sign in ['has_large', 'wants_large']:
|
|
self.ever_had_large_sign = True
|
|
skip_geocode = kwargs.pop("skip_geocode", False) or getattr(self, "_skip_geocode", False)
|
|
update_fields = kwargs.get('update_fields')
|
|
|
|
# Auto-format phone number
|
|
self.phone = format_phone_number(self.phone)
|
|
self.secondary_phone = format_phone_number(self.secondary_phone)
|
|
|
|
# Ensure coordinates are truncated to 12 characters before saving
|
|
if self.latitude:
|
|
self.latitude = Decimal(str(self.latitude)[:12])
|
|
if self.longitude:
|
|
self.longitude = Decimal(str(self.longitude)[:12])
|
|
|
|
# Auto concatenation: address street, city, state, zip
|
|
parts = [self.address_street, self.city, self.state, self.zip_code]
|
|
self.address = ", ".join([p for p in parts if p])
|
|
|
|
# Change detection
|
|
should_geocode = False
|
|
|
|
# Detect manual change of target_door_visit
|
|
if self.pk:
|
|
orig = getattr(self, "_orig_obj", None)
|
|
if not orig:
|
|
try:
|
|
orig = Voter.objects.get(pk=self.pk)
|
|
except Voter.DoesNotExist:
|
|
orig = None
|
|
|
|
if orig:
|
|
self._orig_obj = orig # Cache it for geocoding check and signals
|
|
if not orig.target_door_visit and self.target_door_visit:
|
|
# User manually checked the box (or changed it to True)
|
|
self._target_door_visit_manually_set = True
|
|
|
|
# If update_fields is set and doesn't include address components, skip geocode
|
|
if update_fields:
|
|
addr_fields = {'address_street', 'city', 'state', 'zip_code', 'latitude', 'longitude'}
|
|
if not addr_fields.intersection(update_fields):
|
|
skip_geocode = True
|
|
|
|
if not skip_geocode:
|
|
if not self.pk:
|
|
# New record
|
|
# Only auto-geocode if coordinates were not already provided
|
|
if self.latitude is None or self.longitude is None:
|
|
should_geocode = True
|
|
else:
|
|
orig = getattr(self, "_orig_obj", None) # Already set above but being safe
|
|
if orig:
|
|
# Detect if address components changed
|
|
address_changed = (self.address_street != orig.address_street or
|
|
self.city != orig.city or
|
|
self.state != orig.state or
|
|
self.zip_code != orig.zip_code)
|
|
|
|
coords_provided = (self.latitude != orig.latitude or self.longitude != orig.longitude)
|
|
|
|
# If specifically provided in import, treat as provided even if same as DB
|
|
if getattr(self, "_coords_provided_in_import", False):
|
|
coords_provided = True
|
|
|
|
# Auto-geocode if address changed AND coordinates were NOT manually updated
|
|
if address_changed and not coords_provided:
|
|
should_geocode = True
|
|
|
|
# Auto-geocode if coordinates are still missing and were not just provided
|
|
if (self.latitude is None or self.longitude is None) and not coords_provided:
|
|
should_geocode = True
|
|
else:
|
|
should_geocode = True
|
|
|
|
if not skip_geocode and should_geocode and self.address:
|
|
# We don't want to block save if geocoding fails, so we just call it
|
|
self.geocode_address()
|
|
|
|
super().save(*args, **kwargs)
|
|
|
|
def __str__(self):
|
|
return f"{self.first_name} {self.last_name}"
|
|
|
|
class VotingRecord(models.Model):
|
|
voter = models.ForeignKey(Voter, on_delete=models.CASCADE, related_name='voting_records')
|
|
election_date = models.DateField()
|
|
election_description = models.CharField(max_length=255)
|
|
primary_party = models.CharField(max_length=100, blank=True)
|
|
|
|
def __str__(self):
|
|
return f"{self.voter} - {self.election_description}"
|
|
|
|
class Event(models.Model):
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='events')
|
|
name = models.CharField(max_length=255, db_index=True)
|
|
date = models.DateField()
|
|
start_time = models.TimeField(null=True, blank=True)
|
|
end_time = models.TimeField(null=True, blank=True)
|
|
event_type = models.ForeignKey(EventType, on_delete=models.PROTECT, null=True)
|
|
default_volunteer_role = models.ForeignKey(VolunteerRole, on_delete=models.SET_NULL, null=True, blank=True, related_name='default_for_events')
|
|
description = models.TextField(blank=True)
|
|
location_name = models.CharField(max_length=255, blank=True)
|
|
address = models.CharField(max_length=255, blank=True)
|
|
city = models.CharField(max_length=100, blank=True)
|
|
state = models.CharField(max_length=2, blank=True)
|
|
zip_code = models.CharField(max_length=20, blank=True)
|
|
latitude = models.DecimalField(max_digits=12, decimal_places=9, null=True, blank=True)
|
|
longitude = models.DecimalField(max_digits=12, decimal_places=9, null=True, blank=True)
|
|
|
|
class Meta:
|
|
unique_together = ('tenant', 'name')
|
|
|
|
def save(self, *args, **kwargs):
|
|
skip_geocode = kwargs.pop("skip_geocode", False) or getattr(self, "_skip_geocode", False)
|
|
# Ensure coordinates are truncated to 12 characters before saving
|
|
if self.latitude:
|
|
self.latitude = Decimal(str(self.latitude)[:12])
|
|
if self.longitude:
|
|
self.longitude = Decimal(str(self.longitude)[:12])
|
|
super().save(*args, **kwargs)
|
|
|
|
def __str__(self):
|
|
if self.name:
|
|
return f"{self.name} ({self.date})"
|
|
return f"{self.event_type} on {self.date}"
|
|
|
|
class Volunteer(models.Model):
|
|
class Meta:
|
|
ordering = ("last_name", "first_name")
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='volunteers')
|
|
user = models.OneToOneField(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='volunteer_profile')
|
|
first_name = models.CharField(max_length=100, blank=True)
|
|
last_name = models.CharField(max_length=100, blank=True)
|
|
email = models.EmailField()
|
|
phone = models.CharField(max_length=20, blank=True)
|
|
interests = models.ManyToManyField(Interest, blank=True, related_name='volunteers')
|
|
assigned_events = models.ManyToManyField(Event, through='VolunteerEvent', related_name='assigned_volunteers')
|
|
is_default_caller = models.BooleanField(default=False)
|
|
notes = models.TextField(blank=True)
|
|
|
|
def save(self, *args, **kwargs):
|
|
skip_geocode = kwargs.pop("skip_geocode", False) or getattr(self, "_skip_geocode", False)
|
|
# Auto-format phone number
|
|
self.phone = format_phone_number(self.phone)
|
|
|
|
if self.is_default_caller:
|
|
# Only one default caller per tenant
|
|
Volunteer.objects.filter(tenant=self.tenant, is_default_caller=True).exclude(pk=self.pk).update(is_default_caller=False)
|
|
|
|
super().save(*args, **kwargs)
|
|
|
|
def __str__(self):
|
|
return f"{self.first_name} {self.last_name}".strip() or self.email
|
|
|
|
class VolunteerEvent(models.Model):
|
|
volunteer = models.ForeignKey(Volunteer, on_delete=models.CASCADE, related_name="event_assignments")
|
|
event = models.ForeignKey(Event, on_delete=models.CASCADE, related_name="volunteer_assignments")
|
|
role_type = models.ForeignKey(VolunteerRole, on_delete=models.SET_NULL, null=True, blank=True, related_name="volunteer_assignments")
|
|
|
|
def __str__(self):
|
|
return f"{self.volunteer} at {self.event} as {self.role_type or 'Assigned'}"
|
|
|
|
class EventParticipation(models.Model):
|
|
event = models.ForeignKey(Event, on_delete=models.CASCADE, related_name='participations')
|
|
voter = models.ForeignKey(Voter, on_delete=models.CASCADE, related_name='event_participations')
|
|
participation_status = models.ForeignKey(ParticipationStatus, on_delete=models.PROTECT, null=True)
|
|
|
|
def __str__(self):
|
|
return f"{self.voter} at {self.event} ({self.participation_status})"
|
|
|
|
class Donation(models.Model):
|
|
voter = models.ForeignKey(Voter, on_delete=models.CASCADE, related_name='donations')
|
|
date = models.DateField()
|
|
method = models.ForeignKey(DonationMethod, on_delete=models.SET_NULL, null=True)
|
|
amount = models.DecimalField(max_digits=10, decimal_places=2)
|
|
|
|
def __str__(self):
|
|
return f"{self.voter} - {self.amount} on {self.date}"
|
|
|
|
class Interaction(models.Model):
|
|
voter = models.ForeignKey(Voter, on_delete=models.CASCADE, related_name='interactions')
|
|
volunteer = models.ForeignKey(Volunteer, on_delete=models.SET_NULL, null=True, blank=True, related_name='interactions')
|
|
type = models.ForeignKey(InteractionType, on_delete=models.SET_NULL, null=True)
|
|
date = models.DateTimeField()
|
|
description = models.CharField(max_length=255)
|
|
notes = models.TextField(blank=True)
|
|
|
|
def __str__(self):
|
|
return f"{self.voter} - {self.type} on {self.date}"
|
|
|
|
class VoterLikelihood(models.Model):
|
|
LIKELIHOOD_CHOICES = [
|
|
('not_likely', 'Not Likely'),
|
|
('somewhat_likely', 'Somewhat Likely'),
|
|
('very_likely', 'Very Likely'),
|
|
]
|
|
voter = models.ForeignKey(Voter, on_delete=models.CASCADE, related_name='likelihoods')
|
|
election_type = models.ForeignKey(ElectionType, on_delete=models.CASCADE)
|
|
likelihood = models.CharField(max_length=20, choices=LIKELIHOOD_CHOICES)
|
|
|
|
class Meta:
|
|
unique_together = ('voter', 'election_type')
|
|
|
|
def __str__(self):
|
|
return f"{self.voter} - {self.election_type}: {self.get_likelihood_display()}"
|
|
|
|
class ScheduledCall(models.Model):
|
|
STATUS_CHOICES = [
|
|
('pending', 'Pending'),
|
|
('completed', 'Completed'),
|
|
('cancelled', 'Cancelled'),
|
|
]
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='scheduled_calls')
|
|
voter = models.ForeignKey(Voter, on_delete=models.CASCADE, related_name='scheduled_calls')
|
|
volunteer = models.ForeignKey(Volunteer, on_delete=models.SET_NULL, null=True, blank=True, related_name='assigned_calls')
|
|
comments = models.TextField(blank=True)
|
|
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
def __str__(self):
|
|
return f"Call for {self.voter} assigned to {self.volunteer}"
|
|
|
|
class BulkTask(models.Model):
|
|
TASK_TYPE_CHOICES = [
|
|
('sms', 'SMS'),
|
|
('email', 'Email'),
|
|
]
|
|
STATUS_CHOICES = [
|
|
('pending', 'Pending'),
|
|
('processing', 'In Progress'),
|
|
('completed', 'Completed'),
|
|
('failed', 'Failed'),
|
|
]
|
|
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, related_name='bulk_tasks')
|
|
task_type = models.CharField(max_length=10, choices=TASK_TYPE_CHOICES)
|
|
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
|
|
total_count = models.IntegerField(default=0)
|
|
success_count = models.IntegerField(default=0)
|
|
fail_count = models.IntegerField(default=0)
|
|
error_message = models.TextField(blank=True)
|
|
message_body = models.TextField(blank=True)
|
|
subject = models.CharField(max_length=255, blank=True)
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
def __str__(self):
|
|
return f"{self.get_task_type_display()} Task - {self.status} ({self.created_at})"
|
|
|
|
class CampaignSettings(models.Model):
|
|
tenant = models.OneToOneField(Tenant, on_delete=models.CASCADE, related_name='settings')
|
|
donation_goal = models.DecimalField(max_digits=12, decimal_places=2, default=170000.00)
|
|
twilio_account_sid = models.CharField(max_length=100, blank=True, default='ACcd11acb5095cec6477245d385a2bf127')
|
|
twilio_auth_token = models.CharField(max_length=100, blank=True, default='89ec830d0fa02ab0afa6c76084865713')
|
|
twilio_from_number = models.CharField(max_length=20, blank=True, default='+18556945903')
|
|
timezone = models.CharField(max_length=100, default="America/Chicago", choices=[(tz, tz) for tz in sorted(zoneinfo.available_timezones())])
|
|
smtp_host = models.CharField(max_length=255, blank=True)
|
|
smtp_port = models.IntegerField(default=587)
|
|
smtp_username = models.CharField(max_length=255, blank=True)
|
|
smtp_password = models.CharField(max_length=255, blank=True)
|
|
smtp_use_tls = models.BooleanField(default=True)
|
|
smtp_use_ssl = models.BooleanField(default=False)
|
|
email_from_address = models.EmailField(blank=True)
|
|
email_from_name = models.CharField(max_length=255, blank=True)
|
|
call_script = models.TextField(blank=True)
|
|
|
|
class Meta:
|
|
verbose_name = 'Campaign Settings'
|
|
verbose_name_plural = 'Campaign Settings'
|
|
|
|
def clean(self):
|
|
from django.core.exceptions import ValidationError
|
|
if self.smtp_use_tls and self.smtp_use_ssl:
|
|
raise ValidationError('SMTP Use TLS and SMTP Use SSL are mutually exclusive. Please choose only one.')
|
|
|
|
def save(self, *args, **kwargs):
|
|
skip_geocode = kwargs.pop("skip_geocode", False) or getattr(self, "_skip_geocode", False)
|
|
self.full_clean()
|
|
super().save(*args, **kwargs)
|
|
|
|
def __str__(self):
|
|
return f'Settings for {self.tenant.name}'
|
|
|
|
@receiver(post_save, sender=Donation)
|
|
def update_voter_support_on_donation(sender, instance, **kwargs):
|
|
"""
|
|
Automatically set candidate_support to 'supporting' if a voter has a donation > 0.
|
|
"""
|
|
if instance.amount > 0:
|
|
voter = instance.voter
|
|
if voter.candidate_support != 'supporting':
|
|
voter.candidate_support = 'supporting'
|
|
voter.save(update_fields=['candidate_support'])
|
|
|
|
|
|
@receiver(pre_save, sender=Voter)
|
|
def handle_voter_status_on_voted_pre_save(sender, instance, **kwargs):
|
|
"""
|
|
If a voter has voted, ensure they are not targets for door visits or calls.
|
|
"""
|
|
if instance.voted:
|
|
instance.target_door_visit = False
|
|
instance.call_queue_status = 'no_call_required'
|
|
|
|
@receiver(post_save, sender=Voter)
|
|
def update_voter_support_on_yard_sign(sender, instance, **kwargs):
|
|
"""
|
|
Automatically set candidate_support to "supporting" if:
|
|
- Voter is older than 30 (birthdate <= 30 years ago)
|
|
- Someone in their household (including themselves) has a yard sign ("wants" or "has")
|
|
"""
|
|
if getattr(instance, "_skip_signals", False):
|
|
return
|
|
|
|
orig = getattr(instance, "_orig_obj", None)
|
|
|
|
# Detection of manual changes or irrelevant updates
|
|
update_fields = kwargs.get("update_fields")
|
|
support_manually_changed = orig and instance.candidate_support != orig.candidate_support
|
|
|
|
relevant_fields = {"yard_sign", "birthdate", "address_street", "city", "state", "zip_code"}
|
|
|
|
if update_fields:
|
|
if not relevant_fields.intersection(update_fields):
|
|
return
|
|
elif orig and not kwargs.get("created"):
|
|
# If no update_fields, manually check if anything relevant changed
|
|
changed = False
|
|
for field in relevant_fields:
|
|
if getattr(instance, field) != getattr(orig, field):
|
|
changed = True
|
|
break
|
|
if not changed:
|
|
return
|
|
|
|
from datetime import date
|
|
|
|
today = date.today()
|
|
try:
|
|
thirty_years_ago = today.replace(year=today.year - 30)
|
|
except ValueError: # Leap year case
|
|
thirty_years_ago = today.replace(year=today.year - 30, day=today.day - 1)
|
|
|
|
# 1. If this voter now has a yard sign, update everyone in the household who is > 30
|
|
# ONLY update those whose support is currently "unknown" to avoid overwriting intentional choices.
|
|
if instance.yard_sign in ["wants", "has"]:
|
|
queryset = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant,
|
|
birthdate__lte=thirty_years_ago,
|
|
candidate_support="unknown"
|
|
)
|
|
# If support was manually changed in THIS save, exclude this instance from auto-revert
|
|
if support_manually_changed:
|
|
queryset = queryset.exclude(pk=instance.pk)
|
|
|
|
queryset.update(candidate_support="supporting")
|
|
|
|
# 2. If this voter itself is > 30, check if anyone in the household has a yard sign
|
|
elif instance.birthdate and instance.birthdate <= thirty_years_ago:
|
|
# Only auto-set if support is currently unknown and wasn"t just manually changed.
|
|
if not support_manually_changed and instance.candidate_support == "unknown":
|
|
household_has_sign = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant,
|
|
yard_sign__in=["wants", "has"]
|
|
).exists()
|
|
|
|
if household_has_sign:
|
|
Voter.objects.filter(pk=instance.pk).update(candidate_support="supporting")
|
|
elif instance.birthdate and instance.birthdate <= thirty_years_ago:
|
|
household_has_sign = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant,
|
|
yard_sign__in=['wants', 'has']
|
|
).exists()
|
|
|
|
if household_has_sign and instance.candidate_support != 'supporting':
|
|
Voter.objects.filter(pk=instance.pk).update(candidate_support='supporting')
|
|
|
|
@receiver(post_save, sender=Voter)
|
|
def update_target_door_visit_logic(sender, instance, **kwargs):
|
|
"""
|
|
Set target_door_visit = False if door_visit = False and any voter record in the household:
|
|
1. Has a candidate support = 'Supporting' or 'Not Supporting'
|
|
2. Has attended an event (EventParticipation status = 'Attended')
|
|
3. NO ONE in the household is marked as is_targeted = True
|
|
"""
|
|
if getattr(instance, '_skip_signals', False):
|
|
return
|
|
|
|
# Manual override check: if target_door_visit was explicitly set to True in this save,
|
|
# skip the auto-reset logic for THIS voter.
|
|
is_manual_override = getattr(instance, '_target_door_visit_manually_set', False)
|
|
|
|
update_fields = kwargs.get('update_fields')
|
|
if update_fields:
|
|
relevant = {'candidate_support', 'is_targeted', 'door_visit', 'address_street', 'city', 'state', 'zip_code', 'voted'}
|
|
if not relevant.intersection(update_fields):
|
|
return
|
|
|
|
# 0. If this voter has voted, they are no longer a target for door visits.
|
|
if instance.voted:
|
|
if instance.target_door_visit:
|
|
Voter.objects.filter(pk=instance.pk).update(target_door_visit=False)
|
|
|
|
# 1. If this voter was just updated to Supporting or Not Supporting,
|
|
# remove everyone in the household who hasn't been visited from the target list.
|
|
if instance.candidate_support in ['supporting', 'not_supporting']:
|
|
queryset = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant,
|
|
door_visit=False
|
|
)
|
|
if is_manual_override:
|
|
queryset = queryset.exclude(pk=instance.pk)
|
|
queryset.update(target_door_visit=False)
|
|
|
|
# 2. If this voter was just updated to is_targeted = False,
|
|
# and NO ONE in the household is targeted, set target_door_visit = False
|
|
# for everyone in the household who hasn't been visited.
|
|
elif not instance.is_targeted:
|
|
household_has_targeted = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant,
|
|
is_targeted=True
|
|
).exists()
|
|
|
|
if not household_has_targeted:
|
|
queryset = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant,
|
|
door_visit=False
|
|
)
|
|
if is_manual_override:
|
|
queryset = queryset.exclude(pk=instance.pk)
|
|
queryset.update(target_door_visit=False)
|
|
|
|
# 3. If this voter was just saved with door_visit=False,
|
|
# check if anyone in the household (including themselves) has known support,
|
|
# attended an event, or if NO ONE is targeted.
|
|
elif not instance.door_visit and not is_manual_override:
|
|
household_voters = Voter.objects.filter(
|
|
address_street=instance.address_street,
|
|
city=instance.city,
|
|
state=instance.state,
|
|
zip_code=instance.zip_code,
|
|
tenant=instance.tenant
|
|
)
|
|
|
|
household_has_known_support = household_voters.filter(
|
|
candidate_support__in=['supporting', 'not_supporting']
|
|
).exists()
|
|
|
|
household_has_attended = EventParticipation.objects.filter(
|
|
voter__in=household_voters,
|
|
participation_status__name='Attended'
|
|
).exists()
|
|
|
|
household_has_targeted = household_voters.filter(is_targeted=True).exists()
|
|
|
|
if (household_has_known_support or household_has_attended or not household_has_targeted) and instance.target_door_visit:
|
|
Voter.objects.filter(pk=instance.pk).update(target_door_visit=False)
|
|
|
|
@receiver(post_save, sender=EventParticipation)
|
|
def update_target_door_visit_on_participation(sender, instance, **kwargs):
|
|
"""
|
|
Set target_door_visit = False for all household members who haven't been visited
|
|
if someone in the household attended an event.
|
|
"""
|
|
if instance.participation_status and instance.participation_status.name == 'Attended':
|
|
voter = instance.voter
|
|
Voter.objects.filter(
|
|
address_street=voter.address_street,
|
|
city=voter.city,
|
|
state=voter.state,
|
|
zip_code=voter.zip_code,
|
|
tenant=voter.tenant,
|
|
door_visit=False
|
|
).update(target_door_visit=False)
|
|
|
|
@receiver(post_save, sender=Voter)
|
|
def update_voter_call_queue_status_on_voter_save(sender, instance, **kwargs):
|
|
"""
|
|
Sync call_queue_status when is_targeted, candidate_support or voted changes.
|
|
"""
|
|
if getattr(instance, '_skip_signals', False):
|
|
return
|
|
|
|
orig = getattr(instance, '_orig_obj', None)
|
|
if orig and instance.call_queue_status != orig.call_queue_status:
|
|
# If call_queue_status was manually changed, don't auto-override in this save
|
|
return
|
|
|
|
update_fields = kwargs.get('update_fields')
|
|
if update_fields:
|
|
relevant = {'is_targeted', 'candidate_support', 'voted'}
|
|
if not relevant.intersection(update_fields):
|
|
return
|
|
|
|
# PRIORITY 1: If they voted, no call required and cancel pending calls
|
|
if instance.voted:
|
|
# Cancel any pending calls
|
|
ScheduledCall.objects.filter(voter=instance, status='pending').update(status='cancelled')
|
|
|
|
if instance.call_queue_status != 'no_call_required':
|
|
Voter.objects.filter(pk=instance.pk).update(call_queue_status='no_call_required')
|
|
return
|
|
|
|
# PRIORITY 2: Check if in queue (pending scheduled call)
|
|
if ScheduledCall.objects.filter(voter=instance, status='pending').exists():
|
|
if instance.call_queue_status != 'in_call_queue':
|
|
Voter.objects.filter(pk=instance.pk).update(call_queue_status='in_call_queue')
|
|
return
|
|
|
|
# PRIORITY 3: If support is 'supporting', then 'no_call_required'
|
|
if instance.candidate_support == 'supporting':
|
|
if instance.call_queue_status != 'no_call_required':
|
|
Voter.objects.filter(pk=instance.pk).update(call_queue_status='no_call_required')
|
|
return
|
|
|
|
# PRIORITY 4: If un-targeted, set to no_call_required
|
|
if not instance.is_targeted:
|
|
if instance.call_queue_status != 'no_call_required':
|
|
Voter.objects.filter(pk=instance.pk).update(call_queue_status='no_call_required')
|
|
else:
|
|
# If targeted, and currently no_call_required, set to to_be_called
|
|
if instance.call_queue_status == 'no_call_required':
|
|
Voter.objects.filter(pk=instance.pk).update(call_queue_status='to_be_called')
|
|
|
|
@receiver(post_save, sender=ScheduledCall)
|
|
def update_voter_call_queue_status_on_call_save(sender, instance, **kwargs):
|
|
"""
|
|
Sync Voter.call_queue_status when a ScheduledCall is saved.
|
|
"""
|
|
voter = instance.voter
|
|
|
|
# PRIORITY 0: If they voted, always no_call_required
|
|
if voter.voted:
|
|
if voter.call_queue_status != 'no_call_required':
|
|
voter.call_queue_status = 'no_call_required'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
return
|
|
|
|
|
|
# PRIORITY 1: If there is ANY pending call for this voter, ALWAYS in_call_queue
|
|
if ScheduledCall.objects.filter(voter=voter, status='pending').exists():
|
|
if voter.call_queue_status != 'in_call_queue':
|
|
voter.call_queue_status = 'in_call_queue'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
return
|
|
|
|
# PRIORITY 2: If no pending calls, follow normal rules
|
|
if voter.candidate_support == 'supporting':
|
|
if voter.call_queue_status != 'no_call_required':
|
|
voter.call_queue_status = 'no_call_required'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
return
|
|
|
|
if instance.status == 'completed':
|
|
if voter.call_queue_status != 'called':
|
|
voter.call_queue_status = 'called'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
elif instance.status == 'cancelled':
|
|
if voter.is_targeted:
|
|
# Check if they were already called
|
|
if ScheduledCall.objects.filter(voter=voter, status='completed').exists():
|
|
voter.call_queue_status = 'called'
|
|
else:
|
|
voter.call_queue_status = 'to_be_called'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
|
|
@receiver(post_delete, sender=ScheduledCall)
|
|
def update_voter_call_queue_status_on_call_delete(sender, instance, **kwargs):
|
|
"""
|
|
Sync Voter.call_queue_status when a ScheduledCall is deleted.
|
|
"""
|
|
voter = instance.voter
|
|
|
|
# PRIORITY 1: Check if there are other pending calls
|
|
if ScheduledCall.objects.filter(voter=voter, status='pending').exists():
|
|
if voter.call_queue_status != 'in_call_queue':
|
|
voter.call_queue_status = 'in_call_queue'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
return
|
|
|
|
# PRIORITY 2: If no pending calls, follow normal rules
|
|
if voter.candidate_support == 'supporting':
|
|
if voter.call_queue_status != 'no_call_required':
|
|
voter.call_queue_status = 'no_call_required'
|
|
voter.save(update_fields=['call_queue_status'])
|
|
return
|
|
|
|
if voter.is_targeted:
|
|
# If no pending calls left, set back to called or to_be_called
|
|
if ScheduledCall.objects.filter(voter=voter, status='completed').exists():
|
|
voter.call_queue_status = 'called'
|
|
else:
|
|
voter.call_queue_status = 'to_be_called'
|
|
voter.save(update_fields=['call_queue_status']) |