class Supply

Asynchronous data stream with multiple subscribers

class Supply {}

supply 是一种线程安全的异步数据流,如 Channel,但它可以有多个订阅者(taps: 水龙头),所有订阅者都可以获得流经 supply 的相同值。

它是 观察者模式 的线程安全实现,是支持 Raku 中的反应式编程的核心。

有两种类型的 Supplies:实时(live)和按需(on demand)。当水龙头接入到实时(live)供应时,水龙头将仅在创建水龙头之后看到流经 supply 的值。这种供应(supplies)通常是无限的,例如鼠标移动。关闭这样的水龙头(tap)不会阻止鼠标事件的发生,它只是意味着值将看不见了。所有的开孔器(tappers )都看到了相同的值流(flow of values)。

按需供应上轻敲(tap)将发起值的产生,再次轻敲(tap)供应可能会产生一组新的值。例如,每次轻敲时,Supply.interval 都会生成一个具有适当间隔的新计时器。如果水龙头(tap)关闭,计时器只会停止向该水龙头发射值。

Supplier 的工厂方法 Supply 获得 live Supply(实时供应)。通过在 Supplier 对象上调用 emit 来发出新值。

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
$supplier.tap( -> $v { say "$v" });
$supplier.emit(42);  # Will cause the tap to output "42" 

实时方法在实时供应上返回 True。工厂方法如 intervalfrom-list 会返回按需供应(on demand supplies)。

可以使用 Supplier::Preserving创建一个直到第一次轻敲的保存值的实时供应(Supply)。

可以在并发页面中找到更多示例。

返回 Taps 的方法

tap 方法

method tap(Supply:D: &emit = -> $ { },
        :&done,
        :&quit,
    --> Tap:D)

除了所有现有的 taps 之外,还会创建一个新 tap(如果您愿意,可以使用一种订阅)。第一个位置参数是一段代码,当通过 emit 调用获得新值时将调用该代码。

当调用供应(supply)上的 done 方法时,将调用 :&done 回调,指示通道(channel)的寿命终止。对于实时(live)供应,将在父供应商(Supplier)处调用 done 例程。

当调用 supply 上的 quit 方法时,将调用 &quit 回调,表示错误地终止了 supply。对于实时供应(live supply),将在父供应商(Supplier)处调用 quit 例程。

tap 方法返回 Tap 类型的对象,你可以在其上调用 close 方法来取消订阅。

my $s = Supply.from-list(0 .. 5);
my $t = $s.tap( -> $v { say "$v"}, done => { say "no more ticks" } );

产生:

0
1
2
3
4
5
no more ticks

act 方法

method act(Supply:D: &act --> Tap:D)

使用给定代码在给定供应上创建 tap。与 tap 方法不同,它保证该给定代码一次只能由一个线程执行。

实用方法

Capture 方法

定义为:

method Capture(Supply:D --> Capture:D)

等价于在调用者身上调用 .List.Capture

Channel 方法

method Channel(Supply:D: --> Channel:D)

返回一个 Channel 对象,该对象将从供应中接收所有未来值,并在 Supply 完成时关闭(close),并在供应退出时退出(关闭并显示错误)。

Promise 方法

method Promise(Supply:D: --> Promise:D)

返回在供应(Supply)完成时将保留的 Promise。如果 Supply 也发出任何值,则 Promise 将保留最终值。否则,它将与 Nil 保持一致。如果供应以退出(quit)而不是完成(done)结束,那么 Promise 将被该异常打破(broken)。

my $supplier = Supplier.new;
my $s = $supplier.Supply;
my $p = $s.Promise;
$p.then(-> $v { say "got $v.result()" });
$supplier.emit('cha');         # not output yet 
$supplier.done();              # got cha 

Promise 方法在处理仅产生一个值的供应时,当只感兴趣最终值,或仅完成(成功与否)相关时最有用。

live 方法

method live(Supply:D: --> Bool:D)

如果供应是“实时”,则返回True,也就是说,值一到达就会被发送到水龙头。始终在默认的Supply中返回True(但是例如在Supply.from-list返回的供应中它为False)。

schedule-on 方法

method schedule-on(Supply:D: Scheduler $scheduler)

在指定的调度程序上运行emit,done和quit回调。

这对于需要从GUI线程运行某些操作的GUI工具包很有用。

等到供应完成的方法

wait 方法

method wait(Supply:D:)

点击它被调用的供应,并阻止执行,直到供应完成(在这种情况下,它评估在Supply上发出的最终值,或者如果没有发出值则为Nil)或退出(在这种情况下它)将抛出传递给退出的异常)。

my $s = Supplier.new;
start {
  sleep 1;
  say "One second: running.";
  sleep 1;
  $s.emit(42);
  $s.done;
}
$s.Supply.wait;
say "Two seconds: done";

list 方法

method list(Supply:D: --> List:D)

点击它所调用的Supply,并返回一个惰性列表,该列表将在Supply发出值时生效。一旦供应完成,该清单将被终止。如果Supply退出,则一旦达到惰性列表中的该点,就会抛出异常。

grab 方法

method grab(Supply:D: &when-done --> Supply:D)

点击调用它的供应。完成后,调用&when-done然后发出它在结果Supply上返回的值列表。如果原始供应退出,则异常立即在退货供应上传达。

my $s = Supply.from-list(4, 10, 3, 2);
my $t = $s.grab(&sum);
$t.tap(&say);           # OUTPUT: «19␤» 

reverse 方法

method reverse(Supply:D: --> Supply:D)

点击调用它的供应。一旦该供应完成,其发出的所有值将以相反的顺序在返回的供应上发出。如果原始供应退出,则异常立即在退货供应上传达。

my $s = Supply.from-list(1, 2, 3);
my $t = $s.reverse;
$t.tap(&say);           # OUTPUT: «3␤2␤1␤» 

sort 方法

method sort(Supply:D: &custom-routine-to-use? --> Supply:D)

点击调用它的供应。一旦该耗材发出完成,它所发出的所有值都将被排序,并且结果以排序的顺序在返回的Supply上发出结果。可选择接受比较器块。如果原始供应退出,则异常立即在退货供应上传达。

my $s = Supply.from-list(4, 10, 3, 2);
my $t = $s.sort();
$t.tap(&say);           # OUTPUT: «2␤3␤4␤10␤» 

返回另一个 Supply 的方法

from-list 方法

method from-list(Supply:U: +@values --> Supply:D)

根据传递给此方法的值创建按需供应。

my $s = Supply.from-list(1, 2, 3);
$s.tap(&say);           # OUTPUT: «1␤2␤3␤» 

share 方法

method share(Supply:D: --> Supply:D)

通过按需供应创建实时供应,从而可以在多个分接头上共享按需供应的值,而不是每个分接头从按需供应中查看其自己的所有值的副本。

# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3" 
my $s = Supply.interval(1).share;
$s.tap: { "first $_".say };
sleep 1.1;
$s.tap: { "second $_".say };
sleep 2

flat 方法

method flat(Supply:D: --> Supply:D)

创建供应,在再次发射之前,在给定供应中看到的所有值都被展平。

do 方法

method do(Supply:D: &do --> Supply:D)

创建供应,在给定供应中看到的所有值再次发出。仅为其副作用执行的给定代码保证一次只能由一个线程执行。

on-close 方法

method on-close(Supply:D: &on-close --> Supply:D)

返回一个新的Supply,只要关闭该供应的Tap,它就会运行和关闭。这包括是否将进一步的操作链接到供应。 (例如,$ supply.on-close(&on-close).map(*。uc))。使用反应或供应块时,使用CLOSE相位器通常是更好的选择。

my $s = Supplier.new;
my $tap = $s.Supply.on-close({ say "Tap closed" }).tap(
    -> $v { say "the value is $v" },
    done    => { say "Supply is done" },
    quit    => -> $ex { say "Supply finished with error $ex" },
);
 
$s.emit('Raku');
$tap.close;        # OUTPUT: «Tap closed␤»

interval 方法

method interval(Supply:U: $interval, $delay = 0, :$scheduler = $*SCHEDULER --> Supply:D)

创建一个每隔$ interval秒发出一次值的供应,从调用开始$ delay秒。发射的值是从0开始的整数,并且对于每个发射的值递增1。

实施可能会将太小的值视为它们支持的最低分辨率,在这种情况下可能会发出警告;例如将0.0001作为0.001。

grep 方法

method grep(Supply:D: Mu $test --> Supply:D)

创建一个新的供应,仅从原始供应中发出smartmatch对$ test的值。

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $ints     = $all.grep(Int);
$ints.tap(&say);
$supplier.emit($_) for 1, 'a string', 3.14159;   # prints only 1 

map 方法

method map(Supply:D: &mapper --> Supply:D)

返回一个新的供应,通过&mapper映射给定供应的每个值,并将其发送到新供应。

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $double   = $all.map(-> $value { $value * 2 });
$double.tap(&say);
$supplier.emit(4);           # RESULT: «8» 

batch 方法

method batch(Supply:D: :$elems, :$seconds --> Supply:D)

创建一个新的供应,通过批次中的元素数量(使用:elems)或最大秒数(使用:秒)或两者来批量处理给定供应的值。供应完成后,最后一批中会发出任何剩余值。

elems 方法

method elems(Supply:D: $seconds? --> Supply:D)

创建一个新的供应,其中发出的值的数量发生了变化。如果您只想每隔这么多秒更新一次,它也可以选择一个间隔(以秒为单位)。

head 方法

method head(Supply:D: Int(Cool) $number = 1 --> Supply:D)

使用与List.head相同的语义创建“head”提供。

my $s = Supply.from-list(4, 10, 3, 2);
my $hs = $s.head(2);
$hs.tap(&say);           # OUTPUT: «4␤10␤» 

tail 方法

method tail(Supply:D: Int(Cool) $number = 1 --> Supply:D)

使用与List.tail相同的语义创建“尾部”提供。

my $s = Supply.from-list(4, 10, 3, 2);
my $ts = $s.tail(2);
$ts.tap(&say);           # OUTPUT: «3␤2␤» 

rotor 方法

method rotor(Supply:D: @cycle --> Supply:D)

使用与List.rotor相同的语义创建“旋转”供应。

delayed 方法

method delayed(Supply:D: $seconds, :$scheduler = $*SCHEDULER --> Supply:D)

创建一个新的供应,其中发出流经给定供应的所有值,但具有给定的延迟(以秒为单位)。

throttle 方法

method throttle(Supply:D:
  $limit,                 # values / time or simultaneous processing 
  $seconds or $callable,  # time-unit / code to process simultaneously 
  $delay = 0,             # initial delay before starting, in seconds 
  :$control,              # supply to emit control messages on (optional) 
  :$status,               # supply to tap status messages from (optional) 
  :$bleed,                # supply to bleed messages to (optional) 
  :$vent-at,              # bleed when so many buffered (optional) 
  :$scheduler,            # scheduler to use, default $*SCHEDULER 
  --> Supply:D)

从给定的Supply生成Supply,但确保传递的消息数量有限。

它有两种操作模式:每个时间单位或最大执行代码块:这由第二个位置参数决定。

第一个位置参数指定应该应用的限制。

如果第二个位置参数是Callable,则limit表示执行Callable的最大并行进程数,该进程被赋予接收的值。在这种情况下,发出的值将是从启动Callable获得的Promises。

如果第二个位置参数是数值,则将其解释为时间单位(以秒为单位)。如果指定.1作为值,则确保每十分之一秒不超过限制。

如果超出限制,则缓冲传入的消息,直到有空间再次传递/执行Callable。

第三个位置参数是可选的:它表示在传递任何值之前油门将等待的秒数。

:control命名参数可选地指定一个可用于控制油门运行时的供应。可以发送的消息是“key:value”形式的字符串。请参阅下文,了解可以发送以控制油门的消息类型。

:status named参数可选地指定将接收任何状态消息的Supply。如果指定,它将至少在原始Supply耗尽后发送一条状态消息。请参阅下面的状态消息

:bleed命名参数可选地指定一个Supply,它将接收任何明确放血的值(带有出血控制消息),或者自动放血(如果有一个出口处于活动状态)。

:vent-at命名参数指示在将任何其他值路由到:bleed Supply之前可以缓冲的值的数量。如果未指定,则默认为0(不会导致自动出血)。只有在指定了:出血供应时才有意义。

:scheduler命名参数表示要使用的调度程序。默认为$ * SCHEDULER。

控制信息

这些消息可以发送到:control Supply。控制消息由“key:value”形式的字符串组成,例如“限制:4”。

限制

将消息数(最初在第一个位置给出)更改为给定值。

流血

将给定数量的缓冲消息路由到:bleed Supply。

发泄,在

在自动出血发生之前更改缓冲值的最大数量。如果该值低于之前,将导致立即重新路由缓冲值以匹配新的最大值。

状态

将状态消息发送到:status具有给定ID的供应。

状态信息

状态返回消息是具有以下键的哈希:

允许

当前仍允许传递/执行的消息/可调用的数量。

流血

路由到:bleed Supply的消息数。

缓冲的

由于溢出而当前缓冲的消息数。

发射

发出(传递)的消息数。

ID

此状态消息的ID(单调递增的数字)。如果您想记录状态消息,则很方便。

限制

正在应用的当前限制。

发泄,在

在自动重新路由到:bleed Supply之前可以缓冲的最大消息数。

例子

有一段简单的代码在异步开始运行时宣布,等待一段时间,然后在完成时宣布。这样做6次,但不要让超过3次同时运行。

my $s = Supply.from-list(^6);  # set up supply 
my $t = $s.throttle: 3,        # only allow 3 at a time 
{                              # code block to run 
    say "running $_";          # announce we've started 
    sleep rand;                # wait some random time 
    say "done $_"              # announce we're done 
}                              # don't need ; because } at end of line 
$t.wait;                       # wait for the supply to be done 

一次运行的结果将是:

running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5

stable 方法

method stable(Supply:D: $time, :$scheduler = $*SCHEDULER --> Supply:D)

如果在给定的$ time(以秒为单位)内没有被另一个值取代,则创建一个仅传递给定供应的值的新供应。 (可选)使用:scheduler参数,使用除默认调度程序之外的其他调度程序。

为了澄清上述情况,如果在超时$ time期间,将向供应商发送所有其他值,但最后一个值将被丢弃。每次向供应商发出额外值时,在超时期间,将重置$ time。

在处理UI输入时,此方法非常有用,在用户停止输入一段时间而不是每次击键之前,不希望执行操作。

my $supplier = Supplier.new;
my $supply1 = $supplier.Supply;
$supply1.tap(-> $v { say "Supply1 got: $v" });
$supplier.emit(42);
 
my Supply $supply2 = $supply1.stable(5);
$supply2.tap(-> $v { say "Supply2 got: $v" });
sleep(3);
$supplier.emit(43);  # will not be seen by $supply2 but will reset $time 
$supplier.emit(44);
sleep(10);
# OUTPUT: «Supply1 got: 42␤Supply1 got: 43␤Supply1 got: 44␤Supply2 got: 44␤» 

如上所示,$ supply1收到了向供应商发出的所有价值,而$ supply2只收到了一个价值。 43被抛弃了,因为它后面跟着另一个’最后’值44,它被保留并在大约8秒后发送到$ supply2,这是因为超时$ time在3秒后被重置。

reduce 方法

method reduce(Supply:D: &with --> Supply:D)

使用与List.reduce相同的语义创建“减少”供应。

my $supply = Supply.from-list(1..5).reduce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «15␤» 

produce 方法

method produce(Supply:D: &with --> Supply:D)

使用与List.produce相同的语义创建“生成”供应。

my $supply = Supply.from-list(1..5).produce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «1␤3␤6␤10␤15␤» 

lines

method lines(Supply:D: :$chomp = True --> Supply:D)

创建一个供应,它将从通常由某些异步I / O操作创建的供应中逐行发出字符。可选:chomp参数指示是否删除行分隔符:默认值为True。

words

method words(Supply:D: --> Supply:D)

创建一个供应,它将从通常由某些异步I / O操作创建的供应中逐字发出字符。

my $s = Supply.from-list("Hello Word!".comb);
my $ws = $s.words;
$ws.tap(&say);           # OUTPUT: «Hello␤Word!␤» 

unique 方法

method unique(Supply:D: :$as, :$with, :$expires --> Supply:D)

创建仅提供唯一值的供应,由可选:as和:with参数定义(与List.unique相同)。可选:expires参数在“重置”之前等待多长时间(以秒为单位)并且不考虑已经看到的值,即使它与旧值相同。

squish 方法

method squish(Supply:D: :$as, :$with --> Supply:D)

创建仅提供唯一值的供应,由可选:as和:with参数定义(与List.squish相同)。

max 方法

method max(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

如果它们大于之前看到的任何值,则创建仅从给定供应中发出值的供应。换句话说,从持续上升的供应中,它将发出所有的价值。从不断下降的供应中,它只会发出第一个值。可选参数指定比较器,就像Any.max一样。

min 方法

method min(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

如果它们小于之前看到的任何值,则创建仅从给定供应中发出值的供应。换句话说,从不断下降的供应中,它将发出所有的价值。从持续上升的供应中,它将仅发出第一个值。可选参数指定比较器,就像Any.min一样。

minmax 方法

method minmax(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

每次从给定供应中看到新的最小值或最大值时,创建一个发出范围的供应。可选参数指定比较器,与Any.minmax一样。

skip 方法

method skip(Supply:D: Int(Cool) $number = 1 --> Supply:D)

返回一个新的Supply,它将从给定的Supply发出所有值,但前几个$ number值将被丢弃。

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply = $supply.skip(3);
$supply.tap({ say $_ });
$supplier.emit($_) for 1..10; # OUTPUT: «4␤5␤6␤7␤8␤9␤10␤» 

start 方法

method start(Supply:D: &startee --> Supply:D)

创造供应供应。对于原始供应中的每个值,代码对象在另一个线程上进行调度,并返回单个值(如果代码成功)或者没有值退出(如果代码失败)的供应。

这对于异步启动您未阻止的工作非常有用。

使用migrate可将值再次连接到单个电源中。

migrate 方法

method migrate(Supply:D: --> Supply:D)

采用供应本身具有类型为Supply的值作为输入。每次外部电源发出新的电源时,都会触发该电源并发出其值。之前任何已开发的供应将被关闭。这对于在不同数据源之间进行迁移非常有用,并且只关注最新的数据源。

例如,想象一下用户可以在不同股票之间切换的应用程序。当他们切换到新的连接时,建立与Web套接字的连接以获取最新值,并且应关闭任何先前的连接。来自Web套接字的每个值流将表示为Supply,它们本身被发送到供应的最新数据源供观察。迁移方法可用于将此供应供应平坦为单个供应用户关心的当前值。

这是一个简单的模拟这样的程序:

my Supplier $stock-sources .= new;
 
sub watch-stock($symbol) {
    $stock-sources.emit: supply {
        say "Starting to watch $symbol";
        whenever Supply.interval(1) {
            emit "$symbol: 111." ~ 99.rand.Int;
        }
        CLOSE say "Lost interest in $symbol";
    }
}
 
$stock-sources.Supply.migrate.tap: *.say;
 
watch-stock('GOOG');
sleep 3;
watch-stock('AAPL');
sleep 3;

产生的输出如下:

Starting to watch GOOG
GOOG: 111.67
GOOG: 111.20
GOOG: 111.37
Lost interest in GOOG
Starting to watch AAPL
AAPL: 111.55
AAPL: 111.6
AAPL: 111.6

组合 supplies 的方法

merge 方法

method merge(Supply @*supplies --> Supply:D)

创建从给定供应中看到的任何值的供应。只有在完成所有给定的供应后,才能完成供应。也可以称为类方法。

zip 方法

method zip(Supply @*supplies, :&with = &[,] --> Supply:D)

只要在所有耗材上看到新值,就会创建一个发出组合值的供应。默认情况下,会创建列表,但可以通过使用:with参数指定自己的组合器来更改列表。一旦完成任何给定的供应,就会完成供应。也可以称为类方法。

zip-latest 方法

method zip-latest(Supply @*supplies, :&with = &[,], :$initial --> Supply:D)

只要在任何耗材上看到新值,就会创建一个发出组合值的供应。默认情况下,会创建列表,但可以通过使用:with参数指定自己的组合器来更改列表。可选的:initial参数可用于指示组合值的初始状态。默认情况下,在生成的电源上发出第一个组合值之前,所有耗材必须至少有一个值。一旦完成任何给定的供应,就会完成供应。也可以称为类方法。

I/O 功能作为 supplies

sub signal

sub signal(*@signals, :$scheduler = $*SCHEDULER)

为指定的Signal枚举(例如SIGINT)和可选的:scheduler参数创建一个供应。收到的任何信号都将在电源上发出。例如:

signal(SIGINT).tap( { say "Thank you for your attention"; exit 0 } );

会抓住Control-C,谢谢你,然后退出。

要从信号编号转到信号,您可以执行以下操作:

signal(Signal(2)).tap( -> $sig { say "Received signal: $sig" } );

通过检查Signal ::。键可以找到支持的信号列表(就像任何枚举一样)。有关枚举如何工作的更多详细信息,请参阅枚举。

注意:高达2018.05的Rakudo版本有一个错误,因为某些系统上的信号数值不正确。例如,即使信号(10)实际上是特定系统上的SIGUSR1,它也会返回SIGBUS。话虽如此,使用信号(SIGUSR1)在2018.04,2018.04.1和2018.05之外的所有Rakudo版本上都按预期工作,其中通过使用信号(SIGBUS)可以实现预期的行为。这些问题在2018.05之后的Rakudo版本中得到了解决。

IO::Notification.watch-path 方法

method watch-path($path --> Supply:D)

创建OS将向其发出值的供应,以指示给定路径的文件系统上的更改。在IO对象上还有一个带有watch方法的快捷方式,如下所示:

IO::Notification.watch-path(".").act( { say "$^file changed" } );
".".IO.watch.act(                     { say "$^file changed" } );   # same 
Supply 
comments powered by Disqus