policyCmd.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. <?php
  2. use \Workerman\Worker;
  3. use \Workerman\Lib\Timer;
  4. require_once __DIR__ . '/Autoloader.php';
  5. require_once './Lib/mysql/src/Connection.php';
  6. require_once './Mqtt.php';
  7. require_once './config.php';
  8. $task = new Worker();
  9. function crc16($string) {
  10. $crc = 0xFFFF;
  11. for ($x = 0; $x < strlen ($string); $x++) {
  12. $crc = $crc ^ ord($string[$x]);
  13. for ($y = 0; $y < 8; $y++) {
  14. if (($crc & 0x0001) == 0x0001) {
  15. $crc = (($crc >> 1) ^ 0xA001);
  16. } else { $crc = $crc >> 1; }
  17. }
  18. }
  19. $crc = strval(base_convert($crc, 10, 16));
  20. $crc = substr('0000'.$crc, -4);
  21. return substr($crc, -2).substr($crc, 0,2);
  22. }
  23. function lampMqttCmd($sendTopic,$backTopic,$sendData){
  24. // 订阅信息,接收一个信息后退出
  25. $server = 'streetserver.weclouds.xyz';
  26. $port = 1883;
  27. $username = 'weclouds';
  28. $password = 'weclouds@1234';
  29. $client_id = time().rand(100000,999999); // 设置你的连接客户端id
  30. $mqtt = new Mqtt($server, $port, $client_id);
  31. if($mqtt->connect(true, NULL, $username, $password)) { //链接不成功再重复执行监听连接
  32. $mqtt->publish($sendTopic, $sendData, 2);
  33. // $mqtt->publish('/WEGW/ReadIn/865860046894466', pack('H*','010AA00B'), 2);
  34. }else{
  35. return '';
  36. }
  37. $topics[$backTopic] = array("qos" => 2,"function" => "procmsg");
  38. // $topics['/WEGW/ReadOut/865860046894466'] = array("qos" => 2, "function" => "procmsg");
  39. // 订阅主题
  40. $mqtt->subscribe($topics, 0);
  41. $true = 1;
  42. $t = time();
  43. while($true == 1){
  44. $true = $mqtt->proc();
  45. if (time() - $t >= 10) {
  46. return '';
  47. }
  48. }
  49. $mqtt->close();
  50. return $true;
  51. }
  52. function send_cmd($cmd,$type = 0, $timeout = 30, $ms_timeout = 200) {
  53. $ip = '123.57.20.89';
  54. $port = '6800';
  55. $client = @stream_socket_client("tcp://{$ip}:{$port}", $errno, $errstr, 30);
  56. if ($client) {
  57. fwrite($client, $cmd);
  58. stream_set_timeout($client, $timeout, $ms_timeout);
  59. $result = fread($client, 8196);
  60. fclose($client);
  61. return $result;
  62. } else {
  63. sleep(1);
  64. send_cmd($cmd,$type, $timeout, $ms_timeout);
  65. }
  66. }
  67. $task->count = 1;
  68. $task->onWorkerStart = function($task)
  69. {
  70. global $websocketDB;
  71. global $mqttCmdList;
  72. global $beginDate;
  73. $beginDate = date('H:i',time());
  74. $mqttCmdList = array();
  75. $config = json_decode(file_get_contents('./db_config.php'),true);
  76. $websocketDB = new \Workerman\MySQL\Connection($config['hostname'], '3306', $config['username'], $config['password'], $config['dbname']);
  77. Timer::add(1, function()
  78. {
  79. global $websocketDB;
  80. global $mqttCmdList;
  81. global $beginDate;
  82. if ($beginDate != date('H:i',time())) {
  83. $beginDate = date('H:i',time());
  84. $sql = 'SELECT P.id,T.value FROM project AS P left join lampinfo as L on L.projectid = P.id left join timezone AS T on P.timezone = T.id where L.policyid != 0';
  85. $list = $websocketDB->query($sql);
  86. foreach ($list as $value) {
  87. $nowTime = time() - 8 * 3600 + $value['value'] * 3600;
  88. $nowDate = date('Y-m-d',$nowTime);
  89. $nowH = date('H:i',$nowTime);
  90. $sql = 'SELECT L.id as lampid,PI.*,N.protocoltype,L.protocoltype as lampprotocoltype,L.address FROM lampinfo AS L join (SELECT * from policy_cmd where starttime <= "'.$nowDate.'") as PI on L.policyid = PI.policyid join network as N on N.id = L.networkid where PI.policyid != 0 AND N.status = 1';
  91. $policyList = $websocketDB->query($sql);
  92. foreach ($policyList as $policy) {
  93. $index = 1;
  94. while ($index <= 10) {
  95. if (empty($policy['time'.$index])) {
  96. break;
  97. }
  98. if ($policy['time'.$index] == $nowH) {
  99. $data = array();
  100. $data['cmdtype'] = 0;
  101. $data['relateid'] = $policy['lampid'];
  102. $data['statuscmd'] = $policy['value'.$index];
  103. $data['updatetime'] = date("Y-m-d H:i:s");
  104. if ($policy['protocoltype'] == 4 && $policy['lampprotocoltype'] == 1) {
  105. $mqttCmdList[$policy['lampid']] = array('address'=>$policy['address'],'lampid'=>$policy['lampid'],'status'=>$data['statuscmd'],'cmdCount'=>0);
  106. }else{
  107. $sql = 'SELECT id from batch_switch_cmd where relateid = '.$policy['lampid'].' AND cmdtype = 0';
  108. $cmdList = $websocketDB->query($sql);
  109. if (empty($cmdList) || empty($cmdList[0]['id'])) {
  110. $cmdid = $websocketDB->insert('batch_switch_cmd')->cols($data)->query();
  111. }else{
  112. $websocketDB->update('batch_switch_cmd')->cols($data)->where('id='.$cmdList[0]['id']);
  113. $cmdid = $cmdList[0]['id'];
  114. }
  115. $cmd = '{"cmd_type":"batch_switch_cmd","cmd_id":'.$cmdid.',"broadcast":0}';
  116. send_cmd($cmd,1,0);
  117. }
  118. break;
  119. }
  120. $index ++;
  121. }
  122. }
  123. }
  124. }
  125. });
  126. // mqtt 设备异步发送指令
  127. Timer::add(30, function()
  128. {
  129. global $websocketDB;
  130. global $mqttCmdList;
  131. $temp = $mqttCmdList;
  132. foreach ($temp as $key => $value) {
  133. if ($value['cmdCount'] >= 3) {
  134. unset($mqttCmdList[$key]);
  135. continue;
  136. }
  137. if ($value['status'] == 1) { // 开灯
  138. $lightness = 100;
  139. }else{ // 关灯
  140. $lightness = 0;
  141. }
  142. $sendData = '0110df0a000204';
  143. $lightness = empty($lightness) ? '0000' : '0000'.base_convert($lightness, 10, 16);
  144. $sendData .= mb_strlen($lightness) == 4 ? $lightness : substr($lightness, -4);
  145. $sendData .= mb_substr('0000'.base_convert(600, 10, 16), -4);
  146. $sendData .= crc16(pack('H*',$sendData));
  147. $res = lampMqttCmd('/WE/TransIn/'.$policy['address'],'/WE/TransOut/'.$policy['address'],pack('H*','0001'.$sendData));
  148. if (empty($res)) {
  149. $mqttCmdList[$key]['cmdCount'] += 1;
  150. }else{
  151. if (strtolower(crc16(pack('H*',substr($res['msg'], 0,-4)))) == strtolower(substr($res['msg'], -4))) {
  152. if ($value['status'] == 1) {
  153. $websocketDB->update('lampinfo')->cols(array('status'=>1,'lighteness'=>100))->where('id='.$value['lampid']);
  154. }else{
  155. $websocketDB->update('lampinfo')->cols(array('status'=>0,'lighteness'=>0))->where('id='.$value['lampid']);
  156. }
  157. unset($mqttCmdList[$key]);
  158. }else{
  159. $mqttCmdList[$key]['cmdCount'] += 1;
  160. }
  161. }
  162. }
  163. });
  164. };
  165. // 运行worker
  166. Worker::runAll();