more robust bulk file processing #903
Pull Request Overview
This PR refactors bulk CSV sanitization to parse the header and body line-by-line (rather than using a single-pass parser) to better handle malformed rows and preserve original end-of-line markers.
- Add `parseFirstLine` in `ColumnarBulkDataSanitizerImpl` to capture header contents and EOL sequence
- Introduce `processRecords` to iterate rows manually, skipping malformed lines via exceptions
- Update tests to use a shared `forFile` helper and cover new first-line parsing and malformed-row behavior
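As a rough illustration of the first bullet, the sketch below reads a header line while keeping its end-of-line sequence. It is not the PR's code: the `FirstLine` holder and this `parseFirstLine` body are assumptions made for illustration, and the sketch ignores the case of a quoted header field that itself contains a line break.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

final class FirstLineSketch {

    /** Hypothetical holder: header text without its terminator, plus the terminator itself. */
    static final class FirstLine {
        final String content;
        final String eol;

        FirstLine(String content, String eol) {
            this.content = content;
            this.eol = eol;
        }
    }

    /**
     * Reads characters up to and including the first line terminator.
     * Recognizes "\r\n", "\n" and a bare "\r"; eol is empty if the input ends first.
     * The reader is left positioned at the start of the second line.
     */
    static FirstLine parseFirstLine(BufferedReader reader) throws IOException {
        StringBuilder content = new StringBuilder();
        int c;
        while ((c = reader.read()) != -1) {
            if (c == '\n') {
                return new FirstLine(content.toString(), "\n");
            }
            if (c == '\r') {
                // Peek one character to tell "\r\n" apart from a bare "\r".
                reader.mark(1);
                int next = reader.read();
                if (next == '\n') {
                    return new FirstLine(content.toString(), "\r\n");
                }
                if (next != -1) {
                    reader.reset(); // not part of the terminator, so un-read it
                }
                return new FirstLine(content.toString(), "\r");
            }
            content.append((char) c);
        }
        return new FirstLine(content.toString(), "");
    }
}
```

Keeping the terminator alongside the header lets the writer emit the same EOL sequence the input used, which is the behavior the overview describes.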
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| ColumnarBulkDataSanitizerImpl.java | Added parseFirstLine, refactored sanitize to use processRecords and handle per-line parsing |
| RecordBulkDataSanitizerImpl.java | Changed sanitize signature to require BufferedReader (align with new parsing logic) |
| BulkDataSanitizer.java | Updated interface to accept BufferedReader instead of Reader |
| StorageHandler.java | Unified reader instantiation to BufferedReader |
| ColumnarBulkDataSanitizerImplTest.java | Added parameterized tests for header EOL detection and malformed-row skipping |
| BulkDataSanitizerImplTest.java | Replaced repeated file-reading code with forFile helper and wildcard imports |
Comments suppressed due to low confidence (6)
java/core/src/main/java/co/worklytics/psoxy/storage/impl/RecordBulkDataSanitizerImpl.java:61
- Changing the API from `Reader` to `BufferedReader` is a breaking change and reduces flexibility; consider accepting a `Reader` and wrapping it internally.
public void sanitize(BufferedReader reader,
java/core/src/main/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImpl.java:78
- This override now requires `BufferedReader` instead of the broader `Reader` interface, which may break existing clients; consider reverting to `Reader`.
public void sanitize(@NonNull BufferedReader reader,
java/core/src/main/java/co/worklytics/psoxy/storage/BulkDataSanitizer.java:25
- The interface signature change from `Reader` to `BufferedReader` is a breaking API change; it’s better to keep the more general `Reader`.
void sanitize(BufferedReader reader,
java/core/src/main/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImpl.java:322
- `transformsWithoutMappings` is declared but never used; remove it to clean up dead code.
Set<String> transformsWithoutMappings = new HashSet<>();
java/core/src/test/java/co/worklytics/psoxy/storage/impl/BulkDataSanitizerImplTest.java:42
- [nitpick] Avoid wildcard imports; prefer explicit imports for clarity and to prevent unintended dependencies.
import java.io.*;
java/core/src/main/java/co/worklytics/psoxy/storage/StorageHandler.java:382
- [nitpick] The variable is declared as `BufferedReader`; consider using the `Reader` interface on the left-hand side to keep the code decoupled from specific implementations.
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), bufferSize);
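The suppressed comments above repeatedly suggest accepting a `Reader` and wrapping it internally. A minimal sketch of that idea follows; `asBuffered` and `countLines` are hypothetical names used only for illustration, with `countLines` standing in for a `sanitize`-style method.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

final class ReaderWrappingSketch {

    /** Returns the reader itself if it is already buffered, otherwise wraps it. */
    static BufferedReader asBuffered(Reader reader) {
        return (reader instanceof BufferedReader)
            ? (BufferedReader) reader
            : new BufferedReader(reader);
    }

    /** Hypothetical stand-in for a sanitize-style method: accepts any Reader and counts lines. */
    static int countLines(Reader reader) throws IOException {
        BufferedReader buffered = asBuffered(reader);
        int lines = 0;
        while (buffered.readLine() != null) {
            lines++;
        }
        return lines;
    }
}
```

With this shape the public signature stays on `Reader`, so existing callers keep compiling, while the implementation still gets `readLine()` and `mark`/`reset`.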
…narBulkDataSanitizerImplTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
aperez-worklytics left a comment
I'd include the number of rows processed too as part of metadata.
And 🔴: including that in GcsFileEventHandler for GCP is still pending (AWS and terminal are covered here; is GCP missing?)
 * @param outputBuffer to write the processed records to
 * @return number of records which could not be processed due to errors
 */
int processRecords(
I'd return an object with the count of errors and processed lines too
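One way the suggestion above could look is sketched below: `processRecords` returns a small result object with both counts. Everything here is an assumption for illustration, not the PR's code: `ProcessingResult`, the naive comma-splitting `parseLine`, and the `Consumer` used in place of the real output buffer are all hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.function.Consumer;

final class ProcessRecordsSketch {

    /** Hypothetical result object carrying both counters. */
    static final class ProcessingResult {
        final int processed;
        final int errors;

        ProcessingResult(int processed, int errors) {
            this.processed = processed;
            this.errors = errors;
        }
    }

    /** Hypothetical per-line parser: naive comma split that rejects a wrong column count. */
    static String[] parseLine(String line, int expectedColumns) {
        String[] fields = line.split(",", -1);
        if (fields.length != expectedColumns) {
            throw new IllegalArgumentException("expected " + expectedColumns
                + " columns but found " + fields.length);
        }
        return fields;
    }

    /** Parses row by row, skipping malformed lines and counting both outcomes. */
    static ProcessingResult processRecords(BufferedReader reader,
                                           int expectedColumns,
                                           Consumer<String[]> output) throws IOException {
        int processed = 0;
        int errors = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields;
            try {
                fields = parseLine(line, expectedColumns);
            } catch (IllegalArgumentException e) {
                errors++; // malformed row: skip it but keep going
                continue;
            }
            output.accept(fields);
            processed++;
        }
        return new ProcessingResult(processed, errors);
    }
}
```

Returning both counters from one call would let the caller attach them to output metadata without depending on log lines.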
if (buffer.flush()) {
    log.info(String.format("Processed records: %d", buffer.getProcessed()));
if (outputBuffer.addAndAttemptFlush(ProcessedRecord.of(Lists.newArrayList(newRecord.values())))) {
    log.info(String.format("Processed records: %d", outputBuffer.getProcessed()));
Yeah, true, that appears in the logs, but only per buffer; I think exposing the total as part of the metadata could be useful.
.setTrim(true)
.build();

ParsedFirstLine parsedFirstLine = parseFirstLine(reader);
I'd include a comment here explaining why this is used instead of `records.getFirstEndOfLine()`.
Fixes
`for (CSVRecord record : records)` can throw `UncheckedIOException` if an individual line is malformed. This avoids that to some extent, by parsing line-by-line.
Change implications
While it is arguably good to avoid failing a huge file because of a couple of malformed lines, this may make encoding issues too subtle to be discovered ... so perhaps we don't want this.