Просмотр исходного кода

Merge pull request #13487 from thalesmg/20240715-m-refactor-cluster-link-api

feat(cluster link): refactor http api, add status and metrics
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
39b8cb1789

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -10,6 +10,7 @@
 {emqx_bridge,5}.
 {emqx_bridge,6}.
 {emqx_broker,1}.
+{emqx_cluster_link,1}.
 {emqx_cm,1}.
 {emqx_cm,2}.
 {emqx_cm,3}.

+ 4 - 0
apps/emqx_cluster_link/include/emqx_cluster_link.hrl

@@ -17,3 +17,7 @@
 %% Fairly compact text encoding.
 -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>).
 -define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>).
+
+-define(METRIC_NAME, cluster_link).
+
+-define(route_metric, 'routes').

+ 498 - 23
apps/emqx_cluster_link/src/emqx_cluster_link_api.erl

@@ -7,86 +7,525 @@
 
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/http_api.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_cluster_link.hrl").
 
 -export([
     api_spec/0,
     paths/0,
+    namespace/0,
+    fields/1,
     schema/1
 ]).
 
--export([config/2]).
+-export([
+    '/cluster/links'/2,
+    '/cluster/links/link/:name'/2,
+    '/cluster/links/link/:name/metrics'/2
+]).
 
 -define(CONF_PATH, [cluster, links]).
 -define(TAGS, [<<"Cluster">>]).
 
+-type cluster_name() :: binary().
+
+namespace() -> "cluster_link".
+
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
 
 paths() ->
     [
-        "/cluster/links"
+        "/cluster/links",
+        "/cluster/links/link/:name",
+        "/cluster/links/link/:name/metrics"
     ].
 
 schema("/cluster/links") ->
     #{
-        'operationId' => config,
+        'operationId' => '/cluster/links',
         get =>
             #{
                 description => "Get cluster links configuration",
                 tags => ?TAGS,
                 responses =>
-                    #{200 => links_config_schema()}
+                    #{200 => links_config_schema_response()}
+            },
+        post =>
+            #{
+                description => "Create a cluster link",
+                tags => ?TAGS,
+                'requestBody' => link_config_schema(),
+                responses =>
+                    #{
+                        200 => link_config_schema_response(),
+                        400 =>
+                            emqx_dashboard_swagger:error_codes(
+                                [?BAD_REQUEST, ?ALREADY_EXISTS],
+                                <<"Update Config Failed">>
+                            )
+                    }
+            }
+    };
+schema("/cluster/links/link/:name") ->
+    #{
+        'operationId' => '/cluster/links/link/:name',
+        get =>
+            #{
+                description => "Get a cluster link configuration",
+                tags => ?TAGS,
+                parameters => [param_path_name()],
+                responses =>
+                    #{
+                        200 => link_config_schema_response(),
+                        404 => emqx_dashboard_swagger:error_codes(
+                            [?NOT_FOUND], <<"Cluster link not found">>
+                        )
+                    }
+            },
+        delete =>
+            #{
+                description => "Delete a cluster link",
+                tags => ?TAGS,
+                parameters => [param_path_name()],
+                responses =>
+                    #{
+                        204 => <<"Link deleted">>,
+                        404 => emqx_dashboard_swagger:error_codes(
+                            [?NOT_FOUND], <<"Cluster link not found">>
+                        )
+                    }
             },
         put =>
             #{
-                description => "Update cluster links configuration",
+                description => "Update a cluster link configuration",
                 tags => ?TAGS,
-                'requestBody' => links_config_schema(),
+                parameters => [param_path_name()],
+                'requestBody' => update_link_config_schema(),
                 responses =>
                     #{
-                        200 => links_config_schema(),
+                        200 => link_config_schema_response(),
+                        404 => emqx_dashboard_swagger:error_codes(
+                            [?NOT_FOUND], <<"Cluster link not found">>
+                        ),
                         400 =>
                             emqx_dashboard_swagger:error_codes(
                                 [?BAD_REQUEST], <<"Update Config Failed">>
                             )
                     }
             }
+    };
+schema("/cluster/links/link/:name/metrics") ->
+    #{
+        'operationId' => '/cluster/links/link/:name/metrics',
+        get =>
+            #{
+                description => "Get a cluster link metrics",
+                tags => ?TAGS,
+                parameters => [param_path_name()],
+                responses =>
+                    #{
+                        200 => link_metrics_schema_response(),
+                        404 => emqx_dashboard_swagger:error_codes(
+                            [?NOT_FOUND], <<"Cluster link not found">>
+                        )
+                    }
+            }
     }.
 
+fields(link_config_response) ->
+    [
+        {node, hoconsc:mk(binary(), #{desc => ?DESC("node")})},
+        {status, hoconsc:mk(status(), #{desc => ?DESC("status")})}
+        | emqx_cluster_link_schema:fields("link")
+    ];
+fields(metrics) ->
+    [
+        {metrics, hoconsc:mk(map(), #{desc => ?DESC("metrics")})}
+    ];
+fields(link_metrics_response) ->
+    [
+        {node_metrics,
+            hoconsc:mk(
+                hoconsc:array(hoconsc:ref(?MODULE, node_metrics)),
+                #{desc => ?DESC("node_metrics")}
+            )}
+        | fields(metrics)
+    ];
+fields(node_metrics) ->
+    [
+        {node, hoconsc:mk(atom(), #{desc => ?DESC("node")})}
+        | fields(metrics)
+    ].
+
 %%--------------------------------------------------------------------
 %% API Handler funcs
 %%--------------------------------------------------------------------
 
-config(get, _Params) ->
-    {200, get_raw()};
-config(put, #{body := Body}) ->
-    case emqx_cluster_link_config:update(Body) of
-        {ok, NewConfig} ->
-            {200, NewConfig};
-        {error, Reason} ->
-            Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
-            {400, ?BAD_REQUEST, Message}
-    end.
+'/cluster/links'(get, _Params) ->
+    handle_list();
+'/cluster/links'(post, #{body := Body = #{<<"name">> := Name}}) ->
+    with_link(
+        Name,
+        return(?BAD_REQUEST('ALREADY_EXISTS', <<"Cluster link already exists">>)),
+        fun() -> handle_create(Name, Body) end
+    ).
+
+'/cluster/links/link/:name'(get, #{bindings := #{name := Name}}) ->
+    with_link(Name, fun(Link) -> handle_lookup(Name, Link) end, not_found());
+'/cluster/links/link/:name'(put, #{bindings := #{name := Name}, body := Params0}) ->
+    with_link(Name, fun() -> handle_update(Name, Params0) end, not_found());
+'/cluster/links/link/:name'(delete, #{bindings := #{name := Name}}) ->
+    with_link(
+        Name,
+        fun() ->
+            case emqx_cluster_link_config:delete_link(Name) of
+                ok ->
+                    ?NO_CONTENT;
+                {error, Reason} ->
+                    Message = list_to_binary(io_lib:format("Delete link failed ~p", [Reason])),
+                    ?BAD_REQUEST(Message)
+            end
+        end,
+        not_found()
+    ).
+
+'/cluster/links/link/:name/metrics'(get, #{bindings := #{name := Name}}) ->
+    with_link(Name, fun() -> handle_metrics(Name) end, not_found()).
 
 %%--------------------------------------------------------------------
 %% Internal funcs
 %%--------------------------------------------------------------------
 
+handle_list() ->
+    Links = get_raw(),
+    NodeRPCResults = emqx_cluster_link_mqtt:get_all_resources_cluster(),
+    {NameToStatus, Errors} = collect_all_status(NodeRPCResults),
+    NodeErrors = lists:map(
+        fun({Node, Error}) ->
+            #{node => Node, status => inconsistent, reason => Error}
+        end,
+        Errors
+    ),
+    EmptyStatus = #{status => inconsistent, node_status => NodeErrors},
+    Response =
+        lists:map(
+            fun(#{<<"name">> := Name} = Link) ->
+                Status = maps:get(Name, NameToStatus, EmptyStatus),
+                maps:merge(Link, Status)
+            end,
+            Links
+        ),
+    ?OK(Response).
+
+handle_create(Name, Params) ->
+    case emqx_cluster_link_config:create_link(Params) of
+        {ok, Link} ->
+            ?CREATED(add_status(Name, Link));
+        {error, Reason} ->
+            Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])),
+            ?BAD_REQUEST(Message)
+    end.
+
+handle_lookup(Name, Link) ->
+    ?OK(add_status(Name, Link)).
+
+handle_metrics(Name) ->
+    Results = emqx_cluster_link_metrics:get_metrics(Name),
+    {NodeMetrics0, NodeErrors} =
+        lists:foldl(
+            fun({Node, RouterMetrics0, ResourceMetrics0}, {OkAccIn, ErrAccIn}) ->
+                {RouterMetrics, RouterError} = get_metrics_or_errors(RouterMetrics0),
+                {ResourceMetrics, ResourceError} = get_metrics_or_errors(ResourceMetrics0),
+                ErrAcc = append_errors(RouterError, ResourceError, Node, ErrAccIn),
+                {[format_metrics(Node, RouterMetrics, ResourceMetrics) | OkAccIn], ErrAcc}
+            end,
+            {[], []},
+            Results
+        ),
+    case NodeErrors of
+        [] ->
+            ok;
+        [_ | _] ->
+            ?SLOG(warning, #{
+                msg => "cluster_link_api_metrics_bad_erpc_results",
+                errors => maps:from_list(NodeErrors)
+            })
+    end,
+    NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}, #{}) end, NodeErrors),
+    NodeMetrics = NodeMetrics1 ++ NodeMetrics0,
+    AggregatedMetrics = aggregate_metrics(NodeMetrics),
+    Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics},
+    ?OK(Response).
+
+get_metrics_or_errors({ok, Metrics}) ->
+    {Metrics, undefined};
+get_metrics_or_errors(Error) ->
+    {#{}, Error}.
+
+append_errors(undefined, undefined, _Node, Acc) ->
+    Acc;
+append_errors(RouterError, ResourceError, Node, Acc) ->
+    Err0 = emqx_utils_maps:put_if(#{}, router, RouterError, RouterError =/= undefined),
+    Err = emqx_utils_maps:put_if(Err0, resource, ResourceError, ResourceError =/= undefined),
+    [{Node, Err} | Acc].
+
+aggregate_metrics(NodeMetrics) ->
+    ErrorLogger = fun(_) -> ok end,
+    #{metrics := #{router := EmptyRouterMetrics}} = format_metrics(node(), #{}, #{}),
+    {RouterMetrics, ResourceMetrics} = lists:foldl(
+        fun(
+            #{metrics := #{router := RMetrics, forwarding := FMetrics}},
+            {RouterAccIn, ResourceAccIn}
+        ) ->
+            ResourceAcc =
+                emqx_utils_maps:best_effort_recursive_sum(FMetrics, ResourceAccIn, ErrorLogger),
+            RouterAcc = merge_cluster_wide_metrics(RMetrics, RouterAccIn),
+            {RouterAcc, ResourceAcc}
+        end,
+        {EmptyRouterMetrics, #{}},
+        NodeMetrics
+    ),
+    #{router => RouterMetrics, forwarding => ResourceMetrics}.
+
+merge_cluster_wide_metrics(Metrics, Acc) ->
+    %% For cluster-wide metrics, all nodes should report the same values, except if the
+    %% RPC to fetch a node's metrics failed, in which case all values will be 0.
+    F =
+        fun(_Key, V1, V2) ->
+            case {erlang:is_map(V1), erlang:is_map(V2)} of
+                {true, true} ->
+                    merge_cluster_wide_metrics(V1, V2);
+                {true, false} ->
+                    merge_cluster_wide_metrics(V1, #{});
+                {false, true} ->
+                    merge_cluster_wide_metrics(V2, #{});
+                {false, false} ->
+                    true = is_number(V1),
+                    true = is_number(V2),
+                    max(V1, V2)
+            end
+        end,
+    maps:merge_with(F, Acc, Metrics).
+
+format_metrics(Node, RouterMetrics, ResourceMetrics) ->
+    Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end,
+    Routes = Get([gauges, ?route_metric], RouterMetrics),
+    #{
+        node => Node,
+        metrics => #{
+            router => #{
+                ?route_metric => Routes
+            },
+            forwarding => #{
+                'matched' => Get([counters, 'matched'], ResourceMetrics),
+                'success' => Get([counters, 'success'], ResourceMetrics),
+                'failed' => Get([counters, 'failed'], ResourceMetrics),
+                'dropped' => Get([counters, 'dropped'], ResourceMetrics),
+                'retried' => Get([counters, 'retried'], ResourceMetrics),
+                'received' => Get([counters, 'received'], ResourceMetrics),
+
+                'queuing' => Get([gauges, 'queuing'], ResourceMetrics),
+                'inflight' => Get([gauges, 'inflight'], ResourceMetrics),
+
+                'rate' => Get([rate, 'matched', current], ResourceMetrics),
+                'rate_last5m' => Get([rate, 'matched', last5m], ResourceMetrics),
+                'rate_max' => Get([rate, 'matched', max], ResourceMetrics)
+            }
+        }
+    }.
+
+add_status(Name, Link) ->
+    NodeRPCResults = emqx_cluster_link_mqtt:get_resource_cluster(Name),
+    Status = collect_single_status(NodeRPCResults),
+    maps:merge(Link, Status).
+
+handle_update(Name, Params0) ->
+    Params = Params0#{<<"name">> => Name},
+    case emqx_cluster_link_config:update_link(Params) of
+        {ok, Link} ->
+            ?OK(add_status(Name, Link));
+        {error, Reason} ->
+            Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])),
+            ?BAD_REQUEST(Message)
+    end.
+
 get_raw() ->
-    #{<<"links">> := Conf} =
+    #{<<"cluster">> := #{<<"links">> := Links}} =
         emqx_config:fill_defaults(
-            #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)},
+            #{<<"cluster">> => #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}},
             #{obfuscate_sensitive_values => true}
         ),
-    Conf.
+    Links.
+
+-spec collect_all_status([{node(), {ok, #{cluster_name() => _}} | _Error}]) ->
+    {ClusterToStatus, Errors}
+when
+    ClusterToStatus :: #{
+        cluster_name() => #{
+            node := node(),
+            status := emqx_resource:resource_status() | inconsistent
+        }
+    },
+    Errors :: [{node(), term()}].
+collect_all_status(NodeResults) ->
+    {Reindexed, Errors} = lists:foldl(
+        fun
+            ({Node, {ok, AllLinkData}}, {OkAccIn, ErrAccIn}) ->
+                OkAcc = maps:fold(
+                    fun(Name, Data, AccIn) ->
+                        collect_all_status1(Node, Name, Data, AccIn)
+                    end,
+                    OkAccIn,
+                    AllLinkData
+                ),
+                {OkAcc, ErrAccIn};
+            ({Node, Error}, {OkAccIn, ErrAccIn}) ->
+                {OkAccIn, [{Node, Error} | ErrAccIn]}
+        end,
+        {#{}, []},
+        NodeResults
+    ),
+    NoErrors =
+        case Errors of
+            [] ->
+                true;
+            [_ | _] ->
+                ?SLOG(warning, #{
+                    msg => "cluster_link_api_lookup_status_bad_erpc_results",
+                    errors => Errors
+                }),
+                false
+        end,
+    ClusterToStatus = maps:fold(
+        fun(Name, NodeToData, Acc) ->
+            OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)],
+            SummaryStatus =
+                case lists:usort(OnlyStatus) of
+                    [SameStatus] when NoErrors -> SameStatus;
+                    _ -> inconsistent
+                end,
+            NodeStatus = lists:map(
+                fun
+                    ({Node, #{status := S}}) ->
+                        #{node => Node, status => S};
+                    ({Node, Error0}) ->
+                        Error = emqx_logger_jsonfmt:best_effort_json(Error0),
+                        #{node => Node, status => inconsistent, reason => Error}
+                end,
+                maps:to_list(NodeToData) ++ Errors
+            ),
+            Acc#{
+                Name => #{
+                    status => SummaryStatus,
+                    node_status => NodeStatus
+                }
+            }
+        end,
+        #{},
+        Reindexed
+    ),
+    {ClusterToStatus, Errors}.
+
+collect_all_status1(Node, Name, Data, Acc) ->
+    maps:update_with(
+        Name,
+        fun(Old) -> Old#{Node => Data} end,
+        #{Node => Data},
+        Acc
+    ).
+
+collect_single_status(NodeResults) ->
+    NodeStatus =
+        lists:map(
+            fun
+                ({Node, {ok, {ok, #{status := S}}}}) ->
+                    #{node => Node, status => S};
+                ({Node, {ok, {error, _}}}) ->
+                    #{node => Node, status => ?status_disconnected};
+                ({Node, Error0}) ->
+                    Error = emqx_logger_jsonfmt:best_effort_json(Error0),
+                    #{node => Node, status => inconsistent, reason => Error}
+            end,
+            NodeResults
+        ),
+    OnlyStatus = [S || #{status := S} <- NodeStatus],
+    SummaryStatus =
+        case lists:usort(OnlyStatus) of
+            [SameStatus] -> SameStatus;
+            _ -> inconsistent
+        end,
+    #{
+        status => SummaryStatus,
+        node_status => NodeStatus
+    }.
+
+links_config_schema_response() ->
+    hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, link_config_response)), #{
+        examples => #{<<"example">> => links_config_response_example()}
+    }).
 
-links_config_schema() ->
-    emqx_cluster_link_schema:links_schema(
+link_config_schema() ->
+    hoconsc:mk(emqx_cluster_link_schema:link_schema(), #{
+        examples => #{<<"example">> => hd(links_config_example())}
+    }).
+
+link_config_schema_response() ->
+    hoconsc:mk(
+        hoconsc:ref(?MODULE, link_config_response),
         #{
-            examples => #{<<"example">> => links_config_example()}
+            examples => #{
+                <<"example">> => hd(links_config_response_example())
+            }
         }
     ).
 
+link_metrics_schema_response() ->
+    hoconsc:mk(
+        hoconsc:ref(?MODULE, link_metrics_response),
+        #{
+            examples => #{
+                <<"example">> => link_metrics_response_example()
+            }
+        }
+    ).
+
+status() ->
+    hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]).
+
+param_path_name() ->
+    {name,
+        hoconsc:mk(
+            binary(),
+            #{
+                in => path,
+                required => true,
+                example => <<"my_link">>,
+                desc => ?DESC("param_path_name")
+            }
+        )}.
+
+update_link_config_schema() ->
+    proplists:delete(name, emqx_cluster_link_schema:fields("link")).
+
+links_config_response_example() ->
+    lists:map(
+        fun(LinkEx) ->
+            LinkEx#{
+                <<"status">> => <<"connected">>,
+                <<"node_status">> => [
+                    #{
+                        <<"node">> => <<"emqx1@emqx.net">>,
+                        <<"status">> => <<"connected">>
+                    }
+                ]
+            }
+        end,
+        links_config_example()
+    ).
+
 links_config_example() ->
     [
         #{
@@ -114,3 +553,39 @@ links_config_example() ->
             <<"name">> => <<"emqxcl_c">>
         }
     ].
+
+link_metrics_response_example() ->
+    #{
+        <<"metrics">> => #{<<"routes">> => 10240},
+        <<"node_metrics">> => [
+            #{
+                <<"node">> => <<"emqx1@emqx.net">>,
+                <<"metrics">> => #{<<"routes">> => 10240}
+            }
+        ]
+    }.
+
+with_link(Name, FoundFn, NotFoundFn) ->
+    case emqx_cluster_link_config:link_raw(Name) of
+        undefined ->
+            NotFoundFn();
+        Link0 = #{} when is_function(FoundFn, 1) ->
+            Link = fill_defaults_single(Link0),
+            FoundFn(Link);
+        _Link = #{} when is_function(FoundFn, 0) ->
+            FoundFn()
+    end.
+
+fill_defaults_single(Link0) ->
+    #{<<"cluster">> := #{<<"links">> := [Link]}} =
+        emqx_config:fill_defaults(
+            #{<<"cluster">> => #{<<"links">> => [Link0]}},
+            #{obfuscate_sensitive_values => true}
+        ),
+    Link.
+
+return(Response) ->
+    fun() -> Response end.
+
+not_found() ->
+    return(?NOT_FOUND(<<"Cluster link not found">>)).

+ 11 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_app.erl

@@ -22,7 +22,9 @@ start(_StartType, _StartArgs) ->
         _ ->
             ok
     end,
-    emqx_cluster_link_sup:start_link(LinksConf).
+    {ok, Sup} = emqx_cluster_link_sup:start_link(LinksConf),
+    ok = create_metrics(LinksConf),
+    {ok, Sup}.
 
 prep_stop(State) ->
     emqx_cluster_link_config:remove_handler(),
@@ -53,3 +55,11 @@ remove_msg_fwd_resources(LinksConf) ->
         end,
         LinksConf
     ).
+
+create_metrics(LinksConf) ->
+    lists:foreach(
+        fun(#{name := ClusterName}) ->
+            ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName)
+        end,
+        LinksConf
+    ).

+ 90 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl

@@ -0,0 +1,90 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_bookkeeper).
+
+%% API
+-export([
+    start_link/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-ifdef(TEST).
+%% ms
+-define(TALLY_ROUTES_INTERVAL, 300).
+-else.
+%% ms
+-define(TALLY_ROUTES_INTERVAL, 15_000).
+-endif.
+
+%% call/cast/info events
+-record(tally_routes, {}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    State = #{},
+    {ok, State, {continue, #tally_routes{}}}.
+
+handle_continue(#tally_routes{}, State) ->
+    handle_tally_routes(),
+    {noreply, State}.
+
+handle_call(_Call, _From, State) ->
+    {reply, {error, bad_call}, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(#tally_routes{}, State) ->
+    handle_tally_routes(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+cluster_names() ->
+    Links = emqx_cluster_link_config:links(),
+    lists:map(fun(#{name := Name}) -> Name end, Links).
+
+ensure_timer(Event, Timeout) ->
+    _ = erlang:send_after(Timeout, self(), Event),
+    ok.
+
+handle_tally_routes() ->
+    ClusterNames = cluster_names(),
+    tally_routes(ClusterNames),
+    ensure_timer(#tally_routes{}, ?TALLY_ROUTES_INTERVAL),
+    ok.
+
+tally_routes([ClusterName | ClusterNames]) ->
+    NumRoutes = emqx_cluster_link_extrouter:count(ClusterName),
+    emqx_cluster_link_metrics:routes_set(ClusterName, NumRoutes),
+    tally_routes(ClusterNames);
+tally_routes([]) ->
+    ok.

+ 108 - 6
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -4,6 +4,8 @@
 
 -module(emqx_cluster_link_config).
 
+-feature(maybe_expr, enable).
+
 -behaviour(emqx_config_handler).
 
 -include_lib("emqx/include/logger.hrl").
@@ -28,11 +30,15 @@
 
 -export([
     %% General
+    create_link/1,
+    delete_link/1,
+    update_link/1,
     update/1,
     cluster/0,
     enabled_links/0,
     links/0,
     link/1,
+    link_raw/1,
     topic_filters/1,
     %% Connections
     emqtt_options/1,
@@ -55,6 +61,52 @@
 
 %%
 
+create_link(LinkConfig) ->
+    #{<<"name">> := Name} = LinkConfig,
+    case
+        emqx_conf:update(
+            ?LINKS_PATH,
+            {create, LinkConfig},
+            #{rawconf_with_defaults => true, override_to => cluster}
+        )
+    of
+        {ok, #{raw_config := NewConfigRows}} ->
+            NewLinkConfig = find_link(Name, NewConfigRows),
+            {ok, NewLinkConfig};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+delete_link(Name) ->
+    case
+        emqx_conf:update(
+            ?LINKS_PATH,
+            {delete, Name},
+            #{rawconf_with_defaults => true, override_to => cluster}
+        )
+    of
+        {ok, _} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+update_link(LinkConfig) ->
+    #{<<"name">> := Name} = LinkConfig,
+    case
+        emqx_conf:update(
+            ?LINKS_PATH,
+            {update, LinkConfig},
+            #{rawconf_with_defaults => true, override_to => cluster}
+        )
+    of
+        {ok, #{raw_config := NewConfigRows}} ->
+            NewLinkConfig = find_link(Name, NewConfigRows),
+            {ok, NewLinkConfig};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
 update(Config) ->
     case
         emqx_conf:update(
@@ -75,11 +127,20 @@ cluster() ->
 links() ->
     emqx:get_config(?LINKS_PATH, []).
 
+links_raw() ->
+    emqx:get_raw_config(?LINKS_PATH, []).
+
 enabled_links() ->
     [L || L = #{enable := true} <- links()].
 
 link(Name) ->
-    case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of
+    find_link(Name, links()).
+
+link_raw(Name) ->
+    find_link(Name, links_raw()).
+
+find_link(Name, Links) ->
+    case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of
         [LinkConf | _] -> LinkConf;
         [] -> undefined
     end.
@@ -133,6 +194,37 @@ remove_handler() ->
 
 pre_config_update(?LINKS_PATH, RawConf, RawConf) ->
     {ok, RawConf};
+pre_config_update(?LINKS_PATH, {create, LinkRawConf}, OldRawConf) ->
+    #{<<"name">> := Name} = LinkRawConf,
+    maybe
+        undefined ?= find_link(Name, OldRawConf),
+        NewRawConf0 = OldRawConf ++ [LinkRawConf],
+        NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)),
+        {ok, NewRawConf}
+    else
+        _ ->
+            {error, already_exists}
+    end;
+pre_config_update(?LINKS_PATH, {update, LinkRawConf}, OldRawConf) ->
+    #{<<"name">> := Name} = LinkRawConf,
+    maybe
+        {_Found, Front, Rear} ?= safe_take(Name, OldRawConf),
+        NewRawConf0 = Front ++ [LinkRawConf] ++ Rear,
+        NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)),
+        {ok, NewRawConf}
+    else
+        not_found ->
+            {error, not_found}
+    end;
+pre_config_update(?LINKS_PATH, {delete, Name}, OldRawConf) ->
+    maybe
+        {_Found, Front, Rear} ?= safe_take(Name, OldRawConf),
+        NewRawConf = Front ++ Rear,
+        {ok, NewRawConf}
+    else
+        _ ->
+            {error, not_found}
+    end;
 pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) ->
     {ok, convert_certs(maybe_increment_ps_actor_incr(NewRawConf, OldRawConf))}.
 
@@ -185,9 +277,10 @@ all_ok(Results) ->
 add_links(LinksConf) ->
     [add_link(Link) || Link <- LinksConf].
 
-add_link(#{enable := true} = LinkConf) ->
+add_link(#{name := ClusterName, enable := true} = LinkConf) ->
     {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf),
     {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
+    ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName),
     ok;
 add_link(_DisabledLinkConf) ->
     ok.
@@ -197,12 +290,13 @@ remove_links(LinksConf) ->
 
 remove_link(Name) ->
     _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
-    ensure_actor_stopped(Name).
+    _ = ensure_actor_stopped(Name),
+    emqx_cluster_link_metrics:drop_metrics(Name).
 
 update_links(LinksConf) ->
-    [update_link(Link) || Link <- LinksConf].
+    [do_update_link(Link) || Link <- LinksConf].
 
-update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) ->
+do_update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) ->
     case what_is_changed(OldLinkConf, NewLinkConf) of
         both ->
             _ = ensure_actor_stopped(Name),
@@ -215,7 +309,7 @@ update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) ->
         msg_resource ->
             ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf)
     end;
-update_link({_OldLinkConf, #{enable := false, name := Name} = _NewLinkConf}) ->
+do_update_link({_OldLinkConf, #{enable := false, name := Name} = _NewLinkConf}) ->
     _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
     ensure_actor_stopped(Name).
 
@@ -320,3 +414,11 @@ do_convert_certs(LinkName, SSLOpts) ->
             ),
             throw({bad_ssl_config, Reason})
     end.
+
+safe_take(Name, Transformations) ->
+    case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Transformations) of
+        {_Front, []} ->
+            not_found;
+        {Front, [Found | Rear]} ->
+            {Found, Front, Rear}
+    end.

+ 17 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -34,6 +34,9 @@
     apply_actor_operation/5
 ]).
 
+%% Internal export for bookkeeping
+-export([count/1]).
+
 %% Strictly monotonically increasing integer.
 -type smint() :: integer().
 
@@ -147,6 +150,16 @@ make_extroute_rec_pat(Entry) ->
         [{1, extroute}, {#extroute.entry, Entry}]
     ).
 
+%% Internal exports for bookkeeping
+count(ClusterName) ->
+    TopicPat = '_',
+    RouteIDPat = '_',
+    Pat = make_extroute_rec_pat(
+        emqx_trie_search:make_pat(TopicPat, ?ROUTE_ID(ClusterName, RouteIDPat))
+    ),
+    MS = [{Pat, [], [true]}],
+    ets:select_count(?EXTROUTE_TAB, MS).
+
 %%
 
 -record(state, {
@@ -280,7 +293,9 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
     Marker = 1 bsl Lane,
     case MCounter band Marker of
         0 when OpName =:= add ->
-            mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker);
+            Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker),
+            ?tp("cluster_link_extrouter_route_added", #{}),
+            Res;
         Marker when OpName =:= add ->
             %% Already added.
             MCounter;
@@ -289,6 +304,7 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
                 0 ->
                     Record = #extroute{entry = Entry, mcounter = 0},
                     ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record),
+                    ?tp("cluster_link_extrouter_route_deleted", #{}),
                     0;
                 C ->
                     C

+ 60 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl

@@ -0,0 +1,60 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_metrics).
+
+-include("emqx_cluster_link.hrl").
+
+%% API
+-export([
+    maybe_create_metrics/1,
+    drop_metrics/1,
+
+    get_metrics/1,
+    routes_set/2
+]).
+
+%%--------------------------------------------------------------------
+%% Type definitions
+%%--------------------------------------------------------------------
+
+-define(METRICS, [
+    ?route_metric
+]).
+-define(RATE_METRICS, []).
+
+%%--------------------------------------------------------------------
+%% metrics API
+%%--------------------------------------------------------------------
+
+get_metrics(ClusterName) ->
+    Nodes = emqx:running_nodes(),
+    Timeout = 15_000,
+    RouterResults = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
+    ResourceId = emqx_cluster_link_mqtt:resource_id(ClusterName),
+    ResourceResults = emqx_metrics_proto_v2:get_metrics(
+        Nodes, resource_metrics, ResourceId, Timeout
+    ),
+    lists:zip3(Nodes, RouterResults, ResourceResults).
+
+maybe_create_metrics(ClusterName) ->
+    case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of
+        true ->
+            ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName);
+        false ->
+            ok = emqx_metrics_worker:create_metrics(
+                ?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS
+            )
+    end.
+
+drop_metrics(ClusterName) ->
+    ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName).
+
+routes_set(ClusterName, Val) ->
+    catch emqx_metrics_worker:set_gauge(
+        ?METRIC_NAME, ClusterName, <<"singleton">>, ?route_metric, Val
+    ).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------

+ 64 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -9,6 +9,7 @@
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -behaviour(emqx_resource).
 -behaviour(ecpool_worker).
@@ -28,6 +29,7 @@
 ]).
 
 -export([
+    resource_id/1,
     ensure_msg_fwd_resource/1,
     remove_msg_fwd_resource/1,
     decode_route_op/1,
@@ -47,6 +49,16 @@
     forward/2
 ]).
 
+-export([
+    get_all_resources_cluster/0,
+    get_resource_cluster/1
+]).
+%% BpAPI / RPC Targets
+-export([
+    get_resource_local_v1/1,
+    get_all_resources_local_v1/0
+]).
+
 -define(MSG_CLIENTID_SUFFIX, ":msg:").
 
 -define(MQTT_HOST_OPTS, #{default_port => 1883}).
@@ -81,6 +93,12 @@
 
 -define(PUB_TIMEOUT, 10_000).
 
+-type cluster_name() :: binary().
+
+-spec resource_id(cluster_name()) -> resource_id().
+resource_id(ClusterName) ->
+    ?MSG_RES_ID(ClusterName).
+
 -spec ensure_msg_fwd_resource(map()) ->
     {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
 ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->
@@ -90,10 +108,55 @@ ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf)
     },
     emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResOpts1).
 
--spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}.
+-spec remove_msg_fwd_resource(cluster_name()) -> ok | {error, Reason :: term()}.
 remove_msg_fwd_resource(ClusterName) ->
     emqx_resource:remove_local(?MSG_RES_ID(ClusterName)).
 
+-spec get_all_resources_cluster() ->
+    [{node(), emqx_rpc:erpc(#{cluster_name() => emqx_resource:resource_data()})}].
+get_all_resources_cluster() ->
+    Nodes = emqx:running_nodes(),
+    Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes),
+    lists:zip(Nodes, Results).
+
+-spec get_resource_cluster(cluster_name()) ->
+    [{node(), {ok, {ok, emqx_resource:resource_data()} | {error, not_found}} | _Error}].
+get_resource_cluster(ClusterName) ->
+    Nodes = emqx:running_nodes(),
+    Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName),
+    lists:zip(Nodes, Results).
+
+%% RPC Target in `emqx_cluster_link_proto_v1'.
+-spec get_resource_local_v1(cluster_name()) ->
+    {ok, emqx_resource:resource_data()} | {error, not_found}.
+get_resource_local_v1(ClusterName) ->
+    case emqx_resource:get_instance(?MSG_RES_ID(ClusterName)) of
+        {ok, _ResourceGroup, ResourceData} ->
+            {ok, ResourceData};
+        {error, not_found} ->
+            {error, not_found}
+    end.
+
+%% RPC Target in `emqx_cluster_link_proto_v1'.
+-spec get_all_resources_local_v1() -> #{cluster_name() => emqx_resource:resource_data()}.
+get_all_resources_local_v1() ->
+    lists:foldl(
+        fun
+            (?MSG_RES_ID(Name) = Id, Acc) ->
+                case emqx_resource:get_instance(Id) of
+                    {ok, ?RES_GROUP, ResourceData} ->
+                        Acc#{Name => ResourceData};
+                    _ ->
+                        Acc
+                end;
+            (_Id, Acc) ->
+                %% Doesn't follow the naming pattern; manually crafted?
+                Acc
+        end,
+        #{},
+        emqx_resource:list_group_instances(?RES_GROUP)
+    ).
+
 %%--------------------------------------------------------------------
 %% emqx_resource callbacks (message forwarding)
 %%--------------------------------------------------------------------

+ 11 - 4
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -12,7 +12,7 @@
 -export([injected_fields/0]).
 
 %% Used in emqx_cluster_link_api
--export([links_schema/1]).
+-export([links_schema/1, link_schema/0]).
 
 -export([
     roots/0,
@@ -30,13 +30,20 @@ namespace() -> "cluster".
 roots() -> [].
 
 injected_fields() ->
-    #{cluster => [{links, links_schema(#{})}]}.
+    #{
+        cluster => [
+            {links, links_schema(#{})}
+        ]
+    }.
 
 links_schema(Meta) ->
     ?HOCON(?ARRAY(?R_REF("link")), Meta#{
         default => [], validator => fun links_validator/1, desc => ?DESC("links")
     }).
 
+link_schema() ->
+    hoconsc:ref(?MODULE, "link").
+
 fields("link") ->
     [
         {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},
@@ -44,8 +51,8 @@ fields("link") ->
         {server,
             emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)},
         {clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})},
-        {username, ?HOCON(binary(), #{desc => ?DESC(username)})},
-        {password, emqx_schema_secret:mk(#{desc => ?DESC(password)})},
+        {username, ?HOCON(binary(), #{required => false, desc => ?DESC(username)})},
+        {password, emqx_schema_secret:mk(#{required => false, desc => ?DESC(password)})},
         {ssl, #{
             type => ?R_REF(emqx_schema, "ssl_client_opts"),
             default => #{<<"enable">> => false},

+ 14 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl

@@ -6,6 +6,8 @@
 
 -behaviour(supervisor).
 
+-include("emqx_cluster_link.hrl").
+
 -export([start_link/1]).
 
 -export([
@@ -27,12 +29,14 @@ init(LinksConf) ->
         intensity => 10,
         period => 5
     },
+    Metrics = emqx_metrics_worker:child_spec(metrics, ?METRIC_NAME),
+    BookKeeper = bookkeeper_spec(),
     ExtrouterGC = extrouter_gc_spec(),
     RouteActors = [
         sup_spec(Name, ?ACTOR_MODULE, [LinkConf])
      || #{name := Name} = LinkConf <- LinksConf
     ],
-    {ok, {SupFlags, [ExtrouterGC | RouteActors]}}.
+    {ok, {SupFlags, [Metrics, BookKeeper, ExtrouterGC | RouteActors]}}.
 
 extrouter_gc_spec() ->
     %% NOTE: This one is currently global, not per-link.
@@ -53,6 +57,15 @@ sup_spec(Id, Mod, Args) ->
         modules => [Mod]
     }.
 
+bookkeeper_spec() ->
+    #{
+        id => bookkeeper,
+        start => {emqx_cluster_link_bookkeeper, start_link, []},
+        restart => permanent,
+        type => worker,
+        shutdown => 5_000
+    }.
+
 ensure_actor(#{name := Name} = LinkConf) ->
     case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of
         {ok, Pid} ->

+ 31 - 0
apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl

@@ -0,0 +1,31 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    get_resource/2,
+    get_all_resources/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.7.2".
+
+-spec get_resource([node()], binary()) ->
+    emqx_rpc:erpc_multicall({ok, emqx_resource:resource_data()} | {error, not_found}).
+get_resource(Nodes, ClusterName) ->
+    erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_resource_local_v1, [ClusterName], ?TIMEOUT).
+
+-spec get_all_resources([node()]) ->
+    emqx_rpc:erpc_multicall(#{binary() => emqx_resource:resource_data()}).
+get_all_resources(Nodes) ->
+    erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_all_resources_local_v1, [], ?TIMEOUT).

+ 634 - 44
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -11,6 +11,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])).
 -define(CONF_PATH, [cluster, links]).
 
@@ -37,8 +39,28 @@
     "-----END CERTIFICATE-----"
 >>).
 
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    OtherTCs = AllTCs -- cluster_test_cases(),
+    [
+        {group, cluster}
+        | OtherTCs
+    ].
+
+groups() ->
+    [{cluster, cluster_test_cases()}].
+
+cluster_test_cases() ->
+    [
+        t_status,
+        t_metrics
+    ].
 
 init_per_suite(Config) ->
     %% This is called by emqx_machine in EMQX release
@@ -47,7 +69,7 @@ init_per_suite(Config) ->
         [
             emqx_conf,
             emqx_management,
-            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
+            emqx_mgmt_api_test_util:emqx_dashboard(),
             emqx_cluster_link
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
@@ -60,73 +82,641 @@ end_per_suite(Config) ->
     emqx_config:delete_override_conf_files(),
     ok.
 
+init_per_group(cluster = Group, Config) ->
+    ok = emqx_cth_suite:stop_apps([emqx_dashboard]),
+    SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config),
+    TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(Group, Config),
+    SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec),
+    TargetNodes = [TN1 | _] = emqx_cth_cluster:start(TargetClusterSpec),
+    emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config),
+    erpc:call(SN1, emqx_cth_suite, start_apps, [
+        [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()],
+        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+    ]),
+    erpc:call(TN1, emqx_cth_suite, start_apps, [
+        [
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard(
+                "dashboard.listeners.http { enable = true, bind = 28083 }"
+            )
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+    ]),
+    [
+        {source_nodes, SourceNodes},
+        {target_nodes, TargetNodes}
+        | Config
+    ];
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(cluster, Config) ->
+    SourceNodes = ?config(source_nodes, Config),
+    TargetNodes = ?config(target_nodes, Config),
+    ok = emqx_cth_cluster:stop(SourceNodes),
+    ok = emqx_cth_cluster:stop(TargetNodes),
+    _ = emqx_cth_suite:start_apps(
+        [emqx_mgmt_api_test_util:emqx_dashboard()],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
 auth_header() ->
-    {ok, API} = emqx_common_test_http:create_default_app(),
-    emqx_common_test_http:auth_header(API).
+    emqx_mgmt_api_test_util:auth_header_().
 
 init_per_testcase(_TC, Config) ->
     {ok, _} = emqx_cluster_link_config:update([]),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_TC, _Config) ->
+    snabbkaffe:stop(),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
-t_put_get_valid(Config) ->
-    Auth = ?config(auth, Config),
-    Path = ?API_PATH,
-    {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
-    ?assertMatch([], emqx_utils_json:decode(Resp)),
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+api_root() ->
+    <<"cluster/links">>.
+
+list() ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
+
+get_link(Name) ->
+    get_link(source, Name).
+
+get_link(SourceOrTargetCluster, Name) ->
+    Host = host(SourceOrTargetCluster),
+    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
+
+delete_link(Name) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root(), "link", Name]),
+    emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "").
+
+update_link(Name, Params) ->
+    update_link(source, Name, Params).
+
+update_link(SourceOrTargetCluster, Name, Params) ->
+    Host = host(SourceOrTargetCluster),
+    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]),
+    emqx_mgmt_api_test_util:simple_request(put, Path, Params).
+
+create_link(Name, Params0) ->
+    Params = Params0#{<<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+    emqx_mgmt_api_test_util:simple_request(post, Path, Params).
+
+get_metrics(Name) ->
+    get_metrics(source, Name).
+
+get_metrics(SourceOrTargetCluster, Name) ->
+    Host = host(SourceOrTargetCluster),
+    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name, "metrics"]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = []).
+
+host(source) -> "http://127.0.0.1:18083";
+host(target) -> "http://127.0.0.1:28083".
 
-    Link1 = #{
+link_params() ->
+    link_params(_Overrides = #{}).
+
+link_params(Overrides) ->
+    Default = #{
+        <<"clientid">> => <<"linkclientid">>,
         <<"pool_size">> => 1,
         <<"server">> => <<"emqxcl_2.nohost:31883">>,
-        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
-        <<"name">> => <<"emqcl_1">>
-    },
-    Link2 = #{
-        <<"pool_size">> => 1,
-        <<"server">> => <<"emqxcl_2.nohost:41883">>,
-        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
-        <<"name">> => <<"emqcl_2">>
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>]
     },
-    ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link1, Link2])),
+    emqx_utils_maps:deep_merge(Default, Overrides).
 
-    {ok, Resp1} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
-    ?assertMatch([Link1, Link2], emqx_utils_json:decode(Resp1)),
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
 
-    DisabledLink1 = Link1#{<<"enable">> => false},
-    ?assertMatch(
-        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [DisabledLink1, Link2])
-    ),
+t_put_get_valid(_Config) ->
+    ?assertMatch({200, []}, list()),
+
+    Name1 = <<"emqcl_1">>,
+    Link1 = link_params(#{
+        <<"server">> => <<"emqxcl_2.nohost:31883">>,
+        <<"name">> => Name1
+    }),
+    Name2 = <<"emqcl_2">>,
+    Link2 = link_params(#{
+        <<"server">> => <<"emqxcl_2.nohost:41883">>,
+        <<"name">> => Name2
+    }),
+    ?assertMatch({201, _}, create_link(Name1, Link1)),
+    ?assertMatch({201, _}, create_link(Name2, Link2)),
+    ?assertMatch({200, [_, _]}, list()),
 
-    {ok, Resp2} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
-    ?assertMatch([DisabledLink1, Link2], emqx_utils_json:decode(Resp2)),
+    DisabledLink1 = Link1#{<<"enable">> => false},
+    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1))),
+    ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1)),
+    ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2)),
 
     SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
     SSLLink1 = Link1#{<<"ssl">> => SSL},
+    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1))),
     ?assertMatch(
-        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link2, SSLLink1])
+        {200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}},
+        get_link(Name1)
     ),
-    {ok, Resp3} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
+    ok.
 
+t_put_invalid(_Config) ->
+    Name = <<"l1">>,
+    {201, _} = create_link(Name, link_params()),
     ?assertMatch(
-        [Link2, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}],
-        emqx_utils_json:decode(Resp3)
+        {400, _},
+        update_link(Name, maps:remove(<<"server">>, link_params()))
     ).
 
-t_put_invalid(Config) ->
-    Auth = ?config(auth, Config),
-    Path = ?API_PATH,
-    Link = #{
-        <<"pool_size">> => 1,
-        <<"server">> => <<"emqxcl_2.nohost:31883">>,
-        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
-        <<"name">> => <<"emqcl_1">>
-    },
+%% Tests a sequence of CRUD operations and their expected responses, for common use cases
+%% and configuration states.
+t_crud(_Config) ->
+    %% No links initially.
+    ?assertMatch({200, []}, list()),
+    NameA = <<"a">>,
+    ?assertMatch({404, _}, get_link(NameA)),
+    ?assertMatch({404, _}, delete_link(NameA)),
+    ?assertMatch({404, _}, update_link(NameA, link_params())),
+    ?assertMatch({404, _}, get_metrics(NameA)),
+
+    Params1 = link_params(),
     ?assertMatch(
-        {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link, Link])
+        {201, #{
+            <<"name">> := NameA,
+            <<"status">> := _,
+            <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
+        }},
+        create_link(NameA, Params1)
     ),
+    ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)),
     ?assertMatch(
-        {error, {_, 400, _}},
-        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [maps:remove(<<"name">>, Link)])
-    ).
+        {200, [
+            #{
+                <<"name">> := NameA,
+                <<"status">> := _,
+                <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
+            }
+        ]},
+        list()
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"name">> := NameA,
+            <<"status">> := _,
+            <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
+        }},
+        get_link(NameA)
+    ),
+    ?assertMatch({200, _}, get_metrics(NameA)),
+
+    Params2 = Params1#{<<"pool_size">> := 2},
+    ?assertMatch(
+        {200, #{
+            <<"name">> := NameA,
+            <<"status">> := _,
+            <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
+        }},
+        update_link(NameA, Params2)
+    ),
+
+    ?assertMatch({204, _}, delete_link(NameA)),
+    ?assertMatch({404, _}, delete_link(NameA)),
+    ?assertMatch({404, _}, get_link(NameA)),
+    ?assertMatch({404, _}, update_link(NameA, Params1)),
+    ?assertMatch({404, _}, get_metrics(NameA)),
+    ?assertMatch({200, []}, list()),
+
+    ok.
+
+%% Verifies the behavior of reported status under different conditions when listing all
+%% links and when fetching a specific link.
+t_status(Config) ->
+    [SN1 | _] = ?config(source_nodes, Config),
+    Name = <<"cl.target">>,
+    ?retry(
+        100,
+        10,
+        ?assertMatch(
+            {200, [
+                #{
+                    <<"status">> := <<"connected">>,
+                    <<"node_status">> := [
+                        #{
+                            <<"node">> := _,
+                            <<"status">> := <<"connected">>
+                        },
+                        #{
+                            <<"node">> := _,
+                            <<"status">> := <<"connected">>
+                        }
+                    ]
+                }
+            ]},
+            list()
+        )
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"status">> := <<"connected">>,
+            <<"node_status">> := [
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"connected">>
+                },
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"connected">>
+                }
+            ]
+        }},
+        get_link(Name)
+    ),
+
+    %% If one of the nodes reports a different status, the cluster is inconsistent.
+    ProtoMod = emqx_cluster_link_proto_v1,
+    ?ON(SN1, begin
+        ok = meck:new(ProtoMod, [no_link, passthrough, no_history]),
+        meck:expect(ProtoMod, get_all_resources, fun(Nodes) ->
+            [Res1, {ok, Res2A} | Rest] = meck:passthrough([Nodes]),
+            %% Res2A :: #{cluster_name() => emqx_resource:resource_data()}
+            Res2B = maps:map(fun(_, Data) -> Data#{status := disconnected} end, Res2A),
+            [Res1, {ok, Res2B} | Rest]
+        end),
+        meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) ->
+            [Res1, {ok, {ok, Res2A}} | Rest] = meck:passthrough([Nodes, LinkName]),
+            Res2B = Res2A#{status := disconnected},
+            [Res1, {ok, {ok, Res2B}} | Rest]
+        end)
+    end),
+    on_exit(fun() -> catch ?ON(SN1, meck:unload()) end),
+    ?assertMatch(
+        {200, [
+            #{
+                <<"status">> := <<"inconsistent">>,
+                <<"node_status">> := [
+                    #{
+                        <<"node">> := _,
+                        <<"status">> := <<"connected">>
+                    },
+                    #{
+                        <<"node">> := _,
+                        <<"status">> := <<"disconnected">>
+                    }
+                ]
+            }
+        ]},
+        list()
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"status">> := <<"inconsistent">>,
+            <<"node_status">> := [
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"connected">>
+                },
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"disconnected">>
+                }
+            ]
+        }},
+        get_link(Name)
+    ),
+
+    %% Simulating erpc failures
+    ?ON(SN1, begin
+        meck:expect(ProtoMod, get_all_resources, fun(Nodes) ->
+            [Res1, _ | Rest] = meck:passthrough([Nodes]),
+            [Res1, {error, {erpc, noconnection}} | Rest]
+        end),
+        meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) ->
+            [Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]),
+            [Res1, {error, {erpc, noconnection}} | Rest]
+        end)
+    end),
+    ?assertMatch(
+        {200, [
+            #{
+                <<"status">> := <<"inconsistent">>,
+                <<"node_status">> := [
+                    #{
+                        <<"node">> := _,
+                        <<"status">> := <<"connected">>
+                    },
+                    #{
+                        <<"node">> := _,
+                        <<"status">> := <<"inconsistent">>,
+                        <<"reason">> := _
+                    }
+                ]
+            }
+        ]},
+        list()
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"status">> := <<"inconsistent">>,
+            <<"node_status">> := [
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"connected">>
+                },
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"inconsistent">>,
+                    <<"reason">> := _
+                }
+            ]
+        }},
+        get_link(Name)
+    ),
+    %% Simulate another inconsistency
+    ?ON(SN1, begin
+        meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) ->
+            [Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]),
+            [Res1, {ok, {error, not_found}} | Rest]
+        end)
+    end),
+    ?assertMatch(
+        {200, #{
+            <<"status">> := <<"inconsistent">>,
+            <<"node_status">> := [
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"connected">>
+                },
+                #{
+                    <<"node">> := _,
+                    <<"status">> := <<"disconnected">>
+                }
+            ]
+        }},
+        get_link(Name)
+    ),
+
+    ok.
+
+t_metrics(Config) ->
+    ct:timetrap({seconds, 10}),
+    [SN1, SN2] = ?config(source_nodes, Config),
+    [TN1, TN2] = ?config(target_nodes, Config),
+    %% N.B. Link names on each cluster, so they are switched.
+    SourceName = <<"cl.target">>,
+    TargetName = <<"cl.source">>,
+
+    ?assertMatch(
+        {200, #{
+            <<"metrics">> := #{
+                <<"router">> := #{
+                    <<"routes">> := 0
+                },
+                <<"forwarding">> := #{
+                    <<"matched">> := _,
+                    <<"success">> := _,
+                    <<"failed">> := _,
+                    <<"dropped">> := _,
+                    <<"retried">> := _,
+                    <<"received">> := _,
+                    <<"queuing">> := _,
+                    <<"inflight">> := _,
+                    <<"rate">> := _,
+                    <<"rate_last5m">> := _,
+                    <<"rate_max">> := _
+                }
+            },
+            <<"node_metrics">> := [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{
+                        <<"router">> := #{
+                            <<"routes">> := 0
+                        },
+                        <<"forwarding">> := #{
+                            <<"matched">> := _,
+                            <<"success">> := _,
+                            <<"failed">> := _,
+                            <<"dropped">> := _,
+                            <<"retried">> := _,
+                            <<"received">> := _,
+                            <<"queuing">> := _,
+                            <<"inflight">> := _,
+                            <<"rate">> := _,
+                            <<"rate_last5m">> := _,
+                            <<"rate_max">> := _
+                        }
+                    }
+                },
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{
+                        <<"router">> := #{
+                            <<"routes">> := 0
+                        },
+                        <<"forwarding">> := #{
+                            <<"matched">> := _,
+                            <<"success">> := _,
+                            <<"failed">> := _,
+                            <<"dropped">> := _,
+                            <<"retried">> := _,
+                            <<"received">> := _,
+                            <<"queuing">> := _,
+                            <<"inflight">> := _,
+                            <<"rate">> := _,
+                            <<"rate_last5m">> := _,
+                            <<"rate_max">> := _
+                        }
+                    }
+                }
+            ]
+        }},
+        get_metrics(source, SourceName)
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
+            <<"node_metrics">> := [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}
+                },
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}
+                }
+            ]
+        }},
+        get_metrics(target, TargetName)
+    ),
+
+    SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
+    SourceC2 = emqx_cluster_link_SUITE:start_client(<<"sc2">>, SN2),
+    {ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>),
+    {ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>),
+
+    %% Still no routes, as routes in the source cluster are replicated to the target
+    %% cluster.
+    ?assertMatch(
+        {200, #{
+            <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
+            <<"node_metrics">> := [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}
+                },
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}
+                }
+            ]
+        }},
+        get_metrics(source, SourceName)
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
+            <<"node_metrics">> := [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}
+                },
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}
+                }
+            ]
+        }},
+        get_metrics(target, TargetName)
+    ),
+
+    TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
+    TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
+    {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>),
+    {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            begin
+                {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>),
+                {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>)
+            end,
+            #{?snk_kind := clink_route_sync_complete}
+        ),
+
+    %% Routes = 2 in source cluster, because the target cluster has some topic filters
+    %% configured and subscribers to them, which were replicated to the source cluster.
+    %% This metric is global (cluster-wide).
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}},
+                <<"node_metrics">> := [
+                    #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}},
+                    #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}
+                ]
+            }},
+            get_metrics(source, SourceName)
+        )
+    ),
+    ?assertMatch(
+        {200, #{
+            <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
+            <<"node_metrics">> := _
+        }},
+        get_metrics(target, TargetName)
+    ),
+
+    %% Unsubscribe and remove route.
+    ct:pal("unsubscribing"),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            begin
+                {ok, _, _} = emqtt:unsubscribe(TargetC1, <<"t/tc1">>)
+            end,
+            #{?snk_kind := clink_route_sync_complete}
+        ),
+
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
+                <<"node_metrics">> := [
+                    #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}},
+                    #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}
+                ]
+            }},
+            get_metrics(source, SourceName)
+        )
+    ),
+
+    %% Disabling the link should remove the routes.
+    ct:pal("disabling"),
+    {200, TargetLink0} = get_link(target, TargetName),
+    TargetLink1 = maps:without([<<"status">>, <<"node_status">>], TargetLink0),
+    TargetLink2 = TargetLink1#{<<"enable">> := false},
+    {_, {ok, _}} =
+        ?wait_async_action(
+            begin
+                {200, _} = update_link(target, TargetName, TargetLink2),
+                %% Note that only when the GC runs and collects the stopped actor it'll actually
+                %% remove the routes
+                NowMS = erlang:system_time(millisecond),
+                TTL = emqx_cluster_link_config:actor_ttl(),
+                ct:pal("gc"),
+                %% 2 Actors: one for normal routes, one for PS routes
+                1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})),
+                1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})),
+                ct:pal("gc done"),
+                ok
+            end,
+            #{?snk_kind := "cluster_link_extrouter_route_deleted"}
+        ),
+
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
+                <<"node_metrics">> := _
+            }},
+            get_metrics(source, SourceName)
+        )
+    ),
+
+    %% Enabling again
+    TargetLink3 = TargetLink2#{<<"enable">> := true},
+    {_, {ok, _}} =
+        ?wait_async_action(
+            begin
+                {200, _} = update_link(target, TargetName, TargetLink3)
+            end,
+            #{?snk_kind := "cluster_link_extrouter_route_added"}
+        ),
+
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
+                <<"node_metrics">> := _
+            }},
+            get_metrics(source, SourceName)
+        )
+    ),
+
+    ok.

+ 27 - 0
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -293,3 +293,30 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
         FileNames
     ),
     erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]).
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.
+
+simple_request(Method, Path, Params) ->
+    AuthHeader = auth_header_(),
+    Opts = #{return_all => true},
+    case request_api(Method, Path, "", AuthHeader, Params, Opts) of
+        {ok, {{_, Status, _}, _Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {Status, Body};
+        {error, {{_, Status, _}, _Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {Status, Body}
+    end.

+ 2 - 0
apps/emqx_utils/include/emqx_utils_api.hrl

@@ -21,6 +21,8 @@
 
 -define(OK(CONTENT), {200, CONTENT}).
 
+-define(CREATED(CONTENT), {201, CONTENT}).
+
 -define(NO_CONTENT, 204).
 
 -define(BAD_REQUEST(CODE, REASON), {400, ?ERROR_MSG(CODE, REASON)}).