40045-vm/myproject/orders/payments.py
2026-05-20 10:50:30 +00:00

498 lines
17 KiB
Python

import base64
import hashlib
import hmac
import json
import uuid
from decimal import Decimal, ROUND_HALF_UP
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen
from django.conf import settings
from django.urls import reverse
import stripe
# Lower-case currency codes whose amounts are not scaled by 100 when
# converted to minor units (see ``_to_minor_units``).
ZERO_DECIMAL_CURRENCIES = {
    'bif',
    'clp',
    'djf',
    'gnf',
    'jpy',
    'kmf',
    'krw',
    'mga',
    'pyg',
    'rwf',
    'ugx',
    'vnd',
    'vuv',
    'xaf',
    'xof',
    'xpf',
}
class PaymentGatewayError(RuntimeError):
    """Error raised by the payment helpers in this module.

    ``message`` becomes the exception text; ``details`` is an optional,
    keyword-only string stored on the instance for extra diagnostics
    (for example a raw gateway response body).
    """

    def __init__(self, message, *, details=''):
        self.details = details
        super().__init__(message)
def payment_currency():
    """Return the ``PAYMENT_CURRENCY`` setting upper-cased, defaulting to ``NPR``."""
    configured = getattr(settings, 'PAYMENT_CURRENCY', 'NPR')
    return configured.upper()
def stripe_configured():
    """Return ``True`` when ``settings.STRIPE_SECRET_KEY`` holds a truthy value."""
    secret_key = settings.STRIPE_SECRET_KEY
    return True if secret_key else False
def esewa_configured():
    """Return ``True`` when every eSewa setting is present and non-blank."""
    required_settings = (
        'ESEWA_PRODUCT_CODE',
        'ESEWA_SECRET_KEY',
        'ESEWA_FORM_URL',
        'ESEWA_STATUS_URL',
    )
    # A list (not a generator) so every setting is read before ``all`` runs.
    values = [getattr(settings, name, '').strip() for name in required_settings]
    return all(values)
def khalti_configured():
    """Return ``True`` when every Khalti setting is present and non-blank."""
    required_settings = (
        'KHALTI_SECRET_KEY',
        'KHALTI_INITIATE_URL',
        'KHALTI_LOOKUP_URL',
    )
    # A list (not a generator) so every setting is read before ``all`` runs.
    values = [getattr(settings, name, '').strip() for name in required_settings]
    return all(values)
def stripe_value(obj, key, default=None):
    """Read ``key`` from a mapping-like or attribute-style object.

    Returns ``default`` when ``obj`` is ``None`` or the key/attribute is
    missing. Objects exposing a callable ``get`` (dicts included) are read
    through it; anything else falls back to attribute access.
    """
    if obj is None:
        return default

    lookup = getattr(obj, 'get', None)
    if callable(lookup):
        return lookup(key, default)

    return getattr(obj, key, default)
def _public_base_url(request):
    """Return the ``scheme://host`` base used to build absolute URLs.

    ``settings.PUBLIC_APP_URL`` wins when set. Otherwise the scheme is taken
    from the first ``X-Forwarded-Proto`` entry (falling back to
    ``request.scheme`` and then ``http``) and the host from
    ``request.get_host()``. Hosts ending in ``.flatlogic.app`` are always
    served as ``https``.
    """
    if settings.PUBLIC_APP_URL:
        return settings.PUBLIC_APP_URL

    forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO', '').split(',')[0].strip().lower()
    # The header is client-controllable unless a proxy overwrites it, and it is
    # interpolated into URLs handed to payment gateways. Accept only real web
    # schemes so a crafted value cannot rewrite the resulting URL.
    if forwarded_proto not in ('http', 'https'):
        forwarded_proto = ''

    scheme = forwarded_proto or request.scheme or 'http'
    host = request.get_host()
    if host.endswith('.flatlogic.app'):
        scheme = 'https'
    return f'{scheme}://{host}'
def absolute_url(request, path):
    """Join ``path`` onto the public base URL and return the absolute URL.

    Leading slashes are removed from ``path`` so it is resolved relative to
    the base (which is given a trailing slash) rather than replacing its path.
    """
    base = f"{_public_base_url(request)}/"
    relative_path = path.lstrip('/')
    return urljoin(base, relative_path)
def _format_amount(amount):
    """Return ``amount`` as a fixed-point string with two decimals.

    The value is converted with ``Decimal`` and rounded half-up to 0.01.
    """
    two_places = Decimal('0.01')
    rounded = Decimal(amount).quantize(two_places, rounding=ROUND_HALF_UP)
    return f'{rounded:f}'
def _to_minor_units(amount, currency):
    """Convert ``amount`` to an integer number of minor units for ``currency``.

    Currencies listed in ``ZERO_DECIMAL_CURRENCIES`` are rounded as-is; all
    others are multiplied by 100 first. Rounding is half-up.
    """
    value = Decimal(amount)
    if currency.lower() not in ZERO_DECIMAL_CURRENCIES:
        value = value * Decimal('100')
    return int(value.quantize(Decimal('1'), rounding=ROUND_HALF_UP))
def _configure_stripe():
    """Set the Stripe client's global API key from ``settings.STRIPE_SECRET_KEY``."""
    secret_key = settings.STRIPE_SECRET_KEY
    stripe.api_key = secret_key
def _line_items_for_order(order):
    """Build the Stripe Checkout ``line_items`` list for ``order``.

    Each order item becomes one entry priced in ``settings.STRIPE_CURRENCY``
    with the unit amount converted to minor units. The product description
    is truncated to 500 characters and left out entirely when empty.
    """
    currency = settings.STRIPE_CURRENCY.lower()
    line_items = []
    for item in order.items.select_related('product').all():
        product_data = {'name': item.product.name}
        # ``description`` may be None or blank. Slicing None raises TypeError,
        # and an empty string is expected to be rejected by Stripe on create
        # calls (NOTE(review): confirm against the Stripe API reference), so
        # only send the field when there is text to show.
        description = (item.product.description or '')[:500]
        if description:
            product_data['description'] = description
        line_items.append(
            {
                'price_data': {
                    'currency': currency,
                    'unit_amount': _to_minor_units(item.price, currency),
                    'product_data': product_data,
                },
                'quantity': int(item.quantity),
            }
        )
    return line_items
def create_checkout_session(request, order):
    """Create and return a Stripe Checkout session for ``order``.

    Raises ``RuntimeError`` when Stripe is not configured. The success URL
    carries the order id and Stripe's ``{CHECKOUT_SESSION_ID}`` placeholder;
    the cancel URL points back at the order's payment page.
    """
    if not stripe_configured():
        raise RuntimeError('Stripe is not configured.')
    _configure_stripe()

    success_path = reverse('success')
    cancel_path = reverse('payment', kwargs={'order_id': order.id})
    success_base = absolute_url(request, success_path)
    # Plain (non-f) string so the braces reach Stripe literally.
    success_url = success_base + f'?order_id={order.id}' + '&session_id={CHECKOUT_SESSION_ID}'
    cancel_url = absolute_url(request, cancel_path) + '?cancelled=1'

    order_ref = str(order.id)
    session_kwargs = {
        'mode': 'payment',
        'line_items': _line_items_for_order(order),
        'success_url': success_url,
        'cancel_url': cancel_url,
        'client_reference_id': order_ref,
        'metadata': {
            'order_id': order_ref,
            'user_id': str(order.user_id),
        },
        'payment_method_types': ['card'],
        'billing_address_collection': 'auto',
        'submit_type': 'pay',
    }

    customer_email = order.user.email
    if customer_email:
        session_kwargs['customer_email'] = customer_email

    return stripe.checkout.Session.create(**session_kwargs)
def retrieve_checkout_session(session_id):
    """Fetch the Stripe Checkout session identified by ``session_id``.

    Raises ``RuntimeError`` when Stripe is not configured.
    """
    if stripe_configured():
        _configure_stripe()
        return stripe.checkout.Session.retrieve(session_id)
    raise RuntimeError('Stripe is not configured.')
def construct_webhook_event(payload, signature):
    """Build a Stripe webhook event from ``payload`` and its ``signature``.

    Raises ``RuntimeError`` when Stripe or ``settings.STRIPE_WEBHOOK_SECRET``
    is not configured.
    """
    webhook_ready = stripe_configured() and settings.STRIPE_WEBHOOK_SECRET
    if not webhook_ready:
        raise RuntimeError('Stripe webhook is not configured.')
    _configure_stripe()
    webhook_secret = settings.STRIPE_WEBHOOK_SECRET
    return stripe.Webhook.construct_event(payload, signature, webhook_secret)
def _extract_error_message(body):
    """Return a short, human-readable message from a gateway error body.

    ``body`` may be a ``str``, ``bytes`` or ``None``. JSON objects are
    searched for ``detail``, then ``error_message``, then the first
    non-empty list value (reported as ``"<key>: <first item>"``). Anything
    else is returned as text, capped at 300 characters.
    """
    # Normalise to text first: ``json.loads(None)`` raises TypeError (not
    # JSONDecodeError), and a bytes body would otherwise leak out as bytes.
    if body is None:
        text = ''
    elif isinstance(body, (bytes, bytearray)):
        text = bytes(body).decode('utf-8', errors='replace')
    else:
        text = str(body)

    try:
        payload = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return (text or 'Unknown gateway error.').strip()[:300]

    if isinstance(payload, dict):
        if payload.get('detail'):
            return str(payload['detail'])
        if payload.get('error_message'):
            return str(payload['error_message'])
        for key, value in payload.items():
            if isinstance(value, list) and value:
                return f"{key}: {value[0]}"
        return json.dumps(payload)[:300]
    return str(payload)[:300]
def _json_request(url, *, method='GET', payload=None, headers=None, timeout=20):
    """Send an HTTP request and return the decoded JSON response.

    ``payload`` (when given) is sent as a JSON body; ``headers`` are merged
    over the defaults. An empty response body yields ``{}``.

    Raises ``PaymentGatewayError`` for HTTP error statuses, transport
    failures, undecodable bodies and invalid JSON, so callers only need to
    handle that one exception type.
    """
    # Local import: HTTPException is only needed here and is not among the
    # names this function otherwise relies on.
    from http.client import HTTPException

    request_headers = {
        'Accept': 'application/json',
    }
    data = None
    if payload is not None:
        data = json.dumps(payload).encode('utf-8')
        request_headers['Content-Type'] = 'application/json'
    if headers:
        request_headers.update(headers)

    request = Request(url, data=data, headers=request_headers, method=method)
    try:
        with urlopen(request, timeout=timeout) as response:
            raw_body = response.read()
    except HTTPError as exc:
        error_body = exc.read().decode('utf-8', errors='replace')
        raise PaymentGatewayError(
            _extract_error_message(error_body) or f'Gateway request failed with HTTP {exc.code}.',
            details=error_body,
        ) from exc
    except URLError as exc:
        raise PaymentGatewayError('Gateway request could not be completed.', details=str(exc.reason)) from exc
    except (OSError, HTTPException) as exc:
        # Read timeouts, connection resets and truncated responses that occur
        # after the request was sent are not wrapped in URLError by urllib.
        raise PaymentGatewayError('Gateway request could not be completed.', details=str(exc)) from exc

    try:
        body = raw_body.decode('utf-8')
    except UnicodeDecodeError as exc:
        raise PaymentGatewayError(
            'Gateway returned an invalid response.',
            details=raw_body[:500].decode('utf-8', errors='replace'),
        ) from exc

    if not body:
        return {}
    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        raise PaymentGatewayError('Gateway returned an invalid response.', details=body[:500]) from exc
def _signed_field_names_list(signed_field_names):
    """Split a comma-separated field list into trimmed, non-empty names.

    The argument is passed through ``str`` first, so non-string values are
    accepted.
    """
    names = []
    for chunk in str(signed_field_names).split(','):
        cleaned = chunk.strip()
        if cleaned:
            names.append(cleaned)
    return names
def _signed_message(payload, signed_field_names):
    """Build the ``name=value,name=value`` string that gets signed.

    Fields appear in the order given by ``signed_field_names``. Raises
    ``PaymentGatewayError`` for the first listed field missing from
    ``payload``.
    """
    names = _signed_field_names_list(signed_field_names)
    for name in names:
        if name not in payload:
            raise PaymentGatewayError(f'Missing signed field: {name}')
    return ','.join(f'{name}={payload[name]}' for name in names)
def _hmac_sha256_base64(secret_key, message):
    """Return the base64 text of the HMAC-SHA256 of ``message`` under ``secret_key``.

    Both arguments are strings and are UTF-8 encoded before hashing.
    """
    key_bytes = secret_key.encode('utf-8')
    message_bytes = message.encode('utf-8')
    mac = hmac.new(key_bytes, message_bytes, hashlib.sha256)
    encoded = base64.b64encode(mac.digest())
    return encoded.decode('utf-8')
def _first_present(payload, *keys):
    """Return the first value among ``keys`` that is neither ``None`` nor ``''``.

    Keys are tried in order; an empty string is returned when none match.
    """
    candidates = (payload.get(key) for key in keys)
    return next((value for value in candidates if value not in (None, '')), '')
def build_esewa_redirect(request, order):
    """Prepare the signed form fields for redirecting a payer to eSewa.

    Returns a dict with the form ``action_url``, the ``fields`` to post
    (including the HMAC ``signature``), the generated ``session_id`` and the
    payment ``currency``. Raises ``PaymentGatewayError`` when eSewa is not
    configured.
    """
    if not esewa_configured():
        raise PaymentGatewayError('eSewa is not configured yet.')

    # Fresh identifier per attempt: order id plus 12 random hex characters.
    random_suffix = uuid.uuid4().hex[:12].upper()
    transaction_uuid = f"ORD-{order.id}-{random_suffix}"
    total_amount = _format_amount(order.total_price)

    return_base = absolute_url(request, reverse('esewa_return'))
    success_url = f"{return_base}?order_id={order.id}"
    failure_url = f"{return_base}?order_id={order.id}&failed=1"

    signed_field_names = 'total_amount,transaction_uuid,product_code'
    fields = {
        'amount': total_amount,
        'tax_amount': '0',
        'total_amount': total_amount,
        'transaction_uuid': transaction_uuid,
        'product_code': settings.ESEWA_PRODUCT_CODE,
        'product_service_charge': '0',
        'product_delivery_charge': '0',
        'success_url': success_url,
        'failure_url': failure_url,
        'signed_field_names': signed_field_names,
    }
    message = _signed_message(fields, signed_field_names)
    fields['signature'] = _hmac_sha256_base64(settings.ESEWA_SECRET_KEY, message)

    return {
        'action_url': settings.ESEWA_FORM_URL,
        'fields': fields,
        'session_id': transaction_uuid,
        'currency': payment_currency(),
    }
def _decode_esewa_callback(encoded_data):
    """Decode eSewa's base64-encoded JSON callback into a ``str -> str`` dict.

    Missing base64 padding is restored before decoding. ``None`` values in
    the JSON become empty strings; all other values are stringified. Raises
    ``PaymentGatewayError`` when the data is empty, not valid base64, not
    valid UTF-8 JSON, or not a JSON object.
    """
    if not encoded_data:
        raise PaymentGatewayError('eSewa did not return any payment payload.')

    missing_padding = -len(encoded_data) % 4
    padded = encoded_data + '=' * missing_padding

    try:
        raw = base64.b64decode(padded)
    except Exception as exc:
        raise PaymentGatewayError('eSewa returned an unreadable payment payload.') from exc

    try:
        decoded = json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise PaymentGatewayError('eSewa returned an invalid payment payload.') from exc

    if not isinstance(decoded, dict):
        raise PaymentGatewayError('eSewa returned an unexpected payment payload.')

    return {
        str(name): (str(value) if value is not None else '')
        for name, value in decoded.items()
    }
def _verify_esewa_callback_signature(payload):
    """Check the HMAC signature carried by a decoded eSewa callback.

    The expected signature is recomputed with ``settings.ESEWA_SECRET_KEY``
    over the fields named in the callback's ``signed_field_names``.

    Raises ``PaymentGatewayError`` when the signature or field list is
    missing, a listed field is absent, or the signatures differ.
    """
    signature = payload.get('signature', '').strip()
    signed_field_names = payload.get('signed_field_names', '').strip()
    if not signature or not signed_field_names:
        raise PaymentGatewayError('eSewa callback signature is missing.')

    expected_signature = _hmac_sha256_base64(
        settings.ESEWA_SECRET_KEY,
        _signed_message(payload, signed_field_names),
    )
    # Compare as bytes: ``hmac.compare_digest`` raises TypeError for str
    # arguments containing non-ASCII characters, and the callback signature
    # is untrusted input that may contain anything.
    signatures_match = hmac.compare_digest(
        expected_signature.encode('utf-8'),
        signature.encode('utf-8'),
    )
    if not signatures_match:
        raise PaymentGatewayError('eSewa callback signature could not be verified.')
def _esewa_status_lookup(order):
    """Query eSewa's status endpoint for the order's stored payment session.

    Sends the product code, the order total and ``order.payment_session_id``
    as query parameters and returns the parsed JSON response.
    """
    query = urlencode(
        {
            'product_code': settings.ESEWA_PRODUCT_CODE,
            'total_amount': _format_amount(order.total_price),
            'transaction_uuid': order.payment_session_id,
        }
    )
    # Normalise to exactly one trailing slash before the query string.
    endpoint = settings.ESEWA_STATUS_URL.rstrip('/') + '/'
    return _json_request(endpoint + '?' + query)
def _normalized_esewa_amount(raw_amount):
    """Format a gateway-supplied amount with ``_format_amount`` or raise ``PaymentGatewayError``."""
    if isinstance(raw_amount, str):
        # NOTE(review): eSewa callbacks appear to use thousands separators
        # (e.g. "1,000.0"), which ``Decimal`` cannot parse — confirm against
        # the gateway documentation.
        raw_amount = raw_amount.replace(',', '').strip()
    try:
        return _format_amount(raw_amount)
    except (ArithmeticError, TypeError, ValueError) as exc:
        # decimal.InvalidOperation is an ArithmeticError subclass.
        raise PaymentGatewayError('eSewa returned an invalid payment amount.') from exc


def verify_esewa_payment(order, encoded_data):
    """Validate an eSewa callback for ``order`` and confirm it with the gateway.

    The callback is decoded, its signature checked, and its transaction id,
    amount and product code compared against the order. The payment status
    is then fetched from eSewa's status endpoint and cross-checked.

    Returns a dict with ``status`` (``Paid``, ``Pending`` or ``Failed``),
    ``gateway_status``, ``reference``, ``session_id``, ``currency`` and
    ``message``.

    Raises ``PaymentGatewayError`` when eSewa is not configured or any check
    fails, including missing or malformed callback data.
    """
    if not esewa_configured():
        raise PaymentGatewayError('eSewa is not configured yet.')

    # ``encoded_data`` may be None when the callback carried no data; let the
    # decoder report that instead of failing on ``None.strip()``.
    callback = _decode_esewa_callback((encoded_data or '').strip())
    _verify_esewa_callback_signature(callback)

    transaction_uuid = callback.get('transaction_uuid', '').strip()
    if not transaction_uuid:
        raise PaymentGatewayError('eSewa did not return a transaction id.')
    if transaction_uuid != (order.payment_session_id or '').strip():
        raise PaymentGatewayError('The returned eSewa session did not match the latest payment attempt for this order.')

    order_total = _format_amount(order.total_price)
    callback_total = _normalized_esewa_amount(callback.get('total_amount', order.total_price))
    if callback_total != order_total:
        raise PaymentGatewayError('The returned eSewa amount did not match this order total.')

    callback_product_code = callback.get('product_code', '').strip()
    if callback_product_code and callback_product_code != settings.ESEWA_PRODUCT_CODE:
        raise PaymentGatewayError('The returned eSewa product code did not match the configured merchant code.')

    # The callback alone is not trusted: confirm the outcome server-to-server.
    status_response = _esewa_status_lookup(order)
    if not isinstance(status_response, dict):
        raise PaymentGatewayError('eSewa did not return a payment status for this transaction.')
    if status_response.get('error_message'):
        raise PaymentGatewayError(f"eSewa status lookup failed: {status_response['error_message']}")

    gateway_status = str(status_response.get('status', '')).upper().strip()
    if not gateway_status:
        raise PaymentGatewayError('eSewa did not return a payment status for this transaction.')

    returned_total = _first_present(status_response, 'total_amount', 'totalAmount')
    if returned_total and _normalized_esewa_amount(returned_total) != order_total:
        raise PaymentGatewayError('The verified eSewa amount did not match this order total.')

    returned_product_code = str(_first_present(status_response, 'product_code', 'productCode', 'scd')).strip()
    if returned_product_code and returned_product_code != settings.ESEWA_PRODUCT_CODE:
        raise PaymentGatewayError('The verified eSewa merchant code did not match this order.')

    # Prefer the gateway's reference id, then the callback's transaction code.
    reference = str(
        _first_present(status_response, 'ref_id', 'refId')
        or callback.get('transaction_code')
        or transaction_uuid
    ).strip()

    if gateway_status == 'COMPLETE':
        local_status = 'Paid'
        message = 'eSewa payment verified successfully.'
    elif gateway_status in {'PENDING', 'AMBIGUOUS'}:
        local_status = 'Pending'
        message = 'eSewa is still processing this payment.'
    else:
        local_status = 'Failed'
        message = f'eSewa returned {gateway_status} for this payment.'

    return {
        'status': local_status,
        'gateway_status': gateway_status,
        'reference': reference,
        'session_id': transaction_uuid,
        'currency': payment_currency(),
        'message': message,
    }
def initiate_khalti_payment(request, order):
    """Start a Khalti payment for ``order`` and return where to send the payer.

    Returns a dict with the ``payment_url``, the gateway ``session_id``
    (Khalti's ``pidx``), the generated purchase order id as ``reference``,
    and the payment ``currency``.

    Raises ``PaymentGatewayError`` when Khalti is not configured, the amount
    is below 1000 minor units, or the response lacks a URL or ``pidx``.
    """
    if not khalti_configured():
        raise PaymentGatewayError('Khalti is not configured yet.')

    amount_in_paisa = _to_minor_units(order.total_price, payment_currency())
    if amount_in_paisa < 1000:
        raise PaymentGatewayError('Khalti requires a minimum payment amount of Rs. 10.')

    random_suffix = uuid.uuid4().hex[:8].upper()
    purchase_order_id = f"ORDER-{order.id}-{random_suffix}"

    return_base = absolute_url(request, reverse('khalti_return'))
    payload = {
        'return_url': f"{return_base}?order_id={order.id}",
        'website_url': _public_base_url(request),
        'amount': amount_in_paisa,
        'purchase_order_id': purchase_order_id,
        'purchase_order_name': f'Order #{order.id}',
    }

    # Only send the customer details that actually have a value.
    customer_info = {}
    customer_name = order.full_name or order.user.get_full_name() or order.user.username
    if customer_name:
        customer_info['name'] = customer_name
    customer_phone = order.phone
    if customer_phone:
        customer_info['phone'] = customer_phone
    customer_email = order.user.email
    if customer_email:
        customer_info['email'] = customer_email
    payload['customer_info'] = customer_info

    initiate_url = settings.KHALTI_INITIATE_URL
    auth_headers = {'Authorization': f'key {settings.KHALTI_SECRET_KEY}'}
    response = _json_request(initiate_url, method='POST', payload=payload, headers=auth_headers)

    payment_url = str(response.get('payment_url', '')).strip()
    pidx = str(response.get('pidx', '')).strip()
    if not (payment_url and pidx):
        raise PaymentGatewayError('Khalti did not return a payment URL for this order.')

    return {
        'payment_url': payment_url,
        'session_id': pidx,
        'reference': purchase_order_id,
        'currency': payment_currency(),
    }
def _khalti_lookup(pidx):
    """POST ``pidx`` to the Khalti lookup URL and return the parsed JSON response."""
    lookup_url = settings.KHALTI_LOOKUP_URL
    auth_headers = {'Authorization': f'key {settings.KHALTI_SECRET_KEY}'}
    body = {'pidx': pidx}
    return _json_request(lookup_url, method='POST', payload=body, headers=auth_headers)
def verify_khalti_payment(order, pidx):
    """Confirm a Khalti payment for ``order`` via the gateway lookup call.

    ``pidx`` must match ``order.payment_session_id``. When the lookup reports
    an amount or purchase order id, they are compared with the order.

    Returns a dict with ``status`` (``Paid``, ``Pending`` or ``Failed``),
    ``gateway_status``, ``reference``, ``session_id``, ``currency`` and
    ``message``. Raises ``PaymentGatewayError`` when Khalti is not configured
    or any check fails.
    """
    if not khalti_configured():
        raise PaymentGatewayError('Khalti is not configured yet.')

    session_id = (pidx or '').strip()
    if not session_id:
        raise PaymentGatewayError('Khalti did not return a payment session id.')
    expected_session_id = (order.payment_session_id or '').strip()
    if session_id != expected_session_id:
        raise PaymentGatewayError('The returned Khalti session did not match the latest payment attempt for this order.')

    lookup = _khalti_lookup(session_id)
    gateway_status = str(lookup.get('status', '')).strip()
    if not gateway_status:
        raise PaymentGatewayError('Khalti did not return a payment status for this transaction.')

    # The amount is only checked when the lookup actually reports one.
    reported_amount = lookup.get('total_amount')
    if reported_amount not in (None, ''):
        try:
            reported_minor_units = int(reported_amount)
        except (TypeError, ValueError) as exc:
            raise PaymentGatewayError('Khalti returned an invalid payment amount.') from exc
        if reported_minor_units != _to_minor_units(order.total_price, payment_currency()):
            raise PaymentGatewayError('The returned Khalti amount did not match this order total.')

    stored_reference = order.payment_reference
    if stored_reference:
        reported_reference = str(lookup.get('purchase_order_id', '')).strip()
        if reported_reference and reported_reference != stored_reference:
            raise PaymentGatewayError('The returned Khalti purchase order id did not match this order.')

    was_refunded = bool(lookup.get('refunded'))
    transaction_reference = str(lookup.get('transaction_id') or session_id).strip()

    if gateway_status in {'Pending', 'Initiated'}:
        local_status = 'Pending'
        message = 'Khalti is still processing this payment.'
    elif gateway_status == 'Completed' and not was_refunded:
        local_status = 'Paid'
        message = 'Khalti payment verified successfully.'
    else:
        # Includes a completed payment that has since been refunded.
        local_status = 'Failed'
        message = f'Khalti returned {gateway_status} for this payment.'

    return {
        'status': local_status,
        'gateway_status': gateway_status,
        'reference': transaction_reference,
        'session_id': session_id,
        'currency': payment_currency(),
        'message': message,
    }