歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux基礎 >> 關於Linux

Muduo網絡庫源碼分析(六)TcpConnection 的生存期管理

TcpConnection是使用shared_ptr來管理的類,因為它的生命周期模糊。TcpConnection表示已經建立或正在建立的連接,建立連接後,用戶只需要在上層類如TcpServer中設置連接到來和消息到來的處理函數,繼而回調TcpConnection中的 setConnectionCallback和setMessageCallback函數,實現對事件的處理。用戶需要關心的事件是有限的,其他都由網絡庫負責。

TcpConnection中封裝了InputBuffer和OutputBuffer,用來表示應用層的緩沖區。在發送數據時,如果不能一次將Buffer中的數據發送完畢,它還會繼續關注Channel中的可寫事件,當sockfd可寫時,會再次發送。

前面提到TcpConnection的生存期模糊,主要是因為我們不能在TcpServer中直接erase掉TcpConnection對象,因為此時有可能Channel中的handleEvent還在執行,如果析構TcpConnection對象,那麼他的成員channel_也會被析構,會導致core dump。也就是說我們需要TcpConnection 對象生存期要長於handleEvent() 函數,直到執行完connectDestroyed() 後才會析構。

斷開連接:
TcpConnection的斷開是采用被動方式,即對方先關閉連接,本地read(2)返回0後,調用順序如下:
handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。

具體我們查看下面的連接關閉時序圖:

\

當連接到來,創建一個TcpConnection對象,立刻用shared_ptr來管理,引用計數為1,在Channel中維護一個weak_ptr(tie_),將這個shared_ptr對象賦值給_tie,引用計數仍然為1。當連接關閉時,在handleEvent中,將tie_提升,得到一個shard_ptr對象,引用計數就變成了2。當shared_ptr的計數不為0時,TcpConnection不會被銷毀。

TcpConnection.h

 

class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() { return localAddr_; }
  const InetAddress& peerAddress() { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }

  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);
  void handleClose();
  void handleError();
  void setState(StateE s) { state_ = s; }

  EventLoop* loop_;			// 所屬EventLoop
  string name_;				// 連接名
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr socket_;
  boost::scoped_ptr channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  CloseCallback closeCallback_;
};

typedef boost::shared_ptr TcpConnectionPtr;

}

 

TcpConnection.cc

 

TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr)/*,
    highWaterMark_(64*1024*1024)*/
{
  // 通道可讀事件到來的時候,回調TcpConnection::handleRead,_1是事件發生時間
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  // 連接關閉,回調TcpConnection::handleClose
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  // 發生錯誤,回調TcpConnection::handleError
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}

TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd();
}

void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
  channel_->tie(shared_from_this());
  channel_->enableReading();	// TcpConnection所對應的通道加入到Poller關注

  connectionCallback_(shared_from_this());
  LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}

void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}

void TcpConnection::handleRead(Timestamp receiveTime)
{
  /*
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  loop_->assertInLoopThread();
  int savedErrno = 0;
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), buf, n);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  

 

Channel中對tie_的處理:

 

void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      LOG_TRACE << "[6] usecount=" << guard.use_count();
      handleEventWithGuard(receiveTime);
	  LOG_TRACE << "[12] usecount=" << guard.use_count();
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}
Copyright © Linux教程網 All Rights Reserved