import com.m8test.script.GlobalVariables.*
// 确保脚本在后台运行以等待协程完成
_threads.getMain().setBackground(true)
// 创建一个新的协程作用域
val scope = _coroutines.newScope(null)
_console.info("--- 章节 1.1:从固定元素创建 Flow ---")
// 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
val numberFlow = _coroutines.getFlows().of(1, 2, 3, 4, 5)
_console.log("Flow 已创建,但尚未执行。")
// 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
// collect 的回调函数接收 (CoroutineScope, value) 两个参数
// 并需要返回一个 Job 来表示对该值的处理状态
val collectJob = numberFlow.collect(scope) { collectScope, value ->
_console.log("接收到值: ${value}")
// 对于简单的同步处理,返回一个已完成的 Deferred 即可
collectScope.resolve(null)
}
// 当收集完成时打印日志
collectJob.invokeOnCompletion { error ->
if (error == null) {
_console.info("Flow 消费完成!")
} else {
_console.error("Flow 消费出错: ${error.getMessage()}")
}
}
// 确保脚本在后台运行以等待协程完成
$threads.getMain().setBackground(true)
// 创建一个新的协程作用域
def scope = $coroutines.newScope(null)
$console.info("--- 章节 1.1:从固定元素创建 Flow ---")
// 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
def numberFlow = $coroutines.getFlows().of(1, 2, 3, 4, 5)
$console.log("Flow 已创建,但尚未执行。")
// 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
// collect 的回调函数接收 (CoroutineScope, value) 两个参数
// 并需要返回一个 Job 来表示对该值的处理状态
def collectJob = numberFlow.collect(scope, { collectScope, value ->
$console.log("接收到值: ${value}")
// 对于简单的同步处理,返回一个已完成的 Deferred 即可
return collectScope.resolve(null)
})
// 当收集完成时打印日志
collectJob.invokeOnCompletion({ error ->
if (error == null) {
$console.info("Flow 消费完成!")
} else {
$console.error("Flow 消费出错: ${error.getMessage()}")
}
})
// 确保脚本在后台运行以等待协程完成
$threads.getMain().setBackground(true);
// 创建一个新的协程作用域
const scope = $coroutines.newScope(null);
$console.info("--- 章节 1.1:从固定元素创建 Flow ---");
// 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
const numberFlow = $coroutines.getFlows().of(1, 2, 3, 4, 5);
$console.log("Flow 已创建,但尚未执行。");
// 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
// collect 的回调函数接收 (CoroutineScope, value) 两个参数
// 并需要返回一个 Job 来表示对该值的处理状态
const collectJob = numberFlow.collect(scope, (collectScope, value) => {
$console.log(`接收到值: ${value}`);
// 对于简单的同步处理,返回一个已完成的 Deferred 即可
return collectScope.resolve(null);
});
// 当收集完成时打印日志
collectJob.invokeOnCompletion(error => {
if (error == null) {
$console.info("Flow 消费完成!");
} else {
$console.error(`Flow 消费出错: ${error.getMessage()}`);
}
});
-- 确保脚本在后台运行以等待协程完成
_threads:getMain():setBackground(true)
-- 创建一个新的协程作用域
local scope = _coroutines:newScope(nil)
_console:info("--- 章节 1.1:从固定元素创建 Flow ---")
-- 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
local numberFlow = _coroutines:getFlows():of(1, 2, 3, 4, 5)
_console:log("Flow 已创建,但尚未执行。")
-- 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
-- collect 的回调函数接收 (CoroutineScope, value) 两个参数
-- 并需要返回一个 Job 来表示对该值的处理状态
local collectJob = numberFlow:collect(scope, function(collectScope, value)
_console:log("接收到值: " .. tostring(value))
-- 对于简单的同步处理,返回一个已完成的 Deferred 即可
return collectScope:resolve(nil)
end)
-- 当收集完成时打印日志
collectJob:invokeOnCompletion(function(error)
if error == nil then
_console:info("Flow 消费完成!")
else
_console:error("Flow 消费出错: " .. error:getMessage())
end
end)
<?php
/** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */
global $threads;
/** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */
global $coroutines;
/** @var m8test_java\com\m8test\script\core\api\console\Console $console */
global $console;
// 确保脚本在后台运行以等待协程完成
$threads->getMain()->setBackground(true);
// 创建一个新的协程作用域
$scope = $coroutines->newScope(function ($context) {
$context->setDispatcher(function ($dispatchers) {
return $dispatchers->getScriptMain();
});
});
$console->info(javaString("--- 章节 1.1:从固定元素创建 Flow ---"));
// 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
$numberFlow = $coroutines->getFlows()->of(1, 2, 3, 4, 5);
$console->log(javaString("Flow 已创建,但尚未执行。"));
// 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
// collect 的回调函数接收 (CoroutineScope, value) 两个参数
// 并需要返回一个 Job 来表示对该值的处理状态
$collectJob = $numberFlow->collect($scope, function ($collectScope, $value) {
global $console;
$console->log(javaString("接收到值: {$value}"));
// 对于简单的同步处理,返回一个已完成的 Deferred 即可
return $collectScope->resolve(null);
});
// 当收集完成时打印日志
$collectJob->invokeOnCompletion(function ($error) {
global $console;
if ($error == null) {
$console->info(javaString("Flow 消费完成!"));
} else {
$console->error(javaString("Flow 消费出错: {$error->getMessage()}"));
}
});
# 导入所需的全局变量
from m8test_java.com.m8test.script.GlobalVariables import _threads
from m8test_java.com.m8test.script.GlobalVariables import _coroutines
from m8test_java.com.m8test.script.GlobalVariables import _console
# 确保脚本在后台运行以等待协程完成
_threads.getMain().setBackground(True)
# 创建一个新的协程作用域
scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain()))
_console.info("--- 章节 1.1:从固定元素创建 Flow ---")
# 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
numberFlow = _coroutines.getFlows().of(1, 2, 3, 4, 5)
_console.log("Flow 已创建,但尚未执行。")
# 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
# collect 的回调函数接收 (CoroutineScope, value) 两个参数
# 并需要返回一个 Job 来表示对该值的处理状态
collectJob = numberFlow.collect(scope, lambda collectScope, value: (
_console.log("接收到值: " + str(value)),
# 对于简单的同步处理,返回一个已完成的 Deferred 即可
collectScope.resolve(None)
)[-1])
# 当收集完成时打印日志
collectJob.invokeOnCompletion(lambda error: (
_console.info("Flow 消费完成!") if error is None else
_console.error("Flow 消费出错: " + str(error.getMessage()))
))
# encoding: utf-8
# 确保脚本在后台运行以等待协程完成
$threads.getMain().setBackground(true)
# 创建一个新的协程作用域
scope = $coroutines.newScope(nil)
$console.info("--- 章节 1.1:从固定元素创建 Flow ---")
# 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow
numberFlow = $coroutines.getFlows().of(1, 2, 3, 4, 5)
$console.log("Flow 已创建,但尚未执行。")
# 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行
# collect 的回调函数接收 (CoroutineScope, value) 两个参数
# 并需要返回一个 Job 来表示对该值的处理状态
collectJob = numberFlow.collect(scope) do |collectScope, value|
$console.log("接收到值: #{value}")
# 对于简单的同步处理,返回一个已完成的 Deferred 即可
collectScope.resolve(nil) # Ruby 默认返回最后一行,所以这里 return 可以省略
end
# 当收集完成时打印日志
collectJob.invokeOnCompletion do |error|
if error == nil
$console.info("Flow 消费完成!")
else
$console.error("Flow 消费出错: #{error.getMessage()}")
end
end