并发编程
awaitAll: 并发获取多个结果
当你需要同时执行多个异步任务,并等待它们全部成功后统一处理结果时,使用 scope.awaitAll()。
并发执行:所有任务同时开始,总耗时取决于最慢的那个任务。
类型安全:要求所有 Deferred 返回相同类型的结果 T,最终结果是 List。
快速失败:只要有一个任务失败,awaitAll 就会立即失败,并取消其他未完成的任务。
import com.m8test.script.GlobalVariables.*
/*
* 完整示例 4.1: 并发获取三个服务器的配置信息
*/
// --- 样板代码 ---
_threads.getMain().setBackground(true)
val scope = _coroutines.newScope(null)
_console.log("准备并发获取多个配置...")
val fetchConfigA = scope.delay(300).then { innerScope, _ -> innerScope.resolve("配置A:加载成功") }
val fetchConfigB = scope.delay(800).then { innerScope, _ -> innerScope.resolve("配置B:加载成功") }
val fetchConfigC = scope.delay(500).then { innerScope, _ -> innerScope.resolve("配置C:加载成功") }
val allConfigsDeferred = scope.awaitAll(listOf(fetchConfigA, fetchConfigB, fetchConfigC))
allConfigsDeferred.then { innerScope, configs ->
// configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
_console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。")
_console.log("结果: " + configs)
innerScope.resolve(null)
}.onError { error ->
_console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage())
}
/*
* 完整示例 4.1: 并发获取三个服务器的配置信息
*/
// --- 样板代码 ---
$threads.getMain().setBackground(true)
def scope = $coroutines.newScope(null)
$console.log("准备并发获取多个配置...")
def fetchConfigA = scope.delay(300).then({ innerScope, _ -> innerScope.resolve("配置A:加载成功") })
def fetchConfigB = scope.delay(800).then({ innerScope, _ -> innerScope.resolve("配置B:加载成功") })
def fetchConfigC = scope.delay(500).then({ innerScope, _ -> innerScope.resolve("配置C:加载成功") })
def allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC])
allConfigsDeferred.then({ innerScope, configs ->
// configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
$console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。")
$console.log("结果: " + configs)
innerScope.resolve(null)
}).onError({ error ->
$console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage())
})
/*
* 完整示例 4.1: 并发获取三个服务器的配置信息
*/
// --- 样板代码 ---
$threads.getMain().setBackground(true);
const scope = $coroutines.newScope(null);
$console.log("准备并发获取多个配置...");
const fetchConfigA = scope.delay(300).then((innerScope, _) => innerScope.resolve("配置A:加载成功"));
const fetchConfigB = scope.delay(800).then((innerScope, _) => innerScope.resolve("配置B:加载成功"));
const fetchConfigC = scope.delay(500).then((innerScope, _) => innerScope.resolve("配置C:加载成功"));
const allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC]);
allConfigsDeferred.then((innerScope, configs) => {
// configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
$console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。");
$console.log("结果: " + configs);
return innerScope.resolve(null);
}).onError(error => {
$console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage());
});
--[[
* 完整示例 4.1: 并发获取三个服务器的配置信息
]]
-- --- 样板代码 ---
_threads:getMain():setBackground(true)
local scope = _coroutines:newScope(nil)
_console:log("准备并发获取多个配置...")
local fetchConfigA = scope:delay(300):_then(function(innerScope, _) return innerScope:resolve("配置A:加载成功") end)
local fetchConfigB = scope:delay(800):_then(function(innerScope, _) return innerScope:resolve("配置B:加载成功") end)
local fetchConfigC = scope:delay(500):_then(function(innerScope, _) return innerScope:resolve("配置C:加载成功") end)
local allConfigsDeferred = scope:awaitAll(_iterables:listOf(fetchConfigA, fetchConfigB, fetchConfigC))
allConfigsDeferred:_then(function(innerScope, configs)
-- configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
_console:log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。")
_console:log("结果: " .. tostring(configs))
return innerScope:resolve(nil)
end):onError(function(error)
_console:error("在并发获取配置时,至少有一个任务失败了: " .. error:getMessage())
end)
<?php
/** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */
global $threads;
/** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */
global $coroutines;
/** @var m8test_java\com\m8test\script\core\api\console\Console $console */
global $console;
/*
* 完整示例 4.1: 并发获取三个服务器的配置信息
*/
// --- 样板代码 ---
$threads->getMain()->setBackground(true);
$scope = $coroutines->newScope(function ($context) {
$context->setDispatcher(function ($dispatchers) {
return $dispatchers->getScriptMain();
});
});
$console->log(javaString("准备并发获取多个配置..."));
$fetchConfigA = $scope->delay(300)->then(function ($innerScope, $_) {
return $innerScope->resolve(javaString("配置A:加载成功"));
});
$fetchConfigB = $scope->delay(800)->then(function ($innerScope, $_) {
return $innerScope->resolve(javaString("配置B:加载成功"));
});
$fetchConfigC = $scope->delay(500)->then(function ($innerScope, $_) {
return $innerScope->resolve(javaString("配置C:加载成功"));
});
global $iterables;
$allConfigsDeferred = $scope->awaitAll($iterables->listOf($fetchConfigA, $fetchConfigB, $fetchConfigC));
$allConfigsDeferred->then(function ($innerScope, $configs) {
global $console;
// configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
$console->log(javaString("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。"));
$console->log(javaString("结果: "), $configs);
return $innerScope->resolve(null);
})->onError(function ($error) {
global $console;
$console->error(javaString("在并发获取配置时,至少有一个任务失败了: ") . $error->getMessage());
});
#
# 完整示例 4.1: 并发获取三个服务器的配置信息
#
# --- 样板代码 ---
# 导入所需的全局变量
from m8test_java.com.m8test.script.GlobalVariables import _threads
from m8test_java.com.m8test.script.GlobalVariables import _coroutines
from m8test_java.com.m8test.script.GlobalVariables import _console
_threads.getMain().setBackground(True)
# 使用您指定的 scope 创建方式
scope = _coroutines.newScope(lambda it:
it.setDispatcher(lambda dispatchers: dispatchers.getScriptMain())
)
_console.log("准备并发获取多个配置...")
# 对于单行返回值的 lambda,可以直接返回值,无需元组
fetchConfigA = scope.delay(300).then(lambda innerScope, _: innerScope.resolve("配置A:加载成功"))
fetchConfigB = scope.delay(800).then(lambda innerScope, _: innerScope.resolve("配置B:加载成功"))
fetchConfigC = scope.delay(500).then(lambda innerScope, _: innerScope.resolve("配置C:加载成功"))
allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC])
allConfigsDeferred.then(lambda innerScope, configs: (
# configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
_console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。"),
# Python中打印列表需要将其转换为字符串
_console.log("结果: " + str(configs)),
innerScope.resolve(None)
# 应用规则:返回元组的最后一个元素
)[-1]).onError(lambda error: (
_console.error("在并发获取配置时,至少有一个任务失败了: " + str(error.getMessage()))
))
# encoding: utf-8
#
# 完整示例 4.1: 并发获取三个服务器的配置信息
#
# --- 样板代码 ---
$threads.getMain().setBackground(true)
scope = $coroutines.newScope(nil)
$console.log("准备并发获取多个配置...")
fetchConfigA = scope.delay(300).then { |innerScope, _| innerScope.resolve("配置A:加载成功") }
fetchConfigB = scope.delay(800).then { |innerScope, _| innerScope.resolve("配置B:加载成功") }
fetchConfigC = scope.delay(500).then { |innerScope, _| innerScope.resolve("配置C:加载成功") }
# 创建数组使用 []
allConfigsDeferred = scope.awaitAll([fetchConfigA, fetchConfigB, fetchConfigC])
allConfigsDeferred.then do |innerScope, configs|
# configs 是一个 List<String>,其元素的顺序与输入列表 [fetchConfigA, fetchConfigB, fetchConfigC] 一致
$console.log("所有配置已成功加载!总耗时约800毫秒(取决于最慢的任务)。")
$console.log("结果: " + configs.to_s)
innerScope.resolve(nil)
end.onError do |error|
$console.error("在并发获取配置时,至少有一个任务失败了: " + error.getMessage())
end
joinAll: 等待多个任务完成
当你只关心一组任务是否全部结束(无论成败),而不关心它们的返回值时,使用 scope.joinAll()。
纯同步点:等待所有 Job 或 Deferred 完成。
如果其中一个任务抛出异常,其他未完成的任务会被取消执行。
import com.m8test.script.GlobalVariables.*
/*
* 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
*/
// --- 样板代码 ---
_threads.getMain().setBackground(true)
// newScope 的 builder 是 Receiver.() -> Unit,所以直接调用 setExceptionHandler
val scope = _coroutines.newScope {
// 设置一个异常处理器,来观察失败任务的日志
setExceptionHandler { ctx, err ->
_console.error("[作用域异常捕获] 任务 '${ctx.getName()}' 失败了: ${err.getMessage()}")
}
}
_console.log("准备启动多个独立的后台任务...")
// 修复:contextBuilder 和 block 都移除了显式参数
val logUploadJob = scope.launch({ setName("日志上传任务") }, null) {
delay(4000).then { innerScope, _ ->
_console.log(" -> 日志上传成功。")
innerScope.resolve(null)
}
}
// 修复:contextBuilder 和 block 都移除了显式参数
val cacheClearJob = scope.launch({ setName("缓存清理任务") }, null) {
delay(200).then<Unit> { innerScope, _ ->
_console.log(" -> 缓存清理时发生了一个预期的错误。")
throw RuntimeException("磁盘空间不足!")
}
}
scope.joinAll(listOf(logUploadJob, cacheClearJob)).then { innerScope, _ ->
_console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。")
_console.log("现在可以安全地执行后续的清理操作或退出脚本。")
innerScope.resolve(null)
}.onError {
_console.error("至少有任务执行失败")
}
/*
* 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
*/
// --- 样板代码 ---
$threads.getMain().setBackground(true)
def scope = $coroutines.newScope({ context ->
// 设置一个异常处理器,来观察失败任务的日志
context.setExceptionHandler({ ctx, err ->
$console.error("[作用域异常捕获] 任务 '${ctx.getName()}' 失败了: ${err.getMessage()}")
})
})
$console.log("准备启动多个独立的后台任务...")
def logUploadJob = scope.launch({ ctx -> ctx.setName("日志上传任务") }, null, { s ->
s.delay(4000).then({ innerScope, _ ->
$console.log(" -> 日志上传成功。")
innerScope.resolve(null)
})
})
def cacheClearJob = scope.launch({ ctx -> ctx.setName("缓存清理任务") }, null, { s ->
s.delay(200).then({ innerScope, _ ->
$console.log(" -> 缓存清理时发生了一个预期的错误。")
throw new RuntimeException("磁盘空间不足!")
})
})
scope.joinAll([logUploadJob, cacheClearJob]).then({ innerScope, _ ->
$console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。")
$console.log("现在可以安全地执行后续的清理操作或退出脚本。")
innerScope.resolve(null)
}).onError {
$console.error("至少有任务执行失败")
}
/*
* 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
*/
// --- 样板代码 ---
$threads.getMain().setBackground(true);
const scope = $coroutines.newScope(context => {
// 设置一个异常处理器,来观察失败任务的日志
context.setExceptionHandler((ctx, err) => {
$console.error(`[作用域异常捕获] 任务 '${ctx.getName()}' 失败了: ${err.getMessage()}`);
});
});
$console.log("准备启动多个独立的后台任务...");
const logUploadJob = scope.launch(ctx => ctx.setName("日志上传任务"), null, s => {
s.delay(4000).then((innerScope, _) => {
$console.log(" -> 日志上传成功。");
return innerScope.resolve(null);
});
});
const cacheClearJob = scope.launch(ctx => ctx.setName("缓存清理任务"), null, s => {
s.delay(200).then((innerScope, _) => {
$console.log(" -> 缓存清理时发生了一个预期的错误。");
throw new Error("磁盘空间不足!");
});
});
scope.joinAll([logUploadJob, cacheClearJob]).then((innerScope, _) => {
$console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。");
$console.log("现在可以安全地执行后续的清理操作或退出脚本。");
return innerScope.resolve(null);
}).onError(error => {
$console.error("至少有任务执行失败");
});
--[[
* 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
]]
-- --- 样板代码 ---
_threads:getMain():setBackground(true)
local scope = _coroutines:newScope(function(context)
-- 设置一个异常处理器,来观察失败任务的日志
context:setExceptionHandler(function(ctx, err)
_console:error("[作用域异常捕获] 任务 '"..ctx:getName().."' 失败了: "..err:getMessage())
end)
end)
_console:log("准备启动多个独立的后台任务...")
local logUploadJob = scope:launch(function(ctx) ctx:setName("日志上传任务") end, nil, function(s)
s:delay(4000):_then(function(innerScope, _)
_console:log(" -> 日志上传成功。")
return innerScope:resolve(nil)
end)
end)
local cacheClearJob = scope:launch(function(ctx) ctx:setName("缓存清理任务") end, nil, function(s)
s:delay(200):_then(function(innerScope, _)
_console:log(" -> 缓存清理时发生了一个预期的错误。")
-- 在Lua中,使用 error() 来抛出异常
error("磁盘空间不足!")
end)
end)
scope:joinAll(_iterables:listOf(logUploadJob, cacheClearJob)):_then(function(innerScope, _)
_console:log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。")
_console:log("现在可以安全地执行后续的清理操作或退出脚本。")
return innerScope:resolve(nil)
end):onError(function()
_console:error("至少有任务执行失败")
end)
<?php
/** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */
global $threads;
/** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */
global $coroutines;
/** @var m8test_java\com\m8test\script\core\api\console\Console $console */
global $console;
/*
* 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
*/
// --- 样板代码 ---
$threads->getMain()->setBackground(true);
$scope = $coroutines->newScope(function ($context) {
global $console;
$context->setDispatcher(function ($dispatchers) {
return $dispatchers->getScriptMain();
});
// 设置一个异常处理器,来观察失败任务的日志
$context->setExceptionHandler(function ($ctx, $err) use ($console) {
$console->error(javaString("[作用域异常捕获] 任务 '") . $ctx->getName() . javaString("' 失败了: ") . $err->getMessage());
});
});
$console->log(javaString("准备启动多个独立的后台任务..."));
$logUploadJob = $scope->launch(function ($ctx) {
$ctx->setName(javaString("日志上传任务"));
}, null, function ($s) {
$s->delay(4000)->then(function ($innerScope, $_) {
global $console;
$console->log(javaString(" -> 日志上传成功。"));
return $innerScope->resolve(null);
});
});
$cacheClearJob = $scope->launch(function ($ctx) {
$ctx->setName(javaString("缓存清理任务"));
}, null, function ($s) {
$s->delay(200)->then(function ($innerScope, $_) {
global $console;
global $exceptions;
$console->log(javaString(" -> 缓存清理时发生了一个预期的错误。"));
return $innerScope->reject($exceptions->newThrowable(function ($eb) {
$eb->setMessageAndClassName(javaString("磁盘空间不足!"), null);
}));
});
});
global $iterables;
$scope->joinAll($iterables->listOf($logUploadJob, $cacheClearJob))->then(function ($innerScope, $_) {
global $console;
$console->log(javaString("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。"));
$console->log(javaString("现在可以安全地执行后续的清理操作或退出脚本。"));
return $innerScope->resolve(null);
})->onError(function () {
global $console;
$console->error(javaString("至少有任务执行失败"));
});
#
# 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
#
from m8test_java.com.m8test.script.GlobalVariables import _console
from m8test_java.com.m8test.script.GlobalVariables import _coroutines
from m8test_java.com.m8test.script.GlobalVariables import _exceptions
# --- 样板代码 ---
# 导入所需的全局变量
from m8test_java.com.m8test.script.GlobalVariables import _threads
_threads.getMain().setBackground(True)
scope = _coroutines.newScope(lambda context: (
# 设置一个异常处理器,来观察失败任务的日志
context.setExceptionHandler(lambda ctx, err:
_console.error(
"[作用域异常捕获] 任务 '{}' 失败了: {}".format(ctx.getName(), err.getMessage()))
),
context.setDispatcher(lambda dispatchers: dispatchers.getScriptMain())
))
_console.log("准备启动多个独立的后台任务...")
logUploadJob = scope.launch(lambda ctx: ctx.setName("日志上传任务"), None, lambda s:
s.delay(4000).then(lambda innerScope, _: (
_console.log(" -> 日志上传成功。"),
innerScope.resolve(None)
)[-1]))
cacheClearJob = scope.launch(lambda ctx: ctx.setName("缓存清理任务"), None, lambda s:
s.delay(200).then(lambda innerScope, _: (
_console.log(" -> 缓存清理时发生了一个预期的错误。"),
innerScope.reject(_exceptions.newThrowable(lambda builder:
builder.setMessageAndClassName("磁盘空间不足!",
"java.lang.RuntimeException")
))
)[-1]
))
scope.joinAll([logUploadJob, cacheClearJob]).then(lambda innerScope, _: (
_console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。"),
_console.log("现在可以安全地执行后续的清理操作或退出脚本。"),
innerScope.resolve(None)
)[-1]).onError(lambda error: _console.error("至少有任务执行失败"))
# encoding: utf-8
#
# 完整示例 4.2: 启动多个后台任务(日志上报、文件清理),并等待它们全部结束
#
# --- 样板代码 ---
$threads.getMain().setBackground(true)
scope = $coroutines.newScope do |context|
# 设置一个异常处理器,来观察失败任务的日志
context.setExceptionHandler do |ctx, err|
$console.error("[作用域异常捕获] 任务 '#{ctx.getName()}' 失败了: #{err.getMessage()}")
end
end
$console.log("准备启动多个独立的后台任务...")
logUploadJob = scope.launch(lambda { |ctx| ctx.setName("日志上传任务") }, nil) do |s|
s.delay(4000).then do |innerScope, _|
$console.log(" -> 日志上传成功。")
innerScope.resolve(nil)
end
end
cacheClearJob = scope.launch(lambda { |ctx| ctx.setName("缓存清理任务") }, nil) do |s|
s.delay(200).then do |innerScope, _|
$console.log(" -> 缓存清理时发生了一个预期的错误。")
raise RuntimeException.new("磁盘空间不足!")
end
end
scope.joinAll([logUploadJob, cacheClearJob]).then do |innerScope, _|
$console.log("\n✅ joinAll 完成!这意味着所有后台任务都已经结束了(无论成功或失败)。")
$console.log("现在可以安全地执行后续的清理操作或退出脚本。")
innerScope.resolve(nil)
end.onError do
$console.error("至少有任务执行失败")
end
09 December 2025