当前位置: 代码迷 >> java >> Echo Server与1000个客户端同时发生(丢失消息+错误连接)
  详细解决方案

Echo Server与1000个客户端同时发生(丢失消息+错误连接)

热度:128   发布时间:2023-07-25 20:06:01.0

我正在阅读“Netty In Action V5”。 当阅读第2.3章和第2.4章时,我尝试使用示例EchoServer和EchoClient,当我测试一个连接到服务器的客户端时,一切都运行良好...然后我将示例修改为多个客户端可以连接到服务器。 我的目的是运行压力测试:1000个客户端将连接到服务器,每个客户端将回复100条消息到服务器,当所有客户端完成后,我将获得所有进程的总时间。 服务器部署在Linux机器(VPS)上,客户端部署在窗口机器上。

当运行压力测试时,我遇到了两个问题:

一些客户收到错误消息:

java.io.IOException: An existing connection was forcibly closed by the remote host 
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)\at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)

但是有些客户没有收到来自服务器的消息

工作环境:

  • Netty-all-4.0.30.Final

  • JDK1.8.0_25

  • Echo客户端部署在Window 7 Ultimate上

  • Echo Server部署在Linux Centos 6上

NettyClient类:

public class NettyClient {
    private Bootstrap bootstrap;
    private EventLoopGroup group;

    public NettyClient(final ChannelInboundHandlerAdapter handler) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(handler);
            }
        });
    }

    public void start(String host, int port) throws Exception {
        bootstrap.remoteAddress(new InetSocketAddress(host, port));
        bootstrap.connect();
    }

    public void stop() {
        try {
            group.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

NettyServer类:

public class NettyServer {
    private EventLoopGroup parentGroup;
    private EventLoopGroup childGroup;
    private ServerBootstrap boopstrap;

    public NettyServer(final ChannelInboundHandlerAdapter handler) {
        parentGroup = new NioEventLoopGroup(300);
        childGroup = new NioEventLoopGroup(300);
        boopstrap = new ServerBootstrap();
        boopstrap.group(parentGroup, childGroup);
        boopstrap.channel(NioServerSocketChannel.class);
        boopstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(handler);
            }
        });
    }

    public void start(int port) throws Exception {
        boopstrap.localAddress(new InetSocketAddress(port));
        ChannelFuture future = boopstrap.bind().sync();
        System.err.println("Start Netty server on port " + port);
        future.channel().closeFuture().sync();
    }

    public void stop() throws Exception {
        parentGroup.shutdownGracefully().sync();
        childGroup.shutdownGracefully().sync();
    }
}

类EchoClient

public class EchoClient {
    private static final String HOST = "203.12.37.22";
    private static final int PORT = 3344;
    private static final int NUMBER_CONNECTION = 1000;
    private static final int NUMBER_ECHO = 10;
    private static CountDownLatch counter = new CountDownLatch(NUMBER_CONNECTION);

    public static void main(String[] args) throws Exception {
        List<NettyClient> listClients = Collections.synchronizedList(new ArrayList<NettyClient>());
        for (int i = 0; i < NUMBER_CONNECTION; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        NettyClient client = new NettyClient(new EchoClientHandler(NUMBER_ECHO) {
                            @Override
                            protected void onFinishEcho() {
                                counter.countDown();
                                System.err.println((NUMBER_CONNECTION - counter.getCount()) + "/" + NUMBER_CONNECTION);
                            }
                        });
                        client.start(HOST, PORT);
                        listClients.add(client);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }).start();
        }

        long t1 = System.currentTimeMillis();
        counter.await();
        long t2 = System.currentTimeMillis();
        System.err.println("Totla time: " + (t2 - t1));

        for (NettyClient client : listClients) {
            client.stop();
        }
    }

    private static class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private static final String ECHO_MSG = "Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo";
        private int numberEcho;
        private int curNumberEcho = 0;

        public EchoClientHandler(int numberEcho) {
            this.numberEcho = numberEcho;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            curNumberEcho++;
            if (curNumberEcho >= numberEcho) {
                onFinishEcho();
            } else {
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
            }
        }

        protected void onFinishEcho() {

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

EchoServer类:

public class EchoServer {
    private static final int PORT = 3344;

    public static void main(String[] args) throws Exception {
        NettyServer server = new NettyServer(new EchoServerHandler());
        server.start(PORT);
        System.err.println("Start server on port " + PORT);
    }

    @Sharable
    private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.write(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}

你可以改变两件事:

  1. 只创建一个客户端引导程序,并为所有客户端重用它 ,而不是为每个客户端创建一个。 因此,从客户端部分中提取引导构建,并在开始时仅保留连接。 这将限制内部的线程数。

  2. 当达到乒乓数量时, 关闭客户端的连接 目前你只调用onFinishEcho上的空方法,这在客户端没有任何关闭,所以没有客户端停止...因此也没有关闭通道......

您可能已经对客户端上的线程数量有一些限制。

另外一个元素可能是一个问题: 您没有指定任何编解码器 (字符串编解码器或其他),这可能导致从客户端或服务器部分发送被视为完全响应。

例如,您可能有第一个“Echo Echo Echo”块发送一个包含缓冲区开头的数据包,而其他部分(更多“Echo”)将通过以后的数据包发送。

为了防止这种情况,您应该使用一个编解码器来确保最终处理程序获得真正的完整消息,而不是部分消息。 如果没有,您可能会陷入其他问题,例如服务器端的错误,试图发送额外的数据包,而客户端会尽快关闭该频道...

  相关解决方案