M8Test Help

Flow 使用指南

Flow 是协程中用于处理异步数据流的核心组件。它就像一个可以异步、按需生产一系列值的“管道”。与一次性返回单个结果的 Deferred 不同,Flow 可以随时间发出零个、一个或多个值。 本指南将通过一系列可运行的 脚本示例,带您了解如何创建、转换和消费 Flow。

Flow 的创建与基础消费

Flow 的所有创建操作都始于 $coroutines.getFlows()。一个 Flow 在被“消费”(或称“收集”)之前,是冷的——它内部的代码不会执行。只有当调用终端操作符(如 collect)时,数据才会开始流动。

从固定元素创建 Flow (of)

这是创建 Flow 最简单的方式,适用于你已经知道所有要发出的值的情况。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行以等待协程完成 _threads.getMain().setBackground(true) // 创建一个新的协程作用域 val scope = _coroutines.newScope(null) _console.info("--- 章节 1.1:从固定元素创建 Flow ---") // 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow val numberFlow = _coroutines.getFlows().of(1, 2, 3, 4, 5) _console.log("Flow 已创建,但尚未执行。") // 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 // collect 的回调函数接收 (CoroutineScope, value) 两个参数 // 并需要返回一个 Job 来表示对该值的处理状态 val collectJob = numberFlow.collect(scope) { collectScope, value -> _console.log("接收到值: ${value}") // 对于简单的同步处理,返回一个已完成的 Deferred 即可 collectScope.resolve(null) } // 当收集完成时打印日志 collectJob.invokeOnCompletion { error -> if (error == null) { _console.info("Flow 消费完成!") } else { _console.error("Flow 消费出错: ${error.getMessage()}") } }
// 确保脚本在后台运行以等待协程完成 $threads.getMain().setBackground(true) // 创建一个新的协程作用域 def scope = $coroutines.newScope(null) $console.info("--- 章节 1.1:从固定元素创建 Flow ---") // 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow def numberFlow = $coroutines.getFlows().of(1, 2, 3, 4, 5) $console.log("Flow 已创建,但尚未执行。") // 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 // collect 的回调函数接收 (CoroutineScope, value) 两个参数 // 并需要返回一个 Job 来表示对该值的处理状态 def collectJob = numberFlow.collect(scope, { collectScope, value -> $console.log("接收到值: ${value}") // 对于简单的同步处理,返回一个已完成的 Deferred 即可 return collectScope.resolve(null) }) // 当收集完成时打印日志 collectJob.invokeOnCompletion({ error -> if (error == null) { $console.info("Flow 消费完成!") } else { $console.error("Flow 消费出错: ${error.getMessage()}") } })
// 确保脚本在后台运行以等待协程完成 $threads.getMain().setBackground(true); // 创建一个新的协程作用域 const scope = $coroutines.newScope(null); $console.info("--- 章节 1.1:从固定元素创建 Flow ---"); // 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow const numberFlow = $coroutines.getFlows().of(1, 2, 3, 4, 5); $console.log("Flow 已创建,但尚未执行。"); // 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 // collect 的回调函数接收 (CoroutineScope, value) 两个参数 // 并需要返回一个 Job 来表示对该值的处理状态 const collectJob = numberFlow.collect(scope, (collectScope, value) => { $console.log(`接收到值: ${value}`); // 对于简单的同步处理,返回一个已完成的 Deferred 即可 return collectScope.resolve(null); }); // 当收集完成时打印日志 collectJob.invokeOnCompletion(error => { if (error == null) { $console.info("Flow 消费完成!"); } else { $console.error(`Flow 消费出错: ${error.getMessage()}`); } });
-- 确保脚本在后台运行以等待协程完成 _threads:getMain():setBackground(true) -- 创建一个新的协程作用域 local scope = _coroutines:newScope(nil) _console:info("--- 章节 1.1:从固定元素创建 Flow ---") -- 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow local numberFlow = _coroutines:getFlows():of(1, 2, 3, 4, 5) _console:log("Flow 已创建,但尚未执行。") -- 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 -- collect 的回调函数接收 (CoroutineScope, value) 两个参数 -- 并需要返回一个 Job 来表示对该值的处理状态 local collectJob = numberFlow:collect(scope, function(collectScope, value) _console:log("接收到值: " .. tostring(value)) -- 对于简单的同步处理,返回一个已完成的 Deferred 即可 return collectScope:resolve(nil) end) -- 当收集完成时打印日志 collectJob:invokeOnCompletion(function(error) if error == nil then _console:info("Flow 消费完成!") else _console:error("Flow 消费出错: " .. error:getMessage()) end end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行以等待协程完成 $threads->getMain()->setBackground(true); // 创建一个新的协程作用域 $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 1.1:从固定元素创建 Flow ---")); // 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow $numberFlow = $coroutines->getFlows()->of(1, 2, 3, 4, 5); $console->log(javaString("Flow 已创建,但尚未执行。")); // 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 // collect 的回调函数接收 (CoroutineScope, value) 两个参数 // 并需要返回一个 Job 来表示对该值的处理状态 $collectJob = $numberFlow->collect($scope, function ($collectScope, $value) { global $console; $console->log(javaString("接收到值: {$value}")); // 对于简单的同步处理,返回一个已完成的 Deferred 即可 return $collectScope->resolve(null); }); // 当收集完成时打印日志 $collectJob->invokeOnCompletion(function ($error) { global $console; if ($error == null) { $console->info(javaString("Flow 消费完成!")); } else { $console->error(javaString("Flow 消费出错: {$error->getMessage()}")); } });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console # 确保脚本在后台运行以等待协程完成 _threads.getMain().setBackground(True) # 创建一个新的协程作用域 scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 1.1:从固定元素创建 Flow ---") # 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow numberFlow = _coroutines.getFlows().of(1, 2, 3, 4, 5) _console.log("Flow 已创建,但尚未执行。") # 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 # collect 的回调函数接收 (CoroutineScope, value) 两个参数 # 并需要返回一个 Job 来表示对该值的处理状态 collectJob = numberFlow.collect(scope, lambda collectScope, value: ( _console.log("接收到值: " + str(value)), # 对于简单的同步处理,返回一个已完成的 Deferred 即可 collectScope.resolve(None) )[-1]) # 当收集完成时打印日志 collectJob.invokeOnCompletion(lambda error: ( _console.info("Flow 消费完成!") if error is None else _console.error("Flow 消费出错: " + str(error.getMessage())) ))
# encoding: utf-8 # 确保脚本在后台运行以等待协程完成 $threads.getMain().setBackground(true) # 创建一个新的协程作用域 scope = $coroutines.newScope(nil) $console.info("--- 章节 1.1:从固定元素创建 Flow ---") # 使用 of 创建一个发出 1, 2, 3, 4, 5 的 Flow numberFlow = $coroutines.getFlows().of(1, 2, 3, 4, 5) $console.log("Flow 已创建,但尚未执行。") # 使用 collect 消费 Flow,这是终端操作符,会触发 Flow 的执行 # collect 的回调函数接收 (CoroutineScope, value) 两个参数 # 并需要返回一个 Job 来表示对该值的处理状态 collectJob = numberFlow.collect(scope) do |collectScope, value| $console.log("接收到值: #{value}") # 对于简单的同步处理,返回一个已完成的 Deferred 即可 collectScope.resolve(nil) # Ruby 默认返回最后一行,所以这里 return 可以省略 end # 当收集完成时打印日志 collectJob.invokeOnCompletion do |error| if error == nil $console.info("Flow 消费完成!") else $console.error("Flow 消费出错: #{error.getMessage()}") end end

从集合创建 Flow (from)

你可以轻易地将任何 Iterable 对象(如 List 或 Set)转换为一个 Flow。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 1.2:从集合创建 Flow ---") val fruitList = _iterables.listOf("苹果", "香蕉", "橙子") val fruitFlow = _coroutines.getFlows().from(fruitList) fruitFlow.collect(scope) { s, fruit -> _console.log("今天的水果: ${fruit}") s.resolve(null) }.invokeOnCompletion { _console.info("水果 Flow 消费完成。") }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 1.2:从集合创建 Flow ---") def fruitList = $iterables.listOf("苹果", "香蕉", "橙子") def fruitFlow = $coroutines.getFlows().from(fruitList) fruitFlow.collect(scope, { s, fruit -> $console.log("今天的水果: ${fruit}") s.resolve(null) }).invokeOnCompletion({ $console.info("水果 Flow 消费完成。") })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 1.2:从集合创建 Flow ---"); const fruitList = $iterables.listOf("苹果", "香蕉", "橙子"); const fruitFlow = $coroutines.getFlows().from(fruitList); fruitFlow.collect(scope, (s, fruit) => { $console.log(`今天的水果: ${fruit}`); return s.resolve(null); }).invokeOnCompletion(() => { $console.info("水果 Flow 消费完成。"); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 1.2:从集合创建 Flow ---") local fruitList = _iterables:listOf("苹果", "香蕉", "橙子") local fruitFlow = _coroutines:getFlows():from(fruitList) fruitFlow:collect(scope, function(s, fruit) _console:log("今天的水果: " .. fruit) return s:resolve(nil) end):invokeOnCompletion(function() _console:info("水果 Flow 消费完成。") end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; /** @var m8test_java\com\m8test\script\core\api\collections\Iterables $iterables */ global $iterables; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 1.2:从集合创建 Flow ---")); $fruitList = $iterables->listOf(javaString("苹果"), javaString("香蕉"), javaString("橙子")); $fruitFlow = $coroutines->getFlows()->from($fruitList); $fruitFlow->collect($scope, function ($s, $fruit) { global $console; $console->log(javaString("今天的水果: {$fruit}")); return $s->resolve(null); })->invokeOnCompletion(function() { global $console; $console->info(javaString("水果 Flow 消费完成。")); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _console from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _iterables from m8test_java.com.m8test.script.GlobalVariables import _threads # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 1.2:从集合创建 Flow ---") fruitList = _iterables.listOf("苹果", "香蕉", "橙子") fruitFlow = _coroutines.getFlows()._from(fruitList) # 在Python中,from是关键字,API可能会使用from_ (fruitFlow.collect(scope, lambda s, fruit: ( _console.log("今天的水果: {}".format(fruit)), s.resolve(None) )[-1]) .invokeOnCompletion(lambda _: _console.info("水果 Flow 消费完成。")))
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 1.2:从集合创建 Flow ---") fruitList = $iterables.listOf("苹果", "香蕉", "橙子") fruitFlow = $coroutines.getFlows().from(fruitList) fruitFlow.collect(scope) do |s, fruit| $console.log("今天的水果: #{fruit}") s.resolve(nil) end.invokeOnCompletion do |error| $console.info("水果 Flow 消费完成。") end

创建一个定时发射的 Flow (interval)

interval 可以创建一个按指定周期无限发出递增数字的 Flow。这对于模拟定时任务或事件流非常有用。由于它是无限的,我们需要手动管理其生命周期。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 1.3:创建定时 Flow ---") // 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) val intervalFlow = _coroutines.getFlows().interval(1000, 2000) _console.log("定时 Flow 已创建,将在 1 秒后开始发射。") // 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 val intervalJob = intervalFlow.launchIn(scope) // 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) intervalFlow.onEach { s, tick -> _console.log("Tick: ${tick}") s.resolve(null) }.launchIn(scope) _console.log("脚本将在 7 秒后取消协程作用域,停止定时 Flow。") // 延迟 7 秒后取消整个作用域 scope.delay(7000).then { s, _ -> _console.warn("正在取消作用域...") // 取消 scope 会自动取消所有在其内部启动的协程和 Flow scope.cancel("测试结束") _console.info("作用域已取消,定时 Flow 停止。") s.resolve(null) }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 1.3:创建定时 Flow ---") // 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) def intervalFlow = $coroutines.getFlows().interval(1000, 2000) $console.log("定时 Flow 已创建,将在 1 秒后开始发射。") // 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 def intervalJob = intervalFlow.launchIn(scope) // 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) intervalFlow.onEach({ s, tick -> $console.log("Tick: ${tick}") s.resolve(null) }).launchIn(scope) $console.log("脚本将在 7 秒后取消协程作用域,停止定时 Flow。") // 延迟 7 秒后取消整个作用域 scope.delay(7000).then({ s, _ -> $console.warn("正在取消作用域...") // 取消 scope 会自动取消所有在其内部启动的协程和 Flow scope.cancel("测试结束") $console.info("作用域已取消,定时 Flow 停止。") })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 1.3:创建定时 Flow ---"); // 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) const intervalFlow = $coroutines.getFlows().interval(1000, 2000); $console.log("定时 Flow 已创建,将在 1 秒后开始发射。"); // 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 const intervalJob = intervalFlow.launchIn(scope); // 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) intervalFlow.onEach((s, tick) => { $console.log(`Tick: ${tick}`); return s.resolve(null); }).launchIn(scope); $console.log("脚本将在 7 秒后取消协程作用域,停止定时 Flow。"); // 延迟 7 秒后取消整个作用域 scope.delay(7000).then((s, _) => { $console.warn("正在取消作用域..."); // 取消 scope 会自动取消所有在其内部启动的协程和 Flow scope.cancel("测试结束"); $console.info("作用域已取消,定时 Flow 停止。"); // then 链的末端最好也返回一个 Deferred return s.resolve(null); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 1.3:创建定时 Flow ---") -- 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) local intervalFlow = _coroutines:getFlows():interval(1000, 2000) _console:log("定时 Flow 已创建,将在 1 秒后开始发射。") -- 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 local intervalJob = intervalFlow:launchIn(scope) -- 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) intervalFlow:onEach(function(s, tick) _console:log("Tick: " .. tostring(tick)) return s:resolve(nil) end):launchIn(scope) _console:log("脚本将在 7 秒后取消协程作用域,停止定时 Flow。") -- 延迟 7 秒后取消整个作用域 scope:delay(7000):_then(function(s, _) _console:warn("正在取消作用域...") -- 取消 scope 会自动取消所有在其内部启动的协程和 Flow scope:cancel("测试结束") _console:info("作用域已取消,定时 Flow 停止。") end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 1.3:创建定时 Flow ---")); // 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) $intervalFlow = $coroutines->getFlows()->interval(1000, 2000); $console->log(javaString("定时 Flow 已创建,将在 1 秒后开始发射。")); // 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 $intervalJob = $intervalFlow->launchIn($scope); // 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) $intervalFlow->onEach(function ($s, $tick) { global $console; $console->log(javaString("Tick: {$tick}")); return $s->resolve(null); })->launchIn($scope); $console->log(javaString("脚本将在 7 秒后取消协程作用域,停止定时 Flow。")); // 延迟 7 秒后取消整个作用域 $scope->delay(7000)->then(function ($s, $_) use ($scope) { global $console; $console->warn(javaString("正在取消作用域...")); // 取消 scope 会自动取消所有在其内部启动的协程和 Flow $scope->cancel(javaString("测试结束")); $console->info(javaString("作用域已取消,定时 Flow 停止。")); return $s->resolve(null); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 1.3:创建定时 Flow ---") # 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) intervalFlow = _coroutines.getFlows().interval(1000, 2000) _console.log("定时 Flow 已创建,将在 1 秒后开始发射。") # 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 intervalJob = intervalFlow.launchIn(scope) # 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) intervalFlow.onEach(lambda s, tick: ( _console.log("Tick: {}".format(tick)), s.resolve(None) )[-1]).launchIn(scope) _console.log("脚本将在 7 秒后取消协程作用域,停止定时 Flow。") # 延迟 7 秒后取消整个作用域 scope.delay(7000).then(lambda s, _: ( _console.warn("正在取消作用域..."), # 取消 scope 会自动取消所有在其内部启动的协程和 Flow scope.cancel("测试结束"), _console.info("作用域已取消,定时 Flow 停止。"), s.resolve(None) )[-1])
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 1.3:创建定时 Flow ---") # 创建一个 Flow,延迟 1 秒后开始,之后每 2 秒发出一个递增的数字 (0, 1, 2, ...) intervalFlow = $coroutines.getFlows().interval(1000, 2000) $console.log("定时 Flow 已创建,将在 1 秒后开始发射。") # 使用 launchIn 在后台启动 Flow 的收集,它是一个“即发即忘”的终端操作符 intervalJob = intervalFlow.launchIn(scope) # 监听 Flow 中每个发出的值(使用 onEach 操作符,详见后续章节) intervalFlow.onEach(lambda do |s, tick| $console.log("Tick: #{tick}") s.resolve(nil) end).launchIn(scope) $console.log("脚本将在 7 秒后取消协程作用域,停止定时 Flow。") # 延迟 7 秒后取消整个作用域 scope.delay(7000).then do |s, _| $console.warn("正在取消作用域...") # 取消 scope 会自动取消所有在其内部启动的协程和 Flow scope.cancel("测试结束") $console.info("作用域已取消,定时 Flow 停止。") end

转换 Flow (中间操作符)

中间操作符允许你像操作集合一样,对 Flow 发出的数据进行转换、过滤和处理。这些操作符本身也是冷的,它们返回一个新的 Flow 实例,并且只有在最终被消费时才会执行。

映射值(map)

map 操作符会对 Flow 中的每个元素应用一个转换函数。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 2.1:使用 map 转换值 ---") _coroutines.getFlows().of(1, 2, 3) .map { s, value -> // map 的转换函数需要返回一个 Deferred s.resolve("数字 ${value} 的平方是 ${value * value}") } .collect(scope) { s, resultString -> _console.log(resultString) s.resolve(null) }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 2.1:使用 map 转换值 ---") $coroutines.getFlows().of(1, 2, 3) .map({ s, value -> // map 的转换函数需要返回一个 Deferred s.resolve("数字 ${value} 的平方是 ${value * value}") }) .collect(scope, { s, resultString -> $console.log(resultString) s.resolve(null) })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 2.1:使用 map 转换值 ---"); $coroutines.getFlows().of(1, 2, 3) .map((s, value) => { // map 的转换函数需要返回一个 Deferred return s.resolve(`数字 ${value} 的平方是 ${value * value}`); }) .collect(scope, (s, resultString) => { $console.log(resultString); return s.resolve(null); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 2.1:使用 map 转换值 ---") _coroutines:getFlows():of(1, 2, 3) :map(function(s, value) -- map 的转换函数需要返回一个 Deferred return s:resolve("数字 " .. tostring(value) .. " 的平方是 " .. tostring(value * value)) end) :collect(scope, function(s, resultString) _console:log(resultString) return s:resolve(nil) end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 2.1:使用 map 转换值 ---")); $coroutines->getFlows()->of(1, 2, 3) ->map(function ($s, $value) { // map 的转换函数需要返回一个 Deferred return $s->resolve(javaString("数字 {$value} 的平方是 " . ($value * $value))); }) ->collect($scope, function ($s, $resultString) { global $console; $console->log($resultString); return $s->resolve(null); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 2.1:使用 map 转换值 ---") _coroutines.getFlows().of(1, 2, 3).map( # map 的转换函数需要返回一个 Deferred, 这是单个表达式,直接返回 lambda s, value: s.resolve("数字 {} 的平方是 {}".format(value, value * value)) ).collect(scope, lambda s, resultString: ( _console.log(resultString), s.resolve(None) )[-1])
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 2.1:使用 map 转换值 ---") $coroutines.getFlows().of(1, 2, 3) .map { |s, value| # map 的转换函数需要返回一个 Deferred s.resolve("数字 #{value} 的平方是 #{value * value}") } .collect(scope) { |s, resultString| $console.log(resultString) s.resolve(nil) }

过滤值 (filter)

filter 操作符只允许满足条件的元素通过。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 2.2:使用 filter 过滤值 ---") // Kotlin 的 Range 1..10 也是 Iterable _coroutines.getFlows().from(1..10) .filter { s, number -> // filter 的判断函数需要返回一个 Deferred<Boolean> s.resolve(number % 2 == 0) // 只保留偶数 } .map { s, evenNumber -> s.resolve("发现一个偶数: ${evenNumber}") } .collect(scope) { s, result -> _console.log(result) s.resolve(null) }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 2.2:使用 filter 过滤值 ---") $coroutines.getFlows().from(1..10) .filter({ s, number -> // filter 的判断函数需要返回一个 Deferred<Boolean> s.resolve(number % 2 == 0) // 只保留偶数 }) .map({ s, evenNumber -> s.resolve("发现一个偶数: ${evenNumber}") }) .collect(scope, { s, result -> $console.log(result) s.resolve(null) })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 2.2:使用 filter 过滤值 ---"); // Groovy 的 1..10 范围转换为 JS 数组 const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; $coroutines.getFlows().from(numbers) .filter((s, number) => { // filter 的判断函数需要返回一个 Deferred<Boolean> return s.resolve(number % 2 === 0); // 只保留偶数 }) .map((s, evenNumber) => { return s.resolve(`发现一个偶数: ${evenNumber}`); }) .collect(scope, (s, result) => { $console.log(result); return s.resolve(null); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 2.2:使用 filter 过滤值 ---") _coroutines:getFlows():from(_iterables:listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) :filter(function(s, number) -- filter 的判断函数需要返回一个 Deferred<Boolean> return s:resolve(number % 2 == 0) -- 只保留偶数 end) :map(function(s, evenNumber) return s:resolve("发现一个偶数: " .. tostring(evenNumber)) end) :collect(scope, function(s, result) _console:log(result) return s:resolve(nil) end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 2.2:使用 filter 过滤值 ---")); global $iterables; $coroutines->getFlows()->from($iterables->listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) ->filter(function ($s, $number) { // filter 的判断函数需要返回一个 Deferred<Boolean> return $s->resolve($number % 2 == 0); // 只保留偶数 }) ->map(function ($s, $evenNumber) { return $s->resolve(javaString("发现一个偶数: {$evenNumber}")); }) ->collect($scope, function ($s, $result) { global $console; $console->log($result); return $s->resolve(null); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _console from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _iterables from m8test_java.com.m8test.script.GlobalVariables import _threads # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 2.2:使用 filter 过滤值 ---") # Groovy的 1..10 对应 Python的 range(1, 11) _coroutines.getFlows()._from(_iterables.listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).filter( # filter 的判断函数需要返回一个 Deferred<Boolean>, 单个表达式,直接返回 lambda s, number: s.resolve(number % 2 == 0) # 只保留偶数 ).map( # map 的转换函数,单个表达式,直接返回 lambda s, evenNumber: s.resolve("发现一个偶数: {}".format(evenNumber)) ).collect(scope, lambda s, result: ( _console.log(result), s.resolve(None) )[-1])
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 2.2:使用 filter 过滤值 ---") # 修复说明: # 1. 使用 (1..10).to_a 将 Range 转换为 Array (对应 Java List),避免类型推断问题。 # 2. 使用 proc { ... } 替代 lambda { ... }。在 Android JRuby 环境中,proc 更容易通过 Proxy 机制实现 Java 接口,避免类加载错误。 $coroutines.getFlows().from((1..10).to_a) .filter(proc { |s, number| # filter 的判断函数需要返回一个 Deferred<Boolean> # 这里的 number 是 Fixnum,可以直接进行模运算 s.resolve(number % 2 == 0) # 只保留偶数 }) .map(proc { |s, evenNumber| s.resolve("发现一个偶数: #{evenNumber}") }) .collect(scope, proc { |s, result| $console.log(result) s.resolve(nil) })

执行副作用 (onEach)

onEach 可以在不改变 Flow 内容的情况下,对每个元素执行一个操作,非常适合用于调试或日志记录。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 2.3:使用 onEach 执行副作用 ---") _coroutines.getFlows().of("A", "B", "C") .onEach { s, item -> _console.log("onEach - 正在处理: ${item}") s.resolve(null) } .map { s, item -> s.resolve("转换后的值: [${item}]") } .collect(scope) { s, result -> _console.log("collect - 最终结果: ${result}") s.resolve(null) }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 2.3:使用 onEach 执行副作用 ---") $coroutines.getFlows().of("A", "B", "C") .onEach({ s, item -> $console.log("onEach - 正在处理: ${item}") s.resolve(null) }) .map({ s, item -> s.resolve("转换后的值: [${item}]") }) .collect(scope, { s, result -> $console.log("collect - 最终结果: ${result}") s.resolve(null) })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 2.3:使用 onEach 执行副作用 ---"); $coroutines.getFlows().of("A", "B", "C") .onEach((s, item) => { $console.log(`onEach - 正在处理: ${item}`); return s.resolve(null); }) .map((s, item) => { return s.resolve(`转换后的值: [${item}]`); }) .collect(scope, (s, result) => { $console.log(`collect - 最终结果: ${result}`); return s.resolve(null); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 2.3:使用 onEach 执行副作用 ---") _coroutines:getFlows():of("A", "B", "C") :onEach(function(s, item) _console:log("onEach - 正在处理: " .. item) return s:resolve(nil) end) :map(function(s, item) return s:resolve("转换后的值: [" .. item .. "]") end) :collect(scope, function(s, result) _console:log("collect - 最终结果: " .. result) return s:resolve(nil) end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 2.3:使用 onEach 执行副作用 ---")); $coroutines->getFlows()->of(javaString("A"), javaString("B"), javaString("C")) ->onEach(function ($s, $item) { global $console; $console->log(javaString("onEach - 正在处理: {$item}")); return $s->resolve(null); }) ->map(function ($s, $item) { return $s->resolve(javaString("转换后的值: [{$item}]")); }) ->collect($scope, function ($s, $result) { global $console; $console->log(javaString("collect - 最终结果: {$result}")); return $s->resolve(null); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 2.3:使用 onEach 执行副作用 ---") _coroutines.getFlows().of("A", "B", "C").onEach( lambda s, item: ( _console.log("onEach - 正在处理: {}".format(item)), s.resolve(None) )[-1] ).map( # 单个表达式,直接返回 lambda s, item: s.resolve("转换后的值: [{}]".format(item)) ).collect(scope, lambda s, result: ( _console.log("collect - 最终结果: {}".format(result)), s.resolve(None) )[-1])
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 2.3:使用 onEach 执行副作用 ---") $coroutines.getFlows().of("A", "B", "C") .onEach(lambda do |s, item| $console.log("onEach - 正在处理: #{item}") s.resolve(nil) end) .map(lambda do |s, item| s.resolve("转换后的值: [#{item}]") end) .collect(scope, lambda do |s, result| $console.log("collect - 最终结果: #{result}") s.resolve(nil) end)

Flow 的错误处理

异步流中可能会发生异常。Flow 提供了类似 try-catch-finally 的机制来优雅地处理这些错误。

捕获异常 (catch)

catch 操作符可以捕获其上游(即 catch 调用之前的所有操作)发生的异常。它不会捕获下游(collect 内部)的异常。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 3.1:使用 catch 捕获异常 ---") _coroutines.getFlows().of(1, 2, 3, 4, 5) .map { s, value -> if (value == 4) { // 模拟一个处理错误 throw Exception("处理值 4 时发生错误!") } s.resolve("成功处理: ${value}") } .catch { s, collector, error -> _console.error("Flow 捕获到异常: ${error.getMessage()}") // 在捕获异常后,可以选择发射一个备用值 collector.emit(s, "发生错误,使用备用值") s.resolve(null) // catch 块本身也需要返回一个 Job } .collect(scope) { s, result -> _console.log(result) s.resolve(null) }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 3.1:使用 catch 捕获异常 ---") $coroutines.getFlows().of(1, 2, 3, 4, 5) .map({ s, value -> if (value == 4) { // 模拟一个处理错误 throw new Exception("处理值 4 时发生错误!") } s.resolve("成功处理: ${value}") }) .catch({ s, collector, error -> $console.error("Flow 捕获到异常: ${error.getMessage()}") // 在捕获异常后,可以选择发射一个备用值 collector.emit(s, "发生错误,使用备用值") s.resolve(null) // catch 块本身也需要返回一个 Job }) .collect(scope, { s, result -> $console.log(result) s.resolve(null) })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 3.1:使用 catch 捕获异常 ---"); $coroutines.getFlows().of(1, 2, 3, 4, 5) .map((s, value) => { if (value === 4) { // 模拟一个处理错误 throw new Error("处理值 4 时发生错误!"); } return s.resolve(`成功处理: ${value}`); }) .catch((s, collector, error) => { $console.error(`Flow 捕获到异常: ${error.getMessage()}`); // 在捕获异常后,可以选择发射一个备用值 collector.emit(s, "发生错误,使用备用值"); // catch 块本身也需要返回一个 Job return s.resolve(null); }) .collect(scope, (s, result) => { $console.log(result); return s.resolve(null); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 3.1:使用 catch 捕获异常 ---") _coroutines:getFlows():of(1, 2, 3, 4, 5) :map(function(s, value) if value == 4 then -- 模拟一个处理错误 error("处理值 4 时发生错误!") end return s:resolve("成功处理: " .. tostring(value)) end) :catch(function(s, collector, error) _console:error("Flow 捕获到异常: " .. error:getMessage()) -- 在捕获异常后,可以选择发射一个备用值 collector:emit(s, "发生错误,使用备用值") return s:resolve(nil) -- catch 块本身也需要返回一个 Job end) :collect(scope, function(s, result) _console:log(result) return s:resolve(nil) end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 3.1:使用 catch 捕获异常 ---")); $coroutines->getFlows()->of(1, 2, 3, 4, 5) ->map(function ($s, $value) { if ($value == 4) { global $exceptions; // 模拟一个处理错误 return $s->reject($exceptions->newThrowable(function ($eb) { $eb->setMessageAndClassName(javaString("处理值 4 时发生错误!"), null); })); } return $s->resolve(javaString("成功处理: {$value}")); }) ->catch(function ($s, $collector, $error) { global $console; $console->error(javaString("Flow 捕获到异常: {$error->getMessage()}")); // 在捕获异常后,可以选择发射一个备用值 $collector->emit($s, javaString("发生错误,使用备用值")); return $s->resolve(null); // catch 块本身也需要返回一个 Job }) ->collect($scope, function ($s, $result) { global $console; $console->log($result); return $s->resolve(null); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _console from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _exceptions from m8test_java.com.m8test.script.GlobalVariables import _threads # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 3.1:使用 catch 捕获异常 ---") _coroutines.getFlows().of(1, 2, 3, 4, 5).map( # 使用条件表达式处理 if/else lambda s, value: s.reject( _exceptions.newThrowable(lambda b: b.setMessageAndClassName("处理值 4 时发生错误!", "java.lang.Exception"))) if value == 4 else s.resolve("成功处理: {}".format(value)) ).catch( # 在Python中,catch是关键字,API可能会使用catch_ lambda s, collector, error: ( _console.error("Flow 捕获到异常: {}".format(error.getMessage())), # 在捕获异常后,可以选择发射一个备用值 collector.emit(s, "发生错误,使用备用值"), s.resolve(None) # catch 块本身也需要返回一个 Job )[-1] ).collect(scope, lambda s, result: ( _console.log(result), s.resolve(None) )[-1])
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 3.1:使用 catch 捕获异常 ---") $coroutines.getFlows().of(1, 2, 3, 4, 5) .map(lambda do |s, value| if value == 4 # 模拟一个处理错误 raise Exception.new("处理值 4 时发生错误!") end s.resolve("成功处理: #{value}") end) .catch(lambda do |s, collector, error| $console.error("Flow 捕获到异常: #{error.getMessage()}") # 在捕获异常后,可以选择发射一个备用值 collector.emit(s, "发生错误,使用备用值") s.resolve(nil) # catch 块本身也需要返回一个 Job end) .collect(scope, lambda do |s, result| $console.log(result) s.resolve(nil) end)

最终清理 (onCompletion)

onCompletion 类似于 finally 块,无论 Flow 是正常完成还是因异常终止,它都会被执行。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 3.2:使用 onCompletion 进行清理 ---") val flowWithPotentialError = _coroutines.getFlows().create { s, collector -> _console.log("Flow 开始执行...") collector.emit(s, "数据1") collector.emit(s, "数据2") // 模拟一个可能发生的错误 if (Math.random() > 0.5) { throw Exception("随机错误发生了!") } s.resolve(null) } flowWithPotentialError .catch { s, collector, error -> _console.error("在 catch 块中处理了异常: ${error.getMessage()}") s.resolve(null) } .onCompletion { s, cause -> if (cause != null) { _console.warn("Flow 完成,但有异常: ${cause.getMessage()}") } else { _console.info("Flow 正常完成。") } _console.log("执行清理操作,如关闭资源...") s.resolve(null) } .launchIn(scope) // 使用 launchIn 启动,不关心具体值
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 3.2:使用 onCompletion 进行清理 ---") def flowWithPotentialError = $coroutines.getFlows().create({ s, collector -> $console.log("Flow 开始执行...") collector.emit(s, "数据1") collector.emit(s, "数据2") // 模拟一个可能发生的错误 if (Math.random() > 0.5) { throw new Exception("随机错误发生了!") } s.resolve(null) }) flowWithPotentialError .catch({ s, collector, error -> $console.error("在 catch 块中处理了异常: ${error.getMessage()}") s.resolve(null) }) .onCompletion({ s, cause -> if (cause != null) { $console.warn("Flow 完成,但有异常: ${cause.getMessage()}") } else { $console.info("Flow 正常完成。") } $console.log("执行清理操作,如关闭资源...") s.resolve(null) }) .launchIn(scope) // 使用 launchIn 启动,不关心具体值
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 3.2:使用 onCompletion 进行清理 ---"); const flowWithPotentialError = $coroutines.getFlows().create((s, collector) => { $console.log("Flow 开始执行..."); collector.emit(s, "数据1"); collector.emit(s, "数据2"); // 模拟一个可能发生的错误 if (Math.random() > 0.5) { throw new Error("随机错误发生了!"); } return s.resolve(null); }); flowWithPotentialError .catch((s, collector, error) => { $console.error(`在 catch 块中处理了异常: ${error.getMessage()}`); return s.resolve(null); }) .onCompletion((s, cause) => { if (cause != null) { $console.warn(`Flow 完成,但有异常: ${cause.getMessage()}`); } else { $console.info("Flow 正常完成。"); } $console.log("执行清理操作,如关闭资源..."); return s.resolve(null); }) .launchIn(scope); // 使用 launchIn 启动,不关心具体值
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 3.2:使用 onCompletion 进行清理 ---") local flowWithPotentialError = _coroutines:getFlows():create(function(s, collector) _console:log("Flow 开始执行...") collector:emit(s, "数据1") collector:emit(s, "数据2") -- 模拟一个可能发生的错误 if math.random() > 0.5 then error("随机错误发生了!") end return s:resolve(nil) end) flowWithPotentialError :catch(function(s, collector, error) _console:error("在 catch 块中处理了异常: " .. error:getMessage()) return s:resolve(nil) end) :onCompletion(function(s, cause) if cause ~= nil then _console:warn("Flow 完成,但有异常: " .. cause:getMessage()) else _console:info("Flow 正常完成。") end _console:log("执行清理操作,如关闭资源...") return s:resolve(nil) end) :launchIn(scope) -- 使用 launchIn 启动,不关心具体值
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 3.2:使用 onCompletion 进行清理 ---")); $flowWithPotentialError = $coroutines->getFlows()->create(function ($s, $collector) { global $console; $console->log(javaString("Flow 开始执行...")); $collector->emit($s, javaString("数据1")); $collector->emit($s, javaString("数据2")); // 模拟一个可能发生的错误 if ((mt_rand() / mt_getrandmax()) > 0.5) { global $exceptions; return $s->reject($exceptions->newThrowable(function ($eb) { $eb->setMessageAndClassName(javaString("随机错误发生了!"), null); })); } return $s->resolve(null); }); $flowWithPotentialError ->catch(function ($s, $collector, $error) { global $console; $console->error(javaString("在 catch 块中处理了异常: {$error->getMessage()}")); return $s->resolve(null); }) ->onCompletion(function ($s, $cause) { global $console; if ($cause != null) { $console->warn(javaString("Flow 完成,但有异常: {$cause->getMessage()}")); } else { $console->info(javaString("Flow 正常完成。")); } $console->log(javaString("执行清理操作,如关闭资源...")); return $s->resolve(null); }) ->launchIn($scope); // 使用 launchIn 启动,不关心具体值
# 导入Python标准库 import random from m8test_java.com.m8test.script.GlobalVariables import _console from m8test_java.com.m8test.script.GlobalVariables import _coroutines # 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 3.2:使用 onCompletion 进行清理 ---") def throw_error(): raise RuntimeError("随机错误发生了!") flowWithPotentialError = _coroutines.getFlows().create(lambda s, collector: ( _console.log("Flow 开始执行..."), collector.emit(s, "数据1"), collector.emit(s, "数据2"), # 使用条件表达式处理 if 逻辑 (throw_error() if random.random() > 0.5 else s.resolve(None)) )[-1]) flowWithPotentialError.catch( lambda s, collector, error: ( _console.error("在 catch 块中处理了异常: {}".format(error.getMessage())), s.resolve(None) )[-1] ).onCompletion( lambda s, cause: ( (_console.warn("Flow 完成,但有异常: {}".format(cause.getMessage())) if cause is not None else _console.info("Flow 正常完成。")), _console.log("执行清理操作,如关闭资源..."), s.resolve(None) )[-1] ).launchIn(scope) # 使用 launchIn 启动,不关心具体值
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 3.2:使用 onCompletion 进行清理 ---") flowWithPotentialError = $coroutines.getFlows().create do |s, collector| $console.log("Flow 开始执行...") collector.emit(s, "数据1") collector.emit(s, "数据2") # 模拟一个可能发生的错误 if Java::JavaLang::Math.random() > 0.5 raise Exception.new("随机错误发生了!") end s.resolve(nil) end flowWithPotentialError .catch(lambda do |s, collector, error| $console.error("在 catch 块中处理了异常: #{error.getMessage()}") s.resolve(nil) end) .onCompletion(lambda do |s, cause| if cause != nil $console.warn("Flow 完成,但有异常: #{cause.getMessage()}") else $console.info("Flow 正常完成。") end $console.log("执行清理操作,如关闭资源...") s.resolve(nil) end) .launchIn(scope) # 使用 launchIn 启动,不关心具体值

Flow 的并发与上下文

默认情况下,Flow 的代码在收集它的协程的上下文中执行。flowOn 操作符是改变 Flow 上游执行线程的唯一正确方式。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope { setDispatcher { dispatchers -> dispatchers.getScriptMain() } } _console.info("--- 章节 4:使用 flowOn 切换执行上下文 ---") val logThread = { tag: String -> _console.log("[${tag}] 当前线程: ${Thread.currentThread().name}") } _coroutines.getFlows().of("任务A", "任务B") .onEach { s, task -> logThread("1. Flow 创建后 (上游)") s.resolve(null) } // flowOn 将其上面的所有操作切换到 IO 线程池执行 .flowOn { setDispatcher { it.getIO() } } .onEach { s, task -> logThread("2. flowOn 之后 (下游)") s.resolve(null) } .collect(scope) { s, task -> logThread("3. collect 内部 (收集器)") s.resolve(null) } .invokeOnCompletion { _console.info("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。") }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope { it.setDispatcher { dispatchers -> dispatchers.getScriptMain() } } $console.info("--- 章节 4:使用 flowOn 切换执行上下文 ---") def logThread = { tag -> $console.log("[${tag}] 当前线程: ${Thread.currentThread().getName()}") } $coroutines.getFlows().of("任务A", "任务B") .onEach({ s, task -> logThread("1. Flow 创建后 (上游)") s.resolve(null) }) // flowOn 将其上面的所有操作切换到 IO 线程池执行 .flowOn({ it.setDispatcher({ it.getIO() }) }) .onEach({ s, task -> logThread("2. flowOn 之后 (下游)") s.resolve(null) }) .collect(scope, { s, task -> logThread("3. collect 内部 (收集器)") s.resolve(null) }) .invokeOnCompletion({ $console.info("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。") })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(it => { it.setDispatcher(dispatchers => dispatchers.getScriptMain()); }); $console.info("--- 章节 4:使用 flowOn 切换执行上下文 ---"); /** @param {string} tag */ const logThread = tag => { $console.log(`[${tag}] 当前线程: ${Packages.java.lang.Thread.currentThread().getName()}`); }; $coroutines.getFlows().of("任务A", "任务B") .onEach((s, task) => { logThread("1. Flow 创建后 (上游)"); return s.resolve(null); }) // flowOn 将其上面的所有操作切换到 IO 线程池执行 .flowOn(context => { context.setDispatcher(dispatchers => dispatchers.getIO()); }) .onEach((s, task) => { logThread("2. flowOn 之后 (下游)"); return s.resolve(null); }) .collect(scope, (s, task) => { logThread("3. collect 内部 (收集器)"); return s.resolve(null); }) .invokeOnCompletion(() => { $console.info("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。"); });
local Thread = require("m8test_java.java.lang.Thread") -- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(function(it) it:setDispatcher(function(dispatchers) return dispatchers:getScriptMain() end) end) _console:info("--- 章节 4:使用 flowOn 切换执行上下文 ---") local logThread = function(tag) _console:log("[" .. tag .. "] 当前线程: " .. Thread:currentThread():getName()) end _coroutines:getFlows():of("任务A", "任务B") :onEach(function(s, task) logThread("1. Flow 创建后 (上游)") return s:resolve(nil) end) -- flowOn 将其上面的所有操作切换到 IO 线程池执行 :flowOn(function(it) it:setDispatcher(function(disp) return disp:getIO() end) end) :onEach(function(s, task) logThread("2. flowOn 之后 (下游)") return s:resolve(nil) end) :collect(scope, function(s, task) logThread("3. collect 内部 (收集器)") return s:resolve(nil) end) :invokeOnCompletion(function() _console:info("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。") end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; use m8test_java\java\lang\Thread; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 4:使用 flowOn 切换执行上下文 ---")); $logThread = function ($tag) { global $console; $console->log(javaString("[{$tag}] 当前线程: ") . Thread::currentThread()->getName()); }; $coroutines->getFlows()->of(javaString("任务A"), javaString("任务B")) ->onEach(function ($s, $task) use ($logThread) { $logThread(javaString("1. Flow 创建后 (上游)")); return $s->resolve(null); }) // flowOn 将其上面的所有操作切换到 IO 线程池执行 ->flowOn(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }) ->onEach(function ($s, $task) use ($logThread) { $logThread(javaString("2. flowOn 之后 (下游)")); return $s->resolve(null); }) ->collect($scope, function ($s, $task) use ($logThread) { $logThread(javaString("3. collect 内部 (收集器)")); return $s->resolve(null); }) ->invokeOnCompletion(function () { global $console; $console->info(javaString("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。")); });
# 导入Python标准库 import threading # 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 4:使用 flowOn 切换执行上下文 ---") logThread = lambda tag: _console.log("[{}] 当前线程: {}".format(tag, threading.current_thread().name)) _coroutines.getFlows().of("任务A", "任务B").onEach( lambda s, task: ( logThread("1. Flow 创建后 (上游)"), s.resolve(None) )[-1] # flowOn 将其上面的所有操作切换到 IO 线程池执行 ).flowOn( # flowOn 的配置lambda, python 只能用 getScriptMain lambda context: context.setDispatcher(lambda d: d.getScriptMain()) ).onEach( lambda s, task: ( logThread("2. flowOn 之后 (下游)"), s.resolve(None) )[-1] ).collect(scope, lambda s, task: ( logThread("3. collect 内部 (收集器)"), s.resolve(None) )[-1]).invokeOnCompletion(lambda _: _console.info("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。") )
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope do |it| it.setDispatcher { |dispatchers| dispatchers.getScriptMain() } end $console.info("--- 章节 4:使用 flowOn 切换执行上下文 ---") logThread = lambda do |tag| $console.log("[#{tag}] 当前线程: #{Java::JavaLang::Thread.currentThread().getName()}") end $coroutines.getFlows().of("任务A", "任务B"). onEach(lambda do |s, task| logThread.call("1. Flow 创建后 (上游)") s.resolve(nil) end). # flowOn 将其上面的所有操作切换到 IO 线程池执行 flowOn(lambda do |it| it.setDispatcher(lambda { |d| d.getIO() }) end). onEach(lambda do |s, task| logThread.call("2. flowOn 之后 (下游)") s.resolve(nil) end). collect(scope, lambda do |s, task| logThread.call("3. collect 内部 (收集器)") s.resolve(nil) end). invokeOnCompletion do |error| $console.info("可以看到,'上游'部分在IO线程执行,而'下游'和'收集器'在启动 collect 的主脚本线程执行。") end

高级创建:桥接回调 API (fromCallback)

fromCallback 是一个强大的构建器,用于将传统的回调式 API(如事件监听器)转换为现代的 Flow。关键在于使用 producer.send() 发送数据,并必须在 producer.awaitClose() 中注销监听器以防止内存泄漏。

import com.m8test.script.GlobalVariables.* // 确保脚本在后台运行 _threads.getMain().setBackground(true) val scope = _coroutines.newScope(null) _console.info("--- 章节 5:使用 fromCallback 桥接回调 ---") // 使用 fromCallback 创建一个 Flow val eventFlow = _coroutines.getFlows().fromCallback { cbScope, producer -> _console.log("fromCallback: 开始设置模拟监听器...") var intervalId = -1 var i = 0 intervalId = _threads.getMain().getTimer().setInterval({ if (!producer.isClosedForSend()) { val event = "事件 ${i}" _console.log("模拟监听器: 产生 -> ${event}") // 通过 producer.send() 将数据发送到 Flow producer.send(cbScope, event) i++ } else { _threads.getMain().getTimer().clearInterval(intervalId) } }, 1000) // 这是关键!当 Flow 被取消或完成时,此代码块会被执行 producer.awaitClose { _console.warn("awaitClose: Flow 结束,正在注销监听器...") _console.info("awaitClose: 清理完成!") } // fromCallback 的 block 也需要返回一个 Job cbScope.resolve(null) } // 启动对 eventFlow 的收集 val collectionJob = eventFlow .take(5) // 我们只取前5个事件,这会导致 Flow 正常完成 .collect(scope) { s, event -> _console.log("收集器: 收到 <- ${event}") s.resolve(null) } collectionJob.invokeOnCompletion { _console.info("Flow 收集完成。awaitClose 应该已经被调用了。") }
// 确保脚本在后台运行 $threads.getMain().setBackground(true) def scope = $coroutines.newScope(null) $console.info("--- 章节 5:使用 fromCallback 桥接回调 ---") // 使用 fromCallback 创建一个 Flow def eventFlow = $coroutines.getFlows().fromCallback({ cbScope, producer -> $console.log("fromCallback: 开始设置模拟监听器...") def intervalId = -1 def i = 0 intervalId = $threads.getMain().getTimer().setInterval({ if (!producer.isClosedForSend()) { def event = "事件 ${i}" $console.log("模拟监听器: 产生 -> ${event}") // 通过 producer.send() 将数据发送到 Flow producer.send(cbScope, event) i++ } else { $threads.getMain().getTimer().clearInterval(intervalId) } }, 1000) // 这是关键!当 Flow 被取消或完成时,此代码块会被执行 producer.awaitClose({ $console.warn("awaitClose: Flow 结束,正在注销监听器...") $console.info("awaitClose: 清理完成!") }) // fromCallback 的 block 也需要返回一个 Job cbScope.resolve(null) }) // 启动对 eventFlow 的收集 def collectionJob = eventFlow .take(5) // 我们只取前5个事件,这会导致 Flow 正常完成 .collect(scope, { s, event -> $console.log("收集器: 收到 <- ${event}") s.resolve(null) }) collectionJob.invokeOnCompletion({ $console.info("Flow 收集完成。awaitClose 应该已经被调用了。") })
// 确保脚本在后台运行 $threads.getMain().setBackground(true); const scope = $coroutines.newScope(null); $console.info("--- 章节 5:使用 fromCallback 桥接回调 ---"); // 使用 fromCallback 创建一个 Flow const eventFlow = $coroutines.getFlows().fromCallback((cbScope, producer) => { $console.log("fromCallback: 开始设置模拟监听器..."); let intervalId = -1; let i = 0; intervalId = $threads.getMain().getTimer().setInterval(() => { if (producer.isClosedForSend() == false) { const event = `事件 ${i}`; $console.log(`模拟监听器: 产生 -> ${event}`); // 通过 producer.send() 将数据发送到 Flow producer.send(cbScope, event); i++; } else { $threads.getMain().getTimer().clearInterval(intervalId); } }, 1000); // 这是关键!当 Flow 被取消或完成时,此代码块会被执行 producer.awaitClose(() => { $console.warn("awaitClose: Flow 结束,正在注销监听器..."); // 确保即使在 producer 关闭后,定时器也能被清除 if (intervalId !== -1) { $threads.getMain().getTimer().clearInterval(intervalId); } $console.info("awaitClose: 清理完成!"); }); // fromCallback 的 block 也需要返回一个 Job return cbScope.resolve(null); }); // 启动对 eventFlow 的收集 const collectionJob = eventFlow .take(5) // 我们只取前5个事件,这会导致 Flow 正常完成 .collect(scope, (s, event) => { $console.log(`收集器: 收到 <- ${event}`); return s.resolve(null); }); collectionJob.invokeOnCompletion(() => { $console.info("Flow 收集完成。awaitClose 应该已经被调用了。"); });
-- 确保脚本在后台运行 _threads:getMain():setBackground(true) local scope = _coroutines:newScope(nil) _console:info("--- 章节 5:使用 fromCallback 桥接回调 ---") -- 使用 fromCallback 创建一个 Flow local eventFlow = _coroutines:getFlows():fromCallback(function(cbScope, producer) _console:log("fromCallback: 开始设置模拟监听器...") local intervalId = -1 local i = 0 intervalId = _threads:getMain():getTimer():setInterval(function() if not producer:isClosedForSend() then local event = "事件 " .. i _console:log("模拟监听器: 产生 -> " .. event) -- 通过 producer:send() 将数据发送到 Flow producer:send(cbScope, event) i = i + 1 else _threads:getMain():getTimer():clearInterval(intervalId) end end, 1000) -- 这是关键!当 Flow 被取消或完成时,此代码块会被执行 producer:awaitClose(function() _console:warn("awaitClose: Flow 结束,正在注销监听器...") _console:info("awaitClose: 清理完成!") end) -- fromCallback 的 block 也需要返回一个 Job return cbScope:resolve(nil) end) -- 启动对 eventFlow 的收集 local collectionJob = eventFlow :take(5) -- 我们只取前5个事件,这会导致 Flow 正常完成 :collect(scope, function(s, event) _console:log("收集器: 收到 <- " .. event) return s:resolve(nil) end) collectionJob:invokeOnCompletion(function() _console:info("Flow 收集完成。awaitClose 应该已经被调用了。") end)
<?php /** @var m8test_java\com\m8test\script\core\api\thread\Threads $threads */ global $threads; /** @var m8test_java\com\m8test\script\core\api\coroutines\Coroutines $coroutines */ global $coroutines; /** @var m8test_java\com\m8test\script\core\api\console\Console $console */ global $console; // 确保脚本在后台运行 $threads->getMain()->setBackground(true); $scope = $coroutines->newScope(function ($context) { $context->setDispatcher(function ($dispatchers) { return $dispatchers->getScriptMain(); }); }); $console->info(javaString("--- 章节 5:使用 fromCallback 桥接回调 ---")); // 使用 fromCallback 创建一个 Flow $eventFlow = $coroutines->getFlows()->fromCallback(function ($cbScope, $producer) { global $console, $threads; $console->log(javaString("fromCallback: 开始设置模拟监听器...")); $intervalId = -1; $i = 0; $intervalId = $threads->getMain()->getTimer()->setInterval(function() use ($producer, $cbScope, &$i, &$intervalId) { global $threads, $console; if (!$producer->isClosedForSend()) { $event = javaString("事件 {$i}"); $console->log(javaString("模拟监听器: 产生 -> {$event}")); // 通过 producer.send() 将数据发送到 Flow $producer->send($cbScope, $event); $i++; } else { $threads->getMain()->getTimer()->clearInterval($intervalId); } }, 1000); // 这是关键!当 Flow 被取消或完成时,此代码块会被执行 $producer->awaitClose(function() use (&$intervalId) { // intervalId by reference to ensure it's cleared global $console; $console->warn(javaString("awaitClose: Flow 结束,正在注销监听器...")); $console->info(javaString("awaitClose: 清理完成!")); }); // fromCallback 的 block 也需要返回一个 Job return $cbScope->resolve(null); }); // 启动对 eventFlow 的收集 $collectionJob = $eventFlow ->take(5) // 我们只取前5个事件,这会导致 Flow 正常完成 ->collect($scope, function ($s, $event) { global $console; $console->log(javaString("收集器: 收到 <- {$event}")); return $s->resolve(null); }); $collectionJob->invokeOnCompletion(function() { global $console; $console->info(javaString("Flow 收集完成。awaitClose 应该已经被调用了。")); });
# 导入所需的全局变量 from m8test_java.com.m8test.script.GlobalVariables import _threads from m8test_java.com.m8test.script.GlobalVariables import _coroutines from m8test_java.com.m8test.script.GlobalVariables import _console # 确保脚本在后台运行 _threads.getMain().setBackground(True) scope = _coroutines.newScope(lambda it: it.setDispatcher(lambda d: d.getScriptMain())) _console.info("--- 章节 5:使用 fromCallback 桥接回调 ---") # 使用 fromCallback 创建一个 Flow def create_event_flow(): def callback(cbScope, producer): _console.log("fromCallback: 开始设置模拟监听器...") i = [0] # 用列表包装模拟可变变量 interval_id = [None] # 同样包装 def on_tick(_): if not producer.isClosedForSend(): event = "事件 {}".format(i[0]) _console.log("模拟监听器: 产生 -> {}".format(event)) producer.send(cbScope, event) i[0] += 1 else: _threads.getMain().getTimer().clearInterval(interval_id[0]) interval_id[0] = _threads.getMain().getTimer().setInterval(on_tick, 1000) # 注册清理逻辑 producer.awaitClose(lambda: ( _console.warn("awaitClose: Flow 结束,正在注销监听器..."), _console.info("awaitClose: 清理完成!") )) return cbScope.resolve(None) return _coroutines.getFlows().fromCallback(callback) eventFlow = create_event_flow() # 启动对 eventFlow 的收集 collectionJob = eventFlow.take(5).collect(scope, lambda s, event: ( _console.log("收集器: 收到 <- {}".format(event)), s.resolve(None) )[-1]) collectionJob.invokeOnCompletion(lambda _: _console.info("Flow 收集完成。awaitClose 应该已经被调用了。"))
# encoding: utf-8 # 确保脚本在后台运行 $threads.getMain().setBackground(true) scope = $coroutines.newScope(nil) $console.info("--- 章节 5:使用 fromCallback 桥接回调 ---") # 使用 fromCallback 创建一个 Flow eventFlow = $coroutines.getFlows().fromCallback do |cbScope, producer| $console.log("fromCallback: 开始设置模拟监听器...") intervalId = -1 i = 0 intervalId = $threads.getMain().getTimer().setInterval(lambda { |p| if !producer.isClosedForSend() event = "事件 #{i}" $console.log("模拟监听器: 产生 -> #{event}") # 通过 producer.send() 将数据发送到 Flow producer.send(cbScope, event) i += 1 else $threads.getMain().getTimer().clearInterval(intervalId) end }, 1000) # 这是关键!当 Flow 被取消或完成时,此代码块会被执行 producer.awaitClose do $console.warn("awaitClose: Flow 结束,正在注销监听器...") $console.info("awaitClose: 清理完成!") end # fromCallback 的 block 也需要返回一个 Job cbScope.resolve(nil) end # 启动对 eventFlow 的收集 collectionJob = eventFlow .take(5) # 我们只取前5个事件,这会导致 Flow 正常完成 .collect(scope, lambda do |s, event| $console.log("收集器: 收到 <- #{event}") s.resolve(nil) end) collectionJob.invokeOnCompletion do |error| $console.info("Flow 收集完成。awaitClose 应该已经被调用了。") end
09 December 2025