from datetime import timedelta import jwt from django.conf import settings from django.contrib.auth.models import User from django.utils import timezone from .models import BusinessProfile class JWTAuthError(Exception): def __init__(self, message: str, *, code: str = 'token_invalid', status_code: int = 401): super().__init__(message) self.message = message self.code = code self.status_code = status_code def _timestamp(value): return int(value.timestamp()) def _base_payload(user: User, profile: BusinessProfile, *, token_type: str, expires_delta): now = timezone.now() return { 'sub': str(user.pk), 'username': user.username, 'type': token_type, 'token_version': profile.mobile_token_version, 'iat': _timestamp(now), 'nbf': _timestamp(now), 'exp': _timestamp(now + expires_delta), 'iss': settings.JWT_ISSUER, 'aud': settings.JWT_AUDIENCE, } def issue_token_pair(user: User): profile, _ = BusinessProfile.objects.get_or_create(user=user) access_lifetime = timedelta(minutes=settings.JWT_ACCESS_TOKEN_MINUTES) refresh_lifetime = timedelta(days=settings.JWT_REFRESH_TOKEN_DAYS) access_payload = _base_payload(user, profile, token_type='access', expires_delta=access_lifetime) refresh_payload = _base_payload(user, profile, token_type='refresh', expires_delta=refresh_lifetime) access_token = jwt.encode(access_payload, settings.SECRET_KEY, algorithm='HS256') refresh_token = jwt.encode(refresh_payload, settings.SECRET_KEY, algorithm='HS256') return { 'access': access_token, 'refresh': refresh_token, 'token_type': 'Bearer', 'access_expires_in': int(access_lifetime.total_seconds()), 'refresh_expires_in': int(refresh_lifetime.total_seconds()), } def decode_token(token: str, *, expected_type: str): try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=['HS256'], audience=settings.JWT_AUDIENCE, issuer=settings.JWT_ISSUER, options={'require': ['exp', 'iat', 'nbf', 'sub', 'type', 'token_version']}, ) except jwt.ExpiredSignatureError as exc: raise JWTAuthError('Token expired. Please log in again.', code='token_expired') from exc except jwt.InvalidTokenError as exc: raise JWTAuthError('Invalid token.', code='token_invalid') from exc token_type = payload.get('type') if token_type != expected_type: raise JWTAuthError(f'Expected a {expected_type} token.', code='token_type_invalid') return payload def get_user_from_payload(payload): user = User.objects.filter(pk=payload.get('sub'), is_active=True).first() if user is None: raise JWTAuthError('User account not found.', code='user_not_found') profile, _ = BusinessProfile.objects.get_or_create(user=user) if profile.mobile_token_version != payload.get('token_version'): raise JWTAuthError('Token is no longer valid. Please log in again.', code='token_revoked') return user, profile def authenticate_authorization_header(header_value: str): if not header_value: raise JWTAuthError('Authentication required.', code='auth_required') scheme, _, token = header_value.partition(' ') if scheme.lower() != 'bearer' or not token.strip(): raise JWTAuthError('Use Authorization: Bearer .', code='auth_header_invalid') payload = decode_token(token.strip(), expected_type='access') return get_user_from_payload(payload) def authenticate_refresh_token(refresh_token: str): if not refresh_token: raise JWTAuthError('Refresh token is required.', code='refresh_required') payload = decode_token(refresh_token.strip(), expected_type='refresh') return get_user_from_payload(payload) def revoke_user_tokens(user: User): profile, _ = BusinessProfile.objects.get_or_create(user=user) profile.mobile_token_version += 1 profile.save(update_fields=['mobile_token_version', 'updated_at']) return profile