diff --git a/core/tests.py b/core/tests.py index 924e8e7..0ad7770 100644 --- a/core/tests.py +++ b/core/tests.py @@ -1734,3 +1734,165 @@ class AbsenceModelTests(TestCase): Absence.objects.create(worker=self.worker, date=_date(2026, 5, 1), reason='sick') Absence.objects.create(worker=self.worker, date=_date(2026, 5, 2), reason='family') self.assertEqual(self.worker.absences.count(), 2) + + +# ============================================================================= +# === ABSENCE HELPER TESTS (Task 2) === +# Tests for the two shared helpers in core/views.py that every Absence +# save/edit/delete path will route through: +# - _sync_absence_payroll_adjustment: keeps is_paid in sync with the +# linked Bonus PayrollAdjustment (create / delete / refuse-if-paid). +# - _absence_user_queryset: scopes visibility by admin vs supervisor. +# ============================================================================= + + +class AbsencePayrollSyncTests(TestCase): + """The _sync_absence_payroll_adjustment helper keeps Absence.payroll_adjustment + in sync with Absence.is_paid. Mirrors the cascade pattern used for + Advance Payment → Loan auto-creation.""" + + @classmethod + def setUpTestData(cls): + cls.user = User.objects.create_user(username='admin', is_staff=True) + cls.worker = Worker.objects.create( + name='Sipho Dlamini', id_number='8501015800087', + monthly_salary=Decimal('6000.00'), + ) + cls.project = Project.objects.create(name='Solar Farm Alpha') + + def _make_absence(self, **kw): + defaults = dict( + worker=self.worker, date=_date(2026, 5, 14), reason='sick', + logged_by=self.user, is_paid=False, + ) + defaults.update(kw) + return Absence.objects.create(**defaults) + + def test_is_paid_false_no_adjustment(self): + """Unpaid absence: sync is a no-op, no adjustment created.""" + from core.views import _sync_absence_payroll_adjustment + a = self._make_absence(is_paid=False) + _sync_absence_payroll_adjustment(a) + self.assertIsNone(a.payroll_adjustment) + self.assertEqual(PayrollAdjustment.objects.count(), 0) + + def test_is_paid_true_creates_bonus(self): + """Paid absence creates a Bonus PayrollAdjustment at daily_rate.""" + from core.views import _sync_absence_payroll_adjustment + a = self._make_absence(is_paid=True) + _sync_absence_payroll_adjustment(a) + a.refresh_from_db() + self.assertIsNotNone(a.payroll_adjustment) + adj = a.payroll_adjustment + self.assertEqual(adj.type, 'Bonus') + # daily_rate = monthly_salary / 20 = 6000 / 20 = 300 + self.assertEqual(adj.amount, Decimal('300.00')) + self.assertEqual(adj.worker, self.worker) + self.assertEqual(adj.date, _date(2026, 5, 14)) + + def test_toggle_paid_off_deletes_adjustment(self): + """is_paid True → False: existing adjustment is deleted.""" + from core.views import _sync_absence_payroll_adjustment + a = self._make_absence(is_paid=True) + _sync_absence_payroll_adjustment(a) + self.assertEqual(PayrollAdjustment.objects.count(), 1) + + a.is_paid = False + a.save() + _sync_absence_payroll_adjustment(a) + a.refresh_from_db() + self.assertIsNone(a.payroll_adjustment) + self.assertEqual(PayrollAdjustment.objects.count(), 0) + + def test_toggle_paid_on_creates_fresh(self): + """is_paid False → True: new adjustment is created.""" + from core.views import _sync_absence_payroll_adjustment + a = self._make_absence(is_paid=False) + _sync_absence_payroll_adjustment(a) + self.assertIsNone(a.payroll_adjustment) + + a.is_paid = True + a.save() + _sync_absence_payroll_adjustment(a) + a.refresh_from_db() + self.assertIsNotNone(a.payroll_adjustment) + + def test_paid_with_existing_adj_is_idempotent(self): + """is_paid True + adjustment already linked → leave it alone. + Admin-edited amount must survive a re-sync.""" + from core.views import _sync_absence_payroll_adjustment + a = self._make_absence(is_paid=True) + _sync_absence_payroll_adjustment(a) + a.refresh_from_db() + original_adj = a.payroll_adjustment + self.assertIsNotNone(original_adj) + + # Admin manually edits the amount (e.g. half-day pay) + original_adj.amount = Decimal('150.00') + original_adj.save() + + # Second sync call — must not destroy admin's edit + _sync_absence_payroll_adjustment(a) + a.refresh_from_db() + self.assertEqual(a.payroll_adjustment_id, original_adj.id) + self.assertEqual(a.payroll_adjustment.amount, Decimal('150.00')) + self.assertEqual(PayrollAdjustment.objects.count(), 1) + + def test_refuses_if_adjustment_already_paid(self): + """If the linked adjustment has a PayrollRecord (i.e. already paid), + the sync helper refuses to delete it — surface to admin instead.""" + from core.views import _sync_absence_payroll_adjustment + a = self._make_absence(is_paid=True) + _sync_absence_payroll_adjustment(a) + a.refresh_from_db() + + # Simulate the adjustment being paid by attaching a PayrollRecord + pr = PayrollRecord.objects.create(worker=self.worker, amount_paid=Decimal('300.00'), date=_date(2026, 5, 30)) + a.payroll_adjustment.payroll_record = pr + a.payroll_adjustment.save() + + a.is_paid = False + a.save() + with self.assertRaises(ValueError): + _sync_absence_payroll_adjustment(a) + # Adjustment still exists + self.assertEqual(PayrollAdjustment.objects.count(), 1) + + +class AbsenceUserQuerysetTests(TestCase): + """_absence_user_queryset scopes the queryset to admin (all) or supervisor + (workers in their supervised teams).""" + + @classmethod + def setUpTestData(cls): + cls.admin = User.objects.create_user(username='admin', is_staff=True) + cls.sup_a = User.objects.create_user(username='sup_a') + cls.sup_b = User.objects.create_user(username='sup_b') + cls.outsider = User.objects.create_user(username='out') + + cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000')) + cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000')) + + cls.team_a = Team.objects.create(name='TA', supervisor=cls.sup_a) + cls.team_a.workers.add(cls.worker_a) + cls.team_b = Team.objects.create(name='TB', supervisor=cls.sup_b) + cls.team_b.workers.add(cls.worker_b) + + Absence.objects.create(worker=cls.worker_a, date=_date(2026, 5, 1), reason='sick') + Absence.objects.create(worker=cls.worker_b, date=_date(2026, 5, 1), reason='annual') + + def test_admin_sees_all(self): + from core.views import _absence_user_queryset + qs = _absence_user_queryset(self.admin) + self.assertEqual(qs.count(), 2) + + def test_supervisor_sees_only_own_team(self): + from core.views import _absence_user_queryset + qs_a = _absence_user_queryset(self.sup_a) + self.assertEqual(qs_a.count(), 1) + self.assertEqual(qs_a.first().worker, self.worker_a) + + def test_outsider_sees_none(self): + from core.views import _absence_user_queryset + qs = _absence_user_queryset(self.outsider) + self.assertEqual(qs.count(), 0) diff --git a/core/views.py b/core/views.py index d4b79b3..9ab5583 100644 --- a/core/views.py +++ b/core/views.py @@ -28,6 +28,7 @@ from .models import ( Worker, Project, WorkLog, Team, PayrollRecord, Loan, PayrollAdjustment, WorkerCertificate, WorkerWarning, SiteReport, + Absence, ) from .forms import ( AttendanceLogForm, PayrollAdjustmentForm, ExpenseReceiptForm, ExpenseLineItemFormSet, @@ -4294,6 +4295,105 @@ def _delete_adjustment_with_cascade(adj): return True, None +# ============================================================================= +# === ABSENCE HELPERS === +# Two helpers used by every save/edit/delete path on Absence rows. +# Kept here (not on the model) so admin / supervisor screens can +# react to "the linked adjustment is already paid" — silent +# model-side cascades would hide that case from the user. +# ============================================================================= + + +def _absence_user_queryset(user): + """Return the Absence rows this user is allowed to see. + + - Admin (is_staff or is_superuser): every Absence in the system. + - Supervisor (or anyone else assigned to a team): only Absences + for workers in any team they supervise. + - Anyone else: an empty queryset. + + The supervisor branch uses `worker__teams__supervisor=user`, which + walks Worker -> Team (M2M) -> Team.supervisor (FK). `.distinct()` + deduplicates the rows in case a worker is on multiple teams the + user supervises. + """ + if is_admin(user): + return Absence.objects.all() + return Absence.objects.filter( + worker__teams__supervisor=user + ).distinct() + + +def _sync_absence_payroll_adjustment(absence): + """Keep `absence.payroll_adjustment` in sync with `absence.is_paid`. + + Behaviour by case: + - is_paid=False + no adjustment -> no-op, return None. + - is_paid=False + adjustment exists -> delete the adjustment + and clear the FK. BUT if the adjustment has already been paid + (i.e. has a linked PayrollRecord), raise ValueError so the + caller can surface the situation to the admin instead of + silently destroying payroll history. + - is_paid=True + no adjustment -> create a fresh Bonus + PayrollAdjustment at the worker's daily_rate, link it on the + absence, return it. + - is_paid=True + adjustment exists -> leave it alone. The + admin may have edited the amount manually; the helper doesn't + second-guess that. + + Returns the linked PayrollAdjustment (or None when unpaid). + + NOTE: the DB value 'Bonus' (Title Case) is used here — see + CLAUDE.md "UI-vs-DB naming drift" section. Do not change to + 'bonus' or 'Sick Pay' or any variation; the rest of the payroll + code keys off the exact string 'Bonus'. + + The whole body runs inside a single transaction.atomic() block so + that each branch's two DB operations (create/delete + save) either + both succeed or both roll back. Without this, a crash between the + PayrollAdjustment.create() and the absence.save() would leave an + orphaned adjustment with no Absence link. + """ + with transaction.atomic(): + adj = absence.payroll_adjustment + + # === UNPAID BRANCH === + if not absence.is_paid: + # No adjustment to clean up — nothing to do. + if adj is None: + return None + # Refuse to delete an already-paid adjustment. Caller decides + # how to tell the user (toast, inline error, etc.). + if adj.payroll_record_id is not None: + raise ValueError( + 'Linked PayrollAdjustment has already been paid ' + '(via PayrollRecord). Cannot auto-delete.' + ) + # Safe to delete — adjustment is still unpaid. + adj.delete() + absence.payroll_adjustment = None + absence.save(update_fields=['payroll_adjustment']) + return None + + # === PAID BRANCH === + # Already linked — keep the existing adjustment (admin may have + # tweaked the amount; do not overwrite that). + if adj is not None: + return adj + + # Create a fresh Bonus adjustment at the worker's daily rate. + new_adj = PayrollAdjustment.objects.create( + worker=absence.worker, + type='Bonus', # DB value (Title Case) — see CLAUDE.md naming-drift section + amount=absence.worker.daily_rate, + date=absence.date, + description=f'Paid {absence.get_reason_display().lower()} — auto-created from Absence #{absence.id}', + ) + absence.payroll_adjustment = new_adj + absence.save(update_fields=['payroll_adjustment']) + return new_adj + + @login_required def delete_adjustment(request, adj_id): if request.method != 'POST':