39246-vm/backend_tests/test_scrape_progress.py
2026-03-20 02:38:40 +03:00

342 lines
13 KiB
Python

import os
import tempfile
import unittest
from datetime import datetime, timedelta
def _make_temp_sqlite_url() -> str:
tmp = tempfile.NamedTemporaryFile(prefix="ghostnode_", suffix=".db", delete=False)
path = tmp.name
tmp.close()
# SQLAlchemy on Windows expects forward slashes in sqlite URLs.
norm = path.replace("\\", "/")
return f"sqlite:///{norm}"
# Point the app at a throwaway sqlite database. This must happen at module
# import time, before any project module is imported, because those modules
# read DATABASE_URL when they are first imported.
_DB_URL = _make_temp_sqlite_url()
os.environ["DATABASE_URL"] = _DB_URL
# IMPORTANT: import app/models AFTER DATABASE_URL is set.
from fastapi.testclient import TestClient # noqa: E402
import worker # noqa: E402
from database import SessionLocal # noqa: E402
from models import ( # noqa: E402
Config,
Keyword,
ScrapeRound,
ScrapeRoundItem,
TargetSite,
Listing,
)
class ScrapeProgressEndpointTests(unittest.TestCase):
    """API-level tests for the scrape progress, stats, listings, sites and
    config endpoints, run against a private temp sqlite database.

    All tests share the fixture data seeded once in setUpClass. Several tests
    rely on the seeded ScrapeRound remaining in "active" status, so any test
    that mutates it must restore that status before finishing.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Seed the temp DB: config flag, keywords, a site, one active round
        with three round items, and two listings for the countdown endpoints.
        """
        cls.client = TestClient(worker.app)
        db = SessionLocal()
        try:
            # Force keyword batching mode so the endpoint returns keyword_batch_enabled=true.
            cfg = db.query(Config).filter(Config.key == "keyword_batch_enabled").first()
            if cfg:
                cfg.value = "true"
            else:
                db.add(Config(key="keyword_batch_enabled", value="true"))
            # Create dedicated keywords so assertions can match by term.
            # High sort_order values keep them clear of any seeded keywords.
            kw_due = Keyword(term="TEST_DUE_KEYWORD", weight=1, sort_order=999001)
            kw_not_due = Keyword(term="TEST_RETRY_KEYWORD", weight=1, sort_order=999002)
            kw_zero_attempt = Keyword(term="TEST_ZERO_ATTEMPT_KEYWORD", weight=1, sort_order=999003)
            db.add_all([kw_due, kw_not_due, kw_zero_attempt])
            # flush() assigns primary keys without committing yet.
            db.flush()
            # Use any existing seeded site (or create one if empty).
            site = db.query(TargetSite).order_by(TargetSite.id.asc()).first()
            if site is None:
                site = TargetSite(
                    name="TEST_SITE",
                    url_template="https://example.com/?q={keyword}",
                    search_selector="",
                    enabled=1,
                    max_pages=1,
                    sort_order=0,
                )
                db.add(site)
                db.flush()
            now = datetime.now()
            cls._round_started_at = now - timedelta(hours=2)
            round_row = ScrapeRound(started_at=cls._round_started_at, status="active")
            db.add(round_row)
            db.flush()
            cls._round_id = round_row.id
            # Warn due: base >= 1 hour ago -> warn_due should be True.
            due_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_due.id,
                status="pending",
                attempt_count=1,
                first_pending_at=now - timedelta(minutes=90),
                last_attempt_at=now - timedelta(minutes=10),
                last_hour_warn_at=now - timedelta(hours=2),
                last_error=None,
            )
            # Not due: base < 1 hour ago -> warn_due should be False.
            retry_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_not_due.id,
                status="pending",
                attempt_count=2,
                first_pending_at=now - timedelta(minutes=30),
                last_attempt_at=now - timedelta(minutes=20),
                last_hour_warn_at=None,
                last_error=None,
            )
            # attempt_count == 0 must be excluded from pending_items.
            zero_attempt_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_zero_attempt.id,
                status="pending",
                attempt_count=0,
                first_pending_at=now - timedelta(hours=2),
                last_attempt_at=None,
                last_hour_warn_at=now - timedelta(hours=2),
                last_error=None,
            )
            db.add_all([due_item, retry_item, zero_attempt_item])
            # Seed listings for countdown-sync endpoint.
            # (No listings are seeded by default seed_database().)
            now2 = datetime.now()
            listing_with_price = Listing(
                title="TEST_LISTING_WITH_PRICE_UPDATED_AT",
                link="https://example.com/listing-with-price-updated-at",
                price=100.0,
                currency="USD",
                price_raw="$100",
                time_left="12h",
                time_left_mins=12.5,
                price_updated_at=now2 - timedelta(minutes=5),
                score=0,
                keyword="kw",
                site_name=site.name,
                timestamp=now2 - timedelta(minutes=6),
                closing_alerts_sent="[]",
                images="[]",
                description="",
            )
            listing_without_price = Listing(
                title="TEST_LISTING_NO_PRICE_UPDATED_AT",
                link="https://example.com/listing-no-price-updated-at",
                price=None,
                currency="USD",
                price_raw="",
                time_left="7h",
                time_left_mins=7.0,
                price_updated_at=None,
                score=0,
                keyword="kw",
                site_name=site.name,
                timestamp=now2 - timedelta(minutes=9),
                closing_alerts_sent="[]",
                images="[]",
                description="",
            )
            db.add_all([listing_with_price, listing_without_price])
            db.commit()
            # Reading the attributes after commit refreshes them through the
            # still-open session; cache the values the tests assert against.
            cls._listing_with_price_id = listing_with_price.id
            cls._listing_without_price_id = listing_without_price.id
            cls._last_price_update_iso = (
                listing_with_price.price_updated_at.isoformat()
                if listing_with_price.price_updated_at
                else None
            )
            cls._expected_enabled_count = db.query(TargetSite).filter(TargetSite.enabled == 1).count()
        finally:
            db.close()

    @classmethod
    def tearDownClass(cls) -> None:
        # Best-effort cleanup.
        try:
            # DATABASE_URL points to temp db file.
            if _DB_URL.startswith("sqlite:///"):
                db_path = _DB_URL.replace("sqlite:///", "", 1)
                if db_path and os.path.exists(db_path):
                    # May fail on Windows if the engine still holds the file
                    # open; that is acceptable for a temp file.
                    os.remove(db_path)
        except Exception:
            pass

    def test_progress_returns_warn_due_and_filters_zero_attempt(self) -> None:
        """Progress endpoint reports warn_due flags and hides 0-attempt items."""
        res = self.client.get("/api/scrape/progress")
        self.assertEqual(res.status_code, 200)
        data = res.json()
        self.assertEqual(data["keyword_batch_enabled"], True)
        self.assertIsNotNone(data["active_round"])
        self.assertEqual(data["active_round"]["id"], self._round_id)
        items = data["pending_items"]
        # attempt_count==0 is excluded
        self.assertEqual(len(items), 2)
        kw_terms = {it["keyword_term"]: it for it in items}
        self.assertIn("TEST_DUE_KEYWORD", kw_terms)
        self.assertIn("TEST_RETRY_KEYWORD", kw_terms)
        self.assertNotIn("TEST_ZERO_ATTEMPT_KEYWORD", kw_terms)
        self.assertTrue(kw_terms["TEST_DUE_KEYWORD"]["warn_due"])
        self.assertFalse(kw_terms["TEST_RETRY_KEYWORD"]["warn_due"])

    def test_progress_returns_no_active_round(self) -> None:
        """With no active round, the endpoint returns null/empty results."""
        # Temporarily mark the active round as finished.
        db = SessionLocal()
        try:
            round_row = db.query(ScrapeRound).filter(ScrapeRound.id == self._round_id).first()
            self.assertIsNotNone(round_row)
            round_row.status = "finished"
            db.flush()
            db.commit()
            res = self.client.get("/api/scrape/progress")
            self.assertEqual(res.status_code, 200)
            data = res.json()
            self.assertIsNone(data["active_round"])
            self.assertEqual(data["pending_items"], [])
        finally:
            # Restore active status so tests won't interfere if order changes.
            try:
                # Use a fresh session for the restore so it is independent of
                # whatever state the first session ended in.
                db2 = SessionLocal()
                try:
                    round_row = db2.query(ScrapeRound).filter(ScrapeRound.id == self._round_id).first()
                    if round_row:
                        round_row.status = "active"
                        db2.flush()
                        db2.commit()
                finally:
                    db2.close()
            finally:
                db.close()

    def test_stats_endpoint_shape(self) -> None:
        """Stats payload exposes the expected keys with sane types."""
        res = self.client.get("/api/stats")
        self.assertEqual(res.status_code, 200)
        data = res.json()
        # Ensure core keys exist and types are reasonable.
        self.assertIn("uptime_seconds", data)
        self.assertIsInstance(data["uptime_seconds"], int)
        self.assertIn("engine_status", data)
        self.assertIn("total_scanned", data)
        self.assertIn("total_alerts", data)
        self.assertIn("last_cycle", data)
        self.assertIn("uptime_start", data)

    def test_countdown_sync_returns_time_left_mins_and_iso_timestamps(self) -> None:
        """Countdown-sync returns both seeded listings with timestamp fields."""
        res = self.client.get("/api/listings/countdown-sync")
        self.assertEqual(res.status_code, 200)
        items = res.json()
        by_id = {it["id"]: it for it in items}
        self.assertIn(self._listing_with_price_id, by_id)
        self.assertIn(self._listing_without_price_id, by_id)
        with_price = by_id[self._listing_with_price_id]
        self.assertAlmostEqual(with_price["time_left_mins"], 12.5, places=1)
        self.assertIsNotNone(with_price["price_updated_at"])
        self.assertIsNotNone(with_price["timestamp"])
        without_price = by_id[self._listing_without_price_id]
        self.assertAlmostEqual(without_price["time_left_mins"], 7.0, places=1)
        self.assertIsNone(without_price["price_updated_at"])
        self.assertIsNotNone(without_price["timestamp"])

    def test_refresh_status_returns_last_price_update_and_listing_count(self) -> None:
        """Refresh-status reports the seeded listing count and last update."""
        res = self.client.get("/api/listings/refresh-status")
        self.assertEqual(res.status_code, 200)
        data = res.json()
        self.assertEqual(data["listing_count"], 2)
        self.assertEqual(data["last_price_update"], self._last_price_update_iso)

    def test_sites_endpoint_returns_int_flags(self) -> None:
        """Site flags must serialize as 0/1 integers, never JSON booleans."""
        res = self.client.get("/api/sites")
        self.assertEqual(res.status_code, 200)
        sites = res.json()
        self.assertTrue(isinstance(sites, list))
        self.assertGreater(len(sites), 0)
        for s in sites:
            # These must be numeric flags (0/1), not JSON booleans.
            self.assertIn(s["enabled"], (0, 1))
            self.assertIn(s["custom_visible_browser"], (0, 1))
            self.assertIn(s["requires_login"], (0, 1))
            self.assertIn(s["login_enabled"], (0, 1))
            # Ensure types are not JSON booleans.
            # (bool is a subclass of int, so assertIn above is not enough;
            # type() identity rules bool out.)
            self.assertIs(type(s["enabled"]), int)
            self.assertIs(type(s["custom_visible_browser"]), int)
            self.assertIs(type(s["requires_login"]), int)
            self.assertIs(type(s["login_enabled"]), int)

    def test_enabled_count_matches_db(self) -> None:
        """Enabled-count endpoint agrees with a direct DB count."""
        res = self.client.get("/api/sites/enabled-count")
        self.assertEqual(res.status_code, 200)
        data = res.json()
        self.assertEqual(data["count"], self._expected_enabled_count)

    def test_config_get_returns_flat_string_dict(self) -> None:
        """GET /api/config returns a flat {key: str-value} mapping."""
        res = self.client.get("/api/config")
        self.assertEqual(res.status_code, 200)
        data = res.json()
        self.assertIsInstance(data, dict)
        self.assertNotIsInstance(data, list)
        # Seeded by seed_database(); value must be a string.
        self.assertIn("keyword_batch_enabled", data)
        self.assertIsInstance(data["keyword_batch_enabled"], str)

    def test_config_post_upserts_flat_dict_values_as_strings(self) -> None:
        """POST /api/config inserts new keys and updates existing ones."""
        key_a = "__TEST_CFG_A"
        key_b = "__TEST_CFG_B"
        res = self.client.post(
            "/api/config",
            json={key_a: "1", key_b: "abc"},
        )
        self.assertEqual(res.status_code, 200)
        body = res.json()
        self.assertEqual(body["status"], "saved")
        self.assertIn(key_a, body["keys"])
        self.assertIn(key_b, body["keys"])
        after = self.client.get("/api/config").json()
        self.assertEqual(after[key_a], "1")
        self.assertEqual(after[key_b], "abc")
        # Upsert (update existing key).
        res2 = self.client.post("/api/config", json={key_a: "2"})
        self.assertEqual(res2.status_code, 200)
        after2 = self.client.get("/api/config").json()
        self.assertEqual(after2[key_a], "2")
if __name__ == "__main__":
    # Allow running this test module directly (outside a test runner).
    unittest.main()