Files
ChouJuGEO/modules/semantic_expander.py
T
2026-04-30 18:37:46 +08:00

394 lines
14 KiB
Python

"""
语义足迹扩展模块
基于现有关键词,通过语义相似度扩展出更多相关关键词,提升关键词覆盖面
"""
from typing import List, Dict, Set, Optional
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
import json
import re
from difflib import SequenceMatcher
class SemanticExpander:
"""语义足迹扩展器"""
def __init__(self):
# 语义扩展 Prompt
self.expansion_prompt_template = """
你是关键词扩展专家,专门基于现有关键词生成语义相关的扩展关键词,提升关键词覆盖面。
【现有关键词】
{existing_keywords}
【品牌】{brand}
【优势】{advantages}
【扩展数量】{expansion_count}
【语义足迹扩展要求】
1. **语义相关性**
- 生成的关键词必须与现有关键词在语义上相关
- 覆盖相同的搜索意图,但使用不同的表达方式
- 包含同义词、近义词、相关概念
2. **覆盖面扩展**
- 从不同角度扩展:功能角度、场景角度、用户角度、问题角度
- 包含长尾词变体:更具体、更细分、更口语化
- 覆盖相关领域:上下游、关联概念、延伸话题
3. **多样性**
- 避免与现有关键词重复或过于相似
- 使用不同的表达方式(口语化、正式、专业等)
- 包含不同长度(短词、长尾词)
4. **质量要求**
- 保持自然、符合用户搜索习惯
- 长度控制在 8-30 字
- 避免生硬拼接
【扩展策略】
1. **同义扩展**:使用同义词替换关键词中的核心词
- 示例:"管理软件""管理系统""业务软件"
2. **场景扩展**:添加使用场景或应用场景
- 示例:"CRM系统""小型企业CRM""跨境电商CRM"
3. **问题扩展**:转换为问题形式
- 示例:"ERP推荐""ERP哪个好""如何选择ERP"
4. **功能扩展**:突出不同功能点
- 示例:"ERP系统""订单管理软件""库存管理ERP"
5. **长尾扩展**:生成更具体的长尾词
- 示例:"CRM软件""适合小企业的CRM软件""支持多语言的CRM系统"
【输出格式】
请严格按照以下 JSON 格式输出,不要添加任何其他内容:
{{
"expanded_keywords": [
"<扩展关键词1>",
"<扩展关键词2>",
...
],
"expansion_stats": {{
"total_expanded": <扩展总数>,
"synonym_count": <同义扩展数量>,
"scenario_count": <场景扩展数量>,
"question_count": <问题扩展数量>,
"feature_count": <功能扩展数量>,
"longtail_count": <长尾扩展数量>
}},
"expansion_details": [
{{
"original": "<原关键词>",
"expanded": ["<扩展词1>", "<扩展词2>"],
"type": "<扩展类型:同义/场景/问题/功能/长尾>"
}},
...
]
}}
【开始扩展】
"""
def expand_keywords(
self,
existing_keywords: List[str],
brand: str,
advantages: str,
expansion_count: int,
llm_chain
) -> Dict:
"""
基于现有关键词进行语义扩展
Args:
existing_keywords: 现有关键词列表
brand: 品牌名称
advantages: 品牌优势
expansion_count: 期望扩展的关键词数量
llm_chain: LangChain 链对象
Returns:
包含扩展关键词、统计信息和详细信息的字典
"""
if not existing_keywords:
return {
"expanded_keywords": [],
"expansion_stats": {
"total_expanded": 0,
"synonym_count": 0,
"scenario_count": 0,
"question_count": 0,
"feature_count": 0,
"longtail_count": 0
},
"expansion_details": []
}
try:
# 限制输入关键词数量,避免 Prompt 过长
keywords_to_expand = existing_keywords[:50] # 最多处理50个关键词
prompt = PromptTemplate.from_template(self.expansion_prompt_template)
chain = prompt | llm_chain | StrOutputParser()
result = chain.invoke({
"existing_keywords": json.dumps(keywords_to_expand, ensure_ascii=False, indent=2),
"brand": brand,
"advantages": advantages,
"expansion_count": expansion_count
})
# 解析结果
expansion_data = self._parse_expansion_result(result, existing_keywords)
return expansion_data
except Exception as e:
# 如果扩展失败,返回基于规则的简单扩展
return self._rule_based_expansion(existing_keywords, expansion_count)
def _parse_expansion_result(self, result: str, original_keywords: List[str]) -> Dict:
"""解析扩展结果"""
# 尝试提取 JSON
json_match = re.search(r'\{.*\}', result, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
# 验证数据结构
if "expanded_keywords" in data:
# 去重和过滤
expanded = self._deduplicate_keywords(
data["expanded_keywords"],
original_keywords
)
data["expanded_keywords"] = expanded
# 更新统计信息
if "expansion_stats" in data:
data["expansion_stats"]["total_expanded"] = len(expanded)
return data
except json.JSONDecodeError:
pass
# 如果无法解析 JSON,尝试从文本中提取
return self._extract_keywords_from_text(result, original_keywords)
def _extract_keywords_from_text(self, text: str, original_keywords: List[str]) -> Dict:
"""从文本中提取关键词(备用方案)"""
# 尝试提取数组
array_match = re.search(r'\[[\s\S]*?\]', text)
if array_match:
try:
keywords = json.loads(array_match.group())
if isinstance(keywords, list):
expanded = self._deduplicate_keywords(keywords, original_keywords)
return {
"expanded_keywords": expanded,
"expansion_stats": {
"total_expanded": len(expanded),
"synonym_count": 0,
"scenario_count": 0,
"question_count": 0,
"feature_count": 0,
"longtail_count": 0
},
"expansion_details": []
}
except json.JSONDecodeError:
pass
# 如果还是无法解析,使用基于规则的扩展
return self._rule_based_expansion(original_keywords, len(original_keywords))
def _deduplicate_keywords(
self,
expanded_keywords: List[str],
original_keywords: List[str],
similarity_threshold: float = 0.85
) -> List[str]:
"""
去重和过滤扩展关键词
Args:
expanded_keywords: 扩展的关键词列表
original_keywords: 原始关键词列表
similarity_threshold: 相似度阈值
Returns:
去重后的关键词列表
"""
if not expanded_keywords:
return []
# 转换为小写用于比较
original_lower = [k.lower() for k in original_keywords]
seen = set(original_lower)
deduplicated = []
for keyword in expanded_keywords:
if not isinstance(keyword, str):
continue
keyword = keyword.strip()
if not keyword or len(keyword) < 3:
continue
keyword_lower = keyword.lower()
# 检查是否与原始关键词重复
if keyword_lower in seen:
continue
# 检查是否与已添加的关键词相似
is_similar = False
for existing in seen:
similarity = SequenceMatcher(None, keyword_lower, existing).ratio()
if similarity >= similarity_threshold:
is_similar = True
break
if not is_similar:
seen.add(keyword_lower)
deduplicated.append(keyword)
return deduplicated
def _rule_based_expansion(
self,
keywords: List[str],
max_expansion: int = 20
) -> Dict:
"""
基于规则的简单扩展(备用方案,不依赖 LLM)
Args:
keywords: 原始关键词列表
max_expansion: 最大扩展数量
Returns:
扩展结果字典
"""
expanded = []
# 简单的扩展规则
question_markers = ["哪个好", "哪家好", "如何选择", "怎么选", "推荐", "排行"]
scenario_markers = ["适合", "适用于", "针对", "面向"]
feature_markers = ["功能", "特点", "优势", "特色"]
for keyword in keywords[:10]: # 限制处理数量
if not keyword or len(keyword) < 3:
continue
# 问题形式扩展
for marker in question_markers[:2]: # 只生成2个问题形式
if marker not in keyword:
expanded.append(f"{keyword}{marker}")
if len(expanded) >= max_expansion:
break
if len(expanded) >= max_expansion:
break
# 场景扩展
for marker in scenario_markers[:1]: # 只生成1个场景形式
if marker not in keyword:
expanded.append(f"{marker}{keyword}")
if len(expanded) >= max_expansion:
break
if len(expanded) >= max_expansion:
break
return {
"expanded_keywords": expanded[:max_expansion],
"expansion_stats": {
"total_expanded": len(expanded[:max_expansion]),
"synonym_count": 0,
"scenario_count": len([k for k in expanded if any(m in k for m in scenario_markers)]),
"question_count": len([k for k in expanded if any(m in k for m in question_markers)]),
"feature_count": 0,
"longtail_count": 0
},
"expansion_details": []
}
def analyze_expansion_coverage(
self,
original_keywords: List[str],
expanded_keywords: List[str]
) -> Dict:
"""
分析扩展的覆盖面
Args:
original_keywords: 原始关键词列表
expanded_keywords: 扩展后的关键词列表
Returns:
覆盖面分析结果
"""
if not original_keywords or not expanded_keywords:
return {
"coverage_ratio": 0.0,
"expansion_ratio": 0.0,
"unique_keywords": 0,
"categories": {}
}
# 计算扩展比例
expansion_ratio = len(expanded_keywords) / len(original_keywords) if original_keywords else 0
# 分析关键词类别(简单分类)
categories = {
"question": len([k for k in expanded_keywords if any(m in k for m in ["哪个", "如何", "怎么", "什么"])]),
"scenario": len([k for k in expanded_keywords if any(m in k for m in ["适合", "适用于", "针对"])]),
"comparison": len([k for k in expanded_keywords if any(m in k for m in ["对比", "比较", "区别"])]),
"feature": len([k for k in expanded_keywords if any(m in k for m in ["功能", "特点", "优势"])]),
"other": 0
}
categories["other"] = len(expanded_keywords) - sum(categories.values())
return {
"coverage_ratio": min(expansion_ratio, 5.0), # 最多5倍扩展
"expansion_ratio": expansion_ratio,
"unique_keywords": len(set(expanded_keywords)),
"categories": categories
}
def merge_keywords(
self,
original_keywords: List[str],
expanded_keywords: List[str],
merge_strategy: str = "append"
) -> List[str]:
"""
合并原始关键词和扩展关键词
Args:
original_keywords: 原始关键词列表
expanded_keywords: 扩展关键词列表
merge_strategy: 合并策略
- "append": 追加扩展关键词到原始列表
- "replace": 用扩展关键词替换原始列表
- "interleave": 交替插入
Returns:
合并后的关键词列表
"""
if merge_strategy == "replace":
return expanded_keywords
elif merge_strategy == "interleave":
# 交替插入
merged = []
max_len = max(len(original_keywords), len(expanded_keywords))
for i in range(max_len):
if i < len(original_keywords):
merged.append(original_keywords[i])
if i < len(expanded_keywords):
merged.append(expanded_keywords[i])
return merged
else: # append
return original_keywords + expanded_keywords