DPDK中的多线程模型

最近在做dpdk相关的开发和调优,有点小压力,但还是能成长的,打算用两篇博客来介绍下dpdk中的多线程模型,然后下一篇介绍它里面的一些优化方法和NUMA架构[高性能的原因]。
主要是截取相关的源代码进行分析,以及开发过程中可能会遇到的一些坑[一些点参考书《深入dpdk》]。

这里每个线程运行在一个核上,在启动时就绑定了,防止上下文切换等性能开销。从helloworld例子开始吧〜

比如启动参数为-c 0xff,在main入口处,会调用rte_eal_init初始化运行相关的参数,其中对启动线程相关的参数分析eal_parse_coremask,主要实现如下:

 274 static int
 275 eal_parse_coremask(const char *coremask)
 276 {
 277     struct rte_config *cfg = rte_eal_get_configuration();
 278     int i, j, idx = 0;
 279     unsigned count = 0;
 280     char c;
 281     int val;
 282 
 283     if (coremask == NULL)
 284         return -1;
 285     /* Remove all blank characters ahead and after .
 286      * Remove 0x/0X if exists.
 287      */
 288     while (isblank(*coremask))
 289         coremask++;
 290     if (coremask[0] == '0' && ((coremask[1] == 'x')
 291         || (coremask[1] == 'X')))
 292         coremask += 2;
 293     i = strlen(coremask);
 294     while ((i > 0) && isblank(coremask[i - 1]))
 295         i--;
 296     if (i == 0)
 297         return -1;
 299     for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE; i--) {
 300         c = coremask[i];
 301         if (isxdigit(c) == 0) {
 302             /* invalid characters */
 303             return -1;
 304         }
 305         val = xdigit2val(c);
 306         for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE; j++, idx++)
 307         {   
 308             if ((1 << j) & val) {
 309                 if (!lcore_config[idx].detected) {
 310                     RTE_LOG(ERR, EAL, "lcore %u "
 311                             "unavailable\n", idx);
 312                     return -1;
 313                 }
 314                 cfg->lcore_role[idx] = ROLE_RTE;
 315                 lcore_config[idx].core_index = count;
 316                 count++;
 317             } else {
 318                 cfg->lcore_role[idx] = ROLE_OFF;
 319                 lcore_config[idx].core_index = -1;
 320             }
 321         }
 322     }
 323     for (; i >= 0; i--)
 324         if (coremask[i] != '0')
 325             return -1;
 326     for (; idx < RTE_MAX_LCORE; idx++) {
 327         cfg->lcore_role[idx] = ROLE_OFF;
 328         lcore_config[idx].core_index = -1;
 329     }
 330     if (count == 0)
 331         return -1;
 332     /* Update the count of enabled logical cores of the EAL configuration */
 333     cfg->lcore_count = count;
 334     return 0;
 335 }

 79 struct rte_config {
 80     uint32_t master_lcore;       /**< Id of the master lcore */
 81     uint32_t lcore_count;        /**< Number of available logical cores. */
 82     enum rte_lcore_role_t lcore_role[RTE_MAX_LCORE]; /**< State of cores. */
 83 
 84     /** Primary or secondary configuration */
 85     enum rte_proc_type_t process_type;
 86 
 87     /**  
 88      * Pointer to memory configuration, which may be shared across multiple
 89      * DPDK instances
 90      */
 91     struct rte_mem_config *mem_config;
 92 } __attribute__((__packed__));

 63 struct lcore_config {
 64     unsigned detected;         /**< true if lcore was detected */
 65     pthread_t thread_id;       /**< pthread identifier */
 66     int pipe_master2slave[2];  /**< communication pipe with master */
 67     int pipe_slave2master[2];  /**< communication pipe with master */
 68     lcore_function_t * volatile f;         /**< function to call */
 69     void * volatile arg;       /**< argument of function */
 70     volatile int ret;          /**< return value of function */
 71     volatile enum rte_lcore_state_t state; /**< lcore state */
 72     unsigned socket_id;        /**< physical socket id for this lcore */
 73     unsigned core_id;          /**< core number on socket for this lcore */
 74     int core_index;            /**< relative index, starting from 0 */
 75     rte_cpuset_t cpuset;       /**< cpu set which the lcore affinity to */
 76 };

行288~297是跳过启动参数-c 0xff中-c和0xff之间的空白符,跳过0x,并跳过ff右边的空白符,即执行完此语句后只剩下ff;
行299~322是对ff从右往左依次判断是否是十六进制数字,然后再转换成int型,比如f对应的int的二进制为000...1111,如果相应的位为1则执行cfg->lcore_role[idx] = ROLE_RTE[在core_config[idx].detected为true的情况下],否则cfg->lcore_role[idx] = ROLE_OFF
行323~334分别对其它位置的参数检测,因为RTE_MAX_LCORE在此源码中为32,跳出行299的循环要么是return -1,要么是idx < RTE_MAX_LCORE,如果输入的参数比如0x444ffffff是有问题的,再如0x00000000ffffff则合法,并对未设置的核进行cfg->lcore_role[idx] = ROLE_OFF
struct lcore_config是核的配置结构数据,而struct rte_config是全局rte配置结构数据;

 53 int
 54 rte_eal_cpu_init(void)
 55 {
 56     /* pointer to global configuration */
 57     struct rte_config *config = rte_eal_get_configuration();
 58     unsigned lcore_id;
 59     unsigned count = 0;
 60 
 61     /*
 62      * Parse the maximum set of logical cores, detect the subset of running
 63      * ones and enable them by default.
 64      */
 65     for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
 66         lcore_config[lcore_id].core_index = count;
 67 
 68         /* init cpuset for per lcore config */
 69         CPU_ZERO(&lcore_config[lcore_id].cpuset);
 70 
 71         /* in 1:1 mapping, record related cpu detected state */
 72         lcore_config[lcore_id].detected = eal_cpu_detected(lcore_id);
 73         if (lcore_config[lcore_id].detected == 0) {
 74             config->lcore_role[lcore_id] = ROLE_OFF;
 75             lcore_config[lcore_id].core_index = -1;
 76             continue;
 77         }
 78 
 79         /* By default, lcore 1:1 map to cpu id */
 80         CPU_SET(lcore_id, &lcore_config[lcore_id].cpuset);
 81 
 82         /* By default, each detected core is enabled */
 83         config->lcore_role[lcore_id] = ROLE_RTE;
 84         lcore_config[lcore_id].core_id = eal_cpu_core_id(lcore_id);
 85         lcore_config[lcore_id].socket_id = eal_cpu_socket_id(lcore_id);
100         count++;
101     }
102     /* Set the count of enabled logical cores of the EAL configuration */
103     config->lcore_count = count;
108     //more code...
109     return 0;
110 }

rte_eal_cpu_init函数主要设置每个线程lcore_config相关信息,其中CPU_ZEROCPU_SET是设置相关cpu亲和性的接口,这里指的是硬亲和性;简单解释下这样做的好处:使得线/进程在某个给定的 CPU 上尽量长时间地运行而不被迁移到其他处理器,迁移的频率小就意味着产生的负载小,提高了cpu cache的命中率,从而减少内存访问损耗,提高程序的速度,每个核心可以更专注地处理一件事情,资源体系被充分使用,减少了同步的损耗;

99 /* set affinity for current thread */
100 static int
101 eal_thread_set_affinity(void)
102 {   
103     unsigned lcore_id = rte_lcore_id();
104 
105     /* acquire system unique id  */
106     rte_gettid();
107         
108     /* update EAL thread core affinity */
109     return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset);
110 }
112 void eal_thread_init_master(unsigned lcore_id)
113 {       
114     /* set the lcore ID in per-lcore memory area */
115     RTE_PER_LCORE(_lcore_id) = lcore_id;
116         
117     /* set CPU affinity */
118     if (eal_thread_set_affinity() < 0)
119         rte_panic("cannot set affinity\n");
120 } 

上面是设置主线程的cpu亲和性,绑定指定核掩码的第一个;

586     RTE_LCORE_FOREACH_SLAVE(i) {
587 
588         /*
589          * create communication pipes between master thread
590          * and children
591          */
592         if (pipe(lcore_config[i].pipe_master2slave) < 0)
593             rte_panic("Cannot create pipe\n");
594         if (pipe(lcore_config[i].pipe_slave2master) < 0)
595             rte_panic("Cannot create pipe\n");
596 
597         lcore_config[i].state = WAIT;
598 
599         /* create a thread for each lcore */
600         ret = pthread_create(&lcore_config[i].thread_id, NULL,
601                      eal_thread_loop, NULL);
602         if (ret != 0)
603             rte_panic("Cannot create thread\n");
604 
605         /* Set thread_name for aid in debugging. */
606         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
607                 "lcore-slave-%d", i);
608         rte_thread_setname(lcore_config[i].thread_id, thread_name);
609     }

上面主要是创建每个工作线程,设置初始状态,初始化pipe[以前只知道用于进程间通信];

122 /* main loop of threads */
123 __attribute__((noreturn)) void *
124 eal_thread_loop(__attribute__((unused)) void *arg)
125 {
126     char c;
127     int n, ret;
128     unsigned lcore_id;
129     pthread_t thread_id;
130     int m2s, s2m;
131     char cpuset[RTE_CPU_AFFINITY_STR_LEN];
132 
133     thread_id = pthread_self();
134 
135     /* retrieve our lcore_id from the configuration structure */
136     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
137         if (thread_id == lcore_config[lcore_id].thread_id)
138             break;
139     }
140     if (lcore_id == RTE_MAX_LCORE)
141         rte_panic("cannot retrieve lcore id\n");
142 
143     m2s = lcore_config[lcore_id].pipe_master2slave[0];
144     s2m = lcore_config[lcore_id].pipe_slave2master[1];
145 
146     /* set the lcore ID in per-lcore memory area */
147     RTE_PER_LCORE(_lcore_id) = lcore_id;
148 
149     /* set CPU affinity */
150     if (eal_thread_set_affinity() < 0)
151         rte_panic("cannot set affinity\n");
152 
153     ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);
154 
155     RTE_LOG(DEBUG, EAL, "lcore %u is ready (tid=%p;cpuset=[%s%s])\n",
156         lcore_id, thread_id, cpuset, ret == 0 ? "" : "...");
157 
158     /* read on our pipe to get commands */
159     while (1) {
160         void *fct_arg;
161 
162         /* wait command */
163         do {
164             n = read(m2s, &c, 1);
165         } while (n < 0 && errno == EINTR);
166 
167         if (n <= 0)
168             rte_panic("cannot read on configuration pipe\n");
169 
170         lcore_config[lcore_id].state = RUNNING;
171 
172         /* send ack */
173         n = 0;
174         while (n == 0 || (n < 0 && errno == EINTR))
175             n = write(s2m, &c, 1);
176         if (n < 0)
177             rte_panic("cannot write on configuration pipe\n");
178 
179         if (lcore_config[lcore_id].f == NULL)
180             rte_panic("NULL function pointer\n");
181 
182         /* call the function and store the return value */
183         fct_arg = lcore_config[lcore_id].arg;
184         ret = lcore_config[lcore_id].f(fct_arg);
185         lcore_config[lcore_id].ret = ret;
186         rte_wmb();
187         lcore_config[lcore_id].state = FINISHED;
188     }
189 
190     /* never reached */
191     /* pthread_exit(NULL); */
192     /* return NULL; */
193 }

上面这段代码主要是从pipe读到命令,并写入pipe确认,然后执行线程工作函数;即行184,参数是行183,然后保存结果并更新状态;rte_wmb,rte_rmb作用会在后面介绍;
例子中的main部分如下:

 67 /* call lcore_hello() on every slave lcore */
 68     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 69         rte_eal_remote_launch(lcore_hello, NULL, lcore_id);
 70     }
 71     
 72     /* call it on master lcore too */
 73     lcore_hello(NULL);
 74 
 75     rte_eal_mp_wait_lcore();
110 void
111 rte_eal_mp_wait_lcore(void)
112 {
113     unsigned lcore_id;
114 
115     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
116         rte_eal_wait_lcore(lcore_id);
117     }
118 }
 47 /*
 48  * Wait until a lcore finished its job.
 49  */
 50 int
 51 rte_eal_wait_lcore(unsigned slave_id)
 52 {
 53     if (lcore_config[slave_id].state == WAIT)
 54         return 0;
 55 
 56     while (lcore_config[slave_id].state != WAIT &&
 57            lcore_config[slave_id].state != FINISHED);
 58 
 59     rte_rmb();
 60 
 61     /* we are in finished state, go to wait state */
 62     lcore_config[slave_id].state = WAIT;
 63     return lcore_config[slave_id].ret;
 64 }

以上代码就是设置工作线程的主函数lcore_hello,并等待结束,整个线程模型还是中规中矩的。

参考:
http://www.cnblogs.com/LubinLew/p/cpu_affinity.html
https://www.ibm.com/developerworks/cn/linux/l-affinity.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容