EditChapter 9 - Parallel LINQ to Objects
EditParallel LINQ to Objects Features
EditListing 9-1 : GeoNames Parsing example - Sequential.
This sample shows how a sequential LINQ query performs when parsing text from a file (a 780 MB file).
/// <summary>
/// Listing 9-1: reads the tab-separated GeoNames file with a sequential LINQ
/// query and prints every entry whose elevation is above 8000, highest first,
/// followed by the elapsed time in milliseconds.
/// </summary>
public void Listing_9_1_GeoNamesSequential_Simple()
{
    const int nameColumn = 1;
    const int countryColumn = 8;
    const int elevationColumn = 15;

    Stopwatch timer = Stopwatch.StartNew();

    // The full data set is read here. The original listing notes that a
    // single region file ("Data/AU.txt") is supplied to minimize the download.
    string dataPath = Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt");
    var rows = File.ReadLines(dataPath);

    var peaks = rows
        .Select(row => row.Split(new char[] { '\t' }))
        .Select(cols => new
        {
            cols,
            // an empty elevation column is treated as 0
            height = string.IsNullOrEmpty(cols[elevationColumn])
                ? 0
                : int.Parse(cols[elevationColumn])
        })
        .Where(r => r.height > 8000) // elevation in m's
        .OrderByDescending(r => r.height)
        .Select(r => new
        {
            name = r.cols[nameColumn] ?? "",
            elevation = r.height,
            country = r.cols[countryColumn]
        });

    foreach (var peak in peaks)
    {
        if (peak == null)
        {
            continue;
        }

        Console.WriteLine("{0} ({1}m) – located in {2}",
            peak.name, peak.elevation, peak.country);
    }

    Console.WriteLine("Elapsed time: {0}ms",
        timer.ElapsedMilliseconds);
}
Console output (Execution time: 45225ms):
[
Hide/Show]
Mount Everest (8850m) – located in NP
K2 (8611m) – located in PK
Kānchenjunga (8586m) – located in NP
Lo–tzu Feng (8516m) – located in NP
Qowowuyag (8201m) – located in CN
Dhaulāgiri (8167m) – located in NP
Manāslu (8156m) – located in NP
Nanga Parbat (8126m) – located in PK
Annapūrna Himāl (8091m) – located in NP
Annapurna 1 (8091m) – located in NP
Gasherbrum Shan (8068m) – located in PK
Broad Feng (8047m) – located in PK
Gasherbrum II Feng (8035m) – located in PK
Xixabangma Feng (8027m) – located in CN
Elapsed time: 45126ms
Top
EditListing 9-2 : GeoNames Parsing example - Parallel.
This sample shows how to use the AsParallel() operator when parsing text from a file (780Mb).
/// <summary>
/// Listing 9-2: the same GeoNames query as Listing 9-1, but with the source
/// converted by AsParallel() so the query runs as a PLINQ query. Prints every
/// entry whose elevation is above 8000, highest first, then the elapsed time.
/// </summary>
public void Listing_9_2_GeoNamesParallel_Simple()
{
    const int nameColumn = 1;
    const int countryColumn = 8;
    const int elevationColumn = 15;

    Stopwatch timer = Stopwatch.StartNew();

    // The full data set is read here. The original listing notes that a
    // single region file ("Data/AU.txt") is supplied to minimize the download.
    string dataPath = Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt");
    var rows = File.ReadLines(dataPath);

    var peaks = rows
        .AsParallel()
        .Select(row => row.Split(new char[] { '\t' }))
        .Select(cols => new
        {
            cols,
            // an empty elevation column is treated as 0
            height = string.IsNullOrEmpty(cols[elevationColumn])
                ? 0
                : int.Parse(cols[elevationColumn])
        })
        .Where(r => r.height > 8000) // elevation in m's
        .OrderByDescending(r => r.height)
        .Select(r => new
        {
            name = r.cols[nameColumn] ?? "",
            elevation = r.height,
            country = r.cols[countryColumn]
        });

    foreach (var peak in peaks)
    {
        if (peak == null)
        {
            continue;
        }

        Console.WriteLine("{0} ({1}m) – located in {2}",
            peak.name, peak.elevation, peak.country);
    }

    Console.WriteLine("Elapsed time: {0}ms",
        timer.ElapsedMilliseconds);
}
Console output (Execution time: 28583ms):
[
Hide/Show]
Mount Everest (8850m) – located in NP
K2 (8611m) – located in PK
Kānchenjunga (8586m) – located in NP
Lo–tzu Feng (8516m) – located in NP
Qowowuyag (8201m) – located in CN
Dhaulāgiri (8167m) – located in NP
Manāslu (8156m) – located in NP
Nanga Parbat (8126m) – located in PK
Annapurna 1 (8091m) – located in NP
Annapūrna Himāl (8091m) – located in NP
Gasherbrum Shan (8068m) – located in PK
Broad Feng (8047m) – located in PK
Gasherbrum II Feng (8035m) – located in PK
Xixabangma Feng (8027m) – located in CN
Elapsed time: 28551ms
Top
EditListing 9 : Simplest Parallel LINQ Query.
This sample shows how to make a LINQ query parallel.
/// <summary>
/// Builds four versions of an identity query over the integers 0..10
/// (sequential, parallel, parallel with a sort, parallel with AsOrdered()),
/// prints the time MeasureTime reports for each, then prints the items
/// each query yields.
/// </summary>
public void Listing_9_SimplestParallel()
{
    // same values as new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }
    var numbers = Enumerable.Range(0, 11);

    // sequential
    var sequential = numbers.Select(n => n);

    // un-ordered parallel
    var unordered = numbers.AsParallel().Select(n => n);

    // ordered parallel using sort
    var sorted = numbers.AsParallel().OrderBy(n => n);

    // ordered parallel using .AsOrdered() extension
    var asOrdered = numbers.AsParallel().AsOrdered().Select(n => n);

    // MeasureTime is defined elsewhere; its second argument is presumably
    // the number of repetitions - confirm against its definition.
    Console.WriteLine("q={0}ms, q1={1}ms, q2={2}ms, q3={3}ms",
        MeasureTime(delegate { sequential.ToArray(); }, 100000),
        MeasureTime(delegate { unordered.ToArray(); }, 100000),
        MeasureTime(delegate { sorted.ToArray(); }, 100000),
        MeasureTime(delegate { asOrdered.ToArray(); }, 100000));

    Console.WriteLine("sequential");
    foreach (var n in sequential)
    {
        Console.Write(n + " ");
    }
    Console.WriteLine("");

    Console.WriteLine("Un–ordered Parallel");
    foreach (var n in unordered)
    {
        Console.Write(n + " ");
    }
    Console.WriteLine();

    Console.WriteLine("Ordered Parallel");
    foreach (var n in sorted)
    {
        Console.Write(n + " ");
    }
    Console.WriteLine();

    Console.WriteLine(".AsOrdered() Parallel");
    foreach (var n in asOrdered)
    {
        Console.Write(n + " ");
    }
}
Console output (Execution time: 17819ms):
[
Hide/Show]
q=103ms, q1=2904ms, q2=6850ms, q3=7736ms
sequential
0 1 2 3 4 5 6 7 8 9 10
Un–ordered Parallel
0 1 2 3 4 5 6 7 8 9 10
Ordered Parallel
0 1 2 3 4 5 6 7 8 9 10
.AsOrdered() Parallel
0 1 2 3 4 5 6 7 8 9 10
Top
EditListing 9-3 : Simple Parallel LINQ Query with real Work.
This sample shows how to make a LINQ query parallel, and undertake lengthy work (10ms thread wait in this case) on each item.
/// <summary>
/// Listing 9-3: runs MySlowFunction over the integers 0..10 in four ways
/// (sequential, parallel, parallel with a sort, parallel with AsOrdered()),
/// prints the time MeasureTime reports for each, then prints the items
/// each query yields.
/// </summary>
public void Listing_9_3_SimpleParallel()
{
    var numbers = new[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

    // Per the original listing, MySlowFunction(i) sleeps the current thread
    // for 10ms and returns 'i'. It is defined elsewhere.

    // sequential
    var sequential = numbers.Select(n => MySlowFunction(n));

    // un-ordered parallel
    var unordered = numbers.AsParallel().Select(n => MySlowFunction(n));

    // ordered parallel using OrderBy
    var sorted = numbers.AsParallel()
        .OrderBy(n => n)
        .Select(n => MySlowFunction(n));

    // ordered parallel using .AsOrdered()
    var asOrdered = numbers.AsParallel()
        .AsOrdered()
        .Select(n => MySlowFunction(n));

    // MeasureTime is defined elsewhere; its second argument is presumably
    // the number of repetitions - confirm against its definition.
    Console.WriteLine("q={0}ms, q1={1}ms, q2={2}ms, q3={3}ms",
        MeasureTime(delegate { sequential.ToArray(); }, 100),
        MeasureTime(delegate { unordered.ToArray(); }, 100),
        MeasureTime(delegate { sorted.ToArray(); }, 100),
        MeasureTime(delegate { asOrdered.ToArray(); }, 100));

    Console.WriteLine("sequential");
    foreach (var n in sequential)
    {
        Console.Write(n + " ");
    }
    Console.WriteLine("");

    Console.WriteLine("Un–ordered Parallel");
    foreach (var n in unordered)
    {
        Console.Write(n + " ");
    }
    Console.WriteLine();

    Console.WriteLine("Ordered Parallel");
    foreach (var n in sorted)
    {
        Console.Write(n + " ");
    }
    Console.WriteLine();

    Console.WriteLine(".AsOrdered() Parallel");
    foreach (var n in asOrdered)
    {
        Console.Write(n + " ");
    }
}
Console output (Execution time: 29660ms):
[
Hide/Show]
q=11088ms, q1=6002ms, q2=6018ms, q3=6194ms
sequential
0 1 2 3 4 5 6 7 8 9 10
Un–ordered Parallel
0 6 1 7 2 8 3 9 4 10 5
Ordered Parallel
0 1 2 3 4 5 6 7 8 9 10
.AsOrdered() Parallel
0 1 2 3 4 5 6 7 8 9 10
Top
EditListing 9-4, 9-5 : AsParallel() and AsSequential().
This sample shows how to use AsParallel() and AsSequential() to control when a query executes in parallel and when it executes sequentially.
/// <summary>
/// Listings 9-4 and 9-5: builds the same "first five entries whose country
/// code starts with A, ordered by the elevation text descending" query three
/// ways - sequential, AsParallel(), and AsParallel() with AsSequential()
/// before Take() - and prints the time MeasureTime reports for each.
/// </summary>
public void Listing_9_4_9_5_AsParallelAndAsSequential()
{
    const int nameColumn = 1;
    const int countryColumn = 8;
    const int elevationColumn = 15;

    // The full data set is read here. The original listing notes that a
    // single region file ("Data/AU.txt") is supplied to minimize the download.
    string dataPath = Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt");

    // this query runs sequentially
    var sequential = File.ReadLines(dataPath)
        .Select(row => row.Split(new char[] { '\t' }))
        .Where(cols => cols[countryColumn].StartsWith("A"))
        .OrderByDescending(cols => cols[elevationColumn] ?? "")
        .Select(cols => new
        {
            name = cols[nameColumn] ?? "",
            elevation = cols[elevationColumn] ?? "",
            country = cols[countryColumn]
        })
        .Take(5);

    // original listing's note: this query runs sequentially because of
    // the .Take() operator
    var parallel = File.ReadLines(dataPath)
        .AsParallel()
        .Select(row => row.Split(new char[] { '\t' }))
        .Where(cols => cols[countryColumn].StartsWith("A"))
        .OrderByDescending(cols => cols[elevationColumn] ?? "")
        .Select(cols => new
        {
            name = cols[nameColumn] ?? "",
            elevation = cols[elevationColumn] ?? "",
            country = cols[countryColumn]
        })
        .Take(5);

    // original listing's note: this query isolates the Take() operator
    // and executes mostly parallel
    var isolatedTake = File.ReadLines(dataPath)
        .AsParallel()
        .Select(row => row.Split(new char[] { '\t' }))
        .Where(cols => cols[countryColumn].StartsWith("A"))
        .OrderByDescending(cols => cols[elevationColumn] ?? "")
        .Select(cols => new
        {
            name = cols[nameColumn] ?? "",
            elevation = cols[elevationColumn] ?? "",
            country = cols[countryColumn]
        })
        .AsSequential()
        .Take(5);

    // MeasureTime is defined elsewhere; its second argument is presumably
    // the number of repetitions - confirm against its definition.
    Console.WriteLine("Sequential: {0}ms, Parallel: {1}ms, with AsSequential: {2}ms",
        MeasureTime(delegate { sequential.ToArray(); }, 1),
        MeasureTime(delegate { parallel.ToArray(); }, 1),
        MeasureTime(delegate { isolatedTake.ToArray(); }, 1));
}
Console output (Execution time: 83342ms):
[
Hide/Show]
Sequential: 29724ms, Parallel: 29535ms, with AsSequential: 24076ms
Top
EditListing 9-7 : Binary Operator AsParallel Rules
This sample shows the rules for binary operators when using the AsParallel operator.
/// <summary>
/// Listing 9-7: concatenates two int arrays as parallel queries, with and
/// without AsOrdered() on each side, and prints the items of each result.
/// Finishes by materializing a query forced into parallel execution mode.
/// </summary>
public void Listing_9_7_AsParallelBinaryOperatorRules()
{
    int[] first = { 1, 2, 3, 4, 5 };
    int[] second = { 6, 7, 8, 9, 10 };

    // Not allowed: first.AsParallel().Concat(second)
    // The original listing records the resulting obsolete-overload message:
    // "The second data source of a binary operator must be of
    // type System.Linq.ParallelQuery<T> rather than
    // System.Collections.Generic.IEnumerable<T>.
    // To fix this problem, use the AsParallel() extension method
    // to convert the right data source to System.Linq.ParallelQuery<T>."

    // Both sides parallel - these work. Force ordering where appropriate.
    var neitherOrdered = first.AsParallel()
        .Concat(second.AsParallel());
    var leftOrdered = first.AsParallel().AsOrdered()
        .Concat(second.AsParallel());
    var bothOrdered = first.AsParallel().AsOrdered()
        .Concat(second.AsParallel().AsOrdered());

    string[] items1 = neitherOrdered.Select(i => i.ToString()).ToArray();
    Console.WriteLine("q1={0}", String.Join(" ", items1));

    string[] items2 = leftOrdered.Select(i => i.ToString()).ToArray();
    Console.WriteLine("q2={0}", String.Join(" ", items2));

    string[] items3 = bothOrdered.Select(i => i.ToString()).ToArray();
    Console.WriteLine("q3={0}", String.Join(" ", items3));

    // run a query with parallel execution forced; the result is discarded
    first.AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .ToList();
}
Console output (Execution time: 93ms):
[
Hide/Show]
q1=1 2 3 4 5 6 7 8 9 10
q2=1 2 3 4 5 6 7 8 9 10
q3=1 2 3 4 5 6 7 8 9 10
Top
EditListing 9 : GeoNames join sample.
This sample shows how to use AsParallel() when parsing text from a file with a join. It is not covered in the book, but is an extension showing joins combined with parallel queries.
/// <summary>
/// Parses the GeoNames file in parallel, keeps entries whose elevation is
/// above 6000, looks up a description for each entry's feature class/code in
/// the feature-codes file, and prints the results highest first, followed
/// by the elapsed time in milliseconds.
/// </summary>
public void Listing_9_x_GeoNames_Join()
{
    const int nameColumn = 1;
    const int featureClassColumn = 6;
    const int featureCodeColumn = 7;
    const int countryColumn = 8;
    const int elevationIndexColumn = 15;

    Stopwatch timer = Stopwatch.StartNew();

    // Load the feature class/code decoding data into an array. The first
    // column supplies the class (its first character) and the code (the text
    // after the first two characters); the second column is the description.
    string codePath = Path.Combine(
        Environment.CurrentDirectory, "Data/FeatureCodes.txt");
    var lookup = File.ReadLines(codePath)
        .AsParallel()
        .Select(entry => entry.Split(new char[] { '\t' }))
        .Select(c => new
        {
            featureClass = c[0][0],
            featureCode = c[0].Remove(0, 2),
            featureDescription = c[1]
        })
        .ToArray();

    // The full data set is read here. The original listing notes that a
    // single region file ("Data/AU.txt") is supplied to minimize the download.
    string dataPath = Path.Combine(
        Environment.CurrentDirectory, "Data/AllCountries.txt");

    var results = File.ReadLines(dataPath)
        .AsParallel()
        .Select(row => row.Split(new char[] { '\t' }))
        .Select(cols => new
        {
            cols,
            // an empty elevation column is treated as 0
            height = string.IsNullOrEmpty(cols[elevationIndexColumn])
                ? 0
                : int.Parse(cols[elevationIndexColumn])
        })
        .Where(r => r.height > 6000) // elevation in m's
        .Select(r => new
        {
            r.cols,
            r.height,
            // null when no lookup entry matches both code and class
            match = lookup.SingleOrDefault(
                c => c.featureCode == r.cols[featureCodeColumn] &&
                     c.featureClass == r.cols[featureClassColumn][0])
        })
        .OrderByDescending(r => r.height)
        .Select(r => new
        {
            name = r.cols[nameColumn] ?? "",
            elevation = r.height,
            country = r.cols[countryColumn],
            description = r.match != null ? r.match.featureDescription : ""
        });

    foreach (var place in results)
    {
        if (place == null)
        {
            continue;
        }

        Console.WriteLine("{0} ({1}m) – A {2} in {3}",
            place.name,
            place.elevation,
            place.description,
            place.country);
    }

    Console.WriteLine();
    Console.WriteLine("Elapsed time: {0}ms", timer.ElapsedMilliseconds);
}
Console output (Execution time: 22869ms):
[
Hide/Show]
Mount Everest (8850m) – A mountain in NP
K2 (8611m) – A mountain in PK
Kānchenjunga (8586m) – A mountain in NP
Lo–tzu Feng (8516m) – A peak in NP
Qowowuyag (8201m) – A mountain in CN
Dhaulāgiri (8167m) – A peak in NP
Manāslu (8156m) – A mountain in NP
Nanga Parbat (8126m) – A peak in PK
Annapurna 1 (8091m) – A mountain in NP
Annapūrna Himāl (8091m) – A area in NP
Gasherbrum Shan (8068m) – A mountain in PK
Broad Feng (8047m) – A mountain in PK
Gasherbrum II Feng (8035m) – A mountain in PK
Xixabangma Feng (8027m) – A mountain in CN
Gasherbrum III Feng (7952m) – A mountain in PK
Annapūrna II (7937m) – A mountain in NP
Gasherbrum IV (7925m) – A mountain in PK
Tirich Mīr (7708m) – A peak in PK
Fang (7647m) – A peak in NP
Chogolisa (7628m) – A peak in PK
Annapurna III (7555m) – A peak in NP
Annapurna IV (7525m) – A peak in NP
Pik Imeni Ismail Samani (7495m) – A peak in TJ
Nowshāk (7482m) – A mountain in AF
Gangapurna (7455m) – A peak in NP
Kangsar Kang (7454m) – A peak in NP
Baltoro Kangri (7312m) – A peak in PK
Annapurna South (7219m) – A peak in NP
Tarke Kang (7193m) – A peak in NP
Nyainqêntanglha Feng (7162m) – A mountain in CN
Tilicho Peak (7134m) – A peak in NP
Gauri Sankar (7134m) – A mountain in NP
Nilgiri North (7061m) – A peak in NP
Māchhāpuchhare (6993m) – A mountain in NP
Lamjung Himal (6983m) – A mountain in NP
Muztag Feng (6973m) – A mountain in CN
Nilgiri (6940m) – A mountain in NP
Tukuche Peak (6920m) – A mountain in NP
Angelus (6855m) – A peak in PK
Xiaofong Tip (6845m) – A peak in CN
Nilgiri South (6839m) – A peak in NP
Nevado Huascarán (6768m) – A mountain in PE
Cerro Pariamachay (6759m) – A mountain in PE
Kangrinboqê Feng (6714m) – A mountain in CN
Nevado Yerupajá (6634m) – A mountain in PE
Chulu (6584m) – A mountain in NP
Būni Zom (6551m) – A mountain in PK
yak Gawa (6484m) – A mountain in NP
Thorungste (6482m) – A mountain in NP
Sinu Chuli (6461m) – A peak in NP
Hiunchuli (6441m) – A peak in NP
Kūh–e Tūlūksā (6435m) – A mountain in AF
Pīr Yekhhā–ye Darreh Sakhī (6414m) – A valley in AF
Nevado Coropuna (6377m) – A mountain in PE
Tozê Kangri (6370m) – A mountain in CN
Taweche (6367m) – A mountain in NP
Kālejandar (6325m) – A mountains in PK
Nevado Ampato (6288m) – A mountain in PE
Kūh–e Bandaka (6271m) – A mountain in AF
Marble Peak (6256m) – A peak in PK
Cristal (6252m) – A peak in PK
Ql Sorkhī (6171m) – A mountain in AF
South Peak (6145m) – A mountain in US
Mount McKinley (6145m) – A mountain in US
Churchill Peaks (6145m) – A mountain in US
Kūh–e Mēnjān (6130m) – A mountain in AF
Canharba Chuli (6128m) – A peak in NP
Nevado Solimana (6093m) – A mountain in PE
Pisang Peak (6091m) – A mountain in NP
Carter Horn (6059m) – A cape in US
Nevado Chachani (6057m) – A mountain in PE
Farthing Horn (6040m) – A cape in US
Nevado Hualca Hualca (6025m) – A mountain in PE
Mitre (6013m) – A peak in PK
Dhampus Peak (6012m) – A mountain in NP
Elapsed time: 22865ms
Top
EditTable 9-2 : Sequential Variance Test
This sample shows the various optimizations for a Variance statistical operator.
/// <summary>
/// Table 9-2: prints the time MeasureTime reports for the three sequential
/// variance implementations (Variance1, Variance2, Variance), first over an
/// int array and then over a lazily generated range of the same 100000 values.
/// </summary>
public void Listing_9_Table_9_2_SequentialVariance()
{
    var asArray = Enumerable.Range(1, 100000).ToArray();
    var asEnumerable = Enumerable.Range(1, 100000);

    // MeasureTime is defined elsewhere; its second argument is presumably
    // the number of repetitions - confirm against its definition.
    Console.WriteLine("Variance over array of int's");
    var arrayTime1 = MeasureTime(delegate { asArray.Variance1(); }, 1000);
    var arrayTime2 = MeasureTime(delegate { asArray.Variance2(); }, 1000);
    var arrayTime3 = MeasureTime(delegate { asArray.Variance(); }, 1000);
    Console.WriteLine("Variance1: {0}ms, Variance2: {1}ms, Variance: {2}ms",
        arrayTime1, arrayTime2, arrayTime3);

    Console.WriteLine("Variance over IEnumerable of int's");
    var enumTime1 = MeasureTime(delegate { asEnumerable.Variance1(); }, 1000);
    var enumTime2 = MeasureTime(delegate { asEnumerable.Variance2(); }, 1000);
    var enumTime3 = MeasureTime(delegate { asEnumerable.Variance(); }, 1000);
    Console.WriteLine("Variance1: {0}ms, Variance2: {1}ms, Variance: {2}ms",
        enumTime1, enumTime2, enumTime3);
}
/// <summary>
/// Variance and standard-deviation extension methods over sequences of int,
/// in sequential (IEnumerable) and parallel (ParallelQuery) forms.
/// </summary>
public static class StatisticExtensions
{
    /* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
       n = 0, sum = 0, sum_sqr = 0
       for x in data:
           n = n + 1
           sum = sum + x
           sum_sqr = sum_sqr + x*x
       mean = sum/n
       variance = (sum_sqr - sum*mean)/(n - 1)
    */

    // sequential variance calculation
    // Listing 9-8
    /// <summary>
    /// Sample variance, traditional form: computes the mean, sums the squared
    /// deviations using Math.Pow, then divides by (count - 1).
    /// </summary>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate,
    /// Count). A single-element source divides 0.0 by 0 and so returns NaN.
    /// </remarks>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of the values.</returns>
    public static double Variance1(
        this IEnumerable<int> source)
    {
        // traditional aggregate
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) =>
                subtotal + Math.Pow((item - mean), 2),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-9
    /// <summary>
    /// Sample variance, optimization 1: as <c>Variance1</c> but squares each
    /// deviation with a multiplication instead of Math.Pow.
    /// </summary>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate,
    /// Count). A single-element source divides 0.0 by 0 and so returns NaN.
    /// </remarks>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of the values.</returns>
    public static double Variance2(
        this IEnumerable<int> source)
    {
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) => subtotal +
                ((double)item - mean) * ((double)item - mean),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-10
    /// <summary>
    /// Sample variance, optimization 2: a single pass that accumulates the
    /// count, the sum and the sum of squares, so no separate mean or count
    /// calculation is needed.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when the source has zero or one element.
    /// </returns>
    /// <exception cref="ArgumentNullException">source is null.</exception>
    public static double Variance(
        this IEnumerable<int> source)
    {
        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(
            // seed - [0] = count, [1] = sum, [2] = sum of squares
            new double[3] { 0.0, 0.0, 0.0 },
            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++;       // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },
            // result selector function
            // (finesses the final sum into the variance)
            // mean = sum / count
            // variance = (sum_sqr - sum * mean) / (n - 1)
            // Sources with zero or one element return a value of 0
            result => result[0] > 1 ?
                (result[2] - (result[1] * (result[1] / result[0])))
                    / (result[0] - 1) : 0.0
        );
    }

    // parallel Variance aggregate extension method
    // based on the optimized sequential algorithm
    // Listing 9-12
    /// <summary>
    /// Parallel sample variance: accumulates count, sum and sum of squares
    /// into separate accumulator arrays, combines them, then derives the
    /// variance from the combined totals.
    /// </summary>
    /// <param name="source">The parallel query of values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when the source has zero or one element.
    /// </returns>
    /// <exception cref="ArgumentNullException">source is null.</exception>
    public static double Variance(
        this ParallelQuery<int> source)
    {
        /* based upon the blog posting by Igor Ostrovsky at -
         * http://blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx
         * which demonstrates how to use the factory functions in an
         * Aggregate function for efficiency
         */

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(
            // seed - a fresh [count, sum, sum of squares] array from the
            // factory function each time it is invoked
            () => new double[3] { 0.0, 0.0, 0.0 },
            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++;       // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },
            // combine function,
            // run on completion of each "thread"
            (total, thisThread) =>
            {
                total[0] += thisThread[0];
                total[1] += thisThread[1];
                total[2] += thisThread[2];
                return total;
            },
            // result selector function
            // finesses the final sum into the variance
            // mean = sum / count
            // variance = (sum_sqr - sum * mean) / (n - 1)
            // Sources with zero or one element return a value of 0
            (result) => (result[0] > 1) ?
                (result[2] - (result[1] * (result[1] / result[0])))
                    / (result[0] - 1) : 0.0
        );
    }

    // Listing 9-11
    /// <summary>
    /// Standard deviation: the square root of the sequential Variance.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The square root of the sample variance.</returns>
    public static double StandardDeviation(
        this IEnumerable<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>
    /// Standard deviation of the int values that
    /// <paramref name="selector"/> projects from each element.
    /// </summary>
    /// <param name="source">The elements to project.</param>
    /// <param name="selector">Maps an element to the int to measure.</param>
    /// <returns>The standard deviation of the projected values.</returns>
    public static double StandardDeviation<T>(
        this IEnumerable<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            Enumerable.Select(source, selector));
    }

    // Listing 9-13
    /// <summary>
    /// Standard deviation: the square root of the parallel Variance.
    /// </summary>
    /// <param name="source">The parallel query of values to measure.</param>
    /// <returns>The square root of the sample variance.</returns>
    public static double StandardDeviation(
        this ParallelQuery<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>
    /// Standard deviation of the int values that
    /// <paramref name="selector"/> projects from each element of a
    /// parallel query.
    /// </summary>
    /// <param name="source">The parallel query of elements to project.</param>
    /// <param name="selector">Maps an element to the int to measure.</param>
    /// <returns>The standard deviation of the projected values.</returns>
    public static double StandardDeviation<T>(
        this ParallelQuery<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            ParallelEnumerable.Select(source, selector));
    }
}
Console output (Execution time: 49278ms):
[
Hide/Show]
Variance over array of int's
Variance1: 16097ms, Variance2: 3748ms, Variance: 2893ms
Variance over IEnumerable of int's
Variance1: 17795ms, Variance2: 5573ms, Variance: 3165ms
Top
EditTable 9-3 : Parallel Standard Deviation Test
This sample shows the parallel StandardDeviation statistical operator
/// <summary>
/// Table 9-3: prints the sequential and parallel StandardDeviation results
/// for the values 1..100000, then the time MeasureTime reports for each,
/// first over array-backed sources and then over lazily generated ranges.
/// </summary>
public void Listing_9_Table_9_3_Parallel_StandardDeviation()
{
    // array-backed sources
    var arraySequential = Enumerable.Range(1, 100000).ToArray();
    var arrayParallel = Enumerable.Range(1, 100000).ToArray().AsParallel();

    // range-backed sources; the parallel one is forced into parallel mode
    var rangeSequential = Enumerable.Range(1, 100000);
    var rangeParallel = Enumerable.Range(1, 100000)
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism);

    // print both results side by side
    double sequentialResult = arraySequential.StandardDeviation();
    double parallelResult = arrayParallel.StandardDeviation();
    Console.WriteLine("{0}, {1}", sequentialResult, parallelResult);

    // MeasureTime is defined elsewhere; its second argument is presumably
    // the number of repetitions - confirm against its definition.
    Console.WriteLine("Standard Deviation over array of int's");
    var arraySeqTime = MeasureTime(delegate { arraySequential.StandardDeviation(); }, 1000);
    var arrayParTime = MeasureTime(delegate { arrayParallel.StandardDeviation(); }, 1000);
    Console.WriteLine("sequential: {0}ms, Parallel: {1}ms",
        arraySeqTime, arrayParTime);

    Console.WriteLine("Standard Deviation over IEnumerable of int's");
    var rangeSeqTime = MeasureTime(delegate { rangeSequential.StandardDeviation(); }, 1000);
    var rangeParTime = MeasureTime(delegate { rangeParallel.StandardDeviation(); }, 1000);
    Console.WriteLine("sequential: {0}ms, Parallel: {1}ms",
        rangeSeqTime, rangeParTime);
}
/// <summary>
/// Variance and standard-deviation extension methods over sequences of int,
/// in sequential (IEnumerable) and parallel (ParallelQuery) forms.
/// </summary>
public static class StatisticExtensions
{
    /* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
       n = 0, sum = 0, sum_sqr = 0
       for x in data:
           n = n + 1
           sum = sum + x
           sum_sqr = sum_sqr + x*x
       mean = sum/n
       variance = (sum_sqr - sum*mean)/(n - 1)
    */

    // sequential variance calculation
    // Listing 9-8
    /// <summary>
    /// Sample variance, traditional form: computes the mean, sums the squared
    /// deviations using Math.Pow, then divides by (count - 1).
    /// </summary>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate,
    /// Count). A single-element source divides 0.0 by 0 and so returns NaN.
    /// </remarks>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of the values.</returns>
    public static double Variance1(
        this IEnumerable<int> source)
    {
        // traditional aggregate
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) =>
                subtotal + Math.Pow((item - mean), 2),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-9
    /// <summary>
    /// Sample variance, optimization 1: as <c>Variance1</c> but squares each
    /// deviation with a multiplication instead of Math.Pow.
    /// </summary>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate,
    /// Count). A single-element source divides 0.0 by 0 and so returns NaN.
    /// </remarks>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of the values.</returns>
    public static double Variance2(
        this IEnumerable<int> source)
    {
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) => subtotal +
                ((double)item - mean) * ((double)item - mean),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-10
    /// <summary>
    /// Sample variance, optimization 2: a single pass that accumulates the
    /// count, the sum and the sum of squares, so no separate mean or count
    /// calculation is needed.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when the source has zero or one element.
    /// </returns>
    /// <exception cref="ArgumentNullException">source is null.</exception>
    public static double Variance(
        this IEnumerable<int> source)
    {
        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(
            // seed - [0] = count, [1] = sum, [2] = sum of squares
            new double[3] { 0.0, 0.0, 0.0 },
            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++;       // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },
            // result selector function
            // (finesses the final sum into the variance)
            // mean = sum / count
            // variance = (sum_sqr - sum * mean) / (n - 1)
            // Sources with zero or one element return a value of 0
            result => result[0] > 1 ?
                (result[2] - (result[1] * (result[1] / result[0])))
                    / (result[0] - 1) : 0.0
        );
    }

    // parallel Variance aggregate extension method
    // based on the optimized sequential algorithm
    // Listing 9-12
    /// <summary>
    /// Parallel sample variance: accumulates count, sum and sum of squares
    /// into separate accumulator arrays, combines them, then derives the
    /// variance from the combined totals.
    /// </summary>
    /// <param name="source">The parallel query of values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when the source has zero or one element.
    /// </returns>
    /// <exception cref="ArgumentNullException">source is null.</exception>
    public static double Variance(
        this ParallelQuery<int> source)
    {
        /* based upon the blog posting by Igor Ostrovsky at -
         * http://blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx
         * which demonstrates how to use the factory functions in an
         * Aggregate function for efficiency
         */

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(
            // seed - a fresh [count, sum, sum of squares] array from the
            // factory function each time it is invoked
            () => new double[3] { 0.0, 0.0, 0.0 },
            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++;       // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },
            // combine function,
            // run on completion of each "thread"
            (total, thisThread) =>
            {
                total[0] += thisThread[0];
                total[1] += thisThread[1];
                total[2] += thisThread[2];
                return total;
            },
            // result selector function
            // finesses the final sum into the variance
            // mean = sum / count
            // variance = (sum_sqr - sum * mean) / (n - 1)
            // Sources with zero or one element return a value of 0
            (result) => (result[0] > 1) ?
                (result[2] - (result[1] * (result[1] / result[0])))
                    / (result[0] - 1) : 0.0
        );
    }

    // Listing 9-11
    /// <summary>
    /// Standard deviation: the square root of the sequential Variance.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The square root of the sample variance.</returns>
    public static double StandardDeviation(
        this IEnumerable<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>
    /// Standard deviation of the int values that
    /// <paramref name="selector"/> projects from each element.
    /// </summary>
    /// <param name="source">The elements to project.</param>
    /// <param name="selector">Maps an element to the int to measure.</param>
    /// <returns>The standard deviation of the projected values.</returns>
    public static double StandardDeviation<T>(
        this IEnumerable<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            Enumerable.Select(source, selector));
    }

    // Listing 9-13
    /// <summary>
    /// Standard deviation: the square root of the parallel Variance.
    /// </summary>
    /// <param name="source">The parallel query of values to measure.</param>
    /// <returns>The square root of the sample variance.</returns>
    public static double StandardDeviation(
        this ParallelQuery<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>
    /// Standard deviation of the int values that
    /// <paramref name="selector"/> projects from each element of a
    /// parallel query.
    /// </summary>
    /// <param name="source">The parallel query of elements to project.</param>
    /// <param name="selector">Maps an element to the int to measure.</param>
    /// <returns>The standard deviation of the projected values.</returns>
    public static double StandardDeviation<T>(
        this ParallelQuery<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            ParallelEnumerable.Select(source, selector));
    }
}
Console output (Execution time: 12354ms):
[
Hide/Show]
28867.6577966877, 28867.6577966877
Standard Deviation over array of int's
sequential: 3381ms, Parallel: 1780ms
Standard Deviation over IEnumerable of int's
sequential: 3303ms, Parallel: 3848ms
Top
EditListing 9 : Standard Deviation Test for Error Handling
This sample shows the Error Handling code for the StandardDeviation statistical operator
/// <summary>
/// Exercises the StandardDeviation operators with a null source, an empty
/// source and a single-element source, printing the exception message or
/// the result for the sequential and parallel variants of each case.
/// </summary>
public void Listing_9_Hardening_StandardDeviation()
{
    // case 1: source is null - report the message of whatever is thrown
    int[] missing = null;

    try
    {
        missing.StandardDeviation();
    }
    catch (Exception error)
    {
        Console.WriteLine("Sequential SD Null Source Error: {0}",
            error.Message);
    }

    try
    {
        missing.AsParallel().StandardDeviation();
    }
    catch (Exception error)
    {
        Console.WriteLine("Parallel SD Null Source Error: {0}",
            error.Message);
    }

    // case 2: source is empty
    var none = new int[0];
    double emptySequential = none.StandardDeviation();
    double emptyParallel = none.AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .StandardDeviation();
    Console.WriteLine("Sequential empty source SD = {0}, Parallel empty source SD = {1}",
        emptySequential,
        emptyParallel);

    // case 3: source is a single element. Should return 0
    var single = new[] { 5 };
    double oneSequential = single.StandardDeviation();
    double oneParallel = single.AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .StandardDeviation();
    Console.WriteLine("Sequential one element SD = {0}, Parallel one element SD = {1}",
        oneSequential,
        oneParallel);
}
/// <summary>
/// Variance and standard-deviation extension methods over sequences of int,
/// in sequential (IEnumerable) and parallel (ParallelQuery) forms.
/// </summary>
public static class StatisticExtensions
{
    /* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
       n = 0, sum = 0, sum_sqr = 0
       for x in data:
           n = n + 1
           sum = sum + x
           sum_sqr = sum_sqr + x*x
       mean = sum/n
       variance = (sum_sqr - sum*mean)/(n - 1)
    */

    // sequential variance calculation
    // Listing 9-8
    /// <summary>
    /// Sample variance, traditional form: computes the mean, sums the squared
    /// deviations using Math.Pow, then divides by (count - 1).
    /// </summary>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate,
    /// Count). A single-element source divides 0.0 by 0 and so returns NaN.
    /// </remarks>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of the values.</returns>
    public static double Variance1(
        this IEnumerable<int> source)
    {
        // traditional aggregate
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) =>
                subtotal + Math.Pow((item - mean), 2),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-9
    /// <summary>
    /// Sample variance, optimization 1: as <c>Variance1</c> but squares each
    /// deviation with a multiplication instead of Math.Pow.
    /// </summary>
    /// <remarks>
    /// Enumerates <paramref name="source"/> three times (Average, Aggregate,
    /// Count). A single-element source divides 0.0 by 0 and so returns NaN.
    /// </remarks>
    /// <param name="source">The values to measure.</param>
    /// <returns>The sample variance of the values.</returns>
    public static double Variance2(
        this IEnumerable<int> source)
    {
        double mean = source.Average();
        return source.Aggregate(
            (double)0.0,
            (subtotal, item) => subtotal +
                ((double)item - mean) * ((double)item - mean),
            (totalSum) => totalSum / (source.Count() - 1)
        );
    }

    // Listing 9-10
    /// <summary>
    /// Sample variance, optimization 2: a single pass that accumulates the
    /// count, the sum and the sum of squares, so no separate mean or count
    /// calculation is needed.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when the source has zero or one element.
    /// </returns>
    /// <exception cref="ArgumentNullException">source is null.</exception>
    public static double Variance(
        this IEnumerable<int> source)
    {
        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(
            // seed - [0] = count, [1] = sum, [2] = sum of squares
            new double[3] { 0.0, 0.0, 0.0 },
            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++;       // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },
            // result selector function
            // (finesses the final sum into the variance)
            // mean = sum / count
            // variance = (sum_sqr - sum * mean) / (n - 1)
            // Sources with zero or one element return a value of 0
            result => result[0] > 1 ?
                (result[2] - (result[1] * (result[1] / result[0])))
                    / (result[0] - 1) : 0.0
        );
    }

    // parallel Variance aggregate extension method
    // based on the optimized sequential algorithm
    // Listing 9-12
    /// <summary>
    /// Parallel sample variance: accumulates count, sum and sum of squares
    /// into separate accumulator arrays, combines them, then derives the
    /// variance from the combined totals.
    /// </summary>
    /// <param name="source">The parallel query of values to measure.</param>
    /// <returns>
    /// The sample variance, or 0 when the source has zero or one element.
    /// </returns>
    /// <exception cref="ArgumentNullException">source is null.</exception>
    public static double Variance(
        this ParallelQuery<int> source)
    {
        /* based upon the blog posting by Igor Ostrovsky at -
         * http://blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx
         * which demonstrates how to use the factory functions in an
         * Aggregate function for efficiency
         */

        // check for invalid source conditions
        if (source == null)
            throw new ArgumentNullException("source");

        return source.Aggregate(
            // seed - a fresh [count, sum, sum of squares] array from the
            // factory function each time it is invoked
            () => new double[3] { 0.0, 0.0, 0.0 },
            // item aggregation function, run for each element
            (subtotal, item) =>
            {
                subtotal[0]++;       // count
                subtotal[1] += item; // sum
                // sum of squares
                subtotal[2] += (double)item * (double)item;
                return subtotal;
            },
            // combine function,
            // run on completion of each "thread"
            (total, thisThread) =>
            {
                total[0] += thisThread[0];
                total[1] += thisThread[1];
                total[2] += thisThread[2];
                return total;
            },
            // result selector function
            // finesses the final sum into the variance
            // mean = sum / count
            // variance = (sum_sqr - sum * mean) / (n - 1)
            // Sources with zero or one element return a value of 0
            (result) => (result[0] > 1) ?
                (result[2] - (result[1] * (result[1] / result[0])))
                    / (result[0] - 1) : 0.0
        );
    }

    // Listing 9-11
    /// <summary>
    /// Standard deviation: the square root of the sequential Variance.
    /// </summary>
    /// <param name="source">The values to measure.</param>
    /// <returns>The square root of the sample variance.</returns>
    public static double StandardDeviation(
        this IEnumerable<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>
    /// Standard deviation of the int values that
    /// <paramref name="selector"/> projects from each element.
    /// </summary>
    /// <param name="source">The elements to project.</param>
    /// <param name="selector">Maps an element to the int to measure.</param>
    /// <returns>The standard deviation of the projected values.</returns>
    public static double StandardDeviation<T>(
        this IEnumerable<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            Enumerable.Select(source, selector));
    }

    // Listing 9-13
    /// <summary>
    /// Standard deviation: the square root of the parallel Variance.
    /// </summary>
    /// <param name="source">The parallel query of values to measure.</param>
    /// <returns>The square root of the sample variance.</returns>
    public static double StandardDeviation(
        this ParallelQuery<int> source)
    {
        return Math.Sqrt(source.Variance());
    }

    /// <summary>
    /// Standard deviation of the int values that
    /// <paramref name="selector"/> projects from each element of a
    /// parallel query.
    /// </summary>
    /// <param name="source">The parallel query of elements to project.</param>
    /// <param name="selector">Maps an element to the int to measure.</param>
    /// <returns>The standard deviation of the projected values.</returns>
    public static double StandardDeviation<T>(
        this ParallelQuery<T> source,
        Func<T, int> selector)
    {
        return StandardDeviation(
            ParallelEnumerable.Select(source, selector));
    }
}
Console output (Execution time: 21ms):
[
Hide/Show]
Sequential SD Null Source Error: Value cannot be null.
Parameter name: source
Parallel SD Null Source Error: Value cannot be null.
Parameter name: source
Sequential empty source SD = 0, Parallel empty source SD = 0
Sequential one element SD = 0, Parallel one element SD = 0
Top
EditListing 9 : AsParallel examples.
This sample shows how to use the AsParallel() operator.
/// <summary>
/// Squares the values 1..5 in a parallel query and writes each result,
/// followed by a space, to the console via ForAll.
/// </summary>
public void Listing_9_AsParallel()
{
    int[] a = { 1, 2, 3, 4, 5 };

    // The query has no ordering operator and ForAll consumes it directly,
    // so the order in which the squares are written is not fixed.
    (from i in a.AsParallel()
     select i * i)
        .ForAll(i => Console.Write(i + " "));
}
Console output (Execution time: 0ms):
[
Hide/Show]
Top