Przeglądaj źródła

feat(cluster-link): introduce cached links shortconf

That is responsible for getting rid of redundancy and precomputing.
Andrew Mayorov 1 rok temu
rodzic
commit
cbee849e9c

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_api.erl

@@ -625,7 +625,7 @@ link_metrics_response_example() ->
     }.
 
 with_link(Name, FoundFn, NotFoundFn) ->
-    case emqx_cluster_link_config:link_raw(Name) of
+    case emqx_cluster_link_config:get_link_raw(Name) of
         undefined ->
             NotFoundFn();
         Link0 = #{} when is_function(FoundFn, 1) ->

+ 3 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_app.erl

@@ -10,10 +10,10 @@
 
 start(_StartType, _StartArgs) ->
     ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
-    emqx_cluster_link_config:add_handler(),
-    LinksConf = emqx_cluster_link_config:enabled_links(),
+    ok = emqx_cluster_link_config:load(),
     ok = emqx_cluster_link:register_external_broker(),
     ok = emqx_cluster_link:put_hook(),
+    LinksConf = emqx_cluster_link_config:get_enabled_links(),
     case LinksConf of
         [_ | _] ->
             ok = start_msg_fwd_resources(LinksConf);
@@ -25,7 +25,7 @@ start(_StartType, _StartArgs) ->
     {ok, Sup}.
 
 prep_stop(State) ->
-    emqx_cluster_link_config:remove_handler(),
+    ok = emqx_cluster_link_config:unload(),
     State.
 
 stop(_State) ->

+ 140 - 83
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -6,8 +6,6 @@
 
 -feature(maybe_expr, enable).
 
--behaviour(emqx_config_handler).
-
 -include_lib("emqx/include/logger.hrl").
 
 -define(LINKS_PATH, [cluster, links]).
@@ -29,19 +27,17 @@
 -define(ACTOR_FIELDS, [topics]).
 
 -export([
-    %% General
-    create_link/1,
-    delete_link/1,
-    update_link/1,
-    update/1,
-    cluster/0,
-    enabled_links/0,
+    %% Shortconf
     links/0,
     link/1,
-    link_raw/1,
-    topic_filters/1,
+    enabled_links/0,
+    %% Configuration
+    cluster/0,
+    get_links/0,
+    get_link/1,
+    get_enabled_links/0,
+    get_link_raw/1,
     %% Connections
-    emqtt_options/1,
     mk_emqtt_options/1,
     %% Actor Lifecycle
     actor_ttl/0,
@@ -49,16 +45,114 @@
     actor_heartbeat_interval/0
 ]).
 
+%% Managing configuration:
 -export([
-    add_handler/0,
-    remove_handler/0
+    update/1,
+    create_link/1,
+    delete_link/1,
+    update_link/1
 ]).
 
+%% Application lifecycle:
+-export([
+    load/0,
+    unload/0
+]).
+
+-behaviour(emqx_config_handler).
 -export([
     pre_config_update/3,
     post_config_update/5
 ]).
 
+%% Test exports
+-export([prepare_link/1]).
+
+-export_type([shortconf/0]).
+
+-type shortconf() :: #{
+    name := binary(),
+    enable := boolean(),
+    topics := _Union :: [emqx_types:words()]
+}.
+
+-define(PTERM(K), {?MODULE, K}).
+
+%%
+
+cluster() ->
+    atom_to_binary(emqx_config:get([cluster, name])).
+
+-spec links() -> [shortconf()].
+links() ->
+    get_shortconf(links).
+
+-spec enabled_links() -> [shortconf()].
+enabled_links() ->
+    get_shortconf(enabled).
+
+-spec link(_Name :: binary()) -> shortconf() | undefined.
+link(Name) ->
+    find_link(Name, links()).
+
+-spec get_links() -> [emqx_cluster_link_schema:link()].
+get_links() ->
+    emqx:get_config(?LINKS_PATH, []).
+
+-spec get_enabled_links() -> [emqx_cluster_link_schema:link()].
+get_enabled_links() ->
+    [L || L = #{enable := true} <- get_links()].
+
+-spec get_link(_Name :: binary()) -> emqx_cluster_link_schema:link() | undefined.
+get_link(Name) ->
+    find_link(Name, get_links()).
+
+-spec get_link_raw(_Name :: binary()) -> emqx_config:raw_config().
+get_link_raw(Name) ->
+    find_link(Name, get_links_raw()).
+
+get_links_raw() ->
+    emqx:get_raw_config(?LINKS_PATH, []).
+
+find_link(Name, Links) ->
+    case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of
+        [LinkConf | _] -> LinkConf;
+        [] -> undefined
+    end.
+
+-spec actor_ttl() -> _Milliseconds :: pos_integer().
+actor_ttl() ->
+    ?DEFAULT_ACTOR_TTL.
+
+-spec actor_gc_interval() -> _Milliseconds :: pos_integer().
+actor_gc_interval() ->
+    actor_ttl().
+
+-spec actor_heartbeat_interval() -> _Milliseconds :: pos_integer().
+actor_heartbeat_interval() ->
+    actor_ttl() div 3.
+
+%%
+
+mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
+    ClientId = maps:get(clientid, LinkConf, cluster()),
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
+    Opts = maps:with([username, retry_interval, max_inflight], LinkConf),
+    Opts1 = Opts#{
+        host => Host,
+        port => Port,
+        clientid => ClientId,
+        proto_ver => v5,
+        ssl => EnableSsl,
+        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
+    },
+    with_password(Opts1, LinkConf).
+
+with_password(Opts, #{password := P} = _LinkConf) ->
+    Opts#{password => emqx_secret:unwrap(P)};
+with_password(Opts, _LinkConf) ->
+    Opts.
+
 %%
 
 create_link(LinkConfig) ->
@@ -121,76 +215,13 @@ update(Config) ->
             {error, Reason}
     end.
 
-cluster() ->
-    atom_to_binary(emqx_config:get([cluster, name])).
-
-links() ->
-    emqx:get_config(?LINKS_PATH, []).
-
-links_raw() ->
-    emqx:get_raw_config(?LINKS_PATH, []).
-
-enabled_links() ->
-    [L || L = #{enable := true} <- links()].
-
-link(Name) ->
-    find_link(Name, links()).
-
-link_raw(Name) ->
-    find_link(Name, links_raw()).
-
-find_link(Name, Links) ->
-    case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of
-        [LinkConf | _] -> LinkConf;
-        [] -> undefined
-    end.
-
-emqtt_options(LinkName) ->
-    emqx_maybe:apply(fun mk_emqtt_options/1, ?MODULE:link(LinkName)).
-
-topic_filters(LinkName) ->
-    maps:get(topics, ?MODULE:link(LinkName), []).
-
--spec actor_ttl() -> _Milliseconds :: pos_integer().
-actor_ttl() ->
-    ?DEFAULT_ACTOR_TTL.
-
--spec actor_gc_interval() -> _Milliseconds :: pos_integer().
-actor_gc_interval() ->
-    actor_ttl().
-
--spec actor_heartbeat_interval() -> _Milliseconds :: pos_integer().
-actor_heartbeat_interval() ->
-    actor_ttl() div 3.
-
-%%
-
-mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
-    ClientId = maps:get(clientid, LinkConf, cluster()),
-    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
-    Opts = maps:with([username, retry_interval, max_inflight], LinkConf),
-    Opts1 = Opts#{
-        host => Host,
-        port => Port,
-        clientid => ClientId,
-        proto_ver => v5,
-        ssl => EnableSsl,
-        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
-    },
-    with_password(Opts1, LinkConf).
-
-with_password(Opts, #{password := P} = _LinkConf) ->
-    Opts#{password => emqx_secret:unwrap(P)};
-with_password(Opts, _LinkConf) ->
-    Opts.
-
-%%
-
-add_handler() ->
+load() ->
+    ok = prepare_shortconf(get_links()),
     ok = emqx_config_handler:add_handler(?LINKS_PATH, ?MODULE).
 
-remove_handler() ->
-    ok = emqx_config_handler:remove_handler(?LINKS_PATH).
+unload() ->
+    ok = emqx_config_handler:remove_handler(?LINKS_PATH),
+    ok = cleanup_shortconf().
 
 pre_config_update(?LINKS_PATH, RawConf, RawConf) ->
     {ok, RawConf};
@@ -242,11 +273,37 @@ post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
     IsAllOk = all_ok(RemovedRes) andalso all_ok(AddedRes) andalso all_ok(UpdatedRes),
     case IsAllOk of
         true ->
-            ok;
+            prepare_shortconf(New);
         false ->
             {error, #{added => AddedRes, removed => RemovedRes, updated => UpdatedRes}}
     end.
 
+%%
+
+prepare_shortconf(Config) ->
+    Links = [prepare_link(L) || L <- Config],
+    ok = persistent_term:put(?PTERM(links), Links),
+    ok = persistent_term:put(?PTERM(enabled), [L || L = #{enable := true} <- Links]).
+
+cleanup_shortconf() ->
+    _ = persistent_term:erase(?PTERM(links)),
+    _ = persistent_term:erase(?PTERM(enabled)),
+    ok.
+
+prepare_link(#{name := Name, enable := Enabled, topics := Topics}) ->
+    #{
+        name => Name,
+        enable => Enabled,
+        topics => prepare_topics(Topics)
+    }.
+
+prepare_topics(Topics) ->
+    Union = emqx_topic:union(Topics),
+    lists:map(fun emqx_topic:words/1, Union).
+
+get_shortconf(K) ->
+    persistent_term:get(?PTERM(K)).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -101,7 +101,7 @@
 resource_id(ClusterName) ->
     ?MSG_RES_ID(ClusterName).
 
--spec ensure_msg_fwd_resource(map()) ->
+-spec ensure_msg_fwd_resource(emqx_cluster_link_schema:link()) ->
     {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
 ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->
     ResOpts1 = ResOpts#{