閱讀目錄
Buffer是一個對象,包含一些要寫入和讀出的數據。
在NIO中,所有的數據都是用緩沖區處理的,讀取數據時,它是從通道(Channel)直接讀到緩沖區中,在寫入數據時,也是從緩沖區寫入到通道。
緩沖區實質上是一個數組,通常是一個字節數組(ByteBuffer),也可以是其它類型的數組,此外緩沖區還提供了對數據的結構化訪問以及維護讀寫位置等信息。
Buffer類的繼承關系如下圖所示:
Channel是一個通道,網絡數據通過Channel讀取和寫入。通道和流的不同之處在於通道是雙向的(通道可以用於讀、寫後者二者同時進行),流只是在一個方向上移動。
Channel大體上可以分為兩類:用於網絡讀寫的SelectableChannel(ServerSocketChannel和SocketChannel就是其子類)、用於文件操作的FileChannel。
下面的例子給出通過FileChannel來向文件中寫入數據、從文件中讀取數據,將文件數據拷貝到另一個文件中:
public class NioTest
{
public static void main(String[] args) throws IOException
{
copyFile();
}
//拷貝文件
private static void copyFile()
{
FileInputStream in=null;
FileOutputStream out=null;
try
{
in=new FileInputStream("src/main/java/data/in-data.txt");
out=new FileOutputStream("src/main/java/data/out-data.txt");
FileChannel inChannel=in.getChannel();
FileChannel outChannel=out.getChannel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
int bytesRead = inChannel.read(buffer);
while (bytesRead!=-1)
{
buffer.flip();
outChannel.write(buffer);
buffer.clear();
bytesRead = inChannel.read(buffer);
}
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//寫文件
private static void writeFileNio()
{
try
{
RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw");
FileChannel fc=fout.getChannel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put("hi123".getBytes());
buffer.flip();
try
{
fc.write(buffer);
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//讀文件
private static void readFileNio()
{
FileInputStream fileInputStream;
try
{
fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt");
FileChannel fileChannel=fileInputStream.getChannel();//從 FileInputStream 獲取通道
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//創建緩沖區
int bytesRead=fileChannel.read(byteBuffer);//將數據讀到緩沖區
while(bytesRead!=-1)
{
/*limit=position
* position=0;
*/
byteBuffer.flip();
//hasRemaining():告知在當前位置和限制之間是否有元素
while (byteBuffer.hasRemaining())
{
System.out.print((char) byteBuffer.get());
}
/*
* 清空緩沖區
* position=0;
* limit=capacity;
*/
byteBuffer.clear();
bytesRead = fileChannel.read(byteBuffer);
}
} catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
多路復用器提供選擇已經就緒的任務的能力。Selector會不斷的輪詢注冊在其上的Channel,如果某個Channel上面發送讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。
一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll代替了傳統的select實現,所以它沒有最大連接句柄1024/2048的限制,意味著只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。其模型如下圖所示:
用單線程處理一個Selector。要使用Selector,得向Selector注冊Channel,然後調用它的select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒。一旦這個方法返回,線程就可以處理這些事件,事件的例子有如新連接進來,數據接收等。
注:
1、什麼select模型?
select是事件觸發機制,當等待的事件發生就觸發進行處理,多用於Linux實現的服務器對客戶端的處理。
可以阻塞地同時探測一組支持非阻塞的IO設備,是否有事件發生(如可讀、可寫,有高優先級錯誤輸出等),直至某一個設備觸發了事件或者超過了指定的等待時間。也就是它們的職責不是做IO,而是幫助調用者尋找當前就緒的設備。
2、什麼是epoll模型?
epoll的設計思路,是把select/poll單個的操作拆分為1個epoll_create+多個epoll_ctrl+一個wait。此外,內核針對epoll操作添加了一個文件系統”eventpollfs”,每一個或者多個要監視的文件描述符都有一個對應的eventpollfs文件系統的inode節點,主要信息保存在eventpoll結構體中。而被監視的文件的重要信息則保存在epitem結構體中。所以他們是一對多的關系。
功能說明:開啟服務器端,對每一個接入的客戶端都向其發送hello字符串。
使用NIO進行服務器端開發主要有以下幾個步驟:
1、創建ServerSocketChannel,配置它為非阻塞模式
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
2、綁定監聽,配置TCP參數,如backlog大小
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
3、創建一個獨立的I/O線程,用於輪詢多路復用器Selector
4、創建Selector,將之前創建的ServerSocketChannel注冊到Selector上,監聽SelectionKey.ACCEPT
selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
5、啟動I/O線程,在循環體內執行Selector.select()方法,輪詢就緒的Channel
while(true)
{
try
{
//select()阻塞到至少有一個通道在你注冊的事件上就緒了
//如果沒有准備好的channel,就在這一直阻塞
//select(long timeout)和select()一樣,除了最長會阻塞timeout毫秒(參數)。
selector.select();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
break;
}
}
6、當輪詢到了處於就緒狀態的Channel時,需對其進行判斷,如果是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端
//返回已經就緒的SelectionKey,然後迭代執行
Set<SelectionKey> readKeys=selector.selectedKeys();
for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
{
SelectionKey key=it.next();
it.remove();
try
{
if(key.isAcceptable())
{
ServerSocketChannel server=(ServerSocketChannel) key.channel();
SocketChannel client=server.accept();
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_WRITE);
}
else if(key.isWritable())
{
SocketChannel client=(SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(20);
String str="hello";
buffer=ByteBuffer.wrap(str.getBytes());
client.write(buffer);
key.cancel();
}
}catch(IOException e)
{
e.printStackTrace();
key.cancel();
try
{
key.channel().close();
} catch (IOException e1)
{
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
7、設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置其他的一些TCP參數
if(key.isAcceptable())
{
ServerSocketChannel server=(ServerSocketChannel) key.channel();
SocketChannel client=server.accept();
client.configureBlocking(false);
...
}
8、將SocketChannel注冊到Selector,監聽OP_WRITE
client.register(selector,SelectionKey.OP_WRITE);
9、如果輪詢的Channel為OP_WRITE,則說明要向SockChannel中寫入數據,則構造ByteBuffer對象,寫入數據包
else if(key.isWritable())
{
SocketChannel client=(SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(20);
String str="hello";
buffer=ByteBuffer.wrap(str.getBytes());
client.write(buffer);
key.cancel();
}
完整代碼如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ServerSocketChannelDemo
{
public static void main(String[] args)
{
ServerSocketChannel serverSocketChannel;
Selector selector=null;
try
{
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
selector=Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
while(true)
{
try
{
//select()阻塞到至少有一個通道在你注冊的事件上就緒了
//如果沒有准備好的channel,就在這一直阻塞
//select(long timeout)和select()一樣,除了最長會阻塞timeout毫秒(參數)。
selector.select();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
break;
}
//返回已經就緒的SelectionKey,然後迭代執行
Set<SelectionKey> readKeys=selector.selectedKeys();
for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
{
SelectionKey key=it.next();
it.remove();
try
{
if(key.isAcceptable())
{
ServerSocketChannel server=(ServerSocketChannel) key.channel();
SocketChannel client=server.accept();
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_WRITE);
}
else if(key.isWritable())
{
SocketChannel client=(SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(20);
String str="hello";
buffer=ByteBuffer.wrap(str.getBytes());
client.write(buffer);
key.cancel();
}
}catch(IOException e)
{
e.printStackTrace();
key.cancel();
try
{
key.channel().close();
} catch (IOException e1)
{
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
}
}
我們用telnet localhost 8080模擬出多個客戶端:
程序運行結果如下:
1、Netty權威指南(李林峰)【Netty權威指南 PDF完整版帶目錄書簽+源碼 下載地址 http://www.linuxidc.com/Linux/2016-07/133575.htm 】
運用Spring注解實現Netty服務器端UDP應用程序 http://www.linuxidc.com/Linux/2013-09/89780.htm
Netty源碼學習筆記 http://www.linuxidc.com/Linux/2013-09/89778.htm
Netty使用實例 http://www.linuxidc.com/Linux/2013-09/89779.htm
Java NIO框架--Netty4的簡單示例 http://www.linuxidc.com/Linux/2015-01/111335.htm
Netty 的詳細介紹:請點這裡
Netty 的下載地址:請點這裡