Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

22 changes: 11 additions & 11 deletions src/java/alering-node-new/src/test/java/MetricClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,32 @@ public class MetricClientTest {
//
// static void setupClient() throws IOException {
// context = new RealAlertingDriverContext("/dev/shm/aeron");
// Producer metricProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/MetricProducerConfig.yaml"));
// Producer metricProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/MetricClientTest/MetricProducerConfig.yaml"));
//
// MetricProducer clientMetricProducer = new MetricProducer(metricProducer);
// Producer clientProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/ClientProducerConfig.yaml"));
// Subscriber clientConsumer = new Subscriber(context, ConfigReader.readConsumerFromFile("src/test/resources/ClientConsumerConfig.yaml"));
// Producer clientProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/MetricClientTest/ClientProducerConfig.yaml"));
// Subscriber clientConsumer = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/MetricClientTest/ClientConsumerConfig.yaml"));
//
// MetricRegistry.initialize(clientProducer, clientConsumer, clientMetricProducer, context);
// metricRegistry = MetricRegistry.getInstance();
//
// alertSubscriber = new AlertSubscriberTest(new Subscriber(context,
// ConfigReader.readConsumerFromFile("src/test/resources/AlertConsumerConfig.yaml")),
// ConfigReader.readSubscriberFromFile("src/test/resources/MetricClientTest/AlertConsumerConfig.yaml")),
// alertId, metricId, alertValue, alertTimestamp);
// alertSubscriber.start();
// }
//
// static void setupServer() {
// serverProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/ServerProducerConfig.yaml"));
// serverSubscriber = new Subscriber(context, ConfigReader.readConsumerFromFile("src/test/resources/ServerConsumerConfig.yaml"));
// serverProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/MetricClientTest/ServerProducerConfig.yaml"));
// serverSubscriber = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/MetricClientTest/ServerConsumerConfig.yaml"));
//
// alertNodeProducer1 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/AlertNodeProducerConfig1.yaml"));
// alertNodeProducer2 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/AlertNodeProducerConfig2.yaml"));
// alertNodeSubscriber1 = new Subscriber(context, ConfigReader.readConsumerFromFile("src/test/resources/AlertNodeConsumerConfig1.yaml"));
// alertNodeSubscriber2 = new Subscriber(context, ConfigReader.readConsumerFromFile("src/test/resources/AlertNodeConsumerConfig2.yaml"));
// alertNodeProducer1 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/MetricClientTest/AlertNodeProducerConfig1.yaml"));
// alertNodeProducer2 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/MetricClientTest/AlertNodeProducerConfig2.yaml"));
// alertNodeSubscriber1 = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/MetricClientTest/AlertNodeConsumerConfig1.yaml"));
// alertNodeSubscriber2 = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/MetricClientTest/AlertNodeConsumerConfig2.yaml"));
// }
//
// @BeforeAll
// @BeforeAllConsumer
// static void run() throws IOException {
// setupClient();
// setupServer();
Expand Down
123 changes: 61 additions & 62 deletions src/java/alering-node-new/src/test/java/TimeTestingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,66 +24,65 @@ private class ApiNode {
Producer serverProducer;
Subscriber serverSubscriber;
}
//
// public void main() throws Exception {
// RealAlertingDriverContext context = new RealAlertingDriverContext("/dev/shm/aeron");
// ApiNode[] apiNodes = new ApiNode[clientCount];
//
// AlertSystemBalancer alertSystemBalancer = new AlertSystemBalancer();
// AlertNode[] alertNodes = new AlertNode[2 * clientCount];
//
// AlertInfo[] alertInfos = new AlertInfo[alertPerClient];
// AlertLogicBase[] alertLogicBases = new AlertLogicBase[alertPerClient];
// for (int i = 0; i < alertPerClient; ++i) {
// alertInfos[i] = new AlertInfo(i, alertPerClient - i, threshold);
// alertLogicBases[i] = switch (i % 5) {
// case 0 -> new GreaterAlert(alertInfos[i]);
// case 1 -> new LessAlert(alertInfos[i]);
// case 2 -> new EqualAlert(alertInfos[i]);
// case 3 -> new GreaterOrEqualAlert(alertInfos[i]);
// case 4 -> new LessOrEqualAlert(alertInfos[i]);
// default -> throw new IllegalStateException("Unexpected value: " + i % 5);
// };
// }
//
// for (int i = 0; i < clientCount; ++i) {
// ApiNode node = new ApiNode();
// node.serverProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/ServerProducerConfig" + i + ".yaml"));
// node.serverSubscriber = new Subscriber(context, ConfigReader.readConsumerFromFile("src/test/resources/ServerSubscriberConfig" + i + ".yaml"));
// apiNodes[i] = node;
//
//
// alertSystemBalancer.clientCreated(i);
// Producer alertNodeProducer1 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/AlertNodeProducerConfig" + i + "_1.yaml"));
// Producer alertNodeProducer2 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/AlertNodeProducerConfig" + i + "_2.yaml"));
// Subscriber alertNodeSubscriber1 = new Subscriber(context, ConfigReader.readProducerFromFile("src/test/resources/AlertNodeSubscriberConfig" + i + "_1.yaml"));
// Subscriber alertNodeSubscriber2 = new Subscriber(context, ConfigReader.readProducerFromFile("src/test/resources/AlertNodeSubscriberConfig" + i + "_2.yaml"));
// alertSystemBalancer.assignClientToAlertNode(i, alertNodeProducer1, alertNodeProducer2, alertNodeSubscriber1, alertNodeSubscriber2);
// AlertNode[] curClientAlertNodes = alertSystemBalancer.getAlertNodesByClientId(i);
// alertNodes[2 * i] = curClientAlertNodes[0];
// alertNodes[2 * i + 1] = curClientAlertNodes[1];
// for (int j = 0; j < alertPerClient; ++j) {
// alertSystemBalancer.addAlertToClientsAlertNodes(i, alertInfos[j], alertLogicBases[j]);
// }
// alertSystemBalancer.startClientsAlertNodes(i);
// // TODO wait all producers
//
// GetMetricId work = new GetMetricId();
// // TODO client должен сделать work.addToMap(tagsList, metricId)
// int finalI = i;
// FragmentHandler serverGetIdHandler = (DirectBuffer buffer, int offset, int length, Header header) -> {
// buffer.getInt(offset); // вытаскиваем id инструкции
// offset += MetricConstants.INT_SIZE;
// work.doWork(client, apiNodes[finalI].serverProducer, buffer, offset, length, header);
// };
// new Thread(() -> {
// int poll = -1;
// while (poll <= 0) {
// poll = apiNodes[finalI].serverSubscriber.getSubscription().poll(serverGetIdHandler, 1000);
// apiNodes[finalI].serverSubscriber.getIdle().idle();
// }
// }).start();
// }
//
// }

public void main() throws Exception {
RealAlertingDriverContext context = new RealAlertingDriverContext("/dev/shm/aeron");
ApiNode[] apiNodes = new ApiNode[clientCount];

AlertSystemBalancer alertSystemBalancer = new AlertSystemBalancer();
AlertNode[] alertNodes = new AlertNode[2 * clientCount];

AlertInfo[] alertInfos = new AlertInfo[alertPerClient];
AlertLogicBase[] alertLogicBases = new AlertLogicBase[alertPerClient];
for (int i = 0; i < alertPerClient; ++i) {
alertInfos[i] = new AlertInfo(i, alertPerClient - i, threshold);
alertLogicBases[i] = switch (i % 5) {
case 0 -> new GreaterAlert(alertInfos[i]);
case 1 -> new LessAlert(alertInfos[i]);
case 2 -> new EqualAlert(alertInfos[i]);
case 3 -> new GreaterOrEqualAlert(alertInfos[i]);
case 4 -> new LessOrEqualAlert(alertInfos[i]);
default -> throw new IllegalStateException("Unexpected value: " + i % 5);
};
}

for (int i = 0; i < clientCount; ++i) {
ApiNode node = new ApiNode();
node.serverProducer = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/TimeTesting/Server/ServerProducerConfig" + i + ".yaml"));
node.serverSubscriber = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/TimeTesting/Server/ServerSubscriberConfig" + i + ".yaml"));
apiNodes[i] = node;

alertSystemBalancer.clientCreated(i);
Producer alertNodeProducer1 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/TimeTesting/Server/AlertNodeProducerConfig" + i + "_1.yaml"));
Producer alertNodeProducer2 = new Producer(context, ConfigReader.readProducerFromFile("src/test/resources/TimeTesting/Server/AlertNodeProducerConfig" + i + "_2.yaml"));
Subscriber alertNodeSubscriber1 = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/TimeTesting/Server/AlertNodeSubscriberConfig" + i + "_1.yaml"));
Subscriber alertNodeSubscriber2 = new Subscriber(context, ConfigReader.readSubscriberFromFile("src/test/resources/TimeTesting/Server/AlertNodeSubscriberConfig" + i + "_2.yaml"));
alertSystemBalancer.assignClientToAlertNode(i, alertNodeProducer1, alertNodeProducer2, alertNodeSubscriber1, alertNodeSubscriber2);
AlertNode[] curClientAlertNodes = alertSystemBalancer.getAlertNodesByClientId(i);
alertNodes[2 * i] = curClientAlertNodes[0];
alertNodes[2 * i + 1] = curClientAlertNodes[1];
for (int j = 0; j < alertPerClient; ++j) {
alertSystemBalancer.addAlertToClientsAlertNodes(i, alertInfos[j], alertLogicBases[j]);
}
alertSystemBalancer.startClientsAlertNodes(i);
// TODO wait all producers

GetMetricId work = new GetMetricId();
// TODO client должен сделать work.addToMap(tagsList, metricId)
int finalI = i;
FragmentHandler serverGetIdHandler = (DirectBuffer buffer, int offset, int length, Header header) -> {
buffer.getInt(offset); // вытаскиваем id инструкции
offset += MetricConstants.INT_SIZE;
work.doWork(client, apiNodes[finalI].serverProducer, buffer, offset, length, header);
};
new Thread(() -> {
int poll = -1;
while (poll <= 0) {
poll = apiNodes[finalI].serverSubscriber.getSubscription().poll(serverGetIdHandler, 1000);
apiNodes[finalI].serverSubscriber.getIdle().idle();
}
}).start();
}

}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
consumer:
subscriber:
streams:
- subscription-type: mdc
publication-ip: localhost
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
consumer:
subscriber:
streams:
- subscription-type: mdc
publication-ip: localhost
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
consumer:
subscriber:
streams:
- subscription-type: udp
ip: localhost
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
consumer:
subscriber:
streams:
- subscription-type: udp
ip: localhost
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
consumer:
subscriber:
streams:
- subscription-type: udp
ip: localhost
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ru.realalerting.alertlogic;

public class AlertInfo { // TODO record
public class AlertInfo {
// TODO перенести всю инфу в саму реализацию
private final int alertId;
private final int metricId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ClickHouseAlertTest {
// @BeforeAll
// static void setUp() throws Exception {
// RealAlertingDriverContext context = new RealAlertingDriverContext("/dev/shm/aeron");
// RealAlertingConfig config = ConfigReader.readConsumerFromFile("src/test/resources/ConsumerConfig.yaml");
// RealAlertingConfig config = ConfigReader.readConsumerFromFile("src/test/resources/SubscriberConfig.yaml");
// String url = "jdbc:clickhouse://localhost:8123";
// ClickHouseProperties properties = ClickhouseJdbcUrlParser.parse(url, new ClickHouseProperties().asProperties());
// HttpClient httpClient = new ClickHouseHttpClientBuilder(properties).buildClient();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
consumer:
subscriber:
streams:
- subscription-type: udp
ip: localhost
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package ru.realalerting.metrciclient;

import io.aeron.logbuffer.BufferClaim;
import org.agrona.BufferUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import ru.realalerting.producer.BaseProducer;
import ru.realalerting.producer.Producer;
import ru.realalerting.protocol.MetricConstants;
Expand Down Expand Up @@ -57,7 +55,7 @@ public void getMetricId(int requestId, List<CharSequence> tags) {
int allocatedMemory = MetricConstants.ID_SIZE + MetricConstants.ID_SIZE
+ MetricConstants.INT_SIZE + allTagsSize + tagsBytes.length * MetricConstants.INT_SIZE;
BufferClaim curBufferClaim = bufferClaim.get();
// int (id инструкции) + int(request id) + колиечство тэгов + сумма размеров всех тэгов + количество тэгов * int (длина тэга)
// instructionId + requestId + количество тэгов + сумма размеров всех тэгов + количество тэгов * длина тэга (int)
if (producer.getPublication().tryClaim(allocatedMemory, curBufferClaim) > 0) {
MutableDirectBuffer buf = curBufferClaim.buffer();
sendData(tagsBytes, requestId, buf, curBufferClaim.offset());
Expand All @@ -68,18 +66,18 @@ public void getMetricId(int requestId, List<CharSequence> tags) {
}

public void getAlertsConfigsByMetricId(int requestId, int metricId) {
int allocatedMemory = MetricConstants.INT_SIZE + MetricConstants.ID_SIZE + MetricConstants.INT_SIZE;
int allocatedMemory = MetricConstants.ID_SIZE + MetricConstants.ID_SIZE + MetricConstants.ID_SIZE;
// instructionId + requestId + metricId
BufferClaim curBufferClaim = bufferClaim.get();
if (producer.getPublication().tryClaim(allocatedMemory, curBufferClaim) > 0) {
MutableDirectBuffer buf = curBufferClaim.buffer();
int offset = curBufferClaim.offset();
buf.putInt(offset, Protocol.INSTRUCTION_GET_METRIC_CRITICAL_ALERTS_BY_METRIC_ID);
offset += MetricConstants.INT_SIZE;
offset += MetricConstants.ID_SIZE;
buf.putInt(offset, requestId);
offset += MetricConstants.ID_SIZE;
buf.putInt(offset, metricId);
offset += MetricConstants.INT_SIZE;
offset += MetricConstants.ID_SIZE;
curBufferClaim.commit();
} else {
++dataLeaked;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,4 @@ public Object[] setMetricIdWithAlerts(DirectBuffer directBuffer, int offset, int
response[3] = alertLogics;
return response;
}

}
24 changes: 0 additions & 24 deletions src/java/metric-client/src/test/java/AlertSubscriberTest.java

This file was deleted.

Loading