Bläddra i källkod

Merge pull request #14069 from HJianBo/fix-cassandra-prepare

fix(cassandra): fix the prepared query cannot be modified
JianBo He 1 år sedan
förälder
incheckning
1239db2ea1

+ 15 - 0
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -190,6 +190,21 @@ update_bridge_api(Config, Overrides) ->
     ct:pal("bridge update result: ~p", [Res]),
     Res.
 
+get_bridge_api(Config) ->
+    BridgeType = ?config(bridge_type, Config),
+    Name = ?config(bridge_name, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(BridgeType, Name),
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("getting bridge (via http)", []),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader) of
+            {ok, Body0} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
+            Error -> Error
+        end,
+    ct:pal("bridge result: ~p", [Res]),
+    Res.
+
 delete_bridge_http_api_v1(Opts) ->
     #{type := Type, name := Name} = Opts,
     BridgeId = emqx_bridge_resource:bridge_id(Type, Name),

+ 1 - 1
apps/emqx_bridge_cassandra/mix.exs

@@ -23,7 +23,7 @@ defmodule EMQXBridgeCassandra.MixProject do
 
   def deps() do
     [
-      {:ecql, github: "emqx/ecql", tag: "v0.7.0"},
+      {:ecql, github: "emqx/ecql", tag: "v0.7.1"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 1 - 1
apps/emqx_bridge_cassandra/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.7.0"}}},
+    {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.7.1"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 1 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.3.2"},
+    {vsn, "0.3.3"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -44,7 +44,7 @@
 %% callbacks for query executing
 -export([query/4, prepared_query/4, batch_query/3]).
 
--export([do_get_status/1]).
+-export([do_get_status/1, get_reconnect_callback_signature/1]).
 
 -type state() ::
     #{
@@ -409,6 +409,12 @@ conn_opts([{password, Password} | Opts], Acc) ->
 conn_opts([Opt | Opts], Acc) ->
     conn_opts(Opts, [Opt | Acc]).
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([#{prepare_key := PrepareKey}]) ->
+    PrepareKey.
+
 %%--------------------------------------------------------------------
 %% prepare
 prepare_cql_to_cassandra(ParsedCql, PoolName) ->

+ 140 - 0
apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl

@@ -755,5 +755,145 @@ t_insert_null_into_int_column(Config) ->
 
     %% Would return `1853189228' if it encodes `null' as an integer...
     ?assertEqual(null, connect_and_get_payload(Config, "select x from mqtt.mqtt_msg_test2")),
+    ok.
 
+t_update_action_sql(Config) ->
+    BridgeType = ?config(bridge_type, Config),
+    connect_and_create_table(
+        Config,
+        <<
+            "CREATE TABLE mqtt.mqtt_msg_test2 (\n"
+            "  topic text,\n"
+            "  qos int,\n"
+            "  payload text,\n"
+            "  arrived timestamp,\n"
+            "  PRIMARY KEY (topic)\n"
+            ")"
+        >>
+    ),
+    on_exit(fun() -> connect_and_drop_table(Config, "DROP TABLE mqtt.mqtt_msg_test2") end),
+    {ok, {{_, 201, _}, _, _}} =
+        emqx_bridge_testlib:create_bridge_api(
+            Config,
+            #{
+                <<"cql">> => <<
+                    "insert into mqtt_msg_test2(topic, payload, arrived) "
+                    "values (${topic}, ${payload}, ${timestamp})"
+                >>
+            }
+        ),
+    RuleTopic = <<"t/a">>,
+    Opts = #{
+        sql => <<"select * from \"", RuleTopic/binary, "\"">>
+    },
+    {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts),
+
+    Payload = <<"{}">>,
+    Msg = emqx_message:make(RuleTopic, Payload),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            emqx:publish(Msg),
+            #{?snk_kind := cassandra_connector_query_return},
+            10_000
+        ),
+
+    ?assertEqual(
+        null,
+        connect_and_get_payload(Config, "select qos from mqtt.mqtt_msg_test2 where topic = 't/a'")
+    ),
+
+    %% Update a correct SQL Teamplate
+    {ok, _} =
+        emqx_bridge_testlib:update_bridge_api(
+            Config,
+            #{
+                <<"cql">> => <<
+                    "insert into mqtt_msg_test2(topic, qos, payload, arrived) "
+                    "values (${topic}, ${qos}, ${payload}, ${timestamp})"
+                >>
+            }
+        ),
+
+    RuleTopic1 = <<"t/b">>,
+    Opts1 = #{
+        sql => <<"select * from \"", RuleTopic1/binary, "\"">>
+    },
+    {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
+        BridgeType, RuleTopic1, Config, Opts1
+    ),
+
+    Msg1 = emqx_message:make(RuleTopic1, Payload),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            emqx:publish(Msg1),
+            #{?snk_kind := cassandra_connector_query_return},
+            10_000
+        ),
+
+    ?assertEqual(
+        0,
+        connect_and_get_payload(Config, "select qos from mqtt.mqtt_msg_test2 where topic = 't/b'")
+    ),
+
+    %% Update a wrong SQL Teamplate
+    BadSQL =
+        <<
+            "insert into mqtt_msg_test2(topic, qos, payload, bad_col_name) "
+            "values (${topic}, ${qos}, ${payload}, ${timestamp})"
+        >>,
+    {ok, Body} =
+        emqx_bridge_testlib:update_bridge_api(
+            Config,
+            #{
+                <<"cql">> => BadSQL
+            }
+        ),
+    ?assertMatch(#{<<"status">> := <<"connecting">>}, Body),
+    Error1 = maps:get(<<"status_reason">>, Body),
+    case re:run(Error1, <<"Undefined column name bad_col_name">>, [{capture, none}]) of
+        match ->
+            ok;
+        nomatch ->
+            ct:fail(#{
+                expected_pattern => "undefined_column",
+                got => Error1
+            })
+    end,
+    %% assert that although there was an error returned, the invliad SQL is actually put
+    {ok, BridgeResp} = emqx_bridge_testlib:get_bridge_api(Config),
+    #{<<"cql">> := FetchedSQL} = BridgeResp,
+    ?assertEqual(FetchedSQL, BadSQL),
+
+    %% Update again with a correct SQL Teamplate
+    {ok, _} =
+        emqx_bridge_testlib:update_bridge_api(
+            Config,
+            #{
+                <<"cql">> => <<
+                    "insert into mqtt_msg_test2(topic, qos, payload, arrived) "
+                    "values (${topic}, ${qos}, ${payload}, ${timestamp})"
+                >>
+            }
+        ),
+
+    RuleTopic2 = <<"t/c">>,
+    Opts2 = #{
+        sql => <<"select * from \"", RuleTopic2/binary, "\"">>
+    },
+    {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
+        BridgeType, RuleTopic2, Config, Opts2
+    ),
+
+    Msg2 = emqx_message:make(RuleTopic2, Payload),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            emqx:publish(Msg2),
+            #{?snk_kind := cassandra_connector_query_return},
+            10_000
+        ),
+
+    ?assertEqual(
+        0,
+        connect_and_get_payload(Config, "select qos from mqtt.mqtt_msg_test2 where topic = 't/c'")
+    ),
     ok.

+ 4 - 0
changes/ee/fix-14069.en.md

@@ -0,0 +1,4 @@
+Fix prepared statements for Cassandra integration.
+
+Before the fix, when the SQL Teamples in EMQX Action were modified, it could not be prepared
+to Cassandra and caused write failure.