/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import java
.io
.IOException
;
21 import java
.net
.InetSocketAddress
;
22 import java
.util
.ArrayList
;
23 import java
.util
.Collection
;
24 import java
.util
.List
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
27 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
28 import org
.apache
.hadoop
.hbase
.Server
;
29 import org
.apache
.hadoop
.hbase
.codec
.Codec
;
30 import org
.apache
.hadoop
.hbase
.nio
.ByteBuff
;
31 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
32 import org
.apache
.hadoop
.hbase
.testclassification
.RPCTests
;
33 import org
.apache
.hadoop
.hbase
.util
.JVM
;
34 import org
.junit
.AfterClass
;
35 import org
.junit
.BeforeClass
;
36 import org
.junit
.ClassRule
;
37 import org
.junit
.experimental
.categories
.Category
;
38 import org
.junit
.runner
.RunWith
;
39 import org
.junit
.runners
.Parameterized
;
40 import org
.junit
.runners
.Parameterized
.Parameter
;
41 import org
.junit
.runners
.Parameterized
.Parameters
;
43 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.Channel
;
44 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.epoll
.EpollEventLoopGroup
;
45 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.epoll
.EpollSocketChannel
;
46 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.nio
.NioEventLoopGroup
;
47 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.socket
.nio
.NioSocketChannel
;
49 @RunWith(Parameterized
.class)
50 @Category({ RPCTests
.class, MediumTests
.class })
51 public class TestNettyIPC
extends AbstractTestIPC
{
/**
 * Class-level test rule for this test class, obtained from
 * {@code HBaseClassTestRule.forClass(TestNettyIPC.class)}.
 * NOTE(review): the annotation line that normally precedes this field is not visible in this
 * extract (presumably the JUnit ClassRule annotation, which the file imports) - confirm.
 */
54 public static final HBaseClassTestRule CLASS_RULE
=
55 HBaseClassTestRule
.forClass(TestNettyIPC
.class);
/**
 * Builds the parameter sets for the parameterized runner. Each entry is a one-element
 * {@code Object[]} holding an event loop type name, which the runner shows in the test name
 * through the {@code "{index}: EventLoop={0}"} pattern.
 * <p>
 * {@code "nio"} and {@code "perClientNio"} are always added; {@code "epoll"} is added only when
 * both {@code JVM.isLinux()} and {@code JVM.isAmd64()} return true.
 * <p>
 * NOTE(review): the lines that close the {@code if} block and return the list are missing from
 * this extract; presumably the method returns {@code params} - confirm against the full file.
 */
57 @Parameters(name
= "{index}: EventLoop={0}")
58 public static Collection
<Object
[]> parameters() {
59 List
<Object
[]> params
= new ArrayList
<>();
60 params
.add(new Object
[] { "nio" });
61 params
.add(new Object
[] { "perClientNio" });
// epoll is offered only on the platform combination checked here.
62 if (JVM
.isLinux() && JVM
.isAmd64()) {
63 params
.add(new Object
[] { "epoll" });
/**
 * Event loop type name for the current run; {@code setConf} switches on this value.
 * NOTE(review): presumably injected by the parameterized runner from {@code parameters()} via a
 * Parameter annotation whose line is not visible in this extract - confirm.
 */
69 public String eventLoopType
;
/** NIO event loop group; assigned in {@code setUpBeforeClass} and passed to the client config in {@code setConf}. */
71 private static NioEventLoopGroup NIO
;
/**
 * Epoll event loop group; assigned in {@code setUpBeforeClass} only when
 * {@code JVM.isLinux() && JVM.isAmd64()} holds, so it may remain unassigned elsewhere.
 */
73 private static EpollEventLoopGroup EPOLL
;
/**
 * Creates the static event loop groups: {@code NIO} unconditionally, and {@code EPOLL} only when
 * both {@code JVM.isLinux()} and {@code JVM.isAmd64()} return true (the same condition under
 * which {@code parameters()} offers the "epoll" parameter).
 * NOTE(review): the annotation line and the closing braces are not visible in this extract;
 * presumably this is the JUnit BeforeClass hook - confirm.
 */
76 public static void setUpBeforeClass() {
77 NIO
= new NioEventLoopGroup();
78 if (JVM
.isLinux() && JVM
.isAmd64()) {
79 EPOLL
= new EpollEventLoopGroup();
/**
 * Requests a graceful shutdown of the {@code NIO} and {@code EPOLL} event loop groups.
 * NOTE(review): several lines of this method are missing from this extract. {@code EPOLL} is
 * only assigned on Linux/amd64 in {@code setUpBeforeClass}, so the missing lines presumably
 * guard each shutdown with a null check - confirm against the full file before editing.
 */
84 public static void tearDownAfterClass() {
86 NIO
.shutdownGracefully();
89 EPOLL
.shutdownGracefully();
/**
 * Applies the event loop choice for the current run to the given client configuration by
 * switching on {@code eventLoopType}.
 * <p>
 * Three actions are visible: use the {@code NIO} group with {@code NioSocketChannel}, use the
 * {@code EPOLL} group with {@code EpollSocketChannel}, or call
 * {@code NettyRpcClientConfigHelper.createEventLoopPerClient}.
 * <p>
 * NOTE(review): the {@code case} labels, {@code break} statements and any {@code default}
 * branch are missing from this extract. Presumably the branches correspond, in order, to the
 * "nio", "epoll" and "perClientNio" values produced by {@code parameters()} - confirm.
 *
 * @param conf configuration passed to {@code NettyRpcClientConfigHelper}
 */
93 private void setConf(Configuration conf
) {
94 switch (eventLoopType
) {
96 NettyRpcClientConfigHelper
.setEventLoopConfig(conf
, NIO
, NioSocketChannel
.class);
99 NettyRpcClientConfigHelper
.setEventLoopConfig(conf
, EPOLL
, EpollSocketChannel
.class);
102 NettyRpcClientConfigHelper
.createEventLoopPerClient(conf
);
/**
 * Creates the RPC server used by the tests: a {@code NettyRpcServer} built from the supplied
 * arguments, with the constructor's final boolean argument fixed to {@code true}.
 * NOTE(review): the meaning of that trailing {@code true} is defined by the
 * {@code NettyRpcServer} constructor, which is not visible here.
 *
 * @param server      passed through to the {@code NettyRpcServer} constructor
 * @param name        passed through to the {@code NettyRpcServer} constructor
 * @param services    passed through to the {@code NettyRpcServer} constructor
 * @param bindAddress passed through to the {@code NettyRpcServer} constructor
 * @param conf        passed through to the {@code NettyRpcServer} constructor
 * @param scheduler   passed through to the {@code NettyRpcServer} constructor
 * @return the newly constructed {@code NettyRpcServer}
 * @throws IOException declared by this method; presumably propagated from the constructor
 */
110 protected RpcServer
createRpcServer(Server server
, String name
,
111 List
<RpcServer
.BlockingServiceAndInterface
> services
, InetSocketAddress bindAddress
,
112 Configuration conf
, RpcScheduler scheduler
) throws IOException
{
113 return new NettyRpcServer(server
, name
, services
, bindAddress
, conf
, scheduler
, true);
/**
 * Returns an anonymous subclass of {@code NettyRpcClient} constructed from {@code conf}.
 * NOTE(review): the body of the anonymous subclass and at least one statement before the
 * {@code return} are missing from this extract. Judging by the method name and the
 * {@code Codec} import, the subclass presumably overrides codec lookup so that no codec is
 * used, and {@code setConf(conf)} is presumably called first - confirm against the full file.
 *
 * @param conf configuration handed to the {@code NettyRpcClient} constructor
 * @return the customised client
 */
117 protected NettyRpcClient
createRpcClientNoCodec(Configuration conf
) {
119 return new NettyRpcClient(conf
) {
/**
 * Returns a plain {@code NettyRpcClient} constructed from {@code conf}.
 * NOTE(review): a line between the signature and the {@code return} is missing from this
 * extract; presumably it calls {@code setConf(conf)} to apply the event loop choice - confirm.
 *
 * @param conf configuration handed to the {@code NettyRpcClient} constructor
 * @return the new client
 */
130 protected NettyRpcClient
createRpcClient(Configuration conf
) {
132 return new NettyRpcClient(conf
);
/**
 * Returns an anonymous {@code NettyRpcClient} subclass whose {@code isTcpNoDelay()} always
 * throws {@code RuntimeException("Injected fault")}, so that any code path calling it fails
 * with a runtime exception.
 * NOTE(review): per the method name, the call presumably happens during connection setup; the
 * caller is not visible here. A line before the {@code return} is also missing from this
 * extract (presumably {@code setConf(conf)}) - confirm.
 *
 * @param conf configuration handed to the {@code NettyRpcClient} constructor
 * @return the fault-injecting client
 */
136 protected NettyRpcClient
createRpcClientRTEDuringConnectionSetup(Configuration conf
) {
138 return new NettyRpcClient(conf
) {
// Fault injection point: unconditionally throws instead of returning a setting.
141 boolean isTcpNoDelay() {
142 throw new RuntimeException("Injected fault");
/**
 * A {@code NettyRpcServer} whose connections fail every request: its preamble handler creates
 * {@code FailingConnection} instances, and {@code FailingConnection.processRequest} always
 * throws {@code DoNotRetryIOException("Failing for test")}.
 * NOTE(review): closing braces and annotation lines are missing from this extract, so the
 * exact nesting of the members below is inferred from the {@code TestFailingRpcServer.this}
 * references - confirm against the full file.
 */
147 private static class TestFailingRpcServer
extends NettyRpcServer
{
// Delegates to the NettyRpcServer constructor with the final boolean argument fixed to true.
149 TestFailingRpcServer(Server server
, String name
,
150 List
<RpcServer
.BlockingServiceAndInterface
> services
, InetSocketAddress bindAddress
,
151 Configuration conf
, RpcScheduler scheduler
) throws IOException
{
152 super(server
, name
, services
, bindAddress
, conf
, scheduler
, true);
// Connection type that rejects every request it is asked to process.
155 static final class FailingConnection
extends NettyServerRpcConnection
{
156 private FailingConnection(TestFailingRpcServer rpcServer
, Channel channel
) {
157 super(rpcServer
, channel
);
// Never processes buf; always throws DoNotRetryIOException.
161 public void processRequest(ByteBuff buf
) throws IOException
, InterruptedException
{
162 // this will throw exception after the connection header is read, and an RPC is sent
164 throw new DoNotRetryIOException("Failing for test");
// Supplies a preamble handler that builds FailingConnection instead of the regular connection.
169 protected NettyRpcServerPreambleHandler
createNettyRpcServerPreambleHandler() {
170 return new NettyRpcServerPreambleHandler(TestFailingRpcServer
.this) {
172 protected NettyServerRpcConnection
createNettyServerRpcConnection(Channel channel
) {
173 return new FailingConnection(TestFailingRpcServer
.this, channel
);
/**
 * Creates a {@code TestFailingRpcServer} from the supplied arguments, passing all six through
 * to its constructor unchanged.
 *
 * @param server      passed through to the {@code TestFailingRpcServer} constructor
 * @param name        passed through to the {@code TestFailingRpcServer} constructor
 * @param services    passed through to the {@code TestFailingRpcServer} constructor
 * @param bindAddress passed through to the {@code TestFailingRpcServer} constructor
 * @param conf        passed through to the {@code TestFailingRpcServer} constructor
 * @param scheduler   passed through to the {@code TestFailingRpcServer} constructor
 * @return the newly constructed failing server
 * @throws IOException declared by this method and by the {@code TestFailingRpcServer} constructor
 */
180 protected RpcServer
createTestFailingRpcServer(Server server
, String name
,
181 List
<RpcServer
.BlockingServiceAndInterface
> services
, InetSocketAddress bindAddress
,
182 Configuration conf
, RpcScheduler scheduler
) throws IOException
{
183 return new TestFailingRpcServer(server
, name
, services
, bindAddress
, conf
, scheduler
);