Skip to content

Commit 57fc1c7

Browse files
Implement feature #102
Example in JavaScript (the river starts from an initial oplog timestamp of now + 5 seconds, so only documents written after that point are processed): "options": { "initial_timestamp": { "script_type": "js", "script": "var date = new Date(); date.setSeconds(date.getSeconds() + 5); new java.lang.Long(date.getTime());" } },
1 parent 3d698ca commit 57fc1c7

File tree

3 files changed

+297
-0
lines changed

3 files changed

+297
-0
lines changed

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.security.cert.CertificateException;
2828
import java.security.cert.X509Certificate;
2929
import java.util.ArrayList;
30+
import java.util.Date;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
3233
import java.util.List;
@@ -57,6 +58,7 @@
5758
import org.elasticsearch.cluster.metadata.MappingMetaData;
5859
import org.elasticsearch.common.StopWatch;
5960
import org.elasticsearch.common.collect.ImmutableMap;
61+
import org.elasticsearch.common.collect.Maps;
6062
import org.elasticsearch.common.inject.Inject;
6163
import org.elasticsearch.common.logging.ESLogger;
6264
import org.elasticsearch.common.logging.ESLoggerFactory;
@@ -120,6 +122,9 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
120122
public final static String DROP_COLLECTION_FIELD = "drop_collection";
121123
public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields";
122124
public final static String INCLUDE_COLLECTION_FIELD = "include_collection";
125+
public final static String INITIAL_TIMESTAMP_FIELD = "initial_timestamp";
126+
public final static String INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD = "script_type";
127+
public final static String INITIAL_TIMESTAMP_SCRIPT_FIELD = "script";
123128
public final static String FILTER_FIELD = "filter";
124129
public final static String CREDENTIALS_FIELD = "credentials";
125130
public final static String USER_FIELD = "user";
@@ -189,6 +194,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
189194
protected final boolean dropCollection;
190195
protected final Set<String> excludeFields;
191196
protected final String includeCollection;
197+
protected final BSONTimestamp initialTimestamp;
192198

193199
private final BasicDBObject findKeys = new BasicDBObject();
194200
private final ExecutableScript script;
@@ -264,6 +270,7 @@ public MongoDBRiver(final RiverName riverName,
264270
if (mongoSettings.containsKey(OPTIONS_FIELD)) {
265271
Map<String, Object> mongoOptionsSettings = (Map<String, Object>) mongoSettings
266272
.get(OPTIONS_FIELD);
273+
logger.trace("mongoOptionsSettings: " + mongoOptionsSettings);
267274
mongoSecondaryReadPreference = XContentMapValues
268275
.nodeBooleanValue(mongoOptionsSettings
269276
.get(SECONDARY_READ_PREFERENCE_FIELD), false);
@@ -301,13 +308,52 @@ public MongoDBRiver(final RiverName riverName,
301308
} else {
302309
excludeFields = null;
303310
}
311+
if (mongoOptionsSettings.containsKey(INITIAL_TIMESTAMP_FIELD)) {
312+
BSONTimestamp timeStamp = null;
313+
try {
314+
Map<String, Object> initalTimestampSettings = (Map<String, Object>) mongoOptionsSettings
315+
.get(INITIAL_TIMESTAMP_FIELD);
316+
String scriptType = "js";
317+
if (initalTimestampSettings
318+
.containsKey(INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD)) {
319+
scriptType = initalTimestampSettings.get(
320+
INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD)
321+
.toString();
322+
}
323+
if (initalTimestampSettings
324+
.containsKey(INITIAL_TIMESTAMP_SCRIPT_FIELD)) {
325+
326+
ExecutableScript script = scriptService.executable(
327+
scriptType,
328+
initalTimestampSettings.get(
329+
INITIAL_TIMESTAMP_SCRIPT_FIELD)
330+
.toString(), Maps.newHashMap());
331+
Object ctx = script.run();
332+
logger.trace(
333+
"initialTimestamp script returned: {}", ctx);
334+
if (ctx != null) {
335+
long timestamp = Long.parseLong(ctx.toString());
336+
timeStamp = new BSONTimestamp((int) (new Date(
337+
timestamp).getTime() / 1000), 1);
338+
}
339+
}
340+
} catch (Throwable t) {
341+
logger.warn("Could set initial timestamp", t,
342+
new Object());
343+
} finally {
344+
initialTimestamp = timeStamp;
345+
}
346+
} else {
347+
initialTimestamp = null;
348+
}
304349
} else {
305350
mongoSecondaryReadPreference = false;
306351
dropCollection = false;
307352
includeCollection = "";
308353
excludeFields = null;
309354
mongoUseSSL = false;
310355
mongoSSLVerifyCertificate = false;
356+
initialTimestamp = null;
311357
}
312358

313359
// Credentials
@@ -414,6 +460,7 @@ public MongoDBRiver(final RiverName riverName,
414460
excludeFields = null;
415461
mongoUseSSL = false;
416462
mongoSSLVerifyCertificate = false;
463+
initialTimestamp = null;
417464
}
418465
mongoOplogNamespace = mongoDb + "." + mongoCollection;
419466

@@ -1407,6 +1454,10 @@ private BSONTimestamp getLastTimestamp(final String namespace) {
14071454

14081455
}
14091456
}
1457+
} else {
1458+
if (initialTimestamp != null) {
1459+
return initialTimestamp;
1460+
}
14101461
}
14111462
return null;
14121463
}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package test.elasticsearch.plugin.river.mongodb.simple;
20+
21+
import static org.elasticsearch.client.Requests.countRequest;
22+
import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath;
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.equalTo;
25+
26+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
27+
import org.elasticsearch.action.count.CountResponse;
28+
import org.testng.Assert;
29+
import org.testng.annotations.AfterClass;
30+
import org.testng.annotations.BeforeClass;
31+
import org.testng.annotations.Test;
32+
33+
import test.elasticsearch.plugin.river.mongodb.RiverMongoDBTestAsbtract;
34+
35+
import com.mongodb.DB;
36+
import com.mongodb.DBCollection;
37+
import com.mongodb.DBObject;
38+
import com.mongodb.WriteConcern;
39+
import com.mongodb.util.JSON;
40+
41+
@Test
42+
public class RiverMongoInitialTimestampTest extends RiverMongoDBTestAsbtract {
43+
44+
private static final String TEST_SIMPLE_MONGODB_RIVER_INITIAL_TIMESTAMP_JSON = "/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river-initial-timestamp.json";
45+
private static final String GROOVY_SCRIPT_TYPE = "groovy";
46+
private static final String JAVASCRIPT_SCRIPT_TYPE = "js";
47+
private DB mongoDB;
48+
private DBCollection mongoCollection;
49+
50+
protected RiverMongoInitialTimestampTest() {
51+
super("initial-timestamp-river-" + System.currentTimeMillis(),
52+
"initial-timestamp-river-" + System.currentTimeMillis(),
53+
"initial-timestamp-collection-" + System.currentTimeMillis(),
54+
"initial-timestamp-index-" + System.currentTimeMillis());
55+
}
56+
57+
protected RiverMongoInitialTimestampTest(String river, String database,
58+
String collection, String index) {
59+
super(river, database, collection, index);
60+
}
61+
62+
@BeforeClass
63+
public void createDatabase() {
64+
logger.debug("createDatabase {}", getDatabase());
65+
try {
66+
mongoDB = getMongo().getDB(getDatabase());
67+
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
68+
logger.info("Start createCollection");
69+
mongoCollection = mongoDB.createCollection(getCollection(), null);
70+
Assert.assertNotNull(mongoCollection);
71+
} catch (Throwable t) {
72+
logger.error("createDatabase failed.", t);
73+
}
74+
}
75+
76+
@AfterClass
77+
public void cleanUp() {
78+
// super.deleteRiver();
79+
logger.info("Drop database " + mongoDB.getName());
80+
mongoDB.dropDatabase();
81+
}
82+
83+
//String script = "def now = new Date(); println 'Now: ${now}'; ctx.document.modified = now.clearTime();";
84+
@Test
85+
public void testInitialTimestampInGroovy() throws Throwable {
86+
logger.debug("Start testInitialTimestampInGroovy");
87+
try {
88+
String script = "import groovy.time.TimeCategory; use(TimeCategory){def date = new Date() + 5.second; date.time;}";
89+
super.createRiver(TEST_SIMPLE_MONGODB_RIVER_INITIAL_TIMESTAMP_JSON,
90+
getRiver(), (Object) String.valueOf(getMongoPort1()),
91+
(Object) String.valueOf(getMongoPort2()),
92+
(Object) String.valueOf(getMongoPort3()),
93+
(Object) GROOVY_SCRIPT_TYPE, (Object) script,
94+
(Object) getDatabase(), (Object) getCollection(),
95+
(Object) getIndex(), (Object) getDatabase());
96+
97+
String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
98+
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
99+
mongoCollection.insert(dbObject);
100+
Thread.sleep(wait);
101+
102+
assertThat(
103+
getNode().client().admin().indices()
104+
.exists(new IndicesExistsRequest(getIndex()))
105+
.actionGet().isExists(), equalTo(true));
106+
107+
refreshIndex();
108+
109+
CountResponse countResponse = getNode().client()
110+
.count(countRequest(getIndex())).actionGet();
111+
assertThat(countResponse.getCount(), equalTo(0L));
112+
113+
mongoCollection.remove(dbObject);
114+
115+
// Wait 5 seconds and store a new document
116+
Thread.sleep(5000);
117+
118+
dbObject = (DBObject) JSON.parse(mongoDocument);
119+
mongoCollection.insert(dbObject);
120+
Thread.sleep(wait);
121+
122+
assertThat(
123+
getNode().client().admin().indices()
124+
.exists(new IndicesExistsRequest(getIndex()))
125+
.actionGet().isExists(), equalTo(true));
126+
assertThat(
127+
getNode().client().admin().indices()
128+
.prepareTypesExists(getIndex())
129+
.setTypes(getDatabase()).execute().actionGet()
130+
.isExists(), equalTo(true));
131+
132+
refreshIndex();
133+
134+
countResponse = getNode().client().count(countRequest(getIndex()))
135+
.actionGet();
136+
assertThat(countResponse.getCount(), equalTo(1L));
137+
138+
mongoCollection.remove(dbObject);
139+
} catch (Throwable t) {
140+
logger.error("testInitialTimestampInGroovy failed.", t);
141+
t.printStackTrace();
142+
throw t;
143+
} finally {
144+
super.deleteRiver();
145+
super.deleteIndex();
146+
}
147+
}
148+
149+
// Convert JavaScript types to Java types: http://stackoverflow.com/questions/6730062/passing-common-types-between-java-and-rhino-javascript
150+
@Test
151+
public void testInitialTimestampInJavascript() throws Throwable {
152+
logger.debug("Start testInitialTimestampInJavascript");
153+
try {
154+
String script = "var date = new Date(); date.setSeconds(date.getSeconds() + 5); new java.lang.Long(date.getTime());";
155+
super.createRiver(TEST_SIMPLE_MONGODB_RIVER_INITIAL_TIMESTAMP_JSON,
156+
getRiver(), (Object) String.valueOf(getMongoPort1()),
157+
(Object) String.valueOf(getMongoPort2()),
158+
(Object) String.valueOf(getMongoPort3()),
159+
(Object) JAVASCRIPT_SCRIPT_TYPE, (Object) script,
160+
(Object) getDatabase(), (Object) getCollection(),
161+
(Object) getIndex(), (Object) getDatabase());
162+
163+
String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
164+
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
165+
mongoCollection.insert(dbObject);
166+
Thread.sleep(wait);
167+
168+
assertThat(
169+
getNode().client().admin().indices()
170+
.exists(new IndicesExistsRequest(getIndex()))
171+
.actionGet().isExists(), equalTo(true));
172+
173+
refreshIndex();
174+
175+
CountResponse countResponse = getNode().client()
176+
.count(countRequest(getIndex())).actionGet();
177+
assertThat(countResponse.getCount(), equalTo(0L));
178+
179+
mongoCollection.remove(dbObject);
180+
181+
// Wait 5 seconds and store a new document
182+
Thread.sleep(5000);
183+
184+
dbObject = (DBObject) JSON.parse(mongoDocument);
185+
mongoCollection.insert(dbObject);
186+
Thread.sleep(wait);
187+
188+
assertThat(
189+
getNode().client().admin().indices()
190+
.exists(new IndicesExistsRequest(getIndex()))
191+
.actionGet().isExists(), equalTo(true));
192+
assertThat(
193+
getNode().client().admin().indices()
194+
.prepareTypesExists(getIndex())
195+
.setTypes(getDatabase()).execute().actionGet()
196+
.isExists(), equalTo(true));
197+
198+
refreshIndex();
199+
200+
countResponse = getNode().client().count(countRequest(getIndex()))
201+
.actionGet();
202+
assertThat(countResponse.getCount(), equalTo(1L));
203+
204+
mongoCollection.remove(dbObject);
205+
} catch (Throwable t) {
206+
logger.error("testInitialTimestampInJavascript failed.", t);
207+
t.printStackTrace();
208+
throw t;
209+
} finally {
210+
super.deleteRiver();
211+
super.deleteIndex();
212+
}
213+
}
214+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"type": "mongodb",
3+
"mongodb": {
4+
"servers": [{
5+
"host": "localhost",
6+
"port": %s
7+
},
8+
{
9+
"host": "localhost",
10+
"port": %s
11+
},
12+
{
13+
"host": "localhost",
14+
"port": %s
15+
}],
16+
"options": {
17+
"secondary_read_preference": true,
18+
"initial_timestamp": {
19+
"script_type": "%s",
20+
"script": "%s"
21+
}
22+
},
23+
"db": "%s",
24+
"collection": "%s",
25+
"gridfs": false
26+
},
27+
"index": {
28+
"name": "%s",
29+
"type": "%s",
30+
"throttle_size": 2000
31+
}
32+
}

0 commit comments

Comments
 (0)