スポンサーサイト

上記の広告は1ヶ月以上更新のないブログに表示されています。
新しい記事を書く事で広告が消せます。

F#による並行プログラミング入門(14) キャンセル(1) Parallel Loop,PLINQ

 今回は並行処理のキャンセルの紹介です。 
 
これにはCancellationTokenSourceクラスのインスタンスを使用します。まずは一つインスタンス化して代的プロパティを調べてみます。(CancellationTokenSourceとはいわばキャンセルの狼煙を挙げる火元です。) 
 
> let taskCTS0 = new CancellationTokenSource();; 
val taskCTS0 : CancellationTokenSource 
 
> taskCTS0;; 
val it : CancellationTokenSource = 
  System.Threading.CancellationTokenSource 
    {IsCancellationRequested = false; 
     Token = System.Threading.CancellationToken;} 
 
さらにTokenプロパティを調べてみます。 
 
> taskCTS0.Token;; 
val it : CancellationToken = 
  System.Threading.CancellationToken 
    {CanBeCanceled = true; 
     IsCancellationRequested = false; 
     WaitHandle = System.Threading.ManualResetEvent;} 
 
さてここでキャンセルの狼煙を上げてみます。 
 
> taskCTS0.Cancel();; 
val it : unit = () 
 
もう一度Tokenプロパティを調べてみます。 
> taskCTS0.Token;; 
val it : CancellationToken = 
  System.Threading.CancellationToken 
    {CanBeCanceled = true; 
     IsCancellationRequested = true; 
     WaitHandle = System.Threading.ManualResetEvent;} 
      
IsCancellationRequested プロパティがtrueに変化してます。 
 
ということで、並行処理のキャンセルの一つの方法は、並行処理側に「CancellationTokenSourceクラスのインスタンスのTokenプロパティで得られるCancellationToken型の値A」を渡して、「並行処理内ではAのIsCancellationRequestedプロパティを定期的に調べながら、もしこれがtrueの時は適切な処理を施してから処理を終了する」という仕組みで行う方法です。 
もう一つは「IsCancellationRequestedプロパティを調べて、もしfalseならそのまま処理を続行し、trueならば,例外を発生させるというCancellationToken型の値のインスタンスメソッドThrowIfCancellationRequested()」を定期的に実行するという方法です。(なおIsCancellationRequestedプロパティを定期的に調べるのにもコストがかかりますから、タイミング等は考える必要があります。) 
ここでは2番目の方法でやっていきます。 
 
 
ではParallel.Loop等順番にキャンセル例を挙げていきます。 
 
(1)Parallel.Loop 
 
Parallel.ForやParallel.ForEachやParallel.Invokeのキャンセルはオーバーロードされたうち次の形のようなParallelOptionsが引数の一部にあるものを利用します。 
 
public static ParallelLoopResult For( 
    int fromInclusive, 
    int toExclusive, 
    ParallelOptions parallelOptions, 
    Action<int> body 
 
public static ParallelLoopResult ForEach<TSource>( 
    IEnumerable<TSource> source, 
    ParallelOptions parallelOptions, 
    Action<TSource> body 
 
public static void Invoke( 
    ParallelOptions parallelOptions, 
    params Action[] actions 
 
ParallelOptions型の値はCancellationTokenプロパティをもつので、これに自分で生成したCancellationTokenSourceクラスのインスタンスのTokenプロパティをセットします。 
 
例 
> let taskCTS0 = new CancellationTokenSource() 
let options = new ParallelOptions() 
options.CancellationToken <- taskCTS0.Token 
//上の2行は次のようにしてもよい let options = new ParallelOptions(CancellationToken = taskCTS0.Token) 
 
//100ms後にキャンセルを発動 
Task.Factory.StartNew(fun () -> Thread.Sleep(100);taskCTS0.Cancel()) |> ignore 
Parallel.For(0,20,options,fun i -> printfn "%dの仕事を開始" i 
                                   Thread.Sleep(50) 
                                   options.CancellationToken.ThrowIfCancellationRequested()) |>ignore;; 
5の仕事を開始 
10の仕事を開始 
15の仕事を開始 
0の仕事を開始 
16の仕事を開始 
6の仕事を開始 
1の仕事を開始 
11の仕事を開始 
System.OperationCanceledException: 操作はキャンセルされました。 
   場所 System.Threading.Tasks.Parallel.ThrowIfReducableToSingleOCE(IEnumerable`1 excCollection, CancellationToken ct) 
   場所 System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally) 
   場所 System.Threading.Tasks.Parallel.For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body) 
   場所 <StartupCode$FSI_0006>.$FSI_0006.main@() 
エラーのため停止しました 
ということで例外が発生してキャンセル成功です。(例外のさばき方は後日やります。) 
 
(2)PLINQ 
 
PLINQではWithCancellationを利用します。 
例 
 
 > open System 
open System.Threading 
open System.Threading.Tasks 
open System.Linq;; 
 
> let taskCTS0 = new CancellationTokenSource() 
//100ms後にキャンセルを発動 
Task.Factory.StartNew(fun () -> Thread.Sleep(100);taskCTS0.Cancel()) |> ignore 
[1 .. 19].AsParallel() 
         .WithCancellation(taskCTS0.Token) 
         .ForAll(fun i -> printfn "%dの仕事を開始" i 
                          Thread.Sleep(50) 
                          taskCTS0.Token.ThrowIfCancellationRequested() 
                 );; 
3の仕事を開始 
1の仕事を開始 
4の仕事を開始 
2の仕事を開始 
5の仕事を開始 
6の仕事を開始 
7の仕事を開始 
8の仕事を開始 
System.OperationCanceledException: WithCancellation に対して指定されたトークンを介してクエリが取り消されました。 
   場所 System.Linq.Parallel.CancellationState.ThrowWithStandardMessageIfCanceled(CancellationToken externalCancellationToken) 
   場所 System.Linq.Parallel.QueryTaskGroupState.QueryEnd(Boolean userInitiatedDispose) 
   場所 System.Linq.Parallel.SpoolingTask.SpoolForAll[TInputOutput,TIgnoreKey](QueryTaskGroupState groupState, PartitionedStream`2 partitions, TaskScheduler taskScheduler) 
   場所 System.Linq.Parallel.DefaultMergeHelper`2.System.Linq.Parallel.IMergeHelper<TInputOutput>.Execute() 
   場所 System.Linq.Parallel.MergeExecutor`1.Execute[TKey](PartitionedStream`2 partitions, Boolean ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, Boolean isOrdered, CancellationState cancellationState, Int32 queryId) 
   場所 System.Linq.Parallel.PartitionedStreamMerger`1.Receive[TKey](PartitionedStream`2 partitionedStream) 
   場所 System.Linq.Parallel.ForAllOperator`1.WrapPartitionedStream[TKey](PartitionedStream`2 inputStream, IPartitionedStreamRecipient`1 recipient, Boolean preferStriping, QuerySettings settings) 
   場所 System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.ChildResultsRecipient.Receive[TKey](PartitionedStream`2 inputStream) 
   場所 System.Linq.Parallel.ScanQueryOperator`1.ScanEnumerableQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient) 
   場所 System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient) 
   場所 System.Linq.Parallel.QueryOperator`1.GetOpenedEnumerator(Nullable`1 mergeOptions, Boolean suppressOrder, Boolean forEffect, QuerySettings querySettings) 
   場所 System.Linq.Parallel.ForAllOperator`1.RunSynchronously() 
   場所 System.Linq.ParallelEnumerable.ForAll[TSource](ParallelQuery`1 source, Action`1 action) 
   場所 <StartupCode$FSI_0003>.$FSI_0003.main@() 
エラーのため停止しました
 
スポンサーサイト

テーマ : プログラミング
ジャンル : コンピュータ

コメントの投稿

非公開コメント

プロフィール

T GYOUTEN

Author:T GYOUTEN
F#と英単語とフリーソフトと読書に興味があります。
ホームページでフリーソフトも公開しています。どぞ御贔屓に。

最新記事
最新コメント
最新トラックバック
月別アーカイブ
カテゴリ
フリーエリア
フリーエリア
blogram投票ボタン
検索フォーム
RSSリンクの表示
リンク
ブロとも申請フォーム

この人とブロともになる

QRコード
QRコード
上記広告は1ヶ月以上更新のないブログに表示されています。新しい記事を書くことで広告を消せます。