Selaa lähdekoodia

test: more code coverage for emqx_gateway_api_clients

JianBo He 4 vuotta sitten
vanhempi
commit
e9e559ccd0

+ 1 - 0
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -247,6 +247,7 @@ handle_call({subscribe, Topic, SubOpts}, _From,
     %% modifty session state
     SubReq = {Topic, Token},
     TempMsg = #coap_message{type = non},
+    %% FIXME: The subopts is not used for emqx_coap_session
     Result  = emqx_coap_session:process_subscribe(
                 SubReq, TempMsg, #{}, Session),
     NSession = maps:get(session, Result),

+ 3 - 1
apps/emqx_gateway/src/coap/emqx_coap_session.erl

@@ -92,7 +92,9 @@ info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
 info(subscriptions, #session{observe_manager = OM}) ->
     Topics = emqx_coap_observe_res:subscriptions(OM),
-    lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics);
+    lists:foldl(
+      fun(T, Acc) -> Acc#{T => emqx_gateway_utils:default_subopts()} end,
+      #{}, Topics);
 info(subscriptions_cnt, #session{observe_manager = OM}) ->
     erlang:length(emqx_coap_observe_res:subscriptions(OM));
 info(subscriptions_max, _) ->

+ 16 - 26
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -73,7 +73,7 @@ paths() ->
     , {<<"ip_address">>, ip}
     , {<<"conn_state">>, atom}
     , {<<"clean_start">>, atom}
-    , {<<"proto_ver">>, integer}
+    , {<<"proto_ver">>, binary}
     , {<<"like_clientid">>, binary}
     , {<<"like_username">>, binary}
     , {<<"gte_created_at">>, timestamp}
@@ -83,15 +83,16 @@ paths() ->
     %% special keys for lwm2m protocol
     , {<<"endpoint_name">>, binary}
     , {<<"like_endpoint_name">>, binary}
-    , {<<"gte_lifetime">>, timestamp}
-    , {<<"lte_lifetime">>, timestamp}
+    , {<<"gte_lifetime">>, integer}
+    , {<<"lte_lifetime">>, integer}
     ]).
 
 -define(QUERY_FUN, {?MODULE, query}).
 
 clients(get, #{ bindings := #{name := Name0}
-              , query_string := Params
+              , query_string := Params0
               }) ->
+    Params = emqx_mgmt_api:ensure_timestamp_format(Params0, time_keys()),
     with_gateway(Name0, fun(GwName, _) ->
         TabName = emqx_gateway_cm:tabname(info, GwName),
         case maps:get(<<"node">>, Params, undefined) of
@@ -147,10 +148,6 @@ subscriptions(get, #{ bindings := #{name := Name0,
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
     with_gateway(Name0, fun(GwName, _) ->
         case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
-            {error, nosupport} ->
-                return_http_error(405, <<"Not support to list subscriptions">>);
-            {error, noimpl} ->
-                return_http_error(501, <<"Not implemented now">>);
             {error, Reason} ->
                 return_http_error(500, Reason);
             {ok, Subs} ->
@@ -171,14 +168,6 @@ subscriptions(post, #{ bindings := #{name := Name0,
             {Topic, SubOpts} ->
                 case emqx_gateway_http:client_subscribe(
                        GwName, ClientId, Topic, SubOpts) of
-                    {error, nosupport} ->
-                        return_http_error(
-                          405,
-                          <<"Not support to add a subscription">>);
-                    {error, noimpl} ->
-                        return_http_error(
-                          501,
-                          <<"Not implemented now">>);
                     {error, Reason} ->
                         return_http_error(404, Reason);
                     {ok, {NTopic, NSubOpts}}->
@@ -221,6 +210,16 @@ extra_sub_props(Props) ->
       #{subid => maps:get(<<"subid">>, Props, undefined)}
      ).
 
+%%--------------------------------------------------------------------
+%% QueryString data-fomrat convert
+%%  (try rfc3339 to timestamp or keep timestamp)
+
+time_keys() ->
+    [ <<"gte_created_at">>
+    , <<"lte_created_at">>
+    , <<"gte_connected_at">>
+    , <<"lte_connected_at">>].
+
 %%--------------------------------------------------------------------
 %% query funcs
 
@@ -264,10 +263,8 @@ ms(clientid, X) ->
     #{clientinfo => #{clientid => X}};
 ms(username, X) ->
     #{clientinfo => #{username => X}};
-ms(zone, X) ->
-    #{clientinfo => #{zone => X}};
 ms(ip_address, X) ->
-    #{clientinfo => #{peername => {X, '_'}}};
+    #{clientinfo => #{peerhost => X}};
 ms(conn_state, X) ->
     #{conn_state => X};
 ms(clean_start, X) ->
@@ -616,9 +613,6 @@ roots() ->
     , subscription
     ].
 
-fields(test) ->
-    [{key, mk(binary(), #{ desc => <<"Desc">>})}];
-
 fields(stomp_client) ->
     common_client_props();
 fields(mqttsn_client) ->
@@ -707,10 +701,6 @@ common_client_props() ->
     %, {will_msg,
     %   mk(binary(),
     %      #{ desc => <<"Client will message">>})}
-    %, {zone,
-    %   mk(binary(),
-    %      #{ desc => <<"Indicate the configuration group used by the "
-    %                   "client">>})}
     , {keepalive,
        mk(integer(),
           #{ desc => <<"keepalive time, with the unit of second">>})}

+ 1 - 1
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -451,7 +451,7 @@ do_subscribe(TopicFilter, SubOpts, Channel =
                       subscriptions = Subs}) ->
     %% Mountpoint first
     NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
-    NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
+    NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
     SubId = maps:get(clientid, ClientInfo, undefined),
     %% XXX: is_new?
     IsNew = not maps:is_key(NTopicFilter, Subs),

+ 1 - 1
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -931,7 +931,7 @@ do_subscribe({TopicId, TopicName, SubOpts},
                           clientinfo = ClientInfo
                                      = #{mountpoint := Mountpoint}}) ->
     NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
-    NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
+    NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
     case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
         {ok, NSession} ->
             {ok, {TopicId, NTopicName, NSubOpts},

+ 182 - 121
apps/emqx_gateway/test/emqx_coap_SUITE.erl

@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -include_lib("er_coap_client/include/coap.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -48,114 +53,115 @@ gateway.coap
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
+    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
     Config.
 
-set_special_cfg(emqx_gateway) ->
-    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT);
-
-set_special_cfg(_) ->
-    ok.
-
-end_per_suite(Config) ->
+end_per_suite(_) ->
     {ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]),
-    emqx_common_test_helpers:stop_apps([emqx_gateway]),
-    Config.
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
 
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------
-t_connection(_Config) ->
+
+t_connection(_) ->
     Action = fun(Channel) ->
-                     %% connection
-                     Token = connection(Channel),
+        %% connection
+        Token = connection(Channel),
 
-                     timer:sleep(100),
-                     ?assertNotEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
+        timer:sleep(100),
+        ?assertNotEqual(
+           [],
+           emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
 
-                     %% heartbeat
-                     HeartURI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token,
-                     ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
-                     {ok, changed, _} = er_coap_client:request(put, HeartURI),
+        %% heartbeat
+        HeartURI = ?MQTT_PREFIX ++
+                   "/connection?clientid=client1&token=" ++
+                   Token,
 
-                     disconnection(Channel, Token),
+        ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
+        {ok, changed, _} = er_coap_client:request(put, HeartURI),
 
-                     timer:sleep(100),
-                     ?assertEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
-             end,
-    do(Action).
+        disconnection(Channel, Token),
 
+        timer:sleep(100),
+        ?assertEqual(
+           [],
+           emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
+    end,
+    do(Action).
 
-t_publish(_Config) ->
+t_publish(_) ->
     Action = fun(Channel, Token) ->
-                     Topic = <<"/abc">>,
-                     Payload = <<"123">>,
-
-                     TopicStr = binary_to_list(Topic),
-                     URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-
-                     %% Sub topic first
-                     emqx:subscribe(Topic),
-
-                     Req = make_req(post, Payload),
-                     {ok, changed, _} = do_request(Channel, URI, Req),
-
-                     receive
-                         {deliver, Topic, Msg} ->
-                             ?assertEqual(Topic, Msg#message.topic),
-                             ?assertEqual(Payload, Msg#message.payload)
-                     after
-                         500 ->
-                             ?assert(false)
-                     end
-             end,
-
+        Topic = <<"/abc">>,
+        Payload = <<"123">>,
+
+        TopicStr = binary_to_list(Topic),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+
+        %% Sub topic first
+        emqx:subscribe(Topic),
+
+        Req = make_req(post, Payload),
+        {ok, changed, _} = do_request(Channel, URI, Req),
+
+        receive
+            {deliver, Topic, Msg} ->
+                ?assertEqual(Topic, Msg#message.topic),
+                ?assertEqual(Payload, Msg#message.payload)
+        after
+            500 ->
+                ?assert(false)
+        end
+    end,
     with_connection(Action).
 
-
-%t_publish_authz_deny(_Config) ->
+%t_publish_authz_deny(_) ->
 %    Action = fun(Channel, Token) ->
-%                     Topic = <<"/abc">>,
-%                     Payload = <<"123">>,
-%                     InvalidToken = lists:reverse(Token),
+%        Topic = <<"/abc">>,
+%        Payload = <<"123">>,
+%        InvalidToken = lists:reverse(Token),
 %
-%                     TopicStr = binary_to_list(Topic),
-%                     URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ InvalidToken,
+%        TopicStr = binary_to_list(Topic),
+%        URI = ?PS_PREFIX ++
+%              TopicStr ++
+%              "?clientid=client1&token=" ++ InvalidToken,
 %
-%                     %% Sub topic first
-%                     emqx:subscribe(Topic),
+%        %% Sub topic first
+%        emqx:subscribe(Topic),
 %
-%                     Req = make_req(post, Payload),
-%                     Result = do_request(Channel, URI, Req),
-%                     ?assertEqual({error, reset}, Result)
-%             end,
+%        Req = make_req(post, Payload),
+%        Result = do_request(Channel, URI, Req),
+%        ?assertEqual({error, reset}, Result)
+%    end,
 %
 %    with_connection(Action).
 
-t_subscribe(_Config) ->
+t_subscribe(_) ->
     Topic = <<"/abc">>,
     Fun = fun(Channel, Token) ->
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  %% Publish a message
-                  emqx:publish(emqx_message:make(Topic, Payload)),
-                  {ok, content, Notify} = with_response(Channel),
-                  ?LOGT("observer get Notif=~p", [Notify]),
+        %% Publish a message
+        emqx:publish(emqx_message:make(Topic, Payload)),
+        {ok, content, Notify} = with_response(Channel),
+        ?LOGT("observer get Notif=~p", [Notify]),
 
-                  #coap_content{payload = PayloadRecv} = Notify,
+        #coap_content{payload = PayloadRecv} = Notify,
 
-                  ?assertEqual(Payload, PayloadRecv)
-          end,
+        ?assertEqual(Payload, PayloadRecv)
+    end,
 
     with_connection(Fun),
     timer:sleep(100),
@@ -163,63 +169,117 @@ t_subscribe(_Config) ->
     ?assertEqual([], emqx:subscribers(Topic)).
 
 
-t_un_subscribe(_Config) ->
+t_un_subscribe(_) ->
     Topic = <<"/abc">>,
     Fun = fun(Channel, Token) ->
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  UnReq = make_req(get, Payload, [{observe, 1}]),
-                  {ok, nocontent, _} = do_request(Channel, URI, UnReq),
-                  ?LOGT("un observer topic:~ts~n", [Topic]),
-                  timer:sleep(100),
-                  ?assertEqual([], emqx:subscribers(Topic))
-          end,
+        UnReq = make_req(get, Payload, [{observe, 1}]),
+        {ok, nocontent, _} = do_request(Channel, URI, UnReq),
+        ?LOGT("un observer topic:~ts~n", [Topic]),
+        timer:sleep(100),
+        ?assertEqual([], emqx:subscribers(Topic))
+    end,
 
     with_connection(Fun).
 
-t_observe_wildcard(_Config) ->
+t_observe_wildcard(_) ->
     Fun = fun(Channel, Token) ->
-                  %% resolve_url can't process wildcard with #
-                  Topic = <<"/abc/+">>,
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        %% resolve_url can't process wildcard with #
+        Topic = <<"/abc/+">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  %% Publish a message
-                  PubTopic = <<"/abc/def">>,
-                  emqx:publish(emqx_message:make(PubTopic, Payload)),
-                  {ok, content, Notify} = with_response(Channel),
+        %% Publish a message
+        PubTopic = <<"/abc/def">>,
+        emqx:publish(emqx_message:make(PubTopic, Payload)),
+        {ok, content, Notify} = with_response(Channel),
 
-                  ?LOGT("observer get Notif=~p", [Notify]),
+        ?LOGT("observer get Notif=~p", [Notify]),
 
-                  #coap_content{payload = PayloadRecv} = Notify,
+        #coap_content{payload = PayloadRecv} = Notify,
 
-                  ?assertEqual(Payload, PayloadRecv)
-          end,
+        ?assertEqual(Payload, PayloadRecv)
+    end,
+
+    with_connection(Fun).
 
+t_clients_api(_) ->
+    Fun = fun(_Channel, _Token) ->
+        ClientId = <<"client1">>,
+        %% list
+        {200, #{data := [Client1]}} = request(get, "/gateway/coap/clients"),
+        #{clientid := ClientId} = Client1,
+        %% searching
+        {200, #{data := [Client2]}} =
+            request(get, "/gateway/coap/clients",
+                    [{<<"clientid">>, ClientId}]),
+        {200, #{data := [Client3]}} =
+            request(get, "/gateway/coap/clients",
+                    [{<<"like_clientid">>, <<"cli">>}]),
+        %% lookup
+        {200, Client4} =
+            request(get, "/gateway/coap/clients/client1"),
+        %% assert
+        Client1 = Client2 = Client3 = Client4,
+        %% kickout
+        {204, _} =
+            request(delete, "/gateway/coap/clients/client1"),
+        {200, #{data := []}} = request(get, "/gateway/coap/clients")
+    end,
     with_connection(Fun).
 
+t_clients_subscription_api(_) ->
+    Fun = fun(_Channel, _Token) ->
+        Path = "/gateway/coap/clients/client1/subscriptions",
+        %% list
+        {200, []} = request(get, Path),
+        %% create
+        SubReq = #{ topic => <<"tx">>
+                  , qos => 0
+                  , nl => 0
+                  , rap => 0
+                  , rh => 0
+                  },
+
+        {201, SubsResp} = request(post, Path, SubReq),
+        {200, [SubsResp2]} = request(get, Path),
+        ?assertEqual(
+           maps:get(topic, SubsResp),
+           maps:get(topic, SubsResp2)),
+
+        {204, _} = request(delete, Path ++ "/tx"),
+
+        {200, []} = request(get, Path)
+    end,
+    with_connection(Fun).
+
+%%--------------------------------------------------------------------
+%% helpers
+
 connection(Channel) ->
-    URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&username=admin&password=public",
+    URI = ?MQTT_PREFIX ++
+          "/connection?clientid=client1&username=admin&password=public",
     Req = make_req(post),
     {ok, created, Data} = do_request(Channel, URI, Req),
     #coap_content{payload = BinToken} = Data,
@@ -252,7 +312,8 @@ do_request(Channel, URI, #coap_message{options = Opts} = Req) ->
 
 with_response(Channel) ->
     receive
-        {coap_response, _ChId, Channel, _Ref, Message=#coap_message{method=Code}} ->
+        {coap_response, _ChId, Channel,
+         _Ref, Message=#coap_message{method=Code}} ->
             return_response(Code, Message);
         {coap_error, _ChId, Channel, _Ref, reset} ->
             {error, reset}
@@ -280,10 +341,10 @@ do(Fun) ->
 
 with_connection(Action) ->
     Fun = fun(Channel) ->
-                  Token = connection(Channel),
-                  timer:sleep(100),
-                  Action(Channel, Token),
-                  disconnection(Channel, Token),
-                  timer:sleep(100)
-          end,
+        Token = connection(Channel),
+        timer:sleep(100),
+        Action(Channel, Token),
+        disconnection(Channel, Token),
+        timer:sleep(100)
+    end,
     do(Fun).

+ 2 - 0
apps/emqx_gateway/test/emqx_gateway_test_utils.erl

@@ -117,6 +117,8 @@ req(Path, Qs) ->
 req(Path, Qs, Body) ->
     {url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}.
 
+url(Path, []) ->
+    lists:concat([?http_api_host, Path]);
 url(Path, Qs) ->
     lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]).
 

+ 78 - 3
apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl

@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -define(PORT, 5783).
 
 -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
@@ -66,9 +71,9 @@ all() ->
     , {group, test_grp_4_discover}
     , {group, test_grp_5_write_attr}
     , {group, test_grp_6_observe}
-
       %% {group, test_grp_8_object_19}
     , {group, test_grp_9_psm_queue_mode}
+    , {group, test_grp_10_rest_api}
     ].
 
 suite() -> [{timetrap, {seconds, 90}}].
@@ -147,21 +152,29 @@ groups() ->
       [
        case90_psm_mode,
        case90_queue_mode
+      ]},
+     {test_grp_10_rest_api, [RepeatOpt],
+      [
+       case100_clients_api,
+       case100_subscription_api
       ]}
     ].
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_conf]),
+    %% load application first for minirest api searching
+    application:load(emqx_gateway),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf]),
     Config.
 
 end_per_suite(Config) ->
     timer:sleep(300),
     {ok, _} = emqx_conf:remove([<<"gateway">>,<<"lwm2m">>], #{}),
-    emqx_common_test_helpers:stop_apps([emqx_conf]),
+    emqx_mgmt_api_test_util:end_suite([emqx_conf]),
     Config.
 
 init_per_testcase(_AllTestCase, Config) ->
     ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+
     {ok, _} = application:ensure_all_started(emqx_gateway),
     {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),
 
@@ -1887,6 +1900,68 @@ server_cache_mode(Config, RegOption) ->
     verify_read_response_1(2, UdpSock),
     verify_read_response_1(3, UdpSock).
 
+case100_clients_api(Config) ->
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    UdpSock = ?config(sock, Config),
+    ObjectList = <<"</1>, </2>, </3/0>, </4>, </5>">>,
+    RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
+    std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
+
+    %% list
+    {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"),
+    %% searching
+    {200, #{data := [Client2]}} =
+        request(get, "/gateway/lwm2m/clients",
+                [{<<"endpoint_name">>, list_to_binary(Epn)}]),
+    {200, #{data := [Client3]}} =
+        request(get, "/gateway/lwm2m/clients",
+                [{<<"like_endpoint_name">>, list_to_binary(Epn)},
+                 {<<"gte_lifetime">>, <<"1">>}
+                ]),
+    %% lookup
+    ClientId = maps:get(clientid, Client1),
+    {200, Client4} =
+        request(get, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    %% assert
+    Client1 = Client2 = Client3 = Client4,
+    %% kickout
+    {204, _} =
+        request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    {200, #{data := []}} = request(get, "/gateway/lwm2m/clients").
+
+case100_subscription_api(Config) ->
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    UdpSock = ?config(sock, Config),
+    ObjectList = <<"</1>, </2>, </3/0>, </4>, </5>">>,
+    RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
+    std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
+
+    {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"),
+    ClientId = maps:get(clientid, Client1),
+    Path = "/gateway/lwm2m/clients/" ++
+            binary_to_list(ClientId) ++
+            "/subscriptions",
+
+    %% list
+    {200, [InitSub]} = request(get, Path),
+    ?assertEqual(
+       <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/#">>,
+       maps:get(topic, InitSub)),
+
+    %% create
+    SubReq = #{ topic => <<"tx">>
+              , qos => 1
+              , nl => 0
+              , rap => 0
+              , rh => 0
+              },
+    {201, _} = request(post, Path, SubReq),
+    {200, _} = request(get, Path),
+    {204, _} = request(delete, Path ++ "/tx"),
+    {200, [InitSub]} = request(get, Path).
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %%% Internal Functions
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 69 - 3
apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl

@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -include("src/mqttsn/include/emqx_sn.hrl").
 
 -include_lib("eunit/include/eunit.hrl").
@@ -27,7 +32,6 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
-
 -define(HOST, {127,0,0,1}).
 -define(PORT, 1884).
 
@@ -85,12 +89,12 @@ all() ->
 
 init_per_suite(Config) ->
     ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
-    emqx_common_test_helpers:start_apps([emqx_gateway]),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
     Config.
 
 end_per_suite(_) ->
     {ok, _} = emqx:remove_config([gateway, mqttsn]),
-    emqx_common_test_helpers:stop_apps([emqx_gateway]).
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
 
 %%--------------------------------------------------------------------
 %% Test cases
@@ -1762,6 +1766,68 @@ t_broadcast_test1(_) ->
     timer:sleep(600),
     gen_udp:close(Socket).
 
+t_clients_api(_) ->
+    TsNow = emqx_gateway_utils:unix_ts_to_rfc3339(
+              erlang:system_time(millisecond)),
+    ClientId = <<"client_id_test1">>,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    %% list
+    {200, #{data := [Client1]}} = request(get, "/gateway/mqttsn/clients"),
+    #{clientid := ClientId} = Client1,
+    %% searching
+    {200, #{data := [Client2]}} =
+        request(get, "/gateway/mqttsn/clients", [{<<"clientid">>, ClientId}]),
+    {200, #{data := [Client3]}} =
+        request(get, "/gateway/mqttsn/clients",
+                [{<<"like_clientid">>, <<"test1">>},
+                 {<<"proto_ver">>, <<"1.2">>},
+                 {<<"ip_address">>, <<"127.0.0.1">>},
+                 {<<"conn_state">>, <<"connected">>},
+                 {<<"clean_start">>, <<"true">>},
+                 {<<"gte_connected_at">>, TsNow}
+                ]),
+    %% lookup
+    {200, Client4} =
+        request(get, "/gateway/mqttsn/clients/client_id_test1"),
+    %% assert
+    Client1 = Client2 = Client3 = Client4,
+    %% kickout
+    {204, _} =
+        request(delete, "/gateway/mqttsn/clients/client_id_test1"),
+    {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"),
+
+    send_disconnect_msg(Socket, undefined),
+    gen_udp:close(Socket).
+
+t_clients_subscription_api(_) ->
+    ClientId = <<"client_id_test1">>,
+    Path = "/gateway/mqttsn/clients/client_id_test1/subscriptions",
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    %% list
+    {200, []} = request(get, Path),
+    %% create
+    SubReq = #{ topic => <<"tx">>
+              , qos => 1
+              , nl => 0
+              , rap => 0
+              , rh => 0
+              },
+    {201, SubsResp} = request(post, Path, SubReq),
+
+    {200, [SubsResp]} = request(get, Path),
+
+    {204, _} = request(delete, Path ++ "/tx"),
+
+    {200, []} = request(get, Path),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket).
+
 %%--------------------------------------------------------------------
 %% Helper funcs
 %%--------------------------------------------------------------------

+ 45 - 0
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -35,6 +35,15 @@
 
 -export([do_query/6]).
 
+-export([ ensure_timestamp_format/2
+        ]).
+
+-export([ unix_ts_to_rfc3339_bin/1
+        , unix_ts_to_rfc3339_bin/2
+        , time_string_to_unix_ts_int/1
+        , time_string_to_unix_ts_int/2
+        ]).
+
 paginate(Tables, Params, {Module, FormatFun}) ->
     Qh = query_handle(Tables),
     Count = count(Tables),
@@ -401,6 +410,7 @@ to_integer(B) when is_binary(B) ->
 to_timestamp(I) when is_integer(I) ->
     I;
 to_timestamp(B) when is_binary(B) ->
+
     binary_to_integer(B).
 
 aton(B) when is_binary(B) ->
@@ -412,6 +422,41 @@ to_ip_port(IPAddress) ->
     Port = list_to_integer(Port0),
     {IP, Port}.
 
+%%--------------------------------------------------------------------
+%% time format funcs
+
+ensure_timestamp_format(Qs, TimeKeys)
+  when is_map(Qs);
+       is_list(TimeKeys) ->
+    Fun = fun (Key, NQs) ->
+        case NQs of
+            %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
+            %% or "1609430400000" (in millisecond)
+            #{Key := TimeString} ->
+                NQs#{Key => time_string_to_unix_ts_int(TimeString)};
+            #{} -> NQs
+        end
+    end,
+    lists:foldl(Fun, Qs, TimeKeys).
+
+unix_ts_to_rfc3339_bin(TimeStamp) ->
+    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
+
+unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
+    list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
+
+time_string_to_unix_ts_int(DateTime) ->
+    time_string_to_unix_ts_int(DateTime, millisecond).
+
+time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
+    try binary_to_integer(DateTime) of
+        TimeStamp when is_integer(TimeStamp) -> TimeStamp
+    catch
+        error:badarg ->
+            calendar:rfc3339_to_system_time(
+              binary_to_list(DateTime), [{unit, Unit}])
+    end.
+
 %%--------------------------------------------------------------------
 %% EUnits
 %%--------------------------------------------------------------------

+ 8 - 43
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -44,14 +44,6 @@
 %% for batch operation
 -export([do_subscribe/3]).
 
-%% for test suite
--export([ unix_ts_to_rfc3339_bin/1
-        , unix_ts_to_rfc3339_bin/2
-        , time_string_to_unix_ts_int/1
-        , time_string_to_unix_ts_int/2
-        ]).
-
-
 -define(CLIENT_QS_SCHEMA, {emqx_channel_info,
     [ {<<"node">>, atom}
     , {<<"username">>, binary}
@@ -463,7 +455,7 @@ keepalive_api() ->
 %%%==============================================================================================
 %% parameters trans
 clients(get, #{query_string := Qs}) ->
-    list(generate_qs(Qs)).
+    list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())).
 
 client(get, #{bindings := Bindings}) ->
     lookup(Bindings);
@@ -625,7 +617,8 @@ do_unsubscribe(ClientID, Topic) ->
     end.
 
 %%--------------------------------------------------------------------
-%% QueryString Generation (try rfc3339 to timestamp or keep timestamp)
+%% QueryString data-fomrat convert
+%%  (try rfc3339 to timestamp or keep timestamp)
 
 time_keys() ->
     [ <<"gte_created_at">>
@@ -633,18 +626,6 @@ time_keys() ->
     , <<"gte_connected_at">>
     , <<"lte_connected_at">>].
 
-generate_qs(Qs) ->
-    Fun =
-        fun (Key, NQs) ->
-                case NQs of
-                    %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
-                    %% or "1609430400000" (in millisecond)
-                    #{Key := TimeString} -> NQs#{Key => time_string_to_unix_ts_int(TimeString)};
-                    #{}                  -> NQs
-                end
-        end,
-    lists:foldl(Fun, Qs, time_keys()).
-
 %%--------------------------------------------------------------------
 %% Query Functions
 
@@ -778,8 +759,11 @@ take_maps_from_inner(Key, Value, Current) ->
 
 result_format_time_fun(Key, NClientInfoMap) ->
     case NClientInfoMap of
-        #{Key := TimeStamp} -> NClientInfoMap#{Key => unix_ts_to_rfc3339_bin(TimeStamp)};
-        #{}                 -> NClientInfoMap
+        #{Key := TimeStamp} ->
+            NClientInfoMap#{
+              Key => emqx_mgmt_api:unix_ts_to_rfc3339_bin(TimeStamp)};
+        #{} ->
+            NClientInfoMap
     end.
 
 -spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}).
@@ -795,22 +779,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
        updated_time => Timestamp
      }.
 
-%%--------------------------------------------------------------------
-%% time format funcs
-
-unix_ts_to_rfc3339_bin(TimeStamp) ->
-    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
-
-unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
-    list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
-
-time_string_to_unix_ts_int(DateTime) ->
-    time_string_to_unix_ts_int(DateTime, millisecond).
-
-time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
-    try binary_to_integer(DateTime) of
-        TimeStamp when is_integer(TimeStamp) -> TimeStamp
-    catch
-        error:badarg ->
-            calendar:rfc3339_to_system_time(binary_to_list(DateTime), [{unit, Unit}])
-    end.

+ 3 - 3
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -129,7 +129,7 @@ t_query_clients_with_time(_) ->
     NowTimeStampInt = erlang:system_time(millisecond),
     %% Do not uri_encode `=` to `%3D`
     Rfc3339String   = emqx_http_lib:uri_encode(binary:bin_to_list(
-        emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
+        emqx_mgmt_api:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
     TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),
 
     LteKeys         = ["lte_created_at=", "lte_connected_at="],
@@ -147,10 +147,10 @@ t_query_clients_with_time(_) ->
                        || {ok, Response} <- RequestResults],
     {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults),
     %% EachData :: list()
-    [?assert( emqx_mgmt_api_clients:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt)
+    [?assert( emqx_mgmt_api:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt)
      || #{<<"data">> := EachData} <- LteResponseDecodeds,
         #{<<"created_at">> := CreatedAt}     <- EachData],
-    [?assert(emqx_mgmt_api_clients:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt)
+    [?assert(emqx_mgmt_api:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt)
      || #{<<"data">> := EachData} <- LteResponseDecodeds,
         #{<<"connected_at">> := ConnectedAt} <- EachData],
     [?assertEqual(EachData, [])