AIO用於文件處理還是比較快樂的,但用AIO來寫網絡消息處理服務器端與客戶端是比較麻煩的事情,當然這只是我個人意見,主要是有幾點原因:
一是AIO需要操作系統支持,還好Windows與Linux(模擬)都支持;
二是AIO同時使用遞歸調用和異步調用容易把程序員搞暈,代碼容易出錯;
三是CompletionHandler會使用單獨的線程跑,容易出現多線程問題,頻繁線程上下文切換比較消耗資源;
四是異步寫要創建隊列來緩存需要寫入的數據,否則肯定會遇到WritePendingException。
相對來說,NIO比較清楚直白,容易控制。
另外,筆者使用多線程模擬多個客戶場景失敗,代碼在run方法中調用AsynchronousSocketChannel.connect()沒返回,沒連接上服務器,不知道為何,請大俠指教,最後只好使用多個進程模擬多個客戶端,寫個類似下面代碼的bat文件,同時運行多個。
java -classpath .\ com.stevex.app.aio.Client 1
java -classpath .\ com.stevex.app.aio.Client 1
pause
服務器代碼:
package com.stevex.app.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//import java.nio.channels.WritePendingException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;
public class XiaoNa {
private final AsynchronousServerSocketChannel server;
//寫隊列,因為當前一個異步寫調用還沒完成之前,調用異步寫會拋WritePendingException
//所以需要一個寫隊列來緩存要寫入的數據,這是AIO比較坑的地方
private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
private boolean writing = false;
public static void main(String[] args) throws IOException{
XiaoNa xiaona = new XiaoNa();
xiaona.listen();
}
public XiaoNa() throws IOException{
//設置線程數為CPU核數
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
server = AsynchronousServerSocketChannel.open(channelGroup);
//重用端口
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//綁定端口並設置連接請求隊列長度
server.bind(new InetSocketAddress(8383), 80);
}
public void listen() {
System.out.println(Thread.currentThread().getName() + ": run in listen method" );
//開始接受第一個連接請求
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
@Override
public void completed(AsynchronousSocketChannel channel,
Object attachment) {
System.out.println(Thread.currentThread().getName() + ": run in accept completed method" );
//先安排處理下一個連接請求,異步非阻塞調用,所以不用擔心掛住了
//這裡傳入this是個地雷,小心多線程
server.accept(null, this);
//處理連接讀寫
handle(channel);
}
private void handle(final AsynchronousSocketChannel channel) {
System.out.println(Thread.currentThread().getName() + ": run in handle method" );
//每個AsynchronousSocketChannel,分配一個緩沖區
final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
readBuffer.clear();
channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){
@Override
public void completed(Integer count, Object attachment) {
System.out.println(Thread.currentThread().getName() + ": run in read completed method" );
if(count > 0){
try{
readBuffer.flip();
//CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
CharBuffer charBuffer = Charset.forName("UTF-8").newDecoder().decode(readBuffer);
String question = charBuffer.toString();
String answer = Helper.getAnswer(question);
/*//寫入也是異步調用,也可以使用傳入CompletionHandler對象的方式來處理寫入結果
//channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
try{
channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
}
//Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed.
//看來操作系統也不可靠
catch(WritePendingException wpe){
//休息一秒再重試,如果失敗就不管了
Helper.sleep(1);
channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
}*/
writeStringMessage(channel, answer);
readBuffer.clear();
}
catch(IOException e){
e.printStackTrace();
}
}
else{
try {
//如果客戶端關閉socket,那麼服務器也需要關閉,否則浪費CPU
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//異步調用OS處理下個讀取請求
//這裡傳入this是個地雷,小心多線程
channel.read(readBuffer, null, this);
}
/**
* 服務器讀失敗處理
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("server read failed: " + exc);
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
/**
* 服務器接受連接失敗處理
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("server accept failed: " + exc);
}
});
}
/**
* Enqueues a write of the buffer to the channel.
* The call is asynchronous so the buffer is not safe to modify after
* passing the buffer here.
*
* @param buffer the buffer to send to the channel
*/
private void writeMessage(final AsynchronousSocketChannel channel, final ByteBuffer buffer) {
boolean threadShouldWrite = false;
synchronized(queue) {
queue.add(buffer);
// Currently no thread writing, make this thread dispatch a write
if (!writing) {
writing = true;
threadShouldWrite = true;
}
}
if (threadShouldWrite) {
writeFromQueue(channel);
}
}
private void writeFromQueue(final AsynchronousSocketChannel channel) {
ByteBuffer buffer;
synchronized (queue) {
buffer = queue.poll();
if (buffer == null) {
writing = false;
}
}
// No new data in buffer to write
if (writing) {
writeBuffer(channel, buffer);
}
}
private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) {
channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
} else {
// Go back and check if there is new data to write
writeFromQueue(channel);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("server write failed: " + exc);
}
});
}
/**
* Sends a message
* @param string the message
* @throws CharacterCodingException
*/
private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException {
writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
}
}
客戶端代碼:
package com.stevex.app.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import com.stevex.app.nio.CharsetHelper;
public class Client implements Runnable{
private AsynchronousSocketChannel channel;
private Helper helper;
private CountDownLatch latch;
private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
private boolean writing = false;
public Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException{
this.latch = latch;
helper = new Helper();
initChannel(channelGroup);
}
private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException {
//在默認channel group下創建一個socket channel
channel = AsynchronousSocketChannel.open(channelGroup);
//設置Socket選項
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
public static void main(String[] args) throws IOException, InterruptedException {
int sleepTime = Integer.parseInt(args[0]);
Helper.sleep(sleepTime);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
//只能跑一個線程,第二個線程connect會掛住,暫時不明原因
final int THREAD_NUM = 1;
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
//創建個多線程模擬多個客戶端,模擬失敗,無效
//只能通過命令行同時運行多個進程來模擬多個客戶端
for(int i=0; i<THREAD_NUM; i++){
Client c = new Client(channelGroup, latch);
Thread t = new Thread(c);
System.out.println(t.getName() + "---start");
t.start();
//讓主線程等待子線程處理再退出, 這對於異步調用無效
//t.join();
}
latch.await();
if(channelGroup !=null){
channelGroup.shutdown();
}
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "---run");
//連接服務器
channel.connect(new InetSocketAddress("localhost", 8383), null, new CompletionHandler<Void, Void>(){
final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
@Override
public void completed(Void result, Void attachment) {
//連接成功後, 異步調用OS向服務器寫一條消息
try {
//channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord())));
writeStringMessage(helper.getWord());
} catch (CharacterCodingException e) {
e.printStackTrace();
}
//helper.sleep();//等待寫異步調用完成
readBuffer.clear();
//異步調用OS讀取服務器發送的消息
channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){
@Override
public void completed(Integer result, Object attachment) {
try{
//異步讀取完成後處理
if(result > 0){
readBuffer.flip();
CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
String answer = charBuffer.toString();
System.out.println(Thread.currentThread().getName() + "---" + answer);
readBuffer.clear();
String word = helper.getWord();
if(word != null){
//異步寫
//channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
writeStringMessage(word);
//helper.sleep();//等待異步操作
channel.read(readBuffer, null, this);
}
else{
//不想發消息了,主動關閉channel
shutdown();
}
}
else{
//對方已經關閉channel,自己被動關閉,避免空循環
shutdown();
}
}
catch(Exception e){
e.printStackTrace();
}
}
/**
* 讀取失敗處理
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("client read failed: " + exc);
try {
shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
/**
* 連接失敗處理
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("client connect to server failed: " + exc);
try {
shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
private void shutdown() throws IOException {
if(channel != null){
channel.close();
}
latch.countDown();
}
/**
* Enqueues a write of the buffer to the channel.
* The call is asynchronous so the buffer is not safe to modify after
* passing the buffer here.
*
* @param buffer the buffer to send to the channel
*/
private void writeMessage(final ByteBuffer buffer) {
boolean threadShouldWrite = false;
synchronized(queue) {
queue.add(buffer);
// Currently no thread writing, make this thread dispatch a write
if (!writing) {
writing = true;
threadShouldWrite = true;
}
}
if (threadShouldWrite) {
writeFromQueue();
}
}
private void writeFromQueue() {
ByteBuffer buffer;
synchronized (queue) {
buffer = queue.poll();
if (buffer == null) {
writing = false;
}
}
// No new data in buffer to write
if (writing) {
writeBuffer(buffer);
}
}
private void writeBuffer(ByteBuffer buffer) {
channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
} else {
// Go back and check if there is new data to write
writeFromQueue();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
}
/**
* Sends a message
* @param string the message
* @throws CharacterCodingException
*/
public void writeStringMessage(String msg) throws CharacterCodingException {
writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
}
}
Helper類:
package com.stevex.app.aio;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Helper {
private static BlockingQueue<String> words;
private static Random random;
public Helper() throws InterruptedException{
words = new ArrayBlockingQueue<String>(5);
words.put("hi");
words.put("who");
words.put("what");
words.put("where");
words.put("bye");
random = new Random();
}
public String getWord(){
return words.poll();
}
public void sleep() {
try {
TimeUnit.SECONDS.sleep(random.nextInt(3));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void sleep(long l) {
try {
TimeUnit.SECONDS.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static String getAnswer(String question){
String answer = null;
switch(question){
case "who":
answer = "我是小娜\n";
break;
case "what":
answer = "我是來幫你解悶的\n";
break;
case "where":
answer = "我來自外太空\n";
break;
case "hi":
answer = "hello\n";
break;
case "bye":
answer = "88\n";
break;
default:
answer = "請輸入 who, 或者what, 或者where";
}
return answer;
}
}
CharsetHelper類:
package com.stevex.app.nio;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
public class CharsetHelper {
private static final String UTF_8 = "UTF-8";
private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException{
return encoder.encode(in);
}
public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
return decoder.decode(in);
}
}
Java中介者設計模式 http://www.linuxidc.com/Linux/2014-07/104319.htm
Java 設計模式之模板方法開發中應用 http://www.linuxidc.com/Linux/2014-07/104318.htm
設計模式之 Java 中的單例模式(Singleton) http://www.linuxidc.com/Linux/2014-06/103542.htm
Java對象序列化 http://www.linuxidc.com/Linux/2014-10/107584.htm
大話設計模式(帶目錄完整版) PDF+源代碼 http://www.linuxidc.com/Linux/2014-08/105152.htm
Java中的函數傳遞 http://www.linuxidc.com/Linux/2014-11/109056.htm