当前位置: 代码迷 >> 综合 >> FlinkMysqlSourceFlinkMysqlSink
  详细解决方案

FlinkMysqlSourceFlinkMysqlSink

热度:97   发布时间:2023-09-18 17:10:02.0

/**

 * 自定义Mysql Source

 */

public class CustomerMysqlSourceDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获得自定义Source对象

        DataStreamSource<UserInfo> mysqlSource = env.addSource(new MyMysqlSource());

        mysqlSource.print();

        env.execute("CustomerMySQLSourceDemo");

    }

    /**

     自定义Mysql Source实现类

     */

    public static class MyMysqlSource extends RichSourceFunction<UserInfo> {

        private Connection connection = null;       // 定义数据库连接对象

        private PreparedStatement ps = null;        // 定义PreparedStatement对象

        /*

        使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接

         */

        @Override

        public void open(Configuration parameters) throws Exception {

            super.open(parameters);

            // 加载数据库驱动

            Class.forName("com.mysql.jdbc.Driver");

            // 创建数据库连接

            String url = "jdbc:mysql://node01:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";

            this.connection = DriverManager.getConnection(url, "root", "123456");

            // 准备PreparedStatement对象

            this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");

        }

        /*

        使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接

         */

        @Override

        public void close() throws Exception {

            super.close();

            // 关闭资源

            if (this.ps != null) this.ps.close();

            if (this.connection != null) this.connection.close();

        }

        @Override

        public void run(SourceContext<UserInfo> ctx) throws Exception {

            ResultSet resultSet = ps.executeQuery();

            while (resultSet.next()) {

                int id = resultSet.getInt("id");

                String username = resultSet.getString("username");

                String password = resultSet.getString("password");

                String name = resultSet.getString("name");

                ctx.collect(new UserInfo(id, username, password, name));

            }

        }

        @Override

        public void cancel() {

            System.out.println("任务被取消......");

        }

    }

    /**

     数据定义类, POJO

     */

    @Data

    @AllArgsConstructor

    @NoArgsConstructor

    public static class UserInfo {

        int id;

        String username;

        String password;

        String name;

    }

}

/**

 * 从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL

 */

public class JDBCSinkDemo {

    public static void main(String[] args) throws Exception {

        //创建Flink流计算执行环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000);

        //创建DataStream

        //Source

        DataStreamSource<String> lines = env.socketTextStream("node01", 9999);

        //调用Transformation

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override

            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {

                    //new Tuple2<String, Integer>(word, 1)

                    collector.collect(Tuple2.of(word, 1));

                }

            }

        });

        //分组

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override

            public String getKey(Tuple2<String, Integer> tp) throws Exception {

                return tp.f0;

            }

        });

        //聚合

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        summed.addSink(JdbcSink.sink(

                "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?",

                (ps, t) -> {

                    ps.setString(1, t.f0);

                    ps.setInt(2, t.f1);

                    ps.setInt(3, t.f1);

                },

                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

                        .withUrl("jdbc:mysql://node03:3306/test?characterEncoding=utf-8")

                        .withDriverName("com.mysql.jdbc.Driver")

                        .withUsername("root")

                        .withPassword("123456")

                        .build()));

        //启动执行

        env.execute("JDBCSinkDemo");

    }

}

  相关解决方案