Skip to content

Commit 0bf6bf4

Browse files
committed
Merge pull request AsyncHttpClient#262 from md-5/master
[AsyncHttpClient#261] Update to use Netty 4 as a transport provider
2 parents 12d9d77 + c83340c commit 0bf6bf4

File tree

89 files changed

+7214
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+7214
-0
lines changed

providers/netty-4/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<parent>
5+
<groupId>com.ning</groupId>
6+
<artifactId>async-http-client-providers-parent</artifactId>
7+
<version>1.8.0-SNAPSHOT</version>
8+
</parent>
9+
<modelVersion>4.0.0</modelVersion>
10+
<artifactId>async-http-client-netty-4-provider</artifactId>
11+
<name>Asynchronous Http Client Netty 4 Provider</name>
12+
<description>
13+
The Async Http Client Netty 4 Provider.
14+
</description>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>io.netty</groupId>
19+
<artifactId>netty-all</artifactId>
20+
<version>4.0.0.Beta3</version>
21+
</dependency>
22+
</dependencies>
23+
24+
</project>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty_4;
14+
15+
import com.ning.http.client.Body;
16+
import io.netty.buffer.ByteBuf;
17+
import io.netty.buffer.Unpooled;
18+
import io.netty.handler.stream.ChunkedInput;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
23+
/**
24+
* Adapts a {@link Body} to Netty's {@link ChunkedInput}.
25+
*/
26+
class BodyChunkedInput
27+
implements ChunkedInput<ByteBuf> {
28+
29+
private final Body body;
30+
31+
private final int chunkSize = 1024 * 8;
32+
33+
private ByteBuffer nextChunk;
34+
35+
private static final ByteBuffer EOF = ByteBuffer.allocate(0);
36+
37+
private boolean endOfInput = false;
38+
39+
public BodyChunkedInput(Body body) {
40+
if (body == null) {
41+
throw new IllegalArgumentException("no body specified");
42+
}
43+
this.body = body;
44+
}
45+
46+
private ByteBuffer peekNextChunk()
47+
throws IOException {
48+
49+
if (nextChunk == null) {
50+
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
51+
long length = body.read(buffer);
52+
if (length < 0) {
53+
// Negative means this is finished
54+
buffer.flip();
55+
nextChunk = buffer;
56+
endOfInput = true;
57+
} else if (length == 0) {
58+
// Zero means we didn't get anything this time, but may get next time
59+
buffer.flip();
60+
nextChunk = null;
61+
} else {
62+
buffer.flip();
63+
nextChunk = buffer;
64+
}
65+
}
66+
return nextChunk;
67+
}
68+
69+
/**
70+
* Having no next chunk does not necessarily means end of input, other chunks may arrive later
71+
*/
72+
public boolean hasNextChunk() throws Exception {
73+
return peekNextChunk() != null;
74+
}
75+
76+
@Override
77+
public boolean readChunk(ByteBuf b) throws Exception {
78+
ByteBuffer buffer = peekNextChunk();
79+
if (buffer == null || buffer == EOF) {
80+
return false;
81+
}
82+
nextChunk = null;
83+
84+
b.writeBytes(buffer);
85+
return true;
86+
}
87+
88+
public boolean isEndOfInput() throws Exception {
89+
return endOfInput;
90+
}
91+
92+
public void close() throws Exception {
93+
body.close();
94+
}
95+
96+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty_4;
14+
15+
import com.ning.http.client.RandomAccessBody;
16+
import io.netty.buffer.AbstractReferenceCounted;
17+
import io.netty.buffer.ReferenceCounted;
18+
import io.netty.channel.FileRegion;
19+
20+
import java.io.IOException;
21+
import java.nio.channels.WritableByteChannel;
22+
23+
/**
24+
* Adapts a {@link RandomAccessBody} to Netty's {@link FileRegion}.
25+
*/
26+
class BodyFileRegion
27+
extends AbstractReferenceCounted
28+
implements FileRegion {
29+
30+
private final RandomAccessBody body;
31+
32+
public BodyFileRegion(RandomAccessBody body) {
33+
if (body == null) {
34+
throw new IllegalArgumentException("no body specified");
35+
}
36+
this.body = body;
37+
}
38+
39+
public long position() {
40+
return 0;
41+
}
42+
43+
public long count() {
44+
return body.getContentLength();
45+
}
46+
47+
public long transferTo(WritableByteChannel target, long position)
48+
throws IOException {
49+
return body.transferTo(position, Long.MAX_VALUE, target);
50+
}
51+
52+
public void deallocate() {
53+
try {
54+
body.close();
55+
} catch (IOException e) {
56+
// we tried
57+
}
58+
}
59+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty_4;
14+
15+
import java.io.IOException;
16+
import java.nio.ByteBuffer;
17+
import java.util.Queue;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import com.ning.http.client.Body;
22+
import com.ning.http.client.BodyGenerator;
23+
24+
/**
25+
* {@link BodyGenerator} which may return just part of the payload at the time
26+
* handler is requesting it. If it happens - PartialBodyGenerator becomes responsible
27+
* for finishing payload transferring asynchronously.
28+
*/
29+
public class FeedableBodyGenerator implements BodyGenerator {
30+
private final static byte[] END_PADDING = "\r\n".getBytes();
31+
private final static byte[] ZERO = "0".getBytes();
32+
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<BodyPart>();
33+
private final AtomicInteger queueSize = new AtomicInteger();
34+
private FeedListener listener;
35+
36+
@Override
37+
public Body createBody() throws IOException {
38+
return new PushBody();
39+
}
40+
41+
public void feed(final ByteBuffer buffer, final boolean isLast) throws IOException {
42+
queue.offer(new BodyPart(buffer, isLast));
43+
queueSize.incrementAndGet();
44+
if (listener != null) {
45+
listener.onContentAdded();
46+
}
47+
}
48+
49+
public static interface FeedListener {
50+
public void onContentAdded();
51+
}
52+
53+
public void setListener(FeedListener listener) {
54+
this.listener = listener;
55+
}
56+
57+
private final class PushBody implements Body {
58+
private final int ONGOING = 0;
59+
private final int CLOSING = 1;
60+
private final int FINISHED = 2;
61+
62+
private int finishState = 0;
63+
64+
@Override
65+
public long getContentLength() {
66+
return -1;
67+
}
68+
69+
@Override
70+
public long read(final ByteBuffer buffer) throws IOException {
71+
BodyPart nextPart = queue.peek();
72+
if (nextPart == null) {
73+
// Nothing in the queue
74+
switch (finishState) {
75+
case ONGOING:
76+
return 0;
77+
case CLOSING:
78+
buffer.put(ZERO);
79+
buffer.put(END_PADDING);
80+
finishState = FINISHED;
81+
return buffer.position();
82+
case FINISHED:
83+
buffer.put(END_PADDING);
84+
return -1;
85+
}
86+
}
87+
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
88+
int size = Math.min(nextPart.buffer.remaining(), capacity);
89+
buffer.put(Integer.toHexString(size).getBytes());
90+
buffer.put(END_PADDING);
91+
for (int i=0; i < size; i++) {
92+
buffer.put(nextPart.buffer.get());
93+
}
94+
buffer.put(END_PADDING);
95+
if (!nextPart.buffer.hasRemaining()) {
96+
if (nextPart.isLast) {
97+
finishState = CLOSING;
98+
}
99+
queue.remove();
100+
}
101+
return size;
102+
}
103+
104+
@Override
105+
public void close() throws IOException {
106+
}
107+
108+
}
109+
110+
private final static class BodyPart {
111+
private final boolean isLast;
112+
private final ByteBuffer buffer;
113+
114+
public BodyPart(final ByteBuffer buffer, final boolean isLast) {
115+
this.buffer = buffer;
116+
this.isLast = isLast;
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)