【IRIS/Cache】並列処理(作業キュー・マネージャ)について(カスタマイズ前編)

本記事は、並列処理のカスタマイズについて解説致します。

※この記事は下記の方向けになります。
  • 並列処理に興味がある方
  • 基本編からのカスタマイズについて知りたい方

はじめに

前回の記事では、並列処理の基本として、3つの工程を解説しました。

前回のおさらい
  1. 作業キューの生成
  2. 作業キューに作業項目を追加
  3. 待機

上記各項目の中で複数の選択肢があり、それらを組み合わせて並列処理のシナリオを構築していく事になります

では、並列処理のカスタマイズについて解説致します。

① 作業キュー生成時のカスタマイズ

作業キュー作成時の引数は3つあります。

$system.WorkMgr.%New(qspec, numberjobs, category)
引数説明
qspec“-d”を設定すると、作業項目でwriteした内容が出力されない
numberjobsワーカ・ジョブ数の指定
categoryカテゴリの指定。初期値=Default
%New()の引数説明

基本編では、ワーカ・ジョブ数を指定し、第三引数の「カテゴリ」に関しては指定していませんでした。
 → カテゴリは指定しないと「Default」が選択されます。

カテゴリは、ワーカ・ジョブのグループになります。
カテゴリを設定する事で、ワーカ・ジョブの最大数等を制限する事ができます。

では、カテゴリを新規作成/編集してみましょう。

カテゴリ新規作成/編集

管理ポータル画面より、[システム管理] > [構成] > [システム構成] > [WQMカテゴリ] を選択します。

この画面に表示している「Default」が、初期値で使用している項目になります。
 ※ DefaultとSQLの2項目は削除できません。

新規作成する場合、画面右上の「カテゴリを作成」をクリックします。

必要な項目を入力・選択し、「保存」をクリックします。

項目名設定値説明
最大アクティブワーカー2アクティブなワーカ・ジョブの最大数。
デフォルトワーカー1作業キューを作成時、ワーカ・ジョブ数を指定しなかった時の初期値
最大ワーカー2作業キューの最大ワーカ・ジョブ数

カテゴリを使用する

先ほど作成したカテゴリ「Sample」を使用してみましょう。

【サンプルPG】
第2引数の「ワーカ・ジョブ数」はカテゴリで設定した最大数=2を超えた「4」を設定します。

ClassMethod cate(time As %Integer = 10)
{
	s workMgr = $system.WorkMgr.%New(,4,"Sample")
	
	w !,"全体開始:",$zdt($h,3,1)
	
	f cnt = 1:1:6 {
		d workMgr.Queue("..worker", cnt, time)
	}
	
	d workMgr.Sync( ,.err)
	
	w !,"全体終了:",$zdt($h,3,1)
}

ClassMethod worker(cnt As %Integer, time As %Integer) As %Status
{
	w !,"作業開始:",$j($j, 5), ", cnt:", $j(cnt,2), ", 時刻:",$zdt($h,3,1)	
	f pos=1:1:time { h 0.5  }
	w " ~ ",$zdt($h,3,1)
	q $$$OK
}

実行します。

全体開始:2025-06-28 21:07:13
作業開始: 6476, cnt: 2, 時刻:2025-06-28 21:07:13 ~ 2025-06-28 21:07:18
作業開始: 6360, cnt: 1, 時刻:2025-06-28 21:07:13 ~ 2025-06-28 21:07:18
作業開始: 6476, cnt: 3, 時刻:2025-06-28 21:07:18 ~ 2025-06-28 21:07:23
作業開始: 6360, cnt: 4, 時刻:2025-06-28 21:07:18 ~ 2025-06-28 21:07:23
作業開始: 6360, cnt: 5, 時刻:2025-06-28 21:07:23 ~ 2025-06-28 21:07:28
作業開始: 6476, cnt: 6, 時刻:2025-06-28 21:07:23 ~ 2025-06-28 21:07:28
全体終了:2025-06-28 21:07:28

インスタンス化した際のワーカ・ジョブ数は4に設定しましたが、実際に動作したワーカ・ジョブは2個になっています。

これは、カテゴリ「Sample」の「最大ワーカ」の制限によるものです。

このようにカテゴリの設定は、設定ミス等によるCPUの高負荷に対応する事も可能です。

アクティブ・ワーカ(ActiveWorkers)とは

アクティブ・ワーカとは、作業項目を処理しているワーカ・ジョブを指しています。

作業キューから、アクティブ・ワーカ数を取得するには、「NumActiveWorkers or NumActiveWorkersGet() 」を利用します。

【サンプルPG】

ClassMethod sample(time As %Integer = 10)
{
	s workMgr = $SYSTEM.WorkMgr.%New("-d",2)
		
	f cnt = 1:1:6 d workMgr.Queue("..worker", cnt, ($r(3)+1)*time)
	h 1
	w !,workMgr.NumActiveWorkers      // 1が返る
	w !,workMgr.NumActiveWorkersGet() // 1が返る

	d workMgr.Sync()
}

ただ、この「NumActiveWorkers」は、実際のワーカ・ジョブの数より「-1」少ない値が返るようです。
 ※サンプルPGでは、「1」がターミナルに表示されます。

では、実際にどのプロセスが作業項目として動作しているのでしょうか?

キューの管理は、グローバル「^IRIS.WorkQueue」で行っています。
そのため、このグローバルにアクセスすると、アクティブ・ワーカ数とプロセスIDが取得できます。

【コマンド】
下記をターミナルで実行してください。

GetWorkers(category)	;
	s job="", num=0
	f { s job = $o(^IRIS.WorkQueue("Worker",job), 1, data) q:job=""
		s cat = $lg(data, 1)
		, group=$zu(61, 12, $lg(data, 2))
		
		w:(cat[category)&&(group>0) !,"jobno:",job
	}
d GetWorkers("Default")

作業キューを処理している最中に、別のターミナルで実行すると、下記が表示されました。

【実行結果】
jobno:6920
jobno:8076

実際には、2つのワーカ・ジョブが動作している事が分ります。

② 作業項目追加時のカスタマイズ

基本編では、作業キューに作業項目を追加するのに、関数「Queue」を紹介していますが、もう一つ追加する方法があります。

それは関数「QueueCallback」です。

QueueCallback

関数「Queue」と異なり「QueueCallback」は、その名の通りコールバック関数を設定する事が可能です。

これにより、作業項目の処理が完了した後に、追加の処理を実行する事が可能です。

【サンプルPG】
作業項目である関数「worker」を処理した後、関数「callBack」を実行する流れです。

ClassMethod callBackTest(time As %Integer = 10)
{
	w !,"全体開始:",$zdt($h,3,1)

	s workMgr = $system.WorkMgr.%New(,2)
	f cnt = 1:1:6 {
		d workMgr.QueueCallback("..worker", "##class(developer.parallel.Sample).callBack", cnt, time)
	}	
	d workMgr.Sync( ,.err)

	w !,"全体終了:",$zdt($h,3,1)
}
ClassMethod callBack(cnt As %Integer, time As %Integer) As %Status
{
	w !,"	callback:",%job, ", cnt:", cnt
	w ", %Status:"_%status
	w ", 作業キュー:"_%workqueue.NumWorkers
	q $$$OK
}

実行します。

太字のテキストが、コールバック関数で出力した内容になります。
作業項目の完了後に実行されているのが分かります。

全体開始:2025-06-29 16:07:45
作業開始: 2204, cnt: 2, 時刻:2025-06-29 16:07:45 ~ 2025-06-29 16:07:50
callback:2204, cnt:2, %Status:1, 作業キュー:2
作業開始: 1936, cnt: 1, 時刻:2025-06-29 16:07:45 ~ 2025-06-29 16:07:50
callback:1936, cnt:1, %Status:1, 作業キュー:2
作業開始: 2204, cnt: 3, 時刻:2025-06-29 16:07:50 ~ 2025-06-29 16:07:55
callback:2204, cnt:3, %Status:1, 作業キュー:2
作業開始: 1936, cnt: 4, 時刻:2025-06-29 16:07:50 ~ 2025-06-29 16:07:55
callback:1936, cnt:4, %Status:1, 作業キュー:2
作業開始: 2204, cnt: 5, 時刻:2025-06-29 16:07:55 ~ 2025-06-29 16:08:00
callback:2204, cnt:5, %Status:1, 作業キュー:2
作業開始: 1936, cnt: 6, 時刻:2025-06-29 16:07:55 ~ 2025-06-29 16:08:00
callback:1936, cnt:6, %Status:1, 作業キュー:2
全体終了:2025-06-29 16:08:00

QueueCallbackの使い方

コールバック関数の仕様になります。

  • コールバック関数は、QueueCallbackの第2引数に設定する
  • コールバック関数の引数は、作業項目の引数と同じ項目・同じ値になる
  • “##class(クラス名).メソッド名”で登録する

コールバック関数内では、下記パブリック変数が使用可能です。

パブリック変数名説明
%job実行した作業項目のプロセスID
%status作業項目の戻り値
%workqueue作業キューのインスタンス
コールバック関数内でしようできるパブリック変数一覧

③ 待機方法のカスタマイズ

待機時方法に関しては、「Sync, WaitForComplete(旧)」「Wait」「WaitOne」があり、並列処理のシナリオに沿って選択が可能です。

一つ一つ確認していきましょう。

Sync, WaitForComplete

全ての作業項目が完了するまで待機します。
一番シンプルな実装になります。

「Sync」と「WaitForComplete」の両関数に機能差はありません。

「WaitForComplete」が内部で「Sync」を呼んでいるだけなので、コーディング量が少ない「Sync」を使用すれば良いと思います。

s sts = workMgr.Sync(qspec, .errorlog)
s sts = workMgr.WaitForComplete(qspec, .errorlog)
引数説明
qspec現在不具合中…
指定しない方が良い
 → 指定するとシステムの設定を採用してしまう
errorlog作業項目のエラー・メッセージが返る
 ※エラー内容の構成は後述
Sync()の引数説明

戻り値の%Statusは、作業項目で発生した「エラー%Status」が全て含まれています。

第2引数のerrorLogは、下記内容が返ってきます。

【エラー内容の構成】
err=[エラーが発生した作業項目数]
err(1)=[エラー内容]
err(1,”caller”)=[エラー発生個所]
err(1,”code”)=[エラー・コード]
err(1,”dcode”)=[エラー・コード]
err(1,”domain”)=”%ObjectErrors”
err(1,”namespace”)=[ネームスペース]
err(1,”param”)=1
err(1,”param”,1)=[エラー発生個所]
err(1,”stack”)=$lb([スタック情報])

Wait

設定した「timeout」が経過するか、作業項目が完了すると処理が呼び出し元に戻ります。
複数の作業項目がある場合は、ループ処理を行います。

s sts = workMgr.Wait(qspec, .atend, timeout)

全ての作業項目が完了すると、引数「atend」がtrueになります。
ループを抜けてください。

下記サンプルは、8秒間隔で待機する挙動になります。

【サンプルPG】

ClassMethod waitTest(time As %Integer = 20)
{
	s workMgr = $system.WorkMgr.%New(,2)
	w !,"全体開始:",$zdt($h,3,1),!
	
	f cnt = 1:1:6 {
		d workMgr.Queue("..worker", cnt, time)
	}
	
	// 8秒間隔で待機
	f {
		w !,"	Wait開始:",$p($zdt($h,3,1)," ", 2)
		d workMgr.Wait(, .end, 8)
		w !,"	Wait終了:",$p($zdt($h,3,1)," ", 2),!
		
		i (end) w !!,"全体終了:",$zdt($h,3,1) 	q		
	}
}

実行します。

最初の8秒待機では、作業項目が完了しなかった為空振りしてるのが分かります。

全体開始:2025-06-30 21:03:46

  Wait開始:21:03:46
  Wait終了:21:03:54 ← 8秒間隔では作業項目が終了しなかった

  Wait開始:21:03:54
作業開始: 5384, cnt: 2, 時刻:2025-06-30 21:03:46 ~ 2025-06-30 21:03:56
  Wait終了:21:03:56 ← 2秒後に1つ目の作業項目が完了

  Wait開始:21:03:56
作業開始: 4492, cnt: 1, 時刻:2025-06-30 21:03:46 ~ 2025-06-30 21:03:56
  Wait終了:21:03:56 ← 2つ目の作業項目も完了したので、即待機完了

  Wait開始:21:03:56
  Wait終了:21:04:04 ← 2秒後に1つ目の作業項目が完了

~~ 略 ~~

全体終了:2025-06-30 21:04:16

若しくは、timeout=0に設定しコマンド「hang」を使用してポーリングする事も可能です。

また、先ほどの設定と異なり挙動も変化します。
timeout=8に設定した時は、1ループ1作業項目の出力でしたが、timeout=0では、完了した全ての作業項目が出力されます。

並列処理のシナリオにそって選択してください。

	// 1秒間隔でのポーリング
	f {
		w !,"	Wait開始:",$p($zdt($h,3,1)," ", 2)
		d workMgr.Wait(, .end, 0)
		w !,"	Wait終了:",$p($zdt($h,3,1)," ", 2),!
		
		i (end) w !!,"全体終了:",$zdt($h,3,1) 	q
		
		h 1	
	}

QueueCallbackとの組み合わせ

QueueCallbackとの組み合わせでは、timeout=-1(初期値)に設定する事で、コールバック関数の完了までを待機します。

【サンプル】

ClassMethod callBackTest(time As %Integer = 10)
{
	w !,"全体開始:",$zdt($h,3,1)

	s workMgr = $system.WorkMgr.%New(,2)	
	f cnt = 1:1:6 {
		d workMgr.QueueCallback("..worker", "##class(developer.parallel.Sample).callBack", cnt, time)
	}
	d workMgr.Sync( ,.err)
	
	w !,"全体終了:",$zdt($h,3,1)

	// 1秒間隔でポーリング
	f {
		d workMgr.Wait(, .end, -1)
		i (end) w !!,"全体終了:",$zdt($h,3,1) 	q
		
		h 1
	}
}

ClassMethod callBack(cnt As %Integer, time As %Integer) As %Status
{
	w !,"	callback:",%job, ", cnt:", cnt
	w ", %Status:"_%status
	w ", 作業キュー:"_%workqueue.NumWorkers
		
	s %exit  = 1
	q $$$OK
}

WaitOne

1 つの作業単位が完了するか、タイムアウトになるまで待機します。

s sts = workMgr.WaitOne(timeout, .worksc, .workargs, .workresult)
引数説明
timeoutタイムアウト秒(初期値=9999)
worksc作業項目の戻り値(%Status)
workargs作業項目の引数
workresult作業項目でセットした値
WaitOneの引数説明

Waitと大きく異なるのは、下記3点です。

Waitとの相違点
  • タイムアウトになった」事を受け取る
  • 作業項目への引数が受け取れる
  • 作業項目からのデータが受け取れる

これら3点をサンプルPGを通して確認してみます。

【サンプル】

ClassMethod wateOne(timeout As %Integer = 60, wait As %Integer = 10)
{
	s workMgr = $system.WorkMgr.%New(,3)
	
	w "全体開始:",$zdt($h,3,1),!

	f cnt = 1:1:6 {
		d workMgr.Queue("..child3", cnt, wait)
	}

	// 1秒間隔でポーリング
	s outTime = timeout
	s start = $zh
	w "ポーリング開始:",$zdt($now(),3,1,6),!!
	while (workMgr.WaitOne(outTime, .sts, .args, .result)) {
		If ($$$ISERR(sts)) {
			i ($$$ERRORISTYPE(sts, $$$MultiTimeout)) {
				// タイムアウト判定
				d workMgr.Clear()
				w !,"	> TimeOut!!:",$zdt($now(),3,1,6),", 経過:", ($zh-start),!
			}else{
				// 作業項目のエラー
				w !,"	> Error!!:",$zdt($now(),3,1,6),", 経過:", ($zh-start),!
			}
			w !,$system.Status.GetErrorText(sts)
		}else{
			s outTime = timeout - ($zh - start)
			w !,"	> 完了:",$zdt($now(),3,1,6),", 経過:", ($zh-start),", timeout:",outTime,!
		}
		
		zw args
		zw result
	}
	w !!,"全体終了:",$zdt($h,3,1),", 経過時間:",($zh - start)
}

ClassMethod child3(cnt As %Integer, wait As %Integer) As %Status
{
	s sts = $$$OK
	
	try {
		w !,"child開始:",$j($j, 5), ", cnt:", $j(cnt,2), ", 時刻:",$zdt($h,3,1)

		s time = cnt * wait
		f i=1:1:time h 1
		
		s %result(1) = $zdt($h,3,1)
		
		w "~ ",$zdt($h,3,1)
	} catch e {
		s sts = e.AsStatus()
	}
	
	q sts
}

実行します。

作業項目の引数・作業項目でセットした値・タイムアウトが確認できます。

全体開始:2025-07-02 19:15:23
ポーリング開始:2025-07-02 19:15:23.522896

child開始: 6456, cnt: 1, 時刻:2025-07-02 19:15:23~ 2025-07-02 19:15:33
  > 完了:2025-07-02 19:15:33.621022, 経過:10.098168, timeout:49.901871
args(1)=1  ←作業項目への引数
args(2)=10  ←作業項目への引数
result(1)=”2025-07-02 19:15:33″ ←作業項目で作成した値

child開始: 7032, cnt: 2, 時刻:2025-07-02 19:15:23~ 2025-07-02 19:15:43
  > 完了:2025-07-02 19:15:43.685973, 経過:20.163115, timeout:39.836908
args(1)=2
args(2)=10
result(1)=”2025-07-02 19:15:43″

child開始: 4604, cnt: 3, 時刻:2025-07-02 19:15:23~ 2025-07-02 19:15:53
  > 完了:2025-07-02 19:15:53.769282, 経過:30.246427, timeout:29.753608
args(1)=3
args(2)=10
result(1)=”2025-07-02 19:15:53″

child開始: 6456, cnt: 4, 時刻:2025-07-02 19:15:33~ 2025-07-02 19:16:13
  > 完了:2025-07-02 19:16:13.904506, 経過:50.381652, timeout:9.618388
args(1)=4
args(2)=10
result(1)=”2025-07-02 19:16:13″

  > TimeOut!!:2025-07-02 19:16:23.542715, 経過:60.019862
エラー #7881: ‘WaitOne’ メソッドから 9.591414s 待機中にタイムアウトしました。

全体終了:2025-07-02 19:16:23, 経過時間:60.020147

タイムアウトの場合は、関数「Clear」がセットになると思います。
 ※関数「Clear」別の記事で記載します。

指定の時刻で並列処理を終了したい場合は、現在の時刻から算出してtimeoutに設定する必要があります。
狙った時間に停止させられるのは便利ですね。

どれを使おう・・・

シナリオに沿って関数を選択すればよいのですが、やはり選択肢があると迷いますよね。

そこで、筆者が考えるフローを作成してみました。
待機関数を選択する際の参考にしてみて下さい。

■作業項目を追加した後は、ただ待つだけでいい
  ├ Yes → 「Sync
  └ No
    └ ■一定時間後に強制終了したい・作業項目からデータを受け取りたい
        ├ Yes → 「WaitOne
        └ No  → 「Wait

「Wait」はポーリングしつつ、待機中に色々操作したい時に有効です。
次回の記事も参考にしてみて下さい。

おわりに

いかがだったでしょうか。

本記事は、並列処理のカスタマイズについて触れました。

ただでさえ簡単なコーディングで並列処理が実行できるのに、色々な選択肢からシナリオに合わせた構築が可能なので魅力的です。

次回は、さらに「カスタマイズ後編」について触れたいと思います。

本記事が、何かの参考になれば幸いです。