import asyncio
import json
import os

import aiohttp
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.core.config import settings
from app.core.biz_exception import NotFoundError
from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
from app.models.schema import VasSchema

# --- Configuration ---
# The API key is read from application settings; put your own freshly
# generated key there rather than hard-coding it in this module.
API_KEY = settings.openai_api_key
# OpenAI endpoint, kept for reference:
# API_URL = "https://api.openai.com/v1/chat/completions"
API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"

# Total time budget (seconds) for one chat-completions request.
_REQUEST_TIMEOUT_SECONDS = 30


class LlmService:
    """Helpers that extract schema-shaped data from free-form text via an LLM."""

    @staticmethod
    async def handle_parse(
        db: AsyncSession, payload: ParseUserInputsPayload
    ) -> ParseUserInputsOut:
        """Look up the schema named by *payload* and parse the raw input with it.

        Args:
            db: Async SQLAlchemy session used to fetch the ``VasSchema`` row.
            payload: Request object; ``schema_id`` and ``input_raw_str`` are read.

        Returns:
            A ``ParseUserInputsOut`` whose ``parsed_obj`` is whatever
            ``parse_data_async`` returned (including its error dict on a
            non-200 HTTP response).

        Raises:
            NotFoundError: If no ``VasSchema`` row matches ``payload.schema_id``.
        """
        stmt = select(VasSchema).where(VasSchema.id == payload.schema_id)
        obj = (await db.execute(stmt)).scalar_one_or_none()
        # Explicit None check: do not rely on the truthiness of an ORM object.
        if obj is None:
            raise NotFoundError("Schema not exist")

        parsed_obj = await LlmService.parse_data_async(
            payload.input_raw_str, obj.schema_json
        )
        return ParseUserInputsOut(parsed_obj=parsed_obj)

    @staticmethod
    async def parse_data_async(user_text: str, json_schema: dict):
        """[Async version] Call the LLM to parse *user_text* into structured data.

        Args:
            user_text: Free-form text to extract data from.
            json_schema: JSON Schema (as a dict) describing the desired output;
                it is serialized with ``json.dumps`` and embedded in the prompt.

        Returns:
            The JSON value decoded from the first choice's message content, or
            ``{"error": "HTTP <status>", "detail": <body text>}`` when the
            endpoint answers with a status other than 200.

        Note:
            Nothing is caught here: exceptions from aiohttp, from indexing an
            unexpectedly shaped response, or from ``json.loads`` propagate to
            the caller.
        """
        headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }

        # Build the prompt.
        system_instruction = (
            "You are a specialized data extraction API. Output valid JSON only."
        )
        user_prompt = f"""
        Extract data from the text strictly based on the provided JSON Schema.

        [JSON Schema]
        {json.dumps(json_schema)}

        [User Text]
        {user_text}
        """

        payload = {
            # OpenAI alternatives: "gpt-4o" or "gpt-3.5-turbo"
            "model": "qwen-plus",
            "messages": [
                {"role": "system", "content": system_instruction},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0,
            "response_format": {"type": "json_object"},  # force JSON mode
        }

        # aiohttp expects a ClientTimeout object; a bare number is the
        # deprecated legacy form.
        timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                API_URL, headers=headers, json=payload, timeout=timeout
            ) as response:
                # 1. Check the HTTP status code.
                if response.status != 200:
                    error_text = await response.text()
                    return {"error": f"HTTP {response.status}", "detail": error_text}

                # 2. Read the response body.
                result = await response.json()

                # 3. Extract and decode the model's JSON content.
                content_str = result['choices'][0]['message']['content']
                return json.loads(content_str)


# --- Manual test run ---
if __name__ == "__main__":
    # Define the target schema.
    my_schema = {
        "type": "object",
        "properties": {
            "full_name": {"type": "string"},
            "budget": {"type": "integer"},
            "items": {"type": "array", "items": {"type": "string"}},
        },
        "required": ["full_name", "budget"],
    }

    # Simulated user input.
    user_input = "我是张三,打算花2000块钱买个耳机和键盘。"

    print("正在解析...")
    # parse_data_async is a coroutine function, so it must be driven by an
    # event loop; calling it bare would only create an un-awaited coroutine.
    result = asyncio.run(LlmService.parse_data_async(user_input, my_schema))
    print(json.dumps(result, ensure_ascii=False, indent=2))