上一次主要讨论了在.NET 4中如何编写并行程序,这次继续上次的话题。

当我们有能力使用前面所介绍的一些结构来构建我们的应用程序时,一个需要考虑的场景是:假如一个并行过程已经开始,在它没有完成前想取消它的话应该怎么做呢?其实这个问题很现实,在多线程程序中也会遇到,当然了,多线程编程时我们可以用Thread.Abort()来终结它,那么在并行中该如何实现呢?老规矩,上Demo:

CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task task1 = Task.Factory.StartNew(() => {
    int i = 0;
    while (!token.IsCancellationRequested) {
        Thread.Sleep(500);
        Console.WriteLine("Task1,{0}", i++);
    }
}, token);
token.Register(() => {
    Console.WriteLine("Task1 has been canceled");
});

Console.ReadLine();
tokenSource.Cancel();

首先实例化一个CancellationTokenSource对象,这个对象用于管理并行过程中的取消动作。当创建一个Task的时候,传入一个CancellationToken对象,而这个对象可以由CancellationTokenSource.Token属性来获得。当Task开开始执行时,如果想取消这个Task,那么就可以调用CancellationTokenSource.Cancel()来取消与之相关的Task了。

其实,我们可以将CancellationTokenSource理解为一个犯罪团伙的头目,然后这个头目管理着CancellationToken小兵,然后当一个Task创建时将这个小兵安插在其中(无间道?),当头目发指令Cancel()时,小兵在Task内部上演无间道,嗯……

然而,出来混,迟早要还的。Task能不能无异常的执行完毕还是个未知数。在.NET 4中,现在可以用AggregateException来处理这些异常了,它提供一种异常汇总机制,可以对未知的异常进行处理,上一个Demo:

Task task1 = Task.Factory.StartNew(() => {
    Console.WriteLine("Task1 completed.");
});
Task task2 = Task.Factory.StartNew(() => {
    Console.WriteLine("Task2 processing.");
    throw new ArgumentOutOfRangeException("task2");
});

try {
    Task.WaitAll(task1, task2);
}
catch (AggregateException aggex) {
    aggex.Handle(ex => {
        Console.WriteLine(ex);
        return true;
    });
}

可以将具体的异常处理写在AggregateException.Handle方法内,Handle方法传入一个Func委托,用于处理具体的异常,例如上面,我们只是在Task内简单的抛出了一个ArgumentOutOfRangeException异常,当调用Task.WaitAll时异常被捕获,然后在aggex.Handle中具体处理。当然,可以将上述中传入的ex进行is判断来确定具体的异常,例如:if (ex is ArgumentOutOfRangeException) {…}。

聊完异常处理,接着聊访问安全。

访问安全的问题其实很显然,在多线程环境下要注意的问题无疑都是要在并行环境下考虑。这里,其实可以用线程安全来讲,因为不管是多线程还是并行,CPU调度的最小单位都是线程,所以,以下都用线程安全来说明。

要求线程安全,首先想到的肯定就是上锁。过去的一些手段就不说了,我们着重说一下.NET 4中的SpinLock互斥锁。

SpinLock spinLock = new SpinLock();
bool locked = false;
try {
    spinLock.Enter(ref locked);
    // Do something.
}
finally {
    if (locked) {
        spinLock.Exit();
    }
}

其实不用解释也看的懂了。线程安全操作多用于对资源的访问控制上,例如IO,Stream等,而我们往往需要面对的其实是内存中的数据,例如集合,例如队列等。.NET 4为了并行,引入了一个新的命名空间:System.Collections.Concurrent;在这个空间之下有很多经过处理的基础结构,例如ConcurrentDictionary、ConcurrentQueue等,它们都是线程安全的,或者说是并行安全的,而且在使用方法上和见的System.Collections.Generic下的结构无异。接下来演示一个BlockingCollection结构的使用Demo:

BlockingCollection<string> blockCollection = new BlockingCollection<string>();
ThreadPool.QueueUserWorkItem(o => {
    for (int i = 0; i < 200; i++) {
        blockCollection.Add("String" + i);
        Thread.Sleep(1000);
    }
});
ThreadPool.QueueUserWorkItem(o => {
    foreach (var i in blockCollection.GetConsumingEnumerable()) {
        Console.WriteLine("Read:{0}", i);
    }
});

我们用的是线程池的做法,当然,在并行编程中的做法也是一样的(有做法么?好像没有耶……)。可以看的出,Concurrent相关的结构可以大大简化并行编程中需要考虑的线程安全问题。

最后要讨论的是.NET 4中很NB的一个东西:PLINQ。并行并行,就是要多CPU协作同时执行,其实有理由相信多CPU可以提高查询的效率(没有说一定是DB查询,不涉及IO性能),尤其是在内存中集合的查询,PLINQ就是Parallel化的LINQ,使我们的查询可以在多个CPU上同时执行,藉此提高查询效率。

要想在一个自定义的集合中实现LINQ功能,常用的做法就是实现IEnumerable,这样就可以使用LINQ的查询语法来实现类似SQL的“漂漂”代码,在.NET 4中要想实现一个能并行查询(PLINQ)的自定义集合,可以实现IParallelEnumerable,IParallelEnumerable继承于IEnumerable,实现起来其实也不困难。

怎样去用PLINQ呢?上一个Demo看看:

var dataSet = new string[] { "data1", "data2", "data3", "data4" };
var results = from d in dataSet.AsParallel<string>()
                let result = d.ToUpper()
                select result;
foreach (var r in results) {
    Console.WriteLine(r);
}

只是简单的.AsParallel即可,很好很强大。我们可以使用.AsOrdered来实现在排序前进行缓存。

var queryByOrder = from d in dataSet.AsParallel<string>().AsOrdered<string>()
                    orderby d descending
                    let result = d.ToUpper()
                    select result;

这次我们换个方面来将queryByOrder输出:

queryByOrder.ForAll<string>(q => {
    Console.WriteLine(q);
});

看来扩展方法这块糖的确很好吃……

其实在普通的开发中,.AsParallel一下就OK了,我们来猜一下上面代码的结果是怎么样的?答案是不一定有顺序(即使在第二个示例中排序过),详情请参见上篇中的解释。

通过两篇文章,我们讨论了一些.NET 4中并于并行开发的基础,在实际的开发中,是否选择并行仍然是一个有待商榷的问题,我们往往关心的是,并行究竟能为开发带来多大的复杂度,能为效率带来多大的提升,下一次我将对并行的效率进行讨论,欢迎大家一起加入。