Class AggregatorManagerImpl
- java.lang.Object
-
- com.alibaba.graphscope.graph.impl.AggregatorManagerImpl
-
- All Implemented Interfaces:
AggregatorManager, org.apache.giraph.aggregators.AggregatorUsage, org.apache.giraph.worker.WorkerAggregatorUsage, org.apache.giraph.worker.WorkerBroadcastUsage, org.apache.giraph.worker.WorkerGlobalCommUsage, org.apache.giraph.worker.WorkerReduceUsage
public class AggregatorManagerImpl extends Object implements AggregatorManager, org.apache.giraph.worker.WorkerAggregatorUsage, org.apache.giraph.worker.WorkerGlobalCommUsage
-
-
Constructor Summary
Constructors
AggregatorManagerImpl(ImmutableClassesGiraphConfiguration<?,?,?> conf, int workerId, int workerNum)
-
Method Summary
All Methods / Instance Methods / Concrete Methods
Each entry lists the modifier and type, then the method, then its description (if any).
void acceptNettyMessage(NettyMessage aggregatorMessage)
    Accept a message from another worker and aggregate it into this manager.
<A extends org.apache.hadoop.io.Writable> void aggregate(String name, A value)
    Add a new value.
void broadcast(String name, org.apache.hadoop.io.Writable value)
    Broadcast given value to all workers for next computation.
<A extends org.apache.hadoop.io.Writable> A getAggregatedValue(String name)
    Return current aggregated value.
<B extends org.apache.hadoop.io.Writable> B getBroadcast(String name)
    Get the value broadcast from the master.
int getNumWorkers()
<R extends org.apache.hadoop.io.Writable> R getReduced(String name)
    Get reduced value from previous worker computation.
int getWorkerId()
void init(FFICommunicator communicator)
    Init the manager with a Grape::Communicator; the actual logic depends on the implementation.
void postMasterCompute()
void postSuperstep()
    Synchronize aggregator values between workers after superstep.
void preSuperstep()
void reduce(String name, Object value)
    Reduce given value.
void reduceMerge(String name, org.apache.hadoop.io.Writable value)
    Reduce given partial value.
<A extends org.apache.hadoop.io.Writable> boolean registerAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass)
    Register an aggregator with a unique name.
<A extends org.apache.hadoop.io.Writable> boolean registerPersistentAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass)
    Register a persistent aggregator with a unique name.
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp)
    Register reducer to be reduced in the next worker computation, using given name and operations.
<S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp, R globalInitialValue)
    Register reducer to be reduced in the next worker computation, using given name and operations, starting globally from globalInitialValue.
<A extends org.apache.hadoop.io.Writable> void setAggregatedValue(String name, A value)
    Set aggregated value.
-
-
-
Constructor Detail
-
AggregatorManagerImpl
public AggregatorManagerImpl(ImmutableClassesGiraphConfiguration<?,?,?> conf, int workerId, int workerNum)
-
-
Method Detail
-
init
public void init(FFICommunicator communicator)
Description copied from interface: AggregatorManager
Init the manager with a Grape::Communicator; the actual logic depends on the implementation.
- Specified by:
init in interface AggregatorManager
- Parameters:
communicator - the communicator.
-
acceptNettyMessage
public void acceptNettyMessage(NettyMessage aggregatorMessage)
Accept a message from another worker and aggregate it into this manager.
- Specified by:
acceptNettyMessage in interface AggregatorManager
- Parameters:
aggregatorMessage - received message.
-
getWorkerId
public int getWorkerId()
- Specified by:
getWorkerId in interface AggregatorManager
-
getNumWorkers
public int getNumWorkers()
- Specified by:
getNumWorkers in interface AggregatorManager
-
registerAggregator
public <A extends org.apache.hadoop.io.Writable> boolean registerAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException
Register an aggregator with a unique name.
- Specified by:
registerAggregator in interface AggregatorManager
- Type Parameters:
A - the aggregated value type
- Parameters:
name - aggregator name
aggregatorClass - the aggregator class
- Throws:
InstantiationException
IllegalAccessException
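The following is a minimal sketch of name-based registration, not the GraphScope implementation. It assumes, following the usual Giraph convention, that the boolean result reports whether the name was newly registered. `SimpleAggregator`, `LongSum`, and `AggregatorRegistrySketch` are hypothetical stand-ins so the example runs without Giraph on the classpath, and the sketch declares `ReflectiveOperationException` (the common superclass of `InstantiationException` and `IllegalAccessException`) because it instantiates through `getDeclaredConstructor()`.

```java
import java.util.HashMap;
import java.util.Map;

final class AggregatorRegistrySketch {

    /** Hypothetical stand-in; NOT org.apache.giraph.aggregators.Aggregator. */
    interface SimpleAggregator<A> {
        void aggregate(A value);

        A getAggregatedValue();
    }

    /** Example aggregator that sums long values. */
    public static final class LongSum implements SimpleAggregator<Long> {
        private long sum = 0L;

        public LongSum() {}

        @Override
        public void aggregate(Long value) {
            sum += value;
        }

        @Override
        public Long getAggregatedValue() {
            return sum;
        }
    }

    private final Map<String, SimpleAggregator<?>> aggregators = new HashMap<>();

    /**
     * Instantiates the aggregator class reflectively and stores it under the name.
     * Assumption: returns false when the name is already taken.
     */
    <A> boolean registerAggregator(String name, Class<? extends SimpleAggregator<A>> aggregatorClass)
            throws ReflectiveOperationException {
        if (aggregators.containsKey(name)) {
            return false;
        }
        aggregators.put(name, aggregatorClass.getDeclaredConstructor().newInstance());
        return true;
    }

    /** Registers the same name twice and returns both results. */
    static boolean[] demo() {
        AggregatorRegistrySketch registry = new AggregatorRegistrySketch();
        try {
            boolean first = registry.registerAggregator("sum", LongSum.class);
            boolean second = registry.registerAggregator("sum", LongSum.class);
            return new boolean[] {first, second};
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("first=" + results[0] + ", second=" + results[1]);
    }
}
```

Instantiating from a `Class` object rather than accepting an instance is what makes the reflective exceptions in the signature possible: the aggregator class needs an accessible no-argument constructor.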
-
registerPersistentAggregator
public <A extends org.apache.hadoop.io.Writable> boolean registerPersistentAggregator(String name, Class<? extends org.apache.giraph.aggregators.Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException
Register a persistent aggregator with a unique name.
- Specified by:
registerPersistentAggregator in interface AggregatorManager
- Type Parameters:
A - the aggregated value type
- Parameters:
name - aggregator name
aggregatorClass - the implementation class
- Throws:
InstantiationException
IllegalAccessException
-
getAggregatedValue
public <A extends org.apache.hadoop.io.Writable> A getAggregatedValue(String name)
Return current aggregated value. Needs to be initialized if aggregate or setAggregatedValue have not been called before.
- Specified by:
getAggregatedValue in interface AggregatorManager
- Specified by:
getAggregatedValue in interface org.apache.giraph.aggregators.AggregatorUsage
- Parameters:
name - name for the aggregator
- Returns:
the aggregated value
-
getBroadcast
public <B extends org.apache.hadoop.io.Writable> B getBroadcast(String name)
Get the value broadcast from the master.
- Specified by:
getBroadcast in interface org.apache.giraph.worker.WorkerBroadcastUsage
- Parameters:
name - Name of the broadcast value
- Returns:
Broadcast value
-
reduce
public void reduce(String name, Object value)
Reduce given value.
- Specified by:
reduce in interface AggregatorManager
- Specified by:
reduce in interface org.apache.giraph.worker.WorkerReduceUsage
- Parameters:
name - Name of the reducer
value - Single value to reduce
-
reduceMerge
public void reduceMerge(String name, org.apache.hadoop.io.Writable value)
Reduce given partial value.
- Specified by:
reduceMerge in interface org.apache.giraph.worker.WorkerReduceUsage
- Parameters:
name - Name of the reducer
value - Partial value to reduce
-
setAggregatedValue
public <A extends org.apache.hadoop.io.Writable> void setAggregatedValue(String name, A value)
Set aggregated value. Can be used for initialization or reset.
- Specified by:
setAggregatedValue in interface AggregatorManager
- Parameters:
name - name for the aggregator
value - Value to be set.
-
aggregate
public <A extends org.apache.hadoop.io.Writable> void aggregate(String name, A value)
Add a new value. The aggregation operation needs to be commutative and associative.
- Specified by:
aggregate in interface AggregatorManager
- Specified by:
aggregate in interface org.apache.giraph.worker.WorkerAggregatorUsage
- Parameters:
name - a unique name referring to an aggregator
value - Value to be aggregated.
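The requirement that aggregation be commutative and associative can be illustrated with a small sketch. It assumes only that values are first folded per worker and the partial results are then merged, in no guaranteed order. `AggregateOrderSketch` and `aggregateAll` are hypothetical names, and plain `long` values stand in for `Writable` instances.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class AggregateOrderSketch {

    /**
     * Folds each worker's values locally, starting from the neutral value,
     * then merges the partial results in the order the workers are listed.
     */
    static long aggregateAll(List<long[]> perWorkerValues, BinaryOperator<Long> op, long neutral) {
        long global = neutral;
        for (long[] workerValues : perWorkerValues) {
            long partial = neutral;
            for (long v : workerValues) {
                partial = op.apply(partial, v);
            }
            global = op.apply(global, partial);
        }
        return global;
    }

    public static void main(String[] args) {
        // The same five values, split across workers and ordered in two different ways.
        List<long[]> splitA = Arrays.asList(new long[] {1, 2}, new long[] {3}, new long[] {4, 5});
        List<long[]> splitB = Arrays.asList(new long[] {5, 3}, new long[] {4, 1}, new long[] {2});

        BinaryOperator<Long> sum = Long::sum;                      // commutative and associative
        BinaryOperator<Long> orderSensitive = (a, b) -> 2 * a + b; // neither property holds

        System.out.println("sum agrees: "
                + (aggregateAll(splitA, sum, 0L) == aggregateAll(splitB, sum, 0L)));
        System.out.println("order-sensitive agrees: "
                + (aggregateAll(splitA, orderSensitive, 0L) == aggregateAll(splitB, orderSensitive, 0L)));
    }
}
```

With the sum, both splits yield the same total; with the order-sensitive operation, the result depends on how values happened to be distributed and merged, which is why such operations are unsuitable as aggregators.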
-
registerReducer
public <S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp)
Register reducer to be reduced in the next worker computation, using given name and operations.
- Specified by:
registerReducer in interface AggregatorManager
- Type Parameters:
S - Single value type
R - Reduced value type
- Parameters:
name - Name of the reducer
reduceOp - Reduce operations
-
registerReducer
public <S,R extends org.apache.hadoop.io.Writable> void registerReducer(String name, org.apache.giraph.reducers.ReduceOperation<S,R> reduceOp, R globalInitialValue)
Register reducer to be reduced in the next worker computation, using given name and operations, starting globally from globalInitialValue. (globalInitialValue is reduced only once; each worker still starts from the neutral initial value.)
- Specified by:
registerReducer in interface AggregatorManager
- Type Parameters:
S - Single value type
R - Reduced value type
- Parameters:
name - Name of the reducer
reduceOp - Reduce operations
globalInitialValue - Global initial value
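The note that globalInitialValue is reduced only once can be made concrete with a short sketch. It uses a sum as the reduce operation with 0 as the neutral value; `GlobalInitialValueSketch` and `reduceSum` are hypothetical names, and plain `long` values stand in for `Writable` instances, so this is an illustration of the stated semantics rather than the actual implementation.

```java
import java.util.Arrays;
import java.util.List;

final class GlobalInitialValueSketch {

    /**
     * Sums all values across workers. Every worker starts its partial result
     * from the neutral value 0; the global initial value enters the result once.
     */
    static long reduceSum(List<long[]> perWorkerValues, long globalInitialValue) {
        final long neutral = 0L;
        long global = globalInitialValue; // contributed exactly once
        for (long[] workerValues : perWorkerValues) {
            long partial = neutral; // NOT globalInitialValue
            for (long v : workerValues) {
                partial += v;
            }
            global += partial;
        }
        return global;
    }

    public static void main(String[] args) {
        List<long[]> perWorker = Arrays.asList(new long[] {1, 2}, new long[] {3});
        System.out.println(reduceSum(perWorker, 100L));
    }
}
```

If each worker instead started from globalInitialValue, the initial value would be counted once per worker and the result would grow with the number of workers.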
-
getReduced
public <R extends org.apache.hadoop.io.Writable> R getReduced(String name)
Get reduced value from previous worker computation.
- Specified by:
getReduced in interface AggregatorManager
- Type Parameters:
R - Reduced value type
- Parameters:
name - Name of the reducer
- Returns:
Reduced value
-
broadcast
public void broadcast(String name, org.apache.hadoop.io.Writable value)
Broadcast given value to all workers for next computation.
- Specified by:
broadcast in interface AggregatorManager
- Parameters:
name - Name of the broadcast object
value - Value
-
preSuperstep
public void preSuperstep()
- Specified by:
preSuperstep in interface AggregatorManager
-
postSuperstep
public void postSuperstep()
Synchronize aggregator values between workers after superstep.
- Specified by:
postSuperstep in interface AggregatorManager
-
postMasterCompute
public void postMasterCompute()
-
-