38686-vm/core/tests.py
Konrad du Plessis 3ef6db71c9 refactor(report): drop dead year_projects context + SQL cost velocity
Two unrelated cleanups in `_build_report_context` and the helper next
to it.

- Removed `year_projects`, `year_teams`, and `current_year` from the
  report context dict. No template ever rendered them — they were
  added 2026-04 as part of an executive-report design that never
  shipped that section. Each render fired 2 extra GROUP BY queries
  for nothing.

- `_company_cost_velocity` no longer loops every (work_log × worker)
  pair in Python. Single SQL aggregate (`Sum(monthly_salary / 20)`)
  instead — one round-trip regardless of dataset size. Old behaviour
  loaded the entire WorkLog table + M2M into memory for the hero KPI
  card. Regression test (`test_sql_aggregate_matches_python_loop`)
  uses the old Python loop as the expected oracle.

Findings 14 + 16.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 02:01:48 +02:00

3293 lines
146 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# === TESTS FOR WORK LOG PAYROLL CROSS-LINK ===
# Covers the _build_work_log_payroll_context helper — the core logic that
# determines, for each worker on a log, whether they were paid for it.
import datetime
from decimal import Decimal
from django.conf import settings
from django.contrib.auth.models import User
from django.test import TestCase
from django.urls import reverse
from core.models import Project, Team, Worker, WorkLog, PayrollRecord, PayrollAdjustment, Loan, Absence
from core.views import _build_work_log_payroll_context, _build_report_context
class WorkLogPayrollContextTests(TestCase):
    """Checks for the helper that builds the payroll-status view of a work log.

    Fixture (built in setUp): a single log with three workers, one per
    payroll state. Alice has a PayrollRecord linked to the log, Bob is only
    listed in the log's priced_workers, and Carol has neither.
    """

    def setUp(self):
        self.admin = User.objects.create_user(username='admin', is_staff=True)
        self.project = Project.objects.create(name='Test Project')
        self.team = Team.objects.create(name='Team X', supervisor=self.admin)

        # All three workers share the same salary.
        salary = Decimal('4000')
        self.worker_a = Worker.objects.create(name='Alice', id_number='A1', monthly_salary=salary)
        self.worker_b = Worker.objects.create(name='Bob', id_number='B1', monthly_salary=salary)
        self.worker_c = Worker.objects.create(name='Carol', id_number='C1', monthly_salary=salary)

        self.log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 10),
            project=self.project,
            team=self.team,
            supervisor=self.admin,
        )
        self.log.workers.add(self.worker_a, self.worker_b, self.worker_c)

        # Alice: a PayrollRecord tied to this log -> expected status "Paid".
        self.record_a = PayrollRecord.objects.create(
            worker=self.worker_a,
            amount_paid=Decimal('200.00'),
            date=datetime.date(2026, 4, 15),
        )
        self.record_a.work_logs.add(self.log)

        # Bob: in priced_workers, no PayrollRecord -> "Priced, not paid".
        self.log.priced_workers.add(self.worker_b)

        # Carol: nothing linked -> "Unpaid".

    def _context(self):
        """Build the payroll context for the fixture log."""
        return _build_work_log_payroll_context(self.log)

    def _row_for(self, worker):
        """Return the worker_rows entry belonging to *worker*."""
        rows = self._context()['worker_rows']
        return next(entry for entry in rows if entry['worker'].id == worker.id)

    def test_returns_log_and_worker_rows(self):
        context = self._context()
        self.assertEqual(context['log'], self.log)
        self.assertEqual(len(context['worker_rows']), 3)

    def test_paid_worker_has_payslip_link(self):
        alice_row = self._row_for(self.worker_a)
        self.assertEqual(alice_row['status'], 'Paid')
        self.assertEqual(alice_row['payroll_record'], self.record_a)
        self.assertGreater(alice_row['earned'], 0)

    def test_priced_but_unpaid_worker(self):
        bob_row = self._row_for(self.worker_b)
        self.assertEqual(bob_row['status'], 'Priced, not paid')
        self.assertIsNone(bob_row['payroll_record'])

    def test_totally_unpaid_worker(self):
        carol_row = self._row_for(self.worker_c)
        self.assertEqual(carol_row['status'], 'Unpaid')
        self.assertIsNone(carol_row['payroll_record'])

    def test_totals(self):
        context = self._context()
        # Paid side: Alice's daily_rate (she is the only worker with a record
        # for this log).
        self.assertEqual(context['total_paid'], self.worker_a.daily_rate)
        # Outstanding side: Bob and Carol, each at their daily_rate.
        still_owed = self.worker_b.daily_rate + self.worker_c.daily_rate
        self.assertEqual(context['total_outstanding'], still_owed)

    def test_adjustments_linked_to_log(self):
        overtime_adj = PayrollAdjustment.objects.create(
            worker=self.worker_a,
            project=self.project,
            type='Overtime',
            amount=Decimal('50.00'),
            date=datetime.date(2026, 4, 10),
            description='Extra hour',
            work_log=self.log,
        )
        self.assertIn(overtime_adj, self._context()['adjustments'])

    def test_pay_period_absent_if_no_schedule(self):
        self.assertEqual(self._context()['pay_period'], (None, None))

    def test_pay_period_present_when_schedule_configured(self):
        self.team.pay_frequency = 'weekly'
        self.team.pay_start_date = datetime.date(2026, 1, 5)  # A Monday
        self.team.save()

        period_start, period_end = self._context()['pay_period']

        self.assertIsNotNone(period_start)
        self.assertIsNotNone(period_end)
        # The period must bracket the log's own date.
        self.assertLessEqual(period_start, self.log.date)
        self.assertGreaterEqual(period_end, self.log.date)

    def test_overtime_needs_pricing_flag(self):
        """Flag fires when log has OT hours but no priced_workers yet."""
        def flag():
            return _build_work_log_payroll_context(self.log)['overtime_needs_pricing']

        # Fixture state: setUp sets no overtime on the log (Bob is already in
        # priced_workers at this point) -> flag False.
        self.assertFalse(flag())

        # Overtime recorded and priced_workers emptied -> flag True.
        self.log.overtime_amount = Decimal('0.50')
        self.log.priced_workers.clear()
        self.log.save()
        self.assertTrue(flag())

        # Once somebody is priced the flag drops back to False.
        self.log.priced_workers.add(self.worker_a)
        self.assertFalse(flag())

    def test_query_count_is_bounded(self):
        """Helper should not issue per-worker queries — guards against N+1."""
        # Expected queries: payroll_records, priced_workers, workers.all,
        # adjustments. assertNumQueries checks the exact count, so any
        # per-worker query added later changes the number and fails here.
        with self.assertNumQueries(4):
            _build_work_log_payroll_context(self.log)

    def test_empty_log_returns_zero_totals(self):
        """Log with no workers: helper returns empty rows and zero totals."""
        empty_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 11),
            project=self.project,
            team=self.team,
            supervisor=self.admin,
        )
        context = _build_work_log_payroll_context(empty_log)

        zero = Decimal('0.00')
        self.assertEqual(context['worker_rows'], [])
        self.assertEqual(context['total_earned'], zero)
        self.assertEqual(context['total_paid'], zero)
        self.assertEqual(context['total_outstanding'], zero)
        self.assertEqual(context['adjustments'], [])

    def test_log_without_team_has_no_pay_period(self):
        """Log whose team was later soft-deleted to NULL still works."""
        self.log.team = None
        self.log.save()

        context = self._context()

        self.assertEqual(context['pay_period'], (None, None))
        # Everything else in the context is still populated.
        self.assertEqual(len(context['worker_rows']), 3)
# === TESTS FOR THE WORK LOG PAYROLL AJAX ENDPOINT ===
# These cover the JSON endpoint that the Task-5 modal will consume.
# The endpoint must: return JSON to admins, forbid supervisors/anons,
# and 404 on unknown logs.
class WorkLogPayrollAjaxTests(TestCase):
    """Tests for the JSON AJAX endpoint that powers the modal.

    Every request URL is built with reverse('work_log_payroll_ajax', ...) so
    the tests follow the route if it is ever moved.
    """

    def setUp(self):
        # One admin, one non-admin supervisor, and a simple log with one worker.
        self.admin = User.objects.create_user(
            username='admin', password='pass', is_staff=True
        )
        self.supervisor = User.objects.create_user(
            username='sup', password='pass', is_staff=False
        )
        project = Project.objects.create(name='P')
        team = Team.objects.create(name='T', supervisor=self.admin)
        worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('4000'))
        self.log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 10),
            project=project, team=team, supervisor=self.admin,
        )
        self.log.workers.add(worker)

    def test_admin_sees_200_json(self):
        # Admin hits the endpoint and gets a well-formed JSON body.
        self.client.login(username='admin', password='pass')
        url = reverse('work_log_payroll_ajax', args=[self.log.id])
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        data = resp.json()
        self.assertEqual(data['log_id'], self.log.id)
        self.assertEqual(len(data['worker_rows']), 1)
        self.assertEqual(data['worker_rows'][0]['status'], 'Unpaid')

    def test_supervisor_forbidden(self):
        # A non-admin user (even if authenticated) gets a 403.
        self.client.login(username='sup', password='pass')
        url = reverse('work_log_payroll_ajax', args=[self.log.id])
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 403)

    def test_anonymous_redirected_to_login(self):
        # No login at all: the request is answered with a 302 redirect.
        url = reverse('work_log_payroll_ajax', args=[self.log.id])
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 302)

    def test_missing_log_is_404(self):
        # Build the URL with reverse() instead of a hard-coded path: a literal
        # path that no longer matches any route would also answer 404, so the
        # test would keep passing without ever reaching the view.
        missing_id = 99999
        # Guard the premise: the id really must not exist in the test DB.
        self.assertFalse(WorkLog.objects.filter(pk=missing_id).exists())
        self.client.login(username='admin', password='pass')
        resp = self.client.get(reverse('work_log_payroll_ajax', args=[missing_id]))
        self.assertEqual(resp.status_code, 404)

    def test_full_payload_with_paid_worker_null_team_and_ot(self):
        """One scenario exercising: paid worker, null team, OT flag, full URL."""
        # Create a paid worker on a separate log with no team (null team edge case)
        # and with overtime hours but no priced_workers (unpriced OT flag).
        project = Project.objects.create(name='P-full')
        paid_worker = Worker.objects.create(
            name='Paula', id_number='P1', monthly_salary=Decimal('4000')
        )
        log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 11),
            project=project,
            team=None,  # null team
            supervisor=self.admin,
            overtime_amount=Decimal('1.50'),  # > 0 and no priced_workers -> flag fires
        )
        log.workers.add(paid_worker)
        # Link a PayrollRecord so Paula shows as Paid
        record = PayrollRecord.objects.create(
            worker=paid_worker,
            amount_paid=Decimal('250.00'),
            date=datetime.date(2026, 4, 16),
        )
        record.work_logs.add(log)
        # Link an adjustment to the same log. Only the stored row matters, so
        # the created instance is not kept in a local.
        PayrollAdjustment.objects.create(
            worker=paid_worker,
            project=project,
            type='Overtime',
            amount=Decimal('75.00'),
            date=datetime.date(2026, 4, 11),
            description='OT pricing',
            work_log=log,
            payroll_record=record,
        )
        self.client.login(username='admin', password='pass')
        resp = self.client.get(reverse('work_log_payroll_ajax', args=[log.id]))
        self.assertEqual(resp.status_code, 200)
        data = resp.json()
        # Null team branch
        self.assertIsNone(data['team'])
        # Project is still present
        self.assertEqual(data['project']['name'], 'P-full')
        # Paid worker serialization shape
        paula_row = next(r for r in data['worker_rows'] if r['worker_name'] == 'Paula')
        self.assertEqual(paula_row['status'], 'Paid')
        self.assertEqual(paula_row['payroll_record_id'], record.pk)
        self.assertEqual(paula_row['paid_date'], '2026-04-16')
        # OT flag fires (overtime > 0, no priced_workers)
        self.assertTrue(data['overtime_needs_pricing'])
        # full_page_url is pinned to the literal full-page path on purpose
        self.assertEqual(data['full_page_url'], f'/history/{log.id}/')
        # Adjustment serialized with payslip link
        self.assertEqual(len(data['adjustments']), 1)
        self.assertEqual(data['adjustments'][0]['type'], 'Overtime')
        self.assertEqual(data['adjustments'][0]['payroll_record_id'], record.pk)
# === TESTS FOR THE WORK LOG PAYROLL FULL-PAGE VIEW ===
# These cover the HTML page at /history/<id>/ that shares the same context
# builder as the AJAX endpoint. Admin sees a 200 HTML page; supervisor 403.
class WorkLogPayrollDetailTests(TestCase):
    """Access checks for the full-page /history/<id>/ view."""

    def setUp(self):
        # Two accounts (staff and non-staff) plus one log with one worker.
        self.admin = User.objects.create_user(
            username='admin', password='pass', is_staff=True
        )
        self.supervisor = User.objects.create_user(
            username='sup', password='pass', is_staff=False
        )
        site = Project.objects.create(name='P2')
        crew = Team.objects.create(name='T2', supervisor=self.admin)
        wanda = Worker.objects.create(name='Wanda', id_number='X', monthly_salary=Decimal('4000'))
        self.log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 10),
            project=site, team=crew, supervisor=self.admin,
        )
        self.log.workers.add(wanda)

    def _get_detail_as(self, username):
        """Log in as *username* and fetch the detail page of the fixture log."""
        self.client.login(username=username, password='pass')
        target = reverse('work_log_payroll_detail', args=[self.log.id])
        return self.client.get(target)

    def test_admin_gets_full_page(self):
        response = self._get_detail_as('admin')
        self.assertEqual(response.status_code, 200)
        # The page must mention these three strings somewhere in its body.
        for fragment in ('FoxFitt', 'History', 'Wanda'):
            self.assertContains(response, fragment)

    def test_supervisor_forbidden(self):
        response = self._get_detail_as('sup')
        self.assertEqual(response.status_code, 403)
# =============================================================================
# === TESTS FOR PAYROLL REPORT FILTER INFLATION ===
# Regression tests for the M2M double-JOIN bug in _build_report_context.
#
# THE BUG (fixed 2026-04-23):
# Filtering a report by project AND team via chained `.filter(work_logs__field=X)`
# calls produced TWO separate JOIN aliases on the core_payrollrecord_work_logs
# M2M table. Any downstream `.values().annotate(Sum(...))` then aggregated
# across the cartesian product of matching rows, inflating every per-worker
# total_paid and every payments_by_date amount by N² (where N = number of
# matching work logs per record). Total_paid_out itself was correct because
# `.aggregate(Sum(...))` wraps distinct() in a subquery, but the per-row
# values/annotate pattern doesn't get that help.
#
# These tests lock down the fix: with filters applied, the worker-level and
# date-level totals must equal the real payment amount, not a multiplied one.
# =============================================================================
class ReportContextFilterInflationTests(TestCase):
    """Report aggregates must not inflate when project + team filters combine.

    Fixture: one worker, three March 2026 logs on the same project and team,
    and a single R 600 PayrollRecord linked to all three logs.
    """

    def setUp(self):
        self.admin = User.objects.create_user(username='admin-r', is_staff=True)
        self.project = Project.objects.create(name='Solar Farm Gamma')
        self.team = Team.objects.create(name='Team Gamma', supervisor=self.admin)
        self.worker = Worker.objects.create(
            name='Test Worker', id_number='TW1', monthly_salary=Decimal('4000')
        )
        # Team membership is required by the adjustment-summary test, which
        # relies on the worker being reachable through the team's workers M2M.
        self.team.workers.add(self.worker)

        # One record linked to three logs is the smallest setup named by the
        # original bug report for the multiplied-rows inflation.
        self.logs = [self._log_on(day) for day in (5, 10, 15)]

        # A single payment covering all three logs.
        self.record = PayrollRecord.objects.create(
            worker=self.worker,
            amount_paid=Decimal('600.00'),
            date=datetime.date(2026, 3, 20),
        )
        self.record.work_logs.add(*self.logs)

    def _log_on(self, day):
        """Create a March 2026 log for the fixture worker on the given day."""
        entry = WorkLog.objects.create(
            date=datetime.date(2026, 3, day),
            project=self.project,
            team=self.team,
            supervisor=self.admin,
        )
        entry.workers.add(self.worker)
        return entry

    def _ctx(self, project_ids=None, team_ids=None):
        """Build the report context for March 2026 with the given filters."""
        first_day = datetime.date(2026, 3, 1)
        last_day = datetime.date(2026, 3, 31)
        return _build_report_context(
            first_day,
            last_day,
            project_ids=project_ids,
            team_ids=team_ids,
        )

    def _both_filters(self):
        """Report context with the project AND team filter applied."""
        return self._ctx(project_ids=[self.project.id], team_ids=[self.team.id])

    def test_worker_breakdown_not_inflated_with_project_filter_only(self):
        breakdown = self._ctx(project_ids=[self.project.id])['worker_breakdown']
        self.assertEqual(len(breakdown), 1)
        # Pre-fix value was 600 x 3 = 1800 (one JOIN, 3-way inflation).
        self.assertEqual(breakdown[0]['total_paid'], Decimal('600.00'))

    def test_worker_breakdown_not_inflated_with_both_filters(self):
        breakdown = self._both_filters()['worker_breakdown']
        self.assertEqual(len(breakdown), 1)
        # Pre-fix value was 600 x 3 x 3 = 5400 (two JOINs, 9-way inflation).
        self.assertEqual(breakdown[0]['total_paid'], Decimal('600.00'))

    def test_payments_by_date_not_inflated_with_both_filters(self):
        by_date = list(self._both_filters()['payments_by_date'])
        self.assertEqual(len(by_date), 1)
        self.assertEqual(by_date[0]['total'], Decimal('600.00'))

    def test_total_paid_out_stays_correct_with_both_filters(self):
        """Regression guard: total_paid_out was ALREADY correct pre-fix
        because .aggregate() handles distinct() via a subquery. Lock it in
        so a future refactor doesn't accidentally reintroduce inflation here."""
        self.assertEqual(self._both_filters()['total_paid_out'], Decimal('600.00'))

    def test_adjustment_summary_not_inflated_with_team_filter(self):
        """Adjustments filtered by team go through worker__teams (M2M) — same
        bug class. values().annotate(Sum()) would inflate if the worker is in
        multiple teams or if the JOIN is chained with other M2M filters."""
        PayrollAdjustment.objects.create(
            worker=self.worker,
            project=self.project,
            type='Bonus',
            amount=Decimal('100.00'),
            date=datetime.date(2026, 3, 10),
            description='Test bonus',
        )
        summary = self._both_filters()['adjustment_totals']
        total_by_type = {entry['type']: entry['total'] for entry in summary}
        self.assertEqual(total_by_type.get('Bonus'), Decimal('100.00'))
# =============================================================================
# === TESTS FOR SUPERVISOR PICKER QUERYSET ===
# Regression tests for the TeamForm/ProjectForm supervisor dropdown.
#
# THE BUG (fixed 2026-04-23):
# The picker required either admin flags (is_staff/is_superuser) or "Work Logger"
# group membership. Any active user NOT pre-enrolled in that specific group
# was invisible, even though the app's downstream `is_supervisor()` helper
# grants supervisor powers to anyone assigned to a team/project FK/M2M —
# regardless of group. The picker was strictly more restrictive than the
# permission model, hiding field supervisors from the person trying to assign
# them. Fix: allow any active user; let the act of assignment confer
# supervisor-ness (matching how `is_supervisor` works).
# =============================================================================
class SupervisorPickerQuerysetTests(TestCase):
    """Supervisor picker must surface all active users, not just admins."""

    @staticmethod
    def _picker():
        """Return the queryset the supervisor dropdown is built from."""
        # Imported lazily, as each test in this class originally did.
        from core.forms import _supervisor_user_queryset
        return _supervisor_user_queryset()

    def test_regular_active_user_is_selectable(self):
        """A plain active user (no is_staff, no is_superuser, no Work Logger
        group) must appear. Pre-fix: excluded by the Q filter."""
        plain_user = User.objects.create_user(
            username='field_supervisor', password='pass',
            is_staff=False, is_superuser=False,
        )
        self.assertIn(plain_user, self._picker())

    def test_user_in_unrelated_group_is_still_selectable(self):
        """A user in some random group (not 'Work Logger') is still an active
        user and must show up. Pre-fix: excluded because the Q filter specifically
        checked groups__name='Work Logger'."""
        from django.contrib.auth.models import Group

        member = User.objects.create_user(username='misc_group_user', password='pass')
        unrelated_group, _created = Group.objects.get_or_create(name='Some Other Group')
        member.groups.add(unrelated_group)
        self.assertIn(member, self._picker())

    def test_inactive_user_still_excluded(self):
        """Inactive users must NEVER appear in the picker — even if they used
        to be admins or Work Loggers. Active-only is the one hard guardrail."""
        departed = User.objects.create_user(
            username='former_employee', password='pass',
            is_active=False, is_staff=True,
        )
        self.assertNotIn(departed, self._picker())

    def test_admin_still_selectable(self):
        """Defense-in-depth: admins continue to appear (no regression for the
        existing use case)."""
        staff_user = User.objects.create_user(
            username='an_admin', password='pass', is_staff=True
        )
        self.assertIn(staff_user, self._picker())
# =============================================================================
# === TESTS FOR EXECUTIVE REPORT v2 ===
# Covers the new helpers introduced in the report rebuild (Apr 2026):
# _company_cost_velocity, _current_outstanding_in_scope, _team_project_activity,
# and the multi-filter extension of _build_report_context.
# =============================================================================
class CompanyCostVelocityTests(TestCase):
    """Company-wide avg daily and monthly labour cost (hero KPI card 3 & 4)."""

    @staticmethod
    def _velocity():
        """Call _company_cost_velocity via a function-scope import."""
        from core.views import _company_cost_velocity
        return _company_cost_velocity()

    def test_empty_db_returns_zero(self):
        outcome = self._velocity()
        self.assertEqual(outcome['avg_daily'], Decimal('0.00'))
        self.assertEqual(outcome['avg_monthly'], Decimal('0.00'))
        self.assertEqual(outcome['working_days'], 0)

    def test_known_values(self):
        # Scenario: 2 workers on R 4000/month (daily rate 4000/20 = R 200),
        # both logged on the same 5 distinct dates.
        #   lifetime cost = 2 * 5 * 200 = R 2000
        #   avg daily     = 2000 / 5    = R 400
        #   avg monthly   = 400 * 30.44 = R 12,176
        supervisor = User.objects.create_user(username='admin-cv', is_staff=True)
        site = Project.objects.create(name='P')
        first = Worker.objects.create(name='W1', id_number='W1', monthly_salary=Decimal('4000'))
        second = Worker.objects.create(name='W2', id_number='W2', monthly_salary=Decimal('4000'))
        for day in (1, 2, 3, 4, 5):
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=site, supervisor=supervisor,
            )
            entry.workers.add(first, second)

        outcome = self._velocity()

        self.assertEqual(outcome['working_days'], 5)
        self.assertEqual(outcome['avg_daily'], Decimal('400.00'))
        # Allow one cent of slack on the 30.44 multiplication.
        monthly_as_float = float(outcome['avg_monthly'])
        self.assertAlmostEqual(monthly_as_float, 12176.00, delta=0.01)

    def test_duplicate_dates_not_double_counted(self):
        """Two workers working the same date = 1 distinct date, not 2."""
        supervisor = User.objects.create_user(username='admin-cv2', is_staff=True)
        site = Project.objects.create(name='P2')
        first = Worker.objects.create(name='X', id_number='X1', monthly_salary=Decimal('4000'))
        second = Worker.objects.create(name='Y', id_number='Y1', monthly_salary=Decimal('4000'))
        shared_log = WorkLog.objects.create(
            date=datetime.date(2026, 3, 1), project=site, supervisor=supervisor,
        )
        shared_log.workers.add(first, second)

        # One log date shared by two workers counts as one working day.
        self.assertEqual(self._velocity()['working_days'], 1)
class ReportContextDeadKeysTests(TestCase):
    """Regression for Finding 14: the report context used to emit
    `year_projects`, `year_teams`, and `current_year` — none of which
    any template consumed. Removing them saved 2 GROUP BY queries per
    /report/ render. This test ensures they don't come back."""

    def setUp(self):
        self.admin = User.objects.create_user(username='dead-keys', is_staff=True)

    def test_year_projects_not_in_context(self):
        window_start = datetime.date(2026, 1, 1)
        window_end = datetime.date(2026, 1, 31)
        context = _build_report_context(window_start, window_end)
        # None of the retired keys may be present in the context dict.
        for retired_key in ('year_projects', 'year_teams', 'current_year'):
            self.assertNotIn(retired_key, context)
class CompanyCostVelocitySQLAggregateTests(TestCase):
    """Regression for Finding 16: _company_cost_velocity used to do a
    Python loop summing worker.daily_rate. Now uses a single SQL
    aggregate. Both implementations must produce the same result."""

    def setUp(self):
        self.admin = User.objects.create_user(username='cv-sql', is_staff=True)
        self.project = Project.objects.create(name='SQL CV')
        # Three different salaries, so a wrong multiplier or division order
        # changes the expected figure.
        self.w1 = Worker.objects.create(name='A', id_number='C1', monthly_salary=Decimal('4000'))
        self.w2 = Worker.objects.create(name='B', id_number='C2', monthly_salary=Decimal('6000'))
        self.w3 = Worker.objects.create(name='C', id_number='C3', monthly_salary=Decimal('8000'))
        # Three dates, each with a different worker combination.
        rosters = (
            (1, (self.w1, self.w2, self.w3)),  # all three
            (2, (self.w1, self.w2)),
            (3, (self.w3,)),
        )
        for day, roster in rosters:
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day), project=self.project, supervisor=self.admin,
            )
            entry.workers.add(*roster)

    def test_sql_aggregate_matches_python_loop(self):
        """SQL aggregate must match the result of summing daily_rate in Python."""
        from core.views import _company_cost_velocity

        # Oracle: the OLD implementation, i.e. add up daily_rate for every
        # (work log, worker) pair in Python.
        all_logs = WorkLog.objects.prefetch_related('workers').all()
        oracle_total = sum(
            (member.daily_rate for entry in all_logs for member in entry.workers.all()),
            Decimal('0.00'),
        )
        distinct_days = 3  # setUp logs three different dates
        oracle_avg_daily = (oracle_total / distinct_days).quantize(Decimal('0.01'))

        outcome = _company_cost_velocity()

        self.assertEqual(outcome['working_days'], distinct_days)
        self.assertEqual(outcome['avg_daily'], oracle_avg_daily)
class CurrentOutstandingInScopeTests(TestCase):
    """Hero card 2 — 'Outstanding NOW' with optional filter scope."""

    def setUp(self):
        self.admin = User.objects.create_user(username='a-out', is_staff=True)
        self.p1 = Project.objects.create(name='ProjA')
        self.p2 = Project.objects.create(name='ProjB')
        self.t1 = Team.objects.create(name='TeamA', supervisor=self.admin)
        self.w = Worker.objects.create(
            name='Wkr', id_number='W1', monthly_salary=Decimal('4000')
        )
        self.t1.workers.add(self.w)
        # One unpaid log per project, on consecutive days, same team + worker.
        for day, site in ((1, self.p1), (2, self.p2)):
            unpaid_log = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=site, team=self.t1, supervisor=self.admin,
            )
            unpaid_log.workers.add(self.w)

    @staticmethod
    def _outstanding(**scope):
        """Call _current_outstanding_in_scope with the given filter kwargs."""
        from core.views import _current_outstanding_in_scope
        return _current_outstanding_in_scope(**scope)

    def test_no_filters_includes_all_projects(self):
        outcome = self._outstanding()
        # daily_rate = 4000/20 = 200, so two unpaid logs come to 400.
        self.assertEqual(outcome['total'], Decimal('400.00'))
        self.assertEqual(len(outcome['by_project']), 2)

    def test_project_filter_scopes_total(self):
        outcome = self._outstanding(project_ids=[self.p1.id])
        self.assertEqual(outcome['total'], Decimal('200.00'))
        per_project = outcome['by_project']
        self.assertEqual(len(per_project), 1)
        self.assertEqual(per_project[0]['name'], 'ProjA')

    def test_team_filter_scopes_total(self):
        """Team filter on work logs + worker__teams on adjustments."""
        # A bonus for a worker who is NOT a member of t1.
        outsider = Worker.objects.create(
            name='Other', id_number='O1', monthly_salary=Decimal('4000')
        )
        PayrollAdjustment.objects.create(
            worker=outsider, project=self.p1, type='Bonus',
            amount=Decimal('500.00'), date=datetime.date(2026, 3, 3),
        )

        outcome = self._outstanding(team_ids=[self.t1.id])

        # Scoped to t1, only self.w's two logs count: R 400 in total.
        self.assertEqual(outcome['total'], Decimal('400.00'))
        # The outsider's R 500 bonus must not show up in by_project, because
        # that worker is outside the team scope.
        self.assertEqual(len(outcome['by_project']), 2)
        listed_amounts = [entry['amount'] for entry in outcome['by_project']]
        self.assertNotIn(Decimal('500.00'), listed_amounts)
class ComputeOutstandingProjectIdKeyingTests(TestCase):
    """Regression for Finding 8: outstanding_by_project must key by
    project_id (not name) so two projects with identical names stay
    separate rows in the home dashboard's 'Outstanding by Project' card."""

    def setUp(self):
        self.admin = User.objects.create_user(username='a-pidkey', is_staff=True)
        # Two separate Project rows that deliberately carry the same name.
        self.p1 = Project.objects.create(name='Solar Phase 1')
        self.p2 = Project.objects.create(name='Solar Phase 1')
        self.w = Worker.objects.create(
            name='W', id_number='W-PID', monthly_salary=Decimal('4000'),
        )
        # One unpaid log per project for the same worker.
        for day, site in ((1, self.p1), (2, self.p2)):
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=site, supervisor=self.admin,
            )
            entry.workers.add(self.w)

    def test_same_named_projects_stay_separate_in_sorted_list(self):
        from core.views import _compute_outstanding

        sorted_rows = _compute_outstanding()['outstanding_by_project_sorted']
        twins = [entry for entry in sorted_rows if entry['name'] == 'Solar Phase 1']

        # Both projects must be listed on their own (keyed by project_id).
        self.assertEqual(
            len(twins), 2,
            'Two distinct projects sharing a name must stay separate '
            '(keyed by project_id, not name).'
        )
        # Each row owes R 200 (daily_rate of R 4000/20).
        for entry in twins:
            self.assertEqual(entry['amount'], Decimal('200.00'))
        # The two rows must carry different ids.
        distinct_ids = {entry['id'] for entry in twins}
        self.assertEqual(len(distinct_ids), 2)
class ComputeOutstandingActiveScopeTests(TestCase):
    """Regression for Findings 7/17: home dashboard and payroll dashboard
    used to produce DIFFERENT outstanding totals because home included
    inactive workers' unpaid wages while payroll dashboard didn't.
    _compute_outstanding now defaults to active workers only — matching
    the payroll dashboard."""

    def setUp(self):
        self.admin = User.objects.create_user(username='a-active', is_staff=True)
        self.project = Project.objects.create(name='ActiveCheck')
        self.active_worker = Worker.objects.create(
            name='Active', id_number='AC1', monthly_salary=Decimal('4000'), active=True,
        )
        self.inactive_worker = Worker.objects.create(
            name='Inactive', id_number='IN1', monthly_salary=Decimal('4000'), active=False,
        )
        # A single unpaid log shared by the active and the inactive worker.
        shared_log = WorkLog.objects.create(
            date=datetime.date(2026, 3, 1),
            project=self.project, supervisor=self.admin,
        )
        shared_log.workers.add(self.active_worker, self.inactive_worker)

    @staticmethod
    def _outstanding(**options):
        """Call _compute_outstanding, forwarding any keyword options."""
        from core.views import _compute_outstanding
        return _compute_outstanding(**options)

    def test_default_excludes_inactive_workers(self):
        outcome = self._outstanding()
        # Only the active worker's R 200 daily rate is counted.
        one_day = Decimal('200.00')
        self.assertEqual(outcome['unpaid_wages'], one_day)
        self.assertEqual(outcome['outstanding_payments'], one_day)

    def test_include_inactive_workers_flag(self):
        outcome = self._outstanding(include_inactive_workers=True)
        # Both workers counted: R 400 in total.
        two_days = Decimal('400.00')
        self.assertEqual(outcome['unpaid_wages'], two_days)
        self.assertEqual(outcome['outstanding_payments'], two_days)

    def test_unpaid_adj_on_inactive_worker_excluded_by_default(self):
        """Unpaid adjustments on inactive workers should ALSO be excluded
        by default, so the dashboards don't include phantom liabilities
        from deactivated workers."""
        # An unpaid bonus that belongs to the inactive worker.
        PayrollAdjustment.objects.create(
            worker=self.inactive_worker, project=self.project,
            type='Bonus', amount=Decimal('500.00'),
            date=datetime.date(2026, 3, 2),
        )

        # Default scope: the R 500 bonus is left out.
        default_scope = self._outstanding()
        self.assertEqual(default_scope['pending_adj_add'], Decimal('0.00'))

        # Explicit opt-in: the bonus is counted.
        widened_scope = self._outstanding(include_inactive_workers=True)
        self.assertEqual(widened_scope['pending_adj_add'], Decimal('500.00'))
class TeamProjectActivityTests(TestCase):
    """Chapter IV pivot: one row per team, one column per project, each
    cell holding the number of distinct log dates."""

    def setUp(self):
        self.admin = User.objects.create_user(username='a-tpa', is_staff=True)
        self.p1 = Project.objects.create(name='P1')
        self.p2 = Project.objects.create(name='P2')
        self.t1 = Team.objects.create(name='T1', supervisor=self.admin)
        self.t2 = Team.objects.create(name='T2', supervisor=self.admin)
        worker = Worker.objects.create(
            name='W', id_number='W1', monthly_salary=Decimal('4000'),
        )

        # (day of March 2026, project, team) for every log in the fixture:
        #   T1 -> three distinct dates on P1
        #   T2 -> two distinct dates on P1 and one on P2
        schedule = [
            (1, self.p1, self.t1),
            (2, self.p1, self.t1),
            (3, self.p1, self.t1),
            (4, self.p1, self.t2),
            (5, self.p1, self.t2),
            (6, self.p2, self.t2),
        ]
        for day, project, team in schedule:
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=project,
                team=team,
                supervisor=self.admin,
            )
            entry.workers.add(worker)

        # Queryset handed to the helper under test: all of March 2026.
        self.logs_qs = WorkLog.objects.filter(
            date__gte=datetime.date(2026, 3, 1),
            date__lte=datetime.date(2026, 3, 31),
        )

    def _pivot(self):
        """Run the helper under test against the March queryset."""
        from core.views import _team_project_activity
        return _team_project_activity(self.logs_qs)

    def test_pivot_shape(self):
        pivot = self._pivot()
        # Two projects (P1, P2) -> 2 columns; two teams (T1, T2) -> 2 rows.
        self.assertEqual(len(pivot['columns']), 2)
        self.assertEqual(len(pivot['rows']), 2)

    def test_cell_counts(self):
        pivot = self._pivot()
        by_team = {row['team_name']: row for row in pivot['rows']}
        t1_cells = by_team['T1']['cells_by_project_id']
        t2_cells = by_team['T2']['cells_by_project_id']
        # T1: 3 days on P1, nothing on P2 (the key may be absent entirely).
        self.assertEqual(t1_cells[self.p1.id], 3)
        self.assertEqual(t1_cells.get(self.p2.id, 0), 0)
        # T2: 2 days on P1, 1 day on P2.
        self.assertEqual(t2_cells[self.p1.id], 2)
        self.assertEqual(t2_cells[self.p2.id], 1)

    def test_row_and_column_totals(self):
        pivot = self._pivot()
        by_team = {row['team_name']: row for row in pivot['rows']}
        self.assertEqual(by_team['T1']['row_total'], 3)
        self.assertEqual(by_team['T2']['row_total'], 3)
        self.assertEqual(pivot['col_totals'][self.p1.id], 5)
        self.assertEqual(pivot['col_totals'][self.p2.id], 1)
        self.assertEqual(pivot['grand_total'], 6)

    def test_team_with_no_logs_omitted(self):
        """A team without any log in the period must not produce a row."""
        Team.objects.create(name='GhostTeam', supervisor=self.admin)
        pivot = self._pivot()
        listed_teams = [row['team_name'] for row in pivot['rows']]
        self.assertNotIn('GhostTeam', listed_teams)
class ChapterOneEnrichmentTests(TestCase):
    """Chapter I — each All Time Projects row carries working_days,
    avg_per_working_day, start_date and last_activity."""

    def test_alltime_projects_includes_working_days_and_avg(self):
        from core.views import _build_report_context

        supervisor = User.objects.create_user(username='c1', is_staff=True)
        project = Project.objects.create(
            name='C1', start_date=datetime.date(2026, 1, 1),
        )
        worker = Worker.objects.create(
            name='W', id_number='W1', monthly_salary=Decimal('4000'),
        )
        # One worker on each of 4 distinct March dates. At a daily rate of
        # R 200 that is R 800 over 4 working days, i.e. an average of R 200.
        for day in range(1, 5):
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=project,
                supervisor=supervisor,
            )
            entry.workers.add(worker)

        period_start = datetime.date(2026, 1, 1)
        period_end = datetime.date(2026, 12, 31)
        ctx = _build_report_context(period_start, period_end)

        rows_by_project = {row['project']: row for row in ctx['alltime_projects']}
        self.assertIn('C1', rows_by_project)
        c1_row = rows_by_project['C1']
        self.assertEqual(c1_row['working_days'], 4)
        self.assertEqual(c1_row['avg_per_working_day'], Decimal('200.00'))
        self.assertEqual(c1_row['start_date'], datetime.date(2026, 1, 1))
        # last_activity is the newest WorkLog.date — 4 March in this fixture.
        self.assertEqual(
            c1_row['last_activity'],
            datetime.date(2026, 3, 4),
            'alltime_projects rows should expose the most-recent log date '
            'so the report can show "Last Activity" per project'
        )
# =============================================================================
# === TESTS FOR MULTI-VALUE FILTER SUPPORT (Task 6) ===
# _build_report_context now accepts project_ids and team_ids lists.
# =============================================================================
class ReportMultiFilterTests(TestCase):
    """Task 6 — _build_report_context with list-valued project_ids /
    team_ids filters."""

    def setUp(self):
        self.admin = User.objects.create_user(username='mf', is_staff=True)
        self.p1 = Project.objects.create(name='P1')
        self.p2 = Project.objects.create(name='P2')
        self.p3 = Project.objects.create(name='P3')
        self.team = Team.objects.create(name='T', supervisor=self.admin)
        self.w = Worker.objects.create(
            name='W', id_number='W1', monthly_salary=Decimal('4000'),
        )
        self.team.workers.add(self.w)

        # Every project gets exactly one log and one R 100 payroll record
        # linked to that log.
        for project in (self.p1, self.p2, self.p3):
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, 1),
                project=project,
                team=self.team,
                supervisor=self.admin,
            )
            entry.workers.add(self.w)
            payment = PayrollRecord.objects.create(
                worker=self.w,
                amount_paid=Decimal('100.00'),
                date=datetime.date(2026, 3, 5),
            )
            payment.work_logs.add(entry)

    def _ctx(self, project_ids=None, team_ids=None):
        """Build the March 2026 report context with the given filters."""
        from core.views import _build_report_context

        march_start = datetime.date(2026, 3, 1)
        march_end = datetime.date(2026, 3, 31)
        return _build_report_context(
            march_start, march_end,
            project_ids=project_ids, team_ids=team_ids,
        )

    def test_multi_project_union(self):
        selected = [self.p1.id, self.p2.id]
        ctx = self._ctx(project_ids=selected)
        # P1 + P2 at R 100 each = R 200; P3 is filtered out.
        self.assertEqual(ctx['total_paid_out'], Decimal('200.00'))

    def test_empty_list_equals_none(self):
        unfiltered = self._ctx(project_ids=None)
        empty_filter = self._ctx(project_ids=[])
        self.assertEqual(
            unfiltered['total_paid_out'], empty_filter['total_paid_out'],
        )

    def test_no_inflation_with_multi_project(self):
        """Selecting several projects must not inflate the worker breakdown."""
        every_project = [self.p1.id, self.p2.id, self.p3.id]
        breakdown = self._ctx(project_ids=every_project)['worker_breakdown']
        self.assertEqual(len(breakdown), 1)
        # Same worker on all three records: 3 x R 100 = R 300.
        self.assertEqual(breakdown[0]['total_paid'], Decimal('300.00'))
# =============================================================================
# === TESTS FOR INLINE FILTERS (Report Page) ===
# Pill-as-dropdown + cross-filter feature. Most behaviour is template/JS;
# the only backend surface is the project_team_pairs_json context key that
# powers the client-side Team<->Project cross-filter.
# =============================================================================
class InlineFiltersPairsContextTests(TestCase):
    """Report view must serialise distinct (project_id, team_id) pairs for
    the pill-popover cross-filter JS."""

    def setUp(self):
        self.admin = User.objects.create_user(
            username='admin-if', password='pass', is_staff=True
        )
        self.p1 = Project.objects.create(name='P1')
        self.p2 = Project.objects.create(name='P2')
        self.t1 = Team.objects.create(name='T1', supervisor=self.admin)
        self.t2 = Team.objects.create(name='T2', supervisor=self.admin)
        self.w = Worker.objects.create(
            name='W', id_number='W1', monthly_salary=Decimal('4000')
        )
        # Log t1 on p1, t2 on p2 — so pairs should be [(p1,t1), (p2,t2)]
        for proj, team in [(self.p1, self.t1), (self.p2, self.t2)]:
            log = WorkLog.objects.create(
                date=datetime.date(2026, 3, 1),
                project=proj, team=team, supervisor=self.admin,
            )
            log.workers.add(self.w)

    def _get_report(self, query):
        """Log in as the fixture admin and GET the report page.

        ``query`` is the raw query string (including the leading ``?``).
        The 200 check lives here so that every test fails with a clear
        status-code message — rather than an obscure error when it indexes
        into ``resp.context`` — if the view redirects or errors out.
        """
        self.client.login(username='admin-if', password='pass')
        resp = self.client.get(reverse('generate_report') + query)
        self.assertEqual(resp.status_code, 200)
        return resp

    def test_pairs_context_key_populated(self):
        # The context value is a raw Python list of dicts; Django's
        # |json_script filter handles the single JSON serialisation at
        # template render time (no double-encoding).
        resp = self._get_report('?from_month=2026-03&to_month=2026-04')
        pairs = resp.context['project_team_pairs_json']
        # Each entry has both project_id and team_id
        for p in pairs:
            self.assertIn('project_id', p)
            self.assertIn('team_id', p)
        # Expected pairs (as tuples for set comparison)
        pair_set = {(p['project_id'], p['team_id']) for p in pairs}
        self.assertIn((self.p1.id, self.t1.id), pair_set)
        self.assertIn((self.p2.id, self.t2.id), pair_set)

    def test_pairs_excludes_null_project_or_team(self):
        """Logs with null project or null team should not appear in pairs."""
        # Add a log with team=None
        log = WorkLog.objects.create(
            date=datetime.date(2026, 3, 2),
            project=self.p1, team=None, supervisor=self.admin,
        )
        log.workers.add(self.w)
        resp = self._get_report('?from_month=2026-03&to_month=2026-04')
        pairs = resp.context['project_team_pairs_json']
        # Guard against a vacuous pass: all() over an empty list is True, so
        # make sure the fixture's real pairs are actually present first.
        self.assertTrue(pairs, 'Expected the fixture pairs to be present')
        # No pair may carry a null on either side. (Only the null-team case
        # is exercised by this fixture; the project check documents the
        # other half of the contract.)
        for p in pairs:
            self.assertIsNotNone(p['team_id'])
            self.assertIsNotNone(p['project_id'])

    def test_pairs_renders_as_valid_json_in_template(self):
        """End-to-end: the rendered HTML must contain a single, valid JSON
        array inside the <script id="projectTeamPairs"> tag — NOT a
        JSON-encoded string (which was the bug that broke all pill
        interactions before the context key was changed from
        `json.dumps(pairs)` to raw `pairs`)."""
        import json as _json
        import re
        resp = self._get_report('?from_month=2026-03&to_month=2026-04')
        html = resp.content.decode('utf-8')
        # Extract the JSON payload inside <script id="projectTeamPairs">...</script>
        match = re.search(
            r'<script id="projectTeamPairs"[^>]*>(.*?)</script>',
            html, re.DOTALL
        )
        self.assertIsNotNone(match, 'projectTeamPairs <script> tag missing')
        payload = match.group(1).strip()
        # Must parse to a LIST, not a string.
        parsed = _json.loads(payload)
        self.assertIsInstance(parsed, list,
            "Double-encoded JSON regression: browser's JSON.parse "
            "would return a string here, killing pairs.forEach() in the "
            "pill-popover JS. See 2026-04-23 bugfix.")
        # And the list members must be dicts with project_id + team_id
        for entry in parsed:
            self.assertIsInstance(entry, dict)
            self.assertIn('project_id', entry)
            self.assertIn('team_id', entry)

    def test_pickers_and_pairs_are_date_scoped(self):
        """Checkpoint-1 refinement: projects/teams lists + the pair map
        include only entries with WorkLog activity INSIDE the selected
        date range — NOT entire-history entries. Entries that are in the
        URL's `?project=` or `?team=` selection are always preserved,
        though, so the user's pick can never vanish."""
        # Add a third project/team that ONLY worked outside the report window
        out_project = Project.objects.create(name='P-out')
        out_team = Team.objects.create(name='T-out', supervisor=self.admin)
        out_log = WorkLog.objects.create(
            date=datetime.date(2026, 1, 15),  # outside March window
            project=out_project, team=out_team, supervisor=self.admin,
        )
        out_log.workers.add(self.w)
        # Request only March 2026 — Jan logs should be excluded
        resp = self._get_report('?from_month=2026-03&to_month=2026-03')
        # Picker lists: out-of-range project + team should NOT appear
        project_ids = {p.id for p in resp.context['projects']}
        team_ids = {t.id for t in resp.context['teams']}
        self.assertIn(self.p1.id, project_ids)
        self.assertIn(self.p2.id, project_ids)
        self.assertNotIn(out_project.id, project_ids,
            'Out-of-range project must not appear in the date-scoped list')
        self.assertIn(self.t1.id, team_ids)
        self.assertIn(self.t2.id, team_ids)
        self.assertNotIn(out_team.id, team_ids,
            'Out-of-range team must not appear in the date-scoped list')
        # Pair map: must also be date-scoped
        pair_set = {(p['project_id'], p['team_id'])
                    for p in resp.context['project_team_pairs_json']}
        self.assertNotIn((out_project.id, out_team.id), pair_set,
            'Out-of-range pair must not appear in the cross-filter map')

    def test_url_selected_projects_survive_even_out_of_range(self):
        """A project explicitly in the URL's ?project= selection must
        remain in the picker list even if it has no logs in the current
        date range — otherwise the user couldn't see (or deselect) their
        own pick."""
        out_project = Project.objects.create(name='P-out')
        # Never logs anything in any date range
        resp = self._get_report(
            '?from_month=2026-03&to_month=2026-03'
            + f'&project={out_project.id}'
        )
        project_ids = {p.id for p in resp.context['projects']}
        self.assertIn(out_project.id, project_ids,
            'URL-selected project must survive the date-scope filter')
# =============================================================================
# === TESTS FOR |type_slug FILTER ===
# Used by Adjustments tab to build CSS class names from type labels.
# =============================================================================
class TypeSlugFilterTests(TestCase):
    """format_tags.type_slug turns adjustment-type labels into slugs."""

    def _assert_slugs(self, expectations):
        """Check each (input, expected slug) pair against type_slug."""
        from core.templatetags.format_tags import type_slug

        for label, expected in expectations:
            self.assertEqual(type_slug(label), expected)

    def test_spaces_become_hyphens_and_lowercased(self):
        self._assert_slugs([
            ('Advance Payment', 'advance-payment'),
            ('New Loan', 'new-loan'),
            ('Bonus', 'bonus'),
        ])

    def test_empty_or_none_returns_empty_string(self):
        self._assert_slugs([
            ('', ''),
            (None, ''),
        ])

    def test_idempotent_on_already_slugged(self):
        self._assert_slugs([('bonus', 'bonus')])
# =============================================================================
# === TESTS FOR ADJUSTMENTS TAB (payroll_dashboard ?status=adjustments) ===
# Covers the new tab's backend: filters, sort, stats, pagination.
# Each test creates its own fresh fixture via setUp.
# NOTE: PayrollRecord only accepts worker/date/amount_paid (see core/models.py).
# The plan spec used days_worked/total_amount — those do NOT exist. Adapted.
# =============================================================================
class AdjustmentsTabTests(TestCase):
    """Backend behaviour of the Adjustments tab served at
    /payroll/?status=adjustments."""

    # ----- fixture -------------------------------------------------------

    def setUp(self):
        self.admin = User.objects.create_user(
            username='adj-admin', password='pass',
            is_staff=True, is_superuser=True,
        )
        self.sup = User.objects.create_user(username='adj-sup', password='pass')
        self.w1 = Worker.objects.create(
            name='Alice', id_number='A1', monthly_salary=Decimal('4000'),
        )
        self.w2 = Worker.objects.create(
            name='Bob', id_number='B1', monthly_salary=Decimal('4000'),
        )
        # Both workers belong to both teams, so a naive join through the
        # worker<->team M2M would multiply rows by the number of teams.
        # This is what the team-filter test relies on.
        self.team = Team.objects.create(name='Alpha', supervisor=self.admin)
        self.team2 = Team.objects.create(name='Beta', supervisor=self.admin)
        for team in (self.team, self.team2):
            team.workers.add(self.w1, self.w2)
        self.proj = Project.objects.create(name='Site X')
        # Three unpaid adjustments: a bonus each for Alice and Bob, plus a
        # deduction for Alice dated in March.
        self.a1 = self._new_adjustment(
            self.w1, 'Bonus', '500', datetime.date(2026, 4, 10), 'April bonus',
        )
        self.a2 = self._new_adjustment(
            self.w2, 'Bonus', '300', datetime.date(2026, 4, 11), 'Project milestone',
        )
        self.a3 = self._new_adjustment(
            self.w1, 'Deduction', '100', datetime.date(2026, 3, 28), 'Missing tool',
        )
        self.url = reverse('payroll_dashboard') + '?status=adjustments'

    # ----- helpers -------------------------------------------------------

    def _new_adjustment(self, worker, adj_type, amount, when, description, **extra):
        """Create a PayrollAdjustment on the fixture project."""
        return PayrollAdjustment.objects.create(
            worker=worker,
            project=self.proj,
            type=adj_type,
            amount=Decimal(amount),
            date=when,
            description=description,
            **extra,
        )

    def _new_loan(self, remaining):
        """Create a R 1000 loan for Alice with the given remaining balance."""
        return Loan.objects.create(
            worker=self.w1,
            principal_amount=Decimal('1000'),
            remaining_balance=Decimal(remaining),
            date=datetime.date(2026, 4, 1),
            loan_type='loan',
        )

    def _pay_a1(self):
        """Attach a1 to a payroll record so it counts as paid.
        PayrollRecord takes worker/date/amount_paid only."""
        record = PayrollRecord.objects.create(
            worker=self.w1,
            date=datetime.date(2026, 4, 15),
            amount_paid=Decimal('4000'),
        )
        self.a1.payroll_record = record
        self.a1.save()

    def _login_admin(self):
        self.client.login(username='adj-admin', password='pass')

    def _bulk_delete(self, adjustment_ids):
        """POST the given ids to the bulk-delete endpoint."""
        return self.client.post(
            reverse('bulk_delete_adjustments'),
            {'adjustment_ids': adjustment_ids},
        )

    @staticmethod
    def _listed_ids(resp):
        """Ids of the adjustments on the current page of the listing."""
        return {adj.id for adj in resp.context['adj_page'].object_list}

    @staticmethod
    def _adjustment_exists(adjustment_id):
        return PayrollAdjustment.objects.filter(id=adjustment_id).exists()

    # ----- listing, filters, stats ---------------------------------------

    def test_admin_sees_adjustments_tab(self):
        self._login_admin()
        resp = self.client.get(self.url)
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.context['active_tab'], 'adjustments')
        # The listing must contain all three fixture adjustments.
        self.assertEqual(len(resp.context['adj_page'].object_list), 3)

    def test_supervisor_forbidden(self):
        self.client.login(username='adj-sup', password='pass')
        resp = self.client.get(self.url)
        # payroll_dashboard sends non-admins home with a redirect.
        self.assertEqual(resp.status_code, 302)

    def test_type_multi_filter(self):
        """?type=Bonus&type=Deduction yields the UNION of both types
        (2 bonuses + 1 deduction = 3 rows), not the intersection."""
        self._login_admin()
        resp = self.client.get(self.url + '&type=Bonus&type=Deduction')
        self.assertEqual(resp.context['adj_total_count'], 3)
        self.assertEqual(
            self._listed_ids(resp), {self.a1.id, self.a2.id, self.a3.id},
        )

    def test_worker_multi_filter(self):
        self._login_admin()
        resp = self.client.get(self.url + f'&worker={self.w1.id}')
        listed = self._listed_ids(resp)
        self.assertIn(self.a1.id, listed)
        self.assertNotIn(self.a2.id, listed)
        self.assertIn(self.a3.id, listed)

    def test_team_filter_uses_subquery_no_inflation(self):
        """Filtering by team must NOT multiply rows. With 2 teams x 2
        workers x 3 adjustments, a naive worker__teams__id__in filter would
        return 6 inflated rows; the subquery pattern returns the true 3.
        See CLAUDE.md ORM gotcha."""
        self._login_admin()
        both_teams = f'&team={self.team.id}&team={self.team2.id}'
        resp = self.client.get(self.url + both_teams)
        # The queryset-level count is the stricter guard: it would be wrong
        # under inflation even if the paginated page happened to look fine.
        self.assertEqual(resp.context['adj_total_count'], 3)
        self.assertEqual(len(resp.context['adj_page'].object_list), 3)

    def test_status_filter_unpaid(self):
        self._login_admin()
        self._pay_a1()
        resp = self.client.get(self.url + '&adj_status=unpaid')
        listed = self._listed_ids(resp)
        self.assertNotIn(self.a1.id, listed)
        self.assertIn(self.a2.id, listed)
        self.assertIn(self.a3.id, listed)

    def test_date_range_filter(self):
        self._login_admin()
        # 1–31 March only covers a3 (28 Mar); a1 and a2 are April.
        march_only = '&adj_date_from=2026-03-01&adj_date_to=2026-03-31'
        resp = self.client.get(self.url + march_only)
        self.assertEqual(self._listed_ids(resp), {self.a3.id})

    def test_stats_scoped_to_filtered_set(self):
        self._login_admin()
        stats = self.client.get(self.url + '&type=Bonus').context
        # Two bonuses, neither paid: R 800 additive and nothing deductive.
        self.assertEqual(stats['adj_total_count'], 2)
        self.assertEqual(stats['adj_unpaid_count'], 2)
        self.assertEqual(stats['adj_additive_sum'], Decimal('800.00'))
        self.assertEqual(stats['adj_deductive_sum'], Decimal('0.00'))

    # ----- grouping ------------------------------------------------------

    def test_group_by_type(self):
        self._login_admin()
        resp = self.client.get(self.url + '&group_by=type')
        groups = resp.context['adj_groups']
        self.assertIsNotNone(groups)
        self.assertEqual({g['label'] for g in groups}, {'Bonus', 'Deduction'})
        bonuses = next(g for g in groups if g['label'] == 'Bonus')
        self.assertEqual(bonuses['count'], 2)
        self.assertEqual(bonuses['net_sum'], Decimal('800.00'))  # R 500 + R 300
        deductions = next(g for g in groups if g['label'] == 'Deduction')
        self.assertEqual(deductions['net_sum'], Decimal('-100.00'))
        # Ordering is by descending |net_sum|: |800| beats |100|, so the
        # Bonus group has to be first.
        self.assertEqual(groups[0]['label'], 'Bonus')

    def test_group_by_worker(self):
        self._login_admin()
        resp = self.client.get(self.url + '&group_by=worker')
        groups = resp.context['adj_groups']
        self.assertIsNotNone(groups)
        self.assertEqual({g['label'] for g in groups}, {'Alice', 'Bob'})
        alice = next(g for g in groups if g['label'] == 'Alice')
        # Alice nets +R 400: a R 500 bonus less a R 100 deduction.
        self.assertEqual(alice['count'], 2)
        self.assertEqual(alice['net_sum'], Decimal('400.00'))

    # ----- bulk delete ---------------------------------------------------

    def test_bulk_delete_only_affects_unpaid(self):
        """Posting a mix of paid and unpaid ids to
        /payroll/adjustments/bulk-delete/ removes ONLY the unpaid rows;
        paid rows stay (payroll history is immutable)."""
        self._login_admin()
        # a1 becomes paid; a2 and a3 stay unpaid.
        self._pay_a1()
        resp = self._bulk_delete([self.a1.id, self.a2.id, self.a3.id])
        self.assertEqual(resp.status_code, 200)
        body = resp.json()
        self.assertEqual(body['deleted'], 2)
        self.assertEqual(body['requested'], 3)
        # The paid a1 is still there; a2 and a3 are gone.
        self.assertTrue(self._adjustment_exists(self.a1.id))
        self.assertFalse(self._adjustment_exists(self.a2.id))
        self.assertFalse(self._adjustment_exists(self.a3.id))

    def test_bulk_delete_requires_admin(self):
        """A non-admin supervisor may not bulk-delete adjustments."""
        self.client.login(username='adj-sup', password='pass')
        resp = self._bulk_delete([self.a1.id])
        self.assertEqual(resp.status_code, 403)
        # Nothing was deleted.
        self.assertTrue(self._adjustment_exists(self.a1.id))

    def test_bulk_delete_cascades_new_loan(self):
        """Bulk-deleting a 'New Loan' adjustment must also remove the
        linked Loan row AND any still-unpaid Loan Repayment adjustments —
        the same cascade as the single-row delete_adjustment view.
        Otherwise the bulk endpoint would orphan Loan rows and leave
        pending repayments behind."""
        # Loan, its New Loan adjustment, and one unpaid repayment.
        loan = self._new_loan('1000')
        new_loan_adj = self._new_adjustment(
            self.w1, 'New Loan', '1000', datetime.date(2026, 4, 1),
            'Test loan', loan=loan,
        )
        unpaid_repayment = self._new_adjustment(
            self.w1, 'Loan Repayment', '500', datetime.date(2026, 5, 1),
            'First repayment', loan=loan,
        )
        self._login_admin()
        resp = self._bulk_delete([new_loan_adj.id])
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.json()['deleted'], 1)
        # The New Loan adjustment itself is gone ...
        self.assertFalse(self._adjustment_exists(new_loan_adj.id))
        # ... and so is the Loan row it pointed at ...
        self.assertFalse(Loan.objects.filter(id=loan.id).exists())
        # ... and the repayment that was still unpaid.
        self.assertFalse(self._adjustment_exists(unpaid_repayment.id))

    def test_bulk_delete_skips_loan_with_paid_repayments(self):
        """A 'New Loan' with any paid repayment must be refused by
        bulk-delete (deleting it would lose the audit trail). The rest of
        the batch is processed as usual."""
        loan = self._new_loan('500')
        new_loan_adj = self._new_adjustment(
            self.w1, 'New Loan', '1000', datetime.date(2026, 4, 1),
            'Test loan', loan=loan,
        )
        # A repayment that has already been paid out.
        paid_record = PayrollRecord.objects.create(
            worker=self.w1,
            date=datetime.date(2026, 5, 1),
            amount_paid=Decimal('500'),
        )
        self._new_adjustment(
            self.w1, 'Loan Repayment', '500', datetime.date(2026, 5, 1),
            'Paid', loan=loan, payroll_record=paid_record,
        )
        self._login_admin()
        # Batch = the protected New Loan + a2 (an unpaid Bonus).
        resp = self._bulk_delete([new_loan_adj.id, self.a2.id])
        self.assertEqual(resp.status_code, 200)
        body = resp.json()
        self.assertEqual(body['deleted'], 1)  # a2 only
        self.assertEqual(body['requested'], 2)
        self.assertEqual(body['skipped_reasons'], {'has_paid_repayments': 1})
        # The loan and its adjustment are untouched; a2 has been removed.
        self.assertTrue(self._adjustment_exists(new_loan_adj.id))
        self.assertTrue(Loan.objects.filter(id=loan.id).exists())
        self.assertFalse(self._adjustment_exists(self.a2.id))

    # ----- cross-filter map ----------------------------------------------

    def test_team_worker_pairs_json_context_key(self):
        """The cross-filter map is exposed as a raw Python list of
        {team_id, worker_id} dicts; the |json_script filter serialises it
        at render time (no double-encoding — see the 2026-04-23
        inline-filters regression test)."""
        self._login_admin()
        resp = self.client.get(self.url)
        pairs = resp.context['team_worker_pairs_json']
        self.assertIsInstance(pairs, list)
        for entry in pairs:
            self.assertIn('team_id', entry)
            self.assertIn('worker_id', entry)
        # Fixture: Alpha and Beta each contain both workers, so all four
        # (team, worker) combinations must be present.
        seen = {(entry['team_id'], entry['worker_id']) for entry in pairs}
        for team in (self.team, self.team2):
            for worker in (self.w1, self.w2):
                self.assertIn((team.id, worker.id), seen)
# === Cache-bust token tests ===
# The deployment_timestamp context variable controls the ?v=... query
# string on our CSS URL. It MUST only change when custom.css changes
# — otherwise Cloudflare cache-HIT rate on the CSS drops to zero and
# every page reload re-fetches 64 KB from the VM.
class CacheBustTokenTests(TestCase):
    """Regression tests for the mtime-based cache-bust token."""

    def test_token_is_an_integer(self):
        """Token must be int (templates cast to str; float would show a dot)."""
        from core.context_processors import project_context
        ctx = project_context(request=None)
        self.assertIsInstance(ctx['deployment_timestamp'], int)

    def test_token_is_stable_across_two_calls(self):
        """Critical property: two back-to-back calls return the same
        token — because custom.css hasn't changed between them. This
        is the entire point of the mtime-based approach."""
        from core.context_processors import project_context
        t1 = project_context(request=None)['deployment_timestamp']
        t2 = project_context(request=None)['deployment_timestamp']
        self.assertEqual(t1, t2)

    def test_token_falls_back_if_file_missing(self):
        """If static/css/custom.css is somehow missing (fresh
        container pre-collectstatic), we must NOT crash. We fall back
        to int(time.time()) so every page still renders."""
        from unittest import mock

        import core.context_processors as cp

        # Point the module-level path constant at a guaranteed-missing file.
        # (The function itself is not patched — only the path constant.)
        # patch.object puts back whatever value the module really had on
        # exit, instead of a path this test would have to guess, and it
        # raises if the constant no longer exists — so a rename cannot turn
        # this into a test that silently exercises the real CSS file.
        missing_path = cp.Path('/definitely/does/not/exist.css')
        with mock.patch.object(cp, '_CSS_PATH_FOR_TOKEN', missing_path):
            # Should return an int and NOT raise.
            token = cp._compute_cache_bust_token()
        self.assertIsInstance(token, int)
# === REGRESSION: adjustment project-attribution double-count ===
# Ticket summary: commit 61c485f replaced a per-project loop using SQL
# OR (`Q(project=P) | Q(work_log__project=P)`) with two SEPARATE
# filter+GROUP BY queries that were summed in Python. Any adjustment
# with BOTH `project_id` AND `work_log.project_id` set matched both
# queries and got counted TWICE.
#
# Every Overtime adjustment fits that shape — `price_overtime()` in
# views.py sets both FKs to the same project. So every unpaid
# Overtime silently inflated the outstanding-costs dashboard by its
# own amount. This test locks in the "count once" behaviour.
class PayrollDashboardAdjustmentAggregationTests(TestCase):
    """Regression tests for the payroll dashboard's per-project adjustment
    aggregation: an adjustment that has both `project` and
    `work_log.project` set must add its amount to that project ONCE."""

    def setUp(self):
        # The dashboard is admin-only (is_admin accepts is_staff OR
        # is_superuser), so log in as a user with both flags.
        self.admin = User.objects.create_user(
            username='double-count-admin',
            password='pw',
            is_staff=True,
            is_superuser=True,
        )
        self.client.force_login(self.admin)

        # Minimal world: one active project, one worker, one work log.
        self.project = Project.objects.create(
            name='Double-Count Test Site',
            start_date=datetime.date(2026, 4, 1),
            active=True,
        )
        self.worker = Worker.objects.create(
            name='DC Worker',
            id_number='DC1',
            monthly_salary=Decimal('10000'),
        )
        # The log intentionally has NO workers attached: unpaid-log wage
        # cost must stay out of the figure so that only the adjustment
        # contributes to it.
        log_date = datetime.date(2026, 4, 23)
        self.work_log = WorkLog.objects.create(
            date=log_date,
            project=self.project,
            supervisor=self.admin,
        )

        # The shape under test: a single unpaid adjustment whose `project`
        # and `work_log` both lead to the same project — the way
        # price_overtime() builds every Overtime adjustment.
        PayrollAdjustment.objects.create(
            worker=self.worker,
            type='Overtime',  # additive, so it counts positively
            amount=Decimal('500'),
            project=self.project,
            work_log=self.work_log,
            date=log_date,
            description='Regression-test Overtime',
        )

    def test_overtime_with_both_project_and_work_log_counts_once(self):
        """An unpaid Overtime with BOTH project and work_log.project set
        must add R500 to outstanding-costs exactly once — not R1000.

        The earlier batched aggregates summed the direct-project group and
        the work_log-project group separately in Python, so such an
        adjustment landed in both. The Coalesce('project_id',
        'work_log__project_id') fix attributes each adjustment row to a
        single effective project.
        """
        response = self.client.get(reverse('payroll_dashboard'))
        self.assertEqual(response.status_code, 200)

        # Entries look like {'name': str, 'cost': Decimal} with no 'id'
        # key, so the test project has to be located by name.
        matching = [
            entry
            for entry in response.context['outstanding_project_costs']
            if entry['name'] == self.project.name
        ]
        ours = matching[0] if matching else None
        self.assertIsNotNone(
            ours,
            "Test project should appear in outstanding_project_costs "
            "(its unpaid Overtime is non-zero).",
        )
        self.assertEqual(
            ours['cost'],
            Decimal('500'),
            "Overtime adjustment with both project + work_log.project "
            "FKs set must count ONCE (R500), not twice (R1000). If "
            "this fails with R1000 the project-attribution double-"
            "count bug has reappeared.",
        )
# ============================================================================
# === SITE REPORT TESTS ===
# Cover the SiteReport model + form + views (edit, detail) + the
# attendance-log redirect that lands the supervisor on the new report
# form. The model uses a JSONField for `metrics`, so we exercise the
# round-trip (save form -> JSON in DB -> form re-loads from JSON).
# ============================================================================
from core.models import SiteReport
from core.forms import SiteReportForm
class SiteReportModelTests(TestCase):
    """Sanity checks on the SiteReport model: field defaults and the
    behaviour of the 1:1 reverse accessor on WorkLog."""

    def setUp(self):
        self.admin = User.objects.create_user(username='sr-admin', is_staff=True)
        self.project = Project.objects.create(name='SR Project')
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.admin,
        )

    def test_metrics_defaults_to_empty_dict(self):
        """Creating a SiteReport without metrics must leave an empty dict
        — never None — so callers can do .get('counts', {}) without an
        AttributeError."""
        fresh = SiteReport.objects.create(work_log=self.work_log)
        self.assertEqual(fresh.metrics, {})
        self.assertIsNotNone(fresh.metrics)

    def test_reverse_accessor_does_not_exist_when_absent(self):
        """Reading `work_log.site_report` on a log without a report raises
        DoesNotExist (1:1 reverse semantics), so templates/views have to
        guard the access with try/except or hasattr()."""
        log = self.work_log
        with self.assertRaises(SiteReport.DoesNotExist):
            log.site_report  # noqa: B018 — the access itself is the test

    def test_reverse_accessor_works_when_present(self):
        """After a SiteReport exists, work_log.site_report resolves to it."""
        created = SiteReport.objects.create(
            work_log=self.work_log, weather='sunny',
        )
        self.assertEqual(self.work_log.site_report, created)
        self.assertEqual(self.work_log.site_report.weather, 'sunny')

    def test_metrics_round_trip_with_arbitrary_keys(self):
        """The JSONField stores arbitrary keys — no schema is enforced at
        the DB layer, so historic data survives later schema changes."""
        payload = {
            'counts': {'plinths_cast': 8, 'old_metric_no_longer_in_schema': 3},
            'checks': {'steel_tied': True},
        }
        stored = SiteReport.objects.create(
            work_log=self.work_log, metrics=payload,
        )
        stored.refresh_from_db()
        counts = stored.metrics['counts']
        self.assertEqual(counts['plinths_cast'], 8)
        self.assertEqual(counts['old_metric_no_longer_in_schema'], 3)
        self.assertTrue(stored.metrics['checks']['steel_tied'])
class SiteReportFormTests(TestCase):
    """SiteReportForm: schema-driven dynamic fields and how they are
    serialised into / pre-filled from the `metrics` JSON blob."""

    def setUp(self):
        staff_user = User.objects.create_user(username='srf-admin', is_staff=True)
        project = Project.objects.create(name='SRF Project')
        self.admin = staff_user
        self.project = project
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=project,
            supervisor=staff_user,
        )

    def test_form_dynamically_includes_count_and_check_fields(self):
        """Every metric listed in the schema module must show up as a
        `count_<key>` or `check_<key>` field on a freshly built form."""
        from core.site_report_schema import COUNT_METRICS, CHECK_METRICS
        form = SiteReportForm(work_log=self.work_log)
        expected_names = [f"count_{metric['key']}" for metric in COUNT_METRICS]
        expected_names += [f"check_{metric['key']}" for metric in CHECK_METRICS]
        for field_name in expected_names:
            self.assertIn(field_name, form.fields)

    def test_form_save_serialises_metrics_into_json_blob(self):
        """Saving the form produces a metrics dict with top-level
        'counts' and 'checks' keys. Count fields left out of the POST
        come back as 0 and un-posted checkboxes come back as False."""
        posted = {
            'weather': 'sunny',
            'temperature_min': 18,
            'temperature_max': 28,
            'notes': 'Hot day, good progress',
            'count_plinths_cast': 8,
            'count_plinth_holes_dug': 12,
            # remaining count fields are deliberately omitted
            'check_steel_tied': 'on',  # a ticked checkbox posts 'on'
            # check_boxes_cleaned is deliberately omitted
        }
        form = SiteReportForm(data=posted, work_log=self.work_log)
        self.assertTrue(form.is_valid(), msg=form.errors)
        saved = form.save()

        # Plain model fields.
        self.assertEqual(saved.weather, 'sunny')
        self.assertEqual(saved.temperature_min, 18)
        self.assertEqual(saved.temperature_max, 28)

        # JSON blob structure.
        metrics = saved.metrics
        self.assertIn('counts', metrics)
        self.assertIn('checks', metrics)
        self.assertEqual(metrics['counts']['plinths_cast'], 8)
        self.assertEqual(metrics['counts']['plinth_holes_dug'], 12)
        # An omitted count is stored as 0 rather than being left out.
        self.assertEqual(metrics['counts']['boxes_placed'], 0)
        self.assertTrue(metrics['checks']['steel_tied'])
        self.assertFalse(metrics['checks']['boxes_cleaned'])

    def test_form_validates_temp_min_max_relationship(self):
        """A minimum temperature above the maximum must make the form
        invalid, reported as a form-level (non-field) error."""
        posted = {
            'weather': '',
            'temperature_min': 35,
            'temperature_max': 18,
            'notes': '',
        }
        form = SiteReportForm(data=posted, work_log=self.work_log)
        self.assertFalse(form.is_valid())
        # The message is expected among the non-field errors, not on a field.
        matching = [err for err in form.non_field_errors() if 'Min temperature' in err]
        self.assertTrue(matching)

    def test_form_pre_fills_from_existing_metrics(self):
        """Binding the form to an existing SiteReport seeds the dynamic
        fields' `initial` values from the stored metrics."""
        stored_metrics = {'counts': {'plinths_cast': 5}, 'checks': {'town_run': True}}
        existing = SiteReport.objects.create(
            work_log=self.work_log,
            weather='cloudy',
            metrics=stored_metrics,
        )
        fields = SiteReportForm(instance=existing).fields
        self.assertEqual(fields['count_plinths_cast'].initial, 5)
        self.assertTrue(fields['check_town_run'].initial)
class SiteReportEditViewTests(TestCase):
    """`site_report_edit` view: who may reach it, what GET renders, and
    what POST creates or updates."""

    def setUp(self):
        # Three users: a staff admin, the supervisor attached to the
        # project under test, and a supervisor attached only to a
        # different project (used for the 403 case).
        self.admin = User.objects.create_user(username='sre-admin', password='pw', is_staff=True)
        self.supervisor = User.objects.create_user(username='sre-sup', password='pw')
        self.outsider_supervisor = User.objects.create_user(username='sre-out', password='pw')

        self.project = Project.objects.create(name='SRE Project')
        self.project.supervisors.add(self.supervisor)

        self.other_project = Project.objects.create(name='Other Project')
        self.other_project.supervisors.add(self.outsider_supervisor)

        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.supervisor,
        )

    def _edit_url(self):
        """Return the edit URL for the work log created in setUp."""
        return reverse('site_report_edit', kwargs={'work_log_id': self.work_log.id})

    def test_admin_get_returns_blank_form_for_new_report(self):
        """Admin GET with no existing report renders the create variant."""
        self.client.force_login(self.admin)
        response = self.client.get(self._edit_url())
        self.assertEqual(response.status_code, 200)
        # 'Log Today' is the heading text checked for on the create variant.
        self.assertContains(response, 'Log Today')

    def test_admin_post_creates_site_report(self):
        """A valid admin POST creates one SiteReport for the work log,
        stamped with the admin as `created_by`."""
        self.client.force_login(self.admin)
        form_data = {
            'weather': 'rain',
            'temperature_min': '15',
            'temperature_max': '22',
            'notes': 'Wet day, stopped early',
            'count_plinths_cast': '4',
            'check_rain_delay': 'on',
        }
        response = self.client.post(self._edit_url(), form_data)
        # Success is signalled by a redirect.
        self.assertEqual(response.status_code, 302)
        self.assertEqual(SiteReport.objects.count(), 1)

        created = SiteReport.objects.first()
        self.assertEqual(created.work_log_id, self.work_log.id)
        self.assertEqual(created.weather, 'rain')
        self.assertEqual(created.created_by_id, self.admin.id)
        self.assertEqual(created.metrics['counts']['plinths_cast'], 4)
        self.assertTrue(created.metrics['checks']['rain_delay'])

    def test_admin_post_updates_existing_report_without_changing_created_by(self):
        """Editing a report written by someone else updates its content
        but leaves `created_by` pointing at the original author."""
        # The supervisor authored the report first.
        SiteReport.objects.create(
            work_log=self.work_log,
            weather='sunny',
            created_by=self.supervisor,
        )
        # The admin then edits it.
        self.client.force_login(self.admin)
        form_data = {
            'weather': 'cloudy',
            'temperature_min': '',
            'temperature_max': '',
            'notes': 'Admin edited',
        }
        response = self.client.post(self._edit_url(), form_data)
        self.assertEqual(response.status_code, 302)

        updated = SiteReport.objects.get(work_log=self.work_log)
        self.assertEqual(updated.weather, 'cloudy')
        # Authorship must be unchanged by the edit.
        self.assertEqual(updated.created_by_id, self.supervisor.id)

    def test_project_supervisor_can_edit_their_own_projects_report(self):
        """The supervisor attached to the project can open the edit page."""
        self.client.force_login(self.supervisor)
        response = self.client.get(self._edit_url())
        self.assertEqual(response.status_code, 200)

    def test_outsider_supervisor_is_forbidden(self):
        """A supervisor attached only to another project gets a 403 on
        this project's site-report edit page."""
        self.client.force_login(self.outsider_supervisor)
        response = self.client.get(self._edit_url())
        self.assertEqual(response.status_code, 403)
class SiteReportDetailViewTests(TestCase):
    """The read-only `site_report_detail` page."""

    def setUp(self):
        self.admin = User.objects.create_user(username='srd-admin', password='pw', is_staff=True)
        self.project = Project.objects.create(name='SRD Project')
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.admin,
        )

    def _detail_url(self):
        """Return the detail URL for the work log created in setUp."""
        return reverse('site_report_detail', kwargs={'work_log_id': self.work_log.id})

    def test_404_when_no_report_exists(self):
        """The detail page is for VIEWING reports — if there isn't one,
        404 (the supervisor should use the edit URL to create one)."""
        self.client.force_login(self.admin)
        response = self.client.get(self._detail_url())
        self.assertEqual(response.status_code, 404)

    def test_displays_report_data_when_present(self):
        """Notes, weather label and a count metric all reach the HTML."""
        # Use a distinctive multi-digit count. Asserting on a single digit
        # such as '7' would be satisfied by nearly any rendered page, so it
        # could not detect the count going missing from the template.
        distinctive_count = 743
        SiteReport.objects.create(
            work_log=self.work_log,
            weather='sunny',
            temperature_min=18,
            temperature_max=32,
            notes='Brief but visible note',
            metrics={
                'counts': {'plinths_cast': distinctive_count},
                'checks': {'steel_tied': True},
            },
        )
        self.client.force_login(self.admin)
        response = self.client.get(self._detail_url())
        self.assertEqual(response.status_code, 200)
        # Spot-check that the values reach the rendered HTML
        self.assertContains(response, 'Brief but visible note')
        self.assertContains(response, 'Sunny')
        # The count appears as a numeric in the count card
        self.assertContains(response, str(distinctive_count))
class AttendanceLogRedirectsToSiteReportTests(TestCase):
    """Two-step flow: a successful attendance submission sends the user
    on to the site-report edit page instead of back home."""

    def setUp(self):
        self.admin = User.objects.create_user(username='atrr-admin', password='pw', is_staff=True)
        self.project = Project.objects.create(name='AT Project')
        self.worker = Worker.objects.create(
            name='Test Worker',
            id_number='AT1',
            monthly_salary=Decimal('10000'),
        )

    def test_successful_attendance_post_redirects_to_site_report_edit(self):
        """Posting valid attendance redirects to a URL under
        '/site-report/' ending in '/edit/' and creates one WorkLog."""
        self.client.force_login(self.admin)
        attendance = {
            'date': '2026-04-27',
            'project': self.project.id,
            'workers': [self.worker.id],
            'overtime_amount': '0.00',
            'notes': '',
        }
        response = self.client.post(reverse('attendance_log'), attendance)
        self.assertEqual(response.status_code, 302)

        # Only read the redirect target once we know this is a redirect.
        target = response.url
        self.assertIn('/site-report/', target)
        self.assertTrue(
            target.endswith('/edit/'),
            msg=f"Expected redirect to .../edit/, got {response.url}",
        )
        # Exactly one work log results from the submission.
        self.assertEqual(WorkLog.objects.count(), 1)
# ====================================================================
# === Worker Absence — Phase 1: Model layer ==========================
# ====================================================================
from datetime import date as _date
from django.db import IntegrityError
class AbsenceModelTests(TestCase):
    """Model-level behaviour of Absence: defaults, the per-worker-per-day
    uniqueness rule, default ordering and the reverse accessor."""

    @classmethod
    def setUpTestData(cls):
        cls.worker = Worker.objects.create(
            name='Joe Mokoena',
            id_number='8001015800086',
            monthly_salary=Decimal('6000.00'),
        )

    def test_defaults(self):
        """A bare Absence is unpaid, has no linked adjustment and is
        dated today."""
        absence = Absence.objects.create(worker=self.worker, reason='sick')
        # Reload so `date` holds the value as stored by the database
        # rather than whatever the in-memory default produced.
        absence.refresh_from_db()
        self.assertFalse(absence.is_paid)
        self.assertIsNone(absence.payroll_adjustment)
        self.assertEqual(absence.date, _date.today())

    def test_unique_per_worker_per_day(self):
        """A second absence for the same worker and date is rejected
        with IntegrityError."""
        same_day = _date(2026, 5, 14)
        Absence.objects.create(worker=self.worker, date=same_day, reason='sick')
        with self.assertRaises(IntegrityError):
            Absence.objects.create(worker=self.worker, date=same_day, reason='family')

    def test_ordering_newest_first(self):
        """The default queryset returns absences newest date first."""
        # Inserted out of order on purpose.
        for day, reason in ((10, 'sick'), (15, 'annual'), (12, 'other')):
            Absence.objects.create(worker=self.worker, date=_date(2026, 5, day), reason=reason)
        stored_dates = list(Absence.objects.values_list('date', flat=True))
        expected = [_date(2026, 5, 15), _date(2026, 5, 12), _date(2026, 5, 10)]
        self.assertEqual(stored_dates, expected)

    def test_reverse_accessor_on_worker(self):
        """`worker.absences` exposes the worker's absence rows."""
        for day, reason in ((1, 'sick'), (2, 'family')):
            Absence.objects.create(worker=self.worker, date=_date(2026, 5, day), reason=reason)
        self.assertEqual(self.worker.absences.count(), 2)
# =============================================================================
# === ABSENCE HELPER TESTS (Task 2) ===
# Tests for the two shared helpers in core/views.py that every Absence
# save/edit/delete path will route through:
# - _sync_absence_payroll_adjustment: keeps is_paid in sync with the
# linked Bonus PayrollAdjustment (create / delete / refuse-if-paid).
# - _absence_user_queryset: scopes visibility by admin vs supervisor.
# =============================================================================
class AbsencePayrollSyncTests(TestCase):
    """`_sync_absence_payroll_adjustment` must keep an absence's linked
    PayrollAdjustment consistent with its `is_paid` flag: create one when
    paid, remove it when unpaid, leave an existing one alone, and refuse
    to remove one that is already attached to a PayrollRecord."""

    @classmethod
    def setUpTestData(cls):
        cls.user = User.objects.create_user(username='admin', is_staff=True)
        cls.worker = Worker.objects.create(
            name='Sipho Dlamini',
            id_number='8501015800087',
            monthly_salary=Decimal('6000.00'),
        )
        cls.project = Project.objects.create(name='Solar Farm Alpha')

    @staticmethod
    def _sync(absence):
        """Run the helper under test (imported lazily, at call time)."""
        from core.views import _sync_absence_payroll_adjustment
        return _sync_absence_payroll_adjustment(absence)

    def _make_absence(self, **kw):
        """Create an Absence for the shared worker; `kw` overrides the
        default field values."""
        fields = {
            'worker': self.worker,
            'date': _date(2026, 5, 14),
            'reason': 'sick',
            'logged_by': self.user,
            'is_paid': False,
        }
        fields.update(kw)
        return Absence.objects.create(**fields)

    def test_is_paid_false_no_adjustment(self):
        """Syncing an unpaid absence creates nothing."""
        absence = self._make_absence(is_paid=False)
        self._sync(absence)
        self.assertIsNone(absence.payroll_adjustment)
        self.assertEqual(PayrollAdjustment.objects.count(), 0)

    def test_is_paid_true_creates_bonus(self):
        """Syncing a paid absence links a 'Bonus' adjustment for the
        same worker and date, valued at one day's pay."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        absence.refresh_from_db()
        self.assertIsNotNone(absence.payroll_adjustment)

        adjustment = absence.payroll_adjustment
        self.assertEqual(adjustment.type, 'Bonus')
        # Expected amount: 6000 monthly salary / 20 = 300 per day.
        self.assertEqual(adjustment.amount, Decimal('300.00'))
        self.assertEqual(adjustment.worker, self.worker)
        self.assertEqual(adjustment.date, _date(2026, 5, 14))

    def test_toggle_paid_off_deletes_adjustment(self):
        """Flipping is_paid from True to False removes the adjustment."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        self.assertEqual(PayrollAdjustment.objects.count(), 1)

        absence.is_paid = False
        absence.save()
        self._sync(absence)
        absence.refresh_from_db()

        self.assertIsNone(absence.payroll_adjustment)
        self.assertEqual(PayrollAdjustment.objects.count(), 0)

    def test_toggle_paid_on_creates_fresh(self):
        """Flipping is_paid from False to True creates an adjustment."""
        absence = self._make_absence(is_paid=False)
        self._sync(absence)
        self.assertIsNone(absence.payroll_adjustment)

        absence.is_paid = True
        absence.save()
        self._sync(absence)
        absence.refresh_from_db()

        self.assertIsNotNone(absence.payroll_adjustment)

    def test_paid_with_existing_adj_is_idempotent(self):
        """Re-syncing a paid absence that already has an adjustment keeps
        that same row, including a manually edited amount."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        absence.refresh_from_db()
        first_adjustment = absence.payroll_adjustment
        self.assertIsNotNone(first_adjustment)

        # Simulate an admin overriding the amount (e.g. half-day pay).
        first_adjustment.amount = Decimal('150.00')
        first_adjustment.save()

        # A second sync must neither replace the row nor reset the amount.
        self._sync(absence)
        absence.refresh_from_db()
        self.assertEqual(absence.payroll_adjustment_id, first_adjustment.id)
        self.assertEqual(absence.payroll_adjustment.amount, Decimal('150.00'))
        self.assertEqual(PayrollAdjustment.objects.count(), 1)

    def test_refuses_if_adjustment_already_paid(self):
        """Once the linked adjustment is attached to a PayrollRecord,
        un-paying the absence makes the helper raise ValueError and the
        adjustment row is left in place."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        absence.refresh_from_db()

        # Mark the adjustment as paid out by attaching a PayrollRecord.
        record = PayrollRecord.objects.create(
            worker=self.worker,
            amount_paid=Decimal('300.00'),
            date=_date(2026, 5, 30),
        )
        adjustment = absence.payroll_adjustment
        adjustment.payroll_record = record
        adjustment.save()

        absence.is_paid = False
        absence.save()
        with self.assertRaises(ValueError):
            self._sync(absence)
        # The paid adjustment must survive the refused sync.
        self.assertEqual(PayrollAdjustment.objects.count(), 1)
class AbsenceUserQuerysetTests(TestCase):
    """`_absence_user_queryset` visibility: a staff user sees every
    absence, a team supervisor sees their own team's, anyone else none."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', is_staff=True)
        cls.sup_a = User.objects.create_user(username='sup_a')
        cls.sup_b = User.objects.create_user(username='sup_b')
        cls.outsider = User.objects.create_user(username='out')

        cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
        cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))

        # Each supervisor gets a team holding exactly one worker.
        cls.team_a = Team.objects.create(name='TA', supervisor=cls.sup_a)
        cls.team_a.workers.add(cls.worker_a)
        cls.team_b = Team.objects.create(name='TB', supervisor=cls.sup_b)
        cls.team_b.workers.add(cls.worker_b)

        # One absence per worker, both on the same day.
        shared_day = _date(2026, 5, 1)
        Absence.objects.create(worker=cls.worker_a, date=shared_day, reason='sick')
        Absence.objects.create(worker=cls.worker_b, date=shared_day, reason='annual')

    @staticmethod
    def _visible_to(user):
        """Return the queryset the helper under test yields for `user`."""
        from core.views import _absence_user_queryset
        return _absence_user_queryset(user)

    def test_admin_sees_all(self):
        """The staff user sees both absences."""
        visible = self._visible_to(self.admin)
        self.assertEqual(visible.count(), 2)

    def test_supervisor_sees_only_own_team(self):
        """Supervisor A sees only the absence of the worker on team A."""
        visible = self._visible_to(self.sup_a)
        self.assertEqual(visible.count(), 1)
        self.assertEqual(visible.first().worker, self.worker_a)

    def test_outsider_sees_none(self):
        """A user who is neither staff nor a team supervisor sees nothing."""
        visible = self._visible_to(self.outsider)
        self.assertEqual(visible.count(), 0)
# =============================================================================
# === ABSENCE FORM TESTS (Task 3) ===
# Tests for the form classes added in core/forms.py:
# - AbsenceLogForm: standalone /absences/log/ with date-range + multi-worker
# - AbsenceEditForm: edit one existing absence
# =============================================================================
class AbsenceFormTests(TestCase):
    """AbsenceLogForm (date-range + multi-worker logging) and
    AbsenceEditForm (editing one existing absence).

    Every "form should be valid" assertion passes `msg=form.errors` so a
    failure reports the actual validation errors instead of a bare
    "False is not true".
    """

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', is_staff=True)
        cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
        cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))
        cls.project = Project.objects.create(name='P1')
        cls.team = Team.objects.create(name='T', supervisor=cls.admin)
        cls.team.workers.add(cls.worker_a, cls.worker_b)

    def _log_form(self, data):
        """Build an AbsenceLogForm bound to `data`, acting as the admin."""
        from core.forms import AbsenceLogForm
        return AbsenceLogForm(data=data, user=self.admin)

    def test_single_date_submission(self):
        """One worker + one date expands to exactly one (worker, date) pair."""
        form = self._log_form({
            'date': '2026-05-14',
            'reason': 'sick',
            'is_paid': False,
            'notes': 'flu',
            'team': self.team.id,
            'workers': [self.worker_a.id],
        })
        self.assertTrue(form.is_valid(), msg=form.errors)
        # cleaned data has expanded (worker, date) tuples
        tuples = form.expanded_pairs()
        self.assertEqual(len(tuples), 1)
        self.assertEqual(tuples[0], (self.worker_a, _date(2026, 5, 14)))

    def test_range_expansion_skips_weekends_by_default(self):
        """Without the weekend toggles a Thu → Mon range yields only the
        three weekdays."""
        # Thursday → Monday (5 days, Sat+Sun skipped → 3 days)
        form = self._log_form({
            'date': '2026-05-14',      # Thursday
            'end_date': '2026-05-18',  # Monday
            'reason': 'annual',
            'workers': [self.worker_a.id],
        })
        self.assertTrue(form.is_valid(), msg=form.errors)
        dates = [d for _w, d in form.expanded_pairs()]
        # 14 Thu, 15 Fri, 18 Mon (16 Sat / 17 Sun skipped)
        self.assertEqual(dates, [_date(2026, 5, 14), _date(2026, 5, 15), _date(2026, 5, 18)])

    def test_range_expansion_with_weekend_toggles(self):
        """With both weekend toggles on, a Thu → Sun range yields all
        four days."""
        form = self._log_form({
            'date': '2026-05-14',
            'end_date': '2026-05-17',  # Thursday → Sunday
            'reason': 'sick',
            'include_saturday': True,
            'include_sunday': True,
            'workers': [self.worker_a.id],
        })
        self.assertTrue(form.is_valid(), msg=form.errors)
        dates = [d for _w, d in form.expanded_pairs()]
        # Check WHICH days came back, not just how many: a count of 4
        # alone would also pass if a wrong or repeated date were returned.
        # Order is deliberately not asserted here.
        self.assertCountEqual(
            dates,
            [_date(2026, 5, 14), _date(2026, 5, 15), _date(2026, 5, 16), _date(2026, 5, 17)],
        )

    def test_end_date_before_start_rejected(self):
        """An end date earlier than the start date is an error on the
        'end_date' field."""
        form = self._log_form({
            'date': '2026-05-15',
            'end_date': '2026-05-10',
            'reason': 'sick',
            'workers': [self.worker_a.id],
        })
        self.assertFalse(form.is_valid())
        self.assertIn('end_date', form.errors)

    def test_duplicate_absence_rejected(self):
        """If absence(worker, date) already exists, form is invalid."""
        Absence.objects.create(worker=self.worker_a, date=_date(2026, 5, 14), reason='sick')
        form = self._log_form({
            'date': '2026-05-14',
            'reason': 'family',
            'workers': [self.worker_a.id],
        })
        self.assertFalse(form.is_valid())
        # The rendered errors are expected to mention "already".
        self.assertIn('already', str(form.errors).lower())

    def test_worklog_conflict_flagged_not_blocking(self):
        """If worker has a WorkLog on the absence date, form is still valid
        but conflicting_worklogs() returns the conflict rows."""
        wl = WorkLog.objects.create(
            date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
        )
        wl.workers.add(self.worker_a)
        form = self._log_form({
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker_a.id],
        })
        # Conflicts are warnings, not errors
        self.assertTrue(form.is_valid(), msg=form.errors)
        conflicts = form.conflicting_worklogs()
        self.assertEqual(len(conflicts), 1)
        self.assertEqual(conflicts[0]['worker_id'], self.worker_a.id)
        self.assertEqual(conflicts[0]['work_log_id'], wl.id)

    def test_edit_form_supervisor_scope(self):
        """AbsenceEditForm with user=supervisor only shows workers in
        supervisor's active teams. Other-team workers are not in the
        dropdown queryset."""
        from core.forms import AbsenceEditForm
        # Create a supervisor with their own team
        sup = User.objects.create_user(username='sup_edit', password='pw')
        sup_team = Team.objects.create(name='SupTeam', supervisor=sup)
        sup_worker = Worker.objects.create(
            name='Sup Worker', id_number='99', monthly_salary=Decimal('6000'),
        )
        sup_team.workers.add(sup_worker)
        # worker_b is on a team supervised by self.admin, not by sup
        own_absence = Absence.objects.create(worker=sup_worker, date=_date(2026, 6, 1), reason='sick')
        form = AbsenceEditForm(instance=own_absence, user=sup)
        worker_ids = list(form.fields['worker'].queryset.values_list('id', flat=True))
        self.assertIn(sup_worker.id, worker_ids)
        self.assertNotIn(self.worker_b.id, worker_ids)

    def test_edit_form_uniqueness_uses_date_error(self):
        """AbsenceEditForm.clean() adds the uniqueness error to the 'date'
        field (not raise), so it renders next to the field."""
        from core.forms import AbsenceEditForm
        # Existing absence on (worker_a, 2026-06-15); only the row matters,
        # so it is not bound to a local name.
        Absence.objects.create(worker=self.worker_a, date=_date(2026, 6, 15), reason='sick')
        # Try to edit a DIFFERENT absence so that it clashes with that row.
        editable = Absence.objects.create(worker=self.worker_a, date=_date(2026, 6, 20), reason='annual')
        form = AbsenceEditForm(
            instance=editable,
            data={
                'worker': self.worker_a.id,
                'date': '2026-06-15',  # clash
                'reason': 'family',
                'notes': '',
            },
        )
        self.assertFalse(form.is_valid())
        # Error must be on the 'date' field, not non-field
        self.assertIn('date', form.errors)
        self.assertIn('already', str(form.errors['date']).lower())
# === ABSENCE LOG + CONFIRM VIEW TESTS (Task 4) ===
# GET shows the form. POST without conflicts creates Absence rows
# immediately. POST with conflicts stashes pending data in the session
# and redirects to /absences/log/confirm/ where the admin can opt to
# also remove the conflicting WorkLog entries before committing.
class AbsenceLogViewTests(TestCase):
    """GET shows form; POST without conflicts creates absences immediately;
    POST with conflicts stashes pending data in session + redirects to
    /absences/log/confirm/."""

    # Path of the view under test.
    LOG_URL = '/absences/log/'

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        cls.project = Project.objects.create(name='P')
        cls.team = Team.objects.create(name='T', supervisor=cls.admin)
        cls.team.workers.add(cls.worker)

    def setUp(self):
        # Tests run as the admin unless they log in as someone else.
        self.client.force_login(self.admin)

    def test_get_returns_200(self):
        """The admin can load the log form."""
        resp = self.client.get(self.LOG_URL)
        self.assertEqual(resp.status_code, 200)

    def test_post_creates_absences_when_no_conflict(self):
        """A conflict-free POST creates the absence straight away and
        redirects to the absence list."""
        resp = self.client.post(self.LOG_URL, data={
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker.id],
            'notes': 'flu',
        })
        self.assertEqual(Absence.objects.count(), 1)
        absence = Absence.objects.first()
        self.assertEqual(absence.worker, self.worker)
        self.assertEqual(absence.reason, 'sick')
        self.assertFalse(absence.is_paid)
        self.assertEqual(absence.logged_by, self.admin)
        self.assertRedirects(resp, '/absences/', fetch_redirect_response=False)

    def test_post_with_paid_creates_adjustment(self):
        """Ticking `is_paid` yields a paid absence linked to a 'Bonus'
        adjustment."""
        self.client.post(self.LOG_URL, data={
            'date': '2026-05-14',
            'reason': 'sick',
            'is_paid': 'on',
            'workers': [self.worker.id],
        })
        # Verify the row exists before dereferencing it: if the POST was
        # rejected, `.first()` returns None and the test would otherwise
        # error with AttributeError instead of failing with a clear message.
        self.assertEqual(Absence.objects.count(), 1)
        absence = Absence.objects.first()
        self.assertTrue(absence.is_paid)
        self.assertIsNotNone(absence.payroll_adjustment)
        self.assertEqual(absence.payroll_adjustment.type, 'Bonus')

    def test_post_with_worklog_conflict_redirects_to_confirm(self):
        """When the worker is already on a WorkLog for that date nothing
        is created; the pending data is stashed in the session and the
        user is redirected to the confirm page."""
        wl = WorkLog.objects.create(date=_date(2026, 5, 14), project=self.project, supervisor=self.admin)
        wl.workers.add(self.worker)
        resp = self.client.post(self.LOG_URL, data={
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker.id],
        })
        self.assertEqual(Absence.objects.count(), 0)  # NOT created yet
        self.assertRedirects(resp, '/absences/log/confirm/', fetch_redirect_response=False)
        # Session has stashed pending data
        self.assertIn('absence_pending', self.client.session)

    def test_supervisor_can_post(self):
        """A non-staff supervisor can log an absence for a worker on
        their own team."""
        sup = User.objects.create_user(username='sup', password='pw')
        sup_team = Team.objects.create(name='ST', supervisor=sup)
        sup_team.workers.add(self.worker)
        self.client.force_login(sup)
        # The response itself is not inspected; only the created row matters.
        self.client.post(self.LOG_URL, data={
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker.id],
        })
        self.assertEqual(Absence.objects.count(), 1)

    def test_outsider_gets_403(self):
        """A user who is neither staff nor a team supervisor gets a 403."""
        outsider = User.objects.create_user(username='out', password='pw')
        self.client.force_login(outsider)
        resp = self.client.get(self.LOG_URL)
        self.assertEqual(resp.status_code, 403)
class AbsenceConfirmViewTests(TestCase):
    """The /absences/log/confirm/ step: it works from pending data held
    in the session and lets the user opt in to removing workers from the
    conflicting WorkLogs shown on the page."""

    # Path of the view under test.
    CONFIRM_URL = '/absences/log/confirm/'

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        cls.project = Project.objects.create(name='P')

    def setUp(self):
        self.client.force_login(self.admin)
        # Create the conflicting WorkLog, then stash pending data in the
        # session in the shape the confirm view is exercised with.
        self.wl = WorkLog.objects.create(
            date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
        )
        self.wl.workers.add(self.worker)

        conflict_row = {
            'worker_id': self.worker.id,
            'worker_name': 'W',
            'date': '2026-05-14',
            'work_log_id': self.wl.id,
            'project_name': 'P',
        }
        pending = {
            'pairs': [[self.worker.id, '2026-05-14']],
            'reason': 'sick',
            'is_paid': False,
            'notes': 'flu',
            'conflicts': [conflict_row],
        }
        session = self.client.session
        session['absence_pending'] = pending
        session.save()

    def test_get_without_session_redirects_back(self):
        """With nothing pending in the session, GET bounces to the log form."""
        # Remove the data stashed by setUp.
        session = self.client.session
        session.pop('absence_pending', None)
        session.save()
        resp = self.client.get(self.CONFIRM_URL)
        self.assertRedirects(resp, '/absences/log/', fetch_redirect_response=False)

    def test_get_with_session_shows_warning(self):
        """With pending data, GET renders a page containing the warning."""
        resp = self.client.get(self.CONFIRM_URL)
        self.assertEqual(resp.status_code, 200)
        self.assertContains(resp, 'already')  # part of the warning text

    def test_post_creates_absences_and_removes_from_worklog(self):
        """Ticking the removal box creates the absence, takes the worker
        off the WorkLog, redirects to the list and clears the session."""
        removal_key = f'remove_from_worklog_{self.wl.id}_{self.worker.id}'
        resp = self.client.post(self.CONFIRM_URL, data={removal_key: 'on'})
        self.assertEqual(Absence.objects.count(), 1)
        self.assertNotIn(self.worker, self.wl.workers.all())
        self.assertRedirects(resp, '/absences/', fetch_redirect_response=False)
        self.assertNotIn('absence_pending', self.client.session)

    def test_post_without_removal_still_creates_absences(self):
        """Leaving the box unticked still creates the absence and leaves
        the worker on the WorkLog."""
        self.client.post(self.CONFIRM_URL, data={})
        self.assertEqual(Absence.objects.count(), 1)
        # No removal was requested, so the worker stays on the log.
        self.assertIn(self.worker, self.wl.workers.all())

    def test_post_drops_unauthorized_removal_keys(self):
        """SECURITY: a removal key naming a (WorkLog, worker) pair that
        is not in the stashed conflicts must be ignored, while the
        pending absence is still created."""
        # An unrelated WorkLog with its own worker — not among the
        # stashed conflicts, which hold only (self.wl.id, self.worker.id).
        other_worker = Worker.objects.create(
            name='Other Worker', id_number='99', monthly_salary=Decimal('6000'),
        )
        other_wl = WorkLog.objects.create(
            date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
        )
        other_wl.workers.add(other_worker)

        forged_key = f'remove_from_worklog_{other_wl.id}_{other_worker.id}'
        resp = self.client.post(self.CONFIRM_URL, data={forged_key: 'on'})

        # The forged removal must not have taken effect.
        self.assertIn(other_worker, other_wl.workers.all())
        # The legitimate absence is still created.
        self.assertEqual(Absence.objects.count(), 1)
        self.assertRedirects(resp, '/absences/', fetch_redirect_response=False)

    def test_post_with_malformed_removal_key_is_dropped(self):
        """Removal keys that cannot be parsed must not crash the view;
        the absence is still created and the response is a redirect."""
        malformed = {
            'remove_from_worklog_abc_5': 'on',       # non-numeric wl_id
            'remove_from_worklog_42_xyz': 'on',      # non-numeric worker_id
            'remove_from_worklog_only_three': 'on',  # not enough parts
        }
        resp = self.client.post(self.CONFIRM_URL, data=malformed)
        self.assertEqual(Absence.objects.count(), 1)  # absence still created
        self.assertEqual(resp.status_code, 302)       # a redirect, not a 500

    def test_confirm_page_renders_new_copy(self):
        """Round D-followup — the confirm page carries the
        action-oriented copy, a Cancel button and a Save button."""
        resp = self.client.get(self.CONFIRM_URL)
        self.assertEqual(resp.status_code, 200)
        # Banner copy
        self.assertContains(resp, "were already logged")
        # Cancel button (was 'Back to form')
        self.assertContains(resp, "Cancel")
        # Save button (was 'Confirm &amp; Create Absences')
        self.assertContains(resp, "Save Absences")
        # "recommended" hint on the checkbox
        self.assertContains(resp, "recommended")
# === ABSENCE LIST / EDIT / DELETE / EXPORT VIEW TESTS ============================
# Covers Task 5 of the Worker Absences feature: browsing absences via /absences/
# with filters and pagination; editing a single absence and the paid-flag side
# effect; deleting absences with cascade rules; admin CSV export.
class AbsenceListViewTests(TestCase):
    """The /absences/ list page: per-user visibility, the reason filter
    (single and repeated) and tolerance of malformed date parameters."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.sup = User.objects.create_user(username='sup', password='pw')
        cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
        cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))
        # Only worker A is on the supervisor's team.
        cls.team_a = Team.objects.create(name='TA', supervisor=cls.sup)
        cls.team_a.workers.add(cls.worker_a)
        shared_day = _date(2026, 5, 1)
        Absence.objects.create(worker=cls.worker_a, date=shared_day, reason='sick')
        Absence.objects.create(worker=cls.worker_b, date=shared_day, reason='annual')

    def _get_as(self, user, path):
        """Log in as `user` and GET `path`, returning the response."""
        self.client.force_login(user)
        return self.client.get(path)

    def test_admin_sees_all(self):
        """The admin's list mentions both workers."""
        resp = self._get_as(self.admin, '/absences/')
        self.assertContains(resp, 'WA')
        self.assertContains(resp, 'WB')

    def test_supervisor_only_sees_own(self):
        """The supervisor's list mentions worker A but not worker B."""
        resp = self._get_as(self.sup, '/absences/')
        self.assertContains(resp, 'WA')
        self.assertNotContains(resp, 'WB')

    def test_filter_by_reason(self):
        """?reason=sick → only the sick absence's worker has a table row.

        The check targets the `<td>` cell rather than the bare name
        because WB may still legitimately appear in the worker filter
        dropdown, whose options are intentionally not narrowed by the
        active filter.
        """
        resp = self._get_as(self.admin, '/absences/?reason=sick')
        self.assertContains(resp, '<td>WA</td>', html=False)
        self.assertNotContains(resp, '<td>WB</td>', html=False)

    def test_filter_by_multiple_reasons(self):
        """?reason=sick&reason=annual → both workers have a table row.

        Regression test for Fix A2 (May 2026): repeated `reason`
        parameters must be combined (OR), not collapsed so that only one
        of them takes effect.
        """
        resp = self._get_as(self.admin, '/absences/?reason=sick&reason=annual')
        for cell in ('<td>WA</td>', '<td>WB</td>'):
            self.assertContains(resp, cell, html=False)

    def test_malformed_date_param_does_not_crash(self):
        """SECURITY: unparseable date_from / date_to values must not
        cause a server error; the page still returns 200."""
        resp = self._get_as(self.admin, '/absences/?date_from=not-a-date&date_to=also-bad')
        self.assertEqual(resp.status_code, 200)
class AbsenceEditDeleteTests(TestCase):
    """Edit / delete views for a single absence and their payroll side effects.

    Every test runs as the staff user against one shared sick-day
    absence. The recurring fixture steps (mark paid + sync, attach a
    payroll record, POST the edit form) live in private helpers so each
    test body only shows what is specific to it.
    """

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        cls.absence = Absence.objects.create(worker=cls.worker, date=_date(2026, 5, 14), reason='sick')

    def setUp(self):
        self.client.force_login(self.admin)

    # --- fixture helpers ---------------------------------------------------

    def _mark_absence_paid(self):
        """Flag the shared absence as paid, run the sync helper, return the adjustment id."""
        from core.views import _sync_absence_payroll_adjustment

        self.absence.is_paid = True
        self.absence.save()
        _sync_absence_payroll_adjustment(self.absence)
        self.absence.refresh_from_db()
        return self.absence.payroll_adjustment.id

    def _attach_payroll_record(self):
        """Link the absence's adjustment to a PayrollRecord, i.e. mark it as paid out."""
        record = PayrollRecord.objects.create(
            worker=self.worker, amount_paid=Decimal('300'), date=_date(2026, 5, 30),
        )
        adjustment = self.absence.payroll_adjustment
        adjustment.payroll_record = record
        adjustment.save()

    def _post_edit(self, **extra_fields):
        """POST the edit form for the shared absence.

        Without ``is_paid`` in *extra_fields* the form sees the checkbox
        as unchecked.
        """
        payload = {
            'worker': self.worker.id,
            'date': '2026-05-14',
            'reason': 'sick',
            'notes': '',
        }
        payload.update(extra_fields)
        return self.client.post(f'/absences/{self.absence.id}/edit/', data=payload)

    # --- tests -------------------------------------------------------------

    def test_edit_toggling_paid_creates_adjustment(self):
        """Ticking is_paid on the edit form links a payroll adjustment."""
        self._post_edit(is_paid='on')
        self.absence.refresh_from_db()
        self.assertTrue(self.absence.is_paid)
        self.assertIsNotNone(self.absence.payroll_adjustment)

    def test_edit_untoggling_paid_deletes_adjustment(self):
        """Unticking is_paid removes the not-yet-paid adjustment."""
        adj_id = self._mark_absence_paid()
        self._post_edit()  # is_paid not in POST -> unchecked
        self.assertFalse(PayrollAdjustment.objects.filter(id=adj_id).exists())

    def test_delete_cascade_unpaid_adjustment(self):
        """Deleting the absence also deletes its unpaid adjustment."""
        adj_id = self._mark_absence_paid()
        self.client.post(f'/absences/{self.absence.id}/delete/')
        self.assertFalse(Absence.objects.filter(id=self.absence.id).exists())
        self.assertFalse(PayrollAdjustment.objects.filter(id=adj_id).exists())

    def test_delete_refuses_when_adjustment_paid(self):
        """An absence whose adjustment is already on a payroll record is NOT deleted."""
        self._mark_absence_paid()
        self._attach_payroll_record()
        self.client.post(f'/absences/{self.absence.id}/delete/')
        self.assertTrue(Absence.objects.filter(id=self.absence.id).exists())  # NOT deleted

    def test_supervisor_cannot_edit_other_team_absence(self):
        """A non-staff user who does not supervise the worker's team gets a 404."""
        sup = User.objects.create_user(username='sup', password='pw')
        # sup doesn't supervise the team that worker is on
        self.client.force_login(sup)
        resp = self.client.get(f'/absences/{self.absence.id}/edit/')
        self.assertEqual(resp.status_code, 404)

    def test_edit_refuses_to_untoggle_paid_adjustment(self):
        """SECURITY/CORRECTNESS: untoggling is_paid on an already-paid
        absence must roll back atomically. The absence stays is_paid=True,
        the adjustment stays linked, and an error is surfaced."""
        adj_id = self._mark_absence_paid()
        # Simulate the adjustment having been paid out already.
        self._attach_payroll_record()
        # Admin tries to untick is_paid (field absent → form sees unchecked).
        self._post_edit()
        # Rollback verified:
        self.absence.refresh_from_db()
        self.assertTrue(self.absence.is_paid)
        self.assertEqual(self.absence.payroll_adjustment_id, adj_id)
        self.assertTrue(PayrollAdjustment.objects.filter(id=adj_id).exists())
class AbsenceExportCSVTests(TestCase):
    """CSV export at /absences/export/: staff get the file, supervisors get 403."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.sup = User.objects.create_user(username='sup', password='pw')
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        # A single sick-day row to look for in the exported file.
        Absence.objects.create(worker=cls.worker, date=_date(2026, 5, 14), reason='sick')

    def _export_as(self, user):
        """Log *user* in and request the export endpoint."""
        self.client.force_login(user)
        return self.client.get('/absences/export/')

    def test_admin_can_export(self):
        """Staff receive a text/csv response containing the worker and the reason label."""
        download = self._export_as(self.admin)
        self.assertEqual(download.status_code, 200)
        self.assertEqual(download['Content-Type'], 'text/csv')
        csv_bytes = download.content
        for fragment in (b'W,', b'Sick'):
            self.assertIn(fragment, csv_bytes)

    def test_supervisor_forbidden(self):
        """A non-staff user is refused with 403."""
        download = self._export_as(self.sup)
        self.assertEqual(download.status_code, 403)
# =============================================================================
# === ABSENCE PROJECT FK TESTS (Round B) ===
# Tests for migration 0015_absence_project — adds an optional Project FK
# to Absence so paid absences can be cost-attributed to the right project,
# and so admin can filter "which workers were absent on Solar Farm X?".
# =============================================================================
class AbsenceProjectTests(TestCase):
    """Round B — the optional Absence.project FK and its propagation to the Bonus."""

    @classmethod
    def setUpTestData(cls):
        # Staff user that submits the forms.
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        # Monthly salary 6000 → daily rate 6000 / 20 = 300 (asserted below).
        cls.worker = Worker.objects.create(
            name='W', id_number='1', monthly_salary=Decimal('6000'),
        )
        # The project absences get linked to.
        cls.project = Project.objects.create(name='Solar Farm Alpha')
        # Team membership is not strictly needed by these tests; it keeps
        # the fixture in line with the other absence test classes.
        cls.team = Team.objects.create(name='T', supervisor=cls.admin)
        cls.team.workers.add(cls.worker)

    def test_project_on_absence_model_nullable(self):
        """Absence.project is optional — None by default, assignable afterwards."""
        absence = Absence.objects.create(worker=self.worker, reason='sick')
        # Nothing linked unless we say so.
        self.assertIsNone(absence.project)

        # Assign a project and confirm it round-trips through the database.
        absence.project = self.project
        absence.save()
        absence.refresh_from_db()
        self.assertEqual(absence.project, self.project)

    def test_paid_absence_with_project_propagates_to_bonus(self):
        """When is_paid=True, the auto-Bonus inherits absence.project for cost attribution."""
        from core.views import _sync_absence_payroll_adjustment

        absence = Absence.objects.create(
            worker=self.worker, reason='sick', is_paid=True, project=self.project,
        )
        _sync_absence_payroll_adjustment(absence)
        absence.refresh_from_db()

        bonus = absence.payroll_adjustment
        # Same project as the absence ...
        self.assertEqual(bonus.project, self.project)
        # ... and the expected type and amount.
        self.assertEqual(bonus.type, 'Bonus')
        self.assertEqual(bonus.amount, Decimal('300.00'))

    def test_log_form_accepts_project(self):
        """POST /absences/log/ with a project ID creates the Absence with that FK set."""
        self.client.force_login(self.admin)
        form_data = {
            'date': '2026-05-14',
            'reason': 'sick',
            'project': self.project.id,
            'workers': [self.worker.id],
        }
        outcome = self.client.post('/absences/log/', data=form_data)
        # A successful save answers with a redirect.
        self.assertEqual(outcome.status_code, 302)
        # The created absence carries the chosen project.
        created = Absence.objects.first()
        self.assertIsNotNone(created)
        self.assertEqual(created.project, self.project)

    def test_edit_form_can_change_project(self):
        """Edit form can add or change the project on an existing absence."""
        self.client.force_login(self.admin)
        # Begin without any project link.
        absence = Absence.objects.create(worker=self.worker, date=_date(2026, 5, 14), reason='sick')
        self.assertIsNone(absence.project)

        # Submit the edit form with a project selected.
        form_data = {
            'worker': self.worker.id,
            'date': '2026-05-14',
            'project': self.project.id,
            'reason': 'sick',
            'notes': '',
        }
        outcome = self.client.post(f'/absences/{absence.id}/edit/', data=form_data)
        # A successful edit answers with a redirect.
        self.assertEqual(outcome.status_code, 302)
        absence.refresh_from_db()
        self.assertEqual(absence.project, self.project)

    def test_list_filter_by_project(self):
        """?project=X filters absences by Absence.project_id directly (not via work_logs)."""
        self.client.force_login(self.admin)
        other_project = Project.objects.create(name='Other')
        # Two absences on different days, each tied to a different project.
        fixtures = (
            (_date(2026, 5, 1), 'sick', self.project),
            (_date(2026, 5, 2), 'annual', other_project),
        )
        for day, why, linked_project in fixtures:
            Absence.objects.create(
                worker=self.worker, date=day, reason=why, project=linked_project,
            )

        # Inspect the 'page' object in the context instead of the HTML:
        # the badge CSS class names also sit in an inline <style> block,
        # so substring-matching on them would hit even with zero rows.
        listing = self.client.get(f'/absences/?project={self.project.id}')
        self.assertEqual(listing.status_code, 200)
        matched = list(listing.context['page'].object_list)
        # Exactly the sick absence, and it belongs to self.project.
        self.assertEqual(len(matched), 1)
        only_row = matched[0]
        self.assertEqual(only_row.reason, 'sick')
        self.assertEqual(only_row.project, self.project)

    def test_log_form_paid_absence_propagates_project_to_bonus(self):
        """Full chain: POST /absences/log/ with is_paid+project → Bonus has project.

        Regression guard: a refactor of _create_absences_atomic that
        forgets to thread `project` through the helper would silently
        drop the project link on the auto-created adjustment. This test
        exercises the whole form → view → helper → adjustment chain.
        """
        self.client.force_login(self.admin)
        form_data = {
            'date': '2026-05-14',
            'reason': 'sick',
            'is_paid': 'on',
            'project': self.project.id,
            'workers': [self.worker.id],
        }
        self.client.post('/absences/log/', data=form_data)

        created = Absence.objects.first()
        self.assertIsNotNone(created)
        self.assertEqual(created.project, self.project)
        self.assertIsNotNone(created.payroll_adjustment)
        self.assertEqual(created.payroll_adjustment.project, self.project)
        self.assertEqual(created.payroll_adjustment.type, 'Bonus')
# ====================================================================
# === Worker Absence — Round C: Attendance → Absence shortcut =========
# Konrad's ask: after submitting attendance, give us a button that
# jumps straight to /absences/log/ pre-filled with the same date /
# team / project. The attendance form has two submit buttons
# differentiated by name=next_action value=log_only|log_absences.
# ====================================================================
class AbsenceAttendanceShortcutTests(TestCase):
    """Round C — the "Submit + Log Absences" button on /attendance/log/.

    Posting attendance with next_action='log_absences' redirects to
    /absences/log/ carrying the date / team / project as query params.
    The default submit ('log_only', or no next_action at all) keeps the
    existing Site Report flow.
    """

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(
            username='roundc-admin', password='pw', is_staff=True,
        )
        cls.worker = Worker.objects.create(
            name='W', id_number='RC1', monthly_salary=Decimal('6000'),
        )
        cls.project = Project.objects.create(name='Solar Farm Alpha')
        cls.team = Team.objects.create(name='Team A', supervisor=cls.admin)
        cls.team.workers.add(cls.worker)

    def setUp(self):
        self.client.force_login(self.admin)

    def _attendance_form(self, **extra_fields):
        """Return the attendance POST payload shared by these tests, plus *extra_fields*."""
        payload = {
            'date': '2026-05-14',
            'project': self.project.id,
            'team': self.team.id,
            'workers': [self.worker.id],
            'overtime_amount': '0.00',
            'notes': '',
        }
        payload.update(extra_fields)
        return payload

    def test_default_attendance_submit_unchanged(self):
        """next_action absent or 'log_only' → existing Site Report redirect."""
        # next_action is left out on purpose: the view should fall
        # through to the existing Site Report behaviour.
        outcome = self.client.post(reverse('attendance_log'), data=self._attendance_form())
        self.assertEqual(outcome.status_code, 302)
        self.assertIn('/site-report/', outcome.url)

    def test_log_only_explicit_value_still_goes_to_site_report(self):
        """An explicit next_action='log_only' (the default button) keeps
        the existing behaviour — important for backwards compatibility."""
        outcome = self.client.post(
            reverse('attendance_log'),
            data=self._attendance_form(next_action='log_only'),
        )
        self.assertEqual(outcome.status_code, 302)
        self.assertIn('/site-report/', outcome.url)

    def test_log_absences_button_redirects_to_absence_log(self):
        """next_action='log_absences' → redirect to /absences/log/ with
        date / team / project query-string params pre-filled."""
        outcome = self.client.post(
            reverse('attendance_log'),
            data=self._attendance_form(next_action='log_absences'),
        )
        self.assertEqual(outcome.status_code, 302)
        target = outcome.url
        expected_parts = (
            '/absences/log/',
            'date=2026-05-14',
            f'team={self.team.id}',
            f'project={self.project.id}',
        )
        for part in expected_parts:
            self.assertIn(part, target)

    def test_absence_log_prefills_from_url_params(self):
        """GET /absences/log/?date=X&team=Y&project=Z → form is pre-populated.

        Only the rendered date value is checked — a cheap test of the
        most user-visible signal.
        """
        query = f'?date=2026-05-14&team={self.team.id}&project={self.project.id}'
        page = self.client.get(reverse('absence_log') + query)
        self.assertEqual(page.status_code, 200)
        # The date input must carry the pre-filled value.
        self.assertContains(page, 'value="2026-05-14"')

    def test_log_absences_intent_survives_conflict_resolution(self):
        """Round C — when conflicts force a re-render of the attendance form,
        the next_action='log_absences' value is preserved as a hidden field
        via the conflict-form's form.data.items loop. Resolving the conflict
        (e.g. Overwrite) then redirects to /absences/log/ as originally intended."""
        # An existing log for the same worker + date provokes the conflict.
        clashing_log = WorkLog.objects.create(
            date=datetime.date(2026, 5, 14),
            project=self.project,
            supervisor=self.admin,
        )
        clashing_log.workers.add(self.worker)

        # First POST: conflict detection re-renders (200) instead of redirecting.
        conflict_page = self.client.post(
            '/attendance/log/',
            data=self._attendance_form(next_action='log_absences'),
        )
        self.assertEqual(conflict_page.status_code, 200)
        # The intent has to be carried in the re-rendered form as a hidden input.
        self.assertContains(conflict_page, 'name="next_action"')
        self.assertContains(conflict_page, 'value="log_absences"')

        # Second POST: resolve via Overwrite with next_action carried through.
        resolved = self.client.post(
            '/attendance/log/',
            data=self._attendance_form(
                next_action='log_absences', conflict_action='overwrite',
            ),
        )
        # Now the redirect goes to /absences/log/ with the prefill params.
        self.assertEqual(resolved.status_code, 302)
        self.assertIn('/absences/log/', resolved.url)
        self.assertIn('date=2026-05-14', resolved.url)
# === ROUND D — Absences tab on /workers/<id>/ + dashboard alert card ===
# These tests cover the worker-detail Absences tab (YTD chips + recent-50
# table) and the conditional "X absent in last 7 days" stat card on the
# admin dashboard. Both pull from the same Absence queryset; the dashboard
# card only renders when count > 0.
class AbsenceWorkerDetailTests(TestCase):
    """Round D — Absences tab on /workers/<id>/ shows YTD totals + table."""

    @classmethod
    def setUpTestData(cls):
        from django.utils import timezone as _tz

        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        cls.project = Project.objects.create(name='P')
        cls.current_year = _tz.now().year
        # (year, month, day, reason): three absences this year plus one
        # from last year that must NOT count toward the YTD totals.
        fixtures = (
            (cls.current_year, 1, 5, 'sick'),
            (cls.current_year, 2, 10, 'sick'),
            (cls.current_year, 3, 1, 'annual'),
            (cls.current_year - 1, 5, 1, 'sick'),
        )
        for year, month, day, why in fixtures:
            Absence.objects.create(worker=cls.worker, date=_date(year, month, day), reason=why)

    def _worker_page(self):
        """Log in as the staff user and fetch the worker detail page."""
        self.client.force_login(self.admin)
        return self.client.get(f'/workers/{self.worker.id}/')

    def test_worker_detail_includes_ytd_totals(self):
        """YTD totals count only this year's absences; the table lists all four."""
        page = self._worker_page()
        self.assertEqual(page.status_code, 200)

        totals = page.context['absence_ytd_totals']
        # Two sick + one annual this year; last year's sick day is left out.
        self.assertEqual(totals.get('sick'), 2)
        self.assertEqual(totals.get('annual'), 1)

        # worker_absences is not year-filtered, so last year's row is
        # included there: all 4 absences show in the recent-50 table.
        listed_dates = [entry.date for entry in page.context['worker_absences']]
        self.assertEqual(len(listed_dates), 4)

    def test_worker_detail_renders_tab_and_chips(self):
        """The page renders the tab label and one YTD chip per reason."""
        page = self._worker_page()
        expected_text = (
            'Absences',         # tab label
            'Sick: 2',          # YTD chip
            'Annual Leave: 1',  # YTD chip
        )
        for text in expected_text:
            self.assertContains(page, text)
class DateWindowOffByOneTests(TestCase):
    """Regression for Finding 13: 'last N days' windows on the dashboard
    used to subtract N (not N-1), producing N+1 inclusive days. Now they
    span exactly N days."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(
            username='offbyone', password='pw', is_staff=True, is_superuser=True,
        )
        cls.worker = Worker.objects.create(
            name='W', id_number='OBO1', monthly_salary=Decimal('6000'),
        )

    def setUp(self):
        self.client.force_login(self.admin)

    def test_absences_last_7_days_window_is_exactly_7(self):
        """Today + 6 days prior should be the 7-day window. Day 7 prior
        (the 8th day inclusive) MUST be excluded."""
        from datetime import timedelta as _td
        from django.utils import timezone as _tz

        today = _tz.localdate()
        # Seven absences at 0..6 days back → every one of them counts.
        for days_back in range(7):
            why = 'other' if days_back % 2 else 'sick'
            Absence.objects.create(
                worker=self.worker, date=today - _td(days=days_back), reason=why,
            )
        # One more at 7 days back must NOT count (the window would span 8 days).
        Absence.objects.create(
            worker=self.worker, date=today - _td(days=7), reason='other',
        )

        dashboard = self.client.get('/')
        self.assertEqual(
            dashboard.context['absences_recent_count'], 7,
            '"Last 7 days" must span exactly 7 calendar days inclusive.',
        )

    def test_recent_payments_last_60_days_window_is_exactly_60(self):
        """Payroll-dashboard 'Paid (Last 60 Days)' must span exactly 60 days."""
        from datetime import timedelta as _td
        from django.utils import timezone as _tz

        today = _tz.localdate()
        payments = (
            (59, Decimal('100.00')),   # 59 days back → counts
            (60, Decimal('9999.00')),  # 60 days back → MUST NOT count (would span 61 days)
        )
        for days_back, amount in payments:
            PayrollRecord.objects.create(
                worker=self.worker, amount_paid=amount,
                date=today - _td(days=days_back),
            )

        history = self.client.get('/payroll/?status=history')
        self.assertEqual(
            history.context['recent_payments_total'], Decimal('100.00'),
            '"Last 60 days" must span exactly 60 calendar days inclusive.',
        )
class AbsenceDashboardCardTests(TestCase):
    """Round D — Admin dashboard shows 'X absent in last 7 days' alert card
    when count > 0; renders nothing when count is 0."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))

    def setUp(self):
        self.client.force_login(self.admin)

    def test_card_shows_when_recent_absences_exist(self):
        """Two absences inside the window are counted and the card is rendered."""
        from datetime import timedelta as _td
        from django.utils import timezone as _tz

        # Use the local calendar date, as DateWindowOffByOneTests does for
        # this same context value: now().date() is the UTC date and can
        # differ from the local date around midnight.
        today = _tz.localdate()
        Absence.objects.create(worker=self.worker, date=today, reason='sick')
        Absence.objects.create(worker=self.worker, date=today - _td(days=3), reason='annual')
        # Outside the 7-day window — should NOT count
        Absence.objects.create(worker=self.worker, date=today - _td(days=10), reason='other')

        resp = self.client.get('/')
        self.assertEqual(resp.context['absences_recent_count'], 2)
        self.assertContains(resp, 'absent in last 7 days')

    def test_card_hidden_when_zero(self):
        """With no absences at all the count is 0 and the card text is absent."""
        Absence.objects.all().delete()
        resp = self.client.get('/')
        self.assertEqual(resp.context['absences_recent_count'], 0)
        self.assertNotContains(resp, 'absent in last 7 days')
# =============================================================================
# === FINAL PRE-PUSH POLISH FIXES ===
# Three small fixes from the final code review of the Absences feature:
# 1) AbsenceAdmin.save_model() runs the sync helper.
# 2) _delete_adjustment_with_cascade clears Absence.is_paid when
# deleting a Bonus that was created from an Absence.
# 3) (template only — no test needed) Resources menu shows Absences
# to supervisors as well as staff.
# =============================================================================
class AbsenceAdminAndCascadeTests(TestCase):
    """Final-pre-push fixes: Django admin sync + bulk-delete cascade.

    Covers AbsenceAdmin.save_model() running the sync helper, and
    _delete_adjustment_with_cascade clearing Absence.is_paid.
    """

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(
            username='admin_polish', password='pw', is_staff=True, is_superuser=True,
        )
        cls.worker = Worker.objects.create(
            name='W', id_number='1', monthly_salary=Decimal('6000'),
        )

    def test_admin_save_model_syncs_adjustment(self):
        """Django admin edit toggling is_paid -> True creates the Bonus."""
        from django.contrib.admin.sites import AdminSite
        from django.contrib.messages.storage.fallback import FallbackStorage
        from django.http import HttpRequest
        from core.admin import AbsenceAdmin

        absence = Absence.objects.create(
            worker=self.worker, date=_date(2026, 6, 1),
            reason='sick', is_paid=False,
        )
        admin_cls = AbsenceAdmin(Absence, AdminSite())
        # Simulate admin form submission: flip is_paid to True.
        absence.is_paid = True

        req = HttpRequest()
        req.user = self.admin
        req.session = {}
        # A bare HttpRequest has no message storage because no middleware
        # ran. Attach one so any message emitted by save_model is accepted.
        # This replaces a blanket `except Exception: pass`, which would
        # also have hidden a genuine failure inside save_model.
        req._messages = FallbackStorage(req)

        admin_cls.save_model(req, absence, None, change=True)

        absence.refresh_from_db()
        self.assertTrue(absence.is_paid)
        self.assertIsNotNone(absence.payroll_adjustment)
        self.assertEqual(absence.payroll_adjustment.type, 'Bonus')

    def test_delete_adjustment_cascade_clears_absence_is_paid(self):
        """Deleting a Bonus via _delete_adjustment_with_cascade clears the
        is_paid flag on the linked Absence (preventing state drift)."""
        from core.views import (
            _sync_absence_payroll_adjustment,
            _delete_adjustment_with_cascade,
        )

        absence = Absence.objects.create(
            worker=self.worker, date=_date(2026, 6, 5),
            reason='sick', is_paid=True,
        )
        _sync_absence_payroll_adjustment(absence)
        absence.refresh_from_db()
        self.assertIsNotNone(absence.payroll_adjustment)
        adj = absence.payroll_adjustment

        # Delete the adjustment via the cascade helper — this is what
        # /payroll/?status=adjustments bulk-delete goes through.
        ok, reason = _delete_adjustment_with_cascade(adj)
        self.assertTrue(ok)
        self.assertIsNone(reason)

        absence.refresh_from_db()
        self.assertFalse(absence.is_paid)
        self.assertIsNone(absence.payroll_adjustment)
class WorkerListTeamFilterTests(TestCase):
    """The /workers/ page accepts ?team=<id> to narrow the list to that
    team's members, ?team=none for unassigned workers, and ?team= (empty)
    for the default 'All teams' view."""

    # Every worker name in the fixture, in the order the checks run.
    ALL_NAMES = ('Alice Alpha', 'Bob Bravo', 'Otto Orphan')

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(
            username='admin', password='pw', is_staff=True, is_superuser=True,
        )
        # Two teams, one member each, plus a worker who is on no team.
        cls.team_a = Team.objects.create(name='Alpha Team')
        cls.team_b = Team.objects.create(name='Bravo Team')
        cls.w_alpha = Worker.objects.create(
            name='Alice Alpha', id_number='AA1', monthly_salary=Decimal('6000'),
        )
        cls.w_bravo = Worker.objects.create(
            name='Bob Bravo', id_number='BB1', monthly_salary=Decimal('6000'),
        )
        cls.w_orphan = Worker.objects.create(
            name='Otto Orphan', id_number='OO1', monthly_salary=Decimal('6000'),
        )
        cls.team_a.workers.add(cls.w_alpha)
        cls.team_b.workers.add(cls.w_bravo)
        # w_orphan is deliberately left without a team.

    def setUp(self):
        self.client.force_login(self.admin)

    def _assert_listed(self, response, visible):
        """Assert the names in *visible* are rendered and all other fixture names are not."""
        for name in self.ALL_NAMES:
            check = self.assertContains if name in visible else self.assertNotContains
            check(response, name)

    def test_no_filter_shows_all_active(self):
        """Without a filter every worker is listed."""
        listing = self.client.get('/workers/')
        self._assert_listed(listing, visible=self.ALL_NAMES)

    def test_team_filter_narrows_to_team(self):
        """?team=<id> lists only that team's member."""
        listing = self.client.get(f'/workers/?team={self.team_a.id}')
        self._assert_listed(listing, visible=('Alice Alpha',))

    def test_team_filter_none_shows_only_unassigned(self):
        """?team=none lists only the worker without a team."""
        listing = self.client.get('/workers/?team=none')
        self._assert_listed(listing, visible=('Otto Orphan',))

    def test_team_filter_dropdown_lists_active_teams(self):
        """The Team filter dropdown should render the active teams as
        options, plus 'All teams' and 'No team assigned'."""
        listing = self.client.get('/workers/')
        for label in ('All teams', 'No team assigned', 'Alpha Team', 'Bravo Team'):
            self.assertContains(listing, label)
class WorkHistoryTeamFilterTests(TestCase):
    """The /history/ page accepts ?team=<id> to narrow to logs tagged
    with that team, ?team=none for logs with no team set, and empty
    for all logs (default)."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(
            username='admin', password='pw', is_staff=True, is_superuser=True,
        )
        cls.project = Project.objects.create(name='Solar Farm Alpha')
        cls.worker = Worker.objects.create(
            name='Alpha Worker', id_number='AW1', monthly_salary=Decimal('6000'),
        )
        cls.team_a = Team.objects.create(name='Alpha Crew', supervisor=cls.admin)
        cls.team_b = Team.objects.create(name='Bravo Crew', supervisor=cls.admin)
        cls.team_a.workers.add(cls.worker)
        cls.team_b.workers.add(cls.worker)

        def make_log(day, team):
            """Create a May-2026 work log for the shared worker, tagged with *team*."""
            log = WorkLog.objects.create(
                date=_date(2026, 5, day), project=cls.project,
                team=team, supervisor=cls.admin,
            )
            log.workers.add(cls.worker)
            return log

        cls.log_a = make_log(10, cls.team_a)
        cls.log_b = make_log(11, cls.team_b)
        # A log with no team — ad-hoc attendance.
        cls.log_none = make_log(12, None)

    def setUp(self):
        self.client.force_login(self.admin)

    def _log_ids(self, url):
        """GET *url* and return the ids of the logs in the response context."""
        page = self.client.get(url)
        return [entry.id for entry in page.context['logs']]

    def test_team_filter_narrows_to_team(self):
        """?team=<id> keeps only that team's log."""
        found = self._log_ids(f'/history/?team={self.team_a.id}')
        self.assertIn(self.log_a.id, found)
        self.assertNotIn(self.log_b.id, found)
        self.assertNotIn(self.log_none.id, found)

    def test_team_filter_none_shows_logs_with_no_team(self):
        """?team=none keeps only the log that has no team."""
        found = self._log_ids('/history/?team=none')
        self.assertNotIn(self.log_a.id, found)
        self.assertNotIn(self.log_b.id, found)
        self.assertIn(self.log_none.id, found)

    def test_team_filter_empty_shows_all(self):
        """Without a team filter all three logs are present."""
        found = self._log_ids('/history/')
        for log in (self.log_a, self.log_b, self.log_none):
            self.assertIn(log.id, found)

    def test_team_filter_propagates_to_filter_params(self):
        """selected_team round-trips correctly into filter_params for
        the List/Calendar toggle links."""
        team_id = self.team_a.id
        page = self.client.get(f'/history/?team={team_id}')
        self.assertIn(f'team={team_id}', page.context['filter_params'])
        self.assertEqual(page.context['selected_team'], str(team_id))
class DashboardPaidThisMonthTests(TestCase):
    """Regression: 'Paid This Month' on the admin dashboard must be the
    CURRENT CALENDAR MONTH only — not a rolling 60-day window. Previously
    this card showed the same value as the payroll dashboard's 'Paid (60D)'
    card, which was misleading when explaining the dashboard to a
    non-developer."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(
            username='admin', password='pw', is_staff=True, is_superuser=True,
        )
        cls.worker = Worker.objects.create(
            name='Pay W', id_number='PW1', monthly_salary=Decimal('6000'),
        )

    def setUp(self):
        self.client.force_login(self.admin)

    def _paid_this_month(self):
        """Fetch the dashboard and return its 'paid_this_month' context value."""
        return self.client.get('/').context['paid_this_month']

    def test_paid_this_month_excludes_last_month(self):
        """A payment dated 45 days ago must NOT count toward 'Paid This Month'.

        No month is longer than 31 days, so 45 days back always falls in
        an earlier calendar month while still being inside a rolling
        60-day window.
        """
        today = datetime.date.today()
        payments = (
            (today, Decimal('1000.00')),                                # in-month → counts
            (today - datetime.timedelta(days=45), Decimal('9999.00')),  # earlier month → excluded
        )
        for paid_on, amount in payments:
            PayrollRecord.objects.create(
                worker=self.worker, amount_paid=amount, date=paid_on,
            )
        self.assertEqual(self._paid_this_month(), Decimal('1000.00'))

    def test_paid_this_month_includes_first_of_month(self):
        """A payment dated 1st of the current month counts (boundary case)."""
        month_start = datetime.date.today().replace(day=1)
        PayrollRecord.objects.create(
            worker=self.worker, amount_paid=Decimal('500.00'),
            date=month_start,
        )
        self.assertEqual(self._paid_this_month(), Decimal('500.00'))

    def test_paid_this_month_zero_when_no_payments(self):
        """No payments in the current month → zero (not None)."""
        self.assertEqual(self._paid_this_month(), Decimal('0.00'))