- System Overview
- Core Architecture Principles
- Base Class Hierarchy
- Integration Patterns
- Data Flow Architecture
- Component Architecture
- Design Patterns
- Security & Compliance Architecture
- Performance Architecture
- Testing Architecture
- Deployment Architecture
- Monitoring & Observability
The IntegratedML Flexible Model Integration Demo showcases a sophisticated architecture that bridges enterprise database capabilities with modern machine learning workflows. This guide provides a comprehensive technical deep-dive into the system design, base classes, integration patterns, and architectural decisions.
The project is structured around a series of Jupyter notebooks that provide interactive, domain-specific demonstrations.
- Per-Domain Notebooks: Each demo resides in its own directory under `demos/*/notebooks/`, providing a self-contained environment for exploration.
- Shared Plotting Utilities: Common visualization functions are centralized in `notebooks/utils/plotting.py` to ensure consistent and reusable plotting code.
- Shared Python Modules: Core database, data loading, and model management logic is located in the `shared/` directory to promote code reuse and maintainability across all notebooks.
- Database-Native ML: Execute ML models directly within database environments without data movement
- Pluggable Design: Support arbitrary custom models through standardized interfaces
- Production Ready: Enterprise-grade performance, security, and scalability
- Developer Friendly: Familiar scikit-learn patterns with enhanced capabilities
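The "developer friendly" goal can be illustrated with a toy estimator (hypothetical, not part of the demo code) that follows the scikit-learn conventions a custom model is expected to honor: hyperparameters in `__init__`, learned state with a trailing underscore, and `fit` returning `self` for chaining.

```python
import numpy as np

class ThresholdClassifier:
    """Toy estimator following scikit-learn conventions (hypothetical)."""
    def __init__(self, threshold=0.5):
        self.threshold = threshold           # hyperparameters are set in __init__
    def fit(self, X, y):
        self.is_fitted_ = True               # learned state uses a trailing underscore
        return self                          # returning self enables chaining
    def predict(self, X):
        return (np.asarray(X)[:, 0] > self.threshold).astype(int)

preds = ThresholdClassifier(threshold=1.5).fit([[0.0], [2.0]], [0, 1]).predict([[0.0], [2.0]])
```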
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
├─────────────────────────────────────────────────────────────┤
│ Business Logic │
├─────────────────────────────────────────────────────────────┤
│ Model Abstraction │
├─────────────────────────────────────────────────────────────┤
│ Integration Layer │
├─────────────────────────────────────────────────────────────┤
│ Database Engine │
└─────────────────────────────────────────────────────────────┘
- Application Layer: Business applications, SQL queries, BI tools
- Business Logic: Domain-specific processing, feature engineering, validation
- Model Abstraction: Standardized ML interfaces (IntegratedMLBaseModel)
- Integration Layer: IntegratedML framework, serialization, lifecycle management
- Database Engine: IRIS database with native ML capabilities
Each model type implements only the interfaces it needs:
- IntegratedMLBaseModel: Core functionality (fit, predict, serialization)
- ClassificationModel: Classification-specific methods (predict_proba, decision thresholds)
- RegressionModel: Regression-specific methods (confidence intervals, residuals)
- EnsembleModel: Multi-model orchestration (voting, weighting, meta-learning)
High-level modules (business logic) depend on abstractions (base classes), not concrete implementations. This enables:
- Flexible Model Integration: Swap model implementations without changing business logic
- Testing: Mock model implementations for unit testing
- Evolution: Add new model types without breaking existing code
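A minimal sketch of the inversion (the class and function names here are illustrative stand-ins, not the real shared modules): business logic calls only the abstract interface, so any conforming implementation, including a mock for unit tests, can be substituted without changes.

```python
from abc import ABC, abstractmethod
import numpy as np

class IntegratedMLBaseModel(ABC):
    """Stand-in for the shared abstract base class."""
    @abstractmethod
    def predict(self, X): ...

class MockModel(IntegratedMLBaseModel):
    """Trivial conforming implementation, e.g. for unit tests."""
    def __init__(self, value):
        self.value = value
    def predict(self, X):
        return np.full(len(X), self.value)

def score_batch(model: IntegratedMLBaseModel, X):
    # Business logic depends only on the abstraction, never a concrete model.
    return model.predict(X)

scores = score_batch(MockModel(1), np.zeros((3, 2)))
```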
┌─────────────────────────────────┐
│ BaseEstimator (sklearn) │
│ + ClassifierMixin │
│ + RegressorMixin │
└────────────────┬────────────────┘
│
┌────────────────▼────────────────┐
│ IntegratedMLBaseModel (ABC) │
│ │
│ Core Interface: │
│ • fit(X, y) │
│ • predict(X) │
│ • save_model() / load_model() │
│ • _validate_parameters() │
│ • _validate_input() │
└─────────┬───────────────────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌─────────▼─────────┐ ┌────▼──────────┐ ┌───▼──────────┐
│ ClassificationModel│ │RegressionModel│ │EnsembleModel │
│ + ClassifierMixin │ │ + RegressorMixin│ │ │
│ │ │ │ │ │
│ Additional: │ │ Additional: │ │ Additional: │
│ • predict_proba() │ │ • predict_with │ │ • add_comp │
│ • predict_log_ │ │ _interval() │ │ • set_voting│
│ proba() │ │ • get_residuals│ │ _weights()│
│ • decision_ │ │ • score() │ │ • get_comp │
│ function() │ │ │ │ _predict │
└─────────┬──────────┘ └────┬───────────┘ └───┬─────────┘
│ │ │
│ │ │
┌────────▼─────────┐ ┌────▼──────────┐ ┌────▼──────────┐
│ Credit Risk │ │ Sales │ │ Fraud │
│ Classifier │ │ Forecasting │ │ Detection │
│ (demo 1) │ │ (demo 3) │ │ Ensemble │
│ │ │ │ │ (demo 2) │
└──────────────────┘ └───────────────┘ └───────────────┘
Demo-Specific Models:
• CustomCreditRiskClassifier → Credit risk with feature engineering
• HybridForecastingModel → Prophet + LightGBM combination
• EnsembleFraudDetector → Multi-model ensemble (Neural + Rules + Anomaly)
• DNASequenceClassifier → K-NN with custom distance metrics
Inheritance Flow: Each layer adds specialized functionality while inheriting core capabilities from parent classes. Demo models implement domain-specific logic (custom feature engineering, ensemble strategies, third-party library integration) while maintaining IntegratedML compatibility.
The foundation of all flexible model integration, providing essential functionality:
class IntegratedMLBaseModel(BaseEstimator, ABC):
    """
    Abstract base class ensuring IntegratedML compatibility.

    Responsibilities:
    - Parameter validation and serialization
    - Model lifecycle management (fit/predict)
    - Input validation and preprocessing
    - Metadata storage and retrieval
    - Error handling and logging
    """

    # Core interface
    @abstractmethod
    def fit(self, X, y) -> 'IntegratedMLBaseModel': ...

    @abstractmethod
    def predict(self, X) -> np.ndarray: ...

    @abstractmethod
    def _validate_parameters(self) -> None: ...

    # Utility methods
    def get_params(self) -> Dict[str, Any]: ...
    def set_params(self, **params) -> 'IntegratedMLBaseModel': ...
    def save_model(self, path: str) -> None: ...

    @classmethod
    def load_model(cls, path: str) -> 'IntegratedMLBaseModel': ...

    def get_model_info(self) -> Dict[str, Any]: ...

Extends IntegratedMLBaseModel for classification tasks:
class ClassificationModel(IntegratedMLBaseModel, ClassifierMixin):
    """
    Classification-specific functionality.

    Additional Responsibilities:
    - Probability predictions (predict_proba)
    - Class label handling and encoding
    - Decision threshold management
    - Classification metrics integration
    """

    def predict_proba(self, X) -> np.ndarray: ...
    def predict_log_proba(self, X) -> np.ndarray: ...
    def decision_function(self, X) -> np.ndarray: ...

Extends IntegratedMLBaseModel for regression tasks:
class RegressionModel(IntegratedMLBaseModel, RegressorMixin):
    """
    Regression-specific functionality.

    Additional Responsibilities:
    - Confidence interval prediction
    - Residual analysis and diagnostics
    - Uncertainty quantification
    - Regression metrics integration
    """

    def predict_with_interval(self, X, confidence_level) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: ...
    def get_residuals(self, X, y) -> np.ndarray: ...
    def score(self, X, y) -> float: ...

Coordinates multiple models for enhanced performance:
class EnsembleModel(IntegratedMLBaseModel):
    """
    Multi-model orchestration and voting.

    Additional Responsibilities:
    - Component model management
    - Voting strategy implementation
    - Meta-learning and stacking
    - Component performance tracking
    """

    def add_component(self, name: str, model: IntegratedMLBaseModel) -> None: ...
    def set_voting_weights(self, weights: Dict[str, float]) -> None: ...
    def get_component_predictions(self, X) -> Dict[str, np.ndarray]: ...
    def get_component_info(self) -> Dict[str, Dict]: ...

All models implement consistent parameter validation:
def _validate_parameters(self) -> None:
    """
    Validate all model parameters and raise descriptive errors.

    Pattern:
    1. Call super()._validate_parameters()
    2. Validate model-specific parameters
    3. Raise ValueError with clear error messages
    4. Perform cross-parameter validation
    """
    super()._validate_parameters()

    # Type validation
    if not isinstance(self.learning_rate, (int, float)):
        raise ValueError(f"learning_rate must be numeric, got {type(self.learning_rate)}")

    # Range validation
    if not 0 < self.learning_rate <= 1:
        raise ValueError(f"learning_rate must be in (0, 1], got {self.learning_rate}")

    # Cross-parameter validation
    if self.max_depth is not None and self.max_depth < 1:
        raise ValueError("max_depth must be None or a positive integer")

Standardized input validation across all models:
def _validate_input(self, X: Union[np.ndarray, pd.DataFrame],
                    check_fitted: bool = False) -> np.ndarray:
    """
    Validate and normalize input data.

    Responsibilities:
    - Convert DataFrames to arrays
    - Validate shape consistency
    - Handle missing values
    - Check feature count consistency
    - Store feature names if available
    """
    if check_fitted and not self.is_fitted:
        raise ValueError("Model must be fitted before prediction")

    # Store feature names (if any), then convert to a numpy array
    if hasattr(X, 'columns'):
        self.feature_names_in_ = X.columns.tolist()
    if hasattr(X, 'values'):
        X = X.values
    X = np.asarray(X)

    # Validate dimensions
    if X.ndim != 2:
        raise ValueError(f"Expected 2D array, got {X.ndim}D")

    # Validate feature count
    if hasattr(self, 'n_features_in_') and X.shape[1] != self.n_features_in_:
        raise ValueError(f"Expected {self.n_features_in_} features, got {X.shape[1]}")

    return X

Consistent model training and prediction lifecycle:
def fit(self, X, y):
    """
    Standard model training lifecycle.

    Pattern:
    1. Validate inputs
    2. Store feature metadata
    3. Perform model-specific training
    4. Set is_fitted flag
    5. Store model metadata
    6. Return self for method chaining
    """
    # Capture feature names before X is converted to a plain array
    if hasattr(X, 'columns'):
        self.feature_names_in_ = X.columns.tolist()

    # Input validation
    X = self._validate_input(X)
    y = self._validate_target(y)

    # Store metadata
    self.n_features_in_ = X.shape[1]

    # Model-specific training
    self._fit_model(X, y)

    # Mark as fitted and store metadata
    self.is_fitted = True
    self._model_metadata = self._create_model_metadata(X, y)
    return self

def predict(self, X):
    """
    Standard prediction lifecycle.

    Pattern:
    1. Check fitted status
    2. Validate inputs
    3. Perform predictions
    4. Validate outputs
    5. Return predictions
    """
    if not self.is_fitted:
        raise ValueError("Model must be fitted before prediction")
    X = self._validate_input(X, check_fitted=True)
    predictions = self._predict_model(X)
    return self._validate_predictions(predictions)

Robust model persistence for production deployment:
def save_model(self, path: str) -> None:
    """
    Save model with complete state preservation.

    Components:
    - Model parameters and hyperparameters
    - Trained model state (weights, trees, etc.)
    - Feature metadata (names, types, scaling)
    - Training metadata (samples, performance)
    - Version and dependency information
    """
    import pickle
    import json
    from pathlib import Path

    model_dir = Path(path)
    model_dir.mkdir(parents=True, exist_ok=True)

    # Save model state
    with open(model_dir / 'model_state.pkl', 'wb') as f:
        pickle.dump(self._get_serializable_state(), f)

    # Save metadata
    metadata = {
        'model_class': self.__class__.__name__,
        'version': self._get_version(),
        'parameters': self.get_params(),
        'feature_metadata': self._get_feature_metadata(),
        'training_metadata': self._model_metadata,
        'dependencies': self._get_dependencies()
    }
    with open(model_dir / 'metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2, default=str)

@classmethod
def load_model(cls, path: str) -> 'IntegratedMLBaseModel':
    """
    Load model with full state restoration.

    Validation:
    - Version compatibility
    - Dependency availability
    - Parameter consistency
    - Feature metadata matching
    """
    import pickle
    import json
    from pathlib import Path

    model_dir = Path(path)

    # Load metadata
    with open(model_dir / 'metadata.json', 'r') as f:
        metadata = json.load(f)

    # Validate compatibility
    cls._validate_load_compatibility(metadata)

    # Create instance and restore state
    instance = cls(**metadata['parameters'])
    with open(model_dir / 'model_state.pkl', 'rb') as f:
        state = pickle.load(f)
    instance._restore_serializable_state(state)
    instance._model_metadata = metadata['training_metadata']
    return instance

Raw Data → Feature Engineering → Validation → Model Training → Serialization
↓ ↓ ↓ ↓ ↓
Database Custom Logic Base Classes Model-Specific Production
Tables (Domain) (Validation) (Algorithms) Storage
- Data Extraction: SQL queries extract training data from database tables
- Feature Engineering: Domain-specific transformations create model-ready features
- Validation: Base classes validate inputs, parameters, and constraints
- Model Training: Algorithm-specific training logic learns from data
- Serialization: Complete model state saved for production deployment
New Data → Feature Engineering → Model Loading → Prediction → Result Integration
↓ ↓ ↓ ↓ ↓
Real-time Same Logic Deserialization Algorithm Database
Input (Consistency) (State Restore) (Inference) Updates
- Data Input: New observations arrive via SQL queries or streaming
- Feature Engineering: Identical transformations applied to new data
- Model Loading: Trained model loaded from serialized state
- Prediction: Model generates predictions for new observations
- Integration: Results written back to database or returned to application
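The steps above can be sketched in miniature (the `StubModel` and `engineer_features` names are hypothetical stand-ins for the real components): the essential guarantee is that the same feature-engineering function runs at both training and inference time.

```python
import numpy as np

def engineer_features(raw):
    # Step 2: identical transformation at training and inference time
    return np.log1p(np.asarray(raw, dtype=float))

class StubModel:
    # Step 3: stands in for a model restored from serialized state
    def predict(self, X):
        return (X > 1.0).astype(int)

def run_inference(model, raw_batch):
    X = engineer_features(raw_batch)          # Step 2: feature engineering
    preds = model.predict(X)                  # Step 4: prediction
    return list(zip(raw_batch, preds))        # Step 5: results for write-back

results = run_inference(StubModel(), [0.5, 5.0, 20.0])
```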
SQL Query → IntegratedML → Model Registry → Execution Engine → Results
↓ ↓ ↓ ↓ ↓
Business Framework Model Storage Optimized Database
Logic (Routing) (Serialized) Execution Integration
- SQL Query: Business users write standard SQL with PREDICT() calls
- IntegratedML Framework: Routes prediction requests to appropriate models
- Model Registry: Locates and loads specified model from storage
- Execution Engine: Optimized prediction execution within database context
- Results: Predictions seamlessly integrated with SQL query results
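The routing can be mimicked in-process with SQLite standing in for the database engine. The actual `PREDICT()` dispatch is internal to IntegratedML; everything below is an illustrative simulation of steps 2–5, and the registry and helper names are invented for the sketch.

```python
import sqlite3
import numpy as np

# Simulated registry: maps model names to callables (stand-in for serialized models)
MODEL_REGISTRY = {
    "credit_risk": lambda X: (np.asarray(X)[:, 0] > 50).astype(int),
}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER, income REAL)")
conn.executemany("INSERT INTO customers VALUES (?, ?)", [(1, 30.0), (2, 80.0)])

def predict_sql(conn, table, model_name):
    # Steps 1-2: the query fetches rows; the framework routes them to a model
    rows = conn.execute(f"SELECT customer_id, income FROM {table}").fetchall()
    model = MODEL_REGISTRY[model_name]             # Step 3: model registry lookup
    preds = model([[r[1]] for r in rows])          # Step 4: execution engine
    return [(r[0], int(p)) for r, p in zip(rows, preds)]  # Step 5: merged results

results = predict_sql(conn, "customers", "credit_risk")
```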
integratedml-demos/
├── shared/ # Reusable components
│ ├── models/ # Base model classes
│ │ ├── base.py # IntegratedMLBaseModel
│ │ ├── classification.py # ClassificationModel
│ │ ├── regression.py # RegressionModel
│ │ └── ensemble.py # EnsembleModel
│ ├── utils/ # Shared utilities
│ │ ├── validation.py # Input validation
│ │ ├── serialization.py # Model persistence
│ │ └── metrics.py # Performance metrics
│ ├── data/ # Data utilities
│ │ ├── generators.py # Synthetic data generation
│ │ └── preprocessing.py # Feature engineering
│ └── testing/ # Testing framework
│ ├── base_tests.py # Base test classes
│ └── fixtures.py # Test data fixtures
├── demos/ # Progressive complexity demos
│ ├── credit_risk/ # Demo 1: Basic classification
│ │ ├── models/ # Custom model implementations
│ │ ├── data/ # Demo-specific data
│ │ ├── notebooks/ # Interactive tutorials
│ │ ├── scripts/ # Automation scripts
│ │ └── tests/ # Demo-specific tests
│ ├── fraud_detection/ # Demo 2: Ensemble methods
│ │ └── [similar structure]
│ └── sales_forecasting/ # Demo 3: Third-party integration
│ └── [similar structure]
├── examples/ # Usage examples
│ ├── quick_start_example.py # Basic usage patterns
│ └── model_config_template.yaml # Configuration template
└── docs/ # Comprehensive documentation
├── tutorials/ # Step-by-step guides
├── api_reference.md # Complete API documentation
├── architecture.md # This document
└── deployment.md # Production deployment
The architecture uses a layered dependency approach:
Level 1 - Core Dependencies
- Python 3.8+ (base language)
- NumPy (numerical computing)
- Pandas (data manipulation)
- Scikit-learn (ML framework compatibility)
Level 2 - Specialized Dependencies
- Credit Risk: Standard ML libraries (minimal dependencies)
- Fraud Detection: Ensemble libraries, IRIS Vector Search
- Sales Forecasting: Prophet, LightGBM (complex dependencies)
Level 3 - Optional Dependencies
- Visualization: Matplotlib, Plotly
- Advanced ML: XGBoost, TensorFlow
- Database: IRIS database connectors
This layered approach ensures:
- Core functionality works with minimal dependencies
- Advanced features become available when specialized libraries are installed
- Graceful degradation when optional components are unavailable
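One common way to implement the graceful-degradation guarantee is the optional-import pattern; the sketch below uses XGBoost as the Level 3 example, and `make_booster` is a hypothetical helper, not part of the shared modules.

```python
# Probe for the optional dependency at import time and record the result.
try:
    import xgboost  # Level 3 optional dependency
    HAS_XGBOOST = True
except ImportError:
    xgboost = None
    HAS_XGBOOST = False

def make_booster(**params):
    """Return an XGBoost classifier, or fail with a clear message."""
    if not HAS_XGBOOST:
        raise RuntimeError("xgboost not installed; advanced models unavailable")
    return xgboost.XGBClassifier(**params)
```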
Base classes define algorithmic structure, subclasses implement specific steps:
class IntegratedMLBaseModel:
    def fit(self, X, y):
        # Template method defining the training algorithm
        X = self._validate_input(X)       # Step 1: Input validation
        y = self._validate_target(y)      # Step 2: Target validation
        self._store_metadata(X, y)        # Step 3: Metadata storage
        self._fit_model(X, y)             # Step 4: Model-specific training
        self.is_fitted = True             # Step 5: State update
        return self

    @abstractmethod
    def _fit_model(self, X, y):
        # Subclasses implement the specific training logic
        pass

Different algorithms interchangeable through common interface:
class EnsembleModel:
    def __init__(self, voting_strategy='weighted'):
        self.voting_strategy = voting_strategy
        self._voting_strategies = {
            'hard': self._hard_voting,
            'soft': self._soft_voting,
            'weighted': self._weighted_voting,
            'stacking': self._stacking_voting
        }

    def predict(self, X):
        component_predictions = self._get_component_predictions(X)
        voting_function = self._voting_strategies[self.voting_strategy]
        return voting_function(component_predictions)

Model creation abstracted through factory methods:
class ModelFactory:
    @staticmethod
    def create_model(model_type: str, **kwargs):
        if model_type == 'credit_risk':
            from demos.credit_risk.models import CustomCreditRiskClassifier
            return CustomCreditRiskClassifier(**kwargs)
        elif model_type == 'fraud_detection':
            from demos.fraud_detection.models import EnsembleFraudDetector
            return EnsembleFraudDetector(**kwargs)
        elif model_type == 'sales_forecasting':
            from demos.sales_forecasting.models import HybridForecastingModel
            return HybridForecastingModel(**kwargs)
        else:
            raise ValueError(f"Unknown model type: {model_type}")

Performance monitoring and logging:
class ModelObserver:
    def on_fit_start(self, model, X, y): pass
    def on_fit_end(self, model, metrics): pass
    def on_predict_start(self, model, X): pass
    def on_predict_end(self, model, predictions): pass

class IntegratedMLBaseModel:
    def __init__(self):
        self._observers = []

    def add_observer(self, observer: ModelObserver):
        self._observers.append(observer)

    def _notify_observers(self, event, **kwargs):
        for observer in self._observers:
            getattr(observer, event)(**kwargs)

Principle: ML processing occurs within secure database boundaries
- No Data Export: Training and prediction happen in-database
- Access Control: Database-level permissions control data access
- Audit Trails: All model operations logged for compliance
- Encryption: Data encrypted at rest and in transit
Serialization Security:
def save_model(self, path: str, encrypt: bool = True):
    """Save model with optional encryption."""
    import pickle
    from cryptography.fernet import Fernet

    # Serialize first so the unencrypted path also writes bytes
    payload = pickle.dumps(self._get_serializable_state())
    if encrypt:
        key = Fernet.generate_key()
        cipher = Fernet(key)
        payload = cipher.encrypt(payload)
        # Store key securely (implementation specific)
        self._store_encryption_key(key, path)
    with open(path, 'wb') as f:
        f.write(payload)

Input Validation Security:
def _validate_input(self, X):
    """Validate inputs to guard against injection and malformed data."""
    # Reject obvious SQL keywords in raw string input
    if isinstance(X, str) and any(keyword in X.lower() for keyword in ('drop', 'delete', 'union')):
        raise ValueError("Potentially malicious input detected")

    # Validate data types and ranges
    X = np.asarray(X, dtype=float)
    if np.any(np.isinf(X)) or np.any(np.isnan(X)):
        raise ValueError("Input contains invalid values (inf/nan)")
    return X

Model Governance:
- Version control for all model changes
- Approval workflows for production deployment
- Performance monitoring and drift detection
- Automated rollback capabilities
Regulatory Compliance:
- GDPR: Right to explanation through model interpretability
- SOX: Audit trails for financial ML models
- FDA: Validation documentation for healthcare models
- Fair Lending: Bias detection and mitigation
1. Lazy Loading
class ModelRegistry:
    def __init__(self):
        self._models = {}
        self._model_paths = {}

    def get_model(self, name: str):
        if name not in self._models:
            # Load the model only when first requested
            self._models[name] = self._load_model(self._model_paths[name])
        return self._models[name]

2. Connection Pooling
import queue
from contextlib import contextmanager

class DatabaseConnectionPool:
    def __init__(self, max_connections=10):
        self._pool = queue.Queue(maxsize=max_connections)
        for _ in range(max_connections):
            self._pool.put(self._create_connection())

    @contextmanager
    def get_connection(self):
        conn = self._pool.get()
        try:
            yield conn
        finally:
            self._pool.put(conn)

3. Caching Strategy
import time

class PredictionCache:
    def __init__(self, max_size=1000, ttl_seconds=300):
        self._cache = {}
        self._max_size = max_size
        self._ttl = ttl_seconds

    def get_prediction(self, model_name: str, input_hash: str):
        key = f"{model_name}:{input_hash}"
        if key in self._cache:
            result, timestamp = self._cache[key]
            if time.time() - timestamp < self._ttl:
                return result
        return None

    def store_prediction(self, model_name: str, input_hash: str, result):
        # Evict the oldest entry when the cache is full
        if len(self._cache) >= self._max_size:
            oldest = min(self._cache, key=lambda k: self._cache[k][1])
            del self._cache[oldest]
        self._cache[f"{model_name}:{input_hash}"] = (result, time.time())

Horizontal Scaling:
- Model sharding across multiple database instances
- Load balancing for prediction requests
- Distributed training for large datasets
Vertical Scaling:
- In-memory model caching
- Optimized feature engineering pipelines
- Compiled prediction functions
Database Integration:
- Native stored procedures for model execution
- Optimized SQL generation for feature engineering
- Parallel execution of ensemble components
1. Unit Tests - Individual component validation
class TestIntegratedMLBaseModel:
    def test_parameter_validation(self):
        with pytest.raises(ValueError):
            CustomModel(invalid_param=-1)

    def test_fit_predict_cycle(self):
        model = CustomModel()
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        assert len(predictions) == len(X_test)

2. Integration Tests - Component interaction validation
class TestModelIntegration:
    def test_serialization_roundtrip(self):
        original_model = CustomModel()
        original_model.fit(X_train, y_train)

        # Save and load
        original_model.save_model('test_model')
        loaded_model = CustomModel.load_model('test_model')

        # Verify identical predictions
        orig_pred = original_model.predict(X_test)
        load_pred = loaded_model.predict(X_test)
        np.testing.assert_array_almost_equal(orig_pred, load_pred)

3. System Tests - End-to-end validation
class TestSystemIntegration:
    def test_sql_integration(self):
        # Test the complete SQL workflow
        model = create_and_train_model()
        deploy_to_database(model)
        result = execute_sql_query("""
            SELECT customer_id, PREDICT(MyModel) as prediction
            FROM customers LIMIT 10
        """)
        assert len(result) == 10
        assert all('prediction' in row for row in result)

Automated Testing Pipeline:
- Code commit triggers automated tests
- Unit tests validate individual components
- Integration tests verify component interactions
- Performance tests ensure latency targets met
- Security tests check for vulnerabilities
- Documentation tests verify example accuracy
Quality Gates:
- 95%+ test coverage required
- All tests must pass
- Performance regression checks
- Security vulnerability scans
- Documentation completeness validation
1. Blue-Green Deployment
class ModelDeploymentManager:
    def deploy_model(self, model, version: str):
        # Deploy to the staging (green) environment
        staging_path = f"models/staging/{version}/"
        model.save_model(staging_path)

        # Validate the staged deployment
        if self._validate_deployment(staging_path):
            # Promote to production (blue)
            production_path = f"models/production/{version}/"
            self._promote_to_production(staging_path, production_path)
            self._update_model_registry(version)
        else:
            raise DeploymentError("Model validation failed")

2. Canary Deployment
class CanaryDeployment:
    def deploy_canary(self, new_model, traffic_percentage: float = 0.05):
        # Route a small percentage of traffic to the new model
        self._update_routing_rules(new_model, traffic_percentage)

        # Monitor performance
        metrics = self._monitor_canary_performance(duration_minutes=30)
        if metrics['error_rate'] < 0.01 and metrics['latency_p95'] < 100:
            # Gradually increase traffic
            self._increase_canary_traffic(new_model, target_percentage=1.0)
        else:
            # Roll back to the previous model
            self._rollback_canary(new_model)

3. Database Integration
-- Model deployment SQL procedures
CREATE PROCEDURE DeployModel(
    @ModelName VARCHAR(100),
    @ModelVersion VARCHAR(50),
    @ModelPath VARCHAR(500)
)
AS
BEGIN
    -- Validate model compatibility
    IF NOT EXISTS (SELECT 1 FROM ValidatedModels
                   WHERE Name = @ModelName AND Version = @ModelVersion)
        THROW 50001, 'Model not validated for deployment', 1;

    -- Register new model version
    INSERT INTO ModelRegistry (Name, Version, Path, DeployedAt, Status)
    VALUES (@ModelName, @ModelVersion, @ModelPath, GETDATE(), 'Active');

    -- Update active model pointer
    UPDATE ActiveModels
    SET CurrentVersion = @ModelVersion, UpdatedAt = GETDATE()
    WHERE ModelName = @ModelName;
END

Model Performance Metrics:
from datetime import datetime

class ModelPerformanceMonitor:
    def track_prediction(self, model_name: str, prediction_time: float,
                         input_size: int, confidence: float):
        metrics = {
            'model_name': model_name,
            'prediction_time_ms': prediction_time * 1000,
            'input_size': input_size,
            'confidence': confidence,
            'timestamp': datetime.utcnow()
        }
        # Store in time-series database
        self._store_metrics(metrics)

        # Check for performance degradation
        if prediction_time > self._get_latency_threshold(model_name):
            self._alert_performance_degradation(model_name, prediction_time)

Model Drift Detection:
import numpy as np
from scipy import stats

class ModelDriftDetector:
    def detect_data_drift(self, current_data: np.ndarray,
                          reference_data: np.ndarray) -> Dict[str, Dict]:
        """Detect statistical drift in input data."""
        drift_scores = {}
        for i in range(current_data.shape[1]):
            # Kolmogorov-Smirnov test for distribution drift
            statistic, p_value = stats.ks_2samp(
                reference_data[:, i], current_data[:, i]
            )
            drift_scores[f'feature_{i}'] = {
                'statistic': statistic,
                'p_value': p_value,
                'drift_detected': p_value < 0.05
            }
        return drift_scores

    def detect_prediction_drift(self, model, current_data: np.ndarray) -> float:
        """Detect drift in prediction patterns."""
        current_predictions = model.predict(current_data)
        reference_predictions = self._get_reference_predictions(model)

        # Calculate prediction distribution drift
        return stats.wasserstein_distance(current_predictions, reference_predictions)

Structured Logging:
import time
import structlog

logger = structlog.get_logger()

class IntegratedMLBaseModel:
    def fit(self, X, y):
        start_time = time.time()
        logger.info("Model training started",
                    model_class=self.__class__.__name__,
                    n_samples=len(X),
                    n_features=X.shape[1])
        try:
            self._fit_model(X, y)
            logger.info("Model training completed successfully",
                        training_time=time.time() - start_time)
        except Exception as e:
            logger.error("Model training failed",
                         error=str(e),
                         error_type=type(e).__name__)
            raise

Debug Mode:
class DebugModel(IntegratedMLBaseModel):
    def __init__(self, debug_mode: bool = False, **kwargs):
        self.debug_mode = debug_mode
        super().__init__(**kwargs)

    def predict(self, X):
        if self.debug_mode:
            # Log enhanced debugging information
            debug_info = {
                'input_shape': X.shape,
                'input_stats': {
                    'mean': np.mean(X, axis=0),
                    'std': np.std(X, axis=0),
                    'min': np.min(X, axis=0),
                    'max': np.max(X, axis=0)
                }
            }
            logger.debug("Debug prediction info", **debug_info)
        return super().predict(X)

This technical architecture provides the foundation for building, deploying, and maintaining sophisticated ML systems that bridge database capabilities with modern machine learning requirements. The modular design ensures flexibility while the standardized interfaces guarantee consistency across all model implementations.
The architecture supports the complete ML lifecycle from development through production deployment, with built-in monitoring, security, and compliance capabilities essential for enterprise environments.