seaweedfs_service.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import os
  2. import httpx
  3. from fastapi import UploadFile
  4. from app.core.biz_exception import BizLogicError
  5. from app.core.logger import logger
  6. class SeaweedFSService:
  7. # 1. 优先读取环境变量,默认使用 Docker 内部网络的服务名
  8. # 只要后端和 SeaweedFS 在同一个 Docker 网络 (visafly-net),这个地址就是通的
  9. MASTER_URL = os.getenv("SEAWEEDFS_MASTER_URL", "http://seaweedfs-master:9333")
  10. # 文件下载的公网入口(经过你的 Nginx + Backend 代理)
  11. DOWNLOAD_GATEWAY = "https://visafly.top/api/resource/download_file"
  12. @classmethod
  13. async def upload(cls, file: UploadFile):
  14. """
  15. 上传文件到 SeaweedFS
  16. """
  17. try:
  18. async with httpx.AsyncClient(timeout=10.0) as client:
  19. # 1️⃣ 向 Master 申请 fid
  20. # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}")
  21. try:
  22. assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
  23. assign_resp.raise_for_status()
  24. except Exception as e:
  25. logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
  26. raise BizLogicError("Storage service unavailable")
  27. assign_data = assign_resp.json()
  28. fid = assign_data.get("fid")
  29. # 2️⃣ 获取上传地址
  30. # 在 Docker 共享网络模式下,Master 返回的 publicUrl 就是 "seaweedfs-volume:8080"
  31. # 后端可以直接解析并访问这个地址,无需任何修改
  32. public_url = assign_data.get("publicUrl")
  33. if not public_url:
  34. raise BizLogicError("No volume server available")
  35. upload_url = f"http://{public_url}/{fid}"
  36. download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"
  37. # 3️⃣ 上传文件数据
  38. # 读取文件内容
  39. file_content = await file.read()
  40. files = {
  41. "file": (
  42. file.filename,
  43. file_content,
  44. file.content_type or "application/octet-stream",
  45. )
  46. }
  47. # logger.info(f"Uploading to Volume Node: {upload_url}")
  48. upload_resp = await client.post(upload_url, files=files)
  49. if upload_resp.status_code == 201:
  50. return {
  51. "fid": fid,
  52. "url": download_url,
  53. "size": len(file_content)
  54. }
  55. logger.error(f"Upload failed. Status: {upload_resp.status_code}, Body: {upload_resp.text}")
  56. raise BizLogicError(f"Storage upload error: {upload_resp.text}")
  57. except BizLogicError as e:
  58. raise e
  59. except Exception as e:
  60. logger.exception("SeaweedFS upload unexpected error")
  61. raise BizLogicError(f"Upload exception: {str(e)}")
  62. @classmethod
  63. async def get(cls, fid: str):
  64. """
  65. 根据 fid 读取文件流
  66. """
  67. try:
  68. if not fid:
  69. return None
  70. # fid 格式通常是 "3,016a...",逗号前是 volumeId
  71. volume_id = fid.split(",")[0]
  72. async with httpx.AsyncClient(timeout=10.0) as client:
  73. # 1️⃣ 查询文件位置
  74. lookup_resp = await client.get(
  75. f"{cls.MASTER_URL}/dir/lookup",
  76. params={"volumeId": volume_id},
  77. )
  78. if lookup_resp.status_code != 200:
  79. logger.warning(f"Lookup failed for fid {fid}: {lookup_resp.text}")
  80. return None
  81. data = lookup_resp.json()
  82. locations = data.get("locations")
  83. if not locations:
  84. return None
  85. # 2️⃣ 直接请求 Volume Server
  86. # 同样直接使用 Docker 内部地址
  87. public_url = locations[0]["publicUrl"]
  88. file_url = f"http://{public_url}/{fid}"
  89. file_resp = await client.get(file_url)
  90. if file_resp.status_code == 200:
  91. return (
  92. file_resp.content,
  93. file_resp.headers.get("Content-Type", "application/octet-stream"),
  94. )
  95. return None
  96. except Exception as e:
  97. logger.exception(f"SeaweedFS get failed for fid {fid}")
  98. return None
  99. @classmethod
  100. async def delete(cls, fid: str) -> bool:
  101. """
  102. 删除文件
  103. """
  104. try:
  105. if not fid:
  106. return False
  107. volume_id = fid.split(",")[0]
  108. async with httpx.AsyncClient(timeout=5.0) as client:
  109. # 1️⃣ 查找位置
  110. lookup_resp = await client.get(
  111. f"{cls.MASTER_URL}/dir/lookup",
  112. params={"volumeId": volume_id},
  113. )
  114. if lookup_resp.status_code != 200:
  115. return False
  116. data = lookup_resp.json()
  117. locations = data.get("locations")
  118. if not locations:
  119. return False
  120. # 2️⃣ 发送删除请求
  121. public_url = locations[0]["publicUrl"]
  122. delete_url = f"http://{public_url}/{fid}"
  123. del_resp = await client.delete(delete_url)
  124. # SeaweedFS 删除成功通常返回 200 或 202
  125. return del_resp.status_code in [200, 202]
  126. except Exception as e:
  127. logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
  128. return False