Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,31 @@

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.value.Value;

public interface Aggregator<V extends Value<?>> {

/**
* Used by worker to aggregate a new value when compute a vertex, needs to
* be commutative and associative.
* Used by worker to aggregate a new value when compute a vertex.
* The combination method needs to be commutative and associative.
* Can be called in worker computation compute() or afterSuperstep().
* @param value The value to be aggregated
*/
void aggregateValue(V value);

/**
* Used by worker to aggregate an int value when compute a vertex. For
* performance reasons, it can aggregate without create an IntValue object.
* Used by worker to aggregate an int value. For performance reasons, it
* can aggregate without create an IntValue object.
*/
default void aggregateValue(int value) {
throw new ComputerException("Not implemented: aggregateValue(int)");
}

/**
* Used by worker to aggregate a long value. For performance reasons, it can
* aggregate without create a LongValue object.
* Used by worker to aggregate a long value. For performance reasons, it
* can aggregate without create a LongValue object.
*/
default void aggregateValue(long value) {
throw new ComputerException("Not implemented: aggregateValue(long)");
Expand All @@ -64,17 +66,28 @@ default void aggregateValue(double value) {
}

/**
* Used by worker or master to get current aggregated value. The worker
* get aggregated value before a superstep. The master can get the
* aggregated value after a superstep.
* Used by worker or master to get the aggregated value. The worker
* get an aggregated value of previous superstep in current superstep.
* The master can get an aggregated value of current superstep when
* master-computation compute().
*/
V aggregatedValue();

/**
* Used by worker or master to set current aggregated value directly. The
* worker set aggregated value and then sent to master for further
* aggregation. The master set aggregated value and then use by worker in
* next superstep.
* worker set aggregated value and then send to master for further
* aggregation. The master set aggregated value and then used by workers in
* the next superstep.
*/
void aggregatedValue(V value);

/**
* Return cloned object of this instance.
*/
Aggregator<V> copy();

/**
* Repair the object because some fields may not be deserialized.
*/
void repair(ComputerContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,46 @@ public interface Aggregator4Master {

/**
* Register the aggregator with specified name. The name must be unique.
* Used by algorithm's master-computation to register aggregators.
* Used by algorithm's master-computation init() to register aggregators.
*/
void registerAggregator(String name, Class<? extends Aggregator<Value<?>>>
aggregatorClass);
<V extends Value<?>, C extends Aggregator<V>>
void registerAggregator(String name, Class<C> aggregator);

/**
* Register aggregator with specified value type and a combiner which can
* combine values with specified value type. The name must be unique.
* Used by algorithm's master-computation to register aggregators.
* Used by algorithm's master-computation init() to register aggregators.
*/
void registerAggregator(String name, ValueType type,
Class<? extends Combiner<Value<?>>> combinerClass);
<V extends Value<?>, C extends Combiner<V>>
void registerAggregator(String name, ValueType type, Class<C> combiner);

/**
* Set the aggregated value by master-computation. The value will be
* received by workers at next superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
* Register aggregator with specified default value(include type) and
* a combiner which can combine values with specified value type.
* The name must be unique.
* Used by algorithm's master-computation init() to register aggregators.
*/
<V extends Value<?>, C extends Combiner<V>>
void registerAggregator(String name, V defaultValue, Class<C> combiner);

/**
* Set the aggregated value by master-computation, generally users may not
* need to explicitly set a aggregated value.
* If the value is set, it will be received by workers at next superstep.
* Throws ComputerException if master-computation does not register
* aggregator with specified name.
*/
<V extends Value<?>> void aggregatedValue(String name, V value);

/**
* Get the aggregated value. The aggregated value is aggregated from
* workers at this superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
* Get the aggregated value. Each worker aggregate the aggregator value
* locally, then submit to master, then master aggregate the aggregators
* value from all workers. master-computation can get the aggregated value
* in master compute(), and worker-computation can get the aggregated value
* in the next superstep.
* Used by algorithm's master-computation compute()
* Throws ComputerException if master-computation does not register
* aggregator with the specified name.
*/
<V extends Value<?>> V aggregatedValue(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,38 @@
public interface Aggregator4Worker {

/**
* Set aggregate value after superstep. The value will be sent to
* master when current superstep finish.
* Throws ComputerException if master does not register the aggregator
* with specified name.
* Create aggregator by name in worker-computation, the aggregator is
* registered by master-computation.
* Used by algorithm's worker-computation beforeSuperstep(), can be called
* in each superstep.
* Throws ComputerException if master-computation does not register
* aggregator with the specified name.
*/
<V extends Value<?>> Aggregator<V> createAggregator(String name);

/**
* Set aggregate value in worker. The value of aggregator will be
* aggregated locally in worker first, and it would be sent to master when
* the current superstep finish.
* Used by algorithm's worker-computation afterSuperstep(), can be called
* in each superstep.
* Throws ComputerException if master-computation does not register
* aggregator with the specified name.
* @param value The value to be aggregated
*/
<V extends Value<?>> void aggregateValue(String name, V value);

/**
* Get the aggregated value before a superstep start. The value is
* aggregated by master at previous superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
* Get the aggregated value in worker-computation, the value is aggregated
* by master at previous superstep, it won't be changed in a superstep.
* Each worker aggregate an aggregator value locally, then submit to master,
* then master aggregate the aggregator values from all workers.
* master-computation can get the aggregated value in master compute(), and
* worker-computation can get the aggregated value in the next superstep.
* Used by algorithm's worker-computation compute(), can be called in
* each superstep.
* Throws ComputerException if master-computation does not register
* aggregator with the specified name.
*/
<V extends Value<?>> V aggregatedValue(String name);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.rpc.AggregateRpcService;
import com.baidu.hugegraph.util.E;
import com.google.common.collect.ImmutableMap;

public class Aggregators {

private final Map<String, Aggregator<? extends Value<?>>> aggregators;

public Aggregators() {
this(ImmutableMap.of());
}

public Aggregators(Map<String, Aggregator<?>> aggrs) {
this.aggregators = new ConcurrentHashMap<>(aggrs);
}

public Map<String, Value<?>> values() {
Map<String, Value<?>> values = new HashMap<>();
for (Entry<String, Aggregator<? extends Value<?>>> aggr :
this.aggregators.entrySet()) {
values.put(aggr.getKey(), aggr.getValue().aggregatedValue());
}
return values;
}

public <V extends Value<?>> Aggregator<V> get(String name,
AggregateRpcService service) {
Aggregator<?> aggregator = this.aggregators.get(name);
if (aggregator == null) {
if (service != null) {
// Try to get the aggregator maybe created dynamic
aggregator = service.getAggregator(name);
if (aggregator != null) {
this.aggregators.put(name, aggregator);
}
}
E.checkArgument(aggregator != null,
"Can't get aggregator '%s'", name);
}
@SuppressWarnings("unchecked")
Aggregator<V> result = (Aggregator<V>) aggregator;
return result;
}

public void reset(RegisterAggregators register) {
this.aggregators.clear();
this.aggregators.putAll(register.copyAll());
}

public void clear() {
this.aggregators.clear();
}
}
Loading