0%

C++轻量级服务器

一. socket基础

首先在服务器,我们需要建立一个socket套接字,对外提供一个网络通信接口,在Linux系统中这个套接字竟然仅仅是一个文件描述符,也就是一个int类型的值!

为了方便编码以及代码的可读性,可以封装一个错误处理函数:

1
2
3
4
5
6
void errif(bool condition, const char *errmsg){
if(condition){
perror(errmsg);
exit(EXIT_FAILURE);
}
}
1
2
3
#include <sys/socket.h>
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
errif(sockfd == -1, "socket create error");
  • 第一个参数:IP地址类型,AF_INET表示使用IPv4,如果使用IPv6请使用AF_INET6。
  • 第二个参数:数据传输方式,SOCK_STREAM表示流格式、面向连接,多用于TCP。SOCK_DGRAM表示数据报格式、无连接,多用于UDP。
  • 第三个参数:协议,0表示根据前面的两个参数自动推导协议类型。设置为IPPROTO_TCP和IPPTOTO_UDP,分别表示TCP和UDP。

对于客户端,服务器存在的唯一标识是一个IP地址和端口,这时候我们需要将这个套接字绑定到一个IP地址和端口上。首先创建一个sockaddr_in结构体

1
2
3
4
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
serv_addr.sin_port = htons(8888);

然后将socket地址与文件描述符绑定:

1
errif(bind(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) == -1, "socket bind error");

最后我们需要使用listen函数监听这个socket端口,这个函数的第二个参数是listen函数的最大监听队列长度,系统建议的最大值SOMAXCONN被定义为128。

1
errif(listen(sockfd, SOMAXCONN) == -1, "socket listen error");

要接受一个客户端连接,需要使用accept函数。对于每一个客户端,我们在接受连接时也需要保存客户端的socket地址信息,于是有以下代码:

1
2
3
4
5
6
7
struct sockaddr_in clnt_addr;
socklen_t clnt_addr_len = sizeof(clnt_addr);
//填充0初始化
bzero(&clnt_addr, sizeof(clnt_addr));
int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len);
printf("new client fd %d! IP: %s Port: %d\n", clnt_sockfd, inet_ntoa(clnt_addr.sin_addr), ntohs(clnt_addr.sin_port));
errif(clnt_sockfd == -1, "socket accept error");

连接测试:

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int main() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
errif(sockfd == -1, "socket create error");

struct sockaddr_in serv_addr;
bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
serv_addr.sin_port = htons(8888);

errif(bind(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) == -1, "socket bind error");

errif(listen(sockfd, SOMAXCONN) == -1, "socket listen error");

struct sockaddr_in clnt_addr;
socklen_t clnt_addr_len = sizeof(clnt_addr);
bzero(&clnt_addr, sizeof(clnt_addr));

int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len);
errif(clnt_sockfd == -1, "socket accept error");

printf("new client fd %d! IP: %s Port: %d\n", clnt_sockfd, inet_ntoa(clnt_addr.sin_addr), ntohs(clnt_addr.sin_port));
while (true) {
char buf[1024];
bzero(&buf, sizeof(buf));
ssize_t read_bytes = read(clnt_sockfd, buf, sizeof(buf));
if(read_bytes > 0){
printf("message from client fd %d: %s\n", clnt_sockfd, buf);
write(clnt_sockfd, buf, sizeof(buf));
} else if(read_bytes == 0){
printf("client fd %d disconnected\n", clnt_sockfd);
close(clnt_sockfd);
break;
} else if(read_bytes == -1){
close(clnt_sockfd);
errif(true, "socket read error");
}
}
close(sockfd);
return 0;
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int main() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
errif(sockfd == -1, "socket create error");

struct sockaddr_in serv_addr;
bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
serv_addr.sin_port = htons(8888);

errif(connect(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) == -1, "socket connect error");

while(true){
char buf[1024];
bzero(&buf, sizeof(buf));
scanf("%s", buf);
ssize_t write_bytes = write(sockfd, buf, sizeof(buf));
if(write_bytes == -1){
printf("socket already disconnected, can't write any more!\n");
break;
}
bzero(&buf, sizeof(buf));
ssize_t read_bytes = read(sockfd, buf, sizeof(buf));
if(read_bytes > 0){
printf("message from server: %s\n", buf);
}else if(read_bytes == 0){
printf("server socket disconnected!\n");
break;
}else if(read_bytes == -1){
close(sockfd);
errif(true, "socket read error");
}
}
close(sockfd);
return 0;
}

二. 高并发

IO复用是针对IO接口,而多线程是针对CPU。在Linux系统中,IO复用使用select, poll和epoll来实现。epoll改进了前两者,更加高效、性能更好,是目前几乎所有高并发服务器的基石。

epoll监听事件的描述符会放在一颗红黑树上,我们将要监听的IO口放入epoll红黑树中,就可以监听该IO上的事件。

1
2
3
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);    //添加事件到epoll
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改epoll红黑树上的事件
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL); //删除事件

其中sockfd表示我们要添加的IO文件描述符,ev是一个epoll_event结构体,其中的events表示事件,如EPOLLIN等,data是一个用户数据union:

1
2
3
4
5
6
7
8
9
10
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;

epoll默认采用LT触发模式,即水平触发,只要fd上有事件,就会一直通知内核。这样可以保证所有事件都得到处理、不容易丢失,但可能发生的大量重复通知也会影响epoll的性能。如使用ET模式,即边缘触法,fd从无事件到有事件的变化会通知内核一次,之后就不会再次通知内核。这种方式十分高效,可以大大提高支持的并发度,但程序逻辑必须一次性很好地处理该fd上的事件,编程比LT更繁琐。注意ET模式必须搭配非阻塞式socket使用。

我们可以随时使用epoll_wait获取有事件发生的fd:

1
int nfds = epoll_wait(epfd, events, maxevents, timeout);

其中events是一个epoll_event结构体数组,maxevents是可供返回的最大事件大小,一般是events的大小,timeout表示最大等待时间,设置为-1表示一直等待。

服务器改写成epoll版本,基本思想为:在创建了服务器socket fd后,将这个fd添加到epoll,只要这个fd上发生可读事件,表示有一个新的客户端连接。然后accept这个客户端并将客户端的socket fd添加到epoll,epoll会监听客户端socket fd是否有事件发生,如果发生则处理事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int sockfd = socket(...);   //创建服务器socket fd
bind(sockfd...);
listen(sockfd...);
int epfd = epoll_create1(0);
struct epoll_event events[MAX_EVENTS], ev;
ev.events = EPOLLIN; //在代码中使用了ET模式,且未处理错误,在day12进行了修复,实际上接受连接最好不要用ET模式
ev.data.fd = sockfd; //该IO口为服务器socket fd
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); //将服务器socket fd添加到epoll
while(true){ // 不断监听epoll上的事件并处理
int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1); //有nfds个fd发生事件
for(int i = 0; i < nfds; ++i){ //处理这nfds个事件
if(events[i].data.fd == sockfd){ //发生事件的fd是服务器socket fd,表示有新客户端连接
int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len);
ev.data.fd = clnt_sockfd;
ev.events = EPOLLIN | EPOLLET; //对于客户端连接,使用ET模式,可以让epoll更加高效,支持更多并发
setnonblocking(clnt_sockfd); //ET需要搭配非阻塞式socket使用
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sockfd, &ev); //将该客户端的socket fd添加到epoll
} else if(events[i].events & EPOLLIN){ //发生事件的是客户端,并且是可读事件(EPOLLIN)
handleEvent(events[i].data.fd); //处理该fd上发生的事件
}
}
}

三. 封装成类

socket:

1
2
3
4
5
6
Socket *serv_sock = new Socket();
InetAddress *serv_addr = new InetAddress("127.0.0.1", 8888);
serv_sock->bind(serv_addr);
serv_sock->listen();
InetAddress *clnt_addr = new InetAddress();
Socket *clnt_sock = new Socket(serv_sock->accept(clnt_addr));

epoll:

1
2
3
4
5
6
7
8
Epoll *ep = new Epoll();
ep->addFd(serv_sock->getFd(), EPOLLIN | EPOLLET);
while(true){
vector<epoll_event> events = ep->poll();
for(int i = 0; i < events.size(); ++i){
// handle event
}
}

四. 抽象服务器

仔细分析可发现,对于每一个事件,不管提供什么样的服务,首先需要做的事都是调用accept()函数接受这个TCP连接,然后将socket文件描述符添加到epoll。当这个IO口有事件发生的时候,再对此TCP连接提供相应的服务。因此我们可以分离接受连接这一模块,添加一个Acceptor类,这个类有以下几个特点:

  • 类存在于事件驱动EventLoop类中,也就是Reactor模式的main-Reactor
  • 类中的socket fd就是服务器监听的socket fd,每一个Acceptor对应一个socket fd
  • 这个类也通过一个独有的Channel负责分发到epoll,该Channel的事件处理函数handleEvent()会调用Acceptor中的接受连接函数来新建一个TCP连接

对于TCP协议,三次握手新建连接后,这个连接将会一直存在,直到我们四次挥手断开连接。因此,我们也可以把TCP连接抽象成一个Connection类,这个类也有以下几个特点:

  • 类存在于事件驱动EventLoop类中,也就是Reactor模式的main-Reactor
  • 类中的socket fd就是客户端的socket fd,每一个Connection对应一个socket fd
  • 每一个类的实例通过一个独有的Channel负责分发到epoll,该Channel的事件处理函数handleEvent()会调用Connection中的事件处理函数来响应客户端请求

流程

Acceptor拿着服务器的sockfd等待着客户来连接,即大堂经理,负责招待贵客。一旦建立连接,拿到客户的sockfd,将这个连接保存管理起来。所有的sockfd都会放入epoll中进行监听。连接中的客户被监听到事件,则仍给线程池进行任务处理。

线程池

关于线程池,需要特别注意的有两点,一是在多线程环境下任务队列的读写操作都应该考虑互斥锁,二是当任务队列为空时CPU不应该不断轮询耗费CPU资源。为了解决第一点,这里使用std::mutex来对任务队列进行加锁解锁。为了解决第二个问题,使用了条件变量std::condition_variable

主从Reactor多线程

main-Reactor只负责接受新的连接,建立连接后将连接交给sub-Reactor管理。一个线程负责一个sub-Reactor,并监听该connection上的事件,读写任务交给线程池完成。

------------- Thank you for reading -------------

Title - Artist
0:00