| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- import json
- import random
- import aiohttp
- import asyncio
- import string
- import uuid
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select, delete
- from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
- from app.models.user import VasUser
- from app.models.product import VasProduct
- from app.models.schema import VasSchema
- from app.schemas.order import VasOrderCreate
- from typing import List, Optional
- from datetime import date, datetime, timedelta
- from app.services.order_service import OrderService
- # ----------------------------------------
- # 邮箱域名
- # ----------------------------------------
- DOMAINS = [
- "gmail-app.com",
- "outlooksearch.com",
- "hotmails.vip",
- "gmail365.cc",
- "ymails.top",
- "teamymail.cfd"
- ]
- class FakeService:
-
- RANDOM_USER_API = "https://randomuser.me/api/?nat=ie"
- @staticmethod
- def random_passport():
- """生成简单护照号"""
- prefix = random.choice(["P", "L", "X"])
- digits = "".join(random.choices(string.digits, k=8))
- return prefix + digits
- @staticmethod
- def random_passport_expiry():
- """护照有效期 3-10 年"""
- today = datetime.today()
- future = today + timedelta(days=random.randint(3*365, 10*365))
- return future.strftime("%Y-%m-%d")
- @staticmethod
- def random_expected_dates():
- """预约时间范围"""
- start = datetime.today() + timedelta(days=4)
- end = start + timedelta(days=random.randint(7, 60))
- return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
-
- @staticmethod
- def random_gmail(first, last):
- """
- 生成随机 Gmail
- """
- first = first.lower()
- last = last.lower()
- patterns = [
- f"{first}{random.randint(10,99)}",
- f"{first}.{last}{random.randint(10,999)}",
- f"{first[0]}{last}{random.randint(100,999)}"
- f"{first}{uuid.uuid4().hex[:3]}",
- f"{first}.{uuid.uuid4().hex[:4]}",
- f"{first[0]}{uuid.uuid4().hex[:5]}"
-
- ]
- name = random.choice(patterns)
- return f"{name}@gmail.com"
- @staticmethod
- async def fetch_irish_users(num: int):
- url = f"https://randomuser.me/api/?results={num}&nat=ie"
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as resp:
- data = await resp.json()
- return data["results"]
-
- @staticmethod
- async def generate_visametric_applicants(num: int):
- users = await FakeService.fetch_irish_users(num)
- results = []
- for u in users:
- first = u["name"]["first"]
- last = u["name"]["last"]
- start_date, end_date = FakeService.random_expected_dates()
- item = {
- "first_name": u["name"]["first"],
- "last_name": u["name"]["last"],
- "birthday": u["dob"]["date"][:10],
- "passport_no": FakeService.random_passport(),
- "passport_expiry_date": FakeService.random_passport_expiry(),
- "email": FakeService.random_gmail(first, last),
- "phone_country_code": "353",
- "phone_no": u["cell"].replace(" ", "").replace("-", ""),
- "expected_start_date": start_date,
- "expected_end_date": end_date,
- "social_media_account": f"fake-{u['login']['username']}"
- }
- results.append(item)
- return results
-
- @staticmethod
- async def generate_orders(db: AsyncSession, num: int, product_id: int, auth_user: VasUser):
- stmt = select(VasProduct).where(VasProduct.id == product_id)
- product_obj = (await db.execute(stmt)).scalar_one_or_none()
- if not product_obj:
- raise NotFoundError("Product not exist")
-
- schema_id = product_obj.schema_id
- stmt = select(VasSchema).where(VasSchema.id == schema_id)
- schema_obj = (await db.execute(stmt)).scalar_one_or_none()
- if not schema_obj:
- raise NotFoundError("Schema not exist")
- schmea_json = schema_obj.schema_json
-
- applicants = await FakeService.generate_visametric_applicants(num=num)
-
- orders = []
- print(json.dumps(applicants, ensure_ascii=False, indent=2))
- for app in applicants:
- order_data = VasOrderCreate(
- product_id=product_id,
- user_inputs=app
- )
- obj = await OrderService.create_by_admin(
- db=db,
- data=order_data,
- product=product_obj,
- auth_user=auth_user
- )
- orders.append(obj)
- return orders
|