|
@@ -806,7 +806,7 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) ->
|
|
|
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
|
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
{value, {publish, {_, Msg}, _Ts}} ->
|
|
{value, {publish, {_, Msg}, _Ts}} ->
|
|
|
- emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg),
|
|
|
|
|
|
|
+ emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg),
|
|
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
|
|
none ->
|
|
none ->
|
|
|
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State),
|
|
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State),
|
|
@@ -816,7 +816,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Infligh
|
|
|
acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
|
acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
{value, {publish, {_, Msg}, _Ts}} ->
|
|
{value, {publish, {_, Msg}, _Ts}} ->
|
|
|
- emqx_hooks:run('message.acked', [ClientId], Msg),
|
|
|
|
|
|
|
+ emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg),
|
|
|
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
|
|
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
|
|
|
{value, {pubrel, PacketId, _Ts}} ->
|
|
{value, {pubrel, PacketId, _Ts}} ->
|
|
|
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State),
|
|
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State),
|