Просмотр исходного кода

Merge remote-tracking branch 'origin/release-50' into 0403-sync-release-50-back-to-master

Zaiming (Stone) Shi 2 лет назад
Родитель
Сommit
68c15ffd48

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.0.21").
 -define(EMQX_RELEASE_CE, "5.0.21").
 
 
 %% Enterprise edition
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.0.2-rc.3").
+-define(EMQX_RELEASE_EE, "5.0.2-rc.4").
 
 
 %% the HTTP API version
 %% the HTTP API version
 -define(EMQX_API_VERSION, "5.0").
 -define(EMQX_API_VERSION, "5.0").

+ 37 - 3
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -270,9 +270,6 @@ fast_forward_to_commit(Node, ToTnxId) ->
 
 
 %% @private
 %% @private
 init([Node, RetryMs]) ->
 init([Node, RetryMs]) ->
-    %% Workaround for https://github.com/emqx/mria/issues/94:
-    _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
-    _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     State = #{node => Node, retry_interval => RetryMs},
     State = #{node => Node, retry_interval => RetryMs},
     %% The init transaction ID is set in emqx_conf_app after
     %% The init transaction ID is set in emqx_conf_app after
@@ -286,6 +283,9 @@ init([Node, RetryMs]) ->
 
 
 %% @private
 %% @private
 handle_continue(?CATCH_UP, State) ->
 handle_continue(?CATCH_UP, State) ->
+    %% emqx app must be started before
+    %% trying to catch up the rpc commit logs
+    ok = wait_for_emqx_ready(),
     {noreply, State, catch_up(State)}.
     {noreply, State, catch_up(State)}.
 
 
 handle_call(reset, _From, State) ->
 handle_call(reset, _From, State) ->
@@ -572,3 +572,37 @@ maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
 maybe_init_tnx_id(Node, TnxId) ->
 maybe_init_tnx_id(Node, TnxId) ->
     {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
     {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
     ok.
     ok.
+
+%% @priv Cannot proceed until emqx app is ready.
+%% Otherwise the committed transaction catch up may fail.
+wait_for_emqx_ready() ->
+    %% wait 10 seconds for emqx to start
+    ok = do_wait_for_emqx_ready(10).
+
+%% Wait for emqx app to be ready,
+%% write a log message every 1 second
+do_wait_for_emqx_ready(0) ->
+    timeout;
+do_wait_for_emqx_ready(N) ->
+    %% check interval is 100ms
+    %% makes the total wait time 1 second
+    case do_wait_for_emqx_ready2(10) of
+        ok ->
+            ok;
+        timeout ->
+            ?SLOG(warning, #{msg => "stil_waiting_for_emqx_app_to_be_ready"}),
+            do_wait_for_emqx_ready(N - 1)
+    end.
+
+%% Wait for emqx app to be ready,
+%% check interval is 100ms
+do_wait_for_emqx_ready2(0) ->
+    timeout;
+do_wait_for_emqx_ready2(N) ->
+    case emqx:is_running() of
+        true ->
+            ok;
+        false ->
+            timer:sleep(100),
+            do_wait_for_emqx_ready2(N - 1)
+    end.

+ 3 - 1
apps/emqx_conf/src/emqx_cluster_rpc_handler.erl

@@ -13,7 +13,9 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
--module(emqx_cluster_rpc_handler).
+
+%% @doc This module is responsible for cleaning up the cluster RPC MFA.
+-module(emqx_cluster_rpc_cleaner).
 
 
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 

+ 4 - 1
apps/emqx_conf/src/emqx_conf_app.erl

@@ -95,13 +95,16 @@ init_load() ->
 -endif.
 -endif.
 
 
 init_conf() ->
 init_conf() ->
+    %% Workaround for https://github.com/emqx/mria/issues/94:
+    _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
+    _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
     {ok, TnxId} = copy_override_conf_from_core_node(),
     {ok, TnxId} = copy_override_conf_from_core_node(),
     _ = emqx_app:set_init_tnx_id(TnxId),
     _ = emqx_app:set_init_tnx_id(TnxId),
     ok = init_load(),
     ok = init_load(),
     ok = emqx_app:set_init_config_load_done().
     ok = emqx_app:set_init_config_load_done().
 
 
 cluster_nodes() ->
 cluster_nodes() ->
-    mria_mnesia:running_nodes() -- [node()].
+    mria:cluster_nodes(cores) -- [node()].
 
 
 copy_override_conf_from_core_node() ->
 copy_override_conf_from_core_node() ->
     case cluster_nodes() of
     case cluster_nodes() of

+ 1 - 1
apps/emqx_conf/src/emqx_conf_sup.erl

@@ -36,7 +36,7 @@ init([]) ->
     ChildSpecs =
     ChildSpecs =
         [
         [
             child_spec(emqx_cluster_rpc, []),
             child_spec(emqx_cluster_rpc, []),
-            child_spec(emqx_cluster_rpc_handler, [])
+            child_spec(emqx_cluster_rpc_cleaner, [])
         ],
         ],
     {ok, {SupFlags, ChildSpecs}}.
     {ok, {SupFlags, ChildSpecs}}.
 
 

+ 12 - 10
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -43,6 +43,7 @@ groups() -> [].
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     application:load(emqx_conf),
     application:load(emqx_conf),
     ok = ekka:start(),
     ok = ekka:start(),
+    ok = emqx_common_test_helpers:start_apps([]),
     ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
     ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
     ok = emqx_config:put([node, cluster_call, retry_interval], 1000),
     ok = emqx_config:put([node, cluster_call, retry_interval], 1000),
     meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
     meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
@@ -53,6 +54,7 @@ init_per_suite(Config) ->
     Config.
     Config.
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([]),
     ekka:stop(),
     ekka:stop(),
     mria:stop(),
     mria:stop(),
     meck:unload(mria),
     meck:unload(mria),
@@ -255,13 +257,13 @@ t_fast_forward_commit(_Config) ->
     ),
     ),
     ok.
     ok.
 
 
-t_handler_unexpected_msg(_Config) ->
-    Handler = emqx_cluster_rpc_handler,
-    OldPid = erlang:whereis(Handler),
-    ok = gen_server:cast(Handler, unexpected_cast_msg),
-    ignore = gen_server:call(Handler, unexpected_cast_msg),
-    erlang:send(Handler, unexpected_info_msg),
-    NewPid = erlang:whereis(Handler),
+t_cleaner_unexpected_msg(_Config) ->
+    Cleaner = emqx_cluster_cleaner,
+    OldPid = erlang:whereis(Cleaner),
+    ok = gen_server:cast(Cleaner, unexpected_cast_msg),
+    ignore = gen_server:call(Cleaner, unexpected_cast_msg),
+    erlang:send(Cleaner, unexpected_info_msg),
+    NewPid = erlang:whereis(Cleaner),
     ?assertEqual(OldPid, NewPid),
     ?assertEqual(OldPid, NewPid),
     ok.
     ok.
 
 
@@ -279,8 +281,8 @@ start() ->
     {ok, Pid1} = emqx_cluster_rpc:start_link(),
     {ok, Pid1} = emqx_cluster_rpc:start_link(),
     {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
     {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
     {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
     {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
-    {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500),
-    true = erlang:register(emqx_cluster_rpc_handler, Pid4),
+    {ok, Pid4} = emqx_cluster_rpc_cleaner:start_link(100, 500),
+    true = erlang:register(emqx_cluster_rpc_cleaner, Pid4),
     {ok, [Pid1, Pid2, Pid3, Pid4]}.
     {ok, [Pid1, Pid2, Pid3, Pid4]}.
 
 
 stop() ->
 stop() ->
@@ -296,7 +298,7 @@ stop() ->
         end
         end
      || N <- [?NODE1, ?NODE2, ?NODE3]
      || N <- [?NODE1, ?NODE2, ?NODE3]
     ],
     ],
-    gen_server:stop(emqx_cluster_rpc_handler, normal, 5000).
+    gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000).
 
 
 receive_msg(0, _Msg) ->
 receive_msg(0, _Msg) ->
     ok;
     ok;

+ 1 - 1
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -82,7 +82,7 @@ worker_pool_size(required) -> false;
 worker_pool_size(_) -> undefined.
 worker_pool_size(_) -> undefined.
 
 
 resume_interval(type) -> emqx_schema:duration_ms();
 resume_interval(type) -> emqx_schema:duration_ms();
-resume_interval(importance) -> hidden;
+resume_interval(importance) -> ?IMPORTANCE_HIDDEN;
 resume_interval(desc) -> ?DESC("resume_interval");
 resume_interval(desc) -> ?DESC("resume_interval");
 resume_interval(required) -> false;
 resume_interval(required) -> false;
 resume_interval(_) -> undefined.
 resume_interval(_) -> undefined.

+ 3 - 3
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -514,6 +514,8 @@ inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
 inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
 inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
+inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) ->
+    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
 inc_action_metrics(R, RuleId) ->
 inc_action_metrics(R, RuleId) ->
     case is_ok_result(R) of
     case is_ok_result(R) of
         false ->
         false ->
@@ -523,9 +525,7 @@ inc_action_metrics(R, RuleId) ->
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
     end.
     end.
 
 
-is_ok_result(ok) ->
-    true;
 is_ok_result(R) when is_tuple(R) ->
 is_ok_result(R) when is_tuple(R) ->
     ok == erlang:element(1, R);
     ok == erlang:element(1, R);
-is_ok_result(ok) ->
+is_ok_result(_) ->
     false.
     false.

+ 4 - 0
changes/ce/fix-10327.en.md

@@ -0,0 +1,4 @@
+Don't increment 'actions.failed.unknown' rule metrics counter upon receiving unrecoverable bridge errors.
+This counter is displayed on the dashboard's rule overview tab ('Action statistics' - 'Unknown'). 
+The fix is only applicable for synchronous bridges, as all rule actions for asynchronous bridges 
+are counted as successful (they increment 'actions.success' which is displayed as 'Action statistics' - 'Success'). 

+ 25 - 14
scripts/buildx.sh

@@ -27,6 +27,13 @@ help() {
     echo "                           E.g. ghcr.io/emqx/emqx-builder/5.0-28:1.13.4-24.3.4.2-2-debian11"
     echo "                           E.g. ghcr.io/emqx/emqx-builder/5.0-28:1.13.4-24.3.4.2-2-debian11"
 }
 }
 
 
+die() {
+    msg="$1"
+    echo "$msg" >&2
+    help
+    exit 1
+}
+
 while [ "$#" -gt 0 ]; do
 while [ "$#" -gt 0 ]; do
     case $1 in
     case $1 in
     -h|--help)
     -h|--help)
@@ -81,13 +88,23 @@ while [ "$#" -gt 0 ]; do
   esac
   esac
 done
 done
 
 
-if [ -z "${PROFILE:-}" ]    ||
-   [ -z "${PKGTYPE:-}" ]    ||
-   [ -z "${BUILDER:-}" ]    ||
-   [ -z "${ARCH:-}" ]; then
-    help
-    exit 1
+## we have a different naming for them
+if [[ $(uname -m) == "x86_64" ]]; then
+    NATIVE_ARCH='amd64'
+elif [[ $(uname -m) == "aarch64" ]]; then
+    NATIVE_ARCH='arm64'
+elif [[ $(uname -m) == "arm64" ]]; then
+    NATIVE_ARCH='arm64'
+elif [[ $(uname -m) == "armv7l" ]]; then
+    # CHECKME: really ?
+    NATIVE_ARCH='arm64'
 fi
 fi
+ARCH="${ARCH:-${NATIVE_ARCH:-}}"
+
+[ -z "${PROFILE:-}" ] && die "missing --prifile"
+[ -z "${PKGTYPE:-}" ] && die "missing --pkgtyp"
+[ -z "${BUILDER:-}" ] && die "missing --builder"
+[ -z "${ARCH:-}" ] && die "missing --arch"
 
 
 # ensure dir
 # ensure dir
 cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
 cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
@@ -128,13 +145,7 @@ if [[ "$HOST_SYSTEM" = "$BUILDER_SYSTEM" ]]; then
 fi
 fi
 
 
 IS_NATIVE_ARCH='no'
 IS_NATIVE_ARCH='no'
-if [[ $(uname -m) == "x86_64" && "$ARCH" == "amd64" ]]; then
-    IS_NATIVE_ARCH='yes'
-elif [[ $(uname -m) == "aarch64" && "$ARCH" == "arm64" ]]; then
-    IS_NATIVE_ARCH='yes'
-elif [[ $(uname -m) == "arm64" && "$ARCH" == "arm64" ]]; then
-    IS_NATIVE_ARCH='yes'
-elif [[ $(uname -m) == "armv7l" && "$ARCH" == "arm64" ]]; then
+if [[ "$NATIVE_ARCH" == "$ARCH" ]]; then
     IS_NATIVE_ARCH='yes'
     IS_NATIVE_ARCH='yes'
 fi
 fi
 
 
@@ -151,7 +162,7 @@ elif docker info; then
         --platform="linux/$ARCH" \
         --platform="linux/$ARCH" \
         --env ACLOCAL_PATH="/usr/share/aclocal:/usr/local/share/aclocal" \
         --env ACLOCAL_PATH="/usr/share/aclocal:/usr/local/share/aclocal" \
         "$BUILDER" \
         "$BUILDER" \
-        bash -euc "$CMD_RUN"
+        bash -euc "git config --global --add safe.directory /emqx && $CMD_RUN"
 else
 else
     echo "Error: Docker not available on unsupported platform"
     echo "Error: Docker not available on unsupported platform"
     exit 1;
     exit 1;