"""Endpoint tests for the worker API, run against a throwaway SQLite database.

The database URL is exported through ``DATABASE_URL`` *before* the application
modules are imported, so the imports below are intentionally placed after the
environment setup.
"""

import os
import tempfile
import unittest
from datetime import datetime, timedelta


def _make_temp_sqlite_url() -> str:
    """Create an empty temporary ``.db`` file and return a ``sqlite:///`` URL for it."""
    tmp = tempfile.NamedTemporaryFile(prefix="ghostnode_", suffix=".db", delete=False)
    path = tmp.name
    tmp.close()
    # SQLAlchemy on Windows expects forward slashes in sqlite URLs.
    norm = path.replace("\\", "/")
    return f"sqlite:///{norm}"


_DB_URL = _make_temp_sqlite_url()
os.environ["DATABASE_URL"] = _DB_URL

# IMPORTANT: import app/models AFTER DATABASE_URL is set.
from fastapi.testclient import TestClient  # noqa: E402

import worker  # noqa: E402
from database import SessionLocal  # noqa: E402
from models import (  # noqa: E402
    Config,
    Keyword,
    ScrapeRound,
    ScrapeRoundItem,
    TargetSite,
    Listing,
)


class ScrapeProgressEndpointTests(unittest.TestCase):
    """Exercise the scrape-progress, stats, listings, sites and config endpoints."""

    @classmethod
    def setUpClass(cls) -> None:
        """Create the test client and seed config, keywords, a round and listings."""
        cls.client = TestClient(worker.app)

        db = SessionLocal()
        try:
            # Force keyword batching mode so the endpoint returns keyword_batch_enabled=true.
            cfg = db.query(Config).filter(Config.key == "keyword_batch_enabled").first()
            if cfg:
                cfg.value = "true"
            else:
                db.add(Config(key="keyword_batch_enabled", value="true"))

            # Create dedicated keywords so assertions can match by term.
            kw_due = Keyword(term="TEST_DUE_KEYWORD", weight=1, sort_order=999001)
            kw_not_due = Keyword(term="TEST_RETRY_KEYWORD", weight=1, sort_order=999002)
            kw_zero_attempt = Keyword(
                term="TEST_ZERO_ATTEMPT_KEYWORD", weight=1, sort_order=999003
            )
            db.add_all([kw_due, kw_not_due, kw_zero_attempt])
            db.flush()

            # Use any existing seeded site (or create one if empty).
            site = db.query(TargetSite).order_by(TargetSite.id.asc()).first()
            if site is None:
                site = TargetSite(
                    name="TEST_SITE",
                    url_template="https://example.com/?q={keyword}",
                    search_selector="",
                    enabled=1,
                    max_pages=1,
                    sort_order=0,
                )
                db.add(site)
                db.flush()

            now = datetime.now()
            cls._round_started_at = now - timedelta(hours=2)
            round_row = ScrapeRound(started_at=cls._round_started_at, status="active")
            db.add(round_row)
            db.flush()
            cls._round_id = round_row.id

            # Warn due: base >= 1 hour ago -> warn_due should be True.
            due_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_due.id,
                status="pending",
                attempt_count=1,
                first_pending_at=now - timedelta(minutes=90),
                last_attempt_at=now - timedelta(minutes=10),
                last_hour_warn_at=now - timedelta(hours=2),
                last_error=None,
            )

            # Not due: base < 1 hour ago -> warn_due should be False.
            retry_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_not_due.id,
                status="pending",
                attempt_count=2,
                first_pending_at=now - timedelta(minutes=30),
                last_attempt_at=now - timedelta(minutes=20),
                last_hour_warn_at=None,
                last_error=None,
            )

            # attempt_count == 0 must be excluded from pending_items.
            zero_attempt_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_zero_attempt.id,
                status="pending",
                attempt_count=0,
                first_pending_at=now - timedelta(hours=2),
                last_attempt_at=None,
                last_hour_warn_at=now - timedelta(hours=2),
                last_error=None,
            )

            db.add_all([due_item, retry_item, zero_attempt_item])

            # Seed listings for countdown-sync endpoint.
            # (No listings are seeded by default seed_database().)
            now2 = datetime.now()
            listing_with_price = Listing(
                title="TEST_LISTING_WITH_PRICE_UPDATED_AT",
                link="https://example.com/listing-with-price-updated-at",
                price=100.0,
                currency="USD",
                price_raw="$100",
                time_left="12h",
                time_left_mins=12.5,
                price_updated_at=now2 - timedelta(minutes=5),
                score=0,
                keyword="kw",
                site_name=site.name,
                timestamp=now2 - timedelta(minutes=6),
                closing_alerts_sent="[]",
                images="[]",
                description="",
            )
            listing_without_price = Listing(
                title="TEST_LISTING_NO_PRICE_UPDATED_AT",
                link="https://example.com/listing-no-price-updated-at",
                price=None,
                currency="USD",
                price_raw="",
                time_left="7h",
                time_left_mins=7.0,
                price_updated_at=None,
                score=0,
                keyword="kw",
                site_name=site.name,
                timestamp=now2 - timedelta(minutes=9),
                closing_alerts_sent="[]",
                images="[]",
                description="",
            )
            db.add_all([listing_with_price, listing_without_price])

            db.commit()

            # Values read back after commit are what later tests compare against.
            cls._listing_with_price_id = listing_with_price.id
            cls._listing_without_price_id = listing_without_price.id
            cls._last_price_update_iso = (
                listing_with_price.price_updated_at.isoformat()
                if listing_with_price.price_updated_at
                else None
            )
            cls._expected_enabled_count = (
                db.query(TargetSite).filter(TargetSite.enabled == 1).count()
            )
        finally:
            db.close()

    @classmethod
    def tearDownClass(cls) -> None:
        """Best-effort removal of the temporary SQLite file behind DATABASE_URL."""
        prefix = "sqlite:///"
        if not _DB_URL.startswith(prefix):
            return
        db_path = _DB_URL[len(prefix):]
        if not db_path:
            return
        try:
            os.remove(db_path)
        except OSError:
            # Cleanup is best-effort: the file may already be gone or may not
            # be removable right now; neither should fail the test run.
            pass

    def _set_round_status(self, status: str) -> bool:
        """Set the seeded round's status in its own session.

        Returns True when the round row was found and updated, False otherwise.
        """
        db = SessionLocal()
        try:
            round_row = db.query(ScrapeRound).filter(ScrapeRound.id == self._round_id).first()
            if round_row is None:
                return False
            round_row.status = status
            db.commit()  # commit flushes pending changes itself
            return True
        finally:
            db.close()

    def test_progress_returns_warn_due_and_filters_zero_attempt(self) -> None:
        """Progress lists attempted pending items only and flags the overdue one."""
        res = self.client.get("/api/scrape/progress")
        self.assertEqual(res.status_code, 200)
        data = res.json()

        self.assertEqual(data["keyword_batch_enabled"], True)
        self.assertIsNotNone(data["active_round"])
        self.assertEqual(data["active_round"]["id"], self._round_id)

        items = data["pending_items"]
        # attempt_count==0 is excluded
        self.assertEqual(len(items), 2)

        kw_terms = {it["keyword_term"]: it for it in items}
        self.assertIn("TEST_DUE_KEYWORD", kw_terms)
        self.assertIn("TEST_RETRY_KEYWORD", kw_terms)
        self.assertNotIn("TEST_ZERO_ATTEMPT_KEYWORD", kw_terms)

        self.assertTrue(kw_terms["TEST_DUE_KEYWORD"]["warn_due"])
        self.assertFalse(kw_terms["TEST_RETRY_KEYWORD"]["warn_due"])

    def test_progress_returns_no_active_round(self) -> None:
        """With the round marked finished, progress reports no round and no items."""
        # Temporarily mark the active round as finished.
        self.assertTrue(self._set_round_status("finished"))
        try:
            res = self.client.get("/api/scrape/progress")
            self.assertEqual(res.status_code, 200)
            data = res.json()
            self.assertIsNone(data["active_round"])
            self.assertEqual(data["pending_items"], [])
        finally:
            # Restore active status so tests won't interfere if order changes.
            self._set_round_status("active")

    def test_stats_endpoint_shape(self) -> None:
        """/api/stats exposes the core keys and an integer uptime."""
        res = self.client.get("/api/stats")
        self.assertEqual(res.status_code, 200)
        data = res.json()

        # Ensure core keys exist and types are reasonable.
        self.assertIn("uptime_seconds", data)
        self.assertIsInstance(data["uptime_seconds"], int)
        self.assertIn("engine_status", data)
        self.assertIn("total_scanned", data)
        self.assertIn("total_alerts", data)
        self.assertIn("last_cycle", data)
        self.assertIn("uptime_start", data)

    def test_countdown_sync_returns_time_left_mins_and_iso_timestamps(self) -> None:
        """Countdown-sync returns both seeded listings with their time/price fields."""
        res = self.client.get("/api/listings/countdown-sync")
        self.assertEqual(res.status_code, 200)
        items = res.json()

        by_id = {it["id"]: it for it in items}
        self.assertIn(self._listing_with_price_id, by_id)
        self.assertIn(self._listing_without_price_id, by_id)

        with_price = by_id[self._listing_with_price_id]
        self.assertAlmostEqual(with_price["time_left_mins"], 12.5, places=1)
        self.assertIsNotNone(with_price["price_updated_at"])
        self.assertIsNotNone(with_price["timestamp"])

        without_price = by_id[self._listing_without_price_id]
        self.assertAlmostEqual(without_price["time_left_mins"], 7.0, places=1)
        self.assertIsNone(without_price["price_updated_at"])
        self.assertIsNotNone(without_price["timestamp"])

    def test_refresh_status_returns_last_price_update_and_listing_count(self) -> None:
        """Refresh-status reports the two seeded listings and the latest price update."""
        res = self.client.get("/api/listings/refresh-status")
        self.assertEqual(res.status_code, 200)
        data = res.json()

        self.assertEqual(data["listing_count"], 2)
        self.assertEqual(data["last_price_update"], self._last_price_update_iso)

    def test_sites_endpoint_returns_int_flags(self) -> None:
        """/api/sites serialises its flag fields as 0/1 integers, not booleans."""
        res = self.client.get("/api/sites")
        self.assertEqual(res.status_code, 200)
        sites = res.json()
        self.assertIsInstance(sites, list)
        self.assertGreater(len(sites), 0)

        flag_names = ("enabled", "custom_visible_browser", "requires_login", "login_enabled")
        for s in sites:
            for flag in flag_names:
                # These must be numeric flags (0/1), not JSON booleans.
                self.assertIn(s[flag], (0, 1), msg=flag)
                # Exact type check on purpose: bool is a subclass of int, so
                # isinstance() would let JSON true/false slip through.
                self.assertIs(type(s[flag]), int, msg=flag)

    def test_enabled_count_matches_db(self) -> None:
        """The enabled-count endpoint agrees with the count taken during setup."""
        res = self.client.get("/api/sites/enabled-count")
        self.assertEqual(res.status_code, 200)
        data = res.json()
        self.assertEqual(data["count"], self._expected_enabled_count)

    def test_config_get_returns_flat_string_dict(self) -> None:
        """GET /api/config returns a flat dict whose values are strings."""
        res = self.client.get("/api/config")
        self.assertEqual(res.status_code, 200)
        data = res.json()

        self.assertIsInstance(data, dict)
        self.assertNotIsInstance(data, list)

        # Seeded by seed_database(); value must be a string.
        self.assertIn("keyword_batch_enabled", data)
        self.assertIsInstance(data["keyword_batch_enabled"], str)

    def test_config_post_upserts_flat_dict_values_as_strings(self) -> None:
        """POST /api/config inserts new keys and updates existing ones."""
        key_a = "__TEST_CFG_A"
        key_b = "__TEST_CFG_B"

        res = self.client.post(
            "/api/config",
            json={key_a: "1", key_b: "abc"},
        )
        self.assertEqual(res.status_code, 200)
        body = res.json()
        self.assertEqual(body["status"], "saved")
        self.assertIn(key_a, body["keys"])
        self.assertIn(key_b, body["keys"])

        after = self.client.get("/api/config").json()
        self.assertEqual(after[key_a], "1")
        self.assertEqual(after[key_b], "abc")

        # Upsert (update existing key).
        res2 = self.client.post("/api/config", json={key_a: "2"})
        self.assertEqual(res2.status_code, 200)
        after2 = self.client.get("/api/config").json()
        self.assertEqual(after2[key_a], "2")


if __name__ == "__main__":
    unittest.main()