Linux Kernel Synchronization Mechanisms: the mutex in Detail


Overview of the mutex

In the Linux kernel, the mutex is a sleeping lock used to serialize execution. Like a spinlock, it lets only one thread at a time enter the critical section; the difference lies in what happens when the lock cannot be taken: a spinlock spins in place, whereas a mutex suspends the current thread and puts it into a blocked state. For that reason, a mutex must not be used in interrupt context.

Rules for using a mutex

  • Only one process or thread can hold a given mutex at a time
  • Only the owner of a mutex may release it
  • The same lock must not be released more than once
  • The same lock must not be acquired recursively, or a deadlock results
  • A mutex must be initialized with the dedicated mutex init functions
  • The same lock must not be initialized more than once
  • Memory helpers such as memset or memcpy must not be used to initialize a mutex
  • A thread must release every mutex it holds before it exits
  • A mutex must not be used in hard-interrupt or soft-interrupt context (a usage sketch follows this list)
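These rules translate into a simple usage pattern. Below is a minimal sketch of a driver-style critical section; the lock and function names (my_dev_lock, my_dev_write) are hypothetical and only illustrate the API:

#include <linux/mutex.h>

/* Statically define and initialize a mutex. */
static DEFINE_MUTEX(my_dev_lock);
static int my_dev_count; /* shared data protected by my_dev_lock */

/* Process context only: mutex_lock() may sleep. */
static void my_dev_write(int delta)
{
 mutex_lock(&my_dev_lock); /* block until the lock is acquired */
 my_dev_count += delta; /* critical section */
 mutex_unlock(&my_dev_lock); /* only the owner may unlock */
}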

Definition of struct mutex

  • owner: records the holder of the mutex; its low bits double as state flags (see the sketch after the structure)
  • wait_lock: a spinlock protecting the wait list
  • osq: an MCS lock queue supporting the mutex optimistic-spin mechanism
  • wait_list: tasks that fail to take the lock are queued here
  • magic: debugging support
  • dep_map: lockdep debugging support
struct mutex {
 atomic_long_t  owner;
 spinlock_t  wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
 struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
 struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
 void   *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
 struct lockdep_map dep_map;
#endif
};
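Because a task_struct is at least 8-byte aligned, the low three bits of its pointer are always zero, so the kernel reuses them inside owner as state flags. The helpers below are as defined in kernel/locking/mutex.c of this kernel generation, and they appear in the slow paths analyzed later:

#define MUTEX_FLAG_WAITERS 0x01
#define MUTEX_FLAG_HANDOFF 0x02
#define MUTEX_FLAG_PICKUP 0x04

#define MUTEX_FLAGS  0x07

static inline struct task_struct *__owner_task(unsigned long owner)
{
 return (struct task_struct *)(owner & ~MUTEX_FLAGS);
}

static inline unsigned long __owner_flags(unsigned long owner)
{
 return owner & MUTEX_FLAGS;
}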

Main mutex API functions

mutex_init: initializes a mutex object at run time
__mutex_init: called internally by mutex_init
DEFINE_MUTEX: statically defines and initializes a mutex object
__MUTEX_INITIALIZER: used internally by DEFINE_MUTEX
mutex_lock: acquires the mutex; on contention the task sleeps in D (uninterruptible) state
mutex_lock_interruptible: acquires the mutex; on contention the task sleeps in S (interruptible) state; its return value must be checked (see the sketch below)
mutex_trylock: tries to acquire the mutex and returns immediately on failure
mutex_unlock: releases the mutex
mutex_is_locked: tests whether the mutex is currently locked
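A minimal sketch of the checked-return idioms, reusing the hypothetical my_dev_lock from above:

/* Sleep interruptibly (S state); a signal aborts the wait. */
if (mutex_lock_interruptible(&my_dev_lock))
 return -ERESTARTSYS; /* interrupted by a signal, the lock is NOT held */
/* ... critical section ... */
mutex_unlock(&my_dev_lock);

/* Non-blocking attempt: returns 1 on success, 0 if the lock is held. */
if (mutex_trylock(&my_dev_lock)) {
 /* ... critical section ... */
 mutex_unlock(&my_dev_lock);
}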

Lock acquisition flow

mutex_lock() first calls might_sleep(), a debug annotation marking a point where the code may sleep, then calls __mutex_trylock_fast() to attempt the fast acquisition path. If that fails, it falls back to __mutex_lock_slowpath():

void __sched mutex_lock(struct mutex *lock)
{
 might_sleep();

 if (!__mutex_trylock_fast(lock))
  __mutex_lock_slowpath(lock);
}

If the CONFIG_DEBUG_ATOMIC_SLEEP macro is not defined, might_sleep() degenerates into might_resched():

#ifdef CONFIG_DEBUG_ATOMIC_SLEEP
# define might_sleep() \
 do { __might_sleep(__FILE__, __LINE__, 0); might_resched(); } while (0)
# define sched_annotate_sleep() (current->task_state_change = 0)
#else
  static inline void ___might_sleep(const char *file, int line,
       int preempt_offset) { }
  static inline void __might_sleep(const char *file, int line,
       int preempt_offset) { }
# define might_sleep() do { might_resched(); } while (0)
# define sched_annotate_sleep() do { } while (0)
#endif

On a fully preemptible kernel (CONFIG_PREEMPT) or a non-preemptible kernel, might_resched() ends up as an empty macro. Only with voluntary preemption (CONFIG_PREEMPT_VOLUNTARY) does might_resched() call _cond_resched() to offer a voluntary preemption point:

#ifdef CONFIG_PREEMPT_VOLUNTARY
extern int _cond_resched(void);
# define might_resched() _cond_resched()
#else
# define might_resched() do { } while (0)
#endif

#ifndef CONFIG_PREEMPT
extern int _cond_resched(void);
#else
static inline int _cond_resched(void) { return 0; }
#endif

_cond_resched() calls should_resched() to check that the preempt count is 0; if it is 0 and the need-resched flag is set, preempt_schedule_common() is called to perform the rescheduling:

#ifndef CONFIG_PREEMPT
int __sched _cond_resched(void)
{
 if (should_resched(0)) {
  preempt_schedule_common();
  return 1;
 }
 return 0;
}
EXPORT_SYMBOL(_cond_resched);
#endif
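might_sleep() is also what enforces the "no mutex in atomic context" rule: with CONFIG_DEBUG_ATOMIC_SLEEP enabled, __might_sleep() detects a non-zero preempt count and prints the well-known "BUG: sleeping function called from invalid context" warning. A sketch of the broken pattern (lock names hypothetical):

spin_lock(&my_spinlock); /* enters atomic context, preemption disabled */
mutex_lock(&my_dev_lock); /* WRONG: may sleep; __might_sleep() warns here */
mutex_unlock(&my_dev_lock);
spin_unlock(&my_spinlock);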

__mutex_trylock_fast() uses atomic_long_cmpxchg_acquire() to check whether lock->owner equals 0. If it does, the current thread's task_struct pointer is stored into lock->owner, marking the mutex as held by the current thread. If lock->owner is non-zero, the mutex is already held by another thread, or is being handed off to the top waiter, and the current thread must block and wait. The compare and the store described above form a single atomic operation; no other instruction can interleave between them.

static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
 unsigned long curr = (unsigned long)current;

 if (!atomic_long_cmpxchg_acquire(&lock->owner, 0UL, curr))
  return true;

 return false;
}
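For readers less familiar with compare-and-exchange, here is a minimal user-space analogue of this fast path written with C11 atomics. It is purely illustrative, not kernel code:

#include <stdatomic.h>
#include <stdbool.h>

/* Illustrative analogue: 0 means unlocked, non-zero identifies the owner. */
static _Atomic unsigned long owner;

static bool trylock_fast(unsigned long self)
{
 unsigned long expected = 0;
 /* Atomically: if owner == 0, store self into owner. Acquire ordering
  * guarantees the critical section sees writes made before the unlock. */
 return atomic_compare_exchange_strong_explicit(&owner, &expected, self,
      memory_order_acquire, memory_order_relaxed);
}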

The slow acquisition path is __mutex_lock_common(). "Slow" here means blocking the current thread: current is appended to the tail of the mutex wait list, so all waiters line up in arrival order, and when the mutex is released the task at the head of the queue, i.e. the earliest waiter, is woken first. In addition, when the first task is inserted into an empty queue, the MUTEX_FLAG_WAITERS flag is set on the mutex, indicating that tasks are now waiting for this lock.

static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
 __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
      struct lockdep_map *nest_lock, unsigned long ip)
{
 return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
      struct lockdep_map *nest_lock, unsigned long ip,
      struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
 struct mutex_waiter waiter;
 bool first = false;
 struct ww_mutex *ww;
 int ret;

 might_sleep();

 ww = container_of(lock, struct ww_mutex, base);
 if (use_ww_ctx && ww_ctx) {
  if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
   return -EALREADY;
 }

 preempt_disable();
 mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

 if (__mutex_trylock(lock) ||
     mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
  /* got the lock, yay! */
  lock_acquired(&lock->dep_map, ip);
  if (use_ww_ctx && ww_ctx)
   ww_mutex_set_context_fastpath(ww, ww_ctx);
  preempt_enable();
  return 0;
 }

 spin_lock(&lock->wait_lock);
 /*
  * After waiting to acquire the wait_lock, try again.
  */
 if (__mutex_trylock(lock)) {
  if (use_ww_ctx && ww_ctx)
   __ww_mutex_wakeup_for_backoff(lock, ww_ctx);

  goto skip_wait;
 }

 debug_mutex_lock_common(lock, &waiter);
 debug_mutex_add_waiter(lock, &waiter, current);

 lock_contended(&lock->dep_map, ip);

 if (!use_ww_ctx) {
  /* add waiting tasks to the end of the waitqueue (FIFO): */
  list_add_tail(&waiter.list, &lock->wait_list);

#ifdef CONFIG_DEBUG_MUTEXES
  waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
 } else {
  /* Add in stamp order, waking up waiters that must back off. */
  ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
  if (ret)
   goto err_early_backoff;

  waiter.ww_ctx = ww_ctx;
 }

 waiter.task = current;

 if (__mutex_waiter_is_first(lock, &waiter))
  __mutex_set_flag(lock, MUTEX_FLAG_WAITERS);

 set_current_state(state);
 for (;;) {
  /*
   * Once we hold wait_lock, we're serialized against
   * mutex_unlock() handing the lock off to us, do a trylock
   * before testing the error conditions to make sure we pick up
   * the handoff.
   */
  if (__mutex_trylock(lock))
   goto acquired;

  /*
   * Check for signals and wound conditions while holding
   * wait_lock. This ensures the lock cancellation is ordered
   * against mutex_unlock() and wake-ups do not go missing.
   */
  if (unlikely(signal_pending_state(state, current))) {
   ret = -EINTR;
   goto err;
  }

  if (use_ww_ctx && ww_ctx && ww_ctx->acquired > 0) {
   ret = __ww_mutex_lock_check_stamp(lock, &waiter, ww_ctx);
   if (ret)
    goto err;
  }

  spin_unlock(&lock->wait_lock);
  schedule_preempt_disabled();

  /*
   * ww_mutex needs to always recheck its position since its waiter
   * list is not FIFO ordered.
   */
  if ((use_ww_ctx && ww_ctx) || !first) {
   first = __mutex_waiter_is_first(lock, &waiter);
   if (first)
    __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
  }

  set_current_state(state);
  /*
   * Here we order against unlock; we must either see it change
   * state back to RUNNING and fall through the next schedule(),
   * or we must see its unlock and acquire.
   */
  if (__mutex_trylock(lock) ||
      (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
   break;

  spin_lock(&lock->wait_lock);
 }
 spin_lock(&lock->wait_lock);
acquired:
 __set_current_state(TASK_RUNNING);

 mutex_remove_waiter(lock, &waiter, current);
 if (likely(list_empty(&lock->wait_list)))
  __mutex_clear_flag(lock, MUTEX_FLAGS);

 debug_mutex_free_waiter(&waiter);

skip_wait:
 /* got the lock - cleanup and rejoice! */
 lock_acquired(&lock->dep_map, ip);

 if (use_ww_ctx && ww_ctx)
  ww_mutex_set_context_slowpath(ww, ww_ctx);

 spin_unlock(&lock->wait_lock);
 preempt_enable();
 return 0;

err:
 __set_current_state(TASK_RUNNING);
 mutex_remove_waiter(lock, &waiter, current);
err_early_backoff:
 spin_unlock(&lock->wait_lock);
 debug_mutex_free_waiter(&waiter);
 mutex_release(&lock->dep_map, 1, ip);
 preempt_enable();
 return ret;
}

Lock release flow

Releasing the lock likewise has a fast path and a slow path:

void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
 if (__mutex_unlock_fast(lock))
  return;
#endif
 __mutex_unlock_slowpath(lock, _RET_IP_);
}

If no other thread tried to take the mutex while it was held, the mutex's owner field simply holds the owning thread's task_struct address and none of the mutex flags are set. Releasing the lock then only requires atomically clearing owner; nothing else needs to be done. This is the fast unlock path:

static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
 unsigned long curr = (unsigned long)current;

 if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr)
  return true;

 return false;
}

If other threads are contending for the mutex, the slow unlock path is taken instead. Its logic falls into two parts: first release the mutex, then wake up the top waiter thread:

static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
 struct task_struct *next = NULL;
 DEFINE_WAKE_Q(wake_q);
 unsigned long owner;

 mutex_release(&lock->dep_map, 1, ip);

 /*
  * Release the lock before (potentially) taking the spinlock such that
  * other contenders can get on with things ASAP.
  *
  * Except when HANDOFF, in that case we must not clear the owner field,
  * but instead set it to the top waiter.
  */
 owner = atomic_long_read(&lock->owner);
 for (;;) {
  unsigned long old;

#ifdef CONFIG_DEBUG_MUTEXES
  DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
  DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif

  if (owner & MUTEX_FLAG_HANDOFF)
   break;

  old = atomic_long_cmpxchg_release(&lock->owner, owner,
        __owner_flags(owner));
  if (old == owner) {
   if (owner & MUTEX_FLAG_WAITERS)
    break;

   return;
  }

  owner = old;
 }

 spin_lock(&lock->wait_lock);
 debug_mutex_unlock(lock);
 if (!list_empty(&lock->wait_list)) {
  /* get the first entry from the wait-list: */
  struct mutex_waiter *waiter =
   list_first_entry(&lock->wait_list,
      struct mutex_waiter, list);

  next = waiter->task;

  debug_mutex_wake_waiter(lock, waiter);
  wake_q_add(&wake_q, next);
 }

 if (owner & MUTEX_FLAG_HANDOFF)
  __mutex_handoff(lock, next);

 spin_unlock(&lock->wait_lock);

 wake_up_q(&wake_q);
}
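Note the wake-up pattern: the top waiter is first queued on an on-stack wake_q while wait_lock is held, and wake_up_q() runs only after the spinlock is dropped, which keeps the wait_lock hold time short. A minimal sketch of the wake_q idiom (everything outside the wake_q API is hypothetical):

DEFINE_WAKE_Q(wake_q); /* on-stack list of tasks to wake later */

spin_lock(&my_lock);
wake_q_add(&wake_q, some_task); /* defer: only queues the task */
spin_unlock(&my_lock);

wake_up_q(&wake_q); /* perform the actual wake-ups after dropping the lock */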

Summary

This article covered the usage rules for the mutex and its main API functions, and walked through the lock acquisition and lock release paths. With this, the reader should have a basic understanding of how the mutex is implemented and how to use it.
