Blocking IO & Non-blocking IO
对于阻塞式IO来说,每个线程只能处理一个channel,该线程和该channel绑定。也就是同步的,客户端在发送请求后,必须得在服务端有回应后才发送下一个请求。所以这个时候的所有请求将会在服务端得到同步。 对于非阻塞式IO来说,每个线程可以处理多个channel。也就是异步的,客户端在发送请求后,不必等待服务端的回应就可以发送下一个请求,这样对于所有的请求动作来说将会在服务端得到异步,这条请求的链路就象是一个请求队列,所有的动作在这里不会得到同步的。 传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。 Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。
TCP&UDP
本文主要是向读者介绍和演示两种IO在网络传输上的差异,所以我们先来了解一下TCP和UDP这两种常用的网络传输协议
TCP(传输控制协议)
1、建立连接,形成传输数据的通道 2、在连接中进行大数据量传输 3、通过三次握手完成连接,是可靠协议 4、必须建立连接,效率会稍低
特点:同步,面向连接,数据量大
UDP(数据报文协议)
1、将数据及源和目的封装成数据包,不需要建立连接 2、每个数据包的大小限制在64K内 3、不可靠协议,可能会出现数据包丢失的情况 4、不需要建立连接,速度快
特点:异步,面向无连接,大小限制在64K内
Socket
Socket,套接字,使应用程序能够读写与收发通讯协定(protocol)与资料的程序 1、Socket就是为网络服务提供的一种机制 2、通信的两端都有Socket 3、网络通信其实就是Socket之间的通信 4、数据在两个Socket之间通过IO传输
DatagramSocket&DatagramPacket
JDK文档介绍: 此类表示用于发送和接收数据报数据包的套接字。 数据报套接字是分组传送服务的发送或接收点。 在数据报套接字上发送或接收的每个数据包都被单独寻址和路由。 从一个机器发送到另一个机器的多个分组可以不同地路由,并且可以以任何顺序到达。
在可能的情况下,新构建的DatagramSocket启用了SO_BROADCAST套接字选项,以允许广播数据报的传输。 为了接收广播数据包,DatagramSocket应该绑定到通配符地址。 在一些实现中,当DatagramSocket绑定到更具体的地址时,也可以接收广播分组。
示例: DatagramSocket s = new DatagramSocket(null); s.bind(new InetSocketAddress(8888));其中相当于: DatagramSocket s = new DatagramSocket(8888);这两种情况都将创建一个DatagramSocket,可以在UDP端口8888上接收广播。
使用UDP+多线程的交互聊天系统: 服务端:
public class UdpService implements Runnable{
DatagramSocket service=null;
public UdpService(DatagramSocket service) {
this.service = service;
}
@Override
public void run() {
while (true) {
byte[] bytes = new byte[1024];
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
try {
service.receive(packet);
} catch (IOException e) {
e.printStackTrace();
}
String ip = packet.getAddress().getHostAddress();
int port = packet.getPort();
String text = new String(packet.getData(), 0, packet.getLength());
System.out.println(ip + ":" + port + ":" + text);
if (text.equals("886")) {
System.out.println(ip + "disconnect....");
}
}
}
}
客户端:
public class UdpClient implements Runnable{
DatagramSocket client=null;
public UdpClient(DatagramSocket client) {
this.client = client;
}
@Override
public void run() {
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
byte[] bytes=line.getBytes();
DatagramPacket datagramPacket=new DatagramPacket(bytes,bytes.length, InetAddress.getByName("192.168.1.102"),10001);
client.send(datagramPacket);
if(line.equals("886")){
break;
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
实现端:
public class UdpTest {
public static void main(String[] args) throws SocketException {
UdpService udpService = new UdpService(new DatagramSocket(10001));
UdpClient udpClient = new UdpClient(new DatagramSocket());
new Thread(udpService).start();
new Thread(udpClient).start();
}
}
运行结果:
Socket & ServerSocket
ServerSocket:这个类实现了服务器套接字。 服务器套接字等待通过网络进入的请求。 它根据该请求执行一些操作,然后可能将结果返回给请求者。
TCP实现客户端服务端交互 客户端:
public class TcpClient {
public static void main(String[] args) throws IOException {
Socket client=new Socket("192.168.1.102",10002);
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(System.in));
PrintWriter printWriter=new PrintWriter(client.getOutputStream(),true);
BufferedReader bufferedReader1=new BufferedReader(new InputStreamReader(client.getInputStream()));
String line=null;
while((line=bufferedReader.readLine())!=null){
if(line.equals("886")){
break;
}
printWriter.println(line);
String recever=bufferedReader1.readLine();
System.out.println(recever);
}
client.close();
}
}
服务端:
public class TcpService {
public static void main(String[] args) throws IOException {
ServerSocket service=new ServerSocket(10002);
Socket clientSocket=service.accept();
System.out.println(clientSocket.getInetAddress().getHostAddress()+"connect......");
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter printWriter=new PrintWriter(clientSocket.getOutputStream(),true);
String line=null;
while((line=bufferedReader.readLine())!=null){
System.out.println(line);
printWriter.println(line.toUpperCase());
}
service.close();
clientSocket.close();
}
}
运行结果:
NIO
Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。
Buffer(缓冲区)
缓冲区(Buffer):一个用于特定基本数据类型的容器。由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。 Java NIO 中的 Buffer 主要用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的。
Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型不同(boolean 除外) ,有以下 Buffer 常用子类: ByteBuffer CharBuffer ShortBuffer IntBuffer LongBuffer FloatBuffer DoubleBuffer 上述 Buffer 类 他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方法获取一个 Buffer对象: static XxxBuffer allocate(int capacity) : 创建一个容量为 capacity 的 XxxBuffer 对象
Buffer 中的重要概念: 容量 (capacity) :表示 Buffer 最大数据容量,缓冲区容量不能为负,并且创建后不能更改。 限制 (limit):第一个不应该读取或写入的数据的索引,即位于 limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量。 位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制 标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这个 position.
标记、位置、限制、容量遵守以下不变式: 0 <= mark <= position <= limit <= capacity
Buffer 的常用方法:
Buffer 所有子类提供了两个用于数据操作的方法:get() 与 put() 方法 获取 Buffer 中的数据 get() :读取单个字节 get(byte[] dst):批量读取多个字节到 dst 中 get(int index):读取指定索引位置的字节(不会移动 position) 放入数据到 Buffer 中 put(byte b):将给定单个字节写入缓冲区的当前位置 put(byte[] src):将 src 中的字节写入缓冲区的当前位置 put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
方法效果演示:
public class BufferTest {
@Test
public void test1(){
String str="abcde";
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
System.out.println("________not________");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byteBuffer.put(str.getBytes());
System.out.println("________put________");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byteBuffer.flip();
System.out.println("________flip________");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byte[] bytes=new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
System.out.println(new String(bytes,0,bytes.length));
System.out.println("_________get_______");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byteBuffer.rewind();
System.out.println("_________rewind_______");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byte[] by=new byte[byteBuffer.limit()];
byteBuffer.get(by,0,2);
System.out.println("_________get test rewind_______");
System.out.println(new String(by,0,bytes.length));
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
System.out.println("_______remaining_________");
if(byteBuffer.hasRemaining()){
System.out.println(byteBuffer.remaining());
}
byteBuffer.mark();
byteBuffer.get(by,2,2);
System.out.println("_________get test mark_______");
System.out.println(new String(by,0,bytes.length));
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byteBuffer.reset();
System.out.println("_________reset_______");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
byteBuffer.clear();
System.out.println("_______clear_________");
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
System.out.println(byteBuffer.position());
System.out.println("_______test clear_________");
System.out.println((char)byteBuffer.get(1));
}
}
运行结果:
直接和非直接缓冲区
字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
直接字节缓冲区可以通过调用此类的 allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
直接字节缓冲区还可以通过 FileChannel 的 map() 方法 将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer 。Java 平台的实现有助于通过 JNI 从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。
字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理。
直接缓冲区的优点是读取效率高,缺点是不稳定,内存的创建和销毁的效率低,对内存的控制低,数据什么时候写入磁盘完全由操作系统决定;
使用场景:长时间需要使用的数据;大数据;
Channel(通道)
通道(Channel):由 java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的“流”。只不过 Channel 本身不能直接访问数据,Channel 只能与Buffer 进行交互。
Java 为 Channel 接口提供的最主要实现类如下: FileChannel:用于读取、写入、映射和操作文件的通道。 DatagramChannel:通过 UDP 读写网络中的数据通道。 SocketChannel:通过 TCP 读写网络中的数据。 ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
获取通道的一种方式是对支持通道的对象调用getChannel() 方法。支持通道的类如下: FileInputStream FileOutputStream RandomAccessFile DatagramSocket Socket ServerSocket 获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道。或者通过通道的静态方法 open() 打开并返回指定通道。
Channel的常用方法:
1、使用stream.getChannel()获取通道+allocate()获取非直接缓冲区
@Test
public void test1() throws IOException {
FileInputStream fis=null ;
FileOutputStream fos=null;
FileChannel inputChannel=null;
FileChannel outputChannel=null;
try{
//建立通道
fis=new FileInputStream("F:/1.wmv");
fos=new FileOutputStream("F:/2.wmv");
inputChannel=fis.getChannel();
outputChannel=fos.getChannel();
//建立缓冲区
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
while(inputChannel.read(byteBuffer)!=-1){
byteBuffer.flip();
outputChannel.write(byteBuffer);
byteBuffer.clear();
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(fis!=null){
fis.close();
}
if(fos!=null){
fos.close();
}
if(inputChannel!=null){
inputChannel.close();
}
if(outputChannel!=null){
outputChannel.close();
}
}
}
2、使用FileChannel.open()获取通道+channel.map获取直接缓冲区
@Test
public void test2() throws IOException {
FileChannel inChannel = FileChannel.open(Paths.get("F:/1.wmv"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("F:/2.wmv"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
//内存映射文件
MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区进行数据的读写操作
byte[] dst = new byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);
inChannel.close();
outChannel.close();
}
3、使用Files.newByteChannel获取通道+channel.transferTo/transferFrom获取直接缓冲区
@Test
public void test3() throws IOException {
FileChannel inChannel = (FileChannel) Files.newByteChannel(Paths.get("F:/1.wmv"),StandardOpenOption.READ);
FileChannel outChannel = (FileChannel) Files.newByteChannel(Paths.get("F:/2.wmv"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
//inChannel.transferTo(0,inChannel.size(),outChannel);
outChannel.transferFrom(inChannel,inChannel.size(),0);
inChannel.close();
outChannel.close();
}
分散(Scatter)和聚集(Gather)
分散读取(Scattering Reads)是指从 Channel 中读取的数据“分散”到多个 Buffer 中。
注意:按照缓冲区的顺序,从 Channel 中读取的数据依次将 Buffer 填满。
聚集写入(Gathering Writes)是指将多个 Buffer 中的数据“聚集”到 Channel。
注意:按照缓冲区的顺序,写入 position 和 limit 之间的数据到 Channel 。
实例代码:
@Test
public void test4() throws IOException {
RandomAccessFile raf1=new RandomAccessFile("1.txt","rw");
FileChannel inChannel=raf1.getChannel();
ByteBuffer byteBuffer1=ByteBuffer.allocate(100);
ByteBuffer byteBuffer2=ByteBuffer.allocate(1024);
ByteBuffer[] byteBuffers={byteBuffer1,byteBuffer2};
inChannel.read(byteBuffers);
for (ByteBuffer b:byteBuffers
) {
b.flip();
}
System.out.println(new String(byteBuffers[0].array(),0,byteBuffers[0].limit()));
System.out.println(new String(byteBuffers[1].array(),0,byteBuffers[1].limit()));
RandomAccessFile raf2=new RandomAccessFile("2.txt","rw");
FileChannel outChannel=raf2.getChannel();
outChannel.write(byteBuffers);
}
Selector
选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector 可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心。
Selector常用方法:
SelectionKey
当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。
可以监听的事件类型(可使用 SelectionKey 的四个常量表示): 读 : SelectionKey.OP_READ 写 : SelectionKey.OP_WRITE 连接 : SelectionKey.OP_CONNECT 接收 : SelectionKey.OP_ACCEPT
若注册时不止监听一个事件,则可以使用“位或”操作符连接。
SelectionKey常用方法:
SocketChannel & ServerSocketChannel
Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。
Java NIO中的 ServerSocketChannel 是一个可以监听新进来的TCP连接的通道,就像标准IO中的ServerSocket一样。
SocketChannel常用方法:
ServerSocketChannel常用方法:
使用TCP+NIO的非阻塞聊天室 客户端:
public class NIOClient {
public static void main(String[] args) throws IOException {
SocketChannel service = SocketChannel.open(new InetSocketAddress("127.0.0.1", 10003));
service.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
Scanner in = new Scanner(System.in);
while (in.hasNext()) {
String str = in.nextLine();
byteBuffer.put((new Date().toString() + ":" + str).getBytes());
byteBuffer.flip();
service.write(byteBuffer);
byteBuffer.clear();
if(str.equals("886")){
break;
}
}
service.close();
}
}
服务端:
public class NIOService {
public static void main(String[] args) throws IOException {
ServerSocketChannel service=ServerSocketChannel.open();
ServerSocketChannel service2=ServerSocketChannel.open();
service.configureBlocking(false);
service.bind(new InetSocketAddress(10003));
service2.bind(new InetSocketAddress(10004));
Selector selector=Selector.open();
service.register(selector, SelectionKey.OP_ACCEPT);
service2.register(selector, SelectionKey.OP_ACCEPT);
while(selector.select()>0){
Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key= iterator.next();
if(key.isAcceptable()){
SocketChannel client=service.accept();
System.out.println(client.getLocalAddress()+"connect....");
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
SocketChannel client= (SocketChannel) key.channel();
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
int len=0;
while((len=client.read(byteBuffer))>0){
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(),0,len));
byteBuffer.clear();
}
}
iterator.remove();
}
}
}
}
运行结果:
DatagramChannel
Java NIO中的DatagramChannel是一个能收发UDP包的通道。
DatagramChannel的常用方法:
使用UDP+NIO实现非阻塞聊天室 客户端:
public class DatagramClient {
public static void main(String[] args) throws IOException {
DatagramChannel dc = DatagramChannel.open();
dc.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.nextLine();
buf.put((new Date().toString() + ":" + str).getBytes());
buf.flip();
dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
buf.clear();
}
dc.close();
}
}
服务端:
public class DatagramService {
public static void main(String[] args) throws IOException {
DatagramChannel dc = DatagramChannel.open();
dc.configureBlocking(false);
dc.bind(new InetSocketAddress(9898));
Selector selector = Selector.open();
dc.register(selector, SelectionKey.OP_READ);
while(selector.select() > 0){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
if(sk.isReadable()){
ByteBuffer buf = ByteBuffer.allocate(1024);
dc.receive(buf);
buf.flip();
System.out.println(new String(buf.array(), 0, buf.limit()));
buf.clear();
}
}
it.remove();
}
}
}
运行结果:
Pipe管道
Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
Pipe的重要属性及方法:
Pipe实例:
public class PipeTest {
public static void main(String[] args) throws IOException {
Pipe pipe=Pipe.open();
Pipe.SinkChannel sinkChannel=pipe.sink();
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
byteBuffer.put("Hello friend".getBytes());
byteBuffer.flip();
sinkChannel.write(byteBuffer);
byteBuffer.clear();
Pipe.SourceChannel sourceChannel=pipe.source();
sourceChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println("result:");
System.out.println(new String(byteBuffer.array(),0,byteBuffer.limit()));
}
}
运行结果: