Creating compute plugins

This section of the user manual describes how to use the Senseforce Edge compute interface to create your own compute plugins. Compute plugins are pieces of software that sit between the incoming and outgoing Edge data streams and allow for virtually any calculation or stream processing.

For an overview of possible data flows and the general streaming architecture, please refer to the page "Using compute plugins".

Plugins are written in C# with .NET Core 3.1 or later. The plugin binaries need to be built for netstandard2.0, .NET Core 3.0, .NET 5.0 or .NET 6.0.

The plugins are battle-tested for netstandard2.1. While targets for .NET Core 3.0 and .NET 5.0/6.0 are supported, targeting netstandard2.1 is currently recommended.

Creating the plugin project from Senseforce Edge template nuget

To give you a simple starting point for creating your Edge compute plugins, Senseforce provides a ready-to-use Edge compute template via NuGet. Follow these steps to install the compute plugin template.

  1. Prerequisite: Install the dotnet CLI by installing the .NET SDK (see "Install .NET on Windows" in the Microsoft Docs).

  2. Install the template by running: dotnet new --install Senseforce.Agent.Extensibility.Compute.Template::1.0.0 (this downloads and installs the template from nuget.org)

  3. Create the project from the template by running: dotnet new edgecomputeplugin --name MyTest --myBatchSize 1 --output ./path/to/your/project/folder

    1. --name: Defines the name of your plugin as well as the name of its main class.

    2. --myBatchSize: The sample plugin supports message batching on the plugin input. The plugin waits until the defined number of messages has arrived at the input and only then triggers the compute stages. For example, setting this to 5 waits for 5 messages before the compute blocks are triggered. Set this to 1 for the default, non-batched plugin.

    3. --output: Path where the project will be created.

The result of the template installation is a project folder containing the files listed under "Template file overview" below, with the C# code files located in the src folder.

The basic plugin template is now installed. You can start developing.

Basic plugin development

The plugin template already contains all the infrastructure needed for starting and stopping the plugin. Each time a message arrives at the compute plugin (or, with batching, once the number of messages specified by myBatchSize has arrived; see above), the Compute() method (located in Computer.cs) is called.

The method is called with an array of input messages. The length of this array equals the value you specified as myBatchSize (see the chapter above).

Furthermore, the method is passed a logger instance, which lets you write to the Senseforce Edge logging infrastructure.

The return value of the method is automatically forwarded to the stream processor.

As described on the page "Using compute plugins", each message leaving one of the plugins needs an IngressKey. The plugin boilerplate automatically adds IngressKey = [nameOfThePlugin]. If your plugin’s name is “Compute_demo”, the IngressKey of each outgoing message will be “Compute_demo”. Use this key in the EventDefinition.xml data routing configuration. If you want to change the IngressKey value, change the parameter of the messages.AddIngressKey() call in the ComputeHandler.cs file (line 190).

The following listing shows the Computer class of the template plugin. Simply add the calculations you want to execute here.

using Microsoft.Extensions.Logging;
using Senseforce.Agent.Shared;

namespace Senseforce.Agent.Extensibility.Compute.Plugins.MyTest
{
    /// <summary>
    /// Class for compute algorithm.
    /// </summary>
    public static class Computer
    {
        /// <summary>
        /// Compute method for compute plugins
        /// TODO: Add your compute algorithm here.
        /// </summary>
        /// <param name="messages">Array of messages used as input for your compute algorithm.</param>
        /// <param name="logger">Instance of a logger; Optional.</param>
        /// <returns>EdgeEventObject containing the message data values.</returns>
        public static EdgeEventObject Compute(EdgeEventObject[] messages, ILogger logger = null)
        {
            // This example simply logs the incoming data-point "Measurement" and
            // redirects the first message of the incoming batch to the stream processor.
            // Add whatever computation you like.
            logger?.LogInformation(messages[0].Data["Measurement"].ToString());
            return messages[0];
        }
    }
}

For most use cases, you only need to add your algorithms to the Compute() method. The plugin boilerplate code handles the rest.

Please use defensive coding techniques so that the plugin never throws an unhandled runtime exception. To prevent unwanted data loss, the Senseforce Edge is designed not to tolerate unhandled exceptions in plugins: the Edge will shut down if one occurs.
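As an illustration of such defensive coding, the following minimal sketch averages the "Measurement" values of a batch without letting any exception escape. It is not part of the template: SampleMessage is a hypothetical stand-in for EdgeEventObject (only the Data lookup is modeled), the Action<string> delegate stands in for the ILogger instance, and the TryCompute pattern is just one possible way to report failure to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for EdgeEventObject; only the Data lookup used in the
// template's Compute() method is modeled here.
public sealed class SampleMessage
{
    public Dictionary<string, object> Data { get; } = new Dictionary<string, object>();
}

public static class DefensiveComputer
{
    // Averages the "Measurement" values of a batch into a new message.
    // Returns false instead of throwing when the batch cannot be processed.
    // The log delegate is an assumption standing in for the ILogger instance.
    public static bool TryCompute(SampleMessage[] messages, out SampleMessage result, Action<string> log = null)
    {
        result = null;
        try
        {
            if (messages == null || messages.Length == 0)
            {
                log?.Invoke("Empty batch received; nothing to compute.");
                return false;
            }

            double sum = 0;
            int count = 0;
            foreach (var message in messages)
            {
                // TryGetValue avoids a KeyNotFoundException for missing data points,
                // and TryParse avoids a FormatException for non-numeric values.
                if (message?.Data != null
                    && message.Data.TryGetValue("Measurement", out var raw)
                    && raw != null
                    && double.TryParse(
                        Convert.ToString(raw, CultureInfo.InvariantCulture),
                        NumberStyles.Float,
                        CultureInfo.InvariantCulture,
                        out var value))
                {
                    sum += value;
                    count++;
                }
            }

            if (count == 0)
            {
                log?.Invoke("No usable \"Measurement\" value in batch.");
                return false;
            }

            result = new SampleMessage();
            result.Data["Measurement"] = sum / count;
            return true;
        }
        catch (Exception ex)
        {
            // Last line of defense: log and report failure rather than rethrow.
            log?.Invoke("Compute failed: " + ex.Message);
            return false;
        }
    }
}
```

A caller would check the return value and skip forwarding when it is false. How a real plugin should react to a failed batch (drop it, forward a default value, or log only) depends on your use case.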

Advanced plugin development

The following sections provide an in-depth overview of the sample plugin boilerplate as well as a guide to changing the compute plugin infrastructure (or creating your own compute plugins from scratch).

Template file overview

  • build.sh: Script to build the plugin. Run with any bash-like shell

  • ComputerPlugin.sln: Solution file, containing the plugin project

  • src/Configuration: Class and Interface for potential plugin configuration. Extend to your needs.

  • src/Dependencies: Senseforce Edge dependency binaries (will be replaced by NuGet packages)

  • src/Computer.cs: Contains the above-mentioned Compute() method, providing an easy entry point for your plugin development

  • src/ComputeHandler.cs: Contains the main plugin boilerplate code

ComputeHandler.cs

The ComputeHandler class contains the main boilerplate code for a Senseforce Edge compute plugin. It provides the infrastructure for starting and stopping as well as a TPL Dataflow pipeline that connects the plugin to the stream processor and forwards the computed results to the output plugin(s).

For most use cases, this file does not need to be touched. Only change code in here for more advanced applications.

The most important code areas of the template plugin are highlighted and described below.

Class

    /// <summary>
    /// Plugin handler for Hello World Plugin; Implementing the IComputePlugin-Interface
    /// The IComputePlugin-Interface provides a contract with an IPropagator interface, 
    /// See https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library for
    /// more information on how to implement the Microsoft TPL Dataflow and especially IPropagatorBlocks.
    /// A Senseforce Edge Compute plugin is an implementation of TPL Dataflow IPropagatorBlock.
    /// </summary>
    public class MyTestComputeHandler : IComputePlugin
    {

The ComputeHandler implements the Senseforce.Agent.Extensibility.Compute.Interface.IComputePlugin interface. This is required to allow the Senseforce Edge main application to load the plugin.

As described in the class comment, the compute plugin boilerplate essentially creates a Microsoft TPL Dataflow propagator block. For more details, see Microsoft’s documentation: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

Constructor

While you can change any behavior of the plugin, two things must not change:

  1. The plugin handler needs to implement the IComputePlugin interface (as described above).

  2. The plugin handler’s constructor needs to have the same parameters as in the sample plugin. Otherwise, the plugin cannot be loaded.

The first sections of the plugin’s constructor create the logging infrastructure and load the configuration file. (The value of the configurationFile parameter equals the ConfigurationFile setting of the plugin in appsettings.xml.)

/// <summary>
/// Constructor for Plugin (needs to have these parameters).
/// How to load this plugin? Add the following sections to appsettings.xml "ComputePlugins"-Section:
/// (Note: We assume, the MyTest.dll is located in the LunaMainPath/Plugins/Compute/MyTest/MyTest.dll
/// and the according settings file (optional!!) is located in the [configuration path of Luna]/configurations/InputPlugins/MyTest/MyTestSettings.xml).
/// <ComputePlugin name="MyMyTestName">
/// <FilePath>$(RuntimePath)Plugins/Compute/MyTest/MyTest.dll</FilePath>
/// <TypeName>Senseforce.Agent.Extensibility.Compute.Plugins.MyTest.MyTestComputeHandler</TypeName>
/// <ConfigurationFile>$(LunaAppDataPath)configurations/InputPlugins/MyTest/MyTestSettings.xml</ConfigurationFile>
/// </ComputePlugin>
/// </summary>
/// <param name="name">Name of the plugin.</param>
/// <param name="configurationFile">configuration file path.</param>
/// <param name="cancellationToken">cancellation Token, used for stopping the plugin.</param>
/// <param name="offline">Flag, indicating, whether the offline-mode is active.</param>
/// <param name="applicationConfigurationPath">path to the application configuration.</param>
/// <param name="logsDirectory">Path to the Luna logs directory.</param>
public MyTestComputeHandler(string name, string configurationFile, CancellationToken cancellationToken, bool offline, string applicationConfigurationPath, string logsDirectory)
{
    applicationConfigPath = applicationConfigurationPath;
    Name = name;
    ConfigurationFile = configurationFile;
    this.offline = offline;

    // Cancellation-Tokens for cancellation-Handling
    internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    internalCancellationToken = internalCancellationTokenSource.Token;

    serviceProvider = new ServiceCollection()
        .AddLogging(builder => builder.SetMinimumLevel(LogLevel.Trace))
        .AddSingleton<IMyTestComputeSettings, MyTestComputeSettings>()
        .BuildServiceProvider();

    // Configuration
    try
    {
        var pathConfiguration = CommonPluginHelper.SubstituteSpecialPath(ConfigurationFile, applicationConfigPath);
        configuration = serviceProvider.GetService<IMyTestComputeSettings>()
            .Load(pathConfiguration);
    }
    catch (Exception)
    {
        serviceProvider?.Dispose();

        // Rethrow with "throw;" so the original stack trace is preserved.
        throw;
    }
    // Logging
    var loggerLogLevel = configuration.Logger.LogLevel;

    var fullLoggerPath = Path.GetFullPath(Path.Combine(logsDirectory, Name, Name + ".log"));

    var fileCountLimit = configuration.Logger.FileCountLimit == null ? 10 : int.Parse(configuration.Logger.FileCountLimit.ToString());

    serilogger = Infrastructure.Logging.Utils.CreateDefaultLogger(fullLoggerPath, loggerLogLevel, fileCountLimit);

    var loggerFactory = serviceProvider.GetService<ILoggerFactory>().AddSerilog(serilogger);
    logger = loggerFactory.CreateLogger<MyTestComputeHandler>();

    logger.LogInformation("++ ");
    logger.LogInformation("Starting Hello world compute plugin");

The second half of the constructor creates the TPL Dataflow chain. Schematically, the following dataflow blocks are created and connected with each other.

  • BatchBlock: Waits for the specified number of input messages before the next block is triggered. To change the number of messages to wait for, change the first parameter of the incomingMessagesBatchBlock = new BatchBlock<EdgeEventObject>() call. This parameter is set during template installation to the value of myBatchSize. All data from input plugins is first received by the BatchBlock.

  • TransformBlock: Takes the incoming batches from the BatchBlock and calls the Compute() method of the Computer class each time the block is offered a batch.

  • BufferBlock: Receives the output messages of the TransformBlock (the return value of the Compute() method) and forwards them to the stream processor and thus to the output plugins.

The BatchBlock and the TransformBlock are directly connected, meaning all messages arriving at the BatchBlock are sent in batches to the TransformBlock. The TransformBlock, however, is not directly connected to the BufferBlock but only via the SendAsync call in line 191 of the ComputeHandler class. In the template plugin, each batch is first processed by the Compute() method of the Computer class, and its output is directed to the BufferBlock via SendAsync. If you want to create entirely new messages or do not want to use the Compute() method, simply call SendAsync() on the BufferBlock: any message you send to the BufferBlock is forwarded to the stream processor and thus to the output plugins.

// Internal buffer block which we wrap as ISourceBlock<EdgeEventObject> (from IComputePlugin)
bufferBlockOut = new BufferBlock<EdgeEventObject>(
    new DataflowBlockOptions()
    {
        CancellationToken = internalCancellationToken,
    });

incomingMessagesBlock = new TransformBlock<EdgeEventObject[], EdgeEventObject>(
    (message) =>
{
    var messages = Computer.Compute(message, logger);
    messages.AddIngressKey(Name);
    return messages;
}, new ExecutionDataflowBlockOptions
{
    CancellationToken = internalCancellationToken,
    MaxDegreeOfParallelism = 1,
});

incomingMessagesBatchBlock = new BatchBlock<EdgeEventObject>(
    2,
    new GroupingDataflowBlockOptions { CancellationToken = internalCancellationToken });

((BatchBlock<EdgeEventObject>)incomingMessagesBatchBlock).LinkTo(incomingMessagesBlock, new DataflowLinkOptions
{
    PropagateCompletion = true,
});

incomingMessagesBlock.LinkTo(bufferBlockOut, new DataflowLinkOptions
{
    PropagateCompletion = true,
});

var completionTask = bufferBlockOut.Completion.ContinueWith(
    async task =>
    {
        // Trigger stop - also acts as fallback for non .Complete() or .Fault() scenarios.
        Stop();

        // TODO: You might await tasks to be completed here!

        // Set the completion source according to whether the task was canceled or faulted
        if (task.IsCanceled)
        {
            taskCompletionSource.TrySetCanceled();
        }
        else if (task.IsFaulted)
        {
            taskCompletionSource.TrySetException(task.Exception);
        }
        else
        {
            taskCompletionSource.TrySetResult(default);
        }
    },
    TaskContinuationOptions.DenyChildAttach);

The methods below the constructor are mainly used for starting and stopping the plugin. You can adapt them to your needs.
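To experiment with the BatchBlock, TransformBlock and BufferBlock chain outside the Edge runtime, a self-contained sketch such as the following may help. It is an illustration only, not the template's code: it uses plain double values instead of EdgeEventObject, the class and method names are hypothetical, and averaging stands in for the Compute() call. Depending on the target framework, the System.Threading.Tasks.Dataflow NuGet package may need to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Pushes the given values through BatchBlock -> TransformBlock -> BufferBlock
    // and returns everything that arrives at the BufferBlock.
    public static async Task<List<double>> RunAsync(IEnumerable<double> inputs, int batchSize)
    {
        // Collects batchSize items before handing them on as one array.
        var batchBlock = new BatchBlock<double>(batchSize);

        // Stand-in for Computer.Compute(): reduces each batch to its average.
        var transformBlock = new TransformBlock<double[], double>(
            batch => batch.Average(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Output side; in the plugin this is what the stream processor reads from.
        var bufferBlock = new BufferBlock<double>();

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        batchBlock.LinkTo(transformBlock, linkOptions);
        transformBlock.LinkTo(bufferBlock, linkOptions);

        foreach (var value in inputs)
        {
            await batchBlock.SendAsync(value);
        }

        // Completing the head block flushes a trailing partial batch and lets
        // completion propagate down the chain.
        batchBlock.Complete();

        var results = new List<double>();
        while (await bufferBlock.OutputAvailableAsync())
        {
            while (bufferBlock.TryReceive(out var item))
            {
                results.Add(item);
            }
        }

        return results;
    }
}
```

Calling PipelineSketch.RunAsync with five values and a batch size of 2 produces three results, because the last value is emitted as a partial batch when the BatchBlock is completed. In a long-running plugin the BatchBlock is normally not completed, so a partial batch stays buffered until enough messages have arrived.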

Configuration

The plugin boilerplate code includes support for loading a plugin configuration file.

The value of the constructor parameter configurationFile equals the ConfigurationFile setting of the plugin in appsettings.xml.

The configuration file at this location is expected to be an XML file. The plugin tries to deserialize this XML file into an implementation of the IMyTestComputeSettings interface (found in src/Configuration/AppSettings/IMyTestComputeSettings.cs). In the default template, the only parameter to be deserialized is a Senseforce.Agent.Extensibility.CommonPluginHelpers.AppSettings.Logger.

using Senseforce.Agent.Extensibility.CommonPluginHelpers.AppSettings;

namespace Senseforce.Agent.Extensibility.Compute.Plugins.MyTest.Configuration.AppSettings
{
    /// <summary>
    /// Interface providing the contract for the plugin's configuration classes.
    /// </summary>
    public interface IMyTestComputeSettings
    {
        /// <summary>
        /// Gets or sets the logger implementation.
        /// </summary>
        Logger Logger { get; set; }

        /// <summary>
        /// Method to load the plugin configuration file.
        /// </summary>
        /// <param name="filePath">
        /// File path to the plugin configuration file.
        /// </param>
        /// <returns>
        /// <see cref="IMyTestComputeSettings"/> object.
        /// </returns>
        IMyTestComputeSettings Load(string filePath);
    }
}

Extend the XML serialization interface as well as the implementing class (src/Configuration/AppSettings/MyTestComputeSettings.cs) with any configuration parameters you need.

For example, if you want to load an additional parameter BatchSize, extend the configuration interface as follows:

using Senseforce.Agent.Extensibility.CommonPluginHelpers.AppSettings;

namespace Senseforce.Agent.Extensibility.Compute.Plugins.MyTest.Configuration.AppSettings
{
    /// <summary>
    /// Interface providing the contract for the plugin's configuration classes.
    /// </summary>
    public interface IMyTestComputeSettings
    {
        /// <summary>
        /// Gets or sets the logger implementation.
        /// </summary>
        Logger Logger { get; set; }

        /// <summary>
        /// Gets or sets the batch size.
        /// </summary>
        int BatchSize { get; set; }

        /// <summary>
        /// Method to load the plugin configuration file.
        /// </summary>
        /// <param name="filePath">
        /// File path to the plugin configuration file.
        /// </param>
        /// <returns>
        /// <see cref="IMyTestComputeSettings"/> object.
        /// </returns>
        IMyTestComputeSettings Load(string filePath);
    }
}

For the template plugin to work, make sure NOT to delete the Load() method.
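The template's own Load() implementation is not reproduced here. As a rough illustration of how an additional setting such as BatchSize can be read from an XML file with the standard XmlSerializer, consider the sketch below. The class name SampleComputeSettings, the XML element names and the validation rule are assumptions made for this example and do not reflect the actual schema of the template's settings file.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical settings class; element names are assumptions and do not
// reflect the actual schema of the template's settings file.
[XmlRoot("SampleComputeSettings")]
public class SampleComputeSettings
{
    // Keeps its initial value of 1 (non-batched) when the element is missing.
    public int BatchSize { get; set; } = 1;

    // Deserializes the settings from an XML string and validates them.
    public static SampleComputeSettings LoadFromXml(string xml)
    {
        var serializer = new XmlSerializer(typeof(SampleComputeSettings));
        using (var reader = new StringReader(xml))
        {
            var settings = (SampleComputeSettings)serializer.Deserialize(reader);
            if (settings.BatchSize < 1)
            {
                throw new InvalidDataException("BatchSize must be at least 1.");
            }

            return settings;
        }
    }

    // Reads the file and delegates to LoadFromXml.
    public static SampleComputeSettings Load(string filePath)
    {
        return LoadFromXml(File.ReadAllText(filePath));
    }
}
```

With this sketch, a file containing <SampleComputeSettings><BatchSize>5</BatchSize></SampleComputeSettings> yields a BatchSize of 5, while an empty root element leaves the default of 1. Validating values at load time means a bad configuration is reported when the plugin starts rather than in the middle of stream processing.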
