39710-vm/core/jwt_auth.py
2026-04-17 04:26:07 +00:00

113 lines
4.0 KiB
Python

from datetime import timedelta
import jwt
from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone
from .models import BusinessProfile
class JWTAuthError(Exception):
    """Authentication failure raised by the JWT helpers in this module.

    Attributes:
        message: Human-readable description of the failure.
        code: Short machine-readable identifier (defaults to
            ``'token_invalid'``).
        status_code: Status code carried with the error (defaults to 401);
            presumably the HTTP status for the response -- confirm against
            the callers that handle this exception.
    """

    def __init__(self, message: str, *, code: str = 'token_invalid', status_code: int = 401):
        # Forward the message to Exception so str(exc) and exc.args carry it.
        super().__init__(message)
        self.message, self.code, self.status_code = message, code, status_code
def _timestamp(value):
    """Return ``value.timestamp()`` truncated to an ``int``.

    ``value`` must expose a ``timestamp()`` method, e.g. a ``datetime``.
    """
    seconds = value.timestamp()
    return int(seconds)
def _base_payload(user: User, profile: BusinessProfile, *, token_type: str, expires_delta):
    """Build the claim dictionary for one token.

    Args:
        user: Account the token is issued for; its ``pk`` (as a string)
            becomes ``sub`` and its ``username`` is copied into the claims.
        profile: Supplies ``mobile_token_version`` for the ``token_version``
            claim.
        token_type: Value stored under the ``type`` claim.
        expires_delta: Added to the current time to compute ``exp``.

    Returns:
        A dict with ``sub``, ``username``, ``type``, ``token_version``,
        ``iat``, ``nbf``, ``exp``, ``iss`` and ``aud``.
    """
    issued_at = timezone.now()
    # "iat" and "nbf" are both the issue moment, so convert it only once.
    issued_ts = _timestamp(issued_at)
    expires_ts = _timestamp(issued_at + expires_delta)
    claims = {
        'sub': str(user.pk),
        'username': user.username,
        'type': token_type,
        'token_version': profile.mobile_token_version,
        'iat': issued_ts,
        'nbf': issued_ts,
        'exp': expires_ts,
        'iss': settings.JWT_ISSUER,
        'aud': settings.JWT_AUDIENCE,
    }
    return claims
def issue_token_pair(user: User):
    """Issue an access token and a refresh token for ``user``.

    The user's ``BusinessProfile`` is fetched, or created if it does not
    exist. Both tokens are encoded with ``settings.SECRET_KEY`` using HS256.
    Lifetimes come from ``settings.JWT_ACCESS_TOKEN_MINUTES`` and
    ``settings.JWT_REFRESH_TOKEN_DAYS``.

    Returns:
        A dict with the encoded ``access`` and ``refresh`` tokens, a
        ``token_type`` of ``'Bearer'``, and both lifetimes in whole seconds
        (``access_expires_in`` / ``refresh_expires_in``).
    """
    profile = BusinessProfile.objects.get_or_create(user=user)[0]
    lifetimes = {
        'access': timedelta(minutes=settings.JWT_ACCESS_TOKEN_MINUTES),
        'refresh': timedelta(days=settings.JWT_REFRESH_TOKEN_DAYS),
    }
    # Build both claim sets first, then sign them, access before refresh.
    claim_sets = {
        kind: _base_payload(user, profile, token_type=kind, expires_delta=lifetime)
        for kind, lifetime in lifetimes.items()
    }
    signed = {
        kind: jwt.encode(claims, settings.SECRET_KEY, algorithm='HS256')
        for kind, claims in claim_sets.items()
    }
    return {
        'access': signed['access'],
        'refresh': signed['refresh'],
        'token_type': 'Bearer',
        'access_expires_in': int(lifetimes['access'].total_seconds()),
        'refresh_expires_in': int(lifetimes['refresh'].total_seconds()),
    }
def decode_token(token: str, *, expected_type: str):
    """Decode ``token`` and check that its ``type`` claim is ``expected_type``.

    The token is passed to ``jwt.decode`` with ``settings.SECRET_KEY``, the
    HS256 algorithm, the configured audience and issuer, and a list of
    claims that must be present.

    Returns:
        The decoded claim dictionary.

    Raises:
        JWTAuthError: with code ``token_expired`` when ``jwt.decode`` raises
            ``ExpiredSignatureError``, ``token_invalid`` for any other
            ``InvalidTokenError``, and ``token_type_invalid`` when the
            ``type`` claim differs from ``expected_type``.
    """
    required_claims = ['exp', 'iat', 'nbf', 'sub', 'type', 'token_version']
    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=['HS256'],
            audience=settings.JWT_AUDIENCE,
            issuer=settings.JWT_ISSUER,
            options={'require': required_claims},
        )
    except jwt.ExpiredSignatureError as exc:
        # Must be caught before InvalidTokenError so expiry keeps its own code.
        raise JWTAuthError('Token expired. Please log in again.', code='token_expired') from exc
    except jwt.InvalidTokenError as exc:
        raise JWTAuthError('Invalid token.', code='token_invalid') from exc

    if claims.get('type') == expected_type:
        return claims
    raise JWTAuthError(f'Expected a {expected_type} token.', code='token_type_invalid')
def get_user_from_payload(payload):
    """Resolve decoded token claims to an active user and their profile.

    Args:
        payload: Mapping of decoded claims; ``sub`` and ``token_version``
            are read from it.

    Returns:
        A ``(user, profile)`` tuple.

    Raises:
        JWTAuthError: with code ``user_not_found`` when no active user has
            the primary key given in ``sub``, or ``token_revoked`` when the
            profile's ``mobile_token_version`` differs from the token's
            ``token_version`` claim.
    """
    active_matches = User.objects.filter(pk=payload.get('sub'), is_active=True)
    account = active_matches.first()
    if account is None:
        raise JWTAuthError('User account not found.', code='user_not_found')

    business_profile = BusinessProfile.objects.get_or_create(user=account)[0]
    version_is_current = business_profile.mobile_token_version == payload.get('token_version')
    if not version_is_current:
        raise JWTAuthError('Token is no longer valid. Please log in again.', code='token_revoked')
    return account, business_profile
def authenticate_authorization_header(header_value: str):
    """Authenticate a request from its ``Authorization`` header value.

    The header must have the form ``Bearer <token>``; the scheme is matched
    case-insensitively and the token is stripped of surrounding whitespace
    before being decoded as an access token.

    Returns:
        Whatever ``get_user_from_payload`` returns for the decoded claims.

    Raises:
        JWTAuthError: with code ``auth_required`` when the header is empty
            or missing, ``auth_header_invalid`` when it is not a Bearer
            header with a non-blank token, plus anything raised while
            decoding the token or resolving the user.
    """
    if not header_value:
        raise JWTAuthError('Authentication required.', code='auth_required')

    scheme, _, remainder = header_value.partition(' ')
    credential = remainder.strip()
    is_bearer = scheme.lower() == 'bearer'
    if not (is_bearer and credential):
        raise JWTAuthError('Use Authorization: Bearer <token>.', code='auth_header_invalid')

    claims = decode_token(credential, expected_type='access')
    return get_user_from_payload(claims)
def authenticate_refresh_token(refresh_token: str):
    """Authenticate a caller from a refresh token string.

    Args:
        refresh_token: The encoded refresh token. Surrounding whitespace is
            ignored.

    Returns:
        Whatever ``get_user_from_payload`` returns for the decoded claims.

    Raises:
        JWTAuthError: with code ``refresh_required`` when the token is
            missing, empty or only whitespace; ``token_invalid`` when it is
            not a string (or bytes) value; plus anything raised while
            decoding the token or resolving the user.
    """
    if not refresh_token:
        raise JWTAuthError('Refresh token is required.', code='refresh_required')
    # A truthy non-string value (e.g. a number or list) has no usable
    # .strip(); report it as an auth failure instead of letting an
    # AttributeError escape. bytes are still accepted, as before.
    if not isinstance(refresh_token, (str, bytes)):
        raise JWTAuthError('Invalid token.', code='token_invalid')

    candidate = refresh_token.strip()
    # Treat a whitespace-only token as missing, matching how the
    # Authorization header helper handles a blank credential.
    if not candidate:
        raise JWTAuthError('Refresh token is required.', code='refresh_required')

    payload = decode_token(candidate, expected_type='refresh')
    return get_user_from_payload(payload)
def revoke_user_tokens(user: User):
    """Increment the user's ``mobile_token_version`` and save the profile.

    The user's ``BusinessProfile`` is fetched, or created if it does not
    exist. Only ``mobile_token_version`` and ``updated_at`` are written.
    Tokens carrying the previous version in their ``token_version`` claim
    no longer match the stored value afterwards.

    Returns:
        The updated ``BusinessProfile`` instance.
    """
    profile = BusinessProfile.objects.get_or_create(user=user)[0]
    next_version = profile.mobile_token_version + 1
    profile.mobile_token_version = next_version
    profile.save(update_fields=['mobile_token_version', 'updated_at'])
    return profile