Using Raku to Consume Kafka

Here is the scenario: the sender reads compressed archives into byte arrays and publishes them to Kafka, and a third-party Kafka client reads the byte arrays back and decompresses them. Each message corresponds to one archive, and each archive contains a file ending in _log.txt and a file ending in _result.txt.
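
For reference, the producing side can be sketched in Raku as well. The snippet below is a minimal, untested sketch: it assumes PKafka::Producer exposes a send method accepting the raw bytes (the consumer code below imports PKafka::Producer, so the class itself exists), and the topic, broker address, and archive path are placeholders.

use PKafka::Producer;

sub MAIN (Str $archive-path) {
    # Read the compressed file as raw bytes -- no text decoding.
    my $payload = $archive-path.IO.slurp(:bin);

    my $producer = PKafka::Producer.new(topic => "dc-diagnostic-report", brokers => "127.0.0.1");

    # Publish the whole archive as one message (assumed API; check the PKafka docs).
    $producer.send($payload);
}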

Raku can read these messages from Kafka and parse them.

First, install the required modules: PKafka for interacting with Kafka, Archive::Libarchive for decompressing the byte arrays, Cro for HTTP requests, and DBIish for database access.

zef install Pkafka
zef install Archive::Libarchive
zef install Cro
zef install DBIish

The code is as follows:

use PKafka::Consumer;
use PKafka::Message;
use PKafka::Producer;
use Archive::Libarchive; 
use Archive::Libarchive::Constants;
use Cro::HTTP::Client;
use JSON::Fast;
use JSON::Path;
use DBIish;

sub MAIN ()
{
    my $brokers = "127.0.0.1";
    my $test = PKafka::Consumer.new( topic=>"dc-diagnostic-report", brokers=>$brokers);


    $test.messages.tap(-> $msg
    {
        given $msg
        {
            when PKafka::Message
            {
                say "got offset: {$msg.offset}";
                my $log = get-log($msg.payload);                  # extract the log
                my ($taskid, $result) = get-result($msg.payload); # extract taskid and result
                my $json = request-ads($taskid, $log, $result);   # call the ADS service
                my @values = parse-json($json);                   # pull the SQL values out of the JSON
                write2db(@values);                                # write them to the database
            }
            when PKafka::EOF
            {
                say "Messages Consumed { $msg.total-consumed}";
            }
            when PKafka::Error
            {
                say "Error {$msg.what}";
                $test.stop;
            }
        }
    });

    my $t1 = $test.consume-from-beginning(partition=>0);

    await $t1;
}
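
# The helper subs below do the actual work: unpack the payload, query the
# ADS service over HTTP, parse the JSON reply, and upsert the result into MySQL.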

#| Extract the log from the archive payload
sub get-log($payload) {
    my $a = Archive::Libarchive.new: operation => LibarchiveRead, file => $payload;
    my $log-content = $a.read-file-content(sub (Archive::Libarchive::Entry $e --> Bool) { $e.pathname.ends-with('_log.txt')    });
    my $log = $log-content.decode('UTF8-C8'); # UTF8-C8 tolerates malformed UTF-8: https://stackoverflow.com/questions/50674498/raku-malformed-utf-8-causes-program-crash
    $a.close;
    return $log;
}

#| Extract the taskid and the result from the archive payload
sub get-result($payload) {
    my $a = Archive::Libarchive.new: operation => LibarchiveRead, file => $payload;
    my $res-content = $a.read-file-content(sub (Archive::Libarchive::Entry $e --> Bool) { $e.pathname.ends-with('_result.txt') });
    my $log-result = $res-content.decode('UTF8-C8');

    my $taskid = $log-result.lines[0].split(":")[1];  # taskid is on the first line, after the colon
    my $result = $log-result.lines[1].split(":")[1];  # result is on the second line, after the colon
    $a.close;
    return ($taskid, $result);
}

#| Send the log and result to the ADS service and return its JSON reply
sub request-ads($taskid, $log, $result) {
    my $client = Cro::HTTP::Client.new: content-type => 'application/json';
    my %rds = taskId => $taskid, :$log, :$result;
    my $resp = await $client.post: 'http://10.0.201.46/bls_ads/diagResultReq', body => %rds;
    my $json = await $resp.body-text;  # body-text already yields the JSON text; to-json here would double-encode it
    return $json;
}

#| Parse the JSON reply and pull out the values for the SQL statement
sub parse-json($json) {
    my $oj  = from-json($json);
    my $ecu = JSON::Path.new('$.data.dtcs.ecu[0]').values($oj)[0];
    my $ecuid  = $ecu<ecuid>;
    my $dtcnum = $ecu<dtcnum>;
    my $dtc    = $ecu<dtc>;
    
    my $taskid  = JSON::Path.new('$.taskId').values($oj)[0];
    my $vtype   = JSON::Path.new('$.vtype').values($oj)[0];
    my $ecunum  = JSON::Path.new('$.ecunum').values($oj)[0];
    my $funid   = JSON::Path.new('$.funid').values($oj)[0];

    return ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc);
}

#| Write the values to the database (insert, or update on duplicate key)
sub write2db(@values) {
    my $dbh = DBIish.connect("mysql", :database<wmdtc>, :host<127.0.0.1>, :user<root>, :password<000608>, :port<6606>, :RaiseError);
    my ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc) = @values;
    my $sth = $dbh.prepare(q:to/STATEMENT/);
        insert into wm_ads_result (taskid, vtype, funid, ecuid, dtcnum, dtc)
        values (?,?,?,?,?,?) 
        ON DUPLICATE KEY 
        UPDATE vtype=?, funid=?, ecuid=?, dtcnum=?, dtc=?
    STATEMENT

    $sth.execute($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc, $vtype, $funid, $ecuid, $dtcnum, $dtc);

    $sth.finish;
    $dbh.dispose;
}
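
Note that the ON DUPLICATE KEY UPDATE branch only fires if wm_ads_result has a unique or primary key, presumably on taskid; without such a key MySQL will keep inserting new rows.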

The tricky part was reading the contents of the individual files inside the archive entirely in memory, without first saving the byte array to local disk.
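
This works because Archive::Libarchive can open an archive from an in-memory buffer: its file argument takes a Buf as well as a file name, which is exactly what the code above does with $msg.payload. Here is a minimal, self-contained sketch that just lists the entries of an archive held in a Buf (the sub name is illustrative):

use Archive::Libarchive;
use Archive::Libarchive::Constants;

sub list-entries(Buf $payload) {
    # Open the archive straight from the byte buffer -- no temporary file.
    my $a = Archive::Libarchive.new: operation => LibarchiveRead, file => $payload;
    my Archive::Libarchive::Entry $e .= new;
    while $a.next-header($e) {
        say $e.pathname;  # e.g. xxx_log.txt, xxx_result.txt
        $a.data-skip;     # skip the entry body; read-file-content returns it instead
    }
    $a.close;
}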
