Gilbert 6 лет назад
Родитель
Сommit
b6fa4d2a3f
2 измененных файлов с 17 добавлено и 5 удалено
  1. 9 2
      src/emqx_bridge.erl
  2. 8 3
      src/emqx_bridge_mqtt.erl

+ 9 - 2
src/emqx_bridge.erl

@@ -276,7 +276,8 @@ init(Config) ->
        forwards => Topics,
        subscriptions => Subs,
        replayq => Queue,
-       inflight => []
+       inflight => [],
+       connection => undefined
       }}.
 
 code_change(_Vsn, State, Data, _Extra) ->
@@ -370,7 +371,7 @@ connected(info, {disconnected, ConnRef, Reason},
         true ->
             ?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]),
             {next_state, connecting,
-             State#{conn_ref := undefined, connection := undefined}};
+             State#{conn_ref => undefined, connection => undefined}};
         false ->
             keep_state_and_data
     end;
@@ -446,6 +447,9 @@ is_topic_present(Topic, Topics) ->
 
 do_ensure_present(forwards, Topic, _) ->
     ok = subscribe_local_topic(Topic);
+do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,
+                                           connection := undefined}) ->
+    {error, no_connection};
 do_ensure_present(subscriptions, {Topic, QoS},
                   #{connect_module := ConnectModule, connection := Conn}) ->
     case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of
@@ -458,6 +462,9 @@ do_ensure_present(subscriptions, {Topic, QoS},
 
 do_ensure_absent(forwards, Topic, _) ->
     ok = emqx_broker:unsubscribe(Topic);
+do_ensure_absent(subscriptions, _Topic, #{connect_module := _ConnectModule,
+                                          connection := undefined}) ->
+    {error, no_connection};
 do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
                                          connection := Conn}) ->
     case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of

+ 8 - 3
src/emqx_bridge_mqtt.erl

@@ -84,13 +84,19 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) ->
     ok.
 
 ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
-    emqx_client:subscribe(Pid, Topic, QoS);
+    case emqx_client:subscribe(Pid, Topic, QoS) of
+        {ok, _, _} -> ok;
+        Error -> Error
+    end;
 ensure_subscribed(_Conn, _Topic, _QoS) ->
     %% return ok for now, next re-connect should should call start with new topic added to config
     ok.
 
 ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
-    emqx_client:unsubscribe(Pid, Topic);
+    case emqx_client:unsubscribe(Pid, Topic) of
+        {ok, _, _} -> ok;
+        Error -> Error
+    end;
 ensure_unsubscribed(_, _) ->
     %% return ok for now, next re-connect should should call start with this topic deleted from config
     ok.
@@ -188,4 +194,3 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
                               Error -> throw(Error)
                           end
                   end, Subscriptions).
-