go micro code analysis
go micro code analysis
Overview
The main piece of software provided is Micro, a microservice toolkit.
The toolkit is composed of the following components:
Go Micro - A pluggable RPC framework for writing microservices in Go. It provides libraries for service discovery, client side load balancing, encoding, synchronous and asynchronous communication.
API - An API Gateway that serves HTTP and routes requests to appropriate micro services. It acts as a single entry point and can either be used as a reverse proxy or translate HTTP requests to RPC.
Sidecar - A language agnostic RPC proxy with all the features of go-micro as HTTP endpoints. While Go is a great language for building microservices, you may also want to use other languages, so the Sidecar provides a way to integrate your other apps into the Micro world.
Web - A web dashboard and reverse proxy for micro web applications. We believe that web apps should be built as microservices and therefore treated as a first class citizen in a microservice world. It behaves much the like the API reverse proxy but also includes support for web sockets.
CLI - A straight forward command line interface to interact with your micro services. It also allows you to leverage the Sidecar as a proxy where you may not want to directly connect to the service registry.
Bot - A Hubot style bot that sits inside your microservices platform and can be interacted with via Slack, HipChat, XMPP, etc. It provides the features of the CLI via messaging. Additional commands can be added to automate common ops tasks.
Note: Go-micro is a standalone library and can be used independent of the rest of the toolkit.
Usage
The top level Service interface is the main component for building a service. It wraps all the underlying packages of Go Micro into a single convenient interface.
type Service interface {
Init(...Option)
Options() Options
Client() client.Client
Server() server.Server
Run() error
String() string
}
Define Api Interface
use protobuf files to define the service API interface. This is a very convenient way to strictly define the API and provide concrete types for both the server and a client.
e.g:greeter.proto
syntax = "proto3";
service Greeter {
rpc Hello(HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string greeting = 2;
}
use protoc and protoc-gen-go to generate the concrete go implementation for this definition.It’s done via a protobuf plugin which requires a fork of golang/protobuf that can be found here micro/protobuf
go get github.com/micro/protobuf/{proto,protoc-gen-go}
protoc --go_out=plugins=micro:. greeter.proto
Write Service
package main
import (
"log"
"github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
"golang.org/x/net/context"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
service := micro.NewService(
micro.Name("greeter"),
micro.Version("latest"),
)
service.Init()
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
if err := service.Run(); err != nil {
log.Fatal(err)
}
}
Write Client
// create the greeter client using the service name and client
greeter := proto.NewGreeterClient("greeter", service.Client())
// request the Hello method on the Greeter handler
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{
Name: "John",
})
if err != nil {
fmt.Println(err)
return
}
fmt.Println(rsp.Greeter)
Go Micro Analysis
Architecture
Go-micro is a standalone RPC framework for microservices. It’s at the core of the toolkit and leveraged by all the above components. Here we’ll look at each individual feature of go-micro.
Registry
The registry provides a pluggable service discovery library to find running services. Current implementations are consul, etcd, memory and kubernetes. The interface is easily implemented if your preferences differ.
Selector
The selector provides a load balancing mechanism via selection. When a client makes a request to a service it will first query the registry for the service. This usually returns a list of running nodes representing the service. A selector will select one of these nodes to be used for querying. Multiple calls to the selector will allow balancing algorithms to be utilised. Current methods are round robin, random hashed and blacklist.
Broker
The broker is pluggable interface for pub/sub. Microservices are an event driven architecture where a publishing and subscribing to events should be a first class citizen. Current implementations include nats, rabbitmq and http (for development).
Transport
Transport is a pluggable interface over point to point transfer of messages. Current implementations are http, rabbitmq and nats. By providing this abstraction, transports can be swapped out seamlessly.
Client
The client provides a way to make RPC queries. It combines the registry, selector, broker and transport. It also provides retries, timeouts, use of context, etc.
Server
The server is an interface to build a running microservice. It provides a way of serving RPC requests.
Source Code Explaination
register service handler
func (s *rpcServer) Handle(h Handler) error {
s.Lock()
defer s.Unlock()
if err := s.rpc.register(h.Handler()); err != nil {
return err
}
s.handlers[h.Name()] = h
return nil
}
func (server *server) register(rcvr interface{}) error {
server.mu.Lock()
defer server.mu.Unlock()
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
}
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if sname == "" {
log.Fatal("rpc: no service name for type", s.typ.String())
}
if !isExported(sname) {
s := "rpc Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
if _, present := server.serviceMap[sname]; present {
return errors.New("rpc: service already defined: " + sname)
}
s.name = sname
s.method = make(map[string]*methodType)
// Install the methods
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
if mt := prepareMethod(method); mt != nil {
s.method[method.Name] = mt
}
}
if len(s.method) == 0 {
s := "rpc Register: type " + sname + " has no exported methods of suitable type"
log.Print(s)
return errors.New(s)
}
server.serviceMap[s.name] = s
return nil
}
Start Service
func (s *service) Start() error {
for _, fn := range s.opts.BeforeStart {
if err := fn(); err != nil {
return err
}
}
if err := s.opts.Server.Start(); err != nil {
return err
}
if err := s.opts.Server.Register(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
}
}
return nil
}
/*serving client request*/
func (s *rpcServer) Start() error {
registerDebugHandler(s)
config := s.Options()
ts, err := config.Transport.Listen(config.Address)
if err != nil {
return err
}
log.Printf("Listening on %s", ts.Addr())
s.Lock()
s.opts.Address = ts.Addr()
s.Unlock()
go ts.Accept(s.accept)
go func() {
ch := <-s.exit
ch <- ts.Close()
config.Broker.Disconnect()
}()
// TODO: subscribe to cruft
return config.Broker.Connect()
}
/*register service to discovery system like consul*/
func (s *rpcServer) Register() error {
// parse address for host, port
config := s.Options()
var advt, host string
var port int
// check the advertise address first
// if it exists then use it, otherwise
// use the address
if len(config.Advertise) > 0 {
advt = config.Advertise
} else {
advt = config.Address
}
parts := strings.Split(advt, ":")
if len(parts) > 1 {
host = strings.Join(parts[:len(parts)-1], ":")
port, _ = strconv.Atoi(parts[len(parts)-1])
} else {
host = parts[0]
}
addr, err := addr.Extract(host)
if err != nil {
return err
}
// register service
node := ®istry.Node{
Id: config.Name + "-" + config.Id,
Address: addr,
Port: port,
Metadata: config.Metadata,
}
...
service := ®istry.Service{
Name: config.Name,
Version: config.Version,
Nodes: []*registry.Node{node},
Endpoints: endpoints,
}
...
if err := config.Registry.Register(service, rOpts...); err != nil {
return err
}
...
}
/*find method name from request params*/
func (server *server) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req, true)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
return
}
// We read the header successfully. If we see an error now,
// we can still recover and move on to the next request.
keepReading = true
serviceMethod := strings.Split(req.ServiceMethod, ".")
if len(serviceMethod) != 2 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
// Look up the request.
server.mu.Lock()
service = server.serviceMap[serviceMethod[0]]
server.mu.Unlock()
if service == nil {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
mtype = service.method[serviceMethod[1]]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
return
}
/*reflect invoking method*/
func (server *server) serveRequest(ctx context.Context, codec serverCodec, ct string) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if !keepReading {
return err
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error(), true)
server.freeRequest(req)
}
return err
}
service.call(ctx, server, sending, mtype, req, argv, replyv, codec, ct)
return nil
}
Client Invoke
/*Client select node which provide service and make call*/
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
...
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the address
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
// make the call
err = rcall(ctx, address, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return err
}
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, request, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
return gerr
}
/*get service node from service name*/
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: r.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}