#   4) longest text block in main content area (fallback)
# Page-side extractor for the lot/item description text. Strategies, in order:
#   1) JSON-LD <script> blocks (including @graph arrays)
#   2) <meta property="og:description">
#   3) known per-site description selectors (eBay, HiBid, ShopGoodwill, generic)
#   4) the longest <p> inside the main content area
# Returns a plain string, max 1500 chars (enough for AI, cheap on tokens).
JS_DETAIL_TEXT = r"""() => {
const MAX = 1500;
function clean(s) {
return (s || '').replace(/\s+/g, ' ').trim().slice(0, MAX);
}
// 1. JSON-LD
try {
const scripts = document.querySelectorAll('script[type="application/ld+json"]');
for (const s of scripts) {
const d = JSON.parse(s.textContent || '{}');
const desc = d.description || (Array.isArray(d['@graph']) && d['@graph'].find(x => x.description)?.description);
if (desc && desc.length > 20) return clean(desc);
}
} catch(e) {}
// 2. OG meta
const og = document.querySelector('meta[property="og:description"]');
if (og && og.content && og.content.length > 20) return clean(og.content);
// 3. Known selectors (eBay, HiBid, ShopGoodwill, generic)
const sel = [
'[data-testid="x-item-description"] iframe', // eBay (inner frame — skipped gracefully)
'.item-description', '.lot-description', '.description-text',
'#desc_div', '#ItemDescription', '#item-description',
'[itemprop="description"]', '.product-description',
'.auction-description', '.lot-details', '.listing-description',
];
for (const s of sel) {
const el = document.querySelector(s);
if (el && el.innerText && el.innerText.trim().length > 20)
return clean(el.innerText);
}
// 4. Largest text block in main content
const candidates = Array.from(document.querySelectorAll('main p, article p, [role="main"] p, .content p, #content p'));
if (candidates.length) {
const longest = candidates.reduce((a, b) => a.innerText.length > b.innerText.length ? a : b);
if (longest.innerText.trim().length > 40) return clean(longest.innerText);
}
return '';
}"""
async def _fetch_listing_images_batch(page_context, new_links: list, db) -> int:
    """
    Visit each new listing's detail page to extract the full image gallery
    AND lot description text (N18).

    Called immediately after the initial scrape so images and descriptions
    are available right away instead of waiting for the 5-minute price
    refresh.

    Args:
        page_context: Playwright BrowserContext (from page.context)
        new_links: list of (listing_id, link) tuples
        db: SQLAlchemy session

    Returns:
        Number of listings updated with images and/or a description.
    """
    updated = 0
    for listing_id, link in new_links:
        if not link or link.startswith("no-link"):
            continue  # synthetic placeholder link — no detail page exists
        dp = None  # bound before try so the finally-close can never NameError
        try:
            dp = await page_context.new_page()
            # Abort heavy asset requests — only the DOM/JSON payload is needed.
            await dp.route(
                "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,mp4,webp}",
                lambda route: route.abort(),
            )
            await dp.goto(link, timeout=25_000, wait_until="domcontentloaded")
            # Smart wait — Apollo cache polling for GraphQL SPAs
            _lot_id_m = re.search(r'/(?:lot|item|product)/(\d+)', link, re.IGNORECASE)
            _lot_id = _lot_id_m.group(1) if _lot_id_m else None
            try:
                await dp.wait_for_function(JS_APOLLO_WAIT, arg=_lot_id,
                                           timeout=8000, polling=200)
            except Exception:
                pass  # best-effort — static sites have no Apollo cache to poll
            await dp.wait_for_timeout(1200)
            img_urls = await dp.evaluate(JS_DETAIL_IMAGES)
            desc_text = await dp.evaluate(JS_DETAIL_TEXT)
            await dp.close()
            dp = None  # closed cleanly — the finally-close becomes a no-op
            # Save images and/or description — at least one must be present to write.
            if img_urls or desc_text:
                listing = db.query(Listing).filter(Listing.id == listing_id).first()
                if listing:
                    if img_urls:
                        listing.images = json.dumps(img_urls[:10])
                    if desc_text:
                        listing.description = desc_text[:1500]
                        # N18: Re-run AI with description for better accuracy.
                        # Only re-analyse listings that passed on title alone (ai_match=1)
                        # and have a keyword ai_target — rejected lots stay rejected.
                        _ai_en = _get_config("ai_filter_enabled", "false").lower() == "true"
                        if _ai_en and listing.ai_match == 1 and listing.keyword:
                            try:
                                _kw = db.query(Keyword).filter(Keyword.term == listing.keyword).first()
                                _tgt = (_kw.ai_target or "").strip() if _kw else ""
                                if _tgt:
                                    _match2, _reason2 = await _ai_analyze(listing.title, _tgt, desc_text)
                                    listing.ai_match = 1 if _match2 else 0
                                    listing.ai_reason = (_reason2 or "")[:200]
                                    print(f"[AI+Desc] {'✅' if _match2 else '❌'} {listing.title[:40]} → {_reason2}")
                            except Exception as _ae:
                                # Re-analysis is best-effort; keep the title-only verdict.
                                print(f"[AI+Desc] ⚠️ re-analysis failed: {_ae}")
                    db.flush()
                    db.commit()
                    updated += 1
                    _img_n = len(img_urls) if img_urls else 0
                    _desc_n = len(desc_text) if desc_text else 0
                    print(f"[Detail] 🖼️ {listing.title[:35]} → {_img_n} img(s), {_desc_n} desc chars")
        except Exception as exc:
            # One bad detail page never aborts the whole batch.
            print(f"[Images] ⚠️ {link[:55]}: {exc}")
            continue
        finally:
            # Guarantee the detail page is closed on every exit path.
            if dp is not None:
                try:
                    await dp.close()
                except Exception:
                    pass
    return updated
async def scrape_site(
    page,
    site: TargetSite,
    keyword: Keyword,
    db: Session,
    delay_post_search: int = 0,
    delay_page_hold: int = 0,
    delay_site_open: int = 0,
    is_first_keyword: bool = False,
    humanize_level: str = "heavy",  # raw | low | medium | heavy
    round_item_id: int | None = None,
) -> dict:
    """
    Navigate to a target site and scrape results for one keyword.

    Navigation mode is determined automatically by whether url_template
    contains the literal string '{keyword}':

    Mode A — Direct Template ({keyword} present)
        The URL is built by substituting {keyword} and navigated to directly.
        This is the fast path for sites with stable search-result URLs
        (eBay, Amazon, etc.). If a search_selector is also provided it is
        used only as a fallback when the direct navigation fails.

    Mode B — Homepage Search (no {keyword} in url_template)
        The bot navigates to url_template as a landing page, then runs
        Search Discovery to locate the search input automatically using
        ARIA roles, placeholder text, and label associations before
        falling back to search_selector if one is stored.
        search_selector is optional — leave it blank and Ghost Node will
        auto-discover the search box on any 2026-era website.

    Returns:
        {
            status: "done" | "pending",
            new_count: int,
            pending_reason: str | None
        }
    """
    new_count = 0
    pending_reason: str | None = None

    def _update_round_item(_status: str, _reason: str | None) -> None:
        """Best-effort update of a scrape_round_items row for this keyword attempt."""
        if round_item_id is None:
            return
        try:
            ri = db.query(ScrapeRoundItem).filter(ScrapeRoundItem.id == round_item_id).first()
            if not ri:
                return
            ri.status = _status
            ri.last_error = _reason
            ri.last_attempt_at = datetime.now()
            if _status == "pending" and not ri.first_pending_at:
                ri.first_pending_at = datetime.now()
            db.flush()
            db.commit()
        except Exception as _e:
            print(f"[RoundItems] ⚠️ Could not update round item id={round_item_id}: {_e}")

    _new_listing_links = []  # (listing_id, link) — for detail-page image fetch
    is_direct_mode = "{keyword}" in site.url_template

    # ── N14: Login guard — check BEFORE any navigation ───────────────────────
    # If the site requires login and the feature is enabled, verify the
    # browser session is authenticated. If not logged in, skip this site
    # and Telegram-alert the user to use the 🔑 Login button in the dashboard.
    if site.requires_login and site.login_enabled:
        already_logged_in = await _check_login_status(page, site)
        if not already_logged_in:
            msg = (
                f"🔑 Login required — {site.name}\n"
                f"Ghost Node is not logged into this site.\n"
                f"Open the Dashboard → Sites tab and press the 🔑 Login button "
                f"next to {site.name}, then log in manually.\n"
                f"Scraping this site is paused until you log in."
            )
            asyncio.create_task(send_alert(msg, subject=f"Login required — {site.name}"))
            print(f"[Login] ⛔ {site.name}: not logged in — skipping. Use dashboard 🔑 Login button.")
            pending_reason = "login required (not logged in)"
            _update_round_item("pending", pending_reason)
            return {"status": "pending", "new_count": new_count, "pending_reason": pending_reason}

    try:
        # ── MODE A: Direct template navigation ───────────────────────────────
        if is_direct_mode:
            target_url = site.url_template.replace(
                "{keyword}", keyword.term.replace(" ", "+")
            )
            print(
                f"[Scraper] MODE=DIRECT {site.name} | "
                f"'{keyword.term}' → {target_url}"
            )
            # Parse the template URL once, unconditionally — the referer for
            # the main goto below needs _parsed even when the homepage
            # pre-visit branch is skipped (raw/low humanize levels).
            from urllib.parse import urlparse as _uparse
            _parsed = _uparse(site.url_template)
            # ── Homepage pre-visit (medium / heavy only) ─────────────────────
            # Raw/low jump straight to the results URL.
            # Medium/heavy visit the homepage first for a natural referer chain.
            if is_first_keyword and humanize_level.strip().lower() in ("medium", "heavy"):
                try:
                    _homepage = f"{_parsed.scheme}://{_parsed.netloc}/"
                    print(f"[Scraper] 🏠 Pre-visiting homepage: {_homepage}")
                    await page.goto(_homepage, timeout=30_000,
                                    wait_until="domcontentloaded",
                                    referer="https://www.google.com/")
                    # Human idle on homepage — as if glancing at the front page
                    await asyncio.sleep(_jitter(2.5, pct=0.5))
                    await _human_mouse(page)
                    await asyncio.sleep(_jitter(1.2, pct=0.4))
                    await _human_scroll(page, steps=random.randint(1, 3))
                    await asyncio.sleep(_jitter(1.0, pct=0.4))
                except Exception as hp_exc:
                    print(f"[Scraper] ⚠️ Homepage pre-visit skipped: {hp_exc}")
            try:
                await page.goto(target_url, timeout=60_000, wait_until="networkidle",
                                referer=f"https://{_parsed.netloc}/" if is_first_keyword else "https://www.google.com/")
            except Exception as goto_exc:
                print(
                    f"[Scraper] ⚠️ networkidle timeout for {site.name}, "
                    f"retrying with domcontentloaded: {goto_exc}"
                )
                await page.goto(
                    target_url, timeout=60_000, wait_until="domcontentloaded"
                )
            # ── N2: CAPTCHA check after navigation ────────────────────────────
            if await _detect_captcha(page):
                print(f"[CAPTCHA] 🤖 Detected on {site.name} — attempting solve…")
                _solver = _get_config("captcha_solver", "none").strip().lower()
                _api_key = _get_config("captcha_api_key", "").strip()
                _solved = False
                if _solver == "2captcha" and _api_key:
                    _solved = await _solve_captcha_2captcha(page, _api_key)
                elif _solver == "capsolver" and _api_key:
                    _solved = await _solve_captcha_capsolver(page, _api_key)
                if not _solved:
                    print(f"[CAPTCHA] ❌ Could not solve CAPTCHA on {site.name} — skipping.")
                    _record_site_error(site.id, "CAPTCHA not solved")
                    pending_reason = "captcha not solved"
                    _update_round_item("pending", pending_reason)
                    return {"status": "pending", "new_count": new_count, "pending_reason": pending_reason}
            # ── N3: Block detection after navigation ──────────────────────────
            if await _detect_block(page):
                print(f"[Block] 🚫 {site.name} appears to be blocking us.")
                _record_site_error(site.id, "Block/rate-limit detected")
                pending_reason = "block/rate-limit detected"
                _update_round_item("pending", pending_reason)
                return {"status": "pending", "new_count": new_count, "pending_reason": pending_reason}
            # ── Website-launch delay (Mode A, first keyword only) ────────────
            # Page has just opened. Delay fires here — before any scraping.
            # Subsequent keywords on the same site skip this (is_first_keyword=False).
            if is_first_keyword and delay_site_open > 0:
                print(
                    f"[Scraper] 🌐 Website-launch delay: {delay_site_open}s "
                    f"— {site.name} opened, holding before scraping "
                    f"({keyword.term})"
                )
                await asyncio.sleep(delay_site_open)
                print(f"[Scraper] ✅ Website-launch delay done")
        # ── MODE B: Homepage search interaction ──────────────────────────────
        else:
            # search_selector is optional — _discover_search_input will try
            # four semantic strategies (ARIA role, searchbox, placeholder, label)
            # before falling back to the CSS selector. An empty selector simply
            # means all four semantic strategies run with no CSS safety net;
            # if they all fail, _discover_search_input raises RuntimeError and
            # the except below logs it clearly.
            sel = (site.search_selector or "").strip()
            print(
                f"[Scraper] MODE=HOMEPAGE {site.name} | "
                f"'{keyword.term}' via {site.url_template}"
                + (f" selector='{sel}'" if sel else " (auto-discover search box)")
            )
            # Step 1 — land on the homepage
            await page.goto(
                site.url_template, timeout=60_000, wait_until="domcontentloaded"
            )
            # ── Website-launch delay (Mode B, first keyword only) ────────────
            # Homepage has just opened — delay fires here, before the search
            # box is touched. Subsequent keywords skip this entirely.
            if is_first_keyword and delay_site_open > 0:
                print(
                    f"[Scraper] 🌐 Website-launch delay: {delay_site_open}s "
                    f"— {site.name} homepage opened, holding before search"
                )
                await asyncio.sleep(delay_site_open)
                print(f"[Scraper] ✅ Website-launch delay done")
            # Step 2 — Search Discovery: semantic locators → CSS fallback
            try:
                search_el = await _discover_search_input(page, sel, site.name)
                # ── Robust input — works minimised / in background ────────────
                # Uses only Playwright Locator methods which are JS-driven
                # internally. No OS window focus is ever needed.
                #
                # Why NOT page.evaluate() + element_handle():
                # _discover_search_input returns a Locator. Calling
                # locator.element_handle() returns None when the element is
                # not uniquely resolved at that instant (e.g. during a
                # re-navigation) — passing None into page.evaluate() means
                # `el` is null inside the JS, so el.focus() throws
                # "TypeError: el.focus is not a function".
                #
                # Locator.focus() / .fill() / .press() resolve the element
                # fresh on every call, retry automatically on transient
                # detachment, and inject their actions via CDP (Chrome
                # DevTools Protocol) — no OS keyboard or mouse events.
                # 1. Focus via Locator — CDP-driven, no OS focus needed
                await search_el.focus()
                await asyncio.sleep(_jitter(0.4, pct=0.4))
                # 2. Type search term — mode depends on humanize_level
                await search_el.fill("")  # clear first
                _hlvl_type = humanize_level.strip().lower()
                if _hlvl_type == "raw":
                    # Raw: instant fill — no timing simulation at all
                    await search_el.fill(keyword.term)
                elif _hlvl_type == "low":
                    # Low: fill in one shot, small pre/post pause
                    await asyncio.sleep(random.uniform(0.1, 0.3))
                    await search_el.fill(keyword.term)
                    await asyncio.sleep(random.uniform(0.1, 0.3))
                elif _hlvl_type == "medium":
                    # Medium: char-by-char typing, variable WPM, no typos
                    await asyncio.sleep(_jitter(0.3, pct=0.4))
                    for char in keyword.term:
                        await search_el.press(char)
                        if char == " ":
                            await asyncio.sleep(random.uniform(0.10, 0.25))
                        else:
                            await asyncio.sleep(random.uniform(0.05, 0.10))
                    await asyncio.sleep(_jitter(0.4, pct=0.4))
                else:  # heavy
                    # Heavy: char-by-char, variable WPM, 12% typo+backspace,
                    # word boundary rhythm, pre-submit re-read pause
                    await asyncio.sleep(_jitter(0.5, pct=0.5))
                    typo_chars = "qwertyuiopasdfghjklzxcvbnm"
                    for char in keyword.term:
                        if random.random() < 0.12 and len(keyword.term) > 3:
                            wrong = random.choice(typo_chars)
                            await search_el.press(wrong)
                            await asyncio.sleep(random.uniform(0.08, 0.18))
                            await search_el.press("Backspace")
                            await asyncio.sleep(random.uniform(0.05, 0.15))
                        await search_el.press(char)
                        if char == " ":
                            await asyncio.sleep(random.uniform(0.12, 0.35))
                        else:
                            await asyncio.sleep(random.uniform(0.045, 0.110))
                    # Pre-submit pause — user re-reads what they typed
                    await asyncio.sleep(_jitter(0.6, pct=0.5))
                # 3. Dispatch an explicit 'input' event as belt-and-braces
                await search_el.dispatch_event("input")
                # 4. Locator.press() sends Enter via CDP
                await search_el.press("Enter")
                # Step 3 — wait for results page to settle
                await page.wait_for_load_state("networkidle", timeout=60_000)
            except Exception as sel_exc:
                print(
                    f"[Scraper] ❌ {site.name}: Search Discovery failed — "
                    f"{sel_exc}"
                )
                # Bail — don't scrape the homepage itself. Return the same
                # status-dict shape as every other exit path (a bare `return 0`
                # here would break callers that index the result dict).
                pending_reason = f"search discovery failed: {str(sel_exc)[:200]}"
                _update_round_item("pending", pending_reason)
                return {"status": "pending", "new_count": new_count, "pending_reason": pending_reason}
        # ── N17: Try AI-generated selectors first ────────────────────────────
        # If this site has been auto-adapted (SiteSelectors row with confidence
        # >= 50 and not marked stale), use those precise selectors to extract
        # directly. On success, jump straight to the hold/loop logic.
        # On failure (0 results), mark stale and fall through to JS_EXTRACT.
        _ai_adapted_rows: list[dict] = []
        _ss_row = None
        try:
            _ss_db = SessionLocal()
            _ss_row = _ss_db.query(SiteSelectors).filter(
                SiteSelectors.site_id == site.id,
                SiteSelectors.confidence >= 50,
                SiteSelectors.stale == False,  # noqa: E712
            ).first()
            _ss_db.close()
        except Exception:
            _ss_row = None
        if _ss_row:
            print(f"[AutoAdapt] ⚡ {site.name}: using AI selectors "
                  f"(conf={_ss_row.confidence}, container='{_ss_row.container_sel}')")
            _ai_adapted_rows = await _extract_with_selectors(page, _ss_row)
            if _ai_adapted_rows:
                print(f"[AutoAdapt] ✅ {site.name}: {len(_ai_adapted_rows)} rows via AI selectors")
                # Update last_tested_at
                try:
                    _upd_db = SessionLocal()
                    _upd_ss = _upd_db.query(SiteSelectors).filter(SiteSelectors.site_id == site.id).first()
                    if _upd_ss:
                        _upd_ss.last_tested_at = datetime.now()
                        _upd_db.flush()
                        _upd_db.commit()
                    _upd_db.close()
                except Exception:
                    pass
            else:
                # AI selectors returned nothing — mark stale, fall through to JS_EXTRACT
                print(f"[AutoAdapt] ⚠️ {site.name}: AI selectors returned 0 rows — marking stale")
                try:
                    _stale_db = SessionLocal()
                    _stale_ss = _stale_db.query(SiteSelectors).filter(SiteSelectors.site_id == site.id).first()
                    if _stale_ss:
                        _stale_ss.stale = True
                        _stale_db.flush()
                        _stale_db.commit()
                    _stale_db.close()
                except Exception:
                    pass
                # Auto-heal: if Auto-Adapter is enabled, queue a re-adapt in background
                if _get_config("auto_adapt_enabled", "false").lower() == "true":
                    print(f"[AutoAdapt] 🔄 Queuing background re-adapt for {site.name}…")
                    asyncio.create_task(adapt_site_now(site.id))
        # ── Collect listing elements ─────────────────────────────────────────
        # These selectors are tried in order; the first one that returns
        # results wins. ShopGoodwill items match div.lot-card / .item-card.
        listing_selectors = [
            "li.s-item",            # eBay
            ".item-cell",
            "article.product-pod",
            "div.lot-card",         # ShopGoodwill
            ".item-card",           # ShopGoodwill alternate
            "div.listing-item",
            "[data-listing-id]",
            "div[class*='result']",
            "li[class*='product']",
        ]
        items = []
        for sel_try in listing_selectors:
            items = await page.query_selector_all(sel_try)
            if items:
                print(
                    f"[Scraper] {site.name}: matched {len(items)} items "
                    f"via '{sel_try}'"
                )
                break
        if not items:
            # Last-resort: any anchor whose href looks like a product page
            items = await page.query_selector_all(
                "a[href*='itm'], a[href*='listing'], a[href*='item'], "
                "a[href*='/product/'], a[href*='/lot/']"
            )
            if items:
                print(
                    f"[Scraper] {site.name}: fallback anchor match "
                    f"({len(items)} items)"
                )
        if not items:
            print(f"[Scraper] ⚠️ {site.name}: no listing elements found on page.")
        # ── Delay 2: post-search pause before scraping ────────────────────
        if delay_post_search > 0:
            print(
                f"[Scraper] ⏳ Post-search delay: {delay_post_search}s "
                f"— waiting {delay_post_search}s before scraping "
                f"({site.name} | '{keyword.term}')"
            )
            await asyncio.sleep(delay_post_search)
            print(f"[Scraper] ✅ Post-search delay done — starting scrape")
        # ── Human simulation — level-gated ──────────────────────────────────
        # raw:    no simulation at all — bare requests, fastest, least safe
        # low:    one quick mouse move + one scroll pass
        # medium: mouse + scroll + post-scroll idle
        # heavy:  full 5-step sequence with long idles and re-read behaviour
        _hlvl = humanize_level.strip().lower()
        if _hlvl == "raw":
            pass  # no simulation whatsoever
        elif _hlvl == "low":
            await _human_mouse(page)
            await asyncio.sleep(_jitter(0.4, pct=0.4))
            await _human_scroll(page, steps=random.randint(1, 2))
            await asyncio.sleep(_jitter(0.4, pct=0.4))
        elif _hlvl == "medium":
            await asyncio.sleep(_jitter(0.7, pct=0.4))  # brief page-load idle
            await _human_mouse(page)
            await asyncio.sleep(_jitter(0.5, pct=0.4))
            await _human_scroll(page, steps=random.randint(2, 4))
            await asyncio.sleep(_jitter(0.8, pct=0.4))  # post-scroll idle
        else:  # heavy (default)
            # Full 5-step sequence:
            #   1. Brief idle — page just loaded, user orients themselves
            #   2. Mouse moves toward content area
            #   3. Scroll through results with read-rhythm pauses
            #   4. More hover-reading
            #   5. "Thinking" pause before acting
            await asyncio.sleep(_jitter(1.2, pct=0.5))   # page-load idle
            await _human_mouse(page)                     # initial cursor
            await asyncio.sleep(_jitter(0.8, pct=0.5))
            await _human_scroll(page, steps=random.randint(4, 7))  # read results
            await asyncio.sleep(_jitter(0.9, pct=0.5))
            await _human_mouse(page)                     # hover-reading
            await asyncio.sleep(_jitter(1.5, pct=0.6))   # thinking pause
        # ── Delay 3: page-hold re-scrape loop ───────────────────────────────
        # Holds the results page for exactly `delay_page_hold` seconds total.
        # The scraper runs a full pass, and as soon as that pass completes it
        # checks if time remains — if yes it immediately starts the next pass
        # with no idle wait between passes. The loop only ends when the hold
        # timer has fully expired OR on the first pass when no hold is set.
        # The DB unique-link constraint deduplicates across all passes —
        # each listing URL is written exactly once, no clones ever saved.
        _hold_deadline = time.time() + (delay_page_hold if delay_page_hold > 0 else 0)
        _pass_num = 0
        while True:
            _pass_num += 1
            _pass_new = 0
            _pass_start = time.time()
            # Always re-query the DOM on every pass so any items that loaded
            # after the initial page-settle are captured.
            items_current = []
            for sel_try in listing_selectors:
                items_current = await page.query_selector_all(sel_try)
                if items_current:
                    break
            if not items_current:
                items_current = await page.query_selector_all(
                    "a[href*='itm'], a[href*='listing'], a[href*='item'], "
                    "a[href*='/product/'], a[href*='/lot/']"
                )
            items_current = items_current[:30]
            # Log every pass — show remaining hold time
            if delay_page_hold > 0:
                _remaining = max(0, int(_hold_deadline - time.time()))
                print(
                    f"[Scraper] 🔁 Page-hold pass #{_pass_num} — "
                    f"{_remaining}s / {delay_page_hold}s remaining "
                    f"({site.name} | '{keyword.term}')"
                )
            # ── Page-level parallel extraction ────────────────────────────────
            # If AI selectors produced rows, use them directly (skip JS_EXTRACT).
            # Otherwise run the universal JS extractor as before.
            if _ai_adapted_rows:
                page_data = _ai_adapted_rows
            else:
                page_data = await page.evaluate(JS_EXTRACT)
            from urllib.parse import urljoin
            for row in (page_data or []):
                try:
                    title = row.get("title", "").strip()
                    price_text = row.get("price_text", "").strip()
                    time_text = row.get("time_text", "").strip()
                    location = row.get("location", "").strip()
                    href = row.get("href", "").strip()
                    images_list = row.get("images", [])
                    if not title or len(title) < 5:
                        continue
                    if href and not href.startswith("http"):
                        href = urljoin(page.url, href)
                    score = calculate_attribute_score(title, keyword.weight)
                    # ── Extract price and check N7 price filters ──────────────
                    amount, currency = _extract_price_and_currency(price_text)
                    # N7: per-keyword price filter
                    kw_min = keyword.min_price
                    kw_max = keyword.max_price
                    if amount is not None:
                        if kw_min is not None and amount < kw_min:
                            print(f"[N7] ⬇️ Skipping '{title[:40]}' — price {amount} below min {kw_min}")
                            continue
                        if kw_max is not None and amount > kw_max:
                            print(f"[N7] ⬆️ Skipping '{title[:40]}' — price {amount} above max {kw_max}")
                            continue
                    # ── N16: AI filter — runs when keyword has an ai_target ───
                    _ai_enabled = _get_config("ai_filter_enabled", "false").lower() == "true"
                    _scoring_on = _get_config("scoring_enabled", "true").lower() == "true"
                    _ai_target = (keyword.ai_target or "").strip()
                    _ai_match_val = None
                    _ai_reason_val = None
                    if _ai_enabled and _ai_target:
                        # AI is the judge — score is calculated for display only.
                        # Score gate is bypassed regardless of scoring_enabled setting.
                        _ai_match_val, _ai_reason_val = await _ai_analyze(title, _ai_target)
                        if not _ai_match_val:
                            # AI rejected — save to DB as rejected (ai_match=0) but don't alert
                            _stats["total_scanned"] += 1
                            if not (href and db.query(Listing).filter(Listing.link == href).first()):
                                amount_rej, currency_rej = _extract_price_and_currency(price_text)
                                listing_rej = Listing(
                                    title=title[:500],
                                    price=amount_rej,
                                    currency=currency_rej[:10] if currency_rej else "",
                                    price_raw=_format_price(amount_rej, currency_rej)[:100],
                                    time_left=_extract_time_left(time_text)[:60],
                                    link=href or f"no-link-{random.randint(0,999999)}",
                                    score=score,
                                    keyword=keyword.term,
                                    site_name=site.name,
                                    location=location or "",
                                    ai_match=0,
                                    ai_reason=_ai_reason_val[:200] if _ai_reason_val else None,
                                    images=json.dumps(images_list[:10]) if images_list else None,
                                )
                                db.add(listing_rej)
                                db.flush()
                                db.commit()
                            continue
                    elif _scoring_on:
                        # No AI target — fall back to score gate (only when scoring is enabled)
                        if score < 0:
                            continue
                    # else: scoring disabled AND no AI target → all lots pass through
                    _stats["total_scanned"] += 1
                    if href and db.query(Listing).filter(Listing.link == href).first():
                        continue
                    # ── N11: Cross-site deduplication (eBay only) ─────────────
                    # If the same title already exists on a different eBay region
                    # within the last 24h, suppress the duplicate listing + alert.
                    _is_ebay = "ebay" in site.name.lower() or "ebay" in (site.url_template or "").lower()
                    if _is_ebay:
                        _cutoff = datetime.now() - timedelta(hours=24)
                        _recent_other_ebay = db.query(Listing).filter(
                            Listing.timestamp >= _cutoff,
                            Listing.site_name != site.name,
                            Listing.site_name.ilike("%ebay%"),
                        ).all()
                        _is_cross_dupe = any(
                            difflib.SequenceMatcher(None, title.lower(), r.title.lower()).ratio() > 0.85
                            for r in _recent_other_ebay
                        )
                        if _is_cross_dupe:
                            print(f"[N11] 🔁 Cross-site duplicate suppressed: '{title[:50]}'")
                            continue
                    price_display = _format_price(amount, currency)
                    time_left_str = _extract_time_left(time_text)
                    listing = Listing(
                        title=title[:500],
                        price=amount,
                        currency=currency[:10] if currency else "",
                        price_raw=price_display[:100],
                        time_left=time_left_str[:60],
                        time_left_mins=round(timeLeftToMins(time_left_str), 4) if time_left_str and timeLeftToMins(time_left_str) != float('inf') else None,
                        price_updated_at=datetime.now(),
                        link=href or f"no-link-{random.randint(0,999999)}",
                        score=score,
                        keyword=keyword.term,
                        site_name=site.name,
                        location=location or "",
                        ai_match=1 if (_ai_enabled and _ai_target) else None,
                        ai_reason=_ai_reason_val[:200] if _ai_reason_val else None,
                        images=json.dumps(images_list[:10]) if images_list else None,
                    )
                    db.add(listing)
                    db.flush()
                    db.commit()
                    # N4: store USD price for cross-site sorting
                    if listing.price and listing.currency:
                        listing.price_usd = _convert_price(listing.price, listing.currency, "USD")
                        if listing.price_usd:
                            db.flush()
                            db.commit()
                    new_count += 1
                    _pass_new += 1
                    _stats["total_alerts"] += 1
                    _new_listing_links.append((listing.id, listing.link))
                    _redis_publish("new_listing", {
                        "id": listing.id, "title": title[:80],
                        "price": price_display, "site": site.name,
                        "keyword": keyword.term, "score": score,
                    })
                    # ── Alert (with AI verdict if applicable) ─────────────────
                    _ai_line = f"🤖 AI: ✅ {_ai_reason_val}\n" if (_ai_enabled and _ai_target and _ai_reason_val) else ""
                    alert = (
                        f"🎯 Ghost Node — New Hit\n"
                        f"📦 {title[:80]}\n"
                        f"💰 {price_display or 'Price unknown'}"
                        + (f" | ⏳ {time_left_str}" if time_left_str else "") + "\n"
                        + f"🏷️ Keyword: {keyword.term} | Score: {score}\n"
                        + _ai_line
                        + f"🌐 Site: {site.name}\n"
                        f"🔗 {href[:200]}"
                    )
                    asyncio.create_task(send_alert(alert, subject=f"Ghost Node — {title[:40]}"))
                except Exception as row_exc:
                    print(f"[Scraper] row parse error: {row_exc}")
                    continue
            # Always log pass summary
            _pass_elapsed = round(time.time() - _pass_start, 1)
            _remaining_after = max(0, int(_hold_deadline - time.time()))
            print(
                f"[Scraper] ✓ Pass #{_pass_num} complete in {_pass_elapsed}s — "
                f"{_pass_new} new | {new_count} total"
                + (f" | {_remaining_after}s hold remaining — re-scraping now" if delay_page_hold > 0 and _remaining_after > 0 else "")
            )
            # Exit: no hold timer set, or hold timer has now expired
            if delay_page_hold <= 0 or time.time() >= _hold_deadline:
                break
            # Time remains — start next pass immediately (no idle wait)
        # ── N5: Pagination — follow "Next page" up to site.max_pages ─────────
        # Runs AFTER the page-hold loop so we only paginate when all passes
        # on the current page are complete. Each new page resets the hold timer.
        _max_pg = max(1, site.max_pages or 1)
        _cur_pg = 1
        while _cur_pg < _max_pg:
            _went = await _go_next_page(page)
            if not _went:
                break
            _cur_pg += 1
            print(f"[Scraper] 📄 {site.name} | '{keyword.term}' → page {_cur_pg}/{_max_pg}")
            # Brief human pause between pages
            await asyncio.sleep(_jitter(2.5, pct=0.4))
            if _hlvl not in ("raw",):
                await _human_scroll(page, steps=random.randint(2, 4))
            # Re-extract listing items on the new page
            items_pg = []
            for sel_try in listing_selectors:
                items_pg = await page.query_selector_all(sel_try)
                if items_pg:
                    break
            if not items_pg:
                items_pg = await page.query_selector_all(
                    "a[href*='itm'], a[href*='listing'], a[href*='item'], "
                    "a[href*='/product/'], a[href*='/lot/']"
                )
            # Run the same page-level extraction on the new page
            try:
                rows_pg = await page.evaluate(JS_EXTRACT)
            except Exception:
                rows_pg = []
            for row in (rows_pg or [])[:50]:
                try:
                    title = (row.get("title") or "").strip()
                    price_text = (row.get("price_text") or "").strip()
                    time_text = (row.get("time_text") or "").strip()
                    href = (row.get("href") or "").strip()
                    images_list2 = row.get("images", [])
                    if not title or len(title) < 5:
                        continue
                    if href and not href.startswith("http"):
                        from urllib.parse import urljoin
                        href = urljoin(page.url, href)
                    score = calculate_attribute_score(title, keyword.weight)
                    # ── N16: AI filter (pagination) ───────────────────────────
                    _ai_en2 = _get_config("ai_filter_enabled", "false").lower() == "true"
                    _ai_tgt2 = (keyword.ai_target or "").strip()
                    _ai_match2 = None
                    _ai_reason2 = None
                    if _ai_en2 and _ai_tgt2:
                        _ai_match2, _ai_reason2 = await _ai_analyze(title, _ai_tgt2)
                        if not _ai_match2:
                            _stats["total_scanned"] += 1
                            if not (href and db.query(Listing).filter(Listing.link == href).first()):
                                _amt_r, _cur_r = _extract_price_and_currency(price_text)
                                db.add(Listing(
                                    title=title[:500], price=_amt_r,
                                    currency=_cur_r[:10] if _cur_r else "",
                                    price_raw=_format_price(_amt_r, _cur_r)[:100],
                                    time_left=_extract_time_left(time_text)[:60],
                                    link=href or f"no-link-pg{_cur_pg}-{random.randint(0,999999)}",
                                    score=score, keyword=keyword.term, site_name=site.name,
                                    ai_match=0,
                                    ai_reason=_ai_reason2[:200] if _ai_reason2 else None,
                                    images=json.dumps(images_list2[:10]) if images_list2 else None,
                                ))
                                db.flush()
                                db.commit()
                            continue
                    else:
                        if score < 0:
                            continue
                    _stats["total_scanned"] += 1
                    if href and db.query(Listing).filter(Listing.link == href).first():
                        continue
                    amount, currency = _extract_price_and_currency(price_text)
                    price_display = _format_price(amount, currency)
                    time_left_str = _extract_time_left(time_text)
                    listing = Listing(
                        title=title[:500], price=amount,
                        currency=currency[:10] if currency else "",
                        price_raw=price_display[:100],
                        time_left=time_left_str[:60],
                        time_left_mins=round(timeLeftToMins(time_left_str), 4) if time_left_str and timeLeftToMins(time_left_str) != float("inf") else None,
                        price_updated_at=datetime.now(),
                        link=href or f"no-link-pg{_cur_pg}-{random.randint(0,999999)}",
                        score=score, keyword=keyword.term, site_name=site.name,
                        ai_match=1 if (_ai_en2 and _ai_tgt2) else None,
                        ai_reason=_ai_reason2[:200] if _ai_reason2 else None,
                        images=json.dumps(images_list2[:10]) if images_list2 else None,
                    )
                    db.add(listing)
                    db.flush()
                    db.commit()
                    new_count += 1
                    _stats["total_alerts"] += 1
                    _new_listing_links.append((listing.id, listing.link))
                    _tl_pg = f" | ⏳ {time_left_str}" if time_left_str else ""
                    _ai_line2 = f"🤖 AI: ✅ {_ai_reason2}\n" if (_ai_en2 and _ai_tgt2 and _ai_reason2) else ""
                    alert = (
                        "🎯 Ghost Node — New Hit"
                        f" (p{_cur_pg})\n"
                        f"📦 {title[:80]}\n"
                        f"💰 {price_display or 'Price unknown'}{_tl_pg}\n"
                        f"🏷️ Keyword: {keyword.term} | Score: {score}\n"
                        + _ai_line2
                        + f"🌐 Site: {site.name}\n"
                        f"🔗 {href[:200]}"
                    )
                    asyncio.create_task(send_alert(alert, subject=f"Ghost Node — {title[:40]}"))
                except Exception:
                    continue
            print(f"[Scraper] ✓ {site.name} | '{keyword.term}' page {_cur_pg} → {new_count} total new")
    except Exception as nav_exc:
        # Baghdad Optimization — single site failure never crashes the engine
        print(f"[Scraper] ⚠️ {site.name} | {keyword.term} → {nav_exc}")
        _record_site_error(site.id, str(nav_exc)[:400])
        pending_reason = f"navigation failed: {str(nav_exc)[:200]}"
        _update_round_item("pending", pending_reason)
        return {"status": "pending", "new_count": new_count, "pending_reason": pending_reason}
    # ── Immediate detail-page image fetch for new listings ──────────────────
    # Search results pages only have thumbnails. Visit each new lot's detail
    # page NOW (same browser context) to grab all images immediately instead
    # of waiting for the 5-minute price refresh pass.
    if _new_listing_links:
        print(f"[Images] 🖼️ Fetching detail images for {len(_new_listing_links)} new listing(s)…")
        try:
            _img_updated = await _fetch_listing_images_batch(
                page.context, _new_listing_links, db
            )
            print(f"[Images] ✅ {_img_updated}/{len(_new_listing_links)} listings got full images")
        except Exception as img_exc:
            print(f"[Images] ⚠️ Batch image fetch failed: {img_exc}")
    # ── N13: Record success after clean completion ─────────────────────────────
    _record_site_success(site.id)
    _update_round_item("done", None)
    return {"status": "done", "new_count": new_count, "pending_reason": None}
# ─────────────────────────────────────────────────────────────────────────────
# N9 — Closing-Soon Alert Loop (Thread E)
# ─────────────────────────────────────────────────────────────────────────────
async def closing_alert_loop() -> None:
    """
    Multi-interval closing alert loop.
    Config: closing_alert_enabled (true/false)
            closing_alert_schedule (comma-separated minutes, e.g. "60,30,10,5")
            Use "0" to disable all closing alerts while keeping capture alerts.
    Each lot can fire multiple alerts — one per configured threshold.
    Tracks fired intervals in Listing.closing_alerts_sent (JSON list).
    Runs forever; polls once per minute.
    """
    print("[Thread E] Closing-alert loop online.")
    while True:
        try:
            # Re-read config each pass so UI toggles take effect without restart.
            enabled = _get_config("closing_alert_enabled", "false").lower() == "true"
            if enabled:
                schedule_raw = _get_config("closing_alert_schedule", "30").strip()
                # Parse thresholds — "0" means no closing alerts at all
                thresholds: list[float] = []
                for t in schedule_raw.split(","):
                    t = t.strip()
                    if t and t != "0":
                        try:
                            thresholds.append(float(t))
                        except ValueError:
                            pass  # ignore malformed schedule entries
                thresholds = sorted(set(thresholds), reverse=True)  # e.g. [60, 30, 10, 5]
                if thresholds:
                    db = SessionLocal()
                    try:
                        from datetime import timedelta
                        # Skip lots first seen more than a week ago — their
                        # stored time_left is too stale to alert on.
                        stale_cutoff = datetime.now() - timedelta(days=7)
                        # NOTE(review): max_threshold is computed but never used —
                        # candidates are not pre-filtered by it. Confirm intent.
                        max_threshold = max(thresholds)
                        candidates = (
                            db.query(Listing)
                            .filter(
                                Listing.time_left_mins != None,
                                Listing.time_left_mins > 0,
                                Listing.timestamp >= stale_cutoff,
                            )
                            .all()
                        )
                        for lot in candidates:
                            if not lot.timestamp:
                                continue
                            # time_left_mins was measured at price_updated_at (or at
                            # capture time); subtract elapsed wall-clock since then.
                            ref_time = lot.price_updated_at or lot.timestamp
                            elapsed_mins = (datetime.now() - ref_time).total_seconds() / 60.0
                            remaining = (lot.time_left_mins or 0) - elapsed_mins
                            if remaining <= 0:
                                # Mark all thresholds as sent to avoid re-processing
                                lot.closing_alerts_sent = json.dumps(thresholds)
                                db.flush()
                                db.commit()
                                continue
                            # Load which intervals have already fired
                            try:
                                fired: list = json.loads(lot.closing_alerts_sent or "[]")
                            except Exception:
                                fired = []
                            for threshold in thresholds:
                                if threshold in fired:
                                    continue  # already sent for this interval
                                if remaining <= threshold:
                                    # Human-readable remaining time: "Xm YYs" under
                                    # an hour, otherwise "Hh Mm".
                                    mins_int = int(remaining)
                                    secs_int = int((remaining - mins_int) * 60)
                                    time_str = (
                                        f"{mins_int}m {secs_int:02d}s"
                                        if mins_int < 60
                                        else f"{int(remaining/60)}h {mins_int % 60}m"
                                    )
                                    alert = (
                                        f"⏰ CLOSING SOON — {time_str} left!\n"
                                        f"📦 {lot.title[:80]}\n"
                                        f"💰 {lot.price_raw or 'Price unknown'}\n"
                                        f"🏷️ Keyword: {lot.keyword} | Score: {lot.score}\n"
                                        f"🌐 Site: {lot.site_name}\n"
                                        f"🔗 {lot.link[:200]}"
                                    )
                                    # Fire-and-forget so a slow alert channel never
                                    # stalls the scan of remaining lots.
                                    asyncio.create_task(send_alert(alert, subject=f"CLOSING SOON — {lot.title[:40]}"))
                                    fired.append(threshold)
                                    lot.closing_alerts_sent = json.dumps(fired)
                                    db.flush()
                                    db.commit()
                                    print(f"[Thread E] ⏰ @{threshold}min alert: {lot.title[:50]} ({time_str} left)")
                    finally:
                        db.close()
        except Exception as exc:
            # Never let one bad pass kill the loop.
            print(f"[Thread E] Error: {exc}")
        await asyncio.sleep(60)
def run_closing_alert_thread() -> None:
    """Thread entry point: run closing_alert_loop on its own event loop forever."""
    alert_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(alert_loop)
    alert_loop.run_until_complete(closing_alert_loop())
async def nuclear_engine() -> None:
    """
    Main scraper loop — runs forever.
    Pulls a FRESH copy of TargetSites + Config from the DB at the TOP of
    every cycle, so any site/keyword added via the UI is immediately active.

    Per cycle: check pause/window gates → load config → (optionally) manage
    the keyword-batching round → scrape every enabled site (sequential or
    parallel browsers, sequential or parallel tabs) → round bookkeeping →
    jittered sleep, interruptible via _cycle_now.
    """
    print("[Thread B] Nuclear engine igniting…")
    _stats["engine_status"] = "Running"
    async with async_playwright() as pw:
        while True:
            if _stats["engine_status"] == "Paused":
                await asyncio.sleep(10)
                continue
            # ── N8: Scrape-window check — skip cycle outside allowed hours ────
            _win_enabled = _get_config("scrape_window_enabled", "false").lower() == "true"
            if _win_enabled:
                _now_hour = datetime.now().hour
                _start_h = int(_get_config("scrape_start_hour", "8"))
                _end_h = int(_get_config("scrape_end_hour", "22"))
                # Handles same-day windows (08:00–22:00) and overnight windows (22:00–06:00)
                if _start_h <= _end_h:
                    _in_window = _start_h <= _now_hour < _end_h
                else:  # overnight window e.g. 22–06
                    _in_window = _now_hour >= _start_h or _now_hour < _end_h
                if not _in_window:
                    print(
                        f"[Thread B] 🌙 Outside scrape window ({_start_h:02d}:00–{_end_h:02d}:00) "
                        f"— current hour {_now_hour:02d}:xx. Sleeping 5min."
                    )
                    await asyncio.sleep(300)
                    continue
            # ── Pull live config from DB — fresh session every cycle ──────────
            db = SessionLocal()
            try:
                keywords = db.query(Keyword).order_by(Keyword.sort_order.asc(), Keyword.id.asc()).all()
                # Only rows where enabled is explicitly 1.
                # We also audit and log any disabled rows so the operator
                # can confirm that toggling a site off actually prevents
                # the engine from touching it.
                target_sites = db.query(TargetSite).filter(TargetSite.enabled == 1).all()
                disabled_sites = db.query(TargetSite).filter(TargetSite.enabled != 1).all()
                timer_val = int(_get_config("timer", "120"))
                delay_launch = int(_get_config("delay_launch", "0"))
                delay_site_open = int(_get_config("delay_site_open", "0"))
                delay_search = int(_get_config("delay_post_search", "0"))
                delay_hold = int(_get_config("delay_page_hold", "0"))
                humanize_level = _get_config("humanize_level", "heavy").strip().lower()
                if humanize_level not in ("raw", "low", "medium", "heavy"):
                    humanize_level = "heavy"
                print(f"[Thread B] 🎭 Humanize level: {humanize_level.upper()}")
                # ── Parallel execution settings ────────────────────────────────
                max_concurrent_browsers = max(1, int(_get_config("max_concurrent_browsers", "1")))
                max_tabs_per_site = max(1, int(_get_config("max_tabs_per_site", "1")))
                keyword_batch_enabled = _get_config("keyword_batch_enabled", "false").lower() == "true"
            finally:
                db.close()
            # N4: Refresh FX rates once per cycle
            await _get_fx_rates()
            # ── Log exactly which sites the engine will scrape this cycle ─────
            if disabled_sites:
                skipped = ", ".join(f"'{s.name}'" for s in disabled_sites)
                print(f"[Thread B] ⏭️ Skipping disabled site(s): {skipped}")
            if target_sites:
                site_names = ", ".join(f"'{s.name}'" for s in target_sites)
                print(f"[Thread B] 🔄 Cycle starting — {len(target_sites)} site(s): {site_names}")
            else:
                print("[Thread B] ⚠️ No enabled TargetSites in DB — sleeping 60s.")
                await asyncio.sleep(60)
                continue
            if not keywords:
                print("[Thread B] ⚠️ No Keywords in DB — sleeping 60s.")
                await asyncio.sleep(60)
                continue
            _stats["engine_status"] = "Running"
            _redis_set_stats(_stats)
            _redis_publish("engine_status", {"status": "Running"})
            cycle_start = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Keyword batching/progress tracking (optional)
            active_round_id: int | None = None
            selected_by_site: dict[int, list[tuple[Keyword, int | None]]] = {}
            if keyword_batch_enabled:
                # timedelta is only needed on the batching path; the import is
                # scoped here deliberately (see boost-mode note below).
                from datetime import timedelta

                def _create_round_and_items(db_round: Session, sites, kws) -> ScrapeRound:
                    # One ScrapeRoundItem per (site × keyword) pair, all pending.
                    r = ScrapeRound(started_at=datetime.now(), status="active")
                    db_round.add(r)
                    db_round.flush()
                    for s in sites:
                        for kw in kws:
                            db_round.add(
                                ScrapeRoundItem(
                                    round_id=r.id,
                                    site_id=s.id,
                                    keyword_id=kw.id,
                                    status="pending",
                                    attempt_count=0,
                                    first_pending_at=None,
                                    last_attempt_at=None,
                                    last_error=None,
                                    last_hour_warn_at=None,
                                )
                            )
                    db_round.flush()
                    return r

                db_round = SessionLocal()
                try:
                    # Load (or create) active round.
                    active_round = (
                        db_round.query(ScrapeRound)
                        .filter(ScrapeRound.status == "active")
                        .order_by(ScrapeRound.started_at.desc())
                        .first()
                    )
                    if not active_round:
                        active_round = _create_round_and_items(db_round, target_sites, keywords)
                        db_round.commit()
                    else:
                        # If retry window expired, finish + start a new round now.
                        deadline = active_round.started_at + timedelta(hours=4)
                        if datetime.now() >= deadline:
                            # Mark all remaining pending/in-progress keyword attempts as failed.
                            for ri in db_round.query(ScrapeRoundItem).filter(
                                ScrapeRoundItem.round_id == active_round.id,
                                ScrapeRoundItem.status.in_(["pending", "in_progress"]),
                            ).all():
                                ri.status = "failed"
                                ri.last_error = "Retry window expired (4h)"
                                ri.last_attempt_at = datetime.now()
                            db_round.flush()
                            active_round.status = "finished"
                            active_round.finished_at = datetime.now()
                            db_round.flush()
                            active_round = _create_round_and_items(db_round, target_sites, keywords)
                            db_round.commit()
                    active_round_id = active_round.id
                    # Ensure round items exist for current (enabled sites × keywords).
                    # Insert any missing pairs as pending.
                    existing = (
                        db_round.query(ScrapeRoundItem.site_id, ScrapeRoundItem.keyword_id)
                        .filter(ScrapeRoundItem.round_id == active_round_id)
                        .all()
                    )
                    existing_pairs = {(sid, kid) for sid, kid in existing}
                    missing = []
                    for s in target_sites:
                        for kw in keywords:
                            if (s.id, kw.id) not in existing_pairs:
                                missing.append(
                                    ScrapeRoundItem(
                                        round_id=active_round_id,
                                        site_id=s.id,
                                        keyword_id=kw.id,
                                        status="pending",
                                        attempt_count=0,
                                        first_pending_at=None,
                                    )
                                )
                    if missing:
                        db_round.add_all(missing)
                        db_round.flush()
                        db_round.commit()
                    # Select up to max_tabs_per_site pending keywords per site.
                    pending_items = (
                        db_round.query(ScrapeRoundItem)
                        .filter(
                            ScrapeRoundItem.round_id == active_round_id,
                            ScrapeRoundItem.status == "pending",
                        )
                        .all()
                    )
                    pending_map: dict[tuple[int, int], ScrapeRoundItem] = {
                        (ri.site_id, ri.keyword_id): ri for ri in pending_items
                    }
                    selected_pairs: dict[int, list[tuple[Keyword, int | None]]] = {
                        s.id: [] for s in target_sites
                    }
                    now_sel = datetime.now()
                    for s in target_sites:
                        for kw in keywords:
                            ri = pending_map.get((s.id, kw.id))
                            if not ri:
                                continue
                            selected_pairs[s.id].append((kw, ri.id))
                            ri.status = "in_progress"
                            ri.attempt_count = (ri.attempt_count or 0) + 1
                            ri.last_attempt_at = now_sel
                            ri.last_error = None
                            db_round.flush()
                            if len(selected_pairs[s.id]) >= max_tabs_per_site:
                                break
                    db_round.commit()
                    selected_by_site = selected_pairs
                finally:
                    db_round.close()
            # ── Parallel execution: one coroutine per site ────────────────────
            # Semaphore caps how many browsers can be active at the same time.
            # When max_concurrent_browsers=1 (default) behaviour is identical
            # to the old sequential loop — no risk, no change in output.
            _browser_sem = asyncio.Semaphore(max_concurrent_browsers)

            async def _scrape_one_site(site) -> None:
                """Launch a browser for one site, scrape all keywords (optionally
                in parallel tabs), then close the browser."""
                async with _browser_sem:
                    batch_pairs: list[tuple[Keyword, int | None]]
                    if keyword_batch_enabled:
                        batch_pairs = selected_by_site.get(site.id, [])
                        if not batch_pairs:
                            return
                    else:
                        batch_pairs = [(kw, None) for kw in keywords]
                    # ── Resolve settings inside the coroutine (thread-safe reads) ──
                    _browser_label, _browser_exe = _resolve_browser()
                    _incognito = _get_config("incognito_mode", "false").lower() == "true"
                    _global_show_browser = _get_config("show_browser", "false").lower() == "true"
                    # Per-site override:
                    #   - if global show_browser=true -> visible for all sites
                    #   - else -> site becomes visible only if custom_visible_browser=1
                    _site_custom_visible = int(getattr(site, "custom_visible_browser", 0) or 0) == 1
                    _show_browser = _global_show_browser or _site_custom_visible
                    _headless = not _show_browser
                    if _show_browser:
                        print(
                            f"[Browser] 👁️ VISIBLE MODE — browser window will open on screen. "
                            f"Close Ghost Node or toggle off in Settings when done debugging."
                        )
                    _launch_args = [
                        "--no-sandbox",
                        # ── Background-throttling kill switches ───────────────
                        # These flags tell Chromium's internal scheduler and
                        # renderer to treat this browser exactly the same whether
                        # it is the foreground window, minimised, or behind other
                        # windows. No OS focus is ever needed.
                        #
                        # Without these flags Chromium intentionally slows down
                        # background tabs: JS timers fire at 1 Hz instead of
                        # normal rate, GPU compositing pauses, and wake-locks are
                        # dropped — all of which cause silent scraping failures.
                        # Prevents the renderer process from being deprioritised
                        # when the window loses OS focus or is minimised
                        "--disable-renderer-backgrounding",
                        # Prevents background tabs from having their JS timers
                        # throttled to 1-second intervals
                        "--disable-background-timer-throttling",
                        # Prevents Chromium from pausing rendering for windows
                        # that are fully hidden behind other windows
                        "--disable-backgrounding-occluded-windows",
                        # Disables Chromium's own background networking (update
                        # checks, component pings) so it never competes with
                        # scraping traffic
                        "--disable-background-networking",
                        # Forces Chromium to keep all tabs at the same priority
                        # regardless of visibility
                        "--force-fieldtrials=BackgroundTabStopping/disable",
                    ]
                    # Pick a consistent agent profile — UA, platform, language,
                    # locale and timezone all match so HTTP headers agree with
                    # navigator properties (detectors cross-check these).
                    _profile = random.choice(_agent_profiles)
                    _vw, _vh = _profile["viewport"]
                    _launch_kwargs_base: dict = {
                        "headless": _headless,
                        "args": _launch_args,
                        "user_agent": _profile["ua"],
                        "viewport": {"width": _vw, "height": _vh},
                        "locale": _profile["locale"],
                        "timezone_id": _profile["tz"],
                        "extra_http_headers": {
                            "Accept-Language": _profile["lang"],
                            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                        },
                    }
                    if _browser_exe:
                        _launch_kwargs_base["executable_path"] = _browser_exe
                    # N1: Add proxy if enabled
                    _proxy = _get_proxy()
                    if _proxy:
                        _launch_kwargs_base["proxy"] = _proxy
                    _visibility_tag = "VISIBLE 👁" if _show_browser else "headless"
                    try:
                        if _incognito:
                            browser = await pw.chromium.launch(
                                headless=_headless,
                                args=_launch_args,
                                **({"executable_path": _browser_exe} if _browser_exe else {}),
                            )
                            context = await browser.new_context(
                                user_agent=_profile["ua"],
                                viewport={"width": _vw, "height": _vh},
                                locale=_profile["locale"],
                                timezone_id=_profile["tz"],
                            )
                            print(f"[Browser] 🕵️ Launched {_browser_label} — INCOGNITO + {_visibility_tag}")
                        else:
                            # Persistent: per-site profile dirs so cookie jars
                            # don't bleed across different sites.
                            _site_slug = re.sub(r"[^\w]", "_", site.name.lower())[:20]
                            _profile_dir = os.path.join(
                                os.path.dirname(__file__), ".browser_profiles", _site_slug
                            )
                            os.makedirs(_profile_dir, exist_ok=True)
                            context = await pw.chromium.launch_persistent_context(
                                _profile_dir,
                                **_launch_kwargs_base,
                            )
                            browser = None  # persistent context manages its own lifecycle
                            print(f"[Browser] 🚀 Launched {_browser_label} — NORMAL + {_visibility_tag}")
                        # ── 30-property stealth init script ──────────────────────
                        # Built from the selected agent profile so every property
                        # (UA, platform, language, WebGL renderer, canvas noise,
                        # audio noise, screen size, timing) is internally consistent.
                        await context.add_init_script(_build_stealth_script(_profile))
                        # ── Delay 1: post-launch settle time ─────────────────────
                        if delay_launch > 0:
                            print(
                                f"[Browser] ⏳ Post-launch delay: {delay_launch}s "
                                f"— browser open, waiting {delay_launch}s before first navigation"
                            )
                            await asyncio.sleep(delay_launch)
                            print(f"[Browser] ✅ Post-launch delay done — navigating now")
                        # ── Tab-level semaphore: caps parallel keywords per site ──
                        # Each keyword runs in its own page (tab) within this
                        # browser context. Tabs share cookies/session (good for
                        # sites that require login) but have independent V8 runtimes
                        # and network stacks so they don't block each other.
                        # Each tab gets its own DB session to avoid SQLAlchemy
                        # thread-local conflicts across concurrent coroutines.
                        _tab_sem = asyncio.Semaphore(max_tabs_per_site)

                        async def _scrape_one_keyword(kw, round_item_id: int | None, is_first: bool) -> None:
                            # One tab + one DB session per keyword; heavy assets
                            # (images/fonts/video) are aborted to save bandwidth.
                            async with _tab_sem:
                                page = await context.new_page()
                                await page.route(
                                    "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,mp4,webp}",
                                    lambda route: route.abort(),
                                )
                                _kw_db = SessionLocal()
                                try:
                                    outcome = await scrape_site(
                                        page, site, kw, _kw_db,
                                        delay_post_search=delay_search,
                                        delay_page_hold=delay_hold,
                                        delay_site_open=delay_site_open,
                                        is_first_keyword=is_first,
                                        humanize_level=humanize_level,
                                        round_item_id=round_item_id,
                                    )
                                    found = int(outcome.get("new_count") or 0)
                                    print(
                                        f"[Scraper] ✓ {site.name} | '{kw.term}' "
                                        f"→ {found} new ({outcome.get('status')})"
                                    )
                                finally:
                                    _kw_db.close()
                                    await page.close()

                        if max_tabs_per_site == 1:
                            # Sequential mode — preserve the original inter-keyword
                            # jitter so timing patterns stay natural.
                            for _kw_idx, (kw, ri_id) in enumerate(batch_pairs):
                                await _scrape_one_keyword(kw, ri_id, is_first=(_kw_idx == 0))
                                if _kw_idx < len(batch_pairs) - 1:
                                    jitter = _jitter(random.uniform(8, 20), pct=0.4)
                                    await asyncio.sleep(jitter)
                        else:
                            # Parallel tab mode — all keywords start simultaneously,
                            # capped by max_tabs_per_site. No inter-keyword sleep
                            # because tabs are already staggered by network I/O.
                            tab_tasks = [
                                _scrape_one_keyword(kw, ri_id, is_first=(i == 0))
                                for i, (kw, ri_id) in enumerate(batch_pairs)
                            ]
                            await asyncio.gather(*tab_tasks, return_exceptions=True)
                        # Close the context (both modes); close browser only if not persistent
                        await context.close()
                        if browser is not None:
                            await browser.close()
                    except Exception as browser_exc:
                        print(f"[Thread B] Browser error on {site.name}: {browser_exc}")

            # ── Launch all site coroutines — sequential or parallel ───────────
            if max_concurrent_browsers == 1:
                print(f"[Thread B] 🔁 Sequential mode (max_concurrent_browsers=1)")
                for site in target_sites:
                    await _scrape_one_site(site)
            else:
                capped = min(max_concurrent_browsers, len(target_sites))
                print(
                    f"[Thread B] ⚡ Parallel mode — {capped} browser(s) × "
                    f"{max_tabs_per_site} tab(s)/site"
                )
                await asyncio.gather(
                    *[_scrape_one_site(site) for site in target_sites],
                    return_exceptions=True,
                )
            # If keyword batching is enabled, finish the active round once
            # every (site, keyword) has become done/failed.
            if keyword_batch_enabled and active_round_id is not None:
                db_check = SessionLocal()
                try:
                    remaining = (
                        db_check.query(ScrapeRoundItem)
                        .filter(
                            ScrapeRoundItem.round_id == active_round_id,
                            ScrapeRoundItem.status.in_(["pending", "in_progress"]),
                        )
                        .count()
                    )
                    if remaining == 0:
                        r = db_check.query(ScrapeRound).filter(ScrapeRound.id == active_round_id).first()
                        if r and r.status == "active":
                            r.status = "finished"
                            r.finished_at = datetime.now()
                            db_check.flush()
                            db_check.commit()
                finally:
                    db_check.close()
            # Hourly warning bookkeeping for pending keyword retries.
            # Only items that have already had at least one failed attempt
            # (attempt_count > 0) are eligible for hourly warnings.
            if keyword_batch_enabled and active_round_id is not None:
                db_warn = SessionLocal()
                try:
                    r = db_warn.query(ScrapeRound).filter(ScrapeRound.id == active_round_id).first()
                    now_warn = datetime.now()
                    if r:
                        updated = 0
                        pending_items = (
                            db_warn.query(ScrapeRoundItem)
                            .filter(
                                ScrapeRoundItem.round_id == active_round_id,
                                ScrapeRoundItem.status == "pending",
                            )
                            .all()
                        )
                        for ri in pending_items:
                            if not (ri.attempt_count or 0) > 0:
                                continue
                            base = ri.last_hour_warn_at or ri.first_pending_at or r.started_at
                            if not base:
                                continue
                            if (now_warn - base).total_seconds() >= 3600:
                                ri.last_hour_warn_at = now_warn
                                updated += 1
                        if updated:
                            db_warn.flush()
                            db_warn.commit()
                finally:
                    db_warn.close()
            _stats["last_cycle"] = cycle_start
            _stats["engine_status"] = "Idle — waiting next cycle"
            _redis_set_stats(_stats)
            _redis_publish("cycle_complete", {"last_cycle": cycle_start})
            # ── N8: Boost mode — shorten interval when a lot closes soon ──────
            # Check if any tracked lot closes within 30 min. If so, use
            # boost_interval_mins instead of the normal timer so the engine
            # refreshes more often during the critical closing window.
            # FIX: the old unused "_soon_cutoff = datetime.now() + timedelta(minutes=30)"
            # line is gone — `timedelta` is a function-local name here (it is
            # imported inside the keyword-batching branch above), so with
            # batching disabled that line raised UnboundLocalError every cycle.
            # The query below never needed it: it filters on time_left_mins.
            _boost_secs = int(_get_config("boost_interval_mins", "2")) * 60
            _db_boost = SessionLocal()
            try:
                _closing_soon = _db_boost.query(Listing).filter(
                    Listing.time_left_mins != None,
                    Listing.time_left_mins <= 30,
                    Listing.time_left_mins > 0,
                ).count()
            finally:
                _db_boost.close()
            _effective_timer = _boost_secs if _closing_soon else timer_val
            _boost_label = f" [⚡ BOOST MODE — {_closing_soon} lot(s) closing soon]" if _closing_soon else ""
            # Apply ±25% jitter to the cycle timer so requests never arrive
            # at a perfectly predictable interval (a classic bot signature).
            _sleep_actual = _jitter(_effective_timer, pct=0.25)
            print(f"[Thread B] ✅ Cycle complete. Sleeping {int(_sleep_actual)}s (timer={_effective_timer}s ±25%).{_boost_label}")
            # Poll every 5 s so that any API write (new keyword, site, config
            # change) sets _cycle_now and the engine wakes up immediately
            # instead of waiting the full inter-cycle sleep.
            _slept = 0.0
            _poll_interval = 5.0
            while _slept < _sleep_actual:
                if _cycle_now.is_set():
                    _cycle_now.clear()
                    print("[Thread B] ⚡ Change detected — skipping sleep, starting new cycle now.")
                    break
                _chunk = min(_poll_interval, _sleep_actual - _slept)
                await asyncio.sleep(_chunk)
                _slept += _chunk
async def _price_refresh_pass() -> None:
    """
    Single price-refresh pass — visits every saved lot page, pulls current
    price + time-left, writes changes to DB.
    Runs in its OWN dedicated asyncio event loop (Thread D) so it is
    completely isolated from the main scraper loop (Thread B).
    The two loops never share an event loop, never block each other, and
    never compete for the same browser instance. SQLite handles concurrent
    DB writes via its WAL journal — each function uses its own SessionLocal.
    """
    db = SessionLocal()
    try:
        # Skip placeholder rows that never captured a real URL.
        listings = db.query(Listing).filter(Listing.link.notlike("no-link-%")).all()
    except Exception as exc:
        print(f"[Refresh] ❌ DB read failed: {exc}")
        db.close()
        return
    if not listings:
        db.close()
        return
    print(f"[Refresh] 🔄 Starting price pass — {len(listings)} lot(s)…")
    updated = 0
    try:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=True,
                args=[
                    "--no-sandbox",
                    "--disable-renderer-backgrounding",
                    "--disable-background-timer-throttling",
                    "--disable-backgrounding-occluded-windows",
                ],
            )
            context = await browser.new_context(
                user_agent=random.choice(_rotating_agents),
                locale="en-GB",
            )
            # Minimal stealth: hide webdriver flag, pretend the page has focus.
            await context.add_init_script(
                "Object.defineProperty(navigator,'webdriver',{get:()=>undefined});"
                "document.hasFocus=()=>true;"
            )
            for listing in listings:
                try:
                    page = await context.new_page()
                    # FIX: page lifecycle is now guarded by try/finally — a goto
                    # timeout (or any exception below) previously skipped
                    # page.close() and leaked one open tab per failed lot.
                    try:
                        # Heavy assets are pointless here; abort them.
                        await page.route(
                            "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,mp4,webp}",
                            lambda route: route.abort(),
                        )
                        await page.goto(
                            listing.link, timeout=30_000, wait_until="domcontentloaded"
                        )
                        # ── Smart wait: Apollo cache polling (HiBid & GraphQL SPAs) ───────
                        _lot_id_m = re.search(r'/(?:lot|item|product)/(\d+)', listing.link, re.IGNORECASE)
                        _lot_id = _lot_id_m.group(1) if _lot_id_m else None
                        try:
                            await page.wait_for_function(
                                JS_APOLLO_WAIT, arg=_lot_id, timeout=8000, polling=200
                            )
                        except Exception:
                            pass  # best-effort; fall through to the fixed settle delay
                        await page.wait_for_timeout(1500)
                        # Pull price + time from the detail page
                        data = await page.evaluate(r"""() => {
                            const PRICE_SELS = [
                                '[class*="current-bid"] [class*="amount"]',
                                '[class*="current-bid"]', '[class*="bid-amount"]',
                                '.s-item__price', '[itemprop="price"]',
                                'span[class*="price"]', '.price', '[class*="price"]',
                            ];
                            const TIME_SELS = [
                                '[class*="time-left"]', '[class*="timeleft"]',
                                '[class*="countdown"]', '[class*="closing-time"]',
                                '[class*="time-remaining"]', '[class*="ends-in"]',
                                '.s-item__time-left', '[class*="expire"]',
                                '[class*="end-time"]', 'time',
                            ];
                            const q = sels => { for (const s of sels) {
                                try { const el = document.querySelector(s); if (el) return el; }
                                catch(e) {} } return null; };
                            const pe = q(PRICE_SELS), te = q(TIME_SELS);
                            let pt = pe ? (
                                pe.innerText ||
                                pe.getAttribute('data-price') ||
                                pe.getAttribute('content') || ''
                            ).trim() : '';
                            if (pt.includes('\n')) {
                                const ln = pt.split('\n').find(x => /\d/.test(x));
                                if (ln) pt = ln.trim();
                            }
                            return { price_text: pt, time_text: te ? te.innerText.trim() : '' };
                        }""")
                        # Pull images via shared 5-layer extractor (same as initial scrape)
                        img_urls = await page.evaluate(JS_DETAIL_IMAGES)
                    finally:
                        await page.close()
                    price_text = (data.get("price_text") or "").strip()
                    time_text = (data.get("time_text") or "").strip()
                    # img_urls already set by JS_DETAIL_IMAGES evaluate above
                    if not price_text and not time_text and not img_urls:
                        continue
                    amount, currency = _extract_price_and_currency(price_text)
                    price_display = _format_price(amount, currency)
                    time_left_str = _extract_time_left(time_text)
                    # FIX: call timeLeftToMins once (was evaluated twice per lot).
                    tl_mins = None
                    if time_left_str:
                        _mins = timeLeftToMins(time_left_str)
                        if _mins != float('inf'):
                            tl_mins = round(_mins, 4)
                    changed = False
                    if amount is not None and amount != listing.price:
                        listing.price = amount
                        listing.currency = currency[:10] if currency else ""
                        listing.price_raw = price_display[:100]
                        changed = True
                    if time_left_str and time_left_str != listing.time_left:
                        listing.time_left = time_left_str[:60]
                        listing.time_left_mins = tl_mins
                        changed = True
                    # Update images whenever the URL set differs from what we stored —
                    # handles count changes (0→5, 1→5) AND quality upgrades where count
                    # stays the same but URLs differ (thumbnail→full-size).
                    # Guard: never overwrite a good set with an empty result.
                    if img_urls:
                        existing_imgs = []
                        try:
                            existing_imgs = json.loads(listing.images or "[]")
                        except Exception:
                            pass
                        if img_urls != existing_imgs:
                            listing.images = json.dumps(img_urls[:10])
                            changed = True
                    if changed:
                        listing.price_updated_at = datetime.now()
                        db.commit()
                        updated += 1
                        print(
                            f"[Refresh] ✅ {listing.title[:35]} → "
                            f"{price_display} | {time_left_str}"
                        )
                except Exception as lot_exc:
                    # One bad lot never aborts the whole pass.
                    print(f"[Refresh] ⚠️ {listing.link[:55]}: {lot_exc}")
                    continue
            await browser.close()
    except Exception as exc:
        print(f"[Refresh] ❌ Browser error: {exc}")
    finally:
        db.close()
    print(f"[Refresh] ✅ Pass done — {updated}/{len(listings)} updated.")
def run_scraper_thread() -> None:
    """Thread B — main scraper + Telegram C2. Never touches price refresh."""
    scraper_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(scraper_loop)
    # Both long-running coroutines share this one loop.
    scraper_loop.run_until_complete(
        asyncio.gather(nuclear_engine(), telegram_c2_loop())
    )
def run_refresh_thread() -> None:
    """
    Thread D — price/time-left refresh, completely isolated from Thread B.
    Runs its own asyncio event loop so it never competes with the scraper
    for the event loop, browser instances, or DB connections.
    SQLite WAL mode handles concurrent writes from both threads safely.
    Wakes every 5 minutes, runs a full refresh pass, then sleeps again.
    Because the sleep happens before each pass, a pass that overruns the
    interval simply delays the next one — passes never overlap.
    """
    import asyncio as _aio

    async def _refresh_forever():
        # Fixed cadence: sleep first, then run one pass, repeat.
        interval_secs = 300  # 5 minutes
        print("[Thread D] 💰 Price-refresh thread online.")
        while True:
            await _aio.sleep(interval_secs)
            try:
                await _price_refresh_pass()
            except Exception as exc:
                print(f"[Thread D] ❌ Unhandled error in refresh pass: {exc}")

    refresh_loop = _aio.new_event_loop()
    _aio.set_event_loop(refresh_loop)
    refresh_loop.run_until_complete(_refresh_forever())
# ─────────────────────────────────────────────────────────────────────────────
# Thread A — FastAPI Dashboard
# ─────────────────────────────────────────────────────────────────────────────
# HTTP API + dashboard UI, served from the main thread.
app = FastAPI(title="Ghost Node", version="1.0.0")
# Wide-open CORS so the dashboard can be served from any origin/port.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Static files / Dashboard ─────────────────────────────────────────────────
# Legacy single-file dashboard shipped alongside this module.
DASHBOARD_PATH = os.path.join(os.path.dirname(__file__), "dashboard.html")
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard():
# Prefer Next.js static build when it exists; fall back to legacy dashboard.html
_next_index = os.path.join(os.path.dirname(__file__), "frontend", "out", "index.html")
if os.path.exists(_next_index):
with open(_next_index, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
if os.path.exists(DASHBOARD_PATH):
with open(DASHBOARD_PATH, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
return HTMLResponse("
Dashboard not found
", status_code=404)
@app.get("/legacy", response_class=HTMLResponse)
async def serve_legacy():
"""Always serves the original dashboard.html regardless of Next.js build."""
if os.path.exists(DASHBOARD_PATH):
with open(DASHBOARD_PATH, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
return HTMLResponse("
Legacy dashboard not found
", status_code=404)
# ── Stats ────────────────────────────────────────────────────────────────────
@app.get("/api/stats")
def get_stats():
    """Live engine stats plus computed uptime in seconds."""
    elapsed = int(time.time() - _stats["uptime_start"])
    return dict(_stats, uptime_seconds=elapsed)
# ── Listings ─────────────────────────────────────────────────────────────────
@app.get("/api/listings")
def get_listings(limit: int = 100, db: Session = Depends(get_db)):
    """Return the newest `limit` listings, most recent first."""
    newest_first = (
        db.query(Listing)
        .order_by(Listing.timestamp.desc())
        .limit(limit)
    )
    return [row.to_dict() for row in newest_first.all()]
@app.delete("/api/listings/{listing_id}")
def delete_listing(listing_id: int, db: Session = Depends(get_db)):
row = db.query(Listing).filter(Listing.id == listing_id).first()
if not row:
return JSONResponse({"error": "not found"}, status_code=404)
db.delete(row)
db.commit()
return {"status": "deleted"}
@app.delete("/api/listings")
def clear_listings(db: Session = Depends(get_db)):
db.query(Listing).delete()
db.commit()
return {"status": "cleared"}
@app.get("/api/listings/countdown-sync")
def countdown_sync(db: Session = Depends(get_db)):
"""
Lightweight endpoint polled every 60s by the dashboard countdown ticker.
Returns only the fields needed to keep the live countdown accurate:
- id
- time_left_mins (float, updated by price-refresh Thread D)
- price_updated_at (ISO string — the reference time time_left_mins was measured)
- timestamp (fallback reference if price_updated_at is null)
Much cheaper than /api/listings since it skips title/price/link/score/etc.
The frontend uses this to silently patch data-tlmins and data-captured on
each .tl-cell without triggering a full table re-render.
"""
rows = db.query(
Listing.id,
Listing.time_left_mins,
Listing.price_updated_at,
Listing.timestamp,
).all()
return [
{
"id": r.id,
"time_left_mins": r.time_left_mins,
"price_updated_at": r.price_updated_at.isoformat() if r.price_updated_at else None,
"timestamp": r.timestamp.isoformat() if r.timestamp else None,
}
for r in rows
]
@app.get("/api/listings/refresh-status")
def get_refresh_status(db: Session = Depends(get_db)):
"""
Returns the most recent price_updated_at across all listings.
The dashboard polls this every 30s and re-fetches /api/listings
when the timestamp changes — so price updates appear automatically.
"""
from sqlalchemy import func as sqlfunc
latest = db.query(sqlfunc.max(Listing.price_updated_at)).scalar()
return {
"last_price_update": latest.isoformat() if latest else None,
"listing_count": db.query(Listing).count(),
}
# ── Keywords ─────────────────────────────────────────────────────────────────
@app.get("/api/keywords")
def get_keywords(db: Session = Depends(get_db)):
    """All keywords, ordered by sort_order then id."""
    ordered = db.query(Keyword).order_by(Keyword.sort_order.asc(), Keyword.id.asc())
    return [row.to_dict() for row in ordered.all()]
@app.post("/api/keywords")
async def add_keyword(request: Request, db: Session = Depends(get_db)):
body = await request.json()
term = str(body.get("term", "")).strip()
weight = int(body.get("weight", 1))
if not term:
return JSONResponse({"error": "term required"}, status_code=400)
existing = db.query(Keyword).filter(Keyword.term == term).first()
if existing:
return JSONResponse({"error": "duplicate"}, status_code=409)
# assign sort_order = max + 1 so new keyword goes to the bottom
max_order = db.query(Keyword).count()
kw = Keyword(term=term, weight=weight, sort_order=max_order)
db.add(kw)
db.commit()
db.refresh(kw)
_cycle_now.set() # wake scraper immediately
return kw.to_dict()
@app.put("/api/keywords/{kw_id}")
async def update_keyword(kw_id: int, request: Request, db: Session = Depends(get_db)):
"""Update keyword term, weight, ai_target, min_price, max_price, and/or sort_order."""
row = db.query(Keyword).filter(Keyword.id == kw_id).first()
if not row:
return JSONResponse({"error": "not found"}, status_code=404)
body = await request.json()
if "term" in body:
new_term = str(body["term"]).strip()
if new_term and new_term != row.term:
conflict = db.query(Keyword).filter(Keyword.term == new_term, Keyword.id != kw_id).first()
if conflict:
return JSONResponse({"error": "duplicate term"}, status_code=409)
row.term = new_term
if "weight" in body:
row.weight = max(1, int(body["weight"] or 1))
if "ai_target" in body:
row.ai_target = str(body["ai_target"]).strip() or None
if "min_price" in body:
v = body["min_price"]
row.min_price = float(v) if v not in (None, "", "null") else None
if "max_price" in body:
v = body["max_price"]
row.max_price = float(v) if v not in (None, "", "null") else None
if "sort_order" in body:
row.sort_order = int(body["sort_order"])
db.flush()
db.commit()
db.refresh(row)
_cycle_now.set() # wake scraper immediately
return row.to_dict()
@app.post("/api/keywords/reorder")
async def reorder_keywords(request: Request, db: Session = Depends(get_db)):
    """Accepts {order: [id, id, ...]} and bulk-updates sort_order."""
    payload = await request.json()
    # Each id's new sort_order is simply its position in the submitted list.
    for position, keyword_id in enumerate(payload.get("order", [])):
        db.query(Keyword).filter(Keyword.id == keyword_id).update({"sort_order": position})
    db.flush()
    db.commit()
    return {"status": "reordered"}
@app.delete("/api/keywords/{kw_id}")
def delete_keyword(kw_id: int, db: Session = Depends(get_db)):
    """Delete a keyword by id; 404 when it does not exist."""
    kw = db.query(Keyword).filter(Keyword.id == kw_id).first()
    if kw is None:
        return JSONResponse({"error": "not found"}, status_code=404)
    db.delete(kw)
    db.commit()
    _cycle_now.set()  # wake scraper immediately
    return {"status": "deleted"}
# ── N6: Scoring Rules ─────────────────────────────────────────────────────────
@app.get("/api/scoring-rules")
def get_scoring_rules(db: Session = Depends(get_db)):
    """List all scoring rules, ordered by id."""
    rules = db.query(ScoringRule).order_by(ScoringRule.id.asc()).all()
    return [rule.to_dict() for rule in rules]
@app.post("/api/scoring-rules")
async def create_scoring_rule(request: Request, db: Session = Depends(get_db)):
    """Create a scoring rule.

    Body: {"signal": str, "delta": int, "notes": str (optional)}.
    category is derived from the sign of delta. Returns 400 on missing or
    non-integer fields, 409 on a (case-insensitive) duplicate signal.
    """
    body = await request.json()
    signal = (body.get("signal") or "").strip()
    delta = body.get("delta")
    if not signal or delta is None:
        return JSONResponse({"error": "signal and delta required"}, status_code=400)
    try:
        # Convert once; a non-integer delta becomes a clean 400 instead of a 500.
        delta_int = int(delta)
    except (TypeError, ValueError):
        return JSONResponse({"error": "delta must be an integer"}, status_code=400)
    if db.query(ScoringRule).filter(ScoringRule.signal.ilike(signal)).first():
        return JSONResponse({"error": "duplicate signal"}, status_code=409)
    rule = ScoringRule(
        signal=signal[:100],
        delta=delta_int,
        category="positive" if delta_int > 0 else "negative",
        notes=(body.get("notes") or "").strip() or None,
    )
    db.add(rule)
    db.flush()
    db.commit()
    return rule.to_dict()
@app.put("/api/scoring-rules/{rule_id}")
async def update_scoring_rule(rule_id: int, request: Request, db: Session = Depends(get_db)):
    """Update a scoring rule's signal, delta (recomputes category), and/or notes.

    Returns 404 when the id is unknown, 400 on a blank signal or
    non-integer delta (previously these surfaced as 500s).
    """
    row = db.query(ScoringRule).filter(ScoringRule.id == rule_id).first()
    if not row:
        return JSONResponse({"error": "not found"}, status_code=404)
    body = await request.json()
    if "signal" in body:
        new_signal = str(body["signal"] or "").strip()[:100]
        if not new_signal:
            # Consistent with create_scoring_rule, which rejects empty signals.
            return JSONResponse({"error": "signal cannot be empty"}, status_code=400)
        row.signal = new_signal
    if "delta" in body:
        try:
            row.delta = int(body["delta"])
        except (TypeError, ValueError):
            return JSONResponse({"error": "delta must be an integer"}, status_code=400)
        row.category = "positive" if row.delta > 0 else "negative"
    if "notes" in body:
        row.notes = (body["notes"] or "").strip() or None
    db.flush()
    db.commit()
    return row.to_dict()
@app.delete("/api/scoring-rules/{rule_id}")
def delete_scoring_rule(rule_id: int, db: Session = Depends(get_db)):
    """Delete a scoring rule by id; 404 when it does not exist."""
    rule = db.query(ScoringRule).filter(ScoringRule.id == rule_id).first()
    if rule is None:
        return JSONResponse({"error": "not found"}, status_code=404)
    db.delete(rule)
    db.commit()
    return {"status": "deleted"}
# ── Target Sites ─────────────────────────────────────────────────────────────
@app.get("/api/sites")
def get_sites(db: Session = Depends(get_db)):
    """List all target sites, ordered by sort_order then id."""
    ordered = db.query(TargetSite).order_by(TargetSite.sort_order.asc(), TargetSite.id.asc())
    return [site.to_dict() for site in ordered.all()]
@app.get("/api/sites/enabled-count")
def get_enabled_site_count(db: Session = Depends(get_db)):
    """Return the number of currently enabled target sites.

    Used by the Settings page to validate max_concurrent_browsers.
    """
    return {"count": db.query(TargetSite).filter(TargetSite.enabled == 1).count()}
@app.get("/api/scrape/progress")
def get_scrape_progress(db: Session = Depends(get_db)):
    """
    Live scrape-progress for keyword batching mode.
    Returns the active round and the “pending retries that need attention”
    (attempt_count>0 only), including hourly warning timing.
    """
    keyword_batch_enabled = _get_config("keyword_batch_enabled", "false").lower() == "true"
    # Most recently started round still marked active; None when idle.
    active_round = (
        db.query(ScrapeRound)
        .filter(ScrapeRound.status == "active")
        .order_by(ScrapeRound.started_at.desc())
        .first()
    )
    if not active_round:
        # Empty-but-shaped payload so the dashboard renders zeros instead of
        # special-casing a null response.
        return {
            "keyword_batch_enabled": keyword_batch_enabled,
            "active_round": None,
            "counts": {"pending": 0, "in_progress": 0, "done": 0, "failed": 0},
            "pending_items": [],
        }
    round_id = active_round.id
    now_dt = datetime.now()
    # A round's deadline is fixed at 4 hours after it started.
    # NOTE(review): assumes started_at is never NULL for an active round —
    # the isoformat guard below suggests it can be; confirm against the writer.
    deadline_at = active_round.started_at + timedelta(hours=4)

    def _count_status(st: str) -> int:
        # Count this round's items in the given status.
        return (
            db.query(ScrapeRoundItem)
            .filter(ScrapeRoundItem.round_id == round_id, ScrapeRoundItem.status == st)
            .count()
        )

    counts = {
        "pending": _count_status("pending"),
        "in_progress": _count_status("in_progress"),
        "done": _count_status("done"),
        "failed": _count_status("failed"),
    }
    # Up to 50 pending items, most-retried / most-recently-attempted first.
    pending_items = (
        db.query(ScrapeRoundItem)
        .filter(ScrapeRoundItem.round_id == round_id, ScrapeRoundItem.status == "pending")
        .order_by(ScrapeRoundItem.attempt_count.desc(), ScrapeRoundItem.last_attempt_at.desc())
        .limit(50)
        .all()
    )
    # Resolve site/keyword display names in two bulk queries instead of per item.
    site_ids = list({ri.site_id for ri in pending_items})
    keyword_ids = list({ri.keyword_id for ri in pending_items})
    site_map = {s.id: s.name for s in db.query(TargetSite).filter(TargetSite.id.in_(site_ids)).all()} if site_ids else {}
    kw_map = {k.id: k.term for k in db.query(Keyword).filter(Keyword.id.in_(keyword_ids)).all()} if keyword_ids else {}
    items_out = []
    for ri in pending_items:
        # Queued-but-never-attempted items have attempt_count==0.
        if (ri.attempt_count or 0) <= 0:
            continue
        # Hourly warning clock: measured from the last warning, else from when
        # the item first went pending, else from the round start.
        base = ri.last_hour_warn_at or ri.first_pending_at or active_round.started_at
        warn_due = bool(base and (now_dt - base).total_seconds() >= 3600)
        items_out.append(
            {
                "round_item_id": ri.id,
                "site_name": site_map.get(ri.site_id, str(ri.site_id)),
                "keyword_term": kw_map.get(ri.keyword_id, str(ri.keyword_id)),
                "attempt_count": ri.attempt_count or 0,
                "first_pending_at": ri.first_pending_at.isoformat() if ri.first_pending_at else None,
                "last_attempt_at": ri.last_attempt_at.isoformat() if ri.last_attempt_at else None,
                "last_hour_warn_at": ri.last_hour_warn_at.isoformat() if ri.last_hour_warn_at else None,
                "last_error": (ri.last_error or "")[:300] if ri.last_error else None,
                "warn_due": warn_due,
            }
        )
    return {
        "keyword_batch_enabled": keyword_batch_enabled,
        "active_round": {
            "id": active_round.id,
            "started_at": active_round.started_at.isoformat() if active_round.started_at else None,
            "deadline_at": deadline_at.isoformat() if deadline_at else None,
        },
        "counts": counts,
        "pending_items": items_out,
    }
@app.post("/api/sites/reorder")
async def reorder_sites(request: Request, db: Session = Depends(get_db)):
    """Accepts {order: [id, id, ...]} and bulk-updates sort_order."""
    payload = await request.json()
    # Position in the submitted list becomes the site's new sort_order.
    for position, target_id in enumerate(payload.get("order", [])):
        db.query(TargetSite).filter(TargetSite.id == target_id).update({"sort_order": position})
    db.flush()
    db.commit()
    return {"status": "reordered"}
@app.post("/api/sites")
async def add_site(request: Request, db: Session = Depends(get_db)):
    """
    Registers a new TargetSite.
    Uses request.json() directly (no Pydantic model) to prevent 400 errors.
    Sets enabled=1 explicitly — never relies on column default under concurrent load.
    Calls db.flush() before db.commit() to force the INSERT into the SQLite
    WAL immediately, so the scraper thread's next DB session sees the new row.
    URL mode is inferred automatically:
        Mode A — Direct:   url_template contains {keyword}
                           → scraper substitutes keyword and navigates directly.
        Mode B — Homepage: url_template has NO {keyword}
                           → scraper navigates to the URL then types in the
                             search box identified by search_selector.
                             search_selector is required in this mode.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)
    name = str(body.get("name", "")).strip()
    template = str(body.get("url_template", "")).strip()
    selector = str(body.get("search_selector", "")).strip()

    def _coerce_int_01(val, default: int = 0) -> int:
        # Normalise the many boolean encodings a browser form can send
        # (true/false, "1"/"0", "yes", 1.0, …) to a plain SQLite-friendly 0/1.
        if val is None:
            return default
        if isinstance(val, bool):
            return 1 if val else 0
        s = str(val).strip().lower()
        if s in ("1", "true", "yes", "on"):
            return 1
        if s in ("0", "false", "no", "off", ""):
            return 0
        try:
            return 1 if int(float(val)) != 0 else 0
        except Exception:
            return default

    if not name or not template:
        return JSONResponse({"error": "name and url_template are required"}, status_code=400)
    # ── Infer navigation mode and warn (not reject) for homepage mode ────────
    is_direct_mode = "{keyword}" in template
    if is_direct_mode:
        mode_label = "DIRECT (keyword substitution)"
    else:
        mode_label = "HOMEPAGE (search-box interaction)"
        if not selector:
            # Warn but still save — operator can add selector via PUT later
            print(
                f"[API] ⚠️ Site '{name}' saved in HOMEPAGE mode but "
                f"search_selector is empty. Add a CSS selector "
                f"(e.g. 'input#st') via the Target Sites tab or the "
                f"scraper will skip this site until one is provided."
            )
    # Optional fields, all coerced defensively; max_pages is floored at 1.
    max_pages = max(1, int(body.get("max_pages", 1) or 1))
    requires_login = bool(body.get("requires_login", False))
    login_url = str(body.get("login_url", "") or "").strip()
    login_check = str(body.get("login_check_selector", "") or "").strip()
    # login_enabled defaults to requires_login when not sent explicitly.
    login_enabled = bool(body.get("login_enabled", requires_login))
    custom_visible_browser = _coerce_int_01(body.get("custom_visible_browser", 0), default=0)
    max_order = db.query(TargetSite).count()  # new site goes to the bottom
    site = TargetSite(
        name=name,
        url_template=template,
        search_selector=selector,
        enabled=1,  # explicit — never rely on column default for critical flag
        max_pages=max_pages,
        sort_order=max_order,
        requires_login=requires_login,
        login_url=login_url,
        login_check_selector=login_check,
        login_enabled=login_enabled,
        custom_visible_browser=custom_visible_browser,
    )
    db.add(site)
    db.flush()  # pushes INSERT to SQLite WAL before commit
    db.commit()
    db.refresh(site)
    print(f"[API] ✅ New TargetSite saved: '{site.name}' id={site.id} "
          f"mode={mode_label} pages={max_pages} login={requires_login}")
    # Auto-adapt: if enabled, kick off AI selector generation immediately for new site
    if _get_config("auto_adapt_enabled", "false").lower() == "true":
        asyncio.create_task(adapt_site_now(site.id))
        print(f"[AutoAdapt] 🆕 New site '{site.name}' — auto-adapt queued.")
    _cycle_now.set()  # wake scraper immediately
    return site.to_dict()
@app.put("/api/sites/{site_id}")
async def update_site(site_id: int, request: Request, db: Session = Depends(get_db)):
    """
    Updates a TargetSite row.
    Coerces 'enabled' to a plain integer (1 or 0) regardless of whether the
    dashboard sends a JSON boolean (true/false) or integer (1/0) — SQLite
    stores INTEGER and the filter TargetSite.enabled == 1 must see an int.
    db.flush() is called before db.commit() to push the UPDATE into the
    SQLite WAL immediately, closing the race window where the scraper thread
    could open a new session and read stale 'enabled' values.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)
    row = db.query(TargetSite).filter(TargetSite.id == site_id).first()
    if not row:
        return JSONResponse({"error": "not found"}, status_code=404)
    # Plain string fields are copied through verbatim when present.
    for field in ("name", "url_template", "search_selector"):
        if field in body:
            setattr(row, field, body[field])
    # ── Coerce 'enabled' to plain int so SQLite stores 1 or 0, never True/False ──
    if "enabled" in body:
        row.enabled = 1 if body["enabled"] else 0

    def _coerce_int_01(val, default: int = 0) -> int:
        # Same boolean-encoding normaliser as add_site (duplicated locally).
        if val is None:
            return default
        if isinstance(val, bool):
            return 1 if val else 0
        s = str(val).strip().lower()
        if s in ("1", "true", "yes", "on"):
            return 1
        if s in ("0", "false", "no", "off", ""):
            return 0
        try:
            return 1 if int(float(val)) != 0 else 0
        except Exception:
            return default

    # ── max_pages and login fields ────────────────────────────────────────────
    if "max_pages" in body:
        row.max_pages = max(1, int(body["max_pages"] or 1))
    for field in ("requires_login", "login_enabled"):
        if field in body:
            setattr(row, field, bool(body[field]))
    for field in ("login_url", "login_check_selector"):
        if field in body:
            setattr(row, field, str(body[field] or "").strip())
    if "custom_visible_browser" in body:
        row.custom_visible_browser = _coerce_int_01(body.get("custom_visible_browser"), default=0)
    db.flush()  # ← pushes UPDATE to WAL; scraper thread sees it immediately
    db.commit()
    db.refresh(row)
    status = "ENABLED ✅" if row.enabled == 1 else "DISABLED ⏸"
    print(f"[API] ✅ Site '{row.name}' (id={site_id}) → {status}")
    _cycle_now.set()  # wake scraper immediately
    return row.to_dict()
@app.delete("/api/sites/{site_id}")
def delete_site(site_id: int, db: Session = Depends(get_db)):
    """Delete a target site by id; 404 when it does not exist."""
    site = db.query(TargetSite).filter(TargetSite.id == site_id).first()
    if site is None:
        return JSONResponse({"error": "not found"}, status_code=404)
    db.delete(site)
    db.commit()
    _cycle_now.set()  # wake scraper immediately
    return {"status": "deleted"}
# ── Config / Settings ─────────────────────────────────────────────────────────
@app.get("/api/config")
def get_config(db: Session = Depends(get_db)):
    """Return the whole Config table as a flat key → value dict."""
    return {row.key: row.value for row in db.query(Config).all()}
@app.post("/api/config")
async def save_config(request: Request, db: Session = Depends(get_db)):
    """
    Accepts a JSON dict of key→value pairs and upserts into the Config table.
    Uses request.json() directly to avoid Pydantic 400 errors.
    db.flush() forces the UPDATEs/INSERTs into the SQLite WAL before commit,
    ensuring the scraper thread's next _get_config() call sees fresh values.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)
    saved_keys: list[str] = []
    for key, value in body.items():
        # Upsert: update the row when the key exists, otherwise insert it.
        # All values are stored stringified.
        row = db.query(Config).filter(Config.key == key).first()
        if row:
            row.value = str(value)
        else:
            db.add(Config(key=key, value=str(value)))
        saved_keys.append(key)
    db.flush()  # push dirty rows to SQLite WAL
    db.commit()  # finalise the transaction on disk
    # Terminal confirmation — proves the write happened
    print(f"[API] ✅ Config saved to DB: {saved_keys}")
    for k in saved_keys:
        # Re-read each key after commit and print a truncated preview so
        # secrets (tokens) never land in the log in full.
        row = db.query(Config).filter(Config.key == k).first()
        display = row.value[:6] + "…" if row and row.value and len(row.value) > 6 else (row.value if row else "")
        print(f" {k} = {display!r}")
    _cycle_now.set()  # wake scraper immediately
    return {"status": "saved", "keys": saved_keys}
# ── N16: AI Test Endpoint ──────────────────────────────────────────────────────
@app.post("/api/ai/test")
async def ai_test(request: Request):
    """
    Test the AI filter with a sample title and target.
    Body: {"title": "...", "ai_target": "..."}
    Returns: {"match": bool, "reason": "..."}
    """
    try:
        payload = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    title = str(payload.get("title", "")).strip()
    ai_target = str(payload.get("ai_target", "")).strip()
    if not (title and ai_target):
        return JSONResponse({"error": "title and ai_target required"}, status_code=400)
    provider = _get_config("ai_provider", "groq").strip().lower()
    if provider == "none":
        # Filter disabled — everything "matches" by definition.
        return {"match": True, "reason": "AI provider is set to none — filter disabled."}
    match, reason = await _ai_analyze(title, ai_target)
    return {"match": match, "reason": reason, "provider": provider}
@app.get("/api/ai/debug/log")
def ai_debug_log(limit: int = 200, since_id: int = 0):
    """
    Return the in-memory AI debug log (newest entries last).
    - limit: max entries to return (default 200, max 300)
    - since_id: only return entries with id > since_id (for polling — pass the last
      id you received to get only new entries since then)
    Requires ai_debug = true in config to produce entries; always returns the
    current buffer regardless.
    """
    with _ai_debug_log_lock:
        entries = list(_ai_debug_log)
    # BUGFIX: take the buffer size from the snapshot captured under the lock.
    # The old code re-read the shared buffer without the lock (race with
    # concurrent appends/clears) and built a throwaway list just for len().
    total = len(entries)
    if since_id > 0:
        entries = [e for e in entries if e.get("id", 0) > since_id]
    entries = entries[-min(limit, 300):]
    debug_on = _ai_debug_enabled()
    return {
        "debug_enabled": debug_on,
        "total_in_buffer": total,
        "entries": entries,
    }
@app.delete("/api/ai/debug/log")
def ai_debug_log_clear():
    """Empty the in-memory AI debug log buffer (under its lock)."""
    with _ai_debug_log_lock:
        _ai_debug_log.clear()
    return {"status": "ok", "message": "AI debug log cleared."}
# ── Engine Control ─────────────────────────────────────────────────────────────
@app.post("/api/engine/pause")
def engine_pause():
    """Mark the engine paused and broadcast the status via Redis."""
    new_status = "Paused"
    _stats["engine_status"] = new_status
    _redis_set_stats(_stats)
    _redis_publish("engine_status", {"status": new_status})
    return {"status": "paused"}
@app.post("/api/engine/resume")
def engine_resume():
    """Mark the engine running again and broadcast the status via Redis."""
    new_status = "Running"
    _stats["engine_status"] = new_status
    _redis_set_stats(_stats)
    _redis_publish("engine_status", {"status": new_status})
    return {"status": "running"}
@app.post("/api/engine/restart")
def engine_restart():
    """
    Cross-platform hard restart.
    Strategy:
        1. Respond HTTP 200 immediately so the client gets a clean response.
        2. A daemon thread waits 1 second (lets uvicorn flush the response),
           then spawns a brand-new Python process running the same script with
           the same arguments via subprocess.Popen.
        3. After spawning the child, the current process calls os._exit(0) to
           terminate itself immediately and release port 7000.
    Why not os.execv?
        os.execv works on Linux but on Windows it does NOT replace the current
        process — it creates a new one while the old one keeps running, which
        causes an "address already in use" error on port 7000.
    Why subprocess.Popen + os._exit(0)?
        Popen detaches the child before the parent exits, so the child is
        never left as an orphan. os._exit(0) bypasses Python's atexit hooks
        and __del__ finalizers which can deadlock when uvicorn is still
        running threads.
    """
    import threading, subprocess

    def _do_restart() -> None:
        time.sleep(1.0)  # give uvicorn time to flush the HTTP response
        try:
            print("[GhostNode] 🔄 Spawning new process…")
            # Inherit stdout/stderr so the new process logs to the same terminal
            subprocess.Popen(
                [sys.executable] + sys.argv,
                stdout=None,
                stderr=None,
                close_fds=True,
            )
            print("[GhostNode] ✅ New process launched — shutting down this instance.")
        except Exception as exc:
            # Spawn failed — keep this instance alive rather than dying with
            # no replacement.
            print(f"[GhostNode] ❌ Restart failed: {exc}")
            return
        # Kill this process immediately — port 7000 is now free for the child
        os._exit(0)

    threading.Thread(target=_do_restart, daemon=True, name="GhostNode-Restart").start()
    return {
        "status": "restarting",
        "message": "New process spawning — this instance will exit in ~1 second.",
    }
@app.post("/api/engine/kill")
def engine_kill():
    """
    Hard-kill Ghost Node immediately — no restart, no respawn.
    Sends HTTP 200 first, then a daemon thread calls os._exit(0) after
    a 300 ms flush window. The entire process dies: all threads, the
    scraper, the Telegram C2 loop, and uvicorn.
    The dashboard will go offline and will NOT reconnect automatically.
    The user must restart manually from the terminal.
    """
    def _terminate() -> None:
        # Short delay so uvicorn can flush the 200 before the process dies.
        time.sleep(0.3)
        print("[GhostNode] ☠ KILL signal received — terminating process.")
        os._exit(0)

    threading.Thread(target=_terminate, daemon=True, name="GhostNode-Kill").start()
    return {"status": "killed", "message": "Process terminating in ~300ms."}
# ── Telegram connectivity test ────────────────────────────────────────────────
@app.post("/api/telegram/test")
async def test_telegram():
    """
    Sends a test message using whatever token/chat_id is currently in the DB.
    Returns the full Telegram response body so you can diagnose 401/404 etc.
    """
    token = _get_config("telegram_token")
    chat_id = _get_config("telegram_chat_id")
    if not token or not chat_id:
        return JSONResponse(
            {"ok": False, "error": "No token or chat_id saved in DB. Open Settings tab and save first."},
            status_code=400,
        )
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            r = await client.post(
                url,
                data={"chat_id": chat_id, "text": "👻 Ghost Node — Telegram test OK!", "parse_mode": "HTML"},
            )
        body = r.json()
        if r.status_code == 200:
            return {"ok": True, "telegram_response": body}
        else:
            # Telegram rejected the call (bad token, unknown chat, …).
            return JSONResponse(
                {"ok": False, "http_status": r.status_code, "telegram_response": body},
                status_code=200,  # return 200 to JS — the Telegram error is in the body
            )
    except Exception as exc:
        # Network-level failure (DNS, timeout, TLS) — report as a server error.
        return JSONResponse({"ok": False, "error": str(exc)}, status_code=500)
# ── DB read-back diagnostic ───────────────────────────────────────────────────
# ── N14 — Login trigger endpoint ─────────────────────────────────────────────
@app.post("/api/sites/{site_id}/login")
async def trigger_login(site_id: int, db: Session = Depends(get_db)):
    """
    Opens a VISIBLE browser window on the site's login_url so the user can
    manually log in. The session is saved to the persistent profile for that
    site and reused by the scraper on all future cycles.
    Only works when login_enabled = true for this site.
    Returns immediately — the browser window stays open for the user to log in.
    """
    site = db.query(TargetSite).filter(TargetSite.id == site_id).first()
    if not site:
        return JSONResponse({"error": "site not found"}, status_code=404)
    if not site.login_enabled:
        return JSONResponse({"error": "login_enabled is false for this site"}, status_code=400)
    if not site.login_url:
        return JSONResponse({"error": "No login_url configured for this site"}, status_code=400)
    # Per-site profile directory derived from a filesystem-safe slug of the name.
    import re as _re2
    site_slug = _re2.sub(r"[^a-z0-9]", "_", site.name.lower())[:40]
    profile_dir = os.path.join(os.path.dirname(__file__), ".browser_profiles", site_slug)
    os.makedirs(profile_dir, exist_ok=True)

    async def _open_login_browser():
        # Persistent-context launch so cookies/localStorage survive in profile_dir.
        from playwright.async_api import async_playwright
        async with async_playwright() as pw:
            try:
                _lbl, _exe = _resolve_browser()
                ctx = await pw.chromium.launch_persistent_context(
                    profile_dir,
                    executable_path=_exe or None,
                    headless=False,  # MUST be visible so user can log in
                    args=["--no-sandbox"],
                )
                page = await ctx.new_page()
                await page.goto(site.login_url, timeout=60_000, wait_until="domcontentloaded")
                print(f"[Login] 🔑 Browser open for {site.name} — log in and close when done.")
                # Wait up to 10 minutes for the user to log in and close
                await ctx.wait_for_event("close", timeout=600_000)
                print(f"[Login] ✅ Session saved for {site.name}.")
            except Exception as exc:
                # Best-effort: log and let the task end; nothing to clean up.
                print(f"[Login] ❌ {exc}")

    # Run in background — don't block the API response
    asyncio.create_task(_open_login_browser())
    return {
        "status": "browser_opening",
        "message": f"A visible browser window is opening for {site.name}. Log in and close it when done — the session will be saved automatically.",
        "login_url": site.login_url,
        "profile_dir": profile_dir,
    }
# ── N17 — Auto-Adapter endpoints ─────────────────────────────────────────────
@app.post("/api/sites/{site_id}/adapt")
async def trigger_adapt(site_id: int, db: Session = Depends(get_db)):
    """
    Trigger AI selector generation for a site.
    Launches a temporary browser, scrapes the site, sends cleaned HTML to
    Groq (online) or Ollama (local) for CSS selector generation, validates
    live, and persists to the site_selectors table.
    Returns immediately with a task status; full result logged to console.
    """
    site = db.query(TargetSite).filter(TargetSite.id == site_id).first()
    if not site:
        return JSONResponse({"error": "site not found"}, status_code=404)
    cfg = {r.key: r.value for r in db.query(Config).all()}
    provider = cfg.get("ai_provider", "groq")
    if provider == "none":
        return JSONResponse({"error": "AI provider is set to 'none'. Enable Groq or Ollama in Settings."}, status_code=400)
    # NOTE: auto_adapt_enabled only gates *automatic* adaptation on new site creation.
    # Manual adaptation via the 🤖 ADAPT button is always permitted if an AI provider
    # is configured — do NOT gate it on auto_adapt_enabled.

    async def _run():
        # Heavy lifting happens in adapt_site_now; only the summary is logged here.
        result = await adapt_site_now(site_id)
        confidence = result.get("confidence", 0)
        status = "✅" if confidence >= 50 else "⚠️"
        print(f"[AutoAdapt] {status} Manual adapt for {site.name} done — confidence {confidence:.1f}")

    asyncio.create_task(_run())
    return {
        "status": "adapting",
        "message": f"AI selector generation started for '{site.name}'. Check console for progress. Reload the Sites tab in ~30s to see the result.",
        "site_id": site_id,
        "provider": provider,
    }
@app.get("/api/sites/{site_id}/selectors")
def get_site_selectors(site_id: int, db: Session = Depends(get_db)):
    """Return the stored AI selectors for a site, if any."""
    site = db.query(TargetSite).filter(TargetSite.id == site_id).first()
    if site is None:
        return JSONResponse({"error": "site not found"}, status_code=404)
    stored = db.query(SiteSelectors).filter(SiteSelectors.site_id == site_id).first()
    selectors = stored.to_dict() if stored else None
    return {"site_id": site_id, "site_name": site.name, "selectors": selectors}
@app.delete("/api/sites/{site_id}/selectors")
def delete_site_selectors(site_id: int, db: Session = Depends(get_db)):
    """Delete stored AI selectors for a site (forces re-adaptation on next cycle).

    Returns 404 for an unknown site; a no-op success message when the site
    has no stored selectors.
    """
    site = db.query(TargetSite).filter(TargetSite.id == site_id).first()
    if not site:
        return JSONResponse({"error": "site not found"}, status_code=404)
    ss = db.query(SiteSelectors).filter(SiteSelectors.site_id == site_id).first()
    if not ss:
        return {"status": "ok", "message": "No selectors stored for this site."}
    # BUGFIX: the old code flushed *before* db.delete(ss), which pushed nothing
    # to the WAL. Flush after the delete so the DELETE statement itself is
    # written out before commit, matching the file's flush-then-commit pattern.
    db.delete(ss)
    db.flush()
    db.commit()
    return {"status": "ok", "message": f"Selectors for '{site.name}' deleted. Site will use universal extractor until re-adapted."}
# ── N15 — Export endpoints ────────────────────────────────────────────────────
import csv, json as _json
from io import StringIO
from fastapi.responses import StreamingResponse
@app.get("/api/export/csv")
def export_csv(limit: int = 10000, db: Session = Depends(get_db)):
    """Export all listings to a CSV file download."""
    listings = db.query(Listing).order_by(Listing.timestamp.desc()).limit(limit).all()
    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerow(["ID","Title","Price","Currency","Price Raw","Time Left","Score",
                     "Keyword","Site","Link","Captured At","Price Updated At"])
    for item in listings:
        captured = item.timestamp.isoformat() if item.timestamp else ""
        updated = item.price_updated_at.isoformat() if item.price_updated_at else ""
        writer.writerow([
            item.id, item.title, item.price or "", item.currency or "",
            item.price_raw or "", item.time_left or "", item.score,
            item.keyword or "", item.site_name or "", item.link,
            captured, updated,
        ])
    buf.seek(0)
    return StreamingResponse(
        iter([buf.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=ghost_node_listings.csv"},
    )
@app.get("/api/export/json")
def export_json(limit: int = 10000, db: Session = Depends(get_db)):
    """Export all listings to a JSON file download."""
    listings = db.query(Listing).order_by(Listing.timestamp.desc()).limit(limit).all()
    # default=str covers datetimes and other non-JSON-native column types.
    payload = _json.dumps([item.to_dict() for item in listings], indent=2, default=str)
    return StreamingResponse(
        iter([payload]),
        media_type="application/json",
        headers={"Content-Disposition": "attachment; filename=ghost_node_listings.json"},
    )
@app.get("/api/export/html")
def export_html(limit: int = 10000, db: Session = Depends(get_db)):
    """Export all listings as a self-contained HTML report.

    NOTE(review): the original HTML string literals were corrupted (tags lost,
    strings unterminated) — the table markup below is a reconstruction of the
    evident intent: one <tr> per listing with Title/Price/Time Left/Score/
    Keyword/Site/Captured columns, score coloured green when positive.
    Scraped titles are escaped since they are untrusted remote content.
    """
    from html import escape as _esc  # local: titles come from scraped pages

    rows = db.query(Listing).order_by(Listing.timestamp.desc()).limit(limit).all()
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    rows_html = ""
    for r in rows:
        score_color = "#00ff88" if r.score > 0 else "#888"
        ts_str = r.timestamp.strftime("%Y-%m-%d %H:%M") if r.timestamp else ""
        rows_html += (
            "<tr>"
            f'<td><a href="{_esc(r.link or "", quote=True)}">{_esc(r.title[:80])}</a></td>'
            f'<td>{_esc(r.price_raw or "")}</td>'
            f'<td>{_esc(r.time_left or "")}</td>'
            f'<td style="color:{score_color}">{r.score}</td>'
            f'<td>{_esc(r.keyword or "")}</td>'
            f'<td>{_esc(r.site_name or "")}</td>'
            f'<td>{ts_str}</td>'
            "</tr>\n"
        )
    html = f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>Ghost Node Export — {now}</title></head>
<body>
<h1>// GHOST NODE LISTINGS EXPORT</h1>
<p>Generated: {now} | {len(rows)} listings</p>
<table border="1" cellspacing="0" cellpadding="4">
<tr><th>Title</th><th>Price</th><th>Time Left</th><th>Score</th><th>Keyword</th><th>Site</th><th>Captured</th></tr>
{rows_html}
</table>
</body></html>
"""
    return StreamingResponse(
        iter([html]),
        media_type="text/html",
        headers={"Content-Disposition": f"attachment; filename=ghost_node_export_{now[:10]}.html"},
    )
# ── Database Backup & Restore ────────────────────────────────────────────────
@app.get("/api/backup/download")
def backup_download():
    """
    Stream the raw sniper.db SQLite file as a download.
    Only works when using SQLite (not PostgreSQL).
    Creates a timestamped filename so backups don't overwrite each other.
    """
    from database import DATABASE_URL, _is_sqlite
    if not _is_sqlite:
        return JSONResponse(
            {"error": "Backup only supported for SQLite. Use pg_dump for PostgreSQL."},
            status_code=400,
        )
    # Resolve the actual file path from the SQLite URL
    db_path = DATABASE_URL.replace("sqlite:///", "").replace("sqlite://", "")
    if not db_path.startswith("/"):
        # Relative URL → resolve against this file's directory.
        db_path = os.path.join(os.path.dirname(__file__), db_path.lstrip("./"))
    db_path = os.path.abspath(db_path)
    if not os.path.exists(db_path):
        return JSONResponse({"error": f"Database file not found: {db_path}"}, status_code=404)
    # Use a safe hot-backup: VACUUM INTO a temp file, then stream it
    # This avoids streaming a live WAL-mode DB mid-write
    import tempfile, shutil  # NOTE(review): shutil appears unused here — confirm before removing
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_filename = f"ghost_node_backup_{ts}.db"
    tmp_path = os.path.join(tempfile.gettempdir(), backup_filename)
    try:
        # sqlite3's online backup API copies a consistent snapshot even while
        # other connections are writing.
        import sqlite3 as _sqlite3
        conn = _sqlite3.connect(db_path)
        bk = _sqlite3.connect(tmp_path)
        conn.backup(bk)
        bk.close()
        conn.close()
    except Exception as exc:
        return JSONResponse({"error": f"Backup failed: {exc}"}, status_code=500)

    def _stream_and_delete():
        # Stream the temp snapshot in 64 KiB chunks, then remove it even if
        # the client disconnects mid-download.
        try:
            with open(tmp_path, "rb") as f:
                while chunk := f.read(65536):
                    yield chunk
        finally:
            try:
                os.remove(tmp_path)
            except Exception:
                pass

    print(f"[Backup] 📦 Streaming backup: {backup_filename} ({os.path.getsize(tmp_path):,} bytes)")
    return StreamingResponse(
        _stream_and_delete(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f"attachment; filename={backup_filename}"},
    )
@app.post("/api/backup/restore")
async def backup_restore(request: Request):
    """
    Accept a .db file upload and replace the current sniper.db with it.
    The server restarts automatically after restore so all connections reopen.
    SAFETY: saves the current DB as an auto-backup before overwriting.
    Only works when using SQLite.
    """
    from database import DATABASE_URL, _is_sqlite
    if not _is_sqlite:
        return JSONResponse(
            {"error": "Restore only supported for SQLite."},
            status_code=400,
        )
    # Resolve the SQLite file path the same way backup_download does.
    db_path = DATABASE_URL.replace("sqlite:///", "").replace("sqlite://", "")
    if not db_path.startswith("/"):
        db_path = os.path.join(os.path.dirname(__file__), db_path.lstrip("./"))
    db_path = os.path.abspath(db_path)
    try:
        body = await request.body()
        if len(body) < 100:
            return JSONResponse({"error": "Uploaded file appears empty or too small."}, status_code=400)
        # Verify it's a valid SQLite file (magic bytes: "SQLite format 3")
        if not body[:16].startswith(b"SQLite format 3"):
            return JSONResponse(
                {"error": "File does not appear to be a valid SQLite database."},
                status_code=400,
            )
        # Auto-backup current DB before overwriting
        if os.path.exists(db_path):
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            auto_bk = db_path + f".pre_restore_{ts}.bak"
            import shutil
            shutil.copy2(db_path, auto_bk)
            print(f"[Restore] 💾 Auto-backup saved: {auto_bk}")
        # Write the uploaded file
        with open(db_path, "wb") as f:
            f.write(body)
        print(f"[Restore] ✅ Database restored from upload ({len(body):,} bytes). Restarting...")
        # Restart the process to reopen all DB connections.
        import threading

        def _restart():
            # BUGFIX: os.execv does not replace the running process on Windows
            # (see engine_restart's docstring) — the old instance kept the port
            # bound. Use the same Popen + os._exit(0) strategy as
            # /api/engine/restart for a cross-platform restart.
            import time, sys, subprocess
            time.sleep(1)  # let uvicorn flush the HTTP response first
            try:
                subprocess.Popen([sys.executable] + sys.argv, close_fds=True)
            except Exception as exc:
                print(f"[Restore] ❌ Restart failed: {exc}")
                return
            os._exit(0)

        threading.Thread(target=_restart, daemon=True).start()
        return JSONResponse({
            "status": "restored",
            "message": "Database restored successfully. Ghost Node is restarting — refresh the dashboard in 5 seconds.",
            "bytes_written": len(body),
        })
    except Exception as exc:
        return JSONResponse({"error": f"Restore failed: {exc}"}, status_code=500)
@app.get("/api/redis/status")
def redis_status():
    """Check Redis connectivity and return cached stats hash."""
    if _redis_client is None:
        return {"connected": False, "reason": "REDIS_URL not set or redis package missing"}
    try:
        _redis_client.ping()
        stats = _redis_client.hgetall(_REDIS_STATS_KEY)
    except Exception as exc:
        # Any client error (timeout, auth, DNS) is reported as "not connected".
        return {"connected": False, "reason": str(exc)}
    return {"connected": True, "url": os.environ.get("REDIS_URL", ""), "cached_stats": stats}
@app.get("/api/debug/db")
def debug_db(db: Session = Depends(get_db)):
    """
    Returns the exact contents of Config and TargetSite tables.
    Use this to confirm that Settings-tab saves and new sites
    are genuinely written to sniper.db.
    """
    configs = {row.key: row.value for row in db.query(Config).all()}
    token = configs.get("telegram_token")
    if token:
        # Mask token for security — show only first 8 chars
        configs["telegram_token"] = token[:8] + "…" if len(token) > 8 else token
    return {
        "config": configs,
        "sites": [s.to_dict() for s in db.query(TargetSite).all()],
        "keywords": [k.to_dict() for k in db.query(Keyword).all()],
        "listing_count": db.query(Listing).count(),
    }
# ── Phase 7: Serve Next.js static build ─────────────────────────────────────
import pathlib as _pathlib
_frontend_out = _pathlib.Path(__file__).parent / "frontend" / "out"
if _frontend_out.exists():
    from fastapi.staticfiles import StaticFiles
    from fastapi.responses import FileResponse as _FileResponse
    # Mount ONLY the _next directory (JS/CSS/image assets).
    # We deliberately avoid app.mount("/", html=True) because it intercepts
    # ALL paths as a Starlette sub-app, shadowing explicit routes like /legacy.
    _next_dir = _frontend_out / "_next"
    if _next_dir.exists():
        app.mount("/_next", StaticFiles(directory=str(_next_dir)), name="nextjs_assets")
    # Resolved build root, computed once; used to reject path-traversal attempts.
    _frontend_root = _frontend_out.resolve()
    def _safe_frontend_path(rel: str):
        """
        Resolve *rel* beneath the frontend build dir.
        Returns the resolved Path, or None if the path escapes the build root
        (e.g. via ".." / encoded "%2e%2e" segments) or cannot be resolved.
        """
        try:
            resolved = (_frontend_out / rel).resolve()
            resolved.relative_to(_frontend_root)  # raises ValueError if outside root
        except (ValueError, OSError):
            return None
        return resolved
    # SPA catch-all — registered last so all specific @app.get() routes win.
    # Handles: exact files (favicon.ico, etc.), Next.js .html pages, and the
    # SPA index.html fallback for deep-linked client-side routes.
    @app.get("/{full_path:path}")
    async def serve_spa(full_path: str):
        """Serve an exact file, a Next.js exported page, or the SPA index fallback."""
        # SECURITY: full_path is attacker-controlled ("{full_path:path}" accepts
        # ".." segments). Every candidate is confined to the build root before
        # being handed to FileResponse, so e.g. the sniper.db file one level up
        # can never be served through this route.
        # 1. Exact file match (favicon.ico, *.svg, etc.)
        candidate = _safe_frontend_path(full_path)
        if candidate is not None and candidate.is_file():
            return _FileResponse(str(candidate))
        # 2. Next.js exported page (e.g. "dashboard" → dashboard.html)
        html_candidate = _safe_frontend_path(f"{full_path}.html")
        if html_candidate is not None and html_candidate.is_file():
            return _FileResponse(str(html_candidate))
        # 3. SPA fallback — let the client-side router handle it
        return _FileResponse(str(_frontend_out / "index.html"))
    print("[GhostNode] Serving Next.js frontend from frontend/out/")
# ─────────────────────────────────────────────────────────────────────────────
# Entry Point — spin up threads
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Background daemon workers, started in order:
    #   Thread B — scraper & Telegram C2 (share one asyncio event loop)
    #   Thread D — price/time-left refresh (isolated loop, never blocks scraper)
    #   Thread E — closing-soon alert (isolated loop, polls every 60s)
    for worker_target, worker_name in (
        (run_scraper_thread, "GhostNode-Scraper"),
        (run_refresh_thread, "GhostNode-Refresh"),
        (run_closing_alert_thread, "GhostNode-ClosingAlert"),
    ):
        threading.Thread(target=worker_target, name=worker_name, daemon=True).start()
    print("[GhostNode] 🕵️ Ghost Node online — Dashboard → http://localhost:7000")
    # Thread A (FastAPI via uvicorn — blocks main thread)
    uvicorn.run(app, host="0.0.0.0", port=7000, log_level="warning")