Kafka REST to Google BigQuery Data Pipeline Use Case

In this use case, we will build a data pipeline that starts with the REST producer, consumes data from the topic using Kafka Connect, and inserts it into a Google BigQuery table. In effect, this is streaming into BigQuery. It is important to note that Google charges for streaming inserts into BigQuery.

Figure-1. BigQuery price list

The pipeline architecture is depicted below.

Figure-2. Data Pipeline Architecture

Currently, plain Apache Kafka has no REST API for producing and consuming messages (except the Kafka Connect REST API). In this use case, the Confluent Kafka REST API is used. Confluent provides a Docker image for the test environment. Detailed requirements are discussed here.

Moving data over a REST API provides a flexible solution. An application written in any development language can access other applications via REST, so there is no need to develop producer/consumer code with the Kafka API to access data on the Kafka cluster. This makes it very easy to work with data on the cluster.

After running the Confluent Docker image, each dependent image will be pulled and started.

Figure-3. Running All in One Confluent Docker Image

For the REST requests, I used Insomnia, but Postman could be used as well. It is up to you.

The Confluent Kafka REST API supports several message formats. In this use case, JSON messages will be stored with an Avro schema, and the messages will be consumed as Avro when inserting into BigQuery.

My JSON message format is

{
   "key_schema": "{\"type\":\"string\"}",
   "value_schema": "{\"type\":\"record\",\"name\":\"Position\",\"fields\":[{\"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{\"name\":\"lon\",\"type\":\"double\"}]}",
   "records": [
      {
         "key": "1",
         "value": {
            "lat": 43.33,
            "lon": 43.33,
            "loadId": 22
         }
      }
   ]
}

Its headers are

Content-Type : application/vnd.kafka.avro.v2+json

Accept : application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

That’s all!

Let’s send the request.

Figure-4. POST request
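If you prefer to script the request rather than use a REST client, a minimal Python sketch with the requests library could look like the following. The topic name onurtest3 and the REST Proxy address localhost:8082 are assumptions based on the default all-in-one setup; adjust them for your environment.

import json
import requests

# REST Proxy endpoint and topic name (assumed: default all-in-one setup on localhost:8082).
url = "http://localhost:8082/topics/onurtest3"

payload = {
    "key_schema": json.dumps({"type": "string"}),
    "value_schema": json.dumps({
        "type": "record",
        "name": "Position",
        "fields": [
            {"name": "loadId", "type": "double"},
            {"name": "lat", "type": "double"},
            {"name": "lon", "type": "double"},
        ],
    }),
    "records": [
        {"key": "1", "value": {"lat": 43.33, "lon": 43.33, "loadId": 22}},
    ],
}

headers = {
    "Content-Type": "application/vnd.kafka.avro.v2+json",
    "Accept": "application/vnd.kafka.v2+json",
}

response = requests.post(url, headers=headers, data=json.dumps(payload))
print(response.status_code)  # 200 expected on success
print(response.json())       # offsets and schema ids returned by the REST Proxy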

If everything is OK so far, a 200 OK response should be seen. Otherwise, something went wrong…

In the Confluent Control Center UI (localhost:9021), the topic and its messages can be seen. It is very handy.

Figure-5. Topic Messages on Confluent UI
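The topic can also be checked without the UI by querying the REST Proxy directly. A small sketch, again assuming the proxy runs at localhost:8082:

import requests

# List all topics known to the REST Proxy (assumed at localhost:8082).
print(requests.get("http://localhost:8082/topics").json())

# Fetch metadata for the topic used in this use case.
print(requests.get("http://localhost:8082/topics/onurtest3").json())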

The point here is that the schema definitions for the key and the value are registered through the Confluent Schema Registry API.

Figure-6. Value Schema Definition

Figure-7. Key Schema Definition
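The registrations can be verified against the Schema Registry REST API as well. A quick sketch, assuming the default port 8081 and the default subject names onurtest3-key and onurtest3-value:

import requests

schema_registry = "http://localhost:8081"

# All registered subjects; onurtest3-key and onurtest3-value are expected here.
print(requests.get(f"{schema_registry}/subjects").json())

# Latest registered value schema for the topic.
value_schema = requests.get(
    f"{schema_registry}/subjects/onurtest3-value/versions/latest"
).json()
print(value_schema["schema"])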

In order to copy each record into the BigQuery table, the open-source BigQuerySinkConnector is used. I could see the connector on the Connectors page after restarting the cluster.

Figure-8. Refreshed Connectors List

My Connector configuration file

 {
   "name": "kcbq-connect",
   "config": {
     "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
     "tasks.max" : "1",
     "topics" : "onurtest3",
     "sanitizeTopics" : "true",
     "autoCreateTables" : "true",
     "schemaRetriever" : "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
     "schemaRegistryLocation":"http://schema-registry:8081",
     "project" : <project_id on cloud>,
     "datasets" : ".*=<dataset on cloud service>",
     "keyfile" : " /tmp/security_file.json",
     "value.converter" : "io.confluent.connect.avro.AvroConverter",
     "value.converter.schemas.enable" : "true",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schemas.enable":"true",
     "value.converter.schema.registry.url" : "http://schema-registry:8081",
     "key.converter.schema.registry.url": "http://schema-registry:8081",
     "errors.log.enable": "true",
     "errors.log.include.messages": "true",
     "errors.tolerance": "all"
   }
 }

The project property should be the same as the project ID on Google Cloud. The service account key file should be saved inside the Kafka Connect Docker container (not on the local machine).

The schema registry URL points to the Schema Registry Docker container. If you use localhost:8081, the connector will fail to start because of schema registry errors.
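Instead of the UI, the configuration can be submitted through the Kafka Connect REST API. A minimal sketch, assuming Kafka Connect listens on localhost:8083 and the configuration above is saved locally as kcbq-connect.json (a hypothetical file name):

import json
import requests

# Load the connector configuration shown above (hypothetical local file name).
with open("kcbq-connect.json") as f:
    connector = json.load(f)

# Create the connector; the Kafka Connect REST API is assumed at localhost:8083.
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.json())

# Check that the connector and its task reached the RUNNING state.
status = requests.get("http://localhost:8083/connectors/kcbq-connect/status").json()
print(status["connector"]["state"], [t["state"] for t in status["tasks"]])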

If everything is OK, you should see the screen below.

Figure-9. Confluent UI Connector dashboard

And each record is sent to the Google BigQuery table. That is it!

Figure-10. Records are sent to the BigQuery table in real time
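To double-check the result on the Google side, the table can be queried with the google-cloud-bigquery client. This is a sketch under the assumption that the connector created a table named after the topic inside the configured dataset; replace the placeholders with your own project and dataset values from the connector configuration.

from google.cloud import bigquery

# Assumes GOOGLE_APPLICATION_CREDENTIALS points at the service account key file.
client = bigquery.Client(project="<project_id on cloud>")

query = """
    SELECT loadId, lat, lon
    FROM `<project_id on cloud>.<dataset on cloud service>.onurtest3`
    ORDER BY loadId DESC
    LIMIT 10
"""

for row in client.query(query).result():
    print(row.loadId, row.lat, row.lon)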