# Heuristic threat scanner: scores a URL or a block of message text against
# keyword/pattern rules and returns the outcome as a ScanResult.
import hashlib
|
||
import math
|
||
import re
|
||
from dataclasses import dataclass
|
||
from urllib.parse import urlparse
|
||
|
||
from .models import ThreatScan
|
||
|
||
# Final host labels (no leading dot) that earn the "Higher-risk TLD" indicator.
SUSPICIOUS_TLDS = {
    "zip", "mov", "click", "country", "gq", "tk", "ml", "cf",
}

# Substrings looked for inside a URL's host when checking for brand look-alikes.
BRAND_TERMS = {
    "paypal", "microsoft", "google", "apple", "amazon",
    "bank", "chase", "wellsfargo", "office365",
}

# Substrings searched for in lower-cased message text to detect pressure language.
URGENCY_TERMS = {
    "urgent", "immediately", "verify", "suspended", "locked", "limited",
    "expire", "password", "invoice", "wire", "gift card", "crypto",
}

# Substrings searched for in a URL path or in message text to detect credential requests.
CREDENTIAL_TERMS = {
    "login", "signin", "sign in", "password", "2fa",
    "otp", "account", "credentials", "ssn",
}

# Hosts compared by exact match to recognise link-shortening services.
URL_SHORTENERS = {
    "bit.ly", "tinyurl.com", "t.co", "goo.gl",
    "ow.ly", "is.gd", "buff.ly", "cutt.ly",
}
|
||
|
||
|
||
@dataclass
class ScanResult:
    """Outcome of one ``scan_content`` call.

    Field order is part of the constructor signature and must not change.
    """

    # Normalised score; scan_content clamps it to the 0-100 range.
    risk_score: int
    # Tier returned by _risk_level for that score.
    risk_level: str
    # One-line human-readable judgement returned by _verdict.
    verdict: str
    # Fixed descriptive text about how the scan was produced.
    explanation: str
    # Dicts with "label", "weight" and "detail" keys, as built by _add.
    indicators: list[dict]
    # Follow-up advice returned by _actions for the risk level.
    recommended_actions: list[str]
    # Whitespace-collapsed, email-masked, truncated excerpt of the input.
    target_preview: str
    # SHA-256 hex digest of the raw input.
    content_hash: str
|
||
|
||
|
||
def _hash_content(content: str) -> str:
|
||
return hashlib.sha256(content.encode("utf-8")).hexdigest()
|
||
|
||
|
||
def _preview(content: str, limit: int = 180) -> str:
|
||
clean = re.sub(r"\s+", " ", content).strip()
|
||
clean = re.sub(r"([A-Za-z0-9._%+-])[A-Za-z0-9._%+-]*(@)", r"•••", clean)
|
||
return clean[:limit] + ("…" if len(clean) > limit else "")
|
||
|
||
|
||
def _add(indicators: list[dict], label: str, weight: int, detail: str):
|
||
indicators.append({"label": label, "weight": weight, "detail": detail})
|
||
|
||
|
||
def _risk_level(score: int) -> str:
    """Map a normalised score to a ``ThreatScan.RiskLevel`` member.

    Tiers: 85 and above is CRITICAL, 65 and above is HIGH, 35 and above is
    MEDIUM, anything lower is LOW.
    """
    tiers = ThreatScan.RiskLevel
    # Ordered from the highest floor down; the first floor the score reaches wins.
    ladder = (
        (85, tiers.CRITICAL),
        (65, tiers.HIGH),
        (35, tiers.MEDIUM),
    )
    for floor, tier in ladder:
        if score >= floor:
            return tier
    return tiers.LOW
|
||
|
||
|
||
def _verdict(score: int) -> str:
|
||
if score >= 85:
|
||
return "Likely malicious — isolate and do not interact"
|
||
if score >= 65:
|
||
return "High-risk suspicious content"
|
||
if score >= 35:
|
||
return "Needs review before trusting"
|
||
return "Low risk based on current signals"
|
||
|
||
|
||
def _actions(level: str) -> list[str]:
    """Return the recommended follow-up steps for a risk level.

    CRITICAL and HIGH share one list, MEDIUM has its own, and every other
    value receives the low-risk advice. A new list is built on each call.
    """
    tiers = ThreatScan.RiskLevel
    severe = {tiers.CRITICAL, tiers.HIGH}

    if level in severe:
        steps = [
            "Do not click links, download attachments, or enter credentials.",
            "Report this item to your security team or service provider.",
            "If you already interacted, rotate passwords and review account activity.",
        ]
    elif level == tiers.MEDIUM:
        steps = [
            "Verify the sender/domain through an independent channel.",
            "Hover or inspect links before opening them.",
            "Avoid sharing credentials or payment details until confirmed.",
        ]
    else:
        steps = [
            "No strong malicious signals were found, but continue to verify unexpected requests.",
            "Keep software and browser protections enabled.",
        ]
    return steps
|
||
|
||
|
||
def _flag(indicators: list[dict], label: str, weight: int, detail: str) -> int:
    """Record an indicator via ``_add`` and return its weight for accumulation."""
    _add(indicators, label, weight, detail)
    return weight


def _split_url(content: str) -> tuple[str, str, str]:
    """Split a possibly scheme-less URL into lower-cased (scheme, host, path).

    The host has any ``user:pass@`` prefix and ``:port`` suffix removed.
    Input that ``urlparse`` rejects with ``ValueError`` is split by hand
    instead of aborting the scan.
    """
    candidate = content.strip()
    try:
        parsed = urlparse(candidate)
        if not parsed.netloc:
            # Without "//" urlparse sees "evil.tk/login" as a bare path and
            # "evil.tk:8080/x" as scheme "evil.tk"; re-parse with an explicit
            # authority marker so the host is recognised as such.
            if parsed.scheme in ("http", "https"):
                remainder = candidate.partition(":")[2].lstrip("/")
                candidate = f"{parsed.scheme}://{remainder}"
            else:
                candidate = "//" + candidate.lstrip("/")
            parsed = urlparse(candidate)
        return parsed.scheme.lower(), (parsed.hostname or "").lower(), parsed.path.lower()
    except ValueError:
        # Crude fallback for input urlparse refuses: everything after the
        # first "/" is treated as the path.
        scheme, marker, rest = candidate.partition("://")
        if not marker:
            scheme, rest = "", candidate
        authority, slash, tail = rest.partition("/")
        host = authority.rpartition("@")[2].partition(":")[0]
        return scheme.lower(), host.lower(), (slash + tail).lower()


def _score_url(content: str, indicators: list[dict]) -> int:
    """Apply the URL rules to *content*; append indicators and return the raw points."""
    scheme, host, path = _split_url(content)
    labels = [part for part in host.split(".") if part]
    tld = labels[-1] if labels else ""
    points = 0

    if scheme == "http":
        points += _flag(indicators, "Unencrypted HTTP", 18, "The URL uses http:// instead of https://.")
    # Covers dotted quads ("10.0.0.1") as well as single-integer hosts ("3232235777").
    if host.replace(".", "").isdigit():
        points += _flag(indicators, "IP address host", 22, "Phishing links often hide behind raw IP addresses.")
    if host in URL_SHORTENERS:
        points += _flag(indicators, "Shortened URL", 20, "Shorteners hide the final destination until opened.")
    if tld in SUSPICIOUS_TLDS:
        points += _flag(indicators, "Higher-risk TLD", 14, f".{tld} domains are frequently abused in commodity phishing.")
    if len(host) > 38 or len(content) > 120:
        points += _flag(indicators, "Long destination", 10, "Very long hosts/URLs can hide deceptive tracking or redirect chains.")
    if "@" in content:
        points += _flag(indicators, "@ symbol in URL", 18, "The @ character can disguise the actual destination host.")
    if host.count("-") >= 2:
        points += _flag(indicators, "Hyphen-heavy domain", 8, "Multiple hyphens can imitate legitimate brand domains.")

    # Only "<brand>.com" itself or a true subdomain of it is exempt; a plain
    # suffix test would also wave through look-alikes such as "evilpaypal.com".
    # Sorted so the reported terms do not depend on set iteration order.
    matched_brands = sorted(
        brand
        for brand in BRAND_TERMS
        if brand in host
        and host != f"{brand}.com"
        and not host.endswith(f".{brand}.com")
    )
    if matched_brands:
        points += _flag(indicators, "Brand impersonation pattern", 16, f"The host contains sensitive brand terms: {', '.join(matched_brands[:3])}.")
    if any(term in path for term in CREDENTIAL_TERMS):
        points += _flag(indicators, "Credential-themed path", 12, "The path references login, password, or account actions.")
    return points


def _score_text(content: str, indicators: list[dict]) -> int:
    """Apply the message-text rules to *content*; append indicators and return the raw points."""
    lowered = content.lower()
    points = 0

    # Sorted so the terms quoted in the detail text are stable between runs.
    urgent_hits = sorted(term for term in URGENCY_TERMS if term in lowered)
    credential_hits = sorted(term for term in CREDENTIAL_TERMS if term in lowered)
    url_count = len(re.findall(r"https?://|www\.", lowered))

    if urgent_hits:
        points += _flag(indicators, "Urgency and pressure language", min(25, 8 + len(urgent_hits) * 4), f"Found terms such as {', '.join(urgent_hits[:5])}.")
    if credential_hits:
        points += _flag(indicators, "Credential request", min(24, 10 + len(credential_hits) * 3), f"The message asks about {', '.join(credential_hits[:5])}.")
    if re.search(r"\$\s?\d+|wire transfer|gift card|bitcoin|crypto", lowered):
        points += _flag(indicators, "Payment or transfer request", 16, "The message references money movement or irreversible payments.")
    if url_count:
        points += _flag(indicators, "Embedded link", min(20, url_count * 7), f"Detected {url_count} link-like item(s) in the message.")
    if re.search(r"dear (customer|user|client)|kindly|act now|final notice", lowered):
        points += _flag(indicators, "Common scam phrasing", 10, "The wording resembles common phishing templates.")
    if len(content) < 40 and any(term in lowered for term in ("click", "verify", "login")):
        points += _flag(indicators, "Sparse context", 8, "Short messages with action links are harder to verify safely.")
    return points


def scan_content(scan_type: str, content: str) -> ScanResult:
    """Score *content* with the rule set for its scan type and build a ScanResult.

    Args:
        scan_type: Compared against ``ThreatScan.ScanType.URL``; any other
            value is handled as free-form message text.
        content: The raw URL or message text to analyse.

    Returns:
        A ``ScanResult`` whose indicators are sorted by descending weight and
        whose score is normalised to the 0-100 range.
    """
    indicators: list[dict] = []
    score = 5  # every sample starts from a small baseline

    if scan_type == ThreatScan.ScanType.URL:
        score += _score_url(content, indicators)
    else:
        score += _score_text(content, indicators)

    # Normalize so a pile-up of weak signals does not instantly max out risk.
    # NOTE(review): with the weights in this file the raw total appears to top
    # out at 143 (URL) / 108 (text), which this curve maps to 81 / 72, so the
    # >= 85 tier looks unreachable. Confirm the intended calibration.
    score = min(100, max(0, round(100 * (1 - math.exp(-score / 85)))))
    if not indicators:
        _add(indicators, "No strong threat indicators", 0, "The scanner did not find obvious phishing markers in this sample.")

    level = _risk_level(score)
    explanation = (
        "This first MVP uses a local heuristic/NLP-style rules engine designed to be replaced or blended "
        "with a trained Scikit-learn model. It does not store the raw submission; the dashboard saves only "
        "a sanitized preview, SHA-256 hash, score, and explanation."
    )
    return ScanResult(
        risk_score=score,
        risk_level=level,
        verdict=_verdict(score),
        explanation=explanation,
        indicators=sorted(indicators, key=lambda item: item["weight"], reverse=True),
        recommended_actions=_actions(level),
        target_preview=_preview(content),
        content_hash=_hash_content(content),
    )
|