******************************
Smart Surveillance Application
******************************
.. contents::
:depth: 2
This example is depicted in :ref:`cctv_anatomy` and was provided by `Pedro Silva `_. The example consists of **data producers** at the Edge (8 nodes, each one with 40 cameras);
**gateways** at the Fog (4 nodes, each one with a java application to process images); and a **Flink Cluster** (1 node) and a
**Kafka Cluster** (1 node) at the Cloud. The Flink Cluster is composed of one *Job Manager* and one *Task Manager*. The Kafka Cluster
consists of a Zookeeper server and a Kafka Broker. Lastly, on a single node we have the **metrics collector**, a Java application
that collects metrics such as the end-to-end latency.
In this example **you will learn how to**:
- Configure a Flink Cluster and a Kafka Cluster.
- Deploy 4 gateways and 320 data producers and interconnect them in round-robin.
- Define network constraints (delay, loss, and bandwidth) between the Edge, Fog, and Cloud infrastructures.
- Manage and run Flink Job, Kafka broker, gateways, and data producers.
- Check end-to-end execution.
.. _cctv_anatomy:
.. figure:: cctv/cctv_anatomy.png
:width: 100%
:align: center
Figure 1: Anatomy
Experiment Artifacts
====================
Following the instructions below, you can access the experiment artifacts required
to run this application. These artifacts include applications, libraries, the dataset, and
configuration files.
.. code-block:: bash
# G5K frontend
$ ssh rennes.grid5000.fr
$ tmux
$ cd git/
$ git clone https://gitlab.inria.fr/E2Clab/examples/cctv
$ cd cctv/
$ ls
artifacts/ # contains dataset/, libs/, libs-opencv/, and Flink job
cluster-2020/ # contains the code used to generate the charts
getting_started/ # contains layers_services.yaml, network.yaml, and workflow.yaml
Defining the Experimental Environment
=====================================
Layers & Services Configuration
-------------------------------
This configuration file presents the **layers** and **services** that compose the Smart Surveillance example. The **Flink Cluster**
(single node, ``quantity: 1``) is composed of one *Job Manager* and one *Task Manager*. The **Kafka Cluster** (single node, ``quantity: 1``)
consists of a Zookeeper server and a Kafka Broker. Lastly, we have the **gateways** (4 nodes: ``quantity: 1`` and ``repeat: 3``) and
the **data producers** (8 nodes: ``quantity: 1`` and ``repeat: 7``). Note that all services will be monitored (``roles: [monitoring]``)
during the execution of experiments if you define a monitoring service (``monitoring: type: dstat``).
.. literalinclude:: cctv/layers_services.yaml
:language: yaml
:linenos:
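For reference, the shape of such a configuration can be sketched as below. This is an illustrative sketch only: the exact keys, service names, and cluster settings may differ from the real ``layers_services.yaml`` shipped with the artifacts.

```yaml
# Illustrative sketch; see the layers_services.yaml included above for the real file.
environment:
  site: rennes
  cluster: parasilo
layers:
- name: cloud
  services:
  - name: flink
    quantity: 1          # one node hosting the Job Manager and the Task Manager
    roles: [monitoring]  # monitored by dstat during the experiment
  - name: kafka
    quantity: 1          # one node hosting Zookeeper and the Kafka Broker
    roles: [monitoring]
- name: fog
  services:
  - name: mosquitto
    quantity: 1
    repeat: 3            # 1 + 3 repeats = 4 gateway nodes
    roles: [monitoring]
- name: edge
  services:
  - name: producer
    quantity: 1
    repeat: 7            # 1 + 7 repeats = 8 producer nodes (40 producers each)
    roles: [monitoring]
monitoring:
  type: dstat
```

The ``quantity``/``repeat`` pair is what scales the deployment: one service definition is written once and then replicated across nodes.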
Network Configuration
---------------------
The file below presents the network configuration between the ``cloud``, ``fog``, and
``edge`` infrastructures. Between ``cloud`` and ``fog`` we have the following
configuration: ``delay: 5ms, loss: 2%, rate: 1gbit``; between the ``edge`` and
``fog`` we emulate a 4G LTE network: ``delay: 50ms, loss: 5%, rate: 150mbit``.
.. literalinclude:: cctv/network.yaml
:language: yaml
:linenos:
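The two links described above can be sketched as follows. Treat this as an illustrative fragment rather than the exact file; key names follow the E2Clab network syntax described in the paragraph above.

```yaml
# Illustrative sketch; see the network.yaml included above for the real file.
networks:
- src: cloud
  dst: fog
  delay: 5ms
  loss: 2%
  rate: 1gbit
- src: edge
  dst: fog
  delay: 50ms
  loss: 5%
  rate: 150mbit   # ~150 Mbit/s, a typical 4G LTE Cat 4 peak rate
```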
Workflow Configuration
----------------------
This configuration file presents the application workflow. It is explained below in the following order: ``prepare``,
``launch``, and ``finalize``.
``prepare``
- Regarding *Kafka* ``cloud.kafka.*.leader``, we are creating two topics ``in-uni-data`` and ``out-uni-data`` with ``32 partitions``.
- Regarding *Flink Job Manager* ``cloud.flink.*.job_manager``, we are copying from the local machine to the remote machine the *Flink Job*.
- Regarding *gateways* ``fog.mosquitto.*``, we are copying its *libraries*.
- Regarding *producers* ``edge.producer.*``, we are copying the *producer java application* and its *libraries*.
- Regarding *metrics collector* ``experiment_manager.metrics_collector.*``, we are copying its *libraries*.
``launch``
- Regarding *Flink Job Manager* ``cloud.flink.*.job_manager``, we are copying the *Flink Job* to the container and submitting it.
- Regarding *gateways* ``fog.mosquitto.*``, we are starting the java application.
- Regarding *producers* ``edge.producer.*``, we are starting 40 java processes per node. We used the ``depends_on`` attribute to interconnect the *320 producers* with the *4 gateways* in *round robin* ``grouping: "round_robin"``. This configuration may be seen in the ``shell`` command, where we used the prefix *mosquitto* ``prefix: "mosquitto"`` to access the URL of *gateways* ``{{ mosquitto.url }}``.
- Regarding *metrics collector* ``experiment_manager.metrics_collector.*``, we are starting the java application to collect some metrics.
``finalize``
- Regarding *Kafka* ``cloud.kafka.*.leader``, we are stopping and removing *Kafka* and *Zookeeper* docker containers.
- Regarding *Flink Job Manager* ``cloud.flink.*.job_manager``, we are stopping and removing the *Task Manager* container.
- Regarding *gateways* ``fog.mosquitto.*``, we are collecting the metrics of all ``4 gateways``, such as their latency and throughput.
- Regarding *producers* ``edge.producer.*``, we are collecting the metrics of all ``320 producers``, such as their throughput.
- Regarding *metrics collector* ``experiment_manager.metrics_collector.*``, we are collecting the end-to-end processing latency.
.. literalinclude:: cctv/workflow.yaml
:language: yaml
:linenos:
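The round-robin interconnection between producers and gateways described above can be sketched as follows. The task name, Java class, and exact shell command are illustrative (the real commands are in the ``workflow.yaml`` above); only the ``depends_on``, ``grouping``, and ``prefix`` mechanics are taken from the description.

```yaml
# Illustrative sketch of the producers' launch task; see workflow.yaml above.
- hosts: edge.producer.*
  depends_on:
  - service_selector: fog.mosquitto.*
    grouping: "round_robin"   # distributes the 4 gateways over the producer nodes in turn
    prefix: "mosquitto"       # exposes the matched gateway as {{ mosquitto.* }}
  launch:
  - shell: java -cp /opt/lib/* ProducerApp tcp://{{ mosquitto.url }}:1883 in-uni-data
```

With ``grouping: "round_robin"``, each of the 8 producer nodes is bound to one of the 4 gateways in turn, so the 320 producers are spread evenly across gateways.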
Running & Verifying Experiment Execution
========================================
Find below the commands to deploy this application and check its execution. Once deployed, you can check in the **Flink WebUI**
the configurations that we have defined in the ``layers-services.yaml`` and ``workflow.yaml`` configuration files, such as the
``number of Task Managers``, ``total task slots``, ``Job Manager and Task manager heap size``, ``job parallelism``, etc.
(see :ref:`cctv_webui`, :ref:`cctv_configs`, and :ref:`cctv_running_job`). Besides, you can also check the data sent to
**Mosquitto** ``in-uni-data`` topic and to the **Kafka** ``in-uni-data`` and ``out-uni-data`` topics. For the **producers** and the **metrics collector**, you
can check the Java processes running on each node.
- Running multiple experiments: in this example we run the experiment twice (``--repeat 1``), each run lasting four minutes (``--duration 240``).
.. code-block:: bash
# G5K frontend
$ ssh rennes.grid5000.fr
$ tmux
$ cd git/e2clab/
# G5K interactive usage
$ oarsub -p "cluster='parasilo'" -l host=1,walltime=2 -I
# As soon as the host becomes available, you can run your experiments
$ source ../venv/bin/activate
$ e2clab deploy --repeat 1 --duration 240 \
    /home/drosendo/git/cluster-2020-artifacts/getting_started/ \
    /home/drosendo/git/cluster-2020-artifacts/artifacts/
- Verifying experiment execution.
.. code-block:: bash
# Discover hosts in layers_services-validate.yaml file
$ cat layers_services-validate.yaml
# Tunnel to Flink Job Manager
$ ssh -NL 8081:localhost:8081 parasilo-12.rennes.grid5000.fr
# Kafka Cluster
$ ssh parasilo-14.rennes.grid5000.fr
# Kafka input topic
$ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic in-uni-data
# Kafka output topic
$ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic out-uni-data
# Gateway input topic
$ ssh parasilo-25.rennes.grid5000.fr
$ sudo docker exec -it mosquitto mosquitto_sub -t "in-uni-data"
# Producer
$ ssh parasilo-3.rennes.grid5000.fr
$ ps -C java | wc -l
41   # 40 producer processes plus the ps header line
$ ps -xau | grep java
java -Xms256m -Xmx1024m -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_data_source.UniversityPeopleCountDataSourceToMosquitto /opt/dataset/data-1 10 1 tcp://parasilo-25.rennes.grid5000.fr:1883 in-uni-data /tmp/mosquitto 220 0 /opt/metrics-1
...
java -Xms256m -Xmx1024m -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_data_source.UniversityPeopleCountDataSourceToMosquitto /opt/dataset/data-40 10 40 tcp://parasilo-25.rennes.grid5000.fr:1883 in-uni-data /tmp/mosquitto 220 0 /opt/metrics-40
# Metrics Collector
$ ssh parasilo-9.rennes.grid5000.fr
$ ps -xau | grep java
java -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_sink.UniversityPeopleCountCloudInSink parasilo-14.rennes.grid5000.fr:9092 in-uni-data 0 /opt/metrics/in-sink
java -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_sink.UniversityPeopleCountOutSink parasilo-14.rennes.grid5000.fr:9092 out-uni-data 0 /opt/metrics/out-sink
- In :ref:`cctv_webui` you can check: the single *Task Manager* configured with *32 task slots* ``taskmanager.numberOfTaskSlots: 32``.
.. _cctv_webui:
.. figure:: cctv/cctv_webui.png
:width: 100%
:align: center
Figure 2: Flink WebUI
- In :ref:`cctv_configs` you can check: ``env.java.opts.taskmanager: -Djava.library.path=/opt/flink/lib/``, ``jobmanager.heap.size: 8000m``, ``taskmanager.heap.size: 7000m``, and ``parallelism.default: 16``.
.. _cctv_configs:
.. figure:: cctv/cctv_configs.png
:width: 100%
:align: center
Figure 3: Flink configurations
- In :ref:`cctv_running_job` you can check: the running job with ``parallelism: 32``.
.. _cctv_running_job:
.. figure:: cctv/cctv_running_job.png
:width: 100%
:align: center
Figure 4: Flink Job
Deployment Validation & Experiment Results
==========================================
Find below the files generated after the execution of each experiment. It consists of **validation files**
``layers_services-validate.yaml``, ``network-validate/``, and ``workflow-validate.out``, monitoring data ``dstat/``,
and ``experiment-results/`` (files generated by producers ``producers/``, gateways ``gateways/``, and metrics collector ``sinks/``).
Note that a new directory (e.g., ``20200614-155815/``) is generated for each experiment.
.. code-block:: bash
$ ls /home/drosendo/git/cluster-2020-artifacts/getting_started/20200614-155815/
dstat/ # Monitoring data for each physical machine
layers_services-validate.yaml # Mapping between layers and services with physical machines
network-validate/ # Network configuration for each physical machine
workflow-validate.out # Commands used to deploy application (prepare, launch, and finalize)
experiment-results/ # Defined in workflow file
gateways/ # latency and throughput of all gateways (4 in this example)
producers/ # throughput of all producers (320 in this example)
sinks/ # latency: fog-to-cloud and end-to-end
.. note::
Providing a **systematic methodology to define the experimental environment** and **access to the methodology artifacts** (``layers_services.yaml``, ``network.yaml``, and ``workflow.yaml``) enables experiment **Repeatability**, **Replicability**, and **Reproducibility**; see `ACM Digital Library Terminology `_.