from django.shortcuts import render, redirect, get_object_or_404
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.contrib.auth import update_session_auth_hash
from django.urls import reverse
from django.http import JsonResponse, HttpResponse, FileResponse
import subprocess
from django.core.paginator import Paginator
from django.db import transaction
from django.db.models import Sum, Q, Count, F
from django.db.models.functions import TruncMonth, TruncDay
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.utils.translation import gettext as _
import json
import decimal
import datetime
import logging
import base64
import os
import random
from django.conf import settings as django_settings
from django.contrib.auth.models import User, Group, Permission
from django.contrib.contenttypes.models import ContentType
from .models import (
SystemSetting, Customer, Supplier, Product, Category, Unit,
Sale, SaleItem, SalePayment, SaleReturn, SaleReturnItem,
Purchase, PurchaseItem, PurchasePayment, PurchaseReturn, PurchaseReturnItem,
Expense, ExpenseCategory, PaymentMethod, LoyaltyTier, LoyaltyTransaction,
Device, CashierSession, CashierCounterRegistry, PurchaseOrder, PurchaseOrderItem,
UserProfile, HeldSale, Quotation, QuotationItem
)
from .forms import (
SystemSettingForm, CustomerForm, SupplierForm, ProductForm, CategoryForm,
UnitForm, ExpenseForm, CashierSessionStartForm, CashierSessionCloseForm
)
from .utils import number_to_words_en, send_whatsapp_message, send_whatsapp_document
from .views_import import *
from django.template.loader import render_to_string
logger = logging.getLogger(__name__)
# --- Basic Views ---
@login_required
def index(request):
    """Render the dashboard: headline totals, chart series, stock alerts and recent sales.

    Every aggregated sum that is fed to ``float()`` is first coerced with
    ``or 0``: ``Sum`` yields ``None`` when all values in a group are NULL,
    and ``float(None)`` would raise ``TypeError`` and take the page down.
    """
    settings = SystemSetting.objects.first()
    if not settings:
        settings = SystemSetting.objects.create()

    # One timestamp for the whole request so all windows share the same "now".
    now = timezone.now()
    today = now.date()

    # 1. Financials
    total_sales_amount = Sale.objects.aggregate(Sum('total_amount'))['total_amount__sum'] or 0
    total_receivables = Sale.objects.aggregate(Sum('balance_due'))['balance_due__sum'] or 0
    total_payables = Purchase.objects.aggregate(Sum('balance_due'))['balance_due__sum'] or 0

    # 2. Counts
    total_sales_count = Sale.objects.count()
    total_products = Product.objects.count()
    total_customers = Customer.objects.count()

    # 3. Charts: Monthly Sales (last 365 days, grouped by calendar month)
    last_12_months = now - datetime.timedelta(days=365)
    monthly_sales = (
        Sale.objects.filter(created_at__gte=last_12_months)
        .annotate(month=TruncMonth('created_at'))
        .values('month')
        .annotate(total=Sum('total_amount'))
        .order_by('month')
    )
    monthly_labels = [m['month'].strftime('%b') if m['month'] else '' for m in monthly_sales]
    monthly_data = [float(m['total'] or 0) for m in monthly_sales]

    # 4. Charts: Daily Sales (last 7 days)
    last_7_days = now - datetime.timedelta(days=7)
    daily_sales = (
        Sale.objects.filter(created_at__gte=last_7_days)
        .annotate(day=TruncDay('created_at'))
        .values('day')
        .annotate(total=Sum('total_amount'))
        .order_by('day')
    )
    chart_labels = [d['day'].strftime('%d %b') if d['day'] else '' for d in daily_sales]
    chart_data = [float(d['total'] or 0) for d in daily_sales]

    # 5. Category Distribution (top 5 by revenue)
    category_dist = (
        SaleItem.objects.values('product__category__name_en', 'product__category__name_ar')
        .annotate(total=Sum('line_total'))
        .order_by('-total')[:5]
    )
    category_labels = [c['product__category__name_en'] or 'Uncategorized' for c in category_dist]
    category_data = [float(c['total'] or 0) for c in category_dist]

    # 6. Payment Methods
    payment_dist = (
        SalePayment.objects.values('payment_method__name_en')
        .annotate(total=Sum('amount'))
        .order_by('-total')
    )
    payment_labels = [p['payment_method__name_en'] or 'Unknown' for p in payment_dist]
    payment_data = [float(p['total'] or 0) for p in payment_dist]

    # 7. Top Products (top 5 by revenue)
    top_products = SaleItem.objects.values(
        'product__name_en', 'product__name_ar'
    ).annotate(
        total_qty=Sum('quantity'),
        total_rev=Sum('line_total')
    ).order_by('-total_rev')[:5]

    # 8. Low Stock & Expired
    low_stock_qs = Product.objects.filter(stock_quantity__lte=F('min_stock_level'))
    low_stock_count = low_stock_qs.count()
    low_stock_products = low_stock_qs[:5]  # Limit for display
    expired_count = Product.objects.filter(
        has_expiry=True,
        expiry_date__lt=today
    ).count()

    # 9. Recent Sales
    recent_sales = Sale.objects.select_related('customer', 'created_by').order_by('-created_at')[:10]

    context = {
        'site_settings': settings,
        'settings': settings,  # Both keys are exposed to the template
        'total_sales_amount': total_sales_amount,
        'total_receivables': total_receivables,
        'total_payables': total_payables,
        'total_sales_count': total_sales_count,
        'total_products': total_products,
        'total_customers': total_customers,
        'monthly_labels': monthly_labels,
        'monthly_data': monthly_data,
        'chart_labels': chart_labels,
        'chart_data': chart_data,
        'category_labels': category_labels,
        'category_data': category_data,
        'payment_labels': payment_labels,
        'payment_data': payment_data,
        'top_products': top_products,
        'low_stock_count': low_stock_count,
        'low_stock_products': low_stock_products,
        'expired_count': expired_count,
        'recent_sales': recent_sales,
    }
    return render(request, 'core/index.html', context)
@login_required
def inventory(request):
    """Render the inventory page with products, lookup lists and expiry buckets.

    Products are split into already expired (expiry date before today) and
    expiring within the next 30 days (today inclusive).
    """
    logger.info("Inventory view accessed")

    today = timezone.now().date()
    window_end = today + datetime.timedelta(days=30)

    all_products = Product.objects.all().order_by('name_en')
    already_expired = all_products.filter(has_expiry=True, expiry_date__lt=today)
    expiring_soon = all_products.filter(
        has_expiry=True,
        expiry_date__gte=today,
        expiry_date__lte=window_end,
    )

    # Fall back to a freshly created settings row when none exists yet.
    site_settings = SystemSetting.objects.first() or SystemSetting.objects.create()

    return render(request, 'core/inventory.html', {
        'products': all_products,
        'categories': Category.objects.all(),
        'units': Unit.objects.all(),
        'suppliers': Supplier.objects.all().order_by('name'),
        'expired_products': already_expired,
        'expiring_soon_products': expiring_soon,
        'site_settings': site_settings,
    })
@login_required
def customers(request):
    """List all customers ordered by name."""
    context = {'customers': Customer.objects.order_by('name')}
    return render(request, 'core/customers.html', context)
@login_required
def suppliers(request):
    """List all suppliers ordered by name."""
    context = {'suppliers': Supplier.objects.order_by('name')}
    return render(request, 'core/suppliers.html', context)
@login_required
def settings_view(request):
    """Show and update system settings.

    A POST is treated as a WhatsApp update when ``setting_type`` is
    ``'whatsapp'`` or a ``wablas_token`` field is present; those fields are
    written directly so the rest of the settings form is not validated.
    Any other POST goes through ``SystemSettingForm``. On a validation
    failure the bound form is re-rendered so its errors are visible.
    """
    # NOTE(review): only login is required here; confirm whether changing
    # settings should be limited to staff or a specific permission.
    settings = SystemSetting.objects.first()
    if not settings:
        settings = SystemSetting.objects.create()

    if request.method == 'POST':
        setting_type = request.POST.get('setting_type')
        # Either the hidden marker field or the token field identifies the
        # WhatsApp form.
        is_whatsapp_update = (setting_type == 'whatsapp') or ('wablas_token' in request.POST)
        if is_whatsapp_update:
            settings.wablas_enabled = request.POST.get('wablas_enabled') == 'on'
            settings.wablas_token = request.POST.get('wablas_token', '').strip()
            settings.wablas_server_url = request.POST.get('wablas_server_url', '').strip()
            settings.wablas_secret_key = request.POST.get('wablas_secret_key', '').strip()
            settings.save()
            messages.success(request, _("WhatsApp settings updated successfully."))
            return redirect(reverse('settings') + '#whatsapp')

        # Full form validation for the main profile.
        form = SystemSettingForm(request.POST, request.FILES, instance=settings)
        if form.is_valid():
            form.save()
            messages.success(request, _("Settings updated successfully."))
            return redirect('settings')
        messages.error(request, _("Please correct the errors below."))
    else:
        form = SystemSettingForm(instance=settings)

    return render(request, 'core/settings.html', {
        'form': form,
        'settings': settings,
        'payment_methods': PaymentMethod.objects.filter(is_active=True),
        'expense_categories': ExpenseCategory.objects.all(),
        'loyalty_tiers': LoyaltyTier.objects.all().order_by("min_points"),
        'devices': Device.objects.all().order_by("name")
    })
@login_required
def profile_view(request):
    """Show the current user's profile and handle profile/password updates.

    A POST containing both ``password`` and ``confirm_password`` is a
    password change; any other POST updates name, email and profile fields.
    Every POST ends with a redirect back to the profile page.
    """
    user = request.user
    # Make sure the related profile row exists before it is accessed.
    UserProfile.objects.get_or_create(user=user)

    if request.method != 'POST':
        return render(request, 'core/profile.html')

    form_data = request.POST
    if 'password' in form_data and 'confirm_password' in form_data:
        new_password = form_data.get('password')
        confirmation = form_data.get('confirm_password')
        # An empty password is ignored silently.
        if new_password:
            if new_password != confirmation:
                messages.error(request, _("Passwords do not match."))
            else:
                user.set_password(new_password)
                user.save()
                # Keep the current session valid after the hash changes.
                update_session_auth_hash(request, user)
                messages.success(request, _("Password updated successfully!"))
    else:
        for attr in ('first_name', 'last_name', 'email'):
            setattr(user, attr, form_data.get(attr, getattr(user, attr)))
        user.save()

        profile = user.profile
        for attr in ('phone', 'bio'):
            setattr(profile, attr, form_data.get(attr, getattr(profile, attr)))
        uploaded_image = request.FILES.get('image') if 'image' in request.FILES else None
        if 'image' in request.FILES:
            profile.image = uploaded_image
        profile.save()
        messages.success(request, _("Profile updated successfully!"))

    return redirect('profile')
@login_required
def user_management(request):
    """List users, groups and permissions; handle user/group admin actions.

    The POST field ``action`` selects the operation: ``add``, ``edit_user``,
    ``toggle_status``, ``add_group``, ``edit_group`` or ``delete_group``.
    Any exception raised while handling an action is logged and shown as a
    flash message, and every POST redirects back to this page.

    Translated messages use named placeholders instead of f-strings: an
    f-string is interpolated *before* ``gettext`` sees it, so the message id
    can never match a catalog entry.
    """
    # NOTE(review): only login is required here, so any authenticated user
    # can create users and change passwords or groups; confirm whether a
    # staff/permission check is intended.
    if request.method == 'POST':
        action = request.POST.get('action')
        try:
            if action == 'add':
                username = request.POST.get('username')
                email = request.POST.get('email')
                password = request.POST.get('password')
                group_ids = request.POST.getlist('groups')
                if User.objects.filter(username=username).exists():
                    messages.error(request, _("Username already exists."))
                else:
                    user = User.objects.create_user(username=username, email=email, password=password)
                    if group_ids:
                        user.groups.set(group_ids)
                    messages.success(request, _("User created successfully."))
            elif action == 'edit_user':
                user_id = request.POST.get('user_id')
                user = get_object_or_404(User, id=user_id)
                user.email = request.POST.get('email')
                password = request.POST.get('password')
                # A blank password leaves the existing one untouched.
                if password:
                    user.set_password(password)
                user.save()
                group_ids = request.POST.getlist('groups')
                user.groups.set(group_ids)
                messages.success(request, _("User updated successfully."))
            elif action == 'toggle_status':
                user_id = request.POST.get('user_id')
                if int(user_id) == request.user.id:
                    messages.error(request, _("You cannot deactivate yourself."))
                else:
                    user = get_object_or_404(User, id=user_id)
                    user.is_active = not user.is_active
                    user.save()
                    status = _("activated") if user.is_active else _("deactivated")
                    messages.success(request, _("User %(status)s.") % {'status': status})
            elif action == 'add_group':
                name = request.POST.get('name')
                perm_ids = request.POST.getlist('permissions')
                if Group.objects.filter(name=name).exists():
                    messages.error(request, _("Group name already exists."))
                else:
                    group = Group.objects.create(name=name)
                    if perm_ids:
                        group.permissions.set(perm_ids)
                    messages.success(request, _("Group created successfully."))
            elif action == 'edit_group':
                group_id = request.POST.get('group_id')
                group = get_object_or_404(Group, id=group_id)
                group.name = request.POST.get('name')
                group.save()
                perm_ids = request.POST.getlist('permissions')
                group.permissions.set(perm_ids)
                messages.success(request, _("Group updated successfully."))
            elif action == 'delete_group':
                group_id = request.POST.get('group_id')
                group = get_object_or_404(Group, id=group_id)
                group.delete()
                messages.success(request, _("Group deleted successfully."))
        except Exception as e:
            # Deliberately broad: this is the boundary that turns any failure
            # (bad ids, 404s, integrity errors) into a user-visible message.
            logger.exception("User Management Error: %s", e)
            messages.error(request, _("An error occurred: %(error)s") % {'error': e})
        return redirect('user_management')

    users = User.objects.all().order_by('username')
    groups = Group.objects.all().order_by('name')
    # All permissions are listed, grouped by app label and model.
    permissions = Permission.objects.select_related('content_type').order_by(
        'content_type__app_label', 'content_type__model'
    )
    return render(request, 'core/users.html', {
        'users': users,
        'groups': groups,
        'permissions': permissions
    })
@login_required
def group_details_api(request, pk):
    """Return the permission ids of group ``pk`` as JSON (404 if missing)."""
    target_group = get_object_or_404(Group, pk=pk)
    permission_ids = target_group.permissions.values_list('id', flat=True)
    return JsonResponse({'permissions': list(permission_ids)})
# --- POS Views ---
@login_required
def pos(request):
    """Render the point-of-sale screen.

    Access requires staff status or the ``core.view_pos`` permission. A user
    with a counter assignment but no active cashier session is sent to
    ``start_session``. A default "Cash" payment method is created when no
    active payment method exists.
    """
    user = request.user
    allowed = user.is_staff or user.has_perm('core.view_pos')
    if not allowed:
        messages.error(request, _("You do not have permission to access the POS system."))
        return redirect('index')

    active_session = CashierSession.objects.filter(user=user, status='active').first()
    # Only users assigned to a counter are forced to open a session first.
    if not active_session and hasattr(user, 'counter_assignment'):
        messages.warning(request, _("Please open a session to start selling."))
        return redirect('start_session')

    settings = SystemSetting.objects.first() or SystemSetting.objects.create()

    sellable_products = Product.objects.filter(is_active=True)
    if settings and not settings.allow_zero_stock_sales:
        sellable_products = sellable_products.filter(stock_quantity__gt=0)

    active_methods = PaymentMethod.objects.filter(is_active=True)
    if not active_methods.exists():
        # Guarantee at least one payment option on the screen.
        PaymentMethod.objects.create(name_en="Cash", name_ar="نقدي", is_active=True)
        active_methods = PaymentMethod.objects.filter(is_active=True)

    return render(request, 'core/pos.html', {
        'products': sellable_products,
        'customers': Customer.objects.all(),
        'categories': Category.objects.all(),
        'payment_methods': active_methods,
        'settings': settings,
        'site_settings': settings,  # same object under both template names
        'active_session': active_session
    })
@login_required
def customer_display(request):
    """Render the customer-facing display page."""
    template_name = 'core/customer_display.html'
    return render(request, template_name)
@csrf_exempt
def pos_sync_update(request):
    """Stub endpoint: always answers with ``{"status": "ok"}``."""
    payload = {'status': 'ok'}
    return JsonResponse(payload)
@csrf_exempt
def pos_sync_state(request):
    """Stub endpoint: always answers with an empty ``state`` object."""
    payload = {'state': {}}
    return JsonResponse(payload)
# --- Sales / Invoices ---
@login_required
def invoice_list(request):
    """List sales, newest first, 20 per page, with optional GET filters.

    Supported query parameters: ``start_date``, ``end_date``, ``customer``,
    ``status``, ``q`` (customer name, customer phone or invoice number) and
    ``page``.
    """
    params = request.GET
    sales = Sale.objects.all().order_by('-created_at')

    # (query parameter, ORM lookup) pairs, applied only when a value is given.
    simple_filters = (
        ('start_date', 'created_at__date__gte'),
        ('end_date', 'created_at__date__lte'),
        ('customer', 'customer_id'),
        ('status', 'status'),
    )
    for param_name, lookup in simple_filters:
        value = params.get(param_name)
        if value:
            sales = sales.filter(**{lookup: value})

    query = params.get('q')
    if query:
        matches_query = (
            Q(customer__name__icontains=query)
            | Q(customer__phone__icontains=query)
            | Q(invoice_number__icontains=query)
        )
        sales = sales.filter(matches_query)

    site_settings = SystemSetting.objects.first() or SystemSetting.objects.create()

    page = Paginator(sales, 20).get_page(params.get('page'))
    return render(request, 'core/invoices.html', {
        'sales': page,
        'customers': Customer.objects.all(),
        'payment_methods': PaymentMethod.objects.filter(is_active=True),
        'site_settings': site_settings,
        'query': query,  # echoed back so the search box keeps its value
    })
@login_required
def invoice_create(request):
    """Render the invoice creation page with customers, products and payment methods.

    ``decimal_places`` falls back to 2 only when the setting is unset
    (``None``); a configured value of 0 is respected.
    """
    customers = Customer.objects.all().order_by('name')
    products = Product.objects.filter(is_active=True).select_related('category')
    payment_methods = PaymentMethod.objects.filter(is_active=True)

    site_settings = SystemSetting.objects.first()
    if not site_settings:
        site_settings = SystemSetting.objects.create()

    # ``or 2`` would silently turn a legitimate 0 into 2, so test for None.
    decimal_places = site_settings.decimal_places
    if decimal_places is None:
        decimal_places = 2

    return render(request, 'core/invoice_create.html', {
        'customers': customers,
        'products': products,
        'payment_methods': payment_methods,
        'site_settings': site_settings,
        'decimal_places': decimal_places,
    })
@login_required
def invoice_detail(request, pk):
    """Show a single sale (404 if ``pk`` does not exist)."""
    context = {'sale': get_object_or_404(Sale, pk=pk)}
    return render(request, 'core/invoice_detail.html', context)
@login_required
def edit_invoice(request, pk):
    """Render the invoice edit page, pre-filled from the stored sale.

    The sale's line items are serialized to JSON (``cart_json``) and the
    payment method of the first recorded payment, if any, is pre-selected.
    """
    sale = get_object_or_404(Sale, pk=pk)

    site_settings = SystemSetting.objects.first() or SystemSetting.objects.create()
    decimal_places = site_settings.decimal_places if site_settings else 2

    # One cart entry per sale line; Decimals become floats for JSON.
    cart_json = json.dumps([
        {
            'id': line.product.id,
            'name_en': line.product.name_en,
            'name_ar': line.product.name_ar,
            'sku': line.product.sku,
            'price': float(line.unit_price),
            'quantity': float(line.quantity),
            'stock': float(line.product.stock_quantity)
        }
        for line in sale.items.all().select_related('product')
    ])

    earliest_payment = sale.payments.first()
    has_method = bool(earliest_payment and earliest_payment.payment_method)
    payment_method_id = earliest_payment.payment_method.id if has_method else ""

    return render(request, 'core/invoice_edit.html', {
        'sale': sale,
        'customers': Customer.objects.all(),
        'products': Product.objects.filter(is_active=True).select_related('category'),
        'payment_methods': PaymentMethod.objects.filter(is_active=True),
        'site_settings': site_settings,
        'decimal_places': decimal_places,
        'cart_json': cart_json,
        'payment_method_id': payment_method_id
    })
@login_required
def delete_sale(request, pk):
    """Delete a sale and add its sold quantities back to product stock.

    Stock restoration and the delete run in one transaction, so a failure
    part-way through cannot leave stock increased for a sale that still
    exists.
    """
    sale = get_object_or_404(Sale, pk=pk)
    with transaction.atomic():
        # ``item.product`` is loaded lazily per line on purpose: prefetching
        # with select_related would give two lines of the same product stale
        # copies, and the second save would overwrite the first.
        for item in sale.items.all():
            product = item.product
            product.stock_quantity += item.quantity
            product.save()
        sale.delete()
    messages.success(request, _("Sale deleted successfully."))
    return redirect('invoices')
@login_required
def add_sale_payment(request, pk):
    """Record a payment against sale ``pk`` and refresh its balance.

    Only POST requests create a payment; every request ends with a redirect
    to the invoice detail page. Invalid input (an unparsable or non-finite
    amount, an unknown payment method) produces an error message instead of
    an unhandled exception. When no payment method is submitted the payment
    is stored with the method name "Cash".
    """
    sale = get_object_or_404(Sale, pk=pk)
    if request.method != 'POST':
        return redirect('invoice_detail', pk=pk)

    try:
        amount = decimal.Decimal(request.POST.get('amount', 0))
    except (decimal.InvalidOperation, TypeError, ValueError):
        amount = None
    # Decimal accepts "NaN"/"Infinity"; neither is a storable amount.
    if amount is None or not amount.is_finite():
        messages.error(request, _("Please enter a valid payment amount."))
        return redirect('invoice_detail', pk=pk)

    # An empty select value arrives as '' and must not reach the FK column.
    payment_method_id = request.POST.get('payment_method') or None
    pm_name = "Cash"
    if payment_method_id:
        try:
            pm = PaymentMethod.objects.get(id=payment_method_id)
        except (PaymentMethod.DoesNotExist, ValueError, TypeError):
            messages.error(request, _("Invalid payment method."))
            return redirect('invoice_detail', pk=pk)
        pm_name = pm.name_en

    # Payment row and balance refresh succeed or fail together.
    with transaction.atomic():
        SalePayment.objects.create(
            sale=sale,
            amount=amount,
            payment_method_id=payment_method_id,
            payment_method_name=pm_name,
            created_by=request.user,
            notes=request.POST.get('notes', '')
        )
        sale.update_balance()
    messages.success(request, _("Payment added."))
    return redirect('invoice_detail', pk=pk)
@login_required
def customer_payments(request):
    """List sale payments, newest first, 25 per page."""
    all_payments = (
        SalePayment.objects
        .select_related('sale', 'sale__customer')
        .order_by('-payment_date', '-created_at')
    )
    current_page = Paginator(all_payments, 25).get_page(request.GET.get('page'))
    return render(request, 'core/customer_payments.html', {'payments': current_page})
@login_required
def customer_payment_receipt(request, pk):
    """Render the receipt for sale payment ``pk``, with the amount in words.

    Loading the system settings is best-effort: if it fails the receipt is
    still rendered with ``settings`` set to ``None``, and the failure is
    logged rather than silently discarded.
    """
    payment = get_object_or_404(SalePayment, pk=pk)
    settings = None
    try:
        settings = SystemSetting.objects.first()
    except Exception:
        logger.exception("Could not load system settings for payment receipt %s", pk)
    return render(request, 'core/payment_receipt.html', {
        'payment': payment,
        'settings': settings,
        'amount_in_words': number_to_words_en(payment.amount)
    })
@login_required
def sale_receipt(request, pk):
    """Render the receipt for sale ``pk``.

    Loading the system settings is best-effort: if it fails the receipt is
    still rendered with ``settings`` set to ``None``, and the failure is
    logged rather than silently discarded.
    """
    sale = get_object_or_404(Sale, pk=pk)
    settings = None
    try:
        settings = SystemSetting.objects.first()
    except Exception:
        logger.exception("Could not load system settings for sale receipt %s", pk)
    return render(request, 'core/sale_receipt.html', {
        'sale': sale,
        'settings': settings
    })
# --- Quotations ---
@login_required
def quotations(request):
    """List all quotations, newest first."""
    context = {'quotations': Quotation.objects.order_by('-created_at')}
    return render(request, 'core/quotations.html', context)
@login_required
def quotation_create(request):
    """Render the quotation creation page with customers and active products."""
    context = {
        'customers': Customer.objects.all(),
        'products': Product.objects.filter(is_active=True),
    }
    return render(request, 'core/quotation_create.html', context)
@login_required
def quotation_detail(request, pk):
    """Show a single quotation (404 if ``pk`` does not exist)."""
    context = {'quotation': get_object_or_404(Quotation, pk=pk)}
    return render(request, 'core/quotation_detail.html', context)
@login_required
def convert_quotation_to_invoice(request, pk):
    """Create an unpaid sale from quotation ``pk`` and deduct stock.

    The quotation row is locked and its status checked inside the same
    transaction that performs the conversion, so two concurrent requests
    cannot both convert it (and deduct stock twice). An already converted
    quotation redirects to the quotation list without changes.
    """
    with transaction.atomic():
        # Row lock (a no-op on backends without SELECT ... FOR UPDATE).
        quotation = get_object_or_404(Quotation.objects.select_for_update(), pk=pk)
        if quotation.status == 'converted':
            return redirect('quotations')

        sale = Sale.objects.create(
            customer=quotation.customer,
            quotation=quotation,
            total_amount=quotation.total_amount,
            discount=quotation.discount,
            status='unpaid',
            balance_due=quotation.total_amount,
            created_by=request.user
        )
        for item in quotation.items.all():
            SaleItem.objects.create(
                sale=sale,
                product=item.product,
                quantity=item.quantity,
                unit_price=item.unit_price,
                line_total=item.line_total
            )
            # Deduct the sold quantity from stock.
            product = item.product
            product.stock_quantity -= item.quantity
            product.save()

        quotation.status = 'converted'
        quotation.save()

    messages.success(request, _("Quotation converted to Invoice."))
    return redirect('invoice_detail', pk=sale.pk)
@login_required
def delete_quotation(request, pk):
    """Delete quotation ``pk`` (404 if missing) and return to the list."""
    get_object_or_404(Quotation, pk=pk).delete()
    messages.success(request, _("Quotation deleted."))
    return redirect('quotations')
@csrf_exempt
@login_required
def create_quotation_api(request):
    """Stub API endpoint: always reports success and stores nothing."""
    response_body = {'success': True}
    return JsonResponse(response_body)
# --- Sales Returns ---
@login_required
def sales_returns(request):
    """List all sale returns, newest first."""
    context = {'returns': SaleReturn.objects.order_by('-created_at')}
    return render(request, 'core/sales_returns.html', context)
@login_required
def sale_return_create(request):
    """Render the sale-return creation page with customers and active products."""
    context = {
        'customers': Customer.objects.all(),
        'products': Product.objects.filter(is_active=True),
    }
    return render(request, 'core/sale_return_create.html', context)
@login_required
def sale_return_detail(request, pk):
    """Show a single sale return (404 if ``pk`` does not exist)."""
    context = {'sale_return': get_object_or_404(SaleReturn, pk=pk)}
    return render(request, 'core/sale_return_detail.html', context)
@login_required
def delete_sale_return(request, pk):
    """Delete a sale return and subtract its quantities from product stock.

    Deleting the return undoes it, so each returned quantity is taken back
    out of stock. The stock changes and the delete run in one transaction
    so a failure part-way through leaves nothing half-applied.
    """
    sale_return = get_object_or_404(SaleReturn, pk=pk)
    with transaction.atomic():
        # ``item.product`` is loaded lazily per line so repeated products
        # always see the previously saved quantity.
        for item in sale_return.items.all():
            product = item.product
            product.stock_quantity -= item.quantity
            product.save()
        sale_return.delete()
    messages.success(request, _("Sale Return deleted."))
    return redirect('sales_returns')
# --- Purchases ---
@login_required
def purchases(request):
    """List purchases, newest first, 20 per page, with optional GET filters.

    Supported query parameters: ``q`` (invoice number, notes or id),
    ``start_date``, ``end_date``, ``supplier`` and ``page``.
    """
    params = request.GET
    search_query = params.get('q', '')
    start_date = params.get('start_date')
    end_date = params.get('end_date')
    supplier_id = params.get('supplier')

    purchases_qs = Purchase.objects.select_related('supplier', 'created_by').all().order_by('-created_at')
    if search_query:
        text_match = (
            Q(invoice_number__icontains=search_query)
            | Q(notes__icontains=search_query)
            | Q(id__icontains=search_query)
        )
        purchases_qs = purchases_qs.filter(text_match)

    # Remaining filters are plain lookups applied only when a value is given.
    for lookup, value in (
        ('created_at__date__gte', start_date),
        ('created_at__date__lte', end_date),
        ('supplier_id', supplier_id),
    ):
        if value:
            purchases_qs = purchases_qs.filter(**{lookup: value})

    page_obj = Paginator(purchases_qs, 20).get_page(params.get('page'))

    site_settings = SystemSetting.objects.first() or SystemSetting.objects.create()

    # The template receives the selected supplier as an int, or None when
    # the parameter is missing or not purely numeric.
    if supplier_id and supplier_id.isdigit():
        selected_supplier = int(supplier_id)
    else:
        selected_supplier = None

    return render(request, 'core/purchases.html', {
        'purchases': page_obj,
        'payment_methods': PaymentMethod.objects.filter(is_active=True),
        'site_settings': site_settings,
        'suppliers': Supplier.objects.all().order_by('name'),
        'search_query': search_query,
        'start_date': start_date,
        'end_date': end_date,
        'selected_supplier': selected_supplier
    })
@login_required
def purchase_create(request):
    """Render the purchase creation page.

    ``decimal_places`` comes from the system settings row, or defaults to 3
    when no settings row exists.
    """
    suppliers = Supplier.objects.all()
    products = Product.objects.filter(is_active=True)
    payment_methods = PaymentMethod.objects.filter(is_active=True)
    settings = SystemSetting.objects.first()
    return render(request, 'core/purchase_create.html', {
        'suppliers': suppliers,
        'products': products,
        'payment_methods': payment_methods,
        'decimal_places': settings.decimal_places if settings else 3
    })
@login_required
def purchase_detail(request, pk):
    """Show a single purchase (404 if ``pk`` does not exist)."""
    context = {'purchase': get_object_or_404(Purchase, pk=pk)}
    return render(request, 'core/purchase_detail.html', context)
@login_required
def edit_purchase(request, pk):
    """Render the purchase edit page, pre-filled from the stored purchase.

    The purchase's line items are serialized to JSON (``cart_json``) and the
    payment method of the first recorded payment, if any, is pre-selected.
    ``decimal_places`` falls back to 2 only when the setting is unset
    (``None``); a configured value of 0 is respected.
    """
    purchase = get_object_or_404(Purchase, pk=pk)
    suppliers = Supplier.objects.all()
    products = Product.objects.filter(is_active=True)
    payment_methods = PaymentMethod.objects.filter(is_active=True)

    site_settings = SystemSetting.objects.first()
    if not site_settings:
        site_settings = SystemSetting.objects.create()

    # ``or 2`` would silently turn a legitimate 0 into 2, so test for None.
    decimal_places = site_settings.decimal_places
    if decimal_places is None:
        decimal_places = 2

    logger.info("EDIT_PURCHASE: Processing %s items for purchase %s", purchase.items.count(), pk)
    # One cart entry per purchase line; Decimals become floats for JSON.
    cart_items = [
        {
            'id': item.product.id,
            'name_en': item.product.name_en,
            'name_ar': item.product.name_ar,
            'sku': item.product.sku,
            'cost_price': float(item.cost_price),
            'quantity': float(item.quantity)
        }
        for item in purchase.items.all().select_related('product')
    ]
    cart_json = json.dumps(cart_items)

    payment_method_id = ""
    first_payment = purchase.payments.first()
    if first_payment and first_payment.payment_method:
        payment_method_id = first_payment.payment_method.id

    return render(request, 'core/purchase_edit.html', {
        'purchase': purchase,
        'suppliers': suppliers,
        'products': products,
        'payment_methods': payment_methods,
        'site_settings': site_settings,
        'cart_json': cart_json,
        'payment_method_id': payment_method_id,
        'decimal_places': decimal_places
    })
@login_required
def add_purchase_payment(request, pk):
    """Record a payment against purchase ``pk`` and refresh its balance.

    Only POST requests create a payment; every request ends with a redirect
    to the purchase detail page. Invalid input (an unparsable or non-finite
    amount, an unknown payment method) produces an error message instead of
    an unhandled exception. When no payment method is submitted the payment
    is stored with the method name "Cash".
    """
    purchase = get_object_or_404(Purchase, pk=pk)
    if request.method != 'POST':
        return redirect('purchase_detail', pk=pk)

    try:
        amount = decimal.Decimal(request.POST.get('amount', 0))
    except (decimal.InvalidOperation, TypeError, ValueError):
        amount = None
    # Decimal accepts "NaN"/"Infinity"; neither is a storable amount.
    if amount is None or not amount.is_finite():
        messages.error(request, _("Please enter a valid payment amount."))
        return redirect('purchase_detail', pk=pk)

    # An empty select value arrives as '' and must not reach the FK column.
    payment_method_id = request.POST.get('payment_method') or None
    pm_name = "Cash"
    if payment_method_id:
        try:
            pm = PaymentMethod.objects.get(id=payment_method_id)
        except (PaymentMethod.DoesNotExist, ValueError, TypeError):
            messages.error(request, _("Invalid payment method."))
            return redirect('purchase_detail', pk=pk)
        pm_name = pm.name_en

    # Payment row and balance refresh succeed or fail together.
    with transaction.atomic():
        PurchasePayment.objects.create(
            purchase=purchase,
            amount=amount,
            payment_method_id=payment_method_id,
            payment_method_name=pm_name,
            created_by=request.user,
            notes=request.POST.get('notes', '')
        )
        purchase.update_balance()
    messages.success(request, _("Payment added."))
    return redirect('purchase_detail', pk=pk)
@login_required
def delete_purchase(request, pk):
    """Delete a purchase and subtract its quantities from product stock.

    Deleting the purchase undoes it, so each purchased quantity is taken
    back out of stock. The stock changes and the delete run in one
    transaction so a failure part-way through leaves nothing half-applied.
    """
    purchase = get_object_or_404(Purchase, pk=pk)
    with transaction.atomic():
        # ``item.product`` is loaded lazily per line so repeated products
        # always see the previously saved quantity.
        for item in purchase.items.all():
            product = item.product
            product.stock_quantity -= item.quantity
            product.save()
        purchase.delete()
    messages.success(request, _("Purchase deleted."))
    return redirect('purchases')
@login_required
def supplier_payments(request):
    """List purchase payments, newest first, 20 per page, with optional search.

    The ``search`` query parameter matches the purchase invoice number,
    supplier name, payment notes or payment method name.
    """
    # ``-pk`` breaks ties between payments sharing a payment_date so that
    # page boundaries are stable across requests.
    payments = (
        PurchasePayment.objects
        .select_related('purchase', 'purchase__supplier', 'created_by')
        .order_by('-payment_date', '-pk')
    )
    search_query = request.GET.get('search', '')
    if search_query:
        payments = payments.filter(
            Q(purchase__invoice_number__icontains=search_query) |
            Q(purchase__supplier__name__icontains=search_query) |
            Q(notes__icontains=search_query) |
            Q(payment_method_name__icontains=search_query)
        )
    paginator = Paginator(payments, 20)
    page_obj = paginator.get_page(request.GET.get('page'))
    return render(request, 'core/supplier_payments.html', {
        'payments': page_obj,
        'search_query': search_query
    })
# --- Purchase Returns ---
@login_required
def purchase_returns(request):
    """List all purchase returns, newest first."""
    context = {'returns': PurchaseReturn.objects.order_by('-created_at')}
    return render(request, 'core/purchase_returns.html', context)
@login_required
def purchase_return_create(request):
    """Render the purchase-return creation page with suppliers and active products."""
    context = {
        'suppliers': Supplier.objects.all(),
        'products': Product.objects.filter(is_active=True),
    }
    return render(request, 'core/purchase_return_create.html', context)
@login_required
def purchase_return_detail(request, pk):
    """Show a single purchase return (404 if ``pk`` does not exist)."""
    context = {'purchase_return': get_object_or_404(PurchaseReturn, pk=pk)}
    return render(request, 'core/purchase_return_detail.html', context)
@login_required
def delete_purchase_return(request, pk):
    """Delete a purchase return and add its quantities back to product stock.

    The stock changes and the delete run in one transaction so a failure
    part-way through leaves nothing half-applied.
    """
    purchase_return = get_object_or_404(PurchaseReturn, pk=pk)
    with transaction.atomic():
        # ``item.product`` is loaded lazily per line so repeated products
        # always see the previously saved quantity.
        for item in purchase_return.items.all():
            product = item.product
            product.stock_quantity += item.quantity
            product.save()
        purchase_return.delete()
    messages.success(request, _("Purchase Return deleted."))
    return redirect('purchase_returns')
# --- Expenses ---
@login_required
def expenses_view(request):
    """List expenses, newest first, 20 per page, with a filtered total.

    Supported query parameters: ``start_date``, ``end_date``, ``category``
    and ``page``. ``total_expenses`` sums the filtered set, not just the
    current page.
    """
    params = request.GET
    expenses = Expense.objects.select_related('category', 'payment_method').all().order_by('-date')

    # (ORM lookup, query parameter) pairs, applied only when a value is given.
    for lookup, param_name in (
        ('date__gte', 'start_date'),
        ('date__lte', 'end_date'),
        ('category_id', 'category'),
    ):
        value = params.get(param_name)
        if value:
            expenses = expenses.filter(**{lookup: value})

    total_expenses = expenses.aggregate(total=Sum('amount'))['total'] or 0

    site_settings = SystemSetting.objects.first() or SystemSetting.objects.create()

    page_obj = Paginator(expenses, 20).get_page(params.get('page'))
    return render(request, 'core/expenses.html', {
        'expenses': page_obj,
        'categories': ExpenseCategory.objects.all(),
        'payment_methods': PaymentMethod.objects.filter(is_active=True),
        'site_settings': site_settings,
        'total_expenses': total_expenses
    })
@login_required
def expense_create_view(request):
    """Create an expense from ``ExpenseForm``; re-render the form on errors."""
    template_name = 'core/expense_form.html'
    if request.method != 'POST':
        return render(request, template_name, {'form': ExpenseForm()})

    form = ExpenseForm(request.POST, request.FILES)
    if not form.is_valid():
        return render(request, template_name, {'form': form})

    # Attach the author before the first save.
    new_expense = form.save(commit=False)
    new_expense.created_by = request.user
    new_expense.save()
    messages.success(request, _("Expense added."))
    return redirect('expenses')
@login_required
def expense_edit_view(request, pk):
    """Edit expense ``pk`` through ``ExpenseForm``; re-render the form on errors."""
    template_name = 'core/expense_form.html'
    target = get_object_or_404(Expense, pk=pk)
    if request.method != 'POST':
        return render(request, template_name, {'form': ExpenseForm(instance=target)})

    form = ExpenseForm(request.POST, request.FILES, instance=target)
    if not form.is_valid():
        return render(request, template_name, {'form': form})

    form.save()
    messages.success(request, _("Expense updated."))
    return redirect('expenses')
@login_required
def expense_delete_view(request, pk):
    """Delete expense ``pk`` (404 if missing) and return to the expense list."""
    get_object_or_404(Expense, pk=pk).delete()
    messages.success(request, _("Expense deleted."))
    return redirect('expenses')
@login_required
def expense_categories_view(request):
    """List expense categories; a POST creates one from ``name_en``/``name_ar``."""
    if request.method == 'POST':
        submitted = request.POST
        ExpenseCategory.objects.create(
            name_en=submitted.get('name_en'),
            name_ar=submitted.get('name_ar'),
        )
        messages.success(request, _("Category added."))
        return redirect('expense_categories')

    context = {'categories': ExpenseCategory.objects.all()}
    return render(request, 'core/expense_categories.html', context)
@login_required
def expense_category_delete_view(request, pk):
    """Delete expense category ``pk`` (404 if missing) and return to the list."""
    get_object_or_404(ExpenseCategory, pk=pk).delete()
    messages.success(request, _("Category deleted."))
    return redirect('expense_categories')
@login_required
def expense_report(request):
    """Render the expense report page (no context data is supplied)."""
    template_name = 'core/expense_report.html'
    return render(request, template_name)
@login_required
def export_expenses_excel(request):
    """Placeholder: no export is produced, the user is sent back to the expense list."""
    destination = 'expenses'
    return redirect(destination)
# --- Reports ---
@login_required
def reports(request):
    """Render the reports landing page (no context data is supplied)."""
    template_name = 'core/reports.html'
    return render(request, template_name)
@login_required
def customer_statement(request):
    """Render a customer's statement of sales with summed totals.

    Without a ``customer`` query parameter the page shows only the customer
    picker, an empty sales list and zero totals. ``start_date`` and
    ``end_date`` narrow the sales by creation date.
    """
    params = request.GET
    customer_id = params.get('customer')
    start_date = params.get('start_date')
    end_date = params.get('end_date')

    # Defaults used when no customer has been chosen yet.
    selected_customer = None
    sales = []
    sums = {'sum_total': None, 'sum_paid': None, 'sum_balance': None}

    if customer_id:
        selected_customer = get_object_or_404(Customer, id=customer_id)
        sales = Sale.objects.filter(customer=selected_customer).order_by('created_at')
        if start_date:
            sales = sales.filter(created_at__date__gte=start_date)
        if end_date:
            sales = sales.filter(created_at__date__lte=end_date)
        sums = sales.aggregate(
            sum_total=Sum('total_amount'),
            sum_paid=Sum('paid_amount'),
            sum_balance=Sum('balance_due')
        )

    return render(request, 'core/customer_statement.html', {
        'customers': Customer.objects.all().order_by('name'),
        'selected_customer': selected_customer,
        'sales': sales,
        'start_date': start_date,
        'end_date': end_date,
        # An empty aggregate yields None; present it as 0.
        'total_amount': sums['sum_total'] or 0,
        'total_paid': sums['sum_paid'] or 0,
        'total_balance': sums['sum_balance'] or 0
    })
@login_required
def supplier_statement(request):
    """Render a supplier's purchase statement with summed totals.

    Query parameters: ``supplier`` (id), optional ``start_date`` / ``end_date``
    applied to the date part of ``created_at``. Without ``supplier`` the page
    shows only the supplier selector, an empty purchase list and zero totals.
    """
    params = request.GET
    supplier_id = params.get('supplier')
    start_date = params.get('start_date')
    end_date = params.get('end_date')

    selected_supplier = None
    purchases = []
    sums = {'sum_total': None, 'sum_paid': None, 'sum_balance': None}

    if supplier_id:
        selected_supplier = get_object_or_404(Supplier, id=supplier_id)
        date_window = {}
        if start_date:
            date_window['created_at__date__gte'] = start_date
        if end_date:
            date_window['created_at__date__lte'] = end_date
        purchases = (
            Purchase.objects.filter(supplier=selected_supplier)
            .order_by('created_at')
            .filter(**date_window)
        )
        sums = purchases.aggregate(
            sum_total=Sum('total_amount'),
            sum_paid=Sum('paid_amount'),
            sum_balance=Sum('balance_due'),
        )

    # Aggregates are None when no row matches; report those as 0.
    return render(request, 'core/supplier_statement.html', {
        'suppliers': Supplier.objects.all().order_by('name'),
        'selected_supplier': selected_supplier,
        'purchases': purchases,
        'start_date': start_date,
        'end_date': end_date,
        'total_amount': sums['sum_total'] or 0,
        'total_paid': sums['sum_paid'] or 0,
        'total_balance': sums['sum_balance'] or 0,
    })
@login_required
def cashflow_report(request):
    """Render sales, expense and purchase totals and their net for a date range.

    ``start_date`` / ``end_date`` (GET) are optional. Sales and purchases are
    filtered on the date part of ``created_at``; expenses on ``date``. The
    ``net_profit`` value is sales minus expenses minus purchases.
    """
    start_date = request.GET.get('start_date')
    end_date = request.GET.get('end_date')

    created_window = {}   # lookups for Sale / Purchase
    expense_window = {}   # lookups for Expense
    if start_date:
        created_window['created_at__date__gte'] = start_date
        expense_window['date__gte'] = start_date
    if end_date:
        created_window['created_at__date__lte'] = end_date
        expense_window['date__lte'] = end_date

    def _sum_of(queryset, field_name):
        # Sum() yields None on an empty queryset; treat that as 0.
        return queryset.aggregate(total=Sum(field_name))['total'] or 0

    total_sales = _sum_of(Sale.objects.filter(**created_window), 'total_amount')
    total_expenses = _sum_of(Expense.objects.filter(**expense_window), 'amount')
    total_purchases = _sum_of(Purchase.objects.filter(**created_window), 'total_amount')

    return render(request, 'core/cashflow_report.html', {
        'total_sales': total_sales,
        'total_expenses': total_expenses,
        'total_purchases': total_purchases,
        'net_profit': total_sales - total_expenses - total_purchases,
        'start_date': start_date,
        'end_date': end_date,
    })
# --- Inventory / System ---
@login_required
def add_product(request):
    """Create a product from a POSTed ``ProductForm``.

    A valid POST saves the product and redirects to the ``#items`` anchor of
    the inventory page. An invalid POST now flashes the validation errors
    (previously they were dropped silently) before redirecting to
    ``inventory``. Non-POST requests simply redirect to ``inventory``.
    """
    if request.method == 'POST':
        form = ProductForm(request.POST, request.FILES)
        if form.is_valid():
            form.save()
            messages.success(request, _("Product added."))
            return redirect(reverse('inventory') + '#items')
        # The form is not re-rendered here, so surface the errors as a flash
        # message instead of losing them.
        messages.error(
            request,
            _("Product could not be added.") + " " + form.errors.as_text(),
        )
    return redirect('inventory')
@login_required
def edit_product(request, pk):
    """Update the product identified by ``pk`` (404 if absent) from a POSTed form.

    A valid POST saves the product and redirects to the ``#items`` anchor of
    the inventory page. An invalid POST now flashes the validation errors
    (previously they were dropped silently) before redirecting to
    ``inventory``. Non-POST requests simply redirect to ``inventory``.
    """
    product = get_object_or_404(Product, pk=pk)
    if request.method == 'POST':
        form = ProductForm(request.POST, request.FILES, instance=product)
        if form.is_valid():
            form.save()
            messages.success(request, _("Product updated."))
            return redirect(reverse('inventory') + '#items')
        # The form is not re-rendered here, so surface the errors as a flash
        # message instead of losing them.
        messages.error(
            request,
            _("Product could not be updated.") + " " + form.errors.as_text(),
        )
    return redirect('inventory')
@login_required
def delete_product(request, pk):
    """Delete product ``pk`` (404 if absent) and return to the inventory ``#items`` anchor.

    NOTE(review): the deletion runs for every HTTP method, including GET.
    """
    get_object_or_404(Product, pk=pk).delete()
    messages.success(request, _("Product deleted."))
    items_url = reverse('inventory') + '#items'
    return redirect(items_url)
@login_required
def barcode_labels(request):
    """Render the barcode label page with all products ordered by ``name_en``."""
    context = {'products': Product.objects.all().order_by('name_en')}
    return render(request, 'core/barcode_labels.html', context)
@login_required
def suggest_sku(request):
    """Return JSON ``{'sku': <7 random digits>}`` for a SKU no product uses yet.

    Candidates are drawn repeatedly until one is not present in ``Product.sku``.
    """
    while True:
        # Seven independent random digits, leading zeros allowed.
        candidate = ''.join(str(random.randint(0, 9)) for _digit in range(7))
        if Product.objects.filter(sku=candidate).exists():
            continue
        return JsonResponse({'sku': candidate})
@login_required
def add_category(request):
    """On POST create a category from ``name_en`` / ``name_ar``; always redirect to ``inventory``.

    The slug is generated as ``cat-<current unix timestamp in whole seconds>``.
    """
    if request.method == 'POST':
        posted = request.POST
        generated_slug = f"cat-{int(timezone.now().timestamp())}"
        Category.objects.create(
            name_en=posted.get('name_en'),
            name_ar=posted.get('name_ar'),
            slug=generated_slug,
        )
    return redirect('inventory')
@login_required
def edit_category(request, pk):
    """On POST rename category ``pk`` (404 if absent); redirect to the categories anchor."""
    category = get_object_or_404(Category, pk=pk)
    if request.method == 'POST':
        for field in ('name_en', 'name_ar'):
            setattr(category, field, request.POST.get(field))
        category.save()
        messages.success(request, _("Category updated."))
    categories_url = reverse('inventory') + '#categories-list'
    return redirect(categories_url)
@login_required
def delete_category(request, pk):
    """Delete category ``pk`` (404 if absent) and redirect to the categories anchor.

    NOTE(review): the deletion runs for every HTTP method, including GET.
    """
    get_object_or_404(Category, pk=pk).delete()
    messages.success(request, _("Category deleted."))
    categories_url = reverse('inventory') + '#categories-list'
    return redirect(categories_url)
@login_required
def add_unit(request):
    """On POST create a unit from ``name_en``, ``name_ar`` and ``short_name``; redirect to ``inventory``."""
    if request.method == 'POST':
        unit_fields = {
            field: request.POST.get(field)
            for field in ('name_en', 'name_ar', 'short_name')
        }
        Unit.objects.create(**unit_fields)
    return redirect('inventory')
@login_required
def edit_unit(request, pk):
    """Placeholder: performs no update and redirects to ``inventory``."""
    target_route = 'inventory'
    return redirect(target_route)
@login_required
def delete_unit(request, pk):
    """Placeholder: performs no deletion and redirects to ``inventory``."""
    target_route = 'inventory'
    return redirect(target_route)
@login_required
def add_customer(request):
    """On POST create a customer from ``name`` and ``phone`` (default ``''``); redirect to ``customers``."""
    if request.method == 'POST':
        posted = request.POST
        Customer.objects.create(
            name=posted.get('name'),
            phone=posted.get('phone', ''),
        )
    return redirect('customers')
@login_required
def edit_customer(request, pk):
    """On POST overwrite name, phone, email and address of customer ``pk`` (404 if absent).

    Always redirects to ``customers``. Fields missing from the POST are set to ``None``.
    """
    customer = get_object_or_404(Customer, pk=pk)
    if request.method == 'POST':
        for field in ('name', 'phone', 'email', 'address'):
            setattr(customer, field, request.POST.get(field))
        customer.save()
    return redirect('customers')
@login_required
def delete_customer(request, pk):
    """Delete customer ``pk`` (404 if absent) and redirect to ``customers``.

    NOTE(review): the deletion runs for every HTTP method, including GET.
    """
    get_object_or_404(Customer, pk=pk).delete()
    return redirect('customers')
@login_required
def add_supplier(request):
    """On POST create a supplier from ``name`` and ``phone`` (default ``''``); redirect to ``suppliers``."""
    if request.method == 'POST':
        posted = request.POST
        Supplier.objects.create(
            name=posted.get('name'),
            phone=posted.get('phone', ''),
        )
    return redirect('suppliers')
@login_required
def edit_supplier(request, pk):
    """On POST overwrite name and phone of supplier ``pk`` (404 if absent); redirect to ``suppliers``."""
    supplier = get_object_or_404(Supplier, pk=pk)
    if request.method == 'POST':
        for field in ('name', 'phone'):
            setattr(supplier, field, request.POST.get(field))
        supplier.save()
    return redirect('suppliers')
@login_required
def delete_supplier(request, pk):
    """Delete supplier ``pk`` (404 if absent) and redirect to ``suppliers``.

    NOTE(review): the deletion runs for every HTTP method, including GET.
    """
    get_object_or_404(Supplier, pk=pk).delete()
    return redirect('suppliers')
@login_required
def add_payment_method(request):
    """On POST create a payment method from ``name_en`` / ``name_ar``; redirect to ``settings``."""
    if request.method == 'POST':
        posted = request.POST
        PaymentMethod.objects.create(
            name_en=posted.get('name_en'),
            name_ar=posted.get('name_ar'),
        )
    return redirect('settings')
@login_required
def edit_payment_method(request, pk):
    """Placeholder: performs no update and redirects to ``settings``."""
    target_route = 'settings'
    return redirect(target_route)
@login_required
def delete_payment_method(request, pk):
    """Placeholder: performs no deletion and redirects to ``settings``."""
    target_route = 'settings'
    return redirect(target_route)
@login_required
def add_loyalty_tier(request):
    """Placeholder: creates nothing and redirects to ``settings``."""
    target_route = 'settings'
    return redirect(target_route)
@login_required
def edit_loyalty_tier(request, pk):
    """Placeholder: performs no update and redirects to ``settings``."""
    target_route = 'settings'
    return redirect(target_route)
@login_required
def delete_loyalty_tier(request, pk):
    """Placeholder: performs no deletion and redirects to ``settings``."""
    target_route = 'settings'
    return redirect(target_route)
@login_required
def add_device(request):
    """On POST create a ``Device`` from the form fields; redirect to the settings ``#devices`` anchor.

    An empty ``port`` is stored as ``None``; ``is_active`` is true only when the
    posted value is ``'on'``.
    """
    if request.method == 'POST':
        posted = request.POST
        device_fields = {
            'name': posted.get('name'),
            'device_type': posted.get('device_type'),
            'connection_type': posted.get('connection_type'),
            'ip_address': posted.get('ip_address'),
            'port': posted.get('port') or None,
            'is_active': posted.get('is_active') == 'on',
        }
        Device.objects.create(**device_fields)
    devices_url = reverse('settings') + '#devices'
    return redirect(devices_url)
@login_required
def edit_device(request, pk):
    """On POST overwrite the fields of device ``pk`` (404 if absent); redirect to ``#devices``.

    An empty ``port`` is stored as ``None``; ``is_active`` is true only when the
    posted value is ``'on'``.
    """
    device = get_object_or_404(Device, pk=pk)
    if request.method == 'POST':
        posted = request.POST
        updates = {
            'name': posted.get('name'),
            'device_type': posted.get('device_type'),
            'connection_type': posted.get('connection_type'),
            'ip_address': posted.get('ip_address'),
            'port': posted.get('port') or None,
            'is_active': posted.get('is_active') == 'on',
        }
        for attr, value in updates.items():
            setattr(device, attr, value)
        device.save()
    devices_url = reverse('settings') + '#devices'
    return redirect(devices_url)
@login_required
def delete_device(request, pk):
    """Delete device ``pk`` (404 if absent) and redirect to the settings ``#devices`` anchor.

    NOTE(review): the deletion runs for every HTTP method, including GET.
    """
    get_object_or_404(Device, pk=pk).delete()
    devices_url = reverse('settings') + '#devices'
    return redirect(devices_url)
@login_required
def test_whatsapp_connection(request):
    """Send a fixed test WhatsApp message to the phone number in the JSON body.

    Expects a POST whose JSON body contains ``phone``. Responds with JSON
    ``{'success': True, 'message': ...}`` when ``send_whatsapp_message`` reports
    success, otherwise ``{'success': False, 'error': ...}``. Any exception
    (including an unparsable body) is logged and reported the same way.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid method'})
    try:
        data = json.loads(request.body)
        phone = data.get('phone')
        if not phone:
            return JsonResponse({'success': False, 'error': 'Phone number required'})

        success, msg = send_whatsapp_message(phone, "Test message from Smart Admin")
        if success:
            return JsonResponse({'success': True, 'message': msg})

        # Add a remediation hint for the IP-whitelist rejection.
        error_msg = str(msg)
        if "Access denied" in error_msg and "IP" in error_msg:
            error_msg += "\nAction Required: Go to Wablas Dashboard > Security and whitelist the IP shown in this error."
        return JsonResponse({'success': False, 'error': error_msg})
    except Exception as e:
        # Boundary handler: keep the JSON contract but record the traceback.
        logger.exception("WhatsApp Test Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
# --- PDF & WhatsApp Helpers ---
def get_pdf_context(obj, doc_type):
    """Build the template context used to render a document PDF.

    ``obj`` is exposed under the ``doc_type`` key (and additionally under
    ``'sale'`` when ``doc_type`` is ``'invoice'``). The system settings row is
    created if none exists and is exposed as both ``settings`` and
    ``site_settings``. ``amount_in_words`` spells out ``obj.total_amount`` for
    the known document types (``quotation``, ``invoice``, ``lpo``) and 0 otherwise.
    """
    system_settings = SystemSetting.objects.first() or SystemSetting.objects.create()

    known_types = ('quotation', 'invoice', 'lpo')
    amount = obj.total_amount if doc_type in known_types else 0

    context = {doc_type: obj}
    if doc_type == 'invoice':
        context['sale'] = obj
    context.update({
        'settings': system_settings,
        'site_settings': system_settings,
        'amount_in_words': number_to_words_en(amount),
    })
    return context
def generate_pdf_file(template, context, request):
    """Render ``template`` with ``context`` and return what WeasyPrint's ``write_pdf()`` produces.

    The site root of ``request`` is passed as WeasyPrint's ``base_url``.
    WeasyPrint is imported lazily inside the function.
    """
    from weasyprint import HTML

    markup = render_to_string(template, context, request=request)
    site_root = request.build_absolute_uri('/')
    document = HTML(string=markup, base_url=site_root)
    return document.write_pdf()
@login_required
def download_invoice_pdf(request, pk):
    """Return the invoice PDF for sale ``pk`` (404 if absent) as an attachment.

    The file name uses the invoice number, falling back to the sale id.
    """
    sale = get_object_or_404(Sale, pk=pk)
    pdf_bytes = generate_pdf_file(
        'pdf/invoice_pdf.html', get_pdf_context(sale, 'invoice'), request
    )
    response = HttpResponse(pdf_bytes, content_type='application/pdf')
    response['Content-Disposition'] = f'attachment; filename="Invoice_{sale.invoice_number or sale.id}.pdf"'
    return response
@login_required
def send_invoice_whatsapp(request):
    """Generate a sale's invoice PDF and send it as a WhatsApp document.

    Accepts a POST carrying ``sale_id`` and optional ``phone`` either as a JSON
    object or as form fields. When ``phone`` is missing, the sale customer's
    phone is used. The PDF is written under ``MEDIA_ROOT/temp_invoices`` and its
    absolute media URL is handed to ``send_whatsapp_document``.

    Returns JSON ``{'success': True, 'message': ...}`` on success, otherwise
    ``{'success': False, 'error': ...}``. An unknown ``sale_id`` yields a 404.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Method not allowed'})

    try:
        data = json.loads(request.body)
        sale_id = data.get('sale_id')
        phone = data.get('phone')
    except (ValueError, AttributeError):
        # ValueError: body is not valid JSON / not decodable (e.g. a form post).
        # AttributeError: valid JSON that is not an object (list, number, ...).
        sale_id = request.POST.get('sale_id')
        phone = request.POST.get('phone')

    if not sale_id:
        return JsonResponse({'success': False, 'error': 'Sale ID missing'})
    sale = get_object_or_404(Sale, pk=sale_id)

    if not phone:
        if sale.customer and sale.customer.phone:
            phone = sale.customer.phone
        else:
            return JsonResponse({'success': False, 'error': 'Phone number missing'})

    try:
        # Generate the PDF server-side.
        context = get_pdf_context(sale, 'invoice')
        pdf_bytes = generate_pdf_file('pdf/invoice_pdf.html', context, request)

        # Write the PDF under MEDIA_ROOT and build the URL passed to the sender.
        dir_path = os.path.join(django_settings.MEDIA_ROOT, 'temp_invoices')
        os.makedirs(dir_path, exist_ok=True)
        filename = f"invoice_{sale.id}_{int(timezone.now().timestamp())}.pdf"
        file_path = os.path.join(dir_path, filename)
        with open(file_path, 'wb') as f:
            f.write(pdf_bytes)
        file_url = request.build_absolute_uri(django_settings.MEDIA_URL + 'temp_invoices/' + filename)

        success, response_msg = send_whatsapp_document(
            phone, file_url, caption=f"Invoice #{sale.invoice_number or sale.id}"
        )
        if success:
            return JsonResponse({'success': True, 'message': response_msg})

        error_msg = str(response_msg)
        if "Access denied" in error_msg and "IP" in error_msg:
            error_msg += "\nAction Required: Go to Wablas Dashboard > Security and whitelist the IP shown in this error."
        return JsonResponse({'success': False, 'error': error_msg})
    except Exception as e:
        # Boundary handler: keep the JSON contract but record the traceback.
        logger.exception("WhatsApp Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
@login_required
def download_quotation_pdf(request, pk):
    """Return the PDF for quotation ``pk`` (404 if absent) as an attachment.

    The file name uses the quotation number, falling back to the quotation id.
    """
    quotation = get_object_or_404(Quotation, pk=pk)
    pdf_bytes = generate_pdf_file(
        'pdf/quotation_pdf.html', get_pdf_context(quotation, 'quotation'), request
    )
    response = HttpResponse(pdf_bytes, content_type='application/pdf')
    response['Content-Disposition'] = f'attachment; filename="Quotation_{quotation.quotation_number or quotation.id}.pdf"'
    return response
@login_required
def send_quotation_whatsapp(request):
    """Generate a quotation PDF and send it as a WhatsApp document.

    Accepts a POST carrying ``quotation_id`` and optional ``phone`` either as a
    JSON object or as form fields. When ``phone`` is missing, the quotation
    customer's phone is used. The PDF is written under
    ``MEDIA_ROOT/temp_quotations`` and its absolute media URL is handed to
    ``send_whatsapp_document``.

    Returns JSON ``{'success': True, 'message': ...}`` on success, otherwise
    ``{'success': False, 'error': ...}``. An unknown id yields a 404.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Method not allowed'})

    try:
        data = json.loads(request.body)
        quotation_id = data.get('quotation_id')
        phone = data.get('phone')
    except (ValueError, AttributeError):
        # ValueError: body is not valid JSON / not decodable (e.g. a form post).
        # AttributeError: valid JSON that is not an object (list, number, ...).
        quotation_id = request.POST.get('quotation_id')
        phone = request.POST.get('phone')

    if not quotation_id:
        return JsonResponse({'success': False, 'error': 'Quotation ID missing'})
    quotation = get_object_or_404(Quotation, pk=quotation_id)

    if not phone:
        if quotation.customer and quotation.customer.phone:
            phone = quotation.customer.phone
        else:
            return JsonResponse({'success': False, 'error': 'Phone number missing'})

    try:
        context = get_pdf_context(quotation, 'quotation')
        pdf_bytes = generate_pdf_file('pdf/quotation_pdf.html', context, request)

        # Write the PDF under MEDIA_ROOT and build the URL passed to the sender.
        dir_path = os.path.join(django_settings.MEDIA_ROOT, 'temp_quotations')
        os.makedirs(dir_path, exist_ok=True)
        filename = f"quotation_{quotation.id}_{int(timezone.now().timestamp())}.pdf"
        file_path = os.path.join(dir_path, filename)
        with open(file_path, 'wb') as f:
            f.write(pdf_bytes)
        file_url = request.build_absolute_uri(django_settings.MEDIA_URL + 'temp_quotations/' + filename)

        success, response_msg = send_whatsapp_document(
            phone, file_url, caption=f"Quotation #{quotation.quotation_number or quotation.id}"
        )
        if success:
            return JsonResponse({'success': True, 'message': response_msg})

        error_msg = str(response_msg)
        if "Access denied" in error_msg and "IP" in error_msg:
            error_msg += "\nAction Required: Go to Wablas Dashboard > Security and whitelist the IP shown in this error."
        return JsonResponse({'success': False, 'error': error_msg})
    except Exception as e:
        # Boundary handler: keep the JSON contract but record the traceback.
        logger.exception("WhatsApp Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
@login_required
def download_lpo_pdf(request, pk):
    """Return the PDF for purchase order ``pk`` (404 if absent) as an attachment.

    The file name uses the LPO number, falling back to the purchase order id.
    """
    lpo = get_object_or_404(PurchaseOrder, pk=pk)
    pdf_bytes = generate_pdf_file(
        'pdf/lpo_pdf.html', get_pdf_context(lpo, 'lpo'), request
    )
    response = HttpResponse(pdf_bytes, content_type='application/pdf')
    response['Content-Disposition'] = f'attachment; filename="LPO_{lpo.lpo_number or lpo.id}.pdf"'
    return response
@login_required
def send_lpo_whatsapp(request):
    """Generate a purchase order (LPO) PDF and send it as a WhatsApp document.

    Accepts a POST carrying ``lpo_id`` and optional ``phone`` either as a JSON
    object or as form fields. When ``phone`` is missing, the LPO supplier's
    phone is used. The PDF is written under ``MEDIA_ROOT/temp_lpos`` and its
    absolute media URL is handed to ``send_whatsapp_document``. After a
    successful send, an LPO in status ``draft`` is moved to ``sent``.

    Returns JSON ``{'success': True, 'message': ...}`` on success, otherwise
    ``{'success': False, 'error': ...}``. An unknown id yields a 404.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Method not allowed'})

    try:
        data = json.loads(request.body)
        lpo_id = data.get('lpo_id')
        phone = data.get('phone')
    except (ValueError, AttributeError):
        # ValueError: body is not valid JSON / not decodable (e.g. a form post).
        # AttributeError: valid JSON that is not an object (list, number, ...).
        lpo_id = request.POST.get('lpo_id')
        phone = request.POST.get('phone')

    if not lpo_id:
        return JsonResponse({'success': False, 'error': 'LPO ID missing'})
    lpo = get_object_or_404(PurchaseOrder, pk=lpo_id)

    if not phone:
        if lpo.supplier and lpo.supplier.phone:
            phone = lpo.supplier.phone
        else:
            return JsonResponse({'success': False, 'error': 'Phone number missing'})

    try:
        context = get_pdf_context(lpo, 'lpo')
        pdf_bytes = generate_pdf_file('pdf/lpo_pdf.html', context, request)

        # Write the PDF under MEDIA_ROOT and build the URL passed to the sender.
        dir_path = os.path.join(django_settings.MEDIA_ROOT, 'temp_lpos')
        os.makedirs(dir_path, exist_ok=True)
        filename = f"lpo_{lpo.id}_{int(timezone.now().timestamp())}.pdf"
        file_path = os.path.join(dir_path, filename)
        with open(file_path, 'wb') as f:
            f.write(pdf_bytes)
        file_url = request.build_absolute_uri(django_settings.MEDIA_URL + 'temp_lpos/' + filename)

        success, response_msg = send_whatsapp_document(
            phone, file_url, caption=f"LPO #{lpo.lpo_number or lpo.id}"
        )
        if success:
            # Only a draft is promoted; other statuses are left untouched.
            if lpo.status == 'draft':
                lpo.status = 'sent'
                lpo.save()
            return JsonResponse({'success': True, 'message': response_msg})

        error_msg = str(response_msg)
        if "Access denied" in error_msg and "IP" in error_msg:
            error_msg += "\nAction Required: Go to Wablas Dashboard > Security and whitelist the IP shown in this error."
        return JsonResponse({'success': False, 'error': error_msg})
    except Exception as e:
        # Boundary handler: keep the JSON contract but record the traceback.
        logger.exception("WhatsApp Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
# --- LPO ---
@login_required
def lpo_list(request):
    """Render all purchase orders, newest ``created_at`` first."""
    context = {'lpos': PurchaseOrder.objects.all().order_by('-created_at')}
    return render(request, 'core/lpo_list.html', context)
@login_required
def lpo_create(request):
    """Render the LPO creation page with all suppliers and the active products."""
    context = {
        'suppliers': Supplier.objects.all(),
        'products': Product.objects.filter(is_active=True),
    }
    return render(request, 'core/lpo_create.html', context)
@login_required
def lpo_detail(request, pk):
    """Render the detail page of purchase order ``pk`` (404 if absent).

    The system settings row is loaded best-effort: any error while fetching it
    leaves ``settings`` as ``None`` in the context.
    """
    lpo = get_object_or_404(PurchaseOrder, pk=pk)
    try:
        settings = SystemSetting.objects.first()
    except Exception:
        settings = None
    context = {'lpo': lpo, 'settings': settings}
    return render(request, 'core/lpo_detail.html', context)
@login_required
def convert_lpo_to_purchase(request, pk):
    """Placeholder: performs no conversion and redirects to ``purchases``."""
    target_route = 'purchases'
    return redirect(target_route)
@login_required
def lpo_delete(request, pk):
    """Delete purchase order ``pk`` (404 if absent) and redirect to ``lpo_list``.

    NOTE(review): the deletion runs for every HTTP method, including GET.
    """
    get_object_or_404(PurchaseOrder, pk=pk).delete()
    return redirect('lpo_list')
@csrf_exempt
@login_required
def create_lpo_api(request):
    """Placeholder API: creates nothing and always answers ``{'success': True}``."""
    payload = {'success': True}
    return JsonResponse(payload)
# --- Cashier / Sessions ---
@login_required
def cashier_registry(request):
    """Render the cashier/counter registry with the data for its dropdowns.

    Context: all registry rows (with ``user`` and ``counter`` joined), active
    users ordered by username, and all devices ordered by name as ``counters``.
    """
    context = {
        'registries': CashierCounterRegistry.objects.select_related('user', 'counter').all(),
        'users': User.objects.filter(is_active=True).order_by('username'),
        'counters': Device.objects.all().order_by('name'),
    }
    return render(request, 'core/cashier_registry.html', context)
@login_required
def cashier_session_list(request):
    """Render all cashier sessions, most recent ``start_time`` first."""
    context = {'sessions': CashierSession.objects.all().order_by('-start_time')}
    return render(request, 'core/session_list.html', context)
@login_required
def start_session(request):
    """On POST open a cashier session for the user and go to ``pos``; otherwise show the form.

    ``opening_balance`` is taken from the POST data and defaults to 0.
    """
    if request.method != 'POST':
        return render(request, 'core/start_session.html')

    opening_balance = request.POST.get('opening_balance', 0)
    CashierSession.objects.create(user=request.user, opening_balance=opening_balance)
    return redirect('pos')
@login_required
def close_session(request):
    """Close the requesting user's active cashier session.

    On POST with an active session: stores ``closing_balance`` (default 0),
    marks the session ``closed``, stamps ``end_time`` and redirects to
    ``index``. Otherwise renders the close form with the session (or ``None``).
    """
    session = CashierSession.objects.filter(user=request.user, status='active').first()
    if request.method != 'POST' or not session:
        return render(request, 'core/close_session.html', {'session': session})

    session.closing_balance = request.POST.get('closing_balance', 0)
    session.status = 'closed'
    session.end_time = timezone.now()
    session.save()
    return redirect('index')
@login_required
def session_detail(request, pk):
    """Render the detail page of cashier session ``pk`` (404 if absent)."""
    context = {'session': get_object_or_404(CashierSession, pk=pk)}
    return render(request, 'core/session_detail.html', context)
# --- APIs ---
@csrf_exempt
@login_required
def create_sale_api(request):
    """Create a sale with its items and payment from a JSON POST body.

    Inside one transaction: creates the ``Sale``, one ``SaleItem`` per entry of
    ``items`` (``id``, ``quantity``, ``price``), decrements each product's
    ``stock_quantity`` and, when ``paid_amount`` is positive, records a
    ``SalePayment``.

    Monetary values and quantities are converted to ``Decimal`` before use,
    and the sale status is derived from the same defaulted ``payment_type``
    that is stored (``paid`` for cash, ``partial`` otherwise).

    Returns JSON with ``success``, ``sale_id``, ``business`` (details from the
    system settings for the receipt) and ``sale`` (receipt data). Any failure
    is logged and returned as ``{'success': False, 'error': ...}``.

    NOTE(review): the view is CSRF-exempt while relying on session login.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid request'})

    def _dec(value):
        # Going through str() keeps JSON floats such as 0.1 exact.
        return decimal.Decimal(str(value or 0))

    try:
        data = json.loads(request.body)
        # Resolve once so the stored type and the derived status agree.
        payment_type = data.get('payment_type', 'cash')

        with transaction.atomic():
            sale = Sale.objects.create(
                customer_id=data.get('customer_id') or None,
                invoice_number=data.get('invoice_number', ''),
                subtotal=_dec(data.get('subtotal')),
                vat_amount=_dec(data.get('vat_amount')),
                total_amount=_dec(data.get('total_amount')),
                paid_amount=_dec(data.get('paid_amount')),
                payment_type=payment_type,
                created_by=request.user,
                status='paid' if payment_type == 'cash' else 'partial',
                discount=_dec(data.get('discount')),
                loyalty_points_redeemed=data.get('loyalty_points_redeemed', 0),
                notes=data.get('notes', ''),
                due_date=data.get('due_date') or None,
            )

            for item in data.get('items', []):
                qty = _dec(item['quantity'])
                price = _dec(item['price'])
                SaleItem.objects.create(
                    sale=sale,
                    product_id=item['id'],
                    quantity=qty,
                    unit_price=price,
                    line_total=qty * price,
                )
                # F() keeps the decrement atomic at the database level.
                Product.objects.filter(pk=item['id']).update(
                    stock_quantity=F('stock_quantity') - qty
                )

            # Payment
            if sale.paid_amount > 0:
                SalePayment.objects.create(
                    sale=sale,
                    amount=sale.paid_amount,
                    payment_method_id=data.get('payment_method_id'),
                    created_by=request.user,
                )

        # Build response data for the JS receipt; settings are best-effort.
        try:
            settings = SystemSetting.objects.first()
        except Exception:
            settings = None

        business_info = {
            'name': settings.business_name if settings else 'Business Name',
            'address': settings.address if settings else '',
            'phone': settings.phone if settings else '',
            'email': settings.email if settings else '',
            'currency': settings.currency_symbol if settings else '$',
            'vat_number': settings.vat_number if settings else '',
            'registration_number': settings.registration_number if settings else '',
            'logo_url': settings.logo.url if settings and settings.logo else "",
        }
        sale_info = {
            'id': sale.id,
            'created_at': sale.created_at.strftime('%Y-%m-%d %H:%M'),
            'customer_name': sale.customer.name if sale.customer else 'Guest',
            'subtotal': float(sale.subtotal),
            'vat_amount': float(sale.vat_amount),
            'total': float(sale.total_amount),
            'discount': float(sale.discount),
            'items': [
                {
                    'name_en': sold.product.name_en,
                    'name_ar': sold.product.name_ar,
                    'qty': float(sold.quantity),
                    'total': float(sold.line_total),
                }
                for sold in sale.items.all().select_related('product')
            ],
        }
        # Fall back to the sum of line totals when no subtotal was supplied.
        if sale_info['subtotal'] <= 0 and sale_info['total'] > 0:
            sale_info['subtotal'] = sum(entry['total'] for entry in sale_info['items'])

        return JsonResponse({
            'success': True,
            'sale_id': sale.id,
            'business': business_info,
            'sale': sale_info,
        })
    except Exception as e:
        # Boundary handler: keep the JSON contract but record the traceback.
        logger.exception("Sale Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@login_required
def update_sale_api(request, pk):
    """Replace the items and header fields of sale ``pk`` from a JSON POST body.

    Inside one transaction: returns the old items' quantities to stock, deletes
    the old items, updates the sale fields, creates the new items while
    deducting their quantities from stock, then calls ``sale.update_balance()``.

    Returns ``{'success': True}``; any exception (including a missing sale) is
    logged and returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid request'})
    try:
        payload = json.loads(request.body)
        sale = get_object_or_404(Sale, pk=pk)
        with transaction.atomic():
            # 1. Give the quantities of the current items back to stock.
            for old_item in sale.items.all():
                Product.objects.filter(pk=old_item.product_id).update(
                    stock_quantity=F('stock_quantity') + old_item.quantity
                )

            # 2. Drop the current items.
            sale.items.all().delete()

            # 3. Apply the header fields; some keep their value when omitted.
            sale.customer_id = payload.get('customer_id') or None
            sale.total_amount = payload.get('total_amount', 0)
            sale.discount = payload.get('discount', 0)
            sale.notes = payload.get('notes', '')
            sale.invoice_number = payload.get('invoice_number', sale.invoice_number)
            sale.payment_type = payload.get('payment_type', sale.payment_type)
            sale.paid_amount = payload.get('paid_amount', sale.paid_amount)
            new_due_date = payload.get('due_date')
            if new_due_date:
                sale.due_date = new_due_date
            sale.save()

            # 4. Create the replacement items and take them out of stock.
            for entry in payload.get('items', []):
                quantity = decimal.Decimal(str(entry['quantity']))
                unit_price = decimal.Decimal(str(entry['price']))
                product_pk = entry['id']
                SaleItem.objects.create(
                    sale=sale,
                    product_id=product_pk,
                    quantity=quantity,
                    unit_price=unit_price,
                    line_total=quantity * unit_price,
                )
                Product.objects.filter(pk=product_pk).update(
                    stock_quantity=F('stock_quantity') - quantity
                )

            # 5. Call the sale's own balance update once the new items are in place.
            sale.update_balance()
        return JsonResponse({'success': True})
    except Exception as e:
        logger.error(f"Update Sale Error: {e}")
        return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@login_required
def create_purchase_api(request):
    """Create a purchase with its items and payment from a JSON POST body.

    Inside one transaction: creates the ``Purchase``, one ``PurchaseItem`` per
    entry of ``items`` (``id``, ``quantity``, ``price``), increments each
    product's ``stock_quantity``, records a ``PurchasePayment`` when
    ``paid_amount`` is positive and calls ``purchase.update_balance()``.

    Status: ``unpaid`` for credit or when nothing was paid on a positive total,
    ``partial`` for a partial payment, otherwise ``paid``.

    Item quantities and prices are converted to ``Decimal`` so line totals and
    stock changes use the same arithmetic as the purchase totals.

    Returns ``{'success': True, 'purchase_id': ...}``; failures are logged and
    returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid request'})
    try:
        data = json.loads(request.body)
        with transaction.atomic():
            payment_type = data.get('payment_type', 'cash')
            paid_amount = decimal.Decimal(str(data.get('paid_amount', 0)))
            total_amount = decimal.Decimal(str(data.get('total_amount', 0)))

            status = 'paid'
            if payment_type == 'credit':
                status = 'unpaid'
            elif payment_type == 'partial' or (paid_amount < total_amount and paid_amount > 0):
                status = 'partial'
            elif paid_amount == 0 and total_amount > 0:
                status = 'unpaid'

            purchase = Purchase.objects.create(
                supplier_id=data.get('supplier_id') or None,
                invoice_number=data.get('invoice_number', ''),
                total_amount=total_amount,
                paid_amount=paid_amount,
                payment_type=payment_type,
                due_date=data.get('due_date') or None,
                notes=data.get('notes', ''),
                created_by=request.user,
                status=status,
            )

            for item in data.get('items', []):
                # Going through str() keeps JSON floats such as 0.1 exact.
                qty = decimal.Decimal(str(item['quantity']))
                cost = decimal.Decimal(str(item['price']))
                PurchaseItem.objects.create(
                    purchase=purchase,
                    product_id=item['id'],
                    quantity=qty,
                    cost_price=cost,
                    line_total=qty * cost,
                )
                # Increase stock atomically at the database level.
                Product.objects.filter(pk=item['id']).update(
                    stock_quantity=F('stock_quantity') + qty
                )

            # Payment
            if purchase.paid_amount > 0:
                PurchasePayment.objects.create(
                    purchase=purchase,
                    amount=purchase.paid_amount,
                    payment_method_id=data.get('payment_method_id'),
                    created_by=request.user,
                )
            purchase.update_balance()
        return JsonResponse({'success': True, 'purchase_id': purchase.id})
    except Exception as e:
        # Boundary handler: keep the JSON contract but record the traceback.
        logger.exception("Error creating purchase: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@login_required
def update_purchase_api(request, pk):
    """Placeholder API: updates nothing and always answers ``{'success': True}``."""
    payload = {'success': True}
    return JsonResponse(payload)
@csrf_exempt
@login_required
def create_sale_return_api(request):
    """Create a sale return and put the returned quantities back into stock.

    Expects a JSON POST body with ``customer_id`` and ``items`` (``id``,
    ``quantity``, ``price``). Inside one transaction: creates the
    ``SaleReturn``, one ``SaleReturnItem`` per entry, increments each product's
    ``stock_quantity`` and stores the sum of the saved line totals as
    ``total_amount``.

    Quantities and prices are converted to ``Decimal``. Returns
    ``{'success': True}``; non-POST requests get ``{'success': False}`` and
    failures are logged and returned with an ``error`` message.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})
    try:
        data = json.loads(request.body)
        with transaction.atomic():
            sale_return = SaleReturn.objects.create(
                customer_id=data.get('customer_id'),
                created_by=request.user,
                total_amount=0,
            )
            for item in data.get('items', []):
                # Going through str() keeps JSON floats such as 0.1 exact.
                qty = decimal.Decimal(str(item['quantity']))
                price = decimal.Decimal(str(item['price']))
                SaleReturnItem.objects.create(
                    sale_return=sale_return,
                    product_id=item['id'],
                    quantity=qty,
                    unit_price=price,
                    line_total=qty * price,
                )
                Product.objects.filter(pk=item['id']).update(
                    stock_quantity=F('stock_quantity') + qty
                )
            # Sum the line totals as read back from the database.
            sale_return.total_amount = sum(i.line_total for i in sale_return.items.all())
            sale_return.save()
        return JsonResponse({'success': True})
    except Exception as e:
        # Previously swallowed without a trace; log like the other sale APIs.
        logger.exception("Sale Return Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@login_required
def create_purchase_return_api(request):
    """Create a purchase return and remove the returned quantities from stock.

    Expects a JSON POST body with ``supplier_id`` and ``items`` (``id``,
    ``quantity``, ``price``). Inside one transaction: creates the
    ``PurchaseReturn``, one ``PurchaseReturnItem`` per entry, decrements each
    product's ``stock_quantity`` and stores the sum of the saved line totals as
    ``total_amount``.

    Quantities and prices are converted to ``Decimal``. Returns
    ``{'success': True}``; non-POST requests get ``{'success': False}`` and
    failures are logged and returned with an ``error`` message.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})
    try:
        data = json.loads(request.body)
        with transaction.atomic():
            pr = PurchaseReturn.objects.create(
                supplier_id=data.get('supplier_id'),
                created_by=request.user,
                total_amount=0,
            )
            for item in data.get('items', []):
                # Going through str() keeps JSON floats such as 0.1 exact.
                qty = decimal.Decimal(str(item['quantity']))
                cost = decimal.Decimal(str(item['price']))
                PurchaseReturnItem.objects.create(
                    purchase_return=pr,
                    product_id=item['id'],
                    quantity=qty,
                    cost_price=cost,
                    line_total=qty * cost,
                )
                Product.objects.filter(pk=item['id']).update(
                    stock_quantity=F('stock_quantity') - qty
                )
            # Sum the line totals as read back from the database.
            pr.total_amount = sum(i.line_total for i in pr.items.all())
            pr.save()
        return JsonResponse({'success': True})
    except Exception as e:
        # Previously swallowed without a trace; log like the other purchase APIs.
        logger.exception("Purchase Return Error: %s", e)
        return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@login_required
def add_customer_ajax(request):
    """Create a customer from a JSON POST body (``name``, optional ``phone``).

    Returns ``{'success': True}``; non-POST requests get ``{'success': False}``
    and any exception is returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})
    try:
        payload = json.loads(request.body)
        Customer.objects.create(
            name=payload.get('name'),
            phone=payload.get('phone', ''),
        )
    except Exception as e:
        return JsonResponse({'success': False, 'error': str(e)})
    return JsonResponse({'success': True})
@login_required
def search_customers_api(request):
    """Return up to 10 customers whose name or phone contains ``q`` (case-insensitive).

    Response: ``{'results': [{'id', 'name', 'phone'}, ...]}``. ``q`` defaults to ``''``.
    """
    term = request.GET.get('q', '')
    name_or_phone = Q(name__icontains=term) | Q(phone__icontains=term)
    matches = Customer.objects.filter(name_or_phone).values('id', 'name', 'phone')[:10]
    return JsonResponse({'results': list(matches)})
@csrf_exempt
@login_required
def add_supplier_ajax(request):
    """Create a supplier from a JSON POST body (``name``, optional ``phone``).

    Returns ``{'success': True}``; non-POST requests get ``{'success': False}``
    and any exception is returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})
    try:
        payload = json.loads(request.body)
        Supplier.objects.create(
            name=payload.get('name'),
            phone=payload.get('phone', ''),
        )
    except Exception as e:
        return JsonResponse({'success': False, 'error': str(e)})
    return JsonResponse({'success': True})
@csrf_exempt
@login_required
def add_category_ajax(request):
    """Create a category from a JSON POST body (``name_en``, ``name_ar``).

    The slug is generated as ``cat-<current unix timestamp in whole seconds>``.
    Returns ``{'success': True}``; non-POST requests get ``{'success': False}``
    and any exception is returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})
    try:
        payload = json.loads(request.body)
        Category.objects.create(
            name_en=payload.get('name_en'),
            name_ar=payload.get('name_ar'),
            slug=f"cat-{int(timezone.now().timestamp())}",
        )
    except Exception as e:
        return JsonResponse({'success': False, 'error': str(e)})
    return JsonResponse({'success': True})
@csrf_exempt
@login_required
def add_unit_ajax(request):
    """Create a unit from a JSON POST body (``name_en``, ``name_ar``, ``short_name``).

    Returns ``{'success': True}``; non-POST requests get ``{'success': False}``
    and any exception is returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False})
    try:
        payload = json.loads(request.body)
        Unit.objects.create(
            name_en=payload.get('name_en'),
            name_ar=payload.get('name_ar'),
            short_name=payload.get('short_name'),
        )
    except Exception as e:
        return JsonResponse({'success': False, 'error': str(e)})
    return JsonResponse({'success': True})
@csrf_exempt
@login_required
def add_payment_method_ajax(request):
    """Create a payment method from a JSON POST body and echo it back.

    Reads ``name_en``, ``name_ar`` and ``is_active`` (default ``True``).
    Returns ``{'success': True, 'id', 'name_en', 'name_ar'}``; any exception is
    returned as ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Invalid method'})
    try:
        payload = json.loads(request.body)
        method = PaymentMethod.objects.create(
            name_en=payload.get('name_en'),
            name_ar=payload.get('name_ar'),
            is_active=payload.get('is_active', True),
        )
        created = {
            'success': True,
            'id': method.id,
            'name_en': method.name_en,
            'name_ar': method.name_ar,
        }
        return JsonResponse(created)
    except Exception as e:
        return JsonResponse({'success': False, 'error': str(e)})
@login_required
def get_customer_loyalty_api(request, pk):
    """Placeholder API: always reports ``{'points': 0}`` regardless of ``pk``."""
    payload = {'points': 0}
    return JsonResponse(payload)
@csrf_exempt
@login_required
def hold_sale_api(request):
    """Park the current cart as a ``HeldSale`` owned by the requesting user.

    Expects a JSON POST body with ``items``, optional ``customer_id`` and
    ``notes``. The cart (items, customer id and resolved customer name) is
    stored as a JSON string in ``cart_data``. An unknown or malformed customer
    id simply leaves the customer name empty.

    Returns ``{'success': True}``; failures are logged and returned as
    ``{'success': False, 'error': ...}``.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'error': 'Only POST allowed'})
    try:
        payload = json.loads(request.body)
        customer_id = payload.get('customer_id')

        customer_name = ""
        if customer_id:
            try:
                customer_name = Customer.objects.get(id=customer_id).name
            except (Customer.DoesNotExist, ValueError):
                pass

        # Everything needed to restore the cart lives in one JSON blob.
        cart_blob = json.dumps({
            'items': payload.get('items', []),
            'customer_id': customer_id,
            'customer_name': customer_name,
        })
        HeldSale.objects.create(
            created_by=request.user,
            cart_data=cart_blob,
            customer_name=customer_name,
            note=payload.get('notes', ""),
        )
        return JsonResponse({'success': True})
    except Exception as e:
        logger.error(f"Error holding sale: {str(e)}")
        return JsonResponse({'success': False, 'error': str(e)})
@login_required
def get_held_sales_api(request):
    """List the requesting user's held sales, newest first.

    Response: ``{'success': True, 'held_sales': [...]}`` where each entry has
    ``id``, ``customer_name`` (a translated "Walk-in Customer" when empty),
    ``note``, ``created_at`` (``YYYY-MM-DD HH:MM``) and ``item_count``.

    ``item_count`` understands both stored cart formats: the current dict with
    an ``items`` list and the legacy bare list of items. Unreadable cart data
    counts as 0 items.
    """
    sales_list = []
    held_sales = HeldSale.objects.filter(created_by=request.user).order_by('-created_at')
    for held in held_sales:
        try:
            cart_info = json.loads(held.cart_data)
            if isinstance(cart_info, list):
                # Legacy format: the cart is the item list itself.
                items = cart_info
            else:
                items = cart_info.get('items', [])
            item_count = len(items)
        except (TypeError, ValueError, AttributeError):
            # Malformed JSON, non-string data, or an unexpected JSON shape.
            item_count = 0
        sales_list.append({
            'id': held.id,
            'customer_name': held.customer_name or _("Walk-in Customer"),
            'note': held.note,
            'created_at': held.created_at.strftime('%Y-%m-%d %H:%M'),
            'item_count': item_count,
        })
    return JsonResponse({'success': True, 'held_sales': sales_list})
@login_required
def recall_held_sale_api(request, pk):
    """Return the cart of the user's held sale ``pk`` and delete the held sale.

    Handles both stored formats: a bare list of items (legacy) and a dict with
    ``items``, ``customer_id`` and ``customer_name``. A held sale that does not
    belong to the requesting user yields a 404. Failures are logged and
    returned as ``{'success': False, 'error': ...}``.
    """
    held = get_object_or_404(HeldSale, pk=pk, created_by=request.user)
    try:
        cart = json.loads(held.cart_data)
        legacy_format = isinstance(cart, list)
        payload = {
            'success': True,
            'items': cart if legacy_format else cart.get('items', []),
            'customer_id': "" if legacy_format else cart.get('customer_id', ""),
            'customer_name': (
                held.customer_name
                if legacy_format
                else cart.get('customer_name', held.customer_name)
            ),
        }
        # A recalled sale is consumed: remove it once the payload is built.
        held.delete()
        return JsonResponse(payload)
    except Exception as e:
        logger.error(f"Error recalling sale: {str(e)}")
        return JsonResponse({'success': False, 'error': str(e)})
@login_required
def delete_held_sale_api(request, pk):
    """Delete one of the logged-in user's held sales and confirm via JSON.

    The lookup is restricted to held sales created by ``request.user``;
    ``get_object_or_404`` answers with a 404 when no such row exists.

    Returns:
        JsonResponse: ``{'success': True}`` once the row is deleted.
    """
    get_object_or_404(HeldSale, pk=pk, created_by=request.user).delete()
    return JsonResponse({'success': True})
@login_required
def backup_database(request):
    """Dump the default database with ``mysqldump`` and send it as a download.

    Only superusers may run a backup; other users get an error message and
    are redirected to the ``settings`` page. The dump is written to
    ``<BASE_DIR>/tmp/backup_<db>_<timestamp>.sql`` and streamed back as an
    attachment.

    The command is passed to ``subprocess.run`` as an argument list (no
    shell), so credentials containing quotes, spaces or shell metacharacters
    cannot break or alter the command line.

    Returns:
        FileResponse with the SQL dump on success, otherwise a redirect to
        ``settings`` with an error message.
    """
    if not request.user.is_superuser:
        messages.error(request, _("You are not authorized to perform this action."))
        return redirect('settings')

    db_settings = django_settings.DATABASES['default']
    db_name = db_settings['NAME']
    db_host = str(db_settings.get('HOST') or '')
    db_port = str(db_settings.get('PORT') or '')
    db_user = db_settings.get('USER')
    db_password = db_settings.get('PASSWORD')

    filename = f"backup_{db_name}_{timezone.now().strftime('%Y%m%d_%H%M%S')}.sql"
    file_path = os.path.join(django_settings.BASE_DIR, 'tmp', filename)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # Build the argument list; empty settings are omitted so the client falls
    # back to its own defaults instead of receiving an empty option value.
    command = ['mysqldump']
    if db_host.startswith('/'):
        # Django treats a HOST starting with "/" as a Unix socket path.
        command.append(f'--socket={db_host}')
    elif db_host:
        command.append(f'--host={db_host}')
    if db_port:
        command.append(f'--port={db_port}')
    if db_user:
        command.append(f'--user={db_user}')
    if db_password:
        command.append(f'--password={db_password}')
    command.append(db_name)

    try:
        with open(file_path, 'wb') as dump_file:
            subprocess.run(command, stdout=dump_file, stderr=subprocess.PIPE, check=True)
        # FileResponse takes ownership of the handle and closes it after streaming.
        # NOTE(review): the dump file itself stays in tmp/ after a successful
        # download, as before — confirm whether it should be purged.
        response = FileResponse(open(file_path, 'rb'), content_type='application/sql')
        response['Content-Disposition'] = f'attachment; filename="{filename}"'
        return response
    except subprocess.CalledProcessError as exc:
        # str(exc) embeds the whole command line, password included, so only
        # the exit status is shown to the user; stderr goes to the log.
        stderr_text = (exc.stderr or b'').decode(errors='replace').strip()
        logger.error("mysqldump exited with status %s: %s", exc.returncode, stderr_text)
        messages.error(request, f"Backup failed: mysqldump exited with status {exc.returncode}")
    except Exception as e:
        logger.error("Backup failed: %s", e)
        messages.error(request, f"Backup failed: {str(e)}")

    # Only reached on failure: do not leave a partial dump behind.
    try:
        if os.path.exists(file_path):
            os.remove(file_path)
    except OSError:
        logger.warning("Could not remove incomplete backup file %s", file_path)
    return redirect('settings')
@login_required
def restore_database(request):
    """Restore the default database from an uploaded ``.sql`` file via ``mysql``.

    Only superusers may restore; other users get an error message. The view
    acts on POST requests that carry a ``backup_file`` upload whose name ends
    in ``.sql``; every path ends with a redirect to the ``settings`` page,
    with a success or error message where applicable.

    The upload is copied to a uniquely named temporary file under
    ``<BASE_DIR>/tmp`` (so concurrent restores cannot overwrite each other),
    fed to the ``mysql`` client on stdin, and always removed afterwards. The
    client is started from an argument list (no shell), so credentials
    containing quotes, spaces or shell metacharacters are passed verbatim.

    Returns:
        HttpResponseRedirect to ``settings``.
    """
    import tempfile  # local import: only this view needs it

    if not request.user.is_superuser:
        messages.error(request, _("You are not authorized to perform this action."))
        return redirect('settings')

    if request.method == 'POST' and request.FILES.get('backup_file'):
        backup_file = request.FILES['backup_file']
        # Security check: Ensure it's a sql file
        if not backup_file.name.endswith('.sql'):
            messages.error(request, _("Invalid file format. Please upload a .sql file."))
            return redirect('settings')

        db_settings = django_settings.DATABASES['default']
        db_name = db_settings['NAME']
        db_host = str(db_settings.get('HOST') or '')
        db_port = str(db_settings.get('PORT') or '')
        db_user = db_settings.get('USER')
        db_password = db_settings.get('PASSWORD')

        # Build the argument list; empty settings are omitted so the client
        # falls back to its own defaults.
        command = ['mysql']
        if db_host.startswith('/'):
            # Django treats a HOST starting with "/" as a Unix socket path.
            command.append(f'--socket={db_host}')
        elif db_host:
            command.append(f'--host={db_host}')
        if db_port:
            command.append(f'--port={db_port}')
        if db_user:
            command.append(f'--user={db_user}')
        if db_password:
            command.append(f'--password={db_password}')
        command.append(db_name)

        tmp_dir = os.path.join(django_settings.BASE_DIR, 'tmp')
        os.makedirs(tmp_dir, exist_ok=True)
        temp_path = None
        try:
            # Save uploaded file temporarily under a unique name.
            with tempfile.NamedTemporaryFile(
                dir=tmp_dir, prefix='restore_', suffix='.sql', delete=False
            ) as destination:
                temp_path = destination.name
                for chunk in backup_file.chunks():
                    destination.write(chunk)
            # Use mysql to restore, reading the SQL script from stdin.
            with open(temp_path, 'rb') as sql_file:
                subprocess.run(command, stdin=sql_file, stderr=subprocess.PIPE, check=True)
            messages.success(request, _("Database restored successfully!"))
        except subprocess.CalledProcessError as exc:
            # str(exc) embeds the whole command line, password included, so
            # only the exit status is shown to the user; stderr goes to the log.
            stderr_text = (exc.stderr or b'').decode(errors='replace').strip()
            logger.error("mysql restore exited with status %s: %s", exc.returncode, stderr_text)
            messages.error(request, f"Restore failed: mysql exited with status {exc.returncode}")
        except Exception as e:
            logger.error("Restore failed: %s", e)
            messages.error(request, f"Restore failed: {str(e)}")
        finally:
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
    return redirect('settings')