之前在网上找了个例子:https://www.freesion.com/article/53297012/ 写的挺好。
github已经有开源地址RocketMQ-Flink。(读取可以根据github和这个例子写)
上面那个例子写了String的反序列化类SimpleStringDeserializationSchema。没有写对应的序列化类。
1、今天写项目就恰好用到了。记录下:
import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
import java.nio.charset.StandardCharsets;
public class SimpleStringSerializationSchema implements KeyValueSerializationSchema<String> {
private static final long serialVersionUID = 609645086382422113L;
@Override
public byte[] serializeKey(String key) {
return null;
}
@Override
public byte[] serializeValue(String value) {
return value != null ? value.getBytes(StandardCharsets.UTF_8) : null;
}
}
注意:serialVersionUID 这个是用idea生成的。在自己包下重新生成下,以免有问题。(方法:https://blog.csdn.net/ShotMoon/article/details/80496407)
2、写入MQ:
Properties props1 = new Properties();
props1.setProperty(RocketMQConfig.NAME_SERVER_ADDR, Config.getMQServer());
props1.setProperty(RocketMQConfig.PRODUCER_GROUP,Config.getProduceGroup());
DefaultTopicSelector selector = new DefaultTopicSelector(Config.getTopic(),Config.getTagName());
KeyValueSerializationSchema schema1 = new SimpleStringSerializationSchema();
result.addSink(new RocketMQSink(schema1, selector, props1));
具体参数按照自己的写就行。
完美。