Просмотр исходного кода

Improve 'client.connect', 'client.connack' hooks (#3153)

JianBo He 6 лет назад
Родитель
Сommit
7d3a08dc13
4 измененных файлов с 57 добавлено и 9 удалено
  1. 11 9
      src/emqx_channel.erl
  2. 5 0
      src/emqx_mqtt_props.erl
  3. 31 0
      src/emqx_packet.erl
  4. 10 0
      test/emqx_packet_SUITE.erl

+ 11 - 9
src/emqx_channel.erl

@@ -558,19 +558,20 @@ handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = Co
                          fun enrich_server_keepalive/2,
                          fun enrich_assigned_clientid/2
                         ], #{}, Channel),
-    AckPacket = run_hooks('client.connack', [ConnInfo],
-                          ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)),
-    return_connack(AckPacket,
-                   ensure_keepalive(AckProps,
+    NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
+
+    return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
+                   ensure_keepalive(NAckProps,
                                     ensure_connected(ConnPkt, Channel)));
 
 handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
+    Reason = emqx_reason_codes:name(ReasonCode),
+    AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
     AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
                                     ?MQTT_PROTO_V5 -> ReasonCode;
                                     _ -> emqx_reason_codes:compat(connack, ReasonCode)
-                                end),
-    AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket),
-    shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel);
+                                end, sp(false), AckProps),
+    shutdown(Reason, AckPacket, Channel);
 
 %% Optimize?
 handle_out(publish, [], Channel) ->
@@ -944,9 +945,10 @@ receive_maximum(#{zone := Zone}, ConnProps) ->
 %% Run Connect Hooks
 
 run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
-    case run_hooks('client.connect', [ConnInfo], ConnPkt) of
+    ConnProps = emqx_packet:info(properties, ConnPkt),
+    case run_hooks('client.connect', [ConnInfo], ConnProps) of
         Error = {error, _Reason} -> Error;
-        NConnPkt -> {ok, NConnPkt, Channel}
+        NConnProps -> {ok, emqx_packet:set_props(NConnProps, ConnPkt), Channel}
     end.
 
 %%--------------------------------------------------------------------

+ 5 - 0
src/emqx_mqtt_props.erl

@@ -23,6 +23,7 @@
         , name/1
         , filter/2
         , validate/1
+        , new/0
         ]).
 
 %% For tests
@@ -180,6 +181,10 @@ validate_value('Binary-Data', Val) ->
     is_binary(Val);
 validate_value(_Type, _Val) -> false.
 
+-spec(new() -> map()).
+new() ->
+    #{}.
+
 -spec(all() -> map()).
 all() -> ?PROPS_TABLE.
 

+ 31 - 0
src/emqx_packet.erl

@@ -31,6 +31,7 @@
 -export([ proto_name/1
         , proto_ver/1
         , info/2
+        , set_props/2
         ]).
 
 %% Check API
@@ -191,6 +192,36 @@ info(reason_code, #mqtt_packet_auth{reason_code = RC}) ->
 info(properties, #mqtt_packet_auth{properties = Props}) ->
     Props.
 
+set_props(Props, #mqtt_packet_connect{} = Pkt) ->
+    Pkt#mqtt_packet_connect{properties = Props};
+
+set_props(Props, #mqtt_packet_connack{} = Pkt) ->
+    Pkt#mqtt_packet_connack{properties = Props};
+
+set_props(Props, #mqtt_packet_publish{} = Pkt) ->
+    Pkt#mqtt_packet_publish{properties = Props};
+
+set_props(Props, #mqtt_packet_puback{} = Pkt) ->
+    Pkt#mqtt_packet_puback{properties = Props};
+
+set_props(Props, #mqtt_packet_subscribe{} = Pkt) ->
+    Pkt#mqtt_packet_subscribe{properties = Props};
+
+set_props(Props, #mqtt_packet_suback{} = Pkt) ->
+    Pkt#mqtt_packet_suback{properties = Props};
+
+set_props(Props, #mqtt_packet_unsubscribe{} = Pkt) ->
+    Pkt#mqtt_packet_unsubscribe{properties = Props};
+
+set_props(Props, #mqtt_packet_unsuback{} = Pkt) ->
+    Pkt#mqtt_packet_unsuback{properties = Props};
+
+set_props(Props, #mqtt_packet_disconnect{} = Pkt) ->
+    Pkt#mqtt_packet_disconnect{properties = Props};
+
+set_props(Props, #mqtt_packet_auth{} = Pkt) ->
+    Pkt#mqtt_packet_auth{properties = Props}.
+
 %%--------------------------------------------------------------------
 %% Check MQTT Packet
 %%--------------------------------------------------------------------

+ 10 - 0
test/emqx_packet_SUITE.erl

@@ -157,6 +157,16 @@ t_auth_info(_) ->
     ?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)),
     ?assertEqual(undefined, emqx_packet:info(properties, AuthPkt)).
 
+t_set_props(_) ->
+    Pkts = [#mqtt_packet_connect{}, #mqtt_packet_connack{}, #mqtt_packet_publish{},
+            #mqtt_packet_puback{}, #mqtt_packet_subscribe{}, #mqtt_packet_suback{},
+            #mqtt_packet_unsubscribe{}, #mqtt_packet_unsuback{},
+            #mqtt_packet_disconnect{}, #mqtt_packet_auth{}],
+    Props = #{'A-Fake-Props' => true},
+    lists:foreach(fun(Pkt) ->
+        ?assertEqual(Props, emqx_packet:info(properties, emqx_packet:set_props(Props, Pkt)))
+    end, Pkts).
+
 t_check_publish(_) ->
     Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
     ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),