Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
adb80f3
* add netty dependent
coderzc Mar 22, 2021
170bdcf
* add netty dependent
coderzc Mar 22, 2021
6153840
* add netty dependent
coderzc Mar 22, 2021
31b8bdd
* add netty dependent
coderzc Mar 22, 2021
f1f2fe4
improve copyright
coderzc Mar 22, 2021
d9d280b
modify netty depend
coderzc Mar 23, 2021
90f9566
improve transport config
coderzc Mar 23, 2021
401d705
add NettyTransportServer
coderzc Mar 23, 2021
31c8a0a
* add netty dependent
coderzc Mar 22, 2021
6243396
improve copyright
coderzc Mar 22, 2021
f33b445
modify netty depend
coderzc Mar 23, 2021
dc839ba
delete extra IOMode
coderzc Mar 23, 2021
c805661
* improve NettyTransportServer
coderzc Mar 24, 2021
a3b2838
* improve NettyTransportServer
coderzc Mar 24, 2021
fa1f20f
* improve NettyTransportServer
coderzc Mar 25, 2021
bd414aa
fix check style
coderzc Mar 25, 2021
93a724a
NettyConnectionManager --> NettyClientConnectionManager
coderzc Mar 25, 2021
ab672d3
NettyConnectionManager --> NettyClientConnectionManager
coderzc Mar 25, 2021
44545ea
NettyConnectionManager --> NettyClientConnectionManager
coderzc Mar 25, 2021
cbcfd63
optimize ClientManager, ClientFactory, NettyTransportClient
coderzc Mar 25, 2021
df6b884
optimize ClientManager, ClientFactory, NettyTransportClient
coderzc Mar 25, 2021
db1e637
optimize ClientManager, ClientFactory, NettyTransportClient
coderzc Mar 25, 2021
6e43291
optimize ClientManager,ClientFactory
coderzc Mar 25, 2021
903cf44
* optimize code style
coderzc Mar 26, 2021
bbe4863
* optimize code style
coderzc Mar 26, 2021
12995d2
* improve ConnectionManager interface
coderzc Mar 26, 2021
132f260
* fix code style
coderzc Mar 27, 2021
8df69da
* rebase master
coderzc Mar 27, 2021
b9f6346
improve host and connection error message
coderzc Mar 27, 2021
0a80d99
Transport4Client -> TransportClient, Transport4Server -> TransportServer
coderzc Mar 29, 2021
cd6b359
* add Message,Handler,codec
coderzc Mar 31, 2021
bd142d8
fix code style
coderzc Apr 2, 2021
824ceac
* fix code style
coderzc Apr 2, 2021
dfb3f0e
* fix code style
coderzc Apr 6, 2021
57b8530
improve MessageDecoder
coderzc Apr 6, 2021
a3cadae
fix code style
coderzc Apr 6, 2021
c6cea82
fix code style
coderzc Apr 7, 2021
ff0fff4
* improve test coverage
coderzc Apr 9, 2021
e78d093
fix code style
coderzc Apr 9, 2021
d93ea04
* fix code style
coderzc Apr 9, 2021
4a9da67
* fix code style
coderzc Apr 9, 2021
0229bfb
add timeout
coderzc Apr 9, 2021
a0027ab
add timeout
coderzc Apr 9, 2021
12cf651
* 2000 -> 2000L
coderzc Apr 9, 2021
0b541e6
3000 -> 3000L
coderzc Apr 9, 2021
aede99b
* zero-copy body
coderzc Apr 9, 2021
3476e5e
* use zero-copy transport
coderzc Apr 10, 2021
c758703
add blank line
coderzc Apr 10, 2021
7f3d5c0
improve buffer reference
coderzc Apr 11, 2021
d115d5e
improve buffer reference
coderzc Apr 11, 2021
1a9a4f3
* add WakeHandler
coderzc Apr 12, 2021
c6651b4
* improve code style
coderzc Apr 12, 2021
d4b37b9
add message title
coderzc Apr 12, 2021
5d19234
improve code style
coderzc Apr 12, 2021
d9cd8c6
add ClientChannelListenerOnWrite
coderzc Apr 12, 2021
bf421fd
add ClientChannelListenerOnWrite
coderzc Apr 12, 2021
d71bd52
improve ChannelFutureListenerOnWrite
coderzc Apr 12, 2021
5761e32
improve test coverage
coderzc Apr 12, 2021
e94e5fd
improve test coverage
coderzc Apr 12, 2021
b61d91a
improve code style
coderzc Apr 12, 2021
0ea08ce
improve code style
coderzc Apr 12, 2021
7f44f71
improve code style
coderzc Apr 12, 2021
9a86777
improve code style
coderzc Apr 12, 2021
51e29fd
improve code style
coderzc Apr 12, 2021
1772e52
improve code style
coderzc Apr 12, 2021
d969eaa
improve code style
coderzc Apr 12, 2021
136e46b
improve code style
coderzc Apr 12, 2021
d2a48f0
improve code style
coderzc Apr 12, 2021
ca75306
improve code style
coderzc Apr 12, 2021
768699a
4 -> Integer.BYTES
coderzc Apr 13, 2021
d50466b
* remove 'import static'
coderzc Apr 13, 2021
9598ea0
fix failMessage bodyLength
coderzc Apr 13, 2021
2905c68
improve readString()
coderzc Apr 13, 2021
6118e05
again compile
coderzc Apr 13, 2021
dba069f
remove skipExtraBuffer
coderzc Apr 13, 2021
1631615
slice body and retain it
coderzc Apr 13, 2021
ff14845
slice body and retain it
coderzc Apr 13, 2021
42ec875
slice body and retain it
coderzc Apr 13, 2021
6d64cd5
slice body and retain it
coderzc Apr 13, 2021
84bcd31
fix style
coderzc Apr 13, 2021
58ba674
* improve code style
coderzc Apr 13, 2021
2d74673
improve message
coderzc Apr 13, 2021
081be0c
assert header
coderzc Apr 13, 2021
4f399f4
* rebase master
coderzc Apr 14, 2021
20699fb
improve comment
coderzc Apr 14, 2021
e188287
improve import
coderzc Apr 14, 2021
b0d7d6a
default use EDGE_TRIGGERED
coderzc Apr 15, 2021
8662ea6
improve some comments
coderzc Apr 15, 2021
a3345b3
revert
coderzc Apr 15, 2021
b383a88
revert
coderzc Apr 15, 2021
1b4a11e
recompile
coderzc Apr 15, 2021
759a3f5
improve some comments
coderzc Apr 15, 2021
bd2cbd3
fix too long
coderzc Apr 15, 2021
c516fa9
improve some log
coderzc Apr 15, 2021
71a3abd
improve some log
coderzc Apr 15, 2021
31671be
improve readString, writeString
coderzc Apr 15, 2021
a063a9c
improve some log
coderzc Apr 15, 2021
ea268ef
add TRANSPORT_SYNC_REQUEST_TIMEOUT
coderzc Apr 16, 2021
fbf8036
improve desc
coderzc Apr 16, 2021
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions computer-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-client</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.common.exception;

import java.io.IOException;

/**
* The transport network exception
*/
public class TransportException extends IOException {

private static final long serialVersionUID = -6538118382075299762L;

public static final int DEFAULT_CODE = 0;

private final int errorCode;

public TransportException(String message) {
this(DEFAULT_CODE, message);
}

public TransportException(int errorCode, String message) {
super(message);
this.errorCode = errorCode;
}

public TransportException(String message, Object... args) {
this(DEFAULT_CODE, message, args);
}

public TransportException(int errorCode, String message, Object... args) {
super(String.format(message, args));
this.errorCode = errorCode;
}

public TransportException(String message, Throwable cause) {
super(message, cause);
this.errorCode = DEFAULT_CODE;
}

public TransportException(String message, Throwable cause, Object... args) {
super(String.format(message, args), cause);
this.errorCode = DEFAULT_CODE;
}

public Throwable rootCause() {
return ComputeException.rootCause(this);
}

public int errorCode() {
return this.errorCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static com.baidu.hugegraph.config.OptionChecker.allowValues;
import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty;
import static com.baidu.hugegraph.config.OptionChecker.nonNegativeInt;
import static com.baidu.hugegraph.config.OptionChecker.positiveInt;

import java.util.Set;
Expand All @@ -30,6 +31,8 @@
import com.baidu.hugegraph.computer.core.aggregator.WorkerAggrManager;
import com.baidu.hugegraph.computer.core.graph.partition.HashPartitioner;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
import com.baidu.hugegraph.computer.core.network.NettyTransportProvider;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -241,26 +244,6 @@ public static synchronized ComputerOptions instance() {
TimeUnit.SECONDS.toMillis(30L)
);

public static final ConfigOption<Integer> WORKER_DATA_PORT_START =
new ConfigOption<>(
"worker.data_port_start",
"The start of range [data_port_start, data_port_end]. " +
"The worker will choose one from small to large of the " +
"range for data transportation.",
positiveInt(),
11000
);

public static final ConfigOption<Integer> WORKER_DATA_PORT_END =
new ConfigOption<>(
"worker.data_port_end",
"The end of range [data_port_start, data_port_end]. " +
"The worker will choose one from small to large of the " +
"range for data transportation.",
positiveInt(),
12000
);

public static final ConfigOption<Class<?>> WORKER_PARTITIONER =
new ConfigOption<>(
"worker.partitioner",
Expand Down Expand Up @@ -333,4 +316,177 @@ public static synchronized ComputerOptions instance() {
disallowEmpty(),
"hugegraph"
);

public static final ConfigOption<String> TRANSPORT_SERVER_HOST =
new ConfigOption<>(
"transport.server_host",
"The server bind host.",
disallowEmpty(),
"127.0.0.1"
);

public static final ConfigOption<Integer> TRANSPORT_SERVER_PORT =
new ConfigOption<>(
"transport.server_port",
"The server bind port, if it is zero " +
"will let the system pick up an ephemeral port.",
nonNegativeInt(),
0
);

public static final ConfigOption<Integer> TRANSPORT_SERVER_THREADS =
new ConfigOption<>(
"transport.server_threads",
"The number of transport threads for server, the default " +
"value is CPUs.",
positiveInt(),
TransportConf.NUMBER_CPU_CORES
);

public static final ConfigOption<Integer> TRANSPORT_CLIENT_THREADS =
new ConfigOption<>(
"transport.client_threads",
"The number of transport threads for client, the default " +
"value is CPUs.",
positiveInt(),
TransportConf.NUMBER_CPU_CORES
);

public static final ConfigOption<Class<?>> TRANSPORT_PROVIDER_CLASS =
new ConfigOption<>(
"transport.provider_class",
"The transport provider, currently only supports Netty.",
disallowEmpty(),
NettyTransportProvider.class
);

public static final ConfigOption<String> TRANSPORT_IO_MODE =
new ConfigOption<>(
"transport.io_mode",
"The network IO Mode, either 'NIO', 'EPOLL', 'AUTO', the " +
"'AUTO' means selecting the property mode automatically.",
allowValues("NIO", "EPOLL", "AUTO"),
"AUTO"
);

public static final ConfigOption<Boolean> TRANSPORT_EPOLL_LT =
new ConfigOption<>(
"transport.transport_epoll_lt",
"Whether enable EPOLL level-trigger.",
allowValues(true, false),
false
);

public static final ConfigOption<Integer> TRANSPORT_SEND_BUFFER_SIZE =
new ConfigOption<>(
"transport.send_buffer_size",
"The network send buffer size, 0 means using system " +
"defaults.",
nonNegativeInt(),
0
);

public static final ConfigOption<Integer> TRANSPORT_RECEIVE_BUFFER_SIZE =
new ConfigOption<>(
"transport.receive_buffer_size",
"The network receive buffer size, 0 means using system " +
"defaults.",
nonNegativeInt(),
0
);

public static final ConfigOption<Integer> TRANSPORT_BACKLOG =
new ConfigOption<>(
"transport.backlog",
"The server connection backlog, 0 means using system " +
"defaults.",
nonNegativeInt(),
0
);

public static final ConfigOption<Long> TRANSPORT_CLIENT_CONNECT_TIMEOUT =
new ConfigOption<>(
"transport.client_connect_timeout",
"The timeout(in ms) of client connect to server.",
positiveInt(),
3000L
);

public static final ConfigOption<Long> TRANSPORT_CLOSE_TIMEOUT =
new ConfigOption<>(
"transport.close_timeout",
"The timeout(in ms) of close server or close client.",
positiveInt(),
10_000L
);

public static final ConfigOption<Long> TRANSPORT_SYNC_REQUEST_TIMEOUT =
new ConfigOption<>(
"transport.sync_request_timeout",
"The timeout(in ms) to wait response after " +
"sending sync-request.",
positiveInt(),
5_000L
);

public static final ConfigOption<Integer> TRANSPORT_NETWORK_RETRIES =
new ConfigOption<>(
"transport.network_retries",
"The number of retry attempts for network communication," +
"if network unstable.",
nonNegativeInt(),
3
);

public static final ConfigOption<Integer> TRANSPORT_MAX_PENDING_REQUESTS =
new ConfigOption<>(
"transport.max_pending_requests",
"The max number of client unreceived ack, " +
"if the number of unreceived ack greater than it, " +
"it will block the client from calling send.",
positiveInt(),
50
);

public static final ConfigOption<Integer> TRANSPORT_MIN_PENDING_REQUESTS =
new ConfigOption<>(
"transport.min_pending_requests",
"The minimum number of client unreceived ack, " +
"if the number of unreceived ack less than it, " +
"it will wake the client from calling send.",
positiveInt(),
5
);

public static final ConfigOption<Long> TRANSPORT_MIN_ACK_INTERVAL =
new ConfigOption<>(
"transport.min_ack_interval",
"The minimum interval(in ms) of server reply ack.",
positiveInt(),
200L
);

public static final ConfigOption<Integer> TRANSPORT_HEARTBEAT_INTERVAL =
new ConfigOption<>(
"transport.heartbeat_interval_seconds",
"Time minimum interval(in seconds) of send heartbeat.",
positiveInt(),
60
);

public static final ConfigOption<Integer> TRANSPORT_HEARTBEAT_TIMEOUT =
new ConfigOption<>(
"transport.heartbeat_timeout_seconds",
"The max timeout(in seconds) of heartbeat.",
positiveInt(),
120
);

public static final ConfigOption<Boolean> TRANSPORT_TCP_KEEP_ALIVE =
new ConfigOption<>(
"transport.transport_tcp_keep_alive",
"Whether enable TCP keep-alive.",
allowValues(true, false),
true
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.network;


import com.baidu.hugegraph.computer.core.common.exception.TransportException;

public interface ClientFactory {

/**
* Initialize the {@link ClientFactory}
*/
void init();

/**
* Create a TransportClient.
* @param connectionId {@link ConnectionId}
* @param handler
* @return {@link TransportClient}
*/
TransportClient createClient(ConnectionId connectionId,
ClientHandler handler)
throws TransportException;

/**
* Close the {@link ClientFactory}
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,11 @@

package com.baidu.hugegraph.computer.core.network;

import com.baidu.hugegraph.computer.core.config.Config;

/**
* This is used for worker that receives data.
*/
public interface Transport4Server {

/**
* Startup server, return the port listened. The port range in config is
* [{@link @ComputerOptions.WORKER_DATA_PORT_START #STATIC_FIELD},
* {@link @ComputerOptions.WORKER_DATA_PORT_END #STATIC_FIELD}].
*/
int listen(Config config, MessageHandler handler);
public interface ClientHandler extends TransportHandler {

/**
* Stop the server
* Invoked when the channel associated with the given connectionId channel
* is able to send data immediately.
*/
void stop();
void sendAvailable(ConnectionId connectionId);
}
Loading