1 package MogileFS
::Connection
::Worker
;
2 # This class maintains a connection to one of the various classes of
7 use base
qw{Danga
::Socket
};
14 'last_alive', # unixtime
15 'known_state', # hashref of { "$what-$whatid" => $state }
16 'wants_todo', # count of how many jobs worker wants.
20 my MogileFS
::Connection
::Worker
$self = shift;
21 $self = fields
::new
($self) unless ref $self;
22 $self->SUPER::new
( @_ );
26 $self->{wants_todo
} = {};
28 $self->{last_alive
} = time();
29 $self->{known_state
} = {};
36 $self->{last_alive
} = time();
40 my MogileFS
::Connection
::Worker
$self = shift;
42 my $timeout = $self->worker_class->watchdog_timeout;
43 my $time_since_last_alive = time() - $self->{last_alive
};
44 return $time_since_last_alive < $timeout;
48 my MogileFS
::Connection
::Worker
$self = shift;
50 # if we read data from it, it's not blocked on something else.
53 my $bref = $self->read(1024);
54 return $self->close() unless defined $bref;
55 $self->{read_buf
} .= $$bref;
57 while ($self->{read_buf
} =~ s/^(.+?)\r?\n//) {
59 if ($self->job eq 'queryworker' && $line !~ /^(?:\:|error|debug)/) {
60 MogileFS
::ProcManager
->HandleQueryWorkerResponse($self, $line);
62 MogileFS
::ProcManager
->HandleChildRequest($self, $line);
69 my $done = $self->write(undef);
70 $self->watch_write(0) if $done;
74 my MogileFS
::Connection
::Worker
$self = shift;
75 return $self->{job
} unless @_;
76 return $self->{job
} = shift;
80 my MogileFS
::Connection
::Worker
$self = shift;
82 return $self->{wants_todo
}->{$type}-- unless @_;
83 return $self->{wants_todo
}->{$type} = shift;
87 my MogileFS
::Connection
::Worker
$self = shift;
88 return MogileFS
::ProcManager
->job_to_class($self->{job
});
92 my MogileFS
::Connection
::Worker
$self = shift;
93 return $self->{pid
} unless @_;
94 return $self->{pid
} = shift;
97 sub event_hup
{ my $self = shift; $self->close; }
98 sub event_err
{ my $self = shift; $self->close; }
101 # mark us as being dead
102 my MogileFS
::Connection
::Worker
$self = shift;
103 MogileFS
::ProcManager
->NoteDeadWorkerConn($self);
104 $self->SUPER::close(@_);
112 # indent-tabs-mode: nil