import re
import time

from sender import send_alert
# ---------------- GLOBAL STATE ----------------
# Tunables: how many events trip each detector, and the sliding window
# (seconds) used by the cron burst detector.
CRON_THRESHOLD: int = 8
TIME_WINDOW: int = 60
SSH_THRESHOLD: int = 5
SUDO_THRESHOLD: int = 3

# Mutable detector state shared across calls to auth_analysis(); each
# detector resets its own slot once its threshold fires.
cron_events: list[float] = []  # time.time() stamps of recent cron session starts
# NOTE(review): the two counters below are lifetime totals with no time
# window and no per-source keying, unlike cron_events.
ssh_fail_count: int = 0
sudo_fail_count: int = 0
# ---------------- AUTH ANALYSIS ----------------
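# Source of an sshd password failure, e.g.
#   "Failed password for invalid user admin from 203.0.113.5 port 4242 ssh2"
# The username is attacker-controlled (whatever the client sent), so it is
# matched non-greedily up to the first " from " and only ever echoed into the
# alert text — never used as a key or for any decision.
_SSH_FAIL_RE = re.compile(
    r"Failed password for (?:invalid user )?(?P<user>.+?) from (?P<ip>\S+)"
)


def _analyze_ssh(message: str) -> None:
    """Count sshd password failures; alert on brute force and on logins.

    NOTE(review): ssh_fail_count is a lifetime total across all sources with
    no time window, so five unrelated failures over a week fire the same
    CRITICAL alert as five in a second. The source extracted below is
    best-effort context for the analyst, not the counter's key.
    """
    global ssh_fail_count
    # The failure test deliberately runs before the "session opened" test:
    # the username inside a failure line is attacker-controlled and could
    # itself contain "session opened".
    if "Failed password for" in message:
        ssh_fail_count += 1
        print(f"[DETECTED] Failed SSH login (count={ssh_fail_count})")
        if ssh_fail_count >= SSH_THRESHOLD:
            reason = f"{ssh_fail_count} failed SSH login attempts"
            source = _SSH_FAIL_RE.search(message)
            if source:
                # Attribute the alert to the last offending user/IP when the
                # line parses; lines in another format keep the bare reason.
                reason += f" (last: user {source['user']!r} from {source['ip']})"
            send_alert(
                severity="CRITICAL",
                detection="SSH Brute Force",
                alert_type="IP",
                entity="auth.log",
                reason=reason,
            )
            ssh_fail_count = 0
    elif "session opened" in message:
        print("[INFO] Successful SSH login")
        send_alert(
            severity="INFO",
            detection="SSH Successful Login",
            alert_type="User",
            entity="auth.log",
            reason="Successful SSH login detected",
        )


def _analyze_sudo(message: str) -> None:
    """Count sudo authentication failures; alert on repeated failures and on use.

    NOTE(review): like the SSH counter, sudo_fail_count is not windowed and
    not keyed by user.
    """
    global sudo_fail_count
    # Failure strings are tested first so an attacker-controlled fragment
    # cannot downgrade a failure into the INFO "session opened" branch.
    if "authentication failure" in message or "password check failed" in message:
        sudo_fail_count += 1
        print(f"[DETECTED] Failed sudo attempt (count={sudo_fail_count})")
        if sudo_fail_count >= SUDO_THRESHOLD:
            send_alert(
                severity="WARNING",
                detection="Privilege Escalation Brute Force",
                alert_type="User",
                entity="auth.log",
                reason=f"{sudo_fail_count} failed sudo attempts",
            )
            sudo_fail_count = 0
    elif "session opened" in message:
        print("[INFO] Privilege escalation using sudo")
        send_alert(
            severity="INFO",
            detection="Privilege Escalation by User",
            alert_type="User",
            entity="auth.log",
            reason="Successful sudo execution",
        )


def _analyze_cron(message: str) -> None:
    """Track cron session starts in a sliding window; flag bursts and root jobs."""
    global cron_events
    if "session opened" in message:
        now = time.time()
        cron_events.append(now)
        # Keep only stamps inside the window so the count is per-window,
        # not lifetime (rebinding, hence the global declaration).
        cron_events = [t for t in cron_events if now - t <= TIME_WINDOW]
        print(f"[INFO] Cron started (last {TIME_WINDOW}s = {len(cron_events)})")
        if len(cron_events) >= CRON_THRESHOLD:
            print("[ALERT] Excessive cron executions detected")
            send_alert(
                severity="ALERT",
                detection="Cron Execution Burst",
                alert_type="Service",
                entity="cron.service",
                reason=f"{CRON_THRESHOLD} cron executions within {TIME_WINDOW}s",
            )
            cron_events.clear()
    # Intentionally not an elif: a "session opened for user root" line should
    # both count toward the burst window and raise the root-job event.
    if "user root" in message:
        print("[INFO] Cron job executed as root")
        send_alert(
            severity="INFO",
            detection="Cron Job Executed",
            alert_type="Service",
            entity="cron.service",
            reason="Cron job executed as root",
        )


# service field -> detector; unknown services are ignored.
_HANDLERS = {
    "sshd": _analyze_ssh,
    "sudo": _analyze_sudo,
    "CRON": _analyze_cron,
}


def auth_analysis(log: dict) -> None:
    """Dispatch one parsed auth.log record to the matching detector.

    Args:
        log: mapping with at least ``service`` (e.g. "sshd", "sudo", "CRON")
            and ``message`` (the raw log line). Records missing either field,
            with an empty message, or whose message is not a string are
            ignored.

    Returns:
        None. Side effects: updates the module-level counters/window,
        prints progress lines and calls ``send_alert`` when a detector fires.
    """
    service = log.get("service")
    message = log.get("message", "")
    # A non-string message (None, bytes, a pre-parsed dict) would raise
    # TypeError from the substring tests in the detectors; treat it as
    # "nothing to analyse" rather than crashing the pipeline on one record.
    if not service or not isinstance(message, str) or not message:
        return
    handler = _HANDLERS.get(service)
    if handler is not None:
        handler(message)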