Codis In-depth Analysis
Architecture
Codis consists of the following components:
Codis Server: developed on a fork of Redis, with extra data structures added to support slot-related operations and the data-migration commands. See the document on the Redis modifications for the details.
Codis Proxy: the Redis proxy service that clients connect to; it implements the Redis protocol. Apart from a small set of unsupported commands (see the list of unsupported commands), it behaves exactly like native Redis, much as Twemproxy does; a minimal connection sketch follows this list.
- Multiple codis-proxy instances can be deployed for the same cluster;
- codis-dashboard keeps the state of the different codis-proxy instances in sync.
Codis Dashboard: the cluster management tool. It supports adding and removing codis-proxy and codis-server instances, as well as data migration. Whenever the cluster state changes, codis-dashboard keeps the state of all codis-proxy instances consistent.
- For a given cluster, there is at most one codis-dashboard running at any moment;
- All modifications to the cluster must go through codis-dashboard.
Codis Admin: the command-line tool for cluster management.
- It can be used to control the state of codis-proxy and codis-dashboard, and to access the external storage directly.
Codis FE: the cluster management web UI.
- Multiple cluster instances can share a single front-end page;
- The list of backend codis-dashboard instances is managed through a configuration file, which can be updated automatically.
Codis HA: provides high availability for the cluster.
- It depends on a codis-dashboard instance and automatically polls the status of every component in the cluster;
- Based on the current cluster state it generates a master-slave failover plan and, when necessary, performs the switchover through codis-dashboard.
Storage: external storage for the cluster state.
- It provides a Namespace concept: different clusters are organized under different product names;
- ZooKeeper and Etcd implementations are provided (plus a simple filesystem backend, as the NewClient code below shows), and the abstract interface makes it straightforward to add others.
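Because codis-proxy implements the Redis protocol, any existing Redis client can use it without modification. A minimal sketch that sends a raw RESP PING; the proxy address 127.0.0.1:19000 is an assumption, taken from the example setup further below:

package main

import (
    "bufio"
    "fmt"
    "net"
)

func main() {
    // Talk to codis-proxy exactly as if it were a plain Redis server.
    conn, err := net.Dial("tcp", "127.0.0.1:19000")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // PING encoded as a RESP array with a single bulk string.
    fmt.Fprint(conn, "*1\r\n$4\r\nPING\r\n")
    reply, _ := bufio.NewReader(conn).ReadString('\n')
    fmt.Println(reply) // expect "+PONG"
}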
Codis-Server
Codis uses pre-sharding: the key space is statically divided into 1024 slots. The routing information is stored in ZooKeeper/Etcd, which also keeps the Codis Server Group metadata and provides services such as distributed locking. The changes to server.c in the Redis source mainly register the slot-maintenance commands; see slot.c:
{"slotsinfo",slotsinfoCommand,-1,"rF",0,NULL,0,0,0,0,0},
{"slotsscan",slotsscanCommand,-3,"rR",0,NULL,0,0,0,0,0},
{"slotsdel",slotsdelCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"slotsmgrtslot",slotsmgrtslotCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrtone",slotsmgrtoneCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrttagslot",slotsmgrttagslotCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrttagone",slotsmgrttagoneCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotshashkey",slotshashkeyCommand,-1,"rF",0,NULL,0,0,0,0,0},
{"slotscheck",slotscheckCommand,0,"r",0,NULL,0,0,0,0,0},
{"slotsrestore",slotsrestoreCommand,-4,"awm",0,NULL,1,1,1,0,0},
Codis-Dashboard
pkg/topom/topom_api.go provides the unified maintenance API for proxies, groups, slots, and sentinels:
r.Group("/api/topom", func(r martini.Router) {
r.Get("/model", api.Model)
r.Get("/xping/:xauth", api.XPing)
r.Get("/stats/:xauth", api.Stats)
r.Get("/slots/:xauth", api.Slots)
r.Put("/reload/:xauth", api.Reload)
r.Put("/shutdown/:xauth", api.Shutdown)
r.Put("/loglevel/:xauth/:value", api.LogLevel)
r.Group("/proxy", func(r martini.Router) {
r.Put("/create/:xauth/:addr", api.CreateProxy)
r.Put("/online/:xauth/:addr", api.OnlineProxy)
r.Put("/reinit/:xauth/:token", api.ReinitProxy)
r.Put("/remove/:xauth/:token/:force", api.RemoveProxy)
})
r.Group("/group", func(r martini.Router) {
r.Put("/create/:xauth/:gid", api.CreateGroup)
r.Put("/remove/:xauth/:gid", api.RemoveGroup)
r.Put("/resync/:xauth/:gid", api.ResyncGroup)
r.Put("/resync-all/:xauth", api.ResyncGroupAll)
r.Put("/add/:xauth/:gid/:addr", api.GroupAddServer)
r.Put("/add/:xauth/:gid/:addr/:datacenter", api.GroupAddServer)
r.Put("/del/:xauth/:gid/:addr", api.GroupDelServer)
r.Put("/promote/:xauth/:gid/:addr", api.GroupPromoteServer)
r.Put("/replica-groups/:xauth/:gid/:addr/:value", api.EnableReplicaGroups)
r.Put("/replica-groups-all/:xauth/:value", api.EnableReplicaGroupsAll)
r.Group("/action", func(r martini.Router) {
r.Put("/create/:xauth/:addr", api.SyncCreateAction)
r.Put("/remove/:xauth/:addr", api.SyncRemoveAction)
})
r.Get("/info/:addr", api.InfoServer)
})
r.Group("/slots", func(r martini.Router) {
r.Group("/action", func(r martini.Router) {
r.Put("/create/:xauth/:sid/:gid", api.SlotCreateAction)
r.Put("/create-range/:xauth/:beg/:end/:gid", api.SlotCreateActionRange)
r.Put("/remove/:xauth/:sid", api.SlotRemoveAction)
r.Put("/interval/:xauth/:value", api.SetSlotActionInterval)
r.Put("/disabled/:xauth/:value", api.SetSlotActionDisabled)
})
r.Put("/assign/:xauth", binding.Json([]*models.SlotMapping{}), api.SlotsAssignGroup)
r.Put("/assign/:xauth/offline", binding.Json([]*models.SlotMapping{}), api.SlotsAssignOffline)
r.Put("/rebalance/:xauth/:confirm", api.SlotsRebalance)
})
r.Group("/sentinels", func(r martini.Router) {
r.Put("/add/:xauth/:addr", api.AddSentinel)
r.Put("/del/:xauth/:addr/:force", api.DelSentinel)
r.Put("/resync-all/:xauth", api.ResyncSentinels)
r.Get("/info/:addr", api.InfoSentinel)
r.Get("/info/:addr/monitored", api.InfoSentinelMonitored)
})
})
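Most of these routes take an xauth path parameter for authentication; /model does not, so the current topology can be inspected with a bare HTTP GET. A small sketch against an assumed dashboard admin address of 127.0.0.1:18080, the port used by the setup example below:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // GET /api/topom/model needs no xauth (see the route table above).
    resp, err := http.Get("http://127.0.0.1:18080/api/topom/model")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Println(string(body)) // JSON-encoded topom model
}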
The actual operations are then carried out by the resource-specific implementations, e.g. pkg/topom/topom_proxy.go:
func (s *Topom) CreateProxy(addr string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    p, err := proxy.NewApiClient(addr).Model()
    if err != nil {
        return errors.Errorf("proxy@%s fetch model failed", addr)
    }
    c := s.newProxyClient(p)
    if err := c.XPing(); err != nil {
        return errors.Errorf("proxy@%s check xauth failed", addr)
    }
    if ctx.proxy[p.Token] != nil {
        return errors.Errorf("proxy-[%s] already exists", p.Token)
    } else {
        p.Id = ctx.maxProxyId() + 1
    }
    defer s.dirtyProxyCache(p.Token)
    if err := s.storeCreateProxy(p); err != nil {
        return err
    } else {
        return s.reinitProxy(ctx, p, c)
    }
}

func (s *Topom) OnlineProxy(addr string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    p, err := proxy.NewApiClient(addr).Model()
    if err != nil {
        return errors.Errorf("proxy@%s fetch model failed", addr)
    }
    c := s.newProxyClient(p)
    if err := c.XPing(); err != nil {
        return errors.Errorf("proxy@%s check xauth failed", addr)
    }
    defer s.dirtyProxyCache(p.Token)
    if d := ctx.proxy[p.Token]; d != nil {
        p.Id = d.Id
        if err := s.storeUpdateProxy(p); err != nil {
            return err
        }
    } else {
        p.Id = ctx.maxProxyId() + 1
        if err := s.storeCreateProxy(p); err != nil {
            return err
        }
    }
    return s.reinitProxy(ctx, p, c)
}

func (s *Topom) RemoveProxy(token string, force bool) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    p, err := ctx.getProxy(token)
    if err != nil {
        return err
    }
    c := s.newProxyClient(p)
    if err := c.Shutdown(); err != nil {
        log.WarnErrorf(err, "proxy-[%s] shutdown failed, force remove = %t", token, force)
        if !force {
            return errors.Errorf("proxy-[%s] shutdown failed", p.Token)
        }
    }
    defer s.dirtyProxyCache(p.Token)
    return s.storeRemoveProxy(p)
}

func (s *Topom) ReinitProxy(token string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    p, err := ctx.getProxy(token)
    if err != nil {
        return err
    }
    c := s.newProxyClient(p)
    return s.reinitProxy(ctx, p, c)
}

func (s *Topom) newProxyClient(p *models.Proxy) *proxy.ApiClient {
    c := proxy.NewApiClient(p.AdminAddr)
    c.SetXAuth(s.config.ProductName, s.config.ProductAuth, p.Token)
    return c
}

func (s *Topom) reinitProxy(ctx *context, p *models.Proxy, c *proxy.ApiClient) error {
    log.Warnf("proxy-[%s] reinit:\n%s", p.Token, p.Encode())
    if err := c.FillSlots(ctx.toSlotSlice(ctx.slots, p)...); err != nil {
        log.ErrorErrorf(err, "proxy-[%s] fillslots failed", p.Token)
        return errors.Errorf("proxy-[%s] fillslots failed", p.Token)
    }
    if err := c.Start(); err != nil {
        log.ErrorErrorf(err, "proxy-[%s] start failed", p.Token)
        return errors.Errorf("proxy-[%s] start failed", p.Token)
    }
    if err := c.SetSentinels(ctx.sentinel); err != nil {
        log.ErrorErrorf(err, "proxy-[%s] set sentinels failed", p.Token)
        return errors.Errorf("proxy-[%s] set sentinels failed", p.Token)
    }
    return nil
}

func (s *Topom) resyncSlotMappingsByGroupId(ctx *context, gid int) error {
    return s.resyncSlotMappings(ctx, ctx.getSlotMappingsByGroupId(gid)...)
}

func (s *Topom) resyncSlotMappings(ctx *context, slots ...*models.SlotMapping) error {
    if len(slots) == 0 {
        return nil
    }
    var fut sync2.Future
    for _, p := range ctx.proxy {
        fut.Add()
        go func(p *models.Proxy) {
            err := s.newProxyClient(p).FillSlots(ctx.toSlotSlice(slots, p)...)
            if err != nil {
                log.ErrorErrorf(err, "proxy-[%s] resync slots failed", p.Token)
            }
            fut.Done(p.Token, err)
        }(p)
    }
    for t, v := range fut.Wait() {
        switch err := v.(type) {
        case error:
            if err != nil {
                return errors.Errorf("proxy-[%s] resync slots failed", t)
            }
        }
    }
    return nil
}
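Note how resyncSlotMappings pushes the slot table to every proxy concurrently and fails if any proxy rejects it; this is what keeps all codis-proxy instances consistent after a topology change. Below is a self-contained sketch of the same fan-out/collect pattern using only the standard library; sync2.Future is Codis's own helper and is not reproduced here, and fillSlots is a hypothetical stand-in for proxy.ApiClient.FillSlots.

package main

import (
    "errors"
    "fmt"
    "sync"
)

// fillSlots is a hypothetical stand-in for pushing a slot table to one proxy.
func fillSlots(proxyAddr string, slots []int) error {
    if proxyAddr == "bad-proxy" {
        return errors.New("connection refused")
    }
    return nil
}

// resyncAll fans the slot table out to every proxy concurrently, collects
// the per-proxy results, and returns the first failure, mirroring the
// structure of resyncSlotMappings above.
func resyncAll(proxies []string, slots []int) error {
    var wg sync.WaitGroup
    var mu sync.Mutex
    results := make(map[string]error)

    for _, addr := range proxies {
        wg.Add(1)
        go func(addr string) {
            defer wg.Done()
            err := fillSlots(addr, slots)
            mu.Lock()
            results[addr] = err
            mu.Unlock()
        }(addr)
    }
    wg.Wait()

    for addr, err := range results {
        if err != nil {
            return fmt.Errorf("proxy-[%s] resync slots failed: %w", addr, err)
        }
    }
    return nil
}

func main() {
    fmt.Println(resyncAll([]string{"127.0.0.1:11080", "bad-proxy"}, []int{0, 1, 2}))
}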
The storage-layer interface is defined in /pkg/models/client.go:
type Client interface {
    Create(path string, data []byte) error
    Update(path string, data []byte) error
    Delete(path string) error
    Read(path string, must bool) ([]byte, error)
    List(path string, must bool) ([]string, error)
    Close() error
    WatchInOrder(path string) (<-chan struct{}, []string, error)
    CreateEphemeral(path string, data []byte) (<-chan struct{}, error)
    CreateEphemeralInOrder(path string, data []byte) (<-chan struct{}, string, error)
}

func NewClient(coordinator string, addrlist string, timeout time.Duration) (Client, error) {
    switch coordinator {
    case "zk", "zookeeper":
        return zkclient.New(addrlist, timeout)
    case "etcd":
        return etcdclient.New(addrlist, timeout)
    case "fs", "filesystem":
        return fsclient.New(addrlist)
    }
    return nil, errors.Errorf("invalid coordinator name = %s", coordinator)
}
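A minimal usage sketch of this interface, assuming a local ZooKeeper at 127.0.0.1:2181 and the upstream CodisLabs import path; the node path below is illustrative rather than the exact layout codis-dashboard writes:

package main

import (
    "fmt"
    "time"

    "github.com/CodisLabs/codis/pkg/models"
)

func main() {
    // "zk" selects the ZooKeeper implementation in NewClient above.
    client, err := models.NewClient("zk", "127.0.0.1:2181", time.Minute)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Write an arbitrary node and read it back; real cluster state lives
    // under a per-product namespace managed by codis-dashboard.
    path := "/codis3/demo-test/example"
    if err := client.Create(path, []byte(`{"hello":"codis"}`)); err != nil {
        panic(err)
    }
    data, err := client.Read(path, true)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))
}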
Basic Usage
The steps below, taken from /example/setup.py, bring up a local Codis cluster:
...
if __name__ == "__main__":
    children = []
    atexit.register(kill_all, children)

    product_name = "demo-test"
    product_auth = None

    # step 1. codis-server & codis-sentinel
    # codis-server [master 16380+i <== following == 17380+i slave]
    for port in range(16380, 16384):
        children.append(CodisServer(port, requirepass=product_auth))
        children.append(CodisServer(port + 1000, port, requirepass=product_auth))
    for port in range(26380, 26385):
        children.append(CodisSentinel(port))
    check_alive(children, 1)
    print("[OK] setup codis-server & codis-sentinel")

    # step 2. setup codis-fe & codis-dashboard & codis-proxy
    children.append(CodisFE(8080, "../cmd/fe/assets"))
    children.append(CodisDashboard(18080, product_name, product_auth))
    for i in range(0, 4):
        children.append(CodisProxy(11080 + i, 19000 + i, product_name, product_auth))
    check_alive(children, 3)
    print("[OK] setup codis-fe & codis-dashboard & codis-proxy")

    # step3: init slot-mappings
    for i in range(0, 4):
        gid = i + 1
        codis_admin_dashboard(18080, "--create-group --gid={}".format(gid))
        codis_admin_dashboard(18080, "--group-add --gid={} --addr=127.0.0.1:{} --datacenter=localhost".format(gid, 16380+i))
        codis_admin_dashboard(18080, "--group-add --gid={} --addr=127.0.0.1:{} --datacenter=localhost".format(gid, 17380+i))
        beg, end = i * 256, (i + 1) * 256 - 1
        codis_admin_dashboard(18080, "--slots-assign --beg={} --end={} --gid={} --confirm".format(beg, end, gid))
        codis_admin_dashboard(18080, "--resync-group --gid={}".format(gid))
    for i in range(0, 5):
        codis_admin_dashboard(18080, "--sentinel-add --addr=127.0.0.1:{}".format(26380+i))
    codis_admin_dashboard(18080, "--slot-action --interval=100")
    codis_admin_dashboard(18080, "--sentinel-resync")
    check_alive(children, 3)
    print("[OK] done & have fun!!!")

    while True:
        print(datetime.datetime.now())
        time.sleep(5)
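Once the script prints its final "[OK]", the cluster can be exercised through any of the four proxies (ports 19000-19003 above). A quick smoke test in Go, writing through one proxy and reading the key back through another to show that they serve the same backend:

package main

import (
    "bufio"
    "fmt"
    "net"
)

// request opens a connection, sends one pre-encoded RESP command, and
// returns the first reply line; enough for a simple smoke test.
func request(addr, payload string) string {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    fmt.Fprint(conn, payload)
    reply, _ := bufio.NewReader(conn).ReadString('\n')
    return reply
}

func main() {
    // SET through the first proxy ...
    fmt.Print(request("127.0.0.1:19000", "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")) // "+OK"
    // ... and GET the same key through a different proxy.
    fmt.Print(request("127.0.0.1:19001", "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n")) // first reply line: "$3"
}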