(1) Demo: Writing Data to MySQL with FlinkSQL

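FlinkSQL significantly lowers the barrier to programming with Flink and makes it easier to understand and use. Today I'm sharing my notes in the hope that they help anyone who needs this.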

(1) First, add the POM dependencies:

<properties>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.0</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
</dependencies>

(2) Write the code

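import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// The class name is illustrative; the original notes show only the main method.
public class FlinkSqlMysqlDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

        // Register a Flink table backed by the MySQL table `flinksink` (legacy connector.* option keys).
        String ddl = "CREATE TABLE flinksinksds(\n" +
                "componentname STRING,\n" +
                "componentcount INT,\n" +
                "componentsum INT\n" +
                ") WITH(\n" +
                "'connector.type'='jdbc',\n" +
                "'connector.driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\n" +
                "'connector.table'='flinksink',\n" +
                "'connector.username'='root',\n" +
                "'connector.password'='root',\n" +
                // Flush after every row so the demo record shows up immediately.
                "'connector.write.flush.max-rows'='1'\n" +
                ")";
        System.err.println(ddl);
        ste.executeSql(ddl);

        String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +
                "values('1024', 1 , 2 )";
        // executeSql() submits the INSERT job itself. Calling env.execute() afterwards fails
        // because no DataStream operators are defined, so wait for the job to finish instead.
        ste.executeSql(insert).await();
        System.exit(0);
    }
}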
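
The DDL above uses the legacy `connector.*` option keys. Since Flink 1.11 the JDBC connector documentation also describes a newer key set (`connector`, `url`, `table-name`, `username`, `password`, `driver`, `sink.buffer-flush.max-rows`). Below is a minimal sketch of the same table definition built with those keys. The class `JdbcSinkDdl` and its `build` helper are hypothetical names introduced only for illustration, and this variant has not been run against the setup in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class JdbcSinkDdl {

    /** Builds the CREATE TABLE statement with the newer JDBC connector option keys. */
    public static String build(String flinkTable, String url, String mysqlTable,
                               String username, String password) {
        // LinkedHashMap keeps the options in insertion order, so the DDL is stable.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "jdbc");
        options.put("driver", "com.mysql.cj.jdbc.Driver");
        options.put("url", url);
        options.put("table-name", mysqlTable);
        options.put("username", username);
        options.put("password", password);
        // Counterpart of 'connector.write.flush.max-rows' = '1' in the legacy DDL.
        options.put("sink.buffer-flush.max-rows", "1");

        StringJoiner with = new StringJoiner(",\n", " WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add("  '" + e.getKey() + "' = '" + e.getValue() + "'");
        }
        return "CREATE TABLE " + flinkTable + " (\n"
                + "  componentname STRING,\n"
                + "  componentcount INT,\n"
                + "  componentsum INT\n"
                + ")" + with;
    }

    public static void main(String[] args) {
        // Print the statement; it could be passed to ste.executeSql(...) in place of the legacy DDL.
        System.out.println(build("flinksinksds",
                "jdbc:mysql://localhost:3306/testdb?useSSL=false",
                "flinksink", "root", "root"));
    }
}
```

The string returned by `build` is meant to be passed to `executeSql` exactly like the `ddl` variable in the demo; the rest of the program stays unchanged.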

(3) Execution result:

[Figure: screenshot of the execution result]
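
The Flink JDBC sink writes into an existing MySQL table and does not create one, so `flinksink` has to exist in `testdb` before the job runs. Here is a minimal sketch for preparing the table and checking the inserted row with plain JDBC. The column types (`VARCHAR(255)`, `INT`) are assumptions chosen to match the Flink schema, since the original MySQL table definition is not shown, and `FlinkSinkCheck` is a hypothetical helper class.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class FlinkSinkCheck {

    // Assumed MySQL schema mirroring the Flink table columns.
    public static final String CREATE_TABLE_SQL =
            "CREATE TABLE IF NOT EXISTS flinksink ("
            + "componentname VARCHAR(255), componentcount INT, componentsum INT)";

    // Counts the rows written by the demo INSERT (componentname = '1024').
    public static final String COUNT_SQL =
            "SELECT COUNT(*) FROM flinksink WHERE componentname = '1024'";

    /** Creates the target table if it does not exist yet. */
    public static void ensureTable(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.executeUpdate(CREATE_TABLE_SQL);
        }
    }

    /** Returns how many demo rows are currently in the table. */
    public static long countDemoRows(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(COUNT_SQL)) {
            rs.next(); // COUNT(*) always yields exactly one row
            return rs.getLong(1);
        }
    }
}
```

A `Connection` can be obtained with `DriverManager.getConnection` using the same URL, user name, and password as in the DDL. Call `ensureTable` before starting the Flink job and `countDemoRows` after it finishes.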
