Explorar o código

Merge pull request #9799 from zmstone/0118-fix-key_dispatch-kafka-produce-strategy

0118 fix key dispatch kafka produce strategy
Zaiming (Stone) Shi %!s(int64=3) %!d(string=hai) anos
pai
achega
2d01e604a5

+ 12 - 3
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -253,10 +253,14 @@ producers_config(BridgeName, ClientId, Input) ->
             mode := BufferMode,
             per_partition_limit := PerPartitionLimit,
             segment_bytes := SegmentBytes,
-            memory_overload_protection := MemOLP
+            memory_overload_protection := MemOLP0
         }
     } = Input,
-
+    MemOLP =
+        case os:type() of
+            {unix, linux} -> MemOLP0;
+            _ -> false
+        end,
     {OffloadMode, ReplayqDir} =
         case BufferMode of
             memory -> {false, false};
@@ -268,7 +272,7 @@ producers_config(BridgeName, ClientId, Input) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
     #{
         name => make_producer_name(BridgeName),
-        partitioner => PartitionStrategy,
+        partitioner => partitioner(PartitionStrategy),
         partition_count_refresh_interval_seconds => PCntRefreshInterval,
         replayq_dir => ReplayqDir,
         replayq_offload_mode => OffloadMode,
@@ -282,6 +286,11 @@ producers_config(BridgeName, ClientId, Input) ->
         telemetry_meta_data => #{bridge_id => ResourceID}
     }.
 
+%% Wolff API is a batch API.
+%% key_dispatch only looks at the first element, so it's named 'first_key_dispatch'
+partitioner(random) -> random;
+partitioner(key_dispatch) -> first_key_dispatch.
+
 replayq_dir(ClientId) ->
     filename:join([emqx:data_dir(), "kafka", ClientId]).
 

+ 50 - 19
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -109,6 +109,9 @@ set_special_configs(_) ->
 t_publish_no_auth(_CtConfig) ->
     publish_with_and_without_ssl("none").
 
+t_publish_no_auth_key_dispatch(_CtConfig) ->
+    publish_with_and_without_ssl("none", #{"partition_strategy" => "key_dispatch"}).
+
 t_publish_sasl_plain(_CtConfig) ->
     publish_with_and_without_ssl(valid_sasl_plain_settings()).
 
@@ -404,20 +407,35 @@ t_failed_creation_then_fix(_Config) ->
 %%------------------------------------------------------------------------------
 
 publish_with_and_without_ssl(AuthSettings) ->
-    publish_helper(#{
-        auth_settings => AuthSettings,
-        ssl_settings => #{}
-    }),
-    publish_helper(#{
-        auth_settings => AuthSettings,
-        ssl_settings => valid_ssl_settings()
-    }),
+    publish_with_and_without_ssl(AuthSettings, #{}).
+
+publish_with_and_without_ssl(AuthSettings, Config) ->
+    publish_helper(
+        #{
+            auth_settings => AuthSettings,
+            ssl_settings => #{}
+        },
+        Config
+    ),
+    publish_helper(
+        #{
+            auth_settings => AuthSettings,
+            ssl_settings => valid_ssl_settings()
+        },
+        Config
+    ),
     ok.
 
-publish_helper(#{
-    auth_settings := AuthSettings,
-    ssl_settings := SSLSettings
-}) ->
+publish_helper(AuthSettings) ->
+    publish_helper(AuthSettings, #{}).
+
+publish_helper(
+    #{
+        auth_settings := AuthSettings,
+        ssl_settings := SSLSettings
+    },
+    Conf0
+) ->
     HostsString =
         case {AuthSettings, SSLSettings} of
             {"none", Map} when map_size(Map) =:= 0 ->
@@ -434,13 +452,17 @@ publish_helper(#{
     InstId = emqx_bridge_resource:resource_id("kafka", Name),
     BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
     KafkaTopic = "test-topic-one-partition",
-    Conf = config(#{
-        "authentication" => AuthSettings,
-        "kafka_hosts_string" => HostsString,
-        "kafka_topic" => KafkaTopic,
-        "instance_id" => InstId,
-        "ssl" => SSLSettings
-    }),
+    Conf = config(
+        #{
+            "authentication" => AuthSettings,
+            "kafka_hosts_string" => HostsString,
+            "kafka_topic" => KafkaTopic,
+            "instance_id" => InstId,
+            "ssl" => SSLSettings
+        },
+        Conf0
+    ),
+
     emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
     %% To make sure we get unique value
     timer:sleep(1),
@@ -463,7 +485,15 @@ publish_helper(#{
     ok = emqx_bridge_resource:remove(BridgeId),
     ok.
 
+default_config() ->
+    #{"partition_strategy" => "random"}.
+
 config(Args) ->
+    config(Args, #{}).
+
+config(Args0, More) ->
+    Args1 = maps:merge(default_config(), Args0),
+    Args = maps:merge(Args1, More),
     ConfText = hocon_config(Args),
     ct:pal("Running tests with conf:\n~s", [ConfText]),
     {ok, Conf} = hocon:binary(ConfText),
@@ -506,6 +536,7 @@ producer = {
     kafka = {
         topic = \"{{ kafka_topic }}\"
         message = {key = \"${clientid}\", value = \"${.payload}\"}
+        partition_strategy = {{ partition_strategy }}
     }
 }
 """.

+ 17 - 13
scripts/ct/run.sh

@@ -169,11 +169,6 @@ else
     export UID_GID="$ORIG_UID_GID"
 fi
 
-if [ "$STOP" = 'no' ]; then
-    # shellcheck disable=2086 # no quotes for F_OPTIONS
-    docker-compose $F_OPTIONS up -d --build --remove-orphans
-fi
-
 # /emqx is where the source dir is mounted to the Erlang container
 # in .ci/docker-compose-file/docker-compose.yaml
 TTY=''
@@ -181,18 +176,29 @@ if [[ -t 1 ]]; then
     TTY='-t'
 fi
 
+function restore_ownership {
+    if ! sudo chown -R "$ORIG_UID_GID" . >/dev/null 2>&1; then
+        docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $ORIG_UID_GID /emqx" >/dev/null 2>&1 || true
+    fi
+}
+
+restore_ownership
+trap restore_ownership EXIT
+
+
+if [ "$STOP" = 'no' ]; then
+    # some left-over log file has to be deleted before a new docker-compose up
+    rm -f '.ci/docker-compose-file/redis/*.log'
+    # shellcheck disable=2086 # no quotes for F_OPTIONS
+    docker-compose $F_OPTIONS up -d --build --remove-orphans
+fi
+
 echo "Fixing file owners and permissions for $UID_GID"
 # rebar and hex cache directory need to be writable by $UID
 docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx"
 # need to initialize .erlang.cookie manually here because / is not writable by $UID
 docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "openssl rand -base64 16 > /.erlang.cookie && chown $UID_GID /.erlang.cookie && chmod 0400 /.erlang.cookie"
 
-restore_ownership() {
-    if [[ "$ORIG_UID_GID" != "$UID_GID" ]]; then
-        docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $ORIG_UID_GID /emqx"
-    fi
-}
-
 if [ "$ONLY_UP" = 'yes' ]; then
     exit 0
 fi
@@ -204,10 +210,8 @@ if [ "$STOP" = 'yes' ]; then
     docker-compose $F_OPTIONS down --remove-orphans
 elif [ "$ATTACH" = 'yes' ]; then
     docker exec -it "$ERLANG_CONTAINER" bash
-    restore_ownership
 elif [ "$CONSOLE" = 'yes' ]; then
     docker exec -e PROFILE="$PROFILE" -i $TTY "$ERLANG_CONTAINER" bash -c "make run"
-    restore_ownership
 else
     if [ -z "${REBAR3CT:-}" ]; then
         docker exec -e IS_CI="$IS_CI" -e PROFILE="$PROFILE" -i $TTY "$ERLANG_CONTAINER" bash -c "BUILD_WITHOUT_QUIC=1 make ${WHICH_APP}-ct"