Spec-review catch on 61c485f: the batched GROUP BY aggregates for
unpaid-per-project and paid-per-project x month were running TWO
filtered queries and summing them in Python. Any adjustment with
BOTH project FK AND work_log.project set was double-counted.
Every Overtime adjustment fits that shape (price_overtime sets
both). So every unpaid Overtime was silently inflating the
outstanding-costs dashboard by its own amount, and every paid
Overtime inflated the Per-project-monthly-payroll stacked chart.
Fix: annotate Coalesce('project_id', 'work_log__project_id') so
each adjustment contributes to exactly one project (matches the
original Q(...) | Q(...) OR-filter semantics).
New regression test locks in the "count once" behaviour with an
Overtime adjustment that has both FKs set. Previously there was no
test covering the sum correctness of outstanding-costs - only
context-key presence.
Tests: 69/69. Query counts per tab: pending 24q / history 24q /
loans 25q / adjustments 32q (2 fewer per tab than 61c485f because
Coalesce folded two filtered queries into one).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1382 lines
63 KiB
Python
1382 lines
63 KiB
Python
# === TESTS FOR WORK LOG PAYROLL CROSS-LINK ===
|
||
# Covers the _build_work_log_payroll_context helper — the core logic that
|
||
# determines, for each worker on a log, whether they were paid for it.
|
||
|
||
import datetime
|
||
from decimal import Decimal
|
||
|
||
from django.conf import settings
|
||
from django.contrib.auth.models import User
|
||
from django.test import TestCase
|
||
from django.urls import reverse
|
||
|
||
from core.models import Project, Team, Worker, WorkLog, PayrollRecord, PayrollAdjustment, Loan
|
||
from core.views import _build_work_log_payroll_context, _build_report_context
|
||
|
||
|
||
class WorkLogPayrollContextTests(TestCase):
|
||
"""Tests for the helper that builds the payroll-status view of a work log."""
|
||
|
||
def setUp(self):
|
||
# Minimal scenario: 1 admin, 1 project, 1 team, 3 workers, 1 log.
|
||
# Worker A has been paid for the log; Worker B is priced-not-paid;
|
||
# Worker C is unpaid.
|
||
self.admin = User.objects.create_user(username='admin', is_staff=True)
|
||
|
||
self.project = Project.objects.create(name='Test Project')
|
||
self.team = Team.objects.create(name='Team X', supervisor=self.admin)
|
||
|
||
self.worker_a = Worker.objects.create(name='Alice', id_number='A1', monthly_salary=Decimal('4000'))
|
||
self.worker_b = Worker.objects.create(name='Bob', id_number='B1', monthly_salary=Decimal('4000'))
|
||
self.worker_c = Worker.objects.create(name='Carol', id_number='C1', monthly_salary=Decimal('4000'))
|
||
|
||
self.log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 4, 10),
|
||
project=self.project,
|
||
team=self.team,
|
||
supervisor=self.admin,
|
||
)
|
||
self.log.workers.add(self.worker_a, self.worker_b, self.worker_c)
|
||
|
||
# Worker A has a PayrollRecord linking them and this log — "Paid".
|
||
self.record_a = PayrollRecord.objects.create(
|
||
worker=self.worker_a,
|
||
amount_paid=Decimal('200.00'),
|
||
date=datetime.date(2026, 4, 15),
|
||
)
|
||
self.record_a.work_logs.add(self.log)
|
||
|
||
# Worker B appears in priced_workers but has no PayrollRecord — "Priced, not paid".
|
||
self.log.priced_workers.add(self.worker_b)
|
||
|
||
# Worker C has neither — "Unpaid".
|
||
|
||
def test_returns_log_and_worker_rows(self):
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
self.assertEqual(ctx['log'], self.log)
|
||
self.assertEqual(len(ctx['worker_rows']), 3)
|
||
|
||
def test_paid_worker_has_payslip_link(self):
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
row = next(r for r in ctx['worker_rows'] if r['worker'].id == self.worker_a.id)
|
||
self.assertEqual(row['status'], 'Paid')
|
||
self.assertEqual(row['payroll_record'], self.record_a)
|
||
self.assertGreater(row['earned'], 0)
|
||
|
||
def test_priced_but_unpaid_worker(self):
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
row = next(r for r in ctx['worker_rows'] if r['worker'].id == self.worker_b.id)
|
||
self.assertEqual(row['status'], 'Priced, not paid')
|
||
self.assertIsNone(row['payroll_record'])
|
||
|
||
def test_totally_unpaid_worker(self):
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
row = next(r for r in ctx['worker_rows'] if r['worker'].id == self.worker_c.id)
|
||
self.assertEqual(row['status'], 'Unpaid')
|
||
self.assertIsNone(row['payroll_record'])
|
||
|
||
def test_totals(self):
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
# Paid = Alice's daily_rate (one record exists for this log+worker).
|
||
self.assertEqual(ctx['total_paid'], self.worker_a.daily_rate)
|
||
# Outstanding = Bob + Carol each at their daily_rate.
|
||
expected = self.worker_b.daily_rate + self.worker_c.daily_rate
|
||
self.assertEqual(ctx['total_outstanding'], expected)
|
||
|
||
def test_adjustments_linked_to_log(self):
|
||
adj = PayrollAdjustment.objects.create(
|
||
worker=self.worker_a,
|
||
project=self.project,
|
||
type='Overtime',
|
||
amount=Decimal('50.00'),
|
||
date=datetime.date(2026, 4, 10),
|
||
description='Extra hour',
|
||
work_log=self.log,
|
||
)
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
self.assertIn(adj, ctx['adjustments'])
|
||
|
||
def test_pay_period_absent_if_no_schedule(self):
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
self.assertEqual(ctx['pay_period'], (None, None))
|
||
|
||
def test_pay_period_present_when_schedule_configured(self):
|
||
self.team.pay_frequency = 'weekly'
|
||
self.team.pay_start_date = datetime.date(2026, 1, 5) # A Monday
|
||
self.team.save()
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
start, end = ctx['pay_period']
|
||
self.assertIsNotNone(start)
|
||
self.assertIsNotNone(end)
|
||
self.assertLessEqual(start, self.log.date)
|
||
self.assertGreaterEqual(end, self.log.date)
|
||
|
||
def test_overtime_needs_pricing_flag(self):
|
||
"""Flag fires when log has OT hours but no priced_workers yet."""
|
||
# Start: no OT, no priced workers -> flag False
|
||
self.assertFalse(_build_work_log_payroll_context(self.log)['overtime_needs_pricing'])
|
||
|
||
# Add OT hours but keep priced_workers empty -> flag True
|
||
self.log.overtime_amount = Decimal('0.50')
|
||
self.log.priced_workers.clear()
|
||
self.log.save()
|
||
self.assertTrue(_build_work_log_payroll_context(self.log)['overtime_needs_pricing'])
|
||
|
||
# Price the OT -> flag False again
|
||
self.log.priced_workers.add(self.worker_a)
|
||
self.assertFalse(_build_work_log_payroll_context(self.log)['overtime_needs_pricing'])
|
||
|
||
def test_query_count_is_bounded(self):
|
||
"""Helper should not issue per-worker queries — guards against N+1."""
|
||
# 4 queries: payroll_records, priced_workers, workers.all, adjustments
|
||
# (plus whichever assertNumQueries overhead Django's test client adds).
|
||
# We assert a tight upper bound; regressions that add per-worker queries
|
||
# will push this well above the bound and fail the test.
|
||
with self.assertNumQueries(4):
|
||
_build_work_log_payroll_context(self.log)
|
||
|
||
def test_empty_log_returns_zero_totals(self):
|
||
"""Log with no workers: helper returns empty rows and zero totals."""
|
||
empty_log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 4, 11),
|
||
project=self.project,
|
||
team=self.team,
|
||
supervisor=self.admin,
|
||
)
|
||
ctx = _build_work_log_payroll_context(empty_log)
|
||
self.assertEqual(ctx['worker_rows'], [])
|
||
self.assertEqual(ctx['total_earned'], Decimal('0.00'))
|
||
self.assertEqual(ctx['total_paid'], Decimal('0.00'))
|
||
self.assertEqual(ctx['total_outstanding'], Decimal('0.00'))
|
||
self.assertEqual(ctx['adjustments'], [])
|
||
|
||
def test_log_without_team_has_no_pay_period(self):
|
||
"""Log whose team was later soft-deleted to NULL still works."""
|
||
self.log.team = None
|
||
self.log.save()
|
||
ctx = _build_work_log_payroll_context(self.log)
|
||
self.assertEqual(ctx['pay_period'], (None, None))
|
||
# The rest of the context should still populate correctly.
|
||
self.assertEqual(len(ctx['worker_rows']), 3)
|
||
|
||
|
||
# === TESTS FOR THE WORK LOG PAYROLL AJAX ENDPOINT ===
|
||
# These cover the JSON endpoint that the Task-5 modal will consume.
|
||
# The endpoint must: return JSON to admins, forbid supervisors/anons,
|
||
# and 404 on unknown logs.
|
||
|
||
class WorkLogPayrollAjaxTests(TestCase):
|
||
"""Tests for the JSON AJAX endpoint that powers the modal."""
|
||
|
||
def setUp(self):
|
||
# One admin, one non-admin supervisor, and a simple log with one worker.
|
||
self.admin = User.objects.create_user(
|
||
username='admin', password='pass', is_staff=True
|
||
)
|
||
self.supervisor = User.objects.create_user(
|
||
username='sup', password='pass', is_staff=False
|
||
)
|
||
project = Project.objects.create(name='P')
|
||
team = Team.objects.create(name='T', supervisor=self.admin)
|
||
worker = Worker.objects.create(name='W', id_number='1', monthly_salary=Decimal('4000'))
|
||
self.log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 4, 10),
|
||
project=project, team=team, supervisor=self.admin,
|
||
)
|
||
self.log.workers.add(worker)
|
||
|
||
def test_admin_sees_200_json(self):
|
||
# Admin hits the endpoint and gets a well-formed JSON body.
|
||
self.client.login(username='admin', password='pass')
|
||
url = reverse('work_log_payroll_ajax', args=[self.log.id])
|
||
resp = self.client.get(url)
|
||
self.assertEqual(resp.status_code, 200)
|
||
data = resp.json()
|
||
self.assertEqual(data['log_id'], self.log.id)
|
||
self.assertEqual(len(data['worker_rows']), 1)
|
||
self.assertEqual(data['worker_rows'][0]['status'], 'Unpaid')
|
||
|
||
def test_supervisor_forbidden(self):
|
||
# A non-admin user (even if authenticated) gets 403 JSON.
|
||
self.client.login(username='sup', password='pass')
|
||
url = reverse('work_log_payroll_ajax', args=[self.log.id])
|
||
resp = self.client.get(url)
|
||
self.assertEqual(resp.status_code, 403)
|
||
|
||
def test_anonymous_redirected_to_login(self):
|
||
# @login_required intercepts before our view ever runs — 302 to /accounts/login/.
|
||
url = reverse('work_log_payroll_ajax', args=[self.log.id])
|
||
resp = self.client.get(url)
|
||
self.assertEqual(resp.status_code, 302)
|
||
|
||
def test_missing_log_is_404(self):
|
||
# get_object_or_404 returns a 404 if the log_id doesn't exist.
|
||
self.client.login(username='admin', password='pass')
|
||
resp = self.client.get('/history/99999/payroll/ajax/')
|
||
self.assertEqual(resp.status_code, 404)
|
||
|
||
def test_full_payload_with_paid_worker_null_team_and_ot(self):
|
||
"""One scenario exercising: paid worker, null team, OT flag, full URL."""
|
||
# Create a paid worker on a separate log with no team (null team edge case)
|
||
# and with overtime hours but no priced_workers (unpriced OT flag).
|
||
project = Project.objects.create(name='P-full')
|
||
paid_worker = Worker.objects.create(
|
||
name='Paula', id_number='P1', monthly_salary=Decimal('4000')
|
||
)
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 4, 11),
|
||
project=project,
|
||
team=None, # null team
|
||
supervisor=self.admin,
|
||
overtime_amount=Decimal('1.50'), # > 0 and no priced_workers -> flag fires
|
||
)
|
||
log.workers.add(paid_worker)
|
||
|
||
# Link a PayrollRecord so Paula shows as Paid
|
||
record = PayrollRecord.objects.create(
|
||
worker=paid_worker,
|
||
amount_paid=Decimal('250.00'),
|
||
date=datetime.date(2026, 4, 16),
|
||
)
|
||
record.work_logs.add(log)
|
||
|
||
# Link an adjustment to the same log
|
||
adj = PayrollAdjustment.objects.create(
|
||
worker=paid_worker,
|
||
project=project,
|
||
type='Overtime',
|
||
amount=Decimal('75.00'),
|
||
date=datetime.date(2026, 4, 11),
|
||
description='OT pricing',
|
||
work_log=log,
|
||
payroll_record=record,
|
||
)
|
||
|
||
self.client.login(username='admin', password='pass')
|
||
resp = self.client.get(reverse('work_log_payroll_ajax', args=[log.id]))
|
||
self.assertEqual(resp.status_code, 200)
|
||
data = resp.json()
|
||
|
||
# Null team branch
|
||
self.assertIsNone(data['team'])
|
||
# Project is still present
|
||
self.assertEqual(data['project']['name'], 'P-full')
|
||
# Paid worker serialization shape
|
||
paula_row = next(r for r in data['worker_rows'] if r['worker_name'] == 'Paula')
|
||
self.assertEqual(paula_row['status'], 'Paid')
|
||
self.assertEqual(paula_row['payroll_record_id'], record.pk)
|
||
self.assertEqual(paula_row['paid_date'], '2026-04-16')
|
||
# OT flag fires (overtime > 0, no priced_workers)
|
||
self.assertTrue(data['overtime_needs_pricing'])
|
||
# full_page_url is the reverse of work_log_payroll_detail
|
||
self.assertEqual(data['full_page_url'], f'/history/{log.id}/')
|
||
# Adjustment serialized with payslip link
|
||
self.assertEqual(len(data['adjustments']), 1)
|
||
self.assertEqual(data['adjustments'][0]['type'], 'Overtime')
|
||
self.assertEqual(data['adjustments'][0]['payroll_record_id'], record.pk)
|
||
|
||
|
||
# === TESTS FOR THE WORK LOG PAYROLL FULL-PAGE VIEW ===
|
||
# These cover the HTML page at /history/<id>/ that shares the same context
|
||
# builder as the AJAX endpoint. Admin sees a 200 HTML page; supervisor 403.
|
||
|
||
class WorkLogPayrollDetailTests(TestCase):
|
||
"""Tests for the full-page /history/<id>/ view."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(
|
||
username='admin', password='pass', is_staff=True
|
||
)
|
||
self.supervisor = User.objects.create_user(
|
||
username='sup', password='pass', is_staff=False
|
||
)
|
||
project = Project.objects.create(name='P2')
|
||
team = Team.objects.create(name='T2', supervisor=self.admin)
|
||
worker = Worker.objects.create(name='Wanda', id_number='X', monthly_salary=Decimal('4000'))
|
||
self.log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 4, 10),
|
||
project=project, team=team, supervisor=self.admin,
|
||
)
|
||
self.log.workers.add(worker)
|
||
|
||
def test_admin_gets_full_page(self):
|
||
self.client.login(username='admin', password='pass')
|
||
url = reverse('work_log_payroll_detail', args=[self.log.id])
|
||
resp = self.client.get(url)
|
||
self.assertEqual(resp.status_code, 200)
|
||
self.assertContains(resp, 'FoxFitt')
|
||
self.assertContains(resp, 'History')
|
||
self.assertContains(resp, 'Wanda')
|
||
|
||
def test_supervisor_forbidden(self):
|
||
self.client.login(username='sup', password='pass')
|
||
url = reverse('work_log_payroll_detail', args=[self.log.id])
|
||
resp = self.client.get(url)
|
||
self.assertEqual(resp.status_code, 403)
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR PAYROLL REPORT FILTER INFLATION ===
|
||
# Regression tests for the M2M double-JOIN bug in _build_report_context.
|
||
#
|
||
# THE BUG (fixed 2026-04-23):
|
||
# Filtering a report by project AND team via chained `.filter(work_logs__field=X)`
|
||
# calls produced TWO separate JOIN aliases on the core_payrollrecord_work_logs
|
||
# M2M table. Any downstream `.values().annotate(Sum(...))` then aggregated
|
||
# across the cartesian product of matching rows, inflating every per-worker
|
||
# total_paid and every payments_by_date amount by N² (where N = number of
|
||
# matching work logs per record). Total_paid_out itself was correct because
|
||
# `.aggregate(Sum(...))` wraps distinct() in a subquery, but the per-row
|
||
# values/annotate pattern doesn't get that help.
|
||
#
|
||
# These tests lock down the fix: with filters applied, the worker-level and
|
||
# date-level totals must equal the real payment amount, not a multiplied one.
|
||
# =============================================================================
|
||
|
||
|
||
class ReportContextFilterInflationTests(TestCase):
|
||
"""Report aggregates must not inflate when project + team filters combine."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(username='admin-r', is_staff=True)
|
||
self.project = Project.objects.create(name='Solar Farm Gamma')
|
||
self.team = Team.objects.create(name='Team Gamma', supervisor=self.admin)
|
||
self.worker = Worker.objects.create(
|
||
name='Test Worker', id_number='TW1', monthly_salary=Decimal('4000')
|
||
)
|
||
# Worker must be in the team's M2M for the adjustment-summary test
|
||
# to find them via worker__teams__id=team_id. In real data this is
|
||
# the standard setup — workers belong to a team.
|
||
self.team.workers.add(self.worker)
|
||
# Three work logs in the range, all in the same project + team.
|
||
# This is the minimum setup that reproduces the N² inflation: with
|
||
# one payroll record linked to 3 logs, the double-JOIN produces 9 rows.
|
||
self.logs = []
|
||
for day in (5, 10, 15):
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, day),
|
||
project=self.project,
|
||
team=self.team,
|
||
supervisor=self.admin,
|
||
)
|
||
log.workers.add(self.worker)
|
||
self.logs.append(log)
|
||
# One payment covering all 3 logs.
|
||
self.record = PayrollRecord.objects.create(
|
||
worker=self.worker,
|
||
amount_paid=Decimal('600.00'),
|
||
date=datetime.date(2026, 3, 20),
|
||
)
|
||
self.record.work_logs.add(*self.logs)
|
||
|
||
def _ctx(self, project_ids=None, team_ids=None):
|
||
return _build_report_context(
|
||
datetime.date(2026, 3, 1),
|
||
datetime.date(2026, 3, 31),
|
||
project_ids=project_ids,
|
||
team_ids=team_ids,
|
||
)
|
||
|
||
def test_worker_breakdown_not_inflated_with_project_filter_only(self):
|
||
ctx = self._ctx(project_ids=[self.project.id])
|
||
self.assertEqual(len(ctx['worker_breakdown']), 1)
|
||
# Pre-fix: this was 600 × 3 = 1800 (one JOIN, 3-way inflation).
|
||
self.assertEqual(ctx['worker_breakdown'][0]['total_paid'], Decimal('600.00'))
|
||
|
||
def test_worker_breakdown_not_inflated_with_both_filters(self):
|
||
ctx = self._ctx(project_ids=[self.project.id], team_ids=[self.team.id])
|
||
self.assertEqual(len(ctx['worker_breakdown']), 1)
|
||
# Pre-fix: this was 600 × 3 × 3 = 5400 (two JOINs, 9-way inflation).
|
||
self.assertEqual(ctx['worker_breakdown'][0]['total_paid'], Decimal('600.00'))
|
||
|
||
def test_payments_by_date_not_inflated_with_both_filters(self):
|
||
ctx = self._ctx(project_ids=[self.project.id], team_ids=[self.team.id])
|
||
payments = list(ctx['payments_by_date'])
|
||
self.assertEqual(len(payments), 1)
|
||
self.assertEqual(payments[0]['total'], Decimal('600.00'))
|
||
|
||
def test_total_paid_out_stays_correct_with_both_filters(self):
|
||
"""Regression guard: total_paid_out was ALREADY correct pre-fix
|
||
because .aggregate() handles distinct() via a subquery. Lock it in
|
||
so a future refactor doesn't accidentally reintroduce inflation here."""
|
||
ctx = self._ctx(project_ids=[self.project.id], team_ids=[self.team.id])
|
||
self.assertEqual(ctx['total_paid_out'], Decimal('600.00'))
|
||
|
||
def test_adjustment_summary_not_inflated_with_team_filter(self):
|
||
"""Adjustments filtered by team go through worker__teams (M2M) — same
|
||
bug class. values().annotate(Sum()) would inflate if the worker is in
|
||
multiple teams or if the JOIN is chained with other M2M filters."""
|
||
PayrollAdjustment.objects.create(
|
||
worker=self.worker,
|
||
project=self.project,
|
||
type='Bonus',
|
||
amount=Decimal('100.00'),
|
||
date=datetime.date(2026, 3, 10),
|
||
description='Test bonus',
|
||
)
|
||
ctx = self._ctx(project_ids=[self.project.id], team_ids=[self.team.id])
|
||
totals = {item['type']: item['total'] for item in ctx['adjustment_totals']}
|
||
self.assertEqual(totals.get('Bonus'), Decimal('100.00'))
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR SUPERVISOR PICKER QUERYSET ===
|
||
# Regression tests for the TeamForm/ProjectForm supervisor dropdown.
|
||
#
|
||
# THE BUG (fixed 2026-04-23):
|
||
# The picker required either admin flags (is_staff/is_superuser) or "Work Logger"
|
||
# group membership. Any active user NOT pre-enrolled in that specific group
|
||
# was invisible, even though the app's downstream `is_supervisor()` helper
|
||
# grants supervisor powers to anyone assigned to a team/project FK/M2M —
|
||
# regardless of group. The picker was strictly more restrictive than the
|
||
# permission model, hiding field supervisors from the person trying to assign
|
||
# them. Fix: allow any active user; let the act of assignment confer
|
||
# supervisor-ness (matching how `is_supervisor` works).
|
||
# =============================================================================
|
||
|
||
|
||
class SupervisorPickerQuerysetTests(TestCase):
|
||
"""Supervisor picker must surface all active users, not just admins."""
|
||
|
||
def test_regular_active_user_is_selectable(self):
|
||
"""A plain active user (no is_staff, no is_superuser, no Work Logger
|
||
group) must appear. Pre-fix: excluded by the Q filter."""
|
||
from core.forms import _supervisor_user_queryset
|
||
regular = User.objects.create_user(
|
||
username='field_supervisor', password='pass',
|
||
is_staff=False, is_superuser=False,
|
||
)
|
||
self.assertIn(regular, _supervisor_user_queryset())
|
||
|
||
def test_user_in_unrelated_group_is_still_selectable(self):
|
||
"""A user in some random group (not 'Work Logger') is still an active
|
||
user and must show up. Pre-fix: excluded because the Q filter specifically
|
||
checked groups__name='Work Logger'."""
|
||
from django.contrib.auth.models import Group
|
||
from core.forms import _supervisor_user_queryset
|
||
user = User.objects.create_user(username='misc_group_user', password='pass')
|
||
group, _ = Group.objects.get_or_create(name='Some Other Group')
|
||
user.groups.add(group)
|
||
self.assertIn(user, _supervisor_user_queryset())
|
||
|
||
def test_inactive_user_still_excluded(self):
|
||
"""Inactive users must NEVER appear in the picker — even if they used
|
||
to be admins or Work Loggers. Active-only is the one hard guardrail."""
|
||
from core.forms import _supervisor_user_queryset
|
||
inactive = User.objects.create_user(
|
||
username='former_employee', password='pass',
|
||
is_active=False, is_staff=True,
|
||
)
|
||
self.assertNotIn(inactive, _supervisor_user_queryset())
|
||
|
||
def test_admin_still_selectable(self):
|
||
"""Defense-in-depth: admins continue to appear (no regression for the
|
||
existing use case)."""
|
||
from core.forms import _supervisor_user_queryset
|
||
admin = User.objects.create_user(
|
||
username='an_admin', password='pass', is_staff=True
|
||
)
|
||
self.assertIn(admin, _supervisor_user_queryset())
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR EXECUTIVE REPORT v2 ===
|
||
# Covers the new helpers introduced in the report rebuild (Apr 2026):
|
||
# _company_cost_velocity, _current_outstanding_in_scope, _team_project_activity,
|
||
# and the multi-filter extension of _build_report_context.
|
||
# =============================================================================
|
||
|
||
|
||
class CompanyCostVelocityTests(TestCase):
|
||
"""Company-wide avg daily and monthly labour cost (hero KPI card 3 & 4)."""
|
||
|
||
def test_empty_db_returns_zero(self):
|
||
from core.views import _company_cost_velocity
|
||
result = _company_cost_velocity()
|
||
self.assertEqual(result['avg_daily'], Decimal('0.00'))
|
||
self.assertEqual(result['avg_monthly'], Decimal('0.00'))
|
||
self.assertEqual(result['working_days'], 0)
|
||
|
||
def test_known_values(self):
|
||
from core.views import _company_cost_velocity
|
||
# Setup: 2 workers (daily_rate = 4000/20 = R 200 each), each works 5 distinct dates.
|
||
# Lifetime cost = 2 workers * 5 days * R 200 = R 2000. Working days = 5.
|
||
# Avg daily = 2000 / 5 = R 400.
|
||
# Avg monthly = 400 * 30.44 = R 12,176.
|
||
admin = User.objects.create_user(username='admin-cv', is_staff=True)
|
||
project = Project.objects.create(name='P')
|
||
w1 = Worker.objects.create(name='W1', id_number='W1', monthly_salary=Decimal('4000'))
|
||
w2 = Worker.objects.create(name='W2', id_number='W2', monthly_salary=Decimal('4000'))
|
||
for d in range(1, 6): # 5 distinct dates
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, d),
|
||
project=project, supervisor=admin,
|
||
)
|
||
log.workers.add(w1, w2)
|
||
|
||
result = _company_cost_velocity()
|
||
self.assertEqual(result['working_days'], 5)
|
||
self.assertEqual(result['avg_daily'], Decimal('400.00'))
|
||
# Tolerance: ±1 cent for the 30.44 multiplication
|
||
self.assertAlmostEqual(
|
||
float(result['avg_monthly']), 12176.00, delta=0.01
|
||
)
|
||
|
||
def test_duplicate_dates_not_double_counted(self):
|
||
"""Two workers working the same date = 1 distinct date, not 2."""
|
||
from core.views import _company_cost_velocity
|
||
admin = User.objects.create_user(username='admin-cv2', is_staff=True)
|
||
project = Project.objects.create(name='P2')
|
||
w1 = Worker.objects.create(name='X', id_number='X1', monthly_salary=Decimal('4000'))
|
||
w2 = Worker.objects.create(name='Y', id_number='Y1', monthly_salary=Decimal('4000'))
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 1), project=project, supervisor=admin,
|
||
)
|
||
log.workers.add(w1, w2)
|
||
result = _company_cost_velocity()
|
||
self.assertEqual(result['working_days'], 1) # not 2
|
||
|
||
|
||
class CurrentOutstandingInScopeTests(TestCase):
|
||
"""Hero card 2 — 'Outstanding NOW' with optional filter scope."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(username='a-out', is_staff=True)
|
||
self.p1 = Project.objects.create(name='ProjA')
|
||
self.p2 = Project.objects.create(name='ProjB')
|
||
self.t1 = Team.objects.create(name='TeamA', supervisor=self.admin)
|
||
self.w = Worker.objects.create(
|
||
name='Wkr', id_number='W1', monthly_salary=Decimal('4000')
|
||
)
|
||
self.t1.workers.add(self.w)
|
||
# Unpaid log on project 1
|
||
log1 = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 1),
|
||
project=self.p1, team=self.t1, supervisor=self.admin,
|
||
)
|
||
log1.workers.add(self.w)
|
||
# Unpaid log on project 2
|
||
log2 = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 2),
|
||
project=self.p2, team=self.t1, supervisor=self.admin,
|
||
)
|
||
log2.workers.add(self.w)
|
||
|
||
def test_no_filters_includes_all_projects(self):
|
||
from core.views import _current_outstanding_in_scope
|
||
result = _current_outstanding_in_scope()
|
||
# daily_rate = 4000/20 = 200; 2 unpaid logs * 200 = 400
|
||
self.assertEqual(result['total'], Decimal('400.00'))
|
||
self.assertEqual(len(result['by_project']), 2)
|
||
|
||
def test_project_filter_scopes_total(self):
|
||
from core.views import _current_outstanding_in_scope
|
||
result = _current_outstanding_in_scope(project_ids=[self.p1.id])
|
||
self.assertEqual(result['total'], Decimal('200.00'))
|
||
self.assertEqual(len(result['by_project']), 1)
|
||
self.assertEqual(result['by_project'][0]['name'], 'ProjA')
|
||
|
||
def test_team_filter_scopes_total(self):
|
||
"""Team filter on work logs + worker__teams on adjustments."""
|
||
from core.views import _current_outstanding_in_scope
|
||
# Adjustment on a worker not in t1
|
||
other_worker = Worker.objects.create(
|
||
name='Other', id_number='O1', monthly_salary=Decimal('4000')
|
||
)
|
||
PayrollAdjustment.objects.create(
|
||
worker=other_worker, project=self.p1, type='Bonus',
|
||
amount=Decimal('500.00'), date=datetime.date(2026, 3, 3),
|
||
)
|
||
# With team filter, only self.w's logs appear — R 400 total
|
||
result = _current_outstanding_in_scope(team_ids=[self.t1.id])
|
||
self.assertEqual(result['total'], Decimal('400.00'))
|
||
# The R500 bonus on other_worker must NOT appear in by_project because
|
||
# that worker isn't in t1 — the team scope excludes them entirely.
|
||
self.assertEqual(len(result['by_project']), 2)
|
||
amounts = [row['amount'] for row in result['by_project']]
|
||
self.assertNotIn(Decimal('500.00'), amounts)
|
||
|
||
|
||
class TeamProjectActivityTests(TestCase):
|
||
"""Chapter IV pivot: rows=team, columns=project, cell=distinct log dates."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(username='a-tpa', is_staff=True)
|
||
self.p1 = Project.objects.create(name='P1')
|
||
self.p2 = Project.objects.create(name='P2')
|
||
self.t1 = Team.objects.create(name='T1', supervisor=self.admin)
|
||
self.t2 = Team.objects.create(name='T2', supervisor=self.admin)
|
||
w = Worker.objects.create(name='W', id_number='W1', monthly_salary=Decimal('4000'))
|
||
|
||
# T1 works 3 distinct dates on P1
|
||
for d in (1, 2, 3):
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, d), project=self.p1, team=self.t1,
|
||
supervisor=self.admin,
|
||
)
|
||
log.workers.add(w)
|
||
|
||
# T2 works 2 distinct dates on P1 and 1 on P2
|
||
for d in (4, 5):
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, d), project=self.p1, team=self.t2,
|
||
supervisor=self.admin,
|
||
)
|
||
log.workers.add(w)
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 6), project=self.p2, team=self.t2,
|
||
supervisor=self.admin,
|
||
)
|
||
log.workers.add(w)
|
||
|
||
self.logs_qs = WorkLog.objects.filter(
|
||
date__gte=datetime.date(2026, 3, 1),
|
||
date__lte=datetime.date(2026, 3, 31),
|
||
)
|
||
|
||
def test_pivot_shape(self):
|
||
from core.views import _team_project_activity
|
||
r = _team_project_activity(self.logs_qs)
|
||
# 2 columns (P1, P2), 2 rows (T1, T2)
|
||
self.assertEqual(len(r['columns']), 2)
|
||
self.assertEqual(len(r['rows']), 2)
|
||
|
||
def test_cell_counts(self):
|
||
from core.views import _team_project_activity
|
||
r = _team_project_activity(self.logs_qs)
|
||
rows = {row['team_name']: row for row in r['rows']}
|
||
# T1 has 3 days on P1, 0 on P2
|
||
self.assertEqual(rows['T1']['cells_by_project_id'][self.p1.id], 3)
|
||
self.assertEqual(rows['T1']['cells_by_project_id'].get(self.p2.id, 0), 0)
|
||
# T2 has 2 days on P1, 1 on P2
|
||
self.assertEqual(rows['T2']['cells_by_project_id'][self.p1.id], 2)
|
||
self.assertEqual(rows['T2']['cells_by_project_id'][self.p2.id], 1)
|
||
|
||
def test_row_and_column_totals(self):
|
||
from core.views import _team_project_activity
|
||
r = _team_project_activity(self.logs_qs)
|
||
rows = {row['team_name']: row for row in r['rows']}
|
||
self.assertEqual(rows['T1']['row_total'], 3)
|
||
self.assertEqual(rows['T2']['row_total'], 3)
|
||
self.assertEqual(r['col_totals'][self.p1.id], 5)
|
||
self.assertEqual(r['col_totals'][self.p2.id], 1)
|
||
self.assertEqual(r['grand_total'], 6)
|
||
|
||
def test_team_with_no_logs_omitted(self):
|
||
"""Team with zero logs in the period should not appear as a row."""
|
||
from core.views import _team_project_activity
|
||
Team.objects.create(name='GhostTeam', supervisor=self.admin)
|
||
r = _team_project_activity(self.logs_qs)
|
||
team_names = [row['team_name'] for row in r['rows']]
|
||
self.assertNotIn('GhostTeam', team_names)
|
||
|
||
|
||
class ChapterOneEnrichmentTests(TestCase):
|
||
"""Chapter I — All Time Projects gains working_days and avg_per_working_day."""
|
||
|
||
def test_alltime_projects_includes_working_days_and_avg(self):
|
||
from core.views import _build_report_context
|
||
admin = User.objects.create_user(username='c1', is_staff=True)
|
||
proj = Project.objects.create(name='C1', start_date=datetime.date(2026, 1, 1))
|
||
w = Worker.objects.create(name='W', id_number='W1', monthly_salary=Decimal('4000'))
|
||
# 4 distinct dates, 1 worker each; daily_rate=200; total = R 800; working_days=4; avg=200
|
||
for d in (1, 2, 3, 4):
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, d), project=proj, supervisor=admin,
|
||
)
|
||
log.workers.add(w)
|
||
ctx = _build_report_context(
|
||
datetime.date(2026, 1, 1), datetime.date(2026, 12, 31),
|
||
)
|
||
by_name = {p['project']: p for p in ctx['alltime_projects']}
|
||
self.assertIn('C1', by_name)
|
||
self.assertEqual(by_name['C1']['working_days'], 4)
|
||
self.assertEqual(by_name['C1']['avg_per_working_day'], Decimal('200.00'))
|
||
self.assertEqual(by_name['C1']['start_date'], datetime.date(2026, 1, 1))
|
||
# last_activity = the most recent WorkLog.date (4th of March here)
|
||
self.assertEqual(
|
||
by_name['C1']['last_activity'], datetime.date(2026, 3, 4),
|
||
'alltime_projects rows should expose the most-recent log date '
|
||
'so the report can show "Last Activity" per project'
|
||
)
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR MULTI-VALUE FILTER SUPPORT (Task 6) ===
|
||
# _build_report_context now accepts project_ids and team_ids lists.
|
||
# =============================================================================
|
||
|
||
|
||
class ReportMultiFilterTests(TestCase):
|
||
"""Task 6 — multi-value project_ids / team_ids filters."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(username='mf', is_staff=True)
|
||
self.p1 = Project.objects.create(name='P1')
|
||
self.p2 = Project.objects.create(name='P2')
|
||
self.p3 = Project.objects.create(name='P3')
|
||
self.team = Team.objects.create(name='T', supervisor=self.admin)
|
||
self.w = Worker.objects.create(
|
||
name='W', id_number='W1', monthly_salary=Decimal('4000')
|
||
)
|
||
self.team.workers.add(self.w)
|
||
# One log + one paid record per project
|
||
for proj in (self.p1, self.p2, self.p3):
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 1),
|
||
project=proj, team=self.team, supervisor=self.admin,
|
||
)
|
||
log.workers.add(self.w)
|
||
rec = PayrollRecord.objects.create(
|
||
worker=self.w, amount_paid=Decimal('100.00'),
|
||
date=datetime.date(2026, 3, 5),
|
||
)
|
||
rec.work_logs.add(log)
|
||
|
||
def _ctx(self, project_ids=None, team_ids=None):
|
||
from core.views import _build_report_context
|
||
return _build_report_context(
|
||
datetime.date(2026, 3, 1), datetime.date(2026, 3, 31),
|
||
project_ids=project_ids, team_ids=team_ids,
|
||
)
|
||
|
||
def test_multi_project_union(self):
|
||
ctx = self._ctx(project_ids=[self.p1.id, self.p2.id])
|
||
# Two projects paid R 100 each = R 200; third excluded
|
||
self.assertEqual(ctx['total_paid_out'], Decimal('200.00'))
|
||
|
||
def test_empty_list_equals_none(self):
|
||
ctx_none = self._ctx(project_ids=None)
|
||
ctx_empty = self._ctx(project_ids=[])
|
||
self.assertEqual(ctx_none['total_paid_out'], ctx_empty['total_paid_out'])
|
||
|
||
def test_no_inflation_with_multi_project(self):
|
||
"""Worker breakdown must not inflate when multiple projects are selected."""
|
||
ctx = self._ctx(project_ids=[self.p1.id, self.p2.id, self.p3.id])
|
||
self.assertEqual(len(ctx['worker_breakdown']), 1)
|
||
# All three records are for the same worker, R 100 each = R 300
|
||
self.assertEqual(ctx['worker_breakdown'][0]['total_paid'], Decimal('300.00'))
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR INLINE FILTERS (Report Page) ===
|
||
# Pill-as-dropdown + cross-filter feature. Most behaviour is template/JS;
|
||
# the only backend surface is the project_team_pairs_json context key that
|
||
# powers the client-side Team<->Project cross-filter.
|
||
# =============================================================================
|
||
|
||
|
||
class InlineFiltersPairsContextTests(TestCase):
|
||
"""Report view must serialise distinct (project_id, team_id) pairs for
|
||
the pill-popover cross-filter JS."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(
|
||
username='admin-if', password='pass', is_staff=True
|
||
)
|
||
self.p1 = Project.objects.create(name='P1')
|
||
self.p2 = Project.objects.create(name='P2')
|
||
self.t1 = Team.objects.create(name='T1', supervisor=self.admin)
|
||
self.t2 = Team.objects.create(name='T2', supervisor=self.admin)
|
||
self.w = Worker.objects.create(
|
||
name='W', id_number='W1', monthly_salary=Decimal('4000')
|
||
)
|
||
# Log t1 on p1, t2 on p2 — so pairs should be [(p1,t1), (p2,t2)]
|
||
for proj, team in [(self.p1, self.t1), (self.p2, self.t2)]:
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 1),
|
||
project=proj, team=team, supervisor=self.admin,
|
||
)
|
||
log.workers.add(self.w)
|
||
|
||
def test_pairs_context_key_populated(self):
|
||
# The context value is a raw Python list of dicts; Django's
|
||
# |json_script filter handles the single JSON serialisation at
|
||
# template render time (no double-encoding).
|
||
self.client.login(username='admin-if', password='pass')
|
||
url = reverse('generate_report')
|
||
resp = self.client.get(url + '?from_month=2026-03&to_month=2026-04')
|
||
self.assertEqual(resp.status_code, 200)
|
||
pairs = resp.context['project_team_pairs_json']
|
||
# Each entry has both project_id and team_id
|
||
for p in pairs:
|
||
self.assertIn('project_id', p)
|
||
self.assertIn('team_id', p)
|
||
# Expected pairs (as tuples for set comparison)
|
||
pair_set = {(p['project_id'], p['team_id']) for p in pairs}
|
||
self.assertIn((self.p1.id, self.t1.id), pair_set)
|
||
self.assertIn((self.p2.id, self.t2.id), pair_set)
|
||
|
||
def test_pairs_excludes_null_project_or_team(self):
|
||
"""Logs with null project or null team should not appear in pairs."""
|
||
# Add a log with team=None
|
||
log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 3, 2),
|
||
project=self.p1, team=None, supervisor=self.admin,
|
||
)
|
||
log.workers.add(self.w)
|
||
|
||
self.client.login(username='admin-if', password='pass')
|
||
resp = self.client.get(reverse('generate_report') + '?from_month=2026-03&to_month=2026-04')
|
||
pairs = resp.context['project_team_pairs_json']
|
||
# No pair should have team_id=None
|
||
self.assertTrue(all(p['team_id'] is not None for p in pairs))
|
||
|
||
def test_pairs_renders_as_valid_json_in_template(self):
|
||
"""End-to-end: the rendered HTML must contain a single, valid JSON
|
||
array inside the <script id="projectTeamPairs"> tag — NOT a
|
||
JSON-encoded string (which was the bug that broke all pill
|
||
interactions before the context key was changed from
|
||
`json.dumps(pairs)` to raw `pairs`)."""
|
||
import json as _json
|
||
import re
|
||
|
||
self.client.login(username='admin-if', password='pass')
|
||
resp = self.client.get(reverse('generate_report') + '?from_month=2026-03&to_month=2026-04')
|
||
html = resp.content.decode('utf-8')
|
||
|
||
# Extract the JSON payload inside <script id="projectTeamPairs">...</script>
|
||
match = re.search(
|
||
r'<script id="projectTeamPairs"[^>]*>(.*?)</script>',
|
||
html, re.DOTALL
|
||
)
|
||
self.assertIsNotNone(match, 'projectTeamPairs <script> tag missing')
|
||
payload = match.group(1).strip()
|
||
|
||
# Must parse to a LIST, not a string.
|
||
parsed = _json.loads(payload)
|
||
self.assertIsInstance(parsed, list,
|
||
"Double-encoded JSON regression: browser's JSON.parse "
|
||
"would return a string here, killing pairs.forEach() in the "
|
||
"pill-popover JS. See 2026-04-23 bugfix.")
|
||
# And the list members must be dicts with project_id + team_id
|
||
for entry in parsed:
|
||
self.assertIsInstance(entry, dict)
|
||
self.assertIn('project_id', entry)
|
||
self.assertIn('team_id', entry)
|
||
|
||
def test_pickers_and_pairs_are_date_scoped(self):
|
||
"""Checkpoint-1 refinement: projects/teams lists + the pair map
|
||
include only entries with WorkLog activity INSIDE the selected
|
||
date range — NOT entire-history entries. Entries that are in the
|
||
URL's `?project=` or `?team=` selection are always preserved,
|
||
though, so the user's pick can never vanish."""
|
||
# Add a third project/team that ONLY worked outside the report window
|
||
out_project = Project.objects.create(name='P-out')
|
||
out_team = Team.objects.create(name='T-out', supervisor=self.admin)
|
||
out_log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 1, 15), # outside March window
|
||
project=out_project, team=out_team, supervisor=self.admin,
|
||
)
|
||
out_log.workers.add(self.w)
|
||
|
||
self.client.login(username='admin-if', password='pass')
|
||
# Request only March 2026 — Jan logs should be excluded
|
||
resp = self.client.get(
|
||
reverse('generate_report') + '?from_month=2026-03&to_month=2026-03'
|
||
)
|
||
self.assertEqual(resp.status_code, 200)
|
||
|
||
# Picker lists: out-of-range project + team should NOT appear
|
||
project_ids = {p.id for p in resp.context['projects']}
|
||
team_ids = {t.id for t in resp.context['teams']}
|
||
self.assertIn(self.p1.id, project_ids)
|
||
self.assertIn(self.p2.id, project_ids)
|
||
self.assertNotIn(out_project.id, project_ids,
|
||
'Out-of-range project must not appear in the date-scoped list')
|
||
self.assertIn(self.t1.id, team_ids)
|
||
self.assertIn(self.t2.id, team_ids)
|
||
self.assertNotIn(out_team.id, team_ids,
|
||
'Out-of-range team must not appear in the date-scoped list')
|
||
|
||
# Pair map: must also be date-scoped
|
||
pair_set = {(p['project_id'], p['team_id'])
|
||
for p in resp.context['project_team_pairs_json']}
|
||
self.assertNotIn((out_project.id, out_team.id), pair_set,
|
||
'Out-of-range pair must not appear in the cross-filter map')
|
||
|
||
def test_url_selected_projects_survive_even_out_of_range(self):
|
||
"""A project explicitly in the URL's ?project= selection must
|
||
remain in the picker list even if it has no logs in the current
|
||
date range — otherwise the user couldn't see (or deselect) their
|
||
own pick."""
|
||
out_project = Project.objects.create(name='P-out')
|
||
# Never logs anything in any date range
|
||
|
||
self.client.login(username='admin-if', password='pass')
|
||
resp = self.client.get(
|
||
reverse('generate_report')
|
||
+ '?from_month=2026-03&to_month=2026-03'
|
||
+ f'&project={out_project.id}'
|
||
)
|
||
self.assertEqual(resp.status_code, 200)
|
||
project_ids = {p.id for p in resp.context['projects']}
|
||
self.assertIn(out_project.id, project_ids,
|
||
'URL-selected project must survive the date-scope filter')
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR |type_slug FILTER ===
|
||
# Used by Adjustments tab to build CSS class names from type labels.
|
||
# =============================================================================
|
||
|
||
|
||
class TypeSlugFilterTests(TestCase):
|
||
"""format_tags.type_slug converts adjustment-type labels to slugs."""
|
||
|
||
def test_spaces_become_hyphens_and_lowercased(self):
|
||
from core.templatetags.format_tags import type_slug
|
||
self.assertEqual(type_slug('Advance Payment'), 'advance-payment')
|
||
self.assertEqual(type_slug('New Loan'), 'new-loan')
|
||
self.assertEqual(type_slug('Bonus'), 'bonus')
|
||
|
||
def test_empty_or_none_returns_empty_string(self):
|
||
from core.templatetags.format_tags import type_slug
|
||
self.assertEqual(type_slug(''), '')
|
||
self.assertEqual(type_slug(None), '')
|
||
|
||
def test_idempotent_on_already_slugged(self):
|
||
from core.templatetags.format_tags import type_slug
|
||
self.assertEqual(type_slug('bonus'), 'bonus')
|
||
|
||
|
||
# =============================================================================
|
||
# === TESTS FOR ADJUSTMENTS TAB (payroll_dashboard ?status=adjustments) ===
|
||
# Covers the new tab's backend: filters, sort, stats, pagination.
|
||
# Each test creates its own fresh fixture via setUp.
|
||
# NOTE: PayrollRecord only accepts worker/date/amount_paid (see core/models.py).
|
||
# The plan spec used days_worked/total_amount — those do NOT exist. Adapted.
|
||
# =============================================================================
|
||
|
||
|
||
class AdjustmentsTabTests(TestCase):
|
||
"""New Adjustments tab on /payroll/?status=adjustments."""
|
||
|
||
def setUp(self):
|
||
self.admin = User.objects.create_user(
|
||
username='adj-admin', password='pass', is_staff=True, is_superuser=True
|
||
)
|
||
self.sup = User.objects.create_user(
|
||
username='adj-sup', password='pass'
|
||
)
|
||
self.w1 = Worker.objects.create(
|
||
name='Alice', id_number='A1', monthly_salary=Decimal('4000')
|
||
)
|
||
self.w2 = Worker.objects.create(
|
||
name='Bob', id_number='B1', monthly_salary=Decimal('4000')
|
||
)
|
||
# Two teams, BOTH workers in BOTH teams, so the naive M2M JOIN
|
||
# multiplies rows by team count. Exercises the subquery fix.
|
||
self.team = Team.objects.create(name='Alpha', supervisor=self.admin)
|
||
self.team2 = Team.objects.create(name='Beta', supervisor=self.admin)
|
||
self.team.workers.add(self.w1, self.w2)
|
||
self.team2.workers.add(self.w1, self.w2)
|
||
self.proj = Project.objects.create(name='Site X')
|
||
# 3 unpaid adjustments — 1 bonus Alice, 1 bonus Bob, 1 deduction Alice
|
||
self.a1 = PayrollAdjustment.objects.create(
|
||
worker=self.w1, project=self.proj, type='Bonus',
|
||
amount=Decimal('500'), date=datetime.date(2026, 4, 10),
|
||
description='April bonus',
|
||
)
|
||
self.a2 = PayrollAdjustment.objects.create(
|
||
worker=self.w2, project=self.proj, type='Bonus',
|
||
amount=Decimal('300'), date=datetime.date(2026, 4, 11),
|
||
description='Project milestone',
|
||
)
|
||
self.a3 = PayrollAdjustment.objects.create(
|
||
worker=self.w1, project=self.proj, type='Deduction',
|
||
amount=Decimal('100'), date=datetime.date(2026, 3, 28),
|
||
description='Missing tool',
|
||
)
|
||
self.url = reverse('payroll_dashboard') + '?status=adjustments'
|
||
|
||
def _login_admin(self):
|
||
self.client.login(username='adj-admin', password='pass')
|
||
|
||
def test_admin_sees_adjustments_tab(self):
|
||
self._login_admin()
|
||
resp = self.client.get(self.url)
|
||
self.assertEqual(resp.status_code, 200)
|
||
self.assertEqual(resp.context['active_tab'], 'adjustments')
|
||
# All 3 fixture adjustments should be in the listing
|
||
self.assertEqual(len(resp.context['adj_page'].object_list), 3)
|
||
|
||
def test_supervisor_forbidden(self):
|
||
self.client.login(username='adj-sup', password='pass')
|
||
resp = self.client.get(self.url)
|
||
# Existing payroll_dashboard pattern: non-admin is redirected home
|
||
self.assertEqual(resp.status_code, 302)
|
||
|
||
def test_type_multi_filter(self):
|
||
"""?type=Bonus&type=Deduction returns the UNION (3 rows: 2 bonuses + 1
|
||
deduction), not the intersection."""
|
||
self._login_admin()
|
||
resp = self.client.get(self.url + '&type=Bonus&type=Deduction')
|
||
self.assertEqual(resp.context['adj_total_count'], 3)
|
||
ids = {a.id for a in resp.context['adj_page'].object_list}
|
||
self.assertEqual(ids, {self.a1.id, self.a2.id, self.a3.id})
|
||
|
||
def test_worker_multi_filter(self):
|
||
self._login_admin()
|
||
resp = self.client.get(self.url + f'&worker={self.w1.id}')
|
||
ids = {a.id for a in resp.context['adj_page'].object_list}
|
||
self.assertIn(self.a1.id, ids)
|
||
self.assertNotIn(self.a2.id, ids)
|
||
self.assertIn(self.a3.id, ids)
|
||
|
||
def test_team_filter_uses_subquery_no_inflation(self):
|
||
"""Filtering by team must NOT multiply rows. With 2 teams x 2 workers x 3
|
||
adjustments, a naive worker__teams__id__in filter would return 6 inflated
|
||
rows; the subquery pattern returns the true 3. See CLAUDE.md ORM gotcha."""
|
||
self._login_admin()
|
||
resp = self.client.get(
|
||
self.url + f'&team={self.team.id}&team={self.team2.id}'
|
||
)
|
||
# .count() at the queryset level would blow up under inflation —
|
||
# asserting it guards against regressions more strictly than checking
|
||
# the paginator's object_list length.
|
||
self.assertEqual(resp.context['adj_total_count'], 3)
|
||
self.assertEqual(len(resp.context['adj_page'].object_list), 3)
|
||
|
||
def test_status_filter_unpaid(self):
|
||
self._login_admin()
|
||
# Mark a1 as paid — PayrollRecord fields are worker/date/amount_paid
|
||
pr = PayrollRecord.objects.create(
|
||
worker=self.w1, date=datetime.date(2026, 4, 15),
|
||
amount_paid=Decimal('4000'),
|
||
)
|
||
self.a1.payroll_record = pr
|
||
self.a1.save()
|
||
resp = self.client.get(self.url + '&adj_status=unpaid')
|
||
ids = {a.id for a in resp.context['adj_page'].object_list}
|
||
self.assertNotIn(self.a1.id, ids)
|
||
self.assertIn(self.a2.id, ids)
|
||
self.assertIn(self.a3.id, ids)
|
||
|
||
def test_date_range_filter(self):
|
||
self._login_admin()
|
||
# March 1 to March 31 -> only a3 (dated 28 Mar)
|
||
resp = self.client.get(
|
||
self.url + '&adj_date_from=2026-03-01&adj_date_to=2026-03-31'
|
||
)
|
||
ids = {a.id for a in resp.context['adj_page'].object_list}
|
||
self.assertEqual(ids, {self.a3.id})
|
||
|
||
def test_stats_scoped_to_filtered_set(self):
|
||
self._login_admin()
|
||
resp = self.client.get(self.url + '&type=Bonus')
|
||
# 2 bonuses, 0 paid, total R 800 additive, 0 deductive
|
||
self.assertEqual(resp.context['adj_total_count'], 2)
|
||
self.assertEqual(resp.context['adj_unpaid_count'], 2)
|
||
self.assertEqual(resp.context['adj_additive_sum'], Decimal('800.00'))
|
||
self.assertEqual(resp.context['adj_deductive_sum'], Decimal('0.00'))
|
||
|
||
def test_group_by_type(self):
|
||
self._login_admin()
|
||
resp = self.client.get(self.url + '&group_by=type')
|
||
groups = resp.context['adj_groups']
|
||
self.assertIsNotNone(groups)
|
||
labels = {g['label'] for g in groups}
|
||
self.assertEqual(labels, {'Bonus', 'Deduction'})
|
||
bonus_group = next(g for g in groups if g['label'] == 'Bonus')
|
||
self.assertEqual(bonus_group['count'], 2)
|
||
self.assertEqual(bonus_group['net_sum'], Decimal('800.00')) # +R 500 + +R 300
|
||
deduction_group = next(g for g in groups if g['label'] == 'Deduction')
|
||
self.assertEqual(deduction_group['net_sum'], Decimal('-100.00'))
|
||
# Groups must be ordered by descending |net_sum| — biggest impact
|
||
# first. |800| > |100| so Bonus must come before Deduction.
|
||
self.assertEqual(groups[0]['label'], 'Bonus')
|
||
|
||
def test_group_by_worker(self):
|
||
self._login_admin()
|
||
resp = self.client.get(self.url + '&group_by=worker')
|
||
groups = resp.context['adj_groups']
|
||
self.assertIsNotNone(groups)
|
||
labels = {g['label'] for g in groups}
|
||
self.assertEqual(labels, {'Alice', 'Bob'})
|
||
alice = next(g for g in groups if g['label'] == 'Alice')
|
||
# Alice: +R 500 bonus + (-R 100) deduction = +R 400 net
|
||
self.assertEqual(alice['count'], 2)
|
||
self.assertEqual(alice['net_sum'], Decimal('400.00'))
|
||
|
||
def test_bulk_delete_only_affects_unpaid(self):
|
||
"""POST /payroll/adjustments/bulk-delete/ with mixed paid+unpaid IDs
|
||
deletes ONLY the unpaid rows. Paid rows are untouched (payroll
|
||
history is immutable)."""
|
||
self._login_admin()
|
||
# Pay a1 (leave a2, a3 unpaid)
|
||
pr = PayrollRecord.objects.create(
|
||
worker=self.w1, date=datetime.date(2026, 4, 15),
|
||
amount_paid=Decimal('4000'),
|
||
)
|
||
self.a1.payroll_record = pr
|
||
self.a1.save()
|
||
resp = self.client.post(
|
||
reverse('bulk_delete_adjustments'),
|
||
{'adjustment_ids': [self.a1.id, self.a2.id, self.a3.id]},
|
||
)
|
||
self.assertEqual(resp.status_code, 200)
|
||
body = resp.json()
|
||
self.assertEqual(body['deleted'], 2)
|
||
self.assertEqual(body['requested'], 3)
|
||
# a1 survives (paid), a2 + a3 gone
|
||
self.assertTrue(PayrollAdjustment.objects.filter(id=self.a1.id).exists())
|
||
self.assertFalse(PayrollAdjustment.objects.filter(id=self.a2.id).exists())
|
||
self.assertFalse(PayrollAdjustment.objects.filter(id=self.a3.id).exists())
|
||
|
||
def test_bulk_delete_requires_admin(self):
|
||
"""Non-admin supervisors cannot bulk-delete adjustments."""
|
||
self.client.login(username='adj-sup', password='pass')
|
||
resp = self.client.post(
|
||
reverse('bulk_delete_adjustments'),
|
||
{'adjustment_ids': [self.a1.id]},
|
||
)
|
||
self.assertEqual(resp.status_code, 403)
|
||
# a1 still present
|
||
self.assertTrue(PayrollAdjustment.objects.filter(id=self.a1.id).exists())
|
||
|
||
def test_bulk_delete_cascades_new_loan(self):
|
||
"""Bulk-deleting a 'New Loan' adjustment must also delete its
|
||
linked Loan row AND any still-unpaid Loan Repayment adjustments
|
||
— same cascade as the single-row delete_adjustment view. Without
|
||
this, the bulk endpoint would orphan Loan rows and leave pending
|
||
repayments in place."""
|
||
# Create a Loan + New Loan adjustment + unpaid repayment
|
||
loan = Loan.objects.create(
|
||
worker=self.w1,
|
||
principal_amount=Decimal('1000'),
|
||
remaining_balance=Decimal('1000'),
|
||
date=datetime.date(2026, 4, 1),
|
||
loan_type='loan',
|
||
)
|
||
new_loan_adj = PayrollAdjustment.objects.create(
|
||
worker=self.w1, project=self.proj, type='New Loan',
|
||
amount=Decimal('1000'), date=datetime.date(2026, 4, 1),
|
||
description='Test loan', loan=loan,
|
||
)
|
||
unpaid_repayment = PayrollAdjustment.objects.create(
|
||
worker=self.w1, project=self.proj, type='Loan Repayment',
|
||
amount=Decimal('500'), date=datetime.date(2026, 5, 1),
|
||
description='First repayment', loan=loan,
|
||
)
|
||
|
||
self._login_admin()
|
||
resp = self.client.post(
|
||
reverse('bulk_delete_adjustments'),
|
||
{'adjustment_ids': [new_loan_adj.id]},
|
||
)
|
||
self.assertEqual(resp.status_code, 200)
|
||
body = resp.json()
|
||
self.assertEqual(body['deleted'], 1)
|
||
|
||
# New Loan adjustment gone
|
||
self.assertFalse(PayrollAdjustment.objects.filter(id=new_loan_adj.id).exists())
|
||
# Linked Loan row gone (cascade)
|
||
self.assertFalse(Loan.objects.filter(id=loan.id).exists())
|
||
# Unpaid repayment gone (cascade)
|
||
self.assertFalse(PayrollAdjustment.objects.filter(id=unpaid_repayment.id).exists())
|
||
|
||
def test_bulk_delete_skips_loan_with_paid_repayments(self):
|
||
"""If a 'New Loan' has any paid repayments, bulk-delete must
|
||
refuse to delete it (would lose audit trail). Other rows in the
|
||
batch are unaffected."""
|
||
loan = Loan.objects.create(
|
||
worker=self.w1,
|
||
principal_amount=Decimal('1000'),
|
||
remaining_balance=Decimal('500'),
|
||
date=datetime.date(2026, 4, 1),
|
||
loan_type='loan',
|
||
)
|
||
new_loan_adj = PayrollAdjustment.objects.create(
|
||
worker=self.w1, project=self.proj, type='New Loan',
|
||
amount=Decimal('1000'), date=datetime.date(2026, 4, 1),
|
||
description='Test loan', loan=loan,
|
||
)
|
||
# One PAID repayment
|
||
pr = PayrollRecord.objects.create(
|
||
worker=self.w1, date=datetime.date(2026, 5, 1),
|
||
amount_paid=Decimal('500'),
|
||
)
|
||
PayrollAdjustment.objects.create(
|
||
worker=self.w1, project=self.proj, type='Loan Repayment',
|
||
amount=Decimal('500'), date=datetime.date(2026, 5, 1),
|
||
description='Paid', loan=loan, payroll_record=pr,
|
||
)
|
||
|
||
self._login_admin()
|
||
# Send the New Loan plus a2 (unpaid Bonus) — expect only a2 to delete
|
||
resp = self.client.post(
|
||
reverse('bulk_delete_adjustments'),
|
||
{'adjustment_ids': [new_loan_adj.id, self.a2.id]},
|
||
)
|
||
self.assertEqual(resp.status_code, 200)
|
||
body = resp.json()
|
||
self.assertEqual(body['deleted'], 1) # only a2
|
||
self.assertEqual(body['requested'], 2)
|
||
self.assertEqual(body['skipped_reasons'], {'has_paid_repayments': 1})
|
||
|
||
# New Loan survives, Loan survives, a2 gone
|
||
self.assertTrue(PayrollAdjustment.objects.filter(id=new_loan_adj.id).exists())
|
||
self.assertTrue(Loan.objects.filter(id=loan.id).exists())
|
||
self.assertFalse(PayrollAdjustment.objects.filter(id=self.a2.id).exists())
|
||
|
||
def test_team_worker_pairs_json_context_key(self):
|
||
"""Cross-filter map is a raw Python list of {team_id, worker_id}
|
||
dicts. Django's |json_script filter handles serialisation at
|
||
template render time (no double-encoding — see the 2026-04-23
|
||
inline-filters regression test)."""
|
||
self._login_admin()
|
||
resp = self.client.get(self.url)
|
||
pairs = resp.context['team_worker_pairs_json']
|
||
self.assertIsInstance(pairs, list)
|
||
for entry in pairs:
|
||
self.assertIn('team_id', entry)
|
||
self.assertIn('worker_id', entry)
|
||
# Our fixture: two teams (Alpha + Beta) with both workers in each
|
||
pair_set = {(p['team_id'], p['worker_id']) for p in pairs}
|
||
self.assertIn((self.team.id, self.w1.id), pair_set)
|
||
self.assertIn((self.team.id, self.w2.id), pair_set)
|
||
self.assertIn((self.team2.id, self.w1.id), pair_set)
|
||
self.assertIn((self.team2.id, self.w2.id), pair_set)
|
||
|
||
|
||
# === Cache-bust token tests ===
|
||
# The deployment_timestamp context variable controls the ?v=... query
|
||
# string on our CSS URL. It MUST only change when custom.css changes
|
||
# — otherwise Cloudflare cache-HIT rate on the CSS drops to zero and
|
||
# every page reload re-fetches 64 KB from the VM.
|
||
|
||
class CacheBustTokenTests(TestCase):
|
||
"""Regression tests for the mtime-based cache-bust token."""
|
||
|
||
def test_token_is_an_integer(self):
|
||
"""Token must be int (templates cast to str; float would show a dot)."""
|
||
from core.context_processors import project_context
|
||
ctx = project_context(request=None)
|
||
self.assertIsInstance(ctx['deployment_timestamp'], int)
|
||
|
||
def test_token_is_stable_across_two_calls(self):
|
||
"""Critical property: two back-to-back calls return the same
|
||
token — because custom.css hasn't changed between them. This
|
||
is the entire point of the mtime-based approach."""
|
||
from core.context_processors import project_context
|
||
t1 = project_context(request=None)['deployment_timestamp']
|
||
t2 = project_context(request=None)['deployment_timestamp']
|
||
self.assertEqual(t1, t2)
|
||
|
||
def test_token_falls_back_if_file_missing(self):
|
||
"""If static/css/custom.css is somehow missing (fresh
|
||
container pre-collectstatic), we must NOT crash. We fall back
|
||
to int(time.time()) so every page still renders."""
|
||
import core.context_processors as cp
|
||
try:
|
||
# Monkey-patch so the function sees a guaranteed-missing path.
|
||
# (The function itself is not patched — only the path constant.)
|
||
cp._CSS_PATH_FOR_TOKEN = cp.Path('/definitely/does/not/exist.css')
|
||
# Should return an int and NOT raise.
|
||
token = cp._compute_cache_bust_token()
|
||
self.assertIsInstance(token, int)
|
||
finally:
|
||
# Reset the module-level path constant so other tests (or reruns)
|
||
# get the real CSS file back.
|
||
cp._CSS_PATH_FOR_TOKEN = cp.Path(settings.BASE_DIR) / 'static' / 'css' / 'custom.css'
|
||
|
||
|
||
# === REGRESSION: adjustment project-attribution double-count ===
|
||
# Ticket summary: commit 61c485f replaced a per-project loop using SQL
|
||
# OR (`Q(project=P) | Q(work_log__project=P)`) with two SEPARATE
|
||
# filter+GROUP BY queries that were summed in Python. Any adjustment
|
||
# with BOTH `project_id` AND `work_log.project_id` set matched both
|
||
# queries and got counted TWICE.
|
||
#
|
||
# Every Overtime adjustment fits that shape — `price_overtime()` in
|
||
# views.py sets both FKs to the same project. So every unpaid
|
||
# Overtime silently inflated the outstanding-costs dashboard by its
|
||
# own amount. This test locks in the "count once" behaviour.
|
||
class PayrollDashboardAdjustmentAggregationTests(TestCase):
|
||
"""Regression tests for the per-project adjustment aggregation used by
|
||
the payroll dashboard. An adjustment with both `project` and
|
||
`work_log.project` set must contribute its amount ONCE to that
|
||
project, not twice."""
|
||
|
||
def setUp(self):
|
||
# Admin so the payroll_dashboard view accepts us (is_admin helper
|
||
# returns True for is_staff OR is_superuser).
|
||
self.admin = User.objects.create_user(
|
||
username='double-count-admin',
|
||
password='pw',
|
||
is_staff=True,
|
||
is_superuser=True,
|
||
)
|
||
self.client.force_login(self.admin)
|
||
|
||
# One active project, one worker, one fresh work log.
|
||
self.project = Project.objects.create(
|
||
name='Double-Count Test Site',
|
||
start_date=datetime.date(2026, 4, 1),
|
||
active=True,
|
||
)
|
||
self.worker = Worker.objects.create(
|
||
name='DC Worker',
|
||
id_number='DC1',
|
||
monthly_salary=Decimal('10000'),
|
||
)
|
||
self.work_log = WorkLog.objects.create(
|
||
date=datetime.date(2026, 4, 23),
|
||
project=self.project,
|
||
supervisor=self.admin,
|
||
)
|
||
# NOTE: deliberately no workers on the log — we do NOT want
|
||
# unpaid-log wage cost to pollute this test; we only want to
|
||
# measure the adjustment contribution.
|
||
|
||
# THE KEY SHAPE: one unpaid adjustment with BOTH FKs set to the
|
||
# same project. This mirrors how price_overtime() creates every
|
||
# Overtime adjustment (adj.project = worklog.project, and
|
||
# adj.work_log = worklog).
|
||
PayrollAdjustment.objects.create(
|
||
worker=self.worker,
|
||
type='Overtime', # additive → contributes positively
|
||
amount=Decimal('500'),
|
||
project=self.project,
|
||
work_log=self.work_log,
|
||
date=datetime.date(2026, 4, 23),
|
||
description='Regression-test Overtime',
|
||
)
|
||
|
||
def test_overtime_with_both_project_and_work_log_counts_once(self):
|
||
"""An unpaid Overtime with BOTH project + work_log.project set
|
||
must contribute its R500 amount once to outstanding-costs —
|
||
not R1000.
|
||
|
||
Before the Coalesce fix, the batched aggregates summed the
|
||
direct-project group + the work_log-project group separately
|
||
in Python, so an Overtime adjustment landed in both and got
|
||
double-counted. After the fix (Coalesce('project_id',
|
||
'work_log__project_id')) each adjustment row is attributed to
|
||
exactly one effective project.
|
||
"""
|
||
response = self.client.get(reverse('payroll_dashboard'))
|
||
self.assertEqual(response.status_code, 200)
|
||
outstanding = response.context['outstanding_project_costs']
|
||
|
||
# Find the entry for our test project by name (the shape is
|
||
# {'name': str, 'cost': Decimal} — no 'id' key).
|
||
ours = next(
|
||
(p for p in outstanding if p['name'] == self.project.name),
|
||
None,
|
||
)
|
||
self.assertIsNotNone(
|
||
ours,
|
||
"Test project should appear in outstanding_project_costs "
|
||
"(its unpaid Overtime is non-zero).",
|
||
)
|
||
self.assertEqual(
|
||
ours['cost'],
|
||
Decimal('500'),
|
||
"Overtime adjustment with both project + work_log.project "
|
||
"FKs set must count ONCE (R500), not twice (R1000). If "
|
||
"this fails with R1000 the project-attribution double-"
|
||
"count bug has reappeared.",
|
||
)
|