Преглед изворни кода

Pass paho mqtt interoperability tests

Feng Lee пре 7 година
родитељ
комит
68cfcf6e0e
8 измењених фајлова са 31 додато и 28 уклоњено
  1. 1 1
      etc/emqx.conf
  2. 0 2
      priv/emqx.schema
  3. 3 2
      src/emqx_connection.erl
  4. 0 1
      src/emqx_mqtt_caps.erl
  5. 5 1
      src/emqx_packet.erl
  6. 16 16
      src/emqx_protocol.erl
  7. 4 3
      src/emqx_session.erl
  8. 2 2
      src/emqx_trie.erl

+ 1 - 1
etc/emqx.conf

@@ -427,7 +427,7 @@ allow_anonymous = true
 ## TODO: Allow or deny if no ACL rules match.
 ## TODO: Allow or deny if no ACL rules match.
 ##
 ##
 ## Value: allow | deny
 ## Value: allow | deny
-acl_nomatch = deny
+acl_nomatch = allow
 
 
 ## Default ACL File.
 ## Default ACL File.
 ##
 ##

+ 0 - 2
priv/emqx.schema

@@ -653,12 +653,10 @@ end}.
 ]}.
 ]}.
 
 
 {mapping, "zone.$name.allow_anonymous", "emqx.zones", [
 {mapping, "zone.$name.allow_anonymous", "emqx.zones", [
-  {default, false},
   {datatype, {enum, [true, false]}}
   {datatype, {enum, [true, false]}}
 ]}.
 ]}.
 
 
 {mapping, "zone.$name.acl_nomatch", "emqx.zones", [
 {mapping, "zone.$name.acl_nomatch", "emqx.zones", [
-  {default, deny},
   {datatype, {enum, [allow, deny]}}
   {datatype, {enum, [allow, deny]}}
 ]}.
 ]}.
 
 

+ 3 - 2
src/emqx_connection.erl

@@ -121,8 +121,9 @@ send_fun(Transport, Socket, Peername) ->
     fun(Data) ->
     fun(Data) ->
         try Transport:async_send(Socket, Data) of
         try Transport:async_send(Socket, Data) of
             ok ->
             ok ->
-                ?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}),
-                emqx_metrics:inc('bytes/sent', iolist_size(Data)), ok;
+                ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),
+                emqx_metrics:inc('bytes/sent', iolist_size(Data)),
+                ok;
             Error -> Error
             Error -> Error
         catch
         catch
             error:Error ->
             error:Error ->

+ 0 - 1
src/emqx_mqtt_caps.erl

@@ -46,7 +46,6 @@
                       mqtt_retain_available]).
                       mqtt_retain_available]).
 -define(SUBCAP_KEYS, [max_qos_allowed,
 -define(SUBCAP_KEYS, [max_qos_allowed,
                       max_topic_levels,
                       max_topic_levels,
-                      mqtt_retain_available,
                       mqtt_shared_subscription,
                       mqtt_shared_subscription,
                       mqtt_wildcard_subscription]).
                       mqtt_wildcard_subscription]).
 
 

+ 5 - 1
src/emqx_packet.erl

@@ -94,7 +94,11 @@ to_message(ClientId, #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLI
                                                                   properties = Props},
                                                                   properties = Props},
                                   payload  = Payload}) ->
                                   payload  = Payload}) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
-    Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Props};
+    Msg#message{flags = #{dup => Dup, retain => Retain},
+                headers = if
+                              Props =:= undefined -> #{};
+                              true -> Props
+                          end};
 
 
 to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) ->
 to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) ->
     undefined;
     undefined;

+ 16 - 16
src/emqx_protocol.erl

@@ -215,28 +215,28 @@ process(?CONNECT_PACKET(
 
 
     connack(
     connack(
       case check_connect(Connect, PState1) of
       case check_connect(Connect, PState1) of
-          ok ->
-              case authenticate(client(PState1), Password) of
+          {ok, PState2} ->
+              case authenticate(client(PState2), Password) of
                   {ok, IsSuper} ->
                   {ok, IsSuper} ->
                       %% Maybe assign a clientId
                       %% Maybe assign a clientId
-                      PState2 = maybe_assign_client_id(PState1#pstate{is_super = IsSuper}),
+                      PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
                       %% Open session
                       %% Open session
-                      case try_open_session(PState2) of
+                      case try_open_session(PState3) of
                           {ok, SPid, SP} ->
                           {ok, SPid, SP} ->
-                              PState3 = PState2#pstate{session = SPid},
-                              ok = emqx_cm:register_client({client_id(PState3), self()}, info(PState3)),
+                              PState4 = PState3#pstate{session = SPid},
+                              ok = emqx_cm:register_client({client_id(PState4), self()}, info(PState4)),
                               %% Start keepalive
                               %% Start keepalive
-                              start_keepalive(Keepalive, PState3),
+                              start_keepalive(Keepalive, PState4),
                               %% TODO: 'Run hooks' before open_session?
                               %% TODO: 'Run hooks' before open_session?
-                              emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState3)),
+                              emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState4)),
                               %% Success
                               %% Success
-                              {?RC_SUCCESS, SP, replvar(PState3)};
+                              {?RC_SUCCESS, SP, replvar(PState4)};
                           {error, Error} ->
                           {error, Error} ->
                               ?LOG(error, "Failed to open session: ~p", [Error], PState1),
                               ?LOG(error, "Failed to open session: ~p", [Error], PState1),
                               {?RC_UNSPECIFIED_ERROR, PState1}
                               {?RC_UNSPECIFIED_ERROR, PState1}
                     end;
                     end;
                   {error, Reason} ->
                   {error, Reason} ->
-                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState1),
+                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2),
                       {?RC_NOT_AUTHORIZED, PState1}
                       {?RC_NOT_AUTHORIZED, PState1}
               end;
               end;
           {error, ReasonCode} ->
           {error, ReasonCode} ->
@@ -245,8 +245,8 @@ process(?CONNECT_PACKET(
 
 
 process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
 process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
     case check_publish(Packet, PState) of
-        ok ->
-            do_publish(Packet, PState);
+        {ok, PState1} ->
+            do_publish(Packet, PState1);
         {error, ReasonCode} ->
         {error, ReasonCode} ->
             ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState),
             ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState),
             {ok, PState}
             {ok, PState}
@@ -254,16 +254,16 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
 
 
 process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
 process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
     case check_publish(Packet, PState) of
     case check_publish(Packet, PState) of
-        ok ->
-            do_publish(Packet, PState);
+        {ok, PState1} ->
+            do_publish(Packet, PState1);
         {error, ReasonCode} ->
         {error, ReasonCode} ->
             deliver({puback, PacketId, ReasonCode}, PState)
             deliver({puback, PacketId, ReasonCode}, PState)
     end;
     end;
 
 
 process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
 process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
     case check_publish(Packet, PState) of
     case check_publish(Packet, PState) of
-        ok ->
-            do_publish(Packet, PState);
+        {ok, PState1} ->
+            do_publish(Packet, PState1);
         {error, ReasonCode} ->
         {error, ReasonCode} ->
             deliver({pubrec, PacketId, ReasonCode}, PState)
             deliver({pubrec, PacketId, ReasonCode}, PState)
     end;
     end;

+ 4 - 3
src/emqx_session.erl

@@ -310,7 +310,7 @@ close(SPid) ->
 
 
 init(#{zone        := Zone,
 init(#{zone        := Zone,
        client_id   := ClientId,
        client_id   := ClientId,
-       conn_pid    := ClientPid,
+       client_pid  := ClientPid,
        clean_start := CleanStart,
        clean_start := CleanStart,
        username    := Username,
        username    := Username,
        %% TODO:
        %% TODO:
@@ -469,7 +469,7 @@ handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
     end;
     end;
 
 
 %% PUBREL:
 %% PUBREL:
-handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
+handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) ->
     {noreply,
     {noreply,
      case maps:take(PacketId, AwaitingRel) of
      case maps:take(PacketId, AwaitingRel) of
          {Msg, AwaitingRel1} ->
          {Msg, AwaitingRel1} ->
@@ -503,7 +503,7 @@ handle_cast({resume, ClientPid},
                            await_rel_timer = AwaitTimer,
                            await_rel_timer = AwaitTimer,
                            expiry_timer    = ExpireTimer}) ->
                            expiry_timer    = ExpireTimer}) ->
 
 
-    ?LOG(info, "Resumed by ~p", [ClientPid], State),
+    ?LOG(info, "Resumed by ~p ", [ClientPid], State),
 
 
     %% Cancel Timers
     %% Cancel Timers
     lists:foreach(fun emqx_misc:cancel_timer/1,
     lists:foreach(fun emqx_misc:cancel_timer/1,
@@ -649,6 +649,7 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
             State;
             State;
         false ->
         false ->
             Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
             Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
+            io:format("!!! Retry Delivery: ~p~n", [Msgs]),
             retry_delivery(Force, Msgs, os:timestamp(), State)
             retry_delivery(Force, Msgs, os:timestamp(), State)
     end.
     end.
 
 

+ 2 - 2
src/emqx_trie.erl

@@ -118,8 +118,8 @@ add_path({Node, Word, Child}) ->
 
 
 %% @private
 %% @private
 %% @doc Match node with word or '+'.
 %% @doc Match node with word or '+'.
-match_node(root, [<<"$SYS">>|Words]) ->
-    match_node(<<"$SYS">>, Words, []);
+match_node(root, [NodeId = <<$$, _/binary>>|Words]) ->
+    match_node(NodeId, Words, []);
 
 
 match_node(NodeId, Words) ->
 match_node(NodeId, Words) ->
     match_node(NodeId, Words, []).
     match_node(NodeId, Words, []).