39640-vm/core/rss.py
Flatlogic Bot c559e6fffc 1
2026-04-14 16:55:47 +00:00

162 lines
5.6 KiB
Python

from __future__ import annotations
import hashlib
import logging
import re
from datetime import timezone as dt_timezone
from datetime import timedelta
from email.utils import parsedate_to_datetime
from html import unescape
from urllib.request import Request, urlopen
import xml.etree.ElementTree as ET
from django.db.models import Q
from django.utils import timezone
from .models import Article, NewsSource, Topic
logger = logging.getLogger(__name__)
# Matches a single HTML/XML tag; used by strip_html() to remove markup.
HTML_RE = re.compile(r'<[^>]+>')

# XML namespace map passed to ElementTree find/findall for Atom and
# RSS-extension elements (media RSS, content module).
NAMESPACES = {
    'atom': 'http://www.w3.org/2005/Atom',
    'media': 'http://search.yahoo.com/mrss/',
    'content': 'http://purl.org/rss/1.0/modules/content/',
}

# Lowercase keyword -> Topic.name mapping used by pick_topic() to classify
# incoming articles; first matching keyword wins (dict order is significant).
DEFAULT_TOPIC_MAP = {
    'ai': 'Artificial Intelligence',
    'artificial intelligence': 'Artificial Intelligence',
    'startup': 'Startups',
    'startups': 'Startups',
    'venture': 'Venture Capital',
    'funding': 'Venture Capital',
    'cloud': 'Product & Cloud',
    'saas': 'Product & Cloud',
    'product': 'Product & Cloud',
    'hardware': 'Hardware',
    'chips': 'Hardware',
    'security': 'Security',
}
def strip_html(value: str) -> str:
    """Return *value* with entities decoded, tags removed, whitespace collapsed.

    ``None`` and ``''`` both yield ``''``.
    """
    decoded = unescape(value or '')
    without_tags = re.sub(r'<[^>]+>', ' ', decoded)
    return re.sub(r'\s+', ' ', without_tags).strip()
def text_or_empty(node, path: str) -> str:
    """Look up *path* under *node* (feed namespaces applied) and return its
    concatenated text content, stripped; '' when the element is absent."""
    match = node.find(path, NAMESPACES)
    return ''.join(match.itertext()).strip() if match is not None else ''
def pick_topic(*parts: str) -> Topic | None:
    """Classify an article by keyword search over *parts*.

    Returns the Topic named by the first DEFAULT_TOPIC_MAP keyword found in
    the lowercased, concatenated parts; otherwise falls back to the first
    Topic ordered by name (may be None when no topics exist).
    """
    haystack = ' '.join(part.lower() for part in parts if part)
    matched_name = next(
        (name for keyword, name in DEFAULT_TOPIC_MAP.items() if keyword in haystack),
        None,
    )
    if matched_name is not None:
        return Topic.objects.filter(name=matched_name).first()
    return Topic.objects.order_by('name').first()
def parse_datetime(value: str):
    """Parse an RFC 2822 feed date into an aware UTC datetime.

    Any empty or unparseable value falls back to the current time, so the
    caller always receives a usable timestamp.
    """
    if not value:
        return timezone.now()
    try:
        when = parsedate_to_datetime(value)
        if timezone.is_naive(when):
            # Feeds occasionally omit the zone; treat those as UTC.
            when = timezone.make_aware(when, dt_timezone.utc)
        return when.astimezone(dt_timezone.utc)
    except (TypeError, ValueError, IndexError, OverflowError):
        return timezone.now()
def _extract_image(item: ET.Element) -> str:
enclosure = item.find('enclosure')
if enclosure is not None and 'image' in enclosure.attrib.get('type', ''):
return enclosure.attrib.get('url', '')
for path in ['media:content', 'media:thumbnail']:
media = item.find(path, NAMESPACES)
if media is not None:
return media.attrib.get('url', '')
return ''
def _dedupe_key(source: NewsSource, guid: str, link: str, title: str) -> str:
payload = f"{source.pk}|{guid or link or title}".encode('utf-8')
return hashlib.sha256(payload).hexdigest()
def import_feed(source: NewsSource, limit: int = 8) -> int:
    """Fetch one source's feed and create Article rows for unseen items.

    Handles both RSS 2.0 (<channel><item>) and Atom (<entry>) documents.
    Stamps ``source.last_synced_at`` after a successful fetch and returns
    the number of articles created. Network/parse errors propagate to the
    caller (sync_active_sources catches and logs them).
    """
    request = Request(
        source.feed_url,
        # Custom UA: some publishers reject urllib's default user agent.
        headers={'User-Agent': 'Mozilla/5.0 FlatlogicNewsroomBot/1.0'},
    )
    with urlopen(request, timeout=15) as response:
        body = response.read()
    root = ET.fromstring(body)
    # RSS items live under <channel>; Atom entries are namespaced at the root.
    channel_items = root.findall('./channel/item')
    atom_entries = root.findall('./atom:entry', NAMESPACES)
    items = channel_items or atom_entries
    created_count = 0
    for item in items[:limit]:
        title = text_or_empty(item, 'title') or text_or_empty(item, 'atom:title')
        # Atom puts the URL in <link href="...">, not in element text.
        link = text_or_empty(item, 'link') or item.attrib.get('href', '')
        if not link:
            atom_link = item.find('atom:link', NAMESPACES)
            if atom_link is not None:
                link = atom_link.attrib.get('href', '')
        guid = text_or_empty(item, 'guid') or text_or_empty(item, 'atom:id')
        excerpt = (
            text_or_empty(item, 'description')
            or text_or_empty(item, 'atom:summary')
            or text_or_empty(item, 'content:encoded')
        )
        content = text_or_empty(item, 'content:encoded') or text_or_empty(item, 'atom:content') or excerpt
        published_raw = (
            text_or_empty(item, 'pubDate')
            or text_or_empty(item, 'published')
            or text_or_empty(item, 'updated')
            or text_or_empty(item, 'atom:updated')
        )
        author_name = text_or_empty(item, 'author') or text_or_empty(item, 'atom:author/atom:name')
        category_text = ' '.join(elem.text or '' for elem in item.findall('category'))
        dedupe_key = _dedupe_key(source, guid, link, title)
        # Skip untitled items and anything already imported for any source.
        if not title or Article.objects.filter(dedupe_key=dedupe_key).exists():
            continue
        article = Article(
            title=strip_html(title),
            excerpt=strip_html(excerpt)[:340],  # NOTE: 340 presumably matches the model's max_length — confirm
            content=strip_html(content),
            article_kind=Article.ArticleKind.RSS,
            source=source,
            topic=pick_topic(title, excerpt, category_text, source.name),
            external_url=link,
            image_url=_extract_image(item),
            author_name=strip_html(author_name)[:120],
            published_at=parse_datetime(published_raw),
            dedupe_key=dedupe_key,
            is_published=True,
        )
        article.save()
        created_count += 1
    # Record the sync time even when nothing new was created, so
    # sync_active_sources() won't re-poll this source until it goes stale.
    source.last_synced_at = timezone.now()
    source.save(update_fields=['last_synced_at'])
    return created_count
def sync_active_sources(limit: int = 6, stale_minutes: int = 45) -> int:
    """Import feeds for every active source whose last sync is stale.

    A source qualifies when it has never been synced or was last synced more
    than *stale_minutes* ago. Per-source failures are logged and skipped so
    one broken feed cannot abort the whole run. Returns the total number of
    articles created.
    """
    cutoff = timezone.now() - timedelta(minutes=stale_minutes)
    stale_sources = NewsSource.objects.filter(is_active=True).filter(
        Q(last_synced_at__isnull=True) | Q(last_synced_at__lt=cutoff)
    )
    created = 0
    for candidate in stale_sources:
        try:
            created += import_feed(candidate, limit=limit)
        except Exception as exc:  # pragma: no cover - graceful failure for live feed parsing
            logger.warning('RSS sync failed for %s: %s', candidate.name, exc)
    return created