Просмотр исходного кода

feat: otel whitelist sampler and mgmt api

JimMoen 1 год назад
Родитель
Сommit
363bb11ac9

+ 19 - 20
apps/emqx/include/emqx_external_trace.hrl

@@ -26,16 +26,11 @@
 -define(EMQX_EXTERNAL_MODULE, emqx_external_trace).
 -define(PROVIDER, {?EMQX_EXTERNAL_MODULE, trace_provider}).
 
--if(?EMQX_RELEASE_EDITION == ee).
+-ifndef(EMQX_RELEASE_EDITION).
+-define(EMQX_RELEASE_EDITION, ce).
+-endif.
 
--define(res_without_provider(TraceAction, Any),
-    case TraceAction of
-        ?EXT_TRACE_START ->
-            Any;
-        ?EXT_TRACE_STOP ->
-            ok
-    end
-).
+-if(?EMQX_RELEASE_EDITION == ee).
 
 -define(with_provider(IfRegistered, IfNotRegistered),
     case persistent_term:get(?PROVIDER, undefined) of
@@ -54,21 +49,24 @@
 ).
 
 -define(EXT_TRACE_ADD_ATTRS(Attrs),
-    case Attrs of
-        NotMap when not is_map(NotMap) -> ok;
-        EmptyMap when is_map(EmptyMap) andalso map_size(EmptyMap) =:= 0 -> ok;
-        _ -> ?with_provider(add_span_attrs(Attrs), ok)
-    end
+    ?with_provider(add_span_attrs(Attrs), ok)
 ).
 
 -define(EXT_TRACE_ADD_ATTRS(Attrs, Ctx),
     ?with_provider(add_span_attrs(Attrs, Ctx), ok)
 ).
 
--define(EXT_TRACE_WITH_ACTION(FuncName, TraceAction, Any, Attrs),
+-define(EXT_TRACE_WITH_ACTION_START(FuncName, Any, Attrs),
     ?with_provider(
-        FuncName(TraceAction, Any, Attrs),
-        ?res_without_provider(TraceAction, Any)
+        FuncName(?EXT_TRACE_START, Any, Attrs),
+        Any
+    )
+).
+
+-define(EXT_TRACE_WITH_ACTION_STOP(FuncName, Any, Attrs),
+    ?with_provider(
+        FuncName(?EXT_TRACE_STOP, Any, Attrs),
+        ok
     )
 ).
 
@@ -79,12 +77,15 @@
     )
 ).
 
+-type event_name() :: opentelemetry:event_name().
+
 -else.
 
 -define(EXT_TRACE_ANY(_FuncName, Any, _Attrs), Any).
 -define(EXT_TRACE_ADD_ATTRS(_Attrs), ok).
 -define(EXT_TRACE_ADD_ATTRS(_Attrs, _Ctx), ok).
--define(EXT_TRACE_WITH_ACTION(_FuncName, _TraceAction, Any, _Attrs), Any).
+-define(EXT_TRACE_WITH_ACTION_START(_FuncName, Any, _Attrs), Any).
+-define(EXT_TRACE_WITH_ACTION_STOP(_FuncName, Any, _Attrs), ok).
 -define(EXT_TRACE_WITH_PROCESS_FUN(_FuncName, Any, _Attrs, ProcessFun), ProcessFun(Any)).
 
 -endif.
@@ -94,6 +95,4 @@
 
 -type attrs() :: #{atom() => _}.
 
--type event_name() :: opentelemetry:event_name().
-
 -endif.

+ 2 - 0
apps/emqx/src/emqx_broker.erl

@@ -108,6 +108,8 @@
     bypass_hook => boolean()
 }.
 
+-elvis([{elvis_style, used_ignored_variable, disable}]).
+
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 start_link(Pool, Id) ->
     gen_server:start_link(

+ 4 - 8
apps/emqx/src/emqx_channel.erl

@@ -1267,36 +1267,32 @@ handle_out(publish, Publishes, Channel) ->
     {ok, ?REPLY_OUTGOING(Packets), NChannel};
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
     {ok,
-        ?EXT_TRACE_WITH_ACTION(
+        ?EXT_TRACE_WITH_ACTION_START(
             outgoing,
-            ?EXT_TRACE_START,
             ?PUBACK_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
     {ok,
-        ?EXT_TRACE_WITH_ACTION(
+        ?EXT_TRACE_WITH_ACTION_START(
             outgoing,
-            ?EXT_TRACE_START,
             ?PUBREC_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
     {ok,
-        ?EXT_TRACE_WITH_ACTION(
+        ?EXT_TRACE_WITH_ACTION_START(
             outgoing,
-            ?EXT_TRACE_START,
             ?PUBREL_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
     {ok,
-        ?EXT_TRACE_WITH_ACTION(
+        ?EXT_TRACE_WITH_ACTION_START(
             outgoing,
-            ?EXT_TRACE_START,
             ?PUBCOMP_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
         ),

+ 1 - 2
apps/emqx/src/emqx_connection.erl

@@ -848,9 +848,8 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
 
 handle_outgoing(Packets, State) ->
     Res = do_handle_outgoing(Packets, State),
-    ?EXT_TRACE_WITH_ACTION(
+    ?EXT_TRACE_WITH_ACTION_STOP(
         outgoing,
-        ?EXT_TRACE_STOP,
         Packets,
         _Attrs = #{}
     ),

+ 5 - 1
apps/emqx/src/emqx_external_trace.erl

@@ -18,6 +18,10 @@
 -include("emqx_external_trace.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 
+%% Legacy
+-type channel_info() :: #{atom() => _}.
+-export_type([channel_info/0]).
+
 %% --------------------------------------------------------------------
 %% Trace in Rich mode callbacks
 
@@ -115,7 +119,7 @@
 
 -callback add_span_attrs(Attrs, Ctx) -> ok when
     Attrs :: attrs(),
-    Ctx :: otel_ctx:t().
+    Ctx :: map() | undefined.
 
 -optional_callbacks(
     [

+ 14 - 0
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -43,4 +43,18 @@
 -define(MSG_FORWARD_SPAN_NAME, 'message.forward').
 -define(MSG_HANDLE_FORWARD_SPAN_NAME, 'message.handle_forward').
 
+%% ====================
+%% OTEL sample whitelist/blacklist Table
+-define(EMQX_OTEL_SAMPLER, emqx_otel_sampler).
+
+-define(EMQX_OTEL_SAMPLE_CLIENTID, 1).
+-define(EMQX_OTEL_SAMPLE_TOPIC, 2).
+
+-record(?EMQX_OTEL_SAMPLER, {
+    type ::
+        {?EMQX_OTEL_SAMPLE_CLIENTID, binary()}
+        | {?EMQX_OTEL_SAMPLE_TOPIC, binary()},
+    extra :: map()
+}).
+
 -endif.

+ 16 - 1
apps/emqx_opentelemetry/src/emqx_otel_api.erl

@@ -116,6 +116,21 @@ otel_config_example() ->
         },
         traces => #{
             enable => true,
-            filter => #{trace_all => false}
+            filter => #{
+                trace_all => false,
+                trace_mode => legacy,
+                e2e_tracing_options => #{
+                    attribute_meta_value => "emqxcl",
+                    mqtt_publish_trace_level => basic,
+                    clientid_match_rules_max => 30,
+                    topic_match_rules_max => 30,
+                    sample_ratio => "10%",
+                    client_connect => true,
+                    client_disconnect => true,
+                    client_subscribe => true,
+                    client_unsubscribe => true,
+                    client_publish => true
+                }
+            }
         }
     }.

+ 36 - 44
apps/emqx_opentelemetry/src/emqx_otel_schema.erl

@@ -252,75 +252,65 @@ fields("trace_filter") ->
     ];
 fields("e2e_tracing_options") ->
     [
-        {attribute_meta,
+        {attribute_meta_value,
             ?HOCON(
                 string(),
                 #{
                     default => <<"emqxcl">>,
-                    desc => ?DESC(e2e_attribute_meta),
+                    desc => ?DESC(e2e_attribute_meta_value),
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )},
-        {publish_response_trace_level,
+        %% TODO: Rename
+        {mqtt_publish_trace_level,
             ?HOCON(
-                emqx_schema:qos(),
+                ?ENUM([basic, first_ack, all]),
                 #{
-                    default => 0,
+                    default => basic,
                     desc => ?DESC(publish_response_trace_level),
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )},
-        {samplers,
+        {clientid_match_rules_max,
             ?HOCON(
-                ?R_REF("e2e_samplers"),
-                #{
-                    desc => ?DESC(e2e_samplers),
-                    default => #{},
-                    importance => ?IMPORTANCE_MEDIUM
-                }
-            )}
-    ];
-fields("e2e_samplers") ->
-    [
-        {whitelist_based_sampler,
-            ?HOCON(
-                boolean(),
+                pos_integer(),
                 #{
-                    default => true,
-                    desc => ?DESC(whitelist_based_sampler),
+                    desc => ?DESC(clientid_match_rules_max),
+                    default => 30,
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )},
-        {event_based_samplers,
+        {topic_match_rules_max,
             ?HOCON(
-                ?ARRAY(?R_REF("event_based_samplers")),
-                #{
-                    default => [],
-                    importance => ?IMPORTANCE_MEDIUM
-                }
-            )}
-    ];
-fields("event_based_samplers") ->
-    [
-        {name,
-            ?HOCON(
-                ?ENUM(root_span_names()),
+                pos_integer(),
                 #{
-                    required => ture,
-                    desc => ?DESC(event_type),
+                    desc => ?DESC(topic_match_rules_max),
+                    default => 30,
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )},
-        {ratio,
+        {sample_ratio,
             ?HOCON(
                 emqx_schema:percent(),
                 #{
                     default => <<"10%">>,
-                    desc => ?DESC(ratio),
+                    desc => ?DESC(sample_ratio),
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )}
-    ].
+    ] ++
+        [
+            {TraceEvent,
+                ?HOCON(
+                    boolean(),
+                    #{
+                        desc => ?DESC(TraceEvent),
+                        default => false,
+                        importance => ?IMPORTANCE_MEDIUM
+                    }
+                )}
+         || TraceEvent <- root_span_names()
+        ].
 
 desc("opentelemetry") ->
     ?DESC(opentelemetry);
@@ -334,16 +324,18 @@ desc("otel_traces") ->
     ?DESC(otel_traces);
 desc("trace_filter") ->
     ?DESC(trace_filter);
+desc("e2e_tracing_options") ->
+    ?DESC(e2e_tracing_options);
 desc(_) ->
     undefined.
 
 root_span_names() ->
     [
-        ?CLIENT_CONNECT_SPAN_NAME,
-        ?CLIENT_DISCONNECT_SPAN_NAME,
-        ?CLIENT_SUBSCRIBE_SPAN_NAME,
-        ?CLIENT_UNSUBSCRIBE_SPAN_NAME,
-        ?CLIENT_PUBLISH_SPAN_NAME
+        client_connect,
+        client_disconnect,
+        client_subscribe,
+        client_unsubscribe,
+        client_publish
     ].
 
 %% Compatibility with the previous schema that defined only metrics fields

+ 4 - 1
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -879,7 +879,10 @@ add_span_attrs(EmpytAttr) when map_size(EmpytAttr) =:= 0 ->
 add_span_attrs(Attrs) ->
     ?with_trace_mode(
         ok,
-        _ = ?set_attributes(Attrs)
+        begin
+            _ = ?set_attributes(Attrs),
+            ok
+        end
     ).
 
 add_span_attrs(Attrs, Ctx) ->

+ 161 - 161
apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler.erl

@@ -23,35 +23,24 @@
 -include_lib("opentelemetry/include/otel_sampler.hrl").
 -include_lib("opentelemetry_api/include/opentelemetry.hrl").
 
--define(EMQX_OTEL_SAMPLER_RULE, emqx_otel_sampler_rule).
-
--define(EMQX_OTEL_SAMPLE_CLIENTID, 1).
--define(EMQX_OTEL_SAMPLE_USERNAME, 2).
--define(EMQX_OTEL_SAMPLE_TOPIC_NAME, 3).
--define(EMQX_OTEL_SAMPLE_TOPIC_MATCHING, 4).
-
--record(?EMQX_OTEL_SAMPLER_RULE, {
-    type ::
-        {?EMQX_OTEL_SAMPLE_CLIENTID, binary()}
-        | {?EMQX_OTEL_SAMPLE_USERNAME, binary()}
-        | {?EMQX_OTEL_SAMPLE_TOPIC_NAME, binary()}
-        | {?EMQX_OTEL_SAMPLE_TOPIC_MATCHING, binary()},
-    should_sample :: boolean(),
-    extra :: map()
-}).
+-define(META_KEY, 'emqx.meta').
 
 -export([
     init_tables/0,
-    store_rules/3,
+    store_rule/2,
+    store_rule/3,
     purge_rules/0,
+    purge_rules/1,
     get_rules/1,
-    delete_rules/1
+    get_rule/2,
+    delete_rule/2,
+    record_count/0
 ]).
 
 %% OpenTelemetry Sampler Callback
 -export([setup/1, description/1, should_sample/7]).
 
-%% 2^64 - 1
+%% 2^64 - 1 =:= ((1 bsl 64) - 1)
 -define(MAX_VALUE, 18446744073709551615).
 
 %%--------------------------------------------------------------------
@@ -61,16 +50,16 @@
 -spec create_tables() -> [mria:table()].
 create_tables() ->
     ok = mria:create_table(
-        ?EMQX_OTEL_SAMPLER_RULE,
+        ?EMQX_OTEL_SAMPLER,
         [
             {type, ordered_set},
             {storage, disc_copies},
             {local_content, true},
-            {record_name, ?EMQX_OTEL_SAMPLER_RULE},
-            {attributes, record_info(fields, ?EMQX_OTEL_SAMPLER_RULE)}
+            {record_name, ?EMQX_OTEL_SAMPLER},
+            {attributes, record_info(fields, ?EMQX_OTEL_SAMPLER)}
         ]
     ),
-    [?EMQX_OTEL_SAMPLER_RULE].
+    [?EMQX_OTEL_SAMPLER].
 
 %% Init
 -spec init_tables() -> ok.
@@ -78,68 +67,77 @@ init_tables() ->
     ok = mria:wait_for_tables(create_tables()).
 
 %% @doc Update sample rule
-%% -spec store_rules(who(), rules()) -> ok.
-store_rules({clientid, ClientId}, ShouldSample, Extra) ->
-    Record = #?EMQX_OTEL_SAMPLER_RULE{
+-spec store_rule(clientid | topic, binary()) -> ok.
+store_rule(clientid, ClientId) ->
+    store_rule(clientid, ClientId, #{});
+store_rule(topic, TopicName) ->
+    store_rule(topic, TopicName, #{}).
+
+-spec store_rule(clientid | topic, binary(), map()) -> ok.
+store_rule(clientid, ClientId, Extra) ->
+    Record = #?EMQX_OTEL_SAMPLER{
         type = {?EMQX_OTEL_SAMPLE_CLIENTID, ClientId},
-        should_sample = ShouldSample,
         extra = Extra
     },
     mria:dirty_write(Record);
-store_rules({username, Username}, ShouldSample, Extra) ->
-    Record = #?EMQX_OTEL_SAMPLER_RULE{
-        type = {?EMQX_OTEL_SAMPLE_USERNAME, Username},
-        should_sample = ShouldSample,
-        extra = Extra
-    },
-    mria:dirty_write(Record);
-store_rules({topic_name, TopicName}, ShouldSample, Extra) ->
-    Record = #?EMQX_OTEL_SAMPLER_RULE{
-        type = {?EMQX_OTEL_SAMPLE_TOPIC_NAME, TopicName},
-        should_sample = ShouldSample,
-        extra = Extra
-    },
-    mria:dirty_write(Record);
-store_rules({topic_matching, TopicFilter}, ShouldSample, Extra) ->
-    Record = #?EMQX_OTEL_SAMPLER_RULE{
-        type = {?EMQX_OTEL_SAMPLE_TOPIC_MATCHING, TopicFilter},
-        should_sample = ShouldSample,
+store_rule(topic, TopicName, Extra) ->
+    Record = #?EMQX_OTEL_SAMPLER{
+        type = {?EMQX_OTEL_SAMPLE_TOPIC, TopicName},
         extra = Extra
     },
     mria:dirty_write(Record).
 
 -spec purge_rules() -> ok.
 purge_rules() ->
-    ok = lists:foreach(
-        fun(Key) ->
-            ok = mria:dirty_delete(?EMQX_OTEL_SAMPLER_RULE, Key)
-        end,
-        mnesia:dirty_all_keys(?EMQX_OTEL_SAMPLER_RULE)
-    ).
-
-get_rules({clientid, ClientId}) ->
-    do_get_rules({?EMQX_OTEL_SAMPLE_CLIENTID, ClientId});
-get_rules({username, Username}) ->
-    do_get_rules({?EMQX_OTEL_SAMPLE_USERNAME, Username});
-get_rules({topic_name, TopicName}) ->
-    do_get_rules({?EMQX_OTEL_SAMPLE_TOPIC_NAME, TopicName});
-get_rules({topic_matching, TopicFilter}) ->
-    do_get_rules({?EMQX_OTEL_SAMPLE_TOPIC_MATCHING, TopicFilter}).
-
-do_get_rules(Key) ->
-    case mnesia:dirty_read(?EMQX_OTEL_SAMPLER_RULE, Key) of
-        [#?EMQX_OTEL_SAMPLER_RULE{should_sample = ShouldSample}] -> {ok, ShouldSample};
+    do_purge_rules(purge_func('_')).
+
+-spec purge_rules(clientid | topic) -> ok.
+purge_rules(clientid) ->
+    do_purge_rules(purge_func(?EMQX_OTEL_SAMPLE_CLIENTID));
+purge_rules(topic) ->
+    do_purge_rules(purge_func(?EMQX_OTEL_SAMPLE_TOPIC)).
+
+do_purge_rules(Func) when is_function(Func) ->
+    ok = lists:foreach(Func, mnesia:dirty_all_keys(?EMQX_OTEL_SAMPLER)).
+
+purge_func(K) ->
+    fun
+        ({I, _} = Key) when I =:= K ->
+            ok = mria:dirty_delete(?EMQX_OTEL_SAMPLER, Key);
+        (_) ->
+            ok
+    end.
+
+get_rules(clientid) ->
+    read_rules(?EMQX_OTEL_SAMPLE_CLIENTID);
+get_rules(topic) ->
+    read_rules(?EMQX_OTEL_SAMPLE_TOPIC).
+
+read_rules(K) ->
+    mnesia:dirty_match_object(#?EMQX_OTEL_SAMPLER{
+        type = {K, '_'},
+        _ = '_'
+    }).
+
+get_rule(clientid, ClientId) ->
+    do_get_rule({?EMQX_OTEL_SAMPLE_CLIENTID, ClientId});
+get_rule(topic, Topic) ->
+    do_get_rule({?EMQX_OTEL_SAMPLE_TOPIC, Topic}).
+
+do_get_rule(Key) ->
+    case mnesia:dirty_read(?EMQX_OTEL_SAMPLER, Key) of
+        [#?EMQX_OTEL_SAMPLER{} = R] -> {ok, R};
         [] -> not_found
     end.
 
-delete_rules({clientid, ClientId}) ->
-    mria:dirty_delete(?EMQX_OTEL_SAMPLER_RULE, {?EMQX_OTEL_SAMPLE_CLIENTID, ClientId});
-delete_rules({username, Username}) ->
-    mria:dirty_delete(?EMQX_OTEL_SAMPLER_RULE, {?EMQX_OTEL_SAMPLE_USERNAME, Username});
-delete_rules({topic_name, TopicName}) ->
-    mria:dirty_delete(?EMQX_OTEL_SAMPLER_RULE, {?EMQX_OTEL_SAMPLE_TOPIC_NAME, TopicName});
-delete_rules({topic_matching, TopicFilter}) ->
-    mria:dirty_delete(?EMQX_OTEL_SAMPLER_RULE, {?EMQX_OTEL_SAMPLE_TOPIC_MATCHING, TopicFilter}).
+delete_rule(clientid, ClientId) ->
+    mria:dirty_delete(?EMQX_OTEL_SAMPLER, {?EMQX_OTEL_SAMPLE_CLIENTID, ClientId});
+delete_rule(topic, Topic) ->
+    mria:dirty_delete(?EMQX_OTEL_SAMPLER, {?EMQX_OTEL_SAMPLE_TOPIC, Topic}).
+
+-spec record_count() -> non_neg_integer().
+record_count() ->
+    mnesia:table_info(?EMQX_OTEL_SAMPLER, size).
 
 %%--------------------------------------------------------------------
 %% OpenTelemetry Sampler Callback
@@ -147,42 +145,46 @@ delete_rules({topic_matching, TopicFilter}) ->
 
 setup(
     #{
-        attribute_meta := MetaValue,
-        samplers :=
-            #{
-                event_based_samplers := EventBasedSamplers,
-                whitelist_based_sampler := WhiteListEnabled
-            },
-        publish_response_trace_level := QoS
-    } = _Opts
+        mqtt_publish_trace_level := Level,
+        sample_ratio := Ratio
+    } = InitOpts
 ) ->
-    EventBasedRatio = lists:foldl(
-        %% Name might not appears
-        fun(#{name := Name, ratio := Ratio}, AccIn) ->
-            case Ratio of
-                R when R =:= +0.0 ->
-                    AccIn#{Name => #{ratio => 0, id_upper => 0}};
-                R when R =:= 1.0 ->
-                    AccIn#{Name => #{ratio => 1, id_upper => ?MAX_VALUE}};
-                R when R >= 0.0 andalso R =< 1.0 ->
-                    IdUpperBound = R * ?MAX_VALUE,
-                    AccIn#{Name => #{ratio => R, id_upper => IdUpperBound}}
-            end
+    IdUpper =
+        case Ratio of
+            R when R =:= +0.0 ->
+                0;
+            R when R =:= 1.0 ->
+                ?MAX_VALUE;
+            R when R >= 0.0 andalso R =< 1.0 ->
+                trunc(R * ?MAX_VALUE)
         end,
-        #{},
-        EventBasedSamplers
-    ),
-    Config = #{
-        event_based_samplers => EventBasedRatio,
-        whitelist_based_sampler => WhiteListEnabled,
-        publish_response_trace_level => QoS,
-        attribute_meta => MetaValue
+
+    Opts = (maps:with(
+        [
+            client_connect,
+            client_disconnect,
+            client_subscribe,
+            client_unsubscribe,
+            client_publish,
+            attribute_meta_value
+        ],
+        InitOpts
+    ))#{
+        response_trace_qos => level_to_qos(Level),
+        id_upper => IdUpper
     },
+
     ?SLOG(debug, #{
         msg => "emqx_otel_sampler_setup",
-        config => Config
+        opts => Opts
     }),
-    Config.
+
+    Opts.
+
+-compile({inline, [level_to_qos/1]}).
+level_to_qos(basic) -> ?QOS_0;
+level_to_qos(first_ack) -> ?QOS_1;
+level_to_qos(all) -> ?QOS_2.
 
 %% TODO: description
 description(_Opts) ->
@@ -197,10 +199,8 @@ should_sample(
     _SpanKind,
     Attributes,
     #{
-        whitelist_based_sampler := WhiteListEnabled,
-        event_based_samplers := EventBasedRatio,
-        attribute_meta := MetaValue
-    } = _Opts
+        attribute_meta_value := MetaValue
+    } = Opts
 ) when
     SpanName =:= ?CLIENT_CONNECT_SPAN_NAME orelse
         SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
@@ -209,11 +209,11 @@ should_sample(
         SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME
 ->
     Desicion =
-        decide_by_whitelist(WhiteListEnabled, Attributes) orelse
-            decide_by_traceid_ratio(TraceId, SpanName, EventBasedRatio),
+        decide_by_match_rule(Attributes, Opts) orelse
+            decide_by_traceid_ratio(TraceId, SpanName, Opts),
     {
         decide(Desicion),
-        #{attribute_meta => MetaValue},
+        #{?META_KEY => MetaValue},
         otel_span:tracestate(otel_tracer:current_span_ctx(Ctx))
     };
 %% Non-root span: decide by parent and publish response tracing level
@@ -224,98 +224,98 @@ should_sample(
     SpanName,
     _SpanKind,
     _Attributes,
-    #{publish_response_trace_level := QoS, attribute_meta := MetaValue} = _Opts
+    #{
+        response_trace_qos := QoS,
+        attribute_meta_value := MetaValue
+    } = _Opts
 ) ->
     Desicion =
         parent_sampled(otel_tracer:current_span_ctx(Ctx)) andalso
             match_by_span_name(SpanName, QoS),
     {
         decide(Desicion),
-        #{attribute_meta => MetaValue},
+        #{?META_KEY => MetaValue},
         otel_span:tracestate(otel_tracer:current_span_ctx(Ctx))
     }.
 
 -compile({inline, [match_by_span_name/2]}).
-match_by_span_name(?BROKER_PUBACK_SPAN_NAME, L) -> ?QOS_1 =< L;
-match_by_span_name(?CLIENT_PUBACK_SPAN_NAME, L) -> ?QOS_1 =< L;
-match_by_span_name(?BROKER_PUBREC_SPAN_NAME, L) -> ?QOS_1 =< L;
-match_by_span_name(?CLIENT_PUBREC_SPAN_NAME, L) -> ?QOS_1 =< L;
-match_by_span_name(?BROKER_PUBREL_SPAN_NAME, L) -> ?QOS_2 =< L;
-match_by_span_name(?CLIENT_PUBREL_SPAN_NAME, L) -> ?QOS_2 =< L;
-match_by_span_name(?BROKER_PUBCOMP_SPAN_NAME, L) -> ?QOS_2 =< L;
-match_by_span_name(?CLIENT_PUBCOMP_SPAN_NAME, L) -> ?QOS_2 =< L;
-%% other spans, always sample
+match_by_span_name(?BROKER_PUBACK_SPAN_NAME, TraceQoS) -> ?QOS_1 =< TraceQoS;
+match_by_span_name(?CLIENT_PUBACK_SPAN_NAME, TraceQoS) -> ?QOS_1 =< TraceQoS;
+match_by_span_name(?BROKER_PUBREC_SPAN_NAME, TraceQoS) -> ?QOS_1 =< TraceQoS;
+match_by_span_name(?CLIENT_PUBREC_SPAN_NAME, TraceQoS) -> ?QOS_1 =< TraceQoS;
+match_by_span_name(?BROKER_PUBREL_SPAN_NAME, TraceQoS) -> ?QOS_2 =< TraceQoS;
+match_by_span_name(?CLIENT_PUBREL_SPAN_NAME, TraceQoS) -> ?QOS_2 =< TraceQoS;
+match_by_span_name(?BROKER_PUBCOMP_SPAN_NAME, TraceQoS) -> ?QOS_2 =< TraceQoS;
+match_by_span_name(?CLIENT_PUBCOMP_SPAN_NAME, TraceQoS) -> ?QOS_2 =< TraceQoS;
+%% other sub spans, sample by parent, set mask as true
 match_by_span_name(_, _) -> true.
 
-decide_by_whitelist(true, Attributes) ->
-    sample_by_clientid(Attributes) orelse
-        sample_by_message_from(Attributes) orelse
-        sample_by_username(Attributes) orelse
-        sample_by_topic_name(Attributes) orelse
-        sample_by_topic_filter(Attributes);
-decide_by_whitelist(false, _Attributes) ->
-    false.
+decide_by_match_rule(Attributes, _) ->
+    by_clientid(Attributes) orelse
+        by_message_from(Attributes) orelse
+        %% FIXME: external topic filters for AUTHZ
+        by_topic(Attributes).
 
-decide_by_traceid_ratio(TraceId, SpanName, _EventBasedRatio) ->
-    case _EventBasedRatio of
-        #{SpanName := #{id_upper := IdUpperBound}} ->
+decide_by_traceid_ratio(_, _, #{id_upper := ?MAX_VALUE}) ->
+    true;
+decide_by_traceid_ratio(TraceId, SpanName, #{id_upper := IdUpperBound} = Opts) ->
+    case maps:get(span_name_to_config_key(SpanName), Opts, false) of
+        true ->
             Lower64Bits = TraceId band ?MAX_VALUE,
-            %% XXX: really need abs?
-            erlang:abs(Lower64Bits) =< IdUpperBound;
+            Lower64Bits =< IdUpperBound;
         _ ->
             %% not configured, always dropped.
             false
     end.
 
-sample_by_clientid(#{'client.clientid' := ClientId}) ->
+span_name_to_config_key(?CLIENT_CONNECT_SPAN_NAME) ->
+    client_connect;
+span_name_to_config_key(?CLIENT_DISCONNECT_SPAN_NAME) ->
+    client_disconnect;
+span_name_to_config_key(?CLIENT_SUBSCRIBE_SPAN_NAME) ->
+    client_subscribe;
+span_name_to_config_key(?CLIENT_UNSUBSCRIBE_SPAN_NAME) ->
+    client_unsubscribe;
+span_name_to_config_key(?CLIENT_PUBLISH_SPAN_NAME) ->
+    client_publish.
+
+by_clientid(#{'client.clientid' := ClientId}) ->
     read_should_sample({?EMQX_OTEL_SAMPLE_CLIENTID, ClientId});
-sample_by_clientid(_) ->
+by_clientid(_) ->
     false.
 
-sample_by_message_from(#{'message.from' := ClientId}) ->
+by_message_from(#{'message.from' := ClientId}) ->
     read_should_sample({?EMQX_OTEL_SAMPLE_CLIENTID, ClientId});
-sample_by_message_from(_) ->
-    false.
-
-sample_by_username(#{'client.username' := Username}) ->
-    read_should_sample({?EMQX_OTEL_SAMPLE_USERNAME, Username});
-sample_by_username(_) ->
-    false.
-
-sample_by_topic_name(#{'message.topic' := TopicName}) ->
-    read_should_sample({?EMQX_OTEL_SAMPLE_TOPIC_NAME, TopicName});
-sample_by_topic_name(_) ->
+by_message_from(_) ->
     false.
 
-sample_by_topic_filter(#{'message.topic' := TopicName}) ->
+-dialyzer({nowarn_function, by_topic/1}).
+by_topic(#{'message.topic' := Topic}) ->
     case
-        mnesia:dirty_match_object(#?EMQX_OTEL_SAMPLER_RULE{
-            type = {?EMQX_OTEL_SAMPLE_TOPIC_MATCHING, '_'},
-            should_sample = '_',
-            extra = '_'
+        mnesia:dirty_match_object(#?EMQX_OTEL_SAMPLER{
+            type = {?EMQX_OTEL_SAMPLE_TOPIC, '_'},
+            _ = '_'
         })
     of
         [] ->
             false;
         Rules ->
             lists:any(
-                fun(Rule) -> match_topic_filter(TopicName, Rule) end, Rules
+                fun(Rule) -> match_topic_filter(Topic, Rule) end, Rules
             )
     end;
-sample_by_topic_filter(_) ->
+by_topic(_) ->
     false.
 
 read_should_sample(Key) ->
-    case mnesia:dirty_read(?EMQX_OTEL_SAMPLER_RULE, Key) of
+    case mnesia:dirty_read(?EMQX_OTEL_SAMPLER, Key) of
         [] -> false;
-        [#?EMQX_OTEL_SAMPLER_RULE{should_sample = ShouldSample}] -> ShouldSample
+        [#?EMQX_OTEL_SAMPLER{}] -> true
     end.
 
-match_topic_filter(TopicName, #?EMQX_OTEL_SAMPLER_RULE{
-    type = {?EMQX_OTEL_SAMPLE_TOPIC_MATCHING, TopicFilter},
-    should_sample = ShouldSample
-}) ->
-    emqx_topic:match(TopicName, TopicFilter) andalso ShouldSample.
+-dialyzer({nowarn_function, match_topic_filter/2}).
+match_topic_filter(AttrTopic, #?EMQX_OTEL_SAMPLER{type = {?EMQX_OTEL_SAMPLE_TOPIC, Topic}}) ->
+    emqx_topic:match(AttrTopic, Topic).
 
 -compile({inline, [parent_sampled/1]}).
 parent_sampled(#span_ctx{trace_flags = TraceFlags}) when

+ 203 - 0
apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler_api.erl

@@ -0,0 +1,203 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_otel_sampler_api).
+
+-behaviour(minirest_api).
+
+-include("emqx_otel_trace.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/1, array/1]).
+
+-export([
+    namespace/0,
+    api_spec/0,
+    paths/0,
+    schema/1,
+    fields/1
+]).
+
+%% operation functions
+-export([
+    whitelist/2
+]).
+
+-define(API_TAGS, [<<"Opentelemetry">>]).
+-define(BAD_REQUEST, 'BAD_REQUEST').
+-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
+
+-define(RESP_INTERNAL_ERROR(MSG), {500, #{code => ?INTERNAL_ERROR, message => MSG}}).
+
+namespace() -> undefined.
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/opentelemetry/whitelist/:type"
+    ].
+
+schema("/opentelemetry/whitelist/:type") ->
+    #{
+        'operationId' => whitelist,
+        get => #{
+            tags => ?API_TAGS,
+            description => ?DESC(sample_white_list_get),
+            parameters => [ref(white_list_type)],
+            responses =>
+                #{
+                    200 => swagger_with_example()
+                }
+        },
+        post => #{
+            tags => ?API_TAGS,
+            description => ?DESC(sample_white_list_post),
+            parameters => [ref(white_list_type)],
+            'requestBody' => swagger_with_example(),
+            responses =>
+                #{
+                    204 => <<"Created">>,
+                    500 => emqx_dashboard_swagger:error_codes(
+                        [?INTERNAL_ERROR], <<"Internal Service Error">>
+                    )
+                }
+        },
+        delete =>
+            #{
+                tags => ?API_TAGS,
+                description => ?DESC(sample_white_list_delete),
+                parameters => [ref(white_list_type)],
+                responses =>
+                    #{
+                        204 => <<"Deleted">>,
+                        500 => emqx_dashboard_swagger:error_codes(
+                            [?INTERNAL_ERROR], <<"Internal Service Error">>
+                        )
+                    }
+            }
+    }.
+
+fields(clientid) ->
+    [
+        {clientid,
+            mk(
+                binary(),
+                #{
+                    desc => ?DESC(clientid),
+                    example => <<"client1">>
+                }
+            )}
+    ];
+fields(topic) ->
+    [
+        {topic,
+            mk(
+                binary(),
+                #{
+                    desc => ?DESC(topic),
+                    example => <<"topic/#">>
+                }
+            )}
+    ];
+fields(white_list_type) ->
+    [
+        {type,
+            mk(
+                enum([clientid, topic]),
+                #{
+                    in => path,
+                    required => true,
+                    desc => ?DESC(white_list_type),
+                    example => <<"clientid">>
+                }
+            )}
+    ].
+%%--------------------------------------------------------------------
+%% HTTP API
+%%--------------------------------------------------------------------
+
+whitelist(get, #{bindings := #{type := Type}}) ->
+    Res = [
+        V
+     || #?EMQX_OTEL_SAMPLER{type = {_, V}} <- emqx_otel_sampler:get_rules(Type)
+    ],
+    {200, Res};
+whitelist(post, #{bindings := #{type := Type}, body := Body}) when is_list(Body) ->
+    case ensure_rules_is_valid(Type, Body) of
+        ok ->
+            %% Purge all old data and store new
+            ok = emqx_otel_sampler:purge_rules(Type),
+            ok = lists:foreach(fun(Val) -> store_rules(Type, Val) end, Body),
+            {204};
+        {error, too_many_rules} ->
+            {400, #{
+                code => ?BAD_REQUEST,
+                message =>
+                    binfmt(
+                        <<"The rules length for '~ts' exceeds the maximum limit.">>, [Type]
+                    )
+            }}
+    end;
+whitelist(delete, #{bindings := #{type := Type}}) ->
+    try emqx_otel_sampler:purge_rules(Type) of
+        ok -> {204}
+    catch
+        _:Error ->
+            ?RESP_INTERNAL_ERROR(emqx_utils:readable_error_msg(Error))
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal Helpers
+%%--------------------------------------------------------------------
+
+swagger_with_example() ->
+    emqx_dashboard_swagger:schema_with_examples(
+        array(binary()),
+        #{
+            clientid_white_list => #{
+                summary => <<"ClientId White list">>,
+                value => [<<"clientid">>, <<"clientid2">>]
+            },
+
+            topic_white_list => #{
+                summary => <<"Topic White list">>,
+                value => [<<"topic/#">>, <<"topic/2">>]
+            }
+        }
+    ).
+
+ensure_rules_is_valid(Type, Rules) ->
+    case length(Rules) =< rules_max(Type) of
+        true -> ok;
+        false -> {error, too_many_rules}
+    end.
+
+store_rules(clientid, ClientID) ->
+    emqx_otel_sampler:store_rule(clientid, ClientID);
+store_rules(topic, Topic) ->
+    emqx_otel_sampler:store_rule(topic, Topic).
+
+rules_max(clientid) -> get_rule_max(clientid_match_rules_max);
+rules_max(topic) -> get_rule_max(topic_match_rules_max).
+
+-define(RULE_MAX_CONFIG_KEY_PREFIX, [opentelemetry, traces, filter, e2e_tracing_options]).
+get_rule_max(Key) ->
+    emqx:get_config(?RULE_MAX_CONFIG_KEY_PREFIX ++ [Key]).
+
+binfmt(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).

+ 33 - 0
rel/i18n/emqx_otel_sampler_api.hocon

@@ -0,0 +1,33 @@
+emqx_otel_sampler_api {
+
+clientid.desc:
+"""Client ID"""
+clientid.label:
+"""Client ID"""
+
+topic.desc:
+"""TopicName or TopicFilter"""
+topic.label:
+"""Topic"""
+
+sample_white_list_get.desc:
+"""Show the sampler white list for the specified type"""
+sample_white_list_get.label:
+"""Show white list"""
+
+sample_white_list_post.desc:
+"""Update white list for specific type"""
+sample_white_list_post.label:
+"""Update white list"""
+
+sample_white_list_delete.desc:
+"""Purge white list for specific type"""
+sample_white_list_delete.label:
+"""Purge white list"""
+
+white_list_type.desc:
+"""Type of white list. Can be `clientid` or `topic`"""
+white_list_type.label:
+"""White list type"""
+
+}

+ 37 - 35
rel/i18n/emqx_otel_schema.hocon

@@ -55,52 +55,54 @@ Note: this config only takes effect when <code>trace_mode</code> is set to <code
 trace_all.label: "Trace All"
 
 trace_mode.desc:
-"""Opentelemetry tracing mode.<br/>
+"""OpenTelemetry tracing mode.<br/>
 - `legacy`: follow the old tracing method, only trace message publishing and delivery.<br/>
   Span Name will remain compatible with the old version.
 - `e2e`: end-to-end tracing mode. All kinds of client behaviors will be traced:<br/>
-  Connection/Disconnection/Subscription/Unsubscription/Message Publishing/Message delivery.<br/>
-  More control options and sampling functions are controlled by the `e2e_tracing_options` sub-configuration item<br/>"""
+  Connect/Disconnect/Subscribe/Unsubscribe/Message Publish/Message Delivery.<br/>
+  More control options and sampling functions are controlled by the `e2e_tracing_options` sub-configuration item"""
 trace_mode.label: "Trace Mode"
 
 e2e_tracing_options.desc: "End-to-end tracing options"
 e2e_tracing_options.label: "End-to-End Tracing Options"
 
-e2e_attribute_meta.desc:
-"""Simple attribute meta value added into Span's Attributes.</br/>
-Typically set a simple and easily recognizable name or use the cluster name to identify different EMQX clusters."""
-e2e_attribute_meta.label: "Attribute Meta Value"
+e2e_attribute_meta_value.desc:
+"""A meta value added into Span's Attribute. The attribute key will be <code>emqx.meta</code>.<br/>
+Typically, set a simple and easily recognizable name or use the cluster name to identify different EMQX clusters."""
+e2e_attribute_meta_value.label: "Attribute Meta Value"
 
-publish_response_trace_level.desc:
+## TODO: Rename
+mqtt_publish_trace_level.desc:
 """Trace level for all message exchanges during the message publishing process.<br/>
 Note: this config only takes effect when <code>sample</code> is set to <code>false</code>.
-- `0`: Only `PUBLISH` packets are sampled for all QoS level (both QoS0, QoS1, QoS2).
-- `1`: In addition to `PUBLISH` packets for all QoS. PUBACK and PUBREC are also sampled.
+- `basic`: Only `PUBLISH` packets are sampled for all QoS levels (QoS0, QoS1, and QoS2).
+- `first_ack`: In addition to `PUBLISH` packets for all QoS levels, `PUBACK` and `PUBREC` are also sampled.
    That is, the first response packet in the QoS1 or QoS2 message interaction.
-- `2`: Both `PUBLISH` packets and all response packets `PUBACK`, `PUBREC`, `PUBREL`, `PUBCOMP` will be sampled."""
-publish_response_trace_level.label: "Publish Trace Level"
-
-e2e_samplers.desc: """End-to-end Tracing Samplers."""
-e2e_samplers.label: "Samplers"
-
-whitelist_based_sampler.desc:
-"""Whitelist-based Sampler. All Root Spans matched the whitelist will be sampled."""
-whitelist_based_samplers.label: "Whitelist Based Sampler"
-
-event_based_samplers.desc:
-"""Event-based Samplers. The Spans that not matched the whitelist will be sampled based on the event type and the ratio setting."""
-event_based_samplers.label: "Event Based Samplers"
-
-event_type.desc:
-"""Tracing event type.<br/>
-- `"client.connect"`: Trace client Connect.<br/>
-- `"client.disconnect"`: Trace client Disconnect.<br/>
-- `"client.subscribe"`: Trace client Subscribe.<br/>
-- `"client.unsubscribe"`: Trace client Unsubscribe.<br/>
-- `"client.publish"`: Trace Client publishing messages. The message-delivering to subscribers will also be sampled as a part of the message publishing process."""
-event_type.label: "Event Type"
-
-ratio.desc: "Sampling ratio for the event type."
-ratio.label: "Event based Sampling Ratio"
+- `all`: Both `PUBLISH` packets and all response packets `PUBACK`, `PUBREC`, `PUBREL`, `PUBCOMP` will be sampled."""
+mqtt_publish_trace_level.label: "Publish Trace Level"
+
+client_connect.desc: """In addition to the given rules, whether to sample Client Connect event."""
+client_connect.label: "Client Connect"
+
+client_disconnect.desc: """In addition to the given rules, whether to sample Client Disconnect event."""
+client_disconnect.label: "Client Disconnect"
+
+client_subscribe.desc: """In addition to the given rules, whether to sample Client Subscribe event."""
+client_subscribe.label: "Client Subscribe"
+
+client_unsubscribe.desc: """In addition to the given rules, whether to sample Client Unsubscribe event."""
+client_unsubscribe.label: "Client Unsubscribe"
+
+client_publish.desc: """In addition to the given rules, whether to sample Client Publish event."""
+client_publish.label: "Client Publish"
+
+clientid_match_rules_max.desc: """Maximum length of the rule list based on clientid matching."""
+clientid_match_rules_max.label: "Client ID Match Rules Max"
+
+topic_match_rules_max.desc: """Maximum length of the rule list based on topic matching."""
+topic_match_rules_max.label: "Topic Match Rules Max"
+
+sample_ratio.desc: "Sampling ratio for the event types."
+sample_ratio.label: "Sampling Ratio"
 
 }

+ 2 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -317,3 +317,5 @@ aliyun
 OID
 PKCE
 Datalayers
+OpenTelemetry
+opentelemetry