TcpConnection中封裝了InputBuffer和OutputBuffer,用來表示應用層的緩衝區。在發送數據時,如果不能一次將Buffer中的數據發送完畢,它會繼續關注Channel中的可寫事件,當sockfd可寫時,再次發送剩餘的數據。
前面提到TcpConnection的生存期模糊,主要是因為我們不能在TcpServer中直接erase掉TcpConnection對象,因為此時有可能Channel中的handleEvent還在執行,如果析構TcpConnection對象,那麼他的成員channel_也會被析構,會導致core dump。也就是說我們需要TcpConnection 對象生存期要長於handleEvent() 函數,直到執行完connectDestroyed() 後才會析構。
斷開連接:
TcpConnection的斷開是采用被動方式,即對方先關閉連接,本地read(2)返回0後,調用順序如下:
handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。
具體我們查看下面的連接關閉時序圖:

當連接到來,創建一個TcpConnection對象,立刻用shared_ptr來管理,引用計數為1,在Channel中維護一個weak_ptr(tie_),將這個shared_ptr對象賦值給tie_,由於weak_ptr不增加引用計數,引用計數仍然為1。當連接關閉時,在handleEvent中,將tie_提升,得到一個shared_ptr對象,引用計數就變成了2。當shared_ptr的計數不為0時,TcpConnection不會被銷毀。
TcpConnection.h
/// One TCP connection built on an already-connected socket descriptor.
///
/// Instances hand out shared_ptr handles to themselves through
/// shared_from_this(), which is why the class derives from
/// boost::enable_shared_from_this<TcpConnection>.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  /// Returns the EventLoop passed to the constructor.
  EventLoop* getLoop() const { return loop_; }
  /// Returns the connection name.
  const string& name() const { return name_; }
  /// Returns the local address; const so it is usable through const handles.
  const InetAddress& localAddress() const { return localAddr_; }
  /// Returns the peer address; const so it is usable through const handles.
  const InetAddress& peerAddress() const { return peerAddr_; }
  /// True while the state is kConnected.
  bool connected() const { return state_ == kConnected; }

  /// Stores the callback invoked from connectEstablished()/connectDestroyed().
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Stores the callback invoked when data has been read.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();     // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

  // Event handlers registered on channel_ by the constructor.
  void handleRead(Timestamp receiveTime);
  void handleClose();
  void handleError();
  void setState(StateE s) { state_ = s; }

  EventLoop* loop_;   // owning EventLoop
  string name_;       // connection name
  StateE state_;      // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;    // created from sockfd in the constructor
  boost::scoped_ptr<Channel> channel_;  // created from (loop, sockfd) in the constructor
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  CloseCallback closeCallback_;
};
// Shared-ownership handle used to pass TcpConnection objects around.
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
}
TcpConnection.cc
// Builds the connection around an already-connected descriptor: creates the
// Socket and Channel for sockfd, registers the read/close/error handlers on
// the channel, and turns on keep-alive. The state starts as kConnecting.
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr)
    /* disabled in the original listing:
    , highWaterMark_(64*1024*1024) */
{
  Channel& ch = *channel_;

  // Readable event -> handleRead; _1 carries the time the event occurred.
  ch.setReadCallback(boost::bind(&TcpConnection::handleRead, this, _1));
  // Connection closed -> handleClose.
  ch.setCloseCallback(boost::bind(&TcpConnection::handleClose, this));
  // Error on the descriptor -> handleError.
  ch.setErrorCallback(boost::bind(&TcpConnection::handleError, this));

  LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
            << " fd=" << sockfd;

  socket_->setKeepAlive(true);
}
// The destructor only emits a debug trace identifying the connection and its
// descriptor; the listing shows no explicit cleanup here.
TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
            << " fd=" << channel_->fd();
}
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
channel_->tie(shared_from_this());
channel_->enableReading(); // TcpConnection所對應的通道加入到Poller關注
connectionCallback_(shared_from_this());
LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());
}
channel_->remove();
}
// Readable-event handler: reads once from the connection's descriptor and
// dispatches on the result.
//   n > 0  -> pass the bytes to messageCallback_
//   n == 0 -> handleClose()
//   n < 0  -> log the system error and call handleError()
// receiveTime is not used by this raw-buffer variant.
void TcpConnection::handleRead(Timestamp receiveTime)
{
  /* Buffer-based variant, disabled in this listing:
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  loop_->assertInLoopThread();
  char buf[65536];
  const ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  // Capture errno right after the failing call. The original initialised
  // savedErrno to 0 and never updated it, so "errno = savedErrno" cleared
  // the real error before it was logged.
  const int savedErrno = (n < 0) ? errno : 0;
  if (n > 0)
  {
    messageCallback_(shared_from_this(), buf, n);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}
Channel中對tie_的處理:
// Entry point for dispatching an event on this channel.
//
// When the channel is tied (tied_ is true), tie_ is first promoted with
// lock(); the resulting shared_ptr is held in `guard` for the whole dispatch,
// so the tied object cannot be destroyed while handleEventWithGuard() runs.
// If the promotion fails, the event is dropped. An untied channel dispatches
// directly.
void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      LOG_TRACE << "[6] usecount=" << guard.use_count();
      handleEventWithGuard(receiveTime);
      LOG_TRACE << "[12] usecount=" << guard.use_count();
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}