-
-
Save brammittendorff/8ba9ffc0bafa6ff6173fc895a1ab2125 to your computer and use it in GitHub Desktop.
af_packet_rx_ring_habrahabr.c
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <poll.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <linux/if_packet.h>
#include <net/ethernet.h> /* the L2 protocols */

#include <atomic>
#include <iostream>
#include <string>

#include <boost/thread.hpp>
/* | |
Parser files: | |
https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/fastnetmon_packet_parser.c | |
https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/fastnetmon_packet_parser.h | |
*/ | |
#include "../fastnetmon_packet_parser.h" | |
// RX ring geometry used by setup_socket() for every capture socket.
// Size of one ring block: 4 MiB (4 * 1024 * 1024 == 1 << 22 == 4194304 bytes).
unsigned int blocksiz = 4u * 1024u * 1024u;
// Frame size passed to the kernel: 2 KiB (2 * 1024 == 1 << 11 == 2048 bytes).
unsigned int framesiz = 2u * 1024u;
// Number of blocks in each socket's RX ring.
unsigned int blocknum = 64u;
// Descriptor found at the very start of every TPACKET_V3 ring block.
// setup_socket() casts each mmap'ed block to this type, so the field order
// and types must stay exactly as they are. They mirror the kernel's
// struct tpacket_block_desc: a version word, the offset to the block's
// private area, then the V1 block header (status, packet count, offset of
// the first packet, ...).
struct block_desc {
    uint32_t version;        // block descriptor version
    uint32_t offset_to_priv; // offset of the private area inside the block
    tpacket_hdr_v1 h1;       // status / packet bookkeeping for this block
};
/* | |
Build it: | |
g++ ../fastnetmon_packet_parser.c -ofastnetmon_packet_parser.o -c | |
g++ af_packet.cpp fastnetmon_packet_parser.o -lboost_thread -lboost_system -lpthread | |
*/ | |
// Get interface number by name | |
int get_interface_number_by_device_name(int socket_fd, std::string interface_name) { | |
struct ifreq ifr; | |
memset(&ifr, 0, sizeof(ifr)); | |
if (interface_name.size() > IFNAMSIZ) { | |
return -1; | |
} | |
strncpy(ifr.ifr_name, interface_name.c_str(), sizeof(ifr.ifr_name)); | |
if (ioctl(socket_fd, SIOCGIFINDEX, &ifr) == -1) { | |
return -1; | |
} | |
return ifr.ifr_ifindex; | |
} | |
// Number of AF_PACKET threads (not read anywhere in this file).
unsigned int af_packet_threads = 1;

// Global capture counters. Every capture thread adds to them in walk_block()
// while speed_printer() reads them from another thread. With plain uint64_t
// that is a data race: undefined behaviour, with lost updates when several
// fanout threads finish a block at the same moment. std::atomic keeps the
// existing `+=` and implicit reads valid while making them race-free. The
// atomic add happens once per ring block, not per packet, so it is cheap.
std::atomic<uint64_t> received_packets(0);
std::atomic<uint64_t> received_bytes(0);
void speed_printer() { | |
while (true) { | |
uint64_t packets_before = received_packets; | |
boost::this_thread::sleep(boost::posix_time::seconds(1)); | |
uint64_t packets_after = received_packets; | |
uint64_t pps = packets_after - packets_before; | |
printf("We process: %llu pps\n", pps); | |
} | |
} | |
// Hand a fully processed ring block back to the kernel.
//
// Writing TP_STATUS_KERNEL into the block header's status field marks the
// block as owned by the kernel again, so it can be refilled with packets.
void flush_block(struct block_desc *pbd) {
    struct tpacket_hdr_v1 &header = pbd->h1;
    header.block_status = TP_STATUS_KERNEL;
}
// Visit every packet stored in one TPACKET_V3 ring block and add the block's
// packet and byte totals to the global counters.
//
// pbd       - descriptor at the start of the block. The caller (setup_socket)
//             only passes blocks whose status has TP_STATUS_USER set.
// block_num - index of the block in the ring. It is not used here and is
//             kept so the signature stays unchanged.
//
// Packets inside a block form a chain. The first tpacket3_hdr starts
// h1.offset_to_first_pkt bytes after the block start, and each header's
// tp_next_offset gives the distance to the next header.
void walk_block(struct block_desc *pbd, const int block_num) {
    (void)block_num;

    const uint32_t num_pkts = pbd->h1.num_pkts;
    uint64_t bytes = 0;

    struct tpacket3_hdr *ppd =
        (struct tpacket3_hdr *)((uint8_t *)pbd + pbd->h1.offset_to_first_pkt);

    for (uint32_t i = 0; i < num_pkts; ++i) {
        // Count the bytes that were actually copied into the ring.
        bytes += ppd->tp_snaplen;

        // struct ethhdr *eth = (struct ethhdr *) ((uint8_t *) ppd + ppd->tp_mac);

        // Define PRINT_PACKETS to decode and print every packet with the
        // fastnetmon parser.
        // #define PRINT_PACKETS
#ifdef PRINT_PACKETS
        struct pfring_pkthdr packet_header;
        memset(&packet_header, 0, sizeof(packet_header));
        // pfring_pkthdr follows pcap semantics: `len` is the packet's original
        // length, `caplen` the part that was captured (present in the ring).
        packet_header.len = ppd->tp_len;
        packet_header.caplen = ppd->tp_snaplen;

        u_int8_t timestamp = 0;
        u_int8_t add_hash = 0;

        // The frame starts tp_mac bytes after this packet's tpacket3_hdr.
        u_char* data_pointer = (u_char*)((uint8_t *) ppd + ppd->tp_mac);

        fastnetmon_parse_pkt(data_pointer, &packet_header, 4, timestamp, add_hash);

        char print_buffer[512];
        fastnetmon_print_parsed_pkt(print_buffer, 512, data_pointer, &packet_header);
        printf("%s\n", print_buffer);
#endif

        ppd = (struct tpacket3_hdr *)((uint8_t *)ppd + ppd->tp_next_offset);
    }

    received_packets += num_pkts;
    received_bytes += bytes;
}
int setup_socket(std::string interface_name, int fanout_group_id) { | |
// More details here: http://man7.org/linux/man-pages/man7/packet.7.html | |
// We could use SOCK_RAW or SOCK_DGRAM for second argument | |
// SOCK_RAW - raw packets pass from the kernel | |
// SOCK_DGRAM - some amount of processing | |
// Third argument manage ether type of captured packets | |
int packet_socket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); | |
if (packet_socket == -1) { | |
printf("Can't create AF_PACKET socket\n"); | |
return -1; | |
} | |
// We whould use V3 bcause it could read/pool in per block basis instead per packet | |
int version = TPACKET_V3; | |
int setsockopt_packet_version = setsockopt(packet_socket, SOL_PACKET, PACKET_VERSION, &version, sizeof(version)); | |
if (setsockopt_packet_version < 0) { | |
printf("Can't set packet v3 version\n"); | |
return -1; | |
} | |
int interface_number = get_interface_number_by_device_name(packet_socket, interface_name); | |
if (interface_number == -1) { | |
printf("Can't get interface number by interface name\n"); | |
return -1; | |
} | |
// Switch to PROMISC mode | |
struct packet_mreq sock_params; | |
memset(&sock_params, 0, sizeof(sock_params)); | |
sock_params.mr_type = PACKET_MR_PROMISC; | |
sock_params.mr_ifindex = interface_number; | |
int set_promisc = setsockopt(packet_socket, SOL_PACKET, PACKET_ADD_MEMBERSHIP, (void *)&sock_params, sizeof(sock_params)); | |
if (set_promisc == -1) { | |
printf("Can't enable promisc mode\n"); | |
return -1; | |
} | |
struct sockaddr_ll bind_address; | |
memset(&bind_address, 0, sizeof(bind_address)); | |
bind_address.sll_family = AF_PACKET; | |
bind_address.sll_protocol = htons(ETH_P_ALL); | |
bind_address.sll_ifindex = interface_number; | |
// We will follow http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html | |
// And this: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt | |
struct tpacket_req3 req; | |
memset(&req, 0, sizeof(req)); | |
req.tp_block_size = blocksiz; | |
req.tp_frame_size = framesiz; | |
req.tp_block_nr = blocknum; | |
req.tp_frame_nr = (blocksiz * blocknum) / framesiz; | |
req.tp_retire_blk_tov = 60; // Timeout in msec | |
req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH; | |
int setsockopt_rx_ring = setsockopt(packet_socket, SOL_PACKET , PACKET_RX_RING , (void*)&req , sizeof(req)); | |
if (setsockopt_rx_ring == -1) { | |
printf("Can't enable RX_RING for AF_PACKET socket\n"); | |
return -1; | |
} | |
// We use per thread structures | |
uint8_t* mapped_buffer = NULL; | |
struct iovec* rd = NULL; | |
mapped_buffer = (uint8_t*)mmap(NULL, req.tp_block_size * req.tp_block_nr, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, packet_socket, 0); | |
if (mapped_buffer == MAP_FAILED) { | |
printf("mmap failed!\n"); | |
return -1; | |
} | |
// Allocate iov structure for each block | |
rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec)); | |
// Initilize iov structures | |
for (int i = 0; i < req.tp_block_nr; ++i) { | |
rd[i].iov_base = mapped_buffer + (i * req.tp_block_size); | |
rd[i].iov_len = req.tp_block_size; | |
} | |
int bind_result = bind(packet_socket, (struct sockaddr *)&bind_address, sizeof(bind_address)); | |
if (bind_result == -1) { | |
printf("Can't bind to AF_PACKET socket\n"); | |
return -1; | |
} | |
if (fanout_group_id) { | |
// PACKET_FANOUT_LB - round robin | |
// PACKET_FANOUT_CPU - send packets to CPU where packet arrived | |
int fanout_type = PACKET_FANOUT_CPU; | |
int fanout_arg = (fanout_group_id | (fanout_type << 16)); | |
int setsockopt_fanout = setsockopt(packet_socket, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg)); | |
if (setsockopt_fanout < 0) { | |
printf("Can't configure fanout\n"); | |
return -1; | |
} | |
} | |
unsigned int current_block_num = 0; | |
struct pollfd pfd; | |
memset(&pfd, 0, sizeof(pfd)); | |
pfd.fd = packet_socket; | |
pfd.events = POLLIN | POLLERR; | |
pfd.revents = 0; | |
while (true) { | |
struct block_desc *pbd = (struct block_desc *) rd[current_block_num].iov_base; | |
if ((pbd->h1.block_status & TP_STATUS_USER) == 0) { | |
poll(&pfd, 1, -1); | |
continue; | |
} | |
walk_block(pbd, current_block_num); | |
flush_block(pbd); | |
current_block_num = (current_block_num + 1) % blocknum; | |
} | |
return packet_socket; | |
} | |
// Thread entry point for one capture socket on `interface_name`.
//
// All the work happens in setup_socket(), which does not come back while
// capture is running. When it fails it prints its own diagnostic and returns
// -1, so the result is deliberately discarded here.
void start_af_packet_capture(std::string interface_name, int fanout_group_id) {
    const int capture_result = setup_socket(interface_name, fanout_group_id);
    (void)capture_result;
}
// Placeholder for reading AF_PACKET capture statistics; it currently does
// nothing. A real implementation needs access to the capture socket so it can
// call getsockopt(fd, SOL_PACKET, PACKET_STATISTICS, ...). For TPACKET_V3
// that call fills a struct tpacket_stats_v3.
void get_af_packet_stats() {
    return;
}
// When true, main() pins capture thread N to CPU N before starting it
// (could give some speed-up on NUMA servers).
bool execute_strict_cpu_affinity = false;

// When true, main() starts several capture threads that share one
// PACKET_FANOUT group; when false, a single capture runs on the main thread.
bool use_multiple_fanout_processes = true;
int main() { | |
int fanout_group_id = getpid() & 0xffff; | |
boost::thread speed_printer_thread( speed_printer ); | |
if (use_multiple_fanout_processes) { | |
boost::thread_group packet_receiver_thread_group; | |
unsigned int num_cpus = 8; | |
for (int cpu = 0; cpu < num_cpus; cpu++) { | |
boost::thread::attributes thread_attrs; | |
if (execute_strict_cpu_affinity) { | |
cpu_set_t current_cpu_set; | |
int cpu_to_bind = cpu % num_cpus; | |
CPU_ZERO(¤t_cpu_set); | |
// We count cpus from zero | |
CPU_SET(cpu_to_bind, ¤t_cpu_set); | |
int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), ¤t_cpu_set); | |
if (set_affinity_result != 0) { | |
printf("Can't set CPU affinity for thread\n"); | |
} | |
} | |
packet_receiver_thread_group.add_thread( | |
new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, "eth6", fanout_group_id)) | |
); | |
} | |
// Wait all processes for finish | |
packet_receiver_thread_group.join_all(); | |
} else { | |
start_af_packet_capture("eth6", 0); | |
} | |
speed_printer_thread.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment