1. 前言
有时完成一个比较好的案例,但是事后没有去记载,久了之后就有点忘记了,感觉记录下来,分享下当时的做法也是很不错的。
2. 百万数据处理
这是昨天20210914号发现的异常,统计redis订阅数据的脚本,在预发也跑了一次,导致数据翻倍,先需要对受影响的插座电量数据做修复。
因相关代码环境没有迁移到go上,这次修复采用php脚本执行,下面是部分代码,公司相关逻辑已删除
sysnc-repair文件
// 处理脚本传入参数
$params = array();
foreach ($argv as $val) {
if (strpos($val, '--') !== 0 && false === strpos($val, '=')) {
continue;
}
list($k, $v) = explode('=', $val);
$k = str_replace('--', '', $k);
$params[$k] = $v;
}
if (!($params && isset($params['batch']) && in_array($params['batch'], ['a', 'b']))) {
writelog('params is error');
return;
}
writelog('执行第' . $params['batch'] . '批次');
if (date("Y-m-d H:i:s") > '2021-09-16 18:00:00') {
writelog('time is out');
return;
}
// 执行时间段,a时间段评测执行可行性、突发性和耗时等
$date = [
'a' => ['2021-09-09 12:00:00', '2021-09-09 13:00:00'],
'b' => ['2021-09-09 13:00:00', '2021-09-14 19:00:00']
];
$startTime = $date[$params['batch']][0];
$endTime = $date[$params['batch']][1];
// 因数据量过大(超百万) 分页获取数据
$num = $sunDb->all("根据时间算出电量表总数");
if (!$num || !$num['num']) {
writelog('num is empty');
return;
}
$allUpNum = 0;
$time = time();
writelog('总数量: ' . $num['num']);
$count = 2000;
$pageAll = intval($num['num'] / $count) + 1;
$page = 0;
// 分页获取
for ($y = 0; $y < $pageAll; $y++) {
writelog('页数: ' . ($y + 1));
$offset = $y * $count;
$sql = "分页得到电量表总数 WHERE startTime >= '{$startTime}' and endTime < '{$endTime}' limit {$offset},{$count}";
// 查询在这个时间段的数据
$data = $sunDb->all($sql);
...
// 获取只是受影响的sn
$c2sn = $db->all("根据电量表中sn做条件查询是否是受影响的产品,得受影响的sn");
...
// 需要更新的数据
$update = [];
foreach ($data as $val) {
if (in_array($val['sn'], $c2sns)) $update[] = $val;
}
writelog('本页需要更新的数量:' . count($update));
// 批量更新 每100个更一次
$sql = '';
$sq = "UPDATE dianliangbiao SET powerConsume = CASE id ";
$i = 0;
$id = [];
$ids = '';
foreach ($update as $v) {
$allUpNum++;
$i++;
$id[] = $v['ID'];
$sql .= sprintf("WHEN %d THEN %d ", $v['ID'], $v['powerConsume'] / 2);
// 100条执行一次
if ($i == 100) {
$ids = implode(',', $id);
$sql .= "END WHERE id IN ($ids)";
$sunDb->query($sq . $sql);
writelog('sql: ' . $sq . $sql);
writelog($ids, 'sync-repair-update-');
// 清0
$sql = '';
$id = [];
$i = 0;
}
}
// 执行后面的
if ($id) {
$ids = implode(',', $id);
$sql .= "END WHERE id IN ($ids)";
$sunDb->query($sq . $sql);
writelog('last sql: ' . $sq . $sql);
writelog($ids, 'sync-sunlogin-repair-c2-data-update-');
}
}
writeLog("总共执行更新数量:" . $allUpNum . "个,耗时:" . (time() - $time));
writeLog("sync-repair end " . PHP_EOL);
处理这种大数据修复的,需要做好可能出现的问题,预估耗时、错误修复、中途中断数据是否可保持完整性等。下面是执行计划:
3. kafka消息队列的写入个性化广告数据
20210916今天,需要接入一个kafkaf的写入数据要求,实现给用户个性化呈现对应广告,然后做一次日志记录写到 kafka... 因这部分接口在老项目里,所以还是用php,引入kafka-php,执行下面composer就可,无需安装rdkafka扩展直接可用
composer require nmred/kafka-php
下面是部分代码逻辑:
/**
* 发送kafka消息队列 (需要php>7.1可用,composer方式)
* @param $data
* @param string $topic
*/
public function sendKafka($data, $topic = 'test')
{
if (empty($this->config['kafka']['host'])) return;
try {
$host = $this->config['kafka']['host'] . ':' . $this->config['kafka']['port'];
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
// 必须设置所有broker地址,否则会报错
$config->setMetadataBrokerList($host);
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
function () use ($data, $topic) {
return [
[
'topic' => $topic,
'value' => json_encode($data, true),
]
];
}
);
$producer->success(function ($result) {
});
$producer->error(function ($errorCode) {
});
$producer->send(true);
} catch (Exception $e) {
};
}
以上代码就可以直接添加线上的消息,如需要增加本地环境自己试试,可参考docker搭建kafka环境
注意上面的需要php>7.1才可用,而有些项目是php5.6就不能用了,那有啥办法可以解决麽,有的,采用安装php扩展php-rdkafka
/**
* 个性化广告曝光量数据写入流量kafka (扩展方式)
*/
public function _sendKafkaTopic($topicData, $topic = 'test')
{
if (empty($this->config['kafka']['host'])) return;
try {
$host = $this->config['kafka']['host'] . ':' . $this->config['kafka']['port'];
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
$objRdKafka = new RdKafka\Producer($conf);
$objRdKafka->addBrokers($host);
$oObjTopic = $objRdKafka->newTopic($topic);
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($topicData, true));
$objRdKafka->flush(1000);
} catch (Exception $e) {
};
}