A comprehensive Retrieval-Augmented Generation (RAG) system designed for Healthcare IT that aggregates information from multiple data sources (PDFs, SharePoint, databases, and more) to provide concise, unified outputs.
- Architecture Overview
- Complete Code Flow
- Module Deep Dive
- Tech Stack
- Project Structure
- Quick Start
- Usage Examples
- Configuration Options
- Healthcare IT Considerations
- Extending the System
KnowledgeRAG System

  PDF Files       SharePoint       Databases        Web/URLs
      │               │                │                │
      ▼               ▼                ▼                ▼
  DATA CONNECTORS LAYER (src/connectors/)
  ├── base.py                  (BaseConnector, ConnectorRegistry)
  ├── pdf_connector.py         (PDFConnector)
  ├── sharepoint_connector.py  (SharePointConnector)
  ├── database_connector.py    (DatabaseConnector)
  └── web_connector.py         (WebConnector)
      │
      ▼  List[Document]
  PROCESSING PIPELINE (src/processing/)
  ├── preprocessor.py          (TextPreprocessor, PHIMasker)
  ├── chunker.py               (RecursiveChunker, SemanticChunker)
  └── embeddings.py            (EmbeddingManager)
      │
      ▼  List[Document] with embeddings
  VECTOR STORE LAYER (src/vectorstore/)
  ├── store.py                 (VectorStoreManager - ChromaDB/FAISS)
  └── retriever.py             (VectorRetriever, HybridRetriever)
      │
      ▼  Retrieved chunks with scores
  RAG ENGINE LAYER (src/rag/)
  ├── engine.py                (RAGEngine - main orchestrator)
  ├── prompts.py               (PromptTemplates - healthcare-specific)
  └── reranker.py              (CrossEncoderReranker, LLMReranker)
      │
      ▼  RAGResponse (answer + sources)
  API LAYER (src/api/)
  ├── main.py                  (FastAPI app, lifespan management)
  ├── routes.py                (/query, /ingest, /sources, /health)
  └── models.py                (Pydantic request/response schemas)

  UTILITIES (src/utils/)
  ├── config.py                (Settings - loads .env and settings.yaml)
  └── logger.py                (Structured logging with loguru)

This section explains the entire data flow from document ingestion to query response.
User adds documents → Connector loads → Preprocessor cleans → Chunker splits → Embeddings generated → Stored in VectorDB
Step-by-step:
# 1. CONNECTOR: Load raw documents from source
# File: src/connectors/pdf_connector.py
# Class: PDFConnector.load()
# Output: List[Document] with raw text + metadata
pdf_connector = PDFConnector(source_path="./data/documents")
raw_documents = pdf_connector.load()
# Each Document has: page_content (str), metadata (dict with source, page, etc.)
# 2. PREPROCESSOR: Clean and normalize text
# File: src/processing/preprocessor.py
# Class: TextPreprocessor.process()
# - Removes extra whitespace
# - Normalizes unicode
# - Optionally masks PHI (Protected Health Information)
preprocessor = TextPreprocessor(mask_phi=True)
clean_documents = preprocessor.process_documents(raw_documents)
# 3. CHUNKER: Split documents into smaller pieces
# File: src/processing/chunker.py
# Class: RecursiveChunker.chunk()
# - Splits by paragraph → sentence → word
# - Maintains chunk_size with overlap for context
chunker = RecursiveChunker(chunk_size=1000, chunk_overlap=200)
chunks = chunker.chunk_documents(clean_documents)
# Now we have many small Document objects (chunks)
# 4. EMBEDDINGS: Convert text to vectors
# File: src/processing/embeddings.py
# Class: EmbeddingManager.embed_documents()
# - Uses OpenAI text-embedding-ada-002 or HuggingFace
# - Returns 1536-dim vectors (OpenAI) or 384-dim (MiniLM)
embedding_manager = EmbeddingManager(provider="openai")
# Embeddings are generated during vector store addition
# 5. VECTOR STORE: Store chunks with embeddings
# File: src/vectorstore/store.py
# Class: VectorStoreManager.add_documents()
# - Stores in ChromaDB (persistent) or FAISS (in-memory)
# - Metadata preserved for source attribution
store = VectorStoreManager(store_type="chroma", persist_directory="./chroma_db")
store.add_documents(chunks)  # Embeddings generated automatically

User query → Embed query → Vector search → Rerank results → Build prompt → LLM generates → Return response with sources
Step-by-step:
# 1. USER QUERY: Natural language question
query = "What are the compliance requirements for patient data?"
# 2. QUERY EMBEDDING: Convert question to vector
# File: src/processing/embeddings.py
# Same embedding model used for documents
query_embedding = embedding_manager.embed_query(query)
# 3. RETRIEVAL: Find similar chunks
# File: src/vectorstore/retriever.py
# Class: VectorRetriever.retrieve() or HybridRetriever
# - Performs cosine similarity search
# - Returns top_k most relevant chunks
retriever = VectorRetriever(vector_store=store, top_k=5)
relevant_chunks = retriever.retrieve(query)
# Each chunk has: content, metadata (source, page), similarity score
# 4. RERANKING: Improve relevance ordering
# File: src/rag/reranker.py
# Class: CrossEncoderReranker.rerank()
# - Uses cross-encoder model for precise scoring
# - Re-orders chunks by true relevance
reranker = CrossEncoderReranker()
reranked_chunks = reranker.rerank(query, relevant_chunks)
# 5. PROMPT BUILDING: Construct LLM prompt
# File: src/rag/prompts.py
# Class: PromptTemplates.get_qa_prompt()
# - Injects retrieved context into template
# - Healthcare-specific instructions
prompt = PromptTemplates.get_qa_prompt(
    chunks=reranked_chunks,  # parameter is named `chunks` in PromptTemplates
    question=query
)
# Prompt includes: system instructions + context + question
# 6. LLM GENERATION: Get answer from OpenAI
# File: src/rag/engine.py
# Class: RAGEngine._generate_response()
# - Calls OpenAI GPT-4 or GPT-3.5
# - Synthesizes answer from multiple sources
llm = ChatOpenAI(model="gpt-4", temperature=0.1)
answer = llm.invoke(prompt)
# 7. RESPONSE BUILDING: Package answer with metadata
# File: src/rag/engine.py
# Dataclass: RAGResponse
response = RAGResponse(
    answer=answer.content,
    sources=[chunk.metadata['source'] for chunk in reranked_chunks],
    confidence=calculate_confidence(reranked_chunks)
)

DOCUMENT INGESTION FLOW

  PDF/SharePoint/DB/Web
            │
            ▼
  ┌───────────────────┐
  │ Connector         │ ← src/connectors/*.py
  │ .load()           │   Returns: List[Document]
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ Preprocessor      │ ← src/processing/preprocessor.py
  │ .process_docs()   │   - Clean text, mask PHI
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ Chunker           │ ← src/processing/chunker.py
  │ .chunk_docs()     │   - Split into 1000-char chunks
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ VectorStore       │ ← src/vectorstore/store.py
  │ .add_documents()  │   - Generate embeddings
  │                   │   - Store in ChromaDB/FAISS
  └───────────────────┘

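The text-cleaning stage in the flow above (Unicode normalization followed by whitespace cleanup) might look like the sketch below. It uses only the standard library. The function name `clean_text` and the choice of NFKC normalization are assumptions for illustration and are not taken from `src/processing/preprocessor.py`.

```python
import re
import unicodedata


def clean_text(text: str) -> str:
    """Normalize Unicode and collapse runs of whitespace (illustrative only)."""
    # NFKC folds compatibility characters: the ligature "ﬁ" becomes "fi"
    # and a non-breaking space becomes a regular space.
    normalized = unicodedata.normalize("NFKC", text)
    # Collapse any run of whitespace (spaces, tabs, newlines) to one space.
    return re.sub(r"\s+", " ", normalized).strip()


if __name__ == "__main__":
    print(clean_text("Patient\u00a0intake \n\n  procedures"))
```

Collapsing newlines this way also removes paragraph breaks, so a pipeline that chunks by paragraph would want to preserve double newlines instead.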
QUERY FLOW

  User Question: "What are compliance requirements?"
            │
            ▼
  ┌───────────────────┐
  │ RAGEngine         │ ← src/rag/engine.py (ORCHESTRATOR)
  │ .query()          │
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ Retriever         │ ← src/vectorstore/retriever.py
  │ .retrieve()       │   - Embed query
  │                   │   - Vector similarity search
  │                   │   - Return top_k chunks
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ Reranker          │ ← src/rag/reranker.py
  │ .rerank()         │   - Cross-encoder scoring
  │                   │   - Reorder by relevance
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ PromptTemplates   │ ← src/rag/prompts.py
  │ .get_qa_prompt()  │   - Build context
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ OpenAI LLM        │ ← langchain_openai
  │ .invoke()         │   - Generate answer
  └─────────┬─────────┘
            │
            ▼
  ┌───────────────────┐
  │ RAGResponse       │
  │ - answer          │ ← Final output with source attribution
  │ - sources         │
  │ - confidence      │
  └───────────────────┘

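The retrieval step in the query flow comes down to ranking stored vectors by cosine similarity to the query vector. The sketch below shows that ranking with plain Python lists. The names `cosine_similarity` and `top_k_chunks` and the three-dimensional toy vectors are hypothetical; they stand in for the real embedding model and vector store.

```python
import math
from typing import List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # treat an all-zero vector as matching nothing
    return dot / (norm_a * norm_b)


def top_k_chunks(
    query_vec: List[float],
    indexed: List[Tuple[str, List[float]]],
    k: int = 5,
) -> List[Tuple[str, float]]:
    """Return the k (chunk_text, score) pairs most similar to the query."""
    scored = [(text, cosine_similarity(query_vec, vec)) for text, vec in indexed]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy index: in the real system these vectors come from the embedding model.
index = [
    ("HIPAA compliance policy", [0.9, 0.1, 0.0]),
    ("Cafeteria menu", [0.0, 0.2, 0.9]),
    ("Patient data retention rules", [0.7, 0.3, 0.1]),
]
results = top_k_chunks([1.0, 0.0, 0.0], index, k=2)
```

A vector database performs the same ranking with an approximate nearest-neighbor index rather than a full scan, which is what keeps retrieval fast on large collections.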
The connector layer provides a pluggable architecture for adding new data sources.
# Key Classes:
# - BaseConnector: Abstract class all connectors inherit from
# - DocumentMetadata: Standardized metadata structure
# - ConnectorRegistry: Register and discover connectors dynamically
from abc import ABC, abstractmethod
from typing import List

from langchain_core.documents import Document


class BaseConnector(ABC):
    @abstractmethod
    def load(self) -> List[Document]:
        """Load documents from the source"""

    @abstractmethod
    def get_source_type(self) -> str:
        """Return source type identifier (e.g., 'pdf', 'sharepoint')"""

# Uses: PyPDFLoader (primary), pdfplumber (fallback)
# Features:
# - Extracts text page-by-page
# - Preserves page numbers in metadata
# - Handles multi-page documents
# - Falls back to pdfplumber for complex PDFs
class PDFConnector(BaseConnector):
    def load(self) -> List[Document]:
        # Loads all PDFs from source_path directory
        # Returns Document with metadata: {source, page, total_pages}
        ...

# Uses: Office365-REST-Python-Client
# Features:
# - Authenticates with client credentials
# - Downloads files from specified libraries
# - Supports recursive folder scanning
# - Handles various file types (PDF, DOCX, etc.)
class SharePointConnector(BaseConnector):
    def __init__(self, site_url, client_id, client_secret, library_name):
        # Connects to SharePoint site
        ...

    def load(self) -> List[Document]:
        # Downloads and processes documents from SharePoint
        ...

# Uses: SQLAlchemy
# Features:
# - Connects to various SQL databases
# - Executes custom queries
# - Converts rows to documents
# - Supports metadata extraction
class DatabaseConnector(BaseConnector):
    def __init__(self, connection_string, query):
        # Connects using SQLAlchemy
        ...

    def load(self) -> List[Document]:
        # Executes query, converts results to documents
        ...

# Uses: BeautifulSoup, requests
# Features:
# - Fetches content from URLs
# - Extracts main text content
# - Handles HTML parsing
# - Stores URL as metadata
class WebConnector(BaseConnector):
    def __init__(self, urls: List[str]):
        self.urls = urls

    def load(self) -> List[Document]:
        # Scrapes each URL and returns as documents
        ...

The processing layer handles text transformation before storage.
# Key Classes:
# - TextPreprocessor: Main text cleaning class
# - PHIMasker: HIPAA-compliant PII/PHI masking
class TextPreprocessor:
    def __init__(self, mask_phi=False):
        self.phi_masker = PHIMasker() if mask_phi else None

    def process(self, text: str) -> str:
        # 1. Normalize unicode
        # 2. Remove extra whitespace
        # 3. Optionally mask PHI (SSN, MRN, phone numbers)
        return cleaned_text


class PHIMasker:
    """Masks Protected Health Information for HIPAA compliance"""
    PATTERNS = {
        'ssn': r'\d{3}-\d{2}-\d{4}',
        'phone': r'\d{3}[-.\s]?\d{3}[-.\s]?\d{4}',
        'mrn': r'MRN[:\s]?\d{6,10}',
        # ... more patterns
    }

# Key Classes:
# - RecursiveChunker: Default chunking strategy
# - SemanticChunker: Splits by semantic similarity
# - SentenceChunker: Splits at sentence boundaries
class RecursiveChunker:
    def __init__(self, chunk_size=1000, chunk_overlap=200):
        # chunk_size: Maximum characters per chunk
        # chunk_overlap: Characters shared between chunks (for context)
        ...

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        # Uses LangChain's RecursiveCharacterTextSplitter
        # Splits by: paragraph → sentence → word
        # Preserves metadata across chunks
        ...

Why chunk_overlap?
Document: "The patient was admitted on Monday. Tests were ordered. Results came back Tuesday."
Without overlap (chunk_size=30):
Chunk 1: "The patient was admitted on Mo"
Chunk 2: "nday. Tests were ordered. Resu" ← Context lost!
With overlap=10:
Chunk 1: "The patient was admitted on Mo"
Chunk 2: "tted on Monday. Tests were ord" ← Overlap keeps "Monday" intact
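The effect of overlap can be reproduced with a fixed-width sliding window. The sketch below is a simplification: `sliding_chunks` is a hypothetical helper that cuts at fixed character offsets, whereas `RecursiveChunker` prefers paragraph and sentence boundaries.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int, overlap: int = 0) -> List[str]:
    """Cut text into fixed-width chunks; each chunk repeats the last
    `overlap` characters of the previous one."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


doc = ("The patient was admitted on Monday. Tests were ordered. "
       "Results came back Tuesday.")
no_overlap = sliding_chunks(doc, chunk_size=30)
with_overlap = sliding_chunks(doc, chunk_size=30, overlap=10)
```

With `overlap=10`, the second chunk starts with the last ten characters of the first, so the word "Monday" appears whole in it instead of being cut in two. The cost is that overlapping text is embedded and stored more than once.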
# Key Class:
# - EmbeddingManager: Manages embedding model selection and generation
class EmbeddingManager:
    PROVIDERS = {
        'openai': OpenAIEmbeddings,            # 1536 dimensions
        'huggingface': HuggingFaceEmbeddings,  # 384-768 dimensions
        'azure': AzureOpenAIEmbeddings         # For HIPAA compliance
    }

    def __init__(self, provider='openai', model_name=None):
        # Initializes appropriate embedding model
        ...

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        # Converts text to vectors
        ...

    def embed_query(self, query: str) -> List[float]:
        # Converts single query to vector
        ...

The vector store layer handles persistence and retrieval of embeddings.
# Key Class:
# - VectorStoreManager: Manages ChromaDB or FAISS
class VectorStoreManager:
    def __init__(
        self,
        store_type='chroma',        # 'chroma' or 'faiss'
        persist_directory='./db',   # Where to save
        embedding_manager=None      # Embedding model to use
    ):
        # Initializes vector store
        ...

    def add_documents(self, documents: List[Document]) -> None:
        # 1. Generates embeddings for each document
        # 2. Stores vectors with metadata
        # 3. Persists to disk (ChromaDB) or memory (FAISS)
        ...

    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
        # 1. Embeds the query
        # 2. Finds k nearest neighbors
        # 3. Returns documents with scores
        ...

# ChromaDB vs FAISS:
# - ChromaDB: Persistent, good for production, supports metadata filtering
# - FAISS: In-memory, faster, better for large datasets

# Key Classes:
# - BaseRetriever: Abstract retriever interface
# - VectorRetriever: Pure vector similarity search
# - HybridRetriever: Combines BM25 + vector search
class VectorRetriever(BaseRetriever):
    def retrieve(self, query: str, top_k: int = 5) -> List[RetrievedChunk]:
        # Pure semantic search using embeddings
        ...


class HybridRetriever(BaseRetriever):
    def __init__(self, vector_store, bm25_weight=0.3, vector_weight=0.7):
        # Combines keyword (BM25) and semantic (vector) search
        ...

    def retrieve(self, query: str, top_k: int = 5) -> List[RetrievedChunk]:
        # 1. Get BM25 scores (keyword matching)
        # 2. Get vector similarity scores
        # 3. Combine: final_score = bm25_weight * bm25 + vector_weight * vector
        # 4. Return top_k by combined score
        ...

# When to use Hybrid:
# - When exact keyword matches are important
# - Medical codes, patient IDs, specific terms
# - Better for "find documents mentioning ICD-10 code J18.9"

The RAG engine orchestrates the entire query pipeline.
# Key Class & Dataclass:
# - RAGEngine: Main query processing class
# - RAGResponse: Structured response object
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RAGResponse:
    answer: str             # Generated response
    sources: List[str]      # Source document paths
    chunks_used: List[str]  # Actual text chunks used
    confidence: float       # Confidence score (0-1)
    processing_time: float  # Query time in seconds


class RAGEngine:
    def __init__(
        self,
        vector_store: VectorStoreManager,
        retriever: Optional[BaseRetriever] = None,
        reranker: Optional[BaseReranker] = None,
        llm_model: str = 'gpt-4'
    ):
        self.vector_store = vector_store
        # Assumption: default to plain vector search when no retriever is
        # passed, so that RAGEngine(vector_store=store) works as in Quick Start
        self.retriever = retriever or VectorRetriever(vector_store=vector_store)
        self.reranker = reranker
        self.llm = ChatOpenAI(model=llm_model, temperature=0.1)
        # Low temperature = more factual, less creative

    def query(
        self,
        question: str,
        source_filter: Optional[List[str]] = None,
        top_k: int = 5
    ) -> RAGResponse:
        # MAIN QUERY FLOW:
        start = time.perf_counter()
        # 1. Retrieve relevant chunks
        chunks = self.retriever.retrieve(question, top_k)
        # 2. Optional: Filter by source type
        if source_filter:
            chunks = [c for c in chunks if c.source_type in source_filter]
        # 3. Rerank for better relevance
        if self.reranker:
            chunks = self.reranker.rerank(question, chunks)
        # 4. Build prompt with context
        prompt = PromptTemplates.get_qa_prompt(chunks, question)
        # 5. Generate response
        response = self.llm.invoke(prompt)
        # 6. Return structured response
        return RAGResponse(
            answer=response.content,
            sources=list(set(c.metadata['source'] for c in chunks)),
            chunks_used=[c.page_content for c in chunks],
            confidence=self._calculate_confidence(chunks),
            processing_time=time.perf_counter() - start
        )

# Key Class:
# - PromptTemplates: Healthcare-specific prompt engineering
class PromptTemplates:
    QA_TEMPLATE = """You are a helpful healthcare information assistant.
Use the following context to answer the question. If the answer is not
in the context, say "I don't have enough information to answer this."
Always cite your sources by mentioning which document the information came from.
Context:
{context}
Question: {question}
Answer:"""

    @classmethod
    def get_qa_prompt(cls, chunks: List[Document], question: str) -> str:
        context = "\n\n".join([
            f"[Source: {c.metadata.get('source', 'Unknown')}]\n{c.page_content}"
            for c in chunks
        ])
        return cls.QA_TEMPLATE.format(context=context, question=question)

# Key Classes:
# - CrossEncoderReranker: Uses cross-encoder model (recommended)
# - LLMReranker: Uses LLM for reranking (more expensive)
class CrossEncoderReranker:
    def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, chunks: List[Document]) -> List[Document]:
        # Cross-encoder scores (query, chunk) pairs directly
        # More accurate than embedding similarity
        pairs = [(query, chunk.page_content) for chunk in chunks]
        scores = self.model.predict(pairs)
        # Sort by score descending
        ranked = sorted(zip(chunks, scores), key=lambda x: x[1], reverse=True)
        return [chunk for chunk, score in ranked]

# Why rerank?
# - Initial retrieval uses bi-encoder (fast but less accurate)
# - Reranking uses cross-encoder (slower but more accurate)
# - Best of both worlds: fast retrieval + accurate reranking

The API layer exposes the system via REST endpoints.
# FastAPI app with lifespan management
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware


@asynccontextmanager
async def lifespan(app: FastAPI):
    # STARTUP: Initialize RAG engine, load vector store
    app.state.rag_engine = RAGEngine(...)
    yield
    # SHUTDOWN: Cleanup resources


app = FastAPI(
    title="KnowledgeRAG API",
    lifespan=lifespan
)

# CORS enabled for web clients
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    # ...
)

# Available endpoints:
@router.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """
    Query the knowledge base
    Request:
    {
        "question": "What are the compliance requirements?",
        "source_filter": ["pdf", "sharepoint"],  # optional
        "top_k": 5  # optional
    }
    Response:
    {
        "answer": "Based on the documents...",
        "sources": ["policy.pdf", "guidelines.pdf"],
        "confidence": 0.85
    }
    """


@router.post("/ingest")
async def ingest(request: IngestRequest):
    """
    Ingest new documents
    Request:
    {
        "source_type": "pdf",
        "source_path": "/path/to/documents"
    }
    """


@router.get("/sources")
async def list_sources():
    """List all ingested document sources"""


@router.get("/health")
async def health_check():
    """API health check endpoint"""

# Request/Response validation
class QueryRequest(BaseModel):
    question: str
    source_filter: Optional[List[str]] = None
    top_k: int = 5


class QueryResponse(BaseModel):
    answer: str
    sources: List[str]
    confidence: float
    processing_time: float


class IngestRequest(BaseModel):
    source_type: str  # 'pdf', 'sharepoint', 'database', 'web'
    source_path: str
    config: Optional[Dict] = None

# Loads configuration from .env and settings.yaml
import os

import yaml
from dotenv import load_dotenv


class Settings:
    def __init__(self):
        # Load .env file
        load_dotenv()
        # Load settings.yaml
        with open('config/settings.yaml') as f:
            self.config = yaml.safe_load(f)
        # Environment variables (secrets)
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        self.sharepoint_client_id = os.getenv('SHAREPOINT_CLIENT_ID')
        # ...
        # YAML settings (non-secrets)
        self.chunk_size = self.config['chunking']['chunk_size']
        self.llm_model = self.config['llm']['model']
        # ...

# Usage:
settings = Settings()

# Uses loguru for structured logging
from loguru import logger

# Configure logger
logger.add(
    "logs/app.log",
    rotation="10 MB",
    retention="7 days",
    level="INFO"
)

# Usage throughout the app:
logger.info("Processing document", source=doc.metadata['source'])
logger.error("Failed to connect", error=str(e))

| Component | Technology | Purpose |
|---|---|---|
| LLM Framework | LangChain | Orchestration, chains, and agents |
| Embeddings | OpenAI / HuggingFace | Text-to-vector conversion |
| Vector Store | ChromaDB (default), FAISS | Similarity search and retrieval |
| LLM | OpenAI GPT-4 / Local LLMs | Response generation |
| Document Processing | LangChain Document Loaders | Multi-format ingestion |
| API | FastAPI | REST API interface |
| SharePoint | Office365-REST-Python-Client | SharePoint integration |

| Package | Version | Purpose |
|---|---|---|
| langchain | 1.2.0 | Core LangChain framework |
| langchain-openai | 1.1.6 | OpenAI integration |
| chromadb | 1.4.0 | Vector database |
| sentence-transformers | 5.2.0 | Local embeddings |
| fastapi | 0.128.0 | REST API framework |
| pydantic | 2.12.5 | Data validation |

KnowledgeRAG/
├── src/
│   ├── __init__.py
│   ├── connectors/                  # Data source connectors
│   │   ├── __init__.py
│   │   ├── base.py                  # Abstract base connector & registry
│   │   ├── pdf_connector.py         # PDF file connector
│   │   ├── sharepoint_connector.py  # SharePoint connector
│   │   ├── database_connector.py    # SQL database connector
│   │   └── web_connector.py         # Web/URL connector
│   ├── processing/                  # Document processing pipeline
│   │   ├── __init__.py
│   │   ├── preprocessor.py          # Text cleaning & PHI masking
│   │   ├── chunker.py               # Text chunking strategies
│   │   └── embeddings.py            # Embedding generation
│   ├── vectorstore/                 # Vector database operations
│   │   ├── __init__.py
│   │   ├── store.py                 # Vector store manager
│   │   └── retriever.py             # Retrieval strategies
│   ├── rag/                         # RAG engine
│   │   ├── __init__.py
│   │   ├── engine.py                # Main RAG engine (orchestrator)
│   │   ├── prompts.py               # Prompt templates
│   │   └── reranker.py              # Result reranking
│   ├── api/                         # API layer
│   │   ├── __init__.py
│   │   ├── main.py                  # FastAPI application
│   │   ├── routes.py                # API routes
│   │   └── models.py                # Pydantic models
│   └── utils/                       # Utilities
│       ├── __init__.py
│       ├── config.py                # Configuration management
│       └── logger.py                # Logging setup
├── config/                          # Configuration files
│   └── settings.yaml                # Application settings
├── data/                            # Data directory
│   ├── documents/                   # Place PDF files here
│   └── README.md
├── tests/                           # Unit tests
│   ├── __init__.py
│   └── test_basic.py
├── examples/                        # Usage examples
│   └── usage.py
├── logs/                            # Log files (auto-created)
├── chroma_db/                       # ChromaDB storage (auto-created)
├── main.py                          # CLI entry point
├── requirements.txt                 # Python dependencies
├── .env                             # Environment variables (create from .env.example)
├── .env.example                     # Environment template
└── .gitignore                       # Git ignore rules

# You've already done this:
cd /home/linuxdev1/PracticeApps/KnowledgeRAG
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

Edit the .env file with your OpenAI API key:
# Edit .env file
nano .env
# Add your key:
OPENAI_API_KEY=sk-your-actual-api-key-here

# Add some PDF files to the data directory
cp /path/to/your/documents/*.pdf ./data/documents/
# Or create a sample text file for testing
echo "This is a test document about patient intake procedures." > ./data/documents/sample.txt

Option A: Using the CLI
# Activate virtual environment first
source venv/bin/activate
# Start interactive query mode
python main.py query
# Or start the API server
python main.py api

Option B: Using Python directly
# test_quick.py
from src.connectors.pdf_connector import PDFConnector
from src.vectorstore.store import VectorStoreManager
from src.rag.engine import RAGEngine
# 1. Load documents
connector = PDFConnector(source_path="./data/documents")
documents = connector.load()
print(f"Loaded {len(documents)} documents")
# 2. Store in vector database
store = VectorStoreManager()
store.add_documents(documents)
print("Documents stored in vector database")
# 3. Query
engine = RAGEngine(vector_store=store)
response = engine.query("What is mentioned in the documents?")
print(f"Answer: {response.answer}")
print(f"Sources: {response.sources}")

Option C: Using the API
# Start the server
uvicorn src.api.main:app --reload --host 0.0.0.0 --port 8000
# Access Swagger docs at: http://localhost:8000/docs
# Query via curl
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the patient intake process?"}'

from src.connectors.pdf_connector import PDFConnector
from src.processing.preprocessor import TextPreprocessor
from src.processing.chunker import RecursiveChunker
from src.vectorstore.store import VectorStoreManager
from src.rag.engine import RAGEngine
# Step 1: Load PDFs
pdf_connector = PDFConnector(source_path="./data/documents")
raw_docs = pdf_connector.load()
print(f"Loaded {len(raw_docs)} pages from PDFs")
# Step 2: Preprocess (optional PHI masking)
preprocessor = TextPreprocessor(mask_phi=True)
clean_docs = preprocessor.process_documents(raw_docs)
# Step 3: Chunk documents
chunker = RecursiveChunker(chunk_size=1000, chunk_overlap=200)
chunks = chunker.chunk_documents(clean_docs)
print(f"Created {len(chunks)} chunks")
# Step 4: Store in vector database
store = VectorStoreManager(
    store_type="chroma",
    persist_directory="./chroma_db"
)
store.add_documents(chunks)
print("Documents indexed successfully")
# Step 5: Query
engine = RAGEngine(vector_store=store)
response = engine.query("What are the main topics covered?")
print(f"\nAnswer: {response.answer}")
print(f"\nSources used: {response.sources}")
print(f"Confidence: {response.confidence:.2%}")

from src.connectors.pdf_connector import PDFConnector
from src.connectors.web_connector import WebConnector
from src.connectors.database_connector import DatabaseConnector
from src.vectorstore.store import VectorStoreManager
# Initialize vector store (shared)
store = VectorStoreManager()
# Load from PDFs
pdf_connector = PDFConnector(source_path="./data/documents")
pdf_docs = pdf_connector.load()
store.add_documents(pdf_docs)
print(f"Added {len(pdf_docs)} PDF documents")
# Load from web URLs
web_connector = WebConnector(urls=[
    "https://example.com/policy.html",
    "https://example.com/guidelines.html"
])
web_docs = web_connector.load()
store.add_documents(web_docs)
print(f"Added {len(web_docs)} web documents")
# Load from database
db_connector = DatabaseConnector(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT content, title FROM knowledge_base"
)
db_docs = db_connector.load()
store.add_documents(db_docs)
print(f"Added {len(db_docs)} database records")
# Now queries will search across ALL sources

from src.rag.engine import RAGEngine
engine = RAGEngine(vector_store=store)
# Query only PDF sources
response = engine.query(
    question="What are the medication guidelines?",
    source_filter=["pdf"],
    top_k=3
)
# Query only SharePoint sources
response = engine.query(
    question="What is the latest HR policy?",
    source_filter=["sharepoint"],
    top_k=5
)

from typing import List

from langchain_core.documents import Document

from src.connectors.base import BaseConnector, ConnectorRegistry


@ConnectorRegistry.register("custom")
class CustomConnector(BaseConnector):
    """Custom connector for your data source"""

    def __init__(self, api_endpoint: str, api_key: str):
        self.api_endpoint = api_endpoint
        self.api_key = api_key

    def load(self) -> List[Document]:
        # Your custom loading logic here
        # Must return List[Document]
        # fetch_from_api is a placeholder for your own API client
        data = fetch_from_api(self.api_endpoint, self.api_key)
        documents = []
        for item in data:
            doc = Document(
                page_content=item['text'],
                metadata={
                    'source': self.api_endpoint,
                    'source_type': self.get_source_type(),
                    'id': item['id'],
                    'timestamp': item['created_at']
                }
            )
            documents.append(doc)
        return documents

    def get_source_type(self) -> str:
        return "custom"


# Use your custom connector
custom = CustomConnector(api_endpoint="https://api.example.com", api_key="xxx")
docs = custom.load()
store.add_documents(docs)

# Required for OpenAI
OPENAI_API_KEY=sk-your-key-here
# Optional: Azure OpenAI (for HIPAA compliance)
AZURE_OPENAI_API_KEY=your-azure-key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
# Optional: SharePoint
SHAREPOINT_SITE_URL=https://yourcompany.sharepoint.com/sites/YourSite
SHAREPOINT_CLIENT_ID=your-client-id
SHAREPOINT_CLIENT_SECRET=your-client-secret
# Optional: Database
DATABASE_CONNECTION_STRING=postgresql://user:pass@host:5432/db

# Embedding Configuration
embedding:
  provider: "openai"  # Options: openai, huggingface, azure
  model: "text-embedding-ada-002"  # OpenAI model
  # model: "sentence-transformers/all-MiniLM-L6-v2"  # HuggingFace (free)
# LLM Configuration
llm:
  provider: "openai"  # Options: openai, azure, huggingface
  model: "gpt-4"  # Options: gpt-4, gpt-3.5-turbo
  temperature: 0.1  # Lower = more factual
  max_tokens: 1000  # Max response length
# Chunking Configuration
chunking:
  strategy: "recursive"  # Options: recursive, semantic, sentence
  chunk_size: 1000  # Characters per chunk
  chunk_overlap: 200  # Overlap between chunks
# Retrieval Configuration
retrieval:
  top_k: 5  # Number of chunks to retrieve
  strategy: "hybrid"  # Options: vector, hybrid, mmr
  rerank: true  # Enable reranking
  rerank_model: "cross-encoder/ms-marco-MiniLM-L-6-v2"
# Vector Store Configuration
vectorstore:
  type: "chroma"  # Options: chroma, faiss
  persist_directory: "./chroma_db"  # Where to save
  collection_name: "knowledge_base"  # Collection name
# API Configuration
api:
  host: "0.0.0.0"
  port: 8000
  cors_origins: ["*"]  # Allowed origins
# Logging Configuration
logging:
  level: "INFO"  # Options: DEBUG, INFO, WARNING, ERROR
  file: "logs/app.log"
  rotation: "10 MB"
  retention: "7 days"

- Use Azure OpenAI - HIPAA BAA available
  # config/settings.yaml
  llm:
    provider: "azure"
    deployment_name: "your-deployment"
- Enable PHI Masking - Automatically mask sensitive data
  preprocessor = TextPreprocessor(mask_phi=True)
- Use Local Models - No data leaves your network
  embedding:
    provider: "huggingface"
    model: "sentence-transformers/all-MiniLM-L6-v2"

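The PHI masking option above relies on pattern matching. A minimal sketch of that idea follows, reusing the three regular expressions listed for `PHIMasker` earlier. The `mask_phi` function and the `[SSN]`-style placeholder format are assumptions for illustration, and regular expressions alone will miss PHI such as names and addresses.

```python
import re
from typing import Dict

# Patterns copied from the PHIMasker example; a real deployment needs more.
PHI_PATTERNS: Dict[str, str] = {
    "ssn": r"\d{3}-\d{2}-\d{4}",
    "phone": r"\d{3}[-.\s]?\d{3}[-.\s]?\d{4}",
    "mrn": r"MRN[:\s]?\d{6,10}",
}


def mask_phi(text: str) -> str:
    """Replace each pattern match with a bracketed label such as [SSN]."""
    # Dicts keep insertion order, so SSNs are masked before phone numbers.
    for label, pattern in PHI_PATTERNS.items():
        text = re.sub(pattern, f"[{label.upper()}]", text)
    return text


masked = mask_phi("SSN 123-45-6789, call 555-123-4567, MRN:12345678")
```

Pattern order matters here: a ten-digit MRN would be matched by the phone pattern before the MRN pattern runs, so a production masker would likely apply the most specific patterns first.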
All queries are automatically logged with:
- Timestamp
- User (if authenticated)
- Query text
- Sources accessed
- Response generated
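One way to capture the fields listed above is to write one JSON line per query. This is a sketch only: `build_audit_record` and its field names are hypothetical, and the project's actual logging goes through loguru.

```python
import json
from datetime import datetime, timezone
from typing import List, Optional


def build_audit_record(
    query: str,
    sources: List[str],
    answer: str,
    user: Optional[str] = None,
) -> str:
    """Serialize one query event as a single JSON line."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user": user,  # None when the request is unauthenticated
        "query": query,
        "sources": sources,
        "response": answer,
    }
    return json.dumps(record)


line = build_audit_record(
    query="What are the compliance requirements?",
    sources=["policy.pdf"],
    answer="Based on the documents...",
)
```

Because query text and responses can themselves contain PHI, the audit log would need the same access controls as the source documents.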
Every response includes:
- List of source documents used
- Page numbers (for PDFs)
- Confidence score
- Chunk text used for transparency
Adding a new connector:
- Create a new file in src/connectors/
- Inherit from BaseConnector
- Implement load() and get_source_type()
- Register with @ConnectorRegistry.register("name")

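The registration step can be pictured as a class decorator that records each connector class under a name. The sketch below is an assumption about how `ConnectorRegistry.register` might work, written against the standard library only. The `create` helper and the `EchoConnector` class are hypothetical, and the real registry in `src/connectors/base.py` may differ.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Type


class BaseConnector(ABC):
    """Stand-in for the project's base class; returns plain strings here."""

    @abstractmethod
    def load(self) -> List[str]: ...

    @abstractmethod
    def get_source_type(self) -> str: ...


class ConnectorRegistry:
    _connectors: Dict[str, Type[BaseConnector]] = {}

    @classmethod
    def register(
        cls, name: str
    ) -> Callable[[Type[BaseConnector]], Type[BaseConnector]]:
        """Class decorator that stores the connector class under `name`."""
        def decorator(connector_cls: Type[BaseConnector]) -> Type[BaseConnector]:
            cls._connectors[name] = connector_cls
            return connector_cls  # hand the class back unchanged
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs) -> BaseConnector:
        """Instantiate a registered connector by name."""
        if name not in cls._connectors:
            raise KeyError(f"No connector registered as {name!r}")
        return cls._connectors[name](**kwargs)


@ConnectorRegistry.register("echo")
class EchoConnector(BaseConnector):
    def __init__(self, text: str):
        self.text = text

    def load(self) -> List[str]:
        return [self.text]

    def get_source_type(self) -> str:
        return "echo"


connector = ConnectorRegistry.create("echo", text="hello")
```

Looking connectors up by name is what would let an endpoint such as `/ingest` map a `source_type` string to a class without a chain of `if` statements.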
Adding a new chunker:
- Create a new class in src/processing/chunker.py
- Inherit from BaseChunker
- Implement chunk_documents()

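As an illustration of these steps, here is a sentence-boundary chunker. `BaseChunker` is redefined locally as a stand-in (the real one lives in `src/processing/chunker.py` and works on `Document` objects), and `SimpleSentenceChunker` with its regex-based sentence split is hypothetical.

```python
import re
from abc import ABC, abstractmethod
from typing import List


class BaseChunker(ABC):
    """Local stand-in; operates on plain strings for brevity."""

    @abstractmethod
    def chunk_documents(self, documents: List[str]) -> List[str]: ...


class SimpleSentenceChunker(BaseChunker):
    """Packs whole sentences into chunks, aiming for at most `chunk_size`
    characters. A single sentence longer than that is kept whole."""

    def __init__(self, chunk_size: int = 1000):
        self.chunk_size = chunk_size

    def chunk_documents(self, documents: List[str]) -> List[str]:
        chunks: List[str] = []
        for text in documents:
            # Naive split: break after ., ! or ? followed by whitespace.
            sentences = re.split(r"(?<=[.!?])\s+", text.strip())
            current = ""
            for sentence in sentences:
                candidate = f"{current} {sentence}".strip()
                if current and len(candidate) > self.chunk_size:
                    chunks.append(current)  # flush before it overflows
                    current = sentence
                else:
                    current = candidate
            if current:
                chunks.append(current)
        return chunks


chunker = SimpleSentenceChunker(chunk_size=50)
pieces = chunker.chunk_documents(
    ["The patient was admitted on Monday. Tests were ordered. "
     "Results came back Tuesday."]
)
```

The regex split is deliberately naive: abbreviations such as "Dr." or decimal values in lab results would be cut incorrectly, which is why a production chunker would use a proper sentence tokenizer.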
Adding a new retriever:
- Create a new class in src/vectorstore/retriever.py
- Inherit from BaseRetriever
- Implement retrieve()

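A custom retriever could, for example, apply the weighted score fusion described for `HybridRetriever`. The sketch below assumes both score sets are already computed and keyed by chunk ID. The `fuse_scores` function, the min-max normalization, and the sample scores are illustrative assumptions rather than the project's implementation.

```python
from typing import Dict, List, Tuple


def _min_max(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale scores to [0, 1] so BM25 and cosine values are comparable."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {key: 1.0 for key in scores}  # all equal: no ranking signal
    return {key: (value - lo) / (hi - lo) for key, value in scores.items()}


def fuse_scores(
    bm25: Dict[str, float],
    vector: Dict[str, float],
    bm25_weight: float = 0.3,
    vector_weight: float = 0.7,
    top_k: int = 5,
) -> List[Tuple[str, float]]:
    """final_score = bm25_weight * bm25 + vector_weight * vector."""
    bm25_n, vector_n = _min_max(bm25), _min_max(vector)
    combined = {
        chunk_id: bm25_weight * bm25_n.get(chunk_id, 0.0)
        + vector_weight * vector_n.get(chunk_id, 0.0)
        for chunk_id in set(bm25) | set(vector)
    }
    ranked = sorted(combined.items(), key=lambda item: item[1], reverse=True)
    return ranked[:top_k]


ranked = fuse_scores(
    bm25={"a": 12.0, "b": 2.0, "c": 0.0},
    vector={"a": 0.60, "b": 0.90, "c": 0.30},
    top_k=2,
)
```

Normalizing first matters because raw BM25 scores are unbounded while cosine similarities are at most 1; without it, the keyword score would dominate regardless of the weights.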
# Activate virtual environment
source venv/bin/activate
# Run tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=src --cov-report=html

- "OPENAI_API_KEY not set"
  # Check .env file
  cat .env
  # Should contain: OPENAI_API_KEY=sk-...
- "No module named 'src'"
  # Make sure you're in the project directory
  cd /home/linuxdev1/PracticeApps/KnowledgeRAG
  source venv/bin/activate
- "ChromaDB connection error"
  # Delete and recreate the database
  rm -rf ./chroma_db
- "Out of memory"
  - Reduce chunk_size in settings.yaml
  - Use FAISS instead of ChromaDB for large datasets
  - Use HuggingFace embeddings instead of OpenAI

MIT License
- Add your OpenAI API key to .env
- Add sample documents to data/documents/
- Run the system using python main.py api or python main.py query
- Test the API at http://localhost:8000/docs

Need help? Check the examples in examples/usage.py or the tests in tests/test_basic.py.