|
|
@@ -72,7 +72,7 @@ init([Node, Topic, Options]) ->
|
|
|
MQueue = emqttd_mqueue:new(qname(Node, Topic),
|
|
|
[{max_len, State#state.max_queue_len}],
|
|
|
emqttd_alarm:alarm_fun()),
|
|
|
- emqttd_pubsub:subscribe({Topic, State#state.qos}),
|
|
|
+ emqttd:subscribe(Topic),
|
|
|
{ok, State#state{mqueue = MQueue}};
|
|
|
false ->
|
|
|
{stop, {cannot_connect, Node}}
|
|
|
@@ -108,7 +108,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down})
|
|
|
{noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}};
|
|
|
|
|
|
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
|
|
|
- rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
|
|
|
+ rpc:cast(Node, emqttd, publish, [transform(Msg, State)]),
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
|