There Is More Than One Way at the Same Time

楽土

Perl 6 Rosattacode 中用于并行计算的部分已经非常过时,而且缺少了过去几周中添加或修复的所有好东西。通过这篇文章,我想为 Rosettacode 提出一个更新的版本。如果你认为我遗漏了什么,请在下面评论。请记住,Rosettacode 是用来炫耀的,而不是用来全面的。

use v6.d.PREVIEW;

Perl 6 提供了通过线程并行执行代码的功能。有一些低级的 custructs 可以启动一个线程或安全地暂停执行。

my $t1 = Thread.start({ say [+] 1..10_000_000 });
my $t2 = Thread.start({ say [*] 1..10_000 });
$t1.finish;
$t2.finish;

my $l = Lock.new;
$l.lock;
$t1 = Thread.start: { $l.lock; say 'got the lock'; $l.unlock };
sleep 2; $l.unlock;

$t1.finish;

在处理列表时,可以使用由方法 hyperrace 创建的高级 Iterator。后者可能会不按顺序返回值。这些 Iterator 将把列表中的元素分配给工人线程,再由 Rakudo 的 ThreadPoolScheduler 分配给操作系统级别的线程。整个构造将阻塞,直到最后一个元素被处理。

my @values = 1..100;

sub postfix:<!> (Int $n) { [*] 1..$n }

say [+] @values.hyper.map( -> $i { print '.' if $i %% 100; $i!.chars });
For for-lovers there are the race for and hyper for keyword for distributing work over threads in the same way as their respective methods forms.

race for 1..100 {
    say .Str; # There be out of order dragons!
}

my @a = do hyper for 1..100 {
   .Int! # Here be thread dragons!
}

say [+] @a;

Perl 6 运动了遵循反应式编程模型的构造。我们可以旋转出许多工作线程,并使用线程安全的 ChannelSupply 来将值从一个线程移动到另一个线程。反应块可以将这些值流组合起来,对它们进行处理,并对一些条件做出反应,比如在一个工作线程完成生成值后进行清理或处理错误。后者是通过将 Exception-objects 装入 Failure-objects 来完成的,它可以跟踪错误在哪里首次发生,以及在哪里使用了错误而不是一个合适的值。

my \pipe = Supplier::Preserving.new;

start {
    for $*HOME {
        pipe.emit: .IO if .f & .ends-with('.txt');

        say „Looking in ⟨{.Str}⟩ for files that end in ".txt"“ if .IO.d;
        .IO.dir()».&?BLOCK when .IO.d;

        CATCH {
            default {
                note .^name, ': ', .Str;
                pipe.emit: Failure.new(.item);
            }
        }
    }
    pipe.done;
}

react {
    whenever pipe.Supply {
        say „Checking ⟨{.Str}⟩ for "Rosetta".“;
        say „I found Rosetta in ⟨{.Str}⟩“ if try .open.slurp.contains('Rosetta');
        LAST {
            say ‚Done looking for files.‘;
            done;
        }
        CATCH {
            default {
                note .^name, ': ', .Str;
            }
        }
    }
    whenever Promise.in(60*10) {
        say „I gave up to find Rosetta after 10 minutes.“;
        pipe.done;
        done;
    }
}

许多内置对象会返回一个 SupplyPromise。后者会返回一个单一的值,或者只是传达一个事件,比如超时。在上面的例子中,我们以这种方式使用了一个 Promise。下面我们炮轰一下,逐行找到并处理其输出。如果有很多不同类型的事件需要处理,这可以用在一个 react 块中。在这里,我们只是挖掘一个值流,然后逐一处理。由于我们没有一个 react 块来提供一个阻塞事件循环,我们用 await 等待 find 完成并处理它的 exitcode。给 .tap 的块内的任何东西都将在自己的线程中运行。

my $find = Proc::Async.new('find', $*HOME, '-iname', '*.txt');
$find.stdout.lines.tap: {
    say „Looking for "Rosetta" in ⟨$_⟩“;
    say „Found "Rosetta" in ⟨$_⟩“ if try .open.slurp.contains('Rosetta');
};

await $find.start.then: {
    say „find finished with exitcode: “, .result.exitcode;
};

让运算符通过线程或向量单元并行处理值,还没有做到。超运算符和 Junction 都是自动线程的候选者。如果你今天使用它们,请记住副作用可能在未来提供脚炮。

by gfldex

comments powered by Disqus