把google protobuf service (rpc)跑在自己的网络传输组件上

@vrqq  May 7, 2018

我的环境:nanomsg 做网络传输,然后在上面跑 protobuf 3。
起因就是不想用gRPC,因为rpc再引入个包麻烦啊,何况protobuf内容简明易懂,直接用了何乐而不为啊。下文以nanomsg举例。。

看看流程

在protobuf里面,message规定的是一种struct,service规定的是一组function。(生成.pb.cc/.pb.h时候,他扩展自动生成了rpc框架的代码,还不错。)
nanomsg作为数据包的承载,无论是传rpc请求,还是传message,都是客户端把请求通过protobuf变成数据包,在nanomsg网络上扔给服务端,然后服务端通过ptorobuf解包。

我们从用法上想一下,假设现在有一个已经做好的rpc系统

  • 在客户端我们会写remote.add()这样的语句。一旦运行了这句话,就进入了由protobuf系统自动创造的代码里,所以我们只能把网络通信嵌在protobuf规定的代码里。
  • 在服务端我们期望的结构是:收包,然后调protobuf提供的函数调用接口,系统自动运行service.add(),但service.add()里面的代码是我们写的。

所以我们有下代码

client里面:

  1. 初始化
  2. 用户 calling stub.add()
  3. 由第二步进入protobuf自动生成的代码部分,系统会自动调rpcChannel->CallMethod(),我们在channel这里做手脚。
  4. 在第三步进入我们做手脚的代码中,我们完成网络发送接收工作。
  5. 第2~4步是block的。

server里面:

  1. 初始化
  2. 用户 naonmsg.recv(message) 接收调用信息
  3. 用户解包,并调Service->CallMethod()。(这个class Service是我们延续“protobuf生成的代码”二次开发的。)
  4. 把算好的值用nanomsg送回客户端。

看样例代码更简明易懂

由.proto文件生成 rpcInterface.pb.h/.cc

rpcInterface.proto

syntax = "proto3";
option cc_generic_services = true;
package myrpc;

message rpcRequest {
    int32 a = 1;
    int32 b = 2;
}
message rpcResponse {
    string result = 1;
}
service rpcService {
    rpc add(rpcRequest) returns (rpcResponse);
}

接下来是c++文件的一些说明

  • google::protobuf::Service 对应.proto里面的Service
  • google::protobuf::ServiceDescriptor 类似于Iterator,可以遍历service里面含的function()
  • google::protobuf::MethodDescriptor 对应.proto里面service{}内的function
  • google::protobuf::Message 对应.proto里面的Message,这是所有message的祖宗。(rpcRequest和rpcResponse都来自于它)

cpp看 '.proto文件' 详情的方法

// @input : The implement of my rpc servcie.
void showProtobufInfo(google::protobuf::Service *serv) {
    cout<<"::Protobuf service Info::"<<endl;
    const google::protobuf::ServiceDescriptor *s_descriptor = serv->GetDescriptor();
    for (int i=0; i<s_descriptor->method_count(); i++) {
        const google::protobuf::MethodDescriptor *method = s_descriptor->method(i);
        cout<<"method "<<i<<" => "<<method->full_name()<<endl;
        cout<<"\t Input: "<<serv->GetRequestPrototype(method).GetTypeName()<<endl;
        cout<<"\tOutput: "<<serv->GetResponsePrototype(method).GetTypeName()<<endl;
    }
    cout<<endl;
}

cpp客户端缩略代码

class rpcChannel : public google::protobuf::RpcChannel {
    void CallMethod(method, request, response) {
        nn_send({method.id+request}, response);
        nn_recv(response);
    };
};
int main() {
    nn_socket(AF_SP, NN_REQ);
    nn_connect(fd, "ipc://rpc_address.proto");
    myrpc::rpcService_Stub stub( new rpcChannel() );
    protobuf::Message ask={1,2},ans;
    stub.add(&ask, &ans);
    cout<<ans.result()<<endl;
    return 0;
}

cpp服务端缩略代码

class rpcWorker : public myrpc::rpcService {
    void add(request, response) {
        response->set_result(request.a + request.b);
    };
};
int main() {
    int fd = nn_socket(AF_SP, NN_REP);
    int ret = nn_bind(fd, "ipc://rpc_address.proto");
    ServiceDescriptor *descriptor = new rpcWorker()->GetDescriptor();
    while (true) {
        nn_recv(buffer);
        buffer.split_into(method_id, input_argument);
        MethodDescriptor *func = descriptor->method(method_id);
        descriptor->CallMethod( func, input_argument, result);
        nn_send(result);
    }
    return 0;
}

prog.jpg

完整代码见附件

How to build (mac平台可能不需要-pthread)

#!/bin/sh
protoc --cpp_out . rpcInterface.proto
g++ -std=c++14 server.cpp rpcInterface.pb.cc -o serv -lnanomsg -lprotobuf -pthread
g++ -std=c++14 client.cpp rpcInterface.pb.cc -o client -lnanomsg -lprotobuf -pthread

client.cpp
server.cpp
rpcInterface.proto

第二章 Closure

好用的东西从这里开始,建议先看一些bind的花式玩法
https://my.oschina.net/hevakelcj/blog/114440
http://www.cnblogs.com/ttss/p/4100917.html

上文中,closure从来就没用过,google文档中提到,我们可以在rpc运行前,或调用结束后调用它。就是一个回调函数bind了一下,也可以借此实现异步rpc,但更字面的一个功能是:一次远程调用,多次返回。
今晚看了baidu-sofa-pbrpc的源码(https://github.com/baidu/sofa-pbrpc),突然理解了它,其实很简单,待我一一说来。

  • 参照上文,可以见得上文的block-synchronize的rpc,仅仅占用nanomsg的一个“通道”(例如一个ipc文件)。
  • 从表面上看的现象是:服务端Done->Run()可以在任意时刻调用,当服务端调用以后,客户端的Closure里写的函数就得运行。
  • 所以我们得为“closure Done”单独开一条通路,下文假设为ipc://async.ipc
  • 干货开始了,在客户端的closure构造之初,客户端便监听ipc://async.ipc,看看有没有发来的消息。
  • 服务端myservice_bundle->CallMethod(method, nullptr, input_argument, result, IMPORTANT_closure);时候,在“IMPORTANT_closure”里面写网络发送函数,当Done->Run()运行时候,会执行IMPORTANT_closure的回调函数,回调函数的内容是“向ipc://async.ipc里面发包”。
  • 举例,一种实现方式的细节:发数据时候,我们可以从“某内存”读数据,巧妙运用bind嘛,把内存地址绑在上面,然后执行nn_send时候从“某内存”里面把东西读出来就好啦。
  • 回过头来我们看,写成Closure,而不直接是std::functional简直是太明智了!避开重复发包函数,还可以强行扩展,达到“一次rpc,n次返回”!(调用一次Done->Run()就返回一次呗)

添加新评论