I am using DPDK to receive UDP packets at a high throughput. When I try to free rte_mbufs (UDP packets) using rte_pktmbuf_free_bulk() on a thread not running on a DPDK lcore, the rte_mbufs are not actually freed, which leads to the inability to receive more than 4096 packets. Do I need to move all my logic onto threads running on lcores, or is there something else I can do to make this work?
I am new to DPDK and inherited the initialization part of the code, which is entirely too long to include here. I am not sure which specific details about the configuration might be important or relevant, but will be happy to supplement this post if anyone has specific questions about the DPDK setup. However, I have included the main RX loop code and a description of it, below.
- The function is _receive_packets(), which is only called using rte_eal_remote_launch(), which is why the arguments are passed in with a void pointer.
- thread_arg is a struct that I have defined to pass in all the arguments I need when calling this function with rte_eal_remote_launch().
- _receive_packets() must be a static class method, which is why you see the strange input_arg->ds_instance notation. input_arg->ds_instance is simply a pointer to the Data_Streamer instance that called _receive_packets().
- input_arg->ds_instance->m_active becomes false when it is time for the application to shut down.
- BURST_SIZE is 512.
- IO_Job is a struct that I use to send the packets to another (non-lcore) thread using an rte_ring, to be processed and streamed to file. The code that dequeues the jobs calls rte_pktmbuf_free_bulk() on the packets and deletes any object created in _receive_packets(), so it may look like there is a memory leak, but there is not.
- input_arg->ds_instance->enqueue_io_job() is a helper function that simply enqueues a job to the rte_ring mentioned above.

void Data_Streamer::_receive_packets(void* arg)
{
    thread_arg* input_arg = static_cast<thread_arg*>(arg);
    const uint16_t port_id = input_arg->port_id;
    const uint16_t queue_id = input_arg->queue_id;
    long long bytes_recorded = 0;
    while (input_arg->ds_instance->m_active)
    {
        // If BURST_SIZE packets not ready, then do nothing this cycle
        if (rte_eth_rx_queue_count(port_id, queue_id) < BURST_SIZE) continue;
        // Once data is available, get packets as burst
        rte_mbuf** packets = new rte_mbuf*[BURST_SIZE];
        const uint16_t got_num_packets = rte_eth_rx_burst(port_id, queue_id, packets, BURST_SIZE);
        // Package packets into job and enqueue the job
        if (got_num_packets)
        {
            IO_Job* job = new IO_Job;
            job->byte_offset = bytes_recorded;
            job->packets = packets;
            job->num_packets = got_num_packets;
            // The dequeuing side frees the mbufs and deletes job and packets
            input_arg->ds_instance->enqueue_io_job(job);
        }
        else
        {
            // Nothing was received, so no job will own the array; release it here
            delete[] packets;
        }
    }
}

- If rte_pktmbuf_free_bulk() is called from _receive_packets() immediately after the job is enqueued, then the packets are freed as expected. However, this creates a race condition where the data in the packets might be overwritten with new packet data before it can be processed on the non-lcore thread.
- I also tried holding the packets in _receive_packets() until the process running on the non-lcore thread sends a notification that it is done using the data, upon which the packets are freed in _receive_packets(). In this case the behavior is similar to when the packets are freed in the non-lcore thread: the packets are not freed. (It is as if just looking at the data in the non-lcore thread prevents it from being freed properly, even when freed from the lcore thread.)
- Even if I access the packet data through rte_pktmbuf_mtod() such that I never send pointers to the packet objects to the non-lcore thread, it still does not free the rte_mbufs properly.

The solution was to execute ALL code that makes use of mbufs on DPDK lcore threads. There is no need to free the packets with rte_pktmbuf_free() on all threads that use the packets, as I had previously suggested in my comments. If all threads are running on DPDK lcores, then only a single call to rte_pktmbuf_free() is required.
I thought I could get away with reading data from the mbufs in threads not running on DPDK lcores, but apparently even read operations are only thread safe if the reads are performed on threads running on DPDK lcores.
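
For anyone puzzled by the 4096-packet ceiling in the question, here is a minimal model of it in plain C++, with no DPDK involved. It assumes the mbuf pool in the inherited setup holds 4096 buffers, which the question does not actually show. FixedPool, alloc_burst(), free_burst() and simulate_rx() are hypothetical names made up for this sketch; free_burst() only plays the role that rte_pktmbuf_free_bulk() plays for real mbufs. The sketch says nothing about why the frees were ineffective off-lcore, only what the symptom looks like once buffers stop coming back.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for an mbuf pool: a fixed number of buffers that
// must be handed back before they can be reused for new packets.
class FixedPool {
public:
    explicit FixedPool(std::size_t capacity) {
        free_list_.reserve(capacity);
        for (std::size_t i = 0; i < capacity; ++i) free_list_.push_back(i);
    }

    // Take up to `want` buffers; returns fewer (possibly none) once the pool is drained.
    std::vector<std::size_t> alloc_burst(std::size_t want) {
        std::vector<std::size_t> out;
        while (out.size() < want && !free_list_.empty()) {
            out.push_back(free_list_.back());
            free_list_.pop_back();
        }
        return out;
    }

    // Hand buffers back to the pool so later bursts can reuse them.
    void free_burst(const std::vector<std::size_t>& bufs) {
        free_list_.insert(free_list_.end(), bufs.begin(), bufs.end());
    }

private:
    std::vector<std::size_t> free_list_;
};

// Attempt `bursts` receive bursts of `burst_size` packets each and return the
// total number of packets received. When `free_after_use` is false, the
// buffers never make it back to the pool.
std::size_t simulate_rx(std::size_t pool_size, std::size_t burst_size,
                        std::size_t bursts, bool free_after_use) {
    FixedPool pool(pool_size);
    std::size_t received = 0;
    for (std::size_t i = 0; i < bursts; ++i) {
        std::vector<std::size_t> pkts = pool.alloc_burst(burst_size);
        received += pkts.size();
        if (free_after_use) pool.free_burst(pkts);
    }
    return received;
}
```

With a pool of 4096 and bursts of 512, simulate_rx(4096, 512, 100, false) stops at 4096 packets after eight bursts, while simulate_rx(4096, 512, 100, true) receives all 51200.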