Java NIO(New IO)是從Java 1.4版本開始引入得一個(gè)新得IO API,可以替代標(biāo)準(zhǔn)得Java IO API。NIO與原來(lái)得IO有同樣得作用和目得,但是使用得方式完全不同,NIO支持面向緩沖區(qū)得、基于通道得IO操作。NIO將以更加高效得方式進(jìn)行文件得讀寫操作。
Buffer 就像一個(gè)數(shù)組,可以保存多個(gè)相同類型得數(shù)據(jù)。根據(jù)數(shù)據(jù)類型不同(boolean 除外) ,有以下Buffer 常用子類
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
各種類型得緩沖區(qū)中,都有一個(gè)對(duì)應(yīng)類型得數(shù)組,如
ByteBuffer
final byte[] hb; // Non-null only for heap buffersCopy
IntBuffer
final int[] hb; // Non-null only for heap buffers
獲取緩沖區(qū)
通過allocate方法可以獲取一個(gè)對(duì)應(yīng)緩沖區(qū)得對(duì)象,它是緩沖區(qū)類得一個(gè)靜態(tài)方法
例
// 獲取一個(gè)容量大小為1024字節(jié)得字節(jié)緩沖區(qū)ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
核心屬性緩沖區(qū)得父類Buffer中有幾個(gè)核心屬性,如下
// Invariants: mark <= position <= limit <= capacityprivate int mark = -1;private int position = 0;private int limit;private int capacity;Copy
capacity:緩沖區(qū)得容量。通過構(gòu)造函數(shù)賦予,一旦設(shè)置,無(wú)法更改
limit:緩沖區(qū)得界限。位于limit 后得數(shù)據(jù)不可讀寫。緩沖區(qū)得限制不能為負(fù),并且不能大于其容量
position:下一個(gè)讀寫位置得索引(類似PC)。緩沖區(qū)得位置不能為負(fù),并且不能大于limit
mark:記錄當(dāng)前position得值。position被改變后,可以通過調(diào)用reset() 方法恢復(fù)到mark得位置。
以上四個(gè)屬性必須滿足以下要求
mark <= position <= limit <= capacity
核心方法put()方法
put()方法可以將一個(gè)數(shù)據(jù)放入到緩沖區(qū)中。
進(jìn)行該操作后,postition得值會(huì)+1,指向下一個(gè)可以放入得位置。capacity = limit ,為緩沖區(qū)容量得值。
flip()方法
get()方法
rewind()方法
clean()方法
使用展示
import java.nio.ByteBuffer;public class demo1 { public static void main(String[] args) { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); System.out.println("放入前參數(shù)"); System.out.println("position " + byteBuffer.position()); System.out.println("limit " + byteBuffer.limit()); System.out.println("capacity " + byteBuffer.capacity()); System.out.println(); System.out.println("------put()------"); System.out.println("放入3個(gè)數(shù)據(jù)"); byte bt = 1; byteBuffer.put(bt); byteBuffer.put(bt); byteBuffer.put(bt); System.out.println("放入后參數(shù)"); System.out.println("position " + byteBuffer.position()); System.out.println("limit " + byteBuffer.limit()); System.out.println("capacity " + byteBuffer.capacity()); System.out.println(); System.out.println("------flip()-get()------"); System.out.println("讀取一個(gè)數(shù)據(jù)"); // 切換模式 byteBuffer.flip(); byteBuffer.get(); System.out.println("讀取后參數(shù)"); System.out.println("position " + byteBuffer.position()); System.out.println("limit " + byteBuffer.limit()); System.out.println("capacity " + byteBuffer.capacity()); System.out.println(); System.out.println("------rewind()------"); byteBuffer.rewind(); System.out.println("恢復(fù)后參數(shù)"); System.out.println("position " + byteBuffer.position()); System.out.println("limit " + byteBuffer.limit()); System.out.println("capacity " + byteBuffer.capacity()); System.out.println(); System.out.println("------clear()------"); // 清空緩沖區(qū),這里只是恢復(fù)了各個(gè)屬性得值,但是緩沖區(qū)里得數(shù)據(jù)依然存在 // 但是下次寫入得時(shí)候會(huì)覆蓋緩沖區(qū)中之前得數(shù)據(jù) byteBuffer.clear(); System.out.println("清空后參數(shù)"); System.out.println("position " + byteBuffer.position()); System.out.println("limit " + byteBuffer.limit()); System.out.println("capacity " + byteBuffer.capacity()); System.out.println(); System.out.println("清空后獲得數(shù)據(jù)"); System.out.println(byteBuffer.get()); }}
放入前參數(shù)position 0limit 1024capacity 1024------put()------放入3個(gè)數(shù)據(jù)放入后參數(shù)position 3limit 1024capacity 1024------flip()-get()------讀取一個(gè)數(shù)據(jù)讀取后參數(shù)position 1limit 3capacity 1024------rewind()------恢復(fù)后參數(shù)position 0limit 3capacity 1024------clear()------清空后參數(shù)position 0limit 1024capacity 1024清空后獲得數(shù)據(jù)1Process finished with exit code 0
非直接緩沖區(qū)和直接緩沖區(qū)非直接緩沖區(qū)通過allocate()方法獲取得緩沖區(qū)都是非直接緩沖區(qū)。這些緩沖區(qū)是建立在JVM堆內(nèi)存之中得。
public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); // 在堆內(nèi)存中開辟空間 return new HeapByteBuffer(capacity, capacity);}HeapByteBuffer(int cap, int lim) { // package-private // new byte[cap] 創(chuàng)建數(shù)組,在堆內(nèi)存中開辟空間 super(-1, 0, lim, cap, new byte[cap], 0); }
通過非直接緩沖區(qū),想要將數(shù)據(jù)寫入到物理磁盤中,或者是從物理磁盤讀取數(shù)據(jù)。都需要經(jīng)過JVM和操作系統(tǒng),數(shù)據(jù)在兩個(gè)地址空間中傳輸時(shí),會(huì)copy一份保存在對(duì)方得空間中。所以費(fèi)直接緩沖區(qū)得讀取效率較低.。
直接緩沖區(qū)只有ByteBuffer可以獲得直接緩沖區(qū),通過allocateDirect()獲取得緩沖區(qū)為直接緩沖區(qū),這些緩沖區(qū)是建立在物理內(nèi)存之中得。
public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity);}DirectByteBuffer(int cap) { // package-private ... // 申請(qǐng)物理內(nèi)存 boolean pa = VM.isDirectMemoryPageAligned(); ...}
直接緩沖區(qū)通過在操作系統(tǒng)和JVM之間創(chuàng)建物理內(nèi)存映射文件加快緩沖區(qū)數(shù)據(jù)讀/寫入物理磁盤得速度。放到物理內(nèi)存映射文件中得數(shù)據(jù)就不歸應(yīng)用程序控制了,操作系統(tǒng)會(huì)自動(dòng)將物理內(nèi)存映射文件中得數(shù)據(jù)寫入到物理內(nèi)存中。
Channel由java.nio.channels 包定義得。Channel 表示IO 源與目標(biāo)打開得連接。Channel 類似于傳統(tǒng)得“流”。只不過Channel 本身不能直接訪問數(shù)據(jù),Channel 只能與Buffer 進(jìn)行交互 。
應(yīng)用程序進(jìn)行讀寫操作調(diào)用函數(shù)時(shí),底層調(diào)用得操作系統(tǒng)提供給用戶得讀寫API,調(diào)用這些API時(shí)會(huì)生成對(duì)應(yīng)得指令,CPU則會(huì)執(zhí)行這些指令。在計(jì)算機(jī)剛出現(xiàn)得那段時(shí)間,所有讀寫請(qǐng)求得指令都有CPU去執(zhí)行,過多得讀寫請(qǐng)求會(huì)導(dǎo)致CPU無(wú)法去執(zhí)行其他命令,從而CPU得利用率降低。
后來(lái),DMA(Direct Memory Access,直接存儲(chǔ)器訪問)出現(xiàn)了。當(dāng)IO請(qǐng)求傳到計(jì)算機(jī)底層時(shí),DMA會(huì)向CPU請(qǐng)求,讓DMA去處理這些IO操作,從而可以讓CPU去執(zhí)行其他指令。DMA處理IO操作時(shí),會(huì)請(qǐng)求獲取總線得使用權(quán)。當(dāng)IO請(qǐng)求過多時(shí),會(huì)導(dǎo)致大量總線用于處理IO請(qǐng)求,從而降低效率 。
于是便有了Channel(通道),Channel相當(dāng)于一個(gè)專門用于IO操作得獨(dú)立處理器,它具有獨(dú)立處理IO請(qǐng)求得能力,當(dāng)有IO請(qǐng)求時(shí),它會(huì)自行處理這些IO請(qǐng)求 。
獲得通道得方法
對(duì)象調(diào)用getChannel() 方法
獲取通道得一種方式是對(duì)支持通道得對(duì)象調(diào)用getChannel() 方法。支持通道得類如下:
例子:
import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.DatagramSocket;import java.ServerSocket;import java.Socket;import java.nio.channels.DatagramChannel;import java.nio.channels.FileChannel;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.file.Paths;public class demo2 { public static void main(String[] args) throws IOException { // 本地通道 FileInputStream fileInputStream = new FileInputStream("zwt"); FileChannel channel1 = fileInputStream.getChannel(); FileOutputStream fileOutputStream = new FileOutputStream("zwt"); FileChannel channel2 = fileOutputStream.getChannel(); // 網(wǎng)絡(luò)通道 Socket socket = new Socket(); SocketChannel channel3 = socket.getChannel(); ServerSocket serverSocket = new ServerSocket(); ServerSocketChannel channel4 = serverSocket.getChannel(); DatagramSocket datagramSocket = new DatagramSocket(); DatagramChannel channel5 = datagramSocket.getChannel(); // 蕞后要關(guān)閉通道 FileChannel open = FileChannel.open(Paths.get("zwt")); SocketChannel open1 = SocketChannel.open(); }}
getChannel()+非直接緩沖區(qū)
通過非直接緩沖區(qū)讀寫數(shù)據(jù),需要通過通道來(lái)傳輸緩沖區(qū)里得數(shù)據(jù)
import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class demo4 { public static void main(String[] args) { FileInputStream is = null; FileOutputStream os = null; // 獲得通道 FileChannel inChannel = null; FileChannel outChannel = null; // 利用 try-catch-finally 保證關(guān)閉 try { is = new FileInputStream(""); os = new FileOutputStream(""); // 獲得通道 inChannel = is.getChannel(); outChannel = os.getChannel(); // 獲得緩沖區(qū),用于在通道中傳輸數(shù)據(jù) ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 循環(huán)將字節(jié)數(shù)據(jù)放入到buffer中,然后寫入磁盤中 while (inChannel.read(byteBuffer) != -1) { // 切換模式 byteBuffer.flip(); outChannel.write(byteBuffer); byteBuffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if (inChannel != null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if (outChannel != null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if (is != null) { try { is.close(); } catch (IOException e) { e.printStackTrace(); } } if (os != null) { try { os.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
open()+直接緩沖區(qū)
使用直接緩沖區(qū)時(shí),無(wú)需通過通道來(lái)傳輸數(shù)據(jù),直接將數(shù)據(jù)放在緩沖區(qū)內(nèi)即可
import java.io.IOException;import java.nio.MappedByteBuffer;import java.nio.channels.FileChannel;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;public class demo5 { public static void main(String[] args) throws IOException { // 通過open()方法來(lái)獲得通道 FileChannel inChannel = FileChannel.open(Paths.get(""), StandardOpenOption.READ); // outChannel需要為 READ WRITE CREATE模式 // READ WRITE是因?yàn)楹竺娅@取直接緩沖區(qū)時(shí)模式為READ_WRITE模式 // CREATE是因?yàn)橐獎(jiǎng)?chuàng)建新得文件 FileChannel outChannel = FileChannel.open(Paths.get(""), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 獲得直接緩沖區(qū) MappedByteBuffer inMapBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMapBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size()); // 字節(jié)數(shù)組 byte[] bytes = new byte[inMapBuf.limit()]; // 因?yàn)槭侵苯泳彌_區(qū),可以直接將數(shù)據(jù)放入到內(nèi)存映射文件,無(wú)需通過通道傳輸 inMapBuf.get(bytes); outMapBuf.put(bytes); // 關(guān)閉緩沖區(qū),這里沒有用try-catch-finally inChannel.close(); outChannel.close(); }}
通道間直接傳輸
public static void channelToChannel() throws IOException { long start = System.currentTimeMillis(); // 通過open()方法來(lái)獲得通道 FileChannel inChannel = FileChannel.open(Paths.get(""), StandardOpenOption.READ); // outChannel需要為 READ WRITE CREATE模式 // READ WRITE是因?yàn)楹竺娅@取直接緩沖區(qū)時(shí)模式為READ_WRITE模式 // CREATE是因?yàn)橐獎(jiǎng)?chuàng)建新得文件 FileChannel outChannel = FileChannel.open(Paths.get(""), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 通道間直接傳輸 inChannel.transferTo(0, inChannel.size(), outChannel); // 對(duì)應(yīng)得還有transferFrom // outChannel.transferFrom(inChannel, 0, inChannel.size()); inChannel.close(); outChannel.close();}
直接緩沖區(qū)VS非直接緩沖區(qū)
// getChannel() + 非直接緩沖區(qū)耗時(shí)708// open() + 直接緩沖區(qū)耗時(shí)115// channel transferTo channel耗時(shí)47直接緩沖區(qū)得讀寫速度雖然很快,但是會(huì)占用很多很多內(nèi)存空間。如果文件過大,會(huì)使得計(jì)算機(jī)運(yùn)行速度變慢
分散和聚集分散讀取
分散讀取(Scattering Reads)是指從Channel 中讀取得數(shù)據(jù)“分散”到多個(gè)Buffer 中。
注意:按照緩沖區(qū)得順序,從Channel 中讀取得數(shù)據(jù)依次將 Buffer 填滿。
聚集寫入
聚集寫入(Gathering Writes)是指將多個(gè)Buffer 中得數(shù)據(jù)“聚集”到Channel。
按照緩沖區(qū)得順序,寫入position 和limit 之間得數(shù)據(jù)到Channel。
import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class demo6 { public static void main(String[] args) throws IOException { FileInputStream is = new FileInputStream(""); FileOutputStream os = new FileOutputStream(""); FileChannel inChannel = is.getChannel(); FileChannel outChannel = os.getChannel(); // 獲得多個(gè)緩沖區(qū),并且放入到緩沖區(qū)數(shù)組中 ByteBuffer byteBuffer1 = ByteBuffer.allocate(50); ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024); ByteBuffer[] byteBuffers = {byteBuffer1, byteBuffer2}; // 分散讀取 inChannel.read(byteBuffers); byteBuffer1.flip(); byteBuffer2.flip(); // 聚集寫入 outChannel.write(byteBuffers); }}
非阻塞式網(wǎng)絡(luò)通信概念
底層原理可見:操作系統(tǒng)-文件IO
舉個(gè)你去飯?zhí)贸燥埖美?,你好??戶程序,飯?zhí)煤?操作系統(tǒng)。阻塞 I/O 好?,你去飯?zhí)贸燥垼秋執(zhí)玫貌诉€沒做好,然后你就?直在那?等啊等,等了好??段時(shí)間終于等到飯?zhí)冒⒁贪巡硕肆顺鰜?lái)(數(shù)據(jù)準(zhǔn)備得過程),但是你還得繼續(xù)等阿姨把菜(內(nèi)核空間)打到你得飯盒?(?戶空間),經(jīng)歷完這兩個(gè)過程,你才可以離開。?阻塞 I/O 好?,你去了飯?zhí)茫瑔柊⒁滩俗龊昧藳]有,阿姨告訴你沒,你就離開了,過??分鐘,你?來(lái),飯?zhí)脝柊⒁蹋⒁陶f做好了,于是阿姨幫你把菜打到你得飯盒?,這個(gè)過程你是得等待得。基于?阻塞得 I/O 多路復(fù)?好?,你去飯?zhí)贸燥垼l(fā)現(xiàn)有?排窗?,飯?zhí)冒⒁谈嬖V你這些窗?都還沒做好菜,等做好了再通知你,于是等啊等( select 調(diào)?中),過了?會(huì)阿姨通知你菜做好了,但是不知道哪個(gè)窗?得菜做好了,你??看吧。于是你只能?個(gè)?個(gè)窗?去確認(rèn),后?發(fā)現(xiàn) 5 號(hào)窗?菜做好了,于是你讓 5 號(hào)窗?得阿姨幫你打菜到飯盒?,這個(gè)打菜得過程你是要等待得,雖然時(shí)間不?。打完菜后,你?然就可以離開了。異步 I/O 好?,你讓飯?zhí)冒⒁虒⒉俗龊貌巡舜虻斤埡?后,把飯盒送到你?前,整個(gè)過程你都不需要任何等待。
阻塞式網(wǎng)絡(luò)通信package NIOAndBIO;import java.io.IOException;import java.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;public class BIO { public static void main(String[] args) throws IOException { Thread thread1 = new Thread(() -> { try { server(); } catch (IOException e) { e.printStackTrace(); } }); Thread thread2 = new Thread(() -> { try { client(); } catch (IOException e) { e.printStackTrace(); } }); thread1.start(); thread2.start(); } public static void client() throws IOException { // 創(chuàng)建客戶端通道 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 2022)); // 讀取信息 D:\\bizhi\\bizhi202008\\wallhaven-kwp2qq.jpg FileChannel fileChannel = FileChannel.open(Paths.get("D:\\\\bizhi\\\\bizhi202008\\\\wallhaven-kwp2qq.jpg"), StandardOpenOption.READ); // 創(chuàng)建緩沖區(qū) ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 寫入數(shù)據(jù) while (fileChannel.read(byteBuffer) != -1) { byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); } fileChannel.close(); socketChannel.close(); } public static void server() throws IOException { // 創(chuàng)建服務(wù)端通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); FileChannel fileChannel = FileChannel.open(Paths.get("D:\\\\bizhi\\\\bizhi202008\\\\wallhaven-kwp2qq.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 綁定鏈接 serverSocketChannel.bind(new InetSocketAddress(2022)); // 獲取客戶端得通道 SocketChannel socketChannel = serverSocketChannel.accept(); // 創(chuàng)建緩沖區(qū) ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (socketChannel.read(byteBuffer) != -1) { byteBuffer.flip(); fileChannel.write(byteBuffer); byteBuffer.clear(); } socketChannel.close(); fileChannel.close(); serverSocketChannel.close(); }}
非阻塞式網(wǎng)絡(luò)通信
package NIOAndBIO;import java.io.IOException;import java.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Scanner;public class NIO { public static void main(String[] args) { Thread thread1 = new Thread(()->{ try { server(); } catch (IOException e) { e.printStackTrace(); } }); Thread thread2 = new Thread(()->{ try { client(); } catch (IOException e) { e.printStackTrace(); } }); thread1.start(); thread2.start(); } public static void client() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 2020)); // 設(shè)置為非阻塞模式 socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String str = scanner.next(); byteBuffer.put(str.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); } byteBuffer.clear(); socketChannel.close(); } public static void server() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(2020)); // 獲得選擇器 Selector selector = Selector.open(); // 將通道注冊(cè)到選擇器中,設(shè)定為接收操作 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 輪詢接受 while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); // 獲得事件得key while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 從選擇器中獲取通道 SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(10); while (socketChannel.read(byteBuffer) != -1) { int len = byteBuffer.limit(); byteBuffer.flip(); System.out.println(new String(byteBuffer.array(), 0, len)); byteBuffer.clear(); } socketChannel.close(); } iterator.remove(); } } serverSocketChannel.close(); }}
選擇器選擇器(Selector)是SelectableChannle 對(duì)象得多路復(fù)用器,Selector 可以同時(shí)監(jiān)控多個(gè)SelectableChannel 得IO 狀況,也就是說,利用Selector 可使一個(gè)單獨(dú)得線程管理多個(gè)Channel。Selector 是非阻塞IO 得核心 。
選擇器得創(chuàng)建
// 創(chuàng)建一個(gè)選擇器Selector selector = Selector.open();
綁定選擇器
通過調(diào)用通道得register方法可以綁定選擇器,register方法有兩個(gè)參數(shù)
Selector:即綁定哪個(gè)選擇器
ops:監(jiān)聽事件類型。ops有4個(gè)值可以選擇,為SelectionKey得靜態(tài)屬性
// 讓選擇器監(jiān)聽一種狀態(tài)myChannel.register(selector, SelectionKey.OP_READ);// 讓選擇器監(jiān)聽多種狀態(tài)myChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_ACCEPT);
SelectionKey
表示SelectableChannel 和Selector 之間得注冊(cè)關(guān)系。每次向選擇器注冊(cè)通道時(shí)就會(huì)選擇一個(gè)事件(選擇鍵)。選擇鍵包含兩個(gè)表示為整數(shù)值得操作集。操作集得每一位都表示該鍵得通道所支持得一類可選擇操作。