|
1 | | -/* |
2 | | - * DISCLAIMER |
3 | | - * |
4 | | - * Copyright 2016 ArangoDB GmbH, Cologne, Germany |
5 | | - * |
6 | | - * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | | - * you may not use this file except in compliance with the License. |
8 | | - * You may obtain a copy of the License at |
9 | | - * |
10 | | - * http://www.apache.org/licenses/LICENSE-2.0 |
11 | | - * |
12 | | - * Unless required by applicable law or agreed to in writing, software |
13 | | - * distributed under the License is distributed on an "AS IS" BASIS, |
14 | | - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | | - * See the License for the specific language governing permissions and |
16 | | - * limitations under the License. |
17 | | - * |
18 | | - * Copyright holder is ArangoDB GmbH, Cologne, Germany |
19 | | - */ |
20 | | - |
21 | | -package com.arangodb.internal.velocystream; |
22 | | - |
23 | | -import java.io.IOException; |
24 | | -import java.util.ArrayList; |
25 | | -import java.util.Collection; |
26 | | -import java.util.concurrent.atomic.AtomicLong; |
27 | | - |
28 | | -import javax.net.ssl.SSLContext; |
29 | | - |
30 | | -import org.slf4j.Logger; |
31 | | -import org.slf4j.LoggerFactory; |
32 | | - |
33 | | -import com.arangodb.ArangoDBException; |
34 | | -import com.arangodb.internal.ArangoDBConstants; |
35 | | -import com.arangodb.internal.Host; |
36 | | -import com.arangodb.internal.net.ArangoDBRedirectException; |
37 | | -import com.arangodb.internal.net.ConnectionPool; |
38 | | -import com.arangodb.internal.net.HostHandle; |
39 | | -import com.arangodb.internal.util.HostUtils; |
40 | | -import com.arangodb.internal.util.ResponseUtils; |
41 | | -import com.arangodb.internal.velocystream.internal.Chunk; |
42 | | -import com.arangodb.internal.velocystream.internal.Message; |
43 | | -import com.arangodb.internal.velocystream.internal.VstConnection; |
44 | | -import com.arangodb.util.ArangoSerialization; |
45 | | -import com.arangodb.velocypack.VPackSlice; |
46 | | -import com.arangodb.velocypack.exception.VPackParserException; |
47 | | -import com.arangodb.velocystream.Request; |
48 | | -import com.arangodb.velocystream.Response; |
49 | | - |
50 | | -/** |
51 | | - * @author Mark Vollmary |
52 | | - * |
53 | | - */ |
54 | | -public abstract class VstCommunication<R, C extends VstConnection> { |
55 | | - |
56 | | -private static final Logger LOGGER = LoggerFactory.getLogger(VstCommunication.class); |
57 | | - |
58 | | -protected static final AtomicLong mId = new AtomicLong(0L); |
59 | | -protected final ArangoSerialization util; |
60 | | -protected final ConnectionPool<C> connectionPool; |
61 | | - |
62 | | -protected final String user; |
63 | | -protected final String password; |
64 | | - |
65 | | -protected final Integer chunksize; |
66 | | - |
67 | | -protected VstCommunication(final Integer timeout, final String user, final String password, final Boolean useSsl, |
68 | | -final SSLContext sslContext, final ArangoSerialization util, final Integer chunksize, |
69 | | -final ConnectionPool<C> connectionPool) { |
70 | | -this.user = user; |
71 | | -this.password = password; |
72 | | -this.util = util; |
73 | | -this.connectionPool = connectionPool; |
74 | | -this.chunksize = chunksize != null ? chunksize : ArangoDBConstants.CHUNK_DEFAULT_CONTENT_SIZE; |
75 | | -} |
76 | | - |
77 | | -protected synchronized void connect(final C connection) { |
78 | | -if (!connection.isOpen()) { |
79 | | -try { |
80 | | -connection.open(); |
81 | | -if (user != null) { |
82 | | -authenticate(connection); |
83 | | -} |
84 | | -} catch (final IOException e) { |
85 | | -LOGGER.error(e.getMessage(), e); |
86 | | -throw new ArangoDBException(e); |
87 | | -} |
88 | | -} |
89 | | -} |
90 | | - |
91 | | -protected abstract void authenticate(final C connection); |
92 | | - |
93 | | -public void disconnect() throws IOException { |
94 | | -connectionPool.disconnect(); |
95 | | -} |
96 | | - |
97 | | -public R execute(final Request request, final HostHandle hostHandle) throws ArangoDBException { |
98 | | -final C connection = connectionPool.connection(hostHandle); |
99 | | -try { |
100 | | -return execute(request, connection); |
101 | | -} catch (final ArangoDBException e) { |
102 | | -if (e instanceof ArangoDBRedirectException) { |
103 | | -final String location = ArangoDBRedirectException.class.cast(e).getLocation(); |
104 | | -final Host host = HostUtils.createFromLocation(location); |
105 | | -connectionPool.closeConnectionOnError(connection); |
106 | | -return execute(request, new HostHandle().setHost(host)); |
107 | | -} else { |
108 | | -throw e; |
109 | | -} |
110 | | -} |
111 | | -} |
112 | | - |
113 | | -protected abstract R execute(final Request request, C connection) throws ArangoDBException; |
114 | | - |
115 | | -protected void checkError(final Response response) throws ArangoDBException { |
116 | | -ResponseUtils.checkError(util, response); |
117 | | -} |
118 | | - |
119 | | -protected Response createResponse(final Message message) throws VPackParserException { |
120 | | -final Response response = util.deserialize(message.getHead(), Response.class); |
121 | | -if (message.getBody() != null) { |
122 | | -response.setBody(message.getBody()); |
123 | | -} |
124 | | -return response; |
125 | | -} |
126 | | - |
127 | | -protected Message createMessage(final Request request) throws VPackParserException { |
128 | | -final long id = mId.incrementAndGet(); |
129 | | -return new Message(id, util.serialize(request), request.getBody()); |
130 | | -} |
131 | | - |
132 | | -protected Collection<Chunk> buildChunks(final Message message) { |
133 | | -final Collection<Chunk> chunks = new ArrayList<Chunk>(); |
134 | | -final VPackSlice head = message.getHead(); |
135 | | -int size = head.getByteSize(); |
136 | | -final VPackSlice body = message.getBody(); |
137 | | -if (body != null) { |
138 | | -size += body.getByteSize(); |
139 | | -} |
140 | | -final int n = size / chunksize; |
141 | | -final int numberOfChunks = (size % chunksize != 0) ? (n + 1) : n; |
142 | | -int off = 0; |
143 | | -for (int i = 0; size > 0; i++) { |
144 | | -final int len = Math.min(chunksize, size); |
145 | | -final long messageLength = (i == 0 && numberOfChunks > 1) ? size : -1L; |
146 | | -final Chunk chunk = new Chunk(message.getId(), i, numberOfChunks, messageLength, off, len); |
147 | | -size -= len; |
148 | | -off += len; |
149 | | -chunks.add(chunk); |
150 | | -} |
151 | | -return chunks; |
152 | | -} |
153 | | - |
154 | | -} |
| 1 | +/* |
| 2 | + * DISCLAIMER |
| 3 | + * |
| 4 | + * Copyright 2016 ArangoDB GmbH, Cologne, Germany |
| 5 | + * |
| 6 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | + * you may not use this file except in compliance with the License. |
| 8 | + * You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, software |
| 13 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | + * See the License for the specific language governing permissions and |
| 16 | + * limitations under the License. |
| 17 | + * |
| 18 | + * Copyright holder is ArangoDB GmbH, Cologne, Germany |
| 19 | + */ |
| 20 | + |
| 21 | +package com.arangodb.internal.velocystream; |
| 22 | + |
| 23 | +import java.io.IOException; |
| 24 | +import java.util.ArrayList; |
| 25 | +import java.util.Collection; |
| 26 | +import java.util.concurrent.atomic.AtomicLong; |
| 27 | + |
| 28 | +import javax.net.ssl.SSLContext; |
| 29 | + |
| 30 | +import org.slf4j.Logger; |
| 31 | +import org.slf4j.LoggerFactory; |
| 32 | + |
| 33 | +import com.arangodb.ArangoDBException; |
| 34 | +import com.arangodb.internal.ArangoDBConstants; |
| 35 | +import com.arangodb.internal.Host; |
| 36 | +import com.arangodb.internal.net.ArangoDBRedirectException; |
| 37 | +import com.arangodb.internal.net.ConnectionPool; |
| 38 | +import com.arangodb.internal.net.HostHandle; |
| 39 | +import com.arangodb.internal.util.HostUtils; |
| 40 | +import com.arangodb.internal.util.ResponseUtils; |
| 41 | +import com.arangodb.internal.velocystream.internal.Chunk; |
| 42 | +import com.arangodb.internal.velocystream.internal.Message; |
| 43 | +import com.arangodb.internal.velocystream.internal.VstConnection; |
| 44 | +import com.arangodb.util.ArangoSerialization; |
| 45 | +import com.arangodb.velocypack.VPackSlice; |
| 46 | +import com.arangodb.velocypack.exception.VPackParserException; |
| 47 | +import com.arangodb.velocystream.Request; |
| 48 | +import com.arangodb.velocystream.Response; |
| 49 | + |
| 50 | +/** |
| 51 | + * @author Mark Vollmary |
| 52 | + * |
| 53 | + */ |
| 54 | +public abstract class VstCommunication<R, C extends VstConnection> { |
| 55 | + |
| 56 | +private static final Logger LOGGER = LoggerFactory.getLogger(VstCommunication.class); |
| 57 | + |
| 58 | +protected static final AtomicLong mId = new AtomicLong(0L); |
| 59 | +protected final ArangoSerialization util; |
| 60 | +protected final ConnectionPool<C> connectionPool; |
| 61 | + |
| 62 | +protected final String user; |
| 63 | +protected final String password; |
| 64 | + |
| 65 | +protected final Integer chunksize; |
| 66 | + |
| 67 | +protected VstCommunication(final Integer timeout, final String user, final String password, final Boolean useSsl, |
| 68 | +final SSLContext sslContext, final ArangoSerialization util, final Integer chunksize, |
| 69 | +final ConnectionPool<C> connectionPool) { |
| 70 | +this.user = user; |
| 71 | +this.password = password; |
| 72 | +this.util = util; |
| 73 | +this.connectionPool = connectionPool; |
| 74 | +this.chunksize = chunksize != null ? chunksize : ArangoDBConstants.CHUNK_DEFAULT_CONTENT_SIZE; |
| 75 | +} |
| 76 | + |
| 77 | +protected synchronized void connect(final C connection) { |
| 78 | +if (!connection.isOpen()) { |
| 79 | +try { |
| 80 | +connection.open(); |
| 81 | +if (user != null) { |
| 82 | +authenticate(connection); |
| 83 | +} |
| 84 | +connection.confirm(); |
| 85 | +} catch (final IOException e) { |
| 86 | +LOGGER.error(e.getMessage(), e); |
| 87 | +throw new ArangoDBException(e); |
| 88 | +} |
| 89 | +} |
| 90 | +} |
| 91 | + |
| 92 | +protected abstract void authenticate(final C connection); |
| 93 | + |
| 94 | +public void disconnect() throws IOException { |
| 95 | +connectionPool.disconnect(); |
| 96 | +} |
| 97 | + |
| 98 | +public R execute(final Request request, final HostHandle hostHandle) throws ArangoDBException { |
| 99 | +final C connection = connectionPool.connection(hostHandle); |
| 100 | +try { |
| 101 | +return execute(request, connection); |
| 102 | +} catch (final ArangoDBException e) { |
| 103 | +if (e instanceof ArangoDBRedirectException) { |
| 104 | +final String location = ArangoDBRedirectException.class.cast(e).getLocation(); |
| 105 | +final Host host = HostUtils.createFromLocation(location); |
| 106 | +connectionPool.closeConnectionOnError(connection); |
| 107 | +return execute(request, new HostHandle().setHost(host)); |
| 108 | +} else { |
| 109 | +throw e; |
| 110 | +} |
| 111 | +} |
| 112 | +} |
| 113 | + |
| 114 | +protected abstract R execute(final Request request, C connection) throws ArangoDBException; |
| 115 | + |
| 116 | +protected void checkError(final Response response) throws ArangoDBException { |
| 117 | +ResponseUtils.checkError(util, response); |
| 118 | +} |
| 119 | + |
| 120 | +protected Response createResponse(final Message message) throws VPackParserException { |
| 121 | +final Response response = util.deserialize(message.getHead(), Response.class); |
| 122 | +if (message.getBody() != null) { |
| 123 | +response.setBody(message.getBody()); |
| 124 | +} |
| 125 | +return response; |
| 126 | +} |
| 127 | + |
| 128 | +protected Message createMessage(final Request request) throws VPackParserException { |
| 129 | +final long id = mId.incrementAndGet(); |
| 130 | +return new Message(id, util.serialize(request), request.getBody()); |
| 131 | +} |
| 132 | + |
| 133 | +protected Collection<Chunk> buildChunks(final Message message) { |
| 134 | +final Collection<Chunk> chunks = new ArrayList<Chunk>(); |
| 135 | +final VPackSlice head = message.getHead(); |
| 136 | +int size = head.getByteSize(); |
| 137 | +final VPackSlice body = message.getBody(); |
| 138 | +if (body != null) { |
| 139 | +size += body.getByteSize(); |
| 140 | +} |
| 141 | +final int n = size / chunksize; |
| 142 | +final int numberOfChunks = (size % chunksize != 0) ? (n + 1) : n; |
| 143 | +int off = 0; |
| 144 | +for (int i = 0; size > 0; i++) { |
| 145 | +final int len = Math.min(chunksize, size); |
| 146 | +final long messageLength = (i == 0 && numberOfChunks > 1) ? size : -1L; |
| 147 | +final Chunk chunk = new Chunk(message.getId(), i, numberOfChunks, messageLength, off, len); |
| 148 | +size -= len; |
| 149 | +off += len; |
| 150 | +chunks.add(chunk); |
| 151 | +} |
| 152 | +return chunks; |
| 153 | +} |
| 154 | + |
| 155 | +} |
0 commit comments