| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 | 
							- <?php
 
- /* phpMQTT */
 
- class Mqtt {
 
-     private $socket;             /* holds the socket    */
 
-     private $msgid = 1;            /* counter for message id */
 
-     public $keepalive = 10;        /* default keepalive timmer */
 
-     public $timesinceping;        /* host unix time, used to detect disconects */
 
-     public $topics = array();     /* used to store currently subscribed topics */
 
-     public $debug = false;        /* should output debug messages */
 
-     public $address;            /* broker address */
 
-     public $port;                /* broker port */
 
-     public $clientid;            /* client id sent to brocker */
 
-     public $will;                /* stores the will of the client */
 
-     private $username;            /* stores username */
 
-     private $password;            /* stores password */
 
-     public $cafile;
 
-     function __construct($address, $port, $clientid, $cafile = NULL){
 
-         $this->broker($address, $port, $clientid, $cafile);
 
-     }
 
-     /* sets the broker details */
 
-     function broker($address, $port, $clientid, $cafile = NULL){
 
-         $this->address = $address;
 
-         $this->port = $port;
 
-         $this->clientid = $clientid;
 
-         $this->cafile = $cafile;
 
-     }
 
-     function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){
 
-         while($this->connect($clean, $will, $username, $password)==false){
 
-             sleep(10);
 
-         }
 
-         return true;
 
-     }
 
-     /* connects to the broker
 
-         inputs: $clean: should the client send a clean session flag */
 
-     function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){
 
-         if($will) $this->will = $will;
 
-         if($username) $this->username = $username;
 
-         if($password) $this->password = $password;
 
-         if ($this->cafile) {
 
-             $socketContext = stream_context_create(["ssl" => [
 
-                 "verify_peer_name" => true,
 
-                 "cafile" => $this->cafile
 
-             ]]);
 
-             $this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
 
-         } else {
 
-             $this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
 
-         }
 
-         if (!$this->socket ) {
 
-             if($this->debug) error_log("stream_socket_create() $errno, $errstr \n");
 
-             return false;
 
-         }
 
-         stream_set_timeout($this->socket, 5);
 
-         stream_set_blocking($this->socket, 0);
 
-         $i = 0;
 
-         $buffer = "";
 
-         $buffer .= chr(0x00); $i++;
 
-         $buffer .= chr(0x06); $i++;
 
-         $buffer .= chr(0x4d); $i++;
 
-         $buffer .= chr(0x51); $i++;
 
-         $buffer .= chr(0x49); $i++;
 
-         $buffer .= chr(0x73); $i++;
 
-         $buffer .= chr(0x64); $i++;
 
-         $buffer .= chr(0x70); $i++;
 
-         $buffer .= chr(0x03); $i++;
 
-         //No Will
 
-         $var = 0;
 
-         if($clean) $var+=2;
 
-         //Add will info to header
 
-         if($this->will != NULL){
 
-             $var += 4; // Set will flag
 
-             $var += ($this->will['qos'] << 3); //Set will qos
 
-             if($this->will['retain'])    $var += 32; //Set will retain
 
-         }
 
-         if($this->username != NULL) $var += 128;    //Add username to header
 
-         if($this->password != NULL) $var += 64;    //Add password to header
 
-         $buffer .= chr($var); $i++;
 
-         //Keep alive
 
-         $buffer .= chr($this->keepalive >> 8); $i++;
 
-         $buffer .= chr($this->keepalive & 0xff); $i++;
 
-         $buffer .= $this->strwritestring($this->clientid,$i);
 
-         //Adding will to payload
 
-         if($this->will != NULL){
 
-             $buffer .= $this->strwritestring($this->will['topic'],$i);
 
-             $buffer .= $this->strwritestring($this->will['content'],$i);
 
-         }
 
-         if($this->username) $buffer .= $this->strwritestring($this->username,$i);
 
-         if($this->password) $buffer .= $this->strwritestring($this->password,$i);
 
-         $head = "  ";
 
-         $head{0} = chr(0x10);
 
-         $head{1} = chr($i);
 
-         fwrite($this->socket, $head, 2);
 
-         fwrite($this->socket,  $buffer);
 
-         $string = $this->read(4);
 
-         if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){
 
-             if($this->debug) echo "Connected to Broker\n";
 
-         }else{
 
-             error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
 
-                 ord($string{0}),ord($string{3})));
 
-             return false;
 
-         }
 
-         $this->timesinceping = time();
 
-         return true;
 
-     }
 
-     /* read: reads in so many bytes */
 
-     function read($int = 8192, $nb = false){
 
-         //    print_r(socket_get_status($this->socket));
 
-         $string="";
 
-         $togo = $int;
 
-         if($nb){
 
-             return fread($this->socket, $togo);
 
-         }
 
-         while (!feof($this->socket) && $togo>0) {
 
-             $fread = fread($this->socket, $togo);
 
-             $string .= $fread;
 
-             $togo = $int - strlen($string);
 
-         }
 
-         return $string;
 
-     }
 
-     /* subscribe: subscribes to topics */
 
-     function subscribe($topics, $qos = 0){
 
-         $i = 0;
 
-         $buffer = "";
 
-         $id = $this->msgid;
 
-         $buffer .= chr($id >> 8);  $i++;
 
-         $buffer .= chr($id % 256);  $i++;
 
-         foreach($topics as $key => $topic){
 
-             $buffer .= $this->strwritestring($key,$i);
 
-             $buffer .= chr($topic["qos"]);  $i++;
 
-             $this->topics[$key] = $topic;
 
-         }
 
-         $cmd = 0x80;
 
-         //$qos
 
-         $cmd +=    ($qos << 1);
 
-         $head = chr($cmd);
 
-         $head .= chr($i);
 
-         fwrite($this->socket, $head, 2);
 
-         fwrite($this->socket, $buffer, $i);
 
-         $string = $this->read(2);
 
-         $bytes = ord(substr($string,1,1));
 
-         $string = $this->read($bytes);
 
-     }
 
-     /* ping: sends a keep alive ping */
 
-     function ping(){
 
-         $head = " ";
 
-         $head = chr(0xc0);
 
-         $head .= chr(0x00);
 
-         fwrite($this->socket, $head, 2);
 
-         if($this->debug) echo "ping sent\n";
 
-     }
 
-     /* disconnect: sends a proper disconect cmd */
 
-     function disconnect(){
 
-         $head = " ";
 
-         $head{0} = chr(0xe0);
 
-         $head{1} = chr(0x00);
 
-         fwrite($this->socket, $head, 2);
 
-     }
 
-     /* close: sends a proper disconect, then closes the socket */
 
-     function close(){
 
-         $this->disconnect();
 
-         stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
 
-     }
 
-     /* publish: publishes $content on a $topic */
 
-     function publish($topic, $content, $qos = 0, $retain = 0){
 
-         $i = 0;
 
-         $buffer = "";
 
-         $buffer .= $this->strwritestring($topic,$i);
 
-         //$buffer .= $this->strwritestring($content,$i);
 
-         if($qos){
 
-             $id = $this->msgid++;
 
-             $buffer .= chr($id >> 8);  $i++;
 
-             $buffer .= chr($id % 256);  $i++;
 
-         }
 
-         $buffer .= $content;
 
-         $i+=strlen($content);
 
-         $head = " ";
 
-         $cmd = 0x30;
 
-         if($qos) $cmd += $qos << 1;
 
-         if($retain) $cmd += 1;
 
-         $head{0} = chr($cmd);
 
-         $head .= $this->setmsglength($i);
 
-         fwrite($this->socket, $head, strlen($head));
 
-         fwrite($this->socket, $buffer, $i);
 
-     }
 
-     /* message: processes a recieved topic */
 
-     function message($msg){
 
-         $tlen = (ord($msg{0})<<8) + ord($msg{1});
 
-         $topic = substr($msg,2,$tlen);
 
-         $msg = substr($msg,($tlen+2));
 
-         $found = 0;
 
-         foreach($this->topics as $key=>$top){
 
-             if( preg_match("/^".str_replace("#",".*",
 
-                     str_replace("+","[^\/]*",
 
-                         str_replace("/","\/",
 
-                             str_replace("$",'\$',
 
-                                 $key))))."$/",$topic) ){
 
-                 // if(is_callable($top['function'])){
 
-                 //     call_user_func($top['function'],$topic,$msg);
 
-                 //     $found = 1;
 
-                 // }
 
-                 return array('topic'=>$topic,'msg'=>$msg);
 
-             }
 
-         }
 
-         if($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
 
-     }
 
-     /* proc: the processing loop for an "allways on" client
 
-         set true when you are doing other stuff in the loop good for watching something else at the same time */
 
-     function proc( $loop = true){
 
-         if(1){
 
-             $sockets = array($this->socket);
 
-             $w = $e = NULL;
 
-             $cmd = 0;
 
-             //$byte = fgetc($this->socket);
 
-             if(feof($this->socket)){
 
-                 if($this->debug) echo "eof receive going to reconnect for good measure\n";
 
-                 fclose($this->socket);
 
-                 $this->connect_auto(false);
 
-                 if(count($this->topics))
 
-                     $this->subscribe($this->topics);
 
-             }
 
-             $byte = $this->read(1, true);
 
-             if(!strlen($byte)){
 
-                 if($loop){
 
-                     usleep(100000);
 
-                 }
 
-             }else{
 
-                 $cmd = (int)(ord($byte)/16);
 
-                 if($this->debug) echo "Recevid: $cmd\n";
 
-                 $multiplier = 1;
 
-                 $value = 0;
 
-                 do{
 
-                     $digit = ord($this->read(1));
 
-                     $value += ($digit & 127) * $multiplier;
 
-                     $multiplier *= 128;
 
-                 }while (($digit & 128) != 0);
 
-                 if($this->debug) echo "Fetching: $value\n";
 
-                 if($value)
 
-                     $string = $this->read($value);
 
-                 if($cmd){
 
-                     switch($cmd){
 
-                         case 3:
 
-                             return $this->message($string);
 
-                             // $tlen = (ord($string{0})<<8) + ord($string{1});
 
-                             // $topic = substr($string,2,$tlen);
 
-                             // $msg = substr($string,($tlen+2));
 
-                             // return $msg;
 
-                             // return array('topic'=>$topic,'msg'=>$msg);
 
-                             // return $string;
 
-                             break;
 
-                     }
 
-                     $this->timesinceping = time();
 
-                 }
 
-             }
 
-             if($this->timesinceping < (time() - $this->keepalive )){
 
-                 if($this->debug) echo "not found something so ping\n";
 
-                 $this->ping();
 
-             }
 
-             if($this->timesinceping<(time()-($this->keepalive*2))){
 
-                 if($this->debug) echo "not seen a package in a while, disconnecting\n";
 
-                 fclose($this->socket);
 
-                 $this->connect_auto(false);
 
-                 if(count($this->topics))
 
-                     $this->subscribe($this->topics);
 
-             }
 
-         }
 
-         return 1;
 
-     }
 
-     /* getmsglength: */
 
-     function getmsglength(&$msg, &$i){
 
-         $multiplier = 1;
 
-         $value = 0 ;
 
-         do{
 
-             $digit = ord($msg{$i});
 
-             $value += ($digit & 127) * $multiplier;
 
-             $multiplier *= 128;
 
-             $i++;
 
-         }while (($digit & 128) != 0);
 
-         return $value;
 
-     }
 
-     /* setmsglength: */
 
-     function setmsglength($len){
 
-         $string = "";
 
-         do{
 
-             $digit = $len % 128;
 
-             $len = $len >> 7;
 
-             // if there are more digits to encode, set the top bit of this digit
 
-             if ( $len > 0 )
 
-                 $digit = ($digit | 0x80);
 
-             $string .= chr($digit);
 
-         }while ( $len > 0 );
 
-         return $string;
 
-     }
 
-     /* strwritestring: writes a string to a buffer */
 
-     function strwritestring($str, &$i){
 
-         $ret = " ";
 
-         $len = strlen($str);
 
-         $msb = $len >> 8;
 
-         $lsb = $len % 256;
 
-         $ret = chr($msb);
 
-         $ret .= chr($lsb);
 
-         $ret .= $str;
 
-         $i += ($len+2);
 
-         return $ret;
 
-     }
 
-     function printstr($string){
 
-         $strlen = strlen($string);
 
-         for($j=0;$j<$strlen;$j++){
 
-             $num = ord($string{$j});
 
-             if($num > 31)
 
-                 $chr = $string{$j}; else $chr = " ";
 
-             printf("%4d: %08b : 0x%02x : %s \n",$j,$num,$num,$chr);
 
-         }
 
-     }
 
- }
 
 
  |