emqttd_bridge.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_bridge).
  17. -behaviour(gen_server2).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. -include("emqttd_internal.hrl").
  21. %% API Function Exports
  22. -export([start_link/3]).
  23. %% gen_server Function Exports
  24. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  25. terminate/2, code_change/3]).
  26. -define(PING_DOWN_INTERVAL, 1000).
  27. -record(state, {node, subtopic,
  28. qos = ?QOS_2,
  29. topic_suffix = <<>>,
  30. topic_prefix = <<>>,
  31. mqueue :: emqttd_mqueue:mqueue(),
  32. max_queue_len = 10000,
  33. ping_down_interval = ?PING_DOWN_INTERVAL,
  34. status = up}).
  35. -type option() :: {qos, mqtt_qos()} |
  36. {topic_suffix, binary()} |
  37. {topic_prefix, binary()} |
  38. {max_queue_len, pos_integer()} |
  39. {ping_down_interval, pos_integer()}.
  40. -export_type([option/0]).
  41. %%--------------------------------------------------------------------
  42. %% API
  43. %%--------------------------------------------------------------------
  44. %% @doc Start a bridge
  45. -spec(start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}).
  46. start_link(Node, Topic, Options) ->
  47. gen_server2:start_link(?MODULE, [Node, Topic, Options], []).
  48. %%--------------------------------------------------------------------
  49. %% gen_server callbacks
  50. %%--------------------------------------------------------------------
  51. init([Node, Topic, Options]) ->
  52. process_flag(trap_exit, true),
  53. case net_kernel:connect_node(Node) of
  54. true ->
  55. true = erlang:monitor_node(Node, true),
  56. State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
  57. MQueue = emqttd_mqueue:new(qname(Node, Topic),
  58. [{max_len, State#state.max_queue_len}],
  59. emqttd_alarm:alarm_fun()),
  60. emqttd:subscribe(Topic),
  61. {ok, State#state{mqueue = MQueue}};
  62. false ->
  63. {stop, {cannot_connect, Node}}
  64. end.
  65. parse_opts([], State) ->
  66. State;
  67. parse_opts([{qos, Qos} | Opts], State) ->
  68. parse_opts(Opts, State#state{qos = Qos});
  69. parse_opts([{topic_suffix, Suffix} | Opts], State) ->
  70. parse_opts(Opts, State#state{topic_suffix= Suffix});
  71. parse_opts([{topic_prefix, Prefix} | Opts], State) ->
  72. parse_opts(Opts, State#state{topic_prefix = Prefix});
  73. parse_opts([{max_queue_len, Len} | Opts], State) ->
  74. parse_opts(Opts, State#state{max_queue_len = Len});
  75. parse_opts([{ping_down_interval, Interval} | Opts], State) ->
  76. parse_opts(Opts, State#state{ping_down_interval = Interval*1000});
  77. parse_opts([_Opt | Opts], State) ->
  78. parse_opts(Opts, State).
  79. qname(Node, Topic) when is_atom(Node) ->
  80. qname(atom_to_list(Node), Topic);
  81. qname(Node, Topic) ->
  82. iolist_to_binary(["Bridge:", Node, ":", Topic]).
  83. handle_call(Req, _From, State) ->
  84. ?UNEXPECTED_REQ(Req, State).
  85. handle_cast(Msg, State) ->
  86. ?UNEXPECTED_MSG(Msg, State).
  87. handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) ->
  88. {noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}};
  89. handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
  90. rpc:cast(Node, emqttd, publish, [transform(Msg, State)]),
  91. {noreply, State, hibernate};
  92. handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
  93. lager:warning("Bridge Node Down: ~p", [Node]),
  94. erlang:send_after(Interval, self(), ping_down_node),
  95. {noreply, State#state{status = down}, hibernate};
  96. handle_info({nodeup, Node}, State = #state{node = Node}) ->
  97. %% TODO: Really fast??
  98. case emqttd:is_running(Node) of
  99. true ->
  100. lager:warning("Bridge Node Up: ~p", [Node]),
  101. {noreply, dequeue(State#state{status = up})};
  102. false ->
  103. self() ! {nodedown, Node},
  104. {noreply, State#state{status = down}}
  105. end;
  106. handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) ->
  107. Self = self(),
  108. spawn_link(fun() ->
  109. case net_kernel:connect_node(Node) of
  110. true -> %%TODO: this is not right... fixme later
  111. Self ! {nodeup, Node};
  112. false ->
  113. erlang:send_after(Interval, Self, ping_down_node)
  114. end
  115. end),
  116. {noreply, State};
  117. handle_info({'EXIT', _Pid, normal}, State) ->
  118. {noreply, State};
  119. handle_info(Info, State) ->
  120. ?UNEXPECTED_INFO(Info, State).
  121. terminate(_Reason, _State) ->
  122. ok.
  123. code_change(_OldVsn, State, _Extra) ->
  124. {ok, State}.
  125. %%--------------------------------------------------------------------
  126. %% Internal functions
  127. %%--------------------------------------------------------------------
  128. dequeue(State = #state{mqueue = MQ}) ->
  129. case emqttd_mqueue:out(MQ) of
  130. {empty, MQ1} ->
  131. State#state{mqueue = MQ1};
  132. {{value, Msg}, MQ1} ->
  133. handle_info({dispatch, Msg#mqtt_message.topic, Msg}, State),
  134. dequeue(State#state{mqueue = MQ1})
  135. end.
  136. transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix,
  137. topic_suffix = Suffix}) ->
  138. Msg#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.