Просмотр исходного кода

test: add tests for publishing lwt when deny_action is disconnect

Thales Macedo Garitezi 3 лет назад
Родитель
Сommit
dca522d7d3
2 измененных файлов с 79 добавлено и 13 удалено
  1. 8 13
      apps/emqx/src/emqx_channel.erl
  2. 71 0
      apps/emqx_authz/test/emqx_authz_SUITE.erl

+ 8 - 13
apps/emqx/src/emqx_channel.erl

@@ -354,12 +354,14 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
         {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
         {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
             ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
             ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
             NChannel1 = NChannel#channel{
             NChannel1 = NChannel#channel{
-                will_msg = emqx_packet:will_msg(NConnPkt),
                 alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
                 alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
             },
             },
             case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
             case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
                 {ok, Properties, NChannel2} ->
                 {ok, Properties, NChannel2} ->
-                    process_connect(Properties, NChannel2);
+                    %% only store will_msg after successful authn
+                    %% fix for: https://github.com/emqx/emqx/issues/8886
+                    NChannel3 = NChannel2#channel{will_msg = emqx_packet:will_msg(NConnPkt)},
+                    process_connect(Properties, NChannel3);
                 {continue, Properties, NChannel2} ->
                 {continue, Properties, NChannel2} ->
                     handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
                     handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
                 {error, ReasonCode} ->
                 {error, ReasonCode} ->
@@ -1438,20 +1440,13 @@ terminate({shutdown, Reason}, Channel) when
 ->
 ->
     run_terminate_hook(Reason, Channel);
     run_terminate_hook(Reason, Channel);
 terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
 terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
-    should_publish_will_message(Reason, Channel) andalso publish_will_msg(WillMsg),
+    %% since will_msg is set to undefined as soon as it is published,
+    %% if will_msg still exists when the session is terminated, it
+    %% must be published immediately.
+    WillMsg =/= undefined andalso publish_will_msg(WillMsg),
     (Reason =:= expired) andalso persist_if_session(Channel),
     (Reason =:= expired) andalso persist_if_session(Channel),
     run_terminate_hook(Reason, Channel).
     run_terminate_hook(Reason, Channel).
 
 
-should_publish_will_message(TerminateReason, Channel) ->
-    not lists:member(TerminateReason, [
-        {shutdown, kicked},
-        {shutdown, discarded},
-        {shutdown, takenover},
-        {shutdown, not_authorized}
-    ]) andalso
-        not lists:member(info(conn_state, Channel), [idle, connecting]) andalso
-        info(will_msg, Channel) =/= undefined.
-
 persist_if_session(#channel{session = Session} = Channel) ->
 persist_if_session(#channel{session = Session} = Channel) ->
     case emqx_session:is_session(Session) of
     case emqx_session:is_session(Session) of
         true ->
         true ->

+ 71 - 0
apps/emqx_authz/test/emqx_authz_SUITE.erl

@@ -19,6 +19,8 @@
 -compile(export_all).
 -compile(export_all).
 
 
 -include("emqx_authz.hrl").
 -include("emqx_authz.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
@@ -61,10 +63,26 @@ end_per_suite(_Config) ->
     meck:unload(emqx_resource),
     meck:unload(emqx_resource),
     ok.
     ok.
 
 
+init_per_testcase(TestCase, Config) when
+    TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament;
+    TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament
+->
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, []),
+    {ok, _} = emqx:update_config([authorization, deny_action], disconnect),
+    Config;
 init_per_testcase(_, Config) ->
 init_per_testcase(_, Config) ->
     {ok, _} = emqx_authz:update(?CMD_REPLACE, []),
     {ok, _} = emqx_authz:update(?CMD_REPLACE, []),
     Config.
     Config.
 
 
+end_per_testcase(TestCase, _Config) when
+    TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament;
+    TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament
+->
+    {ok, _} = emqx:update_config([authorization, deny_action], ignore),
+    ok;
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
 set_special_configs(emqx_authz) ->
 set_special_configs(emqx_authz) ->
     {ok, _} = emqx:update_config([authorization, cache, enable], false),
     {ok, _} = emqx:update_config([authorization, cache, enable], false),
     {ok, _} = emqx:update_config([authorization, no_match], deny),
     {ok, _} = emqx:update_config([authorization, no_match], deny),
@@ -287,5 +305,58 @@ t_get_enabled_authzs_some_enabled(_Config) ->
     {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]),
     {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]),
     ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
     ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
 
 
+t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
+    {ok, C} = emqtt:start_link([
+        {will_topic, <<"lwt">>},
+        {will_payload, <<"should be published">>}
+    ]),
+    {ok, _} = emqtt:connect(C),
+    ok = emqx:subscribe(<<"lwt">>),
+    process_flag(trap_exit, true),
+
+    try
+        emqtt:subscribe(C, <<"unauthorized">>),
+        error(should_have_disconnected)
+    catch
+        exit:{{shutdown, tcp_closed}, _} ->
+            ok
+    end,
+
+    receive
+        {deliver, <<"lwt">>, #message{payload = <<"should be published">>}} ->
+            ok
+    after 2_000 ->
+        error(lwt_not_published)
+    end,
+
+    ok.
+
+t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
+    {ok, C} = emqtt:start_link([
+        {will_topic, <<"lwt">>},
+        {will_payload, <<"should be published">>}
+    ]),
+    {ok, _} = emqtt:connect(C),
+    ok = emqx:subscribe(<<"lwt">>),
+    process_flag(trap_exit, true),
+
+    %% disconnect is async
+    Ref = monitor(process, C),
+    emqtt:publish(C, <<"some/topic">>, <<"unauthorized">>),
+    receive
+        {'DOWN', Ref, process, C, _} ->
+            ok
+    after 1_000 ->
+        error(client_should_have_been_disconnected)
+    end,
+    receive
+        {deliver, <<"lwt">>, #message{payload = <<"should be published">>}} ->
+            ok
+    after 2_000 ->
+        error(lwt_not_published)
+    end,
+
+    ok.
+
 stop_apps(Apps) ->
 stop_apps(Apps) ->
     lists:foreach(fun application:stop/1, Apps).
     lists:foreach(fun application:stop/1, Apps).