|
24 | 24 | import java.net.ServerSocket; |
25 | 25 | import java.net.Socket; |
26 | 26 | import java.util.ArrayList; |
| 27 | +import java.util.Arrays; |
27 | 28 | import java.util.List; |
28 | 29 | import java.util.Optional; |
29 | 30 | import java.util.function.Consumer; |
@@ -78,47 +79,46 @@ public void shutdown() { |
78 | 79 | private void executeInternal() |
79 | 80 | throws ApiException, KubectlException, IOException, InterruptedException { |
80 | 81 | PortForward pf = new PortForward(apiClient); |
81 | | - PortForward.PortForwardResult result = pf.forward(namespace, name, targetPorts); |
82 | | - if (result == null) { |
83 | | - throw new KubectlException("PortForward failed!"); |
84 | | - } |
85 | 82 | // TODO: Convert this to NIO to reduce the number of threads? |
86 | 83 | List<Thread> threads = new ArrayList<>(); |
87 | 84 | for (int i = 0; i < localPorts.size(); i++) { |
88 | 85 | int targetPort = targetPorts.get(i); |
89 | 86 | threads.add( |
90 | | - portForward( |
91 | | - new ServerSocket(localPorts.get(i)), |
92 | | - result.getInputStream(targetPort), |
93 | | - result.getOutboundStream(targetPort))); |
| 87 | + portForward(pf, |
| 88 | + new ServerSocket(localPorts.get(i)), targetPort)); |
94 | 89 | } |
95 | 90 | for (Thread t : threads) { |
96 | 91 | t.join(); |
97 | 92 | } |
98 | 93 | } |
99 | 94 |
|
100 | | - private Thread portForward(ServerSocket server, InputStream in, OutputStream out) { |
101 | | - Thread t = |
102 | | - new Thread( |
103 | | - new Runnable() { |
104 | | - @Override |
105 | | - public void run() { |
106 | | - while (running) { |
107 | | - try { |
108 | | - Socket sock = server.accept(); |
109 | | - Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError); |
110 | | - Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError); |
111 | | - |
112 | | - t1.join(); |
113 | | - t2.join(); |
114 | | - } catch (InterruptedException | IOException ex) { |
115 | | - Optional.ofNullable(onUnhandledError) |
116 | | - .orElse(Throwable::printStackTrace) |
117 | | - .accept(ex); |
118 | | - } |
| 95 | + private Thread portForward(PortForward pf, ServerSocket server, int targetPort) { |
| 96 | + Thread t = new Thread( |
| 97 | + new Runnable() { |
| 98 | + @Override |
| 99 | + public void run() { |
| 100 | + while (running) { |
| 101 | + try (Socket sock = server.accept()) { |
| 102 | + PortForward.PortForwardResult result = pf.forward(namespace, name, Arrays.asList(targetPort)); |
| 103 | + if (result == null) { |
| 104 | + throw new KubectlException("PortForward failed!"); |
119 | 105 | } |
| 106 | + InputStream in = result.getInputStream(targetPort); |
| 107 | + OutputStream out = result.getOutboundStream(targetPort); |
| 108 | + Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError); |
| 109 | + Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError); |
| 110 | + |
| 111 | + t1.join(); |
| 112 | + in.close(); |
| 113 | + t2.join(); |
| 114 | + } catch (Exception ex) { |
| 115 | + Optional.ofNullable(onUnhandledError) |
| 116 | + .orElse(Throwable::printStackTrace) |
| 117 | + .accept(ex); |
120 | 118 | } |
121 | | - }); |
| 119 | + } |
| 120 | + } |
| 121 | + }); |
122 | 122 | t.start(); |
123 | 123 | return t; |
124 | 124 | } |
|
0 commit comments