Parallel Report Download

  • The code samples demonstrate parallel downloading of Google Ads reports for multiple customer IDs using the Google Ads API; depending on the language, the requests run concurrently via asynchronous calls, threads, or separate processes.

  • Key functionalities include configuration, query definition, parallel execution, result handling, error management, and console output summarizing the download outcomes.

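  • Implementations are provided in Java, C#, Python, Ruby, and Perl, each leveraging language-specific features for parallel processing. The Perl sample uses regular (paged) search requests instead of streaming, because the JSON::SL module that its stream handler depends on is not expected to work across threads.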

  • Important considerations include Google Ads API rate limits, resource management when using multi-threading/multiprocessing, and data synchronization for shared resources.

  • The primary goal is to improve efficiency by dividing the workload into smaller, concurrently executed tasks, while incorporating robust error handling and result aggregation.

Java

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.ads.googleads.examples.reporting;

import com.beust.jcommander.Parameter;
import com.google.ads.googleads.examples.utils.ArgumentNames;
import com.google.ads.googleads.examples.utils.CodeSampleParams;
import com.google.ads.googleads.lib.GoogleAdsClient;
import com.google.ads.googleads.v22.errors.GoogleAdsError;
import com.google.ads.googleads.v22.errors.GoogleAdsException;
import com.google.ads.googleads.v22.services.GoogleAdsServiceClient;
import com.google.ads.googleads.v22.services.SearchGoogleAdsStreamRequest;
import com.google.ads.googleads.v22.services.SearchGoogleAdsStreamResponse;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Shows how to download a set of reports from a list of accounts in parallel.
 *
 * <p>If you need to obtain a list of accounts, please see the {@link
 * com.google.ads.googleads.examples.accountmanagement.GetAccountHierarchy} or {@link
 * com.google.ads.googleads.examples.accountmanagement.ListAccessibleCustomers} examples.
 */
public class ParallelReportDownload {

  // Adjust as required.
  /** Defines the Google Ads Query Language (GAQL) query strings to run for each customer ID. */
  private static final List<String> GAQL_QUERY_STRINGS =
      ImmutableList.of(
          "SELECT campaign.id, metrics.impressions, metrics.clicks"
              + " FROM campaign"
              + " WHERE segments.date DURING LAST_30_DAYS",
          "SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks"
              + " FROM ad_group"
              + " WHERE segments.date DURING LAST_30_DAYS");

  /** Command line parameters accepted by this example. */
  private static class ParallelReportDownloadParams extends CodeSampleParams {

    @Parameter(
        names = ArgumentNames.CUSTOMER_IDS,
        required = true,
        description = "Specify a comma-separated list of customer IDs to downloads reports from.")
    List<Long> customerIds;

    @Parameter(
        names = ArgumentNames.LOGIN_CUSTOMER_ID,
        description =
            "Optionally specify the manager account ID which provides access to the customer IDs")
    Long loginCustomerId;
  }

  /**
   * Parses the command line, builds the {@link GoogleAdsClient} and runs the example.
   *
   * @param args the command line arguments.
   * @throws InterruptedException if the thread is interrupted while waiting for the reports.
   */
  public static void main(String[] args) throws InterruptedException {
    ParallelReportDownloadParams params = new ParallelReportDownloadParams();
    if (!params.parseArguments(args)) {

      // Either pass the required parameters for this example on the command line, or insert them
      // into the code here. See the parameter class definition above for descriptions.
      params.customerIds = ImmutableList.of(Long.valueOf("INSERT CUSTOMER IDS"));

      // Optionally specify the login customer ID if your access to the CIDs is via a manager
      // account.
      // params.loginCustomerId = Long.parseLong("INSERT_LOGIN_CUSTOMER_ID");
    }

    GoogleAdsClient googleAdsClient = null;
    try {
      GoogleAdsClient.Builder builder = GoogleAdsClient.newBuilder().fromPropertiesFile();
      if (params.loginCustomerId != null) {
        builder.setLoginCustomerId(params.loginCustomerId);
      }
      googleAdsClient = builder.build();
    } catch (FileNotFoundException fnfe) {
      System.err.printf(
          "Failed to load GoogleAdsClient configuration from file. Exception: %s%n", fnfe);
      return;
    } catch (IOException ioe) {
      System.err.printf("Failed to create GoogleAdsClient. Exception: %s%n", ioe);
      return;
    }

    try {
      new ParallelReportDownload().runExample(googleAdsClient, params.customerIds);
    } catch (GoogleAdsException gae) {
      // GoogleAdsException is the base class for most exceptions thrown by an API request.
      // Instances of this exception have a message and a GoogleAdsFailure that contains a
      // collection of GoogleAdsErrors that indicate the underlying causes of the
      // GoogleAdsException.
      System.err.printf(
          "Request ID %s failed due to GoogleAdsException. Underlying errors:%n",
          gae.getRequestId());
      int i = 0;
      for (GoogleAdsError googleAdsError : gae.getGoogleAdsFailure().getErrorsList()) {
        System.err.printf("  Error %d: %s%n", i++, googleAdsError);
      }
    }
  }

  /**
   * Runs the example.
   *
   * <p>For each query in {@link #GAQL_QUERY_STRINGS}, starts one streaming request per customer
   * ID, waits for all of them to finish and prints one {@link ReportSummary} per customer ID.
   *
   * @param googleAdsClient the client library instance for API access.
   * @param customerIds the customer IDs to run against.
   * @throws InterruptedException if the thread is interrupted while waiting for the reports.
   */
  private void runExample(GoogleAdsClient googleAdsClient, List<Long> customerIds)
      throws InterruptedException {
    // Creates a single client which can be shared by all threads.
    // gRPC handles multiplexing parallel requests to the underlying API connection.
    try (GoogleAdsServiceClient serviceClient =
        googleAdsClient.getLatestVersion().createGoogleAdsServiceClient()) {
      // IMPORTANT: You should avoid hitting the same customer ID in parallel. There are rate limits
      // at the customer ID level which are much stricter than limits at the developer token level.
      // Hitting these limits frequently enough will significantly reduce throughput as the client
      // library will automatically retry with exponential back-off before failing the request.
      for (String gaqlQuery : GAQL_QUERY_STRINGS) {
        // Uses a list of futures to make sure that we wait for this report to complete on all
        // customer IDs before proceeding. The Future data type here is just for demonstration.
        List<ListenableFuture<ReportSummary>> futures = new ArrayList<>();

        // Uses the API to retrieve the report for each customer ID.
        for (Long customerId : customerIds) {
          // Uses the gRPC asynchronous API to download the reports in parallel. This saves having
          // to create/manage our own thread pool.
          ResponseCountingObserver responseObserver = new ResponseCountingObserver(customerId);

          // Starts the report download in a background thread.
          serviceClient
              .searchStreamCallable()
              .call(
                  SearchGoogleAdsStreamRequest.newBuilder()
                      .setCustomerId(customerId.toString())
                      .setQuery(gaqlQuery)
                      .build(),
                  responseObserver);

          // Stores a future to retrieve the results.
          futures.add(responseObserver.asFuture());
        }

        // Waits for all pending requests to the current set of customer IDs to complete.
        //
        // This is a naive implementation for illustrative purposes. It is possible to optimize the
        // utilization of each customer ID by providing a queue of work (or similar). However, this
        // would complicate the example code and so is omitted here.
        List<ReportSummary> results = Futures.allAsList(futures).get();

        System.out.println("Report results for query: " + gaqlQuery);
        results.forEach(System.out::println);
      }
    } catch (ExecutionException e) {
      // The observer reports request failures as a ReportSummary value rather than failing the
      // future, so reaching this point indicates an unexpected problem. Keep the cause attached.
      throw new RuntimeException("Unexpected failure while waiting for report downloads", e);
    }
  }

  /** An observer which records a simple count of the result rows received. */
  private static class ResponseCountingObserver
      implements ResponseObserver<SearchGoogleAdsStreamResponse> {

    private final long customerId;
    private final SettableFuture<ReportSummary> future = SettableFuture.create();
    private final AtomicLong numResponses = new AtomicLong(0);

    ResponseCountingObserver(long customerId) {
      this.customerId = customerId;
    }

    @Override
    public void onStart(StreamController controller) {
      // Nothing to do here.
    }

    @Override
    public void onResponse(SearchGoogleAdsStreamResponse response) {
      // Does something useful with the response. In this case we just count the responses, but
      // could also write the response to a database/file, pass the response on to another method
      // for further processing, etc.
      numResponses.incrementAndGet();
      // Note: this method may be called from multiple threads, though responses will always arrive
      // in the same order as returned by the API.
    }

    @Override
    public void onError(Throwable t) {
      // Notify that this report failed.
      notifyResultReady(new ReportSummary(customerId, numResponses.get(), t));
    }

    @Override
    public void onComplete() {
      // Notify that this report succeeded.
      notifyResultReady(new ReportSummary(customerId, numResponses.get()));
    }

    /** Sets the value on the future and unblocks any threads waiting for result. */
    private void notifyResultReady(ReportSummary summary) {
      future.set(summary);
    }

    /** Gets a {@link ListenableFuture} which represents the result of this stream. */
    ListenableFuture<ReportSummary> asFuture() {
      return future;
    }
  }

  /** Summarizes the result of a reporting API call. */
  private static class ReportSummary {

    private final long customerId;
    private final long numResponses;
    /** The failure cause, or {@code null} if the report completed successfully. */
    private final Throwable throwable;

    ReportSummary(long customerId, long numResponses, Throwable throwable) {
      this.customerId = customerId;
      this.throwable = throwable;
      this.numResponses = numResponses;
    }

    ReportSummary(long customerId, long numResponses) {
      this(customerId, numResponses, null);
    }

    /** Returns true if no failure was recorded for this report. */
    boolean isSuccess() {
      return throwable == null;
    }

    @Override
    public String toString() {
      return "Customer ID '"
          + customerId
          + "' Number of responses: "
          + numResponses
          + " IsSuccess? "
          + (isSuccess() ? "Yes!" : "No :-( Why? " + describeFailure());
    }

    /**
     * Describes the recorded failure. Falls back to the throwable's string form when it carries no
     * message, so the summary never prints a bare "null" as the reason.
     */
    private String describeFailure() {
      String message = throwable.getMessage();
      return message != null ? message : throwable.toString();
    }
  }
}

C#

// Copyright 2020 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using CommandLine;
using Google.Ads.Gax.Examples;
using Google.Ads.GoogleAds.Lib;
using Google.Ads.GoogleAds.V22.Errors;
using Google.Ads.GoogleAds.V22.Services;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Google.Ads.GoogleAds.Examples.V22
{
    /// <summary>
    /// Shows how to download a set of reports from a list of accounts in parallel. If you need
    /// to obtain a list of accounts, please see the GetAccountHierarchy or
    /// ListAccessibleCustomers examples.
    /// </summary>
    public class ParallelReportDownload : ExampleBase
    {
        /// <summary>
        /// Command line options for running the <see cref="ParallelReportDownload"/> example.
        /// </summary>
        public class Options : OptionsBase
        {
            /// <summary>
            /// The Google Ads customer IDs to download reports from.
            /// </summary>
            [Option("customerIds", Required = true, HelpText =
                "The Google Ads customer IDs for which the call is made.")]
            public IEnumerable<long> CustomerIds { get; set; }

            /// <summary>
            /// Optional login customer ID if your access to the CIDs is via a manager account.
            /// </summary>
            [Option("loginCustomerId", Required = false, HelpText =
                "Optional login customer ID if your access to the CIDs is via a manager account.")]
            public long? LoginCustomerId { get; set; }
        }

        /// <summary>
        /// Main method, to run this code example as a standalone application.
        /// </summary>
        /// <param name="args">The command line arguments.</param>
        public static void Main(string[] args)
        {
            Options options = ExampleUtilities.ParseCommandLine<Options>(args);

            ParallelReportDownload codeExample = new ParallelReportDownload();
            Console.WriteLine(codeExample.Description);
            codeExample.Run(new GoogleAdsClient(), options.CustomerIds.ToArray(),
                options.LoginCustomerId);
        }

        // Defines the Google Ads Query Language (GAQL) query strings to run for each customer ID.
        private readonly Dictionary<string, string> GAQL_QUERY_STRINGS =
            new Dictionary<string, string>()
            {
                {
                    "Campaign Query",
                    @"SELECT campaign.id, metrics.impressions, metrics.clicks
                      FROM campaign
                      WHERE segments.date DURING LAST_30_DAYS"
                },
                {
                    "Ad Group Query",
                    @"SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks
                      FROM ad_group
                      WHERE segments.date DURING LAST_30_DAYS"
                }
            };

        /// <summary>
        /// Returns a description about the code example.
        /// </summary>
        public override string Description =>
            "Shows how to download a set of reports from a list of accounts in parallel. If you " +
            "need to obtain a list of accounts, please see the GetAccountHierarchy or " +
            "ListAccessibleCustomers examples.";

        /// <summary>
        /// Runs the code example.
        /// </summary>
        /// <param name="client">The Google Ads client.</param>
        /// <param name="customerIds">The Google Ads customer IDs to download reports from.
        /// </param>
        /// <param name="loginCustomerId">Optional login customer ID if your access to the CIDs
        /// is via a manager account.</param>
        public void Run(GoogleAdsClient client, long[] customerIds, long? loginCustomerId)
        {
            // If a manager ID is supplied, update the login credentials.
            if (loginCustomerId.HasValue)
            {
                client.Config.LoginCustomerId = loginCustomerId.ToString();
            }

            // Get the GoogleAdsService. A single client can be shared by all threads.
            GoogleAdsServiceClient googleAdsService = client.GetService(
                Services.V22.GoogleAdsService);

            try
            {
                // Begin downloading reports and block program termination until complete.
                Task task = RunDownloadParallelAsync(googleAdsService, customerIds);
                task.Wait();
            }
            catch (AggregateException ae)
            {
                // Task.Wait() reports failures wrapped in an AggregateException (possibly
                // nested), so flatten it before looking for a GoogleAdsException to print.
                GoogleAdsException e = GoogleAdsException.FromTaskException(ae.Flatten());
                if (e != null)
                {
                    PrintGoogleAdsException(e);
                }
                throw;
            }
            catch (GoogleAdsException e)
            {
                PrintGoogleAdsException(e);
                throw;
            }
        }

        /// <summary>
        /// Prints the details of a failed API request to the console.
        /// </summary>
        /// <param name="e">The exception to describe.</param>
        private static void PrintGoogleAdsException(GoogleAdsException e)
        {
            Console.WriteLine("Failure:");
            Console.WriteLine($"Message: {e.Message}");
            Console.WriteLine($"Failure: {e.Failure}");
            Console.WriteLine($"Request ID: {e.RequestId}");
        }

        /// <summary>
        /// Initiate all download requests, wait for their completion, and report the results.
        /// </summary>
        /// <param name="googleAdsService">The Google Ads service.</param>
        /// <param name="customerIds">The list of customer IDs from which to request data.</param>
        /// <returns>The asynchronous operation.</returns>
        private async Task RunDownloadParallelAsync(
            GoogleAdsServiceClient googleAdsService, long[] customerIds)
        {
            // List of all requests to ensure that we wait for the reports to complete on all
            // customer IDs before proceeding.
            ConcurrentBag<Task<bool>> tasks =
                new ConcurrentBag<Task<bool>>();

            // Collection of downloaded responses.
            ConcurrentBag<ReportDownload> responses = new ConcurrentBag<ReportDownload>();

            // IMPORTANT: You should avoid hitting the same customer ID in parallel. There are rate
            // limits at the customer ID level which are much stricter than limits at the developer
            // token level. Hitting these limits frequently enough will significantly reduce
            // throughput as the client library will automatically retry with exponential back-off
            // before failing the request.
            //
            // For that reason the queries are run one after another, and only the customer IDs
            // within a single query are processed in parallel.
            foreach (KeyValuePair<string, string> query in GAQL_QUERY_STRINGS)
            {
                Parallel.ForEach(customerIds, customerId =>
                {
                    Console.WriteLine($"Requesting {query.Key} for CID {customerId}.");

                    // Issue a search request and add it to the list of requests.
                    tasks.Add(DownloadReportAsync(googleAdsService, customerId, query.Key,
                        query.Value, responses));
                });
            }

            Console.WriteLine($"Awaiting results from {tasks.Count} requests...\n");

            // Proceed only when all requests have completed.
            await Task.WhenAll(tasks);

            // Give a summary report for each download, whether it succeeded or failed.
            foreach (ReportDownload reportDownload in responses)
            {
                Console.WriteLine(reportDownload);
            }
        }

        /// <summary>
        /// Downloads one report and records the outcome in <paramref name="responses"/>.
        /// </summary>
        /// <remarks>
        /// The download is complete by the time this method returns; the returned task is
        /// already finished and only carries the success flag.
        /// </remarks>
        /// <param name="googleAdsService">The Google Ads service client.</param>
        /// <param name="customerId">The customer ID from which data is requested.</param>
        /// <param name="queryKey">The name of the query to be downloaded.</param>
        /// <param name="queryValue">The query for the download request.</param>
        /// <param name="responses">Collection receiving one entry per stream response
        /// received, or one entry describing the failure.</param>
        /// <returns>A completed task whose result is true if the download succeeded and false
        /// if it failed with an <see cref="AggregateException"/>.</returns>
        private Task<bool> DownloadReportAsync(
            GoogleAdsServiceClient googleAdsService, long customerId, string queryKey,
            string queryValue, ConcurrentBag<ReportDownload> responses)
        {
            try
            {
                // Issue the download request.
                googleAdsService.SearchStream(
                    customerId.ToString(), queryValue,
                    delegate (SearchGoogleAdsStreamResponse resp)
                    {
                        // Store the results.
                        responses.Add(new ReportDownload()
                        {
                            CustomerId = customerId,
                            QueryKey = queryKey,
                            Response = resp
                        });
                    }
                );
                return Task.FromResult(true);
            }
            catch (AggregateException ae)
            {
                Console.WriteLine($"Download failed for {queryKey} and CID {customerId}!");

                GoogleAdsException gae = GoogleAdsException.FromTaskException(ae);

                var download = new ReportDownload()
                {
                    CustomerId = customerId,
                    QueryKey = queryKey
                };
                if (gae != null)
                {
                    Console.WriteLine($"Message: {gae.Message}");
                    Console.WriteLine($"Failure: {gae.Failure}");
                    Console.WriteLine($"Request ID: {gae.RequestId}");
                    download.Exception = gae;
                }
                else
                {
                    // Not an API failure: keep the original exception so it is still reported.
                    download.Exception = ae;
                }

                responses.Add(download);
                return Task.FromResult(false);
            }
        }

        /// <summary>
        /// Stores a result from a reporting API call. In this case we simply report a count of
        /// the responses, but one could also write the response to a database/file, pass the
        /// response on to another method for further processing, etc.
        /// </summary>
        private class ReportDownload
        {
            internal long CustomerId { get; set; }

            internal string QueryKey { get; set; }

            internal SearchGoogleAdsStreamResponse Response { get; set; }

            internal Exception Exception { get; set; }

            public override string ToString()
            {
                if (Exception != null)
                {
                    return $"Download failed for {QueryKey} and CID {CustomerId}. " +
                        $"Exception: {Exception}";
                }
                else
                {
                    return $"{QueryKey} downloaded for CID {CustomerId}: " +
                        $"{Response.Results.Count} rows returned.";
                }
            }
        }
    }
}

PHP

This is not applicable to PHP because multi-threading cannot be used in a web server environment. 

Python

#!/usr/bin/env python
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shows how to download in parallel a set of reports from a list of accounts.

If you need to obtain a list of accounts, please see the
account_management/get_account_hierarchy.py or
account_management/list_accessible_customers.py examples.
"""

import argparse
from itertools import product
import multiprocessing
import time
from typing import Any, Dict, Iterable, List, Tuple

from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.errors import GoogleAdsException
from google.ads.googleads.v22.errors.types import (
    ErrorLocation,
    GoogleAdsError,
)
from google.ads.googleads.v22.services.services.google_ads_service import (
    GoogleAdsServiceClient,
)
from google.ads.googleads.v22.services.types import (
    GoogleAdsRow,
    SearchGoogleAdsStreamResponse,
)

# Maximum number of processes to spawn.
MAX_PROCESSES: int = multiprocessing.cpu_count()
# Timeout between retries in seconds.
BACKOFF_FACTOR: int = 5
# Maximum number of retries for errors.
MAX_RETRIES: int = 5


def main(client: GoogleAdsClient, customer_ids: List[str]) -> None:
    """Runs every report query against every customer ID and prints a summary.

    Args:
        client: an initialized GoogleAdsClient instance.
        customer_ids: an array of client customer IDs.
    """
    # The GAQL queries that are executed for each customer ID.
    campaign_query: str = """
        SELECT campaign.id, metrics.impressions, metrics.clicks
        FROM campaign
        WHERE segments.date DURING LAST_30_DAYS"""
    ad_group_query: str = """
        SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks
        FROM ad_group
        WHERE segments.date DURING LAST_30_DAYS"""

    task_args: Iterable[Tuple[GoogleAdsClient, str, str]] = generate_inputs(
        client, customer_ids, [campaign_query, ad_group_query]
    )

    with multiprocessing.Pool(MAX_PROCESSES) as pool:
        # Fan the (client, customer ID, query) combinations out over the
        # worker processes; each one runs issue_search_request.
        outcomes: List[Tuple[bool, Dict[str, Any]]] = pool.starmap(
            issue_search_request, task_args
        )

        # Split the outcomes by their success flag.
        successes: List[Dict[str, Any]] = [
            payload for succeeded, payload in outcomes if succeeded
        ]
        failures: List[Dict[str, Any]] = [
            payload for succeeded, payload in outcomes if not succeeded
        ]

        # Report the totals first, then the details.
        print(
            f"Total successful results: {len(successes)}\n"
            f"Total failed results: {len(failures)}\n"
        )

        if successes:
            print("Successes:")
        for succeeded_report in successes:
            # "results" holds the result strings for one customer ID / query
            # combination.
            print("\n".join(succeeded_report["results"]))

        if failures:
            print("Failures:")
        for failed_report in failures:
            ex: GoogleAdsException = failed_report["exception"]
            print(
                f'Request with ID "{ex.request_id}" failed with status '
                f'"{ex.error.code().name}" for customer_id '
                f'{failed_report["customer_id"]} and query '
                f'"{failed_report["query"]}" and '
                "includes the following errors:"
            )
            api_error: GoogleAdsError
            for api_error in ex.failure.errors:
                print(f'\tError with message "{api_error.message}".')
                if not api_error.location:
                    continue
                path_element: ErrorLocation.FieldPathElement
                for path_element in api_error.location.field_path_elements:
                    print(f"\t\tOn field: {path_element.field_name}")


def _describe_row(row: GoogleAdsRow, include_ad_group: bool) -> str:
    """Builds the printable summary string for one result row.

    Args:
        row: a GoogleAdsRow returned by the search request.
        include_ad_group: whether to prefix the summary with the ad group ID.
    """
    prefix: str = (
        f"Ad Group ID {row.ad_group.id} in " if include_ad_group else ""
    )
    return (
        f"{prefix}"
        f"Campaign ID {row.campaign.id} "
        f"had {row.metrics.impressions} impressions "
        f"and {row.metrics.clicks} clicks."
    )


def issue_search_request(
    client: GoogleAdsClient, customer_id: str, query: str
) -> Tuple[bool, Dict[str, Any]]:
    """Issues a search request using streaming.

    Retries if a GoogleAdsException is caught, until MAX_RETRIES is reached.

    Args:
        client: an initialized GoogleAdsClient instance.
        customer_id: a client customer ID str.
        query: a GAQL query str.

    Returns:
        A (True, {"results": [...]}) tuple on success, or a
        (False, {"exception", "customer_id", "query"}) tuple once the retries
        are used up.
    """
    ga_service: GoogleAdsServiceClient = client.get_service("GoogleAdsService")
    retries_used: int = 0
    # Keep trying until a response is received or MAX_RETRIES is reached.
    while True:
        try:
            stream: Iterable[SearchGoogleAdsStreamResponse] = (
                ga_service.search_stream(customer_id=customer_id, query=query)
            )
            # Returning a list of GoogleAdsRows will result in a
            # PicklingError, so the row data is converted into plain str
            # results before it is returned.
            result_strings: List[str] = []
            batch: SearchGoogleAdsStreamResponse
            for batch in stream:
                row: GoogleAdsRow
                for row in batch.results:
                    result_strings.append(
                        _describe_row(row, "ad_group.id" in query)
                    )
            return (True, {"results": result_strings})
        except GoogleAdsException as ex:
            # This example retries on all GoogleAdsExceptions. In practice,
            # developers might want to limit retries to only those error codes
            # they deem retriable.
            if retries_used >= MAX_RETRIES:
                return (
                    False,
                    {
                        "exception": ex,
                        "customer_id": customer_id,
                        "query": query,
                    },
                )
            retries_used += 1
            time.sleep(retries_used * BACKOFF_FACTOR)


def generate_inputs(
    client: GoogleAdsClient,
    customer_ids: List[str],
    queries: List[str],
) -> Iterable[Tuple[GoogleAdsClient, str, str]]:
    """Builds every (client, customer ID, query) combination to request.

    A GoogleAdsService instance cannot be serialized with pickle for parallel
    processing, but a GoogleAdsClient can be, so the client is what gets
    handed to each pool task, which then obtains the service itself.

    Args:
        client: An initialized GoogleAdsClient instance.
        customer_ids: A list of str client customer IDs.
        queries: A list of str GAQL queries.
    """
    return product([client], customer_ids, queries)


if __name__ == "__main__":
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        description="Download a set of reports in parallel from a list of "
        "accounts."
    )
    # The following argument(s) should be provided to run the example.
    parser.add_argument(
        "-c",
        "--customer_ids",
        nargs="+",
        type=str,
        required=True,
        help="The Google Ads customer IDs.",
    )
    parser.add_argument(
        "-l",
        "--login_customer_id",
        type=str,
        help="The login customer ID (optional).",
    )
    args: argparse.Namespace = parser.parse_args()

    # GoogleAdsClient will read the google-ads.yaml configuration file in the
    # home directory if none is specified.
    googleads_client: GoogleAdsClient = GoogleAdsClient.load_from_storage(
        version="v22"
    )
    # Override the login_customer_id on the GoogleAdsClient, if specified.
    if args.login_customer_id is not None:
        googleads_client.login_customer_id = args.login_customer_id

    main(googleads_client, args.customer_ids)

Ruby

#!/usr/bin/ruby
# Encoding: utf-8
#
# Copyright:: Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Shows how to download a set of reports from a list of accounts in parallel.
# If you need to obtain a list of accounts, please see get_account_hierarchy.rb
# or list_accessible_customers.rb examples.

require 'optparse'
require 'google/ads/google_ads'
require 'thread'

# Downloads every report query for every customer ID, one thread per
# query/customer combination, then prints which downloads succeeded and which
# failed.
#
# customer_ids is a list of customer ID strings (dashes are stripped).
# login_customer_id is an optional manager account ID string, or nil.
def parallel_report_download(customer_ids, login_customer_id)
  # GoogleAdsClient will read a config file from
  # ENV['HOME']/google_ads_config.rb when called without parameters
  client = Google::Ads::GoogleAds::GoogleAdsClient.new

  # Optional login customer ID if your access to the CIDs is
  # via a manager account.
  client.configure do |config|
    if login_customer_id
      config.login_customer_id = login_customer_id.tr("-", "").to_i
    end
  end

  ga_service = client.service.google_ads

  query_list = [
    [
      "Campaign Query",
      <<~QUERY
        SELECT campaign.id, metrics.impressions, metrics.clicks
        FROM campaign
        WHERE segments.date DURING LAST_30_DAYS
      QUERY
    ],
    [
      "Ad Group Query",
      <<~QUERY
        SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks
        FROM ad_group
        WHERE segments.date DURING LAST_30_DAYS
      QUERY
    ]
  ]

  # Use Queue instead of array, to ensure thread safety.
  # (Array in Ruby is not thread safe.)
  reports_succeeded = Queue.new()
  reports_failed = Queue.new()

  # Start all the threads.
  # This is a naive implementation for illustrative purposes. It is possible to
  # optimize the utilization of each customer ID by providing a queue of work
  # (or similar). However, this would complicate the example code and so is
  # omitted here.
  threads = []
  query_list.each do |query_key, query|
    customer_ids.each do |cid|
      # Normalize the customer ID once; the same value is used for the request
      # and for reporting.
      cid = cid.tr("-", "")

      # Starts the report download in a background thread.
      threads << Thread.new do
        begin
          puts "Requesting #{query_key} for CID #{cid}"
          responses = ga_service.search_stream(
            customer_id: cid,
            query: query,
          )

          # Stores the number of rows for each report for illustrative purposes.
          # Users of this code example can implement other logic here such as
          # storing response to a database/file, pass the response on to
          # another method for further processing, etc.
          num_rows = 0
          responses.each do |response|
            num_rows += response.results.size
          end

          reports_succeeded << {
            :cid => cid,
            :query_key => query_key,
            :num_rows => num_rows,
          }
        rescue Google::Ads::GoogleAds::Errors::GoogleAdsError => e
          # Separate the individual messages so that several errors do not run
          # together into one unreadable string.
          error_messages = e.failure.errors.map(&:message).join('; ')

          reports_failed << {
            :cid => cid,
            :query_key => query_key,
            :error_messages => error_messages,
          }
        end
      end
    end
  end

  puts "Awaiting results from #{threads.size} report download requests..."

  # Waits for all pending requests to the current set of customer IDs
  # to complete.
  threads.each { |thread| thread.join }

  puts 'Download completed, results:'

  puts 'Successful reports:'
  while !reports_succeeded.empty? do
    result = reports_succeeded.pop()
    puts "Customer ID: #{result[:cid]}, Query Key: #{result[:query_key]}, " \
      "Total rows retrieved: #{result[:num_rows]}"
  end

  puts 'Failed reports:'
  while !reports_failed.empty? do
    result = reports_failed.pop()
    puts "Customer ID: #{result[:cid]}, Query Key: #{result[:query_key]}, " \
      "Error Messages: #{result[:error_messages]}"
  end

  puts 'End of results.'
end

if __FILE__ == $PROGRAM_NAME
  options = {}
  # The following parameter(s) should be provided to run the example. You can
  # either specify these by changing the INSERT_XXX_ID_HERE values below, or on
  # the command line.
  #
  # Parameters passed on the command line will override any parameters set in
  # code.
  #
  # Running the example with -h will print the command line usage.
  options[:customer_ids] = [
    'INSERT_CUSTOMER_ID_1_HERE',
    'INSERT_CUSTOMER_ID_2_HERE',
  ]

  OptionParser.new do |opts|
    opts.banner = sprintf('Usage: ruby %s [options]', File.basename(__FILE__))

    opts.separator ''
    opts.separator 'Options:'

    opts.on('-C', '--customer-ids CUSTOMER-IDS', String,
        'A comma-separated list of Customer IDs to downloads reports in parallel.') do |v|
      options[:customer_ids] = v.split(',')
    end

    opts.on('-L', '--login-customer-id LOGIN-CUSTOMER-ID', String,
        'Optionally specify the manager account ID which provides access to the Customer IDs.') do |v|
      options[:login_customer_id] = v
    end

    opts.separator ''
    opts.separator 'Help:'

    opts.on_tail('-h', '--help', 'Show this message') do
      puts opts
      exit
    end
  end.parse!

  begin
    parallel_report_download(
      options.fetch(:customer_ids),
      options[:login_customer_id],
    )
  rescue Google::Ads::GoogleAds::Errors::GoogleAdsError => e
    e.failure.errors.each do |error|
      STDERR.printf("Error with message: %s\n", error.message)
      if error.location
        error.location.field_path_elements.each do |field_path_element|
          STDERR.printf("\tOn field: %s\n", field_path_element.field_name)
        end
      end
      error.error_code.to_h.each do |k, v|
        next if v == :UNSPECIFIED
        STDERR.printf("\tType: %s\n\tCode: %s\n", k, v)
      end
    end
    raise
  end
end

Perl

#!/usr/bin/perl -w # # Copyright 2020, Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Shows how to download a set of reports from a list of accounts in parallel. # # If you need to obtain a list of accounts, please see the get_account_hierarchy.pl # or list_accessible_customers.pl examples. use strict; use warnings; use utf8; use FindBin qw($Bin); use lib "$Bin/../../lib"; use Google::Ads::GoogleAds::Client; use Google::Ads::GoogleAds::Utils::GoogleAdsHelper; use Google::Ads::GoogleAds::Utils::SearchGoogleAdsIterator; use  Google::Ads::GoogleAds::V22::Services::GoogleAdsService::SearchGoogleAdsRequest; use Getopt::Long qw(:config auto_help); use Pod::Usage; use Cwd qw(abs_path); use threads; # Defines the Google Ads Query Language (GAQL) query strings to run for each # customer ID. use constant GAQL_QUERY_STRINGS => [  "SELECT campaign.id, metrics.impressions, metrics.clicks " .  "FROM campaign WHERE segments.date DURING LAST_30_DAYS",  "SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks" .  " FROM ad_group WHERE segments.date DURING LAST_30_DAYS" ]; # The following parameter(s) should be provided to run the example. You can # either specify these by changing the INSERT_XXX_ID_HERE values below, or on # the command line. # # Parameters passed on the command line will override any parameters set in # code. # # Running the example with -h will print the command line usage. 
my $customer_id_1     = "INSERT_CUSTOMER_ID_1_HERE";
my $customer_id_2     = "INSERT_CUSTOMER_ID_2_HERE";
my $customer_ids      = [];
my $login_customer_id = undef;

# Runs every query in GAQL_QUERY_STRINGS against every customer ID, using one
# background thread per customer ID, and prints a summary line per customer.
#
# Arguments:
#   $api_client   - the Google Ads API client used to obtain the service.
#   $customer_ids - array reference of customer IDs to query.
#
# Returns 1.
sub parallel_report_download {
  my ($api_client, $customer_ids) = @_;

  # Create a single google ads service which can be shared by all threads.
  my $google_ads_service = $api_client->GoogleAdsService();

  # IMPORTANT: You should avoid hitting the same customer ID in parallel. There
  # are rate limits at the customer ID level which are much stricter than limits
  # at the developer token level.
  foreach my $search_query (@{+GAQL_QUERY_STRINGS}) {
    # Use a list of threads to make sure that we wait for this report to complete
    # on all customer IDs before proceeding.
    my $threads = [];

    # Use the API to retrieve the report for each customer ID.
    foreach my $customer_id (@$customer_ids) {
      # Start the report download in a background thread.
      my $thread =
        threads->create(\&download_report, $google_ads_service, $customer_id,
        $search_query);

      # Store a thread to retrieve the results.
      push @$threads, $thread;
    }

    # Wait for all pending requests to the current set of customer IDs to complete.
    my $results = [map { $_->join() } @$threads];

    print "Report results for query: $search_query\n";
    foreach my $result (@$results) {
      # The customer ID is handled as a string throughout, so print it with %s
      # instead of coercing it to a number.
      printf "Customer ID '%s' Number of results: %d IsSuccess? %s\n",
        $result->{customerId}, $result->{numResults},
        defined $result->{errorMessage}
        ? "No :-( Why? " . $result->{errorMessage}
        : "Yes!";
    }
  }

  return 1;
}

# Downloads the report from the specified customer ID.
#
# Arguments:
#   $google_ads_service - the Google Ads service used to issue the search.
#   $customer_id        - the customer ID to query.
#   $search_query       - the GAQL query string to run.
#
# Returns a hash reference with the keys:
#   customerId   - the customer ID that was queried.
#   numResults   - the number of rows read (rows read before a failure are
#                  still counted).
#   errorMessage - undef on success, otherwise a description of the failure.
sub download_report {
  my ($google_ads_service, $customer_id, $search_query) = @_;

  my $numResults   = 0;
  my $errorMessage = undef;

  # Ideally we should use the search stream request here. But there's a tricky
  # issue in the JSON::SL module which is a dependency of SearchStreamHandler:
  #
  #   "This will most likely not work with threads, although one would wonder
  #   why you would want to use this module across threads."
  #
  # So create a regular (paged) search Google Ads request instead.
  my $search_request =
    Google::Ads::GoogleAds::V22::Services::GoogleAdsService::SearchGoogleAdsRequest
    ->new({
      customerId => $customer_id,
      query      => $search_query
    });

  # The block ends with a true value so that success is detected from the
  # return value of eval rather than from the contents of $@.
  my $succeeded = eval {
    my $iterator = Google::Ads::GoogleAds::Utils::SearchGoogleAdsIterator->new({
      service => $google_ads_service,
      request => $search_request
    });

    # Iterate over all rows in all pages to count the number of results.
    while ($iterator->has_next) {
      $iterator->next;
      $numResults++;
    }
    1;
  };

  if (!$succeeded) {
    my $error = $@ || "Unknown error";

    # Prefer the "message" field embedded in the error text. If the error does
    # not have that shape, report the whole error text instead of an empty
    # string, so that the reason for the failure is never lost.
    $errorMessage = $error =~ /"message": "([^"]+)"/ ? $1 : "$error";
    $errorMessage =~ s/\s+\z//;
  }

  return {
    customerId   => $customer_id,
    numResults   => $numResults,
    errorMessage => $errorMessage
  };
}

# Don't run the example if the file is being included.
if (abs_path($0) ne abs_path(__FILE__)) {
  return 1;
}

# Get Google Ads Client, credentials will be read from ~/googleads.properties.
my $api_client = Google::Ads::GoogleAds::Client->new();

# By default examples are set to die on any server returned fault.
$api_client->set_die_on_faults(1);

# Parameters passed on the command line will override any parameters set in code.
GetOptions(
  "customer_ids=s"      => \@$customer_ids,
  "login_customer_id=s" => \$login_customer_id
);
$customer_ids = [$customer_id_1, $customer_id_2] unless @$customer_ids;

# Print the help message if the parameters are not initialized in the code nor
# in the command line.
pod2usage(2) if not check_params($customer_ids);

# Hyphens are removed from the IDs before they are used.
$api_client->set_login_customer_id($login_customer_id =~ s/-//gr)
  if $login_customer_id;

# Call the example.
parallel_report_download($api_client, [map { $_ =~ s/-//gr } @$customer_ids]);

=pod

=head1 NAME

parallel_report_download

=head1 DESCRIPTION

Shows how to download a set of reports from a list of accounts in parallel.

If you need to obtain a list of accounts, please see the get_account_hierarchy.pl
or list_accessible_customers.pl examples.

=head1 SYNOPSIS

parallel_report_download.pl [options]

    -help                       Show the help message.
    -customer_ids               The Google Ads customer IDs.
    -login_customer_id          [optional] The login customer ID.

=cut