Просмотр исходного кода

fix(dynamo): Added missing keys for DynamoDB

firest 1 год назад
Родитель
Сommit
a1e85e3c59

+ 27 - 2
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl

@@ -87,6 +87,7 @@ connector_values() ->
         <<"url">> => <<"http://127.0.0.1:8000">>,
         <<"aws_access_key_id">> => <<"root">>,
         <<"aws_secret_access_key">> => <<"******">>,
+        <<"region">> => <<"us-west-2">>,
         <<"pool_size">> => 8,
         <<"resource_opts">> =>
             #{
@@ -113,7 +114,8 @@ action_values() ->
         <<"parameters">> =>
             #{
                 <<"table">> => <<"mqtt_msg">>,
-                <<"template">> => ?DEFAULT_TEMPLATE
+                <<"template">> => ?DEFAULT_TEMPLATE,
+                <<"hash_key">> => <<"clientid">>
             }
     }.
 
@@ -160,7 +162,19 @@ fields(dynamo_action) ->
     );
 fields(action_parameters) ->
     Parameters =
-        [{template, template_field_schema()}] ++ emqx_bridge_dynamo_connector:fields(config),
+        [
+            {template, template_field_schema()},
+            {hash_key,
+                mk(
+                    binary(),
+                    #{desc => ?DESC("hash_key"), required => true}
+                )},
+            {range_key,
+                mk(
+                    binary(),
+                    #{desc => ?DESC("range_key"), required => false}
+                )}
+        ] ++ emqx_bridge_dynamo_connector:fields(config),
     lists:foldl(
         fun(Key, Acc) ->
             proplists:delete(Key, Acc)
@@ -168,6 +182,7 @@ fields(action_parameters) ->
         Parameters,
         [
             url,
+            region,
             aws_access_key_id,
             aws_secret_access_key,
             pool_size,
@@ -199,6 +214,16 @@ fields("config") ->
                 binary(),
                 #{desc => ?DESC("local_topic"), default => undefined}
             )},
+        {hash_key,
+            mk(
+                binary(),
+                #{desc => ?DESC("hash_key"), required => true}
+            )},
+        {range_key,
+            mk(
+                binary(),
+                #{desc => ?DESC("range_key"), required => false}
+            )},
         {resource_opts,
             mk(
                 ref(?MODULE, "creation_opts"),

+ 49 - 10
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -45,6 +45,7 @@ roots() ->
 fields(config) ->
     [
         {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
+        {region, mk(binary(), #{required => true, desc => ?DESC("region")})},
         {table, mk(binary(), #{required => true, desc => ?DESC("table")})},
         {aws_access_key_id,
             mk(
@@ -102,6 +103,12 @@ on_start(
         pool_name => InstanceId,
         installed_channels => #{}
     },
+    case Config of
+        #{region := Region} ->
+            application:set_env(erlcloud, aws_region, to_str(Region));
+        _ ->
+            ok
+    end,
     case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
         ok ->
             {ok, State};
@@ -126,12 +133,20 @@ on_add_channel(
 create_channel_state(
     #{parameters := Conf} = _ChannelConfig
 ) ->
-    #{
-        table := Table
-    } = Conf,
+    Keys = maps:with([hash_key, range_key], Conf),
+    Keys1 = maps:fold(
+        fun(K, V, Acc) ->
+            Acc#{K := erlang:binary_to_atom(V)}
+        end,
+        Keys,
+        Keys
+    ),
+
+    Base = maps:without([template, hash_key, range_key], Conf),
+    Base1 = maps:merge(Base, Keys1),
+
     Templates = parse_template_from_conf(Conf),
-    State = #{
-        table => Table,
+    State = Base1#{
         templates => Templates
     },
     {ok, State}.
@@ -232,11 +247,16 @@ do_query(
         templates := Templates
     } = ChannelState,
     Result =
-        ecpool:pick_and_do(
-            PoolName,
-            {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]},
-            no_handover
-        ),
+        case ensuare_dynamo_keys(Query, ChannelState) of
+            true ->
+                ecpool:pick_and_do(
+                    PoolName,
+                    {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]},
+                    no_handover
+                );
+            _ ->
+                {error, missing_filter_or_range_key}
+        end,
 
     case Result of
         {error, Reason} ->
@@ -288,6 +308,25 @@ get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
 get_query_tuple([InsertQuery | _]) ->
     get_query_tuple(InsertQuery).
 
+ensuare_dynamo_keys({_, Data} = Query, State) when is_map(Data) ->
+    ensuare_dynamo_keys([Query], State);
+ensuare_dynamo_keys([{_, Data} | _] = Queries, State) when is_map(Data) ->
+    Keys = maps:to_list(maps:with([hash_key, range_key], State)),
+    lists:all(
+        fun({_, Query}) ->
+            lists:all(
+                fun({_, Key}) ->
+                    maps:is_key(Key, Query)
+                end,
+                Keys
+            )
+        end,
+        Queries
+    );
+%% this is not a insert query
+ensuare_dynamo_keys(_Query, _State) ->
+    true.
+
 connect(Opts) ->
     Config = proplists:get_value(config, Opts),
     {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config).

+ 23 - 6
apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl

@@ -16,6 +16,7 @@
 -define(TABLE_BIN, to_bin(?TABLE)).
 -define(ACCESS_KEY_ID, "root").
 -define(SECRET_ACCESS_KEY, "public").
+-define(REGION, "us-west-2").
 -define(HOST, "dynamo").
 -define(PORT, 8000).
 -define(SCHEMA, "http://").
@@ -177,7 +178,9 @@ dynamo_config(BridgeType, Config) ->
             "bridges.~s.~s {"
             "\n   enable = true"
             "\n   url = \"http://~s:~p\""
+            "\n   region = ~p"
             "\n   table = ~p"
+            "\n   hash_key =\"clientid\""
             "\n   aws_access_key_id = ~p"
             "\n   aws_secret_access_key = ~p"
             "\n   resource_opts = {"
@@ -191,6 +194,7 @@ dynamo_config(BridgeType, Config) ->
                 Name,
                 Host,
                 Port,
+                ?REGION,
                 ?TABLE,
                 ?ACCESS_KEY_ID,
                 %% NOTE: using file-based secrets with HOCON configs
@@ -210,7 +214,8 @@ action_config(Config) ->
         <<"enable">> => true,
         <<"parameters">> =>
             #{
-                <<"table">> => ?TABLE
+                <<"table">> => ?TABLE,
+                <<"hash_key">> => <<"clientid">>
             },
         <<"resource_opts">> =>
             #{
@@ -234,6 +239,7 @@ connector_config(Config) ->
         <<"url">> => URL,
         <<"aws_access_key_id">> => ?ACCESS_KEY_ID,
         <<"aws_secret_access_key">> => AccessKey,
+        <<"region">> => ?REGION,
         <<"enable">> => true,
         <<"pool_size">> => 8,
         <<"resource_opts">> =>
@@ -355,7 +361,7 @@ t_setup_via_config_and_publish(Config) ->
         create_bridge(Config)
     ),
     MsgId = emqx_utils:gen_id(),
-    SentData = #{id => MsgId, payload => ?PAYLOAD},
+    SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD},
     ?check_trace(
         begin
             ?wait_async_action(
@@ -421,7 +427,7 @@ t_setup_via_http_api_and_publish(Config) ->
         create_bridge_http(PgsqlConfig)
     ),
     MsgId = emqx_utils:gen_id(),
-    SentData = #{id => MsgId, payload => ?PAYLOAD},
+    SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD},
     ?check_trace(
         begin
             ?wait_async_action(
@@ -486,7 +492,7 @@ t_write_failure(Config) ->
             #{?snk_kind := resource_connected_enter},
             20_000
         ),
-    SentData = #{id => emqx_utils:gen_id(), payload => ?PAYLOAD},
+    SentData = #{clientid => <<"clientid">>, id => emqx_utils:gen_id(), payload => ?PAYLOAD},
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         ?assertMatch(
             {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData)
@@ -513,12 +519,21 @@ t_simple_query(Config) ->
     ok.
 
 t_missing_data(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Result = send_message(Config, #{clientid => <<"clientid">>}),
+    ?assertMatch({error, {<<"ValidationException">>, <<>>}}, Result),
+    ok.
+
+t_missing_hash_key(Config) ->
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Result = send_message(Config, #{}),
-    ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result),
+    ?assertMatch({error, missing_filter_or_range_key}, Result),
     ok.
 
 t_bad_parameter(Config) ->
@@ -543,7 +558,9 @@ t_action_create_via_http(Config) ->
     emqx_bridge_v2_testlib:t_create_via_http(Config).
 
 t_action_sync_query(Config) ->
-    MakeMessageFun = fun() -> #{id => <<"the_message_id">>, payload => ?PAYLOAD} end,
+    MakeMessageFun = fun() ->
+        #{clientid => <<"clientid">>, id => <<"the_message_id">>, payload => ?PAYLOAD}
+    end,
     IsSuccessCheck = fun(Result) -> ?assertEqual({ok, []}, Result) end,
     TracePoint = dynamo_connector_query_return,
     emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint).

+ 5 - 0
rel/i18n/emqx_bridge_dynamo.hocon

@@ -60,4 +60,9 @@ config_connector.desc:
 config_connector.label:
 """DynamoDB Connector Configuration"""
 
+hash_key.desc:
+"""DynamoDB Hash Key"""
+
+range_key.desc:
+"""DynamoDB Range Key"""
 }

+ 3 - 0
rel/i18n/emqx_bridge_dynamo_connector.hocon

@@ -18,6 +18,9 @@ table.desc:
 table.label:
 """Table """
 
+region.label:
+"""Region of the AWS Dynamo"""
+
 url.desc:
 """The url of DynamoDB endpoint."""