113 lines
4.0 KiB
Python
113 lines
4.0 KiB
Python
from datetime import timedelta
|
|
|
|
import jwt
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import User
|
|
from django.utils import timezone
|
|
|
|
from .models import BusinessProfile
|
|
|
|
|
|
class JWTAuthError(Exception):
    """Authentication failure raised by the JWT helpers in this module.

    The constructor stores its arguments on the instance:

    * ``message`` -- human-readable text, also passed to ``Exception``.
    * ``code`` -- short identifier for the failure; defaults to
      ``'token_invalid'``.
    * ``status_code`` -- integer status associated with the failure;
      defaults to ``401``.
    """

    def __init__(self, message: str, *, code: str = 'token_invalid', status_code: int = 401):
        self.message, self.code, self.status_code = message, code, status_code
        super().__init__(message)
|
|
|
|
|
|
def _timestamp(value):
|
|
return int(value.timestamp())
|
|
|
|
|
|
def _base_payload(user: User, profile: BusinessProfile, *, token_type: str, expires_delta):
    """Build the claims dictionary shared by every token this module issues.

    Args:
        user: Account the token is issued for; supplies ``sub`` (the primary
            key as a string) and ``username``.
        profile: Supplies ``token_version`` from ``mobile_token_version``.
        token_type: Value stored in the ``type`` claim.
        expires_delta: Offset added to the current time to produce ``exp``.

    Returns:
        A dict of claims. ``iat`` and ``nbf`` are both the current time and
        ``exp`` is the current time plus ``expires_delta``, all as integer
        timestamps. ``iss`` and ``aud`` come from ``settings.JWT_ISSUER`` and
        ``settings.JWT_AUDIENCE``.
    """
    issued = timezone.now()
    issued_ts = _timestamp(issued)
    expires_ts = _timestamp(issued + expires_delta)

    claims = {
        'sub': str(user.pk),
        'username': user.username,
        'type': token_type,
        'token_version': profile.mobile_token_version,
        'iat': issued_ts,
        'nbf': issued_ts,
        'exp': expires_ts,
        'iss': settings.JWT_ISSUER,
        'aud': settings.JWT_AUDIENCE,
    }
    return claims
|
|
|
|
|
|
def issue_token_pair(user: User):
    """Create a signed access/refresh token pair for ``user``.

    The user's ``BusinessProfile`` is fetched, or created when missing, so
    its ``mobile_token_version`` can be embedded in both tokens. Lifetimes
    come from ``settings.JWT_ACCESS_TOKEN_MINUTES`` and
    ``settings.JWT_REFRESH_TOKEN_DAYS``; both tokens are signed with
    ``settings.SECRET_KEY`` using HS256.

    Returns:
        A dict with keys ``access``, ``refresh``, ``token_type`` (always
        ``'Bearer'``), ``access_expires_in`` and ``refresh_expires_in``
        (lifetimes in whole seconds).
    """
    profile, _created = BusinessProfile.objects.get_or_create(user=user)

    lifetimes = {
        'access': timedelta(minutes=settings.JWT_ACCESS_TOKEN_MINUTES),
        'refresh': timedelta(days=settings.JWT_REFRESH_TOKEN_DAYS),
    }

    # Both payloads are built before either is signed; dicts keep insertion
    # order, so access is always handled before refresh.
    payloads = {
        kind: _base_payload(user, profile, token_type=kind, expires_delta=lifetime)
        for kind, lifetime in lifetimes.items()
    }
    signed = {
        kind: jwt.encode(claims, settings.SECRET_KEY, algorithm='HS256')
        for kind, claims in payloads.items()
    }

    return {
        'access': signed['access'],
        'refresh': signed['refresh'],
        'token_type': 'Bearer',
        'access_expires_in': int(lifetimes['access'].total_seconds()),
        'refresh_expires_in': int(lifetimes['refresh'].total_seconds()),
    }
|
|
|
|
|
|
def decode_token(token: str, *, expected_type: str):
    """Verify ``token`` and return its claims.

    The token is decoded with ``settings.SECRET_KEY`` (HS256 only) and
    checked against ``settings.JWT_AUDIENCE`` and ``settings.JWT_ISSUER``.
    The claims ``exp``, ``iat``, ``nbf``, ``sub``, ``type`` and
    ``token_version`` are required.

    Args:
        token: Encoded JWT.
        expected_type: Value the ``type`` claim must equal.

    Returns:
        The decoded claims.

    Raises:
        JWTAuthError: with code ``token_expired`` when the signature has
            expired, ``token_invalid`` for any other ``jwt.InvalidTokenError``,
            or ``token_type_invalid`` when the ``type`` claim does not match
            ``expected_type``.
    """
    required_claims = ['exp', 'iat', 'nbf', 'sub', 'type', 'token_version']
    verification = {
        'algorithms': ['HS256'],
        'audience': settings.JWT_AUDIENCE,
        'issuer': settings.JWT_ISSUER,
        'options': {'require': required_claims},
    }

    try:
        claims = jwt.decode(token, settings.SECRET_KEY, **verification)
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before InvalidTokenError, which is its base class.
        raise JWTAuthError('Token expired. Please log in again.', code='token_expired') from exc
    except jwt.InvalidTokenError as exc:
        raise JWTAuthError('Invalid token.', code='token_invalid') from exc

    if claims.get('type') == expected_type:
        return claims
    raise JWTAuthError(f'Expected a {expected_type} token.', code='token_type_invalid')
|
|
|
|
|
|
def get_user_from_payload(payload):
    """Resolve decoded token claims to an active user and their profile.

    Args:
        payload: Decoded claims; ``sub`` and ``token_version`` are read.

    Returns:
        A ``(user, profile)`` tuple. The ``BusinessProfile`` is created if
        the user does not have one yet.

    Raises:
        JWTAuthError: with code ``user_not_found`` when no active user has
            the primary key in ``sub``, or ``token_revoked`` when the
            profile's ``mobile_token_version`` differs from the token's
            ``token_version``.
    """
    subject = payload.get('sub')
    account = User.objects.filter(pk=subject, is_active=True).first()
    if account is None:
        raise JWTAuthError('User account not found.', code='user_not_found')

    business_profile, _created = BusinessProfile.objects.get_or_create(user=account)

    current_version = business_profile.mobile_token_version
    claimed_version = payload.get('token_version')
    if current_version != claimed_version:
        raise JWTAuthError('Token is no longer valid. Please log in again.', code='token_revoked')

    return account, business_profile
|
|
|
|
|
|
def authenticate_authorization_header(header_value: str):
    """Authenticate a request from its ``Authorization`` header value.

    The header must have the form ``Bearer <token>``; the scheme is matched
    case-insensitively and surrounding whitespace on the token is ignored.
    The token must be an access token.

    Returns:
        Whatever ``get_user_from_payload`` returns for the decoded claims.

    Raises:
        JWTAuthError: with code ``auth_required`` when the header is empty,
            ``auth_header_invalid`` when it is not a bearer header with a
            non-blank token, or any error raised while decoding the token
            or resolving the user.
    """
    if not header_value:
        raise JWTAuthError('Authentication required.', code='auth_required')

    # Split on the first space only: everything after it is the credential.
    scheme, _separator, remainder = header_value.partition(' ')
    credential = remainder.strip()
    is_bearer = scheme.lower() == 'bearer'
    if not (is_bearer and credential):
        raise JWTAuthError('Use Authorization: Bearer <token>.', code='auth_header_invalid')

    claims = decode_token(credential, expected_type='access')
    return get_user_from_payload(claims)
|
|
|
|
|
|
def authenticate_refresh_token(refresh_token: str):
    """Authenticate a caller from a refresh token.

    Surrounding whitespace on the token is ignored. The token must decode
    as a refresh token.

    Args:
        refresh_token: Encoded refresh JWT.

    Returns:
        Whatever ``get_user_from_payload`` returns for the decoded claims.

    Raises:
        JWTAuthError: with code ``refresh_required`` when the value is empty,
            ``token_invalid`` when it is not a string (or bytes), or any
            error raised while decoding the token or resolving the user.
    """
    if not refresh_token:
        raise JWTAuthError('Refresh token is required.', code='refresh_required')

    # A truthy non-string value (e.g. a number or list) has no usable
    # ``.strip()`` and would otherwise surface as an AttributeError instead
    # of an authentication error. Report it the same way as any other
    # undecodable token.
    if not isinstance(refresh_token, (str, bytes)):
        raise JWTAuthError('Invalid token.', code='token_invalid')

    payload = decode_token(refresh_token.strip(), expected_type='refresh')
    return get_user_from_payload(payload)
|
|
|
|
|
|
def revoke_user_tokens(user: User):
    """Invalidate every token previously issued to ``user``.

    Increments ``mobile_token_version`` on the user's ``BusinessProfile``
    (creating the profile if needed) and saves only that field together
    with ``updated_at``. Tokens carrying the old version then fail the
    version comparison in ``get_user_from_payload``.

    Returns:
        The updated profile.
    """
    business_profile = BusinessProfile.objects.get_or_create(user=user)[0]
    business_profile.mobile_token_version = business_profile.mobile_token_version + 1
    business_profile.save(update_fields=['mobile_token_version', 'updated_at'])
    return business_profile
|