***********************
Flink Click Event Count
***********************
.. contents::
:depth: 2
This example, depicted in :ref:`flink_anatomy`, is based on the `Flink Operations Playground `_. It consists of a
**Flink Cluster**, a **Kafka Cluster**, and **data producers**.
In this example **you will learn how to**:
- Configure a Flink Cluster and a Kafka Cluster.
- Scale the number of producers (scale in and scale out).
- Configure the network between producers and the Flink and Kafka clusters.
- Manage and run Flink Jobs and data producers.
- Check end-to-end execution.
.. _flink_anatomy:
.. figure:: flink_click_event_count/flink_anatomy.png
:width: 100%
:align: center
Figure 1: Anatomy
Experiment Artifacts
====================
.. code-block:: bash
$ cd ~/git/
$ git clone https://gitlab.inria.fr/E2Clab/examples/flink-kafka
$ cd flink-kafka/
$ ls
artifacts # contains Flink job (ClickCountJob.jar) and lib/
getting_started # contains layers_services.yaml, network.yaml, and workflow.yaml
Defining the Experimental Environment
=====================================
Layers & Services Configuration
-------------------------------
This configuration file defines the **layers** and **services** that compose this example. The **Flink Cluster** (three nodes,
``quantity: 3``) is composed of one *Job Manager* and two *Task Managers*. The **Kafka Cluster** (a single node, ``quantity: 1``) consists of a
Zookeeper server and a Kafka broker. Lastly, we have the data producers (two nodes in total, obtained with ``quantity: 1`` and ``repeat: 1``).
Note that, since we defined ``monitoring: type: dstat``, all services tagged with ``roles: [monitoring]`` are monitored with ``dstat``
during the execution of the experiments.
.. literalinclude:: flink_click_event_count/layers_services.yaml
:language: yaml
:linenos:
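As a rough orientation, a ``layers_services.yaml`` of this shape may look like the sketch below. This is an illustrative assumption only: the site, cluster, service names, and attribute spellings are guesses and may differ from the actual file included above.

```yaml
# Illustrative sketch only -- names and attributes are assumptions,
# not the exact layers_services.yaml shipped with this example.
environment:
  job_name: flink-kafka
  site: rennes            # assumed Grid'5000 site
  monitoring:
    type: dstat           # monitor services tagged with roles: [monitoring]
layers:
- name: cloud
  services:
  - name: flink
    quantity: 3           # 1 Job Manager + 2 Task Managers
    roles: [monitoring]
  - name: kafka
    quantity: 1           # Zookeeper server + Kafka broker on one node
    roles: [monitoring]
- name: edge
  services:
  - name: producer
    quantity: 1
    repeat: 1             # duplicates the service: two producer nodes in total
    roles: [monitoring]
```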
**User-Defined Service:** add the following services in the ``~/git/e2clab/e2clab/services/`` directory.
- `Flink `_
- `Kafka `_
Network Configuration
---------------------
The file below presents the network configuration between the ``cloud`` and ``edge`` infrastructures: ``delay: 5ms``, ``loss: 2%``, and ``rate: 1gbit``.
.. literalinclude:: flink_click_event_count/network.yaml
:language: yaml
:linenos:
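Based on the values quoted above, the network rules may be sketched as follows; the attribute names (``src``, ``dst``, ``symmetric``) are assumptions and may not match the actual ``network.yaml`` included above.

```yaml
# Illustrative sketch only -- the authoritative schema is in network.yaml.
networks:
- src: cloud              # traffic from the cloud layer...
  dst: edge               # ...to the edge layer
  delay: 5ms              # added latency
  loss: 2%                # packet loss
  rate: 1gbit             # bandwidth cap
  symmetric: true         # assumed: apply the same rules in both directions
```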
Workflow Configuration
----------------------
This configuration file presents the application workflow configuration.
- Regarding the *Flink Job Manager* ``cloud.flink.*.job_manager``: in ``prepare`` we copy the *Flink Job* from the local machine to the remote machine; in ``launch`` we copy it into the container and start the *Flink Job*.
- Regarding the *producers* ``edge.producer.*``: in ``prepare`` we copy the *producer Java application* and its *libraries*; in ``launch`` we start four Java processes. We use the ``depends_on`` attribute to interconnect all *producers* with the *Kafka server*. In this case they are grouped in *round robin* ``grouping: "round_robin"`` and, as shown in ``shell``, the *producers* are configured to send data to *Kafka* via ``{{ kafka.url }}``.
.. literalinclude:: flink_click_event_count/workflow.yaml
:language: yaml
:linenos:
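The two bullet points above can be sketched as a workflow of this rough shape. Task names, paths, the ``service_selector``/``prefix`` attributes, and the shell commands below are illustrative assumptions, not the actual ``workflow.yaml`` included above.

```yaml
# Illustrative sketch only -- paths, attribute names, and commands are
# assumptions; the authoritative file is workflow.yaml, included above.
- hosts: cloud.flink.*.job_manager
  prepare:
  - copy:                       # copy the Flink job to the remote machine
      src: "artifacts/ClickCountJob.jar"
      dest: "/tmp/"
  launch:
  - shell: docker cp /tmp/ClickCountJob.jar jobmanager:/opt/ClickCountJob.jar
  - shell: docker exec jobmanager flink run -d /opt/ClickCountJob.jar
- hosts: edge.producer.*
  depends_on:
  - service_selector: cloud.kafka.*
    grouping: "round_robin"     # each producer is wired to a Kafka endpoint
    prefix: kafka               # exposes {{ kafka.url }} to the shell below
  prepare:
  - copy:                       # copy the producer application and its libraries
      src: "artifacts/lib"
      dest: "/tmp/"
  launch:                       # start the Java producer processes
  - shell: java -cp "/tmp/lib/*" ClickEventProducer --kafka {{ kafka.url }} &
```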
Running & Verifying Experiment Execution
========================================
Find below the commands to deploy this application and check its execution. Once deployed, you can check in the **Flink WebUI**
the configurations that we defined in the ``layers_services.yaml`` and ``workflow.yaml`` configuration files, such as the
``number of Task Managers``, ``total task slots``, ``Job Manager and Task Manager heap size``, ``job parallelism``, etc.
(see :ref:`flink_webui`, :ref:`flink_configs`, and :ref:`flink_running_job`). Besides, you can also check the records
written to the **Kafka Topics** (``input`` and ``output`` topics).
- Running multiple experiments: in this example we run four experiments in total ``--repeat 3``
  (the initial run plus three repetitions), each with a duration of four minutes ``--duration 240``.
.. code-block:: bash
$ e2clab deploy --repeat 3 --duration 240
~/git/flink-kafka/getting_started/
~/git/flink-kafka/artifacts/
- Verifying experiment execution.
.. code-block:: bash
# Discover hosts in /layers_services-validate.yaml file
$ cat layers_services-validate.yaml
# Tunnel to Flink Job Manager (cloud.flink.1.job_manager.1)
$ ssh -NL 8081:localhost:8081 .rennes.grid5000.fr
# SSH to Kafka Cluster (cloud.kafka.leader.1)
$ ssh .rennes.grid5000.fr
# Kafka input topic
$ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic input
{"timestamp":"01-01-1970 12:04:32:235","page":"/news"}
{"timestamp":"01-01-1970 12:04:33:480","page":"/help"}
{"timestamp":"01-01-1970 12:04:33:255","page":"/about"}
{"timestamp":"01-01-1970 12:04:32:565","page":"/jobs"}
{"timestamp":"01-01-1970 12:04:32:745","page":"/jobs"}
{"timestamp":"01-01-1970 12:04:32:565","page":"/about"}
# Kafka output topic
$ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic output
{"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/index","count":7884}
{"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/about","count":7882}
{"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/jobs","count":7882}
{"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/news","count":7880}
{"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/shop","count":7884}
{"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/help","count":7885}
- In :ref:`flink_webui` you can check the two *Task Managers*, each configured with *16 task slots* ``taskmanager.numberOfTaskSlots: 16``, resulting in ``Total Task Slots 32``.
.. _flink_webui:
.. figure:: flink_click_event_count/flink_webui.png
:width: 100%
:align: center
Figure 2: Flink WebUI
- In :ref:`flink_configs` you can check: ``jobmanager.heap.size: 2048m``, ``taskmanager.heap.size: 768m``, and ``taskmanager.numberOfTaskSlots: 16``.
.. _flink_configs:
.. figure:: flink_click_event_count/flink_configs.png
:width: 100%
:align: center
Figure 3: Flink configurations
- In :ref:`flink_running_job` you can check: the running job with ``parallelism: 32``.
.. _flink_running_job:
.. figure:: flink_click_event_count/flink_running_job.png
:width: 100%
:align: center
Figure 4: Flink Job
Deployment Validation & Experiment Results
==========================================
Find below the files generated after the execution of each experiment. They consist of **validation files**
(``layers_services-validate.yaml``, ``network-validate/``, and ``workflow-validate.out``) and **monitoring data** (``monitoring-data/``).
Note that a new directory ``yyyymmdd-hhmmss/`` is generated for each experiment.
.. code-block:: bash
$ ls ~/git/flink-kafka/getting_started//
monitoring-data/ # Monitoring data for each physical machine
layers_services-validate.yaml # Mapping between layers and services with physical machines
network-validate/ # Network configuration for each physical machine
workflow-validate.out # Commands used to deploy application (prepare, launch, and finalize)
.. note::
Providing a **systematic methodology to define the experimental environment**
(``layers_services.yaml``, ``network.yaml``, and ``workflow.yaml``) and
providing **access to the artifacts** supports the experiment **Repeatability**,
**Replicability**, and **Reproducibility**, see `ACM Digital Library Terminology
`_.