linux文件系统 linux中常说的一句话 “ 一切皆文件”
数据块:
硬盘分成相同大小的单元,我们称为块(Block)。一块的大小是扇区大小的整数倍,默认是 4K。
Inode:
index node, linux中文件抽象出来的一个数据结构(class), index就是索引,就是去哪找组成文件的数据块。
文件描述符 fd(File Descriptor):
当使用系统调用 open 打开一个文件时,操作系统会创建一些数据结构来表示这个被打开的文件。为了能够找到这些数据结构,在进程中,我们会为这个打开的文件分配一个文件描述符 fd(File Descriptor)。
pageCache:
内存比磁盘快,linux把热点数据放入内存中,这个块区域称为pageCache。
虚拟文件系统VFS:
Linux 可以支持多达数十种不同的文件系统(ext4、ext3、ntfs)。它们的实现各不相同,因此 Linux 内核向用户空间提供了虚拟文件系统这个统一的接口,来对文件系统进行操作。
DMA(Direct Memory Access,直接存储器访问) 是所有现代电脑的重要特色,它允许不同速度的硬件装置来沟通,而不需要依赖于CPU的大量中断负载。否则,CPU 需要从来源把每一片段的资料复制到寄存器,然后把它们再次写回到新的地方。在这个时间中,CPU 对于其他的工作来说就无法使用。
1)用户进程向 CPU 发起 read 系统调用读取数据,由用户态切换为内核态,然后一直阻塞等待数据的返回。
2)CPU 在接收到指令以后对 DMA 磁盘控制器发起调度指令。
3)DMA 磁盘控制器对磁盘发起 I/O 请求,将磁盘数据先放入磁盘控制器缓冲区,CPU 全程不参与此过程。
4)数据读取完成后,DMA 磁盘控制器会接受到磁盘的通知,将数据从磁盘控制器缓冲区拷贝到内核缓冲区。
5)DMA 磁盘控制器向 CPU 发出数据读完的信号,由 CPU 负责将数据从内核缓冲区拷贝到用户缓冲区。
6)用户进程由内核态切换回用户态,解除阻塞状态,然后等待 CPU 的下一个执行时间钟。
内核缓冲区与进程缓冲区 缓冲区的目的,是为了减少频繁的系统IO调用。大家都知道,系统调用需要保存之前的进程数据和状态等信息,而结束调用之后回来还需要恢复之前的信息,为了减少这种损耗时间、也损耗性能的系统调用,于是出现了缓冲区。
有了缓冲区,操作系统使用read函数把数据从内核缓冲区复制到进程缓冲区,write把数据从进程缓冲区复制到内核缓冲区中。等待缓冲区达到一定数量的时候,再进行IO的调用,提升性能。至于什么时候读取和存储则由内核来决定,用户程序不需要关心。
在linux系统中,系统内核也有个缓冲区叫做内核缓冲区。每个进程有自己独立的缓冲区,叫做进程缓冲区。
所以,用户程序的IO读写程序,大多数情况下,并没有进行实际的IO操作,而是在读写自己的进程缓冲区。
什么是IO O,英文全称是Input/Output,翻译过来就是输入/输出 。平时我们听得挺多,就是什么磁盘IO,网络IO。那IO到底是什么呢?是不是有种懵懵懂懂的感觉呀,好像大概知道它是什么,又好像说不清楚。
IO,即输入/输出 ,到底谁是输入?谁是输出呢?IO如果脱离了主体,就会让人疑惑。
计算机角度的IO 我们常说的输入输出,比较直观的意思就是计算机的输入输出 ,计算机就是主体 。大家是否还记得,大学学计算机组成原理 的时候,有个冯.诺依曼结构 ,它将计算机分成分为5个部分:运算器、控制器、存储器、输入设备、输出设备。
输入设备是向计算机输入数据和信息的设备,键盘,鼠标都属于输入设备;输出设备是计算机硬件系统的终端设备,用于接收计算机数据的输出显示,一般显示器、打印机属于输出设备。
例如你在鼠标键盘敲几下,它就会把你的指令数据,传给主机,主机通过运算后,把返回的数据信息,输出到显示器。
鼠标、显示器这只是直观表面的输入输出,回到计算机架构来说,涉及计算机核心与其他设备间数据迁移的过程,就是IO 。如磁盘IO,就是从磁盘读取数据到内存,这算一次输入,对应的,将内存中的数据写入磁盘,就算输出。这就是IO的本质。
操作系统的IO 我们要将内存中的数据写入到磁盘的话,主体会是什么呢?主体可能是一个应用程序,比如一个Java进程(假设网络传来二进制流,一个Java进程可以把它写入到磁盘)。
操作系统 负责计算机的资源管理和进程的调度。我们电脑上跑着的应用程序,其实是需要经过操作系统 ,才能做一些特殊操作,如磁盘文件读写、内存的读写 等等。因为这些都是比较危险的操作,不可以由应用程序乱来,只能交给底层操作系统来。也就是说,你的应用程序要把数据写入磁盘,只能通过调用操作系统开放出来的API来操作。
什么是用户空间?什么是内核空间?
以32位操作系统为例,它为每一个进程都分配了4G(2的32次方)的内存空间。这4G可访问的内存空间分为二部分,一部分是用户空间,一部分是内核空间。内核空间是操作系统内核访问的区域,是受保护的内存空间,而用户空间是用户应用程序访问的内存区域。
我们应用程序是跑在用户空间的,它不存在实质的IO过程,真正的IO是在操作系统 执行的。即应用程序的IO操作分为两种动作:IO调用和IO执行 。IO调用是由进程(应用程序的运行态)发起,而IO执行是操作系统内核 的工作。此时所说的IO是应用程序对操作系统IO功能的一次触发,即IO调用。
操作系统的一次IO过程 应用程序发起的一次IO操作包含两个阶段:
IO调用:应用程序进程向操作系统内核 发起调用。
IO执行:操作系统内核完成IO操作。
操作系统内核完成IO操作还包括两个过程:
其实IO就是把进程的内部数据转移到外部设备,或者把外部设备的数据迁移到进程内部。外部设备一般指硬盘、socket通讯的网卡。一个完整的IO过程 包括以下几个步骤:
阻塞和非阻塞的区别 阻塞和非阻塞的区别就在于第一个阶段,如果数据没有就绪,在查看数据是否就绪的过程中是一直等待,还是直接返回一个标志信息。
同步和异步的区别是否会导致发起IO请求的线程暂停。
同步与异步的区别 同步需要通过用户线程或者内核不断地去轮询数据是否就绪。
异步是IO操作的两个阶段都是由内核自动完成,然后发送通知告知用户线程IO操作已经完成。
阻塞IO模型 我们已经知道IO是什么啦,那什么是阻塞IO 呢?
假设应用程序的进程发起IO调用 ,但是如果内核的数据还没准备好 的话,那应用程序进程就一直在阻塞等待 ,一直等到内核数据准备好了,从内核拷贝到用户空间,才返回成功提示,此次IO操作,称之为阻塞IO 。
非阻塞IO模型 如果内核数据还没准备好,可以先返回错误信息给用户进程,让它不需要等待,而是通过轮询的方式再来请求。这就是非阻塞IO,流程图如下:
非阻塞IO的流程如下:
应用进程向操作系统内核,发起recvfrom
读取数据。
操作系统内核数据没有准备好,立即返回EWOULDBLOCK
错误码。
应用程序进程轮询调用,继续向操作系统内核发起recvfrom
读取数据。
操作系统内核数据准备好了,从内核缓冲区拷贝到用户空间。
完成调用,返回成功提示。
非阻塞IO模型,简称NIO ,Non-Blocking IO
。它相对于阻塞IO,虽然大幅提升了性能,但是它依然存在性能问题 ,即频繁的轮询 ,导致频繁的系统调用,同样会消耗大量的CPU资源。可以考虑IO复用模型 ,去解决这个问题。
IO多路复用模型 既然NIO 无效的轮询会导致CPU资源消耗,我们等到内核数据准备好了,主动通知应用进程再去进行系统调用,那不就好了嘛?
当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。
IO复用模型核心思路:系统给我们提供一类函数 (如我们耳濡目染的select、poll、epoll 函数),它们可以同时监控多个fd
的操作,任何一个返回内核数据就绪,应用进程再发起recvfrom
系统调用。
IO多路复用之select 应用进程通过调用select 函数,可以同时监控多个fd
,在select
函数监控的fd
中,只要有任何一个数据状态准备就绪了,select
函数就会返回可读状态,这时应用进程再发起recvfrom
请求去读取数据。
非阻塞IO模型(NIO)中,需要N
(N>=1)次轮询系统调用,然而借助select
的IO多路复用模型,只需要发起一次询问就够了,大大优化了性能。
但是呢,select
有几个缺点:
因为存在连接数限制 ,所以后来又提出了poll 。与select相比,poll 解决了连接数限制问题 。但是呢,select和poll一样,还是需要通过遍历文件描述符来获取已经就绪的socket
。如果同时连接的大量客户端,在一时刻可能只有极少处于就绪状态,伴随着监视的描述符数量的增长,效率也会线性下降 。
因此经典的多路复用模型epoll
诞生。
IO多路复用之epoll 为了解决select/poll
存在的问题,多路复用模型epoll
诞生,它采用事件驱动来实现,流程图如下:
epoll 先通过epoll_ctl()
来注册一个fd
(文件描述符),一旦基于某个fd
就绪时,内核会采用回调机制,迅速激活这个fd
,当进程调用epoll_wait()
时便得到通知。这里去掉了遍历文件描述符 的坑爹操作,而是采用监听事件回调 的机制。这就是epoll的亮点。
我们一起来总结一下select、poll、epoll的区别
select
poll
epoll
底层数据结构
数组
链表
红黑树和双链表
获取就绪的fd
遍历
遍历
事件回调
事件复杂度
O(n)
O(n)
O(1)
最大连接数
1024
无限制
无限制
fd数据拷贝
每次调用select
需要将fd数据从用户空间拷贝到内核空间 每次调用poll,需要将fd数据从用户空间拷贝到内核空间
使用内存映射(mmap),不需要从用户空间频繁拷贝fd数据到内核空间
epoll 明显优化了IO的执行效率,但在进程调用epoll_wait()
时,仍然可能被阻塞。能不能酱紫:不用我老是去问你数据是否准备就绪,等我发出请求后,你数据准备好了通知我就行了,这就诞生了信号驱动IO模型 。
IO模型之信号驱动模型 信号驱动IO不再用主动询问的方式去确认数据是否就绪,而是向内核发送一个信号(调用sigaction
的时候建立一个SIGIO
的信号),然后应用用户进程可以去做别的事,不用阻塞。当内核数据准备好后,再通过SIGIO
信号通知应用进程,数据准备好后的可读状态。应用用户进程收到信号之后,立即调用recvfrom
,去读取数据。
信号驱动IO模型,在应用进程发出信号后,是立即返回的,不会阻塞进程。它已经有异步操作的感觉了。但是你细看上面的流程图,发现数据复制到应用缓冲的时候 ,应用进程还是阻塞的。回过头来看下,不管是BIO,还是NIO,还是信号驱动,在数据从内核复制到应用缓冲的时候,都是阻塞的。还有没有优化方案呢?AIO (真正的异步IO)!
IO 模型之异步IO(AIO) 前面讲的BIO,NIO和信号驱动
,在数据从内核复制到应用缓冲的时候,都是阻塞 的,因此都不算是真正的异步。AIO
实现了IO全流程的非阻塞,就是应用进程发出系统调用后,是立即返回的,但是立即返回的不是处理结果,而是表示提交成功类似的意思 。等内核数据准备好,将数据拷贝到用户进程缓冲区,发送信号通知用户进程IO操作执行完毕。
流程如下:
![IO 模型之异步IO(AIO)](img/IO 模型之异步IO(AIO).png)
异步IO的优化思路很简单,只需要向内核发送一次请求,就可以完成数据状态询问和数据拷贝的所有操作,并且不用阻塞等待结果。日常开发中,有类似思想的业务场景:
比如发起一笔批量转账,但是批量转账处理比较耗时,这时候后端可以先告知前端转账提交成功,等到结果处理完,再通知前端结果即可。
阻塞、非阻塞、同步、异步IO总结
BIO:同步阻塞,在服务器中实现的模式为一个连接一个线程 。也就是说,客户端有连接请求的时候,服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然这也可以通过线程池机制改善。BIO一般适用于连接数目小且固定的架构 ,这种方式对于服务器资源要求比较高,而且并发局限于应用中,是JDK1.4之前的唯一选择,但好在程序直观简单,易理解。
NIO:同步非阻塞,在服务器中实现的模式为一个请求一个线程 ,也就是说,客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到有连接IO请求时才会启动一个线程进行处理。NIO一般适用于连接数目多且连接比较短(轻操作)的架构 ,并发局限于应用中,编程比较复杂,从JDK1.4开始支持。
AIO:异步并非阻塞,在服务器中实现的模式为一个有效请求一个线程 ,也就是说,客户端的IO请求都是通过操作系统先完成之后,再通知服务器应用去启动线程进行处理。AIO一般适用于连接数目多且连接比较长(重操作)的架构,充分调用操作系统参与并发操作,编程比较复杂,从JDK1.7开始支持。
编写java IO代码及查看系统调用 https://blog.csdn.net/weixin_43970890/article/details/118355800
查看系统调用的工具 strace -ff -o ./out java Test.java
文件IO 网络IO BIO 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class BIOSocket { public static void main (String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket (8090 ); System.out.println("step1: new ServerSocket " ); while (true ) { Socket client = serverSocket.accept(); System.out.println("step2: client\t" + client.getPort()); new Thread (() -> { try { InputStream in = client.getInputStream(); BufferedReader reader = new BufferedReader (new InputStreamReader (in)); while (true ) { System.out.println(reader.readLine()); } } catch (IOException e) { e.printStackTrace(); } }).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class SocketClient { public static void main (String[] args) { try { Socket client = new Socket ("xxx" ,8090 ); client.setSendBufferSize(20 ); client.setTcpNoDelay(true ); OutputStream out = client.getOutputStream(); InputStream in = System.in; BufferedReader reader = new BufferedReader (new InputStreamReader (in)); while (true ){ String line = reader.readLine(); if (line != null ){ byte [] bb = line.getBytes(); for (byte b : bb) { out.write(b); } } } } catch (IOException e) { e.printStackTrace(); } } }
启动时
1 2 3 4 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 5 bind(5, {sa_family=AF_INET, sin_port=htons(8090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 listen(5, 50) = 0 poll([{fd=5, events=POLLIN|POLLERR}], 1, -1) = 1 ([{fd=5, revents=POLLIN}])
poll函数会阻塞直到其中任何一个fd发生事件。
1 2 3 accept(5, {sa_family=AF_INET, sin_port=htons(10253), sin_addr=inet_addr("42.120.74.252")}, [16]) = 6 clone(child_stack=0x7f013e5c4fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f013e5c59d0, tls=0x7f013e5c5700, child_tidptr=0x7f013e5c59d0) = 13168 poll([{fd=5, events=POLLIN|POLLERR}], 1, -1
抛出线程(即我们代码里的 new Thread() )后,继续poll阻塞等待连接。
clone出来的线程
1 recvfrom(6, "hello,bio\n", 8192, 0, NULL, NULL) =
关于对recvfrom函数的说明,其中第四个参数0 表示这是一个阻塞调用。
客户端发送数据后
1 recvfrom(6, "hello,bio\n", 8192, 0, NULL, NULL) = 10
NIO 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class NIOSocket { private static LinkedList< SocketChannel> clients = new LinkedList <>(); private static void startClientChannelHandleThread () { new Thread (() -> { while (true ){ ByteBuffer buffer = ByteBuffer.allocateDirect(4096 ); for (SocketChannel c : clients) { int num = 0 ; try { num = c.read(buffer); } catch (IOException e) { e.printStackTrace(); } if (num > 0 ) { buffer.flip(); byte [] clientBytes = new byte [buffer.limit()]; buffer.get(clientBytes); System.out.println(c.socket().getPort() + ":" + new String (clientBytes)); buffer.clear(); } } } }).start(); } public static void main (String[] args) throws IOException { ServerSocketChannel socketChannel = ServerSocketChannel.open(); socketChannel.bind(new InetSocketAddress (9090 )); socketChannel.configureBlocking(true ); startClientChannelHandleThread(); while (true ) { SocketChannel client = socketChannel.accept(); if (client == null ) { } else { client.configureBlocking(false ); int port = client.socket().getPort(); System.out.println("client port :" + port); clients.add(client); } } } }
主线程
1 2 3 4 5 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4 bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 listen(4, 50) = 0 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 accept(4, 0x7fe26414e680, 0x7fe26c376710) = -1 EAGAIN (Resource temporarily unavailable)
有连接后,子线程
1 2 3 read(6, 0x7f3f415b1c50, 4096) = -1 EAGAIN (Resource temporarily unavailable) read(6, 0x7f3f415b1c50, 4096) = -1 EAGAIN (Resource temporarily unavailable) ...
多路复用器(select、poll、epoll) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class MultiplexingSocket { static ByteBuffer buffer = ByteBuffer.allocateDirect(4096 ); public static void main (String[] args) throws Exception { LinkedList< SocketChannel> clients = new LinkedList <>(); ServerSocketChannel socketChannel = ServerSocketChannel.open(); socketChannel.bind(new InetSocketAddress (9090 )); socketChannel.configureBlocking(false ); Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0 ) { System.out.println(); Set< SelectionKey> selectionKeys = selector.selectedKeys(); Iterator< SelectionKey> iter = selectionKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel client = socketChannel.accept(); client.configureBlocking(false ); client.register(selector, SelectionKey.OP_READ); System.out.println("new client : " + client.getRemoteAddress()); } else if (key.isReadable()) { readDataFromSocket(key); } } } } protected static void readDataFromSocket (SelectionKey key) throws Exception { SocketChannel socketChannel = (SocketChannel) key.channel(); int num = socketChannel.read(buffer); if (num > 0 ) { buffer.flip(); byte [] clientBytes = new byte [buffer.limit()]; buffer.get(clientBytes); System.out.println(socketChannel.socket().getPort() + ":" + new String (clientBytes)); buffer.clear(); } } }
启动
1 2 3 4 5 6 7 8 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4 bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 listen(4, 50) fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 epoll_create(256) = 7 epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4324783852322029573}}) = 0 epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0 epoll_wait(7
关于对epoll_create(对应着Java的 Selector selector = Selector.open()) 的说明,本质上是在内存的操作系统保留区,创建一个epoll数据结构。用于后面当有client连接时,向该epoll区中添加监听。
有连接
1 2 3 4 epoll_wait(7,[{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1 accept(4, {sa_family=AF_INET, sin_port=htons(29597), sin_addr=inet_addr("42.120.74.252")}, [16]) = 8 fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) = 0 epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=3212844375897800712}}) = 0
关于epoll_ctl (对应着Java的 client.register(selector, SelectionKey.OP_READ) )。其中 EPOLLIN 恰好对应着Java的 SelectionKey.OP_READ 即监听数据到达读取事件。
客户端发送数据
1 2 3 epoll_wait(7,[{EPOLLIN, {u32=8, u64=3212844375897800712}}], 8192, -1) = 1 read(8, "hello,multiplex\n", 4096) = 16 epoll_wait(7,
epoll_wait第四个参数-1表示block。
netty 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final class TelnetServer { static final boolean SSL = System.getProperty("ssl" ) != null ; static final int PORT = Integer.parseInt(System.getProperty("port" , SSL? "8992" : "8023" )); public static void main (String[] args) throws Exception { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate (); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null ; } EventLoopGroup bossGroup = new NioEventLoopGroup (1 ); EventLoopGroup workerGroup = new NioEventLoopGroup (); try { ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new TelnetServerInitializer (sslCtx)); b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Sharable public class TelnetServerHandler extends SimpleChannelInboundHandler < String> { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n" ); ctx.write("It is " + new Date () + " now.\r\n" ); ctx.flush(); } @Override public void channelRead0 (ChannelHandlerContext ctx, String request) throws Exception { String response; boolean close = false ; if (request.isEmpty()) { response = "Please type something.\r\n" ; } else if ("bye" .equals(request.toLowerCase())) { response = "Have a good day!\r\n" ; close = true ; } else { response = "Did you say '" + request + "'?\r\n" ; } ChannelFuture future = ctx.write(response); if (close) { future.addListener(ChannelFutureListener.CLOSE); } } @Override public void channelReadComplete (ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class TelnetServerInitializer extends ChannelInitializer < SocketChannel> { private static final StringDecoder DECODER = new StringDecoder (); private static final StringEncoder ENCODER = new StringEncoder (); private static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler (); private final SslContext sslCtx; public TelnetServerInitializer (SslContext sslCtx) { this .sslCtx = sslCtx; } @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null ) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new DelimiterBasedFrameDecoder (8192 , Delimiters.lineDelimiter())); pipeline.addLast(DECODER); pipeline.addLast(ENCODER); pipeline.addLast(SERVER_HANDLER); } }
主线程(23109)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ## 256无实际作用,这里只为了兼容旧版kernel api epoll_create(256) = 7epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=5477705356928876549}}) = 0 epoll_create(256) = 10epoll_ctl(10, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=17041805914081853448}}) = 0 epoll_create(256) = 13 epoll_ctl(13, EPOLL_CTL_ADD, 11, {EPOLLIN, {u32=11, u64=17042151607409573899}}) = 0 epoll_create(256) = 16 epoll_ctl(16, EPOLL_CTL_ADD, 14, {EPOLLIN, {u32=14, u64=17042497300737294350}}) = 0 epoll_create(256) = 19 epoll_ctl(19, EPOLL_CTL_ADD, 17, {EPOLLIN, {u32=17, u64=17042561450368827409}}) = 0 epoll_create(256) = 10 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 20 clone(child_stack=0x7fc3c509afb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c509b9d0, tls=0x7fc3c509b700, child_tidptr=0x7fc3c509b9d0) = 23130
概括为:
向OS新建socket,并开启clone boss线程23130。 为BOSS创建了一个epoll(论证参见下面“boss”),每个worker创建一个epoll数据结构(本质上是在kernel内存区创建了一个数据结构,用于后续监听)。 创建boss线程监听的socket(本质上在kernel中创建一个数据结构)。
boss(23130)
1 2 3 4 5 6 7 8 9 10 bind(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 listen(20, 128) = 0 getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0 getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0 ##将fd为7号epoll和fd为20号的socket绑定,事件:epoll_ctl_add和epoll_ctl_mod epoll_ctl(7, EPOLL_CTL_ADD, 20, {EPOLLIN, {u32=20, u64=14198059139132817428}}) = 0 epoll_ctl(7, EPOLL_CTL_MOD, 20, {EPOLLIN, {u32=20, u64=20}}) = 0 epoll_wait(7, [{EPOLLIN, {u32=5, u64=17295150779149058053}}], 8192, 1000) = 1 epoll_wait(7, [], 8192, 1000) = 0(不断轮训,1S超时一次)
概括为:
将上一步中main线程创建的fd:20绑定端口8023,并开启监听(网卡负责监听和接受连接和数据,kernel则负责路由到具体进程,具体参见:关于socket和bind和listen,TODO )。 将7号socket对应的fd绑定到20号对应的epoll数据结构上去(都是操作kernel中的内存)。 开始1S中一次阻塞等待epoll有任何连接或数据到达。
客户端连接
boss (23130)
1 2 3 4 5 6 7 8 accept(20, {sa_family=AF_INET, sin_port=htons(11144), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24 getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0 getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0 setsockopt(24, SOL_TCP, TCP_NODELAY, [1], 4) = 0 getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0 getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0 ##抛出 work线程 clone(child_stack=0x7fc3c4c98fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c4c999d0, tls=0x7fc3c4c99700, child_tidptr=0x7fc3c4c999d0) = 2301
worker (2301)
1 2 3 4 5 6 7 8 writev(24, [{"Welcome to iZbp14e1g9ztpshfrla9m"..., 37}, {"It is Sun Aug 23 15:44:14 CST 20"..., 41}], 2) = 78 epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0 epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=14180008216221450264}}) = 0 epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1 read(11, "\1", 128) = 1 ##开始无限loop epoll_wait(13, [], 8192, 1000) = 0 epoll_wait(13, [{EPOLLIN, {u32=24, u64=24}}], 8192, 1000) = 1
概括:
当BOSS轮训epoll_wait等到了连接后,首先accept得到该socket对应的fd。 连接建立后 BOSS立马抛出一个线程(clone函数)。 worker(即新建的线程)写入了一段数据(这里是业务逻辑)。 worker将该client对应的fd绑定到了13号epoll上。 worker继续轮训监听13号epoll。
客户端主动发送数据
worker(2301)
1 2 3 4 read(24, "i am daojian\r\n", 1024) = 14 write(24, "Did you say 'i am daojian'?\r\n", 29) = 29 ##继续无限loop epoll_wait(13, [], 8192, 1000) = 0
概括为:
wait到数据后,立即read到用户控件内存中(读取1024个字节到 用户控件某个buff中)。
写入数据(业务逻辑,不必太关注)。
继续轮训等待13号epoll。
客户端发送bye报文,服务器断开TCP连接
worker(2301)
1 2 3 4 5 6 7 8 9 10 read(24, "bye\r\n", 1024) = 5 write(24, "Have a good day!\r\n", 18) = 18 getsockopt(24, SOL_SOCKET, SO_LINGER, {onoff=0, linger=0}, [8]) = 0 dup2(25, 24) = 24 ##从epoll数据结构中(OS)中删除fd为24的socket epoll_ctl(13, EPOLL_CTL_DEL, 24, 0x7f702dd531e0) = -1 ENOENT ##关闭24 socket close(24) = 0 ##继续等待13 epoll数据 epoll_wait(13, [], 8192, 1000) = 0
断开客户端连接概括为:
从epoll中删除该客户端对应的fd(这里触发源头没找到,可能是boss)。
close关闭客户端24号fd。
继续轮训epoll。
五个客户端同时连接
boss线程(23130)
1 2 3 4 5 6 7 accept(20, {sa_family=AF_INET, sin_port=htons(1846), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24 clone(child_stack=0x7f702cc51fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cc529d0, tls=0x7f702cc52700, child_tidptr=0x7f702cc529d0) = 10035 accept(20, {sa_family=AF_INET, sin_port=htons(42067), sin_addr=inet_addr("42.120.74.122")}, [16]) = 26 clone(child_stack=0x7f702cb50fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cb519d0, tls=0x7f702cb51700, child_tidptr=0x7f702cb519d0) = 10067 ...
woker线程(10035,第一个连接)
1 2 3 4 epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0 epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=3226004877247250456}}) = 0 epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1 = 1 epoll_wait(13, [], 8192, 1000) = 0
worker线程(10067,第二个连接)
1 2 3 4 5 epoll_ctl(16, EPOLL_CTL_ADD, 26, {EPOLLIN, {u32=26, u64=26}}) = 0 epoll_ctl(16, EPOLL_CTL_MOD, 26, {EPOLLIN, {u32=26, u64=3221483685433835546}}) = 0 epoll_wait(16, [{EPOLLIN, {u32=14, u64=17042497300737294350}}], 8192, 1000) = 1 epoll_wait(16, [], 8192, 1000) = 0 epoll_wait(16, [], 8192, 1000) = 0
worker线程(10067,第二个连接)
1 2 epoll_ctl(19, EPOLL_CTL_ADD, 27, {EPOLLIN, {u32=27, u64=27}}) = 0 epoll_ctl(19, EPOLL_CTL_MOD, 27, {EPOLLIN, {u32=27, u64=3216966479350071323}}) = 0
worker线程(8055,第四个连接)
1 2 epoll_ctl(10, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=28}}) = 0 epoll_ctl(10, EPOLL_CTL_MOD, 28, {EPOLLIN, {u32=28, u64=3302604828697427996}}) = 0
worker线程(10035,第五个连接,不在clone线程,而是复用了第一个epoll对应的worker)
1 2 epoll_ctl(13, EPOLL_CTL_ADD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0 epoll_ctl(13, EPOLL_CTL_MOD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0
概括为:
epoll和boss、worker之间的关系:一共有4个worker对应着4个epoll对象,boss和每个worker都有对应自己的epoll。
boss根据epoll数量,平衡分配连接到每个worker对应的epoll中。