【IRIS/Cache】並列処理(作業キュー・マネージャ)について(カスタマイズ後編)

本記事は、並列処理のカスタマイズ(後編)について解説致します。

※この記事は下記の方向けになります。
  • 並列処理に興味がある方
  • 基本編からのカスタマイズについて知りたい方

はじめに

前回の記事では、並列処理の3工程に対する選択肢を解説しました。

並列処理の3工程
  1. 作業キューの生成
  2. 作業キューに作業項目を追加
  3. 待機

本記事は、カスタマイズ後編として、作業キューの「停止」「修了」や、作業項目の「前処理・後処理」等を解説致します。

作業キューの一時停止と再開

作業キューには、「作業項目の開始」を停止する機能があります。

処理中の作業項目を停止する事はできません
あくまで、まだ開始していない作業項目の実行を行わないだけです。

【サンプル】

ClassMethod pauseTest(time As %Integer = 10, stopTime As %Integer = 10)
{
	s workMgr = $system.WorkMgr.%New(,3)
	
	w "全体開始:",$p($zdt($h,3,1)," ",2),!!

	f cnt = 1:1:6 {
		d workMgr.Queue("..worker", cnt, time)
	}
	
	h 1
	w "停止開始:",$p($zdt($h,3,1)," ",2),!
	w "Pause:",workMgr.Pause(stopTime, .comp), ", Comp:",comp,!
	w "停止終了:",$p($zdt($h,3,1)," ",2),!!
	

	f pos=1:1:20 { h 0.5 }
	w "Resume:",workMgr.Resume()," , time:"_$p($zdt($h,3,1)," ",2),!


	// 完了まで待機
	d workMgr.Sync()
}

【time=10, stopTime=10での実行結果】
全体開始:22:19:52

停止開始:22:19:53
Pause:1, Comp:1
停止終了:22:19:58 ← 5秒間の停止で、先行していた作業項目が完了

Resume:1 , time:22:20:08 ← 再開時刻

作業開始: 3988, cnt: 1, 時刻:2025-07-08 22:19:52 ~ 2025-07-08 22:19:58
作業開始: 5692, cnt: 3, 時刻:2025-07-08 22:19:52 ~ 2025-07-08 22:19:58
作業開始: 4608, cnt: 2, 時刻:2025-07-08 22:19:52 ~ 2025-07-08 22:19:58
作業開始: 4608, cnt: 6, 時刻:2025-07-08 22:20:08 ~ 2025-07-08 22:20:13
作業開始: 3988, cnt: 5, 時刻:2025-07-08 22:20:08 ~ 2025-07-08 22:20:13
作業開始: 5692, cnt: 4, 時刻:2025-07-08 22:20:08 ~ 2025-07-08 22:20:13

一時停止(Pause)

timeout設定中に動作している作業項目が完了したら、completed=1となります。

s status = workMgr.Pause(timeout, .completed)

timeout設定中に動作している作業項目が完了しない場合、completed=0となります。

【time=10, stopTime=2での実行結果】
全体開始:22:34:30

停止開始:22:34:31
Pause:1, Comp:0 ← timeout中に作業項目の処理が完了しなかったため「0」が返る
停止終了:22:34:33

Resume:1 , time:22:34:43

作業開始: 6332, cnt: 1, 時刻:2025-07-08 22:34:30 ~ 2025-07-08 22:34:35
作業開始: 6988, cnt: 2, 時刻:2025-07-08 22:34:30 ~ 2025-07-08 22:34:35
作業開始: 852, cnt: 3, 時刻:2025-07-08 22:34:30 ~ 2025-07-08 22:34:35
作業開始: 852, cnt: 6, 時刻:2025-07-08 22:34:43 ~ 2025-07-08 22:34:48
作業開始: 6332, cnt: 4, 時刻:2025-07-08 22:34:43 ~ 2025-07-08 22:34:48
作業開始: 6988, cnt: 5, 時刻:2025-07-08 22:34:43 ~ 2025-07-08 22:34:48

再開(Resume)

一時停止した作業キューは、「再開」関数を実行することで、停止が解除されます。

また、再開関数を実行しなくても、Sync等の待機系関数でも停止が解除されます。

s status = workMgr.Resume()

作業キューの停止

「一時停止」関数では、実行中の作業項目にたいし処理の停止を行うことができませんでしたが、「停止」関数は、実行中の作業項目も停止(終了)する事が可能です。

また、n秒後に全作業項目を停止させる事も可能です。
下記サンプル25行目を参照してください。

s outTime = timeout - ($zh - start)

指定時間から経過秒を減算し、WaitOneに再設定する事により、指定時間に停止する仕様になります。

【サンプル】

ClassMethod wateOne(timeout As %Integer = 60, wait As %Integer = 10)
{
	s workMgr = $system.WorkMgr.%New(,3)
	w "全体開始:",$p($zdt($h,3,1)," ",2),!

	f cnt = 1:1:6 {
		d workMgr.Queue("..child3", cnt, wait)
	}

	s outTime = timeout
	s start = $zh
	w "WaitOne開始:",$p($zdt($h,3,1)," ",2),!!
	while (workMgr.WaitOne(outTime, .sts, .args, .result)) {
		If ($$$ISERR(sts)) {
			i ($$$ERRORISTYPE(sts, $$$MultiTimeout)) {
				// 終わらなかった
				d workMgr.Clear()
				w !,"	> TimeOut!!:",$p($zdt($h,3,1)," ",2),", 経過:", ($zh-start),!
			}else{
				// 作業項目のエラー
				w !,"	> Error!!:",$p($zdt($h,3,1)," ",2),", 経過:", ($zh-start),!
			}
			w !,$system.Status.GetErrorText(sts)
		}else{
			s outTime = timeout - ($zh - start)
			w !,"	> 完了:",$p($zdt($h,3,1)," ",2),", 経過:", ($zh-start),", timeout:",outTime,!
		}
		
		w !
		zw result
	}
	w !!,"全体終了:",$p($zdt($h,3,1)," ",2),", 経過時間:",($zh - start)
}

ClassMethod child3(cnt As %Integer, wait As %Integer) As %Status
{
	s sts = $$$OK
	
	try {
		w !,"child開始:",$j($j, 5), ", cnt:", $j(cnt,2), ", 時刻:",$p($zdt($h,3,1)," ",2)

		s time = cnt * wait
		f i=1:1:time h 1
		s %result(1) = $p($zdt($h,3,1)," ",2)
		
		w "~ ",$p($zdt($h,3,1)," ",2)
	} catch e {
		s sts = e.AsStatus()
	}
	
	q sts
}

【time=60, wait=10での実行結果】
全体開始:08:44:50
WaitOne開始:08:44:50

child開始: 4744, cnt: 1, 時刻:08:44:50~ 08:45:00
  > 完了:08:45:00, 経過:10.118609, timeout:49.881404
result(1)=”08:45:00″

child開始: 3588, cnt: 2, 時刻:08:44:50~ 08:45:10
  > 完了:08:45:10, 経過:20.202827, timeout:39.797189
result(1)=”08:45:10″

child開始: 2312, cnt: 3, 時刻:08:44:50~ 08:45:20
  > 完了:08:45:20, 経過:30.269829, timeout:29.730183
result(1)=”08:45:20″

child開始: 4744, cnt: 4, 時刻:08:45:00~ 08:45:40
  > 完了:08:45:40, 経過:50.404696, timeout:9.595324
result(1)=”08:45:40″

  > TimeOut!!:08:45:50, 経過:60.012425

エラー #7881: ‘WaitOne’ メソッドから 9.595324s 待機中にタイムアウトしました。

全体終了:08:45:50, 経過時間:60.012571

停止(Clear)

ドキュメントには、下記が記載されていますが(2025-07-11時点)、実際はそんな機能はありません。
Clear()実行後、即停止します。

【ドキュメント】 2025-07-11 時点
タイムアウト期間timeout(秒)を指定すると、このメソッドはワーカ・ジョブが現在のタスクを完了するまで待機してから、ジョブを強制終了します。

d workMgr.Clear(timeout)

そのため、待機時間が必要な場合は、サンプルPGの様に「WaitOne()」や「Wait()」と組み合わせる必要があります。

セットアップ(準備)・ティアダウン(片付け) / クリーンアップ

セットアップ(準備)・ティアダウン(片付け)

ワーカ・ジョブの開始時に「セットアップ」を実行し、終了時に「ティアダウン」が実行されます。

セットアップとティアダウンでは、下記対応を行うと都合が良いとドキュメントに記載されています。

セットアップ・ティアダウンで実行しそうな処理
  • パブリック変数の設定/削除
  • プロセス・グローバル、グローバルの設定/削除
  • グローバルのロック・解除

作業項目を実行する為の前準備と後片付けですね。

【サンプル】

ClassMethod setUpTest(time As %Integer = 2)
{
	s workMgr = $system.WorkMgr.%New(,2)
	k ^parallel
	
	// セットアップ・ティアダウン
	s sts = workMgr.Setup("..setup", "1.start")
	s sts = workMgr.TearDown("..teardown", "2.end", "サンプル")
	
	f cnt = 1:1:6 d workMgr.Queue("..testSetTear", cnt, time)	

	f {
		d workMgr.Wait(, .end, 5) q:(end)
	}
}

ClassMethod setup(item As %String) As %Status
{
	s ^||parallel(1)=item_" てすと1"
	s ^||parallel(2)=item_" てすと2"
	s ^||parallel(3)=item_" てすと3"
	s ^||parallel(4)=item_" てすと4"
	s ^||parallel(5)=item_" てすと5"
	s ^||parallel(6)=item_" てすと6"
	q $$$OK
}

ClassMethod teardown(item As %String, sample As %String) As %Status
{
	s cnt=""
	f { s cnt = $o(^||parallel(cnt),1,data) q:cnt=""
		s ^parallel($j, $now(), cnt)=item_"-"_data_sample
	}
	k ^||parallel
	q $$$OK
}

【time=2での実行結果】
作業開始:1.start てすと1
作業開始:1.start てすと2
作業開始:1.start てすと3
作業開始:1.start てすと4
作業開始:1.start てすと5
作業開始:1.start てすと6

【^parallelグローバル】
^parallel(672,”67407,63711.6605425″,2)=”2.end-1.start てすと2サンプル”
^parallel(672,”67407,63711.6606164″,3)=”2.end-1.start てすと3サンプル”
^parallel(672,”67407,63711.6607052″,6)=”2.end-1.start てすと6サンプル”
^parallel(4296,”67407,63711.66058″,1)=”2.end-1.start てすと1サンプル”
^parallel(4296,”67407,63711.6606852″,4)=”2.end-1.start てすと4サンプル”
^parallel(4296,”67407,63711.6607031″,5)=”2.end-1.start てすと5サンプル”

ワーカ・ジョブが2つなので、各プロセス毎に1回づつ「セットアップ」と「ティアダウン」が実行されているのが分かります。

クリーンアップ

全ての作業項目が終了した段階で1回実行され、作業項目で発生した諸々の後片付けを行います。

作業キュー自体は、関数を実行しない限り作業項目の完了を感知できませんが、クリーンアップ関数を設定しておく事で、非同期で実行されます。

【サンプル】

ClassMethod cleanUpTest(time As %Integer = 2)
{
	k ^parallel
	f cnt=1:1:10 s ^parallel(cnt)=$now()
	
	s workMgr = $system.WorkMgr.%New(,2)
	
	s sts = workMgr.Cleanup("..cleanup", $na(^parallel))
	
	f cnt = 1:1:4 d workMgr.Queue("..testClean", cnt, time)	

	f { d workMgr.Wait(, .end, 5) q:(end) }
}

ClassMethod cleanup(gbl As %String) As %Status
{
	k @gbl
	q $$$OK
}

ClassMethod testClean(cnt As %Integer, time As %Integer) As %Status
{
	w !,"作業開始:",$g(^parallel(cnt))
	f pos=1:1:time { h 0.5  }
	q $$$OK
}

【time=2での実行結果】
作業開始:67407,64595.9352049
作業開始:67407,64595.9352698
作業開始:67407,64595.9352806
作業開始:67407,64595.9352891

【^parallelグローバル】
なし!

並列処理のシナリオ上で後片付け処理がある場合は、設定しておくことをお勧めします。

クリーンアップの動作確認

クリーンアップの関数説明に、「Setup() を呼び出したワーカーでは実行されないことに注意してください。」と記載があります。

念のため確認しましょう。

【サンプル】

ClassMethod mixTest(time As %Integer = 2)
{
	k ^parallel
	s workMgr = $system.WorkMgr.%New("-d",2)
	
	// セットアップ・ティアダウン
	s sts = workMgr.Setup("..mix", "setup")
	s sts = workMgr.TearDown("..mix", "teardown")
	s sts = workMgr.Cleanup("..mix", "cleanup")

	f cnt = 1:1:6 d workMgr.Queue("..testSetTear", cnt, time)	

	f {	d workMgr.Wait(, .end, 5) q:(end) }
}

ClassMethod mix(keyStr As %String) As %Status
{
	s ^parallel(keyStr, $j) = $p($zdt($now(),3,1,6)," ", 2)
	q $$$OK
}

【^parallelグローバル】
^parallel(“cleanup”,4044)=”18:11:26.377244″
^parallel(“setup”,668)=”18:11:23.293609″
^parallel(“setup”,2888)=”18:11:23.294632″
^parallel(“teardown”,668)=”18:11:26.376454″
^parallel(“teardown”,2888)=”18:11:26.376452″

ワーカ・ジョブとは異なるプロセスで動作している事が分ります。
クリーンアップ関数では、プロセス固有で動作する処理(プロセス・グローバル等)は行わないで下さい。

デタッチ・アタッチ

デタッチ」は、作業キューから作業項目を抜き取り、作業キューのプロセスを開放する事が可能です。

アタッチ」は、開放した作業キューを復元する事ができます。

【サンプル】

ClassMethod detachTest(time As %Integer = 5, stop As %Integer = 20)
{
	k ^parallel
	s workMgr = $system.WorkMgr.%New(,2)
	
	f cnt = 1:1:6 d workMgr.Queue("..setGlobal", cnt, time)
	
	h 2
	s sts = workMgr.Detach(.token, 3600)
	w !,"デタッチ実行:"_$p($zdt($h,3,1)," ",2),!
	w !,"NumWorkers:",workMgr.NumWorkers,
	  ", NumActiveWorkers:",workMgr.NumActiveWorkers,
	  ", WorkQueueCount:",workMgr.WorkQueueCount,!
	
	w "停止開始:",$p($zdt($h,3,1)," ",2),!
	h stop
	w "停止終了:",$p($zdt($h,3,1)," ",2),!!
	
	
	// アタッチ
	w !,"アタッチ開始:"_$p($zdt($h,3,1)," ",2),!
	s workMgr2 = $system.WorkMgr.Attach(token, .sts)
	w !,"NumWorkers:",workMgr2.NumWorkers,
	  ", NumActiveWorkers:",workMgr2.NumActiveWorkers,
	  ", WorkQueueCount:",workMgr2.WorkQueueCount,!
	d workMgr2.Sync()
	w !!,"全体終了:",$p($zdt($h,3,1)," ",2)
}

ClassMethod setGlobal(cnt As %Integer, time As %Integer) As %Status
{
	w !,"作業開始:",$j($j, 5), ", cnt:", $j(cnt,2), ", 時刻:",$p($zdt($h,3,1)," ",2)
	s ^parallel(cnt) = $p($zdt($h,3,1)," ",2)
	f pos=1:1:time { h 0.5  }
	s ^parallel(cnt) = ^parallel(cnt)_" ~ "_$p($zdt($h,3,1)," ",2)
	w " ~ ",$p($zdt($h,3,1)," ",2)
	q $$$OK
}

【time=2, stop=20での実行結果】
デタッチ実行:20:40:26
NumWorkers:2, NumActiveWorkers:-1, WorkQueueCount:6
停止開始:20:40:26
停止終了:20:40:46


アタッチ開始:20:40:46 ←アタッチ開始時刻の方が作業項目完了時刻よりも後
NumWorkers:2, NumActiveWorkers:1, WorkQueueCount:6

作業開始: 5300, cnt: 1, 時刻:20:40:24 ~ 20:40:27
作業開始: 2452, cnt: 2, 時刻:20:40:24 ~ 20:40:27
作業開始: 5300, cnt: 3, 時刻:20:40:27 ~ 20:40:29
作業開始: 2452, cnt: 4, 時刻:20:40:27 ~ 20:40:29
作業開始: 5300, cnt: 5, 時刻:20:40:29 ~ 20:40:32
作業開始: 2452, cnt: 6, 時刻:20:40:29 ~ 20:40:32

全体終了:20:40:46

デタッチを行うと「NumActiveWorkers」が-1になっているのが分かります。
また、デタッチを行っても作業項目の実行は継続している事が確認できます。

デタッチ

デタッチを行うことで、トークン(token)を取得できます。
また、トークンの有効期間を第2引数「timeout」で指定します。

s sts = workMgr.Detach(.token, [timeout])

【トークン】
0cUPR9n+hh3cKVAkByOVp1JU5/kpwbAPuY21EYFW+dk

トークンは、こんな感じの文字列です。

アタッチ

デタッチで取得したトークンをアタッチの第1引数に設定する事で、作業キューを取得する事が可能です。

この作業キューから、キャッシュしたwrite文や、作業項目の進捗を取得する事が可能です。

s workMgr = $system.WorkMgr.Attach([token], [.sts])

おわりに

いかがだったでしょうか。

本記事では、並列処理のカスタマイズ編・後編として、作業キューの一時停止/再開、強制停止、セットアップ/ティアダウン/クリーンアップ、さらにはデタッチ/アタッチのような高度な機能について解説しました。

これらの機能を活用することで、単純な並列処理に留まらず、より柔軟で堅牢な処理設計が可能になります。

今後、より複雑な並列処理の設計に取り組む際に、本記事が少しでも皆さまのヒントや手助けになれば幸いです。