| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- # app/services/seaweedfs_service.py
- import httpx
- from fastapi import UploadFile
- from app.core.biz_exception import BizLogicError
- from app.core.logger import logger
- class SeaweedFSService:
- MASTER_URL = "http://127.0.0.1:9333" # SeaweedFS master 地址
- DOWNLOAD_GATEWAY = "http://45.137.220.138:8888/api/resource/download_file"
- @classmethod
- async def upload(cls, file: UploadFile):
- """上传文件到 SeaweedFS(异步)"""
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- # 1️⃣ 获取 volume
- assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
- assign_resp.raise_for_status()
- assign_data = assign_resp.json()
- fid = assign_data["fid"]
- public_url = assign_data["publicUrl"]
- upload_url = f"http://{public_url}/{fid}"
- download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"
- # 2️⃣ 上传文件
- files = {
- "file": (
- file.filename,
- await file.read(),
- file.content_type,
- )
- }
- upload_resp = await client.post(upload_url, files=files)
- if upload_resp.status_code == 201:
- return {
- "fid": fid,
- "url": download_url,
- }
- raise BizLogicError(f"file upload error: {upload_resp.text}")
- except Exception as e:
- logger.exception("SeaweedFS upload failed")
- raise BizLogicError(f"file upload exception: {e}")
- @classmethod
- async def get(cls, fid: str):
- """根据 fid 读取文件(异步)"""
- try:
- volume_id = fid.split(",")[0]
- async with httpx.AsyncClient(timeout=10) as client:
- lookup_resp = await client.get(
- f"{cls.MASTER_URL}/dir/lookup",
- params={"volumeId": volume_id},
- )
- lookup_resp.raise_for_status()
- data = lookup_resp.json()
- if not data.get("locations"):
- return None
- public_url = data["locations"][0]["publicUrl"]
- file_url = f"http://{public_url}/{fid}"
- file_resp = await client.get(file_url)
- if file_resp.status_code == 200:
- return (
- file_resp.content,
- file_resp.headers.get(
- "Content-Type", "application/octet-stream"
- ),
- )
- return None
- except Exception as e:
- logger.exception(f"SeaweedFS get failed, reason={e}")
- return None
- @classmethod
- async def delete(cls, fid: str) -> bool:
- """删除文件(异步)"""
- try:
- volume_id = fid.split(",")[0]
- async with httpx.AsyncClient(timeout=10) as client:
- lookup_resp = await client.get(
- f"{cls.MASTER_URL}/dir/lookup",
- params={"volumeId": volume_id},
- )
- lookup_resp.raise_for_status()
- data = lookup_resp.json()
- if not data.get("locations"):
- return False
- public_url = data["locations"][0]["publicUrl"]
- delete_url = f"http://{public_url}/{fid}"
- del_resp = await client.delete(delete_url)
- return del_resp.status_code == 202
- except Exception as e:
- logger.exception(f"SeaweedFS delete failed, reason={e}")
- return False
|