Просмотр исходного кода

Merge pull request #6556 from terry-xiaoyu/fixes_for_bridges

Bug fixes for data bridges
Shawn 4 лет назад
Родитель
Сommit
1d4be368cb

+ 1 - 0
apps/emqx_bridge/src/emqx_bridge_http_schema.erl

@@ -87,6 +87,7 @@ basic_config() ->
     , {direction,
         mk(egress,
            #{ desc => "The direction of this bridge, MUST be egress"
+            , default => egress
             })}
     ]
     ++ proplists:delete(base_url, emqx_connector_http:fields(config)).

+ 1 - 0
apps/emqx_bridge/src/emqx_bridge_schema.erl

@@ -71,6 +71,7 @@ metrics_status_fields() ->
 direction_field(Dir, Desc) ->
     {direction, mk(Dir,
         #{ nullable => false
+         , default => egress
          , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>"
             ++ Desc
          })}.

+ 49 - 13
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -47,7 +47,7 @@ groups() ->
     [].
 
 suite() ->
-	[{timetrap,{seconds,30}}].
+	[{timetrap,{seconds,60}}].
 
 init_per_suite(Config) ->
     ok = emqx_config:put([emqx_dashboard], #{
@@ -84,7 +84,7 @@ start_http_server(HandleFun) ->
     spawn_link(fun() ->
         {Port, Sock} = listen_on_random_port(),
         Parent ! {port, Port},
-        loop(Sock, HandleFun)
+        loop(Sock, HandleFun, Parent)
     end),
     receive
         {port, Port} -> Port
@@ -95,40 +95,49 @@ start_http_server(HandleFun) ->
 listen_on_random_port() ->
     Min = 1024, Max = 65000,
     Port = rand:uniform(Max - Min) + Min,
-    case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of
+    case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of
         {ok, Sock} -> {Port, Sock};
         {error, eaddrinuse} -> listen_on_random_port()
     end.
 
-loop(Sock, HandleFun) ->
+loop(Sock, HandleFun, Parent) ->
     {ok, Conn} = gen_tcp:accept(Sock),
-    Handler = spawn(fun () -> HandleFun(Conn) end),
+    Handler = spawn(fun () -> HandleFun(Conn, Parent) end),
     gen_tcp:controlling_process(Conn, Handler),
-    loop(Sock, HandleFun).
+    loop(Sock, HandleFun, Parent).
 
 make_response(CodeStr, Str) ->
     B = iolist_to_binary(Str),
     iolist_to_binary(
       io_lib:fwrite(
-         "HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s",
+         "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
          [CodeStr, size(B), B])).
 
-handle_fun_200_ok(Conn) ->
+handle_fun_200_ok(Conn, Parent) ->
     case gen_tcp:recv(Conn, 0) of
-        {ok, Request} ->
+        {ok, ReqStr} ->
+            ct:pal("the http handler got request: ~p", [ReqStr]),
+            Req = parse_http_request(ReqStr),
+            Parent ! {http_server, received, Req},
             gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
-            self() ! {http_server, received, Request},
-            handle_fun_200_ok(Conn);
+            handle_fun_200_ok(Conn, Parent);
         {error, closed} ->
             gen_tcp:close(Conn)
     end.
 
+parse_http_request(ReqStr0) ->
+    [Method, ReqStr1] = string:split(ReqStr0, " ", leading),
+    [Path, ReqStr2] = string:split(ReqStr1, " ", leading),
+    [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
+    [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading),
+    #{method => Method, path => Path, body => Body}.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
 t_http_crud_apis(_) ->
-    Port = start_http_server(fun handle_fun_200_ok/1),
+    Port = start_http_server(fun handle_fun_200_ok/2),
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
@@ -163,6 +172,20 @@ t_http_crud_apis(_) ->
          , <<"message">> := <<"bridge already exists">>
          }, jsx:decode(RetMsg)),
 
+    %% send an message to emqx and the message should be forwarded to the HTTP server
+    Body = <<"my msg">>,
+    emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
+    ?assert(
+        receive
+            {http_server, received, #{method := <<"POST">>, path := <<"/path1">>,
+                    body := Body}} ->
+                true;
+            Msg ->
+                ct:pal("error: http got unexpected request: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end),
     %% update the request-path of the bridge
     URL2 = ?URL(Port, "path2"),
     {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
@@ -201,6 +224,19 @@ t_http_crud_apis(_) ->
                   , <<"url">> := URL2
                   }, jsx:decode(Bridge3Str)),
 
+    %% send an message to emqx again, check the path has been changed
+    emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
+    ?assert(
+        receive
+            {http_server, received, #{path := <<"/path2">>}} ->
+                true;
+            Msg2 ->
+                ct:pal("error: http got unexpected request: ~p", [Msg2]),
+                false
+        after 100 ->
+            false
+        end),
+
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -215,7 +251,7 @@ t_http_crud_apis(_) ->
     ok.
 
 t_start_stop_bridges(_) ->
-    Port = start_http_server(fun handle_fun_200_ok/1),
+    Port = start_http_server(fun handle_fun_200_ok/2),
     URL1 = ?URL(Port, "abc"),
     {ok, 201, Bridge} = request(post, uri(["bridges"]),
         ?HTTP_BRIDGE(URL1)#{

+ 6 - 1
apps/emqx_connector/src/emqx_connector_http.erl

@@ -266,11 +266,16 @@ process_request(#{
         } = Conf, Msg) ->
     Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
          , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
-         , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg)
+         , body => process_request_body(BodyTks, Msg)
          , headers => maps:to_list(proc_headers(HeadersTks, Msg))
          , request_timeout => ReqTimeout
          }.
 
+process_request_body([], Msg) ->
+    emqx_json:encode(Msg);
+process_request_body(BodyTks, Msg) ->
+    emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg).
+
 proc_headers(HeaderTks, Msg) ->
     maps:fold(fun(K, V, Acc) ->
             Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) =>

+ 4 - 1
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -208,7 +208,7 @@ basic_config(#{
         username => User,
         password => Password,
         clean_start => CleanStart,
-        keepalive => KeepAlive,
+        keepalive => ms_to_s(KeepAlive),
         retry_interval => RetryIntv,
         max_inflight => MaxInflight,
         ssl => EnableSsl,
@@ -216,5 +216,8 @@ basic_config(#{
         if_record_metrics => true
     }.
 
+ms_to_s(Ms) ->
+    erlang:ceil(Ms / 1000).
+
 clientid(Id) ->
     iolist_to_binary([Id, ":", atom_to_list(node())]).

+ 7 - 2
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -66,7 +66,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
 to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
         remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = replace_vars_in_str(PayloadToken, MapMsg),
+    Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
     #mqtt_msg{qos = QoS,
@@ -82,13 +82,18 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
             #{local_topic := TopicToken, payload := PayloadToken,
               local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = replace_vars_in_str(PayloadToken, MapMsg),
+    Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
     set_headers(Props,
         emqx_message:set_flags(#{dup => Dup, retain => Retain},
             emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
 
+process_payload([], Msg) ->
+    emqx_json:encode(Msg);
+process_payload(Tks, Msg) ->
+    replace_vars_in_str(Tks, Msg).
+
 %% Replace a string contains vars to another string in which the placeholders are replace by the
 %% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
 %% "a: 1".