TcpConnection中封裝了InputBuffer和OutputBuffer,用來表示應用層的緩沖區。在發送數據時,如果不能一次將Buffer中的數據發送完畢,它還會繼續關注Channel中的可寫事件,當sockfd可寫時,會再次發送。
前面提到TcpConnection的生存期模糊,主要是因為我們不能在TcpServer中直接erase掉TcpConnection對象,因為此時有可能Channel中的handleEvent還在執行,如果析構TcpConnection對象,那麼他的成員channel_也會被析構,會導致core dump。也就是說我們需要TcpConnection 對象生存期要長於handleEvent() 函數,直到執行完connectDestroyed() 後才會析構。
斷開連接:
TcpConnection的斷開是采用被動方式,即對方先關閉連接,本地read(2)返回0後,調用順序如下:
handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。
具體我們查看下面的連接關閉時序圖:
當連接到來,創建一個TcpConnection對象,立刻用shared_ptr來管理,引用計數為1,在Channel中維護一個weak_ptr(tie_),將這個shared_ptr對象賦值給_tie,引用計數仍然為1。當連接關閉時,在handleEvent中,將tie_提升,得到一個shard_ptr對象,引用計數就變成了2。當shared_ptr的計數不為0時,TcpConnection不會被銷毀。
TcpConnection.h
class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this{ public: /// Constructs a TcpConnection with a connected sockfd /// /// User should not create this object. TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection(); EventLoop* getLoop() const { return loop_; } const string& name() const { return name_; } const InetAddress& localAddress() { return localAddr_; } const InetAddress& peerAddress() { return peerAddr_; } bool connected() const { return state_ == kConnected; } void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } /// Internal use only. void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } // called when TcpServer accepts a new connection void connectEstablished(); // should be called only once // called when TcpServer has removed me from its map void connectDestroyed(); // should be called only once private: enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; void handleRead(Timestamp receiveTime); void handleClose(); void handleError(); void setState(StateE s) { state_ = s; } EventLoop* loop_; // 所屬EventLoop string name_; // 連接名 StateE state_; // FIXME: use atomic variable // we don't expose those classes to client. boost::scoped_ptr socket_; boost::scoped_ptr channel_; InetAddress localAddr_; InetAddress peerAddr_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; CloseCallback closeCallback_; }; typedef boost::shared_ptr TcpConnectionPtr; }
TcpConnection.cc
TcpConnection::TcpConnection(EventLoop* loop, const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr)/*, highWaterMark_(64*1024*1024)*/ { // 通道可讀事件到來的時候,回調TcpConnection::handleRead,_1是事件發生時間 channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); // 連接關閉,回調TcpConnection::handleClose channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); // 發生錯誤,回調TcpConnection::handleError channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; socket_->setKeepAlive(true); } TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd(); } void TcpConnection::connectEstablished() { loop_->assertInLoopThread(); assert(state_ == kConnecting); setState(kConnected); LOG_TRACE << "[3] usecount=" << shared_from_this().use_count(); channel_->tie(shared_from_this()); channel_->enableReading(); // TcpConnection所對應的通道加入到Poller關注 connectionCallback_(shared_from_this()); LOG_TRACE << "[4] usecount=" << shared_from_this().use_count(); } void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); } channel_->remove(); } void TcpConnection::handleRead(Timestamp receiveTime) { /* loop_->assertInLoopThread(); int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } */ loop_->assertInLoopThread(); int savedErrno = 0; char buf[65536]; ssize_t n = ::read(channel_->fd(), buf, sizeof buf); if (n > 0) { messageCallback_(shared_from_this(), buf, n); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); }
Channel中對tie_的處理:
void Channel::handleEvent(Timestamp receiveTime) { boost::shared_ptrguard; if (tied_) { guard = tie_.lock(); if (guard) { LOG_TRACE << "[6] usecount=" << guard.use_count(); handleEventWithGuard(receiveTime); LOG_TRACE << "[12] usecount=" << guard.use_count(); } } else { handleEventWithGuard(receiveTime); } }