|
|
@@ -201,7 +201,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|
|
#{
|
|
|
<<"type">> := ?TYPE_MQTT,
|
|
|
<<"name">> := ?BRIDGE_NAME_INGRESS
|
|
|
- } = jsx:decode(Bridge),
|
|
|
+ } = emqx_utils_json:decode(Bridge),
|
|
|
|
|
|
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
|
|
|
|
|
@@ -313,7 +313,7 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
|
|
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
|
|
%% we should receive a message on the local broker, with specified topic
|
|
|
Msg = assert_mqtt_msg_received(LocalTopic),
|
|
|
- ?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)),
|
|
|
+ ?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)),
|
|
|
|
|
|
%% verify the metrics of the bridge
|
|
|
?assertMetrics(
|
|
|
@@ -402,7 +402,7 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
|
|
Msg = assert_mqtt_msg_received(RemoteTopic),
|
|
|
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
|
|
?assertMatch(<<ResourceID:(byte_size(ResourceID))/binary, _/binary>>, Msg#message.from),
|
|
|
- ?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)),
|
|
|
+ ?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)),
|
|
|
|
|
|
%% verify the metrics of the bridge
|
|
|
?retry(
|
|
|
@@ -545,7 +545,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|
|
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
|
|
|
}
|
|
|
),
|
|
|
- #{<<"id">> := RuleId} = jsx:decode(Rule),
|
|
|
+ #{<<"id">> := RuleId} = emqx_utils_json:decode(Rule),
|
|
|
|
|
|
%% we now test if the bridge works as expected
|
|
|
|
|
|
@@ -562,7 +562,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|
|
%% and also the rule should be matched, with matched + 1:
|
|
|
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
|
|
{ok, 200, Metrics} = request(get, uri(["rules", RuleId, "metrics"]), []),
|
|
|
- ?assertMatch(#{<<"id">> := RuleId}, jsx:decode(Rule1)),
|
|
|
+ ?assertMatch(#{<<"id">> := RuleId}, emqx_utils_json:decode(Rule1)),
|
|
|
?assertMatch(
|
|
|
#{
|
|
|
<<"metrics">> := #{
|
|
|
@@ -581,7 +581,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|
|
<<"actions.failed.unknown">> := 0
|
|
|
}
|
|
|
},
|
|
|
- jsx:decode(Metrics)
|
|
|
+ emqx_utils_json:decode(Metrics)
|
|
|
),
|
|
|
|
|
|
%% we also check if the actions of the rule is triggered
|
|
|
@@ -630,7 +630,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|
|
<<"sql">> => <<"SELECT * from \"t/1\"">>
|
|
|
}
|
|
|
),
|
|
|
- #{<<"id">> := RuleId} = jsx:decode(Rule),
|
|
|
+ #{<<"id">> := RuleId} = emqx_utils_json:decode(Rule),
|
|
|
|
|
|
%% we now test if the bridge works as expected
|
|
|
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
|
|
@@ -653,7 +653,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|
|
timer:sleep(100),
|
|
|
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
|
|
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
|
|
- ?assertMatch(#{<<"id">> := RuleId, <<"name">> := _}, jsx:decode(Rule1)),
|
|
|
+ ?assertMatch(#{<<"id">> := RuleId, <<"name">> := _}, emqx_utils_json:decode(Rule1)),
|
|
|
{ok, 200, Metrics} = request(get, uri(["rules", RuleId, "metrics"]), []),
|
|
|
?assertMatch(
|
|
|
#{
|
|
|
@@ -673,7 +673,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|
|
<<"actions.failed.unknown">> := 0
|
|
|
}
|
|
|
},
|
|
|
- jsx:decode(Metrics)
|
|
|
+ emqx_utils_json:decode(Metrics)
|
|
|
),
|
|
|
|
|
|
%% we should receive a message on the "remote" broker, with specified topic
|
|
|
@@ -911,17 +911,17 @@ create_bridge(Config = #{<<"type">> := Type, <<"name">> := Name}) ->
|
|
|
<<"type">> := Type,
|
|
|
<<"name">> := Name
|
|
|
},
|
|
|
- jsx:decode(Bridge)
|
|
|
+ emqx_utils_json:decode(Bridge)
|
|
|
),
|
|
|
emqx_bridge_resource:bridge_id(Type, Name).
|
|
|
|
|
|
request_bridge(BridgeID) ->
|
|
|
{ok, 200, Bridge} = request(get, uri(["bridges", BridgeID]), []),
|
|
|
- jsx:decode(Bridge).
|
|
|
+ emqx_utils_json:decode(Bridge).
|
|
|
|
|
|
request_bridge_metrics(BridgeID) ->
|
|
|
{ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
|
|
|
- jsx:decode(BridgeMetrics).
|
|
|
+ emqx_utils_json:decode(BridgeMetrics).
|
|
|
|
|
|
request(Method, Url, Body) ->
|
|
|
request(<<"connector_admin">>, Method, Url, Body).
|