HBASE-22002 Remove the deprecated methods in Admin interface
hbase-shell/src/main/ruby/hbase/admin.rb (blob 31545b5acc15ff71594e9bdcab41bd2c2e760ab0)
3 # Licensed to the Apache Software Foundation (ASF) under one
4 # or more contributor license agreements.  See the NOTICE file
5 # distributed with this work for additional information
6 # regarding copyright ownership.  The ASF licenses this file
7 # to you under the Apache License, Version 2.0 (the
8 # "License"); you may not use this file except in compliance
9 # with the License.  You may obtain a copy of the License at
11 #     http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
20 include Java
21 java_import java.util.Arrays
22 java_import java.util.regex.Pattern
23 java_import org.apache.hadoop.hbase.util.Pair
24 java_import org.apache.hadoop.hbase.util.RegionSplitter
25 java_import org.apache.hadoop.hbase.util.Bytes
26 java_import org.apache.hadoop.hbase.ServerName
27 java_import org.apache.hadoop.hbase.TableName
29 # Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
31 module Hbase
32   # rubocop:disable Metrics/ClassLength
33   class Admin
34     include HBaseConstants
36     def initialize(connection)
37       @connection = connection
38       # Java Admin instance
39       @admin = @connection.getAdmin
40       @conf = @connection.getConfiguration
41     end
43     def close
44       @admin.close
45     end
47     #----------------------------------------------------------------------------------------------
48     # Returns a list of tables in hbase
49     def list(regex = '.*')
50       @admin.listTableNames(Pattern.compile(regex)).map(&:getNameAsString)
51     end
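    # Illustrative usage (a sketch; table names are hypothetical, `admin` being an instance of this class):
    #   admin.list                # every user table
    #   admin.list('ns1:.*')      # only tables whose full name matches the regex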
53     #----------------------------------------------------------------------------------------------
54     # Requests a table, region, or region server flush
55     def flush(name)
56       @admin.flushRegion(name.to_java_bytes)
57     rescue java.lang.IllegalArgumentException
58       # Unknown region. Try table.
59       begin
60         @admin.flush(TableName.valueOf(name))
61       rescue java.lang.IllegalArgumentException
62         # Unknown table. Try region server.
63         @admin.flushRegionServer(ServerName.valueOf(name))
64       end
65     end
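    # Illustrative usage, following the fallback order above (region first, then table,
    # then region server); all names are hypothetical:
    #   admin.flush('t1,,1540000000000.0123456789abcdef0123456789abcdef.')  # a single region
    #   admin.flush('t1')                                                   # a whole table
    #   admin.flush('rs1.example.com,16020,1540000000000')                  # a region server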
67     #----------------------------------------------------------------------------------------------
68     # Requests a table, region, or column family compaction
69     def compact(table_or_region_name, family = nil, type = 'NORMAL')
70       family_bytes = nil
71       family_bytes = family.to_java_bytes unless family.nil?
72       compact_type = nil
73       if type == 'NORMAL'
74         compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
75       elsif type == 'MOB'
76         compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
77       else
78         raise ArgumentError, 'only NORMAL or MOB accepted for type!'
79       end
81       begin
82         @admin.compactRegion(table_or_region_name.to_java_bytes, family_bytes)
83       rescue java.lang.IllegalArgumentException => e
84         @admin.compact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
85       end
86     end
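    # Illustrative usage (hypothetical names); the 'MOB' type only applies to tables
    # with MOB-enabled column families:
    #   admin.compact('t1')               # compact a whole table (or a region, by region name)
    #   admin.compact('t1', 'cf1')        # compact a single column family
    #   admin.compact('t1', nil, 'MOB')   # MOB compaction of a table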
88     #----------------------------------------------------------------------------------------------
89     # Switch compaction on/off at runtime on a region server
90     def compaction_switch(on_or_off, regionserver_names)
91       region_servers = regionserver_names.flatten.compact
92       servers = java.util.ArrayList.new
93       if region_servers.any?
94         region_servers.each do |s|
95           servers.add(s)
96         end
97       end
98       @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
99     end
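    # Illustrative usage: pass an explicit server list, or an empty list to apply the
    # switch to every region server (server name below is hypothetical):
    #   admin.compaction_switch(false, ['rs1.example.com,16020,1540000000000'])
    #   admin.compaction_switch(true, [])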
101     #----------------------------------------------------------------------------------------------
102     # Gets compaction state for specified table
103     def getCompactionState(table_name)
104       @admin.getCompactionState(TableName.valueOf(table_name)).name
105     end
107     # Requests to compact all regions on the regionserver
108     def compact_regionserver(servername, major = false)
109       @admin.compactRegionServer(ServerName.valueOf(servername), major)
110     end
112     #----------------------------------------------------------------------------------------------
113     # Requests a table, region, or column family major compaction
114     def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
115       family_bytes = nil
116       family_bytes = family.to_java_bytes unless family.nil?
117       compact_type = nil
118       if type == 'NORMAL'
119         compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
120       elsif type == 'MOB'
121         compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
122       else
123         raise ArgumentError, 'only NORMAL or MOB accepted for type!'
124       end
126       begin
127         @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
128       rescue java.lang.IllegalArgumentException => e
129         @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
130       end
131     end
133     #----------------------------------------------------------------------------------------------
134     # Requests a regionserver's WAL roll
135     def wal_roll(server_name)
136       @admin.rollWALWriter(ServerName.valueOf(server_name))
137     end
138     # TODO: remove older hlog_roll version
139     alias hlog_roll wal_roll
141     #----------------------------------------------------------------------------------------------
142     # Requests a table or region split
143     def split(table_or_region_name, split_point = nil)
144       split_point_bytes = nil
145       split_point_bytes = split_point.to_java_bytes unless split_point.nil?
146       begin
147         @admin.splitRegionAsync(table_or_region_name.to_java_bytes, split_point_bytes).get
148       rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException => e
149         @admin.split(TableName.valueOf(table_or_region_name), split_point_bytes)
150       end
151     end
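    # Illustrative usage (hypothetical names):
    #   admin.split('t1')             # request a split of every region of the table
    #   admin.split('t1', 'row5000')  # split the table (or a region, by region name) at a split point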
153     #----------------------------------------------------------------------------------------------
154     # Enable/disable one split or merge switch
155     # Returns previous switch setting.
156     def splitormerge_switch(type, enabled)
157       if type == 'SPLIT'
158         @admin.splitSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
159       elsif type == 'MERGE'
160         @admin.mergeSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
161       else
162         raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
163       end
164     end
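    # Illustrative usage (returns the previous switch state):
    #   admin.splitormerge_switch('SPLIT', false)  # disable region splits cluster-wide
    #   admin.splitormerge_switch('MERGE', true)   # re-enable region merges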
166     #----------------------------------------------------------------------------------------------
167     # Query the current state of the split or merge switch.
168     # Returns the switch's state (true is enabled).
169     def splitormerge_enabled(type)
170       if type == 'SPLIT'
171         @admin.isSplitEnabled
172       elsif type == 'MERGE'
173         @admin.isMergeEnabled
174       else
175         raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
176       end
177     end
179     def locate_region(table_name, row_key)
180       locator = @connection.getRegionLocator(TableName.valueOf(table_name))
181       begin
182         return locator.getRegionLocation(Bytes.toBytesBinary(row_key))
183       ensure
184         locator.close
185       end
186     end
188     #----------------------------------------------------------------------------------------------
189     # Requests a cluster balance
190     # Returns true if balancer ran
191     def balancer(force)
192       @admin.balancer(java.lang.Boolean.valueOf(force))
193     end
195     #----------------------------------------------------------------------------------------------
196     # Enable/disable balancer
197     # Returns previous balancer switch setting.
198     def balance_switch(enableDisable)
199       @admin.balancerSwitch(
200         java.lang.Boolean.valueOf(enableDisable), java.lang.Boolean.valueOf(false)
201       )
202     end
204     #----------------------------------------------------------------------------------------------
205     # Query the current state of the LoadBalancer.
206     # Returns the balancer's state (true is enabled).
207     def balancer_enabled?
208       @admin.isBalancerEnabled
209     end
211     #----------------------------------------------------------------------------------------------
212     # Requests clear block cache for table
213     def clear_block_cache(table_name)
214       @admin.clearBlockCache(org.apache.hadoop.hbase.TableName.valueOf(table_name)).toString
215     end
217     #----------------------------------------------------------------------------------------------
218     # Requests region normalization for all configured tables in the cluster
219     # Returns true if normalizer ran successfully
220     def normalize
221       @admin.normalize
222     end
224     #----------------------------------------------------------------------------------------------
225     # Enable/disable region normalizer
226     # Returns previous normalizer switch setting.
227     def normalizer_switch(enableDisable)
228       @admin.normalizerSwitch(java.lang.Boolean.valueOf(enableDisable))
229     end
231     #----------------------------------------------------------------------------------------------
232     # Query the current state of region normalizer.
233     # Returns the state of region normalizer (true is enabled).
234     def normalizer_enabled?
235       @admin.isNormalizerEnabled
236     end
238     #----------------------------------------------------------------------------------------------
239     # Query the current state of master in maintenance mode.
240     # Returns the state of maintenance mode (true is on).
241     def in_maintenance_mode?
242       @admin.isMasterInMaintenanceMode
243     end
245     #----------------------------------------------------------------------------------------------
246     # Request a scan of the catalog table (for garbage collection)
247     # Returns an int signifying the number of entries cleaned
248     def catalogjanitor_run
249       @admin.runCatalogJanitor
250     end
252     #----------------------------------------------------------------------------------------------
253     # Enable/disable the catalog janitor
254     # Returns previous catalog janitor switch setting.
255     def catalogjanitor_switch(enableDisable)
256       @admin.catalogJanitorSwitch(java.lang.Boolean.valueOf(enableDisable))
257     end
259     #----------------------------------------------------------------------------------------------
260     # Query on the catalog janitor state (enabled/disabled?)
261     # Returns catalog janitor state (true signifies enabled).
262     def catalogjanitor_enabled
263       @admin.isCatalogJanitorEnabled
264     end
266     #----------------------------------------------------------------------------------------------
267     # Request cleaner chore to run (for garbage collection of HFiles and WAL files)
268     def cleaner_chore_run
269       @admin.runCleanerChore
270     end
272     #----------------------------------------------------------------------------------------------
273     # Enable/disable the cleaner chore
274     # Returns previous cleaner switch setting.
275     def cleaner_chore_switch(enableDisable)
276       @admin.cleanerChoreSwitch(java.lang.Boolean.valueOf(enableDisable))
277     end
279     #----------------------------------------------------------------------------------------------
280     # Query on the cleaner chore state (enabled/disabled?)
281     # Returns cleaner state (true signifies enabled).
282     def cleaner_chore_enabled
283       @admin.isCleanerChoreEnabled
284     end
286     #----------------------------------------------------------------------------------------------
287     # Enables a table
288     def enable(table_name)
289       tableExists(table_name)
290       return if enabled?(table_name)
291       @admin.enableTable(TableName.valueOf(table_name))
292     end
294     #----------------------------------------------------------------------------------------------
295     # Enables all tables matching the given regex
296     def enable_all(regex)
297       pattern = Pattern.compile(regex.to_s)
298       failed = java.util.ArrayList.new
299       @admin.listTableNames(pattern).each do |table_name|
300         begin
301           @admin.enableTable(table_name)
302         rescue java.io.IOException => e
303           puts "table:#{table_name}, error:#{e.toString}"
304           failed.add(table_name)
305         end
306       end
307       failed
308     end
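    # Illustrative usage (hypothetical pattern): enable every table in the 'ns1'
    # namespace and collect the names of tables that could not be enabled:
    #   failed = admin.enable_all('ns1:.*')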
310     #----------------------------------------------------------------------------------------------
311     # Disables a table
312     def disable(table_name)
313       tableExists(table_name)
314       return if disabled?(table_name)
315       @admin.disableTable(TableName.valueOf(table_name))
316     end
318     #----------------------------------------------------------------------------------------------
319     # Disables all tables matching the given regex
320     def disable_all(regex)
321       pattern = Pattern.compile(regex.to_s)
322       failed = java.util.ArrayList.new
323       @admin.listTableNames(pattern).each do |table_name|
324         begin
325           @admin.disableTable(table_name)
326         rescue java.io.IOException => e
327           puts "table:#{table_name}, error:#{e.toString}"
328           failed.add(table_name)
329         end
330       end
331       failed
332     end
334     #---------------------------------------------------------------------------------------------
335     # Throw exception if table doesn't exist
336     def tableExists(table_name)
337       raise ArgumentError, "Table #{table_name} does not exist." unless exists?(table_name)
338     end
340     #----------------------------------------------------------------------------------------------
341     # Is table disabled?
342     def disabled?(table_name)
343       @admin.isTableDisabled(TableName.valueOf(table_name))
344     end
346     #----------------------------------------------------------------------------------------------
347     # Drops a table
348     def drop(table_name)
349       tableExists(table_name)
350       raise ArgumentError, "Table #{table_name} is enabled. Disable it first." if enabled?(
351         table_name
352       )
354       @admin.deleteTable(org.apache.hadoop.hbase.TableName.valueOf(table_name))
355     end
357     #----------------------------------------------------------------------------------------------
358     # Drops all tables matching the given regex
359     def drop_all(regex)
360       pattern = Pattern.compile(regex.to_s)
361       failed = java.util.ArrayList.new
362       @admin.listTableNames(pattern).each do |table_name|
363         begin
364           @admin.deleteTable(table_name)
365         rescue java.io.IOException => e
366           puts "table:#{table_name}, error:#{e.toString}"
367           failed.add(table_name)
368         end
369       end
370       failed
371     end
373     #----------------------------------------------------------------------------------------------
374     # Returns ZooKeeper status dump
375     def zk_dump
376       @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZKWatcher.new(
377         @admin.getConfiguration,
378         'admin',
379         nil
380       )
381       zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper
382       @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
383       org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper)
384     end
386     #----------------------------------------------------------------------------------------------
387     # Creates a table
388     def create(table_name, *args)
389       # Fail if table name is not a string
390       raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
392       # Flatten params array
393       args = args.flatten.compact
394       has_columns = false
396       # Start defining the table
397       htd = org.apache.hadoop.hbase.HTableDescriptor.new(org.apache.hadoop.hbase.TableName.valueOf(table_name))
398       splits = nil
399       # Args are either columns or splits, add them to the table definition
400       # TODO: add table options support
401       args.each do |arg|
402         unless arg.is_a?(String) || arg.is_a?(Hash)
403           raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
404         end
406         # First, handle all the cases where arg is a column family.
407         if arg.is_a?(String) || arg.key?(NAME)
408           # If the arg is a string, default action is to add a column to the table.
409           # If arg has a name, it must also be a column descriptor.
410           descriptor = hcd(arg, htd)
411           # Warn if duplicate columns are added
412           if htd.hasFamily(descriptor.getName)
413             puts "Family '" + descriptor.getNameAsString + "' already exists, the old one will be replaced"
414             htd.modifyFamily(descriptor)
415           else
416             htd.addFamily(descriptor)
417           end
418           has_columns = true
419           next
420         end
421         if arg.key?(REGION_REPLICATION)
422           region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION))
423           htd.setRegionReplication(region_replication)
424         end
426         # Get rid of the "METHOD", which is deprecated for create.
427         # We'll do whatever it used to do below if it's table_att.
428         if (method = arg.delete(METHOD))
429           raise(ArgumentError, 'table_att is currently the only supported method') unless method == 'table_att'
430         end
432         # The hash is not a column family. Figure out what's in it.
433         # First, handle splits.
434         if arg.key?(SPLITS_FILE)
435           splits_file = arg.delete(SPLITS_FILE)
436           unless File.exist?(splits_file)
437             raise(ArgumentError, "Splits file #{splits_file} doesn't exist")
438           end
439           arg[SPLITS] = []
440           File.foreach(splits_file) do |line|
441             arg[SPLITS].push(line.chomp)
442           end
443           htd.setValue(SPLITS_FILE, arg[SPLITS_FILE])
444         end
446         if arg.key?(SPLITS)
447           splits = Java::byte[][arg[SPLITS].size].new
448           idx = 0
449           arg.delete(SPLITS).each do |split|
450             splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split)
451             idx += 1
452           end
453         elsif arg.key?(NUMREGIONS) || arg.key?(SPLITALGO)
454           # Deprecated region pre-split API; ignored if SPLITS was specified above.
455           raise(ArgumentError, 'Number of regions must be specified') unless arg.key?(NUMREGIONS)
456           raise(ArgumentError, 'Split algorithm must be specified') unless arg.key?(SPLITALGO)
457           raise(ArgumentError, 'Number of regions must be greater than 1') unless arg[NUMREGIONS] > 1
458           num_regions = arg.delete(NUMREGIONS)
459           split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO))
460           splits = split_algo.split(JInteger.valueOf(num_regions))
461         end
463         # Done with splits; apply formerly-table_att parameters.
464         update_htd_from_arg(htd, arg)
466         arg.each_key do |ignored_key|
467           puts(format('An argument ignored (unknown or overridden): %s', ignored_key))
468         end
469       end
471       # Fail if no column families defined
472       raise(ArgumentError, 'Table must have at least one column family') unless has_columns
474       if splits.nil?
475         # Perform the create table call
476         @admin.createTable(htd)
477       else
478         # Perform the create table call
479         @admin.createTable(htd, splits)
480       end
481     end
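    # A minimal usage sketch (hypothetical names); families may be plain strings or
    # hashes, and split options can be mixed in:
    #   admin.create('t1', 'cf1')                                       # one family, defaults
    #   admin.create('t1', NAME => 'cf1', VERSIONS => 5)                # family with options
    #   admin.create('t1', 'cf1', SPLITS => ['10', '20', '30'])         # explicit split points
    #   admin.create('t1', 'cf1', NUMREGIONS => 16, SPLITALGO => 'HexStringSplit')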
483     #----------------------------------------------------------------------------------------------
484     #----------------------------------------------------------------------------------------------
485     # Assign a region
486     def assign(region_name)
487       @admin.assign(region_name.to_java_bytes)
488     end
490     #----------------------------------------------------------------------------------------------
491     # Unassign a region
492     def unassign(region_name, force)
493       @admin.unassign(region_name.to_java_bytes, java.lang.Boolean.valueOf(force))
494     end
496     #----------------------------------------------------------------------------------------------
497     # Move a region
498     def move(encoded_region_name, server = nil)
499       @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil)
500     end
502     #----------------------------------------------------------------------------------------------
503     # Merge two regions
504     def merge_region(region_a_name, region_b_name, force)
505       @admin.mergeRegions(region_a_name.to_java_bytes,
506                           region_b_name.to_java_bytes,
507                           java.lang.Boolean.valueOf(force))
508     end
510     #----------------------------------------------------------------------------------------------
511     # Returns table's structure description
512     def describe(table_name)
513       tableExists(table_name)
514       @admin.getDescriptor(TableName.valueOf(table_name)).to_s
515     end
517     def get_column_families(table_name)
518       tableExists(table_name)
519       @admin.getDescriptor(TableName.valueOf(table_name)).getColumnFamilies
520     end
522     def get_table_attributes(table_name)
523       tableExists(table_name)
524       @admin.getDescriptor(TableName.valueOf(table_name)).toStringTableAttributes
525     end
527     #----------------------------------------------------------------------------------------------
528     # Truncates table (deletes all records by recreating the table)
529     def truncate(table_name_str)
530       puts "Truncating '#{table_name_str}' table (it may take a while):"
531       table_name = TableName.valueOf(table_name_str)
532       table_description = @admin.getDescriptor(table_name)
533       raise ArgumentError, "Table #{table_name_str} is not enabled. Enable it first." unless
534           enabled?(table_name_str)
535       puts 'Disabling table...'
536       @admin.disableTable(table_name)
538       begin
539         puts 'Truncating table...'
540         @admin.truncateTable(table_name, false)
541       rescue => e
542         # Handle the compatibility case, where the truncate method doesn't exist on the Master
543         raise e unless e.respond_to?(:cause) && !e.cause.nil?
544         rootCause = e.cause
545         if rootCause.is_a?(org.apache.hadoop.hbase.DoNotRetryIOException)
546           # Handle the compatibility case, where the truncate method doesn't exist on the Master
547           puts 'Dropping table...'
548           @admin.deleteTable(table_name)
550           puts 'Creating table...'
551           @admin.createTable(table_description)
552         else
553           raise e
554         end
555       end
556     end
558     #----------------------------------------------------------------------------------------------
559     # Truncates table while maintaining region boundaries (deletes all records by recreating the table)
560     def truncate_preserve(table_name_str, conf = @conf)
561       puts "Truncating '#{table_name_str}' table (it may take a while):"
562       table_name = TableName.valueOf(table_name_str)
563       locator = @connection.getRegionLocator(table_name)
564       begin
565         splits = locator.getAllRegionLocations
566                         .map { |i| Bytes.toStringBinary(i.getRegionInfo.getStartKey) }
567                         .delete_if { |k| k == '' }.to_java :String
568         splits = org.apache.hadoop.hbase.util.Bytes.toBinaryByteArrays(splits)
569       ensure
570         locator.close
571       end
573       table_description = @admin.getDescriptor(table_name)
574       puts 'Disabling table...'
575       disable(table_name_str)
577       begin
578         puts 'Truncating table...'
579         # Test hook: allows simulating a Master without truncateTable support
580         unless conf.getBoolean('hbase.client.truncatetable.support', true)
581           raise UnsupportedMethodException, 'truncateTable'
582         end
583         @admin.truncateTable(table_name, true)
584       rescue => e
585         # Handle the compatibility case, where the truncate method doesn't exist on the Master
586         raise e unless e.respond_to?(:cause) && !e.cause.nil?
587         rootCause = e.cause
588         if rootCause.is_a?(org.apache.hadoop.hbase.DoNotRetryIOException)
589           # Handle the compatibility case, where the truncate method doesn't exist on the Master
590           puts 'Dropping table...'
591           @admin.deleteTable(table_name)
593           puts 'Creating table with region boundaries...'
594           @admin.createTable(table_description, splits)
595         else
596           raise e
597         end
598       end
599     end
601     class UnsupportedMethodException < StandardError
602       def initialize(name)
603         @method_name = name
604       end
606       def cause
607         org.apache.hadoop.hbase.DoNotRetryIOException.new("#{@method_name} is not supported")
608       end
609     end
611     #----------------------------------------------------------------------------------------------
612     # Check the status of alter command (number of regions reopened)
613     def alter_status(table_name)
614       # Table name should be a string
615       raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
617       # Table should exist
618       raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
620       status = Pair.new
621       begin
622         status = @admin.getAlterStatus(org.apache.hadoop.hbase.TableName.valueOf(table_name))
623         if status.getSecond != 0
624           puts "#{status.getSecond - status.getFirst}/#{status.getSecond} regions updated."
625         else
626           puts 'All regions updated.'
627         end
628         sleep 1
629       end while !status.nil? && status.getFirst != 0
630       puts 'Done.'
631     end
633     #----------------------------------------------------------------------------------------------
634     # Change table structure or table options
635     def alter(table_name_str, wait = true, *args)
636       # Table name should be a string
637       raise(ArgumentError, 'Table name must be of type String') unless
638           table_name_str.is_a?(String)
640       # Table should exist
641       raise(ArgumentError, "Can't find a table: #{table_name_str}") unless exists?(table_name_str)
643       # There should be at least one argument
644       raise(ArgumentError, 'There should be at least one argument besides the table name') if args.empty?
646       table_name = TableName.valueOf(table_name_str)
648       # Get table descriptor
649       htd = org.apache.hadoop.hbase.HTableDescriptor.new(@admin.getDescriptor(table_name))
650       hasTableUpdate = false
652       # Process all args
653       args.each do |arg|
654         # Normalize args to support column name only alter specs
655         arg = { NAME => arg } if arg.is_a?(String)
657         # Normalize args to support shortcut delete syntax
658         arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete']
660         # There are 3 possible options.
661         # 1) Column family spec. Distinguished by having a NAME and no METHOD.
662         method = arg.delete(METHOD)
663         if method.nil? && arg.key?(NAME)
664           descriptor = hcd(arg, htd)
665           column_name = descriptor.getNameAsString
667           # If column already exist, then try to alter it. Create otherwise.
668           if htd.hasFamily(column_name.to_java_bytes)
669             htd.modifyFamily(descriptor)
670           else
671             htd.addFamily(descriptor)
672           end
673           hasTableUpdate = true
674           next
675         end
677         # 2) Method other than table_att, with some args.
678         name = arg.delete(NAME)
679         if !method.nil? && method != 'table_att'
680           # Delete column family
681           if method == 'delete'
682             raise(ArgumentError, 'NAME parameter missing for delete method') unless name
683             htd.removeFamily(name.to_java_bytes)
684             hasTableUpdate = true
685           # Unset table attributes
686           elsif method == 'table_att_unset'
687             raise(ArgumentError, 'NAME parameter missing for table_att_unset method') unless name
688             if name.is_a?(Array)
689               name.each do |key|
690                 if htd.getValue(key).nil?
691                   raise ArgumentError, "Could not find attribute: #{key}"
692                 end
693                 htd.remove(key)
694               end
695             else
696               if htd.getValue(name).nil?
697                 raise ArgumentError, "Could not find attribute: #{name}"
698               end
699               htd.remove(name)
700             end
701             hasTableUpdate = true
702           # Unset table configuration
703           elsif method == 'table_conf_unset'
704             raise(ArgumentError, 'NAME parameter missing for table_conf_unset method') unless name
705             if name.is_a?(Array)
706               name.each do |key|
707                 if htd.getConfigurationValue(key).nil?
708                   raise ArgumentError, "Could not find configuration: #{key}"
709                 end
710                 htd.removeConfiguration(key)
711               end
712             else
713               if htd.getConfigurationValue(name).nil?
714                 raise ArgumentError, "Could not find configuration: #{name}"
715               end
716               htd.removeConfiguration(name)
717             end
718             hasTableUpdate = true
719           # Unknown method
720           else
721             raise ArgumentError, "Unknown method: #{method}"
722           end
724           arg.each_key do |unknown_key|
725             puts(format('Unknown argument ignored: %s', unknown_key))
726           end
728           next
729         end
731         # 3) Some args for the table, optionally with METHOD => table_att (deprecated)
732         update_htd_from_arg(htd, arg)
734         # set a coprocessor attribute
735         valid_coproc_keys = []
736         next unless arg.is_a?(Hash)
737         arg.each do |key, value|
738           k = String.new(key) # prepare to strip
739           k.strip!
741           next unless k =~ /coprocessor/i
742           v = String.new(value)
743           v.strip!
744           # TODO: We should not require users to configure the coprocessor with our internal format.
745           htd.addCoprocessorWithSpec(v)
746           valid_coproc_keys << key
747         end
749         valid_coproc_keys.each do |key|
750           arg.delete(key)
751         end
753         hasTableUpdate = true
755         arg.each_key do |unknown_key|
756           puts(format('Unknown argument ignored: %s', unknown_key))
757         end
759         next
760       end
762       # Bulk apply all table modifications.
763       if hasTableUpdate
764         future = @admin.modifyTableAsync(htd)
766         if wait == true
767           puts 'Updating all regions with the new schema...'
768           future.get
769         end
770       end
771     end
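    # Illustrative usage covering the three argument shapes handled above (hypothetical names):
    #   admin.alter('t1', true, NAME => 'cf1', VERSIONS => 3)            # add or modify a family
    #   admin.alter('t1', true, 'delete' => 'cf2')                       # drop a family
    #   admin.alter('t1', true, METHOD => 'table_att_unset', NAME => 'MAX_FILESIZE')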
773     def status(format, type)
774       status = org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics)
775       if format == 'detailed'
776         puts(format('version %s', status.getHBaseVersion))
777         # Put regions in transition first because usually empty
778         puts(format('%d regionsInTransition', status.getRegionStatesInTransition.size))
779         for v in status.getRegionStatesInTransition
780           puts(format('    %s', v))
781         end
782         master = status.getMaster
783         puts(format('active master:  %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
784         puts(format('%d backup masters', status.getBackupMastersSize))
785         for server in status.getBackupMasters
786           puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
787         end
789         master_coprocs = @admin.getMasterCoprocessorNames.toString
790         unless master_coprocs.nil?
791           puts(format('master coprocessors: %s', master_coprocs))
792         end
793         puts(format('%d live servers', status.getServersSize))
794         for server in status.getServers
795           puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
796           puts(format('        %s', status.getLoad(server).toString))
797           for name, region in status.getLoad(server).getRegionsLoad
798             puts(format('        %s', region.getNameAsString.dump))
799             puts(format('            %s', region.toString))
800           end
801         end
802         puts(format('%d dead servers', status.getDeadServersSize))
803         for server in status.getDeadServerNames
804           puts(format('    %s', server))
805         end
806       elsif format == 'replication'
807         puts(format('version %<version>s', version: status.getHBaseVersion))
808         puts(format('%<servers>d live servers', servers: status.getServersSize))
809         status.getServers.each do |server_status|
810           sl = status.getLoad(server_status)
811           r_sink_string   = '      SINK:'
812           r_source_string = '       SOURCE:'
813           r_load_sink = sl.getReplicationLoadSink
814           next if r_load_sink.nil?
816           r_sink_string << ' AgeOfLastAppliedOp=' +
817                            r_load_sink.getAgeOfLastAppliedOp.to_s
818           r_sink_string << ', TimeStampsOfLastAppliedOp=' +
819                            java.util.Date.new(r_load_sink
820                              .getTimeStampsOfLastAppliedOp).toString
821           r_load_source_map = sl.getReplicationLoadSourceMap
822           build_source_string(r_load_source_map, r_source_string)
823           puts(format('    %<host>s:', host: server_status.getHostname))
824           if type.casecmp('SOURCE').zero?
825             puts(format('%<source>s', source: r_source_string))
826           elsif type.casecmp('SINK').zero?
827             puts(format('%<sink>s', sink: r_sink_string))
828           else
829             puts(format('%<source>s', source: r_source_string))
830             puts(format('%<sink>s', sink: r_sink_string))
831           end
832         end
833       elsif format == 'simple'
834         load = 0
835         regions = 0
836         master = status.getMaster
837         puts(format('active master:  %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
838         puts(format('%d backup masters', status.getBackupMastersSize))
839         for server in status.getBackupMasters
840           puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
841         end
842         puts(format('%d live servers', status.getServersSize))
843         for server in status.getServers
844           puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
845           puts(format('        %s', status.getLoad(server).toString))
846           load += status.getLoad(server).getNumberOfRequests
847           regions += status.getLoad(server).getNumberOfRegions
848         end
849         puts(format('%d dead servers', status.getDeadServersSize))
850         for server in status.getDeadServerNames
851           puts(format('    %s', server))
852         end
853         puts(format('Aggregate load: %d, regions: %d', load, regions))
854       else
855         puts "1 active master, #{status.getBackupMastersSize} backup masters, #{status.getServersSize} servers, #{status.getDeadServersSize} dead, #{format('%.4f', status.getAverageLoad)} average load"
856       end
857     end
859     def build_source_string(r_load_source_map, r_source_string)
860       r_load_source_map.each do |peer, sources|
861         r_source_string << ' PeerID=' + peer
862         sources.each do |source_load|
863           build_queue_title(source_load, r_source_string)
864           build_running_source_stats(source_load, r_source_string)
865         end
866       end
867     end
869     def build_queue_title(source_load, r_source_string)
870       r_source_string << if source_load.isRecovered
871                            "\n         Recovered Queue: "
872                          else
873                            "\n         Normal Queue: "
874                          end
875       r_source_string << source_load.getQueueId
876     end
878     def build_running_source_stats(source_load, r_source_string)
879       if source_load.isRunning
880         build_shipped_stats(source_load, r_source_string)
881         build_load_general_stats(source_load, r_source_string)
882         r_source_string << ', Replication Lag=' +
883                            source_load.getReplicationLag.to_s
884       else
885         r_source_string << "\n           "
886         r_source_string << 'No Reader/Shipper threads running yet.'
887       end
888     end
890     def build_shipped_stats(source_load, r_source_string)
891       r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
892                            "\n           " \
893                            'No Ops shipped since last restart'
894                          else
895                            "\n           AgeOfLastShippedOp=" +
896                            source_load.getAgeOfLastShippedOp.to_s +
897                            ', TimeStampOfLastShippedOp=' +
898                            java.util.Date.new(source_load
899                              .getTimeStampOfLastShippedOp).toString
900                          end
901     end
903     def build_load_general_stats(source_load, r_source_string)
904       r_source_string << ', SizeOfLogQueue=' +
905                          source_load.getSizeOfLogQueue.to_s
906       r_source_string << ', EditsReadFromLogQueue=' +
907                          source_load.getEditsRead.to_s
908       r_source_string << ', OpsShippedToTarget=' +
909                          source_load.getOPsShipped.to_s
910       build_edits_for_source(source_load, r_source_string)
911     end
913     def build_edits_for_source(source_load, r_source_string)
914       if source_load.hasEditsSinceRestart
915         r_source_string << ', TimeStampOfNextToReplicate=' +
916                            java.util.Date.new(source_load
917                              .getTimeStampOfNextToReplicate).toString
918       else
919         r_source_string << ', No edits for this source'
920         r_source_string << ' since it started'
921       end
922     end
924     #----------------------------------------------------------------------------------------------
925     #
926     # Helper methods
927     #
929     # Does table exist?
930     def exists?(table_name)
931       @admin.tableExists(TableName.valueOf(table_name))
932     end
934     #----------------------------------------------------------------------------------------------
935     # Is table enabled
936     def enabled?(table_name)
937       @admin.isTableEnabled(TableName.valueOf(table_name))
938     end
940     #----------------------------------------------------------------------------------------------
941     # Return a new HColumnDescriptor made of passed args
942     def hcd(arg, htd)
943       # String arg, single parameter constructor
944       return org.apache.hadoop.hbase.HColumnDescriptor.new(arg) if arg.is_a?(String)
946       raise(ArgumentError, "Column family #{arg} must have a name") unless name = arg.delete(NAME)
948       family = htd.getFamily(name.to_java_bytes)
949       # create it if it's a new family
950       family ||= org.apache.hadoop.hbase.HColumnDescriptor.new(name.to_java_bytes)
952       family.setBlockCacheEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE)
953       family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE)
954       family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE)
955       family.setCacheIndexesOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE)
956       family.setCacheBloomsOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE)
957       family.setEvictBlocksOnClose(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE)
958       family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
959       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION)
960         family.setInMemoryCompaction(
961           org.apache.hadoop.hbase.MemoryCompactionPolicy.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))
962         )
963       end
964       family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
965       family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING)
966       family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
967       family.setMaxVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
968       family.setMinVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
969       family.setKeepDeletedCells(org.apache.hadoop.hbase.KeepDeletedCells.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS).to_s.upcase)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
970       family.setCompressTags(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS)
971       family.setPrefetchBlocksOnOpen(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN)
972       family.setMobEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB)
973       family.setMobThreshold(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD)
974       family.setNewVersionBehavior(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR)
975       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
976         bloomtype = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER).upcase.to_sym
977         if org.apache.hadoop.hbase.regionserver.BloomType.constants.include?(bloomtype)
978           family.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(bloomtype))
979         else
980           raise(ArgumentError, "BloomFilter type #{bloomtype} is not supported. Use one of " + org.apache.hadoop.hbase.regionserver.BloomType.constants.join(' '))
981         end
982       end
983       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
984         compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION).upcase.to_sym
985         if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
986           family.setCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
987         else
988           raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
989         end
990       end
991       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION)
992         algorithm = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION).upcase
993         family.setEncryptionType(algorithm)
994         if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
995           key = org.apache.hadoop.hbase.io.crypto.Encryption.pbkdf128(
996             arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
997           )
998           family.setEncryptionKey(org.apache.hadoop.hbase.security.EncryptionUtil.wrapKey(@conf, key,
999                                                                                           algorithm))
1000         end
1001       end
1002       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT)
1003         compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT).upcase.to_sym
1004         if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
1005           family.setCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
1006         else
1007           raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
1008         end
1009       end
1010       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
1011         storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
1012         family.setStoragePolicy(storage_policy)
1013       end
1014       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY)
1015         mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase.to_sym
1016         if org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.include?(mob_partition_policy)
1017           family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy))
1018         else
1019           raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.join(' '))
1020         end
1021       end
1023       set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
1024       set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
1025       if arg.include?(org.apache.hadoop.hbase
1026         .HColumnDescriptor::DFS_REPLICATION)
1027         family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase
1028           .HColumnDescriptor::DFS_REPLICATION)))
1029       end
1031       arg.each_key do |unknown_key|
1032         puts(format('Unknown argument ignored for column family %s: %s', name, unknown_key))
1033       end
1035       family
1036     end
1038     #----------------------------------------------------------------------------------------------
1039     # Enables/disables a region by name
1040     def online(region_name, on_off)
1041       # Open meta table
1042       meta = @connection.getTable(org.apache.hadoop.hbase.TableName::META_TABLE_NAME)
1044       # Read region info
1045       # FIXME: fail gracefully if can't find the region
1046       region_bytes = region_name.to_java_bytes
1047       g = org.apache.hadoop.hbase.client.Get.new(region_bytes)
1048       g.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY, org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER)
1049       hri_bytes = meta.get(g).value
1051       # Change region status
1052       hri = org.apache.hadoop.hbase.util.Writables.getWritable(hri_bytes, org.apache.hadoop.hbase.HRegionInfo.new)
1053       hri.setOffline(on_off)
1055       # Write it back
1056       put = org.apache.hadoop.hbase.client.Put.new(region_bytes)
1057       put.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY,
1058                     org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER,
1059                     org.apache.hadoop.hbase.util.Writables.getBytes(hri))
1060       meta.put(put)
1061     end
1063     # Apply user metadata to table/column descriptor
1064     def set_user_metadata(descriptor, metadata)
1065       raise(ArgumentError, "#{METADATA} must be a Hash type") unless metadata.is_a?(Hash)
1066       for k, v in metadata
1067         v = v.to_s unless v.nil?
1068         descriptor.setValue(k, v)
1069       end
1070     end
1072     #----------------------------------------------------------------------------------------------
1073     # Take a snapshot of specified table
1074     def snapshot(table, snapshot_name, *args)
1075       # Table name should be a string
1076       raise(ArgumentError, 'Table name must be of type String') unless table.is_a?(String)
1078       # Snapshot name should be a string
1079       raise(ArgumentError, 'Snapshot name must be of type String') unless
1080           snapshot_name.is_a?(String)
1082       table_name = TableName.valueOf(table)
1083       if args.empty?
1084         @admin.snapshot(snapshot_name, table_name)
1085       else
1086         args.each do |arg|
1087           if arg[SKIP_FLUSH] == true
1088             @admin.snapshot(snapshot_name, table_name,
1089                             org.apache.hadoop.hbase.client.SnapshotType::SKIPFLUSH)
1090           else
1091             @admin.snapshot(snapshot_name, table_name)
1092           end
1093         end
1094       end
1095     end
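    # Illustrative usage (hypothetical names); SKIP_FLUSH takes the snapshot without
    # flushing in-memory data first:
    #   admin.snapshot('t1', 't1_snap')
    #   admin.snapshot('t1', 't1_fast_snap', SKIP_FLUSH => true)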
1097     #----------------------------------------------------------------------------------------------
1098     # Restore specified snapshot
1099     def restore_snapshot(snapshot_name, restore_acl = false)
1100       conf = @connection.getConfiguration
1101       take_fail_safe_snapshot = conf.getBoolean('hbase.snapshot.restore.take.failsafe.snapshot', false)
1102       @admin.restoreSnapshot(snapshot_name, take_fail_safe_snapshot, restore_acl)
1103     end
1105     #----------------------------------------------------------------------------------------------
1106     # Create a new table by cloning the snapshot content
1107     def clone_snapshot(snapshot_name, table, restore_acl = false)
1108       @admin.cloneSnapshot(snapshot_name, TableName.valueOf(table), restore_acl)
1109     end
1111     #----------------------------------------------------------------------------------------------
1112     # Delete specified snapshot
1113     def delete_snapshot(snapshot_name)
1114       @admin.deleteSnapshot(snapshot_name)
1115     end
1117     #----------------------------------------------------------------------------------------------
1118     # Deletes the snapshots matching the given regex
1119     def delete_all_snapshot(regex)
1120       @admin.deleteSnapshots(Pattern.compile(regex)).to_a
1121     end
1123     #----------------------------------------------------------------------------------------------
1124     # Deletes the table snapshots matching the given regex
1125     def delete_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1126       @admin.deleteTableSnapshots(Pattern.compile(tableNameRegex),
1127         Pattern.compile(snapshotNameRegex)).to_a
1128     end
1130     #----------------------------------------------------------------------------------------------
1131     # Returns a list of snapshots
1132     def list_snapshot(regex = '.*')
1133       @admin.listSnapshots(Pattern.compile(regex)).to_a
1134     end
1136     #----------------------------------------------------------------------------------------------
1137     # Returns a list of table snapshots
1138     def list_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1139       @admin.listTableSnapshots(Pattern.compile(tableNameRegex),
1140         Pattern.compile(snapshotNameRegex)).to_a
1141     end
1143     #----------------------------------------------------------------------------------------------
1144     # Returns the ClusterStatus of the cluster
1145     def getClusterStatus
1146       org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics)
1147     end
1149     #----------------------------------------------------------------------------------------------
1150     # Returns a list of regionservers
1151     def getRegionServers
1152       org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics).getServers.map { |serverName| serverName }
1153     end
1155     #----------------------------------------------------------------------------------------------
1156     # Returns servername corresponding to passed server_name_string
1157     def getServerName(server_name_string)
1158       regionservers = getRegionServers
1160       if ServerName.isFullServerName(server_name_string)
1161         return ServerName.valueOf(server_name_string)
1162       else
1163         name_list = server_name_string.split(',')
1165         regionservers.each do|sn|
1166           if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s))
1167             return sn
1168           end
1169         end
1170       end
1172       return nil
1173     end
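    # Illustrative usage: either the full "host,port,startcode" form or a shorter
    # "host[,port]" that is matched against live region servers (hypothetical values):
    #   getServerName('rs1.example.com,16020,1540000000000')
    #   getServerName('rs1.example.com,16020')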
1175     #----------------------------------------------------------------------------------------------
1176     # Returns a list of servernames
1177     def getServerNames(servers, should_return_all_if_servers_empty)
1178       regionservers = getRegionServers
1179       servernames = []
1181       if servers.empty?
1182         # if no servers were specified as arguments, get a list of all servers
1183         if should_return_all_if_servers_empty
1184           servernames = regionservers
1185         end
1186       else
1187         # Replace the strings in the servers array with ServerName objects
1188         i = 0
1189         while i < servers.length
1190           server = servers[i]
1192           if ServerName.isFullServerName(server)
1193             servernames.push(ServerName.valueOf(server))
1194           else
1195             name_list = server.split(',')
1196             j = 0
1197             while j < regionservers.length
1198               sn = regionservers[j]
1199               if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s))
1200                 servernames.push(sn)
1201               end
1202               j += 1
1203             end
1204           end
1205           i += 1
1206         end
1207       end
1209       servernames
1210     end

    # Apply config specific to a table/column to its descriptor
    def set_descriptor_config(descriptor, config)
      raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.is_a?(Hash)
      config.each do |k, v|
        v = v.to_s unless v.nil?
        descriptor.setConfiguration(k, v)
      end
    end
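
    # A usage sketch (the configuration key and value below are illustrative only):
    #   set_descriptor_config(htd, 'hbase.hstore.blockingStoreFiles' => '20')
    # each value is stringified and applied to the descriptor via setConfiguration.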

    #----------------------------------------------------------------------------------------------
    # Updates the configuration of one regionserver.
    def update_config(serverName)
      @admin.updateConfiguration(ServerName.valueOf(serverName))
    end

    #----------------------------------------------------------------------------------------------
    # Updates the configuration of all the regionservers.
    def update_all_config
      @admin.updateConfiguration
    end

    #----------------------------------------------------------------------------------------------
    # Returns the descriptor of the namespace with the given name
    def describe_namespace(namespace_name)
      namespace = @admin.getNamespaceDescriptor(namespace_name)

      return namespace.to_s unless namespace.nil?

      raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
    end

    #----------------------------------------------------------------------------------------------
    # Returns a list of namespaces in hbase
    def list_namespace(regex = '.*')
      pattern = Pattern.compile(regex)
      list = @admin.listNamespaceDescriptors.map(&:getName)
      list.select { |s| pattern.match(s) }
    end

    #----------------------------------------------------------------------------------------------
    # Returns a list of tables in namespace
    def list_namespace_tables(namespace_name)
      unless namespace_name.nil?
        return @admin.listTableNamesByNamespace(namespace_name).map(&:getQualifierAsString)
      end

      raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
    end

    #----------------------------------------------------------------------------------------------
    # Creates a namespace
    def create_namespace(namespace_name, *args)
      # Fail if namespace name is not a string
      raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)

      # Flatten params array
      args = args.flatten.compact

      # Start defining the namespace
      nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(namespace_name)
      args.each do |arg|
        unless arg.is_a?(Hash)
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
        end
        arg.each do |k, v|
          v = v.to_s unless v.nil?
          nsb.addConfiguration(k, v)
        end
      end
      @admin.createNamespace(nsb.build)
    end
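
    # A usage sketch (the namespace name and property below are illustrative only):
    #   admin.create_namespace('ns1', 'PROPERTY_NAME' => 'PROPERTY_VALUE')
    # each Hash argument is applied to the namespace descriptor via addConfiguration.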

    #----------------------------------------------------------------------------------------------
    # Modifies a namespace
    def alter_namespace(namespace_name, *args)
      # Fail if namespace name is not a string
      raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)

      nsd = @admin.getNamespaceDescriptor(namespace_name)

      raise(ArgumentError, 'Namespace does not exist') unless nsd
      nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(nsd)

      # Flatten params array
      args = args.flatten.compact

      # Apply each argument to the namespace descriptor
      args.each do |arg|
        unless arg.is_a?(Hash)
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
        end
        method = arg[METHOD]
        if method == 'unset'
          nsb.removeConfiguration(arg[NAME])
        elsif method == 'set'
          arg.delete(METHOD)
          arg.each do |k, v|
            v = v.to_s unless v.nil?
            nsb.addConfiguration(k, v)
          end
        else
          raise(ArgumentError, "Unknown method #{method}")
        end
      end
      @admin.modifyNamespace(nsb.build)
    end
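
    # A usage sketch (the namespace name and property below are illustrative only):
    #   admin.alter_namespace('ns1', METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE')
    #   admin.alter_namespace('ns1', METHOD => 'unset', NAME => 'PROPERTY_NAME')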

    #----------------------------------------------------------------------------------------------
    # Drops a namespace
    def drop_namespace(namespace_name)
      @admin.deleteNamespace(namespace_name)
    end

    #----------------------------------------------------------------------------------------------
    # Get security capabilities
    def get_security_capabilities
      @admin.getSecurityCapabilities
    end

    # List all procedures
    def list_procedures
      @admin.getProcedures
    end

    # List all locks
    def list_locks
      @admin.getLocks
    end

    # Parse arguments and update HTableDescriptor accordingly
    def update_htd_from_arg(htd, arg)
      htd.setOwnerString(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::OWNER)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::OWNER)
      htd.setMaxFileSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MAX_FILESIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MAX_FILESIZE)
      htd.setReadOnly(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::READONLY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::READONLY)
      htd.setCompactionEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::COMPACTION_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::COMPACTION_ENABLED)
      htd.setSplitEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_ENABLED)
      htd.setMergeEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MERGE_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MERGE_ENABLED)
      htd.setNormalizationEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZATION_ENABLED))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZATION_ENABLED)
      htd.setNormalizerTargetRegionCount(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_COUNT))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_COUNT)
      htd.setNormalizerTargetRegionSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_SIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::NORMALIZER_TARGET_REGION_SIZE)
      htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::MEMSTORE_FLUSHSIZE))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::MEMSTORE_FLUSHSIZE)
      htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::DURABILITY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::DURABILITY)
      htd.setPriority(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::PRIORITY))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::PRIORITY)
      htd.setFlushPolicyClassName(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::FLUSH_POLICY)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::FLUSH_POLICY)
      htd.setRegionMemstoreReplication(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::REGION_MEMSTORE_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::REGION_MEMSTORE_REPLICATION)
      htd.setRegionSplitPolicyClassName(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_POLICY)) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::SPLIT_POLICY)
      htd.setRegionReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HTableDescriptor::REGION_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.HTableDescriptor::REGION_REPLICATION)
      set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
      set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
    end
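
    # A usage sketch (the attribute values below are illustrative only):
    #   update_htd_from_arg(htd, 'MAX_FILESIZE' => '134217728', 'DURABILITY' => 'ASYNC_WAL')
    # each recognized key is removed from the arg hash as it is applied to the descriptor,
    # so any keys left over can be reported back to the caller as unknown arguments.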

    #----------------------------------------------------------------------------------------------
    # Clear compaction queues
    def clear_compaction_queues(server_name, queue_name = nil)
      names = %w[long short]
      queues = java.util.HashSet.new
      if queue_name.nil?
        queues.add('long')
        queues.add('short')
      elsif queue_name.is_a?(String)
        queues.add(queue_name)
        unless names.include?(queue_name)
          raise(ArgumentError, "Unknown queue name #{queue_name}")
        end
      elsif queue_name.is_a?(Array)
        queue_name.each do |s|
          queues.add(s)
          unless names.include?(s)
            raise(ArgumentError, "Unknown queue name #{s}")
          end
        end
      else
        raise(ArgumentError, "Unknown queue name #{queue_name}")
      end
      @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues)
    end
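
    # A usage sketch (the server name below is illustrative only):
    #   admin.clear_compaction_queues('rs1.example.com,16020,1551234567890', 'long')
    # accepts 'long', 'short', or an array of both; with no queue name, both queues are cleared.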

    #----------------------------------------------------------------------------------------------
    # List dead region servers
    def list_deadservers
      @admin.listDeadServers.to_a
    end

    #----------------------------------------------------------------------------------------------
    # Clear dead region servers
    def clear_deadservers(dead_servers)
      # Flatten params array
      dead_servers = dead_servers.flatten.compact
      if dead_servers.empty?
        servers = list_deadservers
      else
        servers = java.util.ArrayList.new
        dead_servers.each do |s|
          servers.add(ServerName.valueOf(s))
        end
      end
      @admin.clearDeadServers(servers).to_a
    end
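
    # A usage sketch (the server name below is illustrative only):
    #   admin.clear_deadservers(['rs1.example.com,16020,1551234567890'])
    # with no arguments, every server returned by list_deadservers is cleared; the result
    # is the list of servers that could not be cleared.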

    #----------------------------------------------------------------------------------------------
    # List live region servers
    def list_liveservers
      org.apache.hadoop.hbase.ClusterStatus.new(@admin.getClusterMetrics).getServers.to_a
    end

    #----------------------------------------------------------------------------------------------
    # Create a new table by cloning the schema of an existing table
    def clone_table_schema(table_name, new_table_name, preserve_splits = true)
      @admin.cloneTableSchema(TableName.valueOf(table_name),
                              TableName.valueOf(new_table_name),
                              preserve_splits)
    end

    #----------------------------------------------------------------------------------------------
    # List decommissioned RegionServers
    def list_decommissioned_regionservers
      @admin.listDecommissionedRegionServers
    end

    #----------------------------------------------------------------------------------------------
    # Decommission a list of region servers, optionally offload corresponding regions
    def decommission_regionservers(host_or_servers, should_offload)
      # Fail if host_or_servers is neither a string nor an array
      unless host_or_servers.is_a?(Array) || host_or_servers.is_a?(String)
        raise(ArgumentError,
              "#{host_or_servers.class} of #{host_or_servers.inspect} is not of Array/String type")
      end

      # Fail if should_offload is neither a TrueClass/FalseClass nor a string
      unless (!!should_offload == should_offload) || should_offload.is_a?(String)
        raise(ArgumentError, "#{should_offload} is not a boolean value")
      end

      # If a string is passed, convert it to a single-element list
      _host_or_servers = host_or_servers.is_a?(Array) ?
                         host_or_servers :
                         java.util.Arrays.asList(host_or_servers)

      # Retrieve the server names corresponding to the passed _host_or_servers list
      server_names = getServerNames(_host_or_servers, false)

      # Fail if we cannot find any server(s) corresponding to the passed host_or_servers
      if server_names.empty?
        raise(ArgumentError,
              "Could not find any server(s) with specified name(s): #{host_or_servers}")
      end

      @admin.decommissionRegionServers(server_names,
                                       java.lang.Boolean.valueOf(should_offload))
    end
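
    # A usage sketch (the hostname below is illustrative only):
    #   admin.decommission_regionservers(['rs1.example.com,16020'], true)
    # decommissions the matching regionserver and, because should_offload is true,
    # also moves its regions off to other servers.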

    #----------------------------------------------------------------------------------------------
    # Recommission a region server, optionally load a list of passed regions
    def recommission_regionserver(server_name_string, encoded_region_names)
      # Fail if server_name_string is not a string
      unless server_name_string.is_a?(String)
        raise(ArgumentError,
              "#{server_name_string.class} of #{server_name_string.inspect} is not of String type")
      end

      # Fail if encoded_region_names is not an array
      unless encoded_region_names.is_a?(Array)
        raise(ArgumentError,
              "#{encoded_region_names.class} of #{encoded_region_names.inspect} is not of Array type")
      end

      # Convert encoded_region_names from string to bytes (element-wise)
      region_names_in_bytes = encoded_region_names
                              .map { |region_name| region_name.to_java_bytes }
                              .compact

      # Retrieve the server name corresponding to the passed server_name_string
      server_name = getServerName(server_name_string)

      # Fail if we cannot find a server corresponding to the passed server_name_string
      if server_name.nil?
        raise(ArgumentError,
              "Could not find any server with name #{server_name_string}")
      end

      @admin.recommissionRegionServer(server_name, region_names_in_bytes)
    end
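
    # A usage sketch (the hostname and encoded region name placeholder below are illustrative only):
    #   admin.recommission_regionserver('rs1.example.com,16020', ['<encoded-region-name>'])
    # an empty array recommissions the server without loading any particular regions back onto it.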

    #----------------------------------------------------------------------------------------------
    # Stop the active Master
    def stop_master
      @admin.stopMaster
    end

    # Stop the given RegionServer
    def stop_regionserver(hostport)
      @admin.stopRegionServer(hostport)
    end
  end
  # rubocop:enable Metrics/ClassLength