Просмотр исходного кода

fix(kafka): use async callback to bump success counters

some telemetry events from wolff are discarded:

* dropped:
    this is double counted in wolff,
    we now only subscribe to the dropped_queue_full event
* retried_failed:
    it has different meanings in wolff and in EMQX:
    in wolff, it means this is the 2nd (or a later) produce attempt
    in EMQX, it means the request eventually failed after some retries

* retried_success
    since we are going to handle the success counters in the callback,
    having this also reported from wolff will only make things
    harder to understand

* failed
    wolff never fails (unless it drops, which is a different counter)
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
5fdf7fd24c

+ 12 - 4
apps/emqx_bridge/src/emqx_bridge.erl

@@ -171,14 +171,22 @@ send_message(BridgeId, Message) ->
         not_found ->
             {error, {bridge_not_found, BridgeId}};
         #{enable := true} = Config ->
-            Timeout = emqx_map_lib:deep_get(
-                [resource_opts, request_timeout], Config, timer:seconds(15)
-            ),
-            emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout});
+            QueryOpts = query_opts(Config),
+            emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
         #{enable := false} ->
             {error, {bridge_stopped, BridgeId}}
     end.
 
+query_opts(Config) ->
+    case emqx_map_lib:deep_get([resource_opts, request_timeout], Config, false) of
+        Timeout when is_integer(Timeout) ->
+            %% request_timeout is configured
+            #{timeout => Timeout};
+        _ ->
+            %% emqx_resource has a default value (15s)
+            #{}
+    end.
+
 config_key_path() ->
     [bridges].
 

+ 2 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -89,6 +89,8 @@
 -define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024).
 -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).
 
+-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)).
+
 %% count
 -define(DEFAULT_BATCH_SIZE, 1).
 

+ 3 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -255,7 +255,7 @@ reset_metrics(ResId) ->
 query(ResId, Request) ->
     query(ResId, Request, #{}).
 
--spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) ->
+-spec query(resource_id(), Request :: term(), query_opts()) ->
     Result :: term().
 query(ResId, Request, Opts) ->
     case emqx_resource_manager:ets_lookup(ResId) of
@@ -263,7 +263,8 @@ query(ResId, Request, Opts) ->
             IsBufferSupported = is_buffer_supported(Module),
             case {IsBufferSupported, QM} of
                 {true, _} ->
-                    emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
+                    %% only Kafka so far
+                    emqx_resource_buffer_worker:simple_async_query(ResId, Request);
                 {false, sync} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
                 {false, async} ->

+ 43 - 27
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -38,7 +38,8 @@
 ]).
 
 -export([
-    simple_sync_query/2
+    simple_sync_query/2,
+    simple_async_query/2
 ]).
 
 -export([
@@ -61,6 +62,7 @@
 -define(COLLECT_REQ_LIMIT, 1000).
 -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
 -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
+-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
 -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
 -define(EXPAND(RESULT, BATCH), [
     ?REPLY(FROM, REQUEST, SENT, RESULT)
@@ -116,8 +118,8 @@ async_query(Id, Request, Opts0) ->
     emqx_resource_metrics:matched_inc(Id),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
-%% simple query the resource without batching and queuing messages.
--spec simple_sync_query(id(), request()) -> Result :: term().
+%% simple query the resource without batching and queuing.
+-spec simple_sync_query(id(), request()) -> term().
 simple_sync_query(Id, Request) ->
     %% Note: since calling this function implies in bypassing the
     %% buffer workers, and each buffer worker index is used when
@@ -126,18 +128,27 @@ simple_sync_query(Id, Request) ->
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     Index = undefined,
-    QueryOpts0 = #{simple_query => true, timeout => infinity},
-    QueryOpts = #{expire_at := ExpireAt} = ensure_expire_at(QueryOpts0),
+    QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_message_ref(),
-    HasBeenSent = false,
-    From = self(),
-    Result = call_query(
-        sync, Id, Index, Ref, ?QUERY(From, Request, HasBeenSent, ExpireAt), QueryOpts
-    ),
-    _ = handle_query_result(Id, Result, HasBeenSent),
+    Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
+    _ = handle_query_result(Id, Result, _HasBeenSent = false),
+    Result.
+
+%% simple async-query the resource without batching and queuing.
+-spec simple_async_query(id(), request()) -> term().
+simple_async_query(Id, Request) ->
+    Index = undefined,
+    QueryOpts = simple_query_opts(),
+    emqx_resource_metrics:matched_inc(Id),
+    Ref = make_message_ref(),
+    Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
+    _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
 
+simple_query_opts() ->
+    ensure_expire_at(#{simple_query => true, timeout => infinity}).
+
 -spec block(pid()) -> ok.
 block(ServerRef) ->
     gen_statem:cast(ServerRef, block).
@@ -848,9 +859,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
             QM =
-                case QM0 of
-                    configured -> maps:get(query_mode, Data);
-                    _ -> QM0
+                case QM0 =:= configured of
+                    true -> maps:get(query_mode, Data);
+                    false -> QM0
                 end,
             CBM = maps:get(callback_mode, Data),
             CallMode = call_mode(QM, CBM),
@@ -991,11 +1002,7 @@ do_reply_after_query(
                 ref => Ref,
                 result => Result
             }),
-            IsFullBefore = is_inflight_full(InflightTID),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
-            IsAcked andalso PostFn(),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
-            ok
+            do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
     end.
 
 batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
@@ -1049,13 +1056,23 @@ do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, R
                 ref => Ref,
                 result => Result
             }),
-            IsFullBefore = is_inflight_full(InflightTID),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
-            IsAcked andalso PostFn(),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
-            ok
+            do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
     end.
 
+do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) ->
+    IsFullBefore = is_inflight_full(InflightTID),
+    IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
+    case maps:get(simple_query, QueryOpts, false) of
+        true ->
+            PostFn();
+        false when IsKnownRef ->
+            PostFn();
+        false ->
+            ok
+    end,
+    IsFullBefore andalso ?MODULE:flush_worker(WorkerPid),
+    ok.
+
 %%==============================================================================
 %% operations for queue
 queue_item_marshaller(Bin) when is_binary(Bin) ->
@@ -1113,7 +1130,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
     inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
     inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
     inflight_append(
-        TableId, {?INITIAL_MONOTONIC_TIME_REF, erlang:monotonic_time(nanosecond)}, Id, Index
+        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index
     ),
     TableId.
 
@@ -1426,8 +1443,7 @@ now_() ->
 ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
     Opts;
 ensure_timeout_query_opts(#{} = Opts0, sync) ->
-    TimeoutMS = timer:seconds(15),
-    Opts0#{timeout => TimeoutMS};
+    Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT};
 ensure_timeout_query_opts(#{} = Opts0, async) ->
     Opts0#{timeout => infinity}.
 

+ 6 - 2
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl

@@ -12,6 +12,7 @@
     on_start/2,
     on_stop/2,
     on_query/3,
+    on_query_async/4,
     on_get_status/2,
     is_buffer_supported/0
 ]).
@@ -26,8 +27,11 @@ on_start(InstId, Config) ->
 on_stop(InstId, State) ->
     emqx_bridge_impl_kafka_producer:on_stop(InstId, State).
 
-on_query(InstId, Msg, State) ->
-    emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State).
+on_query(InstId, Req, State) ->
+    emqx_bridge_impl_kafka_producer:on_query(InstId, Req, State).
+
+on_query_async(InstId, Req, ReplyFn, State) ->
+    emqx_bridge_impl_kafka_producer:on_query_async(InstId, Req, ReplyFn, State).
 
 on_get_status(InstId, State) ->
     emqx_bridge_impl_kafka_producer:on_get_status(InstId, State).

+ 43 - 55
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -11,6 +11,7 @@
     on_start/2,
     on_stop/2,
     on_query/3,
+    on_query_async/4,
     on_get_status/2
 ]).
 
@@ -140,19 +141,48 @@ on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_i
         }
     ).
 
+on_query(
+    _InstId,
+    {send_message, Message},
+    #{message_template := Template, producers := Producers}
+) ->
+    KafkaMessage = render_message(Template, Message),
+    %% TODO: this function is not used so far,
+    %% timeout should be configurable
+    %% or the on_query/3 should be on_query/4 instead.
+    try
+        {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000),
+        ok
+    catch
+        error:{producer_down, _} = Reason ->
+            {error, Reason};
+        error:timeout ->
+            {error, timeout}
+    end.
+
 %% @doc The callback API for rule-engine (or bridge without rules)
 %% The input argument `Message' is an enriched format (as a map())
 %% of the original #message{} record.
 %% The enrichment is done by rule-engine or by the data bridge framework.
 %% E.g. the output of rule-engine process chain
 %% or the direct mapping from an MQTT message.
-on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) ->
+on_query_async(
+    _InstId,
+    {send_message, Message},
+    AsyncReplyFn,
+    #{message_template := Template, producers := Producers}
+) ->
     KafkaMessage = render_message(Template, Message),
+    %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
+    %% * Must be a single element batch because wolff books calls, but not batch sizes
+    %%   for counters and gauges.
+    Batch = [KafkaMessage],
     %% The retuned information is discarded here.
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
-    {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
-    {async_return, ok}.
+    {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
+    %% this Pid is so far never used because Kafka producer is by-passing the buffer worker
+    {ok, Pid}.
 
 compile_message_template(T) ->
     KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
@@ -194,9 +224,14 @@ render_timestamp(Template, Message) ->
             erlang:system_time(millisecond)
     end.
 
-on_kafka_ack(_Partition, _Offset, _Extra) ->
-    %% Do nothing so far.
-    %% Maybe need to bump some counters?
+%% Wolff producer never gives up retrying
+%% so there can only be 'ok' results.
+on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
+    %% the ReplyFn is emqx_resource_worker:reply_after_query/8
+    apply(ReplyFn, Args ++ [ok]);
+on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
+    %% wolff should bump the dropped_queue_full counter
+    %% do not apply the callback (which is basically to bump success or fail counter)
     ok.
 
 on_get_status(_InstId, _State) ->
@@ -345,27 +380,13 @@ get_required(Field, Config, Throw) ->
 %% we *must* match the bridge id in the event metadata with that in
 %% the handler config; otherwise, multiple kafka producer bridges will
 %% install multiple handlers to the same wolff events, multiplying the
-handle_telemetry_event(
-    [wolff, dropped],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    #{bridge_id := ID}
-) when is_integer(Val) ->
-    emqx_resource_metrics:dropped_inc(ID, Val);
 handle_telemetry_event(
     [wolff, dropped_queue_full],
     #{counter_inc := Val},
     #{bridge_id := ID},
     #{bridge_id := ID}
 ) when is_integer(Val) ->
-    %% When wolff emits a `dropped_queue_full' event due to replayq
-    %% overflow, it also emits a `dropped' event (at the time of
-    %% writing, wolff is 1.7.4).  Since we already bump `dropped' when
-    %% `dropped.queue_full' occurs, we have to correct it here.  This
-    %% correction will have to be dropped if wolff stops also emitting
-    %% `dropped'.
-    emqx_resource_metrics:dropped_queue_full_inc(ID, Val),
-    emqx_resource_metrics:dropped_inc(ID, -Val);
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
 handle_telemetry_event(
     [wolff, queuing],
     #{gauge_set := Val},
@@ -380,13 +401,6 @@ handle_telemetry_event(
     #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_inc(ID, Val);
-handle_telemetry_event(
-    [wolff, failed],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    #{bridge_id := ID}
-) when is_integer(Val) ->
-    emqx_resource_metrics:failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, inflight],
     #{gauge_set := Val},
@@ -394,27 +408,6 @@ handle_telemetry_event(
     #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:inflight_set(ID, PartitionID, Val);
-handle_telemetry_event(
-    [wolff, retried_failed],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    #{bridge_id := ID}
-) when is_integer(Val) ->
-    emqx_resource_metrics:retried_failed_inc(ID, Val);
-handle_telemetry_event(
-    [wolff, retried_success],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    #{bridge_id := ID}
-) when is_integer(Val) ->
-    emqx_resource_metrics:retried_success_inc(ID, Val);
-handle_telemetry_event(
-    [wolff, success],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    #{bridge_id := ID}
-) when is_integer(Val) ->
-    emqx_resource_metrics:success_inc(ID, Val);
 handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
     %% Event that we do not handle
     ok.
@@ -437,15 +430,10 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
         %% unique handler id
         telemetry_handler_id(ResourceID),
         [
-            [wolff, dropped],
             [wolff, dropped_queue_full],
             [wolff, queuing],
             [wolff, retried],
-            [wolff, failed],
-            [wolff, inflight],
-            [wolff, retried_failed],
-            [wolff, retried_success],
-            [wolff, success]
+            [wolff, inflight]
         ],
         fun ?MODULE:handle_telemetry_event/4,
         %% we *must* keep track of the same id that is handed down to

+ 62 - 22
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -46,7 +46,14 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, on_query},
+        {group, on_query_async}
+    ].
+
+groups() ->
+    All = emqx_common_test_helpers:all(?MODULE),
+    [{on_query, All}, {on_query_async, All}].
 
 wait_until_kafka_is_up() ->
     wait_until_kafka_is_up(0).
@@ -89,6 +96,12 @@ end_per_suite(_Config) ->
     _ = application:stop(emqx_connector),
     ok.
 
+init_per_group(GroupName, Config) ->
+    [{query_api, GroupName} | Config].
+
+end_per_group(_, _) ->
+    ok.
+
 set_special_configs(emqx_management) ->
     Listeners = #{http => #{port => 8081}},
     Config = #{
@@ -106,23 +119,23 @@ set_special_configs(_) ->
 %% Test cases for all combinations of SSL, no SSL and authentication types
 %%------------------------------------------------------------------------------
 
-t_publish_no_auth(_CtConfig) ->
-    publish_with_and_without_ssl("none").
+t_publish_no_auth(CtConfig) ->
+    publish_with_and_without_ssl(CtConfig, "none").
 
-t_publish_no_auth_key_dispatch(_CtConfig) ->
-    publish_with_and_without_ssl("none", #{"partition_strategy" => "key_dispatch"}).
+t_publish_no_auth_key_dispatch(CtConfig) ->
+    publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}).
 
-t_publish_sasl_plain(_CtConfig) ->
-    publish_with_and_without_ssl(valid_sasl_plain_settings()).
+t_publish_sasl_plain(CtConfig) ->
+    publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()).
 
-t_publish_sasl_scram256(_CtConfig) ->
-    publish_with_and_without_ssl(valid_sasl_scram256_settings()).
+t_publish_sasl_scram256(CtConfig) ->
+    publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()).
 
-t_publish_sasl_scram512(_CtConfig) ->
-    publish_with_and_without_ssl(valid_sasl_scram512_settings()).
+t_publish_sasl_scram512(CtConfig) ->
+    publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()).
 
-t_publish_sasl_kerberos(_CtConfig) ->
-    publish_with_and_without_ssl(valid_sasl_kerberos_settings()).
+t_publish_sasl_kerberos(CtConfig) ->
+    publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()).
 
 %%------------------------------------------------------------------------------
 %% Test cases for REST api
@@ -350,7 +363,7 @@ kafka_bridge_rest_api_helper(Config) ->
 %% exists and it will.  This is specially bad if the
 %% original crash was due to misconfiguration and we are
 %% trying to fix it...
-t_failed_creation_then_fix(_Config) ->
+t_failed_creation_then_fix(Config) ->
     HostsString = kafka_hosts_string_sasl(),
     ValidAuthSettings = valid_sasl_plain_settings(),
     WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"},
@@ -394,7 +407,7 @@ t_failed_creation_then_fix(_Config) ->
     },
     {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
     ct:pal("base offset before testing ~p", [Offset]),
-    ?assertEqual({async_return, ok}, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
+    ok = send(Config, ResourceId, Msg, State),
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     %% TODO: refactor those into init/end per testcase
@@ -406,11 +419,37 @@ t_failed_creation_then_fix(_Config) ->
 %% Helper functions
 %%------------------------------------------------------------------------------
 
-publish_with_and_without_ssl(AuthSettings) ->
-    publish_with_and_without_ssl(AuthSettings, #{}).
+send(Config, ResourceId, Msg, State) when is_list(Config) ->
+    Ref = make_ref(),
+    ok = do_send(Ref, Config, ResourceId, Msg, State),
+    receive
+        {ack, Ref} ->
+            ok
+    after 10000 ->
+        error(timeout)
+    end.
+
+do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) ->
+    Caller = self(),
+    F = fun(ok) ->
+        Caller ! {ack, Ref},
+        ok
+    end,
+    case proplists:get_value(query_api, Config) of
+        on_query ->
+            ok = ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State),
+            F(ok);
+        on_query_async ->
+            {ok, _} = ?PRODUCER:on_query_async(ResourceId, {send_message, Msg}, {F, []}, State),
+            ok
+    end.
+
+publish_with_and_without_ssl(CtConfig, AuthSettings) ->
+    publish_with_and_without_ssl(CtConfig, AuthSettings, #{}).
 
-publish_with_and_without_ssl(AuthSettings, Config) ->
+publish_with_and_without_ssl(CtConfig, AuthSettings, Config) ->
     publish_helper(
+        CtConfig,
         #{
             auth_settings => AuthSettings,
             ssl_settings => #{}
@@ -418,6 +457,7 @@ publish_with_and_without_ssl(AuthSettings, Config) ->
         Config
     ),
     publish_helper(
+        CtConfig,
         #{
             auth_settings => AuthSettings,
             ssl_settings => valid_ssl_settings()
@@ -426,10 +466,11 @@ publish_with_and_without_ssl(AuthSettings, Config) ->
     ),
     ok.
 
-publish_helper(AuthSettings) ->
-    publish_helper(AuthSettings, #{}).
+publish_helper(CtConfig, AuthSettings) ->
+    publish_helper(CtConfig, AuthSettings, #{}).
 
 publish_helper(
+    CtConfig,
     #{
         auth_settings := AuthSettings,
         ssl_settings := SSLSettings
@@ -477,8 +518,7 @@ publish_helper(
     ct:pal("base offset before testing ~p", [Offset]),
     StartRes = ?PRODUCER:on_start(InstId, Conf),
     {ok, State} = StartRes,
-    OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
-    {async_return, ok} = OnQueryRes,
+    ok = send(CtConfig, InstId, Msg, State),
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),

+ 1 - 1
scripts/ct/run.sh

@@ -195,7 +195,7 @@ fi
 
 echo "Fixing file owners and permissions for $UID_GID"
 # rebar and hex cache directory need to be writable by $UID
-docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx"
+docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx/.git /emqx/.ci /emqx/_build/default/lib"
 # need to initialize .erlang.cookie manually here because / is not writable by $UID
 docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "openssl rand -base64 16 > /.erlang.cookie && chown $UID_GID /.erlang.cookie && chmod 0400 /.erlang.cookie"