歡迎您光臨本站 註冊首頁

C++編寫高性能服務器實例教程

←手機掃碼閱讀     madbeef @ 2020-06-12 , reply:0

我將展示如何使用現代C++編寫一個Echo服務器,相當於分佈式系統開發中的“Hello World”。這個服務器會將接收的消息直接返回。我們同時需要一個可以向我們的服務器發動消息的客戶端,在這裡可以發現客戶端的源碼。

Wangle是一個用來搭建事件驅動的現代異步C++服務的C/S應用框架。Wangle最基本的抽象概念就是Pipeline(管線)。能夠理解這種抽象,將會很容易寫出各種複雜的現代C++服務,另一個重要的概念是Service(服務),其可以看作一種更高級的Pipeline,不過超出了本文我們關注的範疇。

PipeLine

pipeline 是 Wangle 中最重要也是最強大的抽象,可以讓用戶在定製 request 和 response 的實現時擁有很大的自由。一個pipeline就是一系列request/response控制程序的嵌套。我試圖尋找一個真實世界中pipeline的類比,唯一我能想到的就是現實世界工廠中的生產線。一條生產線工作在一種順序模式下,所有的工人取得一個物體,並且只添加一種修改,再將其發送給上游的工人直到整個產品製造完成。這可能不是一個特別好的比喻,因為流水線上產品的流動是單向的,而一個pipeline能控制反方向的數據流動--就好像將成品分解成原材料。

一個Wangle handler可以同時掌控上游和下游的兩個方向的數據流動。當你把所有的handler連接在一起,就可以用一種靈活的方式將原始數據組裝為想要的數據類型或者將已有的數據拆分。

在我們的服務器的pipeline中大致將會有下面幾種handler:

1.Handler 1 (下文的上游下游是指對一同個handler而言,根據其在pipeline中的位置不同,輸入輸出相反) 上游:將從socket中接收的二進制數據流寫入一個零拷貝(zero-copy,指省略了Applicaion context和Kernel context之間的上下文切換,避免了CPU對Buffer的冗餘拷貝,直接在Kernel級別進行數據傳輸的技術,詳情請參閱維基百科)的字節緩存中,發送給handler2

下游:接收一個零拷貝的字節緩存,將其內容寫入socket中

2.Handler2 上游:接收handler1的緩存對象,解碼為一個string對象傳遞給handler3 下游:接收handler3的string對象,將其轉碼為一個零拷貝的字節緩存,發送給handler1

3.Handler3 上游:接收handler2中的string對象,再向下發送至pipeline等待寫回客戶端。string會發回handler2 下游:接收上游的string對象,傳遞給handler2

需要注意的一點是,每一個handler應當只做一件事並且只有一件,如果你有一個handler裡做了多項任務,比如從二進制流離直接解碼出string,那麼你需要學會將它拆分。這對提升代碼的可維護性和擴展性非常重要。

另外,沒錯,handler不是線程安全的,所以不要輕易的在其中使用任何沒有經過mutex,atomic lock保護的數據,如果你確實需要一個線程安全的環境,Folly提供了一種免於加鎖的數據結構, Folly依賴於Wangle,你可以很容易的在項目中引入並使用它。

如果你還不是很明白所有的步驟,不用著急,在看到下面的具體實現時你會更加清楚。

Echo Server

下面我會展示服務器的具體實現。我假定您已經安裝好Wangle。需要注意的是截至目前Wangle還不能在Mac OS上安裝,我建議您可以安裝虛擬機,使用Ubuntu來安裝Wangle。

這就是echo handler:接收一個string,打印到stdout中,再發送回pipeline。要注意write語句中的定界符不可以省略,因為pipeline會按照字節解碼。

// the main logic of our echo server; receives a string and writes it straight    // back    class EchoHandler : public HandlerAdapter {    public:     virtual void read(Context* ctx, std::string msg) override {     std::cout << "handling " << msg << std::endl;     write(ctx, msg + "rn");     }    };  

 

Echohandler其實是我們pipeline的最後一個handler,現在我們需要創建一個PipelineFactory來控制所有的request和response。

// where we define the chain of handlers for each messeage received    class EchoPipelineFactory : public PipelineFactory {    public:     EchoPipeline::Ptr newPipeline(std::shared_ptr sock) {     auto pipeline = EchoPipeline::create();     pipeline->addBack(AsyncSocketHandler(sock));     pipeline->addBack(LineBasedFrameDecoder(8192));     pipeline->addBack(StringCodec());     pipeline->addBack(EchoHandler());     pipeline->finalize();     return pipeline;     }    };  

 

pipeline中每一個handler的插入順序都需要嚴格注意,因為它們是按照先後排序的,此處我們有4個handler

1.AsyncSocketHandler: 上游:讀取scoket中的二進制流轉換成零拷貝字節緩存 下游:將字節緩存內容寫入底層socket

2. LineBasedFrameDecoder: 上游:接收字節緩存,按行分割數據 下游:將字節緩存發送給AsyncSocketHandler

3. StringCodec: 上游:接收字節緩存,解碼為std:string傳遞給EchoHandler 下游:接收std:string, 編碼為字節緩存,傳遞給LineBasedFrameDecoder

4. EchoHandler: 上游:接收std:string對象,將其寫入pipeline-將消息返回給Echohandler。 下游:接收一個std:string對象,轉發給StringCodec Handler。 現在我們所需要做的就是將pipeline factory關聯到ServerBootstrap,綁定一個端口,這樣我們已經完成了 基本上所有的工作。

#include#include#include#include#includeusing namespace folly;    using namespace wangle;        DEFINE_int32(port, 8080, "echo server port");        typedef PipelineEchoPipeline;        // the main logic of our echo server; receives a string and writes it straight    // back    class EchoHandler : public HandlerAdapter{    public:     virtual void read(Context* ctx, std::string msg) override {     std::cout << "handling " << msg << std::endl;     write(ctx, msg + "  ");     }    };        // where we define the chain of handlers for each messeage received    class EchoPipelineFactory : public PipelineFactory{    public:     EchoPipeline::Ptr newPipeline(std::shared_ptrsock) {     auto pipeline = EchoPipeline::create();     pipeline->addBack(AsyncSocketHandler(sock));     pipeline->addBack(LineBasedFrameDecoder(8192));     pipeline->addBack(StringCodec());     pipeline->addBack(EchoHandler());     pipeline->finalize();     return pipeline;     }    };        int main(int argc, char** argv) {     google::ParseCommandLineFlags(&argc, &argv, true);         ServerBootstrapserver;     server.childPipeline(std::make_shared());     server.bind(FLAGS_port);     server.waitForStop();         return 0;    }  

 

至此我們一共只寫了48行代碼就完成了一個高性能的異步C++服務器。

Echo Client

echo客戶端的實現與我們的服務端非常類似:

// the handler for receiving messages back from the server    class EchoHandler : public HandlerAdapter {    public:     virtual void read(Context* ctx, std::string msg) override {     std::cout << "received back: " << msg;     }     virtual void readException(Context* ctx, exception_wrapper e) override {     std::cout << exceptionStr(e) << std::endl;     close(ctx);     }     virtual void readEOF(Context* ctx) override {     std::cout << "EOF received :(" << std::endl;     close(ctx);     }    };  

 

注意我們重載了readException和readEOF兩個方法,還有其他一些方法可以被重載。如果你需要控制某個特別的事件,只需要重載對應的虛函數即可。

這是客戶端的pipeline factory的實現,與我們的服務端結構基本一致,只有EventBaseHandler這個handler在服務端代碼中不曾出現,它可以確保我們可以從任意一個線程寫入數據。

// the handler for receiving messages back from the server    class EchoHandler : public HandlerAdapter {    public:     virtual void read(Context* ctx, std::string msg) override {     std::cout << "received back: " << msg;     }     virtual void readException(Context* ctx, exception_wrapper e) override {     std::cout << exceptionStr(e) << std::endl;     close(ctx);     }     virtual void readEOF(Context* ctx) override {     std::cout << "EOF received :(" << std::endl;     close(ctx);     }    };  

 

客戶端所有的代碼如下圖所示

#include#include        #include#include#include#include#includeusing namespace folly;    using namespace wangle;        DEFINE_int32(port, 8080, "echo server port");    DEFINE_string(host, "::1", "echo server address");        typedef PipelineEchoPipeline;        // the handler for receiving messages back from the server    class EchoHandler : public HandlerAdapter {    public:     virtual void read(Context* ctx, std::string msg) override {     std::cout << "received back: " << msg;     }     virtual void readException(Context* ctx, exception_wrapper e) override {     std::cout << exceptionStr(e) << std::endl;     close(ctx);     }     virtual void readEOF(Context* ctx) override {     std::cout << "EOF received :(" << std::endl;     close(ctx);     }    };        // chains the handlers together to define the response pipeline    class EchoPipelineFactory : public PipelineFactory {    public:     EchoPipeline::Ptr newPipeline(std::shared_ptr sock) {     auto pipeline = EchoPipeline::create();     pipeline->addBack(AsyncSocketHandler(sock));     pipeline->addBack(     EventBaseHandler()); // ensure we can write from any thread     pipeline->addBack(LineBasedFrameDecoder(8192, false));     pipeline->addBack(StringCodec());     pipeline->addBack(EchoHandler());     pipeline->finalize();     return pipeline;     }    };        int main(int argc, char** argv) {     google::ParseCommandLineFlags(&argc, &argv, true);         ClientBootstrap client;     client.group(std::make_shared(1));     client.pipelineFactory(std::make_shared());     auto pipeline = client.connect(SocketAddress(FLAGS_host, FLAGS_port)).get();         try {     while (true) {     std::string line;     std::getline(std::cin, line);     if (line == "") {     break;     }         pipeline->write(line + "rn").get();     if (line == "bye") {     pipeline->close();     break;     }     }     } catch (const std::exception& e) {     std::cout << exceptionStr(e) << std::endl;     }         return 0;    }  

 

程序用一個While循環不斷監測用戶的輸入,並且依靠調用.get() 來同步等待一直到請求被響應。


[madbeef ] C++編寫高性能服務器實例教程已經有238次圍觀

http://coctec.com/docs/c/language/show-post-238223.html