Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra-parent</artifactId>
<version>1.5.0.BUILD-SNAPSHOT</version>
<version>1.5.0.DATACASS-297-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data Cassandra</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-cql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra-parent</artifactId>
<version>1.5.0.BUILD-SNAPSHOT</version>
<version>1.5.0.DATACASS-297-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.util.Assert;

import com.datastax.driver.core.BoundStatement;
Expand Down Expand Up @@ -854,19 +855,49 @@ public List<Map<String, Object>> processListOfMap(ResultSet resultSet) {
}

/**
* Attempts to translate the {@link RuntimeException} into a Spring Data {@link Exception}.
* Attempts to translate the {@link Exception} into a Spring Data {@link Exception}.
* @param ex the Exception
* @return the translated {@link RuntimeException}
*/
@SuppressWarnings("all")
protected RuntimeException translateExceptionIfPossible(RuntimeException e) {

RuntimeException resolved = getExceptionTranslator().translateExceptionIfPossible(e);
return (resolved != null ? resolved : e);
protected RuntimeException translateExceptionIfPossible(Exception ex) {
return translateExceptionIfPossible(ex, getExceptionTranslator());
}

/**
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
* exception if the conversation failed. Thus allows safe re-throwing of the return value.
*
* @param ex the exception to translate
* @param exceptionTranslator the {@link PersistenceExceptionTranslator} to be used for translation
* @return
*/
@SuppressWarnings("all")
protected RuntimeException translateExceptionIfPossible(Exception e) {
return (e instanceof RuntimeException ? translateExceptionIfPossible((RuntimeException) e)
: new CassandraUncategorizedDataAccessException("Caught Uncategorized Exception", e));
protected static RuntimeException translateExceptionIfPossible(Exception ex, PersistenceExceptionTranslator exceptionTranslator) {

Assert.notNull(ex, "Exception must not be null");
Assert.notNull(exceptionTranslator, "PersistenceExceptionTranslator must not be null");

if (ex instanceof RuntimeException) {
return potentiallyConvertRuntimeException((RuntimeException) ex, exceptionTranslator);
}

return new CassandraUncategorizedDataAccessException("Caught Uncategorized Exception", ex);
}

/**
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
* exception if the conversation failed. Thus allows safe re-throwing of the return value.
*
* @param ex the exception to translate
* @param exceptionTranslator the {@link PersistenceExceptionTranslator} to be used for translation
* @return
*/
private static RuntimeException potentiallyConvertRuntimeException(RuntimeException ex,
PersistenceExceptionTranslator exceptionTranslator) {

RuntimeException resolved = exceptionTranslator.translateExceptionIfPossible(ex);
return resolved == null ? ex : resolved;
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion spring-data-cassandra-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra-parent</artifactId>
<version>1.5.0.BUILD-SNAPSHOT</version>
<version>1.5.0.DATACASS-297-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra-parent</artifactId>
<version>1.5.0.BUILD-SNAPSHOT</version>
<version>1.5.0.DATACASS-297-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.cassandra.core;

import java.util.Iterator;
import java.util.List;

import org.springframework.cassandra.core.Cancellable;
Expand Down Expand Up @@ -51,6 +52,20 @@ public interface CassandraOperations extends CqlOperations {
*/
CqlIdentifier getTableName(Class<?> entityClass);

/**
* Executes the given select {@code query} on the entity table of the specified {@code type} backed by a Cassandra
* {@link com.datastax.driver.core.ResultSet}.
* <p>
* Returns a {@link java.util.Iterator} that wraps the a Cassandra {@link com.datastax.driver.core.ResultSet}.
*
* @param <T> element return type
* @param query must not be empty and not {@literal null}.
* @param type must not be {@literal null}.
* @return
* @since 1.5
*/
<T> Iterator<T> stream(String query, Class<T> type);

/**
* Execute query and convert ResultSet to the list of entities.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -34,6 +35,7 @@
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.cassandra.convert.CassandraConverter;
import org.springframework.data.cassandra.convert.MappingCassandraConverter;
import org.springframework.data.cassandra.mapping.CassandraMappingContext;
Expand Down Expand Up @@ -588,6 +590,29 @@ public ResultSet doInSession(Session s) throws DataAccessException {
return result;
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.core.CassandraOperations#stream(java.lang.String, java.lang.Class)
*/
public <T> Iterator<T> stream(final String query, Class<T> type) {

Assert.hasText(query, "Query must not be empty");
Assert.notNull(type, "Type must not be null");

ResultSet resultSet = doExecute(new SessionCallback<ResultSet>() {

@Override
public ResultSet doInSession(Session s) throws DataAccessException {
return s.execute(query);
}
});

if (resultSet == null) {
return Collections.<T>emptyList().iterator();
}

return new ResultSetIteratorAdapter(resultSet.iterator(), getExceptionTranslator(), new CassandraConverterRowCallback<T>(cassandraConverter, type));
}

protected <T> List<T> select(final Select query, CassandraConverterRowCallback<T> readRowCallback) {

ResultSet resultSet = doExecute(new SessionCallback<ResultSet>() {
Expand Down Expand Up @@ -1067,8 +1092,7 @@ public void onQueryComplete(ResultSetFuture rsf) {
throw new DuplicateKeyException("found two or more results in query " + query);
}
listener.onQueryComplete(result);
}
else{
} else {
listener.onQueryComplete(null);
}
} catch (Exception e) {
Expand All @@ -1085,4 +1109,38 @@ public void onQueryComplete(ResultSetFuture rsf) {
throw new IllegalArgumentException(
String.format("Expected type String or Select; got type [%s] with value [%s]", query.getClass(), query));
}

private static class ResultSetIteratorAdapter<T> implements Iterator<T>{

private final Iterator<Row> iterator;
private final PersistenceExceptionTranslator exceptionTranslator;
private final CassandraConverterRowCallback<T> rowCallback;

public ResultSetIteratorAdapter(Iterator<Row> iterator, PersistenceExceptionTranslator exceptionTranslator, CassandraConverterRowCallback<T> rowCallback) {

this.iterator = iterator;
this.exceptionTranslator = exceptionTranslator;
this.rowCallback = rowCallback;
}

@Override
public boolean hasNext() {

try {
return iterator.hasNext();
} catch (Exception e) {
throw translateExceptionIfPossible(e, exceptionTranslator);
}
}

@Override
public T next() {

try {
return rowCallback.doWith(iterator.next());
} catch (Exception e) {
throw translateExceptionIfPossible(e, exceptionTranslator);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.data.cassandra.repository.query.CassandraQueryExecution.ResultProcessingExecution;
import org.springframework.data.cassandra.repository.query.CassandraQueryExecution.ResultSetQuery;
import org.springframework.data.cassandra.repository.query.CassandraQueryExecution.SingleEntityExecution;
import org.springframework.data.cassandra.repository.query.CassandraQueryExecution.StreamExecution;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.query.ResultProcessor;
Expand Down Expand Up @@ -100,13 +101,15 @@ public Object execute(Object[] parameters) {
private CassandraQueryExecution getExecution(String query, CassandraParameterAccessor accessor,
Converter<Object, Object> resultProcessing) {

return new ResultProcessingExecution(getExecutionToWrap(accessor), resultProcessing);
return new ResultProcessingExecution(getExecutionToWrap(accessor, resultProcessing), resultProcessing);
}

private CassandraQueryExecution getExecutionToWrap(CassandraParameterAccessor accessor) {
private CassandraQueryExecution getExecutionToWrap(CassandraParameterAccessor accessor, Converter<Object, Object> resultProcessing) {

if (method.isResultSetQuery()) {
return new ResultSetQuery(template);
} else if (method.isStreamQuery()) {
return new StreamExecution(template, resultProcessing);
} else if (method.isCollectionQuery()) {
return new CollectionExecution(template);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package org.springframework.data.cassandra.repository.query;

import java.util.function.Function;

import org.springframework.core.convert.converter.Converter;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.data.util.StreamUtils;
import org.springframework.util.ClassUtils;

import lombok.NonNull;
Expand All @@ -35,6 +38,33 @@ interface CassandraQueryExecution {

Object execute(String query, Class<?> type);

/**
* {@link CassandraQueryExecution} for a Stream.
*
* @author Mark Paluch
*/
@RequiredArgsConstructor
final class StreamExecution implements CassandraQueryExecution {

private final @NonNull CassandraOperations operations;
private final @NonNull Converter<Object, Object> resultProcessing;

/* (non-Javadoc)
* @see org.springframework.data.cassandra.repository.query.CassandraQueryExecution#execute(java.lang.String, java.lang.Class)
*/
@Override
public Object execute(String query, Class<?> type) {

return StreamUtils.createStreamFromIterator(operations.stream(query, type)).map(new Function<Object, Object>() {

@Override
public Object apply(Object t) {
return resultProcessing.convert(t);
}
});
}
}

/**
* {@link CassandraQueryExecution} for collection returning queries.
*
Expand All @@ -45,6 +75,9 @@ final class CollectionExecution implements CassandraQueryExecution {

private final @NonNull CassandraOperations operations;

/* (non-Javadoc)
* @see org.springframework.data.cassandra.repository.query.CassandraQueryExecution#execute(java.lang.String, java.lang.Class)
*/
@Override
public Object execute(String query, Class<?> type) {
return operations.select(query, type);
Expand All @@ -61,6 +94,9 @@ final class SingleEntityExecution implements CassandraQueryExecution {

private final @NonNull CassandraOperations operations;

/* (non-Javadoc)
* @see org.springframework.data.cassandra.repository.query.CassandraQueryExecution#execute(java.lang.String, java.lang.Class)
*/
@Override
public Object execute(String query, Class<?> type) {
return operations.selectOne(query, type);
Expand All @@ -77,6 +113,9 @@ final class ResultSetQuery implements CassandraQueryExecution {

private final @NonNull CassandraOperations operations;

/* (non-Javadoc)
* @see org.springframework.data.cassandra.repository.query.CassandraQueryExecution#execute(java.lang.String, java.lang.Class)
*/
@Override
public Object execute(String query, Class<?> type) {
return operations.query(query);
Expand All @@ -94,6 +133,9 @@ final class ResultProcessingExecution implements CassandraQueryExecution {
private final @NonNull CassandraQueryExecution delegate;
private final @NonNull Converter<Object, Object> converter;

/* (non-Javadoc)
* @see org.springframework.data.cassandra.repository.query.CassandraQueryExecution#execute(java.lang.String, java.lang.Class)
*/
@Override
public Object execute(String query, Class<?> type) {
return converter.convert(delegate.execute(query, type));
Expand All @@ -110,6 +152,9 @@ final class ResultProcessingConverter implements Converter<Object, Object> {

private final @NonNull ResultProcessor processor;

/* (non-Javadoc)
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
*/
@Override
public Object convert(Object source) {

Expand Down
Loading