-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathruntime.py
More file actions
88 lines (67 loc) · 2.25 KB
/
runtime.py
File metadata and controls
88 lines (67 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import threading
import time
import importlib
import json
import os
import signal
from system.config import load_env
# Глобальный флаг для остановки всех агентов
RUNNING = True
def stop_all(signum, frame):
global RUNNING
RUNNING = False
print("\n[Runtime] Остановка системы...")
signal.signal(signal.SIGINT, stop_all)
signal.signal(signal.SIGTERM, stop_all)
def load_agent(agent_name):
"""
Загружает агент как модуль:
agents/<agent_name>/agent.py → класс Agent
"""
module_path = f"agents.{agent_name}.agent"
module = importlib.import_module(module_path)
return module.Agent
def run_agent(agent_name):
"""
Запускает агента в его ритме.
"""
agent_class = load_agent(agent_name)
agent = agent_class()
print(f"[Runtime] Агент '{agent_name}' запущен. Ритм: {agent.rhythm} сек")
while RUNNING:
try:
agent.run()
except Exception as e:
print(f"[Runtime] Ошибка в агенте '{agent_name}': {e}")
time.sleep(agent.rhythm)
print(f"[Runtime] Агент '{agent_name}' остановлен.")
def load_agents_list():
"""
Читает список агентов из agents/ (каждая папка = агент).
"""
agents_dir = "agents"
agents = []
for name in os.listdir(agents_dir):
full = os.path.join(agents_dir, name)
if os.path.isdir(full) and not name.startswith("__"):
agents.append(name)
return agents
def main():
load_env() # загружаем .env
agents = load_agents_list()
print("[Runtime] Найдены агенты:", agents)
threads = []
for agent_name in agents:
t = threading.Thread(target=run_agent, args=(agent_name,), daemon=True)
t.start()
threads.append(t)
print("[Runtime] Все агенты запущены.")
# Ждём завершения
while RUNNING:
time.sleep(1)
print("[Runtime] Ожидание завершения потоков...")
for t in threads:
t.join()
print("[Runtime] Система остановлена.")
if __name__ == "__main__":
main()