| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- # app/schemas/task.py
- import json
- from pydantic import BaseModel, field_validator
- from typing import Optional, Dict, Any, Literal, List
- from datetime import datetime
- class VasTaskBase(BaseModel):
- status: Optional[Literal['pending','grabbed','running','pause','cancelled','completed']] = None
- priority: Optional[int] = 10
- config: Optional[Dict[str, Any]] = None
- user_inputs: Optional[Dict[str, Any]] = None
- grabbed_history: Optional[Dict[str, Any]] = None
- meta: Optional[Dict[str, Any]] = None
-
- @field_validator("config", "user_inputs", "grabbed_history", "meta", mode="before")
- def normalize_json_field(cls, v):
- if v is None:
- return None
- if isinstance(v, str):
- try:
- return json.loads(v)
- except Exception:
- return {}
- return v
- class VasTaskCreate(VasTaskBase):
- order_id: str
- routing_key: str
- script_version: Optional[str] = None
- expire_at: datetime
-
- class VasTaskUpdate(VasTaskBase):
- pass
- class VasTaskOut(VasTaskBase):
- id: int
-
- order_id: str
- routing_key: str
- script_version: Optional[str] = None
-
- attempt_count: int
- notify_count: int
-
- created_at: datetime
- updated_at: datetime
- expire_at: datetime
- model_config = {
- "from_attributes": True
- }
-
- class VasExpiringTaskItem(BaseModel):
- id: int
- order_id: str
- routing_key: str
- status: str
- social_media_account: str
- customer_name: str # 拼装 first_name + last_name
- expected_end_date: str
- email: str
- days_left: int # 剩余天数 (负数代表已过期)
|