Skip to content
Open

Napi #629

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions main/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct gr_config {
unsigned max_mtu;
bool test_mode;
bool poll_mode;
bool napi; // --napi: adaptive interrupt (NAPI) rx; implies poll_mode
bool log_syslog;
bool log_packets;
bool override_default_route;
Expand Down
10 changes: 9 additions & 1 deletion main/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ static void usage(void) {
puts(" -m, --socket-mode PERMISSIONS API socket file permissions (Default: 0660).");
puts(" -o, --socket-owner USER:GROUP API socket file ownership");
puts(" -p, --poll-mode Disable automatic micro-sleep.");
puts(" -n, --napi Adaptive interrupt (NAPI) rx: idle");
puts(" workers block on rxq interrupts");
puts(" instead of polling. Implies poll-mode.");
puts(" -s, --socket PATH Path the control plane API socket.");
puts(" Default: GROUT_SOCK_PATH from env or");
printf(" %s).\n", GR_DEFAULT_SOCK_PATH);
Expand Down Expand Up @@ -185,11 +188,12 @@ static bool parse_bool_env(const char *name) {
static int parse_args(int argc, char **argv) {
int c;

#define FLAGS ":M:Vhm:o:pSs:tu:vx"
#define FLAGS ":M:Vhm:no:pSs:tu:vx"
static struct option long_options[] = {
{"help", no_argument, NULL, 'h'},
{"max-mtu", required_argument, NULL, 'u'},
{"metrics", required_argument, NULL, 'M'},
{"napi", no_argument, NULL, 'n'},
{"poll-mode", no_argument, NULL, 'p'},
{"socket", required_argument, NULL, 's'},
{"socket-mode", required_argument, NULL, 'm'},
Expand Down Expand Up @@ -233,6 +237,10 @@ static int parse_args(int argc, char **argv) {
case 'p':
gr_config.poll_mode = true;
break;
case 'n':
gr_config.napi = true;
gr_config.poll_mode = true;
break;
case 'M':
if (parse_metrics_addr(optarg) < 0)
return errno_set(EINVAL);
Expand Down
5 changes: 5 additions & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ elif compiler.has_function(
grout_cflags += ['-DHAVE_RTE_FIB_TBL8_GET_STATS']
endif

# glibc >= 2.41 provides struct sched_attr and the sched_setattr() wrapper.
if compiler.has_header_symbol('sched.h', 'sched_setattr', args: '-D_GNU_SOURCE')
grout_cflags += ['-DHAVE_SCHED_SETATTR']
endif

src = []
inc = []

Expand Down
1 change: 1 addition & 0 deletions modules/infra/control/iface_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ bool vrf_has_interfaces(uint16_t) {
return false;
}
void control_queue_drain(uint32_t, const void *) { }
void worker_wakeup_all(void) { }
mock_func(struct iface *, __wrap_iface_from_id(uint16_t));
mock_func(int, port_unplug(struct iface_info_port *));
mock_func(int, port_plug(struct iface_info_port *));
Expand Down
5 changes: 5 additions & 0 deletions modules/infra/control/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ int port_configure(struct iface_info_port *p, uint16_t n_txq_min) {
if (info.dev_flags != NULL && *info.dev_flags & RTE_ETH_DEV_INTR_LSC) {
conf.intr_conf.lsc = 1;
}
// --napi: request RX queue interrupts so idle workers can block on them.
if (gr_config.napi)
conf.intr_conf.rxq = 1;

if ((ret = rte_eth_dev_configure(p->port_id, p->n_rxq, p->n_txq, &conf)) < 0)
return errno_log(-ret, "rte_eth_dev_configure");
Expand Down Expand Up @@ -269,6 +272,7 @@ static int port_mtu_set(struct iface *iface, uint16_t mtu) {
if ((ret = rte_eth_dev_start(p->port_id)) < 0)
return errno_log(-ret, "rte_eth_dev_start");
p->started = true;
worker_wakeup_all(); // let workers (re)arm now that the port is started
iface->mtu = mtu;

vec_foreach (struct iface *s, iface->subinterfaces)
Expand Down Expand Up @@ -361,6 +365,7 @@ static int iface_port_reconfig(
return errno_log(-ret, "rte_eth_dev_start");

p->started = true;
worker_wakeup_all(); // let workers (re)arm now that the port is started

return 0;
}
Expand Down
25 changes: 25 additions & 0 deletions modules/infra/control/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <sys/eventfd.h>
#include <sys/queue.h>
#include <unistd.h>

Expand All @@ -42,9 +43,16 @@ int worker_create(unsigned cpu_id) {

worker->cpu_id = cpu_id;
worker->lcore_id = LCORE_ID_ANY;
worker->wakeup_fd = -1;
pthread_mutex_init(&worker->wakeup.lock, NULL);
pthread_cond_init(&worker->wakeup.cond, NULL);

worker->wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (worker->wakeup_fd < 0) {
ret = errno;
goto end;
}
Comment on lines +50 to +54

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🔴 Critical | ⚡ Quick win

Split cleanup by acquired resource state.

Line 53 can jump to end before the worker is inserted or the thread is created, but the error path still runs STAILQ_REMOVE() and pthread_cancel() on those unacquired resources. Track inserted/thread_created flags or use staged cleanup labels before routing early eventfd() failures through the shared teardown.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@modules/infra/control/worker.c` around lines 50 - 54, The early eventfd()
failure path in worker initialization is using the shared end cleanup even
though the worker has not yet been inserted into the queue or had its thread
created. Update the worker setup flow in the worker initialization routine to
track acquisition state with flags like inserted and thread_created, or split
into staged cleanup labels, so STAILQ_REMOVE() and pthread_cancel() only run
after those resources actually exist. Make sure the cleanup path for the
eventfd() failure exits before any teardown that assumes insertion or thread
creation has occurred.


CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset);
pthread_attr_init(&attr);
Expand Down Expand Up @@ -76,6 +84,8 @@ int worker_create(unsigned cpu_id) {
pthread_cancel(worker->thread);
pthread_cond_destroy(&worker->wakeup.cond);
pthread_mutex_destroy(&worker->wakeup.lock);
if (worker->wakeup_fd >= 0)
close(worker->wakeup_fd);
rte_free(worker);
}
LOG(ERR, "worker %u start failed: %s", cpu_id, strerror(ret));
Expand All @@ -97,6 +107,7 @@ int worker_destroy(unsigned cpu_id) {
pthread_join(worker->thread, NULL);
pthread_cond_destroy(&worker->wakeup.cond);
pthread_mutex_destroy(&worker->wakeup.lock);
close(worker->wakeup_fd);
worker_graph_free(worker);
vec_free(worker->rxqs);
vec_free(worker->txqs);
Expand All @@ -115,10 +126,24 @@ void worker_wait_wakeup(struct worker *w) {
}

void worker_wakeup(struct worker *w) {
uint64_t one = 1;

pthread_mutex_lock(&w->wakeup.lock);
w->wakeup.set = true;
pthread_cond_signal(&w->wakeup.cond);
pthread_mutex_unlock(&w->wakeup.lock);

// The condvar above only reaches a worker parked on graph == NULL. A napi
// worker idle on rte_epoll_wait is woken through the eventfd instead.
// EAGAIN means a kick is already pending and undrained, which is enough.
if (write(w->wakeup_fd, &one, sizeof(one)) != sizeof(one) && errno != EAGAIN)
LOG(ERR, "worker %u wakeup_fd write: %s", w->cpu_id, strerror(errno));
Comment on lines +139 to +140

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Retry interrupted eventfd writes.

If write() returns -1/EINTR, the NAPI wakeup is not delivered; a worker blocked in rte_epoll_wait() can remain asleep and make teardown hang. Retry EINTR, while keeping EAGAIN as “already pending”.

Proposed fix
-	if (write(w->wakeup_fd, &one, sizeof(one)) != sizeof(one) && errno != EAGAIN)
-		LOG(ERR, "worker %u wakeup_fd write: %s", w->cpu_id, strerror(errno));
+	ssize_t n;
+
+	do {
+		n = write(w->wakeup_fd, &one, sizeof(one));
+	} while (n < 0 && errno == EINTR);
+
+	if (n < 0 && errno != EAGAIN)
+		LOG(ERR, "worker %u wakeup_fd write: %s", w->cpu_id, strerror(errno));
+	else if (n >= 0 && n != sizeof(one))
+		LOG(ERR, "worker %u wakeup_fd short write", w->cpu_id);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (write(w->wakeup_fd, &one, sizeof(one)) != sizeof(one) && errno != EAGAIN)
LOG(ERR, "worker %u wakeup_fd write: %s", w->cpu_id, strerror(errno));
ssize_t n;
do {
n = write(w->wakeup_fd, &one, sizeof(one));
} while (n < 0 && errno == EINTR);
if (n < 0 && errno != EAGAIN)
LOG(ERR, "worker %u wakeup_fd write: %s", w->cpu_id, strerror(errno));
else if (n >= 0 && n != sizeof(one))
LOG(ERR, "worker %u wakeup_fd short write", w->cpu_id);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@modules/infra/control/worker.c` around lines 139 - 140, The wakeup write in
worker wakeup handling does not retry interrupted writes, so an EINTR can leave
the NAPI wakeup undelivered and stall teardown. Update the wakeup path around
the write() call in the worker wakeup logic to loop and retry when errno is
EINTR, while still treating EAGAIN as a harmless already-pending wakeup; keep
the existing error logging in the worker code for real failures only.

}

void worker_wakeup_all(void) {
struct worker *worker;
STAILQ_FOREACH (worker, &workers, next)
worker_wakeup(worker);
}

unsigned worker_count(void) {
Expand Down
2 changes: 2 additions & 0 deletions modules/infra/control/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct worker {
unsigned cpu_id;
unsigned lcore_id;
pid_t tid;
int wakeup_fd; // eventfd: ctlplane writes, dataplane epoll-waits + drains (napi)

struct {
pthread_mutex_t lock;
Expand All @@ -87,6 +88,7 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id);
int worker_queue_distribute(const cpu_set_t *affinity, vec struct iface_info_port **ports);
void worker_wait_wakeup(struct worker *);
void worker_wakeup(struct worker *);
void worker_wakeup_all(void);
vec struct gr_stat *worker_dump_stats(uint16_t cpu_id);

int port_unplug(struct iface_info_port *);
Expand Down
Loading
Loading