/absences/ filtered list with pagination + reason badges; /absences/<id>/edit/ syncs adjustment on save; /absences/<id>/delete/ cascades unpaid adjustment, refuses if paid; /absences/export/ admin-only CSV. 10 tests.
2424 lines
108 KiB
Python
2424 lines
108 KiB
Python
# === TESTS FOR WORK LOG PAYROLL CROSS-LINK ===
# Covers the _build_work_log_payroll_context helper — the core logic that
# determines, for each worker on a log, whether they were paid for it.
|
||
|
||
import datetime
|
||
from decimal import Decimal
|
||
|
||
from django.conf import settings
|
||
from django.contrib.auth.models import User
|
||
from django.test import TestCase
|
||
from django.urls import reverse
|
||
|
||
from core.models import Project, Team, Worker, WorkLog, PayrollRecord, PayrollAdjustment, Loan, Absence
|
||
from core.views import _build_work_log_payroll_context, _build_report_context
|
||
|
||
|
||
class WorkLogPayrollContextTests(TestCase):
    """Tests for the helper that builds the payroll-status view of a work log."""

    def setUp(self):
        # Minimal scenario: 1 admin, 1 project, 1 team, 3 workers, 1 log.
        # Worker A has been paid for the log; Worker B is priced-not-paid;
        # Worker C is unpaid.
        self.admin = User.objects.create_user(username='admin', is_staff=True)

        self.project = Project.objects.create(name='Test Project')
        self.team = Team.objects.create(name='Team X', supervisor=self.admin)

        self.worker_a = Worker.objects.create(name='Alice', id_number='A1', monthly_salary=Decimal('4000'))
        self.worker_b = Worker.objects.create(name='Bob', id_number='B1', monthly_salary=Decimal('4000'))
        self.worker_c = Worker.objects.create(name='Carol', id_number='C1', monthly_salary=Decimal('4000'))

        self.log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 10),
            project=self.project,
            team=self.team,
            supervisor=self.admin,
        )
        self.log.workers.add(self.worker_a, self.worker_b, self.worker_c)

        # Worker A has a PayrollRecord linking them and this log — "Paid".
        self.record_a = PayrollRecord.objects.create(
            worker=self.worker_a,
            amount_paid=Decimal('200.00'),
            date=datetime.date(2026, 4, 15),
        )
        self.record_a.work_logs.add(self.log)

        # Worker B appears in priced_workers but has no PayrollRecord — "Priced, not paid".
        self.log.priced_workers.add(self.worker_b)

        # Worker C has neither — "Unpaid".

    def _row_for(self, worker):
        """Build the context for ``self.log`` and return the row for ``worker``.

        Fails with a readable assertion message (instead of a bare
        StopIteration error) when the helper produced no row for the worker.
        """
        ctx = _build_work_log_payroll_context(self.log)
        row = next(
            (r for r in ctx['worker_rows'] if r['worker'].id == worker.id),
            None,
        )
        self.assertIsNotNone(row, f'no worker row found for {worker.name}')
        return row

    def test_returns_log_and_worker_rows(self):
        ctx = _build_work_log_payroll_context(self.log)
        self.assertEqual(ctx['log'], self.log)
        self.assertEqual(len(ctx['worker_rows']), 3)

    def test_paid_worker_has_payslip_link(self):
        row = self._row_for(self.worker_a)
        self.assertEqual(row['status'], 'Paid')
        self.assertEqual(row['payroll_record'], self.record_a)
        self.assertGreater(row['earned'], 0)

    def test_priced_but_unpaid_worker(self):
        row = self._row_for(self.worker_b)
        self.assertEqual(row['status'], 'Priced, not paid')
        self.assertIsNone(row['payroll_record'])

    def test_totally_unpaid_worker(self):
        row = self._row_for(self.worker_c)
        self.assertEqual(row['status'], 'Unpaid')
        self.assertIsNone(row['payroll_record'])

    def test_totals(self):
        ctx = _build_work_log_payroll_context(self.log)
        # Paid = Alice's daily_rate (one record exists for this log+worker).
        self.assertEqual(ctx['total_paid'], self.worker_a.daily_rate)
        # Outstanding = Bob + Carol each at their daily_rate.
        expected = self.worker_b.daily_rate + self.worker_c.daily_rate
        self.assertEqual(ctx['total_outstanding'], expected)

    def test_adjustments_linked_to_log(self):
        adj = PayrollAdjustment.objects.create(
            worker=self.worker_a,
            project=self.project,
            type='Overtime',
            amount=Decimal('50.00'),
            date=datetime.date(2026, 4, 10),
            description='Extra hour',
            work_log=self.log,
        )
        ctx = _build_work_log_payroll_context(self.log)
        self.assertIn(adj, ctx['adjustments'])

    def test_pay_period_absent_if_no_schedule(self):
        ctx = _build_work_log_payroll_context(self.log)
        self.assertEqual(ctx['pay_period'], (None, None))

    def test_pay_period_present_when_schedule_configured(self):
        self.team.pay_frequency = 'weekly'
        self.team.pay_start_date = datetime.date(2026, 1, 5)  # A Monday
        self.team.save()
        ctx = _build_work_log_payroll_context(self.log)
        start, end = ctx['pay_period']
        self.assertIsNotNone(start)
        self.assertIsNotNone(end)
        # The computed period must contain the log's own date.
        self.assertLessEqual(start, self.log.date)
        self.assertGreaterEqual(end, self.log.date)

    def test_overtime_needs_pricing_flag(self):
        """Flag fires when log has OT hours but no priced_workers yet."""
        # setUp prices worker_b; clear that first so the baseline really is
        # "no OT, no priced workers" as documented.
        self.log.priced_workers.clear()

        # Start: no OT, no priced workers -> flag False
        self.assertFalse(_build_work_log_payroll_context(self.log)['overtime_needs_pricing'])

        # Add OT hours but keep priced_workers empty -> flag True
        self.log.overtime_amount = Decimal('0.50')
        self.log.save()
        self.assertTrue(_build_work_log_payroll_context(self.log)['overtime_needs_pricing'])

        # Price the OT -> flag False again
        self.log.priced_workers.add(self.worker_a)
        self.assertFalse(_build_work_log_payroll_context(self.log)['overtime_needs_pricing'])

    def test_query_count_is_bounded(self):
        """Helper should not issue per-worker queries — guards against N+1."""
        # Expected queries: payroll_records, priced_workers, workers.all,
        # adjustments. assertNumQueries checks for an exact count, so any
        # regression that adds per-worker queries (or any extra query at all)
        # fails the test.
        with self.assertNumQueries(4):
            _build_work_log_payroll_context(self.log)

    def test_empty_log_returns_zero_totals(self):
        """Log with no workers: helper returns empty rows and zero totals."""
        empty_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 11),
            project=self.project,
            team=self.team,
            supervisor=self.admin,
        )
        ctx = _build_work_log_payroll_context(empty_log)
        self.assertEqual(ctx['worker_rows'], [])
        self.assertEqual(ctx['total_earned'], Decimal('0.00'))
        self.assertEqual(ctx['total_paid'], Decimal('0.00'))
        self.assertEqual(ctx['total_outstanding'], Decimal('0.00'))
        self.assertEqual(ctx['adjustments'], [])

    def test_log_without_team_has_no_pay_period(self):
        """Log whose team was later soft-deleted to NULL still works."""
        self.log.team = None
        self.log.save()
        ctx = _build_work_log_payroll_context(self.log)
        self.assertEqual(ctx['pay_period'], (None, None))
        # The rest of the context should still populate correctly.
        self.assertEqual(len(ctx['worker_rows']), 3)
|
||
|
||
|
||
# === TESTS FOR THE WORK LOG PAYROLL AJAX ENDPOINT ===
# These cover the JSON endpoint that the Task-5 modal will consume.
# The endpoint must: return JSON to admins, forbid supervisors, redirect
# anonymous users to login, and 404 on unknown logs.
|
||
|
||
class WorkLogPayrollAjaxTests(TestCase):
    """Tests for the JSON AJAX endpoint that powers the modal."""

    def setUp(self):
        # One admin, one non-admin supervisor, and a simple log with one worker.
        self.admin = User.objects.create_user(
            username='admin', password='pass', is_staff=True
        )
        self.supervisor = User.objects.create_user(
            username='sup', password='pass', is_staff=False
        )
        project = Project.objects.create(name='P')
        team = Team.objects.create(name='T', supervisor=self.admin)
        worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('4000'))
        self.log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 10),
            project=project, team=team, supervisor=self.admin,
        )
        self.log.workers.add(worker)

    def test_admin_sees_200_json(self):
        # Admin hits the endpoint and gets a well-formed JSON body.
        self.client.login(username='admin', password='pass')
        url = reverse('work_log_payroll_ajax', args=[self.log.id])
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        data = resp.json()
        self.assertEqual(data['log_id'], self.log.id)
        self.assertEqual(len(data['worker_rows']), 1)
        self.assertEqual(data['worker_rows'][0]['status'], 'Unpaid')

    def test_supervisor_forbidden(self):
        # A non-admin user (even if authenticated) gets a 403.
        self.client.login(username='sup', password='pass')
        url = reverse('work_log_payroll_ajax', args=[self.log.id])
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 403)

    def test_anonymous_redirected_to_login(self):
        # No login here: the endpoint is expected to answer with a 302
        # redirect (to the login page) rather than serving any payload.
        url = reverse('work_log_payroll_ajax', args=[self.log.id])
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 302)

    def test_missing_log_is_404(self):
        # Build the URL with reverse() so the 404 can only come from the
        # view's own lookup. A hard-coded path would also 404 if the route
        # were renamed or mistyped, letting this test pass for the wrong reason.
        self.client.login(username='admin', password='pass')
        missing_id = self.log.pk + 99999
        # Sanity check: the id we probe really does not exist.
        self.assertFalse(WorkLog.objects.filter(pk=missing_id).exists())
        resp = self.client.get(reverse('work_log_payroll_ajax', args=[missing_id]))
        self.assertEqual(resp.status_code, 404)

    def test_full_payload_with_paid_worker_null_team_and_ot(self):
        """One scenario exercising: paid worker, null team, OT flag, full URL."""
        # Create a paid worker on a separate log with no team (null team edge case)
        # and with overtime hours but no priced_workers (unpriced OT flag).
        project = Project.objects.create(name='P-full')
        paid_worker = Worker.objects.create(
            name='Paula', id_number='P1', monthly_salary=Decimal('4000')
        )
        log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 11),
            project=project,
            team=None,  # null team
            supervisor=self.admin,
            overtime_amount=Decimal('1.50'),  # > 0 and no priced_workers -> flag fires
        )
        log.workers.add(paid_worker)

        # Link a PayrollRecord so Paula shows as Paid
        record = PayrollRecord.objects.create(
            worker=paid_worker,
            amount_paid=Decimal('250.00'),
            date=datetime.date(2026, 4, 16),
        )
        record.work_logs.add(log)

        # Link an adjustment to the same log (only its serialized form is
        # inspected below, so the instance itself is not kept).
        PayrollAdjustment.objects.create(
            worker=paid_worker,
            project=project,
            type='Overtime',
            amount=Decimal('75.00'),
            date=datetime.date(2026, 4, 11),
            description='OT pricing',
            work_log=log,
            payroll_record=record,
        )

        self.client.login(username='admin', password='pass')
        resp = self.client.get(reverse('work_log_payroll_ajax', args=[log.id]))
        self.assertEqual(resp.status_code, 200)
        data = resp.json()

        # Null team branch
        self.assertIsNone(data['team'])
        # Project is still present
        self.assertEqual(data['project']['name'], 'P-full')
        # Paid worker serialization shape
        paula_row = next(r for r in data['worker_rows'] if r['worker_name'] == 'Paula')
        self.assertEqual(paula_row['status'], 'Paid')
        self.assertEqual(paula_row['payroll_record_id'], record.pk)
        self.assertEqual(paula_row['paid_date'], '2026-04-16')
        # OT flag fires (overtime > 0, no priced_workers)
        self.assertTrue(data['overtime_needs_pricing'])
        # full_page_url points at the full-page detail view for this log
        self.assertEqual(data['full_page_url'], f'/history/{log.id}/')
        # Adjustment serialized with payslip link
        self.assertEqual(len(data['adjustments']), 1)
        self.assertEqual(data['adjustments'][0]['type'], 'Overtime')
        self.assertEqual(data['adjustments'][0]['payroll_record_id'], record.pk)
|
||
|
||
|
||
# === TESTS FOR THE WORK LOG PAYROLL FULL-PAGE VIEW ===
# These cover the HTML page at /history/<id>/ that shares the same context
# builder as the AJAX endpoint. Admin sees a 200 HTML page; supervisor gets 403.
|
||
|
||
class WorkLogPayrollDetailTests(TestCase):
    """Access-control and rendering checks for the full-page /history/<id>/ view."""

    def setUp(self):
        # Two accounts (staff and non-staff) plus one log with a single worker.
        self.admin = User.objects.create_user(
            username='admin', password='pass', is_staff=True
        )
        self.supervisor = User.objects.create_user(
            username='sup', password='pass', is_staff=False
        )
        site = Project.objects.create(name='P2')
        crew = Team.objects.create(name='T2', supervisor=self.admin)
        wanda = Worker.objects.create(name='Wanda', id_number='X', monthly_salary=Decimal('4000'))
        self.log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 10),
            project=site, team=crew, supervisor=self.admin,
        )
        self.log.workers.add(wanda)

    def _fetch_detail_as(self, username):
        """Log in as ``username`` and GET the detail page for ``self.log``."""
        self.client.login(username=username, password='pass')
        detail_url = reverse('work_log_payroll_detail', args=[self.log.id])
        return self.client.get(detail_url)

    def test_admin_gets_full_page(self):
        response = self._fetch_detail_as('admin')
        self.assertEqual(response.status_code, 200)
        # Page chrome plus the worker's name must all be rendered.
        for expected_text in ('FoxFitt', 'History', 'Wanda'):
            self.assertContains(response, expected_text)

    def test_supervisor_forbidden(self):
        response = self._fetch_detail_as('sup')
        self.assertEqual(response.status_code, 403)
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR PAYROLL REPORT FILTER INFLATION ===
# Regression tests for the M2M double-JOIN bug in _build_report_context.
#
# THE BUG (fixed 2026-04-23):
# Filtering a report by project AND team via chained `.filter(work_logs__field=X)`
# calls produced TWO separate JOIN aliases on the core_payrollrecord_work_logs
# M2M table. Any downstream `.values().annotate(Sum(...))` then aggregated
# across the cartesian product of matching rows, inflating every per-worker
# total_paid and every payments_by_date amount by N² (where N = number of
# matching work logs per record). With a single filter there is one JOIN and
# the inflation is N-fold. total_paid_out itself was correct because
# `.aggregate(Sum(...))` wraps distinct() in a subquery, but the per-row
# values/annotate pattern doesn't get that help.
#
# These tests lock down the fix: with filters applied, the worker-level and
# date-level totals must equal the real payment amount, not a multiplied one.
# =============================================================================
|
||
|
||
|
||
class ReportContextFilterInflationTests(TestCase):
    """Combined project + team filters must leave report aggregates un-inflated."""

    def setUp(self):
        self.admin = User.objects.create_user(username='admin-r', is_staff=True)
        self.project = Project.objects.create(name='Solar Farm Gamma')
        self.team = Team.objects.create(name='Team Gamma', supervisor=self.admin)
        self.worker = Worker.objects.create(
            name='Test Worker', id_number='TW1', monthly_salary=Decimal('4000')
        )
        # The adjustment-summary test looks workers up via
        # worker__teams__id=team_id, so the worker has to be a team member.
        self.team.workers.add(self.worker)
        # Three logs in range, same project + team: the smallest fixture that
        # reproduces the inflation (one record linked to 3 logs -> 9 joined rows
        # when both filters are applied).
        self.logs = [self._log_in_march(day) for day in (5, 10, 15)]
        # A single payment that covers all three logs.
        self.record = PayrollRecord.objects.create(
            worker=self.worker,
            amount_paid=Decimal('600.00'),
            date=datetime.date(2026, 3, 20),
        )
        self.record.work_logs.add(*self.logs)

    def _log_in_march(self, day):
        """Create a March 2026 log for the fixture worker on the given day."""
        entry = WorkLog.objects.create(
            date=datetime.date(2026, 3, day),
            project=self.project,
            team=self.team,
            supervisor=self.admin,
        )
        entry.workers.add(self.worker)
        return entry

    def _ctx(self, project_ids=None, team_ids=None):
        """Build the report context for the whole of March 2026."""
        period_start = datetime.date(2026, 3, 1)
        period_end = datetime.date(2026, 3, 31)
        return _build_report_context(
            period_start,
            period_end,
            project_ids=project_ids,
            team_ids=team_ids,
        )

    def _ctx_with_both_filters(self):
        """Report context narrowed by the fixture project AND the fixture team."""
        return self._ctx(project_ids=[self.project.id], team_ids=[self.team.id])

    def test_worker_breakdown_not_inflated_with_project_filter_only(self):
        report = self._ctx(project_ids=[self.project.id])
        breakdown = report['worker_breakdown']
        self.assertEqual(len(breakdown), 1)
        # Pre-fix: this was 600 × 3 = 1800 (one JOIN, 3-way inflation).
        self.assertEqual(breakdown[0]['total_paid'], Decimal('600.00'))

    def test_worker_breakdown_not_inflated_with_both_filters(self):
        report = self._ctx_with_both_filters()
        breakdown = report['worker_breakdown']
        self.assertEqual(len(breakdown), 1)
        # Pre-fix: this was 600 × 3 × 3 = 5400 (two JOINs, 9-way inflation).
        self.assertEqual(breakdown[0]['total_paid'], Decimal('600.00'))

    def test_payments_by_date_not_inflated_with_both_filters(self):
        report = self._ctx_with_both_filters()
        dated_payments = list(report['payments_by_date'])
        self.assertEqual(len(dated_payments), 1)
        self.assertEqual(dated_payments[0]['total'], Decimal('600.00'))

    def test_total_paid_out_stays_correct_with_both_filters(self):
        """Regression guard: total_paid_out was already correct before the fix
        (per the bug write-up, .aggregate() handles distinct() via a subquery).
        Pinned here so a later refactor cannot reintroduce inflation."""
        report = self._ctx_with_both_filters()
        self.assertEqual(report['total_paid_out'], Decimal('600.00'))

    def test_adjustment_summary_not_inflated_with_team_filter(self):
        """Team-filtered adjustments travel through worker__teams (M2M), the
        same bug class: values().annotate(Sum()) would inflate when the worker
        is in several teams or the JOIN is chained with other M2M filters."""
        PayrollAdjustment.objects.create(
            worker=self.worker,
            project=self.project,
            type='Bonus',
            amount=Decimal('100.00'),
            date=datetime.date(2026, 3, 10),
            description='Test bonus',
        )
        report = self._ctx_with_both_filters()
        total_by_type = {}
        for entry in report['adjustment_totals']:
            total_by_type[entry['type']] = entry['total']
        self.assertEqual(total_by_type.get('Bonus'), Decimal('100.00'))
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR SUPERVISOR PICKER QUERYSET ===
# Regression tests for the TeamForm/ProjectForm supervisor dropdown.
#
# THE BUG (fixed 2026-04-23):
# The picker required either admin flags (is_staff/is_superuser) or "Work Logger"
# group membership. Any active user NOT pre-enrolled in that specific group
# was invisible, even though the app's downstream `is_supervisor()` helper
# grants supervisor powers to anyone assigned to a team/project FK/M2M —
# regardless of group. The picker was strictly more restrictive than the
# permission model, hiding field supervisors from the person trying to assign
# them. Fix: allow any active user; let the act of assignment confer
# supervisor status (matching how `is_supervisor` works).
# =============================================================================
|
||
|
||
|
||
class SupervisorPickerQuerysetTests(TestCase):
    """The supervisor picker lists every active user, admin or not."""

    @staticmethod
    def _picker():
        """Return a fresh supervisor-picker queryset (imported lazily, as before)."""
        from core.forms import _supervisor_user_queryset
        return _supervisor_user_queryset()

    def test_regular_active_user_is_selectable(self):
        """A plain active user (not staff, not superuser, not in the Work
        Logger group) has to be offered. Pre-fix: excluded by the Q filter."""
        plain_user = User.objects.create_user(
            username='field_supervisor', password='pass',
            is_staff=False, is_superuser=False,
        )
        self.assertIn(plain_user, self._picker())

    def test_user_in_unrelated_group_is_still_selectable(self):
        """Membership of some other group (not 'Work Logger') must not hide an
        active user. Pre-fix: excluded because the Q filter specifically
        checked groups__name='Work Logger'."""
        from django.contrib.auth.models import Group
        member = User.objects.create_user(username='misc_group_user', password='pass')
        other_group, _created = Group.objects.get_or_create(name='Some Other Group')
        member.groups.add(other_group)
        self.assertIn(member, self._picker())

    def test_inactive_user_still_excluded(self):
        """Inactive accounts never appear in the picker, former admins
        included. Active-only is the one hard guardrail."""
        departed = User.objects.create_user(
            username='former_employee', password='pass',
            is_active=False, is_staff=True,
        )
        self.assertNotIn(departed, self._picker())

    def test_admin_still_selectable(self):
        """Defense-in-depth: staff users keep showing up, so the original use
        case does not regress."""
        staff_user = User.objects.create_user(
            username='an_admin', password='pass', is_staff=True
        )
        self.assertIn(staff_user, self._picker())
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR EXECUTIVE REPORT v2 ===
# Covers the new helpers introduced in the report rebuild (Apr 2026):
# _company_cost_velocity, _current_outstanding_in_scope, _team_project_activity,
# and the multi-filter extension of _build_report_context.
# =============================================================================
|
||
|
||
|
||
class CompanyCostVelocityTests(TestCase):
    """Company-wide average daily / monthly labour cost (hero KPI cards 3 & 4)."""

    @staticmethod
    def _velocity():
        """Call the helper under test (imported lazily, as the tests always did)."""
        from core.views import _company_cost_velocity
        return _company_cost_velocity()

    def test_empty_db_returns_zero(self):
        velocity = self._velocity()
        self.assertEqual(velocity['avg_daily'], Decimal('0.00'))
        self.assertEqual(velocity['avg_monthly'], Decimal('0.00'))
        self.assertEqual(velocity['working_days'], 0)

    def test_known_values(self):
        # Fixture: 2 workers (daily_rate = 4000/20 = R 200 each), both on each
        # of 5 distinct dates.
        #   lifetime cost = 2 * 5 * R 200 = R 2000, working days = 5
        #   avg daily     = 2000 / 5      = R 400
        #   avg monthly   = 400 * 30.44   = R 12,176
        boss = User.objects.create_user(username='admin-cv', is_staff=True)
        site = Project.objects.create(name='P')
        first = Worker.objects.create(name='W1', id_number='W1', monthly_salary=Decimal('4000'))
        second = Worker.objects.create(name='W2', id_number='W2', monthly_salary=Decimal('4000'))
        for day in (1, 2, 3, 4, 5):  # 5 distinct dates
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=site, supervisor=boss,
            )
            entry.workers.add(first, second)

        velocity = self._velocity()
        self.assertEqual(velocity['working_days'], 5)
        self.assertEqual(velocity['avg_daily'], Decimal('400.00'))
        # Allow ±1 cent of slack for the 30.44 multiplication.
        monthly_as_float = float(velocity['avg_monthly'])
        self.assertAlmostEqual(monthly_as_float, 12176.00, delta=0.01)

    def test_duplicate_dates_not_double_counted(self):
        """Two workers on the same date count as 1 distinct date, not 2."""
        boss = User.objects.create_user(username='admin-cv2', is_staff=True)
        site = Project.objects.create(name='P2')
        first = Worker.objects.create(name='X', id_number='X1', monthly_salary=Decimal('4000'))
        second = Worker.objects.create(name='Y', id_number='Y1', monthly_salary=Decimal('4000'))
        shared_day = WorkLog.objects.create(
            date=datetime.date(2026, 3, 1), project=site, supervisor=boss,
        )
        shared_day.workers.add(first, second)
        velocity = self._velocity()
        self.assertEqual(velocity['working_days'], 1)  # not 2
|
||
|
||
|
||
class CurrentOutstandingInScopeTests(TestCase):
    """Hero card 2 — 'Outstanding NOW', optionally narrowed by filters."""

    def setUp(self):
        self.admin = User.objects.create_user(username='a-out', is_staff=True)
        self.p1 = Project.objects.create(name='ProjA')
        self.p2 = Project.objects.create(name='ProjB')
        self.t1 = Team.objects.create(name='TeamA', supervisor=self.admin)
        self.w = Worker.objects.create(
            name='Wkr', id_number='W1', monthly_salary=Decimal('4000')
        )
        self.t1.workers.add(self.w)
        # One unpaid log per project: ProjA on 1 March, ProjB on 2 March.
        for day, project in ((1, self.p1), (2, self.p2)):
            unpaid = WorkLog.objects.create(
                date=datetime.date(2026, 3, day),
                project=project, team=self.t1, supervisor=self.admin,
            )
            unpaid.workers.add(self.w)

    @staticmethod
    def _outstanding(**scope):
        """Invoke the helper under test with optional project_ids / team_ids."""
        from core.views import _current_outstanding_in_scope
        return _current_outstanding_in_scope(**scope)

    def test_no_filters_includes_all_projects(self):
        outstanding = self._outstanding()
        # daily_rate = 4000/20 = 200; 2 unpaid logs * 200 = 400
        self.assertEqual(outstanding['total'], Decimal('400.00'))
        self.assertEqual(len(outstanding['by_project']), 2)

    def test_project_filter_scopes_total(self):
        outstanding = self._outstanding(project_ids=[self.p1.id])
        self.assertEqual(outstanding['total'], Decimal('200.00'))
        per_project = outstanding['by_project']
        self.assertEqual(len(per_project), 1)
        self.assertEqual(per_project[0]['name'], 'ProjA')

    def test_team_filter_scopes_total(self):
        """Team filter on work logs + worker__teams on adjustments."""
        # A bonus for a worker who is NOT a member of t1.
        outsider = Worker.objects.create(
            name='Other', id_number='O1', monthly_salary=Decimal('4000')
        )
        PayrollAdjustment.objects.create(
            worker=outsider, project=self.p1, type='Bonus',
            amount=Decimal('500.00'), date=datetime.date(2026, 3, 3),
        )
        # Scoped to t1, only self.w's two logs count — R 400 in total.
        outstanding = self._outstanding(team_ids=[self.t1.id])
        self.assertEqual(outstanding['total'], Decimal('400.00'))
        # The outsider's R500 bonus must not surface in by_project: that
        # worker is outside t1, so the team scope drops them entirely.
        per_project = outstanding['by_project']
        self.assertEqual(len(per_project), 2)
        listed_amounts = [entry['amount'] for entry in per_project]
        self.assertNotIn(Decimal('500.00'), listed_amounts)
|
||
|
||
|
||
class TeamProjectActivityTests(TestCase):
    """Chapter IV pivot: one row per team, one column per project, cells hold
    the number of distinct log dates."""

    def setUp(self):
        self.admin = User.objects.create_user(username='a-tpa', is_staff=True)
        self.p1 = Project.objects.create(name='P1')
        self.p2 = Project.objects.create(name='P2')
        self.t1 = Team.objects.create(name='T1', supervisor=self.admin)
        self.t2 = Team.objects.create(name='T2', supervisor=self.admin)
        labourer = Worker.objects.create(name='W', id_number='W1', monthly_salary=Decimal('4000'))

        # (day of March, team, project):
        #   T1 -> 3 distinct dates on P1
        #   T2 -> 2 distinct dates on P1 and 1 on P2
        schedule = (
            (1, self.t1, self.p1),
            (2, self.t1, self.p1),
            (3, self.t1, self.p1),
            (4, self.t2, self.p1),
            (5, self.t2, self.p1),
            (6, self.t2, self.p2),
        )
        for day, team, project in schedule:
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day), project=project, team=team,
                supervisor=self.admin,
            )
            entry.workers.add(labourer)

        # Every fixture log falls inside this March 2026 window.
        self.logs_qs = WorkLog.objects.filter(
            date__gte=datetime.date(2026, 3, 1),
            date__lte=datetime.date(2026, 3, 31),
        )

    def _pivot(self):
        """Run the helper under test over the March logs queryset."""
        from core.views import _team_project_activity
        return _team_project_activity(self.logs_qs)

    @staticmethod
    def _rows_by_team(pivot):
        """Index the pivot rows by their team_name for direct lookup."""
        indexed = {}
        for row in pivot['rows']:
            indexed[row['team_name']] = row
        return indexed

    def test_pivot_shape(self):
        pivot = self._pivot()
        # 2 columns (P1, P2), 2 rows (T1, T2)
        self.assertEqual(len(pivot['columns']), 2)
        self.assertEqual(len(pivot['rows']), 2)

    def test_cell_counts(self):
        by_team = self._rows_by_team(self._pivot())
        t1_cells = by_team['T1']['cells_by_project_id']
        t2_cells = by_team['T2']['cells_by_project_id']
        # T1 has 3 days on P1, 0 on P2
        self.assertEqual(t1_cells[self.p1.id], 3)
        self.assertEqual(t1_cells.get(self.p2.id, 0), 0)
        # T2 has 2 days on P1, 1 on P2
        self.assertEqual(t2_cells[self.p1.id], 2)
        self.assertEqual(t2_cells[self.p2.id], 1)

    def test_row_and_column_totals(self):
        pivot = self._pivot()
        by_team = self._rows_by_team(pivot)
        self.assertEqual(by_team['T1']['row_total'], 3)
        self.assertEqual(by_team['T2']['row_total'], 3)
        column_totals = pivot['col_totals']
        self.assertEqual(column_totals[self.p1.id], 5)
        self.assertEqual(column_totals[self.p2.id], 1)
        self.assertEqual(pivot['grand_total'], 6)

    def test_team_with_no_logs_omitted(self):
        """A team without any log in the period must not get a row."""
        Team.objects.create(name='GhostTeam', supervisor=self.admin)
        pivot = self._pivot()
        listed_teams = [row['team_name'] for row in pivot['rows']]
        self.assertNotIn('GhostTeam', listed_teams)
|
||
|
||
|
||
class ChapterOneEnrichmentTests(TestCase):
    """Chapter I — All Time Projects rows carry working_days and avg_per_working_day."""

    def test_alltime_projects_includes_working_days_and_avg(self):
        from core.views import _build_report_context
        boss = User.objects.create_user(username='c1', is_staff=True)
        site = Project.objects.create(name='C1', start_date=datetime.date(2026, 1, 1))
        labourer = Worker.objects.create(name='W', id_number='W1', monthly_salary=Decimal('4000'))
        # One worker on 4 distinct March dates at daily_rate 200:
        # total = R 800, working_days = 4, avg = 200.
        for day in range(1, 5):
            entry = WorkLog.objects.create(
                date=datetime.date(2026, 3, day), project=site, supervisor=boss,
            )
            entry.workers.add(labourer)

        year_start = datetime.date(2026, 1, 1)
        year_end = datetime.date(2026, 12, 31)
        report = _build_report_context(year_start, year_end)

        rows_by_project = {}
        for row in report['alltime_projects']:
            rows_by_project[row['project']] = row
        self.assertIn('C1', rows_by_project)

        c1_row = rows_by_project['C1']
        self.assertEqual(c1_row['working_days'], 4)
        self.assertEqual(c1_row['avg_per_working_day'], Decimal('200.00'))
        self.assertEqual(c1_row['start_date'], datetime.date(2026, 1, 1))
        # last_activity = the most recent WorkLog.date (4 March in this fixture)
        self.assertEqual(
            c1_row['last_activity'], datetime.date(2026, 3, 4),
            'alltime_projects rows should expose the most-recent log date '
            'so the report can show "Last Activity" per project'
        )
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR MULTI-VALUE FILTER SUPPORT (Task 6) ===
# _build_report_context now accepts project_ids and team_ids lists.
# =============================================================================
|
||
|
||
|
||
class ReportMultiFilterTests(TestCase):
    """Task 6 — ``_build_report_context`` with list-valued ``project_ids`` /
    ``team_ids`` filters."""

    def setUp(self):
        """Three projects sharing one team and one worker; each project gets
        a single 1 March log linked to its own R 100 payroll record."""
        self.admin = User.objects.create_user(username='mf', is_staff=True)
        self.p1, self.p2, self.p3 = (
            Project.objects.create(name=label) for label in ('P1', 'P2', 'P3')
        )
        self.team = Team.objects.create(name='T', supervisor=self.admin)
        self.w = Worker.objects.create(
            name='W', id_number='W1', monthly_salary=Decimal('4000'),
        )
        self.team.workers.add(self.w)

        # One log + one paid record per project.
        for project in (self.p1, self.p2, self.p3):
            work_log = WorkLog.objects.create(
                date=datetime.date(2026, 3, 1),
                project=project,
                team=self.team,
                supervisor=self.admin,
            )
            work_log.workers.add(self.w)
            payment = PayrollRecord.objects.create(
                worker=self.w,
                amount_paid=Decimal('100.00'),
                date=datetime.date(2026, 3, 5),
            )
            payment.work_logs.add(work_log)

    def _ctx(self, project_ids=None, team_ids=None):
        """Build the March 2026 report context with the given filters."""
        from core.views import _build_report_context

        window_start = datetime.date(2026, 3, 1)
        window_end = datetime.date(2026, 3, 31)
        return _build_report_context(
            window_start, window_end,
            project_ids=project_ids, team_ids=team_ids,
        )

    def test_multi_project_union(self):
        """Selecting two projects sums both and leaves the third out."""
        selected = [self.p1.id, self.p2.id]
        ctx = self._ctx(project_ids=selected)
        # Two projects paid R 100 each = R 200; third excluded.
        self.assertEqual(ctx['total_paid_out'], Decimal('200.00'))

    def test_empty_list_equals_none(self):
        """An empty project list gives the same total as no filter."""
        unfiltered = self._ctx(project_ids=None)
        empty_filter = self._ctx(project_ids=[])
        self.assertEqual(
            unfiltered['total_paid_out'], empty_filter['total_paid_out'],
        )

    def test_no_inflation_with_multi_project(self):
        """Worker breakdown must not inflate when multiple projects are selected."""
        every_project = [self.p1.id, self.p2.id, self.p3.id]
        breakdown = self._ctx(project_ids=every_project)['worker_breakdown']
        self.assertEqual(len(breakdown), 1)
        # All three records belong to the same worker: 3 x R 100 = R 300.
        self.assertEqual(breakdown[0]['total_paid'], Decimal('300.00'))
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR INLINE FILTERS (Report Page) ===
# Pill-as-dropdown + cross-filter feature. Most behaviour is template/JS;
# the only backend surface is the project_team_pairs_json context key that
# powers the client-side Team<->Project cross-filter.
# =============================================================================
|
||
|
||
|
||
class InlineFiltersPairsContextTests(TestCase):
    """The report view must expose distinct (project_id, team_id) pairs for
    the pill-popover cross-filter JS, plus date-scoped picker lists."""

    def setUp(self):
        """Two projects and two teams, logged pairwise on 1 March 2026."""
        self.admin = User.objects.create_user(
            username='admin-if', password='pass', is_staff=True,
        )
        self.p1 = Project.objects.create(name='P1')
        self.p2 = Project.objects.create(name='P2')
        self.t1 = Team.objects.create(name='T1', supervisor=self.admin)
        self.t2 = Team.objects.create(name='T2', supervisor=self.admin)
        self.w = Worker.objects.create(
            name='W', id_number='W1', monthly_salary=Decimal('4000'),
        )
        # t1 logs on p1 and t2 on p2, so the pairs should be
        # [(p1, t1), (p2, t2)].
        pairings = ((self.p1, self.t1), (self.p2, self.t2))
        for project, team in pairings:
            work_log = WorkLog.objects.create(
                date=datetime.date(2026, 3, 1),
                project=project,
                team=team,
                supervisor=self.admin,
            )
            work_log.workers.add(self.w)

    def _get_report(self, query):
        """Log in as the fixture admin and GET the report page with the
        given query string (leading '?' included)."""
        self.client.login(username='admin-if', password='pass')
        return self.client.get(reverse('generate_report') + query)

    def test_pairs_context_key_populated(self):
        """The context value is a raw Python list of dicts; Django's
        |json_script filter performs the single JSON serialisation at
        template render time (no double-encoding)."""
        resp = self._get_report('?from_month=2026-03&to_month=2026-04')
        self.assertEqual(resp.status_code, 200)
        pairs = resp.context['project_team_pairs_json']

        # Every entry carries both identifiers.
        for entry in pairs:
            self.assertIn('project_id', entry)
            self.assertIn('team_id', entry)

        # Compare as tuples so list ordering does not matter.
        seen = {(entry['project_id'], entry['team_id']) for entry in pairs}
        self.assertIn((self.p1.id, self.t1.id), seen)
        self.assertIn((self.p2.id, self.t2.id), seen)

    def test_pairs_excludes_null_project_or_team(self):
        """Logs with null project or null team should not appear in pairs."""
        # Extra log with team=None.
        teamless_log = WorkLog.objects.create(
            date=datetime.date(2026, 3, 2),
            project=self.p1,
            team=None,
            supervisor=self.admin,
        )
        teamless_log.workers.add(self.w)

        resp = self._get_report('?from_month=2026-03&to_month=2026-04')
        pairs = resp.context['project_team_pairs_json']
        # No pair may carry team_id=None.
        self.assertTrue(all(entry['team_id'] is not None for entry in pairs))

    def test_pairs_renders_as_valid_json_in_template(self):
        """End-to-end: the rendered HTML must contain a single, valid JSON
        array inside the <script id="projectTeamPairs"> tag — NOT a
        JSON-encoded string (the bug that broke all pill interactions
        before the context key was changed from `json.dumps(pairs)` to raw
        `pairs`)."""
        import json as _json
        import re

        resp = self._get_report('?from_month=2026-03&to_month=2026-04')
        html = resp.content.decode('utf-8')

        # Pull out the payload between the projectTeamPairs <script> tags.
        script_pattern = r'<script id="projectTeamPairs"[^>]*>(.*?)</script>'
        match = re.search(script_pattern, html, re.DOTALL)
        self.assertIsNotNone(match, 'projectTeamPairs <script> tag missing')
        payload = match.group(1).strip()

        # The payload must decode to a LIST, not a string.
        parsed = _json.loads(payload)
        self.assertIsInstance(
            parsed,
            list,
            "Double-encoded JSON regression: browser's JSON.parse "
            "would return a string here, killing pairs.forEach() in the "
            "pill-popover JS. See 2026-04-23 bugfix.",
        )
        # Each member must be a dict with project_id + team_id.
        for entry in parsed:
            self.assertIsInstance(entry, dict)
            self.assertIn('project_id', entry)
            self.assertIn('team_id', entry)

    def test_pickers_and_pairs_are_date_scoped(self):
        """Checkpoint-1 refinement: the projects/teams lists and the pair
        map include only entries with WorkLog activity INSIDE the selected
        date range — NOT entire-history entries. Entries named in the
        URL's `?project=` or `?team=` selection are always preserved,
        though, so the user's pick can never vanish."""
        # A third project/team whose only log falls outside the window.
        out_project = Project.objects.create(name='P-out')
        out_team = Team.objects.create(name='T-out', supervisor=self.admin)
        out_log = WorkLog.objects.create(
            date=datetime.date(2026, 1, 15),  # outside March window
            project=out_project,
            team=out_team,
            supervisor=self.admin,
        )
        out_log.workers.add(self.w)

        # Request only March 2026 — the January log should be excluded.
        resp = self._get_report('?from_month=2026-03&to_month=2026-03')
        self.assertEqual(resp.status_code, 200)

        # Picker lists: out-of-range project + team should NOT appear.
        project_ids = {project.id for project in resp.context['projects']}
        team_ids = {team.id for team in resp.context['teams']}
        self.assertIn(self.p1.id, project_ids)
        self.assertIn(self.p2.id, project_ids)
        self.assertNotIn(
            out_project.id,
            project_ids,
            'Out-of-range project must not appear in the date-scoped list',
        )
        self.assertIn(self.t1.id, team_ids)
        self.assertIn(self.t2.id, team_ids)
        self.assertNotIn(
            out_team.id,
            team_ids,
            'Out-of-range team must not appear in the date-scoped list',
        )

        # Pair map: must be date-scoped as well.
        pair_set = {
            (entry['project_id'], entry['team_id'])
            for entry in resp.context['project_team_pairs_json']
        }
        self.assertNotIn(
            (out_project.id, out_team.id),
            pair_set,
            'Out-of-range pair must not appear in the cross-filter map',
        )

    def test_url_selected_projects_survive_even_out_of_range(self):
        """A project explicitly in the URL's ?project= selection must
        remain in the picker list even if it has no logs in the current
        date range — otherwise the user couldn't see (or deselect) their
        own pick."""
        # This project never logs anything in any date range.
        out_project = Project.objects.create(name='P-out')

        resp = self._get_report(
            '?from_month=2026-03&to_month=2026-03'
            + f'&project={out_project.id}'
        )
        self.assertEqual(resp.status_code, 200)
        project_ids = {project.id for project in resp.context['projects']}
        self.assertIn(
            out_project.id,
            project_ids,
            'URL-selected project must survive the date-scope filter',
        )
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR |type_slug FILTER ===
# Used by Adjustments tab to build CSS class names from type labels.
# =============================================================================
|
||
|
||
|
||
class TypeSlugFilterTests(TestCase):
    """``format_tags.type_slug`` turns adjustment-type labels into slugs."""

    def test_spaces_become_hyphens_and_lowercased(self):
        """Labels are lowercased and their spaces replaced by hyphens."""
        from core.templatetags.format_tags import type_slug

        expectations = (
            ('Advance Payment', 'advance-payment'),
            ('New Loan', 'new-loan'),
            ('Bonus', 'bonus'),
        )
        for label, slug in expectations:
            self.assertEqual(type_slug(label), slug)

    def test_empty_or_none_returns_empty_string(self):
        """Both '' and None map to the empty string."""
        from core.templatetags.format_tags import type_slug

        for blank in ('', None):
            self.assertEqual(type_slug(blank), '')

    def test_idempotent_on_already_slugged(self):
        """An already-slugged value passes through unchanged."""
        from core.templatetags.format_tags import type_slug

        already_slugged = 'bonus'
        self.assertEqual(type_slug(already_slugged), 'bonus')
|
||
|
||
|
||
# =============================================================================
# === TESTS FOR ADJUSTMENTS TAB (payroll_dashboard ?status=adjustments) ===
# Covers the new tab's backend: filters, sort, stats, pagination.
# Each test creates its own fresh fixture via setUp.
# NOTE: PayrollRecord only accepts worker/date/amount_paid (see core/models.py).
# The plan spec used days_worked/total_amount — those do NOT exist. Adapted.
# =============================================================================
|
||
|
||
|
||
class AdjustmentsTabTests(TestCase):
    """Backend of the Adjustments tab on /payroll/?status=adjustments and of
    the bulk-delete endpoint used from it."""

    def setUp(self):
        """Admin + plain supervisor, two workers placed in two teams, one
        project, and three unpaid adjustments (a1/a2 bonuses, a3 deduction)."""
        self.admin = User.objects.create_user(
            username='adj-admin', password='pass',
            is_staff=True, is_superuser=True,
        )
        self.sup = User.objects.create_user(
            username='adj-sup', password='pass',
        )
        self.w1 = Worker.objects.create(
            name='Alice', id_number='A1', monthly_salary=Decimal('4000'),
        )
        self.w2 = Worker.objects.create(
            name='Bob', id_number='B1', monthly_salary=Decimal('4000'),
        )
        # Two teams with BOTH workers in BOTH teams, so a naive M2M JOIN
        # multiplies rows by team count. Exercises the subquery fix.
        self.team = Team.objects.create(name='Alpha', supervisor=self.admin)
        self.team2 = Team.objects.create(name='Beta', supervisor=self.admin)
        self.team.workers.add(self.w1, self.w2)
        self.team2.workers.add(self.w1, self.w2)
        self.proj = Project.objects.create(name='Site X')
        # Three unpaid adjustments: bonus Alice, bonus Bob, deduction Alice.
        self.a1 = PayrollAdjustment.objects.create(
            worker=self.w1, project=self.proj, type='Bonus',
            amount=Decimal('500'), date=datetime.date(2026, 4, 10),
            description='April bonus',
        )
        self.a2 = PayrollAdjustment.objects.create(
            worker=self.w2, project=self.proj, type='Bonus',
            amount=Decimal('300'), date=datetime.date(2026, 4, 11),
            description='Project milestone',
        )
        self.a3 = PayrollAdjustment.objects.create(
            worker=self.w1, project=self.proj, type='Deduction',
            amount=Decimal('100'), date=datetime.date(2026, 3, 28),
            description='Missing tool',
        )
        self.url = reverse('payroll_dashboard') + '?status=adjustments'

    # --- private helpers --------------------------------------------------

    def _login_admin(self):
        """Authenticate the test client as the fixture admin."""
        self.client.login(username='adj-admin', password='pass')

    def _mark_a1_paid(self):
        """Attach a1 to a fresh PayrollRecord so it counts as paid.

        PayrollRecord fields are worker/date/amount_paid.
        """
        record = PayrollRecord.objects.create(
            worker=self.w1, date=datetime.date(2026, 4, 15),
            amount_paid=Decimal('4000'),
        )
        self.a1.payroll_record = record
        self.a1.save()

    def _new_loan(self, remaining_balance):
        """Create a R 1000 loan for w1 plus its 'New Loan' adjustment and
        return them as ``(loan, adjustment)``."""
        loan = Loan.objects.create(
            worker=self.w1,
            principal_amount=Decimal('1000'),
            remaining_balance=remaining_balance,
            date=datetime.date(2026, 4, 1),
            loan_type='loan',
        )
        adjustment = PayrollAdjustment.objects.create(
            worker=self.w1, project=self.proj, type='New Loan',
            amount=Decimal('1000'), date=datetime.date(2026, 4, 1),
            description='Test loan', loan=loan,
        )
        return loan, adjustment

    def _bulk_delete(self, adjustment_ids):
        """POST the given ids to the bulk-delete endpoint."""
        return self.client.post(
            reverse('bulk_delete_adjustments'),
            {'adjustment_ids': adjustment_ids},
        )

    @staticmethod
    def _listed_ids(resp):
        """Ids of the adjustments on the current page of ``resp``."""
        return {adj.id for adj in resp.context['adj_page'].object_list}

    @staticmethod
    def _group_labelled(groups, label):
        """First group dict whose 'label' equals ``label``."""
        return next(group for group in groups if group['label'] == label)

    @staticmethod
    def _adjustment_exists(adjustment_id):
        """Whether a PayrollAdjustment row with this id is still stored."""
        return PayrollAdjustment.objects.filter(id=adjustment_id).exists()

    # --- listing, permissions, filters -------------------------------------

    def test_admin_sees_adjustments_tab(self):
        """Admin gets a 200 with the tab active and all fixtures listed."""
        self._login_admin()
        resp = self.client.get(self.url)
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.context['active_tab'], 'adjustments')
        # All 3 fixture adjustments should be in the listing.
        self.assertEqual(len(resp.context['adj_page'].object_list), 3)

    def test_supervisor_forbidden(self):
        """A non-admin supervisor is redirected away from the tab."""
        self.client.login(username='adj-sup', password='pass')
        resp = self.client.get(self.url)
        # Existing payroll_dashboard pattern: non-admin is redirected home.
        self.assertEqual(resp.status_code, 302)

    def test_type_multi_filter(self):
        """?type=Bonus&type=Deduction returns the UNION (3 rows: 2 bonuses + 1
        deduction), not the intersection."""
        self._login_admin()
        resp = self.client.get(self.url + '&type=Bonus&type=Deduction')
        self.assertEqual(resp.context['adj_total_count'], 3)
        self.assertEqual(
            self._listed_ids(resp), {self.a1.id, self.a2.id, self.a3.id},
        )

    def test_worker_multi_filter(self):
        """?worker=<Alice> lists Alice's adjustments and hides Bob's."""
        self._login_admin()
        resp = self.client.get(self.url + f'&worker={self.w1.id}')
        listed = self._listed_ids(resp)
        self.assertIn(self.a1.id, listed)
        self.assertNotIn(self.a2.id, listed)
        self.assertIn(self.a3.id, listed)

    def test_team_filter_uses_subquery_no_inflation(self):
        """Filtering by team must NOT multiply rows. With 2 teams x 2 workers x 3
        adjustments, a naive worker__teams__id__in filter would return 6 inflated
        rows; the subquery pattern returns the true 3. See CLAUDE.md ORM gotcha."""
        self._login_admin()
        resp = self.client.get(
            self.url + f'&team={self.team.id}&team={self.team2.id}'
        )
        # .count() at the queryset level would blow up under inflation —
        # asserting it guards against regressions more strictly than
        # checking the paginator's object_list length.
        self.assertEqual(resp.context['adj_total_count'], 3)
        self.assertEqual(len(resp.context['adj_page'].object_list), 3)

    def test_status_filter_unpaid(self):
        """?adj_status=unpaid drops adjustments linked to a payroll record."""
        self._login_admin()
        self._mark_a1_paid()
        resp = self.client.get(self.url + '&adj_status=unpaid')
        listed = self._listed_ids(resp)
        self.assertNotIn(self.a1.id, listed)
        self.assertIn(self.a2.id, listed)
        self.assertIn(self.a3.id, listed)

    def test_date_range_filter(self):
        """adj_date_from / adj_date_to restrict the listing by date."""
        self._login_admin()
        # 1 March to 31 March -> only a3 (dated 28 Mar).
        resp = self.client.get(
            self.url + '&adj_date_from=2026-03-01&adj_date_to=2026-03-31'
        )
        self.assertEqual(self._listed_ids(resp), {self.a3.id})

    # --- stats and grouping -------------------------------------------------

    def test_stats_scoped_to_filtered_set(self):
        """Summary stats are computed over the filtered rows only."""
        self._login_admin()
        stats = self.client.get(self.url + '&type=Bonus').context
        # 2 bonuses, 0 paid, total R 800 additive, 0 deductive.
        self.assertEqual(stats['adj_total_count'], 2)
        self.assertEqual(stats['adj_unpaid_count'], 2)
        self.assertEqual(stats['adj_additive_sum'], Decimal('800.00'))
        self.assertEqual(stats['adj_deductive_sum'], Decimal('0.00'))

    def test_group_by_type(self):
        """?group_by=type yields one group per type with count and net sum."""
        self._login_admin()
        resp = self.client.get(self.url + '&group_by=type')
        groups = resp.context['adj_groups']
        self.assertIsNotNone(groups)
        self.assertEqual(
            {group['label'] for group in groups}, {'Bonus', 'Deduction'},
        )
        bonus_group = self._group_labelled(groups, 'Bonus')
        self.assertEqual(bonus_group['count'], 2)
        # +R 500 + +R 300
        self.assertEqual(bonus_group['net_sum'], Decimal('800.00'))
        deduction_group = self._group_labelled(groups, 'Deduction')
        self.assertEqual(deduction_group['net_sum'], Decimal('-100.00'))
        # Groups must be ordered by descending |net_sum| — biggest impact
        # first. |800| > |100| so Bonus must come before Deduction.
        self.assertEqual(groups[0]['label'], 'Bonus')

    def test_group_by_worker(self):
        """?group_by=worker yields one group per worker with a net sum."""
        self._login_admin()
        resp = self.client.get(self.url + '&group_by=worker')
        groups = resp.context['adj_groups']
        self.assertIsNotNone(groups)
        self.assertEqual(
            {group['label'] for group in groups}, {'Alice', 'Bob'},
        )
        alice = self._group_labelled(groups, 'Alice')
        # Alice: +R 500 bonus + (-R 100) deduction = +R 400 net.
        self.assertEqual(alice['count'], 2)
        self.assertEqual(alice['net_sum'], Decimal('400.00'))

    # --- bulk delete ----------------------------------------------------------

    def test_bulk_delete_only_affects_unpaid(self):
        """POST /payroll/adjustments/bulk-delete/ with mixed paid+unpaid IDs
        deletes ONLY the unpaid rows. Paid rows are untouched (payroll
        history is immutable)."""
        self._login_admin()
        # Pay a1 (leave a2, a3 unpaid).
        self._mark_a1_paid()
        resp = self._bulk_delete([self.a1.id, self.a2.id, self.a3.id])
        self.assertEqual(resp.status_code, 200)
        body = resp.json()
        self.assertEqual(body['deleted'], 2)
        self.assertEqual(body['requested'], 3)
        # a1 survives (paid); a2 + a3 are gone.
        self.assertTrue(self._adjustment_exists(self.a1.id))
        self.assertFalse(self._adjustment_exists(self.a2.id))
        self.assertFalse(self._adjustment_exists(self.a3.id))

    def test_bulk_delete_requires_admin(self):
        """Non-admin supervisors cannot bulk-delete adjustments."""
        self.client.login(username='adj-sup', password='pass')
        resp = self._bulk_delete([self.a1.id])
        self.assertEqual(resp.status_code, 403)
        # a1 still present.
        self.assertTrue(self._adjustment_exists(self.a1.id))

    def test_bulk_delete_cascades_new_loan(self):
        """Bulk-deleting a 'New Loan' adjustment must also delete its
        linked Loan row AND any still-unpaid Loan Repayment adjustments
        — same cascade as the single-row delete_adjustment view. Without
        this, the bulk endpoint would orphan Loan rows and leave pending
        repayments in place."""
        # Loan + New Loan adjustment + one unpaid repayment.
        loan, new_loan_adj = self._new_loan(Decimal('1000'))
        unpaid_repayment = PayrollAdjustment.objects.create(
            worker=self.w1, project=self.proj, type='Loan Repayment',
            amount=Decimal('500'), date=datetime.date(2026, 5, 1),
            description='First repayment', loan=loan,
        )

        self._login_admin()
        resp = self._bulk_delete([new_loan_adj.id])
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.json()['deleted'], 1)

        # New Loan adjustment gone.
        self.assertFalse(self._adjustment_exists(new_loan_adj.id))
        # Linked Loan row gone (cascade).
        self.assertFalse(Loan.objects.filter(id=loan.id).exists())
        # Unpaid repayment gone (cascade).
        self.assertFalse(self._adjustment_exists(unpaid_repayment.id))

    def test_bulk_delete_skips_loan_with_paid_repayments(self):
        """If a 'New Loan' has any paid repayments, bulk-delete must
        refuse to delete it (would lose audit trail). Other rows in the
        batch are unaffected."""
        loan, new_loan_adj = self._new_loan(Decimal('500'))
        # One PAID repayment.
        repayment_record = PayrollRecord.objects.create(
            worker=self.w1, date=datetime.date(2026, 5, 1),
            amount_paid=Decimal('500'),
        )
        PayrollAdjustment.objects.create(
            worker=self.w1, project=self.proj, type='Loan Repayment',
            amount=Decimal('500'), date=datetime.date(2026, 5, 1),
            description='Paid', loan=loan, payroll_record=repayment_record,
        )

        self._login_admin()
        # Send the New Loan plus a2 (unpaid Bonus); only a2 should delete.
        resp = self._bulk_delete([new_loan_adj.id, self.a2.id])
        self.assertEqual(resp.status_code, 200)
        body = resp.json()
        self.assertEqual(body['deleted'], 1)  # only a2
        self.assertEqual(body['requested'], 2)
        self.assertEqual(body['skipped_reasons'], {'has_paid_repayments': 1})

        # New Loan survives, Loan survives, a2 gone.
        self.assertTrue(self._adjustment_exists(new_loan_adj.id))
        self.assertTrue(Loan.objects.filter(id=loan.id).exists())
        self.assertFalse(self._adjustment_exists(self.a2.id))

    # --- cross-filter context -------------------------------------------------

    def test_team_worker_pairs_json_context_key(self):
        """Cross-filter map is a raw Python list of {team_id, worker_id}
        dicts. Django's |json_script filter handles serialisation at
        template render time (no double-encoding — see the 2026-04-23
        inline-filters regression test)."""
        self._login_admin()
        resp = self.client.get(self.url)
        pairs = resp.context['team_worker_pairs_json']
        self.assertIsInstance(pairs, list)
        for entry in pairs:
            self.assertIn('team_id', entry)
            self.assertIn('worker_id', entry)
        # Fixture: two teams (Alpha + Beta) with both workers in each.
        seen = {(entry['team_id'], entry['worker_id']) for entry in pairs}
        self.assertIn((self.team.id, self.w1.id), seen)
        self.assertIn((self.team.id, self.w2.id), seen)
        self.assertIn((self.team2.id, self.w1.id), seen)
        self.assertIn((self.team2.id, self.w2.id), seen)
|
||
|
||
|
||
# === Cache-bust token tests ===
# The deployment_timestamp context variable controls the ?v=... query
# string on our CSS URL. It MUST only change when custom.css changes
# — otherwise Cloudflare cache-HIT rate on the CSS drops to zero and
# every page reload re-fetches 64 KB from the VM.
|
||
|
||
class CacheBustTokenTests(TestCase):
    """Regression tests for the mtime-based cache-bust token."""

    def test_token_is_an_integer(self):
        """Token must be int (templates cast to str; float would show a dot)."""
        from core.context_processors import project_context

        ctx = project_context(request=None)
        self.assertIsInstance(ctx['deployment_timestamp'], int)

    def test_token_is_stable_across_two_calls(self):
        """Critical property: two back-to-back calls return the same
        token — because custom.css hasn't changed between them. This
        is the entire point of the mtime-based approach."""
        from core.context_processors import project_context

        t1 = project_context(request=None)['deployment_timestamp']
        t2 = project_context(request=None)['deployment_timestamp']
        self.assertEqual(t1, t2)

    def test_token_falls_back_if_file_missing(self):
        """If static/css/custom.css is somehow missing (fresh
        container pre-collectstatic), we must NOT crash. We fall back
        to int(time.time()) so every page still renders."""
        from unittest import mock

        import core.context_processors as cp

        # Point the module-level path constant at a guaranteed-missing file.
        # The function itself is not patched — only the path it reads.
        # patch.object puts back the value the module really held on exit,
        # even if an assertion fails, rather than a path rebuilt by hand
        # here that could drift from the module's own definition and leak
        # a different value into later tests.
        missing_css = cp.Path('/definitely/does/not/exist.css')
        with mock.patch.object(cp, '_CSS_PATH_FOR_TOKEN', missing_css):
            # Should return an int and NOT raise.
            token = cp._compute_cache_bust_token()
            self.assertIsInstance(token, int)
|
||
|
||
|
||
# === REGRESSION: adjustment project-attribution double-count ===
# Ticket summary: commit 61c485f replaced a per-project loop using SQL
# OR (`Q(project=P) | Q(work_log__project=P)`) with two SEPARATE
# filter+GROUP BY queries that were summed in Python. Any adjustment
# with BOTH `project_id` AND `work_log.project_id` set matched both
# queries and got counted TWICE.
#
# Every Overtime adjustment fits that shape — `price_overtime()` in
# views.py sets both FKs to the same project. So every unpaid
# Overtime silently inflated the outstanding-costs dashboard by its
# own amount. This test locks in the "count once" behaviour.
|
||
class PayrollDashboardAdjustmentAggregationTests(TestCase):
    """Regression tests for the per-project adjustment aggregation shown on
    the payroll dashboard: an adjustment carrying both `project` and
    `work_log.project` must add its amount to that project ONCE, not
    twice."""

    def setUp(self):
        """Logged-in admin, one active project with an empty work log, and
        one unpaid R 500 Overtime adjustment pointing at both of them."""
        # Staff + superuser so the payroll_dashboard view lets us in (the
        # is_admin helper returns True for is_staff OR is_superuser).
        self.admin = User.objects.create_user(
            username='double-count-admin',
            password='pw',
            is_staff=True,
            is_superuser=True,
        )
        self.client.force_login(self.admin)

        # One active project, one worker, one fresh work log.
        self.project = Project.objects.create(
            name='Double-Count Test Site',
            start_date=datetime.date(2026, 4, 1),
            active=True,
        )
        self.worker = Worker.objects.create(
            name='DC Worker',
            id_number='DC1',
            monthly_salary=Decimal('10000'),
        )
        # NOTE: the log deliberately has no workers — unpaid-log wage cost
        # must not pollute this test; only the adjustment contribution is
        # being measured.
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 23),
            project=self.project,
            supervisor=self.admin,
        )

        # THE KEY SHAPE: one unpaid adjustment with BOTH FKs set to the
        # same project. This mirrors how price_overtime() creates every
        # Overtime adjustment (adj.project = worklog.project, and
        # adj.work_log = worklog).
        overtime_fields = {
            'worker': self.worker,
            'type': 'Overtime',  # additive → contributes positively
            'amount': Decimal('500'),
            'project': self.project,
            'work_log': self.work_log,
            'date': datetime.date(2026, 4, 23),
            'description': 'Regression-test Overtime',
        }
        PayrollAdjustment.objects.create(**overtime_fields)

    def test_overtime_with_both_project_and_work_log_counts_once(self):
        """An unpaid Overtime with BOTH project + work_log.project set
        must contribute its R500 amount once to outstanding-costs —
        not R1000.

        Before the Coalesce fix, the batched aggregates summed the
        direct-project group + the work_log-project group separately
        in Python, so an Overtime adjustment landed in both and got
        double-counted. After the fix (Coalesce('project_id',
        'work_log__project_id')) each adjustment row is attributed to
        exactly one effective project.
        """
        response = self.client.get(reverse('payroll_dashboard'))
        self.assertEqual(response.status_code, 200)
        outstanding = response.context['outstanding_project_costs']

        # Locate our project's entry by name (the shape is
        # {'name': str, 'cost': Decimal} — there is no 'id' key).
        ours = None
        for entry in outstanding:
            if entry['name'] == self.project.name:
                ours = entry
                break

        self.assertIsNotNone(
            ours,
            "Test project should appear in outstanding_project_costs "
            "(its unpaid Overtime is non-zero).",
        )
        self.assertEqual(
            ours['cost'],
            Decimal('500'),
            "Overtime adjustment with both project + work_log.project "
            "FKs set must count ONCE (R500), not twice (R1000). If "
            "this fails with R1000 the project-attribution double-"
            "count bug has reappeared.",
        )
|
||
|
||
|
||
# ============================================================================
# === SITE REPORT TESTS ===
# Cover the SiteReport model + form + views (edit, detail) + the
# attendance-log redirect that lands the supervisor on the new report
# form. The model uses a JSONField for `metrics`, so we exercise the
# round-trip (save form -> JSON in DB -> form re-loads from JSON).
# ============================================================================
|
||
from core.models import SiteReport
|
||
from core.forms import SiteReportForm
|
||
|
||
|
||
class SiteReportModelTests(TestCase):
    """Sanity checks on the SiteReport model: the `metrics` default and the
    behaviour of the 1:1 reverse accessor on WorkLog."""

    def setUp(self):
        """One admin, one project and one work log with no report yet."""
        self.admin = User.objects.create_user(
            username='sr-admin', is_staff=True,
        )
        self.project = Project.objects.create(name='SR Project')
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.admin,
        )

    def test_metrics_defaults_to_empty_dict(self):
        """A fresh SiteReport created without metrics must default to an
        empty dict — never None — so template/view code can safely call
        .get('counts', {}) without an AttributeError."""
        fresh_report = SiteReport.objects.create(work_log=self.work_log)
        stored_metrics = fresh_report.metrics
        self.assertEqual(stored_metrics, {})
        self.assertIsNotNone(stored_metrics)

    def test_reverse_accessor_does_not_exist_when_absent(self):
        """A WorkLog with no SiteReport: accessing `work_log.site_report`
        raises DoesNotExist (1:1 reverse semantics). Templates/views
        must wrap this access in try/except or hasattr()."""
        with self.assertRaises(SiteReport.DoesNotExist):
            self.work_log.site_report  # noqa: B018 — the access itself is the test

    def test_reverse_accessor_works_when_present(self):
        """Once a SiteReport is created, work_log.site_report returns it."""
        created = SiteReport.objects.create(
            work_log=self.work_log, weather='sunny',
        )
        self.assertEqual(self.work_log.site_report, created)
        self.assertEqual(self.work_log.site_report.weather, 'sunny')

    def test_metrics_round_trip_with_arbitrary_keys(self):
        """JSONField accepts arbitrary keys — no schema is enforced at the
        DB layer, so future schema changes won't reject historic data."""
        payload = {
            'counts': {'plinths_cast': 8, 'old_metric_no_longer_in_schema': 3},
            'checks': {'steel_tied': True},
        }
        report = SiteReport.objects.create(
            work_log=self.work_log, metrics=payload,
        )
        # Re-read from the database so the stored JSON is what gets checked.
        report.refresh_from_db()
        counts = report.metrics['counts']
        self.assertEqual(counts['plinths_cast'], 8)
        self.assertEqual(counts['old_metric_no_longer_in_schema'], 3)
        self.assertTrue(report.metrics['checks']['steel_tied'])
|
||
|
||
|
||
class SiteReportFormTests(TestCase):
    """SiteReportForm: schema-driven dynamic fields, serialisation of
    those fields into `metrics`, validation, and pre-filling on edit."""

    def setUp(self):
        self.admin = User.objects.create_user(username='srf-admin', is_staff=True)
        self.project = Project.objects.create(name='SRF Project')
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.admin,
        )

    def test_form_dynamically_includes_count_and_check_fields(self):
        """Every metric listed in the schema has a matching form field
        named `count_<key>` or `check_<key>`."""
        from core.site_report_schema import COUNT_METRICS, CHECK_METRICS

        form = SiteReportForm(work_log=self.work_log)
        expected_names = [f"count_{metric['key']}" for metric in COUNT_METRICS]
        expected_names += [f"check_{metric['key']}" for metric in CHECK_METRICS]
        for field_name in expected_names:
            self.assertIn(field_name, form.fields)

    def test_form_save_serialises_metrics_into_json_blob(self):
        """Saving the form writes a `metrics` dict with 'counts' and
        'checks' keys; blank counts become 0 and unposted checks False."""
        posted = {
            'weather': 'sunny',
            'temperature_min': 18,
            'temperature_max': 28,
            'notes': 'Hot day, good progress',
            'count_plinths_cast': 8,
            'count_plinth_holes_dug': 12,
            # remaining count fields are omitted on purpose -> expect 0
            'check_steel_tied': 'on',  # a ticked checkbox posts 'on'
            # check_boxes_cleaned is omitted on purpose -> expect False
        }
        form = SiteReportForm(data=posted, work_log=self.work_log)
        self.assertTrue(form.is_valid(), msg=form.errors)

        saved = form.save()

        # Plain model fields.
        self.assertEqual(saved.weather, 'sunny')
        self.assertEqual(saved.temperature_min, 18)
        self.assertEqual(saved.temperature_max, 28)

        # Shape of the JSON blob.
        self.assertIn('counts', saved.metrics)
        self.assertIn('checks', saved.metrics)

        self.assertEqual(saved.metrics['counts']['plinths_cast'], 8)
        self.assertEqual(saved.metrics['counts']['plinth_holes_dug'], 12)
        # An omitted count is stored as 0 rather than left out.
        self.assertEqual(saved.metrics['counts']['boxes_placed'], 0)
        self.assertTrue(saved.metrics['checks']['steel_tied'])
        self.assertFalse(saved.metrics['checks']['boxes_cleaned'])

    def test_form_validates_temp_min_max_relationship(self):
        """A minimum temperature above the maximum is rejected with a
        form-level (non-field) error mentioning 'Min temperature'."""
        swapped = {
            'weather': '',
            'temperature_min': 35,
            'temperature_max': 18,
            'notes': '',
        }
        form = SiteReportForm(data=swapped, work_log=self.work_log)
        self.assertFalse(form.is_valid())

        matching = [err for err in form.non_field_errors() if 'Min temperature' in err]
        self.assertTrue(matching)

    def test_form_pre_fills_from_existing_metrics(self):
        """Binding the form to an existing report seeds the dynamic
        fields' initial values from the stored `metrics`."""
        stored = SiteReport.objects.create(
            work_log=self.work_log,
            weather='cloudy',
            metrics={'counts': {'plinths_cast': 5}, 'checks': {'town_run': True}},
        )
        form = SiteReportForm(instance=stored)
        self.assertEqual(form.fields['count_plinths_cast'].initial, 5)
        self.assertTrue(form.fields['check_town_run'].initial)
|
||
|
||
|
||
class SiteReportEditViewTests(TestCase):
    """The `site_report_edit` view: who may reach it, what a GET
    renders, and what a POST persists."""

    def setUp(self):
        # Three accounts: a staff admin, the supervisor attached to the
        # project under test, and a supervisor attached only to a
        # different project (expected to be refused).
        self.admin = User.objects.create_user(username='sre-admin', password='pw', is_staff=True)
        self.supervisor = User.objects.create_user(username='sre-sup', password='pw')
        self.outsider_supervisor = User.objects.create_user(username='sre-out', password='pw')

        self.project = Project.objects.create(name='SRE Project')
        self.project.supervisors.add(self.supervisor)

        self.other_project = Project.objects.create(name='Other Project')
        self.other_project.supervisors.add(self.outsider_supervisor)

        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.supervisor,
        )

    def _edit_url(self):
        """Return the edit URL for the fixture work log."""
        return reverse('site_report_edit', kwargs={'work_log_id': self.work_log.id})

    def test_admin_get_returns_blank_form_for_new_report(self):
        """With no report yet, an admin GET renders the create variant."""
        self.client.force_login(self.admin)
        response = self.client.get(self._edit_url())
        self.assertEqual(response.status_code, 200)
        # 'Log Today' is the heading shown on the create variant.
        self.assertContains(response, 'Log Today')

    def test_admin_post_creates_site_report(self):
        """A valid admin POST redirects and stores one report attributed
        to the posting user, with metrics parsed from the form."""
        self.client.force_login(self.admin)
        form_data = {
            'weather': 'rain',
            'temperature_min': '15',
            'temperature_max': '22',
            'notes': 'Wet day, stopped early',
            'count_plinths_cast': '4',
            'check_rain_delay': 'on',
        }
        response = self.client.post(self._edit_url(), form_data)

        self.assertEqual(response.status_code, 302)  # success -> redirect
        self.assertEqual(SiteReport.objects.count(), 1)

        stored = SiteReport.objects.first()
        self.assertEqual(stored.work_log_id, self.work_log.id)
        self.assertEqual(stored.weather, 'rain')
        self.assertEqual(stored.created_by_id, self.admin.id)
        self.assertEqual(stored.metrics['counts']['plinths_cast'], 4)
        self.assertTrue(stored.metrics['checks']['rain_delay'])

    def test_admin_post_updates_existing_report_without_changing_created_by(self):
        """Editing an existing report updates its fields but leaves
        `created_by` pointing at the original author."""
        # The report already exists and was authored by the supervisor.
        SiteReport.objects.create(
            work_log=self.work_log,
            weather='sunny',
            created_by=self.supervisor,
        )

        # The admin then edits it.
        self.client.force_login(self.admin)
        form_data = {
            'weather': 'cloudy',
            'temperature_min': '',
            'temperature_max': '',
            'notes': 'Admin edited',
        }
        response = self.client.post(self._edit_url(), form_data)
        self.assertEqual(response.status_code, 302)

        stored = SiteReport.objects.get(work_log=self.work_log)
        self.assertEqual(stored.weather, 'cloudy')
        # Authorship is unchanged by the edit.
        self.assertEqual(stored.created_by_id, self.supervisor.id)

    def test_project_supervisor_can_edit_their_own_projects_report(self):
        """A supervisor attached to the log's project can open the page."""
        self.client.force_login(self.supervisor)
        response = self.client.get(self._edit_url())
        self.assertEqual(response.status_code, 200)

    def test_outsider_supervisor_is_forbidden(self):
        """A supervisor attached only to another project gets HTTP 403."""
        self.client.force_login(self.outsider_supervisor)
        response = self.client.get(self._edit_url())
        self.assertEqual(response.status_code, 403)
|
||
|
||
|
||
class SiteReportDetailViewTests(TestCase):
    """The read-only `site_report_detail` page."""

    def setUp(self):
        self.admin = User.objects.create_user(username='srd-admin', password='pw', is_staff=True)
        self.project = Project.objects.create(name='SRD Project')
        self.work_log = WorkLog.objects.create(
            date=datetime.date(2026, 4, 27),
            project=self.project,
            supervisor=self.admin,
        )

    def test_404_when_no_report_exists(self):
        """The detail page is for VIEWING reports — if there isn't one,
        404 (the supervisor should use the edit URL to create one)."""
        self.client.force_login(self.admin)
        url = reverse('site_report_detail', kwargs={'work_log_id': self.work_log.id})
        response = self.client.get(url)
        self.assertEqual(response.status_code, 404)

    def test_displays_report_data_when_present(self):
        """Notes, the weather label and a metric count all reach the
        rendered HTML."""
        # Use a count that cannot be satisfied by unrelated page content.
        # A single digit such as 7 already occurs in the work-log date
        # (2026-04-27), which would make the assertion pass even if the
        # count were never rendered.
        distinctive_count = 137
        SiteReport.objects.create(
            work_log=self.work_log,
            weather='sunny',
            temperature_min=18,
            temperature_max=32,
            notes='Brief but visible note',
            metrics={
                'counts': {'plinths_cast': distinctive_count},
                'checks': {'steel_tied': True},
            },
        )
        self.client.force_login(self.admin)
        url = reverse('site_report_detail', kwargs={'work_log_id': self.work_log.id})
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        # Spot-check that the values reach the rendered HTML
        self.assertContains(response, 'Brief but visible note')
        self.assertContains(response, 'Sunny')
        # The count appears as a number on the page
        self.assertContains(response, str(distinctive_count))
|
||
|
||
|
||
class AttendanceLogRedirectsToSiteReportTests(TestCase):
    """Two-step flow: a successful attendance submission redirects to
    the site-report edit page instead of home."""

    def setUp(self):
        self.admin = User.objects.create_user(username='atrr-admin', password='pw', is_staff=True)
        self.project = Project.objects.create(name='AT Project')
        self.worker = Worker.objects.create(
            name='Test Worker',
            id_number='AT1',
            monthly_salary=Decimal('10000'),
        )

    def test_successful_attendance_post_redirects_to_site_report_edit(self):
        """A valid attendance POST answers 302 with a Location under
        '/site-report/' ending in '/edit/', and creates one WorkLog."""
        self.client.force_login(self.admin)
        attendance = {
            'date': '2026-04-27',
            'project': self.project.id,
            'workers': [self.worker.id],
            'overtime_amount': '0.00',
            'notes': '',
        }
        response = self.client.post(reverse('attendance_log'), attendance)
        self.assertEqual(response.status_code, 302)

        # Check the shape of the redirect target rather than an exact URL.
        self.assertIn('/site-report/', response.url)
        self.assertTrue(
            response.url.endswith('/edit/'),
            msg=f"Expected redirect to .../edit/, got {response.url}",
        )

        # The submission produced exactly one work log.
        self.assertEqual(WorkLog.objects.count(), 1)
|
||
|
||
|
||
# ====================================================================
|
||
# === Worker Absence — Phase 1: Model layer ==========================
|
||
# ====================================================================
|
||
|
||
from datetime import date as _date
|
||
from django.db import IntegrityError
|
||
|
||
|
||
class AbsenceModelTests(TestCase):
    """Per-worker dated absence records. Mirrors WorkerWarning shape."""

    @classmethod
    def setUpTestData(cls):
        # One worker shared by every test in this class.
        cls.worker = Worker.objects.create(
            name='Joe Mokoena', id_number='8001015800086',
            monthly_salary=Decimal('6000.00'),
        )

    def test_defaults(self):
        """is_paid defaults to False; payroll_adjustment is null; date defaults to today."""
        a = Absence.objects.create(worker=self.worker, reason='sick')
        # Re-fetch from DB so the DateField default (timezone.now) is coerced
        # from datetime → date by the DB layer (mirrors WorkerWarning pattern).
        a.refresh_from_db()
        self.assertFalse(a.is_paid)
        self.assertIsNone(a.payroll_adjustment)
        self.assertEqual(a.date, _date.today())

    def test_unique_per_worker_per_day(self):
        """Cannot have two absences for the same worker on the same day."""
        from django.db import transaction

        d = _date(2026, 5, 14)
        Absence.objects.create(worker=self.worker, date=d, reason='sick')
        # TestCase wraps each test in a transaction. Running the failing
        # INSERT inside its own atomic() block (a savepoint) means only
        # that savepoint is rolled back, so the connection stays usable
        # for the follow-up query below.
        with self.assertRaises(IntegrityError), transaction.atomic():
            Absence.objects.create(worker=self.worker, date=d, reason='family')
        # The rejected insert left the original row as the only one.
        self.assertEqual(
            Absence.objects.filter(worker=self.worker, date=d).count(), 1,
        )

    def test_ordering_newest_first(self):
        """Default queryset order is -date."""
        Absence.objects.create(worker=self.worker, date=_date(2026, 5, 10), reason='sick')
        Absence.objects.create(worker=self.worker, date=_date(2026, 5, 15), reason='annual')
        Absence.objects.create(worker=self.worker, date=_date(2026, 5, 12), reason='other')
        dates = list(Absence.objects.values_list('date', flat=True))
        self.assertEqual(dates, [_date(2026, 5, 15), _date(2026, 5, 12), _date(2026, 5, 10)])

    def test_reverse_accessor_on_worker(self):
        """worker.absences.all() works (related_name='absences')."""
        Absence.objects.create(worker=self.worker, date=_date(2026, 5, 1), reason='sick')
        Absence.objects.create(worker=self.worker, date=_date(2026, 5, 2), reason='family')
        self.assertEqual(self.worker.absences.count(), 2)
|
||
|
||
|
||
# =============================================================================
|
||
# === ABSENCE HELPER TESTS (Task 2) ===
|
||
# Tests for the two shared helpers in core/views.py that every Absence
|
||
# save/edit/delete path will route through:
|
||
# - _sync_absence_payroll_adjustment: keeps is_paid in sync with the
|
||
# linked Bonus PayrollAdjustment (create / delete / refuse-if-paid).
|
||
# - _absence_user_queryset: scopes visibility by admin vs supervisor.
|
||
# =============================================================================
|
||
|
||
|
||
class AbsencePayrollSyncTests(TestCase):
    """Checks that `_sync_absence_payroll_adjustment` keeps an absence's
    linked PayrollAdjustment consistent with its `is_paid` flag:
    create when paid, delete when unpaid, refuse once already paid out."""

    @classmethod
    def setUpTestData(cls):
        cls.user = User.objects.create_user(username='admin', is_staff=True)
        cls.worker = Worker.objects.create(
            name='Sipho Dlamini', id_number='8501015800087',
            monthly_salary=Decimal('6000.00'),
        )
        cls.project = Project.objects.create(name='Solar Farm Alpha')

    def _make_absence(self, **kw):
        """Create an Absence for the fixture worker; `kw` overrides the
        default field values."""
        fields = {
            'worker': self.worker,
            'date': _date(2026, 5, 14),
            'reason': 'sick',
            'logged_by': self.user,
            'is_paid': False,
            **kw,
        }
        return Absence.objects.create(**fields)

    @staticmethod
    def _sync(absence):
        """Import the helper under test at call time and run it on `absence`."""
        from core.views import _sync_absence_payroll_adjustment
        return _sync_absence_payroll_adjustment(absence)

    def test_is_paid_false_no_adjustment(self):
        """Syncing an unpaid absence creates no adjustment."""
        absence = self._make_absence(is_paid=False)
        self._sync(absence)
        self.assertIsNone(absence.payroll_adjustment)
        self.assertEqual(PayrollAdjustment.objects.count(), 0)

    def test_is_paid_true_creates_bonus(self):
        """Syncing a paid absence links a 'Bonus' adjustment for the
        worker, dated like the absence, worth one daily rate."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        absence.refresh_from_db()

        self.assertIsNotNone(absence.payroll_adjustment)
        bonus = absence.payroll_adjustment
        self.assertEqual(bonus.type, 'Bonus')
        # Expected daily rate: 6000 monthly salary / 20 = 300.
        self.assertEqual(bonus.amount, Decimal('300.00'))
        self.assertEqual(bonus.worker, self.worker)
        self.assertEqual(bonus.date, _date(2026, 5, 14))

    def test_toggle_paid_off_deletes_adjustment(self):
        """Flipping is_paid True -> False removes the linked adjustment."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        self.assertEqual(PayrollAdjustment.objects.count(), 1)

        absence.is_paid = False
        absence.save()
        self._sync(absence)
        absence.refresh_from_db()

        self.assertIsNone(absence.payroll_adjustment)
        self.assertEqual(PayrollAdjustment.objects.count(), 0)

    def test_toggle_paid_on_creates_fresh(self):
        """Flipping is_paid False -> True links a new adjustment."""
        absence = self._make_absence(is_paid=False)
        self._sync(absence)
        self.assertIsNone(absence.payroll_adjustment)

        absence.is_paid = True
        absence.save()
        self._sync(absence)
        absence.refresh_from_db()

        self.assertIsNotNone(absence.payroll_adjustment)

    def test_paid_with_existing_adj_is_idempotent(self):
        """Re-syncing a paid absence that already has an adjustment
        keeps that adjustment, including a manually edited amount."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        absence.refresh_from_db()
        first_adj = absence.payroll_adjustment
        self.assertIsNotNone(first_adj)

        # Simulate a manual amount edit (e.g. half-day pay).
        first_adj.amount = Decimal('150.00')
        first_adj.save()

        # The second sync must not replace or reset the adjustment.
        self._sync(absence)
        absence.refresh_from_db()

        self.assertEqual(absence.payroll_adjustment_id, first_adj.id)
        self.assertEqual(absence.payroll_adjustment.amount, Decimal('150.00'))
        self.assertEqual(PayrollAdjustment.objects.count(), 1)

    def test_refuses_if_adjustment_already_paid(self):
        """Once the adjustment is attached to a PayrollRecord, switching
        the absence to unpaid makes the sync raise ValueError and the
        adjustment is kept."""
        absence = self._make_absence(is_paid=True)
        self._sync(absence)
        absence.refresh_from_db()

        # Mark the adjustment as paid out by attaching a PayrollRecord.
        record = PayrollRecord.objects.create(
            worker=self.worker,
            amount_paid=Decimal('300.00'),
            date=_date(2026, 5, 30),
        )
        paid_adj = absence.payroll_adjustment
        paid_adj.payroll_record = record
        paid_adj.save()

        absence.is_paid = False
        absence.save()
        with self.assertRaises(ValueError):
            self._sync(absence)

        # The adjustment was not deleted.
        self.assertEqual(PayrollAdjustment.objects.count(), 1)
|
||
|
||
|
||
class AbsenceUserQuerysetTests(TestCase):
    """Visibility rules of `_absence_user_queryset`: staff see every
    absence, a team supervisor sees only their team's workers, and a
    user with no team sees nothing."""

    @classmethod
    def setUpTestData(cls):
        # Users: one staff admin, two team supervisors, one unattached user.
        cls.admin = User.objects.create_user(username='admin', is_staff=True)
        cls.sup_a = User.objects.create_user(username='sup_a')
        cls.sup_b = User.objects.create_user(username='sup_b')
        cls.outsider = User.objects.create_user(username='out')

        cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
        cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))

        # One team per supervisor, one worker per team.
        cls.team_a = Team.objects.create(name='TA', supervisor=cls.sup_a)
        cls.team_a.workers.add(cls.worker_a)
        cls.team_b = Team.objects.create(name='TB', supervisor=cls.sup_b)
        cls.team_b.workers.add(cls.worker_b)

        # One absence per worker, both on the same day.
        Absence.objects.create(worker=cls.worker_a, date=_date(2026, 5, 1), reason='sick')
        Absence.objects.create(worker=cls.worker_b, date=_date(2026, 5, 1), reason='annual')

    @staticmethod
    def _visible_to(user):
        """Return the absence queryset the helper under test yields for `user`."""
        from core.views import _absence_user_queryset
        return _absence_user_queryset(user)

    def test_admin_sees_all(self):
        """The staff user gets both absences."""
        self.assertEqual(self._visible_to(self.admin).count(), 2)

    def test_supervisor_sees_only_own_team(self):
        """Supervisor A gets exactly the absence of worker A."""
        visible = self._visible_to(self.sup_a)
        self.assertEqual(visible.count(), 1)
        self.assertEqual(visible.first().worker, self.worker_a)

    def test_outsider_sees_none(self):
        """A user who supervises no team gets an empty queryset."""
        self.assertEqual(self._visible_to(self.outsider).count(), 0)
|
||
|
||
|
||
# =============================================================================
|
||
# === ABSENCE FORM TESTS (Task 3) ===
|
||
# Tests for the three form classes added in core/forms.py:
|
||
# - AbsenceLogForm: standalone /absences/log/ with date-range + multi-worker
|
||
# - AbsenceQuickForm: minimal modal form on /attendance/log/
|
||
# - AbsenceEditForm: edit one existing absence
|
||
# =============================================================================
|
||
|
||
|
||
class AbsenceFormTests(TestCase):
    """Validation and date-range expansion of AbsenceLogForm, plus the
    worker scoping and uniqueness error placement of AbsenceEditForm."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', is_staff=True)
        cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
        cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))
        cls.project = Project.objects.create(name='P1')
        cls.team = Team.objects.create(name='T', supervisor=cls.admin)
        cls.team.workers.add(cls.worker_a, cls.worker_b)

    def test_single_date_submission(self):
        """One worker + one date expands to a single (worker, date) pair."""
        from core.forms import AbsenceLogForm
        form = AbsenceLogForm(
            data={
                'date': '2026-05-14',
                'reason': 'sick',
                'is_paid': False,
                'notes': 'flu',
                'team': self.team.id,
                'workers': [self.worker_a.id],
            },
            user=self.admin,
        )
        self.assertTrue(form.is_valid(), msg=form.errors)
        # cleaned data has expanded (worker, date) tuples
        tuples = form.expanded_pairs()
        self.assertEqual(len(tuples), 1)
        self.assertEqual(tuples[0], (self.worker_a, _date(2026, 5, 14)))

    def test_range_expansion_skips_weekends_by_default(self):
        """Without the weekend toggles, Saturday and Sunday are left out
        of the expanded range."""
        from core.forms import AbsenceLogForm
        # Thursday → Monday (5 days, Sat+Sun skipped → 3 days)
        form = AbsenceLogForm(
            data={
                'date': '2026-05-14',       # Thursday
                'end_date': '2026-05-18',   # Monday
                'reason': 'annual',
                'workers': [self.worker_a.id],
            },
            user=self.admin,
        )
        # Surface the form errors on failure, as the other validity checks do.
        self.assertTrue(form.is_valid(), msg=form.errors)
        dates = [d for _w, d in form.expanded_pairs()]
        # 14 Thu, 15 Fri, 18 Mon (16 Sat / 17 Sun skipped)
        self.assertEqual(dates, [_date(2026, 5, 14), _date(2026, 5, 15), _date(2026, 5, 18)])

    def test_range_expansion_with_weekend_toggles(self):
        """With both weekend toggles on, every day in the range is kept."""
        from core.forms import AbsenceLogForm
        form = AbsenceLogForm(
            data={
                'date': '2026-05-14',
                'end_date': '2026-05-17',   # Thursday → Sunday
                'reason': 'sick',
                'include_saturday': True,
                'include_sunday': True,
                'workers': [self.worker_a.id],
            },
            user=self.admin,
        )
        self.assertTrue(form.is_valid(), msg=form.errors)
        dates = [d for _w, d in form.expanded_pairs()]
        self.assertEqual(len(dates), 4)  # all 4 days

    def test_end_date_before_start_rejected(self):
        """An end date earlier than the start date is an 'end_date' error."""
        from core.forms import AbsenceLogForm
        form = AbsenceLogForm(
            data={
                'date': '2026-05-15',
                'end_date': '2026-05-10',
                'reason': 'sick',
                'workers': [self.worker_a.id],
            },
            user=self.admin,
        )
        self.assertFalse(form.is_valid())
        self.assertIn('end_date', form.errors)

    def test_duplicate_absence_rejected(self):
        """If absence(worker, date) already exists, form is invalid."""
        from core.forms import AbsenceLogForm
        Absence.objects.create(worker=self.worker_a, date=_date(2026, 5, 14), reason='sick')
        form = AbsenceLogForm(
            data={
                'date': '2026-05-14',
                'reason': 'family',
                'workers': [self.worker_a.id],
            },
            user=self.admin,
        )
        self.assertFalse(form.is_valid())
        # The rendered errors mention that an absence already exists.
        self.assertIn('already', str(form.errors).lower())

    def test_worklog_conflict_flagged_not_blocking(self):
        """If worker has a WorkLog on the absence date, form is still valid
        but conflicting_worklogs() returns the conflict rows."""
        from core.forms import AbsenceLogForm
        wl = WorkLog.objects.create(
            date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
        )
        wl.workers.add(self.worker_a)
        form = AbsenceLogForm(
            data={
                'date': '2026-05-14',
                'reason': 'sick',
                'workers': [self.worker_a.id],
            },
            user=self.admin,
        )
        # Conflicts are warnings, not errors
        self.assertTrue(form.is_valid(), msg=form.errors)
        conflicts = form.conflicting_worklogs()
        self.assertEqual(len(conflicts), 1)
        self.assertEqual(conflicts[0]['worker_id'], self.worker_a.id)
        self.assertEqual(conflicts[0]['work_log_id'], wl.id)

    def test_edit_form_supervisor_scope(self):
        """AbsenceEditForm with user=supervisor only shows workers in
        supervisor's active teams. Other-team workers are not in the
        dropdown queryset."""
        from core.forms import AbsenceEditForm

        # Create a supervisor with their own team
        sup = User.objects.create_user(username='sup_edit', password='pw')
        sup_team = Team.objects.create(name='SupTeam', supervisor=sup)
        sup_worker = Worker.objects.create(
            name='Sup Worker', id_number='99', monthly_salary=Decimal('6000'),
        )
        sup_team.workers.add(sup_worker)

        # worker_b is on a team supervised by self.admin, not by sup
        abs1 = Absence.objects.create(worker=sup_worker, date=_date(2026, 6, 1), reason='sick')
        form = AbsenceEditForm(instance=abs1, user=sup)
        worker_ids = list(form.fields['worker'].queryset.values_list('id', flat=True))
        self.assertIn(sup_worker.id, worker_ids)
        self.assertNotIn(self.worker_b.id, worker_ids)

    def test_edit_form_uniqueness_uses_date_error(self):
        """Editing an absence onto a (worker, date) that another absence
        already occupies puts the error on the 'date' field."""
        from core.forms import AbsenceEditForm
        # Existing absence on (worker_a, 2026-06-15); only the row matters,
        # so the instance is not bound to a local name.
        Absence.objects.create(worker=self.worker_a, date=_date(2026, 6, 15), reason='sick')
        # Try to edit a DIFFERENT absence so that it clashes with that row
        abs2 = Absence.objects.create(worker=self.worker_a, date=_date(2026, 6, 20), reason='annual')
        form = AbsenceEditForm(
            instance=abs2,
            data={
                'worker': self.worker_a.id,
                'date': '2026-06-15',  # clash
                'reason': 'family',
                'notes': '',
            },
        )
        self.assertFalse(form.is_valid())
        # Error must be on the 'date' field, not non-field
        self.assertIn('date', form.errors)
        self.assertIn('already', str(form.errors['date']).lower())
|
||
|
||
|
||
# === ABSENCE LOG + CONFIRM VIEW TESTS (Task 4) ===
|
||
# GET shows the form. POST without conflicts creates Absence rows
|
||
# immediately. POST with conflicts stashes pending data in the session
|
||
# and redirects to /absences/log/confirm/ where the admin can opt to
|
||
# also remove the conflicting WorkLog entries before committing.
|
||
|
||
class AbsenceLogViewTests(TestCase):
    """The /absences/log/ view: GET renders the form; a conflict-free
    POST creates absences straight away; a POST that clashes with a
    WorkLog stashes the request in the session and redirects to
    /absences/log/confirm/."""

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        cls.project = Project.objects.create(name='P')
        cls.team = Team.objects.create(name='T', supervisor=cls.admin)
        cls.team.workers.add(cls.worker)

    def setUp(self):
        # Every test starts logged in as the admin; individual tests
        # switch user where they need to.
        self.client.force_login(self.admin)

    def test_get_returns_200(self):
        """The form page loads for the admin."""
        response = self.client.get('/absences/log/')
        self.assertEqual(response.status_code, 200)

    def test_post_creates_absences_when_no_conflict(self):
        """A clean POST stores one unpaid absence logged by the poster
        and redirects to the absence list."""
        submitted = {
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker.id],
            'notes': 'flu',
        }
        response = self.client.post('/absences/log/', data=submitted)

        self.assertEqual(Absence.objects.count(), 1)
        stored = Absence.objects.first()
        self.assertEqual(stored.worker, self.worker)
        self.assertEqual(stored.reason, 'sick')
        self.assertFalse(stored.is_paid)
        self.assertEqual(stored.logged_by, self.admin)
        self.assertRedirects(response, '/absences/', fetch_redirect_response=False)

    def test_post_with_paid_creates_adjustment(self):
        """Ticking is_paid stores a paid absence linked to a 'Bonus'
        payroll adjustment."""
        submitted = {
            'date': '2026-05-14',
            'reason': 'sick',
            'is_paid': 'on',
            'workers': [self.worker.id],
        }
        self.client.post('/absences/log/', data=submitted)

        stored = Absence.objects.first()
        self.assertTrue(stored.is_paid)
        self.assertIsNotNone(stored.payroll_adjustment)
        self.assertEqual(stored.payroll_adjustment.type, 'Bonus')

    def test_post_with_worklog_conflict_redirects_to_confirm(self):
        """When the worker already has a WorkLog on that date, nothing
        is created yet; the request is parked in the session and the
        user is sent to the confirm page."""
        clashing_log = WorkLog.objects.create(
            date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
        )
        clashing_log.workers.add(self.worker)

        submitted = {
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker.id],
        }
        response = self.client.post('/absences/log/', data=submitted)

        self.assertEqual(Absence.objects.count(), 0)  # deferred until confirmed
        self.assertRedirects(response, '/absences/log/confirm/', fetch_redirect_response=False)
        # The pending request was stored in the session.
        self.assertIn('absence_pending', self.client.session)

    def test_supervisor_can_post(self):
        """A non-staff user who supervises the worker's team can log an
        absence."""
        supervisor = User.objects.create_user(username='sup', password='pw')
        supervised_team = Team.objects.create(name='ST', supervisor=supervisor)
        supervised_team.workers.add(self.worker)
        self.client.force_login(supervisor)

        submitted = {
            'date': '2026-05-14',
            'reason': 'sick',
            'workers': [self.worker.id],
        }
        self.client.post('/absences/log/', data=submitted)
        self.assertEqual(Absence.objects.count(), 1)

    def test_outsider_gets_403(self):
        """A user who is neither staff nor a team supervisor is refused."""
        stranger = User.objects.create_user(username='out', password='pw')
        self.client.force_login(stranger)
        response = self.client.get('/absences/log/')
        self.assertEqual(response.status_code, 403)
|
||
|
||
|
||
class AbsenceConfirmViewTests(TestCase):
|
||
@classmethod
def setUpTestData(cls):
    """Create the class-wide fixtures: a staff admin, one worker and
    one project."""
    cls.admin = User.objects.create_user(
        username='admin', password='pw', is_staff=True,
    )
    cls.worker = Worker.objects.create(
        name='W', id_number='1', monthly_salary=Decimal('6000'),
    )
    cls.project = Project.objects.create(name='P')
|
||
|
||
def setUp(self):
    """Log in as admin, create the conflicting WorkLog, and seed the
    session with the pending-absence payload the confirm view reads."""
    self.client.force_login(self.admin)

    # The work log that clashes with the pending absence.
    self.wl = WorkLog.objects.create(
        date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
    )
    self.wl.workers.add(self.worker)

    session = self.client.session
    conflict_row = {
        'worker_id': self.worker.id, 'worker_name': 'W',
        'date': '2026-05-14', 'work_log_id': self.wl.id,
        'project_name': 'P',
    }
    session['absence_pending'] = {
        'pairs': [[self.worker.id, '2026-05-14']],
        'reason': 'sick',
        'is_paid': False,
        'notes': 'flu',
        'conflicts': [conflict_row],
    }
    # The test client's session must be saved explicitly to persist.
    session.save()
|
||
|
||
def test_get_without_session_redirects_back(self):
    """With no pending data in the session, the confirm page redirects
    back to the log form."""
    # Remove the payload that setUp stashed.
    emptied = self.client.session
    emptied.pop('absence_pending', None)
    emptied.save()

    response = self.client.get('/absences/log/confirm/')
    self.assertRedirects(response, '/absences/log/', fetch_redirect_response=False)
|
||
|
||
def test_get_with_session_shows_warning(self):
    """With pending data present, the confirm page renders (200) and
    its body contains the word 'already'."""
    response = self.client.get('/absences/log/confirm/')
    self.assertEqual(response.status_code, 200)
    # 'already' is expected in the conflict warning text.
    self.assertContains(response, 'already')
|
||
|
||
def test_post_creates_absences_and_removes_from_worklog(self):
    """Ticking the removal box for the shown conflict creates the
    absence, takes the worker off the work log, redirects to the
    list, and clears the pending session data."""
    removal_key = f'remove_from_worklog_{self.wl.id}_{self.worker.id}'
    response = self.client.post('/absences/log/confirm/', data={removal_key: 'on'})

    self.assertEqual(Absence.objects.count(), 1)
    self.assertNotIn(self.worker, self.wl.workers.all())
    self.assertRedirects(response, '/absences/', fetch_redirect_response=False)
    self.assertNotIn('absence_pending', self.client.session)
|
||
|
||
def test_post_without_removal_still_creates_absences(self):
|
||
resp = self.client.post('/absences/log/confirm/', data={})
|
||
self.assertEqual(Absence.objects.count(), 1)
|
||
# Worker still in WorkLog because admin didn't tick the box
|
||
self.assertIn(self.worker, self.wl.workers.all())
|
||
|
||
def test_post_drops_unauthorized_removal_keys(self):
|
||
"""SECURITY: a POST key referencing a WorkLog that wasn't in the
|
||
stashed conflicts list must be silently ignored. The confirm page
|
||
is a strict accept-or-reject gate against what was shown."""
|
||
# Create an UNRELATED WorkLog with a worker on it
|
||
other_worker = Worker.objects.create(
|
||
name='Other Worker', id_number='99', monthly_salary=Decimal('6000'),
|
||
)
|
||
other_wl = WorkLog.objects.create(
|
||
date=_date(2026, 5, 14), project=self.project, supervisor=self.admin,
|
||
)
|
||
other_wl.workers.add(other_worker)
|
||
|
||
# Confirm POST tries to remove other_worker from other_wl
|
||
# (this pair is NOT in the stashed conflicts — only (self.wl.id, self.worker.id) is)
|
||
resp = self.client.post('/absences/log/confirm/', data={
|
||
f'remove_from_worklog_{other_wl.id}_{other_worker.id}': 'on',
|
||
})
|
||
|
||
# The unauthorized removal must NOT have happened
|
||
self.assertIn(other_worker, other_wl.workers.all())
|
||
# The legitimate absence creation still happens
|
||
self.assertEqual(Absence.objects.count(), 1)
|
||
self.assertRedirects(resp, '/absences/', fetch_redirect_response=False)
|
||
|
||
def test_post_with_malformed_removal_key_is_dropped(self):
|
||
"""Malformed POST keys like remove_from_worklog_abc_5 must not crash
|
||
the view — they're silently skipped."""
|
||
resp = self.client.post('/absences/log/confirm/', data={
|
||
'remove_from_worklog_abc_5': 'on', # non-numeric wl_id
|
||
'remove_from_worklog_42_xyz': 'on', # non-numeric worker_id
|
||
'remove_from_worklog_only_three': 'on', # not enough parts
|
||
})
|
||
self.assertEqual(Absence.objects.count(), 1) # Still creates absence
|
||
self.assertEqual(resp.status_code, 302) # No 500
|
||
|
||
|
||
# === ABSENCE LIST / EDIT / DELETE / EXPORT VIEW TESTS ============================
# Covers Task 5 of the Worker Absences feature: browsing absences via /absences/
# with filters and pagination; editing a single absence and the paid-flag side
# effect; deleting absences with cascade rules; admin CSV export.
|
||
|
||
class AbsenceListViewTests(TestCase):
    """Visibility and filtering behaviour of the /absences/ list page.

    Fixture: worker WA sits on a team supervised by ``sup`` and has a sick
    absence; worker WB is on no team and has an annual-leave absence.
    """

    LIST_URL = '/absences/'

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.sup = User.objects.create_user(username='sup', password='pw')
        cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
        cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))

        # Only WA is attached to the supervisor's team.
        cls.team_a = Team.objects.create(name='TA', supervisor=cls.sup)
        cls.team_a.workers.add(cls.worker_a)

        # One absence each, same day, different reasons.
        seeded = ((cls.worker_a, 'sick'), (cls.worker_b, 'annual'))
        for absent_worker, why in seeded:
            Absence.objects.create(worker=absent_worker, date=_date(2026, 5, 1), reason=why)

    def _list_as(self, user, query=''):
        """Log in as *user* and GET the list page with an optional query string."""
        self.client.force_login(user)
        return self.client.get(self.LIST_URL + query)

    def test_admin_sees_all(self):
        """Staff users get both workers' absences."""
        resp = self._list_as(self.admin)

        for worker_name in ('WA', 'WB'):
            self.assertContains(resp, worker_name)

    def test_supervisor_only_sees_own(self):
        """A supervisor gets only the absences of workers on their own team."""
        resp = self._list_as(self.sup)

        self.assertContains(resp, 'WA')
        self.assertNotContains(resp, 'WB')

    def test_filter_by_reason(self):
        """?reason=sick leaves only sick-absence workers in the table rows.

        WB has annual leave, so WB must not appear in any table row. WB may
        still appear in the worker filter dropdown: that option list is
        intentionally NOT narrowed by the active filter, so users can pivot
        between filters. Hence the assertions target ``<td>`` cells rather
        than the bare name.
        """
        resp = self._list_as(self.admin, '?reason=sick')

        self.assertContains(resp, '<td>WA</td>')
        self.assertNotContains(resp, '<td>WB</td>')

    def test_malformed_date_param_does_not_crash(self):
        """SECURITY: garbage date params must not produce a 500.

        parse_date() returns None on invalid input, and filters whose value
        parsed to None get skipped.
        """
        resp = self._list_as(self.admin, '?date_from=not-a-date&date_to=also-bad')

        self.assertEqual(resp.status_code, 200)
|
||
|
||
|
||
class AbsenceEditDeleteTests(TestCase):
    """Edit and delete views for a single absence, including adjustment side effects.

    Fixture: one staff user, one worker and one unpaid sick absence on
    2026-05-14. Every test starts logged in as the staff user.

    The repeated "mark paid + sync adjustment" and "settle adjustment"
    preambles live in private helpers so each test body shows only the
    request under test and its assertions.
    """

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        cls.absence = Absence.objects.create(worker=cls.worker, date=_date(2026, 5, 14), reason='sick')

    def setUp(self):
        self.client.force_login(self.admin)

    # --- helpers -------------------------------------------------------

    def _make_absence_paid(self):
        """Flag the fixture absence as paid and return its linked adjustment.

        Runs the project's sync helper directly instead of going through a
        view; callers rely on ``absence.payroll_adjustment`` being populated
        once the row has been reloaded.
        """
        # Local import, as in the original tests, so the file's import block
        # is left untouched.
        from core.views import _sync_absence_payroll_adjustment

        self.absence.is_paid = True
        self.absence.save()
        _sync_absence_payroll_adjustment(self.absence)
        self.absence.refresh_from_db()
        return self.absence.payroll_adjustment

    def _settle_adjustment(self, adjustment):
        """Attach *adjustment* to a new PayrollRecord, i.e. mark it as paid out."""
        record = PayrollRecord.objects.create(
            worker=self.worker,
            amount_paid=Decimal('300'),
            date=_date(2026, 5, 30),
        )
        adjustment.payroll_record = record
        adjustment.save()

    def _post_edit(self, *, is_paid):
        """POST the edit form for the fixture absence and return the response.

        An unticked checkbox is modelled the way a browser sends it: the
        ``is_paid`` key is simply absent from the payload.
        """
        form_data = {
            'worker': self.worker.id,
            'date': '2026-05-14',
            'reason': 'sick',
            'notes': '',
        }
        if is_paid:
            form_data['is_paid'] = 'on'
        return self.client.post(f'/absences/{self.absence.id}/edit/', data=form_data)

    def _post_delete(self):
        """POST to the delete endpoint for the fixture absence."""
        return self.client.post(f'/absences/{self.absence.id}/delete/')

    # --- edit ----------------------------------------------------------

    def test_edit_toggling_paid_creates_adjustment(self):
        """Ticking is_paid on save flags the absence and links an adjustment."""
        self._post_edit(is_paid=True)

        self.absence.refresh_from_db()
        self.assertTrue(self.absence.is_paid)
        self.assertIsNotNone(self.absence.payroll_adjustment)

    def test_edit_untoggling_paid_deletes_adjustment(self):
        """Unticking is_paid removes a not-yet-settled adjustment."""
        adj_id = self._make_absence_paid().id

        self._post_edit(is_paid=False)

        self.assertFalse(PayrollAdjustment.objects.filter(id=adj_id).exists())

    def test_edit_refuses_to_untoggle_paid_adjustment(self):
        """SECURITY/CORRECTNESS: unticking is_paid after payout must roll back.

        The absence stays is_paid=True and the adjustment stays linked. Per
        the original test's description an error is also surfaced, but how
        it is surfaced is not asserted here.
        """
        adjustment = self._make_absence_paid()
        adj_id = adjustment.id
        self._settle_adjustment(adjustment)

        # Admin tries to untick is_paid.
        self._post_edit(is_paid=False)

        # Nothing about the absence/adjustment pair may have changed.
        self.absence.refresh_from_db()
        self.assertTrue(self.absence.is_paid)
        self.assertEqual(self.absence.payroll_adjustment_id, adj_id)
        self.assertTrue(PayrollAdjustment.objects.filter(id=adj_id).exists())

    def test_supervisor_cannot_edit_other_team_absence(self):
        """A non-staff user who does not supervise the worker's team gets a 404."""
        sup = User.objects.create_user(username='sup', password='pw')
        # sup supervises no team at all, so the worker is out of their reach.
        self.client.force_login(sup)

        resp = self.client.get(f'/absences/{self.absence.id}/edit/')

        self.assertEqual(resp.status_code, 404)

    # --- delete --------------------------------------------------------

    def test_delete_cascade_unpaid_adjustment(self):
        """Deleting an absence also removes its unsettled adjustment."""
        adj_id = self._make_absence_paid().id

        self._post_delete()

        self.assertFalse(Absence.objects.filter(id=self.absence.id).exists())
        self.assertFalse(PayrollAdjustment.objects.filter(id=adj_id).exists())

    def test_delete_refuses_when_adjustment_paid(self):
        """Deleting is refused once the linked adjustment has been paid out."""
        self._settle_adjustment(self._make_absence_paid())

        self._post_delete()

        self.assertTrue(Absence.objects.filter(id=self.absence.id).exists())  # NOT deleted
|
||
|
||
|
||
class AbsenceExportCSVTests(TestCase):
    """Access control and basic payload checks for /absences/export/."""

    EXPORT_URL = '/absences/export/'

    @classmethod
    def setUpTestData(cls):
        cls.admin = User.objects.create_user(username='admin', password='pw', is_staff=True)
        cls.sup = User.objects.create_user(username='sup', password='pw')
        cls.worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('6000'))
        # A single sick absence so the export has exactly one data row to show.
        Absence.objects.create(worker=cls.worker, date=_date(2026, 5, 14), reason='sick')

    def _export_as(self, user):
        """Log in as *user* and request the export endpoint."""
        self.client.force_login(user)
        return self.client.get(self.EXPORT_URL)

    def test_admin_can_export(self):
        """Staff users receive a CSV containing the worker name and reason label."""
        resp = self._export_as(self.admin)

        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp['Content-Type'], 'text/csv')
        # The worker-name cell followed by a delimiter, and the capitalised
        # reason label, must both appear in the raw bytes.
        for fragment in (b'W,', b'Sick'):
            self.assertIn(fragment, resp.content)

    def test_supervisor_forbidden(self):
        """Non-staff users are rejected with 403."""
        resp = self._export_as(self.sup)

        self.assertEqual(resp.status_code, 403)
|