"""
Asset management API routes.

Provides asset upload, listing and deletion endpoints.
"""
|
||
import os
|
||
import time
|
||
import logging
|
||
from typing import List, Optional
|
||
from pathlib import Path
|
||
|
||
from fastapi import APIRouter, HTTPException, UploadFile, File
|
||
from fastapi.responses import FileResponse, RedirectResponse
|
||
from pydantic import BaseModel
|
||
|
||
import config
|
||
from modules.db_manager import db
|
||
from modules.legacy_path_mapper import map_legacy_local_path
|
||
|
||
logger = logging.getLogger(__name__)
|
||
router = APIRouter()
|
||
|
||
|
||
# ============================================================
|
||
# Pydantic Models
|
||
# ============================================================
|
||
|
||
class AssetInfo(BaseModel):
    """Response schema for a single asset entry returned by the list endpoint."""

    # Identity and classification of the asset.
    id: int
    project_id: str
    scene_id: int
    asset_type: str
    status: str

    # Filesystem path recorded for the asset, if any.
    local_path: Optional[str] = None
    # URL a client can load the asset from; stays None when no source is found.
    url: Optional[str] = None
    # Free-form extra data attached to the asset.
    metadata: dict = {}
|
||
|
||
|
||
class UploadResponse(BaseModel):
    """Response schema returned after a file has been stored by the upload endpoint."""

    success: bool
    # Name the file was saved under (generated by the server).
    filename: str
    # Path of the saved file, rendered as a string.
    path: str
    # URL under which the saved file is exposed.
    url: str
|
||
|
||
|
||
class StickerItem(BaseModel):
    """Schema of one sticker entry (same keys as the dicts built by the sticker listing)."""

    id: str
    name: str
    url: str
    # Either "builtin" or "custom".
    kind: str = "builtin"
    tags: List[str] = []
    category: Optional[str] = None
    # Licensing information, when the sticker pack declares it.
    license: Optional[str] = None
    attribution: Optional[str] = None
|
||
|
||
|
||
# ============================================================
|
||
# API Endpoints
|
||
# ============================================================
|
||
|
||
def _resolve_asset_url(asset: dict) -> Optional[str]:
    """Choose the URL a client should use to load *asset*.

    Sources are tried in order; the first one that is reachable wins:

    1. ``local_path`` exists on disk -> the ``/api/assets/file/{id}`` proxy.
    2. a non-blank ``remote_url`` (e.g. R2/CDN) -> that URL, stripped.
    3. the legacy path mapping -> its static URL if one is provided,
       otherwise the file proxy.
    4. historical files named after the asset id under ``/legacy/temp``.

    Returns ``None`` when none of the sources is available.
    """
    proxy_url = f"/api/assets/file/{asset.get('id', 0)}"
    local_path = asset.get("local_path")
    remote_url = asset.get("remote_url")

    # 1) Path is directly visible: serve it through the file proxy.
    if local_path and os.path.exists(local_path):
        return proxy_url

    # 1.5) Remote URL: lets the asset preview even when the local file is gone.
    if remote_url and isinstance(remote_url, str) and remote_url.strip():
        return remote_url.strip()

    # 2) Legacy mapping (/root/video-flow -> /legacy/* or /app/*).
    mapped_path, mapped_url = map_legacy_local_path(local_path)
    if mapped_path and os.path.exists(mapped_path):
        # The static mount may not cover sub-directories, so prefer the static
        # URL only when the mapper supplies one and fall back to the proxy.
        return mapped_url or proxy_url

    # 3) Last resort: some historical rows are stored as /legacy/temp/<id>.<ext>.
    aid = int(asset.get("id") or 0)
    if aid <= 0:
        return None

    # Try the original extension first, then the common image extensions.
    ext = Path(local_path).suffix.lower() if isinstance(local_path, str) else ""
    suffixes = ([ext] if ext else []) + [".png", ".jpg", ".jpeg", ".webp"]
    for suffix in suffixes:
        if os.path.exists(f"/legacy/temp/{aid}{suffix}"):
            return f"/static/legacy-temp/{aid}{suffix}"
    return None


@router.get("/list/{project_id}", response_model=List[AssetInfo])
async def list_assets(
    project_id: str,
    asset_type: Optional[str] = None
):
    """List every asset of a project, each with a resolved preview URL.

    Args:
        project_id: Project whose assets are listed.
        asset_type: Optional filter forwarded to ``db.get_assets``.

    Returns:
        One ``AssetInfo`` per asset row; ``url`` is ``None`` for assets whose
        file cannot be located by any of the resolution steps.
    """
    # Treat a missing result as "no assets" instead of failing on iteration.
    assets = db.get_assets(project_id, asset_type) or []

    return [
        AssetInfo(
            id=asset.get("id", 0),
            project_id=asset["project_id"],
            scene_id=asset["scene_id"],
            asset_type=asset["asset_type"],
            status=asset["status"],
            local_path=asset.get("local_path"),
            url=_resolve_asset_url(asset),
            # A row may carry an explicit None here; the model requires a dict,
            # and one bad row must not break the whole listing.
            metadata=asset.get("metadata") or {},
        )
        for asset in assets
    ]
|
||
|
||
|
||
@router.get("/file/{asset_id}")
async def get_asset_file(asset_id: int):
    """Return the file behind an asset id (used for front-end preview / drag-and-drop).

    Resolution order: the stored local path, a redirect to the remote URL,
    the legacy path mapping, and finally files named after the asset id in
    ``/legacy/temp``.

    Raises:
        HTTPException: 404 when the asset row does not exist or no file
            could be located for it.
    """
    record = db.get_asset_by_id(int(asset_id))
    if not record:
        raise HTTPException(status_code=404, detail="素材不存在")

    stored_path = record.get("local_path")
    remote = record.get("remote_url")

    # 1) The path as stored.
    if stored_path and os.path.exists(stored_path):
        return FileResponse(path=stored_path, filename=Path(stored_path).name)

    # 1.5) Remote URL: redirect when the local copy is missing, so the
    # front end does not end up with a 404 / blank preview.
    if remote and isinstance(remote, str) and remote.strip():
        return RedirectResponse(url=remote.strip(), status_code=307)

    # 2) Legacy mapping.
    legacy_path, _ = map_legacy_local_path(stored_path)
    if legacy_path and os.path.exists(legacy_path):
        return FileResponse(path=legacy_path, filename=Path(legacy_path).name)

    # 3) Fallback for historical files named by asset id (/legacy/temp/{id}.png).
    numeric_id = int(record.get("id") or 0)
    if numeric_id > 0:
        suffixes = [".png", ".jpg", ".jpeg", ".webp", ".mp4", ".mov", ".m4a", ".mp3"]
        original_ext = Path(stored_path).suffix.lower() if isinstance(stored_path, str) else ""
        if original_ext:
            # The original extension is probed before the generic ones.
            suffixes.insert(0, original_ext)
        for suffix in suffixes:
            candidate = f"/legacy/temp/{numeric_id}{suffix}"
            if os.path.exists(candidate):
                return FileResponse(path=candidate, filename=Path(candidate).name)

    raise HTTPException(status_code=404, detail="素材文件不存在")
|
||
|
||
|
||
@router.post("/upload", response_model=UploadResponse)
async def upload_asset(
    file: UploadFile = File(...),
    asset_type: str = "custom"
):
    """Upload a custom asset (image, video or audio) into the temp directory.

    Args:
        file: The uploaded file.
        asset_type: Selects the extension whitelist ("image", "video",
            "audio" or "custom"); unknown values use the "custom" whitelist.

    Returns:
        ``UploadResponse`` with the generated file name, its path and its
        ``/static/temp/`` URL.

    Raises:
        HTTPException: 400 when the extension is not allowed (including a
            missing file name), 500 when the file cannot be stored.
    """
    image_exts = [".jpg", ".jpeg", ".png", ".gif", ".webp"]
    video_exts = [".mp4", ".mov", ".avi", ".mkv", ".webm"]
    audio_exts = [".mp3", ".wav", ".m4a", ".aac"]
    allowed_extensions = {
        "image": image_exts,
        "video": video_exts,
        "audio": audio_exts,
        "custom": image_exts + video_exts + audio_exts,
    }

    # The file name can be absent; treat that as "no extension" so the request
    # is rejected with a 400 instead of crashing in Path().
    ext = Path(file.filename or "").suffix.lower()
    if ext not in allowed_extensions.get(asset_type, allowed_extensions["custom"]):
        raise HTTPException(
            status_code=400,
            detail=f"不支持的文件类型: {ext}"
        )

    # asset_type is caller-controlled and becomes part of the file name, so
    # strip anything that could act as a path component ("/", "\\", ".").
    prefix = "".join(
        ch if (ch.isalnum() or ch in "_-") else "_" for ch in asset_type
    ) or "custom"

    try:
        config.TEMP_DIR.mkdir(parents=True, exist_ok=True)

        # Timestamped name; add a counter if that name is already taken so two
        # uploads within the same second do not overwrite each other.
        timestamp = int(time.time())
        new_filename = f"{prefix}_{timestamp}{ext}"
        file_path = config.TEMP_DIR / new_filename
        attempt = 0
        while file_path.exists():
            attempt += 1
            new_filename = f"{prefix}_{timestamp}_{attempt}{ext}"
            file_path = config.TEMP_DIR / new_filename

        # NOTE(review): the whole upload is buffered in memory; no size limit
        # is enforced here.
        content = await file.read()
        with open(file_path, "wb") as f:
            f.write(content)
    except Exception as e:
        logger.error(f"文件上传失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return UploadResponse(
        success=True,
        filename=new_filename,
        path=str(file_path),
        url=f"/static/temp/{new_filename}"
    )
|
||
|
||
|
||
@router.get("/download/{filename}")
async def download_asset(filename: str):
    """Download a file by name from the output or temp directory.

    The output directory is searched first, then the temp directory.

    Args:
        filename: Bare file name (no directory components).

    Raises:
        HTTPException: 404 when the name is not a plain file name or no
            regular file with that name exists in either directory.
    """
    # Only a bare name is acceptable: reject separators and the special
    # entries so the lookup can never leave the two download directories.
    if filename in ("", ".", "..") or "/" in filename or "\\" in filename:
        raise HTTPException(status_code=404, detail="文件不存在")

    for directory in (config.OUTPUT_DIR, config.TEMP_DIR):
        file_path = directory / filename
        # is_file() rather than exists(): a directory cannot be served.
        if file_path.is_file():
            return FileResponse(
                path=str(file_path),
                filename=filename,
                media_type="application/octet-stream"
            )

    raise HTTPException(status_code=404, detail="文件不存在")
|
||
|
||
|
||
@router.delete("/{asset_id}")
async def delete_asset(asset_id: int):
    """Acknowledge a request to delete an asset.

    NOTE: this is still a stub. It only logs the request and reports
    success; no database row or file is removed.
    """
    # TODO: implement the real deletion logic
    logger.info(f"删除素材: {asset_id}")
    acknowledgement = {"message": "素材已删除", "id": asset_id}
    return acknowledgement
|
||
|
||
|
||
@router.get("/bgm")
async def list_bgm():
    """List the background-music files available under the assets ``bgm`` directory.

    Returns an empty list when the directory does not exist; otherwise one
    dict (id, name, path, url) per entry with a supported suffix.
    """
    music_dir = config.ASSETS_DIR / "bgm"
    if not music_dir.exists():
        return []

    supported_suffixes = ['.mp3', '.mp4', '.m4a', '.wav']
    return [
        {
            "id": entry.name,
            "name": entry.stem,
            "path": str(entry),
            "url": f"/static/assets/bgm/{entry.name}"
        }
        for entry in music_dir.iterdir()
        if entry.suffix.lower() in supported_suffixes
    ]
|
||
|
||
|
||
@router.get("/fonts")
async def list_fonts():
    """List the fonts that can be used: project fonts plus one system-font entry.

    Project fonts are the ttf/otf/ttc files found in ``config.FONTS_DIR``.
    A fixed PingFang system-font entry is always appended last.
    """
    font_suffixes = ['.ttf', '.otf', '.ttc']

    # Fonts shipped with / uploaded to the project.
    project_fonts = []
    if config.FONTS_DIR.exists():
        project_fonts = [
            {
                "id": font_file.name,
                "name": font_file.stem,
                "path": str(font_file),
                "url": f"/static/fonts/{font_file.name}",
                # The front end can use the id directly as the CSS font-family
                # (together with dynamic FontFace loading).
                "css_family": font_file.name
            }
            for font_file in config.FONTS_DIR.iterdir()
            if font_file.suffix.lower() in font_suffixes
        ]

    # System font option; it has no URL because it is not served by this app.
    system_font = {
        "id": "system-pingfang",
        "name": "苹方 (系统)",
        "path": "/System/Library/Fonts/PingFang.ttc",
        "url": None,
        "css_family": "PingFang SC"
    }

    return project_fonts + [system_font]
|
||
|
||
|
||
@router.get("/stickers")
async def list_stickers():
    """List stickers (PNG/SVG/WebP) for the sticker library panel.

    Built-in stickers come from ``stickers_builtin/index.json`` (categories,
    tags and license info); custom stickers are found by scanning the
    ``stickers_custom`` directory, which is created if missing.  A failure
    while reading the index is logged and does not abort the listing.
    """
    builtin_dir = config.ASSETS_DIR / "stickers_builtin"
    custom_dir = config.ASSETS_DIR / "stickers_custom"
    custom_dir.mkdir(parents=True, exist_ok=True)

    stickers: List[dict] = []

    # Built-in pack: driven by index.json so that categories, tags and
    # licensing notes can be expressed.
    manifest = builtin_dir / "index.json"
    if manifest.exists():
        try:
            import json

            payload = json.loads(manifest.read_text(encoding="utf-8"))
            pack_info = payload.get("pack") or {}
            for category in (payload.get("categories") or []):
                category_label = category.get("name") or category.get("id") or "未分类"
                for entry in (category.get("items") or []):
                    file_name = str(entry.get("file") or "")
                    if file_name:
                        # Entries are appended one by one so that whatever was
                        # read before a malformed entry is still returned.
                        stickers.append({
                            "id": str(entry.get("id") or file_name),
                            "name": str(entry.get("name") or entry.get("id") or file_name),
                            "url": f"/static/assets/stickers_builtin/{file_name}",
                            "kind": "builtin",
                            "tags": entry.get("tags") or [],
                            "category": category_label,
                            "license": pack_info.get("license"),
                            "attribution": pack_info.get("attribution"),
                        })
        except Exception as e:
            logger.warning(f"Failed to read stickers_builtin/index.json: {e}")

    # Custom stickers: plain directory scan.
    if custom_dir.exists():
        stickers.extend(
            {
                "id": f"custom-{sticker_file.name}",
                "name": sticker_file.stem,
                "url": f"/static/assets/stickers_custom/{sticker_file.name}",
                "kind": "custom",
                "tags": [],
                "category": "自定义",
                "license": None,
                "attribution": None,
            }
            for sticker_file in custom_dir.iterdir()
            if sticker_file.suffix.lower() in [".png", ".svg", ".webp"]
        )

    return stickers
|
||
|
||
|
||
@router.post("/stickers/upload")
async def upload_sticker(file: UploadFile = File(...)):
    """Upload a sticker (png/svg/webp) into ``assets/stickers_custom``.

    The original file name is kept; when it is already taken a timestamp
    (and, if needed, a counter) is appended so nothing is overwritten.

    Raises:
        HTTPException: 400 for an unsupported extension (including a missing
            file name), 500 when the file cannot be stored.
    """
    # The file name can be absent; treat that as "no extension" so the request
    # is rejected with a 400 instead of crashing in Path().
    original_name = file.filename or ""
    ext = Path(original_name).suffix.lower()
    if ext not in [".png", ".svg", ".webp"]:
        raise HTTPException(status_code=400, detail=f"不支持的贴纸类型: {ext}")

    target_dir = config.ASSETS_DIR / "stickers_custom"
    try:
        target_dir.mkdir(parents=True, exist_ok=True)

        # Keep only the final path component of the client-supplied name.
        safe_name = Path(original_name).name
        target = target_dir / safe_name
        if target.exists():
            ts = int(time.time())
            stem = Path(safe_name).stem
            target = target_dir / f"{stem}_{ts}{ext}"
            # The timestamped name can collide too (same name, same second).
            attempt = 0
            while target.exists():
                attempt += 1
                target = target_dir / f"{stem}_{ts}_{attempt}{ext}"

        content = await file.read()
        with open(target, "wb") as f:
            f.write(content)
    except Exception as e:
        logger.error(f"贴纸上传失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "success": True,
        "id": f"custom-{target.name}",
        "name": target.stem,
        "url": f"/static/assets/stickers_custom/{target.name}",
    }
|
||
|
||
|
||
@router.post("/fonts/upload")
async def upload_font(file: UploadFile = File(...)):
    """Upload a font (ttf/otf/ttc) into the fonts directory for subtitles / styled text.

    The original file name is kept; when it is already taken a timestamp
    (and, if needed, a counter) is appended so nothing is overwritten.

    Raises:
        HTTPException: 400 for an unsupported extension (including a missing
            file name) or a payload smaller than 1024 bytes, 500 when the
            file cannot be stored.
    """
    # The file name can be absent; treat that as "no extension" so the request
    # is rejected with a 400 instead of crashing in Path().
    original_name = file.filename or ""
    ext = Path(original_name).suffix.lower()
    if ext not in [".ttf", ".otf", ".ttc"]:
        raise HTTPException(status_code=400, detail=f"不支持的字体类型: {ext}")

    try:
        config.FONTS_DIR.mkdir(parents=True, exist_ok=True)

        # Keep only the final path component of the client-supplied name.
        safe_name = Path(original_name).name
        target = config.FONTS_DIR / safe_name

        # If the name is taken, add a timestamp to avoid overwriting; the
        # timestamped name can collide too, hence the counter.
        if target.exists():
            ts = int(time.time())
            stem = Path(safe_name).stem
            target = config.FONTS_DIR / f"{stem}_{ts}{ext}"
            attempt = 0
            while target.exists():
                attempt += 1
                target = config.FONTS_DIR / f"{stem}_{ts}_{attempt}{ext}"

        content = await file.read()
        # Payloads under 1 KiB are rejected as empty or corrupt.
        if len(content) < 1024:
            raise HTTPException(status_code=400, detail="字体文件为空或损坏")
        with open(target, "wb") as f:
            f.write(content)
    except HTTPException:
        # Validation errors raised above must reach the client unchanged.
        raise
    except Exception as e:
        logger.error(f"字体上传失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "success": True,
        "id": target.name,
        "name": target.stem,
        "path": str(target),
        "url": f"/static/fonts/{target.name}",
        "css_family": target.name,
    }
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|