Przeglądaj źródła

feat: set keepalive via http api (#6171)

* feat: set keepalive via http api

* fix: make cluster_rpc test case solider
zhongwencool 4 lat temu
rodzic
commit
33f5eec802

+ 11 - 0
apps/emqx/src/emqx_channel.erl

@@ -1003,6 +1003,17 @@ handle_call({quota, Policy}, Channel) ->
     Quota = emqx_limiter:init(Zone, Policy),
     reply(ok, Channel#channel{quota = Quota});
 
+handle_call({keepalive, Interval}, Channel = #channel{keepalive = KeepAlive,
+    conninfo = ConnInfo}) ->
+    ClientId = info(clientid, Channel),
+    NKeepalive = emqx_keepalive:set(interval, Interval * 1000, KeepAlive),
+    NConnInfo = maps:put(keepalive, Interval, ConnInfo),
+    NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
+    SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
+    ChanInfo1 = info(NChannel),
+    emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
+    reply(ok, reset_timer(alive_timer, NChannel));
+
 handle_call(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     reply(ignored, Channel).

+ 8 - 1
apps/emqx/src/emqx_keepalive.erl

@@ -20,8 +20,11 @@
         , info/1
         , info/2
         , check/2
+        , set/3
         ]).
 
+-elvis([{elvis_style, no_if_expression, disable}]).
+
 -export_type([keepalive/0]).
 
 -record(keepalive, {
@@ -49,7 +52,7 @@ info(#keepalive{interval = Interval,
       repeat   => Repeat
      }.
 
--spec(info(interval|statval|repeat, keepalive())
+-spec(info(interval | statval | repeat, keepalive())
       -> non_neg_integer()).
 info(interval, #keepalive{interval = Interval}) ->
     Interval;
@@ -71,3 +74,7 @@ check(NewVal, KeepAlive = #keepalive{statval = OldVal,
         true -> {error, timeout}
     end.
 
+%% @doc Update keepalive's interval
+-spec(set(interval, non_neg_integer(), keepalive()) -> keepalive()).
+set(interval, Interval, KeepAlive) ->
+    KeepAlive#keepalive{interval = Interval}.

+ 4 - 5
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -122,6 +122,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
     ets:new(test, [named_table, public]),
+    ets:insert(test, {other_mfa_result, failed}),
     ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
     {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
     {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
@@ -129,6 +130,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
     {atomic, [_Status|L]} = emqx_cluster_rpc:status(),
     ?assertEqual([], L),
+    ets:insert(test, {other_mfa_result, ok}),
     {ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
     ct:sleep(1000),
     {atomic, NewStatus} = emqx_cluster_rpc:status(),
@@ -239,12 +241,9 @@ failed_on_node_by_odd(Pid) ->
     end.
 
 failed_on_other_recover_after_retry(Pid) ->
-    Counter = ets:update_counter(test, counter, 1, {counter, 0}),
     case Pid =:= self() of
         true -> ok;
         false ->
-            case Counter < 4 of
-                true -> "MFA return not ok";
-                false -> ok
-            end
+            [{_, Res}] = ets:lookup(test, other_mfa_result),
+            Res
     end.

+ 13 - 0
apps/emqx_management/src/emqx_mgmt.erl

@@ -17,6 +17,8 @@
 -module(emqx_mgmt).
 
 -include("emqx_mgmt.hrl").
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+-elvis([{elvis_style, god_modules, disable}]).
 
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("emqx/include/emqx.hrl").
@@ -51,6 +53,7 @@
         , clean_authz_cache_all/1
         , set_ratelimit_policy/2
         , set_quota_policy/2
+        , set_keepalive/2
         ]).
 
 %% Internal funcs
@@ -149,6 +152,7 @@ node_info(Node) when Node =:= node() ->
           memory_used       => proplists:get_value(total, Memory),
           process_available => erlang:system_info(process_limit),
           process_used      => erlang:system_info(process_count),
+
           max_fds           => proplists:get_value(
                                  max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))),
           connections       => ets:info(emqx_channel, size),
@@ -235,6 +239,7 @@ nodes_info_count(PropList) ->
 %%--------------------------------------------------------------------
 
 lookup_client({clientid, ClientId}, FormatFun) ->
+
     lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun)
                   || Node <- mria_mnesia:running_nodes()]);
 
@@ -300,6 +305,7 @@ clean_authz_cache(ClientId) ->
     Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
     check_results(Results).
 
+
 clean_authz_cache(Node, ClientId) when Node =:= node() ->
     case emqx_cm:lookup_channels(ClientId) of
         [] ->
@@ -330,6 +336,9 @@ set_ratelimit_policy(ClientId, Policy) ->
 set_quota_policy(ClientId, Policy) ->
     call_client(ClientId, {quota, Policy}).
 
+set_keepalive(ClientId, Interval) ->
+    call_client(ClientId, {keepalive, Interval}).
+
 %% @private
 call_client(ClientId, Req) ->
     Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()],
@@ -373,6 +382,7 @@ list_subscriptions_via_topic(Topic, FormatFun) ->
     lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
                   || Node <- mria_mnesia:running_nodes()]).
 
+
 list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
     MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
     erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
@@ -502,6 +512,7 @@ listener_id_filter(Id, Listeners) ->
     Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
     lists:filter(Filter, Listeners).
 
+
 -spec manage_listener( Operation :: start_listener
                                   | stop_listener
                                   | restart_listener
@@ -576,6 +587,7 @@ add_duration_field([], _Now, Acc) ->
     Acc;
 add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) ->
     add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]);
+
 add_duration_field( [Alarm = #{ activated := false
                               , activate_at := ActivateAt
                               , deactivate_at := DeactivateAt} | Rest]
@@ -638,3 +650,4 @@ max_row_limit() ->
     ?MAX_ROW_LIMIT.
 
 table_size(Tab) -> ets:info(Tab, size).
+

+ 36 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -34,6 +34,7 @@
         , subscribe/2
         , unsubscribe/2
         , subscribe_batch/2
+        , set_keepalive/2
         ]).
 
 -export([ query/4
@@ -82,7 +83,9 @@ apis() ->
     , clients_authz_cache_api()
     , clients_subscriptions_api()
     , subscribe_api()
-    , unsubscribe_api()].
+    , unsubscribe_api()
+    , keepalive_api()
+    ].
 
 schemas() ->
     Client = #{
@@ -435,6 +438,27 @@ subscribe_api() ->
                 <<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)}}},
     {"/clients/:clientid/subscribe", Metadata, subscribe}.
 
+keepalive_api() ->
+    Metadata = #{
+        put => #{
+            description => <<"set the online client keepalive by second ">>,
+            parameters => [#{
+                name => clientid,
+                in => path,
+                schema => #{type => string},
+                required => true
+            },
+                #{
+                    name => interval,
+                    in => query,
+                    schema => #{type => integer},
+                    required => true
+                }
+                ],
+            responses => #{
+                <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
+                <<"200">> => emqx_mgmt_util:schema(<<"ok">>)}}},
+    {"/clients/:clientid/keepalive", Metadata, set_keepalive}.
 %%%==============================================================================================
 %% parameters trans
 clients(get, #{query_string := Qs}) ->
@@ -478,6 +502,17 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
     end, Subs0),
     {200, Subs}.
 
+set_keepalive(put, #{bindings := #{clientid := ClientID}, query_string := Query}) ->
+    case maps:find(<<"interval">>, Query) of
+        error -> {404, "Interval Not Found"};
+        {ok, Interval0} ->
+            Interval = binary_to_integer(Interval0),
+            case emqx_mgmt:set_keepalive(emqx_mgmt_util:urldecode(ClientID), Interval) of
+                ok -> {200};
+                {error, not_found} ->{404, ?CLIENT_ID_NOT_FOUND}
+            end
+    end.
+
 %%%==============================================================================================
 %% api apply
 

+ 35 - 8
apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl

@@ -77,22 +77,27 @@ t_clients(_) ->
     ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse2),
 
     %% get /clients/:clientid/authz_cache should has no authz cache
-    Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "authz_cache"]),
+    Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path(["clients",
+        binary_to_list(ClientId1), "authz_cache"]),
     {ok, Client1AuthzCache} = emqx_mgmt_api_test_util:request_api(get, Client1AuthzCachePath),
     ?assertEqual("[]", Client1AuthzCache),
 
     %% post /clients/:clientid/subscribe
     SubscribeBody = #{topic => Topic, qos => Qos},
-    SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]),
-    {ok, _} =  emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody),
+    SubscribePath = emqx_mgmt_api_test_util:api_path(["clients",
+        binary_to_list(ClientId1), "subscribe"]),
+    {ok, _} =  emqx_mgmt_api_test_util:request_api(post, SubscribePath,
+        "", AuthHeader, SubscribeBody),
     timer:sleep(100),
     [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
     ?assertEqual(AfterSubTopic, Topic),
     ?assertEqual(AfterSubQos, Qos),
 
     %% post /clients/:clientid/unsubscribe
-    UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "unsubscribe"]),
-    {ok, _} =  emqx_mgmt_api_test_util:request_api(post, UnSubscribePath, "", AuthHeader, SubscribeBody),
+    UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients",
+        binary_to_list(ClientId1), "unsubscribe"]),
+    {ok, _} =  emqx_mgmt_api_test_util:request_api(post, UnSubscribePath,
+        "", AuthHeader, SubscribeBody),
     timer:sleep(100),
     ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)),
 
@@ -123,7 +128,8 @@ t_query_clients_with_time(_) ->
     %% get /clients with time(rfc3339)
     NowTimeStampInt = erlang:system_time(millisecond),
     %% Do not uri_encode `=` to `%3D`
-    Rfc3339String   = emqx_http_lib:uri_encode(binary:bin_to_list(emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
+    Rfc3339String   = emqx_http_lib:uri_encode(binary:bin_to_list(
+        emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
     TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),
 
     LteKeys         = ["lte_created_at=", "lte_connected_at="],
@@ -133,8 +139,10 @@ t_query_clients_with_time(_) ->
     GteParamRfc3339 = [Param ++ Rfc3339String   || Param <- GteKeys],
     GteParamStamp   = [Param ++ TimeStampString || Param <- GteKeys],
 
-    RequestResults  = [emqx_mgmt_api_test_util:request_api(get, ClientsPath, Param, AuthHeader)
-                       || Param <- LteParamRfc3339 ++ LteParamStamp ++ GteParamRfc3339 ++ GteParamStamp],
+    RequestResults  =
+        [emqx_mgmt_api_test_util:request_api(get, ClientsPath, Param, AuthHeader)
+                       || Param <- LteParamRfc3339 ++ LteParamStamp
+            ++ GteParamRfc3339 ++ GteParamStamp],
     DecodedResults  = [emqx_json:decode(Response, [return_maps])
                        || {ok, Response} <- RequestResults],
     {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults),
@@ -153,3 +161,22 @@ t_query_clients_with_time(_) ->
     Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]),
     {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path),
     {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path).
+
+t_keepalive(_Config) ->
+    Username = "user_keepalive",
+    ClientId = "client_keepalive",
+    AuthHeader      = emqx_mgmt_api_test_util:auth_header_(),
+    Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "keepalive"]),
+    Query = "interval=11",
+    {error,{"HTTP/1.1",404,"Not Found"}} =
+        emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>),
+    {ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}),
+    {ok, _} = emqtt:connect(C1),
+    {ok, Ok} = emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>),
+    ?assertEqual("", Ok),
+    [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
+    State = sys:get_state(Pid),
+    ct:pal("~p~n", [State]),
+    ?assertEqual(11000, element(2, element(5, element(11, State)))),
+    emqtt:disconnect(C1),
+    ok.