Client.php 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Mqtt;
  15. use \Workerman\Connection\AsyncTcpConnection;
  16. use \Workerman\Protocols\Mqtt;
  17. use \Workerman\Lib\Timer;
  18. /**
  19. * Class Client
  20. * @package Workerman\Mqtt
  21. */
  22. class Client
  23. {
  24. /**
  25. * STATE_INITIAL.
  26. */
  27. const STATE_INITIAL = 1;
  28. /**
  29. * STATE_CONNECTING
  30. */
  31. const STATE_CONNECTING = 2;
  32. /**
  33. * STATE_WAITCONACK
  34. */
  35. const STATE_WAITCONACK = 3;
  36. /**
  37. * STATE_ESTABLISHED
  38. */
  39. const STATE_ESTABLISHED = 4;
  40. /**
  41. * STATE_DISCONNECT
  42. */
  43. const STATE_DISCONNECT = 5;
  44. /**
  45. * DEFAULT_CLIENT_ID_PREFIX
  46. */
  47. const DEFAULT_CLIENT_ID_PREFIX = 'workerman-mqtt-client';
  48. /**
  49. * MAX_TOPIC_LENGTH
  50. */
  51. const MAX_TOPIC_LENGTH = 65535;
  52. /**
  53. * @var callable
  54. */
  55. public $onConnect = null;
  56. /**
  57. * @var callable
  58. */
  59. public $onReconnect = null;
  60. /**
  61. * @var callable
  62. */
  63. public $onMessage = null;
  64. /**
  65. * @var callable
  66. */
  67. public $onClose = null;
  68. /**
  69. * @var callable
  70. */
  71. public $onError = null;
  72. /**
  73. * @var int
  74. */
  75. protected $_state = 1;
  76. /**
  77. * @var int
  78. */
  79. protected $_messageId = 1;
  80. /**
  81. * @var string
  82. */
  83. protected $_remoteAddress = '';
  84. /**
  85. * @var AsyncTcpConnection
  86. */
  87. protected $_connection = null;
  88. /**
  89. * @var boolean
  90. */
  91. protected $_firstConnect = true;
  92. /**
  93. * ['topic'=>qos, ...]
  94. * @var array
  95. */
  96. protected $_resubscribeTopics = array();
  97. /**
  98. * @var int
  99. */
  100. protected $_checkConnectionTimeoutTimer = 0;
  101. /**
  102. * @var int
  103. */
  104. protected $_pingTimer = 0;
  105. /**
  106. * @var bool
  107. */
  108. protected $_recvPingResponse = true;
  109. /**
  110. * @var bool
  111. */
  112. protected $_doNotReconnect = false;
  113. /**
  114. * @var array
  115. */
  116. protected $_outgoing = array();
  117. /**
  118. * @var array
  119. */
  120. protected static $_errorCodeStringMap = array(
  121. 1 => 'Connection Refused, unacceptable protocol version',
  122. 2 => 'Connection Refused, identifier rejected',
  123. 3 => 'Connection Refused, Server unavailable',
  124. 4 => 'Connection Refused, bad user name or password',
  125. 5 => 'Connection Refused, not authorized',
  126. 100 => 'Connection closed',
  127. 101 => 'Connection timeout',
  128. 102 => 'Connection fail',
  129. 103 => 'Connection buffer full and close connection',
  130. 140 => 'No connection to broker',
  131. 240 => 'Invalid topic',
  132. 241 => 'Invalid qos',
  133. );
  134. /**
  135. * @var array
  136. */
  137. protected $_options = array(
  138. 'clean_session' => 1, // set to 0 to receive QoS 1 and 2 messages while offline
  139. 'username' => '', // the username required by your broker
  140. 'password' => '', // the password required by your broker
  141. 'keepalive' => 50, // default 50 seconds, set to 0 to disable
  142. 'protocol_name' => 'MQTT', // protocol name MQTT or MQIsdp
  143. 'protocol_level' => 4, // protocol level, MQTT is 4 and MQIsdp is 3
  144. 'reconnect_period' => 1, // reconnect period default 1 second, set to 0 to disable
  145. 'connect_timeout' => 30, // 30 seconds, time to wait before a CONNACK is received
  146. 'resubscribe' => true, // default true, if connection is broken and reconnects, subscribed topics are automatically subscribed again.
  147. 'bindto' => '', // bindto option, used to specify the IP address that PHP will use to access the network
  148. 'ssl' => false, // ssl context, see http://php.net/manual/en/context.ssl.php
  149. 'debug' => false, // debug
  150. );
  151. /**
  152. * Client constructor.
  153. * @param $address
  154. * @param array $options
  155. */
  156. public function __construct($address, $options = array())
  157. {
  158. class_alias('\Workerman\Protocols\Mqtt', '\Workerman\Protocols\Mqtt');
  159. $this->setOptions($options);
  160. $context = array();
  161. if ($this->_options['bindto']) {
  162. $context['socket'] = array('bindto' => $this->_options['bindto']);
  163. }
  164. if ($this->_options['ssl'] && is_array($this->_options['ssl'])) {
  165. $context['ssl'] = $this->_options['ssl'];
  166. }
  167. if (strpos($address, 'mqtts') === 0) {
  168. if (empty($this->_options['ssl'])) {
  169. $this->_options['ssl'] = true;
  170. }
  171. $address = str_replace('mqtts', 'mqtt', $address);
  172. }
  173. $this->_remoteAddress = $address;
  174. $this->_connection = new AsyncTcpConnection($address, $context);
  175. $this->onReconnect = array($this, 'onMqttReconnect');
  176. $this->onMessage = function(){};
  177. if ($this->_options['ssl']) {
  178. $this->_connection->transport = 'ssl';
  179. }
  180. }
  181. /**
  182. * connect
  183. */
  184. public function connect()
  185. {
  186. $this->_doNotReconnect = false;
  187. $this->_connection->onConnect = array($this, 'onConnectionConnect');
  188. $this->_connection->onMessage = array($this, 'onConnectionMessage');
  189. $this->_connection->onError = array($this, 'onConnectionError');
  190. $this->_connection->onClose = array($this, 'onConnectionClose');
  191. $this->_connection->onBufferFull = array($this, 'onConnectionBufferFull');
  192. $this->_state = static::STATE_CONNECTING;
  193. $this->_connection->connect();
  194. $this->setConnectionTimeout($this->_options['connect_timeout']);
  195. if ($this->_options['debug']) {
  196. echo "-> Try to connect to {$this->_remoteAddress}", PHP_EOL;
  197. }
  198. }
  199. /**
  200. * subscribe
  201. *
  202. * @param $topic
  203. * @param array $options
  204. * @param callable $callback
  205. */
  206. public function subscribe($topic, $options = array(), $callback = null)
  207. {
  208. if ($this->checkDisconnecting($callback)) {
  209. return;
  210. }
  211. if (is_array($topic)) {
  212. $topics = $topic;
  213. } else {
  214. $qos = !is_callable($options) && isset($options['qos']) ? $options['qos'] : 0;
  215. $topics = array($topic => $qos);
  216. }
  217. $args = func_get_args();
  218. $callback = end($args);
  219. if (!is_callable($callback)) {
  220. $callback = null;
  221. }
  222. if ($invalid_topic = static::validateTopics($topics)) {
  223. $this->triggerError(240, $callback);
  224. return;
  225. }
  226. if ($this->_options['resubscribe']) {
  227. $this->_resubscribeTopics += $topics;
  228. }
  229. $package = array(
  230. 'cmd' => Mqtt::CMD_SUBSCRIBE,
  231. 'topics' => $topics,
  232. 'message_id' => $this->incrMessageId(),
  233. );
  234. if ($this->_options['debug']) {
  235. echo "-> Send SUBSCRIBE package, topic:".implode(',', array_keys($topics))." message_id:{$package['message_id']}", PHP_EOL;
  236. }
  237. $this->sendPackage($package);
  238. if ($callback) {
  239. $this->_outgoing[$package['message_id']] = function($exception, $codes = array())use($callback, $topics) {
  240. if ($exception) {
  241. call_user_func($callback, $exception, array());
  242. return;
  243. }
  244. $granted = array();
  245. $topics = array_keys($topics);
  246. foreach ($topics as $key => $topic) {
  247. $granted[$topic] = $codes[$key];
  248. }
  249. if ($callback) {
  250. call_user_func($callback, null, $granted);
  251. }
  252. };
  253. }
  254. }
  255. /**
  256. * unsubscribe
  257. *
  258. * @param $topic
  259. */
  260. public function unsubscribe($topic, $callback = null)
  261. {
  262. if ($this->checkDisconnecting($callback)) {
  263. return;
  264. }
  265. $topics = is_array($topic) ? $topic : array($topic);
  266. if ($invalid_topic = static::validateTopics($topics)) {
  267. $this->triggerError(240);
  268. return;
  269. }
  270. foreach ($topics as $topic) {
  271. if (isset($this->_resubscribeTopics[$topic])) {
  272. unset($this->_resubscribeTopics[$topic]);
  273. }
  274. }
  275. $package = array(
  276. 'cmd' => Mqtt::CMD_UNSUBSCRIBE,
  277. 'topics' => $topics,
  278. 'message_id' => $this->incrMessageId(),
  279. );
  280. if ($callback) {
  281. $this->_outgoing[$package['message_id']] = $callback;
  282. }
  283. if ($this->_options['debug']) {
  284. echo "-> Send UNSUBSCRIBE package, topic:".implode(',', $topics)." message_id:{$package['message_id']}", PHP_EOL;
  285. }
  286. $this->sendPackage($package);
  287. }
  288. /**
  289. * publish
  290. *
  291. * @param $topic
  292. * @param $content
  293. * @param array $options
  294. * @param callable $callback
  295. */
  296. public function publish($topic, $content, $options = array(), $callback = null)
  297. {
  298. if ($this->checkDisconnecting($callback)) {
  299. return;
  300. }
  301. static::isValidTopic($topic);
  302. $qos = 0;
  303. $retain = 0;
  304. $dup = 0;
  305. if (isset($options['qos'])) {
  306. $qos = $options['qos'];
  307. if($this->checkInvalidQos($qos, $callback)) {
  308. return;
  309. }
  310. }
  311. if (!empty($options['retain'])) {
  312. $retain = 1;
  313. }
  314. if (!empty($options['dup'])) {
  315. $dup = 1;
  316. }
  317. $package = array(
  318. 'cmd' => Mqtt::CMD_PUBLISH,
  319. 'topic' => $topic,
  320. 'content' => $content,
  321. 'retain' => $retain,
  322. 'qos' => $qos,
  323. 'dup' => $dup,
  324. );
  325. if ($qos) {
  326. $package['message_id'] = $this->incrMessageId();
  327. if ($callback) {
  328. $this->_outgoing[$package['message_id']] = $callback;
  329. }
  330. }
  331. if ($this->_options['debug']) {
  332. $message_id = isset($package['message_id']) ? $package['message_id'] : '';
  333. echo "-> Send PUBLISH package, topic:$topic content:$content retain:$retain qos:$qos dup:$dup message_id:$message_id", PHP_EOL;
  334. }
  335. $this->sendPackage($package);
  336. }
  337. /**
  338. * disconnect
  339. */
  340. public function disconnect()
  341. {
  342. $this->sendPackage(array('cmd' => Mqtt::CMD_DISCONNECT));
  343. if ($this->_options['debug']) {
  344. echo "-> Send DISCONNECT package", PHP_EOL;
  345. }
  346. $this->close();
  347. }
  348. /**
  349. * close
  350. */
  351. public function close()
  352. {
  353. $this->_doNotReconnect = true;
  354. if ($this->_options['debug']) {
  355. echo "-> Connection->close() called", PHP_EOL;
  356. }
  357. $this->_connection->destroy();
  358. }
  359. /**
  360. * reconnect
  361. *
  362. * @param int $after
  363. */
  364. public function reconnect($after = 0)
  365. {
  366. $this->_doNotReconnect = false;
  367. $this->_connection->onConnect = array($this, 'onConnectionConnect');
  368. $this->_connection->onMessage = array($this, 'onConnectionMessage');
  369. $this->_connection->onError = array($this, 'onConnectionError');
  370. $this->_connection->onClose = array($this, 'onConnectionClose');
  371. $this->_connection->reConnect($after);
  372. $this->setConnectionTimeout($this->_options['connect_timeout'] + $after);
  373. if ($this->_options['debug']) {
  374. echo "-- Reconnect after $after seconds", PHP_EOL;
  375. }
  376. }
  377. /**
  378. * onConnectionConnect
  379. */
  380. public function onConnectionConnect()
  381. {
  382. if ($this->_doNotReconnect) {
  383. $this->close();
  384. return;
  385. }
  386. //['cmd'=>1, 'clean_session'=>x, 'will'=>['qos'=>x, 'retain'=>x, 'topic'=>x, 'content'=>x],'username'=>x, 'password'=>x, 'keepalive'=>x, 'protocol_name'=>x, 'protocol_level'=>x, 'client_id' => x]
  387. $package = array(
  388. 'cmd' => Mqtt::CMD_CONNECT,
  389. 'clean_session' => $this->_options['clean_session'],
  390. 'username' => $this->_options['username'],
  391. 'password' => $this->_options['password'],
  392. 'keepalive' => $this->_options['keepalive'],
  393. 'protocol_name' => $this->_options['protocol_name'],
  394. 'protocol_level' => $this->_options['protocol_level'],
  395. 'client_id' => $this->_options['client_id'],
  396. );
  397. if (isset($this->_options['will'])) {
  398. $package['will'] = $this->_options['will'];
  399. }
  400. $this->_state = static::STATE_WAITCONACK;
  401. $this->_connection->send($package);
  402. if ($this->_options['debug']) {
  403. echo "-- Tcp connection established", PHP_EOL;
  404. echo "-> Send CONNECT package client_id:{$this->_options['client_id']} username:{$this->_options['username']} password:{$this->_options['password']} clean_session:{$this->_options['clean_session']} protocol_name:{$this->_options['protocol_name']} protocol_level:{$this->_options['protocol_level']}", PHP_EOL;
  405. }
  406. }
  407. /**
  408. * onMqttReconnect
  409. */
  410. public function onMqttReconnect()
  411. {
  412. if ($this->_options['clean_session'] && $this->_options['resubscribe'] && $this->_resubscribeTopics) {
  413. $package = array(
  414. 'cmd' => Mqtt::CMD_SUBSCRIBE,
  415. 'topics' => $this->_resubscribeTopics,
  416. 'message_id' => $this->incrMessageId(),
  417. );
  418. $this->sendPackage($package);
  419. if ($this->_options['debug']) {
  420. echo "-> Send SUBSCRIBE(Resubscribe) package topics:" .
  421. implode(',', array_keys($this->_resubscribeTopics))." message_id:{$package['message_id']}", PHP_EOL;
  422. }
  423. }
  424. }
  425. /**
  426. * onConnectionMessage
  427. *
  428. * @param $connection
  429. * @param $data
  430. */
  431. public function onConnectionMessage($connection, $data)
  432. {
  433. $cmd = $data['cmd'];
  434. switch ($cmd) {
  435. case Mqtt::CMD_CONNACK:
  436. $code = $data['code'];
  437. if ($code != 0) {
  438. $message = static::$_errorCodeStringMap[$code];
  439. if ($this->_options['debug']) {
  440. echo "<- Recv CONNACK package but get error " . $message . PHP_EOL;
  441. }
  442. $this->triggerError($code);
  443. $this->_connection->destroy();
  444. return;
  445. }
  446. if ($this->_options['debug']) {
  447. echo "<- Recv CONNACK package, MQTT connect success", PHP_EOL;
  448. }
  449. $this->_state = static::STATE_ESTABLISHED;
  450. if ($this->_firstConnect) {
  451. if ($this->onConnect) {
  452. call_user_func($this->onConnect, $this);
  453. }
  454. $this->_firstConnect = false;
  455. } else {
  456. if ($this->onReconnect) {
  457. call_user_func($this->onReconnect, $this);
  458. }
  459. }
  460. $this->setPingTimer($this->_options['keepalive']);
  461. $this->cancelConnectionTimeout();
  462. return;
  463. //['cmd' => $cmd, 'topic' => $topic, 'content' => $content]
  464. case Mqtt::CMD_PUBLISH:
  465. $topic = $data['topic'];
  466. $content = $data['content'];
  467. $qos = $data['qos'];
  468. $message_id = isset($data['message_id']) ? $data['message_id'] : '';
  469. if ($this->_options['debug']) {
  470. echo "<- Recv PUBLISH package, message_id:$message_id qos:$qos topic:$topic content:$content", PHP_EOL;
  471. }
  472. call_user_func($this->onMessage, $topic, $content, $this);
  473. // Connection may be closed in onMessage callback.
  474. if ($this->_state !== static::STATE_ESTABLISHED) {
  475. return;
  476. }
  477. switch ($qos) {
  478. case 0:
  479. break;
  480. case 1:
  481. if ($this->_options['debug']) {
  482. echo "-> Send PUBACK package, message_id:$message_id", PHP_EOL;
  483. }
  484. $this->sendPackage(array(
  485. 'cmd' => Mqtt::CMD_PUBACK,
  486. 'message_id' => $message_id
  487. ));
  488. break;
  489. case 2:
  490. if ($this->_options['debug']) {
  491. echo "-> Send PUBREC package, message_id:$message_id", PHP_EOL;
  492. }
  493. $this->sendPackage(array(
  494. 'cmd' => Mqtt::CMD_PUBREC,
  495. 'message_id' => $message_id
  496. ));
  497. }
  498. return;
  499. case Mqtt::CMD_PUBREC:
  500. $message_id = $data['message_id'];
  501. if ($this->_options['debug']) {
  502. echo "<- Recv PUBREC package, message_id:$message_id", PHP_EOL;
  503. echo "-> Send PUBREL package, message_id:$message_id", PHP_EOL;
  504. }
  505. $this->sendPackage(array(
  506. 'cmd' => Mqtt::CMD_PUBREL,
  507. 'message_id' => $data['message_id']
  508. ));
  509. break;
  510. case Mqtt::CMD_PUBREL:
  511. $message_id = $data['message_id'];
  512. if ($this->_options['debug']) {
  513. echo "<- Recv PUBREL package, message_id:$message_id", PHP_EOL;
  514. echo "-> Send PUBCOMP package, message_id:$message_id", PHP_EOL;
  515. }
  516. $this->sendPackage(array(
  517. 'cmd' => Mqtt::CMD_PUBCOMP,
  518. 'message_id' => $message_id
  519. ));
  520. break;
  521. case Mqtt::CMD_PUBACK:
  522. case Mqtt::CMD_PUBCOMP:
  523. $message_id = $data['message_id'];
  524. if ($this->_options['debug']) {
  525. echo "<- Recv ".($cmd == Mqtt::CMD_PUBACK ? 'PUBACK' : 'PUBCOMP') . " package, message_id:$message_id", PHP_EOL;
  526. }
  527. if (!empty($this->_outgoing[$message_id])) {
  528. if ($this->_options['debug']) {
  529. echo "-- Trigger PUB callback for message_id:$message_id", PHP_EOL;
  530. }
  531. $callback = $this->_outgoing[$message_id];
  532. unset($this->_outgoing[$message_id]);
  533. call_user_func($callback, null);
  534. }
  535. break;
  536. case Mqtt::CMD_SUBACK:
  537. case Mqtt::CMD_UNSUBACK:
  538. $message_id = $data['message_id'];
  539. if ($this->_options['debug']) {
  540. echo "<- Recv ".($cmd == Mqtt::CMD_SUBACK ? 'SUBACK' : 'UNSUBACK') . " package, message_id:$message_id", PHP_EOL;
  541. }
  542. $callback = isset($this->_outgoing[$message_id]) ? $this->_outgoing[$message_id] : null;
  543. unset($this->_outgoing[$message_id]);
  544. if ($callback) {
  545. if ($this->_options['debug']) {
  546. echo "-- Trigger ".($cmd == Mqtt::CMD_SUBACK ? 'SUB' : 'UNSUB') . " callback for message_id:$message_id", PHP_EOL;
  547. }
  548. if ($cmd === Mqtt::CMD_SUBACK) {
  549. call_user_func($callback, null, $data['codes']);
  550. } else {
  551. call_user_func($callback, null);
  552. }
  553. }
  554. break;
  555. case Mqtt::CMD_PINGRESP:
  556. $this->_recvPingResponse = true;
  557. if ($this->_options['debug']) {
  558. echo "<- Recv PINGRESP package", PHP_EOL;
  559. }
  560. break;
  561. default :
  562. echo "unknow cmd";
  563. }
  564. }
  565. /**
  566. * onConnectionClose
  567. */
  568. public function onConnectionClose()
  569. {
  570. if ($this->_options['debug']) {
  571. echo "-- Connection closed", PHP_EOL;
  572. }
  573. $this->cancelPingTimer();
  574. $this->cancelConnectionTimeout();
  575. $this->_recvPingResponse = true;
  576. $this->_state = static::STATE_DISCONNECT;
  577. if (!$this->_doNotReconnect && $this->_options['reconnect_period'] > 0) {
  578. $this->reConnect($this->_options['reconnect_period']);
  579. }
  580. $this->flushOutgoing();
  581. if ($this->onClose) {
  582. call_user_func($this->onClose, $this);
  583. }
  584. }
  585. /**
  586. * onConnectionError
  587. *
  588. * @param $connection
  589. * @param $code
  590. */
  591. public function onConnectionError($connection, $code)
  592. {
  593. // Connection error
  594. if ($code === 1) {
  595. $this->triggerError(102);
  596. // Send fail, connection closed
  597. } else {
  598. $this->triggerError(100);
  599. }
  600. }
  601. /**
  602. * onConnectionBufferFull
  603. */
  604. public function onConnectionBufferFull()
  605. {
  606. if ($this->_options['debug']) {
  607. echo "-- Connection buffer full and close connection", PHP_EOL;
  608. }
  609. $this->triggerError(103);
  610. $this->_connection->destroy();
  611. }
  612. /**
  613. * incrMessageId
  614. *
  615. * @return int
  616. */
  617. protected function incrMessageId()
  618. {
  619. $message_id = $this->_messageId++;
  620. if ($message_id >= 65535) {
  621. $this->_messageId = 1;
  622. }
  623. return $message_id;
  624. }
  625. /**
  626. * checkInvalidQos
  627. *
  628. * @param $qos
  629. * @return boolean
  630. */
  631. protected function checkInvalidQos($qos, $callback = null)
  632. {
  633. if ($qos !== 0 && $qos !== 1 && $qos !== 2) {
  634. $this->triggerError(241, $callback);
  635. return true;
  636. }
  637. return false;
  638. }
  639. /**
  640. * isValidTopic
  641. *
  642. * @param $topic
  643. * @return boolean
  644. */
  645. protected static function isValidTopic($topic)
  646. {
  647. if (!static::isString($topic)) {
  648. return false;
  649. }
  650. $topic_length = strlen($topic);
  651. if ($topic_length > static::MAX_TOPIC_LENGTH) {
  652. return false;
  653. }
  654. return true;
  655. }
  656. /**
  657. * validateTopics
  658. *
  659. * @param $topics
  660. * @return null|string
  661. */
  662. protected static function validateTopics($topics)
  663. {
  664. if (empty($topics)) {
  665. return 'array()';
  666. }
  667. foreach ($topics as $topic) {
  668. if(!static::isValidTopic($topic)) {
  669. return $topic;
  670. }
  671. }
  672. return null;
  673. }
  674. /**
  675. * is string.
  676. * @param $string
  677. * @return bool
  678. */
  679. protected static function isString($string) {
  680. return (is_string($string) || is_integer($string)) && strlen($string) > 0;
  681. }
  682. /**
  683. * triggerError
  684. *
  685. * @param $exception
  686. * @param $callback
  687. */
  688. protected function triggerError($code, $callback = null)
  689. {
  690. $exception = new \Exception(static::$_errorCodeStringMap[$code], $code);
  691. if ($this->_options['debug']) {
  692. echo "-- Error: ".$exception->getMessage() . PHP_EOL;
  693. }
  694. if (!$callback) {
  695. $callback = $this->onError ? $this->onError : function($exception){
  696. echo "Mqtt client: ", $exception->getMessage(), PHP_EOL;
  697. };
  698. }
  699. call_user_func($callback, $exception);
  700. }
  701. /**
  702. * createRandomClientId
  703. *
  704. * @return string
  705. */
  706. protected function createRandomClientId()
  707. {
  708. mt_srand();
  709. return static::DEFAULT_CLIENT_ID_PREFIX . '-' . mt_rand();
  710. }
  711. /**
  712. * addCheckTimeoutTimer
  713. */
  714. protected function setConnectionTimeout($timeout)
  715. {
  716. $this->cancelConnectionTimeout();
  717. $this->_checkConnectionTimeoutTimer = Timer::add($timeout, array($this, 'checkConnectTimeout'), null, false);
  718. }
  719. /**
  720. * cancelConnectionTimeout
  721. */
  722. protected function cancelConnectionTimeout()
  723. {
  724. if ($this->_checkConnectionTimeoutTimer) {
  725. Timer::del($this->_checkConnectionTimeoutTimer);
  726. $this->_checkConnectionTimeoutTimer = 0;
  727. }
  728. }
  729. /**
  730. * setPingTimer
  731. */
  732. protected function setPingTimer($ping_interval)
  733. {
  734. $this->cancelPingTimer();
  735. $connection = $this->_connection;
  736. $this->_pingTimer = Timer::add($ping_interval, function()use($connection){
  737. if (!$this->_recvPingResponse) {
  738. if ($this->_options['debug']) {
  739. echo "<- Recv PINGRESP timeout", PHP_EOL;
  740. echo "-> Close connection", PHP_EOL;
  741. }
  742. $this->_connection->destroy();
  743. return;
  744. }
  745. if ($this->_options['debug']) {
  746. echo "-> Send PINGREQ package", PHP_EOL;
  747. }
  748. $this->_recvPingResponse = false;
  749. $connection->send(array('cmd' => Mqtt::CMD_PINGREQ));
  750. });
  751. }
  752. /**
  753. * cancelPingTimer
  754. */
  755. protected function cancelPingTimer()
  756. {
  757. if ($this->_pingTimer) {
  758. Timer::del($this->_pingTimer);
  759. $this->_pingTimer = 0;
  760. }
  761. }
  762. /**
  763. * checkConnectTimeout
  764. */
  765. public function checkConnectTimeout()
  766. {
  767. if ($this->_state === static::STATE_CONNECTING || $this->_state === static::STATE_WAITCONACK) {
  768. $this->triggerError(101);
  769. $this->_connection->destroy();
  770. }
  771. }
  772. /**
  773. * checkDisconnecting
  774. *
  775. * @param null $callback
  776. * @return bool
  777. */
  778. protected function checkDisconnecting($callback = null)
  779. {
  780. if ($this->_state !== static::STATE_ESTABLISHED) {
  781. $this->triggerError(140, $callback);
  782. return true;
  783. }
  784. return false;
  785. }
  786. /**
  787. * flushOutgoing
  788. */
  789. protected function flushOutgoing()
  790. {
  791. foreach ($this->_outgoing as $message_id => $callback) {
  792. $this->triggerError(100, $callback);
  793. }
  794. $this->_outgoing = array();
  795. }
  796. /**
  797. * sendPackage
  798. *
  799. * @param $package
  800. */
  801. protected function sendPackage($package)
  802. {
  803. if ($this->checkDisconnecting()) {
  804. return;
  805. }
  806. $this->_connection->send($package);
  807. }
  808. /**
  809. * set options.
  810. *
  811. * @param $options
  812. * @throws \Exception
  813. */
  814. protected function setOptions($options)
  815. {
  816. if (isset($options['clean_session']) && !$options['clean_session']) {
  817. $this->_options['clean_session'] = 0;
  818. }
  819. if (isset($options['username'])) {
  820. if (!static::isString($options['username'])) {
  821. throw new \Exception('Bad username, expected string or integer but ' . gettype($options['username']) . ' provided.');
  822. }
  823. $this->_options['username'] = $options['username'];
  824. }
  825. if (isset($options['password'])) {
  826. if (!static::isString($options['password'])) {
  827. throw new \Exception('Bad password, expected string or integer but ' . gettype($options['password']) . ' provided.');
  828. }
  829. $this->_options['password'] = $options['password'];
  830. }
  831. if (isset($options['keepalive'])) {
  832. $keepalive = (int)$options['keepalive'];
  833. if (!static::isString($keepalive)) {
  834. throw new \Exception('Bad keepalive, expected integer but ' . gettype($keepalive) . ' provided.');
  835. }
  836. if ($keepalive < 0) {
  837. throw new \Exception('Bad keepalive, expected integer which not less than 0 but ' . $keepalive . ' provided.');
  838. }
  839. $this->_options['keepalive'] = $keepalive;
  840. }
  841. if (isset($options['protocol_name'])) {
  842. $protocol_name = $options['protocol_name'];
  843. if ($protocol_name !== 'MQTT' && $protocol_name !== 'MQIsdp') {
  844. throw new \Exception('Bad protocol_name of options, expected MQTT or MQIsdp but ' . $protocol_name . ' provided.');
  845. }
  846. $this->_options['protocol_name'] = $protocol_name;
  847. }
  848. if (isset($options['protocol_level'])) {
  849. $protocol_level = (int)$options['protocol_level'];
  850. if ($this->_options['protocol_name'] === 'MQTT' && $protocol_level !== 4) {
  851. throw new \Exception('Bad protocol_level of options, expected 4 for protocol_name MQTT but ' . $options['protocol_level'] . ' provided.');
  852. }
  853. if ($this->_options['protocol_name'] === 'MQIsdp' && $protocol_level !== 3) {
  854. throw new \Exception('Bad protocol_level of options, expected 3 for protocol_name MQTT but ' . $options['protocol_level'] . ' provided.');
  855. }
  856. $this->_options['protocol_level'] = $protocol_level;
  857. }
  858. if (isset($options['client_id'])) {
  859. if (!static::isString($options['client_id'])) {
  860. throw new \Exception('Bad client_id of options, expected string or integer but ' . gettype($options['client_id']) . ' provided.');
  861. }
  862. $this->_options['client_id'] = $options['client_id'];
  863. } else {
  864. $this->_options['client_id'] = $this->createRandomClientId();
  865. }
  866. if (isset($options['will'])) {
  867. $will = $options['will'];
  868. $required = array('qos', 'topic', 'content');
  869. foreach ($required as $key) {
  870. if (!isset($will[$key])) {
  871. throw new \Exception('Bad will options, $will['.$key.'] missing.');
  872. }
  873. }
  874. if (!static::isString($will['topic'])) {
  875. throw new \Exception('Bad $will[\'topic\'] of options, expected string or integer but ' . gettype($will['topic']) . ' provided.');
  876. }
  877. if (!static::isString($will['content'])) {
  878. throw new \Exception('Bad $will[\'content\'] of options, expected string or integer but ' . gettype($will['content']) . ' provided.');
  879. }
  880. if ($this->checkInvalidQos($will['qos'])) {
  881. throw new \Exception('Bad will qos:' . var_export($will['qos'], true));
  882. }
  883. $this->_options['will'] = $options['will'];
  884. }
  885. if (isset($options['reconnect_period'])) {
  886. $reconnect_period = (int)$options['reconnect_period'];
  887. if (!static::isString($reconnect_period)) {
  888. throw new \Exception('Bad reconnect_period of options, expected integer but ' . gettype($options['reconnect_period']) . ' provided.');
  889. }
  890. if ($reconnect_period < 0) {
  891. throw new \Exception('Bad reconnect_period, expected integer which not less than 0 but ' . $options['reconnect_period'] . ' provided.');
  892. }
  893. $this->_options['reconnect_period'] = $reconnect_period;
  894. }
  895. if (isset($options['connect_timeout'])) {
  896. $connect_timeout = (int)$options['connect_timeout'];
  897. if (!static::isString($connect_timeout)) {
  898. throw new \Exception('Bad connect_timeout of options, expected integer but ' . gettype($options['connect_timeout']) . ' provided.');
  899. }
  900. if ($connect_timeout <= 0) {
  901. throw new \Exception('Bad connect_timeout, expected integer which greater than 0 but ' . $options['connect_timeout'] . ' provided.');
  902. }
  903. $this->_options['connect_timeout'] = $connect_timeout;
  904. }
  905. if (isset($options['resubscribe']) && !$options['resubscribe']) {
  906. $this->_options['resubscribe'] = false;
  907. }
  908. if (!empty($options['bindto'])) {
  909. $this->_options['bindto'] = $options['bindto'];
  910. }
  911. if (isset($options['ssl'])) {
  912. $this->_options['ssl'] = $options['ssl'];
  913. }
  914. if (isset($options['debug'])) {
  915. $this->_options['debug'] = !empty($options['debug']);
  916. }
  917. }
  918. }