How to stream data into Keboola using BigQuery Continuous Query

Integrate Google's BigQuery Continuous Queries with Keboola's orchestration for faster event response.

How To
August 6, 2024

Authors:
Jobin George,
Data & Analytics Partner Solutions Lead at Google
Andrew Mott,
EMEA Field CTO at Keboola
Vojta Tuma,
Field CTO at Keboola

Introduction

This post describes how Google’s new BigQuery Continuous Queries capability integrates with Keboola’s orchestration, and what the combination offers. Together they enable organizations to react more quickly to external and internal events. The example here focuses on a reverse ETL scenario in which Keboola monitors the continuous data and orchestrates other Google services to respond in a timely manner. As you will see below, we think the combination of these two technologies opens up many more use cases.

An introduction to BigQuery Continuous Queries

BigQuery Continuous Queries let customers run continuously processing SQL statements that can analyze, transform, and externally replicate data as new events arrive in BigQuery in real time. This approach extends BigQuery’s real-time capabilities to handle continuous streams of input, analysis, and output, which can be used to build event-driven actionable insights, real-time machine learning inference, operational reverse ETL pipelines, and more. It essentially transforms BigQuery from the analytical data sink at the end of data’s architectural journey into an operational, event-driven data processing engine in the middle of an application’s decision logic, all using the user-friendly language of SQL.

How Keboola works with BigQuery Continuous Queries 

You can create and execute continuous queries through the Google Cloud console, the bq command-line tool, or the BigQuery API after choosing “Continuous Query” as the query mode. You can export continuous query output to another BigQuery table, a Bigtable table, or a Pub/Sub topic. In this solution, we use a Pub/Sub topic as the destination for the query output. Once started, the continuous query keeps running and generates output as new data enters BigQuery. Below is the architecture of how this solution works with Keboola.

Once the data is available in Pub/Sub, you can use a Cloud Function to retrieve the events and push them into Keboola for downstream use cases.

Connecting the dots between Keboola and Continuous Queries

Setting up and running BigQuery Continuous Queries

1) Cloud Functions for Keboola Data Streams use a service account to connect to Pub/Sub and consume data from it. You can use a single service account both for running continuous queries and for consuming from Pub/Sub by assigning it the relevant permissions. You can configure the service account with permissions listed here.

2) To run continuous queries, BigQuery requires a slot reservation with a “CONTINUOUS” assignment type. Follow the steps here if you are not sure how to create a reservation.

3) Navigate to the Pub/Sub topics page, click the “Create Topic” button at the top center of the page, and provide a name (for example, ‘continuous_query_topic’). Create a default subscription as well if needed.

4) Navigate to the BigQuery service page and design the query as an export to Pub/Sub.

In More Settings, select continuous query as the query mode, and in the query settings select the service account created above to run the query. You can also set a timeout if required.
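The export query from step 4 can be sketched as follows. This is a minimal illustration, not the query used by the authors: the helper name `build_pubsub_export_query`, the project `my-project`, the table `my-project.shop.orders`, and the column names are all hypothetical, and the statement shape (an `EXPORT DATA` statement with `format = 'CLOUD_PUBSUB'` and a single JSON `message` column) should be checked against the current BigQuery documentation before use.

```python
def build_pubsub_export_query(project: str, topic: str, table: str, columns: list[str]) -> str:
    """Return a SQL statement that exports selected columns to a Pub/Sub topic as JSON."""
    # Each exported row becomes one Pub/Sub message whose body is a JSON object.
    struct_fields = ", ".join(columns)
    return (
        "EXPORT DATA\n"
        "  OPTIONS (\n"
        "    format = 'CLOUD_PUBSUB',\n"
        f"    uri = 'https://pubsub.googleapis.com/projects/{project}/topics/{topic}')\n"
        "AS (\n"
        f"  SELECT TO_JSON_STRING(STRUCT({struct_fields})) AS message\n"
        f"  FROM `{table}`\n"
        ");"
    )


# Hypothetical project, table, and columns; the topic name comes from step 3.
query = build_pubsub_export_query(
    project="my-project",
    topic="continuous_query_topic",
    table="my-project.shop.orders",
    columns=["order_id", "customer_id", "product_id"],
)
print(query)
```

The printed statement can be pasted into the BigQuery editor and run with the query mode set to continuous, as described above.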

5) Before executing the query, complete the steps below so that the continuously generated data can be captured by Keboola Data Streams via Cloud Pub/Sub.

Creating and configuring Keboola Data Streams

6) Next we need to create a bridge between Google Cloud Pub/Sub and Keboola Connection. Typically we would use a Data Source (Extractor), but for this purpose we will use Keboola Data Streams. Keboola Data Streams lets users expose HTTPS endpoints to which they can stream data. This is very useful because it allows you to process data as soon as it appears in Google Pub/Sub.

7) To create a Keboola Data Stream, log into Keboola and select “Storage” -> “Data streams”.

8) Create a new Data Stream. For this tutorial we will name it “BigQuery consumer”. On the detail screen of the Data Stream you can see the “Data Streams URL”.

This URL is important because it serves as the endpoint for data coming from Pub/Sub. To bridge Google Pub/Sub and Keboola Connection, we need to create a GCP Cloud Function that reads data from Google Pub/Sub and sends it to the Keboola Data Stream. In our example the endpoint is:

https://stream-in.europe-west3.gcp.keboola.com/projects/111/sources/bq-consumer/eXaJ3Qr86RvHDithzm5K7uQpbcplUOeuWTWbmy7cBO0icWe8

Security in this case is provided by a long unique hash in the URL; without this hash it is not possible to send data to the endpoint. This mechanism was chosen to avoid the overhead of an additional authentication step, which could impact performance.

How to create a Cloud Function in GCP

9) You can create a Cloud Function by following the instructions here; you can find sample Python code here. The Cloud Function is very simple, and the Python code is as follows:
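The authors’ listing is not reproduced in this copy of the post, so what follows is only a minimal sketch of what such a function might look like, not their code. It assumes a Pub/Sub-triggered function that receives the message base64-encoded under the `data` key of the event, that the Data Streams URL from step 8 is supplied through a hypothetical `KEBOOLA_STREAM_URL` environment variable, and that the endpoint accepts an HTTP POST with a JSON body.

```python
import base64
import os
import urllib.request

# Hypothetical environment variable holding the Data Streams URL from step 8.
STREAM_URL = os.environ.get("KEBOOLA_STREAM_URL", "")


def decode_pubsub_event(event: dict) -> bytes:
    """Return the raw message body from a Pub/Sub-triggered event."""
    # Pub/Sub delivers the payload base64-encoded under the "data" key.
    return base64.b64decode(event.get("data", ""))


def build_request(url: str, body: bytes) -> urllib.request.Request:
    """Build an HTTP POST that carries the event body as JSON (assumed content type)."""
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def forward_to_keboola(event: dict, context=None) -> int:
    """Entry point: forward one Pub/Sub message to the Keboola Data Stream."""
    request = build_request(STREAM_URL, decode_pubsub_event(event))
    # urlopen raises an HTTPError for 4xx/5xx responses, which marks the invocation as failed.
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Keeping the decoding and request-building steps in separate functions makes them easy to check locally without calling the real endpoint.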

This is enough to establish the connection between Keboola Connection and Google Pub/Sub.

Working with data in Keboola Connection

10) Now all new events produced by the continuous query will automatically appear in Keboola Connection. Before we move on, let’s focus on the “Import condition” tab in the newly created Data Stream.

This defines the conditions under which data in the Data Stream is exported/saved to Storage. In our case we save data every 5 minutes or when there are more than 5 records in the Data Stream. We can also set the maximum size of data to upload, which we set at 1MB.

This means that data won’t reside in the Data Stream for longer than 5 minutes, which allows us to build a near-real-time pipeline.
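The import conditions above amount to an “any of these thresholds” rule, which can be sketched as below. This is only an illustration of the logic, not how Keboola implements it: the names `ImportConditions` and `should_import` are hypothetical, 1MB is assumed to mean 1,000,000 bytes, and the exact boundary behaviour (strictly more than 5 records, at least 5 minutes, at least 1MB) is an assumption.

```python
from dataclasses import dataclass


@dataclass
class ImportConditions:
    """Thresholds from the example; any one of them triggers an import to Storage."""
    max_seconds: int = 300      # 5 minutes
    max_records: int = 5        # import once there are more than 5 records
    max_bytes: int = 1_000_000  # 1MB, assumed to mean 10**6 bytes


def should_import(cond: ImportConditions, seconds_waiting: float, records: int, buffered_bytes: int) -> bool:
    """Return True when at least one import condition is met."""
    return (
        seconds_waiting >= cond.max_seconds
        or records > cond.max_records
        or buffered_bytes >= cond.max_bytes
    )


conditions = ImportConditions()
# Three small records after ten seconds: nothing is imported yet.
print(should_import(conditions, seconds_waiting=10, records=3, buffered_bytes=600))
# A sixth record arrives: the record-count condition triggers the import.
print(should_import(conditions, seconds_waiting=12, records=6, buffered_bytes=1200))
```

Because the time condition always fires eventually, the other two thresholds only ever shorten the wait; they never extend it.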

Each data stream lands in a bucket and table in Keboola Storage. This can be configured in “Table settings”.

In this example all data from Google Pub/Sub will land into a bucket called ‘bq-consumer’ in a table called ‘orders’ with the table schema shown.

Example usage - AI generated upsell

11) We can now configure a Keboola Triggered Flow on this table, which executes once a trigger condition is met, such as a number of rows received. Select ‘Flows’ in the Keboola UI and then create a new Flow called ‘Warranty UpSell’. In this flow we will process all new orders and offer each customer an extended warranty for their product. We will also include previously purchased products that are reaching the end of their warranty. Generative AI components (based on the Gemini model) will generate personalized emails for us. The Flow could look similar to the one below.

This simple flow shows 3 steps:

  1. A BigQuery SQL-based transformation queries the new orders that have been received and finds products for which we can offer an extended warranty. We could also use BigQuery ML to estimate each customer’s propensity to purchase an extended warranty. The output of this step is a list of new customers, with relevant product order information, who have a high propensity to purchase an extended warranty.
  2. The second step uses Gemini to generate personalized email content based on the data that is generated in the previous transformation.
  3. The third step will automate the distribution of email warranty offers to those customers using integration with an SMTP service.

Conclusion

Now you can use BigQuery Continuous Queries to stream real-time Google BigQuery data directly into Keboola and build solutions that were not possible before. By harnessing the power of Keboola and BigQuery Continuous Queries, organizations can unlock the full potential of their data and gain a competitive edge. This combination enables real-time data-driven decision-making, streamlined data pipelines, and improved application performance. To learn more about BigQuery Continuous Query click here. Begin your journey with Keboola on the Google Cloud Marketplace today!