From cfceeb8ecdd0ce21ca98e798a99cddcdefc8bb65 Mon Sep 17 00:00:00 2001 From: ASU Date: Sat, 26 Apr 2025 19:44:55 +0300 Subject: [PATCH 1/6] Added test for read/writing Parquet files as streams to IBucket --- java/pom.xml | 15 + .../com/esamtrade/bucketbase/S3Bucket.java | 17 +- .../esamtrade/bucketbase/IBucketTester.java | 129 ++++++- .../bucketbase/MemoryBucketTest.java | 5 + .../esamtrade/bucketbase/ParquetUtils.java | 354 ++++++++++++++++++ .../bucketbase/S3BucketSDKv1Test.java | 7 + .../esamtrade/bucketbase/S3BucketTest.java | 5 + 7 files changed, 510 insertions(+), 22 deletions(-) create mode 100644 java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java diff --git a/java/pom.xml b/java/pom.xml index f8102ef..71138f4 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -51,6 +51,21 @@ ${junit.version} test + + + + org.apache.parquet + parquet-hadoop-bundle + 1.15.1 + test + + + org.apache.hadoop + hadoop-client + 3.4.1 + test + + diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java index f50cfea..e333c34 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java +++ b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java @@ -11,6 +11,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.*; +import java.io.BufferedInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -24,20 +25,26 @@ import java.util.stream.Collectors; public class S3Bucket extends BaseBucket { - + private final static int DEFAULT_BUF_SIZE = 8 * 1024; + protected final int BUF_SIZE; // 8 KB by default protected S3Client s3Client; protected S3AsyncClient s3AsyncClient; protected String bucketName; - public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName) { + public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName, int bufSize) { this.s3Client = 
s3Client; this.s3AsyncClient = s3AsyncClient; this.bucketName = bucketName; + this.BUF_SIZE = bufSize; } public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName) { - this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName); + this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, DEFAULT_BUF_SIZE); + } + + public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName, int bufSize) { + this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, bufSize); } private static S3Client createS3Client(String endpoint, String accessKey, String secretKey) { @@ -159,13 +166,13 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException { .key(name.toString()) .build(); InputStream inputStream = s3Client.getObject(request, ResponseTransformer.toInputStream()); - return new ObjectStream(inputStream, name.toString()); + BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, BUF_SIZE); + return new ObjectStream(bufferedInputStream, name.toString()); } catch (NoSuchKeyException e) { throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName); } } - @Override /** * Lists all objects in the S3 bucket with the given prefix. 
diff --git a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java index 401513e..ba67095 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java +++ b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java @@ -1,11 +1,31 @@ package com.esamtrade.bucketbase; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; + import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; @@ -111,6 +131,43 @@ public void testPutAndGetObjectStream() throws IOException { assertThrows(FileNotFoundException.class, () -> storage.getObjectStream(nonExistentPath)); } + public void testPutAndGetParquetObjectStream() throws IOException { + MessageType schema = Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field") + .named("TestSchema"); + + String 
uniqueDir = "dir" + uniqueSuffix; + + // Binary content + PurePosixPath path = PurePosixPath.from(uniqueDir, "file1.parquet"); + + InputStream inputStream = generateParquetOutput(); + storage.putObjectStream(path, inputStream); + + ObjectStream objectStream = storage.getObjectStream(path); + assertTrue(objectStream.getStream().markSupported()); + + InputFile inFile = new ParquetUtils.StreamInputFile(objectStream); + try (ParquetFileReader reader = ParquetFileReader.open(inFile)) { + int count = 0; + // get actual file schema + var metadata = reader.getFooter().getFileMetaData(); + MessageType fileSchema = metadata.getSchema(); + PageReadStore pages; + while ((pages = reader.readNextRowGroup()) != null) { + MessageColumnIO columnIO = new ColumnIOFactory() + .getColumnIO(schema, fileSchema); + RecordReader rr = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); + for (int i = 0, rows = (int) pages.getRowCount(); i < rows; i++) { + Group g = rr.read(); + assertEquals(count, g.getInteger("int_field", 0)); + count++; + } + } + assertEquals(3, count); + } + } + public void testListObjects() throws IOException { String uniqueDir = "dir" + uniqueSuffix; storage.putObject(PurePosixPath.from(uniqueDir, "file1.txt"), "Content 1".getBytes()); @@ -151,23 +208,22 @@ private PurePosixPath ensureDirWith2025Keys() throws IOException { List existingKeys = storage.listObjects(pathWith2025Keys); if (existingKeys.isEmpty()) { // Create the directory and add 2025 files - try (ForkJoinPool customThreadPool = new ForkJoinPool(50)) { - try { - customThreadPool.submit(() -> - IntStream.range(0, 2025).parallel().forEach(i -> { - try { - var path = pathWith2025Keys.join("file" + i + ".txt"); - storage.putObject(path, ("Content " + i).getBytes()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - ).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - customThreadPool.shutdown(); - } + ForkJoinPool customThreadPool = new 
ForkJoinPool(50); + try { + customThreadPool.submit(() -> + IntStream.range(0, 2025).parallel().forEach(i -> { + try { + var path = pathWith2025Keys.join("file" + i + ".txt"); + storage.putObject(path, ("Content " + i).getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + ).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + customThreadPool.shutdown(); } } return pathWith2025Keys; @@ -245,4 +301,43 @@ public void testRemoveObjects() throws IOException { List prefixes = shallowListing.getPrefixes(); assertFalse(prefixes.contains(uniqueDir + "/")); } + + private InputStream generateParquetOutput() throws IOException { + MessageType schema = Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field") + .named("TestSchema"); + + // enable writing + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + int pipeBuffer = 4 * 1024 * 1024; // 4 MB internal buffer + PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer); + PipedOutputStream pipedOutputStream = new PipedOutputStream(inputStreamToReturn); + + // wrap pipe in our PositionOutputStream + ParquetUtils.PositionOutputStreamWrapper posOutStream = new ParquetUtils.PositionOutputStreamWrapper(pipedOutputStream); + ParquetUtils.OutputFileWrapper OutputFileWrapper = new ParquetUtils.OutputFileWrapper(posOutStream); + + // start a thread to write Parquet into the pipe + Thread writerThread = new Thread(() -> { + try (ParquetWriter writer = ExampleParquetWriter + .builder(OutputFileWrapper) + .withConf(conf) + .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) + .withType(schema) + .build()) { + + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + for (int i = 0; i < 3; i++) { + writer.write(factory.newGroup().append("int_field", i)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + writerThread.start(); + + return 
inputStreamToReturn; + } } diff --git a/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java index 7f056cc..fb12381 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java +++ b/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java @@ -26,6 +26,11 @@ public void testPutAndGetObject() throws IOException { tester.testPutAndGetObject(); } + @Test + void testPutAndGetParquetObjectStream() throws IOException { + tester.testPutAndGetParquetObjectStream(); + } + @Test void putObjectAndGetObjectStream() throws IOException { tester.testPutAndGetObjectStream(); diff --git a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java new file mode 100644 index 0000000..5dd3f5f --- /dev/null +++ b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java @@ -0,0 +1,354 @@ +package com.esamtrade.bucketbase; + +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.io.SeekableInputStream; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ParquetUtils { + static class SeekableByteArrayInputStream extends SeekableInputStream { + private final byte[] data; + private int pos; + + SeekableByteArrayInputStream(byte[] data) { + this.data = data; + this.pos = 0; + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void seek(long newPos) throws IOException { + if (newPos < 0 || newPos > data.length) { + throw new IOException("Invalid seek: " + newPos); + } + pos = (int) newPos; + } + + @Override + public int read() { + if (pos >= data.length) + 
return -1; + return data[pos++] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) { + if (pos >= data.length) + return -1; + int toRead = Math.min(len, data.length - pos); + System.arraycopy(data, pos, b, off, toRead); + pos += toRead; + return toRead; + } + + @Override + public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + int remaining = len; + int offset = off; + while (remaining > 0) { + int r = read(b, offset, remaining); + if (r < 0) + throw new EOFException("EOF before filling buffer"); + remaining -= r; + offset += r; + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (!buf.hasRemaining()) + return 0; + int toRead = Math.min(buf.remaining(), data.length - pos); + if (toRead <= 0) + return -1; + buf.put(data, pos, toRead); + pos += toRead; + return toRead; + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + int remaining = buf.remaining(); + byte[] temp = new byte[remaining]; + readFully(temp, 0, remaining); + buf.put(temp); + } + } + + static class ByteArrayPositionOutputStream extends PositionOutputStream { + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + @Override + public void write(int b) throws IOException { + baos.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + baos.write(b, off, len); + } + + @Override + public long getPos() { + return baos.size(); + } + + byte[] toByteArray() { + return baos.toByteArray(); + } + } + + static class InMemoryOutputFile implements OutputFile { + private ByteArrayPositionOutputStream current; + + @Override + public PositionOutputStream create(long blockSizeHint) { + current = new ByteArrayPositionOutputStream(); + return current; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return 
create(blockSizeHint); + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 0; + } + + byte[] toByteArray() { + return current.toByteArray(); + } + } + + static class InMemoryInputFile implements InputFile { + private final byte[] data; + + InMemoryInputFile(byte[] data) { + this.data = data; + } + + @Override + public long getLength() { + return data.length; + } + + @Override + public SeekableInputStream newStream() { + return new SeekableByteArrayInputStream(data); + } + } + + static class BufferedBytesInputFile implements InputFile { + private final byte[] data; + + BufferedBytesInputFile(byte[] data) { + this.data = data; + } + + @Override + public long getLength() { + return data.length; + } + + @Override + public SeekableInputStream newStream() { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + BufferedInputStream bis = new BufferedInputStream(bais); + SeekableInputStream sis = new SeekableInputStreamWrapper(bis); + return sis; + } + } + + static class StreamInputFile implements InputFile { + private final ObjectStream stream; + + StreamInputFile(ObjectStream stream) { + this.stream = stream; + } + + @Override + public long getLength() throws IOException { + return stream.getStream().available(); + } + + @Override + public SeekableInputStream newStream() throws IOException { + return new SeekableInputStreamWrapper(stream.getStream()); + } + } + + public static class SeekableInputStreamWrapper extends SeekableInputStream { + private final InputStream in; + private long pos = 0; + private final int markLimit = 0; + + /** + * @param in the raw InputStream (e.g. 
S3ObjectInputStream) + */ + public SeekableInputStreamWrapper(InputStream in) { + this.in = in; + in.mark(markLimit); + + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void seek(long newPos) throws IOException { + if (newPos < 0) { + throw new IOException("Cannot seek to negative position: " + newPos); + } + // if going backwards, reset to the mark and re‐mark + if (newPos < pos) { + in.reset(); + in.mark(markLimit); + pos = 0; + } + // skip forward to the desired position + long toSkip = newPos - pos; + while (toSkip > 0) { + long skipped = in.skip(toSkip); + if (skipped <= 0) { + throw new EOFException("Unable to skip to position " + newPos); + } + toSkip -= skipped; + pos += skipped; + } + } + + @Override + public int read() throws IOException { + int b = in.read(); + if (b >= 0) + pos++; + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int n = in.read(b, off, len); + if (n > 0) + pos += n; + return n; + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + int total = 0; + while (total < len) { + int n = in.read(b, off + total, len - total); + if (n < 0) + throw new EOFException("EOF before filling buffer"); + total += n; + } + pos += len; + } + + @Override + public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int toRead = buf.remaining(); + byte[] tmp = new byte[toRead]; + int n = in.read(tmp, 0, toRead); + if (n > 0) { + buf.put(tmp, 0, n); + pos += n; + } + return n; + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + int toRead = buf.remaining(); + byte[] tmp = new byte[toRead]; + readFully(tmp, 0, toRead); + buf.put(tmp); + } + + @Override + public void close() throws IOException { + in.close(); + } + } + + public static class PositionOutputStreamWrapper extends PositionOutputStream { + private final 
OutputStream out; + private long pos = 0; + + public PositionOutputStreamWrapper(OutputStream out) { + this.out = out; + } + + @Override public void write(int b) throws IOException { + out.write(b); + pos++; + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + pos += len; + } + + @Override public long getPos() { + return pos; + } + + @Override public void close() throws IOException { + out.close(); + } + } + + // 2) hands Parquet the above stream when it wants to create the file + public static class OutputFileWrapper implements OutputFile { + private final PositionOutputStreamWrapper posOut; + + public OutputFileWrapper(PositionOutputStreamWrapper posOut) { + this.posOut = posOut; + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return posOut; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return posOut; + } + + @Override public boolean supportsBlockSize() { return false; } + @Override public long defaultBlockSize() { return 0; } + } +} \ No newline at end of file diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java index 5317c32..de4e548 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java +++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java @@ -3,6 +3,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -40,6 +41,12 @@ void putObjectAndGetObjectStream() throws IOException { tester.testPutAndGetObjectStream(); } + @Test + @Disabled("AWS SDK V1's S3ObjectInputStream does not support mark/reset operations required for Parquet reading") + void testPutAndGetParquetObjectStream() throws IOException { + 
tester.testPutAndGetParquetObjectStream(); + } + @Test void getListObjects() throws IOException { tester.testListObjects(); diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java index 506523c..7140f71 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java +++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java @@ -41,6 +41,11 @@ void putObjectAndGetObjectStream() throws IOException { tester.testPutAndGetObjectStream(); } + @Test + void testPutAndGetParquetObjectStream() throws IOException { + tester.testPutAndGetParquetObjectStream(); + } + @Test void getListObjects() throws IOException { tester.testListObjects(); From 004dd2967cfa45140b08746ebf15b52bc6d66bd7 Mon Sep 17 00:00:00 2001 From: ASU Date: Sat, 26 Apr 2025 21:18:26 +0300 Subject: [PATCH 2/6] Removed redundant classes from ParquetUtils --- .../esamtrade/bucketbase/ParquetUtils.java | 195 ++---------------- 1 file changed, 17 insertions(+), 178 deletions(-) diff --git a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java index 5dd3f5f..d7e7b0a 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java +++ b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java @@ -5,9 +5,6 @@ import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -15,175 +12,6 @@ import java.nio.ByteBuffer; public class ParquetUtils { - static class SeekableByteArrayInputStream extends SeekableInputStream { - private final byte[] data; - private int pos; - - SeekableByteArrayInputStream(byte[] data) { - this.data = data; - this.pos = 0; - } - - @Override - public long getPos() { - 
return pos; - } - - @Override - public void seek(long newPos) throws IOException { - if (newPos < 0 || newPos > data.length) { - throw new IOException("Invalid seek: " + newPos); - } - pos = (int) newPos; - } - - @Override - public int read() { - if (pos >= data.length) - return -1; - return data[pos++] & 0xFF; - } - - @Override - public int read(byte[] b, int off, int len) { - if (pos >= data.length) - return -1; - int toRead = Math.min(len, data.length - pos); - System.arraycopy(data, pos, b, off, toRead); - pos += toRead; - return toRead; - } - - @Override - public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - int remaining = len; - int offset = off; - while (remaining > 0) { - int r = read(b, offset, remaining); - if (r < 0) - throw new EOFException("EOF before filling buffer"); - remaining -= r; - offset += r; - } - } - - @Override - public int read(ByteBuffer buf) throws IOException { - if (!buf.hasRemaining()) - return 0; - int toRead = Math.min(buf.remaining(), data.length - pos); - if (toRead <= 0) - return -1; - buf.put(data, pos, toRead); - pos += toRead; - return toRead; - } - - @Override - public void readFully(ByteBuffer buf) throws IOException { - int remaining = buf.remaining(); - byte[] temp = new byte[remaining]; - readFully(temp, 0, remaining); - buf.put(temp); - } - } - - static class ByteArrayPositionOutputStream extends PositionOutputStream { - private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - @Override - public void write(int b) throws IOException { - baos.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - baos.write(b, off, len); - } - - @Override - public long getPos() { - return baos.size(); - } - - byte[] toByteArray() { - return baos.toByteArray(); - } - } - - static class InMemoryOutputFile implements OutputFile { - private 
ByteArrayPositionOutputStream current; - - @Override - public PositionOutputStream create(long blockSizeHint) { - current = new ByteArrayPositionOutputStream(); - return current; - } - - @Override - public PositionOutputStream createOrOverwrite(long blockSizeHint) { - return create(blockSizeHint); - } - - @Override - public boolean supportsBlockSize() { - return false; - } - - @Override - public long defaultBlockSize() { - return 0; - } - - byte[] toByteArray() { - return current.toByteArray(); - } - } - - static class InMemoryInputFile implements InputFile { - private final byte[] data; - - InMemoryInputFile(byte[] data) { - this.data = data; - } - - @Override - public long getLength() { - return data.length; - } - - @Override - public SeekableInputStream newStream() { - return new SeekableByteArrayInputStream(data); - } - } - - static class BufferedBytesInputFile implements InputFile { - private final byte[] data; - - BufferedBytesInputFile(byte[] data) { - this.data = data; - } - - @Override - public long getLength() { - return data.length; - } - - @Override - public SeekableInputStream newStream() { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - BufferedInputStream bis = new BufferedInputStream(bais); - SeekableInputStream sis = new SeekableInputStreamWrapper(bis); - return sis; - } - } - static class StreamInputFile implements InputFile { private final ObjectStream stream; @@ -311,21 +139,25 @@ public PositionOutputStreamWrapper(OutputStream out) { this.out = out; } - @Override public void write(int b) throws IOException { + @Override + public void write(int b) throws IOException { out.write(b); pos++; } - @Override public void write(byte[] b, int off, int len) throws IOException { + @Override + public void write(byte[] b, int off, int len) throws IOException { out.write(b, off, len); pos += len; } - @Override public long getPos() { + @Override + public long getPos() { return pos; } - @Override public void close() throws IOException { + 
@Override + public void close() throws IOException { out.close(); } } @@ -348,7 +180,14 @@ public PositionOutputStream createOrOverwrite(long blockSizeHint) { return posOut; } - @Override public boolean supportsBlockSize() { return false; } - @Override public long defaultBlockSize() { return 0; } + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 0; + } } } \ No newline at end of file From 4c7751acaad51c193b7dd4cd4041d7ec5b0f7f58 Mon Sep 17 00:00:00 2001 From: ASU Date: Sat, 26 Apr 2025 21:27:40 +0300 Subject: [PATCH 3/6] Removed redundant redundant methods --- .../com/esamtrade/bucketbase/ParquetUtils.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java index d7e7b0a..ef7316c 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java +++ b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java @@ -80,14 +80,6 @@ public int read() throws IOException { return b; } - @Override - public int read(byte[] b, int off, int len) throws IOException { - int n = in.read(b, off, len); - if (n > 0) - pos += n; - return n; - } - @Override public void readFully(byte[] b, int off, int len) throws IOException { int total = 0; @@ -145,12 +137,6 @@ public void write(int b) throws IOException { pos++; } - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - pos += len; - } - @Override public long getPos() { return pos; From 3fcf702f419d2583a97303db86922de22f310c88 Mon Sep 17 00:00:00 2001 From: ASU Date: Sat, 26 Apr 2025 23:24:58 +0300 Subject: [PATCH 4/6] Fixed issues suggested by coderrabbit: - https://github.com/eSAMTrade/bucketbase/pull/144/files#r2061488318 - https://github.com/eSAMTrade/bucketbase/pull/144/files#r2061488322 --- .../com/esamtrade/bucketbase/IBucket.java | 3 ++- 
.../esamtrade/bucketbase/MemoryBucket.java | 6 ++++++ .../com/esamtrade/bucketbase/S3Bucket.java | 20 +++++++++++++++++++ .../esamtrade/bucketbase/S3BucketSDKv1.java | 14 +++++++++++++ .../esamtrade/bucketbase/IBucketTester.java | 17 +++++++++++++++- .../bucketbase/MemoryBucketTest.java | 5 +++++ .../esamtrade/bucketbase/ParquetUtils.java | 12 ++++++----- .../bucketbase/S3BucketSDKv1Test.java | 5 +++++ .../esamtrade/bucketbase/S3BucketTest.java | 5 +++++ 9 files changed, 80 insertions(+), 7 deletions(-) diff --git a/java/src/main/java/com/esamtrade/bucketbase/IBucket.java b/java/src/main/java/com/esamtrade/bucketbase/IBucket.java index ecd6ec4..2e4fc14 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/IBucket.java +++ b/java/src/main/java/com/esamtrade/bucketbase/IBucket.java @@ -7,7 +7,6 @@ import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -28,6 +27,8 @@ public interface IBucket { ObjectStream getObjectStream(PurePosixPath name) throws IOException; + long getSize(PurePosixPath name) throws IOException; + List listObjects(PurePosixPath prefix) throws IOException; ShallowListing shallowListObjects(PurePosixPath prefix) throws IOException; diff --git a/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java b/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java index 25eaab1..4714d0b 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java +++ b/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java @@ -60,6 +60,12 @@ public ObjectStream getObjectStream(PurePosixPath name) throws FileNotFoundExcep return new ObjectStream(new ByteArrayInputStream(content), name.toString()); } + @Override + public long getSize(PurePosixPath name) throws IOException { + byte[] content = getObject(name); + return content.length; + } + @Override public List 
listObjects(PurePosixPath prefix) { splitPrefix(prefix); // validate prefix diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java index e333c34..00dfabd 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java +++ b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java @@ -1,8 +1,10 @@ package com.esamtrade.bucketbase; +import com.amazonaws.SdkClientException; import org.apache.commons.codec.digest.DigestUtils; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer; @@ -173,6 +175,24 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException { } } + @Override + public long getSize(PurePosixPath name) throws IOException { + try { + HeadObjectRequest request = HeadObjectRequest.builder() + .bucket(bucketName) + .key(name.toString()) + .build(); + HeadObjectResponse response = s3Client.headObject(request); + return response.contentLength(); + } + catch (NoSuchKeyException e) { + throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName); + } + catch (AwsServiceException | SdkClientException e) { + throw new IOException("Failed to get object size: " + name, e); + } + } + @Override /** * Lists all objects in the S3 bucket with the given prefix. 
diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java b/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java index f35507e..ad5b6ce 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java +++ b/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java @@ -1,5 +1,6 @@ package com.esamtrade.bucketbase; +import com.amazonaws.SdkClientException; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; @@ -82,6 +83,19 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException { return new ObjectStream(inputStream, name.toString()); } + @Override + public long getSize(PurePosixPath name) throws IOException { + try { + return s3Client.getObjectMetadata(bucketName, name.toString()).getContentLength(); + } + catch (AmazonS3Exception e) { + if (e.getStatusCode() == 404) + throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName); + throw new IOException("Failed to get object metadata: " + e.getMessage(), e); + } catch (SdkClientException e) { + throw new IOException("Failed to get object metadata: " + e.getMessage(), e); + } + } /** * Retrieves a list of object paths stored in the bucket that match the given prefix. 
diff --git a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java index ba67095..86d3966 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java +++ b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java @@ -146,8 +146,9 @@ public void testPutAndGetParquetObjectStream() throws IOException { ObjectStream objectStream = storage.getObjectStream(path); assertTrue(objectStream.getStream().markSupported()); + long size = storage.getSize(path); - InputFile inFile = new ParquetUtils.StreamInputFile(objectStream); + InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size); try (ParquetFileReader reader = ParquetFileReader.open(inFile)) { int count = 0; // get actual file schema @@ -168,6 +169,20 @@ public void testPutAndGetParquetObjectStream() throws IOException { } } + public void testGetSize() throws IOException { + String uniqueDir = "dir" + uniqueSuffix; + + // Binary content + PurePosixPath path = PurePosixPath.from(uniqueDir, "file1.bin"); + byte[] bContent = "Test\ncontent".getBytes(); + ByteArrayInputStream byteStream = new ByteArrayInputStream(bContent); + storage.putObjectStream(path, byteStream); + + long size = storage.getSize(path); + assertEquals(bContent.length, size); + assertThrows(FileNotFoundException.class, () -> storage.getSize(new PurePosixPath(uniqueDir, "inexistent.txt"))); + } + public void testListObjects() throws IOException { String uniqueDir = "dir" + uniqueSuffix; storage.putObject(PurePosixPath.from(uniqueDir, "file1.txt"), "Content 1".getBytes()); diff --git a/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java index fb12381..fd91799 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java +++ b/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java @@ -31,6 +31,11 @@ void testPutAndGetParquetObjectStream() 
throws IOException { tester.testPutAndGetParquetObjectStream(); } + @Test + void testGetSize() throws IOException { + tester.testGetSize(); + } + @Test void putObjectAndGetObjectStream() throws IOException { tester.testPutAndGetObjectStream(); diff --git a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java index ef7316c..4958aa7 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java +++ b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java @@ -14,14 +14,16 @@ public class ParquetUtils { static class StreamInputFile implements InputFile { private final ObjectStream stream; + private final long length; - StreamInputFile(ObjectStream stream) { + StreamInputFile(ObjectStream stream, long length) { this.stream = stream; + this.length = length; } @Override public long getLength() throws IOException { - return stream.getStream().available(); + return length; } @Override @@ -33,14 +35,14 @@ public SeekableInputStream newStream() throws IOException { public static class SeekableInputStreamWrapper extends SeekableInputStream { private final InputStream in; private long pos = 0; - private final int markLimit = 0; + private final int MARK_LIMIT = Integer.MAX_VALUE; /** * @param in the raw InputStream (e.g. 
S3ObjectInputStream) */ public SeekableInputStreamWrapper(InputStream in) { this.in = in; - in.mark(markLimit); + in.mark(MARK_LIMIT); } @@ -57,7 +59,7 @@ public void seek(long newPos) throws IOException { // if going backwards, reset to the mark and re‐mark if (newPos < pos) { in.reset(); - in.mark(markLimit); + in.mark(MARK_LIMIT); pos = 0; } // skip forward to the desired position diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java index de4e548..0e8a69d 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java +++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java @@ -47,6 +47,11 @@ void testPutAndGetParquetObjectStream() throws IOException { tester.testPutAndGetParquetObjectStream(); } + @Test + void testGetSize() throws IOException { + tester.testGetSize(); + } + @Test void getListObjects() throws IOException { tester.testListObjects(); diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java index 7140f71..03d8657 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java +++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java @@ -46,6 +46,11 @@ void testPutAndGetParquetObjectStream() throws IOException { tester.testPutAndGetParquetObjectStream(); } + @Test + void testGetSize() throws IOException { + tester.testGetSize(); + } + @Test void getListObjects() throws IOException { tester.testListObjects(); From daafc0e375f9a84776f8e353c87f5ac340771197 Mon Sep 17 00:00:00 2001 From: ASU Date: Mon, 28 Apr 2025 00:02:03 +0300 Subject: [PATCH 5/6] Fixed flaky test testPutAndGetParquetObjectStream, and added testPutAndGetMultiUploadObjectStream to test S3 multipart upload with large files --- .../com/esamtrade/bucketbase/S3Bucket.java | 102 ++++++++++++------ .../esamtrade/bucketbase/IBucketTester.java | 75 +++++++++---- 
.../esamtrade/bucketbase/S3BucketTest.java | 8 +- 3 files changed, 129 insertions(+), 56 deletions(-) diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java index 00dfabd..3a72013 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java +++ b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java @@ -19,6 +19,7 @@ import java.io.InputStream; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.HashSet; import java.util.List; @@ -28,6 +29,7 @@ public class S3Bucket extends BaseBucket { private final static int DEFAULT_BUF_SIZE = 8 * 1024; + private final static int DEFAULT_UPLOAD_PART_SIZE = 5 * 1024 * 1024; protected final int BUF_SIZE; // 8 KB by default protected S3Client s3Client; protected S3AsyncClient s3AsyncClient; @@ -79,26 +81,40 @@ public void putObject(PurePosixPath name, byte[] content) { } @Override - public void putObjectStream(PurePosixPath name, InputStream stream) { + public void putObjectStream(PurePosixPath name, InputStream stream) throws IOException { String _name = validateName(name); try { uploadLargeStream(_name, stream); } catch (Exception e) { - throw new RuntimeException("Failed to upload object: " + _name, e); + throw new IOException("Failed to upload object: " + _name, e); } finally { s3AsyncClient.close(); } } - private void uploadLargeStream(String key, InputStream inputStream) { - int partSize = 5 * 1024 * 1024; // 5 MB + private void uploadLargeStream(String key, InputStream inputStream) throws IOException { + // Please note, that if the input stream will have less than 5MB, the S3 multipart upload throws 400 code (upload is smaller than the minimum allowed object size) List completedParts = new ArrayList<>(); - byte[] buffer = new byte[partSize]; + byte[] buffer = new byte[DEFAULT_UPLOAD_PART_SIZE]; int bytesRead; int partNumber = 1; - // 1. 
Initiate the multipart upload + bytesRead = readUploadBuffer(inputStream, buffer); + if (bytesRead == 0) { + // Empty stream, create empty object + this.putObject(PurePosixPath.from(key), new byte[0]); + return; + } + + if (bytesRead < DEFAULT_UPLOAD_PART_SIZE) { + byte[] content = Arrays.copyOf(buffer, bytesRead); + // Small file, use regular putObject to avoid multipart upload being rejected + this.putObject(PurePosixPath.from(key), content); + return; + } + + // 1. Initiate the multipart upload for large files CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder() .bucket(bucketName) .key(key) @@ -107,22 +123,16 @@ private void uploadLargeStream(String key, InputStream inputStream) { String uploadId = response.uploadId(); try { - // 2. Read the input stream and upload each part - while ((bytesRead = inputStream.read(buffer)) != -1) { - byte[] bytesToUpload = (bytesRead < partSize) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer; - UploadPartRequest uploadPartRequest = UploadPartRequest.builder() - .bucket(bucketName) - .key(key) - .uploadId(uploadId) - .partNumber(partNumber) - .contentLength((long) bytesRead) - .build(); - AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload); - CompletableFuture uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody); - completedParts.add(CompletedPart.builder() - .partNumber(partNumber) - .eTag(uploadPartResponse.join().eTag()) - .build()); + // Upload the first buffer we already read + CompletedPart firstPart = uploadPart(key, uploadId, partNumber, bytesRead, buffer); + completedParts.add(firstPart); + partNumber++; + + // 2. Continue reading and uploading remaining parts + while ((bytesRead = readUploadBuffer(inputStream, buffer)) != 0) { + byte[] bytesToUpload = (bytesRead < DEFAULT_UPLOAD_PART_SIZE) ? 
java.util.Arrays.copyOf(buffer, bytesRead) : buffer; + CompletedPart completedPart = uploadPart(key, uploadId, partNumber, bytesRead, bytesToUpload); + completedParts.add(completedPart); partNumber++; } @@ -142,8 +152,41 @@ private void uploadLargeStream(String key, InputStream inputStream) { .uploadId(uploadId) .build(); s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest).join(); - throw new RuntimeException("Failed to upload object: " + key, e); + throw new IOException("Failed to upload object: " + key, e); + } + } + + private int readUploadBuffer(InputStream inputStream, byte[] buffer) throws IOException { + int totalBytesRead = inputStream.read(buffer); + + if (totalBytesRead == -1) { + return 0; + } + + if (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE) { + int bytesRead; + while (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE && (bytesRead = inputStream.read(buffer, totalBytesRead, DEFAULT_UPLOAD_PART_SIZE - totalBytesRead)) != -1) { + totalBytesRead += bytesRead; + } } + + return totalBytesRead; + } + + private CompletedPart uploadPart(String key, String uploadId, int partNumber, long bytesRead, byte[] bytesToUpload) { + UploadPartRequest uploadPartRequest = UploadPartRequest.builder() + .bucket(bucketName) + .key(key) + .uploadId(uploadId) + .partNumber(partNumber) + .contentLength(bytesRead) + .build(); + AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload); + CompletableFuture uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody); + return CompletedPart.builder() + .partNumber(partNumber) + .eTag(uploadPartResponse.join().eTag()) + .build(); } @@ -184,16 +227,13 @@ public long getSize(PurePosixPath name) throws IOException { .build(); HeadObjectResponse response = s3Client.headObject(request); return response.contentLength(); - } - catch (NoSuchKeyException e) { + } catch (NoSuchKeyException e) { throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName); - } - catch 
(AwsServiceException | SdkClientException e) { + } catch (AwsServiceException | SdkClientException e) { throw new IOException("Failed to get object size: " + name, e); } } - @Override /** * Lists all objects in the S3 bucket with the given prefix. * @@ -202,17 +242,15 @@ public long getSize(PurePosixPath name) throws IOException { * @param prefix The prefix to filter objects by. * @return A list of paths to the objects in the bucket. */ + @Override public List listObjects(PurePosixPath prefix) { splitPrefix(prefix); // validate prefix - List result = new ArrayList<>(); ListObjectsV2Request request = ListObjectsV2Request.builder() .bucket(bucketName) .prefix(prefix.toString()) .build(); - List results = s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList(); - - return results; + return s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList(); } @Override diff --git a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java index 86d3966..9760b22 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java +++ b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java @@ -26,6 +26,7 @@ import java.io.InputStreamReader; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; @@ -35,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static 
org.junit.jupiter.api.Assertions.assertTrue; @@ -44,7 +46,6 @@ public class IBucketTester { private static final List INVALID_PREFIXES = List.of("/", "/dir", "star*1", "dir1/a\\file.txt", "at@gmail", "sharp#1", "dollar$1", "comma,"); private final BaseBucket storage; private final String uniqueSuffix; - private final String PATH_WITH_2025_KEYS = "test-dir-with-2025-keys/"; public IBucketTester(BaseBucket storage) { this.storage = storage; @@ -72,7 +73,7 @@ public void testPutAndGetObject() throws IOException { String sContent = "Test content"; storage.putObject(path, sContent.getBytes()); retrievedContent = storage.getObject(path); - assertArrayEquals(retrievedContent, sContent.getBytes("utf-8")); + assertArrayEquals(retrievedContent, sContent.getBytes(StandardCharsets.UTF_8)); // ByteArray content path = PurePosixPath.from(uniqueDir, "file1.ba"); @@ -85,7 +86,7 @@ public void testPutAndGetObject() throws IOException { String stringPath = uniqueDir + "/file1.txt"; storage.putObject(PurePosixPath.from(stringPath), sContent.getBytes()); retrievedContent = storage.getObject(PurePosixPath.from(stringPath)); - assertArrayEquals(retrievedContent, sContent.getBytes("utf-8")); + assertArrayEquals(retrievedContent, sContent.getBytes(StandardCharsets.UTF_8)); // Non-existent path PurePosixPath nonExistentPath = PurePosixPath.from(uniqueDir, "inexistent.txt"); @@ -140,17 +141,40 @@ public void testPutAndGetParquetObjectStream() throws IOException { // Binary content PurePosixPath path = PurePosixPath.from(uniqueDir, "file1.parquet"); + int totalRows = 3; + InputStream inputStream = generateParquetOutput(totalRows); + storage.putObjectStream(path, inputStream); + + int count = readRowsCountFromParquet(path, schema); + assertEquals(totalRows, count); + } + + public void testPutAndGetMultiUploadLargeParquetObjectStream() throws IOException { + // This test is intended to generate ~6MB parquet file, so triggering multiple multipart uploads for S3 + MessageType schema = 
Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field") + .named("TestSchema"); - InputStream inputStream = generateParquetOutput(); + String uniqueDir = "dir" + uniqueSuffix; + + // Binary content + PurePosixPath path = PurePosixPath.from(uniqueDir, "file20.parquet"); + int totalRows = 1_500_000; + InputStream inputStream = generateParquetOutput(totalRows); storage.putObjectStream(path, inputStream); + int count = readRowsCountFromParquet(path, schema); + assertEquals(totalRows, count); + } + + private int readRowsCountFromParquet(PurePosixPath path, MessageType schema) throws IOException { ObjectStream objectStream = storage.getObjectStream(path); assertTrue(objectStream.getStream().markSupported()); long size = storage.getSize(path); InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size); + int count = 0; try (ParquetFileReader reader = ParquetFileReader.open(inFile)) { - int count = 0; // get actual file schema var metadata = reader.getFooter().getFileMetaData(); MessageType fileSchema = metadata.getSchema(); @@ -165,8 +189,8 @@ public void testPutAndGetParquetObjectStream() throws IOException { count++; } } - assertEquals(3, count); } + return count; } public void testGetSize() throws IOException { @@ -219,6 +243,7 @@ public void testListObjectsWithOver1000keys() throws IOException { } private PurePosixPath ensureDirWith2025Keys() throws IOException { + final String PATH_WITH_2025_KEYS = "test-dir-with-2025-keys/"; var pathWith2025Keys = new PurePosixPath(PATH_WITH_2025_KEYS); List existingKeys = storage.listObjects(pathWith2025Keys); if (existingKeys.isEmpty()) { @@ -226,15 +251,15 @@ private PurePosixPath ensureDirWith2025Keys() throws IOException { ForkJoinPool customThreadPool = new ForkJoinPool(50); try { customThreadPool.submit(() -> - IntStream.range(0, 2025).parallel().forEach(i -> { - try { - var path = pathWith2025Keys.join("file" + i + ".txt"); - storage.putObject(path, ("Content " + 
i).getBytes()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - ).get(); + IntStream.range(0, 2025).parallel().forEach(i -> { + try { + var path = pathWith2025Keys.join("file" + i + ".txt"); + storage.putObject(path, ("Content " + i).getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + ).get(); } catch (Exception e) { throw new RuntimeException(e); } finally { @@ -271,8 +296,8 @@ public void testShallowListObjects() throws IOException { ShallowListing shallowListing = storage.shallowListObjects(new PurePosixPath(uniqueDir)); expectedObjects = List.of(new PurePosixPath(uniqueDir + "file1.txt")); expectedPrefixes = List.of(PurePosixPath.from(uniqueDir + "/")); - assertTrue(shallowListing.getObjects() instanceof List); - assertTrue(shallowListing.getPrefixes() instanceof List); + assertInstanceOf(List.class, shallowListing.getObjects()); + assertInstanceOf(List.class, shallowListing.getPrefixes()); assertIterableEquals(expectedObjects, shallowListing.getObjects()); assertIterableEquals(expectedPrefixes, shallowListing.getPrefixes()); @@ -304,7 +329,7 @@ public void testRemoveObjects() throws IOException { List result = storage.removeObjects(List.of(path1, path2, new PurePosixPath(uniqueDir + "/inexistent.file"))); // Check that the files do not exist - assertTrue(result instanceof List); + assertInstanceOf(List.class, result); assertEquals(List.of(), result); assertFalse(storage.exists(path1)); assertFalse(storage.exists(path2)); @@ -314,10 +339,11 @@ public void testRemoveObjects() throws IOException { // Check that the leftover empty directories are also removed, but the bucket may contain leftovers from the other test runs ShallowListing shallowListing = storage.shallowListObjects(new PurePosixPath("")); List prefixes = shallowListing.getPrefixes(); - assertFalse(prefixes.contains(uniqueDir + "/")); + List prefixStrings = prefixes.stream().map(PurePosixPath::toString).toList(); + 
assertFalse(prefixStrings.contains(uniqueDir + "/")); } - private InputStream generateParquetOutput() throws IOException { + private InputStream generateParquetOutput(int rows) throws IOException { MessageType schema = Types.buildMessage() .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field") .named("TestSchema"); @@ -327,7 +353,7 @@ private InputStream generateParquetOutput() throws IOException { GroupWriteSupport.setSchema(schema, conf); int pipeBuffer = 4 * 1024 * 1024; // 4 MB internal buffer - PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer); + PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer); PipedOutputStream pipedOutputStream = new PipedOutputStream(inputStreamToReturn); // wrap pipe in our PositionOutputStream @@ -344,7 +370,7 @@ private InputStream generateParquetOutput() throws IOException { .build()) { SimpleGroupFactory factory = new SimpleGroupFactory(schema); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < rows; i++) { writer.write(factory.newGroup().append("int_field", i)); } } catch (IOException e) { @@ -352,6 +378,11 @@ private InputStream generateParquetOutput() throws IOException { } }); writerThread.start(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } return inputStreamToReturn; } diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java index 03d8657..d711a42 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java +++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java @@ -9,7 +9,6 @@ class S3BucketTest { - private S3Bucket bucket; private IBucketTester tester; @BeforeAll @@ -21,7 +20,7 @@ public static void setUpClass() { public void setUp() { String accessKey = System.getenv("MINIO_ACCESS_KEY"); String secretKey = System.getenv("MINIO_SECRET_KEY"); - bucket = new S3Bucket("https://minio.esamtrade.vlada.ro", 
accessKey, secretKey, "minio-dev-tests"); S3Bucket bucket = new S3Bucket("https://minio.esamtrade.vlada.ro", accessKey, secretKey, "minio-dev-tests"); tester = new IBucketTester(bucket); } @@ -46,6 +45,11 @@ void testPutAndGetParquetObjectStream() throws IOException { tester.testPutAndGetParquetObjectStream(); } + @Test + void testPutAndGetMultiUploadObjectStream() throws IOException { + tester.testPutAndGetMultiUploadLargeParquetObjectStream(); + } + @Test void testGetSize() throws IOException { tester.testGetSize(); From a82123fb51bd8003f1faa69361a134c589c95810 Mon Sep 17 00:00:00 2001 From: ASU Date: Mon, 28 Apr 2025 00:04:50 +0300 Subject: [PATCH 6/6] Fixed flaky test testPutAndGetParquetObjectStream, and added testPutAndGetMultiUploadObjectStream to test S3 multipart upload with large files --- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java index 9760b22..3512839 100644 --- a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java +++ b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java @@ -248,6 +248,7 @@ private PurePosixPath ensureDirWith2025Keys() throws IOException { List existingKeys = storage.listObjects(pathWith2025Keys); if (existingKeys.isEmpty()) { // Create the directory and add 2025 files + @SuppressWarnings("resource") ForkJoinPool customThreadPool = new ForkJoinPool(50); try { customThreadPool.submit(() ->