Files

206 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
消息格式转换模块
将 OpenAI Responses API 请求格式转换为 DeepSeek Chat Completions API 格式。
"""
def _clean_schema(obj):
"""递归清除 JSON Schema 中 DeepSeek 不支持的字段"""
if not isinstance(obj, dict):
return obj
cleaned = {}
for k, v in obj.items():
if k in ("additionalProperties", "strict"):
continue
if isinstance(v, dict):
cleaned[k] = _clean_schema(v)
elif isinstance(v, list):
cleaned[k] = [_clean_schema(i) if isinstance(i, dict) else i for i in v]
else:
cleaned[k] = v
return cleaned
def _convert_tools(tools: list) -> list:
"""将工具定义从 Responses API 格式转换为 Chat Completions API 格式"""
result = []
for tool in tools:
if not isinstance(tool, dict):
continue
if tool.get("type") != "function":
continue
func = {
"name": tool.get("name", ""),
"description": tool.get("description", ""),
}
if "parameters" in tool:
func["parameters"] = _clean_schema(tool["parameters"])
result.append({"type": "function", "function": func})
return result
def _convert_tool_choice(tc):
"""将 tool_choice 从 Responses API 格式转换为 Chat Completions 格式"""
if tc is None:
return "auto"
if isinstance(tc, str):
return tc
if isinstance(tc, dict) and tc.get("type") == "function":
return {"type": "function", "function": {"name": tc.get("name", "")}}
return "auto"
def extract_messages(data: dict):
    """
    Extract the message list, tool list and tool choice from a request body.

    Two request shapes are supported:

    - Responses API (``input`` / ``instructions`` fields)
    - Chat Completions API (``messages`` field); used only when ``input`` is
      absent, and the messages are returned unchanged

    For a list-valued ``input`` the items are converted as follows:

    - message items (``type == "message"``, or no ``type`` but a
      ``role`` of user/assistant/system/developer) become chat messages; the
      ``developer`` role is mapped to ``system`` and text parts are joined
      with newlines. Messages with no text and no tool calls are dropped.
    - consecutive ``function_call`` items are merged into a single assistant
      message carrying ``tool_calls``.
    - ``function_call_output`` items become ``tool`` messages.

    Finally the messages are re-ordered so that tool results directly follow
    the assistant message that issued the matching calls.

    Args:
        data: The parsed request body.

    Returns:
        (messages, tools, tool_choice)
    """
    ROLE_MAP = {"developer": "system"}
    # Roles a message item may carry when it omits the "type" field.
    untyped_message_roles = ("user", "assistant", "system", "developer")

    # `"tools": null` is valid JSON; treat it the same as a missing field.
    tools = _convert_tools(data.get("tools") or [])
    tool_choice = _convert_tool_choice(data.get("tool_choice"))

    if "input" not in data:
        if "messages" in data:
            return data["messages"], tools, tool_choice
        return [], tools, tool_choice

    inp = data["input"]
    messages = []
    if data.get("instructions"):
        messages.append({"role": "system", "content": data["instructions"]})

    if isinstance(inp, str):
        messages.append({"role": "user", "content": inp})
        return messages, tools, tool_choice
    if not isinstance(inp, list):
        return [], tools, tool_choice

    # Function calls are buffered so that a run of consecutive calls ends up
    # in one assistant message.
    pending_tool_calls = []
    pending_reasoning = ""

    def _flush_tool_calls():
        """Emit the buffered function calls as one assistant message."""
        nonlocal pending_tool_calls, pending_reasoning
        if not pending_tool_calls:
            return
        msg = {
            "role": "assistant",
            "content": "",
            "tool_calls": pending_tool_calls,
        }
        if pending_reasoning:
            msg["reasoning_content"] = pending_reasoning
        messages.append(msg)
        pending_tool_calls = []
        pending_reasoning = ""

    for item in inp:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        # Message items may omit "type" and carry only role/content;
        # without this they would be silently discarded.
        if item_type is None and item.get("role") in untyped_message_roles:
            item_type = "message"

        if item_type == "message":
            _flush_tool_calls()
            role = item.get("role", "user")
            role = ROLE_MAP.get(role, role)
            content = item.get("content", "")

            tool_calls = []
            if isinstance(content, list):
                texts = []
                for part in content:
                    if not isinstance(part, dict):
                        continue
                    part_type = part.get("type")
                    if part_type in ("text", "input_text", "output_text"):
                        text = part.get("text", "")
                        # Whitespace-only parts are skipped entirely.
                        if text.strip():
                            texts.append(text)
                    elif part_type == "tool_call":
                        tool_calls.append({
                            "id": part.get("id", ""),
                            "type": "function",
                            "function": {
                                "name": part.get("name", ""),
                                "arguments": part.get("arguments", ""),
                            },
                        })
                text_content = "\n".join(texts)
            elif isinstance(content, str):
                text_content = content.strip()
            else:
                text_content = ""

            # Nothing to send: neither text nor tool calls.
            if not tool_calls and not text_content:
                continue
            msg = {"role": role, "content": text_content}
            if tool_calls:
                msg["tool_calls"] = tool_calls
            if item.get("reasoning_content"):
                msg["reasoning_content"] = item["reasoning_content"]
            messages.append(msg)

        elif item_type == "function_call":
            pending_tool_calls.append({
                "id": item.get("call_id", ""),
                "type": "function",
                "function": {
                    "name": item.get("name", ""),
                    "arguments": item.get("arguments", ""),
                },
            })
            # Only the first reasoning text of a run of calls is kept.
            if item.get("reasoning_content") and not pending_reasoning:
                pending_reasoning = item["reasoning_content"]

        elif item_type == "function_call_output":
            _flush_tool_calls()
            messages.append({
                "role": "tool",
                "tool_call_id": item.get("call_id", ""),
                "content": item.get("output", ""),
            })

    _flush_tool_calls()

    # ---- Re-order: tool messages must directly follow their assistant message ----
    reordered = []
    total = len(messages)
    i = 0
    while i < total:
        msg = messages[i]
        if not (msg.get("role") == "assistant" and msg.get("tool_calls")):
            reordered.append(msg)
            i += 1
            continue

        expected_ids = {tc["id"] for tc in msg["tool_calls"]}
        tool_msgs = []
        hoisted_msgs = []
        j = i + 1
        # Scan forward until every call has its result or an unrelated
        # message is reached.
        while j < total and expected_ids:
            nxt = messages[j]
            nxt_role = nxt.get("role")
            if nxt_role == "tool" and nxt.get("tool_call_id") in expected_ids:
                expected_ids.remove(nxt["tool_call_id"])
                tool_msgs.append(nxt)
            elif nxt_role in ("system", "developer"):
                # System messages interleaved with the results are moved in
                # front of the assistant message.
                hoisted_msgs.append(nxt)
            else:
                break
            j += 1
        reordered.extend(hoisted_msgs)
        reordered.append(msg)
        reordered.extend(tool_msgs)
        i = j

    return reordered, tools, tool_choice