|
21 | 21 | package events; |
22 | 22 |
|
23 | 23 | import com.google.api.gax.rpc.InvalidArgumentException; |
24 | | -import com.google.cloud.ServiceOptions; |
25 | | -import com.google.cloud.bigquery.BigQueryException; |
| 24 | +import com.google.api.gax.rpc.PermissionDeniedException; |
26 | 25 | import com.google.cloud.retail.v2.GcsSource; |
27 | 26 | import com.google.cloud.retail.v2.ImportErrorsConfig; |
28 | 27 | import com.google.cloud.retail.v2.ImportMetadata; |
|
32 | 31 | import com.google.cloud.retail.v2.UserEventServiceClient; |
33 | 32 | import com.google.longrunning.Operation; |
34 | 33 | import com.google.longrunning.OperationsClient; |
35 | | -import events.setup.EventsCreateGcsBucket; |
36 | 34 | import java.io.IOException; |
| 35 | +import java.time.Instant; |
| 36 | +import java.util.concurrent.TimeUnit; |
37 | 37 |
|
38 | 38 | public class ImportUserEventsGcs { |
39 | 39 |
|
40 | 40 | public static void main(String[] args) throws IOException, InterruptedException { |
41 | 41 | // TODO(developer): Replace these variables before running the sample. |
42 | | - String projectId = ServiceOptions.getDefaultProjectId(); |
| 42 | + String projectId = "your-project-id"; |
43 | 43 | String defaultCatalog = |
44 | 44 | String.format("projects/%s/locations/global/catalogs/default_catalog", projectId); |
45 | | - // TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: defaultCatalog = |
46 | | - // "invalid_catalog_name" |
47 | | - String gcsEventsObject = "user_events.json"; |
48 | | - // TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT: gcsEventsObject = |
49 | | - // "user_events_some_invalid.json" |
50 | | - |
51 | | - importUserEventsFromGcs(gcsEventsObject, defaultCatalog); |
| 45 | + // TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: |
| 46 | + // defaultCatalog = "invalid_catalog_name"; |
| 47 | + String bucketName = System.getenv("EVENTS_BUCKET_NAME"); |
| 48 | + String gcsUserEventsObject = "user_events.json"; |
| 49 | + // TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT: |
| 50 | + // gcsUserEventsObject = "user_events_some_invalid.json"; |
| 51 | + |
| 52 | + importUserEventsFromGcs(defaultCatalog, bucketName, gcsUserEventsObject); |
52 | 53 | } |
53 | 54 |
|
54 | | - public static void importUserEventsFromGcs(String gcsEventsObject, String defaultCatalog) |
| 55 | + public static void importUserEventsFromGcs( |
| 56 | + String defaultCatalog, String bucketName, String gcsUserEventsObject) |
55 | 57 | throws IOException, InterruptedException { |
56 | | - try { |
57 | | - String gcsBucket = String.format("gs://%s", EventsCreateGcsBucket.getBucketName()); |
58 | | - String gcsErrorsBucket = String.format("%s/error", gcsBucket); |
59 | | - |
60 | | - GcsSource gcsSource = |
61 | | - GcsSource.newBuilder() |
62 | | - .addInputUris(String.format("%s/%s", gcsBucket, gcsEventsObject)) |
63 | | - .build(); |
64 | | - |
65 | | - UserEventInputConfig inputConfig = |
66 | | - UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build(); |
67 | | - |
68 | | - ImportErrorsConfig errorsConfig = |
69 | | - ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build(); |
70 | | - |
71 | | - ImportUserEventsRequest importRequest = |
72 | | - ImportUserEventsRequest.newBuilder() |
73 | | - .setParent(defaultCatalog) |
74 | | - .setInputConfig(inputConfig) |
75 | | - .setErrorsConfig(errorsConfig) |
76 | | - .build(); |
77 | | - |
78 | | - System.out.printf("Import user events from google cloud source request: %s%n", importRequest); |
79 | | - |
80 | | - // Initialize client that will be used to send requests. This client only needs to be created |
81 | | - // once, and can be reused for multiple requests. After completing all of your requests, call |
82 | | - // the "close" method on the client to safely clean up any remaining background resources. |
83 | | - try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) { |
84 | | - String operationName = |
85 | | - serviceClient.importUserEventsCallable().call(importRequest).getName(); |
86 | | - |
87 | | - System.out.printf("OperationName = %s\n", operationName); |
88 | | - |
89 | | - OperationsClient operationsClient = serviceClient.getOperationsClient(); |
90 | | - Operation operation = operationsClient.getOperation(operationName); |
91 | | - |
92 | | - while (!operation.getDone()) { |
93 | | - // Keep polling the operation periodically until the import task is done. |
94 | | - int awaitDuration = 30000; |
95 | | - Thread.sleep(awaitDuration); |
96 | | - operation = operationsClient.getOperation(operationName); |
97 | | - } |
98 | | - |
99 | | - if (operation.hasMetadata()) { |
100 | | - ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class); |
101 | | - System.out.printf( |
102 | | - "Number of successfully imported events: %s\n", metadata.getSuccessCount()); |
103 | | - System.out.printf( |
104 | | - "Number of failures during the importing: %s\n", metadata.getFailureCount()); |
105 | | - } |
106 | | - |
107 | | - if (operation.hasResponse()) { |
108 | | - ImportUserEventsResponse response = |
109 | | - operation.getResponse().unpack(ImportUserEventsResponse.class); |
110 | | - System.out.printf("Operation result: %s%n", response); |
111 | | - } |
112 | | - } catch (InvalidArgumentException e) { |
| 58 | + String gcsBucket = String.format("gs://%s", bucketName); |
| 59 | + String gcsErrorsBucket = String.format("%s/error", gcsBucket); |
| 60 | + |
| 61 | + GcsSource gcsSource = |
| 62 | + GcsSource.newBuilder() |
| 63 | + .addInputUris(String.format("%s/%s", gcsBucket, gcsUserEventsObject)) |
| 64 | + .build(); |
| 65 | + |
| 66 | + UserEventInputConfig inputConfig = |
| 67 | + UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build(); |
| 68 | + |
| 69 | + System.out.println("GCS source: " + gcsSource.getInputUrisList()); |
| 70 | + |
| 71 | + ImportErrorsConfig errorsConfig = |
| 72 | + ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build(); |
| 73 | + |
| 74 | + ImportUserEventsRequest importRequest = |
| 75 | + ImportUserEventsRequest.newBuilder() |
| 76 | + .setParent(defaultCatalog) |
| 77 | + .setInputConfig(inputConfig) |
| 78 | + .setErrorsConfig(errorsConfig) |
| 79 | + .build(); |
| 80 | + System.out.printf("Import user events from google cloud source request: %s%n", importRequest); |
| 81 | + |
| 82 | + // Initialize client that will be used to send requests. This client only |
| 83 | + // needs to be created once, and can be reused for multiple requests. After |
| 84 | + // completing all of your requests, call the "close" method on the client to |
| 85 | + // safely clean up any remaining background resources. |
| 86 | + try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) { |
| 87 | + String operationName = serviceClient.importUserEventsCallable().call(importRequest).getName(); |
| 88 | + |
| 89 | + System.out.println("The operation was started."); |
| 90 | + System.out.printf("OperationName = %s%n", operationName); |
| 91 | + |
| 92 | + OperationsClient operationsClient = serviceClient.getOperationsClient(); |
| 93 | + Operation operation = operationsClient.getOperation(operationName); |
| 94 | + |
| 95 | + Instant deadline = Instant.now().plusSeconds(60); |
| 96 | + |
| 97 | + while (!operation.getDone() || Instant.now().isBefore(deadline)) { |
| 98 | + System.out.println("Please wait till operation is done."); |
| 99 | + TimeUnit.SECONDS.sleep(30); |
| 100 | + operation = operationsClient.getOperation(operationName); |
| 101 | + } |
| 102 | + |
| 103 | + if (operation.hasMetadata()) { |
| 104 | + ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class); |
| 105 | + System.out.printf( |
| 106 | + "Number of successfully imported events: %s%n", metadata.getSuccessCount()); |
113 | 107 | System.out.printf( |
114 | | - "Given GCS input path was not found. %n%s%n " |
115 | | - + "Please run CreateTestResources class to create resources.", |
116 | | - e.getMessage()); |
| 108 | + "Number of failures during the importing: %s%n", metadata.getFailureCount()); |
| 109 | + } else { |
| 110 | + System.out.println("Metadata is empty."); |
| 111 | + } |
| 112 | + |
| 113 | + if (operation.hasResponse()) { |
| 114 | + ImportUserEventsResponse response = |
| 115 | + operation.getResponse().unpack(ImportUserEventsResponse.class); |
| 116 | + System.out.printf("Operation result: %s%n", response); |
| 117 | + } else { |
| 118 | + System.out.println("Operation result is empty."); |
117 | 119 | } |
118 | | - } catch (BigQueryException e) { |
119 | | - System.out.printf("Exception message: %s", e.getMessage()); |
| 120 | + } catch (InvalidArgumentException e) { |
| 121 | + System.out.printf( |
| 122 | + "%s%n'%s' file does not exist in the bucket. Please " |
| 123 | + + "make sure you have followed the setting up instructions.", |
| 124 | + e.getMessage(), gcsUserEventsObject); |
| 125 | + } catch (PermissionDeniedException e) { |
| 126 | + System.out.println(e.getMessage()); |
120 | 127 | } |
121 | 128 | } |
122 | 129 | } |
0 commit comments