1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003 |
- <?php
- /**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
- namespace Workerman\Mqtt;
- use \Workerman\Connection\AsyncTcpConnection;
- use \Workerman\Protocols\Mqtt;
- use \Workerman\Lib\Timer;
- /**
- * Class Client
- * @package Workerman\Mqtt
- */
- class Client
- {
- /**
- * STATE_INITIAL.
- */
- const STATE_INITIAL = 1;
- /**
- * STATE_CONNECTING
- */
- const STATE_CONNECTING = 2;
- /**
- * STATE_WAITCONACK
- */
- const STATE_WAITCONACK = 3;
- /**
- * STATE_ESTABLISHED
- */
- const STATE_ESTABLISHED = 4;
- /**
- * STATE_DISCONNECT
- */
- const STATE_DISCONNECT = 5;
- /**
- * DEFAULT_CLIENT_ID_PREFIX
- */
- const DEFAULT_CLIENT_ID_PREFIX = 'workerman-mqtt-client';
- /**
- * MAX_TOPIC_LENGTH
- */
- const MAX_TOPIC_LENGTH = 65535;
- /**
- * @var callable
- */
- public $onConnect = null;
- /**
- * @var callable
- */
- public $onReconnect = null;
- /**
- * @var callable
- */
- public $onMessage = null;
- /**
- * @var callable
- */
- public $onClose = null;
- /**
- * @var callable
- */
- public $onError = null;
- /**
- * @var int
- */
- protected $_state = 1;
- /**
- * @var int
- */
- protected $_messageId = 1;
- /**
- * @var string
- */
- protected $_remoteAddress = '';
- /**
- * @var AsyncTcpConnection
- */
- protected $_connection = null;
- /**
- * @var boolean
- */
- protected $_firstConnect = true;
- /**
- * ['topic'=>qos, ...]
- * @var array
- */
- protected $_resubscribeTopics = array();
- /**
- * @var int
- */
- protected $_checkConnectionTimeoutTimer = 0;
- /**
- * @var int
- */
- protected $_pingTimer = 0;
- /**
- * @var bool
- */
- protected $_recvPingResponse = true;
- /**
- * @var bool
- */
- protected $_doNotReconnect = false;
- /**
- * @var array
- */
- protected $_outgoing = array();
- /**
- * @var array
- */
- protected static $_errorCodeStringMap = array(
- 1 => 'Connection Refused, unacceptable protocol version',
- 2 => 'Connection Refused, identifier rejected',
- 3 => 'Connection Refused, Server unavailable',
- 4 => 'Connection Refused, bad user name or password',
- 5 => 'Connection Refused, not authorized',
- 100 => 'Connection closed',
- 101 => 'Connection timeout',
- 102 => 'Connection fail',
- 103 => 'Connection buffer full and close connection',
- 140 => 'No connection to broker',
- 240 => 'Invalid topic',
- 241 => 'Invalid qos',
- );
- /**
- * @var array
- */
- protected $_options = array(
- 'clean_session' => 1, // set to 0 to receive QoS 1 and 2 messages while offline
- 'username' => '', // the username required by your broker
- 'password' => '', // the password required by your broker
- 'keepalive' => 50, // default 50 seconds, set to 0 to disable
- 'protocol_name' => 'MQTT', // protocol name MQTT or MQIsdp
- 'protocol_level' => 4, // protocol level, MQTT is 4 and MQIsdp is 3
- 'reconnect_period' => 1, // reconnect period default 1 second, set to 0 to disable
- 'connect_timeout' => 30, // 30 seconds, time to wait before a CONNACK is received
- 'resubscribe' => true, // default true, if connection is broken and reconnects, subscribed topics are automatically subscribed again.
- 'bindto' => '', // bindto option, used to specify the IP address that PHP will use to access the network
- 'ssl' => false, // ssl context, see http://php.net/manual/en/context.ssl.php
- 'debug' => false, // debug
- );
- /**
- * Client constructor.
- * @param $address
- * @param array $options
- */
- public function __construct($address, $options = array())
- {
- class_alias('\Workerman\Protocols\Mqtt', '\Workerman\Protocols\Mqtt');
- $this->setOptions($options);
- $context = array();
- if ($this->_options['bindto']) {
- $context['socket'] = array('bindto' => $this->_options['bindto']);
- }
- if ($this->_options['ssl'] && is_array($this->_options['ssl'])) {
- $context['ssl'] = $this->_options['ssl'];
- }
- if (strpos($address, 'mqtts') === 0) {
- if (empty($this->_options['ssl'])) {
- $this->_options['ssl'] = true;
- }
- $address = str_replace('mqtts', 'mqtt', $address);
- }
- $this->_remoteAddress = $address;
- $this->_connection = new AsyncTcpConnection($address, $context);
- $this->onReconnect = array($this, 'onMqttReconnect');
- $this->onMessage = function(){};
- if ($this->_options['ssl']) {
- $this->_connection->transport = 'ssl';
- }
- }
- /**
- * connect
- */
- public function connect()
- {
- $this->_doNotReconnect = false;
- $this->_connection->onConnect = array($this, 'onConnectionConnect');
- $this->_connection->onMessage = array($this, 'onConnectionMessage');
- $this->_connection->onError = array($this, 'onConnectionError');
- $this->_connection->onClose = array($this, 'onConnectionClose');
- $this->_connection->onBufferFull = array($this, 'onConnectionBufferFull');
- $this->_state = static::STATE_CONNECTING;
- $this->_connection->connect();
- $this->setConnectionTimeout($this->_options['connect_timeout']);
- if ($this->_options['debug']) {
- echo "-> Try to connect to {$this->_remoteAddress}", PHP_EOL;
- }
- }
- /**
- * subscribe
- *
- * @param $topic
- * @param array $options
- * @param callable $callback
- */
- public function subscribe($topic, $options = array(), $callback = null)
- {
- if ($this->checkDisconnecting($callback)) {
- return;
- }
- if (is_array($topic)) {
- $topics = $topic;
- } else {
- $qos = !is_callable($options) && isset($options['qos']) ? $options['qos'] : 0;
- $topics = array($topic => $qos);
- }
- $args = func_get_args();
- $callback = end($args);
- if (!is_callable($callback)) {
- $callback = null;
- }
- if ($invalid_topic = static::validateTopics($topics)) {
- $this->triggerError(240, $callback);
- return;
- }
- if ($this->_options['resubscribe']) {
- $this->_resubscribeTopics += $topics;
- }
- $package = array(
- 'cmd' => Mqtt::CMD_SUBSCRIBE,
- 'topics' => $topics,
- 'message_id' => $this->incrMessageId(),
- );
- if ($this->_options['debug']) {
- echo "-> Send SUBSCRIBE package, topic:".implode(',', array_keys($topics))." message_id:{$package['message_id']}", PHP_EOL;
- }
- $this->sendPackage($package);
- if ($callback) {
- $this->_outgoing[$package['message_id']] = function($exception, $codes = array())use($callback, $topics) {
- if ($exception) {
- call_user_func($callback, $exception, array());
- return;
- }
- $granted = array();
- $topics = array_keys($topics);
- foreach ($topics as $key => $topic) {
- $granted[$topic] = $codes[$key];
- }
- if ($callback) {
- call_user_func($callback, null, $granted);
- }
- };
- }
- }
- /**
- * unsubscribe
- *
- * @param $topic
- */
- public function unsubscribe($topic, $callback = null)
- {
- if ($this->checkDisconnecting($callback)) {
- return;
- }
- $topics = is_array($topic) ? $topic : array($topic);
- if ($invalid_topic = static::validateTopics($topics)) {
- $this->triggerError(240);
- return;
- }
- foreach ($topics as $topic) {
- if (isset($this->_resubscribeTopics[$topic])) {
- unset($this->_resubscribeTopics[$topic]);
- }
- }
- $package = array(
- 'cmd' => Mqtt::CMD_UNSUBSCRIBE,
- 'topics' => $topics,
- 'message_id' => $this->incrMessageId(),
- );
- if ($callback) {
- $this->_outgoing[$package['message_id']] = $callback;
- }
- if ($this->_options['debug']) {
- echo "-> Send UNSUBSCRIBE package, topic:".implode(',', $topics)." message_id:{$package['message_id']}", PHP_EOL;
- }
- $this->sendPackage($package);
- }
- /**
- * publish
- *
- * @param $topic
- * @param $content
- * @param array $options
- * @param callable $callback
- */
- public function publish($topic, $content, $options = array(), $callback = null)
- {
- if ($this->checkDisconnecting($callback)) {
- return;
- }
- static::isValidTopic($topic);
- $qos = 0;
- $retain = 0;
- $dup = 0;
- if (isset($options['qos'])) {
- $qos = $options['qos'];
- if($this->checkInvalidQos($qos, $callback)) {
- return;
- }
- }
- if (!empty($options['retain'])) {
- $retain = 1;
- }
- if (!empty($options['dup'])) {
- $dup = 1;
- }
- $package = array(
- 'cmd' => Mqtt::CMD_PUBLISH,
- 'topic' => $topic,
- 'content' => $content,
- 'retain' => $retain,
- 'qos' => $qos,
- 'dup' => $dup,
- );
- if ($qos) {
- $package['message_id'] = $this->incrMessageId();
- if ($callback) {
- $this->_outgoing[$package['message_id']] = $callback;
- }
- }
- if ($this->_options['debug']) {
- $message_id = isset($package['message_id']) ? $package['message_id'] : '';
- echo "-> Send PUBLISH package, topic:$topic content:$content retain:$retain qos:$qos dup:$dup message_id:$message_id", PHP_EOL;
- }
- $this->sendPackage($package);
- }
- /**
- * disconnect
- */
- public function disconnect()
- {
- $this->sendPackage(array('cmd' => Mqtt::CMD_DISCONNECT));
- if ($this->_options['debug']) {
- echo "-> Send DISCONNECT package", PHP_EOL;
- }
- $this->close();
- }
- /**
- * close
- */
- public function close()
- {
- $this->_doNotReconnect = true;
- if ($this->_options['debug']) {
- echo "-> Connection->close() called", PHP_EOL;
- }
- $this->_connection->destroy();
- }
- /**
- * reconnect
- *
- * @param int $after
- */
- public function reconnect($after = 0)
- {
- $this->_doNotReconnect = false;
- $this->_connection->onConnect = array($this, 'onConnectionConnect');
- $this->_connection->onMessage = array($this, 'onConnectionMessage');
- $this->_connection->onError = array($this, 'onConnectionError');
- $this->_connection->onClose = array($this, 'onConnectionClose');
- $this->_connection->reConnect($after);
- $this->setConnectionTimeout($this->_options['connect_timeout'] + $after);
- if ($this->_options['debug']) {
- echo "-- Reconnect after $after seconds", PHP_EOL;
- }
- }
- /**
- * onConnectionConnect
- */
- public function onConnectionConnect()
- {
- if ($this->_doNotReconnect) {
- $this->close();
- return;
- }
- //['cmd'=>1, 'clean_session'=>x, 'will'=>['qos'=>x, 'retain'=>x, 'topic'=>x, 'content'=>x],'username'=>x, 'password'=>x, 'keepalive'=>x, 'protocol_name'=>x, 'protocol_level'=>x, 'client_id' => x]
- $package = array(
- 'cmd' => Mqtt::CMD_CONNECT,
- 'clean_session' => $this->_options['clean_session'],
- 'username' => $this->_options['username'],
- 'password' => $this->_options['password'],
- 'keepalive' => $this->_options['keepalive'],
- 'protocol_name' => $this->_options['protocol_name'],
- 'protocol_level' => $this->_options['protocol_level'],
- 'client_id' => $this->_options['client_id'],
- );
- if (isset($this->_options['will'])) {
- $package['will'] = $this->_options['will'];
- }
- $this->_state = static::STATE_WAITCONACK;
- $this->_connection->send($package);
- if ($this->_options['debug']) {
- echo "-- Tcp connection established", PHP_EOL;
- echo "-> Send CONNECT package client_id:{$this->_options['client_id']} username:{$this->_options['username']} password:{$this->_options['password']} clean_session:{$this->_options['clean_session']} protocol_name:{$this->_options['protocol_name']} protocol_level:{$this->_options['protocol_level']}", PHP_EOL;
- }
- }
- /**
- * onMqttReconnect
- */
- public function onMqttReconnect()
- {
- if ($this->_options['clean_session'] && $this->_options['resubscribe'] && $this->_resubscribeTopics) {
- $package = array(
- 'cmd' => Mqtt::CMD_SUBSCRIBE,
- 'topics' => $this->_resubscribeTopics,
- 'message_id' => $this->incrMessageId(),
- );
- $this->sendPackage($package);
- if ($this->_options['debug']) {
- echo "-> Send SUBSCRIBE(Resubscribe) package topics:" .
- implode(',', array_keys($this->_resubscribeTopics))." message_id:{$package['message_id']}", PHP_EOL;
- }
- }
- }
- /**
- * onConnectionMessage
- *
- * @param $connection
- * @param $data
- */
- public function onConnectionMessage($connection, $data)
- {
- $cmd = $data['cmd'];
- switch ($cmd) {
- case Mqtt::CMD_CONNACK:
- $code = $data['code'];
- if ($code != 0) {
- $message = static::$_errorCodeStringMap[$code];
- if ($this->_options['debug']) {
- echo "<- Recv CONNACK package but get error " . $message . PHP_EOL;
- }
- $this->triggerError($code);
- $this->_connection->destroy();
- return;
- }
- if ($this->_options['debug']) {
- echo "<- Recv CONNACK package, MQTT connect success", PHP_EOL;
- }
- $this->_state = static::STATE_ESTABLISHED;
- if ($this->_firstConnect) {
- if ($this->onConnect) {
- call_user_func($this->onConnect, $this);
- }
- $this->_firstConnect = false;
- } else {
- if ($this->onReconnect) {
- call_user_func($this->onReconnect, $this);
- }
- }
- $this->setPingTimer($this->_options['keepalive']);
- $this->cancelConnectionTimeout();
- return;
- //['cmd' => $cmd, 'topic' => $topic, 'content' => $content]
- case Mqtt::CMD_PUBLISH:
- $topic = $data['topic'];
- $content = $data['content'];
- $qos = $data['qos'];
- $message_id = isset($data['message_id']) ? $data['message_id'] : '';
- if ($this->_options['debug']) {
- echo "<- Recv PUBLISH package, message_id:$message_id qos:$qos topic:$topic content:$content", PHP_EOL;
- }
- call_user_func($this->onMessage, $topic, $content, $this);
- // Connection may be closed in onMessage callback.
- if ($this->_state !== static::STATE_ESTABLISHED) {
- return;
- }
- switch ($qos) {
- case 0:
- break;
- case 1:
- if ($this->_options['debug']) {
- echo "-> Send PUBACK package, message_id:$message_id", PHP_EOL;
- }
- $this->sendPackage(array(
- 'cmd' => Mqtt::CMD_PUBACK,
- 'message_id' => $message_id
- ));
- break;
- case 2:
- if ($this->_options['debug']) {
- echo "-> Send PUBREC package, message_id:$message_id", PHP_EOL;
- }
- $this->sendPackage(array(
- 'cmd' => Mqtt::CMD_PUBREC,
- 'message_id' => $message_id
- ));
- }
- return;
- case Mqtt::CMD_PUBREC:
- $message_id = $data['message_id'];
- if ($this->_options['debug']) {
- echo "<- Recv PUBREC package, message_id:$message_id", PHP_EOL;
- echo "-> Send PUBREL package, message_id:$message_id", PHP_EOL;
- }
- $this->sendPackage(array(
- 'cmd' => Mqtt::CMD_PUBREL,
- 'message_id' => $data['message_id']
- ));
- break;
- case Mqtt::CMD_PUBREL:
- $message_id = $data['message_id'];
- if ($this->_options['debug']) {
- echo "<- Recv PUBREL package, message_id:$message_id", PHP_EOL;
- echo "-> Send PUBCOMP package, message_id:$message_id", PHP_EOL;
- }
- $this->sendPackage(array(
- 'cmd' => Mqtt::CMD_PUBCOMP,
- 'message_id' => $message_id
- ));
- break;
- case Mqtt::CMD_PUBACK:
- case Mqtt::CMD_PUBCOMP:
- $message_id = $data['message_id'];
- if ($this->_options['debug']) {
- echo "<- Recv ".($cmd == Mqtt::CMD_PUBACK ? 'PUBACK' : 'PUBCOMP') . " package, message_id:$message_id", PHP_EOL;
- }
- if (!empty($this->_outgoing[$message_id])) {
- if ($this->_options['debug']) {
- echo "-- Trigger PUB callback for message_id:$message_id", PHP_EOL;
- }
- $callback = $this->_outgoing[$message_id];
- unset($this->_outgoing[$message_id]);
- call_user_func($callback, null);
- }
- break;
- case Mqtt::CMD_SUBACK:
- case Mqtt::CMD_UNSUBACK:
- $message_id = $data['message_id'];
- if ($this->_options['debug']) {
- echo "<- Recv ".($cmd == Mqtt::CMD_SUBACK ? 'SUBACK' : 'UNSUBACK') . " package, message_id:$message_id", PHP_EOL;
- }
- $callback = isset($this->_outgoing[$message_id]) ? $this->_outgoing[$message_id] : null;
- unset($this->_outgoing[$message_id]);
- if ($callback) {
- if ($this->_options['debug']) {
- echo "-- Trigger ".($cmd == Mqtt::CMD_SUBACK ? 'SUB' : 'UNSUB') . " callback for message_id:$message_id", PHP_EOL;
- }
- if ($cmd === Mqtt::CMD_SUBACK) {
- call_user_func($callback, null, $data['codes']);
- } else {
- call_user_func($callback, null);
- }
- }
- break;
- case Mqtt::CMD_PINGRESP:
- $this->_recvPingResponse = true;
- if ($this->_options['debug']) {
- echo "<- Recv PINGRESP package", PHP_EOL;
- }
- break;
- default :
- echo "unknow cmd";
- }
- }
- /**
- * onConnectionClose
- */
- public function onConnectionClose()
- {
- if ($this->_options['debug']) {
- echo "-- Connection closed", PHP_EOL;
- }
- $this->cancelPingTimer();
- $this->cancelConnectionTimeout();
- $this->_recvPingResponse = true;
- $this->_state = static::STATE_DISCONNECT;
- if (!$this->_doNotReconnect && $this->_options['reconnect_period'] > 0) {
- $this->reConnect($this->_options['reconnect_period']);
- }
- $this->flushOutgoing();
- if ($this->onClose) {
- call_user_func($this->onClose, $this);
- }
- }
- /**
- * onConnectionError
- *
- * @param $connection
- * @param $code
- */
- public function onConnectionError($connection, $code)
- {
- // Connection error
- if ($code === 1) {
- $this->triggerError(102);
- // Send fail, connection closed
- } else {
- $this->triggerError(100);
- }
- }
- /**
- * onConnectionBufferFull
- */
- public function onConnectionBufferFull()
- {
- if ($this->_options['debug']) {
- echo "-- Connection buffer full and close connection", PHP_EOL;
- }
- $this->triggerError(103);
- $this->_connection->destroy();
- }
- /**
- * incrMessageId
- *
- * @return int
- */
- protected function incrMessageId()
- {
- $message_id = $this->_messageId++;
- if ($message_id >= 65535) {
- $this->_messageId = 1;
- }
- return $message_id;
- }
- /**
- * checkInvalidQos
- *
- * @param $qos
- * @return boolean
- */
- protected function checkInvalidQos($qos, $callback = null)
- {
- if ($qos !== 0 && $qos !== 1 && $qos !== 2) {
- $this->triggerError(241, $callback);
- return true;
- }
- return false;
- }
- /**
- * isValidTopic
- *
- * @param $topic
- * @return boolean
- */
- protected static function isValidTopic($topic)
- {
- if (!static::isString($topic)) {
- return false;
- }
- $topic_length = strlen($topic);
- if ($topic_length > static::MAX_TOPIC_LENGTH) {
- return false;
- }
- return true;
- }
- /**
- * validateTopics
- *
- * @param $topics
- * @return null|string
- */
- protected static function validateTopics($topics)
- {
- if (empty($topics)) {
- return 'array()';
- }
- foreach ($topics as $topic) {
- if(!static::isValidTopic($topic)) {
- return $topic;
- }
- }
- return null;
- }
- /**
- * is string.
- * @param $string
- * @return bool
- */
- protected static function isString($string) {
- return (is_string($string) || is_integer($string)) && strlen($string) > 0;
- }
- /**
- * triggerError
- *
- * @param $exception
- * @param $callback
- */
- protected function triggerError($code, $callback = null)
- {
- $exception = new \Exception(static::$_errorCodeStringMap[$code], $code);
- if ($this->_options['debug']) {
- echo "-- Error: ".$exception->getMessage() . PHP_EOL;
- }
- if (!$callback) {
- $callback = $this->onError ? $this->onError : function($exception){
- echo "Mqtt client: ", $exception->getMessage(), PHP_EOL;
- };
- }
- call_user_func($callback, $exception);
- }
- /**
- * createRandomClientId
- *
- * @return string
- */
- protected function createRandomClientId()
- {
- mt_srand();
- return static::DEFAULT_CLIENT_ID_PREFIX . '-' . mt_rand();
- }
- /**
- * addCheckTimeoutTimer
- */
- protected function setConnectionTimeout($timeout)
- {
- $this->cancelConnectionTimeout();
- $this->_checkConnectionTimeoutTimer = Timer::add($timeout, array($this, 'checkConnectTimeout'), null, false);
- }
- /**
- * cancelConnectionTimeout
- */
- protected function cancelConnectionTimeout()
- {
- if ($this->_checkConnectionTimeoutTimer) {
- Timer::del($this->_checkConnectionTimeoutTimer);
- $this->_checkConnectionTimeoutTimer = 0;
- }
- }
- /**
- * setPingTimer
- */
- protected function setPingTimer($ping_interval)
- {
- $this->cancelPingTimer();
- $connection = $this->_connection;
- $this->_pingTimer = Timer::add($ping_interval, function()use($connection){
- if (!$this->_recvPingResponse) {
- if ($this->_options['debug']) {
- echo "<- Recv PINGRESP timeout", PHP_EOL;
- echo "-> Close connection", PHP_EOL;
- }
- $this->_connection->destroy();
- return;
- }
- if ($this->_options['debug']) {
- echo "-> Send PINGREQ package", PHP_EOL;
- }
- $this->_recvPingResponse = false;
- $connection->send(array('cmd' => Mqtt::CMD_PINGREQ));
- });
- }
- /**
- * cancelPingTimer
- */
- protected function cancelPingTimer()
- {
- if ($this->_pingTimer) {
- Timer::del($this->_pingTimer);
- $this->_pingTimer = 0;
- }
- }
- /**
- * checkConnectTimeout
- */
- public function checkConnectTimeout()
- {
- if ($this->_state === static::STATE_CONNECTING || $this->_state === static::STATE_WAITCONACK) {
- $this->triggerError(101);
- $this->_connection->destroy();
- }
- }
- /**
- * checkDisconnecting
- *
- * @param null $callback
- * @return bool
- */
- protected function checkDisconnecting($callback = null)
- {
- if ($this->_state !== static::STATE_ESTABLISHED) {
- $this->triggerError(140, $callback);
- return true;
- }
- return false;
- }
- /**
- * flushOutgoing
- */
- protected function flushOutgoing()
- {
- foreach ($this->_outgoing as $message_id => $callback) {
- $this->triggerError(100, $callback);
- }
- $this->_outgoing = array();
- }
- /**
- * sendPackage
- *
- * @param $package
- */
- protected function sendPackage($package)
- {
- if ($this->checkDisconnecting()) {
- return;
- }
- $this->_connection->send($package);
- }
- /**
- * set options.
- *
- * @param $options
- * @throws \Exception
- */
- protected function setOptions($options)
- {
- if (isset($options['clean_session']) && !$options['clean_session']) {
- $this->_options['clean_session'] = 0;
- }
- if (isset($options['username'])) {
- if (!static::isString($options['username'])) {
- throw new \Exception('Bad username, expected string or integer but ' . gettype($options['username']) . ' provided.');
- }
- $this->_options['username'] = $options['username'];
- }
- if (isset($options['password'])) {
- if (!static::isString($options['password'])) {
- throw new \Exception('Bad password, expected string or integer but ' . gettype($options['password']) . ' provided.');
- }
- $this->_options['password'] = $options['password'];
- }
- if (isset($options['keepalive'])) {
- $keepalive = (int)$options['keepalive'];
- if (!static::isString($keepalive)) {
- throw new \Exception('Bad keepalive, expected integer but ' . gettype($keepalive) . ' provided.');
- }
- if ($keepalive < 0) {
- throw new \Exception('Bad keepalive, expected integer which not less than 0 but ' . $keepalive . ' provided.');
- }
- $this->_options['keepalive'] = $keepalive;
- }
- if (isset($options['protocol_name'])) {
- $protocol_name = $options['protocol_name'];
- if ($protocol_name !== 'MQTT' && $protocol_name !== 'MQIsdp') {
- throw new \Exception('Bad protocol_name of options, expected MQTT or MQIsdp but ' . $protocol_name . ' provided.');
- }
- $this->_options['protocol_name'] = $protocol_name;
- }
- if (isset($options['protocol_level'])) {
- $protocol_level = (int)$options['protocol_level'];
- if ($this->_options['protocol_name'] === 'MQTT' && $protocol_level !== 4) {
- throw new \Exception('Bad protocol_level of options, expected 4 for protocol_name MQTT but ' . $options['protocol_level'] . ' provided.');
- }
- if ($this->_options['protocol_name'] === 'MQIsdp' && $protocol_level !== 3) {
- throw new \Exception('Bad protocol_level of options, expected 3 for protocol_name MQTT but ' . $options['protocol_level'] . ' provided.');
- }
- $this->_options['protocol_level'] = $protocol_level;
- }
- if (isset($options['client_id'])) {
- if (!static::isString($options['client_id'])) {
- throw new \Exception('Bad client_id of options, expected string or integer but ' . gettype($options['client_id']) . ' provided.');
- }
- $this->_options['client_id'] = $options['client_id'];
- } else {
- $this->_options['client_id'] = $this->createRandomClientId();
- }
- if (isset($options['will'])) {
- $will = $options['will'];
- $required = array('qos', 'topic', 'content');
- foreach ($required as $key) {
- if (!isset($will[$key])) {
- throw new \Exception('Bad will options, $will['.$key.'] missing.');
- }
- }
- if (!static::isString($will['topic'])) {
- throw new \Exception('Bad $will[\'topic\'] of options, expected string or integer but ' . gettype($will['topic']) . ' provided.');
- }
- if (!static::isString($will['content'])) {
- throw new \Exception('Bad $will[\'content\'] of options, expected string or integer but ' . gettype($will['content']) . ' provided.');
- }
- if ($this->checkInvalidQos($will['qos'])) {
- throw new \Exception('Bad will qos:' . var_export($will['qos'], true));
- }
- $this->_options['will'] = $options['will'];
- }
- if (isset($options['reconnect_period'])) {
- $reconnect_period = (int)$options['reconnect_period'];
- if (!static::isString($reconnect_period)) {
- throw new \Exception('Bad reconnect_period of options, expected integer but ' . gettype($options['reconnect_period']) . ' provided.');
- }
- if ($reconnect_period < 0) {
- throw new \Exception('Bad reconnect_period, expected integer which not less than 0 but ' . $options['reconnect_period'] . ' provided.');
- }
- $this->_options['reconnect_period'] = $reconnect_period;
- }
- if (isset($options['connect_timeout'])) {
- $connect_timeout = (int)$options['connect_timeout'];
- if (!static::isString($connect_timeout)) {
- throw new \Exception('Bad connect_timeout of options, expected integer but ' . gettype($options['connect_timeout']) . ' provided.');
- }
- if ($connect_timeout <= 0) {
- throw new \Exception('Bad connect_timeout, expected integer which greater than 0 but ' . $options['connect_timeout'] . ' provided.');
- }
- $this->_options['connect_timeout'] = $connect_timeout;
- }
- if (isset($options['resubscribe']) && !$options['resubscribe']) {
- $this->_options['resubscribe'] = false;
- }
- if (!empty($options['bindto'])) {
- $this->_options['bindto'] = $options['bindto'];
- }
- if (isset($options['ssl'])) {
- $this->_options['ssl'] = $options['ssl'];
- }
- if (isset($options['debug'])) {
- $this->_options['debug'] = !empty($options['debug']);
- }
- }
- }
|