Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.Set;

import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.geo.GeoResult;
import org.springframework.data.mongodb.core.geo.GeoResults;
Expand Down Expand Up @@ -301,6 +303,16 @@ public interface MongoOperations {
*/
<T> GroupByResults<T> group(Criteria criteria, String inputCollectionName, GroupBy groupBy, Class<T> entityClass);

/**
* Execute an aggregation operation. The raw results will be mapped to the given entity class.
*
* @param inputCollectionName the collection there the aggregation operation will read from.
* @param pipeline The pipeline holding the aggregation operations.
* @param entityClass The parameterized type of the returned list.
* @return The results of the aggregation operation.
*/
<T> AggregationResults<T> aggregate(String inputCollectionName, AggregationPipeline pipeline, Class<T> entityClass);

/**
* Execute a map-reduce operation. The map-reduce operation will be formed with an output type of INLINE
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package org.springframework.data.mongodb.core;

import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.SerializationUtils.*;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.SerializationUtils.serializeToJsonSafely;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -54,6 +54,8 @@
import org.springframework.data.mapping.model.BeanWrapper;
import org.springframework.data.mapping.model.MappingException;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoWriter;
Expand Down Expand Up @@ -1171,6 +1173,31 @@ public <T> GroupByResults<T> group(Criteria criteria, String inputCollectionName

}

public <T> AggregationResults<T> aggregate(String inputCollectionName, AggregationPipeline pipeline, Class<T> entityClass) {
Assert.notNull(inputCollectionName, "Collection name is missing");
Assert.notNull(pipeline, "Aggregation pipeline is missing");
Assert.notNull(entityClass, "Entity class is missing");

// prepare command
DBObject command = new BasicDBObject("aggregate", inputCollectionName );
command.put( "pipeline", pipeline.getOperations() );

// execute command
CommandResult commandResult = executeCommand(command);
handleCommandError(commandResult, command);

// map results
@SuppressWarnings("unchecked")
Iterable<DBObject> resultSet = (Iterable<DBObject>) commandResult.get("result");
List<T> mappedResults = new ArrayList<T>();
DbObjectCallback<T> callback = new ReadDbObjectCallback<T>(mongoConverter, entityClass);
for (DBObject dbObject : resultSet) {
mappedResults.add(callback.doWith(dbObject));
}

return new AggregationResults<T>(mappedResults, commandResult);
}

protected String replaceWithResourceIfNecessary(String function) {

String func = function;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2011-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;

import java.util.ArrayList;
import java.util.List;

import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.util.Assert;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;
import com.mongodb.util.JSONParseException;

/**
* Holds the operations of an aggregation pipeline.
*
* @author Tobias Trelle
*/
public class AggregationPipeline {

private static final String OPERATOR_PREFIX = "$";

private List<DBObject> operations = new ArrayList<DBObject>();

/**
* Adds a projection operation to the pipeline.
*
* @param projection JSON string holding the projection.
* @return The pipeline.
*/
public AggregationPipeline project(String projection) {
return addDocumentOperation("project", projection);
}

/**
* Adds a projection operation to the pipeline.
*
* @param projection Type safe projection object.
* @return The pipeline.
*/
public AggregationPipeline project(Projection projection) {
return addOperation("project", projection.toDBObject() );
}

/**
* Adds an unwind operation to the pipeline.
*
* @param field Name of the field to unwind (should be an array).
* @return The pipeline.
*/
public AggregationPipeline unwind(String field) {
Assert.notNull(field, "Missing field name");

if (!field.startsWith(OPERATOR_PREFIX)) {
field = OPERATOR_PREFIX + field;
}

return addOperation("unwind", field);
}

/**
* Adds a group operation to the pipeline.
*
* @param projection JSON string holding the group.
* @return The pipeline.
*/
public AggregationPipeline group(String group) {
return addDocumentOperation("group", group);
}

/**
* Adds a sort operation to the pipeline.
*
* @param sort JSON string holding the sorting.
* @return The pipeline.
*/
public AggregationPipeline sort(String sort) {
return addDocumentOperation("sort", sort);
}

/**
* Adds a sort operation to the pipeline.
*
* @param sort Type safe sort operation.
* @return The pipeline.
*/
public AggregationPipeline sort(Sort sort) {
Assert.notNull(sort);

DBObject dbo = new BasicDBObject();

for (org.springframework.data.domain.Sort.Order order : sort) {
dbo.put(order.getProperty(), order.isAscending() ? 1 : -1);
}
return addOperation("sort", dbo);
}

/**
* Adds a match operation to the pipeline that is basically a query on the collection.s
*
* @param projection JSON string holding the criteria.
* @return The pipeline.
*/
public AggregationPipeline match(String match) {
return addDocumentOperation("match", match);
}

/**
* Adds a match operation to the pipeline that is basically a query on the collection.s
*
* @param criteria Type safe criteria to filter documents from the collection.
* @return The pipeline.
*/
public AggregationPipeline match(Criteria criteria) {
Assert.notNull(criteria);

return addOperation("match", criteria.getCriteriaObject());
}

/**
* Adds an limit operation to the pipeline.
*
* @param n Number of document to consider.
* @return The pipeline.
*/
public AggregationPipeline limit(long n) {
return addOperation("limit", n);
}

/**
* Adds an skip operation to the pipeline.
*
* @param n Number of documents to skip.
* @return The pipeline.
*/
public AggregationPipeline skip(long n) {
return addOperation("skip", n);
}

public List<DBObject> getOperations() {
return operations;
}

private AggregationPipeline addDocumentOperation(String opName, String operation) {
Assert.notNull(operation, "Missing " + opName);
return addOperation(opName, parseJson(operation));
}

private AggregationPipeline addOperation(String key, Object value) {
this.operations.add(new BasicDBObject(OPERATOR_PREFIX + key, value));
return this;
}

private DBObject parseJson(String json) {
try {
return (DBObject) JSON.parse(json);
} catch (JSONParseException e) {
throw new IllegalArgumentException("Not a valid JSON document: " + json, e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2011-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.springframework.util.Assert;

import com.mongodb.DBObject;

/**
* Collects the results of executing an aggregation operation.
*
* @author Tobias Trelle
*
* @param <T> The class in which the results are mapped onto.
*/
public class AggregationResults<T> implements Iterable<T> {

private final List<T> mappedResults;
private final DBObject rawResults;

private String serverUsed;

public AggregationResults(List<T> mappedResults, DBObject rawResults) {
Assert.notNull(mappedResults);
Assert.notNull(rawResults);
this.mappedResults = mappedResults;
this.rawResults = rawResults;
parseServerUsed();
}

public List<T> getAggregationResult() {
List<T> result = new ArrayList<T>();
Iterator<T> it = iterator();

while (it.hasNext()) {
result.add(it.next());
}

return result;
}

@Override
public Iterator<T> iterator() {
return mappedResults.iterator();
}

public String getServerUsed() {
return serverUsed;
}

private void parseServerUsed() {
Object object = rawResults.get("serverUsed");
if (object instanceof String) {
serverUsed = (String) object;
}
}

}
Loading