Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Conversation

@debasishg
Copy link
Contributor

This PR does the following:

  1. lots of type refinements
  2. refactoring
  3. test infrastructure incorporated
  4. started writing tests - same ones from confluent examples

Now we have better type inference and hence better chaining of builder methods .. The following fragment is the replacement from https://github.com/confluentinc/kafka-streams-examples/blob/4.0.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L144-L166

// Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTableS[String, Long] = userClicksStream // Join the stream against the table. .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks)) // Change the stream from <user> -> <region, clicks> to <region> -> <clicks> .map((_: String, regionWithClicks: (String, Long)) => regionWithClicks) // Compute the total per region by summing the individual click counts per region. .groupByKey(Serialized.`with`(stringSerde, scalaLongSerde)) .reduce(_ + _)

Now need to port more tests from Confluent examples and check for exhaustivity of the APIs. Also need to document stuff.

@blublinsky blublinsky merged commit 640f034 into develop Nov 28, 2017
@debasishg debasishg deleted the need-tests branch November 28, 2017 17:43
Copy link

@deanwampler deanwampler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments about constant naming conventions.

import java.util.Properties

object MessageSender {
private val ACKCONFIGURATION = "all" // Blocking on the full commit of the record

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about putting "_" between the words?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do ..


def providerProperties(brokers: String, keySerializer: String, valueSerializer: String): Properties = {
val props = new Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason you didn't use the same names for the properties, e.g., ACKS_CONFIG vs. ACKCONFIGURATION? I know these are values you're setting, so perhaps ACKS_CONFIG_DEFAULT?

Also, should these hard-coded defaults be a last resort, only used if no config file definitions are found (perhaps that's how it works...)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change .. also opened an issue #11 for the config file ..

@blublinsky
Copy link

Additionally, scala 2.11 does not compile

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

4 participants