スポンサーサイト

上記の広告は1ヶ月以上更新のないブログに表示されています。
新しい記事を書く事で広告が消せます。

F#による並行プログラミング入門(14) キャンセル(1) Parallel Loop,PLINQ

 今回は並行処理のキャンセルの紹介です。 
 
これにはCancellationTokenSourceクラスのインスタンスを使用します。まずは一つインスタンス化して代的プロパティを調べてみます。(CancellationTokenSourceとはいわばキャンセルの狼煙を挙げる火元です。) 
 
> let taskCTS0 = new CancellationTokenSource();; 
val taskCTS0 : CancellationTokenSource 
 
> taskCTS0;; 
val it : CancellationTokenSource = 
  System.Threading.CancellationTokenSource 
    {IsCancellationRequested = false; 
     Token = System.Threading.CancellationToken;} 
 
さらにTokenプロパティを調べてみます。 
 
> taskCTS0.Token;; 
val it : CancellationToken = 
  System.Threading.CancellationToken 
    {CanBeCanceled = true; 
     IsCancellationRequested = false; 
     WaitHandle = System.Threading.ManualResetEvent;} 
 
さてここでキャンセルの狼煙を上げてみます。 
 
> taskCTS0.Cancel();; 
val it : unit = () 
 
もう一度Tokenプロパティを調べてみます。 
> taskCTS0.Token;; 
val it : CancellationToken = 
  System.Threading.CancellationToken 
    {CanBeCanceled = true; 
     IsCancellationRequested = true; 
     WaitHandle = System.Threading.ManualResetEvent;} 
      
IsCancellationRequested プロパティがtrueに変化してます。 
 
ということで、並行処理のキャンセルの一つの方法は、並行処理側に「CancellationTokenSourceクラスのインスタンスのTokenプロパティで得られるCancellationToken型の値A」を渡して、「並行処理内ではAのIsCancellationRequestedプロパティを定期的に調べながら、もしこれがtrueの時は適切な処理を施してから処理を終了する」という仕組みで行う方法です。 
もう一つは「IsCancellationRequestedプロパティを調べて、もしfalseならそのまま処理を続行し、trueならば,例外を発生させるというCancellationToken型の値のインスタンスメソッドThrowIfCancellationRequested()」を定期的に実行するという方法です。(なおIsCancellationRequestedプロパティを定期的に調べるのにもコストがかかりますから、タイミング等は考える必要があります。) 
ここでは2番目の方法でやっていきます。 
 
 
ではParallel.Loop等順番にキャンセル例を挙げていきます。 
 
(1)Parallel.Loop 
 
Parallel.ForやParallel.ForEachやParallel.Invokeのキャンセルはオーバーロードされたうち次の形のようなParallelOptionsが引数の一部にあるものを利用します。 
 
public static ParallelLoopResult For( 
    int fromInclusive, 
    int toExclusive, 
    ParallelOptions parallelOptions, 
    Action<int> body 
 
public static ParallelLoopResult ForEach<TSource>( 
    IEnumerable<TSource> source, 
    ParallelOptions parallelOptions, 
    Action<TSource> body 
 
public static void Invoke( 
    ParallelOptions parallelOptions, 
    params Action[] actions 
 
ParallelOptions型の値はCancellationTokenプロパティをもつので、これに自分で生成したCancellationTokenSourceクラスのインスタンスのTokenプロパティをセットします。 
 
例 
> let taskCTS0 = new CancellationTokenSource() 
let options = new ParallelOptions() 
options.CancellationToken <- taskCTS0.Token 
//上の2行は次のようにしてもよい let options = new ParallelOptions(CancellationToken = taskCTS0.Token) 
 
//100ms後にキャンセルを発動 
Task.Factory.StartNew(fun () -> Thread.Sleep(100);taskCTS0.Cancel()) |> ignore 
Parallel.For(0,20,options,fun i -> printfn "%dの仕事を開始" i 
                                   Thread.Sleep(50) 
                                   options.CancellationToken.ThrowIfCancellationRequested()) |>ignore;; 
5の仕事を開始 
10の仕事を開始 
15の仕事を開始 
0の仕事を開始 
16の仕事を開始 
6の仕事を開始 
1の仕事を開始 
11の仕事を開始 
System.OperationCanceledException: 操作はキャンセルされました。 
   場所 System.Threading.Tasks.Parallel.ThrowIfReducableToSingleOCE(IEnumerable`1 excCollection, CancellationToken ct) 
   場所 System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally) 
   場所 System.Threading.Tasks.Parallel.For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body) 
   場所 <StartupCode$FSI_0006>.$FSI_0006.main@() 
エラーのため停止しました 
ということで例外が発生してキャンセル成功です。(例外のさばき方は後日やります。) 
 
(2)PLINQ 
 
PLINQではWithCancellationを利用します。 
例 
 
 > open System 
open System.Threading 
open System.Threading.Tasks 
open System.Linq;; 
 
> let taskCTS0 = new CancellationTokenSource() 
//100ms後にキャンセルを発動 
Task.Factory.StartNew(fun () -> Thread.Sleep(100);taskCTS0.Cancel()) |> ignore 
[1 .. 19].AsParallel() 
         .WithCancellation(taskCTS0.Token) 
         .ForAll(fun i -> printfn "%dの仕事を開始" i 
                          Thread.Sleep(50) 
                          taskCTS0.Token.ThrowIfCancellationRequested() 
                 );; 
3の仕事を開始 
1の仕事を開始 
4の仕事を開始 
2の仕事を開始 
5の仕事を開始 
6の仕事を開始 
7の仕事を開始 
8の仕事を開始 
System.OperationCanceledException: WithCancellation に対して指定されたトークンを介してクエリが取り消されました。 
   場所 System.Linq.Parallel.CancellationState.ThrowWithStandardMessageIfCanceled(CancellationToken externalCancellationToken) 
   場所 System.Linq.Parallel.QueryTaskGroupState.QueryEnd(Boolean userInitiatedDispose) 
   場所 System.Linq.Parallel.SpoolingTask.SpoolForAll[TInputOutput,TIgnoreKey](QueryTaskGroupState groupState, PartitionedStream`2 partitions, TaskScheduler taskScheduler) 
   場所 System.Linq.Parallel.DefaultMergeHelper`2.System.Linq.Parallel.IMergeHelper<TInputOutput>.Execute() 
   場所 System.Linq.Parallel.MergeExecutor`1.Execute[TKey](PartitionedStream`2 partitions, Boolean ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, Boolean isOrdered, CancellationState cancellationState, Int32 queryId) 
   場所 System.Linq.Parallel.PartitionedStreamMerger`1.Receive[TKey](PartitionedStream`2 partitionedStream) 
   場所 System.Linq.Parallel.ForAllOperator`1.WrapPartitionedStream[TKey](PartitionedStream`2 inputStream, IPartitionedStreamRecipient`1 recipient, Boolean preferStriping, QuerySettings settings) 
   場所 System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.ChildResultsRecipient.Receive[TKey](PartitionedStream`2 inputStream) 
   場所 System.Linq.Parallel.ScanQueryOperator`1.ScanEnumerableQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient) 
   場所 System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient) 
   場所 System.Linq.Parallel.QueryOperator`1.GetOpenedEnumerator(Nullable`1 mergeOptions, Boolean suppressOrder, Boolean forEffect, QuerySettings querySettings) 
   場所 System.Linq.Parallel.ForAllOperator`1.RunSynchronously() 
   場所 System.Linq.ParallelEnumerable.ForAll[TSource](ParallelQuery`1 source, Action`1 action) 
   場所 <StartupCode$FSI_0003>.$FSI_0003.main@() 
エラーのため停止しました

続きを読む

スポンサーサイト

テーマ : プログラミング
ジャンル : コンピュータ

F#による並行プログラミング入門(13) Parallel Tasks(2) Parallel.Invoke

 前回は「newとstartとwait」もしくは「Task.Factory.StartNewとwait」でタスクを生成開始して、終了を待ちましたが、今回はそれをもっと手軽にできるParallel.Invokeを紹介します。 
 
まずはシグネチャーから。 
 
static member Invoke :  
        actions:Action[] -> unit  
 
では例です。 
 
> open System 
open System.Threading 
open System.Threading.Tasks;; 
 
> let longTaskSubL r =  
    let res = ref 0L  
    for i in 1L .. 10000L do  
      for j in 1L .. 1000L do  
         res := !res + r  
    printfn "引数%Aに対しlongTaskSubの計算が終了" r 
    !res  
 
let doWorkL i = 
    longTaskSubL i |> ignore 
    printfn "引数%Aに対しManagedThreadId = %dで仕事を終わらせました"  i Thread.CurrentThread.ManagedThreadId 
;; 
 
val longTaskSubL : int64 -> int64 
val doWorkL : int64 -> unit 
 
ここでParallel.Invokeを使ってみますが、次のように書くと型推論がきかずにエラーが出てしまします。 
 
Parallel.Invoke [|(fun () -> doWorkL 3L);(fun () -> doWorkL 1L);(fun () -> doWorkL 2L)|] 
 
そこで、ワンクッションおいてAction型にキャストします。 
 
> [|(fun () -> doWorkL 3L);(fun () -> doWorkL 1L);(fun () -> doWorkL 2L)|] 
|> Array.map (fun f -> Action(f)) 
|> Parallel.Invoke 
;; 
引数引数引数3Lに対しlongTaskSubの計算が終了 
1Lに対しlongTaskSubの計算が終了 
2Lに対しlongTaskSubの計算が終了 
引数引数引数1Lに対しManagedThreadId = 5で仕事を終わらせました 
3Lに対しManagedThreadId = 6で仕事を終わらせました 
2Lに対しManagedThreadId = 4で仕事を終わらせました 
val it : unit = () 
 
上は次のようにも書き直せます。 
 
[|3L;1L;2L|] 
|> Array.map (fun i -> Action(fun () -> doWorkL i)) 
|> Parallel.Invoke 
 
なおParallel.InvokeにはParallel.Forなどと同様にParallelOptionsを引数の一部にするオーバーロードも準備されています。 
 
public static void Invoke( 
    ParallelOptions parallelOptions, 
    params Action[] actions 
 
利用法はParallel.Forなどと同様なので割愛します。 
 
留意点は、これを使うと「すべての仕事の処理終了を待つという」という点です。 

続きを読む

テーマ : プログラミング
ジャンル : コンピュータ

F#による並行プログラミング入門(12) Parallel Tasks(1) Taskの生成実行,WaitAll,WaitAny

 前回までは、lEnumerable型の値に一つの関数を適用させていく作業をparallelで行う例でしたが、今回からは複数種類の関数実行をparallelに行う例の紹介です。 
 
これを行うにはSystem.Threading.Tasksクラスのインスタンスをそれぞれの実行する関数を材料として生成し、並行処理として走らせます。 
 
> open System 
open System.Threading 
open System.Threading.Tasks;; 
> let longTaskSubL r =  
    let res = ref 0L  
    for i in 1L .. 100000L do  
      for j in 1L .. 1000L do  
         res := !res + r  
    printfn "引数%Aに対しlongTaskSubの計算が終了" r 
    !res  
 
let doWorkL i = 
    longTaskSubL i |> ignore 
    printfn "引数%Aに対しManagedThreadId = %dで仕事を終わらせました"  i Thread.CurrentThread.ManagedThreadId;; 
 
val longTaskSubL : int64 -> int64 
val doWorkL : int64 -> unit 
 
ではTaskクラスのインスタンスを一つ作ってみます。 
 
> let task0 = new Task(fun () ->(doWorkL 2L));; 
val task0 : Task 
 
代表的なプロパティを見てみます。 
 
> task0;; 
val it : Task = System.Threading.Tasks.Task {AsyncState = null; 
                                             CreationOptions = None; 
                                             Exception = null; 
                                             Id = 1; 
                                             IsCanceled = false; 
                                             IsCompleted = false; 
                                             IsFaulted = false; 
                                             Status = Created;} 
                                              
 
開始するにはスタートメソッドを使います。 
 
> task0.Start() 
printfn "Task0 が終了しました";; 
 
Task0 が終了しました 
> 引数2Lに対しlongTaskSubの計算が終了 
引数2Lに対しManagedThreadId = 6で仕事を終わらせました 
 
少し不思議な状況になりました。「Task0 が終了しました」の表示の後に実際の仕事が終わっています。 
これは「taskを並行処理で開始した場合、その処理の終了を待たずに、メインスレッド側が処理を進めていっていること」が原因です。メインスレッド側で並行処理の終了を待って処理を進める場合は、待つ場所で、タスクのWaitメソッドを使います。(他にもいろいろ方法がありますが、それは後日順次紹介していきます。) 
ではさっきの部分をすこし修正します。 
 
> let task0 = new Task(fun () ->(doWorkL 2L)) 
task0.Start() 
task0.Wait() 
printfn "Task0 が終了しました";; 
 
引数2Lに対しlongTaskSubの計算が終了 
引数2Lに対しManagedThreadId = 5で仕事を終わらせました 
Task0 が終了しました 
 
次は2つのタスクを生成しスタートしてから、メインスレッド側で終了を待って、処理を同期したいと思います。2つ以上のタスクの終了を待つのはTask.WaitAllメソッドを利用します。引数はTask[] 型です。 
 
> let task0 = new Task(fun () ->(doWorkL 2L)) 
let task1 = new Task(fun () ->(doWorkL 1L)) 
task0.Start() 
task1.Start() 
Task.WaitAll [|task0;task1|] 
printfn "すべてのTask が終了しました";; 
 
引数1Lに対しlongTaskSubの計算が終了 
引数1Lに対しManagedThreadId = 4で仕事を終わらせました 
引数2Lに対しlongTaskSubの計算が終了 
引数2Lに対しManagedThreadId = 3で仕事を終わらせました 
すべてのTask が終了しました 
 
どれか少なくとも一つのタスクが終了するのを待つには、WaitAnyメソッドを使用します。 
引数はtask[]で、返り値は(最初に)終了したタスクのインデックスです。 
 
例 
 
> let task0 = new Task(fun () ->(doWorkL 3L)) 
let task1 = new Task(fun () ->(doWorkL 1L)) 
let task2 = new Task(fun () ->(doWorkL 2L)) 
 
let tasks = [|task0;task1;task2|] 
tasks |> Array.iter (fun t -> t.Start())  
let index = Task.WaitAny tasks 
printfn "タスク配列のindex:%dのタスクが終了しました。" index 
Task.WaitAll tasks 
printfn "すべてのTask が終了しました";; 
 
 
引数1Lに対しlongTaskSubの計算が終了 
引数1Lに対しManagedThreadId = 6で仕事を終わらせました 
タスク配列のindex:1のタスクが終了しました。 
引数3Lに対しlongTaskSubの計算が終了 
引数3Lに対しManagedThreadId = 4で仕事を終わらせました 
引数2Lに対しlongTaskSubの計算が終了 
引数2Lに対しManagedThreadId = 5で仕事を終わらせました 
すべてのTask が終了しました 
 
val task0 : Task 
val task1 : Task 
val task2 : Task 
val tasks : Task [] = 
  [|System.Threading.Tasks.Task; System.Threading.Tasks.Task; 
    System.Threading.Tasks.Task|] 
val index : int = 1 
 
なおタスクの生成と開始はTask.Factory.StartNewメソッドを利用するとワンステップでおこなうことができます。 
なお返り値は生成され開始されたTaskです。 
 
> let task00 = Task.Factory.StartNew(fun () -> doWorkL 1L) 
task00.Wait();; 
引数1Lに対しlongTaskSubの計算が終了 
引数1Lに対しManagedThreadId = 11で仕事を終わらせました 
 
val task00 : Task 
 
これを用いると先ほどの例は次のように書き直すことができます。 
 
> let tasks = 
    [|(fun () -> doWorkL 3L);(fun () -> doWorkL 1L);(fun () -> doWorkL 2L)|] 
    |> Array.map (fun f -> Task.Factory.StartNew f) 
let index = Task.WaitAny tasks 
printfn "タスク配列のindex:%dのタスクが終了しました。" index 
Task.WaitAll tasks 
printfn "すべてのTask が終了しました" 
;; 
 
引数2Lに対しlongTaskSubの計算が終了 
引数2Lに対しManagedThreadId = 14で仕事を終わらせました 
タスク配列のindex:2のタスクが終了しました。 
引数1Lに対しlongTaskSubの計算が終了 
引数1Lに対しManagedThreadId = 13で仕事を終わらせました 
引数3Lに対しlongTaskSubの計算が終了 
引数3Lに対しManagedThreadId = 12で仕事を終わらせました 
すべてのTask が終了しました 
 
val tasks : Task [] = 
  [|System.Threading.Tasks.Task; System.Threading.Tasks.Task; 
    System.Threading.Tasks.Task|] 
val index : int = 2 

続きを読む

テーマ : プログラミング
ジャンル : コンピュータ

F#による並行プログラミング入門(11) PLINQ ForAll

 Parallel.ForやParallel.ForEachに対応するPLINQのメソッドはForAllです。今回はこのメソッドの紹介からです。 
 
シグネチャーは次のようになります。 
 
static member ForAll :  
        source:ParallelQuery<'TSource> *  
        action:Action<'TSource> -> unit  
 
いつも通りの時間だけかかる無意味関数を定義しておきます。 
 
> let longTaskSub r =  
    let res = ref 0  
    for i in 1 .. 100000 do  
      for j in 1 .. 1000*r do  
         res := !res + 1  
    printfn "引数%3dに対しlongTaskSubの計算が終了" r 
    !res  
 
let doWork i = 
    longTaskSub i |> ignore;; 
 
val longTaskSub : int -> int 
val doWork : int -> unit 
 
 
PLINQを用いて1から100までの数に対してdoWorkを呼び出してみます。 
 
> open System.Collections.Concurrent 
open System.Linq;; 
 
> System.Linq.ParallelEnumerable.ForAll((seq{1 .. 100}).AsParallel() ,(fun i -> doWork i)) 
;; 
引数  2に対しlongTaskSubの計算が終了 
引数  4に対しlongTaskSubの計算が終了 
引数  3に対しlongTaskSubの計算が終了 
引数  1に対しlongTaskSubの計算が終了 
引数  6に対しlongTaskSubの計算が終了 
引数  5に対しlongTaskSubの計算が終了 
以下略 
 
インスタンスメソッドとしても呼び出せます。 
 
> (seq{1 .. 100}).AsParallel().ForAll(fun i -> doWork i);; 
引数  3に対しlongTaskSubの計算が終了 
引数  1に対しlongTaskSubの計算が終了 
引数  4に対しlongTaskSubの計算が終了 
引数  2に対しlongTaskSubの計算が終了 
以下略 
 
PLINQでは通常並列化するかどうかはシステム任せなのですが、強制的に並列実行するように指定することもできます。これにはWithExcutionModeメソッドを用います。シグネチャーは次の通りです。 
 
static member WithExecutionMode :  
        source:ParallelQuery<'TSource> *  
        executionMode:ParallelExecutionMode -> ParallelQuery<'TSource>  
         
 
引数のタプル内のSystem.Linq.ParallelExecutionModeはクエリの実行モードで、強制的に並列実行するには、ここにParallelExcutionMode.ForceParallelismを渡します。 
 
では強制並列実行してみます。 
 
> ParallelEnumerable.ForAll([1 .. 100].AsParallel().WithExecutionMode(System.Linq.ParallelExecutionMode.ForceParallelism) ,fun i -> doWork i) 
;; 
実行結果略 
 
もしくは(ふつうはこっちを使いますが) 
 
> [1 .. 100].AsParallel().WithExecutionMode(System.Linq.ParallelExecutionMode.ForceParallelism).ForAll(fun i -> doWork i);; 
 
実行結果略 
 
PLINQではでは固定された並列化数で仕事をこなします。デフォルトではコア数ですがWithDegreeOfParallelismメソッドを利用するとこのスレッド数を指定できます。シグネチャーは次の通りです。 
 
static member WithDegreeOfParallelism :  
        source:ParallelQuery<'TSource> *  
        degreeOfParallelism:int -> ParallelQuery<'TSource>  
 
では並列化数を2にして実行してみます。 
 
> [1 .. 100].AsParallel().WithDegreeOfParallelism(2).ForAll(fun i -> doWork i);; 
 
ここで留意事項ですが、WithExecutionModeメソッドにしても、WithDegreeOfParallelismにしても返り値はParallelQuery<_>型なので、ドットでつないでいくことができます。 
 
例 
 
> [1 .. 100].AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).WithDegreeOfParallelism(2).ForAll(fun i -> doWork i) 
;; 
実行結果略

テーマ : プログラミング
ジャンル : コンピュータ

F#による並行プログラミング入門(10) PLINQ

 まずはLINQの復習を少し。 
IEnuenrable<'a>(seq<'a>)型の値を引数とする多数のメソッドがSystem.Linq.Enumerable名前空間に定義されていて、例えば 
 
> System.Linq.Enumerable.Select([1;2;3],(fun x -> 2*x));; 
val it : seq<int> = seq [2; 4; 6] 
 
とできるのでした。 
 
また引数のタプルの第一成分であるIEnuenrable<'a>(seq<'a>)型の値のインスタンスメソッドのようにも使えるような仕組みになっていて、 
 
> [1;2;3].Select(fun x -> 2+x);; 
val it : seq<int> = seq [2; 4; 6] 
 
というようにも使えるのでした。 
 
また、 
> open System;; 
> let select f s = Enumerable.Select(s,new Func<_,_>(f));; 
 
val select : ('a -> 'b) -> seq<'a> -> seq<'b> 
 
としておけば 
 
> [1;2;3] 
|> select (fun x -> 2*x);; 
val it : seq<int> = seq [2; 4; 6] 
 
とF#的に書くこともできるのでした。 
 
このようにLINQの素材となるのはIEnumenrable<'a>(seq<'a>)型の値だったのですが、 
IEnuenrable<'a>(seq<'a>)型の値を素材としてそれに対してparallelに処理を加えることができるのがPLINQです。(Parallel LINQ)これはSystem.Linq.ParallelEnumerable名前空間に定義されています。 
例えばこちら側のSelectは 
 
static member Select :  
        source:ParallelQuery<'TSource> *  
        selector:Func<'TSource, 'TResult> -> ParallelQuery<'TResult>  
 
引数のタプルの第一成分がParallelQuery<'TSource>型となっていますが、これはIEnumenrable<'TSource>型の値vにたいしてv.AsParallel()とすることで得ることができます。 
IEnumerable型を並行処理の素材用に変換するわけです。 
よって先ほどの例のPLINQ版は次の様に得ることができます。 
 
> let t = System.Linq.ParallelEnumerable.Select( [1;2;3].AsParallel() ,(fun x -> 2*x));; 
val t : ParallelQuery<int> 
 
> t;; 
val it : ParallelQuery<int> = seq [6; 2; 4] //並行処理なので順番は保障されない。 
 
結果はParallelQuery<_>型なのですがこれはlEnumerableインターフェイスを実装しているので、普通のIEnumerble型としても扱えます。 
 
> Seq.iter (fun x ->printfn "%d" x) t;; 
val it : unit = () 
 
また、これはさらに並行処理の素材としても使えます。 
> System.Linq.ParallelEnumerable.Select( t ,(fun x -> 2*x));; 
val it : ParallelQuery<int> = seq [4; 8; 12] 
 
 
さてLINQでは 
 System.Linq.Enumerable.Select([1;2;3],(fun x -> 2*x)) 
 を 
 [1;2;3].Select(fun x -> 2+x);; 
というように書くことができるのでしたが、PLINQでも同様の仕組みで 
System.Linq.ParallelEnumerable.Select( [1;2;3].AsParallel() ,(fun x -> 2*x)) 
を 
[1;2;3].AsParallel().Select(fun x -> 2*x) 
と書くことができます。 
 
> [1;2;3].AsParallel().Select(fun x -> 2*x);; 
val it : ParallelQuery<int> = seq [2; 4; 6] 
 
lEnumerable型の値の次に.AsParallel()を付けるとLINQのメソッドでなくPLINQのメソッドが呼び出せれるという仕組みになっています。 
 
最後にLINQで 
> let select f s = Enumerable.Select(s,new Func<_,_>(f));; 
val select : ('a -> 'b) -> seq<'a> -> seq<'b> 
 
としておいて 
 
> [1;2;3] 
|> select (fun x -> 2*x);; 
val it : seq<int> = seq [2; 4; 6] 
 
とF#的に書くことのPLINQ版ですが、 
 
> open System.Collections.Generic 
let p_select f (s:IEnumerable<'a>) = ParallelEnumerable.Select(s.AsParallel(),new Func<_,_>(f));; 
 
val p_select : 
  ('a -> 'a0) -> Collections.Generic.IEnumerable<'a> -> ParallelQuery<'a0> 
 
としておけば、 
 
> [1;2;3] 
|> p_select (fun x -> 2*x);; 
val it : ParallelQuery<int> = seq [2; 6; 4] 
 
とできます。 
 
(参考) 
実はこれを一般化したものがpseqモジュールとしてF# PowerPackに準備されています 
 
部分抜粋すると 
 
// Type abbreviation for parallel sequences. 
type pseq<'T> = ParallelQuery<'T> 
 
module PSeq =  
 
   // Converst a seq<'T> to a pseq<'T>. 
   //ParallelQuery<'T>型にすでに変換されていれば、そのままで、変換されていなければ変換する 
   let inline toP (s : seq<'T>) =  
       match s with 
       | :? pseq<'T> as p ->  p 
       | _ -> s.AsParallel() 
 
   let map f s  =  
       ParallelEnumerable.Select(toP(s), new Func<_,_>(f))  
 
となってます。 
 
これを使うと先ほどの例は 
 
[1;2;3] 
|> PSeq.map (fun x -> 2*x) 
 
となります。 

続きを読む

テーマ : プログラミング
ジャンル : コンピュータ

プロフィール

T GYOUTEN

Author:T GYOUTEN
F#と英単語とフリーソフトと読書に興味があります。
ホームページでフリーソフトも公開しています。どぞ御贔屓に。

最新記事
最新コメント
最新トラックバック
月別アーカイブ
カテゴリ
フリーエリア
フリーエリア
blogram投票ボタン
検索フォーム
RSSリンクの表示
リンク
ブロとも申請フォーム

この人とブロともになる

QRコード
QRコード
上記広告は1ヶ月以上更新のないブログに表示されています。新しい記事を書くことで広告を消せます。