The Architect's Guide to the AIoT - Part 3

Asheesh Goja

Wednesday, June 1st, 2022

"I may not have gone where I intended to go, but I think I have ended up where I needed to be." - HG2G

Introduction

In part 2 of this series, we took a deep dive into configuring a multi-tier reference infrastructure that can host an AIoT application. In the concluding part, we will see how to design and deploy a reference AIoT application on this infrastructure.

I will start by laying out the primary use case of this application. Then using detailed design artifacts such as event diagrams and deployment topology models I will explain the system design. I will show how to code, build, containerize, deploy and orchestrate AIoT modules and services as MLOps pipelines on ARM64 devices.

I will also show you how to design and code the IoT device firmware to perform inferences using the TFLM library. The electrical circuit schematics and calculations for the industrial IoT setup are also included in this post.

Reference AIoT Application

The reference application simulates a predictive maintenance scenario wherein an industrial induction motor is monitored by an AIoT application. In the real world, this motor could be powering a mud pump for an oil rig, a conveyor drive, or a cooling fan for a wind turbine’s thermal management system.

wind-turbine-oil-rig

Primary use case

The primary use case of this application is summarized below:

Perception

  • Monitor an induction motor by sensing its power utilization, vibration, sound, and temperature. Capture the sensor data and digitize it.
  • To eliminate noise, pre-process the digitized sensor data by passing it through an embedded FFT DSP filter.
  • Send the filtered data to an embedded logistic regression ML model that can make inferences about conditions that can lead to a potential motor failure.

Cognition

  • Send any abnormal or anomalous data to downstream systems that can detect and predict imminent motor failure for both preventative and predictive scenarios.
  • Send the filtered data to downstream systems for model training and drift detection MLOps pipelines.
  • Monitor drift and retrain the model.
  • Continuously deploy the trained model to all edge devices that perform inferences.

Action

  • If the model detects an imminent motor failure, initiate a closed-loop real-time decision to actuate a motor that powers a backup pump.

System Design

The system design that fulfils the requirements of the primary use case is organized into three sections:

Software

Firmware

Hardware

Software

Deployment Topology

The smallest deployable unit of this application is a container. Every microservice and module of this application is packaged as a container. The containers are managed by K3S and the container workflows by Argo.

To conserve resources on constrained devices, the application uses lightweight Distroless containers.

deployment-topology

Platform Tier Deployments

The deployment target hardware for the reference application services in this tier is one Nvidia Jetson Nano and two Raspberry Pi devices.

Device: Jetson Nano DevKit
Node Name: agentnode-nvidia-jetson
Cluster size: 1
Accelerator: GPU
OS: Ubuntu 18.04.6 LTS / 4.9.253-tegra
Packaging: Container
Storage: Longhorn CSI Plugin
Orchestration: Argo Workflows
Workloads Deployed: MLOps Training Pipeline

As an example let's see how a training task from the training pipeline gets deployed to this device. The training workload deployment is managed by the Argo workflow platform. The training pipelines and the dependencies are expressed as workflow DAGs. Each workload gets scheduled on the node with the required hardware accelerator using a declarative syntax of Kubernetes advanced scheduling directives such as pod affinity, node selectors, taints, and tolerations. See the section on labels and taints from the previous article.

nodeSelector:
  gpuAccelerator: "true"

The Train module is packaged as a container and then gets deployed by the Argo workflow engine as a task in the ML pipeline. Here is a snippet that shows how a containerized task is expressed in an Argo DAG:

- name: train-template
  inputs:
    parameters:
      - name: message
  container:
    image:
      docker.<IP Address>.nip.io:5000/training-module:latest
    env:
      - name: MODEL_REGISTRY_URL
        value: "https://<IP Address>:30007/uploadModel"
  nodeSelector:
    gpuAccelerator: "true"

The gpuAccelerator label ensures that the training workloads are scheduled only to nodes with GPU accelerators.
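
To make the scheduling rule concrete, here is a minimal Python sketch of the label-matching idea behind nodeSelector: a node qualifies only if it carries every label the pod asks for. The node names come from the deployment tables in this post, but the label sets and the helper name eligible_nodes are illustrative assumptions. The real Kubernetes scheduler also weighs taints, tolerations, affinity rules, and available resources.

```python
# Illustrative only: mimics the label-matching part of Kubernetes scheduling.
from typing import Dict, List

# Hypothetical label sets; only the gpuAccelerator label appears in this post.
NODE_LABELS: Dict[str, Dict[str, str]] = {
    "agentnode-nvidia-jetson": {"gpuAccelerator": "true"},
    "agentnode-raspi1": {},
    "agentnode-coral-tpu1": {},
}


def eligible_nodes(node_selector: Dict[str, str],
                   nodes: Dict[str, Dict[str, str]]) -> List[str]:
    """Return the nodes whose labels contain every key/value in node_selector."""
    return sorted(
        name
        for name, labels in nodes.items()
        if all(labels.get(key) == value for key, value in node_selector.items())
    )


# The training task asks for a GPU node.
print(eligible_nodes({"gpuAccelerator": "true"}, NODE_LABELS))
```

An empty selector matches every node, which is why a workload without a nodeSelector can land anywhere in the cluster.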

To see how the entire training pipeline is built and deployed, see this section

Device: Raspberry Pi
Node Name: agentnode-raspi1
Cluster size: 1 of 2
Accelerator: None
OS: Debian GNU/Linux 10 (buster) / 5.10.63-v8+
Packaging: Container
Storage: Longhorn CSI Plugin
Orchestration: K3S
Services Deployed:
  • Embedded Go MQTT Broker
  • MQTT-Kafka Protocol Bridge
  • Model Registry μService
  • Device Registry μService
  • Training Datastore μService
  • Data Ingest μService


Device: Raspberry Pi
Node Name: agentnode-raspi12
Cluster size: 2 of 2
Accelerator: None
OS: Debian GNU/Linux 10 (buster) / 5.10.63-v8+
Storage: Longhorn CSI Plugin
Services Deployed:
  • K3S Server
  • Argo Workflows Server
  • Strimzi
  • Longhorn
  • Docker container registry

We get into more detail on how to build and deploy these services in the subsequent sections.

Inference Tier Deployments

The inference components of the reference app get deployed on two classes of devices - Coral Edge TPU and ESP32S.

Device: Coral Dev Board
Node Name: agentnode-coral-tpu1, agentnode-coral-tpu2, agentnode-coral-tpu3
Cluster size: 3
Accelerator: TPU
OS: Mendel GNU/Linux 5 (Eagle) / 4.14.98-imx
Packaging: Container
Storage: Longhorn CSI Plugin
Orchestration: K3S
Workloads Deployed:
  • PyCoral TF Lite Inference Module
  • Go Streaming API Sidecar

The Coral Edge TPU deployments are managed by K3S and run a PyCoral inference module that uses a streaming API sidecar to get event streams from the Kafka broker. The inference module and the sidecar are configured to run in the same pod and communicate over a TCP/IP socket. The required configuration is expressed in this YAML file.
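
As a rough illustration of the sidecar pattern, the sketch below passes one sensor event from a "sidecar" end to an "inference" end of a local socket. It is written in Python for brevity, although the actual sidecar is a Go service. The newline-delimited JSON framing, the helper names, and the use of socket.socketpair() as a stand-in for the pod-local TCP connection are all assumptions made for this example.

```python
import json
import socket
from typing import Optional, TextIO


def send_event(sock: socket.socket, event: dict) -> None:
    """Sidecar side: write one event as a single JSON line."""
    sock.sendall((json.dumps(event) + "\n").encode("utf-8"))


def recv_event(reader: TextIO) -> Optional[dict]:
    """Inference side: read one JSON line, or None if the peer closed."""
    line = reader.readline()
    return json.loads(line) if line else None


# socketpair() stands in for the socket shared by the two containers in the pod.
sidecar_end, inference_end = socket.socketpair()
reader = inference_end.makefile("r", encoding="utf-8")
try:
    send_event(sidecar_end, {"deviceID": "14333616", "current": 26.56})
    event = recv_event(reader)
    print(event["deviceID"])
finally:
    reader.close()
    sidecar_end.close()
    inference_end.close()
```

Because both containers share the pod's network namespace, the real modules can reach each other on localhost without exposing the port outside the pod.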

Things Tier Deployments

Device: ESP32 SoC
Accelerator: None
No. of devices: 1
OS: ESP-IDF FreeRTOS
Packaging: Firmware
Deployment: ESP32 OTA
Workloads Deployed: TFLM Module

The primary deployment target on the things tier is the ESP32 SoC firmware that runs the ESP-IDF FreeRTOS. This is the only tier that does not use containerization and hence is not managed by K3S. The control and communication for modules within this tier are based on lightweight pub/sub using MQTT. The ESP32 firmware embeds a TFLM C++ inference module. After the initial flash of the firmware, the TF Lite model is downloaded from the model registry. The details on how to code, build and flash the firmware are in this section.

Events, endpoints, and dataflow

Using the reference architecture blueprint, this application is organized and modularized into multiple components that run on separate tiers and communicate using both synchronous and asynchronous APIs. Let's zoom into these interactions using a sequence diagram. In this sequence diagram, you will see how various components of the application interact and exchange messages in order to carry out the functionality of the primary use case.

Control and Data Topics

Various control and data interactions take place over two separate pub/sub brokers - MQTT and Kafka. The brokers are bridged together by a custom protocol bridge microservice. The control events, shown in green lines, flow on the control-topic, and the data messages, shown in purple lines, on the data-topic.

overall-dataflow

This diagram might seem complex and overwhelming, but I will walk you step by step and explain each interaction and message exchange in sufficient detail, tier by tier. So grab a cup of joe and stay with me - I am already on my third ristretto.

Things Tier events, endpoints, and dataflow

Components in this tier exchange messages exclusively over the MQTT broker. These messages are then relayed to the inference and platform tier components by the mqtt-kafka protocol bridge.

Events

Topic Name | Type | Mode | Protocol | Format | Broker
control-message | control | Subscribe | MQTT | JSON | MQTT Broker
shaded-pole-motor-sensor_data | data | Publish | MQTT | JSON | MQTT Broker
motor-anomaly-level1-inference | data | Publish | MQTT | JSON | MQTT Broker

EndPoints

Name | URL | Type
Model OTA URL | https://<HOST:30007>/quantized | Consumer
Device Activation URL | https://<HOST:30006>/confirmActivation | Consumer
MQTT Broker | tcp://<MQTT BROKER>:30005 | Consumer

Data Flow

iot-flow

Here are the main interactions between the IoT components in this tier and the rest of the tiers:

  1. Device Activation: The device key is sent to the device registry server on the Device Activation URL endpoint. If the device is provisioned and activated, the server responds with a code. If not, the device is deactivated.
  2. New Model Message: The device subscribes to the control topic control-message. Any updated TF Lite model information is sent to the device on this topic as a JSON key-value pair. For example, the device gets this message when the Argo training pipeline generates a new model. The payload carries the name of the TF Lite model file, here "2022-05-06-20:36:11-model.tflite".
{
	"command": "download-model",
	"payload": "2022-05-06-20:36:11-model.tflite"
}
  3. Model OTA download: The TFLM module uses this filename to download the TF Lite model from the Model OTA endpoint. It then instantiates the model from the downloaded file and allocates memory for the model's tensors. The module is now ready to make inferences.
  4. Aggregate Data: The aggregator module aggregates the sensor data for vibration, temperature, current, and sound.
  5. FFT DSP Filter: The aggregated data is then sent to the FFT module. The FFT module samples the data, applies a Fourier transform, and returns the peaks.
  6. Filtered sensor data: The filtered sensor data is published as an MQTT message on the topic shaded-pole-motor-sensor_data. Here is an example of this message:
{
	"deviceID": "14333616",
	"current": 26.56,
	"temperature": 32.81,
	"vibration": 32.81,
	"sound": 32.81,
	"fft_data": "true"
}
  7. Level 1 Inferences: The FFT preprocessed data is then sent to the TFLM module, which applies the logistic regression model to it and returns an inference value between 0 and 1. If the inference value exceeds a certain threshold, this data is published to the broker as a “motor-anomaly-level1-inference” message. This inference, performed in the things tier by the IoT device, is referred to as the “Level 1 inference”.
  8. Actuator Control: If the inference value exceeds a certain threshold, indicative of motor failure, the device calls the servo_controller module to activate the hydraulic servo.
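
The Level 1 inference and threshold check described above can be sketched in a few lines. This is a plain Python illustration of logistic regression scoring, not the TFLM C++ firmware code. The weights, bias, threshold, and the shape of the anomaly payload are hypothetical values chosen for the example, since in the real system the coefficients live in the downloaded TF Lite model.

```python
import json
import math
from typing import Dict, Optional

# Hypothetical model parameters and threshold, for illustration only.
FEATURES = ("current", "temperature", "vibration", "sound")
WEIGHTS = {"current": 0.08, "temperature": 0.05, "vibration": 0.04, "sound": 0.02}
BIAS = -6.0
ANOMALY_THRESHOLD = 0.5


def sigmoid(z: float) -> float:
    """Logistic function: squashes any real number into the range (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def level1_inference(reading: Dict[str, float]) -> float:
    """Weighted sum of the sensor features passed through the sigmoid."""
    z = BIAS + sum(WEIGHTS[name] * float(reading[name]) for name in FEATURES)
    return sigmoid(z)


def anomaly_message(reading: dict,
                    threshold: float = ANOMALY_THRESHOLD) -> Optional[str]:
    """Return a JSON payload to publish when the score exceeds the threshold."""
    score = level1_inference(reading)
    if score <= threshold:
        return None
    # Hypothetical payload shape for the motor-anomaly-level1-inference topic.
    return json.dumps({"deviceID": reading["deviceID"], "inference": round(score, 4)})


normal = {"deviceID": "14333616", "current": 26.56, "temperature": 32.81,
          "vibration": 32.81, "sound": 32.81}
stressed = {"deviceID": "14333616", "current": 60.0, "temperature": 80.0,
            "vibration": 70.0, "sound": 60.0}
print(anomaly_message(normal), anomaly_message(stressed))
```

The same threshold comparison would gate both the publish step and the actuator call on the device.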

Inference Tier events, endpoints, and dataflow

This tier has components running on two classes of devices - Coral Dev board (Linux/ARM) and ESP32S (RTOS/MCU) devices.

Events

Topic Name | Type | Mode | Protocol | Format | Broker
Coral Dev Board
shaded-pole-motor-sensor_data | Data | Subscribe | Kafka Stream | JSON | Kafka Broker
motor-failure-alert-level2-inference | Data | Publish | Kafka Stream | JSON | Kafka Broker
control-message | Control | Subscribe | Kafka Stream | JSON | Kafka Broker
ESP32 SoC
shaded-pole-motor-sensor_data | Data | Subscribe | MQTT | JSON | MQTT Broker
control-message | Control | Subscribe | MQTT | JSON | MQTT Broker
motor-failure-alert-level2-inference | Data | Publish | MQTT | JSON | MQTT Broker

EndPoints

Name | URL | Type
Model OTA URL | https://<HOST:30007>/quantized | Consumer
MQTT Broker | tcp://<MQTT BROKER>:1883 | Consumer
Kafka Broker | tcp://<Strimzi Endpoint>:32199 | Consumer

There are four main interactions between the ML inference components and the rest of the system:

Data Flow

inference-flow

  1. New Model Message: The devices in this tier subscribe to the control topic control-message. Any updated TF Lite model information is sent to the device on this topic as a JSON key-value pair. For example, the device gets this message when the MLOps pipeline generates a new model. The name of the model file is in the payload. Here is a sample control message:
{
	"command": "download-model",
	"payload": "2022-05-06-20:36:11-model.tflite"
}
  2. TF Lite Model download: The model download is handled differently on each type of device.
    RTOS based device: The TFLM module uses this filename to download the TF Lite model from the Model OTA endpoint. It then instantiates the model from the downloaded file and allocates memory for the model's tensors.
    ARM Linux based device: The TF Lite model is downloaded and then used by the PyCoral modules to perform inferences.
  3. Subscribe to Sensor Data: The Linux ARM devices subscribe to a Kafka topic “shaded-pole-motor-sensor_data” and the RTOS based devices subscribe to an MQTT topic with the same name.
  4. Level 2 Inferences: The ML inference components perform “Level 2” inferences on the data received from the IoT components. If the ML modules detect an anomaly, they trigger an alert that is published on a Kafka or MQTT topic named “motor-failure-alert-level2-inference”.
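
To illustrate how a device might react to the download-model control message, here is a small Python sketch that validates the message and derives a download URL from the payload. The host name, the function name, and the assumption that the file name is appended to the /quantized endpoint as a path segment are all hypothetical, since the post does not spell out the exact URL shape.

```python
import json
from typing import Optional
from urllib.parse import quote

# Placeholder host; the post uses https://<HOST:30007>/quantized.
MODEL_OTA_BASE_URL = "https://model-registry.example:30007/quantized"


def model_url_from_control_message(raw: str,
                                   base_url: str = MODEL_OTA_BASE_URL) -> Optional[str]:
    """Return the model download URL, or None if the message should be ignored."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(message, dict) or message.get("command") != "download-model":
        return None
    file_name = message.get("payload")
    if not isinstance(file_name, str) or not file_name.endswith(".tflite"):
        return None
    # Percent-encode the file name because it contains ':' characters.
    return f"{base_url}/{quote(file_name)}"


sample = '{"command": "download-model", "payload": "2022-05-06-20:36:11-model.tflite"}'
print(model_url_from_control_message(sample))
```

Rejecting unknown commands and unexpected file types up front keeps a malformed control message from triggering a download on the device.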

Platform Tier events, endpoints, and dataflow

The platform tier hosts the following services:

  • Embedded Go MQTT Broker
  • MQTT-Kafka Protocol Bridge
  • Kafka/Strimzi
  • Model OTA Server
  • Model Registry μService
  • Device Registry μService
  • Training Datastore μService
  • Docker Registry Service
  • K3S
  • Argo Workflows
  • Longhorn

These services are hosted on a cluster comprising one Nvidia Jetson Nano and two Raspberry Pi SBCs. All the MLOps pipelines run on this platform and interact using the following events, endpoints, and dataflow.

Events

Topic Name | Type | Mode | Protocol | Format | Broker
shaded-pole-motor-sensor_data | Data | Subscribe | Kafka Stream | JSON | Kafka Broker
control-message | Control | Subscribe | Kafka Stream | JSON | Kafka Broker

EndPoints

Name | Verb | URL | Type
Training Datastore - Raw data | GET | https://<HOST:30007>/<fileName> | Provider
Training Datastore - Raw data | POST | https://<HOST:30007>/upload | Provider
Model Registry - Normalized data | POST | https://<HOST:30008>/uploadNormalizedData | Provider
Model Registry - Normalized data | GET | https://<HOST:30008>/normalized_training_data | Provider
Model Registry - Frozen graph | POST | https://<HOST:30008>/uploadModel | Provider
Model Registry - Frozen graph | GET | https://<HOST:30008>/full | Provider
Model Registry - Quantized model | POST | https://<HOST:30008>/uploadQuantizedModel | Provider
Model Registry - Quantized model | GET | https://<HOST:30008>/quantized | Provider
MQTT Broker | - | tcp://<MQTT BROKER>:30005 | Provider
Kafka Broker | - | tcp://<Strimzi Endpoint>:32199 | Provider

The MLOps training tasks are orchestrated by Argo workflow and interact in the following sequences with the rest of the system:

Data Flow

training-flow

  1. Ingest sensor data: The ingest μService subscribes to the “shaded-pole-motor-sensor_data” Kafka topic and aggregates all raw sensor data.
  2. Upload Training Data: This service uploads the aggregated sensor data to the Training Datastore endpoint. The Training Datastore μService persists the data on the "artifacts-registry-volm" PV. The PV is managed by the Longhorn Edge storage platform.
  3. Publish Control Message: The ingest μService publishes an extract-data control message that triggers the Argo workflow training pipeline. The message is in the following format:
{
	"command": "extract-data",
	"payload": "raw_sensor_training_data_2022-05-06T16:14:48:1651853688477.csv"
}
  4. Extract Training Data: The extract task subscribes to the extract-data control message. In response to this message, it downloads the training data from the Training Datastore endpoint and normalizes its contents.
  5. Upload Normalized Data: The extract task uploads the normalized training data to the Model Registry - Normalized Training Data endpoint.
  6. Detect Drift: The "Detect Drift" task is triggered by the Argo workflow training pipeline. This task downloads the normalized training data from the Model Registry - Normalized Training Data endpoint.
  7. Trigger Re-Training: If drift is detected, this task triggers a new training job by publishing a "train-model" control message.
  8. Train: The train task downloads the training data from the Model Registry - Normalized Training Data endpoint. It runs the training tasks and then creates a frozen graph.
  9. Upload Frozen Graph: The train task uploads the zipped frozen graph to the Model Registry - Frozen Graph endpoint.
  10. Quantize: The quantize task downloads the frozen model from the Model Registry - Frozen Graph endpoint. It unzips the file and then quantizes it.
  11. Upload Quantized Model: The quantize task uploads the quantized model to the Model Registry - Quantized Model endpoint.
  12. Publish Control Message: The quantize task publishes the download-model control message on the Kafka Broker. The MQTT-Kafka protocol bridge converts this message into an MQTT message and publishes it as a download-model MQTT control message. Any IoT device in the things tier, or any inference device in the inference tier, that subscribes to this topic will get this message. In response, such devices download the latest quantized model from the Model Registry - Quantized Model endpoint. Here is an example of this message:
{
	"command": "download-model",
	"payload": "2022-05-06-20:36:11-model.tflite"
}
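
All three control messages in this flow share the same command/payload envelope, so each subscriber only needs to route on the command field. The following Python sketch shows one way such routing could look. The dispatcher helper and the stub handlers are assumptions made for illustration, as the actual tasks each listen for their own command in separate containers.

```python
import json
from typing import Callable, Dict

Handler = Callable[[str], str]


def make_dispatcher(handlers: Dict[str, Handler]) -> Callable[[str], str]:
    """Build a function that routes a raw control message to its handler."""
    def dispatch(raw_message: str) -> str:
        message = json.loads(raw_message)
        command = message["command"]
        handler = handlers.get(command)
        if handler is None:
            # Unknown commands are skipped so new message types do not break old tasks.
            return f"ignored:{command}"
        return handler(message["payload"])
    return dispatch


# Stub handlers standing in for the extract, train, and model download steps.
dispatch = make_dispatcher({
    "extract-data": lambda payload: f"extract:{payload}",
    "train-model": lambda payload: f"train:{payload}",
    "download-model": lambda payload: f"download:{payload}",
})

print(dispatch('{"command": "download-model", "payload": "2022-05-06-20:36:11-model.tflite"}'))
```

Keeping one envelope for every control message is also what lets the protocol bridge relay messages between Kafka and MQTT without inspecting their contents.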

System Implementation - Code, Build and Deploy

The application is composed of several microservices, tasks, and orchestration workflows that operate concurrently on various infrastructure tiers. Let's see how to code, containerize, build and deploy these components.

Endpoint Security

All of the platform microservices use the following TLS certificates. The certs are mounted in the Kubernetes pod as a secret volume.

TLS Certificates

Using OpenSSL, create a private key and a self-signed certificate that carries the matching public key

openssl genrsa -out server.key 2048
openssl req -new -x509 -sha256 -key server.key -out server.crt -days 30

TLS Secret

Using this pair, create a Kubernetes secret named ssh-keys-secret

kubectl -n architectsguide2aiot create secret generic ssh-keys-secret --from-file=ssh-privatekey=server.key --from-file=ssh-publickey=server.crt

Secret volume

Configure the yaml file to mount the secret volume

spec:
  containers:
    - name: <container name>
      volumeMounts:
        - name: secret-volume
          mountPath: /keys
  volumes:
    - name: secret-volume
      secret:
        secretName: ssh-keys-secret

Training MLOps pipeline tasks

Extract Task

Deployment Target : Platform Tier - Nvidia Jetson Nano Device

This module performs the following functions

  1. Waits for the Kafka message extract-data on the topic control-message. The message payload contains the name of the file to extract.
  2. Downloads the training data file from the training datastore and extracts its content.
  3. Normalizes the data in preparation for the training task.
  4. Uploads the normalized training data file to the Model Registry - Normalized Training Data endpoint.

Code

rand.Seed(time.Now().UTC().UnixNano())
topic := lookupStringEnv("CONTROL-TOPIC", "control-message")
brokerAddress := lookupStringEnv("KAFKA-BROKER", "<IP Address>:32199")

kafka_reader := kafka.NewReader(kafka.ReaderConfig{
   Brokers: []string{brokerAddress},
   Topic:   topic,
})
ctx := context.Background()

downloadUrl := lookupStringEnv("RAW_TRAINING_DATA_DOWNLOAD_REGISTRY_URL", "https://localhost:8081/")
uploadUrl := lookupStringEnv("NORMALIZED_DATA_UPLOAD_REGISTRY_URL", "https://localhost:8080/uploadNormalizedData")

trainingFileName := listenForControlMessage(kafka_reader, ctx, topic)
fmt.Println(trainingFileName)

rawDataRows := downloadRawData(downloadUrl, trainingFileName)
if rawDataRows != nil {
   normalizedData := normalizeData(rawDataRows)
   uploadToModelRegistry(uploadUrl, normalizedData)

   fmt.Println(rawDataRows[0])
}
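
The body of normalizeData is not shown above. As one plausible reading, the Python sketch below applies min-max scaling per column so that every feature lands in the range 0 to 1. The choice of min-max scaling and the function name normalize_columns are assumptions for illustration and may differ from what the Go module actually does.

```python
from typing import List, Sequence


def normalize_columns(rows: Sequence[Sequence[float]]) -> List[List[float]]:
    """Min-max scale each column of a numeric table to the range [0, 1]."""
    if not rows:
        return []
    columns = list(zip(*rows))
    lows = [min(column) for column in columns]
    highs = [max(column) for column in columns]
    normalized = []
    for row in rows:
        normalized.append([
            # A constant column has no spread, so map it to 0.0 to avoid dividing by zero.
            0.0 if high == low else (value - low) / (high - low)
            for value, low, high in zip(row, lows, highs)
        ])
    return normalized


# Two features: motor current and a constant placeholder column.
print(normalize_columns([[10.0, 1.0], [20.0, 1.0], [30.0, 1.0]]))
```

Scaling every sensor channel to a common range keeps a feature with large raw values, such as current, from dominating the training of the downstream model.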

Build - ARM64 compatibility

In order to run this container on an Nvidia Jetson Nano device, which is ARM64, we need to build an ARM64 compatible image. We first cross-compile for ARM64 and then containerize it as a distroless container using the following Dockerfile

FROM golang as builder

ENV USER=appuser
ENV UID=10001

WORKDIR /app
COPY go.mod ./
RUN go mod download
COPY *.go ./
# Build the binary for ARM64
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags='-w -s -extldflags "-static"' -a -o /go/bin/extract main.go

FROM scratch
COPY --from=builder /go/bin/extract /go/bin/extract
EXPOSE 8080
ENTRYPOINT ["/go/bin/extract"]

Build the image, tag it and then push it to the private docker registry

docker build -t extract_module .
docker tag extract_module:latest docker.<IP Address>.nip.io:5000/extract_module:latest
docker push docker.<IP Address>.nip.io:5000/extract_module:latest

Deploy

The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration

- name: extract-template
  inputs:
    parameters:
      - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/extract_module:latest
    securityContext:
      privileged: true
    env:
      - name: NORMALIZED_DATA_UPLOAD_REGISTRY_URL
        value: "https://<IP Address>:30007/uploadNormalizedData"
      - name: RAW_TRAINING_DATA_DOWNLOAD_REGISTRY_URL
        value: "https://<IP Address>:30008/"
      - name: CONTROL-TOPIC
        value: "control-message"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
  nodeSelector:
    kubernetes.io/hostname: "agentnode-raspi1"

Detect Drift Task

Deployment Target : Platform Tier - Nvidia Jetson Nano Device

Code

This module primarily uses the tensorflow_data_validation library along with the following libs

import tensorflow_data_validation as tfdv
import sys
import os
import urllib
import json
from kafka import KafkaProducer

This module performs the following functions:

  1. Download the training data from the Model Registry - Normalized Training Data endpoint.
  2. Run various validation routines on this data to detect drift.
train_stats = tfdv.generate_statistics_from_csv(data_location=training_data_set)
schema = tfdv.infer_schema(train_stats)
anomalies = tfdv.validate_statistics(statistics=train_stats, schema=schema)
if <drift criteria> > DRIFT_THRESHOLD :
    publishControlMessage(training_data_set)
  3. If the drift exceeds a certain threshold, publish a train-model control message.
def publishControlMessage(trainingFileName):
    fileName = os.path.basename(trainingFileName)
    producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER)
    json_data = {"command": "train-model", "payload": fileName}
    message = json.dumps(json_data)
    bytesMessage = message.encode()
    producer.send(CONTROL_TOPIC, bytesMessage)
    # send() is asynchronous; flush so the message is delivered before the task exits
    producer.flush()
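
The drift criterion itself is left as a placeholder in the snippet above. To give a feel for what such a check can look like, here is a deliberately simple stand-in written with only the Python standard library: it measures how far the mean of a new batch has moved from a baseline, in units of the baseline's standard deviation. This is not the comparator used by tensorflow_data_validation, and the threshold value and function names are hypothetical.

```python
from statistics import mean, pstdev
from typing import Sequence

DRIFT_THRESHOLD = 2.0  # hypothetical: flag shifts larger than two standard deviations


def drift_score(baseline: Sequence[float], current: Sequence[float]) -> float:
    """Absolute shift of the mean, expressed in baseline standard deviations."""
    spread = pstdev(baseline)
    shift = abs(mean(current) - mean(baseline))
    if spread == 0:
        # A constant baseline: any change at all counts as unbounded drift.
        return 0.0 if shift == 0 else float("inf")
    return shift / spread


def has_drifted(baseline: Sequence[float], current: Sequence[float],
                threshold: float = DRIFT_THRESHOLD) -> bool:
    """True when the new batch has moved further than the threshold allows."""
    return drift_score(baseline, current) > threshold


baseline_current = [10.0, 12.0, 11.0, 13.0, 12.0]
new_current = [20.0, 22.0, 21.0, 23.0, 22.0]
print(has_drifted(baseline_current, baseline_current), has_drifted(baseline_current, new_current))
```

In the pipeline, a positive result from a check like this is what would lead to the train-model control message being published.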

Build

This module is containerized as a distroless container using the following Dockerfile

FROM debian:buster-slim AS build
RUN apt-get update && \
    apt-get install --no-install-suggests --no-install-recommends --yes python3-venv gcc libpython3-dev && \
    python3 -m venv /venv && \
    /venv/bin/pip install --upgrade pip
FROM build AS build-venv
COPY requirements.txt /requirements.txt
RUN /venv/bin/pip install --disable-pip-version-check -r /requirements.txt

#  distroless python image
FROM gcr.io/distroless/python3-debian10
COPY --from=build-venv /venv /venv
COPY . /app
WORKDIR /app
ENTRYPOINT ["/venv/bin/python3", "validate.py"]

Deploy

The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration

- name: detect-drift-template
  inputs:
    parameters:
      - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/validation_module:latest
    env:
      - name: TRAINING_DATA_URL
        value: "https://<IP Address>:30007/normalized_training_data/"
  nodeSelector:
    kubernetes.io/hostname: ""

Train Task

Deployment Target : Platform Tier - Nvidia Jetson Nano Device

Code

The Train module uses the following libraries

import sklearn
import pandas
import tensorflow as tf

On the Nvidia Jetson Nano, it is important to keep sklearn as the first import and to set the environment variable OPENBLAS_CORETYPE to ARMV8.

The module performs the following functions:

  1. Downloads the training data from the Model Registry - Normalized Training Data endpoint and trains the model.
dataset = dataframe.values
X = dataset[:,0:4].astype(float)
y = dataset[:,4]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)

model = Sequential()
model.add(Dense(60, input_dim=4, activation='relu'))
model.add(Dense(30, activation='relu'))
model.add(Dense(10, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=int(EPOCS), batch_size=int(BATCH_SIZE), verbose=0)
zip_file_name = "{}/{}-model.zip".format(dir_path,datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S') )
save_dir_name = "{}/{}-savedDir".format(dir_path,datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S') )
model.save(save_dir_name)
  2. Packages the model and related artifacts as a zip file.
  3. Uploads the frozen graph to the Model Registry - Frozen Graph endpoint.
upload_url = MODEL_REGISTRY_URL
test_response = requests.post(upload_url, files = {"file": model_file})

Build

This module is containerized using the nvidia l4t-tensorflow image.

FROM nvcr.io/nvidia/l4t-tensorflow:r32.6.1-tf2.5-py3

RUN mkdir /tensorflow
WORKDIR /tensorflow
COPY train.py .
COPY loop.sh .
COPY requirements.txt .

RUN pip3 install --upgrade setuptools
RUN python3 -m pip install --upgrade pip
RUN pip3 install -r requirements.txt
ENV OPENBLAS_CORETYPE ARMV8

CMD [ "python3", "/tensorflow/train.py" ]

Build the image, tag it and then push it to the private docker registry

docker build -t training-module .
docker tag training-module:latest docker.<IP Address>.nip.io:5000/training-module:latest
docker push docker.<IP Address>.nip.io:5000/training-module:latest

Deploy

The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration. The environment vars for MODEL_REGISTRY_URL, EPOCS, BATCH_SIZE are specified in this file.

- name: train-template
  inputs:
    parameters:
      - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/training-module:latest
    env:
      - name: MODEL_REGISTRY_URL
        value: "https://<IP Address>:30007/uploadModel"
      - name: EPOCS
        value: "2"
      - name: BATCH_SIZE
        value: "32"
  nodeSelector:
    kubernetes.io/hostname: "agentnode-nvidia-jetson"

Quantize Task

Deployment Target : Platform Tier - Nvidia Jetson Nano Device

Code

This module performs the following functions:

  1. Downloads the frozen graph from the Model Registry - Frozen Graph endpoint.
  2. Unzips the frozen graph.
  3. Quantizes the model using the tf.lite.TFLiteConverter class.
  4. Uploads the quantized file to the Model Registry - Quantized Model endpoint.
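quantized_file_name = "{}/{}-model.tflite".format(dir_path, datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S'))
converter = tf.lite.TFLiteConverter.from_saved_model(fileName)
# Note: with no optimizations set, convert() only converts the model to the TF Lite format.
# Setting converter.optimizations = [tf.lite.Optimize.DEFAULT] enables post-training quantization.
tflite_model = converter.convert()
with open(quantized_file_name, "wb") as tflite_file:
    tflite_file.write(tflite_model)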
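
For readers new to quantization, the arithmetic behind it is small enough to show directly. The sketch below maps floating point values to 8-bit integers with a scale and a zero point, and then maps them back. It is a standard-library illustration of affine quantization in general, not the TF Lite converter's implementation, and the function names and sample weights are made up for the example.

```python
from typing import List, Sequence, Tuple

QMIN, QMAX = -128, 127  # range of a signed 8-bit integer


def quantization_params(values: Sequence[float]) -> Tuple[float, int]:
    """Pick a scale and zero point that cover the value range (including 0.0)."""
    low = min(min(values), 0.0)
    high = max(max(values), 0.0)
    if high == low:
        return 1.0, 0
    scale = (high - low) / (QMAX - QMIN)
    zero_point = int(round(QMIN - low / scale))
    return scale, zero_point


def quantize(values: Sequence[float], scale: float, zero_point: int) -> List[int]:
    """Map floats to int8, clamping anything that rounds outside the range."""
    return [max(QMIN, min(QMAX, int(round(v / scale)) + zero_point)) for v in values]


def dequantize(quantized: Sequence[int], scale: float, zero_point: int) -> List[float]:
    """Map int8 values back to approximate floats."""
    return [(q - zero_point) * scale for q in quantized]


weights = [-0.8, 0.0, 0.3, 1.2]
scale, zero_point = quantization_params(weights)
restored = dequantize(quantize(weights, scale, zero_point), scale, zero_point)
print(scale, zero_point, restored)
```

Each restored value differs from the original by roughly one quantization step at most, which is the accuracy traded for a model that is about a quarter of the size of its 32-bit float version.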

Build

This module is containerized using the nvidia l4t-tensorflow image.

FROM nvcr.io/nvidia/l4t-tensorflow:r32.6.1-tf2.5-py3
RUN mkdir /tensorflow
WORKDIR /tensorflow
COPY quantize.py .
COPY loop.sh .
COPY requirements.txt .
CMD [ "python3", "/tensorflow/quantize.py" ]

Build the image, tag it and then push it to the private docker registry

docker build -t quantize-module .
docker tag quantize-module:latest docker.<IP Address>.nip.io:5000/quantize-module:latest
docker push docker.<IP Address>.nip.io:5000/quantize-module:latest

Deploy

The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration

- name: quantize-template
  inputs:
    parameters:
      - name: message
  container:
    image:
      docker.<IP Address>.nip.io:5000/quantize-module:latest
    securityContext:
      privileged: true
    env:
      - name: MODEL_DOWNLOAD_REGISTRY_URL
        value: "https://<IP Address>:30007/full"
      - name: MODEL_UPLOAD_REGISTRY_URL
        value:
          "https://<IP Address>:30007/uploadQuantizedModel"
      - name: CONTROL-TOPIC
        value: "control-message"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
  nodeSelector:
    kubernetes.io/hostname: "agentnode-nvidia-jetson"

Orchestrating Edge Learning Tasks - Argo DAGs

Deployment Target: Platform Tier - Raspberry Pi 4 Device

These edge learning tasks can now be assembled as a pipeline, and the pipeline can be expressed as a Directed Acyclic Graph (DAG).

workflow-dag

Deploy

Configure the tasks as nodes and the dependencies between them as edges in the Argo DAG YAML file.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: kubecon-aiotdemo-dag-
spec:
  entrypoint: kubecon-aiotdemo-dag
  templates:
    - name: kubecon-aiotdemo-dag
      dag:
        tasks:
          - name: extract
            template: extract-template
            arguments:
              parameters:
                - name: message
                  value: ""
          - name: detect-drift
            dependencies: [extract]
            template: detect-drift-template
            arguments:
              parameters:
                - name: message
                  value: ""
          - name: train
            dependencies: [detect-drift, extract]
            template: train-template
            arguments:
              parameters:
                - name: message
                  value: ""
          - name: quantize
            dependencies: [train]
            template: quantize-template
            arguments:
              parameters:
                - name: message
                  value: ""
    - name: extract-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker.<IP Address>.nip.io:5000/extract_module:latest
        securityContext:
          privileged: true
        env:
          - name: NORMALIZED_DATA_UPLOAD_REGISTRY_URL
            value: "https://<IP Address>:30007/uploadNormalizedData"
          - name: GCP_BUCKET
            value: "architectsguide2aiot-aiot-mlops-demo"
          - name: RAW_TRAINING_DATA_DOWNLOAD_REGISTRY_URL
            value: "https://<IP Address>:30008/"
          - name: CONTROL-TOPIC
            value: "control-message"
          - name: KAFKA-BROKER
            value: "<IP Address>:32199"
        volumeMounts:
          - name: secret-volume
            mountPath: /keys
      nodeSelector:
        kubernetes.io/hostname: "agentnode-raspi1"
    - name: detect-drift-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker.<IP Address>.nip.io:5000/validation_module:latest
        env:
          - name: TRAINING_DATA_URL
            value: "https://<IP Address>:30007/normalized_training_data/"
      nodeSelector:
        kubernetes.io/hostname: ""
    - name: train-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker.<IP Address>.nip.io:5000/training-module:latest
        env:
          - name: MODEL_REGISTRY_URL
            value: "https://<IP Address>:30007/uploadModel"
          - name: EPOCS
            value: "2"
          - name: BATCH_SIZE
            value: "32"
          - name: TRAINING_DATA_URL
            value: "https://<IP Address>:30007/normalized_training_data/"
      nodeSelector:
        kubernetes.io/hostname: "agentnode-nvidia-jetson"
    - name: quantize-template
      inputs:
        parameters:
          - name: message
      container:
        image: docker.<IP Address>.nip.io:5000/quantize-module:latest
        securityContext:
          privileged: true
        env:
          - name: MODEL_DOWNLOAD_REGISTRY_URL
            value: "https://<IP Address>:30007/full"
          - name: MODEL_UPLOAD_REGISTRY_URL
            value: "https://<IP Address>:30007/uploadQuantizedModel"
          - name: CONTROL-TOPIC
            value: "control-message"
          - name: KAFKA-BROKER
            value: "<IP Address>:32199"
      nodeSelector:
        kubernetes.io/hostname: "agentnode-nvidia-jetson"

Use the argo CLI to deploy the learning pipeline

argo submit -n kubecon2021 --serviceaccount argo --watch ../demo_DAG.yaml

This is what I see on my console

Name:                kubecon-aiotdemo-dag-2m5wz
Namespace:           architectsguide2aiot
ServiceAccount:      argo
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Fri May 27 20:06:28 +0000 (7 minutes ago)
Started:             Fri May 27 20:06:28 +0000 (7 minutes ago)
Finished:            Fri May 27 20:13:40 +0000 (now)
Duration:            7 minutes 12 seconds
Progress:            4/4
ResourcesDuration:   12m48s*(100Mi memory),12m48s*(1 cpu)

STEP                           TEMPLATE               PODNAME                                DURATION  MESSAGE
 ✔ kubecon-aiotdemo-dag-2m5wz  kubecon-aiotdemo-dag
 ├─✔ extract                   extract-template       kubecon-aiotdemo-dag-2m5wz-3861752307  5m
 ├─✔ detect-drift              detect-drift-template  kubecon-aiotdemo-dag-2m5wz-2571201847  22s
 ├─✔ train                     train-template         kubecon-aiotdemo-dag-2m5wz-2497119492  31s
 └─✔ quantize                  quantize-template      kubecon-aiotdemo-dag-2m5wz-1935646649  28s

You can also monitor and manage the workflow from the Argo Dashboard. Open the Argo console in your browser (make sure to follow the installation instructions from the previous post).

argo-workflow

Ingest μService

Deployment Target : Platform Tier - Raspberry Pi 4 Device

The ingest microservice performs the following functions:

  1. Subscribes to the Kafka data-message topic "shaded-pole-motor-sensor_data"
  2. Aggregates the sensor data and uploads it to the Training Datastore - Raw data endpoint
  3. Publishes a Kafka control message extract-data on topic control-message with the name of the data file in the payload.

Code

This module uses the segmentio kafka-go client to connect to the Kafka broker.

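msg, err := kafka_reader.ReadMessage(ctx)
if err != nil {
  panic("could not read message " + err.Error())
}
var rawSensorData RawSensorData
// msg.Value is already a []byte, so it can be passed to Unmarshal directly
if err := json.Unmarshal(msg.Value, &rawSensorData); err != nil {
  log.Printf("could not parse sensor data: %v", err)
}

t := time.Now()
if rawSensorData.TimeStamp == "" {
  // default to the current time in milliseconds since the Unix epoch
  rawSensorData.TimeStamp = fmt.Sprintf("%02d",t.UnixNano()/int64(time.Millisecond))
}
if rawSensorData.DeviceID != "" {
  if counter == 0 { // create new file and write header
      f, err := os.Create(fileName)
      if err != nil { // check the error before using the file handle
        panic(err)
      }
      testDataFile = f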
      testDataFile.WriteString("deviceID,timeStamp,current,temperature,vibration,sound\n")
  }
  testDataFile.WriteString(fmt.Sprintf("%s,%s,%.1f,%.1f,%.1f,%.1f\n", rawSensorData.DeviceID, rawSensorData.TimeStamp, rawSensorData.Current, rawSensorData.Temperature, rawSensorData.Vibration, rawSensorData.Sound))
  counter = counter + 1
  if maxRows == counter { // upload the file
      counter = 0 //reset
      testDataFile.Close()
      uploadFileToModelRegistry(fileName, upload_url)
      publishControlMessage(fileName, kafka_writer, ctx)
  }
}
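
The batching behaviour of the Go snippet above can be summarized in a few lines. What follows is a minimal Python sketch of the same idea, not the service's actual code (the service itself is written in Go): rows are buffered as CSV, and once `max_rows` rows have accumulated the finished batch is handed to a callback and a new batch begins. The `CsvBatcher` class and its `on_full` callback are hypothetical names introduced for illustration; `on_full` stands in for the calls to `uploadFileToModelRegistry` and `publishControlMessage`.

```python
import csv
import io

HEADER = ["deviceID", "timeStamp", "current", "temperature", "vibration", "sound"]


class CsvBatcher:
    """Hypothetical helper: collects sensor rows and flushes every max_rows rows."""

    def __init__(self, max_rows, on_full):
        self.max_rows = max_rows
        self.on_full = on_full  # called with the CSV text of each full batch
        self._start_batch()

    def _start_batch(self):
        # A new batch always starts with the header row
        self.count = 0
        self.buffer = io.StringIO()
        self.writer = csv.writer(self.buffer, lineterminator="\n")
        self.writer.writerow(HEADER)

    def add(self, row):
        # Messages without a device ID are skipped, as in the Go code
        if not row.get("deviceID"):
            return
        self.writer.writerow([
            row["deviceID"],
            row["timeStamp"],
            "%.1f" % row["current"],
            "%.1f" % row["temperature"],
            "%.1f" % row["vibration"],
            "%.1f" % row["sound"],
        ])
        self.count += 1
        if self.count == self.max_rows:
            self.on_full(self.buffer.getvalue())
            self._start_batch()


batches = []
batcher = CsvBatcher(max_rows=2, on_full=batches.append)
for i in range(5):
    batcher.add({"deviceID": "dev1", "timeStamp": str(i), "current": 1.0,
                 "temperature": 20.5, "vibration": 0.3, "sound": 3.0})
print(len(batches))  # two full batches; the fifth row is still buffered
```

With `MAX_ROWS` set to 3000, as in the deployment file, each uploaded file would hold one header line and 3000 data rows.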

Build - ARM64 compatibility

Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry

docker build -t ingest_service .
docker tag ingest_service:latest docker.<IP Address>.nip.io:5000/ingest_service:latest
docker push docker.<IP Address>.nip.io:5000/ingest_service:latest

Deploy

The kubernetes.yaml file is configured to set the image name and the env vars as per the operating environment. The node selector label is set to run this service on one of the Raspberry Pi devices.

---
containers:
  - name: ingest-microservice
    image:
      docker.<IP Address>.nip.io:5000/ingest_service:latest
    env:
      - name: TRAINING_DATA_UPLOAD_REGISTRY_URL
        value: "https://<IP Address>:30008/upload"
      - name: MAX_ROWS
        value: "3000"
      - name: DATA-TOPIC
        value: "shaded-pole-motor-sensor_data"
      - name: CONTROL-TOPIC
        value: "control-message"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
nodeSelector:
  kubernetes.io/hostname: "agentnode-raspi1"

Model Registry μService

Deployment Target : Platform Tier - Raspberry Pi 4 Device

The model registry services are exposed as the following REST endpoints:

Name                                       VERB  URL
Model Registry - Frozen Graph              POST  https://<HOST:30007>/uploadModel
Model Registry - Frozen Graph              GET   https://<HOST:30007>/full
Model Registry - Quantized Model           POST  https://<HOST:30007>/uploadQuantizedModel
Model Registry - Quantized Model           GET   https://<HOST:30007>/quantized
Model Registry - Normalized Training Data  POST  https://<HOST:30007>/uploadNormalizedData
Model Registry - Normalized Training Data  GET   https://<HOST:30007>/normalized_training_data

Code

This Golang module uses handlers and ServeMuxes from the net/http package.

mux := http.NewServeMux()

os.MkdirAll("./model_store/full", os.ModePerm)
os.MkdirAll("./model_store/quantized", os.ModePerm)
os.MkdirAll("./model_store/normalized_training_data", os.ModePerm)
os.MkdirAll("./model_store/OTA_bin", os.ModePerm)

mux.HandleFunc("/uploadModel", uploadModelHandler)
mux.HandleFunc("/uploadQuantizedModel", uploadModelQuantizedHandler)
mux.HandleFunc("/uploadNormalizedData", uploadTrainingDataHandler)

fileServerHtml := http.FileServer(http.Dir("model_store"))
mux.Handle("/", fileServerHtml)

log.Printf("Serving Model Registry on port: %s\n", port)
if err := http.ListenAndServeTLS(":"+port, "/keys/ssh-publickey", "/keys/ssh-privatekey", mux); err != nil {
  log.Fatal(err)
}

Build - ARM64 compatibility

Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry

docker build -t model-registry .
docker tag model-registry:latest docker.<IP Address>.nip.io:5000/model-registry:latest
docker push docker.<IP Address>.nip.io:5000/model-registry:latest

Deploy

The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP Address at port 30007 using a nodeport. Specify TLS certs volume mount correctly as shown in this section.

image: docker.<IP Address>.nip.io:5000/model-registry:latest

spec:
  type: NodePort
  selector:
    app: model-registry
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30007

Training Datastore μService

Deployment Target : Platform Tier - Raspberry Pi 4 Device

The training datastore services are exposed as the following REST endpoints:

Name                           VERB  URL
Training Datastore - Raw data  GET   https://<HOST:30008>/<fileName>
Training Datastore - Raw data  POST  https://<HOST:30008>/upload

Code

This Golang module uses handlers and ServeMuxes from the net/http package.

mux := http.NewServeMux()
mux.HandleFunc("/upload", uploadTrainingDataHandler)

fileServerHtml := http.FileServer(http.Dir("/data/raw_training_data"))
mux.Handle("/", fileServerHtml)

port := lookupStringEnv("PORT" , "8081")
log.Printf("Serving TrainingDataStore service on port: %s\n", port)

if err := http.ListenAndServeTLS(":"+port, "/keys/ssh-publickey", "/keys/ssh-privatekey", mux); err != nil {
  log.Fatal(err)
}

Build - ARM64 compatibility

Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry

docker build -t training-datastore .
docker tag training-datastore:latest docker.<IP Address>.nip.io:5000/training-datastore:latest
docker push docker.<IP Address>.nip.io:5000/training-datastore:latest

Deploy

The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP Address at port 30008 using a nodeport. Specify TLS certs volume mount correctly as shown in this section.

image:
  docker.<IP Address>.nip.io:5000/training-datastore:latest

spec:
  type: NodePort
  selector:
    app: model-registry
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30008

Device Registry μService

Deployment Target : Platform Tier - Raspberry Pi 4 Device

The device registry service exposes the following REST endpoints to provision new IoT devices and validate their activation:

Name                                  VERB  URL
Device Registry - Provision Device    POST  https://<IP Address>:30006/provisionDevice
Device Registry - Confirm Activation  POST  https://<IP Address>:30006/confirmActivation

Code

This Golang module uses handlers and ServeMuxes from the net/http package.

mux := http.NewServeMux()
mux.HandleFunc("/confirmActivation", confirmActivation)
mux.HandleFunc("/provisionDevice", provisionDevice)

fileServerHtml := http.FileServer(http.Dir("/data/deviceRegistry"))
mux.Handle("/", fileServerHtml)

port := lookupStringEnv("PORT", "8082")

log.Printf("Serving device Registry on port: %s\n", port)

if err := http.ListenAndServeTLS(":"+port, "/keys/ssh-publickey", "/keys/ssh-privatekey", mux); err != nil {
  log.Fatal(err)
}

Build - ARM64 compatibility

Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry

docker build -t device-registry .
docker tag device-registry:latest docker.<IP Address>.nip.io:5000/device-registry:latest
docker push docker.<IP Address>.nip.io:5000/device-registry:latest

Deploy

The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP Address at port 30006 using a nodeport. Specify TLS certs volume mount correctly as shown in this section.

image: docker.<IP Address>.nip.io:5000/device-registry:latest
.
.
.
apiVersion: v1
kind: Service
metadata:
  name: device-registry-service
spec:
  type: NodePort
  selector:
    app: device-registry
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30006

Lightweight Pub/Sub Broker - Embedded MQTT broker setup

Deployment Target : Platform Tier - Raspberry Pi 4 Device

This microservice provides lightweight and high performance MQTT broker services.

Code

The implementation is based on an embedded open-source Go MQTT broker.

Build - ARM64 compatibility

Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry

docker build -t go_amr64_mqtt_broker .
docker tag go_amr64_mqtt_broker:latest docker.<IP Address>.nip.io:5000/go_amr64_mqtt_broker:latest
docker push docker.<IP Address>.nip.io:5000/go_amr64_mqtt_broker:latest

Deploy

The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP Address at port 30005 using a nodeport.

---
spec:
  containers:
    - name: broker-container
      image: docker.<IP Address>.nip.io:5000/go_amr64_mqtt_broker:latest
      ports:
        - containerPort: 1883
  nodeSelector:
    kubernetes.io/hostname: ""
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker-service
spec:
  type: NodePort
  selector:
    app: mqtt-broker
  ports:
    - protocol: TCP
      port: 1883
      targetPort: 1883
      nodePort: 30005

MQTT-Kafka Protocol bridge

Deployment Target : Platform Tier - Raspberry Pi 4 Device

This microservice bridges the MQTT messages with Kafka streams.

Code

This module uses Sarama as the client library for Apache Kafka and Eclipse Paho as the client library for MQTT. We first need to set up and connect to the Kafka broker as a publisher

kafka_topic := lookupStringEnv("KAFKA-TOPIC", "shaded-pole-motor-sensor_data")
kafka_brokerAddress := lookupStringEnv("KAFKA-BROKER", "<IP Address>:32199")

signals := make(chan os.Signal, 1)
// SIGKILL cannot be trapped by a process, so listen for SIGTERM instead
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

producerConfig := sarama.NewConfig()
// wait for the partition leader to acknowledge each message
producerConfig.Producer.RequiredAcks = sarama.WaitForLocal
producerConfig.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{kafka_brokerAddress}, producerConfig)
if err != nil {
  panic(err)
}
end := make(chan int, 1)

Now we need to set up an MQTT subscription handler. The handler uses go channels to asynchronously wait for the MQTT message. When it receives an MQTT message it calls the Kafka publisher handler "publishMqttMessageToKafka" which gets passed to it as an anonymous function.

func startMqttSubscriber(opts *MQTT.ClientOptions, publishMqttMessageToKafka func(string)) {
	qos := 0
	mqtt_topic := lookupStringEnv("MQTT-TOPIC", "shaded-pole-motor-sensor_data")
	choke := make(chan [2]string)
	opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
		choke <- [2]string{msg.Topic(), string(msg.Payload())}
	})
	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	if token := client.Subscribe(mqtt_topic, byte(qos), nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}
	for {
		incoming := <-choke
		publishMqttMessageToKafka(incoming[1])
	}
	client.Disconnect(250)
}

Using this MQTT handler, a closure is set up with the body of the anonymous function "publishMqttMessageToKafka", which publishes the Kafka message

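startMqttSubscriber(mqtt_opts, func(messageVal string) {
  msg := &sarama.ProducerMessage{
    Topic: kafka_topic,
    Value: sarama.StringEncoder(messageVal),
  }
  // SendMessage blocks until the broker acknowledges the message
  if _, _, err := producer.SendMessage(msg); err != nil {
    log.Printf("failed to publish to Kafka: %v", err)
  }
})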

Conversely, when this service gets a Kafka message, it publishes this message as an MQTT message.

mqtt_client := MQTT.NewClient(opts)
if token := mqtt_client.Connect(); token.Wait() && token.Error() != nil {
  panic(token.Error())
}
ctx := context.Background()

r := kafka.NewReader(kafka.ReaderConfig{
  Brokers: []string{kafka_broker},
  Topic:   kafka_topic,
})
for {
  msg, err := r.ReadMessage(ctx)
  if err != nil {
    panic("could not read message " + err.Error())
  }

  retry(100, 4, func() error {

    fmt.Println("Sample Publisher Started")
    token := mqtt_client.Publish(mqtt_pub_topic, byte(qos), false, string(msg.Value))
    token.Wait()
    // returning the publish error lets retry() try again on failure
    return token.Error()
  })
}

Build - ARM64 compatibility

Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry

docker build -t protocol_bridge .
docker tag protocol_bridge:latest docker.<IP Address>.nip.io:5000/protocol_bridge:latest
docker push docker.<IP Address>.nip.io:5000/protocol_bridge:latest

Deploy

The kubernetes.yaml file is configured to set the image name to the correct location and the env vars to the appropriate MQTT and Kafka settings

apiVersion: apps/v1
kind: Deployment
metadata:
  name: protocol-bridge-deployment
spec:
  selector:
    matchLabels:
      app: protocol-bridge
  replicas: 1
  template:
    metadata:
      labels:
        app: protocol-bridge
    spec:
      containers:
        - name: protocol-bridge
          image: docker.<IP Address>.nip.io:5000/protocol_bridge:latest
          env:
            - name: MQTT-BROKER
              value: "tcp://<IP Address>:30005"
            - name: MQTT-ID
              value: "architectsguide2aiot_mqtt-id"
            - name: DATA-TOPIC
              value: "shaded-pole-motor-sensor_data"
            - name: CONTROL-TOPIC
              value: "control-message"
            - name: KAFKA-BROKER
              value: "<IP Address>:32199"

The Inference Module

Deployment Target : Inference Tier - Coral Edge TPU Devices

The inference module running on the Coral Edge devkit cluster is built using the PyCoral API. This module gets event streams from the Kafka broker via a streaming API sidecar.

Code

Import the following PyCoral adapters

from pycoral.adapters import classify
from pycoral.adapters import common
from pycoral.utils.dataset import read_label_file
from pycoral.utils.edgetpu import make_interpreter

While testing on hardware that does not have a TPU accelerator, use the TF Lite imports and comment out the pycoral imports.

import numpy as np
import tensorflow as tf

Subscribe to the control message and in response to the "download-model" message, get the latest TF Lite quantized model file from the Model Registry and use the Interpreter to load it

# use this for testing on non TPU h/w
# interpreter = tf.lite.Interpreter(model_path= dir_path + '/' + latest_model_file_name)
interpreter = make_interpreter(dir_path + '/' + latest_model_file_name)
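
The control-message handling described above can be sketched roughly as follows. This is a minimal illustration that assumes the control message is a JSON object with `command` and `payload` fields (the same field names the firmware's MQTT module reads) and that the payload carries the model file name. The function `handle_control_message`, the `download_model` callback and the file names are hypothetical, introduced only for this example.

```python
import json


def handle_control_message(raw, download_model):
    """Parse a control message and call download_model(file_name) if requested.

    Returns True when a download was triggered, False otherwise.
    """
    try:
        message = json.loads(raw)
    except (TypeError, ValueError):
        return False  # ignore payloads that are not valid JSON
    if not isinstance(message, dict):
        return False
    if message.get("command") != "download-model":
        return False  # some other control command, not meant for this module
    file_name = message.get("payload")
    if not file_name:
        return False
    download_model(file_name)
    return True


requested = []
handle_control_message('{"command": "download-model", "payload": "model_v2.tflite"}', requested.append)
handle_control_message('{"command": "extract-data", "payload": "data.csv"}', requested.append)
print(requested)  # only the download-model message triggers a download
```

In the real module the callback would fetch the named file from the Model Registry and reload the interpreter with it.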

Setup the input data, invoke the inference, and get the output

interpreter.allocate_tensors()

# msg is the JSON string received from the sidecar
sensor_data = json.loads(msg, object_hook=lambda d: SimpleNamespace(**d))
# build a float32 input tensor with a batch dimension: shape (1, 4)
input_data = np.array([[sensor_data.current, sensor_data.temperature, sensor_data.vibration, sensor_data.sound]], dtype=np.float32)
inp_details = interpreter.get_input_details()
out_details = interpreter.get_output_details()

interpreter.set_tensor(inp_details[0]['index'], input_data)
interpreter.invoke()
predictions = interpreter.get_tensor(out_details[0]['index'])

# the model emits a single value; convert it to a Python float
inference_val = float(np.squeeze(predictions))

The input data comes in as a JSON object over a TCP/IP socket from a streaming API sidecar. The inference module sets up a TCP server endpoint

HOST = '127.0.0.1'  # Standard loopback interface address (localhost)
PORT = int(SIDECAR_PORT)      # Port to listen on (non-privileged ports are > 1023)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.bind((HOST, PORT))
   s.listen()
   while True:
      try:
         conn, addr = s.accept()
         with conn:
            logToFile('Connected by ' + addr[0] )
            while True:
               data = conn.recv(1024).decode()
               logToFile("Received message from go client{}".format(data))
               if not data:
                  break

               ret = infer(interpreter, data)
               strRet = "inference value from the edge tpu = {}".format(ret)
               conn.sendall(strRet.encode())
      except Exception as inst:
         print(inst)
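
To try out this request/response exchange without a TPU or a Kafka broker, a loopback test along the following lines may help. It is only a sketch: `fake_infer` is a hypothetical stand-in for `infer(interpreter, data)` that returns the mean of the four readings, and the server handles a single connection instead of looping forever. The client half plays the role of the streaming API sidecar.

```python
import json
import socket
import threading


def fake_infer(data):
    """Stand-in for infer(interpreter, data): returns the mean of the four readings."""
    d = json.loads(data)
    return (d["current"] + d["temperature"] + d["vibration"] + d["sound"]) / 4.0


def serve_once(server_sock):
    """Accept one connection, answer one message, then close the connection."""
    conn, _ = server_sock.accept()
    with conn:
        data = conn.recv(1024).decode()
        if data:
            reply = "inference value from the edge tpu = {}".format(fake_infer(data))
            conn.sendall(reply.encode())


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
server.listen()
port = server.getsockname()[1]
worker = threading.Thread(target=serve_once, args=(server,))
worker.start()

# Client side: what the sidecar does for each Kafka message
reading = {"current": 1.0, "temperature": 2.0, "vibration": 3.0, "sound": 4.0}
with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
    client.sendall(json.dumps(reading).encode())
    response = client.recv(1024).decode()

worker.join()
server.close()
print(response)
```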

Streaming API Sidecar

This Golang module uses the segmentio kafka-go package to connect to the broker. It subscribes to the Kafka topic and waits synchronously to receive the messages.

connHost   := "127.0.0.1"
connPort   := lookupStringEnv("PORT", "9898")
topic         := lookupStringEnv("TOPIC", "my-topic")
brokerAddress := lookupStringEnv("KAFKA-BROKER", "<IP Address>:32199")

ctx := context.Background()
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{brokerAddress},
	Topic:   topic, // the trailing comma is required in a multi-line literal
})
msg, err := r.ReadMessage(ctx)

A retry helper re-invokes a function after a sleep interval, up to a maximum number of attempts, to make message delivery more resilient to transient failures

// stop wraps an error that should not be retried
type stop struct {
	error
}

func retry(attempts int, sleep time.Duration, fn func() error) error {
	if err := fn(); err != nil {
		if s, ok := err.(stop); ok {
			// Return the original error for later checking
			return s.error
		}

		if attempts--; attempts > 0 {
			time.Sleep(sleep * time.Second)
			return retry(attempts, 1*sleep, fn)
		}
		return err
	}
	return nil
}
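
As written, the Go helper passes `1*sleep` to each recursive call, so the wait between attempts stays constant. If a growing interval is wanted, one option is to multiply the wait after every failure. The following Python sketch illustrates that variant under stated assumptions: the `backoff` factor, the `Stop` exception and the injectable `sleep_fn` parameter are hypothetical additions for this example and are not part of the original module.

```python
import time


class Stop(Exception):
    """Raise from fn to abort retrying immediately (mirrors the Go stop type)."""


def retry(attempts, sleep, fn, backoff=2.0, sleep_fn=time.sleep):
    """Call fn until it succeeds, waiting `sleep` seconds and growing the wait by `backoff`."""
    while True:
        try:
            return fn()
        except Stop:
            raise  # the caller asked us not to retry
        except Exception:
            attempts -= 1
            if attempts <= 0:
                raise  # out of attempts: surface the last error
            sleep_fn(sleep)
            sleep *= backoff


waits = []
calls = {"n": 0}


def flaky():
    # Fails three times, then succeeds
    calls["n"] += 1
    if calls["n"] < 4:
        raise ConnectionError("broker not ready")
    return "delivered"


# sleep_fn=waits.append records the waits instead of actually sleeping
result = retry(5, 1.0, flaky, sleep_fn=waits.append)
print(result, waits)  # delivered [1.0, 2.0, 4.0]
```

Injecting the sleep function keeps the example fast to run; production code would simply use the default `time.sleep`.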

This function is then used in a retry closure that sends the received messages as JSON objects to the inference module over the TCP/IP endpoint it exposes.

retry(100, 4, func() error {
	socketConnection, err := net.Dial(connType, connHost+":"+connPort)
	if err != nil {
		return err // inference module not reachable yet, so retry
	}
	defer socketConnection.Close()

	if _, err := socketConnection.Write(msg.Value); err != nil {
		return err
	}
	buff := make([]byte, 1024)
	n, err := socketConnection.Read(buff)
	if err != nil {
		return err
	}
	log.Printf("Receive: %s", buff[:n])
	return nil
})

Build

Inference Module

The inference module is containerized using a debian:buster image.

FROM debian:buster
RUN mkdir /coral
WORKDIR /coral
RUN apt update && \
apt-get install curl gnupg ca-certificates -y
RUN echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | tee /etc/apt/sources.list.d/coral-edgetpu.list
RUN echo "deb https://packages.cloud.google.com/apt coral-cloud-stable main" | tee /etc/apt/sources.list.d/coral-cloud.list
RUN curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
RUN apt-get update
RUN apt-get update && \
    apt-get install python3  python3-pip -y
RUN apt-get install python3-pycoral -y

COPY loop.sh ./
COPY infer_tflite_socket.py ./
COPY requirements.txt ./

RUN pip3 install -r requirements.txt
EXPOSE 9898

CMD [ "python3", "/coral/infer_tflite_socket.py" ]

ARM64 compatibility

In order to run this container on a coral TPU device, which is ARM64, we need to build an ARM64 compatible image. We do this by using docker buildx and following these steps

  1. Log into your docker hub account
  2. Use docker buildx to build an image for the ARM64 platform
  3. Push it to the Docker hub
  4. Pull the image from the Docker hub
  5. Tag the image and push it to the private docker registry

docker buildx build -t asheeshgoja/edge-tpu-inference-engine:latest --platform linux/arm64 --push  .
docker pull docker.io/asheeshgoja/edge-tpu-inference-engine:latest
docker tag docker.io/asheeshgoja/edge-tpu-inference-engine:latest docker.<IP Address>.nip.io:5000/edge-tpu-inference-engine:latest
docker push docker.<IP Address>.nip.io:5000/edge-tpu-inference-engine:latest

Streaming API Sidecar

This module is cross-compiled for ARM64 and then containerized as a distroless container using the following two-stage docker build

# Stage 1
# The stage is named "builder" so that stage 2 can copy the binary from it
FROM golang AS builder

ENV USER=appuser
ENV UID=10001
WORKDIR /app

COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./

# Build the binary
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags='-w -s -extldflags "-static"' -a  -o /go/bin/golang_kafka_consumer main.go

# Stage 2
FROM scratch
COPY --from=builder /go/bin/golang_kafka_consumer /go/bin/golang_kafka_consumer

ENTRYPOINT ["/go/bin/golang_kafka_consumer"]

Build the image, tag it and then push it to the private docker registry

docker build -t golang-api-sidecar -f Dockerfile .
docker tag golang-api-sidecar:latest docker.<IP Address>.nip.io:5000/golang-api-sidecar:latest
docker push docker.<IP Address>.nip.io:5000/golang-api-sidecar:latest

Deploy

The inference module and the streaming API sidecar get deployed as co-containers on the same pod. They share a common file mount and communicate over a local TCP/IP link.

---
containers:
  - name: edge-tpu-inference-engine
    image: docker.<IP Address>.nip.io:5000/edge-tpu-inference-engine:latest
    securityContext:
      privileged: true
    ports:
      - containerPort: 9898
    env:
      - name: STREAM_GRP_ID
        value: work-load-B
      - name: SIDECAR_PORT
        value: "9898"
      - name: MODEL_REGISTRY_URL
        value: "https://<IP Address>:30007/quantized"
  - name: golang-api-sidecar
    image: docker.<IP Address>.nip.io:5000/golang-api-sidecar:latest
    ports:
      - containerPort: 9898
    securityContext:
      privileged: true
    env:
      - name: STREAM_GRP_ID
        value: "work-load-A"
      - name: PORT
        value: "9898"
      - name: TOPIC
        value: "shaded-pole-motor-sensor_data"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
nodeSelector:
  tpuAccelerator: "true"

The tpuAccelerator: "true" label ensures that this pod gets scheduled only on TPU accelerated hardware.



Firmware

IoT Gateway

Deployment Target : Things Tier - ESP32 SoC

PlatformIO IDE setup

The IDE used to build the firmware is VSCode with the PlatformIO extension. Install this extension and then open the IDE from the iot-gateway-firmware folder. Set the board, ports, dependencies, and the baud correctly by using the following configuration in the platformio.ini file.

[env:esp32dev]
platform = espressif32
board = esp32dev
framework = arduino
upload_port = /dev/cu.SLAB_USBtoUART
monitor_port = /dev/cu.SLAB_USBtoUART
monitor_speed = 115200
lib_deps =
	knolleary/PubSubClient@^2.8.0
	openenergymonitor/EmonLib@^1.1.0
	bblanchon/ArduinoJson@^6.18.5

TLS Cert

Copy the contents of the server.crt created in the previous section into a static char array and name it reg_svr_pub_key. We will subsequently use this key in the HTTPClient operations.

const char* reg_svr_pub_key= \
"-----BEGIN CERTIFICATE-----\n" \
"MIICSDCCAc6gAwIBAgIUDXRzo8SpZJeJqZmgNP1BpyllvHkwCgYIKoZIzj0EAwIw\n" \
.
.
.
"-----END CERTIFICATE-----\n" ;

Code - TFLM Module

The inference module for the ESP32 MCU is built using the TensorFlow Lite for Microcontrollers C++ library. You need to first download and include the TFLM C++ libraries into your PlatformIO project under the lib folder. Your lib structure should look like this.

pio-lib-structure

Include the TFLM header files in your module

#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/schema/schema_generated.h"
#include "tensorflow/lite/version.h"

Download the latest TFLite file from the Model Registry endpoint (in response to the control message download-model)

int downloadTFLiteModel(uint8_t **tfLiteModel, const char *tfLiteFileName)
{
  char tfliteFileURL[255] = {}; // registry URL + the file name from the control message payload
  snprintf(tfliteFileURL, sizeof(tfliteFileURL), "%s%s", modelRegistry, tfLiteFileName);
  pHTTPClient->begin(tfliteFileURL, reg_svr_pub_key);
  int httpResponseCode = pHTTPClient->GET();
  if (httpResponseCode != 200)
  {
      pHTTPClient->end();
      return -1; // download failed
  }
  int len = 11148; // size in bytes of the quantized model file
  *tfLiteModel = (uint8_t *)malloc(len);
  if (*tfLiteModel == NULL)
  {
      pHTTPClient->end();
      return -1; // out of memory
  }
  pHTTPClient->getStream().readBytes(*tfLiteModel, len);
  pHTTPClient->end();
  return len;
}

The tfLiteModel array is the TFLite quantized model and can now be used to perform the inferences. Map and load the model and its tensors

tflite::ErrorReporter *error_reporter;
const tflite::Model *model;
tflite::MicroInterpreter *interpreter;
TfLiteTensor *input;
TfLiteTensor *output;
int inference_count;
HTTPClient *pHTTPClient;
char modelRegistry[255];

model = tflite::GetModel(tfLiteModel);
static tflite::AllOpsResolver resolver;
static tflite::MicroInterpreter static_interpreter(model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
interpreter = &static_interpreter;

// Allocate memory from the tensor_arena for the model's tensors.
TfLiteStatus allocate_status = interpreter->AllocateTensors();
// Obtain pointers to the model's input and output tensors.
input = interpreter->input(0);
output = interpreter->output(0);

The module is now ready to make inferences

input->data.f[0] = current;
input->data.f[1] = temperature;
input->data.f[2] = vibration;
input->data.f[3] = sound;

interpreter->Invoke();

return output->data.f[0]; // the inference value used to determine whether the motor is faulty

Code - MQTT module

Add the PubSubClient@^2.8.0 library to your project. Then include the following header files

#include <PubSubClient.h>
#include <ArduinoJson.h>

Set up a callback handler for download-model messages on the control topic. Call the TFLM_Module::setNewModelFileName to download and map the new TFLite model.

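char buf[255] = ""; // holds the MQTT message payload
String jsonMessage(buf);
StaticJsonDocument<255> jsonBuffer;
DeserializationError error = deserializeJson(jsonBuffer, jsonMessage);
if (error)
{
    Serial.printf("Control message parse failed: %s\n", error.c_str());
    return;
}
const char *command = jsonBuffer["command"];
const char *cmd_payload = jsonBuffer["payload"];
// a missing field yields a null pointer, so check before calling strcmp
if (command != NULL && cmd_payload != NULL && strcmp(command, "download-model") == 0)
{
    TFLM_Module::setNewModelFileName(cmd_payload);
}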

Code - FFT DSP Filter module

Get the Arduino FFT library and copy it to the lib folder. Add the following header file to your module and set the sample size and frequency to these values

#include "arduinoFFT.h"
const double signalFrequency = 1000;
const double samplingFrequency = 13095; // find using the function getMaxSamplingFrequency

Find the max sampling frequency of your device by using this code

long newTime = micros();
for (int i = 0; i < 1000000; i++)
{
    analogRead(ANY_ANALOG_PIN);
}
float t = (micros() - newTime) / 1000000.0;
long samplingFrequency =  (1.0 / t) * 1000000;

Record the sensor data and apply the FFT transform to get the major peak

ptrFFT->Windowing(vReal, MAX_SAMPLE_SIZE, FFT_WIN_TYP_HAMMING, FFT_FORWARD); /* Weigh data */
ptrFFT->Compute(vReal, vImag, MAX_SAMPLE_SIZE, FFT_FORWARD); /* Compute FFT */
ptrFFT->ComplexToMagnitude(vReal, vImag, MAX_SAMPLE_SIZE); /* Compute magnitudes */
majorPeakParabola(vReal);
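
For readers who want to check the idea off-device, the window, transform, magnitude and peak steps can be reproduced on a desktop. The sketch below is a plain-Python illustration using a direct DFT, which is slow but dependency-free. It is not the arduinoFFT implementation, and it only assumes that `majorPeakParabola` refines the strongest bin with a parabolic fit of this general kind. The `major_peak` function and the 128-sample test tone are hypothetical.

```python
import cmath
import math


def major_peak(samples, sampling_frequency):
    """Return the dominant frequency (Hz) of `samples` using a windowed DFT."""
    n = len(samples)
    # Hamming window reduces spectral leakage
    windowed = [s * (0.54 - 0.46 * math.cos(2 * math.pi * i / (n - 1)))
                for i, s in enumerate(samples)]
    # Magnitudes for bins 0 .. n/2 (the upper half mirrors the lower for real input)
    mags = []
    for k in range(n // 2 + 1):
        acc = sum(windowed[i] * cmath.exp(-2j * math.pi * k * i / n) for i in range(n))
        mags.append(abs(acc))
    # Strongest bin, ignoring DC (bin 0) and the last bin so both neighbours exist
    k = max(range(1, n // 2), key=lambda b: mags[b])
    # Parabolic interpolation around the peak refines the estimate between bins
    a, b, c = mags[k - 1], mags[k], mags[k + 1]
    denom = a - 2 * b + c
    delta = 0.5 * (a - c) / denom if denom != 0 else 0.0
    return (k + delta) * sampling_frequency / n


fs = 13095.0  # sampling frequency from the snippet above
f0 = 1000.0   # test tone matching signalFrequency
signal = [math.sin(2 * math.pi * f0 * i / fs) for i in range(128)]
peak = major_peak(signal, fs)
print(round(peak))
```

With 128 samples at this sampling frequency each bin is roughly 102 Hz wide, so the interpolation step is what brings the estimate closer to the true tone than the bin centre alone.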

Code - Activation module

Use the chip ID of the ESP32S SoC as the device key.

uint32_t Activation_Module::getChipID()
{
  uint32_t c_id = 0;
  for (int i = 0; i < 17; i = i + 8)
  {
    c_id |= ((ESP.getEfuseMac() >> (40 - i)) & 0xff) << i;
  }
  return c_id;
}
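
To see what this loop computes, here is the same bit manipulation in Python. The loop runs for i = 0, 8 and 16, so it takes the three bytes at bits 24 to 47 of the 48-bit eFuse MAC and packs them, in reversed byte order, into a 24-bit integer. The function name `chip_id_from_mac` and the example MAC value are hypothetical.

```python
def chip_id_from_mac(efuse_mac):
    """Mirror of getChipID(): pack three bytes of the 48-bit eFuse MAC into one integer."""
    c_id = 0
    for i in range(0, 17, 8):  # i = 0, 8, 16
        c_id |= ((efuse_mac >> (40 - i)) & 0xFF) << i
    return c_id


example_mac = 0x112233445566  # hypothetical value of ESP.getEfuseMac()
print(hex(chip_id_from_mac(example_mac)))  # 0x332211
```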

Pass this key to the REST Device Registry endpoint https://<IP Address>:30006/confirmActivation and confirm activation

int Activation_Module::isDeviceActivated()
{
    uint32_t chipID = getChipID();
    pHTTPClient->begin(activationServer.c_str(), reg_svr_pub_key);
    pHTTPClient->addHeader("Content-Type", "application/x-www-form-urlencoded");
    char payload[255] = {};
    snprintf(payload, sizeof(payload), "%s=%lu", "device_id", (unsigned long)chipID); // chipID is unsigned
    int httpResponseCode = pHTTPClient->POST(payload);
    if (httpResponseCode == 200)
    {
        String activationCode = pHTTPClient->getString();
        pHTTPClient->end();
        return strcmp(activationCode.c_str() , "TRUE") == 0 ? 1 : 0;
    }
    else
    {
        pHTTPClient->end();
        return 0;
    }
    return 0;
}

Code - Servo module

Get the Esp32 Servo library and copy it to the lib folder. Add the header file to your module and use the Servo class to control the motor.

Servo_Module::Servo_Module(int pin)
{
    pHydraulic_valve_servo = new Servo();
    pHydraulic_valve_servo->attach(pin);
}
void Servo_Module::turnValveOn()
{
    for (int posDegrees = 0; posDegrees <= 180; posDegrees++)
    {
        pHydraulic_valve_servo->write(posDegrees);
        delay(20);
    }
}
void Servo_Module::turnValveOff()
{
    for (int posDegrees = 180; posDegrees >= 0; posDegrees--)
    {
        pHydraulic_valve_servo->write(posDegrees);
        delay(20);
    }
}

Code - Main Super Loop

Include all the libs and custom modules in the main file

#include "tflm_module.h"
#include "motor_sensors.h"
#include "mqtt_module.h"
#include "fft_module.h"
#include "servo_module.h"
#include "aggregation.h"
#include "activation_module.h"

//open source libs
HTTPClient http_client_activation , http_client_model;
WiFiClient espClient;

//custom modules
TFLM_Module tflm_module;
Sensors_Module sensors_module;
Mqtt_Module mqtt_module;
Fft_Module fft_module;
Servo_Module hydraulicValveController(SERVO_PIN);
Aggregation_Module aggregationModule;
Activation_Module activationModule;

Initialize the modules and confirm device activation

void setup()
{
  Serial.begin(115200);
  pinMode(LED_BUILTIN, OUTPUT);
  setupWifi();
  activationModule.init(&http_client_activation, deviceRegistry);
  chipId = activationModule.getChipID();
  isValidDevice = activationModule.isDeviceActivated();
  Serial.printf("Device Activation code from server = %d\n", isValidDevice);
  if (0 == isValidDevice)
  {
    Serial.printf("Device Activation failed!!\n");
  }
  tflm_module.init(&http_client_model, modelRegistry);
  sensors_module.init(TMP_PIN, CURRENT_PIN, VIBRATION_PIN, SOUND_PIN);
  mqtt_module.init(espClient, broker, broker_port, commandTopic, dataTopic, String(chipId));
  hydraulicValveController.turnValveOff();
}

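In the super loop, collect one window of sensor samples at a fixed sampling period, apply the FFT filter, publish the filtered values over MQTT, and then run the TFLM inference. If the inference value exceeds the threshold set in the code (8.0), turn on the servo controlling the hydraulic valve.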

void loop()
{
  if (isValidDevice == 0)
  {
    Serial.printf("Device not registered, retrying ... !\n");
    delay(3000);
    isValidDevice = activationModule.isDeviceActivated();
    return;
  }

  mqtt_module.reconnect();
  float tempVal, vibrationVal, soundVal, currentVal;
  tempVal = vibrationVal = soundVal = currentVal = 0;
  int maxSampleSize = fft_module.getMaxSampleSize();
  int samplingPeriod = fft_module.getSamplingPeriod();
  long sampleStartMicros = 0;

  for (int i = 0; i < maxSampleSize; i++)
  {
    sampleStartMicros = micros();
    aggregationModule.aggregateData(sensors_module, currentVal, tempVal, vibrationVal, soundVal);
    fft_module.recordSample(currentVal, tempVal, vibrationVal, soundVal);
    while ((micros() - sampleStartMicros) < samplingPeriod)
    { /* spin */
    }
  }

  float temp_fft, current_fft, vibr_fft, sound_fft;
  temp_fft = current_fft = vibr_fft = sound_fft = 0;
  fft_module.perform_fft(&current_fft, &temp_fft, &vibr_fft, &sound_fft);
  char sensor_data_fft[255];
  snprintf(sensor_data_fft, 255, "{\"deviceID\": \"%d\", \"current\": %.2f, \"temperature\": %.2f, \"vibration\": %.2f, \"sound\": %.2f, \"fft_data\": \"true\"}",
           chipId, current_fft, temp_fft, vibr_fft, sound_fft);
  mqtt_module.publish(sensor_data_fft);
  fft_module.resetSampleCounter();
  float inference_val = tflm_module.predict(current_fft, temp_fft, vibr_fft, sound_fft);
  if (inference_val > 8.0)
  {
    hydraulicValveController.turnValveOn();
  }
}
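
The spin-wait in the sampling loop paces the samples by comparing elapsed microseconds against the sampling period. The sketch below illustrates that timing arithmetic on its own. The helper names are hypothetical, and the assumption that the period is derived as one million divided by the sampling frequency is mine; the actual `fft_module` may compute it differently.

```cpp
#include <cassert>
#include <cstdint>

// Sampling period in microseconds for a sampling frequency in Hz.
// Assumption: the period is 1e6 / frequency, rounded to the nearest microsecond.
uint32_t samplingPeriodMicros(double samplingFrequencyHz) {
    assert(samplingFrequencyHz > 0.0);
    return static_cast<uint32_t>(1000000.0 / samplingFrequencyHz + 0.5);
}

// Microseconds elapsed between two readings of a 32-bit microsecond counter.
// Unsigned subtraction wraps modulo 2^32, so the result stays correct even
// when the counter overflows between `start` and `now`.
uint32_t elapsedMicros(uint32_t now, uint32_t start) {
    return now - start;
}

// True while the loop should keep spinning before taking the next sample.
bool stillWaiting(uint32_t now, uint32_t start, uint32_t periodMicros) {
    return elapsedMicros(now, start) < periodMicros;
}
```

A 32-bit microsecond counter wraps after about 71 minutes, so doing the subtraction in unsigned arithmetic keeps the pacing stable on a device that runs continuously.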

Build and Flash the MCU

Connect the ESP32 devkit

Use the Upload and Build option to flash the firmware. You may need to press the reset button to put the device into flash mode. After flashing completes and the device restarts, this is what I see on my serial monitor:

{"deviceID": "14339204", "current": 567.15, "temperature": 28.30, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 549.70, "temperature": 28.30, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 541.80, "temperature": 28.30, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 549.33, "temperature": 27.91, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 529.03, "temperature": 28.40, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 508.29, "temperature": 29.77, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 1269.85, "temperature": 77.81, "vibration": 850.28, "sound": 635.15, "fft_data": "true"}
Publishing data success
TFLM inference val : 0.53, data : Current: 1269.85, Temp: 77.81, Vibration: 850.28, Sound: 635.15
Publishing data success

Hardware

Sensor placement

The IoT sensors sense the vibration, sound, temperature, and current on the motor. These sensors are placed on and around the motor.

iot-analog

  • The current sensor loops around the power cable and senses the current drawn by the motor. This signal is then sent through a voltage divider to the MCU.
  • The temperature sensor is placed on the motor in direct contact with the metal frame.
  • The vibration sensor is placed close to the motor shaft.
  • The sound sensor is also placed next to the motor shaft.

Components specs

Component          Part Name
MCU                ESP32S SoC
Current Sensor     SCT-013
Sound Sensor       MAX4465
Vibration Sensor   Ceramic Piezo Vibration Sensor
Induction Motor    Shaded Pole C-Frame Motor

Electrical circuit setup

Wire the sensors to the ESP32 GPIO analog ports

circuit_diagram

Burden resistor calculations

  1. The SCT-013 CT current sensor has a range of 0 - 100 A (RMS).
  2. Convert the maximum RMS current to its peak value.
  3. Primary peak current = Irms × √2 = 100 A × 1.414 = 141.4 A
  4. The SCT-013 CT has 2200 secondary turns.
  5. Secondary peak current = Primary peak current / 2200 = 0.064 A
  6. The ESP32 reference voltage is 3.3 V. The signal is biased at half of this voltage, so the available peak swing is 3.3 / 2 = 1.65 V.
  7. Burden R = (3.3 / 2) / 0.064 = 25.78 Ω
  8. R1 and R2 are kept at 10 kΩ for low quiescent current consumption.
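
The calculation above can be reproduced in a few lines of code. This is a minimal sketch with hypothetical function names. It takes the turns count and reference voltage from the list as inputs and does not round the intermediate secondary current.

```cpp
#include <cassert>
#include <cmath>

// Peak secondary current of a current transformer, in amperes.
double secondaryPeakAmps(double primaryRmsAmps, double secondaryTurns) {
    assert(secondaryTurns > 0.0);
    const double primaryPeak = primaryRmsAmps * std::sqrt(2.0);
    return primaryPeak / secondaryTurns;
}

// Burden resistor (ohms) that maps the peak secondary current onto half of
// the ADC reference voltage, because the signal is biased at mid-rail.
double burdenResistorOhms(double primaryRmsAmps, double secondaryTurns,
                          double vRef) {
    return (vRef / 2.0) / secondaryPeakAmps(primaryRmsAmps, secondaryTurns);
}
```

Calling `burdenResistorOhms(100.0, 2200.0, 3.3)` gives about 25.67 Ω, slightly below the 25.78 Ω obtained when the secondary current is first rounded to 0.064 A. Either way, the nearest standard resistor value at or below the result keeps the peak signal within the ADC range.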

Source code

The source code for all the modules is in this Git repo.

Conclusion

This post concludes the three-part series on how to architect, build, and deploy AIoT applications. I hope this series equips you with the principles, patterns, best practices, and tools necessary to venture into the universe of Edge AI and build "real-world" AIoT applications.

See ya on the Edge!

marvin