Procházet zdrojové kódy

Merge pull request #12175 from HJianBo/fix-ocpp-listener-bug

fix(ocpp): fix 500 return for `PUT ...gateway/ocpp/<listener_id>`
zhongwencool před 2 roky
rodič
revize
caf1f33bdf

+ 41 - 2
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -321,9 +321,48 @@ stop_listener(GwName, {Type, LisName, ListenOn, Cfg}) ->
     end,
     StopRet.
 
-stop_listener(GwName, Type, LisName, ListenOn, _Cfg) ->
+stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when
+    Type == tcp;
+    Type == ssl;
+    Type == udp;
+    Type == dtls
+->
+    Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
+    esockd:close(Name, ListenOn);
+stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when
+    Type == ws; Type == wss
+->
     Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
-    esockd:close(Name, ListenOn).
+    case cowboy:stop_listener(Name) of
+        ok ->
+            wait_listener_stopped(ListenOn);
+        Error ->
+            Error
+    end.
+
+wait_listener_stopped(ListenOn) ->
+    % NOTE
+    % `cowboy:stop_listener/1` will not close the listening socket explicitly,
+    % it will be closed by the runtime system **only after** the process exits.
+    Endpoint = maps:from_list(ip_port(ListenOn)),
+    case
+        gen_tcp:connect(
+            maps:get(ip, Endpoint, loopback),
+            maps:get(port, Endpoint),
+            [{active, false}]
+        )
+    of
+        {error, _EConnrefused} ->
+            %% NOTE
+            %% We should get `econnrefused` here because acceptors are already dead
+            %% but don't want to crash if not, because this doesn't make any difference.
+            ok;
+        {ok, Socket} ->
+            %% NOTE
+            %% Tiny chance to get a connected socket here, when some other process
+            %% concurrently binds to the same port.
+            gen_tcp:close(Socket)
+    end.
 
 -ifndef(TEST).
 console_print(Fmt, Args) -> ?ULOG(Fmt, Args).

+ 1 - 1
apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl

@@ -89,7 +89,7 @@ feedvar(Path) ->
     binary_to_list(
         emqx_placeholder:proc_tmpl(
             emqx_placeholder:preproc_tmpl(Path),
-            #{application_priv => code:priv_dir(emqx_ocpp)}
+            #{application_priv => code:priv_dir(emqx_gateway_ocpp)}
         )
     ).
 

+ 108 - 25
apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl

@@ -16,36 +16,119 @@
 
 -module(emqx_ocpp_SUITE).
 
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("emqx/include/emqx.hrl").
+-import(
+    emqx_gateway_test_utils,
+    [
+        assert_fields_exist/2,
+        request/2,
+        request/3
+    ]
+).
 
--include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
+-define(HEARTBEAT, <<$\n>>).
+
+-define(CONF_DEFAULT, <<
+    "\n"
+    "gateway.ocpp {\n"
+    "  mountpoint = \"ocpp/\"\n"
+    "  default_heartbeat_interval = \"60s\"\n"
+    "  heartbeat_checking_times_backoff = 1\n"
+    "  message_format_checking = disable\n"
+    "  upstream {\n"
+    "    topic = \"cp/${clientid}\"\n"
+    "    reply_topic = \"cp/${clientid}/Reply\"\n"
+    "    error_topic = \"cp/${clientid}/Reply\"\n"
+    "  }\n"
+    "  dnstream {\n"
+    "    topic = \"cs/${clientid}\"\n"
+    "  }\n"
+    "  listeners.ws.default {\n"
+    "      bind = \"0.0.0.0:33033\"\n"
+    "      websocket.path = \"/ocpp\"\n"
+    "  }\n"
+    "}\n"
+>>).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+init_per_suite(Config) ->
+    application:load(emqx_gateway_ocpp),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_conf, ?CONF_DEFAULT},
+            emqx_gateway,
+            emqx_auth,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    emqx_common_test_http:create_default_app(),
+    [{suite_apps, Apps} | Config].
 
-all() ->
-    emqx_common_test_helpers:all(?MODULE).
-
-init_per_suite(Conf) ->
-    emqx_ct_helpers:start_apps([emqx_gateway_ocpp], fun set_special_cfg/1),
-    Conf.
-
-end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([emqx_gateway_ocpp]).
-
-set_special_cfg(emqx) ->
-    application:set_env(emqx, allow_anonymous, true),
-    application:set_env(emqx, enable_acl_cache, false),
-    LoadedPluginPath = filename:join(["test", "emqx_SUITE_data", "loaded_plugins"]),
-    application:set_env(
-        emqx,
-        plugins_loaded_file,
-        emqx_ct_helpers:deps_path(emqx, LoadedPluginPath)
-    );
-set_special_cfg(_App) ->
+end_per_suite(Config) ->
+    emqx_common_test_http:delete_default_app(),
+    emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
+default_config() ->
+    ?CONF_DEFAULT.
+
 %%--------------------------------------------------------------------
-%% Testcases
-%%---------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_update_listeners(_Config) ->
+    {200, [DefaultListener]} = request(get, "/gateways/ocpp/listeners"),
+
+    ListenerConfKeys =
+        [
+            id,
+            type,
+            name,
+            enable,
+            enable_authn,
+            bind,
+            acceptors,
+            max_connections,
+            max_conn_rate,
+            proxy_protocol,
+            proxy_protocol_timeout,
+            websocket,
+            tcp_options
+        ],
+    StatusKeys = [status, node_status],
+
+    assert_fields_exist(ListenerConfKeys ++ StatusKeys, DefaultListener),
+    ?assertMatch(
+        #{
+            id := <<"ocpp:ws:default">>,
+            type := <<"ws">>,
+            name := <<"default">>,
+            enable := true,
+            enable_authn := true,
+            bind := <<"0.0.0.0:33033">>,
+            websocket := #{path := <<"/ocpp">>}
+        },
+        DefaultListener
+    ),
+
+    UpdateBody = emqx_utils_maps:deep_put(
+        [websocket, path],
+        maps:with(ListenerConfKeys, DefaultListener),
+        <<"/ocpp2">>
+    ),
+    {200, _} = request(put, "/gateways/ocpp/listeners/ocpp:ws:default", UpdateBody),
+
+    {200, [UpdatedListener]} = request(get, "/gateways/ocpp/listeners"),
+    ?assertMatch(#{websocket := #{path := <<"/ocpp2">>}}, UpdatedListener).