Checking in changes prior to tagging of version 2.66.
[MogileFS-Server.git] / lib / MogileFS / Rebalance.pm
blob9a17ca9b515b73b07174988c3d64ed1a90551a12
1 package MogileFS::Rebalance;
2 use strict;
3 use warnings;
4 use Carp qw(croak);
5 use List::Util ();
6 use MogileFS::Server ();
# Note: The filters aren't written for maximum speed, as they're not likely
# to be in a hot path. They're meant to be readable and extensible, so please
# don't compress them unless you have to.
11 # TODO: allow filters to note by dev why they were filtered in/out, and return
12 # that info for DEBUG display.
13 # TODO: Add "debug trace" lines to most functions. "choosing sdev to work on",
14 # etc.
15 # TODO: tally into the state how many fids/size/etc it's done so far.
16 # TODO: should track old device state and return to it. Overall this is
17 # probably better fit by switching "device state" to a set of "device flags",
18 # so we can disable specifically "stop getting new files" while we work :(
# Default policy. Every recognised policy field is listed here with its
# default value: policy() parses user settings against this hash, so
# _parse_settings() rejects any key not present. Fields whose default is an
# ARRAY ref accept comma-separated lists.
# NOTE(review): nothing here enforces a minimum set of fields for a policy
# to be considered valid yet.
my %default_policy = (
    # --- source selection ---
    from_all_devs     => 1,
    from_hosts        => [],        # host ids
    from_devices      => [],        # device ids
    from_percent_used => undef,     # percent, i.e. 0.nn * 100
    from_percent_free => undef,
    from_space_used   => undef,
    from_space_free   => undef,
    fid_age           => 'old',     # old|new
    limit_type        => 'device',  # global|device
    limit_by          => 'none',    # size|count|percent|none
    limit             => undef,     # e.g. 100g|10%|5000

    # --- destination selection ---
    to_all_devs         => 1,
    to_hosts            => [],
    to_devices          => [],
    to_percent_used     => undef,
    to_percent_free     => undef,
    to_space_used       => undef,
    to_space_free       => undef,
    not_to_hosts        => [],
    not_to_devices      => [],
    use_dest_devs       => 'all',   # all|N (list up to N devices to the replication policy)
    leave_in_drain_mode => 0,
);
# Default rebalance state. init() builds fresh state from this hash, and
# load_state() parses saved state against it (supplying defaults for missing
# keys and rejecting unknown ones). The array values are references, so any
# copy that will be mutated needs its own arrays.
my %default_state = (
    # device bookkeeping
    completed_devs => [],
    source_devs    => [],
    sdev_current   => 0,
    sdev_lastfid   => 0,
    sdev_limit     => 0,
    limit          => 0,

    # running totals
    fids_queued  => 0,
    bytes_queued => 0,
    empty_runs   => 0,

    # timestamps (epoch seconds; 0 means "not yet")
    time_started          => 0,
    time_finished         => 0,
    time_stopped          => 0,
    time_latest_run       => 0,
    time_latest_empty_run => 0,
);
# Construct a rebalance object. Both arguments are optional serialized
# strings: a policy ("key=value ...") that is handed to policy(), and saved
# state (as produced by save_state()) that is handed to load_state().
# Until they are loaded, the policy and state slots hold ''.
sub new {
    my ($class, $policy, $state) = @_;

    my $self = bless { policy => '', state => '' }, $class;

    # TODO: validate the policy here?
    $self->policy($policy)    if $policy;
    $self->load_state($state) if $state;

    return $self;
}
# Initialize fresh rebalance state for the loaded policy.
#   $devs - arrayref of device objects; filtered through the policy's
#           source rules to build the queue of source device ids.
# Croaks if state already exists or no device list is passed in. Returns
# the new state hashref.
sub init {
    my $self = shift;
    my $devs = shift;

    croak "policy object already initialized" if $self->{state};
    croak "please pass in devices to filter" unless $devs && ref($devs);
    my $p = $self->{policy};

    # Start from the defaults, giving every array-valued field its own
    # array: a shallow copy would share %default_state's arrayrefs, so pushes
    # onto completed_devs would leak into the defaults and into every later
    # rebalance object.
    my %state;
    for my $key (keys %default_state) {
        my $val = $default_state{$key};
        $state{$key} = ref($val) eq 'ARRAY' ? [@$val] : $val;
    }

    # Discover the source devices up front. The list is also used later to
    # keep source devices out of the destination list.
    $state{source_devs}  = $self->filter_source_devices($devs);
    $state{time_started} = time();

    # A global limit spans every source device, so it is seeded once here.
    # (Per-device limits are seeded as each source device is picked.)
    # Without this, the global counter stays at its default of 0 and every
    # fid is rejected.
    if ($p && $p->{limit_type} eq 'global') {
        if ($p->{limit_by} eq 'size') {
            $state{limit} = $self->_human_to_bytes($p->{limit});
        } elsif ($p->{limit_by} eq 'count') {
            $state{limit} = $p->{limit};
        } elsif ($p->{limit_by} eq 'percent') {
            croak("policy size limits by percent are unimplemented");
        }
    }

    $self->{state} = \%state;
}
# Stop the rebalance. Unless the policy asks to leave devices in drain
# mode, the source device currently being worked on is set back to
# 'alive'. The stop time is then recorded in the state.
sub stop {
    my $self = shift;
    my $p = $self->{policy};
    my $s = $self->{state};
    # The in-progress source device is tracked in the state hash (see
    # _find_source_device), not directly on the object.
    my $sdev = $s->{sdev_current};
    unless ($p->{leave_in_drain_mode}) {
        Mgd::get_store()->set_device_state($sdev, 'alive') if $sdev;
    }
    $s->{time_stopped} = time();
}
# Mark the rebalance as finished by stamping the completion time into the
# state. Returns that timestamp.
sub finish {
    my ($self) = @_;
    my $state = $self->{state};
    return $state->{time_finished} = time();
}
# Resume from a saved state string (as produced by save_state()). The
# string is parsed against %default_state, which supplies defaults for keys
# that are missing and rejects keys it does not know.
sub load_state {
    my ($self, $saved) = @_;
    # TODO: validate state?
    $self->{state} = $self->_parse_settings($saved, \%default_state);
}
# Serialize the current state to a "key=value ..." string that
# load_state() can read back.
# TODO: call this as_string()? merge into load_state as "state"?
sub save_state {
    my ($self) = @_;
    my $state = $self->{state};
    return $self->_save_settings($state);
}
# Return the arrayref of source device ids still queued for rebalancing.
# The queue lives in the state hash built by init()/load_state(). Before
# any state exists, this returns undef.
sub source_devices {
    my $self  = shift;
    my $state = $self->{state};
    return unless $state;
    return $state->{source_devs};
}
# Policy accessor/mutator. With no argument, returns the current policy
# (the parsed hashref, or '' as set by new() when none was loaded). With a
# "key=value ..." string, parses it against %default_policy, stores the
# result and returns it.
sub policy {
    my ($self, @args) = @_;
    # TODO: serialize it or pass a structure?
    return $self->{policy} if !@args;

    $self->{policy} = $self->_parse_settings($args[0], \%default_policy);
    return $self->{policy};
}
# Serialize a flat settings hash into "key=value key=value" form, the format
# _parse_settings() reads. ARRAY values are written comma-joined (ARRAY is
# the only reference type supported). undef is written as an empty value.
# Keys are emitted in sorted order so the output is deterministic.
sub _save_settings {
    my $self     = shift;
    my $settings = shift;
    my @tosave   = ();
    for my $key (sort keys %{$settings}) {
        my $val = $settings->{$key};
        if (ref($val) eq 'ARRAY') {
            $val = join(',', map { defined $_ ? $_ : '' } @$val);
        } elsif (!defined $val) {
            # Avoid "uninitialized" warnings; parses back as an empty value.
            $val = '';
        }
        push(@tosave, $key . '=' . $val);
    }
    return join(' ', @tosave);
}
# Parse a settings string such as "foo=bar foo2=bar2 foo3=baaz,quux" into a
# hashref.
#   $settings   - the serialized string; a false value returns undef.
#   $constraint - optional hashref of allowed keys and their defaults. When
#                 given, unknown keys croak ("Invalid setting ..."), and keys
#                 whose default is an ARRAY ref always parse to an ARRAY ref.
# A ',' in a value always makes it an ARRAY ref. "key=" (as _save_settings
# writes for undef or an empty list) yields [] for array-typed keys and
# undef otherwise.
sub _parse_settings {
    my $self       = shift;
    my $settings   = shift;
    my $constraint = shift || '';
    my %parsed     = ();

    # The constraint also serves as a set of defaults. Array defaults are
    # copied so callers that push onto the result can't modify the shared
    # default hash.
    if ($constraint) {
        for my $key (keys %{$constraint}) {
            my $default = $constraint->{$key};
            $parsed{$key} = ref($default) eq 'ARRAY' ? [@$default] : $default;
        }
    }

    return unless $settings;
    # Awk-style split on runs of whitespace, so repeated or leading spaces
    # don't produce empty tuples.
    for my $tuple (split ' ', $settings) {
        # Limit to two fields so a value may itself contain '='.
        my ($key, $value) = split /=/, $tuple, 2;
        undef $value if defined $value && !length $value;
        if (defined $value && index($value, ',') > -1) {
            # ',' is reserved for multivalue types.
            $value = [split /,/, $value];
        }
        # In the future we could do stronger type checking at load
        # time, but for now this will happen at use time :/
        if ($constraint) {
            croak "Invalid setting $key" unless exists $constraint->{$key};
            my $c = $constraint->{$key};
            if (ref($c) eq 'ARRAY' && !ref($value)) {
                # The default says we should be an array; no value means an
                # empty list.
                $parsed{$key} = defined $value ? [$value] : [];
            } else {
                $parsed{$key} = $value;
            }
        } else {
            $parsed{$key} = $value;
        }
    }
    return \%parsed;
}
# Step through the filters and return the next batch of fids to rebalance,
# as an arrayref of [fid_id, source_dev_id, \@dest_dev_ids] triples.
#   $devs  - arrayref of device objects, used to pick destinations. (Should
#            device info be passed in? Probably yes.)
#   $sto   - storage object; required, but only checked for presence here.
#            TODO: should we fetch it ourselves instead?
#   $limit - maximum number of fids to pull from the source device
#            (default 100, an arbitrary low value).
# Returns undef when no source devices are left. Returns an empty arrayref
# to mean "try again": either the source device was just finished, or
# nothing in this chunk qualified.
sub next_fids_to_rebalance {
    my ($self, $devs, $sto, $limit) = @_;
    $limit ||= 100;

    # TODO: balk unless we also have a state?
    my $policy = $self->{policy};
    croak "No policy loaded"            unless $policy;
    croak "Must pass in device list"    unless $devs;
    croak "Must pass in storage object" unless $sto;
    my $state = $self->{state};

    # Continue with the current source device, or claim the next one.
    my $sdev_id = $self->_find_source_device($state->{source_devs});
    return undef if !$sdev_id;
    my $sdev = Mgd::device_factory()->get_by_id($sdev_id);

    my $dest_pool = $self->filter_dest_devices($devs);
    if (!@$dest_pool) {
        croak("rebalance cannot find suitable destination devices");
    }

    my @fids = $sdev->fid_chunks(
        age   => $policy->{fid_age},
        fidid => $state->{sdev_lastfid},
        limit => $limit,
    );

    # Out of fids (or out of limit) on this device: finish it now and let
    # the next cycle pick a new source device.
    unless (@fids && $self->_check_limits) {
        $self->_finish_source_device;
        return [];
    }

    # For both 'old' and 'new' ordering, the final fid is the resume point.
    $state->{sdev_lastfid} = $fids[-1]->id;

    # TODO: create a filterset for $fid settings. filesize, class, domain, etc.
    my @batch;
    for my $fid (@fids) {
        next if !$fid->exists;
        # Charge the fid's count or size against the limit; skip it if it
        # doesn't fit.
        next if !$self->_check_limits($fid);
        my $dest_ids = $self->_choose_dest_devs($fid, $dest_pool);
        $state->{fids_queued} += 1;
        $state->{bytes_queued} += $fid->length;
        push @batch, [$fid->id, $sdev->id, $dest_ids];
    }

    $state->{time_latest_run} = time;
    if (!@batch) {
        $state->{empty_runs} += 1;
        $state->{time_latest_empty_run} = time;
    }

    return \@batch;
}
# Check (and, when a fid is given, consume) the remaining rebalance limit.
#   $fid - optional fid object.
#          With a fid: returns true and charges the fid against the limit
#          if it fits; returns false otherwise.
#          Without a fid: returns whether any useful limit remains.
# limit_type 'global' uses $state->{limit}; any other type uses the
# per-source-device $state->{sdev_limit}. limit_by 'none' always passes.
# Croaks on an unknown limit_by.
sub _check_limits {
    my $self = shift;
    my $fid  = shift;
    my $p = $self->{policy};
    my $s = $self->{state};
    return 1 if ($p->{limit_by} eq 'none');

    my $limit;
    if ($p->{limit_type} eq 'global') {
        $limit = \$s->{limit};
    } else {
        $limit = \$s->{sdev_limit};
    }

    if ($p->{limit_by} eq 'count') {
        # Test before decrementing, and never go below zero: a negative
        # count is truthy in Perl and would re-open an exhausted limit.
        return 0 unless $$limit > 0;
        $$limit-- if $fid;
        return 1;
    } elsif ($p->{limit_by} eq 'size') {
        if ($fid) {
            my $length = $fid->length();
            return 0 if $length > $$limit;
            $$limit -= $length;
            return 1;
        }
        # Arbitrary "give up if there's less than 1kb in the limit".
        # FIXME: Make this configurable
        return $$limit < 1024 ? 0 : 1;
    } else {
        croak("unknown limit_by type");
    }
}
# Pick destination devices for a fid. The filtered candidate ids are
# shuffled; all of them are returned when the policy's use_dest_devs is
# 'all', otherwise only the first use_dest_devs of them.
# TODO: use the fid->length to ensure we don't send the file to devices
# that can't handle it.
sub _choose_dest_devs {
    my ($self, $fid, $filtered_devs) = @_;
    my $policy = $self->{policy};

    my @candidates = List::Util::shuffle(@$filtered_devs);
    if ($policy->{use_dest_devs} ne 'all') {
        @candidates = splice(@candidates, 0, $policy->{use_dest_devs});
    }
    return \@candidates;
}
# Apply the policy's "from_*" rules to a list of device objects and return
# an arrayref of the ids of devices eligible as rebalance sources.
# A device must allow deletion (can_delete_from), and every configured rule
# must pass. Thresholds are exclusive ("more than"). A device lacking the
# stats a rule needs is skipped.
sub filter_source_devices {
    my $self   = shift;
    my $devs   = shift;
    my $policy = $self->{policy};

    my @sdevs = ();
    for my $dev (@$devs) {
        next unless $dev->can_delete_from;
        my $id = $dev->id;
        if (@{$policy->{from_devices}}) {
            next unless grep { $_ == $id } @{$policy->{from_devices}};
        }
        if (@{$policy->{from_hosts}}) {
            my $hostid = $dev->hostid;
            next unless grep { $_ == $hostid } @{$policy->{from_hosts}};
        }
        # "more than this percent used"
        if ($policy->{from_percent_used}) {
            # percent_full is expected to return undef without stats. Test
            # before scaling, since undef * 100 would silently become 0.
            my $full = $dev->percent_full;
            next unless defined $full;
            next unless $full * 100 > $policy->{from_percent_used};
        }
        # "more than this percent free"
        if ($policy->{from_percent_free}) {
            # percent_free returns *0* if lacking stats. Must fix :(
            my $free = $dev->percent_free;
            next unless $free; # hope this never lands at exact zero.
            next unless $free * 100 > $policy->{from_percent_free};
        }
        # "more than this many MB used"
        if ($policy->{from_space_used}) {
            my $used = $dev->mb_used;
            next unless $used && $used > $policy->{from_space_used};
        }
        # "more than this many MB free"
        if ($policy->{from_space_free}) {
            my $free = $dev->mb_free;
            next unless $free && $free > $policy->{from_space_free};
        }
        push @sdevs, $id;
    }

    return \@sdevs;
}
# Wrap up the source device currently in progress:
#   - forget its per-device progress (sdev_lastfid, sdev_limit);
#   - set it back to 'alive' unless the policy says to leave it in drain
#     mode;
#   - append it to completed_devs.
# Croaks if no source device is in progress.
sub _finish_source_device {
    my ($self) = @_;
    my ($state, $policy) = @{$self}{qw(state policy)};

    croak "Not presently working on a source device"
        if !$state->{sdev_current};

    delete @{$state}{qw(sdev_lastfid sdev_limit)};
    my $finished = delete $state->{sdev_current};

    # Unless the user wants the device to never get new files again
    # (i.e. stay in drain mode), return it to alive.
    if (!$policy->{leave_in_drain_mode}) {
        Mgd::get_store()->set_device_state($finished, 'alive');
    }
    push @{$state->{completed_devs}}, $finished;
}
# Return the id of the source device currently being rebalanced. If none
# is in progress, claim the next one from the queue.
#   $sdevs - arrayref queue of candidate source device ids (normally
#            $state->{source_devs}); a claimed id is shifted off it.
# Claiming a device does three things:
#   - resets sdev_lastfid to 0;
#   - seeds sdev_limit when limit_type is 'device';
#   - sets the device to 'drain' while it is worked on.
# Returns nothing (undef in scalar context) once the queue is empty.
# Croaks for the unimplemented 'percent' limit; the queue and state are left
# untouched in that case.
# TODO: Be aware of down/unavail devices. temp skip them?
sub _find_source_device {
    my $self  = shift;
    my $sdevs = shift;

    my $state = $self->{state};
    my $p     = $self->{policy};
    unless ($state->{sdev_current}) {
        # Peek rather than shift, so a croak below can't drop the device
        # from the queue or leave it half-claimed.
        my $sdev = $sdevs->[0];
        return unless $sdev;

        my $limit;
        if ($p->{limit_type} eq 'device') {
            if ($p->{limit_by} eq 'size') {
                # Parse the size (optionally with a unit suffix) into bytes.
                $limit = $self->_human_to_bytes($p->{limit});
            } elsif ($p->{limit_by} eq 'count') {
                $limit = $p->{limit};
            } elsif ($p->{limit_by} eq 'percent') {
                croak("policy size limits by percent are unimplemented");
            } elsif ($p->{limit_by} eq 'none') {
                $limit = 'none';
            }
        }

        shift @$sdevs;
        $state->{sdev_current} = $sdev;
        $state->{sdev_lastfid} = 0;
        # Must mark device in "drain" mode while we work on it.
        Mgd::get_store()->set_device_state($sdev, 'drain');
        $state->{sdev_limit} = $limit;
    }

    return $state->{sdev_current};
}
# FIXME: Move to MogileFS::Util
# Convert a number with an optional unit suffix into bytes. Accepted
# suffixes are b, k, m, g, t and p (case-insensitive, powers of 1024). No
# suffix, or 'b', means the number is already in bytes and is returned as
# given. Croaks on anything else, including undef.
sub _human_to_bytes {
    my $self = shift;
    my $num  = shift;

    my ($digits, $type);
    if (defined $num && $num =~ m/^(\d+)([bkmgtp])?$/i) {
        $digits = $1;
        # No suffix means plain bytes; avoid lc(undef) warnings.
        $type = defined $2 ? lc($2) : 'b';
    } else {
        croak("Don't know what this number is: "
            . (defined $num ? $num : 'undef'));
    }

    return $digits if $type eq 'b';
    # Sorry, being cute here :P  The suffix's position in this string is its
    # power of 1024 ('k' => 1, 'm' => 2, ...).
    return $digits * (1024 ** index('bkmgtpezy', $type));
}
# Apply the policy's "to_*" and "not_to_*" rules to a list of device objects
# and return an arrayref of ids eligible as rebalance destinations.
# Excluded outright:
#   - devices that are queued, finished or in-progress sources (source_devs,
#     completed_devs, sdev_current);
#   - devices that should not get new files.
# The "used" thresholds are upper bounds ("less than"); the "free"
# thresholds are lower bounds ("more than"). A device lacking the stats a
# rule needs is skipped.
sub filter_dest_devices {
    my $self   = shift;
    my $devs   = shift;
    my $policy = $self->{policy};
    my $state  = $self->{state};

    # Skip anything we would source from, are sourcing from, or already
    # drained. sdev_current is undef between source devices, so drop undefs
    # rather than turning them into a '' key.
    my %sdevs = map { $_ => 1 } grep { defined }
        @{$state->{source_devs}}, @{$state->{completed_devs}},
        $state->{sdev_current};
    my @devs = grep { ! $sdevs{$_->id} } @$devs;

    my @ddevs = ();
    for my $dev (@devs) {
        next unless $dev->should_get_new_files;
        my $id     = $dev->id;
        my $hostid = $dev->hostid;

        if (@{$policy->{to_devices}}) {
            next unless grep { $_ == $id } @{$policy->{to_devices}};
        }
        if (@{$policy->{to_hosts}}) {
            next unless grep { $_ == $hostid } @{$policy->{to_hosts}};
        }
        if (@{$policy->{not_to_devices}}) {
            next if grep { $_ == $id } @{$policy->{not_to_devices}};
        }
        if (@{$policy->{not_to_hosts}}) {
            next if grep { $_ == $hostid } @{$policy->{not_to_hosts}};
        }
        # "less than this percent used"
        if ($policy->{to_percent_used}) {
            # Test for missing stats before scaling; undef * 100 would be 0.
            my $full = $dev->percent_full;
            next unless defined $full;
            next unless $full * 100 < $policy->{to_percent_used};
        }
        # "more than this percent free"
        if ($policy->{to_percent_free}) {
            my $free = $dev->percent_free;
            next unless $free; # hope this never lands at exact zero.
            next unless $free * 100 > $policy->{to_percent_free};
        }
        # "less than this many MB used"; an empty (0 MB) device qualifies.
        if ($policy->{to_space_used}) {
            my $used = $dev->mb_used;
            next unless defined $used && $used < $policy->{to_space_used};
        }
        # "more than this many MB free"
        if ($policy->{to_space_free}) {
            my $free = $dev->mb_free;
            next unless $free && $free > $policy->{to_space_free};
        }
        push @ddevs, $id;
    }

    return \@ddevs;
}