Log-structured Protocols in Delos
☁️

Log-structured Protocols in Delos

Tags
分布式系统
分布式论文
SOSP
Abstract
作者ti一套Log-Structured协议,该协议基于共享日志(shared-log)对复制状态机做进一步的抽象,使其能够以分层的方式构建一套可以复用的协议栈,从而可以在不同的数据库实现中直接复用。
Property
💡
发表于SOSP 2021,该系统在OSDI2020年发表了其Shared Log层的抽象,并获得了当年的Best Paper. 论文作者近几年的研究都是集中在如何对分布式系统做更好的抽象,从而使分布式系统更加容易构建、修改和操作。作者主页: https://maheshba.bitbucket.io/
notion image

Intro

开发者在大规模系统的开发中往往能够接触多种不同的存储API,如关系型数据库、KV数据库、文件系统等等,这些API背后通常都是一个复杂的分布式存储系统,每一个系统都需要耗费大量的精力去开发和维护。这篇文章的作者则希望这些不同API的开发都能够基于同样一套代码库(codebase),从而降低开发和维护的成本,所以作者提出了一套Log-Structured协议,该协议基于共享日志(shared-log)对复制状态机做进一步的抽象,使其能够以分层的方式构建一套可以复用的协议栈,从而可以在不同的数据库实现中直接复用。

Background and Motivation

Facebook在2017年的时候线上已经部署了很多种不同的分布式数据库,比如MySQL、ZippyDB、ZooKeeper、LogDevice等,每一种数据库的开发和维护同学都需要充分理解每一种数据库的实现才能进行开发和运维,而且往往不同数据库的运维工具/经验是不能复用到其他数据库中。每一种数据库本质上可以分为两部分看待:上面一层包含特定API语义的单节点状态机(比如一个单机的MySQL服务)和下面一层通过共识协议去复制单机的状态。
学术界和业界已经有很多工作对下面一层进行抽象,然后基于共享日志(Shared Log)可以实现一个数据库,并且这个共享日志可以复用在不同的系统里。但是上面一层每个数据库都需要实现自己复杂的复制状态机逻辑,而作者观察到基于共享日志实现的复制状态机都有一些共性的功能,比如日志回放、Log Trimming、Batching、group commit、Lease-based local read、PITR(Point-in-Time Recovery)等等,所以本文对基于共享日志的复制状态机的实现进一步抽象,提出了一个Log-Structured协议,如图1所示,每个节点都包含一些系列的Engine,然后不同节点通过共享日志进行写作,然后对外则暴露特定数据库API。
notion image

Shared Log

本文实际上是Facebook Delos系统的一部分抽象,去年的OSDI会议作者介绍了Delos shared log抽象的实现,并且获得了当年的最佳论文。这里简单介绍下Delos Virtual Log的实现一遍大家有个基础的共识。Virtual Log的提出是因为Facebook在实践过程中经常会遇到,系统使用某个共识协议实现了一套存储系统,然后过了几年发现又有先的协议出现,于是他们想要对系统升级,使用新的协议去替换当前的协议从而获得更好的性能之类的收益,但是他们发现替换一个共识协议基本上相当于重新实现了一套存储系统,而且还不能保证系统的正确性,风险太大。
分布式系统里Log是一个非常重要的技术/组件,如MySQL的redo,RocksDB的WAL、Raft的WAL等,Log is the database基本上已经成为了共识。所以作者基于此思想提出了Virtual consesus的概念,因此共识的细节,提出Virutal Log的抽象,Virtual Log提供append/checkTail/readNext接口,使用方只用假设该log里的每一个Entry都已经复制并持久化在不同的节点上,不用关心背后使用那种共识协议实现的。每一批连续的Log entires称为Loglets,对应一种共识协议或者说使用某种共识协议实现的Log存储系统,然后保存在一个MetaStore服务里,在进行共识协议替换的时候,只需要修改MetaStore切换存储的位置即可。
notion image
notion image

Log-Structured Protocol

Log-Structed协议是一种基于共享日志的复制状态机的实现,使用方可以基于该协议在不同的节点间一致地复制其应用状态
该协议提供了一组SMR(State Machine Replication) API(见图2),使用方可以通过该API与协议引擎进行交互。使用IEngine API,应用可以通过propose接口提议一个Entry到共享日志;registerUpcall注册一个upcall可以从共享日志接收新的Entry;sync接口确保所有的在共享日志中的Entry都已经通过upcall通知给了应用。应用则可以将其本地的状态保存到持久化的存储系统中,如RocksDB,论文里将其称为LocalStore。LocalStore可以认为共享日志的确定性状态,通过共享日志可以重建一个确定性的应用状态,共享日志的重放则只能有upcall的apply进行。
notion image
Log-Structured协议本身是一个stackable的复制状态机。如图3所示,每一个Engine都像是下面一层Engine的应用,上面一层的Engine会调用下面一层Engine的propose/sync,然后下面一层Engine则会调用上面一层的apply。每一层Engine都会实现图2中IEngine API,并且实现过程中则会调用下一层的API。同时,每一层的Engine都可以直接访问LocalStore从而持久化其需要的状态。当一个Entry被propose到一个Engine(可能是从用户方,也可能是由上面一层Engine)的时候,该Engine会在Entry里加上自己的Header,然后继续向下面一层Engine发起Propose。同样地,当下面一层Engine调用其apply的时候,其会从Entry里解析出来自己的Header并且修改LocatStore状态,然后再去调用上面一层Engine的apply。
图3中,栈顶是真实的应用,其会暴露特定的API给终端用户(如Table API,或者Zookeeper API)。最下面一个Engine,我们称之为BaseEngine,是专门实现的一个Eengine用于和共享日志进行交互。下面作者以一个例子介绍如何从一个BaseEngine+Application一步步将其他Engine加入进来。
notion image

The Application

以图3为例,是一个单节点的服务,暴露一个名为foo()的api,并且这个api的依赖本地的状态。为了使用Log-structured协议,讲该服务分成两部分:Wrapper和Applicator。Wrapper负责暴露foo的语义,序列化请求并且调用栈顶engine的propose接口;Applicator则负责实现IApplicator的API,真正实现foo的功能。如图上箭头所示,Wrapper调用栈顶Engine的propose方法后,每层engine都会调用下面一层Engine的propose方法,直到BaseEngine,BaseEngine调用共享日志的接口将Entry持久化到共享日志后,会调用其上面Engine的apply方法,这里需要注意的是apply方法的调用全部是在一个单线程内执行,称之为Apply Thread。类似propose,一层层调用apply直到Applicator的apply方法,该方法是用户进行的实现,会真正执行foo的功能,可能会从LocalStore持久化或者从中读取数据,然后将执行结果一层层返回,最终返回给调用了propose而等待结果的Wrapper。
整个流程里还有两个需要注意的地方在图中并没有展示:
  1. Applicator执行的proposals可能来自不同的节点,在propose本节点propose的时候,BaseEngine会将当前节点没有apply的log entry,即当前提议的Entry前面已经提交的Entry也顺序Apply了,这些Entry可能来自不同的节点propose的,如果entry来自其他节点则response直接丢弃即可。换句话说,当前实现并没有leader的角色,每个节点都可以接收请求发起propose,并且可以并发propose,最终有shared log对日志进行排序,最终通过单线程的Apply到状态机,共享日志的序保证了每个节点状态机的确定性
  1. Applicator对LocalStore的任何访问都在一个事务内进行,确保其原子性。BaseEngine在调用上层Engine apply的时候先生成了一个事务Handle,然后最为参数传入apply接口,上面所有Engine以及Applicator对LocalStore的操作都有该Handle保证其原子性。
目前为止,通过将所有读写请求都有单线程的Apply Thread执行实现了linearizable的语义。但是有时候对于读可能需要更好的性能,不用每次请求都经过propose走一遍apply线程,用户只需要调用sync接口,就会自动tail当前shared log的entry apply到本地状态机,并且返回一个LocalView,那么多线程的读就可以直接在LocalView中进行,不需要将所有的读请求都走一遍整个Engine Stack。这样一来,整个服务就可以做到对LocalStore一写多读的模型。
更进一步,有时候Applicator需要维护一些状态,比如一些统计信息,从而能够对一写读请求达到更好的性能,IEngine也提供了一个postApply的接口,在Apply真正成功commit的时候会调用postApply去通知Applicator更新其状态。

The Bottom Engine

BaseEngine处于整个Engine Stack的最下面,同样也实现了IEngine的API,其主要职责是推进Log并且Apply每一个Log Entry到上层应用/Engine。在BaseEngine内部维护了一个游标(Cursor)记录当前Log推进的位置,当一个Entry被apply的时候,BaseEngine会先创建一个LocalStore事务上线文,即我们前面提到的事务handle,然后在事务内部将该游标进行更新到LocalStore。前面提到的Apply thread也是有BaseEngine在初始化的时候进行创建,也就是说整个Engine Stack的所有apply都是在BaseEngine创建的apply thread里执行的。当BaseEngine向上调用apply到应用层,应用层将entry处理并且下发到事务后,BaseEngine会将该事务commit到LocalStore,但是并不保证该事务的数据已经持久化,即落盘,机器重启可能会丢失,BaseEngine后台会周期性得将数据刷盘。
BaseEngine完全由propose/sync驱动,并不会主动推进log。当应用层调用sync的时候,BaseEngine会check共享日志当前最新的committed entry,然后从当前游标记录的位置开始往前apply日志,直到最新的entry为止。当日志推进到最新的log entry后,sync调用会返回一个LocalStore的只读事务,该事务包含了当前所有日志的回放。当应用层调用propose的时候,BaseEngine会将该条日志append到共享日志,然后往前推进apply日志直到当前append进去的日志,当应用apply成功新append进去的日志后,BaseEngine会将apply的结果/response返回给还在等待的propose请求。事实上,propose调用实现了某种形式上的复制RPC,首先他保证了持久性,propose只在Entry已经持久化到共享日志才会返回;其次保证了failure-atomic,其apply的执行都在一个LocalStore事务内;最后确保了线性性,在propose的entry在本地store执行前,该entry先被append到共享日志,然后按照共享日志的顺序执行。
除了推进日志的propose和apply,BaseEngine还负责共享日志的trim,BaseEngine后台线程会周期性的对日志进行修剪,确保已经在所有节点都持久化的日志truncate掉,这个过程称为GC。但是BaseEngine并不负责GC的策略,其提供了一个setTrimPreifx的接口,有上层Engine/Application调用,没一层Engine根据自己的需要调整trimprefix最终通知BaseEngine进行trim的位置。

The Middle Engines

正如图3所示,Engine可以一层层堆叠,每一个Engine都可以基于某一层的IEngine API实现一个新的Engine然后置于其上。当应用层调用propose的时候,每一层的Engine都会将其特有的Header添加到Eetry里(piggybacks),最终持久化在共享日志里。通过这种方式,实现Engine的人就可以在Engine做很多事情了,比如延迟propose entry以达到batching的目的,也可以选择性的根据需要修改entry的内容达到一些目的。
Engine也可以实现IApplicator接口,然后向下一层Engine注册其apply/postApply接口,如此在apply entry的时候Engine就可以根据需要对entry进行处理,甚至可以通过提供的LocalStore事务handle做一些状态的持久化,然后再将entry传递到上一层注册的apply/postApply接口。图4展示了一个Engine的代码实例,其实现了拦截apply entry的功能
notion image

Dynamic Updates

通过这种stack的模式,通常新加一个功能就是通过加入一个新的Engine到engine stack里,对于已经上线的服务这本身是一个比较危险的操作,稍有不慎可能会导致不同节点状态的不一致。因此Facebook在实践中提出了一种两阶段升级协议:
  1. 第一阶段:滚动升级每个节点,确保加入新的engine。加入了新的engine的节点,在propose时候会携带新engine的header到entry并持久化到共享日志,但是在apply的时候不会对LocalStore做任何的修改,维持原来的apply流程
  1. 第二阶段:待所有节点都升级完成后,通过propose一条log的方式下发一条运维命令。因为propose log是顺序append到共享日志的,所以当节点apply到该log时候知道该log后面的日志可以启用新的engine了,每个节点看到的该条log的位置也是一致的,确保了各个节点状态的一致

A Tale of Two Databases

Delos使用C++实现了Log-Structured协议,LolcalStore使用RocksDB,log entry的序列化反序列化使用thrift。共享日志使用VirtualLog[1],BaseEngine使用VirtualLog接口实现。每一个Delos数据库通常部署在5或者7个节点上 。Delos不提供sharding和跨shard事务,但是使用方可以通过在上面单独做一层服务实现这些功能。
图5列出了Facebook内部这几年为了实现不同需求的数据库实现的9种Engine,通过这些engine的组合快速构造了不同的数据库,如提供MySQL语义的DelosTable,zookeeper的zelos,一个队列服务DelosQ。
notion image
notion image
 

Rapid deployment of DelosTable

DelosTable是Delos这个项目最开始构建的数据库,在BaseEngine和应用层之间实现了两个Engine:ViewTrackingEngine和ObserverEngine。
  1. ViewTrackingEngine
该Engine主要负责协调日志的修剪(trimming)。ViewTrackingEngine会记录每一个节点的log已经playback的位置,当所有节点都已经回放到某个位置X的时候,该Engine会通过setTrimPrefix接口设置可以安全trim的位置,这样BaseEngine就知道X之前的log可以安全的GC掉了。具体的实现是,ViewTrackingEngine在内存会保存一个map记录每个节点当前playback的位置,当一个Entry popose经过该Engine的时候,其会查询LocalStore当前apply和flush的位置,然后添加到header里带到下一层engine,最终也会持久化到共享日志被其他节点看到,最终所有节点看到的都是一致的视图。
  1. ObserverEngine
observerEngine是一个很轻量的Engine,用来记录propose/sync等调用的延迟,这样就可以记录每一个engine的延迟,通常建议,每一个Engine上面会添加一个ObserverEngine用于观测记录该Engine的性能指标。通过这种方式,有两个明显的收益:
  • 首先可以很好的观测每一层Engine的延迟等性能
  • 将监控从核心代码逻辑分离,这样在修改代码逻辑的时候不用担心是否因为不小心修改了监控的代码导致性能变化不可信

Adding Features to DelosTable

  1. BrainDoctorEngine
DelosTable在线上运行一段时间后,因为一个代码的bug导致不同节点出现了数据的不一致(做存储的应该都深有感触😢),所以他们需要有一个方法能够直接修改LocalStore去修复数据。所以他们开发了一个BrainDoctorEngine,通过一个外部的请求,提供一系列log entry,该engine可以直接将entry apply到localstore使数据快速修复到希望的状态。
  1. LogBackupEngine
随后Facebook遇到了客户的备份恢复需求,业务希望DelosTable能够提供PITR的能力(Point-in-Time Recovery)。LogBackupEngine的功能就是在log被trim前,将其复制到备份存储系统里,这样就可以利用备份的log快速恢复到任意一个时刻的数据库状态。高效的备份log需要所有节点一起配合,不能对任何一个节点带来额外的负担,也不能因为一个节点的故障而延迟备份。
类似ViewTrackingEngine,LogBackupEngine使用log自身作为协调的机制。该机制的状态是一个map,保存了每个节点的 投标 (bids),该投标指示每个节点想要上传的的没有交集的log segments。每个节点会propose一个控制命令为segments投标,一旦某个节点中标,那么其开始上传中标的segment到backupstore,上传成功后,会propose一个成功的Log通知其他节点。除此之外,LogBackupEngine也会延迟trimming log,前面说过,triming的机制是通过上层settrimPrefix接口一层一层经过每一层Engine的同意才会传到BaseEngine,所以LogBackupEngine会根据备份的状态去重置trimming的位置。

Stronger Guarantees for Zelos

Zelos是基于Delos实现的提供Zookeeper协议的数据库,基于之前DelosTable已经实现的Engine,Zelos只用实现一个提供Zookeeper语义的应用层即可。但是当时他们已经开发的Engine都是linearizale的语义,而zk是要求session-ordering的语义(在同一个session里,如果一个client下发了一个write请求,然后紧接着下发了一个read请求,在write请求返回之前,read请求是要求可以看到write请求的结果的),所以他们单独开发了一个SessionOrderEngine。
  1. SessionOrderEngine
SessionOrderEngine对每一个经过的proposal entry添加一个sequence number,当proposals apply的时候,其只会按照顺序apply entry,如果收到一个log entry,其前面的log还没有apply则会被过滤掉,然后重新propose乱序的log entries。比如11号log比10号log先apply,则会过滤掉,然后重新propose 10号和11号,为了实现这种效果,propose不再等待apply完成,二十通过postApply去通知propose,然后是否进行retry(这里实现细节论文里表述不是很清晰)。
  1. TimeEngine
根据时间去trim Log,没有上线,所以这里不做过多介绍。

Improving performance

  1. BatchingEngine
为了提供存储系统的吞吐,batching是一个常用的技术,在Delos当前分层的架构下,batching放在哪一层是他们考虑的一个很重要的一点。如果放在Engine Stacks上面一层,也就是应用层,那么DelosTable和Zelos都需要单独实现一套batching的逻辑;如果放在共享日志一层,那么在propose append到共享日志的时候做了batch,但是在apply的时候还是一条一条日志进行apply的,那么BaseEngine不得不为每一条Log entry创建一个事务进行apply;如果放在Engine层,使用一个新的BatchingEngine,在propose和apply的时候都可以使用batch的优势,对一批日志一次性append到共享日志,然后apply的时候就可以使用group commit的优化。
  1. LeaseEngine
可以看到当前的实现,所有节点都是对等的,每个节点都可以进行propose,也可以调用sync从而实现读的强一致。这种设计在任何一个节点故障的时候,可以保证整个系统都是可用的。但是sync要求去检查shared log,也就意味着一次网络请求。但是如果有个leader,所有的读都是从leader读取就不需要额外的网络请求就可以提供强一致读。
LeaseEngine通过向共享日志append一个entry来选举和更新leader的Lease。当调用sync的时候,如果前期leader的lease没有过期,那么就可以直接返回,不需要去tail共享日志获取最新的状态。

Evaluation

截止2021年5月,DelosTable线上已经上线107个集群,每天处理3B事务请求。Zelos线上155个集群,每天处理6.5B写请求和30B读请求。
  • Log-Structured协议是轻量的。每个Engine对CPU的使用主要消耗在apply函数,额外增加很小的开销
  • apply线程不是瓶颈。图8可以看到,99分位的cpu使用率很少超过60%,SSD可能是主要瓶颈。
notion image
notion image
  • 该协议可以很好的提高性能。图9和图10分别是batching和lease带来的吞吐和延迟的优化
notion image
notion image
  • 该协议可以提高系统的观测性。通过增加一个通用ObserveEngine可以更好的观测系统的性能,图11是线上每个Engine的propose延迟
notion image
 

Reference

  1. Virtual Consensus in Delos
  1. SOSP 2021 Q&A: Session 12: Scale: Log-structured Protocols in Delos
  1. Log-structured Protocols in Delos
  1. The FuzzyLog: A Partially Ordered Shared Log
  1. 📝
    Bookeeper: A scalable, fault-tolerant, and low-latency storage service optimized for real-time workloads