Kaynağa Gözat

Merge pull request #11169 from kjellwinblad/kjell/improved_sparkplugb/EMQX-10351

feat: add Sparkplug encode and decode functions to the rule engine
Kjell Winblad 2 yıl önce
ebeveyn
işleme
33cb29efdb

+ 24 - 0
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -84,6 +84,12 @@
     import_config/1
     import_config/1
 ]).
 ]).
 
 
+%% For setting and getting extra rule engine SQL functions module
+-export([
+    extra_functions_module/0,
+    set_extra_functions_module/1
+]).
+
 -define(RULE_ENGINE, ?MODULE).
 -define(RULE_ENGINE, ?MODULE).
 
 
 -define(T_CALL, infinity).
 -define(T_CALL, infinity).
@@ -542,3 +548,21 @@ get_egress_bridges(Actions) ->
         emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
         emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
      || {bridge, BridgeType, BridgeName, _ResId} <- Actions
      || {bridge, BridgeType, BridgeName, _ResId} <- Actions
     ].
     ].
+
+%% For allowing an external application to add extra "built-in" functions to the
+%% rule engine SQL like language. The module set by
+%% set_extra_functions_module/1 should export a function called
+%% handle_rule_function with two parameters (the first being an atom for the
+%% the function name and the second a list of arguments). The function should
+%% should return the result or {error, no_match_for_function} if it cannot
+%% handle the function. See '$handle_undefined_function' in the emqx_rule_funcs
+%% module. See also callback function declaration in emqx_rule_funcs.erl.
+
+-spec extra_functions_module() -> module() | undefined.
+extra_functions_module() ->
+    persistent_term:get({?MODULE, extra_functions}, undefined).
+
+-spec set_extra_functions_module(module()) -> ok.
+set_extra_functions_module(Mod) ->
+    persistent_term:put({?MODULE, extra_functions}, Mod),
+    ok.

+ 23 - 27
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -232,6 +232,10 @@
     timezone_to_offset_seconds/1
     timezone_to_offset_seconds/1
 ]).
 ]).
 
 
+%% See extra_functions_module/0 and set_extra_functions_module/1 in the
+%% emqx_rule_engine module
+-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
+
 %% MongoDB specific date functions. These functions return a date tuple. The
 %% MongoDB specific date functions. These functions return a date tuple. The
 %% MongoDB bridge converts such date tuples to a MongoDB date type. The
 %% MongoDB bridge converts such date tuples to a MongoDB date type. The
 %% following functions are therefore only useful for rules with at least one
 %% following functions are therefore only useful for rules with at least one
@@ -1141,35 +1145,27 @@ timezone_to_second(TimeZone) ->
 timezone_to_offset_seconds(TimeZone) ->
 timezone_to_offset_seconds(TimeZone) ->
     emqx_calendar:offset_second(TimeZone).
     emqx_calendar:offset_second(TimeZone).
 
 
-%% @doc This is for sql funcs that should be handled in the specific modules.
-%% Here the emqx_rule_funcs module acts as a proxy, forwarding
-%% the function handling to the worker module.
-%% @end
--if(?EMQX_RELEASE_EDITION == ee).
-%% EE
-'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs);
-'$handle_undefined_function'(schema_decode, Args) ->
-    error({args_count_error, {schema_decode, Args}});
-'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
-    %% encode outputs iolists, but when the rule actions process those
-    %% it might wrongly encode them as JSON lists, so we force them to
-    %% binaries here.
-    IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs),
-    iolist_to_binary(IOList);
-'$handle_undefined_function'(schema_encode, Args) ->
-    error({args_count_error, {schema_encode, Args}});
-'$handle_undefined_function'(sprintf, [Format | Args]) ->
-    erlang:apply(fun sprintf_s/2, [Format, Args]);
-'$handle_undefined_function'(Fun, Args) ->
-    error({sql_function_not_supported, function_literal(Fun, Args)}).
--else.
-%% CE
 '$handle_undefined_function'(sprintf, [Format | Args]) ->
 '$handle_undefined_function'(sprintf, [Format | Args]) ->
     erlang:apply(fun sprintf_s/2, [Format, Args]);
     erlang:apply(fun sprintf_s/2, [Format, Args]);
-'$handle_undefined_function'(Fun, Args) ->
-    error({sql_function_not_supported, function_literal(Fun, Args)}).
--endif.
+%% This is for functions that should be handled in another module
+%% (currently this module is emqx_ee_schema_registry_serde in the case of EE but
+%% could be changed to another module in the future).
+'$handle_undefined_function'(FunctionName, Args) ->
+    case emqx_rule_engine:extra_functions_module() of
+        undefined ->
+            throw_sql_function_not_supported(FunctionName, Args);
+        Mod ->
+            case Mod:handle_rule_function(FunctionName, Args) of
+                {error, no_match_for_function} ->
+                    throw_sql_function_not_supported(FunctionName, Args);
+                Result ->
+                    Result
+            end
+    end.
+
+-spec throw_sql_function_not_supported(atom(), list()) -> no_return().
+throw_sql_function_not_supported(FunctionName, Args) ->
+    error({sql_function_not_supported, function_literal(FunctionName, Args)}).
 
 
 map_path(Key) ->
 map_path(Key) ->
     {path, [{key, P} || P <- string:split(Key, ".", all)]}.
     {path, [{key, P} || P <- string:split(Key, ".", all)]}.

+ 26 - 0
changes/ee/feat-11169.en.md

@@ -0,0 +1,26 @@
+Two new built-in functions `sparkplug_decode` and `sparkplug_encode` have been added to the rule engine SQL like language. These functions are used to decode and encode Sparkplug B messages. The functions are used as follows:
+
+Decode a Sparkplug B message:
+
+```sql
+select
+  sparkplug_decode(payload) as decoded
+from t
+
+```
+
+Encode a Sparkplug B message:
+
+```sql
+select
+  sparkplug_encode(json_decode(payload)) as encoded
+from t
+```
+
+One can also specify a Sparkplug B message type by specifying it as the second argument to the `sparkplug_decode` and `sparkplug_encode` functions. The default is `Payload`:
+
+```sql
+select
+  sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded
+from t
+```

+ 3 - 0
lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl

@@ -12,6 +12,9 @@
 -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
 -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
 -define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).
 -define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).
 
 
+-define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
+    <<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">>
+).
 -type schema_name() :: binary().
 -type schema_name() :: binary().
 -type schema_source() :: binary().
 -type schema_source() :: binary().
 
 

+ 277 - 0
lib-ee/emqx_ee_schema_registry/priv/LICENSE

@@ -0,0 +1,277 @@
+Eclipse Public License - v 2.0
+
+    THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+    PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
+    OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+1. DEFINITIONS
+
+"Contribution" means:
+
+  a) in the case of the initial Contributor, the initial content
+     Distributed under this Agreement, and
+
+  b) in the case of each subsequent Contributor:
+     i) changes to the Program, and
+     ii) additions to the Program;
+  where such changes and/or additions to the Program originate from
+  and are Distributed by that particular Contributor. A Contribution
+  "originates" from a Contributor if it was added to the Program by
+  such Contributor itself or anyone acting on such Contributor's behalf.
+  Contributions do not include changes or additions to the Program that
+  are not Modified Works.
+
+"Contributor" means any person or entity that Distributes the Program.
+
+"Licensed Patents" mean patent claims licensable by a Contributor which
+are necessarily infringed by the use or sale of its Contribution alone
+or when combined with the Program.
+
+"Program" means the Contributions Distributed in accordance with this
+Agreement.
+
+"Recipient" means anyone who receives the Program under this Agreement
+or any Secondary License (as applicable), including Contributors.
+
+"Derivative Works" shall mean any work, whether in Source Code or other
+form, that is based on (or derived from) the Program and for which the
+editorial revisions, annotations, elaborations, or other modifications
+represent, as a whole, an original work of authorship.
+
+"Modified Works" shall mean any work in Source Code or other form that
+results from an addition to, deletion from, or modification of the
+contents of the Program, including, for purposes of clarity any new file
+in Source Code form that contains any contents of the Program. Modified
+Works shall not include works that contain only declarations,
+interfaces, types, classes, structures, or files of the Program solely
+in each case in order to link to, bind by name, or subclass the Program
+or Modified Works thereof.
+
+"Distribute" means the acts of a) distributing or b) making available
+in any manner that enables the transfer of a copy.
+
+"Source Code" means the form of a Program preferred for making
+modifications, including but not limited to software source code,
+documentation source, and configuration files.
+
+"Secondary License" means either the GNU General Public License,
+Version 2.0, or any later versions of that license, including any
+exceptions or additional permissions as identified by the initial
+Contributor.
+
+2. GRANT OF RIGHTS
+
+  a) Subject to the terms of this Agreement, each Contributor hereby
+  grants Recipient a non-exclusive, worldwide, royalty-free copyright
+  license to reproduce, prepare Derivative Works of, publicly display,
+  publicly perform, Distribute and sublicense the Contribution of such
+  Contributor, if any, and such Derivative Works.
+
+  b) Subject to the terms of this Agreement, each Contributor hereby
+  grants Recipient a non-exclusive, worldwide, royalty-free patent
+  license under Licensed Patents to make, use, sell, offer to sell,
+  import and otherwise transfer the Contribution of such Contributor,
+  if any, in Source Code or other form. This patent license shall
+  apply to the combination of the Contribution and the Program if, at
+  the time the Contribution is added by the Contributor, such addition
+  of the Contribution causes such combination to be covered by the
+  Licensed Patents. The patent license shall not apply to any other
+  combinations which include the Contribution. No hardware per se is
+  licensed hereunder.
+
+  c) Recipient understands that although each Contributor grants the
+  licenses to its Contributions set forth herein, no assurances are
+  provided by any Contributor that the Program does not infringe the
+  patent or other intellectual property rights of any other entity.
+  Each Contributor disclaims any liability to Recipient for claims
+  brought by any other entity based on infringement of intellectual
+  property rights or otherwise. As a condition to exercising the
+  rights and licenses granted hereunder, each Recipient hereby
+  assumes sole responsibility to secure any other intellectual
+  property rights needed, if any. For example, if a third party
+  patent license is required to allow Recipient to Distribute the
+  Program, it is Recipient's responsibility to acquire that license
+  before distributing the Program.
+
+  d) Each Contributor represents that to its knowledge it has
+  sufficient copyright rights in its Contribution, if any, to grant
+  the copyright license set forth in this Agreement.
+
+  e) Notwithstanding the terms of any Secondary License, no
+  Contributor makes additional grants to any Recipient (other than
+  those set forth in this Agreement) as a result of such Recipient's
+  receipt of the Program under the terms of a Secondary License
+  (if permitted under the terms of Section 3).
+
+3. REQUIREMENTS
+
+3.1 If a Contributor Distributes the Program in any form, then:
+
+  a) the Program must also be made available as Source Code, in
+  accordance with section 3.2, and the Contributor must accompany
+  the Program with a statement that the Source Code for the Program
+  is available under this Agreement, and informs Recipients how to
+  obtain it in a reasonable manner on or through a medium customarily
+  used for software exchange; and
+
+  b) the Contributor may Distribute the Program under a license
+  different than this Agreement, provided that such license:
+     i) effectively disclaims on behalf of all other Contributors all
+     warranties and conditions, express and implied, including
+     warranties or conditions of title and non-infringement, and
+     implied warranties or conditions of merchantability and fitness
+     for a particular purpose;
+
+     ii) effectively excludes on behalf of all other Contributors all
+     liability for damages, including direct, indirect, special,
+     incidental and consequential damages, such as lost profits;
+
+     iii) does not attempt to limit or alter the recipients' rights
+     in the Source Code under section 3.2; and
+
+     iv) requires any subsequent distribution of the Program by any
+     party to be under a license that satisfies the requirements
+     of this section 3.
+
+3.2 When the Program is Distributed as Source Code:
+
+  a) it must be made available under this Agreement, or if the
+  Program (i) is combined with other material in a separate file or
+  files made available under a Secondary License, and (ii) the initial
+  Contributor attached to the Source Code the notice described in
+  Exhibit A of this Agreement, then the Program may be made available
+  under the terms of such Secondary Licenses, and
+
+  b) a copy of this Agreement must be included with each copy of
+  the Program.
+
+3.3 Contributors may not remove or alter any copyright, patent,
+trademark, attribution notices, disclaimers of warranty, or limitations
+of liability ("notices") contained within the Program from any copy of
+the Program which they Distribute, provided that Contributors may add
+their own appropriate notices.
+
+4. COMMERCIAL DISTRIBUTION
+
+Commercial distributors of software may accept certain responsibilities
+with respect to end users, business partners and the like. While this
+license is intended to facilitate the commercial use of the Program,
+the Contributor who includes the Program in a commercial product
+offering should do so in a manner which does not create potential
+liability for other Contributors. Therefore, if a Contributor includes
+the Program in a commercial product offering, such Contributor
+("Commercial Contributor") hereby agrees to defend and indemnify every
+other Contributor ("Indemnified Contributor") against any losses,
+damages and costs (collectively "Losses") arising from claims, lawsuits
+and other legal actions brought by a third party against the Indemnified
+Contributor to the extent caused by the acts or omissions of such
+Commercial Contributor in connection with its distribution of the Program
+in a commercial product offering. The obligations in this section do not
+apply to any claims or Losses relating to any actual or alleged
+intellectual property infringement. In order to qualify, an Indemnified
+Contributor must: a) promptly notify the Commercial Contributor in
+writing of such claim, and b) allow the Commercial Contributor to control,
+and cooperate with the Commercial Contributor in, the defense and any
+related settlement negotiations. The Indemnified Contributor may
+participate in any such claim at its own expense.
+
+For example, a Contributor might include the Program in a commercial
+product offering, Product X. That Contributor is then a Commercial
+Contributor. If that Commercial Contributor then makes performance
+claims, or offers warranties related to Product X, those performance
+claims and warranties are such Commercial Contributor's responsibility
+alone. Under this section, the Commercial Contributor would have to
+defend claims against the other Contributors related to those performance
+claims and warranties, and if a court requires any other Contributor to
+pay any damages as a result, the Commercial Contributor must pay
+those damages.
+
+5. NO WARRANTY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
+BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
+IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
+TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
+PURPOSE. Each Recipient is solely responsible for determining the
+appropriateness of using and distributing the Program and assumes all
+risks associated with its exercise of rights under this Agreement,
+including but not limited to the risks and costs of program errors,
+compliance with applicable laws, damage to or loss of data, programs
+or equipment, and unavailability or interruption of operations.
+
+6. DISCLAIMER OF LIABILITY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
+SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
+PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
+EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+7. GENERAL
+
+If any provision of this Agreement is invalid or unenforceable under
+applicable law, it shall not affect the validity or enforceability of
+the remainder of the terms of this Agreement, and without further
+action by the parties hereto, such provision shall be reformed to the
+minimum extent necessary to make such provision valid and enforceable.
+
+If Recipient institutes patent litigation against any entity
+(including a cross-claim or counterclaim in a lawsuit) alleging that the
+Program itself (excluding combinations of the Program with other software
+or hardware) infringes such Recipient's patent(s), then such Recipient's
+rights granted under Section 2(b) shall terminate as of the date such
+litigation is filed.
+
+All Recipient's rights under this Agreement shall terminate if it
+fails to comply with any of the material terms or conditions of this
+Agreement and does not cure such failure in a reasonable period of
+time after becoming aware of such noncompliance. If all Recipient's
+rights under this Agreement terminate, Recipient agrees to cease use
+and distribution of the Program as soon as reasonably practicable.
+However, Recipient's obligations under this Agreement and any licenses
+granted by Recipient relating to the Program shall continue and survive.
+
+Everyone is permitted to copy and distribute copies of this Agreement,
+but in order to avoid inconsistency the Agreement is copyrighted and
+may only be modified in the following manner. The Agreement Steward
+reserves the right to publish new versions (including revisions) of
+this Agreement from time to time. No one other than the Agreement
+Steward has the right to modify this Agreement. The Eclipse Foundation
+is the initial Agreement Steward. The Eclipse Foundation may assign the
+responsibility to serve as the Agreement Steward to a suitable separate
+entity. Each new version of the Agreement will be given a distinguishing
+version number. The Program (including Contributions) may always be
+Distributed subject to the version of the Agreement under which it was
+received. In addition, after a new version of the Agreement is published,
+Contributor may elect to Distribute the Program (including its
+Contributions) under the new version.
+
+Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
+receives no rights or licenses to the intellectual property of any
+Contributor under this Agreement, whether expressly, by implication,
+estoppel or otherwise. All rights in the Program not expressly granted
+under this Agreement are reserved. Nothing in this Agreement is intended
+to be enforceable by any entity that is not a Contributor or Recipient.
+No third-party beneficiary rights are created under this Agreement.
+
+Exhibit A - Form of Secondary Licenses Notice
+
+"This Source Code may also be made available under the following 
+Secondary Licenses when the conditions for such availability set forth 
+in the Eclipse Public License, v. 2.0 are satisfied: {name license(s),
+version(s), and exceptions or additional permissions here}."
+
+  Simply including a copy of this Agreement, including this Exhibit A
+  is not sufficient to license the Source Code under Secondary Licenses.
+
+  If it is not possible or desirable to put the notice in a particular
+  file, then You may include the notice in a location (such as a LICENSE
+  file in a relevant directory) where a recipient would be likely to
+  look for such a notice.
+
+  You may add additional accurate notices of copyright ownership.

+ 229 - 0
lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto

@@ -0,0 +1,229 @@
+// Downloaded from: https://github.com/eclipse/tahu/blob/46f25e79f34234e6145d11108660dfd9133ae50d/sparkplug_b/sparkplug_b.proto
+//
+// License for this file is located in the same directory as this file.
+//
+// * Copyright (c) 2015, 2018 Cirrus Link Solutions and others
+// *
+// * This program and the accompanying materials are made available under the
+// * terms of the Eclipse Public License 2.0 which is available at
+// * http://www.eclipse.org/legal/epl-2.0.
+// *
+// * SPDX-License-Identifier: EPL-2.0
+// *
+// * Contributors:
+// *   Cirrus Link Solutions - initial implementation
+
+//
+// To compile:
+// cd client_libraries/java
+// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
+//
+
+
+syntax = "proto2";
+
+package org.eclipse.tahu.protobuf;
+
+option java_package         = "org.eclipse.tahu.protobuf";
+option java_outer_classname = "SparkplugBProto";
+
+enum DataType {
+    // Indexes of Data Types
+
+    // Unknown placeholder for future expansion.
+    Unknown         = 0;
+
+    // Basic Types
+    Int8            = 1;
+    Int16           = 2;
+    Int32           = 3;
+    Int64           = 4;
+    UInt8           = 5;
+    UInt16          = 6;
+    UInt32          = 7;
+    UInt64          = 8;
+    Float           = 9;
+    Double          = 10;
+    Boolean         = 11;
+    String          = 12;
+    DateTime        = 13;
+    Text            = 14;
+
+    // Additional Metric Types
+    UUID            = 15;
+    DataSet         = 16;
+    Bytes           = 17;
+    File            = 18;
+    Template        = 19;
+
+    // Additional PropertyValue Types
+    PropertySet     = 20;
+    PropertySetList = 21;
+
+    // Array Types
+    Int8Array = 22;
+    Int16Array = 23;
+    Int32Array = 24;
+    Int64Array = 25;
+    UInt8Array = 26;
+    UInt16Array = 27;
+    UInt32Array = 28;
+    UInt64Array = 29;
+    FloatArray = 30;
+    DoubleArray = 31;
+    BooleanArray = 32;
+    StringArray = 33;
+    DateTimeArray = 34;
+}
+
+message Payload {
+
+    message Template {
+
+        message Parameter {
+            optional string name        = 1;
+            optional uint32 type        = 2;
+
+            oneof value {
+                uint32 int_value        = 3;
+                uint64 long_value       = 4;
+                float  float_value      = 5;
+                double double_value     = 6;
+                bool   boolean_value    = 7;
+                string string_value     = 8;
+                ParameterValueExtension extension_value = 9;
+            }
+
+            message ParameterValueExtension {
+                extensions              1 to max;
+            }
+        }
+
+        optional string version         = 1;          // The version of the Template to prevent mismatches
+        repeated Metric metrics         = 2;          // Each metric includes a name, datatype, and optionally a value
+        repeated Parameter parameters   = 3;
+        optional string template_ref    = 4;          // MUST be a reference to a template definition if this is an instance (i.e. the name of the template definition) - MUST be omitted for template definitions
+        optional bool is_definition     = 5;
+        extensions                      6 to max;
+    }
+
+    message DataSet {
+
+        message DataSetValue {
+
+            oneof value {
+                uint32 int_value                        = 1;
+                uint64 long_value                       = 2;
+                float  float_value                      = 3;
+                double double_value                     = 4;
+                bool   boolean_value                    = 5;
+                string string_value                     = 6;
+                DataSetValueExtension extension_value   = 7;
+            }
+
+            message DataSetValueExtension {
+                extensions  1 to max;
+            }
+        }
+
+        message Row {
+            repeated DataSetValue elements  = 1;
+            extensions                      2 to max;   // For third party extensions
+        }
+
+        optional uint64   num_of_columns    = 1;
+        repeated string   columns           = 2;
+        repeated uint32   types             = 3;
+        repeated Row      rows              = 4;
+        extensions                          5 to max;   // For third party extensions
+    }
+
+    message PropertyValue {
+
+        optional uint32     type                    = 1;
+        optional bool       is_null                 = 2;
+
+        oneof value {
+            uint32          int_value               = 3;
+            uint64          long_value              = 4;
+            float           float_value             = 5;
+            double          double_value            = 6;
+            bool            boolean_value           = 7;
+            string          string_value            = 8;
+            PropertySet     propertyset_value       = 9;
+            PropertySetList propertysets_value      = 10;      // List of Property Values
+            PropertyValueExtension extension_value  = 11;
+        }
+
+        message PropertyValueExtension {
+            extensions                             1 to max;
+        }
+    }
+
+    message PropertySet {
+        repeated string        keys     = 1;         // Names of the properties
+        repeated PropertyValue values   = 2;
+        extensions                      3 to max;
+    }
+
+    message PropertySetList {
+        repeated PropertySet propertyset = 1;
+        extensions                       2 to max;
+    }
+
+    message MetaData {
+        // Bytes specific metadata
+        optional bool   is_multi_part   = 1;
+
+        // General metadata
+        optional string content_type    = 2;        // Content/Media type
+        optional uint64 size            = 3;        // File size, String size, Multi-part size, etc
+        optional uint64 seq             = 4;        // Sequence number for multi-part messages
+
+        // File metadata
+        optional string file_name       = 5;        // File name
+        optional string file_type       = 6;        // File type (i.e. xml, json, txt, cpp, etc)
+        optional string md5             = 7;        // md5 of data
+
+        // Catchalls and future expansion
+        optional string description     = 8;        // Could be anything such as json or xml of custom properties
+        extensions                      9 to max;
+    }
+
+    message Metric {
+
+        optional string   name          = 1;        // Metric name - should only be included on birth
+        optional uint64   alias         = 2;        // Metric alias - tied to name on birth and included in all later DATA messages
+        optional uint64   timestamp     = 3;        // Timestamp associated with data acquisition time
+        optional uint32   datatype      = 4;        // DataType of the metric/tag value
+        optional bool     is_historical = 5;        // If this is historical data and should not update real time tag
+        optional bool     is_transient  = 6;        // Tells consuming clients such as MQTT Engine to not store this as a tag
+        optional bool     is_null       = 7;        // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
+        optional MetaData metadata      = 8;        // Metadata for the payload
+        optional PropertySet properties = 9;
+
+        oneof value {
+            uint32   int_value                      = 10;
+            uint64   long_value                     = 11;
+            float    float_value                    = 12;
+            double   double_value                   = 13;
+            bool     boolean_value                  = 14;
+            string   string_value                   = 15;
+            bytes    bytes_value                    = 16;       // Bytes, File
+            DataSet  dataset_value                  = 17;
+            Template template_value                 = 18;
+            MetricValueExtension extension_value    = 19;
+        }
+
+        message MetricValueExtension {
+            extensions  1 to max;
+        }
+    }
+
+    optional uint64   timestamp     = 1;        // Timestamp at message sending time
+    repeated Metric   metrics       = 2;        // Repeated forever - no limit in Google Protobufs
+    optional uint64   seq           = 3;        // Sequence number
+    optional string   uuid          = 4;        // UUID to track message type in terms of schema definitions
+    optional bytes    body          = 5;        // To optionally bypass the whole definition above
+    extensions                      6 to max;   // For third party extensions
+}

+ 1 - 0
lib-ee/emqx_ee_schema_registry/rebar.config

@@ -4,6 +4,7 @@
 {deps, [
 {deps, [
   {emqx, {path, "../../apps/emqx"}},
   {emqx, {path, "../../apps/emqx"}},
   {emqx_utils, {path, "../../apps/emqx_utils"}},
   {emqx_utils, {path, "../../apps/emqx_utils"}},
+  {emqx_rule_engine, {path, "../../apps/emqx_rule_engine"}},
   {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
   {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
   {gpb, "4.19.7"}
   {gpb, "4.19.7"}
 ]}.
 ]}.

+ 4 - 1
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src

@@ -1,8 +1,11 @@
 {application, emqx_ee_schema_registry, [
 {application, emqx_ee_schema_registry, [
     {description, "EMQX Schema Registry"},
     {description, "EMQX Schema Registry"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, [emqx_ee_schema_registry_sup]},
     {registered, [emqx_ee_schema_registry_sup]},
     {mod, {emqx_ee_schema_registry_app, []}},
     {mod, {emqx_ee_schema_registry_app, []}},
+    {included_applications, [
+        emqx_rule_engine
+    ]},
     {applications, [
     {applications, [
         kernel,
         kernel,
         stdlib,
         stdlib,

+ 42 - 4
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl

@@ -232,11 +232,46 @@ create_tables() ->
     ok.
     ok.
 
 
 do_build_serdes(Schemas) ->
 do_build_serdes(Schemas) ->
+    %% We build a special serde for the Sparkplug B payload. This serde is used
+    %% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1.
+    ok = maybe_build_sparkplug_b_serde(),
     %% TODO: use some kind of mutex to make each core build a
     %% TODO: use some kind of mutex to make each core build a
     %% different serde to avoid duplicate work.  Maybe ekka_locker?
     %% different serde to avoid duplicate work.  Maybe ekka_locker?
     maps:foreach(fun do_build_serde/2, Schemas),
     maps:foreach(fun do_build_serde/2, Schemas),
     ?tp(schema_registry_serdes_built, #{}).
     ?tp(schema_registry_serdes_built, #{}).
 
 
+maybe_build_sparkplug_b_serde() ->
+    case get_schema(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) of
+        {error, not_found} ->
+            do_build_serde(
+                ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
+                #{
+                    type => protobuf,
+                    source => get_schema_source(<<"sparkplug_b.proto">>)
+                }
+            );
+        {ok, _} ->
+            ok
+    end.
+
+get_schema_source(Filename) ->
+    {ok, App} = application:get_application(),
+    FilePath =
+        case code:priv_dir(App) of
+            {error, bad_name} ->
+                erlang:error(
+                    {error, <<"Could not find data directory (priv) for Schema Registry">>}
+                );
+            Dir ->
+                filename:join(Dir, Filename)
+        end,
+    case file:read_file(FilePath) of
+        {ok, Content} ->
+            Content;
+        {error, Reason} ->
+            erlang:error({error, Reason})
+    end.
+
 build_serdes(Serdes) ->
 build_serdes(Serdes) ->
     build_serdes(Serdes, []).
     build_serdes(Serdes, []).
 
 
@@ -251,9 +286,10 @@ build_serdes([{Name, Params} | Rest], Acc0) ->
 build_serdes([], _Acc) ->
 build_serdes([], _Acc) ->
     ok.
     ok.
 
 
-do_build_serde(Name0, #{type := Type, source := Source}) ->
+do_build_serde(Name, Serde) when not is_binary(Name) ->
+    do_build_serde(to_bin(Name), Serde);
+do_build_serde(Name, #{type := Type, source := Source}) ->
     try
     try
-        Name = to_bin(Name0),
         {Serializer, Deserializer, Destructor} =
         {Serializer, Deserializer, Destructor} =
             emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
             emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
         Serde = #serde{
         Serde = #serde{
@@ -270,7 +306,7 @@ do_build_serde(Name0, #{type := Type, source := Source}) ->
                 error,
                 error,
                 #{
                 #{
                     msg => "error_building_serde",
                     msg => "error_building_serde",
-                    name => Name0,
+                    name => Name,
                     type => Type,
                     type => Type,
                     kind => Kind,
                     kind => Kind,
                     error => Error,
                     error => Error,
@@ -280,11 +316,13 @@ do_build_serde(Name0, #{type := Type, source := Source}) ->
             {error, Error}
             {error, Error}
     end.
     end.
 
 
+ensure_serde_absent(Name) when not is_binary(Name) ->
+    ensure_serde_absent(to_bin(Name));
 ensure_serde_absent(Name) ->
 ensure_serde_absent(Name) ->
     case get_serde(Name) of
     case get_serde(Name) of
         {ok, #{destructor := Destructor}} ->
         {ok, #{destructor := Destructor}} ->
             Destructor(),
             Destructor(),
-            ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name));
+            ok = mria:dirty_delete(?SERDE_TAB, Name);
         {error, not_found} ->
         {error, not_found} ->
             ok
             ok
     end.
     end.

+ 3 - 0
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl

@@ -10,6 +10,9 @@
 -export([start/2, stop/1]).
 -export([start/2, stop/1]).
 
 
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
+    %% Register rule engine extra functions module so that we can handle decode
+    %% and encode functions called from the rule engine SQL like language
+    ok = emqx_rule_engine:set_extra_functions_module(emqx_ee_schema_registry_serde),
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
     %% HTTP API handler
     %% HTTP API handler
     emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
     emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),

+ 11 - 1
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl

@@ -42,7 +42,8 @@ fields(?CONF_KEY_ROOT) ->
                 ),
                 ),
                 #{
                 #{
                     default => #{},
                     default => #{},
-                    desc => ?DESC("schema_registry_schemas")
+                    desc => ?DESC("schema_registry_schemas"),
+                    validator => fun validate_name/1
                 }
                 }
             )}
             )}
     ];
     ];
@@ -89,6 +90,15 @@ union_member_selector_get_api(all_union_members) ->
 union_member_selector_get_api({value, V}) ->
 union_member_selector_get_api({value, V}) ->
     refs_get_api(V).
     refs_get_api(V).
 
 
+validate_name(NameSchemaMap) ->
+    case maps:is_key(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, NameSchemaMap) of
+        true ->
+            {error,
+                <<"Illegal schema name ", ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME/binary>>};
+        false ->
+            ok
+    end.
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% `minirest_trails' "APIs"
 %% `minirest_trails' "APIs"
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------

+ 49 - 10
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 -module(emqx_ee_schema_registry_serde).
 -module(emqx_ee_schema_registry_serde).
 
 
+-behaviour(emqx_rule_funcs).
+
 -include("emqx_ee_schema_registry.hrl").
 -include("emqx_ee_schema_registry.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -15,13 +17,50 @@
     decode/3,
     decode/3,
     encode/2,
     encode/2,
     encode/3,
     encode/3,
-    make_serde/3
+    make_serde/3,
+    handle_rule_function/2
 ]).
 ]).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% API
 %% API
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
+-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
+handle_rule_function(sparkplug_decode, [Data]) ->
+    handle_rule_function(
+        schema_decode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>]
+    );
+handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
+    handle_rule_function(
+        schema_decode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
+    );
+handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
+    decode(SchemaId, Data, MoreArgs);
+handle_rule_function(schema_decode, Args) ->
+    error({args_count_error, {schema_decode, Args}});
+handle_rule_function(sparkplug_encode, [Term]) ->
+    handle_rule_function(
+        schema_encode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>]
+    );
+handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
+    handle_rule_function(
+        schema_encode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
+    );
+handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
+    %% encode outputs iolists, but when the rule actions process those
+    %% it might wrongly encode them as JSON lists, so we force them to
+    %% binaries here.
+    IOList = encode(SchemaId, Term, MoreArgs),
+    iolist_to_binary(IOList);
+handle_rule_function(schema_encode, Args) ->
+    error({args_count_error, {schema_encode, Args}});
+handle_rule_function(_, _) ->
+    {error, no_match_for_function}.
+
 -spec decode(schema_name(), encoded_data()) -> decoded_data().
 -spec decode(schema_name(), encoded_data()) -> decoded_data().
 decode(SerdeName, RawData) ->
 decode(SerdeName, RawData) ->
     decode(SerdeName, RawData, []).
     decode(SerdeName, RawData, []).
@@ -96,7 +135,7 @@ inject_avro_name(Name, Source0) ->
 -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
 -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
 make_protobuf_serde_mod(Name, Source) ->
 make_protobuf_serde_mod(Name, Source) ->
     {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
     {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
-    case lazy_generate_protobuf_code(SerdeMod0, Source) of
+    case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of
         {ok, SerdeMod, ModBinary} ->
         {ok, SerdeMod, ModBinary} ->
             load_code(SerdeMod, SerdeModFileName, ModBinary),
             load_code(SerdeMod, SerdeModFileName, ModBinary),
             SerdeMod;
             SerdeMod;
@@ -121,30 +160,30 @@ protobuf_serde_mod_name(Name) ->
     SerdeModFileName = SerdeModName ++ ".memory",
     SerdeModFileName = SerdeModName ++ ".memory",
     {SerdeMod, SerdeModFileName}.
     {SerdeMod, SerdeModFileName}.
 
 
--spec lazy_generate_protobuf_code(module(), schema_source()) ->
+-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) ->
     {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
     {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
-lazy_generate_protobuf_code(SerdeMod0, Source) ->
+lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
     %% We run this inside a transaction with locks to avoid running
     %% We run this inside a transaction with locks to avoid running
     %% the compile on all nodes; only one will get the lock, compile
     %% the compile on all nodes; only one will get the lock, compile
     %% the schema, and other nodes will simply read the final result.
     %% the schema, and other nodes will simply read the final result.
     {atomic, Res} = mria:transaction(
     {atomic, Res} = mria:transaction(
         ?SCHEMA_REGISTRY_SHARD,
         ?SCHEMA_REGISTRY_SHARD,
-        fun lazy_generate_protobuf_code_trans/2,
-        [SerdeMod0, Source]
+        fun lazy_generate_protobuf_code_trans/3,
+        [Name, SerdeMod0, Source]
     ),
     ),
     Res.
     Res.
 
 
--spec lazy_generate_protobuf_code_trans(module(), schema_source()) ->
+-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) ->
     {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
     {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
-lazy_generate_protobuf_code_trans(SerdeMod0, Source) ->
+lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
     Fingerprint = erlang:md5(Source),
     Fingerprint = erlang:md5(Source),
     _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
     _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
     case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
     case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
         [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
         [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
-            ?tp(schema_registry_protobuf_cache_hit, #{}),
+            ?tp(schema_registry_protobuf_cache_hit, #{name => Name}),
             {ok, SerdeMod, ModBinary};
             {ok, SerdeMod, ModBinary};
         [] ->
         [] ->
-            ?tp(schema_registry_protobuf_cache_miss, #{}),
+            ?tp(schema_registry_protobuf_cache_miss, #{name => Name}),
             case generate_protobuf_code(SerdeMod0, Source) of
             case generate_protobuf_code(SerdeMod0, Source) of
                 {ok, SerdeMod, ModBinary} ->
                 {ok, SerdeMod, ModBinary} ->
                     CacheEntry = #protobuf_cache{
                     CacheEntry = #protobuf_cache{

+ 123 - 5
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -21,13 +21,16 @@
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
 all() ->
 all() ->
-    [{group, avro}, {group, protobuf}].
+    [
+        {group, avro},
+        {group, protobuf}
+    ] ++ sparkplug_tests().
 
 
 groups() ->
 groups() ->
-    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
     ProtobufOnlyTCs = protobuf_only_tcs(),
     ProtobufOnlyTCs = protobuf_only_tcs(),
-    TCs = AllTCs -- ProtobufOnlyTCs,
-    [{avro, TCs}, {protobuf, AllTCs}].
+    TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
+    [{avro, TCs}, {protobuf, AllTCsExceptSP}].
 
 
 protobuf_only_tcs() ->
 protobuf_only_tcs() ->
     [
     [
@@ -35,6 +38,13 @@ protobuf_only_tcs() ->
         t_protobuf_union_decode
         t_protobuf_union_decode
     ].
     ].
 
 
+sparkplug_tests() ->
+    [
+        t_sparkplug_decode,
+        t_sparkplug_encode,
+        t_sparkplug_decode_encode_with_message_name
+    ].
+
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
     emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
     emqx_mgmt_api_test_util:init_suite(?APPS),
     emqx_mgmt_api_test_util:init_suite(?APPS),
@@ -411,13 +421,18 @@ serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) ->
 
 
 protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
 protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
     #{nodes := Nodes} = Res,
     #{nodes := Nodes} = Res,
-    CacheEvents = ?of_kind(
+    CacheEvents0 = ?of_kind(
         [
         [
             schema_registry_protobuf_cache_hit,
             schema_registry_protobuf_cache_hit,
             schema_registry_protobuf_cache_miss
             schema_registry_protobuf_cache_miss
         ],
         ],
         Trace
         Trace
     ),
     ),
+    CacheEvents = [
+        Event
+     || Event <- CacheEvents0,
+        maps:get(name, Event, no_name) =/= ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME
+    ],
     ?assertMatch(
     ?assertMatch(
         [
         [
             schema_registry_protobuf_cache_hit,
             schema_registry_protobuf_cache_hit,
@@ -731,3 +746,106 @@ t_import_config(_Config) ->
         {ok, #{root_key => schema_registry, changed => [Path]}},
         {ok, #{root_key => schema_registry, changed => [Path]}},
         emqx_ee_schema_registry:import_config(RawConf1)
         emqx_ee_schema_registry:import_config(RawConf1)
     ).
     ).
+
+sparkplug_example_data_base64() ->
+    <<"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA==">>.
+
+sparkplug_example_data() ->
+    #{
+        <<"metrics">> =>
+            [
+                #{
+                    <<"datatype">> => 2,
+                    <<"int_value">> => 424,
+                    <<"name">> => <<"counter_group1/counter1_1sec">>,
+                    <<"timestamp">> => 1678094561525
+                },
+                #{
+                    <<"datatype">> => 2,
+                    <<"int_value">> => 84,
+                    <<"name">> => <<"counter_group1/counter1_5sec">>,
+                    <<"timestamp">> => 1678094561525
+                },
+                #{
+                    <<"datatype">> => 2,
+                    <<"int_value">> => 42,
+                    <<"name">> => <<"counter_group1/counter1_10sec">>,
+                    <<"timestamp">> => 1678094561525
+                },
+                #{
+                    <<"datatype">> => 5,
+                    <<"int_value">> => 1,
+                    <<"name">> => <<"counter_group1/counter1_run">>,
+                    <<"timestamp">> => 1678094561525
+                },
+                #{
+                    <<"datatype">> => 5,
+                    <<"int_value">> => 0,
+                    <<"name">> => <<"counter_group1/counter1_reset">>,
+                    <<"timestamp">> => 1678094561525
+                }
+            ],
+        <<"seq">> => 88,
+        <<"timestamp">> => 1678094561521
+    }.
+
+wait_for_sparkplug_schema_registered() ->
+    ?retry(
+        100,
+        100,
+        [_] = ets:lookup(?SERDE_TAB, ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME)
+    ).
+
+t_sparkplug_decode(_Config) ->
+    SQL =
+        <<
+            "select\n"
+            "  sparkplug_decode(payload) as decoded\n"
+            "from t\n"
+        >>,
+    PayloadBase64 = sparkplug_example_data_base64(),
+    {ok, _} = create_rule_http(#{sql => SQL}),
+    PayloadBin = base64:decode(PayloadBase64),
+    ExpectedRuleOutput =
+        #{<<"decoded">> => sparkplug_example_data()},
+    wait_for_sparkplug_schema_registered(),
+    emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
+    Res = receive_action_results(),
+    ?assertMatch(#{data := ExpectedRuleOutput}, Res),
+    ok.
+
+t_sparkplug_encode(_Config) ->
+    %% Default message name field is 'Payload'
+    SQL =
+        <<
+            "select\n"
+            "  sparkplug_encode(json_decode(payload)) as encoded\n"
+            "from t\n"
+        >>,
+    PayloadJSONBin = emqx_utils_json:encode(sparkplug_example_data()),
+    {ok, _} = create_rule_http(#{sql => SQL}),
+    ExpectedRuleOutput =
+        #{<<"encoded">> => base64:decode(sparkplug_example_data_base64())},
+    wait_for_sparkplug_schema_registered(),
+    emqx:publish(emqx_message:make(<<"t">>, PayloadJSONBin)),
+    Res = receive_action_results(),
+    ?assertMatch(#{data := ExpectedRuleOutput}, Res),
+    ok.
+
+t_sparkplug_decode_encode_with_message_name(_Config) ->
+    SQL =
+        <<
+            "select\n"
+            "  sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded\n"
+            "from t\n"
+        >>,
+    PayloadBase64 = sparkplug_example_data_base64(),
+    PayloadBin = base64:decode(PayloadBase64),
+    {ok, _} = create_rule_http(#{sql => SQL}),
+    ExpectedRuleOutput =
+        #{<<"encoded">> => PayloadBin},
+    wait_for_sparkplug_schema_registered(),
+    emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
+    Res = receive_action_results(),
+    ?assertMatch(#{data := ExpectedRuleOutput}, Res),
+    ok.

+ 14 - 1
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl

@@ -12,6 +12,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
+-include("emqx_ee_schema_registry.hrl").
+
 -define(APPS, [emqx_conf, emqx_ee_schema_registry]).
 -define(APPS, [emqx_conf, emqx_ee_schema_registry]).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -182,7 +184,6 @@ t_crud(Config) ->
         }},
         }},
         request({delete, SchemaName})
         request({delete, SchemaName})
     ),
     ),
-
     %% create a schema
     %% create a schema
     ?assertMatch(
     ?assertMatch(
         {ok, 201, #{
         {ok, 201, #{
@@ -193,6 +194,18 @@ t_crud(Config) ->
         }},
         }},
         request({post, Params})
         request({post, Params})
     ),
     ),
+    %% Test that we can't create a schema with the special Sparkplug B name
+    %% (the special Sparkplug B name contains a random sequence of chars so
+    %% should be very unlikely that users try to do this)
+    ParmsWithForbiddenName = maps:put(
+        <<"name">>, ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Params
+    ),
+    ?assertMatch(
+        {ok, 400, #{
+            <<"code">> := <<"BAD_REQUEST">>
+        }},
+        request({post, ParmsWithForbiddenName})
+    ),
     ?assertMatch(
     ?assertMatch(
         {ok, 200, #{
         {ok, 200, #{
             <<"type">> := SerdeTypeBin,
             <<"type">> := SerdeTypeBin,