Friday, October 12, 2007

Creating a Job Queue in InnoDB

This post is half "this is how you do it" and half "is there a better way to do it?"

So you want to build a system that does jobs. You want jobs to run in parallel, both for speed and for redundancy. The system needs to be coordinated so that, for example, the same job isn't done twice, the status of every job is easy to see, and multiple servers can run jobs simply by querying the central source.

How can we build code around InnoDB so that MySQL acts as our central coordinator?


CREATE TABLE IF NOT EXISTS job_queue (
    id      int(10) not null auto_increment,

    updated timestamp not null,
    started timestamp not null,

    state   ENUM( 'NEW', 'WORKING', 'DONE', 'ERROR' ) default 'NEW',

    PRIMARY KEY ( id ),
    KEY ( state )
) ENGINE=InnoDB;

In this schema, our job table has a unique id, a started and last-updated time, and a state. In a real system there would probably be more metadata here about the nature of the job, something like the sketch below.
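
Purely as an illustration (these columns are made up and aren't used anywhere else in this post), the extra metadata might look something like:

ALTER TABLE job_queue
    add column job_type varchar(32) not null default 'default',
    add column payload text;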

When new jobs need to be done, rows are inserted into the table (with started=NOW(); updated changes automatically, since it is the first TIMESTAMP column):

insert into job_queue set started=NOW();
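
Because all of the state lives in one table, checking on the queue is just a query. For example (the 10-minute threshold here is arbitrary):

-- how many jobs are in each state?
select state, count(*) from job_queue group by state;

-- jobs that have been WORKING suspiciously long
select id, started, updated from job_queue
where state='WORKING' and updated < now() - interval 10 minute;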


Now, in Perl, we write our job controller. This program could be run on every server you want running jobs:


#!/usr/bin/perl
use strict;
use warnings;

# Don't leave zombies around, shamelessly stolen from 'man perlipc'
use POSIX ":sys_wait_h";

my %Kid_Status;
sub REAPER {
    my $child;
    while ( ( $child = waitpid( -1, WNOHANG ) ) > 0 ) {
        $Kid_Status{$child} = $?;
    }
    $SIG{CHLD} = \&REAPER;
}
$SIG{CHLD} = \&REAPER;

use DBI;

my $dbh = DBI->connect(
    'DBI:mysql:database=test;host=127.0.0.1;port=3306',
    'test', 'test',
    { RaiseError => 1, AutoCommit => 0 }
);

while( 1 ) {
    my $row;
    eval {
        # Find the first NEW job and lock its row
        my $sth = $dbh->prepare(
            "select id from job_queue where state='NEW' limit 1 for update" );
        $sth->execute();
        $row = $sth->fetchrow_hashref();
        $sth->finish();
    };

    if( $@ or !$row ) {
        # Couldn't lock, hit the lock wait timeout, or no NEW jobs
        $dbh->commit();
        sleep 10;
        next;
    }

    # Got one: change state, commit, and fork a worker
    $dbh->do( "update job_queue set state='WORKING' where id=" . $row->{id} );
    $dbh->commit();

    # Fork a worker
    if( fork ) {
        # Parent: let the child keep the old connection and reconnect to
        # the db.
        $dbh->{InactiveDestroy} = 1;
        $dbh = DBI->connect(
            'DBI:mysql:database=test;host=127.0.0.1;port=3306',
            'test', 'test',
            { RaiseError => 1, AutoCommit => 0 }
        );
    } else {
        # Child: re-lock our job row inside our own transaction
        print "Locking\n";
        $dbh->do( "select * from job_queue where id=" . $row->{id}
                  . " for update" );

        print "Working\n";
        # Simulated work here
        srand( time );
        sleep rand 30;

        $dbh->do( "update job_queue set state='DONE' where id=" . $row->{id} );
        print "Committing\n";
        $dbh->commit();

        $dbh->disconnect();

        exit;
    }
    sleep 1;
}


Now, this isn't bad. But it has at least one thing I don't like about it. If a forked worker dies before the job finishes (and commits), there's no way for the job to be reassigned to another worker.

Ideally, each worker would hold a lock on its job row inside a transaction (which it does now), but instead of the job being flipped to the 'WORKING' state first, I'd rather it stayed 'NEW' for the life of that transaction. If that were the case, then when a worker died its unfinished transaction would be rolled back, and the job would be unlocked and NEW again, ready to be picked up by someone else.

However, because of the way SELECT ... FOR UPDATE works, InnoDB will block waiting for those locked 'NEW' rows to be released by their workers (hitting lock wait timeouts, or simply waiting until the jobs are done). This is sub-optimal, since my parent process could be handing out other new jobs in the meantime. What would fix this is a SELECT ... FOR UPDATE that skipped locked rows without blocking.
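
To make the problem concrete, here is roughly what happens with two connections under that ideal scheme (the session labels are just for illustration):

-- connection 1 (a worker, holding its job open in a transaction):
begin;
select id from job_queue where state='NEW' limit 1 for update;
-- ... does its work; the row stays locked and stays 'NEW' ...

-- connection 2 (the controller, looking for the next job):
begin;
select id from job_queue where state='NEW' limit 1 for update;
-- blocks on the row connection 1 has locked (it cannot skip ahead to
-- another NEW row) until connection 1 commits or rolls back, or
-- innodb_lock_wait_timeout expires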

If anyone knows a good way to achieve this, please let me know!

However, assuming we have no better alternative, we can create a reaper process like this:


#!/usr/bin/perl
use strict;
use warnings;

use DBI;

my $dbh = DBI->connect(
    'DBI:mysql:database=test;host=127.0.0.1;port=3306',
    'test', 'test',
    { RaiseError => 1, AutoCommit => 0 }
);

while( 1 ) {
    my $row;
    eval {
        # Find the first WORKING job and try to lock its row
        my $sth = $dbh->prepare(
            "select id from job_queue where state='WORKING' limit 1 for update" );
        $sth->execute();
        $row = $sth->fetchrow_hashref();
        $sth->finish();
    };

    if( $@ or !$row ) {
        # Couldn't lock, hit the lock wait timeout, or nothing to reap
        $dbh->commit();
        sleep 10;
        next;
    }

    # Got one: its worker must have died, so put it back to NEW
    $dbh->do( "update job_queue set state='NEW' where id=" . $row->{id} );
    $dbh->commit();

    sleep 1;
}


This will find the first WORKING row and try to lock it. Since normal jobs are moved to DONE before their worker commits, the only WORKING rows the reaper can actually lock are ones no worker is holding, which means the worker died.

However, this isn't perfect. Because SELECT ... FOR UPDATE will block on rows that are already locked, the reaper has to wait until all the jobs ahead of the stalled one are complete, hitting lock wait timeouts along the way (be sure to set your innodb_lock_wait_timeout fairly low!). Even worse, I've seen dead worker processes leave behind idle MySQL connections that hold onto their row locks.
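
For reference, the timeout is set in my.cnf (the default is 50 seconds, and depending on your version it may only be changeable at startup), and a dead worker's leftover connection can be killed by hand; the values below are only examples:

# my.cnf
[mysqld]
innodb_lock_wait_timeout = 5

-- find the idle connection that is still holding row locks, then:
SHOW PROCESSLIST;
KILL 12345;   -- the Id column from SHOW PROCESSLIST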

Is there any smoother way to do this? I'd love to hear other people's advice.

7 comments:

Jeremy Cole said...

How about using a slightly different forking model:

* Parent thread is only responsible for keeping X children running, and cleaning up after a child that aborts.
* Each child connects, looks for work to do, grabs a task, and does it. On finish, the child notifies the parent of success and exits (to keep the state clean).
* The parent replaces the child to keep the pool at X children.
* If a child exits without claiming success, the parent cleans up any pending work marked with that child's hostname+PID.

You need to add a few things to your tasks table regardless:

* Hostname + PID of worker processing it. (mandatory)
* Time the task started to be processed. (mandatory)
* Recent activity timestamp and char field for verbose "current state" (optional)

Those will allow you to get a lot more visibility into what the heck is going on.
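
A rough sketch of the columns Jeremy suggests (the column names here are invented):

ALTER TABLE job_queue
    add column worker_host varchar(64) default NULL,
    add column worker_pid int(10) default NULL,
    add column work_started timestamp NULL default NULL,
    add column last_activity timestamp NULL default NULL,
    add column current_state varchar(255) default NULL;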

Baron said...

I agree with Jeremy, but I'd go one farther. I have found at my employer that it is a much better idea to avoid forking. Forked code is hard to understand, maintain, debug, and monitor. Instead, we have turned all of our forked jobs into jobs where the application can be run many times and not interfere with other running instances. This usually involves a "claim token" -- something unique, like what you get from CONNECTION_ID(). You "reserve some work," say a chunk of ten items in the queue, and if you die without doing them, it's easy for other instances to know. You can just check SHOW PROCESSLIST to see whether the claimant is still alive, and reset the token to 0 or NULL (take your pick) and they're available for processing. There are no race conditions and you need fewer statements: "UPDATE blah SET claim=CONNECTION_ID() WHERE claim IS NULL LIMIT 10" and you're ready to roll.
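
A sketch of Baron's claim-token idea, assuming a claim column has been added to job_queue:

-- claim a chunk of ten items; CONNECTION_ID() is unique per connection
update job_queue set claim = CONNECTION_ID() where claim is NULL limit 10;

-- work through what we claimed
select id from job_queue where claim = CONNECTION_ID();

-- a monitor can compare claim values against SHOW PROCESSLIST and free
-- anything claimed by a connection that no longer exists:
update job_queue set claim = NULL where claim = 12345;  -- dead connection's id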

Baron said...

Oh -- I meant to say. We run these jobs every so often and they grab-and-process until they find no more work. If there's more work than one instance can do, cron will fire off another and it will join in and help. If the two can't keep up, they'll still be running when the next one starts, and now you'll have three of them. And so on. There is no penalty for starting them if there's no work to do. It is beautifully simple and self-regulating. I love simplicity :)

Ants Aasma said...

I avoided using long transactions to control the concurrency precisely due to the issues you mentioned. Instead I had a last-checkout date on the job, which defaulted to a special date in the past. Each worker would then grab a job by locking the job table, fetching the job with the earliest checkout that doesn't have the completed flag set and that was checked out at least a configurable timeout ago, updating the checkout to the current timestamp, and committing. When done, the worker would just set the completed flag on the job.

I had no concurrency limiting issues due to the table lock, because the job checkout transaction is really-really fast. As I was using postgres and needed the job records to persist I used a partial index over the last checkout consisting of only uncompleted jobs. This reduced the job lookup to basically a single index fetch and in steady state where most of the jobs in the queue are done the index would be quite tiny. I guess you could achieve similar efficiency by marking the last checkout of completed jobs as a special date far in the future.
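
A sketch of the partial index Ants describes (PostgreSQL syntax; the table and column names are made up):

-- only index jobs that are not completed yet, so the index stays tiny
CREATE INDEX jobs_pending_idx ON jobs (last_checkout) WHERE NOT completed;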

Parvesh Garg said...

well, my 2 cents

1. Keep your forking model as it is and add hostname, pid, start time and current state as suggested by jeremy.
2. Make sure your controller checks the process state of all the workers so it stays in sync with the DB. Just checking that the worker is running (by pid) and is doing the same job should be enough.
3. Change your locking strategy from SELECT ... FOR UPDATE to SELECT get_lock('table-name'). This essentially works like a table lock, but it is better than LOCK TABLES etc.: it doesn't use or lock any table indexes, is very lightweight, and doesn't prevent any other application (monitoring etc.) from just reading the table data. Also, InnoDB has its own well-known problems with LOCK TABLES...
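
A quick sketch of the GET_LOCK() approach (the lock name and 10-second timeout are arbitrary):

-- serialize queue access without touching row or index locks
select GET_LOCK('job_queue', 10);   -- returns 1 if acquired, 0 on timeout

-- ... pick a NEW job and mark it WORKING here ...

select RELEASE_LOCK('job_queue');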

Parvesh Garg said...

Also, I'm not a big fan of using auto_increment with InnoDB. If you have requirements where you need to have unique job ids throughout the system, keep the logic in your application.

Anonymous said...
This comment has been removed by a blog administrator.