eBPF 中提供了内核和用户空间之间高效地交换数据的机制--perf buffer,它是一种per-cpu的环形缓冲区,当我们需要将 eBPF 收集到的数据发送到用户空间记录或者处理时,就可以用perf buffer来完成。它还有如下特点:
eBPF提供了专门的map和helper function来使用perf buffer:
前面提到了这么多的概念可能比较迷糊,现在我们来实际操作一把,show me the code。
首先,在eBPF 中定义一个BPF_MAP_TYPE_PERF_EVENT_ARRAY类型的map;这个map可以不用指定max_entries,因为在libbpf中会默认设置max_entries为系统cpu个数。
/* BPF perfbuf map */
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(int));
__uint(value_size, sizeof(u32));
} my_map SEC(".maps");
然后,我们再构建一个用于通知用户态拷贝的prog。这个prog在sys_write挂接了一个kprobe处理程序,程序调用bpf_perf_event_output()来通知用户态拷贝数据data。
SEC("kprobe/sys_write")
int bpf_prog1(struct pt_regs *ctx)
{
struct eb {
u64 pid;
u64 cookie;
} data;
data.pid = bpf_get_current_pid_tgid();
data.cookie = 0x12345678;
bpf_perf_event_output(ctx, &my_map, 0, &data, sizeof(data));
return 0;
}
这部分代码是基于libbpf-bootstrap的BPF skeleton 框架来编写的。我们基于Coolbpf会有比较简洁的写法,做了一些封装。但是为了大家从源码处易于理解,我们采用libbpf-bootstrap示例一下。
struct perf_buffer *pb = NULL;
struct perf_buffer_opts pb_opts = {};
struct perfbuf_output_bpf *skel;
...
pb_opts.sample_cb = handle_event;
pb = perf_buffer__new(bpf_map__fd(skel->maps.my_map), 8 /* 32KB per CPU */, &pb_opts);
if (libbpf_get_error(pb)) {
err = -1;
fprintf(stderr, "Failed to create perf buffer\n");
goto cleanup;
}
...
while (!exiting) {
err = perf_buffer__poll(pb, 100 /* timeout, ms */);
/* Ctrl-C will cause -EINTR */
if (err == -EINTR) {
err = 0;
break;
}
if (err < 0) {
printf("Error polling perf buffer: %d\n", err);
break;
}
}
上面程序通过perf_buffer_new这个libbpf提供的库函数来创建一个struct perf_buffer;此外还通过pb_opts.samble_cb = handle_event设置了perf_buffer的通知回调处理函数。
然后通过perf_buffer__poll()等待内核的通知事件;当内核侧eBPF程序调用bpf_perf_event_output()发起通知时,用户态poll等待的任务就会被唤醒从而调用pb_opts.samble_cb指向的回调函数handle_event.
#define MAX_CNT 1000000000ll
struct eventbuf {
__u64 pid;
__u64 cookie;
} ;
void handle_event(void *ctx, int cpu, void *data, __u32 data_sz)
{
static __u64 cnt;
const struct eventbuf *eb = data;
if (eb->cookie != 0x12345678) {
printf("BUG pid %llx cookie %llx sized %d\n", eb->pid, eb->cookie, data_sz);
return;
}
cnt++;
if (cnt == MAX_CNT) {
printf("recv %lld events per sec\n",
MAX_CNT * 1000000000ll / (time_get_ns() - start_time));
}
return;
}
从前面的几个步骤可知,perf buffer的使用一般有以下几步:
今天我们比较简单的介绍了perf buffer编写eBPF代码的内核态和用户态部分的处理流程,在下一篇文章中,我们将介绍perf buffer用到的map:BPF_MAP_TYPE_PERF_EVENT_ARRAY的具体实现。