技术人员做数据质量治理实践总结

1 导语

本人是腾讯游戏市场平台部的一名开发人员,目前主要负责O2广告投放系统的开发以及数据质量治理工作。O2是市场平台部用于做游戏广告投放以及相关效果数据回收展示的系统。该系统不仅在功能上支持广告的高效精准投放,也同时是一个较为庞大的数据系统,每天O2都会有大量的数据流入、计算、以及可视化。
当一个数据系统越来越复杂,参与方越来越多,其需要管理的数据量越来越庞大时,数据治理尤其是针对数据质量的治理就变得越来越重要且紧迫了。
本篇文章主要是对我过去一段时间针对O2所做的数据质量治理工作做一总结与分享,希望能够帮助到同样在做数据质量治理工作的同学。

2 什么是数据质量治理

要明确什么是数据质量治理,首先我们需要了解一下什么是数据治理。
数据治理的定义是,在管理数据资产过程中行使严格的管控(包括计划、实施和监控),以确保组织可以从数据中获取最大的价值。数据治理包括了为各种应用场景设计数据模型、安全的存储与访问数据、合理的共享数据、以及保障数据满足用户的需求等。
其实,这一切的前提都是数据本身必须是可靠且可信的,换句话说,数据应该是高质量的。而数据质量治理,就是利用数据治理的手段,不断提高系统数据的可靠与可信性的过程。

3 数据质量治理的必要性

提高数据的质量本身并不是目的,而是我们团队获取最大化利益的一种手段。值得信赖的数据不仅可以提升决策人员的决策效率以及成果,也可以降低发生风险的概率。当使用者使用可靠的数据时,他们可以更快、更一致地回答问题,做出决策。如果数据是高质量的,他们也能花更少的时间发现问题,而将更多的时间用于使用数据来获得洞察力、做决策、服务用户。因此,对于一个数据系统来说,数据质量治理是非常必要且紧迫的工作。

4 实践经验总结思维导图

在这里插入图片描述

我在实践中,对数据质量治理做了一个总结,整理出了如下思维导图。不过,数据质量治理不是一个项目,而是一个持续性工作,因此,以后肯定也会不断对该总结做修改与完善。
文章的后续部分会对该导图中的内容一一做分析讲解。

5 技术人员如何做数据质量治理

5.1 了解数据系统的使用者

在这里插入图片描述


我觉得在做数据质量治理工作的时候,多去了解数据系统的使用者需求是非常重要的。尤其是对于技术人员来说,因为技术人员可能和产品的使用者中间隔了一个产品经理,有些技术同学可能就不太想去了解太多,只是产品经理提了什么就做什么。或者只被动的从产品经理口中了解了一部分用户的需求,但这些还不够。
我们应该主动的去了解更多的背景细节。当我们了解清楚了是谁在用我们的系统,他们是什么角色,他们分别高频或低频查看哪些报表,他们对哪些报表的数据质量要求更高等等细节,我们才可以更高效的划分自己手头上繁多工作的优先级,按照优先级高低稳扎稳打的推进工作。

5.2 数据质量改进生命周期

在这里插入图片描述

在开展工作时,确定下来一个固定的工作流程是非常重要的事,可以帮助提升工作效率以及降低可能犯错的概率。
对于数据质量治理工作类似,也应该确定一个数据质量改进的生命周期,我们可以按照这个生命周期去不断迭代的提升自身数据系统的数据质量。
我在实践过程中,总结出的数据质量改进生命周期如图:

(1) 发现问题
发现问题的方式主要有两种,包括通过监控或告警提前发现目前存在或潜在的数据质量问题,以及外部使用者在使用过程中反馈过来的数据质量问题。
当然,我们希望尽可能在外部使用者发现问题之前,通过系统监控告警或者人工核验的方式提前发现并解决问题。但是,要做到外部使用者完全没有任何数据问题的反馈也是不大可能(其中还包括口径以及外部人员信息不对称的问题),因此,我们在收到外部反馈的数据问题时也不必过于惊慌,重要的是先做好记录,方便接下来的问题排查、修复,以及方便以后的历史工作回溯。
(2)定位本质原因
发现问题之后,首先要做的并不是立刻把问题解决掉(除非情况紧急),而是在问题仍存在时,精准的定位出导出该问题的本质原因。因为,有时候着急处理掉问题,会使定位导致发生该问题的本质原因变得极难排查。而我们只有精确的定位出本质原因,才能有效避免以后再次发生类似的数据问题。
(3)修正或补录数据
在排查出了导致该问题的本质原因后,我们就要立刻修正缺失或有误的数据了。这里值得注意的是,我们应该去建立一个快速的数据补录重跑机制,避免在这种重复性工作上花费过多时间与精力。
(4)系统化解决问题
我们将数据修正之后,或许近期使用者暂时就不会发现什么类似的问题了。但是,如果导致数据质量问题的本质原因没有被定位并解决掉,那么势必此类数据质量问题还会出现。因此,我们在数据修正之后,接下来要做的就是尽快解决掉之前导致数据质量问题的根源性问题。而且,要以系统化的方式去解决,避免解决了一个问题,又导致出了另外一些问题。或者仅解决一个眼下问题,而对未来潜在风险视若无睹。
(5)检查确认
这一步也是非常必要的。我们在解决完问题之后,一定要对数据准确性进行核验,并且确认系统还有没有此类漏洞,确认系统下次不会再次发生此类数据问题。

5.3 数据质量治理的思路方法

在这里插入图片描述

数据质量治理可以从三方面去考虑并推进工作。
首先是数据问题的预防,创建高质量数据的最佳方法是防止低质量数据进入组织。然后就是数据问题的提前发现,因为我们都知道,当数据系统较大且复杂时,数据问题不可避免的会出现。但我们要尽可能在外部使用者反馈出问题之前,将问题发现并解决掉,这样外部对于数据系统才会有信任感。最后,就是数据问题的解决,我们应该建立一个快速解决问题的机制,将数据问题按照优先级,快速精准的解决掉。参考如下思维导图:

5.3.1 数据问题的预防

(1)建立数据输入控制
这一点非常重要,我们只有对流入系统的数据严格把控,才能从源头上避免出数据问题。如果在源头都没有把控好,那么数据后续在系统中的各种流转计算都是白费功夫。
首先是数据源数据的准确性保证,我们O2主要有三种数据源的输入方式:从媒体端拉取数据、媒体端给我们上报数据、人工在系统上录入数据。
因此,针对这三种录入数据的方式,我们都要进行把控。
(2)适当的冗余计算
我们其实可以用计算资源换数据质量,在进行核心数据计算时,不妨适当的增加冗余计算。这里的冗余计算指的是,比如数据源拉取任务如果是非常核心的数据任务,且我们担心它会有意外导致失败,我们可以多增加一次计算,虽然这种方式略显笨拙,但可以更加确保数据不会有缺失。
(3)培训数据录入者
对于数据录入者,我们也要进行规范化培训。因为系统不是万能的,有许多人为的操作,系统很难以纠正。因此对数据录入部分的录入者也要有规范性上的培训。

5.3.2 数据问题的提前发现

(1)脚本任务增加主动告警
我们可以给核心任务增加主动告警,在脚本执行失败或未执行完成的情况下能够第一时间发现并处理。
(2)每日检查监控告警可视化界面
对于监控告警的可视化检查,是因业务不同而异的,我们这里有许多每日的离线任务,这种任务一般都是在凌晨执行。那么,我每天早上在上班时,确认一眼昨日任务的执行情况就行。提前发现并处理问题要比问题出现后解决花的时间少得多。
(3)数据剖析并作邮件告警
我们可以制定一些符合自身业务特征的规则,对数据结果进行自动的判断剖析,来确认数据是否是高质量的。如果发现了不满足规则的情况,那么就及时做告警推送给相关负责人注意。
比如,在我们O2这里,有些字段值是不能为空的(某一天某个游戏完全没有消耗费用),或者某个游戏广告的消耗金额很大,却没有任何注册新进,这种就是明显的数据不合理。
关于数据剖析的角度还有很多,我们需要根据自身业务情况来进行规则制定,详细的可参考思维导图中罗列的一些。
(4)人工定期对核心数据进行对账
对于非常重要的核心数据,我们有必要专人进行定期的数据核验,人工总是能够发现一些系统难以发现的问题。

5.3.3 数据问题的解决

(1)数据问题的优先级划分
优先级是很重要的事情,意味着技术人员在协助定位解决问题时,要选择哪一个优先处理,哪一些可以暂且放一放。当然这不仅仅是指需要技术人员协助解决的问题较多时。
就算数据问题较少甚至只有一两个,优先级的划分依然很重要,因为技术人员手上可能会有很多重要的事情在做,比如需求开发、代码重构、完善监控等。
如果数据问题的优先级划分清楚,也能够帮助技术人员更宏观更合理的安排时间。
(2)数据问题的根因快速定位
定位数据问题的原因,我的实践经验是,首先能交给使用者定位是最好的。因为有许多数据问题不一定是真正的数据问题,如果所有使用者一碰到难以理解的问题就来找技术人员协助定位,那技术人员会花费过多时间在问题定位上的,反倒没有时间去做其他重要不紧急的事情。导致数据问题会越堆积越多的。因此,可以给使用者做一些自助排查的系统,去协助他们找到问题原因,如果是真正需要技术人员帮忙解决的,那么再找到技术人员协助解决。
另外,就是可以将我们数据流中间结果的数据可视化出来,便于在最终结果报表缺失或有误的情况下,能够快速定位出是数据流中哪一个环节出了问题。
最后,其实如果技术人员对整个系统较为了解,且排查问题的经验较丰富,实际上是可以依据直觉快速定位出问题原因的。因此,作为技术人员,多熟悉数据系统,定位问题时多思考多总结也是很重要的。
(3)数据的快速重算或补录
发现数据问题后,如何快速的修正数据也是需要思考的问题。建议可以针对每一个环节都设计一个快速补录数据的方式。使得可以提升重跑数据的效率。
比如我们O2系统,会设置一些补录任务,分别针对数据源需要重拉取、中间结果需要重新计算以及结果报表需要重新展示等。

5.4 数据质量问题的常见原因

在这里插入图片描述

在做数据质量治理的工作中,记录并总结自身系统中常见的数据质量问题,并且思考系统性的解决方案是非常重要的一件事。
我在实践中对O2数据质量问题常见的原因做了一些总结。主要有四点:
数据输入过程引起的问题、数据处理功能引起的问题、系统设计引起的问题、解决问题引起的问题。
参考如下思维导图:

5.4.1 数据输入过程引起的问题

由于O2作为数据系统,有许多接口需要人工去参与数据输入,包括配置账户、配置媒体信息、录入媒体的消耗费用信息等。因此,这一部分也是O2系统最容易导致数据问题的地方。
(1)数据输入接口不合理
对于技术人员来说,要想协助减少数据录入的问题,最重要的就是假设人为操作充满了各种失误。因此,可以从两方面去考虑协助优化。
第一,降低使用者参与系统的程度,能让系统自动做的事情就不要让人工去参与,机器出现失误的概率是要远低于人为操作的。
第二,对于必须要人工参与的部分,对使用者输入的各种数据进行全面的校验就显得尤其重要。这里可能需要注意,业务规则一般会变化较频繁,要定期review校验规则,注意这些规则是否合理以及是否全面。
(2)显示条目放置不合理
对于需要使用者注意的信息条目,一定要前置或者标注以提醒。比如某些使用者必须配置的地方,或者使用者在使用过程中易失误的地方。
有时候仅仅是显示条目放置的更合理些,就会减少很多不必要的操作失误发生。
(3)人工操作失误问题(系统无法协助校验的)
有些人工操作失误的问题,是系统也无法帮忙校验的。这种情况,我们技术人员可以做的事情就是用告警邮件的方式,定时通知到相关负责人,告知他们哪些地方可能是有问题的。推动相关业务负责人去检查并修正问题。
比如O2系统中,会每天发送告警邮件,将各种配置异常的情况告知给相关人员。
另外,就是需要有专门针对数据系统使用者的培训,虽然无法完全避免,但也要尽可能减少人工操作的失误。
(4)业务流程的变更
业务流程随着时间的推移而变化,在变化过程中引入了新的业务规则和数据质量要求。但是,这些业务规则的更改并不一定总能被及时或全面的纳入系统。如果接口未升级以适应新的或变化的需求,将导致数据质量问题发生。
因此,业务规则的更改应当同步给整个系统的负责人或系统各个可能影响到的模块的负责人,否则数据系统很可能会受到影响。
(5)业务流程执行混乱
通过混乱的流程创建的数据很可能会不一致。混乱的流程很可能是由培训、文档不完善或需求的随意变化导致的。这些方面也应当受到重视。

5.4.2 数据处理功能引起的问题

(1)有关数据源的错误假设
在我实践过程中发现,我们有时会假定数据源是没有任务问题的,但这种想法是不正确的。有时第三方数据源的问题一样会导致自身数据系统出现数据质量问题。
首先第三方来的数据源本身就有问题,这个我们就无法干涉处理了,只能够在数据源修复问题后重跑自身数据。
还有一种情况是数据源对接的接口文档不完整或过时,这时可能也会引起自身数据系统出现数据问题。因此,就算是对接后的第三方数据源,我们也应当持续定期review其接口文档,或者在接到接口变更的信息时,及时对自身数据系统做适配调整。
另外,有些第三方数据源会有快速的数据接口(在线实时任务),以及较慢速的数据接口(离线任务),可能相同的接口在不同的时间点会有数据修正的发生,因此,也要注意自身数据获取任务的执行时间点,避免数据修正后却没有重跑任务更新最新的数据。
(2)过时的业务规则(变更了业务规则,没有及时同步到数据开发者)
有时候由于沟通的问题,会出现业务规则没有提前或及时同步给技术人员,导致出现一些数据问题。这就要求业务与开发同学需要有有效的信息同步渠道。
(3)变更的数据结构
一般情况下,较复杂的数据系统的数据链路都比较长,如果上游数据有如字段名称、字段类型这样的变化但没有及时同步给下游数据负责人,那么也一定会出现数据问题。

5.4.3 系统设计引起的问题

从系统设计角度来说,如果系统设计的不合理或不完善,那么也非常容易导致数据出现问题。
(1)未执行参照完整性
如果没有强制的执行参照完整性,或者关闭了验证参照完整性,那么很可能会导致出现数据问题。如产生破坏唯一性约束的重复数据或者由于丢失的数据被分配为默认值而导致的数据质量问题等。
(2)未执行数据的唯一性约束/唯一性约束有误
底层库表的唯一键约束需要考虑清楚,唯一键设置不合理,很容易导致数据出现翻倍的情况。
(3)数据模型不准确
如果数据模型内的假设没有实际数据的支持,则容易出现数据质量问题。在设计库表时,要考虑好以后的兼容性,避免出现字段类型设置不合理导致的数据问题。如实际数据超出字段长度导致数据丢失。
(4)时间数据不匹配
在没有统一规范的情况下,多个系统可能会采用不同的日期或时间格式,当不同源系统之间数据同步或计算时,可能会导致数据不匹配和数据丢失。这里可以推广一下,不仅仅是时间格式,其他应当规范的字段格式也一定要做约束与规范。
(5)字段值定义不清晰
有些字段值会包含特定的含义,若不制定严格的规范约束,不同技术人员可能会使用相同的值来表示不同的含义,这样就会导致数据不匹配或丢失。
比如,O2中有一个常用的字段platid,表示设备的ID值。若不强制约束,可能有的人用0表示iOS平台,有的人用2表示iOS平台,那么在后续的汇总计算时,就可能会出现严重的数据问题。
(6)数据复制
不必要的数据复制通常时数据管理不当造成的,一般情况下,不必要的数据复制都会带来害处。有害的数据复制通常有两种情况:
一种是单源但有多个本地实例,比如有些配置表,在数据库系统中存储有多份,不清楚的人很难知道该用哪一个实例,而这些实例不一定就是完全同步的,会可能会造成数据质量问题。
另一种是多源但仅一个本地实例,比如外部数据源给了多个数据源,这些数据源可能是不同层级的数据,相互间又有关系,如果我们这里只存一个实例,需要认真判断该用哪个源。

5.4.4 解决问题引起的问题

在解决数据问题时,通常为了快速便捷而采用直接在数据库修改或脚本手动执行的方式,而且由于修复问题都比较仓促,非常容易引起额外的数据问题。比如数据更改错误、数据更改后未通知到下游数据等,经常为解决这些问题会花费更多的时间。
因此,非常不建议在解决问题时仓促着急。在解决数据问题时,也有以下几点需要注意:
(1)直接更改数据库数据一定要手动开启及提交事务,在遇到觉得更改的行数异常的情况下,方便直接回滚至原状态。
(2)尽可能保留原始数据,不要直接采用覆盖的方式写数据。
(3)重要数据要先备份。
(4)注意到使用该数据的下游数据,将更改信息及时做同步。

5.5 数据质量治理工作的复盘与同步

在这里插入图片描述


与数据治理和整体的数据管理一样,数据质量治理并不是一个项目,而是一项持续性工作。它包括了项目和维护工作,以及沟通和培训工作。最重要的是,数据质量改进取得长期成功取决于组织的质量观念的建立或改变。
简单来说,数据质量取决于所有与数据交互的人,而不仅仅是数据治理的专业人员。因此,在数据质量治理工作中,尤其是治理的前期,复盘与同步是一件很重要的事。可以定期做一次复盘以及数据质量治理情况的同步,让相关人都可以及时同步到数据治理情况以及逐渐建立高数据质量的观念。

6 结语

就像前面说的,数据质量治理不仅仅是一项技术活,更多的是一项持续性的维护工作。因此,我们要调用一切方法,而不仅仅局限于技术方法去解决数据质量问题。另外,只要有复杂的数据系统,也就一定会有数据质量的问题存在。所以,对于数据质量的追求应该是持续不断的。
本人接触数据质量治理还不到一年的时间,而且,我们项目目前的数据系统复杂性还远不如一些公司级的产品。因此,对于数据质量治理工作的了解还只是皮毛,总结复盘的内容也不够系统完善,接下来会继续推进该项工作,也会继续总结数据质量治理的经验与教训。
也恳切希望有经验的前辈同行对于不正确或不完善的地方予以指点批评,感恩。

7 参考资料

《数据产品设计》
《DAMA数据管理知识体系指南》

etcd后端存储源码解析——底层读写操作

背景

最近想找一些用Go语言实现的优秀开源项目学习一下,etcd作为一个被广泛应用的高可用、强一致性服务发现存储仓库,非常值得分析学习。
本篇文章主要是对etcd的后台存储源码做一解析,希望可以从中学到一些东西。

etcd大版本区别

目前etcd常用的是v2和v3两个大版本。两个版本不同之处主要在于:

  1. v2版本仅在内存中对数据进行了存储,没有做持久化存储。而v3版本做了持久化存储,且还使用了缓存机制加快查询速度。
  2. v2版本和v3版本对外提供的接口做了一些改变。在命令行界面中,可以使用环境变量ETCDCTL_API来设置对外接口。

我们在这里主要是介绍v3版本的后台存储部分实现。 并且这里仅涉及到底层的读写操作接口,并不涉及到更上层的读写步骤(键值的revision版本选择等)。

etcd的后端存储接口

分析思路:

  1. 查看etcd封装的后端存储接口
  2. 查看etcd实现了后端存储接口的结构体
  3. 查看上述结构体的初始化方法
  4. 查看上述结构体的初始化值
  5. 查看上述结构体初始化方法的具体初始化过程

首先,我们先来看下etcd封装的后端存储接口:
路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/backend.go

type Backend interface {
    // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
    ReadTx() ReadTx
    BatchTx() BatchTx
    // ConcurrentReadTx returns a non-blocking read transaction.
    ConcurrentReadTx() ReadTx

    Snapshot() Snapshot
    Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
    // Size returns the current size of the backend physically allocated.
    // The backend can hold DB space that is not utilized at the moment,
    // since it can conduct pre-allocation or spare unused space for recycling.
    // Use SizeInUse() instead for the actual DB size.
    Size() int64
    // SizeInUse returns the current size of the backend logically in use.
    // Since the backend can manage free space in a non-byte unit such as
    // number of pages, the returned value can be not exactly accurate in bytes.
    SizeInUse() int64
    // OpenReadTxN returns the number of currently open read transactions in the backend.
    OpenReadTxN() int64
    Defrag() error
    ForceCommit()
    Close() error
}

Backend接口封装了etcd后端所提供的接口,最主要的是:
ReadTx(),提供只读事务的接口,以及BatchTx(),提供读写事务的接口。
Backend作为后端封装好的接口,而backend结构体则实现了Backend接口。
路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/backend.go

type backend struct {
    // size and commits are used with atomic operations so they must be
    // 64-bit aligned, otherwise 32-bit tests will crash

    // size is the number of bytes allocated in the backend
    // size字段用于存储给后端分配的字节大小
    size int64
    // sizeInUse is the number of bytes actually used in the backend
    // sizeInUse字段是后端实际上使用的内存大小
    sizeInUse int64
    // commits counts number of commits since start
    // commits字段用于记录启动以来提交的次数
    commits int64
    // openReadTxN is the number of currently open read transactions in the backend
    // openReadTxN存储目前读取事务的开启次数
    openReadTxN int64

    // mu是互斥锁
    mu sync.RWMutex
    // db表示一个boltDB实例,此处可以看到,Etcd默认使用Bolt数据库作为底层存储数据库
    db *bolt.DB

    // 用于读写操作
    batchInterval time.Duration
    batchLimit    int
    batchTx       *batchTxBuffered

    // 该结构体用于只读操作,Tx表示transaction
    readTx *readTx

    stopc chan struct{}
    donec chan struct{}

    // 日志信息
    lg *zap.Logger
}

通过19行 db *bolt.DB 我们可以看到,etcd的底层存储数据库为BoltDB。
好了,接下来我们就看一下这个backend结构体是如何初始化的。
还是在该路径下,我们可以看到New函数

// 创建一个新的backend实例
func New(bcfg BackendConfig) Backend {
    return newBackend(bcfg)
}

该函数传入了参数bcfg,类型为BackendConfig,这是后端存储的配置信息。
我们先看下这个配置信息中包含了什么
依然在该路径下,找到BackendConfig结构体

type BackendConfig struct {
    // Path is the file path to the backend file.
    Path string
    // BatchInterval is the maximum time before flushing the BatchTx.
    // BatchInterval表示提交事务的最长间隔时间
    BatchInterval time.Duration
    // BatchLimit is the maximum puts before flushing the BatchTx.
    BatchLimit int
    // BackendFreelistType is the backend boltdb's freelist type.
    BackendFreelistType bolt.FreelistType
    // MmapSize is the number of bytes to mmap for the backend.
    // MmapSize表示分配的内存大小
    MmapSize uint64
    // Logger logs backend-side operations.
    Logger *zap.Logger
    // UnsafeNoFsync disables all uses of fsync.
    UnsafeNoFsync bool `json:"unsafe-no-fsync"`
}

可以看到,有许多backend初始化所需要的信息都在这个结构体中。
既然有这些配置信息,那么一定会有相应的默认配置信息,
我们来看下在默认情况下etcd存储部分会被赋怎样的值。
依然在该目录下,找到DefaultBackendConfig函数。

func DefaultBackendConfig() BackendConfig {
    return BackendConfig{
    BatchInterval: defaultBatchInterval,
    BatchLimit: defaultBatchLimit,
    MmapSize: initialMmapSize,
    }
}

随便查看其中某个全局变量的值,比如defaultBatchInterval,则可以看到默认值:

var (
    defaultBatchLimit = 10000
    defaultBatchInterval = 100 * time.Millisecond
    defragLimit = 10000
    // initialMmapSize is the initial size of the mmapped region. Setting this larger than
    // the potential max db size can prevent writer from blocking reader.
    // This only works for linux.
    initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
    // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
    minSnapshotWarningTimeout = 30 * time.Second
)

defaultBatchInterval变量为例,就是说默认情况下,etcd会100秒做一次自动的事务提交。
etcd后端存储默认赋值的部分说完了,就说回对结构体的初始化上。
我们继续看函数New,它调用了函数newBackend
我们看下函数newBackend做了些什么

func newBackend(bcfg BackendConfig) *backend {
    if bcfg.Logger == nil {
        bcfg.Logger = zap.NewNop()
    }

    // 一些配置载入
    bopts := &bolt.Options{}
    if boltOpenOptions != nil {
        *bopts = *boltOpenOptions
    }
    bopts.InitialMmapSize = bcfg.mmapSize()
    bopts.FreelistType = bcfg.BackendFreelistType
    bopts.NoSync = bcfg.UnsafeNoFsync
    bopts.NoGrowSync = bcfg.UnsafeNoFsync

    // 初始化Bolt数据库
    db, err := bolt.Open(bcfg.Path, 0600, bopts)
    if err != nil {    
        bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
    }

    // In future, may want to make buffering optional for low-concurrency systems
    // or dynamically swap between buffered/non-buffered depending on workload.
    // 对backend结构体做初始化,包括了readTx只读事务以及batchTx读写事务
    b := &backend{
        db: db,

        batchInterval: bcfg.BatchInterval,
        batchLimit: bcfg.BatchLimit,

        readTx: &readTx{
            baseReadTx: baseReadTx{
                buf: txReadBuffer{
                    txBuffer: txBuffer{make(map[string]*bucketBuffer)},
                },
                buckets: make(map[string]*bolt.Bucket),
                txWg: new(sync.WaitGroup),
                txMu: new(sync.RWMutex),
            },
        },

        stopc: make(chan struct{}),
        donec: make(chan struct{}),

        lg: bcfg.Logger,
    }
    b.batchTx = newBatchTxBuffered(b)
    // 开启一个新的etcd后端存储连接
    go b.run()
    return b
}

我们可以看到6-19行在初始化boltDB的同时载入了一些数据库的配置信息。
23-41行是对backend结构体做了初始化,包括了只读事务readTx、读写事务batchTx结构体的初始化,以及初始化了两个通道stopc、donec,这个后面会用到。
43行开启了一个协程去并发的处理run()函数内的工作。
我们继续看一下run()函数做了什么。依然在该目录下

func (b *backend) run() {
    // 关闭结构体的donec通道
    defer close(b.donec)
    // 开启一个定时器
    t := time.NewTimer(b.batchInterval)
    // 最后要关闭定时器
    defer t.Stop()
    for {
        select {
            // 当定时器到时间了,则t.C会有值
            case <-t.C:
            case <-b.stopc:
                b.batchTx.CommitAndStop()
                return
            }
        // 定时器到时间了,且数据的偏移量非0,即有数据的情况下,则会进行一次事务的自动提交
        if b.batchTx.safePending() != 0 {
            b.batchTx.Commit()
        }
        // 重新设置定时器的时间
        t.Reset(b.batchInterval)    
    }
}

我在代码中注释的比较详细了,简单的说,就是在初始化backend结构体时,开启了一个协程用于事务的自动提交,事务自动提交的时间间隔为batchInterval,这个默认值为100秒。
注意12-14行,这段代码表示,如果是停止信号进来的话,则事务会立即提交并且停止。
到这里,backend结构体就初始化完成了,接下来我们看一下用于读操作的只读事务接口ReadTx

etcd后端存储的读操作

路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/read_tx.go

type ReadTx interface {
    Lock()
    Unlock()
    RLock()
    RUnlock()

    UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
    UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}

该接口是用结构体baseReadTx实现的,来看一下baseReadTx结构体,文件路径与ReadTx接口一样

type baseReadTx struct {
    // buf与buckets都是用于增加读效率的缓存
    // mu用于保护txReadBuffer缓存的操作
    mu  sync.RWMutex
    buf txReadBuffer

    // txMu用于保护buckets缓存和tx的操作
    txMu    *sync.RWMutex
    tx      *bolt.Tx
    buckets map[string]*bolt.Bucket
    // txWg可以防止tx在批处理间隔结束时回滚,直到使用该tx完成所有读取为止
    txWg *sync.WaitGroup
}

只读事务ReadTx的读取数据的接口有两个,分别是UnsafeRange以及UnsafeForEach。我们以UnsafeRange接口为例进行代码分析。
UnsafeRange接口的实现依然在上述路径中

// 该方法用于底层数据的只读操作
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    // 不使用范围查询
    if endKey == nil {
        // forbid duplicates for single keys
        limit = 1
    }
    // 当范围值异常时,则传入最大范围
    if limit <= 0 {
        limit = math.MaxInt64
    }
    if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
        panic("do not use unsafeRange on non-keys bucket")
    }
    // 将buf缓存中的数据读取出来
    keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
    // 如果取出的数据满足了需求,那么则直接返回数据
    if int64(len(keys)) == limit {
        return keys, vals
    }

    // find/cache bucket
    // 从bucket缓存中查询bucket实例,查询到了则返回缓存中的实例,查询不到,则在BoltDB中查找
    bn := string(bucketName)
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    // 缓存中取不到bucket的话,会从bolt中查找,并写入缓存中
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketName)
        baseReadTx.buckets[bn] = bucket
    }

    // ignore missing bucket since may have been created in this batch
    if bucket == nil {
        if lockHeld {
            baseReadTx.txMu.Unlock()
        }
        return keys, vals
    }
    if !lockHeld {
        baseReadTx.txMu.Lock()
        lockHeld = true
    }
    c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    // 从bolt的该bucket中查找键值对
    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
    return append(k2, keys...), append(v2, vals...)
}

4-14行是一些前置的判断步骤,16-20则是从buf缓存中读取数据,前面提到过,buf是etcd用于提高读取效率的缓存。
我们看下具体的从buf读取数据的过程。
Range函数在路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/tx_buffer.go

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    if b := txr.buckets[string(bucketName)]; b != nil {
        return b.Range(key, endKey, limit)
    }
    return nil, nil
}

可以看到,该方法就是实例化了名为bucketName的桶,然后从该桶中按照范围读取键值数据。
我们可以看到,bucket的实例为结构体bucketBuffer

type bucketBuffer struct {
    buf []kv
    // used字段记录了正在使用的元素个数,这样buf无需重新分配内存就可以覆盖写入
    used int
}

看回到Range方法代码的第3行,我们来看一下b.Range方法的代码。b.Range与buf.Range方法不同,b.Range是结构体bucketBuffer实现的方法。
依然与Range方法相同路径

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
    // 查找到key在buf中的索引idx
    f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
    // sort.Search用于从某个切片中查找某个值的索引
    idx := sort.Search(bb.used, f)
    if idx < 0 {
        return nil, nil
    }
    // 只查找一个key值,而非范围查找
    if len(endKey) == 0 {
        if bytes.Equal(key, bb.buf[idx].key) {
            keys = append(keys, bb.buf[idx].key)
            vals = append(vals, bb.buf[idx].val)
        }
        return keys, vals
    }
    // 根据字节的值来比较字节切片的大小
    // 如果endKey比key值小,则返回nil
    if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
        return nil, nil
    }
    // 在个数限制limit内,且小于endKey的所有键值对都取出来
    for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
        if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
            break
        }
        keys = append(keys, bb.buf[i].key)
        vals = append(vals, bb.buf[i].val)
    }
    return keys, vals
}

3-5行代码表示要从buf结构体中找到第一个满足包含key的索引值。该两行代码一般结合使用,是一种常见的用于查找值对应索引值的方式。

10-16行代码表示,如果endKey为0,即不使用范围查找,只查找key这一个精确值,那么就需要判断3-5代码找到的值是否与该key完全相等,只有完全相等了才会返回keys与vals。

19-21行代码表示,如果输入的endKey比key值还要小,那么就认为是输入的问题,则返回nil值。

最后19-30行代码表示,key与endKey都输入正常的情况下,则将limit内,大于等于key且小于endKey的键值对都取出来,并返回keys、vals结果。

到此,从buf缓存中就可以读取所需要的数据了,那么,我们回过头接着看UnsafeRange方法的实现,该方法在前面有提到。
我再次把代码贴在这里:

// 该方法用于底层数据的只读操作
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    // 不使用范围查询
    if endKey == nil {
        // forbid duplicates for single keys
        limit = 1
    }
    // 当范围值异常时,则传入最大范围
    if limit <= 0 {
        limit = math.MaxInt64
    }
    if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
        panic("do not use unsafeRange on non-keys bucket")
    }
    // 将buf缓存中的数据读取出来
    keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
    // 如果取出的数据满足了需求,那么则直接返回数据
    if int64(len(keys)) == limit {
        return keys, vals
    }

    // find/cache bucket
    // 从bucket缓存中查询bucket实例,查询到了则返回缓存中的实例,查询不到,则在BoltDB中查找
    bn := string(bucketName)
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    // 缓存中取不到bucket的话,会从bolt中查找,并写入缓存中
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketName)
        baseReadTx.buckets[bn] = bucket
    }

    // ignore missing bucket since may have been created in this batch
    if bucket == nil {
        if lockHeld {
            baseReadTx.txMu.Unlock()
        }
        return keys, vals
    }
    if !lockHeld {
        baseReadTx.txMu.Lock()
        lockHeld = true
    }
    c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    // 从bolt的该bucket中查找键值对
    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
    return append(k2, keys...), append(v2, vals...)
}

看第16-20行,刚才我们已经分析了16行代码的具体实现。

18-19行则表示,如果返回的key的数量与limit相等,则就直接返回缓存中的数据即可。如果不是相等的,一般取出的key的数量小于limit值,也就是说,缓存中的数据不完全满足我们的查询需求,那么则需要继续向下执行,到etcd的底层数据库bolt中查询数据。

注意24-43行,首先etcd会从baseReadTx结构体的buckets缓存中查询查询bucket实例,如果缓存中查询不到该实例,则会从bolt数据库中查询并且将实例写入到缓存中。而如果bolt中也查询不到该bucket,则会直接返回之前从buf中查询到的keys与vals值。

如果从缓存或者bolt中查询到了bucket实例,那么,后续就可以直接从bolt中查询该bucket下的键值对了。

我们看一下52行的具体实现。

func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
    if limit <= 0 {
        limit = math.MaxInt64
    }
    var isMatch func(b []byte) bool
    // 如果有终止的key,则将找到的key与终止的key比较,是否key小于endkey
    // 否则,将找到的key与自身比较是否相等
    if len(endKey) > 0 {
        isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
    } else {
        isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
        limit = 1
    }

    // 循环查找key所对应的值, 然后与endkey做对比(如果有endkey的话)
    // 直到不满足所需条件
    for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
        vs = append(vs, cv)
        keys = append(keys, ck)
        if limit == int64(len(keys)) {
            break
        }
    }
    return keys, vs
}

先看一下传入unsafeRange方法的参数。

c为cursor实例,key为我们要查询的初始key值,endkey为我们要查询的终止key值。而limit是我们查询的范围值,由于我们之前用缓存已经查询出来了一些数据,因此,该范围其实是我们的总范围减去已经查询到的key值数。

其中,5-13行代码用到了匿名函数,用于判断查询到的key值是否依然满足需求。如果我们给到了endkey,那么就会对查到的key与endkey做比较。如果我们没有给endkey,那么就会直接判断查询到的key值是否等于我们要查询的key。

17-23行则为用于DB查询实现代码。

etcd后端存储的写操作

文章开头部分,我们讲到过etcd后端存储对外的接口Backend,其中包括了两个重要的接口:ReadTx以及BatchTx,ReadTx接口负责只读操作,这个我们在前面已经讲到了。
接下来,我们看一下etcd后端存储的读写接口BatchTx。
路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/batch_tx.go

type BatchTx interface {
    ReadTx
    UnsafeCreateBucket(name []byte)
    UnsafePut(bucketName []byte, key []byte, value []byte)
    UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
    UnsafeDelete(bucketName []byte, key []byte)
    // Commit commits a previous tx and begins a new writable one.
    Commit()
    // CommitAndStop commits the previous tx and does not create a new one.
    CommitAndStop()
}

我们可以看到,BatchTx接口也包含了前面讲到的ReadTx接口,以及其他用于写操作的方法。batchTx结构体实现了BatchTx接口。

type batchTx struct {
    sync.Mutex
    tx      *bolt.Tx
    backend *backend

    // 数据的偏移量
    pending int
}

具体接口中方法的实现我们就不一一看了,因为都是直接调用了bolt数据库的接口,比较简单。

总结

本篇文章主要从源码角度分析了etcd后端存储的底层读写操作的具体实现。无论我们是使用命令行操作etcd,还是调用etcd的对外接口。最终在对键值对进行读写操作时,底层都会涉及到今天分析的这两个接口:ReadTx以及BatchTx。

然而,etcd的键值对读写其实还会涉及到许多其他的知识,比如revision的概念。接下来还会有文章继续对这些知识做解析。