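Starting with this post, we begin dissecting the source code of the Muduo network library, drawing mainly on 《Linux多線程服務端編程》 (Linux Multithreaded Server-Side Programming) and various learning materials found online.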
(I) The essence of TCP network programming: three and a half events
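1. Connection establishment: the server accepting (accept) a new connection, or the client successfully initiating (connect) one. Once a TCP connection is established, client and server are peers, and each side can send and receive data independently.
2. Connection teardown: either an active close (close or shutdown) or a passive close (read(2) returns 0).
3. Message arrival, i.e. the file descriptor becomes readable. This is the most important event; how it is handled determines the style of the network program (blocking or non-blocking, how to split and reassemble packets, how to design the application-level buffer, and so on).
3.5 Message sent, which counts as half an event. Low-traffic services can ignore it. Note also that "sent" here means the data has been written into the operating system's buffer; from then on the TCP stack is responsible for transmitting and retransmitting it, so it does not mean the peer has received the data.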
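Several of these events can be observed with plain POSIX calls. The following is a minimal sketch, assuming a socketpair(2) as a stand-in for an established TCP connection; the names EventTrace and traceEvents are hypothetical. It shows that a successful write() only means the bytes reached the kernel buffer, and that a passive close appears to the other side as read() returning 0.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <sys/socket.h>
#include <unistd.h>

// Result of one walk through the events on a local socket pair
// (a stand-in for an already-established TCP connection).
struct EventTrace {
  ssize_t written = 0;          // event 3.5: bytes accepted by the kernel send buffer
  std::string received;         // event 3: message arrival (fd readable)
  ssize_t readAfterClose = -1;  // event 2: passive close shows up as read() == 0
};

EventTrace traceEvents() {
  EventTrace t;
  int fds[2];
  // Event 1 (connection established) is simulated by socketpair().
  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return t;

  const char msg[] = "hello";
  // A successful write only means the data is now in the kernel buffer,
  // not that the peer has read it.
  t.written = ::write(fds[0], msg, std::strlen(msg));

  char buf[64];
  ssize_t n = ::read(fds[1], buf, sizeof buf);
  if (n > 0) t.received.assign(buf, static_cast<size_t>(n));

  // Active close on one side...
  ::close(fds[0]);
  // ...is seen by the other side as read() returning 0.
  t.readAfterClose = ::read(fds[1], buf, sizeof buf);
  ::close(fds[1]);
  return t;
}
```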
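Of these, the most important is the third: message arrival, i.e. the file descriptor becomes readable. Let's analyze it in detail (together with the message-sent event):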
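(1) Message arrives, file descriptor readable:
Kernel receives data --> the network library's readable event fires --> the data is moved from the kernel buffer into the application buffer (the OnMessage callback then checks, according to the protocol, whether a complete packet is present, and returns immediately if not) --> if a complete packet is present, it is taken out and processed: read, decode, compute, encode, write.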
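The "is this a complete packet?" check inside OnMessage depends entirely on the protocol. A minimal sketch, assuming a hypothetical framing of a 4-byte big-endian length header followed by the payload, with std::string standing in for muduo's Buffer; LengthCodec and its methods are hypothetical names. Incomplete data stays in the input buffer until the next readable event appends more.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical framing: each message = 4-byte big-endian length + payload.
class LengthCodec {
 public:
  // Called after newly read bytes have been appended to the application
  // input buffer. Returns every complete message; an incomplete trailing
  // message stays buffered until more data arrives.
  std::vector<std::string> onMessage(const std::string& newData) {
    inputBuffer_ += newData;
    std::vector<std::string> messages;
    while (inputBuffer_.size() >= kHeaderLen) {
      uint32_t len = 0;
      for (size_t i = 0; i < kHeaderLen; ++i) {
        len = (len << 8) | static_cast<unsigned char>(inputBuffer_[i]);
      }
      if (inputBuffer_.size() < kHeaderLen + len) {
        break;  // not a complete packet yet: return and wait for more data
      }
      messages.push_back(inputBuffer_.substr(kHeaderLen, len));  // decode
      inputBuffer_.erase(0, kHeaderLen + len);  // consume the packet
    }
    return messages;
  }

  size_t buffered() const { return inputBuffer_.size(); }

  // Encode side, used before writing a reply.
  static std::string encode(const std::string& payload) {
    uint32_t len = static_cast<uint32_t>(payload.size());
    std::string out;
    for (int shift = 24; shift >= 0; shift -= 8) {
      out.push_back(static_cast<char>((len >> shift) & 0xff));
    }
    return out + payload;
  }

 private:
  static constexpr size_t kHeaderLen = 4;
  std::string inputBuffer_;
};
```

For example, feeding the first 3 bytes of an encoded "ping" yields no message; feeding the rest together with an encoded "pong" yields both messages at once, which is exactly the half-packet and sticky-packet situation OnMessage has to handle.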
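(2) Message sent:
Application buffer --> kernel buffer (if the kernel buffer can take all of it) --> the write-complete event fires and the OnWrite callback is invoked. If the kernel buffer cannot hold all the data (a high-traffic service), the remainder is appended to the application-level output buffer. After the kernel has sent some of its data, the socket becomes writable and the library moves more data from the application buffer into the kernel; once everything has been handed over to the kernel, OnWrite is invoked again (the application may keep writing).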
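The send path described above can be sketched as follows. This is a minimal illustration, not muduo's TcpConnection: the class name Sender, its methods, and the use of std::string as the output buffer are assumptions. It assumes a non-blocking fd, writes directly when nothing is queued, buffers the remainder when the kernel buffer is full, and invokes the write-complete callback once the application buffer has been fully handed to the kernel.

```cpp
#include <cassert>
#include <cerrno>
#include <functional>
#include <string>
#include <utility>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical sender illustrating the "message sent" half event.
class Sender {
 public:
  Sender(int fd, std::function<void()> onWriteComplete)
      : fd_(fd), onWriteComplete_(std::move(onWriteComplete)) {}

  void send(const std::string& data) {
    size_t written = 0;
    // Write directly only if nothing is queued, to preserve byte order.
    if (!watchingWritable_ && outputBuffer_.empty()) {
      ssize_t n = ::write(fd_, data.data(), data.size());
      if (n >= 0) {
        written = static_cast<size_t>(n);
        if (written == data.size()) { onWriteComplete_(); return; }
      } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
        return;  // real code would report the error
      }
    }
    // Kernel buffer full: keep the rest in the application output buffer
    // and start watching the fd for writability.
    outputBuffer_.append(data, written, std::string::npos);
    watchingWritable_ = true;
  }

  // Called when the poller reports the fd as writable (POLLOUT).
  void handleWrite() {
    if (!watchingWritable_) return;
    ssize_t n = ::write(fd_, outputBuffer_.data(), outputBuffer_.size());
    if (n > 0) {
      outputBuffer_.erase(0, static_cast<size_t>(n));
      if (outputBuffer_.empty()) {
        watchingWritable_ = false;  // stop watching POLLOUT
        onWriteComplete_();         // everything is now in the kernel
      }
    }
  }

  bool watchingWritable() const { return watchingWritable_; }

 private:
  int fd_;
  std::function<void()> onWriteComplete_;
  std::string outputBuffer_;
  bool watchingWritable_ = false;
};
```

The sketch stops watching writability as soon as the buffer drains because a level-triggered poller would otherwise report the fd as writable on every iteration, turning the loop into a busy loop.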
(II) Event loop class diagram
EventLoop class:
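EventLoop wraps the Reactor pattern. Because Muduo's concurrency model is multiple reactors + thread pool (one loop per thread + thread pool), each thread can have at most one EventLoop object. When an EventLoop is constructed, it checks whether the current thread has already created another EventLoop; if so, it terminates the program (LOG_FATAL). The constructor also records the thread that owns the object (threadId_). A thread that creates an EventLoop is called an IO thread, and its job is to run the event loop (EventLoop::loop) and nothing else.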
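The per-thread check can be reproduced with standard C++ alone. A minimal sketch, assuming C++11 thread_local in place of GCC's __thread and std::thread::id in place of CurrentThread::tid(); the class name LoopGuardDemo is hypothetical, and it throws where muduo would abort through LOG_FATAL, so the behaviour can be tested.

```cpp
#include <cassert>
#include <stdexcept>
#include <thread>

// Hypothetical stand-in for EventLoop's "one loop per thread" rule.
class LoopGuardDemo {
 public:
  LoopGuardDemo() : threadId_(std::this_thread::get_id()) {
    // Each thread has its own copy of t_loopInThisThread.
    if (t_loopInThisThread != nullptr) {
      throw std::logic_error("another loop already exists in this thread");
    }
    t_loopInThisThread = this;
  }
  ~LoopGuardDemo() { t_loopInThisThread = nullptr; }
  LoopGuardDemo(const LoopGuardDemo&) = delete;
  LoopGuardDemo& operator=(const LoopGuardDemo&) = delete;

  // True only in the thread that constructed this object.
  bool isInLoopThread() const { return threadId_ == std::this_thread::get_id(); }
  static LoopGuardDemo* current() { return t_loopInThisThread; }

 private:
  static thread_local LoopGuardDemo* t_loopInThisThread;
  const std::thread::id threadId_;
};

thread_local LoopGuardDemo* LoopGuardDemo::t_loopInThisThread = nullptr;
```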
Below is a simplified EventLoop (the internal Poller is not implemented yet; this is only a skeleton):
EventLoop.h
```cpp
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <boost/noncopyable.hpp>

#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>

namespace muduo
{
namespace net
{

///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
 public:
  EventLoop();
  ~EventLoop();  // force out-line dtor, for scoped_ptr members.

  ///
  /// Loops forever.
  ///
  /// Must be called in the same thread as creation of the object.
  ///
  void loop();

  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

  static EventLoop* getEventLoopOfCurrentThread();

 private:
  void abortNotInLoopThread();

  bool looping_; /* atomic */
  const pid_t threadId_;  // ID of the thread this object belongs to
};

}
}

#endif  // MUDUO_NET_EVENTLOOP_H
```
EventLoop.cc
```cpp
#include <muduo/net/EventLoop.h>

#include <muduo/base/Logging.h>

#include <assert.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

namespace
{
// Pointer to the EventLoop object of the current thread.
// Thread-local storage: every thread has its own copy.
__thread EventLoop* t_loopInThisThread = 0;
}

EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
  return t_loopInThisThread;
}

EventLoop::EventLoop()
  : looping_(false),
    threadId_(CurrentThread::tid())
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  // If this thread has already created an EventLoop, terminate (LOG_FATAL).
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
}

EventLoop::~EventLoop()
{
  t_loopInThisThread = NULL;
}

// The event loop. It must not be called from another thread;
// only the thread that created the object may call it.
void EventLoop::loop()
{
  assert(!looping_);
  // Assert that we are in the thread that created this object.
  assertInLoopThread();
  looping_ = true;
  LOG_TRACE << "EventLoop " << this << " start looping";

  ::poll(NULL, 0, 5*1000);  // placeholder: no Poller yet, just wait 5 seconds

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " << CurrentThread::tid();
}
```
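The loop() above only blocks in ::poll(NULL, 0, 5*1000) for five seconds and returns. Once a Poller is plugged in, each iteration is expected to poll, collect the active channels, and let each of them handle its events. A minimal self-contained sketch of one such iteration, assuming poll(2) is called directly instead of through a Poller object; MiniChannel and runOnce are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>
#include <poll.h>
#include <unistd.h>

// Hypothetical, stripped-down channel: an fd, the events of interest,
// and a callback invoked with the events that actually occurred.
struct MiniChannel {
  int fd;
  short events;
  std::function<void(short revents)> handler;
};

// One iteration of an event loop: poll, collect active channels, dispatch.
// Returns the number of channels dispatched.
int runOnce(std::vector<MiniChannel*>& channels, int timeoutMs) {
  std::vector<pollfd> pollfds;
  for (MiniChannel* ch : channels) {
    pollfds.push_back(pollfd{ch->fd, ch->events, 0});
  }
  int numEvents = ::poll(pollfds.data(), pollfds.size(), timeoutMs);
  if (numEvents <= 0) return 0;  // timeout or error

  // Equivalent of activeChannels: filled first, dispatched afterwards.
  std::vector<std::pair<MiniChannel*, short>> active;
  for (size_t i = 0; i < pollfds.size() && numEvents > 0; ++i) {
    if (pollfds[i].revents > 0) {
      --numEvents;
      active.emplace_back(channels[i], pollfds[i].revents);
    }
  }
  for (auto& a : active) a.first->handler(a.second);
  return static_cast<int>(active.size());
}
```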
Sequence diagram:
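Poller is an abstract base class; the concrete implementation is either EPollPoller (the default) or PollPoller, each of which implements its interface. It is the only class in muduo that relies on object-oriented inheritance with virtual functions.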
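PollPoller keeps a map that associates each fd with its Channel, so the Channel for a given fd can be found quickly. Each fd corresponds to one struct pollfd (pollfd.fd) and to one Channel*; the fd may be a socket, an eventfd, a timerfd, or a signalfd. The Poller's job is to maintain the Channels (IO events) registered with the IO multiplexer: adding, updating, and removing Channels. Let's look at the PollPoller implementation: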
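The bookkeeping described above (a pollfd array handed to poll(2) plus a map from fd to Channel*) can be sketched without muduo. This minimal sketch assumes a hypothetical Chan struct standing in for Channel and a hypothetical PollRegistry class standing in for PollPoller; it shows how a new channel gets its index, how an existing one is updated in place, and how a poll result is mapped back to its channel.

```cpp
#include <cassert>
#include <map>
#include <vector>
#include <poll.h>

// Hypothetical minimal channel: only what the poller needs.
struct Chan {
  Chan(int f, short e) : fd(f), events(e) {}
  int fd;
  short events;
  int index = -1;  // position in pollfds_, -1 = not registered yet
};

// Sketch of PollPoller's bookkeeping.
class PollRegistry {
 public:
  void updateChannel(Chan* ch) {
    if (ch->index < 0) {  // new channel: append to the pollfd array
      pollfds_.push_back(pollfd{ch->fd, ch->events, 0});
      ch->index = static_cast<int>(pollfds_.size()) - 1;
      channels_[ch->fd] = ch;
    } else {              // existing channel: update in place
      pollfd& pfd = pollfds_[ch->index];
      pfd.events = ch->events;
      // No interest left: make poll(2) skip the entry but keep it registered.
      pfd.fd = ch->events == 0 ? -ch->fd - 1 : ch->fd;
    }
  }

  // Map a poll(2) result back to its channel, as fillActiveChannels() does.
  Chan* channelFor(const pollfd& pfd) const {
    auto it = channels_.find(pfd.fd);
    return it == channels_.end() ? nullptr : it->second;
  }

  const std::vector<pollfd>& pollfds() const { return pollfds_; }

 private:
  std::vector<pollfd> pollfds_;    // passed to ::poll()
  std::map<int, Chan*> channels_;  // fd -> channel
};
```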
PollPoller.h
```cpp
#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H

#include ...
#include ...
```
PollPoller.cc
Several tricks in the code are pointed out in the comments.

```cpp
#include <muduo/net/poller/PollPoller.h>

#include <muduo/base/Logging.h>
#include <muduo/base/Types.h>
#include <muduo/net/Channel.h>

#include <assert.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

PollPoller::PollPoller(EventLoop* loop)
  : Poller(loop)
{
}

PollPoller::~PollPoller()
{
}

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "PollPoller::poll()";
  }
  return now;
}

void PollPoller::fillActiveChannels(int numEvents,
                                    ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
       pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;  // stop scanning once all ready fds have been found
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
    // index < 0 means this is a new channel: add it to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update an existing channel
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    pfd.fd = channel->fd();  // restore the fd in case the channel was ignored before
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    // Temporarily stop watching this channel without removing it from the Poller
    if (channel->isNoneEvent())
    {
      // Ignore this pollfd: poll(2) skips entries with a negative fd.
      // pfd.fd could simply be set to -1, but -fd-1 stays negative even for
      // fd 0 and lets removeChannel() recover the original fd.
      pfd.fd = -channel->fd()-1;
    }
  }
}

void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());
  int idx = channel->index();
  assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
  size_t n = channels_.erase(channel->fd());
  assert(n == 1); (void)n;
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    pollfds_.pop_back();
  }
  else
  {
    // O(1) removal: swap the entry to delete with the last one, then pop_back
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      channelAtEnd = -channelAtEnd-1;  // decode an ignored fd
    }
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}
```
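The O(1) removal in removeChannel() relies on two tricks: an ignored entry stores -fd-1 (poll(2) skips negative fds, -fd-1 is negative even for fd 0, and the real fd can be decoded back), and the entry to delete is swapped with the last one before pop_back(), after which the moved entry's index has to be fixed. A minimal sketch, assuming a std::map<int, int> from fd to array index in place of Channel::index(); decodeFd and removeAt are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <vector>
#include <poll.h>

// Entries that poll(2) should ignore are stored as -fd-1; decode them back.
inline int decodeFd(int storedFd) { return storedFd < 0 ? -storedFd - 1 : storedFd; }

// Remove fd's entry in O(1). indexOf maps fd -> position in pollfds,
// playing the role of Channel::index().
void removeAt(std::vector<pollfd>& pollfds, std::map<int, int>& indexOf, int fd) {
  int idx = indexOf.at(fd);
  indexOf.erase(fd);
  if (static_cast<size_t>(idx) == pollfds.size() - 1) {
    pollfds.pop_back();  // already last: nothing to move
    return;
  }
  int fdAtEnd = decodeFd(pollfds.back().fd);
  std::iter_swap(pollfds.begin() + idx, pollfds.end() - 1);  // fill the hole
  indexOf[fdAtEnd] = idx;  // the moved entry now lives at idx
  pollfds.pop_back();
}
```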
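Channel class:
Channel is a selectable IO channel, responsible for registering and responding to IO events; it does not own the file descriptor.
Channel is the "event" in the Reactor structure. Throughout its lifetime it belongs to exactly one EventLoop (one EventLoop has many Channels and handles many IO sources), and it is responsible for the IO events of a single file descriptor. It holds the descriptor fd_ but does not own it, so it is not responsible for closing it. Channel stores the types of IO events it is interested in together with the corresponding callbacks; when an IO event occurs, the matching callback in Channel is eventually invoked. Channel is rarely used on its own; it is usually a member of another class (Acceptor, Connector, EventLoop, TimerQueue, TcpConnection). Channel keeps a pointer loop_ to its EventLoop, through which it registers its events with the EventLoop. The events of interest are stored in events_, and different event types map to different callbacks.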
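The dispatch that Channel performs can be reduced to a few lines. This sketch assumes std::function instead of boost::function, drops the Timestamp argument and POLLRDHUP, and uses the hypothetical name SketchChannel; it follows the same order of checks as Channel::handleEventWithGuard().

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <poll.h>

// Hypothetical reduced Channel: interest set (events_), poller result
// (revents_), and one callback per kind of event. It never closes fd_.
class SketchChannel {
 public:
  using Callback = std::function<void()>;
  explicit SketchChannel(int fd) : fd_(fd) {}

  void setReadCallback(Callback cb) { readCb_ = std::move(cb); }
  void setWriteCallback(Callback cb) { writeCb_ = std::move(cb); }
  void setCloseCallback(Callback cb) { closeCb_ = std::move(cb); }
  void setErrorCallback(Callback cb) { errorCb_ = std::move(cb); }

  void enableReading() { events_ |= POLLIN | POLLPRI; }
  void enableWriting() { events_ |= POLLOUT; }
  int events() const { return events_; }
  int fd() const { return fd_; }
  void set_revents(int revt) { revents_ = revt; }  // filled in by the poller

  // Map the returned events to callbacks.
  void handleEvent() {
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { if (closeCb_) closeCb_(); }
    if (revents_ & (POLLERR | POLLNVAL)) { if (errorCb_) errorCb_(); }
    if (revents_ & (POLLIN | POLLPRI)) { if (readCb_) readCb_(); }
    if (revents_ & POLLOUT) { if (writeCb_) writeCb_(); }
  }

 private:
  int fd_;
  int events_ = 0;
  int revents_ = 0;
  Callback readCb_, writeCb_, closeCb_, errorCb_;
};
```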
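Both of the following register their events through a Channel:
Acceptor is the abstraction of passive connection establishment ---> it watches the readable event of the listening socket and calls back handleRead.
Connector is the abstraction of active connection establishment.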
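An Acceptor, as described above, is essentially a listening socket whose channel only watches for readability; when it fires, handleRead() calls accept(2). A minimal sketch, assuming a loopback IPv4 socket on a kernel-chosen port and omitting error handling; SketchAcceptor and its methods are hypothetical and not muduo's Acceptor API.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical Acceptor-like helper (error handling omitted).
class SketchAcceptor {
 public:
  using NewConnectionCallback = std::function<void(int connfd)>;

  explicit SketchAcceptor(NewConnectionCallback cb) : cb_(std::move(cb)) {
    listenFd_ = ::socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the kernel pick a free port
    ::bind(listenFd_, reinterpret_cast<sockaddr*>(&addr), sizeof addr);
    ::listen(listenFd_, SOMAXCONN);
  }
  ~SketchAcceptor() { ::close(listenFd_); }
  SketchAcceptor(const SketchAcceptor&) = delete;
  SketchAcceptor& operator=(const SketchAcceptor&) = delete;

  int fd() const { return listenFd_; }  // the fd its channel would watch
  uint16_t port() const {
    sockaddr_in addr{};
    socklen_t len = sizeof addr;
    ::getsockname(listenFd_, reinterpret_cast<sockaddr*>(&addr), &len);
    return ntohs(addr.sin_port);
  }

  // Invoked when the listening fd is readable, i.e. a connection is pending.
  void handleRead() {
    int connfd = ::accept(listenFd_, nullptr, nullptr);
    if (connfd >= 0) cb_(connfd);  // hand the new connection to the user
  }

 private:
  int listenFd_;
  NewConnectionCallback cb_;
};
```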
Sequence diagram:
Channel.h
```cpp
#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include <muduo/base/Timestamp.h>

namespace muduo
{
namespace net
{

class EventLoop;

///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : boost::noncopyable
{
 public:
  typedef boost::function<void()> EventCallback;
  typedef boost::function<void(Timestamp)> ReadEventCallback;

  Channel(EventLoop* loop, int fd);
  ~Channel();

  void handleEvent(Timestamp receiveTime);
  void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; }
  void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }

  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  void tie(const boost::shared_ptr<void>&);

  int fd() const { return fd_; }
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  void enableReading() { events_ |= kReadEvent; update(); }
  // void disableReading() { events_ &= ~kReadEvent; update(); }
  void enableWriting() { events_ |= kWriteEvent; update(); }
  void disableWriting() { events_ &= ~kWriteEvent; update(); }
  void disableAll() { events_ = kNoneEvent; update(); }
  bool isWriting() const { return events_ & kWriteEvent; }

  // for Poller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  // for debug
  string reventsToString() const;

  void doNotLogHup() { logHup_ = false; }

  EventLoop* ownerLoop() { return loop_; }
  void remove();

 private:
  void update();
  void handleEventWithGuard(Timestamp receiveTime);

  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* loop_;      // the EventLoop this channel belongs to
  const int  fd_;        // file descriptor; the channel does not close it
  int        events_;    // events of interest
  int        revents_;   // events returned by poll/epoll
  int        index_;     // used by Poller: position in the poll event array
  bool       logHup_;    // for POLLHUP

  boost::weak_ptr<void> tie_;
  bool tied_;
  bool eventHandling_;   // whether handleEvent() is in progress
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback closeCallback_;
  EventCallback errorCallback_;
};

}
}
#endif  // MUDUO_NET_CHANNEL_H
```

Channel.cc

```cpp
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>

#include <sstream>

#include <assert.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),
    logHup_(true),
    tied_(false),
    eventHandling_(false)
{
}

Channel::~Channel()
{
  assert(!eventHandling_);
}

void Channel::tie(const boost::shared_ptr<void>& obj)
{
  tie_ = obj;
  tied_ = true;
}

void Channel::update()
{
  loop_->updateChannel(this);
}

// Make sure disableAll() has been called before calling this function.
void Channel::remove()
{
  assert(isNoneEvent());
  loop_->removeChannel(this);
}

void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}

string Channel::reventsToString() const
{
  std::ostringstream oss;
  oss << fd_ << ": ";
  if (revents_ & POLLIN)
    oss << "IN ";
  if (revents_ & POLLPRI)
    oss << "PRI ";
  if (revents_ & POLLOUT)
    oss << "OUT ";
  if (revents_ & POLLHUP)
    oss << "HUP ";
  if (revents_ & POLLRDHUP)
    oss << "RDHUP ";
  if (revents_ & POLLERR)
    oss << "ERR ";
  if (revents_ & POLLNVAL)
    oss << "NVAL ";

  return oss.str().c_str();
}
```

The relationship among these three classes (EventLoop, Poller, and Channel) is not hard to grasp: at heart they are just poll/epoll, split into these classes by a higher level of abstraction. The key is to understand the class diagram at the beginning of this post.
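Channel::tie() and handleEvent() guard against the owner (for example a TcpConnection held by shared_ptr) being destroyed while one of its callbacks runs: the weak_ptr is promoted to a shared_ptr for the duration of the call, and the event is skipped if the owner is already gone. A minimal sketch of that pattern, assuming std::shared_ptr and std::weak_ptr in place of the boost versions; TiedHandler is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical reduction of Channel::tie() / handleEvent().
class TiedHandler {
 public:
  explicit TiedHandler(std::function<void()> cb) : cb_(std::move(cb)) {}

  void tie(const std::shared_ptr<void>& owner) { tie_ = owner; tied_ = true; }

  // Returns true if the callback ran.
  bool handleEvent() {
    if (tied_) {
      // Promote to shared_ptr: keeps the owner alive for the whole callback.
      std::shared_ptr<void> guard = tie_.lock();
      if (!guard) return false;  // owner already destroyed: skip the event
      cb_();
      return true;
    }
    cb_();
    return true;
  }

 private:
  std::function<void()> cb_;
  std::weak_ptr<void> tie_;  // does not extend the owner's lifetime by itself
  bool tied_ = false;
};
```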
References:
《Muduo使用手冊》 (Muduo Manual)
《Linux多線程服務端編程》 (Linux Multithreaded Server-Side Programming)