import io
import re

import pandas as pd
from docx import Document

from django.shortcuts import render, redirect, get_object_or_404
from django.http import HttpResponse
from django.utils import timezone

from .models import ExtractionTask, ExtractedUser


def home(request):
    """Render the landing screen with the tool interface."""
    tasks = ExtractionTask.objects.all().order_by('-created_at')[:10]
    return render(request, "core/index.html", {"tasks": tasks})


def analyze(request):
    if request.method == "POST":
        task_type = request.POST.get("task_type", "fans")
        raw_text = request.POST.get("raw_text", "").strip()

        if not raw_text:
            return redirect('home')

        # Create the task record
        task = ExtractionTask.objects.create(
            task_type=task_type,
            raw_text=raw_text,
        )

        extracted_count = 0
        found_ids = set()

        # --- PHASE 1: ROBUST FANS/FOLLOWING PARSING ---
        if task_type in ['fans', 'following']:
            # Strategy A: look for explicit ID markers.
            # Expected format: a nickname line followed by an ID line
            # ("小红书号" is the "Xiaohongshu ID" label).
            lines = [l.strip() for l in raw_text.split('\n') if l.strip()]

            for i, line in enumerate(lines):
                xhs_id = None
                nickname = "未知用户"  # "unknown user"

                # Check for an explicit ID marker in this line
                match = re.search(
                    r'(?:小红书号|ID|id)[::\s]*([a-zA-Z0-9_.-]{5,})',
                    line, re.IGNORECASE,
                )
                if match:
                    xhs_id = match.group(1).strip()
                    # The nickname is most likely the previous line
                    if i > 0:
                        nickname = lines[i - 1]

                if xhs_id and xhs_id not in found_ids:
                    # Clean the nickname (drop a trailing ID marker, if any)
                    nickname = re.sub(
                        r'(?:小红书号|ID|id).*', '', nickname,
                        flags=re.IGNORECASE,
                    ).strip()
                    if not nickname:
                        nickname = "小红书用户"  # "Xiaohongshu user"

                    ExtractedUser.objects.create(
                        task=task,
                        nickname=nickname[:250],
                        xhs_id=xhs_id[:100],
                    )
                    found_ids.add(xhs_id)
                    extracted_count += 1

            # Strategy B: if still nothing, look for bare
            # "nickname / ID" line pairs without markers.
            if extracted_count == 0:
                for i in range(len(lines) - 1):
                    line1 = lines[i]
                    line2 = lines[i + 1]
                    # line2 looks like an ID (alphanumeric, 6-15 chars)
                    # and line1 is short enough to be a nickname
                    if re.match(r'^[a-zA-Z0-9_.-]{6,15}$', line2) and len(line1) < 40:
                        if line2 not in found_ids:
                            ExtractedUser.objects.create(
                                task=task,
                                nickname=line1[:250],
                                xhs_id=line2[:100],
                            )
                            found_ids.add(line2)
                            extracted_count += 1

        # --- PHASE 2: ROBUST COMMENT PARSING ---
        if task_type == 'comments' or extracted_count == 0:
            # Pattern: [nickname] [content] [time/location].
            # Time formats: 10-24, 2小时前 ("2 hours ago"), 昨天 ("yesterday"),
            # 刚刚 ("just now"), 3天前 ("3 days ago").
            time_pattern = r'^(\d{2}-\d{2}|\d+[-天小分][前时钟]*|昨天|刚刚|\d{4}-\d{2}-\d{2}.*|IP:.*)$'

            lines = [l.strip() for l in raw_text.split('\n') if l.strip()]
            i = 0
            while i < len(lines) - 1:
                nickname = lines[i]
                potential_content = lines[i + 1]

                # Check whether a third line carries the timestamp
                if i + 2 < len(lines) and re.match(time_pattern, lines[i + 2]):
                    content = potential_content
                    time_info = lines[i + 2]
                    if len(nickname) < 50:
                        ExtractedUser.objects.create(
                            task=task,
                            nickname=nickname[:250],
                            comment_text=f"[{time_info}] {content}",
                        )
                        extracted_count += 1
                    i += 3
                    continue
                i += 1

        # --- PHASE 3: FALLBACK & SMART LINK HANDLING ---
        if extracted_count == 0:
            all_urls = re.findall(r'https?://[^\s]+', raw_text)
            for url in all_urls:
                ExtractedUser.objects.create(
                    task=task,
                    nickname="待采集主页",  # "profile pending collection"
                    profile_url=url[:500],
                    # "[Smart detection] Target locked. Because of Xiaohongshu's
                    # encryption, click 「高精度修复」 and paste the list manually."
                    comment_text="[智能识别] 已锁定目标。由于小红书加密机制,请点击「高精度修复」手动粘贴列表内容。",
                )
                extracted_count += 1

            if not all_urls:
                chunks = re.split(r'[\s,,;;]', raw_text)
                for chunk in chunks:
                    chunk = chunk.strip()
                    if re.match(r'^[a-zA-Z0-9_.-]{6,20}$', chunk) and chunk not in found_ids:
                        ExtractedUser.objects.create(
                            task=task,
                            nickname="待分析用户",  # "user pending analysis"
                            xhs_id=chunk[:100],
                        )
                        found_ids.add(chunk)
                        extracted_count += 1

        return redirect('task_detail', task_id=task.id)

    return redirect('home')
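
# Illustrative note on the paste formats analyze() assumes. The sample lines
# below are invented examples (assumptions), not captured app data.
# Strategy A expects nickname/ID pairs such as:
#
#     美食小达人
#     小红书号:foodie_8812
#
# where "foodie_8812" is captured as xhs_id and the preceding line becomes
# the nickname. Phase 2 expects comment triplets such as:
#
#     旅行玩家
#     这篇笔记太实用了!
#     2小时前
#
# which are stored as the nickname plus comment_text "[2小时前] 这篇笔记太实用了!".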

def task_detail(request, task_id):
    task = get_object_or_404(ExtractionTask, id=task_id)
    users = task.users.all()

    # A short paste that yielded at most one row probably needs re-pasting
    needs_paste = task.users.count() <= 1 and len(task.raw_text) < 300

    return render(request, "core/task_detail.html", {
        "task": task,
        "users": users,
        "needs_paste": needs_paste,
    })


def history(request):
    tasks = ExtractionTask.objects.all().order_by('-created_at')
    return render(request, "core/history.html", {"tasks": tasks})


def export_task(request, task_id, format):
    task = get_object_or_404(ExtractionTask, id=task_id)
    users = task.users.all()

    data = []
    for user in users:
        data.append({
            "昵称": user.nickname,          # nickname
            "小红书ID": user.xhs_id,        # Xiaohongshu ID
            "主页链接": user.profile_url,    # profile URL
            "评论/备注": user.comment_text,  # comment / notes
            "提取时间": user.extracted_at.strftime('%Y-%m-%d %H:%M'),  # extracted at
        })

    if not data:
        data = [{"昵称": "未提取到数据", "小红书ID": "-", "主页链接": "-"}]  # "no data extracted"

    df = pd.DataFrame(data)
    timestamp = timezone.now().strftime('%Y%m%d_%H%M')
    filename = f"xhs_{task.task_type}_{timestamp}"

    if format == 'csv':
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = f'attachment; filename="{filename}.csv"'
        # utf-8-sig writes a BOM so Excel detects the encoding correctly
        df.to_csv(path_or_buf=response, index=False, encoding='utf-8-sig')
        return response

    elif format == 'excel':
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='Data')
        output.seek(0)
        response = HttpResponse(
            output.read(),
            content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        )
        response['Content-Disposition'] = f'attachment; filename="{filename}.xlsx"'
        return response

    elif format == 'word':
        doc = Document()
        doc.add_heading(f'小红书数据导出 - {task.get_task_type_display()}', 0)  # "Xiaohongshu data export"
        doc.add_paragraph(f'导出时间: {timezone.now().strftime("%Y-%m-%d %H:%M:%S")}\n')  # "export time"

        if not df.empty:
            table = doc.add_table(rows=1, cols=len(df.columns))
            hdr_cells = table.rows[0].cells
            for i, column in enumerate(df.columns):
                hdr_cells[i].text = column
            for _, row in df.iterrows():
                row_cells = table.add_row().cells
                for i, column in enumerate(df.columns):
                    value = row[column]
                    # None becomes NaN inside the DataFrame, and NaN is truthy,
                    # so guard with pd.isna() to keep empty cells blank
                    row_cells[i].text = "" if pd.isna(value) or not value else str(value)

        output = io.BytesIO()
        doc.save(output)
        output.seek(0)
        response = HttpResponse(
            output.read(),
            content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        )
        response['Content-Disposition'] = f'attachment; filename="{filename}.docx"'
        return response

    return redirect('task_detail', task_id=task.id)
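
# The redirect() calls above reference the URL names 'home' and 'task_detail',
# and export_task() takes a task_id plus a format of 'csv', 'excel', or 'word'.
# A urls.py consistent with this module might look like the sketch below; the
# concrete path strings and the 'analyze'/'history'/'export_task' route names
# are assumptions, only the view signatures come from this file.
#
#     from django.urls import path
#     from . import views
#
#     urlpatterns = [
#         path('', views.home, name='home'),
#         path('analyze/', views.analyze, name='analyze'),
#         path('task/<int:task_id>/', views.task_detail, name='task_detail'),
#         path('history/', views.history, name='history'),
#         path('task/<int:task_id>/export/<str:format>/',
#              views.export_task, name='export_task'),
#     ]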