Data pipeline – Mini Project on Google Cloud Platform

Introduction

The idea behind this project is to simulate a real-world scenario where we need to move data from a source to a destination in a big data environment having huge amounts of data in semi-structured and unstructured formats.

Pipeline

Why do we need to write a custom data pipeline to move data?

  • Having multiple sources and one destination.
  • Require real-time sophisticated data analysis.
  • Store data in the cloud
  • Automate the process with own custom logic.
  • Process / Clean the data on the fly.

Main components of the data pipeline

  1. Ingestion
  2. Processing
  3. Persistence

Data engineers are responsible for using multiple tools to build and maintain data pipelines through the above main steps.

Tools and frameworks used for this project

  • Python – Used to write an event simulator and to connect to different services.
  • Google Pub / Sub – To enable asynchronous communication, which means that the endpoints that are producing (Event simulator) and consuming messages (subscriber) interact with the queue, not each other. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer. Message queues can be used to decouple heavyweight processing, to buffer or batch work, and to smooth spiky workloads. Read more
  • Google BiqQuery – Data warehouse to store streamed data.

Example use cases for building a data pipeline

  • Ingest raw data from various data sources, such as customer relationship management (CRM) database.
  • Ingest IoT device events to a data warehouse or a database.
  • Real time analytics while processing the data.

Enough theories… Let’s start building our data pipeline.

Ingestion

How are we receiving the data? Ingestion step is very critical step in any data pipeline project whether the data is coming from an IoT device, Batch data or streaming data like Twitter or events logs.

To make things simple, in this mini project I’m going to generate sample events for an IoT device using Python. The device will generate data every second according to the following template: iot-sample

import datetime, random

def get_deviceId():
    list_of_devices = ["354232234", "234234342", "134234112", "434234123", "534234124", "234235423"]
    return random.choice(list_of_devices)

def get_temperature():
    return random.randrange(-5, 40, 1)

def get_location():
    return random.uniform(-180,180), random.uniform(-90, 90)

def get_time():
    return str(datetime.datetime.utcnow())

Next, we need to send this received to our messaging bus / queue which is Google Cloud Pub/Sub in our project.

First let’s authenticate our application with a GCP account. Simply we need to know which topic inside which project is ready to receive our published data. This process is different if we are using different on-premise messaging queue system. Read more about Google Pub/Sub authentication here

  1. Create a service account in GCP that has access to Pub/Sub.
  2. Provide application credential by setting the environment variable GOOGLE_APPLICATION_CREDENTIALS with the downloaded JSON path.
  3. Calling Pub/Sub client library.
from google.cloud import pubsub_v1

project_id = "YOUR_PROJECT"
topic_name = "YOUR_TOPIC"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

We are ready now to start sending data to our Pub/Sub topic.

import json

project_id = "PROJECT"
topic_name = "TOPIC"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)



def write_to_pubsub(deviceId, temperature, location, time):
    try:
        data = json.dumps({
            'deviceId': deviceId,
            'temperature': temperature,
            'location': location,
            'time': time
        }, ensure_ascii=False)

        publisher.publish(topic_path,
                          data.encode("utf-8"))
        print(data)

    except Exception as e:
        print(e)
        raise

Processing

In this step, we can process our data to meet our goal, or even just send the data as it is to the target destination. Think of it like a step in your life. I’m 20 years old now, when I reach 21, I will do a master’s degree, when I reach to 22, I will do a PhD and so on…

Most of data pipeline projects involve some transformation / mapping to the data. Read more about transformation here (Custom Parsing, Group by, Analytics, etc…)

Why Apache beam?

  1. Fast, unified stream and batch data processing (Same workflow can work for batch and streamed data)
  2. Serverless (Have access to virtually limitless capacity to solve biggest data processing challenges, while paying only for what we use)
  3. Provides an abstraction between your application logic and the big data ecosystem.
  4. More advantages for using apache Beam here

First, let’s write a parsing function which will be applied to each message received from Pub / Sub as below:

class Split(beam.DoFn):

    def process(self, element):
        element = json.loads(element)
        return [{
            'deviceId': element["deviceId"],
            'temperature': element["temperature"],
            'longitude': element["location"][0],
            'latitude': element["location"][1],
            'time': element["time"]
        }]

Persistence

After we get our data in a PCollection format, we can simply do some processing on the data and generate a new PCollection after each step. Lastly in our pipeline, I’m writing data to a BigQuery table.

import logging, json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions, GoogleCloudOptions

subscription = "projects/PROJECT/subscriptions/SUBSCRIPTION"
project_id = "PROJECT"

def run(argv=None):
    """Build and run the pipeline."""
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Big query configs
    table_spec = "PROJECT:DATASET.TABLE"
    table_schema = "deviceId:STRING,temperature:FLOAT,longitude:FLOAT,latitude:FLOAT,time:TIMESTAMP"

    # Read from PubSub into a PCollection.
    messages = (
            p | 'Read From PubSub' >> beam.io.ReadFromPubSub(subscription=subscription).with_output_types(bytes)
            | 'Decoding' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'Extract elements' >> beam.ParDo(Split().with_output_types('unicode'))
            | 'Write into Bigtable' >> beam.io.WriteToBigQuery(
                    table_spec,
                    schema=table_schema,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )
    )

    result = p.run()
    result.wait_until_finish()

From BigQuery UI, we can see that data are being ingested into the table and by running the below query we can see data in the table.

Bigquery-example

Conclusion

Any Big data project will require you to move data between different sources, a good start to try simulating data and implement them yourself. Google cloud platform offers you $300 credit where you can try all services for free. I highly recommend to apply the same example yourself. Troubles only will let you learn and understand :-)

Let me know if you have any question!