Flink Source/Sink探究与实践:RocketMQ数据写入HBase

前言

最近我们正在尝试把原有的一些Spark Streaming任务改造成Flink Streaming任务,自定义Source和Sink是遇到的第一个主要问题,稍微记录一下。

Flink既可以做流处理,也可以做批处理。不管哪种处理方式,都要有数据来源(输入)和数据汇集(输出),前者叫做Source,后者叫做Sink。Flink已经默认包含了最基本的Source和Sink(文件、Socket等)。另外也有些常用的与第三方组件交互的Source和Sink,这些叫做连接器(Connectors),如与HDFS、Kafka、ElasticSearch等对接的连接器。对于那些没有默认实现的数据源和数据汇,就必须自己动手丰衣足食了。

SourceFunction

SourceFunction是定义Flink Source的根接口,其源码如下。

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    
    void cancel();

    interface SourceContext<T> {
        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        void emitWatermark(Watermark mark);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

SourceFunction接口定义了run()方法,该方法用于源源不断地产生源数据,因此重写的时候一般都写成循环,用标志位控制是否结束。cancel()方法则用来打断run()方法中的循环,终止产生数据的过程。

SourceFunction中还嵌套定义了SourceContext接口,它表示这个Source对应的上下文,用来发射数据。其中起主要作用的是前三个方法:

  • collect():发射一个不带自定义时间戳的元素。如果流程序的时间特征(TimeCharacteristic)是处理时间(ProcessingTime),元素没有时间戳;如果是摄入时间(IngestionTime),元素会附带系统时间;如果是事件时间(EventTime),那么初始没有时间戳,但一旦要做与时间戳相关的操作(如窗口)时,就必须用TimestampAssigner设定一个。
  • collectWithTimestamp():发射一个带有自定义时间戳的元素。该方法对于时间特征为事件时间的程序是绝对必须的,如果为处理时间就会被直接忽略,如果为摄入时间就会被系统时间覆盖。
  • emitWatermark():发射一个水印,仅对于事件时间有效。一个带有时间戳t的水印表示不会有任何t' <= t的事件再发生,如果发生,会被当做迟到事件忽略掉。

SourceFunction还有一些其他实现,如:

  • ParallelSourceFunction,表示该Source可以按照设置的并行度并发执行。
  • RichSourceFunction,继承自富函数RichFunction,表示该Source可以感知到运行时上下文(RuntimeContext,如Task、State、并行度的信息),以及可以自定义初始化和销毁逻辑(通过open()/close()方法)。
  • RichParallelSourceFunction,以上两者的综合。

RocketMQ Source

我们有些老旧业务的消息总线采用的是RocketMQ。在自己造轮子实现对应的Source之前,先去GitHub上的rocketmq-externals项目看了一眼,发现已经有了对应的连接器(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink),免去了自己实现的麻烦。下面来看一下RocketMQSource类的源码,比较长,但写得很漂亮,也容易理解。

public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
    implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);

    private transient MQPullConsumerScheduleService pullConsumerScheduleService;
    private DefaultMQPullConsumer consumer;
    private KeyValueDeserializationSchema<OUT> schema;

    private RunningChecker runningChecker;

    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    private Map<MessageQueue, Long> offsetTable;
    private Map<MessageQueue, Long> restoredOffsets;
    private LinkedMap pendingOffsetsToCommit;

    private Properties props;
    private String topic;
    private String group;

    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    private transient volatile boolean restored;
    private transient boolean enableCheckpoint;

    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");

        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }

        runningChecker = new RunningChecker();
        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
        RocketMQConfig.buildConsumerConfigs(props, consumer);
    }

    @Override
    public void run(SourceContext context) throws Exception {
        LOG.debug("source run....");

        final Object lock = context.getCheckpointLock();

        int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);

        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
        int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);

        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
        pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
                try {
                    long offset = getMessageQueueOffset(mq);
                    if (offset < 0) {
                        return;
                    }

                    PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                    boolean found = false;
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messages = pullResult.getMsgFoundList();
                            for (MessageExt msg : messages) {
                                byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                byte[] value = msg.getBody();
                                OUT data = schema.deserializeKeyAndValue(key, value);

                                synchronized (lock) {
                                    context.collectWithTimestamp(data, msg.getBornTimestamp());
                                }
                            }
                            found = true;
                            break;
                        case NO_MATCHED_MSG:
                            LOG.debug("No matched message after offset {} for queue {}", offset, mq);
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            LOG.warn("Offset {} is illegal for queue {}", offset, mq);
                            break;
                        default:
                            break;
                    }

                    synchronized (lock) {
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    }

                    if (found) {
                        pullTaskContext.setPullNextDelayTimeMillis(0); 
                    } else {
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });

        try {
            pullConsumerScheduleService.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        runningChecker.setRunning(true);
        awaitTermination();
    }

    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }

    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
        Long offset = offsetTable.get(mq);
        if (restored && offset == null) {
            offset = restoredOffsets.get(mq);
        }
        if (offset == null) {
            offset = consumer.fetchConsumeOffset(mq, false);
            if (offset < 0) {
                String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                switch (initialOffset) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq, getLong(props,
                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        offsetTable.put(mq, offset);
        return offsetTable.get(mq);
    }

    private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        if (!enableCheckpoint) {
            consumer.updateConsumeOffset(mq, offset);
        }
    }

    @Override
    public void cancel() {
        LOG.debug("cancel ...");
        runningChecker.setRunning(false);

        if (pullConsumerScheduleService != null) {
            pullConsumerScheduleService.shutdown();
        }

        offsetTable.clear();
        restoredOffsets.clear();
        pendingOffsetsToCommit.clear();
    }

    @Override
    public void close() throws Exception {
        LOG.debug("close ...");
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.debug("snapshotState() called on closed source; returning null.");
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
        }

        unionOffsetStates.clear();

        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());

        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));

        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }

        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        LOG.debug("initialize State ...");

        this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
            OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { })));

        this.restored = context.isRestored();

        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                }
            }
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
        } else {
            LOG.info("No restore state for the consumer.");
        }
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
            return;
        }

        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
            return;
        }

        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap);

        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }

        if (offsets == null || offsets.size() == 0) {
            LOG.debug("Checkpoint state was empty.");
            return;
        }

        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
        }
    }
}

RocketMQSource在open()方法中校验并初始化了所有配置,并创建了拉模式的RocketMQ消费者线程。在run()方法中启动线程,不断执行注册的回调逻辑,拉取消息并调用collectWithTimestamp()方法发射消息数据与时间戳,然后更新Offset。发射数据与更新Offset的操作都用检查点锁保护。

该Source除了实现RichParallelSourceFunction接口之外,还另外实现了CheckpointedFunction接口,说明它支持检查点,对应的方法为snapshotState()和initializeState()。这不是本文要说的东西,之后再提。

SinkFunction

SinkFunction是自定义Sink的根接口,其源码如下。

public interface SinkFunction<IN> extends Function, Serializable {
    @Deprecated
    default void invoke(IN value) throws Exception {}

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    @Public 
    interface Context<T> {
        long currentProcessingTime();

        long currentWatermark();

        Long timestamp();
    }
}

它的定义比SourceFunction要简单,只有一个invoke()方法,对收集来的每条数据都会调用它来处理。SinkFunction也有对应的上下文对象Context,可以从中获得当前处理时间、当前水印和时间戳。它也有衍生出来的富函数版本RichSinkFunction。

Flink内部提供了一个最简单的实现DiscardingSink。顾名思义,就是将所有汇集的数据全部丢弃。

@Public
public class DiscardingSink<T> implements SinkFunction<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(T value) {}
}

HBase Sink

下面是自己实现的一个简陋的(当然用起来没问题的)HBase Sink。代码里写了部分注释。

public class CalendarHBaseSink extends RichSinkFunction<JSONObject> {
    private static final long serialVersionUID = -6140896663267559061L;

    private static final Logger LOGGER = LoggerFactory.getLogger(CalendarHBaseSink.class);
    private static byte[] CF_BYTES = Bytes.toBytes("f");
    private static TableName TABLE_NAME = TableName.valueOf("dw_hbase:calendar_record");
    private Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // HBase连接封装为单例
        // RichSinkFunction.open()方法是按并行度执行的,而创建HBase连接是个很贵的操作
        connection = HBaseConnection.get();
        LOGGER.info("Opened CalendarHBaseSink");
    }

    @Override
    public void close() throws Exception {
        super.close();
        HBaseConnection.close();
        LOGGER.info("Closed CalendarHBaseSink");
    }

    // 数据预先解析为JSON
    @Override
    public void invoke(JSONObject record, Context context) throws Exception {
        Integer eventType = record.getInteger("eventtype");
        // 获取HBase中的列名映射,列名尽量短就是了
        String qualifier = EventType.getColumnQualifier(eventType);
        if (qualifier == null) {
            return;
        }

        Long uid = record.getLong("uid");
        Integer recordDate = record.getInteger("dateline");
        String data = record.getString("data");
        Long uploadTime = record.getLong("updatetime");

       // try-with-resources语法。创建Table就很轻量级了
       // 为了提高写入效率,在并发大时还可以使用HBase的BufferedMutator
        try (Table table = connection.getTable(TABLE_NAME)) {
            // 以UID和记录日期作为主键。注意设计一个好的RowKey
            Put put = new Put(Bytes.toBytes(RowKeyUtil.getForCalendar(uid, recordDate)));
            // 将上传时间作为HBase时间戳写入
            put.addColumn(CF_BYTES, Bytes.toBytes(qualifier), uploadTime * 1000, Bytes.toBytes(data));
            table.put(put);
        }
    }
}

在写这个Sink的过程中由于粗心,出了两个小插曲。

一是程序写完在本地运行时,没有任何报错信息,但就是写入不了数据。Debug时发现上传时间的JSON Field名字搞错了,实际上抛出了NPE,但在正常运行时无法发现。

二是创建检查点频繁超时,并且过一段时间就会抛出HBase连接不成功的异常。这是因为本地hosts文件中没有正确配置新的HBase集群的域名导致的,修改hosts文件之后就好了。

Streaming主程序

将Source和Sink联系起来,提交到Flink on YARN集群,就可以跑了。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(300000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // at least once的检查点就够用了
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        // 逻辑很简单,并且RocketMQ会记录offset,不必非要用FsStateBackend
        env.setStateBackend(new MemoryStateBackend(true));

        Properties consumerProps = new Properties();
        consumerProps.setProperty(NAME_SERVER_ADDR, RocketMQConst.NAME_SERVER_TEST);
        consumerProps.setProperty(CONSUMER_OFFSET_RESET_TO, "latest");
        consumerProps.setProperty(CONSUMER_TOPIC, "calendar");
        consumerProps.setProperty(CONSUMER_TAG, "*");
        consumerProps.setProperty(CONSUMER_GROUP, "FLINK_STREAM_CALENDAR_TEST_1");

        env.addSource(new RocketMQSource<>(new JSONDeserializationSchema(), consumerProps))
            .name("calendar-rocketmq-source")
            .addSink(new CalendarHBaseSink())
            .name("calendar-hbase-sink");

        env.execute();
    }

其中将RocketMQ消息解析为JSON的Schema类也很简单。

public class JSONDeserializationSchema implements KeyValueDeserializationSchema<JSONObject> {
    private static final long serialVersionUID = -649479674253613072L;

    @Override
    public JSONObject deserializeKeyAndValue(byte[] key, byte[] value) {
        return value != null ? JSON.parseObject(new String(value, StandardCharsets.UTF_8)) : null;
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}

在这里仍然用默认的处理时间作为时间特征,没有使用事件时间(即上面的uploadTime字段)。这是因为Flink中的水印目前是Operator级别的,而不是Key级别的。如果直接使用事件时间和水印的话,不同用户ID与记录日期之间的时间戳就会互相干扰,导致用户A的正常数据因为用户B的数据水印更改而被误识别为迟到数据。要解决这个问题,有两种思路:

The End

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354