Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/vector_indexer/contextual_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ def __init__(

async def process_document(
self, document: ProcessingDocument
) -> List[ContextualChunk]:
) -> tuple[List[ContextualChunk], int]:
"""
Process single document into contextual chunks.

Args:
document: Document to process

Returns:
List of contextual chunks with embeddings
Tuple of (contextual chunks with embeddings, number of chunks
dropped due to context-generation failure)
"""
logger.info(
f"Processing document {document.document_hash} ({len(document.content)} characters)"
Expand All @@ -69,11 +70,13 @@ async def process_document(
# Step 3: Create contextual chunks (filter out failed context generations)
contextual_chunks: List[ContextualChunk] = []
valid_contextual_contents: List[str] = []
failed_chunks = 0

for i, (base_chunk, context) in enumerate(
zip(base_chunks, contexts, strict=True)
):
if isinstance(context, Exception):
failed_chunks += 1
self.error_logger.log_context_generation_failure(
document.document_hash, i, str(context), self.config.max_retries
)
Expand Down Expand Up @@ -128,7 +131,7 @@ async def process_document(
logger.error(
f"No valid chunks created for document {document.document_hash}"
)
return []
return [], failed_chunks

# Step 4: Create embeddings for all valid contextual chunks
try:
Expand All @@ -154,9 +157,10 @@ async def process_document(
raise

logger.info(
f"Successfully processed document {document.document_hash}: {len(contextual_chunks)} chunks"
f"Successfully processed document {document.document_hash}: "
f"{len(contextual_chunks)} chunks ({failed_chunks} dropped)"
)
return contextual_chunks
return contextual_chunks, failed_chunks

except Exception as e:
logger.error(
Expand Down
4 changes: 3 additions & 1 deletion src/vector_indexer/error_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,17 @@ def log_processing_stats(self, stats: ProcessingStats) -> None:
stats_dict["end_time"] = stats.end_time.isoformat()
stats_dict["duration"] = stats.duration
stats_dict["success_rate"] = stats.success_rate
stats_dict["chunk_success_rate"] = stats.chunk_success_rate

with open(self.config.stats_log_file, "w", encoding="utf-8") as f:
json.dump(stats_dict, f, indent=2)

logger.info(
f"Processing completed - Success rate: {stats.success_rate:.1%}, "
f"Chunk success rate: {stats.chunk_success_rate:.1%}, "
f"Duration: {stats.duration}, "
f"Processed: {stats.documents_processed}/{stats.total_documents} documents, "
f"Chunks: {stats.total_chunks_processed}"
f"Chunks: {stats.total_chunks_processed} ok / {stats.total_chunks_failed} failed"
)
except Exception as e:
logger.error(f"Failed to write stats log: {e}")
Expand Down
80 changes: 52 additions & 28 deletions src/vector_indexer/main_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
sys.path.append(str(Path(__file__).parent.parent))

from vector_indexer.config.config_loader import ConfigLoader
from vector_indexer.document_loader import DocumentLoader
from vector_indexer.document_loader import DocumentLoader, DocumentLoadError
from vector_indexer.contextual_processor import ContextualProcessor
from vector_indexer.qdrant_manager import QdrantManager
from vector_indexer.error_logger import ErrorLogger
Expand Down Expand Up @@ -169,7 +169,7 @@ async def process_all_documents(self) -> ProcessingStats:

# Process documents with controlled concurrency
semaphore = asyncio.Semaphore(self.config.max_concurrent_documents)
tasks: List[asyncio.Task[tuple[int, str]]] = []
tasks: List[asyncio.Task[tuple[int, str, int]]] = []

for doc_info in documents:
task = asyncio.create_task(
Expand All @@ -189,6 +189,9 @@ async def process_all_documents(self) -> ProcessingStats:
chunks_info: Dict[
str, Dict[str, Any]
] = {} # Track chunk counts for metadata update
# Only documents that processed successfully are marked as
# processed in DVC tracking, so failures are retried next run.
processed_documents: List[DocumentInfo] = []
for i, result in enumerate(results):
if isinstance(result, Exception):
doc_info = documents[i]
Expand All @@ -200,16 +203,18 @@ async def process_all_documents(self) -> ProcessingStats:
doc_info.document_hash, str(result)
)
else:
# Result should be tuple of (chunk_count, content_hash)
# Result should be tuple of (chunk_count, content_hash, failed_chunks)
doc_info = documents[i]
self.stats.documents_processed += 1
if isinstance(result, tuple) and len(result) == 2:
chunk_count, content_hash = result
processed_documents.append(doc_info)
if isinstance(result, tuple) and len(result) == 3:
chunk_count, content_hash, failed_chunks = result
self.stats.total_chunks_processed += chunk_count
self.stats.total_chunks_failed += failed_chunks
# Track chunk count using content_hash (not directory hash)
chunks_info[content_hash] = {"chunk_count": chunk_count}
logger.info(
f"CHUNK COUNT: Document {doc_info.document_hash[:12]}... (content: {content_hash[:12]}...) -> {chunk_count} chunks"
f"CHUNK COUNT: Document {doc_info.document_hash[:12]}... (content: {content_hash[:12]}...) -> {chunk_count} chunks ({failed_chunks} failed)"
)

# Log the complete chunks_info dictionary
Expand All @@ -227,10 +232,10 @@ async def process_all_documents(self) -> ProcessingStats:
# Step 4: Update processed files tracking (even if no new documents processed)
if diff_detector:
try:
# Update metadata for newly processed files
if documents:
# Update metadata for newly processed files (successful only)
if processed_documents:
processed_paths = [
doc.cleaned_txt_path for doc in documents
doc.cleaned_txt_path for doc in processed_documents
]
if processed_paths:
logger.debug(
Expand Down Expand Up @@ -290,7 +295,7 @@ async def _process_single_document(
doc_info: DocumentInfo,
qdrant_manager: QdrantManager,
semaphore: asyncio.Semaphore,
) -> tuple[int, str]:
) -> tuple[int, str, int]:
"""
Process a single document with contextual retrieval.

Expand All @@ -300,7 +305,9 @@ async def _process_single_document(
semaphore: Concurrency control semaphore

Returns:
tuple: (chunk_count: int, content_hash: str) or Exception on error
tuple: (chunk_count: int, content_hash: str, failed_chunks: int).
Raises on any failure (including load failure or zero usable chunks),
so the document is counted as failed rather than as success.
"""
async with semaphore:
logger.info(f"Processing document: {doc_info.document_hash}")
Expand All @@ -310,29 +317,31 @@ async def _process_single_document(
document = self.document_loader.load_document(doc_info)

if not document:
logger.warning(f"Could not load document: {doc_info.document_hash}")
return (0, doc_info.document_hash)
raise DocumentLoadError(
f"Could not load document: {doc_info.document_hash}"
)

# Process document with contextual retrieval
contextual_chunks = await self.contextual_processor.process_document(
document
)
(
contextual_chunks,
failed_chunks,
) = await self.contextual_processor.process_document(document)

if not contextual_chunks:
logger.warning(
f"No chunks created for document: {doc_info.document_hash}"
raise RuntimeError(
f"No chunks created for document: {doc_info.document_hash} "
f"({failed_chunks} chunks failed context generation)"
)
return (0, document.document_hash)

# Store chunks in Qdrant
await qdrant_manager.store_chunks(contextual_chunks)

logger.info(
f"Successfully processed document {doc_info.document_hash}: "
f"{len(contextual_chunks)} chunks"
f"{len(contextual_chunks)} chunks ({failed_chunks} dropped)"
)

return (len(contextual_chunks), document.document_hash)
return (len(contextual_chunks), document.document_hash, failed_chunks)

except Exception as e:
logger.error(f"Error processing document {doc_info.document_hash}: {e}")
Expand All @@ -352,10 +361,12 @@ def _log_final_summary(self) -> None:
logger.info(f" • Failed Chunks: {self.stats.total_chunks_failed}")

if self.stats.total_documents > 0:
success_rate = (
self.stats.documents_processed / self.stats.total_documents
) * 100
logger.info(f"Success Rate: {success_rate:.1f}%")
logger.info(f"Success Rate: {self.stats.success_rate * 100:.1f}%")

if self.stats.total_chunks_processed + self.stats.total_chunks_failed > 0:
logger.info(
f"Chunk Success Rate: {self.stats.chunk_success_rate * 100:.1f}%"
)

logger.info(f"Processing Duration: {self.stats.duration}")

Expand All @@ -365,6 +376,11 @@ def _log_final_summary(self) -> None:
)
logger.info("Check failure logs for details")

if self.stats.total_chunks_failed > 0:
logger.warning(
f" {self.stats.total_chunks_failed} chunks failed processing"
)

async def run_health_check(self) -> bool:
"""
Run health check on all components.
Expand Down Expand Up @@ -617,12 +633,20 @@ async def _execute_cleanup_operations(
return total_deleted

def _cleanup_datasets(self) -> None:
"""Remove datasets folder after processing."""
"""Remove datasets folder contents after processing.

Only the folder's contents are removed, not the folder itself, since
the datasets path is a mounted volume in the container.
"""
try:
datasets_path = Path(self.config.dataset_base_path)
if datasets_path.exists():
shutil.rmtree(str(datasets_path))
logger.info(f"Datasets folder cleaned up: {datasets_path}")
for child in datasets_path.iterdir():
if child.is_dir():
shutil.rmtree(str(child))
else:
child.unlink()
logger.info(f"Datasets folder contents cleaned up: {datasets_path}")
else:
logger.debug(f"Datasets folder does not exist: {datasets_path}")
except Exception as e:
Expand Down
8 changes: 8 additions & 0 deletions src/vector_indexer/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ def success_rate(self) -> float:
return self.documents_processed / self.total_documents
return 0.0

@property
def chunk_success_rate(self) -> float:
    """Return the share of chunks that were processed successfully.

    The denominator is the number of processed chunks plus the number of
    failed chunks.

    Returns:
        ``total_chunks_processed`` divided by the combined total, or ``0.0``
        when the combined total is not positive (nothing to divide by).
    """
    succeeded = self.total_chunks_processed
    attempted = succeeded + self.total_chunks_failed
    # Fall back to 0.0 rather than dividing by zero when no chunks were counted.
    return succeeded / attempted if attempted > 0 else 0.0


class ProcessingError(BaseModel):
"""Error information for failed processing."""
Expand Down
Loading