class Supply {}
supply 是一种线程安全的异步数据流,如 Channel,但它可以有多个订阅者(taps: 水龙头),所有订阅者都可以获得流经 supply 的相同值。
它是 观察者模式 的线程安全实现,是支持 Raku 中的反应式编程的核心。
有两种类型的 Supplies:实时
(live)和按需
(on demand)。当水龙头接入到实时(live
)供应时,水龙头将仅在创建水龙头之后看到流经 supply 的值。这种供应(supplies)通常是无限的,例如鼠标移动。关闭这样的水龙头(tap)不会阻止鼠标事件的发生,它只是意味着值将看不见了。所有的开孔器(tappers
)都看到了相同的值流(flow of values)。
在按需
供应上轻敲(tap)将发起值的产生,再次轻敲(tap)供应可能会产生一组新的值。例如,每次轻敲时,Supply.interval
都会生成一个具有适当间隔的新计时器。如果水龙头(tap)关闭,计时器只会停止向该水龙头发射值。
从 Supplier 的工厂方法 Supply
获得 live Supply
(实时供应)。通过在 Supplier
对象上调用 emit
来发出新值。
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supplier.tap( -> $v { say "$v" });
$supplier.emit(42); # Will cause the tap to output "42"
实时方法在实时供应上返回 True
。工厂方法如 interval,from-list 会返回按需供应(on demand supplies)。
可以使用 Supplier::Preserving创建一个直到第一次轻敲的保存值的实时供应(Supply
)。
可以在并发页面中找到更多示例。
返回 Taps 的方法
tap 方法
method tap(Supply:D: &emit = -> $ { },
:&done,
:&quit,
--> Tap:D)
除了所有现有的 taps 之外,还会创建一个新 tap(如果您愿意,可以使用一种订阅)。第一个位置参数是一段代码,当通过 emit
调用获得新值时将调用该代码。
当调用供应(supply)上的 done
方法时,将调用 :&done
回调,指示通道(channel)的寿命终止。对于实时(live
)供应,将在父供应商(Supplier
)处调用 done
例程。
当调用 supply 上的 quit
方法时,将调用 &quit
回调,表示错误地终止了 supply。对于实时供应(live
supply),将在父供应商(Supplier
)处调用 quit
例程。
tap
方法返回 Tap 类型的对象,你可以在其上调用 close
方法来取消订阅。
my $s = Supply.from-list(0 .. 5);
my $t = $s.tap( -> $v { say "$v"}, done => { say "no more ticks" } );
产生:
0
1
2
3
4
5
no more ticks
act 方法
method act(Supply:D: &act --> Tap:D)
使用给定代码在给定供应上创建 tap。与 tap
方法不同,它保证该给定代码一次只能由一个线程执行。
实用方法
Capture 方法
定义为:
method Capture(Supply:D --> Capture:D)
等价于在调用者身上调用 .List.Capture。
Channel 方法
method Channel(Supply:D: --> Channel:D)
返回一个 Channel 对象,该对象将从供应中接收所有未来值,并在 Supply 完成时关闭(close
),并在供应退出时退出(关闭并显示错误)。
Promise 方法
method Promise(Supply:D: --> Promise:D)
返回在供应(Supply
)完成时将保留的 Promise。如果 Supply
也发出任何值,则 Promise
将保留最终值。否则,它将与 Nil
保持一致。如果供应以退出(quit
)而不是完成(done
)结束,那么 Promise
将被该异常打破(broken)。
my $supplier = Supplier.new;
my $s = $supplier.Supply;
my $p = $s.Promise;
$p.then(-> $v { say "got $v.result()" });
$supplier.emit('cha'); # not output yet
$supplier.done(); # got cha
Promise
方法在处理仅产生一个值的供应时,当只感兴趣最终值,或仅完成(成功与否)相关时最有用。
live 方法
method live(Supply:D: --> Bool:D)
如果供应是“实时”,则返回True,也就是说,值一到达就会被发送到水龙头。始终在默认的Supply中返回True(但是例如在Supply.from-list返回的供应中它为False)。
schedule-on 方法
method schedule-on(Supply:D: Scheduler $scheduler)
在指定的调度程序上运行emit,done和quit回调。
这对于需要从GUI线程运行某些操作的GUI工具包很有用。
等到供应完成的方法
wait 方法
method wait(Supply:D:)
点击它被调用的供应,并阻止执行,直到供应完成(在这种情况下,它评估在Supply上发出的最终值,或者如果没有发出值则为Nil)或退出(在这种情况下它)将抛出传递给退出的异常)。
my $s = Supplier.new;
start {
sleep 1;
say "One second: running.";
sleep 1;
$s.emit(42);
$s.done;
}
$s.Supply.wait;
say "Two seconds: done";
list 方法
method list(Supply:D: --> List:D)
点击它所调用的Supply,并返回一个惰性列表,该列表将在Supply发出值时生效。一旦供应完成,该清单将被终止。如果Supply退出,则一旦达到惰性列表中的该点,就会抛出异常。
grab 方法
method grab(Supply:D: &when-done --> Supply:D)
点击调用它的供应。完成后,调用&when-done然后发出它在结果Supply上返回的值列表。如果原始供应退出,则异常立即在退货供应上传达。
my $s = Supply.from-list(4, 10, 3, 2);
my $t = $s.grab(&sum);
$t.tap(&say); # OUTPUT: «19»
reverse 方法
method reverse(Supply:D: --> Supply:D)
点击调用它的供应。一旦该供应完成,其发出的所有值将以相反的顺序在返回的供应上发出。如果原始供应退出,则异常立即在退货供应上传达。
my $s = Supply.from-list(1, 2, 3);
my $t = $s.reverse;
$t.tap(&say); # OUTPUT: «321»
sort 方法
method sort(Supply:D: &custom-routine-to-use? --> Supply:D)
点击调用它的供应。一旦该耗材发出完成,它所发出的所有值都将被排序,并且结果以排序的顺序在返回的Supply上发出结果。可选择接受比较器块。如果原始供应退出,则异常立即在退货供应上传达。
my $s = Supply.from-list(4, 10, 3, 2);
my $t = $s.sort();
$t.tap(&say); # OUTPUT: «23410»
返回另一个 Supply 的方法
from-list 方法
method from-list(Supply:U: +@values --> Supply:D)
根据传递给此方法的值创建按需供应。
my $s = Supply.from-list(1, 2, 3);
$s.tap(&say); # OUTPUT: «123»
share 方法
method share(Supply:D: --> Supply:D)
通过按需供应创建实时供应,从而可以在多个分接头上共享按需供应的值,而不是每个分接头从按需供应中查看其自己的所有值的副本。
# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3"
my $s = Supply.interval(1).share;
$s.tap: { "first $_".say };
sleep 1.1;
$s.tap: { "second $_".say };
sleep 2
flat 方法
method flat(Supply:D: --> Supply:D)
创建供应,在再次发射之前,在给定供应中看到的所有值都被展平。
do 方法
method do(Supply:D: &do --> Supply:D)
创建供应,在给定供应中看到的所有值再次发出。仅为其副作用执行的给定代码保证一次只能由一个线程执行。
on-close 方法
method on-close(Supply:D: &on-close --> Supply:D)
返回一个新的Supply,只要关闭该供应的Tap,它就会运行和关闭。这包括是否将进一步的操作链接到供应。 (例如,$ supply.on-close(&on-close).map(*。uc))。使用反应或供应块时,使用CLOSE相位器通常是更好的选择。
my $s = Supplier.new;
my $tap = $s.Supply.on-close({ say "Tap closed" }).tap(
-> $v { say "the value is $v" },
done => { say "Supply is done" },
quit => -> $ex { say "Supply finished with error $ex" },
);
$s.emit('Raku');
$tap.close; # OUTPUT: «Tap closed»
interval 方法
method interval(Supply:U: $interval, $delay = 0, :$scheduler = $*SCHEDULER --> Supply:D)
创建一个每隔$ interval秒发出一次值的供应,从调用开始$ delay秒。发射的值是从0开始的整数,并且对于每个发射的值递增1。
实施可能会将太小的值视为它们支持的最低分辨率,在这种情况下可能会发出警告;例如将0.0001作为0.001。
grep 方法
method grep(Supply:D: Mu $test --> Supply:D)
创建一个新的供应,仅从原始供应中发出smartmatch对$ test的值。
my $supplier = Supplier.new;
my $all = $supplier.Supply;
my $ints = $all.grep(Int);
$ints.tap(&say);
$supplier.emit($_) for 1, 'a string', 3.14159; # prints only 1
map 方法
method map(Supply:D: &mapper --> Supply:D)
返回一个新的供应,通过&mapper映射给定供应的每个值,并将其发送到新供应。
my $supplier = Supplier.new;
my $all = $supplier.Supply;
my $double = $all.map(-> $value { $value * 2 });
$double.tap(&say);
$supplier.emit(4); # RESULT: «8»
batch 方法
method batch(Supply:D: :$elems, :$seconds --> Supply:D)
创建一个新的供应,通过批次中的元素数量(使用:elems)或最大秒数(使用:秒)或两者来批量处理给定供应的值。供应完成后,最后一批中会发出任何剩余值。
elems 方法
method elems(Supply:D: $seconds? --> Supply:D)
创建一个新的供应,其中发出的值的数量发生了变化。如果您只想每隔这么多秒更新一次,它也可以选择一个间隔(以秒为单位)。
head 方法
method head(Supply:D: Int(Cool) $number = 1 --> Supply:D)
使用与List.head相同的语义创建“head”提供。
my $s = Supply.from-list(4, 10, 3, 2);
my $hs = $s.head(2);
$hs.tap(&say); # OUTPUT: «410»
tail 方法
method tail(Supply:D: Int(Cool) $number = 1 --> Supply:D)
使用与List.tail相同的语义创建“尾部”提供。
my $s = Supply.from-list(4, 10, 3, 2);
my $ts = $s.tail(2);
$ts.tap(&say); # OUTPUT: «32»
rotor 方法
method rotor(Supply:D: @cycle --> Supply:D)
使用与List.rotor相同的语义创建“旋转”供应。
delayed 方法
method delayed(Supply:D: $seconds, :$scheduler = $*SCHEDULER --> Supply:D)
创建一个新的供应,其中发出流经给定供应的所有值,但具有给定的延迟(以秒为单位)。
throttle 方法
method throttle(Supply:D:
$limit, # values / time or simultaneous processing
$seconds or $callable, # time-unit / code to process simultaneously
$delay = 0, # initial delay before starting, in seconds
:$control, # supply to emit control messages on (optional)
:$status, # supply to tap status messages from (optional)
:$bleed, # supply to bleed messages to (optional)
:$vent-at, # bleed when so many buffered (optional)
:$scheduler, # scheduler to use, default $*SCHEDULER
--> Supply:D)
从给定的Supply生成Supply,但确保传递的消息数量有限。
它有两种操作模式:每个时间单位或最大执行代码块:这由第二个位置参数决定。
第一个位置参数指定应该应用的限制。
如果第二个位置参数是Callable,则limit表示执行Callable的最大并行进程数,该进程被赋予接收的值。在这种情况下,发出的值将是从启动Callable获得的Promises。
如果第二个位置参数是数值,则将其解释为时间单位(以秒为单位)。如果指定.1作为值,则确保每十分之一秒不超过限制。
如果超出限制,则缓冲传入的消息,直到有空间再次传递/执行Callable。
第三个位置参数是可选的:它表示在传递任何值之前油门将等待的秒数。
:control命名参数可选地指定一个可用于控制油门运行时的供应。可以发送的消息是“key:value”形式的字符串。请参阅下文,了解可以发送以控制油门的消息类型。
:status named参数可选地指定将接收任何状态消息的Supply。如果指定,它将至少在原始Supply耗尽后发送一条状态消息。请参阅下面的状态消息
:bleed命名参数可选地指定一个Supply,它将接收任何明确放血的值(带有出血控制消息),或者自动放血(如果有一个出口处于活动状态)。
:vent-at命名参数指示在将任何其他值路由到:bleed Supply之前可以缓冲的值的数量。如果未指定,则默认为0(不会导致自动出血)。只有在指定了:出血供应时才有意义。
:scheduler命名参数表示要使用的调度程序。默认为$ * SCHEDULER。
控制信息
这些消息可以发送到:control Supply。控制消息由“key:value”形式的字符串组成,例如“限制:4”。
限制
将消息数(最初在第一个位置给出)更改为给定值。
流血
将给定数量的缓冲消息路由到:bleed Supply。
发泄,在
在自动出血发生之前更改缓冲值的最大数量。如果该值低于之前,将导致立即重新路由缓冲值以匹配新的最大值。
状态
将状态消息发送到:status具有给定ID的供应。
状态信息
状态返回消息是具有以下键的哈希:
允许
当前仍允许传递/执行的消息/可调用的数量。
流血
路由到:bleed Supply的消息数。
缓冲的
由于溢出而当前缓冲的消息数。
发射
发出(传递)的消息数。
ID
此状态消息的ID(单调递增的数字)。如果您想记录状态消息,则很方便。
限制
正在应用的当前限制。
发泄,在
在自动重新路由到:bleed Supply之前可以缓冲的最大消息数。
例子
有一段简单的代码在异步开始运行时宣布,等待一段时间,然后在完成时宣布。这样做6次,但不要让超过3次同时运行。
my $s = Supply.from-list(^6); # set up supply
my $t = $s.throttle: 3, # only allow 3 at a time
{ # code block to run
say "running $_"; # announce we've started
sleep rand; # wait some random time
say "done $_" # announce we're done
} # don't need ; because } at end of line
$t.wait; # wait for the supply to be done
一次运行的结果将是:
running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5
stable 方法
method stable(Supply:D: $time, :$scheduler = $*SCHEDULER --> Supply:D)
如果在给定的$ time(以秒为单位)内没有被另一个值取代,则创建一个仅传递给定供应的值的新供应。 (可选)使用:scheduler参数,使用除默认调度程序之外的其他调度程序。
为了澄清上述情况,如果在超时$ time期间,将向供应商发送所有其他值,但最后一个值将被丢弃。每次向供应商发出额外值时,在超时期间,将重置$ time。
在处理UI输入时,此方法非常有用,在用户停止输入一段时间而不是每次击键之前,不希望执行操作。
my $supplier = Supplier.new;
my $supply1 = $supplier.Supply;
$supply1.tap(-> $v { say "Supply1 got: $v" });
$supplier.emit(42);
my Supply $supply2 = $supply1.stable(5);
$supply2.tap(-> $v { say "Supply2 got: $v" });
sleep(3);
$supplier.emit(43); # will not be seen by $supply2 but will reset $time
$supplier.emit(44);
sleep(10);
# OUTPUT: «Supply1 got: 42Supply1 got: 43Supply1 got: 44Supply2 got: 44»
如上所示,$ supply1收到了向供应商发出的所有价值,而$ supply2只收到了一个价值。 43被抛弃了,因为它后面跟着另一个’最后’值44,它被保留并在大约8秒后发送到$ supply2,这是因为超时$ time在3秒后被重置。
reduce 方法
method reduce(Supply:D: &with --> Supply:D)
使用与List.reduce相同的语义创建“减少”供应。
my $supply = Supply.from-list(1..5).reduce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «15»
produce 方法
method produce(Supply:D: &with --> Supply:D)
使用与List.produce相同的语义创建“生成”供应。
my $supply = Supply.from-list(1..5).produce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «1361015»
lines
method lines(Supply:D: :$chomp = True --> Supply:D)
创建一个供应,它将从通常由某些异步I / O操作创建的供应中逐行发出字符。可选:chomp参数指示是否删除行分隔符:默认值为True。
words
method words(Supply:D: --> Supply:D)
创建一个供应,它将从通常由某些异步I / O操作创建的供应中逐字发出字符。
my $s = Supply.from-list("Hello Word!".comb);
my $ws = $s.words;
$ws.tap(&say); # OUTPUT: «HelloWord!»
unique 方法
method unique(Supply:D: :$as, :$with, :$expires --> Supply:D)
创建仅提供唯一值的供应,由可选:as和:with参数定义(与List.unique相同)。可选:expires参数在“重置”之前等待多长时间(以秒为单位)并且不考虑已经看到的值,即使它与旧值相同。
squish 方法
method squish(Supply:D: :$as, :$with --> Supply:D)
创建仅提供唯一值的供应,由可选:as和:with参数定义(与List.squish相同)。
max 方法
method max(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)
如果它们大于之前看到的任何值,则创建仅从给定供应中发出值的供应。换句话说,从持续上升的供应中,它将发出所有的价值。从不断下降的供应中,它只会发出第一个值。可选参数指定比较器,就像Any.max一样。
min 方法
method min(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)
如果它们小于之前看到的任何值,则创建仅从给定供应中发出值的供应。换句话说,从不断下降的供应中,它将发出所有的价值。从持续上升的供应中,它将仅发出第一个值。可选参数指定比较器,就像Any.min一样。
minmax 方法
method minmax(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)
每次从给定供应中看到新的最小值或最大值时,创建一个发出范围的供应。可选参数指定比较器,与Any.minmax一样。
skip 方法
method skip(Supply:D: Int(Cool) $number = 1 --> Supply:D)
返回一个新的Supply,它将从给定的Supply发出所有值,但前几个$ number值将被丢弃。
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply = $supply.skip(3);
$supply.tap({ say $_ });
$supplier.emit($_) for 1..10; # OUTPUT: «45678910»
start 方法
method start(Supply:D: &startee --> Supply:D)
创造供应供应。对于原始供应中的每个值,代码对象在另一个线程上进行调度,并返回单个值(如果代码成功)或者没有值退出(如果代码失败)的供应。
这对于异步启动您未阻止的工作非常有用。
使用migrate可将值再次连接到单个电源中。
migrate 方法
method migrate(Supply:D: --> Supply:D)
采用供应本身具有类型为Supply的值作为输入。每次外部电源发出新的电源时,都会触发该电源并发出其值。之前任何已开发的供应将被关闭。这对于在不同数据源之间进行迁移非常有用,并且只关注最新的数据源。
例如,想象一下用户可以在不同股票之间切换的应用程序。当他们切换到新的连接时,建立与Web套接字的连接以获取最新值,并且应关闭任何先前的连接。来自Web套接字的每个值流将表示为Supply,它们本身被发送到供应的最新数据源供观察。迁移方法可用于将此供应供应平坦为单个供应用户关心的当前值。
这是一个简单的模拟这样的程序:
my Supplier $stock-sources .= new;
sub watch-stock($symbol) {
$stock-sources.emit: supply {
say "Starting to watch $symbol";
whenever Supply.interval(1) {
emit "$symbol: 111." ~ 99.rand.Int;
}
CLOSE say "Lost interest in $symbol";
}
}
$stock-sources.Supply.migrate.tap: *.say;
watch-stock('GOOG');
sleep 3;
watch-stock('AAPL');
sleep 3;
产生的输出如下:
Starting to watch GOOG
GOOG: 111.67
GOOG: 111.20
GOOG: 111.37
Lost interest in GOOG
Starting to watch AAPL
AAPL: 111.55
AAPL: 111.6
AAPL: 111.6
组合 supplies 的方法
merge 方法
method merge(Supply @*supplies --> Supply:D)
创建从给定供应中看到的任何值的供应。只有在完成所有给定的供应后,才能完成供应。也可以称为类方法。
zip 方法
method zip(Supply @*supplies, :&with = &[,] --> Supply:D)
只要在所有耗材上看到新值,就会创建一个发出组合值的供应。默认情况下,会创建列表,但可以通过使用:with参数指定自己的组合器来更改列表。一旦完成任何给定的供应,就会完成供应。也可以称为类方法。
zip-latest 方法
method zip-latest(Supply @*supplies, :&with = &[,], :$initial --> Supply:D)
只要在任何耗材上看到新值,就会创建一个发出组合值的供应。默认情况下,会创建列表,但可以通过使用:with参数指定自己的组合器来更改列表。可选的:initial参数可用于指示组合值的初始状态。默认情况下,在生成的电源上发出第一个组合值之前,所有耗材必须至少有一个值。一旦完成任何给定的供应,就会完成供应。也可以称为类方法。
I/O 功能作为 supplies
sub signal
sub signal(*@signals, :$scheduler = $*SCHEDULER)
为指定的Signal枚举(例如SIGINT)和可选的:scheduler参数创建一个供应。收到的任何信号都将在电源上发出。例如:
signal(SIGINT).tap( { say "Thank you for your attention"; exit 0 } );
会抓住Control-C,谢谢你,然后退出。
要从信号编号转到信号,您可以执行以下操作:
signal(Signal(2)).tap( -> $sig { say "Received signal: $sig" } );
通过检查Signal ::。键可以找到支持的信号列表(就像任何枚举一样)。有关枚举如何工作的更多详细信息,请参阅枚举。
注意:高达2018.05的Rakudo版本有一个错误,因为某些系统上的信号数值不正确。例如,即使信号(10)实际上是特定系统上的SIGUSR1,它也会返回SIGBUS。话虽如此,使用信号(SIGUSR1)在2018.04,2018.04.1和2018.05之外的所有Rakudo版本上都按预期工作,其中通过使用信号(SIGBUS)可以实现预期的行为。这些问题在2018.05之后的Rakudo版本中得到了解决。
IO::Notification.watch-path 方法
method watch-path($path --> Supply:D)
创建OS将向其发出值的供应,以指示给定路径的文件系统上的更改。在IO对象上还有一个带有watch方法的快捷方式,如下所示:
IO::Notification.watch-path(".").act( { say "$^file changed" } );
".".IO.watch.act( { say "$^file changed" } ); # same