# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

include Java
java_import java.util.Arrays
java_import java.util.regex.Pattern
java_import org.apache.hadoop.hbase.util.Pair
java_import org.apache.hadoop.hbase.util.RegionSplitter
java_import org.apache.hadoop.hbase.util.Bytes
java_import org.apache.hadoop.hbase.ServerName
java_import org.apache.hadoop.hbase.TableName

# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin

module Hbase
  # rubocop:disable Metrics/ClassLength
  class Admin
    include HBaseConstants

    def initialize(connection)
      @connection = connection
      # Java Admin instance
      @admin = @connection.getAdmin
      @hbck = @connection.getHbck
      @conf = @connection.getConfiguration
    end

    def close
      @admin.close
    end

    #----------------------------------------------------------------------------------------------
    # Returns a list of tables in hbase
    def list(regex = '.*')
      @admin.listTableNames(Pattern.compile(regex)).map(&:getNameAsString)
    end
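
    # Usage sketch (hedged; assumes `admin` is an Hbase::Admin instance and the
    # namespace 'ns1' is hypothetical). The regex is a Java pattern:
    #   admin.list              # all table names, as strings
    #   admin.list('ns1:.*')    # only tables in namespace 'ns1'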

    #----------------------------------------------------------------------------------------------
    # Requests a table, region, or region server flush
    def flush(name)
      @admin.flushRegion(name.to_java_bytes)
    rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
      # Unknown region. Try table.
      begin
        @admin.flush(TableName.valueOf(name))
      rescue java.lang.IllegalArgumentException
        # Unknown table. Try region server.
        @admin.flushRegionServer(ServerName.valueOf(name))
      end
    end
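
    # Usage sketch (hedged; the argument is tried as a region name, then a table
    # name, then a region server name; 't1' and the server name are hypothetical):
    #   admin.flush('t1')                                    # flush table 't1'
    #   admin.flush('host1.example.com,16020,1609459200000') # flush a region server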

    #----------------------------------------------------------------------------------------------
    # Requests a table, region, or column family compaction
    def compact(table_or_region_name, family = nil, type = 'NORMAL')
      family_bytes = nil
      family_bytes = family.to_java_bytes unless family.nil?
      compact_type = nil
      if type == 'NORMAL'
        compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
      elsif type == 'MOB'
        compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
      else
        raise ArgumentError, 'only NORMAL or MOB accepted for type!'
      end

      begin
        if family_bytes.nil?
          @admin.compactRegion(table_or_region_name.to_java_bytes)
        else
          @admin.compactRegion(table_or_region_name.to_java_bytes, family_bytes)
        end
      rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
        if family_bytes.nil?
          @admin.compact(TableName.valueOf(table_or_region_name), compact_type)
        else
          @admin.compact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
        end
      end
    end
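
    # Usage sketch (hedged; 't1' and 'f1' are hypothetical names):
    #   admin.compact('t1')             # minor-compact every region of table 't1'
    #   admin.compact('t1', 'f1')       # compact only column family 'f1'
    #   admin.compact('t1', nil, 'MOB') # MOB compaction (MOB-enabled tables only)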

    #----------------------------------------------------------------------------------------------
    # Switch compaction on/off at runtime on a region server
    def compaction_switch(on_or_off, regionserver_names)
      region_servers = regionserver_names.flatten.compact
      servers = java.util.ArrayList.new
      if region_servers.any?
        region_servers.each do |s|
          servers.add(s)
        end
      end
      @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
    end

    #----------------------------------------------------------------------------------------------
    # Gets compaction state for specified table
    def getCompactionState(table_name)
      @admin.getCompactionState(TableName.valueOf(table_name)).name
    end

    # Requests to compact all regions on the regionserver
    def compact_regionserver(servername, major = false)
      if major
        @admin.majorCompactRegionServer(ServerName.valueOf(servername))
      else
        @admin.compactRegionServer(ServerName.valueOf(servername))
      end
    end

    #----------------------------------------------------------------------------------------------
    # Requests a table, region, or column family major compaction
    def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
      family_bytes = nil
      family_bytes = family.to_java_bytes unless family.nil?
      compact_type = nil
      if type == 'NORMAL'
        compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
      elsif type == 'MOB'
        compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
      else
        raise ArgumentError, 'only NORMAL or MOB accepted for type!'
      end

      begin
        if family_bytes.nil?
          @admin.majorCompactRegion(table_or_region_name.to_java_bytes)
        else
          @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
        end
      rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
        if family_bytes.nil?
          @admin.majorCompact(TableName.valueOf(table_or_region_name), compact_type)
        else
          @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
        end
      end
    end

    #----------------------------------------------------------------------------------------------
    # Requests a regionserver's WAL roll
    def wal_roll(server_name)
      @admin.rollWALWriter(ServerName.valueOf(server_name))
    end
    # TODO: remove older hlog_roll version
    alias hlog_roll wal_roll

    #----------------------------------------------------------------------------------------------
    # Requests a table or region split
    def split(table_or_region_name, split_point = nil)
      split_point_bytes = nil
      split_point_bytes = split_point.to_java_bytes unless split_point.nil?
      begin
        if split_point_bytes.nil?
          org.apache.hadoop.hbase.util.FutureUtils.get(@admin.splitRegionAsync(table_or_region_name.to_java_bytes))
        else
          org.apache.hadoop.hbase.util.FutureUtils.get(@admin.splitRegionAsync(table_or_region_name.to_java_bytes, split_point_bytes))
        end
      rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
        if split_point_bytes.nil?
          @admin.split(TableName.valueOf(table_or_region_name))
        else
          @admin.split(TableName.valueOf(table_or_region_name), split_point_bytes)
        end
      end
    end
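
    # Usage sketch (hedged; 't1' and the split key are hypothetical):
    #   admin.split('t1')              # let HBase pick the split points
    #   admin.split('t1', 'row-5000')  # split at an explicit row key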

    #----------------------------------------------------------------------------------------------
    # Enable/disable one split or merge switch
    # Returns previous switch setting.
    def splitormerge_switch(type, enabled)
      if type == 'SPLIT'
        @admin.splitSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
      elsif type == 'MERGE'
        @admin.mergeSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false))
      else
        raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
      end
    end

    #----------------------------------------------------------------------------------------------
    # Query the current state of the split or merge switch.
    # Returns the switch's state (true is enabled).
    def splitormerge_enabled(type)
      if type == 'SPLIT'
        @admin.isSplitEnabled
      elsif type == 'MERGE'
        @admin.isMergeEnabled
      else
        raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
      end
    end
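
    # Usage sketch (hedged):
    #   admin.splitormerge_switch('SPLIT', false) # disable splits; returns the old setting
    #   admin.splitormerge_enabled('MERGE')       # true if merges are enabled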

    def locate_region(table_name, row_key)
      locator = @connection.getRegionLocator(TableName.valueOf(table_name))
      begin
        return locator.getRegionLocation(Bytes.toBytesBinary(row_key))
      ensure
        locator.close
      end
    end

    #----------------------------------------------------------------------------------------------
    # Requests a cluster balance
    # Returns true if balancer ran
    def balancer(force)
      @admin.balance(java.lang.Boolean.valueOf(force))
    end

    #----------------------------------------------------------------------------------------------
    # Enable/disable balancer
    # Returns previous balancer switch setting.
    def balance_switch(enableDisable)
      @admin.balancerSwitch(
        java.lang.Boolean.valueOf(enableDisable), java.lang.Boolean.valueOf(false)
      )
    end

    #----------------------------------------------------------------------------------------------
    # Query the current state of the LoadBalancer.
    # Returns the balancer's state (true is enabled).
    def balancer_enabled?
      @admin.isBalancerEnabled
    end

    #----------------------------------------------------------------------------------------------
    # Requests clear block cache for table
    def clear_block_cache(table_name)
      @admin.clearBlockCache(org.apache.hadoop.hbase.TableName.valueOf(table_name)).toString
    end

    #----------------------------------------------------------------------------------------------
    # Requests region normalization for all configured tables in the cluster
    # Returns true if normalizer ran successfully
    def normalize
      @admin.normalize
    end

    #----------------------------------------------------------------------------------------------
    # Enable/disable region normalizer
    # Returns previous normalizer switch setting.
    def normalizer_switch(enableDisable)
      @admin.normalizerSwitch(java.lang.Boolean.valueOf(enableDisable))
    end

    #----------------------------------------------------------------------------------------------
    # Query the current state of region normalizer.
    # Returns the state of region normalizer (true is enabled).
    def normalizer_enabled?
      @admin.isNormalizerEnabled
    end

    #----------------------------------------------------------------------------------------------
    # Query the current state of master in maintenance mode.
    # Returns the state of maintenance mode (true is on).
    def in_maintenance_mode?
      @admin.isMasterInMaintenanceMode
    end

    #----------------------------------------------------------------------------------------------
    # Request HBCK chore to run
    def hbck_chore_run
      @hbck.runHbckChore
    end

    #----------------------------------------------------------------------------------------------
    # Request a scan of the catalog table (for garbage collection)
    # Returns an int signifying the number of entries cleaned
    def catalogjanitor_run
      @admin.runCatalogJanitor
    end

    #----------------------------------------------------------------------------------------------
    # Enable/disable the catalog janitor
    # Returns previous catalog janitor switch setting.
    def catalogjanitor_switch(enableDisable)
      @admin.catalogJanitorSwitch(java.lang.Boolean.valueOf(enableDisable))
    end

    #----------------------------------------------------------------------------------------------
    # Query the catalog janitor state (enabled/disabled?)
    # Returns catalog janitor state (true signifies enabled).
    def catalogjanitor_enabled
      @admin.isCatalogJanitorEnabled
    end

    #----------------------------------------------------------------------------------------------
    # Request cleaner chore to run (for garbage collection of HFiles and WAL files)
    def cleaner_chore_run
      @admin.runCleanerChore
    end

    #----------------------------------------------------------------------------------------------
    # Enable/disable the cleaner chore
    # Returns previous cleaner switch setting.
    def cleaner_chore_switch(enableDisable)
      @admin.cleanerChoreSwitch(java.lang.Boolean.valueOf(enableDisable))
    end

    #----------------------------------------------------------------------------------------------
    # Query the cleaner chore state (enabled/disabled?)
    # Returns cleaner state (true signifies enabled).
    def cleaner_chore_enabled
      @admin.isCleanerChoreEnabled
    end

    #----------------------------------------------------------------------------------------------
    # Enables a table
    def enable(table_name)
      tableExists(table_name)
      return if enabled?(table_name)
      @admin.enableTable(TableName.valueOf(table_name))
    end

    #----------------------------------------------------------------------------------------------
    # Enables all tables matching the given regex
    def enable_all(regex)
      pattern = Pattern.compile(regex.to_s)
      failed = java.util.ArrayList.new
      @admin.listTableNames(pattern).each do |table_name|
        begin
          @admin.enableTable(table_name)
        rescue java.io.IOException => e
          puts "table:#{table_name}, error:#{e.toString}"
          failed.add(table_name)
        end
      end
      failed
    end
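
    # Usage sketch (hedged; returns the list of table names that failed to enable):
    #   admin.enable_all('t.*')  # enable every table whose name matches /t.*/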

    #----------------------------------------------------------------------------------------------
    # Disables a table
    def disable(table_name)
      tableExists(table_name)
      return if disabled?(table_name)
      @admin.disableTable(TableName.valueOf(table_name))
    end

    #----------------------------------------------------------------------------------------------
    # Disables all tables matching the given regex
    def disable_all(regex)
      pattern = Pattern.compile(regex.to_s)
      failed = java.util.ArrayList.new
      @admin.listTableNames(pattern).each do |table_name|
        begin
          @admin.disableTable(table_name)
        rescue java.io.IOException => e
          puts "table:#{table_name}, error:#{e.toString}"
          failed.add(table_name)
        end
      end
      failed
    end

    #---------------------------------------------------------------------------------------------
    # Throw exception if table doesn't exist
    def tableExists(table_name)
      raise ArgumentError, "Table #{table_name} does not exist." unless exists?(table_name)
    end

    #----------------------------------------------------------------------------------------------
    # Is table disabled?
    def disabled?(table_name)
      @admin.isTableDisabled(TableName.valueOf(table_name))
    end

    #----------------------------------------------------------------------------------------------
    # Drops a table
    def drop(table_name)
      tableExists(table_name)
      raise ArgumentError, "Table #{table_name} is enabled. Disable it first." if enabled?(
        table_name
      )

      @admin.deleteTable(org.apache.hadoop.hbase.TableName.valueOf(table_name))
    end

    #----------------------------------------------------------------------------------------------
    # Drops all tables matching the given regex
    def drop_all(regex)
      pattern = Pattern.compile(regex.to_s)
      failed = java.util.ArrayList.new
      @admin.listTableNames(pattern).each do |table_name|
        begin
          @admin.deleteTable(table_name)
        rescue java.io.IOException => e
          puts "table:#{table_name}, error:#{e.toString}"
          failed.add(table_name)
        end
      end
      failed
    end

    #----------------------------------------------------------------------------------------------
    # Returns ZooKeeper status dump
    def zk_dump
      @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZKWatcher.new(
        @admin.getConfiguration,
        'admin',
        nil
      )
      zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper
      @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
      org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper)
    end

    #----------------------------------------------------------------------------------------------
    # Creates a table
    def create(table_name, *args)
      # Fail if table name is not a string
      raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)

      # Flatten params array
      args = args.flatten.compact
      has_columns = false

      # Start defining the table
      htd = org.apache.hadoop.hbase.HTableDescriptor.new(org.apache.hadoop.hbase.TableName.valueOf(table_name))
      splits = nil
      # Args are either columns or splits, add them to the table definition
      # TODO: add table options support
      args.each do |arg|
        unless arg.is_a?(String) || arg.is_a?(Hash)
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
        end

        # First, handle all the cases where arg is a column family.
        if arg.is_a?(String) || arg.key?(NAME)
          # If the arg is a string, default action is to add a column to the table.
          # If arg has a name, it must also be a column descriptor.
          descriptor = hcd(arg, htd)
          # Warn if duplicate columns are added
          if htd.hasFamily(descriptor.getName)
            puts "Family '" + descriptor.getNameAsString + "' already exists, the old one will be replaced"
            htd.modifyFamily(descriptor)
          else
            htd.addFamily(descriptor)
          end
          has_columns = true
          next
        end
        if arg.key?(REGION_REPLICATION)
          region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION))
          htd.setRegionReplication(region_replication)
        end

        # Get rid of the "METHOD", which is deprecated for create.
        # We'll do whatever it used to do below if it's table_att.
        if (method = arg.delete(METHOD))
          raise(ArgumentError, 'table_att is currently the only supported method') unless method == 'table_att'
        end

        # The hash is not a column family. Figure out what's in it.
        # First, handle splits.
        if arg.key?(SPLITS_FILE)
          splits_file = arg.delete(SPLITS_FILE)
          unless File.exist?(splits_file)
            raise(ArgumentError, "Splits file #{splits_file} doesn't exist")
          end
          arg[SPLITS] = []
          File.foreach(splits_file) do |line|
            arg[SPLITS].push(line.chomp)
          end
          htd.setValue(SPLITS_FILE, arg[SPLITS_FILE])
        end

        if arg.key?(SPLITS)
          splits = Java::byte[][arg[SPLITS].size].new
          idx = 0
          arg.delete(SPLITS).each do |split|
            splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split)
            idx += 1
          end
        elsif arg.key?(NUMREGIONS) || arg.key?(SPLITALGO)
          # deprecated region pre-split API; ignored if SPLITS was specified above.
          raise(ArgumentError, 'Number of regions must be specified') unless arg.key?(NUMREGIONS)
          raise(ArgumentError, 'Split algorithm must be specified') unless arg.key?(SPLITALGO)
          raise(ArgumentError, 'Number of regions must be greater than 1') unless arg[NUMREGIONS] > 1
          num_regions = arg.delete(NUMREGIONS)
          split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO))
          splits = split_algo.split(JInteger.valueOf(num_regions))
        end

        # Done with splits; apply formerly-table_att parameters.
        update_htd_from_arg(htd, arg)

        arg.each_key do |ignored_key|
          puts(format('An argument ignored (unknown or overridden): %s', ignored_key))
        end
      end

      # Fail if no column families defined
      raise(ArgumentError, 'Table must have at least one column family') unless has_columns

      if splits.nil?
        # Perform the create table call
        @admin.createTable(htd)
      else
        # Perform the create table call
        @admin.createTable(htd, splits)
      end
    end
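
    # Usage sketch (hedged; 't1' and 'f1' are hypothetical, and NAME, VERSIONS,
    # SPLITS, NUMREGIONS, SPLITALGO come from HBaseConstants):
    #   admin.create('t1', 'f1')                                    # one default family
    #   admin.create('t1', { NAME => 'f1', VERSIONS => 5 })         # family with options
    #   admin.create('t1', 'f1', { SPLITS => ['10', '20', '30'] })  # pre-split table
    #   admin.create('t1', 'f1', { NUMREGIONS => 16, SPLITALGO => 'HexStringSplit' })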

    #----------------------------------------------------------------------------------------------
    # Assign a region
    def assign(region_name)
      @admin.assign(region_name.to_java_bytes)
    end

    #----------------------------------------------------------------------------------------------
    # Unassign a region
    def unassign(region_name, force)
      @admin.unassign(region_name.to_java_bytes, java.lang.Boolean.valueOf(force))
    end

    #----------------------------------------------------------------------------------------------
    # Move a region
    def move(encoded_region_name, server = nil)
      @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil)
    end

    #----------------------------------------------------------------------------------------------
    # Merge multiple regions
    def merge_region(regions, force)
      unless regions.is_a?(Array)
        raise(ArgumentError, "Type of #{regions.inspect} is #{regions.class}, but expected Array")
      end
      region_array = Java::byte[][regions.length].new
      i = 0
      while i < regions.length
        unless regions[i].is_a?(String)
          raise(
            ArgumentError,
            "Type of #{regions[i].inspect} is #{regions[i].class}, but expected String"
          )
        end
        region_array[i] = regions[i].to_java_bytes
        i += 1
      end
      org.apache.hadoop.hbase.util.FutureUtils.get(
        @admin.mergeRegionsAsync(
          region_array,
          java.lang.Boolean.valueOf(force)
        )
      )
    end
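
    # Usage sketch (hedged; the encoded region names below are hypothetical placeholders):
    #   admin.merge_region(['d3f...region1', '9a2...region2'], false)
    #   admin.merge_region(['d3f...region1', '9a2...region2'], true) # force non-adjacent merge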

    #----------------------------------------------------------------------------------------------
    # Returns table's structure description
    def describe(table_name)
      tableExists(table_name)
      @admin.getDescriptor(TableName.valueOf(table_name)).to_s
    end

    def get_column_families(table_name)
      tableExists(table_name)
      @admin.getDescriptor(TableName.valueOf(table_name)).getColumnFamilies
    end

    def get_table_attributes(table_name)
      tableExists(table_name)
      @admin.getDescriptor(TableName.valueOf(table_name)).toStringTableAttributes
    end

    #----------------------------------------------------------------------------------------------
    # Enable/disable snapshot auto-cleanup based on TTL expiration
    # Returns previous snapshot auto-cleanup switch setting.
    def snapshot_cleanup_switch(enable_disable)
      @admin.snapshotCleanupSwitch(
        java.lang.Boolean.valueOf(enable_disable), java.lang.Boolean.valueOf(false)
      )
    end

    #----------------------------------------------------------------------------------------------
    # Query the current state of the snapshot auto-cleanup based on TTL
    # Returns the snapshot auto-cleanup state (true if enabled)
    def snapshot_cleanup_enabled?
      @admin.isSnapshotCleanupEnabled
    end

    #----------------------------------------------------------------------------------------------
    # Truncates table (deletes all records by recreating the table)
    def truncate(table_name_str)
      puts "Truncating '#{table_name_str}' table (it may take a while):"
      table_name = TableName.valueOf(table_name_str)

      if enabled?(table_name_str)
        puts 'Disabling table...'
        disable(table_name_str)
      end

      puts 'Truncating table...'
      @admin.truncateTable(table_name, false)
    end

    #----------------------------------------------------------------------------------------------
    # Truncates table while maintaining region boundaries
    # (deletes all records by recreating the table)
    def truncate_preserve(table_name_str)
      puts "Truncating '#{table_name_str}' table (it may take a while):"
      table_name = TableName.valueOf(table_name_str)

      if enabled?(table_name_str)
        puts 'Disabling table...'
        disable(table_name_str)
      end

      puts 'Truncating table...'
      @admin.truncateTable(table_name, true)
    end

    #----------------------------------------------------------------------------------------------
    # Check the status of alter command (number of regions reopened)
    def alter_status(table_name)
      # Table name should be a string
      raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)

      # Table should exist
      raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)

      begin
        cluster_metrics = @admin.getClusterMetrics
        table_region_status = cluster_metrics
                              .getTableRegionStatesCount
                              .get(org.apache.hadoop.hbase.TableName.valueOf(table_name))
        if table_region_status.getTotalRegions != 0
          updated_regions = table_region_status.getTotalRegions -
                            table_region_status.getRegionsInTransition -
                            table_region_status.getClosedRegions
          puts "#{updated_regions}/#{table_region_status.getTotalRegions} regions updated."
        else
          puts 'All regions updated.'
        end
        sleep 1
      end while !table_region_status.nil? && table_region_status.getRegionsInTransition != 0
      puts 'Done.'
    end

    #----------------------------------------------------------------------------------------------
    # Change table structure or table options
    def alter(table_name_str, wait = true, *args)
      # Table name should be a string
      raise(ArgumentError, 'Table name must be of type String') unless
          table_name_str.is_a?(String)

      # Table should exist
      raise(ArgumentError, "Can't find a table: #{table_name_str}") unless exists?(table_name_str)

      # There should be at least one argument
      raise(ArgumentError, 'There should be at least one argument besides the table name') if args.empty?

      table_name = TableName.valueOf(table_name_str)

      # Get table descriptor
      htd = org.apache.hadoop.hbase.HTableDescriptor.new(@admin.getDescriptor(table_name))
      hasTableUpdate = false

      # Process all args
      args.each do |arg|
        # Normalize args to support column name only alter specs
        arg = { NAME => arg } if arg.is_a?(String)

        # Normalize args to support shortcut delete syntax
        arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete']

        # There are 3 possible options.
        # 1) Column family spec. Distinguished by having a NAME and no METHOD.
        method = arg.delete(METHOD)
        if method.nil? && arg.key?(NAME)
          descriptor = hcd(arg, htd)
          column_name = descriptor.getNameAsString

          # If the column already exists, alter it. Create it otherwise.
          if htd.hasFamily(column_name.to_java_bytes)
            htd.modifyFamily(descriptor)
          else
            htd.addFamily(descriptor)
          end
          hasTableUpdate = true
          next
        end

        # 2) Method other than table_att, with some args.
        name = arg.delete(NAME)
        if !method.nil? && method != 'table_att'
          # Delete column family
          if method == 'delete'
            raise(ArgumentError, 'NAME parameter missing for delete method') unless name
            htd.removeFamily(name.to_java_bytes)
            hasTableUpdate = true
          # Unset table attributes
          elsif method == 'table_att_unset'
            raise(ArgumentError, 'NAME parameter missing for table_att_unset method') unless name
            if name.is_a?(Array)
              name.each do |key|
                if htd.getValue(key).nil?
                  raise ArgumentError, "Could not find attribute: #{key}"
                end
                htd.remove(key)
              end
            else
              if htd.getValue(name).nil?
                raise ArgumentError, "Could not find attribute: #{name}"
              end
              htd.remove(name)
            end
            hasTableUpdate = true
          # Unset table configuration
          elsif method == 'table_conf_unset'
            raise(ArgumentError, 'NAME parameter missing for table_conf_unset method') unless name
            if name.is_a?(Array)
              name.each do |key|
                if htd.getConfigurationValue(key).nil?
                  raise ArgumentError, "Could not find configuration: #{key}"
                end
                htd.removeConfiguration(key)
              end
            else
              if htd.getConfigurationValue(name).nil?
                raise ArgumentError, "Could not find configuration: #{name}"
              end
              htd.removeConfiguration(name)
            end
            hasTableUpdate = true
          # Unknown method
          else
            raise ArgumentError, "Unknown method: #{method}"
          end

          arg.each_key do |unknown_key|
            puts(format('Unknown argument ignored: %s', unknown_key))
          end

          next
        end

        # 3) Some args for the table, optionally with METHOD => table_att (deprecated)
        update_htd_from_arg(htd, arg)

        # set a coprocessor attribute
        valid_coproc_keys = []
        next unless arg.is_a?(Hash)
        arg.each do |key, value|
          k = String.new(key) # prepare to strip
          k.strip!

          next unless k =~ /coprocessor/i
          v = String.new(value)
          v.strip!
          # TODO: We should not require user to config the coprocessor with our inner format.
          htd.addCoprocessorWithSpec(v)
          valid_coproc_keys << key
        end

        valid_coproc_keys.each do |key|
          arg.delete(key)
        end

        hasTableUpdate = true

        arg.each_key do |unknown_key|
          puts(format('Unknown argument ignored: %s', unknown_key))
        end

        next
      end

      # Bulk apply all table modifications.
      if hasTableUpdate
        future = @admin.modifyTableAsync(htd)

        if wait == true
          puts 'Updating all regions with the new schema...'
          future.get
        end
      end
    end
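
    # Usage sketch (hedged; 't1', 'f1', 'f2' and the attribute value are hypothetical;
    # NAME, VERSIONS, METHOD, MAX_FILESIZE come from HBaseConstants):
    #   admin.alter('t1', true, { NAME => 'f1', VERSIONS => 3 })      # alter or add a family
    #   admin.alter('t1', true, { METHOD => 'delete', NAME => 'f2' }) # drop family 'f2'
    #   admin.alter('t1', true, { MAX_FILESIZE => '134217728' })      # set a table attribute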

    def status(format, type)
      cluster_metrics = @admin.getClusterMetrics
      if format == 'detailed'
        puts(format('version %s', cluster_metrics.getHBaseVersion))
        # Put regions in transition first because usually empty
        puts(format('%d regionsInTransition', cluster_metrics.getRegionStatesInTransition.size))
        for v in cluster_metrics.getRegionStatesInTransition
          puts(format('    %s', v))
        end
        master = cluster_metrics.getMasterName
        puts(format('active master:  %s:%d %d', master.getHostname, master.getPort,
                    master.getStartcode))
        puts(format('%d backup masters', cluster_metrics.getBackupMasterNames.size))
        for server in cluster_metrics.getBackupMasterNames
          puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
        end

        master_coprocs = @admin.getMasterCoprocessorNames.toString
        unless master_coprocs.nil?
          puts(format('master coprocessors: %s', master_coprocs))
        end
        puts(format('%d live servers', cluster_metrics.getLiveServerMetrics.size))
        for server in cluster_metrics.getLiveServerMetrics.keySet
          puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
          puts(format('        %s', cluster_metrics.getLiveServerMetrics.get(server).toString))
          for name, region in cluster_metrics.getLiveServerMetrics.get(server).getRegionMetrics
            puts(format('        %s', region.getNameAsString.dump))
            puts(format('            %s', region.toString))
          end
        end
        puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size))
        for server in cluster_metrics.getDeadServerNames
          puts(format('    %s', server))
        end
      elsif format == 'replication'
        puts(format('version %<version>s', version: cluster_metrics.getHBaseVersion))
        puts(format('%<servers>d live servers',
                    servers: cluster_metrics.getLiveServerMetrics.size))
        cluster_metrics.getLiveServerMetrics.keySet.each do |server_name|
          sl = cluster_metrics.getLiveServerMetrics.get(server_name)
          r_sink_string   = '      SINK:'
          r_source_string = '       SOURCE:'
          r_load_sink = sl.getReplicationLoadSink
          next if r_load_sink.nil?

          r_sink_string << ' AgeOfLastAppliedOp=' +
                           r_load_sink.getAgeOfLastAppliedOp.to_s
          r_sink_string << ', TimeStampsOfLastAppliedOp=' +
                           java.util.Date.new(r_load_sink
                             .getTimestampsOfLastAppliedOp).toString
          r_load_source_map = sl.getReplicationLoadSourceMap
          build_source_string(r_load_source_map, r_source_string)
          puts(format('    %<host>s:', host: server_name.getHostname))
          if type.casecmp('SOURCE').zero?
            puts(format('%<source>s', source: r_source_string))
          elsif type.casecmp('SINK').zero?
            puts(format('%<sink>s', sink: r_sink_string))
          else
            puts(format('%<source>s', source: r_source_string))
            puts(format('%<sink>s', sink: r_sink_string))
          end
        end
      elsif format == 'simple'
        load = 0
        regions = 0
        master = cluster_metrics.getMasterName
        puts(format('active master:  %s:%d %d', master.getHostname, master.getPort,
                    master.getStartcode))
        puts(format('%d backup masters', cluster_metrics.getBackupMasterNames.size))
        for server in cluster_metrics.getBackupMasterNames
          puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
        end
        puts(format('%d live servers', cluster_metrics.getLiveServerMetrics.size))
        for server in cluster_metrics.getLiveServerMetrics.keySet
          puts(format('    %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
          puts(format('        %s', cluster_metrics.getLiveServerMetrics.get(server).toString))
          load += cluster_metrics.getLiveServerMetrics.get(server).getRequestCountPerSecond
          regions += cluster_metrics.getLiveServerMetrics.get(server).getRegionMetrics.size
        end
        puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size))
        for server in cluster_metrics.getDeadServerNames
          puts(format('    %s', server))
        end
        puts(format('Aggregate load: %d, regions: %d', load, regions))
      else
        puts "1 active master, #{cluster_metrics.getBackupMasterNames.size} backup masters,
              #{cluster_metrics.getLiveServerMetrics.size} servers,
              #{cluster_metrics.getDeadServerNames.size} dead,
              #{format('%.4f', cluster_metrics.getAverageLoad)} average load"
      end
    end

    def build_source_string(r_load_source_map, r_source_string)
      r_load_source_map.each do |peer, sources|
        r_source_string << ' PeerID=' + peer
        sources.each do |source_load|
          build_queue_title(source_load, r_source_string)
          build_running_source_stats(source_load, r_source_string)
        end
      end
    end

    def build_queue_title(source_load, r_source_string)
      r_source_string << if source_load.isRecovered
                           "\n         Recovered Queue: "
                         else
                           "\n         Normal Queue: "
                         end
      r_source_string << source_load.getQueueId
    end

    def build_running_source_stats(source_load, r_source_string)
      if source_load.isRunning
        build_shipped_stats(source_load, r_source_string)
        build_load_general_stats(source_load, r_source_string)
        r_source_string << ', Replication Lag=' +
                           source_load.getReplicationLag.to_s
      else
        r_source_string << "\n           "
        r_source_string << 'No Reader/Shipper threads running yet.'
      end
    end

    def build_shipped_stats(source_load, r_source_string)
      r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
                           "\n           " \
                           'No Ops shipped since last restart'
                         else
                           "\n           AgeOfLastShippedOp=" +
                           source_load.getAgeOfLastShippedOp.to_s +
                           ', TimeStampOfLastShippedOp=' +
                           java.util.Date.new(source_load
                             .getTimeStampOfLastShippedOp).toString
                         end
    end

    def build_load_general_stats(source_load, r_source_string)
      r_source_string << ', SizeOfLogQueue=' +
                         source_load.getSizeOfLogQueue.to_s
      r_source_string << ', EditsReadFromLogQueue=' +
                         source_load.getEditsRead.to_s
      r_source_string << ', OpsShippedToTarget=' +
                         source_load.getOPsShipped.to_s
      build_edits_for_source(source_load, r_source_string)
    end

    def build_edits_for_source(source_load, r_source_string)
      if source_load.hasEditsSinceRestart
        r_source_string << ', TimeStampOfNextToReplicate=' +
                           java.util.Date.new(source_load
                             .getTimeStampOfNextToReplicate).toString
      else
        r_source_string << ', No edits for this source'
        r_source_string << ' since it started'
      end
    end

    #----------------------------------------------------------------------------------------------
    #
    # Helper methods
    #

    # Does table exist?
    def exists?(table_name)
      @admin.tableExists(TableName.valueOf(table_name))
    end

    #----------------------------------------------------------------------------------------------
    # Is table enabled
    def enabled?(table_name)
      @admin.isTableEnabled(TableName.valueOf(table_name))
    end

    #----------------------------------------------------------------------------------------------
    # Return a new HColumnDescriptor made of passed args
    def hcd(arg, htd)
      # String arg, single parameter constructor
      return org.apache.hadoop.hbase.HColumnDescriptor.new(arg) if arg.is_a?(String)

      raise(ArgumentError, "Column family #{arg} must have a name") unless name = arg.delete(NAME)

      family = htd.getFamily(name.to_java_bytes)
      # create it if it's a new family
      family ||= org.apache.hadoop.hbase.HColumnDescriptor.new(name.to_java_bytes)

      family.setBlockCacheEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE)
      family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE)
      family.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_DATA_ON_WRITE)
      family.setCacheIndexesOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_INDEX_ON_WRITE)
      family.setCacheBloomsOnWrite(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::CACHE_BLOOMS_ON_WRITE)
      family.setEvictBlocksOnClose(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::EVICT_BLOCKS_ON_CLOSE)
      family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION)
        family.setInMemoryCompaction(
          org.apache.hadoop.hbase.MemoryCompactionPolicy.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY_COMPACTION))
        )
      end
      family.setTimeToLive(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
      family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING)
      family.setBlocksize(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
      family.setMaxVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
      family.setMinVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
      family.setKeepDeletedCells(org.apache.hadoop.hbase.KeepDeletedCells.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS).to_s.upcase)) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
      family.setCompressTags(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS)
      family.setPrefetchBlocksOnOpen(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN)
      family.setMobEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IS_MOB)
      family.setMobThreshold(JLong.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_THRESHOLD)
      family.setNewVersionBehavior(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::NEW_VERSION_BEHAVIOR)
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
        bloomtype = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER).upcase.to_sym
        if org.apache.hadoop.hbase.regionserver.BloomType.constants.include?(bloomtype)
          family.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(bloomtype))
        else
          raise(ArgumentError, "BloomFilter type #{bloomtype} is not supported. Use one of " + org.apache.hadoop.hbase.regionserver.BloomType.constants.join(' '))
        end
      end
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
        compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION).upcase.to_sym
        if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
          family.setCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
        else
          raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
        end
      end
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION)
        algorithm = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION).upcase
        family.setEncryptionType(algorithm)
        if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
          key = org.apache.hadoop.hbase.io.crypto.Encryption.pbkdf128(
            arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::ENCRYPTION_KEY)
          )
          family.setEncryptionKey(org.apache.hadoop.hbase.security.EncryptionUtil.wrapKey(@conf, key,
                                                                                          algorithm))
        end
      end
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT)
        compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT).upcase.to_sym
        if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
          family.setCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
        else
          raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
        end
      end
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
        storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
        family.setStoragePolicy(storage_policy)
      end
      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY)
        mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase.to_sym
        if org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.include?(mob_partition_policy)
          family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy))
        else
          raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.join(' '))
        end
      end

      set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
      set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
      if arg.include?(org.apache.hadoop.hbase
        .HColumnDescriptor::DFS_REPLICATION)
        family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase
          .HColumnDescriptor::DFS_REPLICATION)))
      end

      arg.each_key do |unknown_key|
        puts(format('Unknown argument ignored for column family %s: %s', name, unknown_key))
      end

      family
    end
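
    # Usage sketch (hedged; keys other than NAME are the HColumnDescriptor constant
    # strings, and all values shown are hypothetical):
    #   spec = { NAME => 'f1', 'VERSIONS' => '3', 'COMPRESSION' => 'SNAPPY',
    #            'TTL' => '604800', 'BLOOMFILTER' => 'ROW' }
    #   family = hcd(spec, htd) # htd is the table's HTableDescriptor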

    #----------------------------------------------------------------------------------------------
    # Enables/disables a region by name
    def online(region_name, on_off)
      # Open meta table
      meta = @connection.getTable(org.apache.hadoop.hbase.TableName::META_TABLE_NAME)

      # Read region info
      # FIXME: fail gracefully if can't find the region
      region_bytes = region_name.to_java_bytes
      g = org.apache.hadoop.hbase.client.Get.new(region_bytes)
      g.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY, org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER)
      hri_bytes = meta.get(g).value

      # Change region status
      hri = org.apache.hadoop.hbase.util.Writables.getWritable(hri_bytes, org.apache.hadoop.hbase.HRegionInfo.new)
      hri.setOffline(on_off)

      # Write it back
      put = org.apache.hadoop.hbase.client.Put.new(region_bytes)
      put.addColumn(org.apache.hadoop.hbase.HConstants::CATALOG_FAMILY,
                    org.apache.hadoop.hbase.HConstants::REGIONINFO_QUALIFIER,
                    org.apache.hadoop.hbase.util.Writables.getBytes(hri))
      meta.put(put)
    end

    # Apply user metadata to table/column descriptor
    def set_user_metadata(descriptor, metadata)
      raise(ArgumentError, "#{METADATA} must be a Hash type") unless metadata.is_a?(Hash)
      for k, v in metadata
        v = v.to_s unless v.nil?
        descriptor.setValue(k, v)
      end
    end

    #----------------------------------------------------------------------------------------------
    # Take a snapshot of specified table
    def snapshot(table, snapshot_name, *args)
      # Table name should be a string
      raise(ArgumentError, 'Table name must be of type String') unless table.is_a?(String)

      # Snapshot name should be a string
      raise(ArgumentError, 'Snapshot name must be of type String') unless
          snapshot_name.is_a?(String)

      table_name = TableName.valueOf(table)
      if args.empty?
        @admin.snapshot(snapshot_name, table_name)
      else
        args.each do |arg|
          ttl = arg[TTL]
          ttl = ttl ? ttl.to_java(:long) : -1
          snapshot_props = java.util.HashMap.new
          snapshot_props.put("TTL", ttl)
          if arg[SKIP_FLUSH] == true
            @admin.snapshot(snapshot_name, table_name,
                            org.apache.hadoop.hbase.client.SnapshotType::SKIPFLUSH, snapshot_props)
          else
            @admin.snapshot(snapshot_name, table_name, snapshot_props)
          end
        end
      end
    end
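
    # Usage sketch (hedged; 't1' and 'snap1' are hypothetical; TTL and SKIP_FLUSH
    # come from HBaseConstants, with TTL given in seconds):
    #   admin.snapshot('t1', 'snap1')
    #   admin.snapshot('t1', 'snap1', TTL => 86400)       # snapshot expires after a day
    #   admin.snapshot('t1', 'snap1', SKIP_FLUSH => true) # skip the pre-snapshot flush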
1128     #----------------------------------------------------------------------------------------------
1129     # Restore specified snapshot
1130     def restore_snapshot(snapshot_name, restore_acl = false)
1131       conf = @connection.getConfiguration
1132       take_fail_safe_snapshot = conf.getBoolean('hbase.snapshot.restore.take.failsafe.snapshot', false)
1133       @admin.restoreSnapshot(snapshot_name, take_fail_safe_snapshot, restore_acl)
1134     end
1136     #----------------------------------------------------------------------------------------------
1137     # Create a new table by cloning the snapshot content
1138     def clone_snapshot(snapshot_name, table, restore_acl = false)
1139       @admin.cloneSnapshot(snapshot_name, TableName.valueOf(table), restore_acl)
1140     end
1142     #----------------------------------------------------------------------------------------------
1143     # Delete specified snapshot
1144     def delete_snapshot(snapshot_name)
1145       @admin.deleteSnapshot(snapshot_name)
1146     end
1148     #----------------------------------------------------------------------------------------------
1149     # Deletes the snapshots matching the given regex
1150     def delete_all_snapshot(regex)
1151       @admin.deleteSnapshots(Pattern.compile(regex)).to_a
1152     end
1154     #----------------------------------------------------------------------------------------------
1155     # Deletes the table snapshots matching the given regex
1156     def delete_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1157       @admin.deleteTableSnapshots(Pattern.compile(tableNameRegex),
1158         Pattern.compile(snapshotNameRegex)).to_a
1159     end
1161     #----------------------------------------------------------------------------------------------
1162     # Returns a list of snapshots
1163     def list_snapshot(regex = '.*')
1164       @admin.listSnapshots(Pattern.compile(regex)).to_a
1165     end
1167     #----------------------------------------------------------------------------------------------
1168     # Returns a list of table snapshots
1169     def list_table_snapshots(tableNameRegex, snapshotNameRegex = '.*')
1170       @admin.listTableSnapshots(Pattern.compile(tableNameRegex),
1171         Pattern.compile(snapshotNameRegex)).to_a
1172     end
1174     #----------------------------------------------------------------------------------------------
1175     # Returns the whole ClusterMetrics containing details:
1176     #
1177     # hbase version
1178     # cluster id
1179     # primary/backup master(s)
1180     # master's coprocessors
1181     # live/dead regionservers
1182     # balancer
1183     # regions in transition
1184     def getClusterMetrics
1185       @admin.getClusterMetrics
1186     end
1188     #----------------------------------------------------------------------------------------------
1189     # Returns a list of regionservers
1190     def getRegionServers
1191       @admin.getClusterMetrics.getLiveServerMetrics.keySet.map { |server_name| server_name }
1192     end

    #----------------------------------------------------------------------------------------------
    # Returns the ServerName corresponding to the passed server_name_string
    def getServerName(server_name_string)
      regionservers = getRegionServers

      if ServerName.isFullServerName(server_name_string)
        return ServerName.valueOf(server_name_string)
      else
        name_list = server_name_string.split(',')

        regionservers.each do |sn|
          if name_list[0] == sn.hostname && (name_list[1].nil? || name_list[1] == sn.port.to_s)
            return sn
          end
        end
      end

      return nil
    end
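
    # Accepted input formats, for example (values are illustrative):
    #   getServerName('host187.example.com,60020,1289493121758')  # full server name
    #   getServerName('host187.example.com,60020')                # hostname and port
    #   getServerName('host187.example.com')                      # hostname only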

    #----------------------------------------------------------------------------------------------
    # Returns a list of ServerNames
    def getServerNames(servers, should_return_all_if_servers_empty)
      regionservers = getRegionServers
      servernames = []

      if servers.empty?
        # if no servers were specified as arguments, return all servers when asked to
        servernames = regionservers if should_return_all_if_servers_empty
      else
        # Replace server name strings in the servers array with ServerName objects
        servers.each do |server|
          if ServerName.isFullServerName(server)
            servernames.push(ServerName.valueOf(server))
          else
            name_list = server.split(',')
            regionservers.each do |sn|
              if name_list[0] == sn.hostname && (name_list[1].nil? || name_list[1] == sn.port.to_s)
                servernames.push(sn)
              end
            end
          end
        end
      end

      servernames
    end

    # Apply config specific to a table/column to its descriptor
    def set_descriptor_config(descriptor, config)
      raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.is_a?(Hash)
      config.each do |k, v|
        v = v.to_s unless v.nil?
        descriptor.setConfiguration(k, v)
      end
    end
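
    # For example, a CONFIGURATION hash passed down from an alter/create call
    # might look like the following (the key/value pair is illustrative):
    #   set_descriptor_config(htd, 'hbase.hstore.blockingStoreFiles' => '20')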

    #----------------------------------------------------------------------------------------------
    # Updates the configuration of one regionserver.
    def update_config(serverName)
      @admin.updateConfiguration(ServerName.valueOf(serverName))
    end

    #----------------------------------------------------------------------------------------------
    # Updates the configuration of all the regionservers.
    def update_all_config
      @admin.updateConfiguration
    end
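
    # Illustrative shell usage (the server name is an example in
    # 'hostname,port,startcode' form):
    #   hbase> update_config 'host187.example.com,60020,1289493121758'
    #   hbase> update_all_config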

    #----------------------------------------------------------------------------------------------
    # Returns namespace's structure description
    def describe_namespace(namespace_name)
      namespace = @admin.getNamespaceDescriptor(namespace_name)

      return namespace.to_s unless namespace.nil?

      raise(ArgumentError, "Failed to find namespace named #{namespace_name}")
    end

    #----------------------------------------------------------------------------------------------
    # Returns a list of namespaces in hbase
    def list_namespace(regex = '.*')
      pattern = java.util.regex.Pattern.compile(regex)
      list = @admin.listNamespaces
      list.select { |s| pattern.match(s) }
    end

    #----------------------------------------------------------------------------------------------
    # Returns a list of tables in the given namespace
    def list_namespace_tables(namespace_name)
      unless namespace_name.nil?
        return @admin.listTableNamesByNamespace(namespace_name).map(&:getQualifierAsString)
      end

      raise(ArgumentError, 'A namespace name must be specified')
    end

    #----------------------------------------------------------------------------------------------
    # Creates a namespace
    def create_namespace(namespace_name, *args)
      # Fail if namespace name is not a string
      raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)

      # Flatten params array
      args = args.flatten.compact

      # Start defining the namespace
      nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(namespace_name)
      args.each do |arg|
        unless arg.is_a?(Hash)
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
        end
        arg.each do |k, v|
          v = v.to_s unless v.nil?
          nsb.addConfiguration(k, v)
        end
      end
      @admin.createNamespace(nsb.build)
    end
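
    # Illustrative shell usage (namespace and property names are examples):
    #   hbase> create_namespace 'ns1'
    #   hbase> create_namespace 'ns1', {'PROPERTY_NAME' => 'PROPERTY_VALUE'}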

    #----------------------------------------------------------------------------------------------
    # Modify a namespace
    def alter_namespace(namespace_name, *args)
      # Fail if namespace name is not a string
      raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String)

      nsd = @admin.getNamespaceDescriptor(namespace_name)

      raise(ArgumentError, 'Namespace does not exist') unless nsd
      nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(nsd)

      # Flatten params array
      args = args.flatten.compact

      # Apply the requested modifications to the namespace descriptor
      args.each do |arg|
        unless arg.is_a?(Hash)
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
        end
        method = arg[METHOD]
        if method == 'unset'
          nsb.removeConfiguration(arg[NAME])
        elsif method == 'set'
          arg.delete(METHOD)
          arg.each do |k, v|
            v = v.to_s unless v.nil?
            nsb.addConfiguration(k, v)
          end
        else
          raise(ArgumentError, "Unknown method #{method}")
        end
      end
      @admin.modifyNamespace(nsb.build)
    end
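
    # Illustrative shell usage (namespace and property names are examples):
    #   hbase> alter_namespace 'ns1', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'}
    #   hbase> alter_namespace 'ns1', {METHOD => 'unset', NAME => 'PROPERTY_NAME'}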

    #----------------------------------------------------------------------------------------------
    # Drops a namespace
    def drop_namespace(namespace_name)
      @admin.deleteNamespace(namespace_name)
    end

    #----------------------------------------------------------------------------------------------
    # Get security capabilities
    def get_security_capabilities
      @admin.getSecurityCapabilities
    end

    # List all procedures
    def list_procedures
      @admin.getProcedures
    end

    # List all locks
    def list_locks
      @admin.getLocks
    end

    # Parse arguments and update HTableDescriptor accordingly
    def update_htd_from_arg(htd, arg)
      htd_const = org.apache.hadoop.hbase.HTableDescriptor
      htd.setOwnerString(arg.delete(htd_const::OWNER)) if arg.include?(htd_const::OWNER)
      htd.setMaxFileSize(JLong.valueOf(arg.delete(htd_const::MAX_FILESIZE))) if arg.include?(htd_const::MAX_FILESIZE)
      htd.setReadOnly(JBoolean.valueOf(arg.delete(htd_const::READONLY))) if arg.include?(htd_const::READONLY)
      htd.setCompactionEnabled(JBoolean.valueOf(arg.delete(htd_const::COMPACTION_ENABLED))) if arg.include?(htd_const::COMPACTION_ENABLED)
      htd.setSplitEnabled(JBoolean.valueOf(arg.delete(htd_const::SPLIT_ENABLED))) if arg.include?(htd_const::SPLIT_ENABLED)
      htd.setMergeEnabled(JBoolean.valueOf(arg.delete(htd_const::MERGE_ENABLED))) if arg.include?(htd_const::MERGE_ENABLED)
      htd.setNormalizationEnabled(JBoolean.valueOf(arg.delete(htd_const::NORMALIZATION_ENABLED))) if arg.include?(htd_const::NORMALIZATION_ENABLED)
      htd.setNormalizerTargetRegionCount(JInteger.valueOf(arg.delete(htd_const::NORMALIZER_TARGET_REGION_COUNT))) if arg.include?(htd_const::NORMALIZER_TARGET_REGION_COUNT)
      htd.setNormalizerTargetRegionSize(JLong.valueOf(arg.delete(htd_const::NORMALIZER_TARGET_REGION_SIZE))) if arg.include?(htd_const::NORMALIZER_TARGET_REGION_SIZE)
      htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(htd_const::MEMSTORE_FLUSHSIZE))) if arg.include?(htd_const::MEMSTORE_FLUSHSIZE)
      htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(htd_const::DURABILITY))) if arg.include?(htd_const::DURABILITY)
      htd.setPriority(JInteger.valueOf(arg.delete(htd_const::PRIORITY))) if arg.include?(htd_const::PRIORITY)
      htd.setFlushPolicyClassName(arg.delete(htd_const::FLUSH_POLICY)) if arg.include?(htd_const::FLUSH_POLICY)
      htd.setRegionMemstoreReplication(JBoolean.valueOf(arg.delete(htd_const::REGION_MEMSTORE_REPLICATION))) if arg.include?(htd_const::REGION_MEMSTORE_REPLICATION)
      htd.setRegionSplitPolicyClassName(arg.delete(htd_const::SPLIT_POLICY)) if arg.include?(htd_const::SPLIT_POLICY)
      htd.setRegionReplication(JInteger.valueOf(arg.delete(htd_const::REGION_REPLICATION))) if arg.include?(htd_const::REGION_REPLICATION)
      set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
      set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
    end

    #----------------------------------------------------------------------------------------------
    # Clear compaction queues ('long', 'short', or both) on a regionserver
    def clear_compaction_queues(server_name, queue_name = nil)
      names = %w[long short]
      queues = java.util.HashSet.new
      if queue_name.nil?
        queues.add('long')
        queues.add('short')
      elsif queue_name.is_a?(String)
        raise(ArgumentError, "Unknown queue name #{queue_name}") unless names.include?(queue_name)
        queues.add(queue_name)
      elsif queue_name.is_a?(Array)
        queue_name.each do |s|
          raise(ArgumentError, "Unknown queue name #{s}") unless names.include?(s)
          queues.add(s)
        end
      else
        raise(ArgumentError, "#{queue_name.class} of #{queue_name.inspect} is not of String/Array type")
      end
      @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues)
    end
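
    # Illustrative shell usage (the server name is an example):
    #   hbase> clear_compaction_queues 'host187.example.com,60020'
    #   hbase> clear_compaction_queues 'host187.example.com,60020', ['long', 'short']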

    #----------------------------------------------------------------------------------------------
    # List dead region servers
    def list_deadservers
      @admin.listDeadServers.to_a
    end

    #----------------------------------------------------------------------------------------------
    # Clear dead region servers
    def clear_deadservers(dead_servers)
      # Flatten params array
      dead_servers = dead_servers.flatten.compact
      if dead_servers.empty?
        servers = list_deadservers
      else
        servers = java.util.ArrayList.new
        dead_servers.each do |s|
          servers.add(ServerName.valueOf(s))
        end
      end
      @admin.clearDeadServers(servers).to_a
    end

    #----------------------------------------------------------------------------------------------
    # List live region servers
    def list_liveservers
      @admin.getClusterMetrics.getLiveServerMetrics.keySet.to_a
    end

    #----------------------------------------------------------------------------------------------
    # Create a new table by cloning the existing table schema
    def clone_table_schema(table_name, new_table_name, preserve_splits = true)
      @admin.cloneTableSchema(TableName.valueOf(table_name),
                              TableName.valueOf(new_table_name),
                              preserve_splits)
    end
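
    # Illustrative shell usage (table names are examples):
    #   hbase> clone_table_schema 'table_name', 'new_table_name'         # preserves the splits
    #   hbase> clone_table_schema 'table_name', 'new_table_name', false  # does not preserve the splits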

    #----------------------------------------------------------------------------------------------
    # List decommissioned RegionServers
    def list_decommissioned_regionservers
      @admin.listDecommissionedRegionServers
    end

    #----------------------------------------------------------------------------------------------
    # Retrieve SlowLog Responses from RegionServers
    def get_slowlog_responses(server_names, args)
      unless server_names.is_a?(Array) || server_names.is_a?(String)
        raise(ArgumentError,
              "#{server_names.class} of #{server_names.inspect} is not of Array/String type")
      end
      if server_names == '*'
        server_names = getServerNames([], true)
      else
        server_names_list = to_server_names(server_names)
        server_names = getServerNames(server_names_list, false)
      end
      filter_params = get_filter_params(args)
      slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
                                                      filter_params)
      slow_log_responses_arr = []
      slow_log_responses.each do |slow_log_response|
        slow_log_responses_arr << slow_log_response.toJsonPrettyPrint
      end
      puts 'Retrieved SlowLog Responses from RegionServers'
      puts slow_log_responses_arr
    end
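
    # Illustrative shell usage (filter keys match those handled by
    # get_filter_params below; server names and values are examples):
    #   hbase> get_slowlog_responses '*', {}
    #   hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1', 'LIMIT' => 50}
    #   hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'], {'USER' => 'user_name'}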

    # Build a SlowLogQueryFilter from the optional filter arguments
    def get_filter_params(args)
      filter_params = org.apache.hadoop.hbase.client.SlowLogQueryFilter.new
      filter_params.setRegionName(args['REGION_NAME']) if args.key? 'REGION_NAME'
      filter_params.setTableName(args['TABLE_NAME']) if args.key? 'TABLE_NAME'
      filter_params.setClientAddress(args['CLIENT_IP']) if args.key? 'CLIENT_IP'
      filter_params.setUserName(args['USER']) if args.key? 'USER'
      filter_params.setLimit(args['LIMIT']) if args.key? 'LIMIT'
      filter_params
    end

    #----------------------------------------------------------------------------------------------
    # Clears SlowLog Responses from RegionServers
    def clear_slowlog_responses(server_names)
      unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String)
        raise(ArgumentError,
              "#{server_names.class} of #{server_names.inspect} is not of Array/String type")
      end
      if server_names.nil?
        server_names = getServerNames([], true)
      else
        server_names_list = to_server_names(server_names)
        server_names = getServerNames(server_names_list, false)
      end
      clear_log_responses = @admin.clearSlowLogResponses(java.util.HashSet.new(server_names))
      # Count the regionservers that successfully cleared their slow log responses
      clear_log_success_count = 0
      clear_log_responses.each do |response|
        clear_log_success_count += 1 if response
      end
      puts 'Cleared Slowlog responses from ' \
           "#{clear_log_success_count}/#{clear_log_responses.size} RegionServers"
    end

    #----------------------------------------------------------------------------------------------
    # Decommission a list of region servers, optionally offload corresponding regions
    def decommission_regionservers(host_or_servers, should_offload)
      # Fail if host_or_servers is neither a string nor an array
      unless host_or_servers.is_a?(Array) || host_or_servers.is_a?(String)
        raise(ArgumentError,
              "#{host_or_servers.class} of #{host_or_servers.inspect} is not of Array/String type")
      end

      # Fail if should_offload is neither a TrueClass/FalseClass nor a string
      unless (!!should_offload == should_offload) || should_offload.is_a?(String)
        raise(ArgumentError, "#{should_offload} is not a boolean value")
      end

      # If a string is passed, convert it to a single-element list
      _host_or_servers = host_or_servers.is_a?(Array) ?
                         host_or_servers :
                         java.util.Arrays.asList(host_or_servers)

      # Retrieve the server names corresponding to the passed _host_or_servers list
      server_names = getServerNames(_host_or_servers, false)

      # Fail if we cannot find any server(s) corresponding to the passed host_or_servers
      if server_names.empty?
        raise(ArgumentError,
              "Could not find any server(s) with specified name(s): #{host_or_servers}")
      end

      @admin.decommissionRegionServers(server_names,
                                       java.lang.Boolean.valueOf(should_offload))
    end
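
    # Illustrative shell usage (server names are examples; the second argument
    # controls whether regions are offloaded):
    #   hbase> decommission_regionservers 'server', false
    #   hbase> decommission_regionservers ['server1,port,starttime', 'server2'], true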

    #----------------------------------------------------------------------------------------------
    # Recommission a region server, optionally load a list of passed regions
    def recommission_regionserver(server_name_string, encoded_region_names)
      # Fail if server_name_string is not a string
      unless server_name_string.is_a?(String)
        raise(ArgumentError,
              "#{server_name_string.class} of #{server_name_string.inspect} is not of String type")
      end

      # Fail if encoded_region_names is not an array
      unless encoded_region_names.is_a?(Array)
        raise(ArgumentError,
              "#{encoded_region_names.class} of #{encoded_region_names.inspect} is not of Array type")
      end

      # Convert encoded_region_names from string to bytes (element-wise)
      region_names_in_bytes = encoded_region_names
                              .map { |region_name| region_name.to_java_bytes }
                              .compact

      # Retrieve the server name corresponding to the passed server_name_string
      server_name = getServerName(server_name_string)

      # Fail if we cannot find a server corresponding to the passed server_name_string
      if server_name.nil?
        raise(ArgumentError,
              "Could not find any server with name #{server_name_string}")
      end

      @admin.recommissionRegionServer(server_name, region_names_in_bytes)
    end

    #----------------------------------------------------------------------------------------------
    # Stop the active Master
    def stop_master
      @admin.stopMaster
    end

    # Stop the given RegionServer
    def stop_regionserver(hostport)
      @admin.stopRegionServer(hostport)
    end

    #----------------------------------------------------------------------------------------------
    # Normalize a server name argument: wrap a single String into a list,
    # pass an Array through unchanged
    def to_server_names(server_names)
      if server_names.is_a?(Array)
        server_names
      else
        java.util.Arrays.asList(server_names)
      end
    end
  end
  # rubocop:enable Metrics/ClassLength