Mqtt.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Protocols;
  15. /**
  16. * Mqtt Protocol.
  17. *
  18. * @author walkor<walkor@workerman.net>
  19. */
  20. class Mqtt
  21. {
  22. /**
  23. * CONNECT Packet.
  24. */
  25. const CMD_CONNECT = 1;
  26. /**
  27. * CONNACK
  28. */
  29. const CMD_CONNACK = 2;
  30. /**
  31. * PUBLISH
  32. */
  33. const CMD_PUBLISH = 3;
  34. /**
  35. * PUBACK
  36. */
  37. const CMD_PUBACK = 4;
  38. /**
  39. * PUBREC
  40. */
  41. const CMD_PUBREC = 5;
  42. /**
  43. * PUBREL
  44. */
  45. const CMD_PUBREL = 6;
  46. /**
  47. * PUBCOMP
  48. */
  49. const CMD_PUBCOMP = 7;
  50. /**
  51. * SUBSCRIBE
  52. */
  53. const CMD_SUBSCRIBE = 8;
  54. /**
  55. * SUBACK
  56. */
  57. const CMD_SUBACK = 9;
  58. /**
  59. * UNSUBSCRIBE
  60. */
  61. const CMD_UNSUBSCRIBE = 10;
  62. /**
  63. * UNSUBACK
  64. */
  65. const CMD_UNSUBACK = 11;
  66. /**
  67. * PINGREQ
  68. */
  69. const CMD_PINGREQ = 12;
  70. /**
  71. * PINGRESP
  72. */
  73. const CMD_PINGRESP = 13;
  74. /**
  75. * DISCONNECT
  76. */
  77. const CMD_DISCONNECT = 14;
  78. /**
  79. * Check the integrity of the package.
  80. *
  81. * @param string $buffer
  82. * @return int
  83. */
  84. public static function input($buffer)
  85. {
  86. $length = strlen($buffer);
  87. $body_length = static::getBodyLength($buffer, $head_bytes);
  88. $total_length = $body_length + $head_bytes;
  89. if ($length < $total_length) {
  90. return 0;
  91. }
  92. return $total_length;
  93. }
  94. /**
  95. * Encode.
  96. *
  97. * @param string $buffer
  98. * @return string
  99. */
  100. public static function encode($data)
  101. {
  102. $cmd = $data['cmd'];
  103. switch ($data['cmd']) {
  104. // ['cmd'=>1, 'clean_session'=>x, 'will'=>['qos'=>x, 'retain'=>x, 'topic'=>x, 'content'=>x],'username'=>x, 'password'=>x, 'keepalive'=>x, 'protocol_name'=>x, 'protocol_level'=>x, 'client_id' => x]
  105. case static::CMD_CONNECT;
  106. $body = self::packString($data['protocol_name']).chr($data['protocol_level']);
  107. $connect_flags = 0;
  108. if (!empty($data['clean_session'])) {
  109. $connect_flags |= 1 << 1;
  110. }
  111. if (!empty($data['will'])) {
  112. $connect_flags |= 1 << 2;
  113. $connect_flags |= $data['will']['qos'] << 3;
  114. if ($data['will']['retain']) {
  115. $connect_flags |= 1 << 5;
  116. }
  117. }
  118. if (!empty($data['password'])) {
  119. $connect_flags |= 1 << 6;
  120. }
  121. if (!empty($data['username'])) {
  122. $connect_flags |= 1 << 7;
  123. }
  124. $body .= chr($connect_flags);
  125. $keepalive = !empty($data['keepalive']) && (int)$data['keepalive'] >= 0 ? (int)$data['keepalive'] : 0;
  126. $body .= pack('n', $keepalive);
  127. $body .= static::packString($data['client_id']);
  128. if (!empty($data['will'])) {
  129. $body .= static::packString($data['will']['topic']);
  130. $body .= static::packString($data['will']['content']);
  131. }
  132. if(!empty($data['username']) || $data['username'] === '0') {
  133. $body .= static::packString($data['username']);
  134. }
  135. if(!empty($data['password']) || $data['password'] === '0') {
  136. $body .= static::packString($data['password']);
  137. }
  138. $head = self::packHead($cmd, strlen($body));
  139. return $head.$body;
  140. //['cmd'=>2, 'session_present'=>0/1, 'code'=>x]
  141. case static::CMD_CONNACK:
  142. $body = !empty($data['session_present']) ? chr(1) : chr(0);
  143. $code = !empty($data['code']) ? $data['code'] : 0;
  144. $body .= chr($code);
  145. $head = static::packHead($cmd, strlen($body));
  146. return $head.$body;
  147. // ['cmd'=>3, 'message_id'=>x, 'topic'=>x, 'content'=>x, 'qos'=>0/1/2, 'dup'=>0/1, 'retain'=>0/1]
  148. case static::CMD_PUBLISH:
  149. $body = static::packString($data['topic']);
  150. $qos = isset($data['qos']) ? $data['qos'] : 0;
  151. if ($qos) {
  152. $body .= pack('n', $data['message_id']);
  153. }
  154. $body .= $data['content'];
  155. $dup = isset($data['dup']) ? $data['dup'] : 0;
  156. $retain = isset($data['retain']) ? $data['retain'] : 0;
  157. $head = static::packHead($cmd, strlen($body), $dup, $qos, $retain);
  158. return $head.$body;
  159. // ['cmd'=>x, 'message_id'=>x]
  160. case static::CMD_PUBACK:
  161. case static::CMD_PUBREC:
  162. case static::CMD_PUBREL:
  163. case static::CMD_PUBCOMP:
  164. $body = pack('n', $data['message_id']);
  165. if ($cmd === static::CMD_PUBREL) {
  166. $head = static::packHead($cmd, strlen($body), 0, 1);
  167. } else {
  168. $head = static::packHead($cmd, strlen($body));
  169. }
  170. return $head.$body;
  171. // ['cmd'=>8, 'message_id'=>x, 'topics'=>[topic=>qos, ..]]]
  172. case static::CMD_SUBSCRIBE:
  173. $id = $data['message_id'];
  174. $body = pack('n', $id);
  175. foreach($data['topics'] as $topic => $qos){
  176. $body .= self::packString($topic);
  177. $body .= chr($qos);
  178. }
  179. $head = static::packHead($cmd, strlen($body), 0, 1);
  180. return $head.$body;
  181. // ['cmd'=>9, 'message_id'=>x, 'codes'=>[x,x,..]]
  182. case static::CMD_SUBACK:
  183. $codes = $data['codes'];
  184. $body = pack('n', $data['message_id']).call_user_func_array('pack', array_merge(array('C*'), $codes));
  185. $head = static::packHead($cmd, strlen($body));
  186. return $head.$body;
  187. // ['cmd' => 10, 'message_id' => $message_id, 'topics' => $topics];
  188. case static::CMD_UNSUBSCRIBE:
  189. $body = pack('n', $data['message_id']);
  190. foreach ($data['topics'] as $topic) {
  191. $body .= static::packString($topic);
  192. }
  193. $head = static::packHead($cmd, strlen($body), 0, 1);
  194. return $head . $body;
  195. // ['cmd'=>11, 'message_id'=>x]
  196. case static::CMD_UNSUBACK:
  197. $body = pack('n', $data['message_id']);
  198. $head = static::packHead($cmd, strlen($body));
  199. return $head.$body;
  200. // ['cmd'=>x]
  201. case static::CMD_PINGREQ;
  202. case static::CMD_PINGRESP:
  203. case static::CMD_DISCONNECT:
  204. return static::packHead($cmd, 0);
  205. }
  206. }
  207. /**
  208. * Decode.
  209. *
  210. * @param string $buffer
  211. * @return string
  212. */
  213. public static function decode($buffer)
  214. {
  215. $cmd = static::getCmd($buffer);
  216. $body = static::getBody($buffer);
  217. switch ($cmd) {
  218. case static::CMD_CONNECT:
  219. $protocol_name = static::readString($body);
  220. $protocol_level = ord($body[0]);
  221. $clean_session = ord($body[1]) >> 1 & 0x1;
  222. $will_flag = ord($body[1]) >> 2 & 0x1;
  223. $will_qos = ord($body[1]) >> 3 & 0x3;
  224. $will_retain = ord($body[1]) >> 5 & 0x1;
  225. $password_flag = ord($body[1]) >> 6 & 0x1;
  226. $username_flag = ord($body[1]) >> 7 & 0x1;
  227. $body = substr($body, 2);
  228. $tmp = unpack('n', $body, $body);
  229. $keepalive = $tmp[1];
  230. $body = substr($body, 2);
  231. $client_id = static::readString($body);
  232. if ($will_flag) {
  233. $will_topic = static::readString($body);
  234. $will_content = static::readString($body);
  235. }
  236. $username = $password = '';
  237. if ($username_flag) {
  238. $username = static::readString($body);
  239. }
  240. if ($password_flag) {
  241. $password = static::readString($body);
  242. }
  243. // ['cmd'=>1, 'clean_session'=>x, 'will'=>['qos'=>x, 'retain'=>x, 'topic'=>x, 'content'=>x],'username'=>x, 'password'=>x, 'keepalive'=>x, 'protocol_name'=>x, 'protocol_level'=>x, 'client_id' => x]
  244. $package = array(
  245. 'cmd' => $cmd,
  246. 'protocol_name' => $protocol_name,
  247. 'protocol_level' => $protocol_level,
  248. 'clean_session' => $clean_session,
  249. 'will' => array(),
  250. 'username' => $username,
  251. 'password' => $password,
  252. 'keepalive' => $keepalive,
  253. 'client_id' => $client_id,
  254. );
  255. if ($will_flag) {
  256. $package['will'] = array(
  257. 'qos' => $will_qos,
  258. 'retain' => $will_retain,
  259. 'topic' => $will_topic,
  260. 'content' => $will_content
  261. );
  262. } else {
  263. unset($package['will']);
  264. }
  265. return $package;
  266. case static::CMD_CONNACK:
  267. $session_present = ord($body[0]) & 0x01;
  268. $code = ord($body[1]);
  269. return array('cmd' => $cmd, 'session_present' => $session_present, 'code' => $code);
  270. case static::CMD_PUBLISH:
  271. $dup = ord($buffer[0]) >> 3 & 0x1;
  272. $qos = ord($buffer[0]) >> 1 & 0x3;
  273. $retain = ord($buffer[0]) & 0x1;
  274. $topic = static::readString($body);
  275. if ($qos) {
  276. $message_id = static::readShortInt($body);
  277. }
  278. $package = array('cmd' => $cmd, 'topic' => $topic, 'content' => $body, 'dup' => $dup, 'qos' => $qos, 'retain' => $retain);
  279. if ($qos) {
  280. $package['message_id'] = $message_id;
  281. }
  282. return $package;
  283. case static::CMD_PUBACK:
  284. case static::CMD_PUBREC:
  285. case static::CMD_PUBREL:
  286. case static::CMD_PUBCOMP:
  287. $message_id = static::readShortInt($body);
  288. return array('cmd' => $cmd, 'message_id' => $message_id);
  289. case static::CMD_SUBSCRIBE:
  290. $message_id = static::readShortInt($body);
  291. $topics = array();
  292. while ($body) {
  293. $topic = static::readString($body);
  294. $qos = ord($body[0]);
  295. $topics[$topic] = $qos;
  296. $body = substr($body, 1);
  297. }
  298. return array('cmd' => $cmd, 'message_id' => $message_id, 'topics' => $topics);
  299. case static::CMD_SUBACK:
  300. $message_id = static::readShortInt($body);
  301. $tmp = unpack('C*', $body);
  302. $codes = array_values($tmp);
  303. return array('cmd' => $cmd, 'message_id'=> $message_id, 'codes' => $codes);
  304. case static::CMD_UNSUBSCRIBE:
  305. $message_id = static::readShortInt($body);
  306. $topics = array();
  307. while ($body) {
  308. $topic = static::readString($body);
  309. $topics[] = $topic;
  310. }
  311. return array('cmd' => $cmd, 'message_id' => $message_id, 'topics' => $topics);
  312. case static::CMD_UNSUBACK:
  313. $message_id = static::readShortInt($body);
  314. return array('cmd' => $cmd, 'message_id' => $message_id);
  315. case static::CMD_PINGREQ:
  316. case static::CMD_PINGRESP:
  317. case static::CMD_DISCONNECT:
  318. return array('cmd' => $cmd);
  319. }
  320. return $buffer;
  321. }
  322. /**
  323. * Pack string.
  324. *
  325. * @param $str
  326. * @return string
  327. */
  328. public static function packString($str){
  329. $len = strlen($str);
  330. return pack('n', $len).$str;
  331. }
  332. /**
  333. * Write body length.
  334. *
  335. * @param $len
  336. * @return string
  337. */
  338. protected static function writeBodyLength($len)
  339. {
  340. $string = "";
  341. do{
  342. $digit = $len % 128;
  343. $len = $len >> 7;
  344. // if there are more digits to encode, set the top bit of this digit
  345. if ( $len > 0 )
  346. $digit = ($digit | 0x80);
  347. $string .= chr($digit);
  348. }while ( $len > 0 );
  349. return $string;
  350. }
  351. /**
  352. * Get cmd.
  353. *
  354. * @param $buffer
  355. * @return int
  356. */
  357. public static function getCmd($buffer)
  358. {
  359. return ord($buffer[0]) >> 4;
  360. }
  361. /**
  362. * Get body length.
  363. *
  364. * @param $buffer
  365. * @param $head_bytes
  366. * @return int
  367. */
  368. public static function getBodyLength($buffer, &$head_bytes){
  369. $head_bytes = $multiplier = 1;
  370. $value = 0;
  371. do{
  372. if (!isset($buffer[$head_bytes])) {
  373. $head_bytes = 0;
  374. return 0;
  375. }
  376. $digit = ord($buffer[$head_bytes]);
  377. $value += ($digit & 127) * $multiplier;
  378. $multiplier *= 128;
  379. $head_bytes++;
  380. }while (($digit & 128) != 0);
  381. return $value;
  382. }
  383. /**
  384. * Get body.
  385. *
  386. * @param $buffer
  387. * @return string
  388. */
  389. public static function getBody($buffer)
  390. {
  391. $body_length = static::getBodyLength($buffer, $head_bytes);
  392. $buffer = substr($buffer, $head_bytes, $body_length);
  393. return $buffer;
  394. }
  395. /**
  396. * Read string from buffer.
  397. * @param $buffer
  398. * @return string
  399. */
  400. public static function readString(&$buffer) {
  401. $tmp = unpack('n', $buffer);
  402. $length = $tmp[1];
  403. if ($length + 2 > strlen($buffer)) {
  404. echo "buffer:".bin2hex($buffer)." lenth:$length not enough for unpackString\n";
  405. }
  406. $string = substr($buffer, 2, $length);
  407. $buffer = substr($buffer, $length + 2);
  408. return $string;
  409. }
  410. /**
  411. * Read unsigned short int from buffer.
  412. * @param $buffer
  413. * @return mixed
  414. */
  415. public static function readShortInt(&$buffer) {
  416. $tmp = unpack('n', $buffer);
  417. $buffer = substr($buffer, 2);
  418. return $tmp[1];
  419. }
  420. /**
  421. * packHead.
  422. * @param $cmd
  423. * @param $body_length
  424. * @param int $dup
  425. * @param int $qos
  426. * @param int $retain
  427. * @return string
  428. */
  429. public static function packHead($cmd, $body_length, $dup = 0, $qos = 0, $retain = 0)
  430. {
  431. $cmd = $cmd << 4;
  432. if ($dup) {
  433. $cmd |= 1 << 3;
  434. }
  435. if ($qos) {
  436. $cmd |= $qos << 1;
  437. }
  438. if ($retain) {
  439. $cmd |= 1;
  440. }
  441. return chr($cmd).static::writeBodyLength($body_length);
  442. }
  443. }