|
|
@@ -377,9 +377,14 @@ common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
|
|
|
common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
|
|
|
{Result, NewState} = ensure_absent(What, Topic, State),
|
|
|
{keep_state, NewState, [{reply, From, Result}]};
|
|
|
-common(_StateName, info, {deliver, _, Msg}, #{replayq := Q, if_record_metrics := IfRecordMetric} = State) ->
|
|
|
- bridges_metrics_inc(IfRecordMetric, 'bridge.mqtt.message_received'),
|
|
|
- NewQ = replayq:append(Q, collect([Msg])),
|
|
|
+common(_StateName, info, {deliver, _, Msg},
|
|
|
+ State = #{replayq := Q, if_record_metrics := IfRecordMetric}) ->
|
|
|
+ Msgs = collect([Msg]),
|
|
|
+ bridges_metrics_inc(IfRecordMetric,
|
|
|
+ 'bridge.mqtt.message_received',
|
|
|
+ length(Msgs)
|
|
|
+ ),
|
|
|
+ NewQ = replayq:append(Q, Msgs),
|
|
|
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
|
|
|
common(_StateName, info, {'EXIT', _, _}, State) ->
|
|
|
{keep_state, State};
|
|
|
@@ -586,3 +591,8 @@ bridges_metrics_inc(true, Metric) ->
|
|
|
emqx_metrics:inc(Metric);
|
|
|
bridges_metrics_inc(_IsRecordMetric, _Metric) ->
|
|
|
ok.
|
|
|
+
|
|
|
+bridges_metrics_inc(true, Metric, Value) ->
|
|
|
+ emqx_metrics:inc(Metric, Value);
|
|
|
+bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) ->
|
|
|
+ ok.
|