| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import os
- import aiohttp
- from fastapi import UploadFile
- from app.core.biz_exception import BizLogicError
- from app.core.logger import logger
- class SeaweedFSService:
- # 1. 优先读取环境变量,默认使用 Docker 内部网络的服务名
- # 只要后端和 SeaweedFS 在同一个 Docker 网络 (visafly-net),这个地址就是通的
- MASTER_URL = os.getenv("SEAWEEDFS_MASTER_URL", "http://seaweedfs-master:9333")
-
- # 文件下载的公网入口(经过你的 Nginx + Backend 代理)
- DOWNLOAD_GATEWAY = "https://visafly.top/api/resource/download_file"
- @classmethod
- async def upload(cls, file: UploadFile):
- """
- 上传文件到 SeaweedFS
- """
- try:
- timeout = aiohttp.ClientTimeout(total=10.0)
- async with aiohttp.ClientSession(timeout=timeout) as client:
- # 1️⃣ 向 Master 申请 fid
- # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}")
- try:
- async with client.get(f"{cls.MASTER_URL}/dir/assign") as assign_resp:
- if assign_resp.status >= 300:
- raise BizLogicError("Storage service unavailable")
- assign_data = await assign_resp.json()
- except Exception as e:
- logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
- raise BizLogicError("Storage service unavailable")
- fid = assign_data.get("fid")
-
- # 2️⃣ 获取上传地址
- # 在 Docker 共享网络模式下,Master 返回的 publicUrl 就是 "seaweedfs-volume:8080"
- # 后端可以直接解析并访问这个地址,无需任何修改
- public_url = assign_data.get("publicUrl")
-
- if not public_url:
- raise BizLogicError("No volume server available")
- upload_url = f"http://{public_url}/{fid}"
- download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"
- # 3️⃣ 上传文件数据
- # 读取文件内容
- file_content = await file.read()
- files = {
- "file": (
- file.filename,
- file_content,
- file.content_type or "application/octet-stream",
- )
- }
- # logger.info(f"Uploading to Volume Node: {upload_url}")
- form = aiohttp.FormData()
- form.add_field(
- "file",
- file_content,
- filename=file.filename,
- content_type=file.content_type or "application/octet-stream",
- )
- async with client.post(upload_url, data=form) as upload_resp:
- status = upload_resp.status
- resp_text = await upload_resp.text()
- if status == 201:
- return {
- "fid": fid,
- "url": download_url,
- "size": len(file_content)
- }
- logger.error(f"Upload failed. Status: {status}, Body: {resp_text}")
- raise BizLogicError(f"Storage upload error: {resp_text}")
- except BizLogicError as e:
- raise e
- except Exception as e:
- logger.exception("SeaweedFS upload unexpected error")
- raise BizLogicError(f"Upload exception: {str(e)}")
- @classmethod
- async def get(cls, fid: str):
- """
- 根据 fid 读取文件流
- """
- try:
- if not fid:
- return None
-
- # fid 格式通常是 "3,016a...",逗号前是 volumeId
- volume_id = fid.split(",")[0]
- timeout = aiohttp.ClientTimeout(total=10.0)
- async with aiohttp.ClientSession(timeout=timeout) as client:
- # 1️⃣ 查询文件位置
- async with client.get(
- f"{cls.MASTER_URL}/dir/lookup",
- params={"volumeId": volume_id},
- ) as lookup_resp:
- if lookup_resp.status != 200:
- text = await lookup_resp.text()
- logger.warning(f"Lookup failed for fid {fid}: {text}")
- return None
- data = await lookup_resp.json()
- locations = data.get("locations")
- if not locations:
- return None
- # 2️⃣ 直接请求 Volume Server
- # 同样直接使用 Docker 内部地址
- public_url = locations[0]["publicUrl"]
- file_url = f"http://{public_url}/{fid}"
- async with client.get(file_url) as file_resp:
- if file_resp.status == 200:
- content = await file_resp.read()
- return (
- content,
- file_resp.headers.get("Content-Type", "application/octet-stream"),
- )
- return None
- except Exception as e:
- logger.exception(f"SeaweedFS get failed for fid {fid}")
- return None
- @classmethod
- async def delete(cls, fid: str) -> bool:
- """
- 删除文件
- """
- try:
- if not fid:
- return False
- volume_id = fid.split(",")[0]
- timeout = aiohttp.ClientTimeout(total=5.0)
- async with aiohttp.ClientSession(timeout=timeout) as client:
- # 1️⃣ 查找位置
- async with client.get(
- f"{cls.MASTER_URL}/dir/lookup",
- params={"volumeId": volume_id},
- ) as lookup_resp:
- if lookup_resp.status != 200:
- return False
- data = await lookup_resp.json()
- locations = data.get("locations")
- if not locations:
- return False
- # 2️⃣ 发送删除请求
- public_url = locations[0]["publicUrl"]
- delete_url = f"http://{public_url}/{fid}"
- async with client.delete(delete_url) as del_resp:
- # SeaweedFS 删除成功通常返回 200 或 202
- return del_resp.status in [200, 202]
- except Exception as e:
- logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
- return False
|