Browse Source

Merge pull request #12165 from JimMoen/EMQX-11498-ocpp-subscriptions

fix(gw_ocpp): handle subscriptions call
JianBo He 2 years atrás
parent
commit
f9f72d75fc
1 changed files with 31 additions and 16 deletions
  1. 31 16
      apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

+ 31 - 16
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -127,6 +127,8 @@
     }
 ).
 
+-define(DEFAULT_OCPP_DN_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}).
+
 -dialyzer(no_match).
 
 %%--------------------------------------------------------------------
@@ -547,6 +549,13 @@ handle_call(kick, _From, Channel) ->
     shutdown(kicked, ok, Channel);
 handle_call(discard, _From, Channel) ->
     shutdown(discarded, ok, Channel);
+handle_call(
+    subscriptions,
+    _From,
+    Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
+) ->
+    Subs = [{dntopic(ClientId, Mountpoint), ?DEFAULT_OCPP_DN_SUBOPTS}],
+    reply({ok, Subs}, Channel);
 handle_call(Req, From, Channel) ->
     ?SLOG(error, #{msg => "unexpected_call", req => Req, from => From}),
     reply(ignored, Channel).
@@ -614,22 +623,6 @@ process_connect(
             {error, Reason}
     end.
 
-ensure_subscribe_dn_topics(
-    Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo}
-) ->
-    SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1},
-    Topic0 = proc_tmpl(
-        emqx_ocpp_conf:dntopic(),
-        #{
-            clientid => ClientId,
-            cid => ClientId
-        }
-    ),
-    Topic = emqx_mountpoint:mount(Mountpoint, Topic0),
-    ok = emqx_broker:subscribe(Topic, ClientId, SubOpts),
-    ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
-    Channel.
-
 %%--------------------------------------------------------------------
 %% Handle timeout
 %%--------------------------------------------------------------------
@@ -853,6 +846,28 @@ reset_keepalive(Interval, Channel = #channel{conninfo = ConnInfo, timers = Timer
 heartbeat_checking_times_backoff() ->
     max(0, emqx_ocpp_conf:heartbeat_checking_times_backoff() - 1).
 
+%%--------------------------------------------------------------------
+%% Ensure Subscriptions
+
+ensure_subscribe_dn_topics(
+    Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo}
+) ->
+    SubOpts = ?DEFAULT_OCPP_DN_SUBOPTS,
+    Topic = dntopic(ClientId, Mountpoint),
+    ok = emqx_broker:subscribe(Topic, ClientId, SubOpts),
+    ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
+    Channel.
+
+dntopic(ClientId, Mountpoint) ->
+    Topic0 = proc_tmpl(
+        emqx_ocpp_conf:dntopic(),
+        #{
+            clientid => ClientId,
+            cid => ClientId
+        }
+    ),
+    emqx_mountpoint:mount(Mountpoint, Topic0).
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------