这篇文章简单介绍一些在 Java 网络编程中,经常遇到的一些概念,比如 Socket、Select、Selector 等,避免在具体使用它们的时候,混淆它们的作用。
Socket
介绍
Socket 就是 TCP/IP 协议的编程模型,通过一系列 API 来访问操作系统的网络协议栈,从而建立 TCP 连接,发送或接收数据。
传统意义上是类似于文件描述符的存在形式,是一种抽象的资源定位,即进程端口资源。
一个程序启动后,有 pid 文件标识锁定该运行的程序。
一个文件打开后,在进程中有一个 fd 标识着它。
一个进程占用了一个端口,并建立相关协议的通信,由 socket 标识着它。
例子
服务端
public class Server {
public static void main(String[] args) {
try {
// 创建ServerSocket并绑定端口
ServerSocket serverSocket = new ServerSocket(8888);
System.out.println("Server started, waiting for client...");
// 监听客户端连接
Socket clientSocket = serverSocket.accept();
System.out.println("Client connected: " + clientSocket.getInetAddress());
// 获取输入流和输出流
InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream();
// 读取客户端发送的数据
byte[] buffer = new byte[1024];
int length = inputStream.read(buffer);
String message = new String(buffer, 0, length);
System.out.println("Received message from client: " + message);
// 向客户端发送数据
String response = "Hello, client! TCP!";
outputStream.write(response.getBytes());
// 关闭连接
clientSocket.close();
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端
public class Client {
public static void main(String[] args) {
try {
// 创建Socket并连接服务器
Socket socket = new Socket("localhost", 8888);
System.out.println("Connected to server: " + socket.getInetAddress());
// 获取输入流和输出流
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
// 向服务器发送数据
String message = "Hello, server! TCP!";
outputStream.write(message.getBytes());
// 读取服务器返回的数据
byte[] buffer = new byte[1024];
int length = inputStream.read(buffer);
String response = new String(buffer, 0, length);
System.out.println("Received message from server: " + response);
// 关闭连接
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
tcpdump
// 三次握手
22:07:38.042234 IP localhost.63836 > localhost.ddi-tcp-1: Flags [S], seq 3851306320, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 496650245 ecr 0,sackOK,eol], length 0
22:07:38.042243 IP localhost.63836 > localhost.ddi-tcp-1: Flags [S], seq 3851306320, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 496650245 ecr 0,sackOK,eol], length 0
22:07:38.042353 IP localhost.ddi-tcp-1 > localhost.63836: Flags [S.], seq 3003959104, ack 3851306321, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 1229281786 ecr 496650245,sackOK,eol], length 0
22:07:38.042357 IP localhost.ddi-tcp-1 > localhost.63836: Flags [S.], seq 3003959104, ack 3851306321, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 1229281786 ecr 496650245,sackOK,eol], length 0
22:07:38.042374 IP localhost.63836 > localhost.ddi-tcp-1: Flags [.], ack 1, win 6379, options [nop,nop,TS val 496650245 ecr 1229281786], length 0
22:07:38.042376 IP localhost.63836 > localhost.ddi-tcp-1: Flags [.], ack 1, win 6379, options [nop,nop,TS val 496650245 ecr 1229281786], length 0
22:07:38.042389 IP localhost.ddi-tcp-1 > localhost.63836: Flags [.], ack 1, win 6379, options [nop,nop,TS val 1229281786 ecr 496650245], length 0
22:07:38.042391 IP localhost.ddi-tcp-1 > localhost.63836: Flags [.], ack 1, win 6379, options [nop,nop,TS val 1229281786 ecr 496650245], length 0
// 通信
22:07:38.043893 IP localhost.63836 > localhost.ddi-tcp-1: Flags [P.], seq 1:20, ack 1, win 6379, options [nop,nop,TS val 496650247 ecr 1229281786], length 19
22:07:38.043907 IP localhost.63836 > localhost.ddi-tcp-1: Flags [P.], seq 1:20, ack 1, win 6379, options [nop,nop,TS val 496650247 ecr 1229281786], length 19
22:07:38.043920 IP localhost.ddi-tcp-1 > localhost.63836: Flags [.], ack 20, win 6379, options [nop,nop,TS val 1229281788 ecr 496650247], length 0
22:07:38.043926 IP localhost.ddi-tcp-1 > localhost.63836: Flags [.], ack 20, win 6379, options [nop,nop,TS val 1229281788 ecr 496650247], length 0
22:07:38.044294 IP localhost.ddi-tcp-1 > localhost.63836: Flags [P.], seq 1:22, ack 20, win 6379, options [nop,nop,TS val 1229281789 ecr 496650247], length 21
22:07:38.044304 IP localhost.ddi-tcp-1 > localhost.63836: Flags [P.], seq 1:22, ack 20, win 6379, options [nop,nop,TS val 1229281789 ecr 496650247], length 21
22:07:38.044318 IP localhost.63836 > localhost.ddi-tcp-1: Flags [.], ack 22, win 6379, options [nop,nop,TS val 496650248 ecr 1229281789], length 0
22:07:38.044325 IP localhost.63836 > localhost.ddi-tcp-1: Flags [.], ack 22, win 6379, options [nop,nop,TS val 496650248 ecr 1229281789], length 0
// 四次挥手
22:07:38.044630 IP localhost.ddi-tcp-1 > localhost.63836: Flags [F.], seq 22, ack 20, win 6379, options [nop,nop,TS val 1229281789 ecr 496650248], length 0
22:07:38.044636 IP localhost.ddi-tcp-1 > localhost.63836: Flags [F.], seq 22, ack 20, win 6379, options [nop,nop,TS val 1229281789 ecr 496650248], length 0
22:07:38.044644 IP localhost.63836 > localhost.ddi-tcp-1: Flags [.], ack 23, win 6379, options [nop,nop,TS val 496650248 ecr 1229281789], length 0
22:07:38.044647 IP localhost.63836 > localhost.ddi-tcp-1: Flags [.], ack 23, win 6379, options [nop,nop,TS val 496650248 ecr 1229281789], length 0
22:07:38.044650 IP localhost.63836 > localhost.ddi-tcp-1: Flags [F.], seq 20, ack 23, win 6379, options [nop,nop,TS val 496650248 ecr 1229281789], length 0
22:07:38.044652 IP localhost.63836 > localhost.ddi-tcp-1: Flags [F.], seq 20, ack 23, win 6379, options [nop,nop,TS val 496650248 ecr 1229281789], length 0
22:07:38.044664 IP localhost.ddi-tcp-1 > localhost.63836: Flags [.], ack 21, win 6379, options [nop,nop,TS val 1229281789 ecr 496650248], length 0
22:07:38.044666 IP localhost.ddi-tcp-1 > localhost.63836: Flags [.], ack 21, win 6379, options [nop,nop,TS val 1229281789 ecr 496650248], length 0
解释
- 22:07:38.044666 时间带有精确到微妙
- localhost.39870 > localhost.9502 表示通信的流向,39870是客户端,9502是服务器端
- [S] 表示这是一个SYN请求
- [S.] 表示这是一个SYN+ACK确认包:
- [.] 表示这是一个ACT确认包, (client)SYN->(server)SYN->(client)ACT 就是3次握手过程
- [P] 表示这个是一个数据推送,可以是从服务器端向客户端推送,也可以从客户端向服务器端推
- [F] 表示这是一个FIN包,是关闭连接操作,client/server都有可能发起
- [R] 表示这是一个RST包,与F包作用相同,但RST表示连接关闭时,仍然有数据未被处理。可以理解为是强制切断连接
- win 4099 是指滑动窗口大小
- length 18指数据包的大小
NIO
例子
public class HttpServer {
public static void main(String[] args) throws IOException {
// 创建一个Selector
Selector selector = Selector.open();
// 创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false);
// 注册ServerSocketChannel到Selector,并监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待就绪的事件
selector.select();
// 获取所有已就绪的事件
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
// 处理已就绪的事件
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 处理连接事件
handleAccept(key);
} else if (key.isReadable()) {
// 处理读取事件
handleRead(key);
}
}
}
}
private static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(key.selector(), SelectionKey.OP_READ);
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = clientChannel.read(buffer);
if (bytesRead == -1) {
clientChannel.close();
return;
}
buffer.flip();
String request = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println("request = " + request);
// 处理HTTP请求
String response = "HTTP/1.1 200 OK\r\n"
+ "Content-Type: text/plain\r\n"
+ "\r\n"
+ "Hello, World!"
+ "\r\n"
+ request;
ByteBuffer responseBuffer = StandardCharsets.UTF_8.encode(response);
clientChannel.write(responseBuffer);
clientChannel.close();
}
}
从前面两个例子可以看出,对网络的读写其实是对文件(磁盘)的读写,或者优惠资盘的读写性能,使用内存缓冲区。
-
首先创建了一个Selector,并打开一个ServerSocketChannel,绑定到8888端口,并配置为非阻塞模式。然后我们将ServerSocketChannel注册到Selector上,监听连接事件(OP_ACCEPT)。
-
在主循环中,我们使用
selector.select()
阻塞等待就绪的事件。一旦有事件就绪,我们获取已就绪的事件集合,并遍历处理每个事件。 -
当有连接事件就绪时,我们调用
handleAccept()
方法处理连接事件,即接受客户端连接,并将客户端SocketChannel注册到Selector上,监听读取事件(OP_READ)。 -
当有读取事件就绪时,我们调用
handleRead()
方法处理读取事件。在该方法中,我们使用ByteBuffer读取客户端发送的HTTP请求,并根据请求生成HTTP响应。最后,将响应写入客户端,并关闭连接。
Netty
Netty 是一个基于 Java NIO 用于开发高性能的异步事件驱动网络应用程序框架。
public class ChatServer {
public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new ChatServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class ChatServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Received message from " + incoming.remoteAddress() + ": " + msg);
incoming.writeAndFlush("Server received: " + msg + "\n");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client " + incoming.remoteAddress() + " connected");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client " + incoming.remoteAddress() + " disconnected");
}
}
}
客户端
package com.wei.netty.gpt.chat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class ChatClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new ChatClientHandler());
}
});
Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String message = reader.readLine();
if (message == null || "exit".equalsIgnoreCase(message)) {
break;
}
channel.writeAndFlush(message + "\n");
}
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private static class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.err.println(msg);
}
}
}
Epoll
epoll是Linux内核提供的一种高性能的事件通知机制,用于实现I/O多路复用。它是Linux内核2.6版本引入的一种新的I/O事件通知接口,相比于旧的select和poll机制,具有更好的性能和扩展性。
epoll使用一个文件描述符来管理需要监视的I/O事件,通过epoll_ctl函数向内核注册需要监视的文件描述符和事件类型。当有文件描述符上的事件发生时,内核会将事件通知给应用程序。应用程序可以通过epoll_wait函数来等待事件的发生,一旦有事件发生,就可以进行相应的处理。
epoll的主要特点包括:
- 支持大量的并发连接:epoll使用红黑树和哈希表来管理文件描述符,可以高效地处理大量的并发连接。
- 高效的事件通知:epoll采用事件驱动的方式,只有当事件发生时才会通知应用程序,避免了轮询的开销。
- 边缘触发和水平触发:epoll提供了边缘触发(EPOLLET)和水平触发(EPOLLIN和EPOLLOUT)两种模式。边缘触发模式只通知应用程序一次,直到下次有新的事件发生。水平触发模式则会持续通知应用程序,直到事件被处理。
- 零拷贝:epoll可以使用零拷贝技术,避免了数据在内核空间和用户空间之间的复制,提高了数据传输的效率。
TCP epoll
- 127.0.0.1:1234 -> 127.0.0.1:8080,在监听端口号为 8080 的进程新建一个 Socket 套接字。
- 新建后返回一个文件描述符。
- 后续所有请求都是基于这个文件描述符所指向文件的IO读写。
- epoll_ctl函数监听这个文件描述符的IO读写。
- 当有文件描述符上的事件发生时。
- 内核通知应用程序。
- 应用程序通过 epoll_wait 读取事件,进行相应的处理。
UDP epoll
TCP 面向连接,有 epoll异步进行IO读写,那么UDP 有这种功能吗?也是有的,UDP 连接时也会创建一个 Socket,也会返回一个文件描述符,epoll 是根据这个文件描述符上的事件来响应,所以 UDP 也可以使用epoll的功能。
UDP 会复用前面创建的 Socket 文件描述符吗?会的,文件描述符由IP,端口,事件类型指定,当是复用连接请求时,并不会新建一个Socket套接字。
Selector
在Java NIO(New I/O)中,Selector是一个可用于多路复用的高效机制,用于管理多个通道(Channel)的I/O事件。它允许单个线程同时监视多个通道上的事件,当一个或多个通道上的事件发生时,该线程可以选择处理这些事件。
Selector通过使用底层操作系统提供的机制(如epoll、kqueue等)来实现高效的事件通知。它基于事件驱动的模型,当一个或多个通道上的事件发生时,Selector将通知应用程序,应用程序可以根据事件的类型进行相应的处理。
IO 多路复用
IO:IO 系统
多路:多个网络连接
复用:一个或有限个线程
Selector Java 应用的多路复用?
是的
通过Selector,Java应用程序可以将多个通道(Channel)注册到Selector上,在一个线程中同时监视多个通道的I/O事件,避免了为每个通道创建一个线程的开销。并指定所关注的事件类型,如读取、写入等。然后,应用程序可以通过调用Selector的select()方法来阻塞等待事件的发生。当有事件发生时,select()方法将返回,并返回就绪的通道集合。
select 内核的多路复用?
是
在内核中,select函数会遍历所有被监视的文件描述符,检查每个文件描述符的状态。如果某个文件描述符上有事件发生,即满足了所关注的事件类型,select函数会将该文件描述符标记为就绪状态,并将其添加到返回的结果集中。应用程序可以通过遍历结果集获取到就绪的文件描述符和具体的事件信息。
结论
很明显,select、poll、epoll 都是 IO 多路复用,也是是一种 IO 事件的通知方式(中断?),多路复用 IO多路复用常常与 Reactor、Proctor)一起介绍,这几个概念常常会混淆,epoll 与 应用层的 Selector 组合,通过各种线程模型处理,一起实现高效的网络模型。
ChatGPT:
Java的Selector也是通过底层的机制实现多路复用。具体来说,Java的Selector会利用底层操作系统提供的机制(如epoll、kqueue等)来实现高效的事件监视和通知。它会将需要监视的通道注册到内核的文件描述符集合中,并通过底层机制等待事件的发生。一旦有事件发生,内核会将事件通知给Java的Selector,然后 Java 的 Selector 再通过相应的方法将事件通知给应用程序。
Netty 中怎么体现内核对网络的读写?
Dubbo 从内核到应用的数据是怎么读写的?