四、高性能HTTP服务器设计


32 | 自己动手写高性能HTTP服务器(一):设计和思路

在开始编写高性能HTTP服务器之前,我们先要构建一个支持TCP的高性能网络编程框架,完成这个TCP高性能网络框架之后,再增加HTTP特性的支持就比较容易了,这样就可以很快开发出一个高性能的HTTP服务器程序。

设计需求

在第三个模块性能篇中,我们已经使用这个网络编程框架完成了多个应用程序的开发,这也等于对网络编程框架提出了编程接口方面的需求。综合之前的使用经验,TCP高性能网络框架需要满足的需求有以下三点。

第一,采用reactor模型,可以灵活使用poll/epoll作为事件分发实现。

第二,必须支持多线程,从而可以支持单线程单reactor模式,也可以支持多线程主-从reactor模式。可以将套接字上的I/O事件分离到多个线程上。

第三,封装读写操作到Buffer对象中。

按照这三个需求,正好可以把整体设计思路分成三块来讲解,分别包括反应堆模式设计、I/O模型和多线程模型设计、数据读写封装和buffer。今天我们主要讲一下主要的设计思路和数据结构,以及反应堆模式设计。

主要设计思路

反应堆模式设计

反应堆模式,按照性能篇的讲解,主要是设计一个基于事件分发和回调的反应堆框架。这个框架里面的主要对象包括:

  • event_loop

你可以把event_loop这个对象理解成和一个线程绑定的无限事件循环,你会在各种语言里看到event_loop这个抽象。这是什么意思呢?简单来说,它就是一个无限循环着的事件分发器,一旦有事件发生,它就会回调预先定义好的回调函数,完成事件的处理。

具体来说,event_loop使用poll或者epoll方法将一个线程阻塞,等待各种I/O事件的发生。

  • channel

对各种注册到event_loop上的对象,我们抽象成channel来表示,例如注册到event_loop上的监听事件,注册到event_loop上的套接字读写事件等。在各种语言的API里,你都会看到channel这个对象,大体上它们表达的意思跟我们这里的设计思路是比较一致的。

  • acceptor

acceptor对象表示的是服务器端监听器,acceptor对象最终会作为一个channel对象,注册到event_loop上,以便进行连接完成的事件分发和检测。

  • event_dispatcher

event_dispatcher是对事件分发机制的一种抽象,也就是说,可以实现一个基于poll的poll_dispatcher,也可以实现一个基于epoll的epoll_dispatcher。在这里,我们统一设计一个event_dispatcher结构体,来抽象这些行为。

  • channel_map

channel_map保存了描述字到channel的映射,这样就可以在事件发生时,根据事件类型对应的套接字快速找到channel对象里的事件处理函数。

I/O模型和多线程模型设计

I/O线程和多线程模型,主要解决event_loop的线程运行问题,以及事件分发和回调的线程执行问题。

  • thread_pool

thread_pool维护了一个sub-reactor的线程列表,它可以提供给主reactor线程使用,每次当有新的连接建立时,可以从thread_pool里获取一个线程,以便用它来完成对新连接套接字的read/write事件注册,将I/O线程和主reactor线程分离。

  • event_loop_thread

event_loop_thread是reactor的线程实现,连接套接字的read/write事件检测都是在这个线程里完成的。

Buffer和数据读写

  • buffer

buffer对象屏蔽了对套接字进行的写和读的操作,如果没有buffer对象,连接套接字的read/write事件都需要和字节流直接打交道,这显然是不友好的。所以,我们也提供了一个基本的buffer对象,用来表示从连接套接字收取的数据,以及应用程序即将需要发送出去的数据。

  • tcp_connection

tcp_connection这个对象描述的是已建立的TCP连接。它的属性包括接收缓冲区、发送缓冲区、channel对象等。这些都是一个TCP连接的天然属性。

tcp_connection是大部分应用程序和我们的高性能框架直接打交道的数据结构。我们不想把最下层的channel对象暴露给应用程序,因为抽象的channel对象不仅仅可以表示tcp_connection,前面提到的监听套接字也是一个channel对象,后面提到的唤醒socketpair也是一个 channel对象。所以,我们设计了tcp_connection这个对象,希望可以提供给用户比较清晰的编程入口。

反应堆模式设计

概述

下面,我们详细讲解一下以event_loop为核心的反应堆模式设计。这里有一张event_loop的运行详图,你可以对照这张图来理解。

当event_loop_run完成之后,线程进入循环,首先执行dispatch事件分发,一旦有事件发生,就会调用channel_event_activate函数,在这个函数中完成事件回调函数eventReadcallback和eventWritecallback的调用,最后再进行event_loop_handle_pending_channel,用来修改当前监听的事件列表,完成这个部分之后,又进入了事件分发循环。

event_loop分析

说event_loop是整个反应堆模式设计的核心,一点也不为过。先看一下event_loop的数据结构。

在这个数据结构中,最重要的莫过于event_dispatcher对象了。你可以简单地把event_dispatcher理解为poll或者epoll,它可以让我们的线程挂起,等待事件的发生。

这里有一个小技巧,就是event_dispatcher_data,它被定义为一个void *类型,可以按照我们的需求,任意放置一个我们需要的对象指针。这样,针对不同的实现,例如poll或者epoll,都可以根据需求,放置不同的数据对象。

event_loop中还保留了几个跟多线程有关的对象,如owner_thread_id是保留了每个event loop的线程ID,mutex和con是用来进行线程同步的。

socketPair是父线程用来通知子线程有新的事件需要处理。pending_head和pending_tail是保留在子线程内的需要处理的新事件。

struct event_loop {
    int quit;
    const struct event_dispatcher *eventDispatcher;

    /** 对应的event_dispatcher的数据. */
    void *event_dispatcher_data;
    struct channel_map *channelMap;

    int is_handle_pending;
    struct channel_element *pending_head;
    struct channel_element *pending_tail;

    pthread_t owner_thread_id;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int socketPair[2];
    char *thread_name;
};

下面我们看一下event_loop最主要的方法event_loop_run方法,前面提到过,event_loop就是一个无限while循环,不断地在分发事件。

/**
 *
 * 1.参数验证
 * 2.调用dispatcher来进行事件分发,分发完回调事件处理函数
 */
int event_loop_run(struct event_loop *eventLoop) {
    assert(eventLoop != NULL);

    struct event_dispatcher *dispatcher = eventLoop->eventDispatcher;

    if (eventLoop->owner_thread_id != pthread_self()) {
        exit(1);
    }

    yolanda_msgx("event loop run, %s", eventLoop->thread_name);
    struct timeval timeval;
    timeval.tv_sec = 1;

    while (!eventLoop->quit) {
        //block here to wait I/O event, and get active channels
        dispatcher->dispatch(eventLoop, &timeval);

        //handle the pending channel
        event_loop_handle_pending_channel(eventLoop);
    }

    yolanda_msgx("event loop end, %s", eventLoop->thread_name);
    return 0;
}

代码很明显地反映了这一点,这里我们在event_loop不退出的情况下,一直在循环,循环体中调用了dispatcher对象的dispatch方法来等待事件的发生。

event_dispacher分析

为了实现不同的事件分发机制,这里把poll、epoll等抽象成了一个event_dispatcher结构。event_dispatcher的具体实现有poll_dispatcher和epoll_dispatcher两种,实现的方法和性能篇 21 22讲 类似,这里就不再赘述,你如果有兴趣的话,可以直接研读代码。

/** 抽象的event_dispatcher结构体,对应的实现如select,poll,epoll等I/O复用. */
struct event_dispatcher {
    /**  对应实现 */
    const char *name;

    /**  初始化函数 */
    void *(*init)(struct event_loop * eventLoop);

    /** 通知dispatcher新增一个channel事件*/
    int (*add)(struct event_loop * eventLoop, struct channel * channel);

    /** 通知dispatcher删除一个channel事件*/
    int (*del)(struct event_loop * eventLoop, struct channel * channel);

    /** 通知dispatcher更新channel对应的事件*/
    int (*update)(struct event_loop * eventLoop, struct channel * channel);

    /** 实现事件分发,然后调用event_loop的event_activate方法执行callback*/
    int (*dispatch)(struct event_loop * eventLoop, struct timeval *);

    /** 清除数据 */
    void (*clear)(struct event_loop * eventLoop);
};

channel对象分析

channel对象是用来和event_dispather进行交互的最主要的结构体,它抽象了事件分发。一个channel对应一个描述字,描述字上可以有READ可读事件,也可以有WRITE可写事件。channel对象绑定了事件处理函数event_read_callback和event_write_callback。

typedef int (*event_read_callback)(void *data);

typedef int (*event_write_callback)(void *data);

struct channel {
    int fd;
    int events;   //表示event类型

    event_read_callback eventReadCallback;
    event_write_callback eventWriteCallback;
    void *data; //callback data, 可能是event_loop,也可能是tcp_server或者tcp_connection
};

channel_map对象分析

event_dispatcher在获得活动事件列表之后,需要通过文件描述字找到对应的channel,从而回调channel上的事件处理函数event_read_callback和event_write_callback,为此,设计了channel_map对象。

/**
 * channel映射表, key为对应的socket描述字
 */
struct channel_map {
    void **entries;

    /* The number of entries available in entries */
    int nentries;
};

channel_map对象是一个数组,数组的下标即为描述字,数组的元素为channel对象的地址。

比如描述字3对应的channel,就可以这样直接得到。

struct chanenl * channel = map->entries[3];

这样,当event_dispatcher需要回调channel上的读、写函数时,调用channel_event_activate就可以,下面是channel_event_activate的实现,在找到了对应的channel对象之后,根据事件类型,回调了读函数或者写函数。注意,这里使用了EVENT_READ和EVENT_WRITE来抽象了poll和epoll的所有读写事件类型。

int channel_event_activate(struct event_loop *eventLoop, int fd, int revents) {
    struct channel_map *map = eventLoop->channelMap;
    yolanda_msgx("activate channel fd == %d, revents=%d, %s", fd, revents, eventLoop->thread_name);

    if (fd < 0)
        return 0;

    if (fd >= map->nentries)return (-1);

    struct channel *channel = map->entries[fd];
    assert(fd == channel->fd);

    if (revents & (EVENT_READ)) {
        if (channel->eventReadCallback) channel->eventReadCallback(channel->data);
    }
    if (revents & (EVENT_WRITE)) {
        if (channel->eventWriteCallback) channel->eventWriteCallback(channel->data);
    }

    return 0;
}

增加、删除、修改channel event

那么如何增加新的channel event事件呢?下面这几个函数是用来增加、删除和修改channel event事件的。

int event_loop_add_channel_event(struct event_loop *eventLoop, int fd, struct channel *channel1);

int event_loop_remove_channel_event(struct event_loop *eventLoop, int fd, struct channel *channel1);

int event_loop_update_channel_event(struct event_loop *eventLoop, int fd, struct channel *channel1);

前面三个函数提供了入口能力,而真正的实现则落在这三个函数上:

int event_loop_handle_pending_add(struct event_loop *eventLoop, int fd, struct channel *channel);

int event_loop_handle_pending_remove(struct event_loop *eventLoop, int fd, struct channel *channel);

int event_loop_handle_pending_update(struct event_loop *eventLoop, int fd, struct channel *channel);

我们看一下其中的一个实现,event_loop_handle_pending_add在当前event_loop的channel_map里增加一个新的key-value对,key是文件描述字,value是channel对象的地址。之后调用event_dispatcher对象的add方法增加channel event事件。注意这个方法总在当前的I/O线程中执行。

// in the i/o thread
int event_loop_handle_pending_add(struct event_loop *eventLoop, int fd, struct channel *channel) {
    yolanda_msgx("add channel fd == %d, %s", fd, eventLoop->thread_name);
    struct channel_map *map = eventLoop->channelMap;

    if (fd < 0)
        return 0;

    if (fd >= map->nentries) {
        if (map_make_space(map, fd, sizeof(struct channel *)) == -1)
            return (-1);
    }

    //第一次创建,增加
    if ((map)->entries[fd] == NULL) {
        map->entries[fd] = channel;
        //add channel
        struct event_dispatcher *eventDispatcher = eventLoop->eventDispatcher;
        eventDispatcher->add(eventLoop, channel);
        return 1;
    }

    return 0;
}

总结

在这一讲里,我们介绍了高性能网络编程框架的主要设计思路和基本数据结构,以及反应堆设计相关的具体做法。在接下来的章节中,我们将继续编写高性能网络编程框架的线程模型以及读写Buffer部分。

32 IO模型和多线程模型实现

33 | 自己动手写高性能HTTP服务器(二):I/O模型和多线程模型实现

多线程设计的几个考虑

在我们的设计中,main reactor线程是一个acceptor线程,这个线程一旦创建,会以event_loop形式阻塞在event_dispatcher的dispatch方法上,实际上,它在等待监听套接字上的事件发生,也就是已完成的连接,一旦有连接完成,就会创建出连接对象tcp_connection,以及channel对象等。

当用户期望使用多个sub-reactor子线程时,主线程会创建多个子线程,每个子线程在创建之后,按照主线程指定的启动函数立即运行,并进行初始化。随之而来的问题是, 主线程如何判断子线程已经完成初始化并启动,继续执行下去呢?这是一个需要解决的重点问题。

在设置了多个线程的情况下,需要将新创建的已连接套接字对应的读写事件交给一个sub-reactor线程处理。所以,这里从thread_pool中取出一个线程, 通知这个线程有新的事件加入。而这个线程很可能是处于事件分发的阻塞调用之中,如何协调主线程数据写入给子线程,这是另一个需要解决的重点问题。

子线程是一个event_loop线程,它阻塞在dispatch上,一旦有事件发生,它就会查找channel_map,找到对应的处理函数并执行它。之后它就会增加、删除或修改pending事件,再次进入下一轮的dispatch。

这张图阐述了线程的运行关系。

为了方便你理解,我把对应的函数实现列在了另外一张图中。

主线程等待多个sub-reactor子线程初始化完

主线程需要等待子线程完成初始化,也就是需要获取子线程对应数据的反馈,而子线程初始化也是对这部分数据进行初始化,实际上这是一个多线程的通知问题。采用的做法在 前面 讲多线程的时候也提到过,使用mutex和condition两个主要武器。

下面这段代码是主线程发起的子线程创建,调用event_loop_thread_init对每个子线程初始化,之后调用event_loop_thread_start来启动子线程。注意,如果应用程序指定的线程池大小为0,则直接返回,这样acceptor和I/O事件都会在同一个主线程里处理,就退化为单reactor模式。

//一定是main thread发起
void thread_pool_start(struct thread_pool *threadPool) {
    assert(!threadPool->started);
    assertInSameThread(threadPool->mainLoop);

    threadPool->started = 1;
    void *tmp;

    if (threadPool->thread_number <= 0) {
        return;
    }

    threadPool->eventLoopThreads = malloc(threadPool->thread_number * sizeof(struct event_loop_thread));
    for (int i = 0; i < threadPool->thread_number; ++i) {
        event_loop_thread_init(&threadPool->eventLoopThreads[i], i);
        event_loop_thread_start(&threadPool->eventLoopThreads[i]);
    }
}

我们再看一下event_loop_thread_start这个方法,这个方法一定是主线程运行的。这里我使用了pthread_create创建了子线程,子线程一旦创建,立即执行event_loop_thread_run,我们稍后将看到,event_loop_thread_run进行了子线程的初始化工作。这个函数最重要的部分是使用了pthread_mutex_lock和pthread_mutex_unlock进行了加锁和解锁,并使用了pthread_cond_wait来守候eventLoopThread中的eventLoop的变量。

//由主线程调用,初始化一个子线程,并且让子线程开始运行event_loop
struct event_loop *event_loop_thread_start(struct event_loop_thread *eventLoopThread) {
    pthread_create(&eventLoopThread->thread_tid, NULL, &event_loop_thread_run, eventLoopThread);

    assert(pthread_mutex_lock(&eventLoopThread->mutex) == 0);

    while (eventLoopThread->eventLoop == NULL) {
        assert(pthread_cond_wait(&eventLoopThread->cond, &eventLoopThread->mutex) == 0);
    }
    assert(pthread_mutex_unlock(&eventLoopThread->mutex) == 0);

    yolanda_msgx("event loop thread started, %s", eventLoopThread->thread_name);
    return eventLoopThread->eventLoop;
}

为什么要这么做呢?看一下子线程的代码你就会大致明白。子线程执行函数event_loop_thread_run一上来也是进行了加锁,之后初始化event_loop对象,当初始化完成之后,调用了pthread_cond_signal函数来通知此时阻塞在pthread_cond_wait上的主线程。这样,主线程就会从wait中苏醒,代码得以往下执行。子线程本身也通过调用event_loop_run进入了一个无限循环的事件分发执行体中,等待子线程reator上注册过的事件发生。

void *event_loop_thread_run(void *arg) {
    struct event_loop_thread *eventLoopThread = (struct event_loop_thread *) arg;

    pthread_mutex_lock(&eventLoopThread->mutex);

    // 初始化化event loop,之后通知主线程
    eventLoopThread->eventLoop = event_loop_init();
    yolanda_msgx("event loop thread init and signal, %s", eventLoopThread->thread_name);
    pthread_cond_signal(&eventLoopThread->cond);

    pthread_mutex_unlock(&eventLoopThread->mutex);

    //子线程event loop run
    eventLoopThread->eventLoop->thread_name = eventLoopThread->thread_name;
    event_loop_run(eventLoopThread->eventLoop);
}

可以看到,这里主线程和子线程共享的变量正是每个event_loop_thread的eventLoop对象,这个对象在初始化的时候为NULL,只有当子线程完成了初始化,才变成一个非NULL的值,这个变化是子线程完成初始化的标志,也是信号量守护的变量。通过使用锁和信号量,解决了主线程和子线程同步的问题。当子线程完成初始化之后,主线程才会继续往下执行。

struct event_loop_thread {
    struct event_loop *eventLoop;
    pthread_t thread_tid;        /* thread ID */
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    char * thread_name;
    long thread_count;    /* # connections handled */
};

你可能会问,主线程是循环在等待每个子线程完成初始化,如果进入第二个循环,等待第二个子线程完成初始化,而此时第二个子线程已经初始化完成了,该怎么办?

注意我们这里一上来是加锁的,只要取得了这把锁,同时发现event_loop_thread的eventLoop对象已经变成非NULL值,可以肯定第二个线程已经初始化,就直接释放锁往下执行了。

你可能还会问,在执行pthread_cond_wait的时候,需要持有那把锁么?这里,父线程在调用pthread_cond_wait函数之后,会立即进入睡眠,并释放持有的那把互斥锁。而当父线程再从pthread_cond_wait返回时(这是子线程通过pthread_cond_signal通知达成的),该线程再次持有那把锁。

增加已连接套接字事件到sub-reactor线程中

前面提到,主线程是一个main reactor线程,这个线程负责检测监听套接字上的事件,当有事件发生时,也就是一个连接已完成建立,如果我们有多个sub-reactor子线程,我们期望的结果是,把这个已连接套接字相关的I/O事件交给sub-reactor子线程负责检测。这样的好处是,main reactor只负责连接套接字的建立,可以一直维持在一个非常高的处理效率,在多核的情况下,多个sub-reactor可以很好地利用上多核处理的优势。

不过,这里有一个令人苦恼的问题。

我们知道,sub-reactor线程是一个无限循环的event loop执行体,在没有已注册事件发生的情况下,这个线程阻塞在event_dispatcher的dispatch上。你可以简单地认为阻塞在poll调用或者epoll_wait上,这种情况下,主线程如何能把已连接套接字交给sub-reactor子线程呢?

当然有办法。

如果我们能让sub-reactor线程从event_dispatcher的dispatch上返回,再让sub-reactor线程返回之后能够把新的已连接套接字事件注册上,这件事情就算完成了。

那如何让sub-reactor线程从event_dispatcher的dispatch上返回呢?答案是构建一个类似管道一样的描述字,让event_dispatcher注册该管道描述字,当我们想让sub-reactor线程苏醒时,往管道上发送一个字符就可以了。

在event_loop_init函数里,调用了socketpair函数创建了套接字对,这个套接字对的作用就是我刚刚说过的,往这个套接字的一端写时,另外一端就可以感知到读的事件。其实,这里也可以直接使用UNIX上的pipe管道,作用是一样的。

struct event_loop *event_loop_init() {
    ...
    //add the socketfd to event 这里创建的是套接字对,目的是为了唤醒子线程
    eventLoop->owner_thread_id = pthread_self();
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, eventLoop->socketPair) < 0) {
        LOG_ERR("socketpair set fialed");
    }
    eventLoop->is_handle_pending = 0;
    eventLoop->pending_head = NULL;
    eventLoop->pending_tail = NULL;
    eventLoop->thread_name = "main thread";

    struct channel *channel = channel_new(eventLoop->socketPair[1], EVENT_READ, handleWakeup, NULL, eventLoop);
    event_loop_add_channel_event(eventLoop, eventLoop->socketPair[1], channel);

    return eventLoop;
}

要特别注意的是这句代码,这告诉event_loop的,是注册了socketPair[1]描述字上的READ事件,如果有READ事件发生,就调用handleWakeup函数来完成事件处理。

struct channel *channel = channel_new(eventLoop->socketPair[1], EVENT_READ, handleWakeup, NULL, eventLoop);

我们来看看这个handleWakeup函数:

事实上,这个函数就是简单的从socketPair[1]描述字上读取了一个字符而已,除此之外,它什么也没干。它的主要作用就是让子线程从dispatch的阻塞中苏醒。

int handleWakeup(void * data) {
    struct event_loop *eventLoop = (struct event_loop *) data;
    char one;
    ssize_t n = read(eventLoop->socketPair[1], &one, sizeof one);
    if (n != sizeof one) {
        LOG_ERR("handleWakeup  failed");
    }
    yolanda_msgx("wakeup, %s", eventLoop->thread_name);
}

现在,我们再回过头看看,如果有新的连接产生,主线程是怎么操作的?在handle_connection_established中,通过accept调用获取了已连接套接字,将其设置为非阻塞套接字(切记),接下来调用thread_pool_get_loop获取一个event_loop。thread_pool_get_loop的逻辑非常简单,从thread_pool线程池中按照顺序挑选出一个线程来服务。接下来是创建了tcp_connection对象。

//处理连接已建立的回调函数
int handle_connection_established(void *data) {
    struct TCPserver *tcpServer = (struct TCPserver *) data;
    struct acceptor *acceptor = tcpServer->acceptor;
    int listenfd = acceptor->listen_fd;

    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    //获取这个已建立的套集字,设置为非阻塞套集字
    int connected_fd = accept(listenfd, (struct sockaddr *) &client_addr, &client_len);
    make_nonblocking(connected_fd);

    yolanda_msgx("new connection established, socket == %d", connected_fd);

    //从线程池里选择一个eventloop来服务这个新的连接套接字
    struct event_loop *eventLoop = thread_pool_get_loop(tcpServer->threadPool);

    // 为这个新建立套接字创建一个tcp_connection对象,并把应用程序的callback函数设置给这个tcp_connection对象
    struct tcp_connection *tcpConnection = tcp_connection_new(connected_fd, eventLoop,tcpServer->connectionCompletedCallBack,tcpServer->connectionClosedCallBack,tcpServer->messageCallBack,tcpServer->writeCompletedCallBack);
    //callback内部使用
    if (tcpServer->data != NULL) {
        tcpConnection->data = tcpServer->data;
    }
    return 0;
}

在调用tcp_connection_new创建tcp_connection对象的代码里,可以看到先是创建了一个channel对象,并注册了READ事件,之后调用event_loop_add_channel_event方法往子线程中增加channel对象。

tcp_connection_new(int connected_fd, struct event_loop *eventLoop,
                   connection_completed_call_back connectionCompletedCallBack,
                   connection_closed_call_back connectionClosedCallBack,
                   message_call_back messageCallBack, write_completed_call_back writeCompletedCallBack) {
    ...
    //为新的连接对象创建可读事件
    struct channel *channel1 = channel_new(connected_fd, EVENT_READ, handle_read, handle_write, tcpConnection);
    tcpConnection->channel = channel1;

    //完成对connectionCompleted的函数回调
    if (tcpConnection->connectionCompletedCallBack != NULL) {
        tcpConnection->connectionCompletedCallBack(tcpConnection);
    }

    //把该套集字对应的channel对象注册到event_loop事件分发器上
    event_loop_add_channel_event(tcpConnection->eventLoop, connected_fd, tcpConnection->channel);
    return tcpConnection;
}

请注意,到现在为止的操作都是在主线程里执行的。下面的event_loop_do_channel_event也不例外,接下来的行为我期望你是熟悉的,那就是加解锁。

如果能够获取锁,主线程就会调用event_loop_channel_buffer_nolock往子线程的数据中增加需要处理的channel event对象。所有增加的channel对象以列表的形式维护在子线程的数据结构中。

接下来的部分是重点,如果当前增加channel event的不是当前event loop线程自己,就会调用event_loop_wakeup函数把event_loop子线程唤醒。唤醒的方法很简单,就是往刚刚的socketPair[0]上写一个字节,别忘了,event_loop已经注册了socketPair[1]的可读事件。如果当前增加channel event的是当前event loop线程自己,则直接调用event_loop_handle_pending_channel处理新增加的channel event事件列表。

int event_loop_do_channel_event(struct event_loop *eventLoop, int fd, struct channel *channel1, int type) {
    //get the lock
    pthread_mutex_lock(&eventLoop->mutex);
    assert(eventLoop->is_handle_pending == 0);
    //往该线程的channel列表里增加新的channel
    event_loop_channel_buffer_nolock(eventLoop, fd, channel1, type);
    //release the lock
    pthread_mutex_unlock(&eventLoop->mutex);
    //如果是主线程发起操作,则调用event_loop_wakeup唤醒子线程
    if (!isInSameThread(eventLoop)) {
        event_loop_wakeup(eventLoop);
    } else {
        //如果是子线程自己,则直接可以操作
        event_loop_handle_pending_channel(eventLoop);
    }

    return 0;
}

如果是event_loop被唤醒之后,接下来也会执行event_loop_handle_pending_channel函数。你可以看到在循环体内从dispatch退出之后,也调用了event_loop_handle_pending_channel函数。

int event_loop_run(struct event_loop *eventLoop) {
    assert(eventLoop != NULL);

    struct event_dispatcher *dispatcher = eventLoop->eventDispatcher;

    if (eventLoop->owner_thread_id != pthread_self()) {
        exit(1);
    }

    yolanda_msgx("event loop run, %s", eventLoop->thread_name);
    struct timeval timeval;
    timeval.tv_sec = 1;

    while (!eventLoop->quit) {
        //block here to wait I/O event, and get active channels
        dispatcher->dispatch(eventLoop, &timeval);

        //这里处理pending channel,如果是子线程被唤醒,这个部分也会立即执行到
        event_loop_handle_pending_channel(eventLoop);
    }

    yolanda_msgx("event loop end, %s", eventLoop->thread_name);
    return 0;
}

event_loop_handle_pending_channel函数的作用是遍历当前event loop里pending的channel event列表,将它们和event_dispatcher关联起来,从而修改感兴趣的事件集合。

这里有一个点值得注意,因为event loop线程得到活动事件之后,会回调事件处理函数,这样像onMessage等应用程序代码也会在event loop线程执行,如果这里的业务逻辑过于复杂,就会导致event_loop_handle_pending_channel执行的时间偏后,从而影响I/O的检测。所以,将I/O线程和业务逻辑线程隔离,让I/O线程只负责处理I/O交互,让业务线程处理业务,是一个比较常见的做法。

总结

在这一讲里,我们重点讲解了框架中涉及多线程的两个重要问题,第一是主线程如何等待多个子线程完成初始化,第二是如何通知处于事件分发中的子线程有新的事件加入、删除、修改。第一个问题通过使用锁和信号量加以解决;第二个问题通过使用socketpair,并将sockerpair作为channel注册到event loop中来解决。

思考题

和往常一样,给你布置两道思考题:

第一道, 你可以修改一下代码,让sub-reactor默认的线程个数为cpu*2。

第二道,当前选择线程的算法是round-robin的算法,你觉得有没有改进的空间?如果改进的话,你可能会怎么做?

欢迎在评论区写下你的思考,也欢迎把这篇文章分享给你的朋友或者同事,一起交流进步一下。

33 TCP字节流处理和HTTP协议实现

34 | 自己动手写高性能HTTP服务器(三):TCP字节流处理和HTTP协议实现

buffer对象

你肯定在各种语言、各种框架里面看到过不同的buffer对象,buffer,顾名思义,就是一个缓冲区对象,缓存了从套接字接收来的数据以及需要发往套接字的数据。

如果是从套接字接收来的数据,事件处理回调函数在不断地往buffer对象增加数据,同时,应用程序需要不断把buffer对象中的数据处理掉,这样,buffer对象才可以空出新的位置容纳更多的数据。

如果是发往套接字的数据,应用程序不断地往buffer对象增加数据,同时,事件处理回调函数不断调用套接字上的发送函数将数据发送出去,减少buffer对象中的写入数据。

可见,buffer对象是同时可以作为输入缓冲(input buffer)和输出缓冲(output buffer)两个方向使用的,只不过,在两种情形下,写入和读出的对象是有区别的。

这张图描述了buffer对象的设计。

下面是buffer对象的数据结构。

//数据缓冲区
struct buffer {
    char *data;          //实际缓冲
    int readIndex;       //缓冲读取位置
    int writeIndex;      //缓冲写入位置
    int total_size;      //总大小
};

buffer对象中的writeIndex标识了当前可以写入的位置;readIndex标识了当前可以读出的数据位置,图中红色部分从readIndex到writeIndex的区域是需要读出数据的部分,而绿色部分从writeIndex到缓存的最尾端则是可以写出的部分。

随着时间的推移,当readIndex和writeIndex越来越靠近缓冲的尾端时,前面部分的front_space_size区域变得会很大,而这个区域的数据已经是旧数据,在这个时候,就需要调整一下整个buffer对象的结构,把红色部分往左侧移动,与此同时,绿色部分也会往左侧移动,整个缓冲区的可写部分就会变多了。

make_room函数就是起这个作用的,如果右边绿色的连续空间不足以容纳新的数据,而最左边灰色部分加上右边绿色部分一起可以容纳下新数据,就会触发这样的移动拷贝,最终红色部分占据了最左边,绿色部分占据了右边,右边绿色的部分成为一个连续的可写入空间,就可以容纳下新的数据。下面的一张图解释了这个过程。

下面是make_room的具体实现。

void make_room(struct buffer *buffer, int size) {
    if (buffer_writeable_size(buffer) >= size) {
        return;
    }
    //如果front_spare和writeable的大小加起来可以容纳数据,则把可读数据往前面拷贝
    if (buffer_front_spare_size(buffer) + buffer_writeable_size(buffer) >= size) {
        int readable = buffer_readable_size(buffer);
        int i;
        for (i = 0; i < readable; i++) {
            memcpy(buffer->data + i, buffer->data + buffer->readIndex + i, 1);
        }
        buffer->readIndex = 0;
        buffer->writeIndex = readable;
    } else {
        //扩大缓冲区
        void *tmp = realloc(buffer->data, buffer->total_size + size);
        if (tmp == NULL) {
            return;
        }
        buffer->data = tmp;
        buffer->total_size += size;
    }
}

当然,如果红色部分占据过大,可写部分不够,会触发缓冲区的扩大操作。这里我通过调用realloc函数来完成缓冲区的扩容。

下面这张图对此做了解释。

套接字接收数据处理

套接字接收数据是在tcp_connection.c中的handle_read来完成的。在这个函数里,通过调用buffer_socket_read函数接收来自套接字的数据流,并将其缓冲到buffer对象中。之后你可以看到,我们将buffer对象和tcp_connection对象传递给应用程序真正的处理函数messageCallBack来进行报文的解析工作。这部分的样例在HTTP报文解析中会展开。

int handle_read(void *data) {
    struct tcp_connection *tcpConnection = (struct tcp_connection *) data;
    struct buffer *input_buffer = tcpConnection->input_buffer;
    struct channel *channel = tcpConnection->channel;

    if (buffer_socket_read(input_buffer, channel->fd) > 0) {
        //应用程序真正读取Buffer里的数据
        if (tcpConnection->messageCallBack != NULL) {
            tcpConnection->messageCallBack(input_buffer, tcpConnection);
        }
    } else {
        handle_connection_closed(tcpConnection);
    }
}

在buffer_socket_read函数里,调用readv往两个缓冲区写入数据,一个是buffer对象,另外一个是这里的additional_buffer,之所以这样做,是担心buffer对象没办法容纳下来自套接字的数据流,而且也没有办法触发buffer对象的扩容操作。通过使用额外的缓冲,一旦判断出从套接字读取的数据超过了buffer对象里的实际最大可写大小,就可以触发buffer对象的扩容操作,这里buffer_append函数会调用前面介绍的make_room函数,完成buffer对象的扩容。

int buffer_socket_read(struct buffer *buffer, int fd) {
    char additional_buffer[INIT_BUFFER_SIZE];
    struct iovec vec[2];
    int max_writable = buffer_writeable_size(buffer);
    vec[0].iov_base = buffer->data + buffer->writeIndex;
    vec[0].iov_len = max_writable;
    vec[1].iov_base = additional_buffer;
    vec[1].iov_len = sizeof(additional_buffer);
    int result = readv(fd, vec, 2);
    if (result < 0) {
        return -1;
    } else if (result <= max_writable) {
        buffer->writeIndex += result;
    } else {
        buffer->writeIndex = buffer->total_size;
        buffer_append(buffer, additional_buffer, result - max_writable);
    }
    return result;
}

套接字发送数据处理

当应用程序需要往套接字发送数据时,即完成了read-decode-compute-encode过程后,通过往buffer对象里写入encode以后的数据,调用tcp_connection_send_buffer,将buffer里的数据通过套接字缓冲区发送出去。

int tcp_connection_send_buffer(struct tcp_connection *tcpConnection, struct buffer *buffer) {
    int size = buffer_readable_size(buffer);
    int result = tcp_connection_send_data(tcpConnection, buffer->data + buffer->readIndex, size);
    buffer->readIndex += size;
    return result;
}

如果发现当前channel没有注册WRITE事件,并且当前tcp_connection对应的发送缓冲无数据需要发送,就直接调用write函数将数据发送出去。如果这一次发送不完,就将剩余需要发送的数据拷贝到当前tcp_connection对应的发送缓冲区中,并向event_loop注册WRITE事件。这样数据就由框架接管,应用程序释放这部分数据。

//应用层调用入口
int tcp_connection_send_data(struct tcp_connection *tcpConnection, void *data, int size) {
    size_t nwrited = 0;
    size_t nleft = size;
    int fault = 0;

    struct channel *channel = tcpConnection->channel;
    struct buffer *output_buffer = tcpConnection->output_buffer;

    //先往套接字尝试发送数据
    if (!channel_write_event_registered(channel) && buffer_readable_size(output_buffer) == 0) {
        nwrited = write(channel->fd, data, size);
        if (nwrited >= 0) {
            nleft = nleft - nwrited;
        } else {
            nwrited = 0;
            if (errno != EWOULDBLOCK) {
                if (errno == EPIPE || errno == ECONNRESET) {
                    fault = 1;
                }
            }
        }
    }

    if (!fault && nleft > 0) {
        //拷贝到Buffer中,Buffer的数据由框架接管
        buffer_append(output_buffer, data + nwrited, nleft);
        if (!channel_write_event_registered(channel)) {
            channel_write_event_add(channel);
        }
    }

    return nwrited;
}

HTTP协议实现

下面,我们在TCP的基础上,加入HTTP的功能。

为此,我们首先定义了一个http_server结构,这个http_server本质上就是一个TCPServer,只不过暴露给应用程序的回调函数更为简单,只需要看到http_request和http_response结构。

typedef int (*request_callback)(struct http_request *httpRequest, struct http_response *httpResponse);

struct http_server {
    struct TCPserver *tcpServer;
    request_callback requestCallback;
};

在http_server里面,重点是需要完成报文的解析,将解析的报文转化为http_request对象,这件事情是通过http_onMessage回调函数来完成的。在http_onMessage函数里,调用的是parse_http_request完成报文解析。

// buffer是框架构建好的,并且已经收到部分数据的情况下
// 注意这里可能没有收到全部数据,所以要处理数据不够的情形
int http_onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
    yolanda_msgx("get message from tcp connection %s", tcpConnection->name);

    struct http_request *httpRequest = (struct http_request *) tcpConnection->request;
    struct http_server *httpServer = (struct http_server *) tcpConnection->data;

    if (parse_http_request(input, httpRequest) == 0) {
        char *error_response = "HTTP/1.1 400 Bad Request\r\n\r\n";
        tcp_connection_send_data(tcpConnection, error_response, sizeof(error_response));
        tcp_connection_shutdown(tcpConnection);
    }

    //处理完了所有的request数据,接下来进行编码和发送
    if (http_request_current_state(httpRequest) == REQUEST_DONE) {
        struct http_response *httpResponse = http_response_new();

        //httpServer暴露的requestCallback回调
        if (httpServer->requestCallback != NULL) {
            httpServer->requestCallback(httpRequest, httpResponse);
        }

        //将httpResponse发送到套接字发送缓冲区中
        struct buffer *buffer = buffer_new();
        http_response_encode_buffer(httpResponse, buffer);
        tcp_connection_send_buffer(tcpConnection, buffer);

        if (http_request_close_connection(httpRequest)) {
            tcp_connection_shutdown(tcpConnection);
            http_request_reset(httpRequest);
        }
    }
}

还记得 第16讲中 讲到的HTTP协议吗?我们从16讲得知,HTTP通过设置回车符、换行符作为HTTP报文协议的边界。

parse_http_request的思路就是寻找报文的边界,同时记录下当前解析工作所处的状态。根据解析工作的前后顺序,把报文解析的工作分成REQUEST_STATUS、REQUEST_HEADERS、REQUEST_BODY和REQUEST_DONE四个阶段,每个阶段解析的方法各有不同。

在解析状态行时,先通过定位CRLF回车换行符的位置来圈定状态行,进入状态行解析时,再次通过查找空格字符来作为分隔边界。

在解析头部设置时,也是先通过定位CRLF回车换行符的位置来圈定一组key-value对,再通过查找冒号字符来作为分隔边界。

最后,如果没有找到冒号字符,说明解析头部的工作完成。

parse_http_request函数完成了HTTP报文解析的四个阶段:

int parse_http_request(struct buffer *input, struct http_request *httpRequest) {
    int ok = 1;
    while (httpRequest->current_state != REQUEST_DONE) {
        if (httpRequest->current_state == REQUEST_STATUS) {
            char *crlf = buffer_find_CRLF(input);
            if (crlf) {
                int request_line_size = process_status_line(input->data + input->readIndex, crlf, httpRequest);
                if (request_line_size) {
                    input->readIndex += request_line_size;  // request line size
                    input->readIndex += 2;  //CRLF size
                    httpRequest->current_state = REQUEST_HEADERS;
                }
            }
        } else if (httpRequest->current_state == REQUEST_HEADERS) {
            char *crlf = buffer_find_CRLF(input);
            if (crlf) {
                /**
                 *    <start>-------<colon>:-------<crlf>
                 */
                char *start = input->data + input->readIndex;
                int request_line_size = crlf - start;
                char *colon = memmem(start, request_line_size, ": ", 2);
                if (colon != NULL) {
                    char *key = malloc(colon - start + 1);
                    strncpy(key, start, colon - start);
                    key[colon - start] = '\0';
                    char *value = malloc(crlf - colon - 2 + 1);
                    strncpy(value, colon + 1, crlf - colon - 2);
                    value[crlf - colon - 2] = '\0';

                    http_request_add_header(httpRequest, key, value);

                    input->readIndex += request_line_size;  //request line size
                    input->readIndex += 2;  //CRLF size
                } else {
                    //读到这里说明:没找到,就说明这个是最后一行
                    input->readIndex += 2;  //CRLF size
                    httpRequest->current_state = REQUEST_DONE;
                }
            }
        }
    }
    return ok;
}

处理完了所有的request数据,接下来进行编码和发送的工作。为此,创建了一个http_response对象,并调用了应用程序提供的编码函数requestCallback,接下来,创建了一个buffer对象,函数http_response_encode_buffer用来将http_response中的数据,根据HTTP协议转换为对应的字节流。

可以看到,http_response_encode_buffer设置了如Content-Length等http_response头部,以及http_response的body部分数据。

void http_response_encode_buffer(struct http_response *httpResponse, struct buffer *output) {
    char buf[32];
    snprintf(buf, sizeof buf, "HTTP/1.1 %d ", httpResponse->statusCode);
    buffer_append_string(output, buf);
    buffer_append_string(output, httpResponse->statusMessage);
    buffer_append_string(output, "\r\n");

    if (httpResponse->keep_connected) {
        buffer_append_string(output, "Connection: close\r\n");
    } else {
        snprintf(buf, sizeof buf, "Content-Length: %zd\r\n", strlen(httpResponse->body));
        buffer_append_string(output, buf);
        buffer_append_string(output, "Connection: Keep-Alive\r\n");
    }

    if (httpResponse->response_headers != NULL && httpResponse->response_headers_number > 0) {
        for (int i = 0; i < httpResponse->response_headers_number; i++) {
            buffer_append_string(output, httpResponse->response_headers[i].key);
            buffer_append_string(output, ": ");
            buffer_append_string(output, httpResponse->response_headers[i].value);
            buffer_append_string(output, "\r\n");
        }
    }

    buffer_append_string(output, "\r\n");
    buffer_append_string(output, httpResponse->body);
}

完整的HTTP服务器例子

现在,编写一个HTTP服务器例子就变得非常简单。

在这个例子中,最主要的部分是onRequest callback函数,这里,onRequest方法已经在parse_http_request之后,可以根据不同的http_request的信息,进行计算和处理。例子程序里的逻辑非常简单,根据http request的URL path,返回了不同的http_response类型。比如,当请求为根目录时,返回的是200和HTML格式。

#include <lib/acceptor.h>
#include <lib/http_server.h>
#include "lib/common.h"
#include "lib/event_loop.h"

//数据读到buffer之后的callback
int onRequest(struct http_request *httpRequest, struct http_response *httpResponse) {
    char *url = httpRequest->url;
    char *question = memmem(url, strlen(url), "?", 1);
    char *path = NULL;
    if (question != NULL) {
        path = malloc(question - url);
        strncpy(path, url, question - url);
    } else {
        path = malloc(strlen(url));
        strncpy(path, url, strlen(url));
    }

    if (strcmp(path, "/") == 0) {
        httpResponse->statusCode = OK;
        httpResponse->statusMessage = "OK";
        httpResponse->contentType = "text/html";
        httpResponse->body = "<html><head><title>This is network programming</title></head><body><h1>Hello, network programming</h1></body></html>";
    } else if (strcmp(path, "/network") == 0) {
        httpResponse->statusCode = OK;
        httpResponse->statusMessage = "OK";
        httpResponse->contentType = "text/plain";
        httpResponse->body = "hello, network programming";
    } else {
        httpResponse->statusCode = NotFound;
        httpResponse->statusMessage = "Not Found";
        httpResponse->keep_connected = 1;
    }

    return 0;
}

int main(int c, char **v) {
    //主线程event_loop
    struct event_loop *eventLoop = event_loop_init();

    //初始tcp_server,可以指定线程数目,如果线程是0,就是在这个线程里acceptor+i/o;如果是1,有一个I/O线程
    //tcp_server自己带一个event_loop
    struct http_server *httpServer = http_server_new(eventLoop, SERV_PORT, onRequest, 2);
    http_server_start(httpServer);

    // main thread for acceptor
    event_loop_run(eventLoop);
}

运行这个程序之后,我们可以通过浏览器和curl命令来访问它。你可以同时开启多个浏览器和curl命令,这也证明了我们的程序是可以满足高并发需求的。

$curl -v http://127.0.0.1:43211/
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 43211 (#0)
> GET / HTTP/1.1
> Host: 127.0.0.1:43211
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Length: 116
< Connection: Keep-Alive
<
* Connection #0 to host 127.0.0.1 left intact
<html><head><title>This is network programming</title></head><body><h1>Hello, network programming</h1></body></html>%

总结

这一讲我们主要讲述了整个编程框架的字节流处理能力,引入了buffer对象,并在此基础上通过增加HTTP的特性,包括http_server、http_request、http_response,完成了HTTP高性能服务器的编写。实例程序利用框架提供的能力,编写了一个简单的HTTP服务器程序。

34 编写高性能网络编程框架时,都需要注意哪些问题?

35 | 答疑:编写高性能网络编程框架时,都需要注意哪些问题?

为什么在发送数据时,会先尝试通过socket直接发送,再由框架接管呢?

这个问题具体描述是下面这样的。

当应用程序需要发送数据时,比如下面这段,在完成数据读取和回应的编码之后,会调用tcp_connection_send_buffer方法发送数据。

//数据读到buffer之后的callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
    printf("get message from tcp connection %s\n", tcpConnection->name);
    printf("%s", input->data);

    struct buffer *output = buffer_new();
    int size = buffer_readable_size(input);
    for (int i = 0; i < size; i++) {
        buffer_append_char(output, rot13_char(buffer_read_char(input)));
    }
    tcp_connection_send_buffer(tcpConnection, output);
    return 0;
}

而tcp_connection_send_buffer方法则会调用tcp_connection_send_data来发送数据:

int tcp_connection_send_buffer(struct tcp_connection *tcpConnection, struct buffer *buffer) {
    int size = buffer_readable_size(buffer);
    int result = tcp_connection_send_data(tcpConnection, buffer->data + buffer->readIndex, size);
    buffer->readIndex += size;
    return result;
}

在tcp_connection_send_data中,如果发现当前 channel 没有注册 WRITE 事件,并且当前 tcp_connection 对应的发送缓冲无数据需要发送,就直接调用 write 函数将数据发送出去。

//应用层调用入口
int tcp_connection_send_data(struct tcp_connection *tcpConnection, void *data, int size) {
    size_t nwrited = 0;
    size_t nleft = size;
    int fault = 0;

    struct channel *channel = tcpConnection->channel;
    struct buffer *output_buffer = tcpConnection->output_buffer;

    //先往套接字尝试发送数据
    if (!channel_write_event_is_enabled(channel) && buffer_readable_size(output_buffer) == 0) {
        nwrited = write(channel->fd, data, size);
        if (nwrited >= 0) {
            nleft = nleft - nwrited;
        } else {
            nwrited = 0;
            if (errno != EWOULDBLOCK) {
                if (errno == EPIPE || errno == ECONNRESET) {
                    fault = 1;
                }
            }
        }
    }

    if (!fault && nleft > 0) {
        //拷贝到Buffer中,Buffer的数据由框架接管
        buffer_append(output_buffer, data + nwrited, nleft);
        if (!channel_write_event_is_enabled(channel)) {
            channel_write_event_enable(channel);
        }
    }

    return nwrited;
}

为啥不能做成无论有没有 WRITE 事件都统一往发送缓冲区写,再把WRITE 事件注册到event_loop中呢?

如果用一句话来总结的话,这是为了发送效率。

我们来分析一下,应用层读取数据,进行编码,之后的这个buffer对象是应用层创建的,数据也在应用层这个buffer对象上。你可以理解,tcp_connection_send_data里面的data数据其实是应用层缓冲的,而不是我们tcp_connection这个对象里面的buffer。

如果我们跳过直接往套接字发送这一段,而是把数据交给我们的tcp_connection对应的output_buffer,这里有一个数据拷贝的过程,它发生在buffer_append里面。

int buffer_append(struct buffer *buffer, void *data, int size) {
    if (data != NULL) {
        make_room(buffer, size);
        //拷贝数据到可写空间中
        memcpy(buffer->data + buffer->writeIndex, data, size);
        buffer->writeIndex += size;
    }
}

但是,如果增加了一段判断来直接往套接字发送,其实就跳过了这段拷贝,直接把数据发往到了套接字发生缓冲区。

//先往套接字尝试发送数据
if (!channel_write_event_is_enabled(channel) && buffer_readable_size(output_buffer) == 0) {
        nwrited = write(channel->fd, data, size)
        ...

在绝大部分场景下,这种处理方式已经满足数据发送的需要了,不再需要把数据拷贝到tcp_connection对象中的output_buffer中。

如果不满足直接往套接字发送的条件,比如已经注册了回调事件,或者output_buffer里面有数据需要发送,那么就把数据拷贝到output_buffer中,让event_loop的回调不断地驱动handle_write将数据从output_buffer发往套接字缓冲区中。

//发送缓冲区可以往外写
//把channel对应的output_buffer不断往外发送
int handle_write(void *data) {
    struct tcp_connection *tcpConnection = (struct tcp_connection *) data;
    struct event_loop *eventLoop = tcpConnection->eventLoop;
    assertInSameThread(eventLoop);

    struct buffer *output_buffer = tcpConnection->output_buffer;
    struct channel *channel = tcpConnection->channel;

    ssize_t nwrited = write(channel->fd, output_buffer->data + output_buffer->readIndex,buffer_readable_size(output_buffer));
    if (nwrited > 0) {
        //已读nwrited字节
        output_buffer->readIndex += nwrited;
        //如果数据完全发送出去,就不需要继续了
        if (buffer_readable_size(output_buffer) == 0) {
            channel_write_event_disable(channel);
        }
        //回调writeCompletedCallBack
        if (tcpConnection->writeCompletedCallBack != NULL) {
            tcpConnection->writeCompletedCallBack(tcpConnection);
        }
    } else {
        yolanda_msgx("handle_write for tcp connection %s", tcpConnection->name);
    }

}

你可以这样想象,在一个非常高效的处理条件下,你需要发送什么,都直接发送给了套接字缓冲区;而当网络条件变差,处理效率变慢,或者待发送的数据极大,一次发送不可能完成的时候,这部分数据被框架缓冲到tcp_connection的发送缓冲区对象output_buffer中,由事件分发机制来负责把这部分数据发送给套接字缓冲区。

关于回调函数的设计

在epoll-server-multithreads.c里面定义了很多回调函数,比如onMessage, onConnectionCompleted等,这些回调函数被用于创建一个TCPServer,但是在tcp_connection对照中,又实现了handle_read handle_write 等事件的回调,似乎有两层回调,为什么要这样封装两层回调呢?

这里如果说回调函数,确实有两个不同层次的回调函数。

第一个层次是框架定义的,对连接的生命周期管理的回调。包括连接建立完成后的回调、报文读取并接收到output缓冲区之后的回调、报文发送到套接字缓冲区之后的回调,以及连接关闭时的回调。分别是connectionCompletedCallBack、messageCallBack、writeCompletedCallBack,以及connectionClosedCallBack。

struct tcp_connection {
    struct event_loop *eventLoop;
    struct channel *channel;
    char *name;
    struct buffer *input_buffer;   //接收缓冲区
    struct buffer *output_buffer;  //发送缓冲区

    connection_completed_call_back connectionCompletedCallBack;
    message_call_back messageCallBack;
    write_completed_call_back writeCompletedCallBack;
    connection_closed_call_back connectionClosedCallBack;

    void * data; //for callback use: http_server
    void * request; // for callback use
    void * response; // for callback use
};

为什么要定义这四个回调函数呢?

因为框架需要提供给应用程序和框架的编程接口,我把它总结为编程连接点,或者叫做program-hook-point。就像是设计了一个抽象类,这个抽象类代表了框架给你提供的一个编程入口,你可以继承这个抽象类,完成一些方法的填充,这些方法和框架类一起工作,就可以表现出一定符合逻辑的行为。

比如我们定义一个抽象类People,这个类的其他属性,包括它的创建和管理都可以交给框架来完成,但是你需要完成两个函数,一个是on_sad,这个人悲伤的时候干什么;另一个是on_happy,这个人高兴的时候干什么。

abstract class People{
  void on_sad();

  void on_happy();
}

这样,我们可以试着把tcp_connection改成这样:

abstract class TCP_connection{
  void on_connection_completed();

  void on_message();

  void on_write_completed();

  void on_connectin_closed();
}

这个层次的回调,更像是一层框架和应用程序约定的接口,接口实现由应用程序来完成,框架负责在合适的时候调用这些预定义好的接口,回调的意思体现在“框架会调用预定好的接口实现”。

比如,当连接建立成功,一个新的connection创建出来,connectionCompletedCallBack函数会被回调:

struct tcp_connection *
tcp_connection_new(int connected_fd, struct event_loop *eventLoop,
connection_completed_call_back connectionCompletedCallBack,
connection_closed_call_back connectionClosedCallBack,
message_call_back messageCallBack,
write_completed_call_back writeCompletedCallBack) {
    ...
    // add event read for the new connection
    struct channel *channel1 = channel_new(connected_fd, EVENT_READ, handle_read, handle_write, tcpConnection);
    tcpConnection->channel = channel1;

    //connectionCompletedCallBack callback
    if (tcpConnection->connectionCompletedCallBack != NULL) {
        tcpConnection->connectionCompletedCallBack(tcpConnection);
    }

   ...
}

第二个层次的回调,是基于epoll、poll事件分发机制的回调。通过注册一定的读、写事件,在实际事件发生时,由事件分发机制保证对应的事件回调函数被及时调用,完成基于事件机制的网络I/O处理。

在每个连接建立之后,创建一个对应的channel对象,并为这个channel对象赋予了读、写回调函数:

// add event read for the new connection
struct channel *channel1 = channel_new(connected_fd, EVENT_READ, handle_read, handle_write, tcpConnection);

handle_read函数,对应用程序屏蔽了套接字的读操作,把数据缓冲到tcp_connection的input_buffer中,而且,它还起到了编程连接点和框架的耦合器的作用,这里分别调用了messageCallBack和connectionClosedCallBack函数,完成了应用程序编写部分代码在框架的“代入”。

int handle_read(void *data) {
    struct tcp_connection *tcpConnection = (struct tcp_connection *) data;
    struct buffer *input_buffer = tcpConnection->input_buffer;
    struct channel *channel = tcpConnection->channel;

    if (buffer_socket_read(input_buffer, channel->fd) > 0) {
        //应用程序真正读取Buffer里的数据
        if (tcpConnection->messageCallBack != NULL) {
            tcpConnection->messageCallBack(input_buffer, tcpConnection);
        }
    } else {
        handle_connection_closed(tcpConnection);
    }
}

handle_write函数则负责把tcp_connection对象里的output_buffer源源不断地送往套接字发送缓冲区。

//发送缓冲区可以往外写
//把channel对应的output_buffer不断往外发送
int handle_write(void *data) {
    struct tcp_connection *tcpConnection = (struct tcp_connection *) data;
    struct event_loop *eventLoop = tcpConnection->eventLoop;
    assertInSameThread(eventLoop);

    struct buffer *output_buffer = tcpConnection->output_buffer;
    struct channel *channel = tcpConnection->channel;

    ssize_t nwrited = write(channel->fd, output_buffer->data + output_buffer->readIndex,buffer_readable_size(output_buffer));
    if (nwrited > 0) {
        //已读nwrited字节
        output_buffer->readIndex += nwrited;
        //如果数据完全发送出去,就不需要继续了
        if (buffer_readable_size(output_buffer) == 0) {
            channel_write_event_disable(channel);
        }
        //回调writeCompletedCallBack
        if (tcpConnection->writeCompletedCallBack != NULL) {
            tcpConnection->writeCompletedCallBack(tcpConnection);
        }
    } else {
        yolanda_msgx("handle_write for tcp connection %s", tcpConnection->name);
    }

}

tcp_connection对象设计的想法是什么,和channel有什么联系和区别?

tcp_connection对象似乎和channel对象有着非常紧密的联系,为什么要单独设计一个tcp_connection呢?

我也提到了,开始的时候我并不打算设计一个tcp_connection对象的,后来我才发现非常有必要存在一个tcp_connection对象。

第一,我需要在暴露给应用程序的onMessage,onConnectionCompleted等回调函数里,传递一个有用的数据结构,这个数据结构必须有一定的现实语义,可以携带一定的信息,比如套接字、缓冲区等,而channel对象过于单薄,和连接的语义相去甚远。

第二,这个channel对象是抽象的,比如acceptor,比如socketpair等,它们都是一个channel,只要能引起事件的发生和传递,都是一个channel,基于这一点,我也觉得最好把chanel作为一个内部实现的细节,不要通过回调函数暴露给应用程序。

第三,在后面实现HTTP的过程中,我发现需要在上下文中保存http_request和http_response数据,而这个部分数据放在channel中是非常不合适的,所以才有了最后的tcp_connection对象。

struct tcp_connection {
    struct event_loop *eventLoop;
    struct channel *channel;
    char *name;
    struct buffer *input_buffer;   //接收缓冲区
    struct buffer *output_buffer;  //发送缓冲区

    connection_completed_call_back connectionCompletedCallBack;
    message_call_back messageCallBack;
    write_completed_call_back writeCompletedCallBack;
    connection_closed_call_back connectionClosedCallBack;

    void * data; //for callback use: http_server
    void * request; // for callback use
    void * response; // for callback use
};

简单总结下来就是,每个tcp_connection对象一定包含了一个channel对象,而channel对象未必是一个tcp_connection对象。

主线程等待子线程完成的同步锁问题

有人在加锁这里有个疑问,如果加锁的目的是让主线程等待子线程初始化event_loop,那不加锁不是也可以达到这个目的吗?主线程while循环里面不断判断子线程的event_loop是否不为null不就可以了?为什么一定要加一把锁呢?

//由主线程调用,初始化一个子线程,并且让子线程开始运行event_loop
struct event_loop *event_loop_thread_start(struct event_loop_thread *eventLoopThread) {
    pthread_create(&eventLoopThread->thread_tid, NULL, &event_loop_thread_run, eventLoopThread);

    assert(pthread_mutex_lock(&eventLoopThread->mutex) == 0);

    while (eventLoopThread->eventLoop == NULL) {
        assert(pthread_cond_wait(&eventLoopThread->cond, &eventLoopThread->mutex) == 0);
    }
    assert(pthread_mutex_unlock(&eventLoopThread->mutex) == 0);

    yolanda_msgx("event loop thread started, %s", eventLoopThread->thread_name);
    return eventLoopThread->eventLoop;
}

要回答这个问题,就要解释多线程下共享变量竞争的问题。我们知道,一个共享变量在多个线程下同时作用,如果没有锁的控制,就会引起变量的不同步。这里的共享变量就是每个eventLoopThread的eventLoop对象。

这里如果我们不加锁,一直循环判断每个eventLoopThread的状态,会对CPU增加很大的消耗,如果使用锁-信号量的方式来加以解决,就变得很优雅,而且不会对CPU造成过多的影响。

关于channel_map的设计,特别是内存方面的设计。

我们来详细介绍一下channel_map。

channel_map实际上是一个指针数组,这个数组里面的每个元素都是一个指针,指向了创建出的channel对象。我们用数据下标和套接字进行了映射,这样虽然有些元素是浪费了,比如stdin,stdout,stderr代表的套接字0、1和2,但是总体效率是非常高的。

你在这里可以看到图中描绘了channel_map的设计。

而且,我们的channel_map还不会太占用内存,在最开始的时候,整个channel_map的指针数组大小为0,当这个channel_map投入使用时,会根据实际使用的套接字的增长,按照32、64、128这样的速度成倍增长,这样既保证了实际的需求,也不会一下子占用太多的内存。

此外,当指针数组增长时,我们不会销毁原来的部分,而是使用realloc()把旧的内容搬过去,再使用memset() 用来给新申请的内存初始化为0值,这样既高效也节省内存。


文章作者: Merlin
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Merlin !
  目录