Bläddra i källkod

Merge pull request #11791 from thalesmg/fix-coap-heartbeat-m-20231018

fix(coap): increase received packet counter for keepalive
Thales Macedo Garitezi 2 år sedan
förälder
incheckning
eda82caf65

+ 13 - 5
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -86,7 +86,6 @@
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
 
 
 -define(DEF_IDLE_TIME, timer:seconds(30)).
 -define(DEF_IDLE_TIME, timer:seconds(30)).
--define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)).
 
 
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 
 
@@ -150,8 +149,7 @@ init(
             mountpoint => Mountpoint
             mountpoint => Mountpoint
         }
         }
     ),
     ),
-    %% FIXME: it should coap.hearbeat instead of idle_timeout?
-    Heartbeat = ?GET_IDLE_TIME(Config),
+    Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME),
     #channel{
     #channel{
         ctx = Ctx,
         ctx = Ctx,
         conninfo = ConnInfo,
         conninfo = ConnInfo,
@@ -179,8 +177,8 @@ send_request(Channel, Request) ->
     | {ok, replies(), channel()}
     | {ok, replies(), channel()}
     | {shutdown, Reason :: term(), channel()}
     | {shutdown, Reason :: term(), channel()}
     | {shutdown, Reason :: term(), replies(), channel()}.
     | {shutdown, Reason :: term(), replies(), channel()}.
-handle_in(Msg, ChannleT) ->
-    Channel = ensure_keepalive_timer(ChannleT),
+handle_in(Msg, Channel0) ->
+    Channel = ensure_keepalive_timer(Channel0),
     case emqx_coap_message:is_request(Msg) of
     case emqx_coap_message:is_request(Msg) of
         true ->
         true ->
             check_auth_state(Msg, Channel);
             check_auth_state(Msg, Channel);
@@ -321,6 +319,9 @@ handle_call(Req, _From, Channel) ->
 handle_cast(close, Channel) ->
 handle_cast(close, Channel) ->
     ?SLOG(info, #{msg => "close_connection"}),
     ?SLOG(info, #{msg => "close_connection"}),
     shutdown(normal, Channel);
     shutdown(normal, Channel);
+handle_cast(inc_recv_pkt, Channel) ->
+    _ = emqx_pd:inc_counter(recv_pkt, 1),
+    {ok, Channel};
 handle_cast(Req, Channel) ->
 handle_cast(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
     ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
     {ok, Channel}.
     {ok, Channel}.
@@ -455,6 +456,13 @@ check_token(
                     Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
                     Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
                     {shutdown, normal, Reply, Channel};
                     {shutdown, normal, Reply, Channel};
                 true ->
                 true ->
+                    %% hack: since each message request can spawn a new connection
+                    %% process, we can't rely on the `inc_incoming_stats' call in
+                    %% `emqx_gateway_conn:handle_incoming' to properly keep track of
+                    %% bumping incoming requests for an existing channel.  Since this
+                    %% number is used by keepalive, we have to bump it inside the
+                    %% requested channel/connection pid so heartbeats actually work.
+                    emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt),
                     call_session(handle_request, Msg, Channel)
                     call_session(handle_request, Msg, Channel)
             end;
             end;
         _ ->
         _ ->

+ 67 - 6
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -83,10 +83,26 @@ init_per_testcase(t_connection_with_authn_failed, Config) ->
         fun(_) -> {error, bad_username_or_password} end
         fun(_) -> {error, bad_username_or_password} end
     ),
     ),
     Config;
     Config;
+init_per_testcase(t_heartbeat, Config) ->
+    NewHeartbeat = 800,
+    OldConf = emqx:get_raw_config([gateway, coap]),
+    {ok, _} = emqx_gateway_conf:update_gateway(
+        coap,
+        OldConf#{<<"heartbeat">> => <<"800ms">>}
+    ),
+    [
+        {old_conf, OldConf},
+        {new_heartbeat, NewHeartbeat}
+        | Config
+    ];
 init_per_testcase(_, Config) ->
 init_per_testcase(_, Config) ->
     ok = meck:new(emqx_access_control, [passthrough]),
     ok = meck:new(emqx_access_control, [passthrough]),
     Config.
     Config.
 
 
+end_per_testcase(t_heartbeat, Config) ->
+    OldConf = ?config(old_conf, Config),
+    {ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf),
+    ok;
 end_per_testcase(_, Config) ->
 end_per_testcase(_, Config) ->
     ok = meck:unload(emqx_access_control),
     ok = meck:unload(emqx_access_control),
     Config.
     Config.
@@ -123,13 +139,49 @@ t_connection(_) ->
         ),
         ),
 
 
         %% heartbeat
         %% heartbeat
-        HeartURI =
-            ?MQTT_PREFIX ++
-                "/connection?clientid=client1&token=" ++
-                Token,
+        {ok, changed, _} = send_heartbeat(Token),
 
 
-        ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
-        {ok, changed, _} = er_coap_client:request(put, HeartURI),
+        disconnection(Channel, Token),
+
+        timer:sleep(100),
+        ?assertEqual(
+            [],
+            emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
+        )
+    end,
+    do(Action).
+
+t_heartbeat(Config) ->
+    Heartbeat = ?config(new_heartbeat, Config),
+    Action = fun(Channel) ->
+        Token = connection(Channel),
+
+        timer:sleep(100),
+        ?assertNotEqual(
+            [],
+            emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
+        ),
+
+        %% must keep client connection alive
+        Delay = Heartbeat div 2,
+        lists:foreach(
+            fun(_) ->
+                ?assertMatch({ok, changed, _}, send_heartbeat(Token)),
+                timer:sleep(Delay)
+            end,
+            lists:seq(1, 5)
+        ),
+
+        ?assertNotEqual(
+            [],
+            emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
+        ),
+
+        timer:sleep(Heartbeat * 2),
+        ?assertEqual(
+            [],
+            emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
+        ),
 
 
         disconnection(Channel, Token),
         disconnection(Channel, Token),
 
 
@@ -491,6 +543,15 @@ t_connectionless_pubsub(_) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% helpers
 %% helpers
 
 
+send_heartbeat(Token) ->
+    HeartURI =
+        ?MQTT_PREFIX ++
+            "/connection?clientid=client1&token=" ++
+            Token,
+
+    ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
+    er_coap_client:request(put, HeartURI).
+
 connection(Channel) ->
 connection(Channel) ->
     URI =
     URI =
         ?MQTT_PREFIX ++
         ?MQTT_PREFIX ++

+ 1 - 0
changes/ce/fix-11791.en.md

@@ -0,0 +1 @@
+Fixed an issue that prevented heartbeats from correctly keeping the CoAP Gateway connections alive.