# task.py
import json
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, field_validator
  4. class TaskBase(BaseModel):
  5. command: str
  6. args: Optional[Dict[str, Any]] = None
  7. result: Optional[Dict[str, Any]] = None
  8. status: Optional[int] = 0
  9. @field_validator("args", "result", mode="before")
  10. def normalize_json_field(cls, v):
  11. if v is None:
  12. return None
  13. if isinstance(v, str):
  14. try:
  15. return json.loads(v)
  16. except Exception:
  17. return {}
  18. return v
  19. class TaskCreate(TaskBase):
  20. pass
  21. class TaskUpdate(BaseModel):
  22. result: Optional[Dict[str, Any]] = None
  23. status: int
  24. class TaskOut(TaskBase):
  25. id: int
  26. create_at: datetime
  27. update_at: datetime
  28. model_config = {
  29. "from_attributes": True
  30. }