多线程并发
当一段代码有可能被不止一个线程同时访问时,且存在共享资源(变量、文件句柄等),可能出现并发冲突。发生并发冲突时如果不加锁,程序的行为是不可预测的。而加锁本身又是一件麻烦事,弄不好会出现死锁,死锁时程序卡在那既没有异常也没有日志,找问题都无从下手。
C#中通常会使用 lock 来加锁,这里应严格避免锁静态对象(如 lock(type)),避免 lock(this),这种锁的粒度都太大,容易出现死锁。
多线程程序不加锁的方法
第一,bool 类型是线程安全的,不需要锁
第二,整形, DateTime 类型可以用Interlocked
long lastTicks= DateTime.Now.Ticks;private void multiThreadMethod()
{var l = Interlocked.Read(ref lastTicks);var lastTime = new DateTime(l);//读取上一时刻.//do work...Interlocked.Exchange(ref lastTicks, DateTime.Now.Ticks);//更新上一时刻.
}
第三,多使用ConcurrentQuere, ConcurrentDictionary
一个典型的数采驱动程序
数采驱动通常需要做 接收-处理-转发 三步,为避免阻塞接收,接收后应该入队,由单独的线程完成处理、转发工作。可以设计两个类Driver, MessageHandler,Driver接受数据,MessageHandler入队处理后返回给Driver,再有Driver发送出去。
Driver代码要关键要保持与数据源的链接,断开后能够自动重连,这部分大概率要有bool 变量在表示链接状态,有lastConnect来记录上次连接成功/数据更新的时间,以便在连接断开或数据不动多长时间后自动重连,收到到数据后入队到MessageHandler处理,并把MessageHandler返回的数据发送出去。
下面的MessageHandler代码中有三个关键点,
- 用ConcurrentQuere,出队/入队时避免使用lock;
- Thread的标准写法,即有一个quitFlag 布尔变量来控制线程的退出,因为线程只有在无事可做时才能退出,不要试图从外部让线程退出;
- Close 线程退出后,还要把队列中未处理的部分处理完。
public class MessageHandler
{ConcurrentQueue<string> queue;Thread thread;bool quitFlag = false;const int BATCH_SIZE = 100;//每次连续发送数据的数量.public MessageHandler(){queue = new ConcurrentQueue<string>();}public event Action<List<MyDataType>> OnDataArrive; //有新数据.public void Enqueue(string msg){queue.Enqueue(msg);}public void Open(){if (thread == null){quitFlag = false;thread = new Thread(doWork);thread.Start();}}private void doWork(){while (!quitFlag){int i = 0;List<MyDataType> datas = new List<MyDataType>();while (queue.Count > 0 && i < BATCH_SIZE){if (queue.TryDequeue(out string msg)){doWorkBody(msg, datas);i++;}}if (datas.Count > 0){OnDataArrive?.Invoke(datas);}Thread.Sleep(50);}}private void doWorkBody(string msg, List<MyDataType> datas){try{var dataPackage = JsonConvert.DeserializeObject<DataPackage>(msg);foreach (var dataItem in dataPackage.Data){datas.Add(new MyDataType(){//从MQTT payload中得到数据});}}catch(Exception ex){Logger.Error(ex, $"解析 {msg} 出错", logSource);}}public void Close(){//退出线程quitFlag = true;if (thread != null){thread.Join();thread = null;}List<MyDataType> datas = new List<MyDataType>();while (queue.Count > 0){if (queue.TryDequeue(out string msg)){doWorkBody(msg, datas);}}//处理最后一批数据if (datas.Count > 0){OnDataArrive?.Invoke(datas);}}
}