98 lines
4.6 KiB
Python
98 lines
4.6 KiB
Python
import csv
|
|
import json
|
|
import os
|
|
from dataclasses import dataclass
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
from io import StringIO
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import urlencode
|
|
from urllib.request import urlopen
|
|
|
|
from django.db.models import DecimalField, Q, Sum, Value
|
|
from django.db.models.functions import Coalesce, ExtractMonth, ExtractYear
|
|
|
|
from .models import Trip
|
|
|
|
|
|
@dataclass
|
|
class DistanceResult:
|
|
ok: bool
|
|
miles: Decimal | None = None
|
|
message: str = ""
|
|
|
|
|
|
def calculate_google_maps_distance(start_location: str, end_location: str) -> DistanceResult:
    """Look up the driving distance between two locations via Google Maps.

    Calls the Distance Matrix JSON endpoint with the key read from the
    ``GOOGLE_MAPS_API_KEY`` environment variable, then converts the returned
    distance value to miles rounded half-up to one decimal place.

    Args:
        start_location: Address sent as the ``origins`` parameter.
        end_location: Address sent as the ``destinations`` parameter.

    Returns:
        A ``DistanceResult``. ``ok`` is True with ``miles`` set on success.
        When the key is missing, the request fails, the response cannot be
        parsed, or no route/distance is returned, ``ok`` is False and
        ``message`` explains why; these cases do not raise.
    """
    api_key = os.getenv("GOOGLE_MAPS_API_KEY", "").strip()
    if not api_key:
        return DistanceResult(ok=False, message="Google Maps API key is not configured yet. Add GOOGLE_MAPS_API_KEY to enable route mileage.")

    params = urlencode({"origins": start_location, "destinations": end_location, "mode": "driving", "units": "imperial", "key": api_key})
    url = f"https://maps.googleapis.com/maps/api/distancematrix/json?{params}"

    unreachable = DistanceResult(ok=False, message="We could not reach Google Maps right now. Please try again or enter miles manually.")
    try:
        with urlopen(url, timeout=12) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except (HTTPError, URLError, TimeoutError):
        return unreachable
    except (OSError, ValueError):
        # OSError: the connection dropped while reading the body.
        # ValueError: the body was not valid UTF-8 / JSON (JSONDecodeError and
        # UnicodeDecodeError both derive from it). Either way the caller gets
        # a failed result instead of an unhandled exception.
        return unreachable

    # A well-formed reply is a JSON object; anything else cannot be validated.
    if not isinstance(payload, dict) or payload.get("status") != "OK":
        return DistanceResult(ok=False, message="Google Maps could not validate that route. Please refine both addresses.")

    # One origin and one destination were sent, so only the first element of
    # the first row is inspected.
    rows = payload.get("rows") or []
    elements = (rows[0].get("elements") if rows else []) or []
    element = elements[0] if elements else {}
    if element.get("status") != "OK":
        return DistanceResult(ok=False, message="Google Maps could not find that route. Try a fuller street address or city/state.")

    # `or {}` also covers an explicit null "distance", which a plain
    # .get("distance", {}) would pass through as None.
    distance = element.get("distance") or {}
    meters = distance.get("value")
    if meters is None:
        return DistanceResult(ok=False, message="Google Maps did not return a distance for that route.")

    # 1609.344 is the divisor used to turn the returned value into miles.
    miles = (Decimal(str(meters)) / Decimal("1609.344")).quantize(Decimal("0.1"), rounding=ROUND_HALF_UP)
    return DistanceResult(ok=True, miles=miles, message="Driving mileage calculated from Google Maps.")
|
|
|
|
|
|
def apply_trip_filters(queryset, data):
    """Narrow ``queryset`` using whichever filter values ``data`` provides.

    Keys read from ``data``: ``start_date``, ``end_date``, ``month``,
    ``year`` and ``trip_type``. A key that is missing or holds a falsy value
    is skipped. Returns the (possibly unchanged) queryset.
    """
    if earliest := data.get("start_date"):
        queryset = queryset.filter(date__gte=earliest)

    if latest := data.get("end_date"):
        queryset = queryset.filter(date__lte=latest)

    if month := data.get("month"):
        # Expose the month of ``date`` as ``filter_month`` and match on it.
        with_month = queryset.annotate(filter_month=ExtractMonth("date"))
        queryset = with_month.filter(filter_month=month)

    if year := data.get("year"):
        # Same approach for the year, exposed as ``filter_year``.
        with_year = queryset.annotate(filter_year=ExtractYear("date"))
        queryset = with_year.filter(filter_year=year)

    if kind := data.get("trip_type"):
        queryset = queryset.filter(trip_type=kind)

    return queryset
|
|
|
|
|
|
def report_queryset(cleaned_data):
|
|
queryset = Trip.objects.all()
|
|
report_type = cleaned_data.get("report_type")
|
|
if report_type == "month":
|
|
queryset = queryset.filter(date__month=cleaned_data["month"], date__year=cleaned_data["year"])
|
|
elif report_type == "range":
|
|
queryset = queryset.filter(date__range=(cleaned_data["start_date"], cleaned_data["end_date"]))
|
|
elif report_type == "year":
|
|
queryset = queryset.filter(date__year=cleaned_data["year"])
|
|
return queryset.order_by("date", "start_time", "created_at")
|
|
|
|
|
|
def summarize_trips(queryset):
    """Return mileage totals and a trip count for ``queryset``.

    The returned dict has ``business_miles``, ``non_business_miles`` and
    ``total_miles`` (each a sum of ``distance_miles`` that falls back to
    ``Decimal("0.0")`` when the sum is NULL) plus ``trip_count`` from
    ``queryset.count()``.
    """

    def miles_sum(condition=None):
        # Sum of distance_miles, optionally restricted by ``condition``,
        # coalesced to 0.0 so callers never see None.
        return Coalesce(
            Sum("distance_miles", filter=condition),
            Value(Decimal("0.0")),
            output_field=DecimalField(max_digits=10, decimal_places=1),
        )

    is_business = Q(trip_type=Trip.TripType.BUSINESS)
    summary = queryset.aggregate(
        business_miles=miles_sum(is_business),
        non_business_miles=miles_sum(~is_business),
        total_miles=miles_sum(),
    )
    summary["trip_count"] = queryset.count()
    return summary
|
|
|
|
|
|
def export_trips_csv(trips):
    """Render ``trips`` as CSV text: one header row, then one row per trip.

    Args:
        trips: Iterable of trip objects exposing the attributes and the
            ``get_trip_type_display()`` / ``get_distance_source_display()``
            methods read below.

    Returns:
        The CSV document as a string.

    Odometer and mileage cells are blank only when the value is ``None``.
    A genuine zero (odometer ``0``, distance ``0.0``) is written as-is; a
    truthiness test (``value or ""``) would silently drop it.
    """

    def blank_if_none(value):
        # Keep falsy-but-real values such as 0 and Decimal("0.0").
        return "" if value is None else value

    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Date of trip", "Start time", "End time", "Starting location", "Destination", "Business purpose", "Trip type", "Starting odometer", "Ending odometer", "Miles for this trip", "Distance source", "Notes", "Created at", "Updated at"])
    for trip in trips:
        writer.writerow([
            trip.date,
            trip.start_time,
            trip.end_time,
            trip.start_location,
            trip.end_location,
            trip.business_purpose,
            trip.get_trip_type_display(),
            blank_if_none(trip.start_odometer),
            blank_if_none(trip.end_odometer),
            blank_if_none(trip.distance_miles),
            trip.get_distance_source_display(),
            trip.notes,
            trip.created_at,
            trip.updated_at,
        ])
    return buffer.getvalue()
|