seaweedfs_service.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. import os
  2. import aiohttp
  3. from fastapi import UploadFile
  4. from app.core.biz_exception import BizLogicError
  5. from app.core.logger import logger
  6. class SeaweedFSService:
  7. # 1. 优先读取环境变量,默认使用 Docker 内部网络的服务名
  8. # 只要后端和 SeaweedFS 在同一个 Docker 网络 (visafly-net),这个地址就是通的
  9. MASTER_URL = os.getenv("SEAWEEDFS_MASTER_URL", "http://seaweedfs-master:9333")
  10. # 文件下载的公网入口(经过你的 Nginx + Backend 代理)
  11. DOWNLOAD_GATEWAY = "https://visafly.top/api/resource/download_file"
  12. @classmethod
  13. async def upload(cls, file: UploadFile):
  14. """
  15. 上传文件到 SeaweedFS
  16. """
  17. try:
  18. timeout = aiohttp.ClientTimeout(total=10.0)
  19. async with aiohttp.ClientSession(timeout=timeout) as client:
  20. # 1️⃣ 向 Master 申请 fid
  21. # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}")
  22. try:
  23. async with client.get(f"{cls.MASTER_URL}/dir/assign") as assign_resp:
  24. if assign_resp.status >= 300:
  25. raise BizLogicError("Storage service unavailable")
  26. assign_data = await assign_resp.json()
  27. except Exception as e:
  28. logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
  29. raise BizLogicError("Storage service unavailable")
  30. fid = assign_data.get("fid")
  31. # 2️⃣ 获取上传地址
  32. # 在 Docker 共享网络模式下,Master 返回的 publicUrl 就是 "seaweedfs-volume:8080"
  33. # 后端可以直接解析并访问这个地址,无需任何修改
  34. public_url = assign_data.get("publicUrl")
  35. if not public_url:
  36. raise BizLogicError("No volume server available")
  37. upload_url = f"http://{public_url}/{fid}"
  38. download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"
  39. # 3️⃣ 上传文件数据
  40. # 读取文件内容
  41. file_content = await file.read()
  42. files = {
  43. "file": (
  44. file.filename,
  45. file_content,
  46. file.content_type or "application/octet-stream",
  47. )
  48. }
  49. # logger.info(f"Uploading to Volume Node: {upload_url}")
  50. form = aiohttp.FormData()
  51. form.add_field(
  52. "file",
  53. file_content,
  54. filename=file.filename,
  55. content_type=file.content_type or "application/octet-stream",
  56. )
  57. async with client.post(upload_url, data=form) as upload_resp:
  58. status = upload_resp.status
  59. resp_text = await upload_resp.text()
  60. if status == 201:
  61. return {
  62. "fid": fid,
  63. "url": download_url,
  64. "size": len(file_content)
  65. }
  66. logger.error(f"Upload failed. Status: {status}, Body: {resp_text}")
  67. raise BizLogicError(f"Storage upload error: {resp_text}")
  68. except BizLogicError as e:
  69. raise e
  70. except Exception as e:
  71. logger.exception("SeaweedFS upload unexpected error")
  72. raise BizLogicError(f"Upload exception: {str(e)}")
  73. @classmethod
  74. async def get(cls, fid: str):
  75. """
  76. 根据 fid 读取文件流
  77. """
  78. try:
  79. if not fid:
  80. return None
  81. # fid 格式通常是 "3,016a...",逗号前是 volumeId
  82. volume_id = fid.split(",")[0]
  83. timeout = aiohttp.ClientTimeout(total=10.0)
  84. async with aiohttp.ClientSession(timeout=timeout) as client:
  85. # 1️⃣ 查询文件位置
  86. async with client.get(
  87. f"{cls.MASTER_URL}/dir/lookup",
  88. params={"volumeId": volume_id},
  89. ) as lookup_resp:
  90. if lookup_resp.status != 200:
  91. text = await lookup_resp.text()
  92. logger.warning(f"Lookup failed for fid {fid}: {text}")
  93. return None
  94. data = await lookup_resp.json()
  95. locations = data.get("locations")
  96. if not locations:
  97. return None
  98. # 2️⃣ 直接请求 Volume Server
  99. # 同样直接使用 Docker 内部地址
  100. public_url = locations[0]["publicUrl"]
  101. file_url = f"http://{public_url}/{fid}"
  102. async with client.get(file_url) as file_resp:
  103. if file_resp.status == 200:
  104. content = await file_resp.read()
  105. return (
  106. content,
  107. file_resp.headers.get("Content-Type", "application/octet-stream"),
  108. )
  109. return None
  110. except Exception as e:
  111. logger.exception(f"SeaweedFS get failed for fid {fid}")
  112. return None
  113. @classmethod
  114. async def delete(cls, fid: str) -> bool:
  115. """
  116. 删除文件
  117. """
  118. try:
  119. if not fid:
  120. return False
  121. volume_id = fid.split(",")[0]
  122. timeout = aiohttp.ClientTimeout(total=5.0)
  123. async with aiohttp.ClientSession(timeout=timeout) as client:
  124. # 1️⃣ 查找位置
  125. async with client.get(
  126. f"{cls.MASTER_URL}/dir/lookup",
  127. params={"volumeId": volume_id},
  128. ) as lookup_resp:
  129. if lookup_resp.status != 200:
  130. return False
  131. data = await lookup_resp.json()
  132. locations = data.get("locations")
  133. if not locations:
  134. return False
  135. # 2️⃣ 发送删除请求
  136. public_url = locations[0]["publicUrl"]
  137. delete_url = f"http://{public_url}/{fid}"
  138. async with client.delete(delete_url) as del_resp:
  139. # SeaweedFS 删除成功通常返回 200 或 202
  140. return del_resp.status in [200, 202]
  141. except Exception as e:
  142. logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
  143. return False