1. 前言

有时完成一个比较好的案例,但是事后没有去记载,久了之后就有点忘记了,感觉记录下来,分享下当时的做法也是很不错的。

2. 百万数据处理

这是昨天20210914号发现的异常,统计redis订阅数据的脚本,在预发也跑了一次,导致数据翻倍,先需要对受影响的插座电量数据做修复。
因相关代码环境没有迁移到go上,这次修复采用php脚本执行,下面是部分代码,公司相关逻辑已删除 sysnc-repair文件

// 处理脚本传入参数
$params = array();
foreach ($argv as $val) {
    if (strpos($val, '--') !== 0 && false === strpos($val, '=')) {
        continue;
    }
    list($k, $v) = explode('=', $val);
    $k = str_replace('--', '', $k);
    $params[$k] = $v;
}

if (!($params && isset($params['batch']) && in_array($params['batch'], ['a', 'b']))) {
    writelog('params is error');
    return;
}

writelog('执行第' . $params['batch'] . '批次');

if (date("Y-m-d H:i:s") > '2021-09-16 18:00:00') {
    writelog('time is out');
    return;
}

// 执行时间段,a时间段评测执行可行性、突发性和耗时等
$date = [
    'a' => ['2021-09-09 12:00:00', '2021-09-09 13:00:00'],
    'b' => ['2021-09-09 13:00:00', '2021-09-14 19:00:00']
];

$startTime = $date[$params['batch']][0];
$endTime   = $date[$params['batch']][1];

// 因数据量过大(超百万) 分页获取数据
$num = $sunDb->all("根据时间算出电量表总数");
if (!$num || !$num['num']) {
    writelog('num is empty');
    return;
}

$allUpNum = 0;
$time     = time();

writelog('总数量: ' . $num['num']);

$count   = 2000;
$pageAll = intval($num['num'] / $count) + 1;
$page    = 0;
// 分页获取
for ($y = 0; $y < $pageAll; $y++) {
    writelog('页数: ' . ($y + 1));

    $offset = $y * $count;
    $sql    = "分页得到电量表总数 WHERE startTime >= '{$startTime}' and endTime < '{$endTime}'  limit {$offset},{$count}";
    // 查询在这个时间段的数据
    $data = $sunDb->all($sql);
...
    // 获取只是受影响的sn
    $c2sn = $db->all("根据电量表中sn做条件查询是否是受影响的产品,得受影响的sn");
...
    // 需要更新的数据
    $update = [];
    foreach ($data as $val) {
        if (in_array($val['sn'], $c2sns)) $update[] = $val;
    }

    writelog('本页需要更新的数量:' . count($update));

    // 批量更新 每100个更一次
    $sql = '';
    $sq  = "UPDATE dianliangbiao SET powerConsume = CASE id ";
    $i   = 0;
    $id  = [];
    $ids = '';
    foreach ($update as $v) {
        $allUpNum++;
        $i++;
        $id[] = $v['ID'];
        $sql  .= sprintf("WHEN %d THEN %d ", $v['ID'], $v['powerConsume'] / 2);
        // 100条执行一次
        if ($i == 100) {
            $ids = implode(',', $id);
            $sql .= "END WHERE id IN ($ids)";
            $sunDb->query($sq . $sql);

            writelog('sql: ' . $sq . $sql);
            writelog($ids, 'sync-repair-update-');

            // 清0
            $sql = '';
            $id  = [];
            $i   = 0;
        }
    }

    // 执行后面的
    if ($id) {
        $ids = implode(',', $id);
        $sql .= "END WHERE id IN ($ids)";
        $sunDb->query($sq . $sql);

        writelog('last sql: ' . $sq . $sql);
        writelog($ids, 'sync-sunlogin-repair-c2-data-update-');
    }
}

writeLog("总共执行更新数量:" . $allUpNum . "个,耗时:" . (time() - $time));
writeLog("sync-repair end " . PHP_EOL);

处理这种大数据修复的,需要做好可能出现的问题,预估耗时、错误修复、中途中断数据是否可保持完整性等。下面是执行计划:

执行计划
1、先取2021-09-09 12:00:00 - 2021-09-09 13:00:00段数据,2万条数据做测试,预估执行时间和可能出现的情况
2、 第一步执行完毕后,分析无问题后执行 2021-09-09 13:00:00 - 2021-09-14 19:00:00时间段
操作:
执行第一步:
php7 sync-repair.php batch=a
执行第二步: php7 sync-repair.php batch=b

3. kafka消息队列的写入个性化广告数据

20210916今天,需要接入一个kafkaf的写入数据要求,实现给用户个性化呈现对应广告,然后做一次日志记录写到 kafka... 因这部分接口在老项目里,所以还是用php,引入kafka-php,执行下面composer就可,无需安装rdkafka扩展直接可用

composer require nmred/kafka-php

下面是部分代码逻辑:

 /**
  * 发送kafka消息队列 (需要php>7.1可用,composer方式)
  * @param $data
  * @param string $topic
  */
 public function sendKafka($data, $topic = 'test')
 {
     if (empty($this->config['kafka']['host'])) return;

     try {
         $host = $this->config['kafka']['host'] . ':' . $this->config['kafka']['port'];

         $config = \Kafka\ProducerConfig::getInstance();
         $config->setMetadataRefreshIntervalMs(10000);
         // 必须设置所有broker地址,否则会报错
         $config->setMetadataBrokerList($host);

         $config->setBrokerVersion('1.0.0');
         $config->setRequiredAck(1);
         $config->setIsAsyn(false);
         $config->setProduceInterval(500);

         $producer = new \Kafka\Producer(
             function () use ($data, $topic) {
                 return [
                     [
                         'topic' => $topic,
                         'value' => json_encode($data, true),
                     ]
                 ];
             }
         );

         $producer->success(function ($result) {

         });
         $producer->error(function ($errorCode) {

         });

         $producer->send(true);

     } catch (Exception $e) {

     };
 }

以上代码就可以直接添加线上的消息,如需要增加本地环境自己试试,可参考docker搭建kafka环境

注意上面的需要php>7.1才可用,而有些项目是php5.6就不能用了,那有啥办法可以解决麽,有的,采用安装php扩展php-rdkafka

/**
 * 个性化广告曝光量数据写入流量kafka (扩展方式)
 */
public function _sendKafkaTopic($topicData, $topic = 'test')
{
    if (empty($this->config['kafka']['host'])) return;

    try {
        $host = $this->config['kafka']['host'] . ':' . $this->config['kafka']['port'];

        $conf = new RdKafka\Conf();
        $conf->set('log_level', LOG_DEBUG);
        $objRdKafka = new RdKafka\Producer($conf);
        $objRdKafka->addBrokers($host);
        $oObjTopic = $objRdKafka->newTopic($topic);
        $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($topicData, true));
        $objRdKafka->flush(1000);
    } catch (Exception $e) {

    };

}
Copyright © yzx该文章修订时间: 2021-09-29 17:16:04

results matching ""

    No results matching ""