M8Test Help

并发编程

awaitAll: 并发获取多个结果

当你需要同时执行多个异步任务,并等待它们全部成功后统一处理结果时,使用 scope.awaitAll()。

  • 并发执行:所有任务同时开始,总耗时取决于最慢的那个任务。

  • 类型安全:要求所有 Deferred 返回相同类型的结果 T,最终结果是 List。

  • 快速失败:只要有一个任务失败,awaitAll 就会立即失败,并取消其他未完成的任务。

import com.m8test.script.GlobalVariables.* /* * 完整示例 4.1: 并发获取三个服务器的配置信息 */ // --- 样板代码 --- _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.log("准备并发获取多个配置...") val fetchConfigA = scope.delay(300).then { innerScope, _ -> innerScope.resolve("配置A:加载成功") } val fetchConfigB = scope.delay(800).then { innerScope, _ -> innerScope.resolve("配置B:加载成功") } val fetchConfigC = scope.delay(500).then { innerScope, _ -> innerScope.resolve("配置C:加载成功") } val allConfigsDeferred = scope.awaitAll(listOf(fetchConfigA, fetchConfigB, fetchConfigC)) allConfigsDeferred.then { innerScope, configs -> // configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 _console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。") _console.log("结果: " + configs) innerScope.resolve(null) }.onError { error -> _console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage()) }
/* * 完整示例 4.1: 并发获取三个服务器的配置信息 */ // --- 样板代码 --- $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.log("准备并发获取多个配置...") def fetchConfigA = scope.delay(300).then({ innerScope, _ -> innerScope.resolve("配置A:加载成功") }) def fetchConfigB = scope.delay(800).then({ innerScope, _ -> innerScope.resolve("配置B:加载成功") }) def fetchConfigC = scope.delay(500).then({ innerScope, _ -> innerScope.resolve("配置C:加载成功") }) def allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC]) allConfigsDeferred.then({ innerScope, configs -> // configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 $console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。") $console.log("结果: " + configs) innerScope.resolve(null) }).onError({ error -> $console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage()) })
/* * 完整示例 4.1: 并发获取三个服务器的配置信息 */ // --- 样板代码 --- $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.log("准备并发获取多个配置..."); const fetchConfigA = scope.delay(300).then((innerScope, _) => innerScope.resolve("配置A:加载成功")); const fetchConfigB = scope.delay(800).then((innerScope, _) => innerScope.resolve("配置B:加载成功")); const fetchConfigC = scope.delay(500).then((innerScope, _) => innerScope.resolve("配置C:加载成功")); const allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC]); allConfigsDeferred.then((innerScope, configs) => { // configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 $console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。"); $console.log("结果: " + configs); return innerScope.resolve(null); }).onError(error => { $console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage()); });
--[[ * 完整示例 4.1: 并发获取三个服务器的配置信息 ]] -- --- 样板代码 --- _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:log("准备并发获取多个配置...") local fetchConfigA = scope:delay(300):_then(function(innerScope, _) return innerScope:resolve("配置A:加载成功") end) local fetchConfigB = scope:delay(800):_then(function(innerScope, _) return innerScope:resolve("配置B:加载成功") end) local fetchConfigC = scope:delay(500):_then(function(innerScope, _) return innerScope:resolve("配置C:加载成功") end) local allConfigsDeferred = scope:awaitAll(_iterables:listOf(fetchConfigA, fetchConfigB, fetchConfigC)) allConfigsDeferred:_then(function(innerScope, configs) -- configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 _console:log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。") _console:log("结果: " .. tostring(configs)) return innerScope:resolve(nil) end):onError(function(error) _console:error("在并发获取配置时,至少有一个任务失败了: " .. error:getMessage()) end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; /* * 完整示例 4.1: 并发获取三个服务器的配置信息 */ // --- 样板代码 --- $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->log(javaString("准备并发获取多个配置...")); $fetchConfigA = $scope->delay(300)->then(function ($innerScope, $_) { return $innerScope->resolve(javaString("配置A:加载成功")); }); $fetchConfigB = $scope->delay(800)->then(function ($innerScope, $_) { return $innerScope->resolve(javaString("配置B:加载成功")); }); $fetchConfigC = $scope->delay(500)->then(function ($innerScope, $_) { return $innerScope->resolve(javaString("配置C:加载成功")); }); global $iterables; $allConfigsDeferred = $scope->awaitAll($iterables->listOf($fetchConfigA, $fetchConfigB, $fetchConfigC)); $allConfigsDeferred->then(function ($innerScope, $configs) { global $console; // configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 $console->log(javaString("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。")); $console->log(javaString("结果: "), $configs); return $innerScope->resolve(null); })->onError(function ($error) { global $console; $console->error(javaString("在并发获取配置时,至少有一个任务失败了: ") . $error->getMessage()); });
# # 完整示例 4.1: 并发获取三个服务器的配置信息 # # --- 样板代码 --- # 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console _threads.getMain().setBackground(True) # 使用您指定的 scope 创建方式 scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda dispatchers: dispatchers.getScriptMain()) ) _console.log("准备并发获取多个配置...") # 对于单行返回值的 lambda,可以直接返回值,无需元组 fetchConfigA = scope.delay(300).then(lambda innerScope, _: innerScope.resolve("配置A:加载成功")) fetchConfigB = scope.delay(800).then(lambda innerScope, _: innerScope.resolve("配置B:加载成功")) fetchConfigC = scope.delay(500).then(lambda innerScope, _: innerScope.resolve("配置C:加载成功")) allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC]) allConfigsDeferred.then(lambda innerScope, configs: ( # configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 _console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。"), # Python中打印列表需要将其转换为字符串 _console.log("结果: " + str(configs)), innerScope.resolve(None) # 应用规则:返回元组的最后一个元素 )[-1]).onError(lambda error: ( _console.error("在并发获取配置时,至少有一个任务失败了: " + str(error.getMessage())) ))
# encoding: utf-8 # # 完整示例 4.1: 并发获取三个服务器的配置信息 # # --- 样板代码 --- $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.log("准备并发获取多个配置...") fetchConfigA = scope.delay(300).then { |innerScope, _| innerScope.resolve("配置A:加载成功") } fetchConfigB = scope.delay(800).then { |innerScope, _| innerScope.resolve("配置B:加载成功") } fetchConfigC = scope.delay(500).then { |innerScope, _| innerScope.resolve("配置C:加载成功") } # 创建数组使用 [] allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC]) allConfigsDeferred.then do |innerScope, configs| # configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致 $console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。") $console.log("结果: " + configs.to_s) innerScope.resolve(nil) end.onError do |error| $console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage()) end

joinAll: 等待多个任务完成

当你只关心一组任务是否全部结束(无论成败),而不关心它们的返回值时,使用 scope.joinAll()。

  • 纯同步点:等待所有 Job 或 Deferred 完成。

  • 如果其中一个任务抛出异常,其他未完成的任务会被取消执行。

import com.m8test.script.GlobalVariables.* /* * 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 */ // --- 样板代码 --- _threads.getMain().setBackground(true) // newScope 的 builder 是 Receiver.() -> Unit,所以直接调用 setExceptionHandler val scope = _coroutines.newScope { // 设置一个异常处理器,来观察失败任务的日志 setExceptionHandler { ctx, err -> _console.error("[作用域异常捕获] 任务 '${ctx.getName()}' 失败了: ${err.getMessage()}") } } _console.log("准备启动多个独立的后台任务...") // 修复:contextBuilder 和 block 都移除了显式参数 val logUploadJob = scope.launch({ setName("日志上传任务") }, null) { delay(4000).then { innerScope, _ -> _console.log(" -> 日志上传成功。") innerScope.resolve(null) } } // 修复:contextBuilder 和 block 都移除了显式参数 val cacheClearJob = scope.launch({ setName("缓存清理任务") }, null) { delay(200).then<Unit> { innerScope, _ -> _console.log(" -> 缓存清理时发生了一个预期的错误。") throw RuntimeException("磁盘空间不足!") } } scope.joinAll(listOf(logUploadJob, cacheClearJob)).then { innerScope, _ -> _console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。") _console.log("现在可以安全地执行后续的清理操作或退出脚本。") innerScope.resolve(null) }.onError { _console.error("至少有任务执行失败") }
/* * 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 */ // --- 样板代码 --- $threads.getMain().setBackground(true) def scope = $coroutines.newScope({ context -> // 设置一个异常处理器,来观察失败任务的日志 context.setExceptionHandler({ ctx, err -> $console.error("[作用域异常捕获] 任务 '${ctx.getName()}' 失败了: ${err.getMessage()}") }) }) $console.log("准备启动多个独立的后台任务...") def logUploadJob = scope.launch({ ctx -> ctx.setName("日志上传任务") }, null, { s -> s.delay(4000).then({ innerScope, _ -> $console.log(" -> 日志上传成功。") innerScope.resolve(null) }) }) def cacheClearJob = scope.launch({ ctx -> ctx.setName("缓存清理任务") }, null, { s -> s.delay(200).then({ innerScope, _ -> $console.log(" -> 缓存清理时发生了一个预期的错误。") throw new RuntimeException("磁盘空间不足!") }) }) scope.joinAll([logUploadJob, cacheClearJob]).then({ innerScope, _ -> $console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。") $console.log("现在可以安全地执行后续的清理操作或退出脚本。") innerScope.resolve(null) }).onError { $console.error("至少有任务执行失败") }
/* * 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 */ // --- 样板代码 --- $threads.getMain().setBackground(true); const scope = $coroutines.newScope(context => { // 设置一个异常处理器,来观察失败任务的日志 context.setExceptionHandler((ctx, err) => { $console.error(`[作用域异常捕获] 任务 '${ctx.getName()}' 失败了: ${err.getMessage()}`); }); }); $console.log("准备启动多个独立的后台任务..."); const logUploadJob = scope.launch(ctx => ctx.setName("日志上传任务"), null, s => { s.delay(4000).then((innerScope, _) => { $console.log(" -> 日志上传成功。"); return innerScope.resolve(null); }); }); const cacheClearJob = scope.launch(ctx => ctx.setName("缓存清理任务"), null, s => { s.delay(200).then((innerScope, _) => { $console.log(" -> 缓存清理时发生了一个预期的错误。"); throw new Error("磁盘空间不足!"); }); }); scope.joinAll([logUploadJob, cacheClearJob]).then((innerScope, _) => { $console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。"); $console.log("现在可以安全地执行后续的清理操作或退出脚本。"); return innerScope.resolve(null); }).onError(error => { $console.error("至少有任务执行失败"); });
--[[ * 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 ]] -- --- 样板代码 --- _threads:getMain():setBackground(true) local scope = _coroutines:newScope(function(context) -- 设置一个异常处理器,来观察失败任务的日志 context:setExceptionHandler(function(ctx, err) _console:error("[作用域异常捕获] 任务 '"..ctx:getName().."' 失败了: "..err:getMessage()) end) end) _console:log("准备启动多个独立的后台任务...") local logUploadJob = scope:launch(function(ctx) ctx:setName("日志上传任务") end, nil, function(s) s:delay(4000):_then(function(innerScope, _) _console:log(" -> 日志上传成功。") return innerScope:resolve(nil) end) end) local cacheClearJob = scope:launch(function(ctx) ctx:setName("缓存清理任务") end, nil, function(s) s:delay(200):_then(function(innerScope, _) _console:log(" -> 缓存清理时发生了一个预期的错误。") -- 在Lua中,使用 error() 来抛出异常 error("磁盘空间不足!") end) end) scope:joinAll(_iterables:listOf(logUploadJob, cacheClearJob)):_then(function(innerScope, _) _console:log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。") _console:log("现在可以安全地执行后续的清理操作或退出脚本。") return innerScope:resolve(nil) end):onError(function() _console:error("至少有任务执行失败") end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; /* * 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 */ // --- 样板代码 --- $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { global $console; $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); // 设置一个异常处理器,来观察失败任务的日志 $context->setExceptionHandler(function ($ctx, $err) use ($console) { $console->error(javaString("[作用域异常捕获] 任务 '") . $ctx->getName() . javaString("' 失败了: ") . $err->getMessage()); }); }); $console->log(javaString("准备启动多个独立的后台任务...")); $logUploadJob = $scope->launch(function ($ctx) { $ctx->setName(javaString("日志上传任务")); }, null, function ($s) { $s->delay(4000)->then(function ($innerScope, $_) { global $console; $console->log(javaString(" -> 日志上传成功。")); return $innerScope->resolve(null); }); }); $cacheClearJob = $scope->launch(function ($ctx) { $ctx->setName(javaString("缓存清理任务")); }, null, function ($s) { $s->delay(200)->then(function ($innerScope, $_) { global $console; global $exceptions; $console->log(javaString(" -> 缓存清理时发生了一个预期的错误。")); return $innerScope->reject($exceptions->newThrowable(function ($eb) { $eb->setMessageAndClassName(javaString("磁盘空间不足!"), null); })); }); }); global $iterables; $scope->joinAll($iterables->listOf($logUploadJob, $cacheClearJob))->then(function ($innerScope, $_) { global $console; $console->log(javaString("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。")); $console->log(javaString("现在可以安全地执行后续的清理操作或退出脚本。")); return $innerScope->resolve(null); })->onError(function () { global $console; $console->error(javaString("至少有任务执行失败")); });
# # 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 # from m8test_java.com.m8test.script.GlobalVariables import _console from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _exceptions # --- 样板代码 --- # 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda context: ( # 设置一个异常处理器,来观察失败任务的日志 context.setExceptionHandler(lambda ctx, err: _console.error( "[作用域异常捕获] 任务 '{}' 失败了: {}".format(ctx.getName(), err.getMessage())) ), context.setDispatcher(lambda dispatchers: dispatchers.getScriptMain()) )) _console.log("准备启动多个独立的后台任务...") logUploadJob = scope.launch(lambda ctx: ctx.setName("日志上传任务"), None, lambda s: s.delay(4000).then(lambda innerScope, _: ( _console.log(" -> 日志上传成功。"), innerScope.resolve(None) )[-1])) cacheClearJob = scope.launch(lambda ctx: ctx.setName("缓存清理任务"), None, lambda s: s.delay(200).then(lambda innerScope, _: ( _console.log(" -> 缓存清理时发生了一个预期的错误。"), innerScope.reject(_exceptions.newThrowable(lambda builder: builder.setMessageAndClassName("磁盘空间不足!", "java.lang.RuntimeException") )) )[-1] )) scope.joinAll([logUploadJob, cacheClearJob]).then(lambda innerScope, _: ( _console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。"), _console.log("现在可以安全地执行后续的清理操作或退出脚本。"), innerScope.resolve(None) )[-1]).onError(lambda error: _console.error("至少有任务执行失败"))
# encoding: utf-8 # # 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束 # # --- 样板代码 --- $threads.getMain().setBackground(true) scope = $coroutines.newScope do |context| # 设置一个异常处理器,来观察失败任务的日志 context.setExceptionHandler do |ctx, err| $console.error("[作用域异常捕获] 任务 '#{ctx.getName()}' 失败了: #{err.getMessage()}") end end $console.log("准备启动多个独立的后台任务...") logUploadJob = scope.launch(lambda { |ctx| ctx.setName("日志上传任务") }, nil) do |s| s.delay(4000).then do |innerScope, _| $console.log(" -> 日志上传成功。") innerScope.resolve(nil) end end cacheClearJob = scope.launch(lambda { |ctx| ctx.setName("缓存清理任务") }, nil) do |s| s.delay(200).then do |innerScope, _| $console.log(" -> 缓存清理时发生了一个预期的错误。") raise RuntimeException.new("磁盘空间不足!") end end scope.joinAll([logUploadJob, cacheClearJob]).then do |innerScope, _| $console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。") $console.log("现在可以安全地执行后续的清理操作或退出脚本。") innerScope.resolve(nil) end.onError do $console.error("至少有任务执行失败") end
09 December 2025