-
Notifications
You must be signed in to change notification settings - Fork 4
feat: implement multi-agent repository analysis integration #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| """ | ||
| Multi-Agent Analysis System for TripMates | ||
| Provides integrated analysis from multiple specialized agents. | ||
| """ | ||
|
|
||
| from .code_analyzer import CodeQualityAgent | ||
| from .compatibility_analyzer import CompatibilityAgent | ||
| from .engagement_analyzer import EngagementAgent | ||
| from .integration_engine import MultiAgentIntegrationEngine | ||
|
|
||
| __all__ = [ | ||
| "CodeQualityAgent", | ||
| "CompatibilityAgent", | ||
| "EngagementAgent", | ||
| "MultiAgentIntegrationEngine", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| """ | ||
| Code Quality Analyzer Agent | ||
| Analyzes repository code for quality, security, and maintainability issues. | ||
| """ | ||
| import ast | ||
| import os | ||
| import re | ||
| from typing import Dict, List, Any | ||
|
|
||
|
|
||
| class CodeQualityAgent: | ||
| """Agent that analyzes code quality metrics and security issues.""" | ||
|
|
||
| def __init__(self, repo_path: str = "."): | ||
| self.repo_path = repo_path | ||
| self.findings = [] | ||
| self.metrics = {} | ||
|
|
||
| def analyze(self) -> Dict[str, Any]: | ||
| """Run full code analysis and return standardized output.""" | ||
| self._scan_python_files() | ||
| self._check_security_issues() | ||
| self._calculate_complexity() | ||
|
|
||
| return { | ||
| "agent_type": "code_analyzer", | ||
| "confidence": 0.85, | ||
| "findings": self.findings, | ||
| "metrics": self.metrics, | ||
| "recommendations": self._generate_recommendations() | ||
| } | ||
|
|
||
| def _scan_python_files(self): | ||
| """Scan all Python files for basic issues.""" | ||
| py_files = [] | ||
| for root, _, files in os.walk(self.repo_path): | ||
| if "node_modules" in root or "__pycache__" in root or ".git" in root: | ||
| continue | ||
| for f in files: | ||
| if f.endswith(".py"): | ||
| py_files.append(os.path.join(root, f)) | ||
|
|
||
| self.metrics["total_python_files"] = len(py_files) | ||
| self.metrics["lines_of_code"] = 0 | ||
|
|
||
| for filepath in py_files: | ||
| try: | ||
| with open(filepath, "r", encoding="utf-8") as f: | ||
| content = f.read() | ||
| self.metrics["lines_of_code"] += len(content.splitlines()) | ||
|
|
||
| # Check for hardcoded secrets | ||
| if re.search(r"(SECRET_KEY|PASSWORD|API_KEY)\s*=\s*['\"][^'\"]+['\"]", content): | ||
| if "settings.py" not in filepath: | ||
| self.findings.append({ | ||
| "severity": "high", | ||
| "category": "security", | ||
| "file": filepath, | ||
| "issue": "Potential hardcoded secret detected" | ||
| }) | ||
|
|
||
| # Check for debug mode in production | ||
| if "DEBUG = True" in content and "settings.py" in filepath: | ||
| self.findings.append({ | ||
| "severity": "critical", | ||
| "category": "security", | ||
| "file": filepath, | ||
| "issue": "DEBUG mode enabled in settings" | ||
| }) | ||
|
|
||
| # Check for bare except clauses | ||
| if "except:" in content and "except Exception" not in content: | ||
| self.findings.append({ | ||
| "severity": "medium", | ||
| "category": "code_quality", | ||
| "file": filepath, | ||
| "issue": "Bare except clause detected" | ||
| }) | ||
|
|
||
| # Check for TODO/FIXME comments | ||
| todos = re.findall(r"(TODO|FIXME|XXX|HACK):", content) | ||
| if todos: | ||
| self.findings.append({ | ||
| "severity": "low", | ||
| "category": "maintenance", | ||
| "file": filepath, | ||
| "issue": f"{len(todos)} unresolved TODO/FIXME comments" | ||
| }) | ||
|
|
||
| except Exception as e: | ||
| self.findings.append({ | ||
| "severity": "low", | ||
| "category": "parsing", | ||
| "file": filepath, | ||
| "issue": f"Could not parse file: {str(e)}" | ||
| }) | ||
|
|
||
| def _check_security_issues(self): | ||
| """Check for common security issues.""" | ||
| # Check for CSRF exemption without proper validation | ||
| csrf_files = [] | ||
| for root, _, files in os.walk(self.repo_path): | ||
| if "node_modules" in root or "__pycache__" in root: | ||
| continue | ||
| for f in files: | ||
| if f.endswith(".py"): | ||
| filepath = os.path.join(root, f) | ||
| try: | ||
| with open(filepath, "r") as file: | ||
| content = file.read() | ||
| if "csrf_exempt" in content and "permission_classes" not in content: | ||
| csrf_files.append(filepath) | ||
| except: | ||
| pass | ||
|
|
||
| if csrf_files: | ||
| self.findings.append({ | ||
| "severity": "high", | ||
| "category": "security", | ||
| "file": ", ".join(csrf_files[:3]), | ||
| "issue": "CSRF exempt endpoints without permission checks" | ||
| }) | ||
|
|
||
| def _calculate_complexity(self): | ||
| """Calculate basic complexity metrics.""" | ||
| self.metrics["security_issues"] = sum(1 for f in self.findings if f["category"] == "security") | ||
| self.metrics["code_quality_issues"] = sum(1 for f in self.findings if f["category"] == "code_quality") | ||
| self.metrics["maintenance_issues"] = sum(1 for f in self.findings if f["category"] == "maintenance") | ||
|
|
||
| def _generate_recommendations(self) -> List[str]: | ||
| """Generate recommendations based on findings.""" | ||
| recs = [] | ||
| if self.metrics.get("security_issues", 0) > 0: | ||
| recs.append("Address security issues before production deployment") | ||
| if self.metrics.get("code_quality_issues", 0) > 0: | ||
| recs.append("Refactor bare except clauses to catch specific exceptions") | ||
| if self.metrics.get("maintenance_issues", 0) > 0: | ||
| recs.append("Resolve TODO/FIXME comments to reduce technical debt") | ||
| recs.append("Add comprehensive test coverage for critical paths") | ||
| recs.append("Implement input validation on all API endpoints") | ||
| return recs |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| """ | ||
| Compatibility Analyzer Agent (Negotiator) | ||
| Analyzes user compatibility and trip matching potential. | ||
| """ | ||
| from typing import Dict, List, Any | ||
| from collections import Counter | ||
|
|
||
|
|
||
| class CompatibilityAgent: | ||
| """Agent that analyzes user compatibility for trip matching.""" | ||
|
|
||
| def __init__(self, users_data: List[Dict] = None, trips_data: List[Dict] = None): | ||
| self.users_data = users_data or [] | ||
| self.trips_data = trips_data or [] | ||
| self.compatibility_scores = [] | ||
|
|
||
| def analyze(self) -> Dict[str, Any]: | ||
| """Run compatibility analysis and return standardized output.""" | ||
| self._analyze_user_profiles() | ||
| self._analyze_trip_diversity() | ||
| self._calculate_match_potential() | ||
|
|
||
| return { | ||
| "agent_type": "compatibility_analyzer", | ||
| "confidence": 0.78, | ||
| "findings": self._get_findings(), | ||
| "metrics": self._get_metrics(), | ||
| "recommendations": self._generate_recommendations() | ||
| } | ||
|
|
||
| def _analyze_user_profiles(self): | ||
| """Analyze user persona distribution.""" | ||
| if not self.users_data: | ||
| return | ||
|
|
||
| travel_freqs = Counter(u.get("travel_frequency", "unknown") for u in self.users_data) | ||
| trip_prefs = Counter(u.get("trip_preferences", "unknown") for u in self.users_data) | ||
| dest_prefs = Counter(u.get("destination_preference", "unknown") for u in self.users_data) | ||
|
|
||
| self.metrics = { | ||
| "total_users": len(self.users_data), | ||
| "travel_frequency_distribution": dict(travel_freqs), | ||
| "trip_preference_distribution": dict(trip_prefs), | ||
| "destination_preference_distribution": dict(dest_prefs) | ||
| } | ||
|
|
||
| def _analyze_trip_diversity(self): | ||
| """Analyze trip diversity and patterns.""" | ||
| if not self.trips_data: | ||
| self.metrics["total_trips"] = 0 | ||
| self.metrics["solo_ratio"] = 0.0 | ||
|
Comment on lines
+47
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): In the |
||
| return | ||
|
|
||
| solo_count = sum(1 for t in self.trips_data if not t.get("group_or_solo", False)) | ||
| self.metrics["total_trips"] = len(self.trips_data) | ||
| self.metrics["solo_ratio"] = solo_count / len(self.trips_data) if self.trips_data else 0.0 | ||
| self.metrics["group_ratio"] = 1.0 - self.metrics["solo_ratio"] | ||
|
|
||
| def _calculate_match_potential(self): | ||
| """Calculate potential match scores.""" | ||
| if not self.users_data or len(self.users_data) < 2: | ||
| self.compatibility_scores = [] | ||
| return | ||
|
|
||
| # Simple compatibility: users with same destination preference | ||
| scores = [] | ||
| for i, user1 in enumerate(self.users_data): | ||
| for user2 in self.users_data[i+1:]: | ||
| score = 0 | ||
| if user1.get("destination_preference") == user2.get("destination_preference"): | ||
| score += 40 | ||
| if user1.get("trip_preferences") == user2.get("trip_preferences"): | ||
| score += 30 | ||
| if user1.get("accommodation_preference") == user2.get("accommodation_preference"): | ||
| score += 20 | ||
| if user1.get("transport_preference") == user2.get("transport_preference"): | ||
| score += 10 | ||
|
|
||
| scores.append({ | ||
| "user_1": user1.get("email", "unknown"), | ||
| "user_2": user2.get("email", "unknown"), | ||
| "score": score, | ||
| "max_score": 100 | ||
| }) | ||
|
|
||
| self.compatibility_scores = sorted(scores, key=lambda x: x["score"], reverse=True)[:10] | ||
|
|
||
| def _get_findings(self) -> List[Dict]: | ||
| """Generate findings from analysis.""" | ||
| findings = [] | ||
|
|
||
| if self.metrics.get("solo_ratio", 0) > 0.7: | ||
| findings.append({ | ||
| "severity": "medium", | ||
| "category": "engagement", | ||
| "issue": "High solo travel ratio suggests low group engagement" | ||
| }) | ||
|
|
||
| if self.metrics.get("total_users", 0) > 0 and self.metrics.get("total_trips", 0) == 0: | ||
| findings.append({ | ||
| "severity": "high", | ||
| "category": "adoption", | ||
| "issue": "Users registered but no trips created" | ||
| }) | ||
|
|
||
| if not self.compatibility_scores: | ||
| findings.append({ | ||
| "severity": "low", | ||
| "category": "data", | ||
| "issue": "Insufficient user data for compatibility analysis" | ||
| }) | ||
|
|
||
| return findings | ||
|
|
||
| def _get_metrics(self) -> Dict[str, Any]: | ||
| """Get calculated metrics.""" | ||
| return { | ||
| **self.metrics, | ||
| "top_compatibility_matches": self.compatibility_scores[:5] | ||
| } | ||
|
|
||
| def _generate_recommendations(self) -> List[str]: | ||
| """Generate recommendations.""" | ||
| recs = [] | ||
| if self.metrics.get("solo_ratio", 0) > 0.7: | ||
| recs.append("Implement group trip incentives to reduce solo travel ratio") | ||
| if self.metrics.get("total_trips", 0) == 0: | ||
| recs.append("Add trip creation wizard to improve user onboarding") | ||
| recs.append("Enhance matching algorithm with ML-based compatibility scoring") | ||
| recs.append("Add real-time chat for matched travelers before trip confirmation") | ||
| return recs | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Potential AttributeError when
users_datais empty becauseself.metricsis never initialized.If
self.users_datais empty,_analyze_user_profilesreturns beforeself.metricsis set, butanalyze()still calls_analyze_trip_diversityand_get_findings, which rely onself.metricsand will raise anAttributeError. Consider initializingself.metrics = {}in__init__or before the early return in_analyze_user_profilesto keep this path safe.