惊群
惊群现象就是多个进程同时等待同一个事件,当有事件发生时,所有进程都被唤醒,同时去响应这个事件,最后只有一个进程能成功执行,而其他进程只能继续重新休眠。nginx 在master/workers模型下,多个worker进程进行epoll_wait,当有新的连接事件发生,会唤醒所有进程的epoll_wait,结果有些进程发现accpte是失败的。这样的话,开销变大,性能也会随之下降。
nginx的解决方案
解决这个问题的方案总结下来就是一句话:
同一时刻只允许有一个worker进程将需要监听的句柄加入到自己的epoll中。
accpet锁机制
通过使用加锁的机制,来做进程间的协调。如果开启了accept锁,则每个worker进程每次在进入epoll_wait获取事件之前,会进行尝试获取锁。只有获取到锁的worker进程才能将需要监听的句柄加入到自己的epoll中,进而去处理accpet事件。
不过,这里要注意几个逻辑:
- 如果该worker进程获取锁成功:
- 并且其上一次尝试获得锁是失败的(即上一次没有获得到锁),才会将现有的cycle->listening数组中需要监听的句柄加入到自己的epoll中,
- 要是上一次尝试获得锁也是成功的(即上一次该进程得到了锁,这次仍是该进程拥有),则会直接返回。
- 如果该worker进程获取锁失败,
- 并且其上一次尝试获得锁也是失败的,则会直接返回。
- 如果其上一次尝试获得锁是成功的,则会将现有的cycle->listening数组中已经监听的句柄从自己的epoll中移除。
POST事件
对于得到锁的进程,会携带NGX_POST_EVENTS标记属性。这样在epoll_wait后,会把需要处理的事件先加入到队列中,延迟处理,目的就是避免该进程长时间占有锁。
特别说明一下,这里的延迟队列分为两种:
- accept事件的延迟队列
- read/write事件的延迟队列
也就是对应的事件会加到对应的延迟队列中。进程在释放锁之前,会先处理accept事件的延迟队列,处理完后才会释放锁,然后再去处理超时事件和read/write事件的延迟队列,这样既保证的处理accpet的可靠性,也避免了长时间对锁的占用。
未得到锁的进程
与此同时,未得到锁的进程会在epoll_wait下至少等待ngx_accept_mutex_delay毫秒,因为有可能超时事件的发生时间会短与ngx_accept_mutex_delay,也有可能有普通的读写事件触发。总之,该进程会更新进程时间缓存,处理相关的事件。然后再次尝试锁的获取。
指令 accept_mutex
与它相关的指令:
- accept_mutex_delay – 默认 500 毫秒
看下代码逻辑实现
- ngx_use_accept_mutex: 当开启了accept_mutex,且worker进程数 > 1, ngx_use_accept_mutex会置为1
- ngx_process_events 是一个函数指针。普通网络套接字调用ngx_epoll_process_events函数开始处理,异步文件i/o设置事件的回调方法为ngx_epoll_eventfd_handler。这里主要看下ngx_epoll_process_events函数逻辑。
- 注意:这里仅仅是为了方便阅读,只是展现了大概的代码逻辑,省略了部分代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| void ngx_process_events_and_timers(ngx_cycle_t *cycle) {
if (ngx_use_accept_mutex) {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; }
if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS; } else { if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } }
(void) ngx_process_events(cycle, timer, flags);
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); }
if (delta) { ngx_event_expire_timers(); }
ngx_event_process_posted(cycle, &ngx_posted_events); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; }
if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; }
ngx_accept_events = 0; ngx_accept_mutex_held = 1;
return NGX_OK; }
if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; }
ngx_accept_mutex_held = 0; }
return NGX_OK; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { events = epoll_wait(ep, event_list, (int) nevents, timer);
ngx_time_update();
for (i = 0; i < events; i++) {
if ((revents & EPOLLIN) && rev->active) {
if (flags & NGX_POST_EVENTS) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events;
ngx_post_event(rev, queue);
} else { rev->handler(rev); }
}
if ((revents & EPOLLOUT) && wev->active) {
if (flags & NGX_POST_EVENTS) { ngx_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); }
} } }
|