evpp网络库代码分析(一)

    技术2026-02-14  16

            evpp是奇虎360内部使用的开源多线程网络库,集tcp/udp/http多种协议的服务器和客户端支持。github代码路径是:https://github.com/Qihoo360/evpp,可以不依赖boost库,使用现代c++14语言(evpp/invoke_timer.cc的lambda表达式使用到了c++14的特性)进行编码。本项目高度参考了muduo网络库(https://github.com/chenshuo/muduo),而底层使用现成的libevent库作为事件驱动库,典型的一个reactor网络编程模式的例子,本文就是通过分析evpp源码来达到学习c++网络编程的效果。

            muduo代码我也拜读过,muduo有个特点,它完全是为linux而写的,不支持windows,譬如里面用到了eventfd,timerfd以及epoll等,都是linux系统特有的,而且还跟linux版本有关,系统版本太低也不支持,譬如eventfd等,而evpp做了一定的平台兼容性,得益于libevent库,能一定程度做到支持windows平台。另外,muduo有个base库,是重复造轮子了,其他还好,也是一个不可多得的多线程网络服务器编程demo,值得参考,而evpp没有像muduo那样重新实现一套基础库(如线程库、互斥锁、条件变量等),而是直接利用了c++11/c++14自带的std::thread、std::mutex等,相对通用很多,而且学习这些类库使用在其他项目也能用得着,毕竟是c++的标准类库。

            摘抄了github上的说明,大家可以点进去前文给出的github路径阅读Readme:

            我们给出基础的TCP服务器和客户端的例子,http的例子请自行阅读。

    TCP服务器端:

    server.cc #include <evpp/tcp_server.h> #include <evpp/buffer.h> #include <evpp/tcp_conn.h> void OnConnection(const evpp::TCPConnPtr& conn) { if (conn->IsConnected()) { conn->SetTCPNoDelay(true); } } void OnMessage(const evpp::TCPConnPtr& conn, evpp::Buffer* msg) { conn->Send(msg); } int main(int argc, char* argv[]) { std::string addr = "0.0.0.0:9099"; int thread_num = 4; if (argc != 1 && argc != 3) { printf("Usage: %s <port> <thread-num>\n", argv[0]); printf(" e.g: %s 9099 12\n", argv[0]); return 0; } if (argc == 3) { addr = std::string("0.0.0.0:") + argv[1]; thread_num = atoi(argv[2]); } evpp::EventLoop loop; evpp::TCPServer server(&loop, addr, "TCPPingPongServer", thread_num); server.SetMessageCallback(&OnMessage); server.SetConnectionCallback(&OnConnection); server.Init(); server.Start(); loop.Run(); return 0; }

    客户端:

    client_fixed_size.cc // Modified from https://github.com/chenshuo/muduo/blob/master/examples/pingpong/client.cc // // Every time, we need to receive the whole message and then we can send the next one. // #include <iostream> #include <evpp/tcp_client.h> #include <evpp/event_loop_thread_pool.h> #include <evpp/buffer.h> #include <evpp/tcp_conn.h> class Client; class Session { public: Session(evpp::EventLoop* loop, const std::string& serverAddr/*ip:port*/, const std::string& name, size_t block_size, Client* owner) : client_(loop, serverAddr, name), owner_(owner), bytes_read_(0), bytes_written_(0), messages_read_(0), block_size_(block_size) { client_.SetConnectionCallback( std::bind(&Session::OnConnection, this, std::placeholders::_1)); client_.SetMessageCallback( std::bind(&Session::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void Start() { client_.Connect(); } void Stop() { client_.Disconnect(); } int64_t bytes_read() const { return bytes_read_; } int64_t messages_read() const { return messages_read_; } private: void OnConnection(const evpp::TCPConnPtr& conn); void OnMessage(const evpp::TCPConnPtr& conn, evpp::Buffer* buf) { ++messages_read_; while (buf->size() >= block_size_) { bytes_read_ += block_size_; bytes_written_ += block_size_; conn->Send(buf->data(), block_size_); buf->Skip(block_size_); } } private: evpp::TCPClient client_; Client* owner_; int64_t bytes_read_; int64_t bytes_written_; int64_t messages_read_; size_t block_size_; }; class Client { public: Client(evpp::EventLoop* loop, const std::string& name, const std::string& serverAddr, // ip:port int blockSize, int sessionCount, int timeout_sec, int threadCount) : loop_(loop), name_(name), session_count_(sessionCount), timeout_(timeout_sec), connected_count_(0) { loop->RunAfter(evpp::Duration(double(timeout_sec)), std::bind(&Client::HandleTimeout, this)); tpool_.reset(new evpp::EventLoopThreadPool(loop, threadCount)); tpool_->Start(true); for (int i = 0; i < blockSize; ++i) { message_.push_back(static_cast<char>(i % 128)); } for (int i = 0; i < sessionCount; ++i) { char buf[32]; snprintf(buf, sizeof buf, "C%05d", i); Session* session = new Session(tpool_->GetNextLoop(), serverAddr, buf, blockSize, this); session->Start(); sessions_.push_back(session); } } ~Client() { } const std::string& message() const { return message_; } void OnConnect() { if (++connected_count_ == session_count_) { std::cout << "all connected" << std::endl; } } void OnDisconnect(const evpp::TCPConnPtr& conn) { if (--connected_count_ == 0) { std::cout << "all disconnected" << std::endl; int64_t totalBytesRead = 0; int64_t totalMessagesRead = 0; for (auto &it : sessions_) { totalBytesRead += it->bytes_read(); totalMessagesRead += it->messages_read(); } std::cout << "name=" << name_ << " " << totalBytesRead << " total bytes read" << std::endl; std::cout << "name=" << name_ << " " << totalMessagesRead << " total messages read" << std::endl; std::cout << "name=" << name_ << " " << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead) << " average message size" << std::endl; std::cout << "name=" << name_ << " " << static_cast<double>(totalBytesRead) / (timeout_ * 1024 * 1024) << " MiB/s throughput" << std::endl; loop_->QueueInLoop(std::bind(&Client::Quit, this)); } } private: void Quit() { tpool_->Stop(); loop_->Stop(); for (auto &it : sessions_) { delete it; } sessions_.clear(); while (!tpool_->IsStopped() || !loop_->IsStopped()) { std::this_thread::sleep_for(std::chrono::seconds(1)); } tpool_.reset(); } void HandleTimeout() { std::cout << "stop" << std::endl; for (auto &it : sessions_) { it->Stop(); } } private: evpp::EventLoop* loop_; std::string name_; std::shared_ptr<evpp::EventLoopThreadPool> tpool_; int session_count_; int timeout_; std::vector<Session*> sessions_; std::string message_; std::atomic<int> connected_count_; }; void Session::OnConnection(const evpp::TCPConnPtr& conn) { if (conn->IsConnected()) { conn->SetTCPNoDelay(true); conn->Send(owner_->message()); owner_->OnConnect(); } else { owner_->OnDisconnect(conn); } } int main(int argc, char* argv[]) { if (argc != 7) { fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time_seconds>\n"); return -1; } const char* ip = argv[1]; uint16_t port = static_cast<uint16_t>(atoi(argv[2])); int threadCount = atoi(argv[3]); int blockSize = atoi(argv[4]); int sessionCount = atoi(argv[5]); int timeout = atoi(argv[6]); evpp::EventLoop loop; std::string serverAddr = std::string(ip) + ":" + std::to_string(port); Client client(&loop, argv[0], serverAddr, blockSize, sessionCount, timeout, threadCount); loop.Run(); return 0; }

            server.cc很简单,收到什么就把返回什么,类似echo服务器,而client_fixed_size.cc比较复杂,下发一大段数据,收到服务器返回的数据后测量平均时间,pingpang性能测试,源码在evpp/benchmark/throughput/evpp/下。看得出来类的使用跟muduo如出一辙,也是抛弃c++的继承,积极使用"std::function+std::bind"或lambda来注册回调函数。作者也是致敬了一番muduo库。

    可以在ubuntu下开启两个shell终端,分别运行:

    ./server 9099 200

    以及

    ./client_fixed_size 127.0.0.1 9099 150 512 100 10 #all disconnected #name=./client_fixed_size 357193728 total bytes read #name=./client_fixed_size 697644 total messages read #name=./client_fixed_size 512 average message size #name=./client_fixed_size 34.0646 MiB/s throughput

            从上面的客户端和服务器代码可以看到,evpp封装了两个比较常用的接口:SetConnectionCallback和SetMessageCallback。这两个回调所要处理的就是所谓的“应用层”,而库是底层,在写应用时,无需关注底层。也就是说evpp库分离了“用户逻辑”与“繁琐的tcp socket读写操作”。linux内核也是大量使用这种注册回调到框架的思路,以此实现“面向对象”和“机制与策略分离”。

    1,当TCPClient建立连接或存在的连接断开或建立连接失败时,就会回调我们注册的回调函数:

    // Set a connection event relative callback when the TCPClient // establishes a connection or an exist connection breaks down or failed to establish a connection. // When these three events happened, the value of the parameter in the callback is: // 1. Successfully establish a connection : TCPConn::IsConnected() == true // 2. An exist connection broken down : TCPConn::IsDisconnecting() == true // 3. Failed to establish a connection : TCPConn::IsDisconnected() == true and TCPConn::fd() == -1 void TCPClient::SetConnectionCallback(const ConnectionCallback& cb); // Set the message callback to handle the messages from remote server void TCPClient::SetMessageCallback(const MessageCallback& cb);

    2,当TCPServer收到新连接或现有连接中断时,就会回调我们注册的回调函数:

    // Set a connection event relative callback when the TCPServer // receives a new connection or an exist connection breaks down. // When these two events happened, the value of the parameter in the callback is: // 1. Received a new connection : TCPConn::IsConnected() == true // 2. An exist connection broken down : TCPConn::IsDisconnecting() == true void TCPServer::SetConnectionCallback(const ConnectionCallback& cb); // Set the message callback to handle the messages from remote client void TCPServer::SetMessageCallback(MessageCallback cb);

            通俗地认为,SetConnectionCallback是处理刚连上或者掉线时的业务,譬如可以在连接上后保存evpp::TCPConnPtr(智能指针),断开连接就reset该TCPConnPtr,使其变为nullptr,用于后续send数据,参考evpp/examples/chatroom/simple底下的简易聊天app:client.cc、codec.h和server.cc:

    //参考evpp/examples/chatroom/simple/client.cc std::mutex mutex_; evpp::TCPConnPtr connection_; ... ... void OnConnection(const evpp::TCPConnPtr& conn) { LOG_INFO << conn->AddrToString() << " is " << (conn->IsConnected() ? "UP" : "DOWN"); std::lock_guard<std::mutex> lock(mutex_); if (conn->IsConnected()) { connection_ = conn; } else { connection_.reset(); } } void Write(const evpp::Slice& message) { std::lock_guard<std::mutex> lock(mutex_); if (connection_) { evpp::Buffer buf; buf.Append(message.data(), message.size()); buf.PrependInt32(message.size()); connection_->Send(&buf); } } ... ... std::string line = "hello world"; Write(line);

            而SetMessageCallback则是用于处理接收对端的消息,数据能从evpp::Buffer类中得到,由于使用了reactor模式,这些回调函数都不能阻塞,想要延时一段时间处理的话,不能直接使用sleep(),可以注册超时时间,再比如处理一些会阻塞的任务时,可以自己做一个“任务队列+线程池”,不过这些都是编程手段,跟evpp库无关。

            TCPClient类TCPServer类怎么使用呢?

    作者在tcp_client.h里写得很清楚: 1.创建一个TCPClient对象 2.使用SetConnectionCallback()和SetMessageCallback()设置回调函数 3.调用TCPClient::Connect()去尝试跟远端的服务器建立一个tcp连接 4.使用TCPClient::Send去发送消息 5.在回调函数中处理"连接"和"消息" 6.想要关闭连接,只需调用TCPClient::Disonnect()

    而在tcp_server.h上也有描述: 1.创建一个TCPServer对象 2.使用SetConnectionCallback()和SetMessageCallback()设置回调函数 3.调用TCPServer::Init() 4.调用TCPServer::Start() 5.在回调函数中处理"连接"和"消息" 6.最后想要关闭服务,只需调用Server::Stop()

            上面列出的使用方法,可以在evpp/examples/*下找例子进行印证!

            在分析代码前,要先学会怎么使用!

     

    Processed: 0.010, SQL: 9