歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Java AIO 服務器與客戶端實現示例

AIO用於文件處理還是比較快樂的,但用AIO來寫網絡消息處理服務器端與客戶端是比較麻煩的事情,當然這只是我個人意見,主要是有幾點原因:

一是AIO需要操作系統支持,還好Windows與Linux(模擬)都支持;

二是AIO同時使用遞歸調用和異步調用容易把程序員搞暈,代碼容易出錯;

三是CompletionHandler會使用單獨的線程跑,容易出現多線程問題,頻繁線程上下文切換比較消耗資源;

四是異步寫要創建隊列來緩存需要寫入的數據,否則肯定會遇到WritePendingException。

相對來說,NIO比較清楚直白,容易控制。

另外,筆者使用多線程模擬多個客戶場景失敗,代碼在run方法中調用AsynchronousSocketChannel.connect()沒返回,沒連接上服務器,不知道為何,請大俠指教,最後只好使用多個進程模擬多個客戶端,寫個類似下面代碼的bat文件,同時運行多個。

java -classpath .\ com.stevex.app.aio.Client 1
 
java -classpath .\ com.stevex.app.aio.Client 1
 
pause

服務器代碼:

package com.stevex.app.aio;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//import java.nio.channels.WritePendingException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;
 
public class XiaoNa {
    private final AsynchronousServerSocketChannel server;
    //寫隊列,因為當前一個異步寫調用還沒完成之前,調用異步寫會拋WritePendingException
    //所以需要一個寫隊列來緩存要寫入的數據,這是AIO比較坑的地方
    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
    private boolean writing = false;
   
    public static void main(String[] args) throws IOException{
        XiaoNa xiaona = new XiaoNa();
        xiaona.listen();
    }
 
    public XiaoNa() throws IOException{
        //設置線程數為CPU核數
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
        server = AsynchronousServerSocketChannel.open(channelGroup);
        //重用端口
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        //綁定端口並設置連接請求隊列長度
        server.bind(new InetSocketAddress(8383), 80);   
    }
 
    public void listen() {
        System.out.println(Thread.currentThread().getName() + ": run in listen method" );
        //開始接受第一個連接請求
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){                       
            @Override
            public void completed(AsynchronousSocketChannel channel,
                    Object attachment) {
                System.out.println(Thread.currentThread().getName() + ": run in accept completed method" );
               
                //先安排處理下一個連接請求,異步非阻塞調用,所以不用擔心掛住了
                //這裡傳入this是個地雷,小心多線程
                server.accept(null, this);
                //處理連接讀寫
                handle(channel);
            }
 
            private void handle(final AsynchronousSocketChannel channel) {
                System.out.println(Thread.currentThread().getName() + ": run in handle method" );
                //每個AsynchronousSocketChannel,分配一個緩沖區
                final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
                readBuffer.clear();
                channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){
 
                    @Override
                    public void completed(Integer count, Object attachment) {
                        System.out.println(Thread.currentThread().getName() + ": run in read completed method" ); 
                       
                        if(count > 0){
                            try{
                                readBuffer.flip();
                                //CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
                                CharBuffer charBuffer = Charset.forName("UTF-8").newDecoder().decode(readBuffer);
                                String question = charBuffer.toString();
                                String answer = Helper.getAnswer(question);
                                /*//寫入也是異步調用,也可以使用傳入CompletionHandler對象的方式來處理寫入結果
                                //channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));                           
                                try{
                                    channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
                                }
                                //Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed.
                                //看來操作系統也不可靠
                                catch(WritePendingException wpe){
                                    //休息一秒再重試,如果失敗就不管了
                                    Helper.sleep(1);
                                    channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
                                }*/
                                writeStringMessage(channel, answer);
                               
                                readBuffer.clear();
                            }
                            catch(IOException e){
                                e.printStackTrace();
                            }
                        }
                        else{
                            try {
                                //如果客戶端關閉socket,那麼服務器也需要關閉,否則浪費CPU
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                       
                        //異步調用OS處理下個讀取請求
                        //這裡傳入this是個地雷,小心多線程
                        channel.read(readBuffer, null, this);
                    }
 
                    /**
                    * 服務器讀失敗處理
                    * @param exc
                    * @param attachment
                    */
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("server read failed: " + exc);         
                        if(channel != null){
                            try {
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                   
                });                           
            }
 
            /**
            * 服務器接受連接失敗處理
            * @param exc
            * @param attachment
            */
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("server accept failed: " + exc);
            }
           
        });
    }
   
    /**
    * Enqueues a write of the buffer to the channel.
    * The call is asynchronous so the buffer is not safe to modify after
    * passing the buffer here.
    *
    * @param buffer the buffer to send to the channel
    */
    private void writeMessage(final AsynchronousSocketChannel channel, final ByteBuffer buffer) {
        boolean threadShouldWrite = false;
 
        synchronized(queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }
 
        if (threadShouldWrite) {
            writeFromQueue(channel);
        }
    }
 
    private void writeFromQueue(final AsynchronousSocketChannel channel) {
        ByteBuffer buffer;
 
        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }
 
        // No new data in buffer to write
        if (writing) {
            writeBuffer(channel, buffer);
        }
    }
 
    private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue(channel);
                }
            }
 
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("server write failed: " + exc);
            }
        });
    }
 
    /**
    * Sends a message
    * @param string the message
    * @throws CharacterCodingException
    */
    private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException {
        writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }
}

客戶端代碼:

package com.stevex.app.aio;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
 
import com.stevex.app.nio.CharsetHelper;
 
public class Client implements Runnable{
    private AsynchronousSocketChannel channel;
    private Helper helper;
    private CountDownLatch latch;
    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
    private boolean writing = false;
   
    public Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException{
        this.latch = latch;
        helper = new Helper();
        initChannel(channelGroup);
    }
 
    private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException {
        //在默認channel group下創建一個socket channel
        channel = AsynchronousSocketChannel.open(channelGroup);
        //設置Socket選項
        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    }
 
    public static void main(String[] args) throws IOException, InterruptedException {
        int sleepTime = Integer.parseInt(args[0]);
        Helper.sleep(sleepTime);
       
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
        //只能跑一個線程,第二個線程connect會掛住,暫時不明原因
        final int THREAD_NUM = 1;
        CountDownLatch latch = new CountDownLatch(THREAD_NUM);
       
        //創建個多線程模擬多個客戶端,模擬失敗,無效
        //只能通過命令行同時運行多個進程來模擬多個客戶端
        for(int i=0; i<THREAD_NUM; i++){
            Client c = new Client(channelGroup, latch);
            Thread t = new Thread(c);
            System.out.println(t.getName() + "---start");
            t.start();
            //讓主線程等待子線程處理再退出, 這對於異步調用無效
            //t.join();
        }         
       
        latch.await();
       
        if(channelGroup !=null){
            channelGroup.shutdown();
        }
    }
   
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "---run");
       
        //連接服務器
        channel.connect(new InetSocketAddress("localhost", 8383), null, new CompletionHandler<Void, Void>(){
            final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
           
            @Override
            public void completed(Void result, Void attachment) {
                //連接成功後, 異步調用OS向服務器寫一條消息
                try {
                    //channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord())));
                    writeStringMessage(helper.getWord());
                } catch (CharacterCodingException e) {
                    e.printStackTrace();
                }
               
                //helper.sleep();//等待寫異步調用完成
                readBuffer.clear();
                //異步調用OS讀取服務器發送的消息
                channel.read(readBuffer, null, new CompletionHandler<Integer, Object>(){
 
                    @Override
                    public void completed(Integer result, Object attachment) {
                        try{
                            //異步讀取完成後處理
                            if(result > 0){
                                readBuffer.flip();
                                CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
                                String answer = charBuffer.toString();
                                System.out.println(Thread.currentThread().getName() + "---" + answer);
                                readBuffer.clear();
                               
                                String word = helper.getWord();
                                if(word != null){
                                    //異步寫
                                    //channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
                                    writeStringMessage(word);
                                    //helper.sleep();//等待異步操作
                                    channel.read(readBuffer, null, this);
                                }
                                else{
                                    //不想發消息了,主動關閉channel
                                    shutdown();
                                }
                            }
                            else{
                                //對方已經關閉channel,自己被動關閉,避免空循環
                                shutdown();
                            }                                                     
                        }
                        catch(Exception e){
                            e.printStackTrace();
                        }                     
                    }                 
 
                    /**
                    * 讀取失敗處理
                    * @param exc
                    * @param attachment
                    */
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("client read failed: " + exc);
                        try {
                            shutdown();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                   
                });
            }
 
            /**
            * 連接失敗處理
            * @param exc
            * @param attachment
            */
            @Override
            public void failed(Throwable exc, Void attachment) {
                System.out.println("client connect to server failed: " + exc);
               
                try {
                    shutdown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }         
        });   
    }
   
    private void shutdown() throws IOException {
        if(channel != null){
            channel.close();
        }
       
        latch.countDown();                         
    }
   
    /**
    * Enqueues a write of the buffer to the channel.
    * The call is asynchronous so the buffer is not safe to modify after
    * passing the buffer here.
    *
    * @param buffer the buffer to send to the channel
    */
    private void writeMessage(final ByteBuffer buffer) {
        boolean threadShouldWrite = false;
 
        synchronized(queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }
 
        if (threadShouldWrite) {
            writeFromQueue();
        }
    }
 
    private void writeFromQueue() {
        ByteBuffer buffer;
 
        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }
 
        // No new data in buffer to write
        if (writing) {
            writeBuffer(buffer);
        }
    }
 
    private void writeBuffer(ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue();
                }
            }
 
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
            }
        });
    }
 
    /**
    * Sends a message
    * @param string the message
    * @throws CharacterCodingException
    */
    public void writeStringMessage(String msg) throws CharacterCodingException {
        writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }
}

Helper類

package com.stevex.app.aio;
 
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
 
public class Helper {
    private static BlockingQueue<String> words;
    private static Random random;
   
    public Helper() throws InterruptedException{
        words = new ArrayBlockingQueue<String>(5);
        words.put("hi");
        words.put("who");
        words.put("what");
        words.put("where");
        words.put("bye"); 
       
        random = new Random();
    }
   
    public String getWord(){
        return words.poll();
    }
 
    public void sleep() {
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(3));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } 
   
    public static void sleep(long l) {
        try {
            TimeUnit.SECONDS.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
   
    public static String getAnswer(String question){
        String answer = null;
       
        switch(question){
        case "who":
            answer = "我是小娜\n";
            break;
        case "what":
            answer = "我是來幫你解悶的\n";
            break;
        case "where":
            answer = "我來自外太空\n";
            break;
        case "hi":
            answer = "hello\n";
            break;
        case "bye":
            answer = "88\n";
            break;
        default:
                answer = "請輸入 who, 或者what, 或者where";
        }
       
        return answer;
    }
}

CharsetHelper類:

package com.stevex.app.nio;
 
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
 
public class CharsetHelper {
    private static final String UTF_8 = "UTF-8";
    private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
    private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
   
    public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException{
        return encoder.encode(in);
    }
 
    public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
        return decoder.decode(in);
    }
}

Java中介者設計模式 http://www.linuxidc.com/Linux/2014-07/104319.htm

Java 設計模式之模板方法開發中應用 http://www.linuxidc.com/Linux/2014-07/104318.htm

設計模式之 Java 中的單例模式(Singleton) http://www.linuxidc.com/Linux/2014-06/103542.htm

Java對象序列化 http://www.linuxidc.com/Linux/2014-10/107584.htm

大話設計模式(帶目錄完整版) PDF+源代碼 http://www.linuxidc.com/Linux/2014-08/105152.htm

Java中的函數傳遞 http://www.linuxidc.com/Linux/2014-11/109056.htm

Copyright © Linux教程網 All Rights Reserved