38686-vm/core/views.py
Konrad du Plessis 66fab12b90 Add 'Pay Immediately' option for New Loan adjustments
When creating a New Loan, a "Pay Immediately" checkbox (checked by
default) processes the loan right away — creates PayrollRecord, sends
payslip to Spark, and records the loan as paid. Unchecking it keeps
the old behavior where the loan sits in Pending Payments.

Also adds loan-only payslip detection (like advance-only) across all
payslip views: email template, PDF template, and browser detail page
show a clean "Loan Payslip" layout instead of "0 days worked".

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 09:59:42 +02:00

2470 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# === VIEWS ===
# All the page logic for the LabourPay app.
# Each function here handles a URL and decides what to show the user.
import csv
import json
import datetime
import calendar as cal_module
from decimal import Decimal
from django.shortcuts import render, redirect, get_object_or_404
from django.utils import timezone
from django.db import transaction
from django.db.models import Sum, Count, Q, Prefetch
from django.db.models.functions import TruncMonth
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse, HttpResponseForbidden, HttpResponse
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.conf import settings
from .models import Worker, Project, WorkLog, Team, PayrollRecord, Loan, PayrollAdjustment
from .forms import AttendanceLogForm, PayrollAdjustmentForm, ExpenseReceiptForm, ExpenseLineItemFormSet
# NOTE: render_to_pdf is NOT imported here at the top level.
# It's imported lazily inside process_payment() and create_receipt()
# to avoid crashing the entire app if xhtml2pdf is not installed on the server.
# === PAYROLL CONSTANTS ===
# These define which adjustment types ADD to a worker's pay vs SUBTRACT from it.
# "New Loan" and "Advance Payment" are additive — the worker receives money upfront.
# "Loan Repayment" and "Advance Repayment" are deductive — they reduce net pay.
ADDITIVE_TYPES = ['Bonus', 'Overtime', 'New Loan', 'Advance Payment']
DEDUCTIVE_TYPES = ['Deduction', 'Loan Repayment', 'Advance Repayment']
# === PERMISSION HELPERS ===
# These small functions check what kind of user is logged in.
# "Admin" = the boss (is_staff or is_superuser in Django).
# "Supervisor" = someone who manages teams or projects, or is in the Work Logger group.
def is_admin(user):
"""Returns True if the user is staff or superuser (the boss)."""
return user.is_staff or user.is_superuser
def is_supervisor(user):
"""Returns True if the user manages teams, has assigned projects, or is a Work Logger."""
return (
user.supervised_teams.exists()
or user.assigned_projects.exists()
or user.groups.filter(name='Work Logger').exists()
)
def is_staff_or_supervisor(user):
"""Returns True if the user is either an admin or a supervisor."""
return is_admin(user) or is_supervisor(user)
# === PAY SCHEDULE HELPERS ===
# These help figure out a worker's pay period based on their team's schedule.
def get_worker_active_team(worker):
"""Return the worker's active team (first one found), or None."""
return worker.teams.filter(active=True).first()
def get_pay_period(team, reference_date=None):
"""
Calculate the current pay period's start and end dates for a team.
Returns (period_start, period_end) or (None, None) if the team has
no pay schedule configured.
How it works:
- pay_start_date is the "anchor" — the first day of the very first pay period.
- pay_frequency determines the length of each period (7, 14, or ~30 days).
- We step forward from the anchor in period-length increments until
we find the period that contains reference_date (today by default).
"""
if not team or not team.pay_frequency or not team.pay_start_date:
return (None, None)
if reference_date is None:
reference_date = timezone.now().date()
anchor = team.pay_start_date
# === WEEKLY / FORTNIGHTLY ===
# Simple fixed-length periods (7 or 14 days).
if team.pay_frequency in ('weekly', 'fortnightly'):
period_days = 7 if team.pay_frequency == 'weekly' else 14
# How many full periods have passed since the anchor?
days_since_anchor = (reference_date - anchor).days
if days_since_anchor < 0:
# reference_date is before the anchor — use the first period
return (anchor, anchor + datetime.timedelta(days=period_days - 1))
periods_passed = days_since_anchor // period_days
period_start = anchor + datetime.timedelta(days=periods_passed * period_days)
period_end = period_start + datetime.timedelta(days=period_days - 1)
return (period_start, period_end)
# === MONTHLY ===
# Step through calendar months from the anchor's day-of-month.
# E.g., anchor = Jan 15 means periods are: Jan 15Feb 14, Feb 15Mar 14, etc.
elif team.pay_frequency == 'monthly':
anchor_day = anchor.day
current_start = anchor
# Walk forward month by month until we find the period containing today
for _ in range(120): # Safety limit — 10 years of months
if current_start.month == 12:
next_month, next_year = 1, current_start.year + 1
else:
next_month, next_year = current_start.month + 1, current_start.year
# Clamp anchor day to the max days in that month (e.g., 31 → 28 for Feb)
max_day = cal_module.monthrange(next_year, next_month)[1]
next_start = datetime.date(next_year, next_month, min(anchor_day, max_day))
current_end = next_start - datetime.timedelta(days=1)
if reference_date <= current_end:
return (current_start, current_end)
current_start = next_start
return (None, None)
# === HOME DASHBOARD ===
# The main page users see after logging in. Shows different content
# depending on whether the user is an admin or supervisor.
@login_required
def index(request):
user = request.user
if is_admin(user):
# --- ADMIN DASHBOARD ---
# Calculate total value of unpaid work and break it down by project.
# A WorkLog is "unpaid for worker X" if no PayrollRecord links BOTH
# that log AND that worker. This handles partially-paid logs where
# some workers have been paid but others haven't.
all_worklogs = WorkLog.objects.select_related(
'project'
).prefetch_related('workers', 'payroll_records')
# === OUTSTANDING BREAKDOWN ===
# Track unpaid wages and adjustments separately so the dashboard
# can show a clear breakdown of what makes up the total.
unpaid_wages = Decimal('0.00') # Pure daily rates for unpaid workers
pending_adjustments_add = Decimal('0.00') # Unpaid additive adjustments (bonuses, overtime, etc.)
pending_adjustments_sub = Decimal('0.00') # Unpaid deductive adjustments (deductions, repayments)
outstanding_by_project = {}
for wl in all_worklogs:
# Get the set of worker IDs that have been paid for this log
paid_worker_ids = {pr.worker_id for pr in wl.payroll_records.all()}
project_name = wl.project.name
for worker in wl.workers.all():
if worker.id not in paid_worker_ids:
cost = worker.daily_rate
unpaid_wages += cost
if project_name not in outstanding_by_project:
outstanding_by_project[project_name] = Decimal('0.00')
outstanding_by_project[project_name] += cost
# Also include unpaid payroll adjustments (bonuses, deductions, etc.)
# Additive types (Bonus, Overtime, New Loan) increase outstanding.
# Deductive types (Deduction, Loan Repayment, Advance Repayment) decrease it.
unpaid_adjustments = PayrollAdjustment.objects.filter(
payroll_record__isnull=True
).select_related('project')
for adj in unpaid_adjustments:
project_name = adj.project.name if adj.project else 'No Project'
if project_name not in outstanding_by_project:
outstanding_by_project[project_name] = Decimal('0.00')
if adj.type in ADDITIVE_TYPES:
pending_adjustments_add += adj.amount
outstanding_by_project[project_name] += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
pending_adjustments_sub += adj.amount
outstanding_by_project[project_name] -= adj.amount
# Net total = wages + additions - deductions (same result as before, just tracked separately)
outstanding_payments = unpaid_wages + pending_adjustments_add - pending_adjustments_sub
# Sum total paid out in the last 60 days
sixty_days_ago = timezone.now().date() - timezone.timedelta(days=60)
paid_this_month = PayrollRecord.objects.filter(
date__gte=sixty_days_ago
).aggregate(total=Sum('amount_paid'))['total'] or Decimal('0.00')
# Count and total balance of active loans
active_loans_qs = Loan.objects.filter(active=True)
active_loans_count = active_loans_qs.count()
active_loans_balance = active_loans_qs.aggregate(
total=Sum('remaining_balance')
)['total'] or Decimal('0.00')
# This week summary
start_of_week = timezone.now().date() - timezone.timedelta(
days=timezone.now().date().weekday()
)
this_week_logs = WorkLog.objects.filter(date__gte=start_of_week).count()
# Recent activity — last 5 work logs
recent_activity = WorkLog.objects.select_related(
'project', 'supervisor'
).prefetch_related('workers').order_by('-date', '-id')[:5]
# All workers, projects, and teams for the Manage Resources tab.
# The template uses a JS filter bar (Active / Inactive / All) to show/hide
# rows based on data-active attribute — defaults to showing only active items.
workers = Worker.objects.all().order_by('name')
projects = Project.objects.all().order_by('name')
teams = Team.objects.all().order_by('name')
context = {
'is_admin': True,
'outstanding_payments': outstanding_payments,
'unpaid_wages': unpaid_wages,
'pending_adjustments_add': pending_adjustments_add,
'pending_adjustments_sub': pending_adjustments_sub,
'paid_this_month': paid_this_month,
'active_loans_count': active_loans_count,
'active_loans_balance': active_loans_balance,
'outstanding_by_project': outstanding_by_project,
'this_week_logs': this_week_logs,
'recent_activity': recent_activity,
'workers': workers,
'projects': projects,
'teams': teams,
}
return render(request, 'core/index.html', context)
else:
# --- SUPERVISOR DASHBOARD ---
# Count projects this supervisor is assigned to
my_projects_count = user.assigned_projects.filter(active=True).count()
# Count teams this supervisor manages
my_teams_count = user.supervised_teams.filter(active=True).count()
# Count unique workers across all their teams
my_workers_count = Worker.objects.filter(
active=True,
teams__supervisor=user,
teams__active=True
).distinct().count()
# This week summary — only their own logs
start_of_week = timezone.now().date() - timezone.timedelta(
days=timezone.now().date().weekday()
)
this_week_logs = WorkLog.objects.filter(
date__gte=start_of_week, supervisor=user
).count()
# Their last 5 work logs
recent_activity = WorkLog.objects.filter(
supervisor=user
).select_related('project').prefetch_related('workers').order_by('-date', '-id')[:5]
context = {
'is_admin': False,
'my_projects_count': my_projects_count,
'my_teams_count': my_teams_count,
'my_workers_count': my_workers_count,
'this_week_logs': this_week_logs,
'recent_activity': recent_activity,
}
return render(request, 'core/index.html', context)
# === ATTENDANCE LOGGING ===
# This is where supervisors log which workers showed up to work each day.
# Supports logging a single day or a date range (e.g. a whole week).
# Includes conflict detection to prevent double-logging workers.
@login_required
def attendance_log(request):
user = request.user
if request.method == 'POST':
form = AttendanceLogForm(request.POST, user=user)
if form.is_valid():
start_date = form.cleaned_data['date']
end_date = form.cleaned_data.get('end_date') or start_date
include_saturday = form.cleaned_data.get('include_saturday', False)
include_sunday = form.cleaned_data.get('include_sunday', False)
project = form.cleaned_data['project']
team = form.cleaned_data.get('team')
workers = form.cleaned_data['workers']
overtime_amount = form.cleaned_data['overtime_amount']
notes = form.cleaned_data.get('notes', '')
# --- Build list of dates to log ---
# Go through each day from start to end, skipping weekends
# unless the user checked the "Include Saturday/Sunday" boxes
dates_to_log = []
current_date = start_date
while current_date <= end_date:
day_of_week = current_date.weekday() # 0=Mon, 5=Sat, 6=Sun
if day_of_week == 5 and not include_saturday:
current_date += datetime.timedelta(days=1)
continue
if day_of_week == 6 and not include_sunday:
current_date += datetime.timedelta(days=1)
continue
dates_to_log.append(current_date)
current_date += datetime.timedelta(days=1)
if not dates_to_log:
messages.warning(request, 'No valid dates in the selected range.')
# Still need team_workers_json for the JS even on error re-render
tw_map = {}
for t in Team.objects.filter(active=True).prefetch_related('workers'):
tw_map[t.id] = list(t.workers.filter(active=True).values_list('id', flat=True))
return render(request, 'core/attendance_log.html', {
'form': form,
'is_admin': is_admin(user),
'team_workers_json': json.dumps(tw_map),
})
# --- Conflict detection ---
# Check if any selected workers already have a WorkLog on any of these dates
worker_ids = list(workers.values_list('id', flat=True))
existing_logs = WorkLog.objects.filter(
date__in=dates_to_log,
workers__id__in=worker_ids
).prefetch_related('workers').select_related('project')
conflicts = []
for log in existing_logs:
for w in log.workers.all():
if w.id in worker_ids:
conflicts.append({
'worker_name': w.name,
'date': log.date,
'project_name': log.project.name,
})
# If there are conflicts and the user hasn't chosen what to do yet
conflict_action = request.POST.get('conflict_action', '')
if conflicts and not conflict_action:
# Show the conflict warning — let user choose Skip or Overwrite
# Still need team_workers_json for the JS even on conflict re-render
tw_map = {}
for t in Team.objects.filter(active=True).prefetch_related('workers'):
tw_map[t.id] = list(t.workers.filter(active=True).values_list('id', flat=True))
# Pass the selected worker IDs explicitly for the conflict
# re-submission forms. We can't use form.data.workers in the
# template because QueryDict.__getitem__ returns only the last
# value, losing all other selections for multi-value fields.
selected_worker_ids = request.POST.getlist('workers')
return render(request, 'core/attendance_log.html', {
'form': form,
'conflicts': conflicts,
'is_admin': is_admin(user),
'team_workers_json': json.dumps(tw_map),
'selected_worker_ids': selected_worker_ids,
})
# --- Create work logs ---
created_count = 0
skipped_count = 0
for log_date in dates_to_log:
# Check which workers already have a log on this date
workers_with_existing = set(
WorkLog.objects.filter(
date=log_date,
workers__id__in=worker_ids
).values_list('workers__id', flat=True)
)
if conflict_action == 'overwrite':
# Remove conflicting workers from their existing logs
conflicting_logs = WorkLog.objects.filter(
date=log_date,
workers__id__in=worker_ids
)
for existing_log in conflicting_logs:
for w_id in worker_ids:
existing_log.workers.remove(w_id)
workers_to_add = workers
elif conflict_action == 'skip':
# Skip workers who already have logs on this date
workers_to_add = workers.exclude(id__in=workers_with_existing)
skipped_count += len(workers_with_existing & set(worker_ids))
else:
# No conflicts, or first submission — add all workers
workers_to_add = workers
if workers_to_add.exists():
# Create the WorkLog record
work_log = WorkLog.objects.create(
date=log_date,
project=project,
team=team,
supervisor=user, # Auto-set to logged-in user
overtime_amount=overtime_amount,
notes=notes,
)
work_log.workers.set(workers_to_add)
created_count += 1
# Show success message
if created_count > 0:
msg = f'Successfully created {created_count} work log(s).'
if skipped_count > 0:
msg += f' Skipped {skipped_count} conflicts.'
messages.success(request, msg)
else:
messages.warning(request, 'No work logs created — all entries were conflicts.')
return redirect('home')
else:
# Don't pre-fill the start date — force the user to pick one
# so they don't accidentally log work on the wrong day
form = AttendanceLogForm(user=user)
# Build a list of worker data for the estimated cost JavaScript
# (admins only — supervisors don't see the cost card)
worker_rates = {}
if is_admin(user):
for w in Worker.objects.filter(active=True):
worker_rates[str(w.id)] = str(w.daily_rate)
# Build team→workers mapping so the JS can auto-check workers when a
# team is selected from the dropdown. Key = team ID, Value = list of worker IDs.
team_workers_map = {}
teams_qs = Team.objects.filter(active=True).prefetch_related('workers')
if not is_admin(user):
# Supervisors only see their own teams
teams_qs = teams_qs.filter(supervisor=user)
for team in teams_qs:
active_worker_ids = list(
team.workers.filter(active=True).values_list('id', flat=True)
)
team_workers_map[team.id] = active_worker_ids
return render(request, 'core/attendance_log.html', {
'form': form,
'is_admin': is_admin(user),
'worker_rates_json': worker_rates,
'team_workers_json': json.dumps(team_workers_map),
})
# === WORK LOG HISTORY ===
# Shows work logs in two modes: a table list or a monthly calendar grid.
# Supervisors only see their own projects. Admins see everything.
# The calendar view groups logs by day and lets you click a day to see details.
@login_required
def work_history(request):
user = request.user
# Start with base queryset
if is_admin(user):
logs = WorkLog.objects.all()
else:
# Supervisors only see logs for their projects
logs = WorkLog.objects.filter(
Q(supervisor=user) | Q(project__supervisors=user)
).distinct()
# --- Filters ---
# Read filter values from the URL query string.
# Validate numeric params to prevent 500 errors from bad/malformed URLs.
worker_filter = request.GET.get('worker', '')
project_filter = request.GET.get('project', '')
status_filter = request.GET.get('status', '')
# Validate: worker and project must be numeric IDs (or empty)
try:
worker_filter = str(int(worker_filter)) if worker_filter else ''
except (ValueError, TypeError):
worker_filter = ''
try:
project_filter = str(int(project_filter)) if project_filter else ''
except (ValueError, TypeError):
project_filter = ''
# Count total logs BEFORE filtering (so we can show "X of Y" to the user)
total_log_count = logs.count()
if worker_filter:
logs = logs.filter(workers__id=worker_filter).distinct()
if project_filter:
logs = logs.filter(project__id=project_filter)
if status_filter == 'paid':
# "Paid" = has at least one PayrollRecord linked
logs = logs.filter(payroll_records__isnull=False).distinct()
elif status_filter == 'unpaid':
# "Unpaid" = has no PayrollRecord linked
logs = logs.filter(payroll_records__isnull=True)
# Track whether any filter is active (for showing feedback in the template)
has_active_filters = bool(worker_filter or project_filter or status_filter)
# Count filtered results BEFORE adding joins (more efficient SQL)
filtered_log_count = logs.count() if has_active_filters else 0
# If filtering by worker, look up the Worker object so the template can
# show just that worker's name instead of all workers on the log.
filtered_worker_obj = None
if worker_filter:
filtered_worker_obj = Worker.objects.filter(id=worker_filter).first()
# Add related data and order by date (newest first)
logs = logs.select_related(
'project', 'supervisor'
).prefetch_related('workers', 'payroll_records').order_by('-date', '-id')
# Get filter options for the dropdowns
if is_admin(user):
filter_workers = Worker.objects.filter(active=True).order_by('name')
filter_projects = Project.objects.filter(active=True).order_by('name')
else:
supervised_teams = Team.objects.filter(supervisor=user, active=True)
filter_workers = Worker.objects.filter(
active=True, teams__in=supervised_teams
).distinct().order_by('name')
filter_projects = Project.objects.filter(
active=True, supervisors=user
).order_by('name')
# --- View mode: list or calendar ---
view_mode = request.GET.get('view', 'list')
today = timezone.now().date()
# Build a query string that preserves all current filters
# (used by the List/Calendar toggle links to keep filters when switching)
filter_params = ''
if worker_filter:
filter_params += '&worker=' + worker_filter
if project_filter:
filter_params += '&project=' + project_filter
if status_filter:
filter_params += '&status=' + status_filter
context = {
'logs': logs,
'filter_workers': filter_workers,
'filter_projects': filter_projects,
'selected_worker': worker_filter,
'selected_project': project_filter,
'selected_status': status_filter,
'is_admin': is_admin(user),
'view_mode': view_mode,
'filter_params': filter_params,
'has_active_filters': has_active_filters,
'total_log_count': total_log_count,
'filtered_log_count': filtered_log_count,
'filtered_worker_obj': filtered_worker_obj,
}
# === CALENDAR MODE ===
# Build a monthly grid of days, each containing the work logs for that day.
# Also build a JSON object keyed by date string for the JavaScript
# click-to-see-details panel.
if view_mode == 'calendar':
# Get target month from URL (default: current month)
try:
target_year = int(request.GET.get('year', today.year))
target_month = int(request.GET.get('month', today.month))
if not (1 <= target_month <= 12):
target_year, target_month = today.year, today.month
except (ValueError, TypeError):
target_year, target_month = today.year, today.month
# Build the calendar grid using Python's calendar module.
# monthdatescalendar() returns a list of weeks, where each week is
# a list of 7 datetime.date objects (including overflow from prev/next month).
cal = cal_module.Calendar(firstweekday=0) # Week starts on Monday
month_dates = cal.monthdatescalendar(target_year, target_month)
# Get the full date range for the calendar grid (includes overflow days)
first_display_date = month_dates[0][0]
last_display_date = month_dates[-1][-1]
# Filter logs to only this date range (improves performance)
month_logs = logs.filter(date__range=[first_display_date, last_display_date])
# Group logs by date string for quick lookup
logs_by_date = {}
for log in month_logs:
date_key = log.date.isoformat()
if date_key not in logs_by_date:
logs_by_date[date_key] = []
logs_by_date[date_key].append(log)
# Build the calendar_weeks structure that the template iterates over.
# Each day cell has: date, day number, whether it's the current month,
# a list of log objects, and a count badge number.
calendar_weeks = []
for week in month_dates:
week_data = []
for day in week:
date_key = day.isoformat()
day_logs = logs_by_date.get(date_key, [])
week_data.append({
'date': day,
'day': day.day,
'is_current_month': day.month == target_month,
'is_today': day == today,
'records': day_logs,
'count': len(day_logs),
})
calendar_weeks.append(week_data)
# Build detail data for JavaScript — when you click a day cell,
# the JS reads this JSON to populate the detail panel below the calendar.
# NOTE: Pass raw Python dict, not json.dumps() — the template's
# |json_script filter handles serialization.
#
# IMPORTANT: When a worker filter is active, log.workers.all() would
# still return ALL workers on that WorkLog (not just the filtered one).
# We need to narrow the displayed workers to match the filter.
calendar_detail = {}
for date_key, day_logs in logs_by_date.items():
calendar_detail[date_key] = []
for log in day_logs:
# Get the workers to show — if filtering by worker,
# only show that worker (not everyone else on the log)
if worker_filter:
display_workers = [
w for w in log.workers.all()
if str(w.id) == worker_filter
]
else:
display_workers = list(log.workers.all())
entry = {
'project': log.project.name,
'workers': [w.name for w in display_workers],
'supervisor': (
log.supervisor.get_full_name() or log.supervisor.username
) if log.supervisor else '-',
'notes': log.notes or '',
'is_paid': log.payroll_records.exists(),
'overtime': log.get_overtime_amount_display() if log.overtime_amount > 0 else '',
}
# Only show cost data to admins — use filtered workers for amount
if is_admin(user):
entry['amount'] = float(
sum(w.daily_rate for w in display_workers)
)
calendar_detail[date_key].append(entry)
# Calculate previous/next month for navigation arrows
if target_month == 1:
prev_year, prev_month = target_year - 1, 12
else:
prev_year, prev_month = target_year, target_month - 1
if target_month == 12:
next_year, next_month = target_year + 1, 1
else:
next_year, next_month = target_year, target_month + 1
month_name = datetime.date(target_year, target_month, 1).strftime('%B %Y')
context.update({
'calendar_weeks': calendar_weeks,
'calendar_detail': calendar_detail,
'curr_year': target_year,
'curr_month': target_month,
'month_name': month_name,
'prev_year': prev_year,
'prev_month': prev_month,
'next_year': next_year,
'next_month': next_month,
})
return render(request, 'core/work_history.html', context)
# === CSV EXPORT ===
# Downloads the filtered work log history as a CSV file.
# Uses the same filters as the work_history page.
@login_required
def export_work_log_csv(request):
user = request.user
# Build the same queryset as work_history, using the same filters
if is_admin(user):
logs = WorkLog.objects.all()
else:
logs = WorkLog.objects.filter(
Q(supervisor=user) | Q(project__supervisors=user)
).distinct()
worker_filter = request.GET.get('worker', '')
project_filter = request.GET.get('project', '')
status_filter = request.GET.get('status', '')
if worker_filter:
logs = logs.filter(workers__id=worker_filter).distinct()
if project_filter:
logs = logs.filter(project__id=project_filter)
if status_filter == 'paid':
logs = logs.filter(payroll_records__isnull=False).distinct()
elif status_filter == 'unpaid':
logs = logs.filter(payroll_records__isnull=True)
logs = logs.select_related(
'project', 'supervisor'
).prefetch_related('workers', 'payroll_records').order_by('-date', '-id')
# Create the CSV response
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename="work_log_history.csv"'
writer = csv.writer(response)
writer.writerow(['Date', 'Project', 'Workers', 'Overtime', 'Payment Status', 'Supervisor'])
for log in logs:
worker_names = ', '.join(w.name for w in log.workers.all())
payment_status = 'Paid' if log.payroll_records.exists() else 'Unpaid'
overtime_display = log.get_overtime_amount_display() if log.overtime_amount > 0 else 'None'
supervisor_name = log.supervisor.get_full_name() or log.supervisor.username if log.supervisor else '-'
writer.writerow([
log.date.strftime('%Y-%m-%d'),
log.project.name,
worker_names,
overtime_display,
payment_status,
supervisor_name,
])
return response
# === EXPORT WORKERS CSV ===
# Downloads all worker data as a CSV file for use in spreadsheets.
# Admin-only — supervisors don't have access to salary/ID data.
@login_required
def export_workers_csv(request):
"""Export all workers to CSV — includes name, ID, phone, salary, daily rate, status."""
if not is_admin(request.user):
return HttpResponseForbidden("Admin access required.")
workers = Worker.objects.all().order_by('name')
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename="workers.csv"'
writer = csv.writer(response)
writer.writerow([
'Name', 'ID Number', 'Phone Number', 'Monthly Salary',
'Daily Rate', 'Employment Date', 'Active', 'Notes'
])
for w in workers:
writer.writerow([
w.name,
w.id_number,
w.phone_number,
f'{w.monthly_salary:.2f}',
f'{w.daily_rate:.2f}',
w.employment_date.strftime('%Y-%m-%d') if w.employment_date else '',
'Yes' if w.active else 'No',
w.notes,
])
return response
# === TOGGLE RESOURCE STATUS (AJAX) ===
# Called by the toggle switches on the dashboard to activate/deactivate
# workers, projects, or teams without reloading the page.
@login_required
def toggle_active(request, model_name, item_id):
if request.method != 'POST':
return HttpResponseForbidden("Only POST requests are allowed.")
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
# Map URL parameter to the correct model class
model_map = {
'worker': Worker,
'project': Project,
'team': Team
}
if model_name not in model_map:
return JsonResponse({'error': 'Invalid model'}, status=400)
model = model_map[model_name]
try:
item = model.objects.get(id=item_id)
item.active = not item.active
item.save()
return JsonResponse({
'status': 'success',
'active': item.active,
'message': f'{item.name} is now {"active" if item.active else "inactive"}.'
})
except model.DoesNotExist:
return JsonResponse({'error': 'Item not found'}, status=404)
# =============================================================================
# === PAYROLL DASHBOARD ===
# The main payroll page. Shows per-worker breakdown of what's owed,
# adjustment management, payment processing, and Chart.js analytics.
# Admin-only — supervisors cannot access this page.
# =============================================================================
@login_required
def payroll_dashboard(request):
if not is_admin(request.user):
messages.error(request, 'Only admins can access the payroll dashboard.')
return redirect('home')
status_filter = request.GET.get('status', 'pending')
# --- Per-worker pending payment data ---
# For each active worker, calculate: unpaid days × daily_rate + net adjustments
active_workers = Worker.objects.filter(active=True).prefetch_related(
Prefetch('work_logs', queryset=WorkLog.objects.prefetch_related(
'payroll_records', 'priced_workers'
).select_related('project')),
Prefetch('adjustments', queryset=PayrollAdjustment.objects.filter(
payroll_record__isnull=True
).select_related('project', 'loan', 'work_log'),
to_attr='pending_adjustments_list'),
).order_by('name')
workers_data = []
outstanding_total = Decimal('0.00')
# === OUTSTANDING BREAKDOWN (same as home dashboard) ===
unpaid_wages_total = Decimal('0.00') # Pure daily rates for unpaid workers
pending_adj_add_total = Decimal('0.00') # Unpaid additive adjustments
pending_adj_sub_total = Decimal('0.00') # Unpaid deductive adjustments
all_ot_data = [] # For the Price Overtime modal
for worker in active_workers:
# Find unpaid work logs for this worker.
# A log is "unpaid for this worker" if no PayrollRecord links
# to BOTH this log AND this worker.
unpaid_logs = []
for log in worker.work_logs.all():
paid_worker_ids = {pr.worker_id for pr in log.payroll_records.all()}
if worker.id not in paid_worker_ids:
unpaid_logs.append(log)
log_count = len(unpaid_logs)
log_amount = log_count * worker.daily_rate
# Find unpriced overtime in unpaid logs
ot_data_worker = []
for log in unpaid_logs:
if log.overtime_amount > 0:
priced_ids = {w.id for w in log.priced_workers.all()}
if worker.id not in priced_ids:
ot_entry = {
'worker_id': worker.id,
'worker_name': worker.name,
'log_id': log.id,
'date': log.date.strftime('%Y-%m-%d'),
'project': log.project.name,
'overtime': float(log.overtime_amount),
'ot_label': log.get_overtime_amount_display(),
}
ot_data_worker.append(ot_entry)
all_ot_data.append(ot_entry)
# Calculate net adjustment amount
pending_adjs = worker.pending_adjustments_list
adj_total = Decimal('0.00')
worker_adj_add = Decimal('0.00')
worker_adj_sub = Decimal('0.00')
for adj in pending_adjs:
if adj.type in ADDITIVE_TYPES:
adj_total += adj.amount
worker_adj_add += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
adj_total -= adj.amount
worker_adj_sub += adj.amount
total_payable = log_amount + adj_total
# Only include workers who have something pending
if log_count > 0 or pending_adjs:
# --- Overdue detection ---
# A worker is "overdue" if they have unpaid work from a completed pay period.
# Uses their team's pay schedule to determine the cutoff date.
team = get_worker_active_team(worker)
team_name = team.name if team else ''
earliest_unpaid = min((l.date for l in unpaid_logs), default=None) if unpaid_logs else None
is_overdue = False
if earliest_unpaid and team and team.pay_frequency and team.pay_start_date:
period_start, period_end = get_pay_period(team)
if period_start:
cutoff = period_start - datetime.timedelta(days=1)
is_overdue = earliest_unpaid <= cutoff
has_loan = Loan.objects.filter(worker=worker, active=True).exists()
workers_data.append({
'worker': worker,
'unpaid_count': log_count,
'unpaid_amount': log_amount,
'adj_amount': adj_total,
'total_payable': total_payable,
'adjustments': pending_adjs,
'logs': unpaid_logs,
'ot_data': ot_data_worker,
'day_rate': float(worker.daily_rate),
'team_name': team_name,
'is_overdue': is_overdue,
'has_loan': has_loan,
'earliest_unpaid': earliest_unpaid,
})
outstanding_total += max(total_payable, Decimal('0.00'))
unpaid_wages_total += log_amount
pending_adj_add_total += worker_adj_add
pending_adj_sub_total += worker_adj_sub
# --- Payment history ---
paid_records = PayrollRecord.objects.select_related(
'worker'
).order_by('-date', '-id')
# --- Recent payments total (last 60 days) ---
sixty_days_ago = timezone.now().date() - timezone.timedelta(days=60)
recent_payments_total = PayrollRecord.objects.filter(
date__gte=sixty_days_ago
).aggregate(total=Sum('amount_paid'))['total'] or Decimal('0.00')
# --- Outstanding cost per project ---
# Check per-worker: a WorkLog is "unpaid for worker X" if no PayrollRecord
# links BOTH that log AND that worker. This handles partially-paid logs.
outstanding_project_costs = []
for project in Project.objects.filter(active=True):
project_outstanding = Decimal('0.00')
# Unpaid work log costs — check each worker individually
for log in project.work_logs.prefetch_related('payroll_records', 'workers').all():
paid_worker_ids = {pr.worker_id for pr in log.payroll_records.all()}
for w in log.workers.all():
if w.id not in paid_worker_ids:
project_outstanding += w.daily_rate
# Unpaid adjustments for this project
unpaid_adjs = PayrollAdjustment.objects.filter(
payroll_record__isnull=True
).filter(Q(project=project) | Q(work_log__project=project))
for adj in unpaid_adjs:
if adj.type in ADDITIVE_TYPES:
project_outstanding += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
project_outstanding -= adj.amount
if project_outstanding != 0:
outstanding_project_costs.append({
'name': project.name,
'cost': project_outstanding,
})
# --- Chart data: last 6 months ---
today = timezone.now().date()
chart_months = []
for i in range(5, -1, -1):
m = today.month - i
y = today.year
while m <= 0:
m += 12
y -= 1
chart_months.append((y, m))
chart_labels = [
datetime.date(y, m, 1).strftime('%b %Y') for y, m in chart_months
]
# Monthly payroll totals
paid_by_month_qs = PayrollRecord.objects.annotate(
month=TruncMonth('date')
).values('month').annotate(total=Sum('amount_paid')).order_by('month')
paid_by_month = {
(r['month'].year, r['month'].month): float(r['total'])
for r in paid_by_month_qs
}
chart_totals = [paid_by_month.get((y, m), 0) for y, m in chart_months]
# Per-project monthly costs (for stacked bar chart)
project_chart_data = []
for project in Project.objects.filter(active=True):
monthly_data = []
for y, m in chart_months:
month_cost = Decimal('0.00')
month_logs = project.work_logs.filter(
date__year=y, date__month=m
).prefetch_related('workers')
for log in month_logs:
for w in log.workers.all():
month_cost += w.daily_rate
# Include paid adjustments for this project in this month
paid_adjs = PayrollAdjustment.objects.filter(
payroll_record__isnull=False,
date__year=y, date__month=m,
).filter(Q(project=project) | Q(work_log__project=project))
for adj in paid_adjs:
if adj.type in ADDITIVE_TYPES:
month_cost += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
month_cost -= adj.amount
monthly_data.append(float(month_cost))
if any(v > 0 for v in monthly_data):
project_chart_data.append({
'name': project.name,
'data': monthly_data,
})
# === CHART DATA: Per-Worker Monthly Breakdown ===
# Pre-compute payment breakdown for each active worker over the last 6 months.
# This powers the "By Worker" toggle on the Monthly Payroll Totals chart.
# Only ~14 workers x 6 months = tiny dataset, so we embed it all as JSON
# and switching between workers is instant (no server round-trips).
# Starting date for the 6-month window (first day of the oldest chart month)
six_months_ago_date = datetime.date(chart_months[0][0], chart_months[0][1], 1)
# Query 1: Total amount paid per worker per month.
# Uses database-level grouping — one query for ALL workers at once.
worker_monthly_paid_qs = PayrollRecord.objects.filter(
worker__active=True,
date__gte=six_months_ago_date,
).values(
'worker_id',
month=TruncMonth('date'),
).annotate(total=Sum('amount_paid'))
# Build a fast lookup dict: {(worker_id, year, month): total_paid}
worker_paid_lookup = {}
for row in worker_monthly_paid_qs:
key = (row['worker_id'], row['month'].year, row['month'].month)
worker_paid_lookup[key] = float(row['total'])
# Query 2: Paid adjustment totals grouped by worker, type, and month.
# "Paid" means the adjustment has a linked PayrollRecord.
# We group by the PayrollRecord's date (not the adjustment date)
# so it lines up with when the payment actually happened.
worker_monthly_adj_qs = PayrollAdjustment.objects.filter(
payroll_record__isnull=False,
worker__active=True,
payroll_record__date__gte=six_months_ago_date,
).values(
'worker_id',
'type',
month=TruncMonth('payroll_record__date'),
).annotate(total=Sum('amount'))
# Build a fast lookup dict: {(worker_id, year, month, type): total_amount}
worker_adj_lookup = {}
for row in worker_monthly_adj_qs:
key = (row['worker_id'], row['month'].year, row['month'].month, row['type'])
worker_adj_lookup[key] = float(row['total'])
# Build the final data structure for JavaScript.
# For each worker with payment history, create 6 monthly entries showing
# how their pay breaks down into base pay, overtime, bonuses, etc.
#
# Base pay is reverse-engineered from the net total:
# amount_paid = base + overtime + bonus + new_loan - deduction - loan_repayment - advance
# So: base = amount_paid - overtime - bonus - new_loan + deduction + loan_repayment + advance
worker_chart_data = {}
for worker in Worker.objects.filter(active=True).order_by('name'):
months_data = []
has_any_data = False
for y, m in chart_months:
total_paid = worker_paid_lookup.get((worker.id, y, m), 0)
overtime = worker_adj_lookup.get((worker.id, y, m, 'Overtime'), 0)
bonus = worker_adj_lookup.get((worker.id, y, m, 'Bonus'), 0)
new_loan = worker_adj_lookup.get((worker.id, y, m, 'New Loan'), 0)
deduction = worker_adj_lookup.get((worker.id, y, m, 'Deduction'), 0)
loan_repayment = worker_adj_lookup.get((worker.id, y, m, 'Loan Repayment'), 0)
advance = worker_adj_lookup.get((worker.id, y, m, 'Advance Payment'), 0)
# Reverse-engineer base pay from the net total
base_pay = total_paid - overtime - bonus - new_loan + deduction + loan_repayment + advance
# Clamp to zero — a negative base can happen if adjustments exceed day-rate earnings
base_pay = max(base_pay, 0)
if total_paid > 0:
has_any_data = True
months_data.append({
'base': round(base_pay, 2),
'overtime': round(overtime, 2),
'bonus': round(bonus, 2),
'new_loan': round(new_loan, 2),
'deduction': round(deduction, 2),
'loan_repayment': round(loan_repayment, 2),
'advance': round(advance, 2),
'total': round(total_paid, 2),
})
# Only include workers who actually received at least one payment
if has_any_data:
worker_chart_data[str(worker.id)] = {
'name': worker.name,
'months': months_data,
}
# --- Loans ---
loan_filter = request.GET.get('loan_status', 'active')
if loan_filter == 'history':
loans = Loan.objects.filter(active=False).select_related('worker').order_by('-date')
else:
loans = Loan.objects.filter(active=True).select_related('worker').order_by('-date')
# Total active loan balance (always shown in analytics card, regardless of tab)
active_loans = Loan.objects.filter(active=True)
active_loans_count = active_loans.count()
active_loans_balance = active_loans.aggregate(
total=Sum('remaining_balance')
)['total'] or Decimal('0.00')
# --- Active projects and workers for modal dropdowns ---
active_projects = Project.objects.filter(active=True).order_by('name')
all_workers = Worker.objects.filter(active=True).order_by('name')
all_teams = Team.objects.filter(active=True).prefetch_related('workers').order_by('name')
# Team-workers map for auto-selecting workers when a team is picked
team_workers_map = {}
for team in all_teams:
team_workers_map[str(team.id)] = list(
team.workers.filter(active=True).values_list('id', flat=True)
)
# NOTE: Pass raw Python objects here, NOT json.dumps() strings.
# The template uses Django's |json_script filter which handles
# JSON serialization. If we pre-serialize with json.dumps(), the
# filter double-encodes the data and JavaScript receives strings
# instead of arrays/objects, which crashes the entire script.
context = {
'workers_data': workers_data,
'paid_records': paid_records,
'outstanding_total': outstanding_total,
'unpaid_wages_total': unpaid_wages_total,
'pending_adj_add_total': pending_adj_add_total,
'pending_adj_sub_total': pending_adj_sub_total,
'recent_payments_total': recent_payments_total,
'outstanding_project_costs': outstanding_project_costs,
'active_tab': status_filter,
'all_workers': all_workers,
'all_teams': all_teams,
'team_workers_map_json': team_workers_map,
'adjustment_types': PayrollAdjustment.TYPE_CHOICES,
'active_projects': active_projects,
'loans': loans,
'loan_filter': loan_filter,
'chart_labels_json': chart_labels,
'chart_totals_json': chart_totals,
'project_chart_json': project_chart_data,
'worker_chart_json': worker_chart_data,
'overtime_data_json': all_ot_data,
'today': today, # For pre-filling date fields in modals
'active_loans_count': active_loans_count,
'active_loans_balance': active_loans_balance,
}
return render(request, 'core/payroll_dashboard.html', context)
# =============================================================================
# === SINGLE PAYMENT HELPER ===
# Core payment logic used by both individual payments and batch payments.
# Locks the worker row, creates a PayrollRecord, links logs/adjustments,
# and handles loan repayment deductions — all inside an atomic transaction.
# =============================================================================
def _process_single_payment(worker_id, selected_log_ids=None, selected_adj_ids=None):
"""
Process payment for one worker inside an atomic transaction.
Returns (payroll_record, log_count, logs_amount) on success, or None if nothing to pay.
- worker_id: the Worker's PK
- selected_log_ids: list of WorkLog IDs to include (None = all unpaid)
- selected_adj_ids: list of PayrollAdjustment IDs to include (None = all pending)
"""
with transaction.atomic():
# Lock this worker's row — any concurrent request for the same
# worker will wait here until this transaction commits.
worker = Worker.objects.select_for_update().get(id=worker_id)
# Get unpaid logs, filter to selected if IDs provided
all_unpaid_logs = worker.work_logs.exclude(payroll_records__worker=worker)
if selected_log_ids:
unpaid_logs = all_unpaid_logs.filter(id__in=selected_log_ids)
else:
unpaid_logs = all_unpaid_logs
log_count = unpaid_logs.count()
logs_amount = log_count * worker.daily_rate
# Get pending adjustments, filter to selected if IDs provided
all_pending_adjs = list(worker.adjustments.filter(payroll_record__isnull=True))
if selected_adj_ids:
selected_adj_set = set(selected_adj_ids)
pending_adjs = [a for a in all_pending_adjs if a.id in selected_adj_set]
else:
pending_adjs = all_pending_adjs
# Nothing to pay — already paid or nothing owed
if log_count == 0 and not pending_adjs:
return None
# Calculate net adjustment
adj_amount = Decimal('0.00')
for adj in pending_adjs:
if adj.type in ADDITIVE_TYPES:
adj_amount += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
adj_amount -= adj.amount
total_amount = logs_amount + adj_amount
# Create the PayrollRecord
payroll_record = PayrollRecord.objects.create(
worker=worker,
amount_paid=total_amount,
date=timezone.now().date(),
)
# Link work logs to this payment
payroll_record.work_logs.set(unpaid_logs)
# Link adjustments + handle loan repayments
for adj in pending_adjs:
adj.payroll_record = payroll_record
adj.save()
# If this is a loan or advance repayment, deduct from the balance
if adj.type in ('Loan Repayment', 'Advance Repayment') and adj.loan:
adj.loan.remaining_balance -= adj.amount
if adj.loan.remaining_balance <= 0:
adj.loan.remaining_balance = Decimal('0.00')
adj.loan.active = False
# === ADVANCE-TO-LOAN CONVERSION ===
# If an advance was only partially repaid, the remainder is
# now a regular loan. Change the type so it shows under
# "Loans" in the Loans tab and uses "Loan Repayment" going forward.
elif adj.type == 'Advance Repayment' and adj.loan.loan_type == 'advance':
adj.loan.loan_type = 'loan'
adj.loan.save()
return (payroll_record, log_count, logs_amount)
# =============================================================================
# === PROCESS PAYMENT ===
# HTTP endpoint for paying a single worker. Reads selected IDs from the POST
# form (split payslip), delegates to _process_single_payment, then emails.
# =============================================================================
@login_required
def process_payment(request, worker_id):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
# Validate the worker exists (returns 404 if not found)
worker = get_object_or_404(Worker, id=worker_id)
# === SPLIT PAYSLIP SUPPORT ===
# If the POST includes specific log/adjustment IDs (from the preview
# modal's checkboxes), only pay those selected items.
# If no IDs provided (e.g., the quick "Pay" button on the table),
# fall back to paying everything — backward compatible.
selected_log_ids = [int(x) for x in request.POST.getlist('selected_log_ids') if x.isdigit()]
selected_adj_ids = [int(x) for x in request.POST.getlist('selected_adj_ids') if x.isdigit()]
result = _process_single_payment(
worker_id,
selected_log_ids=selected_log_ids or None,
selected_adj_ids=selected_adj_ids or None,
)
if result is None:
messages.warning(request, f'No pending payments for {worker.name} — already paid or nothing owed.')
return redirect('payroll_dashboard')
payroll_record, log_count, logs_amount = result
# =========================================================================
# EMAIL PAYSLIP (outside the transaction — if email fails, payment is
# still saved. We don't want a network error to roll back a real payment.)
# =========================================================================
_send_payslip_email(request, worker, payroll_record, log_count, logs_amount)
return redirect('payroll_dashboard')
# =============================================================================
# === PAYSLIP EMAIL HELPER ===
# Generates and sends a payslip (HTML email + PDF attachment).
# Used by both process_payment (regular salary) and add_adjustment (advances).
# =============================================================================
def _send_payslip_email(request, worker, payroll_record, log_count, logs_amount, suppress_messages=False):
"""
Generate and email a payslip for a completed payment.
Called after a PayrollRecord has been created and adjustments linked.
- request: Django request (for messages framework)
- worker: the Worker being paid
- payroll_record: the PayrollRecord just created
- log_count: number of work logs in this payment (0 for advance-only)
- logs_amount: total earnings from work logs (Decimal('0.00') for advance-only)
- suppress_messages: if True, skip Django messages (used by batch pay)
"""
# Lazy import — avoids crashing the app if xhtml2pdf isn't installed
from .utils import render_to_pdf
total_amount = payroll_record.amount_paid
# === DETECT STANDALONE PAYMENT (no work logs, single adjustment) ===
# Advance-only or Loan-only payments use a cleaner payslip layout
# showing just the amount instead of "0 days worked + adjustment".
advance_adj = None
loan_adj = None
if log_count == 0:
adjs_list = list(payroll_record.adjustments.all())
if len(adjs_list) == 1:
if adjs_list[0].type == 'Advance Payment':
advance_adj = adjs_list[0]
elif adjs_list[0].type == 'New Loan':
loan_adj = adjs_list[0]
is_advance = advance_adj is not None
is_loan = loan_adj is not None
if is_advance:
subject = f"Advance Payslip for {worker.name} - {payroll_record.date}"
elif is_loan:
subject = f"Loan Payslip for {worker.name} - {payroll_record.date}"
else:
subject = f"Payslip for {worker.name} - {payroll_record.date}"
# Context for both the HTML email body and the PDF attachment
email_context = {
'record': payroll_record,
'logs_count': log_count,
'logs_amount': logs_amount,
'adjustments': payroll_record.adjustments.all(),
'deductive_types': DEDUCTIVE_TYPES,
'is_advance': is_advance,
'advance_amount': advance_adj.amount if advance_adj else None,
'advance_description': advance_adj.description if advance_adj else '',
'is_loan': is_loan,
'loan_amount': loan_adj.amount if loan_adj else None,
'loan_description': loan_adj.description if loan_adj else '',
}
# 1. Render HTML email body
html_message = render_to_string('core/email/payslip_email.html', email_context)
plain_message = strip_tags(html_message)
# 2. Render PDF attachment (returns None if xhtml2pdf is not installed)
pdf_content = render_to_pdf('core/pdf/payslip_pdf.html', email_context)
# 3. Send email with PDF attached
recipient = getattr(settings, 'SPARK_RECEIPT_EMAIL', None)
if recipient:
try:
email = EmailMultiAlternatives(
subject,
plain_message,
settings.DEFAULT_FROM_EMAIL,
[recipient],
)
email.attach_alternative(html_message, "text/html")
if pdf_content:
email.attach(
f"Payslip_{worker.id}_{payroll_record.date}.pdf",
pdf_content,
'application/pdf'
)
email.send()
if not suppress_messages:
messages.success(
request,
f'Payment of R {total_amount:,.2f} processed for {worker.name}. '
f'Payslip emailed successfully.'
)
except Exception as e:
# Payment is saved — just warn that email failed
if not suppress_messages:
messages.warning(
request,
f'Payment of R {total_amount:,.2f} processed for {worker.name}, '
f'but email delivery failed: {str(e)}'
)
raise # Re-raise so batch_pay can count failures
else:
# No SPARK_RECEIPT_EMAIL configured — just show success
if not suppress_messages:
messages.success(
request,
f'Payment of R {total_amount:,.2f} processed for {worker.name}. '
f'{log_count} work log(s) marked as paid.'
)
# =============================================================================
# === BATCH PAY PREVIEW ===
# AJAX GET endpoint — dry run showing which workers would be paid and how
# much, based on their team's pay schedule. No payments are made here.
# =============================================================================
@login_required
def batch_pay_preview(request):
"""Return JSON preview of batch payment — who gets paid and how much.
Accepts ?mode=all to skip pay-period cutoff and include ALL unpaid items.
Default mode is 'schedule' which splits at last completed pay period."""
if not is_admin(request.user):
return JsonResponse({'error': 'Not authorized'}, status=403)
# === MODE: 'schedule' (default) = split at last paydate, 'all' = pay everything ===
mode = request.GET.get('mode', 'schedule')
eligible = []
skipped = []
total_amount = Decimal('0.00')
# Get all active workers with their work logs and pending adjustments
active_workers = Worker.objects.filter(active=True).prefetch_related(
Prefetch(
'work_logs',
queryset=WorkLog.objects.prefetch_related('payroll_records').select_related('project')
),
Prefetch(
'adjustments',
queryset=PayrollAdjustment.objects.filter(payroll_record__isnull=True)
),
).order_by('name')
for worker in active_workers:
team = get_worker_active_team(worker)
# --- In 'schedule' mode, skip workers without a pay schedule ---
if mode == 'schedule':
if not team or not team.pay_frequency or not team.pay_start_date:
# Check if worker has ANY unpaid items before listing as skipped
has_unpaid = False
for log in worker.work_logs.all():
paid_ids = {pr.worker_id for pr in log.payroll_records.all()}
if worker.id not in paid_ids:
has_unpaid = True
break
if not has_unpaid:
has_unpaid = worker.adjustments.filter(payroll_record__isnull=True).exists()
if has_unpaid:
skipped.append({
'worker_name': worker.name,
'reason': 'No pay schedule configured',
})
continue
# --- Determine cutoff date (if applicable) ---
cutoff_date = None
if mode == 'schedule':
# cutoff_date = end of the last COMPLETED period.
# We pay ALL overdue work (across all past periods), not just one period.
period_start, period_end = get_pay_period(team)
if not period_start:
continue
cutoff_date = period_start - datetime.timedelta(days=1)
# --- Find unpaid logs (with or without cutoff) ---
unpaid_log_ids = []
for log in worker.work_logs.all():
paid_ids = {pr.worker_id for pr in log.payroll_records.all()}
if worker.id not in paid_ids:
# In 'all' mode: no date filter. In 'schedule' mode: only up to cutoff.
if cutoff_date is None or log.date <= cutoff_date:
unpaid_log_ids.append(log.id)
# --- Find pending adjustments (with or without cutoff) ---
unpaid_adj_ids = []
adj_amount = Decimal('0.00')
for adj in worker.adjustments.all():
if cutoff_date is None or (adj.date and adj.date <= cutoff_date):
unpaid_adj_ids.append(adj.id)
if adj.type in ADDITIVE_TYPES:
adj_amount += adj.amount
elif adj.type in DEDUCTIVE_TYPES:
adj_amount -= adj.amount
# Nothing due for this worker
if not unpaid_log_ids and not unpaid_adj_ids:
continue
log_count = len(unpaid_log_ids)
logs_amount = log_count * worker.daily_rate
net = logs_amount + adj_amount
# Skip workers with zero or negative net pay
if net <= 0:
skipped.append({
'worker_name': worker.name,
'reason': f'Net pay is R {net:,.2f} (zero or negative)',
})
continue
# --- Period display text ---
if cutoff_date:
# Use day integer to avoid platform-specific strftime issues
period_display = f"Up to {cutoff_date.day} {cutoff_date.strftime('%b %Y')}"
else:
period_display = "All unpaid"
# Check if worker has any active loans or advances
has_loan = Loan.objects.filter(worker=worker, active=True).exists()
eligible.append({
'worker_id': worker.id,
'worker_name': worker.name,
'team_name': team.name if team else '',
'period': period_display,
'days': log_count,
'logs_amount': float(logs_amount),
'adj_amount': float(adj_amount),
'net_pay': float(net),
'log_ids': unpaid_log_ids,
'adj_ids': unpaid_adj_ids,
'has_loan': has_loan,
})
total_amount += net
return JsonResponse({
'eligible': eligible,
'skipped': skipped,
'total_amount': float(total_amount),
'worker_count': len(eligible),
'mode': mode,
})
# =============================================================================
# === BATCH PAY (PROCESS) ===
# POST endpoint — processes payments for multiple workers at once.
# Each worker gets their own atomic transaction and payslip email.
# =============================================================================
@login_required
def batch_pay(request):
"""Process batch payments for multiple workers using their team pay schedules."""
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
try:
body = json.loads(request.body)
except (json.JSONDecodeError, ValueError):
messages.error(request, 'Invalid request data.')
return redirect('payroll_dashboard')
workers_to_pay = body.get('workers', [])
if not workers_to_pay:
messages.warning(request, 'No workers selected for batch payment.')
return redirect('payroll_dashboard')
# === PROCESS EACH WORKER ===
# Each worker gets their own atomic transaction (independent row locks).
# This means if one worker fails, others still succeed.
paid_count = 0
paid_total = Decimal('0.00')
errors = []
email_queue = [] # Collect payslip data for emails (sent after all payments)
for entry in workers_to_pay:
worker_id = entry.get('worker_id')
log_ids = entry.get('log_ids', [])
adj_ids = entry.get('adj_ids', [])
try:
worker = Worker.objects.get(id=worker_id, active=True)
except Worker.DoesNotExist:
errors.append(f'Worker ID {worker_id} not found or inactive.')
continue
result = _process_single_payment(
worker_id,
selected_log_ids=log_ids or None,
selected_adj_ids=adj_ids or None,
)
if result is None:
continue # Nothing to pay — silently skip
payroll_record, log_count, logs_amount = result
paid_count += 1
paid_total += payroll_record.amount_paid
email_queue.append((worker, payroll_record, log_count, logs_amount))
# === SEND PAYSLIP EMAILS (outside all transactions) ===
# If an email fails, the payment is still saved — same pattern as individual pay.
email_failures = 0
for worker, pr, lc, la in email_queue:
try:
_send_payslip_email(request, worker, pr, lc, la, suppress_messages=True)
except Exception:
email_failures += 1
# === SUMMARY MESSAGE ===
if paid_count > 0:
msg = f'Batch payment complete: {paid_count} worker(s) paid, total R {paid_total:,.2f}.'
if email_failures:
msg += f' ({email_failures} email(s) failed to send.)'
messages.success(request, msg)
for err in errors:
messages.warning(request, err)
if paid_count == 0 and not errors:
messages.info(request, 'No payments were processed — all workers already paid or had zero/negative net pay.')
return redirect('payroll_dashboard')
# =============================================================================
# === PRICE OVERTIME ===
# Creates Overtime adjustments for workers who have unpriced overtime on
# their work logs. Called via AJAX from the Price Overtime modal.
# =============================================================================
@login_required
def price_overtime(request):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
log_ids = request.POST.getlist('log_id[]')
worker_ids = request.POST.getlist('worker_id[]')
rate_pcts = request.POST.getlist('rate_pct[]')
created_count = 0
for log_id, w_id, pct in zip(log_ids, worker_ids, rate_pcts):
try:
worklog = WorkLog.objects.select_related('project').get(id=int(log_id))
worker = Worker.objects.get(id=int(w_id))
rate_pct = Decimal(pct)
# Calculate: daily_rate × overtime_fraction × (rate_percentage / 100)
amount = worker.daily_rate * worklog.overtime_amount * (rate_pct / Decimal('100'))
if amount > 0:
PayrollAdjustment.objects.create(
worker=worker,
type='Overtime',
amount=amount,
date=worklog.date,
description=f'Overtime ({worklog.get_overtime_amount_display()}) at {pct}% on {worklog.project.name}',
work_log=worklog,
project=worklog.project,
)
# Mark this worker as "priced" for this log's overtime
worklog.priced_workers.add(worker)
created_count += 1
except (WorkLog.DoesNotExist, Worker.DoesNotExist, Exception):
continue
messages.success(request, f'Priced {created_count} overtime adjustment(s).')
return redirect('payroll_dashboard')
# =============================================================================
# === ADD ADJUSTMENT ===
# Creates a new payroll adjustment (bonus, deduction, loan, etc.).
# Called via POST from the Add Adjustment modal.
# =============================================================================
@login_required
def add_adjustment(request):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
worker_ids = request.POST.getlist('workers')
adj_type = request.POST.get('type', '')
amount_str = request.POST.get('amount', '0')
description = request.POST.get('description', '')
date_str = request.POST.get('date', '')
project_id = request.POST.get('project', '')
# Validate workers — at least one must be selected.
# The frontend also checks this, but this is a safety net in case
# the user has JavaScript disabled or submits via other means.
if not worker_ids:
messages.error(request, 'Please select at least one worker.')
return redirect('payroll_dashboard')
# Validate amount
try:
amount = Decimal(amount_str)
if amount <= 0:
raise ValueError
except (ValueError, Exception):
messages.error(request, 'Please enter a valid amount greater than zero.')
return redirect('payroll_dashboard')
# Validate date
try:
adj_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date() if date_str else timezone.now().date()
except ValueError:
adj_date = timezone.now().date()
# Validate project for types that require it
project = None
if project_id:
try:
project = Project.objects.get(id=int(project_id))
except Project.DoesNotExist:
pass
project_required_types = ('Overtime', 'Bonus', 'Deduction', 'Advance Payment')
if adj_type in project_required_types and not project:
messages.error(request, 'A project must be selected for this adjustment type.')
return redirect('payroll_dashboard')
created_count = 0
for w_id in worker_ids:
try:
worker = Worker.objects.get(id=int(w_id))
except Worker.DoesNotExist:
continue
loan = None
# === LOAN REPAYMENT — find the worker's active loan ===
if adj_type == 'Loan Repayment':
loan = worker.loans.filter(active=True, loan_type='loan').first()
if not loan:
messages.warning(request, f'{worker.name} has no active loan — skipped.')
continue
# === ADVANCE REPAYMENT — find the worker's active advance ===
if adj_type == 'Advance Repayment':
loan = worker.loans.filter(active=True, loan_type='advance').first()
if not loan:
messages.warning(request, f'{worker.name} has no active advance — skipped.')
continue
# === NEW LOAN — create a Loan record (loan_type='loan') ===
# If "Pay Immediately" is checked (default), the loan is processed
# right away — PayrollRecord is created, payslip emailed to Spark,
# and the adjustment is marked as paid. If unchecked, the loan sits
# in Pending Payments and is included in the next pay cycle.
if adj_type == 'New Loan':
loan = Loan.objects.create(
worker=worker,
loan_type='loan',
principal_amount=amount,
remaining_balance=amount,
date=adj_date,
reason=description,
)
pay_immediately = request.POST.get('pay_immediately') == '1'
if pay_immediately:
# Create the adjustment and immediately mark it as paid
loan_adj = PayrollAdjustment.objects.create(
worker=worker,
type='New Loan',
amount=amount,
date=adj_date,
description=description,
loan=loan,
)
payroll_record = PayrollRecord.objects.create(
worker=worker,
amount_paid=amount,
date=adj_date,
)
loan_adj.payroll_record = payroll_record
loan_adj.save()
# Send payslip email to Spark
_send_payslip_email(request, worker, payroll_record, 0, Decimal('0.00'))
created_count += 1
continue # Skip the generic PayrollAdjustment creation below
# === ADVANCE PAYMENT — immediate payment + auto-repayment ===
# An advance is a salary prepayment — worker gets money now, and
# the full amount is automatically deducted from their next salary.
# Unlike other adjustments, advances are processed IMMEDIATELY
# (they don't sit in Pending Payments waiting for a "Pay" click).
if adj_type == 'Advance Payment':
# VALIDATION: Worker must have unpaid work to justify an advance.
# If they have no logged work, this is a loan, not an advance.
has_unpaid_logs = False
for log in worker.work_logs.all():
paid_worker_ids = set(
log.payroll_records.values_list('worker_id', flat=True)
)
if worker.id not in paid_worker_ids:
has_unpaid_logs = True
break
if not has_unpaid_logs:
messages.warning(
request,
f'{worker.name} has no unpaid work days — cannot create '
f'an advance. Use "New Loan" instead.'
)
continue
# 1. Create the Loan record (tracks the advance balance)
loan = Loan.objects.create(
worker=worker,
loan_type='advance',
principal_amount=amount,
remaining_balance=amount,
date=adj_date,
reason=description or 'Salary advance',
)
# 2. Create the Advance Payment adjustment
advance_adj = PayrollAdjustment.objects.create(
worker=worker,
type='Advance Payment',
amount=amount,
date=adj_date,
description=description,
project=project,
loan=loan,
)
# 3. AUTO-PROCESS: Create PayrollRecord immediately
# (advance is paid now, not at the next payday)
payroll_record = PayrollRecord.objects.create(
worker=worker,
amount_paid=amount,
date=adj_date,
)
advance_adj.payroll_record = payroll_record
advance_adj.save()
# 4. AUTO-CREATE REPAYMENT for the next salary cycle
# This ensures the advance is automatically deducted from
# the worker's next salary without the admin having to remember.
PayrollAdjustment.objects.create(
worker=worker,
type='Advance Repayment',
amount=amount,
date=adj_date,
description=f'Auto-deduction for advance of R {amount:.2f}',
loan=loan,
project=project,
)
# 5. Send payslip email to SparkReceipt
_send_payslip_email(request, worker, payroll_record, 0, Decimal('0.00'))
created_count += 1
continue # Skip the generic PayrollAdjustment creation below
# === ALL OTHER TYPES — create a pending adjustment ===
PayrollAdjustment.objects.create(
worker=worker,
type=adj_type,
amount=amount,
date=adj_date,
description=description,
project=project,
loan=loan,
)
created_count += 1
messages.success(request, f'Created {created_count} {adj_type} adjustment(s).')
return redirect('payroll_dashboard')
# =============================================================================
# === EDIT ADJUSTMENT ===
# Updates an existing unpaid adjustment. Type changes are limited to
# Bonus ↔ Deduction swaps only.
# =============================================================================
@login_required
def edit_adjustment(request, adj_id):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
adj = get_object_or_404(PayrollAdjustment, id=adj_id)
# Can't edit adjustments that have already been paid
if adj.payroll_record is not None:
messages.error(request, 'Cannot edit a paid adjustment.')
return redirect('payroll_dashboard')
# Can't edit Loan Repayment adjustments (managed by the loan system).
# Advance Repayments CAN be edited — the admin may want to reduce the
# auto-deduction amount (e.g., deduct R50 of a R100 advance this payday).
if adj.type == 'Loan Repayment':
messages.warning(request, 'Loan repayment adjustments cannot be edited directly.')
return redirect('payroll_dashboard')
# Update fields
try:
adj.amount = Decimal(request.POST.get('amount', str(adj.amount)))
except (ValueError, Exception):
pass
adj.description = request.POST.get('description', adj.description)
date_str = request.POST.get('date', '')
if date_str:
try:
adj.date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
pass
# Type change — only allow Bonus ↔ Deduction
new_type = request.POST.get('type', adj.type)
if adj.type in ('Bonus', 'Deduction') and new_type in ('Bonus', 'Deduction'):
adj.type = new_type
# Project
project_id = request.POST.get('project', '')
if project_id:
try:
adj.project = Project.objects.get(id=int(project_id))
except Project.DoesNotExist:
pass
else:
adj.project = None
# === ADVANCE REPAYMENT EDIT — cap amount at loan balance ===
# If the admin edits an auto-created advance repayment, make sure
# the amount doesn't exceed the loan's remaining balance.
if adj.type == 'Advance Repayment' and adj.loan:
if adj.amount > adj.loan.remaining_balance:
adj.amount = adj.loan.remaining_balance
messages.info(
request,
f'Amount capped at loan balance of R {adj.loan.remaining_balance:.2f}.'
)
adj.save()
# If it's a Loan or Advance adjustment, sync the loan details
if adj.type in ('New Loan', 'Advance Payment') and adj.loan:
adj.loan.principal_amount = adj.amount
adj.loan.remaining_balance = adj.amount
adj.loan.reason = adj.description
adj.loan.save()
messages.success(request, f'{adj.type} adjustment updated.')
return redirect('payroll_dashboard')
# =============================================================================
# === DELETE ADJUSTMENT ===
# Removes an unpaid adjustment. Handles cascade logic for Loans and Overtime.
# =============================================================================
@login_required
def delete_adjustment(request, adj_id):
if request.method != 'POST':
return redirect('payroll_dashboard')
if not is_admin(request.user):
return HttpResponseForbidden("Not authorized.")
adj = get_object_or_404(PayrollAdjustment, id=adj_id)
# Can't delete adjustments that have been paid
if adj.payroll_record is not None:
messages.error(request, 'Cannot delete a paid adjustment.')
return redirect('payroll_dashboard')
adj_type = adj.type
worker_name = adj.worker.name
# === CASCADE DELETE for New Loan and Advance Payment ===
# Both create Loan records that need cleanup when deleted.
if adj_type in ('New Loan', 'Advance Payment') and adj.loan:
# Determine which repayment type to look for
repayment_type = 'Advance Repayment' if adj_type == 'Advance Payment' else 'Loan Repayment'
# Check if any paid repayments exist for this loan/advance
paid_repayments = PayrollAdjustment.objects.filter(
loan=adj.loan,
type=repayment_type,
payroll_record__isnull=False,
)
if paid_repayments.exists():
label = 'advance' if adj_type == 'Advance Payment' else 'loan'
messages.error(
request,
f'Cannot delete {label} for {worker_name} — it has paid repayments.'
)
return redirect('payroll_dashboard')
# Delete all unpaid repayments for this loan/advance, then the loan itself
PayrollAdjustment.objects.filter(
loan=adj.loan,
type=repayment_type,
payroll_record__isnull=True,
).delete()
adj.loan.delete()
elif adj_type == 'Overtime' and adj.work_log:
# "Un-price" the overtime — remove worker from priced_workers M2M
adj.work_log.priced_workers.remove(adj.worker)
adj.delete()
messages.success(request, f'{adj_type} adjustment for {worker_name} deleted.')
return redirect('payroll_dashboard')
# =============================================================================
# === PREVIEW PAYSLIP (AJAX) ===
# Returns a JSON preview of what a worker's payslip would look like.
# Called from the Preview Payslip modal without saving anything.
# =============================================================================
@login_required
def preview_payslip(request, worker_id):
if not is_admin(request.user):
return JsonResponse({'error': 'Not authorized'}, status=403)
worker = get_object_or_404(Worker, id=worker_id)
# Find unpaid logs — include the log ID so the frontend can send
# selected IDs back for split payslip (selective payment).
unpaid_logs = []
for log in worker.work_logs.select_related('project').prefetch_related('payroll_records').all():
paid_worker_ids = {pr.worker_id for pr in log.payroll_records.all()}
if worker.id not in paid_worker_ids:
unpaid_logs.append({
'id': log.id,
'date': log.date.strftime('%Y-%m-%d'),
'project': log.project.name,
})
# Sort logs by date so the split makes visual sense (oldest first)
unpaid_logs.sort(key=lambda x: x['date'])
log_count = len(unpaid_logs)
log_amount = float(log_count * worker.daily_rate)
# Find pending adjustments — include ID and date for split payslip
pending_adjs = worker.adjustments.filter(
payroll_record__isnull=True
).select_related('project')
adjustments_list = []
adj_total = 0.0
for adj in pending_adjs:
sign = '+' if adj.type in ADDITIVE_TYPES else '-'
adj_total += float(adj.amount) if adj.type in ADDITIVE_TYPES else -float(adj.amount)
adjustments_list.append({
'id': adj.id,
'type': adj.type,
'amount': float(adj.amount),
'sign': sign,
'description': adj.description,
'project': adj.project.name if adj.project else '',
'date': adj.date.strftime('%Y-%m-%d'),
})
# === ACTIVE LOANS & ADVANCES ===
# Include the worker's outstanding balances so the admin can see the
# full picture and add repayments directly from the preview modal.
active_loans = worker.loans.filter(active=True).order_by('-date')
loans_list = []
for loan in active_loans:
loans_list.append({
'id': loan.id,
'type': loan.loan_type, # 'loan' or 'advance'
'type_label': loan.get_loan_type_display(), # 'Loan' or 'Advance'
'principal': float(loan.principal_amount),
'balance': float(loan.remaining_balance),
'date': loan.date.strftime('%Y-%m-%d'),
'reason': loan.reason or '',
})
# === PAY PERIOD INFO ===
# If the worker belongs to a team with a pay schedule, include the
# current period boundaries so the "Split at Pay Date" button can work.
team = get_worker_active_team(worker)
period_start, period_end = get_pay_period(team)
# cutoff_date = last day of the most recently COMPLETED pay period.
# All unpaid logs on or before this date are "due" for payment.
# E.g., fortnightly periods ending Mar 14, Mar 28, Apr 11...
# If today is Mar 20, cutoff_date = Mar 14 (pay everything through last completed period).
cutoff_date = (period_start - datetime.timedelta(days=1)) if period_start else None
pay_period = {
'has_schedule': period_start is not None,
'start': period_start.strftime('%Y-%m-%d') if period_start else None,
'end': period_end.strftime('%Y-%m-%d') if period_end else None,
'cutoff_date': cutoff_date.strftime('%Y-%m-%d') if cutoff_date else None,
'frequency': team.pay_frequency if team else None,
'team_name': team.name if team else None,
}
return JsonResponse({
'worker_id': worker.id,
'worker_name': worker.name,
'worker_id_number': worker.id_number,
'day_rate': float(worker.daily_rate),
'days_worked': log_count,
'log_amount': log_amount,
'adjustments': adjustments_list,
'adj_total': adj_total,
'net_pay': log_amount + adj_total,
'logs': unpaid_logs,
'active_loans': loans_list,
'pay_period': pay_period,
})
# =============================================================================
# === ADD REPAYMENT (AJAX) ===
# Creates a Loan Repayment or Advance Repayment adjustment for a single worker.
# Called via AJAX POST from the Payslip Preview modal's inline repayment form.
# Returns JSON so the modal can refresh in-place without a page reload.
# =============================================================================
@login_required
def add_repayment_ajax(request, worker_id):
"""AJAX endpoint: add a repayment adjustment and return JSON response."""
if request.method != 'POST':
return JsonResponse({'error': 'POST required'}, status=405)
if not is_admin(request.user):
return JsonResponse({'error': 'Not authorized'}, status=403)
worker = get_object_or_404(Worker, id=worker_id)
# Parse the POST body (sent as JSON from fetch())
try:
body = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
loan_id = body.get('loan_id')
amount_str = body.get('amount', '0')
description = body.get('description', '')
# Validate: loan exists, belongs to this worker, and is active
try:
loan = Loan.objects.get(id=int(loan_id), worker=worker, active=True)
except (Loan.DoesNotExist, ValueError, TypeError):
return JsonResponse({'error': 'No active loan/advance found.'}, status=400)
# Validate: amount is positive
try:
amount = Decimal(str(amount_str))
if amount <= 0:
raise ValueError
except (ValueError, Exception):
return JsonResponse({'error': 'Please enter a valid amount greater than zero.'}, status=400)
# Cap the repayment at the remaining balance (prevent over-repaying)
if amount > loan.remaining_balance:
amount = loan.remaining_balance
# Pick the right repayment type based on loan type
repayment_type = 'Advance Repayment' if loan.loan_type == 'advance' else 'Loan Repayment'
# Create the adjustment (balance deduction happens later during process_payment)
PayrollAdjustment.objects.create(
worker=worker,
type=repayment_type,
amount=amount,
date=timezone.now().date(),
description=description or f'{loan.get_loan_type_display()} repayment',
loan=loan,
)
return JsonResponse({
'success': True,
'message': f'{repayment_type} of R {amount:.2f} added for {worker.name}.',
})
# =============================================================================
# === PAYSLIP DETAIL ===
# Shows a completed payment (PayrollRecord) as a printable payslip page.
# Displays: worker details, work log table, adjustments table, totals.
# Reached from the "Payment History" tab on the payroll dashboard.
# =============================================================================
@login_required
def payslip_detail(request, pk):
"""Show a completed payslip with work logs, adjustments, and totals."""
if not is_admin(request.user):
return redirect('payroll_dashboard')
record = get_object_or_404(PayrollRecord, pk=pk)
# Get the work logs included in this payment
logs = record.work_logs.select_related('project').order_by('date')
# Get the adjustments linked to this payment
adjustments = record.adjustments.all().order_by('type')
# Calculate base pay from logs
# Each log = 1 day of work at the worker's daily rate
base_pay = record.worker.daily_rate * logs.count()
# Calculate net adjustment amount (additive minus deductive)
adjustments_net = record.amount_paid - base_pay
# === DETECT STANDALONE PAYMENT (no work logs, single adjustment) ===
# Advance-only or Loan-only payments use a cleaner layout.
adjs_list = list(adjustments)
advance_adj = None
loan_adj = None
if logs.count() == 0 and len(adjs_list) == 1:
if adjs_list[0].type == 'Advance Payment':
advance_adj = adjs_list[0]
elif adjs_list[0].type == 'New Loan':
loan_adj = adjs_list[0]
context = {
'record': record,
'logs': logs,
'adjustments': adjustments,
'base_pay': base_pay,
'adjustments_net': adjustments_net,
'adjustments_net_abs': abs(adjustments_net),
'deductive_types': DEDUCTIVE_TYPES,
'is_advance': advance_adj is not None,
'advance_adj': advance_adj,
'is_loan': loan_adj is not None,
'loan_adj': loan_adj,
}
return render(request, 'core/payslip.html', context)
# =============================================================================
# === CREATE EXPENSE RECEIPT ===
# Single-page form for recording business expenses.
# Supports dynamic line items (products + amounts) and VAT calculation.
# On save: emails an HTML + PDF receipt to Spark Receipt for accounting.
# =============================================================================
@login_required
def create_receipt(request):
"""Create a new expense receipt and email it to Spark Receipt."""
if not is_staff_or_supervisor(request.user):
return redirect('home')
if request.method == 'POST':
form = ExpenseReceiptForm(request.POST)
items = ExpenseLineItemFormSet(request.POST)
if form.is_valid() and items.is_valid():
# Save the receipt header (but don't commit yet — need to set user)
receipt = form.save(commit=False)
receipt.user = request.user
# Set temporary zero values so the first save doesn't fail.
# (subtotal and total_amount have no default in the model,
# so they'd be NULL — which MariaDB rejects.)
# We'll recalculate these properly after saving line items.
receipt.subtotal = Decimal('0.00')
receipt.vat_amount = Decimal('0.00')
receipt.total_amount = Decimal('0.00')
receipt.save()
# Save line items — link them to this receipt
items.instance = receipt
line_items = items.save()
# === BACKEND VAT CALCULATION ===
# The frontend shows live totals, but we recalculate on the server
# using Python Decimal for accuracy (no floating-point rounding errors).
sum_amount = sum(item.amount for item in line_items)
vat_type = receipt.vat_type
if vat_type == 'Included':
# "VAT Included" means the entered amounts already include 15% VAT.
# To find the pre-VAT subtotal: divide by 1.15
# Example: R100 entered → Subtotal R86.96, VAT R13.04, Total R100
receipt.total_amount = sum_amount
receipt.subtotal = (sum_amount / Decimal('1.15')).quantize(Decimal('0.01'))
receipt.vat_amount = receipt.total_amount - receipt.subtotal
elif vat_type == 'Excluded':
# "VAT Excluded" means the entered amounts are pre-VAT.
# Add 15% on top for the total.
# Example: R100 entered → Subtotal R100, VAT R15, Total R115
receipt.subtotal = sum_amount
receipt.vat_amount = (sum_amount * Decimal('0.15')).quantize(Decimal('0.01'))
receipt.total_amount = receipt.subtotal + receipt.vat_amount
else:
# "None" — no VAT applies
receipt.subtotal = sum_amount
receipt.vat_amount = Decimal('0.00')
receipt.total_amount = sum_amount
receipt.save()
# =================================================================
# EMAIL RECEIPT (same pattern as payslip email)
# If email fails, the receipt is still saved.
# =================================================================
# Lazy import — avoids crashing the app if xhtml2pdf isn't installed
from .utils import render_to_pdf
subject = f"Receipt from {receipt.vendor_name} - {receipt.date}"
email_context = {
'receipt': receipt,
'items': line_items,
}
# 1. Render HTML email body
html_message = render_to_string(
'core/email/receipt_email.html', email_context
)
plain_message = strip_tags(html_message)
# 2. Render PDF attachment (returns None if xhtml2pdf is not installed)
pdf_content = render_to_pdf(
'core/pdf/receipt_pdf.html', email_context
)
# 3. Send email with PDF attached
recipient = getattr(settings, 'SPARK_RECEIPT_EMAIL', None)
if recipient:
try:
email = EmailMultiAlternatives(
subject,
plain_message,
settings.DEFAULT_FROM_EMAIL,
[recipient],
)
email.attach_alternative(html_message, "text/html")
if pdf_content:
email.attach(
f"Receipt_{receipt.id}.pdf",
pdf_content,
'application/pdf'
)
email.send()
messages.success(
request,
'Receipt created and sent to SparkReceipt.'
)
except Exception as e:
messages.warning(
request,
f'Receipt saved, but email failed: {str(e)}'
)
else:
messages.success(request, 'Receipt saved successfully.')
# Redirect back to a blank form for the next receipt
return redirect('create_receipt')
else:
# GET request — show a blank form with today's date
form = ExpenseReceiptForm(initial={'date': timezone.now().date()})
items = ExpenseLineItemFormSet()
return render(request, 'core/create_receipt.html', {
'form': form,
'items': items,
})
# =============================================================================
# === IMPORT DATA (TEMPORARY) ===
# Runs the import_production_data command from the browser.
# Visit /import-data/ once to populate the database. Safe to re-run.
# REMOVE THIS VIEW once data is imported.
# =============================================================================
def import_data(request):
"""Runs the import_production_data management command from the browser."""
from django.core.management import call_command
from io import StringIO
output = StringIO()
try:
call_command('import_production_data', stdout=output)
result = output.getvalue()
lines = result.replace('\n', '<br>')
return HttpResponse(
'<html><body style="font-family: monospace; padding: 20px;">'
'<h2>Import Complete!</h2>'
'<div>' + lines + '</div>'
'<br><br>'
'<a href="/admin/">Go to Admin Panel</a> | '
'<a href="/payroll/">Go to Payroll Dashboard</a> | '
'<a href="/">Go to Dashboard</a>'
'</body></html>'
)
except Exception as e:
return HttpResponse(
'<html><body style="font-family: monospace; padding: 20px; color: red;">'
'<h2>Import Error</h2>'
'<pre>' + str(e) + '</pre>'
'</body></html>',
status=500,
)
# =============================================================================
# === RUN MIGRATIONS ===
# Runs pending database migrations from the browser. Useful when Flatlogic's
# "Pull Latest" doesn't automatically run migrations after a code update.
# Visit /run-migrate/ to apply any pending migrations to the production DB.
# =============================================================================
def run_migrate(request):
"""Runs Django migrate from the browser to apply pending migrations."""
from django.core.management import call_command
from io import StringIO
output = StringIO()
try:
call_command('migrate', stdout=output)
result = output.getvalue()
lines = result.replace('\n', '<br>')
return HttpResponse(
'<html><body style="font-family: monospace; padding: 20px;">'
'<h2>Migrations Complete!</h2>'
'<div>' + lines + '</div>'
'<br><br>'
'<a href="/">Go to Dashboard</a> | '
'<a href="/payroll/">Go to Payroll Dashboard</a>'
'</body></html>'
)
except Exception as e:
return HttpResponse(
'<html><body style="font-family: monospace; padding: 20px; color: red;">'
'<h2>Migration Error</h2>'
'<pre>' + str(e) + '</pre>'
'</body></html>',
status=500,
)