thrift 最初是 facebook 开发使用的 rpc 通信框架,后来贡献给了 apache 基金会,出来得比较早,几乎支持所有的后端语言,使用非常广泛,是不可不知的一个网络框架。
和 grpc 一样,需要先定义通信协议,然后实现自己业务逻辑。
- 安装golang的thrift协议包
go get git.apache.org/thrift.git/lib/go/thrift
- 定义协议文件
文件名为RpcService.thrift
namespace go demo.rpc
// 测试服务
service RpcService {
// 发起远程调用list<string> funCall(1:i64 callTime, 2:string funCode, 3:map<string, string> paramMap),}
当函数funCall的参数总想包含自己设计的结构时,需要自己提前设计,例如
namespace go demo.rpc
struct EchoReq {
1: string msg;
}
// 测试服务
service RpcService {
// 发起远程调用list<string> funCall(1: EchoReq req),
}
- 生成开发库
在.thrift文件路径下执行命令
thrift -gen go RpcService.thrift
生成
其中 constants.go、rpc_service.go、ttypes.go 是协议库(自行参考),编写程序需要用到。rpc_service-remote.go 是自动生成的例程,可以不用。
ttypes.go 文件是结构体类型协议
rpcservice.go 文件是函数类型协议
constants.go 文件是常量类型协议(未使用)
- 服务端实现
package main
import ("fmt""gen-go/demo/rpc""git.apache.org/thrift.git/lib/go/thrift""os"
)const (NetworkAddr = "127.0.0.1:19090"
)type RpcServiceImpl struct {
}func (this *RpcServiceImpl) FunCall(callTime int64, funCode string, paramMap map[string]string) (r []string, err error) {
fmt.Println("-->FunCall:", callTime, funCode, paramMap)for k, v := range paramMap {
r = append(r, k+v)}return
}
func main() {
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()serverTransport, err := thrift.NewTServerSocket(NetworkAddr)if err != nil {
fmt.Println("Error!", err)os.Exit(1)}handler := &RpcServiceImpl{
}processor := rpc.NewRpcServiceProcessor(handler)server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)fmt.Println("thrift server in", NetworkAddr)server.Serve()
}
注意导入包的路径
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
serverTransport, err := thrift.NewTServerSocket(NetworkAddr)
server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
以上函数都是由thrift库提供的函数
func NewRpcServiceProcessor(handler RpcService) *RpcServiceProcessor {self2 := &RpcServiceProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}self2.processorMap["funCall"] = &rpcServiceProcessorFunCall{handler: handler}return self2
}
thrift通过key-value保存了我们实际将要运行的函数,最终通过handler来执行。
server.Serve()启动服务
监听并处理
func (p *TSimpleServer) Serve() error {
err := p.Listen()if err != nil {
return err}p.AcceptLoop()return nil
}
func (p *TSimpleServer) AcceptLoop() error {
p.processRequests(client)
}
processor.Process(inputProtocol, outputProtocol)//接受信息和输出信息
利用go语言的接口实现多态执行对应函数
return processor.Process(seqId, iprot, oprot)
retval, err2 = p.handler.FunCall(args.CallTime, args.FunCode, args.ParamMap);
调用服务端接口
执行
func (this *RpcServiceImpl) FunCall(callTime int64, funCode string, paramMap map[string]string) (r []string, err error) {
fmt.Println("-->FunCall:", callTime, funCode, paramMap)for k, v := range paramMap {
r = append(r, k+v)}time.Sleep(10*time.Second)return
}
- 客户端实现
package mainimport ("fmt""gen-go/demo/rpc""git.apache.org/thrift.git/lib/go/thrift""net""os""time"
)func main() {
startTime := currentTimeMillis()transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()transport, err := thrift.NewTSocket(net.JoinHostPort("127.0.0.1", "19090"))if err != nil {
fmt.Fprintln(os.Stderr, "error resolving address:", err)os.Exit(1)}useTransport := transportFactory.GetTransport(transport)client := rpc.NewRpcServiceClientFactory(useTransport, protocolFactory)if err := transport.Open(); err != nil {
fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:19090", " ", err)os.Exit(1)}defer transport.Close()for i := 0; i < 1000; i++ {
paramMap := make(map[string]string)paramMap["name"] = "qinerg"paramMap["passwd"] = "123456"r1, e1 := client.FunCall(currentTimeMillis(), "login", paramMap)fmt.Println(i, "Call->", r1, e1)}endTime := currentTimeMillis()fmt.Println("Program exit. time->", endTime, startTime, (endTime - startTime))
}// 转换成毫秒
func currentTimeMillis() int64 {
return time.Now().UnixNano() / 1000000
}
r1, e1 := client.FunCall(currentTimeMillis(), “login”, paramMap)
rpcservice.go 中
func (p *RpcServiceClient) FunCall(callTime int64, funCode string, paramMap map[string]string) (r []string, err error) {
if err = p.sendFunCall(callTime, funCode, paramMap); err != nil {
return}return p.recvFunCall()