seaweedfs_service.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # app/services/seaweedfs_service.py
  2. import httpx
  3. from fastapi import UploadFile
  4. from app.core.biz_exception import BizLogicError
  5. from app.core.logger import logger
  6. class SeaweedFSService:
  7. MASTER_URL = "http://127.0.0.1:9333" # SeaweedFS master 地址
  8. DOWNLOAD_GATEWAY = "http://45.137.220.138:8888/api/resource/download_file"
  9. @classmethod
  10. async def upload(cls, file: UploadFile):
  11. """上传文件到 SeaweedFS(异步)"""
  12. try:
  13. async with httpx.AsyncClient(timeout=10) as client:
  14. # 1️⃣ 获取 volume
  15. assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
  16. assign_resp.raise_for_status()
  17. assign_data = assign_resp.json()
  18. fid = assign_data["fid"]
  19. public_url = assign_data["publicUrl"]
  20. upload_url = f"http://{public_url}/{fid}"
  21. download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"
  22. # 2️⃣ 上传文件
  23. files = {
  24. "file": (
  25. file.filename,
  26. await file.read(),
  27. file.content_type,
  28. )
  29. }
  30. upload_resp = await client.post(upload_url, files=files)
  31. if upload_resp.status_code == 201:
  32. return {
  33. "fid": fid,
  34. "url": download_url,
  35. }
  36. raise BizLogicError(f"file upload error: {upload_resp.text}")
  37. except Exception as e:
  38. logger.exception("SeaweedFS upload failed")
  39. raise BizLogicError(f"file upload exception: {e}")
  40. @classmethod
  41. async def get(cls, fid: str):
  42. """根据 fid 读取文件(异步)"""
  43. try:
  44. volume_id = fid.split(",")[0]
  45. async with httpx.AsyncClient(timeout=10) as client:
  46. lookup_resp = await client.get(
  47. f"{cls.MASTER_URL}/dir/lookup",
  48. params={"volumeId": volume_id},
  49. )
  50. lookup_resp.raise_for_status()
  51. data = lookup_resp.json()
  52. if not data.get("locations"):
  53. return None
  54. public_url = data["locations"][0]["publicUrl"]
  55. file_url = f"http://{public_url}/{fid}"
  56. file_resp = await client.get(file_url)
  57. if file_resp.status_code == 200:
  58. return (
  59. file_resp.content,
  60. file_resp.headers.get(
  61. "Content-Type", "application/octet-stream"
  62. ),
  63. )
  64. return None
  65. except Exception as e:
  66. logger.exception(f"SeaweedFS get failed, reason={e}")
  67. return None
  68. @classmethod
  69. async def delete(cls, fid: str) -> bool:
  70. """删除文件(异步)"""
  71. try:
  72. volume_id = fid.split(",")[0]
  73. async with httpx.AsyncClient(timeout=10) as client:
  74. lookup_resp = await client.get(
  75. f"{cls.MASTER_URL}/dir/lookup",
  76. params={"volumeId": volume_id},
  77. )
  78. lookup_resp.raise_for_status()
  79. data = lookup_resp.json()
  80. if not data.get("locations"):
  81. return False
  82. public_url = data["locations"][0]["publicUrl"]
  83. delete_url = f"http://{public_url}/{fid}"
  84. del_resp = await client.delete(delete_url)
  85. return del_resp.status_code == 202
  86. except Exception as e:
  87. logger.exception(f"SeaweedFS delete failed, reason={e}")
  88. return False