Stream a Kafka topic to Hive

Apache Kafka is an open source distributed streaming platform for real-time data pipelines and data integration. It provides an efficient and scalable streaming system for use in a variety of applications, including:

  • Real-time analytics
  • Stream processing
  • Log aggregation
  • Distributed messaging
  • Event streaming

Objectives

  1. Install Kafka on a Dataproc HA cluster with ZooKeeper (referred to in this tutorial as a "Dataproc Kafka cluster").

  2. Create fictitious customer data, then publish the data to a Kafka topic.

  3. Create Hive parquet and ORC tables in Cloud Storage to receive streamed Kafka topic data.

  4. Submit a PySpark job to subscribe to and stream the Kafka topic into Cloud Storage in parquet and ORC format.

  5. Run a query on the streamed Hive table data to count the streamed Kafka messages.

Costs

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

If you haven't already done so, create a Google Cloud project.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  9. Click Create.
  10. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    1. In the Get started section, do the following:
      • Enter a globally unique name that meets the bucket naming requirements.
      • To add a bucket label, expand the Labels section, click Add label, and specify a key and a value for your label.
    2. In the Choose where to store your data section, do the following:
      1. Select a Location type.
      2. From the Location type drop-down menu, choose the location where your bucket's data will be permanently stored.
      3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

        Set up cross-bucket replication

        1. In the Bucket menu, select a bucket.
        2. In the Replication settings section, click Configure to configure settings for the replication job.

          The Configure cross-bucket replication pane appears.

          • To filter the objects to replicate by object name prefix, enter the prefix of the objects that you want to include or exclude, then click Add a prefix.
          • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
          • Click Done.
    3. In the Choose how to store your data section, do the following:
      1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
      2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
    4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
    5. In the Choose how to protect object data section, do the following:
      • Select any of the options under Data protection that you want to set for your bucket.
        • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
        • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
        • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
          • To enable Object Retention Lock, click the Enable object retention checkbox.
          • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
      • To choose how your object data will be encrypted, expand the Data encryption section, and select a Data encryption method.
  11. Click Create.

Tutorial steps

Perform the following steps to create a Dataproc Kafka cluster and stream a Kafka topic into Cloud Storage in parquet or ORC format.

Copy the Kafka installation script to Cloud Storage

The kafka.sh initialization action script installs Kafka on a Dataproc cluster.

  1. Browse the code.

    #!/bin/bash
    # Copyright 2015 Google, Inc.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.

    set -euxo pipefail

    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"

    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''

    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }

    function recv_keys() {
      retry_apt_command "apt-get install -y gnupg2 &&\
        apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
    }

    function update_apt_get() {
      retry_apt_command "apt-get update"
    }

    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }

    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }

    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }

    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }

    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }

    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)

      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")

      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi

      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi

      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"

      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'

      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs

      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is a random number generated less than 10000.
        # 10000 is choosen since the max broker ID allowed being set is 10000.
        BROKER_ID=$((RANDOM % 10000))
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"

      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi

      wait_for_zookeeper

      # Start Kafka.
      service kafka-server restart

      wait_for_kafka
    }

    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi

      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }

    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived. In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable

      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');

      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in "$matched_files"; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }

    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package

      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }

    main

  2. Copy the kafka.sh initialization action script to your Cloud Storage bucket.

    1. Open Cloud Shell, then run the following command:

       gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/ 

      Make the following replacements:

      • REGION: kafka.sh is stored in public regionally-tagged buckets in Cloud Storage. Specify a geographically close Compute Engine region (for example, us-central1).
      • BUCKET_NAME: The name of your Cloud Storage bucket.
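
The broker ID assignment inside kafka.sh is easy to miss in the shell code. The following is a minimal Python sketch of that logic, not part of the tutorial itself: the function name broker_id and the example hostnames are hypothetical, and the worker branch assumes bash's RANDOM range of 0 to 32767.

```python
import random
import re
from typing import Optional

MASTER_BASE_ID = 10000  # kafka.sh starts master broker IDs at 10,000


def broker_id(role: str, hostname: str, rng: Optional[random.Random] = None) -> int:
    """Mirror the broker ID choice that kafka.sh makes for one node (sketch)."""
    if role == "Master":
        if hostname.endswith("-m"):
            # Non-HA cluster: a single master whose hostname ends in -m.
            return MASTER_BASE_ID
        # HA cluster: master hostnames end in -m-0, -m-1, -m-2.
        match = re.search(r"-m-(\d+)$", hostname)
        if match is None:
            raise ValueError(f"unexpected master hostname: {hostname}")
        return MASTER_BASE_ID + int(match.group(1))
    # Worker: the script uses $((RANDOM % 10000)); bash RANDOM is 0..32767.
    rng = rng or random.Random()
    return rng.randrange(32768) % 10000


print(broker_id("Master", "my-cluster-m-2"))
print(broker_id("Worker", "my-cluster-w-0"))
```

Because worker IDs are drawn at random, two workers could in principle draw the same ID; the sketch reproduces that behavior rather than correcting it.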

Create a Dataproc Kafka cluster

  1. Open Cloud Shell, then run the following gcloud dataproc clusters create command to create a Dataproc HA cluster that installs the Kafka and ZooKeeper components:

     gcloud dataproc clusters create KAFKA_CLUSTER \
         --project=PROJECT_ID \
         --region=REGION \
         --image-version=2.1-debian11 \
         --num-masters=3 \
         --enable-component-gateway \
         --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh

    Notes:

    • KAFKA_CLUSTER: The cluster name, which must be unique within a project. The name must start with a lowercase letter, and can contain up to 51 lowercase letters, numbers, and hyphens. It cannot end with a hyphen. The name of a deleted cluster can be reused.
    • PROJECT_ID: The project to associate with this cluster.
    • REGION: The Compute Engine region where the cluster will be located, such as us-central1.
      • You can add the optional --zone=ZONE flag to specify a zone within the specified region, such as us-central1-a. If you do not specify a zone, the Dataproc autozone placement feature selects a zone within the specified region.
    • --image-version: Dataproc image version 2.1-debian11 is recommended for this tutorial. Note: Each image version contains a set of pre-installed components, including the Hive component used in this tutorial (see Supported Dataproc image versions).
    • --num-masters: 3 master nodes create an HA cluster. The ZooKeeper component, which is required by Kafka, is pre-installed on an HA cluster.
    • --enable-component-gateway: Enables the Dataproc Component Gateway.
    • BUCKET_NAME: The name of your Cloud Storage bucket that contains the /scripts/kafka.sh initialization script (see Copy the Kafka installation script to Cloud Storage).
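
The cluster naming rule in the notes above can be checked before you run the create command. The following is a minimal sketch, assuming the 51-character limit applies to the whole name (the stricter reading of the rule); the function name is_valid_cluster_name is hypothetical and is not part of gcloud.

```python
import re

# Assumption: the 51-character limit is read as the total length of the name.
# Pattern: one lowercase letter, then up to 49 letters/digits/hyphens,
# then a final letter or digit (so the name cannot end with a hyphen).
_CLUSTER_NAME_RE = re.compile(r"[a-z](?:[a-z0-9-]{0,49}[a-z0-9])?")


def is_valid_cluster_name(name: str) -> bool:
    """Return True if name satisfies the naming rule described in the notes."""
    return _CLUSTER_NAME_RE.fullmatch(name) is not None


for candidate in ("kafka-cluster", "Kafka-cluster", "kafka-", "1kafka"):
    print(candidate, is_valid_cluster_name(candidate))
```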

Create a Kafka custdata topic

To create a Kafka topic on the Dataproc Kafka cluster:

  1. Use the SSH utility to open a terminal window on the cluster master VM.

  2. Create a Kafka custdata topic.

     /usr/lib/kafka/bin/kafka-topics.sh \
         --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
         --create --topic custdata

    Notes:

    • KAFKA_CLUSTER: Insert the name of your Kafka cluster. -w-0:9092 signifies the Kafka broker running on port 9092 on the worker-0 node.

    • You can run the following commands after creating the custdata topic:

       # List all topics.
       /usr/lib/kafka/bin/kafka-topics.sh \
           --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
           --list

       # Consume then display topic data.
       /usr/lib/kafka/bin/kafka-console-consumer.sh \
           --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
           --topic custdata

       # Count the number of messages in the topic.
       /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
           --broker-list KAFKA_CLUSTER-w-0:9092 \
           --topic custdata

       # Delete topic.
       /usr/lib/kafka/bin/kafka-topics.sh \
           --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
           --delete --topic custdata
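
GetOffsetShell prints one topic:partition:offset line per partition, so counting the messages in a topic with several partitions means summing the offsets. The following is a minimal sketch of that sum; the function name count_messages is hypothetical, and the result equals the message count only if no messages have been removed from the topic by retention.

```python
def count_messages(offset_output: str, topic: str) -> int:
    """Sum the end offsets that GetOffsetShell reports for one topic."""
    total = 0
    for line in offset_output.splitlines():
        parts = line.strip().rsplit(":", 2)
        # Skip blank lines and anything that is not topic:partition:offset.
        if len(parts) != 3 or not parts[2].isdigit():
            continue
        name, _partition, offset = parts
        if name == topic:
            total += int(offset)
    return total


sample = "custdata:0:4000\ncustdata:1:6000\n"
print(count_messages(sample, "custdata"))
```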

Publish content to the Kafka custdata topic

The following script uses the kafka-console-producer.sh Kafka tool to generate fictitious customer data in CSV format.

  1. Copy, then paste the script in the SSH terminal on the master node of your Kafka cluster. Press <return> to run the script.

     for i in {1..10000}; do \
     custname="cust name${i}"
     uuid=$(dbus-uuidgen)
     age=$((45 + $RANDOM % 45))
     amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
     message="${uuid}:${custname},${age},${amount}"
     echo ${message}
     done | /usr/lib/kafka/bin/kafka-console-producer.sh \
     --broker-list KAFKA_CLUSTER-w-0:9092 \
     --topic custdata \
     --property "parse.key=true" \
     --property "key.separator=:"

    Notes:

    • KAFKA_CLUSTER: The name of your Kafka cluster.
  2. Run the following Kafka command to confirm the custdata topic contains 10,000 messages.

     /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
     --broker-list KAFKA_CLUSTER-w-0:9092 \
     --topic custdata

    Notes:

    • KAFKA_CLUSTER: The name of your Kafka cluster.

    Expected output:

     custdata:0:10000 
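
Each line that the generator script in step 1 pipes to the producer has the form KEY:custname,age,amount, where the text before the first colon becomes the Kafka message key. The following is a minimal Python sketch of the same message layout, useful for inspecting the format offline. The function name make_message is hypothetical, uuid4().hex stands in for dbus-uuidgen, and the ranges assume bash's RANDOM range of 0 to 32767.

```python
import random
import uuid


def make_message(i: int, rng: random.Random) -> str:
    """Build one record in the layout the shell generator produces."""
    custname = f"cust name{i}"
    key = uuid.uuid4().hex  # 32 hex characters, no hyphens or colons
    age = 45 + rng.randrange(32768) % 45  # 45..89
    amount = f"{rng.randrange(32768) % 99999}.{rng.randrange(32768) % 99}"
    # With parse.key=true and key.separator=":", the producer splits the
    # line at the first colon into the message key and the message value.
    return f"{key}:{custname},{age},{amount}"


rng = random.Random()
for i in range(1, 4):
    print(make_message(i, rng))
```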

Create Hive tables in Cloud Storage

Create Hive tables to receive streamed Kafka topic data. Perform the following steps to create the cust_parquet (parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket.

  1. Insert your BUCKET_NAME in the following script, then copy and paste the script into the SSH terminal on your Kafka cluster master node, then press <return> to create a ~/hivetables.hql (Hive Query Language) script.

    You will run the ~/hivetables.hql script in the next step to create parquet and ORC Hive tables in your Cloud Storage bucket.

     cat > ~/hivetables.hql <<EOF
     drop table if exists cust_parquet;
     create external table if not exists cust_parquet
     (uuid string, custname string, age string, amount string)
     row format delimited fields terminated by ','
     stored as parquet
     location "gs://BUCKET_NAME/tables/cust_parquet";

     drop table if exists cust_orc;
     create external table if not exists cust_orc
     (uuid string, custname string, age string, amount string)
     row format delimited fields terminated by ','
     stored as orc
     location "gs://BUCKET_NAME/tables/cust_orc";
     EOF
  2. In the SSH terminal on the master node of your Kafka cluster, submit the ~/hivetables.hql Hive job to create the cust_parquet (parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket.

     gcloud dataproc jobs submit hive \
         --cluster=KAFKA_CLUSTER \
         --region=REGION \
         -f ~/hivetables.hql

    Notes:

    • The Hive component is pre-installed on the Dataproc Kafka cluster. See 2.1.x release versions for a list of the Hive component versions included in recently released 2.1 images.
    • KAFKA_CLUSTER: The name of your Kafka cluster.
    • REGION: The region where your Kafka cluster is located.
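
The two table definitions in hivetables.hql differ only in the storage format and the table location. As an alternative to editing BUCKET_NAME by hand, the following minimal sketch renders the same statements from a template. The names DDL_TEMPLATE and render_hivetables and the bucket my-bucket are hypothetical.

```python
DDL_TEMPLATE = """\
drop table if exists cust_{fmt};
create external table if not exists cust_{fmt}
(uuid string, custname string, age string, amount string)
row format delimited fields terminated by ','
stored as {fmt}
location "gs://{bucket}/tables/cust_{fmt}";
"""


def render_hivetables(bucket: str, formats=("parquet", "orc")) -> str:
    """Render the Hive DDL for each output format used in this tutorial."""
    return "\n".join(DDL_TEMPLATE.format(fmt=fmt, bucket=bucket) for fmt in formats)


# Hypothetical bucket name; replace it with your own bucket.
print(render_hivetables("my-bucket"))
```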

Stream Kafka custdata to Hive tables

  1. Run the following command in the SSH terminal on the master node of your Kafka cluster to install the kafka-python library. A Kafka client is needed to stream Kafka topic data to Cloud Storage.
     pip install kafka-python 
  2. Insert your BUCKET_NAME, then copy and paste the following PySpark code into the SSH terminal on your Kafka cluster master node, and then press <return> to create a streamdata.py file.

    The script subscribes to the Kafka custdata topic, then streams the data to your Hive tables in Cloud Storage. The output format, which can be parquet or ORC, is passed into the script as a parameter.

    cat > streamdata.py <<EOF
    #!/bin/python

    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer

    # The message value has the form "custname,age,amount".
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]

    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)

        # Subscribe to the custdata topic, starting from the earliest offset.
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()

        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF = udf(getAgeFn, StringType())
        getAmtUDF = udf(getAmtFn, StringType())

        logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt)

        # The message key becomes uuid; the value is split into three columns.
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF (col("value").cast("string")).alias("custname"),\
            getAgeUDF (col("value").cast("string")).alias("age"),\
            getAmtUDF (col("value").cast("string")).alias("amount"))

        writer = query.writeStream.format(outputfmt)\
            .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
            .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()

        writer.awaitTermination()

    if __name__=="__main__":
        # Both the cluster name and the output format are required.
        if len(sys.argv) < 3:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster format")
            print ("e.g.: ", sys.argv[0], " <cluster_name> orc")
            print ("e.g.: ", sys.argv[0], " <cluster_name> parquet")
            sys.exit(1)
        main(sys.argv[1], sys.argv[2])
    EOF
  3. In the SSH terminal on the master node of your Kafka cluster, run spark-submit to stream data to your Hive tables in Cloud Storage.

    1. Insert the name of your KAFKA_CLUSTER and the output FORMAT, then copy and paste the following code into the SSH terminal on the master node of your Kafka cluster, and then press <return> to run the code and stream the Kafka custdata data in the specified format to your Hive tables in Cloud Storage.

       spark-submit --packages \
       org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
           --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
           --conf spark.driver.memory=4096m \
           --conf spark.executor.cores=2 \
           --conf spark.executor.instances=2 \
           --conf spark.executor.memory=6144m \
           streamdata.py KAFKA_CLUSTER FORMAT

      Notes:

      • KAFKA_CLUSTER: Insert the name of your Kafka cluster.
      • FORMAT: Specify either parquet or orc as the output format. You can run the command successively to stream both formats to the Hive tables: for example, in the first invocation, specify parquet to stream the Kafka custdata topic to the Hive parquet table; then, in the second invocation, specify orc to stream custdata to the Hive ORC table.
  4. After standard output halts in the SSH terminal, which signifies that all of the custdata has been streamed, press <control-c> in the SSH terminal to stop the process.

  5. List the Hive tables in Cloud Storage.

     gcloud storage ls gs://BUCKET_NAME/tables/* --recursive 

    Notes:

    • BUCKET_NAME: Insert the name of the Cloud Storage bucket that contains your Hive tables (see Create Hive tables).
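
The column mapping that streamdata.py applies to each Kafka record, and the Cloud Storage paths it writes to, can be sketched without Spark. The following is a minimal plain-Python illustration, not the streaming job itself: the names CustomerRow, to_row, and sink_paths and the bucket my-bucket are hypothetical.

```python
from typing import NamedTuple, Tuple


class CustomerRow(NamedTuple):
    uuid: str
    custname: str
    age: str
    amount: str


def to_row(key: bytes, value: bytes) -> CustomerRow:
    """Map one Kafka record to the four string columns of the Hive tables."""
    # The key is the uuid; the value is "custname,age,amount".
    fields = value.decode("utf-8").split(",")
    return CustomerRow(key.decode("utf-8"), fields[0], fields[1], fields[2])


def sink_paths(bucket: str, fmt: str) -> Tuple[str, str]:
    """Return the (table path, checkpoint path) pair used for one format."""
    if fmt not in ("parquet", "orc"):
        raise ValueError("format must be parquet or orc")
    return (f"gs://{bucket}/tables/cust_{fmt}", f"gs://{bucket}/chkpt/{fmt}wr")


print(to_row(b"0123abcd", b"cust name1,50,123.45"))
print(sink_paths("my-bucket", "orc"))
```

Each output format gets its own checkpoint path, which is why the parquet and ORC runs can each read the topic from the earliest offset independently.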

Query streamed data

  1. In the SSH terminal on the master node of your Kafka cluster, run the following hive command to count the streamed Kafka custdata messages in the Hive tables in Cloud Storage.

     hive -e "select count(1) from TABLE_NAME" 

    Notes:

    • TABLE_NAME: Specify either cust_parquet or cust_orc as the Hive table name.

    Expected output snippet:

    ...
    Status: Running (Executing on YARN cluster with App id application_....)

    ----------------------------------------------------------------------------------------------
            VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
    ----------------------------------------------------------------------------------------------
    Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
    Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
    ----------------------------------------------------------------------------------------------
    VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
    ----------------------------------------------------------------------------------------------
    OK
    10000
    Time taken: 21.394 seconds, Fetched: 1 row(s)

Clean up

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete resources

  • In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  • Click the checkbox for the bucket that you want to delete.
  • To delete the bucket, click Delete, and then follow the instructions.
  • Delete your Kafka cluster:
     gcloud dataproc clusters delete KAFKA_CLUSTER \
         --region=${REGION}