server: implement rag system modules

This commit is contained in:
shenlan 2025-08-05 09:26:07 +08:00
parent b9a8c7ea3a
commit 907461c8e8
10 changed files with 441 additions and 1 deletions

77
docs/rag-system-design.md Normal file
View File

@ -0,0 +1,77 @@
# 基于 Go 的 RAG 系统设计
本文档描述一个使用 Go 实现的 Retrieval Augmented Generation (RAG) 系统方案满足多仓库文档同步、Markdown 解析与分块、向量化存储以及问答检索的需求。
## 1. 数据源同步
- **多仓库配置**:支持在配置文件中声明多个 GitHub 仓库及其关注路径,例如:
- `repoA/docs/`
- `repoB/guides/`
- `repoC/tutorials/`
- **同步方式**:使用 `git clone` / `git pull` 将远程仓库同步到本地,定时任务或 Webhook 触发。
- **增量更新**:检测新增或修改的 Markdown 文档,触发后续嵌入流程。
## 2. Markdown 处理与分块
- 使用 `goldmark``blackfriday` 将 Markdown 渲染为纯文本。
- 对文档按照 500~1000 tokens 分块,保存每块的顺序和位置信息。
- 每个分块生成唯一 `chunk_id` 以便回溯源文档位置。
## 3. 向量化
- 默认嵌入模型:
- [bge-m3](https://github.com/BAAI-bge/): 本地部署HTTP 服务返回 1536 维向量。
- [OpenAI `text-embedding-3-large`](https://platform.openai.com/docs/guides/embeddings): 通过 OpenAI API 获取 1536 维向量。
- 统一的 `Embed(text string) ([]float32, error)` 接口屏蔽具体实现,可在配置中切换模型。
## 4. 数据库设计
使用 PostgreSQL + [pgvector](https://github.com/pgvector/pgvector)。建表及索引 SQL 如下:
```sql
CREATE TABLE documents (
id BIGSERIAL PRIMARY KEY,
repo TEXT NOT NULL, -- 来源仓库
path TEXT NOT NULL, -- 文件路径
chunk_id INT NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(1536), -- 向量
metadata JSONB -- 额外信息:标签/更新时间等
);
-- 向量索引
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
-- 元数据索引
CREATE INDEX idx_documents_metadata ON documents USING gin (metadata);
```
## 5. 检索与问答流程
1. 用户提出问题,服务端调用 `Embed()` 对问题生成向量。
2. 在 `documents` 表中通过 `cosine` 相似度检索 Top K 的分块。
3. 将检索结果拼装为 Prompt调用 GPT/Claude 等大模型生成回答。
4. 返回答案并附带来源文档信息以便追溯。
## 6. Go 代码模块划分
```
server/rag/
├── config/ # 仓库与模型配置
├── sync/ # GitHub 同步逻辑
├── ingest/ # Markdown 解析与分块
├── embed/ # 向量化接口实现
├── store/ # PostgreSQL + pgvector 操作封装
└── api/ # REST/gRPC 接口,提供问答与同步触发
```
各模块通过 `go-pg`/`pgx`、`go-git` 等库实现,协程与通道用于提升并行处理能力。
## 7. 未来扩展
- 支持更多文件格式,如 PDF、HTML。
- 嵌入向量批量写入以提升效率。
- 引入缓存与摘要生成,进一步优化响应速度。
以上设计为后续实现提供结构化指导,可在项目中逐步落地。

4
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/go-git/go-git/v5 v5.16.2
github.com/jackc/pgx/v5 v5.7.5
github.com/yuin/goldmark v1.7.13
gopkg.in/yaml.v3 v3.0.1
gorm.io/gorm v1.25.2
)
@ -32,6 +33,7 @@ require (
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
@ -52,9 +54,9 @@ require (
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

62
server/rag/api/api.go Normal file
View File

@ -0,0 +1,62 @@
package api
import (
"context"
"net/http"
"github.com/gin-gonic/gin"
"xcontrol/server/rag/config"
"xcontrol/server/rag/embed"
"xcontrol/server/rag/ingest"
"xcontrol/server/rag/store"
rsync "xcontrol/server/rag/sync"
)
// Register mounts RAG routes on the gin engine.
func Register(r *gin.Engine, cfg *config.Config, st *store.Store, emb embed.Embedder) {
r.POST("/rag/sync", func(c *gin.Context) {
for _, repo := range cfg.Repos {
files, err := rsync.Repo(repo)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
for _, f := range files {
docs, err := ingest.File(repo.URL, f)
if err != nil {
continue
}
for i := range docs {
vec, err := emb.Embed(c.Request.Context(), docs[i].Content)
if err != nil {
continue
}
docs[i].Embedding = vec
}
st.Upsert(context.Background(), docs)
}
}
c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
r.POST("/rag/query", func(c *gin.Context) {
var req struct {
Question string `json:"question"`
}
if err := c.BindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
vec, err := emb.Embed(c.Request.Context(), req.Question)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
docs, err := st.Search(c.Request.Context(), vec, 5)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"chunks": docs})
})
}

View File

@ -0,0 +1,33 @@
package config
import (
"gopkg.in/yaml.v3"
"os"
)
// Repo holds configuration for a single Git repository and paths to index.
type Repo struct {
URL string `yaml:"url"`
Branch string `yaml:"branch"`
Paths []string `yaml:"paths"`
Local string `yaml:"local"`
}
// Config describes the RAG ingestion settings.
type Config struct {
Repos []Repo `yaml:"repos"`
Embedder string `yaml:"embedder"`
}
// Load reads YAML configuration from path.
func Load(path string) (*Config, error) {
b, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var c Config
if err := yaml.Unmarshal(b, &c); err != nil {
return nil, err
}
return &c, nil
}

42
server/rag/embed/bge.go Normal file
View File

@ -0,0 +1,42 @@
package embed
import (
"bytes"
"context"
"encoding/json"
"net/http"
)
// BGE calls a local bge-m3 embedding service.
type BGE struct {
Endpoint string
Client *http.Client
}
// NewBGE returns a new BGE embedder.
func NewBGE(endpoint string) *BGE {
return &BGE{Endpoint: endpoint, Client: &http.Client{}}
}
// Embed posts text to the bge service and parses the vector.
func (b *BGE) Embed(ctx context.Context, text string) ([]float32, error) {
body := map[string]string{"text": text}
data, _ := json.Marshal(body)
req, err := http.NewRequestWithContext(ctx, "POST", b.Endpoint, bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := b.Client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var res struct {
Embedding []float32 `json:"embedding"`
}
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, err
}
return res.Embedding, nil
}

View File

@ -0,0 +1,8 @@
package embed
import "context"
// Embedder produces a vector representation for input text.
type Embedder interface {
Embed(ctx context.Context, text string) ([]float32, error)
}

View File

@ -0,0 +1,49 @@
package embed
import (
"bytes"
"context"
"encoding/json"
"net/http"
)
// OpenAI calls the OpenAI embeddings endpoint.
type OpenAI struct {
APIKey string
Model string
Client *http.Client
}
// NewOpenAI creates a new OpenAI embedder.
func NewOpenAI(model, key string) *OpenAI {
return &OpenAI{Model: model, APIKey: key, Client: &http.Client{}}
}
// Embed generates an embedding using OpenAI API.
func (o *OpenAI) Embed(ctx context.Context, text string) ([]float32, error) {
body := map[string]any{"input": text, "model": o.Model}
b, _ := json.Marshal(body)
req, err := http.NewRequestWithContext(ctx, "POST", "https://api.openai.com/v1/embeddings", bytes.NewReader(b))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+o.APIKey)
req.Header.Set("Content-Type", "application/json")
resp, err := o.Client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var res struct {
Data []struct {
Embedding []float32 `json:"embedding"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, err
}
if len(res.Data) == 0 {
return nil, nil
}
return res.Data[0].Embedding, nil
}

View File

@ -0,0 +1,43 @@
package ingest
import (
"bytes"
"io/ioutil"
"strings"
"github.com/yuin/goldmark"
"xcontrol/server/rag/store"
)
const chunkSize = 800
// File reads a markdown file and returns chunked documents.
func File(repo, path string) ([]store.Document, error) {
b, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
var buf bytes.Buffer
if err := goldmark.Convert(b, &buf); err != nil {
return nil, err
}
words := strings.Fields(buf.String())
var docs []store.Document
for i := 0; i < len(words); i += chunkSize {
end := i + chunkSize
if end > len(words) {
end = len(words)
}
chunk := strings.Join(words[i:end], " ")
docs = append(docs, store.Document{
Repo: repo,
Path: path,
ChunkID: len(docs),
Content: chunk,
Metadata: map[string]any{
"offset": i,
},
})
}
return docs, nil
}

79
server/rag/store/store.go Normal file
View File

@ -0,0 +1,79 @@
package store
import (
"context"
"encoding/json"
"github.com/jackc/pgx/v5/pgxpool"
)
// Document represents a chunk stored in Postgres.
type Document struct {
ID int64
Repo string
Path string
ChunkID int
Content string
Embedding []float32
Metadata map[string]any
}
// Store wraps a pgx pool for vector operations.
type Store struct {
pool *pgxpool.Pool
}
// New creates a new Store connected using dsn.
func New(ctx context.Context, dsn string) (*Store, error) {
p, err := pgxpool.New(ctx, dsn)
if err != nil {
return nil, err
}
return &Store{pool: p}, nil
}
// Upsert writes documents and their embeddings to the database.
func (s *Store) Upsert(ctx context.Context, docs []Document) error {
for _, d := range docs {
meta, _ := json.Marshal(d.Metadata)
_, err := s.pool.Exec(ctx,
`INSERT INTO documents (repo,path,chunk_id,content,embedding,metadata)
VALUES ($1,$2,$3,$4,$5,$6)
ON CONFLICT (repo,path,chunk_id) DO UPDATE
SET content=EXCLUDED.content,
embedding=EXCLUDED.embedding,
metadata=EXCLUDED.metadata`,
d.Repo, d.Path, d.ChunkID, d.Content, d.Embedding, meta,
)
if err != nil {
return err
}
}
return nil
}
// Search returns top similar documents ordered by cosine distance.
func (s *Store) Search(ctx context.Context, vec []float32, limit int) ([]Document, error) {
rows, err := s.pool.Query(ctx,
`SELECT repo,path,chunk_id,content,metadata
FROM documents
ORDER BY embedding <-> $1
LIMIT $2`, vec, limit,
)
if err != nil {
return nil, err
}
defer rows.Close()
var res []Document
for rows.Next() {
var d Document
var meta []byte
if err := rows.Scan(&d.Repo, &d.Path, &d.ChunkID, &d.Content, &meta); err != nil {
return nil, err
}
json.Unmarshal(meta, &d.Metadata)
res = append(res, d)
}
return res, rows.Err()
}

45
server/rag/sync/sync.go Normal file
View File

@ -0,0 +1,45 @@
package sync
import (
"io/fs"
"path/filepath"
git "github.com/go-git/go-git/v5"
"xcontrol/server/rag/config"
)
// Repo synchronizes the configured repository and returns markdown file paths.
func Repo(c config.Repo) ([]string, error) {
if _, err := git.PlainOpen(c.Local); err != nil {
if _, err := git.PlainClone(c.Local, false, &git.CloneOptions{URL: c.URL}); err != nil {
return nil, err
}
} else {
r, err := git.PlainOpen(c.Local)
if err != nil {
return nil, err
}
w, err := r.Worktree()
if err != nil {
return nil, err
}
_ = w.Pull(&git.PullOptions{RemoteName: "origin"})
}
var files []string
for _, p := range c.Paths {
root := filepath.Join(c.Local, p)
filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return nil
}
if d.IsDir() {
return nil
}
if filepath.Ext(path) == ".md" {
files = append(files, path)
}
return nil
})
}
return files, nil
}