Synchronous concurrency: Difference between revisions

From Rosetta Code
Content added Content deleted
(Clarified)
Line 233: Line 233:
writer.join()
writer.join()
</pre>
</pre>

Alernatively the ''write()'' function here could be replaced with a '''''Writer''''' class like:

class Writer(object):
def __init__(self, filehandle, queue):
self.linecount = 0
if not hasattr(queue, 'get'):
raise TypeError, 'filehandle must support "write()" method'
else:
self.file = filehandle
if not hasattr(queue, 'get'):
raise TypeError, 'Queue handle must support "get()" method'
self.queue = queue
def __call__(self):
while True:
line = self.queue.get()
if line is None:
break
self.file.write(line)
self.linecount += 1



... which keep the "linecount" attribute encapsulated to allow the main code path to access it separately using something like:

write = Writer(sys.stdout, lines)
reader = Thread(target=read, args=(open('input.txt'),))
writer = Thread(target=write)
reader.start()
writer.start()
reader.join()
writer.join()
print "Line count: ", write.linecount

... (Though this also requires that we remove the final ''print'' statement from the ''read()'' function --- otherwise the reader() thread won't "join" because of the last item remaining it the "count" queue).

In general it's cleaner to use the Queue objections for inter-thread communications in lieu of explicit, error prone and complicated locking. Python Queue objects are coherent thread-safe "producer-consumer" pipelines which are suitable for any combination of single or multiple producer and consumer threads. Since objects of any sort can be passed through a Queue it would be trivial to encapsulate each line read in an object specifying a target file object along with the data to be written. The ''Writer'' instance could then count each of these as it called something like "line.write(line.data)" (for example). Obviously the ''read()'' function in the original example could also be replaced with a class which could allow it to maintain any desired state or implement additional behavior.

Revision as of 01:10, 23 October 2007

Task
Synchronous concurrency
You are encouraged to solve this task according to the task description, using any language you may know.

The goal of this task is to create two concurrent activities ("Threads" or "Tasks", not processes.) that share data synchronously. Your language may provide syntax or libraries to perform concurrency. Different languages provide different implementations of concurrency, often with different names. Some languages use the term threads, others use the term tasks, while others use co-processes. This task should not be implemented using fork, spawn, or the linux/unix/Win32 pipe command, as communication should be between threads, not processes.

One of the concurrent units will read from a file named "input.txt" and send the contents of that file, one line at a time, to the other concurrent unit, which will print the line it receives to standard output. The printing unit must count the number of lines it prints. After the concurrent unit reading the file sends its last line to the printing unit, the reading unit will request the number of lines printed by the printing unit. The reading unit will then print the number of lines printed by the printing unit.

This task requires two-way communication between the concurrent units. All concurrent units must cleanly terminate at the end of the program.

Ada

Compiler: GNAT GPL 2006

This Ada example starts by creating a package defining a single instance of a printing task. Ada requires packages to be separated into two parts. The package specification defines the interface to all public members of the package.

package Synchronous_Concurrent is
   task Printer is
      entry Put(Item : in String);
      entry Get_Count(Count : out Natural);
   end Printer;
end Synchronous_Concurrent;

The package body contains the implementation of all the subprograms and tasks defined in the specification.

with Ada.Text_Io; use Ada.Text_Io;
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded; 

package body Synchronous_Concurrent is 

   task body Printer is
      Num_Iter : Natural := 0;
      Line     : Unbounded_String;
   begin
      loop
         select
            accept Put(Item : in String) do
               Line := To_Unbounded_String(Item);
            end Put;
            Put_Line(To_String(Line));
            Num_Iter := Num_Iter + 1;
         or
            accept Get_Count(Count : out Natural) do
               Count := Num_Iter;
            end Get_Count;
         or terminate;
         end select;
      end loop;
   end Printer;

end Synchronous_Concurrent;

Note that the task body contains an accept block for each entry defined in the task specification. When some other task calls an entry in the Printer task the communication between the tasks is synchronized.

This example uses an infinite loop in the printer task. There is no way to know ahead of time how many lines the printer task will need to print. Each iteration through the loop causes the task to execute a selective accept. That means that it can either accept a call on the Put entry, or it can accept a call on the Get_Count entry. The terminate option is execute only when the program contains no more tasks that can call the entries in the Printer task. If no task has called either entry the Printer task will wait for a task to call one of the entries, or for the terminate option to apply.

The next file contains the main procedure for this program. The main or entry-point procedure for a program always runs in the environment task. For this program, the environment task is takes on the role of the file reading concurrent unit while the Printer task takes on the role of the printing concurrent unit.

with Synchronous_Concurrent; use Synchronous_Concurrent;
with Ada.Text_Io; use Ada.Text_Io;

procedure Synchronous_Concurrent_Main is
   Num_Strings : Natural;
   The_File : File_Type;
   Line : String(1..255);
   Length : Natural;
begin
   Open(File => The_File, Mode => In_File, 
      Name => "input.txt");
   while not End_Of_File(The_File) loop
      Get_Line(File => The_File, Item => Line, Last => Length);
      Printer.Put(Line(1..Length));
   end loop;
   Close(The_File);
   Printer.Get_Count(Num_Strings);
   New_Line;
   Put_Line("The task wrote" & Natural'Image(Num_Strings) & " strings.");
end Synchronous_Concurrent_Main;

In this example only the environment task can call the entries on the Printer task. When the environment task completes the terminate option of the Printer task applies, terminating the Printer task. The environment task completes right after printing the number of lines sent through the Printer task. Because of the terminate option, the Printer task terminates just after the enviroment task prints the count.

E

def printer := {
    var count := 0
    def printer {
        to run(item) {
            count += 1
            println(item)
        }
        to getCount() {
            return count
        }
    }
}
def sender(lines) {
    switch (lines) {
        match [] {
            when (def count := printer <- getCount()) -> {
                println(`$count lines were printed.`)
            }
        }
        match [line] + rest {
            when (printer <- run(line)) -> {
                sender(rest)
            }
        }
    }
}
# Stream IO in E is not finished yet, so this example just uses a list.
sender(<file:input.txt>.getText().split("\n"))

Haskell

The following Haskell code uses simple MVars for thread communication. While the GHC libraries for concurrency give quite a wide design space for thread communication, I felt that the following was fairly reasonable.

For those who are unaware of MVars, they are essentially mutable cells which may be empty or hold a single value, and which have the following important properties:

  • takeMVar will get the contents of an MVar when it is full, emptying it.
  • takeMVar will block if the MVar is empty, until it has been filled by another thread.
  • putMVar will fill an empty MVar with a given value.
  • putMVar will block until the MVar is empty if it is full.

So MVars are essentially bounded channels which hold a maximum of one element at a time.

The code below defines various signals in terms of takeMVar and putMVar and then passes those to the parts of the code which should be permitted to use them. Note that this way, it is impossible for the reader process to take the current line, for example.

import Control.Concurrent
import Control.Concurrent.MVar

main =
    do lineVar <- newEmptyMVar
       countVar <- newEmptyMVar

       let takeLine  = takeMVar lineVar
           putLine   = putMVar lineVar . Just
           putEOF    = putMVar lineVar Nothing
           takeCount = takeMVar countVar
           putCount  = putMVar countVar

       forkIO $ writer takeLine putCount
       reader putLine putEOF takeCount

The reader simply reads the file lazily, applying putLine to each of the lines in turn, which blocks until the writer has taken the line. It then signals that it is finished with putEOF, and then takes the count and prints it.

reader putLine putEOF takeCount =
    do ls <- fmap lines (readFile "input.txt")
       mapM putLine ls
       putEOF
       n <- takeCount
       print n

The writer gets the lines in a loop with takeLine until it receives Nothing, at which point it uses putCount to tell the reader how many lines there were.

writer takeLine putCount = loop 0
  where loop n = do l <- takeLine
                    case l of 
                       Just x  -> do putStrLn x
                                     loop (n+1)
                       Nothing -> putCount n

Perl

use threads;
use Thread::Queue qw();

my $q1 = Thread::Queue->new;
my $q2 = Thread::Queue->new;

my $reader = threads->create(sub {
    my $q1 = shift;
    my $q2 = shift;

    open my $fh, '<', 'input.txt';
    $q1->enqueue($_) while <$fh>;
    close $fh;
    $q1->enqueue(undef);

    print $q2->dequeue;
}, $q1, $q2);

my $printer = threads->create(sub {
    my $q1 = shift;
    my $q2 = shift;

    my $count;
    while (my $line = $q1->dequeue) {
        print $line;
        $count++;
    };

    $q2->enqueue($count);
}, $q1, $q2);

$reader->join;
$printer->join;

Python

Notes: instead of hardcoding the input and output files in the units, each unit is created with a file and read or write the given file.

import sys
from Queue import Queue
from threading import Thread

lines = Queue(1)
count = Queue(1)

def read(file):
    try:
        for line in file:
            lines.put(line)
    finally:
        lines.put(None)
    print count.get()

def write(file):
    n = 0
    while 1:
        line = lines.get()
        if line is None:
            break
        file.write(line)
        n += 1
    count.put(n)

reader = Thread(target=read, args=(open('input.txt'),))
writer = Thread(target=write, args=(sys.stdout,))
reader.start()
writer.start()
reader.join()
writer.join()

Alernatively the write() function here could be replaced with a Writer class like:

class Writer(object):
   def __init__(self, filehandle, queue):
        self.linecount = 0
        if not hasattr(queue, 'get'):
            raise TypeError, 'filehandle must support "write()" method'
        else:
            self.file = filehandle
        if not hasattr(queue, 'get'):
            raise TypeError, 'Queue handle must support "get()" method'
        self.queue = queue
   def __call__(self):
        while True:
            line = self.queue.get()
            if line is None:
                break
            self.file.write(line)
            self.linecount += 1


... which keep the "linecount" attribute encapsulated to allow the main code path to access it separately using something like:

write = Writer(sys.stdout, lines)
reader = Thread(target=read, args=(open('input.txt'),))
writer = Thread(target=write)
reader.start()
writer.start()
reader.join()
writer.join()
print "Line count: ", write.linecount

... (Though this also requires that we remove the final print statement from the read() function --- otherwise the reader() thread won't "join" because of the last item remaining it the "count" queue).

In general it's cleaner to use the Queue objections for inter-thread communications in lieu of explicit, error prone and complicated locking. Python Queue objects are coherent thread-safe "producer-consumer" pipelines which are suitable for any combination of single or multiple producer and consumer threads. Since objects of any sort can be passed through a Queue it would be trivial to encapsulate each line read in an object specifying a target file object along with the data to be written. The Writer instance could then count each of these as it called something like "line.write(line.data)" (for example). Obviously the read() function in the original example could also be replaced with a class which could allow it to maintain any desired state or implement additional behavior.