36 lines
1.2 KiB
Python
36 lines
1.2 KiB
Python
from rest_framework.authentication import BaseAuthentication
|
|
from rest_framework.exceptions import AuthenticationFailed
|
|
from django.utils import timezone
|
|
from .models import APIToken
|
|
|
|
class APITokenAuthentication(BaseAuthentication):
    """Authenticate a request from an ``APIToken`` in the Authorization header.

    Accepts ``Authorization: Bearer <token>`` and
    ``Authorization: Token <token>``; the scheme name is matched
    case-insensitively.
    """

    def authenticate(self, request):
        """Return ``(user, token)`` for a valid token header.

        Returns ``None`` (authentication not attempted) when the
        Authorization header is missing, does not consist of exactly two
        whitespace-separated parts, or uses a scheme other than
        ``Bearer``/``Token``.

        Raises:
            AuthenticationFailed: propagated from
                ``_authenticate_credentials`` when the token or its user
                is rejected.
        """
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return None

        # Expect exactly "<scheme> <token>"; anything else is not ours.
        parts = auth_header.split()
        if len(parts) != 2:
            return None

        prefix, token = parts
        if prefix.lower() not in ('bearer', 'token'):
            return None

        return self._authenticate_credentials(token)

    def authenticate_header(self, request):
        """Return the value for the ``WWW-Authenticate`` response header.

        Django REST Framework only answers unauthenticated requests with
        HTTP 401 when the first authentication class overrides this method;
        without it those responses are sent as 403 Forbidden.
        """
        return 'Bearer'

    def _authenticate_credentials(self, key):
        """Look up the active token matching ``key`` and return ``(user, token)``.

        On success the token's ``last_used_at`` is set to the current time
        and saved (only that field is written).

        Raises:
            AuthenticationFailed: if no active ``APIToken`` has this key, or
                the token's user is not active.
        """
        try:
            # select_related avoids a second query when token.user is read.
            token = APIToken.objects.select_related('user').get(
                token=key, is_active=True
            )
        except APIToken.DoesNotExist:
            raise AuthenticationFailed('Invalid or inactive API token')

        if not token.user.is_active:
            raise AuthenticationFailed('User is inactive or deleted')

        # Update last used timestamp
        token.last_used_at = timezone.now()
        token.save(update_fields=['last_used_at'])

        return (token.user, token)