import os
from typing import Optional, Tuple

import httpx
from fastapi import UploadFile

from app.core.biz_exception import BizLogicError
from app.core.logger import logger


class SeaweedFSService:
    """Async helper for storing, fetching and deleting files in SeaweedFS.

    Every operation first talks to the SeaweedFS master (``MASTER_URL``) and
    then to the volume server address the master returns.
    """

    # Prefer the environment variable; default to the service name on the
    # Docker internal network. As long as the backend and SeaweedFS share the
    # same Docker network (visafly-net), this address is reachable.
    MASTER_URL = os.getenv("SEAWEEDFS_MASTER_URL", "http://seaweedfs-master:9333")

    # Public download entry point (proxied through Nginx + backend).
    DOWNLOAD_GATEWAY = "https://visafly.top/api/resource/download_file"

    @classmethod
    async def _lookup_file_url(
        cls, client: httpx.AsyncClient, fid: str
    ) -> Optional[str]:
        """Resolve the volume-server URL of ``fid`` via the master.

        Returns ``http://<publicUrl>/<fid>`` for the first location reported
        by ``/dir/lookup``, or ``None`` when the lookup does not return HTTP
        200, reports no locations, or the location has no ``publicUrl``.
        """
        # A fid usually looks like "3,016a..."; the part before the comma is
        # the volume id.
        volume_id = fid.split(",")[0]

        lookup_resp = await client.get(
            f"{cls.MASTER_URL}/dir/lookup",
            params={"volumeId": volume_id},
        )
        if lookup_resp.status_code != 200:
            logger.warning(f"Lookup failed for fid {fid}: {lookup_resp.text}")
            return None

        locations = lookup_resp.json().get("locations")
        if not locations:
            return None

        # Use the Docker-internal address reported by the master as-is.
        public_url = locations[0].get("publicUrl")
        if not public_url:
            return None
        return f"http://{public_url}/{fid}"

    @classmethod
    async def upload(cls, file: UploadFile) -> dict:
        """Upload a file to SeaweedFS.

        Args:
            file: The incoming upload; its whole content is read into memory.

        Returns:
            A dict with ``fid`` (SeaweedFS file id), ``url`` (download link
            through ``DOWNLOAD_GATEWAY``) and ``size`` (bytes uploaded).

        Raises:
            BizLogicError: If the master is unreachable, no usable fid or
                volume server is assigned, the volume server does not answer
                with HTTP 201, or any unexpected error occurs.
        """
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                # 1) Ask the master to assign a fid.
                try:
                    assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
                    assign_resp.raise_for_status()
                except Exception as e:
                    logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
                    raise BizLogicError("Storage service unavailable") from e

                assign_data = assign_resp.json()
                fid = assign_data.get("fid")

                # 2) Work out the upload address.
                # With a shared Docker network the publicUrl returned by the
                # master is e.g. "seaweedfs-volume:8080", which the backend
                # can resolve and reach directly.
                public_url = assign_data.get("publicUrl")

                # Without both values the URLs below would be built around
                # "None", so stop here instead of uploading to a bogus path.
                if not fid or not public_url:
                    logger.error(f"SeaweedFS assign returned no usable target: {assign_data}")
                    raise BizLogicError("No volume server available")

                upload_url = f"http://{public_url}/{fid}"
                download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"

                # 3) Send the file content as a multipart upload.
                file_content = await file.read()
                files = {
                    "file": (
                        file.filename,
                        file_content,
                        file.content_type or "application/octet-stream",
                    )
                }

                upload_resp = await client.post(upload_url, files=files)

                if upload_resp.status_code == 201:
                    return {
                        "fid": fid,
                        "url": download_url,
                        "size": len(file_content),
                    }

                logger.error(f"Upload failed. Status: {upload_resp.status_code}, Body: {upload_resp.text}")
                raise BizLogicError(f"Storage upload error: {upload_resp.text}")

        except BizLogicError:
            # Already a domain error; let it propagate with its traceback.
            raise
        except Exception as e:
            logger.exception("SeaweedFS upload unexpected error")
            raise BizLogicError(f"Upload exception: {str(e)}") from e

    @classmethod
    async def get(cls, fid: str) -> Optional[Tuple[bytes, str]]:
        """Read a file's content by fid.

        Returns:
            ``(content, content_type)`` when the volume server answers with
            HTTP 200; ``None`` if ``fid`` is empty, the file cannot be
            located, the download fails, or any exception occurs (the
            exception is logged, not raised).
        """
        try:
            if not fid:
                return None

            async with httpx.AsyncClient(timeout=10.0) as client:
                # 1) Locate the file, 2) fetch it from the volume server.
                file_url = await cls._lookup_file_url(client, fid)
                if file_url is None:
                    return None

                file_resp = await client.get(file_url)
                if file_resp.status_code != 200:
                    return None

                return (
                    file_resp.content,
                    file_resp.headers.get("Content-Type", "application/octet-stream"),
                )

        except Exception:
            logger.exception(f"SeaweedFS get failed for fid {fid}")
            return None

    @classmethod
    async def delete(cls, fid: str) -> bool:
        """Delete a file by fid.

        Returns:
            ``True`` when the volume server answers the DELETE with HTTP 200
            or 202; ``False`` if ``fid`` is empty, the file cannot be
            located, the status differs, or any exception occurs (the
            exception is logged, not raised).
        """
        try:
            if not fid:
                return False

            async with httpx.AsyncClient(timeout=5.0) as client:
                # 1) Locate the file.
                delete_url = await cls._lookup_file_url(client, fid)
                if delete_url is None:
                    return False

                # 2) Send the delete request.
                del_resp = await client.delete(delete_url)

                # SeaweedFS usually answers a successful delete with 200 or 202.
                return del_resp.status_code in (200, 202)

        except Exception as e:
            logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
            return False