+ * The exact format selected by {@code AUTO} is an implementation detail and may change
+ * between SDK releases. Choose {@link #XML} or {@link #ARROW} explicitly if you require
+ * a specific format.
+ */
+ AUTO,
+
+ /**
+ * XML response format.
+ */
+ XML,
+
+ /**
+ * Apache Arrow response format.
+ */
+ ARROW
+}
diff --git a/sdk/storage/azure-storage-blob/src/main/java/module-info.java b/sdk/storage/azure-storage-blob/src/main/java/module-info.java
index 597a417add7f..e622d0aa2e95 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/module-info.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/module-info.java
@@ -5,6 +5,9 @@
requires transitive com.azure.storage.common;
requires com.azure.storage.internal.avro;
+ requires org.apache.arrow.vector;
+ requires org.apache.arrow.memory.core;
+ requires org.apache.arrow.format;
exports com.azure.storage.blob;
exports com.azure.storage.blob.models;
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ContainerApiTests.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ContainerApiTests.java
index f46116acdbb5..d4c9d3838d35 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ContainerApiTests.java
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ContainerApiTests.java
@@ -34,6 +34,7 @@
import com.azure.storage.blob.models.PublicAccessType;
import com.azure.storage.blob.models.RehydratePriority;
import com.azure.storage.blob.models.StorageAccountInfo;
+import com.azure.storage.blob.models.StorageResponseSerializationFormat;
import com.azure.storage.blob.models.TaggedBlobItem;
import com.azure.storage.blob.options.BlobContainerCreateOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
@@ -50,6 +51,10 @@
import com.azure.storage.common.test.shared.extensions.PlaybackOnly;
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion;
import com.azure.storage.common.test.shared.policy.InvalidServiceVersionPipelinePolicy;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.VectorSchemaRoot;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -58,7 +63,19 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import com.azure.core.http.HttpPipeline;
+import com.azure.storage.blob.implementation.AzureBlobStorageImpl;
+import com.azure.storage.blob.implementation.AzureBlobStorageImplBuilder;
+import com.azure.storage.blob.implementation.models.ContainersListBlobFlatSegmentApacheArrowHeaders;
+import com.azure.storage.blob.implementation.models.ContainersListBlobHierarchySegmentApacheArrowHeaders;
+import com.azure.storage.blob.implementation.util.ArrowBlobListDeserializer;
+import com.azure.storage.blob.implementation.util.ModelHelper;
+import com.azure.storage.blob.models.ListBlobsIncludeItem;
+import com.azure.core.http.rest.ResponseBase;
+
import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
import java.net.URL;
import java.time.OffsetDateTime;
import java.util.Arrays;
@@ -2128,4 +2145,313 @@ public void getBlobContainerUrlEncodesContainerName() {
// then:
// assertThrows(BlobStorageException.class, () ->
// }
+
+ @Test
+ @RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2026-06-06")
+ public void listBlobsArrowBasic() {
+ // Upload a test blob
+ String blobName = generateBlobName();
+ cc.getBlobClient(blobName).getBlockBlobClient().upload(DATA.getDefaultInputStream(), DATA.getDefaultDataSize());
+
+ ListBlobsOptions options
+ = new ListBlobsOptions().setStorageResponseSerializationFormat(StorageResponseSerializationFormat.ARROW);
+ List> result = uploads.then(ccAsync.listBlobs(options, null).byPage().doOnNext(page -> {
+ assertTrue(page.getValue().size() <= 1);
+ }).flatMap(page -> Flux.fromIterable(page.getValue())).collectList());
+
+ StepVerifier.create(result).assertNext(allBlobs -> assertEquals(4, allBlobs.size())).verifyComplete();
+
+ // Mirror the sync test's secondary assertion: requesting page size 2 yields exactly 2 blobs per page.
+ StepVerifier.create(ccAsync.listBlobs().byPage(2)).thenConsumeWhile(page -> {
+ assertEquals(2, page.getValue().size());
+ return true;
+ }).verifyComplete();
+ }
+
+ @Test
+ @RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2026-06-06")
+ public void listBlobsArrowNullUseArrowUsesXml() {
+ // Default apacheArrowEnabled is null — should use XML path without error
+ String blobName = generateBlobName();
+ BlockBlobAsyncClient bc = ccAsync.getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
+
+ ListBlobsOptions options = new ListBlobsOptions();
+ assertNull(options.getStorageResponseSerializationFormat());
+
+ StepVerifier
+ .create(
+ bc.upload(DATA.getDefaultFlux(), DATA.getDefaultDataSize()).thenMany(ccAsync.listBlobs(options, null)))
+ .assertNext(item -> assertEquals(blobName, item.getName()))
+ .verifyComplete();
+ }
+
+ @LiveOnly
+ @Test
+ @RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2026-06-06")
+ public void listBlobsArrowEncryptedBlob() {
+ // Upload a blob with CPK (customer-provided key)
+ String blobName = generateBlobName();
+ CustomerProvidedKey cpk = new CustomerProvidedKey(Base64.getEncoder().encodeToString(getRandomKey()));
+ BlockBlobAsyncClient cpkClient
+ = ccAsync.getBlobAsyncClient(blobName).getCustomerProvidedKeyAsyncClient(cpk).getBlockBlobAsyncClient();
+
+ ListBlobsOptions options
+ = new ListBlobsOptions().setStorageResponseSerializationFormat(StorageResponseSerializationFormat.ARROW);
+
+ StepVerifier.create(cpkClient.upload(DATA.getDefaultFlux(), DATA.getDefaultDataSize())
+ .thenMany(ccAsync.listBlobs(options, null))).assertNext(item -> {
+ assertEquals(blobName, item.getName());
+ // CPK blob should have server-encrypted = true
+ assertTrue(item.getProperties().isServerEncrypted());
+ // Metadata should be null (no metadata was set)
+ assertNull(item.getMetadata());
+ }).verifyComplete();
+ }
+
+ @ParameterizedTest
+ @MethodSource("listBlobsFlatRehydratePrioritySupplier")
+ @RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2026-06-06")
+ public void listBlobsArrowRehydratePriority(RehydratePriority rehydratePriority) {
+ String name = generateBlobName();
+ BlockBlobAsyncClient bc = ccAsync.getBlobAsyncClient(name).getBlockBlobAsyncClient();
+
+ Mono
> items
+ = uploads.then(ccAsync.listBlobsByHierarchy("/", options).collect(Collectors.toList()));
+
+ StepVerifier.create(items).assertNext(list -> {
+ // Root level: one prefix "dir/" and one blob "topblob"
+ assertEquals(2, list.size());
+
+ BlobItem prefixItem = list.stream().filter(BlobItem::isPrefix).findFirst().orElse(null);
+ BlobItem blobItem = list.stream().filter(i -> !i.isPrefix()).findFirst().orElse(null);
+
+ assertNotNull(prefixItem);
+ assertEquals("dir/", prefixItem.getName());
+ assertTrue(prefixItem.isPrefix());
+
+ assertNotNull(blobItem);
+ assertEquals("topblob", blobItem.getName());
+ assertFalse(blobItem.isPrefix());
+ assertNotNull(blobItem.getProperties());
+ assertEquals(DATA.getDefaultDataSize(), blobItem.getProperties().getContentLength());
+ assertEquals(BlobType.BLOCK_BLOB, blobItem.getProperties().getBlobType());
+ assertNotNull(blobItem.getProperties().getLastModified());
+ assertNotNull(blobItem.getProperties().getETag());
+ }).verifyComplete();
+ }
+
+ @Test
+ @RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2026-06-06")
+ public void listBlobsByHierarchyArrowWithMetadata() {
+ String blobName = generateBlobName();
+ Map
> result
+ = uploads.then(ccAsync.listBlobsByHierarchy("/", options).byPage().doOnNext(page -> {
+ assertTrue(page.getValue().size() <= 1);
+ }).flatMap(page -> Flux.fromIterable(page.getValue())).collectList());
+
+ // 3 prefixes + 1 blob = 4 items
+ StepVerifier.create(result).assertNext(allItems -> assertEquals(4, allItems.size())).verifyComplete();
+ }
+
+ @Test
+ @RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2026-06-06")
+ public void listBlobsArrowWithTags() {
+ // Upload a blob and set tags
+ String blobName = generateBlobName();
+ BlockBlobAsyncClient bc = ccAsync.getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
+
+ Map