Bläddra i källkod

feature(mqtt): support response information (#3533)

tigercl 5 år sedan
förälder
incheckning
1b6a586948
5 ändrade filer med 70 tillägg och 3 borttagningar
  1. 15 0
      etc/emqx.conf
  2. 12 0
      priv/emqx.schema
  3. 11 0
      src/emqx_channel.erl
  4. 6 0
      src/emqx_zone.erl
  5. 26 3
      test/emqx_channel_SUITE.erl

+ 15 - 0
etc/emqx.conf

@@ -663,6 +663,11 @@ mqtt.ignore_loop_deliver = false
 ## Value: true | false
 mqtt.strict_mode = false
 
+## Specify the response information returned to the client
+## 
+## Value: String
+## mqtt.response_information = example
+
 ##--------------------------------------------------------------------
 ## Zones
 ##--------------------------------------------------------------------
@@ -868,6 +873,11 @@ zone.external.ignore_loop_deliver = false
 ## Value: true | false
 zone.external.strict_mode = false
 
+## Specify the response information returned to the client
+## 
+## Value: String
+## zone.external.response_information = example
+
 ##--------------------------------------------------------------------
 ## Internal Zone
 
@@ -954,6 +964,11 @@ zone.internal.ignore_loop_deliver = false
 ## Value: true | false
 zone.internal.strict_mode = false
 
+## Specify the response information returned to the client
+## 
+## Value: String
+## zone.internal.response_information = example
+
 ## Allow the zone's clients to bypass authentication step
 ##
 ## Value: true | false

+ 12 - 0
priv/emqx.schema

@@ -805,6 +805,11 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+%% @doc Specify the response information returned to the client
+{mapping, "mqtt.response_information", "emqx.response_information", [
+  {datatype, string}
+]}.
+
 %%--------------------------------------------------------------------
 %% Zones
 %%--------------------------------------------------------------------
@@ -1019,6 +1024,11 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+%% @doc Specify the response information returned to the client
+{mapping, "zone.$name.response_information", "emqx.zones", [
+  {datatype, string}
+]}.
+
 %% @doc Whether to bypass the authentication step
 {mapping, "zone.$name.bypass_auth_plugins", "emqx.zones", [
   {default, false},
@@ -1079,6 +1089,8 @@ end}.
                     end;
                ("mountpoint", Val) ->	
                    {mountpoint, iolist_to_binary(Val)};
+               ("response_information", Val) ->
+                   {response_information, iolist_to_binary(Val)};
                (Opt, Val) ->
                     {list_to_atom(Opt), Val}
             end,

+ 11 - 0
src/emqx_channel.erl

@@ -676,6 +676,7 @@ not_nacked({deliver, _Topic, Msg}) ->
 handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
     AckProps = run_fold([fun enrich_connack_caps/2,
                          fun enrich_server_keepalive/2,
+                         fun enrich_response_information/2,
                          fun enrich_assigned_clientid/2
                         ], Props, Channel),
     NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
@@ -1393,6 +1394,16 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
         Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
     end.
 
+%%--------------------------------------------------------------------
+%% Enrich response information
+
+enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps},
+                                               clientinfo = #{zone := Zone}}) ->
+    case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of
+        0 -> AckProps;
+        1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)}
+    end.
+
 %%--------------------------------------------------------------------
 %% Enrich Assigned ClientId
 

+ 6 - 0
src/emqx_zone.erl

@@ -45,6 +45,7 @@
           , session_expiry_interval/1
           , force_gc_policy/1
           , force_shutdown_policy/1
+          , response_information/1
           , get_env/2
           , get_env/3
           ]}).
@@ -72,6 +73,7 @@
         , session_expiry_interval/1
         , force_gc_policy/1
         , force_shutdown_policy/1
+        , response_information/1
         ]).
 
 -export([ init_gc_state/1
@@ -204,6 +206,10 @@ force_gc_policy(Zone) ->
 force_shutdown_policy(Zone) ->
     get_env(Zone, force_shutdown_policy).
 
+-spec(response_information(zone()) -> string()).
+response_information(Zone) ->
+    get_env(Zone, response_information).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------

+ 26 - 3
test/emqx_channel_SUITE.erl

@@ -58,7 +58,8 @@ end_per_suite(_Config) ->
                  emqx_session,
                  emqx_broker,
                  emqx_hooks,
-                 emqx_cm
+                 emqx_cm,
+                 emqx_zone
                 ]).
 
 init_per_testcase(_TestCase, Config) ->
@@ -394,6 +395,27 @@ t_handle_out_connack_sucess(_) ->
         emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()),
     ?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
 
+t_handle_out_connack_response_information(_) ->
+    ok = meck:expect(emqx_cm, open_session,
+                     fun(true, _ClientInfo, _ConnInfo) ->
+                             {ok, #{session => session(), present => false}}
+                     end),
+    ok = meck:expect(emqx_zone, response_information, fun(_) -> test end),
+    IdleChannel = channel(#{conn_state => idle}),
+    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], _} =
+        emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 1})), IdleChannel).
+
+t_handle_out_connack_not_response_information(_) ->
+    ok = meck:expect(emqx_cm, open_session,
+                     fun(true, _ClientInfo, _ConnInfo) ->
+                             {ok, #{session => session(), present => false}}
+                     end),
+    ok = meck:expect(emqx_zone, response_information, fun(_) -> test end),
+    IdleChannel = channel(#{conn_state => idle}),
+    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
+        emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 0})), IdleChannel),
+    ?assertEqual(false, maps:is_key('Response-Information', AckProps)).
+
 t_handle_out_connack_failure(_) ->
     {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} =
         emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, channel()).
@@ -639,14 +661,15 @@ clientinfo(InitProps) ->
 topic_filters() ->
     [{<<"+">>, ?DEFAULT_SUBOPTS}, {<<"#">>, ?DEFAULT_SUBOPTS}].
 
-connpkt() ->
+connpkt() -> connpkt(#{}).
+connpkt(Props) ->
     #mqtt_packet_connect{
        proto_name  = <<"MQTT">>,
        proto_ver   = ?MQTT_PROTO_V4,
        is_bridge   = false,
        clean_start = true,
        keepalive   = 30,
-       properties  = #{},
+       properties  = Props,
        clientid    = <<"clientid">>,
        username    = <<"username">>,
        password    = <<"passwd">>