1use arrow_array::{
4 Array, ArrayRef, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator,
5 StringArray, UInt8Array, UInt32Array, UInt64Array,
6};
7use arrow_schema::{DataType, Field, Schema};
8use async_trait::async_trait;
9use chrono::Utc;
10use futures::TryStreamExt;
11use lancedb::index::Index;
12use lancedb::index::scalar::{FtsIndexBuilder, FullTextSearchQuery};
13use lancedb::query::{ExecutableQuery, QueryBase, QueryExecutionOptions};
14use lancedb::{Connection, Table, connect};
15use ragfs_core::{
16 Chunk, ChunkMetadata, ContentType, FileRecord, FileStatus, SearchQuery, SearchResult,
17 StoreError, StoreStats, VectorStore,
18};
19use std::collections::HashMap;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use tokio::sync::RwLock;
23use tracing::{debug, info, warn};
24use uuid::Uuid;
25
26const CHUNKS_TABLE: &str = "chunks";
27const FILES_TABLE: &str = "files";
28
/// LanceDB-backed implementation of [`VectorStore`].
///
/// Holds two tables: `chunks` (content + embeddings) and `files`
/// (per-file indexing records). The connection and table handles are
/// opened lazily on first use and cached behind `RwLock`s.
pub struct LanceStore {
    /// Filesystem path of the LanceDB database directory.
    db_path: PathBuf,
    /// Dimensionality of the fixed-size embedding vectors in `chunks`.
    embedding_dim: usize,
    /// Lazily established LanceDB connection (`None` until first use).
    connection: RwLock<Option<Connection>>,
    /// Cached handle to the `chunks` table (`None` until first open).
    chunks_table: RwLock<Option<Table>>,
    /// Cached handle to the `files` table (`None` until first open).
    files_table: RwLock<Option<Table>>,
}
42
43impl LanceStore {
44 #[must_use]
46 pub fn new(db_path: PathBuf, embedding_dim: usize) -> Self {
47 Self {
48 db_path,
49 embedding_dim,
50 connection: RwLock::new(None),
51 chunks_table: RwLock::new(None),
52 files_table: RwLock::new(None),
53 }
54 }
55
56 pub fn db_path(&self) -> &Path {
58 &self.db_path
59 }
60
    /// Dimensionality of the embedding vectors this store was configured with.
    pub fn embedding_dim(&self) -> usize {
        self.embedding_dim
    }
65
66 async fn get_connection(&self) -> Result<Connection, StoreError> {
68 {
69 let conn = self.connection.read().await;
70 if let Some(ref c) = *conn {
71 return Ok(c.clone());
72 }
73 }
74
75 let mut conn = self.connection.write().await;
76 if conn.is_none() {
77 let db_path_str = self.db_path.to_string_lossy().to_string();
78 let new_conn = connect(&db_path_str)
79 .execute()
80 .await
81 .map_err(|e| StoreError::Init(format!("Failed to connect to LanceDB: {e}")))?;
82 *conn = Some(new_conn);
83 }
84 Ok(conn.as_ref().unwrap().clone())
85 }
86
    /// Arrow schema of the `chunks` table.
    ///
    /// NOTE: the field order here is a contract with `chunks_to_batch`,
    /// which pushes its column arrays positionally — keep the two in sync.
    /// The embedding column is named "vector" and the per-chunk mime type
    /// "file_mime_type"; readers must use these exact names.
    fn chunks_schema(&self) -> Schema {
        Schema::new(vec![
            Field::new("chunk_id", DataType::Utf8, false),
            Field::new("file_id", DataType::Utf8, false),
            Field::new("file_path", DataType::Utf8, false),
            Field::new("content", DataType::Utf8, false),
            Field::new("content_type", DataType::Utf8, false),
            Field::new("chunk_index", DataType::UInt32, false),
            Field::new("start_byte", DataType::UInt64, false),
            Field::new("end_byte", DataType::UInt64, false),
            Field::new("start_line", DataType::UInt32, true),
            Field::new("end_line", DataType::UInt32, true),
            Field::new("parent_chunk_id", DataType::Utf8, true),
            Field::new("depth", DataType::UInt8, false),
            // Non-nullable fixed-size embedding; chunks without an embedding
            // are stored as all-zero vectors (see `build_vector_array`).
            Field::new(
                "vector",
                DataType::FixedSizeList(
                    Arc::new(Field::new("item", DataType::Float32, true)),
                    self.embedding_dim as i32,
                ),
                false,
            ),
            Field::new("embedding_model", DataType::Utf8, true),
            // RFC 3339 timestamp stored as text.
            Field::new("indexed_at", DataType::Utf8, false),
            Field::new("file_mime_type", DataType::Utf8, true),
            Field::new("language", DataType::Utf8, true),
            Field::new("symbol_type", DataType::Utf8, true),
            Field::new("symbol_name", DataType::Utf8, true),
        ])
    }
118
    /// Arrow schema of the `files` table.
    ///
    /// NOTE: the field order is a contract with `file_to_batch`, which
    /// pushes its column arrays positionally — keep the two in sync.
    /// Timestamps are stored as RFC 3339 text.
    fn files_schema(&self) -> Schema {
        Schema::new(vec![
            Field::new("file_id", DataType::Utf8, false),
            Field::new("path", DataType::Utf8, false),
            Field::new("size_bytes", DataType::UInt64, false),
            Field::new("mime_type", DataType::Utf8, false),
            Field::new("content_hash", DataType::Utf8, false),
            Field::new("modified_at", DataType::Utf8, false),
            Field::new("indexed_at", DataType::Utf8, true),
            Field::new("chunk_count", DataType::UInt32, false),
            Field::new("status", DataType::Utf8, false),
            Field::new("error_message", DataType::Utf8, true),
        ])
    }
134
135 async fn get_chunks_table(&self) -> Result<Table, StoreError> {
137 {
138 let table = self.chunks_table.read().await;
139 if let Some(ref t) = *table {
140 return Ok(t.clone());
141 }
142 }
143
144 let conn = self.get_connection().await?;
145 let mut table_lock = self.chunks_table.write().await;
146
147 if table_lock.is_none() {
148 let t = conn
149 .open_table(CHUNKS_TABLE)
150 .execute()
151 .await
152 .map_err(|e| StoreError::Init(format!("Failed to open chunks table: {e}")))?;
153 *table_lock = Some(t);
154 }
155
156 Ok(table_lock.as_ref().unwrap().clone())
157 }
158
159 async fn get_files_table(&self) -> Result<Table, StoreError> {
161 {
162 let table = self.files_table.read().await;
163 if let Some(ref t) = *table {
164 return Ok(t.clone());
165 }
166 }
167
168 let conn = self.get_connection().await?;
169 let mut table_lock = self.files_table.write().await;
170
171 if table_lock.is_none() {
172 let t = conn
173 .open_table(FILES_TABLE)
174 .execute()
175 .await
176 .map_err(|e| StoreError::Init(format!("Failed to open files table: {e}")))?;
177 *table_lock = Some(t);
178 }
179
180 Ok(table_lock.as_ref().unwrap().clone())
181 }
182
183 fn chunks_to_batch(&self, chunks: &[Chunk]) -> Result<RecordBatch, StoreError> {
185 let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id.to_string()).collect();
186 let file_ids: Vec<_> = chunks.iter().map(|c| c.file_id.to_string()).collect();
187 let file_paths: Vec<_> = chunks
188 .iter()
189 .map(|c| c.file_path.to_string_lossy().to_string())
190 .collect();
191 let contents: Vec<_> = chunks.iter().map(|c| c.content.clone()).collect();
192 let content_types: Vec<_> = chunks
193 .iter()
194 .map(|c| content_type_to_string(&c.content_type))
195 .collect();
196 let chunk_indices: Vec<_> = chunks.iter().map(|c| c.chunk_index).collect();
197 let start_bytes: Vec<_> = chunks.iter().map(|c| c.byte_range.start).collect();
198 let end_bytes: Vec<_> = chunks.iter().map(|c| c.byte_range.end).collect();
199 let start_lines: Vec<_> = chunks
200 .iter()
201 .map(|c| c.line_range.as_ref().map(|r| r.start))
202 .collect();
203 let end_lines: Vec<_> = chunks
204 .iter()
205 .map(|c| c.line_range.as_ref().map(|r| r.end))
206 .collect();
207 let parent_ids: Vec<_> = chunks
208 .iter()
209 .map(|c| c.parent_chunk_id.map(|id| id.to_string()))
210 .collect();
211 let depths: Vec<_> = chunks.iter().map(|c| c.depth).collect();
212
213 let embeddings: Vec<Option<Vec<Option<f32>>>> = chunks
215 .iter()
216 .map(|c| {
217 c.embedding
218 .as_ref()
219 .map(|e| e.iter().map(|&v| Some(v)).collect())
220 })
221 .collect();
222
223 let embedding_models: Vec<_> = chunks
224 .iter()
225 .map(|c| c.metadata.embedding_model.clone())
226 .collect();
227 let indexed_ats: Vec<_> = chunks
228 .iter()
229 .map(|c| {
230 c.metadata
231 .indexed_at
232 .map_or_else(|| Utc::now().to_rfc3339(), |t| t.to_rfc3339())
233 })
234 .collect();
235
236 let languages: Vec<_> = chunks
238 .iter()
239 .map(|c| match &c.content_type {
240 ContentType::Code { language, .. } => Some(language.clone()),
241 _ => None,
242 })
243 .collect();
244 let symbol_types: Vec<_> = chunks
245 .iter()
246 .map(|c| match &c.content_type {
247 ContentType::Code { symbol, .. } => {
248 symbol.as_ref().map(|s| format!("{:?}", s.kind))
249 }
250 _ => None,
251 })
252 .collect();
253 let symbol_names: Vec<_> = chunks
254 .iter()
255 .map(|c| match &c.content_type {
256 ContentType::Code { symbol, .. } => symbol.as_ref().map(|s| s.name.clone()),
257 _ => None,
258 })
259 .collect();
260
261 let mime_types: Vec<Option<String>> = chunks.iter().map(|c| c.mime_type.clone()).collect();
262
263 let schema = Arc::new(self.chunks_schema());
265
266 let vector_array = build_vector_array(&embeddings, self.embedding_dim)?;
267
268 let batch = RecordBatch::try_new(
269 schema,
270 vec![
271 Arc::new(StringArray::from(chunk_ids)),
272 Arc::new(StringArray::from(file_ids)),
273 Arc::new(StringArray::from(file_paths)),
274 Arc::new(StringArray::from(contents)),
275 Arc::new(StringArray::from(content_types)),
276 Arc::new(UInt32Array::from(chunk_indices)),
277 Arc::new(UInt64Array::from(start_bytes)),
278 Arc::new(UInt64Array::from(end_bytes)),
279 Arc::new(UInt32Array::from(start_lines)),
280 Arc::new(UInt32Array::from(end_lines)),
281 Arc::new(StringArray::from(parent_ids)),
282 Arc::new(UInt8Array::from(depths)),
283 vector_array,
284 Arc::new(StringArray::from(embedding_models)),
285 Arc::new(StringArray::from(indexed_ats)),
286 Arc::new(StringArray::from(mime_types.clone())),
287 Arc::new(StringArray::from(languages)),
288 Arc::new(StringArray::from(symbol_types)),
289 Arc::new(StringArray::from(symbol_names)),
290 ],
291 )
292 .map_err(|e| StoreError::Insert(format!("Failed to create RecordBatch: {e}")))?;
293
294 Ok(batch)
295 }
296
    /// Convert a single `FileRecord` into a one-row Arrow `RecordBatch`
    /// matching `files_schema`.
    ///
    /// Columns are pushed positionally in the schema's field order — keep
    /// the two in sync. Timestamps are serialized as RFC 3339 text.
    fn file_to_batch(&self, record: &FileRecord) -> Result<RecordBatch, StoreError> {
        let schema = Arc::new(self.files_schema());

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![record.id.to_string()])),
                Arc::new(StringArray::from(vec![
                    record.path.to_string_lossy().to_string(),
                ])),
                Arc::new(UInt64Array::from(vec![record.size_bytes])),
                Arc::new(StringArray::from(vec![record.mime_type.clone()])),
                Arc::new(StringArray::from(vec![record.content_hash.clone()])),
                Arc::new(StringArray::from(vec![record.modified_at.to_rfc3339()])),
                // Nullable: None when the file has not been indexed yet.
                Arc::new(StringArray::from(vec![
                    record.indexed_at.map(|t| t.to_rfc3339()),
                ])),
                Arc::new(UInt32Array::from(vec![record.chunk_count])),
                Arc::new(StringArray::from(vec![status_to_string(&record.status)])),
                Arc::new(StringArray::from(vec![record.error_message.clone()])),
            ],
        )
        .map_err(|e| StoreError::Insert(format!("Failed to create file RecordBatch: {e}")))?;

        Ok(batch)
    }
324}
325
326#[async_trait]
327impl VectorStore for LanceStore {
328 async fn init(&self) -> Result<(), StoreError> {
329 info!("Initializing LanceDB at {:?}", self.db_path);
330
331 if let Some(parent) = self.db_path.parent() {
333 tokio::fs::create_dir_all(parent)
334 .await
335 .map_err(|e| StoreError::Init(format!("Failed to create db directory: {e}")))?;
336 }
337
338 let conn = self.get_connection().await?;
339
340 let tables = conn
342 .table_names()
343 .execute()
344 .await
345 .map_err(|e| StoreError::Init(format!("Failed to list tables: {e}")))?;
346
347 if !tables.contains(&CHUNKS_TABLE.to_string()) {
349 info!("Creating chunks table");
350 let schema = Arc::new(self.chunks_schema());
351 conn.create_empty_table(CHUNKS_TABLE, schema)
352 .execute()
353 .await
354 .map_err(|e| StoreError::Init(format!("Failed to create chunks table: {e}")))?;
355
356 info!("Creating FTS index on content column");
358 let table = conn
359 .open_table(CHUNKS_TABLE)
360 .execute()
361 .await
362 .map_err(|e| StoreError::Init(format!("Failed to open chunks table: {e}")))?;
363
364 if let Err(e) = table
365 .create_index(&["content"], Index::FTS(FtsIndexBuilder::default()))
366 .execute()
367 .await
368 {
369 warn!("Failed to create FTS index (may already exist): {e}");
370 }
371 }
372
373 if !tables.contains(&FILES_TABLE.to_string()) {
375 info!("Creating files table");
376 let schema = Arc::new(self.files_schema());
377 conn.create_empty_table(FILES_TABLE, schema)
378 .execute()
379 .await
380 .map_err(|e| StoreError::Init(format!("Failed to create files table: {e}")))?;
381 }
382
383 info!("LanceDB initialized successfully");
384 Ok(())
385 }
386
387 async fn upsert_chunks(&self, chunks: &[Chunk]) -> Result<(), StoreError> {
388 if chunks.is_empty() {
389 return Ok(());
390 }
391
392 debug!("Upserting {} chunks", chunks.len());
393
394 let table = self.get_chunks_table().await?;
395 let batch = self.chunks_to_batch(chunks)?;
396 let schema = batch.schema();
397
398 let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);
399
400 table
401 .add(Box::new(batches))
402 .execute()
403 .await
404 .map_err(|e| StoreError::Insert(format!("Failed to insert chunks: {e}")))?;
405
406 debug!("Successfully upserted {} chunks", chunks.len());
407 Ok(())
408 }
409
410 async fn search(&self, query: SearchQuery) -> Result<Vec<SearchResult>, StoreError> {
411 debug!("Searching with limit {}", query.limit);
412
413 let table = self.get_chunks_table().await?;
414
415 let mut results = table
416 .vector_search(query.embedding.clone())
417 .map_err(|e| StoreError::Query(format!("Failed to create search query: {e}")))?
418 .limit(query.limit)
419 .execute()
420 .await
421 .map_err(|e| StoreError::Query(format!("Failed to execute search: {e}")))?;
422
423 let mut search_results = Vec::new();
424
425 while let Some(batch) = results
426 .try_next()
427 .await
428 .map_err(|e| StoreError::Query(format!("Failed to fetch results: {e}")))?
429 {
430 search_results.extend(batch_to_search_results(&batch)?);
431 }
432
433 debug!("Found {} results", search_results.len());
434 Ok(search_results)
435 }
436
437 async fn hybrid_search(&self, query: SearchQuery) -> Result<Vec<SearchResult>, StoreError> {
438 let query_text = match &query.text {
440 Some(text) if !text.is_empty() => text.clone(),
441 _ => return self.search(query).await,
442 };
443
444 debug!(
445 "Performing hybrid search with text: '{}' and limit {}",
446 query_text, query.limit
447 );
448
449 let table = self.get_chunks_table().await?;
450
451 let fts_query = FullTextSearchQuery::new(query_text);
453
454 let mut results = table
455 .query()
456 .full_text_search(fts_query)
457 .nearest_to(query.embedding.clone())
458 .map_err(|e| StoreError::Query(format!("Failed to create hybrid query: {e}")))?
459 .limit(query.limit)
460 .execute_hybrid(QueryExecutionOptions::default())
461 .await
462 .map_err(|e| StoreError::Query(format!("Failed to execute hybrid search: {e}")))?;
463
464 let mut search_results = Vec::new();
465
466 while let Some(batch) = results
467 .try_next()
468 .await
469 .map_err(|e| StoreError::Query(format!("Failed to fetch hybrid results: {e}")))?
470 {
471 search_results.extend(batch_to_search_results(&batch)?);
472 }
473
474 debug!("Hybrid search found {} results", search_results.len());
475 Ok(search_results)
476 }
477
478 async fn delete_by_file_path(&self, path: &Path) -> Result<u64, StoreError> {
479 let path_str = path.to_string_lossy().to_string();
480 debug!("Deleting chunks for file: {}", path_str);
481
482 let table = self.get_chunks_table().await?;
483
484 table
485 .delete(&format!("file_path = '{}'", path_str.replace('\'', "''")))
486 .await
487 .map_err(|e| StoreError::Delete(format!("Failed to delete chunks: {e}")))?;
488
489 let files_table = self.get_files_table().await?;
491 files_table
492 .delete(&format!("path = '{}'", path_str.replace('\'', "''")))
493 .await
494 .map_err(|e| StoreError::Delete(format!("Failed to delete file record: {e}")))?;
495
496 Ok(1) }
498
499 async fn update_file_path(&self, from: &Path, to: &Path) -> Result<u64, StoreError> {
500 debug!("Updating file path from {:?} to {:?}", from, to);
502
503 let mut chunks = self.get_chunks_for_file(from).await?;
505 if chunks.is_empty() {
506 debug!("No chunks found for path {:?}", from);
507 return Ok(0);
508 }
509
510 let chunk_count = chunks.len() as u64;
511
512 for chunk in &mut chunks {
514 chunk.file_path = to.to_path_buf();
515 }
516
517 self.delete_by_file_path(from).await?;
519
520 self.upsert_chunks(&chunks).await?;
522
523 if let Ok(Some(mut file_record)) = self.get_file(from).await {
525 file_record.path = to.to_path_buf();
526 self.upsert_file(&file_record).await?;
527 }
528
529 info!("Updated {} chunks from {:?} to {:?}", chunk_count, from, to);
530 Ok(chunk_count)
531 }
532
533 async fn get_chunks_for_file(&self, path: &Path) -> Result<Vec<Chunk>, StoreError> {
534 let path_str = path.to_string_lossy().to_string();
535 debug!("Getting chunks for file: {}", path_str);
536
537 let table = self.get_chunks_table().await?;
538
539 let mut results = table
540 .query()
541 .only_if(format!("file_path = '{}'", path_str.replace('\'', "''")))
542 .execute()
543 .await
544 .map_err(|e| StoreError::Query(format!("Failed to query chunks: {e}")))?;
545
546 let mut chunks = Vec::new();
547
548 while let Some(batch) = results
549 .try_next()
550 .await
551 .map_err(|e| StoreError::Query(format!("Failed to fetch chunks: {e}")))?
552 {
553 chunks.extend(batch_to_chunks(&batch)?);
554 }
555
556 Ok(chunks)
557 }
558
559 async fn get_file(&self, path: &Path) -> Result<Option<FileRecord>, StoreError> {
560 let path_str = path.to_string_lossy().to_string();
561 debug!("Getting file record: {}", path_str);
562
563 let table = self.get_files_table().await?;
564
565 let mut results = table
566 .query()
567 .only_if(format!("path = '{}'", path_str.replace('\'', "''")))
568 .limit(1)
569 .execute()
570 .await
571 .map_err(|e| StoreError::Query(format!("Failed to query file: {e}")))?;
572
573 if let Some(batch) = results
574 .try_next()
575 .await
576 .map_err(|e| StoreError::Query(format!("Failed to fetch file: {e}")))?
577 {
578 let records = batch_to_file_records(&batch)?;
579 return Ok(records.into_iter().next());
580 }
581
582 Ok(None)
583 }
584
    /// Insert or replace the file record keyed on `record.path`.
    async fn upsert_file(&self, record: &FileRecord) -> Result<(), StoreError> {
        debug!("Upserting file record: {:?}", record.path);

        let path_str = record.path.to_string_lossy().to_string();

        // Delete-then-insert emulates an upsert keyed on `path`. The delete
        // result is deliberately ignored: it is a no-op / harmless failure
        // when no row for this path exists yet.
        let files_table = self.get_files_table().await?;
        let _ = files_table
            .delete(&format!("path = '{}'", path_str.replace('\'', "''")))
            .await;

        let batch = self.file_to_batch(record)?;
        let schema = batch.schema();

        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);

        files_table
            .add(Box::new(batches))
            .execute()
            .await
            .map_err(|e| StoreError::Insert(format!("Failed to insert file record: {e}")))?;

        Ok(())
    }
610
611 async fn stats(&self) -> Result<StoreStats, StoreError> {
612 let chunks_table = self.get_chunks_table().await?;
613 let files_table = self.get_files_table().await?;
614
615 let mut chunk_count = 0u64;
617 let mut results = chunks_table
618 .query()
619 .only_if("file_path LIKE '%'")
620 .execute()
621 .await
622 .map_err(|e| StoreError::Query(format!("Failed to query chunks: {e}")))?;
623
624 while let Some(batch) = results
625 .try_next()
626 .await
627 .map_err(|e| StoreError::Query(format!("Failed to count chunks: {e}")))?
628 {
629 chunk_count += batch.num_rows() as u64;
630 }
631
632 let mut file_count = 0u64;
634 let mut results = files_table
635 .query()
636 .only_if("size_bytes >= 0")
637 .execute()
638 .await
639 .map_err(|e| StoreError::Query(format!("Failed to query files: {e}")))?;
640
641 while let Some(batch) = results
642 .try_next()
643 .await
644 .map_err(|e| StoreError::Query(format!("Failed to count files: {e}")))?
645 {
646 file_count += batch.num_rows() as u64;
647 }
648
649 let index_size_bytes = calculate_dir_size(&self.db_path);
651
652 Ok(StoreStats {
653 total_chunks: chunk_count,
654 total_files: file_count,
655 index_size_bytes,
656 last_updated: Some(Utc::now()),
657 })
658 }
659
660 async fn get_all_chunks(&self) -> Result<Vec<Chunk>, StoreError> {
661 debug!("Getting all chunks");
662
663 let table = self.get_chunks_table().await?;
664
665 let mut results = table
666 .query()
667 .only_if("file_path LIKE '%'")
668 .execute()
669 .await
670 .map_err(|e| StoreError::Query(format!("Failed to query all chunks: {e}")))?;
671
672 let mut chunks = Vec::new();
673
674 while let Some(batch) = results
675 .try_next()
676 .await
677 .map_err(|e| StoreError::Query(format!("Failed to fetch chunks: {e}")))?
678 {
679 chunks.extend(batch_to_chunks(&batch)?);
680 }
681
682 debug!("Retrieved {} chunks", chunks.len());
683 Ok(chunks)
684 }
685
686 async fn get_all_files(&self) -> Result<Vec<FileRecord>, StoreError> {
687 debug!("Getting all file records");
688
689 let table = self.get_files_table().await?;
690
691 let mut results = table
692 .query()
693 .only_if("size_bytes >= 0")
694 .execute()
695 .await
696 .map_err(|e| StoreError::Query(format!("Failed to query all files: {e}")))?;
697
698 let mut records = Vec::new();
699
700 while let Some(batch) = results
701 .try_next()
702 .await
703 .map_err(|e| StoreError::Query(format!("Failed to fetch files: {e}")))?
704 {
705 records.extend(batch_to_file_records(&batch)?);
706 }
707
708 debug!("Retrieved {} file records", records.len());
709 Ok(records)
710 }
711}
712
/// Recursively sum the sizes (in bytes) of all regular files under `path`.
///
/// Unreadable directories/entries are skipped; a missing path yields 0.
fn calculate_dir_size(path: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(path) else {
        // Missing path or unreadable directory: contribute nothing.
        return 0;
    };

    let mut size = 0u64;
    for entry in entries.flatten() {
        let child = entry.path();
        if child.is_dir() {
            size += calculate_dir_size(&child);
        } else if child.is_file() {
            size += entry.metadata().map_or(0, |m| m.len());
        }
    }
    size
}
740
741fn content_type_to_string(ct: &ContentType) -> String {
742 match ct {
743 ContentType::Text => "text".to_string(),
744 ContentType::Code { language, .. } => format!("code:{language}"),
745 ContentType::ImageCaption => "image_caption".to_string(),
746 ContentType::PdfPage { page_num } => format!("pdf:{page_num}"),
747 ContentType::Markdown => "markdown".to_string(),
748 }
749}
750
751fn string_to_content_type(s: &str) -> ContentType {
752 if s == "text" {
753 ContentType::Text
754 } else if s == "markdown" {
755 ContentType::Markdown
756 } else if s == "image_caption" {
757 ContentType::ImageCaption
758 } else if let Some(lang) = s.strip_prefix("code:") {
759 ContentType::Code {
760 language: lang.to_string(),
761 symbol: None,
762 }
763 } else if let Some(page) = s.strip_prefix("pdf:") {
764 ContentType::PdfPage {
765 page_num: page.parse().unwrap_or(1),
766 }
767 } else {
768 ContentType::Text
769 }
770}
771
772fn status_to_string(status: &FileStatus) -> String {
773 match status {
774 FileStatus::Pending => "pending".to_string(),
775 FileStatus::Indexing => "indexing".to_string(),
776 FileStatus::Indexed => "indexed".to_string(),
777 FileStatus::Error => "error".to_string(),
778 FileStatus::Deleted => "deleted".to_string(),
779 }
780}
781
782fn string_to_status(s: &str) -> FileStatus {
783 match s {
784 "pending" => FileStatus::Pending,
785 "indexing" => FileStatus::Indexing,
786 "indexed" => FileStatus::Indexed,
787 "error" => FileStatus::Error,
788 "deleted" => FileStatus::Deleted,
789 _ => FileStatus::Pending,
790 }
791}
792
793fn build_vector_array(
794 embeddings: &[Option<Vec<Option<f32>>>],
795 dim: usize,
796) -> Result<ArrayRef, StoreError> {
797 use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};
798
799 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dim as i32);
800
801 for emb in embeddings {
802 if let Some(values) = emb {
803 let values_builder = builder.values();
804 for &v in values {
805 values_builder.append_option(v);
806 }
807 builder.append(true);
808 } else {
809 let values_builder = builder.values();
811 for _ in 0..dim {
812 values_builder.append_value(0.0);
813 }
814 builder.append(true);
815 }
816 }
817
818 Ok(Arc::new(builder.finish()))
819}
820
/// Convert one LanceDB result batch into `SearchResult`s.
///
/// Requires the `chunk_id`, `file_path`, `content`, `start_byte` and
/// `end_byte` columns; `start_line`/`end_line` and the search engine's
/// `_distance` column are optional. Score is computed as `1 - distance`
/// (0.0 when no distance column is present) — NOTE(review): this assumes a
/// distance metric bounded so that smaller is better; for unnormalized L2
/// the score can go negative. Confirm against the configured metric.
fn batch_to_search_results(batch: &RecordBatch) -> Result<Vec<SearchResult>, StoreError> {
    let mut results = Vec::new();

    let chunk_ids = batch
        .column_by_name("chunk_id")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
    let file_paths = batch
        .column_by_name("file_path")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
    let contents = batch
        .column_by_name("content")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
    let start_bytes = batch
        .column_by_name("start_byte")
        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
    let end_bytes = batch
        .column_by_name("end_byte")
        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
    let start_lines = batch
        .column_by_name("start_line")
        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
    let end_lines = batch
        .column_by_name("end_line")
        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
    // "_distance" is added by LanceDB for vector queries; absent for
    // pure FTS results.
    let distances = batch
        .column_by_name("_distance")
        .and_then(|c| c.as_any().downcast_ref::<Float32Array>());

    let (Some(chunk_ids), Some(file_paths), Some(contents), Some(start_bytes), Some(end_bytes)) =
        (chunk_ids, file_paths, contents, start_bytes, end_bytes)
    else {
        return Err(StoreError::Query("Missing required columns".to_string()));
    };

    for i in 0..batch.num_rows() {
        let chunk_id = chunk_ids.value(i);
        let file_path = file_paths.value(i);
        let content = contents.value(i);
        let start = start_bytes.value(i);
        let end = end_bytes.value(i);

        // Line range only when both endpoints are present and non-null.
        let line_range = match (start_lines, end_lines) {
            (Some(sl), Some(el)) if !sl.is_null(i) && !el.is_null(i) => {
                Some(sl.value(i)..el.value(i))
            }
            _ => None,
        };

        let score = distances.map_or(0.0, |d| 1.0 - d.value(i));

        results.push(SearchResult {
            // Malformed ids degrade to the nil UUID rather than erroring.
            chunk_id: Uuid::parse_str(chunk_id).unwrap_or_default(),
            file_path: PathBuf::from(file_path),
            content: content.to_string(),
            score,
            byte_range: start..end,
            line_range,
            metadata: HashMap::new(),
        });
    }

    Ok(results)
}
884
885fn batch_to_chunks(batch: &RecordBatch) -> Result<Vec<Chunk>, StoreError> {
886 let mut chunks = Vec::new();
887
888 let chunk_ids = batch
892 .column_by_name("chunk_id")
893 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
894 let file_ids = batch
895 .column_by_name("file_id")
896 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
897 let file_paths = batch
898 .column_by_name("file_path")
899 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
900 let contents = batch
901 .column_by_name("content")
902 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
903 let content_types = batch
904 .column_by_name("content_type")
905 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
906 let chunk_indices = batch
907 .column_by_name("chunk_index")
908 .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
909 let start_bytes = batch
910 .column_by_name("start_byte")
911 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
912 let end_bytes = batch
913 .column_by_name("end_byte")
914 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
915 let depths = batch
916 .column_by_name("depth")
917 .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
918 let mime_types = batch
919 .column_by_name("mime_type")
920 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
921 let start_lines = batch
922 .column_by_name("start_line")
923 .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
924 let end_lines = batch
925 .column_by_name("end_line")
926 .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
927 let embeddings = batch
928 .column_by_name("embedding")
929 .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>());
930
931 let (
932 Some(chunk_ids),
933 Some(file_ids),
934 Some(file_paths),
935 Some(contents),
936 Some(content_types),
937 Some(chunk_indices),
938 Some(start_bytes),
939 Some(end_bytes),
940 Some(depths),
941 ) = (
942 chunk_ids,
943 file_ids,
944 file_paths,
945 contents,
946 content_types,
947 chunk_indices,
948 start_bytes,
949 end_bytes,
950 depths,
951 )
952 else {
953 return Err(StoreError::Query(
954 "Missing required columns in chunks".to_string(),
955 ));
956 };
957
958 for i in 0..batch.num_rows() {
959 let mime_type = mime_types.and_then(|m| {
960 if m.is_null(i) {
961 None
962 } else {
963 Some(m.value(i).to_string())
964 }
965 });
966
967 let line_range = match (start_lines, end_lines) {
969 (Some(starts), Some(ends)) if !starts.is_null(i) && !ends.is_null(i) => {
970 Some(starts.value(i)..ends.value(i))
971 }
972 _ => None,
973 };
974
975 let embedding = embeddings.and_then(|emb_array| {
977 if emb_array.is_null(i) {
978 None
979 } else {
980 let values = emb_array.value(i);
981 values
982 .as_any()
983 .downcast_ref::<Float32Array>()
984 .map(|arr| arr.values().to_vec())
985 }
986 });
987
988 chunks.push(Chunk {
989 id: Uuid::parse_str(chunk_ids.value(i)).unwrap_or_default(),
990 file_id: Uuid::parse_str(file_ids.value(i)).unwrap_or_default(),
991 file_path: PathBuf::from(file_paths.value(i)),
992 content: contents.value(i).to_string(),
993 content_type: string_to_content_type(content_types.value(i)),
994 mime_type,
995 chunk_index: chunk_indices.value(i),
996 byte_range: start_bytes.value(i)..end_bytes.value(i),
997 line_range,
998 parent_chunk_id: None,
999 depth: depths.value(i),
1000 embedding,
1001 metadata: ChunkMetadata::default(),
1002 });
1003 }
1004
1005 Ok(chunks)
1006}
1007
1008fn batch_to_file_records(batch: &RecordBatch) -> Result<Vec<FileRecord>, StoreError> {
1009 let mut records = Vec::new();
1010
1011 let file_ids = batch
1012 .column_by_name("file_id")
1013 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1014 let paths = batch
1015 .column_by_name("path")
1016 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1017 let sizes = batch
1018 .column_by_name("size_bytes")
1019 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
1020 let mime_types = batch
1021 .column_by_name("mime_type")
1022 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1023 let hashes = batch
1024 .column_by_name("content_hash")
1025 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1026 let modified_ats = batch
1027 .column_by_name("modified_at")
1028 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1029 let chunk_counts = batch
1030 .column_by_name("chunk_count")
1031 .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
1032 let statuses = batch
1033 .column_by_name("status")
1034 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1035 let indexed_ats = batch
1036 .column_by_name("indexed_at")
1037 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1038
1039 let (
1040 Some(file_ids),
1041 Some(paths),
1042 Some(sizes),
1043 Some(mime_types),
1044 Some(hashes),
1045 Some(modified_ats),
1046 Some(chunk_counts),
1047 Some(statuses),
1048 ) = (
1049 file_ids,
1050 paths,
1051 sizes,
1052 mime_types,
1053 hashes,
1054 modified_ats,
1055 chunk_counts,
1056 statuses,
1057 )
1058 else {
1059 return Err(StoreError::Query(
1060 "Missing required columns in files".to_string(),
1061 ));
1062 };
1063
1064 for i in 0..batch.num_rows() {
1065 let modified_at = chrono::DateTime::parse_from_rfc3339(modified_ats.value(i))
1066 .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc));
1067
1068 let indexed_at = indexed_ats.and_then(|arr| {
1070 if arr.is_null(i) {
1071 None
1072 } else {
1073 chrono::DateTime::parse_from_rfc3339(arr.value(i))
1074 .map(|dt| dt.with_timezone(&Utc))
1075 .ok()
1076 }
1077 });
1078
1079 records.push(FileRecord {
1080 id: Uuid::parse_str(file_ids.value(i)).unwrap_or_default(),
1081 path: PathBuf::from(paths.value(i)),
1082 size_bytes: sizes.value(i),
1083 mime_type: mime_types.value(i).to_string(),
1084 content_hash: hashes.value(i).to_string(),
1085 modified_at,
1086 indexed_at,
1087 chunk_count: chunk_counts.value(i),
1088 status: string_to_status(statuses.value(i)),
1089 error_message: None,
1090 });
1091 }
1092
1093 Ok(records)
1094}
1095
#[cfg(test)]
mod tests {
    use super::*;
    use ragfs_core::DistanceMetric;
    use tempfile::tempdir;

    /// Embedding dimensionality shared by every store built in these tests.
    const TEST_DIM: usize = 384;

    /// Creates a fresh, initialized store backed by a temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store: dropping it
    /// removes the directory, so callers must keep it alive for the test's
    /// duration (bind it as `_temp`, not `_`).
    async fn new_store() -> (tempfile::TempDir, LanceStore) {
        let temp = tempdir().unwrap();
        let store = LanceStore::new(temp.path().join("test.lance"), TEST_DIM);
        store.init().await.unwrap();
        (temp, store)
    }

    /// Builds a single text `Chunk` covering all of `content`, with fresh
    /// random ids and the supplied embedding.
    fn create_test_chunk(
        file_path: &Path,
        content: &str,
        embedding: Vec<f32>,
        chunk_index: u32,
    ) -> Chunk {
        Chunk {
            id: Uuid::new_v4(),
            file_id: Uuid::new_v4(),
            file_path: file_path.to_path_buf(),
            content: content.to_string(),
            content_type: ContentType::Text,
            mime_type: Some("text/plain".to_string()),
            chunk_index,
            byte_range: 0..content.len() as u64,
            line_range: Some(0..1),
            parent_chunk_id: None,
            depth: 0,
            embedding: Some(embedding),
            metadata: ChunkMetadata {
                indexed_at: Some(Utc::now()),
                embedding_model: Some("test-model".to_string()),
                token_count: None,
                extra: HashMap::new(),
            },
        }
    }

    /// Deterministic pseudo-embedding (a sine sweep over the indices).
    /// Intentionally NOT random: identical calls yield identical vectors,
    /// which keeps vector-search assertions reproducible across runs.
    fn create_test_embedding(dim: usize) -> Vec<f32> {
        (0..dim).map(|i| (i as f32 * 0.001).sin()).collect()
    }

    /// Builds a minimal `FileRecord` in the `Indexed` state for `path`.
    fn create_test_file_record(path: &Path) -> FileRecord {
        FileRecord {
            id: Uuid::new_v4(),
            path: path.to_path_buf(),
            size_bytes: 1024,
            mime_type: "text/plain".to_string(),
            content_hash: "abc123".to_string(),
            modified_at: Utc::now(),
            indexed_at: Some(Utc::now()),
            chunk_count: 1,
            status: FileStatus::Indexed,
            error_message: None,
        }
    }

    #[tokio::test]
    async fn test_init_creates_tables() {
        // Built manually (not via new_store) so the init result itself
        // can be asserted.
        let temp = tempdir().unwrap();
        let store = LanceStore::new(temp.path().join("test.lance"), TEST_DIM);

        let result = store.init().await;
        assert!(result.is_ok(), "Init failed: {:?}", result.err());

        let conn = store.get_connection().await.unwrap();
        let tables = conn.table_names().execute().await.unwrap();
        assert!(tables.contains(&CHUNKS_TABLE.to_string()));
        assert!(tables.contains(&FILES_TABLE.to_string()));
    }

    #[tokio::test]
    async fn test_init_idempotent() {
        let (_temp, store) = new_store().await;
        // Initializing an already-initialized store must succeed.
        assert!(store.init().await.is_ok());
    }

    #[tokio::test]
    async fn test_upsert_and_get_chunks() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/file.txt");
        let chunk =
            create_test_chunk(&file_path, "Hello world", create_test_embedding(TEST_DIM), 0);

        let result = store.upsert_chunks(&[chunk]).await;
        assert!(result.is_ok(), "Upsert failed: {:?}", result.err());

        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Hello world");
    }

    #[tokio::test]
    async fn test_upsert_multiple_chunks() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/multi.txt");
        let chunks: Vec<Chunk> = (0..5)
            .map(|i| {
                create_test_chunk(
                    &file_path,
                    &format!("Chunk content {i}"),
                    create_test_embedding(TEST_DIM),
                    i,
                )
            })
            .collect();

        store.upsert_chunks(&chunks).await.unwrap();

        let retrieved = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(retrieved.len(), 5);
    }

    #[tokio::test]
    async fn test_upsert_empty_chunks() {
        let (_temp, store) = new_store().await;
        // An empty batch is valid input and must succeed.
        assert!(store.upsert_chunks(&[]).await.is_ok());
    }

    #[tokio::test]
    async fn test_search_returns_results() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/search.txt");
        let embedding = create_test_embedding(TEST_DIM);
        let chunk = create_test_chunk(&file_path, "Authentication logic", embedding.clone(), 0);
        store.upsert_chunks(&[chunk]).await.unwrap();

        // Query with the exact stored vector: the chunk must rank first.
        let query = SearchQuery {
            text: Some("auth".to_string()),
            embedding,
            limit: 10,
            filters: vec![],
            metric: DistanceMetric::Cosine,
        };

        let results = store.search(query).await.unwrap();
        assert!(!results.is_empty());
        assert_eq!(results[0].content, "Authentication logic");
    }

    #[tokio::test]
    async fn test_search_respects_limit() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/limit.txt");
        let chunks: Vec<Chunk> = (0..10)
            .map(|i| {
                create_test_chunk(
                    &file_path,
                    &format!("Content {i}"),
                    create_test_embedding(TEST_DIM),
                    i,
                )
            })
            .collect();
        store.upsert_chunks(&chunks).await.unwrap();

        // 10 chunks stored, but the query caps results at 3.
        let query = SearchQuery {
            text: Some("test".to_string()),
            embedding: create_test_embedding(TEST_DIM),
            limit: 3,
            filters: vec![],
            metric: DistanceMetric::Cosine,
        };

        let results = store.search(query).await.unwrap();
        assert!(results.len() <= 3);
    }

    #[tokio::test]
    async fn test_delete_by_file_path() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/delete.txt");
        let chunk =
            create_test_chunk(&file_path, "To be deleted", create_test_embedding(TEST_DIM), 0);
        store.upsert_chunks(&[chunk]).await.unwrap();

        // Present before deletion …
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 1);

        store.delete_by_file_path(&file_path).await.unwrap();

        // … and gone afterwards.
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 0);
    }

    #[tokio::test]
    async fn test_upsert_and_get_file_record() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/record.txt");
        let record = create_test_file_record(&file_path);
        store.upsert_file(&record).await.unwrap();

        let retrieved = store
            .get_file(&file_path)
            .await
            .unwrap()
            .expect("file record should be retrievable after upsert");
        assert_eq!(retrieved.path, file_path);
        assert_eq!(retrieved.mime_type, "text/plain");
    }

    #[tokio::test]
    async fn test_get_nonexistent_file() {
        let (_temp, store) = new_store().await;

        let result = store
            .get_file(&PathBuf::from("/nonexistent"))
            .await
            .unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_stats() {
        let (_temp, store) = new_store().await;

        // A freshly initialized store reports empty stats.
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
        assert_eq!(stats.total_files, 0);

        let file_path = PathBuf::from("/test/stats.txt");
        let chunk =
            create_test_chunk(&file_path, "Stats test", create_test_embedding(TEST_DIM), 0);
        store.upsert_chunks(&[chunk]).await.unwrap();

        let record = create_test_file_record(&file_path);
        store.upsert_file(&record).await.unwrap();

        // upsert_file must not disturb previously written chunks.
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(
            chunks.len(),
            1,
            "Chunks should still exist after upsert_file"
        );

        let file = store.get_file(&file_path).await.unwrap();
        assert!(file.is_some(), "File should be retrievable");

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_chunks, 1);
        assert_eq!(stats.total_files, 1);
        assert!(
            stats.index_size_bytes > 0,
            "index_size_bytes should be > 0, got {}",
            stats.index_size_bytes
        );
    }

    // The conversion helpers below are pure functions: no async work, so
    // plain #[test] avoids spinning up a tokio runtime per test.
    #[test]
    fn test_content_type_conversion() {
        assert_eq!(content_type_to_string(&ContentType::Text), "text");
        assert_eq!(content_type_to_string(&ContentType::Markdown), "markdown");
        assert_eq!(
            content_type_to_string(&ContentType::Code {
                language: "rust".to_string(),
                symbol: None
            }),
            "code:rust"
        );
        assert_eq!(
            content_type_to_string(&ContentType::PdfPage { page_num: 5 }),
            "pdf:5"
        );

        assert!(matches!(string_to_content_type("text"), ContentType::Text));
        assert!(matches!(
            string_to_content_type("markdown"),
            ContentType::Markdown
        ));
        assert!(matches!(
            string_to_content_type("code:python"),
            ContentType::Code { language, .. } if language == "python"
        ));
    }

    #[test]
    fn test_file_status_conversion() {
        assert_eq!(status_to_string(&FileStatus::Pending), "pending");
        assert_eq!(status_to_string(&FileStatus::Indexed), "indexed");
        assert_eq!(status_to_string(&FileStatus::Error), "error");

        assert!(matches!(string_to_status("pending"), FileStatus::Pending));
        assert!(matches!(string_to_status("indexed"), FileStatus::Indexed));
        assert!(matches!(string_to_status("error"), FileStatus::Error));
        // Unrecognized strings fall back to Pending.
        assert!(matches!(string_to_status("unknown"), FileStatus::Pending));
    }

    #[tokio::test]
    async fn test_chunks_with_code_content_type() {
        let (_temp, store) = new_store().await;

        let file_path = PathBuf::from("/test/code.rs");
        // Built by hand (not via create_test_chunk) to exercise the
        // Code content type's round-trip through the store.
        let chunk = Chunk {
            id: Uuid::new_v4(),
            file_id: Uuid::new_v4(),
            file_path: file_path.clone(),
            content: "fn main() {}".to_string(),
            content_type: ContentType::Code {
                language: "rust".to_string(),
                symbol: None,
            },
            mime_type: Some("text/x-rust".to_string()),
            chunk_index: 0,
            byte_range: 0..12,
            line_range: Some(0..1),
            parent_chunk_id: None,
            depth: 0,
            embedding: Some(create_test_embedding(TEST_DIM)),
            metadata: ChunkMetadata::default(),
        };

        store.upsert_chunks(&[chunk]).await.unwrap();

        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert!(matches!(
            &chunks[0].content_type,
            ContentType::Code { language, .. } if language == "rust"
        ));
    }

    #[tokio::test]
    async fn test_get_all_files_empty() {
        let (_temp, store) = new_store().await;

        let files = store.get_all_files().await.unwrap();
        assert!(files.is_empty());
    }

    #[tokio::test]
    async fn test_delete_nonexistent_file() {
        let (_temp, store) = new_store().await;

        // Deleting a path that has no chunks is not an error.
        let path = PathBuf::from("/nonexistent/file.txt");
        assert!(store.delete_by_file_path(&path).await.is_ok());
    }

    #[tokio::test]
    async fn test_get_file_not_found() {
        let (_temp, store) = new_store().await;

        let path = PathBuf::from("/nonexistent/file.txt");
        let result = store.get_file(&path).await.unwrap();
        assert!(result.is_none());
    }
}