So you want to build a system that does jobs. You want the jobs to be able to be run in parallel for speed, but also for redundancy. This system needs to be coordinated so, for example, the same jobs aren't being done twice, the status of every job is easy to see, and multiple servers can run jobs by simply querying the central source.
How can we build code around Innodb to have MySQL be our central controlling scheme?
CREATE TABLE IF NOT EXISTS job_queue(
id int(10) not null auto_increment,
updated timestamp not null,
started timestamp not null,
state ENUM('NEW', 'WORKING', 'DONE', 'ERROR' ) default 'NEW',
PRIMARY KEY ( id ),
KEY( STATE )
) ENGINE=Innodb;
In this schema, our job table has a unique id, started and last updated time, and a STATE. In a real system, there would probably be some more meta data here about the nature of the job.
When new jobs need to be done, rows are inserted into the table (with started=NOW(), updated changes automatically)
insert into job_queue set started=NOW();
Now, in perl, we write our job controller. This program could be run on every server you want to run jobs:
#!/usr/bin/perl
# Don't leave zombies around, shamelessly stolen from 'man perlipc'
use POSIX ":sys_wait_h";
sub REAPER {
my $child;
while (($child = waitpid(-1,WNOHANG)) > 0) {
$Kid_Status{$child} = $?;
}
$SIG{CHLD} = \&REAPER;
}
$SIG{CHLD} = \&REAPER;
use DBD::mysql;
my $dbh = DBI->connect(
'DBI:mysql:database=test;host=127.0.0.1;port=3306',
'test', 'test',
{ RaiseError => 1, AutoCommit => 0 }
);
while( 1 ) {
my $row;
eval {
my $sth = $dbh->prepare( "select id from job_queue where state='NEW' li
mit 1 for update" );
$sth->execute();
$row = $sth->fetchrow_hashref();
$sth->finish();
};
if( $@ or !$row ) {
# Couldn't lock or lock wait timeout
$dbh->commit();
sleep 10;
next;
}
# Got one, change state, commit, and fork a worker
$dbh->do( "update job_queue set state='WORKING' where id=" . $row->{id} );
$dbh->commit();
# Fork a worker
if( fork ) {
# Parent, let the child have the old connection and reconnect to
# the db.
$dbh->{InactiveDestroy} = 1;
$dbh = DBI->connect(
'DBI:mysql:database=test;host=127.0.0.1;port=3306',
'test', 'test',
{ RaiseError => 1, AutoCommit => 0 }
);
} else{
# Child
print "Locking\n";
$dbh->do( "select * from job_queue where id=" . $row->{id}
. " for update " );
print "Working\n";
#simulated work here
srand( time );
sleep rand 30;
$dbh->do( "update job_queue set STATE='DONE' where
id=" . $row->{id} );
print "Committing\n";
$dbh->commit();
$dbh->disconnect();
exit;
}
sleep 1;
}
Now, this isn't bad. But it has at least one thing I don't like about it. If a forked worker dies before the job finishes (and commits), there's no way for the job to be reassigned to another worker.
Ideally each worker would have a lock on his job row inside of a transaction, which it is doing now, but, instead of the job being in the 'WORKING' state first, I'd rather it was 'NEW' before the transaction started. If that were the case, if a worker died, it's unfinished transaction would be rolled back, and the job would be unlocked and NEW again.
However, because of the way SELECT ... FOR UPDATE works, Innodb will deadlock waiting for 'NEW' jobs to unlock from the workers (or wait until they are done). This is sub-optimal, since my parent process could be forking new jobs in the meantime. What would fix this is a SELECT ... FOR UPDATE that skipped locked rows without blocking.
If anyone knows a good way to achieve this, please let me know!
However, assuming we have no better alternatives, we can create a reaper process like this:
#!/usr/bin/perl
use DBD::mysql;
my $dbh = DBI->connect(
'DBI:mysql:database=test;host=127.0.0.1;port=3306',
'test', 'test',
{ RaiseError => 1, AutoCommit => 0 }
);
while( 1 ) {
my $row;
eval {
my $sth = $dbh->prepare( "select id from job_queue where state='WORKING' limit 1 for update" );
$sth->execute();
$row = $sth->fetchrow_hashref();
$sth->finish();
};
if( $@ or !$row ) {
# Couldn't lock or lock wait timeout
$dbh->commit();
sleep 10;
next;
}
# Got one, change state, commit, and fork a worker
$dbh->do( "update job_queue set state='NEW' where id=" . $row->{id} ); $dbh->commit();
sleep 1;
}
This will find the first WORKING row and try to lock it. Since normal jobs will be moved to DONE before they are committed, this should only ever find jobs that are in WORKING and unlocked, which means the worker died.
However this isn't perfect. Because SELECT ... FOR UPDATE will try to lock a row already locked, we have to wait until all jobs before our stalled job are complete, getting deadlocks along the way (be sure to set your innodb_lock_wait_timeout fairly low!. Even further, I've seen dead worker processes leave behind idle mysql connections that hold onto their row locks.
Is there any smoother way to do this? I'd love to hear other people's advice.