update internal queue system to use arbitrary jobs
[MogileFS-Server.git] / lib / MogileFS / Connection / Worker.pm
blob7c053b1c533ed59d1358d1c4a6c0ecac47d9feb8
1 package MogileFS::Connection::Worker;
2 # This class maintains a connection to one of the various classes of
3 # workers.
5 use strict;
6 use Danga::Socket ();
7 use base qw{Danga::Socket};
# Per-connection state.  Every key stored on the object is declared here.
use fields (
            'read_buf',    # data read from the socket that has not yet been split into lines
            'job',         # job name for this connection (undef until set through job())
            'pid',         # pid value (0 until set through pid()); presumably the worker's process id
            'reqid',       # initialised to 0 in new(); not otherwise used in this file
            'last_alive',  # unixtime
            'known_state', # hashref of { "$what-$whatid" => $state }
            'wants_todo',  # hashref of { $type => count of how many jobs of that type worker wants }
            );
# Constructor.  When invoked on a class name the fields-based object is
# created first; the parent constructor is then run with the caller's
# arguments, and all worker bookkeeping is reset to its starting values.
sub new {
    my MogileFS::Connection::Worker $self = shift;
    unless (ref $self) {
        $self = fields::new($self);
    }
    $self->SUPER::new(@_);

    # nothing has been recorded about this worker yet
    $self->{job}         = undef;
    $self->{pid}         = 0;
    $self->{reqid}       = 0;

    $self->{known_state} = {};
    $self->{wants_todo}  = {};
    $self->{last_alive}  = time();

    return $self;
}
# Stamp the connection with the current time as its most recent sign of
# life.  Returns the timestamp that was stored.
sub note_alive {
    my ($self) = @_;
    my $now = time();
    return $self->{last_alive} = $now;
}
# Returns true while the time elapsed since last_alive is still below the
# watchdog timeout reported by this connection's worker class, and false
# once it has reached or passed that timeout.
sub watchdog_check {
    my MogileFS::Connection::Worker $self = shift;

    my $limit    = $self->worker_class->watchdog_timeout;
    my $idle_for = time() - $self->{last_alive};

    return $idle_for < $limit;
}
# Readable-socket handler.  Appends whatever was read to the line buffer
# and dispatches every complete line: while this connection's job is
# 'queryworker', lines that do not start with ':', 'error' or 'debug' go
# to HandleQueryWorkerResponse; every other line goes to
# HandleChildRequest.  The connection is closed when read() returns undef.
sub event_read {
    my MogileFS::Connection::Worker $self = shift;

    # if we read data from it, it's not blocked on something else.
    $self->note_alive;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    # Peel off one terminated line at a time.  The line body is allowed to
    # be empty (.*? rather than .+?): otherwise a bare "\n" at the front of
    # the buffer could never be consumed, and every later line would pile
    # up behind it unprocessed.
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        next unless length $line;    # blank line: nothing to dispatch

        if ($self->job eq 'queryworker' && $line !~ /^(?:\:|error|debug)/) {
            MogileFS::ProcManager->HandleQueryWorkerResponse($self, $line);
        } else {
            MogileFS::ProcManager->HandleChildRequest($self, $line);
        }
    }
}
# Writable-socket handler.  Calls write(undef) and, when that returns a
# true value, turns write-watching off for this socket.
sub event_write {
    my ($self) = @_;

    my $flushed = $self->write(undef);
    if ($flushed) {
        $self->watch_write(0);
    }
}
# Combined getter/setter for the job name.  Without an argument the
# current value is returned; with one, that value is stored and returned.
sub job {
    my MogileFS::Connection::Worker $self = shift;

    if (@_) {
        $self->{job} = shift;
    }
    return $self->{job};
}
# Per-type demand counter.
#
#   $conn->wants_todo($type, $n)  stores $n as the number of jobs of $type
#                                 wanted, and returns $n.
#   $conn->wants_todo($type)      returns the outstanding count for $type
#                                 and uses up one unit of it; returns 0
#                                 when nothing is outstanding.
#
# The count is never taken below zero.  Decrementing unconditionally would
# leave a negative -- and therefore true -- value behind after the first
# refused request, so later calls would report demand that does not exist.
sub wants_todo {
    my MogileFS::Connection::Worker $self = shift;
    my $type = shift;

    # setter form
    return $self->{wants_todo}->{$type} = shift if @_;

    my $outstanding = $self->{wants_todo}->{$type} || 0;
    return 0 unless $outstanding > 0;

    $self->{wants_todo}->{$type} = $outstanding - 1;
    return $outstanding;
}
# Returns whatever MogileFS::ProcManager->job_to_class gives back for this
# connection's job name.
sub worker_class {
    my MogileFS::Connection::Worker $self = shift;

    my $job_name = $self->{job};
    return MogileFS::ProcManager->job_to_class($job_name);
}
# Combined getter/setter for the pid field.  Without an argument the
# current value is returned; with one, that value is stored and returned.
sub pid {
    my MogileFS::Connection::Worker $self = shift;

    if (@_) {
        $self->{pid} = shift;
    }
    return $self->{pid};
}
# Hang-up on the socket: close this connection.
sub event_hup {
    my ($self) = @_;
    return $self->close;
}
# Error on the socket: close this connection.
sub event_err {
    my ($self) = @_;
    return $self->close;
}
# Close the connection.  MogileFS::ProcManager is notified through
# NoteDeadWorkerConn before the parent class's close runs; any arguments
# are handed on to the parent close unchanged.
sub close {
    my MogileFS::Connection::Worker $self = shift;

    # tell the process manager first, so we are marked as dead
    MogileFS::ProcManager->NoteDeadWorkerConn($self);

    return $self->SUPER::close(@_);
}
109 # Local Variables:
110 # mode: perl
111 # c-basic-indent: 4
112 # indent-tabs-mode: nil
113 # End: