| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- <?php
- /**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
- namespace Workerman\Protocols;
- /**
- * Mqtt Protocol.
- *
- * @author walkor<walkor@workerman.net>
- */
- class Mqtt
- {
- /**
- * CONNECT Packet.
- */
- const CMD_CONNECT = 1;
- /**
- * CONNACK
- */
- const CMD_CONNACK = 2;
- /**
- * PUBLISH
- */
- const CMD_PUBLISH = 3;
- /**
- * PUBACK
- */
- const CMD_PUBACK = 4;
- /**
- * PUBREC
- */
- const CMD_PUBREC = 5;
- /**
- * PUBREL
- */
- const CMD_PUBREL = 6;
- /**
- * PUBCOMP
- */
- const CMD_PUBCOMP = 7;
- /**
- * SUBSCRIBE
- */
- const CMD_SUBSCRIBE = 8;
- /**
- * SUBACK
- */
- const CMD_SUBACK = 9;
- /**
- * UNSUBSCRIBE
- */
- const CMD_UNSUBSCRIBE = 10;
- /**
- * UNSUBACK
- */
- const CMD_UNSUBACK = 11;
- /**
- * PINGREQ
- */
- const CMD_PINGREQ = 12;
- /**
- * PINGRESP
- */
- const CMD_PINGRESP = 13;
- /**
- * DISCONNECT
- */
- const CMD_DISCONNECT = 14;
- /**
- * Check the integrity of the package.
- *
- * @param string $buffer
- * @return int
- */
- public static function input($buffer)
- {
- $length = strlen($buffer);
- $body_length = static::getBodyLength($buffer, $head_bytes);
- $total_length = $body_length + $head_bytes;
- if ($length < $total_length) {
- return 0;
- }
- return $total_length;
- }
- /**
- * Encode.
- *
- * @param string $buffer
- * @return string
- */
- public static function encode($data)
- {
- $cmd = $data['cmd'];
- switch ($data['cmd']) {
- // ['cmd'=>1, 'clean_session'=>x, 'will'=>['qos'=>x, 'retain'=>x, 'topic'=>x, 'content'=>x],'username'=>x, 'password'=>x, 'keepalive'=>x, 'protocol_name'=>x, 'protocol_level'=>x, 'client_id' => x]
- case static::CMD_CONNECT;
- $body = self::packString($data['protocol_name']).chr($data['protocol_level']);
- $connect_flags = 0;
- if (!empty($data['clean_session'])) {
- $connect_flags |= 1 << 1;
- }
- if (!empty($data['will'])) {
- $connect_flags |= 1 << 2;
- $connect_flags |= $data['will']['qos'] << 3;
- if ($data['will']['retain']) {
- $connect_flags |= 1 << 5;
- }
- }
- if (!empty($data['password'])) {
- $connect_flags |= 1 << 6;
- }
- if (!empty($data['username'])) {
- $connect_flags |= 1 << 7;
- }
- $body .= chr($connect_flags);
- $keepalive = !empty($data['keepalive']) && (int)$data['keepalive'] >= 0 ? (int)$data['keepalive'] : 0;
- $body .= pack('n', $keepalive);
- $body .= static::packString($data['client_id']);
- if (!empty($data['will'])) {
- $body .= static::packString($data['will']['topic']);
- $body .= static::packString($data['will']['content']);
- }
- if(!empty($data['username']) || $data['username'] === '0') {
- $body .= static::packString($data['username']);
- }
- if(!empty($data['password']) || $data['password'] === '0') {
- $body .= static::packString($data['password']);
- }
- $head = self::packHead($cmd, strlen($body));
- return $head.$body;
- //['cmd'=>2, 'session_present'=>0/1, 'code'=>x]
- case static::CMD_CONNACK:
- $body = !empty($data['session_present']) ? chr(1) : chr(0);
- $code = !empty($data['code']) ? $data['code'] : 0;
- $body .= chr($code);
- $head = static::packHead($cmd, strlen($body));
- return $head.$body;
- // ['cmd'=>3, 'message_id'=>x, 'topic'=>x, 'content'=>x, 'qos'=>0/1/2, 'dup'=>0/1, 'retain'=>0/1]
- case static::CMD_PUBLISH:
- $body = static::packString($data['topic']);
- $qos = isset($data['qos']) ? $data['qos'] : 0;
- if ($qos) {
- $body .= pack('n', $data['message_id']);
- }
- $body .= $data['content'];
- $dup = isset($data['dup']) ? $data['dup'] : 0;
- $retain = isset($data['retain']) ? $data['retain'] : 0;
- $head = static::packHead($cmd, strlen($body), $dup, $qos, $retain);
- return $head.$body;
- // ['cmd'=>x, 'message_id'=>x]
- case static::CMD_PUBACK:
- case static::CMD_PUBREC:
- case static::CMD_PUBREL:
- case static::CMD_PUBCOMP:
- $body = pack('n', $data['message_id']);
- if ($cmd === static::CMD_PUBREL) {
- $head = static::packHead($cmd, strlen($body), 0, 1);
- } else {
- $head = static::packHead($cmd, strlen($body));
- }
- return $head.$body;
- // ['cmd'=>8, 'message_id'=>x, 'topics'=>[topic=>qos, ..]]]
- case static::CMD_SUBSCRIBE:
- $id = $data['message_id'];
- $body = pack('n', $id);
- foreach($data['topics'] as $topic => $qos){
- $body .= self::packString($topic);
- $body .= chr($qos);
- }
- $head = static::packHead($cmd, strlen($body), 0, 1);
- return $head.$body;
- // ['cmd'=>9, 'message_id'=>x, 'codes'=>[x,x,..]]
- case static::CMD_SUBACK:
- $codes = $data['codes'];
- $body = pack('n', $data['message_id']).call_user_func_array('pack', array_merge(array('C*'), $codes));
- $head = static::packHead($cmd, strlen($body));
- return $head.$body;
- // ['cmd' => 10, 'message_id' => $message_id, 'topics' => $topics];
- case static::CMD_UNSUBSCRIBE:
- $body = pack('n', $data['message_id']);
- foreach ($data['topics'] as $topic) {
- $body .= static::packString($topic);
- }
- $head = static::packHead($cmd, strlen($body), 0, 1);
- return $head . $body;
- // ['cmd'=>11, 'message_id'=>x]
- case static::CMD_UNSUBACK:
- $body = pack('n', $data['message_id']);
- $head = static::packHead($cmd, strlen($body));
- return $head.$body;
- // ['cmd'=>x]
- case static::CMD_PINGREQ;
- case static::CMD_PINGRESP:
- case static::CMD_DISCONNECT:
- return static::packHead($cmd, 0);
- }
- }
- /**
- * Decode.
- *
- * @param string $buffer
- * @return string
- */
- public static function decode($buffer)
- {
- $cmd = static::getCmd($buffer);
- $body = static::getBody($buffer);
- switch ($cmd) {
- case static::CMD_CONNECT:
- $protocol_name = static::readString($body);
- $protocol_level = ord($body[0]);
- $clean_session = ord($body[1]) >> 1 & 0x1;
- $will_flag = ord($body[1]) >> 2 & 0x1;
- $will_qos = ord($body[1]) >> 3 & 0x3;
- $will_retain = ord($body[1]) >> 5 & 0x1;
- $password_flag = ord($body[1]) >> 6 & 0x1;
- $username_flag = ord($body[1]) >> 7 & 0x1;
- $body = substr($body, 2);
- $tmp = unpack('n', $body, $body);
- $keepalive = $tmp[1];
- $body = substr($body, 2);
- $client_id = static::readString($body);
- if ($will_flag) {
- $will_topic = static::readString($body);
- $will_content = static::readString($body);
- }
- $username = $password = '';
- if ($username_flag) {
- $username = static::readString($body);
- }
- if ($password_flag) {
- $password = static::readString($body);
- }
- // ['cmd'=>1, 'clean_session'=>x, 'will'=>['qos'=>x, 'retain'=>x, 'topic'=>x, 'content'=>x],'username'=>x, 'password'=>x, 'keepalive'=>x, 'protocol_name'=>x, 'protocol_level'=>x, 'client_id' => x]
- $package = array(
- 'cmd' => $cmd,
- 'protocol_name' => $protocol_name,
- 'protocol_level' => $protocol_level,
- 'clean_session' => $clean_session,
- 'will' => array(),
- 'username' => $username,
- 'password' => $password,
- 'keepalive' => $keepalive,
- 'client_id' => $client_id,
- );
- if ($will_flag) {
- $package['will'] = array(
- 'qos' => $will_qos,
- 'retain' => $will_retain,
- 'topic' => $will_topic,
- 'content' => $will_content
- );
- } else {
- unset($package['will']);
- }
- return $package;
- case static::CMD_CONNACK:
- $session_present = ord($body[0]) & 0x01;
- $code = ord($body[1]);
- return array('cmd' => $cmd, 'session_present' => $session_present, 'code' => $code);
- case static::CMD_PUBLISH:
- $dup = ord($buffer[0]) >> 3 & 0x1;
- $qos = ord($buffer[0]) >> 1 & 0x3;
- $retain = ord($buffer[0]) & 0x1;
- $topic = static::readString($body);
- if ($qos) {
- $message_id = static::readShortInt($body);
- }
- $package = array('cmd' => $cmd, 'topic' => $topic, 'content' => $body, 'dup' => $dup, 'qos' => $qos, 'retain' => $retain);
- if ($qos) {
- $package['message_id'] = $message_id;
- }
- return $package;
- case static::CMD_PUBACK:
- case static::CMD_PUBREC:
- case static::CMD_PUBREL:
- case static::CMD_PUBCOMP:
- $message_id = static::readShortInt($body);
- return array('cmd' => $cmd, 'message_id' => $message_id);
- case static::CMD_SUBSCRIBE:
- $message_id = static::readShortInt($body);
- $topics = array();
- while ($body) {
- $topic = static::readString($body);
- $qos = ord($body[0]);
- $topics[$topic] = $qos;
- $body = substr($body, 1);
- }
- return array('cmd' => $cmd, 'message_id' => $message_id, 'topics' => $topics);
- case static::CMD_SUBACK:
- $message_id = static::readShortInt($body);
- $tmp = unpack('C*', $body);
- $codes = array_values($tmp);
- return array('cmd' => $cmd, 'message_id'=> $message_id, 'codes' => $codes);
- case static::CMD_UNSUBSCRIBE:
- $message_id = static::readShortInt($body);
- $topics = array();
- while ($body) {
- $topic = static::readString($body);
- $topics[] = $topic;
- }
- return array('cmd' => $cmd, 'message_id' => $message_id, 'topics' => $topics);
- case static::CMD_UNSUBACK:
- $message_id = static::readShortInt($body);
- return array('cmd' => $cmd, 'message_id' => $message_id);
- case static::CMD_PINGREQ:
- case static::CMD_PINGRESP:
- case static::CMD_DISCONNECT:
- return array('cmd' => $cmd);
- }
- return $buffer;
- }
- /**
- * Pack string.
- *
- * @param $str
- * @return string
- */
- public static function packString($str){
- $len = strlen($str);
- return pack('n', $len).$str;
- }
- /**
- * Write body length.
- *
- * @param $len
- * @return string
- */
- protected static function writeBodyLength($len)
- {
- $string = "";
- do{
- $digit = $len % 128;
- $len = $len >> 7;
- // if there are more digits to encode, set the top bit of this digit
- if ( $len > 0 )
- $digit = ($digit | 0x80);
- $string .= chr($digit);
- }while ( $len > 0 );
- return $string;
- }
- /**
- * Get cmd.
- *
- * @param $buffer
- * @return int
- */
- public static function getCmd($buffer)
- {
- return ord($buffer[0]) >> 4;
- }
- /**
- * Get body length.
- *
- * @param $buffer
- * @param $head_bytes
- * @return int
- */
- public static function getBodyLength($buffer, &$head_bytes){
- $head_bytes = $multiplier = 1;
- $value = 0;
- do{
- if (!isset($buffer[$head_bytes])) {
- $head_bytes = 0;
- return 0;
- }
- $digit = ord($buffer[$head_bytes]);
- $value += ($digit & 127) * $multiplier;
- $multiplier *= 128;
- $head_bytes++;
- }while (($digit & 128) != 0);
- return $value;
- }
- /**
- * Get body.
- *
- * @param $buffer
- * @return string
- */
- public static function getBody($buffer)
- {
- $body_length = static::getBodyLength($buffer, $head_bytes);
- $buffer = substr($buffer, $head_bytes, $body_length);
- return $buffer;
- }
- /**
- * Read string from buffer.
- * @param $buffer
- * @return string
- */
- public static function readString(&$buffer) {
- $tmp = unpack('n', $buffer);
- $length = $tmp[1];
- if ($length + 2 > strlen($buffer)) {
- echo "buffer:".bin2hex($buffer)." lenth:$length not enough for unpackString\n";
- }
- $string = substr($buffer, 2, $length);
- $buffer = substr($buffer, $length + 2);
- return $string;
- }
- /**
- * Read unsigned short int from buffer.
- * @param $buffer
- * @return mixed
- */
- public static function readShortInt(&$buffer) {
- $tmp = unpack('n', $buffer);
- $buffer = substr($buffer, 2);
- return $tmp[1];
- }
- /**
- * packHead.
- * @param $cmd
- * @param $body_length
- * @param int $dup
- * @param int $qos
- * @param int $retain
- * @return string
- */
- public static function packHead($cmd, $body_length, $dup = 0, $qos = 0, $retain = 0)
- {
- $cmd = $cmd << 4;
- if ($dup) {
- $cmd |= 1 << 3;
- }
- if ($qos) {
- $cmd |= $qos << 1;
- }
- if ($retain) {
- $cmd |= 1;
- }
- return chr($cmd).static::writeBodyLength($body_length);
- }
- }
|