您现在的位置:爱折腾>> Linux>>正文内容

Linux c 开发 - Memcached源码分析之命令解析(2)

前言
从我们上一章《Linux c 开发 - Memcached源码分析之基于Libevent的网络模型(1)》我们基本了解了Memcached的网络模型。这一章节,我们需要详细解读Memcached的命令解析。
我们回顾上一章发现Memcached会分成主线程和N个工作线程。主线程主要用于监听accpet客户端的Socket连接,而工作线程主要用于接管具体的客户端连接。
主线程和工作线程之间主要通过基于Libevent的pipe的读写事件来监听,当有连接练上来的时候,主线程会将连接交个某一个工作线程去接管,后期客户端和服务端的读写工作都会在这个工作线程中进行。
工作线程也是基于Libevent的事件的,当有读或者写的事件进来的时候,就会触发事件的回调函数。
那么Memcached是如何来解析客户端上传的命令数据报文的呢?这一章我们详细讲解命令的解析过程,下一章会讲解Memcached对客户端的回应。

Memcached的命令解析源码分析
conn数据结构
每一个连接都会有自己的一个conn数据结构。这个结构主要存储每个连接的基本信息。
这一章中用到的几个比较重要的参数:
char * rbuf:用于存储客户端数据报文中的命令。
int rsize:rbuf的大小。
char * rcurr:未解析的命令的字符指针。
int rbytes:为解析的命令的长度。


  1. typedef struct conn conn;    
  2. struct conn {    
  3.     int    sfd;    
  4.     sasl_conn_t *sasl_conn;    
  5.     bool authenticated;    
  6.     enum conn_states  state;    
  7.     enum bin_substates substate;    
  8.     rel_time_t last_cmd_time;    
  9.     struct event event;    
  10.     short  ev_flags;    
  11.     short  which;   /** which events were just triggered */    
  12.     
  13.     char   *rbuf;   /** buffer to read commands into */    
  14.     char   *rcurr;  /** but if we parsed some already, this is where we stopped */    
  15.     int    rsize;   /** total allocated size of rbuf */    
  16.     int    rbytes;  /** how much data, starting from rcur, do we have unparsed */    
  17.     
  18.     char   *wbuf;    
  19.     char   *wcurr;    
  20.     int    wsize;    
  21.     int    wbytes;    
  22.     /** which state to go into after finishing current write */    
  23.     enum conn_states  write_and_go;    
  24.     void   *write_and_free; /** free this memory after finishing writing */    
  25.     
  26.     char   *ritem;  /** when we read in an item's value, it goes here */    
  27.     int    rlbytes;    
  28.     
  29.     /* data for the nread state */    
  30.     
  31.     /** 
  32.      * item is used to hold an item structure created after reading the command   
  33.      * line of set/add/replace commands, but before we finished reading the actual   
  34.      * data. The data is read into ITEM_data(item) to avoid extra copying.   
  35.      */    
  36.     
  37.     void   *item;     /* for commands set/add/replace  */    
  38.     
  39.     /* data for the swallow state */    
  40.     int    sbytes;    /* how many bytes to swallow */    
  41.     
  42.     /* data for the mwrite state */    
  43.     struct iovec *iov;    
  44.     int    iovsize;   /* number of elements allocated in iov[] */    
  45.     int    iovused;   /* number of elements used in iov[] */    
  46.     
  47.     struct msghdr *msglist;    
  48.     int    msgsize;   /* number of elements allocated in msglist[] */    
  49.     int    msgused;   /* number of elements used in msglist[] */    
  50.     int    msgcurr;   /* element in msglist[] being transmitted now */    
  51.     int    msgbytes;  /* number of bytes in current msg */    
  52.     
  53.     item   **ilist;   /* list of items to write out */    
  54.     int    isize;    
  55.     item   **icurr;    
  56.     int    ileft;    
  57.     
  58.     char   **suffixlist;    
  59.     int    suffixsize;    
  60.     char   **suffixcurr;    
  61.     int    suffixleft;    
  62.     
  63.     enum protocol protocol;   /* which protocol this connection speaks */    
  64.     enum network_transport transport; /* what transport is used by this connection */    
  65.     
  66.     /* data for UDP clients */    
  67.     int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */    
  68.     struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */    
  69.     socklen_t request_addr_size;    
  70.     unsigned char *hdrbuf; /* udp packet headers */    
  71.     int    hdrsize;   /* number of headers' worth of space is allocated */    
  72.     
  73.     bool   noreply;   /* True if the reply should not be sent. */    
  74.     /* current stats command */    
  75.     struct {    
  76.         char *buffer;    
  77.         size_t size;    
  78.         size_t offset;    
  79.     } stats;    
  80.     
  81.     /* Binary protocol stuff */    
  82.     /* This is where the binary header goes */    
  83.     protocol_binary_request_header binary_header;    
  84.     uint64_t cas; /* the cas to return */    
  85.     short cmd; /* current command being processed */    
  86.     int opaque;    
  87.     int keylen;    
  88.     conn   *next;     /* Used for generating a list of conn structures */    
  89.     LIBEVENT_THREAD *thread/* Pointer to the thread object serving this connection */    
  90. }; 

整体流程
1. 当客户端和Memcached建立TCP连接后,Memcached会基于Libevent的event事件来监听客户端是否有可以读取的数据。
2. 当客户端有命令数据报文上报的时候,就会触发drive_machine方法中的conn_read这个Case。
3. memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中。
4. conn_parse_cmd主要的工作就是用来解析命令。主要通过try_read_command这个方法来读取c->rbuf中的命令数据,通过\n来分隔数据报文的命令。如果c->buf内存块中的数据匹配不到\n,则返回继续等待客户端的命令数据报文到来conn_waiting;否则就会转交给process_command方法,来处理具体的命令(命令解析会通过\0符号来分隔)。
5. process_command主要用来处理具体的命令。其中tokenize_command这个方法非常重要,将命令拆解成多个元素(KEY的最大长度250)。例如我们以get命令为例,最终会跳转到process_get_command这个命令 process_*_command这一系列就是处理具体的命令逻辑的。
6. 我们进入process_get_command,当获取数据处理完毕之后,会转交到conn_mwrite这个状态。如果获取数据失败,则关闭连接。
7. 进入conn_mwrite后,主要是通过transmit方法来向客户端提交数据。如果写数据失败,则关闭连接或退出drive_machine循环;如果写入成功,则又转交到conn_new_cmd这个状态。
8. conn_new_cmd这个状态主要是处理c->rbuf中剩余的命令。主要看一下reset_cmd_handler这个方法,这个方法回去判断c->rbytes中是否还有剩余的报文没处理,如果未处理,则转交到conn_parse_cmd(第四部)继续解析剩余命令;如果已经处理了,则转交到conn_waiting,等待新的事件到来。在转交之前,每次都会执行一次conn_shrink方法。
9. conn_shrink方法主要用来处理命令报文容器c->rbuf和输出内容的容器是否数据满了?是否需要扩大buffer的大小,是否需要移动内存块。接受命令报文的初始化内存块大小2048,最大8192。



 

命令rbuf数据结构变化图:
1. 读取客户端的数据



 

2. 解析buf中的命令。如果遇到\n,则表明是一个命令语句的结尾标识符。



 

3. 命令拆分。命令解析出来之后,对命令进行分解,分解是通过空格来分离的。第一个参数一般为操作方法,第二个参数一般为KEY。



 

4. 内存块重设置。如果rbuf内存块使用空间不足,或者大于8k,则需要进行重新分配内存块
 


 

从drive_machine开始
我们上一节看到客户端连接的读写事件回调函数:event_handler,这个方法中最终调用的是drive_machine。

  1. void event_handler(const int fd, const short which, void *arg) {    
  2.     conn *c;    
  3.     //组装conn结构    
  4.     c = (conn *) arg;    
  5.     assert(c != NULL);    
  6.     
  7.     c->which = which;    
  8.     
  9.     /* sanity */    
  10.     if (fd != c->sfd) {    
  11.         if (settings.verbose > 0)    
  12.             fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");    
  13.         conn_close(c);    
  14.         return;    
  15.     }    
  16.     //最终转交给了drive_machine这个方法    
  17.     drive_machine(c);    
  18.     
  19.     /* wait for next event */    
  20.     return;    

drive_machine:
drive_machine这个方法中,都是通过c->state来判断需要处理的逻辑。
conn_listening:监听状态
conn_waiting:等待状态
conn_read:读取状态
conn_parse_cmd:命令行解析
conn_mwrite:向客户端写数据
conn_new_cmd:解析新的命令

  1. static void drive_machine(conn *c) {    
  2.     bool stop = false;    
  3.     int sfd;    
  4.     socklen_t addrlen;    
  5.     struct sockaddr_storage addr;    
  6.     int nreqs = settings.reqs_per_event;    
  7.     int res;    
  8.     const char *str;    
  9.     #ifdef HAVE_ACCEPT4    
  10.     static int use_accept4 = 1;    
  11.     #else    
  12.     static int use_accept4 = 0;    
  13.     #endif    
  14.     
  15.     assert(c != NULL);    
  16.     
  17.     while (!stop) {    
  18.     
  19.         switch (c->state) {    
  20.         case conn_listening:    
  21. //.......更多代码    

我们继续看一下conn_read、conn_wait和conn_parse_cmd状态的代码

  1. //这边是继续等待客户端的数据报文到来    
  2.     case conn_waiting:    
  3.         if (!update_event(c, EV_READ | EV_PERSIST)) {    
  4.             if (settings.verbose > 0)    
  5.                 fprintf(stderr, "Couldn't update event\n");    
  6.             conn_set_state(c, conn_closing);    
  7.             break;    
  8.         }    
  9.         //等待的过程中,将连接状态设置为读取状态,并且stop设置为true,退出while(stop)的循环    
  10.         conn_set_state(c, conn_read);    
  11.         stop = true;    
  12.         break;    
  13.     
  14.         //读取数据的事件,当客户端有数据报文上传的时候,就会触发libevent的读事件    
  15.     case conn_read:    
  16.         //try_read_network 主要读取TCP数据    
  17.         //返回try_read_result的枚举类型结构,通过这个枚举类型,来判断是否已经读取到数据,是否读取失败等情况    
  18.         res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);    
  19.     
  20.         switch (res) {    
  21.         //没有读取到数据,那么继续将事件设置为等待。    
  22.         //while(stop)会继续循环,去调用conn_waiting这个case    
  23.         case READ_NO_DATA_RECEIVED:    
  24.             conn_set_state(c, conn_waiting);    
  25.             break;    
  26.             //如果有数据读取到了,这个时候就需要调用conn_parse_cmd逻辑    
  27.             //conn_parse_cmd:主要用来解析读取到的命令    
  28.         case READ_DATA_RECEIVED:    
  29.             conn_set_state(c, conn_parse_cmd);    
  30.             break;    
  31.             //读取失败的状态,则直接调用conn_closing 关闭客户端的连接    
  32.         case READ_ERROR:    
  33.             conn_set_state(c, conn_closing);    
  34.             break;    
  35.         case READ_MEMORY_ERROR: /* Failed to allocate more memory */    
  36.             /* State already set by try_read_network */    
  37.             break;    
  38.         }    
  39.         break;    
  40.     
  41.         //这边是解析Memcached的客户端命令,例如解析:set username zhuli    
  42.     case conn_parse_cmd:    
  43.         //try_read_command方法很关键,用来读取命令    
  44.         //如果这个方法返回为0,则表示解析命令失败(因为TCP粘包拆包的原因,可能命令不完整,需要继续等待数据到来)    
  45.         if (try_read_command(c) == 0) {    
  46.             /* wee need more data! */    
  47.             //这边的注释貌似写错误了吧,应该是we need more data!    
  48.             conn_set_state(c, conn_waiting);    
  49.         }    
  50.     
  51.         break

try_read_network
这个方法主要是读取TCP网络数据。读取到的数据会放进c->rbuf的buf中。
如果buf没有空间存储更多数据的时候,就会触发内存块的重新分配。重新分配,memcached限制了4次,估计是担忧客户端的而已攻击导致存储命令行数据报文的buf不断的ralloc。

  1. //这个方法是通过TCP的方式读取客户端传递过来的命令数据    
  2. static enum try_read_result try_read_network(conn *c) {    
  3.     //这个方法会最终返回try_read_result的枚举类型    
  4.     //默认设置READ_NO_DATA_RECEIVED:没有接受到数据    
  5.     enum try_read_result gotdata = READ_NO_DATA_RECEIVED;    
  6.     int res;    
  7.     int num_allocs = 0;    
  8.     assert(c != NULL);    
  9.     
  10.     //c->rcurr 存放未解析命令内容指针   c->rbytes 还有多少没解析过的数据    
  11.     //c->rbuf 用于读取命令的buf,存储命令字符串的指针  c->rsize rbuf的size    
  12.         //这边每次都会将前一次剩余的命令报文,移动到c->rbuf的头部。    
  13.         if (c->rcurr != c->rbuf) {    
  14.         if (c->rbytes != 0) /* otherwise there's nothing to copy */    
  15.             memmove(c->rbuf, c->rcurr, c->rbytes);    
  16.         c->rcurr = c->rbuf;    
  17.     }    
  18.     //循环从fd中读取数据    
  19.     while (1) {    
  20.         //如果buf满了,则需要重新分配一块更大的内存    
  21.         //当未解析的数据size 大于等于 buf块的size,则需要重新分配    
  22.         if (c->rbytes >= c->rsize) {    
  23.             //最多分配4次    
  24.             if (num_allocs == 4) {    
  25.                 return gotdata;    
  26.             }    
  27.             ++num_allocs;    
  28.             //从新分配一块新的内存块,内存大小为rsize的两倍    
  29.             char *new_rbuf = realloc(c->rbuf, c->rsize * 2);    
  30.             if (!new_rbuf) {    
  31.                 STATS_LOCK();    
  32.                 stats.malloc_fails++;    
  33.                 STATS_UNLOCK();    
  34.                 if (settings.verbose > 0) {    
  35.                     fprintf(stderr, "Couldn't realloc input buffer\n");    
  36.                 }    
  37.                 c->rbytes = 0; /* ignore what we read */    
  38.                 out_of_memory(c, "SERVER_ERROR out of memory reading request");    
  39.                 c->write_and_go = conn_closing;    
  40.                 return READ_MEMORY_ERROR;    
  41.             }    
  42.             //c->rcurr和c->rbuf指向到新的buf块    
  43.             c->rcurr = c->rbuf = new_rbuf;    
  44.             c->rsize *= 2; //rsize则乘以2    
  45.         }    
  46.     
  47.         //avail可以计算出buf块中剩余的空间多大    
  48.         int avail = c->rsize - c->rbytes;    
  49.     
  50.         //这边我们可以看到Socket的读取方法    
  51.         //c->sfd为Socket的ID    
  52.         //c->rbuf + c->rbytes 意思是从buf块中空余的内存地址开始存放新读取到的数据    
  53.         //avail 每次接收最大能读取多大的数据    
  54.         res = read(c->sfd, c->rbuf + c->rbytes, avail);    
  55.     
  56.         //如果接受到的结果res大于0,则说明Socket中读取到了数据    
  57.         //设置成READ_DATA_RECEIVED枚举类型,表明读取到了数据    
  58.         if (res > 0) {    
  59.             pthread_mutex_lock(&c->thread->stats.mutex); //线程锁    
  60.             c->thread->stats.bytes_read += res;    
  61.             pthread_mutex_unlock(&c->thread->stats.mutex);    
  62.             gotdata = READ_DATA_RECEIVED;    
  63.             c->rbytes += res; //未处理的数据量 + 当前读取到的命令size    
  64.             if (res == avail) {    
  65.                 continue;    
  66.             } else {    
  67.                 break;    
  68.             }    
  69.         }    
  70.         //判断读取失败的两种情况    
  71.         if (res == 0) {    
  72.             return READ_ERROR;    
  73.         }    
  74.         if (res == -1) {    
  75.             if (errno == EAGAIN || errno == EWOULDBLOCK) {    
  76.                 break;    
  77.             }    
  78.             return READ_ERROR;    
  79.         }    
  80.     }    
  81.     return gotdata;    

try_read_command
这个方法主要是用来读取rbuf中的命令的。
例如命令:set username zhuli\r\n get username \n
则会通过\n这个换行符来分隔数据报文中的命令。因为数据报文会有粘包和拆包的特性,所以只有等到命令行完整了才能进行解析。所有只有匹配到了\n符号,才能匹配一个完整的命令。

  1. //如果我们已经在c->rbuf中有可以处理的命令行了,则就可以调用此函数来处理命令解析    
  2. static int try_read_command(conn *c) {    
  3.     assert(c != NULL);    
  4.     assert(c->rcurr <= (c->rbuf + c->rsize)); //这边断言    
  5.     assert(c->rbytes > 0);    
  6.     
  7.     if (c->protocol == negotiating_prot || c->transport == udp_transport) {    
  8.         if ((unsigned char) c->rbuf[0] == (unsigned char) PROTOCOL_BINARY_REQ) {    
  9.             c->protocol = binary_prot;    
  10.         } else {    
  11.             c->protocol = ascii_prot;    
  12.         }    
  13.     
  14.         if (settings.verbose > 1) {    
  15.             fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,    
  16.                     prot_text(c->protocol));    
  17.         }    
  18.     }    
  19.     //有两种模式,是否是二进制模式还是ascii模式    
  20.     if (c->protocol == binary_prot) {    
  21.         //更多代码    
  22.     } else {    
  23.         //这边主要处理非二进制模式的命令解析    
  24.         char *el, *cont;    
  25.     
  26.         //如果c->rbytes==0 表示buf容器中没有可以处理的命令报文,则返回0    
  27.         //0 是让程序继续等待接收新的客户端报文    
  28.         if (c->rbytes == 0)    
  29.             return 0;    
  30.     
  31.         //查找命令中是否有\n,memcache的命令通过\n来分割    
  32.         //当客户端的数据报文过来的时候,Memcached通过查找接收到的数据中是否有\n换行符来判断收到的命令数据包是否完整    
  33.         //例如命令:set username 10234344 \n get username \n    
  34.         //这个命令就可以分割成两个命令,分别是set和get的命令    
  35.         //el返回\n的字符指针地址    
  36.         el = memchr(c->rcurr, '\n', c->rbytes);    
  37.     
  38.         //如果没有找到\n,说明命令不完整,则返回0,继续等待接收新的客户端数据报文    
  39.         if (!el) {    
  40.             //c->rbytes是接收到的数据包的长度    
  41.             //这边非常有趣,如果一次接收的数据报文大于了1K,则Memcached回去判断这个请求是否太大了,是否有问题?    
  42.             //然后会关闭这个客户端的链接    
  43.             if (c->rbytes > 1024) {    
  44.                 /* 
  45.                  * We didn't have a '\n' in the first k. This _has_ to be a   
  46.                  * large multiget, if not we should just nuke the connection.   
  47.                  */    
  48.                 char *ptr = c->rcurr;    
  49.                 while (*ptr == ' ') { /* ignore leading whitespaces */    
  50.                     ++ptr;    
  51.                 }    
  52.     
  53.                 if (ptr - c->rcurr > 100    
  54.                         || (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {    
  55.     
  56.                     conn_set_state(c, conn_closing);    
  57.                     return 1;    
  58.                 }    
  59.             }    
  60.     
  61.             return 0;    
  62.         }    
  63.         //如果找到了\n,说明c->rcurr中有完整的命令了    
  64.         cont = el + 1; //下一个命令开始的指针节点    
  65.         //这边判断是否是\r\n,如果是\r\n,则el往前移一位    
  66.         if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {    
  67.             el--;    
  68.         }    
  69.         //然后将命令的最后一个字符用 \0(字符串结束符号)来分隔    
  70.         *el = '\0';    
  71.     
  72.         assert(cont <= (c->rcurr + c->rbytes));    
  73.     
  74.         c->last_cmd_time = current_time; //最后命令时间    
  75.         //处理命令,c->rcurr就是命令    
  76.         process_command(c, c->rcurr);    
  77.     
  78.         c->rbytes -= (cont - c->rcurr); //这个地方为何不这样写?c->rbytes = c->rcurr - cont    
  79.         c->rcurr = cont; //将c->rcurr指向到下一个命令的指针节点    
  80.     
  81.         assert(c->rcurr <= (c->rbuf + c->rsize));    
  82.     }    
  83.     
  84.     return 1;    

process_command
这个方法主要用来处理具体的命令。将命令分解后,分发到不同的具体操作中去。

  1. //命令处理函数    
  2. //前一个方法中,我们找到了rbuf中\n的字符,然后将其替换成\0    
  3. static void process_command(conn *c, char *command) {    
  4.     
  5.     //tokens结构,这边会将c->rcurr(command)命令拆分出来    
  6.     //并且将命令通过空格符号来分隔成多个元素    
  7.     //例如:set username zhuli,则会拆分成3个元素,分别是set和username和zhuli    
  8.     //MAX_TOKENS最大值为8,说明memcached的命令行,最多可以拆分成8个元素    
  9.     token_t tokens[MAX_TOKENS];    
  10.     size_t ntokens;    
  11.     int comm;    
  12.     
  13.     assert(c != NULL);    
  14.     
  15.     MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);    
  16.     
  17.     if (settings.verbose > 1)    
  18.         fprintf(stderr, "<%d %s\n", c->sfd, command);    
  19.     
  20.     /* 
  21.      * for commands set/add/replace, we build an item and read the data   
  22.      * directly into it, then continue in nread_complete().   
  23.      */    
  24.     
  25.     c->msgcurr = 0;    
  26.     c->msgused = 0;    
  27.     c->iovused = 0;    
  28.     if (add_msghdr(c) != 0) {    
  29.         out_of_memory(c, "SERVER_ERROR out of memory preparing response");    
  30.         return;    
  31.     }    
  32.     
  33.     //tokenize_command非常重要,主要就是拆分命令的    
  34.     //并且将拆分出来的命令元素放进tokens的数组中    
  35.     //参数:command为命令    
  36.     ntokens = tokenize_command(command, tokens, MAX_TOKENS);    
  37.     
  38.     //tokens[COMMAND_TOKEN] COMMAND_TOKEN=0    
  39.     //分解出来的命令的第一个参数为操作方法    
  40.     if (ntokens >= 3    
  41.             && ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0)    
  42.                     || (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {    
  43.     
  44.         //处理get命令    
  45.         process_get_command(c, tokens, ntokens, false);    
  46.     
  47.     } else if ((ntokens == 6 || ntokens == 7)    
  48.             && ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm =    
  49.                     NREAD_ADD))    
  50.                     || (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0    
  51.                             && (comm = NREAD_SET))    
  52.                     || (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0    
  53.                             && (comm = NREAD_REPLACE))    
  54.                     || (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0    
  55.                             && (comm = NREAD_PREPEND))    
  56.                     || (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0    
  57.                             && (comm = NREAD_APPEND)))) {    
  58.     
  59.         //处理更新命令    
  60.         process_update_command(c, tokens, ntokens, comm, false);    
  61. //更多代码....    
  62. }  

tokenize_command:
这个方法主要用于分解命令。具体是将一个命令语句分解成多个元素。
例如:set username zhuli\n
则会分解成三个元素:set和username和zhuli这三个元素。

  1. //拆分命令方法    
  2. static size_t tokenize_command(char *command, token_t *tokens,    
  3.         const size_t max_tokens) {    
  4.     char *s, *e;    
  5.     size_t ntokens = 0; //命令参数游标    
  6.     size_t len = strlen(command); //命令长度    
  7.     unsigned int i = 0;    
  8.     
  9.     assert(command != NULL && tokens != NULL && max_tokens > 1);    
  10.     
  11.     s = e = command;    
  12.     for (i = 0; i < len; i++) {    
  13.         //指针不停往前走,如果遇到空格,则会停下来,将命令元素拆分出来,放进tokens这个数组中    
  14.         if (*e == ' ') {    
  15.             if (s != e) {    
  16.                 tokens[ntokens].value = s;    
  17.                 tokens[ntokens].length = e - s;    
  18.                 ntokens++;    
  19.                 //这边将空格替换成\0    
  20.                 //Memcached这边的代码写的非常的好,这边的命令进行切割的时候,并没有将内存块进行拷贝,而是在原来的内存块上进行切割    
  21.                 *e = '\0';    
  22.                 //最多8个元素    
  23.                 if (ntokens == max_tokens - 1) {    
  24.                     e++;    
  25.                     s = e; /* so we don't add an extra token */    
  26.                     break;    
  27.                 }    
  28.             }    
  29.             s = e + 1;    
  30.         }    
  31.         e++;    
  32.     }    
  33.     
  34.     if (s != e) {    
  35.         tokens[ntokens].value = s;    
  36.         tokens[ntokens].length = e - s;    
  37.         ntokens++;    
  38.     }    
  39.     
  40.     /* 
  41.      * If we scanned the whole string, the terminal value pointer is null,   
  42.      * otherwise it is the first unprocessed character.   
  43.      */    
  44.     tokens[ntokens].value = *e == '\0' ? NULL : e;    
  45.     tokens[ntokens].length = 0;    
  46.     ntokens++;    
  47.     //返回值为参数个数,例如分解出3个元素,则返回3    
  48.     return ntokens;    

process_get_command
get的命令例子。get请求拿到Memcached Item中的数据后,又会跳转到conn_mwrite这个状态,将进入向客户端写数据的状态。

  1. //处理GET请求的命令    
  2. static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,    
  3.         bool return_cas) {    
  4.     //处理GET命令    
  5.     char *key;    
  6.     size_t nkey;    
  7.     int i = 0;    
  8.     item *it;    
  9.     //&tokens[0] 是操作的方法    
  10.     //&tokens[1] 为key    
  11.     //token_t 存储了value和length    
  12.     token_t *key_token = &tokens[KEY_TOKEN];    
  13.     char *suffix;    
  14.     assert(c != NULL);    
  15.     
  16.     do {    
  17.         //如果key的长度不为0    
  18.         while (key_token->length != 0) {    
  19.     
  20.             key = key_token->value;    
  21.             nkey = key_token->length;    
  22.     
  23.             //判断key的长度是否超过了最大的长度,memcache key的最大长度为250    
  24.             //这个地方需要非常注意,我们在平常的使用中,还是要注意key的字节长度的    
  25.             if (nkey > KEY_MAX_LENGTH) {    
  26.                 //out_string 向外部输出数据    
  27.                 out_string(c, "CLIENT_ERROR bad command line format");    
  28.                 while (i-- > 0) {    
  29.                     item_remove(*(c->ilist + i));    
  30.                 }    
  31.                 return;    
  32.             }    
  33.             //这边是从Memcached的内存存储快中去取数据    
  34.             it = item_get(key, nkey);    
  35.             if (settings.detail_enabled) {    
  36.                 //状态记录,key的记录数的方法    
  37.                 stats_prefix_record_get(key, nkey, NULL != it);    
  38.             }    
  39.             //如果获取到了数据    
  40.             if (it) {    
  41.                 //c->ilist 存放用于向外部写数据的buf    
  42.                 //如果ilist太小,则重新分配一块内存    
  43.                 if (i >= c->isize) {    
  44.                     item **new_list = realloc(c->ilist,    
  45.                             sizeof(item *) * c->isize * 2);    
  46.                     if (new_list) {    
  47.                         c->isize *= 2;    
  48.                         c->ilist = new_list;    
  49.                     } else {    
  50.                         STATS_LOCK();    
  51.                         stats.malloc_fails++;    
  52.                         STATS_UNLOCK();    
  53.                         item_remove(it);    
  54.                         break;    
  55.                     }    
  56.                 }    
  57.     
  58.                 /* 
  59.                  * Construct the response. Each hit adds three elements to the   
  60.                  * outgoing data list:   
  61.                  *   "VALUE "   
  62.                  *   key   
  63.                  *   " " + flags + " " + data length + "\r\n" + data (with \r\n)   
  64.                  */    
  65.                 //初始化返回出去的数据结构    
  66.                 if (return_cas) {    
  67.                     MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,    
  68.                             it->nbytes, ITEM_get_cas(it));    
  69.                     /* Goofy mid-flight realloc. */    
  70.                     if (i >= c->suffixsize) {    
  71.                         char **new_suffix_list = realloc(c->suffixlist,    
  72.                                 sizeof(char *) * c->suffixsize * 2);    
  73.                         if (new_suffix_list) {    
  74.                             c->suffixsize *= 2;    
  75.                             c->suffixlist = new_suffix_list;    
  76.                         } else {    
  77.                             STATS_LOCK();    
  78.                             stats.malloc_fails++;    
  79.                             STATS_UNLOCK();    
  80.                             item_remove(it);    
  81.                             break;    
  82.                         }    
  83.                     }    
  84.     
  85.                     suffix = cache_alloc(c->thread->suffix_cache);    
  86.                     if (suffix == NULL) {    
  87.                         STATS_LOCK();    
  88.                         stats.malloc_fails++;    
  89.                         STATS_UNLOCK();    
  90.                         out_of_memory(c,    
  91.                                 "SERVER_ERROR out of memory making CAS suffix");    
  92.                         item_remove(it);    
  93.                         while (i-- > 0) {    
  94.                             item_remove(*(c->ilist + i));    
  95.                         }    
  96.                         return;    
  97.                     }    
  98.                     *(c->suffixlist + i) = suffix;    
  99.                     int suffix_len = snprintf(suffix, SUFFIX_SIZE, " %llu\r\n",    
  100.                             (unsigned long long) ITEM_get_cas(it));    
  101.                     if (add_iov(c, "VALUE ", 6) != 0    
  102.                             || add_iov(c, ITEM_key(it), it->nkey) != 0    
  103.                             || add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0    
  104.                             || add_iov(c, suffix, suffix_len) != 0    
  105.                             || add_iov(c, ITEM_data(it), it->nbytes) != 0) {    
  106.                         item_remove(it);    
  107.                         break;    
  108.                     }    
  109.                 } else {    
  110.                     MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,    
  111.                             it->nbytes, ITEM_get_cas(it));    
  112.                     if (add_iov(c, "VALUE ", 6) != 0    
  113.                             || add_iov(c, ITEM_key(it), it->nkey) != 0    
  114.                             || add_iov(c, ITEM_suffix(it),    
  115.                                     it->nsuffix + it->nbytes) != 0) {    
  116.                         item_remove(it);    
  117.                         break;    
  118.                     }    
  119.                 }    
  120.     
  121.                 if (settings.verbose > 1) {    
  122.                     int ii;    
  123.                     fprintf(stderr, ">%d sending key ", c->sfd);    
  124.                     for (ii = 0; ii < it->nkey; ++ii) {    
  125.                         fprintf(stderr, "%c", key[ii]);    
  126.                     }    
  127.                     fprintf(stderr, "\n");    
  128.                 }    
  129.     
  130.                 /* item_get() has incremented it->refcount for us */    
  131.                 pthread_mutex_lock(&c->thread->stats.mutex);    
  132.                 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;    
  133.                 c->thread->stats.get_cmds++;    
  134.                 pthread_mutex_unlock(&c->thread->stats.mutex);    
  135.                 item_update(it);    
  136.                 *(c->ilist + i) = it;    
  137.                 i++;    
  138.     
  139.             } else {    
  140.                 pthread_mutex_lock(&c->thread->stats.mutex);    
  141.                 c->thread->stats.get_misses++;    
  142.                 c->thread->stats.get_cmds++;    
  143.                 pthread_mutex_unlock(&c->thread->stats.mutex);    
  144.                 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);    
  145.             }    
  146.     
  147.             key_token++;    
  148.         }    
  149.     
  150.         /* 
  151.          * If the command string hasn't been fully processed, get the next set   
  152.          * of tokens.   
  153.          */    
  154.         //如果命令行中的命令没有全部被处理,则继续下一个命令    
  155.         //一个命令行中,可以get多个元素    
  156.         if (key_token->value != NULL) {    
  157.             ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);    
  158.             key_token = tokens;    
  159.         }    
  160.     
  161.     } while (key_token->value != NULL);    
  162.     
  163.     c->icurr = c->ilist;    
  164.     c->ileft = i;    
  165.     if (return_cas) {    
  166.         c->suffixcurr = c->suffixlist;    
  167.         c->suffixleft = i;    
  168.     }    
  169.     
  170.     if (settings.verbose > 1)    
  171.         fprintf(stderr, ">%d END\n", c->sfd);    
  172.     
  173.     /* 
  174.      If the loop was terminated because of out-of-memory, it is not   
  175.      reliable to add END\r\n to the buffer, because it might not end   
  176.      in \r\n. So we send SERVER_ERROR instead.   
  177.      */    
  178.     if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0    
  179.             || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {    
  180.         out_of_memory(c, "SERVER_ERROR out of memory writing get response");    
  181.     } else {    
  182.         //将状态修改为写,这边读取到item的数据后,又开始需要往客户端写数据了。    
  183.         conn_set_state(c, conn_mwrite);    
  184.         c->msgcurr = 0;    
  185.     }    

conn_mwrite和transmit
主要用于向客户端写数据。写完数据后,如果写失败,则关闭连接;如果写成功,则会将状态修改成conn_new_cmd,继续解析c->rbuf中剩余的命令

 
  1. //drive_machine方法    
  2.          //这个conn_mwrite是向客户端写数据    
  3.          case conn_mwrite:    
  4.              if (IS_UDP(c->transport) && c->msgcurr == 0    
  5.                      && build_udp_headers(c) != 0) {    
  6.                  if (settings.verbose > 0)    
  7.                      fprintf(stderr, "Failed to build UDP headers\n");    
  8.                  conn_set_state(c, conn_closing);    
  9.                  break;    
  10.              }    
  11.              //transmit这个方法非常重要,主要向客户端写数据的操作都在这个方法中进行    
  12.              //返回transmit_result枚举类型,用于判断是否写成功,如果失败,则关闭连接    
  13.              switch (transmit(c)) {    
  14.      
  15.              //如果向客户端发送数据成功    
  16.              case TRANSMIT_COMPLETE:    
  17.                  if (c->state == conn_mwrite) {    
  18.                      conn_release_items(c);    
  19.                      /* XXX:  I don't know why this wasn't the general case */    
  20.                      if (c->protocol == binary_prot) {    
  21.                          conn_set_state(c, c->write_and_go);    
  22.                      } else {    
  23.                          //这边是TCP的状态    
  24.                          //状态又会切回到conn_new_cmd这个状态    
  25.                          //conn_new_cmd主要是继续解析c->rbuf容器中剩余的命令参数    
  26.                          conn_set_state(c, conn_new_cmd);    
  27.                      }    
  28.                  } else if (c->state == conn_write) {    
  29.                      if (c->write_and_free) {    
  30.                          free(c->write_and_free);    
  31.                          c->write_and_free = 0;    
  32.                      }    
  33.                      conn_set_state(c, c->write_and_go);    
  34.                  } else {    
  35.                      if (settings.verbose > 0)    
  36.                          fprintf(stderr, "Unexpected state %d\n", c->state);    
  37.                      conn_set_state(c, conn_closing);    
  38.                  }    
  39.                  break;    
  40.      
  41.              case TRANSMIT_INCOMPLETE:    
  42.              case TRANSMIT_HARD_ERROR:    
  43.                  break/* Continue in state machine. */    
  44.              //失败的情况    
  45.              case TRANSMIT_SOFT_ERROR:    
  46.                  stop = true;    
  47.                  break;    
  48.              }    
  49.              break;   

这个方法主要是向客户端发送数据。

  1. //这个方法主要向客户端写数据    
  2. static enum transmit_result transmit(conn *c) {    
  3.     assert(c != NULL);    
  4.     
  5.     if (c->msgcurr < c->msgused && c->msglist[c->msgcurr].msg_iovlen == 0) {    
  6.         /* Finished writing the current msg; advance to the next. */    
  7.         c->msgcurr++;    
  8.     }    
  9.     if (c->msgcurr < c->msgused) {    
  10.         ssize_t res;    
  11.         //msghdr 发送数据的结构    
  12.         struct msghdr *m = &c->msglist[c->msgcurr];    
  13.         //sendmsg 发送数据方法    
  14.         res = sendmsg(c->sfd, m, 0);    
  15.         //发送成功    
  16.         if (res > 0) {    
  17.             pthread_mutex_lock(&c->thread->stats.mutex);    
  18.             c->thread->stats.bytes_written += res;    
  19.             pthread_mutex_unlock(&c->thread->stats.mutex);    
  20.     
  21.             /* We've written some of the data. Remove the completed 
  22.              iovec entries from the list of pending writes. */    
  23.             while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {    
  24.                 res -= m->msg_iov->iov_len;    
  25.                 m->msg_iovlen--;    
  26.                 m->msg_iov++;    
  27.             }    
  28.     
  29.             /* Might have written just part of the last iovec entry; 
  30.              adjust it so the next write will do the rest. */    
  31.             if (res > 0) {    
  32.                 m->msg_iov->iov_base = (caddr_t) m->msg_iov->iov_base + res;    
  33.                 m->msg_iov->iov_len -= res;    
  34.             }    
  35.             return TRANSMIT_INCOMPLETE;    
  36.         }    
  37.         if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {    
  38.             if (!update_event(c, EV_WRITE | EV_PERSIST)) {    
  39.                 if (settings.verbose > 0)    
  40.                     fprintf(stderr, "Couldn't update event\n");    
  41.                 conn_set_state(c, conn_closing);    
  42.                 return TRANSMIT_HARD_ERROR;    
  43.             }    
  44.             return TRANSMIT_SOFT_ERROR;    
  45.         }    
  46.         /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK, 
  47.          we have a real error, on which we close the connection */    
  48.         if (settings.verbose > 0)    
  49.             perror("Failed to write, and not due to blocking");    
  50.     
  51.         if (IS_UDP(c->transport))    
  52.             conn_set_state(c, conn_read);    
  53.         else    
  54.             conn_set_state(c, conn_closing);    
  55.         return TRANSMIT_HARD_ERROR;    
  56.     } else {    
  57.         return TRANSMIT_COMPLETE;    
  58.     }    
  59. }  

conn_new_cmd和reset_cmd_handler
继续解析c->rbuf中剩余的命令。

  1. //处理c->rbuf中剩余的命令    
  2.         case conn_new_cmd:    
  3.             /* Only process nreqs at a time to avoid starving other 
  4.              connections */    
  5.     
  6.             --nreqs;    
  7.             if (nreqs >= 0) {    
  8.                 reset_cmd_handler(c); //会跳转到reset_cmd_handler这个方法    
  9.             } else {    
  10.                 pthread_mutex_lock(&c->thread->stats.mutex);    
  11.                 c->thread->stats.conn_yields++;    
  12.                 pthread_mutex_unlock(&c->thread->stats.mutex);    
  13.                 if (c->rbytes > 0) {    
  14.                     /* We have already read in data into the input buffer, 
  15.                      so libevent will most likely not signal read events   
  16.                      on the socket (unless more data is available. As a   
  17.                      hack we should just put in a request to write data,   
  18.                      because that should be possible ;-)   
  19.                      */    
  20.                     if (!update_event(c, EV_WRITE | EV_PERSIST)) {    
  21.                         if (settings.verbose > 0)    
  22.                             fprintf(stderr, "Couldn't update event\n");    
  23.                         conn_set_state(c, conn_closing);    
  24.                         break;    
  25.                     }    
  26.                 }    
  27.                 stop = true;    
  28.             }    
  29.             break;//重新设置命令handler    
  30. static void reset_cmd_handler(conn *c) {    
  31.     c->cmd = -1;    
  32.     c->substate = bin_no_state;    
  33.     if (c->item != NULL) {    
  34.         item_remove(c->item);    
  35.         c->item = NULL;    
  36.     }    
  37.     conn_shrink(c); //这个方法是检查c->rbuf容器的大小    
  38.     //如果剩余未解析的命令 > 0的话,继续跳转到conn_parse_cmd解析命令    
  39.     if (c->rbytes > 0) {    
  40.         conn_set_state(c, conn_parse_cmd);    
  41.     } else {    
  42.         //如果命令都解析完成了,则继续等待新的数据到来    
  43.         conn_set_state(c, conn_waiting);    
  44.     }    
  45. }  

conn_shrink
这个方法主要检查命令行容器的大小。

  1. //检查rbuf的大小    
  2. static void conn_shrink(conn *c) {    
  3.     assert(c != NULL);    
  4.     
  5.     if (IS_UDP(c->transport))    
  6.         return;    
  7.     
  8.     //如果bufsize大于READ_BUFFER_HIGHWAT(8192)的时候需要重新处理    
  9.     //DATA_BUFFER_SIZE等于2048,所以我们可以看到之前的代码中对rbuf最多只能进行4次recalloc    
  10.     if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {    
  11.         char *newbuf;    
  12.     
  13.         if (c->rcurr != c->rbuf)    
  14.             memmove(c->rbuf, c->rcurr, (size_t) c->rbytes); //内存移动    
  15.     
  16.         newbuf = (char *) realloc((void *) c->rbuf, DATA_BUFFER_SIZE);    
  17.     
  18.         if (newbuf) {    
  19.             c->rbuf = newbuf;    
  20.             c->rsize = DATA_BUFFER_SIZE;    
  21.         }    
  22.         /* TODO check other branch... */    
  23.         c->rcurr = c->rbuf;    
  24.     }    
  25.     
  26.     if (c->isize > ITEM_LIST_HIGHWAT) {    
  27.         item **newbuf = (item**) realloc((void *) c->ilist,    
  28.                 ITEM_LIST_INITIAL * sizeof(c->ilist[0]));    
  29.         if (newbuf) {    
  30.             c->ilist = newbuf;    
  31.             c->isize = ITEM_LIST_INITIAL;    
  32.         }    
  33.         /* TODO check error condition? */    
  34.     }    
  35.     
  36.     if (c->msgsize > MSG_LIST_HIGHWAT) {    
  37.         struct msghdr *newbuf = (struct msghdr *) realloc((void *) c->msglist,    
  38.                 MSG_LIST_INITIAL * sizeof(c->msglist[0]));    
  39.         if (newbuf) {    
  40.             c->msglist = newbuf;    
  41.             c->msgsize = MSG_LIST_INITIAL;    
  42.         }    
  43.         /* TODO check error condition? */    
  44.     }    
  45.     
  46.     if (c->iovsize > IOV_LIST_HIGHWAT) {    
  47.         struct iovec *newbuf = (struct iovec *) realloc((void *) c->iov,    
  48.                 IOV_LIST_INITIAL * sizeof(c->iov[0]));    
  49.         if (newbuf) {    
  50.             c->iov = newbuf;    
  51.             c->iovsize = IOV_LIST_INITIAL;    
  52.         }    
  53.         /* TODO check return value */    
  54.     }    

相关文章