ragfs_index/
indexer.rs

1//! Main indexing service.
2
3use chrono::Utc;
4use ragfs_chunker::ChunkerRegistry;
5use ragfs_core::{
6    Chunk, ChunkConfig, ChunkMetadata, ChunkOutput, ContentType, EmbeddingConfig, Error, FileEvent,
7    FileRecord, FileStatus, IndexStats, Indexer, Result, VectorStore,
8};
9use ragfs_embed::EmbedderPool;
10use ragfs_extract::ExtractorRegistry;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::sync::{RwLock, broadcast, mpsc};
14use tracing::{debug, error, info, warn};
15use uuid::Uuid;
16
17use crate::watcher::FileWatcher;
18
/// Index update events.
///
/// Broadcast to subscribers of [`IndexerService::subscribe`] as files move
/// through the pipeline.
#[derive(Debug, Clone)]
pub enum IndexUpdate {
    /// A file was (re)indexed successfully, producing `chunk_count` chunks.
    FileIndexed { path: PathBuf, chunk_count: u32 },
    /// A file's chunks were removed from the store (file deleted or renamed away).
    FileRemoved { path: PathBuf },
    /// Indexing the file failed; `error` is the display form of the failure.
    FileError { path: PathBuf, error: String },
    /// Indexing of `path` has begun.
    IndexingStarted { path: PathBuf },
}
27
/// Configuration for the indexer.
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// Chunk configuration
    pub chunk_config: ChunkConfig,
    /// Embedding configuration
    pub embed_config: EmbeddingConfig,
    /// Include patterns (glob); defaults to a catch-all `**/*`.
    pub include_patterns: Vec<String>,
    /// Exclude patterns (glob); matched against the full path string during
    /// scans and directory reindexing.
    pub exclude_patterns: Vec<String>,
}
40
41impl Default for IndexerConfig {
42    fn default() -> Self {
43        Self {
44            chunk_config: ChunkConfig::default(),
45            embed_config: EmbeddingConfig::default(),
46            include_patterns: vec!["**/*".to_string()],
47            exclude_patterns: vec![
48                "**/.*".to_string(),
49                "**/.git/**".to_string(),
50                "**/node_modules/**".to_string(),
51                "**/target/**".to_string(),
52                "**/__pycache__/**".to_string(),
53                "**/*.lock".to_string(),
54            ],
55        }
56    }
57}
58
/// Main indexing service.
///
/// Owns the extract → chunk → embed → store pipeline for one root directory,
/// plus the file watcher and the background task that drains file events.
pub struct IndexerService {
    /// Root path being indexed
    root: PathBuf,
    /// Vector store
    store: Arc<dyn VectorStore>,
    /// Extractor registry
    extractors: Arc<ExtractorRegistry>,
    /// Chunker registry
    chunkers: Arc<ChunkerRegistry>,
    /// Embedder pool
    embedder: Arc<EmbedderPool>,
    /// Configuration
    config: IndexerConfig,
    /// Current stats
    stats: Arc<RwLock<IndexStats>>,
    /// Event sender for file watcher
    event_tx: mpsc::Sender<FileEvent>,
    /// Event receiver; wrapped in a lock so the spawned background task can
    /// take exclusive ownership of it (see `start`).
    event_rx: Arc<RwLock<mpsc::Receiver<FileEvent>>>,
    /// Update broadcast
    update_tx: broadcast::Sender<IndexUpdate>,
    /// File watcher (if active)
    watcher: Arc<RwLock<Option<FileWatcher>>>,
    /// Running flag; checked by the background loop between events.
    running: Arc<RwLock<bool>>,
}
86
impl IndexerService {
    /// Create a new indexer service.
    ///
    /// Nothing runs until [`IndexerService::start`] is called.
    pub fn new(
        root: PathBuf,
        store: Arc<dyn VectorStore>,
        extractors: Arc<ExtractorRegistry>,
        chunkers: Arc<ChunkerRegistry>,
        embedder: Arc<EmbedderPool>,
        config: IndexerConfig,
    ) -> Self {
        // Bounded queues: up to 1024 pending file events; each broadcast
        // subscriber buffers up to 256 updates.
        let (event_tx, event_rx) = mpsc::channel(1024);
        let (update_tx, _) = broadcast::channel(256);

        Self {
            root,
            store,
            extractors,
            chunkers,
            embedder,
            config,
            stats: Arc::new(RwLock::new(IndexStats::default())),
            event_tx,
            event_rx: Arc::new(RwLock::new(event_rx)),
            update_tx,
            watcher: Arc::new(RwLock::new(None)),
            running: Arc::new(RwLock::new(false)),
        }
    }

    /// Subscribe to index updates.
    ///
    /// Each call creates an independent receiver with its own buffer.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<IndexUpdate> {
        self.update_tx.subscribe()
    }

    /// Get the root path.
    #[must_use]
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Start the indexer background task.
    ///
    /// Idempotent: returns `Ok(())` immediately if already running.
    /// Initializes the store, installs a file watcher on the root, spawns the
    /// event-processing task, then performs an initial scan of the root.
    ///
    /// # Errors
    /// Fails if store initialization, watcher creation/registration, or the
    /// initial scan task fails.
    pub async fn start(&self) -> Result<()> {
        let mut running = self.running.write().await;
        if *running {
            return Ok(());
        }
        *running = true;
        drop(running);

        info!("Starting indexer for {:?}", self.root);

        // Initialize store
        self.store.init().await.map_err(Error::Store)?;

        // Start file watcher
        // 500 ms interval passed to the watcher (presumably a debounce
        // window — see `FileWatcher` for the exact semantics).
        let watcher =
            FileWatcher::new(self.event_tx.clone(), std::time::Duration::from_millis(500))
                .map_err(|e| Error::Other(format!("watcher error: {e}")))?;
        {
            let mut w = self.watcher.write().await;
            *w = Some(watcher);
        }

        // Watch root directory
        {
            let mut w = self.watcher.write().await;
            if let Some(ref mut watcher) = *w {
                watcher
                    .watch(&self.root)
                    .map_err(|e| Error::Other(format!("watch error: {e}")))?;
            }
        }

        // Clone what we need for the background task
        let event_rx = Arc::clone(&self.event_rx);
        let update_tx = self.update_tx.clone();
        let running = Arc::clone(&self.running);
        let store = Arc::clone(&self.store);
        let extractors = Arc::clone(&self.extractors);
        let chunkers = Arc::clone(&self.chunkers);
        let embedder = Arc::clone(&self.embedder);
        let config = self.config.clone();
        let stats = Arc::clone(&self.stats);

        // Spawn event processing task
        tokio::spawn(async move {
            // The task holds the receiver's write lock for its entire
            // lifetime; the lock exists only so the task can own the receiver.
            let mut rx = event_rx.write().await;
            // NOTE(review): `running` is re-checked only after an event
            // arrives, so `stop()` does not interrupt a blocked `recv()`.
            while *running.read().await {
                match rx.recv().await {
                    Some(event) => {
                        debug!("Received file event: {:?}", event);
                        match &event {
                            FileEvent::Created(path) | FileEvent::Modified(path) => {
                                // Broadcast send errors (no subscribers) are ignored.
                                let _ = update_tx
                                    .send(IndexUpdate::IndexingStarted { path: path.clone() });

                                match process_file(
                                    path,
                                    &store,
                                    &extractors,
                                    &chunkers,
                                    &embedder,
                                    &config,
                                )
                                .await
                                {
                                    Ok(chunk_count) => {
                                        info!("Indexed {:?} ({} chunks)", path, chunk_count);
                                        let _ = update_tx.send(IndexUpdate::FileIndexed {
                                            path: path.clone(),
                                            chunk_count,
                                        });

                                        // Update stats
                                        let mut s = stats.write().await;
                                        s.indexed_files += 1;
                                        s.total_chunks += u64::from(chunk_count);
                                        s.last_update = Some(Utc::now());
                                    }
                                    Err(e) => {
                                        error!("Failed to index {:?}: {}", path, e);
                                        let _ = update_tx.send(IndexUpdate::FileError {
                                            path: path.clone(),
                                            error: e.to_string(),
                                        });

                                        // Update stats
                                        let mut s = stats.write().await;
                                        s.error_files += 1;
                                    }
                                }
                            }
                            FileEvent::Deleted(path) => {
                                if let Err(e) = store.delete_by_file_path(path).await {
                                    error!("Failed to delete {:?}: {}", path, e);
                                } else {
                                    let _ = update_tx
                                        .send(IndexUpdate::FileRemoved { path: path.clone() });
                                }
                            }
                            FileEvent::Renamed { from, to } => {
                                // Delete old, index new
                                if let Err(e) = store.delete_by_file_path(from).await {
                                    warn!("Failed to delete old path {:?}: {}", from, e);
                                }
                                let _ = update_tx
                                    .send(IndexUpdate::IndexingStarted { path: to.clone() });

                                match process_file(
                                    to,
                                    &store,
                                    &extractors,
                                    &chunkers,
                                    &embedder,
                                    &config,
                                )
                                .await
                                {
                                    Ok(chunk_count) => {
                                        let _ = update_tx.send(IndexUpdate::FileIndexed {
                                            path: to.clone(),
                                            chunk_count,
                                        });
                                    }
                                    Err(e) => {
                                        let _ = update_tx.send(IndexUpdate::FileError {
                                            path: to.clone(),
                                            error: e.to_string(),
                                        });
                                    }
                                }
                                // NOTE(review): rename handling does not update
                                // stats, unlike the Created/Modified arm — confirm
                                // whether that asymmetry is intended.
                            }
                        }
                    }
                    // Channel closed: all senders dropped, nothing more to do.
                    None => break,
                }
            }
        });

        // Initial scan
        self.scan().await?;

        Ok(())
    }

    /// Perform initial scan of the root directory.
    ///
    /// Walks the tree on a blocking thread (`std::fs` I/O) and queues every
    /// non-excluded file as a `Created` event for the background task.
    ///
    /// # Errors
    /// Fails only if the blocking scan task itself fails to complete.
    async fn scan(&self) -> Result<()> {
        info!("Scanning {:?}", self.root);

        let root = self.root.clone();
        let event_tx = self.event_tx.clone();
        let exclude_patterns = self.config.exclude_patterns.clone();

        // Walk directory in background thread (blocking I/O)
        tokio::task::spawn_blocking(move || {
            scan_directory(&root, &event_tx, &exclude_patterns);
        })
        .await
        .map_err(|e| Error::Other(format!("scan task failed: {e}")))?;

        Ok(())
    }

    /// Process a single file through the pipeline.
    ///
    /// Bypasses the event queue and runs the extract → chunk → embed → store
    /// pipeline inline, returning the number of chunks written.
    pub async fn process_single(&self, path: &Path) -> Result<u32> {
        process_file(
            path,
            &self.store,
            &self.extractors,
            &self.chunkers,
            &self.embedder,
            &self.config,
        )
        .await
    }

    /// Reindex a path (file or directory).
    ///
    /// If the path is a file, it will be reindexed (existing chunks deleted first).
    /// If the path is a directory, all files in it will be reindexed recursively.
    ///
    /// # Errors
    /// Returns an error if the path does not exist or, for a single file, if
    /// indexing that file fails. Directory reindexing logs per-file failures
    /// and continues (see `reindex_directory`).
    pub async fn reindex_path(&self, path: &Path) -> Result<()> {
        if !path.exists() {
            return Err(Error::Other(format!(
                "Path does not exist: {}",
                path.display()
            )));
        }

        if path.is_file() {
            // Delete existing chunks and reindex
            // Best-effort delete: a failure here may leave stale chunks behind.
            let _ = self.store.delete_by_file_path(path).await;

            let _ = self.update_tx.send(IndexUpdate::IndexingStarted {
                path: path.to_path_buf(),
            });

            match process_file(
                path,
                &self.store,
                &self.extractors,
                &self.chunkers,
                &self.embedder,
                &self.config,
            )
            .await
            {
                Ok(chunk_count) => {
                    info!("Reindexed {:?} ({} chunks)", path, chunk_count);
                    let _ = self.update_tx.send(IndexUpdate::FileIndexed {
                        path: path.to_path_buf(),
                        chunk_count,
                    });
                }
                Err(e) => {
                    error!("Failed to reindex {:?}: {}", path, e);
                    let _ = self.update_tx.send(IndexUpdate::FileError {
                        path: path.to_path_buf(),
                        error: e.to_string(),
                    });
                    return Err(e);
                }
            }
        } else if path.is_dir() {
            // Reindex all files in directory recursively
            info!("Reindexing directory {:?}", path);
            self.reindex_directory(path).await?;
        }

        Ok(())
    }

    /// Recursively reindex all files in a directory.
    ///
    /// Per-file failures are logged and skipped so one bad file does not
    /// abort the whole directory.
    async fn reindex_directory(&self, dir: &Path) -> Result<()> {
        let entries = tokio::fs::read_dir(dir)
            .await
            .map_err(|e| Error::Other(format!("Failed to read directory: {e}")))?;

        let mut entries_stream = tokio_stream::wrappers::ReadDirStream::new(entries);

        use tokio_stream::StreamExt;
        while let Some(entry) = entries_stream.next().await {
            let entry = entry.map_err(|e| Error::Other(format!("Failed to read entry: {e}")))?;
            let path = entry.path();

            // Check exclusion patterns
            // NOTE(review): this simplified glob check duplicates
            // `scan_directory`'s logic; patterns that split into more than two
            // parts on "**" (e.g. "**/.git/**") or whose remainder still
            // contains '*' (e.g. the "*.lock" suffix of "**/*.lock") can never
            // match here — confirm whether that is intended, and consider
            // sharing one matcher with `scan_directory`.
            let path_str = path.to_string_lossy();
            let should_exclude = self.config.exclude_patterns.iter().any(|pattern| {
                if pattern.contains("**") {
                    let parts: Vec<&str> = pattern.split("**").collect();
                    if parts.len() == 2 {
                        let prefix = parts[0].trim_matches('/');
                        let suffix = parts[1].trim_matches('/');
                        (prefix.is_empty() || path_str.contains(prefix))
                            && (suffix.is_empty() || path_str.contains(suffix))
                    } else {
                        false
                    }
                } else if pattern.starts_with('*') {
                    path_str.ends_with(pattern.trim_start_matches('*'))
                } else {
                    path_str.contains(pattern.trim_matches('*'))
                }
            });

            if should_exclude {
                continue;
            }

            if path.is_dir() {
                // Recurse into subdirectory using a boxed future to avoid infinite recursion
                Box::pin(self.reindex_directory(&path)).await?;
            } else if path.is_file() {
                // Reindex file - use process_file directly to avoid recursion
                // Best-effort delete of any existing chunks first.
                let _ = self.store.delete_by_file_path(&path).await;

                let _ = self
                    .update_tx
                    .send(IndexUpdate::IndexingStarted { path: path.clone() });

                match process_file(
                    &path,
                    &self.store,
                    &self.extractors,
                    &self.chunkers,
                    &self.embedder,
                    &self.config,
                )
                .await
                {
                    Ok(chunk_count) => {
                        info!("Reindexed {:?} ({} chunks)", path, chunk_count);
                        let _ = self.update_tx.send(IndexUpdate::FileIndexed {
                            path: path.clone(),
                            chunk_count,
                        });
                    }
                    Err(e) => {
                        warn!("Failed to reindex {:?}: {}", path, e);
                        let _ = self.update_tx.send(IndexUpdate::FileError {
                            path: path.clone(),
                            error: e.to_string(),
                        });
                        // Continue with other files
                    }
                }
            }
        }

        Ok(())
    }
}
439
440/// Scan a directory and send file events.
441fn scan_directory(root: &Path, event_tx: &mpsc::Sender<FileEvent>, exclude_patterns: &[String]) {
442    use std::fs;
443
444    fn visit_dir(dir: &Path, event_tx: &mpsc::Sender<FileEvent>, exclude_patterns: &[String]) {
445        let entries = match fs::read_dir(dir) {
446            Ok(e) => e,
447            Err(e) => {
448                warn!("Cannot read directory {:?}: {}", dir, e);
449                return;
450            }
451        };
452
453        for entry in entries.flatten() {
454            let path = entry.path();
455
456            // Check exclusion patterns
457            let path_str = path.to_string_lossy();
458            let should_exclude = exclude_patterns.iter().any(|pattern| {
459                // Simple glob matching
460                if pattern.contains("**") {
461                    let parts: Vec<&str> = pattern.split("**").collect();
462                    if parts.len() == 2 {
463                        let prefix = parts[0].trim_matches('/');
464                        let suffix = parts[1].trim_matches('/');
465                        (prefix.is_empty() || path_str.contains(prefix))
466                            && (suffix.is_empty() || path_str.contains(suffix))
467                    } else {
468                        false
469                    }
470                } else if pattern.starts_with('*') {
471                    path_str.ends_with(pattern.trim_start_matches('*'))
472                } else {
473                    path_str.contains(pattern.trim_matches('*'))
474                }
475            });
476
477            if should_exclude {
478                continue;
479            }
480
481            if path.is_dir() {
482                visit_dir(&path, event_tx, exclude_patterns);
483            } else if path.is_file() {
484                // Send as Created event
485                if let Err(e) = event_tx.blocking_send(FileEvent::Created(path.clone())) {
486                    warn!("Failed to queue file {:?}: {}", path, e);
487                }
488            }
489        }
490    }
491
492    visit_dir(root, event_tx, exclude_patterns);
493}
494
/// Process a file through the full pipeline: extract → chunk → embed → store.
///
/// Returns the number of chunks written. Returns `Ok(0)` for non-files,
/// empty extractions, and empty chunkings; returns the stored chunk count
/// unchanged when the file's content hash matches an existing `Indexed`
/// record (skip).
///
/// # Errors
/// Propagates metadata/read failures plus extraction, chunking, embedding,
/// and store errors, wrapped into the corresponding `Error` variants.
async fn process_file(
    path: &Path,
    store: &Arc<dyn VectorStore>,
    extractors: &Arc<ExtractorRegistry>,
    chunkers: &Arc<ChunkerRegistry>,
    embedder: &Arc<EmbedderPool>,
    config: &IndexerConfig,
) -> Result<u32> {
    // Get file metadata
    let metadata = tokio::fs::metadata(path)
        .await
        .map_err(|e| Error::Other(format!("Failed to get metadata: {e}")))?;

    if !metadata.is_file() {
        return Ok(0);
    }

    // Compute content hash (reads the whole file into memory).
    let content_hash = compute_hash(path).await?;

    // Check if already indexed with same hash
    // Store lookup errors are deliberately treated as "not indexed".
    if let Ok(Some(existing)) = store.get_file(path).await
        && existing.content_hash == content_hash
        && existing.status == FileStatus::Indexed
    {
        debug!("File {:?} already indexed, skipping", path);
        return Ok(existing.chunk_count);
    }

    // Determine MIME type (by extension; falls back to text/plain).
    let mime_type = mime_guess::from_path(path)
        .first_or_text_plain()
        .to_string();

    // Extract content
    let content = extractors
        .extract(path, &mime_type)
        .await
        .map_err(Error::Extraction)?;

    if content.text.is_empty() {
        debug!("Empty content for {:?}, skipping", path);
        return Ok(0);
    }

    // Determine content type
    let content_type = determine_content_type(path, &mime_type, &content);

    // Chunk content
    let chunk_outputs = chunkers
        .chunk(&content, &content_type, &config.chunk_config)
        .await
        .map_err(Error::Chunking)?;

    if chunk_outputs.is_empty() {
        return Ok(0);
    }

    // Prepare texts for embedding
    let texts: Vec<&str> = chunk_outputs.iter().map(|c| c.content.as_str()).collect();

    // Generate embeddings (one per chunk, same order as `chunk_outputs`).
    let embeddings = embedder
        .embed_batch(&texts, &config.embed_config)
        .await
        .map_err(Error::Embedding)?;

    // Create Chunk objects
    let file_id = Uuid::new_v4();
    let now = Utc::now();
    let model_name = embedder.model_name().to_string();

    // Pair each chunk with its embedding; `idx` becomes the chunk index.
    let chunks: Vec<Chunk> = chunk_outputs
        .into_iter()
        .zip(embeddings.into_iter())
        .enumerate()
        .map(|(idx, (output, emb_output))| {
            build_chunk(
                file_id,
                path,
                idx as u32,
                output,
                emb_output.embedding,
                &content_type,
                &mime_type,
                &model_name,
                now,
            )
        })
        .collect();

    let chunk_count = chunks.len() as u32;

    // Delete old chunks for this file
    // NOTE(review): best-effort — if this delete fails, stale chunks may
    // coexist with the new ones. Delete + upsert is also not atomic.
    let _ = store.delete_by_file_path(path).await;

    // Store chunks
    store.upsert_chunks(&chunks).await.map_err(Error::Store)?;

    // Store file record
    let file_record = FileRecord {
        id: file_id,
        path: path.to_path_buf(),
        size_bytes: metadata.len(),
        mime_type,
        content_hash,
        // Fall back to "now" if the filesystem can't report mtime.
        modified_at: metadata
            .modified()
            .map_or_else(|_| now, chrono::DateTime::<Utc>::from),
        indexed_at: Some(now),
        chunk_count,
        status: FileStatus::Indexed,
        error_message: None,
    };

    store
        .upsert_file(&file_record)
        .await
        .map_err(Error::Store)?;

    Ok(chunk_count)
}
618
619/// Compute blake3 hash of file content.
620async fn compute_hash(path: &Path) -> Result<String> {
621    let content = tokio::fs::read(path)
622        .await
623        .map_err(|e| Error::Other(format!("Failed to read file: {e}")))?;
624
625    let hash = blake3::hash(&content);
626    Ok(hash.to_hex().to_string())
627}
628
629/// Determine content type from path, MIME type, and extracted content.
630fn determine_content_type(
631    path: &Path,
632    mime_type: &str,
633    content: &ragfs_core::ExtractedContent,
634) -> ContentType {
635    // Check for code files
636    let code_extensions = [
637        ("rs", "rust"),
638        ("py", "python"),
639        ("js", "javascript"),
640        ("ts", "typescript"),
641        ("tsx", "typescript"),
642        ("jsx", "javascript"),
643        ("java", "java"),
644        ("go", "go"),
645        ("c", "c"),
646        ("cpp", "cpp"),
647        ("h", "c"),
648        ("hpp", "cpp"),
649        ("rb", "ruby"),
650        ("php", "php"),
651        ("swift", "swift"),
652        ("kt", "kotlin"),
653        ("scala", "scala"),
654        ("ex", "elixir"),
655        ("hs", "haskell"),
656        ("ml", "ocaml"),
657        ("lua", "lua"),
658        ("sh", "bash"),
659        ("sql", "sql"),
660    ];
661
662    if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
663        let ext_lower = ext.to_lowercase();
664        for (code_ext, lang) in &code_extensions {
665            if ext_lower == *code_ext {
666                return ContentType::Code {
667                    language: (*lang).to_string(),
668                    symbol: None,
669                };
670            }
671        }
672    }
673
674    // Check for markdown
675    if mime_type.contains("markdown")
676        || path
677            .extension()
678            .is_some_and(|e| e == "md" || e == "markdown")
679    {
680        return ContentType::Markdown;
681    }
682
683    // Check language from extraction metadata
684    if let Some(ref lang) = content.metadata.language
685        && !lang.is_empty()
686    {
687        return ContentType::Code {
688            language: lang.clone(),
689            symbol: None,
690        };
691    }
692
693    ContentType::Text
694}
695
/// Build a Chunk from chunk output and embedding.
///
/// Assigns a fresh UUID per chunk and copies positional data (byte/line
/// ranges, depth) straight from the chunker output. `parent_chunk_id` and
/// `token_count` are always left unset here.
fn build_chunk(
    file_id: Uuid,
    file_path: &Path,
    chunk_index: u32,
    output: ChunkOutput,
    embedding: Vec<f32>,
    content_type: &ContentType,
    mime_type: &str,
    model_name: &str,
    now: chrono::DateTime<Utc>,
) -> Chunk {
    Chunk {
        id: Uuid::new_v4(),
        file_id,
        file_path: file_path.to_path_buf(),
        content: output.content,
        content_type: content_type.clone(),
        mime_type: Some(mime_type.to_string()),
        chunk_index,
        byte_range: output.byte_range,
        line_range: output.line_range,
        parent_chunk_id: None,
        depth: output.depth,
        embedding: Some(embedding),
        metadata: ChunkMetadata {
            embedding_model: Some(model_name.to_string()),
            indexed_at: Some(now),
            token_count: None,
            extra: Default::default(),
        },
    }
}
729
#[async_trait::async_trait]
impl Indexer for IndexerService {
    /// Add `path` to the active file watcher.
    ///
    /// Silently a no-op if `start` has not installed a watcher yet.
    async fn watch(&self, path: &Path) -> Result<()> {
        let mut w = self.watcher.write().await;
        if let Some(ref mut watcher) = *w {
            watcher
                .watch(path)
                .map_err(|e| Error::Other(format!("watch error: {e}")))?;
        }
        Ok(())
    }

    /// Stop the indexer.
    ///
    /// NOTE(review): only clears the running flag; the background loop checks
    /// it between events, so a task blocked on `recv()` keeps waiting until
    /// the next event (or channel close) before exiting.
    async fn stop(&self) -> Result<()> {
        let mut running = self.running.write().await;
        *running = false;
        info!("Indexer stopped");
        Ok(())
    }

    /// Queue `path` for (re)indexing via the event channel.
    ///
    /// With `force`, existing chunks are deleted first (best-effort).
    async fn index(&self, path: &Path, force: bool) -> Result<()> {
        if force {
            // Delete existing and reindex
            let _ = self.store.delete_by_file_path(path).await;
        }

        // Queue file for indexing
        self.event_tx
            .send(FileEvent::Modified(path.to_path_buf()))
            .await
            .map_err(|e| Error::Other(format!("send error: {e}")))?;
        Ok(())
    }

    /// Snapshot of the current indexing statistics.
    async fn stats(&self) -> Result<IndexStats> {
        Ok(self.stats.read().await.clone())
    }

    /// Whether `path` needs (re)indexing: true when it has no record in the
    /// store or its current content hash differs from the stored one.
    async fn needs_reindex(&self, path: &Path) -> Result<bool> {
        // Check if file exists in store and compare hash
        match self.store.get_file(path).await {
            Ok(Some(record)) => {
                let current_hash = compute_hash(path).await?;
                Ok(record.content_hash != current_hash)
            }
            Ok(None) => Ok(true),
            Err(e) => Err(Error::Store(e)),
        }
    }
}
779
780#[cfg(test)]
781mod tests {
782    use super::*;
783    use ragfs_chunker::{ChunkerRegistry, FixedSizeChunker};
784    use ragfs_core::{
785        ContentMetadataInfo, EmbedError, Embedder, EmbeddingConfig, EmbeddingOutput, Modality,
786        SearchQuery, SearchResult, StoreError, StoreStats,
787    };
788    use ragfs_embed::EmbedderPool;
789    use ragfs_extract::ExtractorRegistry;
790    use std::collections::HashMap;
791    use tempfile::tempdir;
792
    /// Embedding dimension used by the mock embedder throughout these tests.
    const TEST_DIM: usize = 384;

    // ==================== Mock Embedder ====================

    /// Test double for `Embedder` that returns constant vectors.
    struct MockEmbedder {
        // Length of every embedding vector this mock produces.
        dimension: usize,
    }

    impl MockEmbedder {
        /// Create a mock embedder producing vectors of `dimension` length.
        fn new(dimension: usize) -> Self {
            Self { dimension }
        }
    }
806
807    #[async_trait::async_trait]
808    impl Embedder for MockEmbedder {
809        fn model_name(&self) -> &str {
810            "mock-embedder"
811        }
812
813        fn dimension(&self) -> usize {
814            self.dimension
815        }
816
817        fn max_tokens(&self) -> usize {
818            512
819        }
820
821        fn modalities(&self) -> &[Modality] {
822            &[Modality::Text]
823        }
824
825        async fn embed_text(
826            &self,
827            texts: &[&str],
828            _config: &EmbeddingConfig,
829        ) -> std::result::Result<Vec<EmbeddingOutput>, EmbedError> {
830            Ok(texts
831                .iter()
832                .map(|_| EmbeddingOutput {
833                    embedding: vec![0.1; self.dimension],
834                    token_count: 10,
835                })
836                .collect())
837        }
838
839        async fn embed_query(
840            &self,
841            _query: &str,
842            _config: &EmbeddingConfig,
843        ) -> std::result::Result<EmbeddingOutput, EmbedError> {
844            Ok(EmbeddingOutput {
845                embedding: vec![0.1; self.dimension],
846                token_count: 10,
847            })
848        }
849    }
850
851    // ==================== Mock VectorStore ====================
852
    /// In-memory test double for `VectorStore`.
    struct MockStore {
        // Every chunk ever upserted (no dedup — see `upsert_chunks`).
        chunks: Arc<RwLock<Vec<Chunk>>>,
        // File records keyed by path.
        files: Arc<RwLock<HashMap<PathBuf, FileRecord>>>,
    }

    impl MockStore {
        /// Create an empty mock store.
        fn new() -> Self {
            Self {
                chunks: Arc::new(RwLock::new(Vec::new())),
                files: Arc::new(RwLock::new(HashMap::new())),
            }
        }
    }
866
867    #[async_trait::async_trait]
868    impl VectorStore for MockStore {
869        async fn init(&self) -> std::result::Result<(), StoreError> {
870            Ok(())
871        }
872
873        async fn upsert_chunks(&self, chunks: &[Chunk]) -> std::result::Result<(), StoreError> {
874            let mut store = self.chunks.write().await;
875            for chunk in chunks {
876                store.push(chunk.clone());
877            }
878            Ok(())
879        }
880
881        async fn search(
882            &self,
883            _query: SearchQuery,
884        ) -> std::result::Result<Vec<SearchResult>, StoreError> {
885            Ok(vec![])
886        }
887
888        async fn hybrid_search(
889            &self,
890            _query: SearchQuery,
891        ) -> std::result::Result<Vec<SearchResult>, StoreError> {
892            Ok(vec![])
893        }
894
895        async fn delete_by_file_path(&self, path: &Path) -> std::result::Result<u64, StoreError> {
896            let mut chunks = self.chunks.write().await;
897            let initial_len = chunks.len();
898            chunks.retain(|c| c.file_path != path);
899            let deleted = initial_len - chunks.len();
900            let mut files = self.files.write().await;
901            files.remove(path);
902            Ok(deleted as u64)
903        }
904
905        async fn get_file(
906            &self,
907            path: &Path,
908        ) -> std::result::Result<Option<FileRecord>, StoreError> {
909            let files = self.files.read().await;
910            Ok(files.get(path).cloned())
911        }
912
913        async fn upsert_file(&self, record: &FileRecord) -> std::result::Result<(), StoreError> {
914            let mut files = self.files.write().await;
915            files.insert(record.path.clone(), record.clone());
916            Ok(())
917        }
918
919        async fn stats(&self) -> std::result::Result<StoreStats, StoreError> {
920            let chunks = self.chunks.read().await;
921            let files = self.files.read().await;
922            Ok(StoreStats {
923                total_chunks: chunks.len() as u64,
924                total_files: files.len() as u64,
925                index_size_bytes: 0,
926                last_updated: None,
927            })
928        }
929
930        async fn update_file_path(
931            &self,
932            _old_path: &Path,
933            _new_path: &Path,
934        ) -> std::result::Result<u64, StoreError> {
935            Ok(0)
936        }
937
938        async fn get_chunks_for_file(
939            &self,
940            path: &Path,
941        ) -> std::result::Result<Vec<Chunk>, StoreError> {
942            let chunks = self.chunks.read().await;
943            Ok(chunks
944                .iter()
945                .filter(|c| c.file_path == path)
946                .cloned()
947                .collect())
948        }
949
950        async fn get_all_chunks(&self) -> std::result::Result<Vec<Chunk>, StoreError> {
951            let chunks = self.chunks.read().await;
952            Ok(chunks.clone())
953        }
954
955        async fn get_all_files(&self) -> std::result::Result<Vec<FileRecord>, StoreError> {
956            let files = self.files.read().await;
957            Ok(files.values().cloned().collect())
958        }
959    }
960
961    // ==================== Helper function tests ====================
962
963    #[test]
964    fn test_determine_content_type_rust() {
965        let path = PathBuf::from("/test/file.rs");
966        let content = ragfs_core::ExtractedContent {
967            text: "fn main() {}".to_string(),
968            elements: vec![],
969            images: vec![],
970            metadata: ContentMetadataInfo::default(),
971        };
972
973        let result = determine_content_type(&path, "text/x-rust", &content);
974        match result {
975            ContentType::Code { language, .. } => assert_eq!(language, "rust"),
976            _ => panic!("Expected Code content type"),
977        }
978    }
979
980    #[test]
981    fn test_determine_content_type_python() {
982        let path = PathBuf::from("/test/script.py");
983        let content = ragfs_core::ExtractedContent {
984            text: "def main(): pass".to_string(),
985            elements: vec![],
986            images: vec![],
987            metadata: ContentMetadataInfo::default(),
988        };
989
990        let result = determine_content_type(&path, "text/x-python", &content);
991        match result {
992            ContentType::Code { language, .. } => assert_eq!(language, "python"),
993            _ => panic!("Expected Code content type"),
994        }
995    }
996
997    #[test]
998    fn test_determine_content_type_markdown() {
999        let path = PathBuf::from("/test/readme.md");
1000        let content = ragfs_core::ExtractedContent {
1001            text: "# Hello".to_string(),
1002            elements: vec![],
1003            images: vec![],
1004            metadata: ContentMetadataInfo::default(),
1005        };
1006
1007        let result = determine_content_type(&path, "text/markdown", &content);
1008        assert!(matches!(result, ContentType::Markdown));
1009    }
1010
1011    #[test]
1012    fn test_determine_content_type_text() {
1013        let path = PathBuf::from("/test/notes.txt");
1014        let content = ragfs_core::ExtractedContent {
1015            text: "Some text content".to_string(),
1016            elements: vec![],
1017            images: vec![],
1018            metadata: ContentMetadataInfo::default(),
1019        };
1020
1021        let result = determine_content_type(&path, "text/plain", &content);
1022        assert!(matches!(result, ContentType::Text));
1023    }
1024
1025    #[test]
1026    fn test_determine_content_type_javascript() {
1027        let path = PathBuf::from("/test/app.js");
1028        let content = ragfs_core::ExtractedContent {
1029            text: "const x = 1;".to_string(),
1030            elements: vec![],
1031            images: vec![],
1032            metadata: ContentMetadataInfo::default(),
1033        };
1034
1035        let result = determine_content_type(&path, "application/javascript", &content);
1036        match result {
1037            ContentType::Code { language, .. } => assert_eq!(language, "javascript"),
1038            _ => panic!("Expected Code content type"),
1039        }
1040    }
1041
1042    #[tokio::test]
1043    async fn test_compute_hash() {
1044        let temp_dir = tempdir().unwrap();
1045        let file_path = temp_dir.path().join("test.txt");
1046        std::fs::write(&file_path, "test content").unwrap();
1047
1048        let hash = compute_hash(&file_path).await.unwrap();
1049        assert!(!hash.is_empty());
1050        assert_eq!(hash.len(), 64); // blake3 hex is 64 chars
1051
1052        // Same content should produce same hash
1053        let hash2 = compute_hash(&file_path).await.unwrap();
1054        assert_eq!(hash, hash2);
1055    }
1056
1057    #[tokio::test]
1058    async fn test_compute_hash_different_content() {
1059        let temp_dir = tempdir().unwrap();
1060
1061        let file1 = temp_dir.path().join("file1.txt");
1062        let file2 = temp_dir.path().join("file2.txt");
1063
1064        std::fs::write(&file1, "content 1").unwrap();
1065        std::fs::write(&file2, "content 2").unwrap();
1066
1067        let hash1 = compute_hash(&file1).await.unwrap();
1068        let hash2 = compute_hash(&file2).await.unwrap();
1069
1070        assert_ne!(hash1, hash2);
1071    }
1072
1073    #[test]
1074    fn test_build_chunk() {
1075        use ragfs_core::ChunkOutputMetadata;
1076
1077        let file_id = Uuid::new_v4();
1078        let file_path = PathBuf::from("/test/file.txt");
1079        let chunk_output = ChunkOutput {
1080            content: "Test chunk content".to_string(),
1081            byte_range: 0..18,
1082            line_range: Some(0..1),
1083            parent_index: None,
1084            depth: 0,
1085            metadata: ChunkOutputMetadata::default(),
1086        };
1087        let embedding = vec![0.1; TEST_DIM];
1088        let content_type = ContentType::Text;
1089        let mime_type = "text/plain";
1090        let model_name = "test-model";
1091        let now = Utc::now();
1092
1093        let chunk = build_chunk(
1094            file_id,
1095            &file_path,
1096            0,
1097            chunk_output,
1098            embedding.clone(),
1099            &content_type,
1100            mime_type,
1101            model_name,
1102            now,
1103        );
1104
1105        assert_eq!(chunk.file_id, file_id);
1106        assert_eq!(chunk.file_path, file_path);
1107        assert_eq!(chunk.chunk_index, 0);
1108        assert_eq!(chunk.content, "Test chunk content");
1109        assert_eq!(chunk.embedding, Some(embedding));
1110        assert_eq!(chunk.mime_type, Some("text/plain".to_string()));
1111        assert!(matches!(chunk.content_type, ContentType::Text));
1112    }
1113
1114    // ==================== IndexerService tests ====================
1115
1116    fn create_test_indexer(store: Arc<dyn VectorStore>) -> IndexerService {
1117        use ragfs_extract::TextExtractor;
1118
1119        let mut extractors = ExtractorRegistry::new();
1120        extractors.register("text", TextExtractor::new());
1121        let extractors = Arc::new(extractors);
1122
1123        let mut chunkers = ChunkerRegistry::new();
1124        chunkers.register("fixed", FixedSizeChunker::new());
1125        chunkers.set_default("fixed");
1126        let chunkers = Arc::new(chunkers);
1127
1128        let embedder = Arc::new(MockEmbedder::new(TEST_DIM));
1129        let embedder_pool = Arc::new(EmbedderPool::new(embedder, 1));
1130
1131        let config = IndexerConfig::default();
1132
1133        IndexerService::new(
1134            PathBuf::from("/tmp"),
1135            store,
1136            extractors,
1137            chunkers,
1138            embedder_pool,
1139            config,
1140        )
1141    }
1142
1143    #[tokio::test]
1144    async fn test_process_single_file() {
1145        let temp_dir = tempdir().unwrap();
1146        let file_path = temp_dir.path().join("test.txt");
1147        std::fs::write(&file_path, "This is test content for indexing.").unwrap();
1148
1149        let store = Arc::new(MockStore::new());
1150        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1151
1152        let chunk_count = indexer.process_single(&file_path).await.unwrap();
1153
1154        assert!(chunk_count > 0, "Should have created at least one chunk");
1155
1156        // Verify chunks were stored
1157        let stored_chunks = store.chunks.read().await;
1158        assert!(!stored_chunks.is_empty());
1159        assert!(stored_chunks.iter().all(|c| c.file_path == file_path));
1160
1161        // Verify file record was stored
1162        let files = store.files.read().await;
1163        assert!(files.contains_key(&file_path));
1164        let file_record = files.get(&file_path).unwrap();
1165        assert_eq!(file_record.chunk_count, chunk_count);
1166        assert_eq!(file_record.status, FileStatus::Indexed);
1167    }
1168
1169    #[tokio::test]
1170    async fn test_process_single_skip_already_indexed() {
1171        let temp_dir = tempdir().unwrap();
1172        let file_path = temp_dir.path().join("test.txt");
1173        std::fs::write(&file_path, "Test content").unwrap();
1174
1175        let store = Arc::new(MockStore::new());
1176        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1177
1178        // First indexing
1179        let chunk_count1 = indexer.process_single(&file_path).await.unwrap();
1180
1181        // Count chunks after first indexing
1182        let _chunks_after_first = store.chunks.read().await.len();
1183
1184        // Second indexing (should skip because content hasn't changed)
1185        let chunk_count2 = indexer.process_single(&file_path).await.unwrap();
1186
1187        assert_eq!(chunk_count1, chunk_count2);
1188
1189        // Chunk count should be same (old chunks deleted and new ones added if reindexed,
1190        // or no change if skipped)
1191        let chunks_after_second = store.chunks.read().await.len();
1192        // Note: process_file deletes old chunks before adding new ones, so count may vary
1193        assert!(chunks_after_second > 0);
1194    }
1195
1196    #[tokio::test]
1197    async fn test_needs_reindex_new_file() {
1198        let temp_dir = tempdir().unwrap();
1199        let file_path = temp_dir.path().join("new_file.txt");
1200        std::fs::write(&file_path, "New content").unwrap();
1201
1202        let store = Arc::new(MockStore::new());
1203        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1204
1205        // File not in store should need reindex
1206        let needs = indexer.needs_reindex(&file_path).await.unwrap();
1207        assert!(needs);
1208    }
1209
1210    #[tokio::test]
1211    async fn test_needs_reindex_unchanged_file() {
1212        let temp_dir = tempdir().unwrap();
1213        let file_path = temp_dir.path().join("unchanged.txt");
1214        std::fs::write(&file_path, "Unchanged content").unwrap();
1215
1216        let store = Arc::new(MockStore::new());
1217        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1218
1219        // Index the file first
1220        indexer.process_single(&file_path).await.unwrap();
1221
1222        // Same content should not need reindex
1223        let needs = indexer.needs_reindex(&file_path).await.unwrap();
1224        assert!(!needs);
1225    }
1226
1227    #[tokio::test]
1228    async fn test_needs_reindex_modified_file() {
1229        let temp_dir = tempdir().unwrap();
1230        let file_path = temp_dir.path().join("modified.txt");
1231        std::fs::write(&file_path, "Original content").unwrap();
1232
1233        let store = Arc::new(MockStore::new());
1234        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1235
1236        // Index the file
1237        indexer.process_single(&file_path).await.unwrap();
1238
1239        // Modify the file
1240        std::fs::write(&file_path, "Modified content - different!").unwrap();
1241
1242        // Modified file should need reindex
1243        let needs = indexer.needs_reindex(&file_path).await.unwrap();
1244        assert!(needs);
1245    }
1246
1247    #[tokio::test]
1248    async fn test_reindex_path_file() {
1249        let temp_dir = tempdir().unwrap();
1250        let file_path = temp_dir.path().join("reindex_test.txt");
1251        std::fs::write(&file_path, "Original content for reindex test").unwrap();
1252
1253        let store = Arc::new(MockStore::new());
1254        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1255
1256        // Index initially
1257        indexer.process_single(&file_path).await.unwrap();
1258
1259        let initial_chunks = store.chunks.read().await.len();
1260        assert!(initial_chunks > 0);
1261
1262        // Modify content
1263        std::fs::write(&file_path, "New content after modification").unwrap();
1264
1265        // Reindex
1266        indexer.reindex_path(&file_path).await.unwrap();
1267
1268        // Verify file was reindexed
1269        let files = store.files.read().await;
1270        assert!(files.contains_key(&file_path));
1271    }
1272
1273    #[tokio::test]
1274    async fn test_empty_file_returns_zero_chunks() {
1275        let temp_dir = tempdir().unwrap();
1276        let file_path = temp_dir.path().join("empty.txt");
1277        std::fs::write(&file_path, "").unwrap();
1278
1279        let store = Arc::new(MockStore::new());
1280        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1281
1282        let chunk_count = indexer.process_single(&file_path).await.unwrap();
1283
1284        assert_eq!(chunk_count, 0, "Empty file should produce zero chunks");
1285    }
1286
1287    #[test]
1288    fn test_indexer_config_default() {
1289        let config = IndexerConfig::default();
1290
1291        assert!(!config.include_patterns.is_empty());
1292        assert!(config.include_patterns.contains(&"**/*".to_string()));
1293
1294        assert!(!config.exclude_patterns.is_empty());
1295        assert!(config.exclude_patterns.contains(&"**/.git/**".to_string()));
1296        assert!(
1297            config
1298                .exclude_patterns
1299                .contains(&"**/node_modules/**".to_string())
1300        );
1301        assert!(
1302            config
1303                .exclude_patterns
1304                .contains(&"**/target/**".to_string())
1305        );
1306    }
1307
1308    #[tokio::test]
1309    async fn test_subscribe_receives_updates() {
1310        let store = Arc::new(MockStore::new());
1311        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);
1312
1313        let mut receiver = indexer.subscribe();
1314
1315        // Note: This test is limited because we can't easily trigger updates
1316        // without starting the full indexer service. The subscribe mechanism
1317        // is tested indirectly through the update_tx.send() calls in process_file.
1318
1319        // Verify we can create a receiver without panicking
1320        assert!(receiver.try_recv().is_err()); // No messages yet
1321    }
1322
1323    #[test]
1324    fn test_index_update_variants() {
1325        // Test that all IndexUpdate variants can be created
1326        let _indexed = IndexUpdate::FileIndexed {
1327            path: PathBuf::from("/test"),
1328            chunk_count: 5,
1329        };
1330
1331        let _removed = IndexUpdate::FileRemoved {
1332            path: PathBuf::from("/test"),
1333        };
1334
1335        let _error = IndexUpdate::FileError {
1336            path: PathBuf::from("/test"),
1337            error: "test error".to_string(),
1338        };
1339
1340        let _started = IndexUpdate::IndexingStarted {
1341            path: PathBuf::from("/test"),
1342        };
1343    }
1344}