|
|
@@ -1,9 +1,20 @@
|
|
|
+import json
|
|
|
import random
|
|
|
+import aiohttp
|
|
|
+import asyncio
|
|
|
+import string
|
|
|
+import uuid
|
|
|
+
|
|
|
+from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
+from sqlalchemy import select, delete
|
|
|
from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
|
|
|
-from typing import List
|
|
|
+from app.models.user import VasUser
|
|
|
+from app.models.product import VasProduct
|
|
|
+from app.models.schema import VasSchema
|
|
|
+from app.schemas.order import VasOrderCreate
|
|
|
+from typing import List, Optional
|
|
|
from datetime import date, datetime, timedelta
|
|
|
-from app.schemas.fake import FakeUser
|
|
|
-
|
|
|
+from app.services.order_service import OrderService
|
|
|
# ----------------------------------------
|
|
|
# 邮箱域名
|
|
|
# ----------------------------------------
|
|
|
@@ -16,243 +27,124 @@ DOMAINS = [
|
|
|
"teamymail.cfd"
|
|
|
]
|
|
|
|
|
|
-def get_email_prefix(email: str) -> str:
|
|
|
- if not email or "@" not in email:
|
|
|
- return ""
|
|
|
- return email.split("@", 1)[0]
|
|
|
-
|
|
|
-def generate_stable_email(base: str) -> str:
|
|
|
- if not base or not base.strip():
|
|
|
- base = "user"
|
|
|
- index = abs(hash(base)) % len(DOMAINS)
|
|
|
- return f"{base}@{DOMAINS[index]}"
|
|
|
-
|
|
|
-# ----------------------------------------
|
|
|
-# 国籍对应名字和护照格式
|
|
|
-# 全 ASCII
|
|
|
-# ----------------------------------------
|
|
|
-NATIONALITY_DATA = {
|
|
|
- "China": {
|
|
|
- "male_names": ["Wei", "Jian", "Liang", "Hao", "Jun"],
|
|
|
- "female_names": ["Mei", "Li", "Fang", "Xia", "Lan"],
|
|
|
- "surnames": ["Zhang", "Wang", "Li", "Liu", "Chen"],
|
|
|
- "passport_pattern": "E?########"
|
|
|
- },
|
|
|
- "India": {
|
|
|
- "male_names": ["Arjun", "Rohan", "Raj", "Vikram", "Siddharth"],
|
|
|
- "female_names": ["Ananya", "Priya", "Aisha", "Neha", "Kavya"],
|
|
|
- "surnames": ["Sharma", "Patel", "Singh", "Kumar", "Gupta"],
|
|
|
- "passport_pattern": "?#######"
|
|
|
- },
|
|
|
- # "United States": {
|
|
|
- # "male_names": ["James", "John", "Robert", "Michael", "William"],
|
|
|
- # "female_names": ["Mary", "Patricia", "Linda", "Elizabeth", "Jennifer"],
|
|
|
- # "surnames": ["Smith", "Johnson", "Williams", "Brown", "Jones"],
|
|
|
- # "passport_pattern": "#########"
|
|
|
- # },
|
|
|
- # "United Kingdom": {
|
|
|
- # "male_names": ["Oliver", "George", "Harry", "Jack", "Jacob"],
|
|
|
- # "female_names": ["Olivia", "Amelia", "Isla", "Ava", "Emily"],
|
|
|
- # "surnames": ["Smith", "Jones", "Taylor", "Brown", "Williams"],
|
|
|
- # "passport_pattern": "#########"
|
|
|
- # },
|
|
|
- # "Ireland": {
|
|
|
- # "male_names": ["Conor", "Sean", "Patrick", "Liam", "Finn"],
|
|
|
- # "female_names": ["Aoife", "Saoirse", "Emily", "Grace", "Ciara"],
|
|
|
- # "surnames": ["Murphy", "Kelly", "OBrien", "Walsh", "Ryan"],
|
|
|
- # "passport_pattern": "P########"
|
|
|
- # },
|
|
|
- # "Germany": {
|
|
|
- # "male_names": ["Max", "Lukas", "Felix", "Leon", "Paul"],
|
|
|
- # "female_names": ["Anna", "Emma", "Sophia", "Marie", "Laura"],
|
|
|
- # "surnames": ["Mueller", "Schmidt", "Schneider", "Fischer", "Weber"],
|
|
|
- # "passport_pattern": "C########"
|
|
|
- # },
|
|
|
- # "France": {
|
|
|
- # "male_names": ["Louis", "Gabriel", "Raphael", "Arthur", "Jules"],
|
|
|
- # "female_names": ["Emma", "Louise", "Alice", "Chloe", "Lina"],
|
|
|
- # "surnames": ["Martin", "Bernard", "Thomas", "Robert", "Richard"],
|
|
|
- # "passport_pattern": "########"
|
|
|
- # },
|
|
|
- # "Japan": {
|
|
|
- # "male_names": ["Hiroshi", "Takumi", "Kenta", "Yuki", "Ryo"],
|
|
|
- # "female_names": ["Yui", "Sakura", "Mao", "Aya", "Hana"],
|
|
|
- # "surnames": ["Sato", "Suzuki", "Takahashi", "Tanaka", "Watanabe"],
|
|
|
- # "passport_pattern": "TR#######"
|
|
|
- # }
|
|
|
-}
|
|
|
-
|
|
|
-# ----------------------------------------
|
|
|
-# 居住国家地址/电话/邮编(全 ASCII)
|
|
|
-# ----------------------------------------
|
|
|
-RESIDENCE_DATA = {
|
|
|
- "China": {
|
|
|
- "cities": ["Beijing", "Shanghai", "Shenzhen", "Guangzhou", "Chengdu"],
|
|
|
- "streets": ["Heping Road", "Jiefang Road", "Renmin Street", "Zhongshan Avenue", "Fuxing Street"],
|
|
|
- "postcode_prefix": "1",
|
|
|
- "phone_code": "+86",
|
|
|
- "phone_length": 11
|
|
|
- },
|
|
|
- "India": {
|
|
|
- "cities": ["Delhi", "Mumbai", "Bangalore", "Kolkata", "Chennai"],
|
|
|
- "streets": ["MG Road", "Connaught Place", "Brigade Road", "Park Street", "Anna Salai"],
|
|
|
- "postcode_prefix": "5",
|
|
|
- "phone_code": "+91",
|
|
|
- "phone_length": 10
|
|
|
- },
|
|
|
- "United Kingdom": {
|
|
|
- "cities": ["London", "Manchester", "Birmingham", "Leeds", "Glasgow"],
|
|
|
- "streets": ["High Street", "Station Road", "Main Street", "Church Road", "London Road"],
|
|
|
- "postcode_prefix": "SW",
|
|
|
- "phone_code": "+44",
|
|
|
- "phone_length": 10
|
|
|
- },
|
|
|
- "Ireland": {
|
|
|
- "cities": ["Dublin", "Cork", "Galway", "Limerick", "Waterford"],
|
|
|
- "streets": ["OConnell Street", "Patrick Street", "Main Street", "High Street", "Bridge Street"],
|
|
|
- "postcode_prefix": "D",
|
|
|
- "phone_code": "+353",
|
|
|
- "phone_length": 9
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-# ----------------------------------------
|
|
|
-# 辅助函数
|
|
|
-# ----------------------------------------
|
|
|
-def generate_passport(pattern: str) -> str:
|
|
|
- s = ""
|
|
|
- for c in pattern:
|
|
|
- if c == "#":
|
|
|
- s += str(random.randint(0, 9))
|
|
|
- elif c == "?":
|
|
|
- s += random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
|
|
- else:
|
|
|
- s += c
|
|
|
- return s
|
|
|
-
|
|
|
-def generate_name(nationality: str, gender: str):
|
|
|
- data = NATIONALITY_DATA[nationality]
|
|
|
- if gender == "Male":
|
|
|
- first = random.choice(data["male_names"])
|
|
|
- else:
|
|
|
- first = random.choice(data["female_names"])
|
|
|
- last = random.choice(data["surnames"])
|
|
|
- return first, last
|
|
|
-
|
|
|
-def generate_address(residence_country: str):
|
|
|
- data = RESIDENCE_DATA.get(residence_country)
|
|
|
- street = random.choice(data["streets"])
|
|
|
- city = random.choice(data["cities"])
|
|
|
- postcode = data["postcode_prefix"] + "".join([str(random.randint(0, 9)) for _ in range(5)])
|
|
|
- return f"{random.randint(1, 200)} {street}", city, postcode
|
|
|
-
|
|
|
-# def generate_phone(residence_country: str):
|
|
|
-# data = RESIDENCE_DATA.get(residence_country)
|
|
|
-# num = "".join([str(random.randint(0,9)) for _ in range(data["phone_length"])])
|
|
|
-# return data["phone_code"], num
|
|
|
-
|
|
|
-def generate_phone(country: str):
|
|
|
- if country == "China":
|
|
|
- prefix = "86"
|
|
|
- first_digit = "1"
|
|
|
- length = 11
|
|
|
- elif country == "India":
|
|
|
- prefix = "91"
|
|
|
- first_digit = random.choice(["7", "8", "9"])
|
|
|
- length = 10
|
|
|
- elif country == "Ireland":
|
|
|
- prefix = "353"
|
|
|
- first_digit = "8"
|
|
|
- length = 9
|
|
|
- elif country == "United Kingdom":
|
|
|
- prefix = "44"
|
|
|
- first_digit = "7"
|
|
|
- length = 10
|
|
|
- elif country == "United States":
|
|
|
- prefix = "1"
|
|
|
- first_digit = str(random.randint(2,9))
|
|
|
- length = 10
|
|
|
- elif country == "Germany":
|
|
|
- prefix = "49"
|
|
|
- first_digit = str(random.randint(1,9))
|
|
|
- length = 10
|
|
|
- else:
|
|
|
- prefix = "+00"
|
|
|
- first_digit = "1"
|
|
|
- length = 10
|
|
|
+class FakeService:
|
|
|
|
|
|
- # 生成国内号码(ASCII数字)
|
|
|
- number = first_digit + "".join(str(random.randint(0,9)) for _ in range(length-1))
|
|
|
- return prefix, '0' + number
|
|
|
-
|
|
|
-# ----------------------------------------
|
|
|
-# 主函数
|
|
|
-# ----------------------------------------
|
|
|
-def generate_fake_users(num: int, living_country: str = "Ireland") -> List[FakeUser]:
|
|
|
- users = []
|
|
|
- if living_country not in RESIDENCE_DATA.keys():
|
|
|
- raise BizLogicError(f'Living country only have {RESIDENCE_DATA.keys()}')
|
|
|
- for _ in range(num):
|
|
|
- gender = random.choice(["Male","Female"])
|
|
|
- nationality = random.choice(list(NATIONALITY_DATA.keys()))
|
|
|
- first_name, last_name = generate_name(nationality, gender)
|
|
|
- passport_no = generate_passport(NATIONALITY_DATA[nationality]["passport_pattern"])
|
|
|
-
|
|
|
- address_line1, city, postcode = generate_address(living_country)
|
|
|
- phone_country_code, phone_no = generate_phone(living_country)
|
|
|
-
|
|
|
- fake_email = f"{first_name.lower()}.{last_name.lower()}@example.com"
|
|
|
- alias_email = generate_stable_email(f"{first_name.lower()}.{last_name.lower()}")
|
|
|
+ RANDOM_USER_API = "https://randomuser.me/api/?nat=ie"
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def random_passport():
|
|
|
+ """生成简单护照号"""
|
|
|
+ prefix = random.choice(["P", "L", "X"])
|
|
|
+ digits = "".join(random.choices(string.digits, k=8))
|
|
|
+ return prefix + digits
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def random_passport_expiry():
|
|
|
+ """护照有效期 3-10 年"""
|
|
|
+ today = datetime.today()
|
|
|
+ future = today + timedelta(days=random.randint(3*365, 10*365))
|
|
|
+ return future.strftime("%Y-%m-%d")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def random_expected_dates():
|
|
|
+ """预约时间范围"""
|
|
|
+ start = datetime.today() + timedelta(days=4)
|
|
|
+ end = start + timedelta(days=random.randint(7, 60))
|
|
|
+ return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def random_gmail(first, last):
|
|
|
+ """
|
|
|
+ 生成随机 Gmail
|
|
|
+ """
|
|
|
+ first = first.lower()
|
|
|
+ last = last.lower()
|
|
|
+
|
|
|
+ patterns = [
|
|
|
+ f"{first}{random.randint(10,99)}",
|
|
|
+ f"{first}.{last}{random.randint(10,999)}",
|
|
|
+ f"{first[0]}{last}{random.randint(100,999)}"
|
|
|
+ f"{first}{uuid.uuid4().hex[:3]}",
|
|
|
+ f"{first}.{uuid.uuid4().hex[:4]}",
|
|
|
+ f"{first[0]}{uuid.uuid4().hex[:5]}"
|
|
|
+
|
|
|
+ ]
|
|
|
+
|
|
|
+ name = random.choice(patterns)
|
|
|
+
|
|
|
+ return f"{name}@gmail.com"
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ async def fetch_irish_users(num: int):
|
|
|
+
|
|
|
+ url = f"https://randomuser.me/api/?results={num}&nat=ie"
|
|
|
+
|
|
|
+ async with aiohttp.ClientSession() as session:
|
|
|
+ async with session.get(url) as resp:
|
|
|
+ data = await resp.json()
|
|
|
+
|
|
|
+ return data["results"]
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ async def generate_visametric_applicants(num: int):
|
|
|
|
|
|
- travel_date = date.today() + timedelta(days=random.randint(30,90))
|
|
|
- passport_expiry_date = date.today() + timedelta(days=random.randint(365,3650))
|
|
|
- birthday = date.today() - timedelta(days=random.randint(18*365,50*365))
|
|
|
+ users = await FakeService.fetch_irish_users(num)
|
|
|
|
|
|
- user = FakeUser(
|
|
|
- provider="fake",
|
|
|
- visa_center="fake",
|
|
|
- order_no=f"{random.randint(10000000,99999999)}",
|
|
|
- social_account=f"fake-{first_name.lower()}{random.randint(1,99)}",
|
|
|
- account=fake_email,
|
|
|
- password="Password123!",
|
|
|
+ results = []
|
|
|
|
|
|
- last_name=last_name,
|
|
|
- first_name=first_name,
|
|
|
- gender=gender,
|
|
|
- birthday=birthday,
|
|
|
- nationality=nationality,
|
|
|
- passport_no=passport_no,
|
|
|
- passport_expiry_date=passport_expiry_date,
|
|
|
- email=fake_email,
|
|
|
- alias_email=alias_email,
|
|
|
+ for u in users:
|
|
|
|
|
|
- address_line1=address_line1,
|
|
|
- address_line2="",
|
|
|
- city=city,
|
|
|
- state="",
|
|
|
- postcode=postcode,
|
|
|
- phone_country_code=phone_country_code,
|
|
|
- phone_no=phone_no,
|
|
|
+ first = u["name"]["first"]
|
|
|
+ last = u["name"]["last"]
|
|
|
+ start_date, end_date = FakeService.random_expected_dates()
|
|
|
|
|
|
- travel_date=travel_date,
|
|
|
- cover_letter="This is a test cover letter.",
|
|
|
- passport_image_url="https://placekitten.com/400/400",
|
|
|
- selfie_image_url="https://placekitten.com/401/401",
|
|
|
- application_form_url="https://example.com/form",
|
|
|
+ item = {
|
|
|
+ "first_name": u["name"]["first"],
|
|
|
+ "last_name": u["name"]["last"],
|
|
|
+ "birthday": u["dob"]["date"][:10],
|
|
|
+ "passport_no": FakeService.random_passport(),
|
|
|
+ "passport_expiry_date": FakeService.random_passport_expiry(),
|
|
|
+ "email": FakeService.random_gmail(first, last),
|
|
|
+ "phone_country_code": "353",
|
|
|
+ "phone_no": u["cell"].replace(" ", "").replace("-", ""),
|
|
|
+ "expected_start_date": start_date,
|
|
|
+ "expected_end_date": end_date,
|
|
|
+ "social_media_account": f"fake-{u['login']['username']}"
|
|
|
+ }
|
|
|
|
|
|
- priority=random.randint(1,5),
|
|
|
- expected_submit_start='2000-01-01',
|
|
|
- expected_submit_end='2100-01-01',
|
|
|
- rules="",
|
|
|
- status=0,
|
|
|
- placeholder=1,
|
|
|
- appointment_datetime=datetime.now() + timedelta(days=random.randint(20,200)),
|
|
|
- appointment_letter_url="https://example.com/appointment",
|
|
|
- pnr_number="",
|
|
|
- payment_link="",
|
|
|
- payment_help=1,
|
|
|
- note="This is a test note."
|
|
|
- )
|
|
|
- users.append(user)
|
|
|
+ results.append(item)
|
|
|
|
|
|
- return users
|
|
|
+ return results
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ async def generate_orders(db: AsyncSession, num: int, product_id: int, auth_user: VasUser):
|
|
|
+ stmt = select(VasProduct).where(VasProduct.id == product_id)
|
|
|
+ product_obj = (await db.execute(stmt)).scalar_one_or_none()
|
|
|
+ if not product_obj:
|
|
|
+ raise NotFoundError("Product not exist")
|
|
|
+
|
|
|
+ schema_id = product_obj.schema_id
|
|
|
+ stmt = select(VasSchema).where(VasSchema.id == schema_id)
|
|
|
+ schema_obj = (await db.execute(stmt)).scalar_one_or_none()
|
|
|
+ if not schema_obj:
|
|
|
+ raise NotFoundError("Schema not exist")
|
|
|
+ schmea_json = schema_obj.schema_json
|
|
|
+
|
|
|
+ applicants = await FakeService.generate_visametric_applicants(num=num)
|
|
|
+
|
|
|
+ orders = []
|
|
|
+ print(json.dumps(applicants, ensure_ascii=False, indent=2))
|
|
|
+ for app in applicants:
|
|
|
+ order_data = VasOrderCreate(
|
|
|
+ product_id=product_id,
|
|
|
+ user_inputs=app
|
|
|
+ )
|
|
|
+
|
|
|
+ obj = await OrderService.create_by_admin(
|
|
|
+ db=db,
|
|
|
+ data=order_data,
|
|
|
+ product=product_obj,
|
|
|
+ auth_user=auth_user
|
|
|
+ )
|
|
|
+ orders.append(obj)
|
|
|
+ return orders
|