Преглед изворни кода

fix(mysql_bridge): export the query_mode option to the APIs

Shawn пре 3 година
родитељ
комит
1625b8eaeb

+ 7 - 0
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -414,6 +414,13 @@ on_sql_query(
                 LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
             ),
             Error;
+        {error, {1053, <<"08S01">>, Reason}} ->
+            %% mysql sql server shutdown in progress
+            ?SLOG(
+                error,
+                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
+            ),
+            {recoverable_error, Reason};
         {error, Reason} ->
             ?SLOG(
                 error,

+ 14 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -128,7 +128,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     ok = emqx_metrics_worker:create_metrics(
         ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception],
+        [
+            matched,
+            sent,
+            dropped,
+            queued,
+            batched,
+            inflight,
+            'sent.success',
+            'sent.failed',
+            'sent.exception',
+            'dropped.inflight',
+            'dropped.queued',
+            'dropped.other'
+        ],
         [matched]
     ),
     ok = emqx_resource_worker_sup:start_workers(ResId, Opts),

+ 47 - 28
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -77,23 +77,27 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
     Result = call_query(sync, Id, ?QUERY(self(), Request), #{}),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
     _ = handle_query_result(Id, Result, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
     Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
     _ = handle_query_result(Id, Result, false),
     Result.
 
@@ -149,8 +153,10 @@ running(cast, resume, _St) ->
     keep_state_and_data;
 running(cast, block, St) ->
     {next_state, block, St};
-running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
-    Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
+running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
+    is_list(Batch)
+->
+    Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
     {next_state, block, St#{queue := Q1}};
 running({call, From}, {query, Request, _Opts}, St) ->
     query_or_acc(From, Request, St);
@@ -169,8 +175,10 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
     {keep_state_and_data, {state_timeout, ResumeT, resume}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
-blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
-    Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
+blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
+    is_list(Batch)
+->
+    Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
     {keep_state, St#{queue := Q1}};
 blocked(cast, resume, St) ->
     do_resume(St);
@@ -179,12 +187,12 @@ blocked(state_timeout, resume, St) ->
 blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(From, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}};
+    {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}};
 blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
     ReplayFun = maps:get(async_reply_fun, Opts, undefined),
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
+    {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
 
 terminate(_Reason, #{id := Id, index := Index}) ->
     gproc_pool:disconnect_worker(Id, {Id, Index}).
@@ -206,10 +214,10 @@ estimate_size(QItem) ->
         Pid when is_pid(Pid) ->
             EXPR;
         _ ->
-            ?RESOURCE_ERROR(not_created, "resource not created")
+            ?RESOURCE_ERROR(worker_not_created, "resource not created")
     catch
         error:badarg ->
-            ?RESOURCE_ERROR(not_created, "resource not created");
+            ?RESOURCE_ERROR(worker_not_created, "resource not created");
         exit:{timeout, _} ->
             ?RESOURCE_ERROR(timeout, "call resource timeout")
     end
@@ -277,18 +285,15 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St)
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)
     },
-    case send_query(From, Request, Id, QueryOpts) of
+    Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
+    case reply_caller(Id, ?REPLY(From, Request, Result)) of
         true ->
             Query = ?QUERY(From, Request),
-            {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}};
+            {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
         false ->
             {keep_state, St}
     end.
 
-send_query(From, Request, Id, QueryOpts) ->
-    Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
-    reply_caller(Id, ?REPLY(From, Request, Result)).
-
 flush(#{acc := []} = St) ->
     {keep_state, St};
 flush(
@@ -307,14 +312,18 @@ flush(
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
-            Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]),
+            Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]),
             {next_state, blocked, St1#{queue := Q1}};
         false ->
             {keep_state, St1}
     end.
 
-maybe_append_queue(undefined, _Items) -> undefined;
-maybe_append_queue(Q, Items) -> replayq:append(Q, Items).
+maybe_append_queue(Id, undefined, _Items) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
+    undefined;
+maybe_append_queue(Id, Q, Items) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'),
+    replayq:append(Q, Items).
 
 batch_reply_caller(Id, BatchResult, Batch) ->
     lists:foldl(
@@ -344,30 +353,40 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
     handle_query_result(Id, Result, BlockWorker).
 
 handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'),
     BlockWorker;
 handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
     NotWorking == not_connected; NotWorking == blocked
 ->
     true;
-handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
+    BlockWorker;
+handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
+    BlockWorker;
+handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
+    ?SLOG(error, #{msg => other_resource_error, reason => Reason}),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
     BlockWorker;
 handle_query_result(Id, {error, _}, BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
     BlockWorker;
-handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+handle_query_result(_Id, {recoverable_error, _}, _BlockWorker) ->
     true;
 handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
     true;
 handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
     BlockWorker;
 handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
     BlockWorker;
 handle_query_result(Id, Result, BlockWorker) ->
     assert_ok_result(Result),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'),
     BlockWorker.
 
 call_query(QM0, Id, Query, QueryOpts) ->
@@ -407,7 +426,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
 
 apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent),
     ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
 apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
@@ -419,7 +438,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
                 ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent),
                 ReplyFun = fun ?MODULE:reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = [self(), Id, Name, Ref, Query],
@@ -432,7 +451,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
 apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Requests = [Request || ?QUERY(_From, Request) <- Batch],
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)),
     ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
@@ -444,7 +463,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
                 ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)),
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},