以下代碼只兼容 Java 7 及以上版本（使用了字符串 switch 等 Java 7 語法），對於一些關鍵地方請看注釋說明。
公共類:
package com.stevex.app.nio;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
/**
 * UTF-8 conversion helpers used by both the server and the client.
 *
 * <p>{@link CharsetEncoder} and {@link CharsetDecoder} instances are stateful and
 * are not safe for use by multiple concurrent threads. The client starts several
 * threads that all call into this class, so every call obtains its own coder
 * from the (immutable, thread-safe) {@link Charset} instead of sharing one
 * static instance.
 */
public class CharsetHelper {
    private static final String UTF_8 = "UTF-8";
    private static final Charset CHARSET = Charset.forName(UTF_8);

    /**
     * Encodes the remaining characters of {@code in} as UTF-8.
     *
     * @param in the characters to encode; consumed from its position to its limit
     * @return a new buffer holding the encoded bytes, ready for reading
     * @throws CharacterCodingException if the input contains unencodable characters
     *         (for example an unpaired surrogate)
     */
    public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
        // A fresh encoder keeps the default REPORT error actions of the original code.
        CharsetEncoder encoder = CHARSET.newEncoder();
        return encoder.encode(in);
    }

    /**
     * Decodes the remaining bytes of {@code in} as UTF-8.
     *
     * @param in the bytes to decode; only the range position..limit is read
     * @return a new buffer holding the decoded characters, ready for reading
     * @throws CharacterCodingException if the bytes are not well-formed UTF-8
     */
    public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException {
        CharsetDecoder decoder = CHARSET.newDecoder();
        return decoder.decode(in);
    }
}
服務器代碼:
package com.stevex.app.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Single-threaded, selector-based chat server.
 *
 * <p>It listens on a fixed port, accepts clients in non-blocking mode, decodes
 * each received chunk as UTF-8 text and writes back a canned answer chosen by
 * {@link #getAnswer(String)}.
 */
public class XiaoNa {
    private static final int PORT = 8383;
    private static final int READ_BUFFER_SIZE = 1024;

    /** Reused for every read; only touched by the selector thread. */
    private ByteBuffer readBuffer;
    private Selector selector;

    /**
     * Starts the server and serves requests until the selector is closed.
     * If the server cannot be set up (for example the port is in use) the
     * error is printed and the program returns instead of entering the loop.
     */
    public static void main(String[] args) {
        XiaoNa xiaona = new XiaoNa();
        try {
            xiaona.init();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        xiaona.listen();
    }

    /**
     * Opens the non-blocking server channel, binds it and registers it for
     * accept events.
     *
     * @throws IOException if opening, binding or registering fails; anything
     *         that was already opened is closed again before rethrowing
     */
    private void init() throws IOException {
        readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        ServerSocketChannel servSocketChannel = ServerSocketChannel.open();
        try {
            servSocketChannel.configureBlocking(false);
            // Bind the listening port.
            servSocketChannel.socket().bind(new InetSocketAddress(PORT));
            selector = Selector.open();
            servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // Do not leave a half-initialised server behind.
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                selector = null;
            }
            try {
                servSocketChannel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Event loop: waits for ready keys and dispatches each one. A failure
     * while handling one batch is printed and the loop continues; the loop
     * ends once the selector has been closed.
     */
    private void listen() {
        while (selector.isOpen()) {
            try {
                selector.select();
                Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = ite.next();
                    ite.remove(); // make sure a key is not handled twice
                    handleKey(key);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Dispatches a selected key to the accept or read handler.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if closing a failed client channel fails
     */
    private void handleKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            // Cancelled keys throw on isAcceptable()/isReadable(); nothing to do.
            return;
        }
        if (key.isAcceptable()) {
            acceptClient(key);
        } else if (key.isReadable()) {
            answerClient(key);
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void acceptClient(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            if (channel == null) {
                // Non-blocking accept returns null when no connection is pending.
                return;
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
            if (channel != null) {
                channel.close();
            }
        }
    }

    /** Reads one chunk from a client, answers it, or closes the channel at end-of-stream. */
    private void answerClient(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            readBuffer.clear();
            int count = channel.read(readBuffer);
            if (count > 0) {
                // flip() is required so that decoding sees exactly the bytes just read.
                // clear() only resets position/limit/mark and leaves old bytes in the
                // backing array, so new String(readBuffer.array()) could return stale
                // data (e.g. "byere" after reading "where" and then "bye"), whereas
                // decode() only consumes position..limit.
                readBuffer.flip();
                String question = CharsetHelper.decode(readBuffer).toString();
                String answer = getAnswer(question);
                // NOTE(review): a non-blocking write may be partial; the answers here
                // are a few bytes long, so the remainder is not tracked.
                channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
            } else if (count < 0) {
                // -1 means the peer closed the connection. Close our side too,
                // otherwise the key keeps reporting readable forever.
                channel.close();
            }
            // count == 0: no data available right now; keep the connection open.
        } catch (Exception e) {
            e.printStackTrace();
            channel.close();
        }
    }

    /**
     * Maps a question to its canned answer.
     *
     * @param question the text received from the client; surrounding whitespace
     *        (such as a trailing line break) is ignored, {@code null} is treated
     *        as an unknown question
     * @return the answer text to send back
     */
    private String getAnswer(String question) {
        String normalized = question == null ? "" : question.trim();
        switch (normalized) {
            case "who":
                return "我是小娜\n";
            case "what":
                return "我是來幫你解悶的\n";
            case "where":
                return "我來自外太空\n";
            case "hi":
                return "hello\n";
            case "bye":
                return "88\n";
            default:
                // NOTE(review): unlike the other answers this one has no trailing newline.
                return "請輸入 who, 或者what, 或者where";
        }
    }
}
客戶端代碼:
package com.stevex.app.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Test client for the chat server: each instance connects in non-blocking
 * mode, sends its queued words one at a time and prints every answer it
 * receives, prefixed with the id of the thread that ran it.
 */
public class Client implements Runnable {
    private static final String HOST = "localhost";
    private static final int PORT = 8383;
    private static final int CLIENT_COUNT = 10;

    private BlockingQueue<String> words;
    private Random random;

    /** Starts several client threads that talk to the server concurrently. */
    public static void main(String[] args) {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            Client c = new Client();
            c.init();
            new Thread(c).start();
        }
    }

    /**
     * Connects to the server and runs the request/response conversation until
     * all words have been sent or the server closes the connection. I/O errors
     * are printed; the channel and the selector are always closed.
     */
    @Override
    public void run() {
        try (SocketChannel channel = SocketChannel.open();
             Selector selector = Selector.open()) {
            channel.configureBlocking(false);
            boolean isOver = false;
            // Initiate the connection. A non-blocking connect may complete at once,
            // in which case no OP_CONNECT event should be waited for.
            if (channel.connect(new InetSocketAddress(HOST, PORT))) {
                channel.register(selector, SelectionKey.OP_READ);
                isOver = !sendNextWord(channel);
                sleep();
            } else {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
            while (!isOver) {
                selector.select();
                Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
                while (ite.hasNext() && !isOver) {
                    SelectionKey key = ite.next();
                    ite.remove();
                    if (key.isConnectable()) {
                        // finishConnect() returning false means "not yet": keep the key
                        // registered and wait for the next OP_CONNECT event.
                        if (channel.isConnectionPending() && channel.finishConnect()) {
                            // OP_READ may only be registered once the connection is up.
                            key.interestOps(SelectionKey.OP_READ);
                            isOver = !sendNextWord(channel);
                            sleep();
                        }
                    } else if (key.isReadable()) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                        int count = channel.read(byteBuffer);
                        if (count < 0) {
                            // The server closed the connection; nothing more to read.
                            isOver = true;
                        } else if (count > 0) {
                            byteBuffer.flip();
                            String answer = CharsetHelper.decode(byteBuffer).toString();
                            System.out.println(Thread.currentThread().getId() + "---" + answer);
                            isOver = !sendNextWord(channel);
                            sleep();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends the next queued word, if any.
     *
     * @return {@code true} if a word was written, {@code false} if the queue is empty
     * @throws IOException if encoding or writing fails
     */
    private boolean sendNextWord(SocketChannel channel) throws IOException {
        String word = getWord();
        if (word == null) {
            return false;
        }
        channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
        return true;
    }

    /** Fills the word queue and creates the random source used for pauses. */
    private void init() {
        words = new ArrayBlockingQueue<String>(5);
        // The queue capacity matches the number of words, so add() cannot fail.
        words.add("hi");
        words.add("who");
        words.add("what");
        words.add("where");
        words.add("bye");
        random = new Random();
    }

    /** Returns the next word to send, or {@code null} when none are left. */
    private String getWord() {
        return words.poll();
    }

    /** Pauses for a random 0-2 seconds; restores the interrupt flag if interrupted. */
    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(3));
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
Java中介者設計模式 http://www.linuxidc.com/Linux/2014-07/104319.htm
Java 設計模式之模板方法開發中應用 http://www.linuxidc.com/Linux/2014-07/104318.htm
設計模式之 Java 中的單例模式(Singleton) http://www.linuxidc.com/Linux/2014-06/103542.htm
Java對象序列化 http://www.linuxidc.com/Linux/2014-10/107584.htm
大話設計模式(帶目錄完整版) PDF+源代碼 http://www.linuxidc.com/Linux/2014-08/105152.htm
Java中的函數傳遞 http://www.linuxidc.com/Linux/2014-11/109056.htm