当前位置: 代码迷 >> 综合 >> netty~ 基于netty实现服务端的长连接
  详细解决方案

netty~ 基于netty实现服务端的长连接

热度:88   发布时间:2024-02-25 18:07:23.0

描述

        socket长连接是指服务端不主动断开与客户端之间的channel连接;客户端需要定时向服务端发送心跳消息,服务端则需要将超时未发送心跳的连接关闭。

        服务端关闭过期的channel连接: Netty提供了ScheduledFuture,可以通过ChannelHandlerContext.executor().schedule()创建,支持延时提交,也支持取消任务,为自动关闭提供了一个很好的实现方案。

实现Demo

消息定义

/**
 * Wire message exchanged between client and server.
 *
 * <p>Plain mutable bean holding the three fields of a frame: a one-byte type,
 * a four-byte length and an optional textual content.
 */
public class Msg {

    /** Message type: 1 = heartbeat message, 2 = normal message. */
    private byte type;

    /** Message length. */
    private int length;

    /** Message content. */
    private String content;

    /** Returns the message type. */
    public byte getType() {
        return type;
    }

    /** Sets the message type. */
    public void setType(byte type) {
        this.type = type;
    }

    /** Returns the message length. */
    public int getLength() {
        return length;
    }

    /** Sets the message length. */
    public void setLength(int length) {
        this.length = length;
    }

    /** Returns the message content, or {@code null} when none was set. */
    public String getContent() {
        return content;
    }

    /** Sets the message content. */
    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Msg{"
                + "type=" + type
                + ", length=" + length
                + ", content='" + content + '\''
                + '}';
    }
}

消息编码

public class MsgEncoder extends MessageToByteEncoder<Msg> {@Overrideprotected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf byteBuf) throws Exception {byteBuf.writeByte(msg.getType());byteBuf.writeInt(msg.getLength());if (!StringUtil.isNullOrEmpty(msg.getContent())) {byteBuf.writeBytes(msg.getContent().getBytes());}}
}

消息解码

public class MsgDecoder extends ReplayingDecoder<MsgDecoder.MsgState> {/*** 状态类型通常是一个Enum ; 使用Void如果状态管理是未使用* TYPE:    消息类型* LENGTH:  消息长度* CONTENT: 消息内容*/public enum MsgState {TYPE,LENGTH,CONTENT}public MsgDecoder() {super(MsgState.TYPE);}private Msg msg;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {MsgState state = state();switch (state) {case TYPE:msg = new Msg();byte type = byteBuf.readByte();msg.setType(type);checkpoint(MsgState.LENGTH);break;case LENGTH:int length = byteBuf.readInt();msg.setLength(length);if (length > 0) {checkpoint(MsgState.CONTENT);} else {out.add(msg);checkpoint(MsgState.TYPE);}break;case CONTENT:byte[] bytes = new byte[msg.getLength()];byteBuf.readBytes(bytes);String content = new String(bytes);msg.setContent(content);out.add(msg);checkpoint(MsgState.TYPE);break;default:throw new IllegalStateException("invalid state:" + state);}}
}

消息处理

@ChannelHandler.Sharable
public class MsgHandler extends SimpleChannelInboundHandler<Msg> {private static Map<Integer, ChannelCache> channelCache = new HashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Msg msg) throws Exception {System.out.println("收到消息,消息内容" + msg);Channel channel = ctx.channel();final int hashCode = channel.hashCode();//判断channel在缓存中if (!channelCache.containsKey(hashCode)) {//添加通道关闭的监听器,当通道关闭时将channel从缓存中移除channel.closeFuture().addListener(future -> {channelCache.remove(hashCode);});//创建并执行定时任务 10秒后服务端主动将channel关闭ScheduledFuture scheduledFuture = ctx.executor().schedule(() -> {channel.close();}, 10, TimeUnit.SECONDS);//将渠道信息放入缓存channelCache.put(hashCode, new ChannelCache(channel, scheduledFuture));}switch (msg.getType()) {//心跳检测case 1: {//创建一个新的定时器ScheduledFuture scheduledFuture = ctx.executor().schedule(() -> channel.close(), 5, TimeUnit.SECONDS);//重新设置channel过期定时器并将老的定时器取消ChannelCache cache = channelCache.get(hashCode);cache.getScheduledFuture().cancel(true);cache.setScheduledFuture(scheduledFuture);ctx.channel().writeAndFlush(msg);break;}//普通消息case 2: {channelCache.entrySet().stream().forEach(entry -> {Channel otherChannel = entry.getValue().getChannel();otherChannel.writeAndFlush(msg);});break;}}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {super.channelReadComplete(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (null != cause) {cause.printStackTrace();}if (null != ctx) {ctx.close();}}
}

channel缓存

public class ChannelCache {private Channel channel;private ScheduledFuture scheduledFuture;public ChannelCache(Channel channel, ScheduledFuture scheduledFuture) {this.channel = channel;this.scheduledFuture = scheduledFuture;}。。。。
}

服务端

/*** 基于netty的服务端* 思路:*  socket长连接即服务端不断开客户端channel的连接,客户端需要定时向服务端进行心跳检测,服务端需要将过期未进行心跳检测的socket关闭。* 服务端关闭过期的channel连接:*   Netty提供了ScheduledFuture,可以通过ChannelHandlerContext.executor().schedule()创建,支持延时提交,也支持取消任务,*   为自动关闭提供了一个很好的实现方案。*/
public class LongConnServer {private static final int port = 9999;public static void main(String[] args) throws Exception {LongConnServer server = new LongConnServer();server.start();}public void start() throws Exception {ServerBootstrap b = new ServerBootstrap();NioEventLoopGroup group = new NioEventLoopGroup();b.group(group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ch.pipeline().addLast("decoder", new MsgDecoder()).addLast("encoder", new MsgEncoder()).addLast("handler", new MsgHandler());}})// determining the number of connections queued.option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);b.bind(port).sync();}
}

客户端

/**
 * Socket client for the long-connection demo.
 *
 * <p>Opens one blocking socket, prints whatever the server sends back, sends
 * a heartbeat frame every 3 seconds and a normal message every second.
 *
 * @author houkai
 */
public class LongConnClient {

    /** Frame type of a heartbeat. */
    private static final byte TYPE_HEARTBEAT = 1;

    /** Frame type of a normal message. */
    private static final byte TYPE_NORMAL = 2;

    String host = "127.0.0.1";
    int port = 9999;

    /** Serialises writes so frames from different threads never interleave. */
    private final Object writeLock = new Object();

    public static void main(String[] args) throws Exception {
        new LongConnClient().testLongConn();
    }

    /**
     * Connects to the server and runs the demo until sending fails or the
     * calling thread is interrupted. The socket is closed on the way out,
     * which also ends the two helper threads.
     *
     * @throws Exception if connecting or sending fails, or on interruption
     */
    public void testLongConn() throws Exception {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port));

            // separate thread: print the server's responses until the stream ends
            new Thread(() -> readResponse(socket)).start();

            // heartbeat every 3 seconds; stops on the first failure
            new Thread(() -> {
                try {
                    while (!socket.isClosed()) {
                        heartCheck(socket);
                        Thread.sleep(3000);
                    }
                } catch (IOException e) {
                    if (!socket.isClosed()) {
                        e.printStackTrace();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();

            // the client sends one message to the server every second
            while (true) {
                byte[] content = ("hello, I'm  " + hashCode()).getBytes();
                ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);
                byteBuffer.put(TYPE_NORMAL);
                byteBuffer.putInt(content.length);
                byteBuffer.put(content);
                send(socket, byteBuffer.array());
                Thread.sleep(1000);
            }
        }
    }

    /**
     * Sends one heartbeat frame (type 1, length 0).
     */
    private void heartCheck(Socket socket) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        byteBuffer.put(TYPE_HEARTBEAT);
        byteBuffer.putInt(0);
        send(socket, byteBuffer.array());
    }

    /**
     * Reads and prints response data until the stream ends or fails.
     */
    private void readResponse(final Socket socket) {
        try {
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) > 0) {
                System.out.println(new String(buffer, 0, n));
            }
        } catch (IOException e) {
            // A failure caused by closing the socket locally is expected.
            if (!socket.isClosed()) {
                e.printStackTrace();
            }
        }
    }

    /** Writes one complete frame to the socket while holding the write lock. */
    private void send(Socket socket, byte[] frame) throws IOException {
        synchronized (writeLock) {
            socket.getOutputStream().write(frame);
        }
    }
}