Просмотр исходного кода

fix(http_bridge): don't attempt to convert headers to atoms

Fixes https://emqx.atlassian.net/browse/EMQX-10653
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
5c8dc092a1
25 измененных файлов с 491 добавлено и 48 удалено
  1. 1 1
      apps/emqx_bridge/src/emqx_bridge.app.src
  2. 23 6
      apps/emqx_bridge/src/emqx_bridge_api.erl
  3. 14 1
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  4. 13 0
      apps/emqx_bridge/test/emqx_bridge_testlib.erl
  5. 1 1
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
  6. 4 3
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl
  7. 4 4
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl
  8. 4 2
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl
  9. 4 3
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl
  10. 1 2
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
  11. 66 12
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl
  12. 1 1
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  13. 10 1
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  14. 89 1
      apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
  15. 118 0
      apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl
  16. 1 1
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
  17. 9 1
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
  18. 21 1
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
  19. 1 1
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src
  20. 2 0
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl
  21. 32 2
      apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
  22. 1 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  23. 15 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
  24. 40 2
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
  25. 16 0
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.25"},
+    {vsn, "0.1.26"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 23 - 6
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -541,7 +541,7 @@ schema("/bridges_probe") ->
     case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
         {ok, #{body := #{<<"type">> := ConnType} = Params}} ->
             Params1 = maybe_deobfuscate_bridge_probe(Params),
-            case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
+            try emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
                 ok ->
                     ?NO_CONTENT;
                 {error, #{kind := validation_error} = Reason} ->
@@ -553,6 +553,15 @@ schema("/bridges_probe") ->
                             _ -> Reason0
                         end,
                     ?BAD_REQUEST('TEST_FAILED', Reason)
+            catch
+                %% We need to catch hocon validation errors here as well because,
+                %% currently, when defining the API union member selector, we can only use
+                %% references to fields, and they don't share whole-bridge validators if
+                %% they exist.  Such validators will only be triggered by
+                %% `create_dry_run'...
+                throw:{_Schema, [#{kind := validation_error} = Reason0]} ->
+                    Reason = redact(Reason0),
+                    ?BAD_REQUEST('TEST_FAILED', map_to_json(Reason))
             end;
         BadRequest ->
             BadRequest
@@ -608,7 +617,7 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
         {ok, _} ->
             lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode);
         {error, Reason} when is_map(Reason) ->
-            ?BAD_REQUEST(map_to_json(emqx_utils:redact(Reason)))
+            ?BAD_REQUEST(map_to_json(redact(Reason)))
     end.
 
 get_metrics_from_local_node(BridgeType, BridgeName) ->
@@ -1071,7 +1080,15 @@ deobfuscate(NewConf, OldConf) ->
         NewConf
     ).
 
-map_to_json(M) ->
-    emqx_utils_json:encode(
-        emqx_utils_maps:jsonable_map(M, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end)
-    ).
+map_to_json(M0) ->
+    %% When dealing with Hocon validation errors, `value' might contain non-serializable
+    %% values (e.g.: user_lookup_fun), so we try again without that key if serialization
+    %% fails as a best effort.
+    M1 = emqx_utils_maps:jsonable_map(M0, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end),
+    try
+        emqx_utils_json:encode(M1)
+    catch
+        error:_ ->
+            M2 = maps:without([value], M1),
+            emqx_utils_json:encode(M2)
+    end.

+ 14 - 1
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -261,7 +261,17 @@ recreate(Type, Name, Conf, Opts) ->
 create_dry_run(Type, Conf0) ->
     TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
     TmpPath = emqx_utils:safe_filename(TmpName),
-    Conf = emqx_utils_maps:safe_atom_key_map(Conf0),
+    %% Already typechecked, no need to catch errors
+    TypeBin = bin(Type),
+    TypeAtom = safe_atom(Type),
+    Conf1 = maps:without([<<"name">>], Conf0),
+    RawConf = #{<<"bridges">> => #{TypeBin => #{<<"a">> => Conf1}}},
+    #{bridges := #{TypeAtom := #{a := Conf}}} =
+        hocon_tconf:check_plain(
+            emqx_bridge_schema,
+            RawConf,
+            #{atom_key => true, required => false}
+        ),
     case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
         {error, Reason} ->
             {error, Reason};
@@ -415,6 +425,9 @@ bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
 
+safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
+safe_atom(Atom) when is_atom(Atom) -> Atom.
+
 parse_opts(Conf, Opts0) ->
     override_start_after_created(Conf, Opts0).
 

+ 13 - 0
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -212,6 +212,19 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
     ct:pal("bridge probe result: ~p", [Res]),
     Res.
 
+try_decode_error(Body0) ->
+    case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+        {ok, #{<<"message">> := Msg0} = Body1} ->
+            case emqx_utils_json:safe_decode(Msg0, [return_maps]) of
+                {ok, Msg1} -> Body1#{<<"message">> := Msg1};
+                {error, _} -> Body1
+            end;
+        {ok, Body1} ->
+            Body1;
+        {error, _} ->
+            Body0
+    end.
+
 create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
     create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
 

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_gcp_pubsub, [
     {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

+ 4 - 3
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl

@@ -363,9 +363,9 @@ service_account_json_validator(Map) ->
         {[], <<"service_account">>} ->
             ok;
         {[], Type} ->
-            {error, {wrong_type, Type}};
+            {error, #{wrong_type => Type}};
         {_, _} ->
-            {error, {missing_keys, MissingKeys}}
+            {error, #{missing_keys => MissingKeys}}
     end.
 
 service_account_json_converter(Map) when is_map(Map) ->
@@ -382,7 +382,8 @@ service_account_json_converter(Val) ->
 
 consumer_topic_mapping_validator(_TopicMapping = []) ->
     {error, "There must be at least one GCP PubSub-MQTT topic mapping"};
-consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
+    TopicMapping = [emqx_utils_maps:binary_key_map(TM) || TM <- TopicMapping0],
     NumEntries = length(TopicMapping),
     PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping],
     DistinctPubSubTopics = length(lists:usort(PubSubTopics)),

+ 4 - 4
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl

@@ -220,10 +220,10 @@ parse_jwt_config(ResourceId, #{
     service_account_json := ServiceAccountJSON
 }) ->
     #{
-        project_id := ProjectId,
-        private_key_id := KId,
-        private_key := PrivateKeyPEM,
-        client_email := ServiceAccountEmail
+        <<"project_id">> := ProjectId,
+        <<"private_key_id">> := KId,
+        <<"private_key">> := PrivateKeyPEM,
+        <<"client_email">> := ServiceAccountEmail
     } = ServiceAccountJSON,
     %% fixed for pubsub; trailing slash is important.
     Aud = <<"https://pubsub.googleapis.com/">>,

+ 4 - 2
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -64,7 +64,9 @@ callback_mode() -> async_if_possible.
 query_mode(_Config) -> no_queries.
 
 -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
-on_start(InstanceId, Config) ->
+on_start(InstanceId, Config0) ->
+    %% ensure it's a binary key map
+    Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
     case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
         {ok, Client} ->
             start_consumers(InstanceId, Client, Config);
@@ -125,7 +127,7 @@ start_consumers(InstanceId, Client, Config) ->
         consumer := ConsumerConfig0,
         hookpoint := Hookpoint,
         resource_opts := #{request_ttl := RequestTTL},
-        service_account_json := #{project_id := ProjectId}
+        service_account_json := #{<<"project_id">> := ProjectId}
     } = Config,
     ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
     TopicMapping = maps:get(topic_mapping, ConsumerConfig1),

+ 4 - 3
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -50,15 +50,16 @@ callback_mode() -> async_if_possible.
 query_mode(_Config) -> async.
 
 -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
-on_start(InstanceId, Config) ->
+on_start(InstanceId, Config0) ->
     ?SLOG(info, #{
         msg => "starting_gcp_pubsub_bridge",
-        config => Config
+        config => Config0
     }),
+    Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
     #{
         payload_template := PayloadTemplate,
         pubsub_topic := PubSubTopic,
-        service_account_json := #{project_id := ProjectId}
+        service_account_json := #{<<"project_id">> := ProjectId}
     } = Config,
     case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
         {ok, Client} ->

+ 1 - 2
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -275,14 +275,13 @@ ensure_topic(Config, Topic) ->
 
 start_control_client() ->
     RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
-    ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(RawServiceAccount),
     ConnectorConfig =
         #{
             connect_timeout => 5_000,
             max_retries => 0,
             pool_size => 1,
             resource_opts => #{request_ttl => 5_000},
-            service_account_json => ServiceAccount
+            service_account_json => RawServiceAccount
         },
     PoolName = <<"control_connector">>,
     {ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig),

+ 66 - 12
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -196,16 +196,27 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
     Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
     ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
-    ProbeResult = emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params),
+    Opts = #{return_all => true},
+    ProbeResult = emqx_mgmt_api_test_util:request_api(
+        post, ProbePath, "", AuthHeader, Params, Opts
+    ),
     ct:pal("creating bridge (via http): ~p", [Params]),
     ct:pal("probe result: ~p", [ProbeResult]),
     Res =
-        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
-            {ok, Res0} -> {ok, emqx_utils_json:decode(Res0, [return_maps])};
-            Error -> Error
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headhers, Res0}} ->
+                {ok, {Status, Headhers, emqx_utils_json:decode(Res0, [return_maps])}};
+            {error, {Status, Headers, Body0}} ->
+                {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
+            Error ->
+                Error
         end,
     ct:pal("bridge creation result: ~p", [Res]),
     ?assertEqual(element(1, ProbeResult), element(1, Res)),
+    case ProbeResult of
+        {error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult});
+        _ -> ok
+    end,
     Res.
 
 create_rule_and_action_http(Config) ->
@@ -821,7 +832,7 @@ t_not_of_service_account_type(Config) ->
     ?assertMatch(
         {error, #{
             kind := validation_error,
-            reason := {wrong_type, <<"not a service account">>},
+            reason := #{wrong_type := <<"not a service account">>},
             %% should be censored as it contains secrets
             value := <<"******">>
         }},
@@ -832,6 +843,23 @@ t_not_of_service_account_type(Config) ->
             }
         )
     ),
+    ?assertMatch(
+        {error,
+            {{_, 400, _}, _, #{
+                <<"message">> := #{
+                    <<"kind">> := <<"validation_error">>,
+                    <<"reason">> := #{<<"wrong_type">> := <<"not a service account">>},
+                    %% should be censored as it contains secrets
+                    <<"value">> := <<"******">>
+                }
+            }}},
+        create_bridge_http(
+            Config,
+            #{
+                <<"service_account_json">> => #{<<"type">> => <<"not a service account">>}
+            }
+        )
+    ),
     ok.
 
 t_json_missing_fields(Config) ->
@@ -840,13 +868,15 @@ t_json_missing_fields(Config) ->
         {error, #{
             kind := validation_error,
             reason :=
-                {missing_keys, [
-                    <<"client_email">>,
-                    <<"private_key">>,
-                    <<"private_key_id">>,
-                    <<"project_id">>,
-                    <<"type">>
-                ]},
+                #{
+                    missing_keys := [
+                        <<"client_email">>,
+                        <<"private_key">>,
+                        <<"private_key_id">>,
+                        <<"project_id">>,
+                        <<"type">>
+                    ]
+                },
             %% should be censored as it contains secrets
             value := <<"******">>
         }},
@@ -855,6 +885,30 @@ t_json_missing_fields(Config) ->
             | Config
         ])
     ),
+    ?assertMatch(
+        {error,
+            {{_, 400, _}, _, #{
+                <<"message">> := #{
+                    <<"kind">> := <<"validation_error">>,
+                    <<"reason">> :=
+                        #{
+                            <<"missing_keys">> := [
+                                <<"client_email">>,
+                                <<"private_key">>,
+                                <<"private_key_id">>,
+                                <<"project_id">>,
+                                <<"type">>
+                            ]
+                        },
+                    %% should be censored as it contains secrets
+                    <<"value">> := <<"******">>
+                }
+            }}},
+        create_bridge_http([
+            {gcp_pubsub_config, GCPPubSubConfig0#{<<"service_account_json">> := #{}}}
+            | Config
+        ])
+    ),
     ok.
 
 t_invalid_private_key(Config) ->

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
     {env, []},

+ 10 - 1
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -155,7 +155,16 @@ desc("request") ->
 desc(_) ->
     undefined.
 
-validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> ->
+validate_method(M) when
+    M =:= <<"post">>;
+    M =:= <<"put">>;
+    M =:= <<"get">>;
+    M =:= <<"delete">>;
+    M =:= post;
+    M =:= put;
+    M =:= get;
+    M =:= delete
+->
     ok;
 validate_method(M) ->
     case string:find(M, "${") of

+ 89 - 1
apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl

@@ -82,6 +82,14 @@ init_per_testcase(t_rule_action_expired, Config) ->
         {bridge_name, ?BRIDGE_NAME}
         | Config
     ];
+init_per_testcase(t_bridge_probes_header_atoms, Config) ->
+    HTTPPath = <<"/path">>,
+    ServerSSLOpts = false,
+    {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
+        _Port = random, HTTPPath, ServerSSLOpts
+    ),
+    ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
+    [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
 init_per_testcase(_TestCase, Config) ->
     Server = start_http_server(#{response_delay_ms => 0}),
     [{http_server, Server} | Config].
@@ -89,7 +97,8 @@ init_per_testcase(_TestCase, Config) ->
 end_per_testcase(TestCase, _Config) when
     TestCase =:= t_path_not_found;
     TestCase =:= t_too_many_requests;
-    TestCase =:= t_rule_action_expired
+    TestCase =:= t_rule_action_expired;
+    TestCase =:= t_bridge_probes_header_atoms
 ->
     ok = emqx_bridge_http_connector_test_server:stop(),
     persistent_term:erase({?MODULE, times_called}),
@@ -292,6 +301,22 @@ make_bridge(Config) ->
     ),
     emqx_bridge_resource:bridge_id(Type, Name).
 
+success_http_handler() ->
+    TestPid = self(),
+    fun(Req0, State) ->
+        {ok, Body, Req} = cowboy_req:read_body(Req0),
+        Headers = cowboy_req:headers(Req),
+        ct:pal("http request received: ~p", [#{body => Body, headers => Headers}]),
+        TestPid ! {http, Headers, Body},
+        Rep = cowboy_req:reply(
+            200,
+            #{<<"content-type">> => <<"text/plain">>},
+            <<"hello">>,
+            Req
+        ),
+        {ok, Rep, State}
+    end.
+
 not_found_http_handler() ->
     TestPid = self(),
     fun(Req0, State) ->
@@ -613,6 +638,55 @@ t_rule_action_expired(Config) ->
     ),
     ok.
 
+t_bridge_probes_header_atoms(Config) ->
+    #{port := Port, path := Path} = ?config(http_server, Config),
+    ?check_trace(
+        begin
+            LocalTopic = <<"t/local/topic">>,
+            BridgeConfig0 = bridge_async_config(#{
+                type => ?BRIDGE_TYPE,
+                name => ?BRIDGE_NAME,
+                port => Port,
+                path => Path,
+                resume_interval => "100ms",
+                connect_timeout => "1s",
+                request_timeout => "100ms",
+                resource_request_ttl => "100ms",
+                local_topic => LocalTopic
+            }),
+            BridgeConfig = BridgeConfig0#{
+                <<"headers">> => #{
+                    <<"some-non-existent-atom">> => <<"x">>
+                }
+            },
+            ?assertMatch(
+                {ok, {{_, 204, _}, _Headers, _Body}},
+                probe_bridge_api(BridgeConfig)
+            ),
+            ?assertMatch(
+                {ok, {{_, 201, _}, _Headers, _Body}},
+                emqx_bridge_testlib:create_bridge_api(
+                    ?BRIDGE_TYPE,
+                    ?BRIDGE_NAME,
+                    BridgeConfig
+                )
+            ),
+            Msg = emqx_message:make(LocalTopic, <<"hi">>),
+            emqx:publish(Msg),
+            receive
+                {http, Headers, _Body} ->
+                    ?assertMatch(#{<<"some-non-existent-atom">> := <<"x">>}, Headers),
+                    ok
+            after 5_000 ->
+                ct:pal("mailbox: ~p", [process_info(self(), messages)]),
+                ct:fail("request not made")
+            end,
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% helpers
 do_t_async_retries(TestContext, Error, Fn) ->
     #{error_attempts := ErrorAttempts} = TestContext,
@@ -659,3 +733,17 @@ remove_message_id(MessageIDs, #{body := IDBin}) ->
     ID = erlang:binary_to_integer(IDBin),
     %% It is acceptable to get the same message more than once
     maps:without([ID], MessageIDs).
+
+probe_bridge_api(BridgeConfig) ->
+    Params = BridgeConfig#{<<"type">> => ?BRIDGE_TYPE, <<"name">> => ?BRIDGE_NAME},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+            Error -> Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.

+ 118 - 0
apps/emqx_bridge_http/test/emqx_bridge_http_connector_tests.erl

@@ -91,3 +91,121 @@ is_unwrapped_headers(Headers) ->
 is_unwrapped_header({_, V}) when is_function(V) -> false;
 is_unwrapped_header({_, [{str, _V}]}) -> throw(unexpected_tmpl_token);
 is_unwrapped_header(_) -> true.
+
+method_validator_test() ->
+    Conf0 = parse(webhook_config_hocon()),
+    ?assertMatch(
+        #{<<"method">> := _},
+        emqx_utils_maps:deep_get([<<"bridges">>, <<"webhook">>, <<"a">>], Conf0)
+    ),
+    lists:foreach(
+        fun(Method) ->
+            Conf1 = emqx_utils_maps:deep_put(
+                [<<"bridges">>, <<"webhook">>, <<"a">>, <<"method">>],
+                Conf0,
+                Method
+            ),
+            ?assertMatch(
+                #{},
+                check(Conf1),
+                #{method => Method}
+            ),
+            ?assertMatch(
+                #{},
+                check_atom_key(Conf1),
+                #{method => Method}
+            ),
+            ok
+        end,
+        [<<"post">>, <<"put">>, <<"get">>, <<"delete">>]
+    ),
+    lists:foreach(
+        fun(Method) ->
+            Conf1 = emqx_utils_maps:deep_put(
+                [<<"bridges">>, <<"webhook">>, <<"a">>, <<"method">>],
+                Conf0,
+                Method
+            ),
+            ?assertThrow(
+                {_, [
+                    #{
+                        kind := validation_error,
+                        reason := not_a_enum_symbol
+                    }
+                ]},
+                check(Conf1),
+                #{method => Method}
+            ),
+            ?assertThrow(
+                {_, [
+                    #{
+                        kind := validation_error,
+                        reason := not_a_enum_symbol
+                    }
+                ]},
+                check_atom_key(Conf1),
+                #{method => Method}
+            ),
+            ok
+        end,
+        [<<"x">>, <<"patch">>, <<"options">>]
+    ),
+    ok.
+
+%%===========================================================================
+%% Helper functions
+%%===========================================================================
+
+parse(Hocon) ->
+    {ok, Conf} = hocon:binary(Hocon),
+    Conf.
+
+%% what bridge creation does
+check(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_bridge_schema, Conf).
+
+%% what bridge probe does
+check_atom_key(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
+
+%%===========================================================================
+%% Data section
+%%===========================================================================
+
+%% erlfmt-ignore
+webhook_config_hocon() ->
+"""
+bridges.webhook.a {
+  body = \"${.}\"
+  connect_timeout = 15s
+  enable = false
+  enable_pipelining = 100
+  headers {content-type = \"application/json\", jjjjjjjjjjjjjjjjjjj = jjjjjjj}
+  max_retries = 2
+  method = post
+  pool_size = 8
+  pool_type = random
+  resource_opts {
+    health_check_interval = 15s
+    inflight_window = 100
+    max_buffer_bytes = 1GB
+    query_mode = async
+    request_ttl = 45s
+    start_after_created = true
+    start_timeout = 5s
+    worker_pool_size = 4
+  }
+  ssl {
+    ciphers = []
+    depth = 10
+    enable = false
+    hibernate_after = 5s
+    log_level = notice
+    reuse_sessions = true
+    secure_renegotiate = true
+    verify = verify_peer
+    versions = [tlsv1.3, tlsv1.2]
+  }
+  url = \"http://some.host:4000/api/echo\"
+}
+""".

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 9 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -528,7 +528,8 @@ kafka_producer_converter(Config, _HoconOpts) ->
 
 consumer_topic_mapping_validator(_TopicMapping = []) ->
     {error, "There must be at least one Kafka-MQTT topic mapping"};
-consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
+    TopicMapping = [emqx_utils_maps:binary_key_map(TM) || TM <- TopicMapping0],
     NumEntries = length(TopicMapping),
     KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping],
     DistinctKafkaTopics = length(lists:usort(KafkaTopics)),
@@ -539,6 +540,13 @@ consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
             {error, "Kafka topics must not be repeated in a bridge"}
     end.
 
+producer_strategy_key_validator(
+    #{
+        partition_strategy := _,
+        message := #{key := _}
+    } = Conf
+) ->
+    producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
 producer_strategy_key_validator(#{
     <<"partition_strategy">> := key_dispatch,
     <<"message">> := #{<<"key">> := ""}

+ 21 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

@@ -166,11 +166,24 @@ message_key_dispatch_validations_test() ->
         ]},
         check(Conf)
     ),
+    %% ensure atoms exist
+    _ = [myproducer],
+    ?assertThrow(
+        {_, [
+            #{
+                path := "bridges.kafka.myproducer.kafka",
+                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
+            }
+        ]},
+        check_atom_key(Conf)
+    ),
     ok.
 
 tcp_keepalive_validation_test_() ->
     ProducerConf = parse(kafka_producer_new_hocon()),
     ConsumerConf = parse(kafka_consumer_hocon()),
+    %% ensure atoms exist
+    _ = [my_producer, my_consumer],
     test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
         test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).
 
@@ -184,7 +197,9 @@ test_keepalive_validation(Name, Conf) ->
     InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>),
     InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2],
     [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
-        [?_assertThrow(_, check(C)) || C <- InvalidConfs].
+        [?_assertMatch(#{bridges := _}, check_atom_key(C)) || C <- ValidConfs] ++
+        [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
+        [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
 
 %%===========================================================================
 %% Helper functions
@@ -194,9 +209,14 @@ parse(Hocon) ->
     {ok, Conf} = hocon:binary(Hocon),
     Conf.
 
+%% what bridge creation does
 check(Conf) when is_map(Conf) ->
     hocon_tconf:check_plain(emqx_bridge_schema, Conf).
 
+%% what bridge probe does
+check_atom_key(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
+
 %%===========================================================================
 %% Data section
 %%===========================================================================

+ 1 - 1
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_oracle, [
     {description, "EMQX Enterprise Oracle Database Bridge"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 0
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl

@@ -108,6 +108,8 @@ type_field(Type) ->
 name_field() ->
     {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
 
+config_validator(#{server := _} = Config) ->
+    config_validator(emqx_utils_maps:binary_key_map(Config));
 config_validator(#{<<"server">> := Server} = Config) when
     not is_map(Server) andalso
         not is_map_key(<<"sid">>, Config) andalso

+ 32 - 2
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -305,6 +305,8 @@ create_bridge_api(Config, Overrides) ->
         case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
             {ok, {Status, Headers, Body0}} ->
                 {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            {error, {Status, Headers, Body0}} ->
+                {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
             Error ->
                 Error
         end,
@@ -348,8 +350,12 @@ probe_bridge_api(Config, Overrides) ->
     ct:pal("probing bridge (via http): ~p", [Params]),
     Res =
         case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
-            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
-            Error -> Error
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} ->
+                {ok, Res0};
+            {error, {Status, Headers, Body0}} ->
+                {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
+            Error ->
+                Error
         end,
     ct:pal("bridge probe result: ~p", [Res]),
     Res.
@@ -630,6 +636,30 @@ t_no_sid_nor_service_name(Config0) ->
         {error, #{kind := validation_error, reason := "neither SID nor Service Name was set"}},
         create_bridge(Config)
     ),
+    ?assertMatch(
+        {error,
+            {{_, 400, _}, _, #{
+                <<"message">> := #{
+                    <<"kind">> := <<"validation_error">>,
+                    <<"reason">> := <<"neither SID nor Service Name was set">>,
+                    %% should be censored as it contains secrets
+                    <<"value">> := #{<<"password">> := <<"******">>}
+                }
+            }}},
+        create_bridge_api(Config)
+    ),
+    ?assertMatch(
+        {error,
+            {{_, 400, _}, _, #{
+                <<"message">> := #{
+                    <<"kind">> := <<"validation_error">>,
+                    <<"reason">> := <<"neither SID nor Service Name was set">>,
+                    %% should be censored as it contains secrets
+                    <<"value">> := #{<<"password">> := <<"******">>}
+                }
+            }}},
+        probe_bridge_api(Config)
+    ),
     ok.
 
 t_missing_table(Config) ->

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

+ 15 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl

@@ -220,6 +220,13 @@ conn_bridge_examples(_Method) ->
         }
     ].
 
+producer_strategy_key_validator(
+    #{
+        strategy := _,
+        message := #{key := _}
+    } = Conf
+) ->
+    producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
 producer_strategy_key_validator(#{
     <<"strategy">> := key_dispatch,
     <<"message">> := #{<<"key">> := ""}
@@ -257,7 +264,12 @@ override_default(OriginalFn, NewDefault) ->
 
 auth_union_member_selector(all_union_members) ->
     [none, ref(auth_basic), ref(auth_token)];
-auth_union_member_selector({value, V}) ->
+auth_union_member_selector({value, V0}) ->
+    V =
+        case is_map(V0) of
+            true -> emqx_utils_maps:binary_key_map(V0);
+            false -> V0
+        end,
     case V of
         #{<<"password">> := _} ->
             [ref(auth_basic)];
@@ -265,6 +277,8 @@ auth_union_member_selector({value, V}) ->
             [ref(auth_token)];
         <<"none">> ->
             [none];
+        none ->
+            [none];
         _ ->
             Expected = "none | basic | token",
             throw(#{

+ 40 - 2
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -40,6 +40,7 @@ groups() ->
 only_once_tests() ->
     [
         t_create_via_http,
+        t_strategy_key_validation,
         t_start_when_down,
         t_send_when_down,
         t_send_when_timeout,
@@ -313,6 +314,8 @@ create_bridge_api(Config, Overrides) ->
         case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
             {ok, {Status, Headers, Body0}} ->
                 {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            {error, {Status, Headers, Body0}} ->
+                {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
             Error ->
                 Error
         end,
@@ -356,8 +359,12 @@ probe_bridge_api(Config, Overrides) ->
     ct:pal("probing bridge (via http): ~p", [Params]),
     Res =
         case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
-            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
-            Error -> Error
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} ->
+                {ok, Res0};
+            {error, {Status, Headers, Body0}} ->
+                {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
+            Error ->
+                Error
         end,
     ct:pal("bridge probe result: ~p", [Res]),
     Res.
@@ -1074,6 +1081,37 @@ t_resource_manager_crash_before_producers_started(Config) ->
     ),
     ok.
 
+t_strategy_key_validation(Config) ->
+    ?assertMatch(
+        {error,
+            {{_, 400, _}, _, #{
+                <<"message">> :=
+                    #{
+                        <<"kind">> := <<"validation_error">>,
+                        <<"reason">> := <<"Message key cannot be empty", _/binary>>
+                    } = Msg
+            }}} when not is_map_key(<<"value">>, Msg),
+        probe_bridge_api(
+            Config,
+            #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
+        )
+    ),
+    ?assertMatch(
+        {error,
+            {{_, 400, _}, _, #{
+                <<"message">> :=
+                    #{
+                        <<"kind">> := <<"validation_error">>,
+                        <<"reason">> := <<"Message key cannot be empty", _/binary>>
+                    } = Msg
+            }}} when not is_map_key(<<"value">>, Msg),
+        create_bridge_api(
+            Config,
+            #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}}
+        )
+    ),
+    ok.
+
 t_cluster(Config0) ->
     ct:timetrap({seconds, 120}),
     ?retrying(Config0, 3, fun do_t_cluster/1).

+ 16 - 0
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl

@@ -35,6 +35,17 @@ pulsar_producer_validations_test() ->
         ]},
         check(Conf)
     ),
+    %% ensure atoms exist
+    _ = [my_producer],
+    ?assertThrow(
+        {_, [
+            #{
+                path := "bridges.pulsar_producer.my_producer",
+                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
+            }
+        ]},
+        check_atom_key(Conf)
+    ),
 
     ok.
 
@@ -46,9 +57,14 @@ parse(Hocon) ->
     {ok, Conf} = hocon:binary(Hocon),
     Conf.
 
+%% what bridge creation does
 check(Conf) when is_map(Conf) ->
     hocon_tconf:check_plain(emqx_bridge_schema, Conf).
 
+%% what bridge probe does
+check_atom_key(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
+
 %%===========================================================================
 %% Data section
 %%===========================================================================