From 2bbe28eec1bc3feef92ed920fe71a8939b1d737e Mon Sep 17 00:00:00 2001 From: Niklas Riekenbrauck Date: Mon, 29 Aug 2022 17:46:26 +0200 Subject: [PATCH 1/4] [DYOD2022] Add projection pushdown to ParquetReader (#781) Add Arrow-based projection pushdown to Parquet reader. Co-authored-by: Niklas Riekenbrauck Co-authored-by: TheoRadig Co-authored-by: Benedikt Schenkel Co-authored-by: Thomas Bodner --- src/lib/storage/formats/parquet_reader.cpp | 16 ++++++++++++++- src/lib/storage/formats/parquet_reader.hpp | 1 + .../storage/formats/parquet_reader_test.cpp | 20 +++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/lib/storage/formats/parquet_reader.cpp b/src/lib/storage/formats/parquet_reader.cpp index 5b02e927..da176995 100644 --- a/src/lib/storage/formats/parquet_reader.cpp +++ b/src/lib/storage/formats/parquet_reader.cpp @@ -98,11 +98,25 @@ ParquetFormatReader::ParquetFormatReader(std::unique_ptr source, C auto scan_builder = std::make_shared(parquet_schema, std::move(fragment), scan_options); - // If possible, push down predicate. + // Push down any available predicates. if (configuration_.arrow_expression.has_value()) { HANDLE_RESULT(scan_builder->Filter(*configuration_.arrow_expression), "Failed to evaluate predicate"); } + // Push down any available projections. 
+ if (configuration_.include_columns.has_value()) { + const auto field_names = parquet_schema->field_names(); + const auto columns = *configuration_.include_columns; + std::vector include_columns; + include_columns.reserve(columns.size()); + + for (const auto column_id : columns) { + include_columns.emplace_back(field_names[column_id]); + } + + HANDLE_RESULT(scan_builder->Project(include_columns), "Failed to project columns"); + } + scanner_ = scan_builder->Finish().ValueOrDie(); batch_iterator_ = scanner_->ScanBatches().ValueOrDie(); diff --git a/src/lib/storage/formats/parquet_reader.hpp b/src/lib/storage/formats/parquet_reader.hpp index d6ac77b4..1b340afe 100644 --- a/src/lib/storage/formats/parquet_reader.hpp +++ b/src/lib/storage/formats/parquet_reader.hpp @@ -19,6 +19,7 @@ namespace skyrise { struct ParquetFormatReaderOptions { bool parse_dates_as_string = false; std::shared_ptr expected_schema = nullptr; + std::optional> include_columns = std::nullopt; std::optional arrow_expression = std::nullopt; }; diff --git a/src/test/lib/storage/formats/parquet_reader_test.cpp b/src/test/lib/storage/formats/parquet_reader_test.cpp index 18769345..80b003f6 100644 --- a/src/test/lib/storage/formats/parquet_reader_test.cpp +++ b/src/test/lib/storage/formats/parquet_reader_test.cpp @@ -26,4 +26,24 @@ TEST_F(ParquetFormatReaderTest, ArrowPredicatePushdown) { EXPECT_EQ(5, chunk->GetSegment(ColumnId(0))->Size()); } +TEST_F(ParquetFormatReaderTest, ProjectionPushdown) { + const auto parquet_options = ParquetFormatReaderOptions{}; + + // Test without projection + auto parquet_reader = ParquetFormatReader( + test_data_storage_->OpenForReading("parquet/partitioned_int_string.parquet"), parquet_options); + + const auto chunk = parquet_reader.Next(); + EXPECT_EQ(2, chunk->GetColumnCount()); + + // Test with projection + auto parquet_options_projection = ParquetFormatReaderOptions{}; + parquet_options_projection.include_columns = std::vector{0}; + auto parquet_reader_projection = 
ParquetFormatReader( + test_data_storage_->OpenForReading("parquet/partitioned_int_string.parquet"), parquet_options_projection); + + const auto chunk_projection = parquet_reader_projection.Next(); + EXPECT_EQ(1, chunk_projection->GetColumnCount()); +} + } // namespace skyrise From 679f83f1c7450b32e5b6d501cc37b99ce585ceab Mon Sep 17 00:00:00 2001 From: Theo <42896593+TheoRadig@users.noreply.github.com> Date: Tue, 30 Aug 2022 10:55:31 +0200 Subject: [PATCH 2/4] Integrate ParquetReader into ImportOptions / ImportProxy (#786) Co-authored-by: Niklas Riekenbrauck Co-authored-by: Benedikt Schenkel Co-authored-by: Thomas Bodner Co-authored-by: Niklas Riekenbrauck Co-authored-by: Theo --- .../operator_proxy/import_operator_proxy.cpp | 11 +-- .../operator_proxy/import_options.cpp | 67 +++++++++++++++++++ .../operator_proxy/import_options.hpp | 10 +-- src/lib/constants.hpp | 1 + src/lib/storage/formats/parquet_reader.hpp | 1 + .../import_operator_proxy_test.cpp | 41 ++++++++++-- .../operator_proxy/import_options_test.cpp | 26 +++++++ 7 files changed, 144 insertions(+), 13 deletions(-) diff --git a/src/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy.cpp b/src/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy.cpp index 07781adf..dae1707b 100644 --- a/src/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy.cpp +++ b/src/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy.cpp @@ -6,7 +6,6 @@ #include "constants.hpp" #include "operator/import_operator.hpp" #include "storage/formats/csv_reader.hpp" -#include "storage/formats/orc_reader.hpp" #include "utils/json.hpp" namespace { @@ -183,12 +182,14 @@ std::shared_ptr ImportOperatorProxy::CreateOperatorInstanceRec }; ImportFormat import_format = ImportFormat::kCsv; - if (specifies_format(std::string{kOrcExtension})) { - import_format = ImportFormat::kOrc; - } else if (specifies_format(std::string{kCsvExtension})) { + if 
(specifies_format(std::string{kCsvExtension})) { import_format = ImportFormat::kCsv; + } else if (specifies_format(std::string{kOrcExtension})) { + import_format = ImportFormat::kOrc; + } else if (specifies_format(std::string{kParquetExtension})) { + import_format = ImportFormat::kParquet; } else { - Fail("Expected object key to have either a .csv or .orc file extension."); + Fail("Object key expected to have one of the following file extensions: .csv, .orc or .parquet."); } reader_factory = ImportOptions(import_format, column_ids_).CreateReaderFactory(); } diff --git a/src/lib/compiler/physical_query_plan/operator_proxy/import_options.cpp b/src/lib/compiler/physical_query_plan/operator_proxy/import_options.cpp index 20684c2c..550ee251 100644 --- a/src/lib/compiler/physical_query_plan/operator_proxy/import_options.cpp +++ b/src/lib/compiler/physical_query_plan/operator_proxy/import_options.cpp @@ -2,6 +2,8 @@ #include +#include "expression/expression_serialization.hpp" +#include "storage/formats/parquet_expression.hpp" #include "utils/json.hpp" namespace { @@ -30,6 +32,12 @@ const std::string kJsonKeyCsvHasHeader = "has_header"; const std::string kJsonKeyCsvHasTypes = "has_types"; const std::string kJsonKeyCsvReadBufferSize = "read_buffer_size"; +// ParquetFormatReaderOptions +const std::string kJsonKeyParquetFormatReaderOptions = "parquet_format_reader_options"; +const std::string kJsonKeyParquetParseDatesAsString = "parse_dates_as_string"; +const std::string kJsonKeyParquetIncludeColumns = "include_columns"; +const std::string kJsonKeyParquetExpression = "skyrise_expression"; + } // namespace namespace skyrise { @@ -48,6 +56,11 @@ ImportOptions::ImportOptions(ImportFormat object_format, const std::vector(reader_options_); + auto json_parquet_options = Aws::Utils::Json::JsonValue(); + + json_parquet_options.WithBool(kJsonKeyParquetParseDatesAsString, parquet_options.parse_dates_as_string); + + if (parquet_options.include_columns.has_value()) { + 
json_parquet_options.WithArray(kJsonKeyParquetIncludeColumns, + VectorToJsonArray(parquet_options.include_columns.value())); + } + + if (parquet_options.expected_schema) { + json_parquet_options.WithArray(kJsonKeyExpectedSchema, + TableColumnDefinitionsToJsonArray(parquet_options.expected_schema)); + } + + if (parquet_options.skyrise_expression.has_value()) { + json_parquet_options.WithObject(kJsonKeyParquetExpression, + SerializeExpression(parquet_options.skyrise_expression.value())); + } + + json_output.WithObject(kJsonKeyParquetFormatReaderOptions, json_parquet_options); + } break; + default: Fail("Unexpected ImportFormat."); } @@ -170,6 +210,30 @@ std::shared_ptr ImportOptions::FromJson(const Aws::Utils::J return std::make_shared(orc_options); } + // (c) PARQUET Options + if (json_in.ValueExists(kJsonKeyParquetFormatReaderOptions)) { + const auto json = json_in.GetObject(kJsonKeyParquetFormatReaderOptions); + ParquetFormatReaderOptions parquet_options; + parquet_options.parse_dates_as_string = json.GetObject(kJsonKeyParquetParseDatesAsString).AsBool(); + + if (json.KeyExists(kJsonKeyExpectedSchema)) { + parquet_options.expected_schema = + ImportOptions::TableColumnDefinitionsFromJsonArray(json.GetArray(kJsonKeyExpectedSchema)); + } + + if (json.KeyExists(kJsonKeyParquetIncludeColumns)) { + parquet_options.include_columns = JsonArrayToVector(json.GetArray(kJsonKeyParquetIncludeColumns)); + } + + if (json.KeyExists(kJsonKeyParquetExpression)) { + const auto serialized_expression = json.GetObject(kJsonKeyParquetExpression); + const auto skyrise_expression = DeserializeExpression(serialized_expression); + parquet_options.arrow_expression = CreateArrowExpression(skyrise_expression); + } + + return std::make_shared(parquet_options); + } + Fail("Failed to create ImportOptions because JSON values are missing."); } @@ -179,6 +243,9 @@ std::shared_ptr ImportOptions::CreateReaderFactory() return std::make_shared>(std::get(reader_options_)); case ImportFormat::kOrc: return 
std::make_shared>(std::get(reader_options_)); + case ImportFormat::kParquet: + return std::make_shared>( + std::get(reader_options_)); default: Fail("Unexpected ImportFormat."); } diff --git a/src/lib/compiler/physical_query_plan/operator_proxy/import_options.hpp b/src/lib/compiler/physical_query_plan/operator_proxy/import_options.hpp index fd70371e..9fcd60ac 100644 --- a/src/lib/compiler/physical_query_plan/operator_proxy/import_options.hpp +++ b/src/lib/compiler/physical_query_plan/operator_proxy/import_options.hpp @@ -8,10 +8,11 @@ #include "storage/formats/abstract_chunk_reader.hpp" #include "storage/formats/csv_reader.hpp" #include "storage/formats/orc_reader.hpp" +#include "storage/formats/parquet_reader.hpp" namespace skyrise { -enum class ImportFormat { kCsv, kOrc }; +enum class ImportFormat { kCsv, kOrc, kParquet }; class ImportOptions { public: @@ -19,11 +20,12 @@ class ImportOptions { ImportOptions(ImportFormat object_format, const std::vector& columns_to_load); ImportOptions(CsvFormatReaderOptions csv_format_reader_options); ImportOptions(OrcFormatReaderOptions orc_format_reader_options); + ImportOptions(ParquetFormatReaderOptions parquet_format_reader_options); /** - * @return a FormatReaderFactory for either CSV or ORC data. + * @return a FormatReaderFactory for either CSV, ORC or PARQUET data. * The factory uses custom reader options, if provided. Otherwise, the factory is initialized with default - * reader options for CSV and ORC data. + * reader options for CSV, ORC or PARQUET data, respectively. 
*/ std::shared_ptr CreateReaderFactory() const; @@ -39,7 +41,7 @@ class ImportOptions { private: ImportFormat import_format_; - std::variant reader_options_; + std::variant reader_options_; }; } // namespace skyrise diff --git a/src/lib/constants.hpp b/src/lib/constants.hpp index 044f61c8..9c970fdc 100644 --- a/src/lib/constants.hpp +++ b/src/lib/constants.hpp @@ -6,5 +6,6 @@ namespace skyrise { inline constexpr std::string_view kCsvExtension = ".csv"; inline constexpr std::string_view kOrcExtension = ".orc"; +inline constexpr std::string_view kParquetExtension = ".parquet"; } // namespace skyrise diff --git a/src/lib/storage/formats/parquet_reader.hpp b/src/lib/storage/formats/parquet_reader.hpp index 1b340afe..7413208a 100644 --- a/src/lib/storage/formats/parquet_reader.hpp +++ b/src/lib/storage/formats/parquet_reader.hpp @@ -21,6 +21,7 @@ struct ParquetFormatReaderOptions { std::shared_ptr expected_schema = nullptr; std::optional> include_columns = std::nullopt; std::optional arrow_expression = std::nullopt; + std::optional> skyrise_expression = std::nullopt; }; class ParquetFormatReader : public AbstractChunkReader { diff --git a/src/test/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy_test.cpp b/src/test/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy_test.cpp index 7e4b83fa..10a07248 100644 --- a/src/test/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy_test.cpp +++ b/src/test/lib/compiler/physical_query_plan/operator_proxy/import_operator_proxy_test.cpp @@ -15,14 +15,18 @@ class ImportOperatorProxyTest : public ::testing::Test { column_definitions_a_ = std::make_shared(); column_definitions_a_->emplace_back("a", DataType::kInt, false); + CsvFormatReaderOptions csv_options; + csv_options.expected_schema = column_definitions_a_; + import_options_csv_ = std::make_shared(csv_options); + OrcFormatReaderOptions orc_options; orc_options.expected_schema = column_definitions_a_; 
orc_options.select_partition_range = std::make_pair(1, 1); import_options_orc_ = std::make_shared(orc_options); - CsvFormatReaderOptions csv_options; - csv_options.expected_schema = column_definitions_a_; - import_options_csv_ = std::make_shared(csv_options); + ParquetFormatReaderOptions parquet_options; + parquet_options.expected_schema = column_definitions_a_; + import_options_parquet_ = std::make_shared(parquet_options); } protected: @@ -31,8 +35,9 @@ class ImportOperatorProxyTest : public ::testing::Test { ObjectReference{"dummy_bucket", "key3.orc", "etag3"}}; static inline const std::vector kColumnIds = {ColumnId{0}, ColumnId{1}, ColumnId{3}}; std::shared_ptr column_definitions_a_; - std::shared_ptr import_options_orc_; std::shared_ptr import_options_csv_; + std::shared_ptr import_options_orc_; + std::shared_ptr import_options_parquet_; }; TEST_F(ImportOperatorProxyTest, BaseProperties) { @@ -133,6 +138,21 @@ TEST_F(ImportOperatorProxyTest, SerializeAndDeserializeImportOptionsCsv) { EXPECT_EQ(deserialized_proxy_csv_json, proxy_csv_json); } +TEST_F(ImportOperatorProxyTest, SerializeAndDeserializeImportOptionsParquet) { + const auto import_proxy_parquet = ImportOperatorProxy::Make(kObjectReferences, kColumnIds); + import_proxy_parquet->SetImportOptions(import_options_parquet_); + // (1) Serialize + const auto proxy_parquet_json = import_proxy_parquet->ToJson(); + + // (2) Deserialize + const auto deserialized_proxy_parquet = ImportOperatorProxy::FromJson(proxy_parquet_json); + ASSERT_NE(std::static_pointer_cast(deserialized_proxy_parquet)->GetImportOptions(), nullptr); + + // (3) Serialize again + const auto deserialized_proxy_parquet_json = deserialized_proxy_parquet->ToJson(); + EXPECT_EQ(deserialized_proxy_parquet_json, proxy_parquet_json); +} + TEST_F(ImportOperatorProxyTest, DeepCopy) { auto import_proxy = ImportOperatorProxy::Make(kObjectReferences, kColumnIds); import_proxy->SetOutputObjectsCount(2); @@ -168,6 +188,13 @@ TEST_F(ImportOperatorProxyTest, 
CreateOperatorInstance) { EXPECT_TRUE(import_proxy->GetOrCreateOperatorInstance()); EXPECT_EQ(import_proxy->GetOrCreateOperatorInstance()->Type(), OperatorType::kImport); } + { + const std ::vector object_references = {ObjectReference("dummy_bucket", "key1.parquet"), + ObjectReference("dummy_bucket", "key2.parquet")}; + const auto import_proxy = ImportOperatorProxy::Make(object_references, kColumnIds); + EXPECT_TRUE(import_proxy->GetOrCreateOperatorInstance()); + EXPECT_EQ(import_proxy->GetOrCreateOperatorInstance()->Type(), OperatorType::kImport); + } } TEST_F(ImportOperatorProxyTest, CreateOperatorInstanceCustomCsvOptions) { @@ -182,4 +209,10 @@ TEST_F(ImportOperatorProxyTest, CreateOperatorInstanceCustomOrcOptions) { EXPECT_TRUE(import_proxy->GetOrCreateOperatorInstance()); } +TEST_F(ImportOperatorProxyTest, CreateOperatorInstanceCustomParquetOptions) { + const auto import_proxy = ImportOperatorProxy::Make(kObjectReferences, kColumnIds); + import_proxy->SetImportOptions(import_options_parquet_); + EXPECT_TRUE(import_proxy->GetOrCreateOperatorInstance()); +} + } // namespace skyrise diff --git a/src/test/lib/compiler/physical_query_plan/operator_proxy/import_options_test.cpp b/src/test/lib/compiler/physical_query_plan/operator_proxy/import_options_test.cpp index 2df41f76..54535411 100644 --- a/src/test/lib/compiler/physical_query_plan/operator_proxy/import_options_test.cpp +++ b/src/test/lib/compiler/physical_query_plan/operator_proxy/import_options_test.cpp @@ -5,6 +5,8 @@ #include +#include "expression/binary_predicate_expression.hpp" +#include "expression/value_expression.hpp" #include "storage/formats/csv_reader.hpp" #include "storage/formats/orc_reader.hpp" #include "types.hpp" @@ -178,4 +180,28 @@ TEST_F(ImportOptionsTest, IncludeColumns) { EXPECT_EQ(orc_reader_factory_B->Configuration().include_columns.value(), include_columns); } +TEST_F(ImportOptionsTest, CreateReaderFactoryParquetCustomOptions) { + // ParquetOptions with BinaryPredicateExpression + 
ParquetFormatReaderOptions parquet_format_reader_options; + + parquet_format_reader_options.skyrise_expression = std::make_optional(std::make_shared( + PredicateCondition::kEquals, std::make_shared(1), std::make_shared(1))); + const auto arrow_expression = arrow::compute::equal(arrow::compute::literal(1), arrow::compute::literal(1)); + const auto import_options = std::make_shared(parquet_format_reader_options); + const auto reader_factory = import_options->CreateReaderFactory(); + const auto parquet_reader_factory = + std::dynamic_pointer_cast>(reader_factory); + ASSERT_NE(parquet_reader_factory, nullptr); + + // Serialize / Deserialize + const auto serialized_json = import_options->ToJson(); + const auto deserialized_import_options = ImportOptions::FromJson(serialized_json); + const auto reader_factory_B = deserialized_import_options->CreateReaderFactory(); + const auto parquet_reader_factory_B = + std::dynamic_pointer_cast>(reader_factory_B); + ASSERT_NE(parquet_reader_factory_B, nullptr); + ASSERT_TRUE(parquet_reader_factory_B->Configuration().arrow_expression.has_value()); + EXPECT_EQ(arrow_expression, parquet_reader_factory_B->Configuration().arrow_expression); +} + } // namespace skyrise From fded644dce93b6b0c433368b6295b93af727c8ac Mon Sep 17 00:00:00 2001 From: Julian Menzler Date: Tue, 30 Aug 2022 17:29:12 +0200 Subject: [PATCH 3/4] Add ColumnPruningRule --- src/lib/CMakeLists.txt | 2 + .../strategy/lqp_column_pruning_rule.cpp | 376 ++++++++++++++++++ .../strategy/lqp_column_pruning_rule.hpp | 30 ++ 3 files changed, 408 insertions(+) create mode 100644 src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.cpp create mode 100644 src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.hpp diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index a79b32b4..06fb9ae1 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -10,6 +10,8 @@ set( compiler/compilation_context.hpp compiler/logical_query_plan/functional_dependency.cpp 
compiler/logical_query_plan/functional_dependency.hpp + compiler/optimizer/strategy/lqp_column_pruning_rule.cpp + compiler/optimizer/strategy/lqp_column_pruning_rule.hpp ) set( diff --git a/src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.cpp b/src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.cpp new file mode 100644 index 00000000..1ffa23ae --- /dev/null +++ b/src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.cpp @@ -0,0 +1,376 @@ +/** + * Taken and modified from our sister project Hyrise (https://github.com/hyrise/hyrise) + */ +#include "lqp_column_pruning_rule.hpp" + +#include + +#include "compiler/logical_query_plan/abstract_lqp_node.hpp" +#include "compiler/logical_query_plan/aggregate_node.hpp" +#include "compiler/logical_query_plan/dummy_table_node.hpp" +#include "compiler/logical_query_plan/join_node.hpp" +#include "compiler/logical_query_plan/lqp_utils.hpp" +#include "compiler/logical_query_plan/mock_node.hpp" +#include "compiler/logical_query_plan/predicate_node.hpp" +#include "compiler/logical_query_plan/projection_node.hpp" +#include "compiler/logical_query_plan/sort_node.hpp" +#include "compiler/logical_query_plan/stored_table_node.hpp" +#include "compiler/logical_query_plan/union_node.hpp" +#include "expression/abstract_expression.hpp" +#include "expression/expression_functional.hpp" +#include "expression/expression_utils.hpp" + +namespace { + +using namespace skyrise; // NOLINT +using namespace skyrise::expression_functional; // NOLINT + +void gather_expressions_not_computed_by_expression_evaluator( + const std::shared_ptr& expression, + const std::vector>& input_expressions, + ExpressionUnorderedSet& required_expressions, const bool top_level = true) { + // Top-level expressions are those that are (part of) the ExpressionEvaluator's final result. For example, for an + // ExpressionEvaluator producing (a + b) + c, the entire expression is a top-level expression. It is the consumer's + // job to mark it as required. 
(a + b) however, is required by the ExpressionEvaluator and will be added to + // required_expressions, as it is not a top-level expression. + + // If an expression that is not a top-level expression is already an input, we require it + if (std::find_if(input_expressions.begin(), input_expressions.end(), + [&expression](const auto& other) { return *expression == *other; }) != input_expressions.end()) { + if (!top_level) required_expressions.emplace(expression); + return; + } + + if (expression->type_ == ExpressionType::kAggregate || expression->type_ == ExpressionType::kLqpColumn) { + // Aggregates and LqpColumns are not calculated by the ExpressionEvaluator and are thus required to be part of the + // input. + required_expressions.emplace(expression); + return; + } + + for (const auto& argument : expression->arguments_) { + gather_expressions_not_computed_by_expression_evaluator(argument, input_expressions, required_expressions, false); + } +} + +ExpressionUnorderedSet gather_locally_required_expressions( + const std::shared_ptr& node, const ExpressionUnorderedSet& expressions_required_by_consumers) { + // Gathers all expressions required by THIS node, i.e., expressions needed by the node to do its job. For example, a + // PredicateNode `a < 3` requires the LqpColumn a. + auto locally_required_expressions = ExpressionUnorderedSet{}; + + switch (node->type_) { + // For the vast majority of node types, AbstractLqpNode::node_expression holds all expressions required by this + // node. 
+ case LqpNodeType::kAlias: + case LqpNodeType::kCreateView: + case LqpNodeType::kDropView: + case LqpNodeType::kDummyTable: + case LqpNodeType::kImport: + case LqpNodeType::kLimit: + case LqpNodeType::kRoot: + case LqpNodeType::kSort: + case LqpNodeType::kStaticTable: + case LqpNodeType::kStoredTable: + case LqpNodeType::kMock: { + for (const auto& expression : node->node_expressions_) { + locally_required_expressions.emplace(expression); + } + } break; + + // For aggregate nodes, we need the group by columns and the arguments to the aggregate functions + case LqpNodeType::kAggregate: { + const auto& aggregate_node = static_cast(*node); + const auto& node_expressions = node->node_expressions_; + + for (auto expression_idx = size_t{0}; expression_idx < node_expressions.size(); ++expression_idx) { + const auto& expression = node_expressions[expression_idx]; + // The AggregateNode's node_expressions contain both the group_by- and the aggregate_expressions in that order, + // separated by aggregate_expressions_begin_idx. + if (expression_idx < aggregate_node.aggregate_expressions_begin_idx) { + // All group_by-expressions are required + locally_required_expressions.emplace(expression); + } else { + // We need the arguments of all aggregate functions + DebugAssert(expression->type_ == ExpressionType::kAggregate, "Expected AggregateExpression"); + if (!AggregateExpression::IsCountStar(*expression)) { + locally_required_expressions.emplace(expression->arguments_[0]); + } else { + /** + * COUNT(*) is an edge case: The aggregate function contains a pseudo column expression with an + * INVALID_COLUMN_ID. We cannot require the latter from other nodes. However, in the end, we have to + * ensure that the AggregateNode requires at least one expression from other nodes. + * For + * a) grouped COUNT(*) aggregates, this is guaranteed by the group-by column(s). + * b) ungrouped COUNT(*) aggregates, it may be guaranteed by other aggregate functions. 
But, if COUNT(*) + * is the only type of aggregate function, we simply require the first output expression from the + * left input node. + */ + if (!locally_required_expressions.empty() || expression_idx < node_expressions.size() - 1) continue; + locally_required_expressions.emplace(node->LeftInput()->OutputExpressions().at(0)); + } + } + } + } break; + + // For ProjectionNodes, collect all expressions that + // (1) were already computed and are re-used as arguments in this projection + // (2) cannot be computed (i.e., Aggregate and LqpColumn inputs) + // PredicateNodes have the same requirements - if they have their own implementation, they require all columns to + // be already computed; if they use the ExpressionEvaluator the columns should at least be computable. + case LqpNodeType::kPredicate: + case LqpNodeType::kProjection: { + for (const auto& expression : node->node_expressions_) { + if (node->type_ == LqpNodeType::kProjection && + expressions_required_by_consumers.find(expression) == expressions_required_by_consumers.cend()) { + // An expression produced by a ProjectionNode that is not required by anyone upstream is useless. We should + // not collect the expressions required for calculating that useless expression. 
+ continue; + } + + gather_expressions_not_computed_by_expression_evaluator(expression, node->LeftInput()->OutputExpressions(), + locally_required_expressions); + } + } break; + + // For Joins, collect the expressions used on the left and right sides of the join expressions + case LqpNodeType::kJoin: { + const auto& join_node = static_cast(*node); + for (const auto& predicate : join_node.join_predicates()) { + DebugAssert(predicate->type_ == ExpressionType::kPredicate && predicate->arguments_.size() == 2, + "Expected binary predicate for join"); + locally_required_expressions.emplace(predicate->arguments_[0]); + locally_required_expressions.emplace(predicate->arguments_[1]); + } + } break; + + case LqpNodeType::kUnion: { + const auto& union_node = static_cast(*node); + switch (union_node.set_operation_mode) { + case SetOperationMode::kAll: { + // Similarly, if the two input tables are only glued together, the UnionNode itself does not require any + // expressions. Currently, this mode is used to merge the result of two mutually exclusive or conditions (see + // PredicateSplitUpRule). Once we have a union operator that merges data from different tables, we have to + // look into this more deeply. + Assert(union_node.LeftInput()->OutputExpressions() == union_node.RightInput()->OutputExpressions(), + "Can only handle SetOperationMode::kAll if both inputs have the same expressions"); + } break; + + case SetOperationMode::kUnique: { + // This probably needs all expressions, as all of them are used to establish uniqueness + Fail("SetOperationMode::kUnique is not supported yet"); + } + } + } break; + // No pruning of the input columns for these nodes as they need them all. 
+ case LqpNodeType::kExport: + break; + } + + return locally_required_expressions; +} + +void recursively_gather_required_expressions( + const std::shared_ptr& node, + std::unordered_map, ExpressionUnorderedSet>& required_expressions_by_node, + std::unordered_map, size_t>& outputs_visited_by_node) { + auto& required_expressions = required_expressions_by_node[node]; + const auto locally_required_expressions = gather_locally_required_expressions(node, required_expressions); + required_expressions.insert(locally_required_expressions.begin(), locally_required_expressions.end()); + + // We only continue with node's inputs once we have visited all paths above node. We check this by counting the + // number of the node's outputs that have already been visited. Once we reach the output count, we can continue. + if (node->type_ != LqpNodeType::kRoot) ++outputs_visited_by_node[node]; + if (outputs_visited_by_node[node] < node->OutputNodeCount()) return; + + // Once all nodes that may require columns from this node (i.e., this node's outputs) have been visited, we can + // recurse into this node's inputs. + for (const auto& input : {node->LeftInput(), node->RightInput()}) { + if (!input) continue; + + // Make sure the entry in required_expressions_by_node exists, then insert all expressions that the current node + // needs + auto& required_expressions_for_input = required_expressions_by_node[input]; + for (const auto& required_expression : required_expressions) { + // Add the columns needed here (and above) if they come from the input node. Reasons why this might NOT be the + // case are: (1) The expression is calculated in this node (and is thus not available in the input node), or + // (2) we have two input nodes (i.e., a join) and the expressions comes from the other side. 
+ if (input->FindColumnId(*required_expression)) { + required_expressions_for_input.emplace(required_expression); + } + } + + recursively_gather_required_expressions(input, required_expressions_by_node, outputs_visited_by_node); + } +} + +// void try_join_to_semi_rewrite( +// const std::shared_ptr& node, +// const std::unordered_map, ExpressionUnorderedSet>& required_expressions_by_node) +// { +// // Sometimes, joins are not actually used to combine tables but only to check the existence of a tuple in a second +// // table. Example: SELECT c_name FROM customer, nation WHERE c_nationkey = n_nationkey AND n_name = 'GERMANY' +// // If the join is on a unique/primary key column, we can rewrite these joins into semi joins. If, however, the +// // uniqueness is not guaranteed, we cannot perform the rewrite as non-unique joins could possibly emit a matching +// // line more than once. +// +// auto join_node = std::dynamic_pointer_cast(node); +// if (join_node->join_mode != JoinMode::kInner) return; +// +// // Check whether the left/right inputs are actually needed by following operators +// auto left_input_is_used = false; +// auto right_input_is_used = false; +// for (const auto& output : node->Outputs()) { +// for (const auto& required_expression : required_expressions_by_node.at(output)) { +// if (expression_evaluable_on_lqp(required_expression, *node->LeftInput())) left_input_is_used = true; +// if (expression_evaluable_on_lqp(required_expression, *node->RightInput())) right_input_is_used = true; +// } +// } +// DebugAssert(left_input_is_used || right_input_is_used, "Did not expect a useless join"); +// +// // Early out, if we need output expressions from both input tables. 
+// if (left_input_is_used && right_input_is_used) return; +// +// /** +// * We can only rewrite an inner join to a semi join when it has a join cardinality of 1:1 or n:1, which we check as +// * follows: +// * (1) From all predicates of type Equals, we collect the operand expressions by input node. +// * (2) We determine the input node that should be used for filtering. +// * (3) We check the input node from (2) for a matching single- or multi-expression unique constraint. +// * a) Found match -> Rewrite to semi join +// * b) No match -> Do no rewrite to semi join because we might end up with duplicated input records. +// */ +// const auto& join_predicates = join_node->join_predicates(); +// auto equals_predicate_expressions_left = ExpressionUnorderedSet{}; +// auto equals_predicate_expressions_right = ExpressionUnorderedSet{}; +// for (const auto& join_predicate : join_predicates) { +// const auto& predicate = std::dynamic_pointer_cast(join_predicate); +// // Skip predicates that are not of type Equals (because we need n:1 or 1:1 join cardinality) +// if (predicate->predicate_condition != PredicateCondition::Equals) continue; +// +// // Collect operand expressions table-wise +// for (const auto& operand_expression : {predicate->left_operand(), predicate->right_operand()}) { +// if (join_node->LeftInput()->has_OutputExpressions({operand_expression})) { +// equals_predicate_expressions_left.insert(operand_expression); +// } else if (join_node->RightInput()->has_OutputExpressions({operand_expression})) { +// equals_predicate_expressions_right.insert(operand_expression); +// } +// } +// } +// // Early out, if we did not see any Equals-predicates. 
+//   if (equals_predicate_expressions_left.empty() || equals_predicate_expressions_right.empty()) return;
+//
+//   // Determine, which node to use for Semi-Join-filtering and check for the required uniqueness guarantees
+//   if (!left_input_is_used &&
+//       join_node->LeftInput()->has_matching_unique_constraint(equals_predicate_expressions_left)) {
+//     join_node->join_mode = JoinMode::kSemi;
+//     const auto temp = join_node->LeftInput();
+//     join_node->set_left_input(join_node->RightInput());
+//     join_node->set_right_input(temp);
+//   }
+//   if (!right_input_is_used &&
+//       join_node->RightInput()->has_matching_unique_constraint(equals_predicate_expressions_right)) {
+//     join_node->join_mode = JoinMode::kSemi;
+//   }
+// }
+
+// Drops every expression of the ProjectionNode `node` that none of its output nodes requires. The relative order of
+// the remaining expressions is preserved. `required_expressions_by_node` must hold an entry for each output of `node`,
+// because entries are looked up with at().
+void prune_projection_node(
+    const std::shared_ptr<AbstractLqpNode>& node,
+    const std::unordered_map<std::shared_ptr<AbstractLqpNode>, ExpressionUnorderedSet>& required_expressions_by_node) {
+  auto projection_node = std::dynamic_pointer_cast<ProjectionNode>(node);
+  DebugAssert(projection_node != nullptr, "Expected a ProjectionNode");
+
+  auto new_node_expressions = std::vector<std::shared_ptr<AbstractExpression>>{};
+  new_node_expressions.reserve(projection_node->node_expressions_.size());
+
+  // Iterate over the ProjectionNode's expressions and keep those that at least one output node requires
+  for (const auto& expression : projection_node->node_expressions_) {
+    for (const auto& output : node->Outputs()) {
+      const auto& required_expressions = required_expressions_by_node.at(output);
+      if (std::find_if(required_expressions.begin(), required_expressions.end(), [&expression](const auto& other) {
+            return *expression == *other;
+          }) != required_expressions.end()) {
+        new_node_expressions.emplace_back(expression);
+        break;  // One requiring output is enough, do not add the expression twice
+      }
+    }
+  }
+
+  projection_node->node_expressions_ = new_node_expressions;
+}
+
+}  // namespace
+
+namespace skyrise {
+
+// Returns the name of this rule, "LqpColumnPruningRule".
+const std::string& LqpColumnPruningRule::Name() const {
+  static const auto name = std::string{"LqpColumnPruningRule"};
+  return name;
+}
+
+// Prunes unused columns and expressions from the LQP rooted at `lqp_root`:
+//  - Mock and StoredTable nodes get the ColumnIds of all unrequired columns set as pruned (at least one column is kept).
+//  - Projection nodes lose the expressions that no output requires.
+//  - Join nodes are currently left untouched, the rewrite to semi joins is commented out.
+void LqpColumnPruningRule::ApplyTo(const std::shared_ptr<AbstractLqpNode>& lqp_root) const {
+  // For each node, required_expressions_by_node will hold the expressions either needed by this node or by one of its
+  // successors (i.e., nodes to which this node is an input). After collecting this information, we walk through all
+  // identified nodes and perform the pruning.
+  std::unordered_map<std::shared_ptr<AbstractLqpNode>, ExpressionUnorderedSet> required_expressions_by_node;
+
+  // Add top-level columns that need to be included as they are the actual output
+  const auto output_expressions = lqp_root->OutputExpressions();
+  required_expressions_by_node[lqp_root].insert(output_expressions.cbegin(), output_expressions.cend());
+
+  // Recursively walk through the LQP. We cannot use VisitLqp_ as we explicitly need to take each path through the LQP.
+  // The right side of a diamond might require additional columns - if we only visited each node once, we might miss
+  // those. However, we track how many of a node's outputs we have already visited and recurse only once we have seen
+  // all of them. That way, the performance should be similar to that of VisitLqp_.
+  std::unordered_map<std::shared_ptr<AbstractLqpNode>, size_t> outputs_visited_by_node;
+  recursively_gather_required_expressions(lqp_root, required_expressions_by_node, outputs_visited_by_node);
+
+  // Now, go through the LQP and perform all prunings. This time, it is sufficient to look at each node once.
+  for (const auto& [node, required_expressions] : required_expressions_by_node) {
+    DebugAssert(outputs_visited_by_node.at(node) == node->OutputNodeCount(),
+                "Not all outputs have been visited - is the input LQP corrupt?");
+    switch (node->type_) {
+      case LqpNodeType::kMock:
+      case LqpNodeType::kStoredTable: {
+        // Prune all unused columns from a MockNode or StoredTableNode
+        auto pruned_column_ids = std::vector<ColumnId>{};
+        for (const auto& expression : node->OutputExpressions()) {
+          if (required_expressions.find(expression) != required_expressions.end()) {
+            continue;
+          }
+
+          // NOTE(review): The template argument of this cast is missing in the patch text, LqpColumnExpression is
+          // assumed - please confirm.
+          const auto column_expression = std::dynamic_pointer_cast<LqpColumnExpression>(expression);
+          DebugAssert(column_expression != nullptr, "Expected only column expressions as node output");
+          pruned_column_ids.emplace_back(column_expression->original_column_id_);
+        }
+
+        if (!pruned_column_ids.empty() && pruned_column_ids.size() == node->OutputExpressions().size()) {
+          // All columns were marked to be pruned. However, while `SELECT 1 FROM table` does not need any particular
+          // column, it needs at least one column so that it knows how many 1s to produce. Thus, we remove one column
+          // (the last one) from the pruning list. It does not matter which column it is. The emptiness check keeps a
+          // node without any output expressions from underflowing the size computation.
+          pruned_column_ids.pop_back();
+        }
+
+        if (auto stored_table_node = std::dynamic_pointer_cast<StoredTableNode>(node)) {
+          DebugAssert(stored_table_node->pruned_column_ids().empty(), "Node pruned twice");
+          stored_table_node->set_pruned_column_ids(pruned_column_ids);
+        } else if (auto mock_node = std::dynamic_pointer_cast<MockNode>(node)) {
+          DebugAssert(mock_node->pruned_column_ids().empty(), "Node pruned twice");
+          mock_node->set_pruned_column_ids(pruned_column_ids);
+        }
+      } break;
+
+      case LqpNodeType::kJoin: {
+        // try_join_to_semi_rewrite(node, required_expressions_by_node);
+      } break;
+
+      case LqpNodeType::kProjection: {
+        prune_projection_node(node, required_expressions_by_node);
+      } break;
+
+      default:
+        break;  // Node cannot be pruned
+    }
+  }
+}
+
+}  // namespace skyrise
diff --git a/src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.hpp b/src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.hpp
new file mode 100644
index 00000000..57e9025a
--- /dev/null
+++ b/src/lib/compiler/optimizer/strategy/lqp_column_pruning_rule.hpp
@@ -0,0 +1,30 @@
+/**
+ * Taken and modified from our sister project Hyrise (https://github.com/hyrise/hyrise)
+ */
+#pragma once
+
+#include
+
+#include "compiler/logical_query_plan/abstract_lqp_node.hpp"
+#include "compiler/optimizer/abstract_rule.hpp"
+#include "expression/abstract_expression.hpp"
+
+namespace skyrise {
+
+// Removes expressions (i.e., columns) that are never or no longer used from the plan
+// - In StoredTableNodes, we can get rid of all columns that are not used anywhere in the LQP
+// - In ProjectionNodes, we can get rid of columns that are not part of the result or not used anymore. Example:
+//     SELECT SUM(a + 2) FROM (SELECT a, a + 1 FROM t1) t2
+//   Here, `a + 1` is never actually used and should be pruned
+// - Joins that emit columns that are never used can be rewritten to semi joins if (a) the unused side has a unique
+//   constraint and (b) the join is an inner join.
This is done in the LqpColumnPruningRule because it requires
+//   information about which columns are needed and which ones are not. That information is gathered here and not
+//   exported. NOTE: This rewrite is currently disabled (try_join_to_semi_rewrite is commented out in the .cpp).
+class LqpColumnPruningRule : public AbstractRule {
+ public:
+  const std::string& Name() const override;
+
+  void ApplyTo(const std::shared_ptr<AbstractLqpNode>& lqp_root) const override;
+};
+
+}  // namespace skyrise
From 3afd926dae69dda6b7d4eb9b0b94d0e0f445e571 Mon Sep 17 00:00:00 2001
From: Julian Menzler
Date: Tue, 30 Aug 2022 17:30:48 +0200
Subject: [PATCH 4/4] Add test suite

---
 src/test/CMakeLists.txt                       |   2 +
 .../strategy/lqp_column_pruning_rule_test.cpp | 697 ++++++++++++++++++
 2 files changed, 699 insertions(+)
 create mode 100644 src/test/lib/compiler/optimizer/strategy/lqp_column_pruning_rule_test.cpp

diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
index 3b010fa0..ceb03476 100644
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -40,6 +40,8 @@ set(
     lib/compiler/physical_query_plan/pqp_serialization_test.cpp
     lib/compiler/physical_query_plan/pqp_utils_test.cpp
 
+    lib/compiler/optimizer/strategy/lqp_column_pruning_rule_test.cpp
+
     lib/expression/expression_evaluator_to_result_test.cpp
     lib/expression/expression_serialization_test.cpp
 
diff --git a/src/test/lib/compiler/optimizer/strategy/lqp_column_pruning_rule_test.cpp b/src/test/lib/compiler/optimizer/strategy/lqp_column_pruning_rule_test.cpp
new file mode 100644
index 00000000..ba014229
--- /dev/null
+++ b/src/test/lib/compiler/optimizer/strategy/lqp_column_pruning_rule_test.cpp
@@ -0,0 +1,697 @@
+/**
+ * Taken and modified from our sister project Hyrise (https://github.com/hyrise/hyrise)
+ */
+#include "compiler/optimizer/strategy/lqp_column_pruning_rule.hpp"
+
+#include "compiler/logical_query_plan/aggregate_node.hpp"
+#include "compiler/logical_query_plan/export_node.hpp"
+#include "compiler/logical_query_plan/join_node.hpp"
+#include "compiler/logical_query_plan/mock_node.hpp"
+#include
"compiler/logical_query_plan/predicate_node.hpp"
+#include "compiler/logical_query_plan/projection_node.hpp"
+#include "compiler/logical_query_plan/sort_node.hpp"
+#include "compiler/logical_query_plan/stored_table_node.hpp"
+#include "compiler/logical_query_plan/union_node.hpp"
+#include "compiler/optimizer/strategy/strategy_base_test.hpp"
+#include "expression/expression_functional.hpp"
+#include "testing/testing_assert.hpp"
+#include "types.hpp"
+
+namespace skyrise {
+
+using namespace skyrise::expression_functional;  // NOLINT(google-build-using-namespace)
+
+class LqpColumnPruningRuleTest : public StrategyBaseTest {
+ public:
+  void SetUp() override {
+    node_a = MockNode::Make(
+        MockNode::ColumnDefinitions{{DataType::kInt, "a"}, {DataType::kInt, "b"}, {DataType::kInt, "c"}}, "a");
+    node_b = MockNode::Make(
+        MockNode::ColumnDefinitions{{DataType::kInt, "u"}, {DataType::kInt, "v"}, {DataType::kInt, "w"}}, "b");
+
+    a = node_a->get_column("a");
+    b = node_a->get_column("b");
+    c = node_a->get_column("c");
+    u = node_b->get_column("u");
+    v = node_b->get_column("v");
+    w = node_b->get_column("w");
+
+    rule_ = std::make_shared<LqpColumnPruningRule>();
+  }
+
+  const std::shared_ptr<MockNode> Pruned(const std::shared_ptr<MockNode>& node,
+                                         const std::vector<ColumnId>& column_ids) {
+    const auto pruned_node = std::static_pointer_cast<MockNode>(node->DeepCopy());
+    pruned_node->set_pruned_column_ids(column_ids);
+    return pruned_node;
+  }
+
+  std::shared_ptr<LqpColumnPruningRule> rule_;  // TODO append _
+  std::shared_ptr<MockNode> node_a, node_b;
+  std::shared_ptr<LqpColumnExpression> a, b, c, u, v, w;  // NOTE(review): element type assumed, please confirm
+};
+
+TEST_F(LqpColumnPruningRuleTest, NoUnion) {
+  std::shared_ptr<AbstractLqpNode> lqp;
+
+  // clang-format off
+  lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(Mul_(a, u), 5)),
+    PredicateNode::Make(GreaterThan_(5, c),
+      JoinNode::Make(JoinMode::kInner, GreaterThan_(v, a),
+        node_a,
+        SortNode::Make(ExpressionVector_(w), std::vector<SortMode>{SortMode::kAscending},  // NOLINT
+          node_b))));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{1}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+  const auto pruned_c = pruned_node_a->get_column("c");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(Mul_(pruned_a, u), 5)),
+    PredicateNode::Make(GreaterThan_(5, pruned_c),
+      JoinNode::Make(JoinMode::kInner, GreaterThan_(v, pruned_a),
+        pruned_node_a,
+        SortNode::Make(ExpressionVector_(w), std::vector<SortMode>{SortMode::kAscending},  // NOLINT
+          node_b))));
+  // clang-format on
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, WithUnion) {
+  for (auto union_mode : {SetOperationMode::kAll}) {
+    // SCOPED_TRACE(std::string{"union_mode: "} + set_operation_mode_to_string.left.at(union_mode));
+
+    auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+    // clang-format off
+    lqp =
+    ProjectionNode::Make(ExpressionVector_(a),
+      UnionNode::Make(union_mode,
+        PredicateNode::Make(GreaterThan_(a, 5),
+          node_a),
+        PredicateNode::Make(GreaterThan_(b, 5),
+          node_a)));
+
+    // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+    lqp = lqp->DeepCopy();
+
+
+    const auto pruned_node_a = Pruned(node_a, {ColumnId{2}});
+    const auto pruned_a = pruned_node_a->get_column("a");
+    const auto pruned_b = pruned_node_a->get_column("b");
+
+    const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+    // Column c is not used anywhere above the union, so it can be pruned (only kAll is exercised here)
+    const auto expected_lqp =
+    ProjectionNode::Make(ExpressionVector_(pruned_a),
+      UnionNode::Make(union_mode,
+        PredicateNode::Make(GreaterThan_(pruned_a, 5),
+          pruned_node_a),
+        PredicateNode::Make(GreaterThan_(pruned_b, 5),
+          pruned_node_a)));
+    // clang-format on
+
+    EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+  }
+}
+
+TEST_F(LqpColumnPruningRuleTest, WithMultipleProjections) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  lqp =
+  ProjectionNode::Make(ExpressionVector_(a),
+    PredicateNode::Make(GreaterThan_(Mul_(a, b), 5),
+      ProjectionNode::Make(ExpressionVector_(a, b, Mul_(a, b), c),
+        PredicateNode::Make(GreaterThan_(Mul_(a, 2), 5),
+          ProjectionNode::Make(ExpressionVector_(a, b, Mul_(a, 2), c),
+            node_a)))));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{2}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+  const auto pruned_b = pruned_node_a->get_column("b");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  ProjectionNode::Make(ExpressionVector_(pruned_a),
+    PredicateNode::Make(GreaterThan_(Mul_(pruned_a, pruned_b), 5),
+      ProjectionNode::Make(ExpressionVector_(pruned_a, Mul_(pruned_a, pruned_b)),
+        PredicateNode::Make(GreaterThan_(Mul_(pruned_a, 2), 5),
+          ProjectionNode::Make(ExpressionVector_(pruned_a, pruned_b, Mul_(pruned_a, 2)),
+            pruned_node_a)))));
+  // clang-format on
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, ProjectionDoesNotRecompute) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(Add_(a, 2), 1)),
+    PredicateNode::Make(GreaterThan_(Add_(a, 2), 5),
+      ProjectionNode::Make(ExpressionVector_(Add_(a, 2)),
+        node_a)));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(Add_(pruned_a, 2), 1)),
+    PredicateNode::Make(GreaterThan_(Add_(pruned_a, 2), 5),
+      ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)),
+        pruned_node_a)));
+  // clang-format on
+
+  // We can be sure that the top projection node does not recompute a+2 because a is not available
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, Diamond) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  const auto sub_lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(a, 2), Add_(b, 3), Add_(c, 4)),
+    node_a);
+
+  lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(a, 2), Add_(b, 3)),
+    UnionNode::Make(SetOperationMode::kAll,  // changed from kPositions to kAll
+      PredicateNode::Make(GreaterThan_(Add_(a, 2), 5),
+        sub_lqp),
+      PredicateNode::Make(LessThan_(Add_(b, 3), 10),
+        sub_lqp)));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  // Column c should be removed even below the UnionNode
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{2}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+  const auto pruned_b = pruned_node_a->get_column("b");
+
+  const auto expected_sub_lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2), Add_(pruned_b, 3)),
+    pruned_node_a);
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2), Add_(pruned_b, 3)),
+    UnionNode::Make(SetOperationMode::kAll,  // changed from kPositions to kAll
+      PredicateNode::Make(GreaterThan_(Add_(pruned_a, 2), 5),
+        expected_sub_lqp),
+      PredicateNode::Make(LessThan_(Add_(pruned_b, 3), 10),
+        expected_sub_lqp)));
+  // clang-format on
+
+  // We can be sure that the top projection node does not recompute a+2 because a is not available
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, SimpleAggregate) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  lqp =
+  AggregateNode::Make(ExpressionVector_(), ExpressionVector_(Sum_(Add_(a, 2))),
+    ProjectionNode::Make(ExpressionVector_(a, b, Add_(a, 2)),
+      node_a));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  AggregateNode::Make(ExpressionVector_(), ExpressionVector_(Sum_(Add_(pruned_a, 2))),
+    ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)),
+      pruned_node_a));
+  // clang-format on
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, UngroupedCountStar) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  lqp =
+  AggregateNode::Make(ExpressionVector_(), ExpressionVector_(CountStarLqp_(node_a)),
+    ProjectionNode::Make(ExpressionVector_(a, b, Add_(a, 2)),
+      node_a));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  AggregateNode::Make(ExpressionVector_(), ExpressionVector_(CountStarLqp_(pruned_node_a)),
+    ProjectionNode::Make(ExpressionVector_(pruned_a),
+      pruned_node_a));
+  // clang-format on
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, UngroupedCountStarAndSum) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  lqp =
+  AggregateNode::Make(ExpressionVector_(), ExpressionVector_(CountStarLqp_(node_a), Sum_(b)),
+    ProjectionNode::Make(ExpressionVector_(a, b, Add_(a, 2)),
+      node_a));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{0}, ColumnId{2}});
+  const auto pruned_b = pruned_node_a->get_column("b");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  AggregateNode::Make(ExpressionVector_(), ExpressionVector_(CountStarLqp_(pruned_node_a), Sum_(pruned_b)),
+    ProjectionNode::Make(ExpressionVector_(pruned_b),
+      pruned_node_a));
+  // clang-format on
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+TEST_F(LqpColumnPruningRuleTest, GroupedCountStar) {
+  auto lqp = std::shared_ptr<AbstractLqpNode>{};
+
+  // clang-format off
+  lqp =
+  AggregateNode::Make(ExpressionVector_(b, a), ExpressionVector_(CountStarLqp_(node_a)),
+    ProjectionNode::Make(ExpressionVector_(a, b, Add_(a, 2)),
+      node_a));
+
+  // Create deep copy so we can set pruned ColumnIds on node_a below without manipulating the input LQP
+  lqp = lqp->DeepCopy();
+
+  const auto pruned_node_a = Pruned(node_a, {ColumnId{2}});
+  const auto pruned_a = pruned_node_a->get_column("a");
+  const auto pruned_b = pruned_node_a->get_column("b");
+
+  const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp);
+
+  const auto expected_lqp =
+  AggregateNode::Make(ExpressionVector_(pruned_b, pruned_a), ExpressionVector_(CountStarLqp_(pruned_node_a)),
+    ProjectionNode::Make(ExpressionVector_(pruned_a, pruned_b),
+      pruned_node_a));
+  // clang-format on
+
+  EXPECT_LQP_EQ(actual_lqp, expected_lqp);
+}
+
+// TEST_F(LqpColumnPruningRuleTest, InnerJoinToSemiJoin) {
+//  {
+//    TableColumnDefinitions column_definitions;
+//    column_definitions.emplace_back("column0", DataType::kInt, false);
+//    auto table = std::make_shared(column_definitions, TableType::Data, 2, UseMvcc::Yes);
+//
+//    auto& sm = Hyrise::get().storage_manager;
+//    sm.add_table("table", table);
+//
+//    table->add_soft_key_constraint({{ColumnId{0}}, KeyConstraintType::UNIQUE});
+//  }
+//
+//  const auto stored_table_node = StoredTableNode::Make("table");
+//  const auto column0 =
stored_table_node->get_column("column0"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kInner, equals_(a, column0), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kSemi, equals_(pruned_a, column0), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +//} + +// TEST_F(LqpColumnPruningRuleTest, MultiPredicateInnerJoinToSemiJoinWithSingleEqui) { +// // Same as InnerJoinToSemiJoin, but with an additional join predicate that should not change the result. +// +// { +// TableColumnDefinitions column_definitions; +// column_definitions.emplace_back("column0", DataType::kInt, false); +// column_definitions.emplace_back("column1", DataType::kInt, false); +// auto table = std::make_shared
(column_definitions, TableType::Data, 2, UseMvcc::Yes); +// +// auto& sm = Hyrise::get().storage_manager; +// sm.add_table("table", table); +// +// table->add_soft_key_constraint({{ColumnId{0}}, KeyConstraintType::UNIQUE}); +// } +// +// const auto stored_table_node = StoredTableNode::Make("table"); +// const auto column0 = stored_table_node->get_column("column0"); +// const auto column1 = stored_table_node->get_column("column1"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kInner, ExpressionVector_(equals_(a, column0), not_equals_(a, column1)), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kSemi, ExpressionVector_(equals_(pruned_a, column0), not_equals_(pruned_a, column1)), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } + +// TEST_F(LqpColumnPruningRuleTest, MultiPredicateInnerJoinToSemiJoinWithMultiEqui) { +// /** +// * Defines a multi-column key constraint (column0, column1) and two inner join predicates of type Equals covering +// * those two columns. We expect to see a semi join reformulation because the resulting unique constraint matches +// * the inner join's predicate expressions. +// */ +// { +// TableColumnDefinitions column_definitions; +// column_definitions.emplace_back("column0", DataType::kInt, false); +// column_definitions.emplace_back("column1", DataType::kInt, false); +// auto table = std::make_shared
(column_definitions, TableType::Data, 2, UseMvcc::Yes); +// +// auto& sm = Hyrise::get().storage_manager; +// sm.add_table("table", table); +// +// table->add_soft_key_constraint({{ColumnId{0}, ColumnId{1}}, KeyConstraintType::UNIQUE}); +// } +// +// const auto stored_table_node = StoredTableNode::Make("table"); +// const auto column0 = stored_table_node->get_column("column0"); +// const auto column1 = stored_table_node->get_column("column1"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kInner, ExpressionVector_(equals_(a, column0), equals_(a, column1)), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kSemi, ExpressionVector_(equals_(pruned_a, column0), equals_(pruned_a, column1)), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } + +// TEST_F(LqpColumnPruningRuleTest, DoNotTouchInnerJoinWithNonEqui) { +// { +// TableColumnDefinitions column_definitions; +// column_definitions.emplace_back("column0", DataType::kInt, false); +// auto table = std::make_shared
(column_definitions, TableType::Data, 2, UseMvcc::Yes); +// +// auto& sm = Hyrise::get().storage_manager; +// sm.add_table("table", table); +// +// table->add_soft_key_constraint({{ColumnId{0}}, KeyConstraintType::UNIQUE}); +// } +// +// const auto stored_table_node = StoredTableNode::Make("table"); +// const auto column0 = stored_table_node->get_column("column0"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kInner, GreaterThan_(a, column0), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// // Still expect it to prune b+1 +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kInner, GreaterThan_(pruned_a, column0), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } +// +// TEST_F(LqpColumnPruningRuleTest, DoNotTouchInnerJoinWithoutUniqueConstraint) { +// // Based on the InnerJoinToSemiJoin test. +// { +// TableColumnDefinitions column_definitions; +// column_definitions.emplace_back("column0", DataType::kInt, false); +// auto table = std::make_shared
(column_definitions, TableType::Data, 2, UseMvcc::Yes); +// +// auto& sm = Hyrise::get().storage_manager; +// sm.add_table("table", table); +// } +// +// const auto stored_table_node = StoredTableNode::Make("table"); +// const auto column0 = stored_table_node->get_column("column0"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kInner, equals_(a, column0), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kInner, equals_(pruned_a, column0), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } +// +// TEST_F(LqpColumnPruningRuleTest, DoNotTouchInnerJoinWithoutMatchingUniqueConstraint) { +// /** +// * Based on the InnerJoinToSemiJoin test. +// * +// * We define a multi-column key constraint (column0, column1), but only a single Equals-predicate for the inner +// * join (a == column0). Hence, the resulting unique constraint does not match the expressions of the +// * single Equals-predicate and we should not see a semi join reformulation. +// */ +// +// { +// TableColumnDefinitions column_definitions; +// column_definitions.emplace_back("column0", DataType::kInt, false); +// column_definitions.emplace_back("column1", DataType::kInt, false); +// auto table = std::make_shared
(column_definitions, TableType::Data, 2, UseMvcc::Yes); +// +// auto& sm = Hyrise::get().storage_manager; +// sm.add_table("table", table); +// +// table->add_soft_key_constraint({{ColumnId{0}, ColumnId{1}}, KeyConstraintType::UNIQUE}); +// } +// +// const auto stored_table_node = StoredTableNode::Make("table"); +// const auto column0 = stored_table_node->get_column("column0"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kInner, equals_(a, column0), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kInner, equals_(pruned_a, column0), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } +// +// TEST_F(LqpColumnPruningRuleTest, DoNotTouchNonInnerJoin) { +// // Based on the InnerJoinToSemiJoin test. +// { +// TableColumnDefinitions column_definitions; +// column_definitions.emplace_back("column0", DataType::kInt, false); +// auto table = std::make_shared
(column_definitions, TableType::Data, 2, UseMvcc::Yes); +// +// auto& sm = Hyrise::get().storage_manager; +// sm.add_table("table", table); +// +// table->add_soft_key_constraint({{ColumnId{0}}, KeyConstraintType::PRIMARY_KEY}); +// } +// +// const auto stored_table_node = StoredTableNode::Make("table"); +// const auto column0 = stored_table_node->get_column("column0"); +// +// // clang-format off +// const auto lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(a, 2)), +// JoinNode::Make(JoinMode::kLeft, equals_(a, column0), +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1)), +// node_a), +// stored_table_node)); +// +// const auto pruned_node_a = Pruned(node_a, {ColumnId{1}, ColumnId{2}}); +// const auto pruned_a = pruned_node_a->get_column("a"); +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// +// const auto expected_lqp = +// ProjectionNode::Make(ExpressionVector_(Add_(pruned_a, 2)), +// JoinNode::Make(JoinMode::kLeft, equals_(pruned_a, column0), +// ProjectionNode::Make(ExpressionVector_(pruned_a), +// pruned_node_a), +// stored_table_node)); +// // clang-format on +// +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } + +// TEST_F(LqpColumnPruningRuleTest, DoNotPruneUpdateInputs) { +// // Do not prune away input columns to Update, Update needs them all +// +// // clang-format off +// const auto select_rows_lqp = +// PredicateNode::Make(GreaterThan_(a, 5), +// node_a); +// +// const auto lqp = +// UpdateNode::Make("dummy", +// select_rows_lqp, +// ProjectionNode::Make(ExpressionVector_(a, Add_(b, 1), c), +// select_rows_lqp)); +// // clang-format on +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// const auto expected_lqp = lqp->DeepCopy(); +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } +// +// TEST_F(LqpColumnPruningRuleTest, DoNotPruneInsertInputs) { +// // Do not prune away input columns to Insert, Insert needs them all +// +// // clang-format off +// const auto lqp = +// 
InsertNode::Make("dummy", +// PredicateNode::Make(GreaterThan_(a, 5), +// node_a)); +// // clang-format on +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// const auto expected_lqp = lqp->DeepCopy(); +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } +// +// TEST_F(LqpColumnPruningRuleTest, DoNotPruneDeleteInputs) { +// // Do not prune away input columns to Delete, Delete needs them all +// +// // clang-format off +// const auto lqp = +// DeleteNode::Make( +// PredicateNode::Make(GreaterThan_(a, 5), +// node_a)); +// // clang-format on +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// const auto expected_lqp = lqp->DeepCopy(); +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } +// +// TEST_F(LqpColumnPruningRuleTest, DoNotPruneExportInputs) { +// // Do not prune away input columns to Export, Export needs them all +// +// // clang-format off +// const auto lqp = +// ExportNode::Make("dummy", "dummy.csv", FileType::Auto, +// PredicateNode::Make(GreaterThan_(a, 5), +// node_a)); +// // clang-format on +// +// const auto actual_lqp = StrategyBaseTest::ApplyRule(rule_, lqp); +// const auto expected_lqp = lqp->DeepCopy(); +// EXPECT_LQP_EQ(actual_lqp, expected_lqp); +// } + +} // namespace skyrise