import os import aiohttp from fastapi import UploadFile from app.core.biz_exception import BizLogicError from app.core.logger import logger class SeaweedFSService: # 1. 优先读取环境变量,默认使用 Docker 内部网络的服务名 # 只要后端和 SeaweedFS 在同一个 Docker 网络 (visafly-net),这个地址就是通的 MASTER_URL = os.getenv("SEAWEEDFS_MASTER_URL", "http://seaweedfs-master:9333") # 文件下载的公网入口(经过你的 Nginx + Backend 代理) DOWNLOAD_GATEWAY = "https://visafly.top/api/resource/download_file" @classmethod async def upload(cls, file: UploadFile): """ 上传文件到 SeaweedFS """ try: timeout = aiohttp.ClientTimeout(total=10.0) async with aiohttp.ClientSession(timeout=timeout) as client: # 1️⃣ 向 Master 申请 fid # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}") try: async with client.get(f"{cls.MASTER_URL}/dir/assign") as assign_resp: if assign_resp.status >= 300: raise BizLogicError("Storage service unavailable") assign_data = await assign_resp.json() except Exception as e: logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}") raise BizLogicError("Storage service unavailable") fid = assign_data.get("fid") # 2️⃣ 获取上传地址 # 在 Docker 共享网络模式下,Master 返回的 publicUrl 就是 "seaweedfs-volume:8080" # 后端可以直接解析并访问这个地址,无需任何修改 public_url = assign_data.get("publicUrl") if not public_url: raise BizLogicError("No volume server available") upload_url = f"http://{public_url}/{fid}" download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}" # 3️⃣ 上传文件数据 # 读取文件内容 file_content = await file.read() files = { "file": ( file.filename, file_content, file.content_type or "application/octet-stream", ) } # logger.info(f"Uploading to Volume Node: {upload_url}") form = aiohttp.FormData() form.add_field( "file", file_content, filename=file.filename, content_type=file.content_type or "application/octet-stream", ) async with client.post(upload_url, data=form) as upload_resp: status = upload_resp.status resp_text = await upload_resp.text() if status == 201: return { "fid": fid, "url": download_url, "size": len(file_content) } logger.error(f"Upload failed. Status: {status}, Body: {resp_text}") raise BizLogicError(f"Storage upload error: {resp_text}") except BizLogicError as e: raise e except Exception as e: logger.exception("SeaweedFS upload unexpected error") raise BizLogicError(f"Upload exception: {str(e)}") @classmethod async def get(cls, fid: str): """ 根据 fid 读取文件流 """ try: if not fid: return None # fid 格式通常是 "3,016a...",逗号前是 volumeId volume_id = fid.split(",")[0] timeout = aiohttp.ClientTimeout(total=10.0) async with aiohttp.ClientSession(timeout=timeout) as client: # 1️⃣ 查询文件位置 async with client.get( f"{cls.MASTER_URL}/dir/lookup", params={"volumeId": volume_id}, ) as lookup_resp: if lookup_resp.status != 200: text = await lookup_resp.text() logger.warning(f"Lookup failed for fid {fid}: {text}") return None data = await lookup_resp.json() locations = data.get("locations") if not locations: return None # 2️⃣ 直接请求 Volume Server # 同样直接使用 Docker 内部地址 public_url = locations[0]["publicUrl"] file_url = f"http://{public_url}/{fid}" async with client.get(file_url) as file_resp: if file_resp.status == 200: content = await file_resp.read() return ( content, file_resp.headers.get("Content-Type", "application/octet-stream"), ) return None except Exception as e: logger.exception(f"SeaweedFS get failed for fid {fid}") return None @classmethod async def delete(cls, fid: str) -> bool: """ 删除文件 """ try: if not fid: return False volume_id = fid.split(",")[0] timeout = aiohttp.ClientTimeout(total=5.0) async with aiohttp.ClientSession(timeout=timeout) as client: # 1️⃣ 查找位置 async with client.get( f"{cls.MASTER_URL}/dir/lookup", params={"volumeId": volume_id}, ) as lookup_resp: if lookup_resp.status != 200: return False data = await lookup_resp.json() locations = data.get("locations") if not locations: return False # 2️⃣ 发送删除请求 public_url = locations[0]["publicUrl"] delete_url = f"http://{public_url}/{fid}" async with client.delete(delete_url) as del_resp: # SeaweedFS 删除成功通常返回 200 或 202 return del_resp.status in [200, 202] except Exception as e: logger.error(f"SeaweedFS delete failed for fid {fid}: {e}") return False