Futex
简介
概述: Futex(Fast Userspace Mutex)是linux的一种特有机制,设计目标是避免传统的线程同步原语(如mutex、条件变量等)在用户空间和内核空间之间频繁的上下文切换。Futex允许在用户空间处理锁定和等待的操作,只有在必要时进入内核,从而减少了不必要的开销。
对比:
- SpinLock:如果上锁成功,立即进入临界区,开销很小;但是如果上锁失败,CPU空转,浪费资源
- Mutex:如果上锁失败,内核会切换到其他线程在CPU运行,不用自旋等待锁;但是即使是上锁成功也要进出内核,使用系统调用,会消耗几百个指令
- Futex:结合上述二者的优点,在用户态尝试上锁,上锁成功既可以不用进入内核态,上锁失败就通过系统调用睡眠,唤醒也要通过系统调用
- 在用户空间尝试加锁通常通过原子操作来完成,如CAS(Compare-And-Swap)或者其他无锁的原子操作
原理
本质: futex直接通过虚拟地址(是一个用户空间地址,通常是一个32位的锁变量字段,0表示空闲,1表示有线程持有当前锁)来处理锁,而不是像以前一样创建一个需要初始化以及跨进程全局内核句柄
用户空间地址: futex用户空间地址可以通过mmap系统调用分配,而mmap分配的内存有三种:匿名内存、共享内存段、内存映射文件。匿名内存只能用在同一进程的线程之间使用,而共享内存段和内存映射文件可以在多个进程之间使用
主要功能:
- Futex原子操作:用户空间线程首先执行一些原子操作(如使用atomic函数或cmpxchg指令)来尝试获取锁
- 短路路径:如果锁在用户空间已经被释放,线程可以直接在用户空间完成同步,无需涉及内核
- 等待:如果一个线程尝试获取的锁已经被另一个线程占用,它可以调用futex_wait()进入休眠
- 唤醒:当锁被释放,另一个线程可以调用futex_wake()唤醒等待的线程
涉及的系统调用:
-
futex():允许用户空间线程等待或唤醒其他线程。
int futex(int *uddr, int op, int val, const struct timespec *timeout, int *uaddr2, int val2);
- uaddr:用户空间地址,表示锁的位置
- val:如果是FUTEX_WAIT,表示原子性地检查uaddr中的值是否为val,如果是则让进程睡眠,否则返回错误;如果是FUTEX_WAKE,表示最多唤醒val个等待在uaddr上的进程
- op:操作类型:
- FUTEX_WAIT:等待操作,线程会阻塞,直到指定的内存地址值发生变化
- FUTEX_WAKE:唤醒操作,唤醒一个或多个等待该内存地址的线程
- timeout:可选的等待时间,如果为空,那么调用会无限期睡眠。timeout默认会根据CLOCK_MONOTONIC时钟来计算,从linux4.5开始,可以再futex_op上指定FUTEX_CLOCK_REALTIME来选择CLOCK_REALTIME时钟。
- uaddr2和val2在某些情况下用于双重条件等待
-
futex_wait()和futex_wake()是更高级的封装,用于在应用程序中更方便地实现等待和唤醒操作
实际应用
- pthread_mutex:在linux中,pthread_mutex是基于futex实现的。
- 内存屏障和自旋锁:在一些高效的同步机制中,futex也常常与自旋锁、内存屏障等技术结合使用
代码示例
代码仓库: 1037827920/Futex-Example-Program
futex本身的使用
Rust示例程序:(需要使用到libc crate)
use libc::{syscall, SYS_futex, FUTEX_WAIT, FUTEX_WAKE};
use std::{panic,sync::{atomic::{AtomicU32, Ordering},Arc,},thread,time::Duration,
};const UNLOCKED: u32 = 0;
const LOCKED: u32 = 1;macro_rules! futex_status {($val:expr) => {if $val == 0 {"UNLOCKED"} else {"LOCKED"}};
}fn main() {test_futex();
}fn futex_wait(futex: &AtomicU32, thread: &str) {loop {// 如果当前futex没有被其他线程持有if futex.compare_exchange(UNLOCKED, LOCKED, Ordering::SeqCst, Ordering::SeqCst).is_ok(){// 加锁后直接返回,这样就不用执行系统调用,减少一定开销println!("线程{thread}上锁成功, futex状态: {}",futex_status!(futex.load(Ordering::SeqCst)));return;}// 线程进入等待状态println!("线程{thread}正在等待futex");let ret = unsafe {syscall(SYS_futex,futex as *const AtomicU32 as *mut u32,FUTEX_WAIT,futex.load(Ordering::SeqCst),0,0,0,)};if ret == -1 {panic!("futex_wait系统调用执行失败");}}
}fn futex_wake(futex: &AtomicU32, thread: &str) {let ret = unsafe {syscall(SYS_futex,futex as *const AtomicU32 as *mut u32,FUTEX_WAKE,1,0,0,0,)};if ret == -1 {panic!("futex_wake系统调用执行失败");}println!("线程{thread}释放锁");futex.store(UNLOCKED, Ordering::SeqCst);
}/// 测试基本的futex使用
fn test_futex() {// futex用户空间地址let futex = Arc::new(AtomicU32::new(0));let futex_clone1 = futex.clone();let futex_clone2 = futex.clone();// 线程1let thread1 = thread::spawn(move || {// 尝试获取锁futex_wait(&futex_clone1, "1");// 执行具体的业务逻辑thread::sleep(Duration::from_secs(5));// 释放锁futex_wake(&futex_clone1, "1");});// 线程2let thread2 = thread::spawn(move || {// 尝试获取锁futex_wait(&futex_clone2, "2");// 执行具体的业务逻辑thread::sleep(Duration::from_secs(5));// 释放锁futex_wake(&futex_clone2, "2");});thread1.join().unwrap();thread2.join().unwrap();
}
C示例程序:
#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>#define UNLOCKED 0
#define LOCKED 1
#define FUTEX_STATUS(val) val == 0 ? "UNLOCKED" : "LOCKED"// 定义一个结构体来封装参数
typedef struct {atomic_uint* futex;int thread;
} thread_args;void* futex_wait(atomic_uint* futex, int thread) {while (1) {// 如果当前futex没有其他线程持有int expected = UNLOCKED;if (atomic_compare_exchange_strong(futex, &expected, LOCKED)) {// 加锁后直接返回printf("线程%d上锁成功. futex状态: %s\n", thread, FUTEX_STATUS(*futex));return NULL;}// 线程进入等待状态printf("线程%d正在等待futex\n", thread);long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAIT, *futex, 0, 0);if (ret == -1) {perror("futex_wait系统调用执行失败\n");return NULL;}}
}void* futex_wake(atomic_uint* futex, int thread) {long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAKE, 1, 0, 0, 0);if (ret == -1) {perror("futex_wake系统调用执行失败\n");return NULL;}atomic_store(futex, UNLOCKED);printf("线程%d释放锁\n", thread);return NULL;
}void* thread_task(void* arg) {thread_args* args = (thread_args*)arg;// futex用户空间地址atomic_uint* futex = args->futex;// 线程号int thread = args->thread;// 尝试获取锁futex_wait(futex, thread);// 执行具体的业务逻辑sleep(5);// 释放锁futex_wake(futex, thread);return NULL;
}int main() {// 线程句柄pthread_t t1, t2;// futex用户空间地址atomic_uint futex = 0;thread_args args1 = { &futex, 1 };thread_args args2 = { &futex, 2 };// 创建两个线程同时递增cntpthread_create(&t1, NULL, thread_task, (void*)&args1);pthread_create(&t2, NULL, thread_task, (void*)&args2);// 等待线程结束pthread_join(t1, NULL);pthread_join(t2, NULL);return 0;
}
运行结果:
pthread_mutex的使用
C示例程序:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>// 共享计数器
int shread_cnt = 0;
// 互斥锁
pthread_mutex_t cnt_mutex = PTHREAD_MUTEX_INITIALIZER;// 线程执行的任务
void* thread_task(void* arg) {// 线程IDlong tid = (long)arg;// 循环5次,每次增加计数器for (int i = 0; i < 5; ++i) {// 加锁pthread_mutex_lock(&cnt_mutex);// 临界区:修改共享资源int tmp = shread_cnt;// 模拟一些处理时间usleep(100);shread_cnt = tmp + 1;printf("线程 %ld: 计数器值 = %d\n", tid, shread_cnt);// 解锁pthread_mutex_unlock(&cnt_mutex);// 模拟一些处理时间usleep(200);}return NULL;
}int main() {// 定义线程句柄数组pthread_t threads[3];// 创建3个线程for (long i = 0;i < 3; ++i) {int ret = pthread_create(&threads[i], NULL, thread_task, (void*)i);if (ret != 0) {perror("线程创建失败");return 1;}}// 等待所有线程完成for (int i = 0; i < 3; ++i) {pthread_join(threads[i], NULL);}// 打印最终计数器值printf("最终计数器值 = %d\n", shread_cnt);// 销毁互斥锁pthread_mutex_destroy(&cnt_mutex);return 0;
}
原理浅析:
pthread_mutex_lock实际上会执行__pthread_mutex_lock ,然后这个函数实现里面会调用LLL_MUTEX_LOCK宏,这个宏会调用__lll_lock宏,在这个宏里面就会先尝试在用户态进行上锁(也就是通过CAS的原子操作进行上锁),然后上锁失败再调用__lll_lock_wait(或_\lll_lock_wait_private),这个函数在追踪下去就会执行futex_wait系统调用。这个流程就跟上面futex本身的使用差不多了。
想要看更详细的讲解可以看这个博客:pthread_mutex_lock实现 - pslydhh
参考
- futex技术分享
- 2022年南大操作系统课程——并发控制:互斥
- pthread_mutex_lock实现 - pslydhh