From 815e1c2ee82bfa70f3addedf36ab78e83719b61d Mon Sep 17 00:00:00 2001 From: waler-zheng Date: Thu, 8 May 2014 09:03:24 +0800 Subject: [PATCH] push sockt&epoll samples --- client_udp.c | 56 +++++++ epoll/epoll.c | 280 ++++++++++++++++++++++++++++++++ epoll/epoll.py | 53 +++++++ epoll/ip_snif.c | 54 +++++++ epoll/serv.c | 137 ++++++++++++++++ server_udp.c | 54 +++++++ socketserver.c | 482 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ socketserver.h | 78 +++++++++ sockevent.c | 401 ++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 1595 insertions(+) create mode 100644 client_udp.c create mode 100644 epoll/epoll.c create mode 100644 epoll/epoll.py create mode 100644 epoll/ip_snif.c create mode 100644 epoll/serv.c create mode 100644 server_udp.c create mode 100644 socketserver.c create mode 100644 socketserver.h create mode 100644 sockevent.c diff --git a/client_udp.c b/client_udp.c new file mode 100644 index 0000000..46b346a --- /dev/null +++ b/client_udp.c @@ -0,0 +1,56 @@ +/* UDP client in the internet domain */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void error(const char *); +int main(int argc, char *argv[]) +{ + int sock, n; + unsigned int length; + struct sockaddr_in server, from; + struct hostent *hp; + char buffer[256]; + + if (argc != 3) { printf("Usage: server port\n"); + exit(1); + } + sock= socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) error("socket"); + + server.sin_family = AF_INET; + hp = gethostbyname(argv[1]); + if (hp==0) error("Unknown host"); + + bcopy((char *)hp->h_addr, + (char *)&server.sin_addr, + hp->h_length); + server.sin_port = htons(atoi(argv[2])); + length=sizeof(struct sockaddr_in); + printf("Please enter the message: "); + while(1){ + bzero(buffer,256); + fgets(buffer,255,stdin); + n=sendto(sock,buffer, + strlen(buffer),0,(const struct sockaddr *)&server,length); + if (n < 0) error("Sendto"); + n = recvfrom(sock,buffer,256,0,(struct sockaddr *)&from, &length); + if (n < 0) error("recvfrom"); + write(1,"Got an ack: ",12); + write(1,buffer,n); + } + close(sock); + return 0; +} + +void error(const char *msg) +{ + perror(msg); + exit(0); +} diff --git a/epoll/epoll.c b/epoll/epoll.c new file mode 100644 index 0000000..63f9bbe --- /dev/null +++ b/epoll/epoll.c @@ -0,0 +1,280 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAXEVENTS 64 + +//函数: +//功能:创建和绑定一个TCP socket +//参数:端口 +//返回值:创建的socket +static int +create_and_bind (char *port) +{ + struct addrinfo hints; + struct addrinfo *result, *rp; + int s, sfd; + + memset (&hints, 0, sizeof (struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */ + hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */ + hints.ai_flags = AI_PASSIVE; /* All interfaces */ + + s = getaddrinfo (NULL, port, &hints, &result); + if (s != 0) + { + fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s)); + return -1; + } + + for (rp = result; rp != NULL; rp = rp->ai_next) + { + sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) + continue; + + s = bind (sfd, rp->ai_addr, rp->ai_addrlen); + if (s == 0) + { + /* We managed to bind successfully! */ + break; + } + + close (sfd); + } + + if (rp == NULL) + { + fprintf (stderr, "Could not bind\n"); + return -1; + } + + freeaddrinfo (result); + + return sfd; +} + + +//函数 +//功能:设置socket为非阻塞的 +static int +make_socket_non_blocking (int sfd) +{ + int flags, s; + + //得到文件状态标志 + flags = fcntl (sfd, F_GETFL, 0); + if (flags == -1) + { + perror ("fcntl"); + return -1; + } + + //设置文件状态标志 + flags |= O_NONBLOCK; + s = fcntl (sfd, F_SETFL, flags); + if (s == -1) + { + perror ("fcntl"); + return -1; + } + + return 0; +} + +//端口由参数argv[1]指定 +int +main (int argc, char *argv[]) +{ + int sfd, s; + int efd; + struct epoll_event event; + struct epoll_event *events; + + if (argc != 2) + { + fprintf (stderr, "Usage: %s [port]\n", argv[0]); + exit (EXIT_FAILURE); + } + + sfd = create_and_bind (argv[1]); + if (sfd == -1) + abort (); + + s = make_socket_non_blocking (sfd); + if (s == -1) + abort (); + + s = listen (sfd, SOMAXCONN); + if (s == -1) + { + perror ("listen"); + abort (); + } + + //除了参数size被忽略外,此函数和epoll_create完全相同 + efd = epoll_create1 (0); + if (efd == -1) + { + perror ("epoll_create"); + abort (); + } + + event.data.fd = sfd; + event.events = EPOLLIN | EPOLLET;//读入,边缘触发方式 + s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event); + if (s == -1) + { + perror ("epoll_ctl"); + abort (); + } + + /* Buffer where events are returned */ + events = calloc (MAXEVENTS, sizeof event); + + /* The event loop */ + while (1) + { + int n, i; + + n = epoll_wait (efd, events, MAXEVENTS, -1); + for (i = 0; i < n; i++) + { + if ((events[i].events & EPOLLERR) || + (events[i].events & EPOLLHUP) || + (!(events[i].events & EPOLLIN))) + { + /* An error has occured on this fd, or the socket is not + ready for reading (why were we notified then?) */ + fprintf (stderr, "epoll error\n"); + close (events[i].data.fd); + continue; + } + + else if (sfd == events[i].data.fd) + { + /* We have a notification on the listening socket, which + means one or more incoming connections. */ + while (1) + { + struct sockaddr in_addr; + socklen_t in_len; + int infd; + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + + in_len = sizeof in_addr; + infd = accept (sfd, &in_addr, &in_len); + if (infd == -1) + { + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) + { + /* We have processed all incoming + connections. */ + break; + } + else + { + perror ("accept"); + break; + } + } + + //将地址转化为主机名或者服务名 + s = getnameinfo (&in_addr, in_len, + hbuf, sizeof hbuf, + sbuf, sizeof sbuf, + NI_NUMERICHOST | NI_NUMERICSERV);//flag参数:以数字名返回 + //主机地址和服务地址 + + if (s == 0) + { + printf("Accepted connection on descriptor %d " + "(host=%s, port=%s)\n", infd, hbuf, sbuf); + } + + /* Make the incoming socket non-blocking and add it to the + list of fds to monitor. */ + s = make_socket_non_blocking (infd); + if (s == -1) + abort (); + + event.data.fd = infd; + event.events = EPOLLIN | EPOLLET; + s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event); + if (s == -1) + { + perror ("epoll_ctl"); + abort (); + } + } + continue; + } + else + { + /* We have data on the fd waiting to be read. Read and + display it. We must read whatever data is available + completely, as we are running in edge-triggered mode + and won't get a notification again for the same + data. */ + int done = 0; + + while (1) + { + ssize_t count; + char buf[512]; + + count = read (events[i].data.fd, buf, sizeof(buf)); + if (count == -1) + { + /* If errno == EAGAIN, that means we have read all + data. So go back to the main loop. */ + if (errno != EAGAIN) + { + perror ("read"); + done = 1; + } + break; + } + else if (count == 0) + { + /* End of file. The remote has closed the + connection. */ + done = 1; + break; + } + + /* Write the buffer to standard output */ + s = write (1, buf, count); + if (s == -1) + { + perror ("write"); + abort (); + } + } + + if (done) + { + printf ("Closed connection on descriptor %d\n", + events[i].data.fd); + + /* Closing the descriptor will make epoll remove it + from the set of descriptors which are monitored. */ + close (events[i].data.fd); + } + } + } + } + + free (events); + + close (sfd); + + return EXIT_SUCCESS; +} diff --git a/epoll/epoll.py b/epoll/epoll.py new file mode 100644 index 0000000..1eb1459 --- /dev/null +++ b/epoll/epoll.py @@ -0,0 +1,53 @@ +#! /usr/bin/python + +""" +epoll 异步使用 +""" +import socket, select + +EOL1 = b'\n\n' +EOL2 = b'\n\r\n' +response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n' +response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n' +response += b'Hello, world!' + +serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +serversocket.bind(('0.0.0.0', 8080)) +serversocket.listen(1) +serversocket.setblocking(0) + +epoll = select.epoll() +epoll.register(serversocket.fileno(), select.EPOLLIN) + +try: + connections = {}; requests = {}; responses = {} + while True: + events = epoll.poll(1) + for fileno, event in events: + if fileno == serversocket.fileno(): + connection, address = serversocket.accept() + connection.setblocking(0) + epoll.register(connection.fileno(), select.EPOLLIN) + connections[connection.fileno()] = connection + requests[connection.fileno()] = b'' + responses[connection.fileno()] = response + elif event & select.EPOLLIN: + requests[fileno] += connections[fileno].recv(1024) + if EOL1 in requests[fileno] or EOL2 in requests[fileno]: + epoll.modify(fileno, select.EPOLLOUT) + print('-'*40 + '\n' + requests[fileno].decode()[:-2]) + elif event & select.EPOLLOUT: + byteswritten = connections[fileno].send(responses[fileno]) + responses[fileno] = responses[fileno][byteswritten:] + if len(responses[fileno]) == 0: + epoll.modify(fileno, 0) + connections[fileno].shutdown(socket.SHUT_RDWR) + elif event & select.EPOLLHUP: + epoll.unregister(fileno) + connections[fileno].close() + del connections[fileno] +finally: + epoll.unregister(serversocket.fileno()) + epoll.close() + serversocket.close() diff --git a/epoll/ip_snif.c b/epoll/ip_snif.c new file mode 100644 index 0000000..e38944e --- /dev/null +++ b/epoll/ip_snif.c @@ -0,0 +1,54 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +int main(int argc, char **argv) { + int sock, n; + char buffer[2048]; + struct ethhdr *eth; + struct iphdr *iph; + + if (0>(sock=socket(PF_PACKET, SOCK_RAW, htons(ETH_P_IP)))) { + perror("socket"); + exit(1); + } + //混杂模式 + struct ifreq ethreq; + strncpy(ethreq.ifr_name,"eth0",IFNAMSIZ); + if(-1 == ioctl(sock,SIOCGIFFLAGS,ðreq)){ + perror("ioctl"); + close(sock); + exit(1); + } + ethreq.ifr_flags |=IFF_PROMISC; + if(-1 == ioctl(sock,SIOCGIFFLAGS,ðreq)){ + perror("ioctl"); + close(sock); + exit(1); + } + + while (1) { + printf("=====================================\n"); + //注意:在这之前我没有调用bind函数,原因是什么呢? + n = recvfrom(sock,buffer,2048,0,NULL,NULL); + printf("%d bytes read\n",n); + + //接收到的数据帧头6字节是目的MAC地址,紧接着6字节是源MAC地址。 + eth=(struct ethhdr*)buffer; + printf("Dest MAC addr:%02x:%02x:%02x:%02x:%02x:%02x\n",eth->h_dest[0],eth->h_dest[1],eth->h_dest[2],eth->h_dest[3],eth->h_dest[4],eth->h_dest[5]); + printf("Source MAC addr:%02x:%02x:%02x:%02x:%02x:%02x\n",eth->h_source[0],eth->h_source[1],eth->h_source[2],eth->h_source[3],eth->h_source[4],eth->h_source[5]); + + iph=(struct iphdr*)(buffer+sizeof(struct ethhdr)); + //我们只对IPV4且没有选项字段的IPv4报文感兴趣 + if(iph->version ==4 && iph->ihl == 5){ + printf("Source host:%s\n",inet_ntoa(iph->saddr)); + printf("Dest host:%s\n",inet_ntoa(iph->daddr)); + } + } +} diff --git a/epoll/serv.c b/epoll/serv.c new file mode 100644 index 0000000..13bb9e7 --- /dev/null +++ b/epoll/serv.c @@ -0,0 +1,137 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_EVENTS 10 +#define PORT 8080 + +//设置socket连接为非阻塞模式 +void setnonblocking(int sockfd) { + int opts; + + opts = fcntl(sockfd, F_GETFL); + if(opts < 0) { + perror("fcntl(F_GETFL)\n"); + exit(1); + } + opts = (opts | O_NONBLOCK); + if(fcntl(sockfd, F_SETFL, opts) < 0) { + perror("fcntl(F_SETFL)\n"); + exit(1); + } +} + +int main(){ + //ET模式的简单HTTP服务器代码 + + struct epoll_event ev, events[MAX_EVENTS]; + int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n; + struct sockaddr_in local, remote; + char buf[BUFSIZ]; + + //创建listen socket + if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("sockfd\n"); + exit(1); + } + setnonblocking(listenfd); + bzero(&local, sizeof(local)); + local.sin_family = AF_INET; + local.sin_addr.s_addr = htonl(INADDR_ANY);; + local.sin_port = htons(PORT); + if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) { + perror("bind\n"); + exit(1); + } + listen(listenfd, 20); + + epfd = epoll_create(MAX_EVENTS); + if (epfd == -1) { + perror("epoll_create"); + exit(EXIT_FAILURE); + } + + ev.events = EPOLLIN; + ev.data.fd = listenfd; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) { + perror("epoll_ctl: listen_sock"); + exit(EXIT_FAILURE); + } + + for (;;) { + nfds = epoll_wait(epfd, events, MAX_EVENTS, -1); + if (nfds == -1) { + perror("epoll_pwait"); + exit(EXIT_FAILURE); + } + + for (i = 0; i < nfds; ++i) { + fd = events[i].data.fd; + if (fd == listenfd) { + // ET下,正确的accept + while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, + (size_t *)&addrlen)) > 0) { + setnonblocking(conn_sock); + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = conn_sock; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, + &ev) == -1) { + perror("epoll_ctl: add"); + exit(EXIT_FAILURE); + } + } + if (conn_sock == -1) { + if (errno != EAGAIN && errno != ECONNABORTED + && errno != EPROTO && errno != EINTR) + perror("accept"); + } + continue; + } + if (events[i].events & EPOLLIN) { + // ET下,正确的读 + n = 0; + while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) { + n += nread; + } + if (nread == -1 && errno != EAGAIN) { + perror("read error"); + } + ev.data.fd = fd; + ev.events = events[i].events | EPOLLOUT; + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) { + perror("epoll_ctl: mod"); + } + } + if (events[i].events & EPOLLOUT) { + // ET下,正确的写 + sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11); + int nwrite, data_size = strlen(buf); + n = data_size; + while (n > 0) { + nwrite = write(fd, buf + data_size - n, n); + if (nwrite < n) { + if (nwrite == -1 && errno != EAGAIN) { + perror("write error"); + } + break; + } + n -= nwrite; + } + close(fd); + } + } + } + + return 0; +} diff --git a/server_udp.c b/server_udp.c new file mode 100644 index 0000000..5e79f7f --- /dev/null +++ b/server_udp.c @@ -0,0 +1,54 @@ +/* Creates a datagram server. The port + number is passed as an argument. This + server runs forever */ + +#include +#include +#include +#include +#include +#include +#include +#include + +void error(const char *msg) +{ + perror(msg); + exit(0); +} + +int main(int argc, char *argv[]) +{ + int sock, length, n; + socklen_t fromlen; + struct sockaddr_in server; + struct sockaddr_in from; + char buf[1024]; + + if (argc < 2) { + fprintf(stderr, "ERROR, no port provided\n"); + exit(0); + } + + sock=socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) error("Opening socket"); + length = sizeof(server); + bzero(&server,length); + server.sin_family=AF_INET; + server.sin_addr.s_addr=INADDR_ANY; + server.sin_port=htons(atoi(argv[1])); + if (bind(sock,(struct sockaddr *)&server,length)<0) + error("binding"); + fromlen = sizeof(struct sockaddr_in); + while (1) { + n = recvfrom(sock,buf,1024,0,(struct sockaddr *)&from,&fromlen); + if (n < 0) error("recvfrom"); + write(1,"Received a datagram: ",21); + write(1,buf,n); + n = sendto(sock,"Got your message\n",17, + 0,(struct sockaddr *)&from,fromlen); + if (n < 0) error("sendto"); + } + return 0; + } + diff --git a/socketserver.c b/socketserver.c new file mode 100644 index 0000000..f6d8d4c --- /dev/null +++ b/socketserver.c @@ -0,0 +1,482 @@ +//下面的代码离生产环境还差内存池和logger哦! +#include "socketserver.h" +#include +#include +#define DIGIT_PATTERN_STRING "^[0-9]+$" +void * epollWorkerRoutine(void *); +void * blockingSendEpollerRoutine(void *); +int isDigitStr(const char *str){ + int ret=-1; + regex_t regex; + regmatch_t matchs[1]; + if(!regcomp(®ex,DIGIT_PATTERN_STRING,REG_EXTENDED/*这里不要传0哦,否则nomatch*/)){ + ret=!regexec(®ex,str, 1,matchs,0); + regfree(®ex); + } + return ret; +} + +static int setNonBlocking(int sock) +{ + int opts; + opts=fcntl(sock,F_GETFL); + if(opts==-1) + { + perror("fcntl(sock,GETFL) failed!\n"); + return opts; + } + opts = opts|O_NONBLOCK; + opts=fcntl(sock,F_SETFL,opts); + if(opts==-1) + { + perror("fcntl(sock,SETFL,opts) failed!\n"); + return opts; + } + return 1; +} + +static void adjustQSSWorkerLimits(QSocketServer *qss){ + //to adjust availabe size. +} +typedef struct{ + QSocketServer * qss; + pthread_t th; +}QSSWORKER_PARAM; + +static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){ + WORD res=0; + if(qss->workerCounterminThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCountermaxThreads)) + { + QSSWORKER_PARAM * pParam=NULL; + int i=0; + pthread_spin_lock(&qss->g_spinlock); + if(qss->workerCounter+addCounter<=qss->maxThreads) + for(;ith,NULL,epollWorkerRoutine,pParam); + pParam->qss=qss; + qss->workerCounter++,res++; + } + } + pthread_spin_unlock(&qss->g_spinlock); + } + return res; +} + +static void SOlogger(const char * msg,SOCKET s){ + perror(msg); + if(s>0) + close(s); +} + +static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){ + QSSEPollEvent *qssEPEvent=event->data.ptr; + int ret; + printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self()); + if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){ + //sleep(5); + ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret); + } + printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self()); + return ret; +} + +int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout) +{ + + QSocketServer * qss=malloc(sizeof(QSocketServer)); + qss->passive=passive; + qss->port=port; + qss->minThreads=minThreads; + qss->maxThreads=maxThreads; + qss->workerWaitTimeout=workerWaitTimeout; + qss->lifecycleStatus=0; + pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE); + qss->workerCounter=0; + qss->currentBusyWorkers=0; + qss->CSocketsCounter=0; + qss->cslifecb=cslifecb,qss->protoHandler=protoHandler; + if(!qss->protoHandler) + qss->protoHandler=_InternalProtocolHandler; + adjustQSSWorkerLimits(qss); + *qss_ptr=qss; + return 1; +} + +int startSocketServer(QSocketServer *qss) +{ + if(qss==NULL) + return 0; + else{ + pthread_spin_lock(&qss->g_spinlock); + if(qss->lifecycleStatus==0){ + qss->lifecycleStatus=1; + pthread_spin_unlock(&qss->g_spinlock); + }else{ + pthread_spin_unlock(&qss->g_spinlock); + return 0; + } + } + //bzero(&qss->serv_addr, sizeof(qss->serv_addr)); + + qss->serv_addr.sin_family=AF_INET; + qss->serv_addr.sin_port=htons(qss->port); + inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr)); + //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1"); + + qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP); + if(setNonBlocking(qss->server_s)==-1) + { + SOlogger("setNonBlocking server_s failed.\n",0); + return 0; + } + + if(qss->server_s==INVALID_SOCKET) + { + SOlogger("socket failed.\n",0); + return 0; + } + + if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR) + { + SOlogger("bind failed.\n",qss->server_s); + return 0; + } + + if(listen(qss->server_s,SOMAXCONN/*这个宏windows也有,这里是128,当然你可以设的小些,它影响开销的*/)==SOCKET_ERROR) + { + SOlogger("listen failed.\n",qss->server_s); + return 0; + } + qss->epollFD=epoll_create1(0);/*这里不是epoll_create(size)哦,你可能不知道如何设置size,所以忽略它吧*/ + if(qss->epollFD==-1){ + SOlogger("epoll_create1 0, main epollFD failed.\n",qss->server_s); + return 0; + } + qss->BSendEpollFD=epoll_create1(0);//for blocking send. + if(qss->BSendEpollFD==-1){ + SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s); + return 0; + } + + {//ADD ACCEPT EVENT + struct epoll_event _epEvent; + QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent)); + qssEPEvent->client_s=qss->server_s; + _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET; + _epEvent.data.ptr=qssEPEvent; + if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){ + SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s); + free(qssEPEvent); + return 0; + } + } + {//starup blocking send epoller. + QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM)); + pParam->qss=qss; + pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam); + } + + //initialize worker for epoll events. + addQSSWorker(qss,qss->minThreads); + qss->lifecycleStatus=2; + return 1; +} + +int shutdownSocketServer(QSocketServer *qss){ + //change qss->lifecycleStatus + if(qss==NULL) + return 0; + else{ + pthread_spin_lock(&qss->g_spinlock); + if(qss->lifecycleStatus==2){ + qss->lifecycleStatus=3; + pthread_spin_unlock(&qss->g_spinlock); + }else{ + pthread_spin_unlock(&qss->g_spinlock); + return 0; + } + } + /*shutdown server-listening socket,这里优雅的做法是shutdown--notify-->epoll-->close.记得shutdown会发送EOF的哦*/ + shutdown(qss->server_s,SHUT_RDWR); + + // /proc/getpid/fd shutdown all socket cs != serv_s + { + char dirBuf[64]; + struct dirent * de; + DIR *pd=NULL; + int sockFD; + sprintf(dirBuf,"/proc/%d/fd/",getpid()); + pd=opendir(dirBuf); + if(pd!=NULL){ + while((de=readdir(pd))!=NULL){ + if(isDigitStr(de->d_name)){ + sockFD=atoi(de->d_name); + if(isfdtype(sockFD,S_IFSOCK)) + shutdown(sockFD,SHUT_RDWR); + } + } + closedir(pd); + } + /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/ + } + return 1; +} + +static int onAcceptRoutine(QSocketServer * qss) +{ + SOCKADDR_IN client_addr; + unsigned int client_addr_leng=sizeof(SOCKADDR_IN); + SOCKET cs; + struct epoll_event _epEvent; + QSSEPollEvent *qssEPEvent=NULL; + cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng); + if(cs==INVALID_SOCKET) + { + printf("onAccept failed:%d,%s\n",errno,strerror(errno)); + epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22 Invalid argument + return 0; + } + if(setNonBlocking(cs)==-1) + { + printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs); + return 0; + } + + {// set keepalive option + int keepAlive = 1; + int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT; + int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL; + int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT; + if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))|| + setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))|| + setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))|| + setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount))) + { + printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno)); + return 0; + } + } + qssEPEvent=malloc(sizeof(QSSEPollEvent)); + qssEPEvent->client_s=cs; + { + _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT; + qssEPEvent->BSendEpollFDRelated=0; + _epEvent.data.ptr=qssEPEvent;/*这里又和教科的不一样哦,真正的user data用ptr,而不是单一的fd*/ + if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){ + printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno); + free(qssEPEvent); + return 0; + }else{ + pthread_spin_lock(&qss->g_spinlock); + qss->CSocketsCounter++; + pthread_spin_unlock(&qss->g_spinlock); + if(qss->cslifecb) + qss->cslifecb(cs,0); + } + } + printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs); + return 1; +} + +typedef struct{ + QSocketServer * qss; + QSSEPollEvent * event; +}InternalSenderBase_t; + +static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){ + InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase; + char * _sbuf=_buf; + int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno; + + QSSEPollEvent *qssEPEvent=NULL; + struct epoll_event _epEvent; + + struct timespec sendTimeo; + + while(1){ + *errno_ptr=0; + while(sum0) + sum+=ret,_sbuf+=ret; + if(sum==nbs||ret==0) + break; + else if(ret==-1){ + if(errno==EAGAIN&&sumevent; + _epEvent.data.ptr=qssEPEvent; + _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT; + if(qssEPEvent->BSendEpollFDRelated==0){ + pthread_mutex_init(&qssEPEvent->writableLock,NULL); + pthread_cond_init(&qssEPEvent->writableMonitor,NULL); + qssEPEvent->BSendEpollFDRelated=1; + curEpoll_ctl_opt=EPOLL_CTL_ADD; + }else{ + curEpoll_ctl_opt=EPOLL_CTL_MOD; + } + + {//wait writable. + int flag=0; + pthread_mutex_lock(&qssEPEvent->writableLock); + if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){ + sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT; + int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo); + if(err) + flag=-1; + }else + flag=-1; + pthread_mutex_unlock(&qssEPEvent->writableLock); + if(flag==-1) + break; + } + + }else{ + if(errno==EAGAIN&&sum==nbs) + ret=nbs;//it is ok; + break; + } + } + }//end while. + return ret; +} +void * blockingSendEpollerRoutine(void *_param){ + QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param; + QSocketServer * qss=pParam->qss; + //pthread_t * curThread=&pParam->th; + struct epoll_event epEvents[qss->maxThreads]; + QSSEPollEvent *qssEPEvent=NULL; + int pollRes,*errno_ptr=&errno; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); + + free(pParam); + while(1){ + + pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1); + if(pollRes>=1){ + int i=0; + for(;iwritableLock); + pthread_cond_signal(&qssEPEvent->writableMonitor); + pthread_mutex_unlock(&qssEPEvent->writableLock); + } + + }else if(pollRes==-1){//errno + printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr)); + break; + } + + } + + return NULL; +} + +void * epollWorkerRoutine(void * _param){ + QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param; + QSocketServer * qss=pParam->qss; + pthread_t * curThread=&pParam->th; + struct epoll_event _epEvent; + QSSEPollEvent *qssEPEvent=NULL; + InternalSenderBase_t _senderBase; + int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno; + _senderBase.qss=qss; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); + + free(pParam); + while(!exitCode){ + + *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL; + pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout); + if(pollRes==1){ + qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr; + + if(qssEPEvent->client_s==qss->server_s) + {//Accepted Socket. + onAcceptRoutine(qss); + continue; + }else{ + if(qss->protoHandler){ + _senderBase.event=_epEvent.data.ptr; + pthread_spin_lock(&qss->g_spinlock); + qss->currentBusyWorkers++; + pthread_spin_unlock(&qss->g_spinlock); + + addQSSWorker(qss,1); + handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase); + + pthread_spin_lock(&qss->g_spinlock); + qss->currentBusyWorkers--; + pthread_spin_unlock(&qss->g_spinlock); + + if(handleCode>0){ + _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT; + if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1) + SOErrOccurred=2; + }else{ + SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error. + } + } + } + + }else if(pollRes==0){//timeout + printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread); + if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads) + { + pthread_spin_lock(&qss->g_spinlock); + if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){ + qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads + exitCode=2; + } + pthread_spin_unlock(&qss->g_spinlock); + }else if(qss->lifecycleStatus>=4) + exitCode=4; + + }else if(pollRes==-1){//errno + printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr)); + exitCode=1; + } + + if(SOErrOccurred){ + if(qss->cslifecb) + qss->cslifecb(qssEPEvent->client_s,-1); + /*if(qssEPEvent)*/{ + epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL); + epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL); + close(qssEPEvent->client_s); + if(qssEPEvent->BSendEpollFDRelated){ + pthread_cond_destroy(&qssEPEvent->writableMonitor); + pthread_mutex_destroy(&qssEPEvent->writableLock); + } + free(qssEPEvent); + } + pthread_spin_lock(&qss->g_spinlock); + if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){ + //for qss workerSize, + qss->lifecycleStatus=4; + exitCode=3; + } + pthread_spin_unlock(&qss->g_spinlock); + }//SOErrOccurred handle; + + }//end main while. + + if(exitCode!=2){ + int clearup=0; + pthread_spin_lock(&qss->g_spinlock); + if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS + clearup=1; + } + pthread_spin_unlock(&qss->g_spinlock); + if(clearup){ + close(qss->epollFD); + close(qss->BSendEpollFD); + pthread_spin_destroy(&qss->g_spinlock); + free(qss); + } + }//exitCode handle; + return NULL; +} diff --git a/socketserver.h b/socketserver.h new file mode 100644 index 0000000..134a12e --- /dev/null +++ b/socketserver.h @@ -0,0 +1,78 @@ +#ifndef __Q_SOCKET_SERVER__ +#define __Q_SOCKET_SERVER__ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#define SOCKET_ERROR -1 +#define INVALID_SOCKET -1 +typedef int SOCKET; +typedef struct sockaddr_in SOCKADDR_IN; +typedef unsigned short WORD; +typedef unsigned int DWORD; + +#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60 +#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5 +#define QSS_SIO_KEEPALIVE_VALS_COUNT 3 +#define MAX_THREADS 100 +#define MAX_THREADS_MIN 10 +#define MIN_WORKER_WAIT_TIMEOUT 20*1000 +#define MAX_WORKER_WAIT_TIMEOUT 60*MIN_WORKER_WAIT_TIMEOUT +#define MAX_THREADPOOLS 32 + +#define MAX_BUF_SIZE 1024 +/* ulimit -n opened FDs per process.记得修改哦,否则还是select效果,就不是epoll效果了哦,呵呵*/ +#define BLOCKING_SEND_TIMEOUT 20 + +typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose +typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs); +typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR + +typedef struct { + WORD passive; + WORD port;//uint16_t + WORD minThreads; + WORD maxThreads; + pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE + volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped + int workerWaitTimeout;//wait timeout + volatile int workerCounter; + volatile int currentBusyWorkers; + volatile int CSocketsCounter; + CSocketLifecycleCallback cslifecb; + InternalProtocolHandler protoHandler; + SOCKET server_s; + SOCKADDR_IN serv_addr; + int epollFD;//main epoller. + int BSendEpollFD;//For blocking send. +}QSocketServer; + +typedef struct { + SOCKET client_s; + SOCKADDR_IN client_addr; + uint32_t curEvents; + + char buf[MAX_BUF_SIZE]; + DWORD numberOfBytesTransferred; + char * data; + + int BSendEpollFDRelated; + pthread_mutex_t writableLock; + pthread_cond_t writableMonitor; +}QSSEPollEvent;//for per connection + +int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout); +int startSocketServer(QSocketServer *qss); +int shutdownSocketServer(QSocketServer *qss); +#endif + diff --git a/sockevent.c b/sockevent.c new file mode 100644 index 0000000..66600c8 --- /dev/null +++ b/sockevent.c @@ -0,0 +1,401 @@ +/* sockevent.c + * create TCP socket and store then in a epoll file descriptor + * build with : gcc -O3 -g -falign-functions=4 -falign-jumps -falign-loops -Wall -o sockevent sockevent.c + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +/* catch SIGINT and set stop to signal_id +*/ +int stop; +void interrupt(int signal_id) +{ + stop = signal_id; +} + +// init struct for statistics +struct statistics +{ + int reqsent; + int bytessent; + int reprecv; + int bytesrecv; + int error; + int nbsock; +}; + +void printstats(struct statistics *stats) +{ + struct statistics previous; + previous.reqsent = 0; + previous.reprecv = 0; + int banner = 0; + + printf("\nreprecv\tbytes\t^hit\treqsent\tbytes\t^req\tErrors\tActive\n"); + + for(;;) + { + sleep(1); + if(banner == 10) + { + printf("\nreprecv\tbytes\t^hit\treqsent\tbytes\t^req\tErrors\tActive\n"); + banner = 0; + } + else + banner++; + + + printf("%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\n", stats->reprecv, stats->bytesrecv, stats->reprecv - previous.reprecv,stats->reqsent, stats->bytessent, stats->reqsent - previous.reqsent, stats->error, stats->nbsock); + + previous.reqsent = stats->reqsent; + previous.reprecv = stats->reprecv; + } + +} + + +/* WT (inject29) function to convert a ip:port chain into a sockaddr struct */ +struct sockaddr_in str2sa(char *str) +{ + static struct sockaddr_in sa; + char *c; + int port; + + bzero(&sa, sizeof(sa)); + str=strdup(str); + if ((c=strrchr(str,':')) != NULL) { + *c++=0; + port=atol(c); + } + else + port=0; + + if (!inet_aton(str, &sa.sin_addr)) { + struct hostent *he; + + if ((he = gethostbyname(str)) == NULL) + fprintf(stderr,"[NetTools] Invalid server name: %s\n",str); + else + sa.sin_addr = *(struct in_addr *) *(he->h_addr_list); + } + sa.sin_port=htons(port); + sa.sin_family=AF_INET; + + free(str); + return sa; +} + +/* create a TCP socket with non blocking options and connect it to the target + * if succeed, add the socket in the epoll list and exit with 0 + */ +int create_and_connect( struct sockaddr_in target , int *epfd) +{ + int yes = 1; + int sock; + + // epoll mask that contain the list of epoll events attached to a network socket + static struct epoll_event Edgvent; + + + if( (sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + perror("socket"); + exit(1); + } + + // set socket to non blocking and allow port reuse + if ( (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) || + fcntl(sock, F_SETFL, O_NONBLOCK)) == -1) + { + perror("setsockopt || fcntl"); + exit(1); + } + + if( connect(sock, (struct sockaddr *)&target, sizeof(struct sockaddr)) == -1 + && errno != EINPROGRESS) + { + // connect doesn't work, are we running out of available ports ? if yes, destruct the socket + if (errno == EAGAIN) + { + perror("connect is EAGAIN"); + close(sock); + exit(1); + } + } + else + { + /* epoll will wake up for the following events : + * + * EPOLLIN : The associated file is available for read(2) operations. + * + * EPOLLOUT : The associated file is available for write(2) operations. + * + * EPOLLRDHUP : Stream socket peer closed connection, or shut down writing + * half of connection. (This flag is especially useful for writing simple + * code to detect peer shutdown when using Edge Triggered monitoring.) + * + * EPOLLERR : Error condition happened on the associated file descriptor. + * epoll_wait(2) will always wait for this event; it is not necessary to set it in events. + */ + Edgvent.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET ; + //Edgvent.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLERR; + + Edgvent.data.fd = sock; + + // add the socket to the epoll file descriptors + if(epoll_ctl((int)epfd, EPOLL_CTL_ADD, sock, &Edgvent) != 0) + { + perror("epoll_ctl, adding socket\n"); + exit(1); + } + } + + return 0; +} + + + + +/* reading waiting errors on the socket + * return 0 if there's no, 1 otherwise + */ +int socket_check(int fd) +{ + int ret; + int code; + size_t len = sizeof(int); + + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &code, &len); + + if ((ret || code)!= 0) + return 1; + + return 0; +} + + + + +int main(int argc, char *argv[]) +{ + + if(argc!=3) + { + printf("gatlinject \n"); + exit(1); + } + + struct sockaddr_in target = str2sa((char *) argv[1]); // convert target information + int maxconn = atoi(argv[2]); //number of sockets to connect to the target + + // internal variables definition + int i, count, datacount; + + char message[] = "hello\n\n"; + int messagelength = strlen(message); + + char buffer[1500]; + int buffersize = strlen(buffer); + + struct statistics stats; + memset(&stats,0x0,6 * sizeof(int)); + pthread_t Statsthread; + + + // time + struct timeval start; + struct timeval current; + float elapsedtime; + + // catch SIGINT to exit in a clean way + struct sigaction sa; + memset(&sa, 0, sizeof(struct sigaction *)); + sa.sa_handler = interrupt; + sa.sa_flags = 0; + sigemptyset (&(sa.sa_mask)); + if(sigaction (SIGINT, &sa, NULL)!= 0) + { + perror("sigaction failed"); + exit(1); + } + + // the epoll file descriptor + int epfd; + + // epoll structure that will contain the current network socket and event when epoll wakes up + static struct epoll_event *events; + static struct epoll_event event_mask; + + // create the special epoll file descriptor + epfd = epoll_create(maxconn); + + // allocate enough memory to store all the events in the "events" structure + if (NULL == (events = calloc(maxconn, sizeof(struct epoll_event)))) + { + perror("calloc events"); + exit(1); + }; + + // create and connect as much as needed + for(i=0;i