@@ -25,7 +25,7 @@ class Elastic{
2525 } )
2626 await this . _checkConnection ( this . sourceClient , 'source' ) ;
2727
28- //configure target client if required
28+ //configure target elastic client if required
2929 if ( options . consume . byElastic ) {
3030 this . targetClient = new elasticsearch . Client ( {
3131 host : config . target . host ,
@@ -37,6 +37,11 @@ class Elastic{
3737 await this . _transferMapping ( this . sourceClient , this . targetClient ) ;
3838 }
3939 }
40+
41+ //configure target file if required
42+ if ( options . consume . byFile ) {
43+ await this . _mkdir ( config . target . dirPath ) ;
44+ }
4045 }
4146
4247 async getDocsFromSourceElastic ( { batchSize = 20 , query = { match_all : { } } , scrollTime = '10s' } ) {
@@ -78,19 +83,19 @@ class Elastic{
7883 return accumulator . concat ( [ { index} , hit . _source ] )
7984 } , [ ] ) ;
8085
81- //console.log(body);
8286 console . log ( 'Elastic Doc Count' , this . target . doc_count . elastic ) ;
8387 return this . targetClient . bulk ( { body} ) ;
8488 }
8589
8690 async writeDocsToFile ( rawHitsList ) {
8791 let body = rawHitsList . map ( hits => {
92+ let path = config . target . dirPath + config . target . filePath ;
8893 let index = {
8994 _index : config . target . index ,
9095 _type : config . target . type ,
9196 _id : ++ this . target . doc_count . file
9297 }
93- return this . _writeLineToFile ( config . target . filePath , `${ JSON . stringify ( { index} ) } \n${ JSON . stringify ( hits . _source ) } ` ) ;
98+ return this . _writeLineToFile ( path , `${ JSON . stringify ( { index} ) } \n${ JSON . stringify ( hits . _source ) } ` ) ;
9499 } )
95100
96101 console . log ( 'File Doc Count' , this . target . doc_count . file ) ;
@@ -109,22 +114,60 @@ class Elastic{
109114 } )
110115 }
111116
117+ _mkdir ( path ) {
118+ return new Promise ( ( resolve , reject ) => {
119+ fs . mkdir ( path , err => {
120+ //if any error other than 'directory already exists'
121+ if ( err && err . code != 'EEXIST' )
122+ reject ( err ) ;
123+ resolve ( ) ;
124+ } )
125+ } )
126+ }
127+
112128 async _transferMapping ( source , target ) {
113- let mapping = await source . indices . getMapping ( {
129+ let sourceMapping = await source . indices . getMapping ( {
114130 index : config . source . index ,
115131 type : config . source . type
116132 } )
117- console . log ( "Mapping: " , JSON . stringify ( mapping ) ) ;
133+
134+ let mapping = this . _buildMappingFrom ( sourceMapping ) ;
118135
119136 if ( ! await target . indices . exists ( { index : config . target . index } ) )
120137 await target . indices . create ( { index : config . target . index } ) ;
121138
122139 return await target . indices . putMapping ( {
123140 index : config . target . index ,
124141 type : config . target . type ,
125- body : mapping [ config . source . index ] . mappings
142+ body : mapping
126143 } ) ;
127144 }
145+
146+ //sourceMapping:
147+ //{<index>: {"mappings": {<type>: {<the mapping>}}}}
148+ //To return:
149+ //{<type>: <mapping>}
150+ _buildMappingFrom ( sourceMapping ) {
151+ let base = { } ;
152+ let type = Object . assign ( { } , sourceMapping [ config . source . index ] . mappings [ config . source . type ] ) ;
153+
154+ //if target 'type' given in config is different from the type in source elastic, change it
155+ if ( this . _targetConfigCheckIf ( 'type' , 'given' ) && ! this . _targetConfigCheckIf ( 'type' , 'same' ) )
156+ base [ config . target . type ] = type ;
157+ else
158+ base [ config . source . type ] = type ;
159+
160+ //base = {<type>: <mapping>}
161+ return base ;
162+ }
163+
164+ _targetConfigCheckIf ( property , flag ) {
165+ switch ( flag ) {
166+ case 'given' : return ! ! config . target [ property ] ;
167+ case 'same' : return config . target [ property ] === config . source [ property ] ;
168+ default : throw new Error ( `Error with mapping: ${ property } ` ) ;
169+ }
170+ }
128171}
129172
130173module . exports = { Elastic}
0 commit comments