diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index 91226bd08ee3..bc43f6124961 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "pr": "38069",
+  "pr": "37345",
   "modification": 41
-}
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 658d1fc29e32..01e258f5f3cb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -53,6 +53,7 @@ public static class Configuration {
     private String topic;
     private @Nullable String idAttribute;
     private @Nullable String timestampAttribute;
+    private boolean publishWithOrderingKey = false;
 
     public void setTopic(String topic) {
       this.topic = topic;
@@ -65,6 +66,10 @@ public void setIdLabel(@Nullable String idAttribute) {
     public void setTimestampAttribute(@Nullable String timestampAttribute) {
       this.timestampAttribute = timestampAttribute;
     }
+
+    public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) {
+      this.publishWithOrderingKey = publishWithOrderingKey != null && publishWithOrderingKey;
+    }
   }
 
   public static class WriteBuilder
@@ -85,6 +90,9 @@ public PTransform, PDone> buildExternal(Configuration config
       if (config.timestampAttribute != null) {
         writeBuilder.setTimestampAttribute(config.timestampAttribute);
       }
+      if (config.publishWithOrderingKey) {
+        writeBuilder.setPublishWithOrderingKey(true);
+      }
       writeBuilder.setDynamicDestinations(false);
       return writeBuilder.build();
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index d62d294ed2a7..57005745044b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws IOException {
         this.pubsubClient =
             getPubsubClientFactory()
                 .newClient(
-                    getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class));
+                    getTimestampAttribute(),
+                    null,
+                    c.getPipelineOptions().as(PubsubOptions.class),
+                    Write.this.getPubsubRootUrl());
       }
 
       @ProcessElement
diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py
index a2a3430f9a1a..3125c0422275 100644
--- a/sdks/python/apache_beam/io/external/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py
@@ -117,6 +117,7 @@ def expand(self, pbegin):
         # this is not implemented yet on the Java side:
         # ('with_attributes', bool),
         ('timestamp_attribute', typing.Optional[str]),
+        ('publish_with_ordering_key', bool),
     ])
 
 
@@ -135,6 +136,7 @@ def __init__(
       with_attributes=False,
       id_label=None,
       timestamp_attribute=None,
+      publish_with_ordering_key=False,
       expansion_service=None):
     """Initializes ``WriteToPubSub``.
 
@@ -150,18 +152,23 @@ def __init__(
         in a ReadFromPubSub PTransform to deduplicate messages.
       timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
         message with the given name and the message's publish time as the value.
+      publish_with_ordering_key: If True, enables ordering key support when
+        publishing messages. Requires ``with_attributes=True``; the key is
+        read from the ``ordering_key`` field of each PubsubMessage.
     """
     self.params = WriteToPubsubSchema(
         topic=topic,
         id_label=id_label,
         # with_attributes=with_attributes,
-        timestamp_attribute=timestamp_attribute)
+        timestamp_attribute=timestamp_attribute,
+        publish_with_ordering_key=publish_with_ordering_key)
     self.expansion_service = expansion_service
     self.with_attributes = with_attributes
 
   def expand(self, pvalue):
     if self.with_attributes:
-      pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+      pcoll = pvalue | 'ToProto' >> Map(
+          pubsub.WriteToPubSub.message_to_proto_str)
     else:
       pcoll = pvalue | 'ToProto' >> Map(
           lambda x: pubsub.PubsubMessage(x, {})._to_proto_str())
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 276103f52760..1fcdd45bc6e2 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -31,8 +31,10 @@
 """
 
 # pytype: skip-file
 
+import logging
 import re
+import time
 from typing import Any
 from typing import NamedTuple
 from typing import Optional
@@ -430,7 +432,18 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:
   def expand(self, pcoll):
     # Store pipeline options for use in DoFn
     self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None
-
+    # Warn Dataflow users to use the XLang path for ordering key support,
+    # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation.
+    # Only PubsubMessage inputs (with_attributes=True) can carry an ordering
+    # key, so byte-only pipelines are not warned.
+    runner = self.pipeline_options.get_all_options().get(
+        'runner', '') if self.pipeline_options else ''
+    if self.with_attributes and 'Dataflow' in str(runner):
+      logging.warning(
+          'WriteToPubSub ordering_key support is not available on Dataflow '
+          'via this transform. Use the XLang WriteToPubSub path instead: '
+          'apache_beam.io.external.gcp.pubsub.WriteToPubSub with '
+          'publish_with_ordering_key=True.')
     if self.with_attributes:
       pcoll = pcoll | 'ToProtobufX' >> ParDo(
           _AddMetricsAndMap(
@@ -597,7 +610,6 @@ def __init__(self, transform):
       output_labels_supported = False
 
     # Log debug information for troubleshooting
-    import logging
     runner_info = getattr(
         pipeline_options, 'runner',
         'None') if pipeline_options else 'No options'
@@ -628,7 +640,10 @@ def __init__(self, transform):
   def setup(self):
     from google.cloud import pubsub
 
-    self._pub_client = pubsub.PublisherClient()
+    self._pub_client = pubsub.PublisherClient(
+        publisher_options=pubsub.types.PublisherOptions(
+            enable_message_ordering=True,
+        ))
     self._topic = self._pub_client.topic_path(
         self.project, self.short_topic_name)
 
@@ -647,8 +662,6 @@ def _flush(self):
     if not self._buffer:
       return
 
-    import time
-
     # The elements in buffer are serialized protobuf bytes from the previous
     # transforms. We need to deserialize them to extract data and attributes.
     futures = []
@@ -656,12 +669,17 @@ def _flush(self):
       # Deserialize the protobuf to get the original PubsubMessage
       pubsub_msg = PubsubMessage._from_proto_str(elem)
 
-      # Publish with the correct data and attributes
-      if self.with_attributes and pubsub_msg.attributes:
-        future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
-      else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+      # Build the keyword arguments once so every message takes one path:
+      # attributes are sent only when requested and present, and ordering_key
+      # only when the message carries one. Messages without an ordering key
+      # are therefore published exactly as they were before.
+      publish_kwargs = {}
+      if self.with_attributes and pubsub_msg.attributes:
+        publish_kwargs.update(pubsub_msg.attributes)
+      if pubsub_msg.ordering_key:
+        publish_kwargs['ordering_key'] = pubsub_msg.ordering_key
+      future = self._pub_client.publish(
+          self._topic, pubsub_msg.data, **publish_kwargs)
 
       futures.append(future)
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 8387fe734fc1..533f9ae430e6 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -305,6 +305,99 @@ def test_batch_write_with_attributes(self):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit
+  def test_batch_write_with_ordering_key(self):
+    """Test WriteToPubSub in batch mode with ordering keys.
+
+    Dataflow's Native Pub/Sub Sink does not support ordering_key
+    (see https://github.com/apache/beam/issues/36201), so this test
+    only applies to runners using Beam's Python WriteToPubSub Sink.
+    Dataflow users should use the XLang WriteToPubSub path instead
+    (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+    publish_with_ordering_key=True).
+    """
+    if self.runner_name == 'TestDataflowRunner':
+      self.skipTest(
+          'Dataflow Native PubSub Sink does not support ordering_key '
+          '(see https://github.com/apache/beam/issues/36201). '
+          'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub '
+          'with publish_with_ordering_key=True instead.')
+    from google.pubsub_v1.types import Subscription
+
+    from apache_beam.options.pipeline_options import PipelineOptions
+    from apache_beam.options.pipeline_options import StandardOptions
+    from apache_beam.transforms import Create
+
+    ordering_topic = self.pub_client.create_topic(
+        name=self.pub_client.topic_path(
+            self.project, 'psit_topic_ordering' + self.uuid))
+    # The subscription is created inside the try block so that the topic is
+    # still deleted if creating the subscription fails.
+    ordering_sub = None
+    try:
+      ordering_sub = self.sub_client.create_subscription(
+          request=Subscription(
+              name=self.sub_client.subscription_path(
+                  self.project, 'psit_sub_ordering' + self.uuid),
+              topic=ordering_topic.name,
+              enable_message_ordering=True,
+          ))
+      time.sleep(10)
+
+      test_messages = [
+          PubsubMessage(
+              b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+          PubsubMessage(
+              b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+          PubsubMessage(
+              b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
+      ]
+
+      pipeline_options = PipelineOptions()
+      pipeline_options.view_as(StandardOptions).streaming = False
+
+      with TestPipeline(options=pipeline_options) as p:
+        messages = p | 'CreateMessages' >> Create(test_messages)
+        _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+            ordering_topic.name, with_attributes=True)
+
+      time.sleep(10)
+
+      # Retry pulling to handle PubSub delivery delays. Messages are keyed by
+      # payload and acked as soon as they are pulled, because unacked messages
+      # can be redelivered while we wait and would otherwise be counted twice.
+      received_map = {}
+      deadline = time.time() + 60  # wait up to 60 seconds
+      while time.time() < deadline:
+        response = self.sub_client.pull(
+            request={
+                'subscription': ordering_sub.name,
+                'max_messages': 10,
+            })
+        ack_ids = []
+        for msg in response.received_messages:
+          received_map[msg.message.data] = msg.message
+          ack_ids.append(msg.ack_id)
+        if ack_ids:
+          self.sub_client.acknowledge(
+              request={
+                  'subscription': ordering_sub.name,
+                  'ack_ids': ack_ids,
+              })
+        if len(received_map) >= len(test_messages):
+          break
+        time.sleep(5)
+
+      self.assertEqual(len(received_map), len(test_messages))
+      self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1')
+      self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
+      self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
+    finally:
+      if ordering_sub is not None:
+        self.sub_client.delete_subscription(
+            request={'subscription': ordering_sub.name})
+      self.pub_client.delete_topic(request={'topic': ordering_topic.name})
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 5650e920e635..14b361ae45fa 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -1098,6 +1098,74 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
         Lineage.query(p.result.metrics(), Lineage.SINK),
         set(["pubsub:topic:fakeprj.a_topic"]))
 
+  def test_write_messages_with_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key in messages."""
+    data = b'data'
+    ordering_key = 'order-123'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed as a keyword argument
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key but no attributes."""
+    data = b'data'
+    ordering_key = 'order-456'
+    payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_without_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub without ordering_key (backward compatibility)."""
+    data = b'data'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes)]  # No ordering_key
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called without an ordering key, so messages
+    # that do not use ordering are published exactly as before.
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+    self.assertNotIn('ordering_key', call_args.kwargs)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)