Etcd源码解析(转)

7 Etcd服务端实现

7.1 Etcd启动

Etcd有多种启动方式,我们从最简单的方式入手,也就是从embed的etcd.go开始启动,最后会启动EtcdServer。

先看看etcd.go中的启动代码:

func StartEtcd(inCfg *Config) (e *Etcd, err error)

从StartEtcd方法启动etcd服务,参数是初始配置信息config,启动集群间监听进程和客户端监听进程,最后启动EtcdServer。

主要代码:

e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
if e.Peers, err = startPeerListeners(cfg); err != nil {
   return
}
if e.sctxs, err = startClientListeners(cfg); err != nil {
   return
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
   return
}
e.Server.Start()

startPeerListeners启动Peer监听,等待集群中其他机器连接自己。startClientListeners启动客户端监听Socket,等待客户端请求并响应。最后调用Start方法启动EtcdServer。

7.2 EtcdServer

EtcdServer位于etcdserver/server.go,定义了Server接口和EtcdServer对象。EtcdServer从逻辑上讲代表了一个完整的Etcd服务。

Etcd源码解析(转)

图7.1 Etcd服务端的功能示意图

Etcd服务端主要提供两大类客户端接口:

(1)集群配置

由memberHandler负责,提供添加集群成员,删除成员,更新成员信息三种接口服务。

(2)KV键值:由keysHandler负责。

KeysHandler接收到客户端请求后,调用EtcdServer的Do方法处理请求,Watcher类的客户端请求信息同样包含在keysHandler中了。KV键值响应主要在v2_server.go中定义,etcd新版本同时还提供了v3操作命令集,本文不讨论v3的源码实现。

7.2.1 接口定义

Etcdserver/server.go中定义了Server接口,是服务端的主接口,其中Do方法处理客户端请求。

Server.go中定义了EtcdServer对象,它是Server接口的实现类。Server中的Do接口是专门用来响应客户端请求的。

Server接口定义:

  • start

    读取配置文件,启动本Server。

  • stop

    停止本Server

  • ID

    获取本节点server的ID,集群中所有的机器都有唯一ID,用于标识自己。

  • Leader

    获取leader的ID

  • Do

    处理客户群请求,返回处理结果。

    定义:

    func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error)

    在server.go中并没有看到Go接口的实现,其实它是在v2_server.go文件中定义的。

  • Process

    Process(ctx context.Context, m raftpb.Message) error
    

    处理Raft消息。

  • AddMember

    向Etcd集群中增加一台服务器,新增服务器的ID必须唯一标识。

  • RemoveMember

    从集群删除一台服务器,删除服务器的ID必须已经存在于集群中。

  • UpdateMember

    修改集群成员属性,如果成员ID不存在则返回ErrIDNotFound错误。

7.2.2 实体定义

EtcdServer表示一个独立运行的Etcd节点。

type EtcdServer struct {
    inflightSnapshots int64
    appliedIndex      uint64
    committedIndex    uint64.
    consistIndex consistentIndex
    Cfg          *ServerConfig
    readych chan struct{}
    r       raftNode
    snapCount uint64
    w wait.Wait
    readMu sync.RWMutex
    readwaitc chan struct{}
    readNotifier *notifier
    stop chan struct{}
    stopping chan struct{}
    done chan struct{}
    errorc     chan error
    id         types.ID
    attributes membership.Attributes
    cluster *membership.RaftCluster
    store store.Store
    applyV2 ApplierV2
    applyV3 applierV3
    applyV3Base applierV3
    applyWait   wait.WaitTime
    kv         mvcc.ConsistentWatchableKV
    lessor     lease.Lessor
    bemu       sync.Mutex
    be         backend.Backend
    authStore  auth.AuthStore
    alarmStore *alarm.AlarmStore
    stats  *stats.ServerStats
    lstats *stats.LeaderStats
    SyncTicker *time.Ticker
    compactor *compactor.Periodic
    peerRt   http.RoundTripper
    reqIDGen *idutil.Generator
    forceVersionC chan struct{}
    wgMu sync.RWMutex
    wg sync.WaitGroup
    ctx    context.Context
    cancel context.CancelFunc
    leadTimeMu      sync.RWMutex
    leadElectedTime time.Time
}

7.2.3 Do

Do定义在v2_server.go中,处理客户群请求包,调用raftNode的Propose方法。在上一章已经介绍过。

对于KV键值请求,Do方法是在etcdServer/v2_server.go中定义的,它的相关代码逻辑如下:

func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
    r.ID = s.reqIDGen.Next()
    if r.Method == "GET" && r.Quorum {
        r.Method = "QGET"
    }
    v2api := (v2API)(&v2apiStore{s})
    switch r.Method {
    case "POST":
        return v2api.Post(ctx, &r)
    case "PUT":
        return v2api.Put(ctx, &r)
    case "DELETE":
        return v2api.Delete(ctx, &r)
    case "QGET":
        return v2api.QGet(ctx, &r)
    case "GET":
        return v2api.Get(ctx, &r)
    case "HEAD":
        return v2api.Head(ctx, &r)
    }
    return Response{}, ErrUnknownMethod
}

可以看到对客户端的KV键值请求,最终是通过v2apiStore的相关方法来实现。客户端的命令前缀为"/v2/keys"。支持的命令有以下这些:

  • GET/QGET:读取键值

  • POST:创建一个新的KV键值

  • PUT:重新设置键值的值

  • DELETE:删除已有键值

v2apiStore包含了EtcdServer引用。

type v2apiStore struct{ s *EtcdServer }

除了GET命令,其余Post,Put和Delete每个写操作请求最后都是通过processRaftRequest方法来处理的。

我们先看看GET命令的处理:

func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) {
    if r.Wait {
        wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
        if err != nil {
            return Response{}, err
        }
        return Response{Watcher: wc}, nil
    }
    ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
    if err != nil {
        return Response{}, err
    }
    return Response{Event: ev}, nil
}

看到对于普通的GET操作,直接调用store.Get方法获取KV值返回给客户端,如果是Watcher操作,则返回Watcher给客户端,客户端后续通过Watcher接口读取变化值。

对于POST,PUT,DELETE命令,走下述Propose流程处理。

Etcd源码解析(转)

图7.2 Propose流程示意图

比如"DELETE"命令。

func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) {
    return a.processRaftRequest(ctx, r)
}

processRaftRequest方法的源码如下:

func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) {
    data, err := r.Marshal()
    if err != nil {
        return Response{}, err
    }
    ch := a.s.w.Register(r.ID)
    start := time.Now()
    a.s.r.Propose(ctx, data)
    proposalsPending.Inc()
    defer proposalsPending.Dec()
    select {
    case x := <-ch:
        resp := x.(Response)
        return resp, resp.err
    case <-ctx.Done():
        proposalsFailed.Inc()
        a.s.w.Trigger(r.ID, nil) // GC wait
        return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
    case <-a.s.stopping:
    }
    return Response{}, ErrStopped
}
  • data, err := r.Marshal()语句:

    这条语句从pb.request得到请求数据data

  • ch := a.s.w.Register(r.ID)语句:

    注册chain,一直等待直到ch有响应数据。

    Register方法是wait的Register方法。该方法直到调用wait的Trigger方法后才会有数据从而触发select在该Register Id上线程被唤醒。Wait在pkg/wait中定义。

  • a.s.r.Propose(ctx, data)

    Propose方法在node中定义,raftNode在etcdserver/node.go文件中。Propose将写事务请求发给Leader,等待集群间同步。Propose集群间同步消息完成后会唤醒a.s.w.Register语句。

    调用raft/node的Propose方法处理写事务请求,进一步调用step方法将写事务封装成MsgProp消息并传递给集群中其他机器。

    func (n *node) Propose(ctx context.Context, data []byte) error {
        return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
    }

    step会调用StepFunc函数来处理MsgProp消息,根据leader,follower,candidate等运行状态分别调用不同的实现函数。

  • select语句

    select … case …语句类似于Socket通信中的select语句,它的含义是只要任意一个case语句有数据返回就往下执行,否则就阻塞在这里让出CPU给其他线程执行。

    case x := <-ch:当ch有值时,将ch赋值给x变量,同时唤醒case语句被执行,这里将执行以下代码:

    resp := x.(Response)
    return resp, resp.err
    

    此时将ch中的返回结果Response回复给调用者(即客户端)。

    case <-ctx.Done():说明上下文被中断,Context的Done()被触发,此时写事务执行失败,返回空Response。

7.2.4 初始化

Etcd服务端主要由5大组件构成,他们的分工如下:

  • etcdServer:主进程,相当于整个Etcd的容器,包含了raftNode,WAL,snapshotter等多个关键组件。

  • raftNode:执行raft协议,保证写事务的集群一致性维护。

  • Store:管理维护Etcd数据库

  • Wal:管理事务日志

  • Snapshotter:负责数据快照,管理store数据库在内存中和磁盘上的相互转换。

raftNode除了负责集群间raft消息交互,还负责事务和快照的存储,保持数据一致性。

Etcd定义了一个storage数据结构,一起负责事务和快照。

type storage struct {
    *wal.WAL
    *snap.Snapshotter
}

storage中没有指定WAL和Snapshotter的变量名称,这两个类的方法都可直接通过storage来调用,比如WAL的Save方法,可以通过storage.Save来调用,也可以通过storage.WAL.Save来调用,这两者是等价的,在阅读源码的时候要注意这一点,否则对Go语法不太了解的读者会感到迷惑。

func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
    st := store.New(StoreClusterPrefix, StoreKeysPrefix)
    var (
        w  *wal.WAL
        n  raft.Node
        s  *raft.MemoryStorage
        id types.ID
        cl *membership.RaftCluster
    )
    haveWAL := wal.Exist(cfg.WALDir())
    ss := snap.New(cfg.SnapDir())
    bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
    beExist := fileutil.Exist(bepath)

    switch {
    case haveWAL:
        snapshot, err = ss.Load()
        if snapshot != nil {
            if err = st.Recovery(snapshot.Data); err != nil {
                plog.Panicf("recovered store from snapshot error: %v", err)
            }
        }
        cfg.Print()
        if !cfg.ForceNewCluster {
            id, cl, n, s, w = restartNode(cfg, snapshot)
        } else {
            id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
        }
        cl.SetStore(st)
        cl.SetBackend(be)
        cl.Recover(api.UpdateCapability)
    }
    if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
        return nil