同步机制
Barrier(屏障同步)
比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。
如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。
static Task[] tasks = new Task[4]; static Barrier barrier = null; static void Main(string[] args) { barrier = new Barrier(tasks.Length, (i) => { Console.WriteLine("**********************************************************"); Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber); Console.WriteLine("**********************************************************"); }); for(int j=0;j<tasks.Length;j++) { tasks[j] = Task.Factory.StartNew((obj) => { var single = Convert.ToInt32(obj); LoadUser(single); barrier.SignalAndWait(); LoadProduct(single); barrier.SignalAndWait(); LoadOrder(single); barrier.SignalAndWait(); },j); } Task.WaitAll(tasks); Console.WriteLine("指定数据库中所有数据已经加载完毕!"); Console.ReadKey(); } private static void LoadUser(int single) { Console.WriteLine("当前任务:{0}正在加载User部分数据!", single); } private static void LoadProduct(int single) { Console.WriteLine("当前任务:{0}正在加载Product部分数据!", single); } private static void LoadOrder(int single) { Console.WriteLine("当前任务:{0}正在加载Order部分数据!", single); }死锁问题
先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过
barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?
private static void LoadUser(int single) { Console.WriteLine("当前任务:{0}正在加载User部分数据!", single); //single=0:表示0号任务 //barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出 // SpinWait.SpinUntil: 自旋锁,相当于死循环 if (single==0) { SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0); } }我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。
超时机制
当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们还要采用CancellationToken机制。
static Task[] tasks = new Task[4]; static Barrier barrier = null; static void Main(string[] args) { CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken ct = cts.Token; barrier = new Barrier(tasks.Length, (i) => { Console.WriteLine("**********************************************************"); Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber); Console.WriteLine("**********************************************************"); }); for(int j=0;j<tasks.Length;j++) { tasks[j] = Task.Factory.StartNew((obj) => { var single = Convert.ToInt32(obj); LoadUser(single); if( !barrier.SignalAndWait(2000)) { //抛出异常,取消后面加载的执行 throw new OperationCanceledException(string.Format("我是当前任务{0},我抛出异常了!", single), ct); } LoadProduct(single); barrier.SignalAndWait(); LoadOrder(single); barrier.SignalAndWait(); },j,ct); } //等待所有tasks 4s Task.WaitAll(tasks,4000); try { for(int i=0;i<tasks.Length;i++) { if(tasks[i].Status==TaskStatus.Faulted) { foreach(var single in tasks[i].Exception.InnerExceptions) { Console.WriteLine(single.Message); } } } barrier.Dispose(); } catch(AggregateException e) { Console.WriteLine("我是总异常:{0}", e.Message); } Console.WriteLine("指定数据库中所有数据已经加载完毕!"); Console.ReadKey(); } private static void LoadUser(int single) { Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", single); //single=0:表示0号任务 //barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出 // SpinWait.SpinUntil: 自旋锁,相当于死循环 if (single==0) { if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000)) return; } Console.WriteLine("当前任务:{0}正在加载User数据完毕!", single); } private static void LoadProduct(int single) { Console.WriteLine("当前任务:{0}正在加载Product部分数据!", single); } private static void LoadOrder(int single) { Console.WriteLine("当前任务:{0}正在加载Order部分数据!", single); }spinLock(自旋锁)
static SpinLock slock = new SpinLock(false); static int sum1 = 0; static int sum2 = 0; static void Main(string[] args) { Task[] task = new Task[100]; for(int i=1;i<=100;i++) { task[i-1] = Task.Factory.StartNew((num) => { Add1((int)num); Add2((int)num); },i); } Task.WaitAll(task); Console.WriteLine("Add1数字总和:{0}", sum1); Console.WriteLine("Add1数字总和:{0}", sum2); Console.ReadKey(); } //无锁 private static void Add1(int num) { Thread.Sleep(100); sum1 += num; } //自旋锁 private static void Add2(int num) { bool lockTaken = false; Thread.Sleep(100); try { slock.Enter(ref lockTaken); sum2 += num; } finally { if(lockTaken) { slock.Exit(false); } } }