Просмотр исходного кода

fix(bridges): support create resources with options

Shawn 3 лет назад
Родитель
Сommit
2872f0b668

+ 5 - 4
apps/emqx_bridge/src/emqx_bridge.erl

@@ -171,9 +171,9 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
         diff_confs(NewConf, OldConf),
     %% The config update will be failed if any task in `perform_bridge_changes` failed.
     Result = perform_bridge_changes([
-        {fun emqx_bridge_resource:remove/3, Removed},
-        {fun emqx_bridge_resource:create/3, Added},
-        {fun emqx_bridge_resource:update/3, Updated}
+        {fun emqx_bridge_resource:remove/4, Removed},
+        {fun emqx_bridge_resource:create/4, Added},
+        {fun emqx_bridge_resource:update/4, Updated}
     ]),
     ok = unload_hook(),
     ok = load_hook(NewConf),
@@ -261,7 +261,8 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
             ({_Type, _Name}, _Conf, {error, Reason}) ->
                 {error, Reason};
             ({Type, Name}, Conf, _) ->
-                case Action(Type, Name, Conf) of
+                ResOpts = emqx_resource:fetch_creation_opts(Conf),
+                case Action(Type, Name, Conf, ResOpts) of
                     {error, Reason} -> {error, Reason};
                     Return -> Return
                 end

+ 7 - 4
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -158,11 +158,14 @@ recreate(Type, Name) ->
     recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
 
 recreate(Type, Name, Conf) ->
+    recreate(Type, Name, Conf, #{}).
+
+recreate(Type, Name, Conf, Opts) ->
     emqx_resource:recreate_local(
         resource_id(Type, Name),
         bridge_to_resource_type(Type),
         parse_confs(Type, Name, Conf),
-        #{auto_retry_interval => 60000}
+        Opts#{auto_retry_interval => 60000}
     ).
 
 create_dry_run(Type, Conf) ->
@@ -186,13 +189,13 @@ create_dry_run(Type, Conf) ->
 
 remove(BridgeId) ->
     {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
-    remove(BridgeType, BridgeName, #{}).
+    remove(BridgeType, BridgeName, #{}, #{}).
 
 remove(Type, Name) ->
-    remove(Type, Name, undefined).
+    remove(Type, Name, #{}, #{}).
 
 %% just for perform_bridge_changes/1
-remove(Type, Name, _Conf) ->
+remove(Type, Name, _Conf, _Opts) ->
     ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
     case emqx_resource:remove_local(resource_id(Type, Name)) of
         ok -> ok;

+ 0 - 28
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -1,32 +1,4 @@
 emqx_resource_schema {
-  batch {
-    desc {
-      en: """
-Configuration of batch query.</br>
-Batch requests are made immediately when the number of requests reaches the `batch_size`, or also immediately when the number of requests is less than the batch request size but the maximum batch_time has been reached.
-"""
-      zh: """
-批量请求配置。</br>
-请求数达到批量请求大小时立刻进行批量请求,或当请求数不足批量请求数大小,但已经达到最大批量等待时间时也立即进行批量请求。
-"""
-    }
-    label {
-      en: """batch"""
-      zh: """批量请求"""
-    }
-  }
-
-  queue {
-    desc {
-      en: """Configuration of queue."""
-      zh: """请求队列配置"""
-    }
-    label {
-      en: """queue"""
-      zh: """请求队列"""
-    }
-  }
-
   query_mode {
     desc {
       en: """Query mode. Optional 'sync/async', default 'sync'."""

+ 10 - 2
apps/emqx_resource/include/emqx_resource.hrl

@@ -40,7 +40,7 @@
     metrics := emqx_metrics_worker:metrics()
 }.
 -type resource_group() :: binary().
--type create_opts() :: #{
+-type creation_opts() :: #{
     health_check_interval => integer(),
     health_check_timeout => integer(),
     %% We can choose to block the return of emqx_resource:start until
@@ -52,7 +52,15 @@
     start_after_created => boolean(),
     %% If the resource disconnected, we can set to retry starting the resource
     %% periodically.
-    auto_retry_interval => integer()
+    auto_retry_interval => integer(),
+    enable_batch => boolean(),
+    batch_size => integer(),
+    batch_time => integer(),
+    enable_queue => boolean(),
+    queue_max_bytes => integer(),
+    query_mode => async | sync | dynamic
+    resume_interval => integer(),
+    async_inflight_window => integer()
 }.
 -type query_result() ::
     ok

+ 30 - 10
apps/emqx_resource/src/emqx_resource.erl

@@ -102,6 +102,7 @@
     list_instances_verbose/0,
     %% return the data of the instance
     get_instance/1,
+    fetch_creation_opts/1,
     %% return all the instances of the same resource type
     list_instances_by_type/1,
     generate_id/1,
@@ -159,7 +160,7 @@ is_resource_mod(Module) ->
 create(ResId, Group, ResourceType, Config) ->
     create(ResId, Group, ResourceType, Config, #{}).
 
--spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
+-spec create(resource_id(), resource_group(), resource_type(), resource_config(), creation_opts()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create(ResId, Group, ResourceType, Config, Opts) ->
     emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts).
@@ -175,7 +176,7 @@ create_local(ResId, Group, ResourceType, Config) ->
     resource_group(),
     resource_type(),
     resource_config(),
-    create_opts()
+    creation_opts()
 ) ->
     {ok, resource_data()}.
 create_local(ResId, Group, ResourceType, Config, Opts) ->
@@ -196,7 +197,7 @@ create_dry_run_local(ResourceType, Config) ->
 recreate(ResId, ResourceType, Config) ->
     recreate(ResId, ResourceType, Config, #{}).
 
--spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) ->
+-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate(ResId, ResourceType, Config, Opts) ->
     emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts).
@@ -206,7 +207,7 @@ recreate(ResId, ResourceType, Config, Opts) ->
 recreate_local(ResId, ResourceType, Config) ->
     recreate_local(ResId, ResourceType, Config, #{}).
 
--spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) ->
+-spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate_local(ResId, ResourceType, Config, Opts) ->
     emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts).
@@ -249,7 +250,7 @@ simple_async_query(ResId, Request, ReplyFun) ->
 start(ResId) ->
     start(ResId, #{}).
 
--spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
 start(ResId, Opts) ->
     emqx_resource_manager:start(ResId, Opts).
 
@@ -257,7 +258,7 @@ start(ResId, Opts) ->
 restart(ResId) ->
     restart(ResId, #{}).
 
--spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
 restart(ResId, Opts) ->
     emqx_resource_manager:restart(ResId, Opts).
 
@@ -277,6 +278,25 @@ set_resource_status_connecting(ResId) ->
 get_instance(ResId) ->
     emqx_resource_manager:lookup(ResId).
 
+-spec fetch_creation_opts(map()) -> creation_opts().
+fetch_creation_opts(Opts) ->
+    SupportedOpts = [
+        health_check_interval,
+        health_check_timeout,
+        wait_for_resource_ready,
+        start_after_created,
+        auto_retry_interval,
+        enable_batch,
+        batch_size,
+        batch_time,
+        enable_queue,
+        queue_max_bytes,
+        query_mode,
+        resume_interval,
+        async_inflight_window
+    ],
+    maps:with(creation_opts(), SupportedOpts).
+
 -spec list_instances() -> [resource_id()].
 list_instances() ->
     [Id || #{id := Id} <- list_instances_verbose()].
@@ -341,7 +361,7 @@ check_and_create(ResId, Group, ResourceType, RawConfig) ->
     resource_group(),
     resource_type(),
     raw_resource_config(),
-    create_opts()
+    creation_opts()
 ) ->
     {ok, resource_data() | 'already_created'} | {error, term()}.
 check_and_create(ResId, Group, ResourceType, RawConfig, Opts) ->
@@ -366,7 +386,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) ->
     resource_group(),
     resource_type(),
     raw_resource_config(),
-    create_opts()
+    creation_opts()
 ) -> {ok, resource_data()} | {error, term()}.
 check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
     check_and_do(
@@ -379,7 +399,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
     resource_id(),
     resource_type(),
     raw_resource_config(),
-    create_opts()
+    creation_opts()
 ) ->
     {ok, resource_data()} | {error, term()}.
 check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
@@ -393,7 +413,7 @@ check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
     resource_id(),
     resource_type(),
     raw_resource_config(),
-    create_opts()
+    creation_opts()
 ) ->
     {ok, resource_data()} | {error, term()}.
 check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) ->

+ 4 - 4
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -85,7 +85,7 @@ manager_id_to_resource_id(MgrId) ->
     resource_group(),
     resource_type(),
     resource_config(),
-    create_opts()
+    creation_opts()
 ) -> {ok, resource_data()}.
 ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
     case lookup(ResId) of
@@ -97,7 +97,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
     end.
 
 %% @doc Called from emqx_resource when recreating a resource which may or may not exist
--spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) ->
+-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) ->
     {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
 recreate(ResId, ResourceType, NewConfig, Opts) ->
     case lookup(ResId) of
@@ -166,7 +166,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) ->
     safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION).
 
 %% @doc Stops and then starts an instance that was already running
--spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
 restart(ResId, Opts) when is_binary(ResId) ->
     case safe_call(ResId, restart, ?T_OPERATION) of
         ok ->
@@ -177,7 +177,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
     end.
 
 %% @doc Start the resource
--spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
 start(ResId, Opts) ->
     case safe_call(ResId, start, ?T_OPERATION) of
         ok ->

+ 2 - 2
apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl

@@ -38,7 +38,7 @@ introduced_in() ->
     resource_group(),
     resource_type(),
     resource_config(),
-    create_opts()
+    creation_opts()
 ) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create(ResId, Group, ResourceType, Config, Opts) ->
@@ -58,7 +58,7 @@ create_dry_run(ResourceType, Config) ->
     resource_id(),
     resource_type(),
     resource_config(),
-    create_opts()
+    creation_opts()
 ) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate(ResId, ResourceType, Config, Opts) ->

+ 2 - 10
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -30,22 +30,14 @@ namespace() -> "resource_schema".
 
 roots() -> [].
 
-fields('batch&async&queue') ->
+fields('creation_opts') ->
     [
         {query_mode, fun query_mode/1},
         {resume_interval, fun resume_interval/1},
         {async_inflight_window, fun async_inflight_window/1},
-        {batch, mk(ref(?MODULE, batch), #{desc => ?DESC("batch")})},
-        {queue, mk(ref(?MODULE, queue), #{desc => ?DESC("queue")})}
-    ];
-fields(batch) ->
-    [
         {enable_batch, fun enable_batch/1},
         {batch_size, fun batch_size/1},
-        {batch_time, fun batch_time/1}
-    ];
-fields(queue) ->
-    [
+        {batch_time, fun batch_time/1},
         {enable_queue, fun enable_queue/1},
         {max_queue_bytes, fun queue_max_bytes/1}
     ].

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -111,7 +111,7 @@ fields(Name) when
     Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2
 ->
     fields(basic) ++
-        emqx_resource_schema:fields('batch&async&queue') ++
+        emqx_resource_schema:fields('creation_opts') ++
         connector_field(Name).
 
 method_fileds(post, ConnectorType) ->