|
|
@@ -51,29 +51,22 @@ roots() ->
|
|
|
|
|
|
fields(config) ->
|
|
|
[
|
|
|
- {url, mk(binary(), #{required => true, desc => ?DESC("url")})}
|
|
|
- | add_default_username(
|
|
|
- emqx_connector_schema_lib:relational_db_fields()
|
|
|
- )
|
|
|
+ {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
|
|
|
+ {table, mk(binary(), #{required => true, desc => ?DESC("table")})},
|
|
|
+ {aws_access_key_id,
|
|
|
+ mk(
|
|
|
+ binary(),
|
|
|
+ #{required => true, desc => ?DESC("aws_access_key_id")}
|
|
|
+ )},
|
|
|
+ {aws_secret_access_key,
|
|
|
+ mk(
|
|
|
+ binary(),
|
|
|
+ #{required => true, desc => ?DESC("aws_secret_access_key")}
|
|
|
+ )},
|
|
|
+ {pool_size, fun emqx_connector_schema_lib:pool_size/1},
|
|
|
+ {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
|
|
].
|
|
|
|
|
|
-add_default_username(Fields) ->
|
|
|
- lists:map(
|
|
|
- fun
|
|
|
- ({username, OrigUsernameFn}) ->
|
|
|
- {username, add_default_fn(OrigUsernameFn, <<"root">>)};
|
|
|
- (Field) ->
|
|
|
- Field
|
|
|
- end,
|
|
|
- Fields
|
|
|
- ).
|
|
|
-
|
|
|
-add_default_fn(OrigFn, Default) ->
|
|
|
- fun
|
|
|
- (default) -> Default;
|
|
|
- (Field) -> OrigFn(Field)
|
|
|
- end.
|
|
|
-
|
|
|
%%========================================================================================
|
|
|
%% `emqx_resource' API
|
|
|
%%========================================================================================
|
|
|
@@ -86,16 +79,16 @@ on_start(
|
|
|
InstanceId,
|
|
|
#{
|
|
|
url := Url,
|
|
|
- username := Username,
|
|
|
- password := Password,
|
|
|
- database := Database,
|
|
|
+ aws_access_key_id := AccessKeyID,
|
|
|
+ aws_secret_access_key := SecretAccessKey,
|
|
|
+ table := Table,
|
|
|
pool_size := PoolSize
|
|
|
} = Config
|
|
|
) ->
|
|
|
?SLOG(info, #{
|
|
|
msg => "starting_dynamo_connector",
|
|
|
connector => InstanceId,
|
|
|
- config => emqx_utils:redact(Config)
|
|
|
+ config => redact(Config)
|
|
|
}),
|
|
|
|
|
|
{Schema, Server} = get_host_schema(to_str(Url)),
|
|
|
@@ -105,8 +98,8 @@ on_start(
|
|
|
{config, #{
|
|
|
host => Host,
|
|
|
port => Port,
|
|
|
- username => to_str(Username),
|
|
|
- password => to_str(Password),
|
|
|
+ aws_access_key_id => to_str(AccessKeyID),
|
|
|
+ aws_secret_access_key => to_str(SecretAccessKey),
|
|
|
schema => Schema
|
|
|
}},
|
|
|
{pool_size, PoolSize}
|
|
|
@@ -115,7 +108,7 @@ on_start(
|
|
|
Templates = parse_template(Config),
|
|
|
State = #{
|
|
|
poolname => InstanceId,
|
|
|
- database => Database,
|
|
|
+ table => Table,
|
|
|
templates => Templates
|
|
|
},
|
|
|
case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of
|
|
|
@@ -183,7 +176,7 @@ do_query(
|
|
|
InstanceId,
|
|
|
Query,
|
|
|
ApplyMode,
|
|
|
- #{poolname := PoolName, templates := Templates, database := Database} = State
|
|
|
+ #{poolname := PoolName, templates := Templates, table := Table} = State
|
|
|
) ->
|
|
|
?TRACE(
|
|
|
"QUERY",
|
|
|
@@ -192,7 +185,7 @@ do_query(
|
|
|
),
|
|
|
Result = ecpool:pick_and_do(
|
|
|
PoolName,
|
|
|
- {?MODULE, worker_do_query, [Database, Query, Templates]},
|
|
|
+ {?MODULE, worker_do_query, [Table, Query, Templates]},
|
|
|
ApplyMode
|
|
|
),
|
|
|
|
|
|
@@ -217,43 +210,43 @@ do_query(
|
|
|
Result
|
|
|
end.
|
|
|
|
|
|
-worker_do_query(_Client, Database, Query0, Templates) ->
|
|
|
+worker_do_query(_Client, Table, Query0, Templates) ->
|
|
|
try
|
|
|
Query = apply_template(Query0, Templates),
|
|
|
- execute(Query, Database)
|
|
|
+ execute(Query, Table)
|
|
|
catch
|
|
|
_Type:Reason ->
|
|
|
{error, {unrecoverable_error, {invalid_request, Reason}}}
|
|
|
end.
|
|
|
|
|
|
%% some simple query commands for authn/authz or test
|
|
|
-execute({insert_item, Msg}, Database) ->
|
|
|
+execute({insert_item, Msg}, Table) ->
|
|
|
Item = convert_to_item(Msg),
|
|
|
- erlcloud_ddb2:put_item(Database, Item);
|
|
|
-execute({delete_item, Key}, Database) ->
|
|
|
- erlcloud_ddb2:delete_item(Database, Key);
|
|
|
-execute({get_item, Key}, Database) ->
|
|
|
- erlcloud_ddb2:get_item(Database, Key);
|
|
|
+ erlcloud_ddb2:put_item(Table, Item);
|
|
|
+execute({delete_item, Key}, Table) ->
|
|
|
+ erlcloud_ddb2:delete_item(Table, Key);
|
|
|
+execute({get_item, Key}, Table) ->
|
|
|
+ erlcloud_ddb2:get_item(Table, Key);
|
|
|
%% commands for data bridge query or batch query
|
|
|
-execute({send_message, Msg}, Database) ->
|
|
|
+execute({send_message, Msg}, Table) ->
|
|
|
Item = convert_to_item(Msg),
|
|
|
- erlcloud_ddb2:put_item(Database, Item);
|
|
|
-execute([{put, _} | _] = Msgs, Database) ->
|
|
|
+ erlcloud_ddb2:put_item(Table, Item);
|
|
|
+execute([{put, _} | _] = Msgs, Table) ->
|
|
|
%% type of batch_write_item argument :: batch_write_item_request_items()
|
|
|
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
|
|
|
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
|
|
|
%% batch_write_item_request() :: {put, item()} | {delete, key()}
|
|
|
- erlcloud_ddb2:batch_write_item({Database, Msgs}).
|
|
|
+ erlcloud_ddb2:batch_write_item({Table, Msgs}).
|
|
|
|
|
|
connect(Opts) ->
|
|
|
#{
|
|
|
- username := Username,
|
|
|
- password := Password,
|
|
|
+ aws_access_key_id := AccessKeyID,
|
|
|
+ aws_secret_access_key := SecretAccessKey,
|
|
|
host := Host,
|
|
|
port := Port,
|
|
|
schema := Schema
|
|
|
} = proplists:get_value(config, Opts),
|
|
|
- erlcloud_ddb2:configure(Username, Password, Host, Port, Schema),
|
|
|
+ erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
|
|
|
|
|
|
%% The dynamodb driver uses caller process as its connection process
|
|
|
%% so at here, the connection process is the ecpool worker self
|
|
|
@@ -338,3 +331,6 @@ convert2binary(Value) when is_map(Value) ->
|
|
|
|
|
|
do_async_reply(Result, {ReplyFun, [Context]}) ->
|
|
|
ReplyFun(Context, Result).
|
|
|
+
|
|
|
+redact(Data) ->
|
|
|
+ emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end).
|