diff --git a/examples/java/app/build.gradle b/examples/java/app/build.gradle index 203efc8c..ccb305b7 100644 --- a/examples/java/app/build.gradle +++ b/examples/java/app/build.gradle @@ -14,6 +14,7 @@ plugins { repositories { // Use Maven Central for resolving dependencies. + mavenLocal() mavenCentral() maven { url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' @@ -21,12 +22,12 @@ repositories { } dependencies { - implementation 'io.hstream:hstreamdb-java:0.12.0' + implementation 'io.hstream:hstreamdb-java:0.14.0-SNAPSHOT' } application { // Define the main class for the application. - mainClass = 'docs.code.examples.App' + mainClass = 'docs.code.examples.QueryExample' } tasks.named('test') { diff --git a/examples/java/app/src/main/java/docs/code/examples/QueryExample.java b/examples/java/app/src/main/java/docs/code/examples/QueryExample.java new file mode 100644 index 00000000..aef6c2e1 --- /dev/null +++ b/examples/java/app/src/main/java/docs/code/examples/QueryExample.java @@ -0,0 +1,55 @@ +package docs.code.examples; + +import io.hstream.*; + +public class QueryExample { + public static void main(String[] args) throws Exception { + // Client + String serviceUrl = "hstream://127.0.0.1:26571"; + var client = HStreamClient.builder().serviceUrl(serviceUrl).build(); + // Query + client.createStream("stream01"); + var sql = "create stream result_stream as select * from stream01;"; + var query = client.createQuery("query01", sql); + + Thread.sleep(20000); + var producer = client.newProducer().stream("stream01").build(); + for (int i = 0; i < 10; i++) { + var hRecord = HRecord.newBuilder() + .put("id", i % 3) + .put("value", i).build(); + producer.write(Record.newBuilder().hRecord(hRecord).build()).join(); + } + client.createSubscription(Subscription.newBuilder() + .stream(query.getResultStream()) + .subscription("sub01") + .offset(Subscription.SubscriptionOffset.EARLIEST) + .build()); + // Read + var consumer = client.newConsumer() + .subscription("sub01") + .rawRecordReceiver((receivedRawRecord, responder) -> { + System.out.println("raw:" + receivedRawRecord.getRecordId()); + responder.ack(); + }) + .hRecordReceiver((receivedHRecord, responder) -> { + System.out.println("record:" + receivedHRecord.getHRecord().toCompactJsonString()); + responder.ack(); + }) + .build(); + consumer.startAsync().awaitRunning(); + Thread.sleep(10000); + consumer.stopAsync().awaitTerminated(); + for (var q : client.listQueries()) { + System.out.println("q:" + q.getName() + " status:" + q.getStatus()); + System.out.println("result stream:" + q.getResultStream()); + } + + // View +// client.createView("create view view01 as select id, sum(value) from stream01 group by id;"); +// Thread.sleep(5000); +// for (HRecord hRecord : client.executeViewQuery("select * from view01;")) { +// System.out.println("View Result:" + hRecord.toCompactJsonString()); +// } + } +}