feat(absences): _absence_user_queryset + _sync_absence_payroll_adjustment
Two helpers covering the recurring 'which absences can this user see' and 'sync is_paid with the linked Bonus PayrollAdjustment' patterns. The sync helper refuses to delete an already-paid adjustment — caller surfaces this to the user. Mirrors _delete_adjustment_with_cascade semantics. 8 tests.
This commit is contained in:
parent
bf6f0a5c74
commit
90c0e57659
162
core/tests.py
162
core/tests.py
@ -1734,3 +1734,165 @@ class AbsenceModelTests(TestCase):
|
||||
Absence.objects.create(worker=self.worker, date=_date(2026, 5, 1), reason='sick')
|
||||
Absence.objects.create(worker=self.worker, date=_date(2026, 5, 2), reason='family')
|
||||
self.assertEqual(self.worker.absences.count(), 2)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# === ABSENCE HELPER TESTS (Task 2) ===
|
||||
# Tests for the two shared helpers in core/views.py that every Absence
|
||||
# save/edit/delete path will route through:
|
||||
# - _sync_absence_payroll_adjustment: keeps is_paid in sync with the
|
||||
# linked Bonus PayrollAdjustment (create / delete / refuse-if-paid).
|
||||
# - _absence_user_queryset: scopes visibility by admin vs supervisor.
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class AbsencePayrollSyncTests(TestCase):
|
||||
"""The _sync_absence_payroll_adjustment helper keeps Absence.payroll_adjustment
|
||||
in sync with Absence.is_paid. Mirrors the cascade pattern used for
|
||||
Advance Payment → Loan auto-creation."""
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
cls.user = User.objects.create_user(username='admin', is_staff=True)
|
||||
cls.worker = Worker.objects.create(
|
||||
name='Sipho Dlamini', id_number='8501015800087',
|
||||
monthly_salary=Decimal('6000.00'),
|
||||
)
|
||||
cls.project = Project.objects.create(name='Solar Farm Alpha')
|
||||
|
||||
def _make_absence(self, **kw):
|
||||
defaults = dict(
|
||||
worker=self.worker, date=_date(2026, 5, 14), reason='sick',
|
||||
logged_by=self.user, is_paid=False,
|
||||
)
|
||||
defaults.update(kw)
|
||||
return Absence.objects.create(**defaults)
|
||||
|
||||
def test_is_paid_false_no_adjustment(self):
|
||||
"""Unpaid absence: sync is a no-op, no adjustment created."""
|
||||
from core.views import _sync_absence_payroll_adjustment
|
||||
a = self._make_absence(is_paid=False)
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
self.assertIsNone(a.payroll_adjustment)
|
||||
self.assertEqual(PayrollAdjustment.objects.count(), 0)
|
||||
|
||||
def test_is_paid_true_creates_bonus(self):
|
||||
"""Paid absence creates a Bonus PayrollAdjustment at daily_rate."""
|
||||
from core.views import _sync_absence_payroll_adjustment
|
||||
a = self._make_absence(is_paid=True)
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
a.refresh_from_db()
|
||||
self.assertIsNotNone(a.payroll_adjustment)
|
||||
adj = a.payroll_adjustment
|
||||
self.assertEqual(adj.type, 'Bonus')
|
||||
# daily_rate = monthly_salary / 20 = 6000 / 20 = 300
|
||||
self.assertEqual(adj.amount, Decimal('300.00'))
|
||||
self.assertEqual(adj.worker, self.worker)
|
||||
self.assertEqual(adj.date, _date(2026, 5, 14))
|
||||
|
||||
def test_toggle_paid_off_deletes_adjustment(self):
|
||||
"""is_paid True → False: existing adjustment is deleted."""
|
||||
from core.views import _sync_absence_payroll_adjustment
|
||||
a = self._make_absence(is_paid=True)
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
self.assertEqual(PayrollAdjustment.objects.count(), 1)
|
||||
|
||||
a.is_paid = False
|
||||
a.save()
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
a.refresh_from_db()
|
||||
self.assertIsNone(a.payroll_adjustment)
|
||||
self.assertEqual(PayrollAdjustment.objects.count(), 0)
|
||||
|
||||
def test_toggle_paid_on_creates_fresh(self):
|
||||
"""is_paid False → True: new adjustment is created."""
|
||||
from core.views import _sync_absence_payroll_adjustment
|
||||
a = self._make_absence(is_paid=False)
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
self.assertIsNone(a.payroll_adjustment)
|
||||
|
||||
a.is_paid = True
|
||||
a.save()
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
a.refresh_from_db()
|
||||
self.assertIsNotNone(a.payroll_adjustment)
|
||||
|
||||
def test_paid_with_existing_adj_is_idempotent(self):
|
||||
"""is_paid True + adjustment already linked → leave it alone.
|
||||
Admin-edited amount must survive a re-sync."""
|
||||
from core.views import _sync_absence_payroll_adjustment
|
||||
a = self._make_absence(is_paid=True)
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
a.refresh_from_db()
|
||||
original_adj = a.payroll_adjustment
|
||||
self.assertIsNotNone(original_adj)
|
||||
|
||||
# Admin manually edits the amount (e.g. half-day pay)
|
||||
original_adj.amount = Decimal('150.00')
|
||||
original_adj.save()
|
||||
|
||||
# Second sync call — must not destroy admin's edit
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
a.refresh_from_db()
|
||||
self.assertEqual(a.payroll_adjustment_id, original_adj.id)
|
||||
self.assertEqual(a.payroll_adjustment.amount, Decimal('150.00'))
|
||||
self.assertEqual(PayrollAdjustment.objects.count(), 1)
|
||||
|
||||
def test_refuses_if_adjustment_already_paid(self):
|
||||
"""If the linked adjustment has a PayrollRecord (i.e. already paid),
|
||||
the sync helper refuses to delete it — surface to admin instead."""
|
||||
from core.views import _sync_absence_payroll_adjustment
|
||||
a = self._make_absence(is_paid=True)
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
a.refresh_from_db()
|
||||
|
||||
# Simulate the adjustment being paid by attaching a PayrollRecord
|
||||
pr = PayrollRecord.objects.create(worker=self.worker, amount_paid=Decimal('300.00'), date=_date(2026, 5, 30))
|
||||
a.payroll_adjustment.payroll_record = pr
|
||||
a.payroll_adjustment.save()
|
||||
|
||||
a.is_paid = False
|
||||
a.save()
|
||||
with self.assertRaises(ValueError):
|
||||
_sync_absence_payroll_adjustment(a)
|
||||
# Adjustment still exists
|
||||
self.assertEqual(PayrollAdjustment.objects.count(), 1)
|
||||
|
||||
|
||||
class AbsenceUserQuerysetTests(TestCase):
|
||||
"""_absence_user_queryset scopes the queryset to admin (all) or supervisor
|
||||
(workers in their supervised teams)."""
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
cls.admin = User.objects.create_user(username='admin', is_staff=True)
|
||||
cls.sup_a = User.objects.create_user(username='sup_a')
|
||||
cls.sup_b = User.objects.create_user(username='sup_b')
|
||||
cls.outsider = User.objects.create_user(username='out')
|
||||
|
||||
cls.worker_a = Worker.objects.create(name='WA', id_number='1', monthly_salary=Decimal('6000'))
|
||||
cls.worker_b = Worker.objects.create(name='WB', id_number='2', monthly_salary=Decimal('6000'))
|
||||
|
||||
cls.team_a = Team.objects.create(name='TA', supervisor=cls.sup_a)
|
||||
cls.team_a.workers.add(cls.worker_a)
|
||||
cls.team_b = Team.objects.create(name='TB', supervisor=cls.sup_b)
|
||||
cls.team_b.workers.add(cls.worker_b)
|
||||
|
||||
Absence.objects.create(worker=cls.worker_a, date=_date(2026, 5, 1), reason='sick')
|
||||
Absence.objects.create(worker=cls.worker_b, date=_date(2026, 5, 1), reason='annual')
|
||||
|
||||
def test_admin_sees_all(self):
|
||||
from core.views import _absence_user_queryset
|
||||
qs = _absence_user_queryset(self.admin)
|
||||
self.assertEqual(qs.count(), 2)
|
||||
|
||||
def test_supervisor_sees_only_own_team(self):
|
||||
from core.views import _absence_user_queryset
|
||||
qs_a = _absence_user_queryset(self.sup_a)
|
||||
self.assertEqual(qs_a.count(), 1)
|
||||
self.assertEqual(qs_a.first().worker, self.worker_a)
|
||||
|
||||
def test_outsider_sees_none(self):
|
||||
from core.views import _absence_user_queryset
|
||||
qs = _absence_user_queryset(self.outsider)
|
||||
self.assertEqual(qs.count(), 0)
|
||||
|
||||
100
core/views.py
100
core/views.py
@ -28,6 +28,7 @@ from .models import (
|
||||
Worker, Project, WorkLog, Team, PayrollRecord, Loan, PayrollAdjustment,
|
||||
WorkerCertificate, WorkerWarning,
|
||||
SiteReport,
|
||||
Absence,
|
||||
)
|
||||
from .forms import (
|
||||
AttendanceLogForm, PayrollAdjustmentForm, ExpenseReceiptForm, ExpenseLineItemFormSet,
|
||||
@ -4294,6 +4295,105 @@ def _delete_adjustment_with_cascade(adj):
|
||||
return True, None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# === ABSENCE HELPERS ===
|
||||
# Two helpers used by every save/edit/delete path on Absence rows.
|
||||
# Kept here (not on the model) so admin / supervisor screens can
|
||||
# react to "the linked adjustment is already paid" — silent
|
||||
# model-side cascades would hide that case from the user.
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _absence_user_queryset(user):
|
||||
"""Return the Absence rows this user is allowed to see.
|
||||
|
||||
- Admin (is_staff or is_superuser): every Absence in the system.
|
||||
- Supervisor (or anyone else assigned to a team): only Absences
|
||||
for workers in any team they supervise.
|
||||
- Anyone else: an empty queryset.
|
||||
|
||||
The supervisor branch uses `worker__teams__supervisor=user`, which
|
||||
walks Worker -> Team (M2M) -> Team.supervisor (FK). `.distinct()`
|
||||
deduplicates the rows in case a worker is on multiple teams the
|
||||
user supervises.
|
||||
"""
|
||||
if is_admin(user):
|
||||
return Absence.objects.all()
|
||||
return Absence.objects.filter(
|
||||
worker__teams__supervisor=user
|
||||
).distinct()
|
||||
|
||||
|
||||
def _sync_absence_payroll_adjustment(absence):
|
||||
"""Keep `absence.payroll_adjustment` in sync with `absence.is_paid`.
|
||||
|
||||
Behaviour by case:
|
||||
- is_paid=False + no adjustment -> no-op, return None.
|
||||
- is_paid=False + adjustment exists -> delete the adjustment
|
||||
and clear the FK. BUT if the adjustment has already been paid
|
||||
(i.e. has a linked PayrollRecord), raise ValueError so the
|
||||
caller can surface the situation to the admin instead of
|
||||
silently destroying payroll history.
|
||||
- is_paid=True + no adjustment -> create a fresh Bonus
|
||||
PayrollAdjustment at the worker's daily_rate, link it on the
|
||||
absence, return it.
|
||||
- is_paid=True + adjustment exists -> leave it alone. The
|
||||
admin may have edited the amount manually; the helper doesn't
|
||||
second-guess that.
|
||||
|
||||
Returns the linked PayrollAdjustment (or None when unpaid).
|
||||
|
||||
NOTE: the DB value 'Bonus' (Title Case) is used here — see
|
||||
CLAUDE.md "UI-vs-DB naming drift" section. Do not change to
|
||||
'bonus' or 'Sick Pay' or any variation; the rest of the payroll
|
||||
code keys off the exact string 'Bonus'.
|
||||
|
||||
The whole body runs inside a single transaction.atomic() block so
|
||||
that each branch's two DB operations (create/delete + save) either
|
||||
both succeed or both roll back. Without this, a crash between the
|
||||
PayrollAdjustment.create() and the absence.save() would leave an
|
||||
orphaned adjustment with no Absence link.
|
||||
"""
|
||||
with transaction.atomic():
|
||||
adj = absence.payroll_adjustment
|
||||
|
||||
# === UNPAID BRANCH ===
|
||||
if not absence.is_paid:
|
||||
# No adjustment to clean up — nothing to do.
|
||||
if adj is None:
|
||||
return None
|
||||
# Refuse to delete an already-paid adjustment. Caller decides
|
||||
# how to tell the user (toast, inline error, etc.).
|
||||
if adj.payroll_record_id is not None:
|
||||
raise ValueError(
|
||||
'Linked PayrollAdjustment has already been paid '
|
||||
'(via PayrollRecord). Cannot auto-delete.'
|
||||
)
|
||||
# Safe to delete — adjustment is still unpaid.
|
||||
adj.delete()
|
||||
absence.payroll_adjustment = None
|
||||
absence.save(update_fields=['payroll_adjustment'])
|
||||
return None
|
||||
|
||||
# === PAID BRANCH ===
|
||||
# Already linked — keep the existing adjustment (admin may have
|
||||
# tweaked the amount; do not overwrite that).
|
||||
if adj is not None:
|
||||
return adj
|
||||
|
||||
# Create a fresh Bonus adjustment at the worker's daily rate.
|
||||
new_adj = PayrollAdjustment.objects.create(
|
||||
worker=absence.worker,
|
||||
type='Bonus', # DB value (Title Case) — see CLAUDE.md naming-drift section
|
||||
amount=absence.worker.daily_rate,
|
||||
date=absence.date,
|
||||
description=f'Paid {absence.get_reason_display().lower()} — auto-created from Absence #{absence.id}',
|
||||
)
|
||||
absence.payroll_adjustment = new_adj
|
||||
absence.save(update_fields=['payroll_adjustment'])
|
||||
return new_adj
|
||||
|
||||
|
||||
@login_required
|
||||
def delete_adjustment(request, adj_id):
|
||||
if request.method != 'POST':
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user