Przeglądaj źródła

refactor: address review comments

zmstone 1 rok temu
rodzic
commit
c96ee0d8be

+ 1 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -28,7 +28,7 @@
 -type resource_type() :: atom().
 -type resource_module() :: module().
 -type resource_id() :: binary().
--type channel_id() :: binary().
+-type channel_id() :: action_resource_id() | source_resource_id().
 -type raw_resource_config() :: binary() | raw_term_resource_config().
 -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
 -type resource_config() :: term().

+ 20 - 23
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1302,36 +1302,31 @@ extract_connector_id(Id) when is_binary(Id) ->
 %% Check if channel is installed in the connector state.
 %% There is no need to query the conncector if the channel is not
 %% installed as the query will fail anyway.
-pre_query_channel_check(Id, {Id, _} = _Request, ChanSt, QueryOpts) ->
+pre_query_channel_check(Id, {Id, _} = _Request, ChanSt, IsSimpleQuery) ->
     case emqx_resource_manager:channel_status_is_channel_added(ChanSt) of
         true ->
             ok;
         false ->
-            error_if_channel_is_not_installed(Id, QueryOpts)
+            error_if_channel_is_not_installed(Id, IsSimpleQuery)
     end;
-pre_query_channel_check(Id, {Id, _} = _Request, ?NO_CHANNEL, QueryOpts) ->
-    %% a per-channel requst, but channel was not added or deleted (due to reace)
-    error_if_channel_is_not_installed(Id, QueryOpts);
-pre_query_channel_check(_Id, _Request, _ChanSt, _QueryOpts) ->
-    %% Not a per-channel requst
+pre_query_channel_check(_Id, _Request, _ChanSt, _IsSimpleQuery) ->
+    %% Not a per-channel request
     ok.
 
-error_if_channel_is_not_installed(Id, QueryOpts) ->
+error_if_channel_is_not_installed(Id, IsSimpleQuery) ->
     %% Fail with a recoverable error if the channel is not installed and there are buffer
     %% workers involved so that the operation can be retried.  Otherwise, this is
     %% unrecoverable.  It is emqx_resource_manager's responsibility to ensure that the
     %% channel installation is retried.
-    IsSimpleQuery = is_simple_query(QueryOpts),
-    case IsSimpleQuery of
-        true ->
-            {error,
-                {unrecoverable_error,
-                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
-        false ->
-            {error,
-                {recoverable_error,
-                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}}
-    end.
+    Msg = iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id])),
+    ErrorType =
+        case IsSimpleQuery of
+            true ->
+                unrecoverable_error;
+            false ->
+                recoverable_error
+        end,
+    {error, {ErrorType, Msg}}.
 
 is_always_send(#{query_mode := M}, _) when ?IS_BYPASS(M) ->
     %% The query overrides the query mode of the resource, send even in disconnected state
@@ -1393,7 +1388,7 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_query,
             begin
-                case pre_query_channel_check(Id, Request, ChanSt, QueryOpts) of
+                case pre_query_channel_check(Id, Request, ChanSt, is_simple_query(QueryOpts)) of
                     ok ->
                         Mod:on_query(extract_connector_id(Id), Request, ResSt);
                     Error ->
@@ -1437,7 +1432,7 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            case pre_query_channel_check(Id, Request, ChanSt, QueryOpts) of
+            case pre_query_channel_check(Id, Request, ChanSt, IsSimpleQuery) of
                 ok ->
                     case
                         Mod:on_query_async(
@@ -1479,7 +1474,9 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_batch_query,
             begin
-                case pre_query_channel_check(Id, FirstRequest, ChanSt, QueryOpts) of
+                case
+                    pre_query_channel_check(Id, FirstRequest, ChanSt, is_simple_query(QueryOpts))
+                of
                     ok ->
                         Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
                     Error ->
@@ -1526,7 +1523,7 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            case pre_query_channel_check(Id, FirstRequest, ChanSt, QueryOpts) of
+            case pre_query_channel_check(Id, FirstRequest, ChanSt, IsSimpleQuery) of
                 ok ->
                     case
                         Mod:on_batch_query_async(

+ 1 - 1
apps/emqx_resource/src/emqx_resource_cache.erl

@@ -40,7 +40,7 @@
     extra = []
 }).
 
--type chan_key() :: {ConnectorId :: binary(), ChannelID :: binary()}.
+-type chan_key() :: {connector_resource_id(), channel_id()}.
 
 -record(channel, {
     id :: chan_key(),

+ 0 - 4
apps/emqx_resource/src/emqx_resource_cache_cleaner.erl

@@ -22,7 +22,6 @@
     handle_call/3,
     handle_cast/2,
     handle_info/2,
-    code_change/3,
     terminate/2
 ]).
 -export([add/2]).
@@ -64,9 +63,6 @@ handle_info(_Info, State) ->
 terminate(_Reason, _State) ->
     ok.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
 maybe_erase_cache(DownManager, ID) ->
     case emqx_resource_cache:read_manager_pid(ID) =:= DownManager of
         true ->

+ 1 - 1
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -34,7 +34,7 @@ init([]) ->
         id => emqx_resource_cache_cleaner,
         start => {emqx_resource_cache_cleaner, start_link, []},
         restart => permanent,
-        shutdown => infinity,
+        shutdown => 5_000,
         type => worker,
         modules => [emqx_resource_cache_cleaner]
     },

+ 2 - 2
apps/emqx_resource/test/emqx_resource_tests.erl

@@ -26,12 +26,12 @@ is_dry_run_test_() ->
         ?_assert(emqx_resource:is_dry_run(bin([?PROBE_ID_NEW(), "_abc"]))),
         ?_assert(
             emqx_resource:is_dry_run(
-                bin(["acton:typeA:", ?PROBE_ID_NEW(), ":connector:typeB:dryrun"])
+                bin(["action:typeA:", ?PROBE_ID_NEW(), ":connector:typeB:dryrun"])
             )
         ),
         ?_assertNot(
             emqx_resource:is_dry_run(
-                bin(["acton:type1:dryrun:connector:typeb:dryrun"])
+                bin(["action:type1:dryrun:connector:typeb:dryrun"])
             )
         )
     ].