# RSS/Atom feed import helpers.
#
# Fetches the feed of a NewsSource, turns its entries into Article rows and
# records when each source was last synced.
from __future__ import annotations

import hashlib
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from datetime import timedelta
from datetime import timezone as dt_timezone
from email.utils import parsedate_to_datetime
from html import unescape
from urllib.request import Request, urlopen

from django.db.models import Q
from django.utils import timezone

from .models import Article, NewsSource, Topic

logger = logging.getLogger(__name__)

# Matches anything that looks like an HTML/XML tag.
HTML_RE = re.compile(r'<[^>]+>')

# Prefix -> namespace URI map used for every namespaced ElementTree lookup.
NAMESPACES = {
    'atom': 'http://www.w3.org/2005/Atom',
    'media': 'http://search.yahoo.com/mrss/',
    'content': 'http://purl.org/rss/1.0/modules/content/',
}

# Lower-case keyword -> Topic.name. Insertion order is the match priority.
DEFAULT_TOPIC_MAP = {
    'ai': 'Artificial Intelligence',
    'artificial intelligence': 'Artificial Intelligence',
    'startup': 'Startups',
    'startups': 'Startups',
    'venture': 'Venture Capital',
    'funding': 'Venture Capital',
    'cloud': 'Product & Cloud',
    'saas': 'Product & Cloud',
    'product': 'Product & Cloud',
    'hardware': 'Hardware',
    'chips': 'Hardware',
    'security': 'Security',
}


def strip_html(value: str) -> str:
    """Return *value* as plain text.

    Entities are unescaped first, then tags are replaced by spaces, and
    finally runs of whitespace are collapsed to a single space. ``None`` and
    other falsy values yield ``''``.
    """
    without_tags = HTML_RE.sub(' ', unescape(value or ''))
    return re.sub(r'\s+', ' ', without_tags).strip()


def text_or_empty(node, path: str) -> str:
    """Return the stripped text of the first element matching *path*.

    All descendant text is included. Returns ``''`` when nothing matches.
    """
    found = node.find(path, NAMESPACES)
    if found is None:
        return ''
    return ''.join(found.itertext()).strip()


def pick_topic(*parts: str) -> Topic | None:
    """Choose a Topic for an article from free-text fragments.

    The non-empty *parts* are lower-cased and joined, then scanned for the
    keywords of ``DEFAULT_TOPIC_MAP`` in insertion order. The first keyword
    found decides the topic name; the result is ``None`` if no Topic row with
    that name exists. When no keyword is found, the alphabetically first
    Topic is returned (``None`` if the table is empty).
    """
    combined = ' '.join(part.lower() for part in parts if part)
    for keyword, topic_name in DEFAULT_TOPIC_MAP.items():
        # Match whole words only: a plain substring test lets short keywords
        # such as 'ai' fire inside unrelated words ('said', 'email'), which
        # would label nearly every article with the first topic in the map.
        if re.search(rf'\b{re.escape(keyword)}\b', combined):
            return Topic.objects.filter(name=topic_name).first()
    return Topic.objects.order_by('name').first()


def _parse_iso8601(value: str) -> datetime:
    """Parse an ISO 8601 / RFC 3339 timestamp as used by Atom feeds."""
    text = str(value).strip()
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11.
    if text[-1:] in ('Z', 'z'):
        text = f'{text[:-1]}+00:00'
    return datetime.fromisoformat(text)


def parse_datetime(value: str):
    """Convert a feed timestamp to an aware UTC datetime.

    RFC 2822 dates (RSS ``pubDate``) are tried first, then ISO 8601 dates
    (Atom ``published``/``updated``). Naive results are interpreted as UTC.
    An empty or unparseable *value* yields the current time.
    """
    if not value:
        return timezone.now()
    for parser in (parsedate_to_datetime, _parse_iso8601):
        try:
            parsed = parser(value)
            if timezone.is_naive(parsed):
                parsed = timezone.make_aware(parsed, dt_timezone.utc)
            return parsed.astimezone(dt_timezone.utc)
        except (TypeError, ValueError, IndexError, OverflowError):
            continue
    return timezone.now()


def _extract_image(item: ET.Element) -> str:
    """Return an image URL for *item*, or ``''`` when none is found.

    An ``<enclosure>`` whose type mentions ``image`` wins, followed by
    ``media:content`` and ``media:thumbnail``. Candidates without a ``url``
    attribute are skipped so a later candidate can still provide one.
    """
    enclosure = item.find('enclosure')
    if enclosure is not None and 'image' in enclosure.attrib.get('type', ''):
        url = enclosure.attrib.get('url', '')
        if url:
            return url
    for path in ('media:content', 'media:thumbnail'):
        media = item.find(path, NAMESPACES)
        if media is not None:
            url = media.attrib.get('url', '')
            if url:
                return url
    return ''


def _extract_link(item: ET.Element) -> str:
    """Return the article URL of an RSS item or Atom entry (``''`` if none)."""
    link = text_or_empty(item, 'link') or item.attrib.get('href', '')
    if link:
        return link
    fallback = ''
    for atom_link in item.findall('atom:link', NAMESPACES):
        href = atom_link.attrib.get('href', '')
        if not href:
            continue
        # A missing rel means "alternate" in Atom; prefer it over self,
        # replies, enclosure and similar links that may come first.
        if atom_link.attrib.get('rel', 'alternate') == 'alternate':
            return href
        fallback = fallback or href
    return fallback


def _category_text(item: ET.Element) -> str:
    """Join RSS category text and Atom category terms with spaces."""
    labels = [elem.text or '' for elem in item.findall('category')]
    labels.extend(
        elem.attrib.get('term', '')
        for elem in item.findall('atom:category', NAMESPACES)
    )
    return ' '.join(labels)


def _dedupe_key(source: NewsSource, guid: str, link: str, title: str) -> str:
    """Return a SHA-256 hex digest identifying an entry within *source*.

    The first non-empty value of *guid*, *link* and *title* is combined with
    the source primary key.
    """
    payload = f"{source.pk}|{guid or link or title}".encode('utf-8')
    return hashlib.sha256(payload).hexdigest()


def import_feed(source: NewsSource, limit: int = 8) -> int:
    """Fetch the feed of *source* and store entries not imported before.

    Args:
        source: The news source whose ``feed_url`` is downloaded.
        limit: Only the first *limit* entries of the feed are considered.

    Returns:
        The number of Article rows created.

    Raises:
        Network errors from ``urlopen`` and ``xml.etree.ElementTree.ParseError``
        propagate to the caller; ``last_synced_at`` is not updated then.
    """
    # NOTE(security): feed_url is fetched as-is and urllib also handles
    # non-HTTP schemes such as file://. Confirm the URL is validated where
    # sources are created.
    request = Request(
        source.feed_url,
        headers={'User-Agent': 'Mozilla/5.0 FlatlogicNewsroomBot/1.0'},
    )
    with urlopen(request, timeout=15) as response:
        body = response.read()

    # NOTE(security): the standard library XML parser is not hardened against
    # maliciously constructed documents (e.g. entity expansion). Feeds are
    # untrusted input; consider a hardened parser.
    root = ET.fromstring(body)
    channel_items = root.findall('./channel/item')
    atom_entries = root.findall('./atom:entry', NAMESPACES)
    items = channel_items or atom_entries

    created_count = 0
    for item in items[:limit]:
        title = text_or_empty(item, 'title') or text_or_empty(item, 'atom:title')
        link = _extract_link(item)
        guid = text_or_empty(item, 'guid') or text_or_empty(item, 'atom:id')
        excerpt = (
            text_or_empty(item, 'description')
            or text_or_empty(item, 'atom:summary')
            or text_or_empty(item, 'content:encoded')
        )
        content = (
            text_or_empty(item, 'content:encoded')
            or text_or_empty(item, 'atom:content')
            or excerpt
        )
        # Atom elements live in the Atom namespace, so the un-namespaced
        # lookups alone never find an Atom entry's publication date.
        published_raw = (
            text_or_empty(item, 'pubDate')
            or text_or_empty(item, 'published')
            or text_or_empty(item, 'atom:published')
            or text_or_empty(item, 'updated')
            or text_or_empty(item, 'atom:updated')
        )
        author_name = (
            text_or_empty(item, 'author')
            or text_or_empty(item, 'atom:author/atom:name')
        )
        category_text = _category_text(item)

        # The key is built from the raw values so it stays comparable with
        # keys stored by earlier imports.
        dedupe_key = _dedupe_key(source, guid, link, title)
        clean_title = strip_html(title)
        # Skip entries without a usable title (including markup-only titles)
        # and entries that were already imported.
        if not clean_title or Article.objects.filter(dedupe_key=dedupe_key).exists():
            continue

        article = Article(
            title=clean_title,
            excerpt=strip_html(excerpt)[:340],
            content=strip_html(content),
            article_kind=Article.ArticleKind.RSS,
            source=source,
            topic=pick_topic(title, excerpt, category_text, source.name),
            external_url=link,
            image_url=_extract_image(item),
            author_name=strip_html(author_name)[:120],
            published_at=parse_datetime(published_raw),
            dedupe_key=dedupe_key,
            is_published=True,
        )
        article.save()
        created_count += 1

    source.last_synced_at = timezone.now()
    source.save(update_fields=['last_synced_at'])
    return created_count


def sync_active_sources(limit: int = 6, stale_minutes: int = 45) -> int:
    """Import every active source that is due for a refresh.

    A source is due when it has never been synced or its last sync is older
    than *stale_minutes*. A failure on one source is logged and does not stop
    the remaining sources.

    Args:
        limit: Passed to ``import_feed`` as the per-feed entry limit.
        stale_minutes: Minimum age of the last sync before a source is
            fetched again.

    Returns:
        The total number of articles created across all sources.
    """
    threshold = timezone.now() - timedelta(minutes=stale_minutes)
    sources = NewsSource.objects.filter(is_active=True).filter(
        Q(last_synced_at__isnull=True) | Q(last_synced_at__lt=threshold)
    )
    total_created = 0
    for source in sources:
        try:
            total_created += import_feed(source, limit=limit)
        except Exception as exc:  # pragma: no cover - graceful failure for live feed parsing
            logger.warning('RSS sync failed for %s: %s', source.name, exc)
    return total_created