# app/schemas/task.py
import json
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, field_validator


class VasTaskBase(BaseModel):
    """Optional fields shared by the task create, update, and output schemas."""

    status: Optional[
        Literal['pending', 'grabbed', 'running', 'pause', 'cancelled', 'completed']
    ] = None
    priority: Optional[int] = 10
    config: Optional[Dict[str, Any]] = None
    user_inputs: Optional[Dict[str, Any]] = None
    grabbed_history: Optional[Dict[str, Any]] = None
    meta: Optional[Dict[str, Any]] = None

    @field_validator("config", "user_inputs", "grabbed_history", "meta", mode="before")
    @classmethod
    def normalize_json_field(cls, v: Any) -> Any:
        """Decode JSON-encoded strings for the dict-typed fields.

        Runs before Pydantic's own type validation (``mode="before"``).

        Args:
            v: Raw input value for one of the decorated fields.

        Returns:
            ``None`` when ``v`` is ``None``; the result of ``json.loads`` when
            ``v`` is a string; ``{}`` when that string cannot be parsed; ``v``
            unchanged for any other type.
        """
        if v is None:
            return None
        if not isinstance(v, str):
            return v
        try:
            # A string that decodes to a non-dict (e.g. "[1, 2]") is returned
            # as-is and left for the field's Dict validation to reject.
            return json.loads(v)
        except (ValueError, RecursionError):
            # Best-effort: malformed (JSONDecodeError is a ValueError) or
            # too deeply nested JSON falls back to an empty dict.
            return {}


class VasTaskCreate(VasTaskBase):
    """Task creation schema: base fields plus the required identifiers and expiry."""

    order_id: str
    routing_key: str
    script_version: Optional[str] = None
    expire_at: datetime


class VasTaskUpdate(VasTaskBase):
    """Task update schema; adds nothing to the base, so every field is optional."""


class VasTaskOut(VasTaskBase):
    """Task output schema with id, attempt/notify counters, and timestamps."""

    id: int
    order_id: str
    routing_key: str
    script_version: Optional[str] = None
    attempt_count: int
    notify_count: int
    created_at: datetime
    updated_at: datetime
    expire_at: datetime

    # Allow building the model from an object's attributes, not only from dicts.
    model_config = {
        "from_attributes": True
    }


class VasExpiringTaskItem(BaseModel):
    """Flat item pairing a task with customer details and its remaining days."""

    id: int
    order_id: str
    routing_key: str
    status: str
    social_media_account: str
    customer_name: str  # assembled from first_name + last_name
    expected_end_date: str
    email: str
    days_left: int  # days remaining (a negative value means already expired)