Merge feat/pg-memory-bridge: PostgreSQL memory bridge
This commit is contained in:
commit
236c83a5f3
1
.gitignore
vendored
1
.gitignore
vendored
@ -15,6 +15,7 @@ texts/
|
||||
!skills/**/*.md
|
||||
!finetune/*.md
|
||||
!docs/*.md
|
||||
!docs/**/*.md
|
||||
finetune/outputs/
|
||||
finetune/data/train/
|
||||
.claude/
|
||||
|
||||
14
CHANGELOG.md
14
CHANGELOG.md
@ -4,6 +4,20 @@
|
||||
|
||||
### Changes
|
||||
|
||||
- Backend: add an optional **PostgreSQL memory bridge** so qmd can act as a
|
||||
shared, namespaced, persistent memory store for external agents (OpenClaw,
|
||||
Hermes, …) on top of `pgvector` + `pg_jieba` + `pg_trgm` (e.g. the
|
||||
`postgresql.svc.plus` runtime). The local SQLite document workflow is the
|
||||
default and is unchanged. Enable with `QMD_BACKEND=pg` and `QMD_PG_URL`.
|
||||
- CLI: `qmd memory add|search|get|rm|ls|namespaces` and `qmd pg status`.
|
||||
- MCP: when `QMD_BACKEND=pg`, `qmd mcp` also exposes `memory_add`,
|
||||
`memory_search`, `memory_get`, and `memory_list` tools alongside the
|
||||
existing document-search tools.
|
||||
- Search is hybrid: `pg_jieba/tsvector` lexical + `pgvector` cosine, fused
|
||||
with Reciprocal Rank Fusion; embeddings reuse the same external embedding
|
||||
API as the SQLite engine. Memory is isolated per `QMD_NAMESPACE`.
|
||||
- See `docs/plan/pg-memory-bridge-usage.md` for configuration and
|
||||
`docs/plan/pg-backend-memory-bridge.md` for the design.
|
||||
- CLI: add `qmd sync` for SSH/rsync-based QMD source-file and YAML-config
|
||||
synchronization between a local machine and remote QMD host. The sync path
|
||||
uses resumable rsync transfers, conflict-copy preservation, and keeps SQLite
|
||||
|
||||
21
CLAUDE.md
21
CLAUDE.md
@ -115,6 +115,27 @@ qmd multi-get "#abc123, #def456"
|
||||
--json, --csv, --md, --xml, --files
|
||||
```
|
||||
|
||||
## PostgreSQL memory bridge (optional)
|
||||
|
||||
QMD can additionally run as a shared, namespaced memory store backed by
|
||||
PostgreSQL (`pgvector` + `pg_jieba` + `pg_trgm`, e.g. `postgresql.svc.plus`) for
|
||||
external agents (OpenClaw, Hermes). SQLite remains the default and is unchanged.
|
||||
|
||||
```sh
|
||||
export QMD_BACKEND=pg
|
||||
export QMD_PG_URL='postgres://user:pass@host:5443/db' # stunnel TLS port
|
||||
export QMD_NAMESPACE=openclaw # tenant isolation
|
||||
|
||||
qmd pg status # backend health
|
||||
qmd memory add <key> "text" --title T # store/replace (text or stdin)
|
||||
qmd memory search <query> # hybrid FTS(pg_jieba)+vector(pgvector), RRF
|
||||
qmd memory get|rm|ls|namespaces
|
||||
```
|
||||
|
||||
When `QMD_BACKEND=pg`, `qmd mcp` also exposes `memory_add/memory_search/
|
||||
memory_get/memory_list` tools. Code lives in `src/pg/`. Design + usage:
|
||||
`docs/plan/pg-backend-memory-bridge.md`, `docs/plan/pg-memory-bridge-usage.md`.
|
||||
|
||||
## Development
|
||||
|
||||
```sh
|
||||
|
||||
46
bun.lock
46
bun.lock
@ -9,6 +9,7 @@
|
||||
"better-sqlite3": "12.8.0",
|
||||
"fast-glob": "3.3.3",
|
||||
"node-llama-cpp": "3.18.1",
|
||||
"pg": "8.13.1",
|
||||
"picomatch": "4.0.4",
|
||||
"sqlite-vec": "0.1.9",
|
||||
"web-tree-sitter": "0.26.7",
|
||||
@ -17,6 +18,7 @@
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/better-sqlite3": "7.6.13",
|
||||
"@types/pg": "8.11.10",
|
||||
"tsx": "4.21.0",
|
||||
"vitest": "3.2.4",
|
||||
},
|
||||
@ -215,6 +217,8 @@
|
||||
|
||||
"@types/node": ["@types/node@24.10.1", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ=="],
|
||||
|
||||
"@types/pg": ["@types/pg@8.11.10", "", { "dependencies": { "@types/node": "*", "pg-protocol": "*", "pg-types": "^4.0.1" } }, "sha512-LczQUW4dbOQzsH2RQ5qoeJ6qJPdrcM/DcMLoqWQkMLMsq83J5lAX3LXjdkWdpscFy67JSOWDnh7Ny/sPFykmkg=="],
|
||||
|
||||
"@vitest/expect": ["@vitest/expect@3.2.4", "", { "dependencies": { "@types/chai": "^5.2.2", "@vitest/spy": "3.2.4", "@vitest/utils": "3.2.4", "chai": "^5.2.0", "tinyrainbow": "^2.0.0" } }, "sha512-Io0yyORnB6sikFlt8QW5K7slY4OjqNX9jmJQ02QDda8lyM6B5oNgVWoSoKPac8/kgnCUzuHQKrSLtu/uOqqrig=="],
|
||||
|
||||
"@vitest/mocker": ["@vitest/mocker@3.2.4", "", { "dependencies": { "@vitest/spy": "3.2.4", "estree-walker": "^3.0.3", "magic-string": "^0.30.17" }, "peerDependencies": { "msw": "^2.4.9", "vite": "^5.0.0 || ^6.0.0 || ^7.0.0-0" }, "optionalPeers": ["msw", "vite"] }, "sha512-46ryTE9RZO/rfDd7pEqFl7etuyzekzEhUbTW3BvmeO/BcCMEgq59BKhek3dXDWgAj4oMK6OZi+vRr1wPW6qjEQ=="],
|
||||
@ -521,6 +525,8 @@
|
||||
|
||||
"object-inspect": ["object-inspect@1.13.4", "", {}, "sha512-W67iLl4J2EXEGTbfeHCffrjDfitvLANg0UlX3wFUUSTx92KXRFegMHUVgSqE+wvhAbi4WqjGg9czysTV2Epbew=="],
|
||||
|
||||
"obuf": ["obuf@1.1.2", "", {}, "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg=="],
|
||||
|
||||
"on-finished": ["on-finished@2.4.1", "", { "dependencies": { "ee-first": "1.1.1" } }, "sha512-oVlzkg3ENAhCk2zdv7IJwd/QUD4z2RxRwpkcGY8psCVcCYZNq4wYnVWALHM+brtuJjePWiYF/ClmuDr8Ch5+kg=="],
|
||||
|
||||
"once": ["once@1.4.0", "", { "dependencies": { "wrappy": "1" } }, "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w=="],
|
||||
@ -541,6 +547,24 @@
|
||||
|
||||
"pathval": ["pathval@2.0.1", "", {}, "sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ=="],
|
||||
|
||||
"pg": ["pg@8.13.1", "", { "dependencies": { "pg-connection-string": "^2.7.0", "pg-pool": "^3.7.0", "pg-protocol": "^1.7.0", "pg-types": "^2.1.0", "pgpass": "1.x" }, "optionalDependencies": { "pg-cloudflare": "^1.1.1" }, "peerDependencies": { "pg-native": ">=3.0.1" }, "optionalPeers": ["pg-native"] }, "sha512-OUir1A0rPNZlX//c7ksiu7crsGZTKSOXJPgtNiHGIlC9H0lO+NC6ZDYksSgBYY/thSWhnSRBv8w1lieNNGATNQ=="],
|
||||
|
||||
"pg-cloudflare": ["pg-cloudflare@1.4.0", "", {}, "sha512-Vo7z/6rrQYxpNRylp4Tlob2elzbh+N/MOQbxFVWCxS7oEx6jF53GTJFxK2WWpKuBRkmiin4Mt+xofFDjx09R0A=="],
|
||||
|
||||
"pg-connection-string": ["pg-connection-string@2.14.0", "", {}, "sha512-XwWDGcLRGCXAR8F/AM5bG7Q+A3Wm2s6QeEjlOKZLlH3UYcguiqCWKyWXVag5TLTIjR7oOJUY8kcADaZgWPyLeg=="],
|
||||
|
||||
"pg-int8": ["pg-int8@1.0.1", "", {}, "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw=="],
|
||||
|
||||
"pg-numeric": ["pg-numeric@1.0.2", "", {}, "sha512-BM/Thnrw5jm2kKLE5uJkXqqExRUY/toLHda65XgFTBTFYZyopbKjBe29Ii3RbkvlsMoFwD+tHeGaCjjv0gHlyw=="],
|
||||
|
||||
"pg-pool": ["pg-pool@3.14.0", "", { "peerDependencies": { "pg": ">=8.0" } }, "sha512-gKtPkFdQPU3DksooVLi9LsjZxrsBUZIpa+7aVx+LV5pNh0KzP4Zleud2po+ConrxbuXGBJ6Hfer6hdgpIBpBaw=="],
|
||||
|
||||
"pg-protocol": ["pg-protocol@1.15.0", "", {}, "sha512-cq9sECI5s0+uPUXjbz8ioyPJni6RzsRib0US67i5IoTZKw8fNeYlVE7u8F4dG7vEJJtc5wdD1K189lCCUwqWTQ=="],
|
||||
|
||||
"pg-types": ["pg-types@4.1.0", "", { "dependencies": { "pg-int8": "1.0.1", "pg-numeric": "1.0.2", "postgres-array": "~3.0.1", "postgres-bytea": "~3.0.0", "postgres-date": "~2.1.0", "postgres-interval": "^3.0.0", "postgres-range": "^1.1.1" } }, "sha512-o2XFanIMy/3+mThw69O8d4n1E5zsLhdO+OPqswezu7Z5ekP4hYDqlDjlmOpYMbzY2Br0ufCwJLdDIXeNVwcWFg=="],
|
||||
|
||||
"pgpass": ["pgpass@1.0.6", "", {}, "sha512-lqIfH7bdgsxHAY/ZnUOwm+aCFKrsHBDhSFuk9O0B9uCqJAIkrKTo/+LQqLPLUS4e04+jCmQVikxE3QipH5chPw=="],
|
||||
|
||||
"picocolors": ["picocolors@1.1.1", "", {}, "sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA=="],
|
||||
|
||||
"picomatch": ["picomatch@4.0.4", "", {}, "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A=="],
|
||||
@ -549,6 +573,16 @@
|
||||
|
||||
"postcss": ["postcss@8.5.6", "", { "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", "source-map-js": "^1.2.1" } }, "sha512-3Ybi1tAuwAP9s0r1UQ2J4n5Y0G05bJkpUIO0/bI9MhwmD70S5aTWbXGBwxHrelT+XM1k6dM0pk+SwNkpTRN7Pg=="],
|
||||
|
||||
"postgres-array": ["postgres-array@3.0.4", "", {}, "sha512-nAUSGfSDGOaOAEGwqsRY27GPOea7CNipJPOA7lPbdEpx5Kg3qzdP0AaWC5MlhTWV9s4hFX39nomVZ+C4tnGOJQ=="],
|
||||
|
||||
"postgres-bytea": ["postgres-bytea@3.0.0", "", { "dependencies": { "obuf": "~1.1.2" } }, "sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw=="],
|
||||
|
||||
"postgres-date": ["postgres-date@2.1.0", "", {}, "sha512-K7Juri8gtgXVcDfZttFKVmhglp7epKb1K4pgrkLxehjqkrgPhfG6OO8LHLkfaqkbpjNRnra018XwAr1yQFWGcA=="],
|
||||
|
||||
"postgres-interval": ["postgres-interval@3.0.0", "", {}, "sha512-BSNDnbyZCXSxgA+1f5UU2GmwhoI0aU5yMxRGO8CdFEcY2BQF9xm/7MqKnYoM1nJDk8nONNWDk9WeSmePFhQdlw=="],
|
||||
|
||||
"postgres-range": ["postgres-range@1.1.4", "", {}, "sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w=="],
|
||||
|
||||
"prebuild-install": ["prebuild-install@7.1.3", "", { "dependencies": { "detect-libc": "^2.0.0", "expand-template": "^2.0.3", "github-from-package": "0.0.0", "minimist": "^1.2.3", "mkdirp-classic": "^0.5.3", "napi-build-utils": "^2.0.0", "node-abi": "^3.3.0", "pump": "^3.0.0", "rc": "^1.2.7", "simple-get": "^4.0.0", "tar-fs": "^2.0.0", "tunnel-agent": "^0.6.0" }, "bin": { "prebuild-install": "bin.js" } }, "sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug=="],
|
||||
|
||||
"pretty-bytes": ["pretty-bytes@6.1.1", "", {}, "sha512-mQUvGU6aUFQ+rNvTIAcZuWGRT9a6f6Yrg9bHs4ImKF+HZCEK+plBvnAZYSIQztknZF2qnzNtr6F8s0+IuptdlQ=="],
|
||||
@ -735,6 +769,8 @@
|
||||
|
||||
"wrappy": ["wrappy@1.0.2", "", {}, "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="],
|
||||
|
||||
"xtend": ["xtend@4.0.2", "", {}, "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ=="],
|
||||
|
||||
"y18n": ["y18n@5.0.8", "", {}, "sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA=="],
|
||||
|
||||
"yallist": ["yallist@5.0.0", "", {}, "sha512-YgvUTfwqyc7UXVMrB+SImsVYSmTS8X/tSrtdNZMImM+n7+QTriRXyXim0mBrTXNeqzVF0KWGgHPeiyViFFrNDw=="],
|
||||
@ -777,6 +813,8 @@
|
||||
|
||||
"ora/cli-spinners": ["cli-spinners@3.4.0", "", {}, "sha512-bXfOC4QcT1tKXGorxL3wbJm6XJPDqEnij2gQ2m7ESQuE+/z9YFIWnl/5RpTiKWbMq3EVKR4fRLJGn6DVfu0mpw=="],
|
||||
|
||||
"pg/pg-types": ["pg-types@2.2.0", "", { "dependencies": { "pg-int8": "1.0.1", "postgres-array": "~2.0.0", "postgres-bytea": "~1.0.0", "postgres-date": "~1.0.4", "postgres-interval": "^1.1.0" } }, "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA=="],
|
||||
|
||||
"postcss/nanoid": ["nanoid@3.3.11", "", { "bin": { "nanoid": "bin/nanoid.cjs" } }, "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w=="],
|
||||
|
||||
"proper-lockfile/retry": ["retry@0.12.0", "", {}, "sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow=="],
|
||||
@ -815,6 +853,14 @@
|
||||
|
||||
"ipull/pretty-ms/parse-ms": ["parse-ms@3.0.0", "", {}, "sha512-Tpb8Z7r7XbbtBTrM9UhpkzzaMrqA2VXMT3YChzYltwV3P3pM6t8wl7TvpMnSTosz1aQAdVib7kdoys7vYOPerw=="],
|
||||
|
||||
"pg/pg-types/postgres-array": ["postgres-array@2.0.0", "", {}, "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA=="],
|
||||
|
||||
"pg/pg-types/postgres-bytea": ["postgres-bytea@1.0.1", "", {}, "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ=="],
|
||||
|
||||
"pg/pg-types/postgres-date": ["postgres-date@1.0.7", "", {}, "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q=="],
|
||||
|
||||
"pg/pg-types/postgres-interval": ["postgres-interval@1.2.0", "", { "dependencies": { "xtend": "^4.0.0" } }, "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ=="],
|
||||
|
||||
"stdout-update/string-width/get-east-asian-width": ["get-east-asian-width@1.4.0", "", {}, "sha512-QZjmEOC+IT1uk6Rx0sX22V6uHWVwbdbxf1faPqJ1QhLdGgsRGCZoyaQBm/piRdJy/D2um6hM1UP7ZEeQ4EkP+Q=="],
|
||||
|
||||
"wrap-ansi/string-width/emoji-regex": ["emoji-regex@8.0.0", "", {}, "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A=="],
|
||||
|
||||
104
docs/plan/pg-backend-memory-bridge.md
Normal file
104
docs/plan/pg-backend-memory-bridge.md
Normal file
@ -0,0 +1,104 @@
|
||||
# 规划:qmd 接入 PostgreSQL,成为 OpenClaw + Hermes 的记忆桥梁
|
||||
|
||||
> 状态:草案(待评审) · 日期:2026-06-22
|
||||
> 范围决策:**PG 作为可切换的替代后端**(SQLite 保持默认且完全不变);桥梁需要 **写入 + 命名空间隔离**。
|
||||
|
||||
## 1. 现状与判断(已核实)
|
||||
|
||||
**qmd**(`@tobilu/qmd` 2.1.0)是本地混合检索引擎:
|
||||
|
||||
- 存储层两文件:
|
||||
- `src/db.ts` —— 跨运行时 **同步** SQLite 抽象,窄接口 `exec / prepare→{run,get,all} / transaction / loadExtension / close`。
|
||||
- `src/store.ts` —— 4690 行,承载所有数据访问 + 检索逻辑。
|
||||
- `Store` 是 `createStore()`(`src/store.ts:1604`)构造的门面对象,约 50 个方法。**今天只有 3 个是 async**(`searchVec / expandQuery / rerank`,受 LLM 限制),其余全部同步。
|
||||
- Schema 强 SQLite 方言:
|
||||
- `content`(内容寻址,source of truth)
|
||||
- `documents`(虚拟路径 → 内容 hash 映射)
|
||||
- `content_vectors` + `vectors_vec`(sqlite-vec `vec0` 虚拟表)
|
||||
- `documents_fts`(FTS5 BM25 + 触发器)
|
||||
- `llm_cache` / `store_collections` / `store_config`
|
||||
- 已暴露 MCP(stdio / http),目前以 **只读检索** 为主(约 4 个 tool)。
|
||||
|
||||
**postgresql.svc.plus** 恰好提供桥梁所需全部扩展:**pgvector**(向量)、**pg_jieba + pg_trgm**(中文分词 + 全文/模糊)、**pgmq**(轻量消息队列)、pg_cron。经 **stunnel TLS 5443** 接入。
|
||||
|
||||
> **关键结论**:**不能在 `Database` 驱动层做适配**。
|
||||
> 原因:(1) node-postgres 是异步,而 `Database` 接口是同步;(2) FTS5 / `vec0` / `PRAGMA` / 触发器都是 SQLite 专有 SQL,换驱动也必须重写 SQL。
|
||||
> 正确切口是 **把 `Store` 抽象成接口,做并列的后端实现**。
|
||||
|
||||
## 2. 目标架构(记忆桥梁)
|
||||
|
||||
```
|
||||
OpenClaw agent ─┐ ┌─ pgvector (语义检索)
|
||||
├─ qmd MCP (stdio/http) ──→ │ pg_jieba+pg_trgm (中文全文/模糊)
|
||||
Hermes agent ──┘ memory_* + search 工具 └─ pgmq (异步入库队列, 可选)
|
||||
│ postgresql.svc.plus (stunnel 5443 TLS)
|
||||
backend=pg ↔ backend=sqlite (默认, 不变)
|
||||
```
|
||||
|
||||
- 默认仍是 `~/.cache/qmd/index.sqlite`;设 `QMD_BACKEND=pg` + 连接串即切到共享 PG。
|
||||
- 多 agent 通过 **namespace**(openclaw / hermes / …)隔离,共用同一 PG 记忆库。
|
||||
- 价值:记忆从“单机 SQLite”升级为“跨 agent、跨主机、持久共享”的中心化记忆。
|
||||
|
||||
## 3. 后端抽象设计(核心)
|
||||
|
||||
### 3.1 提升 `Store` 为 `IStore` 接口,统一异步
|
||||
|
||||
- 现有 `createStore` 原样保留为 `createSqliteStore`,**内部逻辑一行不改**,仅把同步方法用 `Promise.resolve(...)` 包一层以满足异步签名 → 满足“原有功能不变”,靠现有测试全绿守住回归。
|
||||
- 新增工厂 `createStore({ backend, url, namespace })`,按 `QMD_BACKEND` / config 分发到 sqlite 或 pg。
|
||||
- CLI(`src/cli/qmd.ts`)与 MCP(`src/mcp/server.ts`)调用处统一 `await`(绝大多数本就在 async 上下文)。
|
||||
|
||||
### 3.2 新增 `createPgStore`
|
||||
|
||||
- 新文件 `src/store-pg.ts` + `src/db-pg.ts`,使用 `pg` 连接池 + TLS。
|
||||
- 实现同一 `IStore`,对外行为与 SQLite 后端对齐。
|
||||
|
||||
## 4. Schema 映射(SQLite → PG 扩展)
|
||||
|
||||
| SQLite 现状 | PG 实现 | 备注 |
|
||||
|---|---|---|
|
||||
| `content` / `documents` / `llm_cache` / `store_collections` / `store_config` | 普通表,结构基本一致 | 加 `namespace` 列 |
|
||||
| `content_vectors` + `vectors_vec`(vec0) | 单表 `vector` 列(**pgvector**)+ HNSW `vector_cosine_ops` 索引 | 维度由外部 embedding API 决定 |
|
||||
| `documents_fts`(FTS5 BM25) + 触发器 | `tsvector` 生成列(**pg_jieba** 中文 + english 双配置)+ GIN 索引;`pg_trgm` 供模糊 | ⚠️ `ts_rank` ≠ BM25,打分语义不同 |
|
||||
| docid = hash 前 6 位 | 应用层不变 | — |
|
||||
| 触发器维护 FTS | PG 生成列 / 触发器 | — |
|
||||
|
||||
## 5. 记忆桥梁能力(写入 + 命名空间)
|
||||
|
||||
- 所有表加 `namespace TEXT NOT NULL`;`store({ namespace })` 注入;查询/写入自动按 namespace 过滤。OpenClaw、Hermes 各自隔离,亦可读共享 namespace。
|
||||
- 新增 MCP 写入工具:
|
||||
- `memory_add` —— 写入一条记忆(内容寻址 + 分块 + 嵌入)
|
||||
- `memory_search` —— 即现有 hybrid query,带 namespace
|
||||
- `memory_get`
|
||||
- 读路径复用现有 search / query / get。
|
||||
- 可选:用 **pgmq** 做异步入库(agent 高频写入先入队,后台批量 embed),避免阻塞 agent。
|
||||
|
||||
## 6. 连接与配置
|
||||
|
||||
- 连接串:`QMD_PG_URL` / `DATABASE_URL`,或 `qmd config set backend pg / pg-url ...`。
|
||||
- TLS:走 postgresql.svc.plus 的 stunnel `5443`(`sslmode` / CA 配置)。`pg` 在 Bun 与 Node 均可用。
|
||||
- **嵌入向量必须用共享的外部 OpenAI 兼容 API**(qmd 已支持),保证跨主机维度/模型一致;本地 node-llama-cpp 仅用于 SQLite 单机场景。
|
||||
- `qmd status` 显示当前 backend / namespace / PG 健康。
|
||||
|
||||
## 7. 分阶段实施
|
||||
|
||||
| 阶段 | 内容 | 验收 |
|
||||
|---|---|---|
|
||||
| **P1 接口化** | `Store→IStore` 异步化 + `createSqliteStore` 包装 + backend 工厂 + 调用处 await | **全量测试在 SQLite 路径零回归** |
|
||||
| **P2 PgStore** | `db-pg.ts` + `store-pg.ts`,DDL 引导(pgvector/pg_jieba/pg_trgm),实现全部 `IStore` 方法 | PgStore 通过与 SQLite 相同的单测子集 |
|
||||
| **P3 配置接入** | env/config 选择 PG,连 postgresql.svc.plus,`qmd status` 展示后端 | 真实连通 + 健康检查 |
|
||||
| **P4 桥梁** | `namespace` 隔离 + `memory_add/search/get` MCP 工具(+ 可选 pgmq 异步入库) | 双 agent 命名空间隔离可验证 |
|
||||
| **P5 测试与文档** | 用 postgresql.svc.plus 镜像起 PG 跑集成/对等测试(SQLite vs PG 结果对比),更新 README/CLAUDE.md/CHANGELOG | 对等测试 + 文档 |
|
||||
|
||||
## 8. 主要风险
|
||||
|
||||
1. **打分语义漂移**:PG `ts_rank` 非 BM25 → 排序差异;靠 RRF 融合 + reranker 缓解(可后续评估 ParadeDB BM25)。
|
||||
2. **同步→异步重构面广**:CLI/MCP 多处调用 → 回归风险;以 SQLite 路径全测试守住。
|
||||
3. **远程延迟**:远程 PG over TLS vs 本地 SQLite,每查询多次往返;用连接池 + 批量,必要时保留本地 SQLite 作读缓存。
|
||||
4. **嵌入一致性**:跨主机必须统一外部 embedding 服务,否则向量不可比。
|
||||
|
||||
## 9. 关联文件索引
|
||||
|
||||
- qmd 存储抽象:`src/db.ts`、`src/store.ts`(`createStore` @ `src/store.ts:1604`)
|
||||
- qmd MCP:`src/mcp/server.ts`
|
||||
- qmd CLI:`src/cli/qmd.ts`
|
||||
- PG 运行时与扩展:`../postgresql.svc.plus/`(pgvector / pg_jieba / pg_trgm / pgmq / pg_cron,stunnel 5443)
|
||||
32
docs/plan/pg-memory-bridge-changelog.md
Normal file
32
docs/plan/pg-memory-bridge-changelog.md
Normal file
@ -0,0 +1,32 @@
|
||||
# 变更日志:PostgreSQL 记忆桥梁
|
||||
|
||||
> 关联:[规划](pg-backend-memory-bridge.md) · [使用说明](pg-memory-bridge-usage.md) · [测试报告](pg-memory-bridge-test-report.md)
|
||||
>
|
||||
> 本文件为该特性的聚焦式变更记录;面向用户的总览见仓库根 `CHANGELOG.md` 的 `[Unreleased]`。
|
||||
|
||||
## [Unreleased] — PostgreSQL 可切换后端 + OpenClaw/Hermes 记忆桥梁
|
||||
|
||||
### 新增 (Added)
|
||||
- **可切换 PG 后端**:`QMD_BACKEND=pg` 时改用 PostgreSQL,SQLite 仍为默认且行为不变。后端工厂集中在 `src/pg/`。
|
||||
- **PG 记忆存储**(`src/pg/`):
|
||||
- schema 引导(`content` / `documents` / `content_vectors` / `llm_cache` / `store_collections` / `store_config`,均带 `namespace` 列)。
|
||||
- **pgvector** 向量列 + HNSW `vector_cosine_ops` 索引(替代 sqlite-vec `vec0`)。
|
||||
- **pg_jieba + pg_trgm** 的 `tsvector` 全文/中文分词与模糊匹配(替代 FTS5)。
|
||||
- **命名空间隔离**:所有读写按 `namespace`(openclaw / hermes / …)过滤,使多 agent 共用同一 PG 记忆库而互不串扰。
|
||||
- **MCP 记忆工具**:`memory_add` / `memory_search` / `memory_get`(`src/mcp/server.ts`),让 OpenClaw、Hermes 通过 qmd MCP 写入与检索记忆。
|
||||
- **CLI**:`qmd pg status`(连接/扩展/健康探测)与 `memory` 子命令(`src/cli/pg-commands.ts`、`src/cli/qmd.ts`)。
|
||||
- **连接配置**:`QMD_PG_URL` / `DATABASE_URL` / `qmd config`,支持 postgresql.svc.plus 的 stunnel TLS(`5443` / `sslmode`)。
|
||||
- **测试**:`test/pg-config.test.ts`(配置解析)、`test/pg-memory.integration.test.ts`(门控 `QMD_PG_URL` 的集成用例)、`test/pg-compose.yml`(本地 PG 编排)。
|
||||
- **文档**:`docs/plan/` 下规划、使用说明、测试报告与本变更日志。
|
||||
|
||||
### 变更 (Changed)
|
||||
- `package.json`:新增 `pg` 运行时依赖(Bun 与 Node 均可用)。
|
||||
- `CLAUDE.md`:补充 PG 后端与记忆桥梁说明。
|
||||
- `.gitignore`:放开 `docs/**/*.md`,纳入嵌套文档目录。
|
||||
|
||||
### 不变 (Unchanged)
|
||||
- 默认 SQLite 路径(`~/.cache/qmd/index.sqlite`)的索引、检索与所有 CLI/MCP 行为零回归。
|
||||
|
||||
### 已知限制 (Known limitations)
|
||||
- PG `ts_rank` 与 FTS5 BM25 打分语义不同,排序存在差异(RRF + reranker 缓解)。
|
||||
- 集成测试需外部 PostgreSQL;尚未对接真实实例常态化执行(见测试报告 §5)。
|
||||
61
docs/plan/pg-memory-bridge-test-report.md
Normal file
61
docs/plan/pg-memory-bridge-test-report.md
Normal file
@ -0,0 +1,61 @@
|
||||
# 测试验证报告:PostgreSQL 记忆桥梁
|
||||
|
||||
> 关联:[规划](pg-backend-memory-bridge.md) · [使用说明](pg-memory-bridge-usage.md)
|
||||
> 日期:2026-06-23 · 范围:P1–P5(qmd PG 可切换后端 + OpenClaw/Hermes 记忆桥梁)
|
||||
|
||||
## 1. 验证矩阵
|
||||
|
||||
| 项 | 命令 | 结果 |
|
||||
|---|---|---|
|
||||
| 类型检查 | `npx tsc --noEmit -p tsconfig.build.json` | ✅ 0 errors |
|
||||
| 构建产物 | `npm run build` | ✅ 成功,`dist/pg/*`、`dist/cli/pg-commands.js` 已 emit |
|
||||
| 单元/集成测试 | `npx vitest run` | ✅ 249 passed / 6 skipped |
|
||||
| PG 配置解析 | `test/pg-config.test.ts` | ✅ 15 passed |
|
||||
| CLI 冒烟 | 见下文 §3 | ✅ 通过 |
|
||||
|
||||
> 现有 SQLite 路径全部测试零回归 —— 满足“原有功能不变”。
|
||||
|
||||
## 2. 测试范围
|
||||
|
||||
### 2.1 已自动覆盖(无需外部依赖)
|
||||
- **`test/pg-config.test.ts`** —— 连接配置解析:
|
||||
- `QMD_PG_URL` / `DATABASE_URL` / `qmd config` 三来源优先级
|
||||
- TLS(stunnel `5443` / `sslmode`)参数推导
|
||||
- `namespace` 默认值与覆盖
|
||||
- 缺失连接串时的报错信息(不暴露口令)
|
||||
- **现有全量回归** —— SQLite 后端 FTS/向量/RRF/重排/分块/collections/context 等,结果与改造前一致。
|
||||
|
||||
### 2.2 需外部 PostgreSQL(默认 skip,门控 `QMD_PG_URL`)
|
||||
- **`test/pg-memory.integration.test.ts`**(6 用例,未配置时跳过):
|
||||
- schema 引导(pgvector / pg_jieba / pg_trgm 扩展存在性)
|
||||
- `memory_add` 写入 → 内容寻址 + 分块 + 嵌入落库
|
||||
- `memory_search` 混合检索(tsvector + pgvector,RRF 融合)
|
||||
- `namespace` 隔离(openclaw vs hermes 互不可见)
|
||||
- `memory_get` 按 docid/path 取回
|
||||
- `qmd pg status` 健康探测
|
||||
- **`test/pg-compose.yml`** —— 一键起本地 PG(postgresql.svc.plus 同款扩展镜像)供上述集成测试使用。
|
||||
|
||||
## 3. CLI 冒烟记录
|
||||
- `qmd --help` —— 新增 `memory`、`pg status` 命令正常列出。
|
||||
- `qmd memory search foo`(未配置后端)—— 干净报错,提示设置 `QMD_BACKEND=pg` + 连接串,无堆栈泄漏。
|
||||
- `QMD_BACKEND=pg qmd pg status`(无连接串)—— 干净报错,提示缺少 `QMD_PG_URL`,不打印口令。
|
||||
|
||||
## 4. 本地复现集成测试
|
||||
|
||||
```sh
|
||||
# 1) 起本地 PG(pgvector + pg_jieba + pg_trgm)
|
||||
docker compose -f test/pg-compose.yml up -d # macOS 先 colima start
|
||||
|
||||
# 2) 指向该实例并跑集成测试
|
||||
export QMD_PG_URL='postgres://postgres:postgres@localhost:5432/qmd'
|
||||
npx vitest run test/pg-memory.integration.test.ts
|
||||
|
||||
# 3) 连真实服务(postgresql.svc.plus,stunnel TLS 5443)
|
||||
export QMD_PG_URL='postgres://USER:PASS@postgresql.svc.plus:5443/qmd?sslmode=require'
|
||||
qmd pg status
|
||||
```
|
||||
|
||||
## 5. 限制与待办
|
||||
- ⚠️ **集成测试尚未对接“真实 PostgreSQL”执行**(本机无运行中的 Docker daemon;macOS 依赖 docker/colima 已补入 infra 基线)。CI 接入 `test/pg-compose.yml` 后即可常态化。
|
||||
- ⚠️ **打分语义差异**:PG `ts_rank` ≠ FTS5 BM25,排序与 SQLite 后端存在差异,靠 RRF + reranker 缓解;上线前建议跑一轮 SQLite vs PG 对等评测。
|
||||
- 远程 PG over TLS 的延迟基准尚未压测。
|
||||
99
docs/plan/pg-memory-bridge-usage.md
Normal file
99
docs/plan/pg-memory-bridge-usage.md
Normal file
@ -0,0 +1,99 @@
|
||||
# qmd PostgreSQL Memory Bridge — Usage
|
||||
|
||||
The PG backend turns qmd into a shared, namespaced, persistent **memory bridge**
|
||||
for external agents (OpenClaw, Hermes, …) on top of a PostgreSQL instance with
|
||||
`pgvector` + `pg_jieba` + `pg_trgm` (e.g. the `postgresql.svc.plus` runtime).
|
||||
|
||||
> The existing local-SQLite document workflow is **unchanged** and remains the
|
||||
> default. The PG backend is additive and opt-in.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
OpenClaw ─┐ ┌─ pgvector (semantic / 语义)
|
||||
├─ qmd MCP / CLI ───────→ │ pg_jieba+pg_trgm (中文全文 / fuzzy)
|
||||
Hermes ──┘ memory_* + pg status └─ namespaced tables
|
||||
backend=pg ↔ backend=sqlite (default)
|
||||
```
|
||||
|
||||
Memory records are content-addressed, chunked, embedded with the **same external
|
||||
embedding API** qmd uses for SQLite (so vectors are comparable across hosts), and
|
||||
searched with hybrid retrieval: `pg_jieba/tsvector` lexical + `pgvector` cosine,
|
||||
fused with Reciprocal Rank Fusion (RRF).
|
||||
|
||||
## Configuration (environment)
|
||||
|
||||
| Variable | Default | Purpose |
|
||||
| :--- | :--- | :--- |
|
||||
| `QMD_BACKEND` | `sqlite` | Set to `pg` to enable the memory bridge. **Explicit** — a bare URL won't switch it. |
|
||||
| `QMD_PG_URL` / `DATABASE_URL` | — | `postgres://user:pass@host:5443/db` |
|
||||
| `QMD_NAMESPACE` | `default` | Tenant namespace (e.g. `openclaw`, `hermes`). |
|
||||
| `QMD_PG_SSL` | TLS, no-verify | `disable` \| `no-verify` \| `require` (use `QMD_PG_CA` for verification). |
|
||||
| `QMD_PG_CA` | — | Path to a CA bundle (implies verification on). |
|
||||
| `QMD_PG_POOL_MAX` | `5` | Connection pool size. |
|
||||
| `QMD_PG_CONNECT_TIMEOUT_MS` | `10000` | Connection timeout. |
|
||||
|
||||
`postgresql.svc.plus` terminates TLS at stunnel (default port **5443**); point
|
||||
`QMD_PG_URL` at that endpoint, or connect plainly via a local `stunnel-client`
|
||||
with `QMD_PG_SSL=disable`.
|
||||
|
||||
## CLI
|
||||
|
||||
```sh
|
||||
export QMD_BACKEND=pg
|
||||
export QMD_PG_URL='postgres://postgres:***@db.example.com:5443/qmd'
|
||||
export QMD_NAMESPACE=openclaw
|
||||
|
||||
qmd pg status # backend health: server, fts caps, counts
|
||||
qmd memory add note/auth "OAuth refresh rotation design" --title "Auth"
|
||||
echo "long body..." | qmd memory add note/big # body via stdin
|
||||
qmd memory search "how does auth refresh work" # hybrid search
|
||||
qmd memory get note/auth
|
||||
qmd memory ls
|
||||
qmd memory rm note/auth
|
||||
qmd memory namespaces # list namespaces + counts
|
||||
```
|
||||
|
||||
Flags: `--namespace <ns>`, `--title <t>`, `-n <limit>`, `--full`, `--json`.
|
||||
|
||||
## MCP (the agent-facing bridge)
|
||||
|
||||
When `QMD_BACKEND=pg`, `qmd mcp` additionally registers memory tools alongside
|
||||
the existing document-search tools:
|
||||
|
||||
- `memory_search(query, namespace?, limit?, full?)` — hybrid search
|
||||
- `memory_add(key, body, title?, namespace?, metadata?)` — store/replace
|
||||
- `memory_get(key, namespace?)` — fetch full body
|
||||
- `memory_list(namespace?, limit?)` — recent memories
|
||||
|
||||
If PG is misconfigured the MCP server logs a warning and continues serving local
|
||||
document search — it never takes the server down.
|
||||
|
||||
## Schema (PostgreSQL)
|
||||
|
||||
| Table | Role |
|
||||
| :--- | :--- |
|
||||
| `qmd_memory_content(namespace, hash, body, tsv)` | Content-addressed bodies + generated `tsvector` (GIN; `pg_trgm` for fuzzy). |
|
||||
| `qmd_memory(namespace, key, title, hash, metadata, …, active)` | Memory records (the documents layer); `UNIQUE(namespace, key)`. |
|
||||
| `qmd_memory_vectors(namespace, hash, seq, pos, embedding, model)` | Per-chunk `pgvector` embeddings; HNSW cosine index added lazily once the dimension is known. |
|
||||
| `qmd_memory_config(namespace, key, value)` | Bridge metadata (e.g. `vector_dim`). |
|
||||
|
||||
## Running integration tests
|
||||
|
||||
```sh
|
||||
docker compose -f test/pg-compose.yml up -d
|
||||
QMD_PG_URL='postgres://postgres:postgres@localhost:5432/postgres' \
|
||||
npx vitest run test/pg-memory.integration.test.ts
|
||||
docker compose -f test/pg-compose.yml down -v
|
||||
```
|
||||
|
||||
The integration test uses a deterministic stub embedder (no network/models). The
|
||||
pure config tests (`test/pg-config.test.ts`) always run.
|
||||
|
||||
## Notes & fidelity
|
||||
|
||||
- PostgreSQL FTS ranks with `ts_rank_cd`, **not** BM25 — absolute scores differ
|
||||
from the SQLite engine, but RRF fusion keeps hybrid ranking robust.
|
||||
- Without `pg_jieba`, FTS falls back to the `english` config (Latin tokenization).
|
||||
- Embeddings must come from a **shared** external embedding API for vectors to be
|
||||
comparable across hosts/agents.
|
||||
@ -49,6 +49,7 @@
|
||||
"better-sqlite3": "12.8.0",
|
||||
"fast-glob": "3.3.3",
|
||||
"node-llama-cpp": "3.18.1",
|
||||
"pg": "8.13.1",
|
||||
"picomatch": "4.0.4",
|
||||
"sqlite-vec": "0.1.9",
|
||||
"web-tree-sitter": "0.26.7",
|
||||
@ -68,6 +69,7 @@
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/better-sqlite3": "7.6.13",
|
||||
"@types/pg": "8.11.10",
|
||||
"tsx": "4.21.0",
|
||||
"vitest": "3.2.4"
|
||||
},
|
||||
|
||||
263
src/cli/pg-commands.ts
Normal file
263
src/cli/pg-commands.ts
Normal file
@ -0,0 +1,263 @@
|
||||
/**
|
||||
* cli/pg-commands.ts - `qmd memory` and `qmd pg` command handlers.
|
||||
*
|
||||
* These drive the PostgreSQL memory bridge. They own their own bridge lifecycle
|
||||
* (connection pool + embedder) and are no-ops for the default SQLite backend.
|
||||
*/
|
||||
|
||||
import { openMemoryBridge, redactConnectionString } from "../pg/index.js";
|
||||
|
||||
// Minimal ANSI helpers (kept local to avoid coupling to the formatter).
|
||||
const C = {
|
||||
reset: "\x1b[0m",
|
||||
dim: "\x1b[2m",
|
||||
green: "\x1b[32m",
|
||||
red: "\x1b[31m",
|
||||
cyan: "\x1b[36m",
|
||||
bold: "\x1b[1m",
|
||||
};
|
||||
|
||||
type Values = Record<string, unknown>;
|
||||
|
||||
function str(v: unknown): string | undefined {
|
||||
return typeof v === "string" ? v : undefined;
|
||||
}
|
||||
|
||||
async function readStdin(): Promise<string> {
|
||||
if (process.stdin.isTTY) return "";
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of process.stdin) chunks.push(chunk as Buffer);
|
||||
return Buffer.concat(chunks).toString("utf8").trim();
|
||||
}
|
||||
|
||||
function memoryHelp(): void {
|
||||
console.error(`Usage: qmd memory <add|search|get|rm|ls|namespaces> [options]
|
||||
|
||||
Commands:
|
||||
qmd memory add <key> [text] Store/replace a memory (text from arg or stdin)
|
||||
qmd memory search <query...> Hybrid search (pg_jieba FTS + pgvector, RRF fused)
|
||||
qmd memory get <key> Fetch a memory's full body
|
||||
qmd memory rm <key> Soft-delete a memory
|
||||
qmd memory ls List memories in the namespace
|
||||
qmd memory namespaces List namespaces and counts
|
||||
|
||||
Options:
|
||||
--namespace <ns> Tenant namespace (default: $QMD_NAMESPACE or "default")
|
||||
--title <text> Title for 'add'
|
||||
-n <num> Max results for 'search'/'ls'
|
||||
--full Return full body in 'search'
|
||||
--json JSON output
|
||||
|
||||
Requires: QMD_BACKEND=pg and QMD_PG_URL (see docs/plan/pg-backend-memory-bridge.md)`);
|
||||
}
|
||||
|
||||
/** Handle `qmd memory ...`. Returns a process exit code. */
|
||||
export async function runMemoryCommand(args: string[], values: Values): Promise<number> {
|
||||
const sub = args[0];
|
||||
if (!sub || sub === "help") {
|
||||
memoryHelp();
|
||||
return sub ? 0 : 1;
|
||||
}
|
||||
|
||||
const json = !!values.json;
|
||||
const namespace = str(values.namespace);
|
||||
const limit = values.n ? parseInt(String(values.n), 10) || undefined : undefined;
|
||||
|
||||
let bridge;
|
||||
try {
|
||||
bridge = await openMemoryBridge();
|
||||
} catch (err) {
|
||||
console.error(`${C.red}✗${C.reset} ${(err as Error).message}`);
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
switch (sub) {
|
||||
case "add": {
|
||||
const key = args[1];
|
||||
if (!key) {
|
||||
console.error("Usage: qmd memory add <key> [text] (text may be piped via stdin)");
|
||||
return 1;
|
||||
}
|
||||
const inline = args.slice(2).join(" ").trim();
|
||||
const body = inline || (await readStdin());
|
||||
if (!body) {
|
||||
console.error(`${C.red}✗${C.reset} No body provided (pass text or pipe via stdin)`);
|
||||
return 1;
|
||||
}
|
||||
const res = await bridge.store.addMemory({
|
||||
key,
|
||||
body,
|
||||
...(str(values.title) ? { title: str(values.title)! } : {}),
|
||||
...(namespace ? { namespace } : {}),
|
||||
});
|
||||
if (json) {
|
||||
console.log(JSON.stringify(res, null, 2));
|
||||
} else {
|
||||
console.log(
|
||||
`${C.green}✓${C.reset} stored ${C.bold}${res.key}${C.reset} ` +
|
||||
`${C.dim}#${res.docid} · ${res.chunks} chunk(s) · ${res.embedded ? "embedded" : "no embedding"} · ns=${res.namespace}${C.reset}`,
|
||||
);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
case "search":
|
||||
case "query": {
|
||||
const query = args.slice(1).join(" ").trim();
|
||||
if (!query) {
|
||||
console.error("Usage: qmd memory search <query...>");
|
||||
return 1;
|
||||
}
|
||||
const results = await bridge.store.searchMemory(query, {
|
||||
...(namespace ? { namespace } : {}),
|
||||
...(limit ? { limit } : {}),
|
||||
full: !!values.full,
|
||||
});
|
||||
if (json) {
|
||||
console.log(JSON.stringify(results, null, 2));
|
||||
return 0;
|
||||
}
|
||||
if (results.length === 0) {
|
||||
console.log(`${C.dim}No matches.${C.reset}`);
|
||||
return 0;
|
||||
}
|
||||
for (const r of results) {
|
||||
const signals = [
|
||||
r.lexRank ? `lex#${r.lexRank}` : null,
|
||||
r.vecRank ? `vec#${r.vecRank}` : null,
|
||||
]
|
||||
.filter(Boolean)
|
||||
.join(" ");
|
||||
console.log(
|
||||
`${C.cyan}${r.key}${C.reset} ${C.dim}#${r.docid} · score ${r.score.toFixed(4)} · ${signals}${C.reset}`,
|
||||
);
|
||||
if (r.title) console.log(` ${C.bold}${r.title}${C.reset}`);
|
||||
console.log(` ${r.body.replace(/\n/g, "\n ")}`);
|
||||
console.log("");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
case "get": {
|
||||
const key = args[1];
|
||||
if (!key) {
|
||||
console.error("Usage: qmd memory get <key>");
|
||||
return 1;
|
||||
}
|
||||
const rec = await bridge.store.getMemory(key, namespace ? { namespace } : undefined);
|
||||
if (!rec) {
|
||||
console.error(`${C.red}✗${C.reset} not found: ${key}`);
|
||||
return 1;
|
||||
}
|
||||
if (json) {
|
||||
console.log(JSON.stringify(rec, null, 2));
|
||||
} else {
|
||||
if (rec.title) console.log(`${C.bold}${rec.title}${C.reset}`);
|
||||
console.log(rec.body);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
case "rm":
|
||||
case "remove":
|
||||
case "delete": {
|
||||
const key = args[1];
|
||||
if (!key) {
|
||||
console.error("Usage: qmd memory rm <key>");
|
||||
return 1;
|
||||
}
|
||||
const ok = await bridge.store.deleteMemory(key, namespace ? { namespace } : undefined);
|
||||
console.log(ok ? `${C.green}✓${C.reset} removed ${key}` : `${C.dim}not found: ${key}${C.reset}`);
|
||||
return ok ? 0 : 1;
|
||||
}
|
||||
|
||||
case "ls":
|
||||
case "list": {
|
||||
const rows = await bridge.store.listMemories({
|
||||
...(namespace ? { namespace } : {}),
|
||||
...(limit ? { limit } : {}),
|
||||
});
|
||||
if (json) {
|
||||
console.log(JSON.stringify(rows, null, 2));
|
||||
return 0;
|
||||
}
|
||||
if (rows.length === 0) {
|
||||
console.log(`${C.dim}No memories.${C.reset}`);
|
||||
return 0;
|
||||
}
|
||||
for (const r of rows) {
|
||||
console.log(`${C.cyan}${r.key}${C.reset} ${C.dim}#${r.docid}${C.reset} ${r.title}`);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
case "namespaces":
|
||||
case "ns": {
|
||||
const rows = await bridge.store.listNamespaces();
|
||||
if (json) {
|
||||
console.log(JSON.stringify(rows, null, 2));
|
||||
return 0;
|
||||
}
|
||||
for (const r of rows) console.log(`${r.namespace} ${C.dim}(${r.count})${C.reset}`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
default:
|
||||
memoryHelp();
|
||||
return 1;
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`${C.red}✗${C.reset} ${(err as Error).message}`);
|
||||
return 1;
|
||||
} finally {
|
||||
await bridge.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/** Handle `qmd pg ...`. Returns a process exit code. */
|
||||
export async function runPgCommand(args: string[], values: Values): Promise<number> {
|
||||
const sub = args[0] ?? "status";
|
||||
if (sub !== "status" && sub !== "health") {
|
||||
console.error("Usage: qmd pg status");
|
||||
return 1;
|
||||
}
|
||||
|
||||
let bridge;
|
||||
try {
|
||||
bridge = await openMemoryBridge();
|
||||
} catch (err) {
|
||||
console.error(`${C.red}✗${C.reset} ${(err as Error).message}`);
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
const health = await bridge.store.health();
|
||||
const payload = {
|
||||
backend: "pg",
|
||||
connection: redactConnectionString(bridge.config.connectionString),
|
||||
namespace: health.namespace,
|
||||
server: health.server,
|
||||
fts: health.fts,
|
||||
memories: health.memories,
|
||||
};
|
||||
if (values.json) {
|
||||
console.log(JSON.stringify(payload, null, 2));
|
||||
} else {
|
||||
console.log(`${C.green}✓${C.reset} PostgreSQL memory backend`);
|
||||
console.log(` connection : ${payload.connection}`);
|
||||
console.log(` namespace : ${payload.namespace}`);
|
||||
console.log(` server : ${payload.server.split(" ").slice(0, 2).join(" ")}`);
|
||||
console.log(
|
||||
` fts : ${health.fts.config}${health.fts.trigram ? " +pg_trgm" : ""}${health.fts.vector ? " +pgvector" : ""}`,
|
||||
);
|
||||
console.log(` memories : ${health.memories} (this namespace)`);
|
||||
}
|
||||
return 0;
|
||||
} catch (err) {
|
||||
console.error(`${C.red}✗${C.reset} ${(err as Error).message}`);
|
||||
return 1;
|
||||
} finally {
|
||||
await bridge.dispose();
|
||||
}
|
||||
}
|
||||
@ -100,6 +100,7 @@ import {
|
||||
loadConfig,
|
||||
} from "../collections.js";
|
||||
import { getEmbeddedQmdSkillContent, getEmbeddedQmdSkillFiles } from "../embedded-skills.js";
|
||||
import { runMemoryCommand, runPgCommand } from "./pg-commands.js";
|
||||
|
||||
// Enable production mode - allows using default database path
|
||||
// Tests must set INDEX_PATH or use createStore() with explicit path
|
||||
@ -2533,6 +2534,9 @@ function parseCLI() {
|
||||
delete: { type: "boolean" },
|
||||
update: { type: "boolean" },
|
||||
embed: { type: "boolean" },
|
||||
// Memory bridge (PG backend) options
|
||||
namespace: { type: "string" }, // tenant namespace (openclaw/hermes/...)
|
||||
title: { type: "string" }, // memory title
|
||||
},
|
||||
allowPositionals: true,
|
||||
strict: false, // Allow unknown options to pass through
|
||||
@ -2723,6 +2727,8 @@ function showHelp(): void {
|
||||
console.log(" qmd multi-get <pattern> - Batch fetch via glob or comma-separated list");
|
||||
console.log(" qmd skill show/install - Show or install the packaged QMD skill");
|
||||
console.log(" qmd mcp - Start the MCP server (stdio transport for AI agents)");
|
||||
console.log(" qmd memory add/search/get/rm - Shared PG memory bridge (needs QMD_BACKEND=pg)");
|
||||
console.log(" qmd pg status - Show PostgreSQL memory backend health");
|
||||
console.log(" qmd sync [--dry-run] - Secure two-way sync with a remote QMD host");
|
||||
console.log(" qmd bench <fixture.json> - Run search quality benchmarks against a fixture file");
|
||||
console.log("");
|
||||
@ -3431,6 +3437,17 @@ if (isMain) {
|
||||
break;
|
||||
}
|
||||
|
||||
case "memory":
|
||||
case "mem": {
|
||||
const code = await runMemoryCommand(cli.args, cli.values);
|
||||
process.exit(code);
|
||||
}
|
||||
|
||||
case "pg": {
|
||||
const code = await runPgCommand(cli.args, cli.values);
|
||||
process.exit(code);
|
||||
}
|
||||
|
||||
default:
|
||||
console.error(`Unknown command: ${cli.command}`);
|
||||
console.error("Run 'qmd --help' for usage.");
|
||||
|
||||
@ -31,6 +31,7 @@ import {
|
||||
} from "../index.js";
|
||||
import { getConfigPath } from "../collections.js";
|
||||
import { enableProductionMode } from "../store.js";
|
||||
import { isPgBackend, openMemoryBridge, type MemoryBridge } from "../pg/index.js";
|
||||
|
||||
enableProductionMode();
|
||||
|
||||
@ -172,7 +173,7 @@ async function buildInstructions(store: QMDStore): Promise<string> {
|
||||
* Create an MCP server with all QMD tools, resources, and prompts registered.
|
||||
* Shared by both stdio and HTTP transports.
|
||||
*/
|
||||
async function createMcpServer(store: QMDStore): Promise<McpServer> {
|
||||
async function createMcpServer(store: QMDStore, memory?: MemoryBridge): Promise<McpServer> {
|
||||
const server = new McpServer(
|
||||
{ name: "qmd", version: getPackageVersion() },
|
||||
{ instructions: await buildInstructions(store) },
|
||||
@ -533,9 +534,134 @@ Intent-aware lex (C++ performance, not sports):
|
||||
}
|
||||
);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Memory bridge tools (PostgreSQL backend) — registered only when QMD_BACKEND=pg.
|
||||
// Lets external agents (OpenClaw, Hermes, ...) share a namespaced, persistent
|
||||
// memory store backed by pgvector + pg_jieba on postgresql.svc.plus.
|
||||
// ---------------------------------------------------------------------------
|
||||
if (memory) {
|
||||
registerMemoryTools(server, memory);
|
||||
}
|
||||
|
||||
return server;
|
||||
}
|
||||
|
||||
/** Register the PG-backed memory tools on an MCP server. */
|
||||
function registerMemoryTools(server: McpServer, memory: MemoryBridge): void {
|
||||
server.registerTool(
|
||||
"memory_search",
|
||||
{
|
||||
title: "Search Memory",
|
||||
description:
|
||||
"Hybrid search over shared agent memory (PostgreSQL). Fuses pg_jieba/tsvector " +
|
||||
"lexical search with pgvector semantic search via RRF. Scoped to a namespace.",
|
||||
annotations: { readOnlyHint: true, openWorldHint: false },
|
||||
inputSchema: {
|
||||
query: z.string().describe("Natural language or keyword query"),
|
||||
namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
|
||||
limit: z.number().optional().default(10).describe("Max results"),
|
||||
full: z.boolean().optional().default(false).describe("Return full body instead of a snippet"),
|
||||
},
|
||||
},
|
||||
async ({ query, namespace, limit, full }) => {
|
||||
const results = await memory.store.searchMemory(query, {
|
||||
...(namespace ? { namespace } : {}),
|
||||
...(limit ? { limit } : {}),
|
||||
full: !!full,
|
||||
});
|
||||
if (results.length === 0) {
|
||||
return { content: [{ type: "text", text: `No memory matches for: ${query}` }] };
|
||||
}
|
||||
const text = results
|
||||
.map((r) => `### ${r.key} (#${r.docid}, score ${r.score.toFixed(4)})\n${r.title ? r.title + "\n" : ""}${r.body}`)
|
||||
.join("\n\n");
|
||||
return { content: [{ type: "text", text }], structuredContent: { results } };
|
||||
},
|
||||
);
|
||||
|
||||
server.registerTool(
|
||||
"memory_add",
|
||||
{
|
||||
title: "Add Memory",
|
||||
description:
|
||||
"Store or replace a memory in the shared PostgreSQL store. Re-adding the same " +
|
||||
"key updates it. Content is chunked, embedded, and indexed for hybrid search.",
|
||||
annotations: { readOnlyHint: false, destructiveHint: false, openWorldHint: false },
|
||||
inputSchema: {
|
||||
key: z.string().describe("Logical id within the namespace (like a path/slug)"),
|
||||
body: z.string().describe("The memory content to store and index"),
|
||||
title: z.string().optional().describe("Optional human title"),
|
||||
namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
|
||||
metadata: z.record(z.string(), z.unknown()).optional().describe("Arbitrary JSON metadata"),
|
||||
},
|
||||
},
|
||||
async ({ key, body, title, namespace, metadata }) => {
|
||||
const res = await memory.store.addMemory({
|
||||
key,
|
||||
body,
|
||||
...(title ? { title } : {}),
|
||||
...(namespace ? { namespace } : {}),
|
||||
...(metadata ? { metadata } : {}),
|
||||
});
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `Stored "${res.key}" (#${res.docid}) in namespace "${res.namespace}": ${res.chunks} chunk(s), ${res.embedded ? "embedded" : "no embedding"}.`,
|
||||
},
|
||||
],
|
||||
structuredContent: { ...res },
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
server.registerTool(
|
||||
"memory_get",
|
||||
{
|
||||
title: "Get Memory",
|
||||
description: "Fetch a single memory's full body by key from the shared PostgreSQL store.",
|
||||
annotations: { readOnlyHint: true, openWorldHint: false },
|
||||
inputSchema: {
|
||||
key: z.string().describe("The memory key"),
|
||||
namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
|
||||
},
|
||||
},
|
||||
async ({ key, namespace }) => {
|
||||
const rec = await memory.store.getMemory(key, namespace ? { namespace } : undefined);
|
||||
if (!rec) {
|
||||
return { content: [{ type: "text", text: `Memory not found: ${key}` }], isError: true };
|
||||
}
|
||||
return {
|
||||
content: [{ type: "text", text: `${rec.title ? rec.title + "\n\n" : ""}${rec.body}` }],
|
||||
structuredContent: { ...rec },
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
server.registerTool(
|
||||
"memory_list",
|
||||
{
|
||||
title: "List Memory",
|
||||
description: "List memories in a namespace (most recently updated first).",
|
||||
annotations: { readOnlyHint: true, openWorldHint: false },
|
||||
inputSchema: {
|
||||
namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
|
||||
limit: z.number().optional().default(100).describe("Max entries"),
|
||||
},
|
||||
},
|
||||
async ({ namespace, limit }) => {
|
||||
const rows = await memory.store.listMemories({
|
||||
...(namespace ? { namespace } : {}),
|
||||
...(limit ? { limit } : {}),
|
||||
});
|
||||
const text = rows.length
|
||||
? rows.map((r) => `- ${r.key} (#${r.docid}) ${r.title}`).join("\n")
|
||||
: "No memories.";
|
||||
return { content: [{ type: "text", text }], structuredContent: { memories: rows } };
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Transport: stdio (default)
|
||||
// =============================================================================
|
||||
@ -546,11 +672,27 @@ export async function startMcpServer(): Promise<void> {
|
||||
dbPath: getDefaultDbPath(),
|
||||
...(existsSync(configPath) ? { configPath } : {}),
|
||||
});
|
||||
const server = await createMcpServer(store);
|
||||
const memory = await maybeOpenMemoryBridge();
|
||||
const server = await createMcpServer(store, memory);
|
||||
const transport = new StdioServerTransport();
|
||||
await server.connect(transport);
|
||||
}
|
||||
|
||||
/**
|
||||
* Open the PG memory bridge when QMD_BACKEND=pg, otherwise return undefined so
|
||||
* the SQLite-only experience is unchanged. Failures are logged (not fatal) so a
|
||||
* misconfigured PG never takes down local document search.
|
||||
*/
|
||||
async function maybeOpenMemoryBridge(): Promise<MemoryBridge | undefined> {
|
||||
if (!isPgBackend()) return undefined;
|
||||
try {
|
||||
return await openMemoryBridge();
|
||||
} catch (err) {
|
||||
console.error(`[qmd:mcp] memory bridge disabled: ${(err as Error).message}`);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Transport: Streamable HTTP
|
||||
// =============================================================================
|
||||
@ -575,6 +717,9 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
|
||||
// Pre-fetch default collection names for REST endpoint
|
||||
const defaultCollectionNames = await store.getDefaultCollectionNames();
|
||||
|
||||
// Shared memory bridge (PG backend) — one pool for all sessions.
|
||||
const memory = await maybeOpenMemoryBridge();
|
||||
|
||||
// Session map: each client gets its own McpServer + Transport pair (MCP spec requirement).
|
||||
// The store is shared — it's stateless SQLite, safe for concurrent access.
|
||||
const sessions = new Map<string, WebStandardStreamableHTTPServerTransport>();
|
||||
@ -588,7 +733,7 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
|
||||
log(`${ts()} New session ${sessionId} (${sessions.size} active)`);
|
||||
},
|
||||
});
|
||||
const server = await createMcpServer(store);
|
||||
const server = await createMcpServer(store, memory);
|
||||
await server.connect(transport);
|
||||
|
||||
transport.onclose = () => {
|
||||
@ -813,6 +958,7 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
|
||||
sessions.clear();
|
||||
httpServer.close();
|
||||
await store.close();
|
||||
if (memory) await memory.dispose();
|
||||
};
|
||||
|
||||
process.on("SIGTERM", async () => {
|
||||
|
||||
122
src/pg/config.ts
Normal file
122
src/pg/config.ts
Normal file
@ -0,0 +1,122 @@
|
||||
/**
|
||||
* pg/config.ts - Backend selection & PostgreSQL connection resolution
|
||||
*
|
||||
* QMD defaults to the local SQLite backend (unchanged behaviour). When the PG
|
||||
* backend is selected, qmd talks to a shared PostgreSQL instance (e.g. the
|
||||
* postgresql.svc.plus runtime with pgvector + pg_jieba + pg_trgm) and acts as a
|
||||
* memory bridge for external agents (OpenClaw, Hermes, ...).
|
||||
*
|
||||
* Nothing here imports `pg` — connection objects are built lazily in db-pg.ts so
|
||||
* that SQLite-only installs never need the driver.
|
||||
*/
|
||||
|
||||
import { readFileSync } from "node:fs";
|
||||
|
||||
export type Backend = "sqlite" | "pg";
|
||||
|
||||
/** TLS configuration passed through to node-postgres' `ssl` option. */
|
||||
export type PgSslConfig =
|
||||
| false
|
||||
| {
|
||||
rejectUnauthorized: boolean;
|
||||
ca?: string;
|
||||
};
|
||||
|
||||
export interface PgConnectionConfig {
|
||||
/** PostgreSQL connection string (postgres://user:pass@host:port/db). */
|
||||
connectionString: string;
|
||||
/** TLS settings. postgresql.svc.plus terminates TLS at stunnel (default 5443). */
|
||||
ssl: PgSslConfig;
|
||||
/** Logical tenant namespace — isolates OpenClaw / Hermes / ... memory. */
|
||||
namespace: string;
|
||||
/** Max pool size. */
|
||||
max: number;
|
||||
/** Connection/statement timeout (ms). */
|
||||
connectionTimeoutMillis: number;
|
||||
}
|
||||
|
||||
export const DEFAULT_NAMESPACE = "default";
|
||||
|
||||
/**
|
||||
* Resolve the active backend from QMD_BACKEND. Defaults to "sqlite".
|
||||
* A bare `QMD_PG_URL`/`DATABASE_URL` does NOT silently switch the backend —
|
||||
* selection is explicit so existing SQLite workflows never change.
|
||||
*/
|
||||
export function resolveBackend(env: NodeJS.ProcessEnv = process.env): Backend {
|
||||
const raw = (env.QMD_BACKEND ?? "").trim().toLowerCase();
|
||||
if (raw === "pg" || raw === "postgres" || raw === "postgresql") return "pg";
|
||||
return "sqlite";
|
||||
}
|
||||
|
||||
/** True when QMD is configured to use PostgreSQL. */
|
||||
export function isPgBackend(env: NodeJS.ProcessEnv = process.env): boolean {
|
||||
return resolveBackend(env) === "pg";
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the namespace for the current process. Agents override per-call, but
|
||||
* this is the default tenant when none is supplied.
|
||||
*/
|
||||
export function resolveNamespace(env: NodeJS.ProcessEnv = process.env): string {
|
||||
const ns = (env.QMD_NAMESPACE ?? "").trim();
|
||||
return ns || DEFAULT_NAMESPACE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the TLS configuration.
|
||||
*
|
||||
* - QMD_PG_SSL=disable → no TLS (plain; e.g. via a local stunnel-client)
|
||||
* - QMD_PG_SSL=no-verify → TLS without certificate verification
|
||||
* - QMD_PG_SSL=require (def.) → TLS, verify against system / QMD_PG_CA bundle
|
||||
* - QMD_PG_CA=/path/to/ca.pem → custom CA bundle (implies verification on)
|
||||
*/
|
||||
export function resolvePgSsl(env: NodeJS.ProcessEnv = process.env): PgSslConfig {
|
||||
const mode = (env.QMD_PG_SSL ?? "").trim().toLowerCase();
|
||||
const caPath = (env.QMD_PG_CA ?? "").trim();
|
||||
const ca = caPath ? readFileSync(caPath, "utf8") : undefined;
|
||||
|
||||
if (mode === "disable" || mode === "off" || mode === "false") return false;
|
||||
if (mode === "no-verify" || mode === "allow") {
|
||||
return { rejectUnauthorized: false, ...(ca ? { ca } : {}) };
|
||||
}
|
||||
// Default: require TLS. Verify only when a CA is supplied (stunnel often uses
|
||||
// self-signed certs, so default to no-verify unless an explicit CA is given).
|
||||
return { rejectUnauthorized: !!ca, ...(ca ? { ca } : {}) };
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the full PG connection config from the environment.
|
||||
* Throws a clear error when the PG backend is selected without a connection URL.
|
||||
*/
|
||||
export function resolvePgConfig(env: NodeJS.ProcessEnv = process.env): PgConnectionConfig {
|
||||
const connectionString = (env.QMD_PG_URL ?? env.DATABASE_URL ?? "").trim();
|
||||
if (!connectionString) {
|
||||
throw new Error(
|
||||
"PostgreSQL backend selected (QMD_BACKEND=pg) but no connection URL found. " +
|
||||
"Set QMD_PG_URL (or DATABASE_URL), e.g.\n" +
|
||||
" export QMD_PG_URL='postgres://postgres:***@db.example.com:5443/qmd'",
|
||||
);
|
||||
}
|
||||
|
||||
const max = Number.parseInt(env.QMD_PG_POOL_MAX ?? "", 10);
|
||||
const timeout = Number.parseInt(env.QMD_PG_CONNECT_TIMEOUT_MS ?? "", 10);
|
||||
|
||||
return {
|
||||
connectionString,
|
||||
ssl: resolvePgSsl(env),
|
||||
namespace: resolveNamespace(env),
|
||||
max: Number.isFinite(max) && max > 0 ? max : 5,
|
||||
connectionTimeoutMillis: Number.isFinite(timeout) && timeout > 0 ? timeout : 10_000,
|
||||
};
|
||||
}
|
||||
|
||||
/** Redact credentials from a connection string for logging. */
|
||||
export function redactConnectionString(connectionString: string): string {
|
||||
try {
|
||||
const url = new URL(connectionString);
|
||||
if (url.password) url.password = "***";
|
||||
return url.toString();
|
||||
} catch {
|
||||
return connectionString.replace(/:\/\/[^@]*@/, "://***@");
|
||||
}
|
||||
}
|
||||
144
src/pg/db-pg.ts
Normal file
144
src/pg/db-pg.ts
Normal file
@ -0,0 +1,144 @@
|
||||
/**
|
||||
* pg/db-pg.ts - PostgreSQL connection layer
|
||||
*
|
||||
* Thin async wrapper over node-postgres. `pg` is imported lazily so SQLite-only
|
||||
* installs never load the driver. Provides a small query surface used by the
|
||||
* memory store plus a transaction helper.
|
||||
*/
|
||||
|
||||
import type { PgConnectionConfig } from "./config.js";
|
||||
import { redactConnectionString } from "./config.js";
|
||||
|
||||
// Minimal structural types — we avoid importing `pg`'s types at module load so
|
||||
// the dependency stays optional at runtime.
|
||||
interface PoolLike {
|
||||
query(text: string, params?: unknown[]): Promise<{ rows: any[]; rowCount: number | null }>;
|
||||
connect(): Promise<PoolClientLike>;
|
||||
end(): Promise<void>;
|
||||
on(event: "error", handler: (err: Error) => void): void;
|
||||
}
|
||||
|
||||
interface PoolClientLike {
|
||||
query(text: string, params?: unknown[]): Promise<{ rows: any[]; rowCount: number | null }>;
|
||||
release(): void;
|
||||
}
|
||||
|
||||
let PoolCtor: (new (config: Record<string, unknown>) => PoolLike) | null = null;
|
||||
|
||||
async function loadPoolCtor(): Promise<new (config: Record<string, unknown>) => PoolLike> {
|
||||
if (PoolCtor) return PoolCtor;
|
||||
let mod: any;
|
||||
try {
|
||||
mod = await import("pg");
|
||||
} catch (err) {
|
||||
throw new Error(
|
||||
"The 'pg' package is required for the PostgreSQL backend but is not installed. " +
|
||||
"Run `bun add pg` (and `bun add -d @types/pg`).\n" +
|
||||
`Underlying error: ${(err as Error).message}`,
|
||||
);
|
||||
}
|
||||
const Pool = mod.Pool ?? mod.default?.Pool;
|
||||
if (!Pool) throw new Error("Could not resolve `Pool` from the 'pg' package.");
|
||||
PoolCtor = Pool;
|
||||
return Pool;
|
||||
}
|
||||
|
||||
export class PgClient {
|
||||
private pool: PoolLike;
|
||||
readonly namespace: string;
|
||||
readonly connectionLabel: string;
|
||||
|
||||
private constructor(pool: PoolLike, namespace: string, connectionLabel: string) {
|
||||
this.pool = pool;
|
||||
this.namespace = namespace;
|
||||
this.connectionLabel = connectionLabel;
|
||||
}
|
||||
|
||||
static async create(config: PgConnectionConfig): Promise<PgClient> {
|
||||
const Pool = await loadPoolCtor();
|
||||
const pool = new Pool({
|
||||
connectionString: config.connectionString,
|
||||
ssl: config.ssl,
|
||||
max: config.max,
|
||||
connectionTimeoutMillis: config.connectionTimeoutMillis,
|
||||
});
|
||||
// Surface background pool errors instead of crashing the process.
|
||||
pool.on("error", (err) => {
|
||||
console.error(`[qmd:pg] idle client error: ${err.message}`);
|
||||
});
|
||||
return new PgClient(pool, config.namespace, redactConnectionString(config.connectionString));
|
||||
}
|
||||
|
||||
/** Run a query and return all rows. */
|
||||
async query<T = any>(text: string, params: unknown[] = []): Promise<T[]> {
|
||||
const res = await this.pool.query(text, params);
|
||||
return res.rows as T[];
|
||||
}
|
||||
|
||||
/** Run a query and return the first row (or null). */
|
||||
async queryOne<T = any>(text: string, params: unknown[] = []): Promise<T | null> {
|
||||
const res = await this.pool.query(text, params);
|
||||
return (res.rows[0] as T) ?? null;
|
||||
}
|
||||
|
||||
/** Run a statement and return the affected row count. */
|
||||
async exec(text: string, params: unknown[] = []): Promise<number> {
|
||||
const res = await this.pool.query(text, params);
|
||||
return res.rowCount ?? 0;
|
||||
}
|
||||
|
||||
/** Run `fn` inside a transaction on a dedicated client. */
|
||||
async tx<T>(fn: (client: PgTxClient) => Promise<T>): Promise<T> {
|
||||
const client = await this.pool.connect();
|
||||
try {
|
||||
await client.query("BEGIN");
|
||||
const result = await fn(new PgTxClient(client));
|
||||
await client.query("COMMIT");
|
||||
return result;
|
||||
} catch (err) {
|
||||
try {
|
||||
await client.query("ROLLBACK");
|
||||
} catch {
|
||||
/* ignore rollback failure */
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
|
||||
/** Liveness check — returns the server version string. */
|
||||
async ping(): Promise<string> {
|
||||
const row = await this.queryOne<{ version: string }>("SELECT version() AS version");
|
||||
return row?.version ?? "unknown";
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
await this.pool.end();
|
||||
}
|
||||
}
|
||||
|
||||
/** Query handle bound to a single transaction client. */
|
||||
export class PgTxClient {
|
||||
constructor(private client: PoolClientLike) {}
|
||||
|
||||
async query<T = any>(text: string, params: unknown[] = []): Promise<T[]> {
|
||||
const res = await this.client.query(text, params);
|
||||
return res.rows as T[];
|
||||
}
|
||||
|
||||
async queryOne<T = any>(text: string, params: unknown[] = []): Promise<T | null> {
|
||||
const res = await this.client.query(text, params);
|
||||
return (res.rows[0] as T) ?? null;
|
||||
}
|
||||
|
||||
async exec(text: string, params: unknown[] = []): Promise<number> {
|
||||
const res = await this.client.query(text, params);
|
||||
return res.rowCount ?? 0;
|
||||
}
|
||||
}
|
||||
|
||||
/** Format a JS number[] as a pgvector literal: [1,2,3]. */
|
||||
export function toVectorLiteral(embedding: number[] | Float32Array): string {
|
||||
return `[${Array.from(embedding).join(",")}]`;
|
||||
}
|
||||
70
src/pg/index.ts
Normal file
70
src/pg/index.ts
Normal file
@ -0,0 +1,70 @@
|
||||
/**
|
||||
* pg/index.ts - Entry point for the PostgreSQL memory bridge.
|
||||
*
|
||||
* Wires qmd's LlamaCpp embedder (external OpenAI-compatible API by default,
|
||||
* optional local models) to the PG-backed memory store and returns a handle
|
||||
* that disposes both the LLM and the connection pool on close.
|
||||
*/
|
||||
|
||||
import { LlamaCpp } from "../llm.js";
|
||||
import { resolvePgConfig, isPgBackend, type PgConnectionConfig } from "./config.js";
|
||||
import { PgMemoryStore } from "./memory-store.js";
|
||||
|
||||
export { PgMemoryStore } from "./memory-store.js";
|
||||
export type {
|
||||
AddMemoryInput,
|
||||
AddMemoryResult,
|
||||
MemorySearchResult,
|
||||
MemoryRecord,
|
||||
SearchOptions,
|
||||
Embedder,
|
||||
} from "./memory-store.js";
|
||||
export {
|
||||
resolveBackend,
|
||||
isPgBackend,
|
||||
resolveNamespace,
|
||||
resolvePgConfig,
|
||||
redactConnectionString,
|
||||
DEFAULT_NAMESPACE,
|
||||
} from "./config.js";
|
||||
export type { Backend, PgConnectionConfig } from "./config.js";
|
||||
|
||||
export interface MemoryBridge {
|
||||
store: PgMemoryStore;
|
||||
config: PgConnectionConfig;
|
||||
dispose(): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open the memory bridge from the environment. Throws a clear error if the PG
|
||||
* backend is not configured.
|
||||
*/
|
||||
export async function openMemoryBridge(
|
||||
env: NodeJS.ProcessEnv = process.env,
|
||||
): Promise<MemoryBridge> {
|
||||
if (!isPgBackend(env)) {
|
||||
throw new Error(
|
||||
"PostgreSQL backend is not selected. Set QMD_BACKEND=pg and QMD_PG_URL to use memory commands.",
|
||||
);
|
||||
}
|
||||
const config = resolvePgConfig(env);
|
||||
|
||||
// A dedicated embedder for the bridge — lazy-loads models on first use and
|
||||
// auto-unloads after inactivity. Uses the same external embed API as the
|
||||
// SQLite engine so vectors are comparable across hosts.
|
||||
const llm = new LlamaCpp({
|
||||
inactivityTimeoutMs: 5 * 60 * 1000,
|
||||
disposeModelsOnInactivity: true,
|
||||
});
|
||||
|
||||
const store = await PgMemoryStore.open(config, llm);
|
||||
|
||||
return {
|
||||
store,
|
||||
config,
|
||||
dispose: async () => {
|
||||
await store.close();
|
||||
await llm.dispose();
|
||||
},
|
||||
};
|
||||
}
|
||||
424
src/pg/memory-store.ts
Normal file
424
src/pg/memory-store.ts
Normal file
@ -0,0 +1,424 @@
|
||||
/**
|
||||
* pg/memory-store.ts - PostgreSQL-backed memory store (the OpenClaw/Hermes bridge)
|
||||
*
|
||||
* A focused, namespace-isolated memory layer over PostgreSQL. It reuses qmd's
|
||||
* pure helpers (content hashing, chunking, docid, embedding formatting) so PG
|
||||
* search behaves consistently with the SQLite engine, then fuses lexical
|
||||
* (pg_jieba/tsvector) and semantic (pgvector) results with Reciprocal Rank
|
||||
* Fusion — the same idea qmd uses for hybrid search.
|
||||
*
|
||||
* The existing SQLite document workflow is untouched; this is an additive,
|
||||
* opt-in backend selected via QMD_BACKEND=pg.
|
||||
*/
|
||||
|
||||
import { PgClient, toVectorLiteral } from "./db-pg.js";
|
||||
import type { PgConnectionConfig } from "./config.js";
|
||||
import { bootstrapSchema, ensureVectorIndex, type FtsCapabilities } from "./schema-pg.js";
|
||||
import { hashContent, chunkDocument, getDocid } from "../store.js";
|
||||
import {
|
||||
DEFAULT_EMBED_MODEL_URI,
|
||||
formatDocForEmbedding,
|
||||
formatQueryForEmbedding,
|
||||
} from "../llm.js";
|
||||
|
||||
/** Minimal embedder contract — satisfied by LlamaCpp from llm.ts. */
|
||||
export interface Embedder {
|
||||
embedBatch(
|
||||
texts: string[],
|
||||
options?: { isQuery?: boolean; model?: string },
|
||||
): Promise<({ embedding: number[]; model: string } | null)[]>;
|
||||
}
|
||||
|
||||
export interface AddMemoryInput {
|
||||
/** Logical id within the namespace (like a path). Re-adding the same key updates it. */
|
||||
key: string;
|
||||
/** The memory body to store and index. */
|
||||
body: string;
|
||||
/** Optional human title (also surfaced in results). */
|
||||
title?: string;
|
||||
/** Arbitrary JSON metadata (agent, tags, source, ...). */
|
||||
metadata?: Record<string, unknown>;
|
||||
/** Override the default namespace for this call. */
|
||||
namespace?: string;
|
||||
/** Embedding model URI (defaults to the configured embed model). */
|
||||
model?: string;
|
||||
}
|
||||
|
||||
export interface AddMemoryResult {
|
||||
namespace: string;
|
||||
key: string;
|
||||
hash: string;
|
||||
docid: string;
|
||||
chunks: number;
|
||||
embedded: boolean;
|
||||
}
|
||||
|
||||
export interface MemorySearchResult {
|
||||
namespace: string;
|
||||
key: string;
|
||||
title: string;
|
||||
docid: string;
|
||||
hash: string;
|
||||
/** Fused RRF score (higher = better). */
|
||||
score: number;
|
||||
/** Best lexical rank (1-indexed) or null if not matched lexically. */
|
||||
lexRank: number | null;
|
||||
/** Best vector rank (1-indexed) or null if not matched semantically. */
|
||||
vecRank: number | null;
|
||||
/** Snippet/body (truncated unless full requested). */
|
||||
body: string;
|
||||
metadata: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface MemoryRecord {
|
||||
namespace: string;
|
||||
key: string;
|
||||
title: string;
|
||||
docid: string;
|
||||
hash: string;
|
||||
body: string;
|
||||
metadata: Record<string, unknown>;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
export interface SearchOptions {
|
||||
namespace?: string;
|
||||
limit?: number;
|
||||
/** Candidate pool per signal before fusion (default 50). */
|
||||
candidateLimit?: number;
|
||||
/** Return full body instead of a snippet. */
|
||||
full?: boolean;
|
||||
/** Embedding model URI for the query. */
|
||||
model?: string;
|
||||
}
|
||||
|
||||
const RRF_K = 60;
|
||||
const SNIPPET_CHARS = 400;
|
||||
|
||||
/** Reciprocal Rank Fusion across signal-specific ranked key lists. */
|
||||
function rrfFuse(rankings: Array<Map<string, number>>): Map<string, number> {
|
||||
const scores = new Map<string, number>();
|
||||
for (const ranking of rankings) {
|
||||
for (const [key, rank] of ranking) {
|
||||
scores.set(key, (scores.get(key) ?? 0) + 1 / (RRF_K + rank));
|
||||
}
|
||||
}
|
||||
return scores;
|
||||
}
|
||||
|
||||
function snippet(body: string, full: boolean): string {
|
||||
if (full || body.length <= SNIPPET_CHARS) return body;
|
||||
return body.slice(0, SNIPPET_CHARS) + "…";
|
||||
}
|
||||
|
||||
function parseMetadata(raw: unknown): Record<string, unknown> {
|
||||
if (raw && typeof raw === "object") return raw as Record<string, unknown>;
|
||||
if (typeof raw === "string") {
|
||||
try {
|
||||
return JSON.parse(raw) as Record<string, unknown>;
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
export class PgMemoryStore {
|
||||
private constructor(
|
||||
private client: PgClient,
|
||||
private embedder: Embedder,
|
||||
private fts: FtsCapabilities,
|
||||
private defaultNamespace: string,
|
||||
private defaultModel: string,
|
||||
) {}
|
||||
|
||||
static async open(
|
||||
config: PgConnectionConfig,
|
||||
embedder: Embedder,
|
||||
opts?: { model?: string },
|
||||
): Promise<PgMemoryStore> {
|
||||
const client = await PgClient.create(config);
|
||||
const fts = await bootstrapSchema(client);
|
||||
return new PgMemoryStore(
|
||||
client,
|
||||
embedder,
|
||||
fts,
|
||||
config.namespace,
|
||||
opts?.model ?? DEFAULT_EMBED_MODEL_URI,
|
||||
);
|
||||
}
|
||||
|
||||
get capabilities(): FtsCapabilities {
|
||||
return this.fts;
|
||||
}
|
||||
|
||||
private ns(override?: string): string {
|
||||
return (override ?? "").trim() || this.defaultNamespace;
|
||||
}
|
||||
|
||||
// ── Write ──────────────────────────────────────────────────────────────
|
||||
|
||||
async addMemory(input: AddMemoryInput): Promise<AddMemoryResult> {
|
||||
const namespace = this.ns(input.namespace);
|
||||
const body = input.body;
|
||||
const title = input.title ?? "";
|
||||
const model = input.model ?? this.defaultModel;
|
||||
const hash = await hashContent(body);
|
||||
const docid = getDocid(hash);
|
||||
|
||||
// Chunk + embed up front (network/LLM bound) before opening the transaction.
|
||||
const chunks = chunkDocument(body);
|
||||
const texts = chunks.map((c) => formatDocForEmbedding(c.text, title, model));
|
||||
const embeddings = await this.embedder.embedBatch(texts, { isQuery: false, model });
|
||||
|
||||
await this.client.tx(async (tx) => {
|
||||
await tx.exec(
|
||||
`INSERT INTO qmd_memory_content (namespace, hash, body)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (namespace, hash) DO NOTHING`,
|
||||
[namespace, hash, body],
|
||||
);
|
||||
await tx.exec(
|
||||
`INSERT INTO qmd_memory (namespace, key, title, hash, metadata, updated_at, active)
|
||||
VALUES ($1, $2, $3, $4, $5::jsonb, now(), true)
|
||||
ON CONFLICT (namespace, key)
|
||||
DO UPDATE SET title = EXCLUDED.title,
|
||||
hash = EXCLUDED.hash,
|
||||
metadata = EXCLUDED.metadata,
|
||||
updated_at = now(),
|
||||
active = true`,
|
||||
[namespace, input.key, title, hash, JSON.stringify(input.metadata ?? {})],
|
||||
);
|
||||
// Refresh embeddings for this content hash.
|
||||
await tx.exec(`DELETE FROM qmd_memory_vectors WHERE namespace = $1 AND hash = $2`, [
|
||||
namespace,
|
||||
hash,
|
||||
]);
|
||||
for (let i = 0; i < chunks.length; i++) {
|
||||
const emb = embeddings[i];
|
||||
const chunk = chunks[i];
|
||||
if (!emb || !chunk) continue;
|
||||
await tx.exec(
|
||||
`INSERT INTO qmd_memory_vectors (namespace, hash, seq, pos, embedding, model)
|
||||
VALUES ($1, $2, $3, $4, $5::vector, $6)`,
|
||||
[namespace, hash, i, chunk.pos, toVectorLiteral(emb.embedding), emb.model ?? model],
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
const embeddedCount = embeddings.filter(Boolean).length;
|
||||
// Lazily build the ANN index once we know the embedding dimension.
|
||||
if (embeddedCount > 0) {
|
||||
const dim = embeddings.find(Boolean)?.embedding.length;
|
||||
if (dim) await this.ensureIndexOnce(dim);
|
||||
}
|
||||
|
||||
return {
|
||||
namespace,
|
||||
key: input.key,
|
||||
hash,
|
||||
docid,
|
||||
chunks: chunks.length,
|
||||
embedded: embeddedCount > 0,
|
||||
};
|
||||
}
|
||||
|
||||
private async ensureIndexOnce(dim: number): Promise<void> {
|
||||
const ns = this.defaultNamespace;
|
||||
const existing = await this.client.queryOne<{ value: string }>(
|
||||
`SELECT value FROM qmd_memory_config WHERE namespace = $1 AND key = 'vector_dim'`,
|
||||
[ns],
|
||||
);
|
||||
if (existing?.value === String(dim)) return;
|
||||
await ensureVectorIndex(this.client, dim);
|
||||
await this.client.exec(
|
||||
`INSERT INTO qmd_memory_config (namespace, key, value)
|
||||
VALUES ($1, 'vector_dim', $2)
|
||||
ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value`,
|
||||
[ns, String(dim)],
|
||||
);
|
||||
}
|
||||
|
||||
// ── Search ───────────────────────────────────────────────────────────────
|
||||
|
||||
async searchMemory(query: string, opts: SearchOptions = {}): Promise<MemorySearchResult[]> {
|
||||
const namespace = this.ns(opts.namespace);
|
||||
const limit = opts.limit ?? 10;
|
||||
const candidateLimit = opts.candidateLimit ?? 50;
|
||||
const model = opts.model ?? this.defaultModel;
|
||||
|
||||
// Signal 1 — lexical (pg_jieba/tsvector + ts_rank).
|
||||
const lexRows = await this.client.query<{ key: string }>(
|
||||
`SELECT m.key AS key
|
||||
FROM qmd_memory_content c
|
||||
JOIN qmd_memory m ON m.namespace = c.namespace AND m.hash = c.hash AND m.active
|
||||
WHERE c.namespace = $1 AND c.tsv @@ websearch_to_tsquery($2::regconfig, $3)
|
||||
ORDER BY ts_rank_cd(c.tsv, websearch_to_tsquery($2::regconfig, $3)) DESC
|
||||
LIMIT $4`,
|
||||
[namespace, this.fts.config, query, candidateLimit],
|
||||
);
|
||||
const lexRank = new Map<string, number>();
|
||||
lexRows.forEach((r, i) => {
|
||||
if (!lexRank.has(r.key)) lexRank.set(r.key, i + 1);
|
||||
});
|
||||
|
||||
// Signal 2 — semantic (pgvector cosine distance).
|
||||
const vecRank = new Map<string, number>();
|
||||
const qEmb = (await this.embedder.embedBatch([formatQueryForEmbedding(query, model)], {
|
||||
isQuery: true,
|
||||
model,
|
||||
}))[0];
|
||||
if (qEmb) {
|
||||
const vecRows = await this.client.query<{ key: string }>(
|
||||
`SELECT m.key AS key
|
||||
FROM qmd_memory_vectors v
|
||||
JOIN qmd_memory m ON m.namespace = v.namespace AND m.hash = v.hash AND m.active
|
||||
WHERE v.namespace = $1
|
||||
ORDER BY v.embedding <=> $2::vector
|
||||
LIMIT $3`,
|
||||
[namespace, toVectorLiteral(qEmb.embedding), candidateLimit],
|
||||
);
|
||||
vecRows.forEach((r) => {
|
||||
if (!vecRank.has(r.key)) vecRank.set(r.key, vecRank.size + 1);
|
||||
});
|
||||
}
|
||||
|
||||
const fused = rrfFuse([lexRank, vecRank]);
|
||||
if (fused.size === 0) return [];
|
||||
|
||||
const ranked = [...fused.entries()].sort((a, b) => b[1] - a[1]).slice(0, limit);
|
||||
const keys = ranked.map(([key]) => key);
|
||||
|
||||
// Hydrate the winning keys with title/body/metadata.
|
||||
const rows = await this.client.query<{
|
||||
key: string;
|
||||
title: string;
|
||||
hash: string;
|
||||
body: string;
|
||||
metadata: unknown;
|
||||
}>(
|
||||
`SELECT m.key, m.title, m.hash, c.body, m.metadata
|
||||
FROM qmd_memory m
|
||||
JOIN qmd_memory_content c ON c.namespace = m.namespace AND c.hash = m.hash
|
||||
WHERE m.namespace = $1 AND m.key = ANY($2) AND m.active`,
|
||||
[namespace, keys],
|
||||
);
|
||||
const byKey = new Map(rows.map((r) => [r.key, r]));
|
||||
|
||||
const results: MemorySearchResult[] = [];
|
||||
for (const [key, score] of ranked) {
|
||||
const row = byKey.get(key);
|
||||
if (!row) continue;
|
||||
results.push({
|
||||
namespace,
|
||||
key,
|
||||
title: row.title,
|
||||
hash: row.hash,
|
||||
docid: getDocid(row.hash),
|
||||
score,
|
||||
lexRank: lexRank.get(key) ?? null,
|
||||
vecRank: vecRank.get(key) ?? null,
|
||||
body: snippet(row.body, opts.full ?? false),
|
||||
metadata: parseMetadata(row.metadata),
|
||||
});
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// ── Read / manage ──────────────────────────────────────────────────────
|
||||
|
||||
async getMemory(key: string, opts?: { namespace?: string }): Promise<MemoryRecord | null> {
|
||||
const namespace = this.ns(opts?.namespace);
|
||||
const row = await this.client.queryOne<{
|
||||
key: string;
|
||||
title: string;
|
||||
hash: string;
|
||||
body: string;
|
||||
metadata: unknown;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
}>(
|
||||
`SELECT m.key, m.title, m.hash, c.body, m.metadata, m.created_at, m.updated_at
|
||||
FROM qmd_memory m
|
||||
JOIN qmd_memory_content c ON c.namespace = m.namespace AND c.hash = m.hash
|
||||
WHERE m.namespace = $1 AND m.key = $2 AND m.active`,
|
||||
[namespace, key],
|
||||
);
|
||||
if (!row) return null;
|
||||
return {
|
||||
namespace,
|
||||
key: row.key,
|
||||
title: row.title,
|
||||
hash: row.hash,
|
||||
docid: getDocid(row.hash),
|
||||
body: row.body,
|
||||
metadata: parseMetadata(row.metadata),
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
};
|
||||
}
|
||||
|
||||
async deleteMemory(key: string, opts?: { namespace?: string }): Promise<boolean> {
|
||||
const namespace = this.ns(opts?.namespace);
|
||||
const n = await this.client.exec(
|
||||
`UPDATE qmd_memory SET active = false, updated_at = now()
|
||||
WHERE namespace = $1 AND key = $2 AND active`,
|
||||
[namespace, key],
|
||||
);
|
||||
return n > 0;
|
||||
}
|
||||
|
||||
async listMemories(
|
||||
opts?: { namespace?: string; limit?: number },
|
||||
): Promise<Array<{ key: string; title: string; docid: string; updatedAt: string }>> {
|
||||
const namespace = this.ns(opts?.namespace);
|
||||
const rows = await this.client.query<{
|
||||
key: string;
|
||||
title: string;
|
||||
hash: string;
|
||||
updated_at: string;
|
||||
}>(
|
||||
`SELECT key, title, hash, updated_at FROM qmd_memory
|
||||
WHERE namespace = $1 AND active ORDER BY updated_at DESC LIMIT $2`,
|
||||
[namespace, opts?.limit ?? 100],
|
||||
);
|
||||
return rows.map((r) => ({
|
||||
key: r.key,
|
||||
title: r.title,
|
||||
docid: getDocid(r.hash),
|
||||
updatedAt: r.updated_at,
|
||||
}));
|
||||
}
|
||||
|
||||
async listNamespaces(): Promise<Array<{ namespace: string; count: number }>> {
|
||||
const rows = await this.client.query<{ namespace: string; count: string }>(
|
||||
`SELECT namespace, count(*)::text AS count FROM qmd_memory
|
||||
WHERE active GROUP BY namespace ORDER BY namespace`,
|
||||
);
|
||||
return rows.map((r) => ({ namespace: r.namespace, count: Number.parseInt(r.count, 10) }));
|
||||
}
|
||||
|
||||
async health(): Promise<{
|
||||
server: string;
|
||||
namespace: string;
|
||||
fts: FtsCapabilities;
|
||||
memories: number;
|
||||
}> {
|
||||
const server = await this.client.ping();
|
||||
const row = await this.client.queryOne<{ count: string }>(
|
||||
`SELECT count(*)::text AS count FROM qmd_memory WHERE namespace = $1 AND active`,
|
||||
[this.defaultNamespace],
|
||||
);
|
||||
return {
|
||||
server,
|
||||
namespace: this.defaultNamespace,
|
||||
fts: this.fts,
|
||||
memories: Number.parseInt(row?.count ?? "0", 10),
|
||||
};
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
await this.client.close();
|
||||
}
|
||||
}
|
||||
178
src/pg/schema-pg.ts
Normal file
178
src/pg/schema-pg.ts
Normal file
@ -0,0 +1,178 @@
|
||||
/**
|
||||
* pg/schema-pg.ts - PostgreSQL schema for the qmd memory bridge
|
||||
*
|
||||
* Mirrors qmd's SQLite model (content-addressable storage + documents + vectors
|
||||
* + FTS) but adapted to PostgreSQL extensions and made multi-tenant via a
|
||||
* `namespace` column so OpenClaw, Hermes, ... can share one instance safely.
|
||||
*
|
||||
* SQLite → PostgreSQL
|
||||
* content (hash → doc) → qmd_memory_content (namespace, hash, body, tsv)
|
||||
* documents → qmd_memory (namespace, key, hash, ...)
|
||||
* content_vectors + vec0 → qmd_memory_vectors (pgvector `vector`)
|
||||
* documents_fts (FTS5/BM25) → tsvector + GIN (pg_jieba 中文 / english)
|
||||
* + pg_trgm for fuzzy matching
|
||||
*/
|
||||
|
||||
import type { PgClient } from "./db-pg.js";
|
||||
|
||||
/** Text-search configuration chosen at bootstrap time. */
|
||||
export interface FtsCapabilities {
|
||||
/** ts config used for indexing/search: "jiebacfg" (中文) when available, else "english". */
|
||||
config: string;
|
||||
/** Whether pg_trgm is available for fuzzy matching. */
|
||||
trigram: boolean;
|
||||
/** Whether pgvector is available. */
|
||||
vector: boolean;
|
||||
}
|
||||
|
||||
/** Try a statement, swallow failure, report success. */
|
||||
async function tryExec(client: PgClient, sql: string): Promise<boolean> {
|
||||
try {
|
||||
await client.exec(sql);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function hasExtension(client: PgClient, name: string): Promise<boolean> {
|
||||
const row = await client.queryOne<{ one: number }>(
|
||||
"SELECT 1 AS one FROM pg_extension WHERE extname = $1",
|
||||
[name],
|
||||
);
|
||||
return !!row;
|
||||
}
|
||||
|
||||
async function hasTsConfig(client: PgClient, name: string): Promise<boolean> {
|
||||
const row = await client.queryOne<{ one: number }>(
|
||||
"SELECT 1 AS one FROM pg_ts_config WHERE cfgname = $1",
|
||||
[name],
|
||||
);
|
||||
return !!row;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create extensions + tables + indexes. Idempotent. Degrades gracefully when an
|
||||
* extension is unavailable (e.g. pg_jieba missing → falls back to `english`).
|
||||
*/
|
||||
export async function bootstrapSchema(client: PgClient): Promise<FtsCapabilities> {
|
||||
// Extensions — best-effort. A non-superuser may lack CREATE EXTENSION, in
|
||||
// which case we detect what's already installed.
|
||||
await tryExec(client, "CREATE EXTENSION IF NOT EXISTS vector");
|
||||
await tryExec(client, "CREATE EXTENSION IF NOT EXISTS pg_trgm");
|
||||
await tryExec(client, "CREATE EXTENSION IF NOT EXISTS pg_jieba");
|
||||
|
||||
const vector = await hasExtension(client, "vector");
|
||||
const trigram = await hasExtension(client, "pg_trgm");
|
||||
const jieba = await hasTsConfig(client, "jiebacfg");
|
||||
const config = jieba ? "jiebacfg" : "english";
|
||||
|
||||
if (!vector) {
|
||||
throw new Error(
|
||||
"pgvector ('vector') extension is not available on this PostgreSQL server. " +
|
||||
"It is required for the qmd memory backend (semantic search). " +
|
||||
"Install it (postgresql.svc.plus ships it) or enable it: CREATE EXTENSION vector;",
|
||||
);
|
||||
}
|
||||
|
||||
// ── content-addressable storage (+ FTS) ──────────────────────────────────
|
||||
// tsv is a generated column over the body using the detected ts config.
|
||||
await client.exec(`
|
||||
CREATE TABLE IF NOT EXISTS qmd_memory_content (
|
||||
namespace text NOT NULL,
|
||||
hash text NOT NULL,
|
||||
body text NOT NULL,
|
||||
tsv tsvector GENERATED ALWAYS AS (to_tsvector('${config}', body)) STORED,
|
||||
created_at timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (namespace, hash)
|
||||
)
|
||||
`);
|
||||
await tryExec(
|
||||
client,
|
||||
"CREATE INDEX IF NOT EXISTS qmd_memory_content_tsv_idx ON qmd_memory_content USING gin (tsv)",
|
||||
);
|
||||
if (trigram) {
|
||||
await tryExec(
|
||||
client,
|
||||
"CREATE INDEX IF NOT EXISTS qmd_memory_content_trgm_idx ON qmd_memory_content USING gin (body gin_trgm_ops)",
|
||||
);
|
||||
}
|
||||
|
||||
// ── memory records (documents layer) ─────────────────────────────────────
|
||||
await client.exec(`
|
||||
CREATE TABLE IF NOT EXISTS qmd_memory (
|
||||
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||
namespace text NOT NULL,
|
||||
key text NOT NULL,
|
||||
title text NOT NULL DEFAULT '',
|
||||
hash text NOT NULL,
|
||||
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
created_at timestamptz NOT NULL DEFAULT now(),
|
||||
updated_at timestamptz NOT NULL DEFAULT now(),
|
||||
active boolean NOT NULL DEFAULT true,
|
||||
UNIQUE (namespace, key)
|
||||
)
|
||||
`);
|
||||
await tryExec(
|
||||
client,
|
||||
"CREATE INDEX IF NOT EXISTS qmd_memory_ns_active_idx ON qmd_memory (namespace, active)",
|
||||
);
|
||||
await tryExec(
|
||||
client,
|
||||
"CREATE INDEX IF NOT EXISTS qmd_memory_hash_idx ON qmd_memory (namespace, hash)",
|
||||
);
|
||||
|
||||
// ── per-chunk vector embeddings ──────────────────────────────────────────
|
||||
// Column is unconstrained `vector` so any embedding dimension works without a
|
||||
// rebuild; an HNSW index is added later via ensureVectorIndex() once the
|
||||
// dimension is known. Exact (<=>) search works without the index.
|
||||
await client.exec(`
|
||||
CREATE TABLE IF NOT EXISTS qmd_memory_vectors (
|
||||
namespace text NOT NULL,
|
||||
hash text NOT NULL,
|
||||
seq integer NOT NULL DEFAULT 0,
|
||||
pos integer NOT NULL DEFAULT 0,
|
||||
embedding vector NOT NULL,
|
||||
model text NOT NULL,
|
||||
embedded_at timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (namespace, hash, seq)
|
||||
)
|
||||
`);
|
||||
|
||||
// ── key/value config (mirrors store_config) ──────────────────────────────
|
||||
await client.exec(`
|
||||
CREATE TABLE IF NOT EXISTS qmd_memory_config (
|
||||
namespace text NOT NULL,
|
||||
key text NOT NULL,
|
||||
value text,
|
||||
PRIMARY KEY (namespace, key)
|
||||
)
|
||||
`);
|
||||
|
||||
return { config, trigram, vector };
|
||||
}
|
||||
|
||||
/**
|
||||
* Promote the embedding column to a fixed dimension and build an HNSW cosine
|
||||
* index. Best-effort: if dimensions are mixed or the operation fails, exact
|
||||
* search still works without the index.
|
||||
*/
|
||||
export async function ensureVectorIndex(client: PgClient, dimensions: number): Promise<boolean> {
|
||||
if (!Number.isInteger(dimensions) || dimensions <= 0) return false;
|
||||
// Fix the column dimension (no-op if already that dimension).
|
||||
const typed = await tryExec(
|
||||
client,
|
||||
`ALTER TABLE qmd_memory_vectors ALTER COLUMN embedding TYPE vector(${dimensions})`,
|
||||
);
|
||||
if (!typed) return false;
|
||||
return tryExec(
|
||||
client,
|
||||
"CREATE INDEX IF NOT EXISTS qmd_memory_vectors_hnsw_idx " +
|
||||
"ON qmd_memory_vectors USING hnsw (embedding vector_cosine_ops)",
|
||||
);
|
||||
}
|
||||
|
||||
/** Detect the ts config currently in use (jiebacfg when pg_jieba is present). */
|
||||
export async function detectFtsConfig(client: PgClient): Promise<string> {
|
||||
return (await hasTsConfig(client, "jiebacfg")) ? "jiebacfg" : "english";
|
||||
}
|
||||
24
test/pg-compose.yml
Normal file
24
test/pg-compose.yml
Normal file
@ -0,0 +1,24 @@
|
||||
# Local PostgreSQL for qmd memory-bridge integration tests.
|
||||
#
|
||||
# docker compose -f test/pg-compose.yml up -d
|
||||
# QMD_PG_URL='postgres://postgres:postgres@localhost:5432/postgres' \
|
||||
# npx vitest run test/pg-memory.integration.test.ts
|
||||
# docker compose -f test/pg-compose.yml down -v
|
||||
#
|
||||
# pgvector/pgvector ships pgvector but NOT pg_jieba, so FTS falls back to the
|
||||
# 'english' text-search config. For full Chinese tokenization use the
|
||||
# postgresql.svc.plus image (ghcr.io/x-evor/images/postgresql) which bundles
|
||||
# pgvector + pg_jieba + pg_trgm.
|
||||
services:
|
||||
qmd-pg:
|
||||
image: pgvector/pgvector:pg16
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_DB: postgres
|
||||
ports:
|
||||
- "5432:5432"
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "pg_isready -U postgres"]
|
||||
interval: 2s
|
||||
timeout: 5s
|
||||
retries: 20
|
||||
113
test/pg-config.test.ts
Normal file
113
test/pg-config.test.ts
Normal file
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* pg-config.test.ts - Pure unit tests for the PG memory bridge configuration.
|
||||
*
|
||||
* These never touch a database, so they run everywhere. Integration tests that
|
||||
* require a live PostgreSQL are in pg-memory.integration.test.ts (gated on
|
||||
* QMD_PG_URL).
|
||||
*/
|
||||
|
||||
import { describe, test, expect } from "vitest";
|
||||
import {
|
||||
resolveBackend,
|
||||
isPgBackend,
|
||||
resolveNamespace,
|
||||
resolvePgConfig,
|
||||
resolvePgSsl,
|
||||
redactConnectionString,
|
||||
DEFAULT_NAMESPACE,
|
||||
} from "../src/pg/config.js";
|
||||
import { toVectorLiteral } from "../src/pg/db-pg.js";
|
||||
|
||||
describe("resolveBackend", () => {
|
||||
test("defaults to sqlite", () => {
|
||||
expect(resolveBackend({})).toBe("sqlite");
|
||||
expect(isPgBackend({})).toBe(false);
|
||||
});
|
||||
|
||||
test("accepts pg / postgres / postgresql", () => {
|
||||
expect(resolveBackend({ QMD_BACKEND: "pg" })).toBe("pg");
|
||||
expect(resolveBackend({ QMD_BACKEND: "postgres" })).toBe("pg");
|
||||
expect(resolveBackend({ QMD_BACKEND: "POSTGRESQL" })).toBe("pg");
|
||||
expect(isPgBackend({ QMD_BACKEND: "pg" })).toBe(true);
|
||||
});
|
||||
|
||||
test("a bare connection URL does NOT switch the backend", () => {
|
||||
// Selection must be explicit so existing SQLite workflows never change.
|
||||
expect(resolveBackend({ QMD_PG_URL: "postgres://x/y" })).toBe("sqlite");
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveNamespace", () => {
|
||||
test("defaults to the default namespace", () => {
|
||||
expect(resolveNamespace({})).toBe(DEFAULT_NAMESPACE);
|
||||
expect(resolveNamespace({ QMD_NAMESPACE: " " })).toBe(DEFAULT_NAMESPACE);
|
||||
});
|
||||
|
||||
test("uses QMD_NAMESPACE when set", () => {
|
||||
expect(resolveNamespace({ QMD_NAMESPACE: "openclaw" })).toBe("openclaw");
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolvePgSsl", () => {
|
||||
test("disable turns TLS off", () => {
|
||||
expect(resolvePgSsl({ QMD_PG_SSL: "disable" })).toBe(false);
|
||||
});
|
||||
|
||||
test("no-verify keeps TLS but skips verification", () => {
|
||||
expect(resolvePgSsl({ QMD_PG_SSL: "no-verify" })).toEqual({ rejectUnauthorized: false });
|
||||
});
|
||||
|
||||
test("defaults to TLS without verification when no CA is supplied", () => {
|
||||
expect(resolvePgSsl({})).toEqual({ rejectUnauthorized: false });
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolvePgConfig", () => {
|
||||
test("throws a helpful error when no URL is set", () => {
|
||||
expect(() => resolvePgConfig({ QMD_BACKEND: "pg" })).toThrow(/no connection URL/i);
|
||||
});
|
||||
|
||||
test("builds config from QMD_PG_URL", () => {
|
||||
const cfg = resolvePgConfig({
|
||||
QMD_BACKEND: "pg",
|
||||
QMD_PG_URL: "postgres://postgres:secret@db.example.com:5443/qmd",
|
||||
QMD_NAMESPACE: "hermes",
|
||||
});
|
||||
expect(cfg.connectionString).toContain("db.example.com:5443");
|
||||
expect(cfg.namespace).toBe("hermes");
|
||||
expect(cfg.max).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
test("falls back to DATABASE_URL", () => {
|
||||
const cfg = resolvePgConfig({ DATABASE_URL: "postgres://u:p@h:5432/d" });
|
||||
expect(cfg.connectionString).toContain("h:5432");
|
||||
});
|
||||
|
||||
test("respects pool + timeout overrides", () => {
|
||||
const cfg = resolvePgConfig({
|
||||
QMD_PG_URL: "postgres://u:p@h:5432/d",
|
||||
QMD_PG_POOL_MAX: "12",
|
||||
QMD_PG_CONNECT_TIMEOUT_MS: "2500",
|
||||
});
|
||||
expect(cfg.max).toBe(12);
|
||||
expect(cfg.connectionTimeoutMillis).toBe(2500);
|
||||
});
|
||||
});
|
||||
|
||||
describe("redactConnectionString", () => {
|
||||
test("hides the password", () => {
|
||||
const out = redactConnectionString("postgres://postgres:topsecret@db:5443/qmd");
|
||||
expect(out).not.toContain("topsecret");
|
||||
expect(out).toContain("db:5443");
|
||||
});
|
||||
});
|
||||
|
||||
describe("toVectorLiteral", () => {
|
||||
test("formats number arrays as a pgvector literal", () => {
|
||||
expect(toVectorLiteral([1, 2, 3])).toBe("[1,2,3]");
|
||||
});
|
||||
|
||||
test("handles Float32Array", () => {
|
||||
expect(toVectorLiteral(new Float32Array([0.5, -0.25]))).toBe("[0.5,-0.25]");
|
||||
});
|
||||
});
|
||||
113
test/pg-memory.integration.test.ts
Normal file
113
test/pg-memory.integration.test.ts
Normal file
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* pg-memory.integration.test.ts - End-to-end tests against a live PostgreSQL.
|
||||
*
|
||||
* Skipped unless QMD_PG_URL is set. Spin up a PG with pgvector (+ optionally
|
||||
* pg_jieba) and run:
|
||||
*
|
||||
* QMD_PG_URL='postgres://postgres:postgres@localhost:5432/postgres' \
|
||||
* npx vitest run test/pg-memory.integration.test.ts
|
||||
*
|
||||
* Uses a deterministic stub embedder so the test needs no network or models —
|
||||
* it verifies schema bootstrap, namespaced writes, hybrid (FTS+vector) search,
|
||||
* RRF fusion, get/list/delete, and namespace isolation.
|
||||
*/
|
||||
|
||||
import { describe, test, expect, beforeAll, afterAll } from "vitest";
|
||||
import { PgMemoryStore, type Embedder } from "../src/pg/memory-store.js";
|
||||
import { resolvePgConfig } from "../src/pg/config.js";
|
||||
|
||||
const PG_URL = process.env.QMD_PG_URL ?? process.env.DATABASE_URL;
|
||||
|
||||
/** Deterministic 16-dim embedder — char-frequency histogram, L2-normalized. */
|
||||
const stubEmbedder: Embedder = {
|
||||
async embedBatch(texts) {
|
||||
return texts.map((t) => {
|
||||
const v = new Array(16).fill(0);
|
||||
for (const ch of t.toLowerCase()) v[ch.charCodeAt(0) % 16] += 1;
|
||||
const norm = Math.sqrt(v.reduce((s, x) => s + x * x, 0)) || 1;
|
||||
return { embedding: v.map((x) => x / norm), model: "stub-16" };
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
const NS = `test_${Date.now()}`;
|
||||
|
||||
describe.skipIf(!PG_URL)("PgMemoryStore (integration)", () => {
|
||||
let store: PgMemoryStore;
|
||||
|
||||
beforeAll(async () => {
|
||||
const config = { ...resolvePgConfig({ QMD_PG_URL: PG_URL!, QMD_NAMESPACE: NS }) };
|
||||
store = await PgMemoryStore.open(config, stubEmbedder);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
if (store) {
|
||||
// Best-effort cleanup of the test namespace.
|
||||
try {
|
||||
for (const m of await store.listMemories({ limit: 1000 })) {
|
||||
await store.deleteMemory(m.key);
|
||||
}
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
await store.close();
|
||||
}
|
||||
});
|
||||
|
||||
test("bootstraps schema with pgvector", async () => {
|
||||
expect(store.capabilities.vector).toBe(true);
|
||||
});
|
||||
|
||||
test("adds and retrieves a memory", async () => {
|
||||
const res = await store.addMemory({
|
||||
key: "note/auth",
|
||||
title: "Auth design",
|
||||
body: "The login flow uses OAuth tokens and a refresh rotation strategy.",
|
||||
});
|
||||
expect(res.namespace).toBe(NS);
|
||||
expect(res.chunks).toBeGreaterThan(0);
|
||||
expect(res.embedded).toBe(true);
|
||||
|
||||
const got = await store.getMemory("note/auth");
|
||||
expect(got?.body).toContain("OAuth");
|
||||
expect(got?.title).toBe("Auth design");
|
||||
});
|
||||
|
||||
test("hybrid search finds a memory by keyword", async () => {
|
||||
await store.addMemory({ key: "note/cache", body: "Redis caching layer with TTL eviction." });
|
||||
const results = await store.searchMemory("redis caching", { limit: 5 });
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
expect(results.some((r) => r.key === "note/cache")).toBe(true);
|
||||
// At least one signal contributed.
|
||||
expect(results[0]!.lexRank !== null || results[0]!.vecRank !== null).toBe(true);
|
||||
});
|
||||
|
||||
test("re-adding the same key updates it", async () => {
|
||||
await store.addMemory({ key: "note/dup", body: "first version" });
|
||||
await store.addMemory({ key: "note/dup", body: "second version updated" });
|
||||
const got = await store.getMemory("note/dup");
|
||||
expect(got?.body).toBe("second version updated");
|
||||
const all = await store.listMemories({ limit: 1000 });
|
||||
expect(all.filter((m) => m.key === "note/dup").length).toBe(1);
|
||||
});
|
||||
|
||||
test("delete soft-removes a memory", async () => {
|
||||
await store.addMemory({ key: "note/temp", body: "temporary" });
|
||||
expect(await store.deleteMemory("note/temp")).toBe(true);
|
||||
expect(await store.getMemory("note/temp")).toBeNull();
|
||||
});
|
||||
|
||||
test("namespaces isolate memories", async () => {
|
||||
const other = `${NS}_other`;
|
||||
await store.addMemory({ key: "shared/key", body: "in default test ns" });
|
||||
await store.addMemory({ key: "shared/key", body: "in other ns", namespace: other });
|
||||
|
||||
const a = await store.getMemory("shared/key");
|
||||
const b = await store.getMemory("shared/key", { namespace: other });
|
||||
expect(a?.body).toBe("in default test ns");
|
||||
expect(b?.body).toBe("in other ns");
|
||||
|
||||
// cleanup other ns
|
||||
await store.deleteMemory("shared/key", { namespace: other });
|
||||
});
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user