etcd 是一个分布式的键值存储系统,由 CoreOS 公司开发,主要用于为分布式系统提供可靠和高可用的配置管理和服务发现功能。etcd 基于 Raft 一致性算法设计,可以有效地处理网络分区等容错问题,确保数据在集群中的一致性和可靠性。
etcd 被广泛应用于 Kubernetes、Cloud Foundry、Mesos 等分布式系统和云原生应用中,充当了可信赖的配置存储和服务注册发现等重要角色。除此之外,etcd 也可作为分布式锁、队列服务、消息发布订阅系统等使用。总之,etcd 作为一个可靠的分布式键值存储框架,为构建分布式系统提供了很好的基础支持。
etcd 作为一个分布式的键值存储系统,具有以下一些显著的特点:
于此对应的,etcd 主要应用于以下几个场景:
首先我们添加依赖,这次我依然选择了命令行添加。
go get go.etcd.io/etcd/client/v3
执行完之后,mod 文件增加了一下内容:
go.etcd.io/etcd/api/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/v3 v3.5.14 // indirect
首先我们使用前两天学到的 zap 框架添加一个全局的日志对象 logger:
var Logger *zap.SugaredLogger // 日志
func init() {
encoderConfig := zapcore.EncoderConfig{ // 创建编码配置
TimeKey: "T", // 时间键
LevelKey: "L", // 日志级别键
NameKey: "log", // 日志名称键
CallerKey: "C", // 日志调用键
MessageKey: "msg", // 日志消息键
StacktraceKey: "stacktrace", // 堆栈跟踪键
LineEnding: zapcore.DefaultLineEnding, // 行结束符,默认为 \n EncodeLevel: zapcore.CapitalLevelEncoder, // 日志级别编码器,将日志级别转换为大写
EncodeTime: zapcore.ISO8601TimeEncoder, // 时间编码器,将时间格式化为 ISO8601 格式
EncodeDuration: zapcore.StringDurationEncoder, // 持续时间编码器,将持续时间编码为字符串
EncodeCaller: zapcore.ShortCallerEncoder, // 调用编码器,显示文件名和行号
}
encoder := zapcore.NewConsoleEncoder(encoderConfig) // 创建控制台编码器,使用编码配置
atomicLevel := zap.NewAtomicLevel() // 创建原子级别,用于动态设置日志级别
atomicLevel.SetLevel(zap.InfoLevel) // 设置日志级别,只有 Info 级别及以上的日志才会输出
core := zapcore.NewCore(encoder, zapcore.Lock(os.Stdout), atomicLevel) // 将日志输出到标准输出
Logger = zap.New(core, zap.AddCaller(), zap.Development()).Sugar() // 创建 Logger,添加调用者和开发模式
}
单节点的 etcd 服务比较简单,集群的稍微麻烦一些,由于我只是用来作为演示服务,所以选择了最简单的方法:
brew install etcd
然后执行 etcd
命令即可启动一个 etcd 服务,默认端口号是 2379
。
我定义了一个全局的客户端,代码如下:
const timeOut = 5 * time.Second // 超时时间
var cli *clientv3.Client // etcd 客户端,全局变量
// init
//
// @Description: 初始化连接
func init() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"}, // etcd 服务器地址
DialTimeout: timeOut, // 连接超时时间
})
if err != nil {
panic("连接服务器失败!!!") // 初始化失败
} else {
cli = client // 初始化成功
}
}
然后我定义了一个关闭客户端的方法:
// close
//
// @Description: 关闭连接
func close() {
func(cli *clientv3.Client) {
err := cli.Close()
if err != nil {
Logger.Error("关闭连接失败!!!")
}
}(cli)
}
下面是根据官方文档写了一个读写测试:
func TestEtcd(t *testing.T) {
defer close() // 关闭连接
ctx, cancel := context.WithTimeout(context.Background(), timeOut) //
_, _ = cli.Put(ctx, "key", "value") // 写入键值对
cancel() // 取消上下文
ctx, cancel = context.WithTimeout(context.Background(), timeOut) // 重新创建上下文
resp, _ := cli.Get(ctx, "", clientv3.WithPrefix()) // 获取所有键值对
cancel() // 取消上下文
for _, kv := range resp.Kvs { // 遍历键值对
Logger.Infof("%s: %s\n", kv.Key, kv.Value) // 打印键值对
cli.Delete(context.Background(), string(kv.Key)) // 删除键值对
}
}
控制台输出内容:
=== RUN TestEtcd
2024-06-04T20:10:16.212+0800 INFO test/etcd_test.go:79 key: value
2024-06-04T20:10:16.222+0800 INFO test/etcd_test.go:85
2024-06-04T20:10:16.222+0800 INFO test/etcd_test.go:86 删除结果: 0
--- PASS: TestEtcd (0.04s)
PASS
etcd 框架一个主要特征就是分布式,可以用来进行分布式锁的实现,以及基于分布式锁其他功能的实现,下面分享 etcd 锁的使用。
func TestLock(t *testing.T) {
defer close() // 关闭连接
session, _ := concurrency.NewSession(cli) // 创建会话
defer session.Close() // 关闭会话
mutex := concurrency.NewMutex(session, "/funtester/") // 创建互斥锁,锁定 funtester 键
ctx := context.Background() // 创建上下文
if err := mutex.Lock(ctx); err != nil { // 加锁,如果失败,打印错误信息
Logger.Error("加锁失败:", err)
return
}
Logger.Info("加锁成功") // 加锁成功
if err := mutex.Unlock(ctx); err != nil { // 解锁,如果失败,打印错误信息
Logger.Error("解锁失败:", err)
return
}
Logger.Info("解锁成功") // 解锁成功
}
控制台打印日志如下:
=== RUN TestLock
2024-06-04T22:14:17.820+0800 INFO test/etcd_test.go:92 加锁成功
2024-06-04T22:14:17.826+0800 INFO test/etcd_test.go:97 解锁成功
--- PASS: TestLock (0.06s)
PASS