Przeglądaj źródła

feat(resource): start emqx_resource_worker in pools

Shawn 3 lat temu
rodzic
commit
d8d8d674e4

+ 6 - 3
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -111,12 +111,15 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     ok = emqx_metrics_worker:create_metrics(
         ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception, resource_error],
+        [matched, success, failed, exception, resource_down],
         [matched]
     ),
     case maps:get(start_after_created, Opts, true) of
-        true -> wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
-        false -> ok
+        true ->
+            ok = emqx_resource_sup:start_workers(ResId, Opts),
+            wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
+        false ->
+            ok
     end,
     ok.
 

+ 76 - 4
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -19,13 +19,10 @@
 
 -behaviour(supervisor).
 
--export([start_link/0]).
+-export([start_link/0, start_workers/2, stop_workers/2]).
 
 -export([init/1]).
 
-%% set a very large pool size in case all the workers busy
--define(POOL_SIZE, 64).
-
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
@@ -43,3 +40,78 @@ init([]) ->
             modules => [emqx_resource_manager_sup]
         },
     {ok, {SupFlags, [Metrics, ResourceManager]}}.
+
+start_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]),
+    lists:foreach(
+        fun(Idx) ->
+            _ = ensure_worker_added(ResId, {ResId, Idx}, Idx),
+            ok = ensure_worker_started(ResId, Idx, Opts)
+        end,
+        lists:seq(1, PoolSize)
+    ).
+
+stop_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    lists:foreach(
+        fun(Idx) ->
+            ok = ensure_worker_stopped(ResId, Idx),
+            ok = ensure_worker_removed(ResId, {ResId, Idx})
+        end,
+        lists:seq(1, PoolSize)
+    ),
+    _ = gproc_pool:delete(ResId),
+    ok.
+
+pool_size(Opts) ->
+    maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)).
+
+ensure_worker_pool(Pool, Type, Opts) ->
+    try
+        gproc_pool:new(Pool, Type, Opts)
+    catch
+        error:exists -> ok
+    end,
+    ok.
+
+ensure_worker_added(Pool, Name, Slot) ->
+    try
+        gproc_pool:add_worker(Pool, Name, Slot)
+    catch
+        error:exists -> ok
+    end,
+    ok.
+
+ensure_worker_removed(Pool, Name) ->
+    _ = gproc_pool:remove_worker(Pool, Name),
+    ok.
+
+-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
+ensure_worker_started(ResId, Idx, Opts) ->
+    Mod = emqx_resource_worker,
+    Spec = #{
+        id => ?CHILD_ID(Mod, ResId, Idx),
+        start => {Mod, start_link, [ResId, Idx, Opts]},
+        restart => transient,
+        shutdown => 5000,
+        type => worker,
+        modules => [Mod]
+    },
+    case supervisor:start_child(emqx_resource_sup, Spec) of
+        {ok, _Pid} -> ok;
+        {error, {already_started, _}} -> ok;
+        {error, already_present} -> ok;
+        {error, _} = Err -> Err
+    end.
+
+ensure_worker_stopped(ResId, Idx) ->
+    ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
+    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
+        ok ->
+            supervisor:delete_child(emqx_resource_sup, ChildId);
+        {error, not_found} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.

+ 196 - 120
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -14,7 +14,8 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% An FIFO queue using ETS-ReplayQ as backend.
+%% This module implements async message sending, disk message queuing,
+%%  and message batching using ReplayQ.
 
 -module(emqx_resource_worker).
 
@@ -25,18 +26,23 @@
 -behaviour(gen_statem).
 
 -export([
-    start_link/2,
-    query/2,
-    query_async/3,
-    query_mfa/3
+    start_link/3,
+    query/3,
+    query_async/4,
+    block/1,
+    resume/1
 ]).
 
 -export([
     callback_mode/0,
-    init/1
+    init/1,
+    terminate/2,
+    code_change/3
 ]).
 
--export([do/3]).
+-export([running/3, blocked/3]).
+
+-define(RESUME_INTERVAL, 15000).
 
 %% count
 -define(DEFAULT_BATCH_SIZE, 100).
@@ -47,72 +53,136 @@
 -define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}).
 -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
 
+-define(RESOURCE_ERROR(Reason, Msg), {error, {resource_error, #{reason => Reason, msg => Msg}}}).
+-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
+
 -type id() :: binary().
 -type request() :: term().
 -type result() :: term().
--type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()}.
+-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
 -type from() :: pid() | reply_fun().
 
 -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
     {{from(), result()}, NewCbState :: term()}.
 
-callback_mode() -> [state_functions].
+callback_mode() -> [state_functions, state_enter].
 
-start_link(Id, Opts) ->
-    gen_statem:start_link({local, name(Id)}, ?MODULE, {Id, Opts}, []).
+start_link(Id, Index, Opts) ->
+    gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
 
--spec query(id(), request()) -> ok.
-query(Id, Request) ->
-    gen_statem:call(name(Id), {query, Request}).
+-spec query(id(), term(), request()) -> ok.
+query(Id, Key, Request) ->
+    gen_statem:call(pick(Id, Key), {query, Request}).
 
--spec query_async(id(), request(), reply_fun()) -> ok.
-query_async(Id, Request, ReplyFun) ->
-    gen_statem:cast(name(Id), {query, Request, ReplyFun}).
+-spec query_async(id(), term(), request(), reply_fun()) -> ok.
+query_async(Id, Key, Request, ReplyFun) ->
+    gen_statem:cast(pick(Id, Key), {query, Request, ReplyFun}).
 
--spec name(id()) -> atom().
-name(Id) ->
-    Mod = atom_to_binary(?MODULE, utf8),
-    <<Mod/binary, ":", Id/binary>>.
+-spec block(pid() | atom()) -> ok.
+block(ServerRef) ->
+    gen_statem:cast(ServerRef, block).
 
-disk_cache_dir(Id) ->
-    filename:join([emqx:data_dir(), Id, cache]).
+-spec resume(pid() | atom()) -> ok.
+resume(ServerRef) ->
+    gen_statem:cast(ServerRef, resume).
 
-init({Id, Opts}) ->
+init({Id, Index, Opts}) ->
+    true = gproc_pool:connect_worker(Id, {Id, Index}),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     Queue =
-        case maps:get(cache_enabled, Opts, true) of
-            true -> replayq:open(#{dir => disk_cache_dir(Id), seg_bytes => 10000000});
+        case maps:get(queue_enabled, Opts, true) of
+            true -> replayq:open(#{dir => disk_queue_dir(Id), seg_bytes => 10000000});
             false -> undefined
         end,
     St = #{
         id => Id,
+        index => Index,
         batch_enabled => maps:get(batch_enabled, Opts, true),
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
-        cache_queue => Queue,
+        queue => Queue,
         acc => [],
         acc_left => BatchSize,
         tref => undefined
     },
-    {ok, do, St}.
-
-do(cast, {query, Request, ReplyFun}, #{batch_enabled := true} = State) ->
-    do_acc(ReplyFun, Request, State);
-do(cast, {query, Request, ReplyFun}, #{batch_enabled := false} = State) ->
-    do_query(ReplyFun, Request, State);
-do({call, From}, {query, Request}, #{batch_enabled := true} = State) ->
-    do_acc(From, Request, State);
-do({call, From}, {query, Request}, #{batch_enabled := false} = State) ->
-    do_query(From, Request, State);
-do(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
+    {ok, blocked, St, {next_event, cast, resume}}.
+
+running(enter, _, _St) ->
+    keep_state_and_data;
+running(cast, resume, _St) ->
+    keep_state_and_data;
+running(cast, block, St) ->
+    {next_state, block, St};
+running(cast, {query, Request, ReplyFun}, St) ->
+    query_or_acc(ReplyFun, Request, St);
+running({call, From}, {query, Request}, St) ->
+    query_or_acc(From, Request, St);
+running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     {keep_state, flush(St#{tref := undefined})};
-do(info, {flush, _Ref}, _St) ->
+running(info, {flush, _Ref}, _St) ->
     keep_state_and_data;
-do(info, Info, _St) ->
+running(info, Info, _St) ->
     ?SLOG(error, #{msg => unexpected_msg, info => Info}),
     keep_state_and_data.
 
-do_acc(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
+blocked(enter, _, _St) ->
+    keep_state_and_data;
+blocked(cast, block, _St) ->
+    keep_state_and_data;
+blocked(cast, resume, St) ->
+    do_resume(St);
+blocked(state_timeout, resume, St) ->
+    do_resume(St);
+blocked(cast, {query, Request, ReplyFun}, St) ->
+    handle_blocked(ReplyFun, Request, St);
+blocked({call, From}, {query, Request}, St) ->
+    handle_blocked(From, Request, St).
+
+terminate(_Reason, #{id := Id, index := Index}) ->
+    gproc_pool:disconnect_worker(Id, {Id, Index}).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%==============================================================================
+pick(Id, Key) ->
+    Pid = gproc_pool:pick_worker(Id, Key),
+    case is_pid(Pid) of
+        true -> Pid;
+        false -> error({failed_to_pick_worker, {Id, Key}})
+    end.
+
+do_resume(#{queue := undefined} = St) ->
+    {next_state, running, St};
+do_resume(#{queue := Q, id := Id} = St) ->
+    case replayq:peek(Q) of
+        empty ->
+            {next_state, running, St, {state_timeout, ?RESUME_INTERVAL, resume}};
+        First ->
+            Result = call_query(Id, First),
+            case handle_query_result(Id, false, Result) of
+                %% Send failed because resource down
+                true ->
+                    {keep_state, St};
+                %% Send ok or failed but the resource is working
+                false ->
+                    %% We Send 'resume' to the end of the mailbox to give the worker
+                    %% a chance to process 'query' requests.
+                    {keep_state, St#{queue => drop_head(Q)}, {state_timeout, 0, resume}}
+            end
+    end.
+
+drop_head(Q) ->
+    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
+    ok = replayq:ack(Q1, AckRef),
+    Q1.
+
+query_or_acc(From, Request, #{batch_enabled := true} = St) ->
+    acc_query(From, Request, St);
+query_or_acc(From, Request, #{batch_enabled := false} = St) ->
+    send_query(From, Request, St).
+
+acc_query(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
     Acc1 = [?QUERY(From, Request) | Acc],
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
@@ -120,10 +190,19 @@ do_acc(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
         false -> {keep_state, ensure_flush_timer(St)}
     end.
 
-do_query(From, Request, #{id := Id, cache_queue := Q0} = St0) ->
+send_query(From, Request, #{id := Id, queue := Q} = St) ->
     Result = call_query(Id, Request),
-    Q1 = reply_caller(Id, Q0, ?REPLY(From, Request, Result)),
-    {keep_state, St0#{cache_queue := Q1}}.
+    case reply_caller(Id, Q, ?REPLY(From, Request, Result)) of
+        true ->
+            {keep_state, St#{queue := maybe_append_queue(Q, [Request])}};
+        false ->
+            {next_state, blocked, St}
+    end.
+
+handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
+    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
+    _ = reply_caller(Id, Q, ?REPLY(From, Request, Error)),
+    {keep_state, St#{queue := maybe_append_queue(Q, [Request])}}.
 
 flush(#{acc := []} = St) ->
     St;
@@ -132,101 +211,109 @@ flush(
         id := Id,
         acc := Batch,
         batch_size := Size,
-        cache_queue := Q0
+        queue := Q0
     } = St
 ) ->
-    BatchResults = call_batch_query(Id, Batch),
-    Q1 = batch_reply_caller(Id, Q0, BatchResults),
-    cancel_flush_timer(
-        St#{
-            acc_left := Size,
-            acc := [],
-            cache_queue := Q1
-        }
-    ).
+    BatchResults = maybe_expand_batch_result(call_batch_query(Id, Batch), Batch),
+    St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
+    case batch_reply_caller(Id, BatchResults) of
+        true ->
+            Q1 = maybe_append_queue(Q0, [Request || ?QUERY(_, Request) <- Batch]),
+            {keep_state, St1#{queue := Q1}};
+        false ->
+            {next_state, blocked, St1}
+    end.
 
-maybe_append_cache(undefined, _Request) -> undefined;
-maybe_append_cache(Q, Request) -> replayq:append(Q, Request).
+maybe_append_queue(undefined, _Query) -> undefined;
+maybe_append_queue(Q, Query) -> replayq:append(Q, Query).
 
-batch_reply_caller(Id, Q, BatchResults) ->
+batch_reply_caller(Id, BatchResults) ->
     lists:foldl(
-        fun(Reply, Q1) ->
-            reply_caller(Id, Q1, Reply)
+        fun(Reply, BlockWorker) ->
+            reply_caller(Id, BlockWorker, Reply)
         end,
-        Q,
+        false,
         BatchResults
     ).
 
-reply_caller(Id, Q, ?REPLY({ReplyFun, Args}, Request, Result)) when is_function(ReplyFun) ->
+reply_caller(Id, BlockWorker, ?REPLY(undefined, _, Result)) ->
+    handle_query_result(Id, BlockWorker, Result);
+reply_caller(Id, BlockWorker, ?REPLY({ReplyFun, Args}, _, Result)) ->
     ?SAFE_CALL(ReplyFun(Result, Args)),
-    handle_query_result(Id, Q, Request, Result);
-reply_caller(Id, Q, ?REPLY(From, Request, Result)) ->
+    handle_query_result(Id, BlockWorker, Result);
+reply_caller(Id, BlockWorker, ?REPLY(From, _, Result)) ->
     gen_statem:reply(From, Result),
-    handle_query_result(Id, Q, Request, Result).
+    handle_query_result(Id, BlockWorker, Result).
 
-handle_query_result(Id, Q, _Request, ok) ->
+handle_query_result(Id, BlockWorker, ok) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
-    Q;
-handle_query_result(Id, Q, _Request, {ok, _}) ->
+    BlockWorker;
+handle_query_result(Id, BlockWorker, {ok, _}) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
-    Q;
-handle_query_result(Id, Q, _Request, {error, _}) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
-    Q;
-handle_query_result(Id, Q, Request, {error, {resource_error, #{reason := not_connected}}}) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
-    maybe_append_cache(Q, Request);
-handle_query_result(Id, Q, _Request, {error, {resource_error, #{}}}) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
-    Q;
-handle_query_result(Id, Q, Request, {error, {exception, _}}) ->
+    BlockWorker;
+handle_query_result(Id, BlockWorker, ?RESOURCE_ERROR_M(exception, _)) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
-    maybe_append_cache(Q, Request).
+    BlockWorker;
+handle_query_result(_Id, _, ?RESOURCE_ERROR_M(NotWorking, _)) when
+    NotWorking == not_connected; NotWorking == blocked
+->
+    true;
+handle_query_result(_Id, BlockWorker, ?RESOURCE_ERROR_M(_, _)) ->
+    BlockWorker;
+handle_query_result(Id, BlockWorker, {error, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    BlockWorker;
+handle_query_result(Id, _BlockWorker, {resource_down, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
+    true.
 
 call_query(Id, Request) ->
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    do_call_query(on_query, Id, Request).
+
+call_batch_query(Id, Batch) ->
+    do_call_query(on_batch_query, Id, Batch).
+
+do_call_query(Fun, Id, Data) ->
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            try Mod:on_query(Id, Request, ResourceState) of
+            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Data)),
+            try Mod:Fun(Id, Data, ResourceState) of
+                %% if the callback module (connector) wants to return an error that
+                %% makes the current resource goes into the `error` state, it should
+                %% return `{resource_down, Reason}`
                 Result -> Result
             catch
                 Err:Reason:ST ->
-                    ModB = atom_to_binary(Mod, utf8),
-                    Msg = <<"call failed, func: ", ModB/binary, ":on_query/3">>,
-                    exception_error(Reason, Msg, {Err, Reason, ST})
+                    Msg = io_lib:format(
+                        "call query failed, func: ~s:~s/3, error: ~0p",
+                        [Mod, Fun, {Err, Reason, ST}]
+                    ),
+                    ?RESOURCE_ERROR(exception, Msg)
             end;
         {ok, _Group, #{status := stopped}} ->
-            resource_error(stopped, <<"resource stopped or disabled">>);
+            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, _Data} ->
-            resource_error(not_connected, <<"resource not connected">>);
+            ?RESOURCE_ERROR(not_connected, "resource not connected");
         {error, not_found} ->
-            resource_error(not_found, <<"resource not found">>)
+            ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
-call_batch_query(Id, Batch) ->
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
-    case emqx_resource_manager:ets_lookup(Id) of
-        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            try Mod:on_batch_query(Id, Batch, ResourceState) of
-                BatchResults -> BatchResults
-            catch
-                Err:Reason:ST ->
-                    ModB = atom_to_binary(Mod, utf8),
-                    Msg = <<"call failed, func: ", ModB/binary, ":on_batch_query/3">>,
-                    ?EXPAND(exception_error(Reason, Msg, {Err, Reason, ST}), Batch)
-            end;
-        {ok, _Group, _Data} ->
-            ?EXPAND(resource_error(not_connected, <<"resource not connected">>), Batch);
-        {error, not_found} ->
-            ?EXPAND(resource_error(not_found, <<"resource not found">>), Batch)
-    end.
+%% the result is already expaned by the `Mod:on_query/3`
+maybe_expand_batch_result(Results, _Batch) when is_list(Results) ->
+    Results;
+%% the `Mod:on_query/3` returns a sinle result for a batch, so it is need expand
+maybe_expand_batch_result(Result, Batch) ->
+    ?EXPAND(Result, Batch).
+
+%%==============================================================================
 
-resource_error(Reason, Msg) ->
-    {error, {resource_error, #{reason => Reason, msg => Msg}}}.
-exception_error(Reason, Msg, Details) ->
-    {error, {exception, #{reason => Reason, msg => Msg, details => Details}}}.
+-spec name(id(), integer()) -> atom().
+name(Id, Index) ->
+    list_to_atom(lists:concat([?MODULE, ":", Id, ":", Index])).
+
+disk_queue_dir(Id) ->
+    filename:join([emqx:data_dir(), Id, queue]).
 
-%% ==========================================
 ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
     Ref = make_ref(),
     TRef = erlang:send_after(T, self(), {flush, Ref}),
@@ -239,14 +326,3 @@ cancel_flush_timer(St = #{tref := undefined}) ->
 cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
     _ = erlang:cancel_timer(TRef),
     St#{tref => undefined}.
-
-query_mfa(InsertMode, Request, SyncTimeout) ->
-    {?MODULE, query_fun(InsertMode), query_args(InsertMode, Request, SyncTimeout)}.
-
-query_fun(<<"sync">>) -> query;
-query_fun(<<"async">>) -> query_async.
-
-query_args(<<"sync">>, Request, SyncTimeout) ->
-    [Request, SyncTimeout];
-query_args(<<"async">>, Request, _) ->
-    [Request].