Atomic updates: Difference between revisions

From Rosetta Code
Content added Content deleted
(omits)
m (Placeholder for C# implemtation to make editing easier)
Line 118: Line 118:
...
...
</pre>
</pre>

=={{header|C sharp|C#}}==



=={{header|E}}==
=={{header|E}}==

Revision as of 14:21, 28 May 2009

Task
Atomic updates
You are encouraged to solve this task according to the task description, using any language you may know.

Define a data type consisting of a fixed number of 'buckets', each containing an nonnegative integer value, which supports operations to

  1. get the current value of any bucket
  2. remove a specified amount from one specified bucket and add it to another, preserving the total of all bucket values, and clamping the transferred amount to ensure the values remain nonnegative

Create one set of buckets, and start three concurrent tasks:

  1. As often as possible, pick two buckets and make their values closer to equal.
  2. As often as possible, pick two buckets and arbitrarily redistribute their values.
  3. At whatever rate is convenient, display (by any means) the total value and, optionally, the individual values of each bucket.

The display task need not be explicit; use of e.g. a debugger or trace tool is acceptable provided it is simple to set up to provide the display.


This task is intended as an exercise in atomic operations. The sum of the bucket values must be preserved even if the two tasks attempt to perform transfers simultaneously, and a straightforward solution is to ensure that at any time, only one transfer is actually occurring — that the transfer operation is atomic.

Ada

<lang ada> with Ada.Text_IO; use Ada.Text_IO; with Ada.Numerics.Discrete_Random;

procedure Test_Updates is

  type Bucket_Index is range 1..13;
  package Random_Index is new Ada.Numerics.Discrete_Random (Bucket_Index);
  use Random_Index;
  type Buckets is array (Bucket_Index) of Natural;

  protected type Safe_Buckets is
     procedure Initialize (Value : Buckets);
     function Get (I : Bucket_Index) return Natural;
     procedure Transfer (I, J : Bucket_Index; Amount : Integer);
     function Snapshot return Buckets;
  private
     Data : Buckets := (others => 0);
  end Safe_Buckets;

  protected body Safe_Buckets is
     procedure Initialize (Value : Buckets) is
     begin
        Data := Value;
     end Initialize;

     function Get (I : Bucket_Index) return Natural is
     begin
        return Data (I);
     end Get;

     procedure Transfer (I, J : Bucket_Index; Amount : Integer) is
        Increment : constant Integer :=
           Integer'Max (-Data (J), Integer'Min (Data (I), Amount));
     begin
        Data (I) := Data (I) - Increment;
        Data (J) := Data (J) + Increment;
     end Transfer;

     function Snapshot return Buckets is
     begin
        return Data;
     end Snapshot;
  end Safe_Buckets;

  Data : Safe_Buckets;

  task Equalize;
  task Mess_Up;

  task body Equalize is
     Dice : Generator;
     I, J : Bucket_Index;
  begin
     loop
        I := Random (Dice);
        J := Random (Dice);
        Data.Transfer (I, J, (Data.Get (I) - Data.Get (J)) / 2);
     end loop;
  end Equalize;

  task body Mess_Up is
     Dice : Generator;
  begin
     loop
        Data.Transfer (Random (Dice), Random (Dice), 100);
     end loop;
  end Mess_Up;

begin

  Data.Initialize ((1,2,3,4,5,6,7,8,9,10,11,12,13));
  loop
     delay 1.0;
     declare
        State : Buckets := Data.Snapshot;
        Sum   : Natural := 0;
     begin
        for Index in State'Range loop
           Sum := Sum + State (Index);
           Put (Integer'Image (State (Index)));
        end loop;
        Put (" =" & Integer'Image (Sum));
        New_Line;
     end;
  end loop;

end Test_Updates; </lang> The array of buckets is a protected object which controls access to its state. The task Equalize averages pairs of buckets. The task Mess_Up moves content of one bucket to another. The main task performs monitoring of the buckets state. Sample output:

 18 0 0 0 36 16 0 0 0 2 0 19 0 = 91
 0 0 0 6 0 0 37 0 6 23 19 0 0 = 91
 1 0 7 66 4 0 0 4 0 0 0 0 9 = 91
 0 1 0 2 28 0 17 0 0 22 1 0 20 = 91
 2 0 0 11 0 37 17 0 0 0 8 0 16 = 91
 0 10 0 59 0 2 0 13 0 2 0 5 0 = 91
 0 1 0 10 0 0 0 0 0 0 80 0 0 = 91
 16 0 0 0 13 0 9 8 14 16 0 15 0 = 91
 0 1 2 0 1 0 42 1 0 42 2 0 0 = 91
 0 16 0 0 0 19 28 0 0 0 0 0 28 = 91
...

C#

E

In E, any computation occurs in a particular vat. Over its lifetime, a vat executes many individual computations, turns, which are taken from a queue of pending events. The eventual send operator <- puts message-sends on the queue.

Since a vat executes only one turn at a time, each turn is atomic; since the below implementation of the transfer operation does not invoke any other code, the transfer operation is itself automatically atomic and will always preserve the total value provided that it does not have any bugs.

In this example, the tasks are in the same vat as the buckets, but it would be straightforward to write them to live in separate vats.

Works with: E-on-Java

This example uses a Java AWT window to display the current state of the buckets.

<lang e>#!/usr/bin/env rune pragma.syntax("0.9")

def pi := (-1.0).acos() def makeEPainter := <unsafe:com.zooko.tray.makeEPainter> def colors := <awt:makeColor>

  1. --------------------------------------------------------------
  2. --- Definitions

/** Execute 'task' repeatedly as long 'indicator' is unresolved. */ def doWhileUnresolved(indicator, task) {

 def loop() {
   if (!Ref.isResolved(indicator)) {
     task()
     loop <- ()
   }
 }
 loop <- ()

}

/** The data structure specified for the task. */ def makeBuckets(size) {

   def values := ([100] * size).diverge() # storage
   def buckets {
       to size() :int { return size }
       /** get current quantity in bucket 'i' */
       to get(i :int) { return values[i] }
       /** transfer 'amount' units, as much as possible, from bucket 'i' to bucket 'j'
           or vice versa if 'amount' is negative */
       to transfer(i :int, j :int, amount :int) {
           def amountLim := amount.min(values[i]).max(-(values[j]))
           values[i] -= amountLim
           values[j] += amountLim
       }
   }
   return buckets

}

/** A view of the current state of the buckets. */ def makeDisplayComponent(buckets) {

 def c := makeEPainter(def paintCallback {
   to paintComponent(g) {
     def pixelsW := c.getWidth()
     def pixelsH := c.getHeight()
     def bucketsW := buckets.size()
     g.setColor(colors.getWhite())
     g.fillRect(0, 0, pixelsW, pixelsH)
     
     g.setColor(colors.getDarkGray())
     var sum := 0
     for i in 0..!bucketsW {
       sum += def value := buckets[i]
       def x0 := (i       * pixelsW / bucketsW).floor()
       def x1 := ((i + 1) * pixelsW / bucketsW).floor()
       g.fillRect(x0 + 1, pixelsH - value,
                  x1 - x0 - 1, value)
     }
     
     g.setColor(colors.getBlack())
     g."drawString(String, int, int)"(`Total: $sum`, 2, 20)
   }
 })
 c.setPreferredSize(<awt:makeDimension>(500, 300))
 return c

}

  1. --------------------------------------------------------------
  2. --- Application setup

def buckets := makeBuckets(100) def done # Promise indicating when the window is closed

  1. Create the window

def frame := <unsafe:javax.swing.makeJFrame>("Atomic transfers") frame.setContentPane(def display := makeDisplayComponent(buckets)) frame.addWindowListener(def mainWindowListener {

 to windowClosing(event) :void {
   bind done := null
 }
 match _ {}

}) frame.setLocation(50, 50) frame.pack()

  1. --------------------------------------------------------------
  2. --- Tasks
  1. Neatens up buckets

var ni := 0 doWhileUnresolved(done, fn {

 def i := ni
 def j := (ni + 1) %% buckets.size()
 buckets.transfer(i, j, (buckets[i] - buckets[j]) // 4)
 ni := j

})

  1. Messes up buckets

var mi := 0 doWhileUnresolved(done, fn {

   def i := (mi + entropy.nextInt(3)) %% buckets.size()
   def j := (i + entropy.nextInt(3)) %% buckets.size() #entropy.nextInt(buckets.size())
   buckets.transfer(i, j, (buckets[i] / pi).floor())
   mi := j

})

  1. Updates display at fixed 10 Hz
  2. (Note: tries to catch up; on slow systems slow this down or it will starve the other tasks)

def clock := timer.every(100, def _(_) {

 if (Ref.isResolved(done)) { 
   clock.stop()
 } else {
   display.repaint()
 }

}) clock.start()

  1. --------------------------------------------------------------
  2. --- All ready, go visible and wait

frame.show() interp.waitAtTop(done)</lang>

Tcl

In Tcl, you need to explicitly hold a mutex if you want to reliably access multiple shared variables; single shared variable accesses use a built-in lock.

Works with: Tcl version 8.5

<lang tcl>package require Thread package require Tk

  1. Make the shared state

canvas .c ;# So we can allocate the display lines in one loop set m [thread::mutex create] for {set i 0} {$i<100} {incr i} {

   set bucket b$i	;# A handle for every bucket...
   tsv::set buckets $bucket 50
   lappend buckets $bucket
   lappend lines [.c create line 0 0 0 0]

} tsv::set still going 1

  1. Make the "make more equal" task

lappend tasks [thread::create {

   # Perform an atomic update of two cells
   proc transfer {b1 b2 val} {

variable m thread::mutex lock $m set v [tsv::get buckets $b1] if {$val > $v} { set val $v } tsv::incr buckets $b1 [expr {-$val}] tsv::incr buckets $b2 $val thread::mutex unlock $m

   }
   # The task itself; we loop this round frequently
   proc task {mutex buckets} {

variable m $mutex b $buckets i 0 while {[tsv::get still going]} { set b1 [lindex $b $i] if {[incr i] == [llength $b]} {set i 0} set b2 [lindex $b $i]

if {[tsv::get buckets $b1] > [tsv::get buckets $b2]} { transfer $b1 $b2 1 } else { transfer $b1 $b2 -1 } }

   }
   thread::wait

}]

  1. Make the "mess things up" task

lappend tasks [thread::create {

   # Utility to pick a random item from a list
   proc pick list {

lindex $list [expr {int(rand() * [llength $list])}]

   }
   proc transfer {b1 b2 val} {

variable m thread::mutex lock $m set v [tsv::get buckets $b1] if {$val > $v} { set val $v } tsv::incr buckets $b1 [expr {-$val}] tsv::incr buckets $b2 $val thread::mutex unlock $m

   }
   # The task to move a large amount between two random buckets
   proc task {mutex buckets} {

variable m $mutex b $buckets while {[tsv::get still going]} { set b1 [pick $b] set b2 [pick $b] transfer $b1 $b2 [expr {[tsv::get buckets $b1] / 3}] }

   }
   thread::wait

}]

  1. The "main" task; we keep GUI operations in the main thread

proc redisplay {} {

   global m buckets lines
   thread::mutex lock $m
   set i 1
   foreach b $buckets l $lines {

.c coords $l $i 0 $i [tsv::get buckets $b] incr i 2

   }
   thread::mutex unlock $m
   after 100 redisplay

}

  1. Start tasks and display

.c configure -width 201 -height 120 pack .c redisplay foreach t $tasks {

   thread::send -async $t [list task $m $buckets]

}

  1. Wait for user to close window, then tidy up

tkwait window . tsv::set still going 0 thread::broadcast thread::exit</lang>