Atomic updates

From Rosetta Code
Revision as of 13:28, 1 March 2010 by rosettacode>Abu (Added PicoLisp)

Define a data type consisting of a fixed number of 'buckets', each containing a nonnegative integer value, which supports operations to

  1. get the current value of any bucket
  2. remove a specified amount from one specified bucket and add it to another, preserving the total of all bucket values, and clamping the transferred amount to ensure the values remain nonnegative
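
The clamping rule in the second operation can be sketched in Python as follows. This is only an illustration of the arithmetic and is deliberately not thread-safe; the names `clamp_transfer` and `transfer` are hypothetical helpers, and allowing a negative amount (a transfer in the opposite direction) is an assumption borrowed from some of the solutions below.

```python
def clamp_transfer(src_value, dst_value, amount):
    """Return the amount that can actually move from src to dst.

    A positive amount moves value src -> dst and is capped at src_value;
    a negative amount moves value dst -> src and is capped at dst_value.
    """
    return max(-dst_value, min(src_value, amount))


def transfer(values, i, j, amount):
    """Move a clamped amount from values[i] to values[j] (not thread-safe)."""
    moved = clamp_transfer(values[i], values[j], amount)
    values[i] -= moved
    values[j] += moved
    return moved


buckets = [5, 3, 10]
transfer(buckets, 0, 1, 8)    # only 5 units are available in bucket 0
transfer(buckets, 2, 1, -20)  # negative: moves bucket 1 -> bucket 2, capped at 8
print(buckets, sum(buckets))  # the total stays at 18
```

Because the same clamped amount is subtracted from one bucket and added to the other, the total cannot change and neither bucket can go negative.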

In order to exercise this data type, create one set of buckets, and start three concurrent tasks:

  1. As often as possible, pick two buckets and make their values closer to equal.
  2. As often as possible, pick two buckets and arbitrarily redistribute their values.
  3. At whatever rate is convenient, display (by any means) the total value and, optionally, the individual values of each bucket.

The display task need not be explicit; use of e.g. a debugger or trace tool is acceptable provided it is simple to set up to provide the display.


This task is intended as an exercise in atomic operations. The sum of the bucket values must be preserved even if the two tasks attempt to perform transfers simultaneously, and a straightforward solution is to ensure that at any time, only one transfer is actually occurring — that the transfer operation is atomic.

Ada

<lang ada>with Ada.Text_IO;  use Ada.Text_IO;
with Ada.Numerics.Discrete_Random;

procedure Test_Updates is

  type Bucket_Index is range 1..13;
  package Random_Index is new Ada.Numerics.Discrete_Random (Bucket_Index);
  use Random_Index;
  type Buckets is array (Bucket_Index) of Natural;

  protected type Safe_Buckets is
     procedure Initialize (Value : Buckets);
     function Get (I : Bucket_Index) return Natural;
     procedure Transfer (I, J : Bucket_Index; Amount : Integer);
     function Snapshot return Buckets;
  private
     Data : Buckets := (others => 0);
  end Safe_Buckets;

  protected body Safe_Buckets is
     procedure Initialize (Value : Buckets) is
     begin
        Data := Value;
     end Initialize;

     function Get (I : Bucket_Index) return Natural is
     begin
        return Data (I);
     end Get;

     procedure Transfer (I, J : Bucket_Index; Amount : Integer) is
        Increment : constant Integer :=
           Integer'Max (-Data (J), Integer'Min (Data (I), Amount));
     begin
        Data (I) := Data (I) - Increment;
        Data (J) := Data (J) + Increment;
     end Transfer;

     function Snapshot return Buckets is
     begin
        return Data;
     end Snapshot;
  end Safe_Buckets;

  Data : Safe_Buckets;

  task Equalize;
  task Mess_Up;

  task body Equalize is
     Dice : Generator;
     I, J : Bucket_Index;
  begin
     loop
        I := Random (Dice);
        J := Random (Dice);
        Data.Transfer (I, J, (Data.Get (I) - Data.Get (J)) / 2);
     end loop;
  end Equalize;

  task body Mess_Up is
     Dice : Generator;
  begin
     loop
        Data.Transfer (Random (Dice), Random (Dice), 100);
     end loop;
  end Mess_Up;

begin

  Data.Initialize ((1,2,3,4,5,6,7,8,9,10,11,12,13));
  loop
     delay 1.0;
     declare
        State : Buckets := Data.Snapshot;
        Sum   : Natural := 0;
     begin
        for Index in State'Range loop
           Sum := Sum + State (Index);
           Put (Integer'Image (State (Index)));
        end loop;
        Put (" =" & Integer'Image (Sum));
        New_Line;
     end;
  end loop;

end Test_Updates;</lang>
The array of buckets is a protected object, which controls access to its state. The task Equalize averages pairs of buckets. The task Mess_Up moves up to 100 units from one randomly chosen bucket to another. The main task monitors the state of the buckets. Sample output:

 18 0 0 0 36 16 0 0 0 2 0 19 0 = 91
 0 0 0 6 0 0 37 0 6 23 19 0 0 = 91
 1 0 7 66 4 0 0 4 0 0 0 0 9 = 91
 0 1 0 2 28 0 17 0 0 22 1 0 20 = 91
 2 0 0 11 0 37 17 0 0 0 8 0 16 = 91
 0 10 0 59 0 2 0 13 0 2 0 5 0 = 91
 0 1 0 10 0 0 0 0 0 0 80 0 0 = 91
 16 0 0 0 13 0 9 8 14 16 0 15 0 = 91
 0 1 2 0 1 0 42 1 0 42 2 0 0 = 91
 0 16 0 0 0 19 28 0 0 0 0 0 28 = 91
...

C

Translation of: C#
Works with: POSIX version .1-2001
Library: pthread

<lang c>#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

#define N_BUCKETS 15

pthread_mutex_t bucket_mutex[N_BUCKETS];
int buckets[N_BUCKETS];

pthread_t equalizer;
pthread_t randomizer;

void transfer_value(int from, int to, int howmuch) {

 bool swapped = false;
 if ( (from == to) || ( howmuch < 0 ) ||
      (from < 0 ) || (to < 0) || (from >= N_BUCKETS) || (to >= N_BUCKETS) ) return;
 
 if ( from > to ) {
   int temp1 = from;
   from = to;
   to = temp1;
   swapped = true;
   howmuch = -howmuch;
 }
 pthread_mutex_lock(&bucket_mutex[from]);
 pthread_mutex_lock(&bucket_mutex[to]);
 if ( howmuch > buckets[from] && !swapped )
   howmuch = buckets[from];
 if ( -howmuch > buckets[to] && swapped )
   howmuch = -buckets[to];
 
 buckets[from] -= howmuch;
 buckets[to] += howmuch;
 pthread_mutex_unlock(&bucket_mutex[from]);
 pthread_mutex_unlock(&bucket_mutex[to]);

}

void print_buckets() {

 int i;
 int sum=0;
 for(i=0; i < N_BUCKETS; i++) pthread_mutex_lock(&bucket_mutex[i]);
 for(i=0; i < N_BUCKETS; i++) {
   printf("%3d ", buckets[i]);
   sum += buckets[i];
 }
 printf("= %d\n", sum);
 for(i=0; i < N_BUCKETS; i++) pthread_mutex_unlock(&bucket_mutex[i]);

}

void *equalizer_start(void *t) {

 for(;;) {
   int b1 = rand()%N_BUCKETS;
   int b2 = rand()%N_BUCKETS;
   int diff = buckets[b1] - buckets[b2];
   if ( diff < 0 )
     transfer_value(b2, b1, -diff/2);
   else
     transfer_value(b1, b2, diff/2);
 }
 return NULL;

}

void *randomizer_start(void *t) {

 for(;;) {
   int b1 = rand()%N_BUCKETS;
   int b2 = rand()%N_BUCKETS;
   int diff = rand()%(buckets[b1]+1);
   transfer_value(b1, b2, diff);
 }
 return NULL;

}

int main() {

 int i, total=0;
 for(i=0; i < N_BUCKETS; i++) pthread_mutex_init(&bucket_mutex[i], NULL);
 for(i=0; i < N_BUCKETS; i++) {
   buckets[i] = rand() % 100;
   total += buckets[i];
   printf("%3d ", buckets[i]);
 }
 printf("= %d\n", total);
 // we should check if these succeeded
 pthread_create(&equalizer, NULL, equalizer_start, NULL);
 pthread_create(&randomizer, NULL, randomizer_start, NULL);
 for(;;) {
   sleep(1);
   print_buckets();
 }
 // we do not provide a "good" way to stop this run, so the following
 // is never reached indeed...
 for(i=0; i < N_BUCKETS; i++) pthread_mutex_destroy(bucket_mutex+i);
 return EXIT_SUCCESS;

}</lang>

C#

This C# implementation uses a class to hold the buckets and the data associated with them. The ThreadSafeBuckets class provides thread safety, ensuring that two threads cannot operate on the same data at the same time. Additionally, the class uses a separate mutex for each bucket, allowing multiple operations to proceed at once if they do not touch the same buckets.
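
The per-bucket locking scheme described above can be sketched in Python. This is a simplified illustration, not a translation of the C# code: the class name `LockedBuckets` and the `worker` helper are hypothetical, and because the clamp is applied to the source bucket directly, the `swapped` flag used by the C# version is not needed here.

```python
import threading


class LockedBuckets:
    """Buckets guarded by one lock each (illustrative sketch)."""

    def __init__(self, values):
        self.values = list(values)
        self.locks = [threading.Lock() for _ in self.values]

    def transfer(self, src, dst, amount):
        if src == dst or amount <= 0:
            return
        # Always lock the lower index first so that two concurrent
        # transfers can never wait on each other in a cycle.
        first, second = sorted((src, dst))
        with self.locks[first], self.locks[second]:
            amount = min(amount, self.values[src])
            self.values[src] -= amount
            self.values[dst] += amount

    def snapshot(self):
        # Lock every bucket in index order to get a consistent copy.
        for lock in self.locks:
            lock.acquire()
        try:
            return list(self.values)
        finally:
            for lock in self.locks:
                lock.release()


def worker(buckets, transfers):
    for src, dst, amount in transfers:
        buckets.transfer(src, dst, amount)


demo = LockedBuckets([10, 20, 30, 40])
t1 = threading.Thread(target=worker, args=(demo, [(0, 3, 7), (3, 1, 100)] * 1000))
t2 = threading.Thread(target=worker, args=(demo, [(3, 0, 5), (1, 2, 9)] * 1000))
t1.start()
t2.start()
t1.join()
t2.join()
print(demo.snapshot(), sum(demo.snapshot()))  # the total stays at 100
```

Transfers on disjoint pairs of buckets can run concurrently, while the fixed acquisition order rules out deadlock between transfers that share a bucket.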

<lang csharp>using System; //Rand class
using System.Threading; //Thread, Mutex classes

public class ThreadSafeBuckets {

   //This class is thread safe, and ensures that all operations on it are atomic.
   //Calling threads do not need to ensure safety.
   Random rand = new Random();
   int[] Buckets;
   Mutex[] Mutexes; //Mutexes for each bucket so they can lock individually
   public int BucketCount { get; private set; }
   public ThreadSafeBuckets(int bucketcount)
   {
       //Create buckets+mutexes and fill them with a random amount
       BucketCount = bucketcount;
       Buckets = new int[bucketcount];
       Mutexes = new Mutex[bucketcount];
       int startingtotal = 0;
       for (int i = 0; i < BucketCount; i++)
       {
           Mutexes[i] = new Mutex(false);
           Buckets[i] = rand.Next(30);
           startingtotal += Buckets[i];
       }
       //Print the starting total
       Console.WriteLine("Starting total: " + startingtotal);
   }
   public int GetBucketValue(int i)
   {
       return Buckets[i];
   }
   public void Transfer(int i, int j, int amount)
   {
       //Transfer amount from bucket i to bucket j
       bool swapped = false;
       if (i >= BucketCount || j >= BucketCount || i < 0 || j < 0 ||
           i == j || amount < 0)
           return;
       if (i > j)
       { //To prevent deadlock, always try to lock the lower bucket first
           int temp1 = i;
           i = j;
           j = temp1;
           swapped = true; //Make sure we still transfer in the right direction
           amount = -amount;
       }
       //Get the locks
       Mutexes[i].WaitOne();
       Mutexes[j].WaitOne();
       //Make sure don't transfer out more than what's in the bucket
       if (amount > Buckets[i] && !swapped)
           amount = Buckets[i];
       if (-amount > Buckets[j] && swapped)
           amount = -Buckets[j];
       //Do the transfer
       Buckets[i] -= amount;
       Buckets[j] += amount;
       //Release the locks
       Mutexes[i].ReleaseMutex();
       Mutexes[j].ReleaseMutex();
   }
   public void PrintBuckets()
   {
       int counter = 0;
       //Lock all the buckets in sequential order and print their contents
       for (int i = 0; i < BucketCount; i++)
       {
           Mutexes[i].WaitOne();
           Console.Write(Buckets[i] + " ");
           counter += Buckets[i];
       }
       //Print the bucket total, then unlock all the mutexes
       Console.Write("= " + counter);
       Console.WriteLine();
       for (int i = 0; i < BucketCount; i++)
       {
           Mutexes[i].ReleaseMutex();
       }
   }

}

class Program {

   static ThreadSafeBuckets TSBs;
   public static void Main(){
       //Create the thread-safe bucket list
       TSBs = new ThreadSafeBuckets(10);
       TSBs.PrintBuckets();
       //Create and start the Equalizing Thread
       new Thread(new ThreadStart(EqualizerThread)).Start();
       Thread.Sleep(1);
       //Create and start the Randomizing Thread
       new Thread(new ThreadStart(RandomizerThread)).Start();
       //Use this thread to do the printing
       PrinterThread();
   }
   //EqualizerThread runs on its own thread and randomly averages two buckets
   static void EqualizerThread()
   {
       Random rand = new Random();
       while (true)
       {
           //Pick two buckets
           int b1 = rand.Next(TSBs.BucketCount);
           int b2 = rand.Next(TSBs.BucketCount);
           //Get the difference
           int diff = TSBs.GetBucketValue(b1) - TSBs.GetBucketValue(b2);
           //Transfer to equalize
           if (diff < 0)
               TSBs.Transfer(b2, b1, -diff / 2);
           else
               TSBs.Transfer(b1, b2, diff/2);
       }
   }
   //RandomizerThread redistributes the values between two buckets
   static void RandomizerThread()
   {
       Random rand = new Random();
       while (true)
       {
           int b1 = rand.Next(TSBs.BucketCount);
           int b2 = rand.Next(TSBs.BucketCount);
           int diff = rand.Next(TSBs.GetBucketValue(b1));
           TSBs.Transfer(b1, b2, diff);
       }
   }
   //PrinterThread prints the current bucket contents
   static void PrinterThread()
   {
       while (true)
       {
           Thread.Sleep(50); //Only print every few milliseconds to let the other threads work
           TSBs.PrintBuckets();
       }
   }

}</lang>

Sample Output:

Starting total: 156
15 15 12 27 6 21 19 18 16 7 = 156
17 13 15 15 18 14 18 15 14 17 = 156
12 9 22 15 9 8 23 10 16 32 = 156
0 6 28 4 21 10 28 11 34 14 = 156
35 14 30 11 32 1 26 4 3 0 = 156
11 17 19 1 18 1 12 35 26 16 = 156

Clojure

Function returning a new map containing altered values: <lang lisp>(defn xfer [m from to amt]

 (let [{f-bal from t-bal to} m
       f-bal (- f-bal amt)
       t-bal (+ t-bal amt)]
   (if (or (neg? f-bal) (neg? t-bal))
     (throw (IllegalArgumentException. "Call results in negative balance."))
     (assoc m from f-bal to t-bal))))</lang>

Since Clojure data structures are immutable, atomic mutability occurs via a reference, in this case an atom:
<lang lisp>(def *data* (atom {:a 100 :b 100})) ;; *data* is an atom holding a map
(swap! *data* xfer :a :b 50) ;; atomically results in *data* holding {:a 50 :b 150}</lang>
Now for the test:
<lang lisp>(defn equalize [m a b]

 (let [{a-val a b-val b} m
       diff (- a-val b-val)
       amt (quot diff 2)] ; integer division, so balances never become ratios
   (xfer m a b amt)))
   

(defn randomize [m a b]

 (let [{a-val a b-val b} m
       min-val (min a-val b-val)
       amt (- (rand-int (inc (* 2 min-val))) min-val)] ; rand-int takes one argument; this picks from -min-val..min-val
   (xfer m a b amt)))
 

(defn test-conc [f data a b n name]

 (dotimes [i n]
   (swap! data f a b)
   (println (str "total is " (reduce + (vals @data)) " after " name " iteration " i))))
   

(def thread-eq (Thread. #(test-conc equalize *data* :a :b 1000 "equalize")))
(def thread-rand (Thread. #(test-conc randomize *data* :a :b 1000 "randomize")))

(.start thread-eq)
(.start thread-rand)</lang>
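
The way an atom applies a pure function to an immutable value can be imitated in Python with a compare-and-set retry loop. This is a rough sketch under stated assumptions: the `Atom` class is hypothetical, Python has no built-in compare-and-set so a small lock emulates it, and unlike the Clojure `xfer` above this `xfer` clamps the amount instead of throwing.

```python
import threading


class Atom:
    """A reference to an immutable value, updated by compare-and-set."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()  # only guards the compare-and-set itself

    def deref(self):
        return self._value

    def compare_and_set(self, expected, new):
        with self._lock:
            if self._value is expected:
                self._value = new
                return True
            return False

    def swap(self, fn, *args):
        # Compute the new value outside the lock; retry if another
        # thread replaced the value in the meantime.
        while True:
            old = self._value
            new = fn(old, *args)
            if self.compare_and_set(old, new):
                return new


def xfer(m, src, dst, amt):
    """Return a new dict with a clamped amount moved from src to dst."""
    amt = max(-m[dst], min(m[src], amt))
    new = dict(m)
    new[src] -= amt
    new[dst] += amt
    return new


data = Atom({"a": 100, "b": 100})


def hammer(src, dst, amt, n):
    for _ in range(n):
        data.swap(xfer, src, dst, amt)


threads = [threading.Thread(target=hammer, args=("a", "b", 7, 2000)),
           threading.Thread(target=hammer, args=("b", "a", 3, 2000))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(data.deref(), sum(data.deref().values()))  # the total stays at 200
```

Since `xfer` never mutates its argument, a failed compare-and-set simply discards the computed dict and tries again with the current one.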

E

In E, any computation occurs in a particular vat. Over its lifetime, a vat executes many individual computations, turns, which are taken from a queue of pending events. The eventual send operator <- puts message-sends on the queue.

Since a vat executes only one turn at a time, each turn is atomic; since the below implementation of the transfer operation does not invoke any other code, the transfer operation is itself automatically atomic and will always preserve the total value provided that it does not have any bugs.

In this example, the tasks are in the same vat as the buckets, but it would be straightforward to write them to live in separate vats.
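
The turn-based model described above can be approximated in Python with a single-threaded queue of pending calls. This is a loose analogy rather than a model of E itself: the `Vat` class, its `send` and `run` methods, and the `max_turns` limit are all hypothetical names introduced for the sketch.

```python
from collections import deque


class Vat:
    """A queue of pending turns, executed one at a time."""

    def __init__(self):
        self.queue = deque()

    def send(self, fn, *args):
        """Eventual send: enqueue a turn instead of calling fn now."""
        self.queue.append((fn, args))

    def run(self, max_turns):
        turns = 0
        while self.queue and turns < max_turns:
            fn, args = self.queue.popleft()
            fn(*args)  # each turn runs to completion before the next starts
            turns += 1
        return turns


values = [100] * 5
vat = Vat()


def transfer(i, j, amount):
    moved = max(-values[j], min(values[i], amount))
    values[i] -= moved
    values[j] += moved


def neaten(i):
    j = (i + 1) % len(values)
    transfer(i, j, (values[i] - values[j]) // 4)
    vat.send(neaten, j)  # schedule the next step as a later turn


def mess_up(i):
    j = (i + 2) % len(values)
    transfer(i, j, values[i] // 3)
    vat.send(mess_up, j)


vat.send(neaten, 0)
vat.send(mess_up, 0)
vat.run(max_turns=1000)
print(values, sum(values))  # the total stays at 500
```

The two tasks interleave turn by turn, but no transfer is ever interrupted halfway, so no lock is needed.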

Works with: E-on-Java

This example uses a Java AWT window to display the current state of the buckets.

<lang e>#!/usr/bin/env rune
pragma.syntax("0.9")

def pi := (-1.0).acos()
def makeEPainter := <unsafe:com.zooko.tray.makeEPainter>
def colors := <awt:makeColor>

# --------------------------------------------------------------
# --- Definitions

/** Execute 'task' repeatedly as long 'indicator' is unresolved. */
def doWhileUnresolved(indicator, task) {

 def loop() {
   if (!Ref.isResolved(indicator)) {
     task()
     loop <- ()
   }
 }
 loop <- ()

}

/** The data structure specified for the task. */
def makeBuckets(size) {

   def values := ([100] * size).diverge() # storage
   def buckets {
       to size() :int { return size }
       /** get current quantity in bucket 'i' */
       to get(i :int) { return values[i] }
       /** transfer 'amount' units, as much as possible, from bucket 'i' to bucket 'j'
           or vice versa if 'amount' is negative */
       to transfer(i :int, j :int, amount :int) {
           def amountLim := amount.min(values[i]).max(-(values[j]))
           values[i] -= amountLim
           values[j] += amountLim
       }
   }
   return buckets

}

/** A view of the current state of the buckets. */
def makeDisplayComponent(buckets) {

 def c := makeEPainter(def paintCallback {
   to paintComponent(g) {
     def pixelsW := c.getWidth()
     def pixelsH := c.getHeight()
     def bucketsW := buckets.size()
     g.setColor(colors.getWhite())
     g.fillRect(0, 0, pixelsW, pixelsH)
     
     g.setColor(colors.getDarkGray())
     var sum := 0
     for i in 0..!bucketsW {
       sum += def value := buckets[i]
       def x0 := (i       * pixelsW / bucketsW).floor()
       def x1 := ((i + 1) * pixelsW / bucketsW).floor()
       g.fillRect(x0 + 1, pixelsH - value,
                  x1 - x0 - 1, value)
     }
     
     g.setColor(colors.getBlack())
     g."drawString(String, int, int)"(`Total: $sum`, 2, 20)
   }
 })
 c.setPreferredSize(<awt:makeDimension>(500, 300))
 return c

}

# --------------------------------------------------------------
# --- Application setup

def buckets := makeBuckets(100)
def done # Promise indicating when the window is closed

# Create the window
def frame := <unsafe:javax.swing.makeJFrame>("Atomic transfers")
frame.setContentPane(def display := makeDisplayComponent(buckets))
frame.addWindowListener(def mainWindowListener {

 to windowClosing(event) :void {
   bind done := null
 }
 match _ {}

})
frame.setLocation(50, 50)
frame.pack()

# --------------------------------------------------------------
# --- Tasks

# Neatens up buckets
var ni := 0
doWhileUnresolved(done, fn {

 def i := ni
 def j := (ni + 1) %% buckets.size()
 buckets.transfer(i, j, (buckets[i] - buckets[j]) // 4)
 ni := j

})

# Messes up buckets
var mi := 0
doWhileUnresolved(done, fn {

   def i := (mi + entropy.nextInt(3)) %% buckets.size()
   def j := (i + entropy.nextInt(3)) %% buckets.size() #entropy.nextInt(buckets.size())
   buckets.transfer(i, j, (buckets[i] / pi).floor())
   mi := j

})

# Updates display at fixed 10 Hz
# (Note: tries to catch up; on slow systems slow this down or it will starve the other tasks)
def clock := timer.every(100, def _(_) {

 if (Ref.isResolved(done)) { 
   clock.stop()
 } else {
   display.repaint()
 }

})
clock.start()

# --------------------------------------------------------------
# --- All ready, go visible and wait

frame.show()
interp.waitAtTop(done)</lang>

F#

The Buckets class is thread safe and its private higher-order Lock function ensures that locks are taken out in order (to avoid deadlocks):
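
The idea of a higher-order locking function can be sketched in Python as follows. This is an illustration, not a translation of the F# code below: `with_locks`, `transfer` and `read` are hypothetical names, and duplicate indices are removed before locking because `threading.Lock` is not reentrant.

```python
import threading


def with_locks(locks, indices, action):
    """Acquire locks[i] for each index in ascending order, run action, release."""
    ordered = sorted(set(indices))
    for i in ordered:
        locks[i].acquire()
    try:
        return action()
    finally:
        for i in reversed(ordered):
            locks[i].release()


locks = [threading.Lock() for _ in range(4)]
bucket = [100, 100, 100, 100]


def transfer(i, j, d):
    def move():
        moved = min(d, bucket[i])  # never take more than bucket i holds
        bucket[i] -= moved
        bucket[j] += moved
    if i != j and d > 0:
        with_locks(locks, [i, j], move)


def read():
    # Lock every bucket to obtain a consistent copy.
    return with_locks(locks, range(len(bucket)), lambda: list(bucket))


transfer(3, 0, 250)  # clamped to the 100 units held by bucket 3
transfer(1, 2, 40)
print(read())  # [200, 60, 140, 0]
```

Keeping the ordering logic in one helper means that every caller, whether it locks two buckets or all of them, follows the same acquisition order.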

<lang fsharp> open System.Threading

type Buckets(n) =

 let rand = System.Random()
 let mutex = Array.init n (fun _ -> new Mutex())
 let bucket = Array.init n (fun _ -> 100)
 
 member this.Count = n
 
 member this.Item n = bucket.[n]
 
 member private this.Lock is k =
   let is = Seq.sort is
   for i in is do
     mutex.[i].WaitOne() |> ignore
   try k() finally
   for i in is do
     mutex.[i].ReleaseMutex()
 
 member this.Transfer i j d =
   if i <> j && d <> 0 then
     let i, j, d = if d > 0 then i, j, d else j, i, -d
     this.Lock [i; j] (fun () ->
       let d = min d bucket.[i]
       bucket.[i] <- bucket.[i] - d
       bucket.[j] <- bucket.[j] + d)
 
 member this.Read =
   this.Lock [0..n-1] (fun () -> Array.copy bucket)
 
 member this.Print() =
   let xs = this.Read
   printf "%A = %d\n" xs (Seq.sum xs)
 
 interface System.IDisposable with
   member this.Dispose() =
     for m in mutex do
       (m :> System.IDisposable).Dispose()

let transfers = ref 0
let max_transfers = 1000000

let rand_pair (rand: System.Random) n =

 let i, j = rand.Next n, rand.Next(n-1)
 i, if j<i then j else j+1

let equalizer (bucket: Buckets) () =

 let rand = System.Random()
 while System.Threading.Interlocked.Increment transfers < max_transfers do
   let i, j = rand_pair rand bucket.Count
   let d = (bucket.[i] - bucket.[j]) / 2
   if d > 0 then
     bucket.Transfer i j d
   else
     bucket.Transfer j i -d

let randomizer (bucket: Buckets) () =

 let rand = System.Random()
 while System.Threading.Interlocked.Increment transfers < max_transfers do
   let i, j = rand_pair rand bucket.Count
   let d = 1 + rand.Next bucket.[i]
   bucket.Transfer i j d

do

 let bucket = new Buckets(10)
 let equalizer = Thread(equalizer bucket)
 let randomizer = Thread(randomizer bucket)
 bucket.Print()
 equalizer.Start()
 randomizer.Start()
 while !transfers < max_transfers do
   Thread.Sleep 100
   bucket.Print()

</lang>

This program performs a million concurrent transfers. Typical output is:

<lang fsharp>
[|100; 100; 100; 100; 100; 100; 100; 100; 100; 100|] = 1000
[|119; 61; 138; 115; 157; 54; 82; 58; 157; 59|] = 1000
[|109; 90; 78; 268; 55; 104; 91; 46; 105; 54|] = 1000
[|101; 75; 38; 114; 161; 160; 2; 234; 14; 101|] = 1000
[|104; 30; 114; 37; 32; 117; 50; 236; 127; 153|] = 1000
[|102; 32; 6; 55; 367; 69; 157; 80; 77; 55|] = 1000
[|211; 12; 319; 18; 11; 25; 73; 154; 154; 23|] = 1000
[|23; 373; 110; 108; 64; 33; 109; 8; 63; 109|] = 1000
[|72; 106; 174; 99; 115; 141; 98; 63; 123; 9|] = 1000
[|188; 67; 271; 30; 76; 134; 1; 74; 91; 68|] = 1000
[|2; 46; 240; 198; 63; 63; 113; 57; 136; 82|] = 1000
[|5; 151; 11; 191; 88; 236; 14; 0; 152; 152|] = 1000
[|162; 97; 102; 97; 122; 123; 0; 86; 84; 127|] = 1000
[|9; 11; 204; 50; 169; 206; 137; 26; 137; 51|] = 1000
[|175; 55; 157; 150; 116; 54; 10; 168; 114; 1|] = 1000
[|73; 85; 124; 3; 63; 62; 189; 115; 172; 114|] = 1000
[|112; 102; 253; 124; 39; 67; 197; 77; 20; 9|] = 1000
[|139; 172; 102; 1; 101; 64; 127; 55; 92; 147|] = 1000
[|54; 72; 130; 31; 99; 99; 130; 38; 186; 161|] = 1000
[|90; 0; 43; 46; 84; 335; 77; 79; 90; 156|] = 1000
[|20; 7; 128; 115; 24; 26; 128; 105; 240; 207|] = 1000
[|42; 79; 45; 60; 312; 37; 26; 61; 47; 291|] = 1000
[|176; 25; 10; 44; 126; 268; 78; 94; 46; 133|] = 1000
[|117; 153; 74; 63; 214; 44; 43; 93; 96; 103|] = 1000
[|56; 11; 106; 54; 1; 135; 174; 140; 174; 149|] = 1000
[|84; 153; 108; 77; 118; 140; 96; 102; 103; 19|] = 1000
[|59; 64; 85; 118; 215; 127; 42; 42; 120; 128|] = 1000
[|147; 95; 175; 116; 117; 0; 74; 116; 117; 43|] = 1000
[|131; 24; 128; 140; 45; 139; 155; 23; 68; 147|] = 1000
[|63; 184; 70; 24; 64; 84; 254; 14; 184; 59|] = 1000
[|119; 0; 234; 0; 98; 130; 94; 53; 99; 173|] = 1000
[|101; 0; 114; 129; 162; 176; 86; 84; 64; 84|] = 1000
[|95; 49; 57; 38; 73; 153; 276; 10; 147; 102|] = 1000
[|109; 182; 3; 147; 81; 107; 2; 142; 147; 80|] = 1000
[|45; 2; 103; 43; 103; 79; 65; 314; 57; 189|] = 1000
[|86; 86; 202; 47; 69; 11; 31; 246; 157; 65|] = 1000
[|82; 27; 107; 86; 106; 182; 64; 120; 82; 144|] = 1000
[|32; 158; 248; 50; 83; 109; 85; 16; 134; 85|] = 1000
[|49; 15; 246; 68; 69; 13; 219; 123; 130; 68|] = 1000
[|125; 133; 70; 23; 266; 30; 30; 44; 44; 235|] = 1000
[|18; 40; 174; 145; 146; 131; 62; 46; 138; 100|] = 1000
[|24; 128; 64; 104; 65; 109; 231; 101; 87; 87|] = 1000
[|107; 82; 40; 8; 133; 110; 180; 82; 102; 156|] = 1000
[|129; 122; 122; 52; 22; 143; 45; 49; 217; 99|] = 1000
[|15; 13; 71; 55; 55; 120; 115; 192; 192; 172|] = 1000
[|3; 95; 136; 76; 74; 37; 309; 44; 137; 89|] = 1000
[|14; 185; 47; 47; 97; 164; 180; 74; 98; 94|] = 1000
[|152; 145; 148; 83; 27; 35; 35; 77; 289; 9|] = 1000
[|78; 133; 147; 148; 83; 84; 142; 21; 141; 23|] = 1000
[|101; 63; 94; 168; 63; 90; 55; 94; 209; 63|] = 1000
[|73; 131; 182; 172; 130; 43; 102; 102; 5; 60|] = 1000
[|84; 61; 102; 9; 164; 175; 56; 4; 266; 79|] = 1000
[|89; 95; 29; 78; 200; 82; 152; 87; 101; 87|] = 1000
[|32; 33; 100; 7; 132; 75; 134; 234; 85; 168|] = 1000
[|197; 53; 81; 27; 1; 264; 100; 130; 34; 113|] = 1000
[|120; 198; 102; 51; 102; 64; 178; 45; 64; 76|] = 1000
[|208; 147; 18; 25; 178; 159; 23; 170; 36; 36|] = 1000
Press any key to continue . . .
</lang>

Haskell

Works with: GHC

This uses MVar as its concurrency protection. An MVar is a container that may have a value or not; trying to take the value when it is absent blocks until a value is provided, at which point it is atomically taken again. modifyMVar_ is a shortcut to take the value, then put a modified value; readMVar takes the value and puts back the same value while returning it.

So, at any given time, the current value map is either in the MVar or being examined or replaced by one thread, but not both. The IntMap held by the MVar is a pure immutable data structure (adjust returns a modified version), so it does not matter that the display task puts the value back before it has finished printing.
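
The take-and-put discipline described above can be imitated in Python with a one-slot blocking queue. This is a rough analogy and not how GHC implements MVar: the `MVar` class and its `take`, `put`, `read` and `modify` methods are hypothetical, and a tuple stands in for the immutable IntMap.

```python
import queue
import threading


class MVar:
    """A one-slot container: take blocks while empty, put blocks while full."""

    def __init__(self, value):
        self._slot = queue.Queue(maxsize=1)
        self._slot.put(value)

    def take(self):
        return self._slot.get()

    def put(self, value):
        self._slot.put(value)

    def read(self):
        value = self.take()
        self.put(value)  # put the same value straight back
        return value

    def modify(self, fn):
        value = self.take()
        try:
            new = fn(value)
        except BaseException:
            self.put(value)  # restore the old value if fn fails
            raise
        self.put(new)


def transfer(mvar, i, j, amt):
    if amt < 0:
        i, j, amt = j, i, -amt

    def step(values):
        moved = min(amt, values[i])
        new = list(values)
        new[i] -= moved
        new[j] += moved
        return tuple(new)  # a fresh immutable value, like adjust on an IntMap

    mvar.modify(step)


buckets = MVar((100,) * 10)


def roughen(n):
    for k in range(n):
        i, j = k % 10, (k * 7 + 3) % 10
        transfer(buckets, i, j, buckets.read()[i] // 3)


def smooth(n):
    for k in range(n):
        i, j = (k * 3) % 10, (k + 5) % 10
        vals = buckets.read()
        transfer(buckets, i, j, (vals[i] - vals[j]) // 4)


workers = [threading.Thread(target=roughen, args=(2000,)),
           threading.Thread(target=smooth, args=(2000,))]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(buckets.read(), sum(buckets.read()))  # the total stays at 1000
```

While one thread holds the taken value, every other `take` blocks, so each `modify` behaves as one atomic step; a reader can keep using its tuple after putting it back because the tuple itself is never changed.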

<lang haskell>module AtomicUpdates (main) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar_)
import Control.Monad (forever, forM_)
import Data.IntMap (IntMap, (!), toAscList, fromList, adjust)
import System.Random (randomRIO)
import Text.Printf (printf)

type Index = Int
type Value = Integer
data Buckets = Buckets Index (MVar (IntMap Value))

makeBuckets   :: Int -> IO Buckets
size          :: Buckets -> Index
currentValue  :: Buckets -> Index -> IO Value
currentValues :: Buckets -> IO (IntMap Value)
transfer      :: Buckets -> Index -> Index -> Value -> IO ()

makeBuckets n = do v <- newMVar (fromList [(i, 100) | i <- [1..n]])
                   return (Buckets n v)

size (Buckets n _) = n

currentValue (Buckets _ v) i = fmap (! i) (readMVar v)
currentValues (Buckets _ v) = readMVar v

transfer b@(Buckets n v) i j amt | amt < 0   = transfer b j i (-amt)
                                 | otherwise = do
  modifyMVar_ v $ \map -> let amt' = min amt (map ! i)
                            in return $ adjust (subtract amt') i
                                      $ adjust (+        amt') j
                                      $ map

roughen, smooth, display :: Buckets -> IO ()

pick buckets = randomRIO (1, size buckets)

roughen buckets = forever loop where

 loop = do i <- pick buckets
           j <- pick buckets
           iv <- currentValue buckets i
           transfer buckets i j (iv `div` 3)

smooth buckets = forever loop where

 loop = do i <- pick buckets
           j <- pick buckets
           iv <- currentValue buckets i
           jv <- currentValue buckets j
           transfer buckets i j ((iv - jv) `div` 4)

display buckets = forever loop where

 loop = do threadDelay 1000000
           bmap <- currentValues buckets
           putStrLn (report $ map snd $ toAscList bmap)
 report list = "\nTotal: " ++ show (sum list) ++ "\n" ++ bars
   where bars = concatMap row $ map (*40) $ reverse [1..5]
         row lim = printf "%3d " lim ++ [if x >= lim then '*' else ' ' | x <- list] ++ "\n"

main = do buckets <- makeBuckets 100
          forkIO (roughen buckets)
          forkIO (smooth buckets)
          display buckets</lang>

Sample output:

Total: 10000
200       *           *                                   *                                             
160       *           *           *            *          *                *  *   *          *        * 
120 **   ** *  ***   ****   **    *   *    *   ** *    * **              * *  *   * *        *   *    **
 80 ***  ** ** ***** **** ******  ****** ***   ** **  ***** * * *****    * * **   * ***     **   *    **
 40 ********** ******************************* ***** ****** *******************  ******* ***************

Total: 10000
200                                   *                                                                 
160                *        *         *                         *     *                      *         *
120     *  **  *** *  *     **    *  **    *    ** * *    *  ** *   * *  * *    *   * **     *      * **
 80  ***** **  ********     ***   * *** ** **  *** * * ***** ****   ***  *** * ** *** ***  * *** *  * **
 40  ******** ******************  ************************************************************** *******

Oz

Uses a lock for every bucket. Enforces a locking order to avoid deadlocks.

<lang oz>declare

 %%
 %% INIT
 %%
 NBuckets = 100
 StartVal = 50
 ExpectedSum = NBuckets * StartVal
 %% Makes a tuple and calls Fun for every field
 fun {Make Label N Fun}
    R = {Tuple.make Label N}
 in
    for I in 1..N do R.I = {Fun} end
    R
 end
 
 Buckets = {Make buckets NBuckets fun {$} {Cell.new StartVal} end}
 Locks = {Make locks NBuckets Lock.new}
 LockList = {Record.toList Locks}
 %%
 %% DISPLAY
 %%
 proc {Display}
    Snapshot = {WithLocks LockList
                fun {$}
                   {Record.map Buckets Cell.access}
                end
               }
    Sum = {Record.foldL Snapshot Number.'+' 0}
 in
    {Print Snapshot}   
    {System.showInfo "  sum: "#Sum}
    Sum = ExpectedSum %% assert
 end
 %% Calls Fun with multiple locks locked and returns the result of Fun.
 fun {WithLocks Ls Fun}
    case Ls of L|Lr then
       lock L then
          {WithLocks Lr Fun}
       end
    [] nil then {Fun}
    end
 end
 %%
 %% MANIPULATE
 %%  
 proc {Smooth I J}
    Diff = @(Buckets.I) - @(Buckets.J) %% reading without lock: by design
    Amount = Diff div 4
 in
    {Transfer I J Amount}
 end
 proc {Roughen I J}
    Amount = @(Buckets.I) div 3 %% reading without lock: by design
 in
    {Transfer I J Amount}
 end
 %% Atomically transfer an amount from From to To.
 %% Negative amounts are allowed;
 %% will never make a bucket negative.
 proc {Transfer From To Amount}
    if From \= To then
       %% lock in order (to avoid deadlocks)
       Smaller = {Min From To}
       Bigger = {Max From To}
    in
       lock Locks.Smaller then
          lock Locks.Bigger then
             FromBucket = Buckets.From
             ToBucket = Buckets.To
             NewFromValue = @FromBucket - Amount
             NewToValue = @ToBucket + Amount
          in
             if NewFromValue >= 0 andthen NewToValue >= 0 then
                FromBucket := NewFromValue
                ToBucket := NewToValue
             end
          end
       end
    end
  end
 %% Returns a random bucket index.
 fun {Pick}
    {OS.rand} mod NBuckets + 1
 end

in

 %%
 %% START
 %%
 thread for do {Smooth {Pick} {Pick}} end end
 thread for do {Roughen {Pick} {Pick}} end end
 for do {Display} {Time.delay 50} end</lang>

Sample output:
<lang oz>buckets(50 50 50 50 50 50 50 50 50 50 ,,,) sum: 5000
buckets(24 68 58 43 78 85 43 66 14 48 ,,,) sum: 5000
buckets(36 33 59 38 39 23 55 51 43 45 ,,,) sum: 5000
buckets(64 32 62 26 50 82 38 70 16 43 ,,,) sum: 5000
buckets(51 51 49 50 51 51 51 49 49 49 ,,,) sum: 5000
buckets(43 28 27 60 77 41 36 48 72 70 ,,,) sum: 5000
...</lang>

PicoLisp

We use database objects (persistent symbols) for the buckets, and child processes to handle the tasks, as this is the standard way for general PicoLisp applications.
<lang PicoLisp>(de *Buckets . 15)  # Number of buckets

# E/R model
(class +Bucket +Entity)
(rel key (+Key +Number))  # Key 1 .. *Buckets
(rel val (+Number))       # Value 1 .. 999

# Start with an empty DB
(call 'rm "-f" "buckets.db")  # Remove old DB (if any)
(pool "buckets.db")           # Create new DB file

# Create *Buckets buckets with values between 1 and 999

(for (K 1 (>= *Buckets K) (inc K))

  (new T '(+Bucket)  'key K  'val (rand 1 999)) )

(commit)


# Pick a random bucket

(de pickBucket ()

  (db 'key '+Bucket (rand 1 *Buckets)) )


# First process

(unless (fork)

  (seed *Pid)  # Ensure local random sequence
  (loop
     (let (B1 (pickBucket)  B2 (pickBucket))  # Pick two buckets 'B1' and 'B2'
        (dbSync)                              # Atomic DB operation
        (let (V1 (; B1 val)  V2 (; B2 val))   # Get current values
           (cond
              ((> V1 V2)
                 (dec> B1 'val)               # Make them closer to equal
                 (inc> B2 'val) )
              ((> V2 V1)
                 (dec> B2 'val)
                 (inc> B1 'val) ) ) )
        (commit 'upd) ) ) )                   # Close transaction

# Second process

(unless (fork)

  (seed *Pid)  # Ensure local random sequence
  (loop
     (let (B1 (pickBucket)  B2 (pickBucket))  # Pick two buckets 'B1' and 'B2'
        (unless (== B1 B2)                    # Found two different ones?
           (dbSync)                              # Atomic DB operation
           (let (V1 (; B1 val)  V2 (; B2 val))   # Get current values
              (cond
                 ((> V1 V2 0)
                    (inc> B1 'val)               # Redistribute them
                    (dec> B2 'val) )
                 ((> V2 V1 0)
                    (inc> B2 'val)
                    (dec> B1 'val) ) ) )
           (commit 'upd) ) ) ) )                 # Close transaction

# Third process

(unless (fork)

  (loop
     (dbSync)                         # Atomic DB operation
     (let Lst (collect 'key '+Bucket) # Get all buckets
        (for This Lst                 # Print current values
           (printsp (: val)) )
        (prinl                        # and total sum
           "-- Total: "
           (sum '((This) (: val)) Lst) ) )
     (rollback)
     (wait 2000) ) )                  # Sleep two seconds

(wait)</lang>
Output:

70 236 582 30 395 215 525 653 502 825 129 769 722 440 708 -- Total: 6801
0 156 566 352 198 263 0 743 0 1316 58 1180 897 0 1072 -- Total: 6801
0 0 424 101 0 0 0 682 0 1809 0 1549 961 0 1275 -- Total: 6801
0 0 0 0 0 0 0 452 0 2226 0 1838 884 0 1401 -- Total: 6801
54 55 56 55 54 55 54 102 54 2363 54 1816 666 55 1308 -- Total: 6801
198 198 197 196 198 198 197 197 196 1903 197 1438 345 197 946 -- Total: 6801
342 344 343 344 344 342 344 343 343 1278 343 992 343 343 413 -- Total: 6801
^C

Python

Works with: Python version 2.5 through 2.7 (the print statements are Python 2 syntax)

This code uses a threading.Lock to serialize access to the bucket set.

<lang python>from __future__ import with_statement # required for Python 2.5
import threading
import random
import time

terminate = threading.Event()

class Buckets:

   def __init__(self, nbuckets):
       self.nbuckets = nbuckets
       self.values = [random.randrange(10) for i in range(nbuckets)]
       self.lock = threading.Lock()
   def __getitem__(self, i):
       return self.values[i]
   def transfer(self, src, dst, amount):
       with self.lock:
           amount = min(amount, self.values[src])
           self.values[src] -= amount
           self.values[dst] += amount
   def snapshot(self):
       # copy of the current state (synchronized)
       with self.lock:
           return self.values[:]

def randomize(buckets):

   nbuckets = buckets.nbuckets
   while not terminate.isSet():
       src = random.randrange(nbuckets)
       dst = random.randrange(nbuckets)
       if dst!=src:
           amount = random.randrange(20)
           buckets.transfer(src, dst, amount)

def equalize(buckets):

   nbuckets = buckets.nbuckets
   while not terminate.isSet():
       src = random.randrange(nbuckets)
       dst = random.randrange(nbuckets)
       if dst!=src:
           amount = (buckets[src] - buckets[dst]) // 2
           if amount>=0: buckets.transfer(src, dst, amount)
           else: buckets.transfer(dst, src, -amount)

def print_state(buckets):

   snapshot = buckets.snapshot()
   for value in snapshot:
       print '%2d' % value,
   print '=', sum(snapshot)
# create 15 buckets

buckets = Buckets(15)

# the randomize thread
t1 = threading.Thread(target=randomize, args=[buckets])
t1.start()

# the equalize thread
t2 = threading.Thread(target=equalize, args=[buckets])
t2.start()

# main thread, display

try:

   while True:
       print_state(buckets)
       time.sleep(1)

except KeyboardInterrupt: # ^C to finish

   terminate.set()
# wait until all worker threads finish
t1.join()
t2.join()</lang>

Sample Output:

 5  5 11  5  5  5  5  5  5  0  6  5  5  6  5 = 78
 9  0  0  0 20  5  0 21 10  0  0  8  5  0  0 = 78
 4  0  4 12  4  4  9  2 14  0 11  2  0 12  0 = 78
 5  5  6  5  5  5  6  5  6  5  5  5  5  5  5 = 78
 2  0  3  0  0  0  0  4 13  4  9  0  1  9 33 = 78
 0  0  0 22 11  0 13 12  0  0  0 20  0  0  0 = 78
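
The listing above uses Python 2 print statements, and its equalize step reads the two bucket values before taking the lock. As a rough sketch only, the same lock-based approach could be written for Python 3 as shown below. The `equalize` method and the `run_demo` helper are hypothetical names introduced here for illustration; they are not part of the original solution.

```python
import random
import threading
import time


class Buckets:
    """Fixed set of non-negative integer buckets guarded by one lock."""

    def __init__(self, values):
        self._values = list(values)
        self._lock = threading.Lock()

    def __getitem__(self, i):
        with self._lock:
            return self._values[i]

    def transfer(self, src, dst, amount):
        """Move up to `amount` from src to dst; return the amount moved."""
        with self._lock:
            # Clamp so that neither bucket can become negative.
            amount = max(0, min(amount, self._values[src]))
            self._values[src] -= amount
            self._values[dst] += amount
            return amount

    def equalize(self, a, b):
        """Hypothetical helper: read and rebalance two buckets under one lock."""
        with self._lock:
            total = self._values[a] + self._values[b]
            self._values[a] = total // 2
            self._values[b] = total - total // 2

    def snapshot(self):
        """Return a consistent copy of all bucket values."""
        with self._lock:
            return list(self._values)


def run_demo(nbuckets=15, seconds=0.2, seed=None):
    """Run both workers for `seconds`; return (initial, final) snapshots."""
    rng = random.Random(seed)
    buckets = Buckets(rng.randrange(10) for _ in range(nbuckets))
    initial = buckets.snapshot()
    stop = threading.Event()

    def randomize():
        r = random.Random()  # per-thread generator
        while not stop.is_set():
            src, dst = r.sample(range(nbuckets), 2)  # two distinct buckets
            buckets.transfer(src, dst, r.randrange(20))

    def equalize():
        r = random.Random()
        while not stop.is_set():
            a, b = r.sample(range(nbuckets), 2)
            buckets.equalize(a, b)

    workers = [threading.Thread(target=randomize),
               threading.Thread(target=equalize)]
    for w in workers:
        w.start()
    time.sleep(seconds)
    stop.set()
    for w in workers:
        w.join()
    return initial, buckets.snapshot()


if __name__ == "__main__":
    before, after = run_demo()
    print(before, "=", sum(before))
    print(after, "=", sum(after))
```

Because every read-modify-write happens while the lock is held, the sum of the final snapshot should equal the sum of the initial one, whatever interleaving the two workers happen to get.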

Scala

<lang Scala> object AtomicUpdates {

 class Buckets(ns: Int*) {
   
   import scala.actors.Actor._
   val buckets = ns.toArray
   case class Get(index: Int)
   case class Transfer(fromIndex: Int, toIndex: Int, amount: Int)
   case object GetAll
   val handler = actor {
     loop {
       react {
         case Get(index) => reply(buckets(index))
         case Transfer(fromIndex, toIndex, amount) =>
           assert(amount >= 0)
           val actualAmount = Math.min(amount, buckets(fromIndex))
           buckets(fromIndex) -= actualAmount
           buckets(toIndex) += actualAmount
         case GetAll => reply(buckets.toList)
       }
     }
   }
   def get(index: Int): Int = (handler !? Get(index)).asInstanceOf[Int]
   def transfer(fromIndex: Int, toIndex: Int, amount: Int) = handler ! Transfer(fromIndex, toIndex, amount)
   def getAll: List[Int] = (handler !? GetAll).asInstanceOf[List[Int]]
 }
 def randomPair(n: Int): (Int, Int) = {
   import scala.util.Random._
   val pair = (nextInt(n), nextInt(n))
   if (pair._1 == pair._2) randomPair(n) else pair
 }
 def main(args: Array[String]) {
   import scala.actors.Scheduler._
   val buckets = new Buckets(List.range(1, 11): _*)
   val stop = new java.util.concurrent.atomic.AtomicBoolean(false)
   val latch = new java.util.concurrent.CountDownLatch(3)
   execute {
     while (!stop.get) {
       val (i1, i2) = randomPair(10)
       val (n1, n2) = (buckets.get(i1), buckets.get(i2))
       val m = (n1 + n2) / 2
       if (n1 < n2)
         buckets.transfer(i2, i1, n2 - m)
       else
         buckets.transfer(i1, i2, n1 - m)
     }
     latch.countDown
   }
   execute {
     while (!stop.get) {
       val (i1, i2) = randomPair(10)
       val n = buckets.get(i1)
       buckets.transfer(i1, i2, if (n == 0) 0 else scala.util.Random.nextInt(n))
     }
     latch.countDown
   }
   execute {
     for (i <- 1 to 20) {
       val all = buckets.getAll
       println(all.sum + ":" + all)
       Thread.sleep(500)
     }
     stop.set(true)
     latch.countDown
   }
   latch.await
   shutdown
 }

} </lang>

Tcl

In Tcl, you need to explicitly hold a mutex if you want to reliably access multiple shared variables; single shared variable accesses use a built-in lock.

Works with: Tcl version 8.5

<lang tcl>package require Thread
package require Tk

# Make the shared state
canvas .c	;# So we can allocate the display lines in one loop
set m [thread::mutex create]
for {set i 0} {$i<100} {incr i} {
   set bucket b$i	;# A handle for every bucket...
   tsv::set buckets $bucket 50
   lappend buckets $bucket
   lappend lines [.c create line 0 0 0 0]
}
tsv::set still going 1

# Make the "make more equal" task

lappend tasks [thread::create {

   # Perform an atomic update of two cells
   proc transfer {b1 b2 val} {
       variable m
       thread::mutex lock $m
       set v [tsv::get buckets $b1]
       if {$val > $v} {
           set val $v
       }
       tsv::incr buckets $b1 [expr {-$val}]
       tsv::incr buckets $b2 $val
       thread::mutex unlock $m
   }
   # The task itself; we loop this round frequently
   proc task {mutex buckets} {
       variable m $mutex b $buckets i 0
       while {[tsv::get still going]} {
           set b1 [lindex $b $i]
           if {[incr i] == [llength $b]} {set i 0}
           set b2 [lindex $b $i]

           if {[tsv::get buckets $b1] > [tsv::get buckets $b2]} {
               transfer $b1 $b2 1
           } else {
               transfer $b1 $b2 -1
           }
       }
   }
   thread::wait

}]

# Make the "mess things up" task

lappend tasks [thread::create {

   # Utility to pick a random item from a list
   proc pick list {
       lindex $list [expr {int(rand() * [llength $list])}]
   }
   proc transfer {b1 b2 val} {
       variable m
       thread::mutex lock $m
       set v [tsv::get buckets $b1]
       if {$val > $v} {
           set val $v
       }
       tsv::incr buckets $b1 [expr {-$val}]
       tsv::incr buckets $b2 $val
       thread::mutex unlock $m
   }
   # The task to move a large amount between two random buckets
   proc task {mutex buckets} {
       variable m $mutex b $buckets
       while {[tsv::get still going]} {
           set b1 [pick $b]
           set b2 [pick $b]
           transfer $b1 $b2 [expr {[tsv::get buckets $b1] / 3}]
       }
   }
   thread::wait

}]

# The "main" task; we keep GUI operations in the main thread

proc redisplay {} {
   global m buckets lines
   thread::mutex lock $m
   set i 1
   foreach b $buckets l $lines {
       .c coords $l $i 0 $i [tsv::get buckets $b]
       incr i 2
   }
   thread::mutex unlock $m
   after 100 redisplay
}

# Start tasks and display
.c configure -width 201 -height 120
pack .c
redisplay
foreach t $tasks {
   thread::send -async $t [list task $m $buckets]
}

# Wait for user to close window, then tidy up
tkwait window .
tsv::set still going 0
thread::broadcast thread::exit</lang>