线程同步
说明:接上一篇,注意分享线程同步的必要性和线程同步的方法。
测试代码下载:https://github.com/EkeSu/C-Thread-synchronization-C-.git
一、什么是线程同步:
在同一时间只允许一个线程访问资源的情况称为线程同步。
二、为什么需要线程同步:
- 避免竞争条件;
- 确保线程安全;(如果两个线程同时访问一个资源并对那个资源做修改,就不安全了)
现在的计算机变得越来越多核,每一个CPU可以独立工作,但是对于内存和外部资源、数据库的访问却可能因为不同线程的访问使数据产生异常,常见的例子就是银行的转账的例子不再赘述。
三、线程同步的方法:
- 同步代码中重要的部分;
- 使对象不可改变;
- 使用线程安全包装器;
注意:局部变量、方法参数和返回值是放在堆栈中的,本身是线程安全的。
四、线程不安全的演示:
背景:在数据库的user_blance表插入两条数据,两人的balance值都为1000.00,整个user_balance表的balance总值为2000.00
static string connectionStr = "Server=127.0.0.1;Port=3306;Stmt=;Database=exe_dev; User=root;Password=123456";public static void UnSafeThread() {Thread ThreadOne = new Thread(new ThreadStart(DrawMoney));ThreadOne.Name = "A001";Thread ThreadTwo = new Thread(new ThreadStart(DrawMoney));ThreadTwo.Name = "A002";ThreadOne.Start();ThreadTwo.Start();}private static void DoDrawMoney(){Random random = new Random();int money = random.Next(100);string userId = Thread.CurrentThread.Name;string selectSql = "select balance from user_balance where user_id=@UserId";string updateSql = "update user_balance set balance=@Balance+@Money where user_id=@UserId";string updateSql2 = "update user_balance set balance=@Balance-@Money where user_id<>@UserId";using (MySqlConnection conn= MySqlConnectionHelper.OpenConnection(connectionStr)){var balance = conn.ExecuteScalar(selectSql, new { UserId = userId });if (balance != null){conn.Execute(updateSql, new { Money = money, Balance=balance, UserId = userId });conn.Execute(updateSql2, new { Money = money, Balance = balance, UserId = userId });}}}private static void DrawMoney() {for (int i = 0; i < 100; i++){DoDrawMoney();}}
运行结果:
程序中有三条线程在跑:两条支线程,一条主线程,主线程负责统计钱的总数,两条支线程模拟两个人赚钱,赚过来赚过去,哈哈哈,依据查询成果可以看到,钱的总数原本是2000.00,但是之后开始减少。当然上面的异常也可以通过加事务解决,或者改变sql的实现方式balance=balance+money,不过这个不是我们讨论的重点,不展开。
五、线程同步:
1、MethodImplAttribute:同步方法
- 对象:方法。
- 使用方式:放在方法上,作为方法的属性
MethodImplAttribute是一个属性,它用来告诉CLR方法是如何实现的,MethodImplAttribute的一个构造函数把MethodImplOptions的枚举值作为参数,MethodImplOptions的枚举值Synchronized告诉CLR,这个方法该一次性只能在一个线程上执行。 静态方法在类型上锁定,而实例方法在实例上锁定。 只有一个线程可在任意实例函数中执行,且只有一个线程可在任意类的静态函数中执行。
使用方式:在需要同步的方法上添加属性
[MethodImpl(MethodImplOptions.Synchronized)]
[MethodImpl(MethodImplOptions.Synchronized)]private static void DoDrawMoney(){Random random = new Random();int money = random.Next(100);string userId = Thread.CurrentThread.Name;string selectSql = "select balance from user_balance where user_id=@UserId";string updateSql = "update user_balance set balance=@Balance+@Money where user_id=@UserId";string updateSql2 = "update user_balance set balance=balance-@Money where user_id<>@UserId";using (MySqlConnection conn= MySqlConnectionHelper.OpenConnection(connectionStr)){var balance = conn.ExecuteScalar(selectSql, new { UserId = userId });if (balance != null){conn.Execute(updateSql, new { Money = money, Balance=balance, UserId = userId });conn.Execute(updateSql2, new { Money = money, Balance = balance, UserId = userId });}}}
2、SynchronizationAttribute 同步方法----同步上下文:
- 对象:非静态类。
- 使用方式:放在非静态类上,作为非静态类的属性,同时非静态类需要继承【ContextBoundObject】
代码演示(其余部分和不安全的演示代码完全一样):
[Synchronization]class ThreadTestForSynchronization : ContextBoundObject{
上下文是一组属性或使用规则,这组属性或使用规则对执行时的相关对象都是通用的。我的理解是这些组成了程序运行的环境,我们使用【Synchronization】来在上下文中添加规则【ThreadTestForSynchronization】类是需要线程同步的,所以程序在运行的时候就是线程同步的。
注意:当前我使用的环境是VS2017, .Net Framework4.61,C#版本应该是6.0,SynchronizationAttribute属性和更早版本是发生了很大变化,更早版本的构造函数需要填入一个枚举值。
3、使用Monitor同步----同步重要代码块:
- 对象:代码块;
- 使用方式:使用Monitor.Enter(object obj),Monitor.Exit(object obj);这两个是配套使用的,两个方法之间是需要同步的重要代码块,Enter放入的对象和Exit释放的对象应该一致。Enter使对象获得锁,Exit释放锁。
- 注意事项:这两个方法的参数是object,所以不能锁住值类型参数,因为会导致发生装箱,装箱之后的数值是一样的,但是已经不是同一个东西了。
代码演示(下面的例子锁住number会报错,因为发生装箱):
private int number;public void MonitorThread(){Thread ThreadOne = new Thread(new ThreadStart(PrintNumber));ThreadOne.Name = "梁山伯";Thread ThreadTwo = new Thread(new ThreadStart(PrintNumber));ThreadTwo.Name = "祝英台";ThreadOne.Start();ThreadTwo.Start();}private void PrintNumber(){Console.WriteLine(string.Format("Thread {0} enter Method:",Thread.CurrentThread.Name));Monitor.Enter(this);for (int i = 0; i < 10; i++){Console.WriteLine(string.Format("Thread {0} increase number value:{1}", Thread.CurrentThread.Name, number++));}Monitor.Exit(this);Console.WriteLine(string.Format("Thread {0} exit Method:", Thread.CurrentThread.Name));}
输出结果会是很工整的,占篇幅就不贴出来了。
Monitor.Enter()方法是去争取锁,还有另外一个方法是可以去争取锁并且进行等待的,就是TryEnter()方法,该方法可以有一个返回值,返回是否成功获得锁。同时该方法有三个重载:
bool TryEnter(object obj); ---- 有返回值不等待
bool TryEnter(object obj, int millisecondsTimeout); ----- 有返回值且会等待锁
void TryEnter(object obj, TimeSpan timeout, ref bool lockTaken); ----- 等待锁,成功则返回true到lockTaken.
下面看一下被改编的代码:
private void PrintNumber(){Console.WriteLine(string.Format("Thread {0} enter Method:", Thread.CurrentThread.Name));bool isLock = Monitor.TryEnter(this,1000);Console.WriteLine(string.Format("Thread {0} Get Lock {1}", Thread.CurrentThread.Name, isLock));for (int i = 0; i < 10; i++){Thread.Sleep(100);Console.WriteLine(string.Format("Thread {0} increase number value:{1}", Thread.CurrentThread.Name, number++));}if (isLock){Monitor.Exit(this);}Console.WriteLine(string.Format("Thread {0} exit Method:", Thread.CurrentThread.Name));}
演示结果:
Thread 梁山伯 enter Method: Thread 梁山伯 Get Lock True Thread 祝英台 enter Method: Thread 梁山伯 increase number value:0 Thread 梁山伯 increase number value:1 Thread 梁山伯 increase number value:2 Thread 梁山伯 increase number value:3 Thread 梁山伯 increase number value:4 Thread 梁山伯 increase number value:5 Thread 梁山伯 increase number value:6 Thread 梁山伯 increase number value:7 Thread 梁山伯 increase number value:8 Thread 祝英台 Get Lock False Thread 梁山伯 increase number value:9 Thread 梁山伯 exit Method: Thread 祝英台 increase number value:10 Thread 祝英台 increase number value:11 Thread 祝英台 increase number value:12 Thread 祝英台 increase number value:13 Thread 祝英台 increase number value:14 Thread 祝英台 increase number value:15 Thread 祝英台 increase number value:16 Thread 祝英台 increase number value:17 Thread 祝英台 increase number value:18 Thread 祝英台 increase number value:19 Thread 祝英台 exit Method:
分析:线程梁山伯先进入方法PrintNumber,它先获得锁,并且去执行函数,线程祝英台等待的时间为1000毫秒,线程祝英台每次循环都要睡眠100毫秒,循环10次需要睡眠1000毫秒,所以线程祝英台等待1000毫秒是无法获得锁的,那么线程祝英台什么时候执行呢,就是趁着线程梁山伯睡眠的时候执行,所以等待1000毫秒之后,线程祝英台趁梁山伯睡眠的时候执行了。如果我们把祝英台等待锁的时间延长到2000毫秒,那么她就可以等待锁成功。需要注意的是,等待时间是1000毫秒的时候祝英台是没有获得锁的,所以不能执行Monitor.Exit(this)操作,没有获得锁自然无法获得锁,所以需要加一个 if 的判断。
笔者理解:线程梁山伯睡眠的时候祝英台开始执行了,但是并没有获得锁,就进入了被保护的代码块,说明,线程睡眠的时候是会去释放锁,睡眠之后是或重新获得锁的,这里面应该有复杂的机制处理,值得研究。
4、使用Monitor同步重要代码块,并使用Wait,Pulse方法做线程间的交互-----等待和发出脉冲机制:
- 对象:代码块;
- 使用方式:Wait,Pulse在Enter和Exit方法之间调用,Wait方法:当在对象上调用Wait方法时,正在访问被Monitor对象的线程会释放锁并将进入等待状态(包括调用它的线程自己);Pulse方法:发出一个信号通知正在等待的线程可以继续执行了,即等待的线程可以重新竞争获得锁继续执行。
- 个人对获得锁的理解:锁就是权力,有锁的人就有权力执行程序
演示程序:
class ThreadTestForMonitorWait{private int result;private LockData lockData;public ThreadTestForMonitorWait(){this.lockData = new LockData();}public void MonitorWaitThread(){Thread ThreadOne = new Thread(new ThreadStart(WaitFirstThread));ThreadOne.Name = "WaitFirstThread";Thread ThreadTwo = new Thread(new ThreadStart(PulseFirstThread));ThreadTwo.Name = "PulseFirstThread";ThreadOne.Start();ThreadTwo.Start();}private void WaitFirstThread(){Monitor.Enter(lockData);Console.WriteLine(string.Format("Thread {0} enter MonitorWaitThread",Thread.CurrentThread.Name));for (int i = 0; i < 5; i++){Monitor.Wait(lockData);Console.WriteLine(string.Format("Thread {0} increase number value {1}", Thread.CurrentThread.Name, result++));Monitor.Pulse(lockData);}Console.WriteLine(string.Format("Thread {0} exit MonitorWaitThread", Thread.CurrentThread.Name));Monitor.Exit(lockData);}private void PulseFirstThread(){Monitor.Enter(lockData);Console.WriteLine(string.Format("Thread {0} enter MonitorWaitThread", Thread.CurrentThread.Name));for (int i = 0; i < 5; i++){Monitor.Pulse(lockData);Console.WriteLine(string.Format("Thread {0} increase number value {1}", Thread.CurrentThread.Name, result++));Monitor.Wait(lockData);}Console.WriteLine(string.Format("Thread {0} exit MonitorWaitThread", Thread.CurrentThread.Name));Monitor.Exit(lockData);}}public class LockData { }
运行结果:
Thread WaitFirstThread enter MonitorWaitThread Thread PulseFirstThread enter MonitorWaitThread Thread PulseFirstThread increase number value 0 Thread WaitFirstThread increase number value 1 Thread PulseFirstThread increase number value 2 Thread WaitFirstThread increase number value 3 Thread PulseFirstThread increase number value 4 Thread WaitFirstThread increase number value 5 Thread PulseFirstThread increase number value 6 Thread WaitFirstThread increase number value 7 Thread PulseFirstThread increase number value 8 Thread WaitFirstThread increase number value 9 Thread WaitFirstThread exit MonitorWaitThread Thread PulseFirstThread exit MonitorWaitThread
可见:运行结果是很工整的,在上面的程序中,WaitFirstThread 方法先被调用, WaitFirstThread 进入循环会调用Wait方法,这个时候他会失去锁,因此无法继续执行程序,而后方法 PulseFirstThread 被调用,它所在的线程获得锁,不管三七二十一先来发出一个脉冲通知正在被锁定的线程:兄弟你可以继续竞争获得锁了,在一次循环的末尾它又调用了Wait方法,使自己失去锁,其他的线程可以竞争得到锁,所以接着 WaitFirstThread 所在的线程就获得了锁,整个过程就变成两个线程之间锁给来给去,非常恩爱。
问题:其实这个程序是有问题的,就是必须要线程WaitFirstThread先执行,先释放锁,否则若是PulseFirstThread先执行,它先通知其他线程可以竞争锁了,之后执行一次循环把自己的锁释放掉,而线程竞争得到锁之后干的第一件事就是释放锁,这样就大家都没有锁了,死锁就产生了。有兴趣可以试试看。
5.使用lock关键字同步重要代码块 ---- 一块封装了Monitor的语法糖。
直接代码演示(改一下Monitor的PrintNumber方法的写法就行):
private void PrintNumber(){Console.WriteLine(string.Format("Thread {0} enter Method:", Thread.CurrentThread.Name));lock (this){for (int i = 0; i < 10; i++){Console.WriteLine(string.Format("Thread {0} increase number value:{1}", Thread.CurrentThread.Name, number++));}}Console.WriteLine(string.Format("Thread {0} exit Method:", Thread.CurrentThread.Name));}
笔者觉得,比较好用的自然是lock,其实我几乎没有见过用Monitor的,但是一定要学会使用Monitor。
6.使用ReaderWriterLock锁定文件:
我们一定都遇到过要写入一个文件返回该文件已经被其他程序锁定的错误,这个就是无法获得写锁,增加这个锁定可以防止这样的错误发生,使获得文件的写锁更有序。
对象:文件;
使用方式:
AcquireWriterLock(100); 尝试获取文件写锁;
ReleaseWriterLock(); 释放文件写锁。
代码演示(运行时可以把下面绿色的代码注释掉以做比较会比较直观):
private void WriterFileLock(){try{ rwl.AcquireWriterLock(100);using (StreamWriter writer = new StreamWriter("@ReadWriterLokTest.text")){for (int i = 0; i < 1000; i++){writer.WriteLine(i);}}Console.WriteLine("File Writer Finish");}catch (Exception ex){Console.WriteLine(ex.Message);}finally{ rwl.ReleaseWriterLock();}}
7、其他同步方式:
(图片来源:C#线程参考手册-清华大学出版社)
7.1 使用ManualResetEvent 同步
先看代码演示:
static ManualResetEvent manualResetEvent = new ManualResetEvent(false);static Stopwatch stopwatch = new Stopwatch();public static void ManualResetEventThread(){stopwatch.Start();var success = manualResetEvent.WaitOne(1000, false);Console.WriteLine(Thread.CurrentThread.GetHashCode()+"获取信号:" + success + ",时间:"+ stopwatch.Elapsed);manualResetEvent.Set();success = manualResetEvent.WaitOne(1000, false);Console.WriteLine(Thread.CurrentThread.GetHashCode() + "获取信号:" + success + ",时间:" + stopwatch.Elapsed);}
输出结果:
1获取信号:False,时间:00:00:01.0425731 1获取信号:True,时间:00:00:01.0655502
ManualResetEvent 有两个方法:
Set():使状态变成有信号;
ReSet():使状态变成无信号。
其实ManualResetEvent 只是很简单的在不同线程同步了一个信号而已,并不会阻碍线程的向下继续执行,而WaitOne方法可以设置等待线程获取信号的时间,这个方法可以延缓线程的执行(一般不要这么玩)。
在使用上,可以根据WaitOne获取的状态来判断当前线程要不要继续执行。
另外需要介绍一下WaitAll和WaitAny的方法,首先是WaitAll:
我们先定义了三个方法:
private static void PrintOne(){manualResetEvent1.Set();}private static void PrintTwo(){manualResetEvent2.Set();}private static void PrintThree(){manualResetEvent3.Set();}
static ManualResetEvent manualResetEvent1 = new ManualResetEvent(false);
static ManualResetEvent manualResetEvent2 = new ManualResetEvent(false);
static ManualResetEvent manualResetEvent3 = new ManualResetEvent(false);
看WaitAll的测试:
public static void WaitAllTest(){new Thread(new ThreadStart(PrintOne)).Start();new Thread(new ThreadStart(PrintTwo)).Start();new Thread(new ThreadStart(PrintThree)).Start();var isOk = ManualResetEvent.WaitAll(new WaitHandle[] { manualResetEvent1, manualResetEvent2, manualResetEvent3 }, 5000, false);if (isOk){Console.WriteLine("Oh,My God "+isOk);}}
输出结果:Oh,My God True。
WaitAll是WaitHandle的静态方法,它接收WaitHandle数组,当所有的WaitHandle的信号都返回true时,WaitAll才返回True.
而另外一个方法 WaitAny()方法与WaitAll的区别是,WaitAny当数组中一个WaitHandle获得信号的时候,就会返回,返回值为数组中有信号的WaitHandle的索引,当全部没有返回信号时,返回的是System.Threading.WaitHandle.WaitTimeout:
演示代码如下:
var hasSignalIndex = ManualResetEvent.WaitAny(new WaitHandle[] { manualResetEvent1, manualResetEvent2, manualResetEvent3 }, 2000, false);
if (hasSignalIndex == System.Threading.WaitHandle.WaitTimeout)
{
Console.WriteLine("Oh, fail");
}
else
{
Console.WriteLine("Oh,My God " + hasSignalIndex);
}
7.2 使用AutoResetEvent同步 --- 与ManualResetEvent 类似,不做演示。
7.3 使用Mutex线程同步
对象:线程
使用方式:一次只有一个线程能够获得锁,只有获得锁的线程释放了锁其他的线程才能够获得锁。
代码演示:
static Mutex myMutex;public static void MutexThread(){myMutex = new Mutex(true, "myMutex");new Thread(new ThreadStart(PrintNo)).Start();for (int i = 6; i < 10; i++){Console.WriteLine(i);}myMutex.ReleaseMutex();}private static void PrintNo(){myMutex.WaitOne();for (int i = 0; i < 6; i++){Console.WriteLine(i);}}
输出结果:
6 7 8 9 0 1 2 3 4 5
在上面的演示中,主线程先获得锁,接着启动线程,但是线程是没有锁的,所以先执行主线程的打印循环,当主线程释放锁的时候,线程才获得锁,这时候线程才执行。Mutex.WaitOne也有其他的重载方法,可自行探索。
7.4 使用InterLocked同步Int类型变量:
对象:Int类型变量;
使用方式:InterLocked.Increment(ref a)等方法;
代码演示:
private static int a = 0;public static void InterLockThread(){for (int i = 0; i < 100; i++){new Thread(new ThreadStart(IncreaseInt)).Start();}for (int i = 0; i < 100; i++){new Thread(new ThreadStart(IncreaseInt)).Start();}}private static void IncreaseInt(){for (int i = 0; i < 1000; i++){//a++;Interlocked.Increment(ref a);}Console.WriteLine(string.Format("Thread:{0} value: {1}", Thread.CurrentThread.GetHashCode(), a));}
在上面的 IncreaseInt 方法中有 a++ 和 Interlocked.Increment(ref a)两种方式,其中 Interlocked.Increment(ref a) 是原子操作。可能有人会想a++也才一句语句,怎么会不是原子操作,但是实际上我们的程序最后都是编译成了计算机能够认得的计算机指令,一句a++是有很多指令组成的。
结果演示:可以自己试着把a++和 Interlocked.Increment(ref a)切换注释一下运行,结果很明显
7.5 使用ThreadStatic属性为类的静态变量创建副本,使得每一个线程的变量独立
对象:类静态变量;
使用方式:把要设置多个副本的类静态变量标记属性【ThreadStatic】
效果:标记了ThreadStatic的类静态变量各个线程独立不会因为其他线程堆值的改变而共享。
代码演示:
[ThreadStatic]static int x;static int y;public static void StaticAttributeThread(){Task.Run(() => { IncreaseInt(); });Task.Run(() => { IncreaseInt(); });}private static void IncreaseInt(){for (int i = 0; i < 5; i++){Console.WriteLine(string.Format("current thread {0},x={1}, y={2}",Thread.CurrentThread.GetHashCode(),x++,y++));}}
输出结果:
current thread 3,x=0, y=0 current thread 3,x=1, y=2 current thread 3,x=2, y=3 current thread 3,x=3, y=4 current thread 3,x=4, y=5 current thread 4,x=0, y=1 current thread 4,x=1, y=6 current thread 4,x=2, y=7 current thread 4,x=3, y=8 current thread 4,x=4, y=9
可以看到,在不同的线程,y值是共享了修改结果的,而x是没有的。