本文将介绍eventfd这个system call的内核实现。

1. Prerequisite

summary(仅考虑eventfd的flags为0 ,同时eventfd counter 没有 exceed the maximum):

  1. the eventfd counter has a nonzero value, then a read returns 8 bytes containing that value, and the counter’s value is reset to zero;
  2. If the eventfd counter is zero at the time of the call to read, then the call blocks until the counter becomes nonzero;
  3. A write call adds the 8-byte integer value supplied in its buffer to the counter;
  4. eventfd_signal(struct eventfd_ctx *ctx, __u64 n): Adds @n to the eventfd counter.

2. Data struct

eventfd_ctx结构的形式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct eventfd_ctx {
struct kref kref;
wait_queue_head_t wqh;
/*
* Every time that a write(2) is performed on an eventfd, the
* value of the __u64 being written is added to "count" and a
* wakeup is performed on "wqh". A read(2) will return the "count"
* value to userspace, and will reset "count" to zero. The kernel
* side eventfd_signal() also, adds to the "count" counter and
* issue a wakeup.
*/
__u64 count;
unsigned int flags;
int id;
};

在一个eventfd上执行write系统调用,会向count加上被写入的值,并唤醒等待队列wqh中的元素。内核中的eventfd_signal函数也会增加count的值并唤醒wqh中的元素。

在eventfd上执行read系统调用,会向用户空间返回count的值,并且该eventfd对应的eventfd_ctx结构中的count会被清0。

kref是一个内核中的通用变量,一般插入到结构体中,用于记录该结构体被内核各处引用的次数,当kref->refcount为0时,该结构体不再被引用,需要进行释放。

flags由调用者传入,可能取值为EFD_CLOEXECEFD_NONBLOCKEFD_SEMAPHORE三者的任意或组合。

id即eventfd的id,用于唯一标识一个eventfd。

3. Core function

3.1 系统调用的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SYSCALL_DEFINE1(eventfd, unsigned int, count)
{
return do_eventfd(count, 0);
}

static int do_eventfd(unsigned int count, int flags)
{
struct eventfd_ctx *ctx;
int fd;
...
ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
...
kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh);
ctx->count = count;
ctx->flags = flags;
ctx->id = ida_simple_get(&eventfd_ida, 0, 0, GFP_KERNEL);

fd = anon_inode_getfd("[eventfd]", &eventfd_fops, ctx,
O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
...
return fd;
}
  1. 分配一个eventfd_ctx结构用于存储eventfd相关信息
  2. 设置eventfd_ctx->kref中的值为1,表明内核正在引用该eventfd
  3. 初始化eventfd_ctx结构中的等待队列
  4. 为eventfd_ctx结构中的count(读写eventfd时要操作的量)赋上系统调用传入的count
  5. 通过Linux提供的ida机制为eventfd_ctx结构中的id申请一个id
  6. 通过anon_inode_getfd创建一个文件实例,该文件的操作方法为eventfd_fops,fd->private_data为eventfd_ctx,文件实例名为eventfd。
  7. 返回该文件实例的文件描述符

3.2 eventfd_read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt = 0;
DECLARE_WAITQUEUE(wait, current);

res = -EAGAIN;
if (ctx->count > 0)
res = sizeof(ucnt);
else if (!(file->f_flags & O_NONBLOCK)) {
/*add to wait queue*/
__add_wait_queue(&ctx->wqh, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ctx->count > 0) {
res = sizeof(ucnt);
break;
}
...
/*触发调度器,执行调度*/
schedule();
}
/*remove from the wait queue*/
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING);
}
if (likely(res > 0)) {
eventfd_ctx_do_read(ctx, &ucnt);
/*judge whether wait queue is empty*/
if (waitqueue_active(&ctx->wqh)) //在该eventfd上write阻塞的进程
wake_up_locked_poll(&ctx->wqh, EPOLLOUT); //唤醒对应的进程
}
...
return res;
}

static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{
*cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;
ctx->count -= *cnt;
}
  • 只有eventfd_ctx->count大于0时,eventfd才是可读的,此时调用eventfd_ctx_do_read对eventfd_ctx的count进行处理,如果eventfd_ctx->flags中的EFD_SEMAPHORE为0,就将count变量置0,并激活在等待队列中EPOLLOUT(write阻塞)的进程。

  • 如果eventfd_ctx->count等于0,即该eventfd当前不可读,如果eventfd_ctx->flags中的O_NONBLOCK没有置位,那么将发起读eventfd动作的进程放入eventfd_ctx中的等待队列,并重新调度新的进程运行。

3.3 eventfd_write

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt;
DECLARE_WAITQUEUE(wait, current);

copy_from_user(&ucnt, buf, sizeof(ucnt));
...
res = -EAGAIN;
if (ULLONG_MAX - ctx->count > ucnt)
res = sizeof(ucnt);
else if (!(file->f_flags & O_NONBLOCK)) {
__add_wait_queue(&ctx->wqh, &wait);
for (res = 0;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ULLONG_MAX - ctx->count > ucnt) {
res = sizeof(ucnt);
break;
}
...
schedule();
}
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING);
}
if (likely(res > 0)) {
ctx->count += ucnt;
if (waitqueue_active(&ctx->wqh)) //在该eventfd上read阻塞的进程
wake_up_locked_poll(&ctx->wqh, EPOLLIN);
}

return res;
}

将想要写入eventfd的value赋值到ucnt,判断ULLONG_MAX - eventfd_ctx->count 与ucnt的大小,确认eventfd中是否还有足够空间用于写入。

  • 如果有足够空间用于写入,ctx->count += ucnt,并激活在等待队列中EPOLLIN(read阻塞)的进程。

  • 如果没有足够空间用于写入,则将发起写eventfd动作的进程放入eventfd_ctx中的等待队列,并重新调度新的进程运行。

3.4 eventfd_signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* eventfd_signal - Adds @n to the eventfd counter.
* @ctx: [in] Pointer to the eventfd context.
* @n: [in] Value of the counter to be added to the eventfd internal counter.
*/
__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
{
...
if (ULLONG_MAX - ctx->count < n)
n = ULLONG_MAX - ctx->count;
ctx->count += n;
if (waitqueue_active(&ctx->wqh)) //在该eventfd上read阻塞的进程
wake_up_locked_poll(&ctx->wqh, EPOLLIN);
...

return n;
}

参考资料:

  1. Linux的eventfd机制
  2. eventfd分析
  3. Linux eventfd分析