有这样一个场景: 数据发送方将压缩文件读成字节数组后发往 Kafka, 然后第三方的 Kafka Client 从中读取字节数组并解压缩。每条 message 对应一个压缩文件, 每个压缩文件中包含 _log.txt 和 _result.txt 两个文件。
Raku 可以从 Kafka 中读取消息并完成解析。
首先安装相关模块: PKafka 用于和 Kafka 交互; Archive::Libarchive 用于解压缩字节数组; Cro 用于 HTTP 请求; DBIish 用于数据库读写; JSON::Fast 和 JSON::Path 用于解析返回的 JSON。
zef install PKafka
zef install Archive::Libarchive
zef install Cro
zef install DBIish
zef install JSON::Fast
zef install JSON::Path
代码片段如下:
use PKafka::Consumer;
use PKafka::Message;
use PKafka::Producer;
use Archive::Libarchive;
use Archive::Libarchive::Constants;
use Cro::HTTP::Client;
use JSON::Fast;
use JSON::Path;
use DBIish;
# Entry point. Consumes the "dc-diagnostic-report" topic from the beginning of
# partition 0 and, for every regular message, runs the pipeline:
# extract log/result from the payload -> request ADS -> parse JSON -> write DB.
# (Plain comments are used here on purpose: a declarator comment on MAIN would
# change the auto-generated usage message.)
sub MAIN ()
{
    my $consumer = PKafka::Consumer.new(
        topic   => "dc-diagnostic-report",
        brokers => "127.0.0.1",
    );

    $consumer.messages.tap: -> $msg {
        if $msg ~~ PKafka::Message {
            say "got offset: {$msg.offset}";

            # The payload is opened twice: once for the log entry, once for
            # the result entry (task id + result).
            my $log               = get-log($msg.payload);
            my ($taskid, $result) = get-result($msg.payload);

            # Ask ADS for the diagnosis, pull the SQL values out of the
            # returned JSON and store them.
            write2db(parse-json(request_ads($taskid, $log, $result)));
        }
        elsif $msg ~~ PKafka::EOF {
            say "Messages Consumed { $msg.total-consumed}";
        }
        elsif $msg ~~ PKafka::Error {
            say "Error {$msg.what}";
            # Stop consuming once the consumer reports an error.
            $consumer.stop;
        }
    };

    # Block until the consume operation finishes.
    await $consumer.consume-from-beginning(partition => 0);
}
#| Extract the log text from an archive: reads the entry whose path ends with
#| '_log.txt' and returns its content decoded as a Str.
#| $payload is handed to Archive::Libarchive as its `file` argument
#| (presumably the raw message bytes -- confirm against the caller).
sub get-log($payload) {
    my $archive = Archive::Libarchive.new: operation => LibarchiveRead, file => $payload;

    # Close the archive on every exit path, including an exception raised
    # while reading or decoding; previously the handle leaked in that case.
    LEAVE { .close with $archive }

    my $bytes = $archive.read-file-content(
        sub (Archive::Libarchive::Entry $e --> Bool) { $e.pathname.ends-with('_log.txt') }
    );

    # UTF8-C8 is used so that malformed UTF-8 in the log does not crash the
    # program: https://stackoverflow.com/questions/50674498/raku-malformed-utf-8-causes-program-crash
    return $bytes.decode('UTF8-C8');
}
#| Extract the task id and the result from an archive: reads the entry whose
#| path ends with '_result.txt' and returns ($taskid, $result).
#| The task id is everything after the first ':' on line 1, the result is
#| everything after the first ':' on line 2.
sub get-result($payload) {
    my $archive = Archive::Libarchive.new: operation => LibarchiveRead, file => $payload;

    # Close the archive on every exit path, including an exception raised
    # while reading or decoding; previously the handle leaked in that case.
    LEAVE { .close with $archive }

    my $bytes = $archive.read-file-content(
        sub (Archive::Libarchive::Entry $e --> Bool) { $e.pathname.ends-with('_result.txt') }
    );

    # Split the decoded text into lines once and reuse the list.
    my $rows = $bytes.decode('UTF8-C8').lines.List;

    # Limit the split to two parts so that a value which itself contains ':'
    # is returned whole instead of being cut at its first inner colon.
    my $taskid = $rows[0].split(":", 2)[1];
    my $result = $rows[1].split(":", 2)[1];

    return ($taskid, $result);
}
#| Send the task id, log and result to the ADS service as a JSON POST and
#| return the response body as text (expected to be a JSON document --
#| confirm against the service contract).
sub request_ads($taskid, $log, $result) {
    my $client = Cro::HTTP::Client.new: content-type => 'application/json';

    # Request body; the keys are 'taskId', 'log' and 'result'.
    my %rds = taskId => $taskid, :$log, :$result;

    my $resp = await $client.post: 'http://10.0.201.46/bls_ads/diagResultReq', body => %rds;

    # Return the body text as-is. It was previously passed through to-json,
    # which wrapped the already-serialized document in a JSON string literal.
    return await $resp.body-text;
}
#| Parse the ADS response and pull out the values to be stored.
#| Returns ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc), where the last
#| three come from the first match of '$.data.dtcs.ecu[0]' and the first three
#| from the top-level keys 'taskId', 'vtype' and 'funid'.
sub parse-json($json) {
    my $doc = from-json($json);

    # A JSON document that was itself JSON-encoded as a string decodes to a
    # Str on the first pass; decode once more to reach the actual structure.
    $doc = from-json($doc) if $doc ~~ Str;

    # Evaluate the ECU path once and read all three fields from that match.
    my $ecu    = JSON::Path.new('$.data.dtcs.ecu[0]').values($doc)[0];
    my $ecuid  = $ecu<ecuid>;
    my $dtcnum = $ecu<dtcnum>;
    my $dtc    = $ecu<dtc>;

    my $taskid = JSON::Path.new('$.taskId').values($doc)[0];
    my $vtype  = JSON::Path.new('$.vtype').values($doc)[0];
    my $funid  = JSON::Path.new('$.funid').values($doc)[0];

    return ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc);
}
#| Insert one row into wm_ads_result, or update the existing row when the key
#| already exists. @values must hold, in order:
#| taskid, vtype, funid, ecuid, dtcnum, dtc.
sub write2db(@values) {
    my $dbh;
    my $sth;

    # The connection is opened with :RaiseError, so a failing prepare/execute
    # throws. Release the statement and the connection on every exit path so
    # they are not leaked in that case.
    LEAVE {
        .finish  with $sth;
        .dispose with $dbh;
    }

    # NOTE(review): connection settings and credentials are hard-coded here;
    # consider moving them to configuration.
    $dbh = DBIish.connect("mysql", :database<wmdtc>, :host<127.0.0.1>, :user<root>, :password<000608>, :port<6606>, :RaiseError);

    my ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc) = @values;

    $sth = $dbh.prepare(q:to/STATEMENT/);
        insert into wm_ads_result (taskid, vtype, funid, ecuid, dtcnum, dtc)
        values (?,?,?,?,?,?)
        ON DUPLICATE KEY
        UPDATE vtype=?, funid=?, ecuid=?, dtcnum=?, dtc=?
        STATEMENT

    # Six placeholders for the insert, then the five non-key columns again
    # for the update clause.
    $sth.execute($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc, $vtype, $funid, $ecuid, $dtcnum, $dtc);
}
其中遇到的困难是如何在不将字节数组保存到本地磁盘的情况下,在内存中完成压缩包中各文件内容的读取。