Преглед изворни кода

Merge pull request #12366 from lafirest/fix/sys

fix(sysk): fix probe testing bugs for syskeeper
lafirest пре 2 година
родитељ
комит
dc458ff0db

+ 13 - 2
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl

@@ -67,11 +67,17 @@ on_start(
         {tcp_options, [{mode, binary}, {reuseaddr, true}, {nodelay, true}]}
     ],
     MFArgs = {?MODULE, start_link, [maps:with([handshake_timeout], Config)]},
-    ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn),
 
+    %% Since the esockd only supports atomic name and we don't want to introduce a new atom per each instance
+    %% when the port is same for two instance/connector, them will reference to a same esockd listener
+    %% to prevent the failed one dealloctes the listener which created by a earlier instance
+    %% we need record only when the listen is successed
     case esockd:open(?MODULE, ListenOn, Options, MFArgs) of
         {ok, _} ->
+            ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn),
             {ok, #{listen_on => ListenOn}};
+        {error, {already_started, _}} ->
+            {error, eaddrinuse};
         Error ->
             Error
     end.
@@ -83,7 +89,12 @@ on_stop(InstanceId, _State) ->
     }),
     case emqx_resource:get_allocated_resources(InstanceId) of
         #{listen_on := ListenOn} ->
-            esockd:close(?MODULE, ListenOn);
+            case esockd:close(?MODULE, ListenOn) of
+                {error, not_found} ->
+                    ok;
+                Result ->
+                    Result
+            end;
         _ ->
             ok
     end.

+ 5 - 2
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -107,7 +107,7 @@ parse_connector_id(ConnectorId) ->
     {atom(), atom() | binary()}.
 parse_connector_id(<<"connector:", ConnectorId/binary>>, Opts) ->
     parse_connector_id(ConnectorId, Opts);
-parse_connector_id(<<?TEST_ID_PREFIX, ConnectorId/binary>>, Opts) ->
+parse_connector_id(<<?TEST_ID_PREFIX, _:16/binary, ConnectorId/binary>>, Opts) ->
     parse_connector_id(ConnectorId, Opts);
 parse_connector_id(ConnectorId, Opts) ->
     emqx_resource:parse_resource_id(ConnectorId, Opts).
@@ -229,7 +229,10 @@ create_dry_run(Type, Conf0, Callback) ->
     TypeBin = bin(Type),
     TypeAtom = safe_atom(Type),
     %% We use a fixed name here to avoid creating an atom
-    TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]),
+    %% to avoid potential race condition, the resource id should be unique
+    Prefix = emqx_resource_manager:make_test_id(),
+    TmpName =
+        iolist_to_binary([Prefix, TypeBin, ":", <<"probedryrun">>]),
     TmpPath = emqx_utils:safe_filename(TmpName),
     Conf1 = maps:without([<<"name">>], Conf0),
     RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},

+ 2 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -50,7 +50,8 @@
 ]).
 
 -export([
-    set_resource_status_connecting/1
+    set_resource_status_connecting/1,
+    make_test_id/0
 ]).
 
 % Server