Class Channel

Thread-safe queue for sending values from producers to consumers

class Channel {}

Channel 是一个线程安全的队列,可帮助你将一个或多个生产者的一系列对象发送给一个或多个消费者。每个对象将仅到达由调度程序选择的一个这样的消费者。如果只有一个消费者和一个生产者,则保证保留对象的顺序。在 Channel 上发送是非阻塞的。

my $c = Channel.new;
await (^10).map: {
    start {
        my $r = rand;
        sleep $r;
        $c.send($r);
    }
}
$c.close;
say $c.list;

可以在并发页面中找到更多示例。

方法

send 方法

定义为:

method send(Channel:D: \item)

将项目排入频道。如果通道已经关闭,则抛出类型X :: Channel :: SendOnClosed的异常。此调用不会阻止等待使用者获取该对象。对可排队的项目数量没有设定限制,因此应注意防止失控排队。

my $c = Channel.new;
$c.send(1);
$c.send([2, 3, 4, 5]);
$c.close;
say $c.list; # OUTPUT: «(1 [2 3 4 5])␤» 

receive 方法

定义为:

method receive(Channel:D:)

从频道接收和删除项目。如果没有项目存在,它会阻塞,等待来自另一个线程的发送。

如果通道已关闭,并且已经删除了最后一项,或者在接收等待项目到达时调用了close,则抛出类型X :: Channel :: ReceiveOnClosed的异常。

如果通道已被标记为不稳定且方法失败,并且最后一项已被删除,则抛出作为异常而失败的参数。

请参阅方法轮询以获取不会引发异常的非阻塞版本。

my $c = Channel.new;
$c.send(1);
say $c.receive; # OUTPUT: «1␤» 

poll 方法

定义为:

method poll(Channel:D:)

从频道接收和删除项目。如果没有项目,则返回Nil而不是等待。

my $c = Channel.new;
Promise.in(2).then: { $c.close; }
^10 .map({ $c.send($_); });
loop {
    if $c.poll -> $item { $item.say };
    if $c.closed  { last };
    sleep 0.1;
}

请参阅方法接收,以获取正确响应通道关闭和故障的阻止版本。

close 方法

定义为:

method close(Channel:D:)

通常关闭频道。这使后续的发送调用死于X :: Channel :: SendOnClosed。后续调用.receive可能仍然会耗尽先前发送的任何剩余项目,但如果队列为空,则会抛出X :: Channel :: ReceiveOnClosed异常。由@()或.list方法生成的Seq在完成此操作之前不会终止。 when-block也将在封闭的通道上正确终止。

my $c = Channel.new;
$c.close;
$c.send(1);
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel␤» 

请注意,抛出的任何异常都可能会阻止调用.close,这可能会挂起接收线程。在这种情况下,使用LEAVE移相器强制执行.close调用。

list 方法

定义为:

method list(Channel:D: --> List:D)

返回基于Seq的列表,它将迭代队列中的项目并在迭代时从中删除每个项目。这只能在调用close方法后终止。

my $c = Channel.new; $c.send(1); $c.send(2);
$c.close;
say $c.list; # OUTPUT: «(1 2)␤» 

closed 方法

定义为:

method closed(Channel:D: --> Promise:D)

返回通过调用方法close关闭通道后将保留的promise。

my $c = Channel.new;
$c.closed.then({ say "It's closed!" });
$c.close;
sleep 1;

fail 方法

定义为:

method fail(Channel:D: $error)

关闭通道(即,使后续发送调用死亡),并将要作为通道中的最终元素抛出的错误排入队列。方法receive会将该错误作为异常抛出。如果频道已经关闭或者已经调用了.fail,则不执行任何操作。

my $c = Channel.new;
$c.fail("Bad error happens!");
$c.receive;
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::AdHoc: Bad error happens!␤» 

Capture 方法

定义为:

method Capture(Channel:D --> Capture:D)

相当于在调用者上调用.List.Capture。

Supply 方法

定义为:

method Supply(Channel:D:)

这将返回按需供应,该供应为通道上收到的每个值发出一个值。当通道关闭时,将在Supply上调用done。

my $c = Channel.new;
my Supply $s1 = $c.Supply;
my Supply $s2 = $c.Supply;
$s1.tap(-> $v { say "First $v" });
$s2.tap(-> $v { say "Second $v" });
^10 .map({ $c.send($_) });
sleep 1;

对此方法的多次调用会生成多个Supply实例,这些实例会与Channel的值竞争。

sub await

定义为:

multi sub await(Channel:D)
multi sub await(*@)

等待所有一个或多个通道都有可用值,并返回这些值(它在通道上调用.receive)。也适用于承诺。

my $c = Channel.new;
Promise.in(1).then({$c.send(1)});
say await $c;

comments powered by Disqus