1.环境情况
flink 1.11
kafka 2.4.0
2. 操作准备
a. avro 的jar 包 avro-tools-1.7.7.jar
b. 对象的 XXX.avsc 文件 ,描述序列化各个字段的属性类型。
namespace:命名路径
type:固定写法 record
name:java对象类名
fields:字段描述
{
"namespace":"com.test.avro",
"type":"record",
"name":"User",
"fields":[
{
"name":"name",
"type":"string"
},
{
"name":"age",
"type":[
"int",
"null"
]
},
{
"name":"email",
"type":[
"string",
"null"
]
},
{
"name":"custom_props",
"type":[
"null",
{
"type":"map",
"values":"string"
}
]
}
]
}
3. 执行步骤
a第一步.使用windows 自动生成 XXX.java 文件
命令:C:\Users\colin>java -jar F:\repository\org\apache\avro\avro-tools\1.7.7\avro-tools-1.7.7.jar compile -string schema C:\Users\colin\IdeaProjects\logs_behavior_analysis_v2\ods_logs_kafka\src\main\resources\avro\Logs.avsc ./
java -jar + tools路径+ copile -string schema + XXX.avsc路径+ XXX.java存放路径
注意:不加-string 则默认使用 CharSequence类型,遇到map就会自动加密(base64加密方式),本人再这里搞了很久都解析不出map中的字段。所以建议使用 -string指定字段属性为String。
image.png
生成的java文件截图如下:
image.png
b第二步:放入程序中,引用此对象
sink的序列化
AvroSerializationSchema<Logs> avroSerializationSchema = AvroSerializationSchema.forSpecific(Logs.class);
//Logs.class 这个就是你生成的 XXX.java文件
final FlinkKafkaProducer010<Logs> myProducer = new FlinkKafkaProducer010<>(
topic,
avroSerializationSchema,
prop);
Source 反序列化
AvroDeserializationSchema<Logs> avroDeserializationSchema = AvroDeserializationSchema.forSpecific(Logs.class);
DataStream<Logs> kafkaStream = env.addSource(new FlinkKafkaConsumer010<Logs>("colin_topic", avroDeserializationSchema, properties))
4. tools maven
通过这样tools下载包
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.7.7</version>
</dependency>