218 lines
8.2 KiB
Python
218 lines
8.2 KiB
Python
import threading
|
|
import base64
|
|
import urllib.parse
|
|
import urllib.request
|
|
import re
|
|
import logging
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from django.utils import timezone
|
|
from django.db import connection, transaction
|
|
from .models import BulkTask, Voter, Volunteer, Interaction, InteractionType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def send_single_sms(url, auth_header, from_number, to_number, message_body):
|
|
"""
|
|
Sends a single SMS using Twilio API. Returns (success, error_msg).
|
|
"""
|
|
data_dict = {
|
|
'To': to_number,
|
|
'From': from_number,
|
|
'Body': message_body
|
|
}
|
|
data = urllib.parse.urlencode(data_dict).encode()
|
|
|
|
req = urllib.request.Request(url, data=data, method='POST')
|
|
req.add_header("Authorization", f"Basic {auth_header}")
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=10) as response:
|
|
if response.status in [200, 201]:
|
|
return True, None
|
|
else:
|
|
return False, f"HTTP {response.status}"
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
def run_bulk_sms_task(task_id, object_ids, select_all_results, search_filters=None, object_type='voter'):
|
|
"""
|
|
Background task to send bulk SMS to voters or volunteers.
|
|
"""
|
|
# Ensure a fresh connection for the thread
|
|
connection.close()
|
|
|
|
try:
|
|
task = BulkTask.objects.get(id=task_id)
|
|
task.status = 'processing'
|
|
task.save()
|
|
|
|
tenant = task.tenant
|
|
settings_obj = getattr(tenant, 'settings', None)
|
|
|
|
if not settings_obj:
|
|
task.status = 'failed'
|
|
task.error_message = "Campaign settings not found."
|
|
task.save()
|
|
return
|
|
|
|
account_sid = settings_obj.twilio_account_sid
|
|
auth_token = settings_obj.twilio_auth_token
|
|
from_number = settings_obj.twilio_from_number
|
|
|
|
if not account_sid or not auth_token or not from_number:
|
|
task.status = 'failed'
|
|
task.error_message = "Twilio configuration is incomplete."
|
|
task.save()
|
|
return
|
|
|
|
message_body = task.message_body
|
|
|
|
# Determine the queryset of objects (Voters or Volunteers)
|
|
if object_type == 'voter':
|
|
if select_all_results and search_filters:
|
|
from .filter_helper import get_filtered_voter_queryset_from_filters
|
|
queryset = get_filtered_voter_queryset_from_filters(tenant, search_filters)
|
|
queryset = queryset.filter(phone_type='cell').exclude(phone='')
|
|
else:
|
|
queryset = Voter.objects.filter(tenant=tenant, id__in=object_ids, phone_type='cell').exclude(phone='')
|
|
else: # volunteer
|
|
queryset = Volunteer.objects.filter(tenant=tenant, id__in=object_ids).exclude(phone='')
|
|
|
|
task.total_count = queryset.count()
|
|
task.save()
|
|
|
|
if task.total_count == 0:
|
|
task.status = 'completed'
|
|
task.error_message = f"No {object_type}s with a valid phone number found."
|
|
task.save()
|
|
return
|
|
|
|
auth_str = f"{account_sid}:{auth_token}"
|
|
auth_header = base64.b64encode(auth_str.encode()).decode()
|
|
url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
|
|
|
|
interaction_type = None
|
|
if object_type == 'voter':
|
|
interaction_type, _ = InteractionType.objects.get_or_create(tenant=tenant, name="SMS Text")
|
|
|
|
success_count = 0
|
|
fail_count = 0
|
|
|
|
interactions_to_create = []
|
|
|
|
# Batch size for updating task status and creating interactions
|
|
batch_size = 50
|
|
max_workers = 10 # Parallelize Twilio requests
|
|
|
|
# Using iterator() to avoid loading all objects into memory
|
|
# Note: We need to handle IDs and phones to avoid "too many open connections" or stale data
|
|
# Actually, iterator() is fine.
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
future_to_obj = {}
|
|
|
|
for obj in queryset.iterator():
|
|
# Format phone to E.164 (assume US +1)
|
|
digits = re.sub(r'\D', '', str(obj.phone))
|
|
if len(digits) == 10:
|
|
to_number = f"+1{digits}"
|
|
elif len(digits) == 11 and digits.startswith('1'):
|
|
to_number = f"+{digits}"
|
|
else:
|
|
fail_count += 1
|
|
continue
|
|
|
|
future = executor.submit(send_single_sms, url, auth_header, from_number, to_number, message_body)
|
|
future_to_obj[future] = obj
|
|
|
|
# If we have many futures, process them to avoid memory issues and see progress
|
|
if len(future_to_obj) >= batch_size:
|
|
# Collect completed results
|
|
for future in as_completed(future_to_obj):
|
|
obj = future_to_obj.pop(future)
|
|
success, error = future.result()
|
|
if success:
|
|
success_count += 1
|
|
if object_type == 'voter':
|
|
interactions_to_create.append(Interaction(
|
|
voter=obj,
|
|
type=interaction_type,
|
|
date=timezone.now(),
|
|
description='Mass SMS Text',
|
|
notes=message_body
|
|
))
|
|
else:
|
|
fail_count += 1
|
|
logger.error(f"Error sending SMS to {obj.phone}: {error}")
|
|
|
|
if len(future_to_obj) < batch_size // 2: # Keep some buffer
|
|
break
|
|
|
|
# Update status and interactions
|
|
if len(interactions_to_create) >= batch_size:
|
|
Interaction.objects.bulk_create(interactions_to_create)
|
|
interactions_to_create = []
|
|
|
|
task.success_count = success_count
|
|
task.fail_count = fail_count
|
|
task.save()
|
|
|
|
# Process remaining futures
|
|
for future in as_completed(future_to_obj):
|
|
obj = future_to_obj[future]
|
|
success, error = future.result()
|
|
if success:
|
|
success_count += 1
|
|
if object_type == 'voter':
|
|
interactions_to_create.append(Interaction(
|
|
voter=obj,
|
|
type=interaction_type,
|
|
date=timezone.now(),
|
|
description='Mass SMS Text',
|
|
notes=message_body
|
|
))
|
|
else:
|
|
fail_count += 1
|
|
logger.error(f"Error sending SMS to {obj.phone}: {error}")
|
|
|
|
if interactions_to_create:
|
|
Interaction.objects.bulk_create(interactions_to_create)
|
|
interactions_to_create = []
|
|
|
|
task.success_count = success_count
|
|
task.fail_count = fail_count
|
|
task.status = 'completed'
|
|
task.save()
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Unexpected error in bulk SMS task: {e}")
|
|
try:
|
|
task = BulkTask.objects.get(id=task_id)
|
|
task.status = 'failed'
|
|
task.error_message = str(e)
|
|
task.save()
|
|
except:
|
|
pass
|
|
finally:
|
|
connection.close()
|
|
|
|
def start_bulk_sms_task(tenant, message_body, object_ids, select_all_results, search_filters=None, object_type='voter'):
|
|
"""
|
|
Creates a BulkTask and starts the background thread.
|
|
"""
|
|
task = BulkTask.objects.create(
|
|
tenant=tenant,
|
|
task_type='sms',
|
|
message_body=message_body,
|
|
status='pending'
|
|
)
|
|
|
|
thread = threading.Thread(
|
|
target=run_bulk_sms_task,
|
|
args=(task.id, object_ids, select_all_results, search_filters, object_type)
|
|
)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
return task
|