java的IO

linux文件系统

linux中常说的一句话 “ 一切皆文件”

数据块:

硬盘分成相同大小的单元,我们称为块(Block)。一块的大小是扇区大小的整数倍,默认是 4K。

Inode:

index node, linux中文件抽象出来的一个数据结构(class), index就是索引,就是去哪找组成文件的数据块。

文件描述符 fd(File Descriptor):

当使用系统调用 open 打开一个文件时,操作系统会创建一些数据结构来表示这个被打开的文件。为了能够找到这些数据结构,在进程中,我们会为这个打开的文件分配一个文件描述符 fd(File Descriptor)。

pageCache:

内存比磁盘快,linux把热点数据放入内存中,这个块区域称为pageCache。

虚拟文件系统VFS:

Linux 可以支持多达数十种不同的文件系统(ext4、ext3、ntfs)。它们的实现各不相同,因此 Linux 内核向用户空间提供了虚拟文件系统这个统一的接口,来对文件系统进行操作。

linux文件系统

DMA(Direct Memory Access,直接存储器访问)

是所有现代电脑的重要特色,它允许不同速度的硬件装置来沟通,而不需要依赖于CPU的大量中断负载。否则,CPU 需要从来源把每一片段的资料复制到寄存器,然后把它们再次写回到新的地方。在这个时间中,CPU 对于其他的工作来说就无法使用。

DMA传输方式

1)用户进程向 CPU 发起 read 系统调用读取数据,由用户态切换为内核态,然后一直阻塞等待数据的返回。

2)CPU 在接收到指令以后对 DMA 磁盘控制器发起调度指令。

3)DMA 磁盘控制器对磁盘发起 I/O 请求,将磁盘数据先放入磁盘控制器缓冲区,CPU 全程不参与此过程。

4)数据读取完成后,DMA 磁盘控制器会接受到磁盘的通知,将数据从磁盘控制器缓冲区拷贝到内核缓冲区。

5)DMA 磁盘控制器向 CPU 发出数据读完的信号,由 CPU 负责将数据从内核缓冲区拷贝到用户缓冲区。

6)用户进程由内核态切换回用户态,解除阻塞状态,然后等待 CPU 的下一个执行时间钟。

内核缓冲区与进程缓冲区

缓冲区的目的,是为了减少频繁的系统IO调用。大家都知道,系统调用需要保存之前的进程数据和状态等信息,而结束调用之后回来还需要恢复之前的信息,为了减少这种损耗时间、也损耗性能的系统调用,于是出现了缓冲区。

有了缓冲区,操作系统使用read函数把数据从内核缓冲区复制到进程缓冲区,write把数据从进程缓冲区复制到内核缓冲区中。等待缓冲区达到一定数量的时候,再进行IO的调用,提升性能。至于什么时候读取和存储则由内核来决定,用户程序不需要关心。

在linux系统中,系统内核也有个缓冲区叫做内核缓冲区。每个进程有自己独立的缓冲区,叫做进程缓冲区。

所以,用户程序的IO读写程序,大多数情况下,并没有进行实际的IO操作,而是在读写自己的进程缓冲区。

什么是IO

O,英文全称是Input/Output,翻译过来就是输入/输出。平时我们听得挺多,就是什么磁盘IO,网络IO。那IO到底是什么呢?是不是有种懵懵懂懂的感觉呀,好像大概知道它是什么,又好像说不清楚。

IO,即输入/输出,到底谁是输入?谁是输出呢?IO如果脱离了主体,就会让人疑惑。

计算机角度的IO

我们常说的输入输出,比较直观的意思就是计算机的输入输出计算机就是主体。大家是否还记得,大学学计算机组成原理的时候,有个冯.诺依曼结构,它将计算机分成分为5个部分:运算器、控制器、存储器、输入设备、输出设备。

计算机角度的IO

输入设备是向计算机输入数据和信息的设备,键盘,鼠标都属于输入设备;输出设备是计算机硬件系统的终端设备,用于接收计算机数据的输出显示,一般显示器、打印机属于输出设备。

例如你在鼠标键盘敲几下,它就会把你的指令数据,传给主机,主机通过运算后,把返回的数据信息,输出到显示器。

鼠标、显示器这只是直观表面的输入输出,回到计算机架构来说,涉及计算机核心与其他设备间数据迁移的过程,就是IO。如磁盘IO,就是从磁盘读取数据到内存,这算一次输入,对应的,将内存中的数据写入磁盘,就算输出。这就是IO的本质。

操作系统的IO

我们要将内存中的数据写入到磁盘的话,主体会是什么呢?主体可能是一个应用程序,比如一个Java进程(假设网络传来二进制流,一个Java进程可以把它写入到磁盘)。

操作系统负责计算机的资源管理和进程的调度。我们电脑上跑着的应用程序,其实是需要经过操作系统,才能做一些特殊操作,如磁盘文件读写、内存的读写等等。因为这些都是比较危险的操作,不可以由应用程序乱来,只能交给底层操作系统来。也就是说,你的应用程序要把数据写入磁盘,只能通过调用操作系统开放出来的API来操作。

什么是用户空间?什么是内核空间?

以32位操作系统为例,它为每一个进程都分配了4G(2的32次方)的内存空间。这4G可访问的内存空间分为二部分,一部分是用户空间,一部分是内核空间。内核空间是操作系统内核访问的区域,是受保护的内存空间,而用户空间是用户应用程序访问的内存区域。

我们应用程序是跑在用户空间的,它不存在实质的IO过程,真正的IO是在操作系统执行的。即应用程序的IO操作分为两种动作:IO调用和IO执行。IO调用是由进程(应用程序的运行态)发起,而IO执行是操作系统内核的工作。此时所说的IO是应用程序对操作系统IO功能的一次触发,即IO调用。

操作系统的一次IO过程

应用程序发起的一次IO操作包含两个阶段:

  • IO调用:应用程序进程向操作系统内核发起调用。

  • IO执行:操作系统内核完成IO操作。

操作系统内核完成IO操作还包括两个过程:

  • 准备数据阶段:内核等待I/O设备准备好数据

  • 拷贝数据阶段:将数据从内核缓冲区拷贝到用户进程缓冲区

操作系统的一次IO过程

其实IO就是把进程的内部数据转移到外部设备,或者把外部设备的数据迁移到进程内部。外部设备一般指硬盘、socket通讯的网卡。一个完整的IO过程包括以下几个步骤:

  • 应用程序进程向操作系统发起IO调用请求

  • 操作系统准备数据,把IO外部设备的数据,加载到内核缓冲区

  • 操作系统拷贝数据,即将内核缓冲区的数据,拷贝到用户进程缓冲区

阻塞和非阻塞的区别

阻塞和非阻塞的区别就在于第一个阶段,如果数据没有就绪,在查看数据是否就绪的过程中是一直等待,还是直接返回一个标志信息。

同步和异步的区别是否会导致发起IO请求的线程暂停。

同步与异步的区别

同步需要通过用户线程或者内核不断地去轮询数据是否就绪。

异步是IO操作的两个阶段都是由内核自动完成,然后发送通知告知用户线程IO操作已经完成。

阻塞IO模型

我们已经知道IO是什么啦,那什么是阻塞IO呢?

假设应用程序的进程发起IO调用,但是如果内核的数据还没准备好的话,那应用程序进程就一直在阻塞等待,一直等到内核数据准备好了,从内核拷贝到用户空间,才返回成功提示,此次IO操作,称之为阻塞IO

阻塞IO模型

  • 阻塞IO比较经典的应用就是阻塞socket、Java BIO

  • 阻塞IO的缺点就是:如果内核数据一直没准备好,那用户进程将一直阻塞,浪费性能,可以使用非阻塞IO优化。

非阻塞IO模型

如果内核数据还没准备好,可以先返回错误信息给用户进程,让它不需要等待,而是通过轮询的方式再来请求。这就是非阻塞IO,流程图如下:

非阻塞IO模型

非阻塞IO的流程如下:

  • 应用进程向操作系统内核,发起recvfrom读取数据。

  • 操作系统内核数据没有准备好,立即返回EWOULDBLOCK错误码。

  • 应用程序进程轮询调用,继续向操作系统内核发起recvfrom读取数据。

  • 操作系统内核数据准备好了,从内核缓冲区拷贝到用户空间。

  • 完成调用,返回成功提示。

非阻塞IO模型,简称NIONon-Blocking IO。它相对于阻塞IO,虽然大幅提升了性能,但是它依然存在性能问题,即频繁的轮询,导致频繁的系统调用,同样会消耗大量的CPU资源。可以考虑IO复用模型,去解决这个问题。

IO多路复用模型

既然NIO无效的轮询会导致CPU资源消耗,我们等到内核数据准备好了,主动通知应用进程再去进行系统调用,那不就好了嘛?

当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。

IO复用模型核心思路:系统给我们提供一类函数(如我们耳濡目染的select、poll、epoll函数),它们可以同时监控多个fd的操作,任何一个返回内核数据就绪,应用进程再发起recvfrom系统调用。

IO多路复用之select

应用进程通过调用select函数,可以同时监控多个fd,在select函数监控的fd中,只要有任何一个数据状态准备就绪了,select函数就会返回可读状态,这时应用进程再发起recvfrom请求去读取数据。

IO多路复用之select

非阻塞IO模型(NIO)中,需要N(N>=1)次轮询系统调用,然而借助select的IO多路复用模型,只需要发起一次询问就够了,大大优化了性能。

但是呢,select有几个缺点:

  • 监听的IO最大连接数有限,在Linux系统上一般为1024。

  • select函数返回后,是通过遍历fdset,找到就绪的描述符fd。(仅知道有I/O事件发生,却不知是哪几个流,所以遍历所有流

因为存在连接数限制,所以后来又提出了poll。与select相比,poll解决了连接数限制问题。但是呢,select和poll一样,还是需要通过遍历文件描述符来获取已经就绪的socket。如果同时连接的大量客户端,在一时刻可能只有极少处于就绪状态,伴随着监视的描述符数量的增长,效率也会线性下降

因此经典的多路复用模型epoll诞生。

IO多路复用之epoll

为了解决select/poll存在的问题,多路复用模型epoll诞生,它采用事件驱动来实现,流程图如下:

IO多路复用之select

epoll先通过epoll_ctl()来注册一个fd(文件描述符),一旦基于某个fd就绪时,内核会采用回调机制,迅速激活这个fd,当进程调用epoll_wait()时便得到通知。这里去掉了遍历文件描述符的坑爹操作,而是采用监听事件回调的机制。这就是epoll的亮点。

我们一起来总结一下select、poll、epoll的区别

select poll epoll
底层数据结构 数组 链表 红黑树和双链表
获取就绪的fd 遍历 遍历 事件回调
事件复杂度 O(n) O(n) O(1)
最大连接数 1024 无限制 无限制
fd数据拷贝 每次调用select 需要将fd数据从用户空间拷贝到内核空间 每次调用poll,需要将fd数据从用户空间拷贝到内核空间 使用内存映射(mmap),不需要从用户空间频繁拷贝fd数据到内核空间

epoll明显优化了IO的执行效率,但在进程调用epoll_wait()时,仍然可能被阻塞。能不能酱紫:不用我老是去问你数据是否准备就绪,等我发出请求后,你数据准备好了通知我就行了,这就诞生了信号驱动IO模型

IO模型之信号驱动模型

信号驱动IO不再用主动询问的方式去确认数据是否就绪,而是向内核发送一个信号(调用sigaction的时候建立一个SIGIO的信号),然后应用用户进程可以去做别的事,不用阻塞。当内核数据准备好后,再通过SIGIO信号通知应用进程,数据准备好后的可读状态。应用用户进程收到信号之后,立即调用recvfrom,去读取数据。

IO模型之信号驱动模型

信号驱动IO模型,在应用进程发出信号后,是立即返回的,不会阻塞进程。它已经有异步操作的感觉了。但是你细看上面的流程图,发现数据复制到应用缓冲的时候,应用进程还是阻塞的。回过头来看下,不管是BIO,还是NIO,还是信号驱动,在数据从内核复制到应用缓冲的时候,都是阻塞的。还有没有优化方案呢?AIO(真正的异步IO)!

IO 模型之异步IO(AIO)

前面讲的BIO,NIO和信号驱动,在数据从内核复制到应用缓冲的时候,都是阻塞的,因此都不算是真正的异步。AIO实现了IO全流程的非阻塞,就是应用进程发出系统调用后,是立即返回的,但是立即返回的不是处理结果,而是表示提交成功类似的意思。等内核数据准备好,将数据拷贝到用户进程缓冲区,发送信号通知用户进程IO操作执行完毕。

流程如下:

![IO 模型之异步IO(AIO)](img/IO 模型之异步IO(AIO).png)

异步IO的优化思路很简单,只需要向内核发送一次请求,就可以完成数据状态询问和数据拷贝的所有操作,并且不用阻塞等待结果。日常开发中,有类似思想的业务场景:

比如发起一笔批量转账,但是批量转账处理比较耗时,这时候后端可以先告知前端转账提交成功,等到结果处理完,再通知前端结果即可。

阻塞、非阻塞、同步、异步IO总结

  • 同步阻塞(blocking-IO)简称BIO

  • 同步非阻塞(non-blocking-IO)简称NIO

  • 异步非阻塞(asynchronous-non-blocking-IO)简称AIO

BIO:同步阻塞,在服务器中实现的模式为一个连接一个线程。也就是说,客户端有连接请求的时候,服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然这也可以通过线程池机制改善。BIO一般适用于连接数目小且固定的架构,这种方式对于服务器资源要求比较高,而且并发局限于应用中,是JDK1.4之前的唯一选择,但好在程序直观简单,易理解。

NIO:同步非阻塞,在服务器中实现的模式为一个请求一个线程,也就是说,客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到有连接IO请求时才会启动一个线程进行处理。NIO一般适用于连接数目多且连接比较短(轻操作)的架构,并发局限于应用中,编程比较复杂,从JDK1.4开始支持。

AIO:异步并非阻塞,在服务器中实现的模式为一个有效请求一个线程,也就是说,客户端的IO请求都是通过操作系统先完成之后,再通知服务器应用去启动线程进行处理。AIO一般适用于连接数目多且连接比较长(重操作)的架构,充分调用操作系统参与并发操作,编程比较复杂,从JDK1.7开始支持。

编写java IO代码及查看系统调用

https://blog.csdn.net/weixin_43970890/article/details/118355800

查看系统调用的工具

strace -ff -o ./out java Test.java

文件IO

网络IO

BIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class BIOSocket {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8090);
System.out.println("step1: new ServerSocket ");
while (true) {
Socket client = serverSocket.accept();
System.out.println("step2: client\t" + client.getPort());
new Thread(() -> {
try {
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SocketClient {

public static void main(String[] args) {

try {
Socket client = new Socket("xxx",8090);

client.setSendBufferSize(20);
client.setTcpNoDelay(true);
OutputStream out = client.getOutputStream();

InputStream in = System.in;
BufferedReader reader = new BufferedReader(new InputStreamReader(in));

while(true){
String line = reader.readLine();
if(line != null ){
byte[] bb = line.getBytes();
for (byte b : bb) {
out.write(b);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

启动时

1
2
3
4
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 5
bind(5, {sa_family=AF_INET, sin_port=htons(8090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(5, 50) = 0
poll([{fd=5, events=POLLIN|POLLERR}], 1, -1) = 1 ([{fd=5, revents=POLLIN}])

poll函数会阻塞直到其中任何一个fd发生事件。

1
2
3
accept(5, {sa_family=AF_INET, sin_port=htons(10253), sin_addr=inet_addr("42.120.74.252")}, [16]) = 6
clone(child_stack=0x7f013e5c4fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f013e5c59d0, tls=0x7f013e5c5700, child_tidptr=0x7f013e5c59d0) = 13168
poll([{fd=5, events=POLLIN|POLLERR}], 1, -1

抛出线程(即我们代码里的 new Thread() )后,继续poll阻塞等待连接。

clone出来的线程

1
recvfrom(6, "hello,bio\n", 8192, 0, NULL, NULL) =

关于对recvfrom函数的说明,其中第四个参数0 表示这是一个阻塞调用。

客户端发送数据后

1
recvfrom(6, "hello,bio\n", 8192, 0, NULL, NULL) = 10

NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

public class NIOSocket {
private static LinkedList< SocketChannel> clients = new LinkedList<>();

private static void startClientChannelHandleThread(){
new Thread(() -> {
while (true){
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

//处理客户端连接
for (SocketChannel c : clients) {
// 非阻塞, >0 表示读取到的字节数量, 0或-1表示未读取到或读取异常
int num = 0;
try {
num = c.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}

if (num > 0) {
buffer.flip();
byte[] clientBytes = new byte[buffer.limit()];
//从缓冲区 读取到内存中
buffer.get(clientBytes);

System.out.println(c.socket().getPort() + ":" + new String(clientBytes));

//清空缓冲区
buffer.clear();
}
}
}
}).start();
}

public static void main(String[] args) throws IOException {
//new socket,开启监听
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress(9090));
//设置阻塞接受客户端连接
socketChannel.configureBlocking(true);

//开始client处理线程
startClientChannelHandleThread();

while (true) {
//接受客户端连接; 非阻塞,无客户端返回null(操作系统返回-1)
SocketChannel client = socketChannel.accept();

if (client == null) {
//System.out.println("no client");
} else {
//设置读非阻塞
client.configureBlocking(false);

int port = client.socket().getPort();
System.out.println("client port :" + port);

clients.add(client);
}
}
}
}

主线程

1
2
3
4
5
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(4, 50) = 0
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
accept(4, 0x7fe26414e680, 0x7fe26c376710) = -1 EAGAIN (Resource temporarily unavailable)

有连接后,子线程

1
2
3
read(6, 0x7f3f415b1c50, 4096)           = -1 EAGAIN (Resource temporarily unavailable)
read(6, 0x7f3f415b1c50, 4096) = -1 EAGAIN (Resource temporarily unavailable)
...

多路复用器(select、poll、epoll)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class MultiplexingSocket {

static ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

public static void main(String[] args) throws Exception {

LinkedList< SocketChannel> clients = new LinkedList<>();

//1.启动server
//new socket,开启监听
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress(9090));
//设置非阻塞,接受客户端
socketChannel.configureBlocking(false);

//多路复用器(JDK包装的代理,select /poll/epoll/kqueue)
Selector selector = Selector.open(); //java自动代理,默认为epoll
//Selector selector = PollSelectorProvider.provider().openSelector();//指定为poll

//将服务端socket 注册到 多路复用器
socketChannel.register(selector, SelectionKey.OP_ACCEPT);

//2. 轮训多路复用器
// 先询问有没有连接,如果有则返回数量以及对应的对象(fd)
while (selector.select() > 0) {
System.out.println();
Set< SelectionKey> selectionKeys = selector.selectedKeys();
Iterator< SelectionKey> iter = selectionKeys.iterator();

while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();

//2.1 处理新的连接
if (key.isAcceptable()) {
//接受客户端连接; 非阻塞,无客户端返回null(操作系统返回-1)
SocketChannel client = socketChannel.accept();
//设置读非阻塞
client.configureBlocking(false);

//同样,把client也注册到selector
client.register(selector, SelectionKey.OP_READ);
System.out.println("new client : " + client.getRemoteAddress());
}
//2.2 处理读取数据
else if (key.isReadable()) {
readDataFromSocket(key);
}
}
}
}

protected static void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 非阻塞, >0 表示读取到的字节数量, 0或-1表示未读取到或读取异常
// 请注意:这个例子降低复杂度,不考虑报文大于buffer size的情况
int num = socketChannel.read(buffer);

if (num > 0) {
buffer.flip();
byte[] clientBytes = new byte[buffer.limit()];
//从缓冲区 读取到内存中
buffer.get(clientBytes);

System.out.println(socketChannel.socket().getPort() + ":" + new String(clientBytes));

//清空缓冲区
buffer.clear();
}
}
}

启动

1
2
3
4
5
6
7
8
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(4, 50)
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
epoll_create(256) = 7
epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4324783852322029573}}) = 0
epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0
epoll_wait(7

关于对epoll_create(对应着Java的 Selector selector = Selector.open()) 的说明,本质上是在内存的操作系统保留区,创建一个epoll数据结构。用于后面当有client连接时,向该epoll区中添加监听。

有连接

1
2
3
4
epoll_wait(7,[{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1
accept(4, {sa_family=AF_INET, sin_port=htons(29597), sin_addr=inet_addr("42.120.74.252")}, [16]) = 8
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) = 0
epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=3212844375897800712}}) = 0

关于epoll_ctl (对应着Java的 client.register(selector, SelectionKey.OP_READ) )。其中 EPOLLIN 恰好对应着Java的 SelectionKey.OP_READ 即监听数据到达读取事件。

客户端发送数据

1
2
3
epoll_wait(7,[{EPOLLIN, {u32=8, u64=3212844375897800712}}], 8192, -1) = 1
read(8, "hello,multiplex\n", 4096) = 16
epoll_wait(7,

epoll_wait第四个参数-1表示block。

netty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final class TelnetServer {

static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8992" : "8023"));

public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer(sslCtx));

b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Sharable
public class TelnetServerHandler extends SimpleChannelInboundHandler< String> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send greeting for a new connection.
ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
ctx.write("It is " + new Date() + " now.\r\n");
ctx.flush();
}

@Override
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
// Generate and write a response.
String response;
boolean close = false;
if (request.isEmpty()) {
response = "Please type something.\r\n";
} else if ("bye".equals(request.toLowerCase())) {
response = "Have a good day!\r\n";
close = true;
} else {
response = "Did you say '" + request + "'?\r\n";
}

// We do not need to write a ChannelBuffer here.
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
ChannelFuture future = ctx.write(response);

// Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TelnetServerInitializer extends ChannelInitializer< SocketChannel> {

private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();

private static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler();

private final SslContext sslCtx;

public TelnetServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}

@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}

// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);

// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
}

主线程(23109)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
## 256无实际作用,这里只为了兼容旧版kernel api
epoll_create(256) = 7epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=5477705356928876549}}) = 0

epoll_create(256) = 10epoll_ctl(10, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=17041805914081853448}}) = 0

epoll_create(256) = 13
epoll_ctl(13, EPOLL_CTL_ADD, 11, {EPOLLIN, {u32=11, u64=17042151607409573899}}) = 0

epoll_create(256) = 16
epoll_ctl(16, EPOLL_CTL_ADD, 14, {EPOLLIN, {u32=14, u64=17042497300737294350}}) = 0

epoll_create(256) = 19
epoll_ctl(19, EPOLL_CTL_ADD, 17, {EPOLLIN, {u32=17, u64=17042561450368827409}}) = 0

epoll_create(256) = 10
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 20
clone(child_stack=0x7fc3c509afb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c509b9d0, tls=0x7fc3c509b700, child_tidptr=0x7fc3c509b9d0) = 23130

概括为:

向OS新建socket,并开启clone boss线程23130。
为BOSS创建了一个epoll(论证参见下面“boss”),每个worker创建一个epoll数据结构(本质上是在kernel内存区创建了一个数据结构,用于后续监听)。
创建boss线程监听的socket(本质上在kernel中创建一个数据结构)。

boss(23130)

1
2
3
4
5
6
7
8
9
10
bind(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(20, 128) = 0
getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0

##将fd为7号epoll和fd为20号的socket绑定,事件:epoll_ctl_add和epoll_ctl_mod
epoll_ctl(7, EPOLL_CTL_ADD, 20, {EPOLLIN, {u32=20, u64=14198059139132817428}}) = 0
epoll_ctl(7, EPOLL_CTL_MOD, 20, {EPOLLIN, {u32=20, u64=20}}) = 0
epoll_wait(7, [{EPOLLIN, {u32=5, u64=17295150779149058053}}], 8192, 1000) = 1
epoll_wait(7, [], 8192, 1000) = 0(不断轮训,1S超时一次)

概括为:

将上一步中main线程创建的fd:20绑定端口8023,并开启监听(网卡负责监听和接受连接和数据,kernel则负责路由到具体进程,具体参见:关于socket和bind和listen,TODO )。
将7号socket对应的fd绑定到20号对应的epoll数据结构上去(都是操作kernel中的内存)。
开始1S中一次阻塞等待epoll有任何连接或数据到达。

客户端连接

boss (23130)

1
2
3
4
5
6
7
8
accept(20, {sa_family=AF_INET, sin_port=htons(11144), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24
getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0
getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0
setsockopt(24, SOL_TCP, TCP_NODELAY, [1], 4) = 0
getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0
getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0
##抛出 work线程
clone(child_stack=0x7fc3c4c98fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c4c999d0, tls=0x7fc3c4c99700, child_tidptr=0x7fc3c4c999d0) = 2301

worker (2301)

1
2
3
4
5
6
7
8
writev(24, [{"Welcome to iZbp14e1g9ztpshfrla9m"..., 37}, {"It is Sun Aug 23 15:44:14 CST 20"..., 41}], 2) = 78
epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=14180008216221450264}}) = 0
epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1
read(11, "\1", 128) = 1
##开始无限loop
epoll_wait(13, [], 8192, 1000) = 0
epoll_wait(13, [{EPOLLIN, {u32=24, u64=24}}], 8192, 1000) = 1

概括:

当BOSS轮训epoll_wait等到了连接后,首先accept得到该socket对应的fd。
连接建立后 BOSS立马抛出一个线程(clone函数)。
worker(即新建的线程)写入了一段数据(这里是业务逻辑)。
worker将该client对应的fd绑定到了13号epoll上。
worker继续轮训监听13号epoll。

客户端主动发送数据

worker(2301)

1
2
3
4
read(24, "i am daojian\r\n", 1024)      = 14
write(24, "Did you say 'i am daojian'?\r\n", 29) = 29
##继续无限loop
epoll_wait(13, [], 8192, 1000) = 0

概括为:

  • wait到数据后,立即read到用户控件内存中(读取1024个字节到 用户控件某个buff中)。
  • 写入数据(业务逻辑,不必太关注)。
  • 继续轮训等待13号epoll。

客户端发送bye报文,服务器断开TCP连接

worker(2301)

1
2
3
4
5
6
7
8
9
10
read(24, "bye\r\n", 1024)               = 5
write(24, "Have a good day!\r\n", 18) = 18
getsockopt(24, SOL_SOCKET, SO_LINGER, {onoff=0, linger=0}, [8]) = 0
dup2(25, 24) = 24
##从epoll数据结构中(OS)中删除fd为24的socket
epoll_ctl(13, EPOLL_CTL_DEL, 24, 0x7f702dd531e0) = -1 ENOENT
##关闭24 socket
close(24) = 0
##继续等待13 epoll数据
epoll_wait(13, [], 8192, 1000) = 0

断开客户端连接概括为:

  • 从epoll中删除该客户端对应的fd(这里触发源头没找到,可能是boss)。
  • close关闭客户端24号fd。
  • 继续轮训epoll。

五个客户端同时连接

boss线程(23130)

1
2
3
4
5
6
7
accept(20, {sa_family=AF_INET, sin_port=htons(1846), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24
clone(child_stack=0x7f702cc51fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cc529d0, tls=0x7f702cc52700, child_tidptr=0x7f702cc529d0) = 10035

accept(20, {sa_family=AF_INET, sin_port=htons(42067), sin_addr=inet_addr("42.120.74.122")}, [16]) = 26
clone(child_stack=0x7f702cb50fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cb519d0, tls=0x7f702cb51700, child_tidptr=0x7f702cb519d0) = 10067

...

woker线程(10035,第一个连接)

1
2
3
4
epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=3226004877247250456}}) = 0
epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1 = 1
epoll_wait(13, [], 8192, 1000) = 0

worker线程(10067,第二个连接)

1
2
3
4
5
epoll_ctl(16, EPOLL_CTL_ADD, 26, {EPOLLIN, {u32=26, u64=26}}) = 0
epoll_ctl(16, EPOLL_CTL_MOD, 26, {EPOLLIN, {u32=26, u64=3221483685433835546}}) = 0
epoll_wait(16, [{EPOLLIN, {u32=14, u64=17042497300737294350}}], 8192, 1000) = 1
epoll_wait(16, [], 8192, 1000) = 0
epoll_wait(16, [], 8192, 1000) = 0

worker线程(10067,第二个连接)

1
2
epoll_ctl(19, EPOLL_CTL_ADD, 27, {EPOLLIN, {u32=27, u64=27}}) = 0
epoll_ctl(19, EPOLL_CTL_MOD, 27, {EPOLLIN, {u32=27, u64=3216966479350071323}}) = 0

worker线程(8055,第四个连接)

1
2
epoll_ctl(10, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=28}}) = 0
epoll_ctl(10, EPOLL_CTL_MOD, 28, {EPOLLIN, {u32=28, u64=3302604828697427996}}) = 0

worker线程(10035,第五个连接,不在clone线程,而是复用了第一个epoll对应的worker)

1
2
epoll_ctl(13, EPOLL_CTL_ADD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0

概括为:

  • epoll和boss、worker之间的关系:一共有4个worker对应着4个epoll对象,boss和每个worker都有对应自己的epoll。
  • boss根据epoll数量,平衡分配连接到每个worker对应的epoll中。

java的IO
http://hanqichuan.com/2022/07/12/java/java的IO/
作者
韩启川
发布于
2022年7月12日
许可协议