1. Introduction
In practice we not only develop Flink jobs in Java, but also write Flink SQL directly and implement jobs purely as SQL. This article uses the official Flink Kubernetes Operator to run Flink SQL jobs on Kubernetes.
2. Program Overview
We build a generic Flink SQL runner: the path of a SQL script is passed as an argument to the program's entry class, and the program reads the script, parses out the individual SQL statements, and submits them as a Flink job.
The SQL script in this example reads data from Kafka, processes it with Flink, and writes the results to MySQL.
3. Flink SQL Program Development
Main program:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class SqlRunner {

    private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class);

    private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;`
    private static final String LINE_DELIMITER = "\n";

    private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";

    public static void main(String[] args) throws Exception {
        String scriptFilePath = "simple.sql";
        if (args.length == 1) {
            scriptFilePath = args[0];
        }
        String script = FileUtils.readFileUtf8(new File(scriptFilePath));
        List<String> statements = parseStatements(script);

        // Create the stream environment and set the parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Create the table environment
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        TableResult tableResult = null;
        for (String statement : statements) {
            LOG.info("Executing:\n{}", statement);
            System.out.println("Executing: " + statement);
            tableResult = tableEnvironment.executeSql(statement);
        }

        // executeSql is asynchronous; when run from the IDE the main method would exit immediately,
        // so block on the TableResult returned by the last executeSql call until the job finishes.
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

    public static List<String> parseStatements(String script) {
        String formatted = formatSqlFile(script).replaceAll(COMMENT_PATTERN, "");

        List<String> statements = new ArrayList<String>();

        StringBuilder current = null;
        boolean statementSet = false;
        for (String line : formatted.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (current == null) {
                current = new StringBuilder();
            }
            if (trimmed.startsWith("EXECUTE STATEMENT SET")) {
                statementSet = true;
            }
            current.append(trimmed);
            current.append("\n");
            if (trimmed.endsWith(STATEMENT_DELIMITER)) {
                if (!statementSet || trimmed.equals("END;")) {
                    // statements must not be submitted with a trailing semicolon
                    statements.add(current.toString().replace(";", ""));
                    current = null;
                    statementSet = false;
                }
            }
        }
        return statements;
    }

    public static String formatSqlFile(String content) {
        String trimmed = content.trim();
        StringBuilder formatted = new StringBuilder();
        formatted.append(trimmed);
        if (!trimmed.endsWith(STATEMENT_DELIMITER)) {
            formatted.append(STATEMENT_DELIMITER);
        }
        formatted.append(LINE_DELIMITER);
        return formatted.toString();
    }
}
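Since parseStatements is a public static helper, it can be sanity-checked on its own before running a real job. The snippet below is only an illustration; the table definitions inside the script string are made up for the example:

public class SqlRunnerParseDemo {
    public static void main(String[] args) {
        // A tiny script with a comment, a DDL statement and an INSERT.
        String script =
                "-- demo source\n"
              + "CREATE TABLE demo_src (id INT) WITH ('connector' = 'datagen');\n"
              + "INSERT INTO demo_sink SELECT id FROM demo_src;\n";

        // Comments are stripped and the script is split on ';',
        // so two statements are printed, each without a trailing semicolon.
        for (String statement : SqlRunner.parseStatements(script)) {
            System.out.println("----\n" + statement);
        }
    }
}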
SQL script file, placed in the project root directory:
-- source: Kafka
CREATE TABLE user_log
(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    ts          TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = '172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092',
    'properties.group.id' = 'testGroup1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- sink: MySQL
CREATE TABLE pvuv_sink
(
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT,
    primary key (dt) not enforced
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.252.113:3306/flink_test?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC',
    'table-name' = 'pvuv_sink',
    'username' = 'root',
    'password' = '111...aaa'
);

-- sink: print to the console (alternative, for debugging)
-- CREATE TABLE pvuv_sink
-- (
--     dt VARCHAR,
--     pv BIGINT,
--     uv BIGINT
-- ) WITH (
--     'connector' = 'print'
-- );

-- write the aggregated result to MySQL
INSERT INTO pvuv_sink
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
       COUNT(*) AS pv,
       COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
MySQL table creation statement:
CREATE TABLE pvuv_sink
(
    dt VARCHAR(30),
    pv BIGINT,
    uv BIGINT,
    primary key (dt)
);
Run the job locally and send messages to Kafka. The test records are JSON and must match the schema declared in the Kafka table definition; a console-producer sketch for sending them follows the sample records below.
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 01:00:00"} {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 02:00:00"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 03:36:00"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 04:36:00"}
After the test passes, change the scope of the relevant dependencies to provided when packaging, to reduce the size of the jar; a sketch of the fat-jar build follows the dependency list.
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> <scope>provided</scope> </dependency> <!-- Flink SQL 相关 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
4. Building the Flink SQL Image
mkdir flink-sql
cd flink-sql/
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
Write the Dockerfile:
[root@k8s-demo001 flink-sql]# cat Dockerfile
FROM flink:1.13.6
WORKDIR /opt/flink
COPY flink-connector-jdbc_2.12-1.13.6.jar /opt/flink/lib/flink-connector-jdbc_2.12-1.13.6.jar
COPY flink-sql-connector-kafka_2.12-1.13.6.jar /opt/flink/lib/flink-sql-connector-kafka_2.12-1.13.6.jar
COPY mysql-connector-java-5.1.49.jar /opt/flink/lib/mysql-connector-java-5.1.49.jar
RUN chown -R flink:flink /opt/flink/lib
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]
Build the image and push it to the image registry; a quick check of the image contents follows the commands.
docker build -f Dockerfile -t flink-sql:1.13.6 .
docker tag flink-sql:1.13.6 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6
docker push 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6
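To confirm the connector jars ended up in /opt/flink/lib, the image can be run with an ad-hoc command; this sketch assumes the base image's entrypoint passes unknown commands through, which is the behaviour of the official flink image:

docker run --rm flink-sql:1.13.6 ls /opt/flink/lib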
5. Submitting the Flink SQL Job
[root@k8s-demo001 ~]# cat sql-application.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: sql-application
spec:
  image: 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6  # use the flink-sql image built above
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent   # pull the image from the registry only if it is not present locally
  ingress:   # ingress configuration for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first   # default is child-first; parent-first loads Flink's own jars first and avoids class/type mismatch problems
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "Kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "Kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "Kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar   # mount the jar directory from NFS
              mountPath: /opt/flink/jar
            - name: flink-log
              mountPath: /opt/flink/log
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-log
          persistentVolumeClaim:
            claimName: flink-log-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar  # the jar is mounted via the PV
    entryClass: org.fblinux.SqlRunner
    args:   # arguments passed to the main method of the job
      - "/opt/flink/jar/simple.sql"
    parallelism: 1
    upgradeMode: stateless

[root@k8s-demo001 ~]# kubectl apply -f sql-application.yaml
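After applying the manifest, the job can be checked with kubectl; a sketch, assuming the operator names the JobManager deployment after the FlinkDeployment resource (the default behaviour):

# status reported by the operator for the custom resource
kubectl -n flink get flinkdeployment sql-application
# JobManager / TaskManager pods
kubectl -n flink get pods
# tail the JobManager log to see the "Executing: ..." output of SqlRunner
kubectl -n flink logs -f deploy/sql-application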
Send test data through Kafka; once the job has processed it, the result can be verified in MySQL with the query after the sample records.
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 11:00:00"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 12:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 13:36:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 14:36:00"}