From 907461c8e8d7d92e2c70c4709f015659bbcbfe61 Mon Sep 17 00:00:00 2001 From: shenlan Date: Tue, 5 Aug 2025 09:26:07 +0800 Subject: [PATCH] server: implement rag system modules --- docs/rag-system-design.md | 77 ++++++++++++++++++++++++++++++++++++ go.mod | 4 +- server/rag/api/api.go | 62 +++++++++++++++++++++++++++++ server/rag/config/config.go | 33 ++++++++++++++++ server/rag/embed/bge.go | 42 ++++++++++++++++++++ server/rag/embed/embed.go | 8 ++++ server/rag/embed/openai.go | 49 +++++++++++++++++++++++ server/rag/ingest/ingest.go | 43 ++++++++++++++++++++ server/rag/store/store.go | 79 +++++++++++++++++++++++++++++++++++++ server/rag/sync/sync.go | 45 +++++++++++++++++++++ 10 files changed, 441 insertions(+), 1 deletion(-) create mode 100644 docs/rag-system-design.md create mode 100644 server/rag/api/api.go create mode 100644 server/rag/config/config.go create mode 100644 server/rag/embed/bge.go create mode 100644 server/rag/embed/embed.go create mode 100644 server/rag/embed/openai.go create mode 100644 server/rag/ingest/ingest.go create mode 100644 server/rag/store/store.go create mode 100644 server/rag/sync/sync.go diff --git a/docs/rag-system-design.md b/docs/rag-system-design.md new file mode 100644 index 0000000..7d19962 --- /dev/null +++ b/docs/rag-system-design.md @@ -0,0 +1,77 @@ +# 基于 Go 的 RAG 系统设计 + +本文档描述一个使用 Go 实现的 Retrieval Augmented Generation (RAG) 系统方案,满足多仓库文档同步、Markdown 解析与分块、向量化存储以及问答检索的需求。 + +## 1. 数据源同步 + +- **多仓库配置**:支持在配置文件中声明多个 GitHub 仓库及其关注路径,例如: + - `repoA/docs/` + - `repoB/guides/` + - `repoC/tutorials/` +- **同步方式**:使用 `git clone` / `git pull` 将远程仓库同步到本地,定时任务或 Webhook 触发。 +- **增量更新**:检测新增或修改的 Markdown 文档,触发后续嵌入流程。 + +## 2. Markdown 处理与分块 + +- 使用 `goldmark` 或 `blackfriday` 将 Markdown 渲染为纯文本。 +- 对文档按照 500~1000 tokens 分块,保存每块的顺序和位置信息。 +- 每个分块生成唯一 `chunk_id` 以便回溯源文档位置。 + +## 3. 向量化 + +- 默认嵌入模型: + - [bge-m3](https://github.com/BAAI-bge/): 本地部署,HTTP 服务返回 1536 维向量。 + - [OpenAI `text-embedding-3-large`](https://platform.openai.com/docs/guides/embeddings): 通过 OpenAI API 获取 1536 维向量。 +- 统一的 `Embed(text string) ([]float32, error)` 接口屏蔽具体实现,可在配置中切换模型。 + +## 4. 数据库设计 + +使用 PostgreSQL + [pgvector](https://github.com/pgvector/pgvector)。建表及索引 SQL 如下: + +```sql +CREATE TABLE documents ( + id BIGSERIAL PRIMARY KEY, + repo TEXT NOT NULL, -- 来源仓库 + path TEXT NOT NULL, -- 文件路径 + chunk_id INT NOT NULL, + content TEXT NOT NULL, + embedding VECTOR(1536), -- 向量 + metadata JSONB -- 额外信息:标签/更新时间等 +); + +-- 向量索引 +CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops); + +-- 元数据索引 +CREATE INDEX idx_documents_metadata ON documents USING gin (metadata); +``` + +## 5. 检索与问答流程 + +1. 用户提出问题,服务端调用 `Embed()` 对问题生成向量。 +2. 在 `documents` 表中通过 `cosine` 相似度检索 Top K 的分块。 +3. 将检索结果拼装为 Prompt,调用 GPT/Claude 等大模型生成回答。 +4. 返回答案并附带来源文档信息以便追溯。 + +## 6. Go 代码模块划分 + +``` +server/rag/ +├── config/ # 仓库与模型配置 +├── sync/ # GitHub 同步逻辑 +├── ingest/ # Markdown 解析与分块 +├── embed/ # 向量化接口实现 +├── store/ # PostgreSQL + pgvector 操作封装 +└── api/ # REST/gRPC 接口,提供问答与同步触发 +``` + +各模块通过 `go-pg`/`pgx`、`go-git` 等库实现,协程与通道用于提升并行处理能力。 + +## 7. 未来扩展 + +- 支持更多文件格式,如 PDF、HTML。 +- 嵌入向量批量写入以提升效率。 +- 引入缓存与摘要生成,进一步优化响应速度。 + +以上设计为后续实现提供结构化指导,可在项目中逐步落地。 + diff --git a/go.mod b/go.mod index 2e74828..763c365 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-git/go-git/v5 v5.16.2 github.com/jackc/pgx/v5 v5.7.5 github.com/yuin/goldmark v1.7.13 + gopkg.in/yaml.v3 v3.0.1 gorm.io/gorm v1.25.2 ) @@ -32,6 +33,7 @@ require ( github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect @@ -52,9 +54,9 @@ require ( golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.37.0 // indirect golang.org/x/net v0.39.0 // indirect + golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/server/rag/api/api.go b/server/rag/api/api.go new file mode 100644 index 0000000..a5aa579 --- /dev/null +++ b/server/rag/api/api.go @@ -0,0 +1,62 @@ +package api + +import ( + "context" + "net/http" + + "github.com/gin-gonic/gin" + "xcontrol/server/rag/config" + "xcontrol/server/rag/embed" + "xcontrol/server/rag/ingest" + "xcontrol/server/rag/store" + rsync "xcontrol/server/rag/sync" +) + +// Register mounts RAG routes on the gin engine. +func Register(r *gin.Engine, cfg *config.Config, st *store.Store, emb embed.Embedder) { + r.POST("/rag/sync", func(c *gin.Context) { + for _, repo := range cfg.Repos { + files, err := rsync.Repo(repo) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + for _, f := range files { + docs, err := ingest.File(repo.URL, f) + if err != nil { + continue + } + for i := range docs { + vec, err := emb.Embed(c.Request.Context(), docs[i].Content) + if err != nil { + continue + } + docs[i].Embedding = vec + } + st.Upsert(context.Background(), docs) + } + } + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + }) + + r.POST("/rag/query", func(c *gin.Context) { + var req struct { + Question string `json:"question"` + } + if err := c.BindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + vec, err := emb.Embed(c.Request.Context(), req.Question) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + docs, err := st.Search(c.Request.Context(), vec, 5) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"chunks": docs}) + }) +} diff --git a/server/rag/config/config.go b/server/rag/config/config.go new file mode 100644 index 0000000..fe10208 --- /dev/null +++ b/server/rag/config/config.go @@ -0,0 +1,33 @@ +package config + +import ( + "gopkg.in/yaml.v3" + "os" +) + +// Repo holds configuration for a single Git repository and paths to index. +type Repo struct { + URL string `yaml:"url"` + Branch string `yaml:"branch"` + Paths []string `yaml:"paths"` + Local string `yaml:"local"` +} + +// Config describes the RAG ingestion settings. +type Config struct { + Repos []Repo `yaml:"repos"` + Embedder string `yaml:"embedder"` +} + +// Load reads YAML configuration from path. +func Load(path string) (*Config, error) { + b, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var c Config + if err := yaml.Unmarshal(b, &c); err != nil { + return nil, err + } + return &c, nil +} diff --git a/server/rag/embed/bge.go b/server/rag/embed/bge.go new file mode 100644 index 0000000..0805755 --- /dev/null +++ b/server/rag/embed/bge.go @@ -0,0 +1,42 @@ +package embed + +import ( + "bytes" + "context" + "encoding/json" + "net/http" +) + +// BGE calls a local bge-m3 embedding service. +type BGE struct { + Endpoint string + Client *http.Client +} + +// NewBGE returns a new BGE embedder. +func NewBGE(endpoint string) *BGE { + return &BGE{Endpoint: endpoint, Client: &http.Client{}} +} + +// Embed posts text to the bge service and parses the vector. +func (b *BGE) Embed(ctx context.Context, text string) ([]float32, error) { + body := map[string]string{"text": text} + data, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, "POST", b.Endpoint, bytes.NewReader(data)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + resp, err := b.Client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + var res struct { + Embedding []float32 `json:"embedding"` + } + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return nil, err + } + return res.Embedding, nil +} diff --git a/server/rag/embed/embed.go b/server/rag/embed/embed.go new file mode 100644 index 0000000..aae0139 --- /dev/null +++ b/server/rag/embed/embed.go @@ -0,0 +1,8 @@ +package embed + +import "context" + +// Embedder produces a vector representation for input text. +type Embedder interface { + Embed(ctx context.Context, text string) ([]float32, error) +} diff --git a/server/rag/embed/openai.go b/server/rag/embed/openai.go new file mode 100644 index 0000000..0652e1b --- /dev/null +++ b/server/rag/embed/openai.go @@ -0,0 +1,49 @@ +package embed + +import ( + "bytes" + "context" + "encoding/json" + "net/http" +) + +// OpenAI calls the OpenAI embeddings endpoint. +type OpenAI struct { + APIKey string + Model string + Client *http.Client +} + +// NewOpenAI creates a new OpenAI embedder. +func NewOpenAI(model, key string) *OpenAI { + return &OpenAI{Model: model, APIKey: key, Client: &http.Client{}} +} + +// Embed generates an embedding using OpenAI API. +func (o *OpenAI) Embed(ctx context.Context, text string) ([]float32, error) { + body := map[string]any{"input": text, "model": o.Model} + b, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, "POST", "https://api.openai.com/v1/embeddings", bytes.NewReader(b)) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+o.APIKey) + req.Header.Set("Content-Type", "application/json") + resp, err := o.Client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + var res struct { + Data []struct { + Embedding []float32 `json:"embedding"` + } `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return nil, err + } + if len(res.Data) == 0 { + return nil, nil + } + return res.Data[0].Embedding, nil +} diff --git a/server/rag/ingest/ingest.go b/server/rag/ingest/ingest.go new file mode 100644 index 0000000..42b1a28 --- /dev/null +++ b/server/rag/ingest/ingest.go @@ -0,0 +1,43 @@ +package ingest + +import ( + "bytes" + "io/ioutil" + "strings" + + "github.com/yuin/goldmark" + "xcontrol/server/rag/store" +) + +const chunkSize = 800 + +// File reads a markdown file and returns chunked documents. +func File(repo, path string) ([]store.Document, error) { + b, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + var buf bytes.Buffer + if err := goldmark.Convert(b, &buf); err != nil { + return nil, err + } + words := strings.Fields(buf.String()) + var docs []store.Document + for i := 0; i < len(words); i += chunkSize { + end := i + chunkSize + if end > len(words) { + end = len(words) + } + chunk := strings.Join(words[i:end], " ") + docs = append(docs, store.Document{ + Repo: repo, + Path: path, + ChunkID: len(docs), + Content: chunk, + Metadata: map[string]any{ + "offset": i, + }, + }) + } + return docs, nil +} diff --git a/server/rag/store/store.go b/server/rag/store/store.go new file mode 100644 index 0000000..01dcb28 --- /dev/null +++ b/server/rag/store/store.go @@ -0,0 +1,79 @@ +package store + +import ( + "context" + "encoding/json" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// Document represents a chunk stored in Postgres. +type Document struct { + ID int64 + Repo string + Path string + ChunkID int + Content string + Embedding []float32 + Metadata map[string]any +} + +// Store wraps a pgx pool for vector operations. +type Store struct { + pool *pgxpool.Pool +} + +// New creates a new Store connected using dsn. +func New(ctx context.Context, dsn string) (*Store, error) { + p, err := pgxpool.New(ctx, dsn) + if err != nil { + return nil, err + } + return &Store{pool: p}, nil +} + +// Upsert writes documents and their embeddings to the database. +func (s *Store) Upsert(ctx context.Context, docs []Document) error { + for _, d := range docs { + meta, _ := json.Marshal(d.Metadata) + _, err := s.pool.Exec(ctx, + `INSERT INTO documents (repo,path,chunk_id,content,embedding,metadata) + VALUES ($1,$2,$3,$4,$5,$6) + ON CONFLICT (repo,path,chunk_id) DO UPDATE + SET content=EXCLUDED.content, + embedding=EXCLUDED.embedding, + metadata=EXCLUDED.metadata`, + d.Repo, d.Path, d.ChunkID, d.Content, d.Embedding, meta, + ) + if err != nil { + return err + } + } + return nil +} + +// Search returns top similar documents ordered by cosine distance. +func (s *Store) Search(ctx context.Context, vec []float32, limit int) ([]Document, error) { + rows, err := s.pool.Query(ctx, + `SELECT repo,path,chunk_id,content,metadata + FROM documents + ORDER BY embedding <-> $1 + LIMIT $2`, vec, limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + var res []Document + for rows.Next() { + var d Document + var meta []byte + if err := rows.Scan(&d.Repo, &d.Path, &d.ChunkID, &d.Content, &meta); err != nil { + return nil, err + } + json.Unmarshal(meta, &d.Metadata) + res = append(res, d) + } + return res, rows.Err() +} diff --git a/server/rag/sync/sync.go b/server/rag/sync/sync.go new file mode 100644 index 0000000..8adf6e1 --- /dev/null +++ b/server/rag/sync/sync.go @@ -0,0 +1,45 @@ +package sync + +import ( + "io/fs" + "path/filepath" + + git "github.com/go-git/go-git/v5" + "xcontrol/server/rag/config" +) + +// Repo synchronizes the configured repository and returns markdown file paths. +func Repo(c config.Repo) ([]string, error) { + if _, err := git.PlainOpen(c.Local); err != nil { + if _, err := git.PlainClone(c.Local, false, &git.CloneOptions{URL: c.URL}); err != nil { + return nil, err + } + } else { + r, err := git.PlainOpen(c.Local) + if err != nil { + return nil, err + } + w, err := r.Worktree() + if err != nil { + return nil, err + } + _ = w.Pull(&git.PullOptions{RemoteName: "origin"}) + } + var files []string + for _, p := range c.Paths { + root := filepath.Join(c.Local, p) + filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return nil + } + if d.IsDir() { + return nil + } + if filepath.Ext(path) == ".md" { + files = append(files, path) + } + return nil + }) + } + return files, nil +}