This tutorial shows how to write messages from your Managed Service for Apache Kafka cluster to BigQuery using Kafka Connect.
In this tutorial, you create a Connect cluster, and then use a BigQuery sink connector to write to an existing BigQuery table. For this scenario, the BigQuery table defines the schema for the Kafka messages; the Kafka messages must match the table schema. For more information, see Schemas for the BigQuery sink connector.
Before you begin
Console
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
  Roles required to select or create a project:
  - Select a project: Selecting a project doesn't require a specific IAM role; you can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
 
-  Verify that billing is enabled for your Google Cloud project. 
- Enable the Managed Kafka API.
  Roles required to enable APIs: To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
- Make sure that you have the following roles on the project: Managed Kafka Cluster Editor, Managed Kafka Connect Cluster Editor, Managed Kafka Connector Editor, Managed Kafka Topic Editor, BigQuery Data Owner.
  Check for the roles:
  - In the Google Cloud console, go to the IAM page.
  - Select the project.
  - In the Principal column, find all rows that identify you or a group that you're included in. To learn which groups you're included in, contact your administrator.
  - For all rows that specify or include you, check the Role column to see whether the list of roles includes the required roles.
  Grant the roles:
  - In the Google Cloud console, go to the IAM page.
  - Select the project.
  - Click Grant access.
  - In the New principals field, enter your user identifier. This is typically the email address for a Google Account.
  - In the Select a role list, select a role.
  - To grant additional roles, click Add another role and add each additional role.
  - Click Save.
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-  Install the Google Cloud CLI. 
-  If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity. 
-  To initialize the gcloud CLI, run the following command: gcloud init
- Create or select a Google Cloud project.
  Roles required to select or create a project:
  - Select a project: Selecting a project doesn't require a specific IAM role; you can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
 
-  Verify that billing is enabled for your Google Cloud project. 
- Enable the Managed Kafka API:
  gcloud services enable managedkafka.googleapis.com
  Roles required to enable APIs: To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
- Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedkafka.clusterEditor, roles/managedkafka.connectClusterEditor, roles/managedkafka.connectorEditor, roles/managedkafka.topicEditor, roles/bigquery.dataOwner
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  Replace the following:
  - PROJECT_ID: Your project ID.
  - USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
  - ROLE: The IAM role that you grant to your user account.
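Because the same command has to run once per role, a small loop can help. The following is a minimal sketch, not an official script: the project ID and email address are placeholder values, and the function only prints the commands (a dry run) so that you can review them before running anything.

```shell
#!/bin/sh
# Placeholder values (assumptions): replace with your own project and account.
PROJECT_ID="my-project"
USER_IDENTIFIER="myemail@example.com"

# The five roles listed in this tutorial, one per line.
ROLES="roles/managedkafka.clusterEditor
roles/managedkafka.connectClusterEditor
roles/managedkafka.connectorEditor
roles/managedkafka.topicEditor
roles/bigquery.dataOwner"

# Print one add-iam-policy-binding command per role (dry run only).
print_grant_commands() {
  for role in $ROLES; do
    echo "gcloud projects add-iam-policy-binding $PROJECT_ID --member=\"user:$USER_IDENTIFIER\" --role=$role"
  done
}

print_grant_commands
```

After checking the printed commands, you could run them by piping the output to a shell, for example print_grant_commands | sh.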
 
Create a BigQuery table
In this step, you create a BigQuery table with the following schema:
| Column name | Data type | 
|---|---|
| name | STRING | 
| id | INTEGER | 
Create a dataset
To create a BigQuery dataset, follow these steps:
Console
- Open the BigQuery page. 
- In the Explorer panel, select the project where you want to create the dataset. 
- Expand the View actions option and click Create dataset. 
- On the Create dataset page:
  - For Dataset ID, enter a name for the dataset.
  - For Location type, choose a geographic location for the dataset.
 
gcloud
To create a new dataset, use the bq mk command with the --dataset flag.
bq mk --location REGION \
  --dataset PROJECT_ID:DATASET_NAME
Replace the following:
- PROJECT_ID: your project ID
- DATASET_NAME: the name of the dataset
- REGION: the dataset's location
For more information, see Create datasets.
Create a table with a schema
Next, create a new BigQuery table with a schema:
Console
- Go to the BigQuery page. 
- In the Explorer panel, expand your project, and then select a dataset. 
- In the Dataset info section, click Create table. 
- In the Create table from list, select Empty table. 
- In the Table box, enter the name of the table. 
- In the Schema section, click Edit as text. 
- Paste in the following schema definition: name:STRING, id:INTEGER
- Click Create table. 
gcloud
To create a new table, use the bq mk command with the --table flag.
bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,id:INTEGER
Replace the following:
- PROJECT_ID: your project ID
- DATASET_NAME: the name of the dataset
- TABLE_NAME: the name of the table to create
For more information, see Create an empty table with a schema definition.
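As an alternative to the inline schema, bq mk --table also accepts a path to a JSON schema file. The sketch below writes the same two columns to a file; the file name schema.json is an arbitrary choice, and the NULLABLE mode is stated explicitly on the assumption that it matches the default you get from the inline form.

```shell
#!/bin/sh
# Write the tutorial's two-column schema to a JSON schema file.
# The file name is an arbitrary choice for this sketch.
SCHEMA_FILE="schema.json"

cat > "$SCHEMA_FILE" <<'EOF'
[
  {"name": "name", "type": "STRING", "mode": "NULLABLE"},
  {"name": "id", "type": "INTEGER", "mode": "NULLABLE"}
]
EOF

# To create the table from the file, pass the path in place of the inline schema:
#   bq mk --table PROJECT_ID:DATASET_NAME.TABLE_NAME ./schema.json
```

A schema file is easier to review and reuse than an inline string once a table has more than a few columns.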
By default, the BigQuery sink connector uses the topic name as the BigQuery table name. You can override this behavior by setting the topic2TableMap configuration property. For more information, see How a BigQuery Sink connector works.
Create Managed Service for Apache Kafka resources
In this section, you create the following Managed Service for Apache Kafka resources:
- A Kafka cluster with a topic.
- A Connect cluster with a BigQuery sink connector.
Create a Kafka cluster
In this step, you create a Managed Service for Apache Kafka cluster. Creating a cluster can take up to 30 minutes.
Console
- Go to the Managed Service for Apache Kafka > Clusters page. 
- Click Create. 
- In the Cluster name box, enter a name for the cluster. 
- In the Region list, select a location for the cluster. Choose the same region as the BigQuery table. 
- For Network configuration, configure the subnet where the cluster is accessible:
  - For Project, select your project.
  - For Network, select the VPC network.
  - For Subnet, select the subnet.
  - Click Done.
 
- Click Create. 
While the cluster is being created, the cluster state is Creating. When the cluster has finished being created, the state is Active.
gcloud
To create a Kafka cluster, run the managed-kafka clusters create command.
gcloud managed-kafka clusters create KAFKA_CLUSTER \
  --location=REGION \
  --cpu=3 \
  --memory=3GiB \
  --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
  --async
Replace the following:
- KAFKA_CLUSTER: a name for the Kafka cluster
- REGION: the location of the cluster; choose the same region as the BigQuery table
- PROJECT_ID: your project ID
- SUBNET_NAME: the subnet where you want to create the cluster, for example, default
For information about supported locations, see Managed Service for Apache Kafka locations.
The command runs asynchronously and returns an operation ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
To track the progress of the create operation, use the gcloud managed-kafka operations describe command:
gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION
For more information, see Monitor the cluster creation operation.
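Because cluster creation can take a while, you might prefer to poll the operation rather than re-run the describe command by hand. The following helper is a sketch under two assumptions: that the operation resource exposes a boolean done field, and that gcloud prints it as True when you pass --format="value(done)". The function name and the 30-second default interval are illustrative choices.

```shell
#!/bin/sh
# Poll a long-running operation until it reports done.
# Assumptions: the operation has a boolean `done` field that gcloud prints
# as "True" with --format="value(done)"; the default interval is arbitrary.
# Usage: wait_for_operation OPERATION_ID REGION [INTERVAL_SECONDS]
wait_for_operation() {
  operation_id="$1"
  region="$2"
  interval="${3:-30}"
  while :; do
    # Stop if the describe call itself fails (bad ID, auth error, and so on).
    if ! status="$(gcloud managed-kafka operations describe "$operation_id" \
        --location="$region" --format="value(done)")"; then
      echo "Could not describe operation $operation_id." >&2
      return 1
    fi
    if [ "$status" = "True" ]; then
      echo "Operation $operation_id finished."
      return 0
    fi
    echo "Operation $operation_id still running; checking again in ${interval}s."
    sleep "$interval"
  done
}
```

A finished operation is not necessarily a successful one, so it is still worth inspecting the full describe output for an error before moving on. The same helper could be used for the Connect cluster create operation later in this tutorial.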
Create a Kafka topic
After the Managed Service for Apache Kafka cluster is created, create a Kafka topic. Give the topic the same name as the BigQuery table.
Console
- Go to the Managed Service for Apache Kafka > Clusters page. 
- Click the name of the cluster. 
- In the cluster details page, click Create Topic. 
- In the Topic name box, enter the same name as your BigQuery table. 
- Click Create. 
gcloud
To create a topic, run the managed-kafka topics create command.
gcloud managed-kafka topics create TABLE_NAME \
  --cluster=KAFKA_CLUSTER \
  --location=REGION \
  --partitions=10 \
  --replication-factor=3
Replace the following:
- TABLE_NAME: the name of your BigQuery table, which is also the topic name
- KAFKA_CLUSTER: the name of the Kafka cluster
- REGION: the region where you created the Kafka cluster
Create a Connect cluster
In this step, you create a Connect cluster. Creating a Connect cluster can take up to 30 minutes.
Before you start this step, make sure the Managed Service for Apache Kafka cluster is fully created.
Console
- Go to the Managed Service for Apache Kafka > Connect Clusters page. 
- Click Create. 
- For the Connect cluster name, enter a string. Example: my-connect-cluster.
- For Primary Kafka cluster, select the Kafka cluster that you created earlier.
- Click Create. 
While the cluster is being created, the cluster state is Creating. When the cluster has finished being created, the state is Active.
gcloud
To create a Connect cluster, run the gcloud managed-kafka connect-clusters create command.
gcloud managed-kafka connect-clusters create CONNECT_CLUSTER \
  --location=REGION \
  --cpu=12 \
  --memory=12GiB \
  --primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
  --kafka-cluster=KAFKA_CLUSTER \
  --async
Replace the following:
- CONNECT_CLUSTER: a name for the Connect cluster
- REGION: the region where you created the Kafka cluster
- PROJECT_ID: your project ID
- SUBNET_NAME: the subnet where you created the Kafka cluster
- KAFKA_CLUSTER: the name of your Kafka cluster
The command runs asynchronously and returns an operation ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
To track the progress of the create operation, use the gcloud managed-kafka operations describe command:
gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION
For more information, see Monitor the cluster creation operation.
Grant IAM roles
Grant the BigQuery Data Editor Identity and Access Management (IAM) role to the Managed Kafka service account. This role allows the connector to write to the BigQuery table.
Console
- In the Google Cloud console, go to the IAM page. 
- Select Include Google-provided role grants. 
- Find the row for Managed Kafka Service Account and click Edit principal. 
- Click Add another role and select the role BigQuery Data Editor. 
- Click Save. 
For more information about granting roles, see Grant an IAM role by using the console.
gcloud
To grant IAM roles to the service account, run the gcloud projects add-iam-policy-binding command.
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
  --role=roles/bigquery.dataEditor
Replace the following:
- PROJECT_ID: your project ID
- PROJECT_NUMBER: your project number
To find your project number, use the gcloud projects describe command.
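The service account address follows the pattern shown in the command above, so it can be assembled from the project number. The helper below is a small sketch; the function name and the sample project number are made up for illustration.

```shell
#!/bin/sh
# Build the Managed Kafka service account address from a project number,
# following the pattern used in the add-iam-policy-binding command.
kafka_service_account() {
  echo "service-$1@gcp-sa-managedkafka.iam.gserviceaccount.com"
}

# Example with a made-up project number:
kafka_service_account 123456789012

# To look up your real project number instead, you could run:
#   PROJECT_NUMBER="$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')"
#   kafka_service_account "$PROJECT_NUMBER"
```

The result can then be passed to --member with the serviceAccount: prefix.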
Create a BigQuery sink connector
In this step, you create a BigQuery Sink connector. This connector reads messages from one or more topics and writes them to BigQuery.
Console
- Go to the Managed Service for Apache Kafka > Connect Clusters page. 
- Click the name of the Connect cluster. 
- Click Create connector. 
- For the Connector name, enter a string. Example: bigquery-connector.
- In the Connector plugin list, select BigQuery Sink.
- For Topics, select the topic that you created previously and click OK.
- For Dataset, enter the name of the BigQuery dataset, in the following format: PROJECT_ID.DATASET_NAME. Example: my-project.dataset1.
- In the Configurations edit box, add the following line: bigQueryPartitionDecorator=false
- Click Create. 
gcloud
To create a BigQuery Sink connector, run the gcloud managed-kafka connectors create command.
gcloud managed-kafka connectors create CONNECTOR_NAME \
  --location=REGION \
  --connect-cluster=CONNECT_CLUSTER \
  --configs=connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
value.converter=org.apache.kafka.connect.json.JsonConverter,\
value.converter.schemas.enable=false,\
tasks.max=3,\
project=PROJECT_ID,\
defaultDataset=DATASET_NAME,\
topics=TABLE_NAME,\
bigQueryPartitionDecorator=false
Replace the following:
- CONNECTOR_NAME: a name for the connector, such as bigquery-connector
- CONNECT_CLUSTER: the name of your Connect cluster
- REGION: the region where you created the Connect cluster
- PROJECT_ID: your project ID
- DATASET_NAME: the name of your BigQuery dataset.
- TABLE_NAME: the name of your Kafka topic, which is the same as the BigQuery table.
Setting the bigQueryPartitionDecorator configuration parameter to false prevents the connector from adding a partition decorator (such as "$yyyyMMdd") to the table name.
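The value passed to --configs is one long comma-separated string, which is easy to mistype. One way to keep it readable is to list the properties one per line and join them with commas. The function below is a sketch of that idea; its name and the sample arguments are illustrative, and the property list is copied from the command in this section.

```shell
#!/bin/sh
# Join the connector properties into the comma-separated form that
# --configs expects. Arguments: PROJECT_ID DATASET_NAME TABLE_NAME.
build_connector_configs() {
  project_id="$1"
  dataset="$2"
  topic="$3"
  # paste -s merges all input lines into one, separated by commas.
  paste -s -d ',' - <<EOF
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
tasks.max=3
project=$project_id
defaultDataset=$dataset
topics=$topic
bigQueryPartitionDecorator=false
EOF
}

# Example with placeholder values:
build_connector_configs my-project dataset1 my-table

# Possible use with the create command:
#   gcloud managed-kafka connectors create CONNECTOR_NAME \
#     --location=REGION --connect-cluster=CONNECT_CLUSTER \
#     --configs="$(build_connector_configs PROJECT_ID DATASET_NAME TABLE_NAME)"
```

Keeping one property per line also makes it simpler to diff configuration changes between connectors.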
View results
To view the results, send messages to the Kafka topic. Use the following format for the message body:
{ "name": "STRING_VALUE", "id": INTEGER_VALUE }
There are various ways to send messages to Managed Service for Apache Kafka, including:
- Use the Kafka command-line tools.
- Use a Java producer.
- Use a Python producer.
- Stream messages from Pub/Sub by using Kafka Connect.
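Whichever producer you choose, each message body has to match the table schema. The sketch below generates test messages in the format shown above; the function name is illustrative, and it deliberately handles only simple names (it does not escape quotes or backslashes) and plain non-negative integer IDs.

```shell
#!/bin/sh
# Print one JSON message matching the table schema (name: STRING, id: INTEGER).
# Limitations of this sketch: the name is not JSON-escaped, and the id must be
# a non-negative integer without leading zeros.
make_message() {
  case "$2" in
    ''|*[!0-9]*|0?*)
      echo "id must be a non-negative integer: $2" >&2
      return 1
      ;;
  esac
  printf '{"name": "%s", "id": %s}\n' "$1" "$2"
}

make_message "Alice" 1
make_message "Bob" 2
```

Each printed line is one message, so the output could be piped into a line-oriented producer such as the Kafka command-line tools once your client is configured to reach the cluster.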
To view the records in BigQuery, run a query on the table as follows:
Console
- Open the BigQuery page. 
- In the query editor, run the following query:
  SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
  Replace the following variables:
  - PROJECT_ID: the name of your Google Cloud project
  - DATASET_NAME: the name of your BigQuery dataset
  - TABLE_NAME: the name of your BigQuery table
 
gcloud
Use the bq query command to run a query on the table:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Replace the following variables:
- PROJECT_ID: the name of your Google Cloud project
- DATASET_NAME: the name of your BigQuery dataset
- TABLE_NAME: the name of your BigQuery table
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Console
- Delete the Connect cluster.
  - Go to the Managed Service for Apache Kafka > Connect Clusters page.
  - Select the Connect cluster and click Delete.
- Delete the Kafka cluster.
  - Go to the Managed Service for Apache Kafka > Clusters page.
  - Select the Kafka cluster and click Delete.
- Delete the BigQuery table and dataset.
  - Go to the BigQuery page.
  - In the Explorer pane, expand your project and select a dataset.
  - Expand the Actions option and click Delete.
  - In the Delete dataset dialog, type delete into the field, and then click Delete.
 
gcloud
- To delete the Connect cluster, use the gcloud managed-kafka connect-clusters delete command.
  gcloud managed-kafka connect-clusters delete CONNECT_CLUSTER \
    --location=REGION --async
- To delete the Kafka cluster, use the gcloud managed-kafka clusters delete command.
  gcloud managed-kafka clusters delete KAFKA_CLUSTER \
    --location=REGION --async
- To delete both the BigQuery dataset and the BigQuery table, use the bq rm command.
  bq rm --recursive --dataset PROJECT_ID:DATASET_NAME
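The three delete commands can be collected into one script. The following is a sketch only: the run helper and the DRY_RUN switch are conventions invented for this example, and by default the script prints the commands instead of running them, since deletion cannot be undone.

```shell
#!/bin/sh
# Print commands by default; set DRY_RUN=0 to actually run them.
DRY_RUN="${DRY_RUN:-1}"

run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Arguments: CONNECT_CLUSTER KAFKA_CLUSTER REGION PROJECT_ID DATASET_NAME
cleanup() {
  run gcloud managed-kafka connect-clusters delete "$1" --location="$3" --async
  run gcloud managed-kafka clusters delete "$2" --location="$3" --async
  run bq rm --recursive --dataset "$4:$5"
}

# Example with placeholder names (dry run):
cleanup my-connect-cluster my-kafka-cluster us-central1 my-project dataset1
```

Reviewing the dry-run output first is a cheap way to confirm that the names and region are the ones you intend to delete.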
What's next
- Troubleshoot a BigQuery sink connector.
- Learn more about BigQuery sink connectors.
- Learn more about Kafka Connect.