diff --git a/zan-extension/config.m4 b/zan-extension/config.m4 index f4711af..6b52e3c 100644 --- a/zan-extension/config.m4 +++ b/zan-extension/config.m4 @@ -296,8 +296,9 @@ if test "$PHP_ZAN" != "no"; then src/core/array.c \ src/core/list.c \ src/core/heap.c \ - src/core/log.c \ - src/core/rbtree.c \ + src/core/log.c \ + src/core/rbtree.c \ + src/core/clock.c \ src/memory/ShareMemory.c \ src/memory/MemoryGlobal.c \ src/memory/RingBuffer.c \ diff --git a/zan-extension/include/swClock.h b/zan-extension/include/swClock.h new file mode 100644 index 0000000..7779517 --- /dev/null +++ b/zan-extension/include/swClock.h @@ -0,0 +1,28 @@ +/* + +----------------------------------------------------------------------+ + | Zan | + +----------------------------------------------------------------------+ + | Copyright (c) 2016-2017 Zan Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 2.0 of the Apache license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.apache.org/licenses/LICENSE-2.0.html | + | If you did not receive a copy of the Apache2.0 license and are unable| + | to obtain it through the world-wide-web, please send a note to | + | zan@zanphp.io so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Zan Group | + +----------------------------------------------------------------------+ +*/ + +#ifndef _SW_CLOCK_H_ +#define _SW_CLOCK_H_ + +#include + +int swClock_init(); +int swClock_get(struct timeval *tv); + +#endif + diff --git a/zan-extension/include/swGlobalDef.h b/zan-extension/include/swGlobalDef.h index 8e49c3f..054aaf1 100644 --- a/zan-extension/include/swGlobalDef.h +++ b/zan-extension/include/swGlobalDef.h @@ -70,6 +70,11 @@ struct _swServer */ uint32_t max_request; + /** + * request terminate timeout + */ + uint32_t terminate_timeout; + int timeout_sec; int timeout_usec; diff --git a/zan-extension/include/swStats.h b/zan-extension/include/swStats.h index 3705a17..95f0a22 100644 --- a/zan-extension/include/swStats.h +++ b/zan-extension/include/swStats.h @@ -27,6 +27,7 @@ typedef struct { sw_atomic_long_t total_request_count; sw_atomic_long_t request_count; sw_atomic_t start_count; + struct timeval accepted; } swWorkerStats; typedef struct @@ -50,8 +51,11 @@ typedef struct swLock lock; } swServerStats; -#define sw_stats_incr(val) sw_atomic_fetch_add(val, 1) -#define sw_stats_decr(val) sw_atomic_fetch_sub(val, 1) +#define sw_stats_atom_incr(val) sw_atomic_fetch_add(val, 1) +#define sw_stats_atom_decr(val) sw_atomic_fetch_sub(val, 1) +#define sw_stats_incr(val) ((*val)++) +#define sw_stats_decr(val) ((*val)--) + void sw_stats_set_worker_status(swWorker *worker, int status); diff --git a/zan-extension/src/core/clock.c b/zan-extension/src/core/clock.c new file mode 100644 index 0000000..6967b74 --- /dev/null +++ b/zan-extension/src/core/clock.c @@ -0,0 +1,126 @@ +/* + +----------------------------------------------------------------------+ + | Zan | + +----------------------------------------------------------------------+ + | Copyright (c) 2016-2017 Zan Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 2.0 of the Apache license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.apache.org/licenses/LICENSE-2.0.html | + | If you did not receive a copy of the Apache2.0 license and are unable| + | to obtain it through the world-wide-web, please send a note to | + | zan@zanphp.io so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Zan Group | + +----------------------------------------------------------------------+ +*/ + +#if defined(HAVE_CLOCK_GETTIME) +#include +#endif + +#include "swClock.h" +#include "swLog.h" + + +#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) + +static int monotonic_works; + +int swClock_init() +{ + struct timespec ts; + + monotonic_works = 0; + + if (0 == clock_gettime(CLOCK_MONOTONIC, &ts)) { + monotonic_works = 1; + } + + return 0; +} + +int swClock_get(struct timeval *tv) +{ + if (monotonic_works) { + struct timespec ts; + + if (0 > clock_gettime(CLOCK_MONOTONIC, &ts)) { + swError("clock_gettime() failed"); + return -1; + } + + tv->tv_sec = ts.tv_sec; + tv->tv_usec = ts.tv_nsec / 1000; + return 0; + } + + return gettimeofday(tv, 0); +} + +/* macosx clock */ +#elif defined(HAVE_CLOCK_GET_TIME) + +#include +#include +#include + +static clock_serv_t mach_clock; + +/* this code borrowed from here: http://lists.apple.com/archives/Darwin-development/2002/Mar/msg00746.html */ +/* mach_clock also should be re-initialized in child process after fork */ +int swClock_init() +{ + kern_return_t ret; + mach_timespec_t aTime; + + ret = host_get_clock_service(mach_host_self(), REALTIME_CLOCK, &mach_clock); + + if (ret != KERN_SUCCESS) { + swError("host_get_clock_service() failed: %s", mach_error_string(ret)); + return -1; + } + + /* test if it works */ + ret = clock_get_time(mach_clock, &aTime); + + if (ret != KERN_SUCCESS) { + swError("clock_get_time() failed: %s", mach_error_string(ret)); + return -1; + } + + return 0; +} + +int swClock_get(struct timeval *tv) +{ + kern_return_t ret; + mach_timespec_t aTime; + + ret = clock_get_time(mach_clock, &aTime); + + if (ret != KERN_SUCCESS) { + swError("clock_get_time() failed: %s", mach_error_string(ret)); + return -1; + } + + tv->tv_sec = aTime.tv_sec; + tv->tv_usec = aTime.tv_nsec / 1000; + + return 0; +} + +#else /* no clock */ + +int swClock_init() +{ + return 0; +} + +int swClock_get(struct timeval *tv) +{ + return gettimeofday(tv, 0); +} + +#endif diff --git a/zan-extension/src/factory/ProcessPool.c b/zan-extension/src/factory/ProcessPool.c index 00f4bea..11516c1 100644 --- a/zan-extension/src/factory/ProcessPool.c +++ b/zan-extension/src/factory/ProcessPool.c @@ -182,7 +182,7 @@ int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_work } else { - sw_stats_incr(&worker->tasking_num); + sw_stats_atom_incr(&worker->tasking_num); } return ret; @@ -212,7 +212,7 @@ int swProcessPool_dispatch_blocking(swProcessPool *pool, swEventData *data, int } else { - sw_stats_incr(&worker->tasking_num); + sw_stats_atom_incr(&worker->tasking_num); } return ret; diff --git a/zan-extension/src/network/Manager.c b/zan-extension/src/network/Manager.c index 734f752..0905e0e 100644 --- a/zan-extension/src/network/Manager.c +++ b/zan-extension/src/network/Manager.c @@ -26,6 +26,7 @@ #include "swSignal.h" #include "swExecutor.h" #include "swBaseOperator.h" +#include "swClock.h" #include @@ -37,12 +38,16 @@ typedef struct } swManagerProcess; +// manager process static int swManager_loop_async(swFactory *factory); static int swManager_loop_sync(swFactory *factory); static void swManager_signal_handle(int sig); static pid_t swManager_spawn_worker(swFactory *factory, int worker_id); static void swManager_check_exit_status(swServer *serv, int worker_id, pid_t pid, int status); +static int swManager_init_process_check(); +static int swManager_check_request_timeout(); + static swManagerProcess ManagerProcess; //create worker child proccess @@ -472,6 +477,7 @@ static int swManager_loop_sync(swFactory *factory) swSignal_add(SIGRTMIN, swManager_signal_handle); #endif //swSignal_add(SIGINT, swManager_signal_handle); + swManager_init_process_check(); SwooleG.main_reactor = NULL; int pid = -1; @@ -531,7 +537,7 @@ static int swManager_loop_sync(swFactory *factory) } else { - sw_stats_incr(status == 0 ? &SwooleStats->worker_normal_exit + sw_stats_atom_incr(status == 0 ? &SwooleStats->worker_normal_exit : &SwooleStats->worker_abnormal_exit); swManager_check_exit_status(serv, index, pid, status); pid = 0; @@ -561,7 +567,7 @@ static int swManager_loop_sync(swFactory *factory) exit_worker = swHashMap_find_int(SwooleGS->task_workers.map, pid); if (exit_worker != NULL) { - sw_stats_incr(status == 0 ? &SwooleStats->task_worker_normal_exit + sw_stats_atom_incr(status == 0 ? &SwooleStats->task_worker_normal_exit : &SwooleStats->task_worker_abnormal_exit); swManager_check_exit_status(serv, exit_worker->id, pid, status); if (exit_worker->deleted == 1) //主动回收不重启 @@ -690,6 +696,38 @@ static pid_t swManager_spawn_worker(swFactory *factory, int worker_id) } } +static int swManager_init_process_check() +{ + if (SwooleG.serv->terminate_timeout) { + swSignal_add(SIGALRM, swManager_signal_handle); + alarm(1); + } + return SW_OK; +} + +static int swManager_check_request_timeout() +{ + int i, consumed; + swServer *serv = SwooleG.serv; + struct timeval tv; + struct timeval now; + + swClock_get(&now); + + for (i = 0; i < serv->worker_num; i++) { + tv = SwooleStats->workers[i].accepted; + if (serv->workers[i].status == SW_WORKER_BUSY) { + consumed = (now.tv_sec - tv.tv_sec) * 1000000 + (now.tv_usec - tv.tv_usec); + if (consumed > (serv->terminate_timeout * 1000000)) { + kill(serv->workers[i].pid, SIGUSR2); + } + } + } + + alarm(1); + return SW_OK; +} + static void swManager_signal_handle(int sig) { switch (sig) @@ -717,6 +755,10 @@ static void swManager_signal_handle(int sig) ManagerProcess.reload_task_worker = 1; } break; + case SIGALRM: + // check request terminate timeout + swManager_check_request_timeout(); + break; default: #ifdef SIGRTMIN if (sig == SIGRTMIN) diff --git a/zan-extension/src/network/Port.c b/zan-extension/src/network/Port.c index dbd823e..81bcc89 100644 --- a/zan-extension/src/network/Port.c +++ b/zan-extension/src/network/Port.c @@ -442,7 +442,7 @@ static int swPort_onRead_http(swReactor *reactor, swListenPort *port, swEvent *e goto close_fd; } - //support method:get post put delete patch head options + //support method:get post put delete patch head options if ((request->method > 0 && request->method <= HTTP_PATCH) || request->method == HTTP_OPTIONS) { //receive data of http header diff --git a/zan-extension/src/network/ReactorAccept.c b/zan-extension/src/network/ReactorAccept.c index 0dde345..a184b73 100644 --- a/zan-extension/src/network/ReactorAccept.c +++ b/zan-extension/src/network/ReactorAccept.c @@ -50,8 +50,8 @@ static swConnection* swConnection_create(swServer *serv, swListenPort *ls, int f { swConnection* connection = NULL; - sw_stats_incr(&SwooleStats->accept_count); - sw_stats_incr(&SwooleStats->connection_num); + sw_stats_atom_incr(&SwooleStats->accept_count); + sw_stats_atom_incr(&SwooleStats->connection_num); if (fd > swServer_get_maxfd(serv)) { diff --git a/zan-extension/src/network/ReactorThread.c b/zan-extension/src/network/ReactorThread.c index 3d7c821..51ef0ea 100644 --- a/zan-extension/src/network/ReactorThread.c +++ b/zan-extension/src/network/ReactorThread.c @@ -877,8 +877,8 @@ int swReactorThread_close(swReactor *reactor, int fd) assert(fd % serv->reactor_num == SwooleTG.id); } - sw_stats_incr(&SwooleStats->close_count); - sw_stats_decr(&SwooleStats->connection_num); + sw_stats_atom_incr(&SwooleStats->close_count); + sw_stats_atom_decr(&SwooleStats->connection_num); swTrace("Close Event.fd=%d|from=%d", fd, reactor->id); diff --git a/zan-extension/src/network/Server.c b/zan-extension/src/network/Server.c index 7768446..0c9ec64 100644 --- a/zan-extension/src/network/Server.c +++ b/zan-extension/src/network/Server.c @@ -30,11 +30,11 @@ #include "swConnection.h" #include "swBaseOperator.h" #include "swGlobalVars.h" - +#include "swClock.h" swServerG SwooleG; /// 超全局本地变量,此全局变量子进程中修改,其它进程不感知 swServerGS *SwooleGS = NULL; /// 超全局共享变量,此全局变量是基于共享内存的,修改字段,其它进程可感知 -swWorkerG SwooleWG; /// 进程内全局变量,此全局变量在worker进程内初始化 +swWorkerG SwooleWG; /// 进程内全局变量,此全局变量在worker进程内初始 swServerStats *SwooleStats = NULL; __thread swThreadG SwooleTG; /// 线程独立变量 @@ -356,6 +356,7 @@ void swServer_init(swServer *serv) bzero(serv, sizeof(swServer)); swoole_init(); + swClock_init(); serv->factory_mode = SW_MODE_BASE; serv->reactor_num = SW_REACTOR_NUM > SW_REACTOR_MAX_THREAD ? SW_REACTOR_MAX_THREAD : SW_REACTOR_NUM; @@ -984,9 +985,9 @@ swListenPort* swServer_add_port(swServer *serv, int type, char *host, int port) swFatalError("this port is listen now"); return NULL; } - + } - + swListenPort *ls = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swListenPort)); if (ls == NULL) { diff --git a/zan-extension/src/network/TaskWorker.c b/zan-extension/src/network/TaskWorker.c index 5b37b70..dc9af8c 100644 --- a/zan-extension/src/network/TaskWorker.c +++ b/zan-extension/src/network/TaskWorker.c @@ -81,8 +81,8 @@ int swTaskWorker_onTask(swProcessPool *pool, swEventData *task) ret = serv->onTask(serv, task); } - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].request_count); - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].total_request_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].request_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].total_request_count); return ret; } diff --git a/zan-extension/src/network/Worker.c b/zan-extension/src/network/Worker.c index 0c00a0d..ec3be7a 100644 --- a/zan-extension/src/network/Worker.c +++ b/zan-extension/src/network/Worker.c @@ -20,9 +20,14 @@ #include "swSignal.h" #include "swServer.h" #include "swWork.h" +#include "swClock.h" #include #include +#include + +static sigjmp_buf bl; +static swHashMap *activefd = NULL; static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event); @@ -43,6 +48,8 @@ int swWorker_create(swWorker *worker) } swMutex_create(&worker->lock, 1); + activefd = swHashMap_create(128, NULL); + return SW_OK; } @@ -52,6 +59,39 @@ void swWorker_free(swWorker *worker) { sw_shm_free(worker->send_shm); } + if (activefd) { + swHashMap_free(activefd); + } +} + +static void swWorker_check_timeout() +{ + struct timeval tv; + struct timeval now; + int consumed; + + swClock_get(&now); + tv = SwooleStats->workers[SwooleWG.id].accepted; + consumed = (now.tv_sec - tv.tv_sec) * 1000000 + (now.tv_usec - tv.tv_usec); + if (consumed > (SwooleG.serv->terminate_timeout * 1000000)) { + if (SwooleG.main_reactor) { + SwooleG.main_reactor->running = 0; + } else { + SwooleG.running = 0; + } + siglongjmp(bl, 1); + } +} + +static void swWorker_discard_connections() +{ + uint64_t key; + swFactory *factory = SwooleG.factory; + + while (swHashMap_each_int(activefd, &key) != NULL) { + swWarn("discard connect: %d", key); + factory->end(factory, key); + } } void swWorker_signal_init(void) @@ -59,7 +99,6 @@ void swWorker_signal_init(void) swSignal_add(SIGHUP, NULL); swSignal_add(SIGPIPE, NULL); swSignal_add(SIGUSR1, swWorker_signal_handler); - swSignal_add(SIGUSR2, NULL); //swSignal_add(SIGINT, swWorker_signal_handler); swSignal_add(SIGTERM, swWorker_signal_handler); swSignal_add(SIGALRM, swSystemTimer_signal_handler); @@ -68,6 +107,7 @@ void swWorker_signal_init(void) #ifdef SIGRTMIN swSignal_set(SIGRTMIN, swWorker_signal_handler, 1, 0); #endif + swSignal_set(SIGUSR2, swWorker_signal_handler, 1, 0); } void swWorker_signal_handler(int signo) @@ -128,7 +168,7 @@ void swWorker_signal_handler(int signo) } break; case SIGUSR2: - swWarn("signal SIGUSR2 coming."); + swWorker_check_timeout(); break; default: #ifdef SIGRTMIN @@ -215,11 +255,12 @@ int swWorker_onTask(swFactory *factory, swEventData *task) } do_task: { + swHashMap_add_int(activefd, task->info.fd, activefd); serv->onReceive(serv, task); SwooleWG.request_count++; - sw_stats_incr(&SwooleStats->request_count); - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].total_request_count); - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].request_count); + sw_stats_atom_incr(&SwooleStats->request_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].total_request_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].request_count); } if (task->info.type == SW_EVENT_PACKAGE_END) { @@ -263,9 +304,10 @@ int swWorker_onTask(swFactory *factory, swEventData *task) if (package->offset == package->length - sizeof(swDgramPacket)) { SwooleWG.request_count++; - sw_stats_incr(&SwooleStats->request_count); - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].total_request_count); - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].request_count); + sw_stats_atom_incr(&SwooleStats->request_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].total_request_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].request_count); + swHashMap_add_int(activefd, task->info.fd, activefd); serv->onPacket(serv, task); swString_clear(package); } @@ -281,6 +323,7 @@ int swWorker_onTask(swFactory *factory, swEventData *task) } #endif factory->end(factory, task->info.fd); + swHashMap_del_int(activefd, task->info.fd); break; case SW_EVENT_CONNECT: @@ -450,11 +493,15 @@ int swWorker_loop(swFactory *factory, int worker_id) SwooleG.use_signalfd = 0; #endif + if (SwooleG.serv->terminate_timeout) { + SwooleG.use_signalfd = 0; + } + //worker_id SwooleWG.id = worker_id; SwooleWG.request_count = 0; SwooleStats->workers[SwooleWG.id].request_count = 0; - sw_stats_incr(&SwooleStats->workers[SwooleWG.id].start_count); + sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].start_count); SwooleG.pid = getpid(); //signal init @@ -502,8 +549,16 @@ int swWorker_loop(swFactory *factory, int worker_id) swSignalfd_setup(SwooleG.main_reactor); } #endif + + if (sigsetjmp(bl, 1)) { + swWorker_discard_connections(); + goto clean; + } + //main loop SwooleG.main_reactor->wait(SwooleG.main_reactor, NULL); + +clean: //clear pipe buffer swWorker_clean(); //worker shutdown diff --git a/zan-extension/src/reactor/ReactorBase.c b/zan-extension/src/reactor/ReactorBase.c index 6c393d9..91e6db5 100644 --- a/zan-extension/src/reactor/ReactorBase.c +++ b/zan-extension/src/reactor/ReactorBase.c @@ -223,10 +223,13 @@ static void swReactor_onTimeout_and_Finish(swReactor *reactor) { swTimer_select(&SwooleG.timer); } + //server master - if (SwooleG.serv && SwooleTG.update_time) - { - swoole_update_time(); + if (SwooleG.serv) { + if (SwooleTG.update_time) + { + swoole_update_time(); + } } //defer callback @@ -383,7 +386,7 @@ static int swReactor_write(swReactor *reactor, int fd, void *buf, int n) goto do_buffer; } } - + #ifdef HAVE_KQUEUE else if (errno == EAGAIN || errno == ENOBUFS) #else diff --git a/zan-extension/src/reactor/ReactorEpoll.c b/zan-extension/src/reactor/ReactorEpoll.c index add4fe2..0716513 100644 --- a/zan-extension/src/reactor/ReactorEpoll.c +++ b/zan-extension/src/reactor/ReactorEpoll.c @@ -223,7 +223,7 @@ static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) if (reactor->timeout_msec == 0) { - reactor->timeout_msec = (timeo == NULL)? -1:timeo->tv_sec * 1000 + timeo->tv_usec / 1000; + reactor->timeout_msec = (timeo == NULL)? -1:timeo->tv_sec * 1000 + timeo->tv_usec / 1000; } while (reactor->running > 0) diff --git a/zan-extension/swoole_server.c b/zan-extension/swoole_server.c index f2552e2..efdb9cb 100644 --- a/zan-extension/swoole_server.c +++ b/zan-extension/swoole_server.c @@ -1807,6 +1807,15 @@ PHP_METHOD(swoole_server, set) convert_to_long(value); serv->max_request = (int) Z_LVAL_P(value); } + + // request terminate timeout + value = NULL; + if (sw_zend_hash_find(vht, ZEND_STRS("request_terminate_timeout"), (void **)&value) == SUCCESS) + { + convert_to_long(value); + serv->terminate_timeout = (uint32_t)Z_LVAL_P(value); + } + //cpu affinity value = NULL; if (sw_zend_hash_find(vht, ZEND_STRS("open_cpu_affinity"), (void **) &value) == SUCCESS) @@ -2570,7 +2579,7 @@ PHP_METHOD(swoole_server, taskwait) if (swProcessPool_dispatch_blocking(&SwooleGS->task_workers, &buf, (int*) &dst_worker_id) >= 0) { task_notify_pipe->timeout = timeout; - sw_stats_incr(&SwooleStats->tasking_num); + sw_stats_atom_incr(&SwooleStats->tasking_num); int ret = task_notify_pipe->read(task_notify_pipe, ¬ify, sizeof(notify)); if (ret > 0) { @@ -2626,7 +2635,7 @@ PHP_METHOD(swoole_server, task) RETURN_FALSE; } - sw_stats_incr(&SwooleStats->tasking_num); + sw_stats_atom_incr(&SwooleStats->tasking_num); RETURN_LONG(buf.info.fd); } diff --git a/zan-extension/swoole_stats.c b/zan-extension/swoole_stats.c index 6e45ba1..4a197a4 100644 --- a/zan-extension/swoole_stats.c +++ b/zan-extension/swoole_stats.c @@ -18,6 +18,7 @@ #include "swGlobalVars.h" #include "swWork.h" +#include "swClock.h" void sw_stats_set_worker_status(swWorker *worker, int status) { @@ -28,6 +29,7 @@ void sw_stats_set_worker_status(swWorker *worker, int status) if (swIsWorker()) { sw_stats_incr(&SwooleStats->active_worker); + swClock_get(&SwooleStats->workers[SwooleWG.id].accepted); if (SwooleStats->active_worker > SwooleStats->max_active_worker) { SwooleStats->max_active_worker = SwooleStats->active_worker;