40260-vm/core/scanner.py
Flatlogic Bot 1a0d620188 AI
2026-06-13 09:48:20 +00:00

169 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import math
import re
from dataclasses import dataclass
from urllib.parse import urlparse
from .models import ThreatScan
# Top-level domains that add weight to the "Higher-risk TLD" URL indicator.
SUSPICIOUS_TLDS = {
    "zip",
    "mov",
    "click",
    "country",
    "gq",
    "tk",
    "ml",
    "cf",
}

# Brand-like substrings looked for inside a URL's host.
BRAND_TERMS = {
    "paypal",
    "microsoft",
    "google",
    "apple",
    "amazon",
    "bank",
    "chase",
    "wellsfargo",
    "office365",
}

# Pressure/urgency wording searched for (as substrings) in message text.
URGENCY_TERMS = {
    "urgent",
    "immediately",
    "verify",
    "suspended",
    "locked",
    "limited",
    "expire",
    "password",
    "invoice",
    "wire",
    "gift card",
    "crypto",
}

# Credential-related wording searched for in message text and URL paths.
CREDENTIAL_TERMS = {
    "login",
    "signin",
    "sign in",
    "password",
    "2fa",
    "otp",
    "account",
    "credentials",
    "ssn",
}

# Hosts of link-shortening services (compared against the full host).
URL_SHORTENERS = {
    "bit.ly",
    "tinyurl.com",
    "t.co",
    "goo.gl",
    "ow.ly",
    "is.gd",
    "buff.ly",
    "cutt.ly",
}
@dataclass
class ScanResult:
    """Outcome of one heuristic scan, as assembled by ``scan_content``."""

    # Normalised risk score, clamped to the 0-100 range by the scanner.
    risk_score: int
    # Risk band derived from ``risk_score``.
    risk_level: str
    # Short human-readable summary of the score.
    verdict: str
    # Fixed description of how the heuristic engine works.
    explanation: str
    # Indicator records, each a dict with "label", "weight" and "detail" keys.
    indicators: list[dict]
    # Follow-up advice selected from the risk band.
    recommended_actions: list[str]
    # Sanitised, length-limited excerpt of the submitted content.
    target_preview: str
    # SHA-256 hex digest of the submitted content.
    content_hash: str
def _hash_content(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def _preview(content: str, limit: int = 180) -> str:
clean = re.sub(r"\s+", " ", content).strip()
clean = re.sub(r"([A-Za-z0-9._%+-])[A-Za-z0-9._%+-]*(@)", r"•••", clean)
return clean[:limit] + ("" if len(clean) > limit else "")
def _add(indicators: list[dict], label: str, weight: int, detail: str):
indicators.append({"label": label, "weight": weight, "detail": detail})
def _risk_level(score: int) -> str:
    """Map a numeric score to its ``ThreatScan.RiskLevel`` band.

    Bands, checked from the top: >= 85 critical, >= 65 high, >= 35 medium,
    anything lower is low.
    """
    bands = (
        (85, ThreatScan.RiskLevel.CRITICAL),
        (65, ThreatScan.RiskLevel.HIGH),
        (35, ThreatScan.RiskLevel.MEDIUM),
    )
    for floor, band in bands:
        if score >= floor:
            return band
    return ThreatScan.RiskLevel.LOW
def _verdict(score: int) -> str:
if score >= 85:
return "Likely malicious — isolate and do not interact"
if score >= 65:
return "High-risk suspicious content"
if score >= 35:
return "Needs review before trusting"
return "Low risk based on current signals"
def _actions(level: str) -> list[str]:
    """Return the recommended follow-up steps for a risk band.

    Critical and high share one list, medium has its own, and every other
    value falls through to the low-risk advice.
    """
    severe_levels = {ThreatScan.RiskLevel.CRITICAL, ThreatScan.RiskLevel.HIGH}
    if level in severe_levels:
        advice = [
            "Do not click links, download attachments, or enter credentials.",
            "Report this item to your security team or service provider.",
            "If you already interacted, rotate passwords and review account activity.",
        ]
    elif level == ThreatScan.RiskLevel.MEDIUM:
        advice = [
            "Verify the sender/domain through an independent channel.",
            "Hover or inspect links before opening them.",
            "Avoid sharing credentials or payment details until confirmed.",
        ]
    else:
        advice = [
            "No strong malicious signals were found, but continue to verify unexpected requests.",
            "Keep software and browser protections enabled.",
        ]
    return advice
def _split_url(content: str) -> tuple[str, str, str]:
    """Split a submitted URL into lower-cased ``(scheme, host, path)`` parts.

    Input without a ``scheme://`` prefix (``example.com/login``) is parsed
    as if it started with ``//`` so the host lands in the authority part
    rather than in the path. User-info and port are stripped from the host.
    Malformed input never raises; it is split with a plain regex instead.
    """
    candidate = content.strip()
    if not re.match(r"[A-Za-z][A-Za-z0-9+.-]*://", candidate):
        candidate = "//" + candidate.lstrip("/")
    try:
        parsed = urlparse(candidate)
        return parsed.scheme.lower(), (parsed.hostname or "").lower(), parsed.path.lower()
    except ValueError:
        # urlparse rejects some authorities (e.g. unbalanced "[" brackets);
        # this is untrusted input, so degrade to a best-effort split.
        pieces = re.match(r"(?:([A-Za-z][A-Za-z0-9+.-]*):)?//([^/?#]*)([^?#]*)", candidate)
        if pieces is None:
            return "", "", ""
        host = pieces.group(2).rpartition("@")[2].strip("[]").split(":")[0]
        return (pieces.group(1) or "").lower(), host.lower(), pieces.group(3).lower()


def _url_indicators(content: str) -> list[dict]:
    """Collect the heuristic indicators that apply to a URL submission."""
    found: list[dict] = []
    scheme, host, path = _split_url(content)
    labels = [part for part in host.split(".") if part]
    tld = labels[-1] if labels else ""

    if scheme == "http":
        _add(found, "Unencrypted HTTP", 18, "The URL uses http:// instead of https://.")
    # Digits-and-dots only covers dotted quads as well as bare decimal hosts.
    if host.replace(".", "").isdigit():
        _add(found, "IP address host", 22, "Phishing links often hide behind raw IP addresses.")
    if host in URL_SHORTENERS:
        _add(found, "Shortened URL", 20, "Shorteners hide the final destination until opened.")
    if tld in SUSPICIOUS_TLDS:
        _add(found, "Higher-risk TLD", 14, f".{tld} domains are frequently abused in commodity phishing.")
    if len(host) > 38 or len(content) > 120:
        _add(found, "Long destination", 10, "Very long hosts/URLs can hide deceptive tracking or redirect chains.")
    if "@" in content:
        _add(found, "@ symbol in URL", 18, "The @ character can disguise the actual destination host.")
    if host.count("-") >= 2:
        _add(found, "Hyphen-heavy domain", 8, "Multiple hyphens can imitate legitimate brand domains.")

    # A brand term is only considered legitimate on "<brand>.com" itself or
    # one of its subdomains; a bare suffix test would also accept look-alikes
    # such as "evilpaypal.com". Sorted so the detail text is reproducible
    # (set iteration order varies between interpreter runs).
    matched_brands = sorted(
        brand
        for brand in BRAND_TERMS
        if brand in host
        and host != f"{brand}.com"
        and not host.endswith(f".{brand}.com")
    )
    if matched_brands:
        _add(found, "Brand impersonation pattern", 16, f"The host contains sensitive brand terms: {', '.join(matched_brands[:3])}.")
    if any(term in path for term in CREDENTIAL_TERMS):
        _add(found, "Credential-themed path", 12, "The path references login, password, or account actions.")
    return found


def _text_indicators(content: str) -> list[dict]:
    """Collect the heuristic indicators that apply to a free-text submission."""
    found: list[dict] = []
    lowered = content.lower()

    # Sorted for a deterministic detail string; the sources are sets.
    urgent_hits = sorted(term for term in URGENCY_TERMS if term in lowered)
    credential_hits = sorted(term for term in CREDENTIAL_TERMS if term in lowered)
    mentions_money = re.search(r"\$\s?\d+|wire transfer|gift card|bitcoin|crypto", lowered)
    # The optional "www." after the scheme keeps "https://www.x" as one link.
    url_count = len(re.findall(r"https?://(?:www\.)?|www\.", lowered))

    if urgent_hits:
        weight = min(25, 8 + len(urgent_hits) * 4)
        _add(found, "Urgency and pressure language", weight, f"Found terms such as {', '.join(urgent_hits[:5])}.")
    if credential_hits:
        weight = min(24, 10 + len(credential_hits) * 3)
        _add(found, "Credential request", weight, f"The message asks about {', '.join(credential_hits[:5])}.")
    if mentions_money:
        _add(found, "Payment or transfer request", 16, "The message references money movement or irreversible payments.")
    if url_count:
        _add(found, "Embedded link", min(20, url_count * 7), f"Detected {url_count} link-like item(s) in the message.")
    if re.search(r"dear (customer|user|client)|kindly|act now|final notice", lowered):
        _add(found, "Common scam phrasing", 10, "The wording resembles common phishing templates.")
    if len(content) < 40 and any(term in lowered for term in ("click", "verify", "login")):
        _add(found, "Sparse context", 8, "Short messages with action links are harder to verify safely.")
    return found


def scan_content(scan_type: str, content: str) -> ScanResult:
    """Score *content* with local heuristics and package the outcome.

    Args:
        scan_type: ``ThreatScan.ScanType.URL`` selects the URL rules; any
            other value is scanned as message text.
        content: The raw URL or message submitted for scanning.

    Returns:
        A ``ScanResult`` holding the normalised 0-100 score, its risk band,
        verdict, indicators (heaviest first), recommended actions, a
        sanitised preview and the SHA-256 hash of *content*.
    """
    if scan_type == ThreatScan.ScanType.URL:
        indicators = _url_indicators(content)
    else:
        indicators = _text_indicators(content)

    # Baseline of 5 plus every indicator weight; deriving the total from the
    # records keeps the score and the reported weights from drifting apart.
    raw_score = 5 + sum(item["weight"] for item in indicators)

    # Normalize so a pile-up of weak signals does not instantly max out risk.
    # NOTE(review): with the current weights the raw total is at most about
    # 143 for URLs and 108 for text, which this curve maps to roughly 81 and
    # 72, so the >= 85 band appears unreachable; confirm the calibration.
    score = min(100, max(0, round(100 * (1 - math.exp(-raw_score / 85)))))

    if not indicators:
        _add(indicators, "No strong threat indicators", 0, "The scanner did not find obvious phishing markers in this sample.")

    level = _risk_level(score)
    explanation = (
        "This first MVP uses a local heuristic/NLP-style rules engine designed to be replaced or blended "
        "with a trained Scikit-learn model. It does not store the raw submission; the dashboard saves only "
        "a sanitized preview, SHA-256 hash, score, and explanation."
    )
    return ScanResult(
        risk_score=score,
        risk_level=level,
        verdict=_verdict(score),
        explanation=explanation,
        indicators=sorted(indicators, key=lambda item: item["weight"], reverse=True),
        recommended_actions=_actions(level),
        target_preview=_preview(content),
        content_hash=_hash_content(content),
    )