1 package MogileFS
::Rebalance
;
6 use MogileFS
::Server
();
8 # Note: The filters aren't written for maximum speed, as they're not likely
9 # in the slow path. They're supposed to be readable/extensible. Please don't
10 # cram them down unless you have to.
11 # TODO: allow filters to note by dev why they were filtered in/out, and return
12 # that info for DEBUG display.
13 # TODO: Add "debug trace" lines to most functions. "choosing sdev to work on",
15 # TODO: tally into the state how many fids/size/etc it's done so far.
16 # TODO: should track old device state and return to it. Overall this is
17 # probably better fit by switching "device state" to a set of "device flags",
18 # so we can disable specifically "stop getting new files" while we work :(
20 # The default policy structure consists of all of these fields.
21 # A minimum set of fields should be defined for a policy to be valid.
# This table doubles as the constraint/default set passed to
# _parse_settings() when a policy string is loaded.
22 my %default_policy = (
25 from_hosts
=> [], # host ids.
26 from_devices
=> [], # dev ids.
27 from_percent_used
=> undef, # 0.nn * 100
28 from_percent_free
=> undef,
29 from_space_used
=> undef, # compared against the device's mb_used
30 from_space_free
=> undef, # compared against the device's mb_free
31 fid_age
=> 'old', # old|new
32 limit_type
=> 'device', # global|device
33 limit_by
=> 'none', # size|count|percent|none
34 limit
=> undef, # 100g|10%|5000
39 to_percent_used
=> undef,
40 to_percent_free
=> undef,
41 to_space_used
=> undef,
42 to_space_free
=> undef,
45 use_dest_devs
=> 'all', # all|N (list up to N devices to rep pol)
46 leave_in_drain_mode
=> 0,
# NOTE(review): the entries from here on look like they belong to a separate
# state-defaults table (%default_state is referenced elsewhere in this
# file); its opening declaration is not visible at this point -- confirm.
49 # State policy example
63 time_latest_empty_run
=> 0,
# Fragment of an initialising method (its `sub` line is not visible here).
# Both arguments are optional; each one is only applied when it is a true
# value, so an empty string or 0 is treated the same as "not supplied".
69 my $policy = shift || "";
70 my $state = shift || '';
72 # Validate policy here?
78 $self->policy($policy) if $policy;
79 $self->load_state($state) if $state;
# Builds the initial run state from the default state table and stores it on
# the object. Croaks if the object already has state, or if $devs is not a
# reference.
88 croak
"policy object already initialized" if $self->{state};
89 croak
"please pass in devices to filter" unless $devs && ref($devs);
# NOTE(review): this is a shallow copy, so any reference values held in
# %default_state are shared with the new state rather than duplicated --
# confirm that is intended.
90 my %state = %default_state;
92 # If we don't have an initial source device list, discover them.
93 # Used to filter destination devices later.
94 $state{source_devs
} = $self->filter_source_devices($devs);
95 $state{time_started
} = time();
96 $self->{state} = \
%state;
# Records the stop time in the state. Unless the policy sets
# leave_in_drain_mode, the device found in $self->{sdev_current} (if any) is
# put back into the 'alive' state first.
# NOTE(review): the rest of this file keeps sdev_current under
# $self->{state}, not directly on $self -- confirm which one is meant here.
101 my $p = $self->{policy
};
102 my $s = $self->{state};
103 my $sdev = $self->{sdev_current
};
104 unless ($p->{leave_in_drain_mode
}) {
105 Mgd
::get_store
()->set_device_state($sdev, 'alive') if $sdev;
107 $s->{time_stopped
} = time();
# Stamps the state with the time the run finished.
112 my $s = $self->{state};
113 $s->{time_finished
} = time();
116 # Resume from saved as_string state.
# The string is parsed with %default_state as the constraint/default set and
# the result replaces the object's current state.
120 my $state_parsed = $self->_parse_settings($state, \
%default_state);
121 # TODO: validate state?
122 $self->{state} = $state_parsed;
125 # Call as_string()? merge into load_state as "state"?
# Returns the current state serialised by _save_settings().
128 return $self->_save_settings($self->{state});
# Returns whatever is stored under $self->{source_devs}.
# NOTE(review): elsewhere in this file source_devs is written into the state
# hash ($state{source_devs}), not onto $self directly -- confirm this key.
133 return $self->{source_devs
};
139 # TODO: serialize it or pass a structure?
# Two paths are visible: hand back the stored policy unchanged, or parse a
# policy string against %default_policy, store the result and return it.
# NOTE(review): the condition choosing between them is not visible here.
140 return $self->{policy
};
143 $self->{policy
} = $self->_parse_settings($policy, \
%default_policy);
144 return $self->{policy
};
# Serialises a settings hash into a single string of space-separated
# key=value pairs; array values are joined with commas. Pairs come out in
# the order each() yields them, i.e. hash order.
149 my $settings = shift;
151 while (my ($key, $val) = each %{$settings}) {
152 # Only ref we support is ARRAY at the mo'...
153 if (ref($val) eq 'ARRAY') {
154 push(@tosave, $key . '=' . join(',', @
$val));
156 push(@tosave, $key . '=' . $val);
159 return join(' ', @tosave);
162 # foo=bar foo2=bar2 foo3=baaz,quux
# Parses a whitespace-separated "key=value" string into a hash. When a
# constraint hash is supplied, its values seed the result, and a key the
# constraint does not contain appears to be rejected with croak.
# NOTE(review): the seeding is a shallow copy, so reference values in the
# constraint are shared with the result rather than duplicated -- confirm.
163 sub _parse_settings
{
165 my $settings = shift;
166 my $constraint = shift || '';
168 # the constraint also serves as a set of defaults.
169 %parsed = %{$constraint} if ($constraint);
# NOTE(review): this bare return hands back nothing for an empty settings
# string, even though the defaults were just copied into %parsed.
171 return unless $settings;
172 # parse out from a string: key=value key=value
# split /\s/ breaks on each single whitespace character.
173 for my $tuple (split /\s/, $settings) {
174 my ($key, $value) = split /=/, $tuple;
175 if (index($value, ',') > -1) {
176 # ',' is reserved for multivalue types.
177 $value = [split /,/, $value];
179 # In the future we could do stronger type checking at load
180 # time, but for now this will happen at use time :/
182 if (exists $constraint->{$key}) {
183 my $c = $constraint->{$key};
184 # default says we should be an array.
# A lone value is wrapped so the setting is always an array reference.
185 if (ref($c) && ref($c) eq 'ARRAY' && !ref($value)) {
186 $parsed{$key} = [$value];
188 $parsed{$key} = $value;
191 croak
"Invalid setting $key";
# Presumably the path taken when no constraint was given; the branch line is
# not visible here.
194 $parsed{$key} = $value;
200 # step through the filters and find the next set of fids to rebalance.
201 # should $sto be passed in here or should we fetch it ourselves?
202 # also, should device info be passed in? I think so.
203 # returning 'undef' means there's nothing left
204 # returning an empty array means "try again"
# Croaks when no policy is loaded, when the device list or storage object is
# missing, or when no destination device survives filtering.
205 sub next_fids_to_rebalance
{
209 my $limit = shift || 100; # random low default.
210 # Balk unless we have a policy or a state?
211 my $policy = $self->{policy
};
212 croak
"No policy loaded" unless $policy;
213 croak
"Must pass in device list" unless $devs;
214 croak
"Must pass in storage object" unless $sto;
215 my $state = $self->{state};
217 # If we're not working against a source device, discover one
218 my $sdev = $self->_find_source_device($state->{source_devs
});
219 return undef unless $sdev;
# Swap the device id for the device object looked up by that id.
220 $sdev = Mgd
::device_factory
()->get_by_id($sdev);
221 my $filtered_destdevs = $self->filter_dest_devices($devs);
223 croak
("rebalance cannot find suitable destination devices")
224 unless (@
$filtered_destdevs);
# Ask the source device for its next chunk of fids, continuing from the last
# fid id recorded in the state.
226 my @fids = $sdev->fid_chunks(age
=> $policy->{fid_age
},
227 fidid
=> $state->{sdev_lastfid
},
229 # We'll wait until the next cycle to find a new sdev.
# No fids came back, or the limit check (made without a fid) failed: retire
# this source device.
230 if (! @fids || ! $self->_check_limits) {
231 $self->_finish_source_device;
235 # In both old or new cases, the "last" fid in the list is correct.
236 $state->{sdev_lastfid
} = $fids[-1]->id;
238 # TODO: create a filterset for $fid settings. filesize, class, domain, etc.
240 for my $fid (@fids) {
241 # count the fid or size against device limit.
242 next unless $fid->exists;
# Skip fids the limit check rejects; the check itself consumes allowance.
243 $self->_check_limits($fid) or next;
244 my $destdevs = $self->_choose_dest_devs($fid, $filtered_destdevs);
245 # Update internal stats.
246 $state->{fids_queued
}++;
247 $state->{bytes_queued
} += $fid->length;
# Each queued entry is [fid id, source device id, destination device list].
248 push(@devfids, [$fid->id, $sdev->id, $destdevs]);
# NOTE(review): the conditions selecting between the "latest run" and
# "empty run" bookkeeping below are not visible here.
251 $state->{time_latest_run
} = time;
253 $state->{empty_runs
}++;
254 $state->{time_latest_empty_run
} = time;
257 # return block of fiddev combos.
261 # ensure this fid wouldn't overrun a limit.
# Returns 1 straight away when the policy's limit_by is 'none'. Otherwise it
# works through a reference to the remaining allowance held in the state, so
# decrements below change the state in place.
265 my $p = $self->{policy
};
266 my $s = $self->{state};
267 return 1 if ($p->{limit_by
} eq 'none');
# limit_type 'global' uses the state's 'limit'; the other visible assignment
# uses the per-device 'sdev_limit'.
270 if ($p->{limit_type
} eq 'global') {
271 $limit = \
$s->{limit
};
273 $limit = \
$s->{sdev_limit
};
276 if ($p->{limit_by
} eq 'count') {
# Post-decrement: with a fid this returns the allowance as it stood before
# the fid was counted; without a fid it only reports the allowance.
277 return $fid ?
$$limit-- : $$limit;
278 } elsif ($p->{limit_by
} eq 'size') {
# Only charge the fid's length when it fits in what is left.
280 if ($fid->length() <= $$limit) {
281 $$limit -= $fid->length();
287 if ($$limit < 1024) {
288 # Arbitrary "give up if there's less than 1kb in the limit"
289 # FIXME: Make this configurable
296 croak
("uknown limit_by type");
300 # shuffle the list and return by limit.
301 # TODO: use the fid->length to ensure we don't send the file to devices
302 # that can't handle it.
# Returns an array reference: the whole shuffled list when the policy's
# use_dest_devs is 'all', otherwise the first use_dest_devs entries of it.
303 sub _choose_dest_devs
{
306 my $filtered_devs = shift;
307 my $p = $self->{policy
};
# Shuffle a copy; the caller's list is not reordered.
309 my @shuffled_devs = List
::Util
::shuffle
(@
$filtered_devs);
310 return \
@shuffled_devs if ($p->{use_dest_devs
} eq 'all');
312 return [splice @shuffled_devs, 0, $p->{use_dest_devs
}];
315 # Iterate through all possible constraints until we have a final list.
316 # unlike the source list we try this
# Applies the policy's from_* constraints to each device in turn; a device
# is dropped by the first constraint it fails. Devices that cannot be
# deleted from are dropped up front.
# NOTE(review): every threshold test below is a strict ">", although the
# inline comments describe them as "at least" -- confirm the intended bound.
317 sub filter_source_devices
{
320 my $policy = $self->{policy
};
323 for my $dev (@
$devs) {
324 next unless $dev->can_delete_from;
# $id is declared on a line not visible here.
326 if (@
{$policy->{from_devices
}}) {
327 next unless grep { $_ == $id } @
{$policy->{from_devices
}};
329 if (@
{$policy->{from_hosts
}}) {
330 my $hostid = $dev->hostid;
331 next unless grep { $_ == $hostid } @
{$policy->{from_hosts
}};
333 # "at least this much used"
334 if ($policy->{from_percent_used
}) {
335 # returns undef if it doesn't have stats on the device.
# NOTE(review): the multiplication happens before the defined() test, so an
# undef from percent_full would already have become 0 by then and the test
# would not fire -- confirm.
336 my $full = $dev->percent_full * 100;
337 next unless defined $full;
338 next unless $full > $policy->{from_percent_used
};
340 # "at least this much free"
341 if ($policy->{from_percent_free
}) {
342 # returns *0* if lacking stats. Must fix :(
343 my $free = $dev->percent_free * 100;
344 next unless $free; # hope this never lands at exact zero.
345 next unless $free > $policy->{from_percent_free
};
347 # "at least this much used"
348 if ($policy->{from_space_used
}) {
349 my $used = $dev->mb_used;
350 next unless $used && $used > $policy->{from_space_used
};
352 # "at least this much free"
353 if ($policy->{from_space_free
}) {
354 my $free = $dev->mb_free;
355 next unless $free && $free > $policy->{from_space_free
};
# Retires the current source device: clears the per-device keys from the
# state, optionally returns the device to 'alive', and appends it to the
# state's completed_devs list. Croaks if no source device is current.
363 sub _finish_source_device
{
365 my $state = $self->{state};
366 my $policy = $self->{policy
};
367 croak
"Not presently working on a source device"
368 unless $state->{sdev_current
};
370 delete $state->{sdev_lastfid
};
371 delete $state->{sdev_limit
};
# delete returns the removed value, so $sdev keeps the device for use below.
372 my $sdev = delete $state->{sdev_current
};
373 # Unless the user wants a device to never get new files again (sticking in
374 # drain mode), return to alive.
375 unless ($policy->{leave_in_drain_mode
}) {
376 Mgd
::get_store
()->set_device_state($sdev, 'alive');
378 push @
{$state->{completed_devs
}}, $sdev;
381 # TODO: Be aware of down/unavail devices. temp skip them?
# Returns the current source device, selecting a new one first when none is
# current. Selecting a device resets sdev_lastfid, computes the per-device
# limit, and puts the device into 'drain'.
382 sub _find_source_device
{
386 my $state = $self->{state};
387 my $p = $self->{policy
};
388 unless ($state->{sdev_current
}) {
# shift removes the device from the list passed in, so the caller's array
# shrinks as devices are taken.
389 my $sdev = shift @
$sdevs;
# NOTE(review): in list context this returns a two-element list, not an
# empty one -- confirm callers only use it in scalar context.
390 return undef, undef unless $sdev;
391 $state->{sdev_current
} = $sdev;
392 $state->{sdev_lastfid
} = 0;
# A per-device limit is only derived when limit_type is 'device'.
394 if ($p->{limit_type
} eq 'device') {
395 if ($p->{limit_by
} eq 'size') {
396 # Parse the size (default in megs?) out into bytes.
397 $limit = $self->_human_to_bytes($p->{limit
});
398 } elsif ($p->{limit_by
} eq 'count') {
399 $limit = $p->{limit
};
400 } elsif ($p->{limit_by
} eq 'percent') {
401 croak
("policy size limits by percent are unimplemented");
402 } elsif ($p->{limit_by
} eq 'none') {
406 # Must mark device in "drain" mode while we work on it.
407 Mgd
::get_store
()->set_device_state($sdev, 'drain');
408 $state->{sdev_limit
} = $limit;
411 return $state->{sdev_current
};
414 # FIXME: Move to MogileFS::Util
415 # take a numeric string with a char suffix and turn it into bytes.
416 # no suffix means it's already bytes.
# Croaks when the input is not digits followed by at most one suffix letter.
417 sub _human_to_bytes
{
# Only the suffixes b, k, m, g, t and p are accepted (case-insensitively),
# even though the lookup string further down lists more letters.
422 if ($num =~ m/^(\d+)([bkmgtp])?$/i) {
426 croak
("Don't know what this number is: " . $num);
# NOTE(review): "unless A || B" only returns early when $type is false; a
# 'b' suffix falls through to the multiplication, where its index of 0
# gives a factor of 1024 ** 0 -- confirm the condition is as intended.
429 return $digits unless $type || $type eq 'b';
430 # Sorry, being cute here :P
# The suffix's position in the string is used as the power of 1024.
431 return $digits * (1024 ** index('bkmgtpezy', $type));
434 # Apply policy to destination devices.
435 sub filter_dest_devices
{
438 my $policy = $self->{policy
};
439 my $state = $self->{state};
441 # skip anything we would source from.
442 # FIXME: ends up not skipping stuff out of completed_devs? :/
443 my %sdevs = map { $_ => 1 } @
{$state->{source_devs
}},
444 @
{$state->{completed_devs
}}, $state->{sdev_current
};
445 my @devs = grep { ! $sdevs{$_->id} } @
$devs;
448 for my $dev (@devs) {
449 next unless $dev->should_get_new_files;
451 my $hostid = $dev->hostid;
453 if (@
{$policy->{to_devices
}}) {
454 next unless grep { $_ == $id } @
{$policy->{to_devices
}};
456 if (@
{$policy->{to_hosts
}}) {
457 next unless grep { $_ == $hostid } @
{$policy->{to_hosts
}};
459 if (@
{$policy->{not_to_devices
}}) {
460 next if grep { $_ == $id } @
{$policy->{not_to_devices
}};
462 if (@
{$policy->{not_to_hosts
}}) {
463 next if grep { $_ == $hostid } @
{$policy->{not_to_hosts
}};
465 if ($policy->{to_percent_used
}) {
466 my $full = $dev->percent_full * 100;
467 next unless defined $full;
468 next unless $full > $policy->{to_percent_used
};
470 if ($policy->{to_percent_free
}) {
471 my $free = $dev->percent_free * 100;
472 next unless $free; # hope this never lands at exact zero.
473 next unless $free > $policy->{to_percent_free
};
475 if ($policy->{to_space_used
}) {
476 my $used = $dev->mb_used;
477 next unless $used && $used > $policy->{to_space_used
};
479 if ($policy->{to_space_free
}) {
480 my $free = $dev->mb_free;
481 next unless $free && $free > $policy->{to_space_free
};