From ae79f7028e9a906c33cae76dabe36295f3a9ebcf Mon Sep 17 00:00:00 2001 From: crnsnlzc Date: Tue, 7 Oct 2014 14:48:38 +0800 Subject: [PATCH 1/3] support truncate suro file hourly/daily --- .../suro/sink/localfile/Granularity.java | 47 +++++++++++++++++++ .../suro/sink/localfile/LocalFileSink.java | 16 ++++++- 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java diff --git a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java new file mode 100644 index 00000000..06bf3441 --- /dev/null +++ b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java @@ -0,0 +1,47 @@ +package com.netflix.suro.sink.localfile; + +import org.joda.time.*; + +/** + * Created by liuzhenchuan@foxmail.com on 9/30/14. + */ +public enum Granularity { + + HOUR{ + @Override + public DateTime truncate(DateTime time) { + final MutableDateTime mutableDateTime = time.toMutableDateTime(); + mutableDateTime.setMillisOfSecond(0); + mutableDateTime.setSecondOfMinute(0); + mutableDateTime.setMinuteOfHour(0); + return mutableDateTime.toDateTime(); + } + + @Override + public ReadablePeriod getUnits(int n) { + return Hours.hours(n); + } + } , + DAY{ + @Override + public DateTime truncate(DateTime time) { + final MutableDateTime mutableDateTime = time.toMutableDateTime(); + mutableDateTime.setMillisOfDay(0); + return mutableDateTime.toDateTime(); + } + + @Override + public ReadablePeriod getUnits(int n) { + return Days.days(n); + } + }; + + public abstract DateTime truncate(DateTime time); + public abstract ReadablePeriod getUnits(int n); + + public final DateTime next(DateTime time){ + return truncate(time.plus(getUnits(1))); + } + + +} diff --git a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/LocalFileSink.java b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/LocalFileSink.java index 5df721ae..28d69714 100644 --- a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/LocalFileSink.java +++ b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/LocalFileSink.java @@ -86,6 +86,8 @@ public class LocalFileSink extends QueuedSink implements Sink { private boolean messageWrittenInRotation = false; + private Granularity granularity; + @JsonCreator public LocalFileSink( @JsonProperty("outputDir") String outputDir, @@ -97,6 +99,7 @@ public LocalFileSink( @JsonProperty("queue4Sink") MessageQueue4Sink queue4Sink, @JsonProperty("batchSize") int batchSize, @JsonProperty("batchTimeout") int batchTimeout, + @JsonProperty("granularity") String gran, @JacksonInject TrafficController trafficController, @JacksonInject SpaceChecker spaceChecker) { if (!outputDir.endsWith("/")) { @@ -113,6 +116,10 @@ public LocalFileSink( this.trafficController = trafficController; this.spaceChecker = spaceChecker; + if(gran!=null){ + granularity = Granularity.valueOf(gran.toUpperCase()); + } + Monitors.registerObject(outputDir.replace('/', '_'), this); initialize("localfile_" + outputDir.replace('/', '_'), queue4Sink == null ? new MemoryQueue4Sink(10000) : queue4Sink, batchSize, batchTimeout); @@ -189,7 +196,14 @@ private void rotate() throws IOException { filePath = newName; - nextRotation = new DateTime().plus(rotationPeriod).getMillis(); + DateTime currentDateTime = new DateTime(); + nextRotation = currentDateTime.plus(rotationPeriod).getMillis(); + if(granularity!=null){ + long expectedBreak = granularity.next(currentDateTime).getMillis(); + if(nextRotation >expectedBreak){ + nextRotation = expectedBreak; + } + } if (!spaceChecker.hasEnoughSpace()) { trafficController.stopTakingTraffic(); From 80422aa8c9c42f04bdfb50f50d2ffb3d0de0c4bf Mon Sep 17 00:00:00 2001 From: crnsnlzc Date: Mon, 20 Oct 2014 12:02:57 +0800 Subject: [PATCH 2/3] add copy right --- .../netflix/suro/sink/localfile/Granularity.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java index 06bf3441..6db51363 100644 --- a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java +++ b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/Granularity.java @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.netflix.suro.sink.localfile; import org.joda.time.*; From d2ceb89b63429a9c7f06ac083007c173068bdaeb Mon Sep 17 00:00:00 2001 From: crnsnlzc Date: Mon, 20 Oct 2014 12:03:18 +0800 Subject: [PATCH 3/3] add test for Granularity --- .../suro/sink/localfile/TestGranularity.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 suro-localfile/src/test/java/com/netflix/suro/sink/localfile/TestGranularity.java diff --git a/suro-localfile/src/test/java/com/netflix/suro/sink/localfile/TestGranularity.java b/suro-localfile/src/test/java/com/netflix/suro/sink/localfile/TestGranularity.java new file mode 100644 index 00000000..275d94c5 --- /dev/null +++ b/suro-localfile/src/test/java/com/netflix/suro/sink/localfile/TestGranularity.java @@ -0,0 +1,51 @@ +/* + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.suro.sink.localfile; + +import org.joda.time.DateTime; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Created by liuzhenchuan@foxmail.com on 10/20/14. + */ +public class TestGranularity { + + @Test + public void testNextHour(){ + DateTime dateTime = new DateTime("2014-10-18T15:48:55.000+08:00"); + DateTime expected = new DateTime("2014-10-18T16:00:00.000+08:00"); + assertEquals(Granularity.HOUR.next(dateTime),expected); + + dateTime = new DateTime("2014-10-18T23:48:55.000+08:00"); + expected = new DateTime("2014-10-19T00:00:00.000+08:00"); + assertEquals(Granularity.HOUR.next(dateTime),expected); + } + + @Test + public void testNextDay(){ + DateTime dateTime = new DateTime("2014-10-18T15:48:55.000+08:00"); + DateTime expected = new DateTime("2014-10-19T00:00:00.000+08:00"); + assertEquals(Granularity.DAY.next(dateTime),expected); + + dateTime = new DateTime("2014-10-18T00:00:00.000+08:00"); + expected = new DateTime("2014-10-19T00:00:00.000+08:00"); + assertEquals(Granularity.DAY.next(dateTime),expected); + } + +} \ No newline at end of file