39948-vm/frontend/src/lib/offline/DownloadManager.ts

797 lines
22 KiB
TypeScript

/**
* DownloadManager
*
* Manages asset downloads with queue, retry, and progress tracking.
* Adapted from hoboken's videoProcessingQueue pattern for frontend use.
*/
import { PRELOAD_CONFIG } from '../../config/preload.config';
import { OFFLINE_CONFIG } from '../../config/offline.config';
import { downloadEventBus } from './DownloadEventBus';
import { StorageManager } from './StorageManager';
import { OfflineDbManager } from '../offlineDb/OfflineDbManager';
import {
extractStoragePath,
isPresignedUrl,
markPresignedUrlFailed,
buildProxyUrl,
} from '../assetUrl';
import { logger } from '../logger';
import type {
PreloadJobStatus,
AssetVariantType,
AssetType,
DownloadQueueItem,
} from '../../types/offline';
interface DownloadJob {
id: string;
assetId: string;
projectId: string;
url: string;
filename: string;
variantType: AssetVariantType;
assetType: AssetType;
priority: number;
status: PreloadJobStatus;
progress: number;
bytesLoaded: number;
totalBytes: number;
retryCount: number;
addedAt: number;
storageKey: string; // Canonical storage key for consistent caching
persist?: boolean; // Persist to IndexedDB for resume (default: true)
usedProxyFallback?: boolean; // Whether we've already tried proxy URL fallback
abortController?: AbortController;
resolve?: () => void;
reject?: (error: Error) => void;
/** Streaming mode state */
streamingMode?: {
enabled: boolean;
minBufferBytes: number;
streamingUrl: string;
didSignalReady: boolean;
};
}
class DownloadManagerClass {
private queue: DownloadJob[] = [];
private activeDownloads: Map<string, DownloadJob> = new Map();
private isPaused = false;
private isProcessing = false;
// Blob URL cache for instant lookup (storageKey → blobUrl)
private readyBlobUrls: Map<string, string> = new Map();
// Raw Blob cache for creating fresh blob URLs (storageKey → Blob)
// Used by transitions to avoid decoder state issues with pre-created blob URLs
private readyBlobs: Map<string, Blob> = new Map();
private config = {
maxConcurrent: PRELOAD_CONFIG.maxConcurrentDownloads,
chunkSize: PRELOAD_CONFIG.videoChunkSize,
maxRetries: PRELOAD_CONFIG.maxRetries,
retryDelayMs: PRELOAD_CONFIG.retryDelayMs,
largeFileThreshold: PRELOAD_CONFIG.largeFileThreshold,
};
/**
* Add a download job to the queue
* Always creates blob URLs for reliable playback on all devices
*/
async addJob(params: {
assetId: string;
projectId: string;
url: string;
filename: string;
variantType: AssetVariantType;
assetType: AssetType;
priority?: number;
storageKey?: string; // Optional, will extract if not provided
persist?: boolean; // Persist to IndexedDB for resume (default: true)
/** Enable streaming mode - signal ready after minimum buffer downloaded */
streamingMode?: {
enabled: boolean;
minBufferBytes?: number;
};
}): Promise<void> {
const storageKey = params.storageKey || extractStoragePath(params.url);
// Check cache status - if fully cached, create blob URL and return
const assetInfo = await StorageManager.getAssetInfo(storageKey);
if (assetInfo?.exists && !assetInfo.isPartial) {
// Fully cached - create blob URL if not already ready
if (!this.readyBlobUrls.has(storageKey)) {
await this.createBlobUrlFromCache(storageKey);
}
return;
}
// Check if already in queue (use storageKey for deduplication)
if (
this.queue.some((j) => j.storageKey === storageKey) ||
Array.from(this.activeDownloads.values()).some(
(j) => j.storageKey === storageKey,
)
) {
return; // Already queued
}
return new Promise((resolve, reject) => {
const job: DownloadJob = {
id: `dl-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`,
assetId: params.assetId,
projectId: params.projectId,
url: params.url,
filename: params.filename,
variantType: params.variantType,
assetType: params.assetType,
priority:
params.priority ??
this.calculatePriority(params.assetType, params.variantType),
status: 'queued',
progress: 0,
bytesLoaded: 0,
totalBytes: 0,
retryCount: 0,
addedAt: Date.now(),
storageKey,
persist: params.persist ?? true,
resolve,
reject,
// Initialize streaming mode if enabled
streamingMode: params.streamingMode?.enabled
? {
enabled: true,
minBufferBytes:
params.streamingMode.minBufferBytes ?? this.getMinBufferBytes(),
streamingUrl: params.url,
didSignalReady: false,
}
: undefined,
};
// Persist to IndexedDB for resume capability (default true)
if (job.persist !== false) {
this.persistQueueItem(job);
}
// Insert in priority order (higher priority first)
const insertIndex = this.queue.findIndex(
(q) => q.priority < job.priority,
);
if (insertIndex === -1) {
this.queue.push(job);
} else {
this.queue.splice(insertIndex, 0, job);
}
downloadEventBus.emitQueueUpdate();
this.processQueue();
});
}
/**
* Calculate priority based on asset and variant type
*/
private calculatePriority(
assetType: AssetType,
variantType: AssetVariantType,
): number {
const typePriority = PRELOAD_CONFIG.priority.assetType[assetType] || 0;
const variantPriority = PRELOAD_CONFIG.priority.variant[variantType] || 0;
return typePriority + variantPriority;
}
/**
* Persist queue item to IndexedDB
*/
private async persistQueueItem(job: DownloadJob): Promise<void> {
const queueItem: DownloadQueueItem = {
id: job.id,
projectId: job.projectId,
assetId: job.assetId,
url: job.url,
filename: job.filename,
status: job.status,
priority: job.priority,
retryCount: job.retryCount,
bytesLoaded: job.bytesLoaded,
totalBytes: job.totalBytes,
addedAt: job.addedAt,
};
await OfflineDbManager.addToQueue(queueItem);
}
/**
* Process the download queue
*/
private async processQueue(): Promise<void> {
if (this.isProcessing || this.isPaused) return;
if (this.queue.length === 0 && this.activeDownloads.size === 0) return;
this.isProcessing = true;
while (
!this.isPaused &&
this.activeDownloads.size < this.config.maxConcurrent &&
this.queue.length > 0
) {
const job = this.queue.shift();
if (!job) break;
// Use storageKey as Map key (stable) instead of url (can change on fallback)
this.activeDownloads.set(job.storageKey, job);
this.downloadAsset(job);
}
this.isProcessing = false;
}
/**
* Download a single asset with progress tracking
*/
private async downloadAsset(job: DownloadJob): Promise<void> {
job.status = 'downloading';
job.abortController = new AbortController();
await OfflineDbManager.updateQueueStatus(job.id, 'downloading');
downloadEventBus.emitPreloadStart({
jobId: job.id,
assetId: job.assetId,
url: job.url,
});
try {
const response = await fetch(job.url, {
signal: job.abortController.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const contentLength = response.headers.get('content-length');
job.totalBytes = contentLength ? parseInt(contentLength, 10) : 0;
let blob: Blob;
if (response.body) {
// Stream with progress tracking
const reader = response.body.getReader();
const chunks: BlobPart[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
job.bytesLoaded += value.length;
job.progress =
job.totalBytes > 0
? Math.round((job.bytesLoaded / job.totalBytes) * 100)
: 0;
downloadEventBus.emitPreloadProgress({
jobId: job.id,
progress: job.progress,
bytesLoaded: job.bytesLoaded,
totalBytes: job.totalBytes,
});
// Check if streaming mode and minimum buffer reached
if (
job.streamingMode?.enabled &&
!job.streamingMode.didSignalReady &&
job.bytesLoaded >= job.streamingMode.minBufferBytes
) {
job.streamingMode.didSignalReady = true;
logger.info('[DownloadManager] Streaming ready', {
storageKey: job.storageKey.slice(-50),
bytesLoaded: job.bytesLoaded,
minBuffer: job.streamingMode.minBufferBytes,
});
// Emit streaming ready event
downloadEventBus.emitStreamingReady({
jobId: job.id,
storageKey: job.storageKey,
streamingUrl: job.streamingMode.streamingUrl,
bytesLoaded: job.bytesLoaded,
totalBytes: job.totalBytes,
});
}
// Only update queue progress if persisting
if (job.persist !== false) {
await OfflineDbManager.updateQueueProgress(
job.id,
job.bytesLoaded,
job.totalBytes,
);
}
}
blob = new Blob(chunks, {
type:
response.headers.get('content-type') || 'application/octet-stream',
});
} else {
// No streaming, get blob directly
blob = await response.blob();
job.totalBytes = blob.size;
job.bytesLoaded = blob.size;
job.progress = 100;
}
// Store the asset using canonical storage key
await StorageManager.storeAsset(job.storageKey, blob, {
id: job.assetId,
projectId: job.projectId,
filename: job.filename,
variantType: job.variantType,
assetType: job.assetType,
isPartial: false,
});
// Store ORIGINAL blob directly (bypass cache retrieval to avoid potential corruption)
this.readyBlobs.set(job.storageKey, blob);
logger.info('[DownloadManager] Stored original blob', {
storageKey: job.storageKey.slice(-50),
blobSize: blob.size,
blobType: blob.type,
});
// Always create blob URL for reliable playback on all devices
await this.createBlobUrlFromCache(job.storageKey);
// Mark as completed
job.status = 'completed';
if (job.persist !== false) {
await OfflineDbManager.removeFromQueue(job.id);
}
downloadEventBus.emitPreloadComplete({
jobId: job.id,
assetId: job.assetId,
storageKey: job.storageKey,
});
job.resolve?.();
} catch (error) {
if (job.abortController.signal.aborted) {
// Download was cancelled
job.status = 'paused';
return;
}
const errorMessage =
error instanceof Error ? error.message : 'Unknown error';
// If presigned URL failed and we haven't tried proxy yet, fall back to proxy
if (isPresignedUrl(job.url) && !job.usedProxyFallback) {
markPresignedUrlFailed(job.storageKey);
const proxyUrl = buildProxyUrl(job.storageKey);
logger.info(
'[DownloadManager] Presigned URL failed, retrying with proxy',
{
storageKey: job.storageKey.slice(-50),
error: errorMessage,
},
);
// Update job to use proxy URL and retry immediately
job.url = proxyUrl;
job.usedProxyFallback = true;
job.retryCount = 0; // Reset retry count for proxy attempt
job.bytesLoaded = 0;
job.progress = 0;
job.status = 'queued';
// Retry immediately with proxy URL
this.queue.unshift(job);
this.processQueue();
return;
}
job.retryCount++;
if (job.retryCount < this.config.maxRetries) {
// Retry with backoff
job.status = 'queued';
await OfflineDbManager.updateQueueStatus(job.id, 'queued');
setTimeout(() => {
this.queue.unshift(job);
this.processQueue();
}, this.config.retryDelayMs * job.retryCount);
} else {
// Max retries exceeded
job.status = 'error';
await OfflineDbManager.updateQueueStatus(job.id, 'error', errorMessage);
downloadEventBus.emitPreloadError({
jobId: job.id,
assetId: job.assetId,
error: errorMessage,
});
job.reject?.(error instanceof Error ? error : new Error(errorMessage));
}
} finally {
this.activeDownloads.delete(job.storageKey);
downloadEventBus.emitQueueUpdate();
this.processQueue();
}
}
/**
* Pause all downloads
*/
pauseAll(): void {
this.isPaused = true;
this.activeDownloads.forEach((job) => {
job.abortController?.abort();
job.status = 'paused';
});
downloadEventBus.emitQueueUpdate();
}
/**
* Resume all downloads
*/
resumeAll(): void {
this.isPaused = false;
// Move paused jobs back to queue
this.activeDownloads.forEach((job) => {
if (job.status === 'paused') {
job.status = 'queued';
this.queue.unshift(job);
}
});
this.activeDownloads.clear();
downloadEventBus.emitQueueUpdate();
this.processQueue();
}
/**
* Cancel a specific download
*/
cancelJob(jobId: string): void {
// Check active downloads (keyed by storageKey)
const entries = Array.from(this.activeDownloads.entries());
for (const [storageKey, job] of entries) {
if (job.id === jobId) {
job.abortController?.abort();
this.activeDownloads.delete(storageKey);
OfflineDbManager.removeFromQueue(jobId);
downloadEventBus.emitQueueUpdate();
return;
}
}
// Check queue
const index = this.queue.findIndex((j) => j.id === jobId);
if (index !== -1) {
this.queue.splice(index, 1);
OfflineDbManager.removeFromQueue(jobId);
downloadEventBus.emitQueueUpdate();
}
}
/**
* Cancel all downloads for a project
*/
cancelProjectDownloads(projectId: string): void {
// Cancel active downloads (keyed by storageKey)
const entries = Array.from(this.activeDownloads.entries());
for (const [storageKey, job] of entries) {
if (job.projectId === projectId) {
job.abortController?.abort();
this.activeDownloads.delete(storageKey);
}
}
// Remove from queue
this.queue = this.queue.filter((j) => j.projectId !== projectId);
// Clear from IndexedDB
OfflineDbManager.clearProjectQueue(projectId);
downloadEventBus.emitQueueUpdate();
}
/**
* Clear entire queue
*/
clearQueue(): void {
// Abort all active downloads
this.activeDownloads.forEach((job) => {
job.abortController?.abort();
});
this.activeDownloads.clear();
// Clear queue
this.queue = [];
// Clear IndexedDB
OfflineDbManager.clearQueue();
downloadEventBus.emitQueueUpdate();
}
/**
* Get current queue status
*/
getStatus(): {
queueLength: number;
activeCount: number;
isPaused: boolean;
} {
return {
queueLength: this.queue.length,
activeCount: this.activeDownloads.size,
isPaused: this.isPaused,
};
}
/**
* Restore queue from IndexedDB (for resume after page reload)
*/
async restoreQueue(): Promise<void> {
const pendingItems = await OfflineDbManager.getPendingQueue();
for (const item of pendingItems) {
// Get canonical storage key
const storageKey = extractStoragePath(item.url);
// Skip if already downloaded
const hasAsset = await StorageManager.hasAsset(storageKey);
if (hasAsset) {
await OfflineDbManager.removeFromQueue(item.id);
continue;
}
// Re-add to queue
const job: DownloadJob = {
id: item.id,
assetId: item.assetId,
projectId: item.projectId,
url: item.url,
filename: item.filename,
variantType: 'original',
assetType: 'other',
priority: item.priority,
status: 'queued',
progress: 0,
bytesLoaded: item.bytesLoaded,
totalBytes: item.totalBytes,
retryCount: item.retryCount,
addedAt: item.addedAt,
storageKey,
persist: true,
};
this.queue.push(job);
}
// Sort by priority
this.queue.sort((a, b) => b.priority - a.priority);
if (this.queue.length > 0) {
downloadEventBus.emitQueueUpdate();
this.processQueue();
}
}
/**
* Get a ready blob URL for instant display (O(1) lookup)
*/
getReadyBlobUrl(url: string): string | null {
const storageKey = extractStoragePath(url);
return this.readyBlobUrls.get(storageKey) || null;
}
/**
* Get raw Blob for creating fresh blob URLs (O(1) lookup).
* Used by transitions to avoid decoder state issues with pre-created blob URLs.
* Returns null if blob not cached.
*/
getReadyBlob(url: string): Blob | null {
const storageKey = extractStoragePath(url);
return this.readyBlobs.get(storageKey) || null;
}
/**
* Cache an externally fetched blob and register blob URL for instant lookup.
* Use this when fetching via XHR (e.g., transition playback) to enable caching.
*/
async cacheBlob(
storageKey: string,
blob: Blob,
metadata: {
assetType: AssetType;
projectId?: string;
},
): Promise<string> {
// Store in Cache API / IndexedDB via existing StorageManager
await StorageManager.storeAsset(storageKey, blob, {
id: `cached-${storageKey}`,
projectId: metadata.projectId || '',
filename: storageKey.split('/').pop() || 'asset',
variantType: 'original',
assetType: metadata.assetType,
});
// Store raw Blob for transitions (they need fresh blob URLs each playback)
this.readyBlobs.set(storageKey, blob);
// Create blob URL and register for instant O(1) lookup
const blobUrl = URL.createObjectURL(blob);
this.readyBlobUrls.set(storageKey, blobUrl);
// Emit event for consumers (existing pattern)
downloadEventBus.emitBlobUrlReady({
storageKey,
blobUrl,
});
logger.info('[DownloadManager] Cached external blob', {
storageKey: storageKey.slice(-50),
size: blob.size,
});
return blobUrl;
}
/**
* Create blob URL from cached asset and store in readyBlobUrls map
*/
private async createBlobUrlFromCache(storageKey: string): Promise<void> {
try {
const blob = await StorageManager.getAsset(storageKey);
if (!blob) {
logger.info('[DownloadManager] No blob found for', {
storageKey: storageKey.slice(-50),
});
return;
}
// Store raw Blob for transitions (they need fresh blob URLs each playback
// to avoid decoder state issues that cause video jumping)
// Only set if not already present (original download blob takes priority)
if (!this.readyBlobs.has(storageKey)) {
this.readyBlobs.set(storageKey, blob);
}
const blobUrl = URL.createObjectURL(blob);
// Decode images to prevent white flash
if (this.isImageUrl(storageKey)) {
await this.decodeImage(blobUrl);
}
this.readyBlobUrls.set(storageKey, blobUrl);
// Emit event for consumers
downloadEventBus.emitBlobUrlReady({
storageKey,
blobUrl,
});
logger.info('[DownloadManager] Blob URL ready', {
storageKey: storageKey.slice(-50),
blobUrl: blobUrl.slice(0, 30),
});
} catch (error) {
logger.error('[DownloadManager] Failed to create blob URL', {
storageKey: storageKey.slice(-50),
error: error instanceof Error ? error.message : 'unknown',
});
}
}
/**
* Clear blob URLs cache (call on unmount to prevent memory leaks)
*/
clearBlobUrls(): void {
this.readyBlobUrls.forEach((blobUrl) => URL.revokeObjectURL(blobUrl));
this.readyBlobUrls.clear();
this.readyBlobs.clear();
}
/**
* Clear blob URLs for specific storage keys (call when deleting project offline data)
*/
clearBlobUrlsForKeys(storageKeys: string[]): void {
for (const key of storageKeys) {
const blobUrl = this.readyBlobUrls.get(key);
if (blobUrl) {
URL.revokeObjectURL(blobUrl);
this.readyBlobUrls.delete(key);
}
this.readyBlobs.delete(key);
}
}
/**
* Check if URL is an image based on extension
*/
private isImageUrl(url: string): boolean {
const imageExtensions = [
'.jpg',
'.jpeg',
'.png',
'.gif',
'.webp',
'.avif',
'.svg',
];
const lowerUrl = url.toLowerCase();
return imageExtensions.some((ext) => lowerUrl.includes(ext));
}
/**
* Decode image for instant display (prevents white flash)
*/
private decodeImage(blobUrl: string): Promise<void> {
return new Promise((resolve) => {
const img = new Image();
img.src = blobUrl;
if (typeof img.decode === 'function') {
img
.decode()
.then(() => resolve())
.catch(() => resolve());
} else {
img.onload = () => resolve();
img.onerror = () => resolve();
}
});
}
/**
* Check if device is mobile (for streaming buffer size)
*/
private isMobile(): boolean {
if (typeof navigator === 'undefined') return false;
return /iPhone|iPad|iPod|Android/i.test(navigator.userAgent);
}
/**
* Get minimum buffer bytes based on device type
*/
private getMinBufferBytes(): number {
if (this.isMobile()) {
return PRELOAD_CONFIG.streaming.mobile?.minBufferBytes || 2 * 1024 * 1024;
}
return PRELOAD_CONFIG.streaming.minBufferBytes;
}
/**
* Check if asset is cached or get streaming URL for first playback.
* Returns cached blob URL for instant O(1) playback, or presigned URL for streaming.
*/
getStreamingUrlIfNeeded(
storageKey: string,
presignedUrl: string,
): { url: string; isFromCache: boolean } {
// Fully cached - use blob URL
const cachedUrl = this.readyBlobUrls.get(storageKey);
if (cachedUrl) {
return { url: cachedUrl, isFromCache: true };
}
// Not cached - use presigned URL for streaming
return { url: presignedUrl, isFromCache: false };
}
}
// Singleton instance
export const downloadManager = new DownloadManagerClass();