diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ee8337..d1f26af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ per result). Output payload for `--full` queries is now proportional to total document size. - macOS Metal: `qmd query --json` now flushes successful JSON output and uses a safe immediate-exit path on Darwin to avoid ggml Metal finalizer aborts; other commands still dispose LLM contexts/models before the llama runtime. #368 +- Embedding: require complete chunk coverage before treating a document as + embedded, remove partial vectors when chunk/session failures leave a + document incomplete, and keep `qmd status` pending counts honest after + interrupted long embed runs. #637 #378 - Embedding: `qmd embed -c ` now scopes pending-doc selection to the requested collection instead of embedding global pending work. Scoped `--force` clears only collection-owned vectors, preserves shared diff --git a/src/cli/qmd.ts b/src/cli/qmd.ts index fda6d8e..2ff3796 100755 --- a/src/cli/qmd.ts +++ b/src/cli/qmd.ts @@ -1806,7 +1806,7 @@ async function vectorIndex( } // Check if there's work to do before starting - const hashesToEmbed = getHashesNeedingEmbedding(db, batchOptions?.collection); + const hashesToEmbed = getHashesNeedingEmbedding(db, batchOptions?.collection, model); if (hashesToEmbed === 0 && !force) { console.log(`${c.green}✓ All content hashes already have embeddings.${c.reset}`); closeDb(); diff --git a/src/store.ts b/src/store.ts index 003feca..5323245 100644 --- a/src/store.ts +++ b/src/store.ts @@ -871,10 +871,15 @@ function initializeDatabase(db: Database): void { seq INTEGER NOT NULL DEFAULT 0, pos INTEGER NOT NULL DEFAULT 0, model TEXT NOT NULL, + total_chunks INTEGER NOT NULL DEFAULT 1, embedded_at TEXT NOT NULL, PRIMARY KEY (hash, seq) ) `); + const cvInfoAfterCreate = db.prepare(`PRAGMA table_info(content_vectors)`).all() as { name: string }[]; + if (!cvInfoAfterCreate.some(col => col.name === 'total_chunks')) { + db.exec(`ALTER TABLE content_vectors ADD COLUMN total_chunks INTEGER NOT NULL DEFAULT 1`); + } // Store collections — makes the DB self-contained (no external config needed) db.exec(` @@ -1167,9 +1172,9 @@ export type Store = { ensureVecTable: (dimensions: number) => void; // Index health - getHashesNeedingEmbedding: () => number; - getIndexHealth: () => IndexHealthInfo; - getStatus: () => IndexStatus; + getHashesNeedingEmbedding: (model?: string) => number; + getIndexHealth: (model?: string) => IndexHealthInfo; + getStatus: (model?: string) => IndexStatus; // Caching getCacheKey: typeof getCacheKey; @@ -1229,7 +1234,7 @@ export type Store = { // Vector/embedding operations getHashesForEmbedding: () => { hash: string; body: string; path: string }[]; clearAllEmbeddings: () => void; - insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string) => void; + insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string, totalChunks?: number) => void; }; // ============================================================================= @@ -1420,18 +1425,31 @@ function resolveEmbedOptions(options?: EmbedOptions): Required col.name === 'total_chunks') ? 'MAX(total_chunks)' : '1'; +} + +function getPendingEmbeddingDocs(db: Database, collection?: string, model: string = DEFAULT_EMBED_MODEL): PendingEmbeddingDoc[] { const collectionFilter = collection ? `AND d.collection = ?` : ``; + const expectedChunksExpr = contentVectorExpectedChunksExpr(db); const stmt = db.prepare(` SELECT d.hash, MIN(d.path) as path, length(CAST(c.doc AS BLOB)) as bytes FROM documents d JOIN content c ON d.hash = c.hash - LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0 - WHERE d.active = 1 AND v.hash IS NULL ${collectionFilter} + LEFT JOIN ( + SELECT hash, model, COUNT(*) AS chunk_count, ${expectedChunksExpr} AS expected_chunks + FROM content_vectors + WHERE model = ? + GROUP BY hash, model + ) v ON d.hash = v.hash + WHERE d.active = 1 + AND (v.hash IS NULL OR v.chunk_count < v.expected_chunks) + ${collectionFilter} GROUP BY d.hash ORDER BY MIN(d.path) `); - return (collection ? stmt.all(collection) : stmt.all()) as PendingEmbeddingDoc[]; + return (collection ? stmt.all(model, collection) : stmt.all(model)) as PendingEmbeddingDoc[]; } function buildEmbeddingBatches( @@ -1502,7 +1520,7 @@ export async function generateEmbeddings( clearAllEmbeddings(db, options?.collection); } - const docsToEmbed = getPendingEmbeddingDocs(db, options?.collection); + const docsToEmbed = getPendingEmbeddingDocs(db, options?.collection, model); if (docsToEmbed.length === 0) { return { docsProcessed: 0, chunksEmbedded: 0, errors: 0, durationMs: 0 }; @@ -1533,6 +1551,7 @@ export async function generateEmbeddings( const batchDocs = getEmbeddingDocsForBatch(db, batchMeta); const batchChunks: ChunkItem[] = []; + const expectedChunksByHash = new Map(); const batchBytes = batchMeta.reduce((sum, doc) => sum + Math.max(0, doc.bytes), 0); for (const doc of batchDocs) { @@ -1558,6 +1577,7 @@ export async function generateEmbeddings( bytes: encoder.encode(chunks[seq]!.text).length, }); } + expectedChunksByHash.set(doc.hash, chunks.length); } totalChunks += batchChunks.length; @@ -1610,7 +1630,7 @@ export async function generateEmbeddings( const chunk = chunkBatch[i]!; const embedding = embeddings[i]; if (embedding) { - insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), model, now); + insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), model, now, expectedChunksByHash.get(chunk.hash) ?? 1); chunksEmbedded++; } else { errors++; @@ -1629,7 +1649,7 @@ export async function generateEmbeddings( const text = formatDocForEmbedding(chunk.text, chunk.title, embedModelUri); const result = await session.embed(text, { model }); if (result) { - insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now); + insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now, expectedChunksByHash.get(chunk.hash) ?? 1); chunksEmbedded++; } else { errors++; @@ -1654,6 +1674,11 @@ export async function generateEmbeddings( }); } + const removedPartialChunks = removeIncompleteEmbeddings(db, expectedChunksByHash, model); + if (removedPartialChunks > 0) { + chunksEmbedded = Math.max(0, chunksEmbedded - removedPartialChunks); + } + bytesProcessed += batchBytes; options?.onProgress?.({ chunksEmbedded, totalChunks, bytesProcessed, totalBytes, errors }); } @@ -1688,9 +1713,9 @@ export function createStore(dbPath?: string): Store { ensureVecTable: (dimensions: number) => ensureVecTableInternal(db, dimensions), // Index health - getHashesNeedingEmbedding: () => getHashesNeedingEmbedding(db), - getIndexHealth: () => getIndexHealth(db), - getStatus: () => getStatus(db), + getHashesNeedingEmbedding: (model?: string) => getHashesNeedingEmbedding(db, undefined, model ?? store.llm?.embedModelName ?? DEFAULT_EMBED_MODEL), + getIndexHealth: (model?: string) => getIndexHealth(db, model ?? store.llm?.embedModelName ?? DEFAULT_EMBED_MODEL), + getStatus: (model?: string) => getStatus(db, model ?? store.llm?.embedModelName ?? DEFAULT_EMBED_MODEL), // Caching getCacheKey, @@ -1750,7 +1775,7 @@ export function createStore(dbPath?: string): Store { // Vector/embedding operations getHashesForEmbedding: () => getHashesForEmbedding(db), clearAllEmbeddings: () => clearAllEmbeddings(db), - insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string) => insertEmbedding(db, hash, seq, pos, embedding, model, embeddedAt), + insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string, totalChunks?: number) => insertEmbedding(db, hash, seq, pos, embedding, model, embeddedAt, totalChunks), }; return store; @@ -1949,15 +1974,23 @@ export type IndexStatus = { // Index health // ============================================================================= -export function getHashesNeedingEmbedding(db: Database, collection?: string): number { +export function getHashesNeedingEmbedding(db: Database, collection?: string, model: string = DEFAULT_EMBED_MODEL): number { const collectionFilter = collection ? `AND d.collection = ?` : ``; + const expectedChunksExpr = contentVectorExpectedChunksExpr(db); const stmt = db.prepare(` SELECT COUNT(DISTINCT d.hash) as count FROM documents d - LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0 - WHERE d.active = 1 AND v.hash IS NULL ${collectionFilter} + LEFT JOIN ( + SELECT hash, model, COUNT(*) AS chunk_count, ${expectedChunksExpr} AS expected_chunks + FROM content_vectors + WHERE model = ? + GROUP BY hash, model + ) v ON d.hash = v.hash + WHERE d.active = 1 + AND (v.hash IS NULL OR v.chunk_count < v.expected_chunks) + ${collectionFilter} `); - const result = (collection ? stmt.get(collection) : stmt.get()) as { count: number }; + const result = (collection ? stmt.get(model, collection) : stmt.get(model)) as { count: number }; return result.count; } @@ -1967,8 +2000,8 @@ export type IndexHealthInfo = { daysStale: number | null; }; -export function getIndexHealth(db: Database): IndexHealthInfo { - const needsEmbedding = getHashesNeedingEmbedding(db); +export function getIndexHealth(db: Database, model: string = DEFAULT_EMBED_MODEL): IndexHealthInfo { + const needsEmbedding = getHashesNeedingEmbedding(db, undefined, model); const totalDocs = (db.prepare(`SELECT COUNT(*) as count FROM documents WHERE active = 1`).get() as { count: number }).count; const mostRecent = db.prepare(`SELECT MAX(modified_at) as latest FROM documents WHERE active = 1`).get() as { latest: string | null }; @@ -3316,15 +3349,22 @@ async function getEmbedding(text: string, model: string, isQuery: boolean, sessi * Get all unique content hashes that need embeddings (from active documents). * Returns hash, document body, and a sample path for display purposes. */ -export function getHashesForEmbedding(db: Database): { hash: string; body: string; path: string }[] { +export function getHashesForEmbedding(db: Database, model: string = DEFAULT_EMBED_MODEL): { hash: string; body: string; path: string }[] { + const expectedChunksExpr = contentVectorExpectedChunksExpr(db); return db.prepare(` SELECT d.hash, c.doc as body, MIN(d.path) as path FROM documents d JOIN content c ON d.hash = c.hash - LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0 - WHERE d.active = 1 AND v.hash IS NULL + LEFT JOIN ( + SELECT hash, model, COUNT(*) AS chunk_count, ${expectedChunksExpr} AS expected_chunks + FROM content_vectors + WHERE model = ? + GROUP BY hash, model + ) v ON d.hash = v.hash + WHERE d.active = 1 + AND (v.hash IS NULL OR v.chunk_count < v.expected_chunks) GROUP BY d.hash - `).all() as { hash: string; body: string; path: string }[]; + `).all(model) as { hash: string; body: string; path: string }[]; } /** @@ -3409,13 +3449,14 @@ export function insertEmbedding( pos: number, embedding: Float32Array, model: string, - embeddedAt: string + embeddedAt: string, + totalChunks: number = 1 ): void { const hashSeq = `${hash}_${seq}`; // Insert content_vectors first — crash-safe ordering (see getHashesForEmbedding) - const insertContentVectorStmt = db.prepare(`INSERT OR REPLACE INTO content_vectors (hash, seq, pos, model, embedded_at) VALUES (?, ?, ?, ?, ?)`); - insertContentVectorStmt.run(hash, seq, pos, model, embeddedAt); + const insertContentVectorStmt = db.prepare(`INSERT OR REPLACE INTO content_vectors (hash, seq, pos, model, total_chunks, embedded_at) VALUES (?, ?, ?, ?, ?, ?)`); + insertContentVectorStmt.run(hash, seq, pos, model, totalChunks, embeddedAt); // vec0 virtual tables don't support OR REPLACE — use DELETE + INSERT const deleteVecStmt = db.prepare(`DELETE FROM vectors_vec WHERE hash_seq = ?`); @@ -3424,6 +3465,26 @@ export function insertEmbedding( insertVecStmt.run(hashSeq, embedding); } +function removeIncompleteEmbeddings(db: Database, expectedChunksByHash: Map, model: string): number { + let removed = 0; + const rowsStmt = db.prepare(`SELECT seq FROM content_vectors WHERE hash = ? AND model = ?`); + const deleteContentStmt = db.prepare(`DELETE FROM content_vectors WHERE hash = ? AND model = ?`); + const deleteVecStmt = db.prepare(`DELETE FROM vectors_vec WHERE hash_seq = ?`); + + for (const [hash, expectedChunks] of expectedChunksByHash) { + const rows = rowsStmt.all(hash, model) as { seq: number }[]; + if (rows.length === 0 || rows.length === expectedChunks) continue; + + for (const row of rows) { + deleteVecStmt.run(`${hash}_${row.seq}`); + } + deleteContentStmt.run(hash, model); + removed += rows.length; + } + + return removed; +} + // ============================================================================= // Query expansion // ============================================================================= @@ -3922,7 +3983,7 @@ export function findDocuments( // Status // ============================================================================= -export function getStatus(db: Database): IndexStatus { +export function getStatus(db: Database, model: string = DEFAULT_EMBED_MODEL): IndexStatus { // DB is source of truth for collections — config provides supplementary metadata const dbCollections = db.prepare(` SELECT @@ -3957,7 +4018,7 @@ export function getStatus(db: Database): IndexStatus { }); const totalDocs = (db.prepare(`SELECT COUNT(*) as c FROM documents WHERE active = 1`).get() as { c: number }).c; - const needsEmbedding = getHashesNeedingEmbedding(db); + const needsEmbedding = getHashesNeedingEmbedding(db, undefined, model); const hasVectors = !!db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'`).get(); return { diff --git a/test/store.test.ts b/test/store.test.ts index 2adf717..9f132f8 100644 --- a/test/store.test.ts +++ b/test/store.test.ts @@ -2281,6 +2281,26 @@ describe("Index Status", () => { await cleanupTestDb(store); }); + test("embedding health is scoped to the active embed model", async () => { + const store = await createTestStore(); + const collectionName = await createTestCollection(); + const activeModel = "hf:active/embed-model.gguf"; + const staleModel = "hf:stale/embed-model.gguf"; + const now = new Date().toISOString(); + + store.llm = { embedModelName: activeModel } as any; + store.ensureVecTable(3); + await insertTestDocument(store.db, collectionName, { name: "doc1", hash: "hash1" }); + store.insertEmbedding("hash1", 0, 0, new Float32Array([1, 2, 3]), staleModel, now, 1); + + expect(store.getHashesNeedingEmbedding()).toBe(1); + expect(store.getStatus().needsEmbedding).toBe(1); + expect(store.getIndexHealth().needsEmbedding).toBe(1); + expect(store.getHashesNeedingEmbedding(staleModel)).toBe(0); + + await cleanupTestDb(store); + }); + test("getIndexHealth returns health info", async () => { const store = await createTestStore(); const collectionName = await createTestCollection(); @@ -3093,6 +3113,68 @@ describe("Embedding batching", () => { } }); + test("generateEmbeddings does not mark a partially embedded multi-chunk document complete", async () => { + const store = await createTestStore(); + const db = store.db; + const fakeLlm = { + async embed(_text: string, _options?: { model?: string }) { + return { embedding: [0.1, 0.2, 0.3], model: "fake-embed" }; + }, + async embedBatch(texts: string[], _options?: { model?: string }) { + return texts.map((_text, index) => index === 0 + ? { embedding: [1, 2, 3], model: "fake-embed" } + : null + ); + }, + }; + + setDefaultLlamaCpp(createFakeTokenizer() as any); + store.llm = fakeLlm as any; + + try { + await insertTestDocument(db, "docs", { + name: "long-doc", + body: "# Long doc\n\n" + "partial embedding regression ".repeat(260), + }); + + const result = await generateEmbeddings(store); + + expect(result.errors).toBeGreaterThan(0); + expect(db.prepare(`SELECT COUNT(*) as count FROM content_vectors`).get()).toEqual({ count: 0 }); + expect(db.prepare(`SELECT COUNT(*) as count FROM vectors_vec`).get()).toEqual({ count: 0 }); + expect(store.getHashesNeedingEmbedding()).toBe(1); + expect(store.getStatus().needsEmbedding).toBe(1); + } finally { + setDefaultLlamaCpp(null); + await cleanupTestDb(store); + } + }); + + test("generateEmbeddings opens a long-lived LLM session for embed runs", async () => { + const store = await createTestStore(); + const fakeLlm = createFakeEmbedLlm(); + const sessionSpy = vi.spyOn(llmModule, "withLLMSessionForLlm"); + + setDefaultLlamaCpp(createFakeTokenizer() as any); + store.llm = fakeLlm as any; + + try { + await insertTestDocument(store.db, "docs", { name: "one", body: "# One\n\nAlpha" }); + + await generateEmbeddings(store); + + expect(sessionSpy).toHaveBeenCalledWith( + fakeLlm, + expect.any(Function), + expect.objectContaining({ maxDuration: 30 * 60 * 1000, name: "generateEmbeddings" }), + ); + } finally { + sessionSpy.mockRestore(); + setDefaultLlamaCpp(null); + await cleanupTestDb(store); + } + }); + test("vectorSearchQuery uses the active llm embed model for vector lookups", async () => { const store = await createTestStore(); const model = "hf:Qwen/Qwen3-Embedding-0.6B-GGUF/Qwen3-Embedding-0.6B-Q8_0.gguf";