java-doc-netty4: One Million Connections

Optimize the system's maximum number of file handles

  1. Check the operating system's maximum number of file handles by running cat /proc/sys/fs/file-max. If the value is not large enough, open the configuration with vim /etc/sysctl.conf and add:

    fs.file-max = 1000000

    Then run sysctl -p so the change takes effect immediately.

  2. Set the maximum number of file handles a single process may open. Run ulimit -a to check whether the current setting is sufficient.
    Once the number of concurrently accepted TCP connections exceeds this limit, the system reports "Too many open files" and every new client connection fails. Edit the limits with vim /etc/security/limits.conf and add:

    *    soft    nofile    1000000
    *    hard    nofile    1000000

    Log out and back in for the new limits to take effect. The effective per-process limit can also be checked from inside the JVM, as shown in the sketch after this list.
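A minimal sketch, assuming an OpenJDK/HotSpot JVM on Linux where the com.sun.management extension is available (the class name is a placeholder), that reads the file-descriptor limit the JVM process actually sees; it is a quick way to confirm the limits.conf change took effect:

import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdLimitCheck {
    public static void main(String[] args) {
        // On Linux the platform OS MXBean implements the Unix-specific
        // interface that exposes file-descriptor counts.
        UnixOperatingSystemMXBean os =
                (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        System.out.println("max open files : " + os.getMaxFileDescriptorCount());
        System.out.println("open files now : " + os.getOpenFileDescriptorCount());
    }
}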

Optimize TCP/IP parameters

  • Check the client's local port range:

    cat /proc/sys/net/ipv4/ip_local_port_range

  • Edit the network parameters with vim /etc/sysctl.conf.

  • On the client, widen the local port range:

    net.ipv4.ip_local_port_range = 1024 65535

    Each connection from one client IP to a given server IP and port consumes a distinct local port, so a single client can open at most roughly 64K connections per server port; this is why the demo server listens on 100 ports and the client spreads its connections across them.
  • Tune the TCP parameters (run sysctl -p afterwards to apply them):

    net.ipv4.tcp_mem = 786432 2097152 3145728
    net.ipv4.tcp_wmem = 4096 4096 16777216
    net.ipv4.tcp_rmem = 4096 4096 16777216
    net.ipv4.tcp_keepalive_time = 1800
    net.ipv4.tcp_keepalive_intvl = 20
    net.ipv4.tcp_keepalive_probes = 5
    net.ipv4.tcp_tw_reuse = 1
    net.ipv4.tcp_tw_recycle = 1
    net.ipv4.tcp_fin_timeout = 30

    Parameter notes

    net.ipv4.tcp_mem: memory available to the TCP stack, in pages (a page is usually 4 KB; check with getconf PAGESIZE). The three values are the low, pressure, and high thresholds. With the high threshold of 3145728 above, TCP may use at most 3145728 × 4 KB / 1024 / 1024 = 12 GB. A TCP connection takes roughly 7.5 KB, so one million connections need about 7.5 × 1000000 / 4 = 1875000 pages; 3145728 is more than enough for this test.

    net.ipv4.tcp_wmem: write (send) buffer size allocated to each TCP connection, in bytes. The three values are the minimum, default, and maximum.

    net.ipv4.tcp_rmem: read (receive) buffer size allocated to each TCP connection, in bytes. The three values are the minimum, default, and maximum.

    net.ipv4.tcp_keepalive_time: time between the last data packet sent and the first keepalive probe, used to check whether a TCP connection is still alive.

    net.ipv4.tcp_keepalive_intvl: interval between keepalive probes when a probe goes unanswered.

    net.ipv4.tcp_keepalive_probes: number of consecutive unanswered probes after which the connection is declared dead.

    net.ipv4.tcp_tw_reuse: whether sockets in TIME_WAIT may be reused for new TCP connections; 0 (off) by default.

    net.ipv4.tcp_tw_recycle: whether fast recycling of TIME_WAIT sockets is enabled; 0 (off) by default. This option was removed in Linux 4.12, and enabling it can break clients behind NAT.

    net.ipv4.tcp_fin_timeout: how long a socket that this side has closed stays in FIN_WAIT_2. The default is 60.

    Some of these settings have per-connection counterparts that Netty can request through ChannelOption, as shown in the sketch below.
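A minimal sketch, using standard Netty 4 socket options, of how those per-connection counterparts can be requested on a server bootstrap; the class and method names are placeholders, and the numeric values are only illustrative, not recommendations:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public class SocketOptionConfig {

    // SO_KEEPALIVE turns on the kernel keepalive probes governed by the
    // tcp_keepalive_* settings; SO_SNDBUF/SO_RCVBUF hint the per-socket
    // buffer sizes that tcp_wmem/tcp_rmem bound.
    static ServerBootstrap applyOptions(ServerBootstrap bootstrap) {
        return bootstrap
                .option(ChannelOption.SO_BACKLOG, 1024)         // accept queue length (illustrative)
                .childOption(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.SO_SNDBUF, 4096)     // illustrative
                .childOption(ChannelOption.SO_RCVBUF, 4096);    // illustrative
    }
}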

Reference code
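The demo consists of three classes: Server binds N_PORT consecutive ports starting at BEGIN_PORT, Client loops over those ports opening connections as fast as it can, and ConnectionCountHandler counts the live connections on the server side and prints the total every two seconds.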

Client
package learn2;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

    // Server IP; can be overridden with -Dserver.host=<ip>
    private static final String SERVER_HOST = "localhost";

    static final int BEGIN_PORT = 11000;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new Client().start(BEGIN_PORT, N_PORT);
    }

    public void start(final int beginPort, int nPort) {
        System.out.println("Client starting....");
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        // A Bootstrap must have a handler before connect(); the client needs
        // no business logic, so an empty initializer is enough.
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
            }
        });

        int index = 0;
        int port;

        String serverHost = System.getProperty("server.host", SERVER_HOST);
        // Starting at port 11000, connect to the server ports in a round-robin loop
        while (!Thread.interrupted()) {
            port = beginPort + index;
            try {
                ChannelFuture channelFuture = bootstrap.connect(serverHost, port);
                channelFuture.addListener((ChannelFutureListener) future -> {
                    if (!future.isSuccess()) {
                        System.out.println("Connection failed, exiting!");
                        System.exit(0);
                    }
                });
                channelFuture.get();
            } catch (Exception e) {
                // ignore and keep connecting to the next port
            }

            if (++index == nPort) {
                index = 0;
            }
        }
    }
}
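The client reads the target host from the server.host system property and falls back to localhost, so a remote server can be targeted by passing -Dserver.host=<server IP> when launching learn2.Client.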
ConnectionCountHandler
package learn2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {

    // Counts the live connections and prints the total every two seconds
    private static final AtomicInteger nConnection = new AtomicInteger();

    static {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
                () -> System.out.println("Connections: " + nConnection.get()),
                0, 2, TimeUnit.SECONDS);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        nConnection.incrementAndGet();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        nConnection.decrementAndGet();
    }
}
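A new ConnectionCountHandler instance is added to each connection's pipeline by the server's ChannelInitializer, but the AtomicInteger counter is static, so all instances share one count; the scheduled executor created in the static block prints that shared total every two seconds.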
Server
package learn2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

    static final int BEGIN_PORT = 11000;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new Server().start(BEGIN_PORT, N_PORT);
    }

    public void start(int beginPort, int nPort) {
        System.out.println("Starting server....");

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Every connection gets a ConnectionCountHandler that bumps the counter
                pipeline.addLast(new ConnectionCountHandler());
            }
        });

        // Bind the 100 ports from 11000 to 11099
        for (int i = 0; i < nPort; i++) {
            int port = beginPort + i;
            bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("Port bound: " + port);
                } else {
                    System.out.println("Failed to bind port: " + port);
                }
            });
        }
        System.out.println("Server started!");
    }
}
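To run the demo, start Server first and then one or more Client processes pointed at it via -Dserver.host; the server console prints the growing connection count every two seconds. Note that main() returns after start(), but the non-daemon Netty event-loop threads keep the server process alive.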