3 # Licensed to the Apache Software Foundation (ASF) under one
4 # or more contributor license agreements. See the NOTICE file
5 # distributed with this work for additional information
6 # regarding copyright ownership. The ASF licenses this file
7 # to you under the Apache License, Version 2.0 (the
8 # "License"); you may not use this file except in compliance
9 # with the License. You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
21 java_import java.util.Arrays
22 java_import java.util.regex.Pattern
23 java_import org.apache.hadoop.hbase.util.Pair
24 java_import org.apache.hadoop.hbase.util.RegionSplitter
25 java_import org.apache.hadoop.hbase.util.Bytes
26 java_import org.apache.hadoop.hbase.ServerName
27 java_import org.apache.hadoop.hbase.TableName
29 # Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
32 # rubocop:disable Metrics/ClassLength
34 include HBaseConstants
36 def initialize(connection)
37 @connection = connection
39 @admin = @connection.getAdmin
40 @hbck = @connection.getHbck
41 @conf = @connection.getConfiguration
48 #----------------------------------------------------------------------------------------------
49 # Returns a list of tables in HBase
50 def list(regex = '.*')
51 @admin.listTableNames(Pattern.compile(regex)).map(&:getNameAsString)
54 #----------------------------------------------------------------------------------------------
55 # Requests a table or region or region server flush
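# Illustrative usage (a sketch; the table, region and server names below are hypothetical,
# assuming an instance of this wrapper is available as `admin`):
#   admin.flush('t1')                                      # flush a table
#   admin.flush('REGION_NAME')                             # flush a single region
#   admin.flush('host1.example.com,16020,1600000000000')   # flush a whole region server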
57 @admin.flushRegion(name.to_java_bytes)
58 rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
59 # Unknown region. Try table.
61 @admin.flush(TableName.valueOf(name))
62 rescue java.lang.IllegalArgumentException
63 # Unknown table. Try region server.
64 @admin.flushRegionServer(ServerName.valueOf(name))
68 #----------------------------------------------------------------------------------------------
69 # Requests a table or region or column family compaction
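# Illustrative usage (a sketch; names are hypothetical, `admin` is an instance of this wrapper):
#   admin.compact('t1')                  # compact all column families of table 't1'
#   admin.compact('t1', 'cf1')           # compact only column family 'cf1'
#   admin.compact('t1', 'cf1', 'MOB')    # compact the MOB store; type must be NORMAL or MOB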
70 def compact(table_or_region_name, family = nil, type = 'NORMAL')
72 family_bytes = family.to_java_bytes unless family.nil?
75 compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
77 compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
79 raise ArgumentError, 'only NORMAL or MOB accepted for type!'
84 @admin.compactRegion(table_or_region_name.to_java_bytes)
86 @admin.compactRegion(table_or_region_name.to_java_bytes, family_bytes)
88 rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
90 @admin.compact(TableName.valueOf(table_or_region_name), compact_type)
92 @admin.compact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
97 #----------------------------------------------------------------------------------------------
98 # Switch compaction on/off at runtime on a region server
99 def compaction_switch(on_or_off, regionserver_names)
100 region_servers = regionserver_names.flatten.compact
101 servers = java.util.ArrayList.new
102 if region_servers.any?
103 region_servers.each do |s|
107 @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
110 #----------------------------------------------------------------------------------------------
111 # Gets compaction state for specified table
112 def getCompactionState(table_name)
113 @admin.getCompactionState(TableName.valueOf(table_name)).name
116 # Requests to compact all regions on the regionserver
117 def compact_regionserver(servername, major = false)
119 @admin.majorCompactRegionServer(ServerName.valueOf(servername))
121 @admin.compactRegionServer(ServerName.valueOf(servername))
125 #----------------------------------------------------------------------------------------------
126 # Requests a table or region or column family major compaction
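# Illustrative usage (a sketch with hypothetical names):
#   admin.major_compact('t1')                 # major compact a table (or pass a region name)
#   admin.major_compact('t1', 'cf1')          # major compact a single column family
#   admin.major_compact('t1', 'cf1', 'MOB')   # major compact the MOB store of 'cf1'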
127 def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
129 family_bytes = family.to_java_bytes unless family.nil?
132 compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
134 compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
136 raise ArgumentError, 'only NORMAL or MOB accepted for type!'
141 @admin.majorCompactRegion(table_or_region_name.to_java_bytes)
143 @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
145 rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
147 @admin.majorCompact(TableName.valueOf(table_or_region_name), compact_type)
149 @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
154 #----------------------------------------------------------------------------------------------
155 # Requests a regionserver's WAL roll
156 def wal_roll(server_name)
157 @admin.rollWALWriter(ServerName.valueOf(server_name))
159 # TODO: remove older hlog_roll version
160 alias hlog_roll wal_roll
162 #----------------------------------------------------------------------------------------------
163 # Requests a table or region split
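# Illustrative usage (a sketch with hypothetical names):
#   admin.split('t1')               # request splits for a table (or pass a region name)
#   admin.split('t1', 'row_5000')   # split at an explicit split point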
164 def split(table_or_region_name, split_point = nil)
165 split_point_bytes = nil
166 split_point_bytes = split_point.to_java_bytes unless split_point.nil?
168 if split_point_bytes.nil?
169 org.apache.hadoop.hbase.util.FutureUtils.get(@admin.splitRegionAsync(table_or_region_name.to_java_bytes))
171 org.apache.hadoop.hbase.util.FutureUtils.get(@admin.splitRegionAsync(table_or_region_name.to_java_bytes, split_point_bytes))
173 rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
174 if split_point_bytes.nil?
175 @admin.split(TableName.valueOf(table_or_region_name))
177 @admin.split(TableName.valueOf(table_or_region_name), split_point_bytes)
182 #----------------------------------------------------------------------------------------------
183 # Enable/disable one split or merge switch
184 # Returns previous switch setting.
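# Illustrative usage (a sketch; the return value is the previous setting):
#   prev = admin.splitormerge_switch('SPLIT', false)   # disable region splits
#   prev = admin.splitormerge_switch('MERGE', true)    # enable region merges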
185 def splitormerge_switch(type, enabled)
187 @admin.splitSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
188 elsif type == 'MERGE'
189 @admin.mergeSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
191 raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
195 #----------------------------------------------------------------------------------------------
196 # Query the current state of the split or merge switch.
197 # Returns the switch's state (true is enabled).
198 def splitormerge_enabled(type)
200 @admin.isSplitEnabled
201 elsif type == 'MERGE'
202 @admin.isMergeEnabled
204 raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
208 def locate_region(table_name, row_key)
209 locator = @connection.getRegionLocator(TableName.valueOf(table_name))
211 return locator.getRegionLocation(Bytes.toBytesBinary(row_key))
217 #----------------------------------------------------------------------------------------------
218 # Requests a cluster balance
219 # Returns true if balancer ran
221 @admin.balance(java.lang.Boolean.valueOf(force))
224 #----------------------------------------------------------------------------------------------
225 # Enable/disable balancer
226 # Returns previous balancer switch setting.
227 def balance_switch(enableDisable)
228 @admin.balancerSwitch(
229 java.lang.Boolean.valueOf(enableDisable), java.lang.Boolean.valueOf(false)
233 #----------------------------------------------------------------------------------------------
234 # Query the current state of the LoadBalancer.
235 # Returns the balancer's state (true is enabled).
236 def balancer_enabled?
237 @admin.isBalancerEnabled
240 #----------------------------------------------------------------------------------------------
241 # Requests clear block cache for table
242 def clear_block_cache(table_name)
243 @admin.clearBlockCache(org.apache.hadoop.hbase.TableName.valueOf(table_name)).toString
246 #----------------------------------------------------------------------------------------------
247 # Requests region normalization for all configured tables in the cluster
248 # Returns true if normalizer ran successfully
253 #----------------------------------------------------------------------------------------------
254 # Enable/disable region normalizer
255 # Returns previous normalizer switch setting.
256 def normalizer_switch(enableDisable)
257 @admin.normalizerSwitch(java.lang.Boolean.valueOf(enableDisable))
260 #----------------------------------------------------------------------------------------------
261 # Query the current state of region normalizer.
262 # Returns the state of region normalizer (true is enabled).
263 def normalizer_enabled?
264 @admin.isNormalizerEnabled
267 #----------------------------------------------------------------------------------------------
268 # Query the current state of master in maintenance mode.
269 # Returns the state of maintenance mode (true is on).
270 def in_maintenance_mode?
271 @admin.isMasterInMaintenanceMode
274 #----------------------------------------------------------------------------------------------
275 # Request HBCK chore to run
280 #----------------------------------------------------------------------------------------------
281 # Request a scan of the catalog table (for garbage collection)
282 # Returns an int signifying the number of entries cleaned
283 def catalogjanitor_run
284 @admin.runCatalogJanitor
287 #----------------------------------------------------------------------------------------------
288 # Enable/disable the catalog janitor
289 # Returns previous catalog janitor switch setting.
290 def catalogjanitor_switch(enableDisable)
291 @admin.catalogJanitorSwitch(java.lang.Boolean.valueOf(enableDisable))
294 #----------------------------------------------------------------------------------------------
296 # Query the catalog janitor state (enabled or disabled)
296 # Returns catalog janitor state (true signifies enabled).
297 def catalogjanitor_enabled
298 @admin.isCatalogJanitorEnabled
301 #----------------------------------------------------------------------------------------------
302 # Request cleaner chore to run (for garbage collection of HFiles and WAL files)
303 def cleaner_chore_run
304 @admin.runCleanerChore
307 #----------------------------------------------------------------------------------------------
308 # Enable/disable the cleaner chore
309 # Returns previous cleaner switch setting.
310 def cleaner_chore_switch(enableDisable)
311 @admin.cleanerChoreSwitch(java.lang.Boolean.valueOf(enableDisable))
314 #----------------------------------------------------------------------------------------------
315 # Query the cleaner chore state (enabled or disabled)
316 # Returns cleaner state (true signifies enabled).
317 def cleaner_chore_enabled
318 @admin.isCleanerChoreEnabled
321 #----------------------------------------------------------------------------------------------
323 def enable(table_name)
324 tableExists(table_name)
325 return if enabled?(table_name)
326 @admin.enableTable(TableName.valueOf(table_name))
329 #----------------------------------------------------------------------------------------------
330 # Enables all tables matching the given regex
331 def enable_all(regex)
332 pattern = Pattern.compile(regex.to_s)
333 failed = java.util.ArrayList.new
334 @admin.listTableNames(pattern).each do |table_name|
336 @admin.enableTable(table_name)
337 rescue java.io.IOException => e
338 puts "table:#{table_name}, error:#{e.toString}"
339 failed.add(table_name)
345 #----------------------------------------------------------------------------------------------
347 def disable(table_name)
348 tableExists(table_name)
349 return if disabled?(table_name)
350 @admin.disableTable(TableName.valueOf(table_name))
353 #----------------------------------------------------------------------------------------------
354 # Disables all tables matching the given regex
355 def disable_all(regex)
356 pattern = Pattern.compile(regex.to_s)
357 failed = java.util.ArrayList.new
358 @admin.listTableNames(pattern).each do |table_name|
360 @admin.disableTable(table_name)
361 rescue java.io.IOException => e
362 puts "table:#{table_name}, error:#{e.toString}"
363 failed.add(table_name)
369 #---------------------------------------------------------------------------------------------
370 # Throw exception if table doesn't exist
371 def tableExists(table_name)
372 raise ArgumentError, "Table #{table_name} does not exist." unless exists?(table_name)
375 #----------------------------------------------------------------------------------------------
377 def disabled?(table_name)
378 @admin.isTableDisabled(TableName.valueOf(table_name))
381 #----------------------------------------------------------------------------------------------
384 tableExists(table_name)
385 raise ArgumentError, "Table #{table_name} is enabled. Disable it first." if enabled?(
389 @admin.deleteTable(org.apache.hadoop.hbase.TableName.valueOf(table_name))
392 #----------------------------------------------------------------------------------------------
395 pattern = Pattern.compile(regex.to_s)
396 failed = java.util.ArrayList.new
397 @admin.listTableNames(pattern).each do |table_name|
399 @admin.deleteTable(table_name)
400 rescue java.io.IOException => e
401 puts "table:#{table_name}, error:#{e.toString}"
402 failed.add(table_name)
408 #----------------------------------------------------------------------------------------------
409 # Returns ZooKeeper status dump
411 @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZKWatcher.new(
412 @admin.getConfiguration,
416 zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper
417 @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
418 org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper)
421 #----------------------------------------------------------------------------------------------
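# Illustrative usage (a sketch; table, family and property names are hypothetical examples):
#   admin.create('t1', 'cf1', 'cf2')                                   # simple column families
#   admin.create('t1', NAME => 'cf1', VERSIONS => 5)                   # column family with options
#   admin.create('t1', 'cf1', SPLITS => ['10', '20', '30'])            # pre-split at explicit keys
#   admin.create('t1', 'cf1', NUMREGIONS => 15, SPLITALGO => 'HexStringSplit')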
423 def create(table_name, *args)
424 # Fail if table name is not a string
425 raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
427 # Flatten params array
428 args = args.flatten.compact
431 # Start defining the table
432 htd = org.apache.hadoop.hbase.HTableDescriptor.new(org.apache.hadoop.hbase.TableName.valueOf(table_name))
434 # Args are either columns or splits, add them to the table definition
435 # TODO: add table options support
437 unless arg.is_a?(String) || arg.is_a?(Hash)
438 raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
441 # First, handle all the cases where arg is a column family.
442 if arg.is_a?(String) || arg.key?(NAME)
443 # If the arg is a string, default action is to add a column to the table.
444 # If arg has a name, it must also be a column descriptor.
445 descriptor = hcd(arg, htd)
446 # Warn if duplicate columns are added
447 if htd.hasFamily(descriptor.getName)
448 puts "Family '" + descriptor.getNameAsString + "' already exists, the old one will be replaced"
449 htd.modifyFamily(descriptor)
451 htd.addFamily(descriptor)
456 if arg.key?(REGION_REPLICATION)
457 region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION))
458 htd.setRegionReplication(region_replication)
461 # Get rid of the "METHOD", which is deprecated for create.
462 # We'll do whatever it used to do below if it's table_att.
463 if (method = arg.delete(METHOD))
464 raise(ArgumentError, 'table_att is currently the only supported method') unless method == 'table_att'
467 # The hash is not a column family. Figure out what's in it.
468 # First, handle splits.
469 if arg.key?(SPLITS_FILE)
470 splits_file = arg.delete(SPLITS_FILE)
471 unless File.exist?(splits_file)
472 raise(ArgumentError, "Splits file #{splits_file} doesn't exist")
475 File.foreach(splits_file) do |line|
476 arg[SPLITS].push(line.chomp)
478 htd.setValue(SPLITS_FILE, arg[SPLITS_FILE])
482 splits = Java::byte[][arg[SPLITS].size].new
484 arg.delete(SPLITS).each do |split|
485 splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split)
488 elsif arg.key?(NUMREGIONS) || arg.key?(SPLITALGO)
489 # Deprecated region pre-split API; if SPLITS or SPLITS_FILE was specified above, these options are ignored.
490 raise(ArgumentError, 'Number of regions must be specified') unless arg.key?(NUMREGIONS)
491 raise(ArgumentError, 'Split algorithm must be specified') unless arg.key?(SPLITALGO)
492 raise(ArgumentError, 'Number of regions must be greater than 1') unless arg[NUMREGIONS] > 1
493 num_regions = arg.delete(NUMREGIONS)
494 split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO))
495 splits = split_algo.split(JInteger.valueOf(num_regions))
498 # Done with splits; apply formerly-table_att parameters.
499 update_htd_from_arg(htd, arg)
501 arg.each_key do |ignored_key|
502 puts(format('An argument ignored (unknown or overridden): %s', ignored_key))
506 # Fail if no column families defined
507 raise(ArgumentError, 'Table must have at least one column family') unless has_columns
510 # Perform the create table call
511 @admin.createTable(htd)
513 # Perform the create table call
514 @admin.createTable(htd, splits)
518 #----------------------------------------------------------------------------------------------
519 #----------------------------------------------------------------------------------------------
521 def assign(region_name)
522 @admin.assign(region_name.to_java_bytes)
525 #----------------------------------------------------------------------------------------------
527 def unassign(region_name, force)
528 @admin.unassign(region_name.to_java_bytes, java.lang.Boolean.valueOf(force))
531 #----------------------------------------------------------------------------------------------
533 def move(encoded_region_name, server = nil)
534 @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil)
537 #----------------------------------------------------------------------------------------------
538 # Merge multiple regions
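# Illustrative usage (a sketch; the region names are hypothetical placeholders):
#   admin.merge_region(['ENCODED_REGION_A', 'ENCODED_REGION_B'], false)
#   admin.merge_region(['REGION_A', 'REGION_B', 'REGION_C'], true)   # force merge of non-adjacent regions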
539 def merge_region(regions, force)
540 unless regions.is_a?(Array)
541 raise(ArgumentError, "Type of #{regions.inspect} is #{regions.class}, but expected Array")
543 region_array = Java::byte[][regions.length].new
545 while i < regions.length
546 unless regions[i].is_a?(String)
549 "Type of #{regions[i].inspect} is #{regions[i].class}, but expected String"
552 region_array[i] = regions[i].to_java_bytes
555 org.apache.hadoop.hbase.util.FutureUtils.get(
556 @admin.mergeRegionsAsync(
558 java.lang.Boolean.valueOf(force)
563 #----------------------------------------------------------------------------------------------
564 # Returns table's structure description
565 def describe(table_name)
566 tableExists(table_name)
567 @admin.getDescriptor(TableName.valueOf(table_name)).to_s
570 def get_column_families(table_name)
571 tableExists(table_name)
572 @admin.getDescriptor(TableName.valueOf(table_name)).getColumnFamilies
575 def get_table_attributes(table_name)
576 tableExists(table_name)
577 @admin.getDescriptor(TableName.valueOf(table_name)).toStringTableAttributes
580 #----------------------------------------------------------------------------------------------
581 # Enable/disable snapshot auto-cleanup based on TTL expiration
582 # Returns previous snapshot auto-cleanup switch setting.
583 def snapshot_cleanup_switch(enable_disable)
584 @admin.snapshotCleanupSwitch(
585 java.lang.Boolean.valueOf(enable_disable), java.lang.Boolean.valueOf(false)
589 #----------------------------------------------------------------------------------------------
590 # Query the current state of the snapshot auto-cleanup based on TTL
591 # Returns the snapshot auto-cleanup state (true if enabled)
592 def snapshot_cleanup_enabled?
593 @admin.isSnapshotCleanupEnabled
596 #----------------------------------------------------------------------------------------------
597 # Truncates table (deletes all records by recreating the table)
598 def truncate(table_name_str)
599 puts "Truncating '#{table_name_str}' table (it may take a while):"
600 table_name = TableName.valueOf(table_name_str)
602 if enabled?(table_name_str)
603 puts 'Disabling table...'
604 disable(table_name_str)
607 puts 'Truncating table...'
608 @admin.truncateTable(table_name, false)
611 #----------------------------------------------------------------------------------------------
612 # Truncates table while maintaining region boundaries
613 # (deletes all records by recreating the table)
614 def truncate_preserve(table_name_str)
615 puts "Truncating '#{table_name_str}' table (it may take a while):"
616 table_name = TableName.valueOf(table_name_str)
618 if enabled?(table_name_str)
619 puts 'Disabling table...'
620 disable(table_name_str)
623 puts 'Truncating table...'
624 @admin.truncateTable(table_name, true)
627 #----------------------------------------------------------------------------------------------
628 # Check the status of alter command (number of regions reopened)
629 def alter_status(table_name)
630 # Table name should be a string
631 raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
634 raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
637 cluster_metrics = @admin.getClusterMetrics
638 table_region_status = cluster_metrics
639 .getTableRegionStatesCount
640 .get(org.apache.hadoop.hbase.TableName.valueOf(table_name))
641 if table_region_status.getTotalRegions != 0
642 updated_regions = table_region_status.getTotalRegions -
643 table_region_status.getRegionsInTransition -
644 table_region_status.getClosedRegions
645 puts "#{updated_regions}/#{table_region_status.getTotalRegions} regions updated."
647 puts 'All regions updated.'
650 end while !table_region_status.nil? && table_region_status.getRegionsInTransition != 0
654 #----------------------------------------------------------------------------------------------
655 # Change table structure or table options
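# Illustrative usage (a sketch; names and values are hypothetical examples):
#   admin.alter('t1', true, NAME => 'cf1', VERSIONS => 5)                        # add or modify a column family
#   admin.alter('t1', true, 'delete' => 'cf1')                                   # drop a column family
#   admin.alter('t1', true, METHOD => 'table_att_unset', NAME => 'MAX_FILESIZE')
#   admin.alter('t1', true, MAX_FILESIZE => '134217728')                         # set a table attribute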
656 def alter(table_name_str, wait = true, *args)
657 # Table name should be a string
658 raise(ArgumentError, 'Table name must be of type String') unless
659 table_name_str.is_a?(String)
662 raise(ArgumentError, "Can't find a table: #{table_name_str}") unless exists?(table_name_str)
664 # There should be at least one argument
665 raise(ArgumentError, 'There should be at least one argument besides the table name') if args.empty?
667 table_name = TableName.valueOf(table_name_str)
669 # Get table descriptor
670 htd = org.apache.hadoop.hbase.HTableDescriptor.new(@admin.getDescriptor(table_name))
671 hasTableUpdate = false
675 # Normalize args to support column name only alter specs
676 arg = { NAME => arg } if arg.is_a?(String)
678 # Normalize args to support shortcut delete syntax
679 arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete']
681 # There are 3 possible options.
682 # 1) Column family spec. Distinguished by having a NAME and no METHOD.
683 method = arg.delete(METHOD)
684 if method.nil? && arg.key?(NAME)
685 descriptor = hcd(arg, htd)
686 column_name = descriptor.getNameAsString
688 # If the column already exists, try to alter it. Create it otherwise.
689 if htd.hasFamily(column_name.to_java_bytes)
690 htd.modifyFamily(descriptor)
692 htd.addFamily(descriptor)
694 hasTableUpdate = true
698 # 2) Method other than table_att, with some args.
699 name = arg.delete(NAME)
700 if !method.nil? && method != 'table_att'
701 # Delete column family
702 if method == 'delete'
703 raise(ArgumentError, 'NAME parameter missing for delete method') unless name
704 htd.removeFamily(name.to_java_bytes)
705 hasTableUpdate = true
706 # Unset table attributes
707 elsif method == 'table_att_unset'
708 raise(ArgumentError, 'NAME parameter missing for table_att_unset method') unless name
711 if htd.getValue(key).nil?
712 raise ArgumentError, "Could not find attribute: #{key}"
717 if htd.getValue(name).nil?
718 raise ArgumentError, "Could not find attribute: #{name}"
722 hasTableUpdate = true
723 # Unset table configuration
724 elsif method == 'table_conf_unset'
725 raise(ArgumentError, 'NAME parameter missing for table_conf_unset method') unless name
728 if htd.getConfigurationValue(key).nil?
729 raise ArgumentError, "Could not find configuration: #{key}"
731 htd.removeConfiguration(key)
734 if htd.getConfigurationValue(name).nil?
735 raise ArgumentError, "Could not find configuration: #{name}"
737 htd.removeConfiguration(name)
739 hasTableUpdate = true
742 raise ArgumentError, "Unknown method: #{method}"
745 arg.each_key do |unknown_key|
746 puts(format('Unknown argument ignored: %s', unknown_key))
752 # 3) Some args for the table, optionally with METHOD => table_att (deprecated)
753 update_htd_from_arg(htd, arg)
755 # set a coprocessor attribute
756 valid_coproc_keys = []
757 next unless arg.is_a?(Hash)
758 arg.each do |key, value|
759 k = String.new(key) # prepare to strip
762 next unless k =~ /coprocessor/i
763 v = String.new(value)
765 # TODO: We should not require the user to configure the coprocessor using our internal format.
766 htd.addCoprocessorWithSpec(v)
767 valid_coproc_keys << key
770 valid_coproc_keys.each do |key|
774 hasTableUpdate = true
776 arg.each_key do |unknown_key|
777 puts(format('Unknown argument ignored: %s', unknown_key))
783 # Bulk apply all table modifications.
785 future = @admin.modifyTableAsync(htd)
788 puts 'Updating all regions with the new schema...'
794 def status(format, type)
795 cluster_metrics = @admin.getClusterMetrics
796 if format == 'detailed'
797 puts(format('version %s', cluster_metrics.getHBaseVersion))
798 # Print regions in transition first because the list is usually empty
799 puts(format('%d regionsInTransition', cluster_metrics.getRegionStatesInTransition.size))
800 for v in cluster_metrics.getRegionStatesInTransition
801 puts(format(' %s', v))
803 master = cluster_metrics.getMasterName
804 puts(format('active master: %s:%d %d', master.getHostname, master.getPort,
805 master.getStartcode))
806 puts(format('%d backup masters', cluster_metrics.getBackupMasterNames.size))
807 for server in cluster_metrics.getBackupMasterNames
808 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
811 master_coprocs = @admin.getMasterCoprocessorNames.toString
812 unless master_coprocs.nil?
813 puts(format('master coprocessors: %s', master_coprocs))
815 puts(format('%d live servers', cluster_metrics.getLiveServerMetrics.size))
816 for server in cluster_metrics.getLiveServerMetrics.keySet
817 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
818 puts(format(' %s', cluster_metrics.getLiveServerMetrics.get(server).toString))
819 for name, region in cluster_metrics.getLiveServerMetrics.get(server).getRegionMetrics
820 puts(format(' %s', region.getNameAsString.dump))
821 puts(format(' %s', region.toString))
824 puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size))
825 for server in cluster_metrics.getDeadServerNames
826 puts(format(' %s', server))
828 elsif format == 'replication'
829 puts(format('version %<version>s', version: cluster_metrics.getHBaseVersion))
830 puts(format('%<servers>d live servers',
831 servers: cluster_metrics.getLiveServerMetrics.size))
832 cluster_metrics.getLiveServerMetrics.keySet.each do |server_name|
833 sl = cluster_metrics.getLiveServerMetrics.get(server_name)
834 r_sink_string = ' SINK:'
835 r_source_string = ' SOURCE:'
836 r_load_sink = sl.getReplicationLoadSink
837 next if r_load_sink.nil?
839 r_sink_string << ' AgeOfLastAppliedOp=' +
840 r_load_sink.getAgeOfLastAppliedOp.to_s
841 r_sink_string << ', TimeStampsOfLastAppliedOp=' +
842 java.util.Date.new(r_load_sink
843 .getTimestampsOfLastAppliedOp).toString
844 r_load_source_map = sl.getReplicationLoadSourceMap
845 build_source_string(r_load_source_map, r_source_string)
846 puts(format(' %<host>s:', host: server_name.getHostname))
847 if type.casecmp('SOURCE').zero?
848 puts(format('%<source>s', source: r_source_string))
849 elsif type.casecmp('SINK').zero?
850 puts(format('%<sink>s', sink: r_sink_string))
852 puts(format('%<source>s', source: r_source_string))
853 puts(format('%<sink>s', sink: r_sink_string))
856 elsif format == 'simple'
859 master = cluster_metrics.getMasterName
860 puts(format('active master: %s:%d %d', master.getHostname, master.getPort,
861 master.getStartcode))
862 puts(format('%d backup masters', cluster_metrics.getBackupMasterNames.size))
863 for server in cluster_metrics.getBackupMasterNames
864 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
866 puts(format('%d live servers', cluster_metrics.getLiveServerMetrics.size))
867 for server in cluster_metrics.getLiveServerMetrics.keySet
868 puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
869 puts(format(' %s', cluster_metrics.getLiveServerMetrics.get(server).toString))
870 load += cluster_metrics.getLiveServerMetrics.get(server).getRequestCountPerSecond
871 regions += cluster_metrics.getLiveServerMetrics.get(server).getRegionMetrics.size
873 puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size))
874 for server in cluster_metrics.getDeadServerNames
875 puts(format(' %s', server))
877 puts(format('Aggregate load: %d, regions: %d', load, regions))
879 puts "1 active master, #{cluster_metrics.getBackupMasterNames.size} backup masters,
880 #{cluster_metrics.getLiveServerMetrics.size} servers,
881 #{cluster_metrics.getDeadServerNames.size} dead,
882 #{format('%.4f', cluster_metrics.getAverageLoad)} average load"
886 def build_source_string(r_load_source_map, r_source_string)
887 r_load_source_map.each do |peer, sources|
888 r_source_string << ' PeerID=' + peer
889 sources.each do |source_load|
890 build_queue_title(source_load, r_source_string)
891 build_running_source_stats(source_load, r_source_string)
896 def build_queue_title(source_load, r_source_string)
897 r_source_string << if source_load.isRecovered
898 "\n Recovered Queue: "
902 r_source_string << source_load.getQueueId
905 def build_running_source_stats(source_load, r_source_string)
906 if source_load.isRunning
907 build_shipped_stats(source_load, r_source_string)
908 build_load_general_stats(source_load, r_source_string)
909 r_source_string << ', Replication Lag=' +
910 source_load.getReplicationLag.to_s
912 r_source_string << "\n "
913 r_source_string << 'No Reader/Shipper threads running yet.'
917 def build_shipped_stats(source_load, r_source_string)
918 r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
920 'No Ops shipped since last restart'
922 "\n AgeOfLastShippedOp=" +
923 source_load.getAgeOfLastShippedOp.to_s +
924 ', TimeStampOfLastShippedOp=' +
925 java.util.Date.new(source_load
926 .getTimeStampOfLastShippedOp).toString
930 def build_load_general_stats(source_load, r_source_string)
931 r_source_string << ', SizeOfLogQueue=' +
932 source_load.getSizeOfLogQueue.to_s
933 r_source_string << ', EditsReadFromLogQueue=' +
934 source_load.getEditsRead.to_s
935 r_source_string << ', OpsShippedToTarget=' +
936 source_load.getOPsShipped.to_s
937 build_edits_for_source(source_load, r_source_string)
940 def build_edits_for_source(source_load, r_source_string)
941 if source_load.hasEditsSinceRestart
942 r_source_string << ', TimeStampOfNextToReplicate=' +
943 java.util.Date.new(source_load
944 .getTimeStampOfNextToReplicate).toString
946 r_source_string << ', No edits for this source'
947 r_source_string << ' since it started'
951 #----------------------------------------------------------------------------------------------
957 def exists?(table_name)
958 @admin.tableExists(TableName.valueOf(table_name))
961 #----------------------------------------------------------------------------------------------
963 def enabled?(table_name)
964 @admin.isTableEnabled(TableName.valueOf(table_name))
967 #----------------------------------------------------------------------------------------------
968 # Return a new HColumnDescriptor made of passed args
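# Illustrative argument shapes accepted here (a sketch; the family name and options are hypothetical):
#   hcd('cf1', htd)                                                   # bare family name
#   hcd({ NAME => 'cf1', VERSIONS => 3, BLOOMFILTER => 'ROW' }, htd)  # family descriptor options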
970 # String arg, single parameter constructor
971 return org.apache.hadoop.hbase.HColumnDescriptor.new(arg) if arg.is_a?(String)
973 raise(ArgumentError, "Column family #{arg} must have a name") unless name = arg.delete(NAME)
975 family = htd.getFamily(name.to_java_bytes)
976 # create it if it's a new family
977 family ||= org.apache.hadoop.hbase.HColumnDescriptor.new(name.to_java_bytes)
979 family.setBlockCacheEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE)
980 family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE)
981 family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE)
982 family.setCacheIndexesOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE)
983 family.setCacheBloomsOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE)
984 family.setEvictBlocksOnClose(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE)
985 family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
986 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION)
987 family.setInMemoryCompaction(
988 org.apache.hadoop.hbase.MemoryCompactionPolicy.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))
991 family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
992 family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING)
993 family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
994 family.setMaxVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
995 family.setMinVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
996 family.setKeepDeletedCells(org.apache.hadoop.hbase.KeepDeletedCells.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS).to_s.upcase)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
997 family.setCompressTags(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS)
998 family.setPrefetchBlocksOnOpen(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN)
999 family.setMobEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB)
1000 family.setMobThreshold(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD)
1001 family.setNewVersionBehavior(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR)
1002 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
1003 bloomtype = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER).upcase.to_sym
1004 if org.apache.hadoop.hbase.regionserver.BloomType.constants.include?(bloomtype)
1005 family.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(bloomtype))
1007 raise(ArgumentError, "BloomFilter type #{bloomtype} is not supported. Use one of " + org.apache.hadoop.hbase.regionserver.BloomType.constants.join(' '))
1010 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
1011 compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION).upcase.to_sym
1012 if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
1013 family.setCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
1015 raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
1018 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION)
1019 algorithm = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION).upcase
1020 family.setEncryptionType(algorithm)
1021 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
1022 key = org.apache.hadoop.hbase.io.crypto.Encryption.pbkdf128(
1023 arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
1025 family.setEncryptionKey(org.apache.hadoop.hbase.security.EncryptionUtil.wrapKey(@conf, key,
1029 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT)
1030 compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT).upcase.to_sym
1031 if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
1032 family.setCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
1034 raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
1037 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
1038 storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
1039 family.setStoragePolicy(storage_policy)
1041 if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY)
1042 mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase.to_sym
1043 if org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.include?(mob_partition_policy)
1044 family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy))
1046 raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.join(' '))
1050 set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
1051 set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
1052 if arg.include?(org.apache.hadoop.hbase
1053 .HColumnDescriptor::DFS_REPLICATION)
1054 family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase
1055 .HColumnDescriptor::DFS_REPLICATION)))
1058 arg.each_key do |unknown_key|
1059 puts(format('Unknown argument ignored for column family %s: %s', name, unknown_key))
1065 #----------------------------------------------------------------------------------------------
1066 # Enables/disables a region by name
1067 def online(region_name, on_off)
1069 meta = @connection.getTable(org.apache.hadoop.hbase.TableName::META_TABLE_NAME)
1072 # FIXME: fail gracefully if can't find the region
1073 region_bytes = region_name.to_java_bytes
1074 g = org.apache.hadoop.hbase.client.Get.new(region_bytes)
1075 g.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY, org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER)
1076 hri_bytes = meta.get(g).value
1078 # Change region status
1079 hri = org.apache.hadoop.hbase.util.Writables.getWritable(hri_bytes, org.apache.hadoop.hbase.HRegionInfo.new)
1080 hri.setOffline(on_off)
1083 put = org.apache.hadoop.hbase.client.Put.new(region_bytes)
1084 put.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY,
1085 org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER,
1086 org.apache.hadoop.hbase.util.Writables.getBytes(hri))
1090 # Apply user metadata to table/column descriptor
1091 def set_user_metadata(descriptor, metadata)
1092 raise(ArgumentError, "#{METADATA} must be a Hash type") unless metadata.is_a?(Hash)
1093 for k, v in metadata
1094 v = v.to_s unless v.nil?
1095 descriptor.setValue(k, v)
1099 #----------------------------------------------------------------------------------------------
1100 # Take a snapshot of specified table
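# Illustrative usage (a sketch; names are hypothetical; the TTL key is assumed to be read
# from the optional args hash handled below):
#   admin.snapshot('t1', 'snap1')
#   admin.snapshot('t1', 'snap1', SKIP_FLUSH => true)
#   admin.snapshot('t1', 'snap1', TTL => 86400)   # snapshot TTL in seconds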
1101 def snapshot(table, snapshot_name, *args)
1102 # Table name should be a string
1103 raise(ArgumentError, 'Table name must be of type String') unless table.is_a?(String)
1105 # Snapshot name should be a string
1106 raise(ArgumentError, 'Snapshot name must be of type String') unless
1107 snapshot_name.is_a?(String)
1109 table_name = TableName.valueOf(table)
1111 @admin.snapshot(snapshot_name, table_name)
1115 ttl = ttl ? ttl.to_java(:long) : -1
1116 snapshot_props = java.util.HashMap.new
1117 snapshot_props.put("TTL", ttl)
1118 if arg[SKIP_FLUSH] == true
1119 @admin.snapshot(snapshot_name, table_name,
1120 org.apache.hadoop.hbase.client.SnapshotType::SKIPFLUSH, snapshot_props)
1122 @admin.snapshot(snapshot_name, table_name, snapshot_props)
1128 #----------------------------------------------------------------------------------------------
1129 # Restore specified snapshot
1130 def restore_snapshot(snapshot_name, restore_acl = false)
1131 conf = @connection.getConfiguration
1132 take_fail_safe_snapshot = conf.getBoolean('hbase.snapshot.restore.take.failsafe.snapshot', false)
1133 @admin.restoreSnapshot(snapshot_name, take_fail_safe_snapshot, restore_acl)
1136 #----------------------------------------------------------------------------------------------
1137 # Create a new table by cloning the snapshot content
1138 def clone_snapshot(snapshot_name, table, restore_acl = false)
1139 @admin.cloneSnapshot(snapshot_name, TableName.valueOf(table), restore_acl)
1142 #----------------------------------------------------------------------------------------------
1143 # Delete specified snapshot
1144 def delete_snapshot(snapshot_name)
1145 @admin.deleteSnapshot(snapshot_name)
1148 #----------------------------------------------------------------------------------------------
1149 # Deletes the snapshots matching the given regex
1150 def delete_all_snapshot(regex)
1151 @admin.deleteSnapshots(Pattern.compile(regex)).to_a
1154 #----------------------------------------------------------------------------------------------
1155 # Deletes the table snapshots matching the given regex
1156 def delete_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1157 @admin.deleteTableSnapshots(Pattern.compile(tableNameRegex),
1158 Pattern.compile(snapshotNameRegex)).to_a
1161 #----------------------------------------------------------------------------------------------
1162 # Returns a list of snapshots
1163 def list_snapshot(regex = '.*')
1164 @admin.listSnapshots(Pattern.compile(regex)).to_a
1167 #----------------------------------------------------------------------------------------------
1168 # Returns a list of table snapshots
1169 def list_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1170 @admin.listTableSnapshots(Pattern.compile(tableNameRegex),
1171 Pattern.compile(snapshotNameRegex)).to_a
1174 #----------------------------------------------------------------------------------------------
1175 # Returns the whole ClusterMetrics containing details:
1179 # primary/backup master(s)
1180 # master's coprocessors
1181 # live/dead regionservers
1183 # regions in transition
1184 def getClusterMetrics
1185 @admin.getClusterMetrics
1188 #----------------------------------------------------------------------------------------------
1189 # Returns a list of regionservers
1190 def getRegionServers
1191 @admin.getClusterMetrics.getLiveServerMetrics.keySet.map { |server_name| server_name }
1194 #----------------------------------------------------------------------------------------------
1195 # Returns servername corresponding to passed server_name_string
1196 def getServerName(server_name_string)
1197 regionservers = getRegionServers
1199 if ServerName.isFullServerName(server_name_string)
1200 return ServerName.valueOf(server_name_string)
1202 name_list = server_name_string.split(',')
1204 regionservers.each do|sn|
1205 if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s))
1214 #----------------------------------------------------------------------------------------------
1215 # Returns a list of servernames
1216 def getServerNames(servers, should_return_all_if_servers_empty)
1217 regionservers = getRegionServers
1221 # if no servers were specified as arguments, get a list of all servers
1222 if should_return_all_if_servers_empty
1223 servernames = regionservers
1226 # Replace strings with ServerName objects in the servers array
1228 while i < servers.length
1231 if ServerName.isFullServerName(server)
1232 servernames.push(ServerName.valueOf(server))
1234 name_list = server.split(',')
1236 while j < regionservers.length
1237 sn = regionservers[j]
1238 if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s))
1239 servernames.push(sn)
1251 # Apply config specific to a table/column to its descriptor
1252 def set_descriptor_config(descriptor, config)
1253 raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.is_a?(Hash)
1255 v = v.to_s unless v.nil?
1256 descriptor.setConfiguration(k, v)
1260 #----------------------------------------------------------------------------------------------
1261 # Updates the configuration of one regionserver.
1262 def update_config(serverName)
1263 @admin.updateConfiguration(ServerName.valueOf(serverName))
1266 #----------------------------------------------------------------------------------------------
1267 # Updates the configuration of all the regionservers.
1268 def update_all_config
1269 @admin.updateConfiguration
1272 #----------------------------------------------------------------------------------------------
1273 # Returns namespace's structure description
1274 def describe_namespace(namespace_name)
1275 namespace = @admin.getNamespaceDescriptor(namespace_name)
1277 return namespace.to_s unless namespace.nil?
1279 raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
1282 #----------------------------------------------------------------------------------------------
1284 # Returns a list of namespaces in HBase
1284 def list_namespace(regex = '.*')
1285 pattern = java.util.regex.Pattern.compile(regex)
1286 list = @admin.listNamespaces
1287 list.select { |s| pattern.match(s) }
1290 #----------------------------------------------------------------------------------------------
1291 # Returns a list of tables in namespace
1292 def list_namespace_tables(namespace_name)
1293 unless namespace_name.nil?
1294 return @admin.listTableNamesByNamespace(namespace_name).map(&:getQualifierAsString)
1297 raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
1300 #----------------------------------------------------------------------------------------------
1301 # Creates a namespace
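# Illustrative usage (a sketch; the namespace and property names are hypothetical):
#   admin.create_namespace('ns1')
#   admin.create_namespace('ns1', 'hbase.namespace.quota.maxtables' => '10')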
1302 def create_namespace(namespace_name, *args)
1303 # Fail if namespace name is not a string
1304 raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)
1306 # Flatten params array
1307 args = args.flatten.compact
1309 # Start defining the namespace
1310 nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(namespace_name)
1312 unless arg.is_a?(Hash)
1313 raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
1316 v = v.to_s unless v.nil?
1317 nsb.addConfiguration(k, v)
1320 @admin.createNamespace(nsb.build)
1323 #----------------------------------------------------------------------------------------------
1324 # Modify a namespace
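# Illustrative usage (a sketch; the property name is a hypothetical example):
#   admin.alter_namespace('ns1', METHOD => 'set', 'PROPERTY_NAME' => 'VALUE')
#   admin.alter_namespace('ns1', METHOD => 'unset', NAME => 'PROPERTY_NAME')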
1325 def alter_namespace(namespace_name, *args)
1327 # Fail if namespace name is not a string
1327 raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)
1329 nsd = @admin.getNamespaceDescriptor(namespace_name)
1331 raise(ArgumentError, 'Namespace does not exist') unless nsd
1332 nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(nsd)
1334 # Flatten params array
1335 args = args.flatten.compact
1337 # Apply the arguments to the namespace descriptor
1339 unless arg.is_a?(Hash)
1340 raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
1342 method = arg[METHOD]
1343 if method == 'unset'
1344 nsb.removeConfiguration(arg[NAME])
1345 elsif method == 'set'
1348 v = v.to_s unless v.nil?
1350 nsb.addConfiguration(k, v)
1353 raise(ArgumentError, "Unknown method #{method}")
1356 @admin.modifyNamespace(nsb.build)
1359 #----------------------------------------------------------------------------------------------
1361 def drop_namespace(namespace_name)
1362 @admin.deleteNamespace(namespace_name)
1365 #----------------------------------------------------------------------------------------------
1366 # Get security capabilities
1367 def get_security_capabilities
1368 @admin.getSecurityCapabilities
1371 # List all procedures
1373 @admin.getProcedures
1381 # Parse arguments and update HTableDescriptor accordingly
1382 def update_htd_from_arg(htd, arg)
1383 htd.setOwnerString(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::OWNER)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::OWNER)
1384 htd.setMaxFileSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MAX_FILESIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MAX_FILESIZE)
1385 htd.setReadOnly(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::READONLY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::READONLY)
1386 htd.setCompactionEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::COMPACTION_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::COMPACTION_ENABLED)
1387 htd.setSplitEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_ENABLED)
1388 htd.setMergeEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MERGE_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MERGE_ENABLED)
1389 htd.setNormalizationEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZATION_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZATION_ENABLED)
1390 htd.setNormalizerTargetRegionCount(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_COUNT))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_COUNT)
1391 htd.setNormalizerTargetRegionSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_SIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_SIZE)
1392 htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MEMSTORE_FLUSHSIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MEMSTORE_FLUSHSIZE)
1393 htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::DURABILITY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::DURABILITY)
1394 htd.setPriority(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::PRIORITY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::PRIORITY)
1395 htd.setFlushPolicyClassName(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::FLUSH_POLICY)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::FLUSH_POLICY)
1396 htd.setRegionMemstoreReplication(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::REGION_MEMSTORE_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::REGION_MEMSTORE_REPLICATION)
1397 htd.setRegionSplitPolicyClassName(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_POLICY)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_POLICY)
1398 htd.setRegionReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::REGION_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::REGION_REPLICATION)
1399 set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
1400 set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
1403 #----------------------------------------------------------------------------------------------
1404 # clear compaction queues
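# Illustrative usage (a sketch; the server name is a hypothetical placeholder):
#   admin.clear_compaction_queues('host1.example.com,16020,1600000000000')
#   admin.clear_compaction_queues('host1.example.com,16020,1600000000000', 'long')      # only the 'long' queue
#   admin.clear_compaction_queues('host1.example.com,16020,1600000000000', %w[long short])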
1405 def clear_compaction_queues(server_name, queue_name = nil)
1406 names = %w[long short]
1407 queues = java.util.HashSet.new
1411 elsif queue_name.is_a?(String)
1412 queues.add(queue_name)
1413 unless names.include?(queue_name)
1414 raise(ArgumentError, "Unknown queue name #{queue_name}")
1416 elsif queue_name.is_a?(Array)
1417 queue_name.each do |s|
1419 unless names.include?(s)
1420 raise(ArgumentError, "Unknown queue name #{s}")
1424 raise(ArgumentError, "Unknown queue name #{queue_name}")
1426 @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues)
1429 #----------------------------------------------------------------------------------------------
1430 # List dead region servers
1431 def list_deadservers
1432 @admin.listDeadServers.to_a
1435 #----------------------------------------------------------------------------------------------
1436 # clear dead region servers
1437 def clear_deadservers(dead_servers)
1438 # Flatten params array
1439 dead_servers = dead_servers.flatten.compact
1440 if dead_servers.empty?
1441 servers = list_deadservers
1443 servers = java.util.ArrayList.new
1444 dead_servers.each do |s|
1445 servers.add(ServerName.valueOf(s))
1448 @admin.clearDeadServers(servers).to_a
1451 #----------------------------------------------------------------------------------------------
1452 # List live region servers
1453 def list_liveservers
1454 @admin.getClusterMetrics.getLiveServerMetrics.keySet.to_a
1457 #---------------------------------------------------------------------------
1458 # Create a new table by cloning the existing table schema.
1459 def clone_table_schema(table_name, new_table_name, preserve_splits = true)
1460 @admin.cloneTableSchema(TableName.valueOf(table_name),
1461 TableName.valueOf(new_table_name),
1465 #----------------------------------------------------------------------------------------------
1466 # List decommissioned RegionServers
1467 def list_decommissioned_regionservers
1468 @admin.listDecommissionedRegionServers
1471 #----------------------------------------------------------------------------------------------
1472 # Retrieve SlowLog Responses from RegionServers
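# Illustrative usage (a sketch; server and table names are hypothetical placeholders):
#   admin.get_slowlog_responses('*', {})                                              # all region servers
#   admin.get_slowlog_responses(['SERVER_NAME'], 'TABLE_NAME' => 't1', 'LIMIT' => 10)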
1473 def get_slowlog_responses(server_names, args)
1474 unless server_names.is_a?(Array) || server_names.is_a?(String)
1475 raise(ArgumentError,
1476 "#{server_names.class} of #{server_names.inspect} is not of Array/String type")
1478 if server_names == '*'
1479 server_names = getServerNames([], true)
1481 server_names_list = to_server_names(server_names)
1482 server_names = getServerNames(server_names_list, false)
1484 filter_params = get_filter_params(args)
1485 slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
1487 slow_log_responses_arr = []
1488 for slow_log_response in slow_log_responses
1489 slow_log_responses_arr << slow_log_response.toJsonPrettyPrint
1491 puts 'Retrieved SlowLog Responses from RegionServers'
1492 puts slow_log_responses_arr
1495 def get_filter_params(args)
1496 filter_params = org.apache.hadoop.hbase.client.SlowLogQueryFilter.new
1497 if args.key? 'REGION_NAME'
1498 region_name = args['REGION_NAME']
1499 filter_params.setRegionName(region_name)
1501 if args.key? 'TABLE_NAME'
1502 table_name = args['TABLE_NAME']
1503 filter_params.setTableName(table_name)
1505 if args.key? 'CLIENT_IP'
1506 client_ip = args['CLIENT_IP']
1507 filter_params.setClientAddress(client_ip)
1511 filter_params.setUserName(user)
1513 if args.key? 'LIMIT'
1514 limit = args['LIMIT']
1515 filter_params.setLimit(limit)
1520 #----------------------------------------------------------------------------------------------
1521 # Clears SlowLog Responses from RegionServers
1522 def clear_slowlog_responses(server_names)
1523 unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String)
1524 raise(ArgumentError,
1525 "#{server_names.class} of #{server_names.inspect} is not of correct type")
1527 if server_names.nil?
1528 server_names = getServerNames([], true)
1530 server_names_list = to_server_names(server_names)
1531 server_names = getServerNames(server_names_list, false)
1533 clear_log_responses = @admin.clearSlowLogResponses(java.util.HashSet.new(server_names))
1534 clear_log_success_count = 0
1535 clear_log_responses.each do |response|
1537 clear_log_success_count += 1
1540 puts 'Cleared Slowlog responses from ' \
1541 "#{clear_log_success_count}/#{clear_log_responses.size} RegionServers"
1544 #----------------------------------------------------------------------------------------------
1545 # Decommission a list of region servers, optionally offload corresponding regions
1546 def decommission_regionservers(host_or_servers, should_offload)
1547 # Fail if host_or_servers is neither a string nor an array
1548 unless host_or_servers.is_a?(Array) || host_or_servers.is_a?(String)
1549 raise(ArgumentError,
1550 "#{host_or_servers.class} of #{host_or_servers.inspect} is not of Array/String type")
1553 # Fail if should_offload is neither a TrueClass/FalseClass nor a string
1554 unless (!!should_offload == should_offload) || should_offload.is_a?(String)
1555 raise(ArgumentError, "#{should_offload} is not a boolean value")
1558 # If a string is passed, convert it to an array
1559 _host_or_servers = host_or_servers.is_a?(Array) ?
1561 java.util.Arrays.asList(host_or_servers)
1563 # Retrieve the server names corresponding to passed _host_or_servers list
1564 server_names = getServerNames(_host_or_servers, false)
1566 # Fail, if we can not find any server(s) corresponding to the passed host_or_servers
1567 if server_names.empty?
1568 raise(ArgumentError,
1569 "Could not find any server(s) with specified name(s): #{host_or_servers}")
1572 @admin.decommissionRegionServers(server_names,
1573 java.lang.Boolean.valueOf(should_offload))
1576 #----------------------------------------------------------------------------------------------
1577 # Recommission a region server, optionally load a list of passed regions
1578 def recommission_regionserver(server_name_string, encoded_region_names)
1579 # Fail if server_name_string is not a string
1580 unless server_name_string.is_a?(String)
1581 raise(ArgumentError,
1582 "#{server_name_string.class} of #{server_name_string.inspect} is not of String type")
1585 # Fail if encoded_region_names is not an array
1586 unless encoded_region_names.is_a?(Array)
1587 raise(ArgumentError,
1588 "#{encoded_region_names.class} of #{encoded_region_names.inspect} is not of Array type")
1591 # Convert encoded_region_names from string to bytes (element-wise)
1592 region_names_in_bytes = encoded_region_names
1593 .map {|region_name| region_name.to_java_bytes}
1596 # Retrieve the server name corresponding to the passed server_name_string
1597 server_name = getServerName(server_name_string)
1599 # Fail if we can not find a server corresponding to the passed server_name_string
1601 raise(ArgumentError,
1602 "Could not find any server with name #{server_name_string}")
1605 @admin.recommissionRegionServer(server_name, region_names_in_bytes)
1608 #----------------------------------------------------------------------------------------------
1609 # Stop the active Master
1614 # Stop the given RegionServer
1615 def stop_regionserver(hostport)
1616 @admin.stopRegionServer(hostport)
1619 #----------------------------------------------------------------------------------------------
1620 # Get list of server names
1621 def to_server_names(server_names)
1622 if server_names.is_a?(Array)
1625 java.util.Arrays.asList(server_names)
1629 # rubocop:enable Metrics/ClassLength