11-flink读写MySQL

一、读MySQL

1、通过JDBC方式定义MySQLDataSource类

1.1首先加入JDBC依赖
1.2定义JDBCInputFormat
1.3获取Row类型的DataStreamSource
1.4转化DataStream<Row>为DataStream<Student>

public class MysqlDataSource {

    private static final Logger log = LoggerFactory.getLogger(MySQLDataSource.class);

    public static DataStream<Student> readFromDb(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        //final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.定义field 类型
        TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
        //2.定义field name
        String[] fieldNames = new String[]{"id", "name", "password", "age"};
        //3.定义Row类型
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
        
        String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
        log.info(jdbcUrl);

        //4.定义JDBCInputFormat
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat
                .buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl(jdbcUrl)
                .setUsername(parameterTool.get(PropertiesConstants.MYSQL_USERNAME))
                .setPassword(parameterTool.get(PropertiesConstants.MYSQL_PASSWORD))
                .setQuery("select id, name, password, age from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        //5.以JDBCInputFormat形式读取MySQL DB数据
        DataStreamSource<Row> dataStreamSourceRow = streamExecutionEnvironment.createInput(jdbcInputFormat);

        //阶段性验证可以正确读取
        dataStreamSourceRow.print();

        //6.将Row类型Stream转化为Entity类型
        DataStream<Student> dataStream = dataStreamSourceRow.map(new RichMapFunction<Row, Student>() {
            @Override
            public Student map(Row value) throws Exception {
                Student s = new Student();
                s.setId((Integer) value.getField(0));
                s.setName((String) value.getField(1));
                s.setPassword((String) value.getField(2));
                s.setAge((Integer) value.getField(3));
                return s;
            }
        });

        log.info("read datasource end");
        return dataStream;
}

2、通过自定义DataSource方式

  • 实现RichSourceFunction<T>接口,T设置DataStream数据类型
  • 使用模板
    • open()方法初始化全局使用数据(比如PrepareStatement等,可类比构造函数或者junit的的@Before这些)
    • run()方法
      • 一般使用while循环不断获取数据
      • while获取的数据需要以流的形式发送出去,使用SourceContext.collect(yourData)就好
      • 这里sourceContext收集(collect)的数据可以是单条(一条Student)也可是List<Student>集合,使用集合要把RichSourceFunction<T>泛型设为List<Student>
    • cancel()方法用于停止while循环,即停止获取数据
/**
 * 通过RichSourceFunction 返回DataStream<Student>类型数据流,且每隔10s读取一次MySQL DB
 */
public class JdbcReader2 extends RichSourceFunction<Student> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader2.class);

    private Connection connection = null;
    private PreparedStatement ps = null;
    private volatile boolean isRunning = true;

    //该方法主要用于打开数据库连接,下面的ConfigKeys类是获取配置的类
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        DriverManager.registerDriver(new Driver());
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
        String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
        connection = DriverManager.getConnection(jdbcUrl, "root", "abc123456");//获取连接
        ps = connection.prepareStatement("select id, name, password, age, flag from student where flag='true'");
    }

    //执行查询并获取结果
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {

        // List<Student> students = new ArrayList<>();

        try {
            while (isRunning) {
                ResultSet resultSet = ps.executeQuery();
                while (resultSet.next()) {

                    Student student = new Student();
                    student.setId(resultSet.getInt(1));
                    student.setName(resultSet.getString(2));
                    student.setPassword(resultSet.getString(3));
                    student.setAge(resultSet.getInt(4));

                    String flag = resultSet.getString(5);
                    student.setFlag(flag);

                    if (Boolean.parseBoolean(flag)) {
                        //students.add(student);
                        //以单个Student为单位发送数据
                        ctx.collect(student);//发送结果
                        logger.info("student >>>>>>" + student);
                    }
                }

                Thread.sleep(1000 * 5);
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }

    //关闭数据库连接
    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
        isRunning = false;
    }
}

3、两种方式对比

  • JDBC需要引入单独的依赖,自定义DataSource方式无特殊依赖
  • JDBC不论读还是写只能处理批数据,RichSourceFunction还是付接口SourceFunction都是流式接口

二、写MySQL

1、通过JDBC方式

Table API提供通过JDBC写MySQL的方式

  • 获取Table(可以通过DataStream转化而来)-table
  • 将table注册到Environment(作为临时view)-tempView
  • 创建inner-dest-table->out-dest-table映射(inner-dest-table是flink内部表,通过insert数据到inner-dest-table 将数据插入到out-dest-table中
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.添加数据源
        DataStream<Student> studentDataStream = env.addSource(new JdbcReader2());


        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, settings);

        //2.从DataStream获取数据
        Table table = streamTableEnvironment.fromDataStream(studentDataStream);

        streamTableEnvironment.createTemporaryView("temp_table", table);

        //3.创建sink内部Table
        String destSql = FileUtil.readSourceFile("destination.sql");
        streamTableEnvironment.sqlUpdate(destSql);

        //4.将内部Table插入到outer system
        String insertSql = FileUtil.readSourceFile("insert.sql");

        streamTableEnvironment.sqlUpdate(insertSql);


        env.execute("sort-streaming-data");

        log.info("end");
  • sql文件,保存到resources目录,并用FileUtils读取(仅仅是外置SQL而已也可直接写到代码中)
#destination.sql
CREATE TABLE student_dest (
                              id INT,
                              name VARCHAR,
                              password VARCHAR,
                              age INT
) WITH (
    'connector.type' = 'jdbc', -- 使用 jdbc connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink_demo', -- jdbc url
    'connector.table' = 'student_2', -- 表名
    'connector.username' = 'root', -- 用户名
    'connector.password' = 'abc123456', -- 密码
    'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条
)

#insert.sql, temp_table为代码临时table
INSERT INTO student_dest
SELECT
    id,
    name,
    password,
    age
FROM temp_table

2、通过自定义Sink方式

  • 通过实行RichSinkFunction接口
  • 步骤(同RichSourceFunction是一致的)
    • open() 初始化数据
    • invoke() 每获取一次数据将其处理存储到outer system
    • close() 清理及关闭资源
public class MySQLSink extends RichSinkFunction<Student> {

    private static final Logger log = LoggerFactory.getLogger(MySQLSink.class);

    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into student_2(id, name, password, age) values(?, ?, ?, ?);";
        if (connection != null) {
            ps = this.connection.prepareStatement(sql);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        //关闭连接和释放资源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每条数据的插入都要调用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        if (ps == null) {
            return;
        }
        //遍历数据集合
        Student student = value;
        //for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        //}
        int[] count = ps.executeBatch();//批量后执行
        log.info("成功了插入了 {} 行数据", count.length);
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
        dataSource.setUrl("jdbc:mysql://localhost:3306/flink_demo");
        dataSource.setUsername("root");
        dataSource.setPassword("abc123456");
        //设置连接池的一些参数
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            log.info("创建连接池:{}", con);
        } catch (Exception e) {
            log.error("-----------mysql get connection has exception , msg = {}", e.getMessage());
        }
        return con;
    }
}

3、jdbc和Sink方式对比

  • JDBC是用sqlQuery()和sqlUpdate()来执行所有查询和insert/update更新操作,只需要将创建table,insert语句用SQL整理到一起即可,通过insert语句插入out system
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355

推荐阅读更多精彩内容