使用librosa实现视频素材音乐卡点

背景

近期业务上有一个需求:需要将我们已有的一些视频素材作为数据源,结合有节奏感的BGM,输出一个能够卡点的视频素材。类似于这样的功能在很多视频剪辑软件如剪映、快影中都能够看到,它们会让用户上传多个视频,然后再让用户挑选一个喜欢的BGM,就可以快速输出一个卡点好的视频。我们利用librosa简单实现了这样的功能,并结合后台的一系列自动化工具,使用户能够快速得到大量的具有音乐卡点效果的视频素材,我们将该能力与现有系统相结合,提升了混剪视频制作效率并降低了制作成本。

本篇文章去除了业务上的一些细节,将如何利用开源工具快速实现音乐卡点功能做了总结整理。

使用到的开源工具

OpenCV:OpenCV(开源计算机视觉库)是一个开源计算机视觉和机器学习软件库。可以用于图像处理、视频处理、特征提取等很多场景。这里我们用它来获取视频帧率、尺寸、帧,并用于视频的帧级别的裁剪。

spleeter:spleeter是一个源分离器。可以用于音频的分离,将人声、鼓、贝斯和其他声音分离开来,便于后续的声音处理。

librosa:librasa是一个用于音乐和音频分析的 python 包。可以用于提取音频的采样值、采样率、节拍时间点等信息。

AudioCraft:AudioCraft 是一个用于音频生成深度学习研究的 PyTorch 库。 其中它的 MusicGen 功能可以用于生成音乐。

AudioSegment:AudioSegment 可以用于音频文件的裁剪。

整体逻辑

从整体逻辑看,整个流程不算复杂。共有如下几个步骤:

  1. 确定合适的数据源,也就是素材物料库,可以从原始物料中来,也可以从素材成片来。
  2. 离线将大量的视频做好切幕以及分类。
  3. 将上述分幕按照一定的规则导入到剪辑服务中。
  4. 剪辑服务输出成品卡点视频的地址,以供用户使用。

实现流程

BGM来自于成熟曲库

一般像剪映以及快影“剪同款”中的“卡点”功能就都算是BGM来自于成熟曲库。因为曲库中的音乐该在什么时刻进行卡点都已经是生成好了的。

后台需要做的事情仅就是将导入的多个视频裁剪为适合拼接的视频即可。这种卡点方式的实现流程比较简单。

  1. 接收用户传入的多个视频,此处校验视频时长是否满足需求。
  2. 导入音乐的卡点方案,这个卡点方案如果是时刻的数组,则可以使用 numpy 转为时间间隔的数组。如:将[1.345, 2.345, 3.456, 4.567, 5.678]转为[1.345, 1, 1.111, 1.111, 1.111]。便于后续视频的裁剪。
  3. 选择视频并进行视频裁剪,此处的视频可能是与卡点间隔一一对应的,也可能是随机或依据某个规则选择的。可以使用OpenCV进行视频裁剪,它可以实现帧级别的处理,非常适合卡点这种对时间要求较高的场景。

用OpenCV做视频裁剪的示例代码如下:

def video_stretch(input_file, output_file, dt):
    cap = cv2.VideoCapture(input_file)
    fps = cap.get(cv2.CAP_PROP_FPS)
    (w, h) = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    # 读取 video
    video = []
    success = 1
    while success:
        success, frame = cap.read()
        if success:
            video.append(frame)
    cap.release()
    # 选择片段
    frame_cnt = int(dt * fps)
    new_fps = int(dt * fps) / dt
    start = int((len(video) - frame_cnt) / 2)
    sub_video = video[start:(start + frame_cnt)]
    # 保存到新视频
    vision_writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'xvid'), new_fps, (w, h))
    for frame in sub_video:
        vision_writer.write(frame)
    vision_writer.release()

这里裁剪的片段是取的中间的片段,实际上也可以在此处加入算法,判断从何处截取视频。

  1. 使用 ffmpeg 做视频的拼接,示例代码如下:
concat_cmd = "concat:" + "|".join(
            [f"{workspaceMix}/bgm_stuck/tmp_ts/{scen_list_idx}_{i}.ts" for i in range(len(choose_list))])
cmd = 'ffmpeg -y -i \\"{concat_cmd}\\" -i "{audio_path}" -vcodec copy -absf aac_adtstoasc {output_file}'
os.system(cmd)

其中,{workspaceMix} 是剪辑机器上的工作空间。{scen_list_idx}_{i}.ts 是待拼接的视频片段。

ffmpeg 的拼接支持的比较简单,如果需要在视频间增加转场特效,还需要另外调用相应能力。

BGM来自于AI生成或用户上传

这种场景与上述BGM来自于成熟曲库的场景相比,多了一个在音乐中标记卡点时刻的步骤。而这一步的实现还是比较有挑战的,如果标记的不合理,那么最终呈现出的卡点效果就会很差。目前看起来剪映和快影还没有支持该功能,或许也是因为出片质量不是很好保障吧。

这里仅介绍一下AI音乐生成以及对应音乐的卡点时刻标记。用户上传音乐场景类似。

  1. AI生成音乐使用的是 AudioCraft 的 MusicGen 功能,我们只需要输入一段 Prompt,就可以得到相应的音乐。比如,我用的Prompt为:cheerful music without drums。可以得到如下音乐:
  1. 利用 Separator 将音乐做音轨分离,提取出音乐的纯粹节拍部分。代码如下:
sp = Separator(params_descriptor="spleeter:4stems")
sp.separate_to_file(orig_path, music_ai_path, codec="wav")

其中,参数params_descriptor="spleeter:4stems"表示将该段音频分离为人声、鼓、贝斯和其他声音。spleeter 还有一些别的参数可供选择。详细可参考:https://research.deezer.com/projects/spleeter.html

  1. 使用 librosa 加载鼓点音频文件,并获取到对应的音频时序数组以及采样率。代码如下:
drums, sampling_rate = librosa.load(drum_path)

其中,drum_path 是鼓点音频文件的路径,drumssampling_rate分别表示音频时序数组以及采样率,示例分别为:[0.0012345, 0.0023456, 0.0034567, …] 和 44100。

  1. 使用 librosa 获取鼓点音频的节拍时刻。代码如下:
estimated_bpm, beats = librosa.beat.beat_track(y=drums, sr=sampling_rate, units='time')

其中drumssampling_rate分别表示前面获取到的音频时序数组以及采样率。units 表示节拍位置的单位,可选time和frames,time 表示以时间为单位,frames 表示以帧数为单位。

estimated_bpm表示估计的全局节奏,以每分钟节拍为单位。beats是节拍时刻的数组。

librosa.beat.beat_track 具体的检测算法可以参考文档:http://labrosa.ee.columbia.edu/projects/beattrack/

  1. 使用 librosa 获取得到脉冲信号所在帧,使用上述 beats 计算得到节奏所在帧,并将脉冲信号所在帧与节奏所在帧进行匹配,得到节奏点。代码如下:
# 获取得到脉冲信号所在帧
onset_env = librosa.onset.onset_strength(y=y, sr=sr, hop_length=512, aggregate=np.median)
peaks = librosa.util.peak_pick(onset_env, 1, 1, 1, 1, 0.8, 5)
# 创建一个节拍值1/4、2/4、3/4、4/4的数组
M = beats * [[1 / 4], [2 / 4], [3 / 4]]
M = M.flatten()
M = np.sort(M)
# 局部脉冲与节拍点做10%的去误差,得到节奏点
L = []
for i in M:
	for j in peaks:
		if i * 0.9 < j < i * 1.1:
	    L.append(j)
L = list(set(L))
L.sort()
# 取前20个点,不够20个则全取
if len(L) > 20:
	point_list = librosa.frames_to_time(L[:20], sr=sr)
else:
	point_list = librosa.frames_to_time(L[:len(L)], sr=sr)

这里我们有个假设,就是AI生成或用户上传的音乐一定是2/4拍或3/4拍或4/4拍中的一种。这样才能通过该方法来找到节奏点。最终输出的节奏点是以脉冲信号为准的,因为有可能音乐的节奏点没有脉冲信号,我们要的卡点时刻一定是要有脉冲信号的。

librosa.util.peak_pick 的具体检测算法可以参考文档:https://librosa.org/doc/main/generated/librosa.util.peak_pick.html

  1. 使用AudioSegment 进行音频文件的裁剪,具体怎么裁剪这个就看业务需求了,参考代码如下:
# 音乐裁剪,设置开始结束时间
end_time = point_list[len(point_list) - 1] + start_time
start_time = start_time * 1000
end_time = end_time * 1000
sound = AudioSegment.from_mp3(music_path)
word = sound[start_time:end_time]
# 音乐储存路径
word.export('movie/music.wav', format="wav")
  1. 将节奏点时刻数组作为卡点方案,以及待剪辑的视频作为输入。这样后续的流程就和“BGM来自于成熟曲库”的流程一样了。

总结

本文总结了如何利用一些开源工具快速的实现视频素材BGM卡点的功能。限于篇幅,有很多细节都没有涉及到,比如更多酷炫转场的实现、相似素材场景聚类的实现等。后续如果有机会,会继续续写分享。

相关参考资料

OpenCV文档:https://docs.opencv.org/4.x/d9/df8/tutorial_root.html

spleeter:https://research.deezer.com/projects/spleeter.html

librosa:https://librosa.org/doc/latest/index.html

AudioCraft:https://audiocraft.metademolab.com/

AudioSegment:https://pydub.com/

虚拟人的快速实现:语音驱动图片

背景

由于业务需要,近期调研了如何使用开源工具快速的实现一个虚拟人,这个虚拟人来自于一个图片,我们需要做的是要让这个图片动起来,且图中人物的口型能够跟对应的语音对应上。为此,我了解几个开源的工具或模型,最终选择使用SadTalker来实现该功能。

本文是对SadTalker的使用以及其用到的基础模型做一总结。希望能够对想要实现该功能的同学有所帮助。

要达成的目标

需要有个开源工具,能够实现如下的输入输出,效果要尽可能的好,并且方便我们将该服务本地化。

输入:一段文字脚本、一个音色模型、一张人物图片

输出:一个视频,该视频中的人物能够开口说话,并且用指定的音色说出文字脚本的内容

SadTalker相关信息

Github地址:https://github.com/OpenTalker/SadTalker

hugging face地址:https://huggingface.co/spaces/vinthony/SadTalker

SadTalker的使用

关于SadTalker的使用,其官方文档已经写的比较全面了,这里不再赘述。只提一下其中一些参数的使用要点。

  1. 如果要用preprocess的full参数,则一定要加上–still参数,不然效果会很糟糕。
  2. free-view Mode参数可以用于控制头像的转动,其中参数的详细含义如下:

input_yawinput_pitch 和 input_roll 通常用于描述物体或相机在三维空间中的旋转角度,也被称为欧拉角或俯仰、偏航和翻滚角。

  • input_yaw 表示绕垂直于地面的轴旋转的角度,也称为偏航角。通常以正北方向为0度,顺时针方向为正方向,逆时针方向为负方向。
  • input_pitch 表示绕水平轴旋转的角度,也称为俯仰角。通常以水平面为0度,向上旋转为正方向,向下旋转为负方向。
  • input_roll 表示绕前后轴旋转的角度,也称为翻滚角。通常以垂直于水平面的轴为0度,向右旋转为正方向,向左旋转为负方向。
  1. 就我目前来看,如果我们需要头部动起来,那么还是不设置旋转参数的效果更好。

SadTalker使用到的模型

模型介绍

shape_predictor_68_face_landmarks.dat

shape_predictor_68_face_landmarks.dat是一个基于dlib库的人脸关键点检测模型文件,可以用于检测人脸的68个关键点,包括眼睛、眉毛、鼻子、嘴巴等部位的位置信息。这个模型文件在很多人脸识别、表情识别、人脸姿态估计等领域都有广泛的应用。(基于随机森林的回归算法)

Deep3DFaceReconstruction

**Deep3DFaceReconstruction**是一种基于深度学习的人脸三维重建技术。它通过利用深度学习算法对人脸图像进行分析和处理,从而实现对人脸的三维重建。这种技术可以广泛应用于计算机视觉、虚拟现实、增强现实等领域,为人们带来更加逼真的视觉体验。

Deep3DFaceReconstruction的核心技术是利用深度学习算法对人脸图像进行分析和处理,从而提取出人脸的三维信息。具体来说,它通过对大量的人脸图像进行训练,学习到了一种高效的特征提取方法,可以快速准确地提取出人脸的关键特征,包括面部轮廓、眼睛、鼻子、嘴巴等。然后,它通过对这些特征进行三维重建,从而得到了一个高度逼真的人脸三维模型。

Deep3DFaceReconstruction具有许多优点,例如可以实现快速高效的三维重建、可以处理不同角度和光照条件下的人脸图像、可以处理不同种族和性别的人脸图像等。此外,它还可以应用于人脸识别、人脸动画、人脸表情识别等领域,具有广泛的应用前景。

auido2pose_00140-model.pth

auido2pose_00140-model.pth是一个训练好的PyTorch模型文件,用于音频到人体姿态的转换。该模型可以根据输入的音频数据,预测出对应的人体姿态信息。在这个文件中,auido2pose_00140-model.pth表示训练过程中的第140个epoch。在每个epoch中,模型会对训练数据进行一次完整的训练,以更新模型参数。通常情况下,训练的epoch次数越多,模型的预测效果会越好,但是也会增加训练时间和计算资源的开销。该模型文件可以被应用于很多领域,如虚拟现实、运动分析、人体姿态识别等。(PoseVAE模型,基于transformer模型的)

auido2exp_00300-model.pth

auido2exp_00300-model.pth是一个训练好的模型文件,用于将音频数据转换为对应的面部表情。该模型通过分析音频数据和面部表情数据之间的关系,可以预测出对应的面部表情信息。auido2exp_00300-model.pth可以被应用于很多领域,如虚拟现实、面部表情识别、情感分析等。通过分析音频数据和面部表情数据之间的关系,该模型可以实现对面部表情的快速、准确的预测。(ExpNet模型,基于transformer模型的)

wav2lip.pth

wav2lip.pth是一个基于深度学习的语音和口型同步技术的模型文件,可以将音频和视频中的语音和口型同步,生成逼真的合成视频。该模型使用了深度学习中的卷积神经网络和循环神经网络等技术,实现了对音频和视频的特征提取和匹配。wav2lip.pth可以被应用于很多领域,如虚拟现实、视频编辑、电影制作等。通过将音频和视频进行同步,可以实现更加逼真的人机交互和视频合成效果,提高用户体验。(基于循环神经网络)

mapping_00109-model.pth.tar

mapping_00109-model.pth.tar的作用是将输入音频中学习到的逼真的3D运动系数转换为对应的基础向量,从而生成一个逼真的3D人脸。具体来说,mappingNet模型学习从显式的3DMM运动系数(头部姿态和表情)到隐式的基础向量之间的关系,并将这个关系应用于输入音频中学习到的逼真的3D运动系数,从而生成对应的基础向量。最后,这个基础向量与3DMM模型中的基础向量相结合,生成一个逼真的3D人脸。(基于GAN神经网络)

facevid2vid_00189-model.pth.tar

facevid2vid_00189-model.pth.tar是一个基于深度学习的人脸动作转换模型,可以将输入的人脸视频转换成指定动作的人脸视频。简单来说,可以将上述3D模型视频转为穿了图片皮肤的视频。(基于GAN神经网络)

GFPGANv1.4.pth

GFPGANv1.4.pth是一种图像超分辨率重建模型,用于将低分辨率图像(LR)提升至高分辨率图像(HR)。它是由腾讯优图实验室提出的一种基于生成对抗网络(GAN)的方法,可以在不失真的情况下提高图像质量。GFPGANv1.4.pth是该模型的一个预训练权重文件,可以用于对新的低分辨率图像进行重建。该模型可以应用于许多领域,如数字娱乐、医学影像、安防监控等。(基于GAN神经网络)

处理流程中的相关模型

  1. 对原始图片进行裁剪,这一步用到shape_predictor_68_face_landmarks.dat算法模型识别出人脸,并使用CV2进行裁剪处理。
  2. 将原始图片中的人脸识别成一个3D人脸模型系数,并存储在mat格式的文件中。
  3. 3DMM提出给定视频中的眨眼动作。(将视频中的人脸识别成一个3D人脸模型系数,并存储在mat格式的文件中。)
  4. 3DMM提出给定视频中的姿势动作。(将视频中的人脸识别成一个3D人脸模型系数,并存储在mat格式的文件中。)
  5. 从音频中提取出姿势、表情以及唇形,并存储在mat格式的文件中。
  6. 使用上述系数渲染出一个3D人脸模型,并用ffmpeg命令将音频与视频做结合。
  7. 将视频与图片做结合。
  8. 效果增强,使视频变得更加清晰。

模型处理的时延测试

音频时长:10秒

处理总时间:366054毫秒(6分钟)

模型处理时间总占比:97.20%

测试机器配置:半张NVIDIA Tesla T4显卡、显存:8GB

处理过程模型处理时间(单位:毫秒)处理时间占比
图片的预处理shape_predictor_68_face_landmarks.dat
epoch_20.pth92962.54%
眨眼视频的处理shape_predictor_68_face_landmarks.dat
epoch_20.pth4094811.19%
姿势视频的处理shape_predictor_68_face_landmarks.dat
epoch_20.pth300448.21%
音频提出系数auido2exp_00300-model.pth
auido2pose_00140-model.pth
wav2lip.pth10940.30%
3D人脸模型渲染mapping_00109-model.pth.tar12231233.41%
视频与图片做结合facevid2vid_00189-model.pth.tar66461.82%
效果增强gfpgan14544939.73%

可以看到,模型处理过程中,最耗时的是3D人脸模型渲染以及效果增强,两者耗时占比超70%。

SadTalker的部署与服务

SadTalker的项目代码是用Python编写的,我们可以用FastAPI很快速的把它改造为能够通过HTTP对外提供服务的方式,并且在服务器中用uvicorn启动该服务。这样就可以在自己的本地环境中稳定的提供服务了,在有代码变更的情况下,也无需手动重启服务。

我们的服务还与腾讯云COS存储打通了,语音和图片从COS中读取,并且将处理成功的视频文件写入到COS中,并将链接地址返回。

总结

本文是对SadTalker的使用以及其用到的基础模型做了一下总结,其中包括了模型的介绍以及各模型处理时长的试验结果,但并没有涉及到模型的原理以及优化细节。对于针对于业务的优化,可以后续视情况来续写分享。

参考资料

SadTalker Github地址:https://github.com/OpenTalker/SadTalker

SadTalker hugging face地址:https://huggingface.co/spaces/vinthony/SadTalker

SadTalker最佳实践:https://github.com/OpenTalker/SadTalker/blob/main/docs/best_practice.md

FastAPI:https://github.com/tiangolo/fastapi

ChatGPT实践——如何用LangChain基于LLM构建一个应用程序

ChatGPT实践——如何用LangChain基于LLM构建一个应用程序

LangChain是什么

随着大语言模型的出现和发展,开发者们都会思考一个问题,如何利用这些新技术来搭建一个有用的应用程序。但若只是基于各个独立的技术,是很难满足我们对应用程序的需求的。而LangChain这样的工具就是用来解决这一问题的,它能够将多个不同的技术组合起来供开发者使用,从而让开发者得以便捷的实现一个有用的应用程序。

LangChain可以实现的应用场景

Agents

这里的Agents指的是一个系统,该系统可以使用大语言模型来与其他工具进行交互,从而实现如智能问答、智能操作或是其它连接硬件进行行动的功能。我能想到的是可以实现一个类似于Siri这样的手机软件助手,应该会比它智能一些吧,还有像是科幻片种的智能机器人。

Chatbots(聊天机器人)

这个其实就是基于LLM来做一个能够对话的机器人,像OpenAI的ChatGPT就能直接进行对话的。但它这里可以提供更多复杂的功能,让LLM能够更加定制化,适用更多的应用场景。

Generate Examples(生成例子)

可以基于我们之前的样例按照某个模板生成更多相似的样例。

Data Augmented Generation(增强的数据生成)

目前的LLM只能是基于已训练好的通用模型进行数据生成(或者叫做对话),但我们有很多场景都是特殊的,我们希望LLM能够对我们自己的私人文档进行总结或是基于私人文档进行问答对话,又或者希望我们和LLM的对话能够基于我们的某个数据库。而使用该工具就可以实现这些场景了。

Question Answering(问答)

可以创建基于某个或多个文档的问答系统。对于基于多个文档的问答系统,可以做到创建各个文档的索引,从而在问答时能够快速挑选出最相关的文档发送给LLM,而不用每次都把所有文档都发送过去。

这个我能想到的是,可以把公司的iwiki系统接入进去(当然,这只是假设,因为数据敏感要求,肯定不能这么做),这样我们就可以不仅限于倒排索引进行检索了。

Summarization(摘要)

将多个长文档总结凝练成一小段核心的摘要。

Querying Tabular Data(查询表格数据)

有许多数据都是表格型数据,比如CSV、EXCEL表格,比如SQL表格。LangChain可以用于处理这些表格数据。

Extraction(提取)

在我们工作场景中,大多数的API以及数据库处理的都是结构化的数据,因此,从一段文本中提取出我们所需要的结构化数据就显得很重要了。包括从一段文本中提取出一条或多条结构化数据插入到数据库中,以及从用户的查询中提取出正确的API参数等。

Model Comparison(模型比较)

在我们构建自己的应用程序时,肯定会涉及到对不同的模型、参数等进行选择。LangChain提供了一个叫做ModelLaboratory的工具,可以让开发者便捷的检验出不同的模型、参数等带来的不同效果,以此来快速判断出该如何进行选择。

LangChain的生态系统

LangChain集成了很多LLM以及各种常用的工具包,比如OpenAI、Wolfram Alpha Wrapper等。

AI研究数据管理自然语言处理搜索引擎其他
AI21 LabsAtlasDBBananaGoogle搜索封装器Chroma
CerebriumAIDeepInfraHugging FaceGoogle Serper封装器Cohere
Deep LakeGraphsignalNLPCloudOpenSearchDeepInfra
ForefrontAIMilvusPetalsSearxNG搜索APIGooseAI
OpenAIPineconePromptLayerSerpAPIHazy Research
StochasticAIPGVectorWeaviateWolfram Alpha封装器Modal
QdrantRunhouse
UnstructuredWeights & Biases

详见:https://langchain.readthedocs.io/en/latest/ecosystem.html

使用LangChain做了几个应用程序demo

结合自己的notion文档做对话

demo背景

因为我有每天做工作记录的习惯,所以想尝试使用这个工具结合自己的notion文档,看能否使用它对自己的notion文档做问答对话。

我的工作记录的notion文档部分截图如下:

问题一:把vanping在2022年6月的工作做一下总结

执行脚本运行的结果:

问题二:在2022月6月,什么工作是vanping的重点

代码

import os
from langchain.chains.question_answering import load_qa_chain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import UnstructuredFileLoader

openaichat = ChatOpenAI(model_name="gpt-3.5-turbo")

loader = UnstructuredFileLoader("./vanping_work.txt")
docs = loader.load()

chain = load_qa_chain(openaichat, chain_type="stuff")
query = "在2022月6月,什么工作是vanping的重点"
result = chain.run(input_documents=docs, question=query)
print(result)

小结

这里用到了LangChain的Models、Indexes模块。

看起来使用该工具是能够非常便捷的导入文档以及和chatGPT做对话交互的。但我这里也只是展现了一个简单的demo,若是多加利用,应该是很好用的。

让应用程序扮演某个角色和我对话

demo背景

我想尝试使用LangChain中的chains,看看能否用加前后缀的方式,让应用程序可以扮演某个角色和我进行对话。

角色零:通用模型。问题:你每天都在干吗?

运行结果:

角色一:阴阳怪气的人。问题:你每天都在干吗?

运行结果:

角色零:通用模型。问题:给我讲下ChatGPT的应用场景?

运行结果:

角色二:言简意赅的人。问题:给我讲下ChatGPT的应用场景?

运行结果:

代码

import os
import sys
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

os.environ["OPENAI_API_KEY"] = "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"

openaichat = ChatOpenAI(model_name="gpt-3.5-turbo")

prompt = PromptTemplate(
   input_variables=["query"],
   template="假设你是个言简意赅,每次说话都不超过30个字的人,{query}",
)

chain = LLMChain(llm=openaichat, prompt=prompt)
result = chain.run(sys.argv[1])
print(result)

小结

这里用到了LangChain的Models、Prompts、Chains模块。

这应该是Chains最简单的用法了。实际上Chains是有很多高级的用法。可以参考:https://langchain.readthedocs.io/en/latest/modules/chains.html

私人问答助手(与搜索引擎结合,且具备记忆能力)

demo背景

当前的很多LLM模型都是基于历史的数据训练得来的,比如ChatGPT,就仅能支持查询21年以前的事件,之后的事件它就无能为力了。

比如,当我想知道”当前NBA赛季,洛杉矶湖人队的常规赛平均得分是多少? 与上赛季常规赛平均水平相比百分比变化如何?”

它的答复是这样的:

从结果我们就能看到,它对于这种事情是一窍不通的。

而如果我们想做一个私人问答助手,肯定是不能不知道当前时事的,因此,我就使用LangChain结合搜索引擎来做一个助手。

私人问答助手的答复

问题一:当前 NBA 赛季洛杉矶湖人队的常规赛平均得分是多少? 与上赛季常规赛平均水平相比百分比变化如何?

问题二:该队当前的主力队员有谁?

针对以上两个问题,这个私人问答助手都有答复。

答复一:洛杉矶湖人队本赛季常规赛场均得分为 116.7,比上赛季的场均得分 111.5 提高了 4.2%。

答复二:洛杉矶湖人队的主力球员是Anthony Davis,Wesley Gabriel,Rui Hachimura,LeBron James。

我在NBA APP中查了下球队数据,看起来也不十分精确,但相差也并不大。

我在这个demo中用到了LangChain的Memory、Agents模块,以及GoogleSerperAPIWrapper工具。

我还试验了一下,如果不要Memory模块,那么输出的结果会是如下这个样子。

第一个答复还依旧正常,但第二个答复就是胡说八道了:美中贸易战的主要参与者是越南、台湾、孟加拉国、韩国、智利、马来西亚、阿根廷、中国、美国、日本、印度和印度尼西亚。

代码

from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain.memory import ConversationBufferMemory
from langchain import OpenAI, LLMChain
from langchain.utilities import GoogleSerperAPIWrapper
search = GoogleSerperAPIWrapper()
tools = [
   Tool(
       name = "Search",
       func=search.run,
       description="useful for when you need to answer questions about current events"
  )
]

prefix = """Have a conversation with a human, answering the following questions as best you can. You have access to the following tools:"""
suffix = """Begin!"

{chat_history}
Question: {input}
{agent_scratchpad}"""

prompt = ZeroShotAgent.create_prompt(
   tools,
   prefix=prefix,
   suffix=suffix,
   input_variables=["input", "chat_history", "agent_scratchpad"]
)
memory = ConversationBufferMemory(memory_key="chat_history")

llm_chain = LLMChain(llm=OpenAI(temperature=0), prompt=prompt)
agent = ZeroShotAgent(llm_chain=llm_chain, tools=tools, verbose=True)
agent_chain = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True, memory=memory)

agent_chain.run(input="What is the regular season scoring average for the Los Angeles Lakers in the current NBA season? How is the percentage change compared to last season's regular season average?")
agent_chain.run(input="Who are its main players?")

小结

这里用到了LangChain的Models、Memory、Chains、Agents模块。

制作一个小助手算是非常常见的功能,我这里也只是简单的试用了一下。如果有更复杂的需求,可以参考文档:https://python.langchain.com/en/latest/use_cases/personal_assistants.html

总结

本文对LangChain做了一些简单的介绍,重点是后面列举了三个应用程序Demo。每个应用程序Demo虽然都比较简单,但也算是试用了一下LangChain所提供的一些功能。参考LangChain的官方文档,理论上就能实现很多场景更复杂的AI应用。

参考资料

LangChain官方文档:https://python.langchain.com/en/latest/index.html

ChatGPT分享-如何开发一个LLM应用:https://www.51cto.com/article/749570.html

学会任务理解、真正运算和时事搜索,GPT:我将以高达形态出击:https://km.woa.com/articles/show/573744

LangChain:Model as a Service 粘合剂,被 ChatGPT 插件干掉了吗?:https://foresightnews.pro/article/detail/28959

软件系统的设计原则及解耦部署

一、背景

之所以写这篇文章,是因为我之前做了一段时间的后台开发工作,而且有些系统是从0开始做的。一开始的系统设计是没有遵循任何设计原则的,都是怎么快怎么来,导致后面有一段时间在做需求更改和软件变更时感到非常痛苦,想要加一个功能会需要改动很多地方,搞得自己战战兢兢。于是才找来一些系统设计相关的书来看一下,学习了设计原则和解耦部署的相关知识,并且尽可能用于实践。经过优化后的系统,虽然还是有很多问题亟待解决,但总体上来说好了很多,再做变更时心里也有把握了许多。经过一轮优化,我觉得有必要将这些原则汇总总结一下,一方面是梳理下自己学到的知识,另一方面是分享给有需要的人。限于篇幅,文章基本上是比较偏理论的,但每一点都可以套用在自己的系统中进行深入思考。

文章分为四个部分。第一部分讲SOLID设计原则,因为这是系统设计需要遵循的基本原则,这些原则在很多地方都有详细样例讲解,我这里就主要是提了一下基本概念。第二部分讲组件的设计原则,由于一个大型系统其实是由各个组件耦合组成的,因此有必要对组件应该如何设计进行系统总结。第三部分总结了软件系统解耦部署的几种方式,因为如果系统部署方式选择的正确,是会切实提升研发效率的。在文章最后,我也写了一些我个人对于系统设计原则实践的一点思考。

二、代码设计原则

前面提到了,一个大型软件系统其实就是由一系列组件耦合组成的。所谓组件,简单来说就是对数据与方法的封装。作为组件的使用者,无需关心它的实现细节,只需要简单的调用即可。当一个组件设计的比较合适合理时,是可以变得非常通用的,开发者可以重复利用,也就可以极大的提升研发效率。

既然组件是数据与方法的封装,本质上组件还是代码构成的,因此,组件的设计原则也就一定是基于代码的设计原则的。这里,我们先回顾一下代码设计原则。代码的主要设计原则可以简单记为SOLID原则。

SRP

所谓SRP,即单一职责原则,指的是一个类只能因为一个原因(一个行为者)而被修改。这个原则是为了防止两个不同的使用者如A、B,当A因为某个原因让开发修改类代码时,却影响到了B的使用功能。单一职责原则经常被人误解读为一个接口或者一个类只能做一件事,这样的设计也有道理,但这样理解这个原则是有偏差的。

OCP

OCP,指的是开闭原则。如果我们要添加一个新功能,最好是能够增加新代码来实现,而非变动之前的旧代码。该原则的目的是为了系统有良好的扩展性,而且,不会因为变动旧代码而出现问题。实现的方式是利用接口的方式将低层级代码插件化,可以将系统划分为一系列的组件,并且按层级划分好,让低层级的代码通过实现高层级的接口,从而实现插件化管理。

LSP

LSP,即里氏替换原则。这是一个针对继承的原则。指的是,一个父类的对象,要能够被它的子类的对象进行直接替换,并且相关程序还能够正常且正确的运行,那么就可以说是符合LSP原则的。继承要符合LSP原则的目的是,使继承不会增加复杂度,基类能真正被复用,而派生类也能够在基类的基础上增加新的行为。使得代码更容易被扩展,也能够更加健壮。LSP原则是对OCP原则的一种实践。

ISP

ISP是接口隔离原则。该原则指的是我们不应该依赖用不到的代码。任何层级的代码,如果是依赖了用不到的代码,那么很可能会在未来出现意想不到的麻烦。在实际中,我们可以做到以下几点来做到ISP原则:

  1. 一个类对另一个类的依赖应该建立在最小接口之上,接口中不应该有我们不需要的方法。
  2. 建立功能单一的接口,不要建立一个大而臃肿的接口。
  3. 尽量去细化接口,接口中的方法尽可能少。

DIP

DIP,依赖反转原则,指的是底层的代码应该依赖于核心代码。当核心代码需要依赖底层代码时,就使用多态的方式使依赖进行反转。也就是使控制流和依赖流相反。目的是使不稳定的底层代码依赖稳定的核心代码,从而减少更核心代码的变动。

举个我们常见的例子,比如我们在写一个功能时,很可能会用到缓存,经过调研,我们决定使用Redis来作为缓存使用。于是,我们就在核心代码里直接调用了Redis的操作方法。这是我们常用的方式。

但是这样的方式有个问题,就是我们以后如果想要将Redis替换掉,那我们就会比较痛苦,因为我们不得不去修改核心业务代码,并且我们要重新编译、测试、发布等。

img

假设我们要将Redis替换为Tendis,那么我们就不得不变更核心业务代码,如下:

img

但是,假如我们遵守DIP原则,将操作缓存的接口在核心业务代码中制定好,而由外部缓存如Redis的实际操作部分来实现我们的接口代码,那么Redis就变成了我们核心业务代码中的一个插件而已,我们在想要更换缓存选型时,也非常的轻松,只需要让新的缓存操作代码去实现核心业务代码中的操作接口即可,核心业务代码不需要做任何更改。这样就极大的降低了系统在做变更时的风险。

img

如果遵循DIP原则,那么,我们就可以保证核心业务代码持续稳定不用修改,让其他的组件都作为插件方便的进行替换。比如我们要将Redis替换为Tendis,那么写好Tendis操作代码后直接替换就行了。

img

三、组件设计原则

上面我们介绍到了,组件其实就是对数据与方法的封装。同时,组件也是软件开发与部署的最小单元。说到这里,那我们肯定产生的第一个疑问就是,什么样的代码应该被组织为一个组件?组件是代码量越小越好吗?下面就介绍一下代码组织为一个组件应该要遵守的设计原则。

1. 组件构成的设计原则

1.1 REP:复用发布等同原则

REP原则指的是软件复用的最小粒度应等同于其发布的最小粒度

由于一个组件是代码部署发布的最小单位,因此,一个组件的版本是可以由版本号以及版本信息来记录和追踪的。我们要将同时发布/变更的类放到同一个组件中,当我们的组件发布新版时,使用者就可以通过版本号以及版本信息来确定他们是否需要升级该组件。

比如说,如果我们将几个毫无关联的类或功能放在了同一个组件中,那么可能类A频繁变更发版,但类B却没有任何变化。那么类B的使用者一定会感到不合理,因为发现组件频繁的变更,却没有任何应该自己要关注的信息。

1.2 CCP:共同闭包原则

CCP原则指的是我们要将会同时修改并且会为相同目的而修改的类放在同一个组件中,而将不会同时修改并且不会为了相同目的而修改的类放在不同的组件中。

这个原则算是对上述代码设计原则SRP的一种组件上的延伸。目的也是为了避免变更范围大的情况,从而避免大量变更所导致的风险与问题。

1.3 CRP:共同复用原则

CRP原则指的是不要强迫使用者依赖他们不需要的东西。

CRP原则与之前讲述的ISP原则有相似之处。ISP简单说是我们不应该将用不到的方法放在使用者要使用的接口中。而CRP原则是说,我们不应该将用不到的类放在使用者要使用的组件中。

1.4 组件设计原则的关系

我们思考以下这些情况,CCP原则可能会让一些要同时修改的类放在一个组件中,但是,这些类又不是软件复用的最小粒度,因此,会导致REP原则的缺位。相反,如果我们强执行REP原则,就有可能导致某些本应放在一起的类被拆分开到不同组件,那也就可能会造成多次不必要的分布,也就造成了CCP原则的缺失。

再考虑一下,如果我们严格执行CRP原则,那么,我们就会使组件拆分的尽可能小,也可能会出现CCP原则的缺失。那么,也会造成频繁发布的问题。类似这样原则间互相冲突的情况有很多。

总而言之,这三个原则其实是很难被同时满足的,三个原则其实是存在竞争关系的。

如图:

img

因此,如何权衡这三个原则的关系就非常重要。一般来说,我们在软件开发的初期都是更在意研发效率,而复用可以暂时放一放。所以,一般在软件初期,我们的组件设计会更偏重于CCP原则,减少变更与发布次数从而提升研发效率。到软件开发中后期,我们会逐步将原则由右边转移至左边,也就是会更偏重于REP原则,从而提升组件的复用性,使研发变得更具有长期性。

当然,实际使用情况还是要看项目的具体情况来灵活调整,不用拘泥于某一个原则。

2. 组件耦合的设计原则

上面的组件构成指的是组件内部由什么构成,需要遵循的原则。而组件耦合指的是组件和组件之间关系,这需要遵循的原则如下。

2.1 无依赖环原则

什么是无依赖环原则

要理解无依赖环原则,首先得定义清楚什么是依赖环?

我们都知道,从一个结构中的任意一个节点开始,都不能沿着结构的边最终走回到起始点,那么就可以说这个结构中不存在环。如果这个结构是一个图,那么,这个图就是有向无环图。如果这个结构是组件的依赖关系图,那么就可以说这些组件是不存在依赖环的,也就是说,满足了无依赖环原则。

如下图组件耦合图就满足了无依赖环原则:

img

而下图组件耦合图不满足无依赖环原则:

img

为什么不能出现依赖环

定义清楚后,我们思考一下,为什么组件耦合不可以出现依赖环呢?

我们先看下没有依赖环的情况。假如上图几个组件都是由不同的开发者负责的,在没有依赖环的情况下,如果组件C做了调整修改,并且打上了新的Tag并进行了代码发布。那么,依赖组件C的其他开发者都要考虑是否需要升级该组件,若升级该组件,则其余的开发者都可能要进行相应调整、测试、发布。重新发布完成就到此结束了。

但是,如果是有依赖环的情况,如果组件C做了调整修改,并且还是打上了新的Tag并进行了代码发布。那么,依赖C的组件,包括D和E组件都需要调整、测试、发布。但是,重新发布完成还没有结束,因为D、E做了变更,而C又是依赖于D、E,因此,在D、E重新发布完成后,C又必须进行调整和重新发布。此时就进入了一个尴尬的死循环。要想破除这个循环,就必须要C、D、E三个组件的开发者同时进行联调并发布上线。这就使几个组件变得强依赖了,开发成本就变高了,这样的设计显然是不合理的。

可能会有开发者觉得,如果组件形成了依赖环,那么我们只需要确认有没有对我们所依赖的类或接口进行变更再来判断需不需要进行重新开发和发布不就好了么?这样的想法是不对的。假如是一个大型项目,组件数量繁多,难道每次开发者依赖的组件在变更后,开发者都要去确认我们需不需要变更吗?这样的成本是非常高的,而且出错的可能性也非常高。

如何优化不符合无依赖环原则的组件耦合

现在,我们知道了一个项目中的组件是不应该出现依赖环的,那么假如依赖环是业务导致形成的,是无法避免的呢?或者说,是历史遗留问题呢?这个时候如何去解决依赖环的问题呢?

一般是有两种解决方案:使用接口进行依赖反转以及将共同依赖抽离出一个新的组件。

先说一下依赖反转。其实依赖反转就是根据前面所叙述的DIP原则,将用于实现的接口放在需要被依赖的组件中,让另一个组件来实现这个接口,这样就可以达到让依赖反转的目的。具体来说,我们可以让上图中的组件D、E的依赖进行反转,组件E中做一个用于组件D实现的接口。这样就可以让组件D反而依赖组件E了。

img

另一种解决方案是将共同依赖抽离出一个新的组件。这个很好理解,其实就是将组件D、E互相依赖的类抽离出来,单独做成一个新的组件,如图。

img

上面有一点可能不太好理解。组件E依赖于组件D,因此将组件D中被依赖的类抽离出一个新的组件F,这个很直观。但是为什么改造后的组件D也要依赖于组件F呢?是因为组件D可能会通过组件C间接的依赖于组件E的某个类,这样的话,我们也应该将组件E中被依赖的类抽离到组件F中。当然,如果组件D并没有依赖于组件E的某个类,那么组件D对于组件F的依赖就是没有必要的了。

2.2 稳定依赖原则

什么是稳定依赖原则

所谓稳定依赖原则,指的是,被依赖的组件的稳定性应该高于产生依赖的组件的稳定性。

img

如图所示,组件B的稳定性应该是要高于组件A的稳定性。否则,我们就说该组件耦合不满足稳定依赖原则。

组件的稳定性如何评估

既然我们已经知道了稳定依赖原则指的是被依赖的组件的稳定性应该高于产生依赖的组件的稳定性。那么,重点就在于,我们如何来评估一个组件的稳定性呢?什么样的组件可以被称为更为稳定的组件呢?

组件与组件之间都是有依赖关系的,一个组件会依赖于别的组件,也可能会被别的组件所依赖。我们用于评估组件稳定性的公式是:I(A) = o(A) / (o(A) + i(A)

其中,I(A)表示组件A的稳定性,o(A)表示组件A依赖于别的组件的数量,i(A)表示组件被别的组件所依赖的数量。I(A)越小,则表示组件A越稳定。比如下面这两个组件A、B,分别对应的稳定性值为I(A)=1/3,I(B)=0。

img

上图的组件耦合就是一个满足了稳定依赖原则的耦合。

另外,稳定性有两个极端情况,一种是组件只被依赖,而不依赖于其他组件,如组件B,这是最稳定的组件,这样的组件非常难以变更。还有一种组件是只依赖别的组件,而自身却不被依赖,如组件C、E、F,这种组件是最不稳定的,很容易被变更。

如何优化不符合稳定依赖原则的组件耦合

比如我们现有的组件耦合方式如下图所示:

img

可以看到,组件A的I(A)=1/4,组件B的I(B)=3/4,因此,组件A的稳定性是大于组件B的。根据我们前面说的稳定依赖原则,被依赖的组件的稳定性应该大于产生依赖的组件,所以,上图中组件A、B的耦合方式是不满足稳定依赖原则的。

对于这种情况,我们依然可以用DIP原则进行依赖反转,具体方式前面已经多次叙述,这里不再赘述。

优化后的组件耦合如下图所示:

img

2.3 稳定抽象原则

什么是稳定抽象原则

所谓稳定抽象原则,指的是一个组件的抽象化程度应该与其稳定性保持一致。

换句话说,就是越稳定的组件就应该抽象化程度越高,越不稳定的组件就应该有越多的具体实现。

组件的抽象程度如何评估

我们知道了什么是稳定抽象原则,也已经通过上一节的叙述知道了稳定性的评估方法。接下来就该了解一下如何评估一个组件的抽象程度。

衡量一个组件抽象程度的公式如下:$$
A = Na / Nc
$$

其中,A表示组件的抽象程度。Na表示该组件中抽象类与接口的数量。Nc表示该组件中类的数量。

A的取值范围为[0, 1],当A为0时,则表示组件中没有抽象类与接口,该组件全是具体实现。当A为1时,则表示该组件中全是抽象类与接口,那么该组件就是完全抽象的组件。

稳定抽象原则的实际应用

我们对稳定抽象原则加以思考就会发现,该原则是较为“模糊”的。因为现实中,不可能所有的组件都是完全抽象化且完全稳定,或者完全具体实现且完全不稳定的。现实中的情况是绝大多数组件都是处于中间状态的,那么,怎么样才算是抽象化程度与稳定性保持了一致呢?

(1)最理想的情况

我们先来看下理想情况,如图所示:

img

当一个组件的稳定性指标为0,抽象化程度为1时,即在(0,1)点;或者一个组件的稳定性指标为1,抽象化程度为0时,即在(1,0)点。这两个点上的组件是最理想的情况。也毫无疑问是遵循了稳定抽象原则的了。

(2)最不理想的情况

然后我们看下最不理想的情况,最不理想的情况有两个区域,一个是痛苦区、一个是无用区。怎么理解这两个区域呢?

首先来看痛苦区。

痛苦区指的是一些组件的稳定性非常高,但是抽象程度却很低。这些组件由于稳定性高,被其他组件依赖非常多,抽象程度又很低,因此一旦这样的组件要被更改就会非常痛苦,因为其他依赖他的组件都要重新检查测试并发布。

但反过来看,如果一个组件本身就无需被变更,那么这个组件落在这个区域也就显得没有那么痛苦了。典型的情况就是一些公共工具库如String库,这样的公共库都是基本不会做变更的,因此这样的库就算是落在这个区间也没关系。

再来看下无用区。

无用区指的是一些组件的稳定性非常低,但是抽象程度却很高。为什么说这样的组件是无用的呢?这是因为,这样的组件基本不会被别的组件所依赖,但是它们又都是抽象的,没有具体实现,这样的代码一般是历史原因造成的,放着都是多余的。

(3)现实情况

如果用“小黑点”来表示组件落在坐标中的情况的话,现实情况一般如上图所示。

(4)D指标

看到了现实情况的图示后,我们还要引入一个新的指标D,用于描述“小黑点”到主线的距离(并非垂直距离),用这个指标可以描述一个组件对稳定依赖原则的遵循程度。公式如下:D=|A+I-1|

当D=0时,则表示组件就位于主线上,而当D=1时,则表示组件位于离主线最远的地方。

当然,我们的组件的D指标越小就越好。

另外,我们还可以将系统所有的组件的D指标计算出统计量如平均值以及方差。用以衡量一个系统的设计是否优良。最理想的情况当然是平均值以及方差都趋近于0,我们在做系统设计时,也应该尽可能往这方面考虑。对于单个组件,我们也可以用D指标来衡量该组件针对稳定抽象原则是否能够达标。总之,D指标可以帮助我们量化我们的系统设计。

四、合理的解耦部署

前面我们讲到的都是代码以及组件在开发前的设计原则,遵循这些设计原则,我们可以得到一个更健壮更易于扩展的系统。接下来,我们讲一下在开发后,应该怎样来合理的部署系统。这里不会讲具体的部署步骤,只是叙述一下代码的解耦部署方式。

源码层次解耦部署

源码层次解耦部署其实就是我们常说的单体系统,该系统内通过划分不同的模块以及管理好模块间的依赖关系来进行解耦,使得不同模块可以被独立开发,从而在一个模块进行变更后,可以尽量不影响到其他模块,避免其他模块也要变更发布。

单体系统的所有模块一般都是在同一个进程地址空间中执行,通过函数来进行功能调用。单体系统是理解上较为简单的系统,但并不等于单体系统就是比较低端的系统,由于单体系统易于开发、易于部署以及函数间调用时延低的特点,其实很多场景下更适合用单体系统。

库层次解耦部署

库层次解耦部署其实依然算是一个单体系统,只是该系统可以通过管理好不同代码库之间的依赖关系来进行解耦,使得不同模块可以独立开发以及发布,不同的库可以有不同的版本,我们可以依据实际情况来决定是否进行升级所依赖的库组件。

这种部署方式一般来说也是所有模块都在同一进程地址空间中执行且通过函数进行交互调用。但是也存在一些组件会运行在其他进程中,组件间就会通过进程间通信方式来进行交互调用,比如共享内存以及socket。

服务层次解耦部署

所谓服务层次解耦部署,指的是让不同的组件运行在不同的服务中。组件间只能通过网络包来进行数据交互,现在比较流行的服务间通信的方式是gRPC。按服务层次解耦部署,则其中一个服务的开发部署完全影响不到另外一个服务,这个层次解耦部署的方式是隔离性最强的,常见的SOA和微服务就是这样的部署方式。

部署方式的选择

前面所述的三种部署方式其实没有标准答案,要看具体业务场景来进行选择。现在有许多开发者都喜欢鼓吹SOA和微服务,但这是 不合理的,假如业务场景还没有到必须拆分服务的时候,而去过早的拆分成微服务进行部署,那么很可能会出现一些负面效果,因为服务层次解耦部署有一些难以绕开的问题,比如服务器资源成本更高、增加了不必要的网络时延、开发维护成本高等等。如果不是选用后的优势很大,让人可以忍受这些无法避免的问题,那么就不建议一开始就去拆微服务。

一般来说,在软件开发初始阶段,系统都会选用源码层次解耦部署,随着业务场景以及请求规模的变化,可能会进行进一步的演进(也可能一直维持单体系统),比如把通用的组件拆出来解耦部署,比如将各个模块拆分成独立的服务部署。无论演进的情况如何,有一点很重要,就是要保持系统的整体架构边界清晰,在进行部署方式变更时可以不至于有很高的开发成本,这也就要求在系统设计时就要尽可能遵循上面讲到的设计原则。

五、个人思考

我觉得各种设计原则都算是一些指导性的建议,一般都是经过长期实践、不断总结凝练出来的很难过时的经验,但并不是说每个系统都非得严格按照这些原则来做,况且,有时候有些设计原则在某些场景下相互之间也会有矛盾,也会有需要取舍的时候。我觉得我们在工程实践时,要尽可能的在心里对这些原则有所把握,在可以实践时一定要按照这些设计原则进行实践。但反过来说,如果遵循某个原则会导致很大的成本投入,比如可能因为时间花费较多而引起需求延误。又或者我们做的是一个持续时间不长的小型活动系统(国内很多这样的临时产品),那就需要自己去权衡利弊了。

另外,我觉得需要长期维护的系统是最应该实践设计原则的,绝对是一劳永逸的好事,哪怕是跟产品砍一些功能需求来做都是非常值得的。只要尽力说服,产品和老板是会给一定的时间让开发去做这些优化事项的,毕竟产品和老板也希望自己之后的需求总能够快速被实现,而不是项目越往后做一个新需求或者变更一个旧功能越困难。

本文我总结的可能会有些理论化,建议文后可以找相关书籍进行系统了解,另一方面还可以在实践中去实际感受。当理论知识被应用于实践中,而且还有一点点成效时,那是非常有成就感的。

六、Reference

[1] 《Clean Architecture》 https://github.com/sdcuike/Clean-Code-Collection-Books/blob/master/Clean%20Architecture%20A%20Craftsman‘s%20Guide%20to%20Software%20Structure%20and%20Design.pdf

[2] 《object-oriented software construction》 chrome-extension://ikhdkkncnoglghljlkmcimlnlhkeamad/pdf-viewer/web/viewer.html?file=https%3A%2F%2Fweb.uettaxila.edu.pk%2FCMS%2FAUT2011%2FseSCbs%2Ftutorial%2FObject%2520Oriented%2520Software%2520Construction.pdf

[3] 让里氏替换原则为你效力 https://yuanshenjian.cn/make-lsp-working-for-you/

[4] 极客教程 里氏替换原则 https://geek-docs.com/design-pattern/design-principle/liskov-substitution-principle.html

[5] 通过实例讲解Go的七大设计原则 https://km.woa.com/group/22373/articles/show/482249?kmref=search&from_page=1&no=5

[6] 设计模式概念和七大原则 https://cloud.tencent.com/developer/article/1650116

第一次做新人导师的总结

背景

到目前,我差不多入职腾讯两年多了。最近三个月我做了一次角色上的转变,从以前一直是被别人指导的角色,转换成了需要去带一名新人的导师角色。我带的是一名暑期实习生,在做新人导师的过程中有不少感悟。这篇文章主要是想总结一下在这一过程中,我个人做得比较好与不太好的地方,以及从个人角度谈谈如何做到新人眼中的好导师,另外,我在带新人的过程中,对于向上管理也有了一点新的体会。我觉得鹅厂招进来的新人都十分优秀,我从实习生的身上也学到了一些好的习惯和品质。总之,第一次做新人导师收获不小。

做得比较好的部分

先来谈一谈我觉得在做新人导师过程中,自己做得比较好,以后再做导师时需要坚持的部分:

1 新人入职前主动建立联系

在新人入职前,我先添加了他的微信,与他在微信上做了简单的沟通。主要了解一些他个人的学习和生活情况,包括他人在哪里,什么时候可以来实习,实习多久,对深圳是否熟悉,来了之后怎么租房子等。了解新人情况后,如果有一些需要帮助的,也能尽早了解到。来深圳的实习生有很多都是外地人,尤其还有一些是非常远的北方,来到深圳还要一个人租房子,可能有许多地方都需要适应,如果导师能早点联系并提供建议和帮助的话,可能对于新人来说适应起来会顺利很多。

2 新人入职后及时同步培养计划

新人入职后,我先跟他口头沟通,强调了一些公司内必须注意的事情,比如高压线。然后带他熟悉了工作环境以及组内的同事。在他配置工作设备以及学习一些基本资料的同时,我这边就开始制定他的培养计划。这个培养计划我一开始是考虑在他来之前就做好,在他来之后就立即同步给他。不过后来觉得还是等他来后,先多了解一些他的情况以及意愿后再做这个计划。差不多在他入职第三天我完成并跟他同步了他在实习期间的培养计划。

3 不定期与定期沟通

在新人来后,我及时跟他确立了我们的沟通机制。不定期的沟通就是让他在有事情、有问题时随时可以来找我,如果是紧急的事情,就当面找我或者打电话联系,我都会随时响应。如果是不紧急的事情,就微信或者企业微信联系我,我会定期查看并回复的。还有就是定期的沟通,我们约定每周五下午都必须进行一次单独沟通,这个沟通就是简单聊聊他一周的工作和感受,碰到的问题(工作和生活上的都行),以及回顾下培养计划。我觉得通过这两种沟通方式,我们起码在信息上可以做到对称,而且也可以及时在他需要协助时予以帮助。

可以做得更好的部分

第一次做新人导师,我还是有很多做的不够好,以后需要注意的地方:

1 对预期结果描述不够清晰

我在培养计划中给新人定了两个工作目标和一个研究目标,这个研究目标最后要以文档和PPT分享来作为结果呈现。我觉得自己在对新人的研究目标跟进方面做的还不够好。首先我是知道研究目标对新人来说是有一定难度,需要花一定时间的,但是我在前期也只是每周例行沟通时提醒一下,并没有在前期给出一个具体的阶段性完成时间以及预期的结果,导致这个研究目标最后完成的比较仓促,虽然新人最后在中心做的分享还是不错的,但我觉得如果我前期能够给他更清晰的指引,其实是可以做的更好的。而且,由于我前期工作不够到位,最后新人其实有几天是比较辛苦的,而这辛苦其实是可以通过前期准备充足从而避免掉的。

2 工作细节扣得不够仔细

在新人完成日常工作期间,我觉得自己在某些细节方面扣的不够仔细。比如对他实现方案细节的把控,代码的Review等。我觉得有两方面原因,首先是因为我平时工作也有比较忙碌的时候,自己没有抽出一个专门用来指导他工作细节的时间,另一个原因是自己在工作细节方面本身做的也不够好,所以对这方面也没有特别好的直觉,如果帮他把握细节就会觉得是比较累的一件事,也就去逃避做这些事。这块我觉得自己有待加强。

如何成为新人眼中的好导师

我前面总结了一些自己做的比较好以及不好的部分,我觉得如果能把做的好的地方继续坚持,做的不好的地方多多注意,那应该就可以成为新人眼中的好导师。这里我也对这些思考做一下总结,我觉得如果要成为新人眼中的好导师,至少应该做到下面这些:

1 新人入职前就开始准备。

首先要通过他的简历了解到他的基本情况,除了个人信息,也要了解他的学习和工作经历,所擅长的技能,面试评价等。提前添加联系方式,与新人提前沟通,在了解新人的同时,也可以适当地介绍自己,建议信任。在入职前如果新人有什么问题,也可以及时协助解决。还要主动找新人的直属上级了解一下新人来后的大体工作方向以及定位。

2 新人入职后,互相的第一印象是很重要的。

有很多新人是第一次来深圳或者第一次来工作地这边,非常有必要在新人来的第一天给予清晰的位置指引,新人到达后,可以去楼下或电梯间接一下,因为可能办公楼好找,但工位一般很难找的。如果新人来的第一天就要兜圈子找工作地,想必体验会非常不好。

3 新人刚入职,要多关照下。

需要带新人在办公区,公共区随便转一转,带他熟悉一下工作环境,顺便也和导师相互熟悉一下。我觉得新人来的前期如果导师方便的话,就主动叫他一起吃饭。因为新人刚来初期,一个人都不认识。除了导师,估计其他人都是陌生的,如果导师吃饭的时候也不叫他,估计他也不好意思找导师,刚来就一个人吃饭体验也挺不好的。新人在入职后,肯定是要先配置工作设备,这时候导师可以多关注下,看有没有什么需要帮助的。配置好之后,就可以先给他一些时间让他了解一下公司最基本的东西,比如公司高压线、瑞雪、价值观等,这些都是有既定资料的。需要强调的是高压线部分,这里导师有必要在新人刚来时,就跟新人非常严肃的强调。

4 制定培养计划要及时。

在新人了解公司基本资料的同时,导师就应该开始制定新人的培养计划了,包括学习计划、工作计划等。制定计划的依据是对他工作的定位、他自身的技能以及参考他的意愿。制定的计划一定要依据SMART原则,方便执行达成与复盘。有时候制定的计划也不是一成不变的,在后续的日常工作中,导师也应该多沟通,多观察。在觉得培养计划有必要调整时,就及时沟通与调整,并把变化同步给相关人员。

5 建立沟通机制。

在开始常规的工作之后,建立一个和新人有效的沟通机制是非常重要的。我是将沟通机制分为不定期的沟通和定期的沟通。不定期的沟通就是要让新人有问题时随时可以找到导师,如果是紧急的事情,那么导师就应该立即有回应,如果是不紧急的事情,那么导师可以在固定的时间来进行答复。而定期的沟通就是指定一个具体的周期进行,比如每周要进行一次一对一的单独沟通。如果是特别忙碌的导师,那也最起码要双周跟新人进行一次单独沟通。周期性的沟通可以让新人总结下该周期的学习和工作情况,感悟和收获,碰到的问题。而导师也应该在定期沟通时,给出一些对新人的建议,并且最后要一起回顾一下培养计划,保证每周期的工作是围绕着培养计划开展的。

关于向上管理

在做新人导师期间,由于经常会给新人分配一些任务,也会针对他需要做得更好的地方予以指导。所以,从这个角度来看,我对向上管理也有了一些新的体会,主要包括这几点:

1 事事有回应,件件有着落,凡事有交代。

在这期间,我对工作上的闭环思维有了更多的体会。因为我发现,在我给新人布置任务时,这是很重要的一点。如果一件事布置下去了,那么我是希望能够得到一个反馈的。首先是你已经知道了这件事了,然后我需要知道这件事的后续进展。一件事可以正常完成的情况下,一般是不会有什么问题的,大多数人都会有所反馈。但有时候一件事无法完成,或者无法按照既定计划完成时,这也是需要有所反馈的,因为这种情况是可以讨论和商量的,但重要的是要及时的有所反馈。对于布置任务的人来说,最头疼的事情莫过于布置完任务后,相关人就没音了,还需要自己不断的去问才能得到结果。

2 积极主动

在工作中,积极主动是非常重要的一个品质。所谓积极主动,我觉得具体来说,首先就是在工作中要发挥自己的主观能动性,要有自己的思考在工作里面,除了完成自己手头上的工作外,还可以多一些想法,比如手上的事情有没有什么问题?哪里还可以做得更好?抛开别人给的方案,自己能否提出一个更好的方案?这一点我觉得我带的新人做的就不错,他之前有通过完善告警来主动的发现一些问题并提出解决方式。然后就是主动总结与汇报工作,有时候分配工作的人事情会比较多,如果在被分配了工作后,最好是能够主动的去汇报自己手头上的工作,而不是被动等待。这一点我觉得和“凡事有交代“是呼应的。还有一点就是可以主动的去承担一些边界模糊的工作。最理想的情况当然是每个人的工作边界都完美清晰,大家都做好自己的事就能使整体达到很好的结果。但理想是理想,现实情况是,不可能有完美的工作边界,只要开始工作,就一定会有边界模糊的工作存在,对于这些工作,我觉得有必要积极主动的去承担起来,这也是为了达成一个整体的目标。

3 要事第一

这个就是要在工作中分清楚工作任务的优先级。这里也是需要做向上管理的,原因在于,有很多时候我们和布置任务的人信息是不对称的,所以当我们在面对事情比较多,或者对某件事的重要性不确定时,是需要跟布置任务的人主动去对齐事情的优先级的。以免有时候自己埋头苦做花很多时间做的事情其实并不是最要紧最重要的事。

总结

这是我第一次在鹅厂担任新人导师的角色,因此我觉得这次导师的经历对我来说挺重要的,通过这次经历,我体会到了做一个导师容易,但做好一个导师并不容易。也希望通过这个总结,来使自己可以在以后的导师角色中扮演的更好。而向上管理的体会也是这次经历的意外收获之一。

Go语言内存泄漏问题排查总结

背景

我们使用Go语言开发了一个后台服务,在发布测试环境后,发现内存使用量会随着时间的推移持续增加。因此服务的Pod会隔一段时间重启一次,因此,需要排查一下该问题。此文是对排查过程的记录以及排查后的思考总结。

环境准备

本文假设开发机环境中已经安装了go、pprof、graphviz,并且后台服务中已经集成了pprof。

业务中内存泄漏的现象以及排查思路

内存泄漏的现象

在这里插入图片描述
我们将服务发布到测试环境中之后,可以从内存监控的看板中看到,内存使用量随着时间的推移会一直增加,而且会一直达到内存设置的限制并且重启Pod。这种情况的出现,就是内存出现了泄漏的问题。

排查思路

使用Go语言开发的后台服务,在遇到这种情况时,我们首先应该想到的是可能Goroutine出现了泄漏,也就是说,可能开启了大量Goroutine,但是没有进行回收导致。因为Go语言程序的基本运行单位就是Goroutine,因此大多数内存泄漏都是Goroutine的泄漏。我们按照重点来排查,可以节约时间和精力。

我先在开发机上运行起来服务,然后请求pprof来查看Goroutine的运行情况:

请求http://10.111.55.111:8081/debug/pprof/,(10.111.55.111为我的开发机IP)可以看到:
在这里插入图片描述
然后我们选择查看其中的Goroutine:
在这里插入图片描述
再过一段时间后,我们再次刷新一下,再次查看Goroutine的数量:
在这里插入图片描述
可以发现蓝框标记的Goroutine数量一直随着时间的推移而增加,这就是内存泄漏的Goroutine。如果发现变化比较缓慢,我们也可以进行压力测试后再观察。

按照展示的调用信息,我们定位到Redis线程池实现中的这行代码。可以看到,是这行代码发生阻塞,这行代码是做什么的呢?其实就是Redis线程池的定时回收空闲线程功能,只是我们有大量的空闲线程还没有到时间被回收。于是阻塞在了这里。
在这里插入图片描述
接着往上找调用函数,就可以发现是新建Redis连接池时调用的该函数。
在这里插入图片描述
在这里插入图片描述
最终可以定位到导致内存泄漏的原因:其实就是我们在很多地方新建了Redis连接池,但是设置的关闭空闲连接的时间又不合理,导致在大量请求过来时,就会不断的累计连接数量,于是也就有大量的连接未关闭并一直阻塞在定时器那里。
在这里插入图片描述
后续我们将Redis线程池做成了一个公共引用,只在初始化服务时初始化一些我们需要的连接量,于是该内存泄漏问题得到了解决。
在这里插入图片描述
可以看到,内存使用量不再随着时间的推移而不断增加。而且Goroutine的数量也不再异常增加了。
在这里插入图片描述

内存泄漏的拓展思考

Goroutine泄漏为什么会导致内存泄漏

排查了一个Goroutine泄漏导致的内存泄漏例子后,我们再思考一下,为什么Goroutine的泄漏会导致内存泄漏呢?
首先,我们需要清楚什么是Goroutine泄漏。

Goroutine泄漏是指,我们创建的Goroutine没有在我们预期的时刻关闭,导致Goroutine的数量在服务端一直累积增加,最终影响到服务的性能。

然后,为什么Goroutine的泄漏会导致内存泄漏呢?

有两点原因:

  1. Goroutine本身的堆栈大小是2KB,我们开启一个新的Goroutine,至少会占用2KB的内存大小。当长时间的累积,数量较大时,比如开启了100万个Goroutine,那么至少就会占用2GB的内存。
  2. Goroutine中的变量若指向了堆内存区,那么,当该Goroutine未被销毁,系统会认为该部分内存还不能被垃圾回收,那么就可能会占用大量的堆区内存空间。

Goroutine会发生泄漏的场景总结

1 从channel中读或写,但没有对应的写或读

我们都知道,channel分为两种类型,unbuffered channel和buffered channel,我们先讨论unbuffered channel。

在channel被创建后未被关闭前,我们若从channel中读取数据,但又一直没有数据写入channel中,那么channel就会进入等待状态,对应的Goroutine也就会一直阻塞着了。对应的,当我们往channel中写数据,但又一直没有从channel中读。那么也会出现被阻塞的情况。

以从channel中读,但没有写为例:

func ChannelLeak(w http.ResponseWriter, r *http.Request) {
    ch := make(chan int)

    go func() {
        value := <-ch
        fmt.Println("value is: ", value)
    }()
}

以上程序就会导致Goroutine的泄漏。
至于buffered channel,其实和unbuffered channel情况是类似的,只是buffered channel是读完缓存后,或写完缓存后会导致阻塞,这里就不再赘述了。

2 在使用select时,所有的case都阻塞

可以看一下这个例子:

func add(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x = x + y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func Add() {
    c := make(chan int)
    quit := make(chan int)

    go add(c, quit)

    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }

    // close(quit)
}

我们可以看到,当Add函数for循环了10次之后,add函数就会一直阻塞了,也就出现了Goroutine泄漏。
正确的做法应该是在合适的时间将quit关闭,那么add协程就可以安全退出了。

3 Goroutine进入死循环

由于代码逻辑上的bug,Goroutine进入了死循环,则会导致资源一直无法释放。

如下例:

func loop() {
    for {
        fmt.Println("loop")
    }
}

go loop()

Goroutine泄漏的预防

以上总结了四个比较常见的Goroutine泄漏的场景,我在这次业务中碰到的内存泄漏问题就是由于开启了大量Goroutine,但定时器一直在等待channel数据的到来,导致长时间阻塞导致。就是以上介绍的常见场景的第一种情况。

至于Goroutine泄漏,应该是预防重于解决,预防Goroutine泄漏的方法如下:

  1. 在Goroutine中使用到了channel时,要考虑清楚channel何时可能会阻塞,以及阻塞时的具体情况
  2. 创建了一个Goroutine时,就要考虑清楚Goroutine应该如何结束
  3. 注意代码程序的逻辑,切忌在代码中出现死循环

总结

pprof不仅是一个可以用于做性能优化的工具,也是一个可以用来排查问题的好工具。善用这类工具对于Go开发者来说是非常重要的。另外,在写Go语言代码时,要重视Goroutine泄漏的问题,这种问题不出现则已,如果出现,就很可能会导致线上问题,后果是非常严重的。

业务中使用缓存的实践总结(一)

背景

之前我在团队中一直负责的是数据开发的相关工作。因此,对于后台系统开发并没有很多工作实践。不过近期我也着手做了一些后台开发的相关工作,在实践过程中使用到了缓存,在实践一段时间后,在此文中对我使用缓存的实践做一总结。

实际上,我们的业务场景非常简单,没有很高的并发以及时延要求。因此,在设计缓存时,我们也没有把问题复杂化,方案设计的很简单。不过也并不影响我们进一步的思考缓存的使用。本文主要总结的是在工作实践这一场景下更多的联想思考,包括为什么要使用缓存、我们的缓存方案选择以及使用缓存中可能要注意的问题。一篇文章肯定远远总结不完我们对于缓存的实践,因此后续还可能会继续写文章讨论一这话题。

业务场景

我当前在做的其中一个工作是游戏中心的后台开发。所谓游戏中心,对于外部来说其实就是一个H5页面,该页面个性化的展示了我们IEG的各个游戏,之后规划会将该H5页面嵌入到一些中小媒体的APP中,用于做腾讯游戏的推广和分发。目前我们系统的QPS大约只有500,不过我相信随着后续媒体的不断接入,QPS会持续升高的。

游戏中心的前端页面是这样的:
在这里插入图片描述
该前端页面主要是调用我们后台的两个接口:游戏列表、游戏详情。

后台的这两个接口会去查询到相应的数据并返回给前端做展示。因此,对于前端页面来说,只有对后台数据的读取,没有写入。

后台数据的写入是通过游戏中心的管理端进行的。

整个业务示意如图所示:
在这里插入图片描述

为什么要使用缓存

我们的后台服务在查询数据时使用到了缓存,那么,我们为什么要使用缓存呢?

其中有三点原因:

  1. 减轻数据库的压力
    具体的例子来说,我们使用Redis做缓存。一个Redis实例可以有万级的吞吐量,而一个MySQL实例大概是千级的吞吐量。在业务请求并发量很高的情况下,如果每次查询数据都要到MySQL数据库里去查,那么MySQL会压力非常大。极端情况下,有可能会导致MySQL挂掉。就算我们MySQL可以堆很多资源,从成本角度来说,也不如加上缓存。
  2. 提升用户的体验
    缓存使用的是计算机内存进行存储,而数据库使用的是计算机的磁盘。从读写性能上来说,当然缓存的读写性能远优于数据库。因此,在查询数据的时候,缓存的时延要远低于数据库查询的时延。对于用户来说,就可以有更好的使用体验。
  3. 提高系统并发量
    如前面所述,缓存的并发量远大于数据库的并发量。使用缓存可以有效提高系统的请求并发量。

实现方案的选择

缓存设计模式主要有三种:Cache Aside、Read/Write Through和Write Behind Caching。

这里就不一一详细说明了,简单来说的话,这三者就是更新数据库的时机有所不同:

Cache Aside:在更新缓存的同时更新数据库。
Read/Write Through:更新缓存后,由缓存来更新数据库。
Write Behind Caching:更新缓存后,缓存定时异步更新数据库。

我们实际的方案设计中有Cache Aside的设计思想,但是也不完全一样。

方案的思考

方案一

第一种方案的操作示意如图所示:
在这里插入图片描述
外部会先从Redis中读取数据,如果读取到了数据则直接返回。若没有读取到数据,则会去MySQL中读取相应数据,并同时将MySQL中读取到的数据更新至Redis中。若MySQL中也读不到该数据,则报错返回。

在MySQL数据发生更新时,会同时将数据更新到Redis中,并设置Redis数据的过期时间。

外部读取数据时的流程如下图所示:
在这里插入图片描述
该方案的优缺点如下

优点:

  1. 数据的更新非常及时,运营人员操作更新数据后,外部获取到的数据也会立即更新。
  2. 一般情况下不会出现数据不一致的情况。

缺点:

  1. 若发生缓存雪崩或缓存击穿的情况,那么就有导致MySQL出问题的风险。
  2. 相较于本地Cache,读取Redis的数据还是需要一定的网络IO。

方案二

第二种方案的操作示意如下图所示:
在这里插入图片描述
这里使用了定时更新的方式,运营人员通过管理端往MySQL中写数据这块不用赘述。

MySQL中的数据会每隔3分钟往Redis中刷新数据。而外部在请求数据时,首先会访问本地的Cache,如果本地Cache中没有该条数据,则会去Redis中进行查询,并同时会将该条数据更新至本地Cache(本地Cache会设置过期时间)。而若Redis中也查询不到该条数据,那么我们就会返回报错信息,不会再到MySQL中进行查询。

外部读取数据时的流程如下图所示:
在这里插入图片描述
该方案的优缺点如下

优点:

  1. 实现很简单,不需要访问数据库,只需要写一些定时任务执行。
  2. 不用访问数据库,可以完全避免缓存雪崩与缓存击穿所对数据库会带来的风险。
  3. 使用本地Cache,会比访问Redis获取数据更快一些。

缺点:

  1. 数据的更新延迟很大,完全不适用对数据及时性要求高的系统。
  2. 使用本地Cache,若有多节点对外提供服务,很可能会有数据不一致的问题。

方案三

第三种方案的操作示意如下图所示:
在这里插入图片描述
方案三在方案一的基础之上增加了本地Cache缓存。

该方案在外部读取数据时的流程如下图所示:
在这里插入图片描述
该方案的优缺点如下

优点:

  1. 使用本地Cache,会比访问Redis获取数据更快一些。
  2. 每次有更新都会非常及时的同步到外部访问的数据。

缺点:

  1. 若发生缓存雪崩或缓存击穿的情况,那么就有导致MySQL出问题的风险。
  2. 使用本地Cache,若有多节点对外提供服务,很可能会有数据不一致的问题。

缓存实现方案的对比

方案数据一致性请求时延数据更新时机方案风险
方案一没有不一致的风险Redis的IO时延立即更新缓存雪崩/击穿时,会导致MySQL风险
方案二有不一致的风险内存读取时延不会立即更新缓存雪崩/击穿时,不会导致MySQL风险
方案三有不一致的风险内存读取时延立即更新缓存雪崩/击穿时,会导致MySQL风险

业务场景下的缓存实现方案选择

通过上面的缓存实现方案对比,基本上我们可以明确三种不同的方案的优劣势。

最终我们的业务选择了方案二来进行具体的实现,原因是:

  1. 我们当前业务有几个模块共用了同一个数据库,方案二可以完全保证数据库的稳定。
  2. 当前业务对请求时延有要求,但对数据一致性没有什么要求。
  3. 当前业务场景下,对数据的更新时机没有高要求,就算数据变更后隔段时间再更新至用户侧,也不会对业务有影响。

当然,之后如果业务的要求有调整,我们后台这边的缓存策略也会做相应的调整的。

使用缓存可能会带来的问题的思考

我们的业务中引入了缓存机制,是为了有效的利用缓存的一些优势,比如减轻数据库的压力、提升用户的体验、提高系统并发量。

但是,缓存的引入也可能会带来一些新的问题。虽然我们当前的业务场景较为简单,可能不会碰到缓存带来的问题,但是也不影响在该场景下的一些延伸思考。接下来,就从几个方面来思考一下缓存可能会带来的问题。

数据不一致

首先,只要引入了缓存机制,那么就不可避免的会要考虑到数据一致性的问题。我们先在这里明确一下数据一致性的定义:

  1. 缓存中存在数据时,缓存中的数据需要与数据库中的数据一致。
  2. 缓存中不存在数据时,数据库中的数据需要是最新的数据。

如果以上两点有其中一点不满足,那么我们就认为数据是不一致的了。因此,我们在使用缓存的过程中要思考的问题就是如何去保证数据的一致性。

缓存可以分为两种类型:读写缓存以及只读缓存。接下来我们就以上述方案一为例分别对这两种类型缓存进行数据一致性的分析:

读写缓存

读写缓存不仅可以被读取数据,还可以在缓存中进行增删改操作。因此,使用读写缓存时,一般会有两种数据写回数据库的方式。

一种是外部只操作缓存,然后每隔一段时间将缓存数据写回到数据库一次。这种方式叫做异步写回策略。一般适用于非核心的业务数据。因为如果在缓存数据写回到数据库之前就挂了,那么这些数据的变更记录也就永久丢失了。

另一种是外部在更新缓存的同时,也会同步的更新数据库。这种方式叫做同步直写策略。对于核心的业务数据我们就应该用这种策略。

对于该策略来说,要保证数据的一致性,那么就必须保证缓存和数据库更新的原子性,也就是说,如果要更新成功就都成功,如果更新失败就都回到更新前的状态。只有这样才能保证数据的一致。在这样的前提下,我们具体分别通过增、删、改操作来进行描述。

增:无论是先更新缓存还是先更新数据库,都不会影响到数据的最终一致性。

删:若先更新缓存,后更新数据库。假设操作A更新缓存完成后,且更新数据库前,这是如果有操作B对缓存该数据进行查询,那么就可能会出现将数据库的数据重新刷回缓存的情况,之后A再更新数据库,就会导致最终的数据不一致。若先更新数据库,后更新缓存。那么就不会影响到数据的最终一致性。

改:无论是先更新缓存还是先更新数据库,都不会影响到数据的最终一致性。

可以看到,无论我们是进行增、删、改的哪个操作,最有利的更新策略就是先更新数据库后更新缓存,这样就不会造成数据不一致的问题。

只读缓存

只读缓存与读写缓存不同,对于读操作和删操作来说是一样的逻辑,但是对于增操作和改操作则不同。

此处读操作和删操作不再赘述,只看一下只读缓存的增操作和改操作。

增:对于只读缓存,在做增操作时,只需要对数据库进行操作,也就是只需要在数据库中增加新的数据即可,在下次外部访问缓存时会自动将缓存更新。

改:在进行改操作时,我们有两步需要做,一个是将缓存中对应的数据设置为失效,一个是更改数据库中的数据。若我们先更新缓存,再更新数据库,则可能会碰到数据不一致的问题,原因这里不再赘述。若我们先更新数据库,再更新缓存,则可以保证数据的一致。只不过是数据更改的生效会稍稍慢一些。

对于只读缓存来说,我们也是要保证缓存和数据库操作的原子性。同时,最优的策略依然是先更新数据库后更新缓存。

整理这些情况,如表所示:
在这里插入图片描述
上表看着复杂,实际上整体来看,无论是读写缓存还是只读缓存,我们首先要保证的是缓存和数据库操作的原子性,否则就可能出现数据不一致的情况。同时,我们在进行数据操作时,尽可能考虑优先操作数据库后操作缓存,这样会极大程度的保证数据的一致性。

缓存雪崩

所谓缓存雪崩,指的是缓存在一段时间内无法对外提供服务,这样导致的结果就是会突然有大量的外部请求直接访问到后台数据库。而我们都知道,后台数据库的吞吐能力一般来说都没有缓存大,如果本来应该是缓存的请求,全部都打到数据库上,则很有可能会出现数据库卡顿的情况,甚至可能会引起数据库宕机。

如图所示:
在这里插入图片描述
一般来说,会有两种情况会导致缓存雪崩:

第一种,就是缓存服务真的宕机了。这种情况,我们一般有三种手段来进行处理。

  1. 服务熔断
    所谓服务熔断,是一种比较暴力的解决方式,也就是在缓存服务宕机时,让客户端的请求不再打到缓存服务上,而是直接返回我们预设的默认值,或者干脆直接返回报错信息。以避免因为缓存服务的宕机而导致的后续连锁反应。这样,当缓存服务恢复正常时,我们也可以快速的将服务恢复到正常状态。
    如图所示:
    在这里插入图片描述
  2. 服务限流
    上面讨论的服务熔断的处理方式或许太过粗暴,会直接导致用户的请求都暂时受到影响。我们还可以用另一种服务限流的方式来进行缓存宕机后的处理。
    所谓服务限流,指的是由前端服务限制每秒请求的个数,将每秒请求的个数限制在数据库所能承受的数量内,那么就算是每个请求都直接访问数据库也不会导致数据库出问题了。
    在这里插入图片描述
  3. 服务降级
    还有一种比较合理的处理方式是服务降级。
    所谓服务降级,指的是在缓存服务宕机的情况下,只对核心接口请求进行处理,而将非核心接口的请求直接返回。这种处理方式更能够考虑到业务的需求。

前面就考虑完了在缓存服务宕机所引发的缓存雪崩的情况下该如何去处理。

另一种会引起缓存雪崩的场景,是大量的缓存同时过期。这时,虽然缓存服务没有问题,但是由于大量的请求都查询不到对应的缓存数据,也会一下子有大量的请求打到数据库上的。像这种情况所引起的雪崩是可以通过合理的设置缓存过期时间来避免的。

具体来说,就是我们在设置缓存过期时间时,不要将缓存过期时间设置为一个固定值,而应该将缓存时间设置为一个固定值加一个上下浮动的随机数。这样就能够比较好的避免因缓存同时过期而导致的缓存雪崩问题。

对于我们的系统而言,Redis缓存是存放在腾讯云上的,基本上可以放心使用不用担心雪崩的问题。但给缓存设置一个合理的过期时间还是非常重要的。

缓存击穿

缓存击穿有别于缓存雪崩,指的是由于某个热点数据的过期,而导致在同一时间有大量请求打到数据库上进行查询的情况。

如图:
在这里插入图片描述
对于这种情况的处理比较简单,就是我们要判别哪些数据在缓存中属于热点数据,这些数据很可能是每次请求都会去访问的。这种数据就不要设置过期时间了,让这些数据长期在缓存中,只有在数据库更新的同时再主动去更新这些缓存数据即可。

对于我们的系统来说,比如首页的游戏列表,这样的数据我们都是长期在缓存中保存的,因为这些数据只有可能被更新,而不会被淘汰。

缓存穿透

缓存穿透指的是外部请求的数据既不在缓存中,也不在数据库中。这种情况更为严重,因为会使缓存和数据库同时承受很大的压力。
在这里插入图片描述
可能会导致缓存穿透的原因有两个:

  1. 外部黑客的恶意攻击
  2. 对应数据被误删除

针对缓存穿透的情况,这里可以考虑三点:

  1. 接口做严格的校验
    这是预防的策略,防患于未然,外部之所以可以对系统进行恶意攻击,访问到不存在的数据,是因为前置的接口权限校验以及参数校验做的不够严谨导致。
  2. 给数据设置缺省值
    在数据被误删除的情况下,一个紧急的处理办法就是在缓存中写入对应数据的缺省值。在这种情况下起码可以对数据库做到保护作用。
  3. 使用布隆过滤器前置值存在与否的查询
    这种是比较优雅的预防以及解决缓存穿透的方案。我们在数据被写入到数据库之前先在布隆过滤器做一个标记,在缓存中查询不到数据时,我们就可以先查询布隆过滤器,若未查到标记的数据,那么我们也就可以直接给请求做返回处理了,从而保护了数据库免于被大量无效请求访问。

当前我们的系统隔断了对数据库的访问,因此也就不会出现缓存穿透的问题。不过,严格的接口校验也是必不可少的。另外,如果我们后续可以在Redis中做一个布隆过滤器的话,就可以用方案三来替代掉方案二了。

总结以上缓存问题以及对应的解决方案如下表:
在这里插入图片描述

总结

本文从近期业务使用的缓存场景出发,首先简单介绍了我们的业务场景,然后描述了我们为什么要使用缓存,接着对方案的选择过程做了记录,最后分析了我们之后使用缓存过程中可能会遇到的问题以及对应的解决方案。一篇文章肯定远远总结不完我们对于缓存的实践,因此后续还可能会继续写文章讨论一这话题。

使用Prometheus和Grafana做服务指标监控可视化(实践篇)

1 背景

最近团队开发了许多新的服务,这些服务都需要做指标的可视化监控,于是考虑使用Prometheus来实现,Prometheus是一个用于事件监控和告警的开源软件。本文是Prometheus的实践篇,主要总结了Prometheus的实践操作。主要考虑要先将它用起来,接下来会再总结一篇相关的原理文章。

2 安装及启动Prometheus

点击这里从官网直接下载Linux版的安装包,上传至Linux服务器上之后解压:
tar xvfz prometheus-2.25.0.linux-amd64.tar.gz
并且进入目录
cd prometheus-2.25.0.linux-amd64
在这里插入图片描述
我们可以查看一下Prometheus的版本信息:
在这里插入图片描述
将Prometheus运行起来:
./prometheus --config.file=prometheus.yml
在这里插入图片描述
运行Prometheus时需要指定配置文件:prometheus.yml
我们看一下这个文件的内容:
在这里插入图片描述
后面我们会用到global和scrape_configs,我们主要看一下:
global:
scrape_interval 表示 Prometheus 多久从指标服务端口抓取一次数据
evaluation_interval 表示多久检测一次告警规则,并进行相应告警
scrape_configs:
job_name表示我们要监控的主体名称
statis_configs的targets表示我们要监听的服务的IP及端口,也就是说,服务会从该端口向Prometheus提供指标数据

3 安装及使用Grafana

3.1 安装Grafana

虽然Prometheus自带的有可视化系统,但是相较于它,Grafana作为可视化监控系统会功能更强大,界面更美观。所以,我们将Prometheus作为数据采集器,将Grafana作为可视化监控系统来做数据指标的可视化。
我们使用Docker安装方式对Grafana进行安装:
docker run -d -p 3000:3000 grafana/grafana
运行该命令,Grafana就安装好了。如果服务器的3000端口被占用,可以对冒号前的”3000″进行修改,以使用其他端口进行服务。如:
docker run -d -p xxx:3000 grafana/grafana
比如我这里就是在三天前在3001启动了该服务
在这里插入图片描述

3.2 使用Grafana

安装完成之后,我们可以访问http://localhost:3000/进入到Grafana的登录界面,用户名和密码默认为admin、admin。
在这里插入图片描述
然后我们就可以选择我们希望被可视化的数据源了。这里,我们被可视化的数据源为Prometheus:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这里我们重点要填3个地方:
在这里插入图片描述
其中Access表示Grafana访问数据源的方式。如果可以直接通过浏览器访问,那么则选择Browser;否则,如果是需要通过Grafana的服务端来访问数据源的URL,则选择Server。

3.3 查看Prometheus的性能数据来进行Grafana测试

Prometheus的数据源配置好了之后,我们就可以配置dashboard来查看数据源的监控数据了。
Prometheus自带的有自身的监控服务,我们可以直接使用:
在这里插入图片描述
导入Prometheus的dashboard之后,我们可以在这里查看一下已导入的dashboard:
在这里插入图片描述
Prometheus的监控可视化图表:
在这里插入图片描述

4 使用Prometheus和Grafana做服务器监控可视化

我们已经了解到,Grafana只是一个用于从数据源获取数据并做可视化的系统,而Prometheus是Grafana的数据源之一。
Prometheus是一个监控系统,它的数据指标也需要从一个服务中获取。而真正用于采集数据指标,并将数据指标通过端口暴露给Prometheus采集的服务叫做Exporter。
这个Exporter可以自己写,用于定制自己的数据采集。也可以直接使用Prometheus官方提供的Exporter。官方提供的Exporter都是开箱即用的。我们先来试用一下,用官方的服务器指标监控Exporter以及MySQL指标监控Exporter。看看官方的Exporter都是如何即刻使用的,后续再进一步介绍如何定制自己的数据采集服务。
Prometheus的整体架构图如下:
architecture.png
(图片来自Prometheus官网)

4.1 官方Exporter使用

4.1.1 服务器指标监控

4.1.1.1 Exporter运行

服务器指标监控Exporter可以从我们的Linux服务器中采集到一些关键指标用于可视化
接下来,我们就从官网下载并运行该Exporter
在Linux中依次执行以下命令:

wget https://github.com/prometheus/node_exporter/releases/download/v1.1.1/node_exporter-1.1.1.linux-amd64.tar.gz

tar xvfz node_exporter-1.1.1.linux-amd64.tar.gz

cd node_exporter-1.1.1.linux-amd64

./node_exporter

在这里插入图片描述
在这里插入图片描述
执行之后,不出意外,我们可以看到上图所示服务执行起来了
在这里插入图片描述

该Exporter暴露9100端口、metrics路径给Prometheus,用于数据指标采集。
我们可以访问下/metrics路径,看看能否正常获取到服务器指标:
在这里插入图片描述
可以看到,我们已经可以正常获取到服务器的指标了。
接下来,我们就应该考虑如何将Exporter的端口添加进Prometheus的配置了。这样,Prometheus才能从该Exporter中采集到指标数据。

4.1.1.2 Exporter指标采集

前面说过,Prometheus的配置文件是其目录下的prometheus.yml文件。
我们将服务端口添加进scrape_configs配置中即可:
在这里插入图片描述

修改配置完成后,我们对Prometheus进行重启,或者发送HUP信号(killall HUP prometheus),让Prometheus重新加载配置:
在这里插入图片描述
这样,Prometheus就可以从node_exporter这个官方Exporter中采集到该服务器的监控指标了。

4.1.1.3 配置Grafana面板从而可视化指标

我们已经通过Prometheus从Exporter中获取到了指标数据,接下来,我们就应该在Grafana中配置一下面板,从而将采集到的服务器指标数据做可视化。
在这里插入图片描述

import用于导入一个新的可视化面板
在这里插入图片描述
可以看到,我们可以从Grafana官网导入一个现成的面板。
进入Grafana的dashboard页面
在这里插入图片描述
我们可以直接使用上图所示的Node Exporter Full面板用于可视化服务器的指标数据。也可以在上图所示的搜索框中搜索自己想用的面板。
这里我们使用Node Exporter Full面板,点击进入,可以看到页面:
在这里插入图片描述
可以看到,这个面板的数据源就是Prometheus,且这个dashboard的ID为1860。我们将这个ID填入Grafana的import的输入框中。
在这里插入图片描述
在这里插入图片描述
数据源选择Prometheus。可以看到可视化页面:
在这里插入图片描述
该界面展示的就是我的服务器的指标监控数据了。

4.1.2 MySQL指标监控

收集MySQL的指标数据进行可视化的流程与上述的服务器指标监控类似,这里就简单描述一下。
首先,还是要先下载并运行MySQL的Exporter。

wget https://github.com/prometheus/mysqld_exporter/releases/download/v0.12.1/mysqld_exporter-0.12.1.linux-amd64.tar.gz

tar xvfz mysqld_exporter-0.12.1.linux-amd64.tar.gz

cd mysqld_exporter-0.12.1.linux-amd64

与服务器指标数据监控的不同之处在于,MySQL指标监控的Exporter在运行之前必须要配置一下服务器连接MySQL的方式。这里,我们通过配置环境变量DATA_SOURCE_NAME的方式进行MySQL连接。格式如下:
export DATA_SOURCE_NAME='root:rootmysql@(localhost:3306)/'
配置完环境变量后,即可启动MySQL的Exporter:
在这里插入图片描述
然后,我们像Node_Exporter一样,在Prometheus的配置文件中配置一下MySQL的Exporter的IP以及端口:
在这里插入图片描述
更新Prometheus配置后,我们可以在Prometheus的Dashboard官网中找一个MySQL的面板导入,可以看到最终的可视化界面如下:
在这里插入图片描述

4.2 使用go-kit实现自己的Exporter服务

4.2.1 定义指标和标签

假设我们现在做了一个服务,该服务会接收外部的请求,我们需要知道一段时间内某个客户端(client_id)请求某个接口的次数。这是一个非常常见的统计需求。
go-kit的包为:"github.com/go-kit/kit/metrics/prometheus",这里我们重命名为kitprometheus
primetheus的go工具包为:"github.com/prometheus/client_golang/prometheus",这里我们重命名为stdprometheus
我们使用go-kit创建一个指标和标签的代码如下:

appidGetFieldKeys := []string{"method", "client_id"}
    appidGetRequestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
        Namespace: "namespace_name",
        Subsystem: "subsystem_name",
        Name:      "metrics_name",
        Help:      "Number of client_id get received.",
    },appidGetFieldKeys )

这样,我们就创建了一个指标为:namespace_name_subsystem_name_metrics_name,标签为[method, client_id]的数据。
当然,Prometheus不仅提供了用于统计次数的Counter,还有一些其他的统计类型。详见Prometheus官网文档

4.2.2 统计数据

我们定义好了指标以及标签后,就可以进行数据统计了。
比如,我们收到了一次client_id为”123″的请求,那么,我们就应该在调用真正的处理函数之前先进行一次数据的统计。

func  MethodName(c *gin.Context) {
    client_id := c.Query("client_id")

    defer func(begin time.Time) {
        lvs = []string{"method", "AccessToken", "client_id", client_id}
        appidGetRequestCount .With(lvs...).Add(1)
    }(time.Now())

    AccessToken(c)

    return
}

这样,我们就做好了client_id的一次接口请求统计,并且对应的方法为AccessToken。如果这次传入的client_id参数为123,那么,我们就完成了一次方法为AccessToken,client_id为123的一次请求统计。
最后,我们通过某个服务端口的/metrics路径对Prometheus提供我们的指标数据,
Prometheus访问/metrics路径即可获取到我们的统计数据了。

"github.com/prometheus/client_golang/prometheus/promhttp"

r := gin.Default()
r.GET("/metrics", gin.WrapH(promhttp.Handler()))

这里,我们也可以通过直接访问该路由查看所有的指标数据:
在这里插入图片描述
Prometheus也提供了一套查询语言PromQL方便进行各个数据的查询。
比如:
我们将/metrics的端口添加到Prometheus的配置文件中后(前面描述过,这里不再赘述),在Grafana添加一个面板
在这里插入图片描述
使用PromQL查询出我们希望看到的可视化界面
在这里插入图片描述
实际上,前面所描述的官方Exporter所对应的可视化图表,就是各种PromQL查询的结果,只是官方做了一系列的封装罢了,所以看起来比较漂亮。我们也可以详细研究下Grafana面板的使用方法,从而用我们采集到的各种指标做出更多更直观的图表。

5 总结

本文先介绍了Prometheus以及Grafana的功能与使用方法。然后通过官方Exporter来介绍该如何用Prometheus实现一个最简单的监控界面。最后简单的介绍了如何通过编写业务代码的方式来定制自己服务的Exporter,从而实现用Prometheus和Grafana来做服务指标的监控可视化。本文是入门实践篇,以后会再总结一篇原理篇。

技术人员做数据质量治理实践总结

1 导语

本人是腾讯游戏市场平台部的一名开发人员,目前主要负责O2广告投放系统的开发以及数据质量治理工作。O2是市场平台部用于做游戏广告投放以及相关效果数据回收展示的系统。该系统不仅在功能上支持广告的高效精准投放,也同时是一个较为庞大的数据系统,每天O2都会有大量的数据流入、计算、以及可视化。
当一个数据系统越来越复杂,参与方越来越多,其需要管理的数据量越来越庞大时,数据治理尤其是针对数据质量的治理就变得越来越重要且紧迫了。
本篇文章主要是对我过去一段时间针对O2所做的数据质量治理工作做一总结与分享,希望能够帮助到同样在做数据质量治理工作的同学。

2 什么是数据质量治理

要明确什么是数据质量治理,首先我们需要了解一下什么是数据治理。
数据治理的定义是,在管理数据资产过程中行使严格的管控(包括计划、实施和监控),以确保组织可以从数据中获取最大的价值。数据治理包括了为各种应用场景设计数据模型、安全的存储与访问数据、合理的共享数据、以及保障数据满足用户的需求等。
其实,这一切的前提都是数据本身必须是可靠且可信的,换句话说,数据应该是高质量的。而数据质量治理,就是利用数据治理的手段,不断提高系统数据的可靠与可信性的过程。

3 数据质量治理的必要性

提高数据的质量本身并不是目的,而是我们团队获取最大化利益的一种手段。值得信赖的数据不仅可以提升决策人员的决策效率以及成果,也可以降低发生风险的概率。当使用者使用可靠的数据时,他们可以更快、更一致地回答问题,做出决策。如果数据是高质量的,他们也能花更少的时间发现问题,而将更多的时间用于使用数据来获得洞察力、做决策、服务用户。因此,对于一个数据系统来说,数据质量治理是非常必要且紧迫的工作。

4 实践经验总结思维导图

在这里插入图片描述

我在实践中,对数据质量治理做了一个总结,整理出了如下思维导图。不过,数据质量治理不是一个项目,而是一个持续性工作,因此,以后肯定也会不断对该总结做修改与完善。
文章的后续部分会对该导图中的内容一一做分析讲解。

5 技术人员如何做数据质量治理

5.1 了解数据系统的使用者

在这里插入图片描述


我觉得在做数据质量治理工作的时候,多去了解数据系统的使用者需求是非常重要的。尤其是对于技术人员来说,因为技术人员可能和产品的使用者中间隔了一个产品经理,有些技术同学可能就不太想去了解太多,只是产品经理提了什么就做什么。或者只被动的从产品经理口中了解了一部分用户的需求,但这些还不够。
我们应该主动的去了解更多的背景细节。当我们了解清楚了是谁在用我们的系统,他们是什么角色,他们分别高频或低频查看哪些报表,他们对哪些报表的数据质量要求更高等等细节,我们才可以更高效的划分自己手头上繁多工作的优先级,按照优先级高低稳扎稳打的推进工作。

5.2 数据质量改进生命周期

在这里插入图片描述

在开展工作时,确定下来一个固定的工作流程是非常重要的事,可以帮助提升工作效率以及降低可能犯错的概率。
对于数据质量治理工作类似,也应该确定一个数据质量改进的生命周期,我们可以按照这个生命周期去不断迭代的提升自身数据系统的数据质量。
我在实践过程中,总结出的数据质量改进生命周期如图:

(1) 发现问题
发现问题的方式主要有两种,包括通过监控或告警提前发现目前存在或潜在的数据质量问题,以及外部使用者在使用过程中反馈过来的数据质量问题。
当然,我们希望尽可能在外部使用者发现问题之前,通过系统监控告警或者人工核验的方式提前发现并解决问题。但是,要做到外部使用者完全没有任何数据问题的反馈也是不大可能(其中还包括口径以及外部人员信息不对称的问题),因此,我们在收到外部反馈的数据问题时也不必过于惊慌,重要的是先做好记录,方便接下来的问题排查、修复,以及方便以后的历史工作回溯。
(2)定位本质原因
发现问题之后,首先要做的并不是立刻把问题解决掉(除非情况紧急),而是在问题仍存在时,精准的定位出导出该问题的本质原因。因为,有时候着急处理掉问题,会使定位导致发生该问题的本质原因变得极难排查。而我们只有精确的定位出本质原因,才能有效避免以后再次发生类似的数据问题。
(3)修正或补录数据
在排查出了导致该问题的本质原因后,我们就要立刻修正缺失或有误的数据了。这里值得注意的是,我们应该去建立一个快速的数据补录重跑机制,避免在这种重复性工作上花费过多时间与精力。
(4)系统化解决问题
我们将数据修正之后,或许近期使用者暂时就不会发现什么类似的问题了。但是,如果导致数据质量问题的本质原因没有被定位并解决掉,那么势必此类数据质量问题还会出现。因此,我们在数据修正之后,接下来要做的就是尽快解决掉之前导致数据质量问题的根源性问题。而且,要以系统化的方式去解决,避免解决了一个问题,又导致出了另外一些问题。或者仅解决一个眼下问题,而对未来潜在风险视若无睹。
(5)检查确认
这一步也是非常必要的。我们在解决完问题之后,一定要对数据准确性进行核验,并且确认系统还有没有此类漏洞,确认系统下次不会再次发生此类数据问题。

5.3 数据质量治理的思路方法

在这里插入图片描述

数据质量治理可以从三方面去考虑并推进工作。
首先是数据问题的预防,创建高质量数据的最佳方法是防止低质量数据进入组织。然后就是数据问题的提前发现,因为我们都知道,当数据系统较大且复杂时,数据问题不可避免的会出现。但我们要尽可能在外部使用者反馈出问题之前,将问题发现并解决掉,这样外部对于数据系统才会有信任感。最后,就是数据问题的解决,我们应该建立一个快速解决问题的机制,将数据问题按照优先级,快速精准的解决掉。参考如下思维导图:

5.3.1 数据问题的预防

(1)建立数据输入控制
这一点非常重要,我们只有对流入系统的数据严格把控,才能从源头上避免出数据问题。如果在源头都没有把控好,那么数据后续在系统中的各种流转计算都是白费功夫。
首先是数据源数据的准确性保证,我们O2主要有三种数据源的输入方式:从媒体端拉取数据、媒体端给我们上报数据、人工在系统上录入数据。
因此,针对这三种录入数据的方式,我们都要进行把控。
(2)适当的冗余计算
我们其实可以用计算资源换数据质量,在进行核心数据计算时,不妨适当的增加冗余计算。这里的冗余计算指的是,比如数据源拉取任务如果是非常核心的数据任务,且我们担心它会有意外导致失败,我们可以多增加一次计算,虽然这种方式略显笨拙,但可以更加确保数据不会有缺失。
(3)培训数据录入者
对于数据录入者,我们也要进行规范化培训。因为系统不是万能的,有许多人为的操作,系统很难以纠正。因此对数据录入部分的录入者也要有规范性上的培训。

5.3.2 数据问题的提前发现

(1)脚本任务增加主动告警
我们可以给核心任务增加主动告警,在脚本执行失败或未执行完成的情况下能够第一时间发现并处理。
(2)每日检查监控告警可视化界面
对于监控告警的可视化检查,是因业务不同而异的,我们这里有许多每日的离线任务,这种任务一般都是在凌晨执行。那么,我每天早上在上班时,确认一眼昨日任务的执行情况就行。提前发现并处理问题要比问题出现后解决花的时间少得多。
(3)数据剖析并作邮件告警
我们可以制定一些符合自身业务特征的规则,对数据结果进行自动的判断剖析,来确认数据是否是高质量的。如果发现了不满足规则的情况,那么就及时做告警推送给相关负责人注意。
比如,在我们O2这里,有些字段值是不能为空的(某一天某个游戏完全没有消耗费用),或者某个游戏广告的消耗金额很大,却没有任何注册新进,这种就是明显的数据不合理。
关于数据剖析的角度还有很多,我们需要根据自身业务情况来进行规则制定,详细的可参考思维导图中罗列的一些。
(4)人工定期对核心数据进行对账
对于非常重要的核心数据,我们有必要专人进行定期的数据核验,人工总是能够发现一些系统难以发现的问题。

5.3.3 数据问题的解决

(1)数据问题的优先级划分
优先级是很重要的事情,意味着技术人员在协助定位解决问题时,要选择哪一个优先处理,哪一些可以暂且放一放。当然这不仅仅是指需要技术人员协助解决的问题较多时。
就算数据问题较少甚至只有一两个,优先级的划分依然很重要,因为技术人员手上可能会有很多重要的事情在做,比如需求开发、代码重构、完善监控等。
如果数据问题的优先级划分清楚,也能够帮助技术人员更宏观更合理的安排时间。
(2)数据问题的根因快速定位
定位数据问题的原因,我的实践经验是,首先能交给使用者定位是最好的。因为有许多数据问题不一定是真正的数据问题,如果所有使用者一碰到难以理解的问题就来找技术人员协助定位,那技术人员会花费过多时间在问题定位上的,反倒没有时间去做其他重要不紧急的事情。导致数据问题会越堆积越多的。因此,可以给使用者做一些自助排查的系统,去协助他们找到问题原因,如果是真正需要技术人员帮忙解决的,那么再找到技术人员协助解决。
另外,就是可以将我们数据流中间结果的数据可视化出来,便于在最终结果报表缺失或有误的情况下,能够快速定位出是数据流中哪一个环节出了问题。
最后,其实如果技术人员对整个系统较为了解,且排查问题的经验较丰富,实际上是可以依据直觉快速定位出问题原因的。因此,作为技术人员,多熟悉数据系统,定位问题时多思考多总结也是很重要的。
(3)数据的快速重算或补录
发现数据问题后,如何快速的修正数据也是需要思考的问题。建议可以针对每一个环节都设计一个快速补录数据的方式。使得可以提升重跑数据的效率。
比如我们O2系统,会设置一些补录任务,分别针对数据源需要重拉取、中间结果需要重新计算以及结果报表需要重新展示等。

5.4 数据质量问题的常见原因

在这里插入图片描述

在做数据质量治理的工作中,记录并总结自身系统中常见的数据质量问题,并且思考系统性的解决方案是非常重要的一件事。
我在实践中对O2数据质量问题常见的原因做了一些总结。主要有四点:
数据输入过程引起的问题、数据处理功能引起的问题、系统设计引起的问题、解决问题引起的问题。
参考如下思维导图:

5.4.1 数据输入过程引起的问题

由于O2作为数据系统,有许多接口需要人工去参与数据输入,包括配置账户、配置媒体信息、录入媒体的消耗费用信息等。因此,这一部分也是O2系统最容易导致数据问题的地方。
(1)数据输入接口不合理
对于技术人员来说,要想协助减少数据录入的问题,最重要的就是假设人为操作充满了各种失误。因此,可以从两方面去考虑协助优化。
第一,降低使用者参与系统的程度,能让系统自动做的事情就不要让人工去参与,机器出现失误的概率是要远低于人为操作的。
第二,对于必须要人工参与的部分,对使用者输入的各种数据进行全面的校验就显得尤其重要。这里可能需要注意,业务规则一般会变化较频繁,要定期review校验规则,注意这些规则是否合理以及是否全面。
(2)显示条目放置不合理
对于需要使用者注意的信息条目,一定要前置或者标注以提醒。比如某些使用者必须配置的地方,或者使用者在使用过程中易失误的地方。
有时候仅仅是显示条目放置的更合理些,就会减少很多不必要的操作失误发生。
(3)人工操作失误问题(系统无法协助校验的)
有些人工操作失误的问题,是系统也无法帮忙校验的。这种情况,我们技术人员可以做的事情就是用告警邮件的方式,定时通知到相关负责人,告知他们哪些地方可能是有问题的。推动相关业务负责人去检查并修正问题。
比如O2系统中,会每天发送告警邮件,将各种配置异常的情况告知给相关人员。
另外,就是需要有专门针对数据系统使用者的培训,虽然无法完全避免,但也要尽可能减少人工操作的失误。
(4)业务流程的变更
业务流程随着时间的推移而变化,在变化过程中引入了新的业务规则和数据质量要求。但是,这些业务规则的更改并不一定总能被及时或全面的纳入系统。如果接口未升级以适应新的或变化的需求,将导致数据质量问题发生。
因此,业务规则的更改应当同步给整个系统的负责人或系统各个可能影响到的模块的负责人,否则数据系统很可能会受到影响。
(5)业务流程执行混乱
通过混乱的流程创建的数据很可能会不一致。混乱的流程很可能是由培训、文档不完善或需求的随意变化导致的。这些方面也应当受到重视。

5.4.2 数据处理功能引起的问题

(1)有关数据源的错误假设
在我实践过程中发现,我们有时会假定数据源是没有任务问题的,但这种想法是不正确的。有时第三方数据源的问题一样会导致自身数据系统出现数据质量问题。
首先第三方来的数据源本身就有问题,这个我们就无法干涉处理了,只能够在数据源修复问题后重跑自身数据。
还有一种情况是数据源对接的接口文档不完整或过时,这时可能也会引起自身数据系统出现数据问题。因此,就算是对接后的第三方数据源,我们也应当持续定期review其接口文档,或者在接到接口变更的信息时,及时对自身数据系统做适配调整。
另外,有些第三方数据源会有快速的数据接口(在线实时任务),以及较慢速的数据接口(离线任务),可能相同的接口在不同的时间点会有数据修正的发生,因此,也要注意自身数据获取任务的执行时间点,避免数据修正后却没有重跑任务更新最新的数据。
(2)过时的业务规则(变更了业务规则,没有及时同步到数据开发者)
有时候由于沟通的问题,会出现业务规则没有提前或及时同步给技术人员,导致出现一些数据问题。这就要求业务与开发同学需要有有效的信息同步渠道。
(3)变更的数据结构
一般情况下,较复杂的数据系统的数据链路都比较长,如果上游数据有如字段名称、字段类型这样的变化但没有及时同步给下游数据负责人,那么也一定会出现数据问题。

5.4.3 系统设计引起的问题

从系统设计角度来说,如果系统设计的不合理或不完善,那么也非常容易导致数据出现问题。
(1)未执行参照完整性
如果没有强制的执行参照完整性,或者关闭了验证参照完整性,那么很可能会导致出现数据问题。如产生破坏唯一性约束的重复数据或者由于丢失的数据被分配为默认值而导致的数据质量问题等。
(2)未执行数据的唯一性约束/唯一性约束有误
底层库表的唯一键约束需要考虑清楚,唯一键设置不合理,很容易导致数据出现翻倍的情况。
(3)数据模型不准确
如果数据模型内的假设没有实际数据的支持,则容易出现数据质量问题。在设计库表时,要考虑好以后的兼容性,避免出现字段类型设置不合理导致的数据问题。如实际数据超出字段长度导致数据丢失。
(4)时间数据不匹配
在没有统一规范的情况下,多个系统可能会采用不同的日期或时间格式,当不同源系统之间数据同步或计算时,可能会导致数据不匹配和数据丢失。这里可以推广一下,不仅仅是时间格式,其他应当规范的字段格式也一定要做约束与规范。
(5)字段值定义不清晰
有些字段值会包含特定的含义,若不制定严格的规范约束,不同技术人员可能会使用相同的值来表示不同的含义,这样就会导致数据不匹配或丢失。
比如,O2中有一个常用的字段platid,表示设备的ID值。若不强制约束,可能有的人用0表示iOS平台,有的人用2表示iOS平台,那么在后续的汇总计算时,就可能会出现严重的数据问题。
(6)数据复制
不必要的数据复制通常时数据管理不当造成的,一般情况下,不必要的数据复制都会带来害处。有害的数据复制通常有两种情况:
一种是单源但有多个本地实例,比如有些配置表,在数据库系统中存储有多份,不清楚的人很难知道该用哪一个实例,而这些实例不一定就是完全同步的,会可能会造成数据质量问题。
另一种是多源但仅一个本地实例,比如外部数据源给了多个数据源,这些数据源可能是不同层级的数据,相互间又有关系,如果我们这里只存一个实例,需要认真判断该用哪个源。

5.4.4 解决问题引起的问题

在解决数据问题时,通常为了快速便捷而采用直接在数据库修改或脚本手动执行的方式,而且由于修复问题都比较仓促,非常容易引起额外的数据问题。比如数据更改错误、数据更改后未通知到下游数据等,经常为解决这些问题会花费更多的时间。
因此,非常不建议在解决问题时仓促着急。在解决数据问题时,也有以下几点需要注意:
(1)直接更改数据库数据一定要手动开启及提交事务,在遇到觉得更改的行数异常的情况下,方便直接回滚至原状态。
(2)尽可能保留原始数据,不要直接采用覆盖的方式写数据。
(3)重要数据要先备份。
(4)注意到使用该数据的下游数据,将更改信息及时做同步。

5.5 数据质量治理工作的复盘与同步

在这里插入图片描述


与数据治理和整体的数据管理一样,数据质量治理并不是一个项目,而是一项持续性工作。它包括了项目和维护工作,以及沟通和培训工作。最重要的是,数据质量改进取得长期成功取决于组织的质量观念的建立或改变。
简单来说,数据质量取决于所有与数据交互的人,而不仅仅是数据治理的专业人员。因此,在数据质量治理工作中,尤其是治理的前期,复盘与同步是一件很重要的事。可以定期做一次复盘以及数据质量治理情况的同步,让相关人都可以及时同步到数据治理情况以及逐渐建立高数据质量的观念。

6 结语

就像前面说的,数据质量治理不仅仅是一项技术活,更多的是一项持续性的维护工作。因此,我们要调用一切方法,而不仅仅局限于技术方法去解决数据质量问题。另外,只要有复杂的数据系统,也就一定会有数据质量的问题存在。所以,对于数据质量的追求应该是持续不断的。
本人接触数据质量治理还不到一年的时间,而且,我们项目目前的数据系统复杂性还远不如一些公司级的产品。因此,对于数据质量治理工作的了解还只是皮毛,总结复盘的内容也不够系统完善,接下来会继续推进该项工作,也会继续总结数据质量治理的经验与教训。
也恳切希望有经验的前辈同行对于不正确或不完善的地方予以指点批评,感恩。

7 参考资料

《数据产品设计》
《DAMA数据管理知识体系指南》

etcd后端存储源码解析——底层读写操作

背景

最近想找一些用Go语言实现的优秀开源项目学习一下,etcd作为一个被广泛应用的高可用、强一致性服务发现存储仓库,非常值得分析学习。
本篇文章主要是对etcd的后台存储源码做一解析,希望可以从中学到一些东西。

etcd大版本区别

目前etcd常用的是v2和v3两个大版本。两个版本不同之处主要在于:

  1. v2版本仅在内存中对数据进行了存储,没有做持久化存储。而v3版本做了持久化存储,且还使用了缓存机制加快查询速度。
  2. v2版本和v3版本对外提供的接口做了一些改变。在命令行界面中,可以使用环境变量ETCDCTL_API来设置对外接口。

我们在这里主要是介绍v3版本的后台存储部分实现。 并且这里仅涉及到底层的读写操作接口,并不涉及到更上层的读写步骤(键值的revision版本选择等)。

etcd的后端存储接口

分析思路:

  1. 查看etcd封装的后端存储接口
  2. 查看etcd实现了后端存储接口的结构体
  3. 查看上述结构体的初始化方法
  4. 查看上述结构体的初始化值
  5. 查看上述结构体初始化方法的具体初始化过程

首先,我们先来看下etcd封装的后端存储接口:
路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/backend.go

type Backend interface {
    // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
    ReadTx() ReadTx
    BatchTx() BatchTx
    // ConcurrentReadTx returns a non-blocking read transaction.
    ConcurrentReadTx() ReadTx

    Snapshot() Snapshot
    Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
    // Size returns the current size of the backend physically allocated.
    // The backend can hold DB space that is not utilized at the moment,
    // since it can conduct pre-allocation or spare unused space for recycling.
    // Use SizeInUse() instead for the actual DB size.
    Size() int64
    // SizeInUse returns the current size of the backend logically in use.
    // Since the backend can manage free space in a non-byte unit such as
    // number of pages, the returned value can be not exactly accurate in bytes.
    SizeInUse() int64
    // OpenReadTxN returns the number of currently open read transactions in the backend.
    OpenReadTxN() int64
    Defrag() error
    ForceCommit()
    Close() error
}

Backend接口封装了etcd后端所提供的接口,最主要的是:
ReadTx(),提供只读事务的接口,以及BatchTx(),提供读写事务的接口。
Backend作为后端封装好的接口,而backend结构体则实现了Backend接口。
路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/backend.go

type backend struct {
    // size and commits are used with atomic operations so they must be
    // 64-bit aligned, otherwise 32-bit tests will crash

    // size is the number of bytes allocated in the backend
    // size字段用于存储给后端分配的字节大小
    size int64
    // sizeInUse is the number of bytes actually used in the backend
    // sizeInUse字段是后端实际上使用的内存大小
    sizeInUse int64
    // commits counts number of commits since start
    // commits字段用于记录启动以来提交的次数
    commits int64
    // openReadTxN is the number of currently open read transactions in the backend
    // openReadTxN存储目前读取事务的开启次数
    openReadTxN int64

    // mu是互斥锁
    mu sync.RWMutex
    // db表示一个boltDB实例,此处可以看到,Etcd默认使用Bolt数据库作为底层存储数据库
    db *bolt.DB

    // 用于读写操作
    batchInterval time.Duration
    batchLimit    int
    batchTx       *batchTxBuffered

    // 该结构体用于只读操作,Tx表示transaction
    readTx *readTx

    stopc chan struct{}
    donec chan struct{}

    // 日志信息
    lg *zap.Logger
}

通过19行 db *bolt.DB 我们可以看到,etcd的底层存储数据库为BoltDB。
好了,接下来我们就看一下这个backend结构体是如何初始化的。
还是在该路径下,我们可以看到New函数

// 创建一个新的backend实例
func New(bcfg BackendConfig) Backend {
    return newBackend(bcfg)
}

该函数传入了参数bcfg,类型为BackendConfig,这是后端存储的配置信息。
我们先看下这个配置信息中包含了什么
依然在该路径下,找到BackendConfig结构体

type BackendConfig struct {
    // Path is the file path to the backend file.
    Path string
    // BatchInterval is the maximum time before flushing the BatchTx.
    // BatchInterval表示提交事务的最长间隔时间
    BatchInterval time.Duration
    // BatchLimit is the maximum puts before flushing the BatchTx.
    BatchLimit int
    // BackendFreelistType is the backend boltdb's freelist type.
    BackendFreelistType bolt.FreelistType
    // MmapSize is the number of bytes to mmap for the backend.
    // MmapSize表示分配的内存大小
    MmapSize uint64
    // Logger logs backend-side operations.
    Logger *zap.Logger
    // UnsafeNoFsync disables all uses of fsync.
    UnsafeNoFsync bool `json:"unsafe-no-fsync"`
}

可以看到,有许多backend初始化所需要的信息都在这个结构体中。
既然有这些配置信息,那么一定会有相应的默认配置信息,
我们来看下在默认情况下etcd存储部分会被赋怎样的值。
依然在该目录下,找到DefaultBackendConfig函数。

func DefaultBackendConfig() BackendConfig {
    return BackendConfig{
    BatchInterval: defaultBatchInterval,
    BatchLimit: defaultBatchLimit,
    MmapSize: initialMmapSize,
    }
}

随便查看其中某个全局变量的值,比如defaultBatchInterval,则可以看到默认值:

var (
    defaultBatchLimit = 10000
    defaultBatchInterval = 100 * time.Millisecond
    defragLimit = 10000
    // initialMmapSize is the initial size of the mmapped region. Setting this larger than
    // the potential max db size can prevent writer from blocking reader.
    // This only works for linux.
    initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
    // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
    minSnapshotWarningTimeout = 30 * time.Second
)

defaultBatchInterval变量为例,就是说默认情况下,etcd会100秒做一次自动的事务提交。
etcd后端存储默认赋值的部分说完了,就说回对结构体的初始化上。
我们继续看函数New,它调用了函数newBackend
我们看下函数newBackend做了些什么

func newBackend(bcfg BackendConfig) *backend {
    if bcfg.Logger == nil {
        bcfg.Logger = zap.NewNop()
    }

    // 一些配置载入
    bopts := &bolt.Options{}
    if boltOpenOptions != nil {
        *bopts = *boltOpenOptions
    }
    bopts.InitialMmapSize = bcfg.mmapSize()
    bopts.FreelistType = bcfg.BackendFreelistType
    bopts.NoSync = bcfg.UnsafeNoFsync
    bopts.NoGrowSync = bcfg.UnsafeNoFsync

    // 初始化Bolt数据库
    db, err := bolt.Open(bcfg.Path, 0600, bopts)
    if err != nil {    
        bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
    }

    // In future, may want to make buffering optional for low-concurrency systems
    // or dynamically swap between buffered/non-buffered depending on workload.
    // 对backend结构体做初始化,包括了readTx只读事务以及batchTx读写事务
    b := &backend{
        db: db,

        batchInterval: bcfg.BatchInterval,
        batchLimit: bcfg.BatchLimit,

        readTx: &readTx{
            baseReadTx: baseReadTx{
                buf: txReadBuffer{
                    txBuffer: txBuffer{make(map[string]*bucketBuffer)},
                },
                buckets: make(map[string]*bolt.Bucket),
                txWg: new(sync.WaitGroup),
                txMu: new(sync.RWMutex),
            },
        },

        stopc: make(chan struct{}),
        donec: make(chan struct{}),

        lg: bcfg.Logger,
    }
    b.batchTx = newBatchTxBuffered(b)
    // 开启一个新的etcd后端存储连接
    go b.run()
    return b
}

我们可以看到6-19行在初始化boltDB的同时载入了一些数据库的配置信息。
23-41行是对backend结构体做了初始化,包括了只读事务readTx、读写事务batchTx结构体的初始化,以及初始化了两个通道stopc、donec,这个后面会用到。
43行开启了一个协程去并发的处理run()函数内的工作。
我们继续看一下run()函数做了什么。依然在该目录下

func (b *backend) run() {
    // 关闭结构体的donec通道
    defer close(b.donec)
    // 开启一个定时器
    t := time.NewTimer(b.batchInterval)
    // 最后要关闭定时器
    defer t.Stop()
    for {
        select {
            // 当定时器到时间了,则t.C会有值
            case <-t.C:
            case <-b.stopc:
                b.batchTx.CommitAndStop()
                return
            }
        // 定时器到时间了,且数据的偏移量非0,即有数据的情况下,则会进行一次事务的自动提交
        if b.batchTx.safePending() != 0 {
            b.batchTx.Commit()
        }
        // 重新设置定时器的时间
        t.Reset(b.batchInterval)    
    }
}

我在代码中注释的比较详细了,简单的说,就是在初始化backend结构体时,开启了一个协程用于事务的自动提交,事务自动提交的时间间隔为batchInterval,这个默认值为100秒。
注意12-14行,这段代码表示,如果是停止信号进来的话,则事务会立即提交并且停止。
到这里,backend结构体就初始化完成了,接下来我们看一下用于读操作的只读事务接口ReadTx

etcd后端存储的读操作

路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/read_tx.go

type ReadTx interface {
    Lock()
    Unlock()
    RLock()
    RUnlock()

    UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
    UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}

该接口是用结构体baseReadTx实现的,来看一下baseReadTx结构体,文件路径与ReadTx接口一样

type baseReadTx struct {
    // buf与buckets都是用于增加读效率的缓存
    // mu用于保护txReadBuffer缓存的操作
    mu  sync.RWMutex
    buf txReadBuffer

    // txMu用于保护buckets缓存和tx的操作
    txMu    *sync.RWMutex
    tx      *bolt.Tx
    buckets map[string]*bolt.Bucket
    // txWg可以防止tx在批处理间隔结束时回滚,直到使用该tx完成所有读取为止
    txWg *sync.WaitGroup
}

只读事务ReadTx的读取数据的接口有两个,分别是UnsafeRange以及UnsafeForEach。我们以UnsafeRange接口为例进行代码分析。
UnsafeRange接口的实现依然在上述路径中

// 该方法用于底层数据的只读操作
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    // 不使用范围查询
    if endKey == nil {
        // forbid duplicates for single keys
        limit = 1
    }
    // 当范围值异常时,则传入最大范围
    if limit <= 0 {
        limit = math.MaxInt64
    }
    if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
        panic("do not use unsafeRange on non-keys bucket")
    }
    // 将buf缓存中的数据读取出来
    keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
    // 如果取出的数据满足了需求,那么则直接返回数据
    if int64(len(keys)) == limit {
        return keys, vals
    }

    // find/cache bucket
    // 从bucket缓存中查询bucket实例,查询到了则返回缓存中的实例,查询不到,则在BoltDB中查找
    bn := string(bucketName)
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    // 缓存中取不到bucket的话,会从bolt中查找,并写入缓存中
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketName)
        baseReadTx.buckets[bn] = bucket
    }

    // ignore missing bucket since may have been created in this batch
    if bucket == nil {
        if lockHeld {
            baseReadTx.txMu.Unlock()
        }
        return keys, vals
    }
    if !lockHeld {
        baseReadTx.txMu.Lock()
        lockHeld = true
    }
    c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    // 从bolt的该bucket中查找键值对
    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
    return append(k2, keys...), append(v2, vals...)
}

4-14行是一些前置的判断步骤,16-20则是从buf缓存中读取数据,前面提到过,buf是etcd用于提高读取效率的缓存。
我们看下具体的从buf读取数据的过程。
Range函数在路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/tx_buffer.go

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    if b := txr.buckets[string(bucketName)]; b != nil {
        return b.Range(key, endKey, limit)
    }
    return nil, nil
}

可以看到,该方法就是实例化了名为bucketName的桶,然后从该桶中按照范围读取键值数据。
我们可以看到,bucket的实例为结构体bucketBuffer

type bucketBuffer struct {
    buf []kv
    // used字段记录了正在使用的元素个数,这样buf无需重新分配内存就可以覆盖写入
    used int
}

看回到Range方法代码的第3行,我们来看一下b.Range方法的代码。b.Range与buf.Range方法不同,b.Range是结构体bucketBuffer实现的方法。
依然与Range方法相同路径

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
    // 查找到key在buf中的索引idx
    f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
    // sort.Search用于从某个切片中查找某个值的索引
    idx := sort.Search(bb.used, f)
    if idx < 0 {
        return nil, nil
    }
    // 只查找一个key值,而非范围查找
    if len(endKey) == 0 {
        if bytes.Equal(key, bb.buf[idx].key) {
            keys = append(keys, bb.buf[idx].key)
            vals = append(vals, bb.buf[idx].val)
        }
        return keys, vals
    }
    // 根据字节的值来比较字节切片的大小
    // 如果endKey比key值小,则返回nil
    if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
        return nil, nil
    }
    // 在个数限制limit内,且小于endKey的所有键值对都取出来
    for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
        if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
            break
        }
        keys = append(keys, bb.buf[i].key)
        vals = append(vals, bb.buf[i].val)
    }
    return keys, vals
}

3-5行代码表示要从buf结构体中找到第一个满足包含key的索引值。该两行代码一般结合使用,是一种常见的用于查找值对应索引值的方式。

10-16行代码表示,如果endKey为0,即不使用范围查找,只查找key这一个精确值,那么就需要判断3-5代码找到的值是否与该key完全相等,只有完全相等了才会返回keys与vals。

19-21行代码表示,如果输入的endKey比key值还要小,那么就认为是输入的问题,则返回nil值。

最后19-30行代码表示,key与endKey都输入正常的情况下,则将limit内,大于等于key且小于endKey的键值对都取出来,并返回keys、vals结果。

到此,从buf缓存中就可以读取所需要的数据了,那么,我们回过头接着看UnsafeRange方法的实现,该方法在前面有提到。
我再次把代码贴在这里:

// 该方法用于底层数据的只读操作
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    // 不使用范围查询
    if endKey == nil {
        // forbid duplicates for single keys
        limit = 1
    }
    // 当范围值异常时,则传入最大范围
    if limit <= 0 {
        limit = math.MaxInt64
    }
    if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
        panic("do not use unsafeRange on non-keys bucket")
    }
    // 将buf缓存中的数据读取出来
    keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
    // 如果取出的数据满足了需求,那么则直接返回数据
    if int64(len(keys)) == limit {
        return keys, vals
    }

    // find/cache bucket
    // 从bucket缓存中查询bucket实例,查询到了则返回缓存中的实例,查询不到,则在BoltDB中查找
    bn := string(bucketName)
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    // 缓存中取不到bucket的话,会从bolt中查找,并写入缓存中
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketName)
        baseReadTx.buckets[bn] = bucket
    }

    // ignore missing bucket since may have been created in this batch
    if bucket == nil {
        if lockHeld {
            baseReadTx.txMu.Unlock()
        }
        return keys, vals
    }
    if !lockHeld {
        baseReadTx.txMu.Lock()
        lockHeld = true
    }
    c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    // 从bolt的该bucket中查找键值对
    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
    return append(k2, keys...), append(v2, vals...)
}

看第16-20行,刚才我们已经分析了16行代码的具体实现。

18-19行则表示,如果返回的key的数量与limit相等,则就直接返回缓存中的数据即可。如果不是相等的,一般取出的key的数量小于limit值,也就是说,缓存中的数据不完全满足我们的查询需求,那么则需要继续向下执行,到etcd的底层数据库bolt中查询数据。

注意24-43行,首先etcd会从baseReadTx结构体的buckets缓存中查询查询bucket实例,如果缓存中查询不到该实例,则会从bolt数据库中查询并且将实例写入到缓存中。而如果bolt中也查询不到该bucket,则会直接返回之前从buf中查询到的keys与vals值。

如果从缓存或者bolt中查询到了bucket实例,那么,后续就可以直接从bolt中查询该bucket下的键值对了。

我们看一下52行的具体实现。

func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
    if limit <= 0 {
        limit = math.MaxInt64
    }
    var isMatch func(b []byte) bool
    // 如果有终止的key,则将找到的key与终止的key比较,是否key小于endkey
    // 否则,将找到的key与自身比较是否相等
    if len(endKey) > 0 {
        isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
    } else {
        isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
        limit = 1
    }

    // 循环查找key所对应的值, 然后与endkey做对比(如果有endkey的话)
    // 直到不满足所需条件
    for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
        vs = append(vs, cv)
        keys = append(keys, ck)
        if limit == int64(len(keys)) {
            break
        }
    }
    return keys, vs
}

先看一下传入unsafeRange方法的参数。

c为cursor实例,key为我们要查询的初始key值,endkey为我们要查询的终止key值。而limit是我们查询的范围值,由于我们之前用缓存已经查询出来了一些数据,因此,该范围其实是我们的总范围减去已经查询到的key值数。

其中,5-13行代码用到了匿名函数,用于判断查询到的key值是否依然满足需求。如果我们给到了endkey,那么就会对查到的key与endkey做比较。如果我们没有给endkey,那么就会直接判断查询到的key值是否等于我们要查询的key。

17-23行则为用于DB查询实现代码。

etcd后端存储的写操作

文章开头部分,我们讲到过etcd后端存储对外的接口Backend,其中包括了两个重要的接口:ReadTx以及BatchTx,ReadTx接口负责只读操作,这个我们在前面已经讲到了。
接下来,我们看一下etcd后端存储的读写接口BatchTx。
路径:https://github.com/etcd-io/etcd/blob/master/mvcc/backend/batch_tx.go

type BatchTx interface {
    ReadTx
    UnsafeCreateBucket(name []byte)
    UnsafePut(bucketName []byte, key []byte, value []byte)
    UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
    UnsafeDelete(bucketName []byte, key []byte)
    // Commit commits a previous tx and begins a new writable one.
    Commit()
    // CommitAndStop commits the previous tx and does not create a new one.
    CommitAndStop()
}

我们可以看到,BatchTx接口也包含了前面讲到的ReadTx接口,以及其他用于写操作的方法。batchTx结构体实现了BatchTx接口。

type batchTx struct {
    sync.Mutex
    tx      *bolt.Tx
    backend *backend

    // 数据的偏移量
    pending int
}

具体接口中方法的实现我们就不一一看了,因为都是直接调用了bolt数据库的接口,比较简单。

总结

本篇文章主要从源码角度分析了etcd后端存储的底层读写操作的具体实现。无论我们是使用命令行操作etcd,还是调用etcd的对外接口。最终在对键值对进行读写操作时,底层都会涉及到今天分析的这两个接口:ReadTx以及BatchTx。

然而,etcd的键值对读写其实还会涉及到许多其他的知识,比如revision的概念。接下来还会有文章继续对这些知识做解析。