Ver código fonte

fix(jt808): avoid funcation_clause error

Jianbo He 1 ano atrás
pai
commit
a743692d76

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_jt808, [
     {description, "JT/T 808 Gateway"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 25 - 3
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -11,6 +11,8 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 %% behaviour callbacks
 -export([
     info/1,
@@ -397,13 +399,25 @@ msgs2frame(Messages, Channel) ->
     lists:filtermap(
         fun(#message{payload = Payload}) ->
             case emqx_utils_json:safe_decode(Payload, [return_maps]) of
-                {ok, Map} ->
-                    MsgId = msgid(Map),
+                {ok, Map = #{<<"header">> := #{<<"msg_id">> := MsgId}}} ->
                     NewHeader = build_frame_header(MsgId, Channel),
                     Frame = maps:put(<<"header">>, NewHeader, Map),
                     {true, Frame};
+                {ok, _} ->
+                    tp(
+                        error,
+                        invalid_dl_message,
+                        #{reasons => "missing_msg_id", payload => Payload},
+                        Channel
+                    ),
+                    false;
                 {error, Reason} ->
-                    log(error, #{msg => "json_decode_error", reason => Reason}, Channel),
+                    tp(
+                        error,
+                        invalid_dl_message,
+                        #{reason => "invalid_json", payload => Payload},
+                        Channel
+                    ),
                     false
             end
         end,
@@ -1047,6 +1061,14 @@ metrics_inc(Name, #channel{ctx = Ctx}, Oct) ->
 log(Level, Meta, #channel{clientinfo = #{clientid := ClientId, username := Username}} = _Channel) ->
     ?SLOG(Level, Meta#{clientid => ClientId, username => Username}).
 
+tp(
+    Level,
+    Key,
+    Meta,
+    #channel{clientinfo = #{clientid := ClientId, username := Username}} = _Channel
+) ->
+    ?tp(Level, Key, Meta#{clientid => ClientId, username => Username}).
+
 reply(Reply, Channel) ->
     {reply, Reply, Channel}.
 

+ 27 - 0
apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl

@@ -12,6 +12,8 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 -define(FRM_FLAG, 16#7e:8).
 -define(RESERVE, 0).
 -define(NO_FRAGMENT, 0).
@@ -106,6 +108,7 @@ init_per_testcase(Case, Config) when
     Apps = boot_apps(Case, <<>>, Config),
     [{suite_apps, Apps} | Config];
 init_per_testcase(Case, Config) ->
+    snabbkaffe:start_trace(),
     Apps = boot_apps(Case, ?CONF_DEFAULT, Config),
     [{suite_apps, Apps} | Config].
 
@@ -116,6 +119,7 @@ end_per_testcase(_Case, Config) ->
         exit:noproc ->
             ok
     end,
+    snabbkaffe:stop(),
     ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
@@ -2710,6 +2714,29 @@ t_case34_dl_0x8805_single_mm_data_ctrl(_Config) ->
 
     ok = gen_tcp:close(Socket).
 
+t_case_dl_invalid_msg(_Config) ->
+    {ok, Socket} = gen_tcp:connect({127, 0, 0, 1}, ?PORT, [binary, {active, false}]),
+    {ok, AuthCode} = client_regi_procedure(Socket),
+    ok = client_auth_procedure(Socket, AuthCode),
+    PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
+
+    DlCommand = #{
+        %% missing msg_id
+        <<"header">> => #{},
+        <<"body">> => #{
+            <<"id">> => 30,
+            <<"flag">> => 40
+        }
+    },
+
+    emqx:publish(emqx_message:make(?JT808_DN_TOPIC, emqx_utils_json:encode(DlCommand))),
+    ?block_until(#{?snk_kind := invalid_dl_message}),
+
+    emqx:publish(emqx_message:make(?JT808_DN_TOPIC, <<"invliad_json_str">>)),
+    ?block_until(#{?snk_kind := invalid_dl_message}),
+
+    ok = gen_tcp:close(Socket).
+
 t_case_invalid_auth_reg_server(_Config) ->
     {ok, Socket} = gen_tcp:connect({127, 0, 0, 1}, ?PORT, [binary, {active, false}]),
     %