
Commit aa2f869

Merge remote-tracking branch 'upstream/branch-0.11'
2 parents 506be65 + 1f8e399

15 files changed: +247, -46 lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions

@@ -1,5 +1,10 @@
 # Changelog
 
+## 0.11.1 (? 2016)
+
+* Max and Min splitVector bounds for not sharded collections (see doc)
+* Config parameter renamed idasobjectid -> idAsObjectId
+
 ## 0.11.0 (February 2016)
 
 * Compatibility with spark 1.6

doc/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.stratio.datasource</groupId>
         <artifactId>spark-mongodb-parent</artifactId>
-        <version>0.12.0-SNAPSHOT</version>
+        <version>0.11.2-RC1-SNAPSHOT</version>
     </parent>
     <properties>
         <jacoco.skip>true</jacoco.skip>

doc/src/site/sphinx/First_Steps.rst

Lines changed: 106 additions & 7 deletions

@@ -91,12 +91,18 @@ Configuration parameters
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | schema_samplingRatio | 1.0 | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
-| writeConcern | mongodb.WriteConcern.ACKNOWLEDGED | No |
+| writeConcern | "safe" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | splitSize | 10 | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | splitKey | "fieldName" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
+| splitKeyType | "dataTypeName" | No |
++-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
+| splitKeyMin | "minvalue" | No |
++-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
+| splitKeyMax | "maxvalue" | No |
++-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | credentials | "user,database,password;user,database,password" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | updateFields | "fieldName,fieldName" | No |
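The new splitKey bound rows added above combine with the existing reader options. A minimal sketch of how they might be passed through the DataFrame API; the host, database, collection and field values are illustrative placeholders, not part of this change:

::

    // Hypothetical read with explicit split boundaries; values are illustrative only.
    val df = sqlContext.read
      .format("com.stratio.datasource.mongodb")
      .options(Map(
        "host"         -> "localhost:27017",
        "database"     -> "highschool",
        "collection"   -> "students",
        "splitKey"     -> "age",    // indexed field to split on
        "splitKeyType" -> "int",    // type of the split field
        "splitKeyMin"  -> "10",     // lower bound, as a string
        "splitKeyMax"  -> "20"))    // upper bound, as a string
      .load()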
@@ -117,7 +123,7 @@ Configuration parameters
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | threadsAllowedToBlockForConnectionMultiplier | "5" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
-| idasobjectid | "false" | No |
+| idAsObjectId | "false" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | connectionsTime | "180000" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
@@ -129,12 +135,95 @@ Configuration parameters
 
 **Note:** '_id' field is autogenerated in MongoDB, so by default, you can filter it as String. If you need a custom '_id', you have to set 'idasobjectid' property to "false" like in the above table.
 
+There are two ways to set up configuration:
+
+1. Using MongodbConfigBuilder, which should contains all the config with the right types.
+
+2. Using DataFrame API to read/write with a Map[String, String] or setting configuration from a SQL sentence (in String to String format) as in:
+
+::
+
+    CREATE TEMPORARY TABLE tableName USING com.stratio.datasource.mongodb
+    OPTIONS (host 'host:port', database 'highschool', collection 'students')"
+
+
+Credentials
+-----------
+
+To connect with credentials you should specify user, database and password.
+
+From MongodbConfigBuilder, you have to create a list of MongodbCredentials, here is an example:
+
+::
+
+    MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "highschool", Collection ->"students",
+      List(com.stratio.datasource.mongodb.MongodbCredentials(user, database, password.toCharArray))
+    )).build
+
+
+In other case, (String format) you have to use the format set in the table above.
+
+One credential:
+
+::
+
+    "user,database,password"
+
+
+Two credentials:
+
+::
+
+    "user1,database1,password1;user2,database2,password2"
+
+
+SplitKey parameters
+-------------------
+
+An index is needed in the splitKey field.
+
+All splitKey parameters are optionals.
+
+splitKey: Field to split for.
+
+splitSize: Max size of each chunk in MB.
+
+If you want to use explicit boundaries to choose what data get from MongoDB, you will have to use these parameters:
+
+- splitKeyType: Data type of splitKey field. Next MongoDB types are supported:
+    - "isoDate"
+    - "int"
+    - "long"
+    - "double"
+    - "string"
+
+- splitKeyMin: Min value of the split in string format.
+
+- splitKeyMax: Max value of the split in string format.
+
+**Note:** Only data between boundaries would be available
+
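A sketch of how these boundaries might look through MongodbConfigBuilder, using the SplitKey* key names added to MongodbConfig in this same commit; the imports, field name and bound values are assumptions for illustration:

::

    import com.stratio.datasource.mongodb._
    import com.stratio.datasource.mongodb.config._
    import com.stratio.datasource.mongodb.config.MongodbConfig._

    // Only documents whose 'age' lies between the two bounds would be read.
    val boundedConfig = MongodbConfigBuilder(Map(
      Host -> List("localhost:27017"),
      Database -> "highschool",
      Collection -> "students",
      SplitKey -> "age",
      SplitKeyType -> "int",
      SplitKeyMin -> "10",
      SplitKeyMax -> "20")).build()
    val boundedRDD = sqlContext.fromMongoDB(boundedConfig)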
 Examples
 ========
 
 Scala API
 ---------
 
+Launch the spark shell:
+::
+
+   $ bin/spark-shell --packages com.stratio.datasource:spark-mongodb_2.10:<VERSION>
+
+If you are using the spark shell, a SQLContext is already created and is available as a variable: 'sqlContext'.
+Alternatively, you could create a SQLContext instance in your spark application code:
+
+::
+
+   val sqlContext = new SQLContext(sc)
+
 To read a DataFrame from a Mongo collection, you can use the library by loading the implicits from `com.stratio.datasource.mongodb._`.
 
 To save a DataFrame in MongoDB you should use the saveToMongodb() function as follows:
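The saveToMongodb() example itself falls outside this hunk; a hedged sketch of what such a call looks like with the library's implicits (key names from MongodbConfig, values are placeholders):

::

    import com.stratio.datasource.mongodb._
    import com.stratio.datasource.mongodb.config._
    import com.stratio.datasource.mongodb.config.MongodbConfig._

    // Build a write configuration and persist an existing DataFrame into MongoDB.
    val saveConfig = MongodbConfigBuilder(Map(
      Host -> List("localhost:27017"),
      Database -> "highschool",
      Collection -> "students"))
    dataFrame.saveToMongodb(saveConfig.build())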
@@ -172,8 +261,8 @@ In the example we can see how to use the fromMongoDB() function to read from MongoDB
    val readConfig = builder.build()
    val mongoRDD = sqlContext.fromMongoDB(readConfig)
    mongoRDD.registerTempTable("students")
-   sqlContext.sql("SELECT name, age FROM students")
-
+   val dataFrame = sqlContext.sql("SELECT name, age FROM students")
+   dataFrame.show
 
 
 If you want to use a SSL connection, you need to add this 'import', and add 'SSLOptions' to the MongodbConfigBuilder:
@@ -191,9 +280,9 @@ Using StructType:
 
    import org.apache.spark.sql.types._
    val schemaMongo = StructType(StructField("name", StringType, true) :: StructField("age", IntegerType, true ) :: Nil)
-   sqlContext.createExternalTable("mongoTable", "com.stratio.datasource.mongodb", schemaMongo, Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students"))
+   val df = sqlContext.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students")).load
+   df.registerTempTable("mongoTable")
    sqlContext.sql("SELECT * FROM mongoTable WHERE name = 'Torcuato'").show()
-   sqlContext.sql("DROP TABLE mongoTable")
 
 
 Using DataFrameWriter:
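The DataFrameWriter example referenced above also lies outside this hunk; a minimal sketch of the pattern, mirroring the Python version further down (collection name and option values are assumptions):

::

    import org.apache.spark.sql.SaveMode

    // Write the DataFrame back through the same data source, overwriting the target collection.
    df.write
      .format("com.stratio.datasource.mongodb")
      .mode(SaveMode.Overwrite)
      .options(Map(
        "host"       -> "localhost:27017",
        "database"   -> "highschool",
        "collection" -> "studentsview"))
      .save()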
@@ -242,9 +331,19 @@ Then:
 ::
 
    from pyspark.sql import SQLContext
-   sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.datasource.mongodb OPTIONS (host 'host:port', database 'highschool', collection 'students')")
+   sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'highschool', collection 'students')")
    sqlContext.sql("SELECT * FROM students_table").collect()
 
+Using DataFrameReader and DataFrameWriter:
+::
+
+   df = sqlContext.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='highschool', collection='students').load()
+   df.select("name").collect()
+
+   df.select("name").write.format("com.stratio.datasource.mongodb").mode('overwrite').options(host='localhost:27017', database='highschool', collection='studentsview').save()
+   dfView = sqlContext.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='highschool', collection='studentsview').load()
+   dfView.show()
+
 Java API
 --------

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.stratio.datasource</groupId>
     <artifactId>spark-mongodb-parent</artifactId>
-    <version>0.12.0-SNAPSHOT</version>
+    <version>0.11.2-RC1-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>Stratio Spark Mongodb Datasource</name>
    <description>A Spark SQL library for MongoDB</description>

spark-mongodb-examples/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -40,7 +40,7 @@
 
     <properties>
         <scala.binary.version>2.10</scala.binary.version>
-        <mongodb.datasource.version>0.11.0-SNAPSHOT</mongodb.datasource.version>
+        <mongodb.datasource.version>0.11.2-RC1-SNAPSHOT</mongodb.datasource.version>
         <spark.version>1.5.2</spark.version>
     </properties>
 

spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala

Lines changed: 4 additions & 2 deletions

@@ -26,6 +26,7 @@ import com.mongodb.casbah.Imports._
 import com.mongodb.casbah.MongoClient
 import com.stratio.datasource.mongodb.client.MongodbClientActor._
 import com.stratio.datasource.mongodb.config.MongodbSSLOptions
+import com.typesafe.config.ConfigFactory
 
 import scala.concurrent.Await
 import scala.concurrent.duration._

@@ -43,7 +44,7 @@ object MongodbClientFactory {
   /**
    * Scheduler that close connections automatically when the timeout was expired
    */
-  private val actorSystem = ActorSystem()
+  private val actorSystem = ActorSystem("mongodbClientFactory", ConfigFactory.load(ConfigFactory.parseString("akka.daemonic=on")))
   private val scheduler = actorSystem.scheduler
   private val SecondsToCheckConnections = 60
   private val mongoConnectionsActor = actorSystem.actorOf(Props(new MongodbClientActor), "mongoConnectionActor")

@@ -154,8 +155,9 @@
     }
   }
 
+  // TODO Review when refactoring config
   def extractValue[T](options: Map[String, Any], key: String): Option[T] =
-    options.get(key).map(_.asInstanceOf[T])
+    options.get(key.toLowerCase).map(_.asInstanceOf[T])
 
   def sslBuilder(optionSSLOptions: Option[MongodbSSLOptions]): Boolean =
     optionSSLOptions.exists(sslOptions => {
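The new ActorSystem line layers an "akka.daemonic=on" override on top of the default configuration so the factory's actor threads do not keep the JVM alive after a Spark job finishes. A standalone sketch of the same Typesafe Config pattern, with illustrative names only:

::

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Parse the one-line override, then resolve it against the defaults via load();
    // with akka.daemonic=on the system's threads are daemons and won't block shutdown.
    val daemonicConf = ConfigFactory.load(ConfigFactory.parseString("akka.daemonic=on"))
    val system = ActorSystem("exampleSystem", daemonicConf)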

spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala

Lines changed: 14 additions & 10 deletions

@@ -23,6 +23,7 @@ import com.stratio.datasource.util.Config._
 /**
  * Values and Functions for access and parse the configuration parameters
  */
+// TODO Review when refactoring config
 object MongodbConfig {
 
   // Parameter names

@@ -41,12 +42,15 @@ object MongodbConfig {
   val SamplingRatio = "schema_samplingRatio"
   val SplitSize = "splitSize"
   val SplitKey = "splitKey"
+  val SplitKeyType = "splitKeyType"
+  val SplitKeyMin = "splitKeyMin"
+  val SplitKeyMax = "splitKeyMax"
   val UpdateFields = "updateFields"
   val Language = "language"
   val ConnectionsTime = "connectionsTime"
   val CursorBatchSize = "cursorBatchSize"
   val BulkBatchSize = "bulkBatchSize"
-  val IdAsObjectId = "idasobjectid"
+  val IdAsObjectId = "idAsObjectId"
 
   // List of parameters for mongoClientOptions
   val ListMongoClientOptions = List(

@@ -90,6 +94,7 @@
    * @param parameters List of parameters
    * @return List of parameters parsed to correct mongoDb configurations
    */
+  // TODO Review when refactoring config
   def parseParameters(parameters : Map[String,String]): Map[String, Any] = {
 
     // required properties

@@ -134,6 +139,7 @@
    * @param readPreference string key for identify the correct object
    * @return readPreference object
    */
+  // TODO Review when refactoring config
   def parseReadPreference(readPreference: String): ReadPreference = {
     readPreference.toUpperCase match {
       case "PRIMARY" => com.mongodb.casbah.ReadPreference.Primary

@@ -150,18 +156,16 @@
    * @param writeConcern string key for identify the correct object
    * @return writeConcern object
    */
+  // TODO Review when refactoring config
   def parseWriteConcern(writeConcern: String): WriteConcern = {
     writeConcern.toUpperCase match {
-      case "FSYNC_SAFE" => com.mongodb.WriteConcern.FSYNC_SAFE
-      case "FSYNCED" => com.mongodb.WriteConcern.FSYNCED
-      case "JOURNAL_SAFE" => com.mongodb.WriteConcern.JOURNAL_SAFE
-      case "JOURNALED" => com.mongodb.WriteConcern.JOURNALED
+      case "SAFE" | "ACKNOWLEDGED" => com.mongodb.WriteConcern.SAFE
+      case "NORMAL" | "UNACKNOWLEDGED" => com.mongodb.WriteConcern.NORMAL
+      case "REPLICAS_SAFE" | "REPLICA_ACKNOWLEDGED" => com.mongodb.WriteConcern.REPLICAS_SAFE
+      case "FSYNC_SAFE" | "FSYNCED" => com.mongodb.WriteConcern.FSYNC_SAFE
       case "MAJORITY" => com.mongodb.WriteConcern.MAJORITY
-      case "NORMAL" => com.mongodb.WriteConcern.NORMAL
-      case "REPLICA_ACKNOWLEDGED" => com.mongodb.WriteConcern.REPLICA_ACKNOWLEDGED
-      case "REPLICAS_SAFE" => com.mongodb.WriteConcern.REPLICAS_SAFE
-      case "SAFE" => com.mongodb.WriteConcern.SAFE
-      case "UNACKNOWLEDGED" => com.mongodb.WriteConcern.UNACKNOWLEDGED
+      case "JOURNAL_SAFE" | "JOURNALED" => com.mongodb.WriteConcern.JOURNAL_SAFE
+      case "NONE" | "ERRORS_IGNORED" => com.mongodb.WriteConcern.NONE
       case _ => DefaultWriteConcern
     }
   }
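The rewritten match collapses each pair of driver aliases onto a single Casbah constant. For illustration only, assuming parseWriteConcern stays publicly accessible on the MongodbConfig object, both spellings should resolve to the same value:

::

    import com.stratio.datasource.mongodb.config.MongodbConfig

    // Hypothetical check: old and new spellings map to one WriteConcern instance.
    assert(MongodbConfig.parseWriteConcern("acknowledged") == com.mongodb.WriteConcern.SAFE)
    assert(MongodbConfig.parseWriteConcern("safe") == com.mongodb.WriteConcern.SAFE)
    assert(MongodbConfig.parseWriteConcern("journaled") == com.mongodb.WriteConcern.JOURNAL_SAFE)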
