我們一直說Redis的性能很快,那為什么快?Redis為了達到性能最大化,做了哪些方面的優化呢? 在深度解析Redis的數據結構 這篇文章中,其實從數據結構上分析了Redis性能高的一方面原因。
在目前的k-v數據庫的技術選型中,Redis幾乎是首選的用來實現高性能緩存的方案,它的性能有多快呢?
根據官方的基準測試數據,一臺普通硬件配置的Linux機器上運行單個Redis實例,處理簡單命令(O(n)或者O(logn)),QPS可以達到8W,如果使用pipeline批處理功能,QPS最高可以達到10W。
Redis的高性能主要依賴于幾個方面。
下面我們分別從上述幾個方面進行展開說明,先來看網絡I/O的多路復用模型。
當我們在客戶端向Redis Server發送一條指令,并且得到Redis回復的整個過程中,Redis做了什么呢?
<center>圖4-1</center>
要處理命令,則redis必須完整地接收客戶端的請求,并將命令解析出來,再將結果讀出來,通過網絡回寫到客戶端。整個工序分為以下幾個部分:
其中解析和執行是純cpu/內存操作,而接收和返回主要是IO操作,首先我們先來看通信的過程。
同樣,我也畫了一幅圖來描述網絡數據的傳輸流程
首先,對于TCP通信來說,每個TCP Socket的內核中都有一個發送緩沖區和一個接收緩沖區
接收緩沖區把數據緩存到內核,若應用進程一直沒有調用Socket的read方法進行讀取,那么該數據會一直被緩存在接收緩沖區內。不管進程是否讀取Socket,對端發來的數據都會經過內核接收并緩存到Socket的內核接收緩沖區。
read所要做的工作,就是把內核接收緩沖區中的數據復制到應用層用戶的Buffer里。
進程調用Socket的send發送數據的時候,一般情況下是將數據從應用層用戶的Buffer里復制到Socket的內核發送緩沖區,然后send就會在上層返回。換句話說,send返回時,數據不一定會被發送到對端。
網卡中的緩沖區既不屬于內核空間,也不屬于用戶空間。它屬于硬件緩沖,允許網卡與操作系統之間有個緩沖; 內核緩沖區在內核空間,在內存中,用于內核程序,做為讀自或寫往硬件的數據緩沖區; 用戶緩沖區在用戶空間,在內存中,用于用戶程序,做為讀自或寫往硬件的數據緩沖區
網卡芯片收到網絡數據會以中斷的方式通知CPU,我有數據了,存在我的硬件緩沖里了,來讀我啊。 CPU收到這個中斷信號后,會調用相應的驅動接口函數從網卡的硬件緩沖里把數據讀到內核緩沖區,正常情況下會向上傳遞給TCP/IP模塊一層一層的處理。
Redis的通信采用的是多路復用機制,什么是多路復用機制呢?
由于Redis是C語言實現,為了簡化大家的理解,我們采用Java語言來描述這個過程。
在理解多路復用之前,我們先來了解一下BIO。
在Java中,如果要實現網絡通信,我們會采用Socket套接字來完成。
Socket這不是一個協議,而是一個通信模型。其實它最初是BSD發明的,主要用來一臺電腦的兩個進程間通信,然后把它用到了兩臺電腦的進程間通信。所以,可以把它簡單理解為進程間通信,不是什么高級的東西。主要做的事情不就是:
A發包:發請求包給某個已經綁定的端口(所以我們經常會訪問這樣的地址182.13.15.16:1235,1235就是端口);收到B的允許;然后正式發送;發送完了,告訴B要斷開鏈接;收到斷開允許,馬上斷開,然后發送已經斷開信息給B。
B收包:綁定端口和IP;然后在這個端口監聽;接收到A的請求,發允許給A,并做好接收準備,主要就是清理緩存等待接收新數據;然后正式接收;接受到斷開請求,允許斷開;確認斷開后,繼續監聽其它請求。
可見,Socket其實就是I/O操作,Socket并不僅限于網絡通信,在網絡通信中,它涵蓋了網絡層、傳輸層、會話層、表示層、應用層——其實這都不需要記,因為Socket通信時候用到了IP和端口,僅這兩個就表明了它用到了網絡層和傳輸層;而且它無視多臺電腦通信的系統差別,所以它涉及了表示層;一般Socket都是基于一個應用程序的,所以會涉及到會話層和應用層。
BIOServerSocket
public?class?BIOServerSocket?{ ????//先定義一個端口號,這個端口的值是可以自己調整的。 ????static?final?int?DEFAULT_PORT=8080; ????public?static?void?main(String[]?args)?throws?IOException?{ ????????//先定義一個端口號,這個端口的值是可以自己調整的。 ????????//在服務器端,我們需要使用ServerSocket,所以我們先聲明一個ServerSocket變量 ????????ServerSocket?serverSocket=null; ????????//接下來,我們需要綁定監聽端口,?那我們怎么做呢?只需要創建使用serverSocket實例 ????????//ServerSocket有很多構造重載,在這里,我們把前邊定義的端口傳入,表示當前 ????????//ServerSocket監聽的端口是8080 ????????serverSocket=new?ServerSocket(DEFAULT_PORT); ????????System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT); ????????//回顧一下前面我們講的內容,接下來我們就需要開始等待客戶端的連接了。 ????????//所以我們要使用的是accept這個函數,并且當accept方法獲得一個客戶端請求時,會返回 ????????//一個socket對象,?這個socket對象讓服務器可以用來和客戶端通信的一個端點。 ????????//開始等待客戶端連接,如果沒有客戶端連接,就會一直阻塞在這個位置 ????????Socket?socket=serverSocket.accept(); ????????//很可能有多個客戶端來發起連接,為了區分客戶端,咱們可以輸出客戶端的端口號 ????????System.out.println("客戶端:"+socket.getPort()+"已連接"); ????????//一旦有客戶端連接過來,我們就可以用到IO來獲得客戶端傳過來的數據。 ????????//使用InputStream來獲得客戶端的輸入數據 ????????//bufferedReader大家還記得吧,他維護了一個緩沖區可以減少數據源讀取的頻率 ????????BufferedReader?bufferedReader=new?BufferedReader(new?InputStreamReader(socket.getInputStream())); ????????String?clientStr=bufferedReader.readLine();?//讀取一行信息 ????????System.out.println("客戶端發了一段消息:"+clientStr); ????????//服務端收到數據以后,可以給到客戶端一個回復。這里咱們用到BufferedWriter ????????BufferedWriter?bufferedWriter=new?BufferedWriter(new?OutputStreamWriter(socket.getOutputStream())); ????????bufferedWriter.write("我已經收到你的消息了\n"); ????????bufferedWriter.flush();?//清空緩沖區觸發消息發送 ????} }
BIOClientSocket
public?class?BIOClientSocket?{ ????static?final?int?DEFAULT_PORT=8080; ????public?static?void?main(String[]?args)?throws?IOException?{ ????????//在客戶端這邊,咱們使用socket來連接到指定的ip和端口 ????????Socket?socket=new?Socket("localhost",8080); ????????//使用BufferedWriter,像服務器端寫入一個消息 ????????BufferedWriter?bufferedWriter=new?BufferedWriter(new?OutputStreamWriter(socket.getOutputStream())); ????????bufferedWriter.write("我是客戶端Client-01\n"); ????????bufferedWriter.flush(); ????????BufferedReader?bufferedReader=new?BufferedReader(new?InputStreamReader(socket.getInputStream())); ????????String?serverStr=bufferedReader.readLine();?//通過bufferedReader讀取服務端返回的消息 ????????System.out.println("服務端返回的消息:"+serverStr); ????} }
上述代碼構建了一個簡單的BIO通信模型,也就是服務端建立一個監聽,客戶端向服務端發送一個消息,實現簡單的網絡通信,那BIO有什么弊端呢?
我們通過對BIOServerSocket進行改造,關注case1和case2部分。
public?class?BIOServerSocket?{ ????//先定義一個端口號,這個端口的值是可以自己調整的。 ????static?final?int?DEFAULT_PORT=8080; ????public?static?void?main(String[]?args)?throws?IOException,?InterruptedException?{ ????????ServerSocket?serverSocket=null; ????????serverSocket=new?ServerSocket(DEFAULT_PORT); ????????System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT); ????????while(true)?{?//case1:?增加循環,允許循環接收請求 ????????????Socket?socket?=?serverSocket.accept(); ????????????System.out.println("客戶端:"?+?socket.getPort()?+?"已連接"); ????????????BufferedReader?bufferedReader?=?new?BufferedReader(new?InputStreamReader(socket.getInputStream())); ????????????String?clientStr?=?bufferedReader.readLine();?//讀取一行信息 ????????????System.out.println("客戶端發了一段消息:"?+?clientStr); ????????????Thread.sleep(20000);?//case2:?修改:增加等待時間 ????????????BufferedWriter?bufferedWriter?=?new?BufferedWriter(new?OutputStreamWriter(socket.getOutputStream())); ????????????bufferedWriter.write("我已經收到你的消息了\n"); ????????????bufferedWriter.flush();?//清空緩沖區觸發消息發送 ????????} ????} }
接著,把BIOClientSocket復制兩份(client1、client2),同時向BIOServerSocket發起請求。
運行后看到的現象應該是: client1先發送請求到Server端,由于Server端等待20s才返回,導致client2的請求一直被阻塞。
這個情況會導致一個問題,如果服務端在同一個時刻只能處理一個客戶端的連接,而如果一個網站同時有1000個用戶訪問,那么剩下的999個用戶都需要等待,而這個等待的耗時取決于前面的請求的處理時長,如圖4-2所示。
<center>圖4-2</center>
為了讓服務端能夠同時處理更多的客戶端連接,避免因為某個客戶端連接阻塞導致后續請求被阻塞,于是引入多線程技術,代碼如下。
ServerSocket
public?static?void?main(String[]?args)?throws?IOException,?InterruptedException?{ ????final?int?DEFAULT_PORT=8080; ????ServerSocket?serverSocket=null; ????serverSocket=new?ServerSocket(DEFAULT_PORT); ????System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT); ????ExecutorService?executorService=?Executors.newFixedThreadPool(5); ????while(true)?{ ????????Socket?socket?=?serverSocket.accept(); ????????executorService.submit(new?SocketThread(socket)); ????} }
SocketThread
public?class?SocketThread?implements?Runnable{ ????Socket?socket; ????public?SocketThread(Socket?socket)?{ ????????this.socket?=?socket; ????} ????@Override ????public?void?run()?{ ????????System.out.println("客戶端:"?+?socket.getPort()?+?"已連接"); ????????try?{ ????????????BufferedReader?bufferedReader?=?new?BufferedReader(new?InputStreamReader(socket.getInputStream())); ????????????String?clientStr?=?null;?//讀取一行信息 ????????????clientStr?=?bufferedReader.readLine(); ????????????System.out.println("客戶端發了一段消息:"?+?clientStr); ????????????Thread.sleep(20000); ????????????BufferedWriter?bufferedWriter?=?new?BufferedWriter(new?OutputStreamWriter(socket.getOutputStream())); ????????????bufferedWriter.write("我已經收到你的消息了\n"); ????????????bufferedWriter.flush();?//清空緩沖區觸發消息發送 ????????}?catch?(IOException?e)?{ ????????????e.printStackTrace(); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????} }
如圖4-3所示,當引入了多線程之后,每個客戶端的鏈接(Socket),我們可以直接給到線程池去執行,而由于這個過程是異步的,所以并不會同步阻塞影響后續鏈接的監聽,因此在一定程度上可以提升服務端鏈接的處理數量。
<center>圖4-3</center>
使用多線程的方式來解決這個問題,仍然有一個缺點,線程的數量取決于硬件配置,所以線程數量是有限的,如果請求量比較大的時候,線程本身會收到限制從而并發量也不會太高。那怎么辦呢,我們可以采用非阻塞IO。
NIO 從JDK1.4 提出的,本意是New IO,它的出現為了彌補原本IO的不足,提供了更高效的方式,提出一個通道(channel)的概念,在IO中它始終以流的形式對數據的傳輸和接受,下面我們演示一下NIO的使用。
NioServerSocket
public?class?NioServerSocket?{ ????public?static?void?main(String[]?args)?{ ????????try?{ ????????????ServerSocketChannel?serverSocketChannel?=?ServerSocketChannel.open(); ????????????serverSocketChannel.configureBlocking(false); ????????????serverSocketChannel.socket().bind(new?InetSocketAddress(8080)); ????????????while?(true)?{ ????????????????SocketChannel?socketChannel?=?serverSocketChannel.accept(); ????????????????if?(socketChannel?!=?null)?{ ????????????????????//讀取數據 ????????????????????ByteBuffer?buffer?=?ByteBuffer.allocate(1024); ????????????????????socketChannel.read(buffer); ????????????????????System.out.println(new?String(buffer.array())); ????????????????????//寫出數據 ????????????????????Thread.sleep(10000);?//阻塞一段時間 ????????????????????//當數據讀取到緩沖區之后,接下來就需要把緩沖區的數據寫出到通道,而在寫出之前必須要調用flip方法,實際上就是重置一個有效字節范圍,然后把這個數據接觸到通道。 ????????????????????buffer.flip(); ????????????????????socketChannel.write(buffer);//寫出數據 ????????????????}?else?{ ????????????????????Thread.sleep(1000); ????????????????????System.out.println("連接未就緒"); ????????????????} ????????????} ????????}?catch?(IOException?e)?{ ????????????e.printStackTrace(); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????} }
NioClientSocket
public?class?NioClientSocket?{ ????public?static?void?main(String[]?args)?{ ????????try?{ ????????????SocketChannel?socketChannel=?SocketChannel.open(); ????????????socketChannel.configureBlocking(false); ????????????socketChannel.connect(new?InetSocketAddress("localhost",8080)); ????????????if(socketChannel.isConnectionPending()){ ????????????????socketChannel.finishConnect(); ????????????} ????????????ByteBuffer?byteBuffer=?ByteBuffer.allocate(1024); ????????????byteBuffer.put("Hello?I'M?SocketChannel?Client".getBytes()); ????????????byteBuffer.flip(); ????????????socketChannel.write(byteBuffer); ????????????//讀取服務端數據 ????????????byteBuffer.clear(); ????????????while(true)?{ ????????????????int?i?=?socketChannel.read(byteBuffer); ????????????????if?(i?>?0)?{ ????????????????????System.out.println("收到服務端的數據:"?+?new?String(byteBuffer.array())); ????????????????}?else?{ ????????????????????System.out.println("服務端數據未準備好"); ????????????????????Thread.sleep(1000); ????????????????} ????????????} ????????}?catch?(IOException?|?InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????} }
所謂的NIO(非阻塞IO),其實就是取消了IO阻塞和連接阻塞,當服務端不存在阻塞的時候,就可以不斷輪詢處理客戶端的請求,如圖4-4所示,表示NIO下的運行流程。
<center>圖4-4</center>
上述這種NIO的使用方式,仍然存在一個問題,就是客戶端或者服務端需要通過一個線程不斷輪詢才能獲得結果,而這個輪詢過程中會浪費線程資源。
大家站在全局的角度再思考一下整個過程,有哪些地方可以優化呢?
我們回到NIOClientSocket中下面這段代碼,當客戶端通過read
方法去讀取服務端返回的數據時,如果此時服務端數據未準備好,對于客戶端來說就是一次無效的輪詢。
我們能不能夠設計成,當客戶端調用read
方法之后,不僅僅不阻塞,同時也不需要輪詢。而是等到服務端的數據就緒之后, 告訴客戶端。然后客戶端再去讀取服務端返回的數據呢?
就像點外賣一樣,我們在網上下單之后,繼續做其他事情,等到外賣到了公司,外賣小哥主動打電話告訴你,你直接去前臺取餐即可。
while(true)?{ ????int?i?=?socketChannel.read(byteBuffer); ????if?(i?>?0)?{ ????????System.out.println("收到服務端的數據:"?+?new?String(byteBuffer.array())); ????}?else?{ ????????System.out.println("服務端數據未準備好"); ????????Thread.sleep(1000); ????} }
所以為了優化這個問題,引入了多路復用機制。
I/O多路復用的本質是通過一種機制(系統內核緩沖I/O數據),讓單個進程可以監視多個文件描述符,一旦某個描述符就緒(一般是讀就緒或寫就緒),能夠通知程序進行相應的讀寫操作
什么是fd:在linux中,內核把所有的外部設備都當成是一個文件來操作,對一個文件的讀寫會調用內核提供的系統命令,返回一個fd(文件描述符)。而對于一個socket的讀寫也會有相應的文件描述符,成為socketfd。
常見的IO多路復用方式有**【select、poll、epoll】**,都是Linux API提供的IO復用方式,那么接下來重點講一下select、和epoll這兩個模型
**select:**進程可以通過把一個或者多個fd傳遞給select系統調用,進程會阻塞在select操作上,這樣select可以幫我們檢測多個fd是否處于就緒狀態,這個模式有兩個缺點
epoll:linux還提供了epoll的系統調用,epoll是基于事件驅動方式來代替順序掃描,因此性能相對來說更高,主要原理是,當被監聽的fd中,有fd就緒時,會告知當前進程具體哪一個fd就緒,那么當前進程只需要去從指定的fd上讀取數據即可,另外,epoll所能支持的fd上線是操作系統的最大文件句柄,這個數字要遠遠大于1024
【由于epoll能夠通過事件告知應用進程哪個fd是可讀的,所以我們也稱這種IO為異步非阻塞IO,當然它是偽異步的,因為它還需要去把數據從內核同步復制到用戶空間中,真正的異步非阻塞,應該是數據已經完全準備好了,我只需要從用戶空間讀就行】
I/O多路復用的好處是可以通過把多個I/O的阻塞復用到同一個select的阻塞上,從而使得系統在單線程的情況下可以同時處理多個客戶端請求。它的最大優勢是系統開銷小,并且不需要創建新的進程或者線程,降低了系統的資源開銷,它的整體實現思想如圖4-5所示。
客戶端請求到服務端后,此時客戶端在傳輸數據過程中,為了避免Server端在read客戶端數據過程中阻塞,服務端會把該請求注冊到Selector復路器上,服務端此時不需要等待,只需要啟動一個線程,通過selector.select()阻塞輪詢復路器上就緒的channel即可,也就是說,如果某個客戶端連接數據傳輸完成,那么select()方法會返回就緒的channel,然后執行相關的處理即可。
<center>圖4-5</center>
NIOServer的實現如下
測試訪問的時候,直接在cmd中通過telnet連接NIOServer,便可發送信息。
public?class?NIOServer?implements?Runnable{ ????Selector?selector; ????ServerSocketChannel?serverSocketChannel; ????public?NIOServer(int?port)?throws?IOException?{ ????????selector=Selector.open();?//多路復用器 ????????serverSocketChannel=ServerSocketChannel.open(); ????????//綁定監聽端口 ????????serverSocketChannel.socket().bind(new?InetSocketAddress(port)); ????????serverSocketChannel.configureBlocking(false);//非阻塞配置 ????????//針對serverSocketChannel注冊一個ACCEPT連接監聽事件 ????????serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT); ????} ????@Override ????public?void?run()?{ ????????while(!Thread.interrupted()){ ????????????try?{ ????????????????selector.select();?//阻塞等待事件就緒 ????????????????Set?selected=selector.selectedKeys();?//得到事件列表 ????????????????Iterator?it=selected.iterator(); ????????????????while(it.hasNext()){ ????????????????????dispatch((SelectionKey)?it.next());?//分發事件 ????????????????????it.remove();?//移除當前時間 ????????????????} ????????????}?catch?(IOException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????} ????} ????private?void?dispatch(SelectionKey?key)?throws?IOException?{ ????????if(key.isAcceptable()){?//如果是客戶端的連接事件,則需要針對該連接注冊讀寫事件 ????????????register(key); ????????}else?if(key.isReadable()){ ????????????read(key); ????????}else?if(key.isWritable()){ ????????????write(key); ????????} ????} ????private?void?register(SelectionKey?key)?throws?IOException?{ ????????//得到事件對應的連接 ????????ServerSocketChannel?server=(ServerSocketChannel)key.channel(); ????????SocketChannel?channel=server.accept();?//獲得客戶端的鏈接 ????????channel.configureBlocking(false); ????????//把當前客戶端連接注冊到selector上,注冊事件為READ, ????????//?也就是當前channel可讀時,就會觸發事件,然后讀取客戶端的數據 ????????channel.register(this.selector,SelectionKey.OP_READ); ????} ????private?void?read(SelectionKey?key)?throws?IOException?{ ????????SocketChannel?channel=(SocketChannel)key.channel(); ????????ByteBuffer?byteBuffer=?ByteBuffer.allocate(1024); ????????channel.read(byteBuffer);?//把數據從channel讀取到緩沖區 ????????System.out.println("server?receive?msg:"+new?String(byteBuffer.array())); ????} ????private?void?write(SelectionKey?key)?throws?IOException?{ ????????SocketChannel?channel=(SocketChannel)key.channel(); ????????//寫一個信息給到客戶端 ????????channel.write(ByteBuffer.wrap("hello?Client,I'm?NIO?Server\r\n".getBytes())); ????} ????public?static?void?main(String[]?args)?throws?IOException?{ ????????NIOServer?server=new?NIOServer(8888); ????????new?Thread(server).start(); ????} }
事實上NIO已經解決了上述BIO暴露的下面兩個問題:
到這里為止,通過NIO的多路復用機制,解決了IO阻塞導致客戶端連接處理受限的問題,服務端只需要一個線程就可以維護多個客戶端,并且客戶端的某個連接如果準備就緒時,會通過事件機制告訴應用程序某個channel可用,應用程序通過select方法選出就緒的channel進行處理。
了解了NIO多路復用后,就有必要再和大家說一下Reactor多路復用高性能I/O設計模式,Reactor本質上就是基于NIO多路復用機制提出的一個高性能IO設計模式,它的核心思想是把響應IO事件和業務處理進行分離,通過一個或者多個線程來處理IO事件,然后將就緒得到事件分發到業務處理handlers線程去異步非阻塞處理,如圖4-6所示。
Reactor模型有三個重要的組件:
<center>圖4-6</center>
下面演示一個單線程的Reactor模型。
Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理。
public?class?Reactor?implements?Runnable{ ????private?final?Selector?selector; ????private?final?ServerSocketChannel?serverSocketChannel; ????public?Reactor(int?port)?throws?IOException?{ ????????//創建選擇器 ????????selector=?Selector.open(); ????????//創建NIO-Server ????????serverSocketChannel=ServerSocketChannel.open(); ????????serverSocketChannel.bind(new?InetSocketAddress(port)); ????????serverSocketChannel.configureBlocking(false); ????????SelectionKey?key=serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT); ????????//?綁定一個附加對象 ????????key.attach(new?Acceptor(selector,serverSocketChannel)); ????} ????@Override ????public?void?run()?{ ????????while(!Thread.interrupted()){ ????????????try?{ ????????????????selector.select();?//阻塞等待就緒事件 ????????????????Set?selectionKeys=selector.selectedKeys(); ????????????????Iterator?it=selectionKeys.iterator(); ????????????????while(it.hasNext()){ ????????????????????dispatch((SelectionKey)?it.next()); ????????????????????it.remove(); ????????????????} ????????????}?catch?(IOException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????} ????} ????public?void?dispatch(SelectionKey?key){ ????????//調用之前注冊時附加的對象,也就是attach附加的acceptor ????????Runnable?r=(Runnable)key.attachment(); ????????if(r!=null){ ????????????r.run(); ????????} ????} ????public?static?void?main(String[]?args)?throws?IOException?{ ????????new?Thread(new?Reactor(8888)).start(); ????} }
public?class?Acceptor?implements?Runnable{ ????private?Selector?selector; ????private?ServerSocketChannel?serverSocketChannel; ????public?Acceptor(Selector?selector,?ServerSocketChannel?serverSocketChannel)?{ ????????this.selector?=?selector; ????????this.serverSocketChannel?=?serverSocketChannel; ????} ????@Override ????public?void?run()?{ ????????SocketChannel?channel; ????????try?{ ????????????channel=serverSocketChannel.accept(); ????????????System.out.println(channel.getRemoteAddress()+":?收到一個客戶端連接"); ????????????channel.configureBlocking(false); ????????????//當channel連接中數據就緒時,調用DispatchHandler來處理channel ????????????//巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler。 ????????????channel.register(selector,?SelectionKey.OP_READ,new?DispatchHandler(channel)); ????????}?catch?(IOException?e)?{ ????????????e.printStackTrace(); ????????} ????} }
public?class?DispatchHandler?implements?Runnable{ ????private?SocketChannel?channel; ????public?DispatchHandler(SocketChannel?channel)?{ ????????this.channel?=?channel; ????} ????@Override ????public?void?run()?{ ????????System.out.println(Thread.currentThread().getName()+"---handler");?//case:?打印當前線程名稱,證明I/O是同一個線程來處理。 ????????ByteBuffer?buffer=ByteBuffer.allocate(1024); ????????int?len=0,total=0; ????????String?msg=""; ????????try?{ ????????????do?{ ????????????????len?=?channel.read(buffer); ????????????????if?(len?>?0)?{ ????????????????????total?+=?len; ????????????????????msg?+=?new?String(buffer.array()); ????????????????} ????????????????buffer.clear(); ????????????}?while?(len?>?buffer.capacity()); ????????????System.out.println(channel.getRemoteAddress()+":Server?Receive?msg:"+msg); ????????}catch?(Exception?e){ ????????????e.printStackTrace(); ????????????if(channel!=null){ ????????????????try?{ ????????????????????channel.close(); ????????????????}?catch?(IOException?ioException)?{ ????????????????????ioException.printStackTrace(); ????????????????} ????????????} ????????} ????} }
演示方式,通過window的cmd窗口,使用telnet 192.168.1.102 8888 連接到Server端進行數據通信;也可以通過下面這樣一個客戶端程序來訪問。
public?class?ReactorClient?{ ????private?static?Selector?selector; ????public?static?void?main(String[]?args)?throws?IOException?{ ????????selector=Selector.open(); ????????//創建一個連接通道連接指定的server ????????SocketChannel?socketChannel=?SocketChannel.open(); ????????socketChannel.configureBlocking(false); ????????socketChannel.connect(new?InetSocketAddress("192.168.1.102",8888)); ????????socketChannel.register(selector,?SelectionKey.OP_CONNECT); ????????while(true){ ????????????selector.select(); ????????????Set<SelectionKey>?selectionKeys=selector.selectedKeys(); ????????????Iterator<SelectionKey>?iterator=selectionKeys.iterator(); ????????????while(iterator.hasNext()){ ????????????????SelectionKey?key=iterator.next(); ????????????????iterator.remove(); ????????????????if(key.isConnectable()){ ????????????????????handleConnection(key); ????????????????}else?if(key.isReadable()){ ????????????????????handleRead(key); ????????????????} ????????????} ????????} ????} ????private?static?void?handleConnection(SelectionKey?key)?throws?IOException?{ ????????SocketChannel?socketChannel=(SocketChannel)key.channel(); ????????if(socketChannel.isConnectionPending()){ ????????????socketChannel.finishConnect(); ????????} ????????socketChannel.configureBlocking(false); ????????while(true)?{ ????????????Scanner?in?=?new?Scanner(System.in); ????????????String?msg?=?in.nextLine(); ????????????socketChannel.write(ByteBuffer.wrap(msg.getBytes())); ????????????socketChannel.register(selector,SelectionKey.OP_READ); ????????} ????} ????private?static?void?handleRead(SelectionKey?key)?throws?IOException?{ ????????SocketChannel?channel=(SocketChannel)key.channel(); ????????ByteBuffer?byteBuffer=ByteBuffer.allocate(1024); ????????channel.read(byteBuffer); ????????System.out.println("client?receive?msg:"+new?String(byteBuffer.array())); ????} }
這是最基本的單Reactor單線程模型**(整體的I/O操作是由同一個線程完成的)**。
其中Reactor線程,負責多路分離套接字,有新連接到來觸發connect 事件之后,交由Acceptor進行處理,有IO讀寫事件之后交給hanlder 處理。
Acceptor主要任務就是構建handler ,在獲取到和client相關的SocketChannel之后 ,綁定到相應的hanlder上,對應的SocketChannel有讀寫事件之后,基于racotor 分發,hanlder就可以處理了(所有的IO事件都綁定到selector上,有Reactor分發)
Reactor 模式本質上指的是使用 I/O 多路復用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)的模式。
單線程Reactor這種實現方式有存在著缺點,從實例代碼中可以看出,handler的執行是串行的,如果其中一個handler處理線程阻塞將導致其他的業務處理阻塞。由于handler和reactor在同一個線程中的執行,這也將導致新的無法接收新的請求,我們做一個小實驗:
為了解決這種問題,有人提出使用多線程的方式來處理業務,也就是在業務處理的地方加入線程池異步處理,將reactor和handler在不同的線程來執行,如圖4-7所示。
<center>圖4-7</center>
我們直接將4.2.5小節中的Reactor單線程模型改成多線程,其實我們就是把IO阻塞的問題通過異步的方式做了優化,代碼如下,
public?class?MultiDispatchHandler?implements?Runnable{ ????private?SocketChannel?channel; ????public?MultiDispatchHandler(SocketChannel?channel)?{ ????????this.channel?=?channel; ????} ????private?static?Executor?executor?=?Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()?<<?1); ????@Override ????public?void?run()?{ ????????processor(); ????} ????private?void?processor(){ ????????executor.execute(new?ReaderHandler(channel)); ????} ????public?static?class?ReaderHandler?implements?Runnable{ ????????private?SocketChannel?channel; ????????public?ReaderHandler(SocketChannel?socketChannel)?{ ????????????this.channel?=?socketChannel; ????????} ????????@Override ????????public?void?run()?{ ????????????System.out.println(Thread.currentThread().getName()+"---handler");?//case:?打印當前線程名稱,證明I/O是同一個線程來處理。 ????????????ByteBuffer?buffer=?ByteBuffer.allocate(1024); ????????????int?len=0; ????????????String?msg=""; ????????????try?{ ????????????????do?{ ????????????????????len?=?channel.read(buffer); ????????????????????if?(len?>?0)?{ ????????????????????????msg?+=?new?String(buffer.array()); ????????????????????} ????????????????????buffer.clear(); ????????????????}?while?(len?>?buffer.capacity()); ????????????????if(len>0)?{ ????????????????????System.out.println(channel.getRemoteAddress()?+?":Server?Receive?msg:"?+?msg); ????????????????} ????????????}catch?(Exception?e){ ????????????????e.printStackTrace(); ????????????????if(channel!=null){ ????????????????????try?{ ????????????????????????channel.close(); ????????????????????}?catch?(IOException?ioException)?{ ????????????????????????ioException.printStackTrace(); ????????????????????} ????????????????} ????????????} ????????} ????} }
public?class?Acceptor?implements?Runnable{ ????private?Selector?selector; ????private?ServerSocketChannel?serverSocketChannel; ????public?Acceptor(Selector?selector,?ServerSocketChannel?serverSocketChannel)?{ ????????this.selector?=?selector; ????????this.serverSocketChannel?=?serverSocketChannel; ????} ????@Override ????public?void?run()?{ ????????SocketChannel?channel; ????????try?{ ????????????channel=serverSocketChannel.accept(); ????????????System.out.println(channel.getRemoteAddress()+":?收到一個客戶端連接"); ????????????channel.configureBlocking(false); ????????????//當channel連接中數據就緒時,調用DispatchHandler來處理channel ????????????//巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler。 ????????????channel.register(selector,?SelectionKey.OP_READ,new?MultiDispatchHandler(channel)); ????????}?catch?(IOException?e)?{ ????????????e.printStackTrace(); ????????} ????} }
在多線程Reactor模型中,添加了一個工作者線程池,并將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至于因為一些耗時的業務邏輯而延遲對后面I/O請求的處理。
在多線程單Reactor模型中,我們發現所有的I/O操作是由一個Reactor來完成,而Reactor運行在單個線程中,它需要處理包括Accept()
/read()
/write
/connect
操作,對于小容量的場景,影響不大。但是對于高負載、大并發或大數據量的應用場景時,容易成為瓶頸,主要原因如下:
所以,我們還可以更進一步優化,引入多Reactor多線程模式,如圖4-8所示,Main Reactor負責接收客戶端的連接請求,然后把接收到的請求傳遞給SubReactor(其中subReactor可以有多個),具體的業務IO處理由SubReactor完成。
Multiple Reactors 模式通常也可以等同于 Master-Workers 模式,比如 Nginx 和 Memcached 等就是采用這種多線程模型,雖然不同的項目實現細節略有區別,但總體來說模式是一致的。
<center>圖4-8</center>
Acceptor,請求接收者,在實踐時其職責類似服務器,并不真正負責連接請求的建立,而只將其請求委托 Main Reactor 線程池來實現,起到一個轉發的作用。
Main Reactor,主 Reactor 線程組,主要負責連接事件,并將IO讀寫請求轉發到 SubReactor 線程池。
Sub Reactor,Main Reactor 通常監聽客戶端連接后會將通道的讀寫轉發到 Sub Reactor 線程池中一個線程(負載均衡),負責數據的讀寫。在 NIO 中 通常注冊通道的讀(OP_READ)、寫事件(OP_WRITE)。
public?class?MultiplyReactor?{ ????public?static?void?main(String[]?args)?throws?IOException?{ ????????MultiplyReactor?mr?=?new?MultiplyReactor(8888); ????????mr.start(); ????} ????private?static?final?int?POOL_SIZE?=?Runtime.getRuntime().availableProcessors(); ????//?Reactor(Selector)?線程池,其中一個線程被?mainReactor?使用,剩余線程都被?subReactor?使用 ????static?Executor?mainReactorExecutor?=?Executors.newFixedThreadPool(POOL_SIZE); ????//?主?Reactor,接收連接,把?SocketChannel?注冊到從?Reactor?上 ????private?Reactor?mainReactor; ????private?int?port; ????public?MultiplyReactor(int?port)?{ ????????try?{ ????????????this.port?=?port; ????????????mainReactor?=?new?Reactor(); ????????}?catch?(IOException?e)?{ ????????????e.printStackTrace(); ????????} ????} ????/** ?????*?啟動主從?Reactor,初始化并注冊?Acceptor?到主?Reactor ?????*/ ????public?void?start()?throws?IOException?{ ????????new?Acceptor(mainReactor.getSelector(),?port);?//?將?ServerSocketChannel?注冊到?mainReactor ????????mainReactorExecutor.execute(mainReactor);?//使用線程池來處理main?Reactor的連接請求 ????} }
public?class?Reactor?implements?Runnable{ ????private?ConcurrentLinkedQueue<AsyncHandler>?events=new?ConcurrentLinkedQueue<>(); ????private?final?Selector?selector; ????public?Reactor()?throws?IOException?{ ????????this.selector?=?Selector.open(); ????} ????public?Selector?getSelector(){ ????????return?selector; ????} ????@Override ????public?void?run()?{ ????????try?{ ????????????while?(!Thread.interrupted())?{ ????????????????AsyncHandler?handler; ????????????????while?((handler?=?events.poll())?!=?null)?{ ????????????????????handler.getChannel().configureBlocking(false); ????????????????????SelectionKey?sk=handler.getChannel().register(selector,?SelectionKey.OP_READ); ????????????????????sk.attach(handler); ????????????????????handler.setSk(sk); ????????????????} ????????????????selector.select();?//阻塞 ????????????????Set<SelectionKey>?selectionKeys=selector.selectedKeys(); ????????????????Iterator<SelectionKey>?it=selectionKeys.iterator(); ????????????????while(it.hasNext()){ ????????????????????SelectionKey?key=it.next(); ????????????????????//獲取attach方法傳入的附加對象 ????????????????????Runnable?runnable=(Runnable)key.attachment(); ????????????????????if(runnable!=null){ ????????????????????????runnable.run(); ????????????????????} ????????????????????it.remove(); ????????????????} ????????????} ????????}catch?(Exception?e){ ????????????e.printStackTrace(); ????????} ????} ????public?void?register(AsyncHandler?asyncHandler){ ????????events.offer(asyncHandler); ????????selector.wakeup(); ????} }
public?class?Acceptor?implements?Runnable{ ????final?Selector?sel; ????final?ServerSocketChannel?serverSocket; ????int?handleNext?=?0; ????private?final?int?POOL_SIZE=Runtime.getRuntime().availableProcessors(); ????private?Executor?subReactorExecutor=?Executors.newFixedThreadPool(POOL_SIZE); ????private?Reactor[]?subReactors=new?Reactor[POOL_SIZE-1]; ????public?Acceptor(Selector?sel,?int?port)?throws?IOException?{ ????????this.sel?=?sel; ????????serverSocket?=?ServerSocketChannel.open(); ????????serverSocket.socket().bind(new?InetSocketAddress(port));?//?綁定端口 ????????//?設置成非阻塞模式 ????????serverSocket.configureBlocking(false); ????????//?注冊到?選擇器?并設置處理?socket?連接事件 ????????serverSocket.register(sel,?SelectionKey.OP_ACCEPT,this); ????????init(); ????????System.out.println("mainReactor-"?+?"Acceptor:?Listening?on?port:?"?+?port); ????} ????public?void?init()?throws?IOException?{ ????????for?(int?i?=?0;?i?<?subReactors.length;?i++)?{ ????????????subReactors[i]=new?Reactor(); ????????????subReactorExecutor.execute(subReactors[i]); ????????} ????} ????@Override ????public?synchronized?void?run()?{ ????????try?{ ????????????//?接收連接,非阻塞模式下,沒有連接直接返回?null ????????????SocketChannel?sc?=?serverSocket.accept(); ????????????if?(sc?!=?null)?{ ????????????????//?把提示發到界面 ????????????????sc.write(ByteBuffer.wrap("Multiply?Reactor?Pattern?Example\r\nreactor>?".getBytes())); ????????????????System.out.println(Thread.currentThread().getName()+":Main-Reactor-Acceptor:?"?+?sc.socket().getLocalSocketAddress()?+"?注冊到?subReactor-"?+?handleNext); ????????????????//?如何解決呢,直接調用?wakeup,有可能還沒有注冊成功又阻塞了。這是一個多線程同步的問題,可以借助隊列進行處理 ????????????????Reactor?subReactor?=?subReactors[handleNext]; ????????????????subReactor.register(new?AsyncHandler(sc)); ????????????????if(++handleNext?==?subReactors.length)?{ ????????????????????handleNext?=?0; ????????????????} ????????????} ????????}?catch?(Exception?ex)?{ ????????????ex.printStackTrace(); ????????} ????} }
public?class?AsyncHandler?implements?Runnable{ ????private?SocketChannel?channel; ????private?SelectionKey?sk; ????ByteBuffer?inputBuffer=ByteBuffer.allocate(1024); ????ByteBuffer?outputBuffer=ByteBuffer.allocate(1024); ????StringBuilder?builder=new?StringBuilder();?//存儲客戶端的完整消息 ????public?AsyncHandler(SocketChannel?channel){ ????????this.channel=channel; ????} ????public?SocketChannel?getChannel()?{ ????????return?channel; ????} ????public?void?setSk(SelectionKey?sk)?{ ????????this.sk?=?sk; ????} ????@Override ????public?void?run()?{ ????????try?{ ????????????if?(sk.isReadable())?{ ????????????????read(); ????????????}?else?if?(sk.isWritable())?{ ????????????????write(); ????????????} ????????}catch?(Exception?e){ ????????????try?{ ????????????????this.sk.channel().close(); ????????????}?catch?(IOException?ioException)?{ ????????????????ioException.printStackTrace(); ????????????} ????????} ????} ????protected?void?read()?throws?IOException?{ ????????inputBuffer.clear(); ????????int?n=channel.read(inputBuffer); ????????if(inputBufferComplete(n)){ ????????????System.out.println(Thread.currentThread().getName()+":Server端收到客戶端的請求消息:"+builder.toString()); ????????????outputBuffer.put(builder.toString().getBytes(StandardCharsets.UTF_8)); ????????????this.sk.interestOps(SelectionKey.OP_WRITE);?//更改服務的邏輯狀態以及處理的事件類型 ????????} ????} ????private?boolean?inputBufferComplete(int?bytes)?throws?EOFException?{ ????????if(bytes>0){ ????????????inputBuffer.flip();?//轉化成讀取模式 ????????????while(inputBuffer.hasRemaining()){?//判斷緩沖區中是否還有元素 ????????????????byte?ch=inputBuffer.get();?//得到輸入的字符 ????????????????if(ch==3){?//表示Ctrl+c?關閉連接 ????????????????????throw?new?EOFException(); ????????????????}else?if(ch=='\r'||ch=='\n'){?//表示換行符 ????????????????????return?true; ????????????????}else{ ????????????????????builder.append((char)ch);?//拼接讀取到的數據 ????????????????} ????????????} ????????}else?if(bytes==-1){ ????????????throw?new?EOFException();?//客戶端關閉了連接 ????????} ????????return?false; ????} ????private?void?write()?throws?IOException?{ ????????int?written=-1; ????????outputBuffer.flip();?//轉化為讀模式,判斷是否有數據需要發送 ????????if(outputBuffer.hasRemaining()){ ????????????written=channel.write(outputBuffer);?//把數據寫回客戶端 ????????} ????????outputBuffer.clear(); ????????builder.delete(0,builder.length()); ????????if(written<=0){?//表示客戶端沒有輸信息 ????????????this.sk.channel().close(); ????????}else{ ????????????channel.write(ByteBuffer.wrap("\r\nreactor>".getBytes())); ????????????this.sk.interestOps(SelectionKey.OP_READ); ????????} ????} }
關注[跟著Mic學架構]公眾號,獲取更多精品原創
|