1.function 背景
当我们进行流式处理的时候,很多情况下,我们的需求可能只是下面这些简单的操作:简单的ETL 操作\聚合计算操作等相关服务。
但为了实现这些功能,我们不得不去部署一整套 流处理服务(spark、flink等)。但是我们仅仅需要这些服务的一小部分功能,部署流处理引擎的成本可能比用户开发这个功能本身更困难。
基于这个原因,pulsar设计并实现了Pulsar Functions,在Pulsar functions中,用户只需要关心计算逻辑本身,而不需要去了解或者部署流处理服务,当然也可以将pulsar function与现有的流处理服务一起使用。
2.function 介绍
Pulsar Functions 的设计灵感来自于 Heron 这样的流处理引擎,Pulsar Functions 将会拓展Pulsar和整个消息领域的未来。使用 Pulsar Functions,用户可以轻松地部署和管理 function,通过function 从Pulsartopic 读取数据或者生产新数据到 Pulsar topic。
引入 Pulsar Functions 后,Pulsar 成为统一的消息投递/计算/存储平台。只需部署一套Pulsar 集群,便可以实现一个计算引擎,页面简单,操作便捷。
Input topic 是数据的来源,在 Pulsar Functions 中,所有的数据均来自 input topic。当数据进入inputtopic 中,Pulsar Functions 充当消费者的角色,去 input topic 中消费消息;当从input topic 中拿到需要处理的消息时,Pulsar Functions 充当生产者的角色往 output topic 或者 log topic 中生产消息。
Output topic 和 log topic 都可以看作是 Pulsar Functions 的输出。从是否会有output 这个点来看,我们可以将 Pulsar Functions 分为两类,当有输出的时候 Pulsar Functions 会将相应的output 输出到outputtopic中。log topic 主要存储用户的日志信息,当 Pulsar Functions 出现问题时,方便用户定位错误并调试。
综上所述:我们不难看出 Pulsar Functions 充当了一个消息处理和转运的角色。
在使用Pulsar Functions, 可以使用不同的语言来编写, 比如Python,Java,Go等. 编写方式主要两种
- 本地模式: 集群外部, 进行本地运行
- 集群模式: 集群内部运行(支持独立模式和集成模式)
3.function的基本使用操作
3.1.开启function功能
pulsar 默认没有开启function功能,需要手动开启并重启集群
# vim conf/broker.conf functionsWorkerEnabled=true # ./bin/pulsar-daemon stop broker # ./bin/pulsar-daemon start broker
3.2.测试function使用
1、导入自带的测试jar包
./bin/pulsar-admin functions create \ --jar examples/api-examples.jar \ --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \ --inputs persistent://public/default/exclamation-input \ --output persistent://public/default/exclamation-output \ --tenant public \ --namespace default \ --name exclamation
2、检查是否按照预期触发函数运行
./bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
3、functions属性说明
bin/pulsar-admin functions 属性说明: functions: 可选值: localrun: 创建本地function进行运行 create: 在集群模式下创建 delete: 删除在集群中运行的function get: 获取function的相关信息 restart: 重启 stop : 停止运行 start: 启动 status: 检查状态 stats: 查看状态 list: 查看特定租户和名称空间下的所有的function --classname: 设置function执行类 --jar 设置function对应的jar包 --inputs : 输入的topic --output : 输出的topic --tenant : 设置function运行在那个租户中 --namespace: 设置function运行在那个名称空间中 --name : 定义function的名称
4.function的案例
案例需求: 使用Pulsar Function 读取某一个Topic中日期(格式为: yyyy/MM/dd HH/mm/ss)数据, 读取后, 对数据进行日期转换(格式为:yyyy-MM-ddHH:mm:ss )
1、创建一个maven工程,并引入依赖
<build> <finalName>Pulsar-FunctionData</finalName> </build> <dependencies> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-functions-api</artifactId> <version>2.8.1</version> </dependency> </dependencies>
2、编写逻辑代码
import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import java.text.SimpleDateFormat; import java.util.Date; public class FunctionDate implements Function<String,String> { // 此方法, 没接收到一条数据, 就会调用一次 process方式, 其中 // 参数1: 输入的消息数据 // 参数2: Context 表示上下文对象,用于执行一些相关的统计计算操作, 以及获取到相关的对象以及元数据信息 private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss"); private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public String process(String input, Context context) throws Exception { Date oldDate = format1.parse(input); return format2.format(oldDate); } }
4、构建测试
./bin/pulsar-admin functions create \ --jar functions/Pulsar-FunctionData.jar \ --classname com.fblinux.functions.FunctionDate \ --inputs persistent://public/default/fm-input \ --output persistent://public/default/fm-output \ --tenant public \ --namespace default \ --name FunctionData
启动function测试
./bin/pulsar-admin functions trigger --name FunctionData --trigger-value "2022/10/14 19/30/30"
转载请注明:西门飞冰的博客 » Pulsar function