我的环境:nanomsg 做网络传输,然后在上面跑 protobuf 3。
起因就是不想用gRPC,因为rpc再引入个包麻烦啊,何况protobuf内容简明易懂,直接用了何乐而不为啊。下文以nanomsg举例。。
看看流程
在protobuf里面,message规定的是一种struct,service规定的是一组function。(生成.pb.cc/.pb.h时候,他扩展自动生成了rpc框架的代码,还不错。)
nanomsg作为数据包的承载,无论是传rpc请求,还是传message,都是客户端把请求通过protobuf变成数据包,在nanomsg网络上扔给服务端,然后服务端通过ptorobuf解包。
我们从用法上想一下,假设现在有一个已经做好的rpc系统
- 在客户端我们会写
remote.add()
这样的语句。一旦运行了这句话,就进入了由protobuf系统自动创造的代码里,所以我们只能把网络通信嵌在protobuf规定的代码里。 - 在服务端我们期望的结构是:收包,然后调protobuf提供的函数调用接口,系统自动运行
service.add()
,但service.add()
里面的代码是我们写的。
所以我们有下代码
client里面:
- 初始化
- 用户 calling stub.add()
- 由第二步进入protobuf自动生成的代码部分,系统会自动调rpcChannel->CallMethod(),我们在channel这里做手脚。
- 在第三步进入我们做手脚的代码中,我们完成网络发送接收工作。
- 第2~4步是block的。
server里面:
- 初始化
- 用户 naonmsg.recv(message) 接收调用信息
- 用户解包,并调Service->CallMethod()。(这个class Service是我们延续“protobuf生成的代码”二次开发的。)
- 把算好的值用nanomsg送回客户端。
看样例代码更简明易懂
由.proto文件生成 rpcInterface.pb.h/.cc
rpcInterface.proto
syntax = "proto3";
option cc_generic_services = true;
package myrpc;
message rpcRequest {
int32 a = 1;
int32 b = 2;
}
message rpcResponse {
string result = 1;
}
service rpcService {
rpc add(rpcRequest) returns (rpcResponse);
}
接下来是c++文件的一些说明
google::protobuf::Service
对应.proto里面的Servicegoogle::protobuf::ServiceDescriptor
类似于Iterator,可以遍历service里面含的function()google::protobuf::MethodDescriptor
对应.proto里面service{}内的functiongoogle::protobuf::Message
对应.proto里面的Message,这是所有message的祖宗。(rpcRequest和rpcResponse都来自于它)
cpp看 '.proto文件' 详情的方法
// @input : The implement of my rpc servcie.
void showProtobufInfo(google::protobuf::Service *serv) {
cout<<"::Protobuf service Info::"<<endl;
const google::protobuf::ServiceDescriptor *s_descriptor = serv->GetDescriptor();
for (int i=0; i<s_descriptor->method_count(); i++) {
const google::protobuf::MethodDescriptor *method = s_descriptor->method(i);
cout<<"method "<<i<<" => "<<method->full_name()<<endl;
cout<<"\t Input: "<<serv->GetRequestPrototype(method).GetTypeName()<<endl;
cout<<"\tOutput: "<<serv->GetResponsePrototype(method).GetTypeName()<<endl;
}
cout<<endl;
}
cpp客户端缩略代码
class rpcChannel : public google::protobuf::RpcChannel {
void CallMethod(method, request, response) {
nn_send({method.id+request}, response);
nn_recv(response);
};
};
int main() {
nn_socket(AF_SP, NN_REQ);
nn_connect(fd, "ipc://rpc_address.proto");
myrpc::rpcService_Stub stub( new rpcChannel() );
protobuf::Message ask={1,2},ans;
stub.add(&ask, &ans);
cout<<ans.result()<<endl;
return 0;
}
cpp服务端缩略代码
class rpcWorker : public myrpc::rpcService {
void add(request, response) {
response->set_result(request.a + request.b);
};
};
int main() {
int fd = nn_socket(AF_SP, NN_REP);
int ret = nn_bind(fd, "ipc://rpc_address.proto");
ServiceDescriptor *descriptor = new rpcWorker()->GetDescriptor();
while (true) {
nn_recv(buffer);
buffer.split_into(method_id, input_argument);
MethodDescriptor *func = descriptor->method(method_id);
descriptor->CallMethod( func, input_argument, result);
nn_send(result);
}
return 0;
}
完整代码见附件
How to build (mac平台可能不需要-pthread)
#!/bin/sh
protoc --cpp_out . rpcInterface.proto
g++ -std=c++14 server.cpp rpcInterface.pb.cc -o serv -lnanomsg -lprotobuf -pthread
g++ -std=c++14 client.cpp rpcInterface.pb.cc -o client -lnanomsg -lprotobuf -pthread
client.cpp
server.cpp
rpcInterface.proto
第二章 Closure
好用的东西从这里开始,建议先看一些bind的花式玩法
https://my.oschina.net/hevakelcj/blog/114440
http://www.cnblogs.com/ttss/p/4100917.html
上文中,closure从来就没用过,google文档中提到,我们可以在rpc运行前,或调用结束后调用它。就是一个回调函数bind了一下,也可以借此实现异步rpc,但更字面的一个功能是:一次远程调用,多次返回。
今晚看了baidu-sofa-pbrpc的源码(https://github.com/baidu/sofa-pbrpc),突然理解了它,其实很简单,待我一一说来。
- 参照上文,可以见得上文的block-synchronize的rpc,仅仅占用nanomsg的一个“通道”(例如一个ipc文件)。
- 从表面上看的现象是:服务端Done->Run()可以在任意时刻调用,当服务端调用以后,客户端的Closure里写的函数就得运行。
- 所以我们得为“closure Done”单独开一条通路,下文假设为
ipc://async.ipc
。 - 干货开始了,在客户端的closure构造之初,客户端便监听
ipc://async.ipc
,看看有没有发来的消息。 - 服务端
myservice_bundle->CallMethod(method, nullptr, input_argument, result, IMPORTANT_closure);
时候,在“IMPORTANT_closure”里面写网络发送函数,当Done->Run()
运行时候,会执行IMPORTANT_closure的回调函数,回调函数的内容是“向ipc://async.ipc
里面发包”。 - 举例,一种实现方式的细节:发数据时候,我们可以从“某内存”读数据,巧妙运用bind嘛,把内存地址绑在上面,然后执行nn_send时候从“某内存”里面把东西读出来就好啦。
- 回过头来我们看,写成Closure,而不直接是std::functional简直是太明智了!避开重复发包函数,还可以强行扩展,达到“一次rpc,n次返回”!(调用一次Done->Run()就返回一次呗)