from pathlib import Path import asyncio, json from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware from tinydb import TinyDB, Query from datetime import datetime from typing import Any, Dict, List app = FastAPI(title="大屏媒体轮播系统") # ===================== 模板加载 ===================== TEMPLATE_DIR = Path(__file__).parent / "templates" def _load_template(name: str) -> str: return (TEMPLATE_DIR / name).read_text(encoding="utf-8") SCREEN_HTML = _load_template("screen.html") ADMIN_HTML = _load_template("admin.html") # ===================== SSE 事件管理器 ===================== class EventManager: def __init__(self): self._queues: list[asyncio.Queue] = [] def subscribe(self, q: asyncio.Queue): self._queues.append(q) def unsubscribe(self, q: asyncio.Queue): try: self._queues.remove(q) except ValueError: pass def broadcast(self, event: dict): data = json.dumps(event) dead = [] for q in self._queues: try: q.put_nowait(data) except asyncio.QueueFull: dead.append(q) for q in dead: self.unsubscribe(q) events = EventManager() # ===================== 播放状态(内存) ===================== playback_state: dict = { "status": "idle", # playing | paused | idle "index": 0, "name": "", "type": "", } # CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ===================== 配置 ===================== MEDIA_DIR = Path.home() / "Downloads" / "Media" MEDIA_DIR.mkdir(exist_ok=True, parents=True) DB = TinyDB("db.json") FILES_TABLE = DB.table("files") PLAYLIST_TABLE = DB.table("playlist") SETTINGS_TABLE = DB.table("settings") URL_MEDIA_TABLE = DB.table("url_media") User = Query() FileQuery = Query() UrlMediaQuery = Query() ALLOWED_EXTENSIONS = {".mp4", ".mkv", ".avi", ".jpg", ".jpeg", ".png"} # ===================== 初始化 ===================== DEFAULT_SETTINGS = { "username": "admin", "password": "123456", "volume": 80, "auto_play": True, "play_mode": "sequential", "image_duration": 5, } if not SETTINGS_TABLE.search(User.username == "admin"): SETTINGS_TABLE.insert(DEFAULT_SETTINGS) else: # 补齐缺失的默认字段 current = SETTINGS_TABLE.get(User.username == "admin") updated = False for key, val in DEFAULT_SETTINGS.items(): if key not in current: current[key] = val updated = True if updated: SETTINGS_TABLE.update(current, User.username == "admin") # ===================== 前端页面 ===================== @app.get("/", response_class=HTMLResponse) async def screen_page(): return SCREEN_HTML @app.get("/admin", response_class=HTMLResponse) async def admin_page(): return ADMIN_HTML # ===================== 文件访问 ===================== from fastapi.responses import FileResponse @app.get("/file/{path:path}") async def get_file(path: str): target = MEDIA_DIR / path if not target.exists() or not target.is_file(): raise HTTPException(404) return FileResponse(target) # ===================== API ===================== @app.post("/api/login") async def login(username: str = Form(...), password: str = Form(...)): admin = SETTINGS_TABLE.get(User.username == "admin") if admin and admin["password"] == password: return {"success": True} return {"success": False} @app.get("/api/settings") async def get_settings(): s = SETTINGS_TABLE.get(User.username == "admin") return {k: s[k] for k in DEFAULT_SETTINGS if k in s} @app.post("/api/settings") async def update_settings(data: dict): """统一设置更新,支持 volume/play_mode/image_duration""" allowed = {"volume", "play_mode", "image_duration"} update = {k: v for k, v in data.items() if k in allowed} if update: SETTINGS_TABLE.update(update, User.username == "admin") events.broadcast({"action": "settings_changed", **update}) return {"ok": True} @app.get("/media") async def get_media_files(): files = [] # 本地文件 for p in MEDIA_DIR.rglob("*"): if not p.is_file(): continue ext = p.suffix.lower() if ext not in ALLOWED_EXTENSIONS: continue rel = str(p.relative_to(MEDIA_DIR)) t = "video" if ext in [".mp4",".mkv",".avi"] else "image" files.append({ "name": p.name, "relative_path": rel, "type": t, "url": f"/file/{rel}", "source": "local", }) # URL 媒体(持久化在 url_media 表中) for item in URL_MEDIA_TABLE.all(): files.append({ "name": item["name"], "relative_path": item["url"], "type": item["type"], "url": item["url"], "source": "url", }) return {"files": files} @app.post("/upload") async def upload(file: UploadFile = File(...)): ext = Path(file.filename).suffix.lower() if ext not in ALLOWED_EXTENSIONS: raise HTTPException(400) ts = datetime.now().strftime("%Y%m%d%H%M%S") name = f"{Path(file.filename).stem}_{ts}{ext}" path = MEDIA_DIR / name with open(path, "wb") as f: f.write(await file.read()) return {"ok": True} @app.post("/api/delete") async def delete_file(data: dict): path = data["path"] if path.startswith(("http://", "https://")): # 删除 URL 媒体及其播放列表引用 URL_MEDIA_TABLE.remove(UrlMediaQuery.url == path) else: p = MEDIA_DIR / path if p.exists() and p.is_file(): p.unlink() PLAYLIST_TABLE.remove(FileQuery.path == path) events.broadcast({"action": "playlist_changed"}) return {"ok": True} @app.get("/api/playlist") async def get_playlist(): items = PLAYLIST_TABLE.all() admin = SETTINGS_TABLE.get(User.username == "admin") files = [] for item in items: if item.get("source") == "url": ext = Path(item["path"]).suffix.lower() files.append({ "name": item.get("name", item["path"]), "relative_path": item["path"], "type": "video" if ext in [".mp4",".mkv",".avi"] else "image", "source": "url", }) else: p = MEDIA_DIR / item["path"] if p.exists(): ext = p.suffix.lower() files.append({ "name": p.name, "relative_path": item["path"], "type": "video" if ext in [".mp4",".mkv",".avi"] else "image", "source": "local", }) return { "files": files, "volume": admin["volume"], "play_mode": admin.get("play_mode", "sequential"), "image_duration": admin.get("image_duration", 5), } @app.post("/api/playlist/add") async def add_playlist(data: dict): path = data["path"] if not PLAYLIST_TABLE.contains(FileQuery.path == path): item: dict = {"path": path} if path.startswith(("http://", "https://")): item["source"] = "url" # 从 URL 媒体库中获取名称 um = URL_MEDIA_TABLE.get(UrlMediaQuery.url == path) if um: item["name"] = um["name"] PLAYLIST_TABLE.insert(item) events.broadcast({"action": "playlist_changed"}) return {"ok": True} @app.post("/api/playlist/remove") async def remove_playlist(data: dict): PLAYLIST_TABLE.remove(FileQuery.path == data["path"]) events.broadcast({"action": "playlist_changed"}) return {"ok": True} @app.post("/api/media/add-url") async def add_url_media(data: dict): """添加 URL 到媒体库(持久化),不自动加入播放列表""" url = data.get("url", "").strip() if not url: raise HTTPException(400, "url 不能为空") if URL_MEDIA_TABLE.contains(UrlMediaQuery.url == url): return {"ok": True, "duplicate": True} ext = Path(url.split("?")[0]).suffix.lower() name = data.get("name", "").strip() or url.rsplit("/", 1)[-1].split("?")[0] URL_MEDIA_TABLE.insert({ "url": url, "name": name, "type": "video" if ext in [".mp4",".mkv",".avi"] else "image", "added_at": datetime.now().isoformat(), }) return {"ok": True} @app.get("/api/files") async def list_files_with_url(): """输出媒体文件夹中所有文件的 URL""" files = [] for p in sorted(MEDIA_DIR.rglob("*")): if not p.is_file(): continue ext = p.suffix.lower() if ext not in ALLOWED_EXTENSIONS: continue rel = str(p.relative_to(MEDIA_DIR)) t = "video" if ext in [".mp4",".mkv",".avi"] else "image" files.append({ "name": p.name, "type": t, "url": f"/file/{rel}", }) return {"files": files} @app.get("/api/events") async def sse_events(request: Request): """SSE 端点,向后端推送实时播放控制指令""" q: asyncio.Queue = asyncio.Queue(maxsize=32) events.subscribe(q) async def generate(): try: while True: if await request.is_disconnected(): break try: data = await asyncio.wait_for(q.get(), timeout=15) yield f"data: {data}\n\n" except asyncio.TimeoutError: yield ": keepalive\n\n" finally: events.unsubscribe(q) return StreamingResponse(generate(), media_type="text/event-stream") @app.post("/api/control") async def playback_control(data: dict): """后端控制播放:{action: 'play'|'pause'|'next'|'prev'}""" action = data.get("action", "") if action not in ("play", "pause", "next", "prev"): raise HTTPException(400, "action 必须是 play/pause/next/prev") SETTINGS_TABLE.update({"control_action": action}, User.username == "admin") events.broadcast({"action": action}) # 更新本地状态(乐观) if action == "play": playback_state["status"] = "playing" elif action == "pause": playback_state["status"] = "paused" events.broadcast({"action": "state_update", "state": playback_state}) return {"ok": True, "action": action} @app.get("/api/state") async def get_state(): """获取当前播放状态""" return {"state": playback_state} @app.post("/api/state") async def update_state(data: dict): """前端上报播放状态变更""" state = data.get("state", {}) if "status" in state: playback_state["status"] = state["status"] if "index" in state: playback_state["index"] = state["index"] if "name" in state: playback_state["name"] = state["name"] if "type" in state: playback_state["type"] = state["type"] events.broadcast({"action": "state_update", "state": playback_state}) return {"ok": True} if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=True )