第四届阿里中间件性能挑战赛总结和思考

随着复赛今天截止,为期两个月的挑战赛也终于结束了.这两个月里很大一部分时间花在这上面,有过欢乐,也有为分数刷不上去而发愁.作为第一次参加比赛,对比赛结果还算是满意吧.而在这个过程中,对多线程知识,netty,nio等知识的深入认识.下面是对比赛的总结和思考.排名如下

image.png
初赛:《Service Mesh Agent for Apache Dubbo (Incubating) 》
  • 赛题的思考

题目看起来是让我们实现一个rpc agent.因为官方已经给出了consumer和provider,选手就是要实现两个代理,第一个代理是consumer-agent,负责把consumer的调用通过自定义协议发动给Provider-agent.第二个代理就是provider-agent.他的任务就是接收Consumer-agent通过网络发动过来的消息,然后通过dubbo调用provider.最后把结果返回给consumer-agent.整个系统的调用图如下:


image.png
  • 设计和实现.

整个调用过程如下所示:


未命名文件.jpg
  • ①处这里采用Netty Http应用作为服务端,处理Consumer发送过来的http请求.
  • ②处这里就是在Consumer-agent开启Netty Client,Provider-agent端开启Netty Server进行请求和响应.
  • ③ Provider-agent通Netty Client去调用Provider的服务.
  • ④ Provider把结果返回给Consumer-agent.
  • ⑤ Consumer-agent把结果封装成HttpResponse返回给客户端.

Provider提供的服务如下:

public interface IHelloService {

  /**
   * 计算传入参数的哈希值.
   *
   * @param str 随机字符串
   * @return 该字符串的哈希值
   */
  int hash(String str);
}

整个代码我放在github中,这里不对整个代码做分析,只分析出关键的点.

负载均衡

如下图,3个provider的负载能力如下,那么我们可以选择负载均衡算法的时候,把这个考虑进去.我选择是随机加权算法.根据大家的一致认同,small:meddium:large = 1:2:2.


Selection_020.png

所有的服务都运行在docker环境中,而用的etcd作为服务发现的组件.事先并不知道那台机器是small,large,meddium.那么我们可以考虑把参数加上启动参数.一旦服务启动,这些信息,都会注册到etcd中.然后取出来,做相应的判断就行.


Selection_021.png

在etcd做服务发现的时候,把型号信息转换成比例注册上去

//small 1; meddium和large是2.
if(val.equals("small")) {
                endpoints.add(new Endpoint(host, port, 1));
            }else{
                endpoints.add(new Endpoint(host, port, 2));
            }

Consumer在选择那个Provider的时候就可以根据以上的信息,轮询选择一个.

//向endpoints加入5个实例,small一个,meddium和large都是2个.
if (null == endpoints) {
            synchronized (ConsumerAgentHttpServerHandler.class) {
                if (null == endpoints) {
                    endpoints = RegistryInstance.getInstance().find("com.alibaba.dubbo.performance.demo.provider.IHelloService");
                    ListIterator<Endpoint> it = endpoints.listIterator();
                    while (it.hasNext()){
                        Endpoint temp = it.next();
                        if(temp.getSize()==2) {
                            it.add(temp);
                        }
                    }
                }
            }
        }
        int id = count.getAndIncrement();
        if(id>=4){
            count.set(0);
            id=4;
        }
        // 简单的负载均衡,随机取一个
        Endpoint endpoint = endpoints.get(id);

这样一个随机加权的算法就实现了.

EventLoop复用

当我们创建Provident-agent的时候,我们是否可以考虑Eventloop的复用,这样每个请求从接收到发动都是用同一个线程处理的,没有上下文切换.另外一个,这样做好处,把channel和Eventloop绑定起来,也就限定了channel的个数,相当于做了一个channel的缓存(因为channel的数量得控制).一举两得.

private void providerServerStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(4);
        putMap(workerGroup);
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ProviderAgentHttpServerChannelInitializer());
            LOGGER.info("provider netty server start");
            ChannelFuture future = sbs.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

 //预先把channel设置好,复用上面的eventloop.
    public void putMap(EventLoopGroup group) {
        for (EventExecutor executor : group) {
            try {
                map.put((EventLoop) executor, connecManager.getChannel( (EventLoop) executor));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
回调的设计

当Provider返回给结果后,那我们应该如何把结果返回给Consumer-agent呢,也就是它如何记住之前的通道.这里采用的是一个回调的设计.这样就能够记住上下文,也就是记住过来时候的ChannelHandlerContext,通过这个把结果返回回去.

 @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        Map<String, String> data = HttpParser.parse(msg);
        handle(new RequestWrapper(data.get("interface"),
                data.get("method"),
                data.get("parameterTypesString"),
                data.get("parameter")), (result) -> {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(result.getBytes()));
            response.headers().set(CONTENT_TYPE, "text/plain");
            response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
            ctx.write(response);
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
        },ctx.channel().eventLoop());
    }

当结果返回后,通过回调调用回调函数的逻辑

//拿到结果后,回调
 public void done(RpcResponse response){
        this.response = response;
        sender.accept(new String(response.getBytes()).trim());
    }
  • 反思和思考
可以批量flush,批量decode(来源于朋友徐靖峰的思想)
image.png

Netty 提供了一个方便的解码工具类 ByteToMessageDecoder ,如图上半部分所示,这个类具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List 。最后再循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。此处我们做了一个细小的改动,如图下半部分所示,即将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。

负载均衡

上面我的做法有点硬编码的意思,而且随机的话,而且不确定性有点大.那是是否可以考虑根据调用的次数来做负载均衡,也就是说,给句每个Provider请求的次数,尽量把请求分给请求量少的Provider,当然这个量还是得加权.实现的复杂性有点高.

限流

经过朋友提醒,是否可以尝试下,限流,也就是说不放那么多请求进取,只通过一部分来请求,待完成之后,再放另外一部分,这个可以尝试用令牌桶来实现.处于理论阶段,没实际尝试过.

编码

我做的处理里面都是采用的jdk自带的编码方式.如果采用kryo,protobuf的方式,性能上也会有一定的提升.

我的代码:https://github.com/maskwang520/springforall.git


复赛:实现一个进程内的队列引擎,单机可支持100万队列以上,能够承受2亿消息的存取.
  • 赛题的思考
    题目要求有5个:
    1.各个阶段线程数在20~30左右
    2.发送阶段:消息大小在50字节左右,消息条数在20亿条左右,也即发送总数据在100G左右
    3.索引校验阶段:会对所有队列的索引进行随机校验;平均每个队列会校验1~2次;
    4.顺序消费阶段:挑选20%的队列进行全部读取和校验;
    5.发送阶段最大耗时不能超过1800s;索引校验阶段和顺序消费阶段加在一起,最大耗时也不能超过1800s;超时会被判断为评测失败。

100万个queue,20亿消息,如果放内存是完全不现实的,内存肯定会爆.接下来自然想到把消息存放到文件中,内存中只放索引就行.但是内存存放索引,是20亿消息的消息,索引自然是由(消息起始位置+长度)构成.但是这样的Map<queue,Index>存放的索引有20亿,疯狂的FullGc是不可避免的,Full Gc一多,Tps自然上不去.后来想到,消息按块存储(多个消息存在一个块中),索引的时候按块索引.这样就能把Map里面存的只有100万(queue的个数),示意图如下:

未命名文件.png

Block的设计
public class Block {
    //开始位置
    public long startPosition;

    //长度
    public int length;
    //Block中已经存放的消息的条数
    public int size;

    public Block(Long startPosition, int length) {
        this.startPosition = startPosition;
        this.length = length;
        this.size = 0;
    }
}
  • 因为一个queue中可能有多个Block,在消息检索的时候给出的是在队列中的偏移量,那么size这个域方便后面消息检索的时候判断在哪个block中.

  • 消息缓存的设计
    因为每当来一个消息都要flush到文件中去,这样Io的时间就太多了,题目的关键点在于如何减少Io的时间.所以可以采用消息的缓存来处理.每当来一个消息,就放入缓存中,当缓存中超过10次消息的时候,就同步写入到文件中去.这样的话,相当于每10次写,才做一次Io.

public class DataCache {
   //消息缓存
    public ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
    public int count;
}

这里将缓存的大小设置为1024Byte,当然你也可以设置成更大.这里有个小Tips.缓存的消息最好设置成Block的大小.这样当缓存满了之后,就可以直接写入到一个Block块中,而不用接着上一个Block写(上面一个Block写),这样设计,写入更简单,每次flush到文件的时候,只要新开辟一个新的Block,而不用管之前的Block.

    //以块为索引,一个队列可能有多个块,且块的写入有顺序,所有用List来存Block.
    public Map<String, List<Block>> blockMap = new ConcurrentHashMap<>();
    public Map<String, DataCache> cacheMap = new ConcurrentHashMap<>();
消息的存储

因为不可能每个队列的消息都用一个文件来存放,所以这里用hash来把文件限定在32个.一个queue的Block必须在一个文件里面.不同queue的Block可以在一个文件里面.

 //根据队列的名字hash到对应的文件中,共32个文件
    int hashFile(String queueName) {
        return queueName.hashCode() & 0x1f;
        //return 0;
    }

还存在一个问题就是,往一个文件中写入消息的时候,什么位置写,因为按块写.所以已经写过的块不能用.只能从新开辟一个块,块与块之间尽可能紧凑.

    //block的大小为1024,根据当前文件已经存在的写的位置,找到下一个比该位置大的,且是1024的倍数
    public long getLeastBlockPosition(long length) {
        if (length == 0) {
            return 0;
        }
        int initSize = 1 << 10;
        int i = 1;
        while (i * initSize <= length) {
            i++;
        }
        //定义到可用的块的第一个位置
        return i * initSize;
    }
消息存放

这里采用的是原生的filechannel去读写.本打算用mmap去写的,经过一位朋友提醒,mmap在这个场景下不合适.原因是不是长期读写,写完就释放,不是长期的.

public void put(String queueName, byte[] message) {
        int hash = hashFile(queueName);
        String path = DIRPATH + hash + ".txt";
        lock.lock();
        //创建文件
        File file = new File(path);
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if (!blockMap.containsKey(queueName)) {
            List<Block> list = new ArrayList();
            blockMap.put(queueName, list);
        }

        if (!cacheMap.containsKey(queueName)) {
            DataCache dataCache = new DataCache();
            cacheMap.put(queueName, dataCache);
        }

        DataCache dataCache = cacheMap.get(queueName);
      //每10次flush到文件中
        if (dataCache.count == 10) {
            FileChannel fileChannel = null;
            // long fileLength = 0;
            try {
                fileChannel = new RandomAccessFile(file, "rw").getChannel();
                //fileLength = raf.length();
            } catch (Exception e) {
                e.printStackTrace();
            }

            long blockPosition;
            try {
                blockPosition = getLeastBlockPosition(getLeastBlockPosition(fileChannel.size()));
                Block block = new Block(blockPosition, dataCache.dataBuffer.position());
                block.size = 10;
                blockMap.get(queueName).add(block);
                dataCache.dataBuffer.flip();
                fileChannel.position(blockPosition);
                fileChannel.write(dataCache.dataBuffer);
                dataCache.dataBuffer.clear();

            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    fileChannel.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        } else {
            //放入缓存中
            dataCache.dataBuffer.putInt(message.length);
            dataCache.dataBuffer.put(message);
            dataCache.count++;
        }


        lock.unlock();


    }
消息获取

消息获取的思路是根据队列名,找到该队列对应的List<Block>,然后根据偏移量,找到属于哪个block.找到具体的Block后,然后遍历Block,找到偏移量的开始位置,取相应数量的消息即可.

public Collection<byte[]> get(String queueName, long offset, long num) {

        //队列不存在
        if (!blockMap.containsKey(queueName)) {
            return EMPTY;
        }
        //消息集合
        List<byte[]> msgs = new ArrayList();
        List<Block> blocks = blockMap.get(queueName);

        int hash = hashFile(queueName);
        String path = DIRPATH + hash + ".txt";
        FileChannel fileChannel = null;
        int size = blocks.get(0).size;
        int eleNum = 0;
        //记录了目标block所在的下标
        int blockNum = 0;
        lock.lock();
        try {
            fileChannel = new RandomAccessFile(new File(path), "rw").getChannel();
            for (int i = 1; i < blocks.size() && size < offset; i++, blockNum++) {
                size += blocks.get(i).size;
            }

            size = size - blocks.get(blockNum).size;


            for (int i = blockNum; i < blocks.size(); i++) {
                //size+=blocks.get(i).size;
                // size-=blocks.get(i).size;
                int length = blocks.get(i).length;
                MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, blocks.get(i).startPosition, length);
                int sum = 0;
                while (sum < length && size < offset) {
                    int len = buffer.getInt();
                    sum += 4;
                    sum += len;
                    buffer.position(sum);
                    size++;
                }

                if (size >= offset) {
                    while (buffer.position() < length && eleNum <= num) {
                        int len = buffer.getInt();
                        byte[] temp = new byte[len];
                        buffer.get(temp, 0, len);
                        eleNum++;
                        msgs.add(temp);
                    }
                    if (eleNum > num) {
                        break;
                    }
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            try {
                fileChannel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            lock.unlock();

        }

        return msgs;
    }
思考
  • 将所有的ByteBuf池化,包括缓存的那部分ByteBuf.通过ThreadLocal,将ByteBuf与线程绑定起来,后面申请Buffer,直接从对应的线程里面去申请即可.
  • 在写入的时候,可以不同步写,实现异步写.由一个线程去异步flush到文件里面
  • 当读取消息块达到临界点的时候,由单线程申请buffer资源来预读后面的消息块存入,并缓存.
    我的代码:https://github.com/maskwang520/messagequeue.git
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容