Procházet zdrojové kódy

feat: expose `resource_opts.query_mode` for pulsar action

Fixes https://emqx.atlassian.net/browse/EMQX-12782
Thales Macedo Garitezi před 1 rokem
rodič
revize
9f97bff7d0

+ 4 - 1
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) ->
             ),
 
             ?force_ordering(
-                #{?snk_kind := call_query},
+                #{?snk_kind := SNKKind} when
+                    SNKKind =:= call_query orelse SNKKind =:= simple_query_enter,
                 #{?snk_kind := cut_connection, ?snk_span := start}
             ),
             %% Note: order of arguments here is reversed compared to `?force_ordering'.
@@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) ->
                     emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
                 )
             end),
+            ?tp("publishing_message", #{}),
             try
                 {_, {ok, _}} =
                     snabbkaffe:wait_async_action(
@@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) ->
                         infinity
                     )
             after
+                ?tp("healing_failure", #{}),
                 emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
             end,
             {ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {applications, [
         kernel,

+ 27 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl

@@ -11,7 +11,8 @@
     action_type_name/0,
     connector_type_name/0,
     schema_module/0,
-    is_action/1
+    is_action/1,
+    connector_action_config_to_bridge_v1_config/2
 ]).
 
 is_action(_) -> true.
@@ -23,3 +24,28 @@ action_type_name() -> pulsar.
 connector_type_name() -> pulsar.
 
 schema_module() -> emqx_bridge_pulsar_pubsub_schema.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    BridgeV1Config1 = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorConfig, ActionConfig
+    ),
+    BridgeV1Config = maps:with(v1_fields(pulsar_producer), BridgeV1Config1),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun(RO) -> maps:with(v1_fields(producer_resource_opts), RO) end,
+        BridgeV1Config
+    ).
+
+%%------------------------------------------------------------------------------------------
+%% Internal helper functions
+%%------------------------------------------------------------------------------------------
+
+v1_fields(Struct) ->
+    [
+        to_bin(K)
+     || {K, _} <- emqx_bridge_pulsar:fields(Struct)
+    ].
+
+to_bin(B) when is_binary(B) -> B;
+to_bin(L) when is_list(L) -> list_to_binary(L);
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

+ 17 - 9
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -58,6 +58,8 @@
 
 callback_mode() -> async_if_possible.
 
+query_mode(#{resource_opts := #{query_mode := sync}}) ->
+    simple_sync_internal_buffer;
 query_mode(_Config) ->
     simple_async_internal_buffer.
 
@@ -202,12 +204,17 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
                 sync_timeout => SyncTimeout,
                 is_async => false
             }),
-            try
-                pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
-            catch
-                error:timeout ->
-                    {error, timeout}
-            end
+            ?tp_span(
+                "pulsar_producer_query_enter",
+                #{instance_id => _InstanceId, message => Message, mode => sync},
+                try
+                    ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}),
+                    pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
+                catch
+                    error:timeout ->
+                        {error, timeout}
+                end
+            )
     end.
 
 -spec on_query_async(
@@ -218,11 +225,11 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
     #{channels := Channels} = State,
     case maps:find(ChannelId, Channels) of
         error ->
-            {error, channel_not_found};
+            {error, {unrecoverable_error, channel_not_found}};
         {ok, #{message := MessageTmpl, producers := Producers}} ->
             ?tp_span(
-                pulsar_producer_on_query_async,
-                #{instance_id => _InstanceId, message => Message},
+                "pulsar_producer_query_enter",
+                #{instance_id => _InstanceId, message => Message, mode => async},
                 on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn)
             )
     end.
@@ -233,6 +240,7 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
         message => PulsarMessage,
         is_async => true
     }),
+    ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}),
     pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
 
 on_format_query_result({ok, Info}) ->

+ 1 - 3
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl

@@ -66,10 +66,8 @@ fields(action_resource_opts) ->
         batch_size,
         batch_time,
         worker_pool_size,
-        request_ttl,
         inflight_window,
-        max_buffer_bytes,
-        query_mode
+        max_buffer_bytes
     ],
     lists:filter(
         fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,

+ 7 - 2
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl

@@ -843,7 +843,8 @@ do_t_send_with_failure(Config, FailureType) ->
                         ?wait_async_action(
                             emqx:publish(Message0),
                             #{
-                                ?snk_kind := pulsar_producer_on_query_async,
+                                ?snk_kind := "pulsar_producer_query_enter",
+                                mode := async,
                                 ?snk_span := {complete, _}
                             },
                             5_000
@@ -970,7 +971,11 @@ t_producer_process_crash(Config) ->
             {_, {ok, _}} =
                 ?wait_async_action(
                     emqx:publish(Message0),
-                    #{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}},
+                    #{
+                        ?snk_kind := "pulsar_producer_query_enter",
+                        mode := async,
+                        ?snk_span := {complete, _}
+                    },
                     5_000
                 ),
             Data0 = receive_consumed(20_000),

+ 113 - 76
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl

@@ -23,31 +23,25 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    [
-        {group, plain},
-        {group, tls}
-    ].
+    All0 = emqx_common_test_helpers:all(?MODULE),
+    All = All0 -- matrix_cases(),
+    Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
+    Groups ++ All.
 
 groups() ->
-    AllTCs = emqx_common_test_helpers:all(?MODULE),
-    [
-        {plain, AllTCs},
-        {tls, AllTCs}
-    ].
+    emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
+
+matrix_cases() ->
+    emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    %% Ensure enterprise bridge module is loaded
-    _ = emqx_bridge_enterprise:module_info(),
-    {ok, Cwd} = file:get_cwd(),
-    PrivDir = ?config(priv_dir, Config),
-    WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
     Apps = emqx_cth_suite:start(
         lists:flatten([
             ?APPS,
             emqx_management,
             emqx_mgmt_api_test_util:emqx_dashboard()
         ]),
-        #{work_dir => WorkDir}
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [{suite_apps, Apps} | Config].
 
@@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) ->
     case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
         true ->
             Config1 = common_init_per_group(),
+            ConnectorName = ?MODULE,
             NewConfig =
                 [
                     {proxy_name, ProxyName},
@@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) ->
                     {use_tls, false}
                     | Config1 ++ Config
                 ],
-            create_connector(?MODULE, NewConfig),
+            create_connector(ConnectorName, NewConfig),
             NewConfig;
         false ->
             maybe_skip_without_ci()
@@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) ->
     case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
         true ->
             Config1 = common_init_per_group(),
+            ConnectorName = ?MODULE,
             NewConfig =
                 [
                     {proxy_name, ProxyName},
@@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) ->
                     {use_tls, true}
                     | Config1 ++ Config
                 ],
-            create_connector(?MODULE, NewConfig),
+            create_connector(ConnectorName, NewConfig),
             NewConfig;
         false ->
             maybe_skip_without_ci()
-    end.
+    end;
+init_per_group(_Group, Config) ->
+    Config.
 
 end_per_group(Group, Config) when
     Group =:= plain;
     Group =:= tls
 ->
     common_end_per_group(Config),
+    ok;
+end_per_group(_Group, _Config) ->
     ok.
 
 common_init_per_group() ->
@@ -189,66 +189,49 @@ pulsar_connector(Config) ->
         ":",
         integer_to_binary(PulsarPort)
     ]),
-    Connector = #{
-        <<"connectors">> => #{
-            <<"pulsar">> => #{
-                Name => #{
-                    <<"enable">> => true,
-                    <<"ssl">> => #{
-                        <<"enable">> => UseTLS,
-                        <<"verify">> => <<"verify_none">>,
-                        <<"server_name_indication">> => <<"auto">>
-                    },
-                    <<"authentication">> => <<"none">>,
-                    <<"servers">> => ServerURL
-                }
-            }
-        }
+    InnerConfigMap = #{
+        <<"enable">> => true,
+        <<"ssl">> => #{
+            <<"enable">> => UseTLS,
+            <<"verify">> => <<"verify_none">>,
+            <<"server_name_indication">> => <<"auto">>
+        },
+        <<"authentication">> => <<"none">>,
+        <<"servers">> => ServerURL
     },
-    parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name).
+    emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap).
 
 pulsar_action(Config) ->
+    QueryMode = proplists:get_value(query_mode, Config, <<"sync">>),
     Name = atom_to_binary(?MODULE),
-    Action = #{
-        <<"actions">> => #{
-            <<"pulsar">> => #{
-                Name => #{
-                    <<"connector">> => Name,
-                    <<"enable">> => true,
-                    <<"parameters">> => #{
-                        <<"retention_period">> => <<"infinity">>,
-                        <<"max_batch_bytes">> => <<"1MB">>,
-                        <<"batch_size">> => 100,
-                        <<"strategy">> => <<"random">>,
-                        <<"buffer">> => #{
-                            <<"mode">> => <<"memory">>,
-                            <<"per_partition_limit">> => <<"10MB">>,
-                            <<"segment_bytes">> => <<"5MB">>,
-                            <<"memory_overload_protection">> => true
-                        },
-                        <<"message">> => #{
-                            <<"key">> => <<"${.clientid}">>,
-                            <<"value">> => <<"${.}">>
-                        },
-                        <<"pulsar_topic">> => ?config(pulsar_topic, Config)
-                    },
-                    <<"resource_opts">> => #{
-                        <<"health_check_interval">> => <<"1s">>,
-                        <<"metrics_flush_interval">> => <<"300ms">>
-                    }
-                }
-            }
+    InnerConfigMap = #{
+        <<"connector">> => Name,
+        <<"enable">> => true,
+        <<"parameters">> => #{
+            <<"retention_period">> => <<"infinity">>,
+            <<"max_batch_bytes">> => <<"1MB">>,
+            <<"batch_size">> => 100,
+            <<"strategy">> => <<"random">>,
+            <<"buffer">> => #{
+                <<"mode">> => <<"memory">>,
+                <<"per_partition_limit">> => <<"10MB">>,
+                <<"segment_bytes">> => <<"5MB">>,
+                <<"memory_overload_protection">> => true
+            },
+            <<"message">> => #{
+                <<"key">> => <<"${.clientid}">>,
+                <<"value">> => <<"${.}">>
+            },
+            <<"pulsar_topic">> => ?config(pulsar_topic, Config)
+        },
+        <<"resource_opts">> => #{
+            <<"query_mode">> => QueryMode,
+            <<"request_ttl">> => <<"1s">>,
+            <<"health_check_interval">> => <<"1s">>,
+            <<"metrics_flush_interval">> => <<"300ms">>
         }
     },
-    parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name).
-
-parse_and_check(Key, Mod, Conf, Name) ->
-    ConfStr = hocon_pp:do(Conf, #{}),
-    ct:pal(ConfStr),
-    {ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
-    hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
-    #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf,
-    RetConf.
+    emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap).
 
 instance_id(Type, Name) ->
     ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
@@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) ->
     ).
 -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
 
+proplists_with(Keys, PList) ->
+    lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList).
+
+group_path(Config) ->
+    case emqx_common_test_helpers:group_path(Config) of
+        [] ->
+            undefined;
+        Path ->
+            Path
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_action_probe(Config) ->
+t_action_probe(matrix) ->
+    [[plain], [tls]];
+t_action_probe(Config) when is_list(Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
     Action = pulsar_action(Config),
     {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
     ?assertMatch({{_, 204, _}, _, _}, Res0),
     ok.
 
-t_action(Config) ->
+t_action(matrix) ->
+    [
+        [plain, async],
+        [plain, sync],
+        [tls, async]
+    ];
+t_action(Config) when is_list(Config) ->
+    QueryMode =
+        case group_path(Config) of
+            [_, QM | _] -> atom_to_binary(QM);
+            _ -> <<"async">>
+        end,
     Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name, Config),
+    create_action(Name, [{query_mode, QueryMode} | Config]),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
     ?assert(lists:any(Any, Actions), Actions),
@@ -465,7 +472,9 @@ t_action(Config) ->
 
 %% Tests that deleting/disabling an action that share the same Pulsar topic with other
 %% actions do not disturb the latter.
-t_multiple_actions_sharing_topic(Config) ->
+t_multiple_actions_sharing_topic(matrix) ->
+    [[plain], [tls]];
+t_multiple_actions_sharing_topic(Config) when is_list(Config) ->
     Type = ?TYPE,
     ConnectorName = <<"c">>,
     ConnectorConfig = pulsar_connector(Config),
@@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) ->
         []
     ),
     ok.
+
+t_sync_query_down(matrix) ->
+    [[plain]];
+t_sync_query_down(Config0) when is_list(Config0) ->
+    ct:timetrap({seconds, 15}),
+    Payload = #{<<"x">> => <<"some data">>},
+    PayloadBin = emqx_utils_json:encode(Payload),
+    ClientId = <<"some_client">>,
+    Opts = #{
+        make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end,
+        enter_tp_filter =>
+            ?match_event(#{?snk_kind := "pulsar_producer_send"}),
+        error_tp_filter =>
+            ?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}),
+        success_tp_filter =>
+            ?match_event(#{?snk_kind := pulsar_echo_consumer_message})
+    },
+    Config = [
+        {connector_type, ?TYPE},
+        {connector_name, ?FUNCTION_NAME},
+        {connector_config, pulsar_connector(Config0)},
+        {action_type, ?TYPE},
+        {action_name, ?FUNCTION_NAME},
+        {action_config, pulsar_action(Config0)}
+        | proplists_with([proxy_name, proxy_host, proxy_port], Config0)
+    ],
+    emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
+    ok.

+ 11 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -198,6 +198,9 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
         case simple_async_query(Id, Request, QueryOpts) of
             {error, _} = Error ->
+                ?tp("resource_simple_sync_internal_buffer_query_error", #{
+                    id => Id, request => Request
+                }),
                 Error;
             {async_return, {error, _} = Error} ->
                 Error;
@@ -210,7 +213,11 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
                     receive
                         {ReplyAlias, Response} ->
                             Response
-                    after 0 -> {error, timeout}
+                    after 0 ->
+                        ?tp("resource_simple_sync_internal_buffer_query_timeout", #{
+                            id => Id, request => Request
+                        }),
+                        {error, timeout}
                     end
                 end
         end
@@ -1302,6 +1309,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res
     ?tp(simple_query_override, #{query_mode => ReqQM}),
     #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
     CallMode = call_mode(QM, CBM),
+    ?tp(simple_query_enter, #{}),
     apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
     ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
@@ -1309,6 +1317,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Res
     %% The connector supports buffer, send even in disconnected state
     #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
     CallMode = call_mode(QM, CBM),
+    ?tp(simple_query_enter, #{}),
     apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
     %% when calling from the buffer worker or other simple queries,
@@ -2297,6 +2306,7 @@ reply_call(Alias, Response) ->
 %% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
 %% callbacks.
 reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
+    ?tp("reply_call_internal_buffer", #{}),
     ?MODULE:reply_call(ReplyAlias, Response),
     do_reply_caller(MaybeReplyTo, Response).
 

+ 1 - 0
changes/ee/feat-13546.en.md

@@ -0,0 +1 @@
+Added the option to configure the query mode for Pulsar Producer action.