瀏覽代碼

feature(telemetry): support for collecting telemetry data and disable it by default (#3653)

tigercl 5 年之前
父節點
當前提交
e37a70509b
共有 5 個文件被更改,包括 529 次插入4 次删除
  1. 29 0
      etc/emqx.conf
  2. 25 0
      priv/emqx.schema
  3. 9 4
      src/emqx_kernel_sup.erl
  4. 401 0
      src/emqx_telemetry.erl
  5. 65 0
      test/emqx_telemetry_SUITE.erl

+ 29 - 0
etc/emqx.conf

@@ -2187,4 +2187,33 @@ alarm.size_limit = 1000
 ## Default: 24h
 alarm.validity_period = 24h
 
+##--------------------------------------------------------------------
+## Telemetry
+##--------------------------------------------------------------------
+
+## Enable telemetry
+##
+## Value: true | false
+##
+## Default: false
+telemetry.enabled = false
+
+## The destination URL for the telemetry data report
+##
+## Value: String
+##
+## Default: https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry
+telemetry.url = https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry
+
+## Interval for reporting telemetry data
+##
+## Value: Duration
+## -d: day
+## -h: hour
+## -m: minute
+## -s: second
+##
+## Default: 7d
+telemetry.report_interval = 7d
+
 {{ additional_configs }}

+ 25 - 0
priv/emqx.schema

@@ -2165,3 +2165,28 @@ end}.
      {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)},
      {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}]
 end}.
+
+%%--------------------------------------------------------------------
+%% Telemetry
+%%--------------------------------------------------------------------
+{mapping, "telemetry.enabled", "emqx.telemetry", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "telemetry.url", "emqx.telemetry", [
+  {default, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"},
+  {datatype, string}
+]}.
+
+{mapping, "telemetry.report_interval", "emqx.telemetry", [
+  {default, "7d"},
+  {datatype, {duration, s}}
+]}.
+
+{translation, "emqx.telemetry", fun(Conf) ->
+  [ {enabled,         cuttlefish:conf_get("telemetry.enabled", Conf)}
+  , {url,             cuttlefish:conf_get("telemetry.url", Conf)}
+  , {report_interval, cuttlefish:conf_get("telemetry.report_interval", Conf)}
+  ]
+end}.

+ 9 - 4
src/emqx_kernel_sup.erl

@@ -32,24 +32,29 @@ init([]) ->
            child_spec(emqx_hooks, worker),
            child_spec(emqx_stats, worker),
            child_spec(emqx_metrics, worker),
+           child_spec(emqx_telemetry, worker, [config(telemetry)]),
            child_spec(emqx_ctl, worker),
            child_spec(emqx_zone, worker)]}}.
 
-child_spec(M, worker) ->
+child_spec(M, Type) ->
+    child_spec(M, Type, []).
+
+child_spec(M, worker, Args) ->
     #{id       => M,
-      start    => {M, start_link, []},
+      start    => {M, start_link, Args},
       restart  => permanent,
       shutdown => 5000,
       type     => worker,
       modules  => [M]
      };
 
-child_spec(M, supervisor) ->
+child_spec(M, supervisor, Args) ->
     #{id       => M,
-      start    => {M, start_link, []},
+      start    => {M, start_link, Args},
       restart  => permanent,
       shutdown => infinity,
       type     => supervisor,
       modules  => [M]
      }.
 
+config(Name) -> emqx:get_env(Name, []).

+ 401 - 0
src/emqx_telemetry.erl

@@ -0,0 +1,401 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_telemetry).
+
+-behaviour(gen_server).
+
+-include("emqx.hrl").
+-include("logger.hrl").
+
+ -include_lib("kernel/include/file.hrl").
+
+-logger_header("[Telemetry]").
+
+%% Mnesia bootstrap
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+-export([ start_link/1
+        , stop/0
+        ]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-export([ enable/0
+        , disable/0
+        , is_enabled/0
+        , get_uuid/0
+        , get_telemetry/0
+        ]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
+-import(proplists, [ get_value/2
+                   , get_value/3
+                   ]).
+
+-record(telemetry, {
+          id :: non_neg_integer(),
+
+          uuid :: binary(),
+
+          enabled :: boolean()
+        }).
+
+-record(state, {
+          uuid :: undefined | binary(),
+
+          enabled :: undefined | boolean(),
+
+          url :: string(),
+
+          report_interval :: undefined | non_neg_integer(),
+
+          timer = undefined :: undefined | reference()
+        }).
+
+%% The count of 100-nanosecond intervals between the UUID epoch 
+%% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00.
+-define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).
+
+-define(UNIQUE_ID, 9527).
+
+-define(TELEMETRY, emqx_telemetry).
+
+%%--------------------------------------------------------------------
+%% Mnesia bootstrap
+%%--------------------------------------------------------------------
+
+mnesia(boot) ->
+    ok = ekka_mnesia:create_table(?TELEMETRY,
+             [{type, set},
+              {disc_copies, [node()]},
+              {local_content, true},
+              {record_name, telemetry},
+              {attributes, record_info(fields, telemetry)}]);
+mnesia(copy) ->
+    ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+start_link(Opts) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
+
+stop() ->
+    gen_server:stop(?MODULE).
+
+enable() ->
+    gen_server:call(?MODULE, enable).
+
+disable() ->
+    gen_server:call(?MODULE, disable).
+
+is_enabled() ->
+    gen_server:call(?MODULE, is_enabled).
+
+get_uuid() ->
+    gen_server:call(?MODULE, get_uuid).
+
+get_telemetry() ->
+    gen_server:call(?MODULE, get_telemetry).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([Opts]) ->
+    State = #state{url = get_value(url, Opts),
+                   report_interval = timer:seconds(get_value(report_interval, Opts))},
+    NState = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
+                 [] ->
+                     Enabled = get_value(enabled, Opts),
+                     UUID = generate_uuid(),
+                     mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
+                                                               uuid = UUID,
+                                                               enabled = Enabled}),
+                     State#state{enabled = Enabled, uuid = UUID};
+                 [#telemetry{uuid = UUID, enabled = Enabled} | _] ->
+                     State#state{enabled = Enabled, uuid = UUID}
+             end,
+    {ok, ensure_first_report_timer(timer:seconds(1), NState)}.
+
+handle_call(enable, _From, State = #state{uuid = UUID}) ->
+    mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
+                                              uuid = UUID,
+                                              enabled = true}),
+    {reply, ok, ensure_report_timer(State#state{enabled = true})};
+
+handle_call(disable, _From, State = #state{uuid = UUID}) ->
+    mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
+                                              uuid = UUID,
+                                              enabled = false}),
+    {reply, ok, State#state{enabled = false}};
+
+handle_call(is_enabled, _From, State = #state{enabled = Enabled}) ->
+    {reply, Enabled, State};
+
+handle_call(get_uuid, _From, State = #state{uuid = UUID}) ->
+    {reply, {ok, UUID}, State};
+
+handle_call(get_telemetry, _From, State) ->
+    {reply, {ok, get_telemetry(State)}, State};
+
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?LOG(error, "Unexpected msg: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef,
+                                                                           enabled = false}) ->
+    {noreply, State};
+handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) ->
+    report_telemetry(State),
+    {noreply, ensure_report_timer(State)};
+
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+ensure_first_report_timer(FirstReportInterval, State) ->
+    State#state{timer = emqx_misc:start_timer(FirstReportInterval, time_to_report_telemetry_data)}.
+
+ensure_report_timer(State = #state{report_interval = ReportInterval}) ->
+    State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}.
+
+emqx_version() ->
+    {ok, Version} = application:get_key(emqx, vsn),
+    Version.
+
+license() ->
+    case application:get_key(emqx, description) of
+        {ok, "EMQ X Broker"} ->
+            [{edition, <<"community">>}];
+        {ok, "EMQ X Enterprise"} ->
+            case search_license_callback() of
+                {error, not_found} ->
+                    [{edition, <<"enterprise">>}];
+                {M, F} ->
+                    case erlang:function_exported(M, F, 0) of
+                        true ->
+                            erlang:apply(M, F, []);
+                        false ->
+                           [{edition, <<"enterprise">>}] 
+                    end
+            end
+    end.
+
+os_info() ->
+    case erlang:system_info(os_type) of
+        {unix,darwin} ->
+            [Name | _] = string:tokens(os:cmd("sw_vers -productName"), "\n"),
+            [Version | _] = string:tokens(os:cmd("sw_vers -productVersion"), "\n"),
+            [{os_name, Name},
+             {os_version, Version}];
+        {unix, _} ->
+            case file:read_file_info("/etc/os-release") of
+                {error, _} ->
+                    [{os_name, "Unknown"},
+                     {os_version, "Unknown"}];
+                {ok, FileInfo} ->
+                    case FileInfo#file_info.access of
+                        Access when Access =:= read orelse Access =:= read_write ->
+                            OSInfo = lists:foldl(fun(Line, Acc) ->
+                                                     [Var, Value] = string:tokens(Line, "="),
+                                                     NValue = case Value of
+                                                                  _ when is_list(Value) ->
+                                                                      lists:nth(1, string:tokens(Value, "\""));
+                                                                  _ ->
+                                                                      Value
+                                                              end,
+                                                     [{Var, NValue} | Acc]
+                                                 end, [], string:tokens(os:cmd("cat /etc/os-release"), "\n")),
+                            [{os_name, get_value("NAME", OSInfo)},
+                             {os_version, get_value("VERSION", OSInfo, get_value("VERSION_ID", OSInfo))}];
+                        _ ->
+                            [{os_name, "Unknown"},
+                             {os_version, "Unknown"}]
+                    end
+            end;
+        {win32, nt} ->
+            Ver = os:cmd("ver"),
+            case re:run(Ver, "[a-zA-Z ]+ \\[Version ([0-9]+[\.])+[0-9]+\\]", [{capture, none}]) of
+                match ->
+                    [NVer | _] = string:tokens(Ver, "\r\n"),
+                    {match, [Version]} = re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]),
+                    [Name | _] = string:split(NVer, " [Version "),
+                    [{os_name, Name},
+                     {os_version, Version}];
+                nomatch ->
+                    [{os_name, "Unknown"},
+                     {os_version, "Unknown"}]
+            end
+    end.
+
+otp_version() ->
+    erlang:system_info(otp_release).
+
+uptime() ->
+    element(1, erlang:statistics(wall_clock)).
+
+nodes_uuid() ->
+    Nodes = lists:delete(node(), ekka_mnesia:running_nodes()),
+    lists:foldl(fun(Node, Acc) ->
+                    case rpc:call(Node, ?MODULE, get_uuid, []) of
+                        {badrpc, _Reason} ->
+                            Acc;
+                        UUID ->
+                            [UUID | Acc]
+                    end
+                end, [], Nodes).
+
+active_plugins() ->
+    lists:foldl(fun(#plugin{name = Name, active = Active}, Acc) ->
+                        case Active of
+                            true -> [Name | Acc];
+                            false -> Acc
+                        end
+                    end, [], emqx_plugins:list()).
+
+active_modules() ->
+    lists:foldl(fun({Name, Persistent}, Acc) ->
+                    case Persistent of
+                        true -> [Name | Acc];
+                        false -> Acc
+                    end
+                end, [], emqx_modules:list()).
+
+num_clients() ->
+    emqx_stats:getstat('connections.count').
+
+messages_sent() ->
+    emqx_metrics:val('messages.sent').
+
+messages_received() ->
+    emqx_metrics:val('messages.received').
+
+generate_uuid() ->
+    MicroSeconds = erlang:system_time(microsecond),
+    Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET,
+    <<TimeHigh:12, TimeMid:16, TimeLow:32>> = <<Timestamp:60>>,
+    <<ClockSeq:32>> = crypto:strong_rand_bytes(4),
+    <<First:7, _:1, Last:40>> = crypto:strong_rand_bytes(6),
+    <<NTimeHigh:16>> = <<16#01:4, TimeHigh:12>>,
+    <<NClockSeq:16>> = <<1:1, 0:1, ClockSeq:14>>,
+    <<Node:48>> = <<First:7, 1:1, Last:40>>,
+    list_to_binary(io_lib:format("~.16B-~.16B-~.16B-~.16B-~.16B", [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])).
+
+get_telemetry(#state{uuid = UUID}) ->
+    OSInfo = os_info(),
+    [{emqx_version, bin(emqx_version())},
+     {license, license()},
+     {os_name, bin(get_value(os_name, OSInfo))},
+     {os_version, bin(get_value(os_version, OSInfo))},
+     {otp_version, bin(otp_version())},
+     {up_time, uptime()},
+     {uuid, UUID},
+     {nodes_uuid, nodes_uuid()},
+     {active_plugins, active_plugins()},
+     {active_modules, active_modules()},
+     {num_clients, num_clients()},
+     {messages_received, messages_received()},
+     {messages_sent, messages_sent()}].
+
+report_telemetry(State = #state{url = URL}) ->
+    Data = get_telemetry(State),
+    case emqx_json:safe_encode(Data) of
+        {ok, Bin} ->
+            case httpc_request(post, URL, [], Bin) of
+                {ok, {{_, StatusCode, _}, _, _}}
+                  when StatusCode =:= 200 orelse StatusCode =:= 204 ->
+                    ?LOG(debug, "Report ~p successfully", [Bin]);
+                {ok, {{_, StatusCode, ReasonPhrase}, _, Body}} ->
+                    ?LOG(error, "Report ~p failed due to ~p ~s(~s)", [Bin, StatusCode, ReasonPhrase, Body]);
+                {error, Reason} ->
+                    ?LOG(error, "Report ~p failed due to ~p", [Bin, Reason])
+            end;
+        {error, Reason} ->
+            ?LOG(error, "Encode ~p failed due to ~p", [Data, Reason])
+    end.
+
+httpc_request(Method, URL, Headers, Body) ->
+    httpc:request(Method, {URL, Headers, "application/json", Body}, [], []).
+
+ignore_lib_apps(Apps) ->
+    LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
+               syntax_tools, ssl, crypto, mnesia, os_mon,
+               inets, goldrush, gproc, runtime_tools,
+               snmp, otp_mibs, public_key, asn1, ssh, hipe,
+               common_test, observer, webtool, xmerl, tools,
+               test_server, compiler, debugger, eunit, et,
+               wx],
+    [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)].
+
+search_license_callback() ->
+    search_license_callback(ignore_lib_apps(application:loaded_applications()), []).
+
+search_license_callback([], []) ->
+    {error, not_found};
+search_license_callback([], [Callback | _]) ->
+    Callback;
+search_license_callback([App | More], Acc) ->
+    {ok, Modules} = application:get_key(App, modules),
+    Callbacks = lists:foldl(fun(Module, AccIn) ->
+                                case proplists:get_value(license_callback, module_attributes(Module), undefined) of
+                                    undefined -> AccIn;
+                                    [Callback | _] -> [{Module, Callback} | AccIn]
+                                end
+                            end, [], Modules),
+    search_license_callback(More, Acc ++ Callbacks).
+
+module_attributes(Module) ->
+    try Module:module_info(attributes)
+    catch
+        error:undef -> [];
+        error:Reason -> error(Reason)
+    end.
+
+bin(L) when is_list(L) ->
+    list_to_binary(L);
+bin(B) when is_binary(B) ->
+    B.

+ 65 - 0
test/emqx_telemetry_SUITE.erl

@@ -0,0 +1,65 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_telemetry_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-import(proplists, [get_value/2]).
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_testcase(_, Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_uuid(_) ->
+    UUID = emqx_telemetry:generate_uuid(),
+    Parts = binary:split(UUID, <<"-">>, [global, trim]),
+    ?assertEqual(5, length(Parts)),
+    {ok, UUID2} = emqx_telemetry:get_uuid(),
+    emqx_telemetry:stop(),
+    emqx_telemetry:start_link([{enabled, true},
+                               {url, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"},
+                               {report_interval, 7 * 24 * 60 * 60}]),
+    {ok, UUID3} = emqx_telemetry:get_uuid(),
+    ?assertEqual(UUID2, UUID3).
+
+t_get_telemetry(_) ->
+    {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
+    OTPVersion = bin(erlang:system_info(otp_release)),
+    ?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)),
+    {ok, UUID} = emqx_telemetry:get_uuid(),
+    ?assertEqual(UUID, get_value(uuid, TelemetryData)),
+    ?assertEqual(0, get_value(num_clients, TelemetryData)).
+
+t_enable(_) ->
+    ok = emqx_telemetry:enable(),
+    ?assertEqual(true, emqx_telemetry:is_enabled()),
+    ok = emqx_telemetry:disable(),
+    ?assertEqual(false, emqx_telemetry:is_enabled()).
+
+bin(L) when is_list(L) ->
+    list_to_binary(L);
+bin(B) when is_binary(B) ->
+    B.