37680-vm/core/views.py
2026-01-22 12:56:34 +00:00

835 lines
32 KiB
Python

import os
import io
import pandas as pd
import uuid
from datetime import timedelta
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth import login, logout, authenticate
from django.contrib.auth.forms import UserCreationForm, AuthenticationForm
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.db import transaction
from django.db.models import Count, Q
from django.http import HttpResponse
from django.core.mail import send_mail
from django.conf import settings
from django.urls import reverse
from django.utils import timezone
from django.contrib.auth import get_user_model
User = get_user_model()
from .models import Company, Profile, JobStatus, RequiredFolder, Job, JobFolderCompletion, JobFile, Invitation, Client
from .forms import CompanyForm, JobStatusForm, RequiredFolderForm, JobForm, JobFileForm, ImportJobsForm, InviteUserForm, ClientForm
def home(request):
    """Render the public landing page; signed-in users go to the dashboard."""
    if not request.user.is_authenticated:
        return render(request, "core/index.html", {"project_name": "RepairsHub"})
    return redirect('dashboard')
def register_view(request):
    """Register a new account.

    A valid POST saves the user, logs them in and redirects to
    ``company_setup``. A GET, or a POST that fails validation, renders the
    registration template with the (possibly bound) form.
    """
    submitted = request.method == 'POST'
    form = UserCreationForm(request.POST) if submitted else UserCreationForm()
    if submitted and form.is_valid():
        new_user = form.save()
        login(request, new_user)
        return redirect('company_setup')
    return render(request, 'core/register.html', {'form': form})
def login_view(request):
    """Authenticate a user from the posted credentials.

    On success the user is logged in and redirected to ``dashboard``;
    otherwise the login template is rendered with the form.
    """
    if request.method != 'POST':
        form = AuthenticationForm()
    else:
        form = AuthenticationForm(data=request.POST)
        if form.is_valid():
            login(request, form.get_user())
            return redirect('dashboard')
    return render(request, 'core/login.html', {'form': form})
def logout_view(request):
    """Log the current user out and return to the landing page."""
    logout(request)
    landing_redirect = redirect('home')
    return landing_redirect
@login_required
def company_setup(request):
    """Create the user's company together with its statuses and folders.

    Users who already belong to a company are redirected to ``dashboard``.
    On POST the view reads the ``statuses`` and ``folders`` lists and the
    ``default_status`` index from the form data. Blank status names are
    ignored *before* validation, so the company is only created when at
    least one real status exists and the chosen default index points at one
    of them. The requesting user becomes the company's ``ADMIN``.
    """
    if request.user.profile.company:
        return redirect('dashboard')

    if request.method == 'POST':
        company_form = CompanyForm(request.POST)
        default_status_idx = request.POST.get('default_status')

        # Keep the original form position of every non-blank status: the
        # default-status index posted by the form refers to that position.
        statuses = [
            (position, raw_name.strip())
            for position, raw_name in enumerate(request.POST.getlist('statuses'))
            if raw_name.strip()
        ]
        folder_names = [
            raw_name.strip()
            for raw_name in request.POST.getlist('folders')
            if raw_name.strip()
        ]
        # The default must reference a status that will actually be created,
        # otherwise the company would end up with no starting status.
        has_default = any(
            str(position) == default_status_idx for position, _ in statuses
        )

        if company_form.is_valid() and statuses and has_default:
            with transaction.atomic():
                company = company_form.save()
                profile = request.user.profile
                profile.company = company
                profile.role = 'ADMIN'
                profile.save()
                for position, name in statuses:
                    JobStatus.objects.create(
                        company=company,
                        name=name,
                        is_starting_status=(str(position) == default_status_idx),
                        order=position,
                    )
                for name in folder_names:
                    RequiredFolder.objects.create(company=company, name=name)
            messages.success(request, f"Company {company.name} created successfully!")
            return redirect('dashboard')

        if not statuses:
            messages.error(request, "At least one status is required.")
        if not has_default:
            messages.error(request, "You must select a starting status.")
    else:
        company_form = CompanyForm()

    return render(request, 'core/company_setup.html', {
        'company_form': company_form,
    })
def _incomplete_folder_counts(job_queryset, folders):
    """Return ``(folder, missing_count)`` for folders with incomplete jobs.

    ``missing_count`` is the number of distinct jobs in ``job_queryset`` that
    have a completion row for ``folder`` with ``is_completed=False``. Folders
    with a count of zero are omitted.
    """
    counts = []
    for folder in folders:
        missing_count = job_queryset.filter(
            folder_completions__folder=folder,
            folder_completions__is_completed=False,
        ).distinct().count()
        if missing_count > 0:
            counts.append((folder, missing_count))
    return counts


def _client_stats_entry(client_name, client_jobs, folders):
    """Build one ``client_stats`` row, or ``None`` when the group has no jobs."""
    total_jobs = client_jobs.count()
    if total_jobs == 0:
        return None
    incomplete_jobs = client_jobs.filter(
        folder_completions__is_completed=False
    ).distinct().count()
    return {
        'client_name': client_name,
        'total_jobs': total_jobs,
        'incomplete_jobs': incomplete_jobs,
        'incomplete_folder_breakdown': [
            {
                'folder_name': folder.name,
                'missing_count': missing_count,
                'folder_id': folder.id,
            }
            for folder, missing_count in _incomplete_folder_counts(client_jobs, folders)
        ],
    }


@login_required
def dashboard(request):
    """Render the company dashboard with job and folder-completion statistics.

    The template context contains the overall job count, a per-status count,
    the number of jobs with at least one incomplete folder, the five most
    recently created jobs, a company-wide per-folder breakdown and a
    per-client breakdown (with jobs that have no client listed first under
    "No Client Assigned"). Users without a company are redirected to
    ``company_setup``.
    """
    profile = request.user.profile
    if not profile.company:
        return redirect('company_setup')
    company = profile.company

    jobs = Job.objects.filter(company=company).select_related('client', 'status')
    total_jobs = jobs.count()
    jobs_by_status = (
        jobs.values('status__name').annotate(count=Count('id')).order_by('status__name')
    )
    jobs_with_incomplete_folders = jobs.filter(
        folder_completions__is_completed=False
    ).distinct().count()

    # Evaluate the folder list once; it is reused for every client group.
    required_folders = list(company.required_folders.all())

    incomplete_folder_breakdown = [
        {'folder': folder, 'missing_count': missing_count}
        for folder, missing_count in _incomplete_folder_counts(jobs, required_folders)
    ]

    # Jobs without a client come first, followed by one group per client.
    job_groups = [("No Client Assigned", jobs.filter(client__isnull=True))]
    job_groups.extend(
        (client.name, jobs.filter(client=client)) for client in company.clients.all()
    )
    client_stats = []
    for group_name, group_jobs in job_groups:
        entry = _client_stats_entry(group_name, group_jobs, required_folders)
        if entry is not None:  # groups without jobs are not shown
            client_stats.append(entry)

    context = {
        'company': company,
        'total_jobs': total_jobs,
        'jobs_by_status': jobs_by_status,
        'jobs_with_incomplete_folders': jobs_with_incomplete_folders,
        'jobs': jobs.order_by('-created_at')[:5],
        'incomplete_folder_breakdown': incomplete_folder_breakdown,
        'client_stats': client_stats,
    }
    return render(request, 'core/dashboard.html', context)
@login_required
def job_list(request):
    """List the company's jobs, newest first.

    When the ``missing_folder_id`` query parameter is present, only jobs whose
    completion row for that folder is not completed are shown. The folder is
    validated first: a non-numeric id, or a folder that does not belong to the
    user's company, results in a 404 rather than a server error or an
    unchecked filter.
    """
    # Local import so this view does not depend on a change to the file header.
    from django.http import Http404

    profile = request.user.profile
    if not profile.company:
        return redirect('company_setup')
    company = profile.company

    jobs = Job.objects.filter(company=company).order_by('-created_at')

    missing_folder_id = request.GET.get('missing_folder_id')
    if missing_folder_id:
        try:
            folder_pk = int(missing_folder_id)
        except ValueError:
            # Passing a non-numeric value to the ORM would raise ValueError.
            raise Http404("Invalid folder id.") from None
        missing_folder = get_object_or_404(RequiredFolder, pk=folder_pk, company=company)
        jobs = jobs.filter(
            folder_completions__folder=missing_folder,
            folder_completions__is_completed=False,
        ).distinct()
        messages.info(request, f"Showing jobs missing '{missing_folder.name}' folder completion.")

    return render(request, 'core/job_list.html', {
        'jobs': jobs,
        'company': company
    })
@login_required
def job_create(request):
    """Create a job for the user's company.

    After a valid POST the job is saved, a ``JobFolderCompletion`` row is
    ensured for each of the company's required folders, and the user is
    redirected to the job's detail page. Otherwise the job form is rendered.
    """
    user_profile = request.user.profile
    if not user_profile.company:
        return redirect('company_setup')
    company = user_profile.company

    if request.method != 'POST':
        form = JobForm(company=company)
    else:
        form = JobForm(request.POST, company=company)
        if form.is_valid():
            new_job = form.save(commit=False)
            new_job.company = company
            new_job.save()
            for required in company.required_folders.all():
                JobFolderCompletion.objects.get_or_create(job=new_job, folder=required)
            messages.success(request, f"Job {new_job.job_ref} created successfully!")
            return redirect('job_detail', pk=new_job.pk)

    template_context = {
        'form': form,
        'title': 'Create New Job',
        'company': company,
    }
    return render(request, 'core/job_form.html', template_context)
@login_required
def job_detail(request, pk):
    """Show one job with its folder completions and the files in each folder.

    Completion rows are created on the fly for any required folder that does
    not have one yet. Each completion object passed to the template carries a
    ``files_list`` attribute holding the job's files for that folder.
    """
    user_profile = request.user.profile
    if not user_profile.company:
        return redirect('company_setup')
    company = user_profile.company
    job = get_object_or_404(Job, pk=pk, company=company)

    # Ensure a completion row exists for every currently required folder.
    for required in company.required_folders.all():
        JobFolderCompletion.objects.get_or_create(job=job, folder=required)

    folder_completions = job.folder_completions.all().select_related('folder')
    for entry in folder_completions:
        # Attached as a plain attribute for the template to iterate over.
        entry.files_list = job.files.filter(folder=entry.folder)

    template_context = {
        'job': job,
        'folder_completions': folder_completions,
        'company': company,
        'file_form': JobFileForm(),
    }
    return render(request, 'core/job_detail.html', template_context)
@login_required
def job_update(request, pk):
    """Edit an existing job that belongs to the user's company.

    A valid POST saves the form and redirects to the job's detail page;
    otherwise the job form is rendered, pre-filled from the job.
    """
    user_profile = request.user.profile
    if not user_profile.company:
        return redirect('company_setup')
    company = user_profile.company
    job = get_object_or_404(Job, pk=pk, company=company)

    if request.method != 'POST':
        form = JobForm(instance=job, company=company)
    else:
        form = JobForm(request.POST, instance=job, company=company)
        if form.is_valid():
            form.save()
            messages.success(request, f"Job {job.job_ref} updated successfully!")
            return redirect('job_detail', pk=job.pk)

    template_context = {
        'form': form,
        'title': f'Edit Job: {job.job_ref}',
        'company': company,
    }
    return render(request, 'core/job_form.html', template_context)
@login_required
def job_delete(request, pk):
    """Delete a job after confirmation.

    Any non-POST request renders the confirmation page; a POST deletes the
    job and redirects to the job list.
    """
    user_profile = request.user.profile
    if not user_profile.company:
        return redirect('company_setup')
    company = user_profile.company
    job = get_object_or_404(Job, pk=pk, company=company)

    if request.method != 'POST':
        return render(request, 'core/job_confirm_delete.html', {
            'job': job,
            'company': company,
        })

    # Capture the reference before deleting so the message can still name it.
    job_ref = job.job_ref
    job.delete()
    messages.success(request, f"Job {job_ref} deleted.")
    return redirect('job_list')
@login_required
def toggle_folder_completion(request, pk, folder_id):
    """Flip the completed flag of one folder on a job and return to the job page.

    NOTE(review): the toggle runs for every HTTP method, including GET.
    """
    user_profile = request.user.profile
    if not user_profile.company:
        return redirect('company_setup')

    job = get_object_or_404(Job, pk=pk, company=user_profile.company)
    completion = get_object_or_404(JobFolderCompletion, job=job, folder_id=folder_id)

    if completion.is_completed:
        completion.is_completed = False
    else:
        completion.is_completed = True
    completion.save()

    messages.success(request, f"Folder '{completion.folder.name}' status updated.")
    return redirect('job_detail', pk=job.pk)
@login_required
def job_upload_file(request, pk, folder_id):
    """Attach an uploaded file to a job inside one of the company's folders.

    Only POST requests are processed; every request ends with a redirect back
    to the job's detail page.
    """
    company = request.user.profile.company
    job = get_object_or_404(Job, pk=pk, company=company)
    folder = get_object_or_404(RequiredFolder, pk=folder_id, company=company)

    if request.method == 'POST':
        form = JobFileForm(request.POST, request.FILES)
        if not form.is_valid():
            messages.error(request, "Error uploading file.")
        else:
            uploaded = form.save(commit=False)
            uploaded.job = job
            uploaded.folder = folder
            uploaded.uploaded_by = request.user
            uploaded.save()
            messages.success(request, f"File uploaded to {folder.name}.")

    return redirect('job_detail', pk=job.pk)
@login_required
def job_delete_file(request, pk, file_id):
    """Remove a file from a job and return to the job's detail page.

    The stored file is deleted first, then the ``JobFile`` row.

    NOTE(review): deletion runs for every HTTP method, including GET.
    """
    company = request.user.profile.company
    job = get_object_or_404(Job, pk=pk, company=company)
    target = get_object_or_404(JobFile, pk=file_id, job=job)

    target.file.delete()
    target.delete()

    messages.success(request, "File deleted.")
    return redirect('job_detail', pk=job.pk)
@login_required
def job_export(request):
    """Download all of the company's jobs as an Excel workbook.

    The workbook has a single ``Jobs`` sheet with one row per job. The header
    row is written even when the company has no jobs. Users without a company
    are redirected to ``company_setup`` instead of failing on
    ``company.name``.
    """
    # Local import so this view does not depend on a change to the file header.
    from django.utils.text import slugify

    profile = request.user.profile
    if not profile.company:
        return redirect('company_setup')
    company = profile.company

    columns = [
        'Job Ref', 'UPRN', 'Address 1', 'Address 2', 'Address 3',
        'Postcode', 'Status', 'Description', 'Created At',
    ]
    # select_related avoids one extra status query per exported job.
    jobs = Job.objects.filter(company=company).select_related('status')
    rows = [
        {
            'Job Ref': job.job_ref,
            'UPRN': job.uprn,
            'Address 1': job.address_line_1,
            'Address 2': job.address_line_2,
            'Address 3': job.address_line_3,
            'Postcode': job.postcode,
            # Guard against jobs without a status rather than crashing.
            'Status': job.status.name if job.status else '',
            'Description': job.description,
            'Created At': job.created_at.strftime('%Y-%m-%d %H:%M:%S'),
        }
        for job in jobs
    ]
    # Passing the columns explicitly keeps the header row for an empty export.
    df = pd.DataFrame(rows, columns=columns)

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, index=False, sheet_name='Jobs')

    # The company name is user-supplied; reduce it to a header-safe token so
    # quotes or non-ASCII characters cannot break the quoted filename.
    safe_name = slugify(company.name) or 'company'
    response = HttpResponse(
        output.getvalue(),
        content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    )
    response['Content-Disposition'] = f'attachment; filename="jobs_export_{safe_name}.xlsx"'
    return response
def _import_cell_text(row, column):
    """Return the stripped text of ``row[column]``, or ``''`` when missing/NaN."""
    value = row.get(column)
    if value is None or pd.isna(value):
        return ''
    return str(value).strip()


@login_required
def job_import(request):
    """Import jobs from an uploaded CSV or Excel file.

    Recognised columns are ``Job Ref``, ``UPRN``, ``Address 1``, ``Postcode``
    and ``Status``. Rows without a job reference are skipped. Existing jobs
    (matched on company and job reference) are updated, others are created.
    A status is matched case-insensitively by name and falls back to the
    company's starting status. Each row runs in its own savepoint so a failing
    row is rolled back and reported without breaking the rest of the import.
    """
    profile = request.user.profile
    if not profile.company:
        return redirect('company_setup')
    company = profile.company

    if request.method == 'POST':
        form = ImportJobsForm(request.POST, request.FILES)
        if form.is_valid():
            file = request.FILES['file']
            try:
                # Read everything as text so identifiers keep their exact
                # form (no float conversion, no lost leading zeros).
                if file.name.lower().endswith('.csv'):
                    df = pd.read_csv(file, dtype=str)
                else:
                    df = pd.read_excel(file, dtype=str)

                starting_status = company.statuses.filter(is_starting_status=True).first()
                # Loop-invariant: fetch the folder list once, not per row.
                required_folders = list(company.required_folders.all())
                imported_count = 0
                errors = []

                with transaction.atomic():
                    for index, row in df.iterrows():
                        job_ref = _import_cell_text(row, 'Job Ref')
                        if not job_ref:
                            continue
                        uprn = _import_cell_text(row, 'UPRN') or None
                        addr1 = _import_cell_text(row, 'Address 1')
                        postcode = _import_cell_text(row, 'Postcode')

                        status_name = _import_cell_text(row, 'Status')
                        status = None
                        if status_name:
                            status = company.statuses.filter(name__iexact=status_name).first()
                        status = status or starting_status

                        try:
                            # Nested atomic() creates a savepoint: a database
                            # error here must not poison the outer transaction.
                            with transaction.atomic():
                                job, _created = Job.objects.update_or_create(
                                    company=company,
                                    job_ref=job_ref,
                                    defaults={
                                        'uprn': uprn,
                                        'address_line_1': addr1,
                                        'postcode': postcode,
                                        'status': status,
                                    },
                                )
                                for folder in required_folders:
                                    JobFolderCompletion.objects.get_or_create(job=job, folder=folder)
                        except Exception as e:
                            # +2: one for the header row, one for 1-based numbering.
                            errors.append(f"Row {index+2}: {str(e)}")
                        else:
                            imported_count += 1

                if errors:
                    messages.warning(request, f"Imported {imported_count} jobs with some errors: {', '.join(errors[:5])}")
                else:
                    messages.success(request, f"Successfully imported {imported_count} jobs.")
                return redirect('job_list')
            except Exception as e:
                messages.error(request, f"Error processing file: {str(e)}")
    else:
        form = ImportJobsForm()

    return render(request, 'core/job_import.html', {'form': form, 'company': company})
@login_required
def client_list(request):
    """List the company's clients; restricted to company admins."""
    profile = request.user.profile
    is_company_admin = profile.company and profile.role == 'ADMIN'
    if not is_company_admin:
        messages.error(request, "You must be an admin to manage clients.")
        return redirect('dashboard')

    company = profile.company
    template_context = {
        'clients': company.clients.all(),
        'company': company,
    }
    return render(request, 'core/client_list.html', template_context)
@login_required
def client_create(request):
    """Create a client for the admin's company.

    Non-admins are redirected to ``dashboard``. A valid POST saves the client
    and redirects to ``settings``; otherwise the client form is rendered.
    """
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')

    if request.method != 'POST':
        form = ClientForm()
    else:
        form = ClientForm(request.POST)
        if form.is_valid():
            new_client = form.save(commit=False)
            new_client.company = profile.company
            new_client.save()
            messages.success(request, "New client created.")
            return redirect('settings')

    return render(request, 'core/client_form.html', {'form': form, 'title': 'Create Client'})
@login_required
def client_update(request, pk):
    """Edit one of the company's clients; restricted to admins."""
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')
    client = get_object_or_404(Client, pk=pk, company=profile.company)

    if request.method != 'POST':
        form = ClientForm(instance=client)
    else:
        form = ClientForm(request.POST, instance=client)
        if form.is_valid():
            form.save()
            messages.success(request, "Client updated.")
            return redirect('settings')

    return render(request, 'core/client_form.html', {'form': form, 'title': 'Edit Client'})
@login_required
def client_delete(request, pk):
    """Delete one of the company's clients after confirmation (admins only).

    NOTE(review): nothing here checks whether jobs still reference the
    client; what happens to them depends on the model's ``on_delete`` rule.
    """
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')
    client = get_object_or_404(Client, pk=pk, company=profile.company)

    if request.method != 'POST':
        return render(request, 'core/client_confirm_delete.html', {'client': client})

    client.delete()
    messages.success(request, "Client deleted.")
    return redirect('settings')
@login_required
def settings_view(request):
    """Render the company settings page (admins only).

    The context lists the company's statuses, required folders, clients and
    the invitations that are neither accepted nor expired.
    """
    profile = request.user.profile
    if not profile.company:
        return redirect('company_setup')
    if profile.role != 'ADMIN':
        messages.error(request, "Only admins can access company settings.")
        return redirect('dashboard')

    company = profile.company
    pending_invitations = company.invitations.filter(
        is_accepted=False,
        expires_at__gt=timezone.now(),
    )
    return render(request, 'core/settings.html', {
        'company': company,
        'statuses': company.statuses.all(),
        'folders': company.required_folders.all(),
        'clients': company.clients.all(),
        'invitations': pending_invitations,
    })
@login_required
def status_create(request):
    """Add a job status to the admin's company.

    When the new status is flagged as the starting status, the flag is first
    cleared on all of the company's existing statuses.
    """
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')

    if request.method != 'POST':
        form = JobStatusForm()
    else:
        form = JobStatusForm(request.POST)
        if form.is_valid():
            new_status = form.save(commit=False)
            new_status.company = profile.company
            if new_status.is_starting_status:
                # Clear the flag elsewhere before saving the new starting status.
                company_statuses = JobStatus.objects.filter(company=profile.company)
                company_statuses.update(is_starting_status=False)
            new_status.save()
            messages.success(request, "New status created.")
            return redirect('settings')

    return render(request, 'core/status_form.html', {'form': form, 'title': 'Create Status'})
@login_required
def status_update(request, pk):
    """Edit one of the company's job statuses (admins only).

    When the submitted form marks this status as the starting status, the
    flag is cleared on all of the company's statuses before the form is saved.
    """
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')
    status = get_object_or_404(JobStatus, pk=pk, company=profile.company)

    if request.method != 'POST':
        form = JobStatusForm(instance=status)
    else:
        form = JobStatusForm(request.POST, instance=status)
        if form.is_valid():
            if form.cleaned_data['is_starting_status']:
                company_statuses = JobStatus.objects.filter(company=profile.company)
                company_statuses.update(is_starting_status=False)
            form.save()
            messages.success(request, "Status updated.")
            return redirect('settings')

    return render(request, 'core/status_form.html', {'form': form, 'title': 'Edit Status'})
@login_required
def status_delete(request, pk):
    """Delete a job status and move its jobs to the starting status.

    The starting status itself cannot be deleted. Deletion is also refused
    when the company currently has no starting status, because there would be
    nothing to reassign the affected jobs to. Reassignment and deletion run
    in a single transaction.
    """
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')
    status = get_object_or_404(JobStatus, pk=pk, company=profile.company)

    if status.is_starting_status:
        messages.error(request, "Cannot delete the starting status. Please set another status as starting first.")
        # Deliberately the dashboard, not settings (kept from the original).
        return redirect('dashboard')

    if request.method == 'POST':
        starting_status = profile.company.statuses.filter(is_starting_status=True).first()
        if starting_status is None:
            # Without a target status, jobs would be reassigned to None and
            # the success message below would fail on ``starting_status.name``.
            messages.error(request, "Cannot delete this status because no starting status is set. Please set a starting status first.")
            return redirect('settings')
        with transaction.atomic():
            Job.objects.filter(status=status).update(status=starting_status)
            status.delete()
        messages.success(request, f"Status deleted. Jobs reassigned to {starting_status.name}.")
        return redirect('settings')

    return render(request, 'core/status_confirm_delete.html', {'status': status})
@login_required
def folder_create(request):
    """Add a required folder to the admin's company."""
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')

    if request.method != 'POST':
        form = RequiredFolderForm()
    else:
        form = RequiredFolderForm(request.POST)
        if form.is_valid():
            new_folder = form.save(commit=False)
            new_folder.company = profile.company
            new_folder.save()
            messages.success(request, f"Folder '{new_folder.name}' added company-wide.")
            return redirect('settings')

    return render(request, 'core/folder_form.html', {'form': form})
@login_required
def folder_delete(request, pk):
    """Delete one of the company's required folders after confirmation (admins only)."""
    profile = request.user.profile
    if profile.role != 'ADMIN':
        return redirect('dashboard')
    folder = get_object_or_404(RequiredFolder, pk=pk, company=profile.company)

    if request.method != 'POST':
        return render(request, 'core/folder_confirm_delete.html', {'folder': folder})

    folder.delete()
    messages.success(request, "Folder removed from company settings.")
    return redirect('settings')
@login_required
def invite_user(request):
    """Invite a person by e-mail to join the admin's company.

    A new invitation is refused when an active (unaccepted, unexpired) one
    already exists for the address, or when a user with that address already
    belongs to the company. Otherwise an invitation valid for seven days is
    created and a link to ``accept_invite`` is e-mailed. Creation and sending
    share one transaction, so a failed send also discards the invitation.
    """
    profile = request.user.profile
    if not (profile.company and profile.role == 'ADMIN'):
        messages.error(request, "You must be an admin to invite users.")
        return redirect('dashboard')
    company = profile.company

    if request.method != 'POST':
        form = InviteUserForm()
        return render(request, 'core/invite_user.html', {'form': form, 'company': company})

    form = InviteUserForm(request.POST)
    if not form.is_valid():
        messages.error(request, "Please correct the error below.")
        return render(request, 'core/invite_user.html', {'form': form, 'company': company})

    email = form.cleaned_data['email']

    active_invites = Invitation.objects.filter(
        email=email,
        company=company,
        is_accepted=False,
        expires_at__gt=timezone.now(),
    )
    if active_invites.exists():
        messages.warning(request, f"An active invitation for {email} already exists.")
        return redirect('invite_user')

    existing_members = User.objects.filter(email=email, profile__company=company)
    if existing_members.exists():
        messages.warning(request, f"A user with {email} already exists in your company.")
        return redirect('invite_user')

    try:
        with transaction.atomic():
            invitation = Invitation.objects.create(
                company=company,
                invited_by=request.user,
                email=email,
                expires_at=timezone.now() + timedelta(days=7),
            )
            accept_path = reverse('accept_invite', args=[invitation.token])
            invite_link = request.build_absolute_uri(accept_path)
            send_mail(
                f"Invitation to join {company.name} on RepairsHub",
                f"You have been invited to join {company.name} on RepairsHub. Click the link to accept: {invite_link}",
                settings.DEFAULT_FROM_EMAIL,
                [email],
            )
        messages.success(request, f"Invitation sent to {email}.")
        return redirect('settings')
    except Exception as e:
        messages.error(request, f"Error sending invitation: {e}")

    return render(request, 'core/invite_user.html', {'form': form, 'company': company})
def accept_invite(request, token):
    """Accept a company invitation identified by ``token``.

    Only unaccepted, unexpired invitations are found; anything else is a 404.

    * Logged-in users must have the invited e-mail address. A user whose
      profile has no company yet (or who has no profile) joins the inviting
      company as ``STANDARD``. A user who already belongs to a different
      company is refused; a user already in the inviting company just has the
      invitation marked as accepted.
    * Anonymous visitors get a registration form; on a valid POST the new
      account is given the invited e-mail address, attached to the company as
      ``STANDARD`` and logged in.
    """
    invitation = get_object_or_404(
        Invitation, token=token, is_accepted=False, expires_at__gt=timezone.now()
    )

    if request.user.is_authenticated:
        # Refuse to redeem an invitation addressed to someone else.
        if request.user.email != invitation.email:
            messages.error(request, "You are logged in with a different email address. Please log out or use the correct account.")
            return redirect('home')

        with transaction.atomic():
            profile, created = Profile.objects.get_or_create(
                user=request.user,
                defaults={'company': invitation.company, 'role': 'STANDARD'},
            )
            if not created:
                if profile.company is None:
                    # Existing profile that never joined a company: this is
                    # a new association, not a conflict with another company.
                    profile.company = invitation.company
                    profile.role = 'STANDARD'
                    profile.save()
                elif profile.company != invitation.company:
                    messages.error(request, "You are already part of another company. Please contact support to join a new company.")
                    return redirect('dashboard')
                else:
                    messages.info(request, "You are already a member of this company.")
            invitation.is_accepted = True
            invitation.save()

        messages.success(request, f"Welcome to {invitation.company.name}!")
        return redirect('dashboard')

    # Anonymous visitor: register a new account tied to the invitation.
    if request.method == 'POST':
        form = UserCreationForm(request.POST)
        if form.is_valid():
            with transaction.atomic():
                user = form.save()
                # Force the account's e-mail to match the invitation.
                user.email = invitation.email
                user.save()
                # presumably the profile is created elsewhere when the user
                # is saved (e.g. a signal) — verify against the models
                profile = user.profile
                profile.company = invitation.company
                profile.role = 'STANDARD'
                profile.save()
                invitation.is_accepted = True
                invitation.save()
            login(request, user)
            messages.success(request, f"Welcome to {invitation.company.name}!")
            return redirect('dashboard')
        messages.error(request, "Please correct the errors below.")
    else:
        form = UserCreationForm(initial={'email': invitation.email})

    return render(request, 'core/accept_invite.html', {'form': form, 'invitation': invitation})
@login_required
def user_list(request):
    """List the users that belong to the admin's company."""
    profile = request.user.profile
    if not (profile.company and profile.role == 'ADMIN'):
        messages.error(request, "You must be an admin to manage users.")
        return redirect('dashboard')

    company = profile.company
    members = User.objects.filter(profile__company=company).select_related('profile')
    return render(request, 'core/user_list.html', {
        'company': company,
        'users_in_company': members,
    })
@login_required
def user_update_role(request, pk):
    """Change the role of a user in the admin's company.

    Only POST requests change anything; the accepted roles are ``ADMIN`` and
    ``STANDARD``. Every request ends with a redirect to ``user_list``.
    """
    profile = request.user.profile
    if not (profile.company and profile.role == 'ADMIN'):
        messages.error(request, "You must be an admin to manage user roles.")
        return redirect('dashboard')

    user_to_update = get_object_or_404(User, pk=pk, profile__company=profile.company)

    if request.method == 'POST':
        new_role = request.POST.get('role')
        if new_role not in ['ADMIN', 'STANDARD']:
            messages.error(request, "Invalid role selected.")
        else:
            target_profile = user_to_update.profile
            target_profile.role = new_role
            target_profile.save()
            messages.success(request, f"Role for {user_to_update.username} updated to {new_role}.")

    return redirect('user_list')
@login_required
def user_delete(request, pk):
    """Delete a user from the admin's company.

    Only POST requests delete, and admins cannot delete their own account.
    Every request ends with a redirect to ``user_list``.
    """
    profile = request.user.profile
    if not (profile.company and profile.role == 'ADMIN'):
        messages.error(request, "You must be an admin to delete users.")
        return redirect('dashboard')

    user_to_delete = get_object_or_404(User, pk=pk, profile__company=profile.company)

    if request.method != 'POST':
        return redirect('user_list')

    if user_to_delete == request.user:
        messages.error(request, "You cannot delete your own account.")
        return redirect('user_list')

    # Keep the name for the message; the instance is gone after delete().
    username = user_to_delete.username
    user_to_delete.delete()
    messages.success(request, f"User {username} deleted.")
    return redirect('user_list')