Files
server-configs/siyuan_reader.py
2026-02-13 22:24:27 +08:00

223 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
思源笔记阅读器
支持:本地文件读取 / API 读取
"""
import json
import os
from pathlib import Path
from datetime import datetime
# Configuration: base SiYuan directory and the data directory beneath it.
SIYUAN_DIR = "/root/.openclaw/workspace/siyuan"
DATA_DIR = f"{SIYUAN_DIR}/data"
def get_all_notebooks():
    """List the notebook directories found under ``DATA_DIR/notebooks``.

    Returns:
        A list of ``{"id": <directory name>, "path": <directory path>}``
        dicts, one per sub-directory, in ``os.listdir`` order. The list is
        empty when the notebooks directory does not exist.
    """
    base_dir = f"{DATA_DIR}/notebooks"
    if not os.path.exists(base_dir):
        return []

    # Plain files sitting next to the notebook directories are ignored.
    return [
        {"id": entry, "path": f"{base_dir}/{entry}"}
        for entry in os.listdir(base_dir)
        if os.path.isdir(f"{base_dir}/{entry}")
    ]
# Markdown decoration applied to each supported block type: (prefix, suffix).
_SY_BLOCK_MARKUP = {
    "paragraph": ("", ""),
    "heading_1": ("# ", ""),
    "heading_2": ("## ", ""),
    "heading_3": ("### ", ""),
    "bulleted_list_item": ("- ", ""),
    "numbered_list_item": ("1. ", ""),
    "code_block": ("```\n", "\n```"),
    "quote": ("> ", ""),
}


def _render_sy_block(block):
    """Render one block dict as Markdown-style text ('' if unsupported)."""
    block_type = block.get("type", "paragraph")
    # Non-string types cannot be table keys; treat them as unsupported.
    markup = (
        _SY_BLOCK_MARKUP.get(block_type)
        if isinstance(block_type, str)
        else None
    )
    if markup is None:
        return ""
    prefix, suffix = markup
    # The text runs live under a key named after the block type itself.
    runs = block.get(block_type, {}).get("rich_text", [])
    plain = "".join(run.get("plain_text", "") for run in runs)
    return f"{prefix}{plain}{suffix}"


def read_sy_file(file_path):
    """Read a ``.sy`` JSON file and flatten it to Markdown-style text.

    The file is expected to be a JSON object with an optional ``title`` and
    a ``children`` list of blocks. Each block carries a ``type`` (defaulting
    to ``paragraph``) and, under a key named after that type, a ``rich_text``
    list whose ``plain_text`` fields are concatenated. Blocks of an unknown
    type, and blocks that render to whitespace only, are skipped.

    NOTE(review): this is the Notion-style block layout; confirm that the
    ``.sy`` files being read really use this schema.

    Args:
        file_path: Path of the ``.sy`` file to read (UTF-8 encoded JSON).

    Returns:
        A dict with ``title``, ``content`` (rendered blocks joined by
        newlines) and ``blocks`` (number of rendered blocks). If anything
        goes wrong, a dict with the error title, the exception message as
        ``content`` and ``blocks`` set to 0 is returned instead.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        title = data.get("title", "无标题")
        rendered = (_render_sy_block(child) for child in data.get("children", []))
        content_blocks = [text for text in rendered if text.strip()]

        return {
            "title": title,
            "content": "\n".join(content_blocks),
            "blocks": len(content_blocks),
        }
    except Exception as e:
        # Deliberately broad: callers walk whole directories and index the
        # returned dict, so one unreadable or malformed file must not abort
        # the run.
        return {"title": "读取错误", "content": str(e), "blocks": 0}
def get_notebook_content(notebook_id):
    """Collect every ``.sy`` document stored under one notebook directory.

    Args:
        notebook_id: Directory name of the notebook under
            ``DATA_DIR/notebooks``.

    Returns:
        A list of the dicts produced by ``read_sy_file``, each extended with
        a ``file`` key holding the document's file name. The list is empty
        when the notebook directory does not exist.
    """
    notebook_path = f"{DATA_DIR}/notebooks/{notebook_id}"
    if not os.path.exists(notebook_path):
        return []

    collected = []
    # Walk recursively: documents may sit in nested sub-directories.
    for dirpath, _subdirs, filenames in os.walk(notebook_path):
        for filename in filenames:
            if not filename.endswith(".sy"):
                continue
            entry = read_sy_file(os.path.join(dirpath, filename))
            entry["file"] = filename
            collected.append(entry)
    return collected
def _match_snippet(content, position, width=200, lead=40):
    """Return up to *width* characters of *content* around *position*."""
    start = max(0, position - lead)
    return content[start:start + width]


def search_content(keyword):
    """Search every document of every notebook for *keyword*.

    The comparison is case-insensitive and looks only at the rendered
    document content, not at the title.

    Args:
        keyword: Text to look for.

    Returns:
        A list of dicts with ``notebook``, ``file``, ``title`` and ``match``
        keys, one per matching document. ``match`` is an excerpt of at most
        200 characters that starts shortly before the first hit, so the
        keyword is visible in the preview. When the hit is near the start of
        the document the excerpt is simply the first 200 characters.
    """
    needle = keyword.lower()  # lower-case once instead of once per document
    results = []
    for notebook in get_all_notebooks():
        for doc in get_notebook_content(notebook["id"]):
            content = doc["content"]
            # The index is taken in the lower-cased text; for the rare
            # characters whose lower-case form has a different length it can
            # drift slightly, which the lead-in of the excerpt absorbs.
            position = content.lower().find(needle)
            if position < 0:
                continue
            results.append({
                "notebook": notebook["id"],
                "file": doc["file"],
                "title": doc["title"],
                "match": _match_snippet(content, position),
            })
    return results
def generate_report():
    """Print a reading report covering every notebook.

    The report consists of a header with the current local time, the number
    of notebooks found, one line per document (file name, title and block
    count) and a closing line with the document and block totals.
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    print("\n" + heavy_rule)
    print("📚 思源笔记阅读报告")
    print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(heavy_rule)

    notebooks = get_all_notebooks()
    print(f"\n📁 发现 {len(notebooks)} 个笔记本\n")

    doc_count = 0
    block_count = 0
    for nb in notebooks:
        nb_docs = get_notebook_content(nb["id"])
        doc_count += len(nb_docs)
        for entry in nb_docs:
            block_count += entry["blocks"]
            print(f"📄 {entry['file']}: {entry['title']} ({entry['blocks']} 块内容)")

    print("\n" + light_rule)
    print(f"📊 统计: {doc_count} 篇文档, {block_count} 个内容块")
    print(heavy_rule)
def read_notebook(name):
    """Print every document of the notebook selected by *name*.

    A notebook whose id equals *name* exactly is preferred. Otherwise *name*
    is matched case-insensitively as a substring of the notebook ids. If no
    notebook matches, or the substring match is ambiguous, a message is
    printed and nothing is read.

    Args:
        name: Full notebook id, or a fragment of it.
    """
    notebooks = get_all_notebooks()

    # An exact id must win; otherwise a notebook whose id is a substring of
    # another id (e.g. "work" vs "work2") could never be selected.
    matched = [n for n in notebooks if n["id"] == name]
    if not matched:
        wanted = name.lower()
        matched = [n for n in notebooks if wanted in n["id"].lower()]

    if not matched:
        print(f"❌ 未找到笔记本: {name}")
        return
    if len(matched) > 1:
        print("⚠️ 找到多个匹配的笔记本:")
        for m in matched:
            print(f" - {m['id']}")
        return

    notebook = matched[0]
    docs = get_notebook_content(notebook["id"])
    print(f"\n📚 {notebook['id']}: {len(docs)} 篇文档\n")

    rule = "=" * 60
    for doc in docs:
        print(rule)
        print(f"📄 {doc['title']}")
        print(rule)
        print(doc["content"])
        print()
def main():
    """Command-line entry point.

    With no arguments the reading report is printed. Otherwise the first
    argument selects a command: ``list`` (notebooks with document counts),
    ``read <name>`` (print one notebook) or ``search <keyword>`` (print
    matching documents). Any other command prints the usage text.
    """
    import sys

    args = sys.argv[1:]
    if not args:
        generate_report()
        return

    command = args[0]
    # None means the argument was not supplied at all; "" is still passed on.
    operand = args[1] if len(args) > 1 else None

    if command == "list":
        notebooks = get_all_notebooks()
        print(f"\n📚 笔记本列表 ({len(notebooks)} 个)\n")
        for nb in notebooks:
            doc_total = len(get_notebook_content(nb["id"]))
            print(f" 📁 {nb['id']}: {doc_total} 篇文档")
        return

    if command == "read":
        if operand is None:
            print("用法: python siyuan_reader.py read <笔记本名>")
            return
        read_notebook(operand)
        return

    if command == "search":
        if operand is None:
            print("用法: python siyuan_reader.py search <关键词>")
            return
        hits = search_content(operand)
        print(f"\n🔍 搜索 '{operand}': {len(hits)} 条结果\n")
        for hit in hits:
            print(f" 📄 {hit['file']}: {hit['title']}")
            print(f" {hit['match'][:100]}...")
        return

    # Unknown command: show the usage text.
    usage_lines = (
        "用法:",
        " python siyuan_reader.py # 生成报告",
        " python siyuan_reader.py list # 列出笔记本",
        " python siyuan_reader.py read <名> # 读取笔记本",
        " python siyuan_reader.py search <词> # 搜索内容",
    )
    for usage_line in usage_lines:
        print(usage_line)


if __name__ == "__main__":
    main()