1 //下面的代码离生产环境还差内存池和logger哦!
2 #include "socketserver.h"
5 #define DIGIT_PATTERN_STRING "^[0-9]+$"
6 void * epollWorkerRoutine(void *);
7 void * blockingSendEpollerRoutine(void *);
8 int isDigitStr(const char *str
){
12 if(!regcomp(®ex
,DIGIT_PATTERN_STRING
,REG_EXTENDED
/*这里不要传0哦,否则nomatch*/)){
13 ret
=!regexec(®ex
,str
, 1,matchs
,0);
// Switches descriptor `sock` to non-blocking mode: fetches the file status
// flags with F_GETFL, ORs in O_NONBLOCK and stores them back with F_SETFL.
// NOTE(review): the embedded listing numbers jump (19 to 22, 22 to 25, 29 to
// 32), so the declaration of `opts`, the conditions guarding the perror()
// calls and the return statements are not part of this extract. Callers in
// this file compare the result with -1; presumably that is the failure value.
19 static int setNonBlocking(int sock
)
22 opts
=fcntl(sock
,F_GETFL
);
// Diagnostic for the F_GETFL step.
25 perror("fcntl(sock,GETFL) failed!\n");
// Add O_NONBLOCK while keeping whatever flags were already set.
28 opts
= opts
|O_NONBLOCK
;
29 opts
=fcntl(sock
,F_SETFL
,opts
);
// Diagnostic for the F_SETFL step.
32 perror("fcntl(sock,SETFL,opts) failed!\n");
// Hook that receives the server object so its worker limits can be adjusted.
// NOTE(review): only the signature and the comment below are visible in this
// extract; the body is not shown.
38 static void adjustQSSWorkerLimits(QSocketServer
*qss
){
39 //to adjust available size.
// Starts additional epoll worker threads for `qss`.
// Visible logic: continue when workerCounter is below minThreads, or when all
// workers are busy and workerCounter is still below maxThreads. Then, while
// holding g_spinlock, create `addCounter` workers only if
// workerCounter + addCounter does not exceed maxThreads.
// NOTE(review): the declarations of `i` and `res`, the block braces and the
// return statement sit on listing lines that are missing from this extract.
46 static WORD
addQSSWorker(QSocketServer
*qss
,WORD addCounter
){
48 if(qss
->workerCounter
<qss
->minThreads
||(qss
->currentBusyWorkers
==qss
->workerCounter
&&qss
->workerCounter
<qss
->maxThreads
))
50 QSSWORKER_PARAM
* pParam
=NULL
;
// The spinlock serialises the capacity check and the workerCounter updates.
52 pthread_spin_lock(&qss
->g_spinlock
);
53 if(qss
->workerCounter
+addCounter
<=qss
->maxThreads
)
54 for(;i
<addCounter
;i
++)
// One heap-allocated parameter record is handed to each new worker thread.
// NOTE(review): no check of the malloc() or pthread_create() result is
// visible here; listing lines 57-58 and 60 are not in this extract.
56 pParam
=malloc(sizeof(QSSWORKER_PARAM
));
59 pthread_create(&pParam
->th
,NULL
,epollWorkerRoutine
,pParam
);
// Count the worker both in the server state and in the local tally `res`.
61 qss
->workerCounter
++,res
++;
64 pthread_spin_unlock(&qss
->g_spinlock
);
// Logging helper taking a message and the socket it relates to.
// NOTE(review): the body is not visible in this extract.
69 static void SOlogger(const char * msg
,SOCKET s
){
// Default protocol handler: receives up to MAX_BUF_SIZE bytes from the
// event's client socket into the event's buffer and, when bytes were read,
// passes that same buffer and byte count to `_blockingSender` for the same
// socket.
// event           epoll event whose data.ptr holds the QSSEPollEvent.
// _blockingSender send callback supplied by the worker.
// senderBase      opaque context forwarded unchanged to `_blockingSender`.
// NOTE(review): the declaration of `ret`, the closing braces and the return
// statement are on listing lines missing from this extract.
75 static int _InternalProtocolHandler(struct epoll_event
* event
,BlockingSender_t _blockingSender
,void * senderBase
){
76 QSSEPollEvent
*qssEPEvent
=event
->data
.ptr
;
// Trace output on entry.
78 printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno
,qssEPEvent
->client_s
,pthread_self());
79 if((ret
=recv(qssEPEvent
->client_s
,qssEPEvent
->buf
,MAX_BUF_SIZE
,0))>0){
// `ret` is reused: it goes in as the number of bytes received and comes back
// as the sender's result.
81 ret
=_blockingSender(senderBase
,qssEPEvent
->client_s
,qssEPEvent
->buf
,ret
);
// Trace output on exit.
83 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret
,errno
,strerror(errno
),qssEPEvent
->client_s
,pthread_self());
// Allocates a QSocketServer and fills in its configuration: thread limits,
// worker wait timeout, lifecycle status 0, a process-private spinlock, zeroed
// busy-worker and client-socket counters, and the two callbacks. A NULL
// `protoHandler` is replaced by _InternalProtocolHandler.
// NOTE(review): the lines that would store `passive`/`port`, publish the
// object through `qss_ptr`, check the malloc() result and return a status are
// not visible in this extract (listing numbers 88-89, 91-92, 98 and 105+ are
// absent).
87 int createSocketServer(QSocketServer
** qss_ptr
,WORD passive
,WORD port
,CSocketLifecycleCallback cslifecb
,InternalProtocolHandler protoHandler
,WORD minThreads
,WORD maxThreads
,int workerWaitTimeout
)
90 QSocketServer
* qss
=malloc(sizeof(QSocketServer
));
93 qss
->minThreads
=minThreads
;
94 qss
->maxThreads
=maxThreads
;
95 qss
->workerWaitTimeout
=workerWaitTimeout
;
// Lifecycle starts at 0; startSocketServer only proceeds from this value.
96 qss
->lifecycleStatus
=0;
97 pthread_spin_init(&qss
->g_spinlock
,PTHREAD_PROCESS_PRIVATE
);
99 qss
->currentBusyWorkers
=0;
100 qss
->CSocketsCounter
=0;
101 qss
->cslifecb
=cslifecb
,qss
->protoHandler
=protoHandler
;
// Fall back to the built-in handler when the caller supplied none.
102 if(!qss
->protoHandler
)
103 qss
->protoHandler
=_InternalProtocolHandler
;
104 adjustQSSWorkerLimits(qss
);
// Brings the server up. Visible sequence: move lifecycleStatus from 0 to 1
// under the spinlock, fill in an AF_INET address for 127.0.0.1 and the
// configured port, create a non-blocking TCP socket, bind and listen, create
// the main epoll instance and a second one used for blocking sends, register
// the listening socket with the main epoll instance, start the blocking-send
// epoller thread, start `minThreads` workers and set lifecycleStatus to 2.
// NOTE(review): braces, error-path returns and several statements are on
// listing lines missing from this extract, so the failure handling after each
// SOlogger() call cannot be described from here.
109 int startSocketServer(QSocketServer
*qss
)
114 pthread_spin_lock(&qss
->g_spinlock
);
115 if(qss
->lifecycleStatus
==0){
116 qss
->lifecycleStatus
=1;
117 pthread_spin_unlock(&qss
->g_spinlock
);
// NOTE(review): second unlock; presumably the branch taken when the status
// was not 0 (the separating `else` is not visible).
119 pthread_spin_unlock(&qss
->g_spinlock
);
123 //bzero(&qss->serv_addr, sizeof(qss->serv_addr));
125 qss
->serv_addr
.sin_family
=AF_INET
;
126 qss
->serv_addr
.sin_port
=htons(qss
->port
);
// The listening address is fixed to the loopback interface.
127 inet_aton("127.0.0.1",&(qss
->serv_addr
.sin_addr
));
128 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");
130 qss
->server_s
=socket(AF_INET
,SOCK_STREAM
,IPPROTO_IP
);
// NOTE(review): setNonBlocking() is applied before the INVALID_SOCKET check
// below, so a failed socket() call reaches fcntl() first.
131 if(setNonBlocking(qss
->server_s
)==-1)
133 SOlogger("setNonBlocking server_s failed.\n",0);
137 if(qss
->server_s
==INVALID_SOCKET
)
139 SOlogger("socket failed.\n",0);
143 if(bind(qss
->server_s
,(struct sockaddr
*)&qss
->serv_addr
,sizeof(SOCKADDR_IN
))==SOCKET_ERROR
)
145 SOlogger("bind failed.\n",qss
->server_s
);
149 if(listen(qss
->server_s
,SOMAXCONN
/*This macro also exists on Windows; here it is 128. You may use a smaller value; it affects overhead.*/)==SOCKET_ERROR
)
151 SOlogger("listen failed.\n",qss
->server_s
);
154 qss
->epollFD
=epoll_create1(0);/*Note this is epoll_create1, not epoll_create(size): you may not know how to choose size, so it is simply avoided.*/
155 if(qss
->epollFD
==-1){
156 SOlogger("epoll_create1 0, main epollFD failed.\n",qss
->server_s
);
159 qss
->BSendEpollFD
=epoll_create1(0);//for blocking send.
160 if(qss
->BSendEpollFD
==-1){
161 SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss
->server_s
);
// Register the listening socket itself: its QSSEPollEvent carries server_s in
// client_s, which is how the worker recognises accept events.
166 struct epoll_event _epEvent
;
167 QSSEPollEvent
*qssEPEvent
=malloc(sizeof(QSSEPollEvent
));
168 qssEPEvent
->client_s
=qss
->server_s
;
169 _epEvent
.events
=qssEPEvent
->curEvents
=EPOLLIN
|EPOLLET
;
170 _epEvent
.data
.ptr
=qssEPEvent
;
171 if(epoll_ctl(qss
->epollFD
,EPOLL_CTL_ADD
,qss
->server_s
,&_epEvent
)==-1){
172 SOlogger("epoll_ctl server_s to accept failed.\n",qss
->server_s
);
177 {//startup blocking send epoller.
178 QSSWORKER_PARAM
* pParam
=malloc(sizeof(QSSWORKER_PARAM
));
180 pthread_create(&pParam
->th
,NULL
,blockingSendEpollerRoutine
,pParam
);
183 //initialize worker for epoll events.
184 addQSSWorker(qss
,qss
->minThreads
);
// Status 2 is the value shutdownSocketServer requires before it proceeds.
185 qss
->lifecycleStatus
=2;
// Begins server shutdown. Visible sequence: move lifecycleStatus from 2 to 3
// under the spinlock, shut down the listening socket in both directions, then
// walk /proc/<pid>/fd/ and call shutdown(SHUT_RDWR) on every entry whose name
// is all digits and whose descriptor is a socket.
// NOTE(review): the declarations of `dirBuf`, `pd`, `de` and `sockFD`, the
// opendir()/closedir() calls, the braces and the return statement are on
// listing lines missing from this extract. The sprintf() below is unbounded;
// the size of `dirBuf` cannot be checked from here.
189 int shutdownSocketServer(QSocketServer
*qss
){
190 //change qss->lifecycleStatus
194 pthread_spin_lock(&qss
->g_spinlock
);
195 if(qss
->lifecycleStatus
==2){
196 qss
->lifecycleStatus
=3;
197 pthread_spin_unlock(&qss
->g_spinlock
);
// NOTE(review): second unlock; presumably the branch taken when the status
// was not 2 (the separating `else` is not visible).
199 pthread_spin_unlock(&qss
->g_spinlock
);
203 /*Shut down the server listening socket. The graceful sequence here is shutdown --notify--> epoll --> close. Remember that shutdown sends EOF.*/
204 shutdown(qss
->server_s
,SHUT_RDWR
);
206 // /proc/getpid/fd shutdown all socket cs != serv_s
212 sprintf(dirBuf
,"/proc/%d/fd/",getpid());
215 while((de
=readdir(pd
))!=NULL
){
// Only purely numeric directory entries are descriptor numbers.
216 if(isDigitStr(de
->d_name
)){
217 sockFD
=atoi(de
->d_name
);
218 if(isfdtype(sockFD
,S_IFSOCK
))
219 shutdown(sockFD
,SHUT_RDWR
);
224 /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
// Accepts one pending connection on the listening socket and prepares it for
// the workers. Visible sequence: accept(), switch the client socket to
// non-blocking mode, enable TCP keepalive with the QSS_SIO_KEEPALIVE_VALS_*
// settings, allocate a QSSEPollEvent for the client, register it with the
// main epoll instance as EPOLLIN|EPOLLET|EPOLLONESHOT, and increment
// CSocketsCounter under the spinlock.
// NOTE(review): the declarations of `cs` and `keepAlive`, the braces, the
// clean-up after each failure message and the return statements are on
// listing lines missing from this extract.
229 static int onAcceptRoutine(QSocketServer
* qss
)
231 SOCKADDR_IN client_addr
;
232 unsigned int client_addr_leng
=sizeof(SOCKADDR_IN
);
234 struct epoll_event _epEvent
;
235 QSSEPollEvent
*qssEPEvent
=NULL
;
236 cs
=accept(qss
->server_s
,(struct sockaddr
*)&client_addr
,&client_addr_leng
);
237 if(cs
==INVALID_SOCKET
)
239 printf("onAccept failed:%d,%s\n",errno
,strerror(errno
));
// On accept failure the listening socket is removed from the main epoll set.
240 epoll_ctl(qss
->epollFD
,EPOLL_CTL_DEL
,qss
->server_s
,NULL
);//EINVAL 22 Invalid argument
243 if(setNonBlocking(cs
)==-1)
245 printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs
);
249 {// set keepalive option
251 int keepIdle
= QSS_SIO_KEEPALIVE_VALS_TIMEOUT
;
252 int keepInterval
= QSS_SIO_KEEPALIVE_VALS_INTERVAL
;
253 int keepCount
= QSS_SIO_KEEPALIVE_VALS_COUNT
;
// The four setsockopt() calls are chained with ||, so the first failure stops
// the chain and triggers the message below.
254 if(setsockopt(cs
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&keepAlive
, sizeof(keepAlive
))||
255 setsockopt(cs
, SOL_TCP
, TCP_KEEPIDLE
, (void *)&keepIdle
, sizeof(keepIdle
))||
256 setsockopt(cs
, SOL_TCP
, TCP_KEEPINTVL
, (void *)&keepInterval
, sizeof(keepInterval
))||
257 setsockopt(cs
, SOL_TCP
, TCP_KEEPCNT
, (void *)&keepCount
, sizeof(keepCount
)))
259 printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs
,strerror(errno
));
// NOTE(review): no check of the malloc() result is visible here.
263 qssEPEvent
=malloc(sizeof(QSSEPollEvent
));
264 qssEPEvent
->client_s
=cs
;
266 _epEvent
.events
=qssEPEvent
->curEvents
=EPOLLIN
|EPOLLET
|EPOLLONESHOT
;
// 0 means the socket has not yet been added to the blocking-send epoll set.
267 qssEPEvent
->BSendEpollFDRelated
=0;
268 _epEvent
.data
.ptr
=qssEPEvent
;/*Unlike the textbook examples, the real user data is carried in ptr rather than a bare fd.*/
269 if(epoll_ctl(qss
->epollFD
,EPOLL_CTL_ADD
,cs
,&_epEvent
)==-1){
270 printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs
,errno
);
274 pthread_spin_lock(&qss
->g_spinlock
);
275 qss
->CSocketsCounter
++;
276 pthread_spin_unlock(&qss
->g_spinlock
);
281 printf("onAccepted flags:err:%d ,cs:%d.\n",errno
,cs
);
// Tail of the InternalSenderBase_t definition: the context handed to
// internalBlockingSender. `event` is the per-connection epoll record.
// NOTE(review): the opening of the definition and any earlier members (a
// `qss` member is dereferenced elsewhere in this file) are not visible in
// this extract.
287 QSSEPollEvent
* event
;
288 }InternalSenderBase_t
;
// Sends `nbs` bytes on the non-blocking socket `cs`, waiting for writability
// when the kernel buffer is full. Visible logic: loop on send() while bytes
// remain; on EAGAIN with bytes outstanding, arm the socket for
// EPOLLOUT|EPOLLET|EPOLLONESHOT on the blocking-send epoll instance and wait
// on the event's condition variable with a BLOCKING_SEND_TIMEOUT-second
// deadline.
// senderBase  InternalSenderBase_t supplying the server and the epoll record.
// NOTE(review): the declaration of `_sbuf`, the loop bodies that advance
// `sum`, the retry/exit control flow and the return statement are on listing
// lines missing from this extract. `sum` is an int while `nbs` is a size_t,
// so the `sum<nbs` comparisons mix signed and unsigned operands.
290 static int internalBlockingSender(void * senderBase
,int cs
, void * _buf
, size_t nbs
){
291 InternalSenderBase_t
*sb
=(InternalSenderBase_t
*)senderBase
;
293 int ret
=0,sum
=0,curEpoll_ctl_opt
,*errno_ptr
=&errno
;
295 QSSEPollEvent
*qssEPEvent
=NULL
;
296 struct epoll_event _epEvent
;
298 struct timespec sendTimeo
;
302 while(sum
<nbs
&&(ret
=send(cs
,_sbuf
,nbs
-sum
,0))>0)
// Socket buffer full with data still pending: wait for it to become writable.
307 if(errno
==EAGAIN
&&sum
<nbs
){
308 qssEPEvent
=sb
->event
;
309 _epEvent
.data
.ptr
=qssEPEvent
;
310 _epEvent
.events
=EPOLLOUT
|EPOLLET
|EPOLLONESHOT
;
// First wait on this connection: create its mutex/condition pair and add the
// socket to the blocking-send epoll set; later waits re-arm it with MOD.
311 if(qssEPEvent
->BSendEpollFDRelated
==0){
312 pthread_mutex_init(&qssEPEvent
->writableLock
,NULL
);
313 pthread_cond_init(&qssEPEvent
->writableMonitor
,NULL
);
314 qssEPEvent
->BSendEpollFDRelated
=1;
315 curEpoll_ctl_opt
=EPOLL_CTL_ADD
;
317 curEpoll_ctl_opt
=EPOLL_CTL_MOD
;
// The lock is taken before arming epoll and is held until the timed wait
// releases it.
322 pthread_mutex_lock(&qssEPEvent
->writableLock
);
323 if(epoll_ctl(sb
->qss
->BSendEpollFD
,curEpoll_ctl_opt
,qssEPEvent
->client_s
,&_epEvent
)==0){
// Absolute deadline: current wall-clock seconds plus the send timeout.
324 sendTimeo
.tv_nsec
=0,sendTimeo
.tv_sec
=time(NULL
)+BLOCKING_SEND_TIMEOUT
;
325 int err
=pthread_cond_timedwait(&qssEPEvent
->writableMonitor
,&qssEPEvent
->writableLock
,&sendTimeo
);
330 pthread_mutex_unlock(&qssEPEvent
->writableLock
);
// NOTE(review): the statement guarded by this condition is not visible.
336 if(errno
==EAGAIN
&&sum
==nbs
)
// Thread entry point for the blocking-send epoller. It waits indefinitely on
// the BSendEpollFD epoll instance and, for each EPOLLOUT event, signals the
// condition variable of the matching QSSEPollEvent so the sender waiting in
// internalBlockingSender can continue.
// _param  QSSWORKER_PARAM carrying the server pointer.
// NOTE(review): the declaration of `i`, the enclosing loops, the braces and
// the return statement are on listing lines missing from this extract.
344 void * blockingSendEpollerRoutine(void *_param
){
345 QSSWORKER_PARAM
* pParam
=(QSSWORKER_PARAM
*)_param
;
346 QSocketServer
* qss
=pParam
->qss
;
347 //pthread_t * curThread=&pParam->th;
// The event array is sized by maxThreads, the same bound passed to
// epoll_wait() below.
348 struct epoll_event epEvents
[qss
->maxThreads
];
349 QSSEPollEvent
*qssEPEvent
=NULL
;
350 int pollRes
,*errno_ptr
=&errno
;
// This thread disables cancellation for itself.
352 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
,NULL
);
// Timeout -1: block until at least one event arrives.
357 pollRes
=epoll_wait(qss
->BSendEpollFD
,epEvents
,qss
->maxThreads
,-1);
361 if(epEvents
[i
].events
&EPOLLOUT
){//This epoll fd should do only the following; doing less keeps it fast!
362 qssEPEvent
=epEvents
[i
].data
.ptr
;
// Signal while holding the lock that the waiting sender also uses.
363 pthread_mutex_lock(&qssEPEvent
->writableLock
);
364 pthread_cond_signal(&qssEPEvent
->writableMonitor
);
365 pthread_mutex_unlock(&qssEPEvent
->writableLock
);
368 }else if(pollRes
==-1){//errno
369 printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr
,strerror(*errno_ptr
));
// Thread entry point for an epoll worker. Each iteration waits for a single
// event on the main epoll instance with the configured timeout, then:
//  - for the listening socket, accepts a connection via onAcceptRoutine();
//  - for a client socket, runs the protocol handler while counting itself as
//    busy, then re-arms the socket (EPOLLIN|EPOLLET|EPOLLONESHOT);
//  - on timeout, retires itself when the pool is idle and above minThreads;
//  - on a socket error, notifies cslifecb, deregisters and closes the socket
//    and updates the lifecycle state.
// _param  QSSWORKER_PARAM carrying the server pointer and this thread's id.
// NOTE(review): the loop construct, most braces, the handling of `handleCode`
// and `exitCode`, and the final free/return statements are on listing lines
// missing from this extract.
378 void * epollWorkerRoutine(void * _param
){
379 QSSWORKER_PARAM
* pParam
=(QSSWORKER_PARAM
*)_param
;
380 QSocketServer
* qss
=pParam
->qss
;
381 pthread_t
* curThread
=&pParam
->th
;
382 struct epoll_event _epEvent
;
383 QSSEPollEvent
*qssEPEvent
=NULL
;
384 InternalSenderBase_t _senderBase
;
385 int pollRes
=0,handleCode
=0,exitCode
=0,SOErrOccurred
=0,*errno_ptr
=&errno
;
// This thread disables cancellation for itself.
387 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
,NULL
);
// Reset the per-iteration state, including errno, before waiting.
392 *errno_ptr
=0,SOErrOccurred
=0,qssEPEvent
=NULL
;
393 pollRes
=epoll_wait(qss
->epollFD
,&_epEvent
,1,qss
->workerWaitTimeout
);
395 qssEPEvent
=(QSSEPollEvent
*)_epEvent
.data
.ptr
;
// The listening socket is registered with client_s == server_s, so this
// comparison identifies accept events.
397 if(qssEPEvent
->client_s
==qss
->server_s
)
399 onAcceptRoutine(qss
);
402 if(qss
->protoHandler
){
403 _senderBase
.event
=_epEvent
.data
.ptr
;
// currentBusyWorkers is incremented and decremented around the handler call,
// each time under the spinlock.
404 pthread_spin_lock(&qss
->g_spinlock
);
405 qss
->currentBusyWorkers
++;
406 pthread_spin_unlock(&qss
->g_spinlock
);
409 handleCode
=qss
->protoHandler(&_epEvent
,internalBlockingSender
,&_senderBase
);
411 pthread_spin_lock(&qss
->g_spinlock
);
412 qss
->currentBusyWorkers
--;
413 pthread_spin_unlock(&qss
->g_spinlock
);
// The socket was registered with EPOLLONESHOT, so it is re-armed here with
// EPOLL_CTL_MOD.
416 _epEvent
.events
=EPOLLIN
|EPOLLET
|EPOLLONESHOT
;
417 if(epoll_ctl(qss
->epollFD
,EPOLL_CTL_MOD
,qssEPEvent
->client_s
,&_epEvent
)==-1)
420 SOErrOccurred
=1;//maybe socket closed 0. Or -1 socket error.
425 }else if(pollRes
==0){//timeout
426 printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr
,*curThread
);
// NOTE(review): this first test reads the shared counters without holding the
// spinlock; the same condition is re-tested under the lock just below.
427 if(qss
->lifecycleStatus
<=3&&qss
->currentBusyWorkers
==0&&qss
->workerCounter
>qss
->minThreads
)
429 pthread_spin_lock(&qss
->g_spinlock
);
430 if(qss
->lifecycleStatus
<=3&&qss
->currentBusyWorkers
==0&&qss
->workerCounter
>qss
->minThreads
){
431 qss
->workerCounter
--;//until qss->workerCounter decrease to qss->minThreads
434 pthread_spin_unlock(&qss
->g_spinlock
);
435 }else if(qss
->lifecycleStatus
>=4)
438 }else if(pollRes
==-1){//errno
439 printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr
,strerror(*errno_ptr
));
// Socket-error path: notify the lifecycle callback with -1, remove the socket
// from both epoll sets and close it.
// NOTE(review): no NULL check on qss->cslifecb is visible here.
445 qss
->cslifecb(qssEPEvent
->client_s
,-1);
447 epoll_ctl(qss
->epollFD
,EPOLL_CTL_DEL
,qssEPEvent
->client_s
,NULL
);
448 epoll_ctl(qss
->BSendEpollFD
,EPOLL_CTL_DEL
,qssEPEvent
->client_s
,NULL
);
449 close(qssEPEvent
->client_s
);
// The mutex/condition pair exists only if a blocking send created it.
450 if(qssEPEvent
->BSendEpollFDRelated
){
451 pthread_cond_destroy(&qssEPEvent
->writableMonitor
);
452 pthread_mutex_destroy(&qssEPEvent
->writableLock
);
// When the last client socket goes away and the status is already 3 or
// higher, the lifecycle advances to 4.
456 pthread_spin_lock(&qss
->g_spinlock
);
457 if(--qss
->CSocketsCounter
==0&&qss
->lifecycleStatus
>=3){
458 //for qss workerSize,
459 qss
->lifecycleStatus
=4;
462 pthread_spin_unlock(&qss
->g_spinlock
);
463 }//SOErrOccurred handle;
// Worker exit: the worker whose decrement brings workerCounter to zero, once
// the status is 4 or higher, enters the clean-up branch.
469 pthread_spin_lock(&qss
->g_spinlock
);
470 if(!--qss
->workerCounter
&&qss
->lifecycleStatus
>=4){//clearup QSS
473 pthread_spin_unlock(&qss
->g_spinlock
);
476 close(qss
->BSendEpollFD
);
477 pthread_spin_destroy(&qss
->g_spinlock
);