3 # Licensed to the Apache Software Foundation (ASF) under one
4 # or more contributor license agreements. See the NOTICE file
5 # distributed with this work for additional information
6 # regarding copyright ownership. The ASF licenses this file
7 # to you under the Apache License, Version 2.0 (the
8 # "License"); you may not use this file except in compliance
9 # with the License. You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
21 java_import java.util.Arrays
22 java_import java.util.regex.Pattern
23 java_import org.apache.hadoop.hbase.util.Pair
24 java_import org.apache.hadoop.hbase.util.RegionSplitter
25 java_import org.apache.hadoop.hbase.util.Bytes
26 java_import org.apache.hadoop.hbase.ServerName
27 java_import org.apache.hadoop.hbase.TableName
29 # Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
32 # rubocop:disable Metrics/ClassLength
34 include HBaseConstants
36 def initialize(connection)
37 @connection = connection
39 @admin = @connection.getAdmin
40 @conf = @connection.getConfiguration
47 #----------------------------------------------------------------------------------------------
48 # Returns a list of tables in hbase
49 def list(regex = '.*')
50 @admin.listTableNames(Pattern.compile(regex)).map(&:getNameAsString)
53 #----------------------------------------------------------------------------------------------
54 # Requests a table or region or region server flush
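# A minimal usage sketch, assuming `admin` is an instance of this class and the wrapper
# method is named flush as in the shell command (all names below are illustrative):
#   admin.flush('t1')                                       # flush every region of table 't1'
#   admin.flush('t1,,1596021983001.53e41f94d5c2587c4e1a.')  # flush a single region by name
#   admin.flush('rs1.example.com,16020,1596021983001')      # flush all regions on a region server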
56 @admin.flushRegion(name.to_java_bytes)
57 rescue java.lang.IllegalArgumentException
58 # Unknown region. Try table.
60 @admin.flush(TableName.valueOf(name))
61 rescue java.lang.IllegalArgumentException
62 # Unknown table. Try region server.
63 @admin.flushRegionServer(ServerName.valueOf(name))
67 #----------------------------------------------------------------------------------------------
68 # Requests a table or region or column family compaction
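# A minimal usage sketch, assuming `admin` is an instance of this class (names are illustrative):
#   admin.compact('t1')                # compact the whole table
#   admin.compact('t1', 'cf1')         # compact only column family 'cf1'
#   admin.compact('t1', 'cf1', 'MOB')  # compact the MOB data of 'cf1'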
69 def compact(table_or_region_name, family = nil, type = 'NORMAL')
71 family_bytes = family.to_java_bytes unless family.nil?
74 compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
76 compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
78 raise ArgumentError, 'only NORMAL or MOB accepted for type!'
82 @admin.compactRegion(table_or_region_name.to_java_bytes, family_bytes)
83 rescue java.lang.IllegalArgumentException => e
84 @admin.compact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
88 #----------------------------------------------------------------------------------------------
89 # Switch compaction on/off at runtime on a region server
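# A minimal usage sketch (the server name is illustrative); per the Admin API, an empty
# server list is expected to apply the switch to every region server:
#   admin.compaction_switch(false, ['rs1.example.com,16020,1596021983001'])
#   admin.compaction_switch(true, [])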
90 def compaction_switch(on_or_off, regionserver_names)
91 region_servers = regionserver_names.flatten.compact
92 servers = java.util.ArrayList.new
93 if region_servers.any?
94 region_servers.each do |s|
98 @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
101 #----------------------------------------------------------------------------------------------
102 # Gets compaction state for specified table
103 def getCompactionState(table_name)
104 @admin.getCompactionState(TableName.valueOf(table_name)).name
107 # Requests to compact all regions on the regionserver
108 def compact_regionserver(servername, major = false)
109 @admin.compactRegionServer(ServerName.valueOf(servername), major)
112 #----------------------------------------------------------------------------------------------
113 # Requests a table or region or column family major compaction
114 def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
116 family_bytes = family.to_java_bytes unless family.nil?
119 compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
121 compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
123 raise ArgumentError, 'only NORMAL or MOB accepted for type!'
127 @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
128 rescue java.lang.IllegalArgumentException => e
129 @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
133 #----------------------------------------------------------------------------------------------
134 # Requests a regionserver's WAL roll
135 def wal_roll(server_name)
136 @admin.rollWALWriter(ServerName.valueOf(server_name))
138 # TODO: remove older hlog_roll version
139 alias hlog_roll wal_roll
141 #----------------------------------------------------------------------------------------------
142 # Requests a table or region split
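# A minimal usage sketch (table, row key and region names are illustrative):
#   admin.split('t1')             # split all splittable regions of table 't1'
#   admin.split('t1', 'row5000')  # split the region containing 'row5000' at that key
#   admin.split('t1,,1596021983001.53e41f94d5c2587c4e1a.')  # split one region by its full name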
143 def split(table_or_region_name, split_point = nil)
144 split_point_bytes = nil
145 split_point_bytes = split_point.to_java_bytes unless split_point.nil?
147 @admin.splitRegionAsync(table_or_region_name.to_java_bytes, split_point_bytes).get
148 rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException => e
149 @admin.split(TableName.valueOf(table_or_region_name), split_point_bytes)
153 #----------------------------------------------------------------------------------------------
154 # Enable/disable one split or merge switch
155 # Returns previous switch setting.
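# A minimal usage sketch:
#   prev = admin.splitormerge_switch('SPLIT', false)  # disable region splits, returns old setting
#   admin.splitormerge_switch('MERGE', true)          # re-enable region merges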
156 def splitormerge_switch(type, enabled)
158 @admin.splitSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
159 elsif type == 'MERGE'
160 @admin.mergeSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
162 raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
166 #----------------------------------------------------------------------------------------------
167 # Query the current state of the split or merge switch.
168 # Returns the switch's state (true is enabled).
169 def splitormerge_enabled(type)
171 @admin.isSplitEnabled
172 elsif type == 'MERGE'
173 @admin.isMergeEnabled
175 raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
179 def locate_region(table_name, row_key)
180 locator = @connection.getRegionLocator(TableName.valueOf(table_name))
182 return locator.getRegionLocation(Bytes.toBytesBinary(row_key))
188 #----------------------------------------------------------------------------------------------
189 # Requests a cluster balance
190 # Returns true if balancer ran
192 @admin.balancer(java.lang.Boolean.valueOf(force))
195 #----------------------------------------------------------------------------------------------
196 # Enable/disable balancer
197 # Returns previous balancer switch setting.
198 def balance_switch(enableDisable)
199 @admin.balancerSwitch(
200 java.lang.Boolean.valueOf(enableDisable), java.lang.Boolean.valueOf(false)
204 #----------------------------------------------------------------------------------------------
205 # Query the current state of the LoadBalancer.
206 # Returns the balancer's state (true is enabled).
207 def balancer_enabled?
208 @admin.isBalancerEnabled
211 #----------------------------------------------------------------------------------------------
212 # Requests clear block cache for table
213 def clear_block_cache(table_name)
214 @admin.clearBlockCache(org.apache.hadoop.hbase.TableName.valueOf(table_name)).toString
217 #----------------------------------------------------------------------------------------------
218 # Requests region normalization for all configured tables in the cluster
219 # Returns true if normalizer ran successfully
224 #----------------------------------------------------------------------------------------------
225 # Enable/disable region normalizer
226 # Returns previous normalizer switch setting.
227 def normalizer_switch(enableDisable)
228 @admin.normalizerSwitch(java.lang.Boolean.valueOf(enableDisable))
231 #----------------------------------------------------------------------------------------------
232 # Query the current state of region normalizer.
233 # Returns the state of region normalizer (true is enabled).
234 def normalizer_enabled?
235 @admin.isNormalizerEnabled
238 #----------------------------------------------------------------------------------------------
239 # Query the current state of master in maintenance mode.
240 # Returns the state of maintenance mode (true is on).
241 def in_maintenance_mode?
242 @admin.isMasterInMaintenanceMode
245 #----------------------------------------------------------------------------------------------
246 # Request a scan of the catalog table (for garbage collection)
247 # Returns an int signifying the number of entries cleaned
248 def catalogjanitor_run
249 @admin.runCatalogJanitor
252 #----------------------------------------------------------------------------------------------
253 # Enable/disable the catalog janitor
254 # Returns previous catalog janitor switch setting.
255 def catalogjanitor_switch(enableDisable)
256 @admin.catalogJanitorSwitch(java.lang.Boolean.valueOf(enableDisable))
259 #----------------------------------------------------------------------------------------------
261 # Query the catalog janitor state (enabled/disabled)
261 # Returns catalog janitor state (true signifies enabled).
262 def catalogjanitor_enabled
263 @admin.isCatalogJanitorEnabled
266 #----------------------------------------------------------------------------------------------
267 # Request cleaner chore to run (for garbage collection of HFiles and WAL files)
268 def cleaner_chore_run
269 @admin.runCleanerChore
272 #----------------------------------------------------------------------------------------------
273 # Enable/disable the cleaner chore
274 # Returns previous cleaner switch setting.
275 def cleaner_chore_switch(enableDisable)
276 @admin.cleanerChoreSwitch(java.lang.Boolean.valueOf(enableDisable))
279 #----------------------------------------------------------------------------------------------
281 # Query the cleaner chore state (enabled/disabled)
281 # Returns cleaner state (true signifies enabled).
282 def cleaner_chore_enabled
283 @admin.isCleanerChoreEnabled
286 #----------------------------------------------------------------------------------------------
288 def enable(table_name)
289 tableExists(table_name)
290 return if enabled?(table_name)
291 @admin.enableTable(TableName.valueOf(table_name))
294 #----------------------------------------------------------------------------------------------
295 # Enables all tables matching the given regex
296 def enable_all(regex)
297 pattern = Pattern.compile(regex.to_s)
298 failed = java.util.ArrayList.new
299 admin.listTableNames(pattern).each do |table_name|
301 admin.enableTable(table_name)
302 rescue java.io.IOException => e
303 puts "table:#{table_name}, error:#{e.toString}"
304 failed.add(table_name)
310 #----------------------------------------------------------------------------------------------
312 def disable(table_name)
313 tableExists(table_name)
314 return if disabled?(table_name)
315 @admin.disableTable(TableName.valueOf(table_name))
318 #----------------------------------------------------------------------------------------------
319 # Disables all tables matching the given regex
320 def disable_all(regex)
321 pattern = Pattern.compile(regex.to_s)
322 failed = java.util.ArrayList.new
323 admin.listTableNames(pattern).each do |table_name|
325 admin.disableTable(table_name)
326 rescue java.io.IOException => e
327 puts "table:#{table_name}, error:#{e.toString}"
328 failed.add(table_name)
334 #---------------------------------------------------------------------------------------------
335 # Throw exception if table doesn't exist
336 def tableExists(table_name)
337 raise ArgumentError, "Table #{table_name} does not exist." unless exists?(table_name)
340 #----------------------------------------------------------------------------------------------
342 def disabled?(table_name)
343 @admin.isTableDisabled(TableName.valueOf(table_name))
346 #----------------------------------------------------------------------------------------------
349 tableExists(table_name)
350 raise ArgumentError, "Table #{table_name} is enabled. Disable it first." if enabled?(
354 @admin.deleteTable(org.apache.hadoop.hbase.TableName.valueOf(table_name))
357 #----------------------------------------------------------------------------------------------
360 pattern = Pattern.compile(regex.to_s)
361 failed = java.util.ArrayList.new
362 admin.listTableNames(pattern).each do |table_name|
364 admin.deleteTable(table_name)
365 rescue java.io.IOException => e
366 puts "table:#{table_name}, error:#{e.toString}"
367 failed.add(table_name)
373 #----------------------------------------------------------------------------------------------
374 # Returns ZooKeeper status dump
376 @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZKWatcher.new(
377 @admin.getConfiguration,
381 zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper
382 @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
383 org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper)
386 #----------------------------------------------------------------------------------------------
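# Creates a table. Positional args are column family specs (a plain name String or a Hash
# containing NAME) and/or Hashes of table options such as SPLITS, SPLITS_FILE,
# NUMREGIONS/SPLITALGO, REGION_REPLICATION, METADATA and CONFIGURATION.
# A minimal usage sketch (table, family and split values are illustrative):
#   admin.create('t1', 'cf1')
#   admin.create('t1', { NAME => 'cf1', VERSIONS => 5 }, { SPLITS => ['10', '20', '30'] })
#   admin.create('t1', 'cf1', { NUMREGIONS => 15, SPLITALGO => 'HexStringSplit' })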
388 def create(table_name, *args)
389 # Fail if table name is not a string
390 raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
392 # Flatten params array
393 args = args.flatten.compact
396 # Start defining the table
397 htd = org.apache.hadoop.hbase.HTableDescriptor.new(org.apache.hadoop.hbase.TableName.valueOf(table_name))
399 # Args are either columns or splits, add them to the table definition
400 # TODO: add table options support
402 unless arg.is_a?(String) || arg.is_a?(Hash)
403 raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
406 # First, handle all the cases where arg is a column family.
407 if arg.is_a?(String) || arg.key?(NAME)
408 # If the arg is a string, default action is to add a column to the table.
409 # If arg has a name, it must also be a column descriptor.
410 descriptor = hcd(arg, htd)
411 # Warn if duplicate columns are added
412 if htd.hasFamily(descriptor.getName)
413 puts "Family '" + descriptor.getNameAsString + "' already exists, the old one will be replaced"
414 htd.modifyFamily(descriptor)
416 htd.addFamily(descriptor)
421 if arg.key?(REGION_REPLICATION)
422 region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION))
423 htd.setRegionReplication(region_replication)
426 # Get rid of the "METHOD", which is deprecated for create.
427 # We'll do whatever it used to do below if it's table_att.
428 if (method = arg.delete(METHOD))
429 raise(ArgumentError, 'table_att is currently the only supported method') unless method == 'table_att'
432 # The hash is not a column family. Figure out what's in it.
433 # First, handle splits.
434 if arg.key?(SPLITS_FILE)
435 splits_file = arg.delete(SPLITS_FILE)
436 unless File.exist?(splits_file)
437 raise(ArgumentError, "Splits file #{splits_file} doesn't exist")
440 File.foreach(splits_file) do |line|
441 arg[SPLITS].push(line.chomp)
443 htd.setValue(SPLITS_FILE, arg[SPLITS_FILE])
447 splits = Java::byte[][arg[SPLITS].size].new
449 arg.delete(SPLITS).each do |split|
450 splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split)
453 elsif arg.key?(NUMREGIONS) || arg.key?(SPLITALGO)
454 # Deprecated region pre-split API via NUMREGIONS and SPLITALGO; ignored if SPLITS was already specified above.
455 raise(ArgumentError, 'Number of regions must be specified') unless arg.key?(NUMREGIONS)
456 raise(ArgumentError, 'Split algorithm must be specified') unless arg.key?(SPLITALGO)
457 raise(ArgumentError, 'Number of regions must be greater than 1') unless arg[NUMREGIONS] > 1
458 num_regions = arg.delete(NUMREGIONS)
459 split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO))
460 splits = split_algo.split(JInteger.valueOf(num_regions))
463 # Done with splits; apply formerly-table_att parameters.
464 update_htd_from_arg(htd, arg)
466 arg.each_key do |ignored_key|
467 puts(format('An argument ignored (unknown or overridden): %s', ignored_key))
471 # Fail if no column families defined
472 raise(ArgumentError, 'Table must have at least one column family') unless has_columns
475 # Perform the create table call
476 @admin.createTable(htd)
478 # Perform the create table call
479 @admin.createTable(htd, splits)
483 #----------------------------------------------------------------------------------------------
484 #----------------------------------------------------------------------------------------------
486 def assign(region_name)
487 @admin.assign(region_name.to_java_bytes)
490 #----------------------------------------------------------------------------------------------
492 def unassign(region_name, force)
493 @admin.unassign(region_name.to_java_bytes, java.lang.Boolean.valueOf(force))
496 #----------------------------------------------------------------------------------------------
498 def move(encoded_region_name, server = nil)
499 @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil)
502 #----------------------------------------------------------------------------------------------
504 def merge_region(region_a_name, region_b_name, force)
505 @admin.mergeRegions(region_a_name.to_java_bytes,
506 region_b_name.to_java_bytes,
507 java.lang.Boolean.valueOf(force))
510 #----------------------------------------------------------------------------------------------
511 # Returns table's structure description
512 def describe(table_name)
513 tableExists(table_name)
514 @admin.getDescriptor(TableName.valueOf(table_name)).to_s
517 def get_column_families(table_name)
518 tableExists(table_name)
519 @admin.getDescriptor(TableName.valueOf(table_name)).getColumnFamilies
522 def get_table_attributes(table_name)
523 tableExists(table_name)
524 @admin.getDescriptor(TableName.valueOf(table_name)).toStringTableAttributes
527 #----------------------------------------------------------------------------------------------
528 # Truncates table (deletes all records by recreating the table)
529 def truncate(table_name_str)
530 puts "Truncating '#{table_name_str}' table (it may take a while):"
531 table_name = TableName.valueOf(table_name_str)
532 table_description = @admin.getDescriptor(table_name)
533 raise ArgumentError, "Table #{table_name_str} is not enabled. Enable it first." unless
534 enabled?(table_name_str)
535 puts 'Disabling table...'
536 @admin.disableTable(table_name)
539 puts 'Truncating table...'
540 @admin.truncateTable(table_name, false)
542 # Handle the compatibility case, where the truncate method doesn't exist on the Master
543 raise e unless e.respond_to?(:cause) && !e.cause.nil?
545 if rootCause.is_a?(org.apache.hadoop.hbase.DoNotRetryIOException)
546 # Handle the compatibility case, where the truncate method doesn't exist on the Master
547 puts 'Dropping table...'
548 @admin.deleteTable(table_name)
550 puts 'Creating table...'
551 @admin.createTable(table_description)
558 #----------------------------------------------------------------------------------------------
559 # Truncates table while maintaining region boundaries (deletes all records by recreating the table)
560 def truncate_preserve(table_name_str, conf = @conf)
561 puts "Truncating '#{table_name_str}' table (it may take a while):"
562 table_name = TableName.valueOf(table_name_str)
563 locator = @connection.getRegionLocator(table_name)
565 splits = locator.getAllRegionLocations
566 .map { |i| Bytes.toStringBinary(i.getRegionInfo.getStartKey) }
567 .delete_if { |k| k == '' }.to_java :String
568 splits = org.apache.hadoop.hbase.util.Bytes.toBinaryByteArrays(splits)
573 table_description = @admin.getDescriptor(table_name)
574 puts 'Disabling table...'
575 disable(table_name_str)
578 puts 'Truncating table...'
580 unless conf.getBoolean('hbase.client.truncatetable.support', true)
581 raise UnsupportedMethodException, 'truncateTable'
583 @admin.truncateTable(table_name, true)
585 # Handle the compatibility case, where the truncate method doesn't exist on the Master
586 raise e unless e.respond_to?(:cause) && !e.cause.nil?
588 if rootCause.is_a?(org.apache.hadoop.hbase.DoNotRetryIOException)
589 # Handle the compatibility case, where the truncate method doesn't exist on the Master
590 puts 'Dropping table...'
591 @admin.deleteTable(table_name)
593 puts 'Creating table with region boundaries...'
594 @admin.createTable(table_description, splits)
601 class UnsupportedMethodException < StandardError
607 org.apache.hadoop.hbase.DoNotRetryIOException.new("#{@method_name} is not supported")
611 #----------------------------------------------------------------------------------------------
612 # Check the status of alter command (number of regions reopened)
613 def alter_status(table_name)
614 # Table name should be a string
615 raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
618 raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
622 status = @admin.getAlterStatus(org.apache.hadoop.hbase.TableName.valueOf(table_name))
623 if status.getSecond != 0
624 puts "#{status.getSecond - status.getFirst}/#{status.getSecond} regions updated."
626 puts 'All regions updated.'
629 end while !status.nil? && status.getFirst != 0
633 #----------------------------------------------------------------------------------------------
634 # Change table structure or table options
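# A minimal usage sketch (table, family and attribute values are illustrative):
#   admin.alter('t1', true, { NAME => 'cf1', VERSIONS => 3 })       # add or modify a family
#   admin.alter('t1', true, { METHOD => 'delete', NAME => 'cf2' })  # drop a family
#   admin.alter('t1', true, { MAX_FILESIZE => '134217728' })        # set a table attribute
#   admin.alter('t1', true, { METHOD => 'table_att_unset', NAME => 'MAX_FILESIZE' })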
635 def alter(table_name_str, wait = true, *args)
636 # Table name should be a string
637 raise(ArgumentError, 'Table name must be of type String') unless
638 table_name_str.is_a?(String)
641 raise(ArgumentError, "Can't find a table: #{table_name_str}") unless exists?(table_name_str)
643 # There should be at least one argument
644 raise(ArgumentError, 'There should be at least one argument besides the table name') if args.empty?
646 table_name = TableName.valueOf(table_name_str)
648 # Get table descriptor
649 htd = org.apache.hadoop.hbase.HTableDescriptor.new(@admin.getDescriptor(table_name))
650 hasTableUpdate = false
654 # Normalize args to support column name only alter specs
655 arg = { NAME => arg } if arg.is_a?(String)
657 # Normalize args to support shortcut delete syntax
658 arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete']
660 # There are 3 possible options.
661 # 1) Column family spec. Distinguished by having a NAME and no METHOD.
662 method = arg.delete(METHOD)
663 if method.nil? && arg.key?(NAME)
664 descriptor = hcd(arg, htd)
665 column_name = descriptor.getNameAsString
667 # If column already exist, then try to alter it. Create otherwise.
668 if htd.hasFamily(column_name.to_java_bytes)
669 htd.modifyFamily(descriptor)
671 htd.addFamily(descriptor)
673 hasTableUpdate = true
677 # 2) Method other than table_att, with some args.
678 name = arg.delete(NAME)
679 if !method.nil? && method != 'table_att'
680 # Delete column family
681 if method == 'delete'
682 raise(ArgumentError, 'NAME parameter missing for delete method') unless name
683 htd.removeFamily(name.to_java_bytes)
684 hasTableUpdate = true
685 # Unset table attributes
686 elsif method == 'table_att_unset'
687 raise(ArgumentError, 'NAME parameter missing for table_att_unset method') unless name
690 if htd.getValue(key).nil?
691 raise ArgumentError, "Could not find attribute: #{key}"
696 if htd.getValue(name).nil?
697 raise ArgumentError, "Could not find attribute: #{name}"
701 hasTableUpdate = true
702 # Unset table configuration
703 elsif method == 'table_conf_unset'
704 raise(ArgumentError, 'NAME parameter missing for table_conf_unset method') unless name
707 if htd.getConfigurationValue(key).nil?
708 raise ArgumentError, "Could not find configuration: #{key}"
710 htd.removeConfiguration(key)
713 if htd.getConfigurationValue(name).nil?
714 raise ArgumentError, "Could not find configuration: #{name}"
716 htd.removeConfiguration(name)
718 hasTableUpdate = true
721 raise ArgumentError, "Unknown method: #{method}"
724 arg.each_key do |unknown_key|
725 puts(format('Unknown argument ignored: %s', unknown_key))
731 # 3) Some args for the table, optionally with METHOD => table_att (deprecated)
732 update_htd_from_arg(htd, arg)
734 # set a coprocessor attribute
735 valid_coproc_keys = []
736 next unless arg.is_a?(Hash)
737 arg.each do |key, value|
738 k = String.new(key) # prepare to strip
741 next unless k =~ /coprocessor/i
742 v = String.new(value)
744 # TODO: We should not require the user to configure the coprocessor with our internal format.
745 htd.addCoprocessorWithSpec(v)
746 valid_coproc_keys << key
749 valid_coproc_keys.each do |key|
753 hasTableUpdate = true
755 arg.each_key do |unknown_key|
756 puts(format('Unknown argument ignored: %s', unknown_key))
762 # Bulk apply all table modifications.
764 future = @admin.modifyTableAsync(htd)
767 puts 'Updating all regions with the new schema...'
773 def status(format, type)
774 status = org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics)
775 if format == 'detailed'
776 puts(format('version %s', status.getHBaseVersion))
778 # Print regions in transition first because the list is usually empty
778 puts(format('%d regionsInTransition', status.getRegionStatesInTransition.size))
779 for v in status.getRegionStatesInTransition
780 puts(format(' %s', v))
782 master = status.getMaster
783 puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
784 puts(format('%d backup masters', status.getBackupMastersSize))
785 for server in status.getBackupMasters
786 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
789 master_coprocs = @admin.getMasterCoprocessorNames.toString
790 unless master_coprocs.nil?
791 puts(format('master coprocessors: %s', master_coprocs))
793 puts(format('%d live servers', status.getServersSize))
794 for server in status.getServers
795 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
796 puts(format(' %s', status.getLoad(server).toString))
797 for name, region in status.getLoad(server).getRegionsLoad
798 puts(format(' %s', region.getNameAsString.dump))
799 puts(format(' %s', region.toString))
802 puts(format('%d dead servers', status.getDeadServersSize))
803 for server in status.getDeadServerNames
804 puts(format(' %s', server))
806 elsif format == 'replication'
807 puts(format('version %<version>s', version: status.getHBaseVersion))
808 puts(format('%<servers>d live servers', servers: status.getServersSize))
809 status.getServers.each do |server_status|
810 sl = status.getLoad(server_status)
811 r_sink_string = ' SINK:'
812 r_source_string = ' SOURCE:'
813 r_load_sink = sl.getReplicationLoadSink
814 next if r_load_sink.nil?
816 r_sink_string << ' AgeOfLastAppliedOp=' +
817 r_load_sink.getAgeOfLastAppliedOp.to_s
818 r_sink_string << ', TimeStampsOfLastAppliedOp=' +
819 java.util.Date.new(r_load_sink
820 .getTimeStampsOfLastAppliedOp).toString
821 r_load_source_map = sl.getReplicationLoadSourceMap
822 build_source_string(r_load_source_map, r_source_string)
823 puts(format(' %<host>s:', host: server_status.getHostname))
824 if type.casecmp('SOURCE').zero?
825 puts(format('%<source>s', source: r_source_string))
826 elsif type.casecmp('SINK').zero?
827 puts(format('%<sink>s', sink: r_sink_string))
829 puts(format('%<source>s', source: r_source_string))
830 puts(format('%<sink>s', sink: r_sink_string))
833 elsif format == 'simple'
836 master = status.getMaster
837 puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
838 puts(format('%d backup masters', status.getBackupMastersSize))
839 for server in status.getBackupMasters
840 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
842 puts(format('%d live servers', status.getServersSize))
843 for server in status.getServers
844 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
845 puts(format(' %s', status.getLoad(server).toString))
846 load += status.getLoad(server).getNumberOfRequests
847 regions += status.getLoad(server).getNumberOfRegions
849 puts(format('%d dead servers', status.getDeadServers))
850 for server in status.getDeadServerNames
851 puts(format(' %s', server))
853 puts(format('Aggregate load: %d, regions: %d', load, regions))
855 puts "1 active master, #{status.getBackupMastersSize} backup masters, #{status.getServersSize} servers, #{status.getDeadServers} dead, #{format('%.4f', status.getAverageLoad)} average load"
859 def build_source_string(r_load_source_map, r_source_string)
860 r_load_source_map.each do |peer, sources|
861 r_source_string << ' PeerID=' + peer
862 sources.each do |source_load|
863 build_queue_title(source_load, r_source_string)
864 build_running_source_stats(source_load, r_source_string)
869 def build_queue_title(source_load, r_source_string)
870 r_source_string << if source_load.isRecovered
871 "\n Recovered Queue: "
875 r_source_string << source_load.getQueueId
878 def build_running_source_stats(source_load, r_source_string)
879 if source_load.isRunning
880 build_shipped_stats(source_load, r_source_string)
881 build_load_general_stats(source_load, r_source_string)
882 r_source_string << ', Replication Lag=' +
883 source_load.getReplicationLag.to_s
885 r_source_string << "\n "
886 r_source_string << 'No Reader/Shipper threads running yet.'
890 def build_shipped_stats(source_load, r_source_string)
891 r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
893 'No Ops shipped since last restart'
895 "\n AgeOfLastShippedOp=" +
896 source_load.getAgeOfLastShippedOp.to_s +
897 ', TimeStampOfLastShippedOp=' +
898 java.util.Date.new(source_load
899 .getTimeStampOfLastShippedOp).toString
903 def build_load_general_stats(source_load, r_source_string)
904 r_source_string << ', SizeOfLogQueue=' +
905 source_load.getSizeOfLogQueue.to_s
906 r_source_string << ', EditsReadFromLogQueue=' +
907 source_load.getEditsRead.to_s
908 r_source_string << ', OpsShippedToTarget=' +
909 source_load.getOPsShipped.to_s
910 build_edits_for_source(source_load, r_source_string)
913 def build_edits_for_source(source_load, r_source_string)
914 if source_load.hasEditsSinceRestart
915 r_source_string << ', TimeStampOfNextToReplicate=' +
916 java.util.Date.new(source_load
917 .getTimeStampOfNextToReplicate).toString
919 r_source_string << ', No edits for this source'
920 r_source_string << ' since it started'
924 #----------------------------------------------------------------------------------------------
930 def exists?(table_name)
931 @admin.tableExists(TableName.valueOf(table_name))
934 #----------------------------------------------------------------------------------------------
936 def enabled?(table_name)
937 @admin.isTableEnabled(TableName.valueOf(table_name))
940 #----------------------------------------------------------------------------------------------
941 # Return a new HColumnDescriptor made of passed args
943 # String arg, single parameter constructor
944 return org.apache.hadoop.hbase.HColumnDescriptor.new(arg) if arg.is_a?(String)
946 raise(ArgumentError, "Column family #{arg} must have a name") unless name = arg.delete(NAME)
948 family = htd.getFamily(name.to_java_bytes)
949 # create it if it's a new family
950 family ||= org.apache.hadoop.hbase.HColumnDescriptor.new(name.to_java_bytes)
952 family.setBlockCacheEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE)
953 family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE)
954 family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE)
955 family.setCacheIndexesOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE)
956 family.setCacheBloomsOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE)
957 family.setEvictBlocksOnClose(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE)
958 family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
959 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION)
960 family.setInMemoryCompaction(
961 org.apache.hadoop.hbase.MemoryCompactionPolicy.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))
964 family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
965 family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING)
966 family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
967 family.setMaxVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
968 family.setMinVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
969 family.setKeepDeletedCells(org.apache.hadoop.hbase.KeepDeletedCells.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS).to_s.upcase)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
970 family.setCompressTags(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS)
971 family.setPrefetchBlocksOnOpen(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN)
972 family.setMobEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB)
973 family.setMobThreshold(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD)
974 family.setNewVersionBehavior(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR)
975 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
976 bloomtype = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER).upcase.to_sym
977 if org.apache.hadoop.hbase.regionserver.BloomType.constants.include?(bloomtype)
978 family.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(bloomtype))
980 raise(ArgumentError, "BloomFilter type #{bloomtype} is not supported. Use one of " + org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.join(' '))
983 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
984 compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION).upcase.to_sym
985 if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
986 family.setCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
988 raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
991 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION)
992 algorithm = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION).upcase
993 family.setEncryptionType(algorithm)
994 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
995 key = org.apache.hadoop.hbase.io.crypto.Encryption.pbkdf128(
996 arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
998 family.setEncryptionKey(org.apache.hadoop.hbase.security.EncryptionUtil.wrapKey(@conf, key,
1002 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT)
1003 compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT).upcase.to_sym
1004 if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
1005 family.setCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
1007 raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
1010 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
1011 storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
1012 family.setStoragePolicy(storage_policy)
1014 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY)
1015 mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase.to_sym
1016 if org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.include?(mob_partition_policy)
1017 family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy))
1019 raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.join(' '))
1023 set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
1024 set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
1025 if arg.include?(org.apache.hadoop.hbase
1026 .HColumnDescriptor::DFS_REPLICATION)
1027 family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase
1028 .HColumnDescriptor::DFS_REPLICATION)))
1031 arg.each_key do |unknown_key|
1032 puts(format('Unknown argument ignored for column family %s: %s', name, unknown_key))
1038 #----------------------------------------------------------------------------------------------
1039 # Enables/disables a region by name
1040 def online(region_name, on_off)
1042 meta = @connection.getTable(org.apache.hadoop.hbase.TableName::META_TABLE_NAME)
1045 # FIXME: fail gracefully if can't find the region
1046 region_bytes = region_name.to_java_bytes
1047 g = org.apache.hadoop.hbase.client.Get.new(region_bytes)
1048 g.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY, org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER)
1049 hri_bytes = meta.get(g).value
1051 # Change region status
1052 hri = org.apache.hadoop.hbase.util.Writables.getWritable(hri_bytes, org.apache.hadoop.hbase.HRegionInfo.new)
1053 hri.setOffline(on_off)
1056 put = org.apache.hadoop.hbase.client.Put.new(region_bytes)
1057 put.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY,
1058 org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER,
1059 org.apache.hadoop.hbase.util.Writables.getBytes(hri))
1063 # Apply user metadata to table/column descriptor
1064 def set_user_metadata(descriptor, metadata)
1065 raise(ArgumentError, "#{METADATA} must be a Hash type") unless metadata.is_a?(Hash)
1066 for k, v in metadata
1067 v = v.to_s unless v.nil?
1068 descriptor.setValue(k, v)
1072 #----------------------------------------------------------------------------------------------
1073 # Take a snapshot of specified table
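# A minimal usage sketch (table and snapshot names are illustrative):
#   admin.snapshot('t1', 'snap_t1')
#   admin.snapshot('t1', 'snap_t1_noflush', { SKIP_FLUSH => true })  # skip the pre-snapshot flush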
1074 def snapshot(table, snapshot_name, *args)
1075 # Table name should be a string
1076 raise(ArgumentError, 'Table name must be of type String') unless table.is_a?(String)
1078 # Snapshot name should be a string
1079 raise(ArgumentError, 'Snapshot name must be of type String') unless
1080 snapshot_name.is_a?(String)
1082 table_name = TableName.valueOf(table)
1084 @admin.snapshot(snapshot_name, table_name)
1087 if arg[SKIP_FLUSH] == true
1088 @admin.snapshot(snapshot_name, table_name,
1089 org.apache.hadoop.hbase.client.SnapshotType::SKIPFLUSH)
1091 @admin.snapshot(snapshot_name, table_name)
1097 #----------------------------------------------------------------------------------------------
1098 # Restore specified snapshot
1099 def restore_snapshot(snapshot_name, restore_acl = false)
1100 conf = @connection.getConfiguration
1101 take_fail_safe_snapshot = conf.getBoolean('hbase.snapshot.restore.take.failsafe.snapshot', false)
1102 @admin.restoreSnapshot(snapshot_name, take_fail_safe_snapshot, restore_acl)
1105 #----------------------------------------------------------------------------------------------
1106 # Create a new table by cloning the snapshot content
1107 def clone_snapshot(snapshot_name, table, restore_acl = false)
1108 @admin.cloneSnapshot(snapshot_name, TableName.valueOf(table), restore_acl)
1111 #----------------------------------------------------------------------------------------------
1112 # Delete specified snapshot
1113 def delete_snapshot(snapshot_name)
1114 @admin.deleteSnapshot(snapshot_name)
1117 #----------------------------------------------------------------------------------------------
1118 # Deletes the snapshots matching the given regex
1119 def delete_all_snapshot(regex)
1120 @admin.deleteSnapshots(Pattern.compile(regex)).to_a
1123 #----------------------------------------------------------------------------------------------
1124 # Deletes the table snapshots matching the given regex
1125 def delete_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1126 @admin.deleteTableSnapshots(Pattern.compile(tableNameRegex),
1127 Pattern.compile(snapshotNameRegex)).to_a
1130 #----------------------------------------------------------------------------------------------
1131 # Returns a list of snapshots
1132 def list_snapshot(regex = '.*')
1133 @admin.listSnapshots(Pattern.compile(regex)).to_a
1136 #----------------------------------------------------------------------------------------------
1137 # Returns a list of table snapshots
1138 def list_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1139 @admin.listTableSnapshots(Pattern.compile(tableNameRegex),
1140 Pattern.compile(snapshotNameRegex)).to_a
1143 #----------------------------------------------------------------------------------------------
1144 # Returns the ClusterStatus of the cluster
1145 def getClusterStatus
1146 org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics)
1149 #----------------------------------------------------------------------------------------------
1150 # Returns a list of regionservers
1151 def getRegionServers
1152 org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics).getServers.map { |serverName| serverName }
1155 #----------------------------------------------------------------------------------------------
1156 # Returns servername corresponding to passed server_name_string
1157 def getServerName(server_name_string)
1158 regionservers = getRegionServers
1160 if ServerName.isFullServerName(server_name_string)
1161 return ServerName.valueOf(server_name_string)
1163 name_list = server_name_string.split(',')
1165 regionservers.each do|sn|
1166 if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s))
1175 #----------------------------------------------------------------------------------------------
1176 # Returns a list of servernames
1177 def getServerNames(servers, should_return_all_if_servers_empty)
1178 regionservers = getRegionServers
1182 # if no servers were specified as arguments, get a list of all servers
1183 if should_return_all_if_servers_empty
1184 servernames = regionservers
1187 # Strings replace with ServerName objects in servers array
1189 while i < servers.length
1192 if ServerName.isFullServerName(server)
1193 servernames.push(ServerName.valueOf(server))
1195 name_list = server.split(',')
1197 while j < regionservers.length
1198 sn = regionservers[j]
1199 if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s))
1200 servernames.push(sn)
1212 # Apply config specific to a table/column to its descriptor
1213 def set_descriptor_config(descriptor, config)
1214 raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.is_a?(Hash)
1216 v = v.to_s unless v.nil?
1217 descriptor.setConfiguration(k, v)
1221 #----------------------------------------------------------------------------------------------
1222 # Updates the configuration of one regionserver.
1223 def update_config(serverName)
1224 @admin.updateConfiguration(ServerName.valueOf(serverName))
1227 #----------------------------------------------------------------------------------------------
1228 # Updates the configuration of all the regionservers.
1229 def update_all_config
1230 @admin.updateConfiguration
1233 #----------------------------------------------------------------------------------------------
1234 # Returns namespace's structure description
1235 def describe_namespace(namespace_name)
1236 namespace = @admin.getNamespaceDescriptor(namespace_name)
1238 return namespace.to_s unless namespace.nil?
1240 raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
1243 #----------------------------------------------------------------------------------------------
1244 # Returns a list of namespaces in hbase
1245 def list_namespace(regex = '.*')
1246 pattern = java.util.regex.Pattern.compile(regex)
1247 list = @admin.listNamespaceDescriptors.map(&:getName)
1248 list.select { |s| pattern.match(s) }
1251 #----------------------------------------------------------------------------------------------
1252 # Returns a list of tables in namespace
1253 def list_namespace_tables(namespace_name)
1254 unless namespace_name.nil?
1255 return @admin.listTableNamesByNamespace(namespace_name).map(&:getQualifierAsString)
1258 raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
1261 #----------------------------------------------------------------------------------------------
1262 # Creates a namespace
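# A minimal usage sketch (the namespace name and property are illustrative):
#   admin.create_namespace('ns1')
#   admin.create_namespace('ns1', 'hbase.namespace.quota.maxtables' => '5')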
1263 def create_namespace(namespace_name, *args)
1264 # Fail if table name is not a string
1265 raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)
1267 # Flatten params array
1268 args = args.flatten.compact
1270 # Start defining the table
1271 nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(namespace_name)
1273 unless arg.is_a?(Hash)
1274 raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
1277 v = v.to_s unless v.nil?
1278 nsb.addConfiguration(k, v)
1281 @admin.createNamespace(nsb.build)
1284 #----------------------------------------------------------------------------------------------
1285 # modify a namespace
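# A minimal usage sketch (the namespace name and property are illustrative):
#   admin.alter_namespace('ns1', METHOD => 'set', 'hbase.namespace.quota.maxtables' => '8')
#   admin.alter_namespace('ns1', METHOD => 'unset', NAME => 'hbase.namespace.quota.maxtables')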
1286 def alter_namespace(namespace_name, *args)
1287 # Fail if table name is not a string
1288 raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)
1290 nsd = @admin.getNamespaceDescriptor(namespace_name)
1292 raise(ArgumentError, 'Namespace does not exist') unless nsd
1293 nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(nsd)
1295 # Flatten params array
1296 args = args.flatten.compact
1298 # Start defining the table
1300 unless arg.is_a?(Hash)
1301 raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
1303 method = arg[METHOD]
1304 if method == 'unset'
1305 nsb.removeConfiguration(arg[NAME])
1306 elsif method == 'set'
1309 v = v.to_s unless v.nil?
1311 nsb.addConfiguration(k, v)
1314 raise(ArgumentError, "Unknown method #{method}")
1317 @admin.modifyNamespace(nsb.build)
1320 #----------------------------------------------------------------------------------------------
1322 def drop_namespace(namespace_name)
1323 @admin.deleteNamespace(namespace_name)
1326 #----------------------------------------------------------------------------------------------
1327 # Get security capabilities
1328 def get_security_capabilities
1329 @admin.getSecurityCapabilities
1332 # List all procedures
1334 @admin.getProcedures
1342 # Parse arguments and update HTableDescriptor accordingly
1343 def update_htd_from_arg(htd, arg)
1344 htd.setOwnerString(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::OWNER)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::OWNER)
1345 htd.setMaxFileSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MAX_FILESIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MAX_FILESIZE)
1346 htd.setReadOnly(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::READONLY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::READONLY)
1347 htd.setCompactionEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::COMPACTION_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::COMPACTION_ENABLED)
1348 htd.setSplitEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_ENABLED)
1349 htd.setMergeEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MERGE_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MERGE_ENABLED)
1350 htd.setNormalizationEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZATION_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZATION_ENABLED)
1351 htd.setNormalizerTargetRegionCount(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_COUNT))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_COUNT)
1352 htd.setNormalizerTargetRegionSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_SIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_SIZE)
1353 htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MEMSTORE_FLUSHSIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MEMSTORE_FLUSHSIZE)
1354 htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::DURABILITY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::DURABILITY)
1355 htd.setPriority(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::PRIORITY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::PRIORITY)
1356 htd.setFlushPolicyClassName(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::FLUSH_POLICY)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::FLUSH_POLICY)
1357 htd.setRegionMemstoreReplication(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::REGION_MEMSTORE_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::REGION_MEMSTORE_REPLICATION)
1358 htd.setRegionSplitPolicyClassName(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_POLICY)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_POLICY)
1359 htd.setRegionReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::REGION_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::REGION_REPLICATION)
1360 set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
1361 set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
1364 #----------------------------------------------------------------------------------------------
1365 # clear compaction queues
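# A minimal usage sketch (the server name is illustrative):
#   admin.clear_compaction_queues('rs1.example.com,16020,1596021983001')          # clear both queues
#   admin.clear_compaction_queues('rs1.example.com,16020,1596021983001', 'long')  # clear one queue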
1366 def clear_compaction_queues(server_name, queue_name = nil)
1367 names = %w[long short]
1368 queues = java.util.HashSet.new
1372 elsif queue_name.is_a?(String)
1373 queues.add(queue_name)
1374 unless names.include?(queue_name)
1375 raise(ArgumentError, "Unknown queue name #{queue_name}")
1377 elsif queue_name.is_a?(Array)
1378 queue_name.each do |s|
1380 unless names.include?(s)
1381 raise(ArgumentError, "Unknown queue name #{s}")
1385 raise(ArgumentError, "Unknown queue name #{queue_name}")
1387 @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues)
1390 #----------------------------------------------------------------------------------------------
1391 # List dead region servers
1392 def list_deadservers
1393 @admin.listDeadServers.to_a
1396 #----------------------------------------------------------------------------------------------
1397 # clear dead region servers
1398 def clear_deadservers(dead_servers)
1399 # Flatten params array
1400 dead_servers = dead_servers.flatten.compact
1401 if dead_servers.empty?
1402 servers = list_deadservers
1404 servers = java.util.ArrayList.new
1405 dead_servers.each do |s|
1406 servers.add(ServerName.valueOf(s))
1409 @admin.clearDeadServers(servers).to_a
1412 #----------------------------------------------------------------------------------------------
1413 # List live region servers
1414 def list_liveservers
1415 org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics).getServers.to_a
1418 #---------------------------------------------------------------------------
1419 # Create a new table by cloning the existing table schema.
1420 def clone_table_schema(table_name, new_table_name, preserve_splits = true)
1421 @admin.cloneTableSchema(TableName.valueOf(table_name),
1422 TableName.valueOf(new_table_name),
1426 #----------------------------------------------------------------------------------------------
1427 # List decommissioned RegionServers
1428 def list_decommissioned_regionservers
1429 @admin.listDecommissionedRegionServers
1432 #----------------------------------------------------------------------------------------------
1433 # Decommission a list of region servers, optionally offload corresponding regions
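# A minimal usage sketch (host names are illustrative):
#   admin.decommission_regionservers(['rs1.example.com,16020,1596021983001'], true)
#   admin.decommission_regionservers('rs2.example.com', false)  # a bare hostname also resolves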
1434 def decommission_regionservers(host_or_servers, should_offload)
1435 # Fail if host_or_servers is neither a string nor an array
1436 unless host_or_servers.is_a?(Array) || host_or_servers.is_a?(String)
1437 raise(ArgumentError,
1438 "#{host_or_servers.class} of #{host_or_servers.inspect} is not of Array/String type")
1441 # Fail if should_offload is neither a TrueClass/FalseClass nor a string
1442 unless (!!should_offload == should_offload) || should_offload.is_a?(String)
1443 raise(ArgumentError, "#{should_offload} is not a boolean value")
1446 # If a string is passed, convert it to an array
1447 _host_or_servers = host_or_servers.is_a?(Array) ?
1448 host_or_servers :
1449 java.util.Arrays.asList(host_or_servers)
1451 # Retrieve the server names corresponding to passed _host_or_servers list
1452 server_names = getServerNames(_host_or_servers, false)
1454 # Fail, if we can not find any server(s) corresponding to the passed host_or_servers
1455 if server_names.empty?
1456 raise(ArgumentError,
1457 "Could not find any server(s) with specified name(s): #{host_or_servers}")
1460 @admin.decommissionRegionServers(server_names,
1461 java.lang.Boolean.valueOf(should_offload))
1464 #----------------------------------------------------------------------------------------------
1465 # Recommission a region server, optionally load a list of passed regions
1466 def recommission_regionserver(server_name_string, encoded_region_names)
1467 # Fail if server_name_string is not a string
1468 unless server_name_string.is_a?(String)
1469 raise(ArgumentError,
1470 "#{server_name_string.class} of #{server_name_string.inspect} is not of String type")
1473 # Fail if encoded_region_names is not an array
1474 unless encoded_region_names.is_a?(Array)
1475 raise(ArgumentError,
1476 "#{encoded_region_names.class} of #{encoded_region_names.inspect} is not of Array type")
1479 # Convert encoded_region_names from string to bytes (element-wise)
1480 region_names_in_bytes = encoded_region_names
1481 .map {|region_name| region_name.to_java_bytes}
1484 # Retrieve the server name corresponding to the passed server_name_string
1485 server_name = getServerName(server_name_string)
1487 # Fail if we can not find a server corresponding to the passed server_name_string
1489 raise(ArgumentError,
1490 "Could not find any server with name #{server_name_string}")
1493 @admin.recommissionRegionServer(server_name, region_names_in_bytes)
1496 #----------------------------------------------------------------------------------------------
1497 # Stop the active Master
1502 # Stop the given RegionServer
1503 def stop_regionserver(hostport)
1504 @admin.stopRegionServer(hostport)
1507 # rubocop:enable Metrics/ClassLength