-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_database_cache.py
More file actions
executable file
·275 lines (218 loc) · 8.36 KB
/
Copy pathtest_database_cache.py
File metadata and controls
executable file
·275 lines (218 loc) · 8.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python3
"""
Test script for Database Pool and Redis Stack
Verifies connection pooling, caching, and health monitoring
"""
import asyncio
import sys
import time
def test_cache_manager():
"""Test the multi-layer cache manager"""
print("\n" + "=" * 60)
print("TESTING MULTI-LAYER CACHE MANAGER")
print("=" * 60)
try:
from services.shared.infrastructure.cache_manager import (
MultiLayerCacheManager,
get_cache_stats,
clear_all_cache,
)
cache = MultiLayerCacheManager(
max_memory_entries=100,
default_ttl_seconds=60,
namespace="test",
)
print("\n1. Testing basic get/set operations...")
cache.set("user:1", {"name": "John", "age": 30})
result = cache.get("user:1")
assert result == {"name": "John", "age": 30}, "Cache get/set failed"
print(" ✓ Basic get/set operations work")
print("\n2. Testing cache hit/miss...")
stats_before = cache.metrics.hits
_ = cache.get("user:1")
stats_after = cache.metrics.hits
assert stats_after > stats_before, "Cache hit not recorded"
print(" ✓ Cache hit tracking works")
miss = cache.get("nonexistent")
assert miss is None, "Cache miss should return None"
print(" ✓ Cache miss returns None")
print("\n3. Testing cache deletion...")
cache.set("temp:1", "value")
assert cache.get("temp:1") == "value"
cache.delete("temp:1")
assert cache.get("temp:1") is None
print(" ✓ Cache deletion works")
print("\n4. Testing cache statistics...")
stats = cache.get_stats()
assert "l1_cache" in stats
assert "l2_cache" in stats
assert "metrics" in stats
print(
f" ✓ Cache stats: L1={stats['l1_cache']['entries']}, L2={stats['l2_cache']['entries']}"
)
print("\n5. Testing namespace clear...")
cache.set("ns1:key1", "value1")
cache.set("ns2:key1", "value2")
cleared = cache.clear_namespace("ns1")
assert cache.get("ns1:key1") is None
assert cache.get("ns2:key1") == "value2"
print(f" ✓ Namespace clear: {cleared} entries removed")
cache.clear_all()
assert cache.get_stats()["l1_cache"]["entries"] == 0
print(" ✓ Clear all cache works")
print("\n✅ All cache manager tests passed!")
return True
except Exception as e:
print(f"\n❌ Cache manager test failed: {e}")
import traceback
traceback.print_exc()
return False
async def test_database_health():
"""Test database health monitoring"""
print("\n" + "=" * 60)
print("TESTING DATABASE HEALTH MONITORING")
print("=" * 60)
try:
from services.shared.infrastructure.database_health import (
DatabaseHealthMonitor,
create_health_monitor,
)
postgres_url = (
"postgresql://zenith_user:zenith_password@localhost:5432/zenith_db"
)
print("\n1. Creating health monitor...")
monitor = create_health_monitor(
database_url=postgres_url,
pool_min_size=2,
pool_max_size=10,
)
print(" ✓ Health monitor created")
print("\n2. Initializing connection pool...")
await monitor.initialize()
print(" ✓ Connection pool initialized")
print("\n3. Running health check...")
health = await monitor.check_health()
print(f" Status: {health.status.value}")
print(f" Latency: {health.latency_ms:.2f}ms")
if health.version:
print(f" PostgreSQL Version: {health.version}")
print(" ✓ Health check completed")
print("\n4. Testing connection quality...")
quality = await monitor.check_connection_quality()
print(f" Success Rate: {quality['success_rate'] * 100:.0f}%")
if quality.get("avg_latency_ms"):
print(f" Avg Latency: {quality['avg_latency_ms']:.2f}ms")
print(" ✓ Connection quality tested")
print("\n5. Getting pool status...")
pool_status = await monitor.get_pool_status()
print(
f" Pool Config: min={pool_status['pool_config']['min_size']}, max={pool_status['pool_config']['max_size']}"
)
print(" ✓ Pool status retrieved")
print("\n6. Converting to dict...")
health_dict = monitor.to_dict()
assert "status" in health_dict
assert "connections" in health_dict
assert "database" in health_dict
print(" ✓ Dict conversion works")
await monitor.close()
print("\n✅ All database health tests passed!")
return True
except Exception as e:
print(f"\n❌ Database health test failed: {e}")
import traceback
traceback.print_exc()
return False
async def test_pgbouncer():
"""Test PGBouncer monitoring"""
print("\n" + "=" * 60)
print("TESTING PGBOUNCER MONITORING")
print("=" * 60)
try:
from services.shared.infrastructure.database_health import (
PGBouncerHealthMonitor,
)
pgbouncer_url = (
"postgresql://zenith_user:zenith_password@localhost:6432/zenith_db"
)
print("\n1. Creating PGBouncer monitor...")
monitor = PGBouncerHealthMonitor(pgbouncer_url)
print(" ✓ PGBouncer monitor created")
print("\n2. Getting pool configuration...")
config = await monitor.get_pool_configuration()
if config.get("connected"):
print(f" ✓ Connected to PGBouncer")
print(f" Pool Mode: {config['configuration'].get('pool_mode', 'N/A')}")
else:
print(" ⚠️ PGBouncer not available (expected if not running locally)")
print("\n✅ PGBouncer tests completed!")
return True
except Exception as e:
print(f"\n❌ PGBouncer test failed: {e}")
import traceback
traceback.print_exc()
return False
async def test_integration():
"""Integration test with actual services"""
print("\n" + "=" * 60)
print("INTEGRATION TEST")
print("=" * 60)
try:
import redis
print("\n1. Testing Redis connection...")
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
r.ping()
print(" ✓ Redis connected")
print("\n2. Testing Redis operations...")
r.set("test:key", "test_value")
value = r.get("test:key")
assert value == "test_value"
r.delete("test:key")
print(" ✓ Redis operations work")
print("\n3. Testing cache manager with Redis...")
from services.shared.infrastructure.cache_manager import cache_manager
if cache_manager.redis_available:
cache_manager.set("integration:test", {"data": "test"})
result = cache_manager.get("integration:test")
assert result == {"data": "test"}
cache_manager.delete("integration:test")
print(" ✓ Cache manager with Redis works")
else:
print(" ⚠️ Redis not available for cache manager")
print("\n✅ Integration tests completed!")
return True
except Exception as e:
print(f"\n❌ Integration test failed: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests"""
print("\n" + "=" * 60)
print("ZENITH PLATFORM - DATABASE & CACHE TEST SUITE")
print("=" * 60)
results = []
print("\nRunning tests...")
results.append(("Cache Manager", test_cache_manager()))
print("\nRunning async tests...")
results.append(("Database Health", await test_database_health()))
results.append(("PGBouncer", await test_pgbouncer()))
results.append(("Integration", await test_integration()))
print("\n" + "=" * 60)
print("TEST SUMMARY")
print("=" * 60)
all_passed = True
for name, passed in results:
status = "✅ PASSED" if passed else "❌ FAILED"
print(f"{name:.<40} {status}")
if not passed:
all_passed = False
print("=" * 60)
if all_passed:
print("🎉 ALL TESTS PASSED!")
return 0
else:
print("❌ SOME TESTS FAILED")
return 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))