From 48c30705ef0c2cb301ecc69d0f844270abef8a97 Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Thu, 4 Jun 2026 22:39:56 +0000 Subject: [PATCH] Add dynamic RX flow lifecycle Signed-off-by: Cliff Burdick --- docs/api-reference/configuration.md | 22 +- docs/api-reference/cpp.md | 99 +- docs/api-reference/python.md | 9 +- docs/concepts.md | 11 +- docs/tutorials/configuration-walkthrough.md | 5 +- examples/CMakeLists.txt | 3 + examples/README.md | 5 + examples/daqiri_example_dynamic_rx_flow.yaml | 77 ++ examples/dynamic_rx_flow_example.cpp | 442 +++++++ include/daqiri/common.h | 77 +- include/daqiri/types.h | 56 +- python/daqiri_common_pybind.cpp | 54 +- src/common.cpp | 28 +- src/manager.cpp | 30 + src/manager.h | 8 +- src/managers/dpdk/daqiri_dpdk_mgr.cpp | 1202 ++++++++++++++++-- src/managers/dpdk/daqiri_dpdk_mgr.h | 120 +- src/managers/rdma/daqiri_rdma_mgr.h | 2 +- src/managers/socket/daqiri_socket_mgr.cpp | 2 +- src/managers/socket/daqiri_socket_mgr.h | 2 +- 20 files changed, 2094 insertions(+), 160 deletions(-) create mode 100644 examples/daqiri_example_dynamic_rx_flow.yaml create mode 100644 examples/dynamic_rx_flow_example.cpp diff --git a/docs/api-reference/configuration.md b/docs/api-reference/configuration.md index 47d6519..461932e 100644 --- a/docs/api-reference/configuration.md +++ b/docs/api-reference/configuration.md @@ -172,11 +172,15 @@ RDMA transport settings: ### Flows -`rx.flows:` — Flow rules that steer packets to specific queues based on match criteria. +`rx.flows:` — Static startup flow rules that steer packets to specific queues based on +match criteria. This sequence may be omitted; a queues-only RX config can add DPDK RX +flows later with the dynamic flow API. - **`name`**: Flow name. - type: `string` -- **`id`**: Flow ID. Retrievable at runtime via `get_packet_flow_id()`. +- **`id`**: Non-zero static flow ID. Retrievable at runtime via `get_packet_flow_id()`. + Static IDs are reserved for the lifetime of the process and are not deletable through + the dynamic flow API. - type: `integer` - **`action`**: What to do with matched packets. - **`type`**: Action type. Only `queue` is currently supported. @@ -201,11 +205,23 @@ RDMA transport settings: ### Flow Isolation `rx.flow_isolation:` — When `true`, only packets matching an explicit flow rule are delivered. -Unmatched packets are dropped. When `false`, unmatched packets go to a default queue. +Unmatched packets are dropped. When `false`, unmatched packets go to a default queue. A +queues-only config can set `flow_isolation: true` and then install dynamic RX flows after +`daqiri_init()`. - type: `boolean` - default: `false` +### Dynamic Flow Capacity + +`rx.dynamic_flow_capacity:` — DPDK template-table capacity reserved for dynamic RX flow +rules on this interface. DAQIRI uses this when the DPDK template/async fast path is +available; legacy fallback paths still accept dynamic RX flow operations but do not use a +template table. + +- type: `integer` +- default: `1024` + ### Hardware Timestamps `rx.hardware_timestamps:` — Enable per-packet hardware RX timestamps for Raw Ethernet diff --git a/docs/api-reference/cpp.md b/docs/api-reference/cpp.md index 0af6728..ed5b55e 100644 --- a/docs/api-reference/cpp.md +++ b/docs/api-reference/cpp.md @@ -77,7 +77,7 @@ For a single-segment configuration (CPU-only or batched GPU): for (int i = 0; i < daqiri::get_num_packets(burst); i++) { void *pkt = daqiri::get_packet_ptr(burst, i); uint32_t len = daqiri::get_packet_length(burst, i); - uint16_t flow = daqiri::get_packet_flow_id(burst, i); + daqiri::FlowId flow = daqiri::get_packet_flow_id(burst, i); uint64_t rx_ts_ns = 0; if (daqiri::get_packet_rx_timestamp(burst, i, &rx_ts_ns) == daqiri::Status::SUCCESS) { // rx_ts_ns is in the NIC timestamp clock domain. @@ -130,6 +130,92 @@ daqiri::free_all_segment_packets(burst, seg); daqiri::free_rx_burst(burst); ``` +## Dynamic RX Flows + +DPDK RX flows can be added and deleted after `daqiri_init()`. This supports +queues-only startup configs, including `rx.flow_isolation: true` with no +initial `rx.flows`. Static YAML flows still use explicit configured IDs and are +not deletable through this API. + +```cpp +daqiri::FlowRuleConfig flow; +flow.name_ = "udp_5000"; +flow.action_.type_ = daqiri::FlowType::QUEUE; +flow.action_.id_ = 0; +flow.match_.type_ = daqiri::FlowMatchType::IPV4_UDP; +flow.match_.udp_dst_ = 5000; + +daqiri::FlowOpId add_op = 0; +auto st = daqiri::add_rx_flow_async(0, flow, &add_op); +if (st != daqiri::Status::SUCCESS) { + // invalid port/queue/match, unsupported backend, or no flow IDs available +} + +daqiri::FlowId flow_id = 0; +daqiri::FlowOpResult result; +while (flow_id == 0) { + st = daqiri::poll_flow_op(&result); + if (st == daqiri::Status::NOT_READY) { + continue; + } + if (st != daqiri::Status::SUCCESS) { + // handle poll error + break; + } + if (result.op_id_ == add_op) { + if (result.status_ != daqiri::Status::SUCCESS) { + // handle flow create failure + break; + } + flow_id = result.flow_id_; + } +} +``` + +Packets matching a dynamic rule are marked with the same `FlowId` returned by +the add completion, so `get_packet_flow_id()` gives the handle to pass to +`delete_flow_async()`. `poll_flow_op()` returns `Status::NOT_READY` when no flow +operation has completed yet. + +Multiple RX flows can be added as one operation. This maps to a single DPDK +template queue push when the IPv4/UDP template path is available, and +`poll_flow_op()` returns one batch completion when all creates in the batch have +resolved. + +```cpp +std::vector flows; +flows.push_back(flow); +flows.push_back(flow); +flows.back().name_ = "udp_5001"; +flows.back().match_.udp_dst_ = 5001; + +daqiri::FlowOpId batch_op = 0; +st = daqiri::add_rx_flows_async(0, flows, &batch_op); + +std::vector flow_ids; +while (flow_ids.empty()) { + st = daqiri::poll_flow_op(&result); + if (st == daqiri::Status::NOT_READY) { + continue; + } + if (st == daqiri::Status::SUCCESS && result.op_id_ == batch_op) { + flow_ids = result.flow_ids_; + } +} +``` + +For batch completions, `flow_ids_` is in the same order as the input rules. If +the completion status is not `SUCCESS`, nonzero entries were installed and zero +entries were not installed. + +```cpp +daqiri::FlowOpId delete_op = 0; +auto delete_status = daqiri::delete_flow_async(flow_id, &delete_op); +``` + +Dynamic flow support is RX-only in v1. Socket, RDMA, and software loopback +managers return `NOT_SUPPORTED`. + ## Reordered RX Bursts For an overview of what RX reorder is and when to use it, see @@ -411,9 +497,18 @@ workflow sections above show the common call order and ownership rules. | `get_segment_packet_ptr(burst, seg, idx)` | Return a packet pointer for a specific segment. | | `get_packet_length(burst, idx)` | Return the logical packet length. | | `get_segment_packet_length(burst, seg, idx)` | Return the length of one packet segment. | -| `get_packet_flow_id(burst, idx)` | Return the matched flow ID, or `0` when no flow matched. | +| `get_packet_flow_id(burst, idx)` | Return the matched `FlowId`, or `0` when no flow matched. | | `get_packet_rx_timestamp(burst, idx, ×tamp_ns)` | Return the hardware RX timestamp when enabled and available. | +### Dynamic RX Flow Lifecycle + +| Function | Purpose | +| --- | --- | +| `add_rx_flow_async(port, flow, &op_id)` | Enqueue a dynamic RX flow create. The add completion returns the allocated `FlowId`. | +| `add_rx_flows_async(port, flows, &op_id)` | Enqueue a dynamic RX flow batch create. One completion returns allocated `FlowId`s in input order. | +| `delete_flow_async(flow_id, &op_id)` | Enqueue deletion of an active dynamic flow. Static YAML flows and unknown IDs return `INVALID_PARAMETER`. | +| `poll_flow_op(&result)` | Return one completed flow operation, or `NOT_READY` when none are ready. | + ### RX and Reorder | Function | Purpose | diff --git a/docs/api-reference/python.md b/docs/api-reference/python.md index 3936b95..a907415 100644 --- a/docs/api-reference/python.md +++ b/docs/api-reference/python.md @@ -573,6 +573,10 @@ The workflow sections above show the common call order and ownership rules. | `drop_all_traffic(port)` | Install a high-priority drop rule on a port. | | `allow_all_traffic(port)` | Remove a drop rule installed by `drop_all_traffic`. | | `flush_port_queue(port, queue)` | Drain stale packets from a port queue. | +| `add_rx_flow_async(port, flow)` | Return `(Status, op_id)` after enqueueing one dynamic RX flow create. | +| `add_rx_flows_async(port, flows)` | Return `(Status, op_id)` after enqueueing a dynamic RX flow batch create. One completion returns `flow_ids` in input order. | +| `delete_flow_async(flow_id)` | Return `(Status, op_id)` after enqueueing deletion of one dynamic flow. | +| `poll_flow_op()` | Return `(Status, FlowOpResult)`, or `NOT_READY` when no dynamic flow operation has completed. | | `socket_connect_to_server(server_addr, server_port[, src_addr])` | Return `(Status, conn_id)`. | | `socket_get_port_queue(conn_id)` | Return `(Status, port, queue)`. | | `socket_get_server_conn_id(server_addr, server_port)` | Return `(Status, conn_id)`. | @@ -614,7 +618,8 @@ The workflow sections above show the common call order and ownership rules. | `RDMATransportMode` | `RC`, `UC`, `UD`, `INVALID` | | `SocketMode` | `CLIENT`, `SERVER`, `INVALID` | | `FlowType` | `QUEUE` | -| `FlowMatchType` | `NORMAL`, `FLEX_ITEM` | +| `FlowMatchType` | `IPV4_UDP`, `FLEX_ITEM` | +| `FlowOpType` | `ADD_RX`, `ADD_RX_BATCH`, `DELETE` | | `ReorderMethod` | `INVALID`, `SEQ_BATCH_NUMBER`, `SEQ_PACKETS_PER_BATCH` | | `ReorderDataType` | `SAME`, `INT4`, `INT8`, `INT16`, `INT32`, `FP16`, `BF16`, `FP32`, `FP64`, `INVALID` | | `ReorderEndianness` | `HOST`, `NETWORK`, `INVALID` | @@ -647,6 +652,8 @@ names that mostly omit the trailing underscore from the C++ member name (e.g. | `FlowAction` | Flow action type and target ID. | | `FlowMatch` | Flow match fields for UDP, IPv4, and flex item matching. | | `FlowConfig` | Named flow rule combining action and match. | +| `FlowRuleConfig` | Dynamic flow rule match and action. | +| `FlowOpResult` | Dynamic flow operation completion. Batch adds return `flow_ids` in input order. | | `FlexItemConfig` | Flexible parser item configuration. | | `FlexItemMatch` | Flexible parser match value and mask. | | `SocketConfig` | Socket client/server endpoint and timing settings. | diff --git a/docs/concepts.md b/docs/concepts.md index c9ca280..b28e818 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -222,7 +222,16 @@ Flow rules are only available in Raw Ethernet (`stream_type: "raw"`). A flow's match can combine fields such as `udp_src`, `udp_dst`, and `ipv4_len`; multiple flows can target the same queue, and the matching flow's ID is available at runtime so the application can distinguish -them. Flows are configured under `rx.flows` in the YAML. +them. + +Flows can be static or dynamic. Static flows are configured under +`rx.flows` in the YAML and keep their configured IDs for the process lifetime. +Dynamic RX flows are added after `daqiri_init()` with `add_rx_flow_async()` or +`add_rx_flows_async()`; their non-zero `FlowId`s are allocated by DAQIRI, +returned in the add completion, and used as the packet marks returned by +`get_packet_flow_id()`. Batch adds complete with a single operation result whose +flow IDs are in input order. Only dynamic flows can be deleted dynamically. TX +dynamic flows are not part of v1. ### Flow Steering diff --git a/docs/tutorials/configuration-walkthrough.md b/docs/tutorials/configuration-walkthrough.md index bc4a04c..a7b3130 100644 --- a/docs/tutorials/configuration-walkthrough.md +++ b/docs/tutorials/configuration-walkthrough.md @@ -96,8 +96,9 @@ For a shorter backend-selection guide, start with the [Benchmarking overview](.. ??? question "4. I need flow-based load balancing across multiple RX queues" - **Closed-loop TX+RX with four queues** — [`daqiri_bench_raw_tx_rx_4q.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_rx_4q.yaml) (runs on `daqiri_bench_raw_gpudirect`). - [`daqiri_bench_raw_rx_multi_q.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_rx_multi_q.yaml) (runs on `daqiri_bench_raw_gpudirect`). + - **Dynamic RX flow lifecycle** — [`daqiri_example_dynamic_rx_flow.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_example_dynamic_rx_flow.yaml) (runs on `daqiri_example_dynamic_rx_flow`). Starts with `flow_isolation: true` and no configured flows, then dynamically routes one UDP flow to RX queue 0 and queue 1 in sequence. - The four-queue TX+RX config is self-contained and maps each `bench_tx`/`bench_rx` list entry to the matching DAQIRI queue. The RX-only config is for an external traffic source. Both demonstrate flow-rule-based routing across multiple RX queues, each pinned to its own CPU core. + The four-queue TX+RX config is self-contained and maps each `bench_tx`/`bench_rx` list entry to the matching DAQIRI queue. The RX-only config is for an external traffic source. The dynamic-flow example demonstrates queues-only startup and runtime flow insertion/deletion. All three demonstrate flow-rule-based routing across multiple RX queues, each pinned to its own CPU core. *Requires: Raw Ethernet build (`DAQIRI_MGR` includes `dpdk`) + NVIDIA ConnectX-class NIC. The RX-only config also requires a separate TX traffic source.* @@ -368,7 +369,7 @@ flows: udp_dst: 5000 ``` -1. **`id`** · `integer` · *required* — Flow tag attached to matching packets. Set to a non-zero value here so the `reorder_configs:` block below can reference it via `flow_ids:` to select which packets to reorder. +1. **`id`** · `integer` · *required* — Static flow tag attached to matching packets. Set to a non-zero value here so the `reorder_configs:` block below can reference it via `flow_ids:` to select which packets to reorder. Dynamic RX flows are added after initialization and are not attached to reorder configs in v1. **The `reorder_configs:` block.** The core of the feature — sits inside the `rx:` section alongside `queues` and `flows`. diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 9915ba0..7d40437 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -34,6 +34,7 @@ set(DAQIRI_BENCH_CONFIGS daqiri_bench_raw_rx_reorder_seq_batch.yaml daqiri_bench_raw_rx_multi_q.yaml daqiri_bench_raw_sw_loopback.yaml + daqiri_example_dynamic_rx_flow.yaml daqiri_example_gds_write_sw_loopback.yaml daqiri_example_gds_write_tx_rx.yaml daqiri_example_pcap_writer_sw_loopback.yaml @@ -95,6 +96,7 @@ if(DAQIRI_ENABLE_OTEL_METRICS) endif() add_daqiri_raw_bench(daqiri_bench_raw_reorder_seq raw_reorder_seq_bench.cpp) add_daqiri_raw_bench(daqiri_bench_raw_reorder_quantize raw_reorder_quantize_bench.cpp) +add_daqiri_raw_bench(daqiri_example_dynamic_rx_flow dynamic_rx_flow_example.cpp) add_daqiri_raw_bench(daqiri_example_gds_write gds_write_example.cpp) add_daqiri_raw_bench(daqiri_example_pcap_writer pcap_writer_example.cpp) @@ -125,6 +127,7 @@ install(TARGETS daqiri_bench_raw_gpudirect daqiri_bench_raw_reorder_seq daqiri_bench_raw_reorder_quantize + daqiri_example_dynamic_rx_flow daqiri_example_gds_write daqiri_example_pcap_writer daqiri_bench_rdma diff --git a/examples/README.md b/examples/README.md index 2f24ff5..109032e 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,6 +7,9 @@ Standalone benchmark applications for testing performance of DAQIRI with various - `daqiri_bench_raw_reorder_seq`: raw RX sequence-number reorder benchmark - `daqiri_bench_raw_reorder_quantize`: raw RX sequence reorder with payload conversion - `daqiri_example_pcap_writer`: RX pcap writer with optional GPUDirect demo TX traffic +- `daqiri_example_dynamic_rx_flow`: raw TX/RX example that starts with RX flow + isolation and no configured flows, then dynamically steers one UDP flow to + queues 0 and 1 in sequence - `daqiri_bench_rdma`: RDMA benchmark logic (former `rdma_bench.h`) - `daqiri_bench_socket`: TCP/UDP socket benchmark logic - `daqiri_example_gds_write`: one-shot capture that demonstrates synchronous and @@ -50,6 +53,7 @@ Run: ./build/examples/daqiri_bench_raw_reorder_seq ./build/examples/daqiri_bench_raw_tx_rx_reorder_seq_1024.yaml --seconds 10 ./build/examples/daqiri_bench_raw_reorder_quantize ./build/examples/daqiri_bench_raw_tx_rx_reorder_quantize_seq_batch.yaml --seconds 10 ./build/examples/daqiri_example_pcap_writer ./build/examples/daqiri_example_pcap_writer_sw_loopback.yaml /tmp/daqiri-capture.pcap --tx +./build/examples/daqiri_example_dynamic_rx_flow ./build/examples/daqiri_example_dynamic_rx_flow.yaml --target-gbps 10 ./build/examples/daqiri_bench_rdma ./build/examples/daqiri_bench_rdma_tx_rx.yaml --seconds 10 --mode both ./build/examples/daqiri_bench_socket ./build/examples/daqiri_bench_socket_udp_tx_rx.yaml --seconds 10 --mode both ./build/examples/daqiri_bench_socket ./build/examples/daqiri_bench_socket_tcp_tx_rx.yaml --seconds 10 --mode both @@ -72,6 +76,7 @@ Included configs: | `daqiri_bench_raw_tx_rx.yaml` | `daqiri_bench_raw_gpudirect` | | `daqiri_bench_raw_tx_rx_4q.yaml` | `daqiri_bench_raw_gpudirect` | | `daqiri_bench_raw_sw_loopback.yaml` | `daqiri_bench_raw_gpudirect` | +| `daqiri_example_dynamic_rx_flow.yaml` | `daqiri_example_dynamic_rx_flow` | | `daqiri_example_gds_write_sw_loopback.yaml` | `daqiri_example_gds_write` | | `daqiri_example_gds_write_tx_rx.yaml` | `daqiri_example_gds_write` | | `daqiri_bench_raw_rx_multi_q.yaml` | `daqiri_bench_raw_gpudirect` | diff --git a/examples/daqiri_example_dynamic_rx_flow.yaml b/examples/daqiri_example_dynamic_rx_flow.yaml new file mode 100644 index 0000000..62e5ef7 --- /dev/null +++ b/examples/daqiri_example_dynamic_rx_flow.yaml @@ -0,0 +1,77 @@ +%YAML 1.2 +--- +daqiri: + cfg: + version: 1 + stream_type: "raw" + master_core: 3 + debug: false + log_level: "info" + loopback: "" + + memory_regions: + - name: "Data_TX_GPU" + kind: "device" + affinity: 0 + num_bufs: 51200 + buf_size: 1064 + - name: "Data_RX_GPU_0" + kind: "device" + affinity: 0 + num_bufs: 51200 + buf_size: 1064 + - name: "Data_RX_GPU_1" + kind: "device" + affinity: 0 + num_bufs: 51200 + buf_size: 1064 + + interfaces: + - name: "tx_port" + address: <0000:00:00.0> + tx: + queues: + - name: "tx_q_0" + id: 0 + batch_size: 1024 + cpu_core: 4 + memory_regions: + - "Data_TX_GPU" + offloads: + - "tx_eth_src" + - name: "rx_port" + address: <0000:00:00.0> + rx: + flow_isolation: true + dynamic_flow_capacity: 1024 + queues: + - name: "rx_q_0" + id: 0 + cpu_core: 8 + batch_size: 1024 + memory_regions: + - "Data_RX_GPU_0" + - name: "rx_q_1" + id: 1 + cpu_core: 9 + batch_size: 1024 + memory_regions: + - "Data_RX_GPU_1" + +bench_rx: +- interface_name: "rx_port" + queue_id: 0 +- interface_name: "rx_port" + queue_id: 1 + +bench_tx: +- interface_name: "tx_port" + queue_id: 0 + batch_size: 1024 + payload_size: 1000 + header_size: 64 + eth_dst_addr: <00:00:00:00:00:00> + ip_src_addr: <1.2.3.4> + ip_dst_addr: <5.6.7.8> + udp_src_port: 4096 + udp_dst_port: 4096 diff --git a/examples/dynamic_rx_flow_example.cpp b/examples/dynamic_rx_flow_example.cpp new file mode 100644 index 0000000..f5b3c35 --- /dev/null +++ b/examples/dynamic_rx_flow_example.cpp @@ -0,0 +1,442 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "raw_bench_common.h" +#include + +namespace { + +using Clock = std::chrono::steady_clock; + +struct Args { + std::string config_path; + int drop_ms = 1000; + int active_ms = 1000; + int drain_ms = 200; + double target_gbps = 0.0; + double tolerance = 0.35; +}; + +struct RxCounter { + std::atomic packets{0}; + std::atomic bytes{0}; + std::atomic bursts{0}; +}; + +Args parse_args(int argc, char** argv) { + if (argc < 2) { + throw std::runtime_error( + "Usage: dynamic_rx_flow_example [--target-gbps G] " + "[--drop-ms N] [--active-ms N] [--drain-ms N] [--tolerance R]"); + } + + Args args; + args.config_path = argv[1]; + for (int i = 2; i < argc; ++i) { + const std::string opt = argv[i]; + if (i + 1 >= argc) { + throw std::runtime_error("Missing value for " + opt); + } + const std::string value = argv[++i]; + if (opt == "--target-gbps") { + args.target_gbps = std::stod(value); + } else if (opt == "--drop-ms") { + args.drop_ms = std::stoi(value); + } else if (opt == "--active-ms") { + args.active_ms = std::stoi(value); + } else if (opt == "--drain-ms") { + args.drain_ms = std::stoi(value); + } else if (opt == "--tolerance") { + args.tolerance = std::stod(value); + } else { + throw std::runtime_error("Unknown option: " + opt); + } + } + + if (args.drop_ms < 0 || args.active_ms <= 0 || args.drain_ms < 0) { + throw std::runtime_error("Timing values must be non-negative, and active-ms must be positive"); + } + if (args.tolerance < 0.0) { + throw std::runtime_error("tolerance must be non-negative"); + } + return args; +} + +uint32_t parse_ipv4_host_order(const std::string& addr) { + uint32_t value = 0; + if (inet_pton(AF_INET, addr.c_str(), &value) != 1) { + throw std::runtime_error("Invalid IPv4 address: " + addr); + } + return ntohl(value); +} + +uint16_t single_udp_port(const std::string& spec, const char* field) { + const auto ports = daqiri::bench::parse_udp_ports(spec); + if (ports.size() != 1) { + throw std::runtime_error(std::string(field) + " must be one UDP port for this example"); + } + return ports.front(); +} + +void rx_worker(const daqiri::bench::RawBenchRxConfig& cfg, + RxCounter& counter, + std::atomic& stop) { + const int port_id = daqiri::get_port_id(cfg.interface_name); + if (port_id < 0) { + std::cerr << "Invalid RX interface_name: " << cfg.interface_name << "\n"; + stop.store(true); + return; + } + + while (!stop.load()) { + daqiri::BurstParams* burst = nullptr; + if (daqiri::get_rx_burst(&burst, port_id, cfg.queue_id) != daqiri::Status::SUCCESS || + burst == nullptr) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + continue; + } + + counter.packets.fetch_add(static_cast(daqiri::get_num_packets(burst)), + std::memory_order_relaxed); + counter.bytes.fetch_add(daqiri::get_burst_tot_byte(burst), std::memory_order_relaxed); + counter.bursts.fetch_add(1, std::memory_order_relaxed); + daqiri::free_all_packets_and_burst_rx(burst); + } +} + +void tx_worker(const daqiri::bench::RawBenchTxConfig& cfg, + daqiri::bench::TokenBucketPacer& pacer, + std::atomic& stop) { + const int port_id = daqiri::get_port_id(cfg.interface_name); + if (port_id < 0) { + std::cerr << "Invalid TX interface_name: " << cfg.interface_name << "\n"; + stop.store(true); + return; + } + + char eth_dst[6] = {0}; + char eth_src[6] = {0}; + daqiri::format_eth_addr(eth_src, cfg.eth_src_addr); + daqiri::format_eth_addr(eth_dst, cfg.eth_dst_addr); + + const uint32_t ip_src = parse_ipv4_host_order(cfg.ip_src_addr); + const uint32_t ip_dst = parse_ipv4_host_order(cfg.ip_dst_addr); + const uint16_t udp_src = single_udp_port(cfg.udp_src_port, "bench_tx.udp_src_port"); + const uint16_t udp_dst = single_udp_port(cfg.udp_dst_port, "bench_tx.udp_dst_port"); + const uint64_t packet_size = static_cast(cfg.header_size) + cfg.payload_size; + + std::vector packet_template(static_cast(packet_size)); + daqiri::bench::populate_udp_ipv4_headers(packet_template.data(), + cfg.header_size, + cfg.payload_size, + eth_src, + eth_dst, + ip_src, + ip_dst, + udp_src, + udp_dst); + for (uint32_t i = 0; i < cfg.payload_size; ++i) { + packet_template[static_cast(cfg.header_size) + i] = static_cast(i); + } + daqiri::bench::finalize_udp_ipv4_checksums(packet_template.data()); + + std::unordered_set initialized_tx_buffers; + cudaEvent_t copy_done = nullptr; + if (cudaEventCreateWithFlags(©_done, cudaEventDisableTiming) != cudaSuccess) { + std::cerr << "Failed to create CUDA copy completion event\n"; + stop.store(true); + return; + } + + uint64_t packets = 0; + uint64_t bursts = 0; + const auto t0 = Clock::now(); + + while (!stop.load()) { + auto* msg = daqiri::create_tx_burst_params(); + daqiri::set_header(msg, + static_cast(port_id), + static_cast(cfg.queue_id), + cfg.batch_size, + 1); + + if (!daqiri::is_tx_burst_available(msg)) { + daqiri::free_tx_metadata(msg); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + continue; + } + + if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) { + daqiri::free_tx_metadata(msg); + continue; + } + + bool failed = false; + const int num_pkts = static_cast(daqiri::get_num_packets(msg)); + for (int i = 0; i < num_pkts; ++i) { + void* pkt = daqiri::get_segment_packet_ptr(msg, 0, i); + if (initialized_tx_buffers.insert(pkt).second) { + if (cudaMemcpyAsync(pkt, + packet_template.data(), + packet_template.size(), + cudaMemcpyHostToDevice, + nullptr) != cudaSuccess || + cudaEventRecord(copy_done, nullptr) != cudaSuccess || + cudaEventSynchronize(copy_done) != cudaSuccess) { + initialized_tx_buffers.erase(pkt); + failed = true; + break; + } + } + + if (daqiri::set_packet_lengths(msg, i, {static_cast(packet_size)}) != + daqiri::Status::SUCCESS) { + failed = true; + break; + } + } + + if (failed) { + daqiri::free_all_packets_and_burst_tx(msg); + continue; + } + + if (daqiri::send_tx_burst(msg) == daqiri::Status::SUCCESS) { + packets += static_cast(num_pkts); + ++bursts; + pacer.wait_for_bytes(static_cast(num_pkts) * packet_size, stop); + } + } + + const double secs = std::chrono::duration(Clock::now() - t0).count(); + std::cout << "TX complete: interface=" << cfg.interface_name + << " queue=" << cfg.queue_id + << " packets=" << packets + << " bursts=" << bursts + << " seconds=" << secs + << "\n"; + cudaEventDestroy(copy_done); +} + +daqiri::FlowId wait_for_flow_op(daqiri::FlowOpId op_id, + daqiri::FlowOpType expected_type, + const std::chrono::seconds timeout) { + const auto deadline = Clock::now() + timeout; + while (Clock::now() < deadline) { + daqiri::FlowOpResult result; + const auto status = daqiri::poll_flow_op(&result); + if (status == daqiri::Status::NOT_READY) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + if (status != daqiri::Status::SUCCESS) { + throw std::runtime_error("poll_flow_op failed"); + } + if (result.op_id_ != op_id) { + continue; + } + if (result.type_ != expected_type) { + throw std::runtime_error("flow operation completed with unexpected type"); + } + if (result.status_ != daqiri::Status::SUCCESS) { + throw std::runtime_error("flow operation completed with failure status"); + } + return result.flow_id_; + } + + throw std::runtime_error("timed out waiting for dynamic flow operation"); +} + +daqiri::FlowId add_dynamic_udp_flow(int port_id, + uint16_t queue_id, + uint16_t udp_src, + uint16_t udp_dst) { + daqiri::FlowRuleConfig flow; + flow.name_ = "dynamic_udp_" + std::to_string(udp_dst) + "_q" + std::to_string(queue_id); + flow.action_.type_ = daqiri::FlowType::QUEUE; + flow.action_.id_ = queue_id; + flow.match_.type_ = daqiri::FlowMatchType::IPV4_UDP; + flow.match_.udp_src_ = udp_src; + flow.match_.udp_dst_ = udp_dst; + + daqiri::FlowOpId op_id = 0; + const auto status = daqiri::add_rx_flow_async(port_id, flow, &op_id); + if (status != daqiri::Status::SUCCESS) { + throw std::runtime_error("add_rx_flow_async was not accepted"); + } + return wait_for_flow_op(op_id, daqiri::FlowOpType::ADD_RX, std::chrono::seconds(5)); +} + +void delete_dynamic_flow(daqiri::FlowId flow_id) { + daqiri::FlowOpId op_id = 0; + const auto status = daqiri::delete_flow_async(flow_id, &op_id); + if (status != daqiri::Status::SUCCESS) { + throw std::runtime_error("delete_flow_async was not accepted"); + } + wait_for_flow_op(op_id, daqiri::FlowOpType::DELETE, std::chrono::seconds(5)); +} + +uint64_t packets(const RxCounter& counter) { + return counter.packets.load(std::memory_order_relaxed); +} + +void print_rx_counter(const daqiri::bench::RawBenchRxConfig& cfg, + const RxCounter& counter) { + std::cout << "RX complete: interface=" << cfg.interface_name + << " queue=" << cfg.queue_id + << " packets=" << counter.packets.load(std::memory_order_relaxed) + << " bytes=" << counter.bytes.load(std::memory_order_relaxed) + << " bursts=" << counter.bursts.load(std::memory_order_relaxed) + << "\n"; +} + +} // namespace + +int main(int argc, char** argv) { + bool daqiri_initialized = false; + std::atomic stop{false}; + std::thread tx_thread; + std::vector rx_threads; + + try { + const Args args = parse_args(argc, argv); + const YAML::Node root = YAML::LoadFile(args.config_path); + const auto rx_configs = daqiri::bench::parse_rx_configs(root); + const auto tx_configs = daqiri::bench::parse_tx_configs(root); + if (rx_configs.size() != 2 || rx_configs[0].queue_id != 0 || rx_configs[1].queue_id != 1) { + throw std::runtime_error("Config must define bench_rx entries for queue 0 and queue 1"); + } + if (tx_configs.size() != 1) { + throw std::runtime_error("Config must define exactly one bench_tx entry"); + } + + const uint16_t udp_src = single_udp_port(tx_configs[0].udp_src_port, "bench_tx.udp_src_port"); + const uint16_t udp_dst = single_udp_port(tx_configs[0].udp_dst_port, "bench_tx.udp_dst_port"); + + if (daqiri::daqiri_init(args.config_path) != daqiri::Status::SUCCESS) { + throw std::runtime_error("daqiri_init failed"); + } + daqiri_initialized = true; + + const int rx_port_id = daqiri::get_port_id(rx_configs[0].interface_name); + if (rx_port_id < 0) { + throw std::runtime_error("Invalid RX interface_name: " + rx_configs[0].interface_name); + } + + std::vector counters(2); + rx_threads.reserve(rx_configs.size()); + for (size_t i = 0; i < rx_configs.size(); ++i) { + rx_threads.emplace_back(rx_worker, std::cref(rx_configs[i]), std::ref(counters[i]), std::ref(stop)); + } + + daqiri::bench::TokenBucketPacer pacer(args.target_gbps); + tx_thread = std::thread(tx_worker, std::cref(tx_configs[0]), std::ref(pacer), std::ref(stop)); + + std::cout << "Initial drop window: UDP " << udp_src << " -> " << udp_dst + << " has no configured flow for " << args.drop_ms << " ms\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(args.drop_ms)); + const uint64_t dropped_window_packets = packets(counters[0]) + packets(counters[1]); + if (dropped_window_packets != 0) { + throw std::runtime_error("Packets arrived during the initial flow-isolated drop window"); + } + + std::cout << "Adding dynamic UDP flow to RX queue 0 for " << args.active_ms << " ms\n"; + const daqiri::FlowId q0_flow = add_dynamic_udp_flow(rx_port_id, 0, udp_src, udp_dst); + std::this_thread::sleep_for(std::chrono::milliseconds(args.active_ms)); + delete_dynamic_flow(q0_flow); + std::this_thread::sleep_for(std::chrono::milliseconds(args.drain_ms)); + const uint64_t q0_after = packets(counters[0]); + const uint64_t q1_after_q0 = packets(counters[1]); + + std::cout << "Adding dynamic UDP flow to RX queue 1 for " << args.active_ms << " ms\n"; + const daqiri::FlowId q1_flow = add_dynamic_udp_flow(rx_port_id, 1, udp_src, udp_dst); + std::this_thread::sleep_for(std::chrono::milliseconds(args.active_ms)); + delete_dynamic_flow(q1_flow); + std::this_thread::sleep_for(std::chrono::milliseconds(args.drain_ms)); + + stop.store(true); + if (tx_thread.joinable()) { + tx_thread.join(); + } + for (auto& thread : rx_threads) { + if (thread.joinable()) { + thread.join(); + } + } + + print_rx_counter(rx_configs[0], counters[0]); + print_rx_counter(rx_configs[1], counters[1]); + + const uint64_t q0_packets = packets(counters[0]); + const uint64_t q1_packets = packets(counters[1]); + if (q0_packets == 0 || q1_packets == 0) { + throw std::runtime_error("Expected nonzero packet counts on both RX queues"); + } + if (q1_after_q0 != 0) { + throw std::runtime_error("Queue 1 received packets while only the queue 0 flow was active"); + } + if (q0_packets < q0_after) { + throw std::runtime_error("Queue 0 packet counter regressed"); + } + + const auto min_packets = static_cast(std::min(q0_packets, q1_packets)); + const auto max_packets = static_cast(std::max(q0_packets, q1_packets)); + const double relative_delta = (max_packets - min_packets) / max_packets; + std::cout << "Queue packet relative delta=" << relative_delta + << " tolerance=" << args.tolerance << "\n"; + if (relative_delta > args.tolerance) { + throw std::runtime_error("RX queue packet counts are not similar"); + } + + daqiri::print_stats(); + daqiri::shutdown(); + daqiri_initialized = false; + return 0; + } catch (const std::exception& e) { + std::cerr << "dynamic_rx_flow_example failed: " << e.what() << "\n"; + stop.store(true); + if (tx_thread.joinable()) { + tx_thread.join(); + } + for (auto& thread : rx_threads) { + if (thread.joinable()) { + thread.join(); + } + } + if (daqiri_initialized) { + daqiri::shutdown(); + } + return 1; + } +} diff --git a/include/daqiri/common.h b/include/daqiri/common.h index dee1b5d..ea7467c 100644 --- a/include/daqiri/common.h +++ b/include/daqiri/common.h @@ -167,9 +167,58 @@ uint32_t get_packet_length(BurstParams *burst, int idx); * * @param burst Burst structure containing packets * @param idx Index of packet - * @return uint16_t Flow ID + * @return FlowId Flow ID, or 0 when no flow matched */ -uint16_t get_packet_flow_id(BurstParams* burst, int idx); +FlowId get_packet_flow_id(BurstParams* burst, int idx); + +/** + * @brief Enqueue creation of a dynamic RX flow rule. + * + * The flow ID is allocated by DAQIRI and returned in the completion result from + * poll_flow_op(). Packets matching the dynamic rule are marked with the same + * ID, so get_packet_flow_id() can be used to identify and later delete the rule. + * + * @param port Port ID of interface + * @param flow Rule match and queue action to install + * @param op_id Output operation ID used to track completion + * @return Status indicating whether the operation was accepted + */ +Status add_rx_flow_async(int port, const FlowRuleConfig &flow, FlowOpId *op_id); + +/** + * @brief Enqueue creation of multiple dynamic RX flow rules. + * + * The batch add completion is delivered as a single FlowOpResult. On success, + * FlowOpResult::flow_ids_ contains one FlowId per input rule, in input order. + * If the batch completion fails, nonzero entries in flow_ids_ were installed + * and can be deleted with delete_flow_async(); zero entries were not installed. + * + * @param port Port ID of interface + * @param flows Rule matches and queue actions to install + * @param op_id Output operation ID used to track completion + * @return Status indicating whether the operation was accepted + */ +Status add_rx_flows_async(int port, const std::vector &flows, FlowOpId *op_id); + +/** + * @brief Enqueue deletion of a dynamic flow rule. + * + * Only flows created by add_rx_flow_async() or add_rx_flows_async() are + * deletable through this API. + * + * @param flow_id Dynamic flow ID returned by an add completion + * @param op_id Output operation ID used to track completion + * @return Status indicating whether the operation was accepted + */ +Status delete_flow_async(FlowId flow_id, FlowOpId *op_id); + +/** + * @brief Poll one dynamic flow operation completion. + * + * @param result Output completion details + * @return SUCCESS when a completion was returned, NOT_READY when none are ready + */ +Status poll_flow_op(FlowOpResult *result); /** * @brief Get the hardware RX timestamp of a packet in nanoseconds @@ -1111,6 +1160,13 @@ template <> struct YAML::convert { rx_cfg.flow_isolation_ = rx["flow_isolation"].as(); } catch (const std::exception& e) { rx_cfg.flow_isolation_ = false; } + try { + rx_cfg.dynamic_flow_capacity_ = + rx["dynamic_flow_capacity"].as(); + } catch (const std::exception& e) { + rx_cfg.dynamic_flow_capacity_ = daqiri::DEFAULT_DYNAMIC_FLOW_CAPACITY; + } + for (const auto &q_item : rx["queues"]) { daqiri::RxQueueConfig q; if (!parse_rx_queue_config( @@ -1128,13 +1184,20 @@ template <> struct YAML::convert { rx_cfg.queues_.emplace_back(std::move(q)); } - for (const auto &flow_item : rx["flows"]) { - daqiri::FlowConfig flow; - if (!parse_flow_config(flow_item, flow)) { - DAQIRI_LOG_ERROR("Failed to parse FlowConfig"); + if (rx["flows"].IsDefined()) { + if (!rx["flows"].IsSequence()) { + DAQIRI_LOG_ERROR("'rx.flows' must be a sequence for interface '{}'", + ifcfg.name_); return false; } - rx_cfg.flows_.emplace_back(std::move(flow)); + for (const auto &flow_item : rx["flows"]) { + daqiri::FlowConfig flow; + if (!parse_flow_config(flow_item, flow)) { + DAQIRI_LOG_ERROR("Failed to parse FlowConfig"); + return false; + } + rx_cfg.flows_.emplace_back(std::move(flow)); + } } try { diff --git a/include/daqiri/types.h b/include/daqiri/types.h index a8a4041..c9ebb93 100644 --- a/include/daqiri/types.h +++ b/include/daqiri/types.h @@ -44,6 +44,10 @@ static inline constexpr uint32_t MAX_INTERFACES = 4; static inline constexpr int MAX_NUM_SEGS = 4; static inline constexpr uint32_t DAQIRI_BURST_FLAG_REORDERED = (1U << 28); static inline constexpr uint32_t DAQIRI_BURST_FLAG_REORDER_TIMEOUT = (1U << 29); +static inline constexpr uint32_t DEFAULT_DYNAMIC_FLOW_CAPACITY = 1024; + +using FlowId = uint32_t; +using FlowOpId = uint64_t; struct ReorderBurstInfo { uint64_t batch_id; @@ -541,36 +545,57 @@ struct TxQueueConfig { enum class FlowType { QUEUE }; struct FlowAction { - FlowType type_; - uint16_t id_; + FlowType type_ = FlowType::QUEUE; + uint16_t id_ = 0; }; struct FlexItemMatch { - uint16_t flex_item_id_; - uint32_t val_; - uint32_t mask_; + uint16_t flex_item_id_ = 0; + uint32_t val_ = 0; + uint32_t mask_ = 0; }; enum class FlowMatchType { - NORMAL, + IPV4_UDP, FLEX_ITEM, }; struct FlowMatch { - FlowMatchType type_; - uint16_t udp_src_; - uint16_t udp_dst_; - uint16_t ipv4_len_; - in_addr_t ipv4_src_; - in_addr_t ipv4_dst_; + FlowMatchType type_ = FlowMatchType::IPV4_UDP; + uint16_t udp_src_ = 0; + uint16_t udp_dst_ = 0; + uint16_t ipv4_len_ = 0; + in_addr_t ipv4_src_ = INADDR_ANY; + in_addr_t ipv4_dst_ = INADDR_ANY; FlexItemMatch flex_item_match_; }; struct FlowConfig { std::string name_; - uint16_t id_; + FlowId id_ = 0; + FlowAction action_; + FlowMatch match_; + void* backend_config_ = nullptr; // Filled in by operator +}; + +struct FlowRuleConfig { + std::string name_; FlowAction action_; FlowMatch match_; - void* backend_config_; // Filled in by operator + void* backend_config_ = nullptr; // Filled in by operator +}; + +enum class FlowOpType { + ADD_RX, + ADD_RX_BATCH, + DELETE, +}; + +struct FlowOpResult { + FlowOpId op_id_ = 0; + FlowOpType type_ = FlowOpType::ADD_RX; + Status status_ = Status::NOT_READY; + FlowId flow_id_ = 0; + std::vector flow_ids_; }; struct CommonConfig { @@ -756,7 +781,7 @@ struct ReorderConfig { std::string reorder_type_; std::string memory_region_; uint32_t payload_byte_offset_ = 0; - std::vector flow_ids_; + std::vector flow_ids_; ReorderMethod method_ = ReorderMethod::INVALID; ReorderSeqBatchNumberConfig seq_batch_number_; ReorderSeqPacketsPerBatchConfig seq_packets_per_batch_; @@ -766,6 +791,7 @@ struct ReorderConfig { struct RxConfig { bool flow_isolation_ = false; bool hardware_timestamps_ = false; + uint32_t dynamic_flow_capacity_ = DEFAULT_DYNAMIC_FLOW_CAPACITY; std::vector queues_; std::vector flows_; std::vector flex_items_; diff --git a/python/daqiri_common_pybind.cpp b/python/daqiri_common_pybind.cpp index fac4ab7..8736fb3 100644 --- a/python/daqiri_common_pybind.cpp +++ b/python/daqiri_common_pybind.cpp @@ -374,9 +374,14 @@ void bind_enums(py::module_ &m) { py::enum_(m, "FlowType").value("QUEUE", FlowType::QUEUE); py::enum_(m, "FlowMatchType") - .value("NORMAL", FlowMatchType::NORMAL) + .value("IPV4_UDP", FlowMatchType::IPV4_UDP) .value("FLEX_ITEM", FlowMatchType::FLEX_ITEM); + py::enum_(m, "FlowOpType") + .value("ADD_RX", FlowOpType::ADD_RX) + .value("ADD_RX_BATCH", FlowOpType::ADD_RX_BATCH) + .value("DELETE", FlowOpType::DELETE); + py::enum_(m, "ReorderMethod") .value("INVALID", ReorderMethod::INVALID) .value("SEQ_BATCH_NUMBER", ReorderMethod::SEQ_BATCH_NUMBER) @@ -525,6 +530,20 @@ void bind_config_types(py::module_ &m) { .def_readwrite("action", &FlowConfig::action_) .def_readwrite("match", &FlowConfig::match_); + py::class_(m, "FlowRuleConfig") + .def(py::init<>()) + .def_readwrite("name", &FlowRuleConfig::name_) + .def_readwrite("action", &FlowRuleConfig::action_) + .def_readwrite("match", &FlowRuleConfig::match_); + + py::class_(m, "FlowOpResult") + .def(py::init<>()) + .def_readwrite("op_id", &FlowOpResult::op_id_) + .def_readwrite("type", &FlowOpResult::type_) + .def_readwrite("status", &FlowOpResult::status_) + .def_readwrite("flow_id", &FlowOpResult::flow_id_) + .def_readwrite("flow_ids", &FlowOpResult::flow_ids_); + py::class_(m, "CommonConfig") .def(py::init<>()) .def_readwrite("version", &CommonConfig::version) @@ -607,6 +626,8 @@ void bind_config_types(py::module_ &m) { .def(py::init<>()) .def_readwrite("flow_isolation", &RxConfig::flow_isolation_) .def_readwrite("hardware_timestamps", &RxConfig::hardware_timestamps_) + .def_readwrite("dynamic_flow_capacity", + &RxConfig::dynamic_flow_capacity_) .def_readwrite("queues", &RxConfig::queues_) .def_readwrite("flows", &RxConfig::flows_) .def_readwrite("flex_items", &RxConfig::flex_items_) @@ -929,6 +950,37 @@ PYBIND11_MODULE(_daqiri, m) { m.def("get_port_id", &get_port_id, "key"_a); m.def("drop_all_traffic", &drop_all_traffic, "port"_a); m.def("allow_all_traffic", &allow_all_traffic, "port"_a); + m.def( + "add_rx_flow_async", + [](int port, const FlowRuleConfig &flow) { + FlowOpId op_id = 0; + const Status status = add_rx_flow_async(port, flow, &op_id); + return py::make_tuple(status, op_id); + }, + "port"_a, "flow"_a); + m.def( + "add_rx_flows_async", + [](int port, const std::vector &flows) { + FlowOpId op_id = 0; + const Status status = add_rx_flows_async(port, flows, &op_id); + return py::make_tuple(status, op_id); + }, + "port"_a, "flows"_a); + m.def( + "delete_flow_async", + [](FlowId flow_id) { + FlowOpId op_id = 0; + const Status status = delete_flow_async(flow_id, &op_id); + return py::make_tuple(status, op_id); + }, + "flow_id"_a); + m.def( + "poll_flow_op", + []() { + FlowOpResult result; + const Status status = poll_flow_op(&result); + return py::make_tuple(status, result); + }); m.def("get_num_rx_queues", &get_num_rx_queues, "port_id"_a); m.def("flush_port_queue", &flush_port_queue, "port"_a, "queue"_a); diff --git a/src/common.cpp b/src/common.cpp index bca3d31..3cda5b0 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -103,11 +103,31 @@ uint32_t get_packet_length(BurstParams* burst, int idx) { return g_daqiri_mgr->get_packet_length(burst, idx); } -uint16_t get_packet_flow_id(BurstParams* burst, int idx) { +FlowId get_packet_flow_id(BurstParams* burst, int idx) { ASSERT_DAQIRI_MGR_INITIALIZED(); return g_daqiri_mgr->get_packet_flow_id(burst, idx); } +Status add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->add_rx_flow_async(port, flow, op_id); +} + +Status add_rx_flows_async(int port, const std::vector& flows, FlowOpId* op_id) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->add_rx_flows_async(port, flows, op_id); +} + +Status delete_flow_async(FlowId flow_id, FlowOpId* op_id) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->delete_flow_async(flow_id, op_id); +} + +Status poll_flow_op(FlowOpResult* result) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->poll_flow_op(result); +} + Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) { ASSERT_DAQIRI_MGR_INITIALIZED(); return g_daqiri_mgr->get_packet_rx_timestamp(burst, idx, timestamp_ns); @@ -589,7 +609,7 @@ bool YAML::convert::parse_flow_config( struct in_addr addr; try { flow.name_ = flow_item["name"].as(); - flow.id_ = flow_item["id"].as(); + flow.id_ = flow_item["id"].as(); flow.action_.type_ = daqiri::FlowType::QUEUE; flow.action_.id_ = flow_item["action"]["id"].as(); } catch (const std::exception& e) { @@ -598,7 +618,7 @@ bool YAML::convert::parse_flow_config( } memset(&flow.match_, 0, sizeof(flow.match_)); - flow.match_.type_ = daqiri::FlowMatchType::NORMAL; + flow.match_.type_ = daqiri::FlowMatchType::IPV4_UDP; try { flow.match_.udp_src_ = flow_item["match"]["udp_src"].as(); @@ -743,7 +763,7 @@ bool YAML::convert::parse_reorder_config( } for (const auto& flow_id_node : reorder_item["flow_ids"]) { - reorder_config.flow_ids_.push_back(flow_id_node.as()); + reorder_config.flow_ids_.push_back(flow_id_node.as()); } if (reorder_config.flow_ids_.empty()) { DAQIRI_LOG_ERROR("Reorder config '{}' requires at least one flow ID", diff --git a/src/manager.cpp b/src/manager.cpp index f1223a1..b55273d 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -784,6 +784,36 @@ Status Manager::allow_all_traffic(int port) { return Status::NOT_SUPPORTED; } +Status Manager::add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) { + (void)port; + (void)flow; + (void)op_id; + DAQIRI_LOG_ERROR("add_rx_flow_async not implemented for this manager type"); + return Status::NOT_SUPPORTED; +} + +Status Manager::add_rx_flows_async(int port, + const std::vector& flows, + FlowOpId* op_id) { + (void)port; + (void)flows; + (void)op_id; + DAQIRI_LOG_ERROR("add_rx_flows_async not implemented for this manager type"); + return Status::NOT_SUPPORTED; +} + +Status Manager::delete_flow_async(FlowId flow_id, FlowOpId* op_id) { + (void)flow_id; + (void)op_id; + DAQIRI_LOG_ERROR("delete_flow_async not implemented for this manager type"); + return Status::NOT_SUPPORTED; +} + +Status Manager::poll_flow_op(FlowOpResult* result) { + (void)result; + return Status::NOT_SUPPORTED; +} + Status Manager::get_rx_burst(BurstParams** burst, int port_id) { // Check if the port_id is valid if (port_id < 0 || port_id >= static_cast(cfg_.ifs_.size())) { diff --git a/src/manager.h b/src/manager.h index cf63319..a06036f 100644 --- a/src/manager.h +++ b/src/manager.h @@ -47,7 +47,7 @@ class Manager { virtual uint32_t get_packet_length(BurstParams* burst, int idx) = 0; virtual void* get_segment_packet_ptr(BurstParams* burst, int seg, int idx) = 0; virtual uint32_t get_segment_packet_length(BurstParams* burst, int seg, int idx) = 0; - virtual uint16_t get_packet_flow_id(BurstParams* burst, int idx) = 0; + virtual FlowId get_packet_flow_id(BurstParams* burst, int idx) = 0; virtual Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) = 0; virtual void* get_packet_extra_info(BurstParams* burst, int idx) = 0; virtual Status get_tx_packet_burst(BurstParams* burst) = 0; @@ -89,6 +89,12 @@ class Manager { virtual Status get_mac_addr(int port, char* mac) = 0; virtual Status drop_all_traffic(int port); virtual Status allow_all_traffic(int port); + virtual Status add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id); + virtual Status add_rx_flows_async(int port, + const std::vector& flows, + FlowOpId* op_id); + virtual Status delete_flow_async(FlowId flow_id, FlowOpId* op_id); + virtual Status poll_flow_op(FlowOpResult* result); virtual int get_port_id(const std::string& key) final; // NOLINT(readability/inheritance) virtual bool validate_config() const; virtual uint16_t get_num_rx_queues(int port_id) const; diff --git a/src/managers/dpdk/daqiri_dpdk_mgr.cpp b/src/managers/dpdk/daqiri_dpdk_mgr.cpp index 029f45c..0e90b07 100644 --- a/src/managers/dpdk/daqiri_dpdk_mgr.cpp +++ b/src/managers/dpdk/daqiri_dpdk_mgr.cpp @@ -302,7 +302,7 @@ bool DpdkMgr::init_reorder_queue_state(const InterfaceConfig& intf, const RxQueu const auto key = generate_queue_key(intf.port_id_, qcfg.common_.id_); ReorderQueueState qstate; - std::unordered_map flow_id_to_queue; + std::unordered_map flow_id_to_queue; for (const auto& flow : intf.rx_.flows_) { if (flow_id_to_queue.find(flow.id_) != flow_id_to_queue.end()) { DAQIRI_LOG_ERROR("Duplicate flow ID {} in interface '{}'", flow.id_, intf.name_); @@ -314,7 +314,7 @@ bool DpdkMgr::init_reorder_queue_state(const InterfaceConfig& intf, const RxQueu for (const auto& reorder_cfg : intf.rx_.reorder_configs_) { const bool use_gpu_backend = reorder_cfg.reorder_type_ == "gpu"; int flow_queue_id = -1; - std::vector queue_flow_ids; + std::vector queue_flow_ids; queue_flow_ids.reserve(reorder_cfg.flow_ids_.size()); for (const auto flow_id : reorder_cfg.flow_ids_) { @@ -1297,7 +1297,7 @@ Status DpdkMgr::process_burst_for_reorder(uint32_t key, ReorderQueueState& qstat qstate.unmatched_count = 0; for (int i = 0; i < num_pkts; ++i) { - const uint16_t flow_id = get_packet_flow_id(burst, i); + const FlowId flow_id = get_packet_flow_id(burst, i); const auto flow_it = qstate.flow_id_to_plan.find(flow_id); if (flow_it == qstate.flow_id_to_plan.end()) { if (qstate.unmatched_count >= unmatched_indices.size()) { @@ -1855,6 +1855,11 @@ void DpdkMgr::initialize() { for (auto& conf : local_port_conf) { conf = conf_eth_port; } + if (!reserve_static_flow_ids()) { + DAQIRI_LOG_CRITICAL("Failed to reserve static flow IDs"); + return; + } + /* Initialize DPDK params */ constexpr int max_nargs = 32; constexpr int max_arg_size = 64; @@ -2270,10 +2275,9 @@ void DpdkMgr::initialize() { DAQIRI_LOG_INFO("Successfully configured ethdev"); } - ret = - rte_eth_dev_adjust_nb_rx_tx_desc(intf.port_id_, - &default_num_rx_desc, - &default_num_tx_desc); + ret = rte_eth_dev_adjust_nb_rx_tx_desc(intf.port_id_, + &default_num_rx_desc, + &default_num_tx_desc); if (ret < 0) { DAQIRI_LOG_CRITICAL( "Cannot adjust number of descriptors: err={}, port={}", ret, intf.port_id_); @@ -2285,6 +2289,8 @@ void DpdkMgr::initialize() { rte_eth_macaddr_get(intf.port_id_, &conf_ports_eth_addr[intf.port_id_]); + configure_flow_api_for_port(intf.port_id_, rx.dynamic_flow_capacity_); + if (intf.rx_.flow_isolation_) { struct rte_flow_error error; ret = rte_flow_isolate(intf.port_id_, 1, &error); @@ -2384,7 +2390,8 @@ void DpdkMgr::initialize() { for (const auto& flow : rx.flows_) { DAQIRI_LOG_INFO("Adding RX flow {}", flow.name_); if (flow.match_.type_ == FlowMatchType::FLEX_ITEM) { - add_flex_item_flow(intf.port_id_, flow.match_.flex_item_match_, flow.action_.id_); + add_flex_item_flow( + intf.port_id_, flow.match_.flex_item_match_, flow.action_.id_, flow.id_); } else { add_flow(intf.port_id_, flow); } @@ -2544,6 +2551,1006 @@ int DpdkMgr::setup_pools_and_rings(int max_rx_batch, int max_tx_batch) { #define MAX_PATTERN_NUM 5 #define MAX_ACTION_NUM 4 +struct FlowTemplateCreateStorage { + struct rte_flow_item pattern[MAX_PATTERN_NUM]; + struct rte_flow_action action[MAX_ACTION_NUM]; + struct rte_flow_item_ipv4 ip_spec; + struct rte_flow_item_udp udp_spec; + struct rte_flow_action_mark mark; + struct rte_flow_action_queue queue; +}; + +static constexpr uint32_t kDynamicFlowQueueId = 0; +static constexpr uint32_t kRxFlowGroup = 3; +static constexpr uint32_t kRxFlowPriority = 1; +static constexpr size_t kMaxIpv4UdpFlowTemplateFieldsForSingleTable = 7; + +enum class Ipv4UdpFlowTemplateField { + IPV4_LEN, + IPV4_SRC, + IPV4_DST, + UDP_SRC, + UDP_DST, +}; + +struct Ipv4UdpFlowTemplateKey { + std::vector fields; +}; + +static bool operator==(const Ipv4UdpFlowTemplateKey& lhs, + const Ipv4UdpFlowTemplateKey& rhs) { + return lhs.fields == rhs.fields; +} + +static const std::vector& ipv4_udp_flow_template_fields() { + static const std::vector fields = { + Ipv4UdpFlowTemplateField::IPV4_LEN, + Ipv4UdpFlowTemplateField::IPV4_SRC, + Ipv4UdpFlowTemplateField::IPV4_DST, + Ipv4UdpFlowTemplateField::UDP_SRC, + Ipv4UdpFlowTemplateField::UDP_DST, + }; + return fields; +} + +static void append_ipv4_udp_flow_template_keys(size_t field_idx, + std::vector& selected, + std::vector& keys) { + const auto& fields = ipv4_udp_flow_template_fields(); + if (field_idx == fields.size()) { + keys.push_back({selected}); + return; + } + + append_ipv4_udp_flow_template_keys(field_idx + 1, selected, keys); + selected.push_back(fields[field_idx]); + append_ipv4_udp_flow_template_keys(field_idx + 1, selected, keys); + selected.pop_back(); +} + +static const std::vector& ipv4_udp_flow_template_keys() { + static const std::vector keys = [] { + std::vector generated_keys; + if (ipv4_udp_flow_template_fields().size() > + kMaxIpv4UdpFlowTemplateFieldsForSingleTable) { + return generated_keys; + } + + std::vector selected; + append_ipv4_udp_flow_template_keys(0, selected, generated_keys); + return generated_keys; + }(); + return keys; +} + +static Ipv4UdpFlowTemplateKey ipv4_udp_flow_template_key(const FlowMatch& match) { + Ipv4UdpFlowTemplateKey key; + if (match.ipv4_len_ > 0) { + key.fields.push_back(Ipv4UdpFlowTemplateField::IPV4_LEN); + } + if (match.ipv4_src_ != INADDR_ANY) { + key.fields.push_back(Ipv4UdpFlowTemplateField::IPV4_SRC); + } + if (match.ipv4_dst_ != INADDR_ANY) { + key.fields.push_back(Ipv4UdpFlowTemplateField::IPV4_DST); + } + if (match.udp_src_ > 0) { + key.fields.push_back(Ipv4UdpFlowTemplateField::UDP_SRC); + } + if (match.udp_dst_ > 0) { + key.fields.push_back(Ipv4UdpFlowTemplateField::UDP_DST); + } + return key; +} + +static bool ipv4_udp_flow_template_key_has_field(const Ipv4UdpFlowTemplateKey& key, + Ipv4UdpFlowTemplateField field) { + return std::find(key.fields.begin(), key.fields.end(), field) != key.fields.end(); +} + +static void build_ipv4_udp_template_pattern(const Ipv4UdpFlowTemplateKey& key, + struct rte_flow_item pattern[MAX_PATTERN_NUM]) { + static struct rte_flow_item_ipv4 ip_mask; + static struct rte_flow_item_udp udp_mask; + + memset(pattern, 0, sizeof(struct rte_flow_item) * MAX_PATTERN_NUM); + memset(&ip_mask, 0, sizeof(ip_mask)); + memset(&udp_mask, 0, sizeof(udp_mask)); + + pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; + pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; + pattern[2].type = RTE_FLOW_ITEM_TYPE_UDP; + pattern[3].type = RTE_FLOW_ITEM_TYPE_END; + + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::IPV4_LEN)) { + ip_mask.hdr.total_length = 0xffff; + } + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::IPV4_SRC)) { + ip_mask.hdr.src_addr = 0xffffffff; + } + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::IPV4_DST)) { + ip_mask.hdr.dst_addr = 0xffffffff; + } + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::IPV4_LEN) || + ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::IPV4_SRC) || + ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::IPV4_DST)) { + pattern[1].mask = &ip_mask; + } + + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::UDP_SRC)) { + udp_mask.hdr.src_port = 0xffff; + } + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::UDP_DST)) { + udp_mask.hdr.dst_port = 0xffff; + } + if (ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::UDP_SRC) || + ipv4_udp_flow_template_key_has_field(key, Ipv4UdpFlowTemplateField::UDP_DST)) { + pattern[2].mask = &udp_mask; + } +} + +void* DpdkMgr::flow_op_user_data(FlowOpId op_id) { + return reinterpret_cast(static_cast(op_id)); +} + +FlowOpId DpdkMgr::flow_op_id_from_user_data(void* user_data) { + return static_cast(reinterpret_cast(user_data)); +} + +FlowOpId DpdkMgr::allocate_flow_op_id() { + if (next_flow_op_id_ == 0) { next_flow_op_id_ = 1; } + return next_flow_op_id_++; +} + +FlowId DpdkMgr::allocate_dynamic_flow_id() { + while (next_dynamic_flow_id_ != 0) { + const FlowId candidate = next_dynamic_flow_id_++; + if (candidate == 0) { continue; } + if (static_flow_ids_.find(candidate) != static_flow_ids_.end()) { continue; } + if (dynamic_flows_.find(candidate) != dynamic_flows_.end()) { continue; } + return candidate; + } + return 0; +} + +bool DpdkMgr::reserve_static_flow_ids() { + static_flow_ids_.clear(); + for (const auto& intf : cfg_.ifs_) { + for (const auto& flow : intf.rx_.flows_) { + if (flow.id_ == 0) { + DAQIRI_LOG_ERROR("Static flow '{}' on interface '{}' uses reserved flow ID 0", + flow.name_, + intf.name_); + return false; + } + if (!static_flow_ids_.insert(flow.id_).second) { + DAQIRI_LOG_ERROR("Duplicate static flow ID {}", flow.id_); + return false; + } + } + } + next_dynamic_flow_id_ = 1; + return true; +} + +bool DpdkMgr::is_valid_rx_queue(int port, uint16_t queue_id) const { + if (port < 0 || port >= static_cast(cfg_.ifs_.size())) { return false; } + const auto& queues = cfg_.ifs_[port].rx_.queues_; + return std::any_of(queues.begin(), queues.end(), [queue_id](const RxQueueConfig& q) { + return q.common_.id_ == queue_id; + }); +} + +bool DpdkMgr::ipv4_udp_flow_template_index(const FlowMatch& match, + uint8_t* template_index) const { + if (template_index == nullptr) { return false; } + *template_index = 0; + + const auto key = ipv4_udp_flow_template_key(match); + if (key.fields.empty()) { return false; } + + const auto& keys = ipv4_udp_flow_template_keys(); + const auto key_it = std::find(keys.begin(), keys.end(), key); + if (key_it == keys.end()) { return false; } + + const auto distance = std::distance(keys.begin(), key_it); + if (distance < 0 || distance > std::numeric_limits::max()) { + return false; + } + *template_index = static_cast(distance); + return true; +} + +bool DpdkMgr::is_ipv4_udp_flow_match(const FlowMatch& match) const { + return match.type_ == FlowMatchType::IPV4_UDP && + !ipv4_udp_flow_template_key(match).fields.empty(); +} + +bool DpdkMgr::validate_dynamic_rx_flow(int port, const FlowRuleConfig& flow) const { + if (!initialized_) { + DAQIRI_LOG_ERROR("Cannot add dynamic RX flow before DAQIRI initialization"); + return false; + } + if (loopback_ == LoopbackType::LOOPBACK_TYPE_SW) { + DAQIRI_LOG_ERROR("Dynamic RX flows are not supported in software loopback mode"); + return false; + } + if (port < 0 || port >= static_cast(cfg_.ifs_.size())) { + DAQIRI_LOG_ERROR("Invalid dynamic RX flow port {}", port); + return false; + } + if (flow.action_.type_ != FlowType::QUEUE) { + DAQIRI_LOG_ERROR("Dynamic RX flow action type is not supported"); + return false; + } + if (!is_valid_rx_queue(port, flow.action_.id_)) { + DAQIRI_LOG_ERROR("Dynamic RX flow targets invalid port/queue {}/{}", port, flow.action_.id_); + return false; + } + if (is_ipv4_udp_flow_match(flow.match_)) { return true; } + if (flow.match_.type_ == FlowMatchType::FLEX_ITEM) { + const auto& flex_items = cfg_.ifs_[port].rx_.flex_items_; + if (flow.match_.flex_item_match_.flex_item_id_ >= flex_items.size()) { + DAQIRI_LOG_ERROR("Dynamic RX flow references invalid flex item ID {}", + flow.match_.flex_item_match_.flex_item_id_); + return false; + } + return true; + } + + DAQIRI_LOG_ERROR("Dynamic RX flow must define an IPv4/UDP or flex-item match"); + return false; +} + +void DpdkMgr::build_ipv4_udp_flow_pattern(const FlowMatch& match, + struct rte_flow_item pattern[MAX_PATTERN_NUM], + struct rte_flow_item_ipv4* ip_spec, + struct rte_flow_item_udp* udp_spec) const { + if (ip_spec == nullptr || udp_spec == nullptr) { return; } + + memset(pattern, 0, sizeof(struct rte_flow_item) * MAX_PATTERN_NUM); + memset(ip_spec, 0, sizeof(*ip_spec)); + memset(udp_spec, 0, sizeof(*udp_spec)); + + pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; + pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; + pattern[2].type = RTE_FLOW_ITEM_TYPE_UDP; + pattern[3].type = RTE_FLOW_ITEM_TYPE_END; + + bool has_ip_match = false; + if (match.ipv4_len_ > 0) { + ip_spec->hdr.total_length = htons(match.ipv4_len_); + has_ip_match = true; + } + if (match.ipv4_src_ != INADDR_ANY) { + ip_spec->hdr.src_addr = match.ipv4_src_; + has_ip_match = true; + } + if (match.ipv4_dst_ != INADDR_ANY) { + ip_spec->hdr.dst_addr = match.ipv4_dst_; + has_ip_match = true; + } + if (has_ip_match) { pattern[1].spec = ip_spec; } + + bool has_udp_match = false; + if (match.udp_src_ > 0) { + udp_spec->hdr.src_port = htons(match.udp_src_); + has_udp_match = true; + } + if (match.udp_dst_ > 0) { + udp_spec->hdr.dst_port = htons(match.udp_dst_); + has_udp_match = true; + } + if (has_udp_match) { pattern[2].spec = udp_spec; } +} + +void DpdkMgr::build_mark_queue_actions(FlowId flow_id, + uint16_t queue_id, + struct rte_flow_action action[MAX_ACTION_NUM], + struct rte_flow_action_mark* mark, + struct rte_flow_action_queue* queue) const { + if (mark == nullptr || queue == nullptr) { return; } + + memset(action, 0, sizeof(struct rte_flow_action) * MAX_ACTION_NUM); + memset(mark, 0, sizeof(*mark)); + memset(queue, 0, sizeof(*queue)); + + mark->id = flow_id; + queue->index = queue_id; + action[0].type = RTE_FLOW_ACTION_TYPE_MARK; + action[0].conf = mark; + action[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[1].conf = queue; + action[2].type = RTE_FLOW_ACTION_TYPE_END; +} + +Status DpdkMgr::enqueue_software_flow_completion(const FlowOpResult& result) { + ready_flow_ops_.push(result); + return Status::SUCCESS; +} + +bool DpdkMgr::configure_flow_api_for_port(uint16_t port, uint32_t capacity) { + if (port >= flow_template_states_.size()) { return false; } + auto& state = flow_template_states_[port]; + if (state.configured) { return true; } + + struct rte_flow_error error; + struct rte_flow_port_info port_info; + struct rte_flow_queue_info queue_info; + memset(&error, 0, sizeof(error)); + memset(&port_info, 0, sizeof(port_info)); + memset(&queue_info, 0, sizeof(queue_info)); + + int ret = rte_flow_info_get(port, &port_info, &queue_info, &error); + if (ret < 0 || port_info.max_nb_queues == 0) { + DAQIRI_LOG_WARN("DPDK async flow API is not available on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + state.capacity = capacity == 0 ? DEFAULT_DYNAMIC_FLOW_CAPACITY : capacity; + struct rte_flow_port_attr port_attr; + struct rte_flow_queue_attr queue_attr; + memset(&port_attr, 0, sizeof(port_attr)); + memset(&queue_attr, 0, sizeof(queue_attr)); + + if ((port_info.supported_flags & RTE_FLOW_PORT_FLAG_STRICT_QUEUE) != 0) { + port_attr.flags |= RTE_FLOW_PORT_FLAG_STRICT_QUEUE; + } + queue_attr.size = std::max(64, state.capacity * 2); + if (queue_info.max_size > 0) { queue_attr.size = std::min(queue_attr.size, queue_info.max_size); } + const struct rte_flow_queue_attr* queue_attrs[] = {&queue_attr}; + + ret = rte_flow_configure(port, &port_attr, 1, queue_attrs, &error); + if (ret < 0) { + DAQIRI_LOG_WARN("Failed to configure DPDK async flow API on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + state.configured = true; + state.flow_queue_id = kDynamicFlowQueueId; + DAQIRI_LOG_INFO("Configured DPDK async flow API on port {} with queue size {}", + port, + queue_attr.size); + return true; +} + +bool DpdkMgr::ensure_ipv4_udp_flow_template_table(uint16_t port) { + if (port >= flow_template_states_.size()) { return false; } + auto& state = flow_template_states_[port]; + if (state.templates_ready) { return true; } + if (!state.configured) { return false; } + + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + + const auto& template_keys = ipv4_udp_flow_template_keys(); + if (template_keys.empty()) { + DAQIRI_LOG_WARN("IPv4/UDP RX flow template field count {} cannot fit in one DPDK " + "template table; falling back to legacy flow create", + ipv4_udp_flow_template_fields().size()); + return false; + } + if (template_keys.size() > std::numeric_limits::max()) { + DAQIRI_LOG_WARN("DPDK supports at most {} pattern templates per table; {} IPv4/UDP RX " + "flow templates requested", + static_cast(std::numeric_limits::max()), + template_keys.size()); + return false; + } + + state.ipv4_udp_pattern_templates.assign(template_keys.size(), nullptr); + for (size_t pattern_idx = 0; pattern_idx < template_keys.size(); ++pattern_idx) { + struct rte_flow_pattern_template_attr pattern_attr; + struct rte_flow_item pattern[MAX_PATTERN_NUM]; + memset(&pattern_attr, 0, sizeof(pattern_attr)); + pattern_attr.ingress = 1; + pattern_attr.relaxed_matching = 1; + build_ipv4_udp_template_pattern(template_keys[pattern_idx], pattern); + + state.ipv4_udp_pattern_templates[pattern_idx] = + rte_flow_pattern_template_create(port, &pattern_attr, pattern, &error); + if (state.ipv4_udp_pattern_templates[pattern_idx] == nullptr) { + DAQIRI_LOG_WARN("Failed to create RX flow pattern template {} on port {}: {}", + pattern_idx, + port, + error.message ? error.message : rte_strerror(rte_errno)); + for (auto* tmpl : state.ipv4_udp_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.ipv4_udp_pattern_templates.clear(); + return false; + } + } + + struct rte_flow_actions_template_attr actions_attr; + struct rte_flow_action actions[MAX_ACTION_NUM]; + struct rte_flow_action masks[MAX_ACTION_NUM]; + struct rte_flow_action_mark mark; + struct rte_flow_action_queue queue; + struct rte_flow_action_mark mark_mask; + struct rte_flow_action_queue queue_mask; + memset(&actions_attr, 0, sizeof(actions_attr)); + memset(actions, 0, sizeof(actions)); + memset(masks, 0, sizeof(masks)); + memset(&mark, 0, sizeof(mark)); + memset(&queue, 0, sizeof(queue)); + memset(&mark_mask, 0, sizeof(mark_mask)); + memset(&queue_mask, 0, sizeof(queue_mask)); + actions_attr.ingress = 1; + actions[0].type = RTE_FLOW_ACTION_TYPE_MARK; + actions[0].conf = &mark; + actions[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + actions[1].conf = &queue; + actions[2].type = RTE_FLOW_ACTION_TYPE_END; + masks[0].type = RTE_FLOW_ACTION_TYPE_MARK; + masks[0].conf = &mark_mask; + masks[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + masks[1].conf = &queue_mask; + masks[2].type = RTE_FLOW_ACTION_TYPE_END; + + state.mark_queue_actions_template = + rte_flow_actions_template_create(port, &actions_attr, actions, masks, &error); + if (state.mark_queue_actions_template == nullptr) { + DAQIRI_LOG_WARN("Failed to create RX flow actions template on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + for (auto* tmpl : state.ipv4_udp_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.ipv4_udp_pattern_templates.clear(); + return false; + } + + struct rte_flow_template_table_attr table_attr; + memset(&table_attr, 0, sizeof(table_attr)); + table_attr.flow_attr.ingress = 1; + table_attr.flow_attr.group = kRxFlowGroup; + table_attr.flow_attr.priority = kRxFlowPriority; + table_attr.nb_flows = state.capacity; + table_attr.insertion_type = RTE_FLOW_TABLE_INSERTION_TYPE_PATTERN; + table_attr.hash_func = RTE_FLOW_TABLE_HASH_FUNC_DEFAULT; + + std::vector action_templates = { + state.mark_queue_actions_template}; + state.ipv4_udp_table = + rte_flow_template_table_create(port, + &table_attr, + state.ipv4_udp_pattern_templates.data(), + static_cast(state.ipv4_udp_pattern_templates.size()), + action_templates.data(), + static_cast(action_templates.size()), + &error); + if (state.ipv4_udp_table == nullptr) { + DAQIRI_LOG_WARN("Failed to create RX flow template table on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + rte_flow_actions_template_destroy(port, state.mark_queue_actions_template, &error); + state.mark_queue_actions_template = nullptr; + for (auto* tmpl : state.ipv4_udp_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.ipv4_udp_pattern_templates.clear(); + return false; + } + + state.templates_ready = true; + DAQIRI_LOG_INFO("Created RX flow template table on port {} with capacity {}", + port, + state.capacity); + return true; +} + +Status DpdkMgr::create_dynamic_flow_legacy_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id) { + FlowConfig cfg; + cfg.name_ = flow.name_; + cfg.id_ = flow_id; + cfg.action_ = flow.action_; + cfg.match_ = flow.match_; + cfg.backend_config_ = flow.backend_config_; + + struct rte_flow* rte_flow = nullptr; + if (cfg.match_.type_ == FlowMatchType::FLEX_ITEM) { + rte_flow = add_flex_item_flow(port, cfg.match_.flex_item_match_, cfg.action_.id_, cfg.id_); + } else { + rte_flow = add_flow(port, cfg); + } + + if (rte_flow == nullptr) { + return Status::INTERNAL_ERROR; + } + + DynamicFlowEntry entry; + entry.flow_id = flow_id; + entry.port = static_cast(port); + entry.queue = flow.action_.id_; + entry.flow = rte_flow; + entry.backend = DynamicFlowBackend::LEGACY; + entry.state = DynamicFlowState::ACTIVE; + dynamic_flows_[flow_id] = entry; + return Status::SUCCESS; +} + +Status DpdkMgr::add_rx_flow_legacy_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id) { + FlowOpResult result; + result.op_id_ = op_id; + result.type_ = FlowOpType::ADD_RX; + result.flow_id_ = flow_id; + result.flow_ids_ = {flow_id}; + result.status_ = create_dynamic_flow_legacy_locked(port, flow, flow_id); + if (result.status_ != Status::SUCCESS) { + result.flow_id_ = 0; + result.flow_ids_[0] = 0; + } + + enqueue_software_flow_completion(result); + return Status::SUCCESS; +} + +Status DpdkMgr::add_rx_flows_legacy_locked(int port, + const std::vector& flows, + const std::vector& flow_ids, + FlowOpId op_id) { + FlowOpResult result; + result.op_id_ = op_id; + result.type_ = FlowOpType::ADD_RX_BATCH; + result.status_ = Status::SUCCESS; + result.flow_ids_ = flow_ids; + + for (size_t i = 0; i < flows.size(); ++i) { + const Status status = create_dynamic_flow_legacy_locked(port, flows[i], flow_ids[i]); + if (status != Status::SUCCESS) { + result.status_ = status; + result.flow_ids_[i] = 0; + } + } + + enqueue_software_flow_completion(result); + return Status::SUCCESS; +} + +Status DpdkMgr::enqueue_rx_flow_template_create_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId completion_id) { + auto& state = flow_template_states_[port]; + struct rte_flow_op_attr op_attr; + struct rte_flow_error error; + memset(&op_attr, 0, sizeof(op_attr)); + memset(&error, 0, sizeof(error)); + + auto storage = std::make_shared(); + build_ipv4_udp_flow_pattern(flow.match_, storage->pattern, &storage->ip_spec, &storage->udp_spec); + build_mark_queue_actions(flow_id, + flow.action_.id_, + storage->action, + &storage->mark, + &storage->queue); + + uint8_t template_index = 0; + if (!ipv4_udp_flow_template_index(flow.match_, &template_index)) { + return Status::INTERNAL_ERROR; + } + struct rte_flow* rte_flow = + rte_flow_async_create(static_cast(port), + state.flow_queue_id, + &op_attr, + state.ipv4_udp_table, + storage->pattern, + template_index, + storage->action, + 0, + flow_op_user_data(completion_id), + &error); + if (rte_flow == nullptr) { + DAQIRI_LOG_WARN("Failed to enqueue async RX flow create on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return Status::INTERNAL_ERROR; + } + + DynamicFlowEntry entry; + entry.flow_id = flow_id; + entry.port = static_cast(port); + entry.queue = flow.action_.id_; + entry.flow = rte_flow; + entry.backend = DynamicFlowBackend::TEMPLATE; + entry.state = DynamicFlowState::ADDING; + entry.flow_queue_id = state.flow_queue_id; + entry.backend_storage = storage; + dynamic_flows_[flow_id] = entry; + return Status::SUCCESS; +} + +Status DpdkMgr::add_rx_flow_template_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id) { + if (!ensure_rx_flow_jump_rule(static_cast(port)) || + !ensure_ipv4_udp_flow_template_table(static_cast(port))) { + return add_rx_flow_legacy_locked(port, flow, flow_id, op_id); + } + + PendingFlowBatch pending; + pending.result.op_id_ = op_id; + pending.result.type_ = FlowOpType::ADD_RX; + pending.result.status_ = Status::NOT_READY; + pending.result.flow_id_ = flow_id; + pending.result.flow_ids_ = {flow_id}; + pending.remaining = 1; + + if (enqueue_rx_flow_template_create_locked(port, flow, flow_id, op_id) != Status::SUCCESS) { + return add_rx_flow_legacy_locked(port, flow, flow_id, op_id); + } + + auto& state = flow_template_states_[port]; + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + pending_flow_batches_[op_id] = pending; + pending_flow_completions_[op_id] = {op_id, flow_id, 0}; + + if (rte_flow_push(static_cast(port), state.flow_queue_id, &error) < 0) { + DAQIRI_LOG_WARN("Failed to push async RX flow create on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + + pending_flow_completions_.erase(op_id); + pending_flow_batches_.erase(op_id); + dynamic_flows_.erase(flow_id); + + FlowOpResult result; + result.op_id_ = op_id; + result.type_ = FlowOpType::ADD_RX; + result.status_ = Status::INTERNAL_ERROR; + result.flow_id_ = 0; + result.flow_ids_ = {0}; + enqueue_software_flow_completion(result); + return Status::SUCCESS; + } + + return Status::SUCCESS; +} + +Status DpdkMgr::add_rx_flows_template_locked(int port, + const std::vector& flows, + const std::vector& flow_ids, + FlowOpId op_id) { + if (!ensure_rx_flow_jump_rule(static_cast(port)) || + !ensure_ipv4_udp_flow_template_table(static_cast(port))) { + return add_rx_flows_legacy_locked(port, flows, flow_ids, op_id); + } + + for (const auto& flow : flows) { + uint8_t template_index = 0; + if (!ipv4_udp_flow_template_index(flow.match_, &template_index)) { + return add_rx_flows_legacy_locked(port, flows, flow_ids, op_id); + } + } + + PendingFlowBatch pending; + pending.result.op_id_ = op_id; + pending.result.type_ = FlowOpType::ADD_RX_BATCH; + pending.result.status_ = Status::NOT_READY; + pending.result.flow_ids_ = flow_ids; + + size_t enqueued = 0; + for (size_t i = 0; i < flows.size(); ++i) { + const FlowOpId completion_id = allocate_flow_op_id(); + const Status status = + enqueue_rx_flow_template_create_locked(port, flows[i], flow_ids[i], completion_id); + if (status != Status::SUCCESS) { + pending.result.status_ = status; + pending.result.flow_ids_[i] = 0; + for (size_t j = i + 1; j < pending.result.flow_ids_.size(); ++j) { + pending.result.flow_ids_[j] = 0; + } + break; + } + + pending_flow_completions_[completion_id] = {op_id, flow_ids[i], i}; + ++enqueued; + } + + if (enqueued == 0) { + if (pending.result.status_ == Status::NOT_READY) { + pending.result.status_ = Status::INTERNAL_ERROR; + std::fill(pending.result.flow_ids_.begin(), pending.result.flow_ids_.end(), 0); + } + enqueue_software_flow_completion(pending.result); + return Status::SUCCESS; + } + + pending.remaining = enqueued; + pending_flow_batches_[op_id] = pending; + + auto& state = flow_template_states_[port]; + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + if (rte_flow_push(static_cast(port), state.flow_queue_id, &error) < 0) { + DAQIRI_LOG_WARN("Failed to push async RX flow batch create on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + + for (auto completion_it = pending_flow_completions_.begin(); + completion_it != pending_flow_completions_.end();) { + if (completion_it->second.op_id == op_id) { + dynamic_flows_.erase(completion_it->second.flow_id); + completion_it = pending_flow_completions_.erase(completion_it); + } else { + ++completion_it; + } + } + pending_flow_batches_.erase(op_id); + + pending.result.status_ = Status::INTERNAL_ERROR; + std::fill(pending.result.flow_ids_.begin(), pending.result.flow_ids_.end(), 0); + enqueue_software_flow_completion(pending.result); + } + + return Status::SUCCESS; +} + +Status DpdkMgr::delete_flow_legacy_locked(DynamicFlowEntry& entry, FlowOpId op_id) { + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + + FlowOpResult result; + result.op_id_ = op_id; + result.type_ = FlowOpType::DELETE; + result.flow_id_ = entry.flow_id; + result.status_ = Status::SUCCESS; + + if (rte_flow_destroy(entry.port, entry.flow, &error) != 0) { + DAQIRI_LOG_ERROR("Failed to destroy dynamic RX flow {} on port {}: {}", + entry.flow_id, + entry.port, + error.message ? error.message : rte_strerror(rte_errno)); + result.status_ = Status::INTERNAL_ERROR; + } + + dynamic_flows_.erase(entry.flow_id); + enqueue_software_flow_completion(result); + return Status::SUCCESS; +} + +Status DpdkMgr::delete_flow_template_locked(DynamicFlowEntry& entry, FlowOpId op_id) { + struct rte_flow_op_attr op_attr; + struct rte_flow_error error; + memset(&op_attr, 0, sizeof(op_attr)); + memset(&error, 0, sizeof(error)); + + if (rte_flow_async_destroy(entry.port, + entry.flow_queue_id, + &op_attr, + entry.flow, + flow_op_user_data(op_id), + &error) < 0) { + DAQIRI_LOG_ERROR("Failed to enqueue async RX flow destroy for {} on port {}: {}", + entry.flow_id, + entry.port, + error.message ? error.message : rte_strerror(rte_errno)); + return Status::INTERNAL_ERROR; + } + if (rte_flow_push(entry.port, entry.flow_queue_id, &error) < 0) { + DAQIRI_LOG_ERROR("Failed to push async RX flow destroy for {} on port {}: {}", + entry.flow_id, + entry.port, + error.message ? error.message : rte_strerror(rte_errno)); + return Status::INTERNAL_ERROR; + } + + entry.state = DynamicFlowState::DELETING; + PendingFlowBatch pending; + pending.result.op_id_ = op_id; + pending.result.type_ = FlowOpType::DELETE; + pending.result.status_ = Status::NOT_READY; + pending.result.flow_id_ = entry.flow_id; + pending.remaining = 1; + pending_flow_batches_[op_id] = pending; + pending_flow_completions_[op_id] = {op_id, entry.flow_id, 0}; + return Status::SUCCESS; +} + +void DpdkMgr::poll_dpdk_flow_completions_locked() { + struct rte_flow_error error; + struct rte_flow_op_result results[16]; + + for (uint16_t port = 0; port < flow_template_states_.size(); ++port) { + auto& state = flow_template_states_[port]; + if (!state.configured) { continue; } + + while (true) { + memset(&error, 0, sizeof(error)); + memset(results, 0, sizeof(results)); + const int ret = rte_flow_pull(port, state.flow_queue_id, results, 16, &error); + if (ret <= 0) { break; } + + for (int i = 0; i < ret; ++i) { + const FlowOpId completion_id = flow_op_id_from_user_data(results[i].user_data); + const auto completion_it = pending_flow_completions_.find(completion_id); + if (completion_it == pending_flow_completions_.end()) { continue; } + + const PendingFlowCompletion completion = completion_it->second; + pending_flow_completions_.erase(completion_it); + + const auto batch_it = pending_flow_batches_.find(completion.op_id); + if (batch_it == pending_flow_batches_.end()) { continue; } + PendingFlowBatch& batch = batch_it->second; + const Status completion_status = results[i].status == RTE_FLOW_OP_SUCCESS + ? Status::SUCCESS + : Status::INTERNAL_ERROR; + if (completion_status != Status::SUCCESS) { + batch.result.status_ = completion_status; + if (completion.flow_index < batch.result.flow_ids_.size()) { + batch.result.flow_ids_[completion.flow_index] = 0; + } + if (batch.result.flow_id_ == completion.flow_id) { batch.result.flow_id_ = 0; } + } + + const auto flow_it = dynamic_flows_.find(completion.flow_id); + if (flow_it != dynamic_flows_.end()) { + if (batch.result.type_ == FlowOpType::ADD_RX || + batch.result.type_ == FlowOpType::ADD_RX_BATCH) { + if (completion_status == Status::SUCCESS) { + flow_it->second.state = DynamicFlowState::ACTIVE; + flow_it->second.backend_storage.reset(); + } else { + dynamic_flows_.erase(flow_it); + } + } else { + dynamic_flows_.erase(flow_it); + } + } + + if (batch.remaining > 0) { --batch.remaining; } + if (batch.remaining == 0) { + if (batch.result.status_ == Status::NOT_READY) { + batch.result.status_ = Status::SUCCESS; + } + ready_flow_ops_.push(batch.result); + pending_flow_batches_.erase(batch_it); + } + } + } + } +} + +Status DpdkMgr::add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) { + if (op_id == nullptr) { return Status::NULL_PTR; } + *op_id = 0; + std::lock_guard guard(flow_lock_); + if (!validate_dynamic_rx_flow(port, flow)) { return Status::INVALID_PARAMETER; } + + const FlowId flow_id = allocate_dynamic_flow_id(); + if (flow_id == 0) { return Status::NO_SPACE_AVAILABLE; } + const FlowOpId new_op_id = allocate_flow_op_id(); + *op_id = new_op_id; + + if (is_ipv4_udp_flow_match(flow.match_)) { + return add_rx_flow_template_locked(port, flow, flow_id, new_op_id); + } + return add_rx_flow_legacy_locked(port, flow, flow_id, new_op_id); +} + +Status DpdkMgr::add_rx_flows_async(int port, + const std::vector& flows, + FlowOpId* op_id) { + if (op_id == nullptr) { return Status::NULL_PTR; } + *op_id = 0; + if (flows.empty()) { return Status::INVALID_PARAMETER; } + + std::lock_guard guard(flow_lock_); + for (const auto& flow : flows) { + if (!validate_dynamic_rx_flow(port, flow)) { return Status::INVALID_PARAMETER; } + } + + std::vector flow_ids; + flow_ids.reserve(flows.size()); + for (size_t i = 0; i < flows.size(); ++i) { + const FlowId flow_id = allocate_dynamic_flow_id(); + if (flow_id == 0) { return Status::NO_SPACE_AVAILABLE; } + flow_ids.push_back(flow_id); + } + + const FlowOpId new_op_id = allocate_flow_op_id(); + *op_id = new_op_id; + + const bool all_ipv4_udp = + std::all_of(flows.begin(), flows.end(), [this](const FlowRuleConfig& flow) { + return is_ipv4_udp_flow_match(flow.match_); + }); + if (all_ipv4_udp) { + return add_rx_flows_template_locked(port, flows, flow_ids, new_op_id); + } + return add_rx_flows_legacy_locked(port, flows, flow_ids, new_op_id); +} + +Status DpdkMgr::delete_flow_async(FlowId flow_id, FlowOpId* op_id) { + if (op_id == nullptr) { return Status::NULL_PTR; } + *op_id = 0; + std::lock_guard guard(flow_lock_); + if (!initialized_) { return Status::NOT_READY; } + if (static_flow_ids_.find(flow_id) != static_flow_ids_.end()) { return Status::INVALID_PARAMETER; } + const auto flow_it = dynamic_flows_.find(flow_id); + if (flow_it == dynamic_flows_.end() || flow_it->second.state != DynamicFlowState::ACTIVE) { + return Status::INVALID_PARAMETER; + } + + const FlowOpId new_op_id = allocate_flow_op_id(); + if (flow_it->second.backend == DynamicFlowBackend::TEMPLATE) { + const Status status = delete_flow_template_locked(flow_it->second, new_op_id); + if (status == Status::SUCCESS) { *op_id = new_op_id; } + return status; + } + const Status status = delete_flow_legacy_locked(flow_it->second, new_op_id); + if (status == Status::SUCCESS) { *op_id = new_op_id; } + return status; +} + +Status DpdkMgr::poll_flow_op(FlowOpResult* result) { + if (result == nullptr) { return Status::NULL_PTR; } + std::lock_guard guard(flow_lock_); + poll_dpdk_flow_completions_locked(); + if (ready_flow_ops_.empty()) { return Status::NOT_READY; } + *result = ready_flow_ops_.front(); + ready_flow_ops_.pop(); + return Status::SUCCESS; +} + +void DpdkMgr::cleanup_dynamic_flows() { + std::lock_guard guard(flow_lock_); + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + + pending_flow_batches_.clear(); + pending_flow_completions_.clear(); + while (!ready_flow_ops_.empty()) { ready_flow_ops_.pop(); } + + for (auto& [flow_id, entry] : dynamic_flows_) { + if (entry.flow != nullptr) { rte_flow_destroy(entry.port, entry.flow, &error); } + } + dynamic_flows_.clear(); + + for (uint16_t port = 0; port < drop_all_traffic_flow.size(); ++port) { + if (drop_all_traffic_flow[port].drop != nullptr) { + rte_flow_destroy(port, drop_all_traffic_flow[port].drop, &error); + drop_all_traffic_flow[port].drop = nullptr; + } + drop_all_traffic_flow[port].jump = nullptr; + } + + for (uint16_t port = 0; port < rx_flow_jump_rules_.size(); ++port) { + auto& state = flow_template_states_[port]; + if (state.ipv4_udp_table != nullptr) { + rte_flow_template_table_destroy(port, state.ipv4_udp_table, &error); + state.ipv4_udp_table = nullptr; + } + if (state.mark_queue_actions_template != nullptr) { + rte_flow_actions_template_destroy(port, state.mark_queue_actions_template, &error); + state.mark_queue_actions_template = nullptr; + } + for (auto* tmpl : state.ipv4_udp_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.ipv4_udp_pattern_templates.clear(); + state.templates_ready = false; + state.configured = false; + + if (rx_flow_jump_rules_[port] != nullptr) { + rte_flow_destroy(port, rx_flow_jump_rules_[port], &error); + rx_flow_jump_rules_[port] = nullptr; + } + } +} + struct rte_flow_item_flex_handle *DpdkMgr::create_flex_flow_rule( int port, int offset, struct rte_flow_item *udp_item, struct rte_flow_item *end_pattern) { static struct rte_flow_item_flex_handle *item_handle = NULL; @@ -2553,36 +3560,6 @@ struct rte_flow_item_flex_handle *DpdkMgr::create_flex_flow_rule( return item_handle; } - { - struct rte_flow_error jump_error; - struct rte_flow_attr jump_attr; - jump_attr.group = 0; - jump_attr.ingress = 1; - struct rte_flow_action_jump jump_v; - jump_v.group = 1; - struct rte_flow_action jump_actions[2]; - jump_actions[0].type = RTE_FLOW_ACTION_TYPE_JUMP; - jump_actions[0].conf = &jump_v; - jump_actions[1].type = RTE_FLOW_ACTION_TYPE_END; - - struct rte_flow_item jump_pattern[2]; - jump_pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; - jump_pattern[0].spec = 0; - jump_pattern[0].mask = 0; - jump_pattern[1].type = RTE_FLOW_ITEM_TYPE_END; - - int res = rte_flow_validate(port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (!res) { - struct rte_flow* flow = rte_flow_create( - port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (flow == NULL) { - printf("rte_flow_create failed"); - } - } else { - printf("Failed flow validation: %d\n", res); - } - } - struct rte_flow_item_flex_conf flex_conf; flex_conf.tunnel = FLEX_TUNNEL_MODE_SINGLE; memset(&flex_conf.next_header, 0, sizeof(flex_conf.next_header)); @@ -2621,7 +3598,7 @@ struct rte_flow_item_flex_handle *DpdkMgr::create_flex_flow_rule( } struct rte_flow* DpdkMgr::add_flex_item_flow( - int port, const FlexItemMatch& match_info, uint16_t queue_id) { + int port, const FlexItemMatch& match_info, uint16_t queue_id, FlowId mark_id) { /* Declaring structs being used. 8< */ struct rte_flow_attr attr; struct rte_flow_item pattern[MAX_PATTERN_NUM]; @@ -2636,6 +3613,8 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( int res; const auto& flex_item_config = cfg_.ifs_[port].rx_.flex_items_[match_info.flex_item_id_]; + if (!ensure_rx_flow_jump_rule(static_cast(port))) { return nullptr; } + memset(pattern, 0, sizeof(pattern)); memset(action, 0, sizeof(action)); memset(&attr, 0, sizeof(struct rte_flow_attr)); @@ -2644,15 +3623,18 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( memset(&udp_spec, 0, sizeof(struct rte_flow_item_udp)); memset(&udp_mask, 0, sizeof(struct rte_flow_item_udp)); - // struct rte_flow_action_mark mark; - // mark.id = 0x40 + queue_id; - - action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; - action[0].conf = &queue; - action[1].type = RTE_FLOW_ACTION_TYPE_END; - // action[1].type = RTE_FLOW_ACTION_TYPE_MARK; - // action[1].conf = &mark; - // action[2].type = RTE_FLOW_ACTION_TYPE_END; + struct rte_flow_action_mark mark = {.id = mark_id}; + if (mark_id != 0) { + action[0].type = RTE_FLOW_ACTION_TYPE_MARK; + action[0].conf = &mark; + action[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[1].conf = &queue; + action[2].type = RTE_FLOW_ACTION_TYPE_END; + } else { + action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[0].conf = &queue; + action[1].type = RTE_FLOW_ACTION_TYPE_END; + } pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; @@ -2719,7 +3701,7 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( attr.ingress = 1; attr.priority = 0; - attr.group = 1; + attr.group = kRxFlowGroup; /* Validate the rule and create it */ res = rte_flow_validate(port, &attr, pattern, action, &error); @@ -2742,6 +3724,46 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( // Taken from flow_block.c DPDK example */ +bool DpdkMgr::ensure_rx_flow_jump_rule(uint16_t port) { + if (port >= rx_flow_jump_rules_.size()) { return false; } + if (rx_flow_jump_rules_[port] != nullptr) { return true; } + + struct rte_flow_error error; + struct rte_flow_attr attr; + struct rte_flow_action_jump jump = {.group = kRxFlowGroup}; + struct rte_flow_action actions[] = { + {.type = RTE_FLOW_ACTION_TYPE_JUMP, .conf = &jump}, + {.type = RTE_FLOW_ACTION_TYPE_END}, + }; + struct rte_flow_item pattern[] = { + {.type = RTE_FLOW_ITEM_TYPE_ETH, .spec = nullptr, .mask = nullptr}, + {.type = RTE_FLOW_ITEM_TYPE_END}, + }; + + memset(&error, 0, sizeof(error)); + memset(&attr, 0, sizeof(attr)); + attr.group = 0; + attr.ingress = 1; + + int ret = rte_flow_validate(port, &attr, pattern, actions, &error); + if (ret != 0) { + DAQIRI_LOG_ERROR("Failed to validate RX flow jump rule on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + rx_flow_jump_rules_[port] = rte_flow_create(port, &attr, pattern, actions, &error); + if (rx_flow_jump_rules_[port] == nullptr) { + DAQIRI_LOG_ERROR("Failed to create RX flow jump rule on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + return true; +} + struct rte_flow* DpdkMgr::add_flow(int port, const FlowConfig& cfg) { /* Declaring structs being used. 8< */ struct rte_flow_attr attr; @@ -2757,33 +3779,7 @@ struct rte_flow* DpdkMgr::add_flow(int port, const FlowConfig& cfg) { struct rte_flow_item_ipv4 ip_mask; int res; - // HWS requires using a non-zero group, so we make a jump event to group 3 for all ethernet - // packets - { - struct rte_flow_error jump_error; - struct rte_flow_attr jump_attr{.group = 0, .ingress = 1}; - struct rte_flow_action_jump jump_v = {.group = 3}; - struct rte_flow_action jump_actions[] = { - { .type = RTE_FLOW_ACTION_TYPE_JUMP, .conf = &jump_v}, - { .type = RTE_FLOW_ACTION_TYPE_END} - }; - - struct rte_flow_item jump_pattern[] = { - { .type = RTE_FLOW_ITEM_TYPE_ETH, .spec = 0, .mask = 0}, - { .type = RTE_FLOW_ITEM_TYPE_END}, - }; - - auto res = rte_flow_validate(port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (!res) { - struct rte_flow* flow = rte_flow_create( - port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (flow == nullptr) { - DAQIRI_LOG_ERROR("rte_flow_create failed"); - } - } else { - DAQIRI_LOG_ERROR("Failed flow validation: {}", res); - } - } + if (!ensure_rx_flow_jump_rule(static_cast(port))) { return nullptr; } memset(pattern, 0, sizeof(pattern)); memset(action, 0, sizeof(action)); @@ -2863,8 +3859,8 @@ struct rte_flow* DpdkMgr::add_flow(int port, const FlowConfig& cfg) { attr.ingress = 1; - attr.priority = 1; // Lower priority to allow drop_traffic (priority 0) to take precedence - attr.group = 3; + attr.priority = kRxFlowPriority; // Lower priority to allow drop_traffic to take precedence + attr.group = kRxFlowGroup; pattern[3].type = RTE_FLOW_ITEM_TYPE_END; @@ -2881,32 +3877,10 @@ Status DpdkMgr::drop_all_traffic(int port) { struct rte_flow_error error; DropTrafficConfig config; - // Initialize the jump rule to group 3 (required by HWS) - { - struct rte_flow_error jump_error; - struct rte_flow_attr jump_attr{.group = 0, .ingress = 1}; - struct rte_flow_action_jump jump_v = {.group = 3}; - struct rte_flow_action jump_actions[] = { - { .type = RTE_FLOW_ACTION_TYPE_JUMP, .conf = &jump_v}, - { .type = RTE_FLOW_ACTION_TYPE_END} - }; - - struct rte_flow_item jump_pattern[] = { - { .type = RTE_FLOW_ITEM_TYPE_ETH, .spec = 0, .mask = 0}, - { .type = RTE_FLOW_ITEM_TYPE_END}, - }; - - auto res = rte_flow_validate(port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (!res) { - config.jump = rte_flow_create( - port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (config.jump == nullptr) { - DAQIRI_LOG_ERROR("rte_flow_create failed for jump rule in drop_all_traffic"); - } - } else { - DAQIRI_LOG_ERROR("Failed flow validation for jump rule in drop_all_traffic: {}", res); - } + if (!ensure_rx_flow_jump_rule(static_cast(port))) { + return Status::INTERNAL_ERROR; } + config.jump = nullptr; // Clear all structures memset(pattern, 0, sizeof(pattern)); @@ -2921,10 +3895,10 @@ Status DpdkMgr::drop_all_traffic(int port) { pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; pattern[1].type = RTE_FLOW_ITEM_TYPE_END; - // Set highest priority (0) and use group 3 (consistent with add_flow) + // Set highest priority (0) and use the shared RX flow group. attr.ingress = 1; attr.priority = 0; // Highest priority - blocks all traffic - attr.group = 3; + attr.group = kRxFlowGroup; DAQIRI_LOG_INFO("Creating drop all traffic rule on port {} with priority {}", port, attr.priority); @@ -2935,7 +3909,6 @@ Status DpdkMgr::drop_all_traffic(int port) { if (config.drop == nullptr) { DAQIRI_LOG_ERROR("Failed to create drop all traffic flow rule on port {}: {}", port, error.message ? error.message : "unknown error"); - rte_flow_destroy(port, config.jump, &error); return Status::INTERNAL_ERROR; } else { DAQIRI_LOG_INFO("Successfully created drop all traffic rule on port {}", port); @@ -2956,22 +3929,15 @@ Status DpdkMgr::allow_all_traffic(int port) { // Tell the RX threads they can keep processing packets flush_rx_queues.store(false); - struct rte_flow_error jump_error; struct rte_flow_error drop_error; int drop_ret = rte_flow_destroy(port, drop_all_traffic_flow[port].drop, &drop_error); - int jump_ret = rte_flow_destroy(port, drop_all_traffic_flow[port].jump, &jump_error); if (drop_ret != 0) { DAQIRI_LOG_ERROR("Failed to destroy drop all traffic flow rule on port {}: {}", port, drop_error.message ? drop_error.message : "unknown error"); } - if (jump_ret != 0) { - DAQIRI_LOG_ERROR("Failed to destroy jump all traffic flow rule on port {}: {}", - port, jump_error.message ? jump_error.message : "unknown error"); - } - - if (drop_ret != 0 || jump_ret != 0) { + if (drop_ret != 0) { return Status::INTERNAL_ERROR; } @@ -3133,6 +4099,7 @@ void DpdkMgr::PrintDpdkStats(int port) { DpdkMgr::~DpdkMgr() { cleanup_reorder_state(); + if (eal_initialized_) { cleanup_dynamic_flows(); } // shutdown() handles ring cleanup BEFORE rte_eal_cleanup(), so the maps // are empty by the time we get here on the normal exit path. The loops @@ -3159,7 +4126,7 @@ bool DpdkMgr::validate_config() const { if (!Manager::validate_config()) { return false; } for (const auto& intf : cfg_.ifs_) { - std::unordered_map flow_to_queue; + std::unordered_map flow_to_queue; for (const auto& flow : intf.rx_.flows_) { if (flow_to_queue.find(flow.id_) != flow_to_queue.end()) { DAQIRI_LOG_ERROR("Duplicate flow ID {} on interface '{}'", flow.id_, intf.name_); @@ -3168,7 +4135,7 @@ bool DpdkMgr::validate_config() const { flow_to_queue.emplace(flow.id_, flow.action_.id_); } - std::unordered_set reorder_flow_ids; + std::unordered_set reorder_flow_ids; for (const auto& reorder_cfg : intf.rx_.reorder_configs_) { const bool use_gpu_backend = reorder_cfg.reorder_type_ == "gpu"; const bool use_cpu_backend = reorder_cfg.reorder_type_ == "cpu"; @@ -4048,7 +5015,7 @@ uint32_t DpdkMgr::get_packet_length(BurstParams* burst, int idx) { return reinterpret_cast(burst->pkts[0][idx])->pkt_len; } -uint16_t DpdkMgr::get_packet_flow_id(BurstParams* burst, int idx) { +FlowId DpdkMgr::get_packet_flow_id(BurstParams* burst, int idx) { if (burst == nullptr || idx < 0) { return 0; } if ((burst->hdr.hdr.burst_flags & kBurstFlagDpdkReordered) != 0U) { return 0; } if (idx >= static_cast(burst->hdr.hdr.num_pkts) || burst->pkts[0] == nullptr) { @@ -4416,6 +5383,7 @@ void DpdkMgr::shutdown() { stats_.Shutdown(); stats_thread_.join(); + cleanup_dynamic_flows(); // Release DPDK resources BEFORE rte_eal_cleanup(). Ring/mempool pointers // are owned by EAL memzones and become invalid once cleanup runs, so the diff --git a/src/managers/dpdk/daqiri_dpdk_mgr.h b/src/managers/dpdk/daqiri_dpdk_mgr.h index b72e62a..1bf61a5 100644 --- a/src/managers/dpdk/daqiri_dpdk_mgr.h +++ b/src/managers/dpdk/daqiri_dpdk_mgr.h @@ -48,8 +48,10 @@ #include #include #include +#include #include #include +#include #include "src/manager.h" #include #include "daqiri_dpdk_stats.h" @@ -104,7 +106,7 @@ class DpdkMgr : public Manager { void* get_packet_ptr(BurstParams* burst, int idx) override; uint32_t get_segment_packet_length(BurstParams* burst, int seg, int idx) override; uint32_t get_packet_length(BurstParams* burst, int idx) override; - uint16_t get_packet_flow_id(BurstParams* burst, int idx) override; + FlowId get_packet_flow_id(BurstParams* burst, int idx) override; Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) override; void* get_packet_extra_info(BurstParams* burst, int idx) override; Status get_tx_packet_burst(BurstParams* burst) override; @@ -141,6 +143,12 @@ class DpdkMgr : public Manager { Status get_mac_addr(int port, char* mac) override; Status drop_all_traffic(int port) override; Status allow_all_traffic(int port) override; + Status add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) override; + Status add_rx_flows_async(int port, + const std::vector& flows, + FlowOpId* op_id) override; + Status delete_flow_async(FlowId flow_id, FlowOpId* op_id) override; + Status poll_flow_op(FlowOpResult* result) override; void shutdown() override; void print_stats() override; void adjust_memory_regions() override; @@ -163,6 +171,7 @@ class DpdkMgr : public Manager { bool calibrate_rx_timestamp_clock(uint16_t port_id); int setup_pools_and_rings(int max_rx_batch, int max_tx_batch); struct rte_flow* add_flow(int port, const FlowConfig& cfg); + bool ensure_rx_flow_jump_rule(uint16_t port); void create_dummy_rx_q(); void create_dummy_tx_q(); struct rte_flow* add_modify_flow_set(int port, int queue, const char* buf, int len, @@ -170,7 +179,102 @@ class DpdkMgr : public Manager { static struct rte_flow_item_flex_handle *create_flex_flow_rule( int port, int offset, struct rte_flow_item *udp_item, struct rte_flow_item *end_pattern); - struct rte_flow* add_flex_item_flow(int port, const FlexItemMatch& match, uint16_t queue_id); + struct rte_flow* add_flex_item_flow(int port, + const FlexItemMatch& match, + uint16_t queue_id, + FlowId mark_id = 0); + + enum class DynamicFlowBackend { + LEGACY, + TEMPLATE, + }; + + enum class DynamicFlowState { + ADDING, + ACTIVE, + DELETING, + }; + + struct DynamicFlowEntry { + FlowId flow_id = 0; + uint16_t port = 0; + uint16_t queue = 0; + struct rte_flow* flow = nullptr; + DynamicFlowBackend backend = DynamicFlowBackend::LEGACY; + DynamicFlowState state = DynamicFlowState::ADDING; + uint32_t flow_queue_id = 0; + std::shared_ptr backend_storage; + }; + + struct PendingFlowBatch { + FlowOpResult result; + size_t remaining = 0; + }; + + struct PendingFlowCompletion { + FlowOpId op_id = 0; + FlowId flow_id = 0; + size_t flow_index = 0; + }; + + struct PortFlowTemplateState { + bool configured = false; + bool templates_ready = false; + uint32_t flow_queue_id = 0; + uint32_t capacity = DEFAULT_DYNAMIC_FLOW_CAPACITY; + struct rte_flow_template_table* ipv4_udp_table = nullptr; + std::vector ipv4_udp_pattern_templates; + struct rte_flow_actions_template* mark_queue_actions_template = nullptr; + }; + + FlowOpId allocate_flow_op_id(); + FlowId allocate_dynamic_flow_id(); + bool reserve_static_flow_ids(); + bool validate_dynamic_rx_flow(int port, const FlowRuleConfig& flow) const; + bool is_valid_rx_queue(int port, uint16_t queue_id) const; + bool is_ipv4_udp_flow_match(const FlowMatch& match) const; + bool ipv4_udp_flow_template_index(const FlowMatch& match, uint8_t* template_index) const; + void build_ipv4_udp_flow_pattern(const FlowMatch& match, + struct rte_flow_item pattern[], + struct rte_flow_item_ipv4* ip_spec, + struct rte_flow_item_udp* udp_spec) const; + void build_mark_queue_actions(FlowId flow_id, + uint16_t queue_id, + struct rte_flow_action action[], + struct rte_flow_action_mark* mark, + struct rte_flow_action_queue* queue) const; + Status enqueue_software_flow_completion(const FlowOpResult& result); + Status add_rx_flows_legacy_locked(int port, + const std::vector& flows, + const std::vector& flow_ids, + FlowOpId op_id); + Status create_dynamic_flow_legacy_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id); + Status add_rx_flow_legacy_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id); + Status enqueue_rx_flow_template_create_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId completion_id); + Status add_rx_flow_template_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id); + Status add_rx_flows_template_locked(int port, + const std::vector& flows, + const std::vector& flow_ids, + FlowOpId op_id); + Status delete_flow_legacy_locked(DynamicFlowEntry& entry, FlowOpId op_id); + Status delete_flow_template_locked(DynamicFlowEntry& entry, FlowOpId op_id); + bool configure_flow_api_for_port(uint16_t port, uint32_t capacity); + bool ensure_ipv4_udp_flow_template_table(uint16_t port); + void poll_dpdk_flow_completions_locked(); + void cleanup_dynamic_flows(); + static void* flow_op_user_data(FlowOpId op_id); + static FlowOpId flow_op_id_from_user_data(void* user_data); void apply_tx_offloads(int port); @@ -266,7 +370,7 @@ class DpdkMgr : public Manager { struct ReorderQueueState { bool enabled = false; bool single_plan_fast_path = false; - std::unordered_map flow_id_to_plan; + std::unordered_map flow_id_to_plan; std::vector plans; std::vector> plan_pkt_indices; std::vector plan_pkt_counts; @@ -315,6 +419,8 @@ class DpdkMgr : public Manager { void release_reorder_output_context(BurstParams* burst); std::array mac_addrs; + std::array rx_flow_jump_rules_{}; + std::array flow_template_states_{}; std::unordered_map rx_rings; struct rte_ether_addr conf_ports_eth_addr[RTE_MAX_ETHPORTS]; std::unordered_map flex_item_handles_; @@ -331,6 +437,14 @@ class DpdkMgr : public Manager { std::unordered_map reorder_queue_states_; std::mutex reorder_lock_; std::array drop_all_traffic_flow; + std::mutex flow_lock_; + FlowId next_dynamic_flow_id_ = 1; + FlowOpId next_flow_op_id_ = 1; + std::unordered_set static_flow_ids_; + std::unordered_map dynamic_flows_; + std::unordered_map pending_flow_batches_; + std::unordered_map pending_flow_completions_; + std::queue ready_flow_ops_; int timestamp_dynfield_offset_{-1}; uint64_t rx_timestamp_dynflag_mask_{0}; uint64_t tx_timestamp_dynflag_mask_{0}; diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index b95c4e9..1577fbf 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -107,7 +107,7 @@ class RdmaMgr : public Manager { void* get_packet_ptr(BurstParams* burst, int idx) override; uint32_t get_packet_length(BurstParams* burst, int idx) override; - uint16_t get_packet_flow_id(BurstParams* burst, int idx) override { return 0; } + FlowId get_packet_flow_id(BurstParams* burst, int idx) override { return 0; } Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) override { (void)burst; (void)idx; diff --git a/src/managers/socket/daqiri_socket_mgr.cpp b/src/managers/socket/daqiri_socket_mgr.cpp index da0e412..05b4f3d 100644 --- a/src/managers/socket/daqiri_socket_mgr.cpp +++ b/src/managers/socket/daqiri_socket_mgr.cpp @@ -210,7 +210,7 @@ uint32_t SocketMgr::get_segment_packet_length(BurstParams* burst, int seg, int i return burst->pkt_lens[seg][idx]; } -uint16_t SocketMgr::get_packet_flow_id(BurstParams* burst, int idx) { +FlowId SocketMgr::get_packet_flow_id(BurstParams* burst, int idx) { return 0; } diff --git a/src/managers/socket/daqiri_socket_mgr.h b/src/managers/socket/daqiri_socket_mgr.h index 569100d..8774471 100644 --- a/src/managers/socket/daqiri_socket_mgr.h +++ b/src/managers/socket/daqiri_socket_mgr.h @@ -48,7 +48,7 @@ class SocketMgr : public Manager { uint32_t get_packet_length(BurstParams* burst, int idx) override; void* get_segment_packet_ptr(BurstParams* burst, int seg, int idx) override; uint32_t get_segment_packet_length(BurstParams* burst, int seg, int idx) override; - uint16_t get_packet_flow_id(BurstParams* burst, int idx) override; + FlowId get_packet_flow_id(BurstParams* burst, int idx) override; Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) override; void* get_packet_extra_info(BurstParams* burst, int idx) override;