Browse Source

Merge pull request #6650 from HJianBo/gw-review-r5

JianBo He 4 years atrás
parent
commit
8d5e0bbeb7
36 changed files with 1461 additions and 400 deletions
  1. 3 0
      apps/emqx_gateway/include/emqx_gateway.hrl
  2. 18 9
      apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
  3. 1 0
      apps/emqx_gateway/src/coap/emqx_coap_channel.erl
  4. 3 1
      apps/emqx_gateway/src/coap/emqx_coap_session.erl
  5. 3 0
      apps/emqx_gateway/src/emqx_gateway.erl
  6. 16 26
      apps/emqx_gateway/src/emqx_gateway_api_clients.erl
  7. 85 41
      apps/emqx_gateway/src/emqx_gateway_cli.erl
  8. 13 15
      apps/emqx_gateway/src/emqx_gateway_cm.erl
  9. 38 29
      apps/emqx_gateway/src/emqx_gateway_cm_registry.erl
  10. 1 0
      apps/emqx_gateway/src/emqx_gateway_conf.erl
  11. 4 12
      apps/emqx_gateway/src/emqx_gateway_ctx.erl
  12. 0 1
      apps/emqx_gateway/src/emqx_gateway_gw_sup.erl
  13. 61 52
      apps/emqx_gateway/src/emqx_gateway_http.erl
  14. 2 5
      apps/emqx_gateway/src/emqx_gateway_insta_sup.erl
  15. 12 1
      apps/emqx_gateway/src/emqx_gateway_metrics.erl
  16. 2 1
      apps/emqx_gateway/src/emqx_gateway_sup.erl
  17. 2 3
      apps/emqx_gateway/src/emqx_gateway_utils.erl
  18. 1 1
      apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl
  19. 1 1
      apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl
  20. 10 2
      apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl
  21. 182 121
      apps/emqx_gateway/test/emqx_coap_SUITE.erl
  22. 100 0
      apps/emqx_gateway/test/emqx_gateway_SUITE.erl
  23. 11 9
      apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl
  24. 154 16
      apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl
  25. 248 0
      apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl
  26. 97 0
      apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl
  27. 67 0
      apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl
  28. 76 0
      apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl
  29. 6 0
      apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl
  30. 2 0
      apps/emqx_gateway/test/emqx_gateway_test_utils.erl
  31. 78 3
      apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl
  32. 74 5
      apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
  33. 34 0
      apps/emqx_gateway/test/emqx_stomp_SUITE.erl
  34. 45 0
      apps/emqx_management/src/emqx_mgmt_api.erl
  35. 8 43
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  36. 3 3
      apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

+ 3 - 0
apps/emqx_gateway/include/emqx_gateway.hrl

@@ -22,6 +22,7 @@
 %% @doc The Gateway defination
 -type gateway() ::
         #{ name    := gateway_name()
+         %% Description
          , descr   => binary() | undefined
          %% Appears only in getting gateway info
          , status  => stopped | running | unloaded
@@ -29,6 +30,8 @@
          , created_at => integer()
          %% Timestamp in millisecond
          , started_at => integer()
+         %% Timestamp in millisecond
+         , stopped_at => integer()
          %% Appears only in getting gateway info
          , config => emqx_config:config()
          }.

+ 18 - 9
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -649,7 +649,7 @@ parse_incoming(Data, Packets,
                           , reason => Reason
                           , stacktrace => Stk
                           }),
-            {[{frame_error, Reason}|Packets], State}
+            {[{frame_error, Reason} | Packets], State}
     end.
 
 next_incoming_msgs([Packet]) ->
@@ -720,20 +720,29 @@ serialize_and_inc_stats_fun(#state{
                                channel = Channel}) ->
     Ctx = ChannMod:info(ctx, Channel),
     fun(Packet) ->
-        case FrameMod:serialize_pkt(Packet, Serialize) of
-            <<>> ->
+        try
+            Data = FrameMod:serialize_pkt(Packet, Serialize),
+            ?SLOG(debug, #{ msg => "SEND_packet"
+                          %% XXX: optimize it, less cpu comsuption?
+                          , packet => FrameMod:format(Packet)
+                          }),
+            ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
+            Data
+        catch
+            _ : too_large ->
                 ?SLOG(warning, #{ msg => "packet_too_large_discarded"
                                 , packet => FrameMod:format(Packet)
                                 }),
                  ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
                  ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
                  <<>>;
-            Data ->
-                ?SLOG(debug, #{ msg => "SEND_packet"
-                              , packet => FrameMod:format(Packet)
-                              }),
-                ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
-                Data
+            _ : Reason ->
+                ?SLOG(warning, #{ msg => "packet_serialize_failure"
+                                , reason => Reason
+                                , packet => FrameMod:format(Packet)
+                                }),
+                 ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
+                 <<>>
         end
     end.
 

+ 1 - 0
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -247,6 +247,7 @@ handle_call({subscribe, Topic, SubOpts}, _From,
     %% modifty session state
     SubReq = {Topic, Token},
     TempMsg = #coap_message{type = non},
+    %% FIXME: The subopts is not used for emqx_coap_session
     Result  = emqx_coap_session:process_subscribe(
                 SubReq, TempMsg, #{}, Session),
     NSession = maps:get(session, Result),

+ 3 - 1
apps/emqx_gateway/src/coap/emqx_coap_session.erl

@@ -92,7 +92,9 @@ info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
 info(subscriptions, #session{observe_manager = OM}) ->
     Topics = emqx_coap_observe_res:subscriptions(OM),
-    lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics);
+    lists:foldl(
+      fun(T, Acc) -> Acc#{T => emqx_gateway_utils:default_subopts()} end,
+      #{}, Topics);
 info(subscriptions_cnt, #session{observe_manager = OM}) ->
     erlang:length(emqx_coap_observe_res:subscriptions(OM));
 info(subscriptions_max, _) ->

+ 3 - 0
apps/emqx_gateway/src/emqx_gateway.erl

@@ -41,6 +41,7 @@ registered_gateway() ->
 %%--------------------------------------------------------------------
 %% Gateway APIs
 
+%% @doc List the load gateways
 -spec list() -> [gateway()].
 list() ->
     emqx_gateway_sup:list_gateway_insta().
@@ -65,6 +66,8 @@ lookup(Name) ->
 
 -spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}.
 %% @doc This function only supports full configuration updates
+%%
+%% Note: If the `enable` option is missing, it will be set to true by default
 update(Name, Config) ->
     emqx_gateway_sup:update_gateway(Name, Config).
 

+ 16 - 26
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -73,7 +73,7 @@ paths() ->
     , {<<"ip_address">>, ip}
     , {<<"conn_state">>, atom}
     , {<<"clean_start">>, atom}
-    , {<<"proto_ver">>, integer}
+    , {<<"proto_ver">>, binary}
     , {<<"like_clientid">>, binary}
     , {<<"like_username">>, binary}
     , {<<"gte_created_at">>, timestamp}
@@ -83,15 +83,16 @@ paths() ->
     %% special keys for lwm2m protocol
     , {<<"endpoint_name">>, binary}
     , {<<"like_endpoint_name">>, binary}
-    , {<<"gte_lifetime">>, timestamp}
-    , {<<"lte_lifetime">>, timestamp}
+    , {<<"gte_lifetime">>, integer}
+    , {<<"lte_lifetime">>, integer}
     ]).
 
 -define(QUERY_FUN, {?MODULE, query}).
 
 clients(get, #{ bindings := #{name := Name0}
-              , query_string := Params
+              , query_string := Params0
               }) ->
+    Params = emqx_mgmt_api:ensure_timestamp_format(Params0, time_keys()),
     with_gateway(Name0, fun(GwName, _) ->
         TabName = emqx_gateway_cm:tabname(info, GwName),
         case maps:get(<<"node">>, Params, undefined) of
@@ -147,10 +148,6 @@ subscriptions(get, #{ bindings := #{name := Name0,
     ClientId = emqx_mgmt_util:urldecode(ClientId0),
     with_gateway(Name0, fun(GwName, _) ->
         case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
-            {error, nosupport} ->
-                return_http_error(405, <<"Not support to list subscriptions">>);
-            {error, noimpl} ->
-                return_http_error(501, <<"Not implemented now">>);
             {error, Reason} ->
                 return_http_error(500, Reason);
             {ok, Subs} ->
@@ -171,14 +168,6 @@ subscriptions(post, #{ bindings := #{name := Name0,
             {Topic, SubOpts} ->
                 case emqx_gateway_http:client_subscribe(
                        GwName, ClientId, Topic, SubOpts) of
-                    {error, nosupport} ->
-                        return_http_error(
-                          405,
-                          <<"Not support to add a subscription">>);
-                    {error, noimpl} ->
-                        return_http_error(
-                          501,
-                          <<"Not implemented now">>);
                     {error, Reason} ->
                         return_http_error(404, Reason);
                     {ok, {NTopic, NSubOpts}}->
@@ -221,6 +210,16 @@ extra_sub_props(Props) ->
       #{subid => maps:get(<<"subid">>, Props, undefined)}
      ).
 
+%%--------------------------------------------------------------------
+%% QueryString data-fomrat convert
+%%  (try rfc3339 to timestamp or keep timestamp)
+
+time_keys() ->
+    [ <<"gte_created_at">>
+    , <<"lte_created_at">>
+    , <<"gte_connected_at">>
+    , <<"lte_connected_at">>].
+
 %%--------------------------------------------------------------------
 %% query funcs
 
@@ -264,10 +263,8 @@ ms(clientid, X) ->
     #{clientinfo => #{clientid => X}};
 ms(username, X) ->
     #{clientinfo => #{username => X}};
-ms(zone, X) ->
-    #{clientinfo => #{zone => X}};
 ms(ip_address, X) ->
-    #{clientinfo => #{peername => {X, '_'}}};
+    #{clientinfo => #{peerhost => X}};
 ms(conn_state, X) ->
     #{conn_state => X};
 ms(clean_start, X) ->
@@ -616,9 +613,6 @@ roots() ->
     , subscription
     ].
 
-fields(test) ->
-    [{key, mk(binary(), #{ desc => <<"Desc">>})}];
-
 fields(stomp_client) ->
     common_client_props();
 fields(mqttsn_client) ->
@@ -707,10 +701,6 @@ common_client_props() ->
     %, {will_msg,
     %   mk(binary(),
     %      #{ desc => <<"Client will message">>})}
-    %, {zone,
-    %   mk(binary(),
-    %      #{ desc => <<"Indicate the configuration group used by the "
-    %                   "client">>})}
     , {keepalive,
        mk(integer(),
           #{ desc => <<"keepalive time, with the unit of second">>})}

+ 85 - 41
apps/emqx_gateway/src/emqx_gateway_cli.erl

@@ -53,23 +53,16 @@ is_cmd(Fun) ->
 
 gateway(["list"]) ->
     lists:foreach(
-      fun (#{name := Name, status := unloaded}) ->
-            print("Gateway(name=~ts, status=unloaded)\n", [Name]);
-          (#{name := Name, status := stopped, stopped_at := StoppedAt}) ->
-            print("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n",
-                  [Name, StoppedAt]);
-          (#{name := Name, status := running, current_connections := ConnCnt,
-             started_at := StartedAt}) ->
-            print("Gateway(name=~ts, status=running, clients=~w, started_at=~ts)\n",
-                  [Name, ConnCnt, StartedAt])
+    fun (GwSummary) ->
+        print(format_gw_summary(GwSummary))
     end, emqx_gateway_http:gateways(all));
 
 gateway(["lookup", Name]) ->
     case emqx_gateway:lookup(atom(Name)) of
         undefined ->
             print("undefined\n");
-        Info ->
-            print("~p\n", [Info])
+        Gateway ->
+            print(format_gateway(Gateway))
     end;
 
 gateway(["load", Name, Conf]) ->
@@ -80,7 +73,7 @@ gateway(["load", Name, Conf]) ->
         {ok, _} ->
             print("ok\n");
         {error, Reason} ->
-            print("Error: ~p\n", [Reason])
+            print("Error: ~ts\n", [format_error(Reason)])
     end;
 
 gateway(["unload", Name]) ->
@@ -88,7 +81,7 @@ gateway(["unload", Name]) ->
         ok ->
             print("ok\n");
         {error, Reason} ->
-            print("Error: ~p\n", [Reason])
+            print("Error: ~ts\n", [format_error(Reason)])
     end;
 
 gateway(["stop", Name]) ->
@@ -99,7 +92,7 @@ gateway(["stop", Name]) ->
         {ok, _} ->
             print("ok\n");
         {error, Reason} ->
-            print("Error: ~p\n", [Reason])
+            print("Error: ~ts\n", [format_error(Reason)])
     end;
 
 gateway(["start", Name]) ->
@@ -110,23 +103,24 @@ gateway(["start", Name]) ->
         {ok, _} ->
             print("ok\n");
         {error, Reason} ->
-            print("Error: ~p\n", [Reason])
+            print("Error: ~ts\n", [format_error(Reason)])
     end;
 
 gateway(_) ->
-    emqx_ctl:usage([ {"gateway list",
-                        "List all gateway"}
-                   , {"gateway lookup <Name>",
-                        "Lookup a gateway detailed informations"}
-                   , {"gateway load   <Name> <JsonConf>",
-                        "Load a gateway with config"}
-                   , {"gateway unload <Name>",
-                        "Unload the gateway"}
-                   , {"gateway stop   <Name>",
-                        "Stop the gateway"}
-                   , {"gateway start  <Name>",
-                        "Start the gateway"}
-                   ]).
+    emqx_ctl:usage(
+      [ {"gateway list",
+           "List all gateway"}
+      , {"gateway lookup <Name>",
+           "Lookup a gateway detailed informations"}
+      , {"gateway load   <Name> <JsonConf>",
+           "Load a gateway with config"}
+      , {"gateway unload <Name>",
+           "Unload the gateway"}
+      , {"gateway stop   <Name>",
+           "Stop the gateway"}
+      , {"gateway start  <Name>",
+           "Start the gateway"}
+      ]).
 
 'gateway-registry'(["list"]) ->
     lists:foreach(
@@ -141,7 +135,7 @@ gateway(_) ->
                    ]).
 
 'gateway-clients'(["list", Name]) ->
-    %% FIXME: page me?
+    %% XXX: page me?
     InfoTab = emqx_gateway_cm:tabname(info, Name),
     case ets:info(InfoTab) of
         undefined ->
@@ -152,12 +146,17 @@ gateway(_) ->
 
 'gateway-clients'(["lookup", Name, ClientId]) ->
     ChanTab = emqx_gateway_cm:tabname(chan, Name),
-    case ets:lookup(ChanTab, bin(ClientId)) of
-        [] -> print("Not Found.\n");
-        [Chann] ->
-            InfoTab = emqx_gateway_cm:tabname(info, Name),
-            [ChannInfo] = ets:lookup(InfoTab, Chann),
-            print_record({client, ChannInfo})
+    case ets:info(ChanTab) of
+        undefined ->
+            print("Bad Gateway Name.\n");
+        _ ->
+            case ets:lookup(ChanTab, bin(ClientId)) of
+                [] -> print("Not Found.\n");
+                [Chann] ->
+                    InfoTab = emqx_gateway_cm:tabname(info, Name),
+                    [ChannInfo] = ets:lookup(InfoTab, Chann),
+                    print_record({client, ChannInfo})
+            end
     end;
 
 'gateway-clients'(["kick", Name, ClientId]) ->
@@ -176,15 +175,13 @@ gateway(_) ->
                    ]).
 
 'gateway-metrics'([Name]) ->
-    Tab = emqx_gateway_metrics:tabname(Name),
-    case ets:info(Tab) of
+    case emqx_gateway_metrics:lookup(atom(Name)) of
         undefined ->
             print("Bad Gateway Name.\n");
-        _ ->
+        Metrics ->
             lists:foreach(
-              fun({K, V}) ->
-                print("~-30s: ~w\n", [K, V])
-              end, lists:sort(ets:tab2list(Tab)))
+              fun({K, V}) -> print("~-30s: ~w\n", [K, V]) end,
+              Metrics)
     end;
 
 'gateway-metrics'(_) ->
@@ -255,3 +252,50 @@ format(peername, {IPAddr, Port}) ->
 
 format(_, Val) ->
     Val.
+
+format_gw_summary(#{name := Name, status := unloaded}) ->
+    io_lib:format("Gateway(name=~ts, status=unloaded)\n", [Name]);
+
+format_gw_summary(#{name := Name, status := stopped,
+                    stopped_at := StoppedAt}) ->
+    io_lib:format("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n",
+                  [Name, StoppedAt]);
+format_gw_summary(#{name := Name, status := running,
+                    current_connections := ConnCnt,
+                    started_at := StartedAt}) ->
+    io_lib:format("Gateway(name=~ts, status=running, clients=~w, "
+                  "started_at=~ts)\n", [Name, ConnCnt, StartedAt]).
+
+format_gateway(#{name := Name,
+                 status := unloaded}) ->
+    io_lib:format(
+        "name: ~ts\n"
+        "status: unloaded\n", [Name]);
+
+format_gateway(Gw =
+               #{name := Name,
+                 status := Status,
+                 created_at := CreatedAt,
+                 config := Config
+                }) ->
+    {StopOrStart, Timestamp} =
+        case Status of
+            stopped -> {stopped_at, maps:get(stopped_at, Gw)};
+            running -> {started_at, maps:get(started_at, Gw)}
+        end,
+    io_lib:format(
+        "name: ~ts\n"
+        "status: ~ts\n"
+        "created_at: ~ts\n"
+        "~ts: ~ts\n"
+        "config: ~p\n",
+        [Name, Status,
+         emqx_gateway_utils:unix_ts_to_rfc3339(CreatedAt),
+         StopOrStart, emqx_gateway_utils:unix_ts_to_rfc3339(Timestamp),
+         Config]).
+
+format_error(Reason) ->
+    case emqx_gateway_http:reason2msg(Reason) of
+        error -> io_lib:format("~p", [Reason]);
+        Msg -> Msg
+    end.

+ 13 - 15
apps/emqx_gateway/src/emqx_gateway_cm.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc The Gateway Connection-Manager
+%% @doc The Gateway Channel Manager
 %%
 %% For a certain type of protocol, this is a single instance of the manager.
 %% It means that no matter how many instances of the stomp gateway are created,
@@ -26,7 +26,6 @@
 -include("include/emqx_gateway.hrl").
 -include_lib("emqx/include/logger.hrl").
 
-
 %% APIs
 -export([start_link/1]).
 
@@ -74,6 +73,8 @@
 -type option() :: {gwname, gateway_name()}.
 -type options() :: list(option()).
 
+-define(T_KICK, 5000).
+-define(T_GET_INFO, 5000).
 -define(T_TAKEOVER, 15000).
 -define(DEFAULT_BATCH_SIZE, 10000).
 
@@ -94,9 +95,9 @@ procname(GwName) ->
         ConnTab :: atom(),
         ChannInfoTab :: atom()}.
 cmtabs(GwName) ->
-    { tabname(chan, GwName)   %% Client Tabname; Record: {ClientId, Pid}
-    , tabname(conn, GwName)   %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod}
-    , tabname(info, GwName)   %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats}
+    { tabname(chan, GwName)   %% Record: {ClientId, Pid}
+    , tabname(conn, GwName)   %% Recrod: {{ClientId, Pid}, ConnMod}
+    , tabname(info, GwName)   %% Record: {{ClientId, Pid}, Info, Stats}
     }.
 
 tabname(chan, GwName) ->
@@ -134,7 +135,6 @@ unregister_channel(GwName, ClientId) when is_binary(ClientId) ->
 insert_channel_info(GwName, ClientId, Info, Stats) ->
     Chan = {ClientId, self()},
     true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}),
-    %%?tp(debug, insert_channel_info, #{client_id => ClientId}),
     ok.
 
 %% @doc Get info of a channel.
@@ -207,7 +207,8 @@ set_chan_stats(GwName, ClientId, Stats) ->
                      emqx_types:clientid(),
                      pid(),
                      emqx_types:stats()) -> boolean().
-set_chan_stats(GwName, ClientId, ChanPid, Stats)  when node(ChanPid) == node() ->
+set_chan_stats(GwName, ClientId, ChanPid, Stats)
+  when node(ChanPid) == node() ->
     Chan = {ClientId, self()},
     try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
     catch
@@ -232,7 +233,7 @@ connection_closed(GwName, ClientId) ->
     -> {ok, #{session := Session,
               present := boolean(),
               pendings => list()
-          }}
+             }}
      | {error, any()}.
 
 open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
@@ -256,7 +257,7 @@ open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun,
 
 open_session(_Type, false = _CleanStart,
              _ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) ->
-    %% TODO:
+    %% TODO: The session takeover logic will be implemented on 0.9?
     {error, not_supported_now}.
 
 %% @private
@@ -305,17 +306,12 @@ do_discard_session(GwName, ClientId, Pid) ->
         discard_session(GwName, ClientId, Pid)
     catch
         _ : noproc -> % emqx_ws_connection: call
-            %?tp(debug, "session_already_gone", #{pid => Pid}),
             ok;
         _ : {noproc, _} -> % emqx_connection: gen_server:call
-            %?tp(debug, "session_already_gone", #{pid => Pid}),
             ok;
         _ : {{shutdown, _}, _} ->
-            %?tp(debug, "session_already_shutdown", #{pid => Pid}),
             ok;
         _ : _Error : _St ->
-            %?tp(error, "failed_to_discard_session",
-            %    #{pid => Pid, reason => Error, stacktrace=>St})
             ok
     end.
 
@@ -464,7 +460,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
 handle_info(_Info, State) ->
     {noreply, State}.
 
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{registry = Registry, locker = Locker}) ->
+    _ = gen_server:stop(Registry),
+    _ = ekka_locker:stop(Locker),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->

+ 38 - 29
apps/emqx_gateway/src/emqx_gateway_cm_registry.erl

@@ -17,6 +17,8 @@
 %% @doc The gateway connection registry
 -module(emqx_gateway_cm_registry).
 
+-include("include/emqx_gateway.hrl").
+
 -behaviour(gen_server).
 
 -export([start_link/1]).
@@ -27,6 +29,8 @@
 
 -export([lookup_channels/2]).
 
+-export([tabname/1]).
+
 %% gen_server callbacks
 -export([ init/1
         , handle_call/3
@@ -41,39 +45,43 @@
 
 -record(channel, {chid, pid}).
 
-%% @doc Start the global channel registry.
--spec(start_link(atom()) -> gen_server:startlink_ret()).
-start_link(Type) ->
-    gen_server:start_link(?MODULE, [Type], []).
+%% @doc Start the global channel registry for the gived gateway name.
+-spec(start_link(gateway_name()) -> gen_server:startlink_ret()).
+start_link(Name) ->
+    gen_server:start_link(?MODULE, [Name], []).
 
--spec tabname(atom()) -> atom().
-tabname(Type) ->
-    list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_registry'])).
+-spec tabname(gateway_name()) -> atom().
+tabname(Name) ->
+    %% XXX: unsafe ??
+    list_to_atom(lists:concat([emqx_gateway_, Name, '_channel_registry'])).
 
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
 %% @doc Register a global channel.
--spec register_channel(atom(), binary() | {binary(), pid()}) -> ok.
-register_channel(Type, ClientId) when is_binary(ClientId) ->
-    register_channel(Type, {ClientId, self()});
+-spec register_channel(gateway_name(), binary() | {binary(), pid()}) -> ok.
+register_channel(Name, ClientId) when is_binary(ClientId) ->
+    register_channel(Name, {ClientId, self()});
 
-register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
-    mria:dirty_write(tabname(Type), record(ClientId, ChanPid)).
+register_channel(Name, {ClientId, ChanPid}) 
+  when is_binary(ClientId), is_pid(ChanPid) ->
+    mria:dirty_write(tabname(Name), record(ClientId, ChanPid)).
 
 %% @doc Unregister a global channel.
--spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok.
-unregister_channel(Type, ClientId) when is_binary(ClientId) ->
-    unregister_channel(Type, {ClientId, self()});
+-spec unregister_channel(gateway_name(), binary() | {binary(), pid()}) -> ok.
+unregister_channel(Name, ClientId) when is_binary(ClientId) ->
+    unregister_channel(Name, {ClientId, self()});
 
-unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
-    mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)).
+unregister_channel(Name, {ClientId, ChanPid})
+  when is_binary(ClientId), is_pid(ChanPid) ->
+    mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)).
 
 %% @doc Lookup the global channels.
--spec lookup_channels(atom(), binary()) -> list(pid()).
-lookup_channels(Type, ClientId) ->
-    [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Type), ClientId)].
+-spec lookup_channels(gateway_name(), binary()) -> list(pid()).
+lookup_channels(Name, ClientId) ->
+    [ChanPid
+     || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)].
 
 record(ClientId, ChanPid) ->
     #channel{chid = ClientId, pid = ChanPid}.
@@ -82,8 +90,8 @@ record(ClientId, ChanPid) ->
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
-init([Type]) ->
-    Tab = tabname(Type),
+init([Name]) ->
+    Tab = tabname(Name),
     ok = mria:create_table(Tab, [
                 {type, bag},
                 {rlog_shard, ?CM_SHARD},
@@ -94,7 +102,7 @@ init([Type]) ->
                                              {write_concurrency, true}]}]}]),
     ok = mria:wait_for_tables([Tab]),
     ok = ekka:monitor(membership),
-    {ok, #{type => Type}}.
+    {ok, #{name => Name}}.
 
 handle_call(Req, _From, State) ->
     logger:error("Unexpected call: ~p", [Req]),
@@ -104,12 +112,13 @@ handle_cast(Msg, State) ->
     logger:error("Unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) ->
-    Tab = tabname(Type),
-    global:trans({?LOCK, self()},
-                 fun() ->
-                     mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab])
-                 end),
+handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) ->
+    Tab = tabname(Name),
+    global:trans(
+      {?LOCK, self()},
+      fun() ->
+        mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab])
+      end),
     {noreply, State};
 
 handle_info({membership, _Event}, State) ->

+ 1 - 0
apps/emqx_gateway/src/emqx_gateway_conf.erl

@@ -93,6 +93,7 @@ load_gateway(GwName, Conf) ->
 %% @doc convert listener array to map
 unconvert_listeners(Ls) when is_list(Ls) ->
     lists:foldl(fun(Lis, Acc) ->
+        %% FIXME: params apperence guard?
         {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
         NLis1 = maps:without([<<"id">>], Lis1),
         emqx_map_lib:deep_merge(Acc, #{Type => #{Name => NLis1}})

+ 4 - 12
apps/emqx_gateway/src/emqx_gateway_ctx.erl

@@ -30,7 +30,7 @@
         #{ %% Gateway Name
            gwname := gateway_name()
            %% Authentication chains
-         , auth   := [emqx_authentication:chain_name()] | undefined
+         , auth   := [emqx_authentication:chain_name()]
            %% The ConnectionManager PID
          , cm     := pid()
          }.
@@ -64,18 +64,15 @@
 -spec authenticate(context(), emqx_types:clientinfo())
     -> {ok, emqx_types:clientinfo()}
      | {error, any()}.
-authenticate(_Ctx = #{auth := undefined}, ClientInfo) ->
-    {ok, mountpoint(ClientInfo)};
-authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) ->
+authenticate(_Ctx = #{auth := _ChainNames}, ClientInfo0)
+  when is_list(_ChainNames) ->
     ClientInfo = ClientInfo0#{zone => default},
     case emqx_access_control:authenticate(ClientInfo) of
         {ok, _} ->
             {ok, mountpoint(ClientInfo)};
         {error, Reason} ->
             {error, Reason}
-    end;
-authenticate(_Ctx, ClientInfo) ->
-    {ok, mountpoint(ClientInfo)}.
+    end.
 
 %% @doc Register the session to the cluster.
 %%
@@ -95,11 +92,6 @@ open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
     open_session(Ctx, CleanStart, ClientInfo, ConnInfo,
                  CreateSessionFun, emqx_session).
 
-open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
-    logger:warning("clean_start=false is not supported now, "
-                   "fallback to clean_start mode"),
-    open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod);
-
 open_session(_Ctx = #{gwname := GwName},
              CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
     emqx_gateway_cm:open_session(GwName, CleanStart,

+ 0 - 1
apps/emqx_gateway/src/emqx_gateway_gw_sup.erl

@@ -51,7 +51,6 @@ create_insta(Sup, Gateway = #{name := Name}, GwDscrptr) ->
         {ok, _GwInstaPid} -> {error, alredy_existed};
         false ->
             Ctx = ctx(Sup, Name),
-            %%
             ChildSpec = emqx_gateway_utils:childspec(
                           Name,
                           worker,

+ 61 - 52
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -62,12 +62,15 @@
         , with_listener_authn/3
         , checks/2
         , reason2resp/1
+        , reason2msg/1
         ]).
 
 -type gateway_summary() ::
         #{ name := binary()
          , status := running | stopped | unloaded
+         , created_at => binary()
          , started_at => binary()
+         , stopped_at => binary()
          , max_connections => integer()
          , current_connections => integer()
          , listeners => []
@@ -76,7 +79,6 @@
 -elvis([{elvis_style, god_modules, disable}]).
 -elvis([{elvis_style, no_nested_try_catch, disable}]).
 
-
 -define(DEFAULT_CALL_TIMEOUT, 15000).
 
 %%--------------------------------------------------------------------
@@ -317,57 +319,13 @@ with_channel(GwName, ClientId, Fun) ->
 %%--------------------------------------------------------------------
 
 -spec reason2resp({atom(), map()} | any()) -> binary() | any().
-reason2resp({badconf, #{key := Key, value := Value, reason := Reason}}) ->
-    fmt400err("Bad config value '~s' for '~s', reason: ~s",
-              [Value, Key, Reason]);
-reason2resp({badres, #{resource := gateway,
-                       gateway := GwName,
-                       reason := not_found}}) ->
-    fmt400err("The ~s gateway is unloaded", [GwName]);
-
-reason2resp({badres, #{resource := gateway,
-                       gateway := GwName,
-                       reason := already_exist}}) ->
-    fmt400err("The ~s gateway has loaded", [GwName]);
-
-reason2resp({badres, #{resource := listener,
-                       listener := {GwName, LType, LName},
-                       reason := not_found}}) ->
-    fmt400err("Listener ~s not found",
-              [listener_id(GwName, LType, LName)]);
-
-reason2resp({badres, #{resource := listener,
-                       listener := {GwName, LType, LName},
-                       reason := already_exist}}) ->
-    fmt400err("The listener ~s of ~s already exist",
-              [listener_id(GwName, LType, LName), GwName]);
-
-reason2resp({badres, #{resource := authn,
-                       gateway := GwName,
-                       reason := not_found}}) ->
-    fmt400err("The authentication not found on ~s", [GwName]);
-
-reason2resp({badres, #{resource := authn,
-                       gateway := GwName,
-                       reason := already_exist}}) ->
-    fmt400err("The authentication already exist on ~s", [GwName]);
-
-reason2resp({badres, #{resource := listener_authn,
-                       listener := {GwName, LType, LName},
-                       reason := not_found}}) ->
-    fmt400err("The authentication not found on ~s",
-              [listener_id(GwName, LType, LName)]);
-
-reason2resp({badres, #{resource := listener_authn,
-                       listener := {GwName, LType, LName},
-                       reason := already_exist}}) ->
-    fmt400err("The authentication already exist on ~s",
-              [listener_id(GwName, LType, LName)]);
-
-reason2resp(R) -> return_http_error(500, R).
-
-fmt400err(Fmt, Args) ->
-    return_http_error(400, io_lib:format(Fmt, Args)).
+reason2resp(R) ->
+    case reason2msg(R) of
+        error ->
+            return_http_error(500, R);
+        Msg ->
+            return_http_error(400, Msg)
+    end.
 
 -spec return_http_error(integer(), any()) -> {integer(), binary()}.
 return_http_error(Code, Msg) ->
@@ -377,6 +335,54 @@ return_http_error(Code, Msg) ->
               })
     }.
 
+-spec reason2msg({atom(), map()} | any()) -> error | string().
+reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
+    fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]);
+reason2msg({badres, #{resource := gateway,
+                      gateway := GwName,
+                      reason := not_found}}) ->
+    fmtstr("The ~s gateway is unloaded", [GwName]);
+
+reason2msg({badres, #{resource := gateway,
+                      gateway := GwName,
+                      reason := already_exist}}) ->
+    fmtstr("The ~s gateway already loaded", [GwName]);
+
+reason2msg({badres, #{resource := listener,
+                      listener := {GwName, LType, LName},
+                      reason := not_found}}) ->
+    fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]);
+
+reason2msg({badres, #{resource := listener,
+                      listener := {GwName, LType, LName},
+                      reason := already_exist}}) ->
+    fmtstr("The listener ~s of ~s already exist",
+           [listener_id(GwName, LType, LName), GwName]);
+
+reason2msg({badres, #{resource := authn,
+                      gateway := GwName,
+                      reason := not_found}}) ->
+    fmtstr("The authentication not found on ~s", [GwName]);
+
+reason2msg({badres, #{resource := authn,
+                      gateway := GwName,
+                      reason := already_exist}}) ->
+    fmtstr("The authentication already exist on ~s", [GwName]);
+
+reason2msg({badres, #{resource := listener_authn,
+                      listener := {GwName, LType, LName},
+                      reason := not_found}}) ->
+    fmtstr("The authentication not found on ~s",
+           [listener_id(GwName, LType, LName)]);
+
+reason2msg({badres, #{resource := listener_authn,
+                      listener := {GwName, LType, LName},
+                      reason := already_exist}}) ->
+    fmtstr("The authentication already exist on ~s",
+           [listener_id(GwName, LType, LName)]);
+reason2msg(_) ->
+    error.
+
 codestr(400) -> 'BAD_REQUEST';
 codestr(401) -> 'NOT_SUPPORTED_NOW';
 codestr(404) -> 'RESOURCE_NOT_FOUND';
@@ -384,6 +390,9 @@ codestr(405) -> 'METHOD_NOT_ALLOWED';
 codestr(500) -> 'UNKNOW_ERROR';
 codestr(501) -> 'NOT_IMPLEMENTED'.
 
+fmtstr(Fmt, Args) ->
+    lists:flatten(io_lib:format(Fmt, Args)).
+
 -spec with_authn(binary(), function()) -> any().
 with_authn(GwName0, Fun) ->
     with_gateway(GwName0, fun(GwName, _GwConf) ->

+ 2 - 5
apps/emqx_gateway/src/emqx_gateway_insta_sup.erl

@@ -122,7 +122,6 @@ handle_call(info, _From, State) ->
     {reply, detailed_gateway_info(State), State};
 
 handle_call(disable, _From, State = #state{status = Status}) ->
-    %% XXX: The `disable` opertaion is not persist to config database
     case Status of
         running ->
             case cb_gateway_unload(State) of
@@ -308,8 +307,7 @@ do_update_one_by_one(NCfg, State = #state{
                                       name = GwName,
                                       config = OCfg,
                                       status = Status}) ->
-    OEnable = maps:get(enable, OCfg, true),
-    NEnable = maps:get(enable, NCfg, OEnable),
+    NEnable = maps:get(enable, NCfg, true),
 
     OAuths = authns(GwName, OCfg),
     NAuths = authns(GwName, NCfg),
@@ -329,7 +327,7 @@ do_update_one_by_one(NCfg, State = #state{
                              AuthnNames = init_authn(State#state.name, NCfg),
                              State#state{authns = AuthnNames}
                      end,
-            %% XXX: minimum impact update ???
+            %% TODO: minimum impact update ???
             cb_gateway_update(NCfg, NState);
         {running, false} ->
             case cb_gateway_unload(State) of
@@ -414,7 +412,6 @@ cb_gateway_update(Config,
         case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
             {error, Reason} -> {error, Reason};
             {ok, ChildPidOrSpecs, NGwState} ->
-                %% XXX: Hot-upgrade ???
                 ChildPids = start_child_process(ChildPidOrSpecs),
                 {ok, State#state{
                        config = Config,

+ 12 - 1
apps/emqx_gateway/src/emqx_gateway_metrics.erl

@@ -20,7 +20,6 @@
 
 -include_lib("emqx_gateway/include/emqx_gateway.hrl").
 
-
 %% APIs
 -export([start_link/1]).
 
@@ -30,6 +29,8 @@
         , dec/3
         ]).
 
+-export([lookup/1]).
+
 %% gen_server callbacks
 -export([ init/1
         , handle_call/3
@@ -67,6 +68,16 @@ dec(GwName, Name) ->
 dec(GwName, Name, Oct) ->
     inc(GwName, Name, -Oct).
 
+-spec lookup(gateway_name())
+    -> undefined
+     | [{Name :: atom(), integer()}].
+lookup(GwName) ->
+    Tab = emqx_gateway_metrics:tabname(GwName),
+    case ets:info(Tab) of
+        undefined -> undefined;
+        _ -> lists:sort(ets:tab2list(Tab))
+    end.
+
 tabname(GwName) ->
     list_to_atom(lists:concat([emqx_gateway_, GwName, '_metrics'])).
 

+ 2 - 1
apps/emqx_gateway/src/emqx_gateway_sup.erl

@@ -59,7 +59,8 @@ load_gateway(Gateway = #{name := GwName}) ->
 unload_gateway(GwName) ->
     case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of
         false -> {error, not_found};
-        _ ->
+        {_Id, Pid, _Type, _Mods} ->
+            _ = emqx_gateway_gw_sup:remove_insta(Pid, GwName),
             _ = supervisor:terminate_child(?MODULE, GwName),
             _ = supervisor:delete_child(?MODULE, GwName),
             ok

+ 2 - 3
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -291,9 +291,8 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
     end.
 
 %% same with emqx_authentication:global_chain/1
-global_chain(mqtt) ->
-    'mqtt:global';
-global_chain('mqtt-sn') ->
+-spec global_chain(GatewayName :: atom()) -> atom().
+global_chain('mqttsn') ->
     'mqtt-sn:global';
 global_chain(coap) ->
     'coap:global';

+ 1 - 1
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -451,7 +451,7 @@ do_subscribe(TopicFilter, SubOpts, Channel =
                       subscriptions = Subs}) ->
     %% Mountpoint first
     NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
-    NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
+    NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
     SubId = maps:get(clientid, ClientInfo, undefined),
     %% XXX: is_new?
     IsNew = not maps:is_key(NTopicFilter, Subs),

+ 1 - 1
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -931,7 +931,7 @@ do_subscribe({TopicId, TopicName, SubOpts},
                           clientinfo = ClientInfo
                                      = #{mountpoint := Mountpoint}}) ->
     NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
-    NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
+    NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
     case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
         {ok, NSession} ->
             {ok, {TopicId, NTopicName, NSubOpts},

+ 10 - 2
apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl

@@ -134,8 +134,10 @@ g(Key, Opts, Val) ->
 parse(<<>>, Parser) ->
     {more, Parser};
 
-parse(Bytes, #{phase := body, len := Len, state := State}) ->
+parse(Bytes, #{phase := body, length := Len, state := State}) ->
     parse(body, Bytes, State, Len);
+parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
+    parse(Phase, Bytes, State);
 
 parse(Bytes, Parser = #{pre := Pre}) ->
     parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
@@ -162,6 +164,8 @@ parse(command, <<?LF, Rest/binary>>, State = #parser_state{acc = Acc}) ->
     parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
 parse(command, <<Ch:8, Rest/binary>>, State) ->
     parse(command, Rest, acc(Ch, State));
+parse(command, <<>>, State) ->
+    {more, #{phase => command, state => State}};
 
 parse(headers, <<?LF, Rest/binary>>, State) ->
     parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@@ -174,6 +178,8 @@ parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
     parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
 parse(hdname, <<Ch:8, Rest/binary>>, State) ->
     parse(hdname, Rest, acc(Ch, State));
+parse(hdname, <<>>, State) ->
+    {more, #{phase => hdname, state => State}};
 
 parse(hdvalue, <<?LF, Rest/binary>>,
       State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
@@ -183,7 +189,9 @@ parse(hdvalue, <<?LF, Rest/binary>>,
                                },
     parse(headers, Rest, NState);
 parse(hdvalue, <<Ch:8, Rest/binary>>, State) ->
-    parse(hdvalue, Rest, acc(Ch, State)).
+    parse(hdvalue, Rest, acc(Ch, State));
+parse(hdvalue, <<>>, State) ->
+    {more, #{phase => hdvalue, state => State}}.
 
 %% @private
 parse(body, <<>>, State, Length) ->

+ 182 - 121
apps/emqx_gateway/test/emqx_coap_SUITE.erl

@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -include_lib("er_coap_client/include/coap.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -48,114 +53,115 @@ gateway.coap
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
+    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
     Config.
 
-set_special_cfg(emqx_gateway) ->
-    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT);
-
-set_special_cfg(_) ->
-    ok.
-
-end_per_suite(Config) ->
+end_per_suite(_) ->
     {ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]),
-    emqx_common_test_helpers:stop_apps([emqx_gateway]),
-    Config.
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
 
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------
-t_connection(_Config) ->
+
+t_connection(_) ->
     Action = fun(Channel) ->
-                     %% connection
-                     Token = connection(Channel),
+        %% connection
+        Token = connection(Channel),
 
-                     timer:sleep(100),
-                     ?assertNotEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
+        timer:sleep(100),
+        ?assertNotEqual(
+           [],
+           emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
 
-                     %% heartbeat
-                     HeartURI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token,
-                     ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
-                     {ok, changed, _} = er_coap_client:request(put, HeartURI),
+        %% heartbeat
+        HeartURI = ?MQTT_PREFIX ++
+                   "/connection?clientid=client1&token=" ++
+                   Token,
 
-                     disconnection(Channel, Token),
+        ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
+        {ok, changed, _} = er_coap_client:request(put, HeartURI),
 
-                     timer:sleep(100),
-                     ?assertEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
-             end,
-    do(Action).
+        disconnection(Channel, Token),
 
+        timer:sleep(100),
+        ?assertEqual(
+           [],
+           emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
+    end,
+    do(Action).
 
-t_publish(_Config) ->
+t_publish(_) ->
     Action = fun(Channel, Token) ->
-                     Topic = <<"/abc">>,
-                     Payload = <<"123">>,
-
-                     TopicStr = binary_to_list(Topic),
-                     URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-
-                     %% Sub topic first
-                     emqx:subscribe(Topic),
-
-                     Req = make_req(post, Payload),
-                     {ok, changed, _} = do_request(Channel, URI, Req),
-
-                     receive
-                         {deliver, Topic, Msg} ->
-                             ?assertEqual(Topic, Msg#message.topic),
-                             ?assertEqual(Payload, Msg#message.payload)
-                     after
-                         500 ->
-                             ?assert(false)
-                     end
-             end,
-
+        Topic = <<"/abc">>,
+        Payload = <<"123">>,
+
+        TopicStr = binary_to_list(Topic),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+
+        %% Sub topic first
+        emqx:subscribe(Topic),
+
+        Req = make_req(post, Payload),
+        {ok, changed, _} = do_request(Channel, URI, Req),
+
+        receive
+            {deliver, Topic, Msg} ->
+                ?assertEqual(Topic, Msg#message.topic),
+                ?assertEqual(Payload, Msg#message.payload)
+        after
+            500 ->
+                ?assert(false)
+        end
+    end,
     with_connection(Action).
 
-
-%t_publish_authz_deny(_Config) ->
+%t_publish_authz_deny(_) ->
 %    Action = fun(Channel, Token) ->
-%                     Topic = <<"/abc">>,
-%                     Payload = <<"123">>,
-%                     InvalidToken = lists:reverse(Token),
+%        Topic = <<"/abc">>,
+%        Payload = <<"123">>,
+%        InvalidToken = lists:reverse(Token),
 %
-%                     TopicStr = binary_to_list(Topic),
-%                     URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ InvalidToken,
+%        TopicStr = binary_to_list(Topic),
+%        URI = ?PS_PREFIX ++
+%              TopicStr ++
+%              "?clientid=client1&token=" ++ InvalidToken,
 %
-%                     %% Sub topic first
-%                     emqx:subscribe(Topic),
+%        %% Sub topic first
+%        emqx:subscribe(Topic),
 %
-%                     Req = make_req(post, Payload),
-%                     Result = do_request(Channel, URI, Req),
-%                     ?assertEqual({error, reset}, Result)
-%             end,
+%        Req = make_req(post, Payload),
+%        Result = do_request(Channel, URI, Req),
+%        ?assertEqual({error, reset}, Result)
+%    end,
 %
 %    with_connection(Action).
 
-t_subscribe(_Config) ->
+t_subscribe(_) ->
     Topic = <<"/abc">>,
     Fun = fun(Channel, Token) ->
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  %% Publish a message
-                  emqx:publish(emqx_message:make(Topic, Payload)),
-                  {ok, content, Notify} = with_response(Channel),
-                  ?LOGT("observer get Notif=~p", [Notify]),
+        %% Publish a message
+        emqx:publish(emqx_message:make(Topic, Payload)),
+        {ok, content, Notify} = with_response(Channel),
+        ?LOGT("observer get Notif=~p", [Notify]),
 
-                  #coap_content{payload = PayloadRecv} = Notify,
+        #coap_content{payload = PayloadRecv} = Notify,
 
-                  ?assertEqual(Payload, PayloadRecv)
-          end,
+        ?assertEqual(Payload, PayloadRecv)
+    end,
 
     with_connection(Fun),
     timer:sleep(100),
@@ -163,63 +169,117 @@ t_subscribe(_Config) ->
     ?assertEqual([], emqx:subscribers(Topic)).
 
 
-t_un_subscribe(_Config) ->
+t_un_subscribe(_) ->
     Topic = <<"/abc">>,
     Fun = fun(Channel, Token) ->
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  UnReq = make_req(get, Payload, [{observe, 1}]),
-                  {ok, nocontent, _} = do_request(Channel, URI, UnReq),
-                  ?LOGT("un observer topic:~ts~n", [Topic]),
-                  timer:sleep(100),
-                  ?assertEqual([], emqx:subscribers(Topic))
-          end,
+        UnReq = make_req(get, Payload, [{observe, 1}]),
+        {ok, nocontent, _} = do_request(Channel, URI, UnReq),
+        ?LOGT("un observer topic:~ts~n", [Topic]),
+        timer:sleep(100),
+        ?assertEqual([], emqx:subscribers(Topic))
+    end,
 
     with_connection(Fun).
 
-t_observe_wildcard(_Config) ->
+t_observe_wildcard(_) ->
     Fun = fun(Channel, Token) ->
-                  %% resolve_url can't process wildcard with #
-                  Topic = <<"/abc/+">>,
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        %% resolve_url can't process wildcard with #
+        Topic = <<"/abc/+">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  %% Publish a message
-                  PubTopic = <<"/abc/def">>,
-                  emqx:publish(emqx_message:make(PubTopic, Payload)),
-                  {ok, content, Notify} = with_response(Channel),
+        %% Publish a message
+        PubTopic = <<"/abc/def">>,
+        emqx:publish(emqx_message:make(PubTopic, Payload)),
+        {ok, content, Notify} = with_response(Channel),
 
-                  ?LOGT("observer get Notif=~p", [Notify]),
+        ?LOGT("observer get Notif=~p", [Notify]),
 
-                  #coap_content{payload = PayloadRecv} = Notify,
+        #coap_content{payload = PayloadRecv} = Notify,
 
-                  ?assertEqual(Payload, PayloadRecv)
-          end,
+        ?assertEqual(Payload, PayloadRecv)
+    end,
+
+    with_connection(Fun).
 
+t_clients_api(_) ->
+    Fun = fun(_Channel, _Token) ->
+        ClientId = <<"client1">>,
+        %% list
+        {200, #{data := [Client1]}} = request(get, "/gateway/coap/clients"),
+        #{clientid := ClientId} = Client1,
+        %% searching
+        {200, #{data := [Client2]}} =
+            request(get, "/gateway/coap/clients",
+                    [{<<"clientid">>, ClientId}]),
+        {200, #{data := [Client3]}} =
+            request(get, "/gateway/coap/clients",
+                    [{<<"like_clientid">>, <<"cli">>}]),
+        %% lookup
+        {200, Client4} =
+            request(get, "/gateway/coap/clients/client1"),
+        %% assert
+        Client1 = Client2 = Client3 = Client4,
+        %% kickout
+        {204, _} =
+            request(delete, "/gateway/coap/clients/client1"),
+        {200, #{data := []}} = request(get, "/gateway/coap/clients")
+    end,
     with_connection(Fun).
 
+t_clients_subscription_api(_) ->
+    Fun = fun(_Channel, _Token) ->
+        Path = "/gateway/coap/clients/client1/subscriptions",
+        %% list
+        {200, []} = request(get, Path),
+        %% create
+        SubReq = #{ topic => <<"tx">>
+                  , qos => 0
+                  , nl => 0
+                  , rap => 0
+                  , rh => 0
+                  },
+
+        {201, SubsResp} = request(post, Path, SubReq),
+        {200, [SubsResp2]} = request(get, Path),
+        ?assertEqual(
+           maps:get(topic, SubsResp),
+           maps:get(topic, SubsResp2)),
+
+        {204, _} = request(delete, Path ++ "/tx"),
+
+        {200, []} = request(get, Path)
+    end,
+    with_connection(Fun).
+
+%%--------------------------------------------------------------------
+%% helpers
+
 connection(Channel) ->
-    URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&username=admin&password=public",
+    URI = ?MQTT_PREFIX ++
+          "/connection?clientid=client1&username=admin&password=public",
     Req = make_req(post),
     {ok, created, Data} = do_request(Channel, URI, Req),
     #coap_content{payload = BinToken} = Data,
@@ -252,7 +312,8 @@ do_request(Channel, URI, #coap_message{options = Opts} = Req) ->
 
 with_response(Channel) ->
     receive
-        {coap_response, _ChId, Channel, _Ref, Message=#coap_message{method=Code}} ->
+        {coap_response, _ChId, Channel,
+         _Ref, Message=#coap_message{method=Code}} ->
             return_response(Code, Message);
         {coap_error, _ChId, Channel, _Ref, reset} ->
             {error, reset}
@@ -280,10 +341,10 @@ do(Fun) ->
 
 with_connection(Action) ->
     Fun = fun(Channel) ->
-                  Token = connection(Channel),
-                  timer:sleep(100),
-                  Action(Channel, Token),
-                  disconnection(Channel, Token),
-                  timer:sleep(100)
-          end,
+        Token = connection(Channel),
+        timer:sleep(100),
+        Action(Channel, Token),
+        disconnection(Channel, Token),
+        timer:sleep(100)
+    end,
     do(Fun).

+ 100 - 0
apps/emqx_gateway/test/emqx_gateway_SUITE.erl

@@ -0,0 +1,100 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([emqx_gateway]),
+    Conf.
+
+end_per_suite(_Conf) ->
+    emqx_common_test_helpers:stop_apps([emqx_gateway]).
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_registered_gateway(_) ->
+    [{coap, #{cbkmod := emqx_coap_impl}},
+     {exproto, #{cbkmod := emqx_exproto_impl}},
+     {lwm2m, #{cbkmod := emqx_lwm2m_impl}},
+     {mqttsn, #{cbkmod := emqx_sn_impl}},
+     {stomp, #{cbkmod := emqx_stomp_impl}}] =  emqx_gateway:registered_gateway().
+
+t_load_unload_list_lookup(_) ->
+    {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}),
+    ?assertEqual(
+       {error, alredy_existed},
+       emqx_gateway:load(?GWNAME, #{})),
+    ?assertEqual(
+       {error, {unknown_gateway_name, bad_gw_name}},
+       emqx_gateway:load(bad_gw_name, #{})),
+
+    ?assertEqual(1, length(emqx_gateway:list())),
+    ?assertEqual(
+       emqx_gateway:lookup(?GWNAME),
+       lists:nth(1, emqx_gateway:list())),
+
+    ?assertEqual(ok, emqx_gateway:unload(?GWNAME)),
+    ?assertEqual({error, not_found}, emqx_gateway:unload(?GWNAME)).
+
+t_start_stop_update(_) ->
+    {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}),
+
+    #{status := running} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:stop(?GWNAME),
+    {error, already_stopped} = emqx_gateway:stop(?GWNAME),
+
+    #{status := stopped} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:update(
+           ?GWNAME, #{enable => false, idle_timeout => 2000}),
+    #{status := stopped,
+      config := #{idle_timeout := 2000}} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:update(
+           ?GWNAME, #{enable => true, idle_timeout => 3000}),
+    #{status := running,
+      config := #{idle_timeout := 3000}} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:update(
+           ?GWNAME, #{enable => false, idle_timeout => 4000}),
+    #{status := stopped,
+      config := #{idle_timeout := 4000}} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:start(?GWNAME),
+    #{status := running,
+      config := #{idle_timeout := 4000}} = emqx_gateway:lookup(?GWNAME),
+
+    {error, already_started} = emqx_gateway:start(?GWNAME),
+    ok.

+ 11 - 9
apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl

@@ -30,9 +30,7 @@
 
 %% this parses to #{}, will not cause config cleanup
 %% so we will need call emqx_config:erase
--define(CONF_DEFAULT, <<"
-gateway {}
-">>).
+-define(CONF_DEFAULT, <<"gateway {}">>).
 
 %%--------------------------------------------------------------------
 %% Setup
@@ -307,6 +305,10 @@ t_listeners_authn(_) ->
 
     {200, ConfResp3} = request(get, Path),
     assert_confs(AuthConf2, ConfResp3),
+
+    {204, _} = request(delete, Path),
+    %% FIXME: 204?
+    {204, _} = request(get, Path),
     {204, _} = request(delete, "/gateway/stomp").
 
 t_listeners_authn_data_mgmt(_) ->
@@ -340,32 +342,32 @@ t_listeners_authn_data_mgmt(_) ->
     {200,
      #{data := [UserRespd1]} } = request(
                                    get,
-                                   "/gateway/stomp/listeners/stomp:tcp:def/authentication/users"),
+                                   Path ++ "/users"),
     assert_confs(UserRespd1, User1),
 
     {200, UserRespd2} = request(
                           get,
-                          "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"),
+                          Path ++ "/users/test"),
     assert_confs(UserRespd2, User1),
 
     {200, UserRespd3} = request(
                           put,
-                          "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test",
+                          Path ++ "/users/test",
                           #{password => <<"654321">>, is_superuser => true}),
     assert_confs(UserRespd3, User1#{is_superuser => true}),
 
     {200, UserRespd4} = request(
                           get,
-                          "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"),
+                          Path ++ "/users/test"),
     assert_confs(UserRespd4, User1#{is_superuser => true}),
 
     {204, _} = request(
                  delete,
-                 "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"),
+                 Path ++ "/users/test"),
 
     {200, #{data := []}} = request(
                              get,
-                             "/gateway/stomp/listeners/stomp:tcp:def/authentication/users"),
+                             Path ++ "/users"),
     {204, _} = request(delete, "/gateway/stomp").
 
 %%--------------------------------------------------------------------

+ 154 - 16
apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl

@@ -29,6 +29,23 @@
 gateway {}
 ">>).
 
+%% The config with json format for mqtt-sn gateway
+-define(CONF_MQTTSN, "
+{\"idle_timeout\": \"30s\",
+ \"enable_stats\": true,
+ \"mountpoint\": \"mqttsn/\",
+ \"gateway_id\": 1,
+ \"broadcast\": true,
+ \"enable_qos3\": true,
+ \"predefined\": [{\"id\": 1001, \"topic\": \"pred/a\"}],
+ \"listeners\":
+    [{\"type\": \"udp\",
+      \"name\": \"ct\",
+      \"bind\": \"1884\"
+    }]
+}
+").
+
 %%--------------------------------------------------------------------
 %% Setup
 %%--------------------------------------------------------------------
@@ -107,34 +124,144 @@ t_gateway_list(_) ->
       "Gateway(name=lwm2m, status=unloaded)\n"
       "Gateway(name=mqttsn, status=unloaded)\n"
       "Gateway(name=stomp, status=unloaded)\n"
-      , acc_print()).
+      , acc_print()),
 
-t_gateway_load(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
 
-t_gateway_unload(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["list"]),
+    %% TODO: assert it.
+    _ = acc_print(),
 
-t_gateway_start(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
-t_gateway_stop(_) ->
-    ok.
+t_gateway_load_unload_lookup(_) ->
+    emqx_gateway_cli:gateway(["lookup", "mqttsn"]),
+    ?assertEqual("undefined\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
+
+    %% TODO: bad config name, format???
+
+    emqx_gateway_cli:gateway(["lookup", "mqttsn"]),
+    %% TODO: assert it. for example:
+    %% name: mqttsn
+    %% status: running
+    %% created_at: 2022-01-05T14:40:20.039+08:00
+    %% started_at: 2022-01-05T14:42:37.894+08:00
+    %% config: #{broadcast => false,enable => true,enable_qos3 => true,
+    %%           enable_stats => true,gateway_id => 1,idle_timeout => 30000,
+    %%           mountpoint => <<>>,predefined => []}
+    _ = acc_print(),
+
+    emqx_gateway_cli:gateway(["load", "mqttsn", "{}"]),
+    ?assertEqual(
+        "Error: The mqttsn gateway already loaded\n"
+        , acc_print()),
+
+    emqx_gateway_cli:gateway(["load", "bad-gw-name", "{}"]),
+    %% TODO: assert it. for example:
+    %% Error: Illegal gateway name
+    _ = acc_print(),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+    %% Always return ok, even the gateway has unloaded
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["lookup", "mqttsn"]),
+    ?assertEqual("undefined\n", acc_print()).
+
+t_gateway_start_stop(_) ->
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["stop", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+    %% dupliacted stop gateway, return ok
+    emqx_gateway_cli:gateway(["stop", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["start", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+    %% dupliacted start gateway, return ok
+    emqx_gateway_cli:gateway(["start", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
 t_gateway_clients_usage(_) ->
-    ok.
+    ?assertEqual(
+       ["gateway-clients list   <Name>            "
+            "# List all clients for a gateway\n",
+        "gateway-clients lookup <Name> <ClientId> "
+            "# Lookup the Client Info for specified client\n",
+        "gateway-clients kick   <Name> <ClientId> "
+            "# Kick out a client\n"],
+       emqx_gateway_cli:'gateway-clients'(usage)
+     ).
 
-t_gateway_clients_list(_) ->
-    ok.
+t_gateway_clients(_) ->
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
 
-t_gateway_clients_lookup(_) ->
-    ok.
+    Socket = sn_client_connect(<<"client1">>),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]),
+    ClientDesc1 = acc_print(),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["lookup", "mqttsn", "client1"]),
+    ClientDesc2 = acc_print(),
+    ?assertEqual(ClientDesc1, ClientDesc2),
+
+    sn_client_disconnect(Socket),
+    timer:sleep(500),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["lookup", "mqttsn", "bad-client"]),
+    ?assertEqual("Not Found.\n", acc_print()),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["lookup", "bad-gw", "bad-client"]),
+    ?assertEqual("Bad Gateway Name.\n", acc_print()),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]),
+    %% no print for empty client list
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "bad-gw"]),
+    ?assertEqual("Bad Gateway Name.\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
 t_gateway_clients_kick(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
+
+    Socket = sn_client_connect(<<"client1">>),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]),
+    _ = acc_print(),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["kick", "mqttsn", "bad-client"]),
+    ?assertEqual("Not Found.\n", acc_print()),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["kick", "mqttsn", "client1"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    sn_client_disconnect(Socket),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
 t_gateway_metrcis_usage(_) ->
-    ok.
+    ?assertEqual(
+       [ "gateway-metrics <Name> "
+            "# List all metrics for a gateway\n"],
+       emqx_gateway_cli:'gateway-metrics'(usage)
+     ).
 
 t_gateway_metrcis(_) ->
     ok.
@@ -148,3 +275,14 @@ acc_print(Acc) ->
     after 200 ->
         Acc
     end.
+
+sn_client_connect(ClientId) ->
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    _ = emqx_sn_protocol_SUITE:send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, 16#05, 0>>,
+                 emqx_sn_protocol_SUITE:receive_response(Socket)),
+    Socket.
+
+sn_client_disconnect(Socket) ->
+    _ = emqx_sn_protocol_SUITE:send_disconnect_msg(Socket, undefined),
+    gen_udp:close(Socket), ok.

+ 248 - 0
apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl

@@ -0,0 +1,248 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_cm_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(CLIENTID, <<"client1">>).
+
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([]),
+
+    ok = meck:new(emqx_gateway_metrics, [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_gateway_metrics, inc, fun(_, _) -> ok end),
+    Conf.
+
+end_per_suite(_Conf) ->
+    meck:unload(emqx_gateway_metrics),
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Conf) ->
+    process_flag(trap_exit, true),
+    {ok, CMPid} = emqx_gateway_cm:start_link([{gwname, ?GWNAME}]),
+    [{cm, CMPid} | Conf].
+
+end_per_testcase(_TestCase, Conf) ->
+    CMPid = proplists:get_value(cm, Conf),
+    gen_server:stop(CMPid),
+    process_flag(trap_exit, false),
+    Conf.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_open_session(_) ->
+    {error, not_supported_now} = emqx_gateway_cm:open_session(
+                                 ?GWNAME, false, clientinfo(), conninfo(),
+                                 fun(_, _) -> #{} end),
+
+    {ok, SessionRes} = emqx_gateway_cm:open_session(
+                         ?GWNAME, true, clientinfo(), conninfo(),
+                         fun(_, _) -> #{no => 1} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 1}}, SessionRes),
+
+    %% assert1. check channel infos in ets table
+    Chann = {?CLIENTID, self()},
+    ?assertEqual(
+       [Chann],
+       ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
+    ?assertEqual(
+       [{Chann, ?MODULE}],
+       ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
+
+    %% assert2. discard the presented session
+
+    {ok, SessionRes2} = emqx_gateway_cm:open_session(
+                          ?GWNAME, true, clientinfo(), conninfo(),
+                          fun(_, _) -> #{no => 2} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 2}}, SessionRes2),
+
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+    ?assertEqual(
+       1,
+       ets:info(emqx_gateway_cm:tabname(info, ?GWNAME), size)),
+
+    receive
+        discard ->
+            emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
+            emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
+    after 100 ->
+        ?assert(false, "waiting discard msg timeout")
+    end,
+
+    %% assert3. no channel infos in ets table
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))).
+
+t_get_set_chan_info_stats(_) ->
+    {ok, SessionRes} = emqx_gateway_cm:open_session(
+                         ?GWNAME, true, clientinfo(), conninfo(),
+                         fun(_, _) -> #{no => 1} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 1}}, SessionRes),
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+
+    %% Info: get/set
+    NInfo = #{newinfo => true},
+    emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
+    ?assertEqual(
+       NInfo,
+       emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID)),
+    ?assertEqual(
+       NInfo,
+       emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID, self())),
+    %% Stats: get/set
+    NStats = [{newstats, true}],
+    emqx_gateway_cm:set_chan_stats(?GWNAME, ?CLIENTID, NStats),
+    ?assertEqual(
+       NStats,
+       emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID)),
+    ?assertEqual(
+       NStats,
+       emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID, self())),
+
+    emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
+    emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID).
+
+t_handle_process_down(Conf) ->
+    Pid = proplists:get_value(cm, Conf),
+
+    {ok, SessionRes} = emqx_gateway_cm:open_session(
+                         ?GWNAME, true, clientinfo(), conninfo(),
+                         fun(_, _) -> #{no => 1} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 1}}, SessionRes),
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+
+    _ = Pid ! {'DOWN', mref, process, self(), normal},
+
+    timer:sleep(200), %% wait the asycn clear task
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))).
+
+t_kick_session(_) ->
+    %% session1
+    {ok, _} = emqx_gateway_cm:open_session(
+                ?GWNAME, true, clientinfo(), conninfo(),
+                fun(_, _) -> #{no => 1} end),
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+
+    %% meck `lookup_channels`
+    Self = self(),
+    ok = meck:new(emqx_gateway_cm_registry,
+                  [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_gateway_cm_registry, lookup_channels,
+                     fun(_, ?CLIENTID) -> [Self, Self] end),
+
+    ok = emqx_gateway_cm:kick_session(?GWNAME, ?CLIENTID),
+
+    receive discard -> ok
+    after 100 -> ?assert(false, "waiting discard msg timeout")
+    end,
+    receive
+        kick ->
+            emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
+            emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
+    after
+        100 ->
+            ?assert(false, "waiting kick msg timeout")
+    end,
+    meck:unload(emqx_gateway_cm_registry).
+
+t_unexpected_handle(Conf) ->
+    Pid = proplists:get_value(cm, Conf),
+    _ = Pid ! unexpected_info,
+    ok = gen_server:call(Pid, unexpected_call),
+    ok = gen_server:cast(Pid, unexpected_cast).
+
+%%--------------------------------------------------------------------
+%% helpers
+
+clientinfo() ->
+    #{ clientid => ?CLIENTID
+     , is_bridge => false
+     , is_superuser => false
+     , listener => 'mqttsn:udp:default'
+     , mountpoint => <<"mqttsn/">>
+     , peerhost => {127, 0, 0, 1}
+     , protocol => 'mqtt-sn'
+     , sockport => 1884
+     , username => undefined
+     , zone => default
+     }.
+
+conninfo() ->
+    #{ clean_start => true
+     , clientid => ?CLIENTID
+     , conn_mod => ?MODULE
+     , connected_at => 1641805544652
+     , expiry_interval => 0
+     , keepalive => 10
+     , peercert => nossl
+     , peername => {{127, 0, 0, 1}, 64810}
+     , proto_name => <<"MQTT-SN">>
+     , proto_ver => <<"1.2">>
+     , sockname => {{0, 0, 0, 0}, 1884}
+     , socktype => udp
+     }.
+
+%%--------------------------------------------------------------------
+%% connection module mock
+
+call(ConnPid, discard, _) ->
+    ConnPid ! discard, ok;
+call(ConnPid, kick, _) ->
+    ConnPid ! kick, ok.

+ 97 - 0
apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl

@@ -0,0 +1,97 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_cm_registry_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(CLIENTID, <<"client1">>).
+
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([]),
+    Conf.
+
+end_per_suite(_Conf) ->
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Conf) ->
+    {ok, Pid} = emqx_gateway_cm_registry:start_link(?GWNAME),
+    [{registry, Pid} | Conf].
+
+end_per_testcase(_TestCase, Conf) ->
+    Pid = proplists:get_value(registry, Conf),
+    gen_server:stop(Pid),
+    Conf.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_tabname(_) ->
+    ?assertEqual(
+       emqx_gateway_gw_name_channel_registry,
+       emqx_gateway_cm_registry:tabname(gw_name)).
+
+t_register_unregister_channel(_) ->
+    ok = emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID),
+    ?assertEqual(
+       [{channel, ?CLIENTID, self()}],
+       ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))),
+
+    ?assertEqual(
+       [self()],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)),
+
+    ok = emqx_gateway_cm_registry:unregister_channel(?GWNAME, ?CLIENTID),
+
+    ?assertEqual(
+       [], 
+       ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))),
+    ?assertEqual(
+       [],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
+
+t_cleanup_channels(Conf) ->
+    Pid = proplists:get_value(registry, Conf),
+    emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID),
+    ?assertEqual(
+       [self()],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)),
+    Pid ! {membership, {mnesia, down, node()}},
+    ct:sleep(100),
+    ?assertEqual(
+       [],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
+
+t_handle_unexpected_msg(Conf) ->
+    Pid = proplists:get_value(registry, Conf),
+    _ = Pid ! unexpected_info,
+    ok = gen_server:cast(Pid, unexpected_cast),
+    ignored = gen_server:call(Pid, unexpected_call).

+ 67 - 0
apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl

@@ -0,0 +1,67 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_ctx_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_access_control, authenticate,
+                     fun(#{clientid := bad_client}) ->
+                             {error, bad_username_or_password};
+                        (ClientInfo) -> {ok, ClientInfo}
+                     end),
+    Conf.
+
+end_per_suite(_Conf) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_authenticate(_) ->
+    Ctx = #{gwname => mqttsn, auth => [], cm => self()},
+    Info1 = #{ mountpoint => undefined
+             , clientid => <<"user1">>
+             },
+    NInfo1 = zone(Info1),
+    ?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
+
+    Info2 = #{ mountpoint => <<"mqttsn/${clientid}/">> 
+             , clientid => <<"user1">>
+             },
+    NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}),
+    ?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
+
+    Info3 = #{ mountpoint => <<"mqttsn/${clientid}/">> 
+             , clientid => bad_client
+             },
+    {error, bad_username_or_password}
+        = emqx_gateway_ctx:authenticate(Ctx, Info3),
+    ok.
+
+zone(Info) -> Info#{zone => default}.

+ 76 - 0
apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl

@@ -0,0 +1,76 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_metrics_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(METRIC, 'ct.test.metrics_name').
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([]),
+    Conf.
+
+end_per_suite(_Conf) ->
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Conf) ->
+    {ok, Pid} = emqx_gateway_metrics:start_link(?GWNAME),
+    [{metrics, Pid} | Conf].
+
+end_per_testcase(_TestCase, Conf) ->
+    Pid = proplists:get_value(metrics, Conf),
+    gen_server:stop(Pid),
+    Conf.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_inc_dec(_) ->
+    ok = emqx_gateway_metrics:inc(?GWNAME, ?METRIC),
+    ok = emqx_gateway_metrics:inc(?GWNAME, ?METRIC),
+
+    ?assertEqual(
+      [{?METRIC, 2}],
+      emqx_gateway_metrics:lookup(?GWNAME)),
+
+    ok = emqx_gateway_metrics:dec(?GWNAME, ?METRIC),
+    ok = emqx_gateway_metrics:dec(?GWNAME, ?METRIC),
+
+    ?assertEqual(
+      [{?METRIC, 0}],
+      emqx_gateway_metrics:lookup(?GWNAME)).
+
+t_handle_unexpected_msg(Conf) ->
+    Pid = proplists:get_value(metrics, Conf),
+    _ = Pid ! unexpected_info,
+    ok = gen_server:cast(Pid, unexpected_cast),
+    ok = gen_server:call(Pid, unexpected_call),
+    ok.

+ 6 - 0
apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl

@@ -57,7 +57,13 @@ t_load_unload(_) ->
 
     {error, already_existed} = emqx_gateway_registry:reg(test, [{cbkmod, ?MODULE}]),
 
+    ok = emqx_gateway_registry:unreg(test),
     ok = emqx_gateway_registry:unreg(test),
     undefined = emqx_gateway_registry:lookup(test),
     OldCnt = length(emqx_gateway_registry:list()),
     ok.
+
+t_handle_unexpected_msg(_) ->
+    _ = emqx_gateway_registry ! unexpected_info,
+    ok = gen_server:cast(emqx_gateway_registry, unexpected_cast),
+    ok = gen_server:call(emqx_gateway_registry, unexpected_call).

+ 2 - 0
apps/emqx_gateway/test/emqx_gateway_test_utils.erl

@@ -117,6 +117,8 @@ req(Path, Qs) ->
 req(Path, Qs, Body) ->
     {url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}.
 
+url(Path, []) ->
+    lists:concat([?http_api_host, Path]);
 url(Path, Qs) ->
     lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]).
 

+ 78 - 3
apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl

@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -define(PORT, 5783).
 
 -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
@@ -66,9 +71,9 @@ all() ->
     , {group, test_grp_4_discover}
     , {group, test_grp_5_write_attr}
     , {group, test_grp_6_observe}
-
       %% {group, test_grp_8_object_19}
     , {group, test_grp_9_psm_queue_mode}
+    , {group, test_grp_10_rest_api}
     ].
 
 suite() -> [{timetrap, {seconds, 90}}].
@@ -147,21 +152,29 @@ groups() ->
       [
        case90_psm_mode,
        case90_queue_mode
+      ]},
+     {test_grp_10_rest_api, [RepeatOpt],
+      [
+       case100_clients_api,
+       case100_subscription_api
       ]}
     ].
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_conf]),
+    %% load application first for minirest api searching
+    application:load(emqx_gateway),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf]),
     Config.
 
 end_per_suite(Config) ->
     timer:sleep(300),
     {ok, _} = emqx_conf:remove([<<"gateway">>,<<"lwm2m">>], #{}),
-    emqx_common_test_helpers:stop_apps([emqx_conf]),
+    emqx_mgmt_api_test_util:end_suite([emqx_conf]),
     Config.
 
 init_per_testcase(_AllTestCase, Config) ->
     ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+
     {ok, _} = application:ensure_all_started(emqx_gateway),
     {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),
 
@@ -1887,6 +1900,68 @@ server_cache_mode(Config, RegOption) ->
     verify_read_response_1(2, UdpSock),
     verify_read_response_1(3, UdpSock).
 
+case100_clients_api(Config) ->
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    UdpSock = ?config(sock, Config),
+    ObjectList = <<"</1>, </2>, </3/0>, </4>, </5>">>,
+    RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
+    std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
+
+    %% list
+    {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"),
+    %% searching
+    {200, #{data := [Client2]}} =
+        request(get, "/gateway/lwm2m/clients",
+                [{<<"endpoint_name">>, list_to_binary(Epn)}]),
+    {200, #{data := [Client3]}} =
+        request(get, "/gateway/lwm2m/clients",
+                [{<<"like_endpoint_name">>, list_to_binary(Epn)},
+                 {<<"gte_lifetime">>, <<"1">>}
+                ]),
+    %% lookup
+    ClientId = maps:get(clientid, Client1),
+    {200, Client4} =
+        request(get, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    %% assert
+    Client1 = Client2 = Client3 = Client4,
+    %% kickout
+    {204, _} =
+        request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    {200, #{data := []}} = request(get, "/gateway/lwm2m/clients").
+
+case100_subscription_api(Config) ->
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    UdpSock = ?config(sock, Config),
+    ObjectList = <<"</1>, </2>, </3/0>, </4>, </5>">>,
+    RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
+    std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
+
+    {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"),
+    ClientId = maps:get(clientid, Client1),
+    Path = "/gateway/lwm2m/clients/" ++
+            binary_to_list(ClientId) ++
+            "/subscriptions",
+
+    %% list
+    {200, [InitSub]} = request(get, Path),
+    ?assertEqual(
+       <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/#">>,
+       maps:get(topic, InitSub)),
+
+    %% create
+    SubReq = #{ topic => <<"tx">>
+              , qos => 1
+              , nl => 0
+              , rap => 0
+              , rh => 0
+              },
+    {201, _} = request(post, Path, SubReq),
+    {200, _} = request(get, Path),
+    {204, _} = request(delete, Path ++ "/tx"),
+    {200, [InitSub]} = request(get, Path).
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %%% Internal Functions
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 74 - 5
apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl

@@ -14,11 +14,16 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module (emqx_sn_protocol_SUITE).
+-module(emqx_sn_protocol_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -include("src/mqttsn/include/emqx_sn.hrl").
 
 -include_lib("eunit/include/eunit.hrl").
@@ -27,7 +32,6 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
-
 -define(HOST, {127,0,0,1}).
 -define(PORT, 1884).
 
@@ -85,12 +89,12 @@ all() ->
 
 init_per_suite(Config) ->
     ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
-    emqx_common_test_helpers:start_apps([emqx_gateway]),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
     Config.
 
 end_per_suite(_) ->
     {ok, _} = emqx:remove_config([gateway, mqttsn]),
-    emqx_common_test_helpers:stop_apps([emqx_gateway]).
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
 
 %%--------------------------------------------------------------------
 %% Test cases
@@ -644,7 +648,6 @@ t_publish_qos0_case05(_) ->
 
     gen_udp:close(Socket).
 
-
 t_publish_qos0_case06(_) ->
     Dup = 0,
     QoS = 0,
@@ -1762,6 +1765,72 @@ t_broadcast_test1(_) ->
     timer:sleep(600),
     gen_udp:close(Socket).
 
+t_socket_passvice(_) ->
+    %% TODO: test this gateway enter the passvie event
+    ok.
+
+t_clients_api(_) ->
+    TsNow = emqx_gateway_utils:unix_ts_to_rfc3339(
+              erlang:system_time(millisecond)),
+    ClientId = <<"client_id_test1">>,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    %% list
+    {200, #{data := [Client1]}} = request(get, "/gateway/mqttsn/clients"),
+    #{clientid := ClientId} = Client1,
+    %% searching
+    {200, #{data := [Client2]}} =
+        request(get, "/gateway/mqttsn/clients", [{<<"clientid">>, ClientId}]),
+    {200, #{data := [Client3]}} =
+        request(get, "/gateway/mqttsn/clients",
+                [{<<"like_clientid">>, <<"test1">>},
+                 {<<"proto_ver">>, <<"1.2">>},
+                 {<<"ip_address">>, <<"127.0.0.1">>},
+                 {<<"conn_state">>, <<"connected">>},
+                 {<<"clean_start">>, <<"true">>},
+                 {<<"gte_connected_at">>, TsNow}
+                ]),
+    %% lookup
+    {200, Client4} =
+        request(get, "/gateway/mqttsn/clients/client_id_test1"),
+    %% assert
+    Client1 = Client2 = Client3 = Client4,
+    %% kickout
+    {204, _} =
+        request(delete, "/gateway/mqttsn/clients/client_id_test1"),
+    {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"),
+
+    send_disconnect_msg(Socket, undefined),
+    gen_udp:close(Socket).
+
+t_clients_subscription_api(_) ->
+    ClientId = <<"client_id_test1">>,
+    Path = "/gateway/mqttsn/clients/client_id_test1/subscriptions",
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    %% list
+    {200, []} = request(get, Path),
+    %% create
+    SubReq = #{ topic => <<"tx">>
+              , qos => 1
+              , nl => 0
+              , rap => 0
+              , rh => 0
+              },
+    {201, SubsResp} = request(post, Path, SubReq),
+
+    {200, [SubsResp]} = request(get, Path),
+
+    {204, _} = request(delete, Path ++ "/tx"),
+
+    {200, []} = request(get, Path),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket).
+
 %%--------------------------------------------------------------------
 %% Helper funcs
 %%--------------------------------------------------------------------

+ 34 - 0
apps/emqx_gateway/test/emqx_stomp_SUITE.erl

@@ -351,6 +351,40 @@ t_ack(_) ->
                                   body    = _}, _, _} = parse(Data4)
     end).
 
+t_1000_msg_send(_) ->
+    with_connection(fun(Sock) ->
+        gen_tcp:send(Sock, serialize(<<"CONNECT">>,
+                                     [{<<"accept-version">>, ?STOMP_VER},
+                                      {<<"host">>, <<"127.0.0.1:61613">>},
+                                      {<<"login">>, <<"guest">>},
+                                      {<<"passcode">>, <<"guest">>},
+                                      {<<"heart-beat">>, <<"0,0">>}])),
+        {ok, Data} = gen_tcp:recv(Sock, 0),
+        {ok, #stomp_frame{command = <<"CONNECTED">>,
+                          headers = _,
+                          body    = _}, _, _} = parse(Data),
+
+        Topic = <<"/queue/foo">>,
+        SendFun = fun() ->
+            gen_tcp:send(Sock, serialize(<<"SEND">>,
+                                        [{<<"destination">>, Topic}],
+                                        <<"msgtest">>))
+        end,
+
+        RecvFun = fun() ->
+            receive
+                {deliver, Topic, _Msg}->
+                    ok
+            after 100 ->
+                      ?assert(false, "waiting message timeout")
+            end
+        end,
+
+        emqx:subscribe(Topic),
+        lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)),
+        lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
+    end).
+
 t_rest_clienit_info(_) ->
     with_connection(fun(Sock) ->
         gen_tcp:send(Sock, serialize(<<"CONNECT">>,

+ 45 - 0
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -35,6 +35,15 @@
 
 -export([do_query/6]).
 
+-export([ ensure_timestamp_format/2
+        ]).
+
+-export([ unix_ts_to_rfc3339_bin/1
+        , unix_ts_to_rfc3339_bin/2
+        , time_string_to_unix_ts_int/1
+        , time_string_to_unix_ts_int/2
+        ]).
+
 paginate(Tables, Params, {Module, FormatFun}) ->
     Qh = query_handle(Tables),
     Count = count(Tables),
@@ -401,6 +410,7 @@ to_integer(B) when is_binary(B) ->
 to_timestamp(I) when is_integer(I) ->
     I;
 to_timestamp(B) when is_binary(B) ->
+
     binary_to_integer(B).
 
 aton(B) when is_binary(B) ->
@@ -412,6 +422,41 @@ to_ip_port(IPAddress) ->
     Port = list_to_integer(Port0),
     {IP, Port}.
 
+%%--------------------------------------------------------------------
+%% time format funcs
+
+ensure_timestamp_format(Qs, TimeKeys)
+  when is_map(Qs);
+       is_list(TimeKeys) ->
+    Fun = fun (Key, NQs) ->
+        case NQs of
+            %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
+            %% or "1609430400000" (in millisecond)
+            #{Key := TimeString} ->
+                NQs#{Key => time_string_to_unix_ts_int(TimeString)};
+            #{} -> NQs
+        end
+    end,
+    lists:foldl(Fun, Qs, TimeKeys).
+
+unix_ts_to_rfc3339_bin(TimeStamp) ->
+    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
+
+unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
+    list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
+
+time_string_to_unix_ts_int(DateTime) ->
+    time_string_to_unix_ts_int(DateTime, millisecond).
+
+time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
+    try binary_to_integer(DateTime) of
+        TimeStamp when is_integer(TimeStamp) -> TimeStamp
+    catch
+        error:badarg ->
+            calendar:rfc3339_to_system_time(
+              binary_to_list(DateTime), [{unit, Unit}])
+    end.
+
 %%--------------------------------------------------------------------
 %% EUnits
 %%--------------------------------------------------------------------

+ 8 - 43
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -44,14 +44,6 @@
 %% for batch operation
 -export([do_subscribe/3]).
 
-%% for test suite
--export([ unix_ts_to_rfc3339_bin/1
-        , unix_ts_to_rfc3339_bin/2
-        , time_string_to_unix_ts_int/1
-        , time_string_to_unix_ts_int/2
-        ]).
-
-
 -define(CLIENT_QS_SCHEMA, {emqx_channel_info,
     [ {<<"node">>, atom}
     , {<<"username">>, binary}
@@ -463,7 +455,7 @@ keepalive_api() ->
 %%%==============================================================================================
 %% parameters trans
 clients(get, #{query_string := Qs}) ->
-    list(generate_qs(Qs)).
+    list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())).
 
 client(get, #{bindings := Bindings}) ->
     lookup(Bindings);
@@ -625,7 +617,8 @@ do_unsubscribe(ClientID, Topic) ->
     end.
 
 %%--------------------------------------------------------------------
-%% QueryString Generation (try rfc3339 to timestamp or keep timestamp)
+%% QueryString data-fomrat convert
+%%  (try rfc3339 to timestamp or keep timestamp)
 
 time_keys() ->
     [ <<"gte_created_at">>
@@ -633,18 +626,6 @@ time_keys() ->
     , <<"gte_connected_at">>
     , <<"lte_connected_at">>].
 
-generate_qs(Qs) ->
-    Fun =
-        fun (Key, NQs) ->
-                case NQs of
-                    %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
-                    %% or "1609430400000" (in millisecond)
-                    #{Key := TimeString} -> NQs#{Key => time_string_to_unix_ts_int(TimeString)};
-                    #{}                  -> NQs
-                end
-        end,
-    lists:foldl(Fun, Qs, time_keys()).
-
 %%--------------------------------------------------------------------
 %% Query Functions
 
@@ -778,8 +759,11 @@ take_maps_from_inner(Key, Value, Current) ->
 
 result_format_time_fun(Key, NClientInfoMap) ->
     case NClientInfoMap of
-        #{Key := TimeStamp} -> NClientInfoMap#{Key => unix_ts_to_rfc3339_bin(TimeStamp)};
-        #{}                 -> NClientInfoMap
+        #{Key := TimeStamp} ->
+            NClientInfoMap#{
+              Key => emqx_mgmt_api:unix_ts_to_rfc3339_bin(TimeStamp)};
+        #{} ->
+            NClientInfoMap
     end.
 
 -spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}).
@@ -795,22 +779,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
        updated_time => Timestamp
      }.
 
-%%--------------------------------------------------------------------
-%% time format funcs
-
-unix_ts_to_rfc3339_bin(TimeStamp) ->
-    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
-
-unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
-    list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
-
-time_string_to_unix_ts_int(DateTime) ->
-    time_string_to_unix_ts_int(DateTime, millisecond).
-
-time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
-    try binary_to_integer(DateTime) of
-        TimeStamp when is_integer(TimeStamp) -> TimeStamp
-    catch
-        error:badarg ->
-            calendar:rfc3339_to_system_time(binary_to_list(DateTime), [{unit, Unit}])
-    end.

+ 3 - 3
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -129,7 +129,7 @@ t_query_clients_with_time(_) ->
     NowTimeStampInt = erlang:system_time(millisecond),
     %% Do not uri_encode `=` to `%3D`
     Rfc3339String   = emqx_http_lib:uri_encode(binary:bin_to_list(
-        emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
+        emqx_mgmt_api:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
     TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),
 
     LteKeys         = ["lte_created_at=", "lte_connected_at="],
@@ -147,10 +147,10 @@ t_query_clients_with_time(_) ->
                        || {ok, Response} <- RequestResults],
     {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults),
     %% EachData :: list()
-    [?assert( emqx_mgmt_api_clients:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt)
+    [?assert( emqx_mgmt_api:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt)
      || #{<<"data">> := EachData} <- LteResponseDecodeds,
         #{<<"created_at">> := CreatedAt}     <- EachData],
-    [?assert(emqx_mgmt_api_clients:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt)
+    [?assert(emqx_mgmt_api:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt)
      || #{<<"data">> := EachData} <- LteResponseDecodeds,
         #{<<"connected_at">> := ConnectedAt} <- EachData],
     [?assertEqual(EachData, [])