38156-vm/core/views.py
2026-02-04 00:24:27 +00:00

614 lines
24 KiB
Python

import csv
import json
import os
import platform
from datetime import timedelta
from decimal import Decimal, InvalidOperation

from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.mail import send_mail
from django.db import transaction
from django.db.models import Sum, Q, Prefetch
from django.http import JsonResponse, HttpResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.utils import timezone
from django.utils.dateparse import parse_date

from .forms import WorkLogForm
from .models import Worker, Project, Team, WorkLog, PayrollRecord, Loan, PayrollAdjustment
def home(request):
    """Render the landing screen with dashboard stats.

    The context carries:

    * ``workers_count`` / ``projects_count`` / ``teams_count``: table sizes.
    * ``recent_logs``: the five most recent work logs by date.
    * ``outstanding_total``: for every active worker, the number of logs not
      yet linked to one of that worker's payroll records times the worker's
      day rate. Approximate: pending adjustments are not included.
    * ``project_costs``: one ``{'name', 'cost'}`` dict per active project whose
      summed worker day rates over all of its logs is above zero.
    * ``recent_payments_total``: sum of payroll record amounts dated within
      the last 60 days.
    """
    workers_count = Worker.objects.count()
    projects_count = Project.objects.count()
    teams_count = Team.objects.count()
    recent_logs = WorkLog.objects.order_by('-date')[:5]

    # 1. Outstanding payments (approximate, from logs only).
    outstanding_total = 0
    for worker in Worker.objects.filter(is_active=True):
        unpaid_logs_count = worker.work_logs.exclude(paid_in__worker=worker).count()
        outstanding_total += unpaid_logs_count * worker.day_rate

    # 2. Project costs (active projects).
    # Prefetching logs and their workers keeps this at a fixed number of
    # queries instead of one per project plus one per log.
    project_costs = []
    active_projects = Project.objects.filter(is_active=True).prefetch_related('logs__workers')
    for project in active_projects:
        cost = sum(
            worker.day_rate
            for log in project.logs.all()
            for worker in log.workers.all()
        )
        if cost > 0:
            project_costs.append({'name': project.name, 'cost': cost})

    # 3. Payments made during the previous two months (60 days).
    two_months_ago = timezone.now().date() - timedelta(days=60)
    recent_payments_total = (
        PayrollRecord.objects.filter(date__gte=two_months_ago)
        .aggregate(total=Sum('amount'))['total']
        or 0
    )

    context = {
        "workers_count": workers_count,
        "projects_count": projects_count,
        "teams_count": teams_count,
        "recent_logs": recent_logs,
        "current_time": timezone.now(),
        "outstanding_total": outstanding_total,
        "project_costs": project_costs,
        "recent_payments_total": recent_payments_total,
    }
    return render(request, "core/index.html", context)
def log_attendance(request):
    """Record attendance for the selected workers on one date or a date range.

    GET renders an empty form. POST validates ``WorkLogForm`` and expands the
    requested range into target dates (Saturdays/Sundays are only kept when
    the matching ``include_*`` flag is set; without a usable end date only the
    start date is used). Then:

    * If the POST carries no ``conflict_action``, every selected worker who
      already has a log on a target date is listed and the form is re-rendered
      with ``is_conflict`` set so the user can choose how to proceed.
    * ``conflict_action == 'skip'`` logs only the workers that have no log on
      the given date.
    * ``conflict_action == 'overwrite'`` removes conflicting workers from
      their existing logs for that date (deleting logs left without workers)
      and then logs them again.

    All writes happen inside a single transaction so a failure part-way
    through a date range cannot leave attendance half rewritten. On success
    the user is redirected to ``home`` with a summary message.
    """
    template_name = 'core/log_attendance.html'

    # Team -> active worker ids, consumed by the frontend JS on every render.
    teams_qs = Team.objects.filter(is_active=True)
    if request.user.is_authenticated and not request.user.is_superuser:
        teams_qs = teams_qs.filter(supervisor=request.user)
    team_workers_map = {
        team.id: list(team.workers.filter(is_active=True).values_list('id', flat=True))
        for team in teams_qs
    }
    team_workers_json = json.dumps(team_workers_map)

    if request.method != 'POST':
        form = WorkLogForm(user=request.user if request.user.is_authenticated else None)
        return render(request, template_name, {
            'form': form, 'team_workers_json': team_workers_json
        })

    form = WorkLogForm(request.POST, user=request.user)
    if not form.is_valid():
        # Re-render the bound form so field errors are shown.
        return render(request, template_name, {
            'form': form, 'team_workers_json': team_workers_json
        })

    start_date = form.cleaned_data['date']
    end_date = form.cleaned_data.get('end_date')
    include_sat = form.cleaned_data.get('include_saturday')
    include_sun = form.cleaned_data.get('include_sunday')
    # Materialise the selection once; ids go into a set for O(1) membership.
    selected_workers = list(form.cleaned_data['workers'])
    selected_ids = {w.id for w in selected_workers}
    project = form.cleaned_data['project']
    notes = form.cleaned_data['notes']
    conflict_action = request.POST.get('conflict_action')

    # Generate target dates.
    if end_date and end_date >= start_date:
        target_dates = []
        for offset in range((end_date - start_date).days + 1):
            day = start_date + timedelta(days=offset)
            weekday = day.weekday()  # 5 = Saturday, 6 = Sunday
            if (weekday == 5 and not include_sat) or (weekday == 6 and not include_sun):
                continue
            target_dates.append(day)
    else:
        target_dates = [start_date]

    if not target_dates:
        messages.warning(request, "No valid dates selected (check weekends).")
        return render(request, template_name, {
            'form': form, 'team_workers_json': team_workers_json
        })

    # Conflict scan: only when the user has not yet chosen how to resolve them.
    if not conflict_action:
        conflicts = []
        seen_labels = set()
        for d in target_dates:
            existing_logs = (
                WorkLog.objects.filter(date=d, workers__in=selected_ids)
                .distinct()
                .prefetch_related('workers')
            )
            for log in existing_logs:
                for w in log.workers.all():
                    if w.id not in selected_ids:
                        continue
                    # A worker may sit in several logs on the same day; list once.
                    label = f"{w.name} ({d.strftime('%Y-%m-%d')})"
                    if label not in seen_labels:
                        seen_labels.add(label)
                        conflicts.append({'name': label})
        if conflicts:
            context = {
                'form': form,
                'team_workers_json': team_workers_json,
                'conflicting_workers': conflicts,
                'is_conflict': True,
            }
            return render(request, template_name, context)

    # Execution phase.
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    supervisor = request.user if request.user.is_authenticated else None

    with transaction.atomic():
        for d in target_dates:
            # Selected workers that already have a log on this specific day.
            day_conflicts = list(
                Worker.objects.filter(work_logs__date=d, id__in=selected_ids).distinct()
            )
            workers_to_save = selected_workers

            if day_conflicts:
                if conflict_action == 'skip':
                    conflicting_ids = {w.id for w in day_conflicts}
                    workers_to_save = [
                        w for w in selected_workers if w.id not in conflicting_ids
                    ]
                    skipped_count += len(day_conflicts)
                elif conflict_action == 'overwrite':
                    # Detach conflicting workers from their OLD logs; the full
                    # selection is then logged again below.
                    for worker in day_conflicts:
                        for old_log in WorkLog.objects.filter(date=d, workers=worker):
                            old_log.workers.remove(worker)
                            if not old_log.workers.exists():
                                old_log.delete()
                    overwritten_count += len(day_conflicts)

            if workers_to_save:
                log = WorkLog.objects.create(
                    date=d,
                    project=project,
                    notes=notes,
                    supervisor=supervisor,
                )
                log.workers.set(workers_to_save)
                created_count += len(workers_to_save)

    msg = f"Logged {created_count} entries."
    if skipped_count:
        msg += f" Skipped {skipped_count} duplicates."
    if overwritten_count:
        msg += f" Overwrote {overwritten_count} previous entries."
    messages.success(request, msg)
    return redirect('home')
def _parse_pk(raw):
    """Return ``raw`` as an int, or None when it is missing or not numeric."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def _filtered_work_logs(request):
    """Build the work-log queryset selected by the request's GET filters.

    Recognised query parameters:

    * ``worker``, ``team``, ``project``: primary keys. Missing, empty or
      non-numeric values are treated as "no filter" instead of raising.
    * ``payment_status``: ``'paid'`` or ``'unpaid'``; any other value applies
      no payment filter.

    When a worker filter is active, paid/unpaid is decided per worker (is the
    log linked to a payroll record of *that* worker), so a log shared with
    other workers cannot show up as both paid and unpaid. Without a worker
    filter, a log counts as paid when it is linked to any payroll record.

    Returns ``(logs, target_worker, filters)`` where ``filters`` maps
    ``'worker'``, ``'team'``, ``'project'`` to the parsed ints (or None) and
    ``'payment_status'`` to the raw value.

    Raises Http404 (through ``get_object_or_404``) when ``unpaid`` is combined
    with a worker id that does not exist.
    """
    worker_id = _parse_pk(request.GET.get('worker'))
    team_id = _parse_pk(request.GET.get('team'))
    project_id = _parse_pk(request.GET.get('project'))
    payment_status = request.GET.get('payment_status')  # 'paid', 'unpaid', 'all'

    logs = (
        WorkLog.objects.all()
        .prefetch_related('workers', 'project', 'supervisor', 'paid_in')
        .order_by('-date', '-id')
    )

    target_worker = None
    if worker_id is not None:
        logs = logs.filter(workers__id=worker_id)
        # Fetched separately so the day rate is available for amount display.
        target_worker = Worker.objects.filter(id=worker_id).first()

    if team_id is not None:
        team_workers = Worker.objects.filter(teams__id=team_id)
        logs = logs.filter(workers__in=team_workers).distinct()

    if project_id is not None:
        logs = logs.filter(project_id=project_id)

    if payment_status == 'paid':
        if target_worker is not None:
            logs = logs.filter(paid_in__worker=target_worker).distinct()
        else:
            logs = logs.filter(paid_in__isnull=False).distinct()
    elif payment_status == 'unpaid':
        if worker_id is not None:
            worker = get_object_or_404(Worker, pk=worker_id)
            logs = logs.exclude(paid_in__worker=worker)
        else:
            logs = logs.filter(paid_in__isnull=True)

    filters = {
        'worker': worker_id,
        'team': team_id,
        'project': project_id,
        'payment_status': payment_status,
    }
    return logs, target_worker, filters


def work_log_list(request):
    """View work log history with advanced filtering.

    Each log in the context gets a ``display_amount`` attribute: the filtered
    worker's day rate when a worker filter is active, otherwise the sum of the
    day rates of every worker on the log. ``total_amount`` is the sum of those
    display amounts.
    """
    logs, target_worker, filters = _filtered_work_logs(request)

    final_logs = []
    total_amount = 0
    for log in logs:
        if target_worker:
            log.display_amount = target_worker.day_rate
        else:
            log.display_amount = sum(w.day_rate for w in log.workers.all())
        final_logs.append(log)
        total_amount += log.display_amount

    context = {
        'logs': final_logs,
        'total_amount': total_amount,
        # Choices for the filter dropdowns.
        'workers': Worker.objects.filter(is_active=True).order_by('name'),
        'teams': Team.objects.filter(is_active=True).order_by('name'),
        'projects': Project.objects.filter(is_active=True).order_by('name'),
        'selected_worker': filters['worker'],
        'selected_team': filters['team'],
        'selected_project': filters['project'],
        'selected_payment_status': filters['payment_status'],
        'target_worker': target_worker,
    }
    return render(request, 'core/work_log_list.html', context)


def export_work_log_csv(request):
    """Export filtered work logs to CSV.

    Accepts the same GET filters as ``work_log_list`` and returns an
    attachment named ``work_logs.csv`` with the columns Date, Project,
    Workers, Amount, Payment Status and Supervisor. With a worker filter the
    Workers/Amount/Payment Status columns describe that worker only.
    """
    logs, target_worker, _filters = _filtered_work_logs(request)

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="work_logs.csv"'
    writer = csv.writer(response)
    writer.writerow(['Date', 'Project', 'Workers', 'Amount', 'Payment Status', 'Supervisor'])

    for log in logs:
        if target_worker:
            display_amount = target_worker.day_rate
            workers_str = target_worker.name
            # Paid only if one of the linked payroll records belongs to the
            # selected worker; another worker's payment must not count.
            is_paid = any(
                record.worker_id == target_worker.id for record in log.paid_in.all()
            )
        else:
            log_workers = log.workers.all()
            display_amount = sum(w.day_rate for w in log_workers)
            workers_str = ", ".join([w.name for w in log_workers])
            is_paid = log.paid_in.exists()

        writer.writerow([
            log.date,
            log.project.name,
            workers_str,
            f"{display_amount:.2f}",
            "Paid" if is_paid else "Pending",
            log.supervisor.username if log.supervisor else "System",
        ])
    return response
def manage_resources(request):
    """Render the page where workers, projects and teams are (de)activated.

    Every worker, project and team is listed alphabetically regardless of its
    active flag.
    """
    return render(request, 'core/manage_resources.html', {
        # Teams are prefetched per worker (and workers per team) so the
        # template does not run one query per row.
        'workers': Worker.objects.all().prefetch_related('teams').order_by('name'),
        'projects': Project.objects.all().order_by('name'),
        'teams': Team.objects.all().prefetch_related('workers').order_by('name'),
    })
def toggle_resource_status(request, model_type, pk):
    """Toggle the is_active status of a resource.

    ``model_type`` selects the model: ``'worker'``, ``'project'`` or
    ``'team'``. Only POST requests change anything. For a POST with a known
    model type the object is looked up by ``pk`` (404 if missing) and its
    ``is_active`` flag is inverted and saved.

    Returns a JSON payload (``success``, ``is_active``, ``message``) when the
    request carries ``X-Requested-With: XMLHttpRequest``; in every other case
    (non-POST, unknown model type, regular form post) redirects to
    ``manage_resources``.
    """
    if request.method == 'POST':
        model_map = {
            'worker': Worker,
            'project': Project,
            'team': Team,
        }
        model_class = model_map.get(model_type)
        if model_class:
            obj = get_object_or_404(model_class, pk=pk)
            obj.is_active = not obj.is_active
            obj.save()

            if request.headers.get('x-requested-with') == 'XMLHttpRequest':
                state = 'Active' if obj.is_active else 'Inactive'
                return JsonResponse({
                    'success': True,
                    'is_active': obj.is_active,
                    'message': f"{obj.name} is now {state}.",
                })
    return redirect('manage_resources')
def payroll_dashboard(request):
    """Dashboard for payroll management with filtering.

    ``?status=`` selects the tab: ``pending`` (default), ``paid`` or ``all``.

    For every active worker the pending amount is the number of logs not yet
    linked to one of the worker's payroll records times the day rate, plus
    the net of adjustments not yet attached to a payroll record (BONUS,
    OVERTIME and LOAN add; DEDUCTION and LOAN_REPAYMENT subtract; any other
    type is ignored). Workers with neither unpaid logs nor pending
    adjustments are left out. ``outstanding_total`` only sums positive
    payables, so a worker who owes money does not reduce the grand total.
    """
    status_filter = request.GET.get('status', 'pending')  # pending, paid, all
    show_pending = status_filter in ('pending', 'all')

    additive_types = ('BONUS', 'OVERTIME', 'LOAN')
    subtractive_types = ('DEDUCTION', 'LOAN_REPAYMENT')

    outstanding_total = 0
    workers_data = []  # rows for the pending-payments table
    active_workers = (
        Worker.objects.filter(is_active=True)
        .order_by('name')
        .prefetch_related('adjustments')
    )
    for worker in active_workers:
        # Unpaid work logs.
        unpaid_logs = worker.work_logs.exclude(paid_in__worker=worker)
        log_count = unpaid_logs.count()
        log_amount = log_count * worker.day_rate

        # Pending adjustments (not linked to any payroll record yet).
        pending_adjustments = worker.adjustments.filter(payroll_record__isnull=True)
        adj_total = Decimal('0.00')
        for adj in pending_adjustments:
            if adj.type in additive_types:
                adj_total += adj.amount
            elif adj.type in subtractive_types:
                adj_total -= adj.amount

        # May be negative, meaning the worker owes money (e.g. a repayment
        # larger than the work done).
        total_payable = log_amount + adj_total

        if log_count == 0 and not pending_adjustments.exists():
            continue

        outstanding_total += max(total_payable, Decimal('0.00'))
        if show_pending:
            workers_data.append({
                'worker': worker,
                'unpaid_count': log_count,
                'unpaid_amount': log_amount,
                'adj_amount': adj_total,
                'total_payable': total_payable,
                'adjustments': pending_adjustments,
                'logs': unpaid_logs,
            })

    # Paid history.
    paid_records = []
    if status_filter in ('paid', 'all'):
        paid_records = PayrollRecord.objects.select_related('worker').order_by('-date', '-id')

    # Analytics: project costs (active projects). Logs and their workers are
    # prefetched so the nested loop does not query once per project and log.
    project_costs = []
    active_projects = Project.objects.filter(is_active=True).prefetch_related('logs__workers')
    for project in active_projects:
        cost = sum(
            member.day_rate
            for log in project.logs.all()
            for member in log.workers.all()
        )
        if cost > 0:
            project_costs.append({'name': project.name, 'cost': cost})

    # Analytics: payments made during the previous two months (60 days).
    two_months_ago = timezone.now().date() - timedelta(days=60)
    recent_payments_total = (
        PayrollRecord.objects.filter(date__gte=two_months_ago)
        .aggregate(total=Sum('amount'))['total']
        or 0
    )

    # Active workers for the dropdowns/modals.
    all_workers = Worker.objects.filter(is_active=True).order_by('name')

    context = {
        'workers_data': workers_data,
        'paid_records': paid_records,
        'outstanding_total': outstanding_total,
        'project_costs': project_costs,
        'recent_payments_total': recent_payments_total,
        'active_tab': status_filter,
        'all_workers': all_workers,
        'adjustment_types': PayrollAdjustment.ADJUSTMENT_TYPES,
    }
    return render(request, 'core/payroll_dashboard.html', context)
def process_payment(request, worker_id):
    """Process payment for a worker, mark logs as paid, link adjustments, and email receipt.

    Only POST does any work; everything else redirects to the payroll
    dashboard. The amount paid is (unpaid log count x day rate) plus the net
    of pending adjustments (BONUS, OVERTIME, LOAN add; DEDUCTION,
    LOAN_REPAYMENT subtract). Nothing is created when the worker has neither
    unpaid logs nor pending adjustments.

    The payroll record, its log links, the adjustment links and any loan
    balance updates are written in one transaction so a failure cannot leave
    a half-processed payment behind. The notification email is sent after the
    transaction; a delivery failure is reported as a warning and does not
    undo the payment.

    Raises Http404 when the worker does not exist.
    """
    worker = get_object_or_404(Worker, pk=worker_id)
    if request.method != 'POST':
        return redirect('payroll_dashboard')

    with transaction.atomic():
        # Snapshot both sets up front: once adjustments are saved with a
        # payroll record they no longer match the "pending" filter.
        unpaid_logs = list(worker.work_logs.exclude(paid_in__worker=worker))
        pending_adjustments = list(worker.adjustments.filter(payroll_record__isnull=True))

        if not unpaid_logs and not pending_adjustments:
            return redirect('payroll_dashboard')

        log_count = len(unpaid_logs)
        logs_amount = log_count * worker.day_rate

        adj_amount = Decimal('0.00')
        for adj in pending_adjustments:
            if adj.type in ['BONUS', 'OVERTIME', 'LOAN']:
                adj_amount += adj.amount
            elif adj.type in ['DEDUCTION', 'LOAN_REPAYMENT']:
                adj_amount -= adj.amount

        total_amount = logs_amount + adj_amount

        payroll_record = PayrollRecord.objects.create(
            worker=worker,
            amount=total_amount,
            date=timezone.now().date(),
        )
        payroll_record.work_logs.set(unpaid_logs)

        # Link adjustments and settle loan repayments.
        for adj in pending_adjustments:
            adj.payroll_record = payroll_record
            adj.save()
            if adj.type == 'LOAN_REPAYMENT' and adj.loan:
                loan = adj.loan
                loan.balance -= adj.amount
                if loan.balance <= 0:
                    # Fully repaid (or over-repaid): clamp and close the loan.
                    loan.balance = 0
                    loan.is_active = False
                loan.save()

        payroll_record.save()

    # Email notification (best effort, outside the transaction).
    subject = f"Payslip for {worker.name} - {payroll_record.date}"
    message = (
        f"Payslip Generated\n\n"
        f"Record ID: #{payroll_record.id}\n"
        f"Worker: {worker.name}\n"
        f"Date: {payroll_record.date}\n"
        f"Total Paid: R {payroll_record.amount}\n\n"
        f"Breakdown:\n"
        f"Base Pay ({log_count} days): R {logs_amount}\n"
        f"Adjustments: R {adj_amount}\n\n"
        f"This is an automated notification."
    )
    # NOTE(review): the recipient is hard-coded; consider moving it to settings.
    recipient_list = ['foxfitt-ed9wc+expense@to.sparkreceipt.com']
    try:
        send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, recipient_list)
    except Exception as e:
        # Deliberately broad: any mail failure must not fail a completed payment.
        messages.warning(request, f"Payment processed, but email delivery failed: {str(e)}")
    else:
        messages.success(request, f"Payment processed for {worker.name}. Net Pay: R {payroll_record.amount}")
    return redirect('payroll_dashboard')
def payslip_detail(request, pk):
    """Show details of a payslip (Payment Record).

    ``record.amount`` is the final net figure. ``base_pay`` is rebuilt from
    the linked logs as the record's worker's day rate once per log that
    contains that worker, and ``adjustments_net`` is the difference between
    the two.

    Raises Http404 when no payroll record has the given ``pk``.
    """
    record = get_object_or_404(PayrollRecord, pk=pk)

    # Logs covered by this payment; workers are prefetched so the base-pay
    # sum below does not run one query per log.
    logs = record.work_logs.all().order_by('date').prefetch_related('workers')
    adjustments = record.adjustments.all().order_by('type')

    base_pay = sum(
        member.day_rate
        for log in logs
        for member in log.workers.all()
        if member == record.worker
    )
    adjustments_net = record.amount - base_pay

    context = {
        'record': record,
        'logs': logs,
        'adjustments': adjustments,
        'base_pay': base_pay,
        'adjustments_net': adjustments_net,
    }
    return render(request, 'core/payslip.html', context)
def loan_list(request):
    """Show loans, newest first: active ones by default, closed ones on request.

    ``?status=history`` lists loans whose ``is_active`` flag is off; any other
    value (or none) lists the active loans.
    """
    filter_status = request.GET.get('status', 'active')  # active, history
    want_active = filter_status != 'history'
    loans = Loan.objects.filter(is_active=want_active).order_by('-date')

    return render(request, 'core/loan_list.html', {
        'loans': loans,
        'filter_status': filter_status,
        # Active workers feed the "new loan" modal.
        'workers': Worker.objects.filter(is_active=True).order_by('name'),
    })
def add_loan(request):
    """Create a new loan.

    Reads ``worker``, ``amount``, ``reason`` and optional ``date`` from the
    POST data. ``worker`` and ``amount`` are required; without them nothing
    is created. ``amount`` must parse as a finite decimal greater than zero
    and ``date``, when given, must be a valid date string; otherwise a
    warning is shown instead of letting the ORM fail on the raw input.
    ``date`` defaults to today.

    Always redirects to ``loan_list``. Raises Http404 when the worker does
    not exist.
    """
    if request.method != 'POST':
        return redirect('loan_list')

    worker_id = request.POST.get('worker')
    raw_amount = request.POST.get('amount')
    reason = request.POST.get('reason')
    raw_date = request.POST.get('date')

    if not (worker_id and raw_amount):
        return redirect('loan_list')

    try:
        amount = Decimal(raw_amount)
    except (InvalidOperation, TypeError):
        amount = None
    # is_finite() is checked first: comparing a NaN Decimal would itself raise.
    if amount is None or not amount.is_finite() or amount <= 0:
        messages.warning(request, "Loan amount must be a positive number.")
        return redirect('loan_list')

    if raw_date:
        try:
            # None for a malformed string, ValueError for an impossible date.
            loan_date = parse_date(raw_date)
        except ValueError:
            loan_date = None
        if loan_date is None:
            messages.warning(request, "Loan date is not a valid date (expected YYYY-MM-DD).")
            return redirect('loan_list')
    else:
        loan_date = timezone.now().date()

    worker = get_object_or_404(Worker, pk=worker_id)
    Loan.objects.create(
        worker=worker,
        amount=amount,
        date=loan_date,
        reason=reason,
    )
    messages.success(request, f"Loan of R{amount} recorded for {worker.name}.")
    return redirect('loan_list')
def add_adjustment(request):
    """Add a payroll adjustment (Bonus, Overtime, Deduction, Repayment).

    Reads ``worker``, ``type``, ``amount``, ``description``, optional ``date``
    (defaults to today) and optional ``loan_id`` from the POST data.
    ``worker``, ``type`` and ``amount`` are required; without them nothing is
    created. ``amount`` must parse as a finite decimal greater than zero and
    ``date``, when given, must be a valid date string; otherwise a warning is
    shown.

    Type-specific handling:

    * ``LOAN_REPAYMENT``: linked to the loan given by ``loan_id`` (which must
      belong to the same worker, else 404) or, without ``loan_id``, to the
      worker's first active loan. With no loan to repay a warning is shown
      and nothing is created.
    * ``LOAN``: a ``Loan`` tracking the debt is created together with the
      adjustment, in one transaction.

    Always redirects to ``payroll_dashboard``. Raises Http404 when the worker
    (or the explicitly requested loan) does not exist.
    """
    if request.method != 'POST':
        return redirect('payroll_dashboard')

    worker_id = request.POST.get('worker')
    adj_type = request.POST.get('type')
    raw_amount = request.POST.get('amount')
    description = request.POST.get('description')
    raw_date = request.POST.get('date')
    loan_id = request.POST.get('loan_id')  # optional, for repayments

    if not (worker_id and raw_amount and adj_type):
        return redirect('payroll_dashboard')

    try:
        amount = Decimal(raw_amount)
    except (InvalidOperation, TypeError):
        amount = None
    # is_finite() is checked first: comparing a NaN Decimal would itself raise.
    # The sign of an adjustment comes from its type, so the amount is positive.
    if amount is None or not amount.is_finite() or amount <= 0:
        messages.warning(request, "Adjustment amount must be a positive number.")
        return redirect('payroll_dashboard')

    if raw_date:
        try:
            # None for a malformed string, ValueError for an impossible date.
            adj_date = parse_date(raw_date)
        except ValueError:
            adj_date = None
        if adj_date is None:
            messages.warning(request, "Adjustment date is not a valid date (expected YYYY-MM-DD).")
            return redirect('payroll_dashboard')
    else:
        adj_date = timezone.now().date()

    worker = get_object_or_404(Worker, pk=worker_id)

    loan = None
    if adj_type == 'LOAN_REPAYMENT':
        if loan_id:
            # Scoped to the worker so a repayment cannot reduce someone
            # else's loan balance.
            loan = get_object_or_404(Loan, pk=loan_id, worker=worker)
        else:
            loan = worker.loans.filter(is_active=True).first()
        if not loan:
            messages.warning(request, f"Cannot add repayment: {worker.name} has no active loans.")
            return redirect('payroll_dashboard')

    with transaction.atomic():
        if adj_type == 'LOAN':
            # Create the Loan object tracking the debt.
            loan = Loan.objects.create(
                worker=worker,
                amount=amount,
                date=adj_date,
                reason=description,
            )
        PayrollAdjustment.objects.create(
            worker=worker,
            type=adj_type,
            amount=amount,
            description=description,
            date=adj_date,
            loan=loan,
        )

    messages.success(request, f"{adj_type} of R{amount} added for {worker.name}.")
    return redirect('payroll_dashboard')