Commit 1146cfc

Author: Gaurang
Commit message: Finish 1.0.0
Parents: 2ddce88 + bf82058

8 files changed: +398 -1 lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -1,3 +1,6 @@
+# Custom
+config.js
+
 # Logs
 logs
 *.log

README.md

Lines changed: 18 additions & 1 deletion
@@ -1,2 +1,19 @@
 # elasticsearch-document-transfer
-Simple script to transfer documents between two elasticsearch servers or between indices of the same elasticsearch server.
+Simple Node.js script to transfer documents between two elasticsearch servers or between indices of the same elasticsearch server.
+
+Implementation based on the [producer–consumer problem][3].
+Since Node.js is single-threaded, the `producer` and `consumer` never access the `buffer` at the same time; as long as the asynchronous control flow is handled properly, there should be no concurrency issues.
+
+### Usage:
+1. Clone the repo.
+2. Run `npm install` in the directory.
+3. Add `config.js` using [`config-sample.js`][1].
+4. (Optional) Set appropriate values in [`options.js`][2].
+5. Run the script:
+```
+node index.js
+```
+
+[1]: /config-sample.js
+[2]: /options.js
+[3]: https://en.wikipedia.org/wiki/Producer–consumer_problem
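
The README's concurrency claim rests on a property worth spelling out: the Node.js event loop runs one callback at a time, so a producer and a consumer sharing a plain array need no locks. A minimal, self-contained sketch of that pattern (hypothetical names, not code from this commit):

```
const events = require('events');
const emitter = new events.EventEmitter();

let buffer = [];
let produced = 0;
let done = false;

//producer: queues one item per event-loop turn, then signals completion
emitter.on('produce', () => {
    buffer.push(produced++);
    if (produced < 5) setTimeout(() => emitter.emit('produce'), 10);
    else done = true;
});

//consumer: drains whatever is queued so far; Node.js runs one callback
//at a time, so the two handlers never touch `buffer` simultaneously
emitter.on('consume', () => {
    while (buffer.length > 0) console.log('consumed', buffer.shift());
    if (!done) setTimeout(() => emitter.emit('consume'), 10);
});

emitter.emit('produce');
emitter.emit('consume');
```

The `index.js` added in this commit follows the same shape, with Elasticsearch scroll batches as the producer and bulk writes as the consumer.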

config-sample.js

Lines changed: 24 additions & 0 deletions
//Values that will have consequences

module.exports = Object.freeze({
    //Source client: from where the documents are to be fetched
    "source": {
        "host": "localhost:9200",

        "index": "my_source_index",
        "type": "_doc"
    },

    //Target client: where the documents are to be transferred
    "target": {
        "index": "my_target_index",
        "type": "_doc",

        //if options.consume.byFile is not set, this will be ignored
        "filePath": "data.json",

        //if options.consume.byElastic is not set, this will be ignored
        "host": "localhost:9200",
        "importMappingFromSource": true
    }
})

elastic.js

Lines changed: 130 additions & 0 deletions
const elasticsearch = require('elasticsearch');
const fs = require('fs');

const config = require('./config');
const options = require('./options');


class Elastic {
    constructor() {
        this.target = {
            doc_count: {
                file: 0,
                elastic: 0
            }
        };
        this.batches = 0;
        this._scroll_id = null;
    }

    async init() {
        //configure the source client
        this.sourceClient = new elasticsearch.Client({
            host: config.source.host,
            log: options.elasticsearch.log_level
        });
        await this._checkConnection(this.sourceClient, 'source');

        //configure the target client if required
        if (options.consume.byElastic) {
            this.targetClient = new elasticsearch.Client({
                host: config.target.host,
                log: options.elasticsearch.log_level
            });
            await this._checkConnection(this.targetClient, 'target');

            if (config.target.importMappingFromSource) {
                await this._transferMapping(this.sourceClient, this.targetClient);
            }
        }
    }

    //fetches one batch of documents from the source index via the scroll API
    async getDocsFromSourceElastic({batchSize = 20, query = {match_all: {}}, scrollTime = options.elasticsearch.scroll_time} = {}) {
        let results;

        //the first time, when no scroll_id has been generated yet
        if (!this._scroll_id) {
            results = await this.sourceClient.search({
                scroll: scrollTime,
                index: config.source.index,
                type: config.source.type,
                body: {
                    size: batchSize,
                    query: query
                }
            });
        }
        //every subsequent time, continue the existing scroll
        else {
            results = await this.sourceClient.scroll({
                scrollId: this._scroll_id,
                scroll: scrollTime
            });
        }
        this.batches++;
        this._scroll_id = results._scroll_id;

        console.log('Batches done', this.batches);
        return results;
    }

    //indexes a batch of hits into the target index via the bulk API
    addDocsToTargetElastic(rawHitsList) {
        //build the bulk body: an action entry followed by the document source, per hit
        let body = rawHitsList.reduce((accumulator, hit) => {
            let index = {
                _index: config.target.index,
                _type: config.target.type,
                _id: ++this.target.doc_count.elastic
            };
            return accumulator.concat([{index}, hit._source]);
        }, []);

        console.log('Elastic Doc Count', this.target.doc_count.elastic);
        return this.targetClient.bulk({body});
    }

    //appends a batch of hits to the output file in bulk (NDJSON) format
    async writeDocsToFile(rawHitsList) {
        let writes = rawHitsList.map(hit => {
            let index = {
                _index: config.target.index,
                _type: config.target.type,
                _id: ++this.target.doc_count.file
            };
            return this._writeLineToFile(config.target.filePath, `${JSON.stringify({index})}\n${JSON.stringify(hit._source)}`);
        });

        console.log('File Doc Count', this.target.doc_count.file);
        return Promise.all(writes);
    }

    async _checkConnection(client, clientName = '') {
        await client.ping({requestTimeout: 3000});
        console.log(`${clientName} client connected successfully`);
    }

    _writeLineToFile(path, line) {
        return new Promise((resolve, reject) => {
            fs.appendFile(path, line + '\n', err => err ? reject(err) : resolve());
        });
    }

    //copies the source index's mapping onto the target index, creating it if needed
    async _transferMapping(source, target) {
        let mapping = await source.indices.getMapping({
            index: config.source.index,
            type: config.source.type
        });
        console.log('Mapping: ', JSON.stringify(mapping));

        if (!await target.indices.exists({index: config.target.index}))
            await target.indices.create({index: config.target.index});

        return await target.indices.putMapping({
            index: config.target.index,
            type: config.target.type,
            body: mapping[config.source.index].mappings
        });
    }
}


module.exports = {Elastic};
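
For reference, the `body` that `addDocsToTargetElastic` hands to `bulk()` is an array of alternating action and source entries, which the client serializes into the bulk NDJSON format. With two hits it would look roughly like this (sample document contents are made up):

```
//shape of the bulk body built by the reduce above; documents are hypothetical
const body = [
    {index: {_index: 'my_target_index', _type: '_doc', _id: 1}},
    {title: 'first document'},
    {index: {_index: 'my_target_index', _type: '_doc', _id: 2}},
    {title: 'second document'}
];
```

`writeDocsToFile` writes the same action/source pairs to disk, one JSON object per line, so the output file can later be replayed against Elasticsearch's `_bulk` endpoint.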

index.js

Lines changed: 84 additions & 0 deletions
const Elastic = require('./elastic').Elastic;
const events = require('events');

const options = require('./options');

const elastic = new Elastic();
const eventEmitter = new events.EventEmitter();

let buffer = [];
let EOS = false; //End Of Scroll/Search/document Stream

const EVENT_CONSUME_READY = 'BUFFER_CONSUME_READY';
const EVENT_PRODUCE_READY = 'BUFFER_PRODUCE_READY';

//consumer: pushes documents to the target
eventEmitter.on(EVENT_CONSUME_READY, async () => {
    try {
        //while the buffer holds more than one batch,
        //keep consuming it
        while (buffer.length > options.consume.batch_size) {
            let batch = buffer.splice(0, options.consume.batch_size);

            if (options.consume.byFile)
                await elastic.writeDocsToFile(batch);

            if (options.consume.byElastic)
                await elastic.addDocsToTargetElastic(batch);
        }

        console.log('consumeSkip');

        //if the buffer is depleted but more documents are on the way,
        //wait a while and continue once the buffer fills up again
        if (!EOS)
            setTimeout(() => eventEmitter.emit(EVENT_CONSUME_READY), options.consume.timeout);

        //otherwise, wind up by flushing the rest of the buffer
        else {
            if (options.consume.byFile)
                await elastic.writeDocsToFile(buffer);

            if (options.consume.byElastic)
                await elastic.addDocsToTargetElastic(buffer);
        }
    }
    catch (err) { console.error(err); }
});

//producer: fetches documents from the source elastic
eventEmitter.on(EVENT_PRODUCE_READY, async () => {
    let response;
    try {
        //keep adding documents to the buffer
        //until it would overflow
        while (buffer.length < options.buffer_size) {
            //the method destructures an options object,
            //so the batch size must be passed by name
            response = await elastic.getDocsFromSourceElastic({batchSize: options.produce.batch_size});

            //once we stop getting new documents,
            //signal that we've reached the End Of Scroll (EOS)
            if (response.hits.hits.length > 0)
                buffer = buffer.concat(response.hits.hits);
            else {
                EOS = true;
                break;
            }
        }

        //if the buffer is full,
        //wait a while and try again later
        console.log('produceSkip');
        if (!EOS)
            setTimeout(() => eventEmitter.emit(EVENT_PRODUCE_READY), options.produce.timeout);
    }
    catch (err) { console.error(err); }
});

(async function() {
    try {
        await elastic.init();
        eventEmitter.emit(EVENT_PRODUCE_READY);
        eventEmitter.emit(EVENT_CONSUME_READY);
    }
    catch (err) { console.error(err); }
})();

options.js

Lines changed: 22 additions & 0 deletions
//Values you can play around with

module.exports = Object.freeze({
    "elasticsearch": {
        "log_level": "error",
        "scroll_time": "10s"
    },

    "buffer_size": 1000,

    "produce": {
        "batch_size": 100,
        "timeout": 1000 //in ms
    },

    "consume": {
        "batch_size": 50,
        "timeout": 1000, //in ms
        "byFile": true,
        "byElastic": true
    }
})
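
These knobs interact: the producer fills the buffer up to `buffer_size`, while the consumer drains it only while more than `consume.batch_size` documents are queued, so `buffer_size` should comfortably exceed both batch sizes. A hypothetical sanity check, not part of this commit, could make that explicit:

```
const assert = require('assert');
const options = require('./options');

//the producer stops once the buffer reaches buffer_size,
//so at least one produce batch must fit
assert(options.buffer_size > options.produce.batch_size, 'buffer_size must exceed produce.batch_size');
//the consumer only drains while the buffer exceeds its batch size
assert(options.buffer_size > options.consume.batch_size, 'buffer_size must exceed consume.batch_size');
```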

package-lock.json

Lines changed: 92 additions & 0 deletions
Some generated files are not rendered by default.
