Compare commits

...

2 Commits

Author SHA1 Message Date
Haitao Pan
236c83a5f3 Merge feat/pg-memory-bridge: PostgreSQL memory bridge 2026-06-23 19:13:05 +08:00
Haitao Pan
47bd3ded44 feat(pg): add switchable PostgreSQL backend + OpenClaw/Hermes memory bridge
Add an optional PostgreSQL backend (QMD_BACKEND=pg) alongside the
unchanged default SQLite path. PG store uses pgvector (HNSW) for vectors
and pg_jieba + pg_trgm for full-text/Chinese tokenization, with a
namespace column isolating multi-agent memory (openclaw/hermes).

- src/pg/: config, db-pg, schema bootstrap, memory store
- MCP memory_add/memory_search/memory_get tools; qmd pg status + memory CLI
- connection via QMD_PG_URL/DATABASE_URL/qmd config, stunnel TLS 5443
- tests: pg-config (unit) + pg-memory integration (gated on QMD_PG_URL) + pg-compose
- docs/plan: plan, usage, test report, changelog; track docs/**/*.md

SQLite path: zero regression (typecheck clean, 249 passed / 6 skipped).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 19:13:04 +08:00
20 changed files with 1997 additions and 3 deletions

1
.gitignore vendored
View File

@ -15,6 +15,7 @@ texts/
!skills/**/*.md
!finetune/*.md
!docs/*.md
!docs/**/*.md
finetune/outputs/
finetune/data/train/
.claude/

View File

@ -4,6 +4,20 @@
### Changes
- Backend: add an optional **PostgreSQL memory bridge** so qmd can act as a
shared, namespaced, persistent memory store for external agents (OpenClaw,
Hermes, …) on top of `pgvector` + `pg_jieba` + `pg_trgm` (e.g. the
`postgresql.svc.plus` runtime). The local SQLite document workflow is the
default and is unchanged. Enable with `QMD_BACKEND=pg` and `QMD_PG_URL`.
- CLI: `qmd memory add|search|get|rm|ls|namespaces` and `qmd pg status`.
- MCP: when `QMD_BACKEND=pg`, `qmd mcp` also exposes `memory_add`,
`memory_search`, `memory_get`, and `memory_list` tools alongside the
existing document-search tools.
- Search is hybrid: `pg_jieba/tsvector` lexical + `pgvector` cosine, fused
with Reciprocal Rank Fusion; embeddings reuse the same external embedding
API as the SQLite engine. Memory is isolated per `QMD_NAMESPACE`.
- See `docs/plan/pg-memory-bridge-usage.md` for configuration and
`docs/plan/pg-backend-memory-bridge.md` for the design.
- CLI: add `qmd sync` for SSH/rsync-based QMD source-file and YAML-config
synchronization between a local machine and remote QMD host. The sync path
uses resumable rsync transfers, conflict-copy preservation, and keeps SQLite

View File

@ -115,6 +115,27 @@ qmd multi-get "#abc123, #def456"
--json, --csv, --md, --xml, --files
```
## PostgreSQL memory bridge (optional)
QMD can additionally run as a shared, namespaced memory store backed by
PostgreSQL (`pgvector` + `pg_jieba` + `pg_trgm`, e.g. `postgresql.svc.plus`) for
external agents (OpenClaw, Hermes). SQLite remains the default and is unchanged.
```sh
export QMD_BACKEND=pg
export QMD_PG_URL='postgres://user:pass@host:5443/db' # stunnel TLS port
export QMD_NAMESPACE=openclaw # tenant isolation
qmd pg status # backend health
qmd memory add <key> "text" --title T # store/replace (text or stdin)
qmd memory search <query> # hybrid FTS(pg_jieba)+vector(pgvector), RRF
qmd memory get|rm|ls|namespaces
```
When `QMD_BACKEND=pg`, `qmd mcp` also exposes the `memory_add`, `memory_search`,
`memory_get`, and `memory_list` tools. Code lives in `src/pg/`. Design + usage:
`docs/plan/pg-backend-memory-bridge.md`, `docs/plan/pg-memory-bridge-usage.md`.
## Development
```sh

View File

@ -9,6 +9,7 @@
"better-sqlite3": "12.8.0",
"fast-glob": "3.3.3",
"node-llama-cpp": "3.18.1",
"pg": "8.13.1",
"picomatch": "4.0.4",
"sqlite-vec": "0.1.9",
"web-tree-sitter": "0.26.7",
@ -17,6 +18,7 @@
},
"devDependencies": {
"@types/better-sqlite3": "7.6.13",
"@types/pg": "8.11.10",
"tsx": "4.21.0",
"vitest": "3.2.4",
},
@ -215,6 +217,8 @@
"@types/node": ["@types/node@24.10.1", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ=="],
"@types/pg": ["@types/pg@8.11.10", "", { "dependencies": { "@types/node": "*", "pg-protocol": "*", "pg-types": "^4.0.1" } }, "sha512-LczQUW4dbOQzsH2RQ5qoeJ6qJPdrcM/DcMLoqWQkMLMsq83J5lAX3LXjdkWdpscFy67JSOWDnh7Ny/sPFykmkg=="],
"@vitest/expect": ["@vitest/expect@3.2.4", "", { "dependencies": { "@types/chai": "^5.2.2", "@vitest/spy": "3.2.4", "@vitest/utils": "3.2.4", "chai": "^5.2.0", "tinyrainbow": "^2.0.0" } }, "sha512-Io0yyORnB6sikFlt8QW5K7slY4OjqNX9jmJQ02QDda8lyM6B5oNgVWoSoKPac8/kgnCUzuHQKrSLtu/uOqqrig=="],
"@vitest/mocker": ["@vitest/mocker@3.2.4", "", { "dependencies": { "@vitest/spy": "3.2.4", "estree-walker": "^3.0.3", "magic-string": "^0.30.17" }, "peerDependencies": { "msw": "^2.4.9", "vite": "^5.0.0 || ^6.0.0 || ^7.0.0-0" }, "optionalPeers": ["msw", "vite"] }, "sha512-46ryTE9RZO/rfDd7pEqFl7etuyzekzEhUbTW3BvmeO/BcCMEgq59BKhek3dXDWgAj4oMK6OZi+vRr1wPW6qjEQ=="],
@ -521,6 +525,8 @@
"object-inspect": ["object-inspect@1.13.4", "", {}, "sha512-W67iLl4J2EXEGTbfeHCffrjDfitvLANg0UlX3wFUUSTx92KXRFegMHUVgSqE+wvhAbi4WqjGg9czysTV2Epbew=="],
"obuf": ["obuf@1.1.2", "", {}, "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg=="],
"on-finished": ["on-finished@2.4.1", "", { "dependencies": { "ee-first": "1.1.1" } }, "sha512-oVlzkg3ENAhCk2zdv7IJwd/QUD4z2RxRwpkcGY8psCVcCYZNq4wYnVWALHM+brtuJjePWiYF/ClmuDr8Ch5+kg=="],
"once": ["once@1.4.0", "", { "dependencies": { "wrappy": "1" } }, "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w=="],
@ -541,6 +547,24 @@
"pathval": ["pathval@2.0.1", "", {}, "sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ=="],
"pg": ["pg@8.13.1", "", { "dependencies": { "pg-connection-string": "^2.7.0", "pg-pool": "^3.7.0", "pg-protocol": "^1.7.0", "pg-types": "^2.1.0", "pgpass": "1.x" }, "optionalDependencies": { "pg-cloudflare": "^1.1.1" }, "peerDependencies": { "pg-native": ">=3.0.1" }, "optionalPeers": ["pg-native"] }, "sha512-OUir1A0rPNZlX//c7ksiu7crsGZTKSOXJPgtNiHGIlC9H0lO+NC6ZDYksSgBYY/thSWhnSRBv8w1lieNNGATNQ=="],
"pg-cloudflare": ["pg-cloudflare@1.4.0", "", {}, "sha512-Vo7z/6rrQYxpNRylp4Tlob2elzbh+N/MOQbxFVWCxS7oEx6jF53GTJFxK2WWpKuBRkmiin4Mt+xofFDjx09R0A=="],
"pg-connection-string": ["pg-connection-string@2.14.0", "", {}, "sha512-XwWDGcLRGCXAR8F/AM5bG7Q+A3Wm2s6QeEjlOKZLlH3UYcguiqCWKyWXVag5TLTIjR7oOJUY8kcADaZgWPyLeg=="],
"pg-int8": ["pg-int8@1.0.1", "", {}, "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw=="],
"pg-numeric": ["pg-numeric@1.0.2", "", {}, "sha512-BM/Thnrw5jm2kKLE5uJkXqqExRUY/toLHda65XgFTBTFYZyopbKjBe29Ii3RbkvlsMoFwD+tHeGaCjjv0gHlyw=="],
"pg-pool": ["pg-pool@3.14.0", "", { "peerDependencies": { "pg": ">=8.0" } }, "sha512-gKtPkFdQPU3DksooVLi9LsjZxrsBUZIpa+7aVx+LV5pNh0KzP4Zleud2po+ConrxbuXGBJ6Hfer6hdgpIBpBaw=="],
"pg-protocol": ["pg-protocol@1.15.0", "", {}, "sha512-cq9sECI5s0+uPUXjbz8ioyPJni6RzsRib0US67i5IoTZKw8fNeYlVE7u8F4dG7vEJJtc5wdD1K189lCCUwqWTQ=="],
"pg-types": ["pg-types@4.1.0", "", { "dependencies": { "pg-int8": "1.0.1", "pg-numeric": "1.0.2", "postgres-array": "~3.0.1", "postgres-bytea": "~3.0.0", "postgres-date": "~2.1.0", "postgres-interval": "^3.0.0", "postgres-range": "^1.1.1" } }, "sha512-o2XFanIMy/3+mThw69O8d4n1E5zsLhdO+OPqswezu7Z5ekP4hYDqlDjlmOpYMbzY2Br0ufCwJLdDIXeNVwcWFg=="],
"pgpass": ["pgpass@1.0.6", "", {}, "sha512-lqIfH7bdgsxHAY/ZnUOwm+aCFKrsHBDhSFuk9O0B9uCqJAIkrKTo/+LQqLPLUS4e04+jCmQVikxE3QipH5chPw=="],
"picocolors": ["picocolors@1.1.1", "", {}, "sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA=="],
"picomatch": ["picomatch@4.0.4", "", {}, "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A=="],
@ -549,6 +573,16 @@
"postcss": ["postcss@8.5.6", "", { "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", "source-map-js": "^1.2.1" } }, "sha512-3Ybi1tAuwAP9s0r1UQ2J4n5Y0G05bJkpUIO0/bI9MhwmD70S5aTWbXGBwxHrelT+XM1k6dM0pk+SwNkpTRN7Pg=="],
"postgres-array": ["postgres-array@3.0.4", "", {}, "sha512-nAUSGfSDGOaOAEGwqsRY27GPOea7CNipJPOA7lPbdEpx5Kg3qzdP0AaWC5MlhTWV9s4hFX39nomVZ+C4tnGOJQ=="],
"postgres-bytea": ["postgres-bytea@3.0.0", "", { "dependencies": { "obuf": "~1.1.2" } }, "sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw=="],
"postgres-date": ["postgres-date@2.1.0", "", {}, "sha512-K7Juri8gtgXVcDfZttFKVmhglp7epKb1K4pgrkLxehjqkrgPhfG6OO8LHLkfaqkbpjNRnra018XwAr1yQFWGcA=="],
"postgres-interval": ["postgres-interval@3.0.0", "", {}, "sha512-BSNDnbyZCXSxgA+1f5UU2GmwhoI0aU5yMxRGO8CdFEcY2BQF9xm/7MqKnYoM1nJDk8nONNWDk9WeSmePFhQdlw=="],
"postgres-range": ["postgres-range@1.1.4", "", {}, "sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w=="],
"prebuild-install": ["prebuild-install@7.1.3", "", { "dependencies": { "detect-libc": "^2.0.0", "expand-template": "^2.0.3", "github-from-package": "0.0.0", "minimist": "^1.2.3", "mkdirp-classic": "^0.5.3", "napi-build-utils": "^2.0.0", "node-abi": "^3.3.0", "pump": "^3.0.0", "rc": "^1.2.7", "simple-get": "^4.0.0", "tar-fs": "^2.0.0", "tunnel-agent": "^0.6.0" }, "bin": { "prebuild-install": "bin.js" } }, "sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug=="],
"pretty-bytes": ["pretty-bytes@6.1.1", "", {}, "sha512-mQUvGU6aUFQ+rNvTIAcZuWGRT9a6f6Yrg9bHs4ImKF+HZCEK+plBvnAZYSIQztknZF2qnzNtr6F8s0+IuptdlQ=="],
@ -735,6 +769,8 @@
"wrappy": ["wrappy@1.0.2", "", {}, "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="],
"xtend": ["xtend@4.0.2", "", {}, "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ=="],
"y18n": ["y18n@5.0.8", "", {}, "sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA=="],
"yallist": ["yallist@5.0.0", "", {}, "sha512-YgvUTfwqyc7UXVMrB+SImsVYSmTS8X/tSrtdNZMImM+n7+QTriRXyXim0mBrTXNeqzVF0KWGgHPeiyViFFrNDw=="],
@ -777,6 +813,8 @@
"ora/cli-spinners": ["cli-spinners@3.4.0", "", {}, "sha512-bXfOC4QcT1tKXGorxL3wbJm6XJPDqEnij2gQ2m7ESQuE+/z9YFIWnl/5RpTiKWbMq3EVKR4fRLJGn6DVfu0mpw=="],
"pg/pg-types": ["pg-types@2.2.0", "", { "dependencies": { "pg-int8": "1.0.1", "postgres-array": "~2.0.0", "postgres-bytea": "~1.0.0", "postgres-date": "~1.0.4", "postgres-interval": "^1.1.0" } }, "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA=="],
"postcss/nanoid": ["nanoid@3.3.11", "", { "bin": { "nanoid": "bin/nanoid.cjs" } }, "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w=="],
"proper-lockfile/retry": ["retry@0.12.0", "", {}, "sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow=="],
@ -815,6 +853,14 @@
"ipull/pretty-ms/parse-ms": ["parse-ms@3.0.0", "", {}, "sha512-Tpb8Z7r7XbbtBTrM9UhpkzzaMrqA2VXMT3YChzYltwV3P3pM6t8wl7TvpMnSTosz1aQAdVib7kdoys7vYOPerw=="],
"pg/pg-types/postgres-array": ["postgres-array@2.0.0", "", {}, "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA=="],
"pg/pg-types/postgres-bytea": ["postgres-bytea@1.0.1", "", {}, "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ=="],
"pg/pg-types/postgres-date": ["postgres-date@1.0.7", "", {}, "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q=="],
"pg/pg-types/postgres-interval": ["postgres-interval@1.2.0", "", { "dependencies": { "xtend": "^4.0.0" } }, "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ=="],
"stdout-update/string-width/get-east-asian-width": ["get-east-asian-width@1.4.0", "", {}, "sha512-QZjmEOC+IT1uk6Rx0sX22V6uHWVwbdbxf1faPqJ1QhLdGgsRGCZoyaQBm/piRdJy/D2um6hM1UP7ZEeQ4EkP+Q=="],
"wrap-ansi/string-width/emoji-regex": ["emoji-regex@8.0.0", "", {}, "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A=="],

View File

@ -0,0 +1,104 @@
# 规划：qmd 接入 PostgreSQL，成为 OpenClaw + Hermes 的记忆桥梁
> 状态：草案（待评审） · 日期：2026-06-22
> 范围决策：**PG 作为可切换的替代后端**（SQLite 保持默认且完全不变）；桥梁需要 **写入 + 命名空间隔离**。
## 1. 现状与判断(已核实)
**qmd**（`@tobilu/qmd` 2.1.0）是本地混合检索引擎：
- 存储层两文件:
- `src/db.ts` —— 跨运行时 **同步** SQLite 抽象,窄接口 `exec / prepare→{run,get,all} / transaction / loadExtension / close`
- `src/store.ts` —— 4690 行,承载所有数据访问 + 检索逻辑。
- `Store`：由 `createStore()`（`src/store.ts:1604`）构造的门面对象，约 50 个方法。**今天只有 3 个是 async**（`searchVec / expandQuery / rerank`，受 LLM 限制），其余全部同步。
- Schema 强 SQLite 方言:
- `content`（内容寻址，source of truth）
- `documents`（虚拟路径 → 内容 hash 映射）
- `content_vectors` + `vectors_vec`（sqlite-vec `vec0` 虚拟表）
- `documents_fts`（FTS5 BM25 + 触发器）
- `llm_cache` / `store_collections` / `store_config`
- 已暴露 MCP（stdio / http），目前以 **只读检索** 为主（约 4 个 tool）。
**postgresql.svc.plus** 恰好提供桥梁所需全部扩展：**pgvector**（向量）、**pg_jieba + pg_trgm**（中文分词 + 全文/模糊）、**pgmq**（轻量消息队列）、pg_cron。经 **stunnel TLS 5443** 接入。
> **关键结论**：**不能在 `Database` 驱动层做适配**。
> 原因：(1) node-postgres 是异步，而 `Database` 接口是同步；(2) FTS5 / `vec0` / `PRAGMA` / 触发器都是 SQLite 专有 SQL，换驱动也必须重写 SQL。
> 正确切口是 **把 `Store` 抽象成接口，做并列的后端实现**。
## 2. 目标架构(记忆桥梁)
```
OpenClaw agent ─┐ ┌─ pgvector (语义检索)
├─ qmd MCP (stdio/http) ──→ │ pg_jieba+pg_trgm (中文全文/模糊)
Hermes agent ──┘ memory_* + search 工具 └─ pgmq (异步入库队列, 可选)
│ postgresql.svc.plus (stunnel 5443 TLS)
backend=pg ↔ backend=sqlite (默认, 不变)
```
- 默认仍是 `~/.cache/qmd/index.sqlite`;设 `QMD_BACKEND=pg` + 连接串即切到共享 PG。
- 多 agent 通过 **namespace**（openclaw / hermes / …）隔离，共用同一 PG 记忆库。
- 价值:记忆从“单机 SQLite”升级为“跨 agent、跨主机、持久共享”的中心化记忆。
## 3. 后端抽象设计(核心)
### 3.1 提升 `Store` → `IStore` 接口，统一异步
- 现有 `createStore` 原样保留为 `createSqliteStore`，**内部逻辑一行不改**，仅把同步方法用 `Promise.resolve(...)` 包一层以满足异步签名 → 满足“原有功能不变”，靠现有测试全绿守住回归。
- 新增工厂 `createStore({ backend, url, namespace })`，按 `QMD_BACKEND` / config 分发到 sqlite 或 pg。
- CLI（`src/cli/qmd.ts`）与 MCP（`src/mcp/server.ts`）调用处统一 `await`（绝大多数本就在 async 上下文）。
### 3.2 新增 `createPgStore`
- 新文件 `src/store-pg.ts` + `src/db-pg.ts`,使用 `pg` 连接池 + TLS。
- 实现同一 `IStore`,对外行为与 SQLite 后端对齐。
## 4. Schema 映射SQLite → PG 扩展)
| SQLite 现状 | PG 实现 | 备注 |
|---|---|---|
| `content` / `documents` / `llm_cache` / `store_collections` / `store_config` | 普通表,结构基本一致 | 加 `namespace` 列 |
| `content_vectors` + `vectors_vec`(vec0) | 单表 `vector` 列(**pgvector**+ HNSW `vector_cosine_ops` 索引 | 维度由外部 embedding API 决定 |
| `documents_fts`(FTS5 BM25) + 触发器 | `tsvector` 生成列(**pg_jieba** 中文 + english 双配置)+ GIN 索引;`pg_trgm` 供模糊 | ⚠️ `ts_rank` ≠ BM25打分语义不同 |
| docid = hash 前 6 位 | 应用层不变 | — |
| 触发器维护 FTS | PG 生成列 / 触发器 | — |
## 5. 记忆桥梁能力(写入 + 命名空间)
- 所有表加 `namespace TEXT NOT NULL`，`store({ namespace })` 注入；查询/写入自动按 namespace 过滤。OpenClaw、Hermes 各自隔离，亦可读共享 namespace。
- 新增 MCP 写入工具:
- `memory_add` —— 写入一条记忆(内容寻址 + 分块 + 嵌入)
- `memory_search` —— 即现有 hybrid query带 namespace
- `memory_get`
- 读路径复用现有 search / query / get。
- 可选：用 **pgmq** 做异步入库：agent 高频写入先入队，后台批量 embed，避免阻塞 agent。
## 6. 连接与配置
- 连接串:`QMD_PG_URL` / `DATABASE_URL`,或 `qmd config set backend pg / pg-url ...`
- TLS：走 postgresql.svc.plus 的 stunnel `5443`（`sslmode` / CA 配置）。`pg` 在 Bun 与 Node 均可用。
- **嵌入向量必须用共享的外部 OpenAI 兼容 API**（qmd 已支持），保证跨主机维度/模型一致；本地 node-llama-cpp 仅用于 SQLite 单机场景。
- `qmd status` 显示当前 backend / namespace / PG 健康。
## 7. 分阶段实施
| 阶段 | 内容 | 验收 |
|---|---|---|
| **P1 接口化** | `Store→IStore` 异步化 + `createSqliteStore` 包装 + backend 工厂 + 调用处 await | **全量测试在 SQLite 路径零回归** |
| **P2 PgStore** | `db-pg.ts` + `store-pg.ts`DDL 引导pgvector/pg_jieba/pg_trgm实现全部 `IStore` 方法 | PgStore 通过与 SQLite 相同的单测子集 |
| **P3 配置接入** | env/config 选择 PG连 postgresql.svc.plus`qmd status` 展示后端 | 真实连通 + 健康检查 |
| **P4 桥梁** | `namespace` 隔离 + `memory_add/search/get` MCP 工具(+ 可选 pgmq 异步入库) | 双 agent 命名空间隔离可验证 |
| **P5 测试与文档** | 用 postgresql.svc.plus 镜像起 PG 跑集成/对等测试SQLite vs PG 结果对比),更新 README/CLAUDE.md/CHANGELOG | 对等测试 + 文档 |
## 8. 主要风险
1. **打分语义漂移**PG `ts_rank` 非 BM25 → 排序差异;靠 RRF 融合 + reranker 缓解(可后续评估 ParadeDB BM25
2. **同步→异步重构面广**CLI/MCP 多处调用 → 回归风险;以 SQLite 路径全测试守住。
3. **远程延迟**:远程 PG over TLS vs 本地 SQLite每查询多次往返用连接池 + 批量,必要时保留本地 SQLite 作读缓存。
4. **嵌入一致性**:跨主机必须统一外部 embedding 服务,否则向量不可比。
## 9. 关联文件索引
- qmd 存储抽象:`src/db.ts`、`src/store.ts``createStore` @ `src/store.ts:1604`
- qmd MCP`src/mcp/server.ts`
- qmd CLI`src/cli/qmd.ts`
- PG 运行时与扩展:`../postgresql.svc.plus/`pgvector / pg_jieba / pg_trgm / pgmq / pg_cronstunnel 5443

View File

@ -0,0 +1,32 @@
# 变更日志PostgreSQL 记忆桥梁
> 关联:[规划](pg-backend-memory-bridge.md) · [使用说明](pg-memory-bridge-usage.md) · [测试报告](pg-memory-bridge-test-report.md)
>
> 本文件为该特性的聚焦式变更记录；面向用户的总览见仓库根 `CHANGELOG.md` 的 `[Unreleased]` 一节。
## [Unreleased] — PostgreSQL 可切换后端 + OpenClaw/Hermes 记忆桥梁
### 新增 (Added)
- **可切换 PG 后端**：`QMD_BACKEND=pg` 时改用 PostgreSQL；SQLite 仍为默认且行为不变。后端工厂集中在 `src/pg/`。
- **PG 记忆存储**`src/pg/`
- schema 引导(`content` / `documents` / `content_vectors` / `llm_cache` / `store_collections` / `store_config`,均带 `namespace` 列)。
- **pgvector** 向量列 + HNSW `vector_cosine_ops` 索引(替代 sqlite-vec `vec0`)。
- **pg_jieba + pg_trgm**`tsvector` 全文/中文分词与模糊匹配(替代 FTS5
- **命名空间隔离**:所有读写按 `namespace`openclaw / hermes / …)过滤,使多 agent 共用同一 PG 记忆库而互不串扰。
- **MCP 记忆工具**：`memory_add` / `memory_search` / `memory_get` / `memory_list`（`src/mcp/server.ts`），让 OpenClaw、Hermes 通过 qmd MCP 写入与检索记忆。
- **CLI**`qmd pg status`(连接/扩展/健康探测)与 `memory` 子命令(`src/cli/pg-commands.ts`、`src/cli/qmd.ts`)。
- **连接配置**`QMD_PG_URL` / `DATABASE_URL` / `qmd config`,支持 postgresql.svc.plus 的 stunnel TLS`5443` / `sslmode`)。
- **测试**`test/pg-config.test.ts`(配置解析)、`test/pg-memory.integration.test.ts`(门控 `QMD_PG_URL` 的集成用例)、`test/pg-compose.yml`(本地 PG 编排)。
- **文档**`docs/plan/` 下规划、使用说明、测试报告与本变更日志。
### 变更 (Changed)
- `package.json`:新增 `pg` 运行时依赖Bun 与 Node 均可用)。
- `CLAUDE.md`:补充 PG 后端与记忆桥梁说明。
- `.gitignore`:放开 `docs/**/*.md`,纳入嵌套文档目录。
### 不变 (Unchanged)
- 默认 SQLite 路径(`~/.cache/qmd/index.sqlite`)的索引、检索与所有 CLI/MCP 行为零回归。
### 已知限制 (Known limitations)
- PG `ts_rank` 与 FTS5 BM25 打分语义不同排序存在差异RRF + reranker 缓解)。
- 集成测试需外部 PostgreSQL尚未对接真实实例常态化执行见测试报告 §5

View File

@ -0,0 +1,61 @@
# 测试验证报告PostgreSQL 记忆桥梁
> 关联:[规划](pg-backend-memory-bridge.md) · [使用说明](pg-memory-bridge-usage.md)
> 日期：2026-06-23 · 范围：P1–P5（qmd PG 可切换后端 + OpenClaw/Hermes 记忆桥梁）
## 1. 验证矩阵
| 项 | 命令 | 结果 |
|---|---|---|
| 类型检查 | `npx tsc --noEmit -p tsconfig.build.json` | ✅ 0 errors |
| 构建产物 | `npm run build` | ✅ 成功,`dist/pg/*`、`dist/cli/pg-commands.js` 已 emit |
| 单元/集成测试 | `npx vitest run` | ✅ 249 passed / 6 skipped |
| PG 配置解析 | `test/pg-config.test.ts` | ✅ 15 passed |
| CLI 冒烟 | 见下文 §3 | ✅ 通过 |
> 现有 SQLite 路径全部测试零回归 —— 满足“原有功能不变”。
## 2. 测试范围
### 2.1 已自动覆盖(无需外部依赖)
- **`test/pg-config.test.ts`** —— 连接配置解析:
- `QMD_PG_URL` / `DATABASE_URL` / `qmd config` 三来源优先级
- TLSstunnel `5443` / `sslmode`)参数推导
- `namespace` 默认值与覆盖
- 缺失连接串时的报错信息(不暴露口令)
- **现有全量回归** —— SQLite 后端 FTS/向量/RRF/重排/分块/collections/context 等,结果与改造前一致。
### 2.2 需外部 PostgreSQL默认 skip门控 `QMD_PG_URL`
- **`test/pg-memory.integration.test.ts`**6 用例,未配置时跳过):
- schema 引导pgvector / pg_jieba / pg_trgm 扩展存在性)
- `memory_add` 写入 → 内容寻址 + 分块 + 嵌入落库
- `memory_search` 混合检索tsvector + pgvectorRRF 融合)
- `namespace` 隔离openclaw vs hermes 互不可见)
- `memory_get` 按 docid/path 取回
- `qmd pg status` 健康探测
- **`test/pg-compose.yml`** —— 一键起本地 PGpostgresql.svc.plus 同款扩展镜像)供上述集成测试使用。
## 3. CLI 冒烟记录
- `qmd --help` —— 新增 `memory`、`pg status` 命令正常列出。
- `qmd memory search foo`(未配置后端)—— 干净报错,提示设置 `QMD_BACKEND=pg` + 连接串,无堆栈泄漏。
- `QMD_BACKEND=pg qmd pg status`(无连接串)—— 干净报错,提示缺少 `QMD_PG_URL`,不打印口令。
## 4. 本地复现集成测试
```sh
# 1) 起本地 PGpgvector + pg_jieba + pg_trgm
docker compose -f test/pg-compose.yml up -d # macOS 先 colima start
# 2) 指向该实例并跑集成测试
export QMD_PG_URL='postgres://postgres:postgres@localhost:5432/qmd'
npx vitest run test/pg-memory.integration.test.ts
# 3) 连真实服务postgresql.svc.plusstunnel TLS 5443
export QMD_PG_URL='postgres://USER:PASS@postgresql.svc.plus:5443/qmd?sslmode=require'
qmd pg status
```
## 5. 限制与待办
- ⚠️ **集成测试尚未对接“真实 PostgreSQL”执行**（本机无运行中的 Docker daemon；macOS 依赖 docker/colima 已补入 infra 基线）。CI 接入 `test/pg-compose.yml` 后即可常态化。
- ⚠️ **打分语义差异**PG `ts_rank` ≠ FTS5 BM25排序与 SQLite 后端存在差异,靠 RRF + reranker 缓解;上线前建议跑一轮 SQLite vs PG 对等评测。
- 远程 PG over TLS 的延迟基准尚未压测。

View File

@ -0,0 +1,99 @@
# qmd PostgreSQL Memory Bridge — Usage
The PG backend turns qmd into a shared, namespaced, persistent **memory bridge**
for external agents (OpenClaw, Hermes, …) on top of a PostgreSQL instance with
`pgvector` + `pg_jieba` + `pg_trgm` (e.g. the `postgresql.svc.plus` runtime).
> The existing local-SQLite document workflow is **unchanged** and remains the
> default. The PG backend is additive and opt-in.
## Architecture
```
OpenClaw ─┐ ┌─ pgvector (semantic / 语义)
├─ qmd MCP / CLI ───────→ │ pg_jieba+pg_trgm (中文全文 / fuzzy)
Hermes ──┘ memory_* + pg status └─ namespaced tables
backend=pg ↔ backend=sqlite (default)
```
Memory records are content-addressed, chunked, embedded with the **same external
embedding API** qmd uses for SQLite (so vectors are comparable across hosts), and
searched with hybrid retrieval: `pg_jieba/tsvector` lexical + `pgvector` cosine,
fused with Reciprocal Rank Fusion (RRF).
## Configuration (environment)
| Variable | Default | Purpose |
| :--- | :--- | :--- |
| `QMD_BACKEND` | `sqlite` | Set to `pg` to enable the memory bridge. **Explicit** — a bare URL won't switch it. |
| `QMD_PG_URL` / `DATABASE_URL` | — | `postgres://user:pass@host:5443/db` |
| `QMD_NAMESPACE` | `default` | Tenant namespace (e.g. `openclaw`, `hermes`). |
| `QMD_PG_SSL` | TLS, no-verify | `disable` \| `no-verify` \| `require` (use `QMD_PG_CA` for verification). |
| `QMD_PG_CA` | — | Path to a CA bundle (implies verification on). |
| `QMD_PG_POOL_MAX` | `5` | Connection pool size. |
| `QMD_PG_CONNECT_TIMEOUT_MS` | `10000` | Connection timeout. |
`postgresql.svc.plus` terminates TLS at stunnel (default port **5443**); point
`QMD_PG_URL` at that endpoint, or connect plainly via a local `stunnel-client`
with `QMD_PG_SSL=disable`.
## CLI
```sh
export QMD_BACKEND=pg
export QMD_PG_URL='postgres://postgres:***@db.example.com:5443/qmd'
export QMD_NAMESPACE=openclaw
qmd pg status # backend health: server, fts caps, counts
qmd memory add note/auth "OAuth refresh rotation design" --title "Auth"
echo "long body..." | qmd memory add note/big # body via stdin
qmd memory search "how does auth refresh work" # hybrid search
qmd memory get note/auth
qmd memory ls
qmd memory rm note/auth
qmd memory namespaces # list namespaces + counts
```
Flags: `--namespace <ns>`, `--title <t>`, `-n <limit>`, `--full`, `--json`.
## MCP (the agent-facing bridge)
When `QMD_BACKEND=pg`, `qmd mcp` additionally registers memory tools alongside
the existing document-search tools:
- `memory_search(query, namespace?, limit?, full?)` — hybrid search
- `memory_add(key, body, title?, namespace?, metadata?)` — store/replace
- `memory_get(key, namespace?)` — fetch full body
- `memory_list(namespace?, limit?)` — recent memories
If PG is misconfigured the MCP server logs a warning and continues serving local
document search — it never takes the server down.
## Schema (PostgreSQL)
| Table | Role |
| :--- | :--- |
| `qmd_memory_content(namespace, hash, body, tsv)` | Content-addressed bodies + generated `tsvector` (GIN; `pg_trgm` for fuzzy). |
| `qmd_memory(namespace, key, title, hash, metadata, …, active)` | Memory records (the documents layer); `UNIQUE(namespace, key)`. |
| `qmd_memory_vectors(namespace, hash, seq, pos, embedding, model)` | Per-chunk `pgvector` embeddings; HNSW cosine index added lazily once the dimension is known. |
| `qmd_memory_config(namespace, key, value)` | Bridge metadata (e.g. `vector_dim`). |
## Running integration tests
```sh
docker compose -f test/pg-compose.yml up -d
QMD_PG_URL='postgres://postgres:postgres@localhost:5432/postgres' \
npx vitest run test/pg-memory.integration.test.ts
docker compose -f test/pg-compose.yml down -v
```
The integration test uses a deterministic stub embedder (no network/models). The
pure config tests (`test/pg-config.test.ts`) always run.
## Notes & fidelity
- PostgreSQL FTS ranks with `ts_rank_cd`, **not** BM25 — absolute scores differ
from the SQLite engine, but RRF fusion keeps hybrid ranking robust.
- Without `pg_jieba`, FTS falls back to the `english` config (Latin tokenization).
- Embeddings must come from a **shared** external embedding API for vectors to be
comparable across hosts/agents.

View File

@ -49,6 +49,7 @@
"better-sqlite3": "12.8.0",
"fast-glob": "3.3.3",
"node-llama-cpp": "3.18.1",
"pg": "8.13.1",
"picomatch": "4.0.4",
"sqlite-vec": "0.1.9",
"web-tree-sitter": "0.26.7",
@ -68,6 +69,7 @@
},
"devDependencies": {
"@types/better-sqlite3": "7.6.13",
"@types/pg": "8.11.10",
"tsx": "4.21.0",
"vitest": "3.2.4"
},

263
src/cli/pg-commands.ts Normal file
View File

@ -0,0 +1,263 @@
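/**
* cli/pg-commands.ts - `qmd memory` and `qmd pg` command handlers.
*
* These drive the PostgreSQL memory bridge. Each handler owns its own bridge
* lifecycle (connection pool + embedder): it opens the bridge, runs a single
* command, and disposes the bridge before returning. When the bridge cannot be
* opened (for example because the PG backend is not configured), the handler
* prints the error and returns exit code 1 instead of silently doing nothing.
*/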
import { openMemoryBridge, redactConnectionString } from "../pg/index.js";
// ANSI SGR escape sequences used to colour CLI output. They are declared in
// this module rather than imported, so the handlers do not depend on the
// shared output formatter.
const sgr = (code: number): string => `\x1b[${code}m`;

const C = {
  reset: sgr(0),
  dim: sgr(2),
  green: sgr(32),
  red: sgr(31),
  cyan: sgr(36),
  bold: sgr(1),
};
// Parsed CLI option values keyed by option name. Values are `unknown` and are
// narrowed at each use site (e.g. via `str()` or `!!values.json`).
type Values = Record<string, unknown>;
/**
 * Narrow an untyped option value to a string.
 *
 * @param v - Any value, typically one entry of the parsed CLI options.
 * @returns `v` itself when it is a string (including the empty string),
 *          otherwise `undefined`.
 */
function str(v: unknown): string | undefined {
  if (typeof v !== "string") {
    return undefined;
  }
  return v;
}
/**
 * Read everything piped into the process on standard input.
 *
 * @returns The piped input decoded as UTF-8 with surrounding whitespace
 *          trimmed, or the empty string when stdin is an interactive terminal
 *          (nothing is read in that case, so the call never waits on a TTY).
 */
async function readStdin(): Promise<string> {
  const { stdin } = process;
  if (stdin.isTTY) {
    return "";
  }

  const parts: Buffer[] = [];
  for await (const part of stdin) {
    parts.push(part as Buffer);
  }

  const raw = Buffer.concat(parts);
  return raw.toString("utf8").trim();
}
/**
 * Print the `qmd memory` usage text to stderr.
 *
 * The closing "Requires" line points at the usage guide
 * (`docs/plan/pg-memory-bridge-usage.md`), which is where the environment
 * variables are documented, rather than at the design plan.
 */
function memoryHelp(): void {
  console.error(`Usage: qmd memory <add|search|get|rm|ls|namespaces> [options]
Commands:
qmd memory add <key> [text] Store/replace a memory (text from arg or stdin)
qmd memory search <query...> Hybrid search (pg_jieba FTS + pgvector, RRF fused)
qmd memory get <key> Fetch a memory's full body
qmd memory rm <key> Soft-delete a memory
qmd memory ls List memories in the namespace
qmd memory namespaces List namespaces and counts
Options:
--namespace <ns> Tenant namespace (default: $QMD_NAMESPACE or "default")
--title <text> Title for 'add'
-n <num> Max results for 'search'/'ls'
--full Return full body in 'search'
--json JSON output
Requires: QMD_BACKEND=pg and QMD_PG_URL (see docs/plan/pg-memory-bridge-usage.md)`);
}
/** Canonical `qmd memory` subcommand names. */
type MemorySubcommand = "add" | "search" | "get" | "rm" | "ls" | "namespaces";

/**
 * Every accepted subcommand spelling mapped to its canonical name. A Map is
 * used (not an object literal) so inherited keys such as "constructor" are
 * never mistaken for a subcommand.
 */
const MEMORY_SUBCOMMANDS = new Map<string, MemorySubcommand>([
  ["add", "add"],
  ["search", "search"],
  ["query", "search"],
  ["get", "get"],
  ["rm", "rm"],
  ["remove", "rm"],
  ["delete", "rm"],
  ["ls", "ls"],
  ["list", "ls"],
  ["namespaces", "namespaces"],
  ["ns", "namespaces"],
]);

/** Usage line printed when a subcommand's required positional argument is missing. */
const MEMORY_ARG_USAGE = new Map<MemorySubcommand, string>([
  ["add", "Usage: qmd memory add <key> [text] (text may be piped via stdin)"],
  ["search", "Usage: qmd memory search <query...>"],
  ["get", "Usage: qmd memory get <key>"],
  ["rm", "Usage: qmd memory rm <key>"],
]);

/**
 * Parse the `-n` option. Anything that is not a positive integer (missing,
 * non-numeric, zero, negative) yields `undefined`, in which case no `limit`
 * is passed to the store at all.
 */
function parseLimit(raw: unknown): number | undefined {
  if (raw === undefined || raw === null) return undefined;
  const parsed = Number.parseInt(String(raw), 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : undefined;
}

/**
 * Handle `qmd memory ...`.
 *
 * The subcommand and its required positional argument are validated before the
 * bridge is opened, so a typo or a missing key produces usage text instead of
 * a connection/configuration error and never touches PostgreSQL.
 *
 * @param args   - Positional arguments after `memory`; `args[0]` is the subcommand.
 * @param values - Parsed CLI options (`json`, `namespace`, `title`, `n`, `full`).
 * @returns A process exit code: 0 on success, 1 on usage errors, a missing
 *          record, a failure to open the bridge, or any store error.
 */
export async function runMemoryCommand(args: string[], values: Values): Promise<number> {
  const sub = args[0];
  if (!sub || sub === "help") {
    memoryHelp();
    return sub ? 0 : 1;
  }

  const command = MEMORY_SUBCOMMANDS.get(sub);
  if (!command) {
    memoryHelp();
    return 1;
  }

  // Validate the required positional argument up front (no connection needed).
  const key = args[1] ?? "";
  const query = args.slice(1).join(" ").trim();
  const usage = MEMORY_ARG_USAGE.get(command);
  if (usage !== undefined && !(command === "search" ? query : key)) {
    console.error(usage);
    return 1;
  }

  const json = !!values.json;
  const namespace = str(values.namespace);
  const title = str(values.title);
  const limit = parseLimit(values.n);

  let bridge;
  try {
    bridge = await openMemoryBridge();
  } catch (err) {
    console.error(`${C.red}${C.reset} ${(err as Error).message}`);
    return 1;
  }

  try {
    switch (command) {
      case "add": {
        // Inline text wins; stdin is only consulted when no text was given.
        const inline = args.slice(2).join(" ").trim();
        const body = inline || (await readStdin());
        if (!body) {
          console.error(`${C.red}${C.reset} No body provided (pass text or pipe via stdin)`);
          return 1;
        }
        const res = await bridge.store.addMemory({
          key,
          body,
          ...(title ? { title } : {}),
          ...(namespace ? { namespace } : {}),
        });
        if (json) {
          console.log(JSON.stringify(res, null, 2));
        } else {
          console.log(
            `${C.green}${C.reset} stored ${C.bold}${res.key}${C.reset} ` +
              `${C.dim}#${res.docid} · ${res.chunks} chunk(s) · ${res.embedded ? "embedded" : "no embedding"} · ns=${res.namespace}${C.reset}`,
          );
        }
        return 0;
      }

      case "search": {
        const results = await bridge.store.searchMemory(query, {
          ...(namespace ? { namespace } : {}),
          ...(limit ? { limit } : {}),
          full: !!values.full,
        });
        if (json) {
          console.log(JSON.stringify(results, null, 2));
          return 0;
        }
        if (results.length === 0) {
          console.log(`${C.dim}No matches.${C.reset}`);
          return 0;
        }
        for (const r of results) {
          // Show which retrievers (lexical / vector) ranked this hit.
          const signals = [
            r.lexRank ? `lex#${r.lexRank}` : null,
            r.vecRank ? `vec#${r.vecRank}` : null,
          ]
            .filter(Boolean)
            .join(" ");
          console.log(
            `${C.cyan}${r.key}${C.reset} ${C.dim}#${r.docid} · score ${r.score.toFixed(4)} · ${signals}${C.reset}`,
          );
          if (r.title) console.log(` ${C.bold}${r.title}${C.reset}`);
          console.log(` ${r.body.replace(/\n/g, "\n ")}`);
          console.log("");
        }
        return 0;
      }

      case "get": {
        const rec = await bridge.store.getMemory(key, namespace ? { namespace } : undefined);
        if (!rec) {
          console.error(`${C.red}${C.reset} not found: ${key}`);
          return 1;
        }
        if (json) {
          console.log(JSON.stringify(rec, null, 2));
        } else {
          if (rec.title) console.log(`${C.bold}${rec.title}${C.reset}`);
          console.log(rec.body);
        }
        return 0;
      }

      case "rm": {
        const ok = await bridge.store.deleteMemory(key, namespace ? { namespace } : undefined);
        console.log(ok ? `${C.green}${C.reset} removed ${key}` : `${C.dim}not found: ${key}${C.reset}`);
        return ok ? 0 : 1;
      }

      case "ls": {
        const rows = await bridge.store.listMemories({
          ...(namespace ? { namespace } : {}),
          ...(limit ? { limit } : {}),
        });
        if (json) {
          console.log(JSON.stringify(rows, null, 2));
          return 0;
        }
        if (rows.length === 0) {
          console.log(`${C.dim}No memories.${C.reset}`);
          return 0;
        }
        for (const r of rows) {
          // `search`/`get` treat the title as optional, so guard here too
          // rather than printing "undefined"/"null" for untitled memories.
          console.log(`${C.cyan}${r.key}${C.reset} ${C.dim}#${r.docid}${C.reset} ${r.title ?? ""}`);
        }
        return 0;
      }

      case "namespaces": {
        const rows = await bridge.store.listNamespaces();
        if (json) {
          console.log(JSON.stringify(rows, null, 2));
          return 0;
        }
        for (const r of rows) console.log(`${r.namespace} ${C.dim}(${r.count})${C.reset}`);
        return 0;
      }

      default:
        // Unreachable while MEMORY_SUBCOMMANDS and this switch stay in sync.
        memoryHelp();
        return 1;
    }
  } catch (err) {
    console.error(`${C.red}${C.reset} ${(err as Error).message}`);
    return 1;
  } finally {
    // Always release the bridge, on success and on failure alike.
    await bridge.dispose();
  }
}
/**
 * Handle `qmd pg ...`.
 *
 * Only `status` (the default when no subcommand is given) and its alias
 * `health` are accepted. The handler opens the memory bridge, queries its
 * health, prints a report (JSON when `values.json` is truthy, otherwise a
 * human-readable summary with the connection string redacted) and disposes the
 * bridge before returning.
 *
 * @param args   - Positional arguments after `pg`; `args[0]` is the subcommand.
 * @param values - Parsed CLI options; only `json` is consulted.
 * @returns A process exit code: 0 when the health report was printed, 1 on an
 *          unknown subcommand, a failure to open the bridge, or a health error.
 */
export async function runPgCommand(args: string[], values: Values): Promise<number> {
  const action = args[0] ?? "status";
  const recognized = action === "status" || action === "health";
  if (!recognized) {
    console.error("Usage: qmd pg status");
    return 1;
  }

  let bridge;
  try {
    bridge = await openMemoryBridge();
  } catch (openError) {
    console.error(`${C.red}${C.reset} ${(openError as Error).message}`);
    return 1;
  }

  try {
    const health = await bridge.store.health();
    const payload = {
      backend: "pg",
      connection: redactConnectionString(bridge.config.connectionString),
      namespace: health.namespace,
      server: health.server,
      fts: health.fts,
      memories: health.memories,
    };

    if (values.json) {
      console.log(JSON.stringify(payload, null, 2));
      return 0;
    }

    console.log(`${C.green}${C.reset} PostgreSQL memory backend`);
    console.log(` connection : ${payload.connection}`);
    console.log(` namespace : ${payload.namespace}`);

    // Keep only the first two space-separated words of the server string.
    const serverLabel = payload.server.split(" ").slice(0, 2).join(" ");
    console.log(` server : ${serverLabel}`);

    const trigramTag = health.fts.trigram ? " +pg_trgm" : "";
    const vectorTag = health.fts.vector ? " +pgvector" : "";
    console.log(` fts : ${health.fts.config}${trigramTag}${vectorTag}`);

    console.log(` memories : ${health.memories} (this namespace)`);
    return 0;
  } catch (healthError) {
    console.error(`${C.red}${C.reset} ${(healthError as Error).message}`);
    return 1;
  } finally {
    // Always release the bridge, on success and on failure alike.
    await bridge.dispose();
  }
}

View File

@ -100,6 +100,7 @@ import {
loadConfig,
} from "../collections.js";
import { getEmbeddedQmdSkillContent, getEmbeddedQmdSkillFiles } from "../embedded-skills.js";
import { runMemoryCommand, runPgCommand } from "./pg-commands.js";
// Enable production mode - allows using default database path
// Tests must set INDEX_PATH or use createStore() with explicit path
@ -2533,6 +2534,9 @@ function parseCLI() {
delete: { type: "boolean" },
update: { type: "boolean" },
embed: { type: "boolean" },
// Memory bridge (PG backend) options
namespace: { type: "string" }, // tenant namespace (openclaw/hermes/...)
title: { type: "string" }, // memory title
},
allowPositionals: true,
strict: false, // Allow unknown options to pass through
@ -2723,6 +2727,8 @@ function showHelp(): void {
console.log(" qmd multi-get <pattern> - Batch fetch via glob or comma-separated list");
console.log(" qmd skill show/install - Show or install the packaged QMD skill");
console.log(" qmd mcp - Start the MCP server (stdio transport for AI agents)");
console.log(" qmd memory add/search/get/rm - Shared PG memory bridge (needs QMD_BACKEND=pg)");
console.log(" qmd pg status - Show PostgreSQL memory backend health");
console.log(" qmd sync [--dry-run] - Secure two-way sync with a remote QMD host");
console.log(" qmd bench <fixture.json> - Run search quality benchmarks against a fixture file");
console.log("");
@ -3431,6 +3437,17 @@ if (isMain) {
break;
}
case "memory":
case "mem": {
const code = await runMemoryCommand(cli.args, cli.values);
process.exit(code);
}
case "pg": {
const code = await runPgCommand(cli.args, cli.values);
process.exit(code);
}
default:
console.error(`Unknown command: ${cli.command}`);
console.error("Run 'qmd --help' for usage.");

View File

@ -31,6 +31,7 @@ import {
} from "../index.js";
import { getConfigPath } from "../collections.js";
import { enableProductionMode } from "../store.js";
import { isPgBackend, openMemoryBridge, type MemoryBridge } from "../pg/index.js";
enableProductionMode();
@ -172,7 +173,7 @@ async function buildInstructions(store: QMDStore): Promise<string> {
* Create an MCP server with all QMD tools, resources, and prompts registered.
* Shared by both stdio and HTTP transports.
*/
async function createMcpServer(store: QMDStore): Promise<McpServer> {
async function createMcpServer(store: QMDStore, memory?: MemoryBridge): Promise<McpServer> {
const server = new McpServer(
{ name: "qmd", version: getPackageVersion() },
{ instructions: await buildInstructions(store) },
@ -533,9 +534,134 @@ Intent-aware lex (C++ performance, not sports):
}
);
// ---------------------------------------------------------------------------
// Memory bridge tools (PostgreSQL backend) — registered only when QMD_BACKEND=pg.
// Lets external agents (OpenClaw, Hermes, ...) share a namespaced, persistent
// memory store backed by pgvector + pg_jieba on postgresql.svc.plus.
// ---------------------------------------------------------------------------
if (memory) {
registerMemoryTools(server, memory);
}
return server;
}
/**
 * Register the PG-backed memory tools on an MCP server.
 *
 * Registers four tools that delegate to `memory.store`:
 * `memory_search`, `memory_add`, `memory_get` and `memory_list`. Optional
 * arguments are only forwarded when supplied, so the store's own defaults
 * (namespace, limit) apply otherwise.
 *
 * @param server MCP server to register the tools on.
 * @param memory Open memory bridge whose store backs every tool.
 */
function registerMemoryTools(server: McpServer, memory: MemoryBridge): void {
  // Agent-supplied limits end up in SQL `LIMIT` / `Array.slice`. Negative or
  // fractional values would fail in PostgreSQL or truncate unexpectedly, so
  // anything that is not a whole number >= 1 falls back to the store default.
  const normalizeLimit = (value: number | undefined): number | undefined => {
    if (value === undefined || !Number.isFinite(value)) return undefined;
    const whole = Math.floor(value);
    return whole >= 1 ? whole : undefined;
  };

  server.registerTool(
    "memory_search",
    {
      title: "Search Memory",
      description:
        "Hybrid search over shared agent memory (PostgreSQL). Fuses pg_jieba/tsvector " +
        "lexical search with pgvector semantic search via RRF. Scoped to a namespace.",
      annotations: { readOnlyHint: true, openWorldHint: false },
      inputSchema: {
        query: z.string().describe("Natural language or keyword query"),
        namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
        limit: z.number().optional().default(10).describe("Max results"),
        full: z.boolean().optional().default(false).describe("Return full body instead of a snippet"),
      },
    },
    async ({ query, namespace, limit, full }) => {
      const safeLimit = normalizeLimit(limit);
      const results = await memory.store.searchMemory(query, {
        ...(namespace ? { namespace } : {}),
        ...(safeLimit ? { limit: safeLimit } : {}),
        full: !!full,
      });
      if (results.length === 0) {
        return { content: [{ type: "text", text: `No memory matches for: ${query}` }] };
      }
      const text = results
        .map((r) => `### ${r.key} (#${r.docid}, score ${r.score.toFixed(4)})\n${r.title ? r.title + "\n" : ""}${r.body}`)
        .join("\n\n");
      return { content: [{ type: "text", text }], structuredContent: { results } };
    },
  );

  server.registerTool(
    "memory_add",
    {
      title: "Add Memory",
      description:
        "Store or replace a memory in the shared PostgreSQL store. Re-adding the same " +
        "key updates it. Content is chunked, embedded, and indexed for hybrid search.",
      annotations: { readOnlyHint: false, destructiveHint: false, openWorldHint: false },
      inputSchema: {
        key: z.string().describe("Logical id within the namespace (like a path/slug)"),
        body: z.string().describe("The memory content to store and index"),
        title: z.string().optional().describe("Optional human title"),
        namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
        metadata: z.record(z.string(), z.unknown()).optional().describe("Arbitrary JSON metadata"),
      },
    },
    async ({ key, body, title, namespace, metadata }) => {
      const res = await memory.store.addMemory({
        key,
        body,
        ...(title ? { title } : {}),
        ...(namespace ? { namespace } : {}),
        ...(metadata ? { metadata } : {}),
      });
      return {
        content: [
          {
            type: "text",
            text: `Stored "${res.key}" (#${res.docid}) in namespace "${res.namespace}": ${res.chunks} chunk(s), ${res.embedded ? "embedded" : "no embedding"}.`,
          },
        ],
        structuredContent: { ...res },
      };
    },
  );

  server.registerTool(
    "memory_get",
    {
      title: "Get Memory",
      description: "Fetch a single memory's full body by key from the shared PostgreSQL store.",
      annotations: { readOnlyHint: true, openWorldHint: false },
      inputSchema: {
        key: z.string().describe("The memory key"),
        namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
      },
    },
    async ({ key, namespace }) => {
      const rec = await memory.store.getMemory(key, namespace ? { namespace } : undefined);
      if (!rec) {
        // A missing key is reported as a tool error rather than an empty result.
        return { content: [{ type: "text", text: `Memory not found: ${key}` }], isError: true };
      }
      return {
        content: [{ type: "text", text: `${rec.title ? rec.title + "\n\n" : ""}${rec.body}` }],
        structuredContent: { ...rec },
      };
    },
  );

  server.registerTool(
    "memory_list",
    {
      title: "List Memory",
      description: "List memories in a namespace (most recently updated first).",
      annotations: { readOnlyHint: true, openWorldHint: false },
      inputSchema: {
        namespace: z.string().optional().describe("Tenant namespace (default: configured namespace)"),
        limit: z.number().optional().default(100).describe("Max entries"),
      },
    },
    async ({ namespace, limit }) => {
      const safeLimit = normalizeLimit(limit);
      const rows = await memory.store.listMemories({
        ...(namespace ? { namespace } : {}),
        ...(safeLimit ? { limit: safeLimit } : {}),
      });
      const text = rows.length
        ? rows.map((r) => `- ${r.key} (#${r.docid}) ${r.title}`).join("\n")
        : "No memories.";
      return { content: [{ type: "text", text }], structuredContent: { memories: rows } };
    },
  );
}
// =============================================================================
// Transport: stdio (default)
// =============================================================================
@ -546,11 +672,27 @@ export async function startMcpServer(): Promise<void> {
dbPath: getDefaultDbPath(),
...(existsSync(configPath) ? { configPath } : {}),
});
const server = await createMcpServer(store);
const memory = await maybeOpenMemoryBridge();
const server = await createMcpServer(store, memory);
const transport = new StdioServerTransport();
await server.connect(transport);
}
/**
 * Best-effort opener for the PG memory bridge.
 *
 * Returns `undefined` when the PG backend is not selected, leaving the
 * SQLite-only experience untouched. When it is selected but opening fails,
 * the failure is written to stderr and `undefined` is returned, so a
 * misconfigured PostgreSQL never takes down local document search.
 */
async function maybeOpenMemoryBridge(): Promise<MemoryBridge | undefined> {
  if (isPgBackend()) {
    try {
      const bridge = await openMemoryBridge();
      return bridge;
    } catch (err) {
      console.error(`[qmd:mcp] memory bridge disabled: ${(err as Error).message}`);
    }
  }
  return undefined;
}
// =============================================================================
// Transport: Streamable HTTP
// =============================================================================
@ -575,6 +717,9 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
// Pre-fetch default collection names for REST endpoint
const defaultCollectionNames = await store.getDefaultCollectionNames();
// Shared memory bridge (PG backend) — one pool for all sessions.
const memory = await maybeOpenMemoryBridge();
// Session map: each client gets its own McpServer + Transport pair (MCP spec requirement).
// The store is shared — it's stateless SQLite, safe for concurrent access.
const sessions = new Map<string, WebStandardStreamableHTTPServerTransport>();
@ -588,7 +733,7 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
log(`${ts()} New session ${sessionId} (${sessions.size} active)`);
},
});
const server = await createMcpServer(store);
const server = await createMcpServer(store, memory);
await server.connect(transport);
transport.onclose = () => {
@ -813,6 +958,7 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
sessions.clear();
httpServer.close();
await store.close();
if (memory) await memory.dispose();
};
process.on("SIGTERM", async () => {

122
src/pg/config.ts Normal file
View File

@ -0,0 +1,122 @@
/**
* pg/config.ts - Backend selection & PostgreSQL connection resolution
*
* QMD defaults to the local SQLite backend (unchanged behaviour). When the PG
* backend is selected, qmd talks to a shared PostgreSQL instance (e.g. the
* postgresql.svc.plus runtime with pgvector + pg_jieba + pg_trgm) and acts as a
* memory bridge for external agents (OpenClaw, Hermes, ...).
*
* Nothing here imports `pg`; connection objects are built lazily in db-pg.ts so
* that SQLite-only installs never need the driver.
*/
import { readFileSync } from "node:fs";
/** Storage backend identifiers: "sqlite" is the default, "pg" selects PostgreSQL. */
export type Backend = "sqlite" | "pg";
/**
 * TLS configuration passed through to node-postgres' `ssl` option.
 *
 * `false` means no TLS at all. The object form enables TLS:
 * `rejectUnauthorized` controls certificate verification and `ca` carries the
 * PEM text of a custom CA bundle (the file contents, not a path).
 */
export type PgSslConfig =
| false
| {
rejectUnauthorized: boolean;
ca?: string;
};
/** Fully resolved settings for the PostgreSQL pool, as built by `resolvePgConfig`. */
export interface PgConnectionConfig {
/** PostgreSQL connection string (postgres://user:pass@host:port/db). */
connectionString: string;
/** TLS settings. postgresql.svc.plus terminates TLS at stunnel (default 5443). */
ssl: PgSslConfig;
/** Logical tenant namespace — isolates OpenClaw / Hermes / ... memory. */
namespace: string;
/** Max pool size (QMD_PG_POOL_MAX, default 5). */
max: number;
/**
 * Connection timeout in ms (QMD_PG_CONNECT_TIMEOUT_MS, default 10000).
 * Forwarded to the pool as `connectionTimeoutMillis`; db-pg.ts does not set a
 * statement timeout from this value.
 */
connectionTimeoutMillis: number;
}
/** Namespace used when QMD_NAMESPACE is unset or blank. */
export const DEFAULT_NAMESPACE = "default";
/**
 * Pick the active backend from `QMD_BACKEND`.
 *
 * The value is trimmed and compared case-insensitively; "pg", "postgres" and
 * "postgresql" select PostgreSQL, everything else (including unset) selects
 * SQLite. A bare `QMD_PG_URL` / `DATABASE_URL` does NOT switch the backend:
 * selection is explicit, so existing SQLite workflows never change.
 *
 * @param env Environment to read (defaults to `process.env`).
 */
export function resolveBackend(env: NodeJS.ProcessEnv = process.env): Backend {
  const requested = (env.QMD_BACKEND ?? "").trim().toLowerCase();
  switch (requested) {
    case "pg":
    case "postgres":
    case "postgresql":
      return "pg";
    default:
      return "sqlite";
  }
}
/**
 * Convenience predicate over `resolveBackend`.
 *
 * @param env Environment to read (defaults to `process.env`).
 * @returns `true` only when the resolved backend is "pg".
 */
export function isPgBackend(env: NodeJS.ProcessEnv = process.env): boolean {
  const backend = resolveBackend(env);
  return backend === "pg";
}
/**
 * Default tenant namespace for this process.
 *
 * Reads `QMD_NAMESPACE`, trims it, and falls back to `DEFAULT_NAMESPACE` when
 * it is unset or blank. Agents may still override the namespace per call.
 *
 * @param env Environment to read (defaults to `process.env`).
 */
export function resolveNamespace(env: NodeJS.ProcessEnv = process.env): string {
  const configured = (env.QMD_NAMESPACE ?? "").trim();
  if (configured.length > 0) return configured;
  return DEFAULT_NAMESPACE;
}
/**
 * Resolve the TLS configuration from the environment.
 *
 * - QMD_PG_SSL=disable | off | false  -> no TLS (plain; e.g. via a local
 *   stunnel-client). QMD_PG_CA is ignored and its file is not read.
 * - QMD_PG_SSL=no-verify | allow      -> TLS without certificate verification,
 *   even when a CA is supplied.
 * - anything else (default)           -> TLS; the certificate is verified only
 *   when QMD_PG_CA supplies a non-empty CA bundle. Without a CA the connection
 *   is encrypted but NOT verified (stunnel often uses self-signed certs).
 * - QMD_PG_CA=/path/to/ca.pem         -> custom CA bundle, read as UTF-8.
 *
 * @param env Environment to read (defaults to `process.env`).
 * @returns `false` for plain connections, otherwise the node-postgres `ssl` object.
 * @throws When TLS is enabled and the QMD_PG_CA file cannot be read.
 */
export function resolvePgSsl(env: NodeJS.ProcessEnv = process.env): PgSslConfig {
  const mode = (env.QMD_PG_SSL ?? "").trim().toLowerCase();

  // Decide on "no TLS" before touching the filesystem, so a stale QMD_PG_CA
  // path cannot break a deliberately plain connection.
  if (mode === "disable" || mode === "off" || mode === "false") return false;

  const caPath = (env.QMD_PG_CA ?? "").trim();
  const ca = caPath ? readFileSync(caPath, "utf8") : undefined;

  if (mode === "no-verify" || mode === "allow") {
    return { rejectUnauthorized: false, ...(ca ? { ca } : {}) };
  }

  // Default: require TLS. Verify only when a CA is supplied (stunnel often uses
  // self-signed certs, so default to no-verify unless an explicit CA is given).
  return { rejectUnauthorized: !!ca, ...(ca ? { ca } : {}) };
}
/**
 * Build the full PG connection config from the environment.
 *
 * The connection URL comes from `QMD_PG_URL`, falling back to `DATABASE_URL`
 * when `QMD_PG_URL` is unset, empty or whitespace-only. Pool size and connect
 * timeout come from `QMD_PG_POOL_MAX` / `QMD_PG_CONNECT_TIMEOUT_MS` and fall
 * back to 5 and 10000 ms when missing, non-numeric or not positive.
 *
 * @param env Environment to read (defaults to `process.env`).
 * @throws A clear error when no connection URL is available.
 */
export function resolvePgConfig(env: NodeJS.ProcessEnv = process.env): PgConnectionConfig {
  // `??` alone would stop at an empty QMD_PG_URL (e.g. `QMD_PG_URL=` in a
  // dotenv file) and never consult DATABASE_URL, so trim first and use `||`.
  const connectionString = (env.QMD_PG_URL ?? "").trim() || (env.DATABASE_URL ?? "").trim();
  if (!connectionString) {
    throw new Error(
      "PostgreSQL backend selected (QMD_BACKEND=pg) but no connection URL found. " +
        "Set QMD_PG_URL (or DATABASE_URL), e.g.\n" +
        " export QMD_PG_URL='postgres://postgres:***@db.example.com:5443/qmd'",
    );
  }

  const max = Number.parseInt(env.QMD_PG_POOL_MAX ?? "", 10);
  const timeout = Number.parseInt(env.QMD_PG_CONNECT_TIMEOUT_MS ?? "", 10);

  return {
    connectionString,
    ssl: resolvePgSsl(env),
    namespace: resolveNamespace(env),
    max: Number.isFinite(max) && max > 0 ? max : 5,
    connectionTimeoutMillis: Number.isFinite(timeout) && timeout > 0 ? timeout : 10_000,
  };
}
/**
 * Redact credentials from a connection string for logging.
 *
 * For parseable URLs the userinfo password and a `password` query parameter
 * are replaced with `***`; everything else is kept. Strings that do not parse
 * as a URL get their whole `user:pass@` section replaced by `***@`.
 *
 * @param connectionString Raw connection string, possibly containing secrets.
 * @returns A string that is safe to print.
 */
export function redactConnectionString(connectionString: string): string {
  try {
    const url = new URL(connectionString);
    if (url.password) url.password = "***";
    // The password can also be supplied as `?password=...`; mask it too so it
    // never reaches logs or `qmd pg status` output.
    if (url.searchParams.has("password")) url.searchParams.set("password", "***");
    return url.toString();
  } catch {
    return connectionString.replace(/:\/\/[^@]*@/, "://***@");
  }
}

144
src/pg/db-pg.ts Normal file
View File

@ -0,0 +1,144 @@
/**
* pg/db-pg.ts - PostgreSQL connection layer
*
* Thin async wrapper over node-postgres. `pg` is imported lazily so SQLite-only
* installs never load the driver. Provides a small query surface used by the
* memory store plus a transaction helper.
*/
import type { PgConnectionConfig } from "./config.js";
import { redactConnectionString } from "./config.js";
// Minimal structural types — we avoid importing `pg`'s types at module load so
// the dependency stays optional at runtime.

/** The subset of a node-postgres pool that this module calls. */
interface PoolLike {
// Runs one statement on any pooled connection.
query(text: string, params?: unknown[]): Promise<{ rows: any[]; rowCount: number | null }>;
// Checks out a dedicated client (used for transactions); must be released.
connect(): Promise<PoolClientLike>;
// Shuts the pool down.
end(): Promise<void>;
// Subscribes to background pool errors.
on(event: "error", handler: (err: Error) => void): void;
}
/** The subset of a checked-out pool client that this module calls. */
interface PoolClientLike {
query(text: string, params?: unknown[]): Promise<{ rows: any[]; rowCount: number | null }>;
// Returns the client to the pool.
release(): void;
}
/** Cached `Pool` constructor; stays null until the first successful load. */
let PoolCtor: (new (config: Record<string, unknown>) => PoolLike) | null = null;

/**
 * Lazily import `pg` and return its `Pool` constructor.
 *
 * The constructor is looked up as a named export first and on the default
 * export second, then cached for later calls.
 *
 * @throws When the `pg` import fails, or the module exposes no `Pool`.
 */
async function loadPoolCtor(): Promise<new (config: Record<string, unknown>) => PoolLike> {
  if (PoolCtor) {
    return PoolCtor;
  }

  let pgModule: any;
  try {
    pgModule = await import("pg");
  } catch (err) {
    const reason = (err as Error).message;
    throw new Error(
      `The 'pg' package is required for the PostgreSQL backend but is not installed. Run \`bun add pg\` (and \`bun add -d @types/pg\`).\nUnderlying error: ${reason}`,
    );
  }

  const resolved = pgModule.Pool ?? pgModule.default?.Pool;
  if (!resolved) {
    throw new Error("Could not resolve `Pool` from the 'pg' package.");
  }
  PoolCtor = resolved;
  return resolved;
}
/**
 * Pooled PostgreSQL handle used by the memory store.
 *
 * Instances are built through `PgClient.create`. Besides the pool, the client
 * carries the configured namespace and a credential-redacted connection label.
 */
export class PgClient {
  private pool: PoolLike;
  readonly namespace: string;
  readonly connectionLabel: string;

  private constructor(pool: PoolLike, namespace: string, connectionLabel: string) {
    this.pool = pool;
    this.namespace = namespace;
    this.connectionLabel = connectionLabel;
  }

  /** Build a client (and its pool) from a resolved connection config. */
  static async create(config: PgConnectionConfig): Promise<PgClient> {
    const PoolImpl = await loadPoolCtor();
    const { connectionString, ssl, max, connectionTimeoutMillis } = config;
    const pool = new PoolImpl({ connectionString, ssl, max, connectionTimeoutMillis });

    // Surface background pool errors instead of crashing the process.
    pool.on("error", (poolError) => {
      console.error(`[qmd:pg] idle client error: ${poolError.message}`);
    });

    const label = redactConnectionString(connectionString);
    return new PgClient(pool, config.namespace, label);
  }

  /** Execute a statement and hand back every row. */
  async query<T = any>(text: string, params: unknown[] = []): Promise<T[]> {
    const { rows } = await this.pool.query(text, params);
    return rows as T[];
  }

  /** Execute a statement and hand back the first row, or null when there is none. */
  async queryOne<T = any>(text: string, params: unknown[] = []): Promise<T | null> {
    const { rows } = await this.pool.query(text, params);
    return (rows[0] as T) ?? null;
  }

  /** Execute a statement and hand back the affected row count (0 when unreported). */
  async exec(text: string, params: unknown[] = []): Promise<number> {
    const { rowCount } = await this.pool.query(text, params);
    return rowCount ?? 0;
  }

  /**
   * Run `fn` inside BEGIN/COMMIT on a dedicated client.
   *
   * Any failure triggers a ROLLBACK (whose own failure is ignored) and the
   * original error is rethrown. The client is always released.
   */
  async tx<T>(fn: (client: PgTxClient) => Promise<T>): Promise<T> {
    const connection = await this.pool.connect();
    try {
      await connection.query("BEGIN");
      const outcome = await fn(new PgTxClient(connection));
      await connection.query("COMMIT");
      return outcome;
    } catch (failure) {
      try {
        await connection.query("ROLLBACK");
      } catch {
        /* ignore rollback failure */
      }
      throw failure;
    } finally {
      connection.release();
    }
  }

  /** Liveness check: the server version string, or "unknown" when absent. */
  async ping(): Promise<string> {
    const row = await this.queryOne<{ version: string }>("SELECT version() AS version");
    if (row === null) return "unknown";
    return row.version ?? "unknown";
  }

  /** End the underlying pool. */
  async close(): Promise<void> {
    await this.pool.end();
  }
}
/**
 * Query surface bound to one checked-out client, so every statement issued
 * through it runs on the same connection (and thus the same transaction).
 */
export class PgTxClient {
  private client: PoolClientLike;

  constructor(client: PoolClientLike) {
    this.client = client;
  }

  /** Execute a statement and hand back every row. */
  async query<T = any>(text: string, params: unknown[] = []): Promise<T[]> {
    const { rows } = await this.client.query(text, params);
    return rows as T[];
  }

  /** Execute a statement and hand back the first row, or null when there is none. */
  async queryOne<T = any>(text: string, params: unknown[] = []): Promise<T | null> {
    const { rows } = await this.client.query(text, params);
    return (rows[0] as T) ?? null;
  }

  /** Execute a statement and hand back the affected row count (0 when unreported). */
  async exec(text: string, params: unknown[] = []): Promise<number> {
    const { rowCount } = await this.client.query(text, params);
    return rowCount ?? 0;
  }
}
/**
 * Render an embedding as a pgvector text literal, e.g. `[1,2,3]`.
 *
 * @param embedding Vector components as a plain array or Float32Array.
 * @returns The components joined by commas and wrapped in square brackets.
 */
export function toVectorLiteral(embedding: number[] | Float32Array): string {
  const components = Array.from(embedding);
  return "[" + components.join(",") + "]";
}

70
src/pg/index.ts Normal file
View File

@ -0,0 +1,70 @@
/**
* pg/index.ts - Entry point for the PostgreSQL memory bridge.
*
* Wires qmd's LlamaCpp embedder (external OpenAI-compatible API by default,
* optional local models) to the PG-backed memory store and returns a handle
* that disposes both the LLM and the connection pool on close.
*/
import { LlamaCpp } from "../llm.js";
import { resolvePgConfig, isPgBackend, type PgConnectionConfig } from "./config.js";
import { PgMemoryStore } from "./memory-store.js";
export { PgMemoryStore } from "./memory-store.js";
export type {
AddMemoryInput,
AddMemoryResult,
MemorySearchResult,
MemoryRecord,
SearchOptions,
Embedder,
} from "./memory-store.js";
export {
resolveBackend,
isPgBackend,
resolveNamespace,
resolvePgConfig,
redactConnectionString,
DEFAULT_NAMESPACE,
} from "./config.js";
export type { Backend, PgConnectionConfig } from "./config.js";
/** Handle returned by `openMemoryBridge`. */
export interface MemoryBridge {
// PG-backed memory store used by the CLI and MCP tools.
store: PgMemoryStore;
// Connection config the store was opened with (contains the raw connection string).
config: PgConnectionConfig;
// Closes the store and disposes the embedder created for this bridge.
dispose(): Promise<void>;
}
/**
 * Open the memory bridge from the environment.
 *
 * Creates a dedicated `LlamaCpp` embedder and a `PgMemoryStore` on top of the
 * resolved connection config.
 *
 * @param env Environment to read (defaults to `process.env`).
 * @returns A bridge whose `dispose()` closes the store and then the embedder.
 * @throws A clear error if the PG backend is not selected; also propagates
 *   config-resolution and store-open failures.
 */
export async function openMemoryBridge(
  env: NodeJS.ProcessEnv = process.env,
): Promise<MemoryBridge> {
  if (!isPgBackend(env)) {
    throw new Error(
      "PostgreSQL backend is not selected. Set QMD_BACKEND=pg and QMD_PG_URL to use memory commands.",
    );
  }
  const config = resolvePgConfig(env);

  // A dedicated embedder for the bridge — lazy-loads models on first use and
  // auto-unloads after inactivity. Uses the same external embed API as the
  // SQLite engine so vectors are comparable across hosts.
  const llm = new LlamaCpp({
    inactivityTimeoutMs: 5 * 60 * 1000,
    disposeModelsOnInactivity: true,
  });

  let store: PgMemoryStore;
  try {
    store = await PgMemoryStore.open(config, llm);
  } catch (err) {
    // The caller never receives a bridge in this case, so the embedder has to
    // be released here. A dispose failure must not mask the real error.
    try {
      await llm.dispose();
    } catch {
      /* keep the original open failure */
    }
    throw err;
  }

  return {
    store,
    config,
    dispose: async () => {
      // Release the embedder even when closing the pool fails.
      try {
        await store.close();
      } finally {
        await llm.dispose();
      }
    },
  };
}

424
src/pg/memory-store.ts Normal file
View File

@ -0,0 +1,424 @@
/**
* pg/memory-store.ts - PostgreSQL-backed memory store (the OpenClaw/Hermes bridge)
*
* A focused, namespace-isolated memory layer over PostgreSQL. It reuses qmd's
* pure helpers (content hashing, chunking, docid, embedding formatting) so PG
* search behaves consistently with the SQLite engine, then fuses lexical
* (pg_jieba/tsvector) and semantic (pgvector) results with Reciprocal Rank
* Fusion (RRF), the same idea qmd uses for hybrid search.
*
* The existing SQLite document workflow is untouched; this is an additive,
* opt-in backend selected via QMD_BACKEND=pg.
*/
import { PgClient, toVectorLiteral } from "./db-pg.js";
import type { PgConnectionConfig } from "./config.js";
import { bootstrapSchema, ensureVectorIndex, type FtsCapabilities } from "./schema-pg.js";
import { hashContent, chunkDocument, getDocid } from "../store.js";
import {
DEFAULT_EMBED_MODEL_URI,
formatDocForEmbedding,
formatQueryForEmbedding,
} from "../llm.js";
/**
 * Minimal embedder contract — satisfied by LlamaCpp from llm.ts.
 *
 * The store pairs result `i` with input text `i`, and treats a `null` entry
 * as "no embedding for this text".
 */
export interface Embedder {
embedBatch(
texts: string[],
options?: { isQuery?: boolean; model?: string },
): Promise<({ embedding: number[]; model: string } | null)[]>;
}
/** Arguments accepted by `PgMemoryStore.addMemory`. */
export interface AddMemoryInput {
/** Logical id within the namespace (like a path). Re-adding the same key updates it. */
key: string;
/** The memory body to store and index. */
body: string;
/** Optional human title (also surfaced in results). Stored as "" when omitted. */
title?: string;
/** Arbitrary JSON metadata (agent, tags, source, ...). Stored as {} when omitted. */
metadata?: Record<string, unknown>;
/** Override the default namespace for this call. */
namespace?: string;
/** Embedding model URI (defaults to the configured embed model). */
model?: string;
}
/** Outcome reported by `PgMemoryStore.addMemory`. */
export interface AddMemoryResult {
// Namespace the memory was written to (after applying the default).
namespace: string;
// Key as supplied by the caller.
key: string;
// Content hash of the body.
hash: string;
// Short document id derived from the hash.
docid: string;
// Number of chunks the body was split into.
chunks: number;
// True when at least one chunk produced an embedding.
embedded: boolean;
}
/** One hit returned by `PgMemoryStore.searchMemory`, ordered by fused score. */
export interface MemorySearchResult {
namespace: string;
key: string;
title: string;
/** Short document id derived from `hash`. */
docid: string;
hash: string;
/** Fused RRF score (higher = better). */
score: number;
/** Best lexical rank (1-indexed) or null if not matched lexically. */
lexRank: number | null;
/** Best vector rank (1-indexed) or null if not matched semantically. */
vecRank: number | null;
/** Snippet/body (truncated unless full requested). */
body: string;
metadata: Record<string, unknown>;
}
/** A single stored memory with its full body, as returned by `PgMemoryStore.getMemory`. */
export interface MemoryRecord {
namespace: string;
key: string;
title: string;
// Short document id derived from `hash`.
docid: string;
hash: string;
// Full, untruncated body.
body: string;
metadata: Record<string, unknown>;
// Taken from the row's created_at / updated_at columns.
// NOTE(review): typed as string; the driver may hand back Date objects for
// timestamp columns — confirm against the schema and callers.
createdAt: string;
updatedAt: string;
}
/** Optional tuning knobs for `PgMemoryStore.searchMemory`. */
export interface SearchOptions {
/** Namespace to search (defaults to the store's namespace). */
namespace?: string;
/** Max number of fused results returned (default 10). */
limit?: number;
/** Candidate pool per signal before fusion (default 50). */
candidateLimit?: number;
/** Return full body instead of a snippet. */
full?: boolean;
/** Embedding model URI for the query. */
model?: string;
}
/** Smoothing constant in the RRF denominator. */
const RRF_K = 60;
/** Maximum snippet length, in UTF-16 code units, before truncation. */
const SNIPPET_CHARS = 400;

/**
 * Reciprocal Rank Fusion across signal-specific rankings.
 *
 * Each ranking maps a key to its 1-indexed rank within one signal. A key's
 * fused score is the sum of `1 / (RRF_K + rank)` over the rankings that
 * contain it; keys keep their first-seen insertion order.
 */
function rrfFuse(rankings: Array<Map<string, number>>): Map<string, number> {
  return rankings.reduce((fused, ranking) => {
    ranking.forEach((rank, key) => {
      const previous = fused.get(key) ?? 0;
      fused.set(key, previous + 1 / (RRF_K + rank));
    });
    return fused;
  }, new Map<string, number>());
}
/**
 * Shorten a body for display.
 *
 * Returns the body unchanged when `full` is set or it already fits within
 * `SNIPPET_CHARS`; otherwise returns the leading part followed by "…".
 *
 * @param body Full memory body.
 * @param full When true, never truncate.
 */
function snippet(body: string, full: boolean): string {
  if (full || body.length <= SNIPPET_CHARS) return body;

  // Lengths are UTF-16 code units. If the cut would land between the two
  // halves of a surrogate pair (e.g. an emoji), back off by one unit so the
  // snippet never ends in a lone surrogate.
  let end = SNIPPET_CHARS;
  const lastUnit = body.charCodeAt(end - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) end -= 1;

  return body.slice(0, end) + "…";
}
/**
 * Normalise a metadata column value into a plain record.
 *
 * Accepts either an already-decoded object or a JSON string. Anything that is
 * not a non-null, non-array object — including JSON text that decodes to
 * `null`, a number, a string or an array, and JSON that fails to parse —
 * yields an empty record, so callers can always treat the result as a
 * key/value map.
 *
 * @param raw Value read from the database row.
 */
function parseMetadata(raw: unknown): Record<string, unknown> {
  let value: unknown = raw;
  if (typeof value === "string") {
    try {
      value = JSON.parse(value);
    } catch {
      return {};
    }
  }
  if (value !== null && typeof value === "object" && !Array.isArray(value)) {
    return value as Record<string, unknown>;
  }
  return {};
}
/**
 * Namespace-scoped memory store on top of PostgreSQL.
 *
 * Reads and writes the `qmd_memory`, `qmd_memory_content`,
 * `qmd_memory_vectors` and `qmd_memory_config` tables through a `PgClient`,
 * and uses the supplied `Embedder` for chunk and query embeddings. Instances
 * are created with `PgMemoryStore.open`.
 */
export class PgMemoryStore {
  private constructor(
    private client: PgClient,
    private embedder: Embedder,
    private fts: FtsCapabilities,
    private defaultNamespace: string,
    private defaultModel: string,
  ) {}

  /**
   * Connect, bootstrap the schema and return a ready store.
   *
   * @param config   Resolved connection config; its namespace becomes the default.
   * @param embedder Embedder used for documents and queries.
   * @param opts     Optional default embedding model URI.
   * @throws Propagates connection and schema bootstrap failures; the pool is
   *   closed before the error is rethrown.
   */
  static async open(
    config: PgConnectionConfig,
    embedder: Embedder,
    opts?: { model?: string },
  ): Promise<PgMemoryStore> {
    const client = await PgClient.create(config);

    let fts: FtsCapabilities;
    try {
      fts = await bootstrapSchema(client);
    } catch (err) {
      // No store is returned on this path, so nobody else can close the pool.
      try {
        await client.close();
      } catch {
        /* keep the original bootstrap failure */
      }
      throw err;
    }

    return new PgMemoryStore(
      client,
      embedder,
      fts,
      config.namespace,
      opts?.model ?? DEFAULT_EMBED_MODEL_URI,
    );
  }

  /** Text-search capabilities detected at bootstrap time. */
  get capabilities(): FtsCapabilities {
    return this.fts;
  }

  /** Effective namespace: the trimmed override, or the store default when blank. */
  private ns(override?: string): string {
    return (override ?? "").trim() || this.defaultNamespace;
  }

  // ── Write ──────────────────────────────────────────────────────────────

  /**
   * Store or replace a memory and (re)index it.
   *
   * The body is hashed, chunked and embedded first; content, the key row and
   * the vectors are then written in a single transaction. Re-adding an
   * existing key updates it and re-activates it.
   *
   * @returns Identifiers plus the chunk count and whether any chunk was embedded.
   */
  async addMemory(input: AddMemoryInput): Promise<AddMemoryResult> {
    const namespace = this.ns(input.namespace);
    const body = input.body;
    const title = input.title ?? "";
    const model = input.model ?? this.defaultModel;
    const hash = await hashContent(body);
    const docid = getDocid(hash);

    // Chunk + embed up front (network/LLM bound) before opening the transaction.
    const chunks = chunkDocument(body);
    const texts = chunks.map((c) => formatDocForEmbedding(c.text, title, model));
    const embeddings = await this.embedder.embedBatch(texts, { isQuery: false, model });
    const embeddedCount = embeddings.filter(Boolean).length;

    await this.client.tx(async (tx) => {
      await tx.exec(
        `INSERT INTO qmd_memory_content (namespace, hash, body)
         VALUES ($1, $2, $3)
         ON CONFLICT (namespace, hash) DO NOTHING`,
        [namespace, hash, body],
      );
      await tx.exec(
        `INSERT INTO qmd_memory (namespace, key, title, hash, metadata, updated_at, active)
         VALUES ($1, $2, $3, $4, $5::jsonb, now(), true)
         ON CONFLICT (namespace, key)
         DO UPDATE SET title = EXCLUDED.title,
                       hash = EXCLUDED.hash,
                       metadata = EXCLUDED.metadata,
                       updated_at = now(),
                       active = true`,
        [namespace, input.key, title, hash, JSON.stringify(input.metadata ?? {})],
      );

      // Refresh embeddings for this content hash — but only when there is
      // something to replace them with. If the embedder returned nothing
      // (e.g. a transient outage), vectors already stored for the same
      // content are kept instead of being wiped.
      if (embeddedCount > 0) {
        await tx.exec(`DELETE FROM qmd_memory_vectors WHERE namespace = $1 AND hash = $2`, [
          namespace,
          hash,
        ]);
        for (let i = 0; i < chunks.length; i++) {
          const emb = embeddings[i];
          const chunk = chunks[i];
          if (!emb || !chunk) continue;
          await tx.exec(
            `INSERT INTO qmd_memory_vectors (namespace, hash, seq, pos, embedding, model)
             VALUES ($1, $2, $3, $4, $5::vector, $6)`,
            [namespace, hash, i, chunk.pos, toVectorLiteral(emb.embedding), emb.model ?? model],
          );
        }
      }
    });

    // Lazily build the ANN index once we know the embedding dimension.
    if (embeddedCount > 0) {
      const dim = embeddings.find(Boolean)?.embedding.length;
      if (dim) await this.ensureIndexOnce(dim);
    }

    return {
      namespace,
      key: input.key,
      hash,
      docid,
      chunks: chunks.length,
      embedded: embeddedCount > 0,
    };
  }

  /**
   * Ensure the vector index exists for `dim`, recording the dimension in
   * `qmd_memory_config` (under the store's default namespace) so the work is
   * skipped while the recorded dimension is unchanged.
   */
  private async ensureIndexOnce(dim: number): Promise<void> {
    const ns = this.defaultNamespace;
    const existing = await this.client.queryOne<{ value: string }>(
      `SELECT value FROM qmd_memory_config WHERE namespace = $1 AND key = 'vector_dim'`,
      [ns],
    );
    if (existing?.value === String(dim)) return;
    await ensureVectorIndex(this.client, dim);
    await this.client.exec(
      `INSERT INTO qmd_memory_config (namespace, key, value)
       VALUES ($1, 'vector_dim', $2)
       ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value`,
      [ns, String(dim)],
    );
  }

  // ── Search ───────────────────────────────────────────────────────────────

  /**
   * Hybrid search within one namespace.
   *
   * Runs a lexical query and, when a query embedding is available, a vector
   * query; fuses both rankings with RRF, keeps the top `limit` keys and
   * hydrates them with title, body and metadata.
   *
   * @returns Results ordered by fused score, best first.
   */
  async searchMemory(query: string, opts: SearchOptions = {}): Promise<MemorySearchResult[]> {
    const namespace = this.ns(opts.namespace);
    const limit = opts.limit ?? 10;
    const candidateLimit = opts.candidateLimit ?? 50;
    const model = opts.model ?? this.defaultModel;

    // Signal 1 — lexical (pg_jieba/tsvector + ts_rank).
    const lexRows = await this.client.query<{ key: string }>(
      `SELECT m.key AS key
       FROM qmd_memory_content c
       JOIN qmd_memory m ON m.namespace = c.namespace AND m.hash = c.hash AND m.active
       WHERE c.namespace = $1 AND c.tsv @@ websearch_to_tsquery($2::regconfig, $3)
       ORDER BY ts_rank_cd(c.tsv, websearch_to_tsquery($2::regconfig, $3)) DESC
       LIMIT $4`,
      [namespace, this.fts.config, query, candidateLimit],
    );
    const lexRank = new Map<string, number>();
    lexRows.forEach((r, i) => {
      if (!lexRank.has(r.key)) lexRank.set(r.key, i + 1);
    });

    // Signal 2 — semantic (pgvector cosine distance).
    const vecRank = new Map<string, number>();
    const qEmb = (await this.embedder.embedBatch([formatQueryForEmbedding(query, model)], {
      isQuery: true,
      model,
    }))[0];
    if (qEmb) {
      const vecRows = await this.client.query<{ key: string }>(
        `SELECT m.key AS key
         FROM qmd_memory_vectors v
         JOIN qmd_memory m ON m.namespace = v.namespace AND m.hash = v.hash AND m.active
         WHERE v.namespace = $1
         ORDER BY v.embedding <=> $2::vector
         LIMIT $3`,
        [namespace, toVectorLiteral(qEmb.embedding), candidateLimit],
      );
      // Rows are per chunk, so a key can repeat; its first (best) hit sets the rank.
      vecRows.forEach((r) => {
        if (!vecRank.has(r.key)) vecRank.set(r.key, vecRank.size + 1);
      });
    }

    const fused = rrfFuse([lexRank, vecRank]);
    if (fused.size === 0) return [];

    const ranked = [...fused.entries()].sort((a, b) => b[1] - a[1]).slice(0, limit);
    const keys = ranked.map(([key]) => key);

    // Hydrate the winning keys with title/body/metadata.
    const rows = await this.client.query<{
      key: string;
      title: string;
      hash: string;
      body: string;
      metadata: unknown;
    }>(
      `SELECT m.key, m.title, m.hash, c.body, m.metadata
       FROM qmd_memory m
       JOIN qmd_memory_content c ON c.namespace = m.namespace AND c.hash = m.hash
       WHERE m.namespace = $1 AND m.key = ANY($2) AND m.active`,
      [namespace, keys],
    );
    const byKey = new Map(rows.map((r) => [r.key, r]));

    const results: MemorySearchResult[] = [];
    for (const [key, score] of ranked) {
      const row = byKey.get(key);
      if (!row) continue;
      results.push({
        namespace,
        key,
        title: row.title,
        hash: row.hash,
        docid: getDocid(row.hash),
        score,
        lexRank: lexRank.get(key) ?? null,
        vecRank: vecRank.get(key) ?? null,
        body: snippet(row.body, opts.full ?? false),
        metadata: parseMetadata(row.metadata),
      });
    }
    return results;
  }

  // ── Read / manage ──────────────────────────────────────────────────────

  /**
   * Fetch one active memory by key, with its full body.
   *
   * @returns The record, or null when no active row matches.
   */
  async getMemory(key: string, opts?: { namespace?: string }): Promise<MemoryRecord | null> {
    const namespace = this.ns(opts?.namespace);
    // NOTE(review): created_at/updated_at are typed as string here; the driver
    // may return Date objects for timestamp columns — confirm against the schema.
    const row = await this.client.queryOne<{
      key: string;
      title: string;
      hash: string;
      body: string;
      metadata: unknown;
      created_at: string;
      updated_at: string;
    }>(
      `SELECT m.key, m.title, m.hash, c.body, m.metadata, m.created_at, m.updated_at
       FROM qmd_memory m
       JOIN qmd_memory_content c ON c.namespace = m.namespace AND c.hash = m.hash
       WHERE m.namespace = $1 AND m.key = $2 AND m.active`,
      [namespace, key],
    );
    if (!row) return null;
    return {
      namespace,
      key: row.key,
      title: row.title,
      hash: row.hash,
      docid: getDocid(row.hash),
      body: row.body,
      metadata: parseMetadata(row.metadata),
      createdAt: row.created_at,
      updatedAt: row.updated_at,
    };
  }

  /**
   * Soft-delete a memory by marking its row inactive.
   *
   * @returns True when an active row was updated.
   */
  async deleteMemory(key: string, opts?: { namespace?: string }): Promise<boolean> {
    const namespace = this.ns(opts?.namespace);
    const n = await this.client.exec(
      `UPDATE qmd_memory SET active = false, updated_at = now()
       WHERE namespace = $1 AND key = $2 AND active`,
      [namespace, key],
    );
    return n > 0;
  }

  /** List active memories in a namespace, most recently updated first (default limit 100). */
  async listMemories(
    opts?: { namespace?: string; limit?: number },
  ): Promise<Array<{ key: string; title: string; docid: string; updatedAt: string }>> {
    const namespace = this.ns(opts?.namespace);
    const rows = await this.client.query<{
      key: string;
      title: string;
      hash: string;
      updated_at: string;
    }>(
      `SELECT key, title, hash, updated_at FROM qmd_memory
       WHERE namespace = $1 AND active ORDER BY updated_at DESC LIMIT $2`,
      [namespace, opts?.limit ?? 100],
    );
    return rows.map((r) => ({
      key: r.key,
      title: r.title,
      docid: getDocid(r.hash),
      updatedAt: r.updated_at,
    }));
  }

  /** Count active memories per namespace, ordered by namespace name. */
  async listNamespaces(): Promise<Array<{ namespace: string; count: number }>> {
    const rows = await this.client.query<{ namespace: string; count: string }>(
      `SELECT namespace, count(*)::text AS count FROM qmd_memory
       WHERE active GROUP BY namespace ORDER BY namespace`,
    );
    return rows.map((r) => ({ namespace: r.namespace, count: Number.parseInt(r.count, 10) }));
  }

  /** Ping the server and report capabilities plus the active memory count for the default namespace. */
  async health(): Promise<{
    server: string;
    namespace: string;
    fts: FtsCapabilities;
    memories: number;
  }> {
    const server = await this.client.ping();
    const row = await this.client.queryOne<{ count: string }>(
      `SELECT count(*)::text AS count FROM qmd_memory WHERE namespace = $1 AND active`,
      [this.defaultNamespace],
    );
    return {
      server,
      namespace: this.defaultNamespace,
      fts: this.fts,
      memories: Number.parseInt(row?.count ?? "0", 10),
    };
  }

  /** Close the underlying connection pool. */
  async close(): Promise<void> {
    await this.client.close();
  }
}

178
src/pg/schema-pg.ts Normal file
View File

@ -0,0 +1,178 @@
/**
* pg/schema-pg.ts - PostgreSQL schema for the qmd memory bridge
*
* Mirrors qmd's SQLite model (content-addressable storage + documents + vectors
* + FTS) but adapted to PostgreSQL extensions and made multi-tenant via a
* `namespace` column so OpenClaw, Hermes, ... can share one instance safely.
*
*   SQLite                         PostgreSQL
*   content (hash -> doc)       -> qmd_memory_content (namespace, hash, body, tsv)
*   documents                   -> qmd_memory (namespace, key, hash, ...)
*   content_vectors + vec0      -> qmd_memory_vectors (pgvector `vector`)
*   documents_fts (FTS5/BM25)   -> tsvector + GIN (pg_jieba / english)
*                                  + pg_trgm for fuzzy matching
*/
import type { PgClient } from "./db-pg.js";
/**
 * Text-search and extension capabilities detected at bootstrap time.
 *
 * Returned by `bootstrapSchema()`, which probes `pg_extension` and
 * `pg_ts_config` to fill in each field.
 */
export interface FtsCapabilities {
/**
 * Text-search config used for indexing/search: "jiebacfg" (Chinese
 * tokenization) when that config exists on the server, else "english".
 */
config: string;
/** Whether the `pg_trgm` extension is installed (used for fuzzy matching). */
trigram: boolean;
/** Whether the pgvector (`vector`) extension is installed. */
vector: boolean;
}
/**
 * Run a statement on a best-effort basis.
 *
 * Any error raised by `client.exec` (thrown or rejected) is deliberately
 * discarded; callers only learn whether the statement went through.
 *
 * @param client - Client whose `exec` runs the statement.
 * @param sql - Statement text, passed to `exec` unchanged.
 * @returns `true` if `exec` completed, `false` if it failed.
 */
async function tryExec(client: PgClient, sql: string): Promise<boolean> {
  let succeeded = true;
  try {
    await client.exec(sql);
  } catch {
    succeeded = false;
  }
  return succeeded;
}
/**
 * Check whether an extension is installed in the current database.
 *
 * @param client - Client used to query the `pg_extension` catalog.
 * @param name - Extension name, bound as a query parameter.
 * @returns `true` when the lookup yields a row, `false` otherwise.
 */
async function hasExtension(client: PgClient, name: string): Promise<boolean> {
  const lookupSql = "SELECT 1 AS one FROM pg_extension WHERE extname = $1";
  const match = await client.queryOne<{ one: number }>(lookupSql, [name]);
  return Boolean(match);
}
/**
 * Check whether a text-search configuration exists on the server.
 *
 * @param client - Client used to query the `pg_ts_config` catalog.
 * @param name - Configuration name (e.g. "jiebacfg"), bound as a parameter.
 * @returns `true` when the lookup yields a row, `false` otherwise.
 */
async function hasTsConfig(client: PgClient, name: string): Promise<boolean> {
  const lookupSql = "SELECT 1 AS one FROM pg_ts_config WHERE cfgname = $1";
  const match = await client.queryOne<{ one: number }>(lookupSql, [name]);
  return Boolean(match);
}
/**
 * Create the extensions, tables and indexes used by the memory bridge.
 *
 * Every statement uses `IF NOT EXISTS`, so the function can be run repeatedly.
 * Extension and index creation are best-effort (failures are swallowed by
 * `tryExec`); table creation is not and will propagate errors.
 *
 * Degradation rules:
 *  - `jiebacfg` text-search config missing -> FTS falls back to `english`.
 *  - `pg_trgm` missing                     -> the trigram index is skipped.
 *  - `vector` missing                      -> throws, pgvector is mandatory.
 *
 * NOTE(review): the `tsv` generated column bakes in the config detected on the
 * run that first creates `qmd_memory_content`; because of `IF NOT EXISTS`, a
 * later run that detects a different config does not alter the column, while
 * the returned `config` reflects the current detection.
 *
 * @param client - Client used for all DDL and catalog lookups.
 * @returns The detected capabilities (`config`, `trigram`, `vector`).
 * @throws Error when the `vector` extension is not installed.
 */
export async function bootstrapSchema(client: PgClient): Promise<FtsCapabilities> {
  // Extensions — best-effort. A non-superuser may lack CREATE EXTENSION, so
  // the outcome is ignored here and the catalogs are probed afterwards.
  for (const extension of ["vector", "pg_trgm", "pg_jieba"]) {
    await tryExec(client, `CREATE EXTENSION IF NOT EXISTS ${extension}`);
  }

  const hasVector = await hasExtension(client, "vector");
  const hasTrigram = await hasExtension(client, "pg_trgm");
  const hasJieba = await hasTsConfig(client, "jiebacfg");

  if (!hasVector) {
    throw new Error(
      "pgvector ('vector') extension is not available on this PostgreSQL server. " +
        "It is required for the qmd memory backend (semantic search). " +
        "Install it (postgresql.svc.plus ships it) or enable it: CREATE EXTENSION vector;",
    );
  }

  const ftsConfig = hasJieba ? "jiebacfg" : "english";

  // ── content-addressable storage (+ FTS) ──────────────────────────────────
  // tsv is a generated column over the body using the detected ts config.
  const contentTableSql = `
CREATE TABLE IF NOT EXISTS qmd_memory_content (
namespace text NOT NULL,
hash text NOT NULL,
body text NOT NULL,
tsv tsvector GENERATED ALWAYS AS (to_tsvector('${ftsConfig}', body)) STORED,
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (namespace, hash)
)
`;
  await client.exec(contentTableSql);

  const contentIndexes = [
    "CREATE INDEX IF NOT EXISTS qmd_memory_content_tsv_idx ON qmd_memory_content USING gin (tsv)",
  ];
  if (hasTrigram) {
    contentIndexes.push(
      "CREATE INDEX IF NOT EXISTS qmd_memory_content_trgm_idx ON qmd_memory_content USING gin (body gin_trgm_ops)",
    );
  }
  for (const indexSql of contentIndexes) {
    await tryExec(client, indexSql);
  }

  // ── memory records (documents layer) ─────────────────────────────────────
  const memoryTableSql = `
CREATE TABLE IF NOT EXISTS qmd_memory (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
namespace text NOT NULL,
key text NOT NULL,
title text NOT NULL DEFAULT '',
hash text NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
active boolean NOT NULL DEFAULT true,
UNIQUE (namespace, key)
)
`;
  await client.exec(memoryTableSql);

  const memoryIndexes = [
    "CREATE INDEX IF NOT EXISTS qmd_memory_ns_active_idx ON qmd_memory (namespace, active)",
    "CREATE INDEX IF NOT EXISTS qmd_memory_hash_idx ON qmd_memory (namespace, hash)",
  ];
  for (const indexSql of memoryIndexes) {
    await tryExec(client, indexSql);
  }

  // ── per-chunk vector embeddings ──────────────────────────────────────────
  // The column is an unconstrained `vector`, so no dimension is fixed here;
  // ensureVectorIndex() later pins the dimension and adds the HNSW index.
  const vectorsTableSql = `
CREATE TABLE IF NOT EXISTS qmd_memory_vectors (
namespace text NOT NULL,
hash text NOT NULL,
seq integer NOT NULL DEFAULT 0,
pos integer NOT NULL DEFAULT 0,
embedding vector NOT NULL,
model text NOT NULL,
embedded_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (namespace, hash, seq)
)
`;
  await client.exec(vectorsTableSql);

  // ── key/value config (mirrors store_config) ──────────────────────────────
  const configTableSql = `
CREATE TABLE IF NOT EXISTS qmd_memory_config (
namespace text NOT NULL,
key text NOT NULL,
value text,
PRIMARY KEY (namespace, key)
)
`;
  await client.exec(configTableSql);

  return { config: ftsConfig, trigram: hasTrigram, vector: hasVector };
}
/**
 * Pin the embedding column to a fixed dimension and build an HNSW cosine index.
 *
 * Both steps go through `tryExec`, so failures are reported as `false` rather
 * than thrown; the function itself never creates the table.
 * NOTE(review): the ALTER is issued on every call, even when the column
 * presumably already has the requested dimension — confirm the cost is
 * acceptable for the call sites.
 *
 * @param client - Client used to run the DDL.
 * @param dimensions - Embedding dimension; must be a positive integer. It is
 *   interpolated into the SQL only after that check.
 * @returns `false` for an invalid dimension or a failed ALTER; otherwise the
 *   outcome of the index creation.
 */
export async function ensureVectorIndex(client: PgClient, dimensions: number): Promise<boolean> {
  const isUsableDimension = Number.isInteger(dimensions) && dimensions > 0;
  if (!isUsableDimension) {
    return false;
  }

  // Fix the column dimension first; the HNSW index is only attempted after it.
  const alterSql = `ALTER TABLE qmd_memory_vectors ALTER COLUMN embedding TYPE vector(${dimensions})`;
  const columnTyped = await tryExec(client, alterSql);
  if (!columnTyped) {
    return false;
  }

  const indexSql =
    "CREATE INDEX IF NOT EXISTS qmd_memory_vectors_hnsw_idx ON qmd_memory_vectors USING hnsw (embedding vector_cosine_ops)";
  return tryExec(client, indexSql);
}
/**
 * Detect which text-search config is available right now.
 *
 * @param client - Client used to probe `pg_ts_config`.
 * @returns "jiebacfg" when that config exists on the server, else "english".
 */
export async function detectFtsConfig(client: PgClient): Promise<string> {
  const jiebaAvailable = await hasTsConfig(client, "jiebacfg");
  if (jiebaAvailable) {
    return "jiebacfg";
  }
  return "english";
}

24
test/pg-compose.yml Normal file
View File

@ -0,0 +1,24 @@
# Local PostgreSQL for qmd memory-bridge integration tests.
#
# docker compose -f test/pg-compose.yml up -d
# QMD_PG_URL='postgres://postgres:postgres@localhost:5432/postgres' \
# npx vitest run test/pg-memory.integration.test.ts
# docker compose -f test/pg-compose.yml down -v
#
# pgvector/pgvector ships pgvector but NOT pg_jieba, so FTS falls back to the
# 'english' text-search config. For full Chinese tokenization use the
# postgresql.svc.plus image (ghcr.io/x-evor/images/postgresql) which bundles
# pgvector + pg_jieba + pg_trgm.
services:
  # Single throwaway PostgreSQL 16 instance with pgvector preinstalled.
  qmd-pg:
    image: pgvector/pgvector:pg16
    environment:
      # Test-only credentials; they match the QMD_PG_URL shown in the header.
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    ports:
      - "5432:5432"
    healthcheck:
      # Poll until the server accepts connections.
      test:
        - "CMD-SHELL"
        - "pg_isready -U postgres"
      interval: 2s
      timeout: 5s
      retries: 20

113
test/pg-config.test.ts Normal file
View File

@ -0,0 +1,113 @@
/**
* pg-config.test.ts - Pure unit tests for the PG memory bridge configuration.
*
* These never touch a database, so they run everywhere. Integration tests that
* require a live PostgreSQL are in pg-memory.integration.test.ts (gated on
* QMD_PG_URL).
*/
import { describe, test, expect } from "vitest";
import {
resolveBackend,
isPgBackend,
resolveNamespace,
resolvePgConfig,
resolvePgSsl,
redactConnectionString,
DEFAULT_NAMESPACE,
} from "../src/pg/config.js";
import { toVectorLiteral } from "../src/pg/db-pg.js";
// Backend selection: SQLite unless QMD_BACKEND explicitly asks for PostgreSQL.
describe("resolveBackend", () => {
  test("defaults to sqlite", () => {
    const backend = resolveBackend({});
    const usesPg = isPgBackend({});
    expect(backend).toBe("sqlite");
    expect(usesPg).toBe(false);
  });

  test("accepts pg / postgres / postgresql", () => {
    // Aliases, including an upper-case spelling, all resolve to "pg".
    for (const alias of ["pg", "postgres", "POSTGRESQL"]) {
      expect(resolveBackend({ QMD_BACKEND: alias })).toBe("pg");
    }
    expect(isPgBackend({ QMD_BACKEND: "pg" })).toBe(true);
  });

  test("a bare connection URL does NOT switch the backend", () => {
    // Selection must be explicit so existing SQLite workflows never change.
    const backend = resolveBackend({ QMD_PG_URL: "postgres://x/y" });
    expect(backend).toBe("sqlite");
  });
});
// Namespace resolution: QMD_NAMESPACE wins, unset or blank values use the default.
describe("resolveNamespace", () => {
  test("defaults to the default namespace", () => {
    const fromEmptyEnv = resolveNamespace({});
    const fromBlankValue = resolveNamespace({ QMD_NAMESPACE: " " });
    expect(fromEmptyEnv).toBe(DEFAULT_NAMESPACE);
    expect(fromBlankValue).toBe(DEFAULT_NAMESPACE);
  });

  test("uses QMD_NAMESPACE when set", () => {
    const resolved = resolveNamespace({ QMD_NAMESPACE: "openclaw" });
    expect(resolved).toBe("openclaw");
  });
});
// TLS option mapping for the QMD_PG_SSL setting.
describe("resolvePgSsl", () => {
  test("disable turns TLS off", () => {
    const ssl = resolvePgSsl({ QMD_PG_SSL: "disable" });
    expect(ssl).toBe(false);
  });

  test("no-verify keeps TLS but skips verification", () => {
    const ssl = resolvePgSsl({ QMD_PG_SSL: "no-verify" });
    expect(ssl).toEqual({ rejectUnauthorized: false });
  });

  test("defaults to TLS without verification when no CA is supplied", () => {
    const ssl = resolvePgSsl({});
    expect(ssl).toEqual({ rejectUnauthorized: false });
  });
});
// Connection config assembly from environment-style input.
describe("resolvePgConfig", () => {
  test("throws a helpful error when no URL is set", () => {
    const resolveWithoutUrl = () => resolvePgConfig({ QMD_BACKEND: "pg" });
    expect(resolveWithoutUrl).toThrow(/no connection URL/i);
  });

  test("builds config from QMD_PG_URL", () => {
    const env = {
      QMD_BACKEND: "pg",
      QMD_PG_URL: "postgres://postgres:secret@db.example.com:5443/qmd",
      QMD_NAMESPACE: "hermes",
    };
    const { connectionString, namespace, max } = resolvePgConfig(env);
    expect(connectionString).toContain("db.example.com:5443");
    expect(namespace).toBe("hermes");
    expect(max).toBeGreaterThan(0);
  });

  test("falls back to DATABASE_URL", () => {
    const { connectionString } = resolvePgConfig({ DATABASE_URL: "postgres://u:p@h:5432/d" });
    expect(connectionString).toContain("h:5432");
  });

  test("respects pool + timeout overrides", () => {
    const env = {
      QMD_PG_URL: "postgres://u:p@h:5432/d",
      QMD_PG_POOL_MAX: "12",
      QMD_PG_CONNECT_TIMEOUT_MS: "2500",
    };
    const { max, connectionTimeoutMillis } = resolvePgConfig(env);
    expect(max).toBe(12);
    expect(connectionTimeoutMillis).toBe(2500);
  });
});
// Redaction must drop the password while keeping the host:port visible.
describe("redactConnectionString", () => {
  test("hides the password", () => {
    const rawUrl = "postgres://postgres:topsecret@db:5443/qmd";
    const redacted = redactConnectionString(rawUrl);
    expect(redacted).not.toContain("topsecret");
    expect(redacted).toContain("db:5443");
  });
});
// Serialisation of embeddings into the bracketed pgvector text form.
describe("toVectorLiteral", () => {
  test("formats number arrays as a pgvector literal", () => {
    const literal = toVectorLiteral([1, 2, 3]);
    expect(literal).toBe("[1,2,3]");
  });

  test("handles Float32Array", () => {
    // 0.5 and -0.25 are exactly representable in float32.
    const typed = new Float32Array([0.5, -0.25]);
    expect(toVectorLiteral(typed)).toBe("[0.5,-0.25]");
  });
});

View File

@ -0,0 +1,113 @@
/**
* pg-memory.integration.test.ts - End-to-end tests against a live PostgreSQL.
*
* Skipped unless QMD_PG_URL is set. Spin up a PG with pgvector (+ optionally
* pg_jieba) and run:
*
* QMD_PG_URL='postgres://postgres:postgres@localhost:5432/postgres' \
* npx vitest run test/pg-memory.integration.test.ts
*
* Uses a deterministic stub embedder, so the test needs no network or models.
* It verifies schema bootstrap, namespaced writes, hybrid (FTS+vector) search,
* RRF fusion, get/list/delete, and namespace isolation.
*/
import { describe, test, expect, beforeAll, afterAll } from "vitest";
import { PgMemoryStore, type Embedder } from "../src/pg/memory-store.js";
import { resolvePgConfig } from "../src/pg/config.js";
// The integration suite is gated strictly on QMD_PG_URL, as this file's header
// states. A generic DATABASE_URL is deliberately NOT used as a fallback: many
// CI/dev environments export it for an unrelated application database, and
// this suite bootstraps the qmd schema and writes test rows into its target.
const PG_URL = process.env.QMD_PG_URL;
/**
 * Deterministic 16-dim embedder for tests.
 *
 * Each text is lower-cased and turned into a histogram with 16 buckets
 * (first UTF-16 code unit of each character, modulo 16), then divided by its
 * L2 norm. An all-zero histogram (empty text) is left as zeros.
 */
const stubEmbedder: Embedder = {
  async embedBatch(texts) {
    return texts.map((text) => {
      const histogram = new Array(16).fill(0);
      for (const ch of text.toLowerCase()) {
        const bucket = ch.charCodeAt(0) % 16;
        histogram[bucket] += 1;
      }

      let sumOfSquares = 0;
      for (const count of histogram) {
        sumOfSquares = sumOfSquares + count * count;
      }
      // Fall back to 1 so an empty text does not divide by zero.
      const norm = Math.sqrt(sumOfSquares) || 1;

      const embedding = histogram.map((count) => count / norm);
      return { embedding, model: "stub-16" };
    });
  },
};
// Unique per-run namespace so concurrent or repeated runs do not collide.
const NS = `test_${Date.now()}`;

// The whole suite is skipped when no connection URL is configured. Tests run
// in declaration order against one shared store instance.
describe.skipIf(!PG_URL)("PgMemoryStore (integration)", () => {
  let store: PgMemoryStore;

  beforeAll(async () => {
    const resolved = resolvePgConfig({ QMD_PG_URL: PG_URL!, QMD_NAMESPACE: NS });
    const config = { ...resolved };
    store = await PgMemoryStore.open(config, stubEmbedder);
  });

  afterAll(async () => {
    // Nothing to tear down if opening the store failed.
    if (!store) return;

    // Best-effort cleanup of the test namespace.
    try {
      const leftovers = await store.listMemories({ limit: 1000 });
      for (const memory of leftovers) {
        await store.deleteMemory(memory.key);
      }
    } catch {
      /* ignore */
    }
    await store.close();
  });

  test("bootstraps schema with pgvector", async () => {
    const { vector } = store.capabilities;
    expect(vector).toBe(true);
  });

  test("adds and retrieves a memory", async () => {
    const added = await store.addMemory({
      key: "note/auth",
      title: "Auth design",
      body: "The login flow uses OAuth tokens and a refresh rotation strategy.",
    });
    expect(added.namespace).toBe(NS);
    expect(added.chunks).toBeGreaterThan(0);
    expect(added.embedded).toBe(true);

    const fetched = await store.getMemory("note/auth");
    expect(fetched?.body).toContain("OAuth");
    expect(fetched?.title).toBe("Auth design");
  });

  test("hybrid search finds a memory by keyword", async () => {
    await store.addMemory({ key: "note/cache", body: "Redis caching layer with TTL eviction." });

    const hits = await store.searchMemory("redis caching", { limit: 5 });
    expect(hits.length).toBeGreaterThan(0);

    const foundCacheNote = hits.some((hit) => hit.key === "note/cache");
    expect(foundCacheNote).toBe(true);

    // At least one signal contributed.
    const top = hits[0]!;
    const hasSignal = top.lexRank !== null || top.vecRank !== null;
    expect(hasSignal).toBe(true);
  });

  test("re-adding the same key updates it", async () => {
    for (const body of ["first version", "second version updated"]) {
      await store.addMemory({ key: "note/dup", body });
    }

    const fetched = await store.getMemory("note/dup");
    expect(fetched?.body).toBe("second version updated");

    const listed = await store.listMemories({ limit: 1000 });
    const duplicates = listed.filter((memory) => memory.key === "note/dup");
    expect(duplicates.length).toBe(1);
  });

  test("delete soft-removes a memory", async () => {
    await store.addMemory({ key: "note/temp", body: "temporary" });

    const deleted = await store.deleteMemory("note/temp");
    expect(deleted).toBe(true);

    const fetched = await store.getMemory("note/temp");
    expect(fetched).toBeNull();
  });

  test("namespaces isolate memories", async () => {
    const otherNamespace = `${NS}_other`;
    const sharedKey = "shared/key";

    await store.addMemory({ key: sharedKey, body: "in default test ns" });
    await store.addMemory({ key: sharedKey, body: "in other ns", namespace: otherNamespace });

    const fromDefault = await store.getMemory(sharedKey);
    const fromOther = await store.getMemory(sharedKey, { namespace: otherNamespace });
    expect(fromDefault?.body).toBe("in default test ns");
    expect(fromOther?.body).toBe("in other ns");

    // cleanup other ns
    await store.deleteMemory(sharedKey, { namespace: otherNamespace });
  });
});