38461-vm/core/pexels.py
2026-02-16 02:58:28 +00:00

91 lines
3.2 KiB
Python

import os
import requests
from pathlib import Path
API_KEY = os.getenv("PEXELS_KEY", "Vc99rnmOhHhJAbgGQoKLZtsaIVfkeownoQNbTj78VemUjKh08ZYRbf18")
IMAGE_CACHE_DIR = Path("static/images/pexels")
VIDEO_CACHE_DIR = Path("static/videos/pexels")
def fetch_first(query: str, orientation: str = "landscape") -> dict | None:
if not API_KEY:
return None
headers = {"Authorization": API_KEY}
url = "https://api.pexels.com/v1/search"
params = {"query": query, "orientation": orientation, "per_page": 1, "page": 1}
try:
resp = requests.get(url, headers=headers, params=params, timeout=15)
resp.raise_for_status()
data = resp.json()
photo = (data.get("photos") or [None])[0]
if not photo:
return None
src = photo["src"].get("large2x") or photo["src"].get("large") or photo["src"].get("original")
IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True)
target = IMAGE_CACHE_DIR / f"{photo['id']}.jpg"
if src and not target.exists():
img_resp = requests.get(src, timeout=15)
img_resp.raise_for_status()
target.write_bytes(img_resp.content)
return {
"id": photo["id"],
"local_path": f"images/pexels/{photo['id']}.jpg",
"photographer": photo.get("photographer"),
}
except Exception as e:
print(f"Error fetching image from Pexels: {e}")
return None
def fetch_video(query: str, orientation: str = "landscape") -> dict | None:
if not API_KEY:
return None
headers = {"Authorization": API_KEY}
url = "https://api.pexels.com/videos/search"
params = {"query": query, "orientation": orientation, "per_page": 1, "page": 1}
try:
resp = requests.get(url, headers=headers, params=params, timeout=15)
resp.raise_for_status()
data = resp.json()
video_data = (data.get("videos") or [None])[0]
if not video_data:
return None
# Get the best HD video link
video_files = video_data.get("video_files", [])
# Prefer HD/Full HD mp4
best_file = None
for f in video_files:
if f.get("file_type") == "video/mp4":
if not best_file or f.get("width", 0) > best_file.get("width", 0):
best_file = f
if not best_file:
return None
src = best_file["link"]
VIDEO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
target = VIDEO_CACHE_DIR / f"{video_data['id']}.mp4"
if src and not target.exists():
vid_resp = requests.get(src, timeout=30, stream=True)
vid_resp.raise_for_status()
with open(target, 'wb') as f:
for chunk in vid_resp.iter_content(chunk_size=8192):
f.write(chunk)
return {
"id": video_data["id"],
"local_path": f"videos/pexels/{video_data['id']}.mp4",
"user": video_data.get("user", {}).get("name"),
}
except Exception as e:
print(f"Error fetching video from Pexels: {e}")
return None