Perl - threading and Thread::Queue - Ed's journal
sobrique
Perl - threading and Thread::Queue
This week, I have mostly been playing with Thread::Queue.
One of the downsides of Perl threading is that it's not particularly lightweight. Spawning lots of new threads that each do a single task isn't very efficient - especially if you have libraries imported and large data tables, because each new thread gets its own copy of them.

So the method I've been playing with is queue oriented - spawn a number of threads equal to some arbitrary parallelism target - 1 per 'resource' consumed is a good bet (so for processor intensive stuff, one per processor - if you're doing remote access to 15 servers, one each).

Then feed those threads from a 'queue' - a thread safe FIFO (FIFO = First In, First Out).

It uses the library Thread::Queue, so you include that at the start of your program. Strictly speaking you don't need to be threading to use it though - there are other reasons to use a FIFO.
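As a rough sketch of the non-threaded case, the snippet below uses Thread::Queue as a plain FIFO, without loading threads at all. The variable names ($fifo, @seen) and the three sample strings are made up for illustration.

```perl
#!/usr/bin/perl

use strict;
use warnings;

use Thread::Queue;

# No 'use threads' here: the queue is used as an ordinary FIFO.
my $fifo = Thread::Queue -> new;

# Items come back out in the order they went in.
$fifo -> enqueue ( 'first', 'second', 'third' );

my @seen;

# dequeue_nb() returns undef instead of blocking when the queue is empty,
# which is what you want when nobody else is going to add more items.
while ( defined ( my $item = $fifo -> dequeue_nb() ) )
{
  push ( @seen, $item );
}

print join ( ', ', @seen ), "\n";
```

Using the blocking dequeue() here instead would hang forever once the queue ran dry, since there is no other thread to refill it.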

So as a sample:


#!/usr/bin/perl

use strict;
use warnings;

use threads;

use Thread::Queue;

my $worker_queue = Thread::Queue -> new;
my $QUEUE_END = "--::QUEUE_END::--"; #just a text pattern, that acts as a delimiter. 

sub worker_thread
{
  my $item = $worker_queue -> dequeue();
  until ( $item eq $QUEUE_END )
  {
    print threads -> self -> tid(), ": $item\n";   # newline so each item gets its own line
    sleep 1;
    $item = $worker_queue -> dequeue(); 
  }
}

my $thread1 = threads -> create ( \&worker_thread );
my $thread2 = threads -> create ( \&worker_thread );

for ( my $count = 0; $count < 100; $count++ )
{
  $worker_queue -> enqueue ( $count );
}

$worker_queue -> enqueue ( $QUEUE_END );
$worker_queue -> enqueue ( $QUEUE_END );

foreach my $thread ( threads -> list() )
{
  $thread -> join();
}




Fairly simple, but does allow for daisy chained processing (e.g. moving from one FIFO queue to the next).
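To illustrate the daisy chaining idea, here is a minimal sketch with two stages joined by queues. The stage names, the arithmetic (double, then add one) and the extra $result_queue are all invented for the example. It assumes one worker per stage, which keeps the hand-over of the end marker trivial.

```perl
#!/usr/bin/perl

use strict;
use warnings;

use threads;
use Thread::Queue;

my $QUEUE_END = "--::QUEUE_END::--";

my $stage1_queue = Thread::Queue -> new;   # raw items from the main thread
my $stage2_queue = Thread::Queue -> new;   # output of stage 1
my $result_queue = Thread::Queue -> new;   # output of stage 2

# Stage 1: double each item and hand it to the next queue.
sub stage1_worker
{
  while ( ( my $item = $stage1_queue -> dequeue() ) ne $QUEUE_END )
  {
    $stage2_queue -> enqueue ( $item * 2 );
  }
  # Pass the end marker downstream so stage 2 knows to stop as well.
  $stage2_queue -> enqueue ( $QUEUE_END );
}

# Stage 2: add one and store the final result.
sub stage2_worker
{
  while ( ( my $item = $stage2_queue -> dequeue() ) ne $QUEUE_END )
  {
    $result_queue -> enqueue ( $item + 1 );
  }
}

my @stages = (
  threads -> create ( \&stage1_worker ),
  threads -> create ( \&stage2_worker ),
);

$stage1_queue -> enqueue ( 1 .. 5 );
$stage1_queue -> enqueue ( $QUEUE_END );

foreach my $thread ( @stages )
{
  $thread -> join();
}

# Both workers have exited, so a non-blocking drain is safe here.
my @results;
while ( defined ( my $item = $result_queue -> dequeue_nb() ) )
{
  push ( @results, $item );
}

print "@results\n";
```

With more than one worker per stage, passing the marker on gets more involved: the last stage 1 worker to finish would need to enqueue one marker per stage 2 worker, so this sketch sidesteps that.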
The only slightly complicated part is handling 'thread exiting'. I've taken to using an 'exit' signaller in the queue (use an arbitrary pattern, and 'catch' when it occurs), with one copy enqueued per worker thread, because each marker stops exactly one thread.
The other possibility is some kind of 'all done' shared variable that you set once the queue is fully populated. What you don't want to do is assume that work is finished just because the queue is empty. The queue may well be empty when you first start the thread, or while a dependency is still producing items, or once the first items get 'dequeued' and the other threads see an empty queue before more arrive.
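The 'all done' shared variable approach could look something like the sketch below. The names ($all_queued, $work_queue, $result_queue) and the 20 sample items are hypothetical. It assumes no item is ever undef, because an undef from dequeue_nb() is how the worker detects an empty queue.

```perl
#!/usr/bin/perl

use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Queue;

my $work_queue   = Thread::Queue -> new;
my $result_queue = Thread::Queue -> new;

# Set to 1 by the main thread once everything has been enqueued.
my $all_queued :shared = 0;

sub worker_thread
{
  while ( 1 )
  {
    # Read the flag BEFORE looking at the queue. If the flag was already
    # set and the queue then turns out to be empty, nothing more can arrive.
    my $done = $all_queued;
    my $item = $work_queue -> dequeue_nb();

    if ( defined $item )
    {
      $result_queue -> enqueue ( $item );
    }
    elsif ( $done )
    {
      last;   # empty AND fully populated: genuinely finished
    }
    else
    {
      # Empty, but more work may still be coming: wait briefly and retry.
      select ( undef, undef, undef, 0.05 );
    }
  }
}

my @workers;
push ( @workers, threads -> create ( \&worker_thread ) ) for 1 .. 2;

$work_queue -> enqueue ( $_ ) for 1 .. 20;
$all_queued = 1;

foreach my $thread ( @workers )
{
  $thread -> join();
}

my $processed = $result_queue -> pending();
print "processed $processed items\n";
```

The cost compared with the end marker approach is the polling: idle workers wake up every so often instead of blocking in dequeue(). Newer releases of Thread::Queue also provide an end() method that makes a blocked dequeue() return undef once the queue is drained, which may be worth a look if your installed version has it.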

I've been using this mechanism to create a 'cascade' of tasks - run something on one (group of) server(s). Do some processing. Run something based on the result on another server. This is well suited to queue style processing.
Similarly, because you're queue oriented, it's also well suited to scaling the parallelism up (or down). In a multi processor environment, for example, you may want to hog all the processors that are available, but you'll lose efficiency if you overdo it.
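One way to make the parallelism adjustable is to drive both the thread creation and the end markers from a single count, as in this sketch. $worker_count and $done_queue are made-up names, and the value 4 is arbitrary. Pick whatever matches the 'resource' you're consuming.

```perl
#!/usr/bin/perl

use strict;
use warnings;

use threads;
use Thread::Queue;

my $QUEUE_END    = "--::QUEUE_END::--";
my $worker_count = 4;   # hypothetical tuning knob, e.g. one per processor

my $worker_queue = Thread::Queue -> new;
my $done_queue   = Thread::Queue -> new;   # records what each worker handled

sub worker_thread
{
  while ( ( my $item = $worker_queue -> dequeue() ) ne $QUEUE_END )
  {
    $done_queue -> enqueue ( threads -> self -> tid() . ": $item" );
  }
}

# Spawn as many workers as the knob says.
my @workers;
push ( @workers, threads -> create ( \&worker_thread ) ) for 1 .. $worker_count;

$worker_queue -> enqueue ( $_ ) for 0 .. 99;

# One end marker per worker, otherwise some threads would block forever.
$worker_queue -> enqueue ( $QUEUE_END ) for 1 .. $worker_count;

foreach my $thread ( @workers )
{
  $thread -> join();
}

my $handled = $done_queue -> pending();
print "$handled items handled by $worker_count workers\n";
```

Changing the parallelism is then a one line edit, and the number of end markers can't drift out of step with the number of threads.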
