Building a data pipeline with Google Cloud Dataflow through a custom template in Apache Beam (Python)

Megha Mohini
Apr 24, 2021

I recently stumbled upon Google Cloud’s Dataflow service while working on a use case at work, and it grabbed my attention!

I looked into a few problem statements and found an interesting approach: running Dataflow jobs on custom-built pipelines written with Apache Beam.

Being a beginner on Google Cloud Platform and a complete newbie to Apache Beam, I went through some basic blogs and tutorials I found on the internet to understand the fundamentals, which I will summarize first.

Overview

The following features of Apache Beam make it an extremely powerful, flexible and widely used tool:

  • Execution platform agnostic: Pipelines you create can be exported to run on any platform. There is no migration effort involved in running the same pipeline on another execution engine, e.g. Google Cloud Dataflow, Flink, Spark, MapReduce, etc.
  • Data agnostic: Input data for pipelines can be in any form (CSV/JSON) and come from any kind of source. Beam supports batch data, streaming data and in-memory data, and the same logic can be applied to any of them.
  • Programming language agnostic: This is self-explanatory. Beam lets pipelines be written in several languages such as Java, Python and Go.

Key Terms:

As this was my first experiment with Beam pipelines, I went through a number of online tutorials and blogs to understand the underlying architecture of pipelines. Here, however, I will briefly discuss a few key concepts, most of which I have used later:

  • PCollection: The dataset a pipeline creates and operates on when it is executed. A PCollection can be processed on any supported execution platform.
  • PTransform: A transformation step applied to a PCollection. The output of each PTransform is again a PCollection. PTransforms can be applied in a linear as well as a parallel fashion.
  • Runner: The execution engine where we want to run the pipeline, e.g. Dataflow. (A minimal example of how these three pieces fit together follows below.)
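
To make these terms concrete, here is a minimal sketch of my own (not from the original post) that builds a PCollection from in-memory data, applies PTransforms, and runs everything on the local DirectRunner:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Runner: the DirectRunner executes the pipeline locally.
options = PipelineOptions(runner='DirectRunner')

with beam.Pipeline(options=options) as p:
    (p
     | 'CreateData' >> beam.Create([1, 2, 3])      # a PCollection of integers
     | 'Square' >> beam.Map(lambda x: x * x)       # a PTransform applied element-wise
     | 'Print' >> beam.Map(print))                 # another PTransform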

PTransforms:

Element-wise transforms:

  • ParDo: Short for “parallel do”. Used to perform a map/flatmap operation or any other element-wise operation. Each element is processed individually. It can be used for operations like filtering, computation and extraction.
  • Map: Applies a simple 1-to-1 mapping function over each element in the collection. Map accepts a function that returns a single element for every input element in the PCollection. (See the sketch below for how ParDo and Map differ.)
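
A small illustrative sketch of my own: a DoFn used with ParDo can emit zero, one or many outputs per input, while Map emits exactly one output per input.

import apache_beam as beam

class SplitWords(beam.DoFn):
    def process(self, line):
        # A ParDo DoFn may emit any number of elements per input.
        for word in line.split():
            yield word

with beam.Pipeline() as p:
    words = (p
             | beam.Create(['the quick brown fox', 'jumps over'])
             | 'SplitWords' >> beam.ParDo(SplitWords()))
    # Map emits exactly one output element per input element.
    words | 'WordLengths' >> beam.Map(lambda w: (w, len(w))) | beam.Map(print)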

Aggregations:

  • GroupByKey: A transform for processing collections of key-value pairs. It is a parallel reduction operation that takes a collection of key-value pairs as input. In a simple word-count example, with values of the form word: line numbers, the GroupByKey operation groups the line numbers against each word (key).
  • CombineValues: Another way of combining values in a PCollection. CombineValues accepts a function that takes an iterable of elements as input and combines them to return a single element. It expects a keyed PCollection whose value is an iterable of elements to be combined.
  • CombinePerKey: Combines all elements for each key in a collection. CombinePerKey accepts a function that takes a list of values as input and combines them for each key. It can be used with functions like sum, min, max, saturated sum, etc. (A small illustration of these three transforms follows below.)
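
As a quick illustration (my own sketch, not from the post): GroupByKey collects all values per key, CombineValues then reduces each grouped iterable to a single value, and CombinePerKey does the grouping and reduction in one step.

import apache_beam as beam

with beam.Pipeline() as p:
    pairs = p | beam.Create([('mon', 3), ('mon', 5), ('tue', 2)])

    # GroupByKey collects the values per key: ('mon', [3, 5]), ('tue', [2])
    grouped = pairs | 'Group' >> beam.GroupByKey()

    # CombineValues reduces each iterable to a single value per key.
    max_per_day = grouped | 'MaxPerDay' >> beam.CombineValues(lambda values: max(values))

    # CombinePerKey groups and reduces in one step: ('mon', 8), ('tue', 2)
    totals = pairs | 'SumPerDay' >> beam.CombinePerKey(sum)

    max_per_day | 'PrintMax' >> beam.Map(print)
    totals | 'PrintTotals' >> beam.Map(print)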

I downloaded a dataset from GitHub, covid.csv, which contains daily Covid cases in terms of date, country, province, numbers confirmed, numbers recovered, deaths, etc. across different countries and provinces. The link to the dataset is here:

https://github.com/datasets/covid-19/tree/main/data

Showing you the first 5 rows of the csv:

I uploaded the file to a bucket in Cloud Storage through the console, and then created a pipeline with Apache Beam in Python to read the data, perform some basic transformations and store the results in a BigQuery table. I then ran this pipeline on Dataflow through a Cloud Shell command and monitored it on the console.

My idea was to build time-series data that maps each day to the maximum number of deaths reported for that day, so that a time-series graph can be plotted to derive the trend of deaths from Covid.

Cloud Dataflow Programming Model:

  1. Create a Dataflow pipeline
  • Reading the csv file
  • Grouping the deaths for each day across countries
  • Getting the maximum death count for that day
  • Mapping this data into a dictionary format.
  • Dumping the pipeline output into a BigQuery table.

2. Run the Dataflow pipeline.

3. Visualizations

Here are some of my code snippets:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())

The lines above import the Beam library and create a pipeline object; the pipeline is created by passing in the configuration options.
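
As a side note, the options can also be set programmatically with keyword arguments instead of command-line flags. A small sketch of my own with placeholder values:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values, shown only to illustrate the keyword-argument form.
options = PipelineOptions(
    runner='DataflowRunner',
    project='<project-name>',
    region='us-west1',
    temp_location='gs://<bucket_name>/tmp',
)
p = beam.Pipeline(options=options)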

p_col1 = (p | 'ReadData' >> beam.io.ReadFromText(
              'gs://BUCKET_NAME/covid.csv', skip_header_lines=1)
            | 'splitting' >> beam.ParDo(Split()))

# Debugging branch: print each parsed record without altering p_col1,
# so the downstream transforms still receive the parsed dictionaries.
p_col1 | 'printdata1' >> beam.Map(print)

Three transformations are applied here. Data from the csv in the Cloud Storage bucket is read through the “ReadFromText” operation in the “ReadData” transform step. Next, a ParDo transform is applied, which runs the “Split” DoFn (defined below) on each element of the PCollection in parallel. Lastly, to view the output of these transforms on my local machine, I apply a Map operation with the “print” method on a separate debugging branch, so that p_col1 itself still carries the parsed records downstream.

class Split(beam.DoFn):
    def process(self, element):
        # Split the csv line and return the fields as a dictionary.
        date, country, province, deaths, confirmed, recovered = element.split(",")
        return [{
            'country': country,
            'deaths': int(deaths),
            'date': date,
            'confirmed': confirmed,
            'recovered': recovered
        }]

The DoFn object that you pass to ParDo contains the processing logic that gets applied to the elements of the input collection. We write a process method that holds the actual logic. We don’t need to manually extract the elements from the input collection: the process method accepts an argument element, which is the input element, and returns an iterable with its output values.

In the process method, the element is split by a comma (,) and each field is returned under its corresponding key.
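
Because process returns an iterable, a DoFn can just as easily emit several elements per input, or none at all, typically with yield. A small sketch of my own to illustrate:

import apache_beam as beam

class FilterAndExplode(beam.DoFn):
    def process(self, element):
        # Emit nothing for empty lines ...
        if not element.strip():
            return
        # ... and one output element per comma-separated field otherwise.
        for field in element.split(','):
            yield field.strip()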

p_col2 = (p_col1 | 'timeseries' >> beam.ParDo(collectDeathRecords())
                 | 'grouping' >> beam.GroupByKey()
                 | 'Get max value' >> beam.CombineValues(lambda elements: max(elements or [None]))
                 | 'beam_map' >> beam.Map(lambda x: {"date": x[0], "count": x[1]}))

The next set of transformations starts with “timeseries”, which performs a ParDo calling the collectDeathRecords DoFn. In its process method we return a key-value pair of (date: deaths); the rest of the columns are ignored for now.

class collectDeathRecords(beam.DoFn):
    def process(self, element):
        # Emit a (date, deaths) key-value pair; the other columns are dropped.
        return [(element['date'], element['deaths'])]

Next, the “grouping” step groups the deaths by date, so that corresponding to each date we get a list of the death counts reported across different countries.

“Get max value” takes the maximum of the list of deaths for each day. So by this stage, we have the date as the key and the maximum death count for that day as the value.

Last is the beam_map step, which maps the PCollection into a JSON-like key:value structure so that it can be stored in BigQuery.

SCHEMA = 'date:STRING,count:INTEGER'
p_col3 = (p_col2 | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
    '<project>:<dataset>.covid_data',
    schema=SCHEMA,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

We use the WriteToBigQuery operation to write the final PCollection to BigQuery, passing the table reference in <project>:<dataset>.<table> form along with the schema of the table. An empty table with the same name was already created. Lastly, the WRITE_APPEND write disposition was passed, which indicates that rows will be appended to the table.
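
One detail the snippets above leave out is actually executing the pipeline: the transforms only build a graph, and nothing runs until the pipeline object is run (or the pipeline is constructed inside a with block). A minimal closing step would be:

# Execute the pipeline and block until it finishes
# (on Dataflow this waits for the job to complete).
result = p.run()
result.wait_until_finish()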

This is how the table finally looks in BigQuery (sample data), which can be further used to visualize the trend as a time-series graph:

Executing Pipeline

  • For testing and debugging purposes, I saved the file to my local system and ran the Python script through PyCharm (after a pip install apache-beam).
  • To execute the pipeline on Google Cloud Dataflow, I logged on to the Google Cloud Shell and ran it with the following command:

python3 covid.py --runner DataflowRunner --project <project-name> --temp_location gs://<bucket_name>/tmp --region us-west1 --staging_location gs://test_dev_m_pendo_1/stag
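
These flags reach the pipeline because PipelineOptions(), created with no arguments, parses the script’s command-line arguments by default. A common pattern, sketched here with a hypothetical --input flag rather than taken from the post, is to separate script-specific flags from the ones Beam and Dataflow understand:

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def main():
    parser = argparse.ArgumentParser()
    # Hypothetical script-specific flag.
    parser.add_argument('--input', default='gs://BUCKET_NAME/covid.csv')
    known_args, pipeline_args = parser.parse_known_args()

    # --runner, --project, --region, --temp_location, etc. are forwarded to Beam.
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadData' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
         | 'printdata' >> beam.Map(print))

if __name__ == '__main__':
    main()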
