Bladeren bron

refactor(gw): simplify massive repeated codes

JianBo He 4 jaren geleden
bovenliggende
commit
8203b1f328

+ 24 - 69
apps/emqx_gateway/src/coap/emqx_coap_impl.erl

@@ -16,9 +16,16 @@
 
 -module(emqx_coap_impl).
 
+-behaviour(emqx_gateway_impl).
+
+-include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_gateway/include/emqx_gateway.hrl").
 
--behaviour(emqx_gateway_impl).
+-import(emqx_gateway_utils,
+        [ normalize_config/1
+        , start_listeners/4
+        , stop_listeners/2
+        ]).
 
 %% APIs
 -export([ reg/0
@@ -30,8 +37,6 @@
         , on_gateway_unload/2
         ]).
 
--include_lib("emqx/include/logger.hrl").
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -51,12 +56,20 @@ unreg() ->
 on_gateway_load(_Gateway = #{name := GwName,
                              config := Config
                             }, Ctx) ->
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    ListenerPids = lists:map(fun(Lis) ->
-                                     start_listener(GwName, Ctx, Lis)
-                             end, Listeners),
-
-    {ok, ListenerPids,  #{ctx => Ctx}}.
+    Listeners = normalize_config(Config),
+    ModCfg = #{frame_mod => emqx_coap_frame,
+               chann_mod => emqx_coap_channel
+              },
+    case start_listeners(
+           Listeners, GwName, Ctx, ModCfg) of
+        {ok, ListenerPids} ->
+            {ok, ListenerPids,  #{ctx => Ctx}};
+        {error, {Reason, Listener}} ->
+            throw({badconf, #{ key => listeners
+                             , vallue => Listener
+                             , reason => Reason
+                             }})
+    end.
 
 on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
     GwName = maps:get(name, Gateway),
@@ -76,63 +89,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
 on_gateway_unload(_Gateway = #{ name := GwName,
                                 config := Config
                               }, _GwState) ->
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    lists:foreach(fun(Lis) ->
-        stop_listener(GwName, Lis)
-    end, Listeners).
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
-        {ok, Pid} ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
-                          [GwName, Type, LisName, ListenOnStr]),
-            Pid;
-        {error, Reason} ->
-            ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason]),
-            throw({badconf, Reason})
-    end.
-
-start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    NCfg = Cfg#{ctx => Ctx,
-                listener => {GwName, Type, LisName},
-                frame_mod => emqx_coap_frame,
-                chann_mod => emqx_coap_channel
-               },
-    MFA = {emqx_gateway_conn, start_link, [NCfg]},
-    do_start_listener(Type, Name, ListenOn, SocketOpts, MFA).
-
-do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
-    esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
-
-do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
-    esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
-
-stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case StopRet of
-        ok ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
-                          [GwName, Type, LisName, ListenOnStr]);
-        {error, Reason} ->
-            ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason])
-    end,
-    StopRet.
-
-stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    esockd:close(Name, ListenOn).
-
--ifndef(TEST).
-console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
--else.
-console_print(_Fmt, _Args) -> ok.
--endif.
+    Listeners = normalize_config(Config),
+    stop_listeners(GwName, Listeners).

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_api_listeners.erl

@@ -580,7 +580,7 @@ common_listener_opts() ->
           #{ nullable => {true, recursively}
            , desc => <<"The authenticatior for this listener">>
            })}
-    ].
+    ] ++ emqx_gateway_schema:proxy_protocol_opts().
 
 %%--------------------------------------------------------------------
 %% examples

+ 2 - 0
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -50,6 +50,8 @@
 
 -export([namespace/0, roots/0 , fields/1]).
 
+-export([proxy_protocol_opts/0]).
+
 namespace() -> gateway.
 
 roots() -> [gateway].

+ 130 - 3
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -18,6 +18,7 @@
 -module(emqx_gateway_utils).
 
 -include("emqx_gateway.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -export([ childspec/2
         , childspec/3
@@ -26,6 +27,12 @@
         , find_sup_child/2
         ]).
 
+-export([ start_listeners/4
+        , start_listener/4
+        , stop_listeners/2
+        , stop_listener/2
+        ]).
+
 -export([ apply/2
         , format_listenon/1
         , parse_listenon/1
@@ -89,9 +96,15 @@ childspec(Id, Type, Mod, Args) ->
 -spec supervisor_ret(supervisor:startchild_ret())
     -> {ok, pid()}
      | {error, supervisor:startchild_err()}.
-supervisor_ret({ok, Pid, _Info}) -> {ok, Pid};
-supervisor_ret({error, {Reason, _Child}}) -> {error, Reason};
-supervisor_ret(Ret) -> Ret.
+supervisor_ret({ok, Pid, _Info}) ->
+    {ok, Pid};
+supervisor_ret({error, {Reason, Child}}) ->
+    case element(1, Child) == child of
+        true -> {error, Reason};
+        _ -> {error, {Reason, Child}}
+    end;
+supervisor_ret(Ret) ->
+    Ret.
 
 -spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id())
     -> false
@@ -102,6 +115,120 @@ find_sup_child(Sup, ChildId) ->
         {_Id, Pid, _Type, _Mods} -> {ok, Pid}
     end.
 
+%% @doc start listeners. close all listeners if someone failed
+-spec start_listeners(Listeners :: list(),
+                      GwName :: atom(),
+                      Ctx :: map(),
+                      ModCfg)
+    -> {ok, [pid()]}
+     | {error, term()}
+    when ModCfg :: #{frame_mod := atom(), chann_mod := atom()}.
+start_listeners(Listeners, GwName, Ctx, ModCfg) ->
+    start_listeners(Listeners, GwName, Ctx, ModCfg, []).
+
+start_listeners([], _, _, _, Acc) ->
+    {ok, lists:map(fun({listener, {_, Pid}}) -> Pid end, Acc)};
+start_listeners([L | Ls], GwName, Ctx, ModCfg, Acc) ->
+    case start_listener(GwName, Ctx, L, ModCfg) of
+        {ok, {ListenerId, ListenOn, Pid}} ->
+            NAcc = Acc ++ [{listener, {{ListenerId, ListenOn}, Pid}}],
+            start_listeners(Ls, GwName, Ctx, ModCfg, NAcc);
+        {error, Reason} ->
+            lists:foreach(fun({listener, {{ListenerId, ListenOn}, _}}) ->
+                esockd:close({ListenerId, ListenOn})
+            end, Acc),
+            {error, {Reason, L}}
+    end.
+
+-spec start_listener(GwName :: atom(),
+                     Ctx :: emqx_gateway_ctx:context(),
+                     Listener :: tuple(),
+                     ModCfg :: map())
+    -> {ok, pid()}
+     | {error, term()}.
+start_listener(GwName, Ctx,
+               {Type, LisName, ListenOn, SocketOpts, Cfg}, ModCfg) ->
+    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
+    ListenerId  = emqx_gateway_utils:listener_id(GwName, Type, LisName),
+
+    NCfg = maps:merge(Cfg, ModCfg),
+    case start_listener(GwName, Ctx, Type,
+                        LisName, ListenOn, SocketOpts, NCfg) of
+        {ok, Pid} ->
+            console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
+                          [GwName, Type, LisName, ListenOnStr]),
+            {ok, {ListenerId, ListenOn, Pid}};
+        {error, Reason} ->
+            ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
+                  [GwName, Type, LisName, ListenOnStr, Reason]),
+            emqx_gateway_utils:supervisor_ret({error, Reason})
+    end.
+
+start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
+    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
+    NCfg = Cfg#{ ctx => Ctx
+               , listener => {GwName, Type, LisName}
+               },
+    NSocketOpts = merge_default(Type, SocketOpts),
+    MFA = {emqx_gateway_conn, start_link, [NCfg]},
+    do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA).
+
+merge_default(Udp, Options) ->
+    {Key, Default} = case Udp of
+                         udp ->
+                             {udp_options, default_udp_options()};
+                         dtls ->
+                             {udp_options, default_udp_options()};
+                         tcp ->
+                             {tcp_options, default_tcp_options()};
+                         ssl ->
+                             {tcp_options, default_tcp_options()}
+                     end,
+    case lists:keytake(Key, 1, Options) of
+        {value, {Key, TcpOpts}, Options1} ->
+            [{Key, emqx_misc:merge_opts(Default, TcpOpts)}
+             | Options1];
+        false ->
+            [{Key, Default} | Options]
+    end.
+
+do_start_listener(Type, Name, ListenOn, SocketOpts, MFA)
+  when Type == tcp;
+       Type == ssl ->
+    esockd:open(Name, ListenOn, SocketOpts, MFA);
+do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
+    esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
+do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
+    esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
+
+-spec stop_listeners(GwName :: atom(), Listeners :: list()) -> ok.
+stop_listeners(GwName, Listeners) ->
+    lists:foreach(fun(L) -> stop_listener(GwName, L) end, Listeners).
+
+-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok.
+stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
+    StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
+    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
+    case StopRet of
+        ok ->
+            console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
+                          [GwName, Type, LisName, ListenOnStr]);
+        {error, Reason} ->
+            ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
+                  [GwName, Type, LisName, ListenOnStr, Reason])
+    end,
+    StopRet.
+
+stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
+    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
+    esockd:close(Name, ListenOn).
+
+-ifndef(TEST).
+console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
+-else.
+console_print(_Fmt, _Args) -> ok.
+-endif.
+
 apply({M, F, A}, A2) when is_atom(M),
                           is_atom(M),
                           is_list(A),

+ 75 - 133
apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl

@@ -19,6 +19,14 @@
 
 -behaviour(emqx_gateway_impl).
 
+-include_lib("emqx/include/logger.hrl").
+
+-import(emqx_gateway_utils,
+        [ normalize_config/1
+        , start_listeners/4
+        , stop_listeners/2
+        ]).
+
 %% APIs
 -export([ reg/0
         , unreg/0
@@ -29,8 +37,6 @@
         , on_gateway_unload/2
         ]).
 
--include_lib("emqx/include/logger.hrl").
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -47,6 +53,73 @@ unreg() ->
 %% emqx_gateway_registry callbacks
 %%--------------------------------------------------------------------
 
+on_gateway_load(_Gateway = #{ name := GwName,
+                              config := Config
+                            }, Ctx) ->
+    %% XXX: How to monitor it ?
+    %% Start grpc client pool & client channel
+    PoolName = pool_name(GwName),
+    PoolSize = emqx_vm:schedulers() * 2,
+    {ok, PoolSup} = emqx_pool_sup:start_link(
+                      PoolName, hash, PoolSize,
+                      {emqx_exproto_gcli, start_link, []}),
+    _ = start_grpc_client_channel(GwName,
+                                  maps:get(handler, Config, undefined)
+                                 ),
+    %% XXX: How to monitor it ?
+    _ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
+
+    NConfig = maps:without(
+                 [server, handler],
+                 Config#{pool_name => PoolName}
+                ),
+    Listeners = emqx_gateway_utils:normalize_config(
+                  NConfig#{handler => GwName}
+                 ),
+
+    ModCfg = #{frame_mod => emqx_exproto_frame,
+               chann_mod => emqx_exproto_channel
+              },
+    case start_listeners(
+           Listeners, GwName, Ctx, ModCfg) of
+        {ok, ListenerPids} ->
+            {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}};
+        {error, {Reason, Listener}} ->
+            throw({badconf, #{ key => listeners
+                             , vallue => Listener
+                             , reason => Reason
+                             }})
+    end.
+
+on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
+    GwName = maps:get(name, Gateway),
+    try
+        %% XXX: 1. How hot-upgrade the changes ???
+        %% XXX: 2. Check the New confs first before destroy old instance ???
+        on_gateway_unload(Gateway, GwState),
+        on_gateway_load(Gateway#{config => Config}, Ctx)
+    catch
+        Class : Reason : Stk ->
+            logger:error("Failed to update ~ts; "
+                         "reason: {~0p, ~0p} stacktrace: ~0p",
+                         [GwName, Class, Reason, Stk]),
+            {error, {Class, Reason}}
+    end.
+
+on_gateway_unload(_Gateway = #{ name := GwName,
+                                config := Config
+                              }, _GwState = #{pool := PoolSup}) ->
+    Listeners = emqx_gateway_utils:normalize_config(Config),
+    %% Stop funcs???
+    exit(PoolSup, kill),
+    stop_grpc_server(GwName),
+    stop_grpc_client_channel(GwName),
+    stop_listeners(GwName, Listeners).
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
 start_grpc_server(_GwName, undefined) ->
     undefined;
 start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
@@ -103,140 +176,9 @@ stop_grpc_client_channel(GwName) ->
     _ = grpc_client_sup:stop_channel_pool(GwName),
     ok.
 
-on_gateway_load(_Gateway = #{ name := GwName,
-                              config := Config
-                            }, Ctx) ->
-    %% XXX: How to monitor it ?
-    %% Start grpc client pool & client channel
-    PoolName = pool_name(GwName),
-    PoolSize = emqx_vm:schedulers() * 2,
-    {ok, PoolSup} = emqx_pool_sup:start_link(
-                      PoolName, hash, PoolSize,
-                      {emqx_exproto_gcli, start_link, []}),
-    _ = start_grpc_client_channel(GwName,
-                                  maps:get(handler, Config, undefined)
-                                 ),
-    %% XXX: How to monitor it ?
-    _ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
-
-    NConfig = maps:without(
-                 [server, handler],
-                 Config#{pool_name => PoolName}
-                ),
-    Listeners = emqx_gateway_utils:normalize_config(
-                  NConfig#{handler => GwName}
-                 ),
-    ListenerPids = lists:map(fun(Lis) ->
-                     start_listener(GwName, Ctx, Lis)
-                   end, Listeners),
-    {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}.
-
-on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
-    GwName = maps:get(name, Gateway),
-    try
-        %% XXX: 1. How hot-upgrade the changes ???
-        %% XXX: 2. Check the New confs first before destroy old instance ???
-        on_gateway_unload(Gateway, GwState),
-        on_gateway_load(Gateway#{config => Config}, Ctx)
-    catch
-        Class : Reason : Stk ->
-            logger:error("Failed to update ~ts; "
-                         "reason: {~0p, ~0p} stacktrace: ~0p",
-                         [GwName, Class, Reason, Stk]),
-            {error, {Class, Reason}}
-    end.
-
-on_gateway_unload(_Gateway = #{ name := GwName,
-                                config := Config
-                              }, _GwState = #{pool := PoolSup}) ->
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    %% Stop funcs???
-    exit(PoolSup, kill),
-    stop_grpc_server(GwName),
-    stop_grpc_client_channel(GwName),
-    lists:foreach(fun(Lis) ->
-        stop_listener(GwName, Lis)
-    end, Listeners).
-
 pool_name(GwName) ->
     list_to_atom(lists:concat([GwName, "_gcli_pool"])).
 
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
-        {ok, Pid} ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
-                          [GwName, Type, LisName, ListenOnStr]),
-            Pid;
-        {error, Reason} ->
-            ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason]),
-            throw({badconf, Reason})
-    end.
-
-start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    NCfg = Cfg#{
-             ctx => Ctx,
-             listener => {GwName, Type, LisName},
-             frame_mod => emqx_exproto_frame,
-             chann_mod => emqx_exproto_channel
-            },
-    MFA = {emqx_gateway_conn, start_link, [NCfg]},
-    NSockOpts = merge_default_by_type(Type, SocketOpts),
-    do_start_listener(Type, Name, ListenOn, NSockOpts, MFA).
-
-do_start_listener(Type, Name, ListenOn, Opts, MFA)
-  when Type == tcp;
-       Type == ssl ->
-    esockd:open(Name, ListenOn, Opts, MFA);
-do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
-    esockd:open_udp(Name, ListenOn, Opts, MFA);
-do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
-    esockd:open_dtls(Name, ListenOn, Opts, MFA).
-
-merge_default_by_type(Type, Options) when Type =:= tcp;
-                                          Type =:= ssl ->
-    Default = emqx_gateway_utils:default_tcp_options(),
-    case lists:keytake(tcp_options, 1, Options) of
-        {value, {tcp_options, TcpOpts}, Options1} ->
-            [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
-             | Options1];
-        false ->
-            [{tcp_options, Default} | Options]
-    end;
-merge_default_by_type(Type, Options) when Type =:= udp;
-                                          Type =:= dtls ->
-    Default = emqx_gateway_utils:default_udp_options(),
-    case lists:keytake(udp_options, 1, Options) of
-        {value, {udp_options, TcpOpts}, Options1} ->
-            [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
-             | Options1];
-        false ->
-            [{udp_options, Default} | Options]
-    end.
-
-stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case StopRet of
-        ok ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
-                          [GwName, Type, LisName, ListenOnStr]);
-        {error, Reason} ->
-            ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason])
-    end,
-    StopRet.
-
-stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    esockd:close(Name, ListenOn).
-
 -ifndef(TEST).
 console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
 -else.

+ 17 - 76
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl

@@ -19,6 +19,8 @@
 
 -behaviour(emqx_gateway_impl).
 
+-include_lib("emqx/include/logger.hrl").
+
 %% APIs
 -export([ reg/0
         , unreg/0
@@ -29,8 +31,6 @@
         , on_gateway_unload/2
         ]).
 
--include_lib("emqx/include/logger.hrl").
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -54,10 +54,20 @@ on_gateway_load(_Gateway = #{ name := GwName,
     case emqx_lwm2m_xml_object_db:start_link(XmlDir) of
         {ok, RegPid} ->
             Listeners = emqx_gateway_utils:normalize_config(Config),
-            ListenerPids = lists:map(fun(Lis) ->
-                                             start_listener(GwName, Ctx, Lis)
-                                     end, Listeners),
-            {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}};
+            ModCfg = #{frame_mod => emqx_coap_frame,
+                       chann_mod => emqx_lwm2m_channel
+                      },
+            case emqx_gateway_utils:start_listeners(
+                   Listeners, GwName, Ctx, ModCfg) of
+                {ok, ListenerPids} ->
+                    {ok, ListenerPids, #{ctx => Ctx, registry => RegPid}};
+                {error, {Reason, Listener}} ->
+                    emqx_lwm2m_xml_object_db:stop(),
+                    throw({badconf, #{ key => listeners
+                                     , vallue => Listener
+                                     , reason => Reason
+                                     }})
+            end;
         {error, Reason} ->
             throw({badconf, #{ key => xml_dir
                              , value => XmlDir
@@ -85,73 +95,4 @@ on_gateway_unload(_Gateway = #{ name := GwName,
                               }, _GwState = #{registry := RegPid}) ->
     exit(RegPid, kill),
     Listeners = emqx_gateway_utils:normalize_config(Config),
-    lists:foreach(fun(Lis) ->
-        stop_listener(GwName, Lis)
-    end, Listeners).
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
-        {ok, Pid} ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
-                          [GwName, Type, LisName, ListenOnStr]),
-            Pid;
-        {error, Reason} ->
-            ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason]),
-            throw({badconf, Reason})
-    end.
-
-start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    NCfg = Cfg#{ ctx => Ctx
-               , listener => {GwName, Type, LisName}
-               , frame_mod => emqx_coap_frame
-               , chann_mod => emqx_lwm2m_channel
-               },
-    NSocketOpts = merge_default(SocketOpts),
-    MFA = {emqx_gateway_conn, start_link, [NCfg]},
-    do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA).
-
-merge_default(Options) ->
-    Default = emqx_gateway_utils:default_udp_options(),
-    case lists:keytake(udp_options, 1, Options) of
-        {value, {udp_options, TcpOpts}, Options1} ->
-            [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
-             | Options1];
-        false ->
-            [{udp_options, Default} | Options]
-    end.
-
-do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
-    esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
-
-do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
-    esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
-
-stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case StopRet of
-        ok ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
-                          [GwName, Type, LisName, ListenOnStr]);
-        {error, Reason} ->
-            ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason])
-    end,
-    StopRet.
-
-stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    esockd:close(Name, ListenOn).
-
--ifndef(TEST).
-console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
--else.
-console_print(_Fmt, _Args) -> ok.
--endif.
+    emqx_gateway_utils:stop_listeners(GwName, Listeners).

+ 25 - 71
apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl

@@ -19,6 +19,14 @@
 
 -behaviour(emqx_gateway_impl).
 
+-include_lib("emqx/include/logger.hrl").
+
+-import(emqx_gateway_utils,
+        [ normalize_config/1
+        , start_listeners/4
+        , stop_listeners/2
+        ]).
+
 %% APIs
 -export([ reg/0
         , unreg/0
@@ -29,8 +37,6 @@
         , on_gateway_unload/2
         ]).
 
--include_lib("emqx/include/logger.hrl").
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -70,12 +76,23 @@ on_gateway_load(_Gateway = #{ name := GwName,
                  [broadcast, predefined],
                  Config#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
                 ),
+
     Listeners = emqx_gateway_utils:normalize_config(NConfig),
 
-    ListenerPids = lists:map(fun(Lis) ->
-                     start_listener(GwName, Ctx, Lis)
-                   end, Listeners),
-    {ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
+    ModCfg = #{frame_mod => emqx_sn_frame,
+               chann_mod => emqx_sn_channel
+              },
+
+    case start_listeners(
+           Listeners, GwName, Ctx, ModCfg) of
+        {ok, ListenerPids} ->
+            {ok, ListenerPids, _GwState = #{ctx => Ctx}};
+        {error, {Reason, Listener}} ->
+            throw({badconf, #{ key => listeners
+                             , vallue => Listener
+                             , reason => Reason
+                             }})
+    end.
 
 on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
     GwName = maps:get(name, Gateway),
@@ -95,68 +112,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
 on_gateway_unload(_Gateway = #{ name := GwName,
                                 config := Config
                               }, _GwState) ->
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    lists:foreach(fun(Lis) ->
-        stop_listener(GwName, Lis)
-    end, Listeners).
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
-        {ok, Pid} ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
-                          [GwName, Type, LisName, ListenOnStr]),
-            Pid;
-        {error, Reason} ->
-            ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason]),
-            throw({badconf, Reason})
-    end.
-
-start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    NCfg = Cfg#{
-             ctx => Ctx,
-             listene => {GwName, Type, LisName},
-             frame_mod => emqx_sn_frame,
-             chann_mod => emqx_sn_channel
-            },
-    esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
-                    {emqx_gateway_conn, start_link, [NCfg]}).
-
-merge_default(Options) ->
-    Default = emqx_gateway_utils:default_udp_options(),
-    case lists:keytake(udp_options, 1, Options) of
-        {value, {udp_options, TcpOpts}, Options1} ->
-            [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
-             | Options1];
-        false ->
-            [{udp_options, Default} | Options]
-    end.
-
-stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case StopRet of
-        ok ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
-                          [GwName, Type, LisName, ListenOnStr]);
-        {error, Reason} ->
-            ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason])
-    end,
-    StopRet.
-
-stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    esockd:close(Name, ListenOn).
-
--ifndef(TEST).
-console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
--else.
-console_print(_Fmt, _Args) -> ok.
--endif.
+    Listeners = normalize_config(Config),
+    stop_listeners(GwName, Listeners).

+ 27 - 77
apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl

@@ -18,6 +18,15 @@
 
 -behaviour(emqx_gateway_impl).
 
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/include/emqx_gateway.hrl").
+
+-import(emqx_gateway_utils,
+        [ normalize_config/1
+        , start_listeners/4
+        , stop_listeners/2
+        ]).
+
 %% APIs
 -export([ reg/0
         , unreg/0
@@ -28,9 +37,6 @@
         , on_gateway_unload/2
         ]).
 
--include_lib("emqx_gateway/include/emqx_gateway.hrl").
--include_lib("emqx/include/logger.hrl").
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -52,15 +58,22 @@ unreg() ->
 on_gateway_load(_Gateway = #{ name := GwName,
                               config := Config
                             }, Ctx) ->
-    %% Step1. Fold the config to listeners
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    %% Step2. Start listeners or escokd:specs
-    ListenerPids = lists:map(fun(Lis) ->
-                     start_listener(GwName, Ctx, Lis)
-                   end, Listeners),
-    %% FIXME: How to throw an exception to interrupt the restart logic ?
-    %% FIXME: Assign ctx to GwState
-    {ok, ListenerPids, _GwState = #{ctx => Ctx}}.
+    Listeners = normalize_config(Config),
+    ModCfg = #{frame_mod => emqx_stomp_frame,
+               chann_mod => emqx_stomp_channel
+              },
+    case start_listeners(
+           Listeners, GwName, Ctx, ModCfg) of
+        {ok, ListenerPids} ->
+            %% FIXME: How to throw an exception to interrupt the restart logic ?
+            %% FIXME: Assign ctx to GwState
+            {ok, ListenerPids, _GwState = #{ctx => Ctx}};
+        {error, {Reason, Listener}} ->
+            throw({badconf, #{ key => listeners
+                             , vallue => Listener
+                             , reason => Reason
+                             }})
+    end.
 
 on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
     GwName = maps:get(name, Gateway),
@@ -80,68 +93,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
 on_gateway_unload(_Gateway = #{ name := GwName,
                                 config := Config
                               }, _GwState) ->
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    lists:foreach(fun(Lis) ->
-        stop_listener(GwName, Lis)
-    end, Listeners).
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
-        {ok, Pid} ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
-                          [GwName, Type, LisName, ListenOnStr]),
-            Pid;
-        {error, Reason} ->
-            ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason]),
-            throw({badconf, Reason})
-    end.
-
-start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    NCfg = Cfg#{
-             ctx => Ctx,
-             listener => {GwName, Type, LisName},   %% Used for authn
-             frame_mod => emqx_stomp_frame,
-             chann_mod => emqx_stomp_channel
-            },
-    esockd:open(Name, ListenOn, merge_default(SocketOpts),
-                {emqx_gateway_conn, start_link, [NCfg]}).
-
-merge_default(Options) ->
-    Default = emqx_gateway_utils:default_tcp_options(),
-    case lists:keytake(tcp_options, 1, Options) of
-        {value, {tcp_options, TcpOpts}, Options1} ->
-            [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
-             | Options1];
-        false ->
-            [{tcp_options, Default} | Options]
-    end.
-
-stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
-    StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
-    ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
-    case StopRet of
-        ok ->
-            console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
-                          [GwName, Type, LisName, ListenOnStr]);
-        {error, Reason} ->
-            ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
-                  [GwName, Type, LisName, ListenOnStr, Reason])
-    end,
-    StopRet.
-
-stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
-    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    esockd:close(Name, ListenOn).
-
--ifndef(TEST).
-console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
--else.
-console_print(_Fmt, _Args) -> ok.
--endif.
+    Listeners = normalize_config(Config),
+    stop_listeners(GwName, Listeners).