"""Rule-based risk scoring for submitted URLs and free-text messages.

The public entry point is :func:`scan_content`, which returns a
:class:`ScanResult` holding a 0-100 risk score, the indicators that
contributed to it, and recommended follow-up actions.
"""

import hashlib
import ipaddress
import math
import re
from dataclasses import dataclass
from urllib.parse import urlparse

from .models import ThreatScan

SUSPICIOUS_TLDS = {"zip", "mov", "click", "country", "gq", "tk", "ml", "cf"}
BRAND_TERMS = {"paypal", "microsoft", "google", "apple", "amazon", "bank", "chase", "wellsfargo", "office365"}
URGENCY_TERMS = {"urgent", "immediately", "verify", "suspended", "locked", "limited", "expire", "password", "invoice", "wire", "gift card", "crypto"}
CREDENTIAL_TERMS = {"login", "signin", "sign in", "password", "2fa", "otp", "account", "credentials", "ssn"}
URL_SHORTENERS = {"bit.ly", "tinyurl.com", "t.co", "goo.gl", "ow.ly", "is.gd", "buff.ly", "cutt.ly"}

# Every scan starts from this raw score before indicator weights are added.
_BASE_SCORE = 5
# Scale of the exponential squash applied to the raw score (see scan_content).
_SCORE_SCALE = 85

# Local part of an e-mail-like token: group 1 is its first character,
# group 2 is the "@" separator.
_EMAIL_LOCAL_RE = re.compile(r"([A-Za-z0-9._%+-])[A-Za-z0-9._%+-]*(@)")
# A leading "scheme://" prefix.
_SCHEME_RE = re.compile(r"^[a-z][a-z0-9+.-]*://", re.IGNORECASE)
# Link starts in lowercased text. The lookbehind stops "https://www.x.com"
# from being counted twice (once for the scheme, once for "www.").
_LINK_RE = re.compile(r"https?://|(?<!://)www\.")
_MONEY_RE = re.compile(r"\$\s?\d+|wire transfer|gift card|bitcoin|crypto")
_SCAM_PHRASE_RE = re.compile(r"dear (customer|user|client)|kindly|act now|final notice")


@dataclass
class ScanResult:
    """Outcome of a single :func:`scan_content` call."""

    risk_score: int  # normalized score in the range 0-100
    risk_level: str  # value taken from ThreatScan.RiskLevel
    verdict: str  # one-line human-readable summary for the score
    explanation: str  # fixed description of how the scanner works
    indicators: list[dict]  # {"label", "weight", "detail"} dicts, heaviest first
    recommended_actions: list[str]  # advice matching risk_level
    target_preview: str  # whitespace-collapsed, e-mail-masked, truncated content
    content_hash: str  # SHA-256 hex digest of the raw content


def _hash_content(content: str) -> str:
    """Return the SHA-256 hex digest of *content* encoded as UTF-8."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def _preview(content: str, limit: int = 180) -> str:
    """Return a short, sanitized preview of *content*.

    Whitespace runs are collapsed to single spaces, the local part of any
    e-mail-like token is masked down to its first character, and the result
    is cut to *limit* characters with a trailing ellipsis when truncated.
    """
    clean = re.sub(r"\s+", " ", content).strip()
    # Re-insert the captured first character and "@" so "john@x.com" becomes
    # "j•••@x.com"; the previous replacement dropped both capture groups.
    clean = _EMAIL_LOCAL_RE.sub(r"\1•••\2", clean)
    return clean[:limit] + ("…" if len(clean) > limit else "")


def _add(indicators: list[dict], label: str, weight: int, detail: str) -> None:
    """Append one indicator record to *indicators* in place."""
    indicators.append({"label": label, "weight": weight, "detail": detail})


def _risk_level(score: int) -> str:
    """Map a normalized score to a ``ThreatScan.RiskLevel`` value."""
    if score >= 85:
        return ThreatScan.RiskLevel.CRITICAL
    if score >= 65:
        return ThreatScan.RiskLevel.HIGH
    if score >= 35:
        return ThreatScan.RiskLevel.MEDIUM
    return ThreatScan.RiskLevel.LOW


def _verdict(score: int) -> str:
    """Return the summary sentence for a normalized score.

    Uses the same 85 / 65 / 35 thresholds as :func:`_risk_level`.
    """
    if score >= 85:
        return "Likely malicious — isolate and do not interact"
    if score >= 65:
        return "High-risk suspicious content"
    if score >= 35:
        return "Needs review before trusting"
    return "Low risk based on current signals"


def _actions(level: str) -> list[str]:
    """Return the recommended follow-up actions for a risk level."""
    if level in {ThreatScan.RiskLevel.CRITICAL, ThreatScan.RiskLevel.HIGH}:
        return [
            "Do not click links, download attachments, or enter credentials.",
            "Report this item to your security team or service provider.",
            "If you already interacted, rotate passwords and review account activity.",
        ]
    if level == ThreatScan.RiskLevel.MEDIUM:
        return [
            "Verify the sender/domain through an independent channel.",
            "Hover or inspect links before opening them.",
            "Avoid sharing credentials or payment details until confirmed.",
        ]
    return [
        "No strong malicious signals were found, but continue to verify unexpected requests.",
        "Keep software and browser protections enabled.",
    ]


def _split_url(content: str) -> tuple[str, str, str]:
    """Return ``(scheme, host, path)`` for a URL-like string, all lowercased.

    Input without a ``scheme://`` prefix is parsed as if it started with
    ``//`` so the host lands in the network location rather than the path.
    The host excludes userinfo, port, and IPv6 brackets. All three values
    are empty strings when ``urlparse`` rejects the input.
    """
    candidate = content.strip()
    if not _SCHEME_RE.match(candidate):
        candidate = "//" + candidate.lstrip("/")
    try:
        parsed = urlparse(candidate)
        host = parsed.hostname or ""
    except ValueError:
        # urlparse raises on malformed bracketed hosts; skip host-based checks
        # instead of failing the whole scan.
        return "", "", ""
    return parsed.scheme.lower(), host.rstrip("."), parsed.path.lower()


def _is_ip_host(host: str) -> bool:
    """Return True if *host* is all digits and dots or a valid IP literal."""
    if not host:
        return False
    if host.replace(".", "").isdigit():
        return True
    try:
        ipaddress.ip_address(host)
    except ValueError:
        return False
    return True


def _is_brand_domain(host: str, brand: str) -> bool:
    """Return True if *host* is ``<brand>.com`` or one of its subdomains."""
    official = f"{brand}.com"
    # Require a dot boundary so "evilpaypal.com" is not treated as paypal.com.
    return host == official or host.endswith("." + official)


def _url_indicators(content: str) -> list[dict]:
    """Collect the risk indicators for *content* interpreted as a URL."""
    indicators: list[dict] = []
    scheme, host, path = _split_url(content)
    labels = [part for part in host.split(".") if part]
    tld = labels[-1] if labels else ""

    if scheme == "http":
        _add(indicators, "Unencrypted HTTP", 18, "The URL uses http:// instead of https://.")
    if _is_ip_host(host):
        _add(indicators, "IP address host", 22, "Phishing links often hide behind raw IP addresses.")
    if host in URL_SHORTENERS:
        _add(indicators, "Shortened URL", 20, "Shorteners hide the final destination until opened.")
    if tld in SUSPICIOUS_TLDS:
        _add(indicators, "Higher-risk TLD", 14, f".{tld} domains are frequently abused in commodity phishing.")
    if len(host) > 38 or len(content) > 120:
        _add(indicators, "Long destination", 10, "Very long hosts/URLs can hide deceptive tracking or redirect chains.")
    if "@" in content:
        _add(indicators, "@ symbol in URL", 18, "The @ character can disguise the actual destination host.")
    if host.count("-") >= 2:
        _add(indicators, "Hyphen-heavy domain", 8, "Multiple hyphens can imitate legitimate brand domains.")

    # Sorted so the reported brands do not depend on set iteration order.
    matched_brands = sorted(
        brand for brand in BRAND_TERMS
        if brand in host and not _is_brand_domain(host, brand)
    )
    if matched_brands:
        _add(indicators, "Brand impersonation pattern", 16, f"The host contains sensitive brand terms: {', '.join(matched_brands[:3])}.")
    if any(term in path for term in CREDENTIAL_TERMS):
        _add(indicators, "Credential-themed path", 12, "The path references login, password, or account actions.")
    return indicators


def _message_indicators(content: str) -> list[dict]:
    """Collect the risk indicators for *content* interpreted as message text."""
    indicators: list[dict] = []
    lowered = content.lower()

    # Sorted so the quoted terms do not depend on set iteration order.
    urgent_hits = sorted(term for term in URGENCY_TERMS if term in lowered)
    credential_hits = sorted(term for term in CREDENTIAL_TERMS if term in lowered)
    url_count = len(_LINK_RE.findall(lowered))

    if urgent_hits:
        weight = min(25, 8 + len(urgent_hits) * 4)
        _add(indicators, "Urgency and pressure language", weight, f"Found terms such as {', '.join(urgent_hits[:5])}.")
    if credential_hits:
        weight = min(24, 10 + len(credential_hits) * 3)
        _add(indicators, "Credential request", weight, f"The message asks about {', '.join(credential_hits[:5])}.")
    if _MONEY_RE.search(lowered):
        _add(indicators, "Payment or transfer request", 16, "The message references money movement or irreversible payments.")
    if url_count:
        weight = min(20, url_count * 7)
        _add(indicators, "Embedded link", weight, f"Detected {url_count} link-like item(s) in the message.")
    if _SCAM_PHRASE_RE.search(lowered):
        _add(indicators, "Common scam phrasing", 10, "The wording resembles common phishing templates.")
    if len(content) < 40 and any(term in lowered for term in ("click", "verify", "login")):
        _add(indicators, "Sparse context", 8, "Short messages with action links are harder to verify safely.")
    return indicators


def scan_content(scan_type: str, content: str) -> ScanResult:
    """Score *content* for phishing risk and describe the signals found.

    Args:
        scan_type: ``ThreatScan.ScanType.URL`` selects the URL rules; any
            other value selects the message-text rules.
        content: The raw URL or message text to evaluate.

    Returns:
        A :class:`ScanResult` whose indicators are sorted by descending
        weight. If no rule fired, it holds a single zero-weight
        "No strong threat indicators" entry.
    """
    if scan_type == ThreatScan.ScanType.URL:
        indicators = _url_indicators(content)
    else:
        indicators = _message_indicators(content)

    raw_score = _BASE_SCORE + sum(item["weight"] for item in indicators)
    # Normalize so a pile-up of weak signals does not instantly max out risk.
    score = min(100, max(0, round(100 * (1 - math.exp(-raw_score / _SCORE_SCALE)))))

    if not indicators:
        _add(indicators, "No strong threat indicators", 0, "The scanner did not find obvious phishing markers in this sample.")

    level = _risk_level(score)
    explanation = (
        "This first MVP uses a local heuristic/NLP-style rules engine designed to be replaced or blended "
        "with a trained Scikit-learn model. It does not store the raw submission; the dashboard saves only "
        "a sanitized preview, SHA-256 hash, score, and explanation."
    )
    return ScanResult(
        risk_score=score,
        risk_level=level,
        verdict=_verdict(score),
        explanation=explanation,
        indicators=sorted(indicators, key=lambda item: item["weight"], reverse=True),
        recommended_actions=_actions(level),
        target_preview=_preview(content),
        content_hash=_hash_content(content),
    )