|
|
@@ -277,10 +277,9 @@ t_batch_query_counter(_) ->
|
|
|
fun(Result, Trace) ->
|
|
|
?assertMatch({ok, 0}, Result),
|
|
|
QueryTrace = ?of_kind(call_batch_query, Trace),
|
|
|
- ?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace)
|
|
|
+ ?assertMatch([#{batch := [{query, _, get_counter, _, _, _}]}], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
-
|
|
|
NMsgs = 1_000,
|
|
|
?check_trace(
|
|
|
?TRACE_OPTS,
|
|
|
@@ -340,7 +339,7 @@ t_query_counter_async_query(_) ->
|
|
|
fun(Trace) ->
|
|
|
%% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
|
|
|
QueryTrace = ?of_kind(call_query, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
|
|
|
+ ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
%% simple query ignores the query_mode and batching settings in the resource_worker
|
|
|
@@ -351,7 +350,7 @@ t_query_counter_async_query(_) ->
|
|
|
?assertMatch({ok, 1000}, Result),
|
|
|
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
|
|
|
QueryTrace = ?of_kind(call_query, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
|
|
|
+ ?assertMatch([#{query := {query, _, get_counter, _, _, _}}], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
#{counters := C} = emqx_resource:get_metrics(?ID),
|
|
|
@@ -397,7 +396,7 @@ t_query_counter_async_callback(_) ->
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
|
|
|
+ ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
|
|
|
@@ -408,7 +407,7 @@ t_query_counter_async_callback(_) ->
|
|
|
fun(Result, Trace) ->
|
|
|
?assertMatch({ok, 1000}, Result),
|
|
|
QueryTrace = ?of_kind(call_query, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
|
|
|
+ ?assertMatch([#{query := {query, _, get_counter, _, _, _}}], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
#{counters := C} = emqx_resource:get_metrics(?ID),
|
|
|
@@ -480,7 +479,7 @@ t_query_counter_async_inflight(_) ->
|
|
|
),
|
|
|
fun(Trace) ->
|
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
|
|
|
+ ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
tap_metrics(?LINE),
|
|
|
@@ -537,7 +536,7 @@ t_query_counter_async_inflight(_) ->
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
|
|
|
+ ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _, _}} | _], QueryTrace),
|
|
|
?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
|
|
tap_metrics(?LINE),
|
|
|
ok
|
|
|
@@ -557,7 +556,7 @@ t_query_counter_async_inflight(_) ->
|
|
|
),
|
|
|
fun(Trace) ->
|
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
|
- ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
|
|
|
+ ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
|
|
|
end
|
|
|
),
|
|
|
|
|
|
@@ -669,8 +668,8 @@ t_query_counter_async_inflight_batch(_) ->
|
|
|
|| Event = #{
|
|
|
?snk_kind := call_batch_query_async,
|
|
|
batch := [
|
|
|
- {query, _, {inc_counter, 1}, _, _},
|
|
|
- {query, _, {inc_counter, 1}, _, _}
|
|
|
+ {query, _, {inc_counter, 1}, _, _, _},
|
|
|
+ {query, _, {inc_counter, 1}, _, _, _}
|
|
|
]
|
|
|
} <-
|
|
|
Trace
|
|
|
@@ -754,7 +753,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|
|
fun(Trace) ->
|
|
|
QueryTrace = ?of_kind(call_batch_query_async, Trace),
|
|
|
?assertMatch(
|
|
|
- [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
|
|
|
+ [#{batch := [{query, _, {inc_counter, _}, _, _, _} | _]} | _],
|
|
|
QueryTrace
|
|
|
)
|
|
|
end
|
|
|
@@ -779,7 +778,7 @@ t_query_counter_async_inflight_batch(_) ->
|
|
|
fun(Trace) ->
|
|
|
QueryTrace = ?of_kind(call_batch_query_async, Trace),
|
|
|
?assertMatch(
|
|
|
- [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
|
|
|
+ [#{batch := [{query, _, {inc_counter, _}, _, _, _} | _]} | _],
|
|
|
QueryTrace
|
|
|
)
|
|
|
end
|
|
|
@@ -2051,7 +2050,7 @@ do_t_expiration_before_sending(QueryMode) ->
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
- [#{batch := [{query, _, {inc_counter, 99}, _, _}]}],
|
|
|
+ [#{batch := [{query, _, {inc_counter, 99}, _, _, _}]}],
|
|
|
?of_kind(buffer_worker_flush_all_expired, Trace)
|
|
|
),
|
|
|
Metrics = tap_metrics(?LINE),
|
|
|
@@ -2167,7 +2166,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
|
|
|
#{
|
|
|
?snk_kind := handle_async_reply,
|
|
|
action := ack,
|
|
|
- batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
|
|
|
+ batch_or_query := [{query, _, {inc_counter, 99}, _, _, _}]
|
|
|
},
|
|
|
10 * TimeoutMS
|
|
|
);
|
|
|
@@ -2189,8 +2188,8 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
|
|
|
?assertMatch(
|
|
|
[
|
|
|
#{
|
|
|
- expired := [{query, _, {inc_counter, 199}, _, _}],
|
|
|
- not_expired := [{query, _, {inc_counter, 99}, _, _}]
|
|
|
+ expired := [{query, _, {inc_counter, 199}, _, _, _}],
|
|
|
+ not_expired := [{query, _, {inc_counter, 99}, _, _, _}]
|
|
|
}
|
|
|
],
|
|
|
?of_kind(buffer_worker_flush_potentially_partial, Trace)
|
|
|
@@ -2303,7 +2302,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
|
|
|
#{?snk_kind := delay},
|
|
|
#{
|
|
|
?snk_kind := handle_async_reply_enter,
|
|
|
- batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
|
|
|
+ batch_or_query := [{query, _, {inc_counter, 199}, _, _, _} | _]
|
|
|
}
|
|
|
),
|
|
|
|
|
|
@@ -2346,8 +2345,8 @@ do_t_expiration_async_after_reply(IsBatch) ->
|
|
|
[
|
|
|
#{
|
|
|
expired := [
|
|
|
- {query, _, {inc_counter, 199}, _, _},
|
|
|
- {query, _, {inc_counter, 299}, _, _}
|
|
|
+ {query, _, {inc_counter, 199}, _, _, _},
|
|
|
+ {query, _, {inc_counter, 299}, _, _, _}
|
|
|
]
|
|
|
}
|
|
|
],
|
|
|
@@ -2365,8 +2364,8 @@ do_t_expiration_async_after_reply(IsBatch) ->
|
|
|
single ->
|
|
|
?assertMatch(
|
|
|
[
|
|
|
- #{expired := [{query, _, {inc_counter, 199}, _, _}]},
|
|
|
- #{expired := [{query, _, {inc_counter, 299}, _, _}]}
|
|
|
+ #{expired := [{query, _, {inc_counter, 199}, _, _, _}]},
|
|
|
+ #{expired := [{query, _, {inc_counter, 299}, _, _, _}]}
|
|
|
],
|
|
|
?of_kind(handle_async_reply_expired, Trace)
|
|
|
)
|
|
|
@@ -2417,7 +2416,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
|
|
|
#{?snk_kind := delay},
|
|
|
#{
|
|
|
?snk_kind := handle_async_reply_enter,
|
|
|
- batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
|
|
|
+ batch_or_query := [{query, _, {inc_counter, 199}, _, _, _} | _]
|
|
|
}
|
|
|
),
|
|
|
|
|
|
@@ -2451,8 +2450,8 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
|
|
|
[
|
|
|
#{
|
|
|
expired := [
|
|
|
- {query, _, {inc_counter, 199}, _, _},
|
|
|
- {query, _, {inc_counter, 299}, _, _}
|
|
|
+ {query, _, {inc_counter, 199}, _, _, _},
|
|
|
+ {query, _, {inc_counter, 299}, _, _, _}
|
|
|
]
|
|
|
}
|
|
|
],
|
|
|
@@ -2578,7 +2577,7 @@ do_t_expiration_retry() ->
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
- [#{expired := [{query, _, {inc_counter, 1}, _, _}]}],
|
|
|
+ [#{expired := [{query, _, {inc_counter, 1}, _, _, _}]}],
|
|
|
?of_kind(buffer_worker_retry_expired, Trace)
|
|
|
),
|
|
|
Metrics = tap_metrics(?LINE),
|
|
|
@@ -2655,8 +2654,8 @@ t_expiration_retry_batch_multiple_times(_Config) ->
|
|
|
fun(Trace) ->
|
|
|
?assertMatch(
|
|
|
[
|
|
|
- #{expired := [{query, _, {inc_counter, 1}, _, _}]},
|
|
|
- #{expired := [{query, _, {inc_counter, 2}, _, _}]}
|
|
|
+ #{expired := [{query, _, {inc_counter, 1}, _, _, _}]},
|
|
|
+ #{expired := [{query, _, {inc_counter, 2}, _, _, _}]}
|
|
|
],
|
|
|
?of_kind(buffer_worker_retry_expired, Trace)
|
|
|
),
|