1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > muduo学习笔记:net部分之实现TCP网络编程库-TcpClient

muduo学习笔记:net部分之实现TCP网络编程库-TcpClient

时间:2023-12-20 22:12:55

相关推荐

muduo学习笔记:net部分之实现TCP网络编程库-TcpClient

有了前文【muduo学习笔记:net部分之实现TCP网络编程库-Connector】的介绍,TcpClient的实现就不难了。muduo用TcpClient发起连接,TcpClient有一个Connector连接器,TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction,连接建立成功后设置相应的回调函数。很显然,TcpClient用来管理客户端连接,真正连接交给Connector。

它的代码与TcpServer甚至有几分相似,只不过TcpClient只管理一个TcpConnection。先谈几个要点:

TcpClient具备TcpConnection断开之后重新连接的功能,加上Connector具备反复尝试连接的功能,因此客户端和服务器的启动顺序无关紧要。可以先启动客户端,一旦服务器启动,半分钟之内即可恢复连接(由Connector::kMaxRetryDelayMs常数控制);再客户端运行期间服务器可以重启,客户端也会自动重连。连接断开后初次重试的延迟时间是随机的,比方说服务器崩溃,它所有的客户端连接同时断开,然后0.5s之后同时再次发起连接,这样既可能造成SYN丢包,也可能给服务器带来短期大负载,影响其服务质量。因此每个TcpClient应该等待一段随机的时间(0.5~2s),再重试,避免拥塞。发起连接的时候如果发生TCP SYN丢包,那么系统默认的重试间隔是3s,这期间不会返回错误码,而且这个间隔似乎不容易修改。如果需要缩短间隔,可以再用一个定时器,在0.5s或1s之后发起另一个链接。如果有需求的话,这个功能可以做到Connector中。

1、TcpClient 定义

TcpClient使用Conneccor发起连接, 连接建立成功后,用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction。

class TcpClient : noncopyable{public:// TcpClient(EventLoop* loop);// TcpClient(EventLoop* loop, const string& host, uint16_t port);TcpClient(EventLoop* loop,const InetAddress& serverAddr,const string& nameArg);~TcpClient(); // force out-line dtor, for std::unique_ptr members.void connect();void disconnect();void stop();TcpConnectionPtr connection() const{MutexLockGuard lock(mutex_);return connection_;}EventLoop* getLoop() const {return loop_; }bool retry() const {return retry_; }void enableRetry() {retry_ = true; }const string& name() const {return name_; }/// Set connection callback./// Not thread safe.void setConnectionCallback(ConnectionCallback cb) {connectionCallback_ = std::move(cb); }/// Set message callback./// Not thread safe.void setMessageCallback(MessageCallback cb) {messageCallback_ = std::move(cb); }/// Set write complete callback./// Not thread safe.void setWriteCompleteCallback(WriteCompleteCallback cb) {writeCompleteCallback_ = std::move(cb); }private:/// Not thread safe, but in loopvoid newConnection(int sockfd);/// Not thread safe, but in loopvoid removeConnection(const TcpConnectionPtr& conn);EventLoop* loop_;// 所属的EvenetLoopConnectorPtr connector_; // 使用Connector智能指针,避免头文件引入const string name_;// 连接的名字ConnectionCallback connectionCallback_;// 建立连接的回调函数MessageCallback messageCallback_;// 消息到来的回调函数WriteCompleteCallback writeCompleteCallback_; // 数据发送完毕回调函数bool retry_; // atomic// 连接断开后是否重连bool connect_; // atomic// always in loop threadint nextConnId_;// name_+nextConnId_ 用于标识一个连接mutable MutexLock mutex_;TcpConnectionPtr connection_ GUARDED_BY(mutex_);};

2、TcpClient 实现

构造TcpClient时,初始化Connector并注册事件回调,用于与服务端的通信,并设置连接成功的回调函数。

2.1、构造函数

初始化列表种创建一个Connector,用于连接服务器,若连接成功,调用TcpClient::newConnection()回调函数。

TcpClient::TcpClient(EventLoop* loop,const InetAddress& serverAddr,const string& nameArg): loop_(CHECK_NOTNULL(loop)),connector_(new Connector(loop, serverAddr)),// 创建一个Connectorname_(nameArg),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),retry_(false), // 默认不重连connect_(true), // 开始连接nextConnId_(1) // 当前连接的序号{// 设置建立连接的回调函数connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, _1));// FIXME setConnectFailedCallbackLOG_INFO << "TcpClient::TcpClient[" << name_ << "] - connector " << get_pointer(connector_);}

2.2、建立连接、断开连接

connect()函数调用Connector::start(),发起连接。

void TcpClient::connect(){// FIXME: check stateLOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to " << connector_->serverAddress().toIpPort();connect_ = true;connector_->start(); // 调用Connector::start,发起连接}

2.3、连接建立成功

若连接成功,则回调TcpClient::newConnection()函数。 参数是本地已建立连接的sockfd,通过它创建一个TcpConnection,用于后续消息的发送。

void TcpClient::newConnection(int sockfd){loop_->assertInLoopThread();InetAddress peerAddr(sockets::getPeerAddr(sockfd)); // 获取对端的地址char buf[32];snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf; // 连接的名字InetAddress localAddr(sockets::getLocalAddr(sockfd)); // 获取本段的地址// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary// 构造一个TcpConnection对象,并设置相应的回调函数TcpConnectionPtr conn(new TcpConnection(loop_,connName,sockfd,localAddr,peerAddr));conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe{MutexLockGuard lock(mutex_);connection_ = conn; // 保存到成员变量}conn->connectEstablished(); // 注册到Poller,监听IO事件}

2.4、断开连接disconnect()、关闭连接stop()

断开连接,仅关闭写功能,仍然能接收对端消息。

void TcpClient::disconnect(){connect_ = false;{MutexLockGuard lock(mutex_);if (connection_) {connection_->shutdown(); // 半关闭, 能继续完整接收对端的消息}}}

关闭连接,则将完全关闭客户端,不能再进行收、发数据。

void TcpClient::stop(){connect_ = false;connector_->stop();}

2.5、析构

TcpClient::~TcpClient(){LOG_INFO << "TcpClient::~TcpClient[" << name_ << "] - connector " << get_pointer(connector_);TcpConnectionPtr conn;bool unique = false;{MutexLockGuard lock(mutex_);unique = connection_.unique(); // 是否只有一个持有者conn = connection_;}if (conn) // 连接已经建立成功,TcpConnectionPtr 不为空{assert(loop_ == conn->getLoop());// FIXME: not 100% safe, if we are in different threadCloseCallback cb = std::bind(&detail::removeConnection, loop_, _1);loop_->runInLoop(std::bind(&TcpConnection::setCloseCallback, conn, cb));if (unique){conn->forceClose();}}else{// TcpConnectionPtr为空connector_->stop(); // 关闭Connector连接// FIXME: HACKloop_->runAfter(1, std::bind(&detail::removeConnector, connector_)); }}

3、测试

EchoClient

#include <muduo/net/TcpClient.h>#include <muduo/base/Logging.h>#include <muduo/base/Thread.h>#include <muduo/net/EventLoop.h>#include <muduo/net/InetAddress.h>#include <utility>#include <stdio.h>#include <unistd.h>using namespace muduo;using namespace muduo::net;int numThreads = 0;class EchoClient;std::vector<std::unique_ptr<EchoClient>> clients;int current = 0;class EchoClient : noncopyable{public:EchoClient(EventLoop* loop, const InetAddress& listenAddr, const string& id): loop_(loop),client_(loop, listenAddr, "EchoClient"+id){client_.setConnectionCallback(std::bind(&EchoClient::onConnection, this, _1));client_.setMessageCallback(std::bind(&EchoClient::onMessage, this, _1, _2, _3));//client_.enableRetry();}void connect(){client_.connect();}// void stop();private:void onConnection(const TcpConnectionPtr& conn){LOG_TRACE << conn->localAddress().toIpPort() << " -> "<< conn->peerAddress().toIpPort() << " is "<< (conn->connected() ? "UP" : "DOWN");if (conn->connected()) {++current;if (implicit_cast<size_t>(current) < clients.size()) {clients[current]->connect();}LOG_INFO << "*** connected " << current;}conn->send("world\n");}void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time){string msg(buf->retrieveAllAsString());LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toString();if (msg == "quit\n") {conn->send("bye\n");conn->shutdown();}else if (msg == "shutdown\n") {loop_->quit();}else {conn->send(msg);}}EventLoop* loop_;TcpClient client_;};int main(int argc, char* argv[]){argc = 2;argv[1] = const_cast<char*>("localhost");LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();if (argc > 1){EventLoop loop;bool ipv6 = argc > 3;InetAddress serverAddr(argv[1], 2000, ipv6);int n = 1;if (argc > 2){n = atoi(argv[2]);}clients.reserve(n);for (int i = 0; i < n; ++i){char buf[32];snprintf(buf, sizeof buf, "%d", i+1);clients.emplace_back(new EchoClient(&loop, serverAddr, buf));}clients[current]->connect();loop.loop();}else{printf("Usage: %s host_ip [current#]\n", argv[0]);}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。