# app/services/seaweedfs_service.py import httpx from fastapi import UploadFile from app.core.biz_exception import BizLogicError from app.core.logger import logger class SeaweedFSService: MASTER_URL = "http://127.0.0.1:9333" # SeaweedFS master 地址 DOWNLOAD_GATEWAY = "http://45.137.220.138:8888/api/resource/download_file" @classmethod async def upload(cls, file: UploadFile): """上传文件到 SeaweedFS(异步)""" try: async with httpx.AsyncClient(timeout=10) as client: # 1️⃣ 获取 volume assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign") assign_resp.raise_for_status() assign_data = assign_resp.json() fid = assign_data["fid"] public_url = assign_data["publicUrl"] upload_url = f"http://{public_url}/{fid}" download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}" # 2️⃣ 上传文件 files = { "file": ( file.filename, await file.read(), file.content_type, ) } upload_resp = await client.post(upload_url, files=files) if upload_resp.status_code == 201: return { "fid": fid, "url": download_url, } raise BizLogicError(f"file upload error: {upload_resp.text}") except Exception as e: logger.exception("SeaweedFS upload failed") raise BizLogicError(f"file upload exception: {e}") @classmethod async def get(cls, fid: str): """根据 fid 读取文件(异步)""" try: volume_id = fid.split(",")[0] async with httpx.AsyncClient(timeout=10) as client: lookup_resp = await client.get( f"{cls.MASTER_URL}/dir/lookup", params={"volumeId": volume_id}, ) lookup_resp.raise_for_status() data = lookup_resp.json() if not data.get("locations"): return None public_url = data["locations"][0]["publicUrl"] file_url = f"http://{public_url}/{fid}" file_resp = await client.get(file_url) if file_resp.status_code == 200: return ( file_resp.content, file_resp.headers.get( "Content-Type", "application/octet-stream" ), ) return None except Exception as e: logger.exception(f"SeaweedFS get failed, reason={e}") return None @classmethod async def delete(cls, fid: str) -> bool: """删除文件(异步)""" try: volume_id = fid.split(",")[0] async with httpx.AsyncClient(timeout=10) as client: lookup_resp = await client.get( f"{cls.MASTER_URL}/dir/lookup", params={"volumeId": volume_id}, ) lookup_resp.raise_for_status() data = lookup_resp.json() if not data.get("locations"): return False public_url = data["locations"][0]["publicUrl"] delete_url = f"http://{public_url}/{fid}" del_resp = await client.delete(delete_url) return del_resp.status_code == 202 except Exception as e: logger.exception(f"SeaweedFS delete failed, reason={e}") return False