ソースを参照

feat(bridge): support http bridge

Shawn 4 年 前
コミット
e2721c144c

+ 1 - 1
apps/emqx/src/emqx_config_handler.erl

@@ -77,7 +77,7 @@ stop() ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
 update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
     ?ATOM_CONF_PATH(ConfKeyPath, gen_server:call(?MODULE, {change_config, SchemaModule,
     ?ATOM_CONF_PATH(ConfKeyPath, gen_server:call(?MODULE, {change_config, SchemaModule,
-        AtomKeyPath, UpdateArgs}), {error, ConfKeyPath}).
+        AtomKeyPath, UpdateArgs}), {error, {not_found, ConfKeyPath}}).
 
 
 -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
 -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
 add_handler(ConfKeyPath, HandlerName) ->
 add_handler(ConfKeyPath, HandlerName) ->

+ 27 - 0
apps/emqx_bridge/etc/emqx_bridge.conf

@@ -45,3 +45,30 @@
 #        retain = false
 #        retain = false
 #    }
 #    }
 #}
 #}
+#
+#bridges.http.my_http_bridge {
+#    base_url: "http://localhost:9901"
+#    connect_timeout: "30s"
+#    max_retries: 3
+#    retry_interval = "10s"
+#    pool_type = "random"
+#    pool_size = 4
+#    enable_pipelining = true
+#    ssl {
+#        enable = false
+#        keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
+#        certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
+#        cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
+#    }
+#    egress_channels.post_messages {
+#        subscribe_local_topic = "emqx_http/#"
+#        request_timeout: "30s"
+#        ## following config entries can use placehodler variables
+#        method = post
+#        path = "/messages/${topic}"
+#        body = "${payload}"
+#        headers {
+#          "content-type": "application/json"
+#        }
+#    }
+#}

+ 96 - 22
apps/emqx_bridge/src/emqx_bridge.erl

@@ -15,9 +15,15 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 -module(emqx_bridge).
 -module(emqx_bridge).
 -behaviour(emqx_config_handler).
 -behaviour(emqx_config_handler).
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 
 -export([post_config_update/4]).
 -export([post_config_update/4]).
 
 
+-export([reload_hook/0, unload_hook/0]).
+
+-export([on_message_publish/1]).
+
 -export([ load_bridges/0
 -export([ load_bridges/0
         , get_bridge/2
         , get_bridge/2
         , get_bridge/3
         , get_bridge/3
@@ -28,6 +34,7 @@
         , start_bridge/2
         , start_bridge/2
         , stop_bridge/2
         , stop_bridge/2
         , restart_bridge/2
         , restart_bridge/2
+        , send_message/2
         ]).
         ]).
 
 
 -export([ config_key_path/0
 -export([ config_key_path/0
@@ -38,24 +45,57 @@
         , resource_id/1
         , resource_id/1
         , resource_id/2
         , resource_id/2
         , parse_bridge_id/1
         , parse_bridge_id/1
+        , channel_id/4
+        , parse_channel_id/1
         ]).
         ]).
 
 
+reload_hook() ->
+    unload_hook(),
+    Bridges = emqx:get_config([bridges], #{}),
+    lists:foreach(fun({_Type, Bridge}) ->
+            lists:foreach(fun({_Name, BridgeConf}) ->
+                    load_hook(BridgeConf)
+                end, maps:to_list(Bridge))
+        end, maps:to_list(Bridges)).
+
+load_hook(#{egress_channels := Channels}) ->
+    case has_subscribe_local_topic(Channels) of
+        true -> ok;
+        false -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []})
+    end;
+load_hook(_Conf) -> ok.
+
+unload_hook() ->
+    ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
+
+on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
+    case maps:get(sys, Flags, false) of
+        false ->
+            ChannelIds = get_matched_channels(Topic),
+            lists:foreach(fun(ChannelId) ->
+                    send_message(ChannelId, emqx_message:to_map(Message))
+                end, ChannelIds);
+        true -> ok
+    end,
+    {ok, Message}.
+
+%% TODO: remove this clause, treat mqtt bridges the same as other bridges
+send_message(ChannelId, Message) ->
+    {BridgeType, BridgeName, _, _} = parse_channel_id(ChannelId),
+    ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
+    do_send_message(ResId, ChannelId, Message).
+
+do_send_message(ResId, ChannelId, Message) ->
+    emqx_resource:query(ResId, {send_message, ChannelId, Message}).
+
 config_key_path() ->
 config_key_path() ->
     [bridges].
     [bridges].
 
 
 resource_type(mqtt) -> emqx_connector_mqtt;
 resource_type(mqtt) -> emqx_connector_mqtt;
-resource_type(mysql) -> emqx_connector_mysql;
-resource_type(pgsql) -> emqx_connector_pgsql;
-resource_type(mongo) -> emqx_connector_mongo;
-resource_type(redis) -> emqx_connector_redis;
-resource_type(ldap) -> emqx_connector_ldap.
+resource_type(http) -> emqx_connector_http.
 
 
 bridge_type(emqx_connector_mqtt) -> mqtt;
 bridge_type(emqx_connector_mqtt) -> mqtt;
-bridge_type(emqx_connector_mysql) -> mysql;
-bridge_type(emqx_connector_pgsql) -> pgsql;
-bridge_type(emqx_connector_mongo) -> mongo;
-bridge_type(emqx_connector_redis) -> redis;
-bridge_type(emqx_connector_ldap) -> ldap.
+bridge_type(emqx_connector_http) -> http.
 
 
 post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
 post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
     #{added := Added, removed := Removed, changed := Updated}
     #{added := Added, removed := Removed, changed := Updated}
@@ -100,11 +140,23 @@ bridge_id(BridgeType, BridgeName) ->
     <<Type/binary, ":", Name/binary>>.
     <<Type/binary, ":", Name/binary>>.
 
 
 parse_bridge_id(BridgeId) ->
 parse_bridge_id(BridgeId) ->
-    try
-        [Type, Name] = string:split(str(BridgeId), ":", leading),
-        {list_to_existing_atom(Type), list_to_atom(Name)}
-    catch
-        _ : _ -> error({invalid_bridge_id, BridgeId})
+    case string:split(bin(BridgeId), ":", all) of
+        [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
+        _ -> error({invalid_bridge_id, BridgeId})
+    end.
+
+channel_id(BridgeType, BridgeName, ChannelType, ChannelName) ->
+    BType = bin(BridgeType),
+    BName = bin(BridgeName),
+    CType = bin(ChannelType),
+    CName = bin(ChannelName),
+    <<BType/binary, ":", BName/binary, ":", CType/binary, ":", CName/binary>>.
+
+parse_channel_id(ChannelId) ->
+    case string:split(bin(ChannelId), ":", all) of
+        [BridgeType, BridgeName, ChannelType, ChannelName] ->
+            {BridgeType, BridgeName, ChannelType, ChannelName};
+        _ -> error({invalid_bridge_id, ChannelId})
     end.
     end.
 
 
 list_bridges() ->
 list_bridges() ->
@@ -184,13 +236,35 @@ flatten_confs(Conf0) ->
 do_flatten_confs(Type, Conf0) ->
 do_flatten_confs(Type, Conf0) ->
     [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
     [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
 
 
+has_subscribe_local_topic(Channels) ->
+    lists:any(fun (#{subscribe_local_topic := _}) -> true;
+                  (_) -> false
+        end, maps:to_list(Channels)).
+
+get_matched_channels(Topic) ->
+    Bridges = emqx:get_config([bridges], #{}),
+    maps:fold(fun
+        %% TODO: also trigger 'message.publish' for mqtt bridges.
+        (mqtt, _Conf, Acc0) -> Acc0;
+        (BType, Conf, Acc0) ->
+            maps:fold(fun
+                (BName, #{egress_channels := Channels}, Acc1) ->
+                    do_get_matched_channels(Topic, Channels, BType, BName, egress_channels)
+                    ++ Acc1;
+                (_Name, _BridgeConf, Acc1) -> Acc1
+            end, Acc0, Conf)
+    end, [], Bridges).
+
+do_get_matched_channels(Topic, Channels, BType, BName, CType) ->
+    maps:fold(fun
+        (ChannName, #{subscribe_local_topic := Filter}, Acc) ->
+            case emqx_topic:match(Topic, Filter) of
+                true -> [channel_id(BType, BName, CType, ChannName) | Acc];
+                false -> Acc
+            end;
+        (_ChannName, _ChannConf, Acc) -> Acc
+    end, [], Channels).
+
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
-
-str(A) when is_atom(A) ->
-    atom_to_list(A);
-str(B) when is_binary(B) ->
-    binary_to_list(B);
-str(S) when is_list(S) ->
-    S.

+ 2 - 0
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -22,10 +22,12 @@
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqx_bridge_sup:start_link(),
     {ok, Sup} = emqx_bridge_sup:start_link(),
     ok = emqx_bridge:load_bridges(),
     ok = emqx_bridge:load_bridges(),
+    ok = emqx_bridge:reload_hook(),
     emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
     emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
     {ok, Sup}.
     {ok, Sup}.
 
 
 stop(_State) ->
 stop(_State) ->
+    ok = emqx_bridge:unload_hook(),
     ok.
     ok.
 
 
 %% internal functions
 %% internal functions

+ 13 - 2
apps/emqx_bridge/src/emqx_bridge_schema.erl

@@ -1,5 +1,7 @@
 -module(emqx_bridge_schema).
 -module(emqx_bridge_schema).
 
 
+-include_lib("typerefl/include/types.hrl").
+
 -export([roots/0, fields/1]).
 -export([roots/0, fields/1]).
 
 
 %%======================================================================================
 %%======================================================================================
@@ -8,7 +10,16 @@
 roots() -> [bridges].
 roots() -> [bridges].
 
 
 fields(bridges) ->
 fields(bridges) ->
-    [{mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}];
+    [ {mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}
+    , {http, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "http_bridge")))}
+    ];
 
 
 fields("mqtt_bridge") ->
 fields("mqtt_bridge") ->
-    emqx_connector_mqtt:fields("config").
+    emqx_connector_mqtt:fields("config");
+
+fields("http_bridge") ->
+    emqx_connector_http:fields(config) ++ http_channels().
+
+http_channels() ->
+    [{egress_channels, hoconsc:mk(hoconsc:map(id,
+        hoconsc:ref(emqx_connector_http, "http_request")))}].

+ 102 - 8
apps/emqx_connector/src/emqx_connector_http.erl

@@ -21,6 +21,8 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
 -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
 
 
+-include_lib("emqx/include/logger.hrl").
+
 %% callbacks of behaviour emqx_resource
 %% callbacks of behaviour emqx_resource
 -export([ on_start/2
 -export([ on_start/2
         , on_stop/2
         , on_stop/2
@@ -38,7 +40,7 @@
 
 
 -export([ check_ssl_opts/2 ]).
 -export([ check_ssl_opts/2 ]).
 
 
--type connect_timeout() :: non_neg_integer() | infinity.
+-type connect_timeout() :: emqx_schema:duration() | infinity.
 -type pool_type() :: random | hash.
 -type pool_type() :: random | hash.
 
 
 -reflect_type([ connect_timeout/0
 -reflect_type([ connect_timeout/0
@@ -50,6 +52,22 @@
 roots() ->
 roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 
+fields("http_request") ->
+    [ {subscribe_local_topic, hoconsc:mk(binary())}
+    , {method, hoconsc:mk(method(), #{default => post})}
+    , {path, hoconsc:mk(binary(), #{default => <<"">>})}
+    , {headers, hoconsc:mk(map(),
+        #{default => #{
+            <<"accept">> => <<"application/json">>,
+            <<"cache-control">> => <<"no-cache">>,
+            <<"connection">> => <<"keep-alive">>,
+            <<"content-type">> => <<"application/json">>,
+            <<"keep-alive">> => <<"timeout=5">>}})
+      }
+    , {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
+    , {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})}
+    ];
+
 fields(config) ->
 fields(config) ->
     [ {base_url,          fun base_url/1}
     [ {base_url,          fun base_url/1}
     , {connect_timeout,   fun connect_timeout/1}
     , {connect_timeout,   fun connect_timeout/1}
@@ -60,6 +78,13 @@ fields(config) ->
     , {enable_pipelining, fun enable_pipelining/1}
     , {enable_pipelining, fun enable_pipelining/1}
     ] ++ emqx_connector_schema_lib:ssl_fields().
     ] ++ emqx_connector_schema_lib:ssl_fields().
 
 
+method() ->
+    hoconsc:union([ typerefl:atom(post)
+                  , typerefl:atom(put)
+                  , typerefl:atom(get)
+                  , typerefl:atom(delete)
+                  ]).
+
 validations() ->
 validations() ->
     [ {check_ssl_opts, fun check_ssl_opts/1} ].
     [ {check_ssl_opts, fun check_ssl_opts/1} ].
 
 
@@ -79,7 +104,7 @@ max_retries(type) -> non_neg_integer();
 max_retries(default) -> 5;
 max_retries(default) -> 5;
 max_retries(_) -> undefined.
 max_retries(_) -> undefined.
 
 
-retry_interval(type) -> emqx_schema:duration_ms();
+retry_interval(type) -> emqx_schema:duration();
 retry_interval(default) -> "1s";
 retry_interval(default) -> "1s";
 retry_interval(_) -> undefined.
 retry_interval(_) -> undefined.
 
 
@@ -111,7 +136,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
                                          {tcp, []};
                                          {tcp, []};
                                      https ->
                                      https ->
                                          SSLOpts = emqx_plugin_libs_ssl:save_files_return_opts(
                                          SSLOpts = emqx_plugin_libs_ssl:save_files_return_opts(
-                                                    maps:get(ssl_opts, Config), "connectors", InstId),
+                                                    maps:get(ssl, Config), "connectors", InstId),
                                          {tls, SSLOpts}
                                          {tls, SSLOpts}
                                  end,
                                  end,
     NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
     NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
@@ -126,16 +151,32 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
                , {transport, Transport}
                , {transport, Transport}
                , {transport_opts, NTransportOpts}],
                , {transport_opts, NTransportOpts}],
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
-    {ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts),
-    {ok, #{pool_name => PoolName,
-           host => Host,
-           port => Port,
-           base_path => BasePath}}.
+    State = #{
+        pool_name => PoolName,
+        host => Host,
+        port => Port,
+        base_path => BasePath,
+        channels => preproc_channels(InstId, Config)
+    },
+    case ehttpc_sup:start_pool(PoolName, PoolOpts) of
+        {ok, _} -> {ok, State};
+        {error, {already_started, _}} -> {ok, State};
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
 
 on_stop(InstId, #{pool_name := PoolName}) ->
 on_stop(InstId, #{pool_name := PoolName}) ->
     logger:info("stopping http connector: ~p", [InstId]),
     logger:info("stopping http connector: ~p", [InstId]),
     ehttpc_sup:stop_pool(PoolName).
     ehttpc_sup:stop_pool(PoolName).
 
 
+on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) ->
+    case maps:find(ChannelId, Channels) of
+        error -> ?SLOG(error, #{msg => "channel not found", channel_id => ChannelId});
+        {ok, ChannConf} ->
+            #{method := Method, path := Path, body := Body, headers := Headers,
+              request_timeout := Timeout} = proc_channel_conf(ChannConf, Msg),
+            on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
+    end;
 on_query(InstId, {Method, Request}, AfterQuery, State) ->
 on_query(InstId, {Method, Request}, AfterQuery, State) ->
     on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
     on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
 on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
 on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
@@ -169,6 +210,52 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) ->
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+preproc_channels(<<"bridge:", BridgeId/binary>>, Config) ->
+    {BridgeType, BridgeName} = emqx_bridge:parse_bridge_id(BridgeId),
+    maps:fold(fun(ChannName, ChannConf, Acc) ->
+            Acc#{emqx_bridge:channel_id(BridgeType, BridgeName, egress_channels, ChannName) =>
+                 preproc_channel_conf(ChannConf)}
+        end, #{}, maps:get(egress_channels, Config, #{})).
+
+preproc_channel_conf(#{
+        method := Method,
+        path := Path,
+        body := Body,
+        headers := Headers} = Conf) ->
+    Conf#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method))
+         , path => emqx_plugin_libs_rule:preproc_tmpl(Path)
+         , body => emqx_plugin_libs_rule:preproc_tmpl(Body)
+         , headers => preproc_headers(Headers)
+         }.
+
+preproc_headers(Headers) ->
+    maps:fold(fun(K, V, Acc) ->
+            Acc#{emqx_plugin_libs_rule:preproc_tmpl(bin(K)) =>
+                 emqx_plugin_libs_rule:preproc_tmpl(bin(V))}
+        end, #{}, Headers).
+
+proc_channel_conf(#{
+        method := MethodTks,
+        path := PathTks,
+        body := BodyTks,
+        headers := HeadersTks} = Conf, Msg) ->
+    Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
+         , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
+         , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg)
+         , headers => maps:to_list(proc_headers(HeadersTks, Msg))
+         }.
+
+proc_headers(HeaderTks, Msg) ->
+    maps:fold(fun(K, V, Acc) ->
+            Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) =>
+                 emqx_plugin_libs_rule:proc_tmpl(V, Msg)}
+        end, #{}, HeaderTks).
+
+make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
+make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;
+make_method(M) when M == <<"GET">>; M == <<"get">> -> get;
+make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete.
+
 check_ssl_opts(Conf) ->
 check_ssl_opts(Conf) ->
     check_ssl_opts("base_url", Conf).
     check_ssl_opts("base_url", Conf).
 
 
@@ -185,3 +272,10 @@ update_path(BasePath, {Path, Headers}) ->
     {filename:join(BasePath, Path), Headers};
     {filename:join(BasePath, Path), Headers};
 update_path(BasePath, {Path, Headers, Body}) ->
 update_path(BasePath, {Path, Headers, Body}) ->
     {filename:join(BasePath, Path), Headers, Body}.
     {filename:join(BasePath, Path), Headers, Body}.
+
+bin(Bin) when is_binary(Bin) ->
+    Bin;
+bin(Str) when is_list(Str) ->
+    list_to_binary(Str);
+bin(Atom) when is_atom(Atom) ->
+    atom_to_binary(Atom, utf8).

+ 3 - 3
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -121,9 +121,9 @@ on_stop(InstId, #{channels := NameList}) ->
 on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
 on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
         baisc_conf := BasicConf}) ->
         baisc_conf := BasicConf}) ->
     create_channel(Conf, Prefix, BasicConf);
     create_channel(Conf, Prefix, BasicConf);
-on_query(_InstId, {send_to_remote, ChannelName, Msg}, _AfterQuery, _State) ->
-    logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelName, Msg]),
-    emqx_connector_mqtt_worker:send_to_remote(ChannelName, Msg).
+on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) ->
+    logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelId, Msg]),
+    emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg).
 
 
 on_health_check(_InstId, #{channels := NameList} = State) ->
 on_health_check(_InstId, #{channels := NameList} = State) ->
     Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList],
     Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList],

+ 1 - 3
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -240,9 +240,7 @@ handle_output(OutId, Selected, Envs) ->
 
 
 do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) ->
 do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) ->
     ?LOG(debug, "output to bridge: ~p", [ChannelId]),
     ?LOG(debug, "output to bridge: ~p", [ChannelId]),
-    [Type, BridgeName | _] = string:split(ChannelId, ":", all),
-    ResId = emqx_bridge:resource_id(<<Type/binary, ":", BridgeName/binary>>),
-    emqx_resource:query(ResId, {send_to_remote, ChannelId, Selected});
+    emqx_bridge:send_message(ChannelId, Selected);
 do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) ->
 do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) ->
     erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
     erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
 do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)
 do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)