Просмотр исходного кода

Merge pull request #11984 from SergeTupchiy/EMQX-10535-openetelmetry-tracing-new

Integrate OpenTelmetry tracing
SergeTupchiy 2 лет назад
Родитель
Сommit
28ff53e99c
33 измененных файлов с 1715 добавлено и 179 удалено
  1. 69 0
      .ci/docker-compose-file/docker-compose-otel.yaml
  2. 6 0
      .ci/docker-compose-file/otel/.gitignore
  3. 52 0
      .ci/docker-compose-file/otel/otel-collector-config-tls.yaml
  4. 51 0
      .ci/docker-compose-file/otel/otel-collector-config.yaml
  5. 18 3
      apps/emqx/src/emqx_channel.erl
  6. 7 2
      apps/emqx/src/emqx_connection.erl
  7. 117 0
      apps/emqx/src/emqx_external_trace.erl
  8. 2 0
      apps/emqx/src/emqx_frame.erl
  9. 9 3
      apps/emqx/src/emqx_message.erl
  10. 7 1
      apps/emqx/src/emqx_packet.erl
  11. 2 2
      apps/emqx/test/emqx_message_SUITE.erl
  12. 1 6
      apps/emqx_machine/priv/reboot_lists.eterm
  13. 1 0
      apps/emqx_machine/src/emqx_machine.erl
  14. 1 3
      apps/emqx_machine/src/emqx_machine_boot.erl
  15. 1 0
      apps/emqx_opentelemetry/docker-ct
  16. 11 3
      apps/emqx_opentelemetry/rebar.config
  17. 6 1
      apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src
  18. 8 13
      apps/emqx_opentelemetry/src/emqx_otel_api.erl
  19. 12 3
      apps/emqx_opentelemetry/src/emqx_otel_app.erl
  20. 38 37
      apps/emqx_opentelemetry/src/emqx_otel_config.erl
  21. 8 9
      apps/emqx_opentelemetry/src/emqx_otel_metrics.erl
  22. 108 47
      apps/emqx_opentelemetry/src/emqx_otel_schema.erl
  23. 3 3
      apps/emqx_opentelemetry/src/emqx_otel_sup.erl
  24. 272 0
      apps/emqx_opentelemetry/src/emqx_otel_trace.erl
  25. 252 0
      apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl
  26. 201 0
      apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl
  27. 431 0
      apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl
  28. 2 2
      apps/emqx_utils/include/emqx_message.hrl
  29. 1 0
      changes/ce/feat-11984.en.md
  30. 1 31
      mix.exs
  31. 0 8
      rebar.config
  32. 14 2
      rel/i18n/emqx_otel_schema.hocon
  33. 3 0
      scripts/ct/run.sh

+ 69 - 0
.ci/docker-compose-file/docker-compose-otel.yaml

@@ -0,0 +1,69 @@
+version: '3.9'
+
+services:
+  jaeger-all-in-one:
+    image: jaegertracing/all-in-one:1.51.0
+    container_name: jaeger.emqx.net
+    hostname: jaeger.emqx.net
+    networks:
+      - emqx_bridge
+    restart: always
+#    ports:
+#      - "16686:16686"
+    user: "${DOCKER_USER:-root}"
+
+  # Collector
+  otel-collector:
+    image: otel/opentelemetry-collector:0.90.0
+    container_name: otel-collector.emqx.net
+    hostname: otel-collector.emqx.net
+    networks:
+      - emqx_bridge
+    restart: always
+    command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
+    volumes:
+      - ./otel:/etc/
+#    ports:
+#      - "1888:1888"   # pprof extension
+#      - "8888:8888"   # Prometheus metrics exposed by the collector
+#      - "8889:8889"   # Prometheus exporter metrics
+#      - "13133:13133" # health_check extension
+#      - "4317:4317"   # OTLP gRPC receiver
+#      - "4318:4318"   # OTLP http receiver
+#      - "55679:55679" # zpages extension
+    depends_on:
+      - jaeger-all-in-one
+    user: "${DOCKER_USER:-root}"
+
+
+# Collector
+  otel-collector-tls:
+    image: otel/opentelemetry-collector:0.90.0
+    container_name: otel-collector-tls.emqx.net
+    hostname: otel-collector-tls.emqx.net
+    networks:
+      - emqx_bridge
+    restart: always
+    command: ["--config=/etc/otel-collector-config-tls.yaml", "${OTELCOL_ARGS}"]
+    volumes:
+      - ./otel:/etc/
+      - ./certs:/etc/certs
+ #   ports:
+ #     - "14317:4317"   # OTLP gRPC receiver
+    depends_on:
+      - jaeger-all-in-one
+    user: "${DOCKER_USER:-root}"
+
+#networks:
+#  emqx_bridge:
+#    driver: bridge
+#    name: emqx_bridge
+#    enable_ipv6: true
+#    ipam:
+#      driver: default
+#      config:
+#        - subnet: 172.100.239.0/24
+#          gateway: 172.100.239.1
+#        - subnet: 2001:3200:3200::/64
+#          gateway: 2001:3200:3200::1
+#

+ 6 - 0
.ci/docker-compose-file/otel/.gitignore

@@ -0,0 +1,6 @@
+certs
+hostname
+hosts
+otel-collector.json
+otel-collector-tls.json
+resolv.conf

+ 52 - 0
.ci/docker-compose-file/otel/otel-collector-config-tls.yaml

@@ -0,0 +1,52 @@
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        tls:
+          ca_file: /etc/certs/ca.crt
+          cert_file: /etc/certs/server.crt
+          key_file: /etc/certs/server.key
+      http:
+        tls:
+          ca_file: /etc/certs/ca.crt
+          cert_file: /etc/certs/server.crt
+          key_file: /etc/certs/server.key
+
+exporters:
+  logging:
+    verbosity: detailed
+  otlp:
+    endpoint: jaeger.emqx.net:4317
+    tls:
+      insecure: true
+  debug:
+    verbosity: detailed
+  file:
+    path: /etc/otel-collector-tls.json
+
+
+processors:
+  batch:
+    # send data immediately
+    timeout: 0
+
+extensions:
+  health_check:
+  zpages:
+    endpoint: :55679
+
+service:
+  extensions: [zpages, health_check]
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging, otlp]
+    metrics:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging]
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging, file]

+ 51 - 0
.ci/docker-compose-file/otel/otel-collector-config.yaml

@@ -0,0 +1,51 @@
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        tls:
+#          ca_file: /etc/ca.pem
+#          cert_file: /etc/server.pem
+#          key_file: /etc/server.key
+      http:
+        tls:
+#          ca_file: /etc/ca.pem
+#          cert_file: /etc/server.pem
+#          key_file: /etc/server.key
+
+exporters:
+  logging:
+    verbosity: detailed
+  otlp:
+    endpoint: jaeger.emqx.net:4317
+    tls:
+      insecure: true
+  debug:
+    verbosity: detailed
+  file:
+    path: /etc/otel-collector.json
+
+processors:
+  batch:
+    # send data immediately
+    timeout: 0
+
+extensions:
+  health_check:
+  zpages:
+    endpoint: :55679
+
+service:
+  extensions: [zpages, health_check]
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging, otlp]
+    metrics:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging]
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging, file]

+ 18 - 3
apps/emqx/src/emqx_channel.erl

@@ -398,8 +398,15 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
 handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
     case emqx_packet:check(Packet) of
-        ok -> process_publish(Packet, Channel);
-        {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel)
+        ok ->
+            emqx_external_trace:trace_process_publish(
+                Packet,
+                %% More info can be added in future, but for now only clientid is used
+                trace_info(Channel),
+                fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
+            );
+        {error, ReasonCode} ->
+            handle_out(disconnect, ReasonCode, Channel)
     end;
 handle_in(
     ?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
@@ -921,7 +928,11 @@ handle_deliver(
     Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
     NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
     {ok, Channel#channel{session = NSession}};
-handle_deliver(
+handle_deliver(Delivers, Channel) ->
+    Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
+    do_handle_deliver(Delivers1, Channel).
+
+do_handle_deliver(
     Delivers,
     Channel = #channel{
         session = Session,
@@ -1429,6 +1440,10 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
     emqx_olp:backoff(Zone),
     ok.
 
+trace_info(Channel) ->
+    %% More info can be added in future, but for now only clientid is used
+    maps:from_list(info([clientid], Channel)).
+
 %%--------------------------------------------------------------------
 %% Enrich MQTT Connect Info
 

+ 7 - 2
apps/emqx/src/emqx_connection.erl

@@ -855,9 +855,14 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 
-handle_outgoing(Packets, State) when is_list(Packets) ->
+handle_outgoing(Packets, State) ->
+    Res = do_handle_outgoing(Packets, State),
+    emqx_external_trace:end_trace_send(Packets),
+    Res.
+
+do_handle_outgoing(Packets, State) when is_list(Packets) ->
     send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
-handle_outgoing(Packet, State) ->
+do_handle_outgoing(Packet, State) ->
     send((serialize_and_inc_stats_fun(State))(Packet), State).
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->

+ 117 - 0
apps/emqx/src/emqx_external_trace.erl

@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_external_trace).
+
+-callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    ChannelInfo :: channel_info(),
+    Res :: term().
+
+-callback start_trace_send(list(emqx_types:deliver()), channel_info()) ->
+    list(emqx_types:deliver()).
+
+-callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
+
+-callback event(EventName :: term(), Attributes :: term()) -> ok.
+
+-type channel_info() :: #{atom() => _}.
+
+-export([
+    provider/0,
+    register_provider/1,
+    unregister_provider/1,
+    trace_process_publish/3,
+    start_trace_send/2,
+    end_trace_send/1,
+    event/1,
+    event/2
+]).
+
+-export_type([channel_info/0]).
+
+-define(PROVIDER, {?MODULE, trace_provider}).
+
+-define(with_provider(IfRegistered, IfNotRegistered),
+    case persistent_term:get(?PROVIDER, undefined) of
+        undefined ->
+            IfNotRegistered;
+        Provider ->
+            Provider:IfRegistered
+    end
+).
+
+%%--------------------------------------------------------------------
+%% provider API
+%%--------------------------------------------------------------------
+
+-spec register_provider(module()) -> ok | {error, term()}.
+register_provider(Module) when is_atom(Module) ->
+    case is_valid_provider(Module) of
+        true ->
+            persistent_term:put(?PROVIDER, Module);
+        false ->
+            {error, invalid_provider}
+    end.
+
+-spec unregister_provider(module()) -> ok | {error, term()}.
+unregister_provider(Module) ->
+    case persistent_term:get(?PROVIDER, undefined) of
+        Module ->
+            persistent_term:erase(?PROVIDER),
+            ok;
+        _ ->
+            {error, not_registered}
+    end.
+
+-spec provider() -> module() | undefined.
+provider() ->
+    persistent_term:get(?PROVIDER, undefined).
+%%--------------------------------------------------------------------
+%% trace API
+%%--------------------------------------------------------------------
+
+-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    ChannelInfo :: channel_info(),
+    Res :: term().
+trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, ChannelInfo, ProcessFun), ProcessFun(Packet)).
+
+-spec start_trace_send(list(emqx_types:deliver()), channel_info()) ->
+    list(emqx_types:deliver()).
+start_trace_send(Delivers, ChannelInfo) ->
+    ?with_provider(?FUNCTION_NAME(Delivers, ChannelInfo), Delivers).
+
+-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
+end_trace_send(Packets) ->
+    ?with_provider(?FUNCTION_NAME(Packets), ok).
+
+event(Name) ->
+    event(Name, #{}).
+
+-spec event(term(), term()) -> ok.
+event(Name, Attributes) ->
+    ?with_provider(?FUNCTION_NAME(Name, Attributes), ok).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+is_valid_provider(Module) ->
+    lists:all(
+        fun({F, A}) -> erlang:function_exported(Module, F, A) end,
+        ?MODULE:behaviour_info(callbacks)
+    ).

+ 2 - 0
apps/emqx/src/emqx_frame.erl

@@ -960,6 +960,8 @@ serialize_properties(Props) when is_map(Props) ->
 
 serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined ->
     <<>>;
+serialize_property(internal_extra, _) ->
+    <<>>;
 serialize_property('Payload-Format-Indicator', Val) ->
     <<16#01, Val>>;
 serialize_property('Message-Expiry-Interval', Val) ->

+ 9 - 3
apps/emqx/src/emqx_message.erl

@@ -311,7 +311,8 @@ to_packet(
         qos = QoS,
         headers = Headers,
         topic = Topic,
-        payload = Payload
+        payload = Payload,
+        extra = Extra
     }
 ) ->
     #mqtt_packet{
@@ -324,8 +325,8 @@ to_packet(
         variable = #mqtt_packet_publish{
             topic_name = Topic,
             packet_id = PacketId,
-            properties = filter_pub_props(
-                maps:get(properties, Headers, #{})
+            properties = maybe_put_extra(
+                Extra, filter_pub_props(maps:get(properties, Headers, #{}))
             )
         },
         payload = Payload
@@ -345,6 +346,11 @@ filter_pub_props(Props) ->
         Props
     ).
 
+maybe_put_extra(Extra, Props) when map_size(Extra) > 0 ->
+    Props#{internal_extra => Extra};
+maybe_put_extra(_Extra, Props) ->
+    Props.
+
 %% @doc Message to map
 -spec to_map(emqx_types:message()) -> message_map().
 to_map(#message{

+ 7 - 1
apps/emqx/src/emqx_packet.erl

@@ -452,9 +452,15 @@ to_message(
     Headers
 ) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
+    {Extra, Props1} =
+        case maps:take(internal_extra, Props) of
+            error -> {#{}, Props};
+            ExtraProps -> ExtraProps
+        end,
     Msg#message{
         flags = #{dup => Dup, retain => Retain},
-        headers = Headers#{properties => Props}
+        headers = Headers#{properties => Props1},
+        extra = Extra
     }.
 
 -spec will_msg(#mqtt_packet_connect{}) -> emqx_types:message().

+ 2 - 2
apps/emqx/test/emqx_message_SUITE.erl

@@ -207,7 +207,7 @@ t_to_map(_) ->
         {topic, <<"topic">>},
         {payload, <<"payload">>},
         {timestamp, emqx_message:timestamp(Msg)},
-        {extra, []}
+        {extra, #{}}
     ],
     ?assertEqual(List, emqx_message:to_list(Msg)),
     ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
@@ -223,7 +223,7 @@ t_from_map(_) ->
         topic => <<"topic">>,
         payload => <<"payload">>,
         timestamp => emqx_message:timestamp(Msg),
-        extra => []
+        extra => #{}
     },
     ?assertEqual(Map, emqx_message:to_map(Msg)),
     ?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).

+ 1 - 6
apps/emqx_machine/priv/reboot_lists.eterm

@@ -25,12 +25,7 @@
             redbug,
             xmerl,
             {hocon, load},
-            telemetry,
-            {opentelemetry, load},
-            {opentelemetry_api, load},
-            {opentelemetry_experimental, load},
-            {opentelemetry_api_experimental, load},
-            {opentelemetry_exporter, load}
+            telemetry
         ],
     %% must always be of type `load'
     common_business_apps =>

+ 1 - 0
apps/emqx_machine/src/emqx_machine.erl

@@ -50,6 +50,7 @@ start() ->
     start_sysmon(),
     configure_shard_transports(),
     set_mnesia_extra_diagnostic_checks(),
+    emqx_otel_app:configure_otel_deps(),
     ekka:start(),
     ok.
 

+ 1 - 3
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -69,9 +69,7 @@ stop_apps() ->
     ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
     _ = emqx_alarm_handler:unload(),
     ok = emqx_conf_app:unset_config_loaded(),
-    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
-    %% Mute otel deps application.
-    ok = emqx_otel_app:stop_deps().
+    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
 
 %% Those port apps are terminated after the main apps
 %% Don't need to stop when reboot.

+ 1 - 0
apps/emqx_opentelemetry/docker-ct

@@ -0,0 +1 @@
+otel

+ 11 - 3
apps/emqx_opentelemetry/rebar.config

@@ -1,8 +1,16 @@
 %% -*- mode: erlang -*-
 
-{deps, [
-    {emqx, {path, "../emqx"}}
-]}.
+{deps,
+    [{emqx, {path, "../emqx"}}
+    %% trace
+    , {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_api"}}
+    , {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry"}}
+    %% logs, metrics
+    , {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_experimental"}}
+    , {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_api_experimental"}}
+    %% export
+    , {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.6-emqx"}, "apps/opentelemetry_exporter"}}
+    ]}.
 
 {edoc_opts, [{preprocess, true}]}.
 {erl_opts, [

+ 6 - 1
apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src

@@ -8,7 +8,12 @@
         stdlib,
         emqx,
         %% otel metrics depend on emqx_mgmt_cache
-        emqx_management
+        emqx_management,
+        opentelemetry_exporter,
+        opentelemetry,
+        opentelemetry_experimental,
+        opentelemetry_api,
+        opentelemetry_api_experimental
     ]},
     {env, []},
     {modules, []},

+ 8 - 13
apps/emqx_opentelemetry/src/emqx_otel_api.erl

@@ -103,24 +103,19 @@ otel_config_schema() ->
 
 otel_config_example() ->
     #{
+        exporter => #{
+            endpoint => "http://localhost:4317",
+            ssl_options => #{}
+        },
         logs => #{
             enable => true,
-            exporter => #{
-                endpoint => "http://localhost:4317",
-                ssl_options => #{
-                    enable => false
-                }
-            },
             level => warning
         },
         metrics => #{
+            enable => true
+        },
+        traces => #{
             enable => true,
-            exporter => #{
-                endpoint => "http://localhost:4317",
-                interval => "10s",
-                ssl_options => #{
-                    enable => false
-                }
-            }
+            filter => #{trace_all => false}
         }
     }.

+ 12 - 3
apps/emqx_opentelemetry/src/emqx_otel_app.erl

@@ -19,17 +19,26 @@
 -behaviour(application).
 
 -export([start/2, stop/1]).
--export([stop_deps/0]).
+-export([configure_otel_deps/0]).
 
 start(_StartType, _StartArgs) ->
     emqx_otel_config:add_handler(),
     ok = emqx_otel_config:add_otel_log_handler(),
+    ok = emqx_otel_trace:ensure_traces(emqx:get_config([opentelemetry])),
     emqx_otel_sup:start_link().
 
 stop(_State) ->
     emqx_otel_config:remove_handler(),
+    _ = emqx_otel_trace:stop(),
     _ = emqx_otel_config:remove_otel_log_handler(),
     ok.
 
-stop_deps() ->
-    emqx_otel_config:stop_all_otel_apps().
+configure_otel_deps() ->
+    %% default tracer and metrics are started only on demand
+    ok = application:set_env(
+        [
+            {opentelemetry, [{start_default_tracer, false}]},
+            {opentelemetry_experimental, [{start_default_metrics, false}]}
+        ],
+        [{persistent, true}]
+    ).

+ 38 - 37
apps/emqx_opentelemetry/src/emqx_otel_config.erl

@@ -27,7 +27,6 @@
 -export([post_config_update/5]).
 -export([update/1]).
 -export([add_otel_log_handler/0, remove_otel_log_handler/0]).
--export([stop_all_otel_apps/0]).
 -export([otel_exporter/1]).
 
 update(Config) ->
@@ -54,27 +53,20 @@ remove_handler() ->
 
 post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) ->
     ok;
-post_config_update(?OPTL, _Req, New, _Old, AppEnvs) ->
+post_config_update(?OPTL, _Req, New, Old, AppEnvs) ->
     application:set_env(AppEnvs),
-    MetricsRes = ensure_otel_metrics(New),
-    LogsRes = ensure_otel_logs(New),
-    _ = maybe_stop_all_otel_apps(New),
-    case {MetricsRes, LogsRes} of
-        {ok, ok} -> ok;
+    MetricsRes = ensure_otel_metrics(New, Old),
+    LogsRes = ensure_otel_logs(New, Old),
+    TracesRes = ensure_otel_traces(New, Old),
+    case {MetricsRes, LogsRes, TracesRes} of
+        {ok, ok, ok} -> ok;
         Other -> {error, Other}
     end;
 post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
     ok.
 
-stop_all_otel_apps() ->
-    _ = application:stop(opentelemetry),
-    _ = application:stop(opentelemetry_experimental),
-    _ = application:stop(opentelemetry_experimental_api),
-    _ = application:stop(opentelemetry_exporter),
-    ok.
-
 add_otel_log_handler() ->
-    ensure_otel_logs(emqx:get_config(?OPTL)).
+    ensure_otel_logs(emqx:get_config(?OPTL), #{}).
 
 remove_otel_log_handler() ->
     remove_handler_if_present(?OTEL_LOG_HANDLER_ID).
@@ -93,23 +85,43 @@ otel_exporter(ExporterConf) ->
 
 %% Internal functions
 
-ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}) ->
+ensure_otel_metrics(
+    #{metrics := MetricsConf, exporter := Exporter},
+    #{metrics := MetricsConf, exporter := Exporter}
+) ->
+    ok;
+ensure_otel_metrics(#{metrics := #{enable := true}} = Conf, _Old) ->
     _ = emqx_otel_metrics:stop_otel(),
-    emqx_otel_metrics:start_otel(MetricsConf);
-ensure_otel_metrics(#{metrics := #{enable := false}}) ->
+    emqx_otel_metrics:start_otel(Conf);
+ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) ->
     emqx_otel_metrics:stop_otel();
-ensure_otel_metrics(_) ->
+ensure_otel_metrics(_, _) ->
     ok.
 
-ensure_otel_logs(#{logs := #{enable := true} = LogsConf}) ->
+ensure_otel_logs(
+    #{logs := LogsConf, exporter := Exporter},
+    #{logs := LogsConf, exporter := Exporter}
+) ->
+    ok;
+ensure_otel_logs(#{logs := #{enable := true}} = Conf, _OldConf) ->
     ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID),
-    ok = ensure_log_apps(),
-    HandlerConf = tr_handler_conf(LogsConf),
+    HandlerConf = tr_handler_conf(Conf),
     %% NOTE: should primary logger level be updated if it's higher than otel log level?
     logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf);
-ensure_otel_logs(#{logs := #{enable := false}}) ->
+ensure_otel_logs(#{logs := #{enable := false}}, _OldConf) ->
     remove_handler_if_present(?OTEL_LOG_HANDLER_ID).
 
+ensure_otel_traces(
+    #{traces := TracesConf, exporter := Exporter},
+    #{traces := TracesConf, exporter := Exporter}
+) ->
+    ok;
+ensure_otel_traces(#{traces := #{enable := true}} = Conf, _OldConf) ->
+    _ = emqx_otel_trace:stop(),
+    emqx_otel_trace:start(Conf);
+ensure_otel_traces(#{traces := #{enable := false}}, _OldConf) ->
+    emqx_otel_trace:stop().
+
 remove_handler_if_present(HandlerId) ->
     case logger:get_handler_config(HandlerId) of
         {ok, _} ->
@@ -118,24 +130,13 @@ remove_handler_if_present(HandlerId) ->
             ok
     end.
 
-ensure_log_apps() ->
-    {ok, _} = application:ensure_all_started(opentelemetry_exporter),
-    {ok, _} = application:ensure_all_started(opentelemetry_experimental),
-    ok.
-
-maybe_stop_all_otel_apps(#{metrics := #{enable := false}, logs := #{enable := false}}) ->
-    stop_all_otel_apps();
-maybe_stop_all_otel_apps(_) ->
-    ok.
-
-tr_handler_conf(Conf) ->
+tr_handler_conf(#{logs := LogsConf, exporter := ExporterConf}) ->
     #{
         level := Level,
         max_queue_size := MaxQueueSize,
         exporting_timeout := ExportingTimeout,
-        scheduled_delay := ScheduledDelay,
-        exporter := ExporterConf
-    } = Conf,
+        scheduled_delay := ScheduledDelay
+    } = LogsConf,
     #{
         level => Level,
         config => #{

+ 8 - 9
apps/emqx_opentelemetry/src/emqx_otel_metrics.erl

@@ -65,7 +65,7 @@ handle_info(_Msg, State) ->
 terminate(_Reason, _State) ->
     ok.
 
-setup(Conf = #{enable := true}) ->
+setup(Conf = #{metrics := #{enable := true}}) ->
     ensure_apps(Conf),
     create_metric_views();
 setup(_Conf) ->
@@ -73,11 +73,10 @@ setup(_Conf) ->
     ok.
 
 ensure_apps(Conf) ->
-    #{exporter := #{interval := ExporterInterval} = Exporter} = Conf,
-    {ok, _} = application:ensure_all_started(opentelemetry_exporter),
-    {ok, _} = application:ensure_all_started(opentelemetry),
-    {ok, _} = application:ensure_all_started(opentelemetry_experimental),
-    {ok, _} = application:ensure_all_started(opentelemetry_api_experimental),
+    #{
+        exporter := Exporter,
+        metrics := #{interval := ExporterInterval}
+    } = Conf,
 
     _ = opentelemetry_experimental:stop_default_metrics(),
     ok = application:set_env(
@@ -102,12 +101,12 @@ cleanup() ->
 
 safe_stop_default_metrics() ->
     try
-        _ = opentelemetry_experimental:stop_default_metrics()
+        _ = opentelemetry_experimental:stop_default_metrics(),
+        ok
     catch
         %% noramal scenario, metrics supervisor is not started
         exit:{noproc, _} -> ok
-    end,
-    ok.
+    end.
 
 create_metric_views() ->
     Meter = opentelemetry_experimental:get_meter(),

+ 108 - 47
apps/emqx_opentelemetry/src/emqx_otel_schema.erl

@@ -30,15 +30,27 @@
 upgrade_legacy_metrics(RawConf) ->
     case RawConf of
         #{<<"opentelemetry">> := Otel} ->
-            LegacyMetricsFields = [<<"enable">>, <<"exporter">>],
-            Otel1 = maps:without(LegacyMetricsFields, Otel),
-            Metrics = maps:with(LegacyMetricsFields, Otel),
-            case Metrics =:= #{} of
-                true ->
-                    RawConf;
-                false ->
-                    RawConf#{<<"opentelemetry">> => Otel1#{<<"metrics">> => Metrics}}
-            end;
+            Otel1 =
+                case maps:take(<<"enable">>, Otel) of
+                    {MetricsEnable, OtelConf} ->
+                        emqx_utils_maps:deep_put(
+                            [<<"metrics">>, <<"enable">>], OtelConf, MetricsEnable
+                        );
+                    error ->
+                        Otel
+                end,
+            Otel2 =
+                case Otel1 of
+                    #{<<"exporter">> := #{<<"interval">> := Interval} = Exporter} ->
+                        emqx_utils_maps:deep_put(
+                            [<<"metrics">>, <<"interval">>],
+                            Otel1#{<<"exporter">> => maps:remove(<<"interval">>, Exporter)},
+                            Interval
+                        );
+                    _ ->
+                        Otel1
+                end,
+            RawConf#{<<"opentelemetry">> => Otel2};
         _ ->
             RawConf
     end.
@@ -62,6 +74,20 @@ fields("opentelemetry") ->
                 #{
                     desc => ?DESC(otel_logs)
                 }
+            )},
+        {traces,
+            ?HOCON(
+                ?R_REF("otel_traces"),
+                #{
+                    desc => ?DESC(otel_traces)
+                }
+            )},
+        {exporter,
+            ?HOCON(
+                ?R_REF("otel_exporter"),
+                #{
+                    desc => ?DESC(otel_exporter)
+                }
             )}
     ];
 fields("otel_metrics") ->
@@ -75,10 +101,15 @@ fields("otel_metrics") ->
                     desc => ?DESC(enable)
                 }
             )},
-        {exporter,
+        {interval,
             ?HOCON(
-                ?R_REF("otel_metrics_exporter"),
-                #{desc => ?DESC(exporter)}
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    aliases => [scheduled_delay],
+                    default => <<"10s">>,
+                    desc => ?DESC(scheduled_delay),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )}
     ];
 fields("otel_logs") ->
@@ -127,34 +158,56 @@ fields("otel_logs") ->
                     desc => ?DESC(scheduled_delay),
                     importance => ?IMPORTANCE_HIDDEN
                 }
-            )},
-        {exporter,
+            )}
+    ];
+fields("otel_traces") ->
+    [
+        {enable,
             ?HOCON(
-                ?R_REF("otel_logs_exporter"),
+                boolean(),
                 #{
-                    desc => ?DESC(exporter),
+                    default => false,
+                    desc => ?DESC(enable),
                     importance => ?IMPORTANCE_HIGH
                 }
+            )},
+        {max_queue_size,
+            ?HOCON(
+                pos_integer(),
+                #{
+                    default => 2048,
+                    desc => ?DESC(max_queue_size),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {exporting_timeout,
+            ?HOCON(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"30s">>,
+                    desc => ?DESC(exporting_timeout),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {scheduled_delay,
+            ?HOCON(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"5s">>,
+                    desc => ?DESC(scheduled_delay),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {filter,
+            ?HOCON(
+                ?R_REF("trace_filter"),
+                #{
+                    desc => ?DESC(trace_filter),
+                    importance => ?IMPORTANCE_MEDIUM
+                }
             )}
     ];
-fields("otel_metrics_exporter") ->
-    exporter_fields(metrics);
-fields("otel_logs_exporter") ->
-    exporter_fields(logs);
-fields("ssl_opts") ->
-    Schema = emqx_schema:client_ssl_opts_schema(#{}),
-    lists:keydelete("enable", 1, Schema).
-
-desc("opentelemetry") -> ?DESC(opentelemetry);
-desc("exporter") -> ?DESC(exporter);
-desc("otel_logs_exporter") -> ?DESC(exporter);
-desc("otel_metrics_exporter") -> ?DESC(exporter);
-desc("otel_logs") -> ?DESC(otel_logs);
-desc("otel_metrics") -> ?DESC(otel_metrics);
-desc("ssl_opts") -> ?DESC(exporter_ssl);
-desc(_) -> undefined.
-
-exporter_fields(OtelSignal) ->
+fields("otel_exporter") ->
     [
         {endpoint,
             ?HOCON(
@@ -183,21 +236,29 @@ exporter_fields(OtelSignal) ->
                     importance => ?IMPORTANCE_LOW
                 }
             )}
-    ] ++ exporter_extra_fields(OtelSignal).
-
-%% Let's keep it in exporter config for metrics, as it is different from
-%% scheduled_delay_ms opt used for otel traces and logs
-exporter_extra_fields(metrics) ->
+    ];
+fields("ssl_opts") ->
+    Schema = emqx_schema:client_ssl_opts_schema(#{}),
+    lists:keydelete("enable", 1, Schema);
+fields("trace_filter") ->
+    %% More filters can be implemented in future, e.g. topic, clientid
     [
-        {interval,
+        {trace_all,
             ?HOCON(
-                emqx_schema:timeout_duration_ms(),
+                boolean(),
                 #{
-                    default => <<"10s">>,
-                    required => true,
-                    desc => ?DESC(scheduled_delay)
+                    default => false,
+                    desc => ?DESC(trace_all),
+                    importance => ?IMPORTANCE_MEDIUM
                 }
             )}
-    ];
-exporter_extra_fields(_OtelSignal) ->
-    [].
+    ].
+
+desc("opentelemetry") -> ?DESC(opentelemetry);
+desc("otel_exporter") -> ?DESC(otel_exporter);
+desc("otel_logs") -> ?DESC(otel_logs);
+desc("otel_metrics") -> ?DESC(otel_metrics);
+desc("otel_traces") -> ?DESC(otel_traces);
+desc("ssl_opts") -> ?DESC(exporter_ssl);
+desc("trace_filter") -> ?DESC(trace_filter);
+desc(_) -> undefined.

+ 3 - 3
apps/emqx_opentelemetry/src/emqx_otel_sup.erl

@@ -41,8 +41,8 @@ init([]) ->
         period => 512
     },
     Children =
-        case emqx_conf:get([opentelemetry, metrics]) of
-            #{enable := false} -> [];
-            #{enable := true} = Conf -> [worker_spec(emqx_otel_metrics, Conf)]
+        case emqx_conf:get([opentelemetry]) of
+            #{metrics := #{enable := false}} -> [];
+            #{metrics := #{enable := true}} = Conf -> [worker_spec(emqx_otel_metrics, Conf)]
         end,
     {ok, {SupFlags, Children}}.

+ 272 - 0
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -0,0 +1,272 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_otel_trace).
+
+-behaviour(emqx_external_trace).
+
+-export([
+    ensure_traces/1,
+    start/1,
+    stop/0
+]).
+
+-export([toggle_registered/1]).
+
+-export([
+    trace_process_publish/3,
+    start_trace_send/2,
+    end_trace_send/1,
+    event/2
+]).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("opentelemetry_api/include/otel_tracer.hrl").
+
+-define(EMQX_OTEL_CTX, otel_ctx).
+-define(IS_ENABLED, emqx_enable).
+-define(USER_PROPERTY, 'User-Property').
+
+-define(TRACE_ALL_KEY, {?MODULE, trace_all}).
+-define(TRACE_ALL, persistent_term:get(?TRACE_ALL_KEY, false)).
+
+%%--------------------------------------------------------------------
+%% config
+%%--------------------------------------------------------------------
+
+-spec toggle_registered(boolean()) -> ok | {error, term()}.
+toggle_registered(true = _Enable) ->
+    emqx_external_trace:register_provider(?MODULE);
+toggle_registered(false = _Enable) ->
+    _ = emqx_external_trace:unregister_provider(?MODULE),
+    ok.
+
+-spec ensure_traces(map()) -> ok | {error, term()}.
+ensure_traces(#{traces := #{enable := true}} = Conf) ->
+    start(Conf);
+ensure_traces(_Conf) ->
+    ok.
+
+-spec start(map()) -> ok | {error, term()}.
+start(#{traces := TracesConf, exporter := ExporterConf}) ->
+    #{
+        max_queue_size := MaxQueueSize,
+        exporting_timeout := ExportingTimeout,
+        scheduled_delay := ScheduledDelay,
+        filter := #{trace_all := TraceAll}
+    } = TracesConf,
+    OtelEnv = [
+        {bsp_scheduled_delay_ms, ScheduledDelay},
+        {bsp_exporting_timeout_ms, ExportingTimeout},
+        {bsp_max_queue_size, MaxQueueSize},
+        {traces_exporter, emqx_otel_config:otel_exporter(ExporterConf)}
+    ],
+    set_trace_all(TraceAll),
+    ok = application:set_env([{opentelemetry, OtelEnv}]),
+    Res = assert_started(opentelemetry:start_default_tracer_provider()),
+    case Res of
+        ok ->
+            _ = toggle_registered(true),
+            Res;
+        Err ->
+            Err
+    end.
+
+-spec stop() -> ok.
+stop() ->
+    _ = toggle_registered(false),
+    safe_stop_default_tracer().
+
+%%--------------------------------------------------------------------
+%% trace API
+%%--------------------------------------------------------------------
+
+-spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    ChannelInfo :: emqx_external_trace:channel_info(),
+    Res :: term().
+trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
+    case maybe_init_ctx(Packet) of
+        false ->
+            ProcessFun(Packet);
+        RootCtx ->
+            RootCtx1 = otel_ctx:set_value(RootCtx, ?IS_ENABLED, true),
+            Attrs = maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo)),
+            SpanCtx = otel_tracer:start_span(RootCtx1, ?current_tracer, process_message, #{
+                attributes => Attrs
+            }),
+            Ctx = otel_tracer:set_current_span(RootCtx1, SpanCtx),
+            %% put ctx to packet, so it can be further propagated
+            Packet1 = put_ctx_to_packet(Ctx, Packet),
+            _ = otel_ctx:attach(Ctx),
+            try
+                ProcessFun(Packet1)
+            after
+                _ = ?end_span(),
+                clear()
+            end
+    end.
+
+-spec start_trace_send(list(emqx_types:deliver()), emqx_external_trace:channel_info()) ->
+    list(emqx_types:deliver()).
+start_trace_send(Delivers, ChannelInfo) ->
+    lists:map(
+        fun({deliver, Topic, Msg} = Deliver) ->
+            case get_ctx_from_msg(Msg) of
+                Ctx when is_map(Ctx) ->
+                    Attrs = maps:merge(
+                        msg_attributes(Msg), sub_channel_attributes(ChannelInfo)
+                    ),
+                    StartOpts = #{attributes => Attrs},
+                    SpanCtx = otel_tracer:start_span(
+                        Ctx, ?current_tracer, send_published_message, StartOpts
+                    ),
+                    Msg1 = put_ctx_to_msg(
+                        otel_tracer:set_current_span(Ctx, SpanCtx), Msg
+                    ),
+                    {deliver, Topic, Msg1};
+                _ ->
+                    Deliver
+            end
+        end,
+        Delivers
+    ).
+
+-spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
+end_trace_send(Packets) ->
+    lists:foreach(
+        fun(Packet) ->
+            case get_ctx_from_packet(Packet) of
+                Ctx when is_map(Ctx) ->
+                    otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
+                _ ->
+                    ok
+            end
+        end,
+        packets_list(Packets)
+    ).
+
+%% NOTE: adds an event only within an active span (Otel Ctx must be set in the calling process dict)
+-spec event(opentelemetry:event_name(), opentelemetry:attributes_map()) -> ok.
+event(Name, Attributes) ->
+    case otel_ctx:get_value(?IS_ENABLED, false) of
+        true ->
+            ?add_event(Name, Attributes),
+            ok;
+        false ->
+            ok
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+packets_list(Packets) when is_list(Packets) ->
+    Packets;
+packets_list(Packet) ->
+    [Packet].
+
+maybe_init_ctx(#mqtt_packet{variable = Packet}) ->
+    case should_trace_packet(Packet) of
+        true ->
+            Ctx = extract_traceparent_from_packet(Packet),
+            should_trace_context(Ctx) andalso Ctx;
+        false ->
+            false
+    end.
+
+extract_traceparent_from_packet(Packet) ->
+    Ctx = otel_ctx:new(),
+    case emqx_packet:info(properties, Packet) of
+        #{?USER_PROPERTY := UserProps} ->
+            otel_propagator_text_map:extract_to(Ctx, UserProps);
+        _ ->
+            Ctx
+    end.
+
+should_trace_context(RootCtx) ->
+    map_size(RootCtx) > 0 orelse ?TRACE_ALL.
+
+should_trace_packet(Packet) ->
+    not is_sys(emqx_packet:info(topic_name, Packet)).
+
+%% TODO: move to emqx_topic module?
+is_sys(<<"$SYS/", _/binary>> = _Topic) -> true;
+is_sys(_Topic) -> false.
+
+msg_attributes(Msg) ->
+    #{
+        'messaging.destination.name' => emqx_message:topic(Msg),
+        'messaging.client_id' => emqx_message:from(Msg)
+    }.
+
+packet_attributes(#mqtt_packet{variable = Packet}) ->
+    #{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}.
+
+channel_attributes(ChannelInfo) ->
+    #{'messaging.client_id' => maps:get(clientid, ChannelInfo, undefined)}.
+
+sub_channel_attributes(ChannelInfo) ->
+    channel_attributes(ChannelInfo).
+
+put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) ->
+    Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};
+%% extra field has not being used previously and defaulted to an empty list, it's safe to overwrite it
+put_ctx_to_msg(OtelCtx, Msg) when is_record(Msg, message) ->
+    Msg#message{extra = #{?EMQX_OTEL_CTX => OtelCtx}}.
+
+put_ctx_to_packet(
+    OtelCtx, #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet
+) ->
+    Extra = maps:get(internal_extra, Props, #{}),
+    Props1 = Props#{internal_extra => Extra#{?EMQX_OTEL_CTX => OtelCtx}},
+    Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = Props1}}.
+
+get_ctx_from_msg(#message{extra = Extra}) ->
+    from_extra(Extra).
+
+get_ctx_from_packet(#mqtt_packet{
+    variable = #mqtt_packet_publish{properties = #{internal_extra := Extra}}
+}) ->
+    from_extra(Extra);
+get_ctx_from_packet(_) ->
+    undefined.
+
+from_extra(#{?EMQX_OTEL_CTX := OtelCtx}) ->
+    OtelCtx;
+from_extra(_) ->
+    undefined.
+
+clear() ->
+    otel_ctx:clear().
+
+safe_stop_default_tracer() ->
+    try
+        _ = opentelemetry:stop_default_tracer_provider(),
+        ok
+    catch
+        %% noramal scenario, opentelemetry supervisor is not started
+        exit:{noproc, _} -> ok
+    end,
+    ok.
+
+assert_started({ok, _Pid}) -> ok;
+assert_started({ok, _Pid, _Info}) -> ok;
+assert_started({error, {already_started, _Pid}}) -> ok;
+assert_started({error, Reason}) -> {error, Reason}.
+
+set_trace_all(TraceAll) ->
+    persistent_term:put({?MODULE, trace_all}, TraceAll).

+ 252 - 0
apps/emqx_opentelemetry/test/emqx_otel_api_SUITE.erl

@@ -0,0 +1,252 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_otel_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(OTEL_API_PATH, emqx_mgmt_api_test_util:api_path(["opentelemetry"])).
+-define(CONF_PATH, [opentelemetry]).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    %% This is called by emqx_machine in EMQX release
+    emqx_otel_app:configure_otel_deps(),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
+            emqx_opentelemetry
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    Auth = auth_header(),
+    [{suite_apps, Apps}, {auth, Auth} | Config].
+
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)),
+    emqx_config:delete_override_conf_files(),
+    ok.
+
+init_per_testcase(_TC, Config) ->
+    emqx_conf:update(
+        ?CONF_PATH,
+        #{
+            <<"traces">> => #{<<"enable">> => false},
+            <<"metrics">> => #{<<"enable">> => false},
+            <<"logs">> => #{<<"enable">> => false}
+        },
+        #{}
+    ),
+    Config.
+
+end_per_testcase(_TC, _Config) ->
+    ok.
+
+auth_header() ->
+    {ok, API} = emqx_common_test_http:create_default_app(),
+    emqx_common_test_http:auth_header(API).
+
+t_get(Config) ->
+    Auth = ?config(auth, Config),
+    Path = ?OTEL_API_PATH,
+    {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
+    ?assertMatch(
+        #{
+            <<"traces">> := #{<<"enable">> := false},
+            <<"metrics">> := #{<<"enable">> := false},
+            <<"logs">> := #{<<"enable">> := false}
+        },
+        emqx_utils_json:decode(Resp)
+    ).
+
+t_put_enable_disable(Config) ->
+    Auth = ?config(auth, Config),
+    Path = ?OTEL_API_PATH,
+    EnableAllReq = #{
+        <<"traces">> => #{<<"enable">> => true},
+        <<"metrics">> => #{<<"enable">> => true},
+        <<"logs">> => #{<<"enable">> => true}
+    },
+    ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, EnableAllReq)),
+    ?assertMatch(
+        #{
+            traces := #{enable := true},
+            metrics := #{enable := true},
+            logs := #{enable := true}
+        },
+        emqx:get_config(?CONF_PATH)
+    ),
+
+    DisableAllReq = #{
+        <<"traces">> => #{<<"enable">> => false},
+        <<"metrics">> => #{<<"enable">> => false},
+        <<"logs">> => #{<<"enable">> => false}
+    },
+    ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, DisableAllReq)),
+    ?assertMatch(
+        #{
+            traces := #{enable := false},
+            metrics := #{enable := false},
+            logs := #{enable := false}
+        },
+        emqx:get_config(?CONF_PATH)
+    ).
+
+t_put_invalid(Config) ->
+    Auth = ?config(auth, Config),
+    Path = ?OTEL_API_PATH,
+
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"endpoint">> => <<>>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"endpoint">> => <<"unknown://somehost.org">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"endpoint">> => <<"https://somehost.org:99999">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"unknown_field">> => <<"foo">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"protocol">> => <<"unknown">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"traces">> => #{<<"filter">> => #{<<"unknown_filter">> => <<"foo">>}}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"logs">> => #{<<"level">> => <<"foo">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"metrics">> => #{<<"interval">> => <<"foo">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"logs">> => #{<<"unknown_field">> => <<"foo">>}
+        })
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"unknown_field">> => <<"foo">>})
+    ).
+
+t_put_valid(Config) ->
+    Auth = ?config(auth, Config),
+    Path = ?OTEL_API_PATH,
+
+    ?assertMatch(
+        {ok, _},
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{
+            <<"exporter">> => #{<<"endpoint">> => <<"nohost.com">>}
+        })
+    ),
+    ?assertEqual(<<"http://nohost.com/">>, emqx:get_config(?CONF_PATH ++ [exporter, endpoint])),
+
+    ?assertMatch(
+        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"exporter">> => #{}})
+    ),
+    ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{})),
+    ?assertMatch(
+        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"traces">> => #{}})
+    ),
+    ?assertMatch(
+        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"logs">> => #{}})
+    ),
+    ?assertMatch(
+        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, #{<<"metrics">> => #{}})
+    ),
+    ?assertMatch(
+        {ok, _},
+        emqx_mgmt_api_test_util:request_api(
+            put,
+            Path,
+            "",
+            Auth,
+            #{<<"exporter">> => #{}, <<"traces">> => #{}, <<"logs">> => #{}, <<"metrics">> => #{}}
+        )
+    ),
+    ?assertMatch(
+        {ok, _},
+        emqx_mgmt_api_test_util:request_api(
+            put,
+            Path,
+            "",
+            Auth,
+            #{
+                <<"exporter">> => #{
+                    <<"endpoint">> => <<"https://localhost:4317">>, <<"protocol">> => <<"grpc">>
+                },
+                <<"traces">> => #{
+                    <<"enable">> => true,
+                    <<"max_queue_size">> => 10,
+                    <<"exporting_timeout">> => <<"10s">>,
+                    <<"scheduled_delay">> => <<"20s">>,
+                    <<"filter">> => #{<<"trace_all">> => true}
+                },
+                <<"logs">> => #{
+                    <<"level">> => <<"warning">>,
+                    <<"max_queue_size">> => 100,
+                    <<"exporting_timeout">> => <<"10s">>,
+                    <<"scheduled_delay">> => <<"1s">>
+                },
+                <<"metrics">> => #{
+                    %% alias for "interval"
+                    <<"scheduled_delay">> => <<"15321ms">>
+                }
+            }
+        ),
+        %% alias check
+        ?assertEqual(15_321, emqx:get_config(?CONF_PATH ++ [metrics, interval]))
+    ).

+ 201 - 0
apps/emqx_opentelemetry/test/emqx_otel_schema_SUITE.erl

@@ -0,0 +1,201 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_otel_schema_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% Backward compatibility suite for `upgrade_raw_conf/1`,
+%% expected callback is `emqx_otel_schema:upgrade_legacy_metrics/1`
+
+-define(OLD_CONF_ENABLED, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    enable = true\n"
+    "}\n"
+>>).
+
+-define(OLD_CONF_DISABLED, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    enable = false\n"
+    "}\n"
+>>).
+
+-define(OLD_CONF_ENABLED_EXPORTER, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    enable = true\n"
+    "    exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
+    "}\n"
+>>).
+
+-define(OLD_CONF_DISABLED_EXPORTER, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    enable = false\n"
+    "    exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
+    "}\n"
+>>).
+
+-define(OLD_CONF_EXPORTER, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    exporter {endpoint = \"http://127.0.0.1:4317/\", interval = 5s}\n"
+    "}\n"
+>>).
+
+-define(OLD_CONF_EXPORTER_PARTIAL, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    exporter {endpoint = \"http://127.0.0.1:4317/\"}\n"
+    "}\n"
+>>).
+
+-define(OLD_CONF_EXPORTER_PARTIAL1, <<
+    "\n"
+    "opentelemetry\n"
+    "{\n"
+    "    exporter {interval = 3s}\n"
+    "}\n"
+>>).
+
+-define(TESTS_CONF, #{
+    t_old_conf_enabled => ?OLD_CONF_ENABLED,
+    t_old_conf_disabled => ?OLD_CONF_DISABLED,
+    t_old_conf_enabled_exporter => ?OLD_CONF_ENABLED_EXPORTER,
+    t_old_conf_disabled_exporter => ?OLD_CONF_DISABLED_EXPORTER,
+    t_old_conf_exporter => ?OLD_CONF_EXPORTER,
+    t_old_conf_exporter_partial => ?OLD_CONF_EXPORTER_PARTIAL,
+    t_old_conf_exporter_partial1 => ?OLD_CONF_EXPORTER_PARTIAL1
+}).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_testcase(TC, Config) ->
+    Apps = start_apps(TC, Config, maps:get(TC, ?TESTS_CONF)),
+    [{suite_apps, Apps} | Config].
+
+end_per_testcase(_TC, Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)),
+    emqx_config:delete_override_conf_files(),
+    ok.
+
+start_apps(TC, Config, OtelConf) ->
+    emqx_cth_suite:start(
+        [
+            {emqx_conf, OtelConf},
+            emqx_management,
+            emqx_opentelemetry
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
+    ).
+
+t_old_conf_enabled(_Config) ->
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{metrics := #{enable := true, interval := _}, exporter := #{endpoint := _}},
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+
+t_old_conf_disabled(_Config) ->
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{metrics := #{enable := false, interval := _}, exporter := #{endpoint := _}},
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+
+t_old_conf_enabled_exporter(_Config) ->
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{
+            metrics := #{enable := true, interval := 5000},
+            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
+        },
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+
+t_old_conf_disabled_exporter(_Config) ->
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{
+            metrics := #{enable := false, interval := 5000},
+            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
+        },
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+
+t_old_conf_exporter(_Config) ->
+    io:format(user, "TC running: ~p~n", [?FUNCTION_NAME]),
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{
+            metrics := #{enable := false, interval := 5000},
+            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
+        },
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+
+t_old_conf_exporter_partial(_Config) ->
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{
+            metrics := #{enable := false, interval := _},
+            exporter := #{endpoint := <<"http://127.0.0.1:4317/">>}
+        },
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).
+
+t_old_conf_exporter_partial1(_Config) ->
+    OtelConf = emqx:get_config([opentelemetry]),
+    ?assertMatch(
+        #{
+            metrics := #{enable := false, interval := 3000},
+            exporter := #{endpoint := _}
+        },
+        OtelConf
+    ),
+    ?assertNot(erlang:is_map_key(enable, OtelConf)),
+    ?assertNot(erlang:is_map_key(interval, maps:get(exporter, OtelConf))).

+ 431 - 0
apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl

@@ -0,0 +1,431 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_otel_trace_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(OTEL_SERVICE_NAME, "emqx").
+-define(CONF_PATH, [opentelemetry]).
+
+%% How to run it locally:
+%%  1. Uncomment networks in .ci/docker-compose-file/docker-compose-otel.yaml,
+%%     Uncomment OTLP gRPC ports mappings for otel-collector and otel-collector-tls services.
+%%     Uncomment jaeger-all-in-one prots maooing.
+%%  2. Start deps services:
+%%     DOCKER_USER="$(id -u)" docker-compose -f .ci/docker-compose-file/docker-compose-otel.yaml up
+%%  3. Run tests with special env variables:
+%%         PROFILE=emqx JAEGER_URL="http://localhost:16686" \
+%%         OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
+%%         make "apps/emqx_opentelemetry-ct"
+%%     Or run only this suite:
+%%         PROFILE=emqx JAEGER_URL="http://localhost:16686" \
+%%         OTEL_COLLECTOR_URL="http://localhost:4317" OTEL_COLLECTOR_TLS_URL="https://localhost:14317" \
+%%         ./rebar3 ct -v --readable=true --name 'test@127.0.0.1' \
+%%                     --suite apps/emqx_opentelemetry/test/emqx_otel_trace_SUITE.erl
+
+all() ->
+    [
+        {group, tcp},
+        {group, tls}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {tcp, TCs},
+        {tls, TCs}
+    ].
+
+init_per_suite(Config) ->
+    %% This is called by emqx_machine in EMQX release
+    emqx_otel_app:configure_otel_deps(),
+    %% No release name during the test case, we need a reliable service name to query Jaeger
+    os:putenv("OTEL_SERVICE_NAME", ?OTEL_SERVICE_NAME),
+    JaegerURL = os:getenv("JAEGER_URL", "http://jaeger.emqx.net:16686"),
+    [{jaeger_url, JaegerURL} | Config].
+
+end_per_suite(_) ->
+    os:unsetenv("OTEL_SERVICE_NAME"),
+    ok.
+
+init_per_group(tcp = Group, Config) ->
+    OtelCollectorURL = os:getenv("OTEL_COLLECTOR_URL", "http://otel-collector.emqx.net:4317"),
+    [
+        {otel_collector_url, OtelCollectorURL},
+        {logs_exporter_file_path, logs_exporter_file_path(Group, Config)}
+        | Config
+    ];
+init_per_group(tls = Group, Config) ->
+    OtelCollectorURL = os:getenv(
+        "OTEL_COLLECTOR_TLS_URL", "https://otel-collector-tls.emqx.net:4317"
+    ),
+    [
+        {otel_collector_url, OtelCollectorURL},
+        {logs_exporter_file_path, logs_exporter_file_path(Group, Config)}
+        | Config
+    ].
+
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(t_distributed_trace = TC, Config) ->
+    Cluster = cluster(TC, Config),
+    [{cluster, Cluster} | Config];
+init_per_testcase(TC, Config) ->
+    Apps = emqx_cth_suite:start(apps_spec(), #{work_dir => emqx_cth_suite:work_dir(TC, Config)}),
+    [{suite_apps, Apps} | Config].
+
+end_per_testcase(t_distributed_trace = _TC, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)),
+    emqx_config:delete_override_conf_files(),
+    ok;
+end_per_testcase(_TC, Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)),
+    emqx_config:delete_override_conf_files(),
+    ok.
+
+t_trace(Config) ->
+    MqttHostPort = mqtt_host_port(),
+
+    {ok, _} = emqx_conf:update(?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}),
+
+    Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
+    TopicNoSubs = <<"t/trace/test/nosub/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
+
+    SubConn1 = connect(MqttHostPort, <<"sub1">>),
+    {ok, _, [0]} = emqtt:subscribe(SubConn1, Topic),
+    SubConn2 = connect(MqttHostPort, <<"sub2">>),
+    {ok, _, [0]} = emqtt:subscribe(SubConn2, Topic),
+    PubConn = connect(MqttHostPort, <<"pub">>),
+
+    TraceParent = traceparent(true),
+    TraceParentNotSampled = traceparent(false),
+    ok = emqtt:publish(PubConn, Topic, props(TraceParent), <<"must be traced">>, []),
+    ok = emqtt:publish(PubConn, Topic, props(TraceParentNotSampled), <<"must not be traced">>, []),
+
+    TraceParentNoSub = traceparent(true),
+    TraceParentNoSubNotSampled = traceparent(false),
+    ok = emqtt:publish(PubConn, TopicNoSubs, props(TraceParentNoSub), <<"must be traced">>, []),
+    ok = emqtt:publish(
+        PubConn, TopicNoSubs, props(TraceParentNoSubNotSampled), <<"must not be traced">>, []
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_common_test_helpers:wait_for(
+            ?FUNCTION_NAME,
+            ?LINE,
+            fun() ->
+                {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
+                [Trace] = filter_traces(trace_id(TraceParent), Traces),
+                [] = filter_traces(trace_id(TraceParentNotSampled), Traces),
+                [TraceNoSub] = filter_traces(trace_id(TraceParentNoSub), Traces),
+                [] = filter_traces(trace_id(TraceParentNoSubNotSampled), Traces),
+
+                #{<<"spans">> := Spans, <<"processes">> := _} = Trace,
+                %% 2 sub spans and 1 publish process span
+                IsExpectedSpansLen = length(Spans) =:= 3,
+
+                #{<<"spans">> := SpansNoSub, <<"processes">> := _} = TraceNoSub,
+                %% Only 1 publish process span
+                IsExpectedSpansLen andalso 1 =:= length(SpansNoSub)
+            end,
+            10_000
+        )
+    ),
+    stop_conns([SubConn1, SubConn2, PubConn]).
+
+t_trace_disabled(_Config) ->
+    ?assertNot(emqx:get_config(?CONF_PATH ++ [traces, enable])),
+    %% Tracer must be actually disabled
+    ?assertEqual({otel_tracer_noop, []}, opentelemetry:get_tracer()),
+    ?assertEqual(undefined, emqx_external_trace:provider()),
+
+    Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
+
+    SubConn = connect(mqtt_host_port(), <<"sub">>),
+    {ok, _, [0]} = emqtt:subscribe(SubConn, Topic),
+    PubConn = connect(mqtt_host_port(), <<"pub">>),
+
+    TraceParent = traceparent(true),
+    emqtt:publish(PubConn, Topic, props(TraceParent), <<>>, []),
+    receive
+        {publish, #{topic := Topic, properties := Props}} ->
+            %% traceparent must be propagated by EMQX even if internal otel trace is disabled
+            #{'User-Property' := [{<<"traceparent">>, TrParent}]} = Props,
+            ?assertEqual(TraceParent, TrParent)
+    after 10_000 ->
+        ct:fail("published_message_not_received")
+    end,
+
+    %%  if otel trace is registered but is actually not running, EMQX must work fine
+    %% and the message must be delivered to the subscriber
+    ok = emqx_otel_trace:toggle_registered(true),
+    TraceParent1 = traceparent(true),
+    emqtt:publish(PubConn, Topic, props(TraceParent1), <<>>, []),
+    receive
+        {publish, #{topic := Topic, properties := Props1}} ->
+            #{'User-Property' := [{<<"traceparent">>, TrParent1}]} = Props1,
+            ?assertEqual(TraceParent1, TrParent1)
+    after 10_000 ->
+        ct:fail("published_message_not_received")
+    end,
+    stop_conns([SubConn, PubConn]).
+
+t_trace_all(Config) ->
+    OtelConf = enabled_trace_conf(Config),
+    OtelConf1 = emqx_utils_maps:deep_put([<<"traces">>, <<"filter">>], OtelConf, #{
+        <<"trace_all">> => true
+    }),
+    {ok, _} = emqx_conf:update(?CONF_PATH, OtelConf1, #{override_to => cluster}),
+
+    Topic = <<"t/trace/test", (atom_to_binary(?FUNCTION_NAME))/binary>>,
+    ClientId = <<"pub-", (integer_to_binary(erlang:system_time(nanosecond)))/binary>>,
+    PubConn = connect(mqtt_host_port(), ClientId),
+    emqtt:publish(PubConn, Topic, #{}, <<>>, []),
+
+    ?assertEqual(
+        ok,
+        emqx_common_test_helpers:wait_for(
+            ?FUNCTION_NAME,
+            ?LINE,
+            fun() ->
+                {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
+                Res = lists:filter(
+                    fun(#{<<"spans">> := Spans}) ->
+                        case Spans of
+                            %% Only one span is expected as there are no subscribers
+                            [#{<<"tags">> := Tags}] ->
+                                lists:any(
+                                    fun(#{<<"key">> := K, <<"value">> := Val}) ->
+                                        K =:= <<"messaging.client_id">> andalso Val =:= ClientId
+                                    end,
+                                    Tags
+                                );
+                            _ ->
+                                false
+                        end
+                    end,
+                    Traces
+                ),
+                %% Expecting exactly 1 span
+                length(Res) =:= 1
+            end,
+            10_000
+        )
+    ),
+    stop_conns([PubConn]).
+
+t_distributed_trace(Config) ->
+    [Core1, Core2, Repl] = Cluster = ?config(cluster, Config),
+    {ok, _} = rpc:call(
+        Core1,
+        emqx_conf,
+        update,
+        [?CONF_PATH, enabled_trace_conf(Config), #{override_to => cluster}]
+    ),
+    Topic = <<"t/trace/test/", (atom_to_binary(?FUNCTION_NAME))/binary>>,
+
+    SubConn1 = connect(mqtt_host_port(Core1), <<"sub1">>),
+    {ok, _, [0]} = emqtt:subscribe(SubConn1, Topic),
+    SubConn2 = connect(mqtt_host_port(Core2), <<"sub2">>),
+    {ok, _, [0]} = emqtt:subscribe(SubConn2, Topic),
+    SubConn3 = connect(mqtt_host_port(Repl), <<"sub3">>),
+    {ok, _, [0]} = emqtt:subscribe(SubConn3, Topic),
+
+    PubConn = connect(mqtt_host_port(Repl), <<"pub">>),
+
+    TraceParent = traceparent(true),
+    TraceParentNotSampled = traceparent(false),
+
+    ok = emqtt:publish(PubConn, Topic, props(TraceParent), <<"must be traced">>, []),
+    ok = emqtt:publish(PubConn, Topic, props(TraceParentNotSampled), <<"must not be traced">>, []),
+
+    ?assertEqual(
+        ok,
+        emqx_common_test_helpers:wait_for(
+            ?FUNCTION_NAME,
+            ?LINE,
+            fun() ->
+                {ok, #{<<"data">> := Traces}} = get_jaeger_traces(?config(jaeger_url, Config)),
+                [Trace] = filter_traces(trace_id(TraceParent), Traces),
+
+                [] = filter_traces(trace_id(TraceParentNotSampled), Traces),
+
+                #{<<"spans">> := Spans, <<"processes">> := Procs} = Trace,
+
+                %% 3 sub spans and 1 publish process span
+                4 = length(Spans),
+                [_, _, _] = SendSpans = filter_spans(<<"send_published_message">>, Spans),
+
+                IsAllNodesSpans =
+                    lists:sort([atom_to_binary(N) || N <- Cluster]) =:=
+                        lists:sort([span_node(S, Procs) || S <- SendSpans]),
+
+                [PubSpan] = filter_spans(<<"process_message">>, Spans),
+                atom_to_binary(Repl) =:= span_node(PubSpan, Procs) andalso IsAllNodesSpans
+            end,
+            10_000
+        )
+    ),
+    stop_conns([SubConn1, SubConn2, SubConn3, PubConn]).
+
+%% Keeping this test in this SUITE as there is no separate module for logs
+t_log(Config) ->
+    Level = emqx_logger:get_primary_log_level(),
+    LogsConf = #{
+        <<"logs">> => #{
+            <<"enable">> => true,
+            <<"level">> => atom_to_binary(Level),
+            <<"scheduled_delay">> => <<"20ms">>
+        },
+        <<"exporter">> => exporter_conf(Config)
+    },
+    {ok, _} = emqx_conf:update(?CONF_PATH, LogsConf, #{override_to => cluster}),
+
+    %% Ids are only needed for matching logs in the file exported by otel-collector
+    Id = integer_to_binary(otel_id_generator:generate_trace_id()),
+    ?SLOG(Level, #{msg => "otel_test_log_message", id => Id}),
+    Id1 = integer_to_binary(otel_id_generator:generate_trace_id()),
+    logger:Level("Ordinary log message, id: ~p", [Id1]),
+
+    ?assertEqual(
+        ok,
+        emqx_common_test_helpers:wait_for(
+            ?FUNCTION_NAME,
+            ?LINE,
+            fun() ->
+                {ok, Logs} = file:read_file(?config(logs_exporter_file_path, Config)),
+                binary:match(Logs, Id) =/= nomatch andalso binary:match(Logs, Id1) =/= nomatch
+            end,
+            10_000
+        )
+    ).
+
+logs_exporter_file_path(Group, Config) ->
+    filename:join([project_dir(Config), logs_exporter_filename(Group)]).
+
+project_dir(Config) ->
+    filename:join(
+        lists:takewhile(
+            fun(PathPart) -> PathPart =/= "_build" end,
+            filename:split(?config(priv_dir, Config))
+        )
+    ).
+
+logs_exporter_filename(tcp) ->
+    ".ci/docker-compose-file/otel/otel-collector.json";
+logs_exporter_filename(tls) ->
+    ".ci/docker-compose-file/otel/otel-collector-tls.json".
+
+enabled_trace_conf(TcConfig) ->
+    #{
+        <<"traces">> => #{
+            <<"enable">> => true,
+            <<"scheduled_delay">> => <<"50ms">>
+        },
+        <<"exporter">> => exporter_conf(TcConfig)
+    }.
+
+exporter_conf(TcConfig) ->
+    #{<<"endpoint">> => ?config(otel_collector_url, TcConfig)}.
+
+span_node(#{<<"processID">> := ProcId}, Procs) ->
+    #{ProcId := #{<<"tags">> := ProcTags}} = Procs,
+    [#{<<"value">> := Node}] = lists:filter(
+        fun(#{<<"key">> := K}) ->
+            K =:= <<"service.instance.id">>
+        end,
+        ProcTags
+    ),
+    Node.
+
+trace_id(<<"00-", TraceId:32/binary, _/binary>>) ->
+    TraceId.
+
+filter_traces(TraceId, Traces) ->
+    lists:filter(fun(#{<<"traceID">> := TrId}) -> TrId =:= TraceId end, Traces).
+
+filter_spans(OpName, Spans) ->
+    lists:filter(fun(#{<<"operationName">> := Name}) -> Name =:= OpName end, Spans).
+
+get_jaeger_traces(JagerBaseURL) ->
+    case httpc:request(JagerBaseURL ++ "/api/traces?service=" ++ ?OTEL_SERVICE_NAME) of
+        {ok, {{_, 200, _}, _, RespBpdy}} ->
+            {ok, emqx_utils_json:decode(RespBpdy)};
+        Err ->
+            ct:pal("Jager error: ~p", Err),
+            Err
+    end.
+
+stop_conns(Conns) ->
+    lists:foreach(fun emqtt:stop/1, Conns).
+
+props(TraceParent) ->
+    #{'User-Property' => [{<<"traceparent">>, TraceParent}]}.
+
+traceparent(IsSampled) ->
+    TraceId = otel_id_generator:generate_trace_id(),
+    SpanId = otel_id_generator:generate_span_id(),
+    {ok, TraceIdHexStr} = otel_utils:format_binary_string("~32.16.0b", [TraceId]),
+    {ok, SpanIdHexStr} = otel_utils:format_binary_string("~16.16.0b", [SpanId]),
+    TraceFlags =
+        case IsSampled of
+            true -> <<"01">>;
+            false -> <<"00">>
+        end,
+    <<"00-", TraceIdHexStr/binary, "-", SpanIdHexStr/binary, "-", TraceFlags/binary>>.
+
+connect({Host, Port}, ClientId) ->
+    {ok, ConnPid} = emqtt:start_link([
+        {proto_ver, v5},
+        {host, Host},
+        {port, Port},
+        {clientid, ClientId}
+    ]),
+    {ok, _} = emqtt:connect(ConnPid),
+    ConnPid.
+
+mqtt_host_port() ->
+    emqx:get_config([listeners, tcp, default, bind]).
+
+mqtt_host_port(Node) ->
+    rpc:call(Node, emqx, get_config, [[listeners, tcp, default, bind]]).
+
+cluster(TC, Config) ->
+    Nodes = emqx_cth_cluster:start(
+        [
+            {otel_trace_core1, #{role => core, apps => apps_spec()}},
+            {otel_trace_core2, #{role => core, apps => apps_spec()}},
+            {otel_trace_replicant, #{role => replicant, apps => apps_spec()}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
+    ),
+    Nodes.
+
+apps_spec() ->
+    [
+        emqx,
+        emqx_conf,
+        emqx_management,
+        emqx_opentelemetry
+    ].

+ 2 - 2
apps/emqx_utils/include/emqx_message.hrl

@@ -36,8 +36,8 @@
     payload :: emqx_types:payload(),
     %% Timestamp (Unit: millisecond)
     timestamp :: integer(),
-    %% not used so far, for future extension
-    extra = [] :: term()
+    %% Miscellaneous extensions, currently used for OpenTelemetry context propagation
+    extra = #{} :: term()
 }).
 
 -endif.

+ 1 - 0
changes/ce/feat-11984.en.md

@@ -0,0 +1 @@
+Implemented Open Telemetry distributed tracing feature.

+ 1 - 31
mix.exs

@@ -98,37 +98,7 @@ defmodule EMQXUmbrella.MixProject do
       # set by hackney (dependency)
       {:ssl_verify_fun, "1.1.7", override: true},
       {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
-      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
-      {:opentelemetry_api,
-       github: "emqx/opentelemetry-erlang",
-       sparse: "apps/opentelemetry_api",
-       tag: "v1.4.2-emqx",
-       override: true,
-       runtime: false},
-      {:opentelemetry,
-       github: "emqx/opentelemetry-erlang",
-       sparse: "apps/opentelemetry",
-       tag: "v1.4.2-emqx",
-       override: true,
-       runtime: false},
-      {:opentelemetry_api_experimental,
-       github: "emqx/opentelemetry-erlang",
-       sparse: "apps/opentelemetry_api_experimental",
-       tag: "v1.4.2-emqx",
-       override: true,
-       runtime: false},
-      {:opentelemetry_experimental,
-       github: "emqx/opentelemetry-erlang",
-       sparse: "apps/opentelemetry_experimental",
-       tag: "v1.4.2-emqx",
-       override: true,
-       runtime: false},
-      {:opentelemetry_exporter,
-       github: "emqx/opentelemetry-erlang",
-       sparse: "apps/opentelemetry_exporter",
-       tag: "v1.4.2-emqx",
-       override: true,
-       runtime: false}
+      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
       enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()

+ 0 - 8
rebar.config

@@ -84,14 +84,6 @@
     %% in conflict by erlavro and rocketmq
     , {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}}
     , {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}}
-    %% trace
-    , {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_api"}}
-    , {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry"}}
-    %% log metrics
-    , {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_experimental"}}
-    , {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_api_experimental"}}
-    %% export
-    , {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_exporter"}}
     , {ssl_verify_fun, "1.1.7"}
     ]}.
 

+ 14 - 2
rel/i18n/emqx_otel_schema.hocon

@@ -11,11 +11,14 @@ otel_logs.label: "Open Telemetry Logs"
 otel_metrics.desc: "Open Telemetry Metrics configuration."
 otel_metrics.label: "Open Telemetry Metrics"
 
+otel_traces.desc: "Open Telemetry Traces configuration."
+otel_traces.label: "Open Telemetry Traces"
+
 enable.desc: "Enable or disable Open Telemetry signal."
 enable.label: "Enable."
 
-exporter.desc: "Open Telemetry Exporter"
-exporter.label: "Exporter"
+otel_exporter.desc: "Open Telemetry Exporter"
+otel_exporter.label: "Exporter"
 
 max_queue_size.desc:
 """The maximum queue size. After the size is reached Open Telemetry signals are dropped."""
@@ -41,4 +44,13 @@ otel_log_handler_level.desc:
 """The log level of the Open Telemetry log handler."""
 otel_log_handler_level.label: "Log Level"
 
+trace_filter.desc: "Open Telemetry Trace Filter configuration"
+trace_filter.label: "Trace Filter"
+
+trace_all.desc:
+"""If enabled, all published messages are traced, a new trace ID is generated if it can't be extracted from the message.
+Otherwise, only messages published with trace context are traced. Disabled by default."""
+trace_all.label: "Trace All"
+
+
 }

+ 3 - 0
scripts/ct/run.sh

@@ -243,6 +243,9 @@ for dep in ${CT_DEPS}; do
         ldap)
             FILES+=( '.ci/docker-compose-file/docker-compose-ldap.yaml' )
             ;;
+        otel)
+            FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1