ragfs_store/
lancedb.rs

1//! `LanceDB` implementation of `VectorStore`.
2
3use arrow_array::{
4    Array, ArrayRef, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator,
5    StringArray, UInt8Array, UInt32Array, UInt64Array,
6};
7use arrow_schema::{DataType, Field, Schema};
8use async_trait::async_trait;
9use chrono::Utc;
10use futures::TryStreamExt;
11use lancedb::index::Index;
12use lancedb::index::scalar::{FtsIndexBuilder, FullTextSearchQuery};
13use lancedb::query::{ExecutableQuery, QueryBase, QueryExecutionOptions};
14use lancedb::{Connection, Table, connect};
15use ragfs_core::{
16    Chunk, ChunkMetadata, ContentType, FileRecord, FileStatus, SearchQuery, SearchResult,
17    StoreError, StoreStats, VectorStore,
18};
19use std::collections::HashMap;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use tokio::sync::RwLock;
23use tracing::{debug, info, warn};
24use uuid::Uuid;
25
/// Table holding one row per indexed chunk (content + embedding vector).
const CHUNKS_TABLE: &str = "chunks";
/// Table holding one row per indexed file (metadata and indexing status).
const FILES_TABLE: &str = "files";
28
/// LanceDB-based vector store.
///
/// The connection and table handles are created lazily on first use and
/// cached behind `RwLock`s, so a single `LanceStore` can be shared across
/// async tasks.
pub struct LanceStore {
    /// Path to the `LanceDB` database
    db_path: PathBuf,
    /// Embedding dimension (width of the fixed-size `vector` column)
    embedding_dim: usize,
    /// Database connection (lazy initialized)
    connection: RwLock<Option<Connection>>,
    /// Chunks table handle (lazy opened, cached)
    chunks_table: RwLock<Option<Table>>,
    /// Files table handle (lazy opened, cached)
    files_table: RwLock<Option<Table>>,
}
42
43impl LanceStore {
44    /// Create a new `LanceStore`.
45    #[must_use]
46    pub fn new(db_path: PathBuf, embedding_dim: usize) -> Self {
47        Self {
48            db_path,
49            embedding_dim,
50            connection: RwLock::new(None),
51            chunks_table: RwLock::new(None),
52            files_table: RwLock::new(None),
53        }
54    }
55
56    /// Get the database path.
57    pub fn db_path(&self) -> &Path {
58        &self.db_path
59    }
60
61    /// Get the embedding dimension.
62    pub fn embedding_dim(&self) -> usize {
63        self.embedding_dim
64    }
65
66    /// Get or create connection.
67    async fn get_connection(&self) -> Result<Connection, StoreError> {
68        {
69            let conn = self.connection.read().await;
70            if let Some(ref c) = *conn {
71                return Ok(c.clone());
72            }
73        }
74
75        let mut conn = self.connection.write().await;
76        if conn.is_none() {
77            let db_path_str = self.db_path.to_string_lossy().to_string();
78            let new_conn = connect(&db_path_str)
79                .execute()
80                .await
81                .map_err(|e| StoreError::Init(format!("Failed to connect to LanceDB: {e}")))?;
82            *conn = Some(new_conn);
83        }
84        Ok(conn.as_ref().unwrap().clone())
85    }
86
87    /// Build chunks table schema.
88    fn chunks_schema(&self) -> Schema {
89        Schema::new(vec![
90            Field::new("chunk_id", DataType::Utf8, false),
91            Field::new("file_id", DataType::Utf8, false),
92            Field::new("file_path", DataType::Utf8, false),
93            Field::new("content", DataType::Utf8, false),
94            Field::new("content_type", DataType::Utf8, false),
95            Field::new("chunk_index", DataType::UInt32, false),
96            Field::new("start_byte", DataType::UInt64, false),
97            Field::new("end_byte", DataType::UInt64, false),
98            Field::new("start_line", DataType::UInt32, true),
99            Field::new("end_line", DataType::UInt32, true),
100            Field::new("parent_chunk_id", DataType::Utf8, true),
101            Field::new("depth", DataType::UInt8, false),
102            Field::new(
103                "vector",
104                DataType::FixedSizeList(
105                    Arc::new(Field::new("item", DataType::Float32, true)),
106                    self.embedding_dim as i32,
107                ),
108                false,
109            ),
110            Field::new("embedding_model", DataType::Utf8, true),
111            Field::new("indexed_at", DataType::Utf8, false),
112            Field::new("file_mime_type", DataType::Utf8, true),
113            Field::new("language", DataType::Utf8, true),
114            Field::new("symbol_type", DataType::Utf8, true),
115            Field::new("symbol_name", DataType::Utf8, true),
116        ])
117    }
118
119    /// Build files table schema.
120    fn files_schema(&self) -> Schema {
121        Schema::new(vec![
122            Field::new("file_id", DataType::Utf8, false),
123            Field::new("path", DataType::Utf8, false),
124            Field::new("size_bytes", DataType::UInt64, false),
125            Field::new("mime_type", DataType::Utf8, false),
126            Field::new("content_hash", DataType::Utf8, false),
127            Field::new("modified_at", DataType::Utf8, false),
128            Field::new("indexed_at", DataType::Utf8, true),
129            Field::new("chunk_count", DataType::UInt32, false),
130            Field::new("status", DataType::Utf8, false),
131            Field::new("error_message", DataType::Utf8, true),
132        ])
133    }
134
135    /// Get or open chunks table.
136    async fn get_chunks_table(&self) -> Result<Table, StoreError> {
137        {
138            let table = self.chunks_table.read().await;
139            if let Some(ref t) = *table {
140                return Ok(t.clone());
141            }
142        }
143
144        let conn = self.get_connection().await?;
145        let mut table_lock = self.chunks_table.write().await;
146
147        if table_lock.is_none() {
148            let t = conn
149                .open_table(CHUNKS_TABLE)
150                .execute()
151                .await
152                .map_err(|e| StoreError::Init(format!("Failed to open chunks table: {e}")))?;
153            *table_lock = Some(t);
154        }
155
156        Ok(table_lock.as_ref().unwrap().clone())
157    }
158
159    /// Get or open files table.
160    async fn get_files_table(&self) -> Result<Table, StoreError> {
161        {
162            let table = self.files_table.read().await;
163            if let Some(ref t) = *table {
164                return Ok(t.clone());
165            }
166        }
167
168        let conn = self.get_connection().await?;
169        let mut table_lock = self.files_table.write().await;
170
171        if table_lock.is_none() {
172            let t = conn
173                .open_table(FILES_TABLE)
174                .execute()
175                .await
176                .map_err(|e| StoreError::Init(format!("Failed to open files table: {e}")))?;
177            *table_lock = Some(t);
178        }
179
180        Ok(table_lock.as_ref().unwrap().clone())
181    }
182
183    /// Convert chunks to Arrow `RecordBatch`.
184    fn chunks_to_batch(&self, chunks: &[Chunk]) -> Result<RecordBatch, StoreError> {
185        let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id.to_string()).collect();
186        let file_ids: Vec<_> = chunks.iter().map(|c| c.file_id.to_string()).collect();
187        let file_paths: Vec<_> = chunks
188            .iter()
189            .map(|c| c.file_path.to_string_lossy().to_string())
190            .collect();
191        let contents: Vec<_> = chunks.iter().map(|c| c.content.clone()).collect();
192        let content_types: Vec<_> = chunks
193            .iter()
194            .map(|c| content_type_to_string(&c.content_type))
195            .collect();
196        let chunk_indices: Vec<_> = chunks.iter().map(|c| c.chunk_index).collect();
197        let start_bytes: Vec<_> = chunks.iter().map(|c| c.byte_range.start).collect();
198        let end_bytes: Vec<_> = chunks.iter().map(|c| c.byte_range.end).collect();
199        let start_lines: Vec<_> = chunks
200            .iter()
201            .map(|c| c.line_range.as_ref().map(|r| r.start))
202            .collect();
203        let end_lines: Vec<_> = chunks
204            .iter()
205            .map(|c| c.line_range.as_ref().map(|r| r.end))
206            .collect();
207        let parent_ids: Vec<_> = chunks
208            .iter()
209            .map(|c| c.parent_chunk_id.map(|id| id.to_string()))
210            .collect();
211        let depths: Vec<_> = chunks.iter().map(|c| c.depth).collect();
212
213        // Build embeddings as FixedSizeList
214        let embeddings: Vec<Option<Vec<Option<f32>>>> = chunks
215            .iter()
216            .map(|c| {
217                c.embedding
218                    .as_ref()
219                    .map(|e| e.iter().map(|&v| Some(v)).collect())
220            })
221            .collect();
222
223        let embedding_models: Vec<_> = chunks
224            .iter()
225            .map(|c| c.metadata.embedding_model.clone())
226            .collect();
227        let indexed_ats: Vec<_> = chunks
228            .iter()
229            .map(|c| {
230                c.metadata
231                    .indexed_at
232                    .map_or_else(|| Utc::now().to_rfc3339(), |t| t.to_rfc3339())
233            })
234            .collect();
235
236        // Extract language/symbol info from content_type
237        let languages: Vec<_> = chunks
238            .iter()
239            .map(|c| match &c.content_type {
240                ContentType::Code { language, .. } => Some(language.clone()),
241                _ => None,
242            })
243            .collect();
244        let symbol_types: Vec<_> = chunks
245            .iter()
246            .map(|c| match &c.content_type {
247                ContentType::Code { symbol, .. } => {
248                    symbol.as_ref().map(|s| format!("{:?}", s.kind))
249                }
250                _ => None,
251            })
252            .collect();
253        let symbol_names: Vec<_> = chunks
254            .iter()
255            .map(|c| match &c.content_type {
256                ContentType::Code { symbol, .. } => symbol.as_ref().map(|s| s.name.clone()),
257                _ => None,
258            })
259            .collect();
260
261        let mime_types: Vec<Option<String>> = chunks.iter().map(|c| c.mime_type.clone()).collect();
262
263        // Build arrays
264        let schema = Arc::new(self.chunks_schema());
265
266        let vector_array = build_vector_array(&embeddings, self.embedding_dim)?;
267
268        let batch = RecordBatch::try_new(
269            schema,
270            vec![
271                Arc::new(StringArray::from(chunk_ids)),
272                Arc::new(StringArray::from(file_ids)),
273                Arc::new(StringArray::from(file_paths)),
274                Arc::new(StringArray::from(contents)),
275                Arc::new(StringArray::from(content_types)),
276                Arc::new(UInt32Array::from(chunk_indices)),
277                Arc::new(UInt64Array::from(start_bytes)),
278                Arc::new(UInt64Array::from(end_bytes)),
279                Arc::new(UInt32Array::from(start_lines)),
280                Arc::new(UInt32Array::from(end_lines)),
281                Arc::new(StringArray::from(parent_ids)),
282                Arc::new(UInt8Array::from(depths)),
283                vector_array,
284                Arc::new(StringArray::from(embedding_models)),
285                Arc::new(StringArray::from(indexed_ats)),
286                Arc::new(StringArray::from(mime_types.clone())),
287                Arc::new(StringArray::from(languages)),
288                Arc::new(StringArray::from(symbol_types)),
289                Arc::new(StringArray::from(symbol_names)),
290            ],
291        )
292        .map_err(|e| StoreError::Insert(format!("Failed to create RecordBatch: {e}")))?;
293
294        Ok(batch)
295    }
296
297    /// Convert file record to Arrow `RecordBatch`.
298    fn file_to_batch(&self, record: &FileRecord) -> Result<RecordBatch, StoreError> {
299        let schema = Arc::new(self.files_schema());
300
301        let batch = RecordBatch::try_new(
302            schema,
303            vec![
304                Arc::new(StringArray::from(vec![record.id.to_string()])),
305                Arc::new(StringArray::from(vec![
306                    record.path.to_string_lossy().to_string(),
307                ])),
308                Arc::new(UInt64Array::from(vec![record.size_bytes])),
309                Arc::new(StringArray::from(vec![record.mime_type.clone()])),
310                Arc::new(StringArray::from(vec![record.content_hash.clone()])),
311                Arc::new(StringArray::from(vec![record.modified_at.to_rfc3339()])),
312                Arc::new(StringArray::from(vec![
313                    record.indexed_at.map(|t| t.to_rfc3339()),
314                ])),
315                Arc::new(UInt32Array::from(vec![record.chunk_count])),
316                Arc::new(StringArray::from(vec![status_to_string(&record.status)])),
317                Arc::new(StringArray::from(vec![record.error_message.clone()])),
318            ],
319        )
320        .map_err(|e| StoreError::Insert(format!("Failed to create file RecordBatch: {e}")))?;
321
322        Ok(batch)
323    }
324}
325
#[async_trait]
impl VectorStore for LanceStore {
    /// Create the database directory and both tables (if missing), plus an
    /// FTS index on `content` used by `hybrid_search`.
    ///
    /// Idempotent: existing tables are left untouched, and FTS index
    /// creation failures are only logged.
    async fn init(&self) -> Result<(), StoreError> {
        info!("Initializing LanceDB at {:?}", self.db_path);

        // Ensure directory exists
        if let Some(parent) = self.db_path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| StoreError::Init(format!("Failed to create db directory: {e}")))?;
        }

        let conn = self.get_connection().await?;

        // Check existing tables
        let tables = conn
            .table_names()
            .execute()
            .await
            .map_err(|e| StoreError::Init(format!("Failed to list tables: {e}")))?;

        // Create chunks table if not exists
        if !tables.contains(&CHUNKS_TABLE.to_string()) {
            info!("Creating chunks table");
            let schema = Arc::new(self.chunks_schema());
            conn.create_empty_table(CHUNKS_TABLE, schema)
                .execute()
                .await
                .map_err(|e| StoreError::Init(format!("Failed to create chunks table: {e}")))?;

            // Create FTS index on content column for hybrid search
            info!("Creating FTS index on content column");
            let table = conn
                .open_table(CHUNKS_TABLE)
                .execute()
                .await
                .map_err(|e| StoreError::Init(format!("Failed to open chunks table: {e}")))?;

            // Best effort: a failed FTS index only degrades hybrid_search,
            // so init still succeeds.
            if let Err(e) = table
                .create_index(&["content"], Index::FTS(FtsIndexBuilder::default()))
                .execute()
                .await
            {
                warn!("Failed to create FTS index (may already exist): {e}");
            }
        }

        // Create files table if not exists
        if !tables.contains(&FILES_TABLE.to_string()) {
            info!("Creating files table");
            let schema = Arc::new(self.files_schema());
            conn.create_empty_table(FILES_TABLE, schema)
                .execute()
                .await
                .map_err(|e| StoreError::Init(format!("Failed to create files table: {e}")))?;
        }

        info!("LanceDB initialized successfully");
        Ok(())
    }

    /// Append `chunks` to the chunks table as a single `RecordBatch`.
    ///
    /// NOTE(review): despite the name, this is a plain append — it does not
    /// delete existing rows with the same chunk_id. Callers appear to rely
    /// on `delete_by_file_path` running first; confirm before reuse.
    async fn upsert_chunks(&self, chunks: &[Chunk]) -> Result<(), StoreError> {
        if chunks.is_empty() {
            return Ok(());
        }

        debug!("Upserting {} chunks", chunks.len());

        let table = self.get_chunks_table().await?;
        let batch = self.chunks_to_batch(chunks)?;
        let schema = batch.schema();

        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);

        table
            .add(Box::new(batches))
            .execute()
            .await
            .map_err(|e| StoreError::Insert(format!("Failed to insert chunks: {e}")))?;

        debug!("Successfully upserted {} chunks", chunks.len());
        Ok(())
    }

    /// Vector-similarity search over the chunks table, returning up to
    /// `query.limit` results.
    async fn search(&self, query: SearchQuery) -> Result<Vec<SearchResult>, StoreError> {
        debug!("Searching with limit {}", query.limit);

        let table = self.get_chunks_table().await?;

        let mut results = table
            .vector_search(query.embedding.clone())
            .map_err(|e| StoreError::Query(format!("Failed to create search query: {e}")))?
            .limit(query.limit)
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to execute search: {e}")))?;

        let mut search_results = Vec::new();

        // Results arrive as a stream of RecordBatches; flatten them all.
        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to fetch results: {e}")))?
        {
            search_results.extend(batch_to_search_results(&batch)?);
        }

        debug!("Found {} results", search_results.len());
        Ok(search_results)
    }

    /// Combined full-text + vector search.
    ///
    /// Falls back to pure vector `search` when the query carries no text.
    async fn hybrid_search(&self, query: SearchQuery) -> Result<Vec<SearchResult>, StoreError> {
        // If no text query provided, fall back to vector-only search
        let query_text = match &query.text {
            Some(text) if !text.is_empty() => text.clone(),
            _ => return self.search(query).await,
        };

        debug!(
            "Performing hybrid search with text: '{}' and limit {}",
            query_text, query.limit
        );

        let table = self.get_chunks_table().await?;

        // Build hybrid query combining FTS and vector search
        let fts_query = FullTextSearchQuery::new(query_text);

        let mut results = table
            .query()
            .full_text_search(fts_query)
            .nearest_to(query.embedding.clone())
            .map_err(|e| StoreError::Query(format!("Failed to create hybrid query: {e}")))?
            .limit(query.limit)
            .execute_hybrid(QueryExecutionOptions::default())
            .await
            .map_err(|e| StoreError::Query(format!("Failed to execute hybrid search: {e}")))?;

        let mut search_results = Vec::new();

        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to fetch hybrid results: {e}")))?
        {
            search_results.extend(batch_to_search_results(&batch)?);
        }

        debug!("Hybrid search found {} results", search_results.len());
        Ok(search_results)
    }

    /// Delete all chunks and the file record for `path`.
    ///
    /// Single quotes in the path are doubled to keep the filter predicate
    /// well-formed. Always returns 1 on success — LanceDB's delete does not
    /// report how many rows were removed, so the value is a success flag,
    /// not a real count.
    async fn delete_by_file_path(&self, path: &Path) -> Result<u64, StoreError> {
        let path_str = path.to_string_lossy().to_string();
        debug!("Deleting chunks for file: {}", path_str);

        let table = self.get_chunks_table().await?;

        table
            .delete(&format!("file_path = '{}'", path_str.replace('\'', "''")))
            .await
            .map_err(|e| StoreError::Delete(format!("Failed to delete chunks: {e}")))?;

        // Also delete from files table
        let files_table = self.get_files_table().await?;
        files_table
            .delete(&format!("path = '{}'", path_str.replace('\'', "''")))
            .await
            .map_err(|e| StoreError::Delete(format!("Failed to delete file record: {e}")))?;

        Ok(1) // LanceDB doesn't return count, we assume success
    }

    /// Move all chunks (and the file record) from `from` to `to`.
    ///
    /// Implemented as read → delete → re-insert because LanceDB has no
    /// in-place UPDATE here. NOTE(review): this is not atomic — a crash
    /// between the delete and the insert loses the chunks for this file.
    ///
    /// Returns the number of chunks that were moved (0 if none existed).
    async fn update_file_path(&self, from: &Path, to: &Path) -> Result<u64, StoreError> {
        // LanceDB doesn't support UPDATE directly, so we read-delete-insert
        debug!("Updating file path from {:?} to {:?}", from, to);

        // 1. Get all chunks for the old path
        let mut chunks = self.get_chunks_for_file(from).await?;
        if chunks.is_empty() {
            debug!("No chunks found for path {:?}", from);
            return Ok(0);
        }

        let chunk_count = chunks.len() as u64;

        // 2. Update the file_path in each chunk
        for chunk in &mut chunks {
            chunk.file_path = to.to_path_buf();
        }

        // 3. Delete old chunks
        self.delete_by_file_path(from).await?;

        // 4. Insert updated chunks
        self.upsert_chunks(&chunks).await?;

        // 5. Also update file record if exists
        if let Ok(Some(mut file_record)) = self.get_file(from).await {
            file_record.path = to.to_path_buf();
            self.upsert_file(&file_record).await?;
        }

        info!("Updated {} chunks from {:?} to {:?}", chunk_count, from, to);
        Ok(chunk_count)
    }

    /// Fetch every chunk whose `file_path` equals `path` exactly.
    async fn get_chunks_for_file(&self, path: &Path) -> Result<Vec<Chunk>, StoreError> {
        let path_str = path.to_string_lossy().to_string();
        debug!("Getting chunks for file: {}", path_str);

        let table = self.get_chunks_table().await?;

        let mut results = table
            .query()
            .only_if(format!("file_path = '{}'", path_str.replace('\'', "''")))
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to query chunks: {e}")))?;

        let mut chunks = Vec::new();

        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to fetch chunks: {e}")))?
        {
            chunks.extend(batch_to_chunks(&batch)?);
        }

        Ok(chunks)
    }

    /// Look up the file record for `path`, or `None` if not indexed.
    async fn get_file(&self, path: &Path) -> Result<Option<FileRecord>, StoreError> {
        let path_str = path.to_string_lossy().to_string();
        debug!("Getting file record: {}", path_str);

        let table = self.get_files_table().await?;

        // `path` is effectively unique, so limit(1) is sufficient.
        let mut results = table
            .query()
            .only_if(format!("path = '{}'", path_str.replace('\'', "''")))
            .limit(1)
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to query file: {e}")))?;

        if let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to fetch file: {e}")))?
        {
            let records = batch_to_file_records(&batch)?;
            return Ok(records.into_iter().next());
        }

        Ok(None)
    }

    /// Insert or replace the file record for `record.path`.
    ///
    /// Implemented as delete-by-path followed by insert; the delete error
    /// is deliberately ignored (the record may not exist yet).
    async fn upsert_file(&self, record: &FileRecord) -> Result<(), StoreError> {
        debug!("Upserting file record: {:?}", record.path);

        let path_str = record.path.to_string_lossy().to_string();

        // Delete existing file record only (not chunks!)
        let files_table = self.get_files_table().await?;
        let _ = files_table
            .delete(&format!("path = '{}'", path_str.replace('\'', "''")))
            .await;

        // Insert new record
        let batch = self.file_to_batch(record)?;
        let schema = batch.schema();

        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema);

        files_table
            .add(Box::new(batches))
            .execute()
            .await
            .map_err(|e| StoreError::Insert(format!("Failed to insert file record: {e}")))?;

        Ok(())
    }

    /// Compute store-wide statistics.
    ///
    /// Row counts are obtained by streaming full table scans with an
    /// always-true filter (O(rows)); the index size is the on-disk size of
    /// the database directory. `last_updated` is simply "now" — the store
    /// does not track a real modification timestamp.
    async fn stats(&self) -> Result<StoreStats, StoreError> {
        let chunks_table = self.get_chunks_table().await?;
        let files_table = self.get_files_table().await?;

        // Count chunks - use exact same pattern as get_chunks_for_file
        let mut chunk_count = 0u64;
        let mut results = chunks_table
            .query()
            .only_if("file_path LIKE '%'")
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to query chunks: {e}")))?;

        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to count chunks: {e}")))?
        {
            chunk_count += batch.num_rows() as u64;
        }

        // Count files - use filter
        let mut file_count = 0u64;
        let mut results = files_table
            .query()
            .only_if("size_bytes >= 0")
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to query files: {e}")))?;

        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to count files: {e}")))?
        {
            file_count += batch.num_rows() as u64;
        }

        // Calculate actual index size from disk
        let index_size_bytes = calculate_dir_size(&self.db_path);

        Ok(StoreStats {
            total_chunks: chunk_count,
            total_files: file_count,
            index_size_bytes,
            last_updated: Some(Utc::now()),
        })
    }

    /// Fetch every chunk in the store.
    ///
    /// Uses an always-true filter (`file_path LIKE '%'`) to select all
    /// rows; loads the whole table into memory.
    async fn get_all_chunks(&self) -> Result<Vec<Chunk>, StoreError> {
        debug!("Getting all chunks");

        let table = self.get_chunks_table().await?;

        let mut results = table
            .query()
            .only_if("file_path LIKE '%'")
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to query all chunks: {e}")))?;

        let mut chunks = Vec::new();

        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to fetch chunks: {e}")))?
        {
            chunks.extend(batch_to_chunks(&batch)?);
        }

        debug!("Retrieved {} chunks", chunks.len());
        Ok(chunks)
    }

    /// Fetch every file record in the store.
    ///
    /// `size_bytes >= 0` is trivially true for a u64 column, so this
    /// selects all rows.
    async fn get_all_files(&self) -> Result<Vec<FileRecord>, StoreError> {
        debug!("Getting all file records");

        let table = self.get_files_table().await?;

        let mut results = table
            .query()
            .only_if("size_bytes >= 0")
            .execute()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to query all files: {e}")))?;

        let mut records = Vec::new();

        while let Some(batch) = results
            .try_next()
            .await
            .map_err(|e| StoreError::Query(format!("Failed to fetch files: {e}")))?
        {
            records.extend(batch_to_file_records(&batch)?);
        }

        debug!("Retrieved {} file records", records.len());
        Ok(records)
    }
}
712
713// ============================================================================
714// Helper functions
715// ============================================================================
716
/// Recursively sum the sizes of all regular files under `path`.
///
/// Best effort: unreadable directories or entries are skipped, and a
/// nonexistent path contributes zero bytes.
fn calculate_dir_size(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }

    let Ok(entries) = std::fs::read_dir(path) else {
        return 0;
    };

    entries
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_file() {
                // Metadata failures are treated as zero-sized.
                entry.metadata().map_or(0, |m| m.len())
            } else if child.is_dir() {
                calculate_dir_size(&child)
            } else {
                0
            }
        })
        .sum()
}
740
741fn content_type_to_string(ct: &ContentType) -> String {
742    match ct {
743        ContentType::Text => "text".to_string(),
744        ContentType::Code { language, .. } => format!("code:{language}"),
745        ContentType::ImageCaption => "image_caption".to_string(),
746        ContentType::PdfPage { page_num } => format!("pdf:{page_num}"),
747        ContentType::Markdown => "markdown".to_string(),
748    }
749}
750
751fn string_to_content_type(s: &str) -> ContentType {
752    if s == "text" {
753        ContentType::Text
754    } else if s == "markdown" {
755        ContentType::Markdown
756    } else if s == "image_caption" {
757        ContentType::ImageCaption
758    } else if let Some(lang) = s.strip_prefix("code:") {
759        ContentType::Code {
760            language: lang.to_string(),
761            symbol: None,
762        }
763    } else if let Some(page) = s.strip_prefix("pdf:") {
764        ContentType::PdfPage {
765            page_num: page.parse().unwrap_or(1),
766        }
767    } else {
768        ContentType::Text
769    }
770}
771
772fn status_to_string(status: &FileStatus) -> String {
773    match status {
774        FileStatus::Pending => "pending".to_string(),
775        FileStatus::Indexing => "indexing".to_string(),
776        FileStatus::Indexed => "indexed".to_string(),
777        FileStatus::Error => "error".to_string(),
778        FileStatus::Deleted => "deleted".to_string(),
779    }
780}
781
782fn string_to_status(s: &str) -> FileStatus {
783    match s {
784        "pending" => FileStatus::Pending,
785        "indexing" => FileStatus::Indexing,
786        "indexed" => FileStatus::Indexed,
787        "error" => FileStatus::Error,
788        "deleted" => FileStatus::Deleted,
789        _ => FileStatus::Pending,
790    }
791}
792
793fn build_vector_array(
794    embeddings: &[Option<Vec<Option<f32>>>],
795    dim: usize,
796) -> Result<ArrayRef, StoreError> {
797    use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};
798
799    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dim as i32);
800
801    for emb in embeddings {
802        if let Some(values) = emb {
803            let values_builder = builder.values();
804            for &v in values {
805                values_builder.append_option(v);
806            }
807            builder.append(true);
808        } else {
809            // Append zeros for missing embeddings
810            let values_builder = builder.values();
811            for _ in 0..dim {
812                values_builder.append_value(0.0);
813            }
814            builder.append(true);
815        }
816    }
817
818    Ok(Arc::new(builder.finish()))
819}
820
821fn batch_to_search_results(batch: &RecordBatch) -> Result<Vec<SearchResult>, StoreError> {
822    let mut results = Vec::new();
823
824    let chunk_ids = batch
825        .column_by_name("chunk_id")
826        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
827    let file_paths = batch
828        .column_by_name("file_path")
829        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
830    let contents = batch
831        .column_by_name("content")
832        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
833    let start_bytes = batch
834        .column_by_name("start_byte")
835        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
836    let end_bytes = batch
837        .column_by_name("end_byte")
838        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
839    let start_lines = batch
840        .column_by_name("start_line")
841        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
842    let end_lines = batch
843        .column_by_name("end_line")
844        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
845    let distances = batch
846        .column_by_name("_distance")
847        .and_then(|c| c.as_any().downcast_ref::<Float32Array>());
848
849    let (Some(chunk_ids), Some(file_paths), Some(contents), Some(start_bytes), Some(end_bytes)) =
850        (chunk_ids, file_paths, contents, start_bytes, end_bytes)
851    else {
852        return Err(StoreError::Query("Missing required columns".to_string()));
853    };
854
855    for i in 0..batch.num_rows() {
856        let chunk_id = chunk_ids.value(i);
857        let file_path = file_paths.value(i);
858        let content = contents.value(i);
859        let start = start_bytes.value(i);
860        let end = end_bytes.value(i);
861
862        let line_range = match (start_lines, end_lines) {
863            (Some(sl), Some(el)) if !sl.is_null(i) && !el.is_null(i) => {
864                Some(sl.value(i)..el.value(i))
865            }
866            _ => None,
867        };
868
869        let score = distances.map_or(0.0, |d| 1.0 - d.value(i));
870
871        results.push(SearchResult {
872            chunk_id: Uuid::parse_str(chunk_id).unwrap_or_default(),
873            file_path: PathBuf::from(file_path),
874            content: content.to_string(),
875            score,
876            byte_range: start..end,
877            line_range,
878            metadata: HashMap::new(),
879        });
880    }
881
882    Ok(results)
883}
884
885fn batch_to_chunks(batch: &RecordBatch) -> Result<Vec<Chunk>, StoreError> {
886    let mut chunks = Vec::new();
887
888    // Similar to batch_to_search_results but returns full Chunk structs
889    // Simplified for now - full implementation would parse all fields
890
891    let chunk_ids = batch
892        .column_by_name("chunk_id")
893        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
894    let file_ids = batch
895        .column_by_name("file_id")
896        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
897    let file_paths = batch
898        .column_by_name("file_path")
899        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
900    let contents = batch
901        .column_by_name("content")
902        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
903    let content_types = batch
904        .column_by_name("content_type")
905        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
906    let chunk_indices = batch
907        .column_by_name("chunk_index")
908        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
909    let start_bytes = batch
910        .column_by_name("start_byte")
911        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
912    let end_bytes = batch
913        .column_by_name("end_byte")
914        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
915    let depths = batch
916        .column_by_name("depth")
917        .and_then(|c| c.as_any().downcast_ref::<UInt8Array>());
918    let mime_types = batch
919        .column_by_name("mime_type")
920        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
921    let start_lines = batch
922        .column_by_name("start_line")
923        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
924    let end_lines = batch
925        .column_by_name("end_line")
926        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
927    let embeddings = batch
928        .column_by_name("embedding")
929        .and_then(|c| c.as_any().downcast_ref::<FixedSizeListArray>());
930
931    let (
932        Some(chunk_ids),
933        Some(file_ids),
934        Some(file_paths),
935        Some(contents),
936        Some(content_types),
937        Some(chunk_indices),
938        Some(start_bytes),
939        Some(end_bytes),
940        Some(depths),
941    ) = (
942        chunk_ids,
943        file_ids,
944        file_paths,
945        contents,
946        content_types,
947        chunk_indices,
948        start_bytes,
949        end_bytes,
950        depths,
951    )
952    else {
953        return Err(StoreError::Query(
954            "Missing required columns in chunks".to_string(),
955        ));
956    };
957
958    for i in 0..batch.num_rows() {
959        let mime_type = mime_types.and_then(|m| {
960            if m.is_null(i) {
961                None
962            } else {
963                Some(m.value(i).to_string())
964            }
965        });
966
967        // Parse line range from start_line and end_line columns
968        let line_range = match (start_lines, end_lines) {
969            (Some(starts), Some(ends)) if !starts.is_null(i) && !ends.is_null(i) => {
970                Some(starts.value(i)..ends.value(i))
971            }
972            _ => None,
973        };
974
975        // Parse embedding vector
976        let embedding = embeddings.and_then(|emb_array| {
977            if emb_array.is_null(i) {
978                None
979            } else {
980                let values = emb_array.value(i);
981                values
982                    .as_any()
983                    .downcast_ref::<Float32Array>()
984                    .map(|arr| arr.values().to_vec())
985            }
986        });
987
988        chunks.push(Chunk {
989            id: Uuid::parse_str(chunk_ids.value(i)).unwrap_or_default(),
990            file_id: Uuid::parse_str(file_ids.value(i)).unwrap_or_default(),
991            file_path: PathBuf::from(file_paths.value(i)),
992            content: contents.value(i).to_string(),
993            content_type: string_to_content_type(content_types.value(i)),
994            mime_type,
995            chunk_index: chunk_indices.value(i),
996            byte_range: start_bytes.value(i)..end_bytes.value(i),
997            line_range,
998            parent_chunk_id: None,
999            depth: depths.value(i),
1000            embedding,
1001            metadata: ChunkMetadata::default(),
1002        });
1003    }
1004
1005    Ok(chunks)
1006}
1007
1008fn batch_to_file_records(batch: &RecordBatch) -> Result<Vec<FileRecord>, StoreError> {
1009    let mut records = Vec::new();
1010
1011    let file_ids = batch
1012        .column_by_name("file_id")
1013        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1014    let paths = batch
1015        .column_by_name("path")
1016        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1017    let sizes = batch
1018        .column_by_name("size_bytes")
1019        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
1020    let mime_types = batch
1021        .column_by_name("mime_type")
1022        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1023    let hashes = batch
1024        .column_by_name("content_hash")
1025        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1026    let modified_ats = batch
1027        .column_by_name("modified_at")
1028        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1029    let chunk_counts = batch
1030        .column_by_name("chunk_count")
1031        .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
1032    let statuses = batch
1033        .column_by_name("status")
1034        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1035    let indexed_ats = batch
1036        .column_by_name("indexed_at")
1037        .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1038
1039    let (
1040        Some(file_ids),
1041        Some(paths),
1042        Some(sizes),
1043        Some(mime_types),
1044        Some(hashes),
1045        Some(modified_ats),
1046        Some(chunk_counts),
1047        Some(statuses),
1048    ) = (
1049        file_ids,
1050        paths,
1051        sizes,
1052        mime_types,
1053        hashes,
1054        modified_ats,
1055        chunk_counts,
1056        statuses,
1057    )
1058    else {
1059        return Err(StoreError::Query(
1060            "Missing required columns in files".to_string(),
1061        ));
1062    };
1063
1064    for i in 0..batch.num_rows() {
1065        let modified_at = chrono::DateTime::parse_from_rfc3339(modified_ats.value(i))
1066            .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc));
1067
1068        // Parse indexed_at timestamp
1069        let indexed_at = indexed_ats.and_then(|arr| {
1070            if arr.is_null(i) {
1071                None
1072            } else {
1073                chrono::DateTime::parse_from_rfc3339(arr.value(i))
1074                    .map(|dt| dt.with_timezone(&Utc))
1075                    .ok()
1076            }
1077        });
1078
1079        records.push(FileRecord {
1080            id: Uuid::parse_str(file_ids.value(i)).unwrap_or_default(),
1081            path: PathBuf::from(paths.value(i)),
1082            size_bytes: sizes.value(i),
1083            mime_type: mime_types.value(i).to_string(),
1084            content_hash: hashes.value(i).to_string(),
1085            modified_at,
1086            indexed_at,
1087            chunk_count: chunk_counts.value(i),
1088            status: string_to_status(statuses.value(i)),
1089            error_message: None,
1090        });
1091    }
1092
1093    Ok(records)
1094}
1095
// Integration-style tests for `LanceStore`. Each test creates a fresh
// database under a tempdir, so tests are independent and order-insensitive.
#[cfg(test)]
mod tests {
    use super::*;
    use ragfs_core::DistanceMetric;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Embedding dimensionality shared by every test fixture.
    const TEST_DIM: usize = 384;

    /// Build a plain-text `Chunk` fixture with fresh random UUIDs.
    fn create_test_chunk(
        file_path: &Path,
        content: &str,
        embedding: Vec<f32>,
        chunk_index: u32,
    ) -> Chunk {
        Chunk {
            id: Uuid::new_v4(),
            file_id: Uuid::new_v4(),
            file_path: file_path.to_path_buf(),
            content: content.to_string(),
            content_type: ContentType::Text,
            mime_type: Some("text/plain".to_string()),
            chunk_index,
            byte_range: 0..content.len() as u64,
            line_range: Some(0..1),
            parent_chunk_id: None,
            depth: 0,
            embedding: Some(embedding),
            metadata: ChunkMetadata {
                indexed_at: Some(Utc::now()),
                embedding_model: Some("test-model".to_string()),
                token_count: None,
                extra: HashMap::new(),
            },
        }
    }

    /// Deterministic pseudo-random embedding (sin ramp) of the given dimension.
    fn create_random_embedding(dim: usize) -> Vec<f32> {
        (0..dim).map(|i| (i as f32 * 0.001).sin()).collect()
    }

    /// Build a `FileRecord` fixture in the `Indexed` state.
    fn create_test_file_record(path: &Path) -> FileRecord {
        FileRecord {
            id: Uuid::new_v4(),
            path: path.to_path_buf(),
            size_bytes: 1024,
            mime_type: "text/plain".to_string(),
            content_hash: "abc123".to_string(),
            modified_at: Utc::now(),
            indexed_at: Some(Utc::now()),
            chunk_count: 1,
            status: FileStatus::Indexed,
            error_message: None,
        }
    }

    // init() must create both the chunks and files tables.
    #[tokio::test]
    async fn test_init_creates_tables() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);

        let result = store.init().await;
        assert!(result.is_ok(), "Init failed: {:?}", result.err());

        // Verify tables exist
        let conn = store.get_connection().await.unwrap();
        let tables = conn.table_names().execute().await.unwrap();
        assert!(tables.contains(&"chunks".to_string()));
        assert!(tables.contains(&"files".to_string()));
    }

    // Calling init() on an already-initialized store must not error.
    #[tokio::test]
    async fn test_init_idempotent() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);

        // Init twice should succeed
        store.init().await.unwrap();
        let result = store.init().await;
        assert!(result.is_ok());
    }

    // Round-trip a single chunk through upsert and per-file retrieval.
    #[tokio::test]
    async fn test_upsert_and_get_chunks() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let file_path = PathBuf::from("/test/file.txt");
        let embedding = create_random_embedding(TEST_DIM);
        let chunk = create_test_chunk(&file_path, "Hello world", embedding, 0);

        // Upsert
        let result = store.upsert_chunks(&[chunk]).await;
        assert!(result.is_ok(), "Upsert failed: {:?}", result.err());

        // Get chunks back
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Hello world");
    }

    // All chunks of a file survive a batched upsert.
    #[tokio::test]
    async fn test_upsert_multiple_chunks() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let file_path = PathBuf::from("/test/multi.txt");
        let chunks: Vec<Chunk> = (0..5)
            .map(|i| {
                create_test_chunk(
                    &file_path,
                    &format!("Chunk content {i}"),
                    create_random_embedding(TEST_DIM),
                    i,
                )
            })
            .collect();

        store.upsert_chunks(&chunks).await.unwrap();

        let retrieved = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(retrieved.len(), 5);
    }

    // Upserting an empty slice is a no-op, not an error.
    #[tokio::test]
    async fn test_upsert_empty_chunks() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        // Empty vec should succeed
        let result = store.upsert_chunks(&[]).await;
        assert!(result.is_ok());
    }

    // Searching with the exact stored embedding must surface that chunk first.
    #[tokio::test]
    async fn test_search_returns_results() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        // Insert some chunks
        let file_path = PathBuf::from("/test/search.txt");
        let embedding = create_random_embedding(TEST_DIM);
        let chunk = create_test_chunk(&file_path, "Authentication logic", embedding.clone(), 0);
        store.upsert_chunks(&[chunk]).await.unwrap();

        // Search with same embedding should find it
        let query = SearchQuery {
            text: Some("auth".to_string()),
            embedding: embedding.clone(),
            limit: 10,
            filters: vec![],
            metric: DistanceMetric::Cosine,
        };

        let results = store.search(query).await.unwrap();
        assert!(!results.is_empty());
        assert_eq!(results[0].content, "Authentication logic");
    }

    // A search never returns more results than the requested limit.
    #[tokio::test]
    async fn test_search_respects_limit() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        // Insert 10 chunks
        let file_path = PathBuf::from("/test/limit.txt");
        let chunks: Vec<Chunk> = (0..10)
            .map(|i| {
                create_test_chunk(
                    &file_path,
                    &format!("Content {i}"),
                    create_random_embedding(TEST_DIM),
                    i,
                )
            })
            .collect();
        store.upsert_chunks(&chunks).await.unwrap();

        // Search with limit 3
        let query = SearchQuery {
            text: Some("test".to_string()),
            embedding: create_random_embedding(TEST_DIM),
            limit: 3,
            filters: vec![],
            metric: DistanceMetric::Cosine,
        };

        let results = store.search(query).await.unwrap();
        assert!(results.len() <= 3);
    }

    // Deleting by file path removes all of that file's chunks.
    #[tokio::test]
    async fn test_delete_by_file_path() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let file_path = PathBuf::from("/test/delete.txt");
        let chunk = create_test_chunk(
            &file_path,
            "To be deleted",
            create_random_embedding(TEST_DIM),
            0,
        );
        store.upsert_chunks(&[chunk]).await.unwrap();

        // Verify it exists
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 1);

        // Delete
        store.delete_by_file_path(&file_path).await.unwrap();

        // Verify it's gone
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 0);
    }

    // Round-trip a FileRecord through upsert_file and get_file.
    #[tokio::test]
    async fn test_upsert_and_get_file_record() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let file_path = PathBuf::from("/test/record.txt");
        let record = create_test_file_record(&file_path);

        // Upsert
        store.upsert_file(&record).await.unwrap();

        // Get
        let retrieved = store.get_file(&file_path).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.path, file_path);
        assert_eq!(retrieved.mime_type, "text/plain");
    }

    // get_file on an unknown path returns Ok(None), not an error.
    #[tokio::test]
    async fn test_get_nonexistent_file() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let result = store
            .get_file(&PathBuf::from("/nonexistent"))
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // stats() counts must track inserts, and upsert_file must not clobber chunks.
    #[tokio::test]
    async fn test_stats() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        // Initially empty
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
        assert_eq!(stats.total_files, 0);

        // Add chunk
        let file_path = PathBuf::from("/test/stats.txt");
        let chunk = create_test_chunk(
            &file_path,
            "Stats test",
            create_random_embedding(TEST_DIM),
            0,
        );
        store.upsert_chunks(&[chunk]).await.unwrap();

        // Add file record (should NOT delete chunks)
        let record = create_test_file_record(&file_path);
        store.upsert_file(&record).await.unwrap();

        // Verify chunks still exist after upsert_file
        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(
            chunks.len(),
            1,
            "Chunks should still exist after upsert_file"
        );

        // Verify file exists
        let file = store.get_file(&file_path).await.unwrap();
        assert!(file.is_some(), "File should be retrievable");

        // Check stats
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_chunks, 1);
        assert_eq!(stats.total_files, 1);
        assert!(
            stats.index_size_bytes > 0,
            "index_size_bytes should be > 0, got {}",
            stats.index_size_bytes
        );
    }

    // ContentType <-> string encoding must round-trip for each variant.
    #[tokio::test]
    async fn test_content_type_conversion() {
        assert_eq!(content_type_to_string(&ContentType::Text), "text");
        assert_eq!(content_type_to_string(&ContentType::Markdown), "markdown");
        assert_eq!(
            content_type_to_string(&ContentType::Code {
                language: "rust".to_string(),
                symbol: None
            }),
            "code:rust"
        );
        assert_eq!(
            content_type_to_string(&ContentType::PdfPage { page_num: 5 }),
            "pdf:5"
        );

        assert!(matches!(string_to_content_type("text"), ContentType::Text));
        assert!(matches!(
            string_to_content_type("markdown"),
            ContentType::Markdown
        ));
        assert!(matches!(
            string_to_content_type("code:python"),
            ContentType::Code { language, .. } if language == "python"
        ));
    }

    // FileStatus <-> string encoding; unknown strings fall back to Pending.
    #[tokio::test]
    async fn test_file_status_conversion() {
        assert_eq!(status_to_string(&FileStatus::Pending), "pending");
        assert_eq!(status_to_string(&FileStatus::Indexed), "indexed");
        assert_eq!(status_to_string(&FileStatus::Error), "error");

        assert!(matches!(string_to_status("pending"), FileStatus::Pending));
        assert!(matches!(string_to_status("indexed"), FileStatus::Indexed));
        assert!(matches!(string_to_status("error"), FileStatus::Error));
        assert!(matches!(string_to_status("unknown"), FileStatus::Pending));
    }

    // A Code content type (with language) must survive a store round-trip.
    #[tokio::test]
    async fn test_chunks_with_code_content_type() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let file_path = PathBuf::from("/test/code.rs");
        let chunk = Chunk {
            id: Uuid::new_v4(),
            file_id: Uuid::new_v4(),
            file_path: file_path.clone(),
            content: "fn main() {}".to_string(),
            content_type: ContentType::Code {
                language: "rust".to_string(),
                symbol: None,
            },
            mime_type: Some("text/x-rust".to_string()),
            chunk_index: 0,
            byte_range: 0..12,
            line_range: Some(0..1),
            parent_chunk_id: None,
            depth: 0,
            embedding: Some(create_random_embedding(TEST_DIM)),
            metadata: ChunkMetadata::default(),
        };

        store.upsert_chunks(&[chunk]).await.unwrap();

        let chunks = store.get_chunks_for_file(&file_path).await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert!(matches!(
            &chunks[0].content_type,
            ContentType::Code { language, .. } if language == "rust"
        ));
    }

    // A freshly initialized store lists no files.
    #[tokio::test]
    async fn test_get_all_files_empty() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let files = store.get_all_files().await.unwrap();
        assert!(files.is_empty());
    }

    // Deleting chunks for a path that was never indexed is not an error.
    #[tokio::test]
    async fn test_delete_nonexistent_file() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let path = PathBuf::from("/nonexistent/file.txt");
        // Deleting a nonexistent file should not error
        let result = store.delete_by_file_path(&path).await;
        assert!(result.is_ok());
    }

    // get_file on an unknown path returns Ok(None).
    #[tokio::test]
    async fn test_get_file_not_found() {
        let temp = tempdir().unwrap();
        let db_path = temp.path().join("test.lance");
        let store = LanceStore::new(db_path.clone(), TEST_DIM);
        store.init().await.unwrap();

        let path = PathBuf::from("/nonexistent/file.txt");
        let result = store.get_file(&path).await.unwrap();
        assert!(result.is_none());
    }
}