Linux中的伤害/等待互斥锁介绍

描述

序言:近期读Linux 5.15的发布说明,该版本合并了实时锁机制,当开启配置宏CONFIG_PREEMPT_RT的时候,这些锁被基于实时互斥锁的变体替代:mutex、ww_mutexrw_semaphorespinlock和rwlock。第一次听说ww_mutex,在百度上查找的时候发现介绍文档很少,于是自己学习,写成笔记。在某些场合必须同时持有多个锁,并且获取锁的顺序可能不同,为了避免死锁,应该使用伤害/等待互斥锁(Wound/Wait Mutexes)。获取一个锁集合称为一个事务(transaction),每个事务关联一张门票(ticket),门票也称为序列号,根据门票判断哪个事务年轻。有2种处理死锁的方法,如下。(1) 等待-死亡(Wait-Die)算法:一个事务申请另一个事务已经获取的锁的时候,如果持有锁的事务年轻,那么申请锁的事务等待(wait);如果持有锁的事务年老,那么申请锁的事务退并且死亡(die)

(2) 4.19版本开始支持伤害-等待(Wound-Wait)算法:一个事务申请另一个事务已经获取的锁的时候,如果持有锁的事务年轻,那么申请锁的事务伤害(wound)持有锁的事务,请求它去死亡;如果持有锁的事务年老,那么申请锁的事务等待(wait)

假设进程1和进程2分别在2个处理器上运行,进程1获取锁A,进程2获取锁B,然后进程1申请锁B,进程2申请锁A。假设进程1的门票编号比进程2的门票编号小,也就是进程1年老,进程2年轻。假设选择等待-死亡算法。年老的进程1申请锁B,发现持有锁B的进程2年轻,那么年老的进程1等待。年轻的进程2申请锁A,发现持有锁A的进程1年老,那么年轻的进程2死亡(即申请锁的函数返回“-EDEADLK”),接着回滚(即释放已经获取的锁B),然后重新开始:先申请锁A然后申请锁B(必须改变申请顺序,如果先申请锁B,那么会把刚释放的锁B抢回来)。假设选择伤害-等待算法。年老的进程1申请锁B,发现持有锁B的进程2年轻,那么伤害年轻的进程2,请求它死亡。年轻的进程2申请锁A,发现持有锁A的进程1年老,那么年轻的进程2等待,在收到进程1的死亡请求以后,年轻的进程2死亡(即申请锁的函数返回“-EDEADLK”),接着回滚(即释放已经获取的锁B),然后重新开始:先申请锁A然后申请锁B。两种算法都是公平的,因为其中一个事务最终会成功。和等待-死亡算法相比,伤害-等待算法生成的退避少,但是从一次退避恢复的时候要做更多的工作。伤害-等待算法是一种抢占性的算法(因为事务被其它事务伤害),需要一种可靠的方法来选择受伤状态和抢占正在运行的事务。在伤害-等待算法中,一个事务在受伤后死亡(返回“-EDEADLK”),就认为这个事务被抢占。如果竞争锁的进程少,并且希望减少回滚的次数,那么应该选择伤害-等待算法。  和普通的互斥锁相比,伤害/等待互斥锁增加了下面2个概念。

(1) 获取上下文(acquire context):一个获取上下文表示一个事务,关联一张门票(ticket),门票也称为序列号,门票编号小表示年老,门票编号大表示年轻。获取上下文跟踪调试状态,捕获对伤害/等待互斥锁接口的错误使用。

(2) 伤害/等待类初始化获取上下文的时候需要指定锁类,锁类会给获取上下文分配门票。锁类也指定算法:等待-死亡(Wait-Die)或伤害-等待(Wound-Wait)。当多个进程竞争同一个锁集合的时候,它们必须使用相同的锁类。

 3种获取伤害/等待互斥锁的函数,如下。

(1) 普通的获取锁函数ww_mutex_lock(),带有获取上下文。

(2) 进程在回滚(即释放所有已经获取的锁)以后,使用慢路径获取锁函数ww_mutex_lock_slow()获取正在竞争的锁。带有“_slow”后缀的函数不是必需的,因为可以调用函数ww_mutex_lock()获取正在竞争的锁。带有“_slow”后缀的函数的优点是接口安全,如下。

  • 函数ww_mutex_lock()有一个整数返回值,而函数ww_mutex_lock_slow()没有返回值。
  • 当开启调试的时候,函数ww_mutex_lock_slow()检查所有已经获取的锁已经被释放,并且确保进程阻塞在正在竞争的锁上面。

(3) 只获取一个伤害/等待互斥锁,和获取普通的互斥锁完全相同。调用函数ww_mutex_lock(),把获取上下文指定为空指针。

 伤害/等待互斥锁的使用方法如下。

(1) 定义一个锁类,锁类在初始化获取上下文的时候需要,锁类也指定算法:等待-死亡(Wait-Die)或伤害-等待(Wound-Wait)。

  •  
  •  
  •  
  •  
  •  
/* 指定等待-死亡算法 */static DEFINE_WD_CLASS(my_class);
/* 指定伤害-等待算法 */static DEFINE_WW_CLASS(my_class);

(2) 初始化一个获取上下文,锁类会给获取上下文分配一张门票。

  •  
void ww_acquire_init(struct ww_acquire_ctx *ctx, struct ww_class *ww_class);
(3) 获取锁,返回0表示获取成功,返回“-EDEADLK”表示检测出死锁。
  •  
int ww_mutex_lock(struct ww_mutex *lock, struct ww_acquire_ctx *ctx);
(4) 获取需要的所有锁以后,标记获取阶段结束。目前这个函数没有执行任何操作,但是将来可能改变。
  •  
void ww_acquire_done(struct ww_acquire_ctx *ctx);
(5) 释放锁。
  •  
void ww_mutex_unlock(struct ww_mutex *lock);
(6) 释放所有锁以后,释放获取上下文。
  •  
void ww_acquire_fini(struct ww_acquire_ctx *ctx);

 

下面是一个例子,注意:调用函数ww_mutex_lock()申请锁失败以后,应该先释放已经获取的锁,然后调用慢路径函数ww_mutex_lock_slow()获取正在竞争的锁,最后获取其它锁。重新开始申请锁的时候必须改变申请顺序,因为如果按照原来的顺序申请锁,那么会把刚释放的锁抢回来。
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
/* 第1步:定义锁类,指定伤害-等待算法。*/static DEFINE_WW_CLASS(ww_class);
struct obj {  struct ww_mutex lock;  /* obj data */};
struct obj_entry {  struct list_head head;  struct obj *obj;};
int lock_objs(struct list_head *list, struct ww_acquire_ctx *ctx){  struct obj *res_obj = NULL;  struct obj_entry *contended_entry = NULL;  struct obj_entry *entry;  int ret;
  /* 第2步:初始化获取上下文。*/  ww_acquire_init(ctx, &ww_class);
  /* 第3步:获取锁。*/retry:  list_for_each_entry(entry, list, head) {    if (entry->obj == res_obj) {      res_obj = NULL;      continue;    }
    ret = ww_mutex_lock(&entry->obj->lock, ctx);    if (ret < 0) {      contended_entry = entry;      goto err;    }  }
  /* 第4步:标记获取阶段结束。*/  ww_acquire_done(ctx);  return 0;
err:  /* 回滚,释放已经获取的锁。*/  list_for_each_entry_continue_reverse(entry, list, head) {    ww_mutex_unlock(&entry->obj->lock);  }
  if (res_obj) {    ww_mutex_unlock(&res_obj->lock);  }
  if (ret == -EDEADLK) {    /* 使用慢路径获取锁函数获取正在竞争的锁。*/    ww_mutex_lock_slow(&contended_entry->obj->lock, ctx);    res_obj = contended_entry->obj;    /* 获取其它锁。*/    goto retry;  }  ww_acquire_fini(ctx);
  return ret;}
void unlock_objs(struct list_head *list, struct ww_acquire_ctx *ctx){  struct obj_entry *entry;
  /* 第5步:释放锁。*/  list_for_each_entry (entry, list, head) {    ww_mutex_unlock(&entry->obj->lock);  }
  /* 第6步:释放获取上下文。*/  ww_acquire_fini(ctx);}
  责任编辑:haq

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分