#!/usr/bin/env python3
"""SiYuan note reader.

Reads ``.sy`` documents from the local SiYuan data directory and offers a
small command-line interface: an overview report, a notebook listing, a
notebook dump and a keyword search.

The original header also advertises API access; only local file reading is
implemented in this file.
"""

import json
import os
import sys
from datetime import datetime
from pathlib import Path

# Configuration: root of the SiYuan workspace and its data directory.
SIYUAN_DIR = "/root/.openclaw/workspace/siyuan"
DATA_DIR = f"{SIYUAN_DIR}/data"

# Text placed in front of each simple block type when rendering a document.
# Numbered list items and code blocks need extra logic and are handled
# separately in _render_blocks.
_BLOCK_PREFIXES = {
    "paragraph": "",
    "heading_1": "# ",
    "heading_2": "## ",
    "heading_3": "### ",
    "bulleted_list_item": "• ",
    "quote": "> ",
}

# Search snippets are at most this long and start this many characters
# before the first hit, so the hit is visible in the first 100 characters
# printed by the CLI.
_SNIPPET_LENGTH = 200
_SNIPPET_LEAD = 40


def get_all_notebooks():
    """Return every notebook directory found under ``DATA_DIR/notebooks``.

    Returns:
        A list of ``{"id": <directory name>, "path": <directory path>}``
        dicts sorted by id. The list is empty when the notebooks directory
        does not exist (or is not a directory).
    """
    notebooks_dir = f"{DATA_DIR}/notebooks"
    if not os.path.isdir(notebooks_dir):
        return []

    notebooks = []
    # os.listdir returns entries in arbitrary order; sort for stable output.
    for name in sorted(os.listdir(notebooks_dir)):
        path = f"{notebooks_dir}/{name}"
        if os.path.isdir(path):
            notebooks.append({"id": name, "path": path})
    return notebooks


def _error_document(reason):
    """Build the placeholder document returned when a file cannot be read."""
    return {"title": "读取错误", "content": str(reason), "blocks": 0}


def _plain_text(child, block_type):
    """Join the ``plain_text`` of every rich-text run under ``child[block_type]``.

    Anything that does not have the expected shape (missing or ``null``
    payload, non-list ``rich_text``, non-string ``plain_text``) contributes
    an empty string instead of raising.
    """
    payload = child.get(block_type)
    if not isinstance(payload, dict):
        return ""
    runs = payload.get("rich_text")
    if not isinstance(runs, list):
        return ""
    return "".join(
        run["plain_text"]
        for run in runs
        if isinstance(run, dict) and isinstance(run.get("plain_text"), str)
    )


def _render_blocks(children):
    """Render a list of block dicts to text lines, skipping blank results."""
    rendered = []
    list_number = 0  # position inside the current run of numbered items
    for child in children:
        block_type = child.get("type", "paragraph") if isinstance(child, dict) else None
        if not isinstance(block_type, str):
            # Malformed entry: ignore it, and end any running numbered list.
            list_number = 0
            continue

        text = _plain_text(child, block_type)
        if block_type == "numbered_list_item":
            list_number += 1
            content = f"{list_number}. {text}"
        else:
            list_number = 0
            if block_type == "code_block":
                content = "```\n" + text + "\n```"
            elif block_type in _BLOCK_PREFIXES:
                content = _BLOCK_PREFIXES[block_type] + text
            else:
                # Unknown block types are not rendered.
                content = ""

        if content.strip():
            rendered.append(content)
    return rendered


def read_sy_file(file_path):
    """Read one ``.sy`` file and render its blocks as plain text.

    NOTE(review): the keys parsed here (``children`` / ``rich_text`` /
    ``plain_text``) resemble a Notion-style block export; confirm they match
    the ``.sy`` files actually produced by SiYuan.

    Args:
        file_path: Path of the JSON document to read.

    Returns:
        A dict with ``title`` (defaults to "无标题"), ``content`` (rendered
        blocks joined by newlines) and ``blocks`` (number of rendered
        blocks). If the file cannot be read or decoded, or its top level is
        not a JSON object, a placeholder titled "读取错误" is returned with
        the reason in ``content`` and ``blocks`` set to 0. Individual
        malformed blocks are skipped rather than failing the whole document.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return _error_document(exc)

    if not isinstance(data, dict):
        return _error_document("文档顶层不是 JSON 对象")

    children = data.get("children", [])
    if not isinstance(children, list):
        children = []

    content_blocks = _render_blocks(children)
    return {
        "title": data.get("title", "无标题"),
        "content": "\n".join(content_blocks),
        "blocks": len(content_blocks),
    }


def get_notebook_content(notebook_id):
    """Return every document stored under one notebook directory.

    Args:
        notebook_id: Directory name of the notebook under
            ``DATA_DIR/notebooks``.

    Returns:
        A list of document dicts as produced by ``read_sy_file``, each with
        an extra ``file`` key holding the file name. Files are visited in
        sorted order; the list is empty when the notebook does not exist.
    """
    notebook_path = f"{DATA_DIR}/notebooks/{notebook_id}"
    if not os.path.exists(notebook_path):
        return []

    docs = []
    for root, dirs, files in os.walk(notebook_path):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()
        for filename in sorted(files):
            if filename.endswith(".sy"):
                doc = read_sy_file(os.path.join(root, filename))
                doc["file"] = filename
                docs.append(doc)
    return docs


def search_content(keyword):
    """Search the rendered content of all documents, ignoring case.

    Args:
        keyword: Text to look for. An empty keyword matches every document.

    Returns:
        A list of dicts with ``notebook``, ``file``, ``title`` and ``match``.
        ``match`` is a snippet of up to 200 characters that begins shortly
        before the first hit (or at the start of the content when the hit is
        near the beginning).
    """
    needle = keyword.lower()
    results = []
    for notebook in get_all_notebooks():
        for doc in get_notebook_content(notebook["id"]):
            content = doc["content"]
            position = content.lower().find(needle)
            if position < 0:
                continue
            # lower() can change the length of a few Unicode characters, so
            # the offset is approximate for such text; it only positions the
            # snippet window.
            start = max(0, position - _SNIPPET_LEAD)
            results.append({
                "notebook": notebook["id"],
                "file": doc["file"],
                "title": doc["title"],
                "match": content[start:start + _SNIPPET_LENGTH],
            })
    return results


def generate_report():
    """Print an overview of all notebooks, their documents and block counts."""
    print("\n" + "="*70)
    print("📚 思源笔记阅读报告")
    print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print("="*70)

    notebooks = get_all_notebooks()
    print(f"\n📁 发现 {len(notebooks)} 个笔记本\n")

    total_docs = 0
    total_blocks = 0
    for notebook in notebooks:
        docs = get_notebook_content(notebook["id"])
        total_docs += len(docs)
        for doc in docs:
            total_blocks += doc["blocks"]
            print(f"📄 {doc['file']}: {doc['title']} ({doc['blocks']} 块内容)")

    print("\n" + "-"*70)
    print(f"📊 统计: {total_docs} 篇文档, {total_blocks} 个内容块")
    print("="*70)


def read_notebook(name):
    """Print every document of the notebook whose id matches ``name``.

    Matching is a case-insensitive substring test on the notebook id. When
    several notebooks match, one whose id equals ``name`` exactly is
    preferred; otherwise the candidates are listed and nothing is read.

    Args:
        name: Full or partial notebook id.
    """
    wanted = name.lower()
    matched = [n for n in get_all_notebooks() if wanted in n["id"].lower()]

    # Without this, a notebook whose id is a substring of another one
    # (e.g. "work" vs "work-archive") could never be selected.
    exact = [n for n in matched if n["id"] == name]
    if exact:
        matched = exact

    if not matched:
        print(f"❌ 未找到笔记本: {name}")
        return

    if len(matched) > 1:
        print("⚠️ 找到多个匹配的笔记本:")
        for m in matched:
            print(f" - {m['id']}")
        return

    notebook = matched[0]
    docs = get_notebook_content(notebook["id"])
    print(f"\n📚 {notebook['id']}: {len(docs)} 篇文档\n")

    for doc in docs:
        print("="*60)
        print(f"📄 {doc['title']}")
        print("="*60)
        print(doc["content"])
        print()


def main():
    """Dispatch the command given on the command line.

    With no arguments the overview report is printed. Supported commands are
    ``list``, ``read <notebook>`` and ``search <keyword>``; anything else
    prints the usage text.
    """
    if len(sys.argv) < 2:
        generate_report()
        return

    command = sys.argv[1]

    if command == "list":
        # List all notebooks with their document counts.
        notebooks = get_all_notebooks()
        print(f"\n📚 笔记本列表 ({len(notebooks)} 个)\n")
        for n in notebooks:
            docs = get_notebook_content(n["id"])
            print(f" 📁 {n['id']}: {len(docs)} 篇文档")

    elif command == "read":
        # Dump one notebook.
        if len(sys.argv) < 3:
            print("用法: python siyuan_reader.py read <笔记本名>")
            return
        read_notebook(sys.argv[2])

    elif command == "search":
        # Keyword search across all notebooks.
        if len(sys.argv) < 3:
            print("用法: python siyuan_reader.py search <关键词>")
            return
        keyword = sys.argv[2]
        results = search_content(keyword)
        print(f"\n🔍 搜索 '{keyword}': {len(results)} 条结果\n")
        for r in results:
            print(f" 📄 {r['file']}: {r['title']}")
            print(f" {r['match'][:100]}...")

    else:
        print("用法:")
        print(" python siyuan_reader.py # 生成报告")
        print(" python siyuan_reader.py list # 列出笔记本")
        print(" python siyuan_reader.py read <名> # 读取笔记本")
        print(" python siyuan_reader.py search <词> # 搜索内容")


if __name__ == "__main__":
    main()