flink读取写入RocketMQ(String)

之前在网上找了个例子:https://www.freesion.com/article/53297012/ 写的挺好。
github已经有开源地址RocketMQ-Flink。(读取可以根据github和这个例子写)

上面那个例子写了String的反序列化类SimpleStringDeserializationSchema。没有写对应的序列化类。

1、今天写项目就恰好用到了。记录下:

import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;

import java.nio.charset.StandardCharsets;

public class SimpleStringSerializationSchema implements KeyValueSerializationSchema<String> {
    private static final long serialVersionUID = 609645086382422113L;

    @Override
    public byte[] serializeKey(String key) {
        return null;
    }

    @Override
    public byte[] serializeValue(String value) {
        return value != null ? value.getBytes(StandardCharsets.UTF_8) : null;
    }
}

注意:serialVersionUID 这个是用idea生成的。在自己包下重新生成下,以免有问题。(方法:https://blog.csdn.net/ShotMoon/article/details/80496407

2、写入MQ:

  Properties props1 = new Properties();
  props1.setProperty(RocketMQConfig.NAME_SERVER_ADDR, Config.getMQServer());
  props1.setProperty(RocketMQConfig.PRODUCER_GROUP,Config.getProduceGroup());
  DefaultTopicSelector selector = new DefaultTopicSelector(Config.getTopic(),Config.getTagName());

 KeyValueSerializationSchema schema1 = new SimpleStringSerializationSchema();
 result.addSink(new RocketMQSink(schema1, selector, props1));

具体参数按照自己的写就行。

完美。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容