Codis In-depth Analysis


codis in-depth analysis

Architecture

Codis由以下组件组成:

Codis-Server

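Codis-Server 基于 Redis 修改而来。Codis 采用预分片(Pre-Sharding)机制:事先将整个 key 空间固定划分为 1024 个 slot,slot 的路由信息保存在 ZooKeeper/Etcd 中(也可以使用本地文件系统,见下文存储层接口);ZooKeeper/Etcd 同时维护 Codis Server Group 信息,并提供分布式锁等服务。Codis 对 Redis 源码 server.c 的修改主要是在命令表中注册了一组维护 slot 的命令(实现见 slot.c):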

/* Slot-maintenance commands that Codis registers in the Redis command table
 * (server.c); the command implementations live in slot.c. Each entry follows
 * the bundled Redis version's struct redisCommand layout -- TODO confirm the
 * exact field order against that Redis version. */
{"slotsinfo",slotsinfoCommand,-1,"rF",0,NULL,0,0,0,0,0},
{"slotsscan",slotsscanCommand,-3,"rR",0,NULL,0,0,0,0,0},
{"slotsdel",slotsdelCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"slotsmgrtslot",slotsmgrtslotCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrtone",slotsmgrtoneCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrttagslot",slotsmgrttagslotCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrttagone",slotsmgrttagoneCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotshashkey",slotshashkeyCommand,-1,"rF",0,NULL,0,0,0,0,0},
{"slotscheck",slotscheckCommand,0,"r",0,NULL,0,0,0,0,0},
{"slotsrestore",slotsrestoreCommand,-4,"awm",0,NULL,1,1,1,0,0},

Codis-Dashboard

pkg/topom/topom_api.go 通过 martini 路由统一注册了面向 proxy、group、slots、sentinels 四类资源的 HTTP 管理接口:

// All dashboard management endpoints are mounted under /api/topom.
// Most routes carry an :xauth path segment (presumably an auth token verified
// by the api.* handler -- confirm there); /model and the /info routes do not.
// NOTE(review): martini matches routes in registration order, so keep the
// order of these registrations stable.
r.Group("/api/topom", func(r martini.Router) {
		r.Get("/model", api.Model)
		r.Get("/xping/:xauth", api.XPing)
		r.Get("/stats/:xauth", api.Stats)
		r.Get("/slots/:xauth", api.Slots)
		r.Put("/reload/:xauth", api.Reload)
		r.Put("/shutdown/:xauth", api.Shutdown)
		r.Put("/loglevel/:xauth/:value", api.LogLevel)
		// Proxy lifecycle: create / online are addressed by the proxy's
		// admin :addr, reinit / remove by the proxy's :token.
		r.Group("/proxy", func(r martini.Router) {
			r.Put("/create/:xauth/:addr", api.CreateProxy)
			r.Put("/online/:xauth/:addr", api.OnlineProxy)
			r.Put("/reinit/:xauth/:token", api.ReinitProxy)
			r.Put("/remove/:xauth/:token/:force", api.RemoveProxy)
		})
		// Server-group management: group create/remove/resync, adding,
		// deleting and promoting servers, and replica-group toggles.
		r.Group("/group", func(r martini.Router) {
			r.Put("/create/:xauth/:gid", api.CreateGroup)
			r.Put("/remove/:xauth/:gid", api.RemoveGroup)
			r.Put("/resync/:xauth/:gid", api.ResyncGroup)
			r.Put("/resync-all/:xauth", api.ResyncGroupAll)
			r.Put("/add/:xauth/:gid/:addr", api.GroupAddServer)
			// Same handler as the route above, with an extra :datacenter segment.
			r.Put("/add/:xauth/:gid/:addr/:datacenter", api.GroupAddServer)
			r.Put("/del/:xauth/:gid/:addr", api.GroupDelServer)
			r.Put("/promote/:xauth/:gid/:addr", api.GroupPromoteServer)
			r.Put("/replica-groups/:xauth/:gid/:addr/:value", api.EnableReplicaGroups)
			r.Put("/replica-groups-all/:xauth/:value", api.EnableReplicaGroupsAll)
			// Sync actions keyed by a single server address.
			r.Group("/action", func(r martini.Router) {
				r.Put("/create/:xauth/:addr", api.SyncCreateAction)
				r.Put("/remove/:xauth/:addr", api.SyncRemoveAction)
			})
			r.Get("/info/:addr", api.InfoServer)
		})
		r.Group("/slots", func(r martini.Router) {
			// Per-slot actions (presumably moving slot :sid, or the range
			// :beg..:end, to group :gid -- confirm in the handlers), plus
			// global action interval / disable switches.
			r.Group("/action", func(r martini.Router) {
				r.Put("/create/:xauth/:sid/:gid", api.SlotCreateAction)
				r.Put("/create-range/:xauth/:beg/:end/:gid", api.SlotCreateActionRange)
				r.Put("/remove/:xauth/:sid", api.SlotRemoveAction)
				r.Put("/interval/:xauth/:value", api.SetSlotActionInterval)
				r.Put("/disabled/:xauth/:value", api.SetSlotActionDisabled)
			})
			// Bulk assignment: the JSON request body is decoded into
			// []*models.SlotMapping by binding.Json before the handler runs.
			r.Put("/assign/:xauth", binding.Json([]*models.SlotMapping{}), api.SlotsAssignGroup)
			r.Put("/assign/:xauth/offline", binding.Json([]*models.SlotMapping{}), api.SlotsAssignOffline)
			r.Put("/rebalance/:xauth/:confirm", api.SlotsRebalance)
		})
		// Sentinel management and inspection.
		r.Group("/sentinels", func(r martini.Router) {
			r.Put("/add/:xauth/:addr", api.AddSentinel)
			r.Put("/del/:xauth/:addr/:force", api.DelSentinel)
			r.Put("/resync-all/:xauth", api.ResyncSentinels)
			r.Get("/info/:addr", api.InfoSentinel)
			r.Get("/info/:addr/monitored", api.InfoSentinelMonitored)
		})
	})

各接口最终调用 Topom 上对应资源的具体实现来完成操作。以 proxy 为例(pkg/topom/topom_proxy.go):

// CreateProxy registers a new proxy whose admin API listens on addr.
//
// Under s.mu it fetches the proxy's model from addr and checks that the
// proxy accepts this dashboard's xauth (XPing). It rejects a token that is
// already registered and assigns id = current max proxy id + 1. It then
// stores the proxy (storeCreateProxy) and finally re-initializes it through
// reinitProxy (slot table, start, sentinels). Once the duplicate check has
// passed, the proxy cache entry for the token is marked dirty on return.
func (s *Topom) CreateProxy(addr string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	ctx, err := s.newContext()
	if err != nil {
		return err
	}

	p, err := proxy.NewApiClient(addr).Model()
	if err != nil {
		// Keep the underlying cause so the caller can tell e.g. a refused
		// connection apart from a malformed response.
		return errors.Errorf("proxy@%s fetch model failed, %s", addr, err)
	}
	c := s.newProxyClient(p)

	if err := c.XPing(); err != nil {
		return errors.Errorf("proxy@%s check xauth failed, %s", addr, err)
	}
	if ctx.proxy[p.Token] != nil {
		return errors.Errorf("proxy-[%s] already exists", p.Token)
	}
	p.Id = ctx.maxProxyId() + 1

	defer s.dirtyProxyCache(p.Token)

	if err := s.storeCreateProxy(p); err != nil {
		return err
	}
	return s.reinitProxy(ctx, p, c)
}

// OnlineProxy brings the proxy whose admin API listens on addr online.
//
// Under s.mu it fetches the proxy's model from addr and checks that the
// proxy accepts this dashboard's xauth (XPing).
//   - If the token is already registered, the proxy keeps its existing id
//     and is stored via storeUpdateProxy.
//   - Otherwise it gets id = current max proxy id + 1 and is stored via
//     storeCreateProxy.
//
// In both cases the proxy is then re-initialized through reinitProxy. After
// a successful XPing, the proxy cache entry for the token is marked dirty
// on return.
func (s *Topom) OnlineProxy(addr string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	ctx, err := s.newContext()
	if err != nil {
		return err
	}

	p, err := proxy.NewApiClient(addr).Model()
	if err != nil {
		// Keep the underlying cause so the caller can tell e.g. a refused
		// connection apart from a malformed response.
		return errors.Errorf("proxy@%s fetch model failed, %s", addr, err)
	}
	c := s.newProxyClient(p)

	if err := c.XPing(); err != nil {
		return errors.Errorf("proxy@%s check xauth failed, %s", addr, err)
	}
	defer s.dirtyProxyCache(p.Token)

	if d := ctx.proxy[p.Token]; d != nil {
		// Known token: reuse the previously assigned id.
		p.Id = d.Id
		if err := s.storeUpdateProxy(p); err != nil {
			return err
		}
	} else {
		p.Id = ctx.maxProxyId() + 1
		if err := s.storeCreateProxy(p); err != nil {
			return err
		}
	}
	return s.reinitProxy(ctx, p, c)
}

// RemoveProxy shuts down the registered proxy identified by token and then
// removes it from the store.
//
// If the shutdown request fails, the failure is logged. Removal then only
// proceeds when force is true; otherwise an error is returned and nothing
// is removed. The proxy cache entry is marked dirty only once removal is
// attempted.
func (s *Topom) RemoveProxy(token string, force bool) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	ctx, err := s.newContext()
	if err != nil {
		return err
	}
	p, err := ctx.getProxy(token)
	if err != nil {
		return err
	}

	shutdownErr := s.newProxyClient(p).Shutdown()
	if shutdownErr != nil {
		log.WarnErrorf(shutdownErr, "proxy-[%s] shutdown failed, force remove = %t", token, force)
		if !force {
			return errors.Errorf("proxy-[%s] shutdown failed", p.Token)
		}
	}

	defer s.dirtyProxyCache(p.Token)
	return s.storeRemoveProxy(p)
}

// ReinitProxy looks up the registered proxy identified by token and runs
// reinitProxy against it (slot table, start, sentinels), all under s.mu.
// A token that ctx.getProxy cannot resolve is reported as that lookup error.
func (s *Topom) ReinitProxy(token string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	ctx, err := s.newContext()
	if err != nil {
		return err
	}
	p, err := ctx.getProxy(token)
	if err != nil {
		return err
	}
	return s.reinitProxy(ctx, p, s.newProxyClient(p))
}

// newProxyClient builds an admin API client for p.AdminAddr. Its xauth is
// set from this dashboard's product name and product auth plus the proxy's
// own token.
func (s *Topom) newProxyClient(p *models.Proxy) *proxy.ApiClient {
	client := proxy.NewApiClient(p.AdminAddr)
	client.SetXAuth(s.config.ProductName, s.config.ProductAuth, p.Token)
	return client
}

// reinitProxy pushes the current configuration to proxy p through client c.
// The steps run in this order:
//  1. fill every slot known to ctx,
//  2. start the proxy,
//  3. hand it the sentinel list.
//
// The first failing step aborts the sequence. Its cause is logged, and a
// summary error naming the step is returned.
func (s *Topom) reinitProxy(ctx *context, p *models.Proxy, c *proxy.ApiClient) error {
	log.Warnf("proxy-[%s] reinit:\n%s", p.Token, p.Encode())

	slotTable := ctx.toSlotSlice(ctx.slots, p)
	if err := c.FillSlots(slotTable...); err != nil {
		log.ErrorErrorf(err, "proxy-[%s] fillslots failed", p.Token)
		return errors.Errorf("proxy-[%s] fillslots failed", p.Token)
	}

	if err := c.Start(); err != nil {
		log.ErrorErrorf(err, "proxy-[%s] start failed", p.Token)
		return errors.Errorf("proxy-[%s] start failed", p.Token)
	}

	if err := c.SetSentinels(ctx.sentinel); err != nil {
		log.ErrorErrorf(err, "proxy-[%s] set sentinels failed", p.Token)
		return errors.Errorf("proxy-[%s] set sentinels failed", p.Token)
	}

	return nil
}

// resyncSlotMappingsByGroupId pushes, to every proxy, the slot mappings that
// ctx reports for group gid.
func (s *Topom) resyncSlotMappingsByGroupId(ctx *context, gid int) error {
	groupSlots := ctx.getSlotMappingsByGroupId(gid)
	return s.resyncSlotMappings(ctx, groupSlots...)
}

// resyncSlotMappings sends the given slot mappings to every proxy in ctx.
// Each proxy is handled in its own goroutine, and the call waits for all of
// them. Every individual failure is logged. If any proxy failed, an error
// naming one failed proxy token is returned. Calling it with no slots is a
// no-op.
func (s *Topom) resyncSlotMappings(ctx *context, slots ...*models.SlotMapping) error {
	if len(slots) == 0 {
		return nil
	}

	var fut sync2.Future
	for _, p := range ctx.proxy {
		fut.Add()
		go func(target *models.Proxy) {
			client := s.newProxyClient(target)
			err := client.FillSlots(ctx.toSlotSlice(slots, target)...)
			if err != nil {
				log.ErrorErrorf(err, "proxy-[%s] resync slots failed", target.Token)
			}
			fut.Done(target.Token, err)
		}(p)
	}

	results := fut.Wait()
	for token, v := range results {
		// A nil result does not satisfy the error interface, so only real
		// failures get past this check.
		if err, ok := v.(error); ok && err != nil {
			return errors.Errorf("proxy-[%s] resync slots failed", token)
		}
	}
	return nil
}

存储层接口(pkg/models/client.go)。NewClient 根据 coordinator 名称选择 ZooKeeper、Etcd 或本地文件系统实现:

// Client is the storage abstraction the dashboard uses for cluster metadata.
// NewClient returns a ZooKeeper-, etcd- or filesystem-backed implementation
// depending on the coordinator name.
type Client interface {
	// Create writes data at path.
	Create(path string, data []byte) error
	// Update replaces the data stored at path.
	Update(path string, data []byte) error
	// Delete removes path.
	Delete(path string) error

	// Read returns the data stored at path.
	// NOTE(review): must presumably turns a missing path into an error --
	// confirm against the implementations.
	Read(path string, must bool) ([]byte, error)
	// List returns the entries under path (same must semantics as Read).
	List(path string, must bool) ([]string, error)

	// WatchInOrder returns a notification channel together with the entries
	// currently under path.
	WatchInOrder(path string) (<-chan struct{}, []string, error)

	// CreateEphemeral creates a node that presumably disappears with the
	// session; the returned channel signals its loss -- confirm.
	CreateEphemeral(path string, data []byte) (<-chan struct{}, error)
	// CreateEphemeralInOrder is like CreateEphemeral and also returns the
	// path that was actually created.
	CreateEphemeralInOrder(path string, data []byte) (<-chan struct{}, string, error)

	// Close releases the client.
	Close() error
}

// NewClient returns the storage Client implementation for coordinator:
//   - "zk" or "zookeeper" selects the ZooKeeper client,
//   - "etcd" selects the etcd client,
//   - "fs" or "filesystem" selects the filesystem client (timeout is unused
//     for this one).
//
// addrlist is forwarded to the chosen implementation. Any other coordinator
// name yields an error.
func NewClient(coordinator string, addrlist string, timeout time.Duration) (Client, error) {
	switch coordinator {
	case "zk", "zookeeper":
		return zkclient.New(addrlist, timeout)
	case "etcd":
		return etcdclient.New(addrlist, timeout)
	case "fs", "filesystem":
		return fsclient.New(addrlist)
	default:
		return nil, errors.Errorf("invalid coordinator name = %s", coordinator)
	}
}

Basic Usage

参考 example/setup.py 中在本地搭建一套 Codis 集群(codis-server、sentinel、fe、dashboard、proxy)的步骤:

...
if __name__ == "__main__":
    children = []
    # kill_all is registered with atexit on the list of spawned children.
    atexit.register(kill_all, children)

    product_name = "demo-test"
    product_auth = None

    # Cluster layout. Every loop below derives its ports and slot ranges from
    # these values, so changing one number keeps the servers, groups,
    # sentinels and slot assignments in agreement.
    dashboard_port = 18080
    group_count = 4             # one master/slave codis-server pair per group
    master_base_port = 16380    # master of group i listens on 16380 + i
    slave_port_offset = 1000    # its slave listens on master port + 1000
    sentinel_count = 5
    sentinel_base_port = 26380
    proxy_count = 4
    max_slot_num = 1024         # Codis pre-shards the key space into 1024 slots

    # step 1. codis-server & codis-sentinel

    # codis-server [master 16380+i <== following == 17380+i slave]
    for i in range(group_count):
        master_port = master_base_port + i
        children.append(CodisServer(master_port, requirepass=product_auth))
        children.append(CodisServer(master_port + slave_port_offset, master_port, requirepass=product_auth))

    for i in range(sentinel_count):
        children.append(CodisSentinel(sentinel_base_port + i))

    check_alive(children, 1)
    print("[OK] setup codis-server & codis-sentinel")

    # step 2. setup codis-fe & codis-dashboard & codis-proxy

    children.append(CodisFE(8080, "../cmd/fe/assets"))
    children.append(CodisDashboard(dashboard_port, product_name, product_auth))

    for i in range(proxy_count):
        children.append(CodisProxy(11080 + i, 19000 + i, product_name, product_auth))

    check_alive(children, 3)
    print("[OK] setup codis-fe & codis-dashboard & codis-proxy")

    # step 3. init slot-mappings: split [0, max_slot_num) into contiguous
    # ranges, one per group. The last group absorbs any remainder, so every
    # slot is assigned even when max_slot_num is not divisible by group_count.

    slots_per_group = max_slot_num // group_count
    for i in range(group_count):
        gid = i + 1
        master_port = master_base_port + i
        slave_port = master_port + slave_port_offset
        codis_admin_dashboard(dashboard_port, "--create-group --gid={}".format(gid))
        codis_admin_dashboard(dashboard_port, "--group-add --gid={} --addr=127.0.0.1:{} --datacenter=localhost".format(gid, master_port))
        codis_admin_dashboard(dashboard_port, "--group-add --gid={} --addr=127.0.0.1:{} --datacenter=localhost".format(gid, slave_port))
        beg = i * slots_per_group
        end = max_slot_num - 1 if i == group_count - 1 else (i + 1) * slots_per_group - 1
        codis_admin_dashboard(dashboard_port, "--slots-assign --beg={} --end={} --gid={} --confirm".format(beg, end, gid))
        codis_admin_dashboard(dashboard_port, "--resync-group --gid={}".format(gid))

    for i in range(sentinel_count):
        codis_admin_dashboard(dashboard_port, "--sentinel-add --addr=127.0.0.1:{}".format(sentinel_base_port + i))

    codis_admin_dashboard(dashboard_port, "--slot-action --interval=100")
    codis_admin_dashboard(dashboard_port, "--sentinel-resync")

    check_alive(children, 3)
    print("[OK] done & have fun!!!")

    # Keep the parent process running, printing a timestamp every 5 seconds.
    while True:
        print(datetime.datetime.now())
        time.sleep(5)