给分支预测器的建议
原始代码
以下代码用于实现多线程中只调用一次的效果,这里的if大多数情况下都是false,即已经被调用过了。这里是否被调用过用的是一个`std::atomic<uint32_t>`的原子变量
template <typename Callable, typename... Args>
void call_once(absl::once_flag& flag, Callable&& fn, Args&&... args) {std::atomic<uint32_t>* once = base_internal::ControlWord(&flag);uint32_t s = once->load(std::memory_order_acquire);if (ABSL_PREDICT_FALSE(s != base_internal::kOnceDone)) {base_internal::CallOnceImpl(once, base_internal::SCHEDULE_COOPERATIVE_AND_KERNEL,std::forward<Callable>(fn), std::forward<Args>(args)...);}
}
用于做分支预测建议的宏
// Recommendation: Modern CPUs dynamically predict branch execution paths,
// typically with accuracy greater than 97%. As a result, annotating every
// branch in a codebase is likely counterproductive; however, annotating
// specific branches that are both hot and consistently mispredicted is likely
// to yield performance improvements.
#if ABSL_HAVE_BUILTIN(__builtin_expect) || \(defined(__GNUC__) && !defined(__clang__))
#define ABSL_PREDICT_FALSE(x) (__builtin_expect(false || (x), false))
#define ABSL_PREDICT_TRUE(x) (__builtin_expect(false || (x), true))
#else
#define ABSL_PREDICT_FALSE(x) (x)
#define ABSL_PREDICT_TRUE(x) (x)
#endif
解释
(__builtin_expect(false || (x), true))
是一个使用了 GCC 内置函数 __builtin_expect
的表达式。这个内置函数通常用于向编译器提供分支预测信息,以优化代码的执行。
__builtin_expect
函数的语法是:
__builtin_expect(EXPRESSION, EXPECTED_VALUE)
EXPRESSION
是一个表达式,可以是任何布尔表达式。EXPECTED_VALUE
是一个编译器期望表达式EXPRESSION
的结果为真或假的值。通常使用true
或false
。
__builtin_expect
函数告诉编译器表达式 EXPRESSION
的结果很可能是 EXPECTED_VALUE
,以便编译器对代码进行优化。这种优化涉及到对条件分支的预测,使得最有可能的分支能够更快地执行,提高代码的性能。
在你的表达式中,(__builtin_expect(false || (x), true))
使用了 __builtin_expect
函数,期望 (false || (x))
的结果为真。这样的编码风格通常用于告诉编译器,(false || (x))
表达式中的 x
很可能为真,以便编译器在生成机器代码时进行相关的优化。
需要注意的是,__builtin_expect
是 GCC 提供的特定于编译器的内置函数,因此它在其他编译器或开发环境中可能不可用。如果你的代码需要在其他编译器中编译,可能需要进行适当的修改或条件编译。
其他
讲讲这个call_once在面对多线程竞争时的实现原理。
- 第一个进入的线程可以执行
- 后续进入的线程需要等待
有了这个认识,剩下的就是看原子变量的改变过程和等待过程了。
template <typename Callable, typename... Args>
ABSL_ATTRIBUTE_NOINLINE
void CallOnceImpl(std::atomic<uint32_t>* control,base_internal::SchedulingMode scheduling_mode, Callable&& fn,Args&&... args) {static const base_internal::SpinLockWaitTransition trans[] = {{kOnceInit, kOnceRunning, true},{kOnceRunning, kOnceWaiter, false},{kOnceDone, kOnceDone, true}};// Must do this before potentially modifying control word's state.base_internal::SchedulingHelper maybe_disable_scheduling(scheduling_mode);// Short circuit the simplest case to avoid procedure call overhead.// The base_internal::SpinLockWait() call returns either kOnceInit or// kOnceDone. If it returns kOnceDone, it must have loaded the control word// with std::memory_order_acquire and seen a value of kOnceDone.uint32_t old_control = kOnceInit;if (control->compare_exchange_strong(old_control, kOnceRunning,std::memory_order_relaxed) ||base_internal::SpinLockWait(control, ABSL_ARRAYSIZE(trans), trans,scheduling_mode) == kOnceInit) {base_internal::invoke(std::forward<Callable>(fn),std::forward<Args>(args)...);old_control =control->exchange(base_internal::kOnceDone, std::memory_order_release);if (old_control == base_internal::kOnceWaiter) {base_internal::SpinLockWake(control, true);}} // else *control is already kOnceDone
}
// See spinlock_wait.h for spec.
uint32_t SpinLockWait(std::atomic<uint32_t> *w, int n,const SpinLockWaitTransition trans[],base_internal::SchedulingMode scheduling_mode) {int loop = 0;for (;;) {uint32_t v = w->load(std::memory_order_acquire);int i;for (i = 0; i != n && v != trans[i].from; i++) {}if (i == n) {SpinLockDelay(w, v, ++loop, scheduling_mode); // no matching transition} else if (trans[i].to == v || // null transitionw->compare_exchange_strong(v, trans[i].to,std::memory_order_acquire,std::memory_order_relaxed)) {if (trans[i].done) return v;}}
}
这里精彩的地方有两个,一个是多线程进入时候的状态机转换过程,即原子变量遵循的trans
数组。第二个是SpinLockDelay
在多个平台下的实现。
//posix linux
ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalSpinLockDelay)(std::atomic<uint32_t>* /* lock_word */, uint32_t /* value */, int loop,absl::base_internal::SchedulingMode /* mode */) {absl::base_internal::ErrnoSaver errno_saver;if (loop == 0) {} else if (loop == 1) {sched_yield();} else {struct timespec tm;tm.tv_sec = 0;tm.tv_nsec = absl::base_internal::SpinLockSuggestedDelayNS(loop);nanosleep(&tm, nullptr);}
}
//win32
void ABSL_INTERNAL_C_SYMBOL(AbslInternalSpinLockDelay)(std::atomic<uint32_t>* /* lock_word */, uint32_t /* value */, int loop,absl::base_internal::SchedulingMode /* mode */) {if (loop == 0) {} else if (loop == 1) {Sleep(0);} else {// SpinLockSuggestedDelayNS() always returns a positive integer, so this// static_cast is safe.Sleep(static_cast<DWORD>(absl::base_internal::SpinLockSuggestedDelayNS(loop) / 1000000));}
}
//sleep ms consideration
// Return a suggested delay in nanoseconds for iteration number "loop"
int SpinLockSuggestedDelayNS(int loop) {// Weak pseudo-random number generator to get some spread between threads// when many are spinning.uint64_t r = delay_rand.load(std::memory_order_relaxed);r = 0x5deece66dLL * r + 0xb; // numbers from nrand48()delay_rand.store(r, std::memory_order_relaxed);if (loop < 0 || loop > 32) { // limit loop to 0..32loop = 32;}const int kMinDelay = 128 << 10; // 128us// Double delay every 8 iterations, up to 16x (2ms).int delay = kMinDelay << (loop / 8);// Randomize in delay..2*delay range, for resulting 128us..4ms range.return delay | ((delay - 1) & static_cast<int>(r));
}
L1数据预取
abseil里面还定义了三个函数用于数据预取(prefetch)到本地缓存的函数。
数据预取是一种优化技术,通过提前将数据移动到CPU的缓存中,以便在数据被使用之前加速访问。这些函数的作用是将指定地址的数据预取到L1缓存中,以便在读取数据之前移动数据到缓存中。这样,当读取发生时,数据可能已经在缓存中,以提高访问速度。
下面是这些函数的简要说明:
-
void PrefetchToLocalCache(const void* addr)
: 将数据预取到L1缓存中,具有最高程度的时间局部性(temporal locality)。在可能的情况下,数据将预取到所有级别的缓存中。这个函数适用于具有长期重复访问的数据。 -
void PrefetchToLocalCacheNta(const void* addr)
: 与PrefetchToLocalCache
函数相同,但具有非时间局部性(non-temporal locality)。这意味着预取的数据不应该留在任何缓存层级中。这在数据只使用一次或短期使用的情况下很有用,例如对对象调用析构函数。 -
void PrefetchToLocalCacheForWrite(const void* addr)
: 将具有修改意图的数据预取到L1缓存中。这个函数类似于PrefetchToLocalCache
,但会预取带有“修改意图”的缓存行。通常包括在所有其他缓存层级中使该地址的缓存条目无效,并具有独占访问意图。这个函数用于在修改数据之前将数据预取到缓存中。
这些函数需要注意的是,不正确或滥用使用这些函数可能会降低性能。只有在经过充分的基准测试表明有改进时,才应使用这些函数。
ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCache(const void* addr) {_mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_T0);
}ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCacheNta(const void* addr) {_mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_NTA);
}ABSL_ATTRIBUTE_ALWAYS_INLINE inline void PrefetchToLocalCacheForWrite(const void* addr) {
#if defined(_MM_HINT_ET0)_mm_prefetch(reinterpret_cast<const char*>(addr), _MM_HINT_ET0);
#elif !defined(_MSC_VER) && defined(__x86_64__)// _MM_HINT_ET0 is not universally supported. As we commented further// up, PREFETCHW is recognized as a no-op on older Intel processors// and has been present on AMD processors since the K6-2. We have this// disabled for MSVC compilers as this miscompiles on older MSVC compilers.asm("prefetchw (%0)" : : "r"(addr));
#endif
}
编译器静态检查
#if ABSL_HAVE_ATTRIBUTE(guarded_by)
#define ABSL_GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define ABSL_GUARDED_BY(x)
#endif
__attribute__((guarded_by(x)))
是一个GCC/Clang的扩展属性(attribute),用于指定一个互斥量(mutex)或锁(lock)来保护变量的访问。
这个属性的语法如下:
__attribute__((guarded_by(x)))
其中,x
是一个标识符,用于指定用于保护变量访问的互斥量或锁的名称。
该属性的作用是向编译器提供关于变量的额外信息,以帮助进行静态分析和检查多线程代码中的数据竞争问题。通过将 __attribute__((guarded_by(x)))
应用于变量,我们可以指示编译器该变量受特定互斥量的保护,从而在编译时进行检查。
例如,考虑以下示例:
#include <mutex>std::mutex mutex;
int shared_data __attribute__((guarded_by(mutex)));void foo()
{std::lock_guard<std::mutex> lock(mutex);// 访问 shared_datashared_data = 42;
}
在上面的示例中,shared_data
变量被 guarded_by
属性修饰,指示它受 mutex
互斥量的保护。这样,当在没有获取 mutex
互斥量的情况下访问 shared_data
时,编译器会发出警告或错误,以帮助检测潜在的数据竞争问题。
需要注意的是,__attribute__((guarded_by(x)))
是GCC/Clang的扩展属性,不是标准C++的一部分。因此,它在不同编译器之间可能具有不同的行为或不受支持。在使用该属性时,应注意编译器的兼容性和文档。