Browse Source

fix(cassandra_bridge): correctly insert null values into columns

Fixes https://emqx.atlassian.net/browse/EMQX-11822
Thales Macedo Garitezi 2 năm trước cách đây
mục cha
commit
47f61ba68a

+ 1 - 1
apps/emqx_bridge_cassandra/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.6.0"}}},
+    {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.6.1"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 16 - 2
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -278,10 +278,24 @@ proc_cql_params(prepared_query, ChannId, Params, #{channels := Channs}) ->
             params_tokens := ParamsTokens
         }
     } = maps:get(ChannId, Channs),
-    {PrepareKey, assign_type_for_params(emqx_placeholder:proc_sql(ParamsTokens, Params))};
+    {PrepareKey, assign_type_for_params(proc_sql(ParamsTokens, Params))};
 proc_cql_params(query, CQL, Params, _State) ->
     {CQL1, Tokens} = emqx_placeholder:preproc_sql(CQL, '?'),
-    {CQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
+    {CQL1, assign_type_for_params(proc_sql(Tokens, Params))}.
+
+proc_sql(Tokens, Params) ->
+    VarTrans = fun
+        (null) -> null;
+        (X) -> emqx_placeholder:sql_data(X)
+    end,
+    emqx_placeholder:proc_tmpl(
+        Tokens,
+        Params,
+        #{
+            return => rawlist,
+            var_trans => VarTrans
+        }
+    ).
 
 exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
     Type == query; Type == prepared_query

+ 81 - 8
apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl

@@ -19,6 +19,8 @@
 %%     ./rebar3 ct --name 'test@127.0.0.1' -v --suite \
 %%     apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 % SQL definitions
 -define(SQL_BRIDGE,
     "insert into mqtt_msg_test(topic, payload, arrived) "
@@ -129,12 +131,15 @@ init_per_group(_Group, Config) ->
     Config.
 
 end_per_group(Group, Config) when
-    Group == without_batch; Group == without_batch
+    Group == with_batch;
+    Group == without_batch
 ->
     connect_and_drop_table(Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
     ok;
 end_per_group(_Group, _Config) ->
     ok.
@@ -160,6 +165,7 @@ end_per_testcase(_Testcase, Config) ->
     ok = snabbkaffe:stop(),
     connect_and_clear_table(Config),
     delete_bridge(Config),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -177,19 +183,32 @@ common_init(Config0) ->
             ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
             ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
             emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-            % Ensure EE bridge module is loaded
-            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
-            _ = emqx_bridge_enterprise:module_info(),
-            emqx_mgmt_api_test_util:init_suite(),
+            Apps = emqx_cth_suite:start(
+                [
+                    emqx,
+                    emqx_conf,
+                    emqx_bridge_cassandra,
+                    emqx_bridge,
+                    emqx_rule_engine,
+                    emqx_management,
+                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Config0)}
+            ),
+            {ok, _Api} = emqx_common_test_http:create_default_app(),
             % Connect to cassnadra directly and create the table
             catch connect_and_drop_table(Config0),
             connect_and_create_table(Config0),
             {Name, CassaConf} = cassa_config(BridgeType, Config0),
             Config =
                 [
+                    {apps, Apps},
                     {cassa_config, CassaConf},
                     {cassa_bridge_type, BridgeType},
                     {cassa_name, Name},
+                    {bridge_type, BridgeType},
+                    {bridge_name, Name},
+                    {bridge_config, CassaConf},
                     {proxy_host, ProxyHost},
                     {proxy_port, ProxyPort}
                     | Config0
@@ -360,13 +379,19 @@ connect_direct_cassa(Config) ->
 
 % These funs connect and then stop the cassandra connection
 connect_and_create_table(Config) ->
+    connect_and_create_table(Config, ?SQL_CREATE_TABLE).
+
+connect_and_create_table(Config, SQL) ->
     with_direct_conn(Config, fun(Conn) ->
-        {ok, _} = ecql:query(Conn, ?SQL_CREATE_TABLE)
+        {ok, _} = ecql:query(Conn, SQL)
     end).
 
 connect_and_drop_table(Config) ->
+    connect_and_drop_table(Config, ?SQL_DROP_TABLE).
+
+connect_and_drop_table(Config, SQL) ->
     with_direct_conn(Config, fun(Conn) ->
-        {ok, _} = ecql:query(Conn, ?SQL_DROP_TABLE)
+        {ok, _} = ecql:query(Conn, SQL)
     end).
 
 connect_and_clear_table(Config) ->
@@ -375,8 +400,11 @@ connect_and_clear_table(Config) ->
     end).
 
 connect_and_get_payload(Config) ->
+    connect_and_get_payload(Config, ?SQL_SELECT).
+
+connect_and_get_payload(Config, SQL) ->
     with_direct_conn(Config, fun(Conn) ->
-        {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, ?SQL_SELECT),
+        {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, SQL),
         Result
     end).
 
@@ -685,3 +713,48 @@ t_nasty_sql_string(Config) ->
     %% XXX: why ok instead of {ok, AffectedLines}?
     ?assertEqual(ok, send_message(Config, Message)),
     ?assertEqual(Payload, connect_and_get_payload(Config)).
+
+t_insert_null_into_int_column(Config) ->
+    BridgeType = ?config(bridge_type, Config),
+    connect_and_create_table(
+        Config,
+        <<
+            "CREATE TABLE mqtt.mqtt_msg_test2 (\n"
+            "  topic text,\n"
+            "  payload text,\n"
+            "  arrived timestamp,\n"
+            "  x int,\n"
+            "  PRIMARY KEY (topic)\n"
+            ")"
+        >>
+    ),
+    on_exit(fun() -> connect_and_drop_table(Config, "DROP TABLE mqtt.mqtt_msg_test2") end),
+    {ok, {{_, 201, _}, _, _}} =
+        emqx_bridge_testlib:create_bridge_api(
+            Config,
+            #{
+                <<"cql">> => <<
+                    "insert into mqtt_msg_test2(topic, payload, x, arrived) "
+                    "values (${topic}, ${payload}, ${x}, ${timestamp})"
+                >>
+            }
+        ),
+    RuleTopic = <<"t/c">>,
+    Opts = #{
+        sql => <<"select *, first(jq('null', payload)) as x from \"", RuleTopic/binary, "\"">>
+    },
+    {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts),
+
+    Payload = <<"{}">>,
+    Msg = emqx_message:make(RuleTopic, Payload),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            emqx:publish(Msg),
+            #{?snk_kind := cassandra_connector_query_return},
+            10_000
+        ),
+
+    %% Would return `1853189228' if it encodes `null' as an integer...
+    ?assertEqual(null, connect_and_get_payload(Config, "select x from mqtt.mqtt_msg_test2")),
+
+    ok.

+ 1 - 0
changes/ee/fix-12411.en.md

@@ -0,0 +1 @@
+Fixed a bug where `null` values would be inserted as `1853189228` in `int` columns in Cassandra data integration.