fake_service.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. import json
  2. import random
  3. import aiohttp
  4. import asyncio
  5. import string
  6. import uuid
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from sqlalchemy import select, delete
  9. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  10. from app.models.user import VasUser
  11. from app.models.product import VasProduct
  12. from app.models.schema import VasSchema
  13. from app.schemas.order import VasOrderCreate
  14. from typing import List, Optional
  15. from datetime import date, datetime, timedelta
  16. from app.services.order_service import OrderService
  17. # ----------------------------------------
  18. # 邮箱域名
  19. # ----------------------------------------
  20. DOMAINS = [
  21. "gmail-app.com",
  22. "outlooksearch.com",
  23. "hotmails.vip",
  24. "gmail365.cc",
  25. "ymails.top",
  26. "teamymail.cfd"
  27. ]
  28. class FakeService:
  29. RANDOM_USER_API = "https://randomuser.me/api/?nat=ie"
  30. @staticmethod
  31. def random_passport():
  32. """生成简单护照号"""
  33. prefix = random.choice(["P", "L", "X"])
  34. digits = "".join(random.choices(string.digits, k=8))
  35. return prefix + digits
  36. @staticmethod
  37. def random_passport_expiry():
  38. """护照有效期 3-10 年"""
  39. today = datetime.today()
  40. future = today + timedelta(days=random.randint(3*365, 10*365))
  41. return future.strftime("%Y-%m-%d")
  42. @staticmethod
  43. def random_expected_dates():
  44. """预约时间范围"""
  45. start = datetime.today() + timedelta(days=4)
  46. end = start + timedelta(days=random.randint(7, 60))
  47. return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
  48. @staticmethod
  49. def random_gmail(first, last):
  50. """
  51. 生成随机 Gmail
  52. """
  53. first = first.lower()
  54. last = last.lower()
  55. patterns = [
  56. f"{first}{random.randint(10,99)}",
  57. f"{first}.{last}{random.randint(10,999)}",
  58. f"{first[0]}{last}{random.randint(100,999)}"
  59. f"{first}{uuid.uuid4().hex[:3]}",
  60. f"{first}.{uuid.uuid4().hex[:4]}",
  61. f"{first[0]}{uuid.uuid4().hex[:5]}"
  62. ]
  63. name = random.choice(patterns)
  64. return f"{name}@gmail.com"
  65. @staticmethod
  66. async def fetch_irish_users(num: int):
  67. url = f"https://randomuser.me/api/?results={num}&nat=ie"
  68. async with aiohttp.ClientSession() as session:
  69. async with session.get(url) as resp:
  70. data = await resp.json()
  71. return data["results"]
  72. @staticmethod
  73. async def generate_visametric_applicants(num: int):
  74. users = await FakeService.fetch_irish_users(num)
  75. results = []
  76. for u in users:
  77. first = u["name"]["first"]
  78. last = u["name"]["last"]
  79. start_date, end_date = FakeService.random_expected_dates()
  80. item = {
  81. "first_name": u["name"]["first"],
  82. "last_name": u["name"]["last"],
  83. "birthday": u["dob"]["date"][:10],
  84. "passport_no": FakeService.random_passport(),
  85. "passport_expiry_date": FakeService.random_passport_expiry(),
  86. "email": FakeService.random_gmail(first, last),
  87. "phone_country_code": "353",
  88. "phone_no": u["cell"].replace(" ", "").replace("-", ""),
  89. "expected_start_date": start_date,
  90. "expected_end_date": end_date,
  91. "social_media_account": f"fake-{u['login']['username']}"
  92. }
  93. results.append(item)
  94. return results
  95. @staticmethod
  96. async def generate_orders(db: AsyncSession, num: int, product_id: int, auth_user: VasUser):
  97. stmt = select(VasProduct).where(VasProduct.id == product_id)
  98. product_obj = (await db.execute(stmt)).scalar_one_or_none()
  99. if not product_obj:
  100. raise NotFoundError("Product not exist")
  101. schema_id = product_obj.schema_id
  102. stmt = select(VasSchema).where(VasSchema.id == schema_id)
  103. schema_obj = (await db.execute(stmt)).scalar_one_or_none()
  104. if not schema_obj:
  105. raise NotFoundError("Schema not exist")
  106. schmea_json = schema_obj.schema_json
  107. applicants = await FakeService.generate_visametric_applicants(num=num)
  108. orders = []
  109. print(json.dumps(applicants, ensure_ascii=False, indent=2))
  110. for app in applicants:
  111. order_data = VasOrderCreate(
  112. product_id=product_id,
  113. user_inputs=app
  114. )
  115. obj = await OrderService.create_by_admin(
  116. db=db,
  117. data=order_data,
  118. product=product_obj,
  119. auth_user=auth_user
  120. )
  121. orders.append(obj)
  122. return orders