一. socket基础
首先在服务器,我们需要建立一个socket套接字,对外提供一个网络通信接口,在Linux系统中这个套接字竟然仅仅是一个文件描述符,也就是一个int
类型的值!
为了方便编码以及代码的可读性,可以封装一个错误处理函数:
1 2 3 4 5 6
| void errif(bool condition, const char *errmsg){ if(condition){ perror(errmsg); exit(EXIT_FAILURE); } }
|
1 2 3
| #include <sys/socket.h> int sockfd = socket(AF_INET, SOCK_STREAM, 0); errif(sockfd == -1, "socket create error");
|
- 第一个参数:IP地址类型,AF_INET表示使用IPv4,如果使用IPv6请使用AF_INET6。
- 第二个参数:数据传输方式,SOCK_STREAM表示流格式、面向连接,多用于TCP。SOCK_DGRAM表示数据报格式、无连接,多用于UDP。
- 第三个参数:协议,0表示根据前面的两个参数自动推导协议类型。设置为IPPROTO_TCP和IPPTOTO_UDP,分别表示TCP和UDP。
对于客户端,服务器存在的唯一标识是一个IP地址和端口,这时候我们需要将这个套接字绑定到一个IP地址和端口上。首先创建一个sockaddr_in结构体
1 2 3 4
| struct sockaddr_in serv_addr; serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); serv_addr.sin_port = htons(8888);
|
然后将socket地址与文件描述符绑定:
1
| errif(bind(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) == -1, "socket bind error");
|
最后我们需要使用listen
函数监听这个socket端口,这个函数的第二个参数是listen函数的最大监听队列长度,系统建议的最大值SOMAXCONN
被定义为128。
1
| errif(listen(sockfd, SOMAXCONN) == -1, "socket listen error");
|
要接受一个客户端连接,需要使用accept
函数。对于每一个客户端,我们在接受连接时也需要保存客户端的socket地址信息,于是有以下代码:
1 2 3 4 5 6 7
| struct sockaddr_in clnt_addr; socklen_t clnt_addr_len = sizeof(clnt_addr);
bzero(&clnt_addr, sizeof(clnt_addr)); int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len); printf("new client fd %d! IP: %s Port: %d\n", clnt_sockfd, inet_ntoa(clnt_addr.sin_addr), ntohs(clnt_addr.sin_port)); errif(clnt_sockfd == -1, "socket accept error");
|
连接测试:
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); errif(sockfd == -1, "socket create error");
struct sockaddr_in serv_addr; bzero(&serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); serv_addr.sin_port = htons(8888);
errif(bind(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) == -1, "socket bind error");
errif(listen(sockfd, SOMAXCONN) == -1, "socket listen error"); struct sockaddr_in clnt_addr; socklen_t clnt_addr_len = sizeof(clnt_addr); bzero(&clnt_addr, sizeof(clnt_addr));
int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len); errif(clnt_sockfd == -1, "socket accept error");
printf("new client fd %d! IP: %s Port: %d\n", clnt_sockfd, inet_ntoa(clnt_addr.sin_addr), ntohs(clnt_addr.sin_port)); while (true) { char buf[1024]; bzero(&buf, sizeof(buf)); ssize_t read_bytes = read(clnt_sockfd, buf, sizeof(buf)); if(read_bytes > 0){ printf("message from client fd %d: %s\n", clnt_sockfd, buf); write(clnt_sockfd, buf, sizeof(buf)); } else if(read_bytes == 0){ printf("client fd %d disconnected\n", clnt_sockfd); close(clnt_sockfd); break; } else if(read_bytes == -1){ close(clnt_sockfd); errif(true, "socket read error"); } } close(sockfd); return 0; }
|
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); errif(sockfd == -1, "socket create error");
struct sockaddr_in serv_addr; bzero(&serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); serv_addr.sin_port = htons(8888);
errif(connect(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) == -1, "socket connect error"); while(true){ char buf[1024]; bzero(&buf, sizeof(buf)); scanf("%s", buf); ssize_t write_bytes = write(sockfd, buf, sizeof(buf)); if(write_bytes == -1){ printf("socket already disconnected, can't write any more!\n"); break; } bzero(&buf, sizeof(buf)); ssize_t read_bytes = read(sockfd, buf, sizeof(buf)); if(read_bytes > 0){ printf("message from server: %s\n", buf); }else if(read_bytes == 0){ printf("server socket disconnected!\n"); break; }else if(read_bytes == -1){ close(sockfd); errif(true, "socket read error"); } } close(sockfd); return 0; }
|
二. 高并发
IO复用是针对IO接口,而多线程是针对CPU。在Linux系统中,IO复用使用select, poll和epoll来实现。epoll改进了前两者,更加高效、性能更好,是目前几乎所有高并发服务器的基石。
epoll监听事件的描述符会放在一颗红黑树上,我们将要监听的IO口放入epoll红黑树中,就可以监听该IO上的事件。
1 2 3
| epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
|
其中sockfd表示我们要添加的IO文件描述符,ev是一个epoll_event结构体,其中的events表示事件,如EPOLLIN等,data是一个用户数据union:
1 2 3 4 5 6 7 8 9 10
| typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; epoll_data_t data; } __EPOLL_PACKED;
|
epoll默认采用LT触发模式,即水平触发,只要fd上有事件,就会一直通知内核。这样可以保证所有事件都得到处理、不容易丢失,但可能发生的大量重复通知也会影响epoll的性能。如使用ET模式,即边缘触法,fd从无事件到有事件的变化会通知内核一次,之后就不会再次通知内核。这种方式十分高效,可以大大提高支持的并发度,但程序逻辑必须一次性很好地处理该fd上的事件,编程比LT更繁琐。注意ET模式必须搭配非阻塞式socket使用。
我们可以随时使用epoll_wait
获取有事件发生的fd:
1
| int nfds = epoll_wait(epfd, events, maxevents, timeout);
|
其中events是一个epoll_event结构体数组,maxevents是可供返回的最大事件大小,一般是events的大小,timeout表示最大等待时间,设置为-1表示一直等待。
服务器改写成epoll版本,基本思想为:在创建了服务器socket fd后,将这个fd添加到epoll,只要这个fd上发生可读事件,表示有一个新的客户端连接。然后accept这个客户端并将客户端的socket fd添加到epoll,epoll会监听客户端socket fd是否有事件发生,如果发生则处理事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| int sockfd = socket(...); bind(sockfd...); listen(sockfd...); int epfd = epoll_create1(0); struct epoll_event events[MAX_EVENTS], ev; ev.events = EPOLLIN; ev.data.fd = sockfd; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); while(true){ int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1); for(int i = 0; i < nfds; ++i){ if(events[i].data.fd == sockfd){ int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len); ev.data.fd = clnt_sockfd; ev.events = EPOLLIN | EPOLLET; setnonblocking(clnt_sockfd); epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sockfd, &ev); } else if(events[i].events & EPOLLIN){ handleEvent(events[i].data.fd); } } }
|
三. 封装成类
socket:
1 2 3 4 5 6
| Socket *serv_sock = new Socket(); InetAddress *serv_addr = new InetAddress("127.0.0.1", 8888); serv_sock->bind(serv_addr); serv_sock->listen(); InetAddress *clnt_addr = new InetAddress(); Socket *clnt_sock = new Socket(serv_sock->accept(clnt_addr));
|
epoll:
1 2 3 4 5 6 7 8
| Epoll *ep = new Epoll(); ep->addFd(serv_sock->getFd(), EPOLLIN | EPOLLET); while(true){ vector<epoll_event> events = ep->poll(); for(int i = 0; i < events.size(); ++i){ } }
|
四. 抽象服务器
仔细分析可发现,对于每一个事件,不管提供什么样的服务,首先需要做的事都是调用accept()
函数接受这个TCP连接,然后将socket文件描述符添加到epoll。当这个IO口有事件发生的时候,再对此TCP连接提供相应的服务。因此我们可以分离接受连接这一模块,添加一个Acceptor
类,这个类有以下几个特点:
- 类存在于事件驱动
EventLoop
类中,也就是Reactor模式的main-Reactor
- 类中的socket fd就是服务器监听的socket fd,每一个Acceptor对应一个socket fd
- 这个类也通过一个独有的
Channel
负责分发到epoll,该Channel的事件处理函数handleEvent()
会调用Acceptor中的接受连接函数来新建一个TCP连接
对于TCP协议,三次握手新建连接后,这个连接将会一直存在,直到我们四次挥手断开连接。因此,我们也可以把TCP连接抽象成一个Connection
类,这个类也有以下几个特点:
- 类存在于事件驱动
EventLoop
类中,也就是Reactor模式的main-Reactor
- 类中的socket fd就是客户端的socket fd,每一个Connection对应一个socket fd
- 每一个类的实例通过一个独有的
Channel
负责分发到epoll,该Channel的事件处理函数handleEvent()
会调用Connection中的事件处理函数来响应客户端请求
流程
Acceptor
拿着服务器的sockfd
等待着客户来连接,即大堂经理,负责招待贵客。一旦建立连接,拿到客户的sockfd
,将这个连接保存管理起来。所有的sockfd
都会放入epoll
中进行监听。连接中的客户被监听到事件,则仍给线程池进行任务处理。
线程池
关于线程池,需要特别注意的有两点,一是在多线程环境下任务队列的读写操作都应该考虑互斥锁,二是当任务队列为空时CPU不应该不断轮询耗费CPU资源。为了解决第一点,这里使用std::mutex
来对任务队列进行加锁解锁。为了解决第二个问题,使用了条件变量std::condition_variable
。
主从Reactor多线程
main-Reactor只负责接受新的连接,建立连接后将连接交给sub-Reactor管理。一个线程负责一个sub-Reactor,并监听该connection上的事件,读写任务交给线程池完成。