38686-vm/core/views.py
Konrad du Plessis 19e565a088 Fix payroll dashboard JS crash + add calendar view to work history
1. Fix json_script double-encoding bug: payroll_dashboard view was
   passing json.dumps() strings to template context, then json_script
   filter serialized them AGAIN. JavaScript received strings instead
   of arrays, crashing the entire DOMContentLoaded handler and
   preventing preview, edit/delete, and other features from working.
   Fix: pass raw Python objects, let json_script handle serialization.

2. Add defense-in-depth: wrap Chart.js initialization in try-catch
   blocks and use Bootstrap getOrCreateInstance() for modals.

3. Add calendar view to work history: monthly grid with day cells
   showing work log indicators, click-to-see-details panel, month
   navigation, and responsive mobile layout. Ported from V2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 22:31:32 +02:00

1532 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# === VIEWS ===
# All the page logic for the LabourPay app.
# Each function here handles a URL and decides what to show the user.
import csv
import json
import datetime
import calendar as cal_module
from decimal import Decimal
from django.shortcuts import render, redirect, get_object_or_404
from django.utils import timezone
from django.db import transaction
from django.db.models import Sum, Count, Q, Prefetch
from django.db.models.functions import TruncMonth
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse, HttpResponseForbidden, HttpResponse
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.conf import settings
from .models import Worker, Project, WorkLog, Team, PayrollRecord, Loan, PayrollAdjustment
from .forms import AttendanceLogForm, PayrollAdjustmentForm, ExpenseReceiptForm, ExpenseLineItemFormSet
# NOTE: render_to_pdf is NOT imported here at the top level.
# It's imported lazily inside process_payment() and create_receipt()
# to avoid crashing the entire app if xhtml2pdf is not installed on the server.
# === PAYROLL CONSTANTS ===
# These define which adjustment types ADD to a worker's pay vs SUBTRACT from it.
# "New Loan" is additive because the worker receives money upfront.
# "Loan Repayment" and "Advance Payment" are deductive — they reduce net pay.
ADDITIVE_TYPES = ['Bonus', 'Overtime', 'New Loan']
DEDUCTIVE_TYPES = ['Deduction', 'Loan Repayment', 'Advance Payment']
# === PERMISSION HELPERS ===
# These small functions check what kind of user is logged in.
# "Admin" = the boss (is_staff or is_superuser in Django).
# "Supervisor" = someone who manages teams or projects, or is in the Work Logger group.
def is_admin(user):
"""Returns True if the user is staff or superuser (the boss)."""
return user.is_staff or user.is_superuser
def is_supervisor(user):
"""Returns True if the user manages teams, has assigned projects, or is a Work Logger."""
return (
user.supervised_teams.exists()
or user.assigned_projects.exists()
or user.groups.filter(name='Work Logger').exists()
)
def is_staff_or_supervisor(user):
"""Returns True if the user is either an admin or a supervisor."""
return is_admin(user) or is_supervisor(user)
# === HOME DASHBOARD ===
# The main page users see after logging in. Shows different content
# depending on whether the user is an admin or supervisor.
@login_required
def index(request):
user = request.user
if is_admin(user):
# --- ADMIN DASHBOARD ---
# Calculate total value of unpaid work and break it down by project.
# A WorkLog is "unpaid" if it has no linked PayrollRecord entries.
unpaid_worklogs = WorkLog.objects.filter(
payroll_records__isnull=True
).select_related('project').prefetch_related('workers')
outstanding_payments = Decimal('0.00')
outstanding_by_project = {}
for wl in unpaid_worklogs:
project_name = wl.project.name
if project_name not in outstanding_by_project:
outstanding_by_project[project_name] = Decimal('0.00')
for worker in wl.workers.all():
cost = worker.daily_rate
outstanding_payments += cost
outstanding_by_project[project_name] += cost
# Also include unpaid payroll adjustments (bonuses, deductions, etc.)
unpaid_adjustments = PayrollAdjustment.objects.filter(
payroll_record__isnull=True
).select_related('project')
for adj in unpaid_adjustments:
outstanding_payments += adj.amount
project_name = adj.project.name if adj.project else 'General'
if project_name not in outstanding_by_project:
outstanding_by_project[project_name] = Decimal('0.00')
outstanding_by_project[project_name] += adj.amount
# Sum total paid out in the last 60 days
sixty_days_ago = timezone.now().date() - timezone.timedelta(days=60)
paid_this_month = PayrollRecord.objects.filter(
date__gte=sixty_days_ago
).aggregate(total=Sum('amount_paid'))['total'] or Decimal('0.00')
# Count and total balance of active loans
active_loans_qs = Loan.objects.filter(active=True)
active_loans_count = active_loans_qs.count()
active_loans_balance = active_loans_qs.aggregate(
total=Sum('remaining_balance')
)['total'] or Decimal('0.00')
# This week summary
start_of_week = timezone.now().date() - timezone.timedelta(
days=timezone.now().date().weekday()
)
this_week_logs = WorkLog.objects.filter(date__gte=start_of_week).count()
# Recent activity — last 5 work logs
recent_activity = WorkLog.objects.select_related(
'project', 'supervisor'
).prefetch_related('workers').order_by('-date', '-id')[:5]
# All workers, projects, and teams for the Manage Resources tab
workers = Worker.objects.all().order_by('name')
projects = Project.objects.all().order_by('name')
teams = Team.objects.all().order_by('name')
context = {
'is_admin': True,
'outstanding_payments': outstanding_payments,
'paid_this_month': paid_this_month,
'active_loans_count': active_loans_count,
'active_loans_balance': active_loans_balance,
'outstanding_by_project': outstanding_by_project,
'this_week_logs': this_week_logs,
'recent_activity': recent_activity,
'workers': workers,
'projects': projects,
'teams': teams,
}
return render(request, 'core/index.html', context)
else:
# --- SUPERVISOR DASHBOARD ---
# Count projects this supervisor is assigned to
my_projects_count = user.assigned_projects.filter(active=True).count()
# Count teams this supervisor manages
my_teams_count = user.supervised_teams.filter(active=True).count()
# Count unique workers across all their teams
my_workers_count = Worker.objects.filter(
active=True,
teams__supervisor=user,
teams__active=True
).distinct().count()
# This week summary — only their own logs
start_of_week = timezone.now().date() - timezone.timedelta(
days=timezone.now().date().weekday()
)
this_week_logs = WorkLog.objects.filter(
date__gte=start_of_week, supervisor=user
).count()
# Their last 5 work logs
recent_activity = WorkLog.objects.filter(
supervisor=user
).select_related('project').prefetch_related('workers').order_by('-date', '-id')[:5]
context = {
'is_admin': False,
'my_projects_count': my_projects_count,
'my_teams_count': my_teams_count,
'my_workers_count': my_workers_count,
'this_week_logs': this_week_logs,
'recent_activity': recent_activity,
}
return render(request, 'core/index.html', context)
# === ATTENDANCE LOGGING ===
# This is where supervisors log which workers showed up to work each day.
# Supports logging a single day or a date range (e.g. a whole week).
# Includes conflict detection to prevent double-logging workers.
@login_required
def attendance_log(request):
user = request.user
if request.method == 'POST':
form = AttendanceLogForm(request.POST, user=user)
if form.is_valid():
start_date = form.cleaned_data['date']
end_date = form.cleaned_data.get('end_date') or start_date
include_saturday = form.cleaned_data.get('include_saturday', False)
include_sunday = form.cleaned_data.get('include_sunday', False)
project = form.cleaned_data['project']
team = form.cleaned_data.get('team')
workers = form.cleaned_data['workers']
overtime_amount = form.cleaned_data['overtime_amount']
notes = form.cleaned_data.get('notes', '')
# --- Build list of dates to log ---
# Go through each day from start to end, skipping weekends
# unless the user checked the "Include Saturday/Sunday" boxes
dates_to_log = []
current_date = start_date
while current_date <= end_date:
day_of_week = current_date.weekday() # 0=Mon, 5=Sat, 6=Sun
if day_of_week == 5 and not include_saturday:
current_date += datetime.timedelta(days=1)
continue
if day_of_week == 6 and not include_sunday:
current_date += datetime.timedelta(days=1)
continue
dates_to_log.append(current_date)
current_date += datetime.timedelta(days=1)
if not dates_to_log:
messages.warning(request, 'No valid dates in the selected range.')
# Still need team_workers_json for the JS even on error re-render
tw_map = {}
for t in Team.objects.filter(active=True).prefetch_related('workers'):
tw_map[t.id] = list(t.workers.filter(active=True).values_list('id', flat=True))
return render(request, 'core/attendance_log.html', {
'form': form,
'is_admin': is_admin(user),
'team_workers_json': json.dumps(tw_map),
})
# --- Conflict detection ---
# Check if any selected workers already have a WorkLog on any of these dates
worker_ids = list(workers.values_list('id', flat=True))
existing_logs = WorkLog.objects.filter(
date__in=dates_to_log,
workers__id__in=worker_ids
).prefetch_related('workers').select_related('project')
conflicts = []
for log in existing_logs:
for w in log.workers.all():
if w.id in worker_ids:
conflicts.append({
'worker_name': w.name,
'date': log.date,
'project_name': log.project.name,
})
# If there are conflicts and the user hasn't chosen what to do yet
conflict_action = request.POST.get('conflict_action', '')
if conflicts and not conflict_action:
# Show the conflict warning — let user choose Skip or Overwrite
# Still need team_workers_json for the JS even on conflict re-render
tw_map = {}
for t in Team.objects.filter(active=True).prefetch_related('workers'):
tw_map[t.id] = list(t.workers.filter(active=True).values_list('id', flat=True))
return render(request, 'core/attendance_log.html', {
'form': form,
'conflicts': conflicts,
'is_admin': is_admin(user),
'team_workers_json': json.dumps(tw_map),
})
# --- Create work logs ---
created_count = 0
skipped_count = 0
for log_date in dates_to_log:
# Check which workers already have a log on this date
workers_with_existing = set(
WorkLog.objects.filter(
date=log_date,
workers__id__in=worker_ids
).values_list('workers__id', flat=True)
)
if conflict_action == 'overwrite':
# Remove conflicting workers from their existing logs
conflicting_logs = WorkLog.objects.filter(
date=log_date,
workers__id__in=worker_ids
)
for existing_log in conflicting_logs:
for w_id in worker_ids:
existing_log.workers.remove(w_id)
workers_to_add = workers
elif conflict_action == 'skip':
# Skip workers who already have logs on this date
workers_to_add = workers.exclude(id__in=workers_with_existing)
skipped_count += len(workers_with_existing & set(worker_ids))
else:
# No conflicts, or first submission — add all workers
workers_to_add = workers
if workers_to_add.exists():
# Create the WorkLog record
work_log = WorkLog.objects.create(
date=log_date,
project=project,
team=team,
supervisor=user, # Auto-set to logged-in user
overtime_amount=overtime_amount,
notes=notes,
)
work_log.workers.set(workers_to_add)
created_count += 1
# Show success message
if created_count > 0:
msg = f'Successfully created {created_count} work log(s).'
if skipped_count > 0:
msg += f' Skipped {skipped_count} conflicts.'
messages.success(request, msg)
else:
messages.warning(request, 'No work logs created — all entries were conflicts.')
return redirect('home')
else:
form = AttendanceLogForm(
user=user,
initial={'date': timezone.now().date()}
)
# Build a list of worker data for the estimated cost JavaScript
# (admins only — supervisors don't see the cost card)
worker_rates = {}
if is_admin(user):
for w in Worker.objects.filter(active=True):
worker_rates[str(w.id)] = str(w.daily_rate)
# Build team→workers mapping so the JS can auto-check workers when a
# team is selected from the dropdown. Key = team ID, Value = list of worker IDs.
team_workers_map = {}
teams_qs = Team.objects.filter(active=True).prefetch_related('workers')
if not is_admin(user):
# Supervisors only see their own teams
teams_qs = teams_qs.filter(supervisor=user)
for team in teams_qs:
active_worker_ids = list(
team.workers.filter(active=True).values_list('id', flat=True)
)
team_workers_map[team.id] = active_worker_ids
return render(request, 'core/attendance_log.html', {
'form': form,
'is_admin': is_admin(user),
'worker_rates_json': worker_rates,
'team_workers_json': json.dumps(team_workers_map),
})
# === WORK LOG HISTORY ===
# Shows work logs in two modes: a table list or a monthly calendar grid.
# Supervisors only see their own projects. Admins see everything.
# The calendar view groups logs by day and lets you click a day to see details.
@login_required
def work_history(request):
user = request.user
# Start with base queryset
if is_admin(user):
logs = WorkLog.objects.all()
else:
# Supervisors only see logs for their projects
logs = WorkLog.objects.filter(
Q(supervisor=user) | Q(project__supervisors=user)
).distinct()
# --- Filters ---
# Read filter values from the URL query string
worker_filter = request.GET.get('worker', '')
project_filter = request.GET.get('project', '')
status_filter = request.GET.get('status', '')
if worker_filter:
logs = logs.filter(workers__id=worker_filter).distinct()
if project_filter:
logs = logs.filter(project__id=project_filter)
if status_filter == 'paid':
# "Paid" = has at least one PayrollRecord linked
logs = logs.filter(payroll_records__isnull=False).distinct()
elif status_filter == 'unpaid':
# "Unpaid" = has no PayrollRecord linked
logs = logs.filter(payroll_records__isnull=True)
# Add related data and order by date (newest first)
logs = logs.select_related(
'project', 'supervisor'
).prefetch_related('workers', 'payroll_records').order_by('-date', '-id')
# Get filter options for the dropdowns
if is_admin(user):
filter_workers = Worker.objects.filter(active=True).order_by('name')
filter_projects = Project.objects.filter(active=True).order_by('name')
else:
supervised_teams = Team.objects.filter(supervisor=user, active=True)
filter_workers = Worker.objects.filter(
active=True, teams__in=supervised_teams
).distinct().order_by('name')
filter_projects = Project.objects.filter(
active=True, supervisors=user
).order_by('name')
# --- View mode: list or calendar ---
view_mode = request.GET.get('view', 'list')
today = timezone.now().date()
# Build a query string that preserves all current filters
# (used by the List/Calendar toggle links to keep filters when switching)
filter_params = ''
if worker_filter:
filter_params += '&worker=' + worker_filter
if project_filter:
filter_params += '&project=' + project_filter
if status_filter:
filter_params += '&status=' + status_filter
context = {
'logs': logs,
'filter_workers': filter_workers,
'filter_projects': filter_projects,
'selected_worker': worker_filter,
'selected_project': project_filter,
'selected_status': status_filter,
'is_admin': is_admin(user),
'view_mode': view_mode,
'filter_params': filter_params,
}
# === CALENDAR MODE ===
# Build a monthly grid of days, each containing the work logs for that day.
# Also build a JSON object keyed by date string for the JavaScript
# click-to-see-details panel.
if view_mode == 'calendar':
# Get target month from URL (default: current month)
try:
target_year = int(request.GET.get('year', today.year))
target_month = int(request.GET.get('month', today.month))
if not (1 <= target_month <= 12):
target_year, target_month = today.year, today.month
except (ValueError, TypeError):
target_year, target_month = today.year, today.month
# Build the calendar grid using Python's calendar module.
# monthdatescalendar() returns a list of weeks, where each week is
# a list of 7 datetime.date objects (including overflow from prev/next month).
cal = cal_module.Calendar(firstweekday=0) # Week starts on Monday
month_dates = cal.monthdatescalendar(target_year, target_month)
# Get the full date range for the calendar grid (includes overflow days)
first_display_date = month_dates[0][0]
last_display_date = month_dates[-1][-1]
# Filter logs to only this date range (improves performance)
month_logs = logs.filter(date__range=[first_display_date, last_display_date])
# Group logs by date string for quick lookup
logs_by_date = {}
for log in month_logs:
date_key = log.date.isoformat()
if date_key not in logs_by_date:
logs_by_date[date_key] = []
logs_by_date[date_key].append(log)
# Build the calendar_weeks structure that the template iterates over.
# Each day cell has: date, day number, whether it's the current month,
# a list of log objects, and a count badge number.
calendar_weeks = []
for week in month_dates:
week_data = []
for day in week:
date_key = day.isoformat()
day_logs = logs_by_date.get(date_key, [])
week_data.append({
'date': day,
'day': day.day,
'is_current_month': day.month == target_month,
'is_today': day == today,
'records': day_logs,
'count': len(day_logs),
})
calendar_weeks.append(week_data)
# Build detail data for JavaScript — when you click a day cell,
# the JS reads this JSON to populate the detail panel below the calendar.
# NOTE: Pass raw Python dict, not json.dumps() — the template's
# |json_script filter handles serialization.
calendar_detail = {}
for date_key, day_logs in logs_by_date.items():
calendar_detail[date_key] = []
for log in day_logs:
entry = {
'project': log.project.name,
'workers': [w.name for w in log.workers.all()],
'supervisor': (
log.supervisor.get_full_name() or log.supervisor.username
) if log.supervisor else '-',
'notes': log.notes or '',
'is_paid': log.payroll_records.exists(),
'overtime': log.get_overtime_amount_display() if log.overtime_amount > 0 else '',
}
# Only show cost data to admins
if is_admin(user):
entry['amount'] = float(
sum(w.daily_rate for w in log.workers.all())
)
calendar_detail[date_key].append(entry)
# Calculate previous/next month for navigation arrows
if target_month == 1:
prev_year, prev_month = target_year - 1, 12
else:
prev_year, prev_month = target_year, target_month - 1
if target_month == 12:
next_year, next_month = target_year + 1, 1
else:
next_year, next_month = target_year, target_month + 1
month_name = datetime.date(target_year, target_month, 1).strftime('%B %Y')
context.update({
'calendar_weeks': calendar_weeks,
'calendar_detail': calendar_detail,
'curr_year': target_year,
'curr_month': target_month,
'month_name': month_name,
'prev_year': prev_year,
'prev_month': prev_month,
'next_year': next_year,
'next_month': next_month,
})
return render(request, 'core/work_history.html', context)
# === CSV EXPORT ===
# Downloads the filtered work log history as a CSV file.
# Uses the same filters as the work_history page.
@login_required
def export_work_log_csv(request):
user = request.user
# Build the same queryset as work_history, using the same filters
if is_admin(user):
logs = WorkLog.objects.all()
else:
logs = WorkLog.objects.filter(
Q(supervisor=user) | Q(project__supervisors=user)
).distinct()
worker_filter = request.GET.get('worker', '')
project_filter = request.GET.get('project', '')
status_filter = request.GET.get('status', '')
if worker_filter:
logs = logs.filter(workers__id=worker_filter).distinct()
if project_filter:
logs = logs.filter(project__id=project_filter)
if status_filter == 'paid':
logs = logs.filter(payroll_records__isnull=False).distinct()
elif status_filter == 'unpaid':
logs = logs.filter(payroll_records__isnull=True)
logs = logs.select_related(
'project', 'supervisor'
).prefetch_related('workers', 'payroll_records').order_by('-date', '-id')
# Create the CSV response
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename="work_log_history.csv"'
writer = csv.writer(response)
writer.writerow(['Date', 'Project', 'Workers', 'Overtime', 'Payment Status', 'Supervisor'])
for log in logs:
worker_names = ', '.join(w.name for w in log.workers.all())
payment_status = 'Paid' if log.payroll_records.exists() else 'Unpaid'
overtime_display = log.get_overtime_amount_display() if log.overtime_amount > 0 else 'None'
supervisor_name = log.supervisor.get_full_name() or log.supervisor.username if log.supervisor else '-'
writer.writerow([
log.date.strftime('%Y-%m-%d'),
log.project.name,
worker_names,
overtime_display,
payment_status,
supervisor_name,
])
return response
# === TOGGLE RESOURCE STATUS (AJAX) ===
# Called by the toggle switches on the dashboard to activate/deactivate
# workers, projects, or teams without reloading the page.
@login_required
def toggle_active(request, model_name, item_id):
if request.method != 'POST':
return HttpResponseForbidden("Only POST requests are allowed.")
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
# Map URL parameter to the correct model class
model_map = {
'worker': Worker,
'project': Project,
'team': Team
}
if model_name not in model_map:
return JsonResponse({'error': 'Invalid model'}, status=400)
model = model_map[model_name]
try:
item = model.objects.get(id=item_id)
item.active = not item.active
item.save()
return JsonResponse({
'status': 'success',
'active': item.active,
'message': f'{item.name} is now {"active" if item.active else "inactive"}.'
})
except model.DoesNotExist:
return JsonResponse({'error': 'Item not found'}, status=404)
# =============================================================================
# === PAYROLL DASHBOARD ===
# The main payroll page. Shows per-worker breakdown of what's owed,
# adjustment management, payment processing, and Chart.js analytics.
# Admin-only — supervisors cannot access this page.
# =============================================================================
@login_required
def payroll_dashboard(request):
if not is_admin(request.user):
messages.error(request, 'Only admins can access the payroll dashboard.')
return redirect('home')
status_filter = request.GET.get('status', 'pending')
# --- Per-worker pending payment data ---
# For each active worker, calculate: unpaid days × daily_rate + net adjustments
active_workers = Worker.objects.filter(active=True).prefetch_related(
Prefetch('work_logs', queryset=WorkLog.objects.prefetch_related(
'payroll_records', 'priced_workers'
).select_related('project')),
Prefetch('adjustments', queryset=PayrollAdjustment.objects.filter(
payroll_record__isnull=True
).select_related('project', 'loan', 'work_log'),
to_attr='pending_adjustments_list'),
).order_by('name')
workers_data = []
outstanding_total = Decimal('0.00')
all_ot_data = [] # For the Price Overtime modal
for worker in active_workers:
# Find unpaid work logs for this worker.
# A log is "unpaid for this worker" if no PayrollRecord links
# to BOTH this log AND this worker.
unpaid_logs = []
for log in worker.work_logs.all():
paid_worker_ids = {pr.worker_id for pr in log.payroll_records.all()}
if worker.id not in paid_worker_ids:
unpaid_logs.append(log)
log_count = len(unpaid_logs)
log_amount = log_count * worker.daily_rate
# Find unpriced overtime in unpaid logs
ot_data_worker = []
for log in unpaid_logs:
if log.overtime_amount > 0:
priced_ids = {w.id for w in log.priced_workers.all()}
if worker.id not in priced_ids:
ot_entry = {
'worker_id': worker.id,
'worker_name': worker.name,
'log_id': log.id,
'date': log.date.strftime('%Y-%m-%d'),
'project': log.project.name,
'overtime': float(log.overtime_amount),
'ot_label': log.get_overtime_amount_display(),
}
ot_data_worker.append(ot_entry)
all_ot_data.append(ot_entry)
# Calculate net adjustment amount
pending_adjs = worker.pending_adjustments_list
adj_total = Decimal('0.00')
for adj in pending_adjs:
if adj.type in ADDITIVE_TYPES:
adj_total += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
adj_total -= adj.amount
total_payable = log_amount + adj_total
# Only include workers who have something pending
if log_count > 0 or pending_adjs:
workers_data.append({
'worker': worker,
'unpaid_count': log_count,
'unpaid_amount': log_amount,
'adj_amount': adj_total,
'total_payable': total_payable,
'adjustments': pending_adjs,
'logs': unpaid_logs,
'ot_data': ot_data_worker,
'day_rate': float(worker.daily_rate),
})
outstanding_total += max(total_payable, Decimal('0.00'))
# --- Payment history ---
paid_records = PayrollRecord.objects.select_related(
'worker'
).order_by('-date', '-id')
# --- Recent payments total (last 60 days) ---
sixty_days_ago = timezone.now().date() - timezone.timedelta(days=60)
recent_payments_total = PayrollRecord.objects.filter(
date__gte=sixty_days_ago
).aggregate(total=Sum('amount_paid'))['total'] or Decimal('0.00')
# --- Outstanding cost per project ---
# Check per-worker: a WorkLog is "unpaid for worker X" if no PayrollRecord
# links BOTH that log AND that worker. This handles partially-paid logs.
outstanding_project_costs = []
for project in Project.objects.filter(active=True):
project_outstanding = Decimal('0.00')
# Unpaid work log costs — check each worker individually
for log in project.work_logs.prefetch_related('payroll_records', 'workers').all():
paid_worker_ids = {pr.worker_id for pr in log.payroll_records.all()}
for w in log.workers.all():
if w.id not in paid_worker_ids:
project_outstanding += w.daily_rate
# Unpaid adjustments for this project
unpaid_adjs = PayrollAdjustment.objects.filter(
payroll_record__isnull=True
).filter(Q(project=project) | Q(work_log__project=project))
for adj in unpaid_adjs:
if adj.type in ADDITIVE_TYPES:
project_outstanding += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
project_outstanding -= adj.amount
if project_outstanding != 0:
outstanding_project_costs.append({
'name': project.name,
'cost': project_outstanding,
})
# --- Chart data: last 6 months ---
today = timezone.now().date()
chart_months = []
for i in range(5, -1, -1):
m = today.month - i
y = today.year
while m <= 0:
m += 12
y -= 1
chart_months.append((y, m))
chart_labels = [
datetime.date(y, m, 1).strftime('%b %Y') for y, m in chart_months
]
# Monthly payroll totals
paid_by_month_qs = PayrollRecord.objects.annotate(
month=TruncMonth('date')
).values('month').annotate(total=Sum('amount_paid')).order_by('month')
paid_by_month = {
(r['month'].year, r['month'].month): float(r['total'])
for r in paid_by_month_qs
}
chart_totals = [paid_by_month.get((y, m), 0) for y, m in chart_months]
# Per-project monthly costs (for stacked bar chart)
project_chart_data = []
for project in Project.objects.filter(active=True):
monthly_data = []
for y, m in chart_months:
month_cost = Decimal('0.00')
month_logs = project.work_logs.filter(
date__year=y, date__month=m
).prefetch_related('workers')
for log in month_logs:
for w in log.workers.all():
month_cost += w.daily_rate
# Include paid adjustments for this project in this month
paid_adjs = PayrollAdjustment.objects.filter(
payroll_record__isnull=False,
date__year=y, date__month=m,
).filter(Q(project=project) | Q(work_log__project=project))
for adj in paid_adjs:
if adj.type in ADDITIVE_TYPES:
month_cost += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
month_cost -= adj.amount
monthly_data.append(float(month_cost))
if any(v > 0 for v in monthly_data):
project_chart_data.append({
'name': project.name,
'data': monthly_data,
})
# --- Loans ---
loan_filter = request.GET.get('loan_status', 'active')
if loan_filter == 'history':
loans = Loan.objects.filter(active=False).select_related('worker').order_by('-date')
else:
loans = Loan.objects.filter(active=True).select_related('worker').order_by('-date')
# Total active loan balance (always shown in analytics card, regardless of tab)
active_loans = Loan.objects.filter(active=True)
active_loans_count = active_loans.count()
active_loans_balance = active_loans.aggregate(
total=Sum('remaining_balance')
)['total'] or Decimal('0.00')
# --- Active projects and workers for modal dropdowns ---
active_projects = Project.objects.filter(active=True).order_by('name')
all_workers = Worker.objects.filter(active=True).order_by('name')
all_teams = Team.objects.filter(active=True).prefetch_related('workers').order_by('name')
# Team-workers map for auto-selecting workers when a team is picked
team_workers_map = {}
for team in all_teams:
team_workers_map[str(team.id)] = list(
team.workers.filter(active=True).values_list('id', flat=True)
)
# NOTE: Pass raw Python objects here, NOT json.dumps() strings.
# The template uses Django's |json_script filter which handles
# JSON serialization. If we pre-serialize with json.dumps(), the
# filter double-encodes the data and JavaScript receives strings
# instead of arrays/objects, which crashes the entire script.
context = {
'workers_data': workers_data,
'paid_records': paid_records,
'outstanding_total': outstanding_total,
'recent_payments_total': recent_payments_total,
'outstanding_project_costs': outstanding_project_costs,
'active_tab': status_filter,
'all_workers': all_workers,
'all_teams': all_teams,
'team_workers_map_json': team_workers_map,
'adjustment_types': PayrollAdjustment.TYPE_CHOICES,
'active_projects': active_projects,
'loans': loans,
'loan_filter': loan_filter,
'chart_labels_json': chart_labels,
'chart_totals_json': chart_totals,
'project_chart_json': project_chart_data,
'overtime_data_json': all_ot_data,
'today': today, # For pre-filling date fields in modals
'active_loans_count': active_loans_count,
'active_loans_balance': active_loans_balance,
}
return render(request, 'core/payroll_dashboard.html', context)
# =============================================================================
# === PROCESS PAYMENT ===
# Creates a PayrollRecord for a worker, linking all their unpaid work logs
# and applying any pending adjustments. Handles loan repayment deductions.
# =============================================================================
@login_required
def process_payment(request, worker_id):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
worker = get_object_or_404(Worker, id=worker_id)
# Find unpaid logs for this worker
unpaid_logs = worker.work_logs.exclude(
payroll_records__worker=worker
)
log_count = unpaid_logs.count()
logs_amount = log_count * worker.daily_rate
# Find pending adjustments
pending_adjs = worker.adjustments.filter(payroll_record__isnull=True)
if log_count == 0 and not pending_adjs.exists():
messages.warning(request, f'No pending payments for {worker.name}.')
return redirect('payroll_dashboard')
# Calculate net adjustment
adj_amount = Decimal('0.00')
for adj in pending_adjs:
if adj.type in ADDITIVE_TYPES:
adj_amount += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
adj_amount -= adj.amount
total_amount = logs_amount + adj_amount
with transaction.atomic():
# Create the PayrollRecord
payroll_record = PayrollRecord.objects.create(
worker=worker,
amount_paid=total_amount,
date=timezone.now().date(),
)
# Link all unpaid work logs to this payment
payroll_record.work_logs.set(unpaid_logs)
# Link all pending adjustments to this payment
for adj in pending_adjs:
adj.payroll_record = payroll_record
adj.save()
# If this is a loan repayment, deduct from the loan balance
if adj.type == 'Loan Repayment' and adj.loan:
adj.loan.remaining_balance -= adj.amount
if adj.loan.remaining_balance <= 0:
adj.loan.remaining_balance = Decimal('0.00')
adj.loan.active = False
adj.loan.save()
# =========================================================================
# EMAIL PAYSLIP (outside the transaction — if email fails, payment is
# still saved. We don't want a network error to roll back a real payment.)
# =========================================================================
# Lazy import — avoids crashing the app if xhtml2pdf isn't installed
from .utils import render_to_pdf
subject = f"Payslip for {worker.name} - {payroll_record.date}"
# Context for both the HTML email body and the PDF attachment
email_context = {
'record': payroll_record,
'logs_count': log_count,
'logs_amount': logs_amount,
'adjustments': payroll_record.adjustments.all(),
'deductive_types': DEDUCTIVE_TYPES,
}
# 1. Render HTML email body
html_message = render_to_string('core/email/payslip_email.html', email_context)
plain_message = strip_tags(html_message)
# 2. Render PDF attachment (returns None if xhtml2pdf is not installed)
pdf_content = render_to_pdf('core/pdf/payslip_pdf.html', email_context)
# 3. Send email with PDF attached
recipient = getattr(settings, 'SPARK_RECEIPT_EMAIL', None)
if recipient:
try:
email = EmailMultiAlternatives(
subject,
plain_message,
settings.DEFAULT_FROM_EMAIL,
[recipient],
)
email.attach_alternative(html_message, "text/html")
if pdf_content:
email.attach(
f"Payslip_{worker.id}_{payroll_record.date}.pdf",
pdf_content,
'application/pdf'
)
email.send()
messages.success(
request,
f'Payment of R {total_amount:,.2f} processed for {worker.name}. '
f'Payslip emailed successfully.'
)
except Exception as e:
# Payment is saved — just warn that email failed
messages.warning(
request,
f'Payment of R {total_amount:,.2f} processed for {worker.name}, '
f'but email delivery failed: {str(e)}'
)
else:
# No SPARK_RECEIPT_EMAIL configured — just show success
messages.success(
request,
f'Payment of R {total_amount:,.2f} processed for {worker.name}. '
f'{log_count} work log(s) marked as paid.'
)
return redirect('payroll_dashboard')
# =============================================================================
# === PRICE OVERTIME ===
# Creates Overtime adjustments for workers who have unpriced overtime on
# their work logs. Called via AJAX from the Price Overtime modal.
# =============================================================================
@login_required
def price_overtime(request):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
log_ids = request.POST.getlist('log_id[]')
worker_ids = request.POST.getlist('worker_id[]')
rate_pcts = request.POST.getlist('rate_pct[]')
created_count = 0
for log_id, w_id, pct in zip(log_ids, worker_ids, rate_pcts):
try:
worklog = WorkLog.objects.select_related('project').get(id=int(log_id))
worker = Worker.objects.get(id=int(w_id))
rate_pct = Decimal(pct)
# Calculate: daily_rate × overtime_fraction × (rate_percentage / 100)
amount = worker.daily_rate * worklog.overtime_amount * (rate_pct / Decimal('100'))
if amount > 0:
PayrollAdjustment.objects.create(
worker=worker,
type='Overtime',
amount=amount,
date=worklog.date,
description=f'Overtime ({worklog.get_overtime_amount_display()}) at {pct}% on {worklog.project.name}',
work_log=worklog,
project=worklog.project,
)
# Mark this worker as "priced" for this log's overtime
worklog.priced_workers.add(worker)
created_count += 1
except (WorkLog.DoesNotExist, Worker.DoesNotExist, Exception):
continue
messages.success(request, f'Priced {created_count} overtime adjustment(s).')
return redirect('payroll_dashboard')
# =============================================================================
# === ADD ADJUSTMENT ===
# Creates a new payroll adjustment (bonus, deduction, loan, etc.).
# Called via POST from the Add Adjustment modal.
# =============================================================================
@login_required
def add_adjustment(request):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
worker_ids = request.POST.getlist('workers')
adj_type = request.POST.get('type', '')
amount_str = request.POST.get('amount', '0')
description = request.POST.get('description', '')
date_str = request.POST.get('date', '')
project_id = request.POST.get('project', '')
# Validate amount
try:
amount = Decimal(amount_str)
if amount <= 0:
raise ValueError
except (ValueError, Exception):
messages.error(request, 'Please enter a valid amount greater than zero.')
return redirect('payroll_dashboard')
# Validate date
try:
adj_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date() if date_str else timezone.now().date()
except ValueError:
adj_date = timezone.now().date()
# Validate project for types that require it
project = None
if project_id:
try:
project = Project.objects.get(id=int(project_id))
except Project.DoesNotExist:
pass
project_required_types = ('Overtime', 'Bonus', 'Deduction', 'Advance Payment')
if adj_type in project_required_types and not project:
messages.error(request, 'A project must be selected for this adjustment type.')
return redirect('payroll_dashboard')
created_count = 0
for w_id in worker_ids:
try:
worker = Worker.objects.get(id=int(w_id))
except Worker.DoesNotExist:
continue
loan = None
if adj_type == 'Loan Repayment':
# Find the worker's active loan
loan = worker.loans.filter(active=True).first()
if not loan:
messages.warning(request, f'{worker.name} has no active loan — skipped.')
continue
if adj_type == 'New Loan':
# Create a new Loan object first
loan = Loan.objects.create(
worker=worker,
principal_amount=amount,
remaining_balance=amount,
date=adj_date,
reason=description,
)
PayrollAdjustment.objects.create(
worker=worker,
type=adj_type,
amount=amount,
date=adj_date,
description=description,
project=project,
loan=loan,
)
created_count += 1
messages.success(request, f'Created {created_count} {adj_type} adjustment(s).')
return redirect('payroll_dashboard')
# =============================================================================
# === EDIT ADJUSTMENT ===
# Updates an existing unpaid adjustment. Type changes are limited to
# Bonus ↔ Deduction swaps only.
# =============================================================================
@login_required
def edit_adjustment(request, adj_id):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
adj = get_object_or_404(PayrollAdjustment, id=adj_id)
# Can't edit adjustments that have already been paid
if adj.payroll_record is not None:
messages.error(request, 'Cannot edit a paid adjustment.')
return redirect('payroll_dashboard')
# Can't edit Advance Payments
if adj.type == 'Advance Payment':
messages.warning(request, 'Advance payments cannot be edited.')
return redirect('payroll_dashboard')
# Update fields
try:
adj.amount = Decimal(request.POST.get('amount', str(adj.amount)))
except (ValueError, Exception):
pass
adj.description = request.POST.get('description', adj.description)
date_str = request.POST.get('date', '')
if date_str:
try:
adj.date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
pass
# Type change — only allow Bonus ↔ Deduction
new_type = request.POST.get('type', adj.type)
if adj.type in ('Bonus', 'Deduction') and new_type in ('Bonus', 'Deduction'):
adj.type = new_type
# Project
project_id = request.POST.get('project', '')
if project_id:
try:
adj.project = Project.objects.get(id=int(project_id))
except Project.DoesNotExist:
pass
else:
adj.project = None
adj.save()
# If it's a Loan adjustment, sync the loan details
if adj.type == 'New Loan' and adj.loan:
adj.loan.principal_amount = adj.amount
adj.loan.remaining_balance = adj.amount
adj.loan.reason = adj.description
adj.loan.save()
messages.success(request, f'{adj.type} adjustment updated.')
return redirect('payroll_dashboard')
# =============================================================================
# === DELETE ADJUSTMENT ===
# Removes an unpaid adjustment. Handles cascade logic for Loans and Overtime.
# =============================================================================
@login_required
def delete_adjustment(request, adj_id):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
adj = get_object_or_404(PayrollAdjustment, id=adj_id)
# Can't delete adjustments that have been paid
if adj.payroll_record is not None:
messages.error(request, 'Cannot delete a paid adjustment.')
return redirect('payroll_dashboard')
adj_type = adj.type
worker_name = adj.worker.name
if adj_type == 'New Loan' and adj.loan:
# Check if any paid repayments exist for this loan
paid_repayments = PayrollAdjustment.objects.filter(
loan=adj.loan,
type='Loan Repayment',
payroll_record__isnull=False,
)
if paid_repayments.exists():
messages.error(
request,
f'Cannot delete loan for {worker_name} — it has paid repayments.'
)
return redirect('payroll_dashboard')
# Delete all unpaid repayments for this loan, then the loan itself
PayrollAdjustment.objects.filter(
loan=adj.loan,
type='Loan Repayment',
payroll_record__isnull=True,
).delete()
adj.loan.delete()
elif adj_type == 'Overtime' and adj.work_log:
# "Un-price" the overtime — remove worker from priced_workers M2M
adj.work_log.priced_workers.remove(adj.worker)
adj.delete()
messages.success(request, f'{adj_type} adjustment for {worker_name} deleted.')
return redirect('payroll_dashboard')
# =============================================================================
# === PREVIEW PAYSLIP (AJAX) ===
# Returns a JSON preview of what a worker's payslip would look like.
# Called from the Preview Payslip modal without saving anything.
# =============================================================================
@login_required
def preview_payslip(request, worker_id):
if not is_admin(request.user):
return JsonResponse({'error': 'Not authorized'}, status=403)
worker = get_object_or_404(Worker, id=worker_id)
# Find unpaid logs
unpaid_logs = []
for log in worker.work_logs.select_related('project').prefetch_related('payroll_records').all():
paid_worker_ids = {pr.worker_id for pr in log.payroll_records.all()}
if worker.id not in paid_worker_ids:
unpaid_logs.append({
'date': log.date.strftime('%Y-%m-%d'),
'project': log.project.name,
})
log_count = len(unpaid_logs)
log_amount = float(log_count * worker.daily_rate)
# Find pending adjustments
pending_adjs = worker.adjustments.filter(
payroll_record__isnull=True
).select_related('project')
adjustments_list = []
adj_total = 0.0
for adj in pending_adjs:
sign = '+' if adj.type in ADDITIVE_TYPES else '-'
adj_total += float(adj.amount) if adj.type in ADDITIVE_TYPES else -float(adj.amount)
adjustments_list.append({
'type': adj.type,
'amount': float(adj.amount),
'sign': sign,
'description': adj.description,
'project': adj.project.name if adj.project else '',
})
return JsonResponse({
'worker_name': worker.name,
'worker_id_number': worker.id_number,
'day_rate': float(worker.daily_rate),
'days_worked': log_count,
'log_amount': log_amount,
'adjustments': adjustments_list,
'adj_total': adj_total,
'net_pay': log_amount + adj_total,
'logs': unpaid_logs,
})
# =============================================================================
# === PAYSLIP DETAIL ===
# Shows a completed payment (PayrollRecord) as a printable payslip page.
# Displays: worker details, work log table, adjustments table, totals.
# Reached from the "Payment History" tab on the payroll dashboard.
# =============================================================================
@login_required
def payslip_detail(request, pk):
"""Show a completed payslip with work logs, adjustments, and totals."""
if not is_admin(request.user):
return redirect('payroll_dashboard')
record = get_object_or_404(PayrollRecord, pk=pk)
# Get the work logs included in this payment
logs = record.work_logs.select_related('project').order_by('date')
# Get the adjustments linked to this payment
adjustments = record.adjustments.all().order_by('type')
# Calculate base pay from logs
# Each log = 1 day of work at the worker's daily rate
base_pay = record.worker.daily_rate * logs.count()
# Calculate net adjustment amount (additive minus deductive)
adjustments_net = record.amount_paid - base_pay
context = {
'record': record,
'logs': logs,
'adjustments': adjustments,
'base_pay': base_pay,
'adjustments_net': adjustments_net,
'adjustments_net_abs': abs(adjustments_net),
'deductive_types': DEDUCTIVE_TYPES,
}
return render(request, 'core/payslip.html', context)
# =============================================================================
# === CREATE EXPENSE RECEIPT ===
# Single-page form for recording business expenses.
# Supports dynamic line items (products + amounts) and VAT calculation.
# On save: emails an HTML + PDF receipt to Spark Receipt for accounting.
# =============================================================================
@login_required
def create_receipt(request):
"""Create a new expense receipt and email it to Spark Receipt."""
if not is_staff_or_supervisor(request.user):
return redirect('home')
if request.method == 'POST':
form = ExpenseReceiptForm(request.POST)
items = ExpenseLineItemFormSet(request.POST)
if form.is_valid() and items.is_valid():
# Save the receipt header (but don't commit yet — need to set user)
receipt = form.save(commit=False)
receipt.user = request.user
# Set temporary zero values so the first save doesn't fail.
# (subtotal and total_amount have no default in the model,
# so they'd be NULL — which MariaDB rejects.)
# We'll recalculate these properly after saving line items.
receipt.subtotal = Decimal('0.00')
receipt.vat_amount = Decimal('0.00')
receipt.total_amount = Decimal('0.00')
receipt.save()
# Save line items — link them to this receipt
items.instance = receipt
line_items = items.save()
# === BACKEND VAT CALCULATION ===
# The frontend shows live totals, but we recalculate on the server
# using Python Decimal for accuracy (no floating-point rounding errors).
sum_amount = sum(item.amount for item in line_items)
vat_type = receipt.vat_type
if vat_type == 'Included':
# "VAT Included" means the entered amounts already include 15% VAT.
# To find the pre-VAT subtotal: divide by 1.15
# Example: R100 entered → Subtotal R86.96, VAT R13.04, Total R100
receipt.total_amount = sum_amount
receipt.subtotal = (sum_amount / Decimal('1.15')).quantize(Decimal('0.01'))
receipt.vat_amount = receipt.total_amount - receipt.subtotal
elif vat_type == 'Excluded':
# "VAT Excluded" means the entered amounts are pre-VAT.
# Add 15% on top for the total.
# Example: R100 entered → Subtotal R100, VAT R15, Total R115
receipt.subtotal = sum_amount
receipt.vat_amount = (sum_amount * Decimal('0.15')).quantize(Decimal('0.01'))
receipt.total_amount = receipt.subtotal + receipt.vat_amount
else:
# "None" — no VAT applies
receipt.subtotal = sum_amount
receipt.vat_amount = Decimal('0.00')
receipt.total_amount = sum_amount
receipt.save()
# =================================================================
# EMAIL RECEIPT (same pattern as payslip email)
# If email fails, the receipt is still saved.
# =================================================================
# Lazy import — avoids crashing the app if xhtml2pdf isn't installed
from .utils import render_to_pdf
subject = f"Receipt from {receipt.vendor_name} - {receipt.date}"
email_context = {
'receipt': receipt,
'items': line_items,
}
# 1. Render HTML email body
html_message = render_to_string(
'core/email/receipt_email.html', email_context
)
plain_message = strip_tags(html_message)
# 2. Render PDF attachment (returns None if xhtml2pdf is not installed)
pdf_content = render_to_pdf(
'core/pdf/receipt_pdf.html', email_context
)
# 3. Send email with PDF attached
recipient = getattr(settings, 'SPARK_RECEIPT_EMAIL', None)
if recipient:
try:
email = EmailMultiAlternatives(
subject,
plain_message,
settings.DEFAULT_FROM_EMAIL,
[recipient],
)
email.attach_alternative(html_message, "text/html")
if pdf_content:
email.attach(
f"Receipt_{receipt.id}.pdf",
pdf_content,
'application/pdf'
)
email.send()
messages.success(
request,
'Receipt created and sent to SparkReceipt.'
)
except Exception as e:
messages.warning(
request,
f'Receipt saved, but email failed: {str(e)}'
)
else:
messages.success(request, 'Receipt saved successfully.')
# Redirect back to a blank form for the next receipt
return redirect('create_receipt')
else:
# GET request — show a blank form with today's date
form = ExpenseReceiptForm(initial={'date': timezone.now().date()})
items = ExpenseLineItemFormSet()
return render(request, 'core/create_receipt.html', {
'form': form,
'items': items,
})
# =============================================================================
# === IMPORT DATA (TEMPORARY) ===
# Runs the import_production_data command from the browser.
# Visit /import-data/ once to populate the database. Safe to re-run.
# REMOVE THIS VIEW once data is imported.
# =============================================================================
def import_data(request):
"""Runs the import_production_data management command from the browser."""
from django.core.management import call_command
from io import StringIO
output = StringIO()
try:
call_command('import_production_data', stdout=output)
result = output.getvalue()
lines = result.replace('\n', '<br>')
return HttpResponse(
'<html><body style="font-family: monospace; padding: 20px;">'
'<h2>Import Complete!</h2>'
'<div>' + lines + '</div>'
'<br><br>'
'<a href="/admin/">Go to Admin Panel</a> | '
'<a href="/payroll/">Go to Payroll Dashboard</a> | '
'<a href="/">Go to Dashboard</a>'
'</body></html>'
)
except Exception as e:
return HttpResponse(
'<html><body style="font-family: monospace; padding: 20px; color: red;">'
'<h2>Import Error</h2>'
'<pre>' + str(e) + '</pre>'
'</body></html>',
status=500,
)