| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import aiohttp
- import asyncio
- import json
- import os
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.core.config import settings
- from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
- from app.models.schema import VasSchema
# --- Configuration ---
# Endpoint of the OpenAI Chat Completions API.
API_URL = "https://api.openai.com/v1/chat/completions"
# API key, read from application settings; supply your own freshly generated key there.
API_KEY = settings.openai_api_key
class LlmService:
    """Parse free-form user text into structured JSON with an LLM.

    Both methods are static, so the class acts as a namespace and is never
    expected to be instantiated.
    """

    @staticmethod
    async def handle_parse(
        db: AsyncSession, payload: ParseUserInputsPayload
    ) -> ParseUserInputsOut:
        """Look up the schema named by *payload* and parse its raw text against it.

        Args:
            db: Async SQLAlchemy session used to load the ``VasSchema`` row.
            payload: Request object; ``schema_id`` selects the schema and
                ``input_raw_str`` is the text handed to the LLM.

        Returns:
            A ``ParseUserInputsOut`` whose ``parsed_obj`` is whatever
            ``parse_data_async`` returned. On a non-200 upstream response
            that is the ``{"error": ..., "detail": ...}`` dict, not parsed data.

        Raises:
            NotFoundError: If no schema row matches ``payload.schema_id``.
        """
        stmt = select(VasSchema).where(VasSchema.id == payload.schema_id)
        schema_row = (await db.execute(stmt)).scalar_one_or_none()
        if schema_row is None:
            # NOTE(review): NotFoundError is not among this module's visible
            # imports; confirm it is imported, otherwise this line raises
            # NameError instead of the intended error.
            raise NotFoundError("Schema not exist")

        parsed_obj = await LlmService.parse_data_async(
            payload.input_raw_str, schema_row.schema_content
        )
        return ParseUserInputsOut(parsed_obj=parsed_obj)

    @staticmethod
    async def parse_data_async(
        user_text: str,
        json_schema: dict,
        *,
        model: str = "gpt-4o",
        timeout: float = 30,
    ):
        """Ask the LLM to extract data from *user_text* according to *json_schema*.

        Args:
            user_text: Raw text to extract values from.
            json_schema: JSON Schema (as a dict) describing the desired output;
                it is serialized with ``json.dumps`` into the prompt.
            model: Chat model name sent in the request (for example
                ``"gpt-4o"`` or ``"gpt-3.5-turbo"``).
            timeout: Total time budget for the HTTP request, in seconds.

        Returns:
            The decoded JSON content of the first completion choice, or a dict
            ``{"error": "HTTP <status>", "detail": <response body>}`` when the
            API answers with a status other than 200.

        Raises:
            KeyError, IndexError: If the response body lacks the expected
                ``choices[0].message.content`` structure.
            json.JSONDecodeError: If the returned content is not valid JSON.

        Network failures and timeouts raised by aiohttp are not caught here and
        propagate to the caller.
        """
        headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }

        # Build the prompt. Explicit concatenation keeps source-code indentation
        # out of the text that is actually sent to the model.
        system_instruction = (
            "You are a specialized data extraction API. Output valid JSON only."
        )
        user_prompt = (
            "\n"
            "Extract data from the text strictly based on the provided JSON Schema.\n"
            "\n"
            "[JSON Schema]\n"
            f"{json.dumps(json_schema)}\n"
            "\n"
            "[User Text]\n"
            f"{user_text}\n"
        )

        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_instruction},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0,
            # Force JSON mode so the reply content is a single JSON object.
            "response_format": {"type": "json_object"},
        }

        # aiohttp expects a ClientTimeout object; a bare number is only a
        # legacy form of the same total timeout.
        request_timeout = aiohttp.ClientTimeout(total=timeout)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                API_URL, headers=headers, json=payload, timeout=request_timeout
            ) as response:
                # 1. Check the HTTP status; report failures as an error dict.
                if response.status != 200:
                    error_text = await response.text()
                    return {"error": f"HTTP {response.status}", "detail": error_text}

                # 2. Read the response body.
                result = await response.json()

                # 3. Extract the completion text and decode it as JSON.
                content_str = result["choices"][0]["message"]["content"]
                return json.loads(content_str)
# --- Manual smoke test ---
if __name__ == "__main__":
    # Example JSON Schema describing the fields to extract.
    my_schema = {
        "type": "object",
        "properties": {
            "full_name": {"type": "string"},
            "budget": {"type": "integer"},
            "items": {"type": "array", "items": {"type": "string"}},
        },
        "required": ["full_name", "budget"],
    }
    # Simulated user input.
    user_input = "我是张三,打算花2000块钱买个耳机和键盘。"

    print("正在解析...")
    # parse_data_async is a coroutine function on LlmService, so it has to be
    # driven by an event loop; a plain call would only create a coroutine object.
    result = asyncio.run(LlmService.parse_data_async(user_input, my_schema))
    print(json.dumps(result, ensure_ascii=False, indent=2))
|