Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.mongodb.scala.syncadapter

import com.mongodb.assertions.Assertions
import com.mongodb.client.model.bulk.{ ClientBulkWriteOptions, ClientBulkWriteResult, ClientNamespacedWriteModel }
import com.mongodb.{ ClientSessionOptions, ReadConcern, ReadPreference, WriteConcern }
import com.mongodb.client.{ ClientSession, MongoCluster => JMongoCluster, MongoDatabase => JMongoDatabase }
Expand Down Expand Up @@ -129,33 +128,21 @@ class SyncMongoCluster(wrapped: MongoCluster) extends JMongoCluster {

override def bulkWrite(
models: util.List[_ <: ClientNamespacedWriteModel]
): ClientBulkWriteResult = {
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
throw Assertions.fail("TODO-JAVA-5531 implement")
}
): ClientBulkWriteResult = wrapped.bulkWrite(models.asScala.toList).toFuture().get()

override def bulkWrite(
models: util.List[_ <: ClientNamespacedWriteModel],
options: ClientBulkWriteOptions
): ClientBulkWriteResult = {
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
throw Assertions.fail("TODO-JAVA-5531 implement")
}
): ClientBulkWriteResult = wrapped.bulkWrite(models.asScala.toList, options).toFuture().get()

override def bulkWrite(
clientSession: ClientSession,
models: util.List[_ <: ClientNamespacedWriteModel]
): ClientBulkWriteResult = {
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
throw Assertions.fail("TODO-JAVA-5531 implement")
}
): ClientBulkWriteResult = wrapped.bulkWrite(unwrap(clientSession), models.asScala.toList).toFuture().get()

override def bulkWrite(
clientSession: ClientSession,
models: util.List[_ <: ClientNamespacedWriteModel],
options: ClientBulkWriteOptions
): ClientBulkWriteResult = {
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
throw Assertions.fail("TODO-JAVA-5531 implement")
}
): ClientBulkWriteResult = wrapped.bulkWrite(unwrap(clientSession), models.asScala.toList, options).toFuture().get()
}
123 changes: 123 additions & 0 deletions driver-scala/src/main/scala/org/mongodb/scala/MongoCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import com.mongodb.reactivestreams.client.{ MongoCluster => JMongoCluster }
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.DefaultHelper.DefaultsTo
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.bulk.ClientNamespacedUpdateManyModel
import org.mongodb.scala.model.bulk.ClientNamespacedDeleteManyModel
import org.mongodb.scala.model.bulk.{ ClientBulkWriteOptions, ClientBulkWriteResult, ClientNamespacedWriteModel }

import scala.collection.JavaConverters._
import scala.concurrent.duration.{ Duration, MILLISECONDS }
Expand Down Expand Up @@ -290,4 +293,124 @@ class MongoCluster(private val wrapped: JMongoCluster) {
)(implicit e: C DefaultsTo Document, ct: ClassTag[C]): ChangeStreamObservable[C] =
ChangeStreamObservable(wrapped.watch(clientSession, pipeline.asJava, ct))

/**
* Executes a client-level bulk write operation.
* This method is functionally equivalent to `bulkWrite(List, ClientBulkWriteOptions)`
* with the [[ClientBulkWriteOptions.clientBulkWriteOptions default options]].
*
* This operation supports retryable writes.
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
* executing this operation may require multiple `bulkWrite` commands.
* The eligibility for retries is determined per each `bulkWrite` command:
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
*
* This operation is not supported by MongoDB Atlas Serverless instances.
*
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
* or the following errors:
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
* and there is at least one of the following pieces of information to report:
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
* - [[MongoException]]: Only if the operation is unsuccessful.
* @since 5.4
* @note Requires MongoDB 8.0 or greater.
*/
def bulkWrite(models: List[_ <: ClientNamespacedWriteModel]): SingleObservable[ClientBulkWriteResult] =
wrapped.bulkWrite(models.asJava)

/**
* Executes a client-level bulk write operation.
*
* This operation supports retryable writes.
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
* executing this operation may require multiple `bulkWrite` commands.
* The eligibility for retries is determined per each `bulkWrite` command:
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
*
* This operation is not supported by MongoDB Atlas Serverless instances.
*
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
* @param options The options.
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
* or the following errors:
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
* and there is at least one of the following pieces of information to report:
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
* - [[MongoException]]: Only if the operation is unsuccessful.
* @since 5.4
* @note Requires MongoDB 8.0 or greater.
*/
def bulkWrite(
models: List[_ <: ClientNamespacedWriteModel],
options: ClientBulkWriteOptions
): SingleObservable[ClientBulkWriteResult] = wrapped.bulkWrite(models.asJava, options)

/**
* Executes a client-level bulk write operation.
* This method is functionally equivalent to `bulkWrite(ClientSession, List, ClientBulkWriteOptions)`
* with the [[ClientBulkWriteOptions.clientBulkWriteOptions default options]].
*
* This operation supports retryable writes.
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
* executing this operation may require multiple `bulkWrite` commands.
* The eligibility for retries is determined per each `bulkWrite` command:
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
*
* This operation is not supported by MongoDB Atlas Serverless instances.
*
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
* @param clientSession [[ClientSession client session]] with which to associate this operation.
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
* or the following errors:
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
* and there is at least one of the following pieces of information to report:
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
* - [[MongoException]]: Only if the operation is unsuccessful.
* @since 5.4
* @note Requires MongoDB 8.0 or greater.
*/
def bulkWrite(
clientSession: ClientSession,
models: List[_ <: ClientNamespacedWriteModel]
): SingleObservable[ClientBulkWriteResult] = wrapped.bulkWrite(clientSession, models.asJava)

/**
* Executes a client-level bulk write operation.
*
* This operation supports retryable writes.
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
* executing this operation may require multiple `bulkWrite` commands.
* The eligibility for retries is determined per each `bulkWrite` command:
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
*
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
* @param clientSession The [[ClientSession client session]] with which to associate this operation.
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
* @param options The options.
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
* or the following errors:
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
* and there is at least one of the following pieces of information to report:
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
* - [[MongoException]]: Only if the operation is unsuccessful.
* @since 5.4
* @note Requires MongoDB 8.0 or greater.
*/
def bulkWrite(
clientSession: ClientSession,
models: List[_ <: ClientNamespacedWriteModel],
options: ClientBulkWriteOptions
): SingleObservable[ClientBulkWriteResult] = wrapped.bulkWrite(clientSession, models.asJava, options)
}
Loading