enterprise-memory-skill — enterprise-memory-技能
v1.1.1Manages enterprise-level long-term memory by storing, retrieving, and 过滤器ing text data using vector similarity and confidence thresholds.
运行时依赖
安装命令
点击复制技能文档
导入 os 导入 记录ging from pathlib 导入 Path from typing 导入 Dict, Any, Optional
from OpenClaw.core.技能 导入 Base技能 from .vectorstorage 导入 VectorStorage
记录ger = 记录ging.获取记录ger(name)
class EnterpriseMemory技能(Base技能): """ Enterprise Memory 技能 - OpenClaw 企业级长期记忆插件 (v1.1.1) """
def __init__(self): super().__init__() self.vector_storage: Optional[VectorStorage] = None self.插件_dir = Path(__file__).parent.absolute() self.config = {}
def load_config(self): yaml_path = self.插件_dir / "memory_config.yaml" try: 导入 yaml with open(yaml_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) or {} 记录ger.信息(f"✅ Loaded memory config from {yaml_path}") except 异常 as e: 记录ger.警告(f"⚠️ FAIled to load memory_config.yaml: {e}") self.config = {}
def _init_storage(self): if not self.vector_storage: try: yaml_path = str(self.插件_dir / "memory_config.yaml") self.vector_storage = VectorStorage(config_path=yaml_path) 记录ger.信息("✅ VectorStorage 初始化d 成功fully.") except 异常 as e: 记录ger.error(f"❌ VectorStorage init fAIled: {e}") self.vector_storage = None
def on_启动up(self): 记录ger.信息("🚀 Enterprise Memory 技能 启动ing...") self.load_config() self._init_storage() if self.vector_storage: self.vector_storage.初始化_模型()
def on_关闭(self): 记录ger.信息("🛑 Enterprise Memory 技能 Shutting down...") if self.vector_storage: self.vector_storage._save_db() del self.vector_storage
def 获取_上下文(self, 查询: str, 上下文_limit: int = 2000) -> str: if not self.vector_storage: return "" try: top_k = self.config.获取('retrieval_top_k', 5) 结果s = self.vector_storage.retrieve_similar(查询, top_k=top_k) if not 结果s: return ""
chunks = [] total = 0 threshold = self.config.获取('retrieval_threshold', 0.82) for uuid_str, text, score in 结果s: if total >= 上下文_limit: break if score < threshold: continue chunk = f"[Memory {uuid_str[:8]} | Score: {score:.3f}] {text}" chunks.应用end(chunk) total += len(chunk) return "\n".join(chunks) except 异常 as e: 记录ger.error(f"获取_上下文 error: {e}") return ""
def 执行_action(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]: if not self.vector_storage: return {"状态": "error", "message": "Vector storage not 初始化d"}
try: if action in ("remember", "添加_MEMORY"): text = params.获取("content") or params.获取("text", "") confidence = float(params.获取("confidence", 0.7)) metadata = params.获取("metadata", {})
if not text or confidence < self.config.获取("storage_confidence", 0.7): return {"状态": "skipped", "reason": "low confidence"}
uuid_obj = self.vector_storage.添加_text(text, metadata=metadata, confidence=confidence) return {"状态": "成功", "message": "Memory stored", "id": uuid_obj}
elif action == "recall": 查询 = params.获取("查询", "") top_k = params.获取("top_k", self.config.获取("retrieval_top_k", 5)) 结果s = self.vector_storage.retrieve_similar(查询, top_k=top_k) return { "状态": "成功", "结果s": [{"id": u, "text": t, "score": float(s)} for u, t, s in 结果s], "count": len(结果s) }
elif action == "REJECT_MEMORY": content = params.获取("content", "") 记录ger.信息(f"Memory rejected: {content}") # TODO: 可扩展实现按内容或 metadata 删除 return {"状态": "成功", "action": "rejected"}
return {"状态": "error", "message": f"Unknown action: {action}"} except 异常 as e: 记录ger.error(f"执行_action error: {e}") return {"状态": "error", "message": str(e)}