Track chunks dropped during context generation and report a chunk-level
success rate; count load failures and zero-chunk documents as failed so only
successfully processed documents are marked as processed; clean the contents
of the datasets folder instead of removing the folder itself.

diff --git a/src/vector_indexer/contextual_processor.py b/src/vector_indexer/contextual_processor.py
index b225cf3..6b21d32 100644
--- a/src/vector_indexer/contextual_processor.py
+++ b/src/vector_indexer/contextual_processor.py
@@ -41,7 +41,7 @@ def __init__(
 
     async def process_document(
         self, document: ProcessingDocument
-    ) -> List[ContextualChunk]:
+    ) -> tuple[List[ContextualChunk], int]:
         """
         Process single document into contextual chunks.
 
@@ -49,7 +49,8 @@ async def process_document(
             document: Document to process
 
         Returns:
-            List of contextual chunks with embeddings
+            Tuple of (contextual chunks with embeddings, number of chunks
+            dropped due to context-generation failure)
         """
         logger.info(
             f"Processing document {document.document_hash} ({len(document.content)} characters)"
@@ -69,11 +70,13 @@ async def process_document(
             # Step 3: Create contextual chunks (filter out failed context generations)
             contextual_chunks: List[ContextualChunk] = []
             valid_contextual_contents: List[str] = []
+            failed_chunks = 0
 
             for i, (base_chunk, context) in enumerate(
                 zip(base_chunks, contexts, strict=True)
             ):
                 if isinstance(context, Exception):
+                    failed_chunks += 1
                     self.error_logger.log_context_generation_failure(
                         document.document_hash, i, str(context), self.config.max_retries
                     )
@@ -128,7 +131,7 @@ async def process_document(
                 logger.error(
                     f"No valid chunks created for document {document.document_hash}"
                 )
-                return []
+                return [], failed_chunks
 
             # Step 4: Create embeddings for all valid contextual chunks
             try:
@@ -154,9 +157,10 @@ async def process_document(
                 raise
 
             logger.info(
-                f"Successfully processed document {document.document_hash}: {len(contextual_chunks)} chunks"
+                f"Successfully processed document {document.document_hash}: "
+                f"{len(contextual_chunks)} chunks ({failed_chunks} dropped)"
             )
-            return contextual_chunks
+            return contextual_chunks, failed_chunks
 
         except Exception as e:
             logger.error(
diff --git a/src/vector_indexer/error_logger.py b/src/vector_indexer/error_logger.py
index 1d11cba..c62de79 100644
--- a/src/vector_indexer/error_logger.py
+++ b/src/vector_indexer/error_logger.py
@@ -158,15 +158,17 @@ def log_processing_stats(self, stats: ProcessingStats) -> None:
                 stats_dict["end_time"] = stats.end_time.isoformat()
             stats_dict["duration"] = stats.duration
             stats_dict["success_rate"] = stats.success_rate
+            stats_dict["chunk_success_rate"] = stats.chunk_success_rate
 
             with open(self.config.stats_log_file, "w", encoding="utf-8") as f:
                 json.dump(stats_dict, f, indent=2)
 
             logger.info(
                 f"Processing completed - Success rate: {stats.success_rate:.1%}, "
+                f"Chunk success rate: {stats.chunk_success_rate:.1%}, "
                 f"Duration: {stats.duration}, "
                 f"Processed: {stats.documents_processed}/{stats.total_documents} documents, "
-                f"Chunks: {stats.total_chunks_processed}"
+                f"Chunks: {stats.total_chunks_processed} ok / {stats.total_chunks_failed} failed"
             )
         except Exception as e:
             logger.error(f"Failed to write stats log: {e}")
diff --git a/src/vector_indexer/main_indexer.py b/src/vector_indexer/main_indexer.py
index 45ce5ff..bf40768 100644
--- a/src/vector_indexer/main_indexer.py
+++ b/src/vector_indexer/main_indexer.py
@@ -15,7 +15,7 @@ sys.path.append(str(Path(__file__).parent.parent))
 
 
 from vector_indexer.config.config_loader import ConfigLoader
-from vector_indexer.document_loader import DocumentLoader
+from vector_indexer.document_loader import DocumentLoader, DocumentLoadError
 from vector_indexer.contextual_processor import ContextualProcessor
 from vector_indexer.qdrant_manager import QdrantManager
 from vector_indexer.error_logger import ErrorLogger
@@ -169,7 +169,7 @@ async def process_all_documents(self) -> ProcessingStats:
 
             # Process documents with controlled concurrency
             semaphore = asyncio.Semaphore(self.config.max_concurrent_documents)
-            tasks: List[asyncio.Task[tuple[int, str]]] = []
+            tasks: List[asyncio.Task[tuple[int, str, int]]] = []
 
             for doc_info in documents:
                 task = asyncio.create_task(
@@ -189,6 +189,9 @@ async def process_all_documents(self) -> ProcessingStats:
             chunks_info: Dict[
                 str, Dict[str, Any]
             ] = {}  # Track chunk counts for metadata update
+            # Only documents that processed successfully are marked as
+            # processed in DVC tracking, so failures are retried next run.
+            processed_documents: List[DocumentInfo] = []
             for i, result in enumerate(results):
                 if isinstance(result, Exception):
                     doc_info = documents[i]
@@ -200,16 +203,18 @@ async def process_all_documents(self) -> ProcessingStats:
                         doc_info.document_hash, str(result)
                     )
                 else:
-                    # Result should be tuple of (chunk_count, content_hash)
+                    # Result should be tuple of (chunk_count, content_hash, failed_chunks)
                     doc_info = documents[i]
                     self.stats.documents_processed += 1
-                    if isinstance(result, tuple) and len(result) == 2:
-                        chunk_count, content_hash = result
+                    processed_documents.append(doc_info)
+                    if isinstance(result, tuple) and len(result) == 3:
+                        chunk_count, content_hash, failed_chunks = result
                         self.stats.total_chunks_processed += chunk_count
+                        self.stats.total_chunks_failed += failed_chunks
                         # Track chunk count using content_hash (not directory hash)
                         chunks_info[content_hash] = {"chunk_count": chunk_count}
                         logger.info(
-                            f"CHUNK COUNT: Document {doc_info.document_hash[:12]}... (content: {content_hash[:12]}...) -> {chunk_count} chunks"
+                            f"CHUNK COUNT: Document {doc_info.document_hash[:12]}... (content: {content_hash[:12]}...) -> {chunk_count} chunks ({failed_chunks} failed)"
                         )
 
             # Log the complete chunks_info dictionary
@@ -227,10 +232,10 @@ async def process_all_documents(self) -> ProcessingStats:
             # Step 4: Update processed files tracking (even if no new documents processed)
             if diff_detector:
                 try:
-                    # Update metadata for newly processed files
-                    if documents:
+                    # Update metadata for newly processed files (successful only)
+                    if processed_documents:
                         processed_paths = [
-                            doc.cleaned_txt_path for doc in documents
+                            doc.cleaned_txt_path for doc in processed_documents
                         ]
                         if processed_paths:
                             logger.debug(
@@ -290,7 +295,7 @@ async def _process_single_document(
         doc_info: DocumentInfo,
         qdrant_manager: QdrantManager,
         semaphore: asyncio.Semaphore,
-    ) -> tuple[int, str]:
+    ) -> tuple[int, str, int]:
         """
         Process a single document with contextual retrieval.
 
@@ -300,7 +305,9 @@ async def _process_single_document(
             semaphore: Concurrency control semaphore
 
         Returns:
-            tuple: (chunk_count: int, content_hash: str) or Exception on error
+            tuple: (chunk_count: int, content_hash: str, failed_chunks: int).
+            Raises on any failure (including load failure or zero usable chunks),
+            so the document is counted as failed rather than as success.
         """
         async with semaphore:
             logger.info(f"Processing document: {doc_info.document_hash}")
@@ -310,29 +317,31 @@ async def _process_single_document(
                 document = self.document_loader.load_document(doc_info)
 
                 if not document:
-                    logger.warning(f"Could not load document: {doc_info.document_hash}")
-                    return (0, doc_info.document_hash)
+                    raise DocumentLoadError(
+                        f"Could not load document: {doc_info.document_hash}"
+                    )
 
                 # Process document with contextual retrieval
-                contextual_chunks = await self.contextual_processor.process_document(
-                    document
-                )
+                (
+                    contextual_chunks,
+                    failed_chunks,
+                ) = await self.contextual_processor.process_document(document)
 
                 if not contextual_chunks:
-                    logger.warning(
-                        f"No chunks created for document: {doc_info.document_hash}"
+                    raise RuntimeError(
+                        f"No chunks created for document: {doc_info.document_hash} "
+                        f"({failed_chunks} chunks failed context generation)"
                     )
-                    return (0, document.document_hash)
 
                 # Store chunks in Qdrant
                 await qdrant_manager.store_chunks(contextual_chunks)
 
                 logger.info(
                     f"Successfully processed document {doc_info.document_hash}: "
-                    f"{len(contextual_chunks)} chunks"
+                    f"{len(contextual_chunks)} chunks ({failed_chunks} dropped)"
                 )
 
-                return (len(contextual_chunks), document.document_hash)
+                return (len(contextual_chunks), document.document_hash, failed_chunks)
 
             except Exception as e:
                 logger.error(f"Error processing document {doc_info.document_hash}: {e}")
@@ -352,10 +361,12 @@ def _log_final_summary(self) -> None:
         logger.info(f" • Failed Chunks: {self.stats.total_chunks_failed}")
 
         if self.stats.total_documents > 0:
-            success_rate = (
-                self.stats.documents_processed / self.stats.total_documents
-            ) * 100
-            logger.info(f"Success Rate: {success_rate:.1f}%")
+            logger.info(f"Success Rate: {self.stats.success_rate * 100:.1f}%")
+
+        if self.stats.total_chunks_processed + self.stats.total_chunks_failed > 0:
+            logger.info(
+                f"Chunk Success Rate: {self.stats.chunk_success_rate * 100:.1f}%"
+            )
 
         logger.info(f"Processing Duration: {self.stats.duration}")
 
@@ -365,6 +376,11 @@ def _log_final_summary(self) -> None:
             )
             logger.info("Check failure logs for details")
 
+        if self.stats.total_chunks_failed > 0:
+            logger.warning(
+                f" {self.stats.total_chunks_failed} chunks failed processing"
+            )
+
     async def run_health_check(self) -> bool:
         """
         Run health check on all components.
@@ -617,12 +633,21 @@ async def _execute_cleanup_operations(
         return total_deleted
 
     def _cleanup_datasets(self) -> None:
-        """Remove datasets folder after processing."""
+        """Remove datasets folder contents after processing.
+
+        Only the folder's contents are removed, not the folder itself, since
+        the datasets path is a mounted volume in the container.
+        """
         try:
             datasets_path = Path(self.config.dataset_base_path)
             if datasets_path.exists():
-                shutil.rmtree(str(datasets_path))
-                logger.info(f"Datasets folder cleaned up: {datasets_path}")
+                for child in datasets_path.iterdir():
+                    # rmtree rejects symlinks to directories; unlink those instead.
+                    if child.is_dir() and not child.is_symlink():
+                        shutil.rmtree(str(child))
+                    else:
+                        child.unlink()
+                logger.info(f"Datasets folder contents cleaned up: {datasets_path}")
             else:
                 logger.debug(f"Datasets folder does not exist: {datasets_path}")
         except Exception as e:
diff --git a/src/vector_indexer/models.py b/src/vector_indexer/models.py
index 752ea02..41ae1ce 100644
--- a/src/vector_indexer/models.py
+++ b/src/vector_indexer/models.py
@@ -96,6 +96,14 @@ def success_rate(self) -> float:
             return self.documents_processed / self.total_documents
         return 0.0
 
+    @property
+    def chunk_success_rate(self) -> float:
+        """Calculate chunk success rate (processed vs processed + failed)."""
+        total_chunks = self.total_chunks_processed + self.total_chunks_failed
+        if total_chunks > 0:
+            return self.total_chunks_processed / total_chunks
+        return 0.0
+
 
 class ProcessingError(BaseModel):
     """Error information for failed processing."""