Merge pull request #654 from tobi/workoff/t_657414dc-dev-review

fix: keep partial embeddings pending
This commit is contained in:
Tobias Lütke 2026-05-16 13:52:49 -04:00 committed by GitHub
commit 87520252a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 178 additions and 31 deletions

View File

@ -17,6 +17,10 @@
per result). Output payload for `--full` queries is now proportional
to total document size.
- macOS Metal: `qmd query --json` now flushes successful JSON output and uses a safe immediate-exit path on Darwin to avoid ggml Metal finalizer aborts; other commands still dispose LLM contexts/models before the llama runtime. #368
- Embedding: require complete chunk coverage before treating a document as
embedded, remove partial vectors when chunk/session failures leave a
document incomplete, and keep `qmd status` pending counts honest after
interrupted long embed runs. #637 #378
- Embedding: `qmd embed -c <collection>` now scopes pending-doc selection
to the requested collection instead of embedding global pending work.
Scoped `--force` clears only collection-owned vectors, preserves shared

View File

@ -1806,7 +1806,7 @@ async function vectorIndex(
}
// Check if there's work to do before starting
const hashesToEmbed = getHashesNeedingEmbedding(db, batchOptions?.collection);
const hashesToEmbed = getHashesNeedingEmbedding(db, batchOptions?.collection, model);
if (hashesToEmbed === 0 && !force) {
console.log(`${c.green}✓ All content hashes already have embeddings.${c.reset}`);
closeDb();

View File

@ -871,10 +871,15 @@ function initializeDatabase(db: Database): void {
seq INTEGER NOT NULL DEFAULT 0,
pos INTEGER NOT NULL DEFAULT 0,
model TEXT NOT NULL,
total_chunks INTEGER NOT NULL DEFAULT 1,
embedded_at TEXT NOT NULL,
PRIMARY KEY (hash, seq)
)
`);
const cvInfoAfterCreate = db.prepare(`PRAGMA table_info(content_vectors)`).all() as { name: string }[];
if (!cvInfoAfterCreate.some(col => col.name === 'total_chunks')) {
db.exec(`ALTER TABLE content_vectors ADD COLUMN total_chunks INTEGER NOT NULL DEFAULT 1`);
}
// Store collections — makes the DB self-contained (no external config needed)
db.exec(`
@ -1167,9 +1172,9 @@ export type Store = {
ensureVecTable: (dimensions: number) => void;
// Index health
getHashesNeedingEmbedding: () => number;
getIndexHealth: () => IndexHealthInfo;
getStatus: () => IndexStatus;
getHashesNeedingEmbedding: (model?: string) => number;
getIndexHealth: (model?: string) => IndexHealthInfo;
getStatus: (model?: string) => IndexStatus;
// Caching
getCacheKey: typeof getCacheKey;
@ -1229,7 +1234,7 @@ export type Store = {
// Vector/embedding operations
getHashesForEmbedding: () => { hash: string; body: string; path: string }[];
clearAllEmbeddings: () => void;
insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string) => void;
insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string, totalChunks?: number) => void;
};
// =============================================================================
@ -1420,18 +1425,31 @@ function resolveEmbedOptions(options?: EmbedOptions): Required<Pick<EmbedOptions
};
}
function getPendingEmbeddingDocs(db: Database, collection?: string): PendingEmbeddingDoc[] {
function contentVectorExpectedChunksExpr(db: Database): string {
const columns = db.prepare(`PRAGMA table_info(content_vectors)`).all() as { name: string }[];
return columns.some(col => col.name === 'total_chunks') ? 'MAX(total_chunks)' : '1';
}
function getPendingEmbeddingDocs(db: Database, collection?: string, model: string = DEFAULT_EMBED_MODEL): PendingEmbeddingDoc[] {
const collectionFilter = collection ? `AND d.collection = ?` : ``;
const expectedChunksExpr = contentVectorExpectedChunksExpr(db);
const stmt = db.prepare(`
SELECT d.hash, MIN(d.path) as path, length(CAST(c.doc AS BLOB)) as bytes
FROM documents d
JOIN content c ON d.hash = c.hash
LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0
WHERE d.active = 1 AND v.hash IS NULL ${collectionFilter}
LEFT JOIN (
SELECT hash, model, COUNT(*) AS chunk_count, ${expectedChunksExpr} AS expected_chunks
FROM content_vectors
WHERE model = ?
GROUP BY hash, model
) v ON d.hash = v.hash
WHERE d.active = 1
AND (v.hash IS NULL OR v.chunk_count < v.expected_chunks)
${collectionFilter}
GROUP BY d.hash
ORDER BY MIN(d.path)
`);
return (collection ? stmt.all(collection) : stmt.all()) as PendingEmbeddingDoc[];
return (collection ? stmt.all(model, collection) : stmt.all(model)) as PendingEmbeddingDoc[];
}
function buildEmbeddingBatches(
@ -1502,7 +1520,7 @@ export async function generateEmbeddings(
clearAllEmbeddings(db, options?.collection);
}
const docsToEmbed = getPendingEmbeddingDocs(db, options?.collection);
const docsToEmbed = getPendingEmbeddingDocs(db, options?.collection, model);
if (docsToEmbed.length === 0) {
return { docsProcessed: 0, chunksEmbedded: 0, errors: 0, durationMs: 0 };
@ -1533,6 +1551,7 @@ export async function generateEmbeddings(
const batchDocs = getEmbeddingDocsForBatch(db, batchMeta);
const batchChunks: ChunkItem[] = [];
const expectedChunksByHash = new Map<string, number>();
const batchBytes = batchMeta.reduce((sum, doc) => sum + Math.max(0, doc.bytes), 0);
for (const doc of batchDocs) {
@ -1558,6 +1577,7 @@ export async function generateEmbeddings(
bytes: encoder.encode(chunks[seq]!.text).length,
});
}
expectedChunksByHash.set(doc.hash, chunks.length);
}
totalChunks += batchChunks.length;
@ -1610,7 +1630,7 @@ export async function generateEmbeddings(
const chunk = chunkBatch[i]!;
const embedding = embeddings[i];
if (embedding) {
insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), model, now);
insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), model, now, expectedChunksByHash.get(chunk.hash) ?? 1);
chunksEmbedded++;
} else {
errors++;
@ -1629,7 +1649,7 @@ export async function generateEmbeddings(
const text = formatDocForEmbedding(chunk.text, chunk.title, embedModelUri);
const result = await session.embed(text, { model });
if (result) {
insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now);
insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now, expectedChunksByHash.get(chunk.hash) ?? 1);
chunksEmbedded++;
} else {
errors++;
@ -1654,6 +1674,11 @@ export async function generateEmbeddings(
});
}
const removedPartialChunks = removeIncompleteEmbeddings(db, expectedChunksByHash, model);
if (removedPartialChunks > 0) {
chunksEmbedded = Math.max(0, chunksEmbedded - removedPartialChunks);
}
bytesProcessed += batchBytes;
options?.onProgress?.({ chunksEmbedded, totalChunks, bytesProcessed, totalBytes, errors });
}
@ -1688,9 +1713,9 @@ export function createStore(dbPath?: string): Store {
ensureVecTable: (dimensions: number) => ensureVecTableInternal(db, dimensions),
// Index health
getHashesNeedingEmbedding: () => getHashesNeedingEmbedding(db),
getIndexHealth: () => getIndexHealth(db),
getStatus: () => getStatus(db),
getHashesNeedingEmbedding: (model?: string) => getHashesNeedingEmbedding(db, undefined, model ?? store.llm?.embedModelName ?? DEFAULT_EMBED_MODEL),
getIndexHealth: (model?: string) => getIndexHealth(db, model ?? store.llm?.embedModelName ?? DEFAULT_EMBED_MODEL),
getStatus: (model?: string) => getStatus(db, model ?? store.llm?.embedModelName ?? DEFAULT_EMBED_MODEL),
// Caching
getCacheKey,
@ -1750,7 +1775,7 @@ export function createStore(dbPath?: string): Store {
// Vector/embedding operations
getHashesForEmbedding: () => getHashesForEmbedding(db),
clearAllEmbeddings: () => clearAllEmbeddings(db),
insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string) => insertEmbedding(db, hash, seq, pos, embedding, model, embeddedAt),
insertEmbedding: (hash: string, seq: number, pos: number, embedding: Float32Array, model: string, embeddedAt: string, totalChunks?: number) => insertEmbedding(db, hash, seq, pos, embedding, model, embeddedAt, totalChunks),
};
return store;
@ -1949,15 +1974,23 @@ export type IndexStatus = {
// Index health
// =============================================================================
export function getHashesNeedingEmbedding(db: Database, collection?: string): number {
export function getHashesNeedingEmbedding(db: Database, collection?: string, model: string = DEFAULT_EMBED_MODEL): number {
const collectionFilter = collection ? `AND d.collection = ?` : ``;
const expectedChunksExpr = contentVectorExpectedChunksExpr(db);
const stmt = db.prepare(`
SELECT COUNT(DISTINCT d.hash) as count
FROM documents d
LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0
WHERE d.active = 1 AND v.hash IS NULL ${collectionFilter}
LEFT JOIN (
SELECT hash, model, COUNT(*) AS chunk_count, ${expectedChunksExpr} AS expected_chunks
FROM content_vectors
WHERE model = ?
GROUP BY hash, model
) v ON d.hash = v.hash
WHERE d.active = 1
AND (v.hash IS NULL OR v.chunk_count < v.expected_chunks)
${collectionFilter}
`);
const result = (collection ? stmt.get(collection) : stmt.get()) as { count: number };
const result = (collection ? stmt.get(model, collection) : stmt.get(model)) as { count: number };
return result.count;
}
@ -1967,8 +2000,8 @@ export type IndexHealthInfo = {
daysStale: number | null;
};
export function getIndexHealth(db: Database): IndexHealthInfo {
const needsEmbedding = getHashesNeedingEmbedding(db);
export function getIndexHealth(db: Database, model: string = DEFAULT_EMBED_MODEL): IndexHealthInfo {
const needsEmbedding = getHashesNeedingEmbedding(db, undefined, model);
const totalDocs = (db.prepare(`SELECT COUNT(*) as count FROM documents WHERE active = 1`).get() as { count: number }).count;
const mostRecent = db.prepare(`SELECT MAX(modified_at) as latest FROM documents WHERE active = 1`).get() as { latest: string | null };
@ -3316,15 +3349,22 @@ async function getEmbedding(text: string, model: string, isQuery: boolean, sessi
* Get all unique content hashes that need embeddings (from active documents).
* Returns hash, document body, and a sample path for display purposes.
*/
export function getHashesForEmbedding(db: Database): { hash: string; body: string; path: string }[] {
export function getHashesForEmbedding(db: Database, model: string = DEFAULT_EMBED_MODEL): { hash: string; body: string; path: string }[] {
const expectedChunksExpr = contentVectorExpectedChunksExpr(db);
return db.prepare(`
SELECT d.hash, c.doc as body, MIN(d.path) as path
FROM documents d
JOIN content c ON d.hash = c.hash
LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0
WHERE d.active = 1 AND v.hash IS NULL
LEFT JOIN (
SELECT hash, model, COUNT(*) AS chunk_count, ${expectedChunksExpr} AS expected_chunks
FROM content_vectors
WHERE model = ?
GROUP BY hash, model
) v ON d.hash = v.hash
WHERE d.active = 1
AND (v.hash IS NULL OR v.chunk_count < v.expected_chunks)
GROUP BY d.hash
`).all() as { hash: string; body: string; path: string }[];
`).all(model) as { hash: string; body: string; path: string }[];
}
/**
@ -3409,13 +3449,14 @@ export function insertEmbedding(
pos: number,
embedding: Float32Array,
model: string,
embeddedAt: string
embeddedAt: string,
totalChunks: number = 1
): void {
const hashSeq = `${hash}_${seq}`;
// Insert content_vectors first — crash-safe ordering (see getHashesForEmbedding)
const insertContentVectorStmt = db.prepare(`INSERT OR REPLACE INTO content_vectors (hash, seq, pos, model, embedded_at) VALUES (?, ?, ?, ?, ?)`);
insertContentVectorStmt.run(hash, seq, pos, model, embeddedAt);
const insertContentVectorStmt = db.prepare(`INSERT OR REPLACE INTO content_vectors (hash, seq, pos, model, total_chunks, embedded_at) VALUES (?, ?, ?, ?, ?, ?)`);
insertContentVectorStmt.run(hash, seq, pos, model, totalChunks, embeddedAt);
// vec0 virtual tables don't support OR REPLACE — use DELETE + INSERT
const deleteVecStmt = db.prepare(`DELETE FROM vectors_vec WHERE hash_seq = ?`);
@ -3424,6 +3465,26 @@ export function insertEmbedding(
insertVecStmt.run(hashSeq, embedding);
}
function removeIncompleteEmbeddings(db: Database, expectedChunksByHash: Map<string, number>, model: string): number {
let removed = 0;
const rowsStmt = db.prepare(`SELECT seq FROM content_vectors WHERE hash = ? AND model = ?`);
const deleteContentStmt = db.prepare(`DELETE FROM content_vectors WHERE hash = ? AND model = ?`);
const deleteVecStmt = db.prepare(`DELETE FROM vectors_vec WHERE hash_seq = ?`);
for (const [hash, expectedChunks] of expectedChunksByHash) {
const rows = rowsStmt.all(hash, model) as { seq: number }[];
if (rows.length === 0 || rows.length === expectedChunks) continue;
for (const row of rows) {
deleteVecStmt.run(`${hash}_${row.seq}`);
}
deleteContentStmt.run(hash, model);
removed += rows.length;
}
return removed;
}
// =============================================================================
// Query expansion
// =============================================================================
@ -3922,7 +3983,7 @@ export function findDocuments(
// Status
// =============================================================================
export function getStatus(db: Database): IndexStatus {
export function getStatus(db: Database, model: string = DEFAULT_EMBED_MODEL): IndexStatus {
// DB is source of truth for collections — config provides supplementary metadata
const dbCollections = db.prepare(`
SELECT
@ -3957,7 +4018,7 @@ export function getStatus(db: Database): IndexStatus {
});
const totalDocs = (db.prepare(`SELECT COUNT(*) as c FROM documents WHERE active = 1`).get() as { c: number }).c;
const needsEmbedding = getHashesNeedingEmbedding(db);
const needsEmbedding = getHashesNeedingEmbedding(db, undefined, model);
const hasVectors = !!db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='vectors_vec'`).get();
return {

View File

@ -2281,6 +2281,26 @@ describe("Index Status", () => {
await cleanupTestDb(store);
});
test("embedding health is scoped to the active embed model", async () => {
const store = await createTestStore();
const collectionName = await createTestCollection();
const activeModel = "hf:active/embed-model.gguf";
const staleModel = "hf:stale/embed-model.gguf";
const now = new Date().toISOString();
store.llm = { embedModelName: activeModel } as any;
store.ensureVecTable(3);
await insertTestDocument(store.db, collectionName, { name: "doc1", hash: "hash1" });
store.insertEmbedding("hash1", 0, 0, new Float32Array([1, 2, 3]), staleModel, now, 1);
expect(store.getHashesNeedingEmbedding()).toBe(1);
expect(store.getStatus().needsEmbedding).toBe(1);
expect(store.getIndexHealth().needsEmbedding).toBe(1);
expect(store.getHashesNeedingEmbedding(staleModel)).toBe(0);
await cleanupTestDb(store);
});
test("getIndexHealth returns health info", async () => {
const store = await createTestStore();
const collectionName = await createTestCollection();
@ -3093,6 +3113,68 @@ describe("Embedding batching", () => {
}
});
test("generateEmbeddings does not mark a partially embedded multi-chunk document complete", async () => {
const store = await createTestStore();
const db = store.db;
const fakeLlm = {
async embed(_text: string, _options?: { model?: string }) {
return { embedding: [0.1, 0.2, 0.3], model: "fake-embed" };
},
async embedBatch(texts: string[], _options?: { model?: string }) {
return texts.map((_text, index) => index === 0
? { embedding: [1, 2, 3], model: "fake-embed" }
: null
);
},
};
setDefaultLlamaCpp(createFakeTokenizer() as any);
store.llm = fakeLlm as any;
try {
await insertTestDocument(db, "docs", {
name: "long-doc",
body: "# Long doc\n\n" + "partial embedding regression ".repeat(260),
});
const result = await generateEmbeddings(store);
expect(result.errors).toBeGreaterThan(0);
expect(db.prepare(`SELECT COUNT(*) as count FROM content_vectors`).get()).toEqual({ count: 0 });
expect(db.prepare(`SELECT COUNT(*) as count FROM vectors_vec`).get()).toEqual({ count: 0 });
expect(store.getHashesNeedingEmbedding()).toBe(1);
expect(store.getStatus().needsEmbedding).toBe(1);
} finally {
setDefaultLlamaCpp(null);
await cleanupTestDb(store);
}
});
test("generateEmbeddings opens a long-lived LLM session for embed runs", async () => {
const store = await createTestStore();
const fakeLlm = createFakeEmbedLlm();
const sessionSpy = vi.spyOn(llmModule, "withLLMSessionForLlm");
setDefaultLlamaCpp(createFakeTokenizer() as any);
store.llm = fakeLlm as any;
try {
await insertTestDocument(store.db, "docs", { name: "one", body: "# One\n\nAlpha" });
await generateEmbeddings(store);
expect(sessionSpy).toHaveBeenCalledWith(
fakeLlm,
expect.any(Function),
expect.objectContaining({ maxDuration: 30 * 60 * 1000, name: "generateEmbeddings" }),
);
} finally {
sessionSpy.mockRestore();
setDefaultLlamaCpp(null);
await cleanupTestDb(store);
}
});
test("vectorSearchQuery uses the active llm embed model for vector lookups", async () => {
const store = await createTestStore();
const model = "hf:Qwen/Qwen3-Embedding-0.6B-GGUF/Qwen3-Embedding-0.6B-Q8_0.gguf";