1. Introduction
Metadata checkpointing is mainly used in Spark Streaming so that the Driver can quickly recover the job's DAG and state data after a failure, while RDD checkpointing persists the data of stateful transformation operators to cut the lineage chain and shorten a Spark program's recovery time. In a Spark on K8s environment, checkpoint data can be stored on a PVC + PV, or on HDFS or S3.
2. Checkpoint Program Development
To test Spark checkpointing on Spark on K8s, we develop a WordCount example program that reads text messages from Kafka, counts how many times each word appears in the messages, and saves the results to MySQL.
1. Develop the example program: it is written in Java and built with Maven; the Java version is 1.8, the Spark version is 3.2.3, and the Scala version is 2.12.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.sql.*;
import java.util.*;
import java.util.regex.Pattern;

/**
 * Reads text messages from Kafka, splits each line into words, counts the occurrences
 * of each word, and saves the results to MySQL.
 */
public class StreamWordCount {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // For local testing
        int batchDuration = 5;
        String brokers = "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092";
        String topics = "spark_test";
        String checkpoint = "file:///Users/willwang/spark-on-k8s-demo/checkpoint";
        String dbHost = "172.16.252.113";
        String dbPort = "3306";
        String dbName = "spark_test";
        String table = "wc";
        SparkConf sparkConf = new SparkConf().setAppName("StreamWordCount")
                .setMaster("local[1]");

        // Use the following code instead when running on K8s
        // if (args.length < 8) {
        //     System.err.println("Usage: StreamWordCount <batchDuration> <brokers> <topics> <checkpoint> " +
        //             "<dbHost> <dbPort> <dbName> <table>");
        //     System.exit(1);
        // }
        // int batchDuration = Integer.parseInt(args[0]);
        // String brokers = args[1];
        // String topics = args[2];
        // String checkpoint = args[3];
        // String dbHost = args[4];
        // String dbPort = args[5];
        // String dbName = args[6];
        // String table = args[7];
        // SparkConf sparkConf = new SparkConf().setAppName("StreamWordCount");

        // Create the Spark Streaming context: if the checkpoint path already contains data,
        // recover from the checkpoint; otherwise create a new context.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
            JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
            // Set the checkpoint path
            context.checkpoint(checkpoint);

            // Kafka connection parameters
            Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-wc");
            kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            // Create the Kafka stream
            JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

            // Extract the Kafka message values
            JavaDStream<String> lines = messages.map(ConsumerRecord::value);

            // Split each message into words
            JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

            // Stateful mapping function that accumulates the count of each word
            Function3<String, org.apache.spark.api.java.Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                    (word, one, state) -> {
                        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                        state.update(sum);
                        return output;
                    };

            // Apply the stateful mapping function
            JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> wordCounts =
                    words.mapToPair(s -> new Tuple2<>(s, 1)).mapWithState(StateSpec.function(mappingFunc));

            // Print the counts
            wordCounts.print();

            // Save the counts to MySQL
            wordCounts.foreachRDD(rdd -> {
                // Process the RDD by partition rather than element by element
                rdd.foreachPartition(iter -> {
                    // Connect to MySQL
                    Connection conn;
                    PreparedStatement insertPS;
                    PreparedStatement updatePS;
                    String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName
                            + "?characterEncoding=utf8&autoReconnect=true&useUnicode=true&useSSL=false";
                    // Open the MySQL connection
                    conn = DriverManager.getConnection(url, "root", "<database password>");
                    // Iterate over the records in the partition and save them to MySQL
                    while (iter.hasNext()) {
                        Tuple2<String, Integer> t = iter.next();
                        // Check whether the word has already been saved to MySQL
                        String sql = "select count(*) from " + table + " where word = '" + t._1() + "'";
                        Statement statement = conn.createStatement();
                        ResultSet resultSet = statement.executeQuery(sql);
                        long cnt = 0;
                        while (resultSet.next()) {
                            cnt = resultSet.getLong(1);
                        }
                        resultSet.close();
                        statement.close();
                        if (cnt > 0) {
                            // Already present: update the count
                            String sql2 = "UPDATE " + table + " set cnt = ? where word = ?;";
                            updatePS = conn.prepareStatement(sql2);
                            updatePS.setLong(1, t._2());
                            updatePS.setString(2, t._1());
                            updatePS.executeUpdate();
                            updatePS.close();
                        } else {
                            // Not present yet: insert a new row
                            String sql2 = "INSERT INTO " + table + " (`word`, `cnt`) VALUES(?,?);";
                            insertPS = conn.prepareStatement(sql2);
                            insertPS.setString(1, t._1());
                            insertPS.setLong(2, t._2());
                            insertPS.executeUpdate();
                            insertPS.close();
                        }
                    }
                    // Close the connection
                    conn.close();
                });
            });
            return context;
        });

        // Start the streaming job
        jssc.start();
        jssc.awaitTermination();
    }
}
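The program expects the target table to exist in MySQL before the job starts. The original does not show the DDL, so the following is only a minimal sketch: the database spark_test, table wc, and columns word/cnt come from the code above, while the column types are assumptions inferred from the setString/setLong calls; adjust types and credentials to your environment.

# Hypothetical DDL for the wc table (column types are assumptions)
mysql -h 172.16.252.113 -P 3306 -u root -p -e "
CREATE TABLE IF NOT EXISTS spark_test.wc (
  word VARCHAR(255) NOT NULL PRIMARY KEY,
  cnt  BIGINT       NOT NULL DEFAULT 0
);"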
3. Writing Spark Checkpoints to a PV
1. Create the checkpoint PVC
[root@k8s-demo001 ~]# cat spark-checkpoint-pvc.yaml
# PVC for persisting Spark checkpoint data
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-checkpoint-pvc   # name of the Spark checkpoint PVC
  namespace: apache-spark      # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage   # StorageClass name; change to your actual SC
  accessModes:
    - ReadWriteMany               # use the ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi                # storage capacity; adjust as needed
[root@k8s-demo001 ~]# kubectl apply -f spark-checkpoint-pvc.yaml
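Before moving on, it can be worth confirming that the PVC has been bound by the NFS StorageClass, for example:

# The STATUS column should show Bound
[root@k8s-demo001 ~]# kubectl get pvc spark-checkpoint-pvc -n apache-spark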
2. Write custom driver and executor pod template files. Both the driver and the executor must mount the checkpoint PVC; otherwise Spark will fail when writing checkpoints.
driver:
[root@k8s-demo001 ~]# cat /data/nginx_down/driver-cp-pv.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: driver
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: driver
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar            # mount for application jars
          mountPath: /opt/spark/appJars
        - name: spark-historyserver  # mount for the eventLog archive directory
          mountPath: /opt/spark/eventLog
        - name: spark-logs           # mount for logs
          mountPath: /opt/spark/logs
        - name: spark-checkpoint     # mount for the checkpoint path
          mountPath: /opt/spark/checkpoint-wc
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-historyserver
      persistentVolumeClaim:
        claimName: spark-historyserver-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
    - name: spark-checkpoint
      persistentVolumeClaim:
        claimName: spark-checkpoint-pvc
executor:
[root@k8s-demo001 ~]# cat /data/nginx_down/executor-cp-pv.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar        # mount for application jars
          mountPath: /opt/spark/appJars
        - name: spark-logs       # mount for logs
          mountPath: /opt/spark/logs
        - name: spark-checkpoint # mount for the checkpoint path
          mountPath: /opt/spark/checkpoint-wc
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
    - name: spark-checkpoint
      persistentVolumeClaim:
        claimName: spark-checkpoint-pvc
3. Submit the Spark job
./spark-3.2.3/bin/spark-submit \
--name StreamWordCount \
--verbose \
--master k8s://https://172.16.252.105:6443 \
--deploy-mode cluster \
--conf spark.network.timeout=300 \
--conf spark.executor.instances=1 \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.driver.memory=1024m \
--conf spark.executor.memory=1024m \
--conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc-cp-pv.log" \
--conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc-cp-pv.log" \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=file:///opt/spark/eventLog \
--conf spark.history.fs.logDirectory=file:///opt/spark/eventLog \
--conf spark.kubernetes.namespace=apache-spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
--conf spark.kubernetes.driver.podTemplateFile=http://172.16.252.105:8080/driver-cp-pv.yaml \
--conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-cp-pv.yaml \
--class org.fblinux.StreamWordCount \
local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar \
5 \
172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092 \
spark_test \
file:///opt/spark/checkpoint-wc \
172.16.252.113 \
3306 \
spark_test \
wc
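After submitting, you can push a few test messages into the spark_test topic and confirm that checkpoint data is landing on the mounted PV. The commands below are only a sketch: kafka-console-producer.sh is the standard producer script shipped with Kafka (older versions use --broker-list instead of --bootstrap-server), and <driver-pod-name> is a placeholder for the actual driver pod name reported by kubectl.

# Send test lines to the topic consumed by the job
kafka-console-producer.sh --bootstrap-server 172.16.252.129:9092 --topic spark_test

# Locate the driver pod, then list the checkpoint directory on the mounted PVC
kubectl get pods -n apache-spark
kubectl exec -n apache-spark <driver-pod-name> -- ls /opt/spark/checkpoint-wc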
4. Writing Spark Checkpoints to HDFS
1. Create the checkpoint directory on HDFS
hdfs dfs -mkdir /tmp/spark/checkpoint-wc
2. Write custom driver and executor pod template files
driver:
[root@k8s-demo001 ~]# cat /data/nginx_down/driver-hadoop.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: driver
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: driver
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar            # mount for application jars
          mountPath: /opt/spark/appJars
        - name: spark-historyserver  # mount for the eventLog archive directory
          mountPath: /opt/spark/eventLog
        - name: spark-logs           # mount for logs
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-historyserver
      persistentVolumeClaim:
        claimName: spark-historyserver-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
executor:
[root@k8s-demo001 ~]# cat /data/nginx_down/executor-hadoop.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar  # mount for application jars
          mountPath: /opt/spark/appJars
        - name: spark-logs # mount for logs
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
3. Submit the job. When the checkpoint path carries no file:// scheme, Spark reads and writes the default (HDFS) file system.
./spark-3.2.3-hadoop/bin/spark-submit \
--name StreamWordCount \
--verbose \
--master k8s://https://172.16.252.105:6443 \
--deploy-mode cluster \
--conf spark.network.timeout=300 \
--conf spark.executor.instances=1 \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.driver.memory=1024m \
--conf spark.executor.memory=1024m \
--conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc-cp-hadoop.log" \
--conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc-cp-hadoop.log" \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=file:///opt/spark/eventLog \
--conf spark.history.fs.logDirectory=file:///opt/spark/eventLog \
--conf spark.kubernetes.namespace=apache-spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
--conf spark.kubernetes.driver.podTemplateFile=http://172.16.252.105:8080/driver-hadoop.yaml \
--conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-hadoop.yaml \
--class org.fblinux.StreamWordCount \
local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar \
5 \
172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092 \
spark_test \
/tmp/spark/checkpoint-wc \
172.16.252.113 \
3306 \
spark_test \
wc
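Once a few batches have been processed, the checkpoint data should appear under the HDFS directory created in step 1, and the word counts can likewise be checked in the wc table in MySQL. A simple verification:

hdfs dfs -ls /tmp/spark/checkpoint-wc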