Client.Call
方法进行同步阻塞调用,该方法的实现如下:func (client *Client) Call(
serviceMethod string, args interface{},
reply interface{},
) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
Client.Go
方法进行一次异步调用,返回一个表示这次调用的Call
结构体。然后等待Call
结构体的Done管道返回调用结果。Client.Go
方法异步调用前面的HelloService服务:func doClientWork(client *rpc.Client) {
helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)
// do some thing
helloCall = <-helloCall.Done
if err := helloCall.Error; err != nil {
log.Fatal(err)
}
args := helloCall.Args.(string)
reply := helloCall.Reply.(string)
fmt.Println(args, reply)
}
Call
变量进行获取。Client.Go
方法实现如下:func (client *Client) Go(
serviceMethod string, args interface{},
reply interface{},
done chan *Call,
) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
call.Done = make(chan *Call, 10) // buffered.
client.send(call)
return call
}
call
变量,然后通过client.send
将call的完整参数发送到RPC框架。client.send
方法调用是线程安全的,因此可以从多个Goroutine同时向同一个RPC链接发送调用指令。call.done
方法通知完成:func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don‘t want to block here. It is the caller‘s responsibility to make
// sure the channel has enough buffer space. See comment in Go().
}
}
Call.done
方法的实现可以得知call.Done
管道会将处理后的call返回。client.send
是线程安全的,我们也可以通过在不同的Goroutine中同时并发阻塞调用RPC方法。通过在一个独立的Goroutine中调用Watch函数进行监控。type KVStoreService struct {
m map[string]string
filter map[string]func(key string)
mu sync.Mutex
}
func NewKVStoreService() *KVStoreService {
return &KVStoreService{
m: make(map[string]string),
filter: make(map[string]func(key string)),
}
}
m
成员是一个map类型,用于存储KV数据。filter
成员对应每个Watch调用时定义的过滤器函数列表。而mu
成员为互斥锁,用于在多个Goroutine访问或修改时对其它成员提供保护。func (p *KVStoreService) Get(key string, value *string) error {
p.mu.Lock()
defer p.mu.Unlock()
if v, ok := p.m[key]; ok {
*value = v
return nil
}
return fmt.Errorf("not found")
}
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
p.mu.Lock()
defer p.mu.Unlock()
key, value := kv[0], kv[1]
if oldValue := p.m[key]; oldValue != value {
for _, fn := range p.filter {
fn(key)
}
}
p.m[key] = value
return nil
}
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
ch := make(chan string, 10) // buffered
p.mu.Lock()
p.filter[id] = func(key string) { ch <- key }
p.mu.Unlock()
select {
case <-time.After(time.Duration(timeoutSecond) * time.Second):
return fmt.Errorf("timeout")
case key := <-ch:
*keyChanged = key
return nil
}
return nil
}
p.filter
列表。func doClientWork(client *rpc.Client) {
go func() {
var keyChanged string
err := client.Call("KVStoreService.Watch", 30, &keyChanged)
if err != nil {
log.Fatal(err)
}
fmt.Println("watch:", keyChanged)
} ()
err := client.Call(
"KVStoreService.Set", [2]string{"abc", "abc-value"},
new(struct{}),
)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second*3)
}
func main() {
rpc.Register(new(HelloService))
for {
// 先拨号获取conn后在进行服务的开启 相当于一个中转
conn, _ := net.Dial("tcp", "localhost:1234")
if conn == nil {
time.Sleep(time.Second)
continue
}
rpc.ServeConn(conn)
conn.Close()
}
}
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
clientChan := make(chan *rpc.Client)
go func() {
for {
// 获取连接对象放入生成的clientChan中
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
clientChan <- rpc.NewClient(conn)
}
}()
// 调用最终执行rpc的环境
doClientWork(clientChan)
}
func doClientWork(clientChan <-chan *rpc.Client) {
client := <-clientChan
defer client.Close()
var reply string
// 调用call
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
type HelloService struct {
conn net.Conn
}
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go func() {
defer conn.Close()
p := rpc.NewServer()
p.Register(&HelloService{conn: conn})
p.ServeConn(conn)
} ()
}
}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
type HelloService struct {
conn net.Conn
isLogin bool
}
func (p *HelloService) Login(request string, reply *string) error {
if request != "user:password" {
return fmt.Errorf("auth failed")
}
log.Println("login ok")
p.isLogin = true
return nil
}
func (p *HelloService) Hello(request string, reply *string) error {
if !p.isLogin {
return fmt.Errorf("please login")
}
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
原文:https://www.cnblogs.com/binHome/p/13054657.html