> 1) ^ 0xA001); } else { $crc = $crc >> 1; } } } $crc = strval(base_convert($crc, 10, 16)); $crc = substr('0000'.$crc, -4); return substr($crc, -2).substr($crc, 0,2); } function lampMqttCmd($sendTopic,$backTopic,$sendData){ // 订阅信息,接收一个信息后退出 $server = 'streetserver.weclouds.xyz'; $port = 1883; $username = 'weclouds'; $password = 'weclouds@1234'; $client_id = time().rand(100000,999999); // 设置你的连接客户端id $mqtt = new Mqtt($server, $port, $client_id); if($mqtt->connect(true, NULL, $username, $password)) { //链接不成功再重复执行监听连接 $mqtt->publish($sendTopic, $sendData, 2); // $mqtt->publish('/WEGW/ReadIn/865860046894466', pack('H*','010AA00B'), 2); }else{ return ''; } $topics[$backTopic] = array("qos" => 2,"function" => "procmsg"); // $topics['/WEGW/ReadOut/865860046894466'] = array("qos" => 2, "function" => "procmsg"); // 订阅主题 $mqtt->subscribe($topics, 0); $true = 1; $t = time(); while($true == 1){ $true = $mqtt->proc(); if (time() - $t >= 10) { return ''; } } $mqtt->close(); return $true; } function send_cmd($cmd,$type = 0, $timeout = 30, $ms_timeout = 200) { $ip = '123.57.20.89'; $port = '6800'; $client = @stream_socket_client("tcp://{$ip}:{$port}", $errno, $errstr, 30); if ($client) { fwrite($client, $cmd); stream_set_timeout($client, $timeout, $ms_timeout); $result = fread($client, 8196); fclose($client); return $result; } else { sleep(1); send_cmd($cmd,$type, $timeout, $ms_timeout); } } $task->count = 1; $task->onWorkerStart = function($task) { global $websocketDB; global $mqttCmdList; global $beginDate; $beginDate = date('H:i',time()); $mqttCmdList = array(); $config = json_decode(file_get_contents('./db_config.php'),true); $websocketDB = new \Workerman\MySQL\Connection($config['hostname'], '3306', $config['username'], $config['password'], $config['dbname']); Timer::add(1, function() { global $websocketDB; global $mqttCmdList; global $beginDate; if ($beginDate != date('H:i',time())) { $beginDate = date('H:i',time()); $sql = 'SELECT P.id,T.value FROM project AS P left join lampinfo as L on L.projectid = P.id left join timezone AS T on P.timezone = T.id where L.policyid != 0'; $list = $websocketDB->query($sql); foreach ($list as $value) { $nowTime = time() - 8 * 3600 + $value['value'] * 3600; $nowDate = date('Y-m-d',$nowTime); $nowH = date('H:i',$nowTime); $sql = 'SELECT L.id as lampid,PI.*,N.protocoltype,L.protocoltype as lampprotocoltype,L.address FROM lampinfo AS L join (SELECT * from policy_cmd where starttime <= "'.$nowDate.'") as PI on L.policyid = PI.policyid join network as N on N.id = L.networkid where PI.policyid != 0 AND N.status = 1'; $policyList = $websocketDB->query($sql); foreach ($policyList as $policy) { $index = 1; while ($index <= 10) { if (empty($policy['time'.$index])) { break; } if ($policy['time'.$index] == $nowH) { $data = array(); $data['cmdtype'] = 0; $data['relateid'] = $policy['lampid']; $data['statuscmd'] = $policy['value'.$index]; $data['updatetime'] = date("Y-m-d H:i:s"); if ($policy['protocoltype'] == 4 && $policy['lampprotocoltype'] == 1) { $mqttCmdList[$policy['lampid']] = array('address'=>$policy['address'],'lampid'=>$policy['lampid'],'status'=>$data['statuscmd'],'cmdCount'=>0); }else{ $sql = 'SELECT id from batch_switch_cmd where relateid = '.$policy['lampid'].' AND cmdtype = 0'; $cmdList = $websocketDB->query($sql); if (empty($cmdList) || empty($cmdList[0]['id'])) { $cmdid = $websocketDB->insert('batch_switch_cmd')->cols($data)->query(); }else{ $websocketDB->update('batch_switch_cmd')->cols($data)->where('id='.$cmdList[0]['id']); $cmdid = $cmdList[0]['id']; } $cmd = '{"cmd_type":"batch_switch_cmd","cmd_id":'.$cmdid.',"broadcast":0}'; send_cmd($cmd,1,0); } break; } $index ++; } } } } }); // mqtt 设备异步发送指令 Timer::add(30, function() { global $websocketDB; global $mqttCmdList; $temp = $mqttCmdList; foreach ($temp as $key => $value) { if ($value['cmdCount'] >= 3) { unset($mqttCmdList[$key]); continue; } if ($value['status'] == 1) { // 开灯 $lightness = 100; }else{ // 关灯 $lightness = 0; } $sendData = '0110df0a000204'; $lightness = empty($lightness) ? '0000' : '0000'.base_convert($lightness, 10, 16); $sendData .= mb_strlen($lightness) == 4 ? $lightness : substr($lightness, -4); $sendData .= mb_substr('0000'.base_convert(600, 10, 16), -4); $sendData .= crc16(pack('H*',$sendData)); $res = lampMqttCmd('/WE/TransIn/'.$policy['address'],'/WE/TransOut/'.$policy['address'],pack('H*','0001'.$sendData)); if (empty($res)) { $mqttCmdList[$key]['cmdCount'] += 1; }else{ if (strtolower(crc16(pack('H*',substr($res['msg'], 0,-4)))) == strtolower(substr($res['msg'], -4))) { if ($value['status'] == 1) { $websocketDB->update('lampinfo')->cols(array('status'=>1,'lighteness'=>100))->where('id='.$value['lampid']); }else{ $websocketDB->update('lampinfo')->cols(array('status'=>0,'lighteness'=>0))->where('id='.$value['lampid']); } unset($mqttCmdList[$key]); }else{ $mqttCmdList[$key]['cmdCount'] += 1; } } } }); }; // 运行worker Worker::runAll();