|
@@ -63,8 +63,9 @@ init([Pool, Id, Node, Topic, Options]) ->
|
|
|
Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
|
|
Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
|
|
|
emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]),
|
|
emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]),
|
|
|
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
|
|
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
|
|
|
- %%TODO: queue....
|
|
|
|
|
- MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]),
|
|
|
|
|
|
|
+ MQueue = emqx_mqueue:init(#{type => simple,
|
|
|
|
|
+ max_len => State#state.max_queue_len,
|
|
|
|
|
+ store_qos0 => true}),
|
|
|
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
|
|
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
|
|
|
false ->
|
|
false ->
|
|
|
{stop, {cannot_connect_node, Node}}
|
|
{stop, {cannot_connect_node, Node}}
|