Auto commit: 2026-04-17T04:26:07.442Z

This commit is contained in:
Flatlogic Bot 2026-04-17 04:26:07 +00:00
parent 22e473eb14
commit 0789752dc6
15 changed files with 368 additions and 35 deletions

View File

@ -152,4 +152,9 @@ LOGIN_URL = 'login'
LOGIN_REDIRECT_URL = 'home'
LOGOUT_REDIRECT_URL = 'home'
JWT_ACCESS_TOKEN_MINUTES = int(os.getenv('JWT_ACCESS_TOKEN_MINUTES', '30'))
JWT_REFRESH_TOKEN_DAYS = int(os.getenv('JWT_REFRESH_TOKEN_DAYS', '30'))
JWT_ISSUER = os.getenv('JWT_ISSUER', 'momoledger')
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'momoledger-mobile')
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

Binary file not shown.

View File

@ -77,8 +77,9 @@ class TransactionForm(forms.Form):
transaction_type = forms.ChoiceField(choices=Transaction.TYPE_CHOICES, widget=forms.Select(attrs={'class': SELECT_CLASS}))
notes = forms.CharField(required=False, widget=forms.Textarea(attrs={'class': 'form-control momo-input', 'rows': 3, 'placeholder': 'Optional note about the transaction'}))
def __init__(self, *args, business=None, **kwargs):
def __init__(self, *args, business=None, instance=None, **kwargs):
self.business = business
self.instance = instance
super().__init__(*args, **kwargs)
def clean(self):
@ -88,6 +89,9 @@ class TransactionForm(forms.Form):
if not self.business or not transaction_type or amount is None:
return cleaned_data
if self.instance is not None:
return cleaned_data
ecash_delta, physical_delta, _ = Transaction.calculate_effect(transaction_type, amount)
if self.business.current_ecash + ecash_delta < 0:
self.add_error('amount', 'Not enough e-cash for this transaction.')
@ -98,6 +102,13 @@ class TransactionForm(forms.Form):
def save(self, user):
if not self.business:
raise ValidationError('Business profile is required.')
if self.instance is not None:
return self.instance.update_logged_transaction(
client_name=self.cleaned_data['client_name'],
amount=self.cleaned_data['amount'],
transaction_type=self.cleaned_data['transaction_type'],
notes=self.cleaned_data['notes'],
)
return Transaction.create_logged_transaction(
business=self.business,
user=user,

112
core/jwt_auth.py Normal file
View File

@ -0,0 +1,112 @@
from datetime import timedelta
import jwt
from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone
from .models import BusinessProfile
class JWTAuthError(Exception):
def __init__(self, message: str, *, code: str = 'token_invalid', status_code: int = 401):
super().__init__(message)
self.message = message
self.code = code
self.status_code = status_code
def _timestamp(value):
return int(value.timestamp())
def _base_payload(user: User, profile: BusinessProfile, *, token_type: str, expires_delta):
now = timezone.now()
return {
'sub': str(user.pk),
'username': user.username,
'type': token_type,
'token_version': profile.mobile_token_version,
'iat': _timestamp(now),
'nbf': _timestamp(now),
'exp': _timestamp(now + expires_delta),
'iss': settings.JWT_ISSUER,
'aud': settings.JWT_AUDIENCE,
}
def issue_token_pair(user: User):
profile, _ = BusinessProfile.objects.get_or_create(user=user)
access_lifetime = timedelta(minutes=settings.JWT_ACCESS_TOKEN_MINUTES)
refresh_lifetime = timedelta(days=settings.JWT_REFRESH_TOKEN_DAYS)
access_payload = _base_payload(user, profile, token_type='access', expires_delta=access_lifetime)
refresh_payload = _base_payload(user, profile, token_type='refresh', expires_delta=refresh_lifetime)
access_token = jwt.encode(access_payload, settings.SECRET_KEY, algorithm='HS256')
refresh_token = jwt.encode(refresh_payload, settings.SECRET_KEY, algorithm='HS256')
return {
'access': access_token,
'refresh': refresh_token,
'token_type': 'Bearer',
'access_expires_in': int(access_lifetime.total_seconds()),
'refresh_expires_in': int(refresh_lifetime.total_seconds()),
}
def decode_token(token: str, *, expected_type: str):
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=['HS256'],
audience=settings.JWT_AUDIENCE,
issuer=settings.JWT_ISSUER,
options={'require': ['exp', 'iat', 'nbf', 'sub', 'type', 'token_version']},
)
except jwt.ExpiredSignatureError as exc:
raise JWTAuthError('Token expired. Please log in again.', code='token_expired') from exc
except jwt.InvalidTokenError as exc:
raise JWTAuthError('Invalid token.', code='token_invalid') from exc
token_type = payload.get('type')
if token_type != expected_type:
raise JWTAuthError(f'Expected a {expected_type} token.', code='token_type_invalid')
return payload
def get_user_from_payload(payload):
user = User.objects.filter(pk=payload.get('sub'), is_active=True).first()
if user is None:
raise JWTAuthError('User account not found.', code='user_not_found')
profile, _ = BusinessProfile.objects.get_or_create(user=user)
if profile.mobile_token_version != payload.get('token_version'):
raise JWTAuthError('Token is no longer valid. Please log in again.', code='token_revoked')
return user, profile
def authenticate_authorization_header(header_value: str):
if not header_value:
raise JWTAuthError('Authentication required.', code='auth_required')
scheme, _, token = header_value.partition(' ')
if scheme.lower() != 'bearer' or not token.strip():
raise JWTAuthError('Use Authorization: Bearer <token>.', code='auth_header_invalid')
payload = decode_token(token.strip(), expected_type='access')
return get_user_from_payload(payload)
def authenticate_refresh_token(refresh_token: str):
if not refresh_token:
raise JWTAuthError('Refresh token is required.', code='refresh_required')
payload = decode_token(refresh_token.strip(), expected_type='refresh')
return get_user_from_payload(payload)
def revoke_user_tokens(user: User):
profile, _ = BusinessProfile.objects.get_or_create(user=user)
profile.mobile_token_version += 1
profile.save(update_fields=['mobile_token_version', 'updated_at'])
return profile

View File

@ -0,0 +1,18 @@
# Generated by Django 5.2.7 on 2026-04-17 02:58
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='businessprofile',
name='mobile_token_version',
field=models.PositiveIntegerField(default=1),
),
]

View File

@ -13,6 +13,7 @@ class BusinessProfile(models.Model):
opening_physical_cash = models.DecimalField(max_digits=12, decimal_places=2, default=Decimal('0.00'))
current_ecash = models.DecimalField(max_digits=12, decimal_places=2, default=Decimal('0.00'))
current_physical_cash = models.DecimalField(max_digits=12, decimal_places=2, default=Decimal('0.00'))
mobile_token_version = models.PositiveIntegerField(default=1)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
@ -168,6 +169,11 @@ class Transaction(models.Model):
ecash_after = cls._round(ecash_before + ecash_delta)
physical_after = cls._round(physical_before + physical_delta)
if ecash_after < 0:
raise ValidationError('This change would make e-cash go below zero in your transaction history.')
if physical_after < 0:
raise ValidationError('This change would make physical cash go below zero in your transaction history.')
cls.objects.filter(pk=entry.pk).update(
service_charge=fee,
ecash_before=ecash_before,
@ -184,6 +190,21 @@ class Transaction(models.Model):
business.save(update_fields=['current_ecash', 'current_physical_cash', 'updated_at'])
return business
@transaction.atomic
def update_logged_transaction(self, *, client_name: str, amount: Decimal, transaction_type: str, notes: str = ''):
business = BusinessProfile.objects.select_for_update().get(pk=self.business_id)
self.client_name = client_name
self.amount = self.__class__._round(amount)
self.transaction_type = transaction_type
self.notes = notes
self.save(update_fields=['client_name', 'amount', 'transaction_type', 'notes'])
business = self.__class__.rebalance_business_ledger(business)
refreshed_entry = self.__class__.objects.select_related('created_by').get(pk=self.pk)
return {
'transaction': refreshed_entry,
'business': business,
}
@transaction.atomic
def delete_logged_transaction(self):
business = BusinessProfile.objects.select_for_update().get(pk=self.business_id)

View File

@ -5,6 +5,7 @@ from .views import (
api_login_view,
api_logout_view,
api_profile_view,
api_token_refresh_view,
api_transaction_detail_view,
api_transactions_view,
home,
@ -23,6 +24,8 @@ urlpatterns = [
path('', home, name='home'),
path('api/health/', api_health_view, name='api_health'),
path('api/login/', api_login_view, name='api_login'),
path('api/token/', api_login_view, name='api_token_login'),
path('api/token/refresh/', api_token_refresh_view, name='api_token_refresh'),
path('api/logout/', api_logout_view, name='api_logout'),
path('api/profile/', api_profile_view, name='api_profile'),
path('api/transactions/', api_transactions_view, name='api_transactions'),

View File

@ -1,5 +1,6 @@
import json
from datetime import datetime, time
from functools import wraps
from io import BytesIO
from django.contrib import messages
@ -14,15 +15,46 @@ from django.views.decorators.http import require_GET, require_POST, require_http
from django.utils import timezone
from .forms import BusinessProfileForm, LoginForm, ReportFilterForm, SignUpForm, TransactionForm
from .jwt_auth import (
JWTAuthError,
authenticate_authorization_header,
authenticate_refresh_token,
issue_token_pair,
revoke_user_tokens,
)
from .models import BusinessProfile, Transaction
def get_api_authenticated_user(request):
if request.user.is_authenticated:
return request.user
authorization = (request.META.get('HTTP_AUTHORIZATION') or '').strip()
if not authorization:
return None
user, _ = authenticate_authorization_header(authorization)
request.user = user
return user
def api_login_required(view_func):
@wraps(view_func)
def wrapped(request, *args, **kwargs):
if not request.user.is_authenticated:
try:
user = get_api_authenticated_user(request)
except JWTAuthError as exc:
return JsonResponse({
'ok': False,
'error': exc.message,
'code': exc.code,
}, status=exc.status_code)
if user is None:
return JsonResponse({
'ok': False,
'error': 'Authentication required.',
'code': 'auth_required',
}, status=401)
return view_func(request, *args, **kwargs)
@ -367,25 +399,62 @@ def api_health_view(request):
'app': 'MoMoLedger API',
'message': 'Android backend starter endpoints are available.',
'server_time': timezone.now().isoformat(),
'authenticated': request.user.is_authenticated,
'authenticated': bool(request.user.is_authenticated or (request.META.get('HTTP_AUTHORIZATION') or '').strip()),
})
@csrf_exempt
@require_POST
def api_login_view(request):
if request.user.is_authenticated:
profile = get_profile(request.user)
return JsonResponse({
'ok': True,
'message': 'Already logged in.',
'user': {
'username': request.user.username,
'email': request.user.email,
'business_name': profile.business_name,
},
})
user = request.user if request.user.is_authenticated else None
if user is None:
payload = parse_api_payload(request)
if payload is None:
return JsonResponse({
'ok': False,
'error': 'Invalid JSON body.',
}, status=400)
username = (payload.get('username') or '').strip()
password = payload.get('password') or ''
if not username or not password:
return JsonResponse({
'ok': False,
'error': 'Username and password are required.',
}, status=400)
user = authenticate(request, username=username, password=password)
if user is None:
return JsonResponse({
'ok': False,
'error': 'Invalid username or password.',
}, status=401)
login(request, user)
message = 'Login successful.'
else:
message = 'Already logged in.'
profile = get_profile(user)
tokens = issue_token_pair(user)
return JsonResponse({
'ok': True,
'message': message,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'business_name': profile.business_name,
},
'tokens': tokens,
})
@csrf_exempt
@require_POST
def api_token_refresh_view(request):
payload = parse_api_payload(request)
if payload is None:
return JsonResponse({
@ -393,49 +462,77 @@ def api_login_view(request):
'error': 'Invalid JSON body.',
}, status=400)
username = (payload.get('username') or '').strip()
password = payload.get('password') or ''
if not username or not password:
refresh_token = (payload.get('refresh_token') or payload.get('refresh') or '').strip()
try:
user, profile = authenticate_refresh_token(refresh_token)
except JWTAuthError as exc:
return JsonResponse({
'ok': False,
'error': 'Username and password are required.',
}, status=400)
'error': exc.message,
'code': exc.code,
}, status=exc.status_code)
user = authenticate(request, username=username, password=password)
if user is None:
return JsonResponse({
'ok': False,
'error': 'Invalid username or password.',
}, status=401)
login(request, user)
profile = get_profile(user)
return JsonResponse({
'ok': True,
'message': 'Login successful.',
'message': 'Token refreshed successfully.',
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'business_name': profile.business_name,
},
'tokens': issue_token_pair(user),
})
@csrf_exempt
@require_POST
def api_logout_view(request):
if not request.user.is_authenticated:
payload = parse_api_payload(request)
if payload is None:
return JsonResponse({
'ok': False,
'error': 'Invalid JSON body.',
}, status=400)
user = request.user if request.user.is_authenticated else None
refresh_token = (payload.get('refresh_token') or payload.get('refresh') or '').strip()
if user is None and refresh_token:
try:
user, _ = authenticate_refresh_token(refresh_token)
except JWTAuthError as exc:
return JsonResponse({
'ok': False,
'error': exc.message,
'code': exc.code,
}, status=exc.status_code)
if user is None:
authorization = (request.META.get('HTTP_AUTHORIZATION') or '').strip()
if authorization:
try:
user, _ = authenticate_authorization_header(authorization)
except JWTAuthError as exc:
return JsonResponse({
'ok': False,
'error': exc.message,
'code': exc.code,
}, status=exc.status_code)
if user is None:
return JsonResponse({
'ok': True,
'message': 'No active session.',
})
logout(request)
revoke_user_tokens(user)
if request.user.is_authenticated:
logout(request)
return JsonResponse({
'ok': True,
'message': 'Logout successful.',
'message': 'Logout successful. Mobile tokens revoked.',
})
@ -522,12 +619,77 @@ def api_transactions_view(request):
@csrf_exempt
@api_login_required
@require_http_methods(['DELETE'])
@require_http_methods(['GET', 'PUT', 'PATCH', 'DELETE'])
def api_transaction_detail_view(request, transaction_id):
profile = get_profile(request.user)
entry = get_object_or_404(profile.transactions.select_related('created_by'), id=transaction_id)
if request.method == 'GET':
return JsonResponse({
'ok': True,
'transaction': serialize_transaction(entry),
'balances': {
'current_ecash': str(profile.current_ecash),
'current_physical_cash': str(profile.current_physical_cash),
'total_cash': str(profile.total_cash),
},
'summary': build_transaction_summary(profile),
})
if request.method in {'PUT', 'PATCH'}:
payload = parse_api_payload(request)
if payload is None:
return JsonResponse({
'ok': False,
'error': 'Invalid JSON body.',
}, status=400)
merged_payload = {
'client_name': entry.client_name,
'amount': str(entry.amount),
'transaction_type': entry.transaction_type,
'notes': entry.notes,
}
merged_payload.update(payload)
form = TransactionForm(merged_payload, business=profile, instance=entry)
if not form.is_valid():
return JsonResponse({
'ok': False,
'error': 'Validation failed.',
'errors': serialize_form_errors(form),
}, status=400)
try:
update_result = form.save(request.user)
except ValidationError as exc:
return JsonResponse({
'ok': False,
'error': exc.messages[0] if exc.messages else 'Could not update transaction.',
}, status=400)
profile = update_result['business']
entry = update_result['transaction']
return JsonResponse({
'ok': True,
'message': 'Transaction updated successfully.',
'transaction': serialize_transaction(entry),
'balances': {
'current_ecash': str(profile.current_ecash),
'current_physical_cash': str(profile.current_physical_cash),
'total_cash': str(profile.total_cash),
},
'summary': build_transaction_summary(profile),
})
deleted_transaction = serialize_transaction(entry)
delete_result = entry.delete_logged_transaction()
try:
delete_result = entry.delete_logged_transaction()
except ValidationError as exc:
return JsonResponse({
'ok': False,
'error': exc.messages[0] if exc.messages else 'Could not delete transaction.',
}, status=400)
profile = delete_result['business']
return JsonResponse({
'ok': True,

View File

@ -2,3 +2,4 @@ Django==5.2.7
mysqlclient==2.2.7
python-dotenv==1.1.1
reportlab==4.4.1
PyJWT==2.10.1