|
|
@@ -24,6 +24,7 @@
|
|
|
%%%
|
|
|
%%% @end
|
|
|
%%%-----------------------------------------------------------------------------
|
|
|
+
|
|
|
-module(emqttd_bridge).
|
|
|
|
|
|
-author("Feng Lee <feng@emqtt.io>").
|
|
|
@@ -47,15 +48,15 @@
|
|
|
qos = ?QOS_2,
|
|
|
topic_suffix = <<>>,
|
|
|
topic_prefix = <<>>,
|
|
|
- mqueue :: emqttd_mqueue:mqueue(),
|
|
|
- max_queue_len = 0,
|
|
|
+ mqueue :: emqttd_mqueue:mqueue(),
|
|
|
+ max_queue_len = 10000,
|
|
|
ping_down_interval = ?PING_DOWN_INTERVAL,
|
|
|
status = up}).
|
|
|
|
|
|
--type option() :: {max_queue_len, pos_integer()} |
|
|
|
- {qos, mqtt_qos()} |
|
|
|
+-type option() :: {qos, mqtt_qos()} |
|
|
|
{topic_suffix, binary()} |
|
|
|
{topic_prefix, binary()} |
|
|
|
+ {max_queue_len, pos_integer()} |
|
|
|
{ping_down_interval, pos_integer()}.
|
|
|
|
|
|
-export_type([option/0]).
|
|
|
@@ -85,7 +86,7 @@ init([Node, SubTopic, Options]) ->
|
|
|
MQueue = emqttd_mqueue:new(qname(Node, SubTopic),
|
|
|
[{max_len, State#state.max_queue_len}],
|
|
|
emqttd_alarm:alarm_fun()),
|
|
|
- emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
|
|
|
+ emqttd_pubsub:subscribe(SubTopic, State#state.qos),
|
|
|
{ok, State#state{mqueue = MQueue}};
|
|
|
false ->
|
|
|
{stop, {cannot_connect, Node}}
|
|
|
@@ -102,7 +103,9 @@ parse_opts([{topic_prefix, Prefix} | Opts], State) ->
|
|
|
parse_opts([{max_queue_len, Len} | Opts], State) ->
|
|
|
parse_opts(Opts, State#state{max_queue_len = Len});
|
|
|
parse_opts([{ping_down_interval, Interval} | Opts], State) ->
|
|
|
- parse_opts(Opts, State#state{ping_down_interval = Interval*1000}).
|
|
|
+ parse_opts(Opts, State#state{ping_down_interval = Interval*1000});
|
|
|
+parse_opts([_Opt | Opts], State) ->
|
|
|
+ parse_opts(Opts, State).
|
|
|
|
|
|
qname(Node, SubTopic) when is_atom(Node) ->
|
|
|
qname(atom_to_list(Node), SubTopic);
|