
(6) Implementing Flink SQL on k8s

Big Data | 西门飞冰

1. Introduction

In practice we do not only develop Flink jobs in Java; we also use Flink SQL directly and implement jobs by writing SQL. This article uses the official Flink Kubernetes Operator to run Flink SQL on k8s.

2. Program Function Diagram

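We develop a generic Flink SQL runner. The path of a SQL script is passed as an argument to the program's entry class; the program reads the script, parses out the SQL statements it contains, and submits them as a Flink job.

The SQL script in this example reads data from Kafka, processes it in Flink, and writes the result to MySQL.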

image-20230528182817277

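In this example we only need to develop SqlRunner once. All job logic is defined in the SQL script, so a different job only requires a different script, which makes the program very generic.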

Main program:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class SqlRunner {

    private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class);

    private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;`
    private static final String LINE_DELIMITER = "\n";

    private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";

    public static void main(String[] args) throws Exception {
        String scriptFilePath = "simple.sql";
        if (args.length == 1) {
            scriptFilePath = args[0];
        }

        String script = FileUtils.readFileUtf8(new File(scriptFilePath));
        List<String> statements = parseStatements(script);

        // Create the stream environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Create the table environment
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        TableResult tableResult = null;
        for (String statement : statements) {
            LOG.info("Executing:\n{}", statement);
            System.out.println("Executing: " + statement);
            tableResult = tableEnvironment.executeSql(statement);
        }
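        // executeSql is asynchronous: when run from IDEA, main would return immediately,
        // so block on the TableResult returned by the last statement.
        // Guard against an empty script or a final statement (e.g. DDL) that starts no job.
        if (tableResult != null && tableResult.getJobClient().isPresent()) {
            tableResult.getJobClient().get().getJobExecutionResult().get();
        }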
    }

    public static List<String> parseStatements(String script) {
        String formatted = formatSqlFile(script).replaceAll(COMMENT_PATTERN, "");
        List<String> statements = new ArrayList<String>();

        StringBuilder current = null;
        boolean statementSet = false;
        for (String line : formatted.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (current == null) {
                current = new StringBuilder();
            }
            if (trimmed.startsWith("EXECUTE STATEMENT SET")) {
                statementSet = true;
            }
            current.append(trimmed);
            current.append("\n");
            if (trimmed.endsWith(STATEMENT_DELIMITER)) {
                if (!statementSet || trimmed.equals("END;")) {
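                    // The statement passed to executeSql must not end with a semicolon.
                    // Note: this removes every ';' in the statement, not only the trailing one.
                    statements.add(current.toString().replace(";", ""));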
                    current = null;
                    statementSet = false;
                }
            }
        }
        return statements;
    }

    public static String formatSqlFile(String content) {
        String trimmed = content.trim();
        StringBuilder formatted = new StringBuilder();
        formatted.append(trimmed);
        if (!trimmed.endsWith(STATEMENT_DELIMITER)) {
            formatted.append(STATEMENT_DELIMITER);
        }
        formatted.append(LINE_DELIMITER);
        return formatted.toString();
    }
}
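Two details of the parsing above are worth checking against your own scripts: COMMENT_PATTERN also matches `--` inside a string literal, and `replace(";", "")` removes semicolons inside string literals as well. The following is a minimal standalone sketch, not part of the original SqlRunner; the class name `SqlTextSketch` and the helper `stripTrailingSemicolon` are hypothetical names introduced here for illustration. It reuses the same regular expression and shows an alternative that removes only the final semicolon.

```java
public class SqlTextSketch {

    // Same pattern as COMMENT_PATTERN in SqlRunner
    private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";

    /** Removes line comments and block comments exactly as SqlRunner does. */
    public static String stripComments(String script) {
        return script.replaceAll(COMMENT_PATTERN, "");
    }

    /** Hypothetical helper: removes only one trailing ';', leaving inner semicolons untouched. */
    public static String stripTrailingSemicolon(String statement) {
        String trimmed = statement.trim();
        if (trimmed.endsWith(";")) {
            return trimmed.substring(0, trimmed.length() - 1).trim();
        }
        return trimmed;
    }

    public static void main(String[] args) {
        // A block comment is removed as intended
        System.out.println(stripComments("/* a */SELECT 1"));
        // Caveat: "--" inside a string literal is treated as a comment start
        System.out.println(stripComments("SELECT '--x' FROM t"));
        // The inner ';' in the literal survives, only the final one is dropped
        System.out.println(stripTrailingSemicolon("INSERT INTO t SELECT 'a;b' FROM s;\n"));
    }
}
```

If such a helper were used in place of `replace(";", "")`, the semicolons inside a statement set would be kept as well, so check that behavior against the Flink version in use before adopting it.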

SQL script file: place it in the project root directory

-- Kafka source table
CREATE TABLE user_log
(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    ts          TIMESTAMP
) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = '172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092',
      'properties.group.id' = 'testGroup1',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
      );

-- MySQL sink table
CREATE TABLE pvuv_sink (
                           dt VARCHAR,
                           pv BIGINT,
                           uv BIGINT,
                           primary key (dt) not enforced
) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://172.16.252.113:3306/flink_test?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC',
      'table-name' = 'pvuv_sink',
      'username' = 'root',
      'password' = '111...aaa'
      );

-- Alternative sink: print to the console
-- CREATE TABLE pvuv_sink
-- (
--     dt VARCHAR,
--     pv BIGINT,
--     uv BIGINT
-- ) WITH (
--       'connector' = 'print'
--       );

-- Write the aggregated results to MySQL
INSERT INTO pvuv_sink
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
       COUNT(*)                AS          pv,
       COUNT(DISTINCT user_id) AS          uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
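To make the semantics of the INSERT statement concrete, here is a plain-Java sketch of the same aggregation, assuming each event is reduced to a (user_id, ts) pair with ts formatted as `yyyy-MM-dd HH:mm:ss`. The class name `PvUvSketch` and its input layout are assumptions made for illustration; in the real job Flink maintains these counts continuously and, because pvuv_sink declares a primary key, the JDBC sink should update the row for each hour as new events arrive rather than append.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class PvUvSketch {

    /**
     * Groups events by hour and returns, per hour key, {pv, uv}:
     * pv = number of events, uv = number of distinct user ids.
     * Each event is a two-element array: {userId, "yyyy-MM-dd HH:mm:ss"}.
     */
    public static Map<String, long[]> aggregate(List<String[]> events) {
        Map<String, Long> pv = new TreeMap<>();
        Map<String, Set<String>> users = new TreeMap<>();
        for (String[] event : events) {
            String userId = event[0];
            // Mirrors DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'): keep date and hour, fix minutes to 00
            String dt = event[1].substring(0, 13) + ":00";
            pv.merge(dt, 1L, Long::sum);
            users.computeIfAbsent(dt, k -> new HashSet<>()).add(userId);
        }
        Map<String, long[]> result = new TreeMap<>();
        for (String dt : pv.keySet()) {
            result.put(dt, new long[] {pv.get(dt), users.get(dt).size()});
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"543462", "2023-03-26 01:00:00"},
                new String[] {"543462", "2023-03-26 01:30:00"},
                new String[] {"662867", "2023-03-26 01:36:00"});
        for (Map.Entry<String, long[]> e : aggregate(events).entrySet()) {
            System.out.println(e.getKey() + " pv=" + e.getValue()[0] + " uv=" + e.getValue()[1]);
        }
    }
}
```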

MySQL table DDL:

CREATE TABLE pvuv_sink (
    dt VARCHAR(30),
    pv BIGINT,
    uv BIGINT,
    primary key (dt)
);

Run the program locally and send messages to Kafka. The test data is in JSON format and must match the schema of the Kafka table defined above.

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 01:00:00"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 02:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 03:36:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 04:36:00"}
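If the test messages are generated from code rather than typed by hand, the timestamp format is the part most likely to drift from what the table expects. The sketch below builds one message line in the same shape as the samples above; the class name `UserLogMessage` and the method `toJson` are hypothetical, and it assumes the values contain no characters that need JSON escaping. The `yyyy-MM-dd HH:mm:ss` pattern matches the sample data; to my understanding this is also what Flink's JSON format accepts for a TIMESTAMP column under its default settings, but verify that against the Flink version in use.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class UserLogMessage {

    // Same timestamp layout as the sample messages, e.g. 2023-03-26 01:00:00
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Builds one JSON line with the five fields declared in the user_log table. */
    public static String toJson(String userId, String itemId, String categoryId,
                                String behavior, LocalDateTime ts) {
        return String.format(
                "{\"user_id\": \"%s\", \"item_id\": \"%s\", \"category_id\": \"%s\", "
                        + "\"behavior\": \"%s\", \"ts\": \"%s\"}",
                userId, itemId, categoryId, behavior, TS_FORMAT.format(ts));
    }

    public static void main(String[] args) {
        System.out.println(toJson("543462", "1715", "1464116", "pv",
                LocalDateTime.of(2023, 3, 26, 1, 0, 0)));
    }
}
```

The printed line can then be sent to the user_behavior topic with whatever producer tool is already in use.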

Once the test passes, change the scope of the relevant dependencies to provided before packaging, to reduce the size of the jar.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
    <scope>provided</scope>
</dependency>

<!-- Flink SQL dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

Upload the jar and simple.sql to the PV directory backing flink-jar-pvc.

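The JobManager connects to Kafka and MySQL as soon as it starts, and Flink's own lib directory contains no Kafka or MySQL jars. We therefore build a new flink-sql image that places the Kafka and MySQL jars in /opt/flink/lib.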

mkdir flink-sql
cd flink-sql/
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar

Write the Dockerfile

[root@k8s-demo001 flink-sql]# cat Dockerfile 
FROM flink:1.13.6
WORKDIR /opt/flink
COPY flink-connector-jdbc_2.12-1.13.6.jar /opt/flink/lib/flink-connector-jdbc_2.12-1.13.6.jar
COPY flink-sql-connector-kafka_2.12-1.13.6.jar /opt/flink/lib/flink-sql-connector-kafka_2.12-1.13.6.jar
COPY mysql-connector-java-5.1.49.jar /opt/flink/lib/mysql-connector-java-5.1.49.jar
RUN chown -R flink:flink /opt/flink/lib
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]

Build the image and push it to the image registry

docker build -f Dockerfile -t flink-sql:1.13.6 .
docker tag flink-sql:1.13.6 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6
docker push 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6

Write the flink-sql job YAML and submit it

[root@k8s-demo001 ~]# cat sql-application.yaml 
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: sql-application
spec:
  image: 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6  # use the flink-sql image
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent  # image pull policy: pull from the registry only if not present locally
  ingress:   # ingress configuration, used to access the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first  # default is child-first; must be parent-first so Flink's own jars load first, otherwise class type mismatch errors are likely
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "Kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "Kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "Kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar  # mount the jar stored on NFS
              mountPath: /opt/flink/jar
            - name: flink-log
              mountPath: /opt/flink/log
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-log
          persistentVolumeClaim:
            claimName: flink-log-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar # the jar is mounted from the PV
    entryClass: org.fblinux.SqlRunner
    args:   # arguments passed to the job's main method
      - "/opt/flink/jar/simple.sql"
    parallelism: 1
    upgradeMode: stateless
[root@k8s-demo001 ~]# kubectl apply -f sql-application.yaml

Verify that the pods were created

image-20230529143544319

Send test data through Kafka

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 11:00:00"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 12:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 13:36:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 14:36:00"}

When reposting, please credit: 西门飞冰的博客 » (6) Implementing Flink SQL on k8s
