1use chrono::Utc;
4use ragfs_chunker::ChunkerRegistry;
5use ragfs_core::{
6 Chunk, ChunkConfig, ChunkMetadata, ChunkOutput, ContentType, EmbeddingConfig, Error, FileEvent,
7 FileRecord, FileStatus, IndexStats, Indexer, Result, VectorStore,
8};
9use ragfs_embed::EmbedderPool;
10use ragfs_extract::ExtractorRegistry;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::sync::{RwLock, broadcast, mpsc};
14use tracing::{debug, error, info, warn};
15use uuid::Uuid;
16
17use crate::watcher::FileWatcher;
18
/// Progress notification broadcast to subscribers while the indexer works.
#[derive(Debug, Clone)]
pub enum IndexUpdate {
    /// A file was (re)indexed successfully; `chunk_count` chunks were stored.
    FileIndexed { path: PathBuf, chunk_count: u32 },
    /// A file's chunks were removed from the store (file deleted).
    FileRemoved { path: PathBuf },
    /// Indexing the file failed; `error` is the display form of the cause.
    FileError { path: PathBuf, error: String },
    /// Indexing of the file has begun.
    IndexingStarted { path: PathBuf },
}
27
/// Tunable settings for the indexing pipeline.
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// Chunking parameters handed to the chunker registry.
    pub chunk_config: ChunkConfig,
    /// Embedding parameters handed to the embedder pool.
    pub embed_config: EmbeddingConfig,
    /// Glob-style patterns of files to include.
    /// NOTE(review): not consulted by any code visible in this module
    /// (only `exclude_patterns` is checked) — confirm intended use.
    pub include_patterns: Vec<String>,
    /// Glob-style patterns of files/directories to skip during scans.
    pub exclude_patterns: Vec<String>,
}
40
41impl Default for IndexerConfig {
42 fn default() -> Self {
43 Self {
44 chunk_config: ChunkConfig::default(),
45 embed_config: EmbeddingConfig::default(),
46 include_patterns: vec!["**/*".to_string()],
47 exclude_patterns: vec![
48 "**/.*".to_string(),
49 "**/.git/**".to_string(),
50 "**/node_modules/**".to_string(),
51 "**/target/**".to_string(),
52 "**/__pycache__/**".to_string(),
53 "**/*.lock".to_string(),
54 ],
55 }
56 }
57}
58
/// Watches a directory tree and keeps a vector store in sync with its files.
pub struct IndexerService {
    /// Root directory being indexed.
    root: PathBuf,
    /// Destination store for chunks and file records.
    store: Arc<dyn VectorStore>,
    /// Content extractors (selected by mime type).
    extractors: Arc<ExtractorRegistry>,
    /// Chunking strategies (selected by content type).
    chunkers: Arc<ChunkerRegistry>,
    /// Pool of embedding workers.
    embedder: Arc<EmbedderPool>,
    /// Pipeline configuration.
    config: IndexerConfig,
    /// Running counters exposed via `Indexer::stats`.
    stats: Arc<RwLock<IndexStats>>,
    /// Producer half of the file-event queue (watcher and scans push here).
    event_tx: mpsc::Sender<FileEvent>,
    /// Consumer half; locked for the lifetime of the event loop in `start`.
    event_rx: Arc<RwLock<mpsc::Receiver<FileEvent>>>,
    /// Broadcast channel for `IndexUpdate` progress notifications.
    update_tx: broadcast::Sender<IndexUpdate>,
    /// Filesystem watcher, created lazily in `start`.
    watcher: Arc<RwLock<Option<FileWatcher>>>,
    /// Soft "keep running" flag checked by the event loop between events.
    running: Arc<RwLock<bool>>,
}
86
impl IndexerService {
    /// Creates a service rooted at `root`. Nothing runs until [`Self::start`].
    pub fn new(
        root: PathBuf,
        store: Arc<dyn VectorStore>,
        extractors: Arc<ExtractorRegistry>,
        chunkers: Arc<ChunkerRegistry>,
        embedder: Arc<EmbedderPool>,
        config: IndexerConfig,
    ) -> Self {
        // Up to 1024 pending file events; senders back-pressure when full.
        let (event_tx, event_rx) = mpsc::channel(1024);
        // Broadcast buffer of 256 updates; slow subscribers may lag and drop.
        let (update_tx, _) = broadcast::channel(256);

        Self {
            root,
            store,
            extractors,
            chunkers,
            embedder,
            config,
            stats: Arc::new(RwLock::new(IndexStats::default())),
            event_tx,
            event_rx: Arc::new(RwLock::new(event_rx)),
            update_tx,
            watcher: Arc::new(RwLock::new(None)),
            running: Arc::new(RwLock::new(false)),
        }
    }

    /// Subscribes to progress updates emitted while files are processed.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<IndexUpdate> {
        self.update_tx.subscribe()
    }

    /// Root directory this service indexes.
    #[must_use]
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Initializes the store, installs the filesystem watcher, spawns the
    /// background event loop, then performs an initial recursive scan.
    ///
    /// Idempotent: returns `Ok(())` immediately if already running.
    ///
    /// # Errors
    /// Fails if store initialization, watcher creation/registration, or the
    /// initial scan task fails.
    pub async fn start(&self) -> Result<()> {
        let mut running = self.running.write().await;
        if *running {
            return Ok(());
        }
        *running = true;
        drop(running);

        info!("Starting indexer for {:?}", self.root);

        self.store.init().await.map_err(Error::Store)?;

        // 500 ms debounce on filesystem events.
        let watcher =
            FileWatcher::new(self.event_tx.clone(), std::time::Duration::from_millis(500))
                .map_err(|e| Error::Other(format!("watcher error: {e}")))?;
        {
            let mut w = self.watcher.write().await;
            *w = Some(watcher);
        }

        // NOTE(review): the watcher is stored and then immediately re-locked
        // to call `watch`; both steps could share one write-lock acquisition.
        {
            let mut w = self.watcher.write().await;
            if let Some(ref mut watcher) = *w {
                watcher
                    .watch(&self.root)
                    .map_err(|e| Error::Other(format!("watch error: {e}")))?;
            }
        }

        // Clone everything the background event loop needs.
        let event_rx = Arc::clone(&self.event_rx);
        let update_tx = self.update_tx.clone();
        let running = Arc::clone(&self.running);
        let store = Arc::clone(&self.store);
        let extractors = Arc::clone(&self.extractors);
        let chunkers = Arc::clone(&self.chunkers);
        let embedder = Arc::clone(&self.embedder);
        let config = self.config.clone();
        let stats = Arc::clone(&self.stats);

        tokio::spawn(async move {
            // Hold the receiver lock for the life of the loop: only one
            // event loop can consume events at a time.
            let mut rx = event_rx.write().await;
            // NOTE(review): `running` is only re-checked after an event
            // arrives, so `stop()` takes effect on the *next* event (or on
            // channel close), not immediately.
            while *running.read().await {
                match rx.recv().await {
                    Some(event) => {
                        debug!("Received file event: {:?}", event);
                        match &event {
                            FileEvent::Created(path) | FileEvent::Modified(path) => {
                                let _ = update_tx
                                    .send(IndexUpdate::IndexingStarted { path: path.clone() });

                                match process_file(
                                    path,
                                    &store,
                                    &extractors,
                                    &chunkers,
                                    &embedder,
                                    &config,
                                )
                                .await
                                {
                                    Ok(chunk_count) => {
                                        info!("Indexed {:?} ({} chunks)", path, chunk_count);
                                        let _ = update_tx.send(IndexUpdate::FileIndexed {
                                            path: path.clone(),
                                            chunk_count,
                                        });

                                        // Update running counters.
                                        let mut s = stats.write().await;
                                        s.indexed_files += 1;
                                        s.total_chunks += u64::from(chunk_count);
                                        s.last_update = Some(Utc::now());
                                    }
                                    Err(e) => {
                                        error!("Failed to index {:?}: {}", path, e);
                                        let _ = update_tx.send(IndexUpdate::FileError {
                                            path: path.clone(),
                                            error: e.to_string(),
                                        });

                                        let mut s = stats.write().await;
                                        s.error_files += 1;
                                    }
                                }
                            }
                            FileEvent::Deleted(path) => {
                                if let Err(e) = store.delete_by_file_path(path).await {
                                    error!("Failed to delete {:?}: {}", path, e);
                                } else {
                                    let _ = update_tx
                                        .send(IndexUpdate::FileRemoved { path: path.clone() });
                                }
                            }
                            FileEvent::Renamed { from, to } => {
                                // Best-effort removal of the old path, then
                                // index the new path from scratch.
                                if let Err(e) = store.delete_by_file_path(from).await {
                                    warn!("Failed to delete old path {:?}: {}", from, e);
                                }
                                let _ = update_tx
                                    .send(IndexUpdate::IndexingStarted { path: to.clone() });

                                match process_file(
                                    to,
                                    &store,
                                    &extractors,
                                    &chunkers,
                                    &embedder,
                                    &config,
                                )
                                .await
                                {
                                    Ok(chunk_count) => {
                                        let _ = update_tx.send(IndexUpdate::FileIndexed {
                                            path: to.clone(),
                                            chunk_count,
                                        });
                                    }
                                    Err(e) => {
                                        let _ = update_tx.send(IndexUpdate::FileError {
                                            path: to.clone(),
                                            error: e.to_string(),
                                        });
                                    }
                                }
                            }
                        }
                    }
                    // Channel closed: all senders dropped, loop ends.
                    None => break,
                }
            }
        });

        // Initial full scan; discovered files arrive as `Created` events.
        self.scan().await?;

        Ok(())
    }

    /// Walks the root directory on a blocking thread and queues a `Created`
    /// event for every non-excluded file found.
    async fn scan(&self) -> Result<()> {
        info!("Scanning {:?}", self.root);

        let root = self.root.clone();
        let event_tx = self.event_tx.clone();
        let exclude_patterns = self.config.exclude_patterns.clone();

        // Directory traversal is synchronous I/O — run it off the async
        // runtime's worker threads.
        tokio::task::spawn_blocking(move || {
            scan_directory(&root, &event_tx, &exclude_patterns);
        })
        .await
        .map_err(|e| Error::Other(format!("scan task failed: {e}")))?;

        Ok(())
    }

    /// Runs the extract→chunk→embed→store pipeline for one file inline
    /// (bypassing the event queue) and returns the stored chunk count.
    pub async fn process_single(&self, path: &Path) -> Result<u32> {
        process_file(
            path,
            &self.store,
            &self.extractors,
            &self.chunkers,
            &self.embedder,
            &self.config,
        )
        .await
    }

    /// Forces a re-index of `path`: drops any existing chunks first, then
    /// re-processes the file (or, for a directory, every file under it).
    ///
    /// # Errors
    /// Fails if the path does not exist or if re-processing a single file
    /// fails; per-file failures inside a directory are logged, not returned.
    pub async fn reindex_path(&self, path: &Path) -> Result<()> {
        if !path.exists() {
            return Err(Error::Other(format!(
                "Path does not exist: {}",
                path.display()
            )));
        }

        if path.is_file() {
            // Best-effort: stale chunks are also deleted inside process_file.
            let _ = self.store.delete_by_file_path(path).await;

            let _ = self.update_tx.send(IndexUpdate::IndexingStarted {
                path: path.to_path_buf(),
            });

            match process_file(
                path,
                &self.store,
                &self.extractors,
                &self.chunkers,
                &self.embedder,
                &self.config,
            )
            .await
            {
                Ok(chunk_count) => {
                    info!("Reindexed {:?} ({} chunks)", path, chunk_count);
                    let _ = self.update_tx.send(IndexUpdate::FileIndexed {
                        path: path.to_path_buf(),
                        chunk_count,
                    });
                }
                Err(e) => {
                    error!("Failed to reindex {:?}: {}", path, e);
                    let _ = self.update_tx.send(IndexUpdate::FileError {
                        path: path.to_path_buf(),
                        error: e.to_string(),
                    });
                    return Err(e);
                }
            }
        } else if path.is_dir() {
            info!("Reindexing directory {:?}", path);
            self.reindex_directory(path).await?;
        }

        Ok(())
    }

    /// Recursively re-indexes every non-excluded file under `dir`.
    /// Individual file failures are logged and broadcast but do not abort
    /// the traversal.
    async fn reindex_directory(&self, dir: &Path) -> Result<()> {
        let entries = tokio::fs::read_dir(dir)
            .await
            .map_err(|e| Error::Other(format!("Failed to read directory: {e}")))?;

        let mut entries_stream = tokio_stream::wrappers::ReadDirStream::new(entries);

        use tokio_stream::StreamExt;
        while let Some(entry) = entries_stream.next().await {
            let entry = entry.map_err(|e| Error::Other(format!("Failed to read entry: {e}")))?;
            let path = entry.path();

            let path_str = path.to_string_lossy();
            // NOTE(review): this ad-hoc glob check only handles patterns with
            // exactly one `**` (two split parts). Defaults such as
            // `**/.git/**` split on "**" into three parts, hit the `else`
            // branch, and are never excluded here. Duplicated in
            // `scan_directory` — consider sharing one (correct) matcher.
            let should_exclude = self.config.exclude_patterns.iter().any(|pattern| {
                if pattern.contains("**") {
                    let parts: Vec<&str> = pattern.split("**").collect();
                    if parts.len() == 2 {
                        let prefix = parts[0].trim_matches('/');
                        let suffix = parts[1].trim_matches('/');
                        (prefix.is_empty() || path_str.contains(prefix))
                            && (suffix.is_empty() || path_str.contains(suffix))
                    } else {
                        false
                    }
                } else if pattern.starts_with('*') {
                    path_str.ends_with(pattern.trim_start_matches('*'))
                } else {
                    path_str.contains(pattern.trim_matches('*'))
                }
            });

            if should_exclude {
                continue;
            }

            if path.is_dir() {
                // Box::pin: async recursion needs a heap-allocated future.
                Box::pin(self.reindex_directory(&path)).await?;
            } else if path.is_file() {
                let _ = self.store.delete_by_file_path(&path).await;

                let _ = self
                    .update_tx
                    .send(IndexUpdate::IndexingStarted { path: path.clone() });

                match process_file(
                    &path,
                    &self.store,
                    &self.extractors,
                    &self.chunkers,
                    &self.embedder,
                    &self.config,
                )
                .await
                {
                    Ok(chunk_count) => {
                        info!("Reindexed {:?} ({} chunks)", path, chunk_count);
                        let _ = self.update_tx.send(IndexUpdate::FileIndexed {
                            path: path.clone(),
                            chunk_count,
                        });
                    }
                    Err(e) => {
                        warn!("Failed to reindex {:?}: {}", path, e);
                        let _ = self.update_tx.send(IndexUpdate::FileError {
                            path: path.clone(),
                            error: e.to_string(),
                        });
                    }
                }
            }
        }

        Ok(())
    }
}
439
440fn scan_directory(root: &Path, event_tx: &mpsc::Sender<FileEvent>, exclude_patterns: &[String]) {
442 use std::fs;
443
444 fn visit_dir(dir: &Path, event_tx: &mpsc::Sender<FileEvent>, exclude_patterns: &[String]) {
445 let entries = match fs::read_dir(dir) {
446 Ok(e) => e,
447 Err(e) => {
448 warn!("Cannot read directory {:?}: {}", dir, e);
449 return;
450 }
451 };
452
453 for entry in entries.flatten() {
454 let path = entry.path();
455
456 let path_str = path.to_string_lossy();
458 let should_exclude = exclude_patterns.iter().any(|pattern| {
459 if pattern.contains("**") {
461 let parts: Vec<&str> = pattern.split("**").collect();
462 if parts.len() == 2 {
463 let prefix = parts[0].trim_matches('/');
464 let suffix = parts[1].trim_matches('/');
465 (prefix.is_empty() || path_str.contains(prefix))
466 && (suffix.is_empty() || path_str.contains(suffix))
467 } else {
468 false
469 }
470 } else if pattern.starts_with('*') {
471 path_str.ends_with(pattern.trim_start_matches('*'))
472 } else {
473 path_str.contains(pattern.trim_matches('*'))
474 }
475 });
476
477 if should_exclude {
478 continue;
479 }
480
481 if path.is_dir() {
482 visit_dir(&path, event_tx, exclude_patterns);
483 } else if path.is_file() {
484 if let Err(e) = event_tx.blocking_send(FileEvent::Created(path.clone())) {
486 warn!("Failed to queue file {:?}: {}", path, e);
487 }
488 }
489 }
490 }
491
492 visit_dir(root, event_tx, exclude_patterns);
493}
494
/// Runs the full pipeline for one file: metadata check → content-hash dedup →
/// extract → chunk → embed → store chunks → store file record.
///
/// Returns the number of chunks written. Returns 0 (without error) when the
/// path is not a regular file, extraction yields no text, or chunking yields
/// no chunks; returns the previously stored chunk count when the content hash
/// matches an already-indexed record.
async fn process_file(
    path: &Path,
    store: &Arc<dyn VectorStore>,
    extractors: &Arc<ExtractorRegistry>,
    chunkers: &Arc<ChunkerRegistry>,
    embedder: &Arc<EmbedderPool>,
    config: &IndexerConfig,
) -> Result<u32> {
    let metadata = tokio::fs::metadata(path)
        .await
        .map_err(|e| Error::Other(format!("Failed to get metadata: {e}")))?;

    // Directories and other non-regular files are silently skipped.
    if !metadata.is_file() {
        return Ok(0);
    }

    let content_hash = compute_hash(path).await?;

    // Skip re-indexing when the stored record matches the current content
    // and was fully indexed; store lookup errors fall through to re-index.
    if let Ok(Some(existing)) = store.get_file(path).await
        && existing.content_hash == content_hash
        && existing.status == FileStatus::Indexed
    {
        debug!("File {:?} already indexed, skipping", path);
        return Ok(existing.chunk_count);
    }

    // Mime type guessed from the extension; defaults to text/plain.
    let mime_type = mime_guess::from_path(path)
        .first_or_text_plain()
        .to_string();

    let content = extractors
        .extract(path, &mime_type)
        .await
        .map_err(Error::Extraction)?;

    if content.text.is_empty() {
        debug!("Empty content for {:?}, skipping", path);
        return Ok(0);
    }

    let content_type = determine_content_type(path, &mime_type, &content);

    let chunk_outputs = chunkers
        .chunk(&content, &content_type, &config.chunk_config)
        .await
        .map_err(Error::Chunking)?;

    if chunk_outputs.is_empty() {
        return Ok(0);
    }

    // Embed all chunk texts in a single batch.
    let texts: Vec<&str> = chunk_outputs.iter().map(|c| c.content.as_str()).collect();

    let embeddings = embedder
        .embed_batch(&texts, &config.embed_config)
        .await
        .map_err(Error::Embedding)?;

    let file_id = Uuid::new_v4();
    let now = Utc::now();
    let model_name = embedder.model_name().to_string();

    // Pair each chunk with its embedding; order is preserved by zip.
    let chunks: Vec<Chunk> = chunk_outputs
        .into_iter()
        .zip(embeddings.into_iter())
        .enumerate()
        .map(|(idx, (output, emb_output))| {
            build_chunk(
                file_id,
                path,
                idx as u32,
                output,
                emb_output.embedding,
                &content_type,
                &mime_type,
                &model_name,
                now,
            )
        })
        .collect();

    let chunk_count = chunks.len() as u32;

    // Best-effort removal of stale chunks before inserting the fresh set.
    // NOTE(review): delete + upsert is not atomic; a crash in between leaves
    // the file with no chunks until the next scan.
    let _ = store.delete_by_file_path(path).await;

    store.upsert_chunks(&chunks).await.map_err(Error::Store)?;

    let file_record = FileRecord {
        id: file_id,
        path: path.to_path_buf(),
        size_bytes: metadata.len(),
        mime_type,
        content_hash,
        // Fall back to "now" if the platform can't report mtime.
        modified_at: metadata
            .modified()
            .map_or_else(|_| now, chrono::DateTime::<Utc>::from),
        indexed_at: Some(now),
        chunk_count,
        status: FileStatus::Indexed,
        error_message: None,
    };

    store
        .upsert_file(&file_record)
        .await
        .map_err(Error::Store)?;

    Ok(chunk_count)
}
618
619async fn compute_hash(path: &Path) -> Result<String> {
621 let content = tokio::fs::read(path)
622 .await
623 .map_err(|e| Error::Other(format!("Failed to read file: {e}")))?;
624
625 let hash = blake3::hash(&content);
626 Ok(hash.to_hex().to_string())
627}
628
629fn determine_content_type(
631 path: &Path,
632 mime_type: &str,
633 content: &ragfs_core::ExtractedContent,
634) -> ContentType {
635 let code_extensions = [
637 ("rs", "rust"),
638 ("py", "python"),
639 ("js", "javascript"),
640 ("ts", "typescript"),
641 ("tsx", "typescript"),
642 ("jsx", "javascript"),
643 ("java", "java"),
644 ("go", "go"),
645 ("c", "c"),
646 ("cpp", "cpp"),
647 ("h", "c"),
648 ("hpp", "cpp"),
649 ("rb", "ruby"),
650 ("php", "php"),
651 ("swift", "swift"),
652 ("kt", "kotlin"),
653 ("scala", "scala"),
654 ("ex", "elixir"),
655 ("hs", "haskell"),
656 ("ml", "ocaml"),
657 ("lua", "lua"),
658 ("sh", "bash"),
659 ("sql", "sql"),
660 ];
661
662 if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
663 let ext_lower = ext.to_lowercase();
664 for (code_ext, lang) in &code_extensions {
665 if ext_lower == *code_ext {
666 return ContentType::Code {
667 language: (*lang).to_string(),
668 symbol: None,
669 };
670 }
671 }
672 }
673
674 if mime_type.contains("markdown")
676 || path
677 .extension()
678 .is_some_and(|e| e == "md" || e == "markdown")
679 {
680 return ContentType::Markdown;
681 }
682
683 if let Some(ref lang) = content.metadata.language
685 && !lang.is_empty()
686 {
687 return ContentType::Code {
688 language: lang.clone(),
689 symbol: None,
690 };
691 }
692
693 ContentType::Text
694}
695
696fn build_chunk(
698 file_id: Uuid,
699 file_path: &Path,
700 chunk_index: u32,
701 output: ChunkOutput,
702 embedding: Vec<f32>,
703 content_type: &ContentType,
704 mime_type: &str,
705 model_name: &str,
706 now: chrono::DateTime<Utc>,
707) -> Chunk {
708 Chunk {
709 id: Uuid::new_v4(),
710 file_id,
711 file_path: file_path.to_path_buf(),
712 content: output.content,
713 content_type: content_type.clone(),
714 mime_type: Some(mime_type.to_string()),
715 chunk_index,
716 byte_range: output.byte_range,
717 line_range: output.line_range,
718 parent_chunk_id: None,
719 depth: output.depth,
720 embedding: Some(embedding),
721 metadata: ChunkMetadata {
722 embedding_model: Some(model_name.to_string()),
723 indexed_at: Some(now),
724 token_count: None,
725 extra: Default::default(),
726 },
727 }
728}
729
#[async_trait::async_trait]
impl Indexer for IndexerService {
    /// Adds `path` to the filesystem watcher. No-op when `start` has not
    /// created a watcher yet.
    async fn watch(&self, path: &Path) -> Result<()> {
        let mut w = self.watcher.write().await;
        if let Some(ref mut watcher) = *w {
            watcher
                .watch(path)
                .map_err(|e| Error::Other(format!("watch error: {e}")))?;
        }
        Ok(())
    }

    /// Clears the running flag.
    /// NOTE(review): the event loop in `start` only checks this flag between
    /// events, so it keeps waiting on the channel until the next event (or
    /// channel close) before actually exiting.
    async fn stop(&self) -> Result<()> {
        let mut running = self.running.write().await;
        *running = false;
        info!("Indexer stopped");
        Ok(())
    }

    /// Queues `path` for (re)indexing via the event loop. With `force`, any
    /// existing chunks are dropped first (best-effort) so the hash-dedup
    /// check in `process_file` cannot skip the file... the file record is
    /// also removed by `delete_by_file_path`, forcing a full re-index.
    async fn index(&self, path: &Path, force: bool) -> Result<()> {
        if force {
            let _ = self.store.delete_by_file_path(path).await;
        }

        self.event_tx
            .send(FileEvent::Modified(path.to_path_buf()))
            .await
            .map_err(|e| Error::Other(format!("send error: {e}")))?;
        Ok(())
    }

    /// Snapshot of the running counters.
    async fn stats(&self) -> Result<IndexStats> {
        Ok(self.stats.read().await.clone())
    }

    /// True when the file is unknown to the store or its current content
    /// hash differs from the stored one.
    async fn needs_reindex(&self, path: &Path) -> Result<bool> {
        match self.store.get_file(path).await {
            Ok(Some(record)) => {
                let current_hash = compute_hash(path).await?;
                Ok(record.content_hash != current_hash)
            }
            Ok(None) => Ok(true),
            Err(e) => Err(Error::Store(e)),
        }
    }
}
779
#[cfg(test)]
mod tests {
    use super::*;
    use ragfs_chunker::{ChunkerRegistry, FixedSizeChunker};
    use ragfs_core::{
        ContentMetadataInfo, EmbedError, Embedder, EmbeddingConfig, EmbeddingOutput, Modality,
        SearchQuery, SearchResult, StoreError, StoreStats,
    };
    use ragfs_embed::EmbedderPool;
    use ragfs_extract::ExtractorRegistry;
    use std::collections::HashMap;
    use tempfile::tempdir;

    // Embedding dimension used by all mock embeddings in this module.
    const TEST_DIM: usize = 384;

    /// Deterministic embedder: returns a constant vector for every input.
    struct MockEmbedder {
        dimension: usize,
    }

    impl MockEmbedder {
        fn new(dimension: usize) -> Self {
            Self { dimension }
        }
    }

    #[async_trait::async_trait]
    impl Embedder for MockEmbedder {
        fn model_name(&self) -> &str {
            "mock-embedder"
        }

        fn dimension(&self) -> usize {
            self.dimension
        }

        fn max_tokens(&self) -> usize {
            512
        }

        fn modalities(&self) -> &[Modality] {
            &[Modality::Text]
        }

        async fn embed_text(
            &self,
            texts: &[&str],
            _config: &EmbeddingConfig,
        ) -> std::result::Result<Vec<EmbeddingOutput>, EmbedError> {
            // One constant embedding per input text.
            Ok(texts
                .iter()
                .map(|_| EmbeddingOutput {
                    embedding: vec![0.1; self.dimension],
                    token_count: 10,
                })
                .collect())
        }

        async fn embed_query(
            &self,
            _query: &str,
            _config: &EmbeddingConfig,
        ) -> std::result::Result<EmbeddingOutput, EmbedError> {
            Ok(EmbeddingOutput {
                embedding: vec![0.1; self.dimension],
                token_count: 10,
            })
        }
    }

    /// In-memory `VectorStore` backed by a Vec of chunks and a HashMap of
    /// file records; search methods return empty results.
    struct MockStore {
        chunks: Arc<RwLock<Vec<Chunk>>>,
        files: Arc<RwLock<HashMap<PathBuf, FileRecord>>>,
    }

    impl MockStore {
        fn new() -> Self {
            Self {
                chunks: Arc::new(RwLock::new(Vec::new())),
                files: Arc::new(RwLock::new(HashMap::new())),
            }
        }
    }

    #[async_trait::async_trait]
    impl VectorStore for MockStore {
        async fn init(&self) -> std::result::Result<(), StoreError> {
            Ok(())
        }

        async fn upsert_chunks(&self, chunks: &[Chunk]) -> std::result::Result<(), StoreError> {
            let mut store = self.chunks.write().await;
            for chunk in chunks {
                store.push(chunk.clone());
            }
            Ok(())
        }

        async fn search(
            &self,
            _query: SearchQuery,
        ) -> std::result::Result<Vec<SearchResult>, StoreError> {
            Ok(vec![])
        }

        async fn hybrid_search(
            &self,
            _query: SearchQuery,
        ) -> std::result::Result<Vec<SearchResult>, StoreError> {
            Ok(vec![])
        }

        async fn delete_by_file_path(&self, path: &Path) -> std::result::Result<u64, StoreError> {
            // Remove both the chunks and the file record for this path.
            let mut chunks = self.chunks.write().await;
            let initial_len = chunks.len();
            chunks.retain(|c| c.file_path != path);
            let deleted = initial_len - chunks.len();
            let mut files = self.files.write().await;
            files.remove(path);
            Ok(deleted as u64)
        }

        async fn get_file(
            &self,
            path: &Path,
        ) -> std::result::Result<Option<FileRecord>, StoreError> {
            let files = self.files.read().await;
            Ok(files.get(path).cloned())
        }

        async fn upsert_file(&self, record: &FileRecord) -> std::result::Result<(), StoreError> {
            let mut files = self.files.write().await;
            files.insert(record.path.clone(), record.clone());
            Ok(())
        }

        async fn stats(&self) -> std::result::Result<StoreStats, StoreError> {
            let chunks = self.chunks.read().await;
            let files = self.files.read().await;
            Ok(StoreStats {
                total_chunks: chunks.len() as u64,
                total_files: files.len() as u64,
                index_size_bytes: 0,
                last_updated: None,
            })
        }

        async fn update_file_path(
            &self,
            _old_path: &Path,
            _new_path: &Path,
        ) -> std::result::Result<u64, StoreError> {
            Ok(0)
        }

        async fn get_chunks_for_file(
            &self,
            path: &Path,
        ) -> std::result::Result<Vec<Chunk>, StoreError> {
            let chunks = self.chunks.read().await;
            Ok(chunks
                .iter()
                .filter(|c| c.file_path == path)
                .cloned()
                .collect())
        }

        async fn get_all_chunks(&self) -> std::result::Result<Vec<Chunk>, StoreError> {
            let chunks = self.chunks.read().await;
            Ok(chunks.clone())
        }

        async fn get_all_files(&self) -> std::result::Result<Vec<FileRecord>, StoreError> {
            let files = self.files.read().await;
            Ok(files.values().cloned().collect())
        }
    }

    // --- determine_content_type ---

    #[test]
    fn test_determine_content_type_rust() {
        let path = PathBuf::from("/test/file.rs");
        let content = ragfs_core::ExtractedContent {
            text: "fn main() {}".to_string(),
            elements: vec![],
            images: vec![],
            metadata: ContentMetadataInfo::default(),
        };

        let result = determine_content_type(&path, "text/x-rust", &content);
        match result {
            ContentType::Code { language, .. } => assert_eq!(language, "rust"),
            _ => panic!("Expected Code content type"),
        }
    }

    #[test]
    fn test_determine_content_type_python() {
        let path = PathBuf::from("/test/script.py");
        let content = ragfs_core::ExtractedContent {
            text: "def main(): pass".to_string(),
            elements: vec![],
            images: vec![],
            metadata: ContentMetadataInfo::default(),
        };

        let result = determine_content_type(&path, "text/x-python", &content);
        match result {
            ContentType::Code { language, .. } => assert_eq!(language, "python"),
            _ => panic!("Expected Code content type"),
        }
    }

    #[test]
    fn test_determine_content_type_markdown() {
        let path = PathBuf::from("/test/readme.md");
        let content = ragfs_core::ExtractedContent {
            text: "# Hello".to_string(),
            elements: vec![],
            images: vec![],
            metadata: ContentMetadataInfo::default(),
        };

        let result = determine_content_type(&path, "text/markdown", &content);
        assert!(matches!(result, ContentType::Markdown));
    }

    #[test]
    fn test_determine_content_type_text() {
        let path = PathBuf::from("/test/notes.txt");
        let content = ragfs_core::ExtractedContent {
            text: "Some text content".to_string(),
            elements: vec![],
            images: vec![],
            metadata: ContentMetadataInfo::default(),
        };

        let result = determine_content_type(&path, "text/plain", &content);
        assert!(matches!(result, ContentType::Text));
    }

    #[test]
    fn test_determine_content_type_javascript() {
        let path = PathBuf::from("/test/app.js");
        let content = ragfs_core::ExtractedContent {
            text: "const x = 1;".to_string(),
            elements: vec![],
            images: vec![],
            metadata: ContentMetadataInfo::default(),
        };

        let result = determine_content_type(&path, "application/javascript", &content);
        match result {
            ContentType::Code { language, .. } => assert_eq!(language, "javascript"),
            _ => panic!("Expected Code content type"),
        }
    }

    // --- compute_hash ---

    #[tokio::test]
    async fn test_compute_hash() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, "test content").unwrap();

        let hash = compute_hash(&file_path).await.unwrap();
        assert!(!hash.is_empty());
        // blake3 digest is 32 bytes → 64 hex characters.
        assert_eq!(hash.len(), 64);
        // Hashing the same content twice is deterministic.
        let hash2 = compute_hash(&file_path).await.unwrap();
        assert_eq!(hash, hash2);
    }

    #[tokio::test]
    async fn test_compute_hash_different_content() {
        let temp_dir = tempdir().unwrap();

        let file1 = temp_dir.path().join("file1.txt");
        let file2 = temp_dir.path().join("file2.txt");

        std::fs::write(&file1, "content 1").unwrap();
        std::fs::write(&file2, "content 2").unwrap();

        let hash1 = compute_hash(&file1).await.unwrap();
        let hash2 = compute_hash(&file2).await.unwrap();

        assert_ne!(hash1, hash2);
    }

    // --- build_chunk ---

    #[test]
    fn test_build_chunk() {
        use ragfs_core::ChunkOutputMetadata;

        let file_id = Uuid::new_v4();
        let file_path = PathBuf::from("/test/file.txt");
        let chunk_output = ChunkOutput {
            content: "Test chunk content".to_string(),
            byte_range: 0..18,
            line_range: Some(0..1),
            parent_index: None,
            depth: 0,
            metadata: ChunkOutputMetadata::default(),
        };
        let embedding = vec![0.1; TEST_DIM];
        let content_type = ContentType::Text;
        let mime_type = "text/plain";
        let model_name = "test-model";
        let now = Utc::now();

        let chunk = build_chunk(
            file_id,
            &file_path,
            0,
            chunk_output,
            embedding.clone(),
            &content_type,
            mime_type,
            model_name,
            now,
        );

        assert_eq!(chunk.file_id, file_id);
        assert_eq!(chunk.file_path, file_path);
        assert_eq!(chunk.chunk_index, 0);
        assert_eq!(chunk.content, "Test chunk content");
        assert_eq!(chunk.embedding, Some(embedding));
        assert_eq!(chunk.mime_type, Some("text/plain".to_string()));
        assert!(matches!(chunk.content_type, ContentType::Text));
    }

    /// Builds an `IndexerService` wired to the given store, a text extractor,
    /// a fixed-size chunker and the mock embedder.
    fn create_test_indexer(store: Arc<dyn VectorStore>) -> IndexerService {
        use ragfs_extract::TextExtractor;

        let mut extractors = ExtractorRegistry::new();
        extractors.register("text", TextExtractor::new());
        let extractors = Arc::new(extractors);

        let mut chunkers = ChunkerRegistry::new();
        chunkers.register("fixed", FixedSizeChunker::new());
        chunkers.set_default("fixed");
        let chunkers = Arc::new(chunkers);

        let embedder = Arc::new(MockEmbedder::new(TEST_DIM));
        let embedder_pool = Arc::new(EmbedderPool::new(embedder, 1));

        let config = IndexerConfig::default();

        IndexerService::new(
            PathBuf::from("/tmp"),
            store,
            extractors,
            chunkers,
            embedder_pool,
            config,
        )
    }

    // --- end-to-end pipeline against the mock store ---

    #[tokio::test]
    async fn test_process_single_file() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, "This is test content for indexing.").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        let chunk_count = indexer.process_single(&file_path).await.unwrap();

        assert!(chunk_count > 0, "Should have created at least one chunk");

        // Chunks landed in the store and all point at the source file.
        let stored_chunks = store.chunks.read().await;
        assert!(!stored_chunks.is_empty());
        assert!(stored_chunks.iter().all(|c| c.file_path == file_path));

        // The file record was written with a matching chunk count.
        let files = store.files.read().await;
        assert!(files.contains_key(&file_path));
        let file_record = files.get(&file_path).unwrap();
        assert_eq!(file_record.chunk_count, chunk_count);
        assert_eq!(file_record.status, FileStatus::Indexed);
    }

    #[tokio::test]
    async fn test_process_single_skip_already_indexed() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, "Test content").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        // First pass indexes the file.
        let chunk_count1 = indexer.process_single(&file_path).await.unwrap();

        let _chunks_after_first = store.chunks.read().await.len();

        // Second pass should hit the content-hash dedup path.
        let chunk_count2 = indexer.process_single(&file_path).await.unwrap();

        assert_eq!(chunk_count1, chunk_count2);

        // Chunks were not duplicated away to zero.
        let chunks_after_second = store.chunks.read().await.len();
        assert!(chunks_after_second > 0);
    }

    #[tokio::test]
    async fn test_needs_reindex_new_file() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("new_file.txt");
        std::fs::write(&file_path, "New content").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        // Unknown to the store → must be indexed.
        let needs = indexer.needs_reindex(&file_path).await.unwrap();
        assert!(needs);
    }

    #[tokio::test]
    async fn test_needs_reindex_unchanged_file() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("unchanged.txt");
        std::fs::write(&file_path, "Unchanged content").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        indexer.process_single(&file_path).await.unwrap();

        // Same content hash → no reindex needed.
        let needs = indexer.needs_reindex(&file_path).await.unwrap();
        assert!(!needs);
    }

    #[tokio::test]
    async fn test_needs_reindex_modified_file() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("modified.txt");
        std::fs::write(&file_path, "Original content").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        indexer.process_single(&file_path).await.unwrap();

        // Changing the content changes the hash.
        std::fs::write(&file_path, "Modified content - different!").unwrap();

        let needs = indexer.needs_reindex(&file_path).await.unwrap();
        assert!(needs);
    }

    #[tokio::test]
    async fn test_reindex_path_file() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("reindex_test.txt");
        std::fs::write(&file_path, "Original content for reindex test").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        indexer.process_single(&file_path).await.unwrap();

        let initial_chunks = store.chunks.read().await.len();
        assert!(initial_chunks > 0);

        std::fs::write(&file_path, "New content after modification").unwrap();

        indexer.reindex_path(&file_path).await.unwrap();

        // The file record survives the delete-then-reindex cycle.
        let files = store.files.read().await;
        assert!(files.contains_key(&file_path));
    }

    #[tokio::test]
    async fn test_empty_file_returns_zero_chunks() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("empty.txt");
        std::fs::write(&file_path, "").unwrap();

        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        let chunk_count = indexer.process_single(&file_path).await.unwrap();

        assert_eq!(chunk_count, 0, "Empty file should produce zero chunks");
    }

    #[test]
    fn test_indexer_config_default() {
        let config = IndexerConfig::default();

        assert!(!config.include_patterns.is_empty());
        assert!(config.include_patterns.contains(&"**/*".to_string()));

        assert!(!config.exclude_patterns.is_empty());
        assert!(config.exclude_patterns.contains(&"**/.git/**".to_string()));
        assert!(
            config
                .exclude_patterns
                .contains(&"**/node_modules/**".to_string())
        );
        assert!(
            config
                .exclude_patterns
                .contains(&"**/target/**".to_string())
        );
    }

    #[tokio::test]
    async fn test_subscribe_receives_updates() {
        let store = Arc::new(MockStore::new());
        let indexer = create_test_indexer(Arc::clone(&store) as Arc<dyn VectorStore>);

        let mut receiver = indexer.subscribe();

        // No work has happened yet, so nothing is buffered.
        assert!(receiver.try_recv().is_err());
    }

    // Smoke test: every IndexUpdate variant constructs.
    #[test]
    fn test_index_update_variants() {
        let _indexed = IndexUpdate::FileIndexed {
            path: PathBuf::from("/test"),
            chunk_count: 5,
        };

        let _removed = IndexUpdate::FileRemoved {
            path: PathBuf::from("/test"),
        };

        let _error = IndexUpdate::FileError {
            path: PathBuf::from("/test"),
            error: "test error".to_string(),
        };

        let _started = IndexUpdate::IndexingStarted {
            path: PathBuf::from("/test"),
        };
    }
}