|
|
@@ -134,20 +134,10 @@ on_add_channel(
|
|
|
create_channel_state(
|
|
|
#{parameters := Conf} = _ChannelConfig
|
|
|
) ->
|
|
|
- Keys = maps:with([hash_key, range_key], Conf),
|
|
|
- Keys1 = maps:fold(
|
|
|
- fun(K, V, Acc) ->
|
|
|
- Acc#{K := erlang:binary_to_existing_atom(V)}
|
|
|
- end,
|
|
|
- Keys,
|
|
|
- Keys
|
|
|
- ),
|
|
|
-
|
|
|
- Base = maps:without([template, hash_key, range_key], Conf),
|
|
|
- Base1 = maps:merge(Base, Keys1),
|
|
|
+ Base = maps:without([template], Conf),
|
|
|
|
|
|
Templates = parse_template_from_conf(Conf),
|
|
|
- State = Base1#{
|
|
|
+ State = Base#{
|
|
|
templates => Templates
|
|
|
},
|
|
|
{ok, State}.
|
|
|
@@ -318,12 +308,12 @@ get_query_tuple([InsertQuery | _]) ->
|
|
|
ensuare_dynamo_keys({_, Data} = Query, State) when is_map(Data) ->
|
|
|
ensuare_dynamo_keys([Query], State);
|
|
|
ensuare_dynamo_keys([{_, Data} | _] = Queries, State) when is_map(Data) ->
|
|
|
- Keys = maps:to_list(maps:with([hash_key, range_key], State)),
|
|
|
+ Keys = maps:values(maps:with([hash_key, range_key], State)),
|
|
|
lists:all(
|
|
|
fun({_, Query}) ->
|
|
|
lists:all(
|
|
|
- fun({_, Key}) ->
|
|
|
- maps:is_key(Key, Query)
|
|
|
+ fun(Key) ->
|
|
|
+ is_dynamo_key_existing(Key, Query)
|
|
|
end,
|
|
|
Keys
|
|
|
)
|
|
|
@@ -371,3 +361,17 @@ get_host_info(Server) ->
|
|
|
|
|
|
redact(Data) ->
|
|
|
emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end).
|
|
|
+
|
|
|
+is_dynamo_key_existing(Bin, Query) when is_binary(Bin) ->
|
|
|
+ case maps:is_key(Bin, Query) of
|
|
|
+ true ->
|
|
|
+ true;
|
|
|
+ _ ->
|
|
|
+ try
|
|
|
+ Key = erlang:binary_to_existing_atom(Bin),
|
|
|
+ maps:is_key(Key, Query)
|
|
|
+ catch
|
|
|
+ _:_ ->
|
|
|
+ false
|
|
|
+ end
|
|
|
+ end.
|