Streamlining Log Management with Fluvio and Logging Operator

Sandor Guba

Thursday, April 20th, 2023

5 min read

Fluvio is a data-streaming platform that allows you to transform your data stream in real-time using WASM plugins. Integrating Logging operator with Fluvio gives you a flexible data collection pipeline that can transform the data with these plugins. This post shows you how to do that.

Introduction to Fluvio

Fluvio is gaining popularity as a high-performance, scalable, and fault-tolerant real-time streaming platform. It is an open-source software framework that allows developers to build, deploy, and manage streaming data applications. In addition to being built on cloud-native principles and technologies, it has low resource footprint and latency, and provides similar guarantees to your data like other streaming platforms (for example, Apache Kafka).

Using Fluvio with Logging operator
Using Fluvio with Logging operator

Another interesting Fluvio feature is SmartModules. SmartModules exposes programmable data streaming functions using WebAssembly, which allows you to manipulate your data stream in real-time. The data stays within the Fluvio cluster, you don’t need to access any external services (like Lambda or Functions). Fluvio provides client libraries for several popular programming languages.


Using Fluvio with Logging operator has several benefits when logging and monitoring applications:

  • Immutable storage: Fluvio persists your data to avoid data loss and ensures that it cannot be altered.
  • Scalability: Fluvio’s distributed architecture allows it to scale horizontally to accommodate large amounts of data.
  • Real-time Processing: With Fluvio, you can process log data in real-time, identifying and addressing issues faster.
  • Centralized Log Management: By using Logging operator together with Fluvio, you can centralize log data from multiple sources, and aggregate them for easier analysis.
  • High Availability: Fluvio provides a highly available streaming platform, which helps ensure that log data is not lost in the event of system failures.
  • Customizable Logging: Using Logging operator with Fluvio gives you a flexible platform to customize your log processing and analysis to your specific logging requirements.


In this post, you’ll learn how to create a simple logging pipeline for your Kubernetes cluster to send your log data to Fluvio. The pipeline will do the following steps:

  1. Fluent Bit collects the logs from the cluster and sends them to a syslog-ng instance. You can use Logging operator to install and configure Fluent Bit and syslog-ng.
  2. Syslog-ng sends the incoming log messages to an MQTT broker. It would be better to send the logs directly to Fluvio’s syslog collector, unfortuntely that plugin is not ready for production use yet.
  3. Fluvio fetches the data from the MQTT broker using the MQTT connector. After that, Fluvio processes the data, and forwards it to one or more supported destination.

To implement this architecture on your Kubernetes cluster, you’ll need to:

  1. Install Fluvio
  2. Install an MQTT broker and an MQTT connector fro Fluvio
  3. Install and configure the Logging operator to collect the logs from the cluster

Install Fluvio

The Fluvio CLI (command-line interface) is an all-in-one tool for setting up, interacting, and managing with Fluvio clusters.

1. Install the Fluvio CLI by running the following command:

curl -fsS https://packages.fluvio.io/v1/install.sh | bash

2. Add ~/.fluvio/bin/ to your PATH variable.

3. Set your KUBECONFIG context to the cluster.

4. Start the Fluvio cluster by running the following command. (This can take a few minutes.)

fluvio cluster start

5. Verify the cluster. You can check the Fluvio cluster by checking the version and status with the following command: fluvio version

The output should look something like this:

Release Channel      : stable 
Fluvio CLI           : 0.10.2 
Fluvio CLI SHA256    : 61808537aa82f7dceb24cfa5cc112cbb98fe507688ebd317afae2fe44f2a0f5e 
Fluvio channel frontend SHA256 : b9a07efe2b251d77dd31d65639b1010b03fa1dd34524d957bcc2e5872f80ee65 
Fluvio Platform      : 0.10.2 (local) 
Git Commit           : 75be9c2003dbc22d3e8c2da20cb73841725b410a 
OS Details           : Darwin 13.1 (kernel 22.2.0) 

=== Plugin Versions === 

Fluvio Runner (fluvio-run)     : 0.0.0 
Infinyon Cloud CLI (fluvio-cloud) : 0.2.5

6. Configure port forwarding to the controller and the stream processor unit Fluvio services.

kubectl port-forward service/fluvio-sc-public 30003:9003
kubectl port-forward service/fluvio-sc-internal 30004:9005

7. Create a new topic called `log-transformer`:

fluvio topic create log-transformer

The output should be similar to:

topic "log-transformer" created

8. Send a test message:

echo "msg1" | fluvio produce log-transformer

9. Consume the test message from the topic:

fluvio consume log-transformer -B -d

The output should be similar to:

Consuming records from 'log-transformer' starting from the beginning of log 


Install MQTT broker

The MQTT broker will act as a mediator between Logging operator and Fluvio: Logging operator sends the messages to the MQTT broker. This example uses the [Eclipse Mosquitto](https://mosquitto.org) MQTT broker.

Install the Mosquitto MQTT broker by running the following commands:

helm repo add k8s-at-home https://k8s-at-home.com/charts/
helm repo update
helm install mosquitto k8s-at-home/mosquitto

Create a Fluvio MQTT Connector

Create an MQTT Connector, so Fluvio can fetch and process the messages from the MQTT broker.

1. Clone the fluvio-connectors repository and create an MQTT connector.

git clone https://github.com/infinyon/fluvio-connectors.git
cd fluvio-connectors

2. Create a YAML file called mqtt-connector.yaml for the log-transformer topic.

cat > mqtt-connector.yaml <<EOF
version: latest
name: my-mqtt-new
type: mqtt-source
topic: log-transformer
direction: source
create-topic: true
  mqtt_topic: "test/demo"
  payload_output_type: json
  MQTT_URL: mqtt://mosquitto:1883

3. Build the connector module and apply the mqtt-connector.yaml file:

cargo run --bin connector-run -- apply --config mqtt-connector.yaml

Wait a few minutes until the build is finished.

Install Logging operator

Install the Logging operator to collect the logs from your cluster and send them to the MQTT broker.

1. The easiest way is to install Logging operator with Helm.

helm repo add kube-logging https://kube-logging.github.io/helm-charts
helm repo update
helm upgrade --install --wait --create-namespace --namespace logging logging-operator kube-logging

2. Create a Logging resource.

kubectl apply -f - <<EOF
apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
  name: fluvio-test
  controlNamespace: default
  enableRecreateWorkloadOnImmutableFieldChange: true
    bufferStorage: {}
        path: ""
    bufferVolumeImage: {}
    filterKubernetes: {}
    image: {}
      storage.type: filesystem
        path: ""
    resources: {}
    updateStrategy: {}
    jsonKeyDelim: "~"

3. Create a SyslogNGOutput resource to instruct Logging operator to send the incoming messages to MQTT.

kubectl apply -f - <<EOF
apiVersion: logging.banzaicloud.io/v1beta1
kind: SyslogNGOutput
    name: mqtt
  namespace: default
    address: tcp://mosquitto:1883
    template: |
      $(format-json --subkeys json~ --key-delimiter ~)
    topic: test/demo

4. Create a SyslogNGFlow resource.

kubectl apply -f - <<EOF
apiVersion: logging.banzaicloud.io/v1beta1
kind: SyslogNGFlow
  name: testflow
  namespace: default
  - mqtt
  match: {}

Check the logs in Fluvio

Now that every piece of the logging pipeline is in place, you can consume messages from Fluvio again. Run:

fluvio consume log-transformer -B -d

The log messages of your cluster should appear in the topic.


Fluvio is an open source cloud native distributed streaming platform that provides similar assurances to Apache Kafka, but requires much lower resources. Its low footprint and the possibility to process data streams real time using WASM plugins makes it especially suitable for use in logging pipelines. This post has shown you how to build a simple logging pipeline using Fluvio and the Logging operator. In the future we hope that Fluvio will be able to receive data directly from Logging operator, without having to use an intermediary broker.