Просмотр исходного кода

Provide bridge handler to extense emqx_bridge

Gilbert Wong 7 лет назад
Родитель
Сommit
eb7b1797c2
1 измененных файлов с 18 добавлено и 5 удалено
  1. 18 5
      src/emqx_bridge.erl

+ 18 - 5
src/emqx_bridge.erl

@@ -119,6 +119,7 @@
 -define(DEFAULT_SEND_AHEAD, 8).
 -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
 -define(DEFAULT_SEG_BYTES, (1 bsl 20)).
+-define(NO_BRIDGE_HANDLER, undefined).
 -define(maybe_send, {next_event, internal, maybe_send}).
 
 %% @doc Start a bridge worker. Supported configs:
@@ -277,7 +278,8 @@ init(Config) ->
        subscriptions => Subs,
        replayq => Queue,
        inflight => [],
-       connection => undefined
+       connection => undefined,
+       bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER)
       }}.
 
 code_change(_Vsn, State, Data, _Extra) ->
@@ -321,7 +323,10 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
         {ok, ConnRef, Conn} ->
             ?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
             Action = {state_timeout, 0, connected},
-            {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
+            {keep_state,
+             eval_bridge_handler(State#{ conn_ref => ConnRef
+                                       , connection => Conn}, connected),
+             Action};
         error ->
             Action = {state_timeout, Timeout, reconnect},
             {keep_state_and_data, Action}
@@ -416,6 +421,12 @@ common(StateName, Type, Content, State) ->
           [name(), Type, StateName, Content]),
     {keep_state, State}.
 
+eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
+    State;
+eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
+    _ = Handler(Msg),
+    State.
+
 ensure_present(Key, Topic, State) ->
     Topics = maps:get(Key, State),
     case is_topic_present(Topic, Topics) of
@@ -553,9 +564,11 @@ disconnect(#{connection := Conn,
              connect_module := Module
             } = State) when Conn =/= undefined ->
     ok = Module:stop(ConnRef, Conn),
-    State#{conn_ref => undefined,
-           connection => undefined};
-disconnect(State) -> State.
+    eval_bridge_handler(State#{conn_ref => undefined,
+                               connection => undefined},
+                        disconnected);
+disconnect(State) ->
+    eval_bridge_handler(State, disconnected).
 
 %% Called only when replayq needs to dump it to disk.
 msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);