电子说
现实生活中时间可以记录事情发生的时刻、比较事情发生的先后顺序。
分布式系统的一些场景也需要记录和比较不同节点间事件发生的顺序。如数据写入先后顺序,事件发生的先后顺序等等。
复习下离散数学中关系:
假设A是一个集合 {1,2,3,4} ;R是集合A上的关系,例如{<1,1>,<2,2>,<3,3>,<4,4>,<1,2>,<1,4>,<2,4>,<3,4>}
集合内只有部分元素之间是可以比较的。
偏序关系的定义(R为A上的偏序关系):设R是集合A上的一个二元关系,若R满足:
一个partitial ordering关系满足的条件是自反的,反对称的和可传递的,因此在partitial ordering中,可能有两个元素之间是不相关的。
集合内只有部分元素之间是可以比较的。
比如:比如复数集中并不是所有的数都可以比较大小,那么“大小”就是复数集的一个偏序关系。
全序关系的定义:
完全性本身也包括了自反性,所以全序关系是偏序关系。
所以偏序中满足完全性就是全序了。
一个total ordering关系满足的条件是反对称的,可传递的和完全性,因此在total ordering中,两个元素一定是有关系的,要么是a<>b或b<>a。
在分布式系统中,一个进程包含一系列的事件,对于同一进程内的事件,如果a happens before b,那么a发生在b之前。并且,假定收或发消息都是一个事件。
happens before的定义如下(用->表示)
[图来自Time, Clocks, and the Ordering of Events in a Distributed System]
以一个例子来说明happens before关系,如上图,垂直线上代表一个进程,从下往上,时间依次增加,水平的距离代表空间的隔离。原点代表一个事件,而曲线代表一条消息。
从图中很容易地看出,如果一个事件a,能通过进程的线和消息线,到达b,那么a->b。
在图中,p3和q4是并行的事件,因为,只有到了p4才能确定q4的发生,而q3也只能确定p1发生。
计算机有固定频率晶体的震荡次数,晶体的振荡周期决定了单机的时钟精度。
时钟频率也可能因为温度等外部因素导致时钟偏移,普通的石英晶体的漂移大10 ^-6^ 。 原子钟的漂移约为 10^-13^ ,所以原子钟精度远远高于石英晶体。
不同机器上的物理时钟难以同步,导致无法区分在分布式系统中多个节点的事件时序。即使设置了 NTP 时间同步节点间也存在毫秒级别的偏差,因而分布式系统需要有另外的方法记录事件顺序关系。
1978年Lamport在《Time, Clocks and the Ordering of Events in a Distributed System》中提出了逻辑时钟的概念,来解决分布式系统中区分事件发生的时序问题。
逻辑时钟指的是分布式系统中用于区分事件的发生顺序的时间机制。 从某种意义上讲,现实世界中的物理时间其实是逻辑时钟的特例。
Logical Clock解决的问题是找到一种方法,给分布式系统中所有时间定一个序,这个序能够正确地排列出具有因果关系的事件(注意,是不能保证并发事件的真实顺序的),使得分布式系统在逻辑上不会发生因果倒置的错误。因果一致性
Time, Clocks, and the Ordering of Events in a Distributed System
Leslie Lamport 在1978年提出逻辑时钟的概念,并描述了一种逻辑时钟的表示方法,这个方法被称为Lamport时间戳(Lamport timestamps)。
分布式系统中按是否存在节点交互可分为三类事件:
时钟的定义如下
for any events a, b
if a->b then C(a) < C(b)
根据关系->的定义,我们可以得出
为了让系统满足上述条件,在实现中,需要满足以下原则
假设有事件a、b,C(a)、C(b)分别表示事件a、b对应的Lamport时间戳,如果a->b,则C(a) < C(b),a发生在b之前(happened before)。
所以Lamport timestamps原理如下:
通过该定义,事件集中Lamport时间戳不等的事件可进行比较,我们获得事件的偏序关系(partial order)。
上图更形象的解释了事件之间的关系。
以B4事件为基准:
Lamport timestamps只保证因果关系(偏序)的正确性,不保证绝对时序的正确性。
由于Lamport timestamps只能得到偏序关系,如果要得到全序关系,就需要给Ci(a) = Cj(b)的事件定一个先后顺序。
total order的事件关系=>定义如下:
如果事件a发生在进程Pi,事件b发生在进程Pj,那么当满足下列两者条件之一时,a=>b
根据以上条件,对于任意的两个事件,都能判断出它们之间的关系,因此是total ordering的。
当Lamport timestamp一致时,通过义Pi < Pj来定义顺序,确保分布式场景下各个进程间发生的事件的全序定义。至于Pj < Pj:可采用不同的方式,Lamport Logical Clock提到的 arbitrary total ordering。
Lamport timestamp得到的是全序关系,但无法严格表示对于没有因果关系、存在同时发生关系(concurrent)的事件。
Vector clock是在Lamport timestamp基础上改进的一种逻辑时钟方法,它构不但记录本节点的Lamport timestamp,同时也记录了其他节点的Lamport timestamp。
原理如下:
图来源于wikipedia
vector clock判定并发关系:
和之前lamport timestamp的一样,以B4事件为基准(vector clock为[A:2,B:4,C:1]),根据vector clock的判定,可以判断出
特性:
分布式系统多个副本被同时更新时,会导致副本之间数据的不一致。version vector用于来发现这些不一致的冲突。
version vector只能发现冲突,无法解决冲突;当然也可以通过再添加一个维度信息timestamp,发生冲突时进行比较,但是又回到了物理时钟不同步的问题。
下图展示了数据由不同副本处理后导致的不同版本冲突。
D5时发现了数据的冲突,这时会将不同版本数据都存储下来,一般由客户端来解决冲突。
version vector与vector clock的差异
分布式系统中,每个节点的物理时钟是不同步的,都有一定的差异。
这样就带来了一些分布式系统实现的难题,如基于MVCC实现的事务,基于MVCC实现事务会要求版本之间能判断先后顺序,只有确定先后才知道应该用哪一个版本的数据,确定先后顺序就涉及到时间,而不同机器之间的本地时钟是无法保证一致的,所以这就需要确保时钟的同步。
而通常解决方案有两种:
如果我们整个系统不复杂,而且没有跨全球的需求,这时用一台中心授时服务就可以了。
如TiDB使用的就是TSO方案,tipb作为一个TSO集群,来提供授时服务。
使用TSO的好处在于因为只有一个中心授时,所以我们一定能确定所有时间的时间,但TSO需要关注几个问题:
由于节点间NTP是有偏差的,且可能出现时间回退的情况,所以NTP无法准确的判定事件的全序关系。在Google Spanner里面,通过引入True Time来解决了分布式时间问题。
Spanner通过使用GPS + 原子钟atomic clock来对集群的机器时间进行校对,保证了集群机器的时间戳差距不会超过一个上限值(ε)。
用两种技术来处理,是因为导致这两种技术的失败的原因是不同的。
虽然spanner引入了TrueTime可以得到全球范围的时序一致性,但由于TrueTime返回的时间仍然有一定的偏差,如果要给两个事件定序,就需要等待2个偏差的时间间隔,来确保其先后顺序。
事件b时间 - 事件a时间 > 2ε
Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases
TrueTime 需要硬件的支持,所以有一定的成本,而HLC无需硬件支持也能解决分布式下时间问题。
HLC同时使用了物理时钟和逻辑时钟(physical clock + logical clock),能够保证单点的时间发生器是单调递增的,同时能够尽量控制不同节点之间的时钟偏差在规定的偏差范围内。
判断两个事件的先后顺序:先判断物理时间,再判断逻辑时间。
l.j维护的是节点j当前已知的最大的物理时间(wall time),c.j则是当前的逻辑时间。
// 在节点j上面:初始化: l.j = 0,c.j = 0。
initially l.j :=0; c.j := 0
// 本地事件或者发送消息时,
// 如果本地时钟pt大于当前的混合逻辑时钟的l,
// 则将l更新成本地时钟,将c清零。
// 否则,l保持不变,将c加1。
Send or local event
{
l'.j := l.j;
l.j := max(l'.j, pt.j); // 本地物理时间pt
if (l.j = l'.j) then
c.j := c.j+1
else
c.j := 0;
Timestamp with l.j, c.j
}
// 收到消息时
// l在 当前的逻辑时钟的l、机器的本地时钟pt、收到消息里面带的l,三者中取最大的。
// 如果l部分是更新为本地时钟了,则将c清零。否则,c取较大的那个l对应到的c加1。
Receive event of message m
{
l'.j := l.j;
l.j := max(l'.j, l.m, pt.j);
if (l.j = l'.j = l.m) then
c.j := max(c.j, c.m) + 1
elseif (l.j=l'.j) then
c.j := c.j + 1
elseif (l.j=l.m) then
c.j := c.m + 1
else
c.j := 0
Timestamp with l.j, c.j
}
HLC算法保证了HLC时间有如下特性:
CockroachDB采用基于NTP时钟同步的HLC去中心化方案。
所有节点间的RPC消息都会把时间戳带入到消息中,接收到消息的节点会通过消息中的时间戳更新自己的时间, 从而达到节点间时间同步的效果。
HLC定义
// Timestamp represents a state of the hybrid logical clock.
type Timestamp struct {
// Holds a wall time, typically a unix epoch time
// expressed in nanoseconds.
WallTime int64 `protobuf:"varint,1,opt,name=wall_time,json=wallTime" json:"wall_time"`
// The logical component captures causality for events whose wall
// times are equal. It is effectively bounded by (maximum clock
// skew)/(minimal ns between events) and nearly impossible to
// overflow.
Logical int32 `protobuf:"varint,2,opt,name=logical" json:"logical"`
}
获取物理时钟
// PhysicalNow returns the local wall time. It corresponds to the physicalClock
// provided at instantiation. For a timestamp value, use Now() instead.
func (c *Clock) PhysicalNow() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.getPhysicalClockLocked()
}
// getPhysicalClockLocked returns the current physical clock and checks for
// time jumps.
func (c *Clock) getPhysicalClockLocked() int64 {
// physicalClock 就是 UnixNano
newTime := c.physicalClock()
if c.mu.lastPhysicalTime != 0 {
interval := c.mu.lastPhysicalTime - newTime
// 检查时钟是否回退
if interval > int64(c.maxOffset/10) {
c.mu.monotonicityErrorsCount++
log.Warningf(context.TODO(), "backward time jump detected (%f seconds)", float64(-interval)/1e9)
}
}
c.mu.lastPhysicalTime = newTime
return newTime
}
// UnixNano returns the local machine's physical nanosecond
// unix epoch timestamp as a convenience to create a HLC via
// c := hlc.NewClock(hlc.UnixNano, ...).
func UnixNano() int64 {
return timeutil.Now().UnixNano()
}
获取当前HLC时钟
// Now returns a timestamp associated with an event from
// the local machine that may be sent to other members
// of the distributed network. This is the counterpart
// of Update, which is passed a timestamp received from
// another member of the distributed network.
func (c *Clock) Now() Timestamp {
c.mu.Lock()
defer c.mu.Unlock()
if physicalClock := c.getPhysicalClockLocked(); c.mu.timestamp.WallTime >= physicalClock {
// The wall time is ahead, so the logical clock ticks.
c.mu.timestamp.Logical++
} else {
// Use the physical clock, and reset the logical one.
c.mu.timestamp.WallTime = physicalClock
c.mu.timestamp.Logical = 0
}
return c.mu.timestamp
}
节点时钟同步
节点之间通过在RPC请求中携带HLC时间来进行时钟同步。
// sendSingleRange gathers and rearranges the replicas, and makes an RPC call.
func (ds *DistSender) sendSingleRange(
ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor,
) (*roachpb.BatchResponse, *roachpb.Error) {
......
br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba)
if err != nil {
log.ErrEvent(ctx, err.Error())
return nil, roachpb.NewError(err)
}
// If the reply contains a timestamp, update the local HLC with it.
if br.Error != nil && br.Error.Now != (hlc.Timestamp{}) {
ds.clock.Update(br.Error.Now)
} else if br.Now != (hlc.Timestamp{}) {
ds.clock.Update(br.Now)
}
......
}
// Update takes a hybrid timestamp, usually originating from
// an event received from another member of a distributed
// system. The clock is updated and the hybrid timestamp
// associated to the receipt of the event returned.
// An error may only occur if offset checking is active and
// the remote timestamp was rejected due to clock offset,
// in which case the timestamp of the clock will not have been
// altered.
// To timestamp events of local origin, use Now instead.
func (c *Clock) Update(rt Timestamp) Timestamp {
c.mu.Lock()
defer c.mu.Unlock()
// 如果本地物理时间pt
physicalClock := c.getPhysicalClockLocked()
// 大于本地WallTime且大于rt.WallTime:
// 更新本地WallTime=pt,且logical=0
if physicalClock > c.mu.timestamp.WallTime && physicalClock > rt.WallTime {
// Our physical clock is ahead of both wall times. It is used
// as the new wall time and the logical clock is reset.
c.mu.timestamp.WallTime = physicalClock
c.mu.timestamp.Logical = 0
return c.mu.timestamp
}
// In the remaining cases, our physical clock plays no role
// as it is behind the local or remote wall times. Instead,
// the logical clock comes into play.
// 如果rt.WallTime > 本地WallTime:
// 检查rt.WallTime与pt是否大于时钟偏差;
// 本地WallTime=rt.WallTime,logical++
if rt.WallTime > c.mu.timestamp.WallTime {
offset := time.Duration(rt.WallTime-physicalClock) * time.Nanosecond
if c.maxOffset > 0 && offset > c.maxOffset {
log.Warningf(context.TODO(), "remote wall time is too far ahead (%s) to be trustworthy - updating anyway", offset)
}
// The remote clock is ahead of ours, and we update
// our own logical clock with theirs.
c.mu.timestamp.WallTime = rt.WallTime
c.mu.timestamp.Logical = rt.Logical + 1
} else if c.mu.timestamp.WallTime > rt.WallTime {
// 如果本地WallTime>rt.WallTime:logical++
// Our wall time is larger, so it remains but we tick
// the logical clock.
c.mu.timestamp.Logical++
} else {
// Both wall times are equal, and the larger logical
// clock is used for the update.
if rt.Logical > c.mu.timestamp.Logical {
c.mu.timestamp.Logical = rt.Logical
}
c.mu.timestamp.Logical++
}
return c.mu.timestamp
}
全部0条评论
快来发表一下你的评论吧 !