Browse Source

Merge pull request #10979 from HJianBo/fix-coap-connectionless-pub

fix(coap): fix crash publish/subscribe in connectionless mode
JianBo He 2 năm trước cách đây
mục cha
commit
2153cd4317

+ 2 - 0
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -381,6 +381,8 @@ ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
     Heartbeat = emqx_keepalive:info(interval, KeepAlive),
     Heartbeat = emqx_keepalive:info(interval, KeepAlive),
     Fun(keepalive, Heartbeat, keepalive, Channel).
     Fun(keepalive, Heartbeat, keepalive, Channel).
 
 
+check_auth_state(Msg, #channel{connection_required = false} = Channel) ->
+    call_session(handle_request, Msg, Channel);
 check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
 check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
     case is_create_connection_request(Msg) of
     case is_create_connection_request(Msg) of
         true ->
         true ->

+ 41 - 2
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -58,14 +58,14 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     application:load(emqx_gateway_coap),
     application:load(emqx_gateway_coap),
     ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
     ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
-    emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
     ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
     ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
     Config.
     Config.
 
 
 end_per_suite(_) ->
 end_per_suite(_) ->
     meck:unload(emqx_access_control),
     meck:unload(emqx_access_control),
     {ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
     {ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
-    emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn]).
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]).
 
 
 init_per_testcase(t_connection_with_authn_failed, Config) ->
 init_per_testcase(t_connection_with_authn_failed, Config) ->
     ok = meck:expect(
     ok = meck:expect(
@@ -92,6 +92,13 @@ mqtt_prefix() ->
 ps_prefix() ->
 ps_prefix() ->
     ?PS_PREFIX.
     ?PS_PREFIX.
 
 
+restart_coap_with_connection_mode(Bool) ->
+    Conf = emqx:get_raw_config([gateway, coap]),
+    emqx_gateway_conf:update_gateway(
+        coap,
+        Conf#{<<"connection_required">> => atom_to_binary(Bool)}
+    ).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Test Cases
 %% Test Cases
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -373,6 +380,35 @@ t_on_offline_event(_) ->
     end,
     end,
     do(Fun).
     do(Fun).
 
 
+t_connectionless_pubsub(_) ->
+    restart_coap_with_connection_mode(false),
+    Fun = fun(Channel) ->
+        Topic = <<"t/a">>,
+        Payload = <<"123">>,
+        URI = pubsub_uri(binary_to_list(Topic)),
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
+
+        %% ensure subscribe succeed
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
+
+        %% publish a message
+        Req2 = make_req(post, Payload),
+        {ok, changed, _} = do_request(Channel, URI, Req2),
+
+        {ok, content, Notify} = with_response(Channel),
+        ?LOGT("observer get Notif=~p", [Notify]),
+
+        #coap_content{payload = PayloadRecv} = Notify,
+
+        ?assertEqual(Payload, PayloadRecv)
+    end,
+    do(Fun),
+    restart_coap_with_connection_mode(true).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% helpers
 %% helpers
 
 
@@ -402,6 +438,9 @@ observe(Channel, Token, false) ->
     {ok, nocontent, _Data} = do_request(Channel, URI, Req),
     {ok, nocontent, _Data} = do_request(Channel, URI, Req),
     ok.
     ok.
 
 
+pubsub_uri(Topic) when is_list(Topic) ->
+    ?PS_PREFIX ++ "/" ++ Topic.
+
 pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
 pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
     ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.
     ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.