Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.value.Value;

Expand Down Expand Up @@ -77,4 +78,14 @@ default void aggregateValue(double value) {
* next superstep.
*/
void aggregatedValue(V value);

/**
* Return cloned object of this instance.
*/
Aggregator<V> copy();

/**
* Repair the object because some fields may not be deserialized.
*/
void repair(ComputerContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,25 @@ public interface Aggregator4Master {
* Register the aggregator with specified name. The name must be unique.
* Used by algorithm's master-computation to register aggregators.
*/
void registerAggregator(String name, Class<? extends Aggregator<Value<?>>>
aggregatorClass);
<V extends Value<?>, C extends Aggregator<V>>
void registerAggregator(String name, Class<C> aggregator);

/**
* Register aggregator with specified value type and a combiner which can
* combine values with specified value type. The name must be unique.
* Used by algorithm's master-computation to register aggregators.
*/
void registerAggregator(String name, ValueType type,
Class<? extends Combiner<Value<?>>> combinerClass);
<V extends Value<?>, C extends Combiner<V>>
void registerAggregator(String name, ValueType type, Class<C> combiner);

/**
* Register aggregator with specified default value(include type) and
* a combiner which can combine values with specified value type.
* The name must be unique.
* Used by algorithm's master-computation to register aggregators.
*/
<V extends Value<?>, C extends Combiner<V>>
void registerAggregator(String name, V defaultValue, Class<C> combiner);

/**
* Set the aggregated value by master-computation. The value will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@
public interface Aggregator4Worker {

/**
* Set aggregate value after superstep. The value will be sent to
* Create aggregator by name in worker, the aggregator is registered by
* master. Can be called in each superstep.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is master and master-computation interchangeable?

Copy link
Contributor Author

@javeme javeme May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean aslo add createAggregator() to Aggregator4Master?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the aggregator is registered by master-computation.

* Throws ComputerException if master does not register the aggregator
* with specified name.
*/
<V extends Value<?>> Aggregator<V> createAggregator(String name);

/**
* Set aggregate value after a superstep. The value will be sent to
* master when current superstep finish.
* Throws ComputerException if master does not register the aggregator
* with specified name.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.rpc.AggregateRpcService;
import com.baidu.hugegraph.util.E;
import com.google.common.collect.ImmutableMap;

public class Aggregators {

private final Map<String, Aggregator<? extends Value<?>>> aggregators;

public Aggregators() {
this(ImmutableMap.of());
}

public Aggregators(Map<String, Aggregator<?>> aggrs) {
this.aggregators = new ConcurrentHashMap<>(aggrs);
}

public Map<String, Value<?>> values() {
Map<String, Value<?>> values = new HashMap<>();
for (Entry<String, Aggregator<? extends Value<?>>> aggr :
this.aggregators.entrySet()) {
values.put(aggr.getKey(), aggr.getValue().aggregatedValue());
}
return values;
}

public <V extends Value<?>> Aggregator<V> get(String name,
AggregateRpcService service) {
Aggregator<?> aggregator = this.aggregators.get(name);
if (aggregator == null) {
if (service != null) {
// Try to get the aggregator maybe created dynamic
aggregator = service.getAggregator(name);
if (aggregator != null) {
this.aggregators.put(name, aggregator);
}
}
E.checkArgument(aggregator != null,
"Can't get aggregator '%s'", name);
}
@SuppressWarnings("unchecked")
Aggregator<V> result = (Aggregator<V>) aggregator;
return result;
}

public void reset(RegisterAggregators register) {
this.aggregators.clear();
this.aggregators.putAll(register.copyAll());
}

public void clear() {
this.aggregators.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.value.FloatValue;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.util.E;

public class DefaultAggregator<V extends Value<?>> implements Aggregator<V> {

private final ValueType type;
private final Class<? extends Combiner<V>> combinerClass;

private transient Combiner<V> combiner;
private transient ThreadLocal<V> localValue;

private V value;

public DefaultAggregator(ComputerContext context, ValueType type,
Class<? extends Combiner<V>> combinerClass,
V defaultValue) {
E.checkArgument(type != null,
"The value type of aggregator can't be null");
E.checkArgument(combinerClass != null,
"The combiner of aggregator can't be null");
this.type = type;
this.combinerClass = combinerClass;

if (defaultValue != null) {
this.checkValue(defaultValue);
}
this.value = defaultValue;

if (context != null) {
this.repair(context);
}

E.checkArgument(this.value != null,
"Must provide default value for aggregator");
}

@Override
public void aggregateValue(V value) {
this.checkValue(value);
this.value = this.combiner.combine(value, this.value);
}

@Override
public void aggregateValue(int value) {
assert this.type == ValueType.INT;
V localValue = this.localValue.get();
((IntValue) localValue).value(value);
this.combineAndSwapIfNeeded(localValue, this.value);
}

@Override
public void aggregateValue(long value) {
assert this.type == ValueType.LONG;
V localValue = this.localValue.get();
((LongValue) localValue).value(value);
this.combineAndSwapIfNeeded(localValue, this.value);
}

@Override
public void aggregateValue(float value) {
assert this.type == ValueType.FLOAT;
V localValue = this.localValue.get();
((FloatValue) localValue).value(value);
this.combineAndSwapIfNeeded(localValue, this.value);
}

@Override
public void aggregateValue(double value) {
assert this.type == ValueType.DOUBLE;
V localValue = this.localValue.get();
((DoubleValue) localValue).value(value);
this.combineAndSwapIfNeeded(localValue, this.value);
}

private void combineAndSwapIfNeeded(V localValue, V thisValue) {
// TODO: call combine(localValue, thisValue, <output>thisValue)
V tmp = this.combiner.combine(localValue, thisValue);
if (tmp == localValue) {
this.localValue.set(thisValue);
this.value = tmp;
} else {
assert tmp == thisValue;
}
}

@Override
public V aggregatedValue() {
assert this.value != null;
return this.value;
}

@Override
public void aggregatedValue(V value) {
this.checkValue(value);
this.value = value;
}

private void checkValue(V value) {
E.checkNotNull(value, "aggregator", "value");
E.checkArgument(value.type() == this.type,
"Can't set %s value '%s' to %s aggregator",
value.type().string(), value, this.type.string());
}

@Override
public String toString() {
return this.value.toString();
}

@Override
public Aggregator<V> copy() {
DefaultAggregator<V> aggregator = new DefaultAggregator<>(
null, this.type,
this.combinerClass,
this.value);
// Ensure deep copy the value
@SuppressWarnings("unchecked")
V deepCopyValue = (V) this.value.copy();
aggregator.value = deepCopyValue;
aggregator.combiner = this.combiner;
aggregator.localValue = this.localValue;
return aggregator;
}

@Override
public void repair(ComputerContext context) {
try {
this.combiner = this.combinerClass.newInstance();
} catch (Exception e) {
throw new ComputerException("Can't new instance from class: %s",
e, this.combinerClass.getName());
}

this.localValue = ThreadLocal.withInitial(() -> {
return this.newValue(context);
});

if (this.value == null) {
this.value = this.newValue(context);
}
}

private V newValue(ComputerContext context) {
@SuppressWarnings("unchecked")
V val = (V) context.valueFactory().createValue(this.type);
return val;
}
}
Loading