Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ set(
compiler/compilation_context.hpp
compiler/logical_query_plan/functional_dependency.cpp
compiler/logical_query_plan/functional_dependency.hpp
compiler/optimizer/strategy/lqp_column_pruning_rule.cpp
compiler/optimizer/strategy/lqp_column_pruning_rule.hpp
)

set(
Expand Down
376 changes: 376 additions & 0 deletions src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.cpp

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Taken and modified from our sister project Hyrise (https://github.com/hyrise/hyrise)
*/
#pragma once

#include <unordered_map>

#include "compiler/logical_query_plan/abstract_lqp_node.hpp"
#include "compiler/optimizer/abstract_rule.hpp"
#include "expression/abstract_expression.hpp"

namespace skyrise {

// Removes expressions (i.e., columns) that are never or no longer used from the plan
// - In StoredTableNodes, we can get rid of all columns that are not used anywhere in the LQP
// - In ProjectionNodes, we can get rid of columns that are not part of the result or not used anymore. Example:
// SELECT SUM(a + 2) FROM (SELECT a, a + 1 FROM t1) t2
// Here, `a + 1` is never actually used and should be pruned
// - Joins that emit columns that are never used can be rewritten to semi joins if (a) the unused side has a unique
// constraint and (b) the join is an inner join. This is done in the ColumnPruningRule because it requires
// information about which columns are needed and which ones are not. That information is gathered here and not
// exported.
class LqpColumnPruningRule : public AbstractRule {
public:
const std::string& Name() const override;

void ApplyTo(const std::shared_ptr<AbstractLqpNode>& lqp_root) const override;
};

} // namespace skyrise
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "constants.hpp"
#include "operator/import_operator.hpp"
#include "storage/formats/csv_reader.hpp"
#include "storage/formats/orc_reader.hpp"
#include "utils/json.hpp"

namespace {
Expand Down Expand Up @@ -183,12 +182,14 @@ std::shared_ptr<AbstractOperator> ImportOperatorProxy::CreateOperatorInstanceRec
};

ImportFormat import_format = ImportFormat::kCsv;
if (specifies_format(std::string{kOrcExtension})) {
import_format = ImportFormat::kOrc;
} else if (specifies_format(std::string{kCsvExtension})) {
if (specifies_format(std::string{kCsvExtension})) {
import_format = ImportFormat::kCsv;
} else if (specifies_format(std::string{kOrcExtension})) {
import_format = ImportFormat::kOrc;
} else if (specifies_format(std::string{kParquetExtension})) {
import_format = ImportFormat::kParquet;
} else {
Fail("Expected object key to have either a .csv or .orc file extension.");
Fail("Object key expected to have one of the following file extensions: .csv, .orc or .parquet.");
}
reader_factory = ImportOptions(import_format, column_ids_).CreateReaderFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <magic_enum.hpp>

#include "expression/expression_serialization.hpp"
#include "storage/formats/parquet_expression.hpp"
#include "utils/json.hpp"

namespace {
Expand Down Expand Up @@ -30,6 +32,12 @@ const std::string kJsonKeyCsvHasHeader = "has_header";
const std::string kJsonKeyCsvHasTypes = "has_types";
const std::string kJsonKeyCsvReadBufferSize = "read_buffer_size";

// ParquetFormatReaderOptions
const std::string kJsonKeyParquetFormatReaderOptions = "parquet_format_reader_options";
const std::string kJsonKeyParquetParseDatesAsString = "parse_dates_as_string";
const std::string kJsonKeyParquetIncludeColumns = "include_columns";
const std::string kJsonKeyParquetExpression = "skyrise_expression";

} // namespace

namespace skyrise {
Expand All @@ -48,6 +56,11 @@ ImportOptions::ImportOptions(ImportFormat object_format, const std::vector<Colum
options.include_columns = columns_to_load;
reader_options_ = options;
} break;
case ImportFormat::kParquet: {
auto options = ParquetFormatReaderOptions();
options.include_columns = columns_to_load;
reader_options_ = options;
} break;
default:
Fail("Unexpected ImportFormat.");
}
Expand All @@ -59,6 +72,9 @@ ImportOptions::ImportOptions(CsvFormatReaderOptions csv_format_reader_options)
ImportOptions::ImportOptions(OrcFormatReaderOptions orc_format_reader_options)
: import_format_(ImportFormat::kOrc), reader_options_(std::move(orc_format_reader_options)){};

ImportOptions::ImportOptions(ParquetFormatReaderOptions parquet_format_reader_options)
: import_format_(ImportFormat::kParquet), reader_options_(std::move(parquet_format_reader_options)){};

Aws::Utils::Json::JsonValue ImportOptions::ToJson() const {
Aws::Utils::Json::JsonValue json_output;

Expand Down Expand Up @@ -112,6 +128,30 @@ Aws::Utils::Json::JsonValue ImportOptions::ToJson() const {

} break;

case ImportFormat::kParquet: {
const auto& parquet_options = std::get<ParquetFormatReaderOptions>(reader_options_);
auto json_parquet_options = Aws::Utils::Json::JsonValue();

json_parquet_options.WithBool(kJsonKeyParquetParseDatesAsString, parquet_options.parse_dates_as_string);

if (parquet_options.include_columns.has_value()) {
json_parquet_options.WithArray(kJsonKeyParquetIncludeColumns,
VectorToJsonArray<ColumnId>(parquet_options.include_columns.value()));
}

if (parquet_options.expected_schema) {
json_parquet_options.WithArray(kJsonKeyExpectedSchema,
TableColumnDefinitionsToJsonArray(parquet_options.expected_schema));
}

if (parquet_options.skyrise_expression.has_value()) {
json_parquet_options.WithObject(kJsonKeyParquetExpression,
SerializeExpression(parquet_options.skyrise_expression.value()));
}

json_output.WithObject(kJsonKeyParquetFormatReaderOptions, json_parquet_options);
} break;

default:
Fail("Unexpected ImportFormat.");
}
Expand Down Expand Up @@ -170,6 +210,30 @@ std::shared_ptr<const ImportOptions> ImportOptions::FromJson(const Aws::Utils::J
return std::make_shared<ImportOptions>(orc_options);
}

// (c) PARQUET Options
if (json_in.ValueExists(kJsonKeyParquetFormatReaderOptions)) {
const auto json = json_in.GetObject(kJsonKeyParquetFormatReaderOptions);
ParquetFormatReaderOptions parquet_options;
parquet_options.parse_dates_as_string = json.GetObject(kJsonKeyParquetParseDatesAsString).AsBool();

if (json.KeyExists(kJsonKeyExpectedSchema)) {
parquet_options.expected_schema =
ImportOptions::TableColumnDefinitionsFromJsonArray(json.GetArray(kJsonKeyExpectedSchema));
}

if (json.KeyExists(kJsonKeyParquetIncludeColumns)) {
parquet_options.include_columns = JsonArrayToVector<ColumnId>(json.GetArray(kJsonKeyParquetIncludeColumns));
}

if (json.KeyExists(kJsonKeyParquetExpression)) {
const auto serialized_expression = json.GetObject(kJsonKeyParquetExpression);
const auto skyrise_expression = DeserializeExpression(serialized_expression);
parquet_options.arrow_expression = CreateArrowExpression(skyrise_expression);
}

return std::make_shared<ImportOptions>(parquet_options);
}

Fail("Failed to create ImportOptions because JSON values are missing.");
}

Expand All @@ -179,6 +243,9 @@ std::shared_ptr<AbstractChunkReaderFactory> ImportOptions::CreateReaderFactory()
return std::make_shared<FormatReaderFactory<CsvFormatReader>>(std::get<CsvFormatReaderOptions>(reader_options_));
case ImportFormat::kOrc:
return std::make_shared<FormatReaderFactory<OrcFormatReader>>(std::get<OrcFormatReaderOptions>(reader_options_));
case ImportFormat::kParquet:
return std::make_shared<FormatReaderFactory<ParquetFormatReader>>(
std::get<ParquetFormatReaderOptions>(reader_options_));
default:
Fail("Unexpected ImportFormat.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@
#include "storage/formats/abstract_chunk_reader.hpp"
#include "storage/formats/csv_reader.hpp"
#include "storage/formats/orc_reader.hpp"
#include "storage/formats/parquet_reader.hpp"

namespace skyrise {

enum class ImportFormat { kCsv, kOrc };
enum class ImportFormat { kCsv, kOrc, kParquet };

class ImportOptions {
public:
ImportOptions(ImportFormat object_format);
ImportOptions(ImportFormat object_format, const std::vector<skyrise::ColumnId>& columns_to_load);
ImportOptions(CsvFormatReaderOptions csv_format_reader_options);
ImportOptions(OrcFormatReaderOptions orc_format_reader_options);
ImportOptions(ParquetFormatReaderOptions parquet_format_reader_options);

/**
* @return a FormatReaderFactory for either CSV or ORC data.
* @return a FormatReaderFactory for either CSV, ORC or PARQUET data.
* The factory uses custom reader options, if provided. Otherwise, the factory is initialized with default
* reader options for CSV and ORC data.
* reader options for CSV, ORC or PARQUET data, respectively.
*/
std::shared_ptr<AbstractChunkReaderFactory> CreateReaderFactory() const;

Expand All @@ -39,7 +41,7 @@ class ImportOptions {

private:
ImportFormat import_format_;
std::variant<CsvFormatReaderOptions, OrcFormatReaderOptions> reader_options_;
std::variant<CsvFormatReaderOptions, OrcFormatReaderOptions, ParquetFormatReaderOptions> reader_options_;
};

} // namespace skyrise
1 change: 1 addition & 0 deletions src/lib/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ namespace skyrise {

inline constexpr std::string_view kCsvExtension = ".csv";
inline constexpr std::string_view kOrcExtension = ".orc";
inline constexpr std::string_view kParquetExtension = ".parquet";

} // namespace skyrise
16 changes: 15 additions & 1 deletion src/lib/storage/formats/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,25 @@ ParquetFormatReader::ParquetFormatReader(std::unique_ptr<ObjectReader> source, C
auto scan_builder =
std::make_shared<arrow::dataset::ScannerBuilder>(parquet_schema, std::move(fragment), scan_options);

// If possible, push down predicate.
// Push down any available predicates.
if (configuration_.arrow_expression.has_value()) {
HANDLE_RESULT(scan_builder->Filter(*configuration_.arrow_expression), "Failed to evaluate predicate");
}

// Push down any available projections.
if (configuration_.include_columns.has_value()) {
const auto field_names = parquet_schema->field_names();
const auto columns = *configuration_.include_columns;
std::vector<std::string> include_columns;
include_columns.reserve(columns.size());

for (const auto column_id : columns) {
include_columns.emplace_back(field_names[column_id]);
}

HANDLE_RESULT(scan_builder->Project(include_columns), "Failed to project columns");
}

scanner_ = scan_builder->Finish().ValueOrDie();
batch_iterator_ = scanner_->ScanBatches().ValueOrDie();

Expand Down
2 changes: 2 additions & 0 deletions src/lib/storage/formats/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ namespace skyrise {
struct ParquetFormatReaderOptions {
bool parse_dates_as_string = false;
std::shared_ptr<TableColumnDefinitions> expected_schema = nullptr;
std::optional<std::vector<ColumnId>> include_columns = std::nullopt;
std::optional<arrow::compute::Expression> arrow_expression = std::nullopt;
std::optional<std::shared_ptr<AbstractPredicateExpression>> skyrise_expression = std::nullopt;
};

class ParquetFormatReader : public AbstractChunkReader {
Expand Down
2 changes: 2 additions & 0 deletions src/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ set(
lib/compiler/physical_query_plan/pqp_serialization_test.cpp
lib/compiler/physical_query_plan/pqp_utils_test.cpp

lib/compiler/optimizer/strategy/lqp_column_pruning_rule_test.cpp

lib/expression/expression_evaluator_to_result_test.cpp
lib/expression/expression_serialization_test.cpp

Expand Down
Loading