Przeglądaj źródła

fix(coap): trigger disconnect logic when the client manually disconnected

firest 3 lat temu
rodzic
commit
ab51d8ab8e

+ 2 - 1
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -638,7 +638,8 @@ process_connection({open, Req}, Result,
          Channel);
          Channel);
 process_connection({close, Msg}, _, Channel, _) ->
 process_connection({close, Msg}, _, Channel, _) ->
     Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
     Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
-    {shutdown, close, Reply, Channel}.
+    NChannel = ensure_disconnected(close_connection, Channel),
+    {shutdown, close, Reply, NChannel}.
 
 
 process_subscribe({Sub, Msg}, Result, #channel{session = Session} = Channel, Iter) ->
 process_subscribe({Sub, Msg}, Result, #channel{session = Session} = Channel, Iter) ->
     Result2 = emqx_coap_session:process_subscribe(Sub, Msg, Result, Session),
     Result2 = emqx_coap_session:process_subscribe(Sub, Msg, Result, Session),

+ 34 - 1
apps/emqx_gateway/test/emqx_coap_SUITE.erl

@@ -292,12 +292,37 @@ t_clients_get_subscription_api(_) ->
           end,
           end,
     with_connection(Fun).
     with_connection(Fun).
 
 
+t_on_offline_event(_) ->
+    Fun = fun(Channel) ->
+                  application:start(emqx_modules),
+                  emqx_event_message_SUITE:load_config(),
+                  timer:sleep(100),
+
+                  emqx_event_message:enable(),
+                  timer:sleep(200),
+
+                  emqx_broker:subscribe(<<"$event/#">>),
+
+                  Token = connection(Channel),
+                  ?assertMatch(#message{topic = <<"$event/client_connected">>}, receive_deliver(500)),
+
+                  disconnection(Channel, Token),
+
+                  ?assertMatch(#message{topic = <<"$event/client_disconnected">>}, receive_deliver(500)),
+
+                  emqx_broker:unsubscribe(<<"$event/#">>),
+                  emqx_event_message:disable(),
+                  application:stop(emqx_modules),
+                  timer:sleep(200)
+          end,
+    do(Fun).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% helpers
 %% helpers
 
 
 connection(Channel) ->
 connection(Channel) ->
     URI = ?MQTT_PREFIX ++
     URI = ?MQTT_PREFIX ++
-          "/connection?clientid=client1&username=admin&password=public",
+        "/connection?clientid=client1&username=admin&password=public",
     Req = make_req(post),
     Req = make_req(post),
     {ok, created, Data} = do_request(Channel, URI, Req),
     {ok, created, Data} = do_request(Channel, URI, Req),
     #coap_content{payload = BinToken} = Data,
     #coap_content{payload = BinToken} = Data,
@@ -378,3 +403,11 @@ with_connection(Action) ->
         timer:sleep(100)
         timer:sleep(100)
     end,
     end,
     do(Fun).
     do(Fun).
+
+receive_deliver(Wait) ->
+    receive
+        {deliver, _, Msg} ->
+            Msg
+    after Wait ->
+            {error, timeout}
+    end.

+ 4 - 1
apps/emqx_modules/test/emqx_event_message_SUITE.erl

@@ -42,9 +42,12 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:start_apps([emqx_modules]),
     emqx_common_test_helpers:start_apps([emqx_modules]),
-    ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE),
+    load_config(),
     Config.
     Config.
 
 
+load_config() ->
+    ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE).
+
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([emqx_modules]).
     emqx_common_test_helpers:stop_apps([emqx_modules]).