"""Django views for the Xiaohongshu (XHS) user/comment extraction tool."""
import csv
import io
import os
import re
import time

import pandas as pd
import requests
from docx import Document

from django.http import HttpResponse, FileResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.utils import timezone

from .models import ExtractionTask, ExtractedUser
def home(request):
    """Render the landing screen with the tool interface and recent tasks."""
    recent_tasks = ExtractionTask.objects.all().order_by('-created_at')[:10]
    context = {"tasks": recent_tasks}
    return render(request, "core/index.html", context)
def analyze(request):
    """Parse pasted XHS text and create an ExtractionTask with extracted users.

    POST parameters:
        task_type: 'fans', 'following' or 'comments' (defaults to 'fans').
        raw_text:  the raw text block pasted by the user.

    Runs up to three parsing phases, each only firing if the previous ones
    found nothing, then redirects to the task detail page.  Non-POST requests
    or an empty paste redirect back home.
    """
    if request.method != "POST":
        return redirect('home')

    task_type = request.POST.get("task_type", "fans")
    raw_text = request.POST.get("raw_text", "").strip()
    if not raw_text:
        return redirect('home')

    task = ExtractionTask.objects.create(task_type=task_type, raw_text=raw_text)

    extracted_count = 0
    found_ids = set()  # de-duplicates IDs across all phases

    # --- PHASE 1: robust fans/following parsing ---
    if task_type in ('fans', 'following'):
        extracted_count += _extract_fans(task, raw_text, found_ids)

    # --- PHASE 2: robust comment parsing (also the fallback when phase 1
    # produced nothing) ---
    if task_type == 'comments' or extracted_count == 0:
        extracted_count += _extract_comments(task, raw_text)

    # --- PHASE 3: last-resort URL / bare-ID scanning ---
    if extracted_count == 0:
        extracted_count += _extract_fallback(task, raw_text, found_ids)

    return redirect('task_detail', task_id=task.id)


def _extract_fans(task, raw_text, found_ids):
    """Parse a fans/following list; return the number of users created.

    Expected layout is a nickname line followed by an ID line.
    """
    count = 0
    lines = [l.strip() for l in raw_text.split('\n') if l.strip()]

    # Strategy A: lines carrying an explicit ID marker (小红书号 / ID);
    # the nickname is assumed to be the previous line.
    for i, line in enumerate(lines):
        match = re.search(r'(?:小红书号|ID|id)[::\s]*([a-zA-Z0-9_.-]{5,})',
                          line, re.IGNORECASE)
        if not match:
            continue
        xhs_id = match.group(1).strip()
        if not xhs_id or xhs_id in found_ids:
            continue
        nickname = lines[i - 1] if i > 0 else "未知用户"
        # Strip a trailing ID fragment if the nickname line also contains one.
        nickname = re.sub(r'(?:小红书号|ID|id).*', '', nickname,
                          flags=re.IGNORECASE).strip()
        if not nickname:
            nickname = "小红书用户"
        ExtractedUser.objects.create(
            task=task,
            nickname=nickname[:250],
            xhs_id=xhs_id[:100],
        )
        found_ids.add(xhs_id)
        count += 1

    # Strategy B: only if A found nothing — adjacent "nickname / bare ID"
    # pairs without any marker.
    if count == 0:
        for i in range(len(lines) - 1):
            name_line = lines[i]
            id_line = lines[i + 1]
            # id_line must look like an ID (alphanumeric, 6-15 chars) and the
            # nickname line must be plausibly short.
            if (re.match(r'^[a-zA-Z0-9_.-]{6,15}$', id_line)
                    and len(name_line) < 40
                    and id_line not in found_ids):
                ExtractedUser.objects.create(
                    task=task,
                    nickname=name_line[:250],
                    xhs_id=id_line[:100],
                )
                found_ids.add(id_line)
                count += 1
    return count


def _extract_comments(task, raw_text):
    """Parse comment triples (nickname / content / time-or-location).

    Returns the number of users created.
    """
    # Time formats seen in the wild: 10-24, 2小时前, 昨天, 刚刚, 3天前,
    # full dates, and "IP:" location lines.
    time_pattern = r'^(\d{2}-\d{2}|\d+[-天小分][前时钟]*|昨天|刚刚|\d{4}-\d{2}-\d{2}.*|IP:.*)$'
    lines = [l.strip() for l in raw_text.split('\n') if l.strip()]
    count = 0
    i = 0
    while i < len(lines) - 1:
        nickname = lines[i]
        # A valid comment block needs a third line matching a time/IP stamp.
        if i + 2 < len(lines) and re.match(time_pattern, lines[i + 2]):
            content = lines[i + 1]
            time_info = lines[i + 2]
            if len(nickname) < 50:
                ExtractedUser.objects.create(
                    task=task,
                    nickname=nickname[:250],
                    comment_text=f"[{time_info}] {content}"
                )
                count += 1
                i += 3  # consume the whole triple
                continue
        i += 1
    return count


def _extract_fallback(task, raw_text, found_ids):
    """Last resort: harvest profile URLs, or bare IDs when no URLs exist.

    Returns the number of users created.
    """
    count = 0
    all_urls = re.findall(r'https?://[^\s]+', raw_text)
    for url in all_urls:
        ExtractedUser.objects.create(
            task=task,
            nickname="待采集主页",
            profile_url=url[:500],
            comment_text="[智能识别] 已锁定目标。由于小红书加密机制,请点击「高精度修复」手动粘贴列表内容。"
        )
        count += 1

    if not all_urls:
        # Split on whitespace and common CJK/ASCII separators, keep chunks
        # that look like bare XHS IDs.
        for chunk in re.split(r'[\s,,;;]', raw_text):
            chunk = chunk.strip()
            if (re.match(r'^[a-zA-Z0-9_.-]{6,20}$', chunk)
                    and chunk not in found_ids):
                ExtractedUser.objects.create(
                    task=task,
                    nickname="待分析用户",
                    xhs_id=chunk[:100],
                )
                found_ids.add(chunk)
                count += 1
    return count
def task_detail(request, task_id):
    """Show one extraction task and its users; flag tiny results for re-paste."""
    task = get_object_or_404(ExtractionTask, id=task_id)
    # A near-empty result from a short paste suggests the user pasted a link
    # rather than the actual list, so the template offers the manual-paste flow.
    needs_paste = task.users.count() <= 1 and len(task.raw_text) < 300
    context = {
        "task": task,
        "users": task.users.all(),
        "needs_paste": needs_paste,
    }
    return render(request, "core/task_detail.html", context)
def history(request):
    """List every extraction task, newest first."""
    all_tasks = ExtractionTask.objects.all().order_by('-created_at')
    return render(request, "core/history.html", {"tasks": all_tasks})
def export_task(request, task_id, format):
    """Export a task's extracted users as 'csv', 'excel' (.xlsx) or 'word' (.docx).

    Builds a DataFrame of all users on the task and streams it back as a file
    download.  Unknown formats redirect back to the task detail page.
    """
    task = get_object_or_404(ExtractionTask, id=task_id)
    users = task.users.all()

    data = [
        {
            "昵称": user.nickname,
            "小红书ID": user.xhs_id,
            "主页链接": user.profile_url,
            "评论/备注": user.comment_text,
            "提取时间": user.extracted_at.strftime('%Y-%m-%d %H:%M'),
        }
        for user in users
    ]
    if not data:
        # Keep the export non-empty so the file still opens with headers.
        data = [{"昵称": "未提取到数据", "小红书ID": "-", "主页链接": "-"}]

    df = pd.DataFrame(data)
    timestamp = timezone.now().strftime('%Y%m%d_%H%M')
    # BUG FIX: braces were doubled ({{...}}), which made the download name the
    # literal text "xhs_{task.task_type}_{timestamp}" instead of real values.
    filename = f"xhs_{task.task_type}_{timestamp}"

    if format == 'csv':
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = f'attachment; filename="{filename}.csv"'
        # utf-8-sig adds a BOM so Excel renders the Chinese headers correctly.
        df.to_csv(path_or_buf=response, index=False, encoding='utf-8-sig')
        return response

    elif format == 'excel':
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='Data')
        output.seek(0)
        response = HttpResponse(
            output.read(),
            content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        )
        response['Content-Disposition'] = f'attachment; filename="{filename}.xlsx"'
        return response

    elif format == 'word':
        doc = Document()
        # BUG FIX: doubled braces here too — heading and timestamp were
        # rendered as literal "{task.get_task_type_display()}" text.
        doc.add_heading(f'小红书数据导出 - {task.get_task_type_display()}', 0)
        doc.add_paragraph(f'导出时间: {timezone.now().strftime("%Y-%m-%d %H:%M:%S")}\n')

        if not df.empty:
            table = doc.add_table(rows=1, cols=len(df.columns))
            hdr_cells = table.rows[0].cells
            for i, column in enumerate(df.columns):
                hdr_cells[i].text = column

            for index, row in df.iterrows():
                row_cells = table.add_row().cells
                for i, column in enumerate(df.columns):
                    row_cells[i].text = str(row[column]) if row[column] else ""

        output = io.BytesIO()
        doc.save(output)
        output.seek(0)
        response = HttpResponse(
            output.read(),
            content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        )
        response['Content-Disposition'] = f'attachment; filename="{filename}.docx"'
        return response

    return redirect('task_detail', task_id=task.id)