Просмотр исходного кода

Merge pull request #12071 from zmstone/1201-sync-release-54

1201 sync release 54
Zaiming (Stone) Shi 2 лет назад
Родитель
Сommit
956b25cc4c
35 измененных файлов с 813 добавлено и 253 удалено
  1. 4 4
      .ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf
  2. 4 4
      .ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf
  3. 2 2
      apps/emqx/rebar.config
  4. 1 1
      apps/emqx/rebar.config.script
  5. 1 1
      apps/emqx/src/emqx_quic_stream.erl
  6. 7 8
      apps/emqx/test/emqx_quic_multistreams_SUITE.erl
  7. 1 1
      apps/emqx_auth_redis/src/emqx_auth_redis.app.src
  8. 2 6
      apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl
  9. 2 10
      apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl
  10. 2 1
      apps/emqx_bridge/src/emqx_action_info.erl
  11. 3 3
      apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl
  12. 3 1
      apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src
  13. 11 15
      apps/emqx_bridge_redis/src/emqx_bridge_redis.erl
  14. 98 0
      apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl
  15. 90 49
      apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl
  16. 276 0
      apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl
  17. 41 34
      apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl
  18. 3 0
      apps/emqx_connector/src/emqx_connector.app.src
  19. 14 2
      apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
  20. 27 12
      apps/emqx_connector/src/schema/emqx_connector_schema.erl
  21. 1 1
      apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
  22. 11 5
      apps/emqx_management/src/emqx_mgmt_api_data_backup.erl
  23. 15 5
      apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl
  24. 100 76
      apps/emqx_redis/src/emqx_redis.erl
  25. 1 1
      apps/emqx_redis/test/emqx_redis_SUITE.erl
  26. 5 1
      apps/emqx_rule_engine/src/emqx_rule_engine.app.src
  27. 2 0
      changes/ce/feat-12040.en.md
  28. 2 2
      mix.exs
  29. 1 1
      rebar.config
  30. 1 1
      rebar.config.erl
  31. 7 2
      rel/i18n/emqx_bridge_redis.hocon
  32. 41 0
      rel/i18n/emqx_bridge_redis_schema.hocon
  33. 6 0
      rel/i18n/emqx_connector_schema.hocon
  34. 14 0
      rel/i18n/emqx_redis.hocon
  35. 14 4
      scripts/pre-compile.sh

+ 4 - 4
.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf

@@ -1,7 +1,7 @@
 sentinel resolve-hostnames yes
 bind :: 0.0.0.0
 
-sentinel monitor mymaster redis-sentinel-master 6379 1
-sentinel auth-pass mymaster public
-sentinel down-after-milliseconds mymaster 10000
-sentinel failover-timeout mymaster 20000
+sentinel monitor mytcpmaster redis-sentinel-master 6379 1
+sentinel auth-pass mytcpmaster public
+sentinel down-after-milliseconds mytcpmaster 10000
+sentinel failover-timeout mytcpmaster 20000

+ 4 - 4
.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf

@@ -8,7 +8,7 @@ tls-key-file /etc/certs/key.pem
 tls-ca-cert-file /etc/certs/cacert.pem
 tls-auth-clients no
 
-sentinel monitor mymaster redis-sentinel-tls-master 6389 1
-sentinel auth-pass mymaster public
-sentinel down-after-milliseconds mymaster 10000
-sentinel failover-timeout mymaster 20000
+sentinel monitor mytlsmaster redis-sentinel-tls-master 6389 1
+sentinel auth-pass mytlsmaster public
+sentinel down-after-milliseconds mytlsmaster 10000
+sentinel failover-timeout mytlsmaster 20000

+ 2 - 2
apps/emqx/rebar.config

@@ -45,7 +45,7 @@
             {meck, "0.9.2"},
             {proper, "1.4.0"},
             {bbmustache, "1.10.0"},
-            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.6"}}}
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
         ]},
         {extra_src_dirs, [{"test", [recursive]},
                           {"integration_test", [recursive]}]}
@@ -55,7 +55,7 @@
             {meck, "0.9.2"},
             {proper, "1.4.0"},
             {bbmustache, "1.10.0"},
-            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.6"}}}
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
         ]},
         {extra_src_dirs, [{"test", [recursive]}]}
     ]}

+ 1 - 1
apps/emqx/rebar.config.script

@@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
 end,
 
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
-Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}.
+Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}.
 
 Dialyzer = fun(Config) ->
     {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

+ 1 - 1
apps/emqx/src/emqx_quic_stream.erl

@@ -184,7 +184,7 @@ peer_send_aborted(Stream, ErrorCode, S) ->
 
 -spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret().
 peer_send_shutdown(Stream, undefined, S) ->
-    ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
+    _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
     {ok, S}.
 
 -spec send_complete(stream_handle(), boolean(), cb_data()) -> cb_ret().

+ 7 - 8
apps/emqx/test/emqx_quic_multistreams_SUITE.erl

@@ -669,22 +669,21 @@ t_multi_streams_packet_malform(Config) ->
     case quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>) of
         {ok, 10} -> ok;
         {error, cancelled} -> ok;
-        {error, stm_send_error, aborted} -> ok
+        {error, stm_send_error, aborted} -> ok;
+        {error, closed} -> ok
     end,
 
     ?assert(is_list(emqtt:info(C))),
-
-    {error, stm_send_error, _} =
+    {error, closed} =
         snabbkaffe:retry(
             10000,
             10,
             fun() ->
-                {error, stm_send_error, _} = quicer:send(
+                {error, closed} = quicer:send(
                     MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>
                 )
             end
         ),
-
     ?assert(is_list(emqtt:info(C))),
 
     ok = emqtt:disconnect(C).
@@ -770,9 +769,9 @@ t_multi_streams_packet_too_large(Config) ->
     timeout = recv_pub(1),
     ?assert(is_list(emqtt:info(C))),
 
-    %% Connection could be kept
-    {error, stm_send_error, _} = quicer:send(via_stream(PubVia), <<1>>),
-    {error, stm_send_error, _} = quicer:send(via_stream(PubVia2), <<1>>),
+    %% Connection could be kept but data stream are closed!
+    {error, closed} = quicer:send(via_stream(PubVia), <<1>>),
+    {error, closed} = quicer:send(via_stream(PubVia2), <<1>>),
     %% We could send data over new stream
     {ok, PubVia3} = emqtt:start_data_stream(C, []),
     ok = emqtt:publish_async(

+ 1 - 1
apps/emqx_auth_redis/src/emqx_auth_redis.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_redis, [
     {description, "EMQX Redis Authentication and Authorization"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {mod, {emqx_auth_redis_app, []}},
     {applications, [

+ 2 - 6
apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl

@@ -64,12 +64,8 @@ refs(_) ->
         expected => "single | cluster | sentinel"
     }).
 
-fields(redis_single) ->
-    common_fields() ++ emqx_redis:fields(single);
-fields(redis_cluster) ->
-    common_fields() ++ emqx_redis:fields(cluster);
-fields(redis_sentinel) ->
-    common_fields() ++ emqx_redis:fields(sentinel).
+fields(Type) ->
+    common_fields() ++ emqx_redis:fields(Type).
 
 desc(redis_single) ->
     ?DESC(single);

+ 2 - 10
apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl

@@ -34,17 +34,9 @@ namespace() -> "authz".
 
 type() -> ?AUTHZ_TYPE.
 
-fields(redis_single) ->
+fields(Type) ->
     emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
-        emqx_redis:fields(single) ++
-        [{cmd, cmd()}];
-fields(redis_sentinel) ->
-    emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
-        emqx_redis:fields(sentinel) ++
-        [{cmd, cmd()}];
-fields(redis_cluster) ->
-    emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
-        emqx_redis:fields(cluster) ++
+        emqx_redis:fields(Type) ++
         [{cmd, cmd()}].
 
 desc(redis_single) ->

+ 2 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -81,7 +81,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_mongodb_action_info,
         emqx_bridge_pgsql_action_info,
         emqx_bridge_syskeeper_action_info,
-        emqx_bridge_timescale_action_info
+        emqx_bridge_timescale_action_info,
+        emqx_bridge_redis_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

+ 3 - 3
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -242,17 +242,17 @@ schema_homogeneous_test() ->
 is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
     Fields = Module:fields(TypeName),
     ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()),
-    MissingFileds = lists:filter(
+    MissingFields = lists:filter(
         fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
     ),
-    case MissingFileds of
+    case MissingFields of
         [] ->
             false;
         _ ->
             {true, #{
                 schema_module => Module,
                 type_name => TypeName,
-                missing_fields => MissingFileds
+                missing_fields => MissingFields
             }}
     end.
 

+ 3 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src

@@ -9,7 +9,9 @@
         emqx_resource,
         emqx_redis
     ]},
-    {env, []},
+    {env, [
+        {emqx_action_info_modules, [emqx_bridge_redis_action_info]}
+    ]},
     {modules, []},
     {links, []}
 ]}.

+ 11 - 15
apps/emqx_bridge_redis/src/emqx_bridge_redis.erl

@@ -8,9 +8,9 @@
 
 -import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
 
--export([
-    conn_bridge_examples/1
-]).
+-export([conn_bridge_examples/1]).
+
+-export([type_name_fields/1, connector_fields/1]).
 
 -export([
     namespace/0,
@@ -100,6 +100,8 @@ namespace() -> "bridge_redis".
 
 roots() -> [].
 
+fields(action_parameters) ->
+    [{command_template, fun command_template/1}];
 fields("post_single") ->
     method_fields(post, redis_single);
 fields("post_sentinel") ->
@@ -142,21 +144,13 @@ method_fields(put, ConnectorType) ->
 redis_bridge_common_fields(Type) ->
     emqx_bridge_schema:common_bridge_fields() ++
         [
-            {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
-            {command_template, fun command_template/1}
+            {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}
+            | fields(action_parameters)
         ] ++
         resource_fields(Type).
 
 connector_fields(Type) ->
-    RedisType = bridge_type_to_redis_conn_type(Type),
-    emqx_redis:fields(RedisType).
-
-bridge_type_to_redis_conn_type(redis_single) ->
-    single;
-bridge_type_to_redis_conn_type(redis_sentinel) ->
-    sentinel;
-bridge_type_to_redis_conn_type(redis_cluster) ->
-    cluster.
+    emqx_redis:fields(Type).
 
 type_name_fields(Type) ->
     [
@@ -168,7 +162,7 @@ resource_fields(Type) ->
     [
         {resource_opts,
             mk(
-                ref("creation_opts_" ++ atom_to_list(Type)),
+                ?R_REF("creation_opts_" ++ atom_to_list(Type)),
                 #{
                     required => false,
                     default => #{},
@@ -185,6 +179,8 @@ resource_creation_fields("redis_cluster") ->
 resource_creation_fields(_) ->
     emqx_resource_schema:fields("creation_opts").
 
+desc(action_parameters) ->
+    ?DESC("desc_action_parameters");
 desc("config") ->
     ?DESC("desc_config");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->

+ 98 - 0
apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl

@@ -0,0 +1,98 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_redis_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0,
+    bridge_v1_config_to_action_config/2,
+    connector_action_config_to_bridge_v1_config/2,
+    bridge_v1_config_to_connector_config/1,
+    bridge_v1_type_name_fun/1
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(SCHEMA_MODULE, emqx_bridge_redis_schema).
+-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
+
+action_type_name() -> redis.
+
+connector_type_name() -> redis.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    fix_v1_type(
+        maps:merge(
+            maps:without(
+                [<<"connector">>],
+                map_unindent(<<"parameters">>, ActionConfig)
+            ),
+            map_unindent(<<"parameters">>, ConnectorConfig)
+        )
+    ).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
+    ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    ActionConfig#{<<"connector">> => ConnectorName}.
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
+    ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ConnectorTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields("config_connector")),
+    %% Need put redis_type into parameter.
+    %% cluster need type to filter resource_opts
+    ConnectorKeys =
+        (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++
+            [<<"redis_type">>],
+    ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
+    make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
+
+%%------------------------------------------------------------------------------------------
+%% Internal helper fns
+%%------------------------------------------------------------------------------------------
+
+bridge_v1_type_name() ->
+    {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
+bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
+    v1_type(Type).
+
+fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
+    Conf#{<<"type">> => v1_type(RedisType)}.
+
+v1_type(<<"single">>) -> redis_single;
+v1_type(<<"sentinel">>) -> redis_sentinel;
+v1_type(<<"cluster">>) -> redis_cluster.
+
+bridge_v1_type_names() -> [redis_single, redis_sentinel, redis_cluster].
+
+map_unindent(Key, Map) ->
+    maps:merge(
+        maps:get(Key, Map),
+        maps:remove(Key, Map)
+    ).
+
+map_indent(IndentKey, PickKeys, Map) ->
+    maps:put(
+        IndentKey,
+        maps:with(PickKeys, Map),
+        maps:without(PickKeys, Map)
+    ).
+
+schema_keys(Schema) ->
+    [bin(Key) || {Key, _} <- Schema].
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config),
+    map_indent(<<"parameters">>, IndentKeys, Conf0).

+ 90 - 49
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -4,6 +4,7 @@
 -module(emqx_bridge_redis_connector).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -behaviour(emqx_resource).
@@ -11,11 +12,15 @@
 %% callbacks of behaviour emqx_resource
 -export([
     callback_mode/0,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
     on_start/2,
     on_stop/2,
     on_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_get_channel_status/3
 ]).
 
 %% -------------------------------------------------------------------------------------------------
@@ -24,7 +29,34 @@
 
 callback_mode() -> always_sync.
 
-on_start(InstId, #{command_template := CommandTemplate} = Config) ->
+on_add_channel(
+    _InstanceId,
+    State = #{channels := Channels},
+    ChannelId,
+    #{
+        parameters := #{
+            command_template := Template
+        }
+    }
+) ->
+    Channels2 = Channels#{
+        ChannelId => #{template => preproc_command_template(Template)}
+    },
+    {ok, State#{channels => Channels2}}.
+
+on_remove_channel(_InstanceId, State = #{channels := Channels}, ChannelId) ->
+    {ok, State#{channels => maps:remove(ChannelId, Channels)}}.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_get_channel_status(_ConnectorResId, ChannelId, #{channels := Channels}) ->
+    case maps:is_key(ChannelId, Channels) of
+        true -> ?status_connected;
+        false -> ?status_disconnected
+    end.
+
+on_start(InstId, Config) ->
     case emqx_redis:on_start(InstId, Config) of
         {ok, RedisConnSt} ->
             ?tp(
@@ -33,7 +65,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) ->
             ),
             {ok, #{
                 conn_st => RedisConnSt,
-                command_template => preproc_command_template(CommandTemplate)
+                channels => #{}
             }};
         {error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error ->
             ?tp(
@@ -57,14 +89,8 @@ on_stop(InstId, undefined = _State) ->
 on_get_status(InstId, #{conn_st := RedisConnSt}) ->
     emqx_redis:on_get_status(InstId, RedisConnSt).
 
-on_query(
-    InstId,
-    {send_message, Data},
-    _State = #{
-        command_template := CommandTemplate, conn_st := RedisConnSt
-    }
-) ->
-    Cmd = proc_command_template(CommandTemplate, Data),
+%% raw cmd without template, for CI test
+on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) ->
     ?tp(
         redis_bridge_connector_cmd,
         #{cmd => Cmd, batch => false, mode => sync}
@@ -77,45 +103,68 @@ on_query(
     Result;
 on_query(
     InstId,
-    Query,
-    _State = #{conn_st := RedisConnSt}
+    {_MessageTag, _Data} = Msg,
+    #{channels := Channels, conn_st := RedisConnSt}
 ) ->
-    ?tp(
-        redis_bridge_connector_query,
-        #{query => Query, batch => false, mode => sync}
-    ),
-    Result = query(InstId, Query, RedisConnSt),
-    ?tp(
-        redis_bridge_connector_send_done,
-        #{query => Query, batch => false, mode => sync, result => Result}
-    ),
-    Result.
+    case try_render_message([Msg], Channels) of
+        {ok, [Cmd]} ->
+            ?tp(
+                redis_bridge_connector_cmd,
+                #{cmd => Cmd, batch => false, mode => sync}
+            ),
+            Result = query(InstId, {cmd, Cmd}, RedisConnSt),
+            ?tp(
+                redis_bridge_connector_send_done,
+                #{cmd => Cmd, batch => false, mode => sync, result => Result}
+            ),
+            Result;
+        Error ->
+            Error
+    end.
 
 on_batch_query(
-    InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt}
+    InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt}
 ) ->
-    Cmds = process_batch_data(BatchData, CommandTemplate),
-    ?tp(
-        redis_bridge_connector_send,
-        #{batch_data => BatchData, batch => true, mode => sync}
-    ),
-    Result = query(InstId, {cmds, Cmds}, RedisConnSt),
-    ?tp(
-        redis_bridge_connector_send_done,
-        #{
-            batch_data => BatchData,
-            batch_size => length(BatchData),
-            batch => true,
-            mode => sync,
-            result => Result
-        }
-    ),
-    Result.
+    case try_render_message(BatchData, Channels) of
+        {ok, Cmds} ->
+            ?tp(
+                redis_bridge_connector_send,
+                #{batch_data => BatchData, batch => true, mode => sync}
+            ),
+            Result = query(InstId, {cmds, Cmds}, RedisConnSt),
+            ?tp(
+                redis_bridge_connector_send_done,
+                #{
+                    batch_data => BatchData,
+                    batch_size => length(BatchData),
+                    batch => true,
+                    mode => sync,
+                    result => Result
+                }
+            ),
+            Result;
+        Error ->
+            Error
+    end.
 
 %% -------------------------------------------------------------------------------------------------
 %% private helpers
 %% -------------------------------------------------------------------------------------------------
 
+try_render_message(Datas, Channels) ->
+    try_render_message(Datas, Channels, []).
+
+try_render_message([{MessageTag, Data} | T], Channels, Acc) ->
+    case maps:find(MessageTag, Channels) of
+        {ok, #{template := Template}} ->
+            Msg = proc_command_template(Template, Data),
+            try_render_message(T, Channels, [Msg | Acc]);
+        _ ->
+            {error, {unrecoverable_error, {invalid_message_tag, MessageTag}}}
+    end;
+try_render_message([], _Channels, Acc) ->
+    {ok, lists:reverse(Acc)}.
+
 query(InstId, Query, RedisConnSt) ->
     case emqx_redis:on_query(InstId, Query, RedisConnSt) of
         {ok, _} = Ok -> Ok;
@@ -123,14 +172,6 @@ query(InstId, Query, RedisConnSt) ->
         {error, _} = Error -> Error
     end.
 
-process_batch_data(BatchData, CommandTemplate) ->
-    lists:map(
-        fun({send_message, Data}) ->
-            proc_command_template(CommandTemplate, Data)
-        end,
-        BatchData
-    ).
-
 proc_command_template(CommandTemplate, Msg) ->
     lists:map(
         fun(ArgTks) ->

+ 276 - 0
apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl

@@ -0,0 +1,276 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_redis_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-define(TYPE, redis).
+
+%% `hocon_schema' API
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1,
+    resource_opts_converter/2
+]).
+
+%% `emqx_bridge_v2_schema' "unofficial" API
+-export([
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    ?TYPE.
+
+roots() ->
+    [].
+
+%%=========================================
+%% Action fields
+%%=========================================
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        [
+            {parameters,
+                ?HOCON(
+                    hoconsc:union([
+                        ?R_REF(emqx_redis, redis_single_connector),
+                        ?R_REF(emqx_redis, redis_sentinel_connector),
+                        ?R_REF(emqx_redis, redis_cluster_connector)
+                    ]),
+                    #{required => true, desc => ?DESC(redis_parameters)}
+                )}
+        ] ++
+        emqx_redis:redis_fields() ++
+        emqx_connector_schema_lib:ssl_fields();
+fields(action) ->
+    {?TYPE,
+        ?HOCON(
+            ?MAP(name, ?R_REF(redis_action)),
+            #{
+                desc => <<"Redis Action Config">>,
+                converter => fun ?MODULE:resource_opts_converter/2,
+                required => false
+            }
+        )};
+fields(redis_action) ->
+    Schema =
+        emqx_bridge_v2_schema:make_producer_action_schema(
+            ?HOCON(
+                ?R_REF(emqx_bridge_redis, action_parameters),
+                #{
+                    required => true,
+                    desc => ?DESC(producer_action)
+                }
+            )
+        ),
+    ResOpts =
+        {resource_opts,
+            ?HOCON(
+                ?R_REF(resource_opts),
+                #{
+                    required => true,
+                    desc => ?DESC(emqx_resource_schema, resource_opts)
+                }
+            )},
+    RedisType =
+        {redis_type,
+            ?HOCON(
+                ?ENUM([single, sentinel, cluster]),
+                #{required => true, desc => ?DESC(redis_type)}
+            )},
+    [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)];
+fields(resource_opts) ->
+    emqx_resource_schema:create_opts([
+        {batch_size, #{desc => ?DESC(batch_size)}},
+        {batch_time, #{desc => ?DESC(batch_time)}}
+    ]);
+%%=========================================
+%% HTTP API fields
+%%=========================================
+fields("post_connector") ->
+    emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("config_connector");
+fields("put_connector") ->
+    fields("config_connector");
+fields("get_connector") ->
+    emqx_bridge_schema:status_fields() ++
+        fields("post_connector");
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
+fields("post_bridge_v2") ->
+    emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_bridge_v2");
+fields("put_bridge_v2") ->
+    fields(redis_action);
+fields("get_single") ->
+    emqx_bridge_schema:status_fields() ++ fields("put_single");
+fields("put_single") ->
+    fields("config_connector");
+fields("post_single") ->
+    emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_single").
+
+desc("config_connector") ->
+    ?DESC(emqx_bridge_redis, "desc_config");
+desc(redis_action) ->
+    ?DESC(redis_action);
+desc(resource_opts) ->
+    ?DESC(emqx_resource_schema, resource_opts);
+desc(_Name) ->
+    undefined.
+
+resource_opts_converter(undefined, _Opts) ->
+    undefined;
+resource_opts_converter(Conf, _Opts) ->
+    maps:map(
+        fun(_Name, SubConf) ->
+            case SubConf of
+                #{<<"redis_type">> := <<"cluster">>} ->
+                    ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}),
+                    %% cluster don't support batch
+                    SubConf#{
+                        <<"resource_opts">> =>
+                            ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>}
+                    };
+                _ ->
+                    SubConf
+            end
+        end,
+        Conf
+    ).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_bridge_v2_schema' "unofficial" API
+%%-------------------------------------------------------------------------------------------------
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"redis_single_producer">> => #{
+                summary => <<"Redis Single Producer Action">>,
+                value => action_example(single, Method)
+            }
+        },
+        #{
+            <<"redis_sentinel_producer">> => #{
+                summary => <<"Redis Sentinel Producer Action">>,
+                value => action_example(sentinel, Method)
+            }
+        },
+        #{
+            <<"redis_cluster_producer">> => #{
+                summary => <<"Redis Cluster Producer Action">>,
+                value => action_example(cluster, Method)
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"redis_single_producer">> => #{
+                summary => <<"Redis Single Producer Connector">>,
+                value => connector_example(single, Method)
+            }
+        },
+        #{
+            <<"redis_cluster_producer">> => #{
+                summary => <<"Redis Cluster Producer Connector">>,
+                value => connector_example(cluster, Method)
+            }
+        },
+        #{
+            <<"redis_sentinel_producer">> => #{
+                summary => <<"Redis Sentinel Producer Connector">>,
+                value => connector_example(sentinel, Method)
+            }
+        }
+    ].
+
+conn_bridge_examples(Method) ->
+    emqx_bridge_redis:conn_bridge_examples(Method).
+
+action_example(RedisType, post) ->
+    maps:merge(
+        action_example(RedisType, put),
+        #{
+            type => <<"redis">>,
+            name => <<"my_action">>
+        }
+    );
+action_example(RedisType, get) ->
+    maps:merge(
+        action_example(RedisType, put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+action_example(RedisType, put) ->
+    #{
+        redis_type => RedisType,
+        enable => true,
+        connector => <<"my_connector_name">>,
+        description => <<"My action">>,
+        parameters => #{
+            command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
+        },
+        resource_opts => #{batch_size => 1}
+    }.
+
+connector_example(RedisType, get) ->
+    maps:merge(
+        connector_example(RedisType, put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+connector_example(RedisType, post) ->
+    maps:merge(
+        connector_example(RedisType, put),
+        #{
+            type => <<"redis_single_producer">>,
+            name => <<"my_connector">>
+        }
+    );
+connector_example(RedisType, put) ->
+    #{
+        enable => true,
+        desc => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>,
+        parameters => connector_parameter(RedisType),
+        pool_size => 8,
+        database => 1,
+        username => <<"test">>,
+        password => <<"******">>,
+        auto_reconnect => true,
+        ssl => #{enable => false}
+    }.
+
+connector_parameter(single) ->
+    #{redis_type => single, server => <<"127.0.0.1:6379">>};
+connector_parameter(cluster) ->
+    #{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>};
+connector_parameter(sentinel) ->
+    #{
+        redis_type => sentinel,
+        servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
+        sentinel => <<"myredismaster">>
+    }.

+ 41 - 34
apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl

@@ -56,6 +56,7 @@
 ).
 
 all() -> [{group, transports}, {group, rest}].
+suite() -> [{timetrap, {minutes, 20}}].
 
 groups() ->
     ResourceSpecificTCs = [t_create_delete_bridge],
@@ -143,15 +144,19 @@ redis_checks() ->
     end.
 
 end_per_suite(_Config) ->
-    ok = delete_all_bridges(),
+    ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
     ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]),
     _ = application:stop(emqx_connector),
     ok.
 
-init_per_testcase(_Testcase, Config) ->
+init_per_testcase(Testcase, Config0) ->
+    emqx_logger:set_log_level(debug),
     ok = delete_all_rules(),
-    ok = delete_all_bridges(),
+    ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = <<(atom_to_binary(Testcase))/binary, UniqueNum/binary>>,
+    Config = [{bridge_name, Name} | Config0],
     case {?config(connector_type, Config), ?config(batch_mode, Config)} of
         {undefined, _} ->
             Config;
@@ -165,7 +170,13 @@ init_per_testcase(_Testcase, Config) ->
             IsBatch = (BatchMode =:= batch_on),
             BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
             BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
-            [{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config]
+            BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
+            [
+                {bridge_type, BridgeType},
+                {bridge_config, BridgeConfig1},
+                {is_batch, IsBatch}
+                | Config
+            ]
     end.
 
 end_per_testcase(_Testcase, Config) ->
@@ -173,10 +184,18 @@ end_per_testcase(_Testcase, Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    ok = delete_all_bridges().
+    ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors().
 
 t_create_delete_bridge(Config) ->
-    Name = <<"mybridge">>,
+    Pid = erlang:whereis(eredis_sentinel),
+    ct:pal("t_create_detele_bridge:~p~n", [
+        #{
+            config => Config,
+            sentinel => Pid,
+            eredis_sentinel => Pid =/= undefined andalso erlang:process_info(Pid)
+        }
+    ]),
+    Name = ?config(bridge_name, Config),
     Type = ?config(connector_type, Config),
     BridgeConfig = ?config(bridge_config, Config),
     IsBatch = ?config(is_batch, Config),
@@ -184,13 +203,11 @@ t_create_delete_bridge(Config) ->
         {ok, _},
         emqx_bridge:create(Type, Name, BridgeConfig)
     ),
-
     ResourceId = emqx_bridge_resource:resource_id(Type, Name),
-
     ?WAIT(
         {ok, connected},
         emqx_resource:health_check(ResourceId),
-        5
+        10
     ),
 
     RedisType = atom_to_binary(Type),
@@ -244,7 +261,7 @@ t_check_values(_Config) ->
     ).
 
 t_check_replay(Config) ->
-    Name = <<"toxic_bridge">>,
+    Name = ?config(bridge_name, Config),
     Type = <<"redis_single">>,
     Topic = <<"local_topic/test">>,
     ProxyName = "redis_single_tcp",
@@ -324,15 +341,15 @@ t_permanent_error(_Config) ->
     ),
     ok = emqx_bridge:remove(Type, Name).
 
-t_auth_username_password(_Config) ->
-    Name = <<"mybridge">>,
+t_auth_username_password(Config) ->
+    Name = ?config(bridge_name, Config),
     Type = <<"redis_single">>,
-    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
     BridgeConfig = username_password_redis_bridge_config(),
     ?assertMatch(
         {ok, _},
         emqx_bridge:create(Type, Name, BridgeConfig)
     ),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
     ?WAIT(
         {ok, connected},
         emqx_resource:health_check(ResourceId),
@@ -340,16 +357,16 @@ t_auth_username_password(_Config) ->
     ),
     ok = emqx_bridge:remove(Type, Name).
 
-t_auth_error_username_password(_Config) ->
-    Name = <<"mybridge">>,
+t_auth_error_username_password(Config) ->
+    Name = ?config(bridge_name, Config),
     Type = <<"redis_single">>,
-    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
     BridgeConfig0 = username_password_redis_bridge_config(),
     BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
     ?assertMatch(
         {ok, _},
         emqx_bridge:create(Type, Name, BridgeConfig)
     ),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
     ?WAIT(
         {ok, disconnected},
         emqx_resource:health_check(ResourceId),
@@ -361,16 +378,16 @@ t_auth_error_username_password(_Config) ->
     ),
     ok = emqx_bridge:remove(Type, Name).
 
-t_auth_error_password_only(_Config) ->
-    Name = <<"mybridge">>,
+t_auth_error_password_only(Config) ->
+    Name = ?config(bridge_name, Config),
     Type = <<"redis_single">>,
-    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
     BridgeConfig0 = toxiproxy_redis_bridge_config(),
     BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
     ?assertMatch(
         {ok, _},
         emqx_bridge:create(Type, Name, BridgeConfig)
     ),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
     ?assertEqual(
         {ok, disconnected},
         emqx_resource:health_check(ResourceId)
@@ -382,7 +399,7 @@ t_auth_error_password_only(_Config) ->
     ok = emqx_bridge:remove(Type, Name).
 
 t_create_disconnected(Config) ->
-    Name = <<"toxic_bridge">>,
+    Name = ?config(bridge_name, Config),
     Type = <<"redis_single">>,
 
     ?check_trace(
@@ -450,10 +467,8 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
 added_msgs(ResourceId, BaseTopic, Payload) ->
     lists:flatmap(
         fun(K) ->
-            {ok, Results} = emqx_resource:simple_sync_query(
-                ResourceId,
-                {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}
-            ),
+            Message = {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]},
+            {ok, Results} = emqx_resource:simple_sync_query(ResourceId, Message),
             [El || El <- Results, El =:= Payload]
         end,
         [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)]
@@ -482,14 +497,6 @@ delete_all_rules() ->
         emqx_rule_engine:get_rules()
     ).
 
-delete_all_bridges() ->
-    lists:foreach(
-        fun(#{name := Name, type := Type}) ->
-            emqx_bridge:remove(Type, Name)
-        end,
-        emqx_bridge:list()
-    ).
-
 all_test_hosts() ->
     Confs = [
         ?REDIS_TOXYPROXY_CONNECT_CONFIG
@@ -554,12 +561,12 @@ redis_connect_configs() ->
             tcp => #{
                 <<"servers">> => <<"redis-sentinel:26379">>,
                 <<"redis_type">> => <<"sentinel">>,
-                <<"sentinel">> => <<"mymaster">>
+                <<"sentinel">> => <<"mytcpmaster">>
             },
             tls => #{
                 <<"servers">> => <<"redis-sentinel-tls:26380">>,
                 <<"redis_type">> => <<"sentinel">>,
-                <<"sentinel">> => <<"mymaster">>,
+                <<"sentinel">> => <<"mytlsmaster">>,
                 <<"ssl">> => redis_connect_ssl_opts(redis_sentinel)
             }
         },

+ 3 - 0
apps/emqx_connector/src/emqx_connector.app.src

@@ -9,6 +9,9 @@
         stdlib,
         ecpool,
         emqx_resource,
+        eredis,
+        %% eredis_cluster has supervisor should be started before emqx_connector
+        %% otherwise the first start redis_cluster will fail.
         eredis_cluster,
         ehttpc,
         jose,

+ 14 - 2
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -42,6 +42,8 @@ resource_type(syskeeper_proxy) ->
     emqx_bridge_syskeeper_proxy_server;
 resource_type(timescale) ->
     emqx_postgresql;
+resource_type(redis) ->
+    emqx_bridge_redis_connector;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
 
@@ -139,6 +141,14 @@ connector_structs() ->
                     desc => <<"Matrix Connector Config">>,
                     required => false
                 }
+            )},
+        {redis,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
+                #{
+                    desc => <<"Redis Connector Config">>,
+                    required => false
+                }
             )}
     ].
 
@@ -153,7 +163,8 @@ schema_modules() ->
         emqx_bridge_syskeeper_connector,
         emqx_bridge_syskeeper_proxy,
         emqx_bridge_timescale,
-        emqx_postgresql_connector_schema
+        emqx_postgresql_connector_schema,
+        emqx_bridge_redis_schema
     ].
 
 api_schemas(Method) ->
@@ -177,7 +188,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
         api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
-        api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector")
+        api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector")
     ].
 
 api_ref(Module, Type, Method) ->

+ 27 - 12
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -103,17 +103,32 @@ schema_modules() ->
     [emqx_bridge_http_schema].
 -endif.
 
-connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
-connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
-connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
-connector_type_to_bridge_types(http) -> [http, webhook];
-connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
-connector_type_to_bridge_types(matrix) -> [matrix];
-connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
-connector_type_to_bridge_types(pgsql) -> [pgsql];
-connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
-connector_type_to_bridge_types(syskeeper_proxy) -> [];
-connector_type_to_bridge_types(timescale) -> [timescale].
+%% @doc Return old bridge(v1) and/or connector(v2) type
+%% from the latest connector type name.
+connector_type_to_bridge_types(http) ->
+    [webhook, http];
+connector_type_to_bridge_types(azure_event_hub_producer) ->
+    [azure_event_hub_producer];
+connector_type_to_bridge_types(confluent_producer) ->
+    [confluent_producer];
+connector_type_to_bridge_types(gcp_pubsub_producer) ->
+    [gcp_pubsub, gcp_pubsub_producer];
+connector_type_to_bridge_types(kafka_producer) ->
+    [kafka, kafka_producer];
+connector_type_to_bridge_types(matrix) ->
+    [matrix];
+connector_type_to_bridge_types(mongodb) ->
+    [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
+connector_type_to_bridge_types(pgsql) ->
+    [pgsql];
+connector_type_to_bridge_types(syskeeper_forwarder) ->
+    [syskeeper_forwarder];
+connector_type_to_bridge_types(syskeeper_proxy) ->
+    [];
+connector_type_to_bridge_types(timescale) ->
+    [timescale];
+connector_type_to_bridge_types(redis) ->
+    [redis, redis_single, redis_sentinel, redis_cluster].
 
 actions_config_name() -> <<"actions">>.
 
@@ -158,7 +173,7 @@ split_bridge_to_connector_and_action(
                     BridgeType, BridgeV1Conf
                 );
             false ->
-                %% We do an automatic transfomation to get the connector config
+                %% We do an automatic transformation to get the connector config
                 %% if the callback is not defined.
                 %% Get connector fields from bridge config
                 lists:foldl(

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -844,7 +844,7 @@ parse_object_loop(PropList0, Module, Options) ->
     ),
     parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []).
 
-parse_object_loop([], _Modlue, _Options, Props, Required, Refs) ->
+parse_object_loop([], _Module, _Options, Props, Required, Refs) ->
     {lists:reverse(Props), lists:usort(Required), Refs};
 parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) ->
     NameBin = to_bin(Name),

+ 11 - 5
apps/emqx_management/src/emqx_mgmt_api_data_backup.erl

@@ -204,7 +204,7 @@ data_export(post, _Request) ->
 data_import(post, #{body := #{<<"filename">> := FileName} = Body}) ->
     case safe_parse_node(Body) of
         {error, Msg} ->
-            {400, #{code => 'BAD_REQUEST', message => Msg}};
+            {400, #{code => ?BAD_REQUEST, message => Msg}};
         FileNode ->
             CoreNode = core_node(FileNode),
             response(
@@ -231,20 +231,23 @@ data_files(post, #{body := #{<<"filename">> := #{type := _} = File}}) ->
         ok ->
             {204};
         {error, Reason} ->
-            {400, #{code => 'BAD_REQUEST', message => emqx_mgmt_data_backup:format_error(Reason)}}
+            {400, #{code => ?BAD_REQUEST, message => emqx_mgmt_data_backup:format_error(Reason)}}
     end;
+data_files(post, #{body := _}) ->
+    {400, #{code => ?BAD_REQUEST, message => "Missing required parameter: filename"}};
 data_files(get, #{query_string := PageParams}) ->
     case emqx_mgmt_api:parse_pager_params(PageParams) of
         false ->
             {400, #{code => ?BAD_REQUEST, message => <<"page_limit_invalid">>}};
         #{page := Page, limit := Limit} = Pager ->
-            {200, #{data => list_backup_files(Page, Limit), meta => Pager}}
+            {Count, HasNext, Data} = list_backup_files(Page, Limit),
+            {200, #{data => Data, meta => Pager#{count => Count, hasnext => HasNext}}}
     end.
 
 data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) ->
     case safe_parse_node(QS) of
         {error, Msg} ->
-            {400, #{code => 'BAD_REQUEST', message => Msg}};
+            {400, #{code => ?BAD_REQUEST, message => Msg}};
         Node ->
             case get_or_delete_file(Method, Filename, Node) of
                 {error, not_found} ->
@@ -293,7 +296,10 @@ response({error, Reason}) ->
 
 list_backup_files(Page, Limit) ->
     Start = Page * Limit - Limit + 1,
-    lists:sublist(list_backup_files(), Start, Limit).
+    AllFiles = list_backup_files(),
+    Count = length(AllFiles),
+    HasNext = Start + Limit - 1 < Count,
+    {Count, HasNext, lists:sublist(AllFiles, Start, Limit)}.
 
 list_backup_files() ->
     Nodes = emqx:running_nodes(),

+ 15 - 5
apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl

@@ -80,22 +80,32 @@ t_list_backups(Config) ->
     [{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)],
 
     {ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>),
-    #{<<"data">> := Data, <<"meta">> := _} = emqx_utils_json:decode(RespBody),
+    #{<<"data">> := Data, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
+        RespBody
+    ),
     ?assertEqual(20, length(Data)),
 
     {ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>),
-    #{<<"data">> := EmptyData, <<"meta">> := _} = emqx_utils_json:decode(EmptyRespBody),
+    #{<<"data">> := EmptyData, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
+        EmptyRespBody
+    ),
     ?assertEqual(0, length(EmptyData)),
 
     {ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>),
     {ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>),
     {ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>),
 
-    #{<<"data">> := DataP1, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP1),
+    #{<<"data">> := DataP1, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := true}} = emqx_utils_json:decode(
+        RespBodyP1
+    ),
     ?assertEqual(10, length(DataP1)),
-    #{<<"data">> := DataP2, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP2),
+    #{<<"data">> := DataP2, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
+        RespBodyP2
+    ),
     ?assertEqual(10, length(DataP2)),
-    #{<<"data">> := DataP3, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP3),
+    #{<<"data">> := DataP3, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
+        RespBodyP3
+    ),
     ?assertEqual(0, length(DataP3)),
 
     ?assertEqual(Data, DataP1 ++ DataP2).

+ 100 - 76
apps/emqx_redis/src/emqx_redis.erl

@@ -20,7 +20,7 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 
--export([roots/0, fields/1]).
+-export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]).
 
 -behaviour(emqx_resource).
 
@@ -45,60 +45,53 @@
 }).
 
 %%=====================================================================
+namespace() -> "redis".
+
 roots() ->
     [
         {config, #{
             type => hoconsc:union(
                 [
-                    hoconsc:ref(?MODULE, cluster),
-                    hoconsc:ref(?MODULE, single),
-                    hoconsc:ref(?MODULE, sentinel)
+                    ?R_REF(redis_cluster),
+                    ?R_REF(redis_single),
+                    ?R_REF(redis_sentinel)
                 ]
             )
         }}
     ].
 
-fields(single) ->
+fields(redis_single) ->
+    fields(redis_single_connector) ++
+        redis_fields() ++
+        emqx_connector_schema_lib:ssl_fields();
+fields(redis_single_connector) ->
     [
         {server, server()},
-        {redis_type, #{
-            type => single,
-            default => single,
-            required => false,
-            desc => ?DESC("single")
-        }}
-    ] ++
-        redis_fields() ++
+        redis_type(single)
+    ];
+fields(redis_cluster) ->
+    fields(redis_cluster_connector) ++
+        lists:keydelete(database, 1, redis_fields()) ++
         emqx_connector_schema_lib:ssl_fields();
-fields(cluster) ->
+fields(redis_cluster_connector) ->
     [
         {servers, servers()},
-        {redis_type, #{
-            type => cluster,
-            default => cluster,
-            required => false,
-            desc => ?DESC("cluster")
-        }}
-    ] ++
-        lists:keydelete(database, 1, redis_fields()) ++
+        redis_type(cluster)
+    ];
+fields(redis_sentinel) ->
+    fields(redis_sentinel_connector) ++
+        redis_fields() ++
         emqx_connector_schema_lib:ssl_fields();
-fields(sentinel) ->
+fields(redis_sentinel_connector) ->
     [
         {servers, servers()},
-        {redis_type, #{
-            type => sentinel,
-            default => sentinel,
-            required => false,
-            desc => ?DESC("sentinel")
-        }},
+        redis_type(sentinel),
         {sentinel, #{
             type => string(),
             required => true,
             desc => ?DESC("sentinel_desc")
         }}
-    ] ++
-        redis_fields() ++
-        emqx_connector_schema_lib:ssl_fields().
+    ].
 
 server() ->
     Meta = #{desc => ?DESC("server")},
@@ -108,64 +101,52 @@ servers() ->
     Meta = #{desc => ?DESC("servers")},
     emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS).
 
+desc(redis_cluster_connector) ->
+    ?DESC(redis_cluster_connector);
+desc(redis_single_connector) ->
+    ?DESC(redis_single_connector);
+desc(redis_sentinel_connector) ->
+    ?DESC(redis_sentinel_connector);
+desc(_) ->
+    undefined.
+
 %% ===================================================================
 
+redis_type(Type) ->
+    {redis_type, #{
+        type => Type,
+        default => Type,
+        required => false,
+        desc => ?DESC(Type)
+    }}.
+
 callback_mode() -> always_sync.
 
-on_start(
-    InstId,
-    #{
-        redis_type := Type,
-        pool_size := PoolSize,
-        ssl := SSL
-    } = Config
-) ->
+on_start(InstId, Config0) ->
     ?SLOG(info, #{
         msg => "starting_redis_connector",
         connector => InstId,
-        config => emqx_utils:redact(Config)
+        config => emqx_utils:redact(Config0)
     }),
-    ConfKey =
-        case Type of
-            single -> server;
-            _ -> servers
-        end,
-    Servers0 = maps:get(ConfKey, Config),
-    Servers1 = lists:map(
-        fun(#{hostname := Host, port := Port}) ->
-            {Host, Port}
-        end,
-        emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)
-    ),
-    Servers = [{servers, Servers1}],
-    Database =
-        case Type of
-            cluster -> [];
-            _ -> [{database, maps:get(database, Config)}]
-        end,
+    Config = config(Config0),
+    #{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config,
+    Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
     Opts =
         [
-            {pool_size, PoolSize},
             {username, maps:get(username, Config, undefined)},
             {password, maps:get(password, Config, "")},
+            {servers, servers(Config)},
+            {options, Options},
+            {pool_size, PoolSize},
             {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
-        ] ++ Database ++ Servers,
-    Options =
-        case maps:get(enable, SSL) of
-            true ->
-                [
-                    {ssl, true},
-                    {ssl_options, emqx_tls_lib:to_client_opts(SSL)}
-                ];
-            false ->
-                [{ssl, false}]
-        end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
+        ] ++ database(Config),
+
     State = #{pool_name => InstId, type => Type},
     ok = emqx_resource:allocate_resource(InstId, type, Type),
     ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
     case Type of
         cluster ->
-            case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of
+            case eredis_cluster:start_pool(InstId, Opts) of
                 {ok, _} ->
                     {ok, State};
                 {ok, _, _} ->
@@ -174,7 +155,7 @@ on_start(
                     {error, Reason}
             end;
         _ ->
-            case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of
+            case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
                 ok ->
                     {ok, State};
                 {error, Reason} ->
@@ -182,6 +163,14 @@ on_start(
             end
     end.
 
+ssl_options(SSL = #{enable := true}) ->
+    [
+        {ssl, true},
+        {ssl_options, emqx_tls_lib:to_client_opts(SSL)}
+    ];
+ssl_options(#{enable := false}) ->
+    [{ssl, false}].
+
 on_stop(InstId, _State) ->
     ?SLOG(info, #{
         msg => "stopping_redis_connector",
@@ -189,7 +178,11 @@ on_stop(InstId, _State) ->
     }),
     case emqx_resource:get_allocated_resources(InstId) of
         #{pool_name := PoolName, type := cluster} ->
-            eredis_cluster:stop_pool(PoolName);
+            case eredis_cluster:stop_pool(PoolName) of
+                {error, not_found} -> ok;
+                ok -> ok;
+                Error -> Error
+            end;
         #{pool_name := PoolName, type := _} ->
             emqx_resource_pool:stop(PoolName);
         _ ->
@@ -244,8 +237,17 @@ is_unrecoverable_error(_) ->
 on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
     case eredis_cluster:pool_exists(PoolName) of
         true ->
-            Health = eredis_cluster:ping_all(PoolName),
-            status_result(Health);
+            %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
+            %% we need restart eredis_cluster pool when pool_worker(slot) is empty.
+            %% If the pool is empty, it means that there are no workers attempting to reconnect.
+            %% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
+            case eredis_cluster_monitor:get_all_pools(PoolName) of
+                [] ->
+                    disconnected;
+                [_ | _] ->
+                    Health = eredis_cluster:ping_all(PoolName),
+                    status_result(Health)
+            end;
         false ->
             disconnected
     end;
@@ -289,6 +291,28 @@ wrap_qp_result(Results) when is_list(Results) ->
     end.
 
 %% ===================================================================
+%% parameters for connector
+config(#{parameters := #{} = Param} = Config) ->
+    maps:merge(maps:remove(parameters, Config), Param);
+%% is for authn/authz
+config(Config) ->
+    Config.
+
+servers(#{server := Server}) ->
+    servers(Server);
+servers(#{servers := Servers}) ->
+    servers(Servers);
+servers(Servers) ->
+    lists:map(
+        fun(#{hostname := Host, port := Port}) ->
+            {Host, Port}
+        end,
+        emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS)
+    ).
+
+database(#{redis_type := cluster}) -> [];
+database(#{database := Database}) -> [{database, Database}].
+
 connect(Opts) ->
     eredis:start_link(Opts).
 

+ 1 - 1
apps/emqx_redis/test/emqx_redis_SUITE.erl

@@ -223,7 +223,7 @@ redis_config_base(Type, ServerKey) ->
         "sentinel" ->
             Host = ?REDIS_SENTINEL_HOST,
             Port = ?REDIS_SENTINEL_PORT,
-            MaybeSentinel = "    sentinel = mymaster\n",
+            MaybeSentinel = "    sentinel = mytcpmaster\n",
             MaybeDatabase = "    database = 1\n";
         "single" ->
             Host = ?REDIS_SINGLE_HOST,

+ 5 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -13,7 +13,11 @@
         uuid,
         emqx,
         emqx_utils,
-        emqx_ctl
+        emqx_ctl,
+        %% rule_engine should wait for bridge connector start,
+        %% it's will check action/connector ref's exist.
+        emqx_bridge,
+        emqx_connector
     ]},
     {mod, {emqx_rule_engine_app, []}},
     {env, []},

+ 2 - 0
changes/ce/feat-12040.en.md

@@ -0,0 +1,2 @@
+Upgrade QUIC stack, more features on the way!
+

+ 2 - 2
mix.exs

@@ -64,7 +64,7 @@ defmodule EMQXUmbrella.MixProject do
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       # maybe forbid to fetch quicer
       {:emqtt,
-       github: "emqx/emqtt", tag: "1.9.6", override: true, system_env: maybe_no_quic_env()},
+       github: "emqx/emqtt", tag: "1.9.7", override: true, system_env: maybe_no_quic_env()},
       {:rulesql, github: "emqx/rulesql", tag: "0.1.7"},
       {:observer_cli, "1.7.1"},
       {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
@@ -830,7 +830,7 @@ defmodule EMQXUmbrella.MixProject do
   defp quicer_dep() do
     if enable_quicer?(),
       # in conflict with emqx and emqtt
-      do: [{:quicer, github: "emqx/quic", tag: "0.0.202", override: true}],
+      do: [{:quicer, github: "emqx/quic", tag: "0.0.303", override: true}],
       else: []
   end
 

+ 1 - 1
rebar.config

@@ -69,7 +69,7 @@
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
-    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.6"}}}
+    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.7"}}}
     , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}

+ 1 - 1
rebar.config.erl

@@ -39,7 +39,7 @@ bcrypt() ->
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}}.
 
 quicer() ->
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}.
 
 jq() ->
     {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.12"}}}.

+ 7 - 2
rel/i18n/emqx_bridge_redis.hocon

@@ -32,14 +32,19 @@ desc_type.desc:
 desc_type.label:
 """Bridge Type"""
 
-local_topic.desc:
+desc_local_topic.desc:
 """The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic
 matching the local_topic will be forwarded.<br/>
 NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
 configured, then both the data got from the rule and the MQTT messages that match local_topic
 will be forwarded."""
 
-local_topic.label:
+desc_local_topic.label:
 """Local Topic"""
 
+desc_action_parameters.desc:
+"""The parameters of the action."""
+desc_action_parameters.label:
+"""Action Parameters"""
+
 }

+ 41 - 0
rel/i18n/emqx_bridge_redis_schema.hocon

@@ -0,0 +1,41 @@
+emqx_bridge_redis_schema {
+redis_parameters.label:
+"""Redis Type Specific Parameters"""
+
+redis_parameters.desc:
+"""Set of parameters specific for the given type of this Redis connector, `redis_type` can be one of `single`, `cluster` or `sentinel`."""
+
+producer_action.desc:
+"""The parameters of the action."""
+producer_action.label:
+"""Action Parameters"""
+
+redis_type.label:
+"""Redis Type"""
+redis_type.desc:
+"""Single mode. Must be set to 'single' when Redis server is running in single mode.
+Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode.
+Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode."""
+
+batch_size.label:
+"""Batch Size"""
+batch_size.desc:
+"""This parameter defines the upper limit of the batch count.
+Setting this value to 1 effectively disables batching, as it indicates that only one item will be processed per batch.
+Note on Redis Cluster Mode:
+In the context of Redis Cluster Mode, it is important to note that batching is not supported.
+Consequently, the batch_size is always set to 1,
+reflecting the mode inherent limitation in handling batch operations."""
+
+batch_time.desc:
+"""Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
+
+batch_time.label:
+"""Max batch wait time, disable when in Redis Cluster Mode."""
+
+redis_action.label:
+"""Redis Action"""
+redis_action.desc:
+"""Action to interact with a Redis connector."""
+
+}

+ 6 - 0
rel/i18n/emqx_connector_schema.hocon

@@ -1,5 +1,11 @@
 emqx_connector_schema {
 
+config_enable.desc:
+"""Enable (true) or disable (false) this connector."""
+
+config_enable.label:
+"""Enable or Disable"""
+
 desc_connectors.desc:
 """Connectors that are used to connect to external systems"""
 desc_connectors.label:

+ 14 - 0
rel/i18n/emqx_redis.hocon

@@ -47,4 +47,18 @@ single.desc:
 single.label:
 """Single Mode"""
 
+redis_cluster_connector.label:
+"""Redis Cluster Connector"""
+redis_cluster_connector.desc:
+"""Redis connector in cluster mode"""
+
+redis_sentinel_connector.label:
+"""Redis Sentinel Connector"""
+redis_sentinel_connector.desc:
+"""Redis connector in sentinel mode"""
+
+redis_single_connector.label:
+"""Redis Single Connector"""
+redis_single_connector.desc:
+"""Redis connector in sentinel mode"""
 }

+ 14 - 4
scripts/pre-compile.sh

@@ -27,11 +27,21 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
 
 I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)"
 
+DOWNLOAD_I18N_TRANSLATIONS=${DOWNLOAD_I18N_TRANSLATIONS:-true}
 # download desc (i18n) translations
-curl -L --silent --show-error \
-     --output "apps/emqx_dashboard/priv/desc.zh.hocon" \
-     "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon"
+if [ "$DOWNLOAD_I18N_TRANSLATIONS" = "true" ]; then
+  echo "downloading i18n translation from emqx/emqx-i18n"
+  start=$(date +%s)
+  curl -L --silent --show-error \
+       --output "apps/emqx_dashboard/priv/desc.zh.hocon" \
+       "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon"
+  end=$(date +%s)
+  duration=$(echo "$end $start" | awk '{print $1 - $2}')
+  echo "downloaded i18n translation in $duration seconds, set DOWNLOAD_I18N_TRANSLATIONS=false to skip"
+else
+  echo "skipping to download i18n translation from emqx/emqx-i18n, set DOWNLOAD_I18N_TRANSLATIONS=true to update"
+fi
 
 # TODO
-# make sbom a build artifcat
+# make sbom a build artifact
 # ./scripts/update-bom.sh "$PROFILE_STR" ./rel