Selaa lähdekoodia

feat: handle strange key values when resolving placeholders

Thales Macedo Garitezi 2 vuotta sitten
vanhempi
commit
23f5cea482

+ 36 - 5
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -219,7 +219,7 @@ encode_payload(State, Selected) ->
         payload_template := PayloadTemplate
     } = State,
     Data = render_payload(PayloadTemplate, Selected),
-    OrderingKey = render(OrderingKeyTemplate, Selected),
+    OrderingKey = render_key(OrderingKeyTemplate, Selected),
     Attributes = proc_attributes(AttributesTemplate, Selected),
     Payload0 = #{data => base64:encode(Data)},
     Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
@@ -234,9 +234,40 @@ put_if(Acc, _K, _V, false) ->
 render_payload([] = _Template, Selected) ->
     emqx_utils_json:encode(Selected);
 render_payload(Template, Selected) ->
-    render(Template, Selected).
+    render_value(Template, Selected).
 
-render(Template, Selected) ->
+render_key(Template, Selected) ->
+    Opts = #{
+        return => full_binary,
+        var_trans => fun
+            (_Var, undefined) ->
+                <<>>;
+            (Var, X) when is_boolean(X) ->
+                throw({bad_value_for_key, Var, X});
+            (_Var, X) when is_binary(X); is_number(X); is_atom(X) ->
+                emqx_utils_conv:bin(X);
+            (Var, X) ->
+                throw({bad_value_for_key, Var, X})
+        end
+    },
+    try
+        emqx_placeholder:proc_tmpl(Template, Selected, Opts)
+    catch
+        throw:{bad_value_for_key, Var, X} ->
+            ?tp(
+                warning,
+                "gcp_pubsub_producer_bad_value_for_key",
+                #{
+                    placeholder => Var,
+                    value => X,
+                    action => "key ignored",
+                    hint => "only plain values like strings and numbers can be used in keys"
+                }
+            ),
+            <<>>
+    end.
+
+render_value(Template, Selected) ->
     Opts = #{
         return => full_binary,
         var_trans => fun
@@ -264,12 +295,12 @@ preproc_attributes(AttributesTemplate) ->
 proc_attributes(AttributesTemplate, Selected) ->
     maps:fold(
         fun(KT, VT, Acc) ->
-            K = render(KT, Selected),
+            K = render_key(KT, Selected),
             case K =:= <<>> of
                 true ->
                     Acc;
                 false ->
-                    V = render(VT, Selected),
+                    V = render_value(VT, Selected),
                     Acc#{K => V}
             end
         end,

+ 114 - 1
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -64,7 +64,8 @@ single_config_tests() ->
         t_get_status_no_worker,
         t_get_status_timeout_calling_workers,
         t_on_start_ehttpc_pool_already_started,
-        t_attributes
+        t_attributes,
+        t_bad_attributes
     ].
 
 only_sync_tests() ->
@@ -484,9 +485,15 @@ assert_http_request(ServiceAccountJSON) ->
         error({timeout, #{mailbox => Mailbox}})
     end.
 
+receive_http_requests(ServiceAccountJSON, Opts) ->
+    Default = #{n => 1},
+    #{n := N} = maps:merge(Default, Opts),
+    lists:flatmap(fun(_) -> receive_http_request(ServiceAccountJSON) end, lists:seq(1, N)).
+
 receive_http_request(ServiceAccountJSON) ->
     receive
         {http, Headers, Body} ->
+            ct:pal("received publish:\n  ~p", [#{headers => Headers, body => Body}]),
             assert_valid_request_headers(Headers, ServiceAccountJSON),
             #{<<"messages">> := Msgs} = emqx_utils_json:decode(Body, [return_maps]),
             lists:map(
@@ -1689,3 +1696,109 @@ t_attributes(Config) ->
         []
     ),
     ok.
+
+t_bad_attributes(Config) ->
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    LocalTopic = <<"t/topic">>,
+    ?check_trace(
+        begin
+            {ok, _} = create_bridge_http(
+                Config,
+                #{
+                    <<"local_topic">> => LocalTopic,
+                    <<"attributes_template">> =>
+                        [
+                            #{
+                                <<"key">> => <<"${.payload.key}">>,
+                                <<"value">> => <<"${.payload.value}">>
+                            }
+                        ],
+                    <<"ordering_key_template">> => <<"${.payload.ok}">>
+                }
+            ),
+            %% Ok: attribute value is a map or list
+            lists:foreach(
+                fun(OkValue) ->
+                    Payload0 =
+                        emqx_utils_json:encode(
+                            #{
+                                <<"ok">> => <<"ord_key">>,
+                                <<"value">> => OkValue,
+                                <<"key">> => <<"attr_key">>
+                            }
+                        ),
+                    Message0 = emqx_message:make(LocalTopic, Payload0),
+                    emqx:publish(Message0)
+                end,
+                [
+                    #{<<"some">> => <<"map">>},
+                    [1, <<"str">>, #{<<"deep">> => true}]
+                ]
+            ),
+            DecodedMessages0 = receive_http_requests(ServiceAccountJSON, #{n => 1}),
+            ?assertMatch(
+                [
+                    #{
+                        <<"attributes">> :=
+                            #{<<"attr_key">> := <<"{\"some\":\"map\"}">>},
+                        <<"orderingKey">> := <<"ord_key">>
+                    },
+                    #{
+                        <<"attributes">> :=
+                            #{<<"attr_key">> := <<"[1,\"str\",{\"deep\":true}]">>},
+                        <<"orderingKey">> := <<"ord_key">>
+                    }
+                ],
+                DecodedMessages0
+            ),
+            %% Bad: key is not a plain value
+            lists:foreach(
+                fun(BadKey) ->
+                    Payload1 =
+                        emqx_utils_json:encode(
+                            #{
+                                <<"value">> => <<"v">>,
+                                <<"key">> => BadKey,
+                                <<"ok">> => BadKey
+                            }
+                        ),
+                    Message1 = emqx_message:make(LocalTopic, Payload1),
+                    emqx:publish(Message1)
+                end,
+                [
+                    #{<<"some">> => <<"map">>},
+                    [1, <<"list">>, true],
+                    true,
+                    false
+                ]
+            ),
+            DecodedMessages1 = receive_http_request(ServiceAccountJSON),
+            lists:foreach(
+                fun(DMsg) ->
+                    ?assertNot(is_map_key(<<"orderingKey">>, DMsg), #{decoded_message => DMsg}),
+                    ?assertNot(is_map_key(<<"attributes">>, DMsg), #{decoded_message => DMsg}),
+                    ok
+                end,
+                DecodedMessages1
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ct:pal("trace:\n  ~p", [Trace]),
+            ?assertMatch(
+                [
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := #{}},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := #{}},
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := [_ | _]},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := [_ | _]},
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := true},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := true},
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := false},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := false}
+                ],
+                ?of_kind("gcp_pubsub_producer_bad_value_for_key", Trace)
+            ),
+            ok
+        end
+    ),
+    ok.