feat(rag): add zhcn schema

This commit is contained in:
shenlan 2025-08-13 14:00:57 +08:00
parent f7b0b06ef2
commit a479b10efd
4 changed files with 117 additions and 22 deletions

View File

@ -1,26 +1,66 @@
-- init.sql - minimal schema for vector RAG (1024 dims)
-- ================================================
-- init.sql - Stable RAG schema with Hybrid Search
-- For pgvector ≥ 0.5, BGE-M3 (1024 dims), zhparser+english
-- ================================================
-- 1. 避免锁表/阻塞
SET lock_timeout = '5s';
SET statement_timeout = '0';
-- 2. 必要扩展(向量 + 中文分词)
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS zhparser;
-- 3. 中文 + 英文混合全文检索配置（zhparser + simple）
-- 自定义配置名：zhcn_search
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'zhcn_search') THEN
CREATE TEXT SEARCH CONFIGURATION zhcn_search (PARSER = zhparser);
ALTER TEXT SEARCH CONFIGURATION zhcn_search
ADD MAPPING FOR n,v,a,i,e,l WITH simple;
END IF;
END$$;
-- 4. 创建主表
CREATE TABLE IF NOT EXISTS public.documents (
id BIGSERIAL PRIMARY KEY,
repo TEXT NOT NULL,
path TEXT NOT NULL,
chunk_id INT NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(1024),
embedding VECTOR(1024), -- 向量字段bge-m3
metadata JSONB,
content_sha TEXT NOT NULL,
content_tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
content_sha TEXT NOT NULL,
-- 中文+英文全文搜索字段
content_tsv tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('zhcn_search', coalesce(content, '')), 'A')
) STORED,
-- 文档唯一标识（组合键 doc_key）
doc_key TEXT GENERATED ALWAYS AS (
repo || ':' || path || ':' || chunk_id
) STORED,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (repo, path, chunk_id)
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS documents_embedding_idx
ON public.documents USING hnsw (embedding vector_cosine_ops);
-- 5. 唯一约束（支持 UPSERT）
CREATE UNIQUE INDEX IF NOT EXISTS documents_doc_key_uk
ON public.documents (doc_key);
CREATE INDEX IF NOT EXISTS documents_content_tsv_idx
-- 6. 向量索引（仅索引非空 embedding）
CREATE INDEX IF NOT EXISTS documents_embedding_idx
ON public.documents
USING hnsw (embedding vector_cosine_ops)
WHERE embedding IS NOT NULL;
-- 7. 全文索引（中文 + 英文，使用 zhcn_search）
CREATE INDEX IF NOT EXISTS idx_documents_tsv
ON public.documents USING gin (content_tsv);
-- 8. 复合过滤索引(适配 repo + path 检索场景)
CREATE INDEX IF NOT EXISTS idx_documents_repo_path
ON public.documents (repo, path);

View File

@ -48,9 +48,40 @@
psql -h 127.0.0.1 -U shenlan -d mydb -f docs/init.sql
```
该脚本会:
- 创建 `vector` 扩展(若尚未启用)。
- 创建存储文档及其向量的 `documents` 表。
- 为向量检索和 JSONB 元数据建立索引。
- 创建 `vector` 和 `zhparser` 扩展（如未启用）。
- 定义混合中文/英文的全文搜索配置 `zhcn_search`。
- 创建 `documents` 表，并包含：
  - 预计算的 `doc_key` 生成列（`repo:path:chunk_id`）。
  - `content_tsv` 生成列，支持中文/英文全文检索。
  - `embedding` VECTOR(1024) 字段，适配 BGE-M3。
- 建立 `HNSW` 向量索引、`GIN` 全文索引以及 `(repo, path)` 复合索引。
### 示例：UPSERT 与 Hybrid 检索
插入或更新文档:
```sql
INSERT INTO public.documents (
repo, path, chunk_id, content, embedding, metadata, content_sha
) VALUES (
'docs', 'README.md', 1, '内容...', '[...]', '{}', 'abc123'
)
ON CONFLICT (doc_key) DO UPDATE
SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
metadata = EXCLUDED.metadata,
content_sha = EXCLUDED.content_sha,
updated_at = now();
```
Hybrid 检索:
```sql
SELECT *
FROM public.documents
WHERE content_tsv @@ to_tsquery('zhcn_search', '大模型 & 应用')
AND embedding IS NOT NULL
ORDER BY embedding <#> '[...]'
LIMIT 5;
```
## 5. 测试连接
确认数据库与扩展均正常工作:

View File

@ -109,7 +109,7 @@ func (s *Service) Query(ctx context.Context, question string, limit int) ([]Docu
}
docsMap := map[string]*scored{}
vrows, err := conn.Query(ctx, `SELECT repo,path,chunk_id,content,metadata, embedding <-> $1 AS dist FROM documents ORDER BY embedding <-> $1 LIMIT $2`,
vrows, err := conn.Query(ctx, `SELECT repo,path,chunk_id,content,metadata, embedding <#> $1 AS dist FROM documents WHERE embedding IS NOT NULL ORDER BY embedding <#> $1 LIMIT $2`,
pgvector.NewVector(vecs[0]), cand)
if err != nil {
return nil, err
@ -131,7 +131,7 @@ func (s *Service) Query(ctx context.Context, question string, limit int) ([]Docu
}
vrows.Close()
trows, err := conn.Query(ctx, `SELECT repo,path,chunk_id,content,metadata, ts_rank_cd(content_tsv, websearch_to_tsquery($1)) AS rank FROM documents WHERE content_tsv @@ websearch_to_tsquery($1) ORDER BY rank DESC LIMIT $2`,
trows, err := conn.Query(ctx, `SELECT repo,path,chunk_id,content,metadata, ts_rank_cd(content_tsv, websearch_to_tsquery('zhcn_search', $1)) AS rank FROM documents WHERE content_tsv @@ websearch_to_tsquery('zhcn_search', $1) ORDER BY rank DESC LIMIT $2`,
question, cand)
if err != nil {
return nil, err

View File

@ -21,9 +21,25 @@ type DocRow struct {
}
// EnsureSchema creates the documents table and minimal indexes required for
// hybrid search. It avoids extensive migrations and only ensures the basic
// structure needed by the service.
// hybrid search. It ensures extensions and text search configuration needed by
// the RAG service.
func EnsureSchema(ctx context.Context, conn *pgx.Conn, dim int, _ bool) error {
if _, err := conn.Exec(ctx, `CREATE EXTENSION IF NOT EXISTS vector`); err != nil {
return err
}
if _, err := conn.Exec(ctx, `CREATE EXTENSION IF NOT EXISTS zhparser`); err != nil {
return err
}
if _, err := conn.Exec(ctx, `DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'zhcn_search') THEN
CREATE TEXT SEARCH CONFIGURATION zhcn_search (PARSER = zhparser);
ALTER TEXT SEARCH CONFIGURATION zhcn_search ADD MAPPING FOR n,v,a,i,e,l WITH simple;
END IF;
END$$;`); err != nil {
return err
}
create := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS documents (
id BIGSERIAL PRIMARY KEY,
repo TEXT NOT NULL,
@ -33,18 +49,26 @@ func EnsureSchema(ctx context.Context, conn *pgx.Conn, dim int, _ bool) error {
embedding VECTOR(%d),
metadata JSONB,
content_sha TEXT NOT NULL,
content_tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
content_tsv tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('zhcn_search', coalesce(content, '')), 'A')
) STORED,
doc_key TEXT GENERATED ALWAYS AS (repo || ':' || path || ':' || chunk_id) STORED,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(repo,path,chunk_id)
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
)`, dim)
if _, err := conn.Exec(ctx, create); err != nil {
return err
}
if _, err := conn.Exec(ctx, `CREATE INDEX IF NOT EXISTS documents_embedding_idx ON documents USING hnsw (embedding vector_cosine_ops)`); err != nil {
if _, err := conn.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS documents_doc_key_uk ON documents (doc_key)`); err != nil {
return err
}
if _, err := conn.Exec(ctx, `CREATE INDEX IF NOT EXISTS documents_content_tsv_idx ON documents USING GIN (content_tsv)`); err != nil {
if _, err := conn.Exec(ctx, `CREATE INDEX IF NOT EXISTS documents_embedding_idx ON documents USING hnsw (embedding vector_cosine_ops) WHERE embedding IS NOT NULL`); err != nil {
return err
}
if _, err := conn.Exec(ctx, `CREATE INDEX IF NOT EXISTS idx_documents_tsv ON documents USING gin (content_tsv)`); err != nil {
return err
}
if _, err := conn.Exec(ctx, `CREATE INDEX IF NOT EXISTS idx_documents_repo_path ON documents (repo, path)`); err != nil {
return err
}
return nil
@ -60,7 +84,7 @@ func UpsertDocuments(ctx context.Context, conn *pgx.Conn, rows []DocRow) (int, e
meta, _ := json.Marshal(r.Metadata)
batch.Queue(`INSERT INTO documents (repo,path,chunk_id,content,embedding,metadata,content_sha)
VALUES ($1,$2,$3,$4,$5,$6,$7)
ON CONFLICT (repo,path,chunk_id) DO UPDATE
ON CONFLICT (doc_key) DO UPDATE
SET content=EXCLUDED.content,
embedding=EXCLUDED.embedding,
metadata=EXCLUDED.metadata,