Twemproxy(Nutcracker)源码分析-connection的类别和创建

Twemproxy是比较优秀的轻量级的Redis和Memcached代理,是Twitter开源的,主要实现Redis和Memcached的集群化。(Redis3.0自带集群化,但是不适合跨地域集群情况,存在运维一系列问题)。

*                   nc_connection.[ch]
 *                Connection (struct conn)
 *                 +         +          +
 *                 |         |          |
 *                 |       Proxy        |
 *                 |     nc_proxy.[ch]  |
 *                 /                    \
 *              Client                Server
 *           nc_client.[ch]         nc_server.[ch]
 *

上面是nc中存在的三种类型的连接,Client conn、Proxy conn和Server conn。

1 Proxy connection

Proxy connection的作用是Accept Client的连接请求,其初始化在nc_connection.c/conn_get_proxy

conn->proxy = 1;

    conn->recv = proxy_recv;
    conn->recv_next = NULL;
    conn->recv_done = NULL;

    conn->send = NULL;
    conn->send_next = NULL;
    conn->send_done = NULL;

通过_conn_get函数初始化连接后设置了recv钩子函数为proxy_recv,,用于在收到Client的连接请求时的处理。没有send钩子,因为该连接的作用只是Accept连接请求。

2 Client connection

Client connection是在上面proxy_recv函数处理时,在accept连接后通过

c = conn_get(p->owner, true, p->redis);

该函数也会调用_conn_get函数来从空闲连接队列中获取空闲连接否则重新创建连接,并且对conn变量属性初始化和维护有关计数统计变量。

上面函数的原型为:

struct conn *
conn_get(void *owner, bool client, bool redis)

第一个参数是该连接的属主,第二个参数是该连接是否是客户端的连接,否则是服务端连接,第三个参数是标明该连接是redis还是memcached。

根据client参数分设置该conn的函数钩子。

if (conn->client) {
        /*
         * client receives a request, possibly parsing it, and sends a
         * response downstream.
         */
        conn->recv = msg_recv;
        conn->recv_next = req_recv_next;
        conn->recv_done = req_recv_done;

        conn->send = msg_send;
        conn->send_next = rsp_send_next;
        conn->send_done = rsp_send_done;

        conn->close = client_close;
        conn->active = client_active;

        conn->ref = client_ref;
        conn->unref = client_unref;

        conn->enqueue_inq = NULL;
        conn->dequeue_inq = NULL;
        conn->enqueue_outq = req_client_enqueue_omsgq;
        conn->dequeue_outq = req_client_dequeue_omsgq;
        
        ncurr_cconn++;
    } else {
        /*
         * server receives a response, possibly parsing it, and sends a
         * request upstream.
         */
        conn->recv = msg_recv;
        conn->recv_next = rsp_recv_next;
        conn->recv_done = rsp_recv_done;

        conn->send = msg_send;
        conn->send_next = req_send_next;
        conn->send_done = req_send_done;

        conn->close = server_close;
        conn->active = server_active;

        conn->ref = server_ref;
        conn->unref = server_unref;

        conn->enqueue_inq = req_server_enqueue_imsgq;
        conn->dequeue_inq = req_server_dequeue_imsgq;
        conn->enqueue_outq = req_server_enqueue_omsgq;
        conn->dequeue_outq = req_server_dequeue_omsgq;
    }

    conn->ref(conn, owner);
3 Server connection

Server connection是nc与后端redis server的连接,该类连接的创建可以根据配置在系统启动后创建连接池,连接的动态创建或获取是在nc_request.c/req_forword函数中,正常情况下的顺序是将该请求从clinet的in_q队列中移除,加入client的out_q队列,调用server_pool_conn获取连接。精简后代码如下:

static void
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
    rstatus_t status;
    struct conn *s_conn;
    struct server_pool *pool;
    uint8_t *key;
    uint32_t keylen;
    struct keypos *kpos;

    /* enqueue message (request) into client outq, if response is expected */
    if (!msg->noreply) {
        c_conn->enqueue_outq(ctx, c_conn, msg);
    }

    pool = c_conn->owner;
    kpos = array_get(msg->keys, 0);
    key = kpos->start;
    keylen = (uint32_t)(kpos->end - kpos->start);

    s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
    
    s_conn->enqueue_inq(ctx, s_conn, msg);

    req_forward_stats(ctx, s_conn->owner, msg);

}

其中的server_pool_conn函数是根据key和key长度来选择出一个连接。具体分两步:首先根据key和key长度从server pool中选择一个server;然后选择一个该server的连接。精简后代码如下:

struct conn *
server_pool_conn(struct context *ctx, struct server_pool *pool, uint8_t *key,
                 uint32_t keylen)
{
    status = server_pool_update(pool);
    /* from a given {key, keylen} pick a server from pool */
    server = server_pool_server(pool, key, keylen);

    /* pick a connection to a given server */
    conn = server_conn(server);

    status = server_connect(ctx, server, conn);

    return conn;
}

可以看到函数server_pool_server函数完成了根据key和key长度从pool中选择机器的工作,实现细节这里暂时不介绍(其实是我具体细节还没看到:-)),函数server_conn完成了选择一个该机器的的连接,如果该机器现有的连接数少于配置的连接数,会通过调用conn_get(server, false, pool->redis)后返回该新建的连接,否则从该机器的TAILQ连接队列中的选择队头元素(保持LRU特性),并将该连接重新加入队尾,并将该连接返回。

About 智足者富

http://chenpeng.info

发表评论

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>