Procházet zdrojové kódy

Redesign ensure_start and ensure_stop api of bridge

Gilbert Wong před 6 roky
rodič
revize
4d2bc48822
2 změnil soubory, kde provedl 34 přidání a 21 odebrání
  1. 33 19
      src/emqx_bridge.erl
  2. 1 2
      src/emqx_bridge_connect.erl

+ 33 - 19
src/emqx_bridge.erl

@@ -120,6 +120,7 @@
 -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
 -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
 -define(DEFAULT_SEG_BYTES, (1 bsl 20)).
 -define(DEFAULT_SEG_BYTES, (1 bsl 20)).
 -define(NO_BRIDGE_HANDLER, undefined).
 -define(NO_BRIDGE_HANDLER, undefined).
+-define(NO_FROM, undefined).
 -define(maybe_send, {next_event, internal, maybe_send}).
 -define(maybe_send, {next_event, internal, maybe_send}).
 
 
 %% @doc Start a bridge worker. Supported configs:
 %% @doc Start a bridge worker. Supported configs:
@@ -297,8 +298,7 @@ standing_by(enter, _, #{start_type := auto}) ->
 standing_by(enter, _, #{start_type := manual}) ->
 standing_by(enter, _, #{start_type := manual}) ->
     keep_state_and_data;
     keep_state_and_data;
 standing_by({call, From}, ensure_started, State) ->
 standing_by({call, From}, ensure_started, State) ->
-    {next_state, connecting, State,
-     [{reply, From, ok}]};
+    do_connect({call, From}, standing_by, State);
 standing_by(state_timeout, do_connect, State) ->
 standing_by(state_timeout, do_connect, State) ->
     {next_state, connecting, State};
     {next_state, connecting, State};
 standing_by(info, Info, State) ->
 standing_by(info, Info, State) ->
@@ -313,23 +313,8 @@ standing_by(Type, Content, State) ->
 connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
 connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
     Action = {state_timeout, Timeout, reconnect},
     Action = {state_timeout, Timeout, reconnect},
     {keep_state_and_data, Action};
     {keep_state_and_data, Action};
-connecting(enter, _, #{reconnect_delay_ms := Timeout,
-                       connect_fun := ConnectFun,
-                       subscriptions := Subs,
-                       forwards := Forwards
-                      } = State) ->
-    ok = subscribe_local_topics(Forwards),
-    case ConnectFun(Subs) of
-        {ok, ConnRef, Conn} ->
-            ?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
-            Action = {state_timeout, 0, connected},
-            State0 = State#{conn_ref => ConnRef, connection => Conn},
-            State1 = eval_bridge_handler(State0, connected),
-            {keep_state, State1, Action};
-        error ->
-            Action = {state_timeout, Timeout, reconnect},
-            {keep_state_and_data, Action}
-    end;
+connecting(enter, _, State) ->
+    do_connect(enter, connecting, State);
 connecting(state_timeout, connected, State) ->
 connecting(state_timeout, connected, State) ->
     {next_state, connected, State};
     {next_state, connected, State};
 connecting(state_timeout, reconnect, _State) ->
 connecting(state_timeout, reconnect, _State) ->
@@ -455,6 +440,35 @@ is_topic_present({Topic, _QoS}, Topics) ->
 is_topic_present(Topic, Topics) ->
 is_topic_present(Topic, Topics) ->
     lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
     lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
 
 
+do_connect(Type, StateName, #{ forwards := Forwards
+                             , subscriptions := Subs
+                             , connect_fun := ConnectFun
+                             , reconnect_delay_ms := Timeout
+                             } = State) ->
+    ok = subscribe_local_topics(Forwards),
+    From = case StateName of
+               standing_by -> {call, Pid} = Type, Pid;
+               connecting -> ?NO_FROM
+           end,
+    DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) ->
+                      StandingbyAction;
+                  (connecting, _StandingbyAction, ConnectingAction) ->
+                      ConnectingAction
+              end,
+    case ConnectFun(Subs) of
+        {ok, ConnRef, Conn} ->
+            ?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
+            State0 = State#{conn_ref => ConnRef, connection => Conn},
+            State1 = eval_bridge_handler(State0, connected),
+            StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]},
+            ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}},
+            DoEvent(StateName, StandingbyAction, ConnectingAction);
+        {error, Reason} ->
+            StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]},
+            ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}},
+            DoEvent(StateName, StandingbyAction, ConnectingAction)
+    end.
+
 do_ensure_present(forwards, Topic, _) ->
 do_ensure_present(forwards, Topic, _) ->
     ok = subscribe_local_topic(Topic);
     ok = subscribe_local_topic(Topic);
 do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,
 do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,

+ 1 - 2
src/emqx_bridge_connect.erl

@@ -56,7 +56,7 @@ start(Module, Config) ->
             Config1 = obfuscate(Config),
             Config1 = obfuscate(Config),
             ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
             ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
                  "config=~p\nreason:~p", [Module, Config1, Reason]),
                  "config=~p\nreason:~p", [Module, Config1, Reason]),
-            error
+            {error, Reason}
     end.
     end.
 
 
 obfuscate(Map) ->
 obfuscate(Map) ->
@@ -69,4 +69,3 @@ obfuscate(Map) ->
 
 
 is_sensitive(password) -> true;
 is_sensitive(password) -> true;
 is_sensitive(_) -> false.
 is_sensitive(_) -> false.
-