Просмотр исходного кода

Merge pull request #7614 from thalesmg/telemetry-cluster-uuid

feat(telemetry): add cluster uuid
Thales Macedo Garitezi 3 года назад
Родитель
Commit
f5dea51e24

+ 62 - 24
apps/emqx_modules/src/emqx_telemetry.erl

@@ -47,7 +47,8 @@
 ]).
 
 -export([
-    get_uuid/0,
+    get_node_uuid/0,
+    get_cluster_uuid/0,
     get_telemetry/0
 ]).
 
@@ -67,12 +68,13 @@
 ]).
 
 -record(telemetry, {
-    id :: non_neg_integer(),
+    id :: atom(),
     uuid :: binary()
 }).
 
 -record(state, {
-    uuid :: undefined | binary(),
+    node_uuid :: undefined | binary(),
+    cluster_uuid :: undefined | binary(),
     url :: string(),
     report_interval :: non_neg_integer(),
     timer = undefined :: undefined | reference(),
@@ -85,10 +87,12 @@
 %% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00.
 -define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).
 
--define(UNIQUE_ID, 9527).
+-define(CLUSTER_UUID_KEY, cluster_uuid).
 
 -define(TELEMETRY, emqx_telemetry).
 
+-define(TELEMETRY_SHARD, emqx_telemetry_shard).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
@@ -99,7 +103,7 @@ start_link() ->
         [
             {type, set},
             {storage, disc_copies},
-            {local_content, true},
+            {rlog_shard, ?TELEMETRY_SHARD},
             {record_name, telemetry},
             {attributes, record_info(fields, telemetry)}
         ]
@@ -117,8 +121,11 @@ enable() ->
 %% Sends a synchronous `disable' request to the gen_server registered as
 %% ?MODULE and returns its reply.  The reply may be
 %% `{error, not_official_version}'.
 disable() ->
     gen_server:call(?MODULE, disable).
 
-get_uuid() ->
-    gen_server:call(?MODULE, get_uuid).
+%% Asks the telemetry gen_server for the UUID of the local node.
+%% The server replies with `{ok, UUID}'.
+get_node_uuid() ->
+    gen_server:call(?MODULE, get_node_uuid).
+
+%% Asks the telemetry gen_server for the UUID shared by the cluster.
+%% The server replies with `{ok, UUID}'.
+get_cluster_uuid() ->
+    gen_server:call(?MODULE, get_cluster_uuid).
 
 %% Sends a synchronous `get_telemetry' request to the gen_server registered
 %% as ?MODULE and returns its reply (callers match it as
 %% `{ok, TelemetryData}').
 get_telemetry() ->
     gen_server:call(?MODULE, get_telemetry).
@@ -135,19 +142,8 @@ get_telemetry() ->
 -dialyzer([{nowarn_function, [init/1]}]).
 init(_Opts) ->
     State0 = empty_state(),
-    UUID1 =
-        case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
-            [] ->
-                UUID = generate_uuid(),
-                mria:dirty_write(?TELEMETRY, #telemetry{
-                    id = ?UNIQUE_ID,
-                    uuid = UUID
-                }),
-                UUID;
-            [#telemetry{uuid = UUID} | _] ->
-                UUID
-        end,
-    {ok, State0#state{uuid = UUID1}}.
+    {NodeUUID, ClusterUUID} = ensure_uuids(),
+    {ok, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}.
 
 handle_call(enable, _From, State0) ->
     case ?MODULE:official_version(emqx_app:get_release()) of
@@ -165,7 +161,9 @@ handle_call(disable, _From, State = #state{timer = Timer}) ->
         false ->
             {reply, {error, not_official_version}, State}
     end;
-handle_call(get_uuid, _From, State = #state{uuid = UUID}) ->
+handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) ->
+    {reply, {ok, UUID}, State};
+handle_call(get_cluster_uuid, _From, State = #state{cluster_uuid = UUID}) ->
     {reply, {ok, UUID}, State};
 handle_call(get_telemetry, _From, State) ->
     {_State, Telemetry} = get_telemetry(State),
@@ -270,7 +268,7 @@ nodes_uuid() ->
     Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
     lists:foldl(
         fun(Node, Acc) ->
-            case emqx_telemetry_proto_v1:get_uuid(Node) of
+            case emqx_telemetry_proto_v1:get_node_uuid(Node) of
                 {badrpc, _Reason} ->
                     Acc;
                 {ok, UUID} ->
@@ -322,7 +320,7 @@ generate_uuid() ->
     ).
 
 -spec get_telemetry(state()) -> {state(), proplists:proplist()}.
-get_telemetry(State0 = #state{uuid = UUID}) ->
+get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}) ->
     OSInfo = os_info(),
     {MQTTRTInsights, State} = mqtt_runtime_insights(State0),
     #{
@@ -336,7 +334,8 @@ get_telemetry(State0 = #state{uuid = UUID}) ->
         {os_version, bin(get_value(os_version, OSInfo))},
         {otp_version, bin(otp_version())},
         {up_time, uptime()},
-        {uuid, UUID},
+        {uuid, NodeUUID},
+        {cluster_uuid, ClusterUUID},
         {nodes_uuid, nodes_uuid()},
         {active_plugins, active_plugins()},
         {num_clients, num_clients()},
@@ -519,6 +518,45 @@ bin(B) when is_binary(B) ->
 bool2int(true) -> 1;
 bool2int(false) -> 0.
 
+%% Looks up the UUID stored under this node's name and the UUID stored
+%% under ?CLUSTER_UUID_KEY in the telemetry table, generating and writing
+%% whichever of them is missing.  Both lookups happen inside one mria
+%% transaction on ?TELEMETRY_SHARD.
+%% Returns `{NodeUUID, ClusterUUID}'.
+ensure_uuids() ->
+    %% Read-or-create for a single key; only valid inside the transaction
+    %% below.  mnesia:wread/1 reads the key under a write lock.
+    FetchOrCreate = fun(Key) ->
+        case mnesia:wread({?TELEMETRY, Key}) of
+            [#telemetry{uuid = Existing}] ->
+                Existing;
+            [] ->
+                Fresh = generate_uuid(),
+                mnesia:write(?TELEMETRY, #telemetry{id = Key, uuid = Fresh}, write),
+                Fresh
+        end
+    end,
+    Txn = fun() ->
+        NodeUUID = FetchOrCreate(node()),
+        ClusterUUID = FetchOrCreate(?CLUSTER_UUID_KEY),
+        {NodeUUID, ClusterUUID}
+    end,
+    {atomic, UUIDs} = mria:transaction(?TELEMETRY_SHARD, Txn),
+    UUIDs.
+
 empty_state() ->
     #state{
         url = ?TELEMETRY_URL,

+ 9 - 4
apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl

@@ -20,7 +20,8 @@
 
 -export([
     introduced_in/0,
-    get_uuid/1
+    get_node_uuid/1,
+    get_cluster_uuid/1
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -28,6 +29,10 @@
 %% Returns the version string "5.0.0".
 %% NOTE(review): presumably the release in which this protocol version
 %% first shipped, as required by the bpapi behaviour; confirm.
 introduced_in() ->
     "5.0.0".
 
--spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
-get_uuid(Node) ->
-    rpc:call(Node, emqx_telemetry, get_uuid, []).
+%% Calls emqx_telemetry:get_node_uuid/0 on `Node' through rpc:call/4 and
+%% returns its result, or a `badrpc' tuple if the call fails.
+-spec get_node_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
+get_node_uuid(Node) ->
+    rpc:call(Node, emqx_telemetry, get_node_uuid, []).
+
+%% Calls emqx_telemetry:get_cluster_uuid/0 on `Node' through rpc:call/4 and
+%% returns its result, or a `badrpc' tuple if the call fails.
+-spec get_cluster_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
+get_cluster_uuid(Node) ->
+    rpc:call(Node, emqx_telemetry, get_cluster_uuid, []).

+ 120 - 16
apps/emqx_modules/test/emqx_telemetry_SUITE.erl

@@ -144,13 +144,7 @@ init_per_testcase(t_exhook_info, Config) ->
     {ok, _} = application:ensure_all_started(emqx_exhook),
     Config;
 init_per_testcase(_Testcase, Config) ->
-    TestPID = self(),
-    ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
-    ok = meck:expect(httpc, request, fun(
-        Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts
-    ) ->
-        TestPID ! {request, Method, URL, Headers, Body}
-    end),
+    mock_httpc(),
     Config.
 
 end_per_testcase(t_get_telemetry, _Config) ->
@@ -198,20 +192,43 @@ end_per_testcase(_Testcase, _Config) ->
     meck:unload([httpc]),
     ok.
 
-t_uuid(_) ->
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+t_node_uuid(_) ->
     UUID = emqx_telemetry:generate_uuid(),
     Parts = binary:split(UUID, <<"-">>, [global, trim]),
     ?assertEqual(5, length(Parts)),
-    {ok, UUID2} = emqx_telemetry:get_uuid(),
+    {ok, NodeUUID2} = emqx_telemetry:get_node_uuid(),
     emqx_telemetry:disable(),
     emqx_telemetry:enable(),
     emqx_modules_conf:set_telemetry_status(false),
     emqx_modules_conf:set_telemetry_status(true),
-    {ok, UUID3} = emqx_telemetry:get_uuid(),
-    {ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()),
-    ?assertEqual(UUID2, UUID3),
-    ?assertEqual(UUID3, UUID4),
-    ?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_uuid('fake@node')).
+    {ok, NodeUUID3} = emqx_telemetry:get_node_uuid(),
+    {ok, NodeUUID4} = emqx_telemetry_proto_v1:get_node_uuid(node()),
+    ?assertEqual(NodeUUID2, NodeUUID3),
+    ?assertEqual(NodeUUID3, NodeUUID4),
+    ?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_node_uuid('fake@node')).
+
+%% Checks that the cluster UUID is identical when read locally, through the
+%% RPC wrapper against this node, and through the RPC wrapper against a
+%% slave node prepared by setup_slave/1, while the slave reports a node
+%% UUID different from the local one.  The slave is always stopped.
+t_cluster_uuid(_Config) ->
+    {ok, LocalClusterUUID} = emqx_telemetry:get_cluster_uuid(),
+    {ok, RpcClusterUUID} = emqx_telemetry_proto_v1:get_cluster_uuid(node()),
+    ?assertEqual(LocalClusterUUID, RpcClusterUUID),
+    {ok, LocalNodeUUID} = emqx_telemetry:get_node_uuid(),
+    Slave = start_slave(n1),
+    try
+        ok = setup_slave(Slave),
+        {ok, SlaveClusterUUID} = emqx_telemetry_proto_v1:get_cluster_uuid(Slave),
+        ?assertEqual(LocalClusterUUID, SlaveClusterUUID),
+        {ok, SlaveNodeUUID} = emqx_telemetry_proto_v1:get_node_uuid(Slave),
+        ?assertNotEqual(LocalNodeUUID, SlaveNodeUUID)
+    after
+        ok = stop_slave(Slave)
+    end,
+    ok.
 
 t_official_version(_) ->
     true = emqx_telemetry:official_version("0.0.0"),
@@ -231,8 +248,11 @@ t_get_telemetry(_Config) ->
     {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
     OTPVersion = bin(erlang:system_info(otp_release)),
     ?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)),
-    {ok, UUID} = emqx_telemetry:get_uuid(),
-    ?assertEqual(UUID, get_value(uuid, TelemetryData)),
+    {ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
+    {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
+    ?assertEqual(NodeUUID, get_value(uuid, TelemetryData)),
+    ?assertEqual(ClusterUUID, get_value(cluster_uuid, TelemetryData)),
+    ?assertNotEqual(NodeUUID, ClusterUUID),
     ?assertEqual(0, get_value(num_clients, TelemetryData)),
     BuildInfo = get_value(build_info, TelemetryData),
     ?assertMatch(
@@ -449,6 +469,10 @@ t_exhook_info(_Config) ->
     ),
     ok.
 
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
 %% Asserts that the float stored under `Key' in `Map', rendered with two
 %% decimals, equals the string `Expected'.
 assert_approximate(Map, Key, Expected) ->
     Rendered = float_to_list(maps:get(Key, Map), [{decimals, 2}]),
     ?assertEqual(Expected, Rendered).
@@ -562,3 +586,83 @@ set_special_configs(emqx_authz) ->
     ok;
 set_special_configs(_App) ->
     ok.
+
+%% Starts a linked slave node called `Name' on this node's host, passing
+%% it the code paths from ebin_path/0, and returns the new node's name.
+start_slave(Name) ->
+    %% "+S 1:1": we want the slave VM to only occupy a single core.
+    SchedulerOpts = "+S 1:1 ",
+    BeamArgs = SchedulerOpts ++ ebin_path(),
+    {ok, Node} = slave:start_link(host(), Name, BeamArgs),
+    Node.
+
+%% For some unknown reason, gen_rpc running locally or in CI might
+%% start with different `port_discovery' modes, which means that it'll
+%% either be listening at the port in the config (`tcp_server_port',
+%% 5369) if `manual', or on 5370 if started as `stateless'.
+%% Rather than guessing, take the port linked to the
+%% `gen_rpc_server_tcp' process and ask it for its local port number.
+%% Crashes unless that process exists and is linked to exactly one port.
+find_gen_rpc_port() ->
+    %% process_info/2 fetches only the item needed; process_info/1 is
+    %% documented as a debugging-only BIF.
+    {links, Links} = process_info(whereis(gen_rpc_server_tcp), links),
+    [EPort] = [Link || Link <- Links, is_port(Link)],
+    {ok, {_, Port}} = inet:sockname(EPort),
+    Port.
+
+%% Prepares slave `Node' for the test: loads gen_rpc and emqx there, sets
+%% the gen_rpc environment (server port 9002, manual port discovery and a
+%% per-node client config pointing at this node's gen_rpc port), then
+%% starts emqx_conf and emqx_modules on it through
+%% emqx_common_test_helpers:start_apps/2.  Returns `ok'.
+setup_slave(Node) ->
+    TestNode = node(),
+    Port = find_gen_rpc_port(),
+    lists:foreach(
+        fun(App) -> ok = rpc:call(Node, application, load, [App]) end,
+        [gen_rpc, emqx]
+    ),
+    GenRpcEnv = [
+        {tcp_server_port, 9002},
+        {client_config_per_node, {internal, #{TestNode => Port}}},
+        {port_discovery, manual}
+    ],
+    lists:foreach(
+        fun({Key, Value}) ->
+            ok = rpc:call(Node, application, set_env, [gen_rpc, Key, Value])
+        end,
+        GenRpcEnv
+    ),
+    %% NOTE(review): presumably invoked by start_apps/2 for each app it
+    %% starts; for `emqx' it clears the boot modules and joins the
+    %% cluster of the test node.
+    Handler = fun(App) ->
+        case App of
+            emqx ->
+                application:set_env(emqx, boot_modules, []),
+                ekka:join(TestNode),
+                ok;
+            _ ->
+                ok
+        end
+    end,
+    ok = rpc:call(
+        Node,
+        emqx_common_test_helpers,
+        start_apps,
+        [[emqx_conf, emqx_modules], Handler]
+    ),
+    ok.
+
+%% Stops the slave node `Node' with slave:stop/1 and returns its result.
+stop_slave(Node) ->
+    slave:stop(Node).
+
+%% Returns the host part (the text after `@') of the local node name as a
+%% string.  Crashes with a badmatch if the node name does not split into
+%% exactly two parts.
+host() ->
+    %% string:lexemes/2 replaces the obsolete string:tokens/2.
+    [_, Host] = string:lexemes(atom_to_list(node()), "@"),
+    Host.
+
+%% Builds the "-pa Dir1 Dir2 ..." argument string from every code path
+%% entry accepted by is_lib/1.
+ebin_path() ->
+    ExtraPaths = [Dir || Dir <- code:get_path(), is_lib(Dir)],
+    string:join(["-pa" | ExtraPaths], " ").
+
+%% Returns `true' when `Path' does NOT start with code:lib_dir(), and
+%% `false' when it does.  Note that this is the opposite of what the name
+%% might suggest.
+is_lib(Path) ->
+    case string:prefix(Path, code:lib_dir()) of
+        nomatch -> true;
+        _Rest -> false
+    end.

+ 1 - 0
apps/emqx_modules/test/emqx_telemetry_api_SUITE.erl

@@ -59,6 +59,7 @@ init_per_testcase(t_status_fail, Config) ->
 init_per_testcase(t_status, Config) ->
     meck:new(emqx_telemetry, [non_strict, passthrough]),
     meck:expect(emqx_telemetry, official_version, 1, true),
+    meck:expect(emqx_telemetry, enable, fun() -> ok end),
     Config;
 init_per_testcase(_TestCase, Config) ->
     Config.