
Flink Table API & SQL Programming


Table of Contents

Maven Dependencies

DEMO1: Consuming from and Producing to Kafka Topics

DEMO2: Converting Between Table and DataStream

DEMO3: Time Windows

Using the sql-client

Running CREATE TABLE Statements

filesystem source

kafka source

Running Queries



Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- json -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- csv -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
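The POM above references ${flink.version} without defining it. A minimal properties block, assuming Flink 1.13.x (the _2.11 suffixes pin Scala to 2.11, and flink-table-planner-blink ships only up to the 1.13 line):

<properties>
    <!-- hypothetical version pin; any 1.13.x release should match the demos below -->
    <flink.version>1.13.6</flink.version>
</properties>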

DEMO1: Consuming from and Producing to Kafka Topics

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Description: Read from Kafka topic kafkaInput, filter, and write the matching rows to kafkaOutput.
 * Sample data:
 * {"id":"1","user_id":"1001","status":"1"}
 * {"id":"2","user_id":"1002","status":"1"}
 * {"id":"3","user_id":"1003","status":"1"}
 * {"id":"4","user_id":"1004","status":"1"}
 * {"id":"5","user_id":"1005","status":"0"}
 */
public class Demo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE kafka_input(id BIGINT, user_id BIGINT, status STRING) " +
                "WITH ('connector' = 'kafka'," +
                "'topic' = 'kafkaInput'," +
                "'properties.bootstrap.servers' = 'vm01:9092'," +
                "'properties.group.id' = 'testGroup'," +
                "'scan.startup.mode' = 'latest-offset'," +
                "'format' = 'json')");
        Table table = tableEnv.sqlQuery("SELECT * FROM kafka_input WHERE status='1'");
        tableEnv.executeSql("CREATE TABLE kafka_output(id BIGINT, user_id BIGINT, status STRING) " +
                "WITH ('connector'='kafka'," +
                "'topic'='kafkaOutput'," +
                "'properties.bootstrap.servers' = 'vm01:9092'," +
                "'format'='json'," +
                "'sink.partitioner'='round-robin')");
        // Concatenating the Table registers it under a generated unique name in the SQL string.
        tableEnv.executeSql("INSERT INTO kafka_output SELECT * FROM " + table);
        // Would report "No operators defined in streaming topology. Cannot execute."
        // but this does not affect the job:
        // tableEnv.execute("Run TableapiKafkaDemo");
    }
}
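To smoke-test the demo, you can feed the sample JSON into kafkaInput and watch kafkaOutput with the standard Kafka console tools. A sketch, assuming the broker at vm01:9092 and that both topics already exist:

# paste the sample JSON lines into this producer
kafka-console-producer.sh --bootstrap-server vm01:9092 --topic kafkaInput
# in a second terminal; only rows with status='1' should arrive
kafka-console-consumer.sh --bootstrap-server vm01:9092 --topic kafkaOutput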

DEMO2: Converting Between Table and DataStream

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Demo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // kafka table -> stream
        // table2Stream(tEnv);
        // stream -> table -> stream
        stream2Table(env, tEnv);
        env.execute();
    }

    /**
     * kafka table -> stream, then print()
     * Data format: 1,Jordan,male
     * @param tEnv StreamTableEnvironment
     */
    private static void table2Stream(StreamTableEnvironment tEnv) {
        String createTableSql = "CREATE TABLE user_info(" +
                "id INT," +
                "name STRING," +
                "gender STRING" +
                ") WITH (" +
                "'connector'='kafka'," +
                "'topic'='user_info'," +
                "'properties.bootstrap.servers'='vm01:9092'," +
                "'properties.group.id'='testGroup'," +
                "'scan.startup.mode'='latest-offset'," +
                "'format'='csv'" +
                ")";
        tEnv.executeSql(createTableSql);
        Table table = tEnv.sqlQuery("SELECT * FROM user_info");
        tEnv.toDataStream(table).print();
    }

    /**
     * stream -> table -> stream
     *
     * @param env  StreamExecutionEnvironment
     * @param tEnv StreamTableEnvironment
     */
    private static void stream2Table(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {
        DataStream<String> dataStream = env.fromElements("Jordan", "Alice", "Tom");
        Table inputTable = tEnv.fromDataStream(dataStream);
        tEnv.createTemporaryView("INPUT_TABLE", inputTable);
        // The single String field is exposed under the default column name f0.
        Table resultTable = tEnv.sqlQuery("SELECT UPPER(f0) FROM INPUT_TABLE");
        resultTable.printSchema();
        tEnv.toDataStream(resultTable).print();
    }
}
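Note that toDataStream() only supports insert-only results. A query that updates rows it has already emitted, such as a plain GROUP BY aggregation, needs toChangelogStream() instead. A minimal sketch, reusing the INPUT_TABLE view from stream2Table:

// An updating query: the count per name changes as new rows arrive,
// so the result is a changelog rather than an append-only stream.
Table counts = tEnv.sqlQuery("SELECT f0, COUNT(*) AS cnt FROM INPUT_TABLE GROUP BY f0");
// toDataStream(counts) would fail here; toChangelogStream emits +I/-U/+U rows
tEnv.toChangelogStream(counts).print();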

DEMO3: Time Windows

When creating the table, define a TIMESTAMP(3) column to serve as the window time attribute, declare a 5-second watermark on it via WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND, and then apply windows and aggregate functions as usual.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Kafka data format: 1,jordan,79,2022-03-05 00:00:30
 */
public class Demo03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        String createTableSql = "CREATE TABLE user_info(" +
                "id INT," +
                "name STRING," +
                "score INT," +
                "row_time TIMESTAMP(3)," +
                "WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND" +
                ") WITH (" +
                "'connector'='kafka'," +
                "'topic'='t_score'," +
                "'properties.bootstrap.servers'='vm01:9092'," +
                "'properties.group.id'='testGroup'," +
                "'scan.startup.mode'='latest-offset'," +
                "'format'='csv'" +
                ")";
        tEnv.executeSql(createTableSql);
        Table table = tEnv.sqlQuery("SELECT * FROM user_info");
        Table resultTable = table
                .window(Tumble.over(lit(10).seconds()).on($("row_time")).as("w"))
                // sliding-window alternative:
                // .window(Slide.over(lit(10).seconds()).every(lit(20).seconds()).on($("row_time")).as("w"))
                .groupBy($("name"), $("w"))
                .select($("name"), $("score").avg(), $("w").end().as("hour"));
        resultTable.printSchema();
        tEnv.toDataStream(resultTable).print();
        env.execute("Demo03");
    }
}
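For comparison, the same tumbling-window aggregation can be written in SQL against the user_info table. A sketch using the group-window syntax (the windowing TVF form appears in the sql-client section below):

SELECT
    name,
    AVG(score),
    TUMBLE_END(row_time, INTERVAL '10' SECOND) AS w_end
FROM user_info
GROUP BY name, TUMBLE(row_time, INTERVAL '10' SECOND);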

Using the sql-client

# Start the cluster
${FLINK_HOME}/bin/start-cluster.sh
# Start the SQL client
${FLINK_HOME}/bin/sql-client.sh

At this point you can execute SQL statements interactively.
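For example, a short interactive session might look like this (the result-mode setting is an optional assumption; tableau mode prints results directly to the terminal):

SET 'sql-client.execution.result-mode' = 'tableau';
SHOW TABLES;
SELECT UPPER('flink');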

You can also execute a SQL script as follows:

# ~/file.sql must be a valid SQL script; -f can also be written as --file
${FLINK_HOME}/bin/sql-client.sh -f ~/file.sql

Running CREATE TABLE Statements

If you want to use time windows, you must define a watermark.

filesystem source

CREATE TABLE bid(
    bidtime TIMESTAMP(3),
    price DECIMAL(10, 2),
    item STRING,
    WATERMARK FOR bidtime AS bidtime - INTERVAL '10' MINUTES
) WITH (
    'connector' = 'filesystem',
    'path' = '/root/data/bid.csv',
    'format' = 'csv'
);

kafka source

CREATE TABLE user_info(
    id INT,
    name STRING,
    score INT,
    row_time TIMESTAMP(3),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
    'connector'='kafka',
    'topic'='t_score',
    'properties.bootstrap.servers'='vm01:9092',
    'properties.group.id'='testGroup',
    'scan.startup.mode'='latest-offset',
    'format'='csv'
);

Running Queries

Ordinary queries look much like standard SQL; the built-in functions can be listed with SHOW FUNCTIONS.
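For example (output abbreviated; the exact function list depends on the Flink version):

SHOW FUNCTIONS;
-- built-in functions can then be called directly, e.g.:
SELECT CONCAT('a', 'b'), DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd');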

The following demonstrates window functions.

-- queries the table created above under "Running CREATE TABLE Statements -> filesystem source"
SELECT window_start, window_end, SUM(price)
FROM TABLE(TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
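A sliding-window variant of the same query, assuming the same bid table (in the HOP function the slide step comes before the window size):

SELECT window_start, window_end, SUM(price)
FROM TABLE(HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;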
