Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.RestRequest;

/**
* ContentType is a factory interface with 4 enumerated values
*/
public enum ContentType {

// these variables are value of this enum type ContentType
// they are by default public static final
CSV(10) {
@Override
public String contentType() {
Expand Down Expand Up @@ -109,6 +115,7 @@ public String fileName(final RestRequest request) {

private int index;

// pass one argument into their values
ContentType(final int index) {
this.index = index;
}
Expand All @@ -117,9 +124,17 @@ public int index() {
return index;
}

// every enum value should implement all the abstract method in their enum type
public abstract String contentType();

public abstract String fileName(RestRequest request);

/**
* Create a {@link DataContent} object associated with this {@link ContentType}
* to do the dump operations.
* @param client
* @param request
* @return
*/
public abstract DataContent dataContent(Client client, RestRequest request);

public abstract String fileName(RestRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public DataContent(final Client client, final RestRequest request, final Content
public abstract void write(File outputFile, SearchResponse response, RestChannel channel,
ActionListener<Void> listener);

public RestRequest getRequest() {
return request;
}

public ContentType getContentType() {
return contentType;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package org.codelibs.elasticsearch.df.content.csv;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -13,6 +11,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.commons.codec.Charsets;
import org.apache.logging.log4j.Logger;
import org.codelibs.elasticsearch.df.content.ContentType;
import org.codelibs.elasticsearch.df.content.DataContent;
Expand Down Expand Up @@ -49,8 +48,8 @@ public CsvContent(final Client client, final RestRequest request, final ContentT
super(client, request, contentType);
csvConfig = new CsvConfig(
request.param("csv.separator", ",").charAt(0), request.param(
"csv.quote", "\"").charAt(0), request.param(
"csv.escape", "\"").charAt(0));
"csv.quote", "\"").charAt(0), request.param(
"csv.escape", "\"").charAt(0));
csvConfig.setQuoteDisabled(request.paramAsBoolean("csv.quoteDisabled",
false));
csvConfig.setEscapeDisabled(request.paramAsBoolean(
Expand All @@ -65,7 +64,7 @@ public CsvContent(final Client client, final RestRequest request, final ContentT
charsetName = request.param("csv.encoding", "UTF-8");

final String[] fields = request.paramAsStringArray("fl",
StringUtils.EMPTY_STRINGS);
StringUtils.EMPTY_STRINGS);
if (fields.length == 0) {
headerSet = new LinkedHashSet<String>();
modifiableFieldSet = true;
Expand All @@ -87,7 +86,7 @@ public CsvContent(final Client client, final RestRequest request, final ContentT

@Override
public void write(final File outputFile, final SearchResponse response, final RestChannel channel,
final ActionListener<Void> listener) {
final ActionListener<Void> listener) {
try {
final OnLoadListener onLoadListener = new OnLoadListener(
outputFile, listener);
Expand All @@ -105,6 +104,8 @@ protected class OnLoadListener implements ActionListener<SearchResponse> {

protected File outputFile;

protected String header;

private long currentCount = 0;

protected OnLoadListener(final File outputFile, final ActionListener<Void> listener) {
Expand Down Expand Up @@ -142,14 +143,6 @@ public void onResponse(final SearchResponse response) {
headerSet.add(key);
}
}
if (appendHeader) {
final List<String> headerList = new ArrayList<String>(
headerSet.size());
headerList.addAll(headerSet);
csvWriter.writeValues(headerList);
appendHeader = false;
}

final List<String> dataList = new ArrayList<String>(
dataMap.size());
for (final String name : headerSet) {
Expand All @@ -163,6 +156,21 @@ public void onResponse(final SearchResponse response) {
// end
csvWriter.flush();
close();
if (appendHeader) {
String tmp = new String();
for (String field : headerSet) {
tmp += "\"" + field + "\"" + ",";
}
header = tmp.substring(0, tmp.length() - 1);
List<String> lines = Files.readAllLines(outputFile.toPath());
BufferedWriter writer = Files.newBufferedWriter(outputFile.toPath());
writer.write(header);
for (String line : lines) {
writer.newLine();
writer.write(line);
}
writer.close();
}
listener.onResponse(null);
} else {
client.prepareSearchScroll(scrollId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public class RestDataAction extends BaseRestHandler {

@Inject
public RestDataAction(final Settings settings,
final RestController restController,
final SearchRequestParsers searchRequestParsers,
final NamedXContentRegistry xContentRegistry) {
final RestController restController,
final SearchRequestParsers searchRequestParsers,
final NamedXContentRegistry xContentRegistry) {
super(settings);

this.searchRequestParsers = searchRequestParsers;
Expand All @@ -79,14 +79,14 @@ public RestDataAction(final Settings settings,
restController.registerHandler(POST, "/{index}/{type}/_data", this);

this.maxMemory = Runtime.getRuntime().maxMemory();
this.defaultLimit = (long)(maxMemory * (DEFAULT_LIMIT_PERCENTAGE / 100F));
this.defaultLimit = (long) (maxMemory * (DEFAULT_LIMIT_PERCENTAGE / 100F));
}

protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequestBuilder prepareSearch;
try {
final String[] indices = request.paramAsStringArray("index",
emptyStrings);
emptyStrings);
if (logger.isDebugEnabled()) {
logger.debug("indices: " + Arrays.toString(indices));
}
Expand All @@ -101,7 +101,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
searchSourceBuilder.parseXContent(context, searchRequestParsers.aggParsers, searchRequestParsers.suggesters, searchRequestParsers.searchExtParsers);
}
final Map<String, Object> map = SourceLookup
.sourceAsMap(restContent);
.sourceAsMap(restContent);
fromObj = map.get("from");
} else {
final String source = request.param("source");
Expand All @@ -114,7 +114,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
logger.debug("source: " + source);
}
final Map<String, Object> map = parser
.map();
.map();
fromObj = map.get("from");
}
}
Expand All @@ -133,23 +133,17 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
}

final String[] types = request.paramAsStringArray("type",
emptyStrings);
emptyStrings);
if (types.length > 0) {
prepareSearch.setTypes(types);
}
prepareSearch.setRouting(request.param("routing"));
prepareSearch.setPreference(request.param("preference"));
prepareSearch.setIndicesOptions(IndicesOptions.fromRequest(request,
IndicesOptions.strictExpandOpen()));
IndicesOptions.strictExpandOpen()));

final String file = request.param("file");

final ContentType contentType = getContentType(request);
if (contentType == null) {
final String msg = "Unknown content type:" + request.header("Content-Type");
throw new IllegalArgumentException(msg);
}

final long limitBytes;
String limitParamStr = request.param("limit");
if (Strings.isNullOrEmpty(limitParamStr)) {
Expand All @@ -158,7 +152,13 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
if (limitParamStr.endsWith("%")) {
limitParamStr = limitParamStr.substring(0, limitParamStr.length() - 1);
}
limitBytes = (long)(maxMemory * (Float.parseFloat(limitParamStr) / 100F));
limitBytes = (long) (maxMemory * (Float.parseFloat(limitParamStr) / 100F));
}

final ContentType contentType = getContentType(request);
if (contentType == null) {
final String msg = "Unknown content type:" + request.header("Content-Type");
throw new IllegalArgumentException(msg);
}

final DataContent dataContent = contentType.dataContent(client, request);
Expand All @@ -169,7 +169,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
* to capture and react to the result.
*/
return (channel) -> prepareSearch
.execute(new SearchResponseListener(request, channel, file,
.execute(new SearchResponseListener(channel, file,
limitBytes, dataContent));
} catch (final Exception e) {
logger.warn("failed to parse search request parameters", e);
Expand Down Expand Up @@ -203,7 +203,7 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
}
if (request.hasParam("terminate_after")) {
int terminateAfter = request.paramAsInt("terminate_after",
SearchContext.DEFAULT_TERMINATE_AFTER);
SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
throw new IllegalArgumentException("terminateAfter must be > 0");
} else if (terminateAfter > 0) {
Expand All @@ -213,14 +213,14 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil

if (request.param("fields") != null) {
throw new IllegalArgumentException("The parameter [" +
SearchSourceBuilder.FIELDS_FIELD + "] is no longer supported, please use [" +
SearchSourceBuilder.STORED_FIELDS_FIELD + "] to retrieve stored fields or _source filtering " +
"if the field is not stored");
SearchSourceBuilder.FIELDS_FIELD + "] is no longer supported, please use [" +
SearchSourceBuilder.STORED_FIELDS_FIELD + "] to retrieve stored fields or _source filtering " +
"if the field is not stored");
}


StoredFieldsContext storedFieldsContext =
StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);
StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);
if (storedFieldsContext != null) {
searchSourceBuilder.storedFields(storedFieldsContext);
}
Expand Down Expand Up @@ -275,12 +275,18 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
int suggestSize = request.paramAsInt("suggest_size", 5);
String suggestMode = request.param("suggest_mode");
searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,
termSuggestion(suggestField)
.text(suggestText).size(suggestSize)
.suggestMode(TermSuggestionBuilder.SuggestMode.resolve(suggestMode))));
termSuggestion(suggestField)
.text(suggestText).size(suggestSize)
.suggestMode(TermSuggestionBuilder.SuggestMode.resolve(suggestMode))));
}
}

/**
* Retrieve dump format (csv, excel, or json) from {@link RestRequest}
*
* @param request
* @return a {@link ContentType} value
*/
private ContentType getContentType(final RestRequest request) {
final String contentType = request.param("format",
request.header("Content-Type"));
Expand Down Expand Up @@ -329,7 +335,6 @@ private static BytesReference getRestContent(RestRequest request) {
}

class SearchResponseListener implements ActionListener<SearchResponse> {
private final RestRequest request;

private final RestChannel channel;

Expand All @@ -339,10 +344,8 @@ class SearchResponseListener implements ActionListener<SearchResponse> {

private long limit;

SearchResponseListener(final RestRequest request,
final RestChannel channel, final String file, final long limit,
final DataContent dataContent) {
this.request = request;
SearchResponseListener(final RestChannel channel, final String file, final long limit,
final DataContent dataContent) {
this.channel = channel;
this.dataContent = dataContent;
if (!Strings.isNullOrEmpty(file)) {
Expand Down Expand Up @@ -374,10 +377,15 @@ public void onResponse(final SearchResponse response) {
public void onResponse(final Void response) {
try {
if (useLocalFile) {
sendResponse(request,channel,
// from java 8: the local variables passed to anonymous class
// could also be "effectively final", which means their values
// are never changed after initialization.
// it's more about to encourage the use of lambda expression
// instead of creating anonymous class
sendResponse(dataContent.getRequest(), channel,
outputFile.getAbsolutePath());
} else {
writeResponse(request, channel, outputFile, limit, dataContent);
writeResponse(dataContent.getRequest(), channel, outputFile, limit, dataContent);
SearchResponseListener.this
.deleteOutputFile();
}
Expand Down Expand Up @@ -413,10 +421,10 @@ public void onFailure(final Exception e) {
}
}

private void sendResponse(final RestRequest request,final RestChannel channel, final String file) {
private void sendResponse(final RestRequest request, final RestChannel channel, final String file) {
try {
final XContentBuilder builder = JsonXContent.contentBuilder();
final String pretty=request.param("pretty");
final String pretty = request.param("pretty");
if (pretty != null && !"false".equalsIgnoreCase(pretty)) {
builder.prettyPrint().lfAtEnd();
}
Expand All @@ -437,11 +445,11 @@ private void writeResponse(final RestRequest request, final RestChannel channel,
return;
}

try (FileInputStream fis = new FileInputStream(outputFile)){
try (FileInputStream fis = new FileInputStream(outputFile)) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] bytes = new byte[1024];
int len;
while((len = fis.read(bytes)) > 0) {
while ((len = fis.read(bytes)) > 0) {
out.write(bytes, 0, len);
}

Expand Down
Loading