歡迎您光臨本站 註冊首頁

libx-msg-im-xsc 0.0.1 發布,X-MSG-IM 去中心化即時通信基礎設施

←手機掃碼閱讀     admin @ 2019-11-11 , reply:0

libx-msg-im-xsc是一個基於actor模型的單進程多線程併發通信伺服器框架. 它的目標是為上層應用提供一個高性能, 可測量, 並行無鎖, 網路透明, 全非同步的開發環境.

在X-MSG-IM系統中, 它為所有核心網元提供網路事務控制, 應用層消息處理, 透明的分散式信令跟蹤(調用鏈)能力.

既然是到了應用層, 提供的API自然也比較友好. 因此, 你可以很快速地在這些API上構建起一個併發伺服器, 看下面的例子:

伺服器:


#include "XmsgTcpLog.h"
#include "XmsgHttpLog.h"
#include "XmsgWebSocketLog.h"
#include "net-x-msg-im-auth.pb.h"

/*
 * Registers a request/response message pair and its handler on a message manager.
 *
 *   MSG_MGR  - pointer-like message manager; its reg() member is invoked.
 *   REQ_TYPE - message type whose descriptor() is passed as the first argument.
 *   RSP_TYPE - message type whose descriptor() is passed as the second argument.
 *   HANDLER  - callback, handed to reg() as a void*.
 *
 * The expansion already ends with ';', so call sites are written without one.
 * NOTE(review): judging by the macro name, the trailing `false` presumably marks the
 * message as callable before authentication - confirm against the reg() declaration.
 */
#define X_MSG_N2H_PRPC_BEFOR_AUTH(MSG_MGR, REQ_TYPE, RSP_TYPE, HANDLER) \
	(MSG_MGR->reg(REQ_TYPE::descriptor(), RSP_TYPE::descriptor(), NULL, (void*)(HANDLER), false));

/* Forward declaration of the XmsgImAuthSimpleReq handler that main() registers; the definition follows main(). */
static void x_msg_im_auth_simple(shared_ptr<XscChannel> channel, SptrXitp trans, shared_ptr<XmsgImAuthSimpleReq> req);

int main(int argc, char **argv)
{
	Log::setRecord();
	Xsc::init(); /* libxsc-cpp初始化. */
	//
	shared_ptr<XscTcpServer> tcpServer(new XscTcpServer("tcp-server", shared_ptr<XmsgTcpLog>(new XmsgTcpLog())));
	shared_ptr<XscTcpCfg> tcpCfg(new XscTcpCfg());
	tcpCfg->addr = "0.0.0.0:1224";
	if (!tcpServer->startup(tcpCfg) || !tcpServer->publish()) /* tcp伺服器啟動. */
		return EXIT_FAILURE;
	//
	shared_ptr<XscHttpServer> httpServer(new XscHttpServer("http-server", shared_ptr<XmsgHttpLog>(new XmsgHttpLog())));
	shared_ptr<XscHttpCfg> httpCfg(new XscHttpCfg());
	httpCfg->addr = "0.0.0.0:1225";
	if (!httpServer->startup(httpCfg) || !httpServer->publish()) /* http伺服器啟動. */
		return EXIT_FAILURE;
	//
	shared_ptr<XscWebSocketServer> webSocketServer(new XscWebSocketServer("web-socket-server", shared_ptr<XmsgWebSocketLog>(new XmsgWebSocketLog())));
	shared_ptr<XscWebSocketCfg> webSocketCfg(new XscWebSocketCfg());
	webSocketCfg->addr = "0.0.0.0:1226";
	if (!webSocketServer->startup(webSocketCfg) || !webSocketServer->publish()) /* websocket伺服器啟動. */
		return EXIT_FAILURE;
	//
	shared_ptr<XmsgImN2HMsgMgr> msgMgrTcp(new XmsgImN2HMsgMgr(tcpServer)); /* 伺服器上的消息管理器. */
	shared_ptr<XmsgImN2HMsgMgr> msgMgrHttp(new XmsgImN2HMsgMgr(httpServer));
	shared_ptr<XmsgImN2HMsgMgr> msgMgrWebSocket(new XmsgImN2HMsgMgr(webSocketServer));
	//
	X_MSG_N2H_PRPC_BEFOR_AUTH(msgMgrTcp, XmsgImAuthSimpleReq, XmsgImAuthSimpleRsp, x_msg_im_auth_simple /* 消息註冊. */)
	X_MSG_N2H_PRPC_BEFOR_AUTH(msgMgrHttp, XmsgImAuthSimpleReq, XmsgImAuthSimpleRsp, x_msg_im_auth_simple)
	X_MSG_N2H_PRPC_BEFOR_AUTH(msgMgrWebSocket, XmsgImAuthSimpleReq, XmsgImAuthSimpleRsp, x_msg_im_auth_simple)
	//
	Misc::hold();
	return EXIT_FAILURE;
}

/**
 * Handler for XmsgImAuthSimpleReq: messages are processed here.
 *
 * channel - the network channel; in this example, the client connection.
 * trans   - the network transaction; every message starts with a transaction and ends with one.
 * req     - the request message.
 *
 * The response is produced on a separate, detached thread to show that a transaction,
 * although it always begins on the thread owning the channel, may be ended on any thread.
 */
void x_msg_im_auth_simple(shared_ptr<XscChannel> channel, SptrXitp trans, shared_ptr<XmsgImAuthSimpleReq> req)
{
	(void) channel; /* not needed to answer this request. */
	/* trans and req are captured by value so that they stay alive until the worker has run. */
	thread worker([trans, req]()
	{
		auto rsp = make_shared<XmsgImAuthSimpleRsp>();
		rsp->set_token("token");
		trans->end(rsp); /* end the transaction. */
	});
	worker.detach();
}

客戶端-tcp


/**
 * TCP client example: sends one XmsgImAuthSimpleReq wrapped in an xsc PDU to 127.0.0.1:1224
 * and logs the decoded XmsgImAuthSimpleRsp.
 *
 * @param args unused.
 * @throws Exception on any I/O or decoding failure, including the peer closing the
 *                   connection before a response arrives.
 */
public static void main(String[] args) throws Exception
{
    XmsgImAuthSimpleReq.Builder req = XmsgImAuthSimpleReq.newBuilder();
    req.setUsr("usr");
    //
    XscProtoPdu pdu = new XscProtoPdu(); /* build a PDU based on the xsc protocol. */
    pdu.transm.indicator = 0x00;
    pdu.transm.trans = new XscProtoTransaction();
    pdu.transm.trans.trans = XscProto.XSC_TAG_TRANS_BEGIN;
    pdu.transm.trans.stid = 0x00112233;
    pdu.transm.trans.msg = XmsgImAuthSimpleReq.getDescriptor().getName();
    pdu.transm.trans.dat = req.build().toByteArray();
    //
    try (Socket sock = new Socket("127.0.0.1", 1224)) /* the socket is closed on every exit path. */
    {
        sock.getOutputStream().write(pdu.bytes());
        byte[] by = new byte[0x200];
        int len = sock.getInputStream().read(by); /* not rigorous (a single read may not deliver the whole PDU); for demonstration only. */
        if (len < 0) /* read() reports end-of-stream as -1, which must not be handed to decode() as a length. */
            throw new java.io.EOFException("connection closed before a response was received");
        pdu = XscProtoPdu.decode(by, 0, len);
        Log.info("rsp: %s", Misc.pb2str(XmsgImAuthSimpleRsp.parseFrom(pdu.transm.trans.dat)));
    }
}

客戶端-http


/**
 * HTTP client example: sends one XmsgImAuthSimpleReq to http://127.0.0.1:1225/ with the
 * message name in the "x-msg-name" header and the base64-encoded message in the
 * "x-msg-dat" header, then logs the XmsgImAuthSimpleRsp carried by the XmsgImHttpRsp body.
 *
 * @param args unused.
 * @throws Exception on any I/O or parsing failure.
 */
public static void main(String[] args) throws Exception
{
    XmsgImAuthSimpleReq.Builder authReq = XmsgImAuthSimpleReq.newBuilder();
    authReq.setUsr("usr");
    //
    HttpClient http = HttpClient.newBuilder().build();
    HttpRequest.Builder httpReqBuilder = HttpRequest.newBuilder();
    httpReqBuilder.uri(URI.create("http://127.0.0.1:1225/"));
    httpReqBuilder.header("x-msg-name", XmsgImAuthSimpleReq.getDescriptor().getName());
    httpReqBuilder.header("x-msg-dat", Crypto.base64enc(authReq.build().toByteArray()));
    HttpRequest httpReq = httpReqBuilder.build();
    //
    HttpResponse<byte[]> httpRsp = http.send(httpReq, HttpResponse.BodyHandlers.ofByteArray());
    XmsgImHttpRsp envelope = XmsgImHttpRsp.parseFrom(httpRsp.body()); /* the body is an XmsgImHttpRsp wrapping the actual response. */
    XmsgImAuthSimpleRsp authRsp = XmsgImAuthSimpleRsp.parseFrom(envelope.getDat());
    Log.info("rsp: %s", Misc.pb2str(authRsp));
}

客戶端-websocket


/**
 * WebSocket client example: connects to ws://127.0.0.1:1226, sends one XmsgImAuthSimpleReq
 * wrapped in an xsc PDU as soon as the channel opens, and logs every binary response received.
 *
 * @param args unused.
 */
public static void main(String[] args)
{
    Log.setRecord();
    var httpClient = HttpClient.newHttpClient();
    var wsCompletableFuture = httpClient.newWebSocketBuilder().buildAsync(URI.create("ws://127.0.0.1:1226"), new Listener()
    {
        @Override
        public void onOpen(WebSocket ws)
        {
            XmsgImAuthSimpleReq.Builder req = XmsgImAuthSimpleReq.newBuilder();
            req.setUsr("usr");
            //
            XscProtoPdu pdu = new XscProtoPdu(); /* build a PDU based on the xsc protocol. */
            pdu.transm.indicator = 0x00;
            pdu.transm.trans = new XscProtoTransaction();
            pdu.transm.trans.trans = XscProto.XSC_TAG_TRANS_BEGIN;
            pdu.transm.trans.stid = 0x00112233;
            pdu.transm.trans.msg = XmsgImAuthSimpleReq.getDescriptor().getName();
            pdu.transm.trans.dat = req.build().toByteArray();
            //
            ws.sendBinary(ByteBuffer.wrap(pdu.bytes() /* outgoing message. */), true);
            ws.request(1); /* ask for the first incoming message. */
        }

        @Override
        public CompletionStage<?> onBinary(WebSocket ws, ByteBuffer dat, boolean last)
        {
            /* the payload spans position..limit, so size the copy by remaining(), not limit(). */
            byte[] by = new byte[dat.remaining()];
            dat.get(by);
            try
            {
                /* NOTE(review): `last` is ignored, so a message delivered in several parts would be decoded piecewise - acceptable for a demo only. */
                XscProtoPdu pdu = XscProtoPdu.decode(by, 0, by.length); /* parse the received response bytes. */
                Log.info("rsp: %s", Misc.pb2str(XmsgImAuthSimpleRsp.parseFrom(pdu.transm.trans.dat)));
            } catch (Exception e)
            {
                Log.error(Log.trace(e));
            }
            ws.request(1); /* ask for the next incoming message. */
            return null;
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason)
        {
            Log.debug("web-socket channel closed");
            return null;
        }

        @Override
        public void onError(WebSocket ws, Throwable error)
        {
            Log.debug("web-socket channel error occured: %s", Log.trace(error));
            ws.request(1);
        }
    });
    wsCompletableFuture.join(); /* wait until the WebSocket has been built. */
    Misc.hold();
}

 

完整的例子在這裡:

x-msg-im-xsc-examples-cpp
x-msg-im-xsc-examples-java


[admin ]

來源:OsChina
連結:https://www.oschina.net/news/111235/libx-msg-im-xsc-0-0-1-released
libx-msg-im-xsc 0.0.1 發布,X-MSG-IM 去中心化即時通信基礎設施已經有166次圍觀

http://coctec.com/news/all/show-post-219226.html