***********************
Flink Click Event Count
***********************

.. contents::
   :depth: 2

This example is depicted in :ref:`flink_anatomy` and is based on the
`Flink Operations Playground `_. It consists of a **Flink Cluster**, a
**Kafka Cluster**, and **data producers**.

In this example **you will learn how to**:

- Configure a Flink Cluster and a Kafka Cluster.
- Scale the number of producers (scale in and scale out).
- Configure the network between the producers and the Flink and Kafka clusters.
- Manage and run Flink Jobs and data producers.
- Check the end-to-end execution.

.. _flink_anatomy:

.. figure:: flink_click_event_count/flink_anatomy.png
   :width: 100%
   :align: center

   Figure 1: Anatomy

Experiment Artifacts
====================

.. code-block:: bash

   $ cd ~/git/
   $ git clone https://gitlab.inria.fr/E2Clab/examples/flink-kafka
   $ cd flink-kafka/
   $ ls
   artifacts        # contains the Flink job (ClickCountJob.jar) and lib/
   getting_started  # contains layers_services.yaml, network.yaml, and workflow.yaml

Defining the Experimental Environment
=====================================

Layers & Services Configuration
-------------------------------

This configuration file presents the **layers** and **services** that
compose this example. The **Flink Cluster** (three nodes, ``quantity: 3``)
is composed of one *Job Manager* and two *Task Managers*. The **Kafka
Cluster** (single node, ``quantity: 1``) consists of a Zookeeper server and
a Kafka broker. Lastly, we have the data producers (two nodes, since
``quantity: 1`` is combined with ``repeat: 1``). Note that, since we
defined ``monitoring: type: dstat``, all services assigned
``roles: [monitoring]`` will be monitored with ``dstat`` during the
execution of the experiments.

.. literalinclude:: flink_click_event_count/layers_services.yaml
   :language: yaml
   :linenos:

**User-Defined Service:** add the following services in the
``~/git/e2clab/e2clab/services/`` directory.
- `Flink `_
- `Kafka `_

Network Configuration
---------------------

The file below presents the network configuration between the ``cloud`` and
``edge`` infrastructures: ``delay: 5ms``, ``loss: 2%``, ``rate: 1gbit``.

.. literalinclude:: flink_click_event_count/network.yaml
   :language: yaml
   :linenos:

Workflow Configuration
----------------------

This configuration file presents the application workflow.

- Regarding the *Flink Job Manager* ``cloud.flink.*.job_manager``: in
  ``prepare`` we copy the *Flink job* from the local machine to the remote
  machine, and in ``launch`` we copy it into the container and start the
  *Flink job*.
- Regarding the *producers* ``edge.producer.*``: in ``prepare`` we copy the
  *producer Java application* and its *libraries*; then, in ``launch``, we
  start four Java processes. We use the ``depends_on`` attribute to
  interconnect all *producers* with the *Kafka server*. In this case they
  are grouped in *round robin* (``grouping: "round_robin"``) and, as shown
  in ``shell``, the *producers* are configured to send data to *Kafka*
  (``{{ kafka.url }}``).

.. literalinclude:: flink_click_event_count/workflow.yaml
   :language: yaml
   :linenos:

Running & Verifying Experiment Execution
========================================

Find below the commands to deploy this application and check its execution.
Once deployed, you can check in the **Flink WebUI** the configurations that
we defined in the ``layers_services.yaml`` and ``workflow.yaml``
configuration files, such as the ``number of Task Managers``, ``total task
slots``, ``Job Manager and Task Manager heap size``, ``job parallelism``,
etc. (see :ref:`flink_webui`, :ref:`flink_configs`, and
:ref:`flink_running_job`). Besides, you can also check the records written
to the **Kafka topics** (the ``input`` and ``output`` topics).

- Running multiple experiments: in this example we run four experiments
  (``--repeat 3``, i.e. the first run plus three repetitions), each one
  with a duration of four minutes (``--duration 240``).

.. code-block:: bash

   $ e2clab deploy --repeat 3 --duration 240 ~/git/flink-kafka/getting_started/ ~/git/flink-kafka/artifacts/

- Verifying experiment execution:

.. code-block:: bash

   # Discover hosts in /layers_services-validate.yaml file
   $ cat layers_services-validate.yaml

   # Tunnel to the Flink Job Manager (cloud.flink.1.job_manager.1)
   $ ssh -NL 8081:localhost:8081 .rennes.grid5000.fr

   # SSH to the Kafka Cluster (cloud.kafka.leader.1)
   $ ssh .rennes.grid5000.fr

   # Kafka input topic
   $ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic input
   {"timestamp":"01-01-1970 12:04:32:235","page":"/news"}
   {"timestamp":"01-01-1970 12:04:33:480","page":"/help"}
   {"timestamp":"01-01-1970 12:04:33:255","page":"/about"}
   {"timestamp":"01-01-1970 12:04:32:565","page":"/jobs"}
   {"timestamp":"01-01-1970 12:04:32:745","page":"/jobs"}
   {"timestamp":"01-01-1970 12:04:32:565","page":"/about"}

   # Kafka output topic
   $ sudo docker exec -it leader /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic output
   {"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/index","count":7884}
   {"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/about","count":7882}
   {"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/jobs","count":7882}
   {"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/news","count":7880}
   {"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/shop","count":7884}
   {"windowStart":"01-01-1970 12:00:45:000","windowEnd":"01-01-1970 12:01:00:000","page":"/help","count":7885}

- In :ref:`flink_webui` you can check the two *Task Managers*, each
  configured with *16 task slots* (``taskmanager.numberOfTaskSlots: 16``),
  resulting in ``Total Task Slots: 32``.

.. _flink_webui:

.. figure:: flink_click_event_count/flink_webui.png
   :width: 100%
   :align: center

   Figure 2: Flink WebUI

- In :ref:`flink_configs` you can check ``jobmanager.heap.size: 2048m``,
  ``taskmanager.heap.size: 768m``, and ``taskmanager.numberOfTaskSlots: 16``.

.. _flink_configs:

.. figure:: flink_click_event_count/flink_configs.png
   :width: 100%
   :align: center

   Figure 3: Flink configurations

- In :ref:`flink_running_job` you can check the running job with
  ``parallelism: 32``.

.. _flink_running_job:

.. figure:: flink_click_event_count/flink_running_job.png
   :width: 100%
   :align: center

   Figure 4: Flink Job

Deployment Validation & Experiment Results
==========================================

Find below the files generated after the execution of each experiment. They
consist of the **validation files** ``layers_services-validate.yaml``,
``network-validate/``, and ``workflow-validate.out``, and the monitoring
data in ``monitoring-data/``. Note that a new directory
(``yyyymmdd-hhmmss/``) is generated for each experiment.

.. code-block:: bash

   $ ls ~/git/flink-kafka/getting_started//
   monitoring-data/               # Monitoring data for each physical machine
   layers_services-validate.yaml  # Mapping between layers and services and the physical machines
   network-validate/              # Network configuration for each physical machine
   workflow-validate.out          # Commands used to deploy the application (prepare, launch, and finalize)

.. note::

   Providing a **systematic methodology to define the experimental
   environment** (``layers_services.yaml``, ``network.yaml``, and
   ``workflow.yaml``) and providing **access to the artifacts** supports
   experiment **Repeatability**, **Replicability**, and **Reproducibility**;
   see the `ACM Digital Library Terminology `_.
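As an illustration of how the ``dstat`` data under ``monitoring-data/``
might be post-processed, below is a minimal Python sketch. The sample CSV
excerpt and its column layout are assumptions (dstat's CSV header format
varies across versions), so adapt the parsing to the actual files produced
by your run.

.. code-block:: python

   import csv
   import io

   # Hypothetical excerpt of a dstat CSV file; real dstat output includes
   # several metadata lines before the row naming the per-column metrics.
   SAMPLE = """\
   "Dstat CSV output"
   "Host:","node-1"
   "total cpu usage",,,,,
   "usr","sys","idl","wai","hiq","siq"
   12.0,3.0,85.0,0.0,0.0,0.0
   40.0,5.0,55.0,0.0,0.0,0.0
   20.0,4.0,76.0,0.0,0.0,0.0
   """

   def average_cpu_usage(text):
       """Return the mean 'usr' CPU percentage from dstat-style CSV text."""
       rows = list(csv.reader(io.StringIO(text)))
       # Locate the header row that names the per-column metrics.
       header_idx = next(i for i, r in enumerate(rows) if r and r[0] == "usr")
       usr_col = rows[header_idx].index("usr")
       samples = [float(r[usr_col]) for r in rows[header_idx + 1:] if r]
       return sum(samples) / len(samples)

   print(average_cpu_usage(SAMPLE))  # mean of 12.0, 40.0, 20.0 -> 24.0

The same pattern (skip the metadata preamble, index columns by metric name)
applies to the other metric groups dstat records, such as disk and network
throughput.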