diff --git a/composer.json b/composer.json index 949154d4e..cdd88b0b3 100644 --- a/composer.json +++ b/composer.json @@ -413,7 +413,8 @@ }, "drupal/data_pipelines_elasticsearch-data_pipelines_elasticsearch": { "Complete the fix for bulk indexing on Elasticsearch endpoint - https://www.drupal.org/project/data_pipelines_elasticsearch/issues/3540879": "https://www.drupal.org/files/issues/2025-08-12/3511558-Change-bulk-deleting-on-Elasticsearch-endpoint.patch", - "False positive mappings defined at top level error triggered unconditionally in ElasticSearchDatasetPipeline constructor - https://www.drupal.org/project/data_pipelines_elasticsearch/issues/3541059#comment-16226097": "https://www.drupal.org/files/issues/2025-08-13/3541059-fix-elasticsearch-mapping-warning-in-constructor.patch" + "False positive mappings defined at top level error triggered unconditionally in ElasticSearchDatasetPipeline constructor - https://www.drupal.org/project/data_pipelines_elasticsearch/issues/3541059#comment-16226097": "https://www.drupal.org/files/issues/2025-08-13/3541059-fix-elasticsearch-mapping-warning-in-constructor.patch", + "Allow datasets to define a custom document `_id` instead of the positional delta": "https://www.drupal.org/files/issues/2026-06-17/allow_external_id_datasets_3601849_2.patch" }, "drupal/paragraphs": { "Add default paragraph count setting - https://www.drupal.org/project/paragraphs/issues/3089423#comment-14517270": "https://www.drupal.org/files/issues/2022-05-17/paragraphs-default-quantity-3089423-18.patch", diff --git a/docker-compose.yml b/docker-compose.yml index 5ea841ded..0770e3d1a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,8 +3,6 @@ # - Using a single file to work in local, CI and production environments. # - Local overrides are possible using docker-composer.override.yml file. # - Use inline comments starting with ### to have the line removed in CI. 
-version: '2.3' - x-bay-image-version: &bay-image-version ${BAY_IMAGE_VERSION:-6.x} diff --git a/modules/tide_data_pipeline_json_endpoint/README.md b/modules/tide_data_pipeline_json_endpoint/README.md new file mode 100644 index 000000000..7853867c0 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/README.md @@ -0,0 +1,226 @@ +# Tide Data Pipeline JSON Endpoint + +Provides a `json_endpoint` dataset source for the `data_pipelines` module. External systems push JSON payloads to a Drupal REST endpoint, which stores the data and optionally triggers immediate reprocessing of the dataset. + +## How it works + +1. An external system authenticates with Drupal using OAuth 2.0 (client credentials flow) to obtain a short-lived bearer token. +2. It POSTs a JSON payload to `/api/datasets/{machine_name}/push`. +3. Drupal saves the payload to the private filesystem and, by default, synchronously reprocesses the dataset. + +--- + +## Requirements + +- `data_pipelines` module +- `consumers` module +- `simple_oauth` module (provides the OAuth 2.0 token endpoint) +- `tide_oauth` module (provides the authentication provider that validates bearer tokens) +- A configured private filesystem (`$settings['file_private_path']` in `settings.php`) +- OAuth public/private keys generated (run `drush tide-oauth:keygen`) + +--- + +## Installation + +Enable the module: + +```bash +drush en tide_data_pipeline_json_endpoint +``` + +On install the module creates: + +- A **`data_pipeline_pusher` role** with the single permission `push data pipeline json endpoint`. +- A **`Data Pipeline Pusher` OAuth consumer** (`client_id: data_pipeline_pusher`) wired to that role. The consumer is created as confidential — it cannot issue tokens until a client secret is set (see [OAuth set up](#oauth-set-up) below). + +--- + +## OAuth set up + +### 1. Ensure OAuth keys exist + +If you have not already generated OAuth keys, run: + +```bash +drush tide-oauth:keygen +``` + +### 2. 
Set the consumer client secret + +The consumer is created without a secret so that no credential is ever stored in code. You must set one before the consumer can issue tokens. + +1. Go to **Admin > Configuration > Web services > Consumers** (`/admin/config/services/consumer`). +2. Open the **Data Pipeline Pusher** consumer. +3. Enter a strong random value in the **New Secret** field and save. + +Store the secret securely (e.g. in a secrets manager or CI/CD environment variable). Drupal stores only a bcrypt hash — the plaintext is never recoverable from the database. + +### 3. Verify the token endpoint + +Confirm that `/oauth/token` is accessible and returns a token: + +```bash +curl -s -X POST https://your-site.com/oauth/token \ + -d "grant_type=client_credentials" \ + -d "client_id=data_pipeline_pusher" \ + -d "client_secret=YOUR_CLIENT_SECRET" \ + | jq . +``` + +A successful response: + +```json +{ + "token_type": "Bearer", + "expires_in": 300, + "access_token": "eyJ0eXAiOiJKV1QiLCJhb..." +} +``` + +--- + +## Creating a dataset + +Datasets are content entities managed at **Admin > Content > Data Pipelines**. + +### Via the admin UI + +1. Go to `/admin/content/data-pipelines/add`. +2. Set **Source** to **JSON Endpoint**. +3. Enter a **Machine name** — this becomes the `{machine_name}` segment in the push URL. +4. Optionally set **Path to data** if your payload wraps the records in a nested key (see [Path to data](#path-to-data)). +5. Configure your pipeline and destination as normal. +6. **Publish** the dataset. Unpublished datasets reject push requests with `422`. 
+ +### Via Drush / API + +```php +$dataset = \Drupal\data_pipelines\Entity\Dataset::create([ + 'name' => 'Suburbs', + 'machine_name' => 'suburbs', + 'source' => 'json_endpoint', + 'pipeline' => 'my_pipeline', + 'published' => TRUE, + 'destinations' => [$destination], +]); +$dataset->save(); +``` + +### Path to data + +If your payload nests records under a key rather than being a top-level array, use the **Path to data** field to provide a [JSONPath](https://github.com/SoftCreatR/JSONPath) expression. + +| Payload shape | Path to data | +|---|---| +| `[{"id":1}, ...]` (top-level array) | *(leave empty)* | +| `{"data": [{"id":1}, ...]}` | `$.data` | +| `{"results": {"items": [...]}}` | `$.results.items` | + +--- + +## Pushing data + +### Endpoint + +``` +POST /api/datasets/{machine_name}/push +``` + +| Header | Value | +|---|---| +| `Authorization` | `Bearer <token>` | +| `Content-Type` | `application/json` | + +### Modes + +| Query string | Behaviour | +|---|---| +| *(none)* | Save the payload **and** immediately reprocess the dataset synchronously. | +| `?save_only=1` | Save the payload only. Reprocessing is deferred to the next scheduled run or a manual trigger. | + +### Response codes + +| Code | Meaning | +|---|---| +| `200` | Success. Body is `{"status":"processed","machine_name":"..."}` or `{"status":"saved","machine_name":"..."}`. | +| `400` | Request body is not valid JSON. | +| `401` | Missing or invalid bearer token. | +| `403` | Token is valid but the associated user lacks the `push data pipeline json endpoint` permission. | +| `404` | No `json_endpoint` dataset with that machine name exists. | +| `415` | `Content-Type` is not `application/json`. | +| `422` | The dataset exists but is not published. | + +--- + +## Example curl + +The following example pushes a list of suburb records to a dataset with the machine name `suburbs`. 
+ +### Step 1 — obtain a token + +```bash +TOKEN=$(curl -s -X POST https://your-site.com/oauth/token \ + -d "grant_type=client_credentials" \ + -d "client_id=data_pipeline_pusher" \ + -d "client_secret=YOUR_CLIENT_SECRET" \ + | jq -r '.access_token') +``` + +### Step 2 — push data + +```bash +curl -s -X POST https://your-site.com/api/datasets/suburbs/push \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '[ + {"id": 1, "name": "Carlton", "postcode": "3053"}, + {"id": 2, "name": "Fitzroy", "postcode": "3065"}, + {"id": 3, "name": "Collingwood", "postcode": "3066"} + ]' \ + | jq . +``` + +Expected response: + +```json +{ + "status": "processed", + "machine_name": "suburbs" +} +``` + +### Save only (defer processing) + +```bash +curl -s -X POST "https://your-site.com/api/datasets/suburbs/push?save_only=1" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '[{"id": 1, "name": "Carlton", "postcode": "3053"}]' \ + | jq . +``` + +```json +{ + "status": "saved", + "machine_name": "suburbs" +} +``` + +--- + +## Payload storage + +Each push overwrites the previous payload. The file is stored at: + +``` +private://data_pipelines_json_endpoint/{machine_name}.json +``` + +This path is inside Drupal's private filesystem and is not publicly accessible. + +--- + +## Token expiry + +Access tokens are short-lived (default 5 minutes in `simple_oauth`). Clients should request a new token when the current one is near expiry rather than reusing a cached token indefinitely. 
diff --git a/modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml b/modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml new file mode 100644 index 000000000..482cba870 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/config/install/user.role.data_pipeline_pusher.yml @@ -0,0 +1,9 @@ +langcode: en +status: true +dependencies: {} +id: data_pipeline_pusher +label: 'Data Pipeline Pusher' +weight: 100 +is_admin: null +permissions: + - 'push data pipeline json endpoint' diff --git a/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php b/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php new file mode 100644 index 000000000..f9d9cce08 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/src/Controller/DatasetPushController.php @@ -0,0 +1,88 @@ +get('file_system')); + } + + /** + * Handles an authenticated JSON payload push for a dataset. + */ + public function push(Request $request, string $machine_name): JsonResponse { + if (!str_contains($request->headers->get('Content-Type', ''), 'application/json')) { + return new JsonResponse(['error' => 'Content-Type must be application/json.'], 415); + } + + $datasets = $this->entityTypeManager()->getStorage('data_pipelines') + ->loadByProperties(['machine_name' => $machine_name, 'source' => 'json_endpoint']); + + if (empty($datasets)) { + return new JsonResponse(['error' => 'Dataset not found.'], 404); + } + + /** @var \Drupal\data_pipelines\Entity\DatasetInterface $dataset */ + $dataset = reset($datasets); + + if (!$dataset->isPublished()) { + return new JsonResponse(['error' => 'Dataset is not published.'], 422); + } + + $body = $request->getContent(); + try { + json_decode($body, flags: JSON_THROW_ON_ERROR); + } + catch (\JsonException $e) { + return new JsonResponse(['error' => 'Invalid JSON: ' . 
$e->getMessage()], 400); + } + + $directory_uri = sprintf('%s://%s', JsonEndpointSource::STORAGE_SCHEME, JsonEndpointSource::STORAGE_DIR); + $this->fileSystem->prepareDirectory($directory_uri, FileSystemInterface::CREATE_DIRECTORY | FileSystemInterface::MODIFY_PERMISSIONS); + $this->fileSystem->saveData($body, JsonEndpointSource::buildStorageUri($machine_name), FileSystemInterface::EXISTS_REPLACE); + + if ($request->query->getBoolean('save_only')) { + return new JsonResponse(['status' => 'saved', 'machine_name' => $machine_name]); + } + + $dataset_id = (int) $dataset->id(); + + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + DatasetBatchOperations::operationQueueItem($dataset_id, $context); + } while ($context['finished'] < 1); + + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + DatasetBatchOperations::operationProcess($dataset_id, $context); + } while ($context['finished'] < 1); + + return new JsonResponse(['status' => 'processed', 'machine_name' => $machine_name]); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php b/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php new file mode 100644 index 000000000..8534e2746 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/src/Plugin/DatasetSource/JsonEndpointSource.php @@ -0,0 +1,130 @@ +get('file_system'), + $container->get('logger.channel.data_pipelines'), + ); + } + + /** + * {@inheritdoc} + */ + public function buildFieldDefinitions(): array { + return [ + 'json_endpoint_path_to_data' => BaseFieldDefinition::create('string') + ->setLabel(new TranslatableMarkup('Path to data')) + ->setDescription(new TranslatableMarkup("Optional JSON path expression to select a sub-array from the received payload.", [ + '@link' => 'https://github.com/SoftCreatR/JSONPath', + ])) + ->setSetting('max_length', 255) + ->setDisplayOptions('form', 
['type' => 'string_textfield']) + ->setPropertyConstraints('value', ['JsonPath' => []]), + ]; + } + + /** + * {@inheritdoc} + */ + public function extractDataFromDataSet(DatasetInterface $dataset): \Generator { + $machine_name = $dataset->get('machine_name')->value; + $file_uri = static::buildStorageUri($machine_name); + $real_path = $this->fileSystem->realpath($file_uri); + + if (!$real_path || !file_exists($real_path)) { + $this->logger->warning('No payload file found for dataset @name at @uri.', [ + '@name' => $machine_name, + '@uri' => $file_uri, + ]); + return; + } + + try { + $json = json_decode(file_get_contents($real_path), TRUE, 512, JSON_THROW_ON_ERROR); + + $json_path = self::getFieldValue($dataset, 'json_endpoint_path_to_data'); + if (!empty($json_path)) { + $json = $this->createJsonPath($json)->find($json_path)->getData(); + } + + if (is_array($json) && array_values($json) === $json) { + foreach ($json as $record) { + yield new DatasetData(is_array($record) ? $record : [$record]); + } + } + else { + yield new DatasetData($json); + } + } + catch (\Exception $e) { + $this->logger->critical('Failed to process JSON payload for dataset @name: @message', [ + '@name' => $machine_name, + '@message' => $e->getMessage(), + ]); + } + } + + /** + * Returns the private filesystem URI for a dataset's stored JSON payload. 
+ */ + public static function buildStorageUri(string $machine_name): string { + return sprintf('%s://%s/%s.json', static::STORAGE_SCHEME, static::STORAGE_DIR, $machine_name); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml new file mode 100644 index 000000000..5774fad46 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.data_pipelines.yml @@ -0,0 +1,2 @@ +test_json_endpoint_pipeline: + label: 'Test JSON Endpoint Pipeline' diff --git a/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml new file mode 100644 index 000000000..b42dfff2d --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/modules/tide_data_pipeline_json_endpoint_test/tide_data_pipeline_json_endpoint_test.info.yml @@ -0,0 +1,8 @@ +name: 'Tide Data Pipeline JSON Endpoint Test' +type: module +description: 'Test fixtures and pipelines for tide_data_pipeline_json_endpoint.' 
+core_version_requirement: ^10 || ^11 +package: Testing +dependencies: + - data_pipelines:data_pipelines + - tide_core:tide_data_pipeline_json_endpoint diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php new file mode 100644 index 000000000..4a6723d11 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/DatasetPushControllerTest.php @@ -0,0 +1,291 @@ +privatePath = $this->siteDirectory . '/private'; + mkdir($this->privatePath, 0777, TRUE); + $this->setSetting('file_private_path', $this->privatePath); + $this->installEntitySchema('user'); + $this->installEntitySchema('file'); + $this->installEntitySchema('data_pipelines'); + $this->installSchema('file', ['file_usage']); + $this->setUpCurrentUser(); + BypassFinals::enable(FALSE); + } + + /** + * Returns an instantiated controller with the test container. + */ + private function controller(): DatasetPushController { + return DatasetPushController::create($this->container); + } + + /** + * Builds a POST request with JSON content type. + */ + private function jsonRequest(mixed $body): Request { + return Request::create( + '/', + 'POST', + content: is_string($body) ? $body : json_encode($body), + server: ['CONTENT_TYPE' => 'application/json'], + ); + } + + /** + * Creates and saves a published json_endpoint dataset with a state dest. + */ + private function createPublishedDataset(string $machine_name): Dataset { + $destination = Destination::create([ + 'id' => $machine_name . 
'_dest', + 'label' => 'Test destination', + 'destination' => 'state', + 'destinationSettings' => ['state_key' => 'test_push_result'], + ]); + $destination->save(); + + $dataset = Dataset::create([ + 'name' => $machine_name, + 'machine_name' => $machine_name, + 'source' => 'json_endpoint', + 'pipeline' => 'test_json_endpoint_pipeline', + 'published' => TRUE, + 'destinations' => [$destination], + ]); + $dataset->save(); + assert($dataset instanceof Dataset); + return $dataset; + } + + /** + * Tests that a valid push returns 200 and processes the dataset. + */ + public function testPushProcessesDatasetAndReturns200(): void { + $machine_name = 'push_success'; + $this->createPublishedDataset($machine_name); + $payload = [['suburb' => 'Carlton'], ['suburb' => 'Fitzroy']]; + + $response = $this->controller()->push($this->jsonRequest($payload), $machine_name); + + $this->assertSame(200, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertSame('processed', $body['status']); + $this->assertSame($machine_name, $body['machine_name']); + } + + /** + * Tests that ?save_only=1 saves the file but skips reprocessing. + */ + public function testSaveOnlyReturnsSavedStatusWithoutProcessing(): void { + $machine_name = 'push_save_only'; + $this->createPublishedDataset($machine_name); + $payload = [['id' => 1]]; + + $request = $this->jsonRequest($payload); + $request->query->set('save_only', '1'); + $response = $this->controller()->push($request, $machine_name); + + $this->assertSame(200, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertSame('saved', $body['status']); + + // Confirm the file was written. + $file_path = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR . '/' . $machine_name . 
'.json'; + $this->assertFileExists($file_path); + $this->assertSame($payload, json_decode(file_get_contents($file_path), TRUE)); + + // Confirm no processing occurred (state destination was never written to). + $this->assertNull(\Drupal::state()->get('test_push_result')); + } + + /** + * Tests that a save_only push followed by manual reprocessing works. + */ + public function testSaveOnlyThenManualReprocessProducesExpectedData(): void { + $machine_name = 'push_then_reprocess'; + $dataset = $this->createPublishedDataset($machine_name); + $payload = [['name' => 'Station A'], ['name' => 'Station B']]; + + // Step 1: save only. + $request = $this->jsonRequest($payload); + $request->query->set('save_only', '1'); + $this->controller()->push($request, $machine_name); + + // Step 2: trigger reprocessing via the batch operations directly. + $dataset_id = (int) $dataset->id(); + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + DatasetBatchOperations::operationQueueItem($dataset_id, $context); + } while ($context['finished'] < 1); + + $context = ['sandbox' => [], 'finished' => 0, 'results' => [], 'message' => '']; + do { + DatasetBatchOperations::operationProcess($dataset_id, $context); + } while ($context['finished'] < 1); + + $stored = \Drupal::state()->get('test_push_result'); + $this->assertNotNull($stored); + } + + /** + * Tests that a non-JSON Content-Type returns 415. + */ + public function testPushReturns415ForNonJsonContentType(): void { + $machine_name = 'push_415'; + $this->createPublishedDataset($machine_name); + + $request = Request::create('/', 'POST', content: 'some data', server: ['CONTENT_TYPE' => 'text/plain']); + $response = $this->controller()->push($request, $machine_name); + + $this->assertSame(415, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertArrayHasKey('error', $body); + } + + /** + * Tests that a malformed JSON body returns 400. 
+ */ + public function testPushReturns400ForInvalidJson(): void { + $machine_name = 'push_400'; + $this->createPublishedDataset($machine_name); + + $response = $this->controller()->push( + Request::create('/', 'POST', content: '{not: valid json', server: ['CONTENT_TYPE' => 'application/json']), + $machine_name + ); + + $this->assertSame(400, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertStringContainsString('Invalid JSON', $body['error']); + } + + /** + * Tests that pushing to an unknown machine name returns 404. + */ + public function testPushReturns404ForUnknownDataset(): void { + $response = $this->controller()->push( + $this->jsonRequest(['data' => 'value']), + 'this_dataset_does_not_exist' + ); + + $this->assertSame(404, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertArrayHasKey('error', $body); + } + + /** + * Tests that pushing to an unpublished dataset returns 422. + */ + public function testPushReturns422ForUnpublishedDataset(): void { + $machine_name = 'push_unpublished'; + $dataset = Dataset::create([ + 'name' => $machine_name, + 'machine_name' => $machine_name, + 'source' => 'json_endpoint', + 'pipeline' => 'test_json_endpoint_pipeline', + 'published' => FALSE, + ]); + $dataset->save(); + + $response = $this->controller()->push($this->jsonRequest(['data' => 'value']), $machine_name); + + $this->assertSame(422, $response->getStatusCode()); + $body = json_decode($response->getContent(), TRUE); + $this->assertArrayHasKey('error', $body); + } + + /** + * Tests that pushing to a non-json_endpoint source dataset returns 404. + * + * A dataset with a different source type must not be reachable via this + * endpoint even if the machine name matches. 
+ */ + public function testPushReturns404ForDatasetWithDifferentSource(): void { + $name = mb_strtolower($this->randomMachineName()); + $dataset = Dataset::create([ + 'name' => $name, + 'machine_name' => $name, + 'source' => 'csv:text', + 'pipeline' => 'test_pipeline_1', + 'csv_text' => "a,b\n1,2", + ]); + $dataset->save(); + + $response = $this->controller()->push($this->jsonRequest(['data' => 'value']), $name); + + $this->assertSame(404, $response->getStatusCode()); + } + + /** + * Tests that a successful push overwrites any previously stored payload. + */ + public function testPushOverwritesPreviousPayload(): void { + $machine_name = 'push_overwrite'; + $this->createPublishedDataset($machine_name); + $file_path = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR . '/' . $machine_name . '.json'; + + $request1 = $this->jsonRequest([['v' => 'first']]); + $request1->query->set('save_only', '1'); + $this->controller()->push($request1, $machine_name); + $this->assertSame([['v' => 'first']], json_decode(file_get_contents($file_path), TRUE)); + + $request2 = $this->jsonRequest([['v' => 'second']]); + $request2->query->set('save_only', '1'); + $this->controller()->push($request2, $machine_name); + $this->assertSame([['v' => 'second']], json_decode(file_get_contents($file_path), TRUE)); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php new file mode 100644 index 000000000..808783762 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Kernel/JsonEndpointSourceKernelTest.php @@ -0,0 +1,207 @@ +privatePath = $this->siteDirectory . 
'/private'; + mkdir($this->privatePath, 0777, TRUE); + $this->setSetting('file_private_path', $this->privatePath); + $this->installEntitySchema('user'); + $this->installEntitySchema('file'); + $this->installEntitySchema('data_pipelines'); + $this->installSchema('file', ['file_usage']); + $this->setUpCurrentUser(); + BypassFinals::enable(FALSE); + } + + /** + * Returns the json_endpoint source plugin instance. + */ + private function getPlugin(): JsonEndpointSource { + $manager = \Drupal::service('plugin.manager.data_pipelines_source'); + assert($manager instanceof DatasetSourcePluginManager); + $plugin = $manager->createInstance('json_endpoint'); + assert($plugin instanceof JsonEndpointSource); + return $plugin; + } + + /** + * Creates an unsaved Dataset entity with the json_endpoint source. + */ + private function createDataset(string $machine_name, array $extra = []): Dataset { + $dataset = Dataset::create([ + 'name' => $machine_name, + 'machine_name' => $machine_name, + 'source' => 'json_endpoint', + 'pipeline' => 'test_json_endpoint_pipeline', + ] + $extra); + assert($dataset instanceof Dataset); + return $dataset; + } + + /** + * Writes a JSON payload to the private filesystem for a given machine name. + */ + private function writePayload(string $machine_name, mixed $data): void { + $storage_dir = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR; + if (!is_dir($storage_dir)) { + mkdir($storage_dir, 0777, TRUE); + } + file_put_contents($storage_dir . '/' . $machine_name . '.json', json_encode($data)); + } + + /** + * Tests that an array-of-objects payload yields one DatasetData per element. 
+ */ + public function testExtractArrayOfObjectsPayload(): void { + $machine_name = 'test_array_objects'; + $this->writePayload($machine_name, [ + ['name' => 'Station A', 'suburb' => 'Carlton'], + ['name' => 'Station B', 'suburb' => 'Fitzroy'], + ]); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEquals([ + new DatasetData(['name' => 'Station A', 'suburb' => 'Carlton']), + new DatasetData(['name' => 'Station B', 'suburb' => 'Fitzroy']), + ], $result); + } + + /** + * Tests that a root JSON object yields a single DatasetData. + */ + public function testExtractObjectPayload(): void { + $machine_name = 'test_object'; + $this->writePayload($machine_name, ['key1' => 'value1', 'key2' => 'value2']); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEquals([ + new DatasetData(['key1' => 'value1', 'key2' => 'value2']), + ], $result); + } + + /** + * Tests that an array of scalar values wraps each value in a DatasetData. + */ + public function testExtractArrayOfScalarsPayload(): void { + $machine_name = 'test_scalars'; + $this->writePayload($machine_name, ['alpha', 'beta', 'gamma']); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEquals([ + new DatasetData(['alpha']), + new DatasetData(['beta']), + new DatasetData(['gamma']), + ], $result); + } + + /** + * Tests that the json_endpoint_path_to_data field applies a JSONPath filter. 
+ */ + public function testExtractWithJsonPath(): void { + $machine_name = 'test_jsonpath'; + $this->writePayload($machine_name, [ + 'meta' => ['total' => 2], + 'records' => [ + ['id' => 1, 'title' => 'First'], + ['id' => 2, 'title' => 'Second'], + ], + ]); + + $dataset = $this->createDataset($machine_name, ['json_endpoint_path_to_data' => '$.records']); + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($dataset)); + + $this->assertEquals([ + new DatasetData(['id' => 1, 'title' => 'First']), + new DatasetData(['id' => 2, 'title' => 'Second']), + ], $result); + } + + /** + * Tests that extraction returns empty when no payload file exists. + */ + public function testExtractReturnsEmptyWhenNoFileExists(): void { + $result = iterator_to_array( + $this->getPlugin()->extractDataFromDataSet($this->createDataset('nonexistent_dataset')) + ); + + $this->assertEmpty($result); + } + + /** + * Tests that extraction returns empty when the stored file has invalid JSON. + */ + public function testExtractReturnsEmptyOnInvalidJson(): void { + $machine_name = 'test_invalid_json'; + $storage_dir = $this->privatePath . '/' . JsonEndpointSource::STORAGE_DIR; + mkdir($storage_dir, 0777, TRUE); + file_put_contents($storage_dir . '/' . $machine_name . '.json', '{not: valid json{{{'); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertEmpty($result); + } + + /** + * Tests that a single-record payload is processed as one DatasetData. 
+ */ + public function testExtractSingleRecordPayload(): void { + $machine_name = 'test_single'; + $this->writePayload($machine_name, [['station' => 'CBD', 'phone' => '9999-9999']]); + + $result = iterator_to_array($this->getPlugin()->extractDataFromDataSet($this->createDataset($machine_name))); + + $this->assertCount(1, $result); + $this->assertEquals(new DatasetData(['station' => 'CBD', 'phone' => '9999-9999']), $result[0]); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php b/modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php new file mode 100644 index 000000000..e9d044dee --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tests/src/Unit/Plugin/DatasetSource/JsonEndpointSourceTest.php @@ -0,0 +1,61 @@ +assertSame( + 'private://data_pipelines_json_endpoint/my_dataset.json', + JsonEndpointSource::buildStorageUri('my_dataset') + ); + } + + /** + * Tests buildStorageUri with various valid machine names. + * + * @dataProvider machineNameProvider + */ + public function testBuildStorageUriHandlesVariousMachineNames(string $name, string $expected): void { + $this->assertSame($expected, JsonEndpointSource::buildStorageUri($name)); + } + + public static function machineNameProvider(): array { + return [ + 'underscored name' => [ + 'station_locator', + 'private://data_pipelines_json_endpoint/station_locator.json', + ], + 'name with numbers' => [ + 'dataset_2024', + 'private://data_pipelines_json_endpoint/dataset_2024.json', + ], + 'single character' => [ + 'a', + 'private://data_pipelines_json_endpoint/a.json', + ], + ]; + } + + /** + * Tests that the storage scheme and directory constants have expected values. 
+ */ + public function testStorageConstants(): void { + $this->assertSame('private', JsonEndpointSource::STORAGE_SCHEME); + $this->assertSame('data_pipelines_json_endpoint', JsonEndpointSource::STORAGE_DIR); + } + +} diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml new file mode 100644 index 000000000..e09d69a80 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.info.yml @@ -0,0 +1,8 @@ +name: 'Tide Data Pipeline JSON Endpoint' +type: module +description: 'Provides a JSON endpoint source for data pipelines, allowing external systems to push JSON data via an authenticated POST request.' +core_version_requirement: ^10 +package: Tide +dependencies: + - data_pipelines:data_pipelines + - consumers:consumers diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install new file mode 100644 index 000000000..6d08a5843 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.install @@ -0,0 +1,38 @@ + 'Data Pipeline Pusher', + 'client_id' => 'data_pipeline_pusher', + 'confidential' => 1, + 'is_default' => FALSE, + 'third_party' => 0, + 'status' => 1, + 'roles' => [['target_id' => 'data_pipeline_pusher']], + ])->save(); +} + +/** + * Implements hook_uninstall(). 
+ */ +function tide_data_pipeline_json_endpoint_uninstall(): void { + $consumers = \Drupal::entityTypeManager() + ->getStorage('consumer') + ->loadByProperties(['client_id' => 'data_pipeline_pusher']); + foreach ($consumers as $consumer) { + $consumer->delete(); + } +} diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml new file mode 100644 index 000000000..fbd053818 --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.permissions.yml @@ -0,0 +1,3 @@ +push data pipeline json endpoint: + title: 'Push data pipeline JSON endpoint' + description: 'Allows pushing data to the JSON endpoint dataset source via the authenticated POST endpoint.' diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml new file mode 100644 index 000000000..06cd4f2ad --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.routing.yml @@ -0,0 +1,10 @@ +tide_data_pipeline_json_endpoint.push: + path: '/api/datasets/{machine_name}/push' + defaults: + _controller: '\Drupal\tide_data_pipeline_json_endpoint\Controller\DatasetPushController::push' + _title: 'Push Dataset Data' + methods: [POST] + requirements: + _permission: 'push data pipeline json endpoint' + options: + no_cache: TRUE diff --git a/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml new file mode 100644 index 000000000..ad189ddbe --- /dev/null +++ b/modules/tide_data_pipeline_json_endpoint/tide_data_pipeline_json_endpoint.services.yml @@ -0,0 +1 @@ +services: {} diff --git a/modules/tide_search/src/Plugin/DatasetTransform/Flatten.php 
b/modules/tide_search/src/Plugin/DatasetTransform/Flatten.php new file mode 100644 index 000000000..a7d4420c7 --- /dev/null +++ b/modules/tide_search/src/Plugin/DatasetTransform/Flatten.php @@ -0,0 +1,59 @@ + [], + 'separator' => '_', + 'remove_source' => TRUE, + ]; + } + + /** + * {@inheritdoc} + */ + protected function doTransformRecord(DatasetData $record): DatasetData { + $record = parent::doTransformRecord($record); + if (!$this->configuration['fields']) { + return $record; + } + $separator = $this->configuration['separator']; + $remove_source = $this->configuration['remove_source']; + foreach ($this->configuration['fields'] as $field_name) { + if (!$record->offsetExists($field_name)) { + continue; + } + $value = $record[$field_name]; + if (!is_array($value)) { + continue; + } + foreach ($value as $key => $nested_value) { + $record["{$field_name}{$separator}{$key}"] = $nested_value; + } + if ($remove_source) { + unset($record[$field_name]); + } + } + return $record; + } + +}