1.前言
本文主要通过Netty实现一个Http协议的数据采集服务,并将Netty接收的请求转换成消息发送给Kafka:
关于采集程序的几个规则:
1、Netty判断请求的合规性通过request header中携带的key和value进行判断,没有携带指定key和value的request请求全部丢弃。
2、采集的数据仅包括request的body全部内容,且request body的内容需要为json格式,如果需要请求的header信息,可以在Channel中扩展,代码中有获取request header的示例。
3、因为这是自己写着练手用的,没有实际的业务数据,所以采集程序并没有对数据合规性进行校验,要是需要对数据合规性进行判断,可以在Channel中扩展,也可以把这个需求交给消费者或者具体使用数据的团队。
采集架构图如下:
说明:Kafka 集群相关内容略过,请各位看官自己搭建。
2.实现代码
代码结构如下:
(1)导入所需的依赖包
<dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.74.Final</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.2</version> </dependency> </dependencies>
(2)添加my.setting配置文件,通过hutool工具包中的Setting类型读取配置文件信息
#netty启动监听的端口号 server.port=8080 #kafka集群服务 kafka.servers=172.16.247.3:9092,172.16.247.4:9092,172.16.247.4:9092 #指定kafka中的topic kafka.topic=netty-collect # 设置请求的校验header信息 request.headerkey=check request.headervalue=haha
添加配置文件获取类
public class MyConfig { public static Setting setting; static { //读取配置文件,读取文件的路径是在classpath下 setting = new Setting("my.setting"); } }
(3)NettyCollectServer类,负责启动Netty服务,添加http相关的编解码器。
import cn.hutool.core.util.StrUtil; import com.fblinux.config.MyConfig; import com.fblinux.handler.ServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; public class NettyCollectServer { public static void main(String[] args) throws InterruptedException { // 定义BossGroup,用于接收用户的链接请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); // 定义WorkerGroup,用于业务逻辑的处理,默认线程数:cpu核数*2 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // Netty启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) // 指定通道处理类型,使用的是Nio方式 .channel(NioServerSocketChannel.class) // 业务逻辑处理 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 对于http协议的解码器 .addLast(new HttpRequestDecoder()) // 对于http协议的编码器,用于数据响应 .addLast(new HttpResponseEncoder()) // 将请求的数据,url中或请求头中聚合在一起,得到FullHttpRequest对象 // 传入参数是最大长度 .addLast(new HttpObjectAggregator(1024 * 128)) // 添加自定义处理器 .addLast(new ServerHandler()); } }); // 启动服务 int port = MyConfig.setting.getInt("server.port"); ChannelFuture future = serverBootstrap.bind(port).sync(); System.out.println(StrUtil.format("Netty服务启动了,端口号为:{}。。。。。", port)); //等待监听关闭的信号,阻塞当前的线程,等待客户端的请求 future.channel().closeFuture().sync(); } finally { //优雅的关闭服务 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
(4)ServerHandler类,负责接收用户请求,以及发送kafka消息。
import cn.hutool.core.map.MapUtil; import cn.hutool.json.JSONUtil; import com.fblinux.config.MyConfig; import com.fblinux.service.KafkaService; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; // http web服务处理器 public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private KafkaService kafkaService = new KafkaService(); // Channel数据读取方法,每一条用户请求都会经过这里 @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception { // 从配置中获取校验的header key和value信息 String headerkey = MyConfig.setting.get("request.headerkey"); String headervalue = MyConfig.setting.get("request.headervalue"); // 获取校验header的值 String headerCehck = fullHttpRequest.headers().get(headerkey); // 判断请求是否配置了指定的请求头和值 if (headerCehck.equals(headervalue)){ // 获取请求中的body String body = fullHttpRequest.content().toString(CharsetUtil.UTF_8); // 发送消息到Kafka this.kafkaService.sendMsg(body); // 响应客户端请求 String result = JSONUtil.toJsonStr(MapUtil.builder().put("status", "ok").build()); this.response(ctx,result); }else { // 响应客户端请求 String result = JSONUtil.toJsonStr(MapUtil.builder().put("status", "false").build()); this.response(ctx,result); } } private void response(ChannelHandlerContext ctx,String result){ // 给客户端响应 DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); // 响应的内容 httpResponse.content().writeBytes(Unpooled.copiedBuffer(result, CharsetUtil.UTF_8)); // 设置响应头 httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"application/json; charset=utf-8"); //响应完成后需要将Channel关闭掉 ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE); } }
(5)KafkaService类,完成发送kafka消息的业务逻辑。
// 实现向Kafka发送消息逻辑 public class KafkaService { private KafkaProducer<String, String> producer; public KafkaService() { // 配置生产者参数 Properties properties = new Properties(); // 配置集群地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyConfig.setting.get("kafka.servers")); // 消息key的序列化方式 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); // 消息value的序列化方式 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); this.producer = new KafkaProducer<String, String>(properties); } // 发送消息到Kafka public Boolean sendMsg(String body) { // 获取发送数据的topic String topic = MyConfig.setting.get("kafka.topic"); // 发送数据 this.producer.send(new ProducerRecord<>(topic, body), (metadata, exception) -> { if (null != exception) { // 发送失败 System.out.println(exception); } }); return true; } }
3.测试
测试正常的请求:即配置了正确的header key和header value
转载请注明:西门飞冰的博客 » Netty 实现http数据采集服务