Просмотр исходного кода

Merge pull request #12163 from JimMoen/EMQX-11525-gw-jt808-cannot-list-client

fix(gw_jt808): cannot list client
JimMoen 2 лет назад
Родитель
Сommit
bce35b2dd8

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -416,7 +416,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}
     log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel),
     {ok, Channel};
 handle_info(Info, Channel) ->
-    log(error, #{msg => "unexpected_info}", info => Info}, Channel),
+    log(error, #{msg => "unexpected_info", info => Info}, Channel),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------

+ 100 - 38
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -48,7 +48,7 @@
     %% AuthCode
     authcode :: undefined | anonymous | binary(),
     %% Keepalive
-    keepalive,
+    keepalive :: maybe(emqx_keepalive:keepalive()),
     %% Msg SN
     msg_sn,
     %% Down Topic
@@ -85,6 +85,8 @@
 
 -define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]).
 
+-define(DN_TOPIC_SUBOPTS, #{rap => 0, nl => 0, qos => 0, rh => 0}).
+
 -define(RETX_INTERVAL, 8000).
 -define(RETX_MAX_TIME, 5).
 
@@ -115,15 +117,28 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
     ClientId;
 info(clientinfo, #channel{clientinfo = ClientInfo}) ->
     ClientInfo;
-info(session, _) ->
-    #{};
+info(session, #channel{session = Session}) ->
+    Session;
 info(conn_state, #channel{conn_state = ConnState}) ->
     ConnState;
 info(authcode, #channel{authcode = AuthCode}) ->
     AuthCode.
 
-stats(_Channel) ->
-    [].
+-spec stats(channel()) -> emqx_types:stats().
+stats(#channel{inflight = Inflight, mqueue = Queue}) ->
+    %% XXX: A fake stats for managed by emqx_management
+    [
+        {subscriptions_cnt, 1},
+        {subscriptions_max, 1},
+        {inflight_cnt, emqx_inflight:size(Inflight)},
+        {inflight_max, emqx_inflight:max_size(Inflight)},
+        {mqueue_len, queue:len(Queue)},
+        {mqueue_max, 0},
+        {mqueue_dropped, 0},
+        {next_pkt_id, 0},
+        {awaiting_rel_cnt, 0},
+        {awaiting_rel_max, 0}
+    ].
 
 %%--------------------------------------------------------------------
 %% Init the Channel
@@ -173,7 +188,7 @@ init(
         conn_state = idle,
         timers = #{},
         authcode = undefined,
-        keepalive = maps:get(keepalive, Options, ?DEFAULT_KEEPALIVE),
+        keepalive = undefined,
         msg_sn = 0,
         % TODO: init rsa_key from user input
         dn_topic = maps:get(dn_topic, ProtoConf, ?DEFAULT_DN_TOPIC),
@@ -228,9 +243,8 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
     #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
     case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of
         {ok, Authcode} ->
-            Channel = enrich_clientinfo(
-                Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode})
-            ),
+            {ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}),
+            {ok, Channel} = enrich_clientinfo(Frame, Conninfo),
             handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel);
         {error, Reason} ->
             ?SLOG(error, #{msg => "register_failed", reason => Reason}),
@@ -243,25 +257,26 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
     end;
 do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) ->
     #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
-    Channel =
-        #channel{clientinfo = #{clientid := ClientId}} =
-        enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)),
-    authack(
-        case authenticate(Frame, Channel0) of
-            true ->
-                NChannel = prepare_adapter_topic(ensure_connected(Channel)),
-                emqx_logger:set_metadata_clientid(ClientId),
-                %% Auto subscribe downlink topics
-                autosubcribe(NChannel),
-                _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
-                %% 0: Successful
-                {0, MsgSn, NChannel};
-            false ->
-                ?SLOG(error, #{msg => "authenticated_failed"}),
-                %% 1: Failure
-                {1, MsgSn, Channel}
-        end
-    );
+    case
+        emqx_utils:pipeline(
+            [
+                fun enrich_clientinfo/2,
+                fun enrich_conninfo/2,
+                fun set_log_meta/2
+            ],
+            Frame,
+            Channel0
+        )
+    of
+        {ok, _NFrame, Channel} ->
+            case authenticate(Frame, Channel) of
+                true ->
+                    NChannel = process_connect(Frame, ensure_connected(Channel)),
+                    authack({0, MsgSn, NChannel});
+                false ->
+                    authack({1, MsgSn, Channel})
+            end
+    end;
 do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) ->
     handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel);
 do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) ->
@@ -428,6 +443,8 @@ handle_call(kick, _From, Channel) ->
     disconnect_and_shutdown(kicked, ok, Channel1);
 handle_call(discard, _From, Channel) ->
     disconnect_and_shutdown(discarded, ok, Channel);
+handle_call(subscriptions, _From, Channel = #channel{dn_topic = DnTopic}) ->
+    reply({ok, [{DnTopic, ?DN_TOPIC_SUBOPTS}]}, Channel);
 handle_call(Req, _From, Channel) ->
     log(error, #{msg => "unexpected_call", call => Req}, Channel),
     reply(ignored, Channel).
@@ -464,6 +481,9 @@ handle_info(
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
     log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel),
     {ok, Channel};
+handle_info({keepalive, start, Interval}, Channel) ->
+    NChannel = Channel#channel{keepalive = emqx_keepalive:init(Interval)},
+    {ok, ensure_timer(alive_timer, NChannel)};
 handle_info(Info, Channel) ->
     log(error, #{msg => "unexpected_info", info => Info}, Channel),
     {ok, Channel}.
@@ -615,6 +635,46 @@ maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
     Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
     ClientInfo#{mountpoint := Mountpoint1}.
 
+process_connect(
+    _Frame,
+    Channel = #channel{
+        ctx = Ctx,
+        conninfo = ConnInfo,
+        clientinfo = ClientInfo = #{clientid := ClientId}
+    }
+) ->
+    SessFun = fun(_, _) -> #{} end,
+    case
+        emqx_gateway_ctx:open_session(
+            Ctx,
+            true,
+            ClientInfo,
+            ConnInfo,
+            SessFun
+        )
+    of
+        {ok, #{session := Session}} ->
+            NChannel = Channel#channel{session = Session},
+            %% Auto subscribe downlink topics
+            ok = autosubcribe(NChannel),
+            _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
+            _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]),
+            _ = emqx_gateway_ctx:insert_channel_info(
+                Ctx, ClientId, info(NChannel), stats(NChannel)
+            ),
+            NChannel;
+        {error, Reason} ->
+            log(
+                error,
+                #{
+                    msg => "failed_to_open_session",
+                    reason => Reason
+                },
+                Channel
+            ),
+            shutdown(Reason, Channel)
+    end.
+
 ensure_connected(
     Channel = #channel{
         ctx = Ctx,
@@ -624,10 +684,7 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
-        conninfo = NConnInfo,
-        conn_state = connected
-    }.
+    prepare_adapter_topic(Channel#channel{conninfo = NConnInfo, conn_state = connected}).
 
 %% Ensure disconnected
 ensure_disconnected(
@@ -836,7 +893,7 @@ enrich_conninfo(
         receive_maximum => 0,
         expiry_interval => 0
     },
-    Channel#channel{conninfo = NConnInfo}.
+    {ok, Channel#channel{conninfo = NConnInfo}}.
 
 %% Register
 enrich_clientinfo(
@@ -855,7 +912,7 @@ enrich_clientinfo(
         manufacturer => Manu,
         terminal_id => DevId
     }),
-    Channel#channel{clientinfo = NClientInfo};
+    {ok, Channel#channel{clientinfo = NClientInfo}};
 %% Auth
 enrich_clientinfo(
     #{<<"header">> := #{<<"phone">> := Phone}},
@@ -865,7 +922,11 @@ enrich_clientinfo(
         phone => Phone,
         clientid => Phone
     },
-    Channel#channel{clientinfo = NClientInfo}.
+    {ok, Channel#channel{clientinfo = NClientInfo}}.
+
+set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) ->
+    emqx_logger:set_metadata_clientid(ClientId),
+    ok.
 
 prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) ->
     Channel#channel{
@@ -905,9 +966,10 @@ autosubcribe(#channel{
             #{clientid := ClientId},
     dn_topic = Topic
 }) ->
-    SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0},
-    emqx:subscribe(Topic, ClientId, SubOpts),
-    ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]).
+    _ = emqx_broker:subscribe(Topic, ClientId, ?DN_TOPIC_SUBOPTS),
+    ok = emqx_hooks:run('session.subscribed', [
+        ClientInfo, Topic, ?DN_TOPIC_SUBOPTS#{is_new => true}
+    ]).
 
 start_keepalive(Secs, _Channel) when Secs > 0 ->
     self() ! {keepalive, start, round(Secs) * 1000}.

+ 8 - 8
apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl

@@ -324,6 +324,14 @@ location_report_28bytes() ->
 binary_to_hex_string(Data) ->
     lists:flatten([io_lib:format("~2.16.0B ", [X]) || <<X:8>> <= Data]).
 
+receive_msg() ->
+    receive
+        {deliver, Topic, #message{payload = Payload}} ->
+            {Topic, Payload}
+    after 100 ->
+        {error, timeout}
+    end.
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%% test cases %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 t_case00_register(_) ->
@@ -2677,11 +2685,3 @@ t_case34_dl_0x8805_single_mm_data_ctrl(_Config) ->
     {error, timeout} = gen_tcp:recv(Socket, 0, 500),
 
     ok = gen_tcp:close(Socket).
-
-receive_msg() ->
-    receive
-        {deliver, Topic, #message{payload = Payload}} ->
-            {Topic, Payload}
-    after 100 ->
-        {error, timeout}
-    end.