فهرست منبع

feat(mysql): check whether target table exists

Fixes https://emqx.atlassian.net/browse/EMQX-9026
Paulo Zulato 2 سال پیش
والد
کامیت
c9a2ddf98c
2فایلهای تغییر یافته به همراه114 افزوده شده و 2 حذف شده
  1. 43 2
      apps/emqx_connector/src/emqx_connector_mysql.erl
  2. 71 0
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

+ 43 - 2
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -224,7 +224,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
                     {connected, NState};
                 {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
-                    connecting
+                    connecting;
+                {undefined_table, NState} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target}
             end;
         false ->
             connecting
@@ -233,7 +236,37 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
 do_get_status(Conn) ->
     ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
 
-do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) ->
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_statement := #{send_message := SQL}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case mysql:prepare(Conn, get_status, SQL) of
+                            {error, {1146, _, _}} ->
+                                {undefined_table, State};
+                            {ok, Statement} ->
+                                mysql:unprepare(Conn, Statement);
+                            _ ->
+                                ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
+do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) ->
     %% retry to prepare
@@ -241,6 +274,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error,
         ok ->
             %% remove the error
             {ok, State#{prepare_statement => Prepares}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {undefined_table, State#{prepare_statement => {error, Prepares}}};
         {error, Reason} ->
             {error, Reason}
     end.
@@ -320,6 +356,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
         {ok, _Key} ->
             ?SLOG(info, LogMeta#{result => success}),
             prepare_sql_to_conn(Conn, PrepareList);
+        {error, {1146, _, _} = Reason} ->
+            %% Target table is not created
+            ?tp(mysql_undefined_table, #{}),
+            ?SLOG(error, LogMeta#{result => failed, reason => Reason}),
+            {error, undefined_table};
         {error, Reason} ->
             % FIXME: we should try to differ on transient failers and
             % syntax failures. Retrying syntax failures is not very productive.

+ 71 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

@@ -110,6 +110,7 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_Testcase, Config) ->
+    connect_and_create_table(Config),
     connect_and_clear_table(Config),
     delete_bridge(Config),
     snabbkaffe:start_trace(),
@@ -241,6 +242,12 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 500}).
 
+sync_query_resource(Config, Request) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+
 query_resource_async(Config, Request) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
@@ -480,6 +487,7 @@ t_write_timeout(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     QueryMode = ?config(query_mode, Config),
     {ok, _} = create_bridge(Config),
+    connect_and_create_table(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     Timeout = 1000,
@@ -641,6 +649,7 @@ t_workload_fits_prepared_statement_limit(Config) ->
     ).
 
 t_unprepared_statement_query(Config) ->
+    ok = connect_and_create_table(Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
@@ -668,6 +677,7 @@ t_unprepared_statement_query(Config) ->
 %% Test doesn't work with batch enabled since batch doesn't use
 %% prepared statements as such; it has its own query generation process
 t_uninitialized_prepared_statement(Config) ->
+    connect_and_create_table(Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
@@ -705,3 +715,64 @@ t_uninitialized_prepared_statement(Config) ->
         end
     ),
     ok.
+
+t_missing_table(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?check_trace(
+        begin
+            connect_and_drop_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status == connecting orelse Status == disconnected,
+                    emqx_resource_manager:health_check(ResourceID)
+                )
+            ),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)),
+            ok
+        end
+    ).
+
+t_table_removed(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            connect_and_drop_table(Config),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error,
+                    {unrecoverable_error,
+                        {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
+                sync_query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.