import os
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
def _make_temp_sqlite_url() -> str:
|
|
tmp = tempfile.NamedTemporaryFile(prefix="ghostnode_", suffix=".db", delete=False)
|
|
path = tmp.name
|
|
tmp.close()
|
|
# SQLAlchemy on Windows expects forward slashes in sqlite URLs.
|
|
norm = path.replace("\\", "/")
|
|
return f"sqlite:///{norm}"
|
|
|
|
|
|
# Point the application at a throwaway sqlite database BEFORE any app module
# is imported: the app modules read DATABASE_URL at import time.
_DB_URL = _make_temp_sqlite_url()
os.environ["DATABASE_URL"] = _DB_URL
|
|
|
|
|
|
# IMPORTANT: import app/models AFTER DATABASE_URL is set.
|
|
from fastapi.testclient import TestClient # noqa: E402
|
|
|
|
import worker # noqa: E402
|
|
from database import SessionLocal # noqa: E402
|
|
from models import ( # noqa: E402
|
|
Config,
|
|
Keyword,
|
|
ScrapeRound,
|
|
ScrapeRoundItem,
|
|
TargetSite,
|
|
Listing,
|
|
)
|
|
|
|
|
|
class ScrapeProgressEndpointTests(unittest.TestCase):
    """End-to-end tests for the worker's scrape-progress / listing endpoints.

    All fixtures are seeded once per class into the temp sqlite database that
    DATABASE_URL was pointed at before the app modules were imported.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Seed config, keywords, a round, round items and listings once."""
        cls.client = TestClient(worker.app)

        db = SessionLocal()
        try:
            # Force keyword batching mode so the endpoint returns keyword_batch_enabled=true.
            cfg = db.query(Config).filter(Config.key == "keyword_batch_enabled").first()
            if cfg:
                cfg.value = "true"
            else:
                db.add(Config(key="keyword_batch_enabled", value="true"))

            # Create dedicated keywords so assertions can match by term.
            # High sort_order values keep them clear of any seeded keywords.
            kw_due = Keyword(term="TEST_DUE_KEYWORD", weight=1, sort_order=999001)
            kw_not_due = Keyword(term="TEST_RETRY_KEYWORD", weight=1, sort_order=999002)
            kw_zero_attempt = Keyword(term="TEST_ZERO_ATTEMPT_KEYWORD", weight=1, sort_order=999003)
            db.add_all([kw_due, kw_not_due, kw_zero_attempt])
            db.flush()  # assign ids before they are referenced below

            # Use any existing seeded site (or create one if empty).
            site = db.query(TargetSite).order_by(TargetSite.id.asc()).first()
            if site is None:
                site = TargetSite(
                    name="TEST_SITE",
                    url_template="https://example.com/?q={keyword}",
                    search_selector="",
                    enabled=1,
                    max_pages=1,
                    sort_order=0,
                )
                db.add(site)
                db.flush()

            now = datetime.now()
            cls._round_started_at = now - timedelta(hours=2)
            round_row = ScrapeRound(started_at=cls._round_started_at, status="active")
            db.add(round_row)
            db.flush()
            cls._round_id = round_row.id

            # Warn due: base >= 1 hour ago -> warn_due should be True.
            due_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_due.id,
                status="pending",
                attempt_count=1,
                first_pending_at=now - timedelta(minutes=90),
                last_attempt_at=now - timedelta(minutes=10),
                last_hour_warn_at=now - timedelta(hours=2),
                last_error=None,
            )

            # Not due: base < 1 hour ago -> warn_due should be False.
            retry_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_not_due.id,
                status="pending",
                attempt_count=2,
                first_pending_at=now - timedelta(minutes=30),
                last_attempt_at=now - timedelta(minutes=20),
                last_hour_warn_at=None,
                last_error=None,
            )

            # attempt_count == 0 must be excluded from pending_items.
            zero_attempt_item = ScrapeRoundItem(
                round_id=cls._round_id,
                site_id=site.id,
                keyword_id=kw_zero_attempt.id,
                status="pending",
                attempt_count=0,
                first_pending_at=now - timedelta(hours=2),
                last_attempt_at=None,
                last_hour_warn_at=now - timedelta(hours=2),
                last_error=None,
            )

            db.add_all([due_item, retry_item, zero_attempt_item])

            # Seed listings for countdown-sync endpoint.
            # (No listings are seeded by default seed_database().)
            now2 = datetime.now()
            listing_with_price = Listing(
                title="TEST_LISTING_WITH_PRICE_UPDATED_AT",
                link="https://example.com/listing-with-price-updated-at",
                price=100.0,
                currency="USD",
                price_raw="$100",
                time_left="12h",
                time_left_mins=12.5,
                price_updated_at=now2 - timedelta(minutes=5),
                score=0,
                keyword="kw",
                site_name=site.name,
                timestamp=now2 - timedelta(minutes=6),
                closing_alerts_sent="[]",
                images="[]",
                description="",
            )
            listing_without_price = Listing(
                title="TEST_LISTING_NO_PRICE_UPDATED_AT",
                link="https://example.com/listing-no-price-updated-at",
                price=None,
                currency="USD",
                price_raw="",
                time_left="7h",
                time_left_mins=7.0,
                price_updated_at=None,
                score=0,
                keyword="kw",
                site_name=site.name,
                timestamp=now2 - timedelta(minutes=9),
                closing_alerts_sent="[]",
                images="[]",
                description="",
            )
            db.add_all([listing_with_price, listing_without_price])
            db.commit()

            # Ids are available after commit; cached for the test methods.
            cls._listing_with_price_id = listing_with_price.id
            cls._listing_without_price_id = listing_without_price.id
            cls._last_price_update_iso = (
                listing_with_price.price_updated_at.isoformat()
                if listing_with_price.price_updated_at
                else None
            )

            # Expected value for /api/sites/enabled-count.
            cls._expected_enabled_count = db.query(TargetSite).filter(TargetSite.enabled == 1).count()

        finally:
            db.close()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls) -> None:
|
|
# Best-effort cleanup.
|
|
try:
|
|
# DATABASE_URL points to temp db file.
|
|
if _DB_URL.startswith("sqlite:///"):
|
|
db_path = _DB_URL.replace("sqlite:///", "", 1)
|
|
if db_path and os.path.exists(db_path):
|
|
os.remove(db_path)
|
|
except Exception:
|
|
pass
|
|
|
|
def test_progress_returns_warn_due_and_filters_zero_attempt(self) -> None:
|
|
res = self.client.get("/api/scrape/progress")
|
|
self.assertEqual(res.status_code, 200)
|
|
data = res.json()
|
|
|
|
self.assertEqual(data["keyword_batch_enabled"], True)
|
|
self.assertIsNotNone(data["active_round"])
|
|
self.assertEqual(data["active_round"]["id"], self._round_id)
|
|
|
|
items = data["pending_items"]
|
|
# attempt_count==0 is excluded
|
|
self.assertEqual(len(items), 2)
|
|
|
|
kw_terms = {it["keyword_term"]: it for it in items}
|
|
self.assertIn("TEST_DUE_KEYWORD", kw_terms)
|
|
self.assertIn("TEST_RETRY_KEYWORD", kw_terms)
|
|
self.assertNotIn("TEST_ZERO_ATTEMPT_KEYWORD", kw_terms)
|
|
|
|
self.assertTrue(kw_terms["TEST_DUE_KEYWORD"]["warn_due"])
|
|
self.assertFalse(kw_terms["TEST_RETRY_KEYWORD"]["warn_due"])
|
|
|
|
def test_progress_returns_no_active_round(self) -> None:
|
|
# Temporarily mark the active round as finished.
|
|
db = SessionLocal()
|
|
try:
|
|
round_row = db.query(ScrapeRound).filter(ScrapeRound.id == self._round_id).first()
|
|
self.assertIsNotNone(round_row)
|
|
round_row.status = "finished"
|
|
db.flush()
|
|
db.commit()
|
|
|
|
res = self.client.get("/api/scrape/progress")
|
|
self.assertEqual(res.status_code, 200)
|
|
data = res.json()
|
|
|
|
self.assertIsNone(data["active_round"])
|
|
self.assertEqual(data["pending_items"], [])
|
|
finally:
|
|
# Restore active status so tests won't interfere if order changes.
|
|
try:
|
|
db2 = SessionLocal()
|
|
try:
|
|
round_row = db2.query(ScrapeRound).filter(ScrapeRound.id == self._round_id).first()
|
|
if round_row:
|
|
round_row.status = "active"
|
|
db2.flush()
|
|
db2.commit()
|
|
finally:
|
|
db2.close()
|
|
finally:
|
|
db.close()
|
|
|
|
def test_stats_endpoint_shape(self) -> None:
|
|
res = self.client.get("/api/stats")
|
|
self.assertEqual(res.status_code, 200)
|
|
data = res.json()
|
|
|
|
# Ensure core keys exist and types are reasonable.
|
|
self.assertIn("uptime_seconds", data)
|
|
self.assertIsInstance(data["uptime_seconds"], int)
|
|
self.assertIn("engine_status", data)
|
|
self.assertIn("total_scanned", data)
|
|
self.assertIn("total_alerts", data)
|
|
self.assertIn("last_cycle", data)
|
|
self.assertIn("uptime_start", data)
|
|
|
|
def test_countdown_sync_returns_time_left_mins_and_iso_timestamps(self) -> None:
|
|
res = self.client.get("/api/listings/countdown-sync")
|
|
self.assertEqual(res.status_code, 200)
|
|
items = res.json()
|
|
|
|
by_id = {it["id"]: it for it in items}
|
|
self.assertIn(self._listing_with_price_id, by_id)
|
|
self.assertIn(self._listing_without_price_id, by_id)
|
|
|
|
with_price = by_id[self._listing_with_price_id]
|
|
self.assertAlmostEqual(with_price["time_left_mins"], 12.5, places=1)
|
|
self.assertIsNotNone(with_price["price_updated_at"])
|
|
self.assertIsNotNone(with_price["timestamp"])
|
|
|
|
without_price = by_id[self._listing_without_price_id]
|
|
self.assertAlmostEqual(without_price["time_left_mins"], 7.0, places=1)
|
|
self.assertIsNone(without_price["price_updated_at"])
|
|
self.assertIsNotNone(without_price["timestamp"])
|
|
|
|
def test_refresh_status_returns_last_price_update_and_listing_count(self) -> None:
|
|
res = self.client.get("/api/listings/refresh-status")
|
|
self.assertEqual(res.status_code, 200)
|
|
data = res.json()
|
|
|
|
self.assertEqual(data["listing_count"], 2)
|
|
self.assertEqual(data["last_price_update"], self._last_price_update_iso)
|
|
|
|
def test_sites_endpoint_returns_int_flags(self) -> None:
|
|
res = self.client.get("/api/sites")
|
|
self.assertEqual(res.status_code, 200)
|
|
sites = res.json()
|
|
self.assertTrue(isinstance(sites, list))
|
|
self.assertGreater(len(sites), 0)
|
|
|
|
for s in sites:
|
|
# These must be numeric flags (0/1), not JSON booleans.
|
|
self.assertIn(s["enabled"], (0, 1))
|
|
self.assertIn(s["custom_visible_browser"], (0, 1))
|
|
self.assertIn(s["requires_login"], (0, 1))
|
|
self.assertIn(s["login_enabled"], (0, 1))
|
|
|
|
# Ensure types are not JSON booleans.
|
|
self.assertIs(type(s["enabled"]), int)
|
|
self.assertIs(type(s["custom_visible_browser"]), int)
|
|
self.assertIs(type(s["requires_login"]), int)
|
|
self.assertIs(type(s["login_enabled"]), int)
|
|
|
|
def test_enabled_count_matches_db(self) -> None:
|
|
res = self.client.get("/api/sites/enabled-count")
|
|
self.assertEqual(res.status_code, 200)
|
|
data = res.json()
|
|
self.assertEqual(data["count"], self._expected_enabled_count)
|
|
|
|
def test_config_get_returns_flat_string_dict(self) -> None:
|
|
res = self.client.get("/api/config")
|
|
self.assertEqual(res.status_code, 200)
|
|
data = res.json()
|
|
|
|
self.assertIsInstance(data, dict)
|
|
self.assertNotIsInstance(data, list)
|
|
|
|
# Seeded by seed_database(); value must be a string.
|
|
self.assertIn("keyword_batch_enabled", data)
|
|
self.assertIsInstance(data["keyword_batch_enabled"], str)
|
|
|
|
def test_config_post_upserts_flat_dict_values_as_strings(self) -> None:
|
|
key_a = "__TEST_CFG_A"
|
|
key_b = "__TEST_CFG_B"
|
|
|
|
res = self.client.post(
|
|
"/api/config",
|
|
json={key_a: "1", key_b: "abc"},
|
|
)
|
|
self.assertEqual(res.status_code, 200)
|
|
body = res.json()
|
|
self.assertEqual(body["status"], "saved")
|
|
self.assertIn(key_a, body["keys"])
|
|
self.assertIn(key_b, body["keys"])
|
|
|
|
after = self.client.get("/api/config").json()
|
|
self.assertEqual(after[key_a], "1")
|
|
self.assertEqual(after[key_b], "abc")
|
|
|
|
# Upsert (update existing key).
|
|
res2 = self.client.post("/api/config", json={key_a: "2"})
|
|
self.assertEqual(res2.status_code, 200)
|
|
|
|
after2 = self.client.get("/api/config").json()
|
|
self.assertEqual(after2[key_a], "2")
|
|
|
|
# Allow running this module directly: `python <this file>`.
if __name__ == "__main__":
    unittest.main()
|
|
|