Просмотр исходного кода

fix(mqtt_bridge): the mqtt bridge hangs with an unreachable IP

Shawn 4 лет назад
Родитель
Сommit
a54668e83b
1 измененных файлов с 36 добавлено и 22 удалено
  1. 36 22
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl

+ 36 - 22
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl

@@ -410,21 +410,35 @@ start_resource(ResId, PoolName, Options) ->
     end.
 
 test_resource_status(PoolName) ->
-    IsConnected = fun(Worker) ->
-                          case ecpool_worker:client(Worker) of
-                              {ok, Bridge} ->
-                                  try emqx_bridge_worker:status(Bridge) of
-                                      connected -> true;
-                                      _ -> false
-                                  catch _Error:_Reason ->
-                                          false
-                                  end;
-                              {error, _} ->
-                                  false
-                          end
-                  end,
-    Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
-    lists:any(fun(St) -> St =:= true end, Status).
+    Parent = self(),
+    Pids = [spawn(fun() -> Parent ! {self(), get_worker_status(Worker)} end)
+            || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    try
+        Status = [
+            receive {Pid, R} -> R
+            after 1000 -> %% get_worker_status/1 should be a quick operation
+                throw({timeout, Pid})
+            end || Pid <- Pids],
+        lists:any(fun(St) -> St =:= true end, Status)
+    catch
+        throw:Reason ->
+            ?LOG(error, "Get mqtt bridge status timeout: ~p", [Reason]),
+            lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids),
+            false
+    end.
+
+get_worker_status(Worker) ->
+    case ecpool_worker:client(Worker) of
+        {ok, Bridge} ->
+            try emqx_bridge_worker:status(Bridge) of
+                connected -> true;
+                _ -> false
+            catch _Error:_Reason ->
+                    false
+            end;
+        {error, _} ->
+            false
+    end.
 
 -spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()).
 on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) ->
@@ -433,13 +447,13 @@ on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) ->
 
 on_resource_destroy(ResId, #{<<"pool">> := PoolName}) ->
     ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]),
-        case ecpool:stop_sup_pool(PoolName) of
-            ok ->
-                ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]);
-            {error, Reason} ->
-                ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
-                error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed})
-        end.
+    case ecpool:stop_sup_pool(PoolName) of
+        ok ->
+            ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]);
+        {error, Reason} ->
+            ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
+            error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed})
+    end.
 
 on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
                                                      <<"forward_topic">> := ForwardTopic,