1.KOP 介绍
KoP(Kafka on Pulsar)通过在 Pulsar 代理上引入 Kafka 协议处理程序,为 Apache Pulsar 带来了原生的Apache Kafka协议支持。通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到Pulsar,而无需修改代码。这使 Kafka 应用程序能够利用 Pulsar 的强大功能.
KoP 作为 Pulsar协议处理程序插件实现,协议名称为“kafka”,在 Pulsar broker 启动时加载。它通过在Apache Pulsar上提供原生 Kafka 协议支持,这样可以大大降低学习Pulsar的成本。基于KOP方案, 整合两个流行的事件流生态系统软件。使用ApachePulsar 构建真正统一的事件流平台,以加速实时应用程序和服务的开发。
2.KOP 配置
wget https://github.com/streamnative/kop/releases/download/v2.8.1.30/pulsar-protocol-handler-kafka-2.8.1.30.nar
2、将KOP NAR包上传到Pulsar的protocols目录中, 如果没有此目录, 直接创建即可
3、在broker配置中设置KOP相关信息
]# vim conf/broker.conf # 添加以下内容 messagingProtocols=kafka protocolHandlerDirectory=./protocols kafkaListeners=PLAINTEXT://pulsar01:9092 brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor # 修改以下配置 allowAutoTopicCreationType=partitioned brokerDeleteInactiveTopicsEnabled=false
4、剩余broker节点也需要上传KOP nar包并修改配置文件,其中kafkaListeners需要修改为自己的IP地址
3.KOP测试
当pulsar基于Kafka协议后,此时我们完全可以使用Kafka的相关命令或者API来进行生产和消费操作,可以无痕迁移到pulsar
Kafka生产者发送数据:
4.KOP 使用
1、maven依赖导入:有了KOP直接导入Kafka的jar包即可
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>
2、生产者代码
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducer_KOP { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "pulsar01:9092,pulsar02:9092,pulsar03:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("kop-topic", Integer.toString(i), Integer.toString(i))); } producer.close(); } }
3、消费者代码
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaConsumer_KOP { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "pulsar01:9092,pulsar02:9092,pulsar03:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("kop-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
转载请注明:西门飞冰的博客 » Kafka on Pulsar(KOP)