******************************
Smart Surveillance Application
******************************

.. contents::
   :depth: 2

This example is depicted in :ref:`cctv_anatomy` and was provided by `Pedro Silva `_.

The example consists of **data producers** at the Edge (8 nodes, each one with 40 cameras); **gateways** at the Fog (4 nodes, each one with a Java application to process images); and a **Flink Cluster** (1 node) and a **Kafka Cluster** (1 node) at the Cloud. The Flink Cluster is composed of one *Job Manager* and one *Task Manager*. The Kafka Cluster consists of a Zookeeper server and a Kafka Broker. Lastly, on a single node we have the **metrics collector**, a Java application that collects metrics such as the end-to-end latency.

In this example **you will learn how to**:

- Configure a Flink Cluster and a Kafka Cluster.
- Deploy 4 gateways and 320 data producers and interconnect them in round-robin.
- Define network constraints (delay, loss, and bandwidth) between the Edge, Fog, and Cloud infrastructures.
- Manage and run the Flink Job, the Kafka broker, the gateways, and the data producers.
- Check the end-to-end execution.

.. _cctv_anatomy:

.. figure:: cctv/cctv_anatomy.png
   :width: 100%
   :align: center

   Figure 1: Anatomy

Experiment Artifacts
====================

Following the instructions below you can get access to the experiment artifacts required to run this application. These artifacts refer to applications, libraries, a dataset, and configuration files.

.. code-block:: bash

   # G5K frontend
   $ ssh rennes.grid5000.fr
   $ tmux
   $ cd git/
   $ git clone https://gitlab.inria.fr/E2Clab/examples/cctv
   $ cd cctv/
   $ ls
   artifacts/        # contains dataset/, libs/, libs-opencv/, and the Flink job
   cluster-2020/     # contains the code used to generate the charts
   getting_started/  # contains layers_services.yaml, network.yaml, and workflow.yaml

Defining the Experimental Environment
=====================================

Layers & Services Configuration
-------------------------------

This configuration file presents the **layers** and **services** that compose the Smart Surveillance example. The **Flink Cluster** (single node, ``quantity: 1``) is composed of one *Job Manager* and one *Task Manager*. The **Kafka Cluster** (single node, ``quantity: 1``) consists of a Zookeeper server and a Kafka Broker. Lastly, we have the **gateways** (4 nodes: ``quantity: 1`` and ``repeat: 3``) and the **data producers** (8 nodes: ``quantity: 1`` and ``repeat: 7``). Note that all services with ``roles: [monitoring]`` will be monitored during the execution of the experiments, provided you define a monitoring service ``monitoring: type: dstat``.

.. literalinclude:: cctv/layers_services.yaml
   :language: yaml
   :linenos:

Network Configuration
---------------------

The file below presents the network configuration between the ``cloud``, ``fog``, and ``edge`` infrastructures. Between ``cloud`` and ``fog`` we have the following configuration: ``delay: 5ms, loss: 2%, rate: 1gbit``, while between ``edge`` and ``fog`` we are emulating a 4G LTE network: ``delay: 50ms, loss: 5%, rate: 150mbit``.

.. literalinclude:: cctv/network.yaml
   :language: yaml
   :linenos:

Workflow Configuration
----------------------

This configuration file presents the application workflow configuration. It will be explained in the following order: ``prepare``, ``launch``, and ``finalize``.
``prepare``

- Regarding *Kafka* ``cloud.kafka.*.leader``, we are creating two topics, ``in-uni-data`` and ``out-uni-data``, each with ``32 partitions``.
- Regarding the *Flink Job Manager* ``cloud.flink.*.job_manager``, we are copying the *Flink Job* from the local machine to the remote machine.
- Regarding the *gateways* ``fog.mosquitto.*``, we are copying their *libraries*.
- Regarding the *producers* ``edge.producer.*``, we are copying the *producer Java application* and its *libraries*.
- Regarding the *metrics collector* ``experiment_manager.metrics_collector.*``, we are copying its *libraries*.

``launch``

- Regarding the *Flink Job Manager* ``cloud.flink.*.job_manager``, we are copying the *Flink Job* to the container and submitting it.
- Regarding the *gateways* ``fog.mosquitto.*``, we are starting the Java application.
- Regarding the *producers* ``edge.producer.*``, we are starting 40 Java processes per node. We use the ``depends_on`` attribute to interconnect the *320 producers* with the *4 gateways* in *round robin* ``grouping: "round_robin"``. This configuration can be seen in the ``shell`` command, where the prefix *mosquitto* ``prefix: "mosquitto"`` is used to access the URL of the *gateways* ``{{ mosquitto.url }}``.
- Regarding the *metrics collector* ``experiment_manager.metrics_collector.*``, we are starting the Java application that collects the metrics.

``finalize``

- Regarding *Kafka* ``cloud.kafka.*.leader``, we are stopping and removing the *Kafka* and *Zookeeper* Docker containers.
- Regarding the *Flink Job Manager* ``cloud.flink.*.job_manager``, we are stopping and removing the *Task Manager* container.
- Regarding the *gateways* ``fog.mosquitto.*``, we are collecting the metrics of all ``4 gateways``, such as their latency and throughput.
- Regarding the *producers* ``edge.producer.*``, we are collecting the metrics of all ``320 producers``, such as their throughput.
- Regarding the *metrics collector* ``experiment_manager.metrics_collector.*``, we are collecting the end-to-end processing latency.
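For illustration only, the round-robin interconnection between producers and gateways described above could be sketched along the following lines. This is a hedged sketch, not the actual file: apart from the attributes named in the text (``depends_on``, ``grouping: "round_robin"``, ``prefix: "mosquitto"``, and the ``{{ mosquitto.url }}`` template), the key names and the command are illustrative; the authoritative version is the ``workflow.yaml`` artifact itself.

.. code-block:: yaml

   # Hypothetical sketch of the producer entry in workflow.yaml
   - hosts: edge.producer.*
     depends_on:
       - service_selector: "fog.mosquitto.*"   # the 4 gateways
         grouping: "round_robin"               # spread 320 producers over them
         prefix: "mosquitto"                   # exposes {{ mosquitto.url }}
     launch:
       # illustrative command; the real one is in the artifact
       - shell: java -cp /opt/lib/*:/opt/lib <ProducerMainClass> tcp://{{ mosquitto.url }}:1883 in-uni-data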

.. literalinclude:: cctv/workflow.yaml
   :language: yaml
   :linenos:

Running & Verifying Experiment Execution
========================================

Find below the commands to deploy this application and check its execution. Once deployed, you can check in the **Flink WebUI** the configurations that we have defined in the ``layers_services.yaml`` and ``workflow.yaml`` configuration files, such as the ``number of Task Managers``, ``total task slots``, ``Job Manager and Task Manager heap sizes``, ``job parallelism``, etc. (see :ref:`cctv_webui`, :ref:`cctv_configs`, and :ref:`cctv_running_job`). Besides, you can also check the data sent to the **Mosquitto** ``in-uni-data`` topic and to the **Kafka** ``in-uni-data`` and ``out-uni-data`` topics. For the **producers** and the **metrics collector** you can check the Java processes running on them.

- Running multiple experiments: in this example we are running two experiments ``--repeat 1``, each one with a duration of four minutes ``--duration 240``.

.. code-block:: bash

   # G5K frontend
   $ ssh rennes.grid5000.fr
   $ tmux
   $ cd git/e2clab/
   # G5K interactive usage
   $ oarsub -p "cluster='parasilo'" -l host=1,walltime=2 -I
   # As soon as the host becomes available, you can run your experiments
   $ source ../venv/bin/activate
   $ e2clab deploy --repeat 1 --duration 240 /home/drosendo/git/cluster-2020-artifacts/getting_started/ /home/drosendo/git/cluster-2020-artifacts/artifacts/

- Verifying experiment execution.

.. code-block:: bash

   # Discover hosts in the layers_services-validate.yaml file
   $ cat layers_services-validate.yaml
   # Tunnel to the Flink Job Manager
   $ ssh -NL 8081:localhost:8081 parasilo-12.rennes.grid5000.fr
   # Kafka Cluster
   $ ssh parasilo-14.rennes.grid5000.fr
   # Kafka input topic
   $ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic in-uni-data
   # Kafka output topic
   $ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic out-uni-data
   # Gateway input topic
   $ ssh parasilo-25.rennes.grid5000.fr
   $ sudo docker exec -it mosquitto mosquitto_sub -t "in-uni-data"
   # Producer
   $ ssh parasilo-3.rennes.grid5000.fr
   $ ps -C java | wc -l
   41
   $ ps -xau | grep java
   java -Xms256m -Xmx1024m -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_data_source.UniversityPeopleCountDataSourceToMosquitto /opt/dataset/data-1 10 1 tcp://parasilo-25.rennes.grid5000.fr:1883 in-uni-data /tmp/mosquitto 220 0 /opt/metrics-1
   ...
   java -Xms256m -Xmx1024m -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_data_source.UniversityPeopleCountDataSourceToMosquitto /opt/dataset/data-40 10 40 tcp://parasilo-25.rennes.grid5000.fr:1883 in-uni-data /tmp/mosquitto 220 0 /opt/metrics-40
   # Metrics Collector
   $ ssh parasilo-9.rennes.grid5000.fr
   $ ps -xau | grep java
   java -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_sink.UniversityPeopleCountCloudInSink parasilo-14.rennes.grid5000.fr:9092 in-uni-data 0 /opt/metrics/in-sink
   java -cp /opt/lib/*:/opt/lib university_people_count.university_people_count_sink.UniversityPeopleCountOutSink parasilo-14.rennes.grid5000.fr:9092 out-uni-data 0 /opt/metrics/out-sink

- In :ref:`cctv_webui` you can check the single *Task Manager* configured with *32 task slots* ``taskmanager.numberOfTaskSlots: 32``.

.. _cctv_webui:
.. figure:: cctv/cctv_webui.png
   :width: 100%
   :align: center

   Figure 2: Flink WebUI

- In :ref:`cctv_configs` you can check: ``env.java.opts.taskmanager: -Djava.library.path=/opt/flink/lib/``, ``jobmanager.heap.size: 8000m``, ``taskmanager.heap.size: 7000m``, and ``parallelism.default: 16``.

.. _cctv_configs:

.. figure:: cctv/cctv_configs.png
   :width: 100%
   :align: center

   Figure 3: Flink configurations

- In :ref:`cctv_running_job` you can check the running job with ``parallelism: 32``.

.. _cctv_running_job:

.. figure:: cctv/cctv_running_job.png
   :width: 100%
   :align: center

   Figure 4: Flink Job Deployment

Validation & Experiment Results
===============================

Find below the files generated after the execution of each experiment. They consist of **validation files** (``layers_services-validate.yaml``, ``network-validate/``, and ``workflow-validate.out``), **monitoring data** (``dstat/``), and ``experiment-results/`` (files generated by the producers ``producers/``, the gateways ``gateways/``, and the metrics collector ``sinks/``). Note that a new directory (e.g. ``20200614-155815/``) is generated for each experiment.

.. code-block:: bash

   $ ls /home/drosendo/git/cluster-2020-artifacts/getting_started/20200614-155815/
   dstat/                         # Monitoring data for each physical machine
   layers_services-validate.yaml  # Mapping between layers and services with physical machines
   network-validate/              # Network configuration for each physical machine
   workflow-validate.out          # Commands used to deploy the application (prepare, launch, and finalize)
   experiment-results/            # Defined in the workflow file
     gateways/                    # latency and throughput of all gateways (4 in this example)
     producers/                   # throughput of all producers (320 in this example)
     sinks/                       # latency: fog-to-cloud and end-to-end
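Once the results are collected, a quick sanity check can be scripted over the metrics files. The snippet below is only a sketch under a stated assumption: it supposes (hypothetically) that each metrics file contains one numeric latency value in milliseconds per line, which may not match the actual format written by the Java applications.

.. code-block:: bash

   # mean_of_column1 FILE...
   # Prints the mean of the first column across all given files.
   # Hypothetical format assumption: one numeric value (latency in ms) per line.
   mean_of_column1() {
     awk '{ sum += $1; n++ } END { if (n) printf "%.2f\n", sum / n }' "$@"
   }

   # Example, using the paths from the listing above:
   # mean_of_column1 20200614-155815/experiment-results/sinks/*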

.. note::

   Providing a **systematic methodology to define the experimental environment** and providing **access to the methodology artifacts** (``layers_services.yaml``, ``network.yaml``, and ``workflow.yaml``) supports experiment **Repeatability**, **Replicability**, and **Reproducibility**; see the `ACM Digital Library Terminology `_.