一、引入netty依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
二、创建Tcp服务
使用线程创建服务类ReadCodeServer.java
package com.kaicom.mes.netty.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* @Author: BillYu
* @Description:websocket服务端
* @Date: Created in 下午2:04 2018/6/1.
*/
public class ReadCodeServer implements Runnable{
private final Logger log = LoggerFactory.getLogger(ReadCodeServer.class);
@Override
public void run() {
// 服务端启动辅助类,用于设置TCP相关参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 获取Reactor线程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
// 设置为主从线程模型
bootstrap.group(bossGroup, workGroup)
// 设置服务端NIO通信类型
.channel(NioServerSocketChannel.class)
// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
.childHandler(new ChannelInitializer<Channel>() {
// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
@Override
protected void initChannel(Channel ch) throws Exception {
// 获取职责链
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("handler", new ReadCodeTestHandler());
}
})
// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
.option(ChannelOption.SO_BACKLOG, 1024)
// 表示连接保活,相当于心跳机制,默认为7200s
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
Channel channel = bootstrap.bind(8081).sync().channel();
// 等待服务端口关闭
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
System.out.println("over read");
return;
}
}
消息处理类:ReadCodeTestHandler.java
package com.kaicom.mes.netty.netty;
import com.kaicom.mes.netty.kafka.KafkaUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.netty.buffer.Unpooled.copiedBuffer;
/**
* @Author: BillYu
* @Description: 读码头tcp服务处理
* @Date: Created in 下午2:05 2018/6/1.
*/
public class ReadCodeTestHandler extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(ReadCodeTestHandler.class);
/**
* channelAction channel 通道 action 活跃的
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.channel().read();
System.out.println(ctx.channel().localAddress().toString() + " 通道已激活!");
}
/**
* channelInactive channel 通道 Inactive 不活跃的
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!并且关闭。");
// 关闭流
ctx.close();
}
/**
* 功能:读取服务器发送过来的信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] buffer = new byte[buf.readableBytes()];
buf.readBytes(buffer, 0, buffer.length);
String rev = new String(buffer);
System.out.println("客户端收到服务器数据:" + rev);
String address = ctx.channel().remoteAddress().toString();
System.out.println(address);
// 匹配规则
String reg = "/(.*?):";
Pattern pattern = Pattern.compile(reg);
// 内容 与 匹配规则 的测试
Matcher matcher = pattern.matcher(address);
String ip = "0.0.0.0";
if (matcher.find()) {
// 不包含前后的两个字符
ip = matcher.group(1);
} else {
log.error("无法解析IP地址" + address + " 码:" + rev);
}
log.info("ip:" + ip + " code:" + rev);
rev = ip + "|" + System.currentTimeMillis() + "|" + rev;
log.info(rev);
KafkaUtil.send("code_info", rev);
log.info("发送到kafka");
ctx.writeAndFlush(rev);
} finally {
ReferenceCountUtil.release(msg);
}
}
/**
* 功能:读取完毕客户端发送过来的数据之后的操作
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端接收数据完毕..");
// 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
// ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
// ctx.flush();
// ctx.flush(); //
// 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。
// ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。
}
/**
* 功能:服务端发生异常的操作
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n");
cause.printStackTrace();
}
}
三、创建websocket服务
使用线程创建websocket服务:WebsocketServer.java
package com.kaicom.mes.netty.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* @Author: BillYu
* @Description:websocket服务端
* @Date: Created in 下午2:04 2018/6/1.
*/
public class WebsocketServer implements Runnable{
private final Logger log = LoggerFactory.getLogger(WebsocketServer.class);
@Override
public void run() {
// 服务端启动辅助类,用于设置TCP相关参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 获取Reactor线程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
// 设置为主从线程模型
bootstrap.group(bossGroup, workGroup)
// 设置服务端NIO通信类型
.channel(NioServerSocketChannel.class)
// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
.childHandler(new ChannelInitializer<Channel>() {
// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
@Override
protected void initChannel(Channel ch) throws Exception {
// 获取职责链
ChannelPipeline pipeline = ch.pipeline();
//
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast("handler", new WebsocketHandler());
}
})
// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
.option(ChannelOption.SO_BACKLOG, 1024)
// 表示连接保活,相当于心跳机制,默认为7200s
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
Channel channel = bootstrap.bind(8080).sync().channel();
// 等待服务端口关闭
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
消息处理类:WebsocketHandler.java
package com.kaicom.mes.netty.netty;
import com.kaicom.mes.netty.kafka.KafkaUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import static io.netty.buffer.Unpooled.copiedBuffer;
/**
* @Author: BillYu
* @Description:
* @Date: Created in 下午2:05 2018/6/1.
*/
@Component
public class WebsocketHandler extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(WebsocketHandler.class);
/**
* 用于websocket握手的处理类
*/
private WebSocketServerHandshaker handshaker;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// ReferenceCountUtil.release(msg);在读数据时候,在结束时候,需要释放,将byteBuf回收到直接内存池中。
try {
if (msg instanceof FullHttpRequest) {
System.out.println("===> full http request");
// websocket连接请求
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
// System.out.println("===> web socket frame");
// websocket业务处理
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
} finally {
ReferenceCountUtil.release(msg);
}
// System.out.println("==>over");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
System.out.println("===>exception:");
System.out.println(cause);
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// Http解码失败,向服务器指定传输的协议为Upgrade:websocket
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
// 握手相应处理,创建websocket握手的工厂类,
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
// 根据工厂类和HTTP请求创建握手类
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
// 不支持websocket
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// 通过它构造握手响应消息返回给客户端
handshaker.handshake(ctx.channel(), req);
}
}
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) {
if (req instanceof CloseWebSocketFrame) {
// 关闭websocket连接
handshaker.close(ctx.channel(), (CloseWebSocketFrame) req.retain());
return;
}
if (req instanceof PingWebSocketFrame) {
System.out.println("====> ping web socket frame");
ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
return;
}
if (!(req instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
}
if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
System.out.println("null");
// throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
}
String request = ((TextWebSocketFrame) req).text();
log.info("收到socket msg=" + request);
KafkaUtil.send("equipment_info", request);
log.info("发送到kafka");
//消息回复
ctx.channel().writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) req).text()));
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// BAD_REQUEST(400) 客户端请求错误返回的应答消息
if (res.status().code() != 200) {
// 将返回的状态码放入缓存中,Unpooled没有使用缓存池
ByteBuf buf = copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
}
// 发送应答消息
ChannelFuture cf = ctx.channel().writeAndFlush(res);
// 非法连接直接关闭连接
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
cf.addListener(ChannelFutureListener.CLOSE);
}
}
}
四、启动服务线程
@Component
public class StartApplicationRunner implements ApplicationRunner {
private Logger logger = LoggerFactory.getLogger(StartApplicationRunner.class);
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("start ReadCodeServer");
new Thread(new ReadCodeServer()).start();
logger.info("start WebsocketServer");
new Thread(new WebsocketServer()).start();
}
}
项目目录结构:
DC3A0BE8-22E9-41AA-8723-F8740541C7D1.png
五、备注
group :设置SeverBootstrap要用到的EventLoopGroup,也就是定义netty服务的线程模型,处理Acceptor链接的主"线程池"以及用于I/O工作的从"线程池";
channel:设置将要被实例化的SeverChannel类;
option :指定要应用到新创建SeverChannel的ChannelConfig的ChannelOption.其实也就是服务本身的一些配置;
chidOption:子channel的ChannelConfig的ChannelOption。也就是与客户端建立的连接的一些配置;
childHandler:设置将被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其实就是让你在里面定义处理连接收发数据,需要哪些ChannelHandler按什么顺序去处理;
ServerChannelInitializer这个类继承实现自netty的ChannelInitializer抽象类,这个类的作用就是对channel(连接)的ChannelPipeline进行初始化工作,说白了就是你要把处理数据的方法添加到这个任务链中去,netty才知道每一步拿着socket连接和数据去做什么。
之前遇到handle里面channelRead()方法不执行,发现原因是在任务链中加了其他的handle类