"""Ghost Node — ORM Models, Heuristic Scoring & DB Seeder."""
from __future__ import annotations

import re
from datetime import datetime
from typing import Optional

from sqlalchemy import Boolean, Column, DateTime, Float, Integer, String, Text
from sqlalchemy.sql import func

from database import Base, SessionLocal, engine


# ── ORM Models ────────────────────────────────────────────────────────────────


def _iso_or_none(moment: Optional[datetime]) -> Optional[str]:
    """Return ``moment.isoformat()`` for a truthy value, otherwise ``None``."""
    if moment:
        return moment.isoformat()
    return None


class Listing(Base):
    """One row of the ``listings`` table."""

    __tablename__ = "listings"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(500), nullable=False)
    price = Column(Float, nullable=True)
    currency = Column(String(10), nullable=True, default="")
    price_raw = Column(String(100), nullable=True, default="")
    time_left = Column(String(60), nullable=True, default="")
    time_left_mins = Column(Float, nullable=True, default=None)
    price_updated_at = Column(DateTime(timezone=True), nullable=True)
    link = Column(String(1000), nullable=False, unique=True)
    score = Column(Integer, default=0)
    keyword = Column(String(200), nullable=True)
    site_name = Column(String(200), nullable=True)
    timestamp = Column(DateTime(timezone=True), server_default=func.now())

    # N16: AI filter results
    # ai_match: 1=match, 0=no match, NULL=not analysed
    ai_match = Column(Integer, nullable=True, default=None)
    ai_reason = Column(String(500), nullable=True, default=None)

    # N4/N7 fields
    # location: lot location (city/country)
    location = Column(String(200), nullable=True, default=None)
    # N4: normalised USD price
    price_usd = Column(Float, nullable=True, default=None)
    # N9+: JSON list of interval minutes already fired, e.g. "[60,30]"
    closing_alerts_sent = Column(Text, nullable=True, default=None)
    # JSON array of image URLs scraped from the lot card
    images = Column(Text, nullable=True, default=None)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the row.

        Empty/NULL text columns become ``""``, the two JSON text columns
        become ``"[]"``, and datetimes are rendered with ``isoformat()``
        (``None`` when unset).
        """
        updated_iso = _iso_or_none(self.price_updated_at)
        seen_iso = _iso_or_none(self.timestamp)
        return {
            "id": self.id,
            "title": self.title,
            "price": self.price,
            "currency": self.currency or "",
            "price_raw": self.price_raw or "",
            "time_left": self.time_left or "",
            "time_left_mins": self.time_left_mins,
            "price_updated_at": updated_iso,
            "link": self.link,
            "score": self.score,
            "keyword": self.keyword,
            "site_name": self.site_name,
            "timestamp": seen_iso,
            "ai_match": self.ai_match,
            "ai_reason": self.ai_reason or "",
            "location": self.location or "",
            "price_usd": self.price_usd,
            "closing_alerts_sent": self.closing_alerts_sent or "[]",
            "images": self.images or "[]",
        }


class Keyword(Base):
    """One row of the ``keywords`` table: a search term and its weight."""

    __tablename__ = "keywords"

    id = Column(Integer, primary_key=True, index=True)
    term = Column(String(200), nullable=False, unique=True)
    weight = Column(Integer, default=1)
    # N16: AI filter
    ai_target = Column(Text, nullable=True, default=None)
    # N7: filter — skip if price below
    min_price = Column(Float, nullable=True, default=None)
    # N7: filter — skip if price above
    max_price = Column(Float, nullable=True, default=None)
    # drag-and-drop position
    sort_order = Column(Integer, nullable=True, default=0)

    def to_dict(self):
        """Return a plain-dict view; NULL ``ai_target``/``sort_order`` become ``""``/``0``."""
        return {
            "id": self.id,
            "term": self.term,
            "weight": self.weight,
            "ai_target": self.ai_target or "",
            "min_price": self.min_price,
            "max_price": self.max_price,
            "sort_order": self.sort_order or 0,
        }


class Config(Base):
    """One row of the ``config`` table: a unique key with a text value."""

    __tablename__ = "config"

    id = Column(Integer, primary_key=True, index=True)
    key = Column(String(100), nullable=False, unique=True)
    value = Column(Text, nullable=True)

    def to_dict(self):
        """Return ``id``, ``key`` and ``value`` as a dict, unmodified."""
        return {"id": self.id, "key": self.key, "value": self.value}


class SiteSelectors(Base):
    """
    N17 — AI-generated CSS selectors per target site.

    Stored after a successful Auto-Adapt run and reused by scrape_site()
    before falling back to the universal JS extractor.
    """

    __tablename__ = "site_selectors"

    id = Column(Integer, primary_key=True, index=True)
    site_id = Column(Integer, nullable=False, unique=True, index=True)

    # Selectors generated by AI
    # container_sel: repeated card/item container; the next four selectors
    # are relative to that container.
    container_sel = Column(String(500), nullable=True)
    title_sel = Column(String(500), nullable=True)
    price_sel = Column(String(500), nullable=True)
    time_sel = Column(String(500), nullable=True)
    # Use "self" if the container itself is the link (presumably an <a>
    # element — the original comment was truncated here).
    link_sel = Column(String(500), nullable=True)
    # for pagination clicks
    next_page_sel = Column(String(500), nullable=True)

    # Quality metrics
    confidence = Column(Float, default=0)  # 0–100 score
    container_count = Column(Integer, default=0)  # items found during validation
    title_rate = Column(Float, default=0)  # % of containers that yielded a title
    price_rate = Column(Float, default=0)  # % of containers that yielded a price

    # Metadata
    provider = Column(String(50), nullable=True)  # groq | ollama
    generated_at = Column(DateTime(timezone=True), nullable=True)
    last_tested_at = Column(DateTime(timezone=True), nullable=True)
    stale = Column(Boolean, default=False)  # True when 0 results returned on live scrape
    notes = Column(Text, nullable=True)  # raw AI response / error notes

    def to_dict(self) -> dict:
        """Return a plain-dict view of the row.

        Selector strings default to ``""``, the float metrics are rounded
        to one decimal place, and ``notes`` is cut to its first 300
        characters.
        """
        notes_preview = (self.notes or "")[:300]
        return {
            "id": self.id,
            "site_id": self.site_id,
            "container_sel": self.container_sel or "",
            "title_sel": self.title_sel or "",
            "price_sel": self.price_sel or "",
            "time_sel": self.time_sel or "",
            "link_sel": self.link_sel or "",
            "next_page_sel": self.next_page_sel or "",
            "confidence": round(self.confidence or 0, 1),
            "container_count": self.container_count or 0,
            "title_rate": round(self.title_rate or 0, 1),
            "price_rate": round(self.price_rate or 0, 1),
            "provider": self.provider or "",
            "generated_at": _iso_or_none(self.generated_at),
            "last_tested_at": _iso_or_none(self.last_tested_at),
            "stale": bool(self.stale),
            "notes": notes_preview,
        }


class TargetSite(Base):
    """One row of the ``target_sites`` table: a site plus health/login state."""

    __tablename__ = "target_sites"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(200), nullable=False)
    url_template = Column(String(1000), nullable=False)
    search_selector = Column(String(200), nullable=True, default="")
    enabled = Column(Integer, default=1)
    max_pages = Column(Integer, default=1)  # N5 pagination
    sort_order = Column(Integer, nullable=True, default=0)  # drag-and-drop position

    # N13 health tracking
    last_error = Column(Text, nullable=True)
    error_count = Column(Integer, default=0)
    consecutive_failures = Column(Integer, default=0)
    last_success_at = Column(DateTime(timezone=True), nullable=True)
    cooldown_until = Column(DateTime(timezone=True), nullable=True)

    # N14 login support
    requires_login = Column(Boolean, default=False)
    login_url = Column(String(500), nullable=True, default="")
    login_check_selector = Column(String(200), nullable=True, default="")
    login_enabled = Column(Boolean, default=True)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the row.

        Integer/boolean flags are coerced with ``bool()``; a NULL
        ``login_enabled`` is reported as ``True``; NULL counters become
        ``0`` and a NULL/zero ``max_pages`` becomes ``1``.
        """
        if self.login_enabled is None:
            login_enabled = True
        else:
            login_enabled = bool(self.login_enabled)
        return {
            "id": self.id,
            "name": self.name,
            "url_template": self.url_template,
            "search_selector": self.search_selector or "",
            "enabled": bool(self.enabled),
            "max_pages": self.max_pages or 1,
            "sort_order": self.sort_order or 0,
            "last_error": self.last_error or "",
            "error_count": self.error_count or 0,
            "consecutive_failures": self.consecutive_failures or 0,
            "last_success_at": _iso_or_none(self.last_success_at),
            "cooldown_until": _iso_or_none(self.cooldown_until),
            "requires_login": bool(self.requires_login),
            "login_url": self.login_url or "",
            "login_check_selector": self.login_check_selector or "",
            "login_enabled": login_enabled,
        }


# ── N6: Scoring Rules (DB-backed, editable via UI) ────────────────────────────
class ScoringRule(Base):
    """Editable scoring signals — replaces hardcoded POSITIVE/NEGATIVE_SIGNALS."""

    __tablename__ = "scoring_rules"

    id = Column(Integer, primary_key=True, index=True)
    signal = Column(String(100), nullable=False)  # token to match (case-insensitive)
    delta = Column(Integer, nullable=False)  # positive = boost, negative = penalty
    category = Column(String(50), nullable=True, default="custom")  # "positive" | "negative" | "custom"
    notes = Column(Text, nullable=True)

    def to_dict(self) -> dict:
        """Return a plain-dict view; NULL ``category``/``notes`` become ``"custom"``/``""``."""
        return {
            "id": self.id,
            "signal": self.signal,
            "delta": self.delta,
            "category": self.category or "custom",
            "notes": self.notes or "",
        }


# Fallback hardcoded signals — used ONLY if scoring_rules table is empty
# (or the database cannot be queried).
_POSITIVE_SIGNALS_FALLBACK = [
    ("GB", 10), ("RAM", 10), ("Unlocked", 10), ("SSD", 10), ("RTX", 10), ("GPU", 10),
    ("S10", 10), ("NVMe", 8), ("OLED", 8), ("5G", 6), ("New", 5), ("Sealed", 5),
]
_NEGATIVE_SIGNALS_FALLBACK = [
    ("Cover", -10), ("Case", -10), ("Sleeve", -10), ("Box Only", -10), ("Broken", -10),
    ("For Parts", -10), ("Cracked", -8), ("Damaged", -8), ("Read", -5), ("Faulty", -10),
]

# Seed data for the scoring_rules table (derived from the old hardcoded lists)
SEED_SCORING_RULES = (
    [(sig, delta, "positive") for sig, delta in _POSITIVE_SIGNALS_FALLBACK]
    + [(sig, delta, "negative") for sig, delta in _NEGATIVE_SIGNALS_FALLBACK]
)


def _sum_signal_deltas(text_upper, signals):
    """Sum the deltas of every ``(token, delta)`` whose token occurs in *text_upper*.

    Matching is a case-insensitive substring test: each token is upper-cased
    and looked up in the already upper-cased text.
    """
    return sum(delta for token, delta in signals if token.upper() in text_upper)


def calculate_attribute_score(text: str, keyword_weight: int = 1) -> int:
    """Score a listing title against DB scoring rules (falls back to hardcoded).

    Args:
        text: Listing title to score. ``None`` is treated as an empty string.
        keyword_weight: Multiplier applied to the summed deltas. Values below
            1 (and ``None``) are treated as 1.

    Returns:
        The sum of matching rule deltas multiplied by the effective weight.

    The rules come from the ``scoring_rules`` table. The hardcoded fallback
    lists are used when the table is empty or when anything goes wrong while
    reading or applying the DB rules. The score is computed from exactly one
    rule set: a failure part-way through the DB path discards that partial
    result instead of adding the fallback deltas on top of it.
    """
    tu = (text or "").upper()
    fallback = _POSITIVE_SIGNALS_FALLBACK + _NEGATIVE_SIGNALS_FALLBACK
    try:
        db = SessionLocal()
        try:
            # Copy plain values out while the session is still open.
            db_signals = [(r.signal, r.delta) for r in db.query(ScoringRule).all()]
        finally:
            db.close()
        # Empty table — use fallback (first startup before seed runs)
        score = _sum_signal_deltas(tu, db_signals or fallback)
    except Exception:
        # Deliberate best-effort: if the DB is unavailable (or a row is
        # unusable), silently score with the fallback lists only.
        score = _sum_signal_deltas(tu, fallback)
    return score * max(1, keyword_weight or 1)


# ── Seed Data ─────────────────────────────────────────────────────────────────

SEED_KEYWORDS = [
    ("RTX 4090", 5), ("RTX 4080", 4), ("RTX 3090", 4), ("Samsung Tab S10", 4),
    ("Samsung Tab S9", 3), ("iPhone 15 Pro", 3), ("MacBook Pro M3", 3),
    ("Steam Deck", 3), ("PS5", 3), ("Xbox Series X", 3), ("AirPods Pro", 2), ("DJI Mini 4", 2),
]

SEED_SITES = [
    {"name": "eBay UK", "url_template": "https://www.ebay.co.uk/sch/i.html?_nkw={keyword}&_sop=10", "search_selector": "#gh-ac"},
    {"name": "eBay US", "url_template": "https://www.ebay.com/sch/i.html?_nkw={keyword}&_sop=10", "search_selector": "#gh-ac"},
    {"name": "ShopGoodwill", "url_template": "https://shopgoodwill.com/home", "search_selector": "input#st"},
]

SEED_CONFIG = [
    ("telegram_token", ""), ("telegram_chat_id", ""), ("timer", "120"),
    ("browser_choice", "auto"), ("incognito_mode", "false"), ("show_browser", "false"),
    ("delay_launch", "0"), ("delay_post_search", "0"), ("delay_page_hold", "0"),
    ("delay_site_open", "0"), ("humanize_level", "heavy"),
    # N2 CAPTCHA
    ("captcha_solver", "none"), ("captcha_api_key", ""),
    # N10 channels
    ("alert_channels", "telegram"), ("discord_webhook", ""),
    # Gmail (simplified email — no SMTP complexity)
    ("gmail_address", ""), ("gmail_app_password", ""), ("email_to", ""),
    # N9 closing alerts
    ("closing_alert_enabled", "false"),
    ("closing_alert_schedule", "30"),  # comma-separated minutes, e.g. "60,30,10,5" or "0" for capture-only
    # N12 DB
    ("db_url", ""),
    # N13 auto-disable
    ("site_auto_disable_after", "5"),
    # N16 AI filter
    ("ai_filter_enabled", "false"),
    ("ai_debug", "false"),
    # N17 Auto-Adapter
    ("auto_adapt_enabled", "false"),
    ("ai_provider", "groq"),
    ("ai_model", "llama-3.3-70b-versatile"),
    ("ai_api_key", ""),
    ("ai_base_url", "http://localhost:11434"),
    # N4 currency display
    ("display_currency", ""),
    # N1 proxy
    ("proxy_enabled", "false"),
    ("proxy_list", ""),
    # Dashboard preference
    ("listing_detail_enabled", "true"),
    # Scoring toggle — "false" = AI is sole judge, score signals ignored
    ("scoring_enabled", "true"),
    # N8 scheduled scraping windows
    ("scrape_window_enabled", "false"),
    ("scrape_start_hour", "8"),  # 0-23, local time
    ("scrape_end_hour", "22"),  # 0-23, local time — engine sleeps outside this window
    ("boost_interval_mins", "2"),  # interval (minutes) to use when a lot closes within 30min
]


# ── Schema Migration ───────────────────────────────────────────────────────────

# Columns added after the first release, per table, as (column, type definition).
# Dict order is the order in which tables are migrated.
_SQLITE_COLUMN_ADDITIONS = {
    "target_sites": [
        ("search_selector", "TEXT DEFAULT ''"),
        ("max_pages", "INTEGER DEFAULT 1"),
        ("sort_order", "INTEGER DEFAULT 0"),
        ("last_error", "TEXT"),
        ("error_count", "INTEGER DEFAULT 0"),
        ("consecutive_failures", "INTEGER DEFAULT 0"),
        ("last_success_at", "DATETIME"),
        ("cooldown_until", "DATETIME"),
        ("requires_login", "BOOLEAN DEFAULT 0"),
        ("login_url", "TEXT DEFAULT ''"),
        ("login_check_selector", "TEXT DEFAULT ''"),
        ("login_enabled", "BOOLEAN DEFAULT 1"),
    ],
    "listings": [
        ("price_raw", "TEXT DEFAULT ''"),
        ("currency", "TEXT DEFAULT ''"),
        ("time_left", "TEXT DEFAULT ''"),
        ("time_left_mins", "REAL DEFAULT NULL"),
        ("price_updated_at", "DATETIME DEFAULT NULL"),
        ("ai_match", "INTEGER DEFAULT NULL"),
        ("ai_reason", "TEXT DEFAULT NULL"),
        ("location", "TEXT DEFAULT NULL"),
        ("price_usd", "REAL DEFAULT NULL"),
        ("closing_alerts_sent", "TEXT DEFAULT NULL"),
        ("images", "TEXT DEFAULT NULL"),
    ],
    "keywords": [
        ("ai_target", "TEXT DEFAULT NULL"),
        ("min_price", "REAL DEFAULT NULL"),
        ("max_price", "REAL DEFAULT NULL"),
        ("sort_order", "INTEGER DEFAULT 0"),
    ],
}

_POSTGRES_COLUMN_ADDITIONS = {
    "target_sites": [
        ("search_selector", "TEXT DEFAULT ''"),
        ("max_pages", "INTEGER DEFAULT 1"),
        ("sort_order", "INTEGER DEFAULT 0"),
        ("last_error", "TEXT"),
        ("error_count", "INTEGER DEFAULT 0"),
        ("consecutive_failures", "INTEGER DEFAULT 0"),
        ("last_success_at", "TIMESTAMP WITH TIME ZONE"),
        ("cooldown_until", "TIMESTAMP WITH TIME ZONE"),
        ("requires_login", "BOOLEAN DEFAULT FALSE"),
        ("login_url", "TEXT DEFAULT ''"),
        ("login_check_selector", "TEXT DEFAULT ''"),
        ("login_enabled", "BOOLEAN DEFAULT TRUE"),
    ],
    "listings": [
        ("price_raw", "TEXT DEFAULT ''"),
        ("currency", "TEXT DEFAULT ''"),
        ("time_left", "TEXT DEFAULT ''"),
        ("time_left_mins", "DOUBLE PRECISION"),
        ("price_updated_at", "TIMESTAMP WITH TIME ZONE"),
        ("ai_match", "INTEGER DEFAULT NULL"),
        ("ai_reason", "TEXT DEFAULT NULL"),
        ("location", "TEXT DEFAULT NULL"),
        ("price_usd", "DOUBLE PRECISION DEFAULT NULL"),
        ("closing_alerts_sent", "TEXT DEFAULT NULL"),
        ("images", "TEXT DEFAULT NULL"),
    ],
    "keywords": [
        ("ai_target", "TEXT DEFAULT NULL"),
        ("min_price", "DOUBLE PRECISION DEFAULT NULL"),
        ("max_price", "DOUBLE PRECISION DEFAULT NULL"),
        ("sort_order", "INTEGER DEFAULT 0"),
    ],
}


def _add_col(cur, conn_or_session, table, col, typedef, dialect="sqlite"):
    """Add column if missing — SQLite and PostgreSQL compatible.

    Args:
        cur: sqlite3 cursor used for the SQLite path; unused otherwise
            (``_migrate_schema`` passes ``None`` for PostgreSQL).
        conn_or_session: sqlite3 connection (SQLite) or SQLAlchemy
            connection (any other dialect); used to execute and commit.
        table: Table name.
        col: Column name to add.
        typedef: SQL type/default clause appended after the column name.
        dialect: ``"sqlite"`` selects the PRAGMA-based path; any other value
            takes the ``information_schema`` path.

    NOTE: *table*, *col* and *typedef* are interpolated into DDL as-is
    (identifiers cannot be bound parameters), so they must be trusted,
    hardcoded values.
    """
    if dialect == "sqlite":
        cur.execute(f"PRAGMA table_info({table})")
        # Row index 1 of PRAGMA table_info is the column name.
        existing = {r[1] for r in cur.fetchall()}
        if col not in existing:
            cur.execute(f"ALTER TABLE {table} ADD COLUMN {col} {typedef}")
            conn_or_session.commit()
            print(f"[GhostNode] 🔧 {table}: +{col}")
        return

    # Local import: only this path needs it. SQLAlchemy 2.0 no longer accepts
    # plain ``str`` statements in ``Connection.execute()``; they must be
    # wrapped in ``text()``.
    from sqlalchemy import text as sql_text

    res = conn_or_session.execute(
        sql_text(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_name = :table_name AND column_name = :column_name"
        ),
        {"table_name": table, "column_name": col},
    )
    if not res.fetchone():
        conn_or_session.execute(
            sql_text(f'ALTER TABLE "{table}" ADD COLUMN IF NOT EXISTS "{col}" {typedef}')
        )
        conn_or_session.commit()
        print(f"[GhostNode] 🔧 {table}: +{col}")


def _migrate_schema() -> None:
    """Add any columns missing from existing tables (best-effort).

    Supports the ``sqlite`` and ``postgresql`` dialects; other dialects and
    in-memory/unnamed SQLite databases are left untouched. Errors are
    printed, not raised.
    """
    dialect = engine.dialect.name
    if dialect == "sqlite":
        import sqlite3

        db_path = engine.url.database
        if not db_path or db_path == ":memory:":
            return
        try:
            conn = sqlite3.connect(db_path)
            try:
                cur = conn.cursor()
                for table, columns in _SQLITE_COLUMN_ADDITIONS.items():
                    for col, td in columns:
                        _add_col(cur, conn, table, col, td)
            finally:
                # Close even when a migration step fails, so the file handle
                # is not leaked.
                conn.close()
        except Exception as exc:
            print(f"[GhostNode] ⚠️ Migration error: {exc}")
    elif dialect == "postgresql":
        try:
            with engine.connect() as sess:
                for table, columns in _POSTGRES_COLUMN_ADDITIONS.items():
                    for col, td in columns:
                        _add_col(None, sess, table, col, td, "postgresql")
        except Exception as exc:
            print(f"[GhostNode] ⚠️ PostgreSQL migration error: {exc}")


def seed_database() -> None:
    """Create tables, run column migrations, and insert default rows.

    Keywords, target sites and scoring rules are seeded only when their
    table is empty. Config keys are added individually when missing, so new
    keys also reach existing databases. Seeding is one transaction: any
    error rolls it back and is printed rather than raised.
    """
    Base.metadata.create_all(bind=engine)
    _migrate_schema()
    db = SessionLocal()
    try:
        if db.query(Keyword).count() == 0:
            for i, (term, w) in enumerate(SEED_KEYWORDS):
                db.add(Keyword(term=term, weight=w, sort_order=i))

        if db.query(TargetSite).count() == 0:
            for i, s in enumerate(SEED_SITES):
                db.add(TargetSite(**s, enabled=1, sort_order=i))

        # Upsert config — add any new keys even on existing DBs
        existing_keys = {r.key for r in db.query(Config).all()}
        for key, value in SEED_CONFIG:
            if key not in existing_keys:
                db.add(Config(key=key, value=value))

        # N6: seed scoring rules from hardcoded fallback if table is empty
        if db.query(ScoringRule).count() == 0:
            for sig, delta, cat in SEED_SCORING_RULES:
                db.add(ScoringRule(signal=sig, delta=delta, category=cat))

        db.commit()
        print("[GhostNode] ✅ Database ready.")
    except Exception as exc:
        db.rollback()
        print(f"[GhostNode] ⚠️ Seed error: {exc}")
    finally:
        db.close()