Przeglądaj źródła

fix: support bridge v1 conf hocon-format in the put configs API.

zhongwencool 2 lat temu
rodzic
commit
a46b415c77

+ 1 - 0
apps/emqx/src/emqx_config.erl

@@ -94,6 +94,7 @@
 
 -export([ensure_atom_conf_path/2]).
 -export([load_config_files/2]).
+-export([upgrade_raw_conf/2]).
 
 -ifdef(TEST).
 -export([erase_all/0, backup_and_write/2]).

+ 19 - 3
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -233,7 +233,10 @@ load_config(Bin, Opts) when is_binary(Bin) ->
             {error, Reason}
     end.
 
-load_config_from_raw(RawConf, Opts) ->
+load_config_from_raw(RawConf0, Opts) ->
+    SchemaMod = emqx_conf:schema_module(),
+    RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
+    RawConf = emqx_config:fill_defaults(RawConf1),
     case check_config(RawConf) of
         ok ->
             Error =
@@ -452,8 +455,21 @@ sorted_fold(Func, Conf) ->
         Error -> {error, Error}
     end.
 
-to_sorted_list(Conf) ->
-    lists:keysort(1, maps:to_list(Conf)).
+to_sorted_list(Conf0) ->
+    %% connectors > actions/bridges > rule_engine
+    Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
+    {HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []),
+    HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)).
+
+split_high_priority_conf([], Conf0, Acc) ->
+    {lists:reverse(Acc), Conf0};
+split_high_priority_conf([Key | Keys], Conf0, Acc) ->
+    case maps:take(Key, Conf0) of
+        error ->
+            split_high_priority_conf(Keys, Conf0, Acc);
+        {Value, Conf1} ->
+            split_high_priority_conf(Keys, Conf1, [{Key, Value} | Acc])
+    end.
 
 merge_conf(Key, NewConf) ->
     OldConf = emqx_conf:get_raw([Key]),

+ 12 - 3
apps/emqx_conf/test/emqx_conf_cli_SUITE.erl

@@ -40,7 +40,7 @@ t_load_config(Config) ->
     ConfBin = hocon_pp:do(#{<<"authorization">> => #{<<"sources">> => []}}, #{}),
     ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
     ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
-    ?assertEqual(#{<<"sources">> => []}, emqx_conf:get_raw([Authz])),
+    ?assertMatch(#{<<"sources">> := []}, emqx_conf:get_raw([Authz])),
 
     ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
     ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
@@ -73,6 +73,10 @@ t_conflict_mix_conf(Config) ->
             AuthNInit = emqx_conf:get_raw([authentication]),
             Redis = #{
                 <<"backend">> => <<"redis">>,
+                <<"database">> => 0,
+                <<"password_hash_algorithm">> =>
+                    #{<<"name">> => <<"sha256">>, <<"salt_position">> => <<"prefix">>},
+                <<"pool_size">> => 8,
                 <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt">>,
                 <<"enable">> => false,
                 <<"mechanism">> => <<"password_based">>,
@@ -85,10 +89,15 @@ t_conflict_mix_conf(Config) ->
             ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
             %% init with redis sources
             ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
-            ?assertMatch([Redis], emqx_conf:get_raw([authentication])),
+            [RedisRaw] = emqx_conf:get_raw([authentication]),
+            ?assertEqual(
+                maps:to_list(Redis),
+                maps:to_list(maps:remove(<<"ssl">>, RedisRaw)),
+                {Redis, RedisRaw}
+            ),
             %% change redis type from single to cluster
             %% the server field will become servers field
-            RedisCluster = maps:remove(<<"server">>, Redis#{
+            RedisCluster = maps:without([<<"server">>, <<"database">>], Redis#{
                 <<"redis_type">> => cluster,
                 <<"servers">> => [<<"127.0.0.1:6379">>]
             }),

+ 3 - 0
apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl

@@ -813,6 +813,9 @@ to_schema(Body) ->
         post => #{requestBody => Body, responses => #{200 => <<"ok">>}}
     }.
 
+%% Don't warning hocon callback namespace/0 undef.
+namespace() -> atom_to_list(?MODULE).
+
 fields(good_ref) ->
     [
         {'webhook-host', mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:80">>})},

+ 99 - 0
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -32,11 +33,15 @@ end_per_suite(_) ->
 
 init_per_testcase(TestCase = t_configs_node, Config) ->
     ?MODULE:TestCase({'init', Config});
+init_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) ->
+    ?MODULE:TestCase({'init', Config});
 init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(TestCase = t_configs_node, Config) ->
     ?MODULE:TestCase({'end', Config});
+end_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) ->
+    ?MODULE:TestCase({'end', Config});
 end_per_testcase(_TestCase, Config) ->
     Config.
 
@@ -372,6 +377,100 @@ t_get_configs_in_different_accept(_Config) ->
     %% returns error if it set to other type
     ?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)).
 
+t_create_webhook_v1_bridges_api({'init', Config}) ->
+    application:ensure_all_started(emqx_connector),
+    application:ensure_all_started(emqx_bridge),
+    Config;
+t_create_webhook_v1_bridges_api({'end', _}) ->
+    application:stop(emqx_bridge),
+    application:stop(emqx_connector),
+    ok;
+t_create_webhook_v1_bridges_api(Config) ->
+    WebHookFile = filename:join(?config(data_dir, Config), "webhook_v1.conf"),
+    ?assertMatch({ok, _}, hocon:files([WebHookFile])),
+    {ok, WebHookBin} = file:read_file(WebHookFile),
+    ?assertEqual([], update_configs_with_binary(WebHookBin)),
+    Actions =
+        #{
+            <<"http">> =>
+                #{
+                    <<"webhook_name">> =>
+                        #{
+                            <<"connector">> => <<"connector_webhook_name">>,
+                            <<"description">> => <<>>,
+                            <<"enable">> => true,
+                            <<"parameters">> =>
+                                #{
+                                    <<"body">> => <<"{\"value\": \"${value}\"}">>,
+                                    <<"headers">> => #{},
+                                    <<"max_retries">> => 3,
+                                    <<"method">> => <<"post">>,
+                                    <<"path">> => <<>>
+                                },
+                            <<"resource_opts">> =>
+                                #{
+                                    <<"health_check_interval">> => <<"15s">>,
+                                    <<"inflight_window">> => 100,
+                                    <<"max_buffer_bytes">> => <<"256MB">>,
+                                    <<"query_mode">> => <<"async">>,
+                                    <<"request_ttl">> => <<"45s">>,
+                                    <<"worker_pool_size">> => 4
+                                }
+                        }
+                }
+        },
+    ?assertEqual(Actions, emqx_conf:get_raw([<<"actions">>])),
+    Connectors =
+        #{
+            <<"http">> =>
+                #{
+                    <<"connector_webhook_name">> =>
+                        #{
+                            <<"connect_timeout">> => <<"15s">>,
+                            <<"description">> => <<>>,
+                            <<"enable">> => true,
+                            <<"enable_pipelining">> => 100,
+                            <<"headers">> =>
+                                #{
+                                    <<"Authorization">> => <<"Bearer redacted">>,
+                                    <<"content-type">> => <<"application/json">>
+                                },
+                            <<"pool_size">> => 4,
+                            <<"pool_type">> => <<"random">>,
+                            <<"resource_opts">> =>
+                                #{
+                                    <<"health_check_interval">> => <<"15s">>,
+                                    <<"start_after_created">> => true,
+                                    <<"start_timeout">> => <<"5s">>
+                                },
+                            <<"ssl">> =>
+                                #{
+                                    <<"ciphers">> => [],
+                                    <<"depth">> => 10,
+                                    <<"enable">> => true,
+                                    <<"hibernate_after">> => <<"5s">>,
+                                    <<"log_level">> => <<"notice">>,
+                                    <<"reuse_sessions">> => true,
+                                    <<"secure_renegotiate">> => true,
+                                    <<"user_lookup_fun">> =>
+                                        <<"emqx_tls_psk:lookup">>,
+                                    <<"verify">> => <<"verify_none">>,
+                                    <<"versions">> =>
+                                        [
+                                            <<"tlsv1.3">>,
+                                            <<"tlsv1.2">>,
+                                            <<"tlsv1.1">>,
+                                            <<"tlsv1">>
+                                        ]
+                                },
+                            <<"url">> => <<"https://127.0.0.1:18083">>
+                        }
+                }
+        },
+    ?assertEqual(Connectors, emqx_conf:get_raw([<<"connectors">>])),
+    ?assertEqual(#{<<"webhook">> => #{}}, emqx_conf:get_raw([<<"bridges">>])),
+    ok.
+
 %% Helpers
 
 get_config(Name) ->

+ 36 - 0
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE_data/webhook_v1.conf

@@ -0,0 +1,36 @@
+bridges {
+  webhook {
+    webhook_name {
+        body = "{\"value\": \"${value}\"}"
+        connect_timeout = "15s"
+        enable = true
+        enable_pipelining = 100
+        headers {Authorization = "Bearer redacted", "content-type" = "application/json"}
+        max_retries = 3
+        method = "post"
+        pool_size = 4
+        pool_type = "random"
+        request_timeout = "15s"
+        resource_opts {
+            async_inflight_window = 100
+            auto_restart_interval = "60s"
+            enable_queue = false
+            health_check_interval = "15s"
+            max_queue_bytes = "1GB"
+            query_mode = "async"
+            worker_pool_size = 4
+        }
+        ssl {
+            ciphers = []
+            depth = 10
+            enable = true
+            reuse_sessions = true
+            secure_renegotiate = true
+            user_lookup_fun = "emqx_tls_psk:lookup"
+            verify = "verify_none"
+            versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
+        }
+        url = "https://127.0.0.1:18083"
+    }
+}
+}