Devolve is a simple singleton class for a dynamic thread pool and associated job queue.
The constructor is automatically invoked by the first call to
instance() and
starts a listener thread that listens for worker connections on a configurable port. Client
threads can add jobs by calling add() at any time.
When a worker connects, the pool listener spawns a new proxy thread dedicated to that worker and resumes listening. All proxy threads run an infinite loop performing these steps within:
- Dequeue job from the job queue (blocking if the queue is empty).
- If what was just dequeued was the QUIT token, put it back on the queue so other proxy threads can see it, close socket and terminate.
- Otherwise, send job data to worker and wait for result.
- Put result string in job object and resume step 1.
Any number of workers may connect and process jobs; currently select() is used to listen for connections so that may limit the number of workers to 1024. When the select is replaced by epoll, this limit should go away.
Any client thread can terminate the pool by invoking
close();
this causes the pool listener to write the QUIT token to the job queue, wait for all
worker proxy threads to
end and then terminate; no new connections are accepted.
Jobs enqueued must be objects that respond to
get_work() and
put_result() methods. The first must return a string that
is sent to the worker; this can be a normal string or the result of marshalling an
object. The second should take a single argument which will be either:
- nil, indicating that the worker crashed or some unexpected problem was encountered in the protocol or a bug in the worker code; or
- the result string sent by the worker.
The put_result() method can re-enqueue the job if the
argument is nil; if non-nil, it can dispose of the result in any suitable way,
for example, unmarshal the result (if it is not a plain string), write it to a file or a
database, add it to objects, etc. The get_work() method
allows clients to delay fetching the actual data until it is about to be sent to a worker
without consuming memory while the job is in the queue. The application can decide whether
this string is a plain string or one that needs to be unmarshalled into an object by the
worker; the thread pool does not care.
Each worker must have a unique name, which should be a short string. Workers should do the following after connecting:
-
Send the name using
socket.puts. -
Send the process id using
socket.puts. -
Enter an infinite loop with these steps:
-
Read a short string using
socket.gets; if this string is the QUIT token, close the socket and terminate. Otherwise, convert it to an integer, say n, denoting the size (in bytes) of the data string to come. -
Read the data string of length n bytes (clients can decide whether this is a plain string or one that needs to be unmarshalled into an object; the thread pool does not care).
-
Process the input, wrap it in a
Resultobject, marshal that object and send it back to the boss; it will be passed unchanged to theput_result()method of the corresponding job object. Since the result string is a marshalledResultobject, it should never be nil.
-
The sample application under the example subdirectory
illustrates usage. It is intended to be run on Ubuntu Linux systems. Run the boss like
this:
cd example
ruby -w deb-pkg.rb
Now run as many workers as you have cores, one in each terminal window where the worker name should be different for each worker, w1, w2, etc.:
cd example
ruby -w worker.rb -n w1 -b localhost
Appending the -h option to either the boss or worker
invocation will display a short summary of available options.
On completion, you should see a short message indicating the number of package stanzas
processed and log files named boss.log,
w1.log, w2.log, etc.
Some performance numbers are given below.
The example application parses package descriptions on an Ubuntu Linux machine from the
file /var/lib/dpkg/status. Here are some details of a
couple of runs:
Data file: /var/lib/dpkg/status on an Ubuntu 12.04 system
with: 2239086 bytes, 51317 lines, 2184 package stanzas.
Machines: M1 = Intel Core i3 laptop, running Ubuntu 12.04; M2 = AMD Quad core Phenom desktop, running Ubuntu 10.04.
Each time is the average completion time of all the worker processes; this is a better indicator of performance than the completion time of the boss since the listener thread has a 30s socket timeout which makes timing unreliable. The times are also affected by the number of stanzas included in each job and controlled by the "-s" option; for example, worker completion times drop by half when we use "-s 8".
Column titles (except the first) are the number of workers.
| Machine | 1 | 2 | 3 | 4 |
|---|---|---|---|---|
| M1 | 44s | 22s | 15s | 11s |
| M2 | 32s | 16s | 11s | 8s |
There are of course other tools (such as Drb and EventMachine) but they seemed more geared towards web servers than the rather simple distributed computation I was interested in. Some scenarios in which I find 'devolve' useful:
- Generating large number of combinatorial objects such as graphs, permutations, etc. and farming out the analysis of those objects to worker processes.
- Parsing a large file (or set of files) such as the lists of Debian packages where the files are naturally separated into stanzas that can be independently parsed by worker processes.
I appreciate feedback, so if you have any questions, suggestions or patches, please send me email: amberarrow at gmail with the usual extension. Thanks.