Atomic updates: Difference between revisions

m
m (→‎{{header|Wren}}: Minor tidy)
 
(43 intermediate revisions by 24 users not shown)
Line 1:
{{task|Concurrency}}{{requires|Concurrency}}
{{requires|Concurrency}}
 
;Task:
Define a data type consisting of a fixed number of 'buckets', each containing a nonnegative integer value, which supports operations to
Define a data type consisting of a fixed number of 'buckets', each containing a nonnegative integer value, which supports operations to:
# get the current value of any bucket
# remove a specified amount from one specified bucket and add it to another, preserving the total of all bucket values, and [[wp:Clamping (graphics)|clamping]] the transferred amount to ensure the values remain nonnegativenon-negative
 
----
Line 12 ⟶ 14:
# At whatever rate is convenient, display (by any means) the total value and, optionally, the individual values of each bucket.
 
<br>
The display task need not be explicit; use of e.g. a debugger or trace tool is acceptable provided it is simple to set up to provide the display.
 
----
 
This task is intended as an exercise in ''atomic'' operations. &nbsp; The sum of the bucket values must be preserved even if the two tasks attempt to perform transfers simultaneously, and a straightforward solution is to ensure that at any time, only one transfer is actually occurring — that the transfer operation is ''atomic''.
<br><br>
 
=={{header|8th}}==
<syntaxhighlight lang="forth">var bucket
var bucket-size
 
\ The 'bucket' will be a simple array of some values:
: genbucket \ n --
a:new swap
(
\ make a random int up to 1000
rand-pcg n:abs 1000 n:mod
a:push
) swap times
bucket ! ;
 
\ display bucket and its total:
: .bucket
bucket lock @
dup . space
' n:+ 0 a:reduce . cr
bucket unlock drop ;
 
\ Get current value of bucket #x
: bucket@ \ n -- bucket[n]
bucket @
swap a:@ nip ;
 
\ Transfer x from bucket n to bucket m
: bucket-xfer \ m n x --
>r bucket @
\ m n bucket
over a:@ r@ n:-
rot swap a:!
\ m bucket
over a:@ r> n:+
rot swap a:!
drop ;
 
\ Get two random indices to check (ensure they're not the same):
: pick2
rand-pcg n:abs bucket-size @ n:mod dup >r
repeat
drop
rand-pcg n:abs bucket-size @ n:mod
r@ over n:=
while!
r> ;
 
\ Pick two buckets and make them more equal (by a quarter of their difference):
: make-equal
repeat
pick2
bucket lock @
third a:@ >r
over a:@ r> n:-
\ if they are equal, do nothing
dup not if
\ equal, so do nothing
drop -rot 2drop
else
4 n:/ n:int
>r -rot r>
bucket-xfer
then
drop
bucket unlock drop
again ;
 
\ Moves a quarter of the smaller value from one (random) bucket to another:
: make-redist
repeat
pick2 bucket lock @
\ n m bucket
over a:@ >r \ n m b b[m]
third a:@ r> \ n m b b[n]
n:min 4 n:/ n:int
nip bucket-xfer
 
bucket unlock drop
again ;
 
: app:main
\ create 10 buckets with random positive integer values:
10 genbucket bucket @ a:len bucket-size ! drop
 
\ print the bucket
.bucket
 
\ the problem's tasks:
' make-equal t:task
' make-redist t:task
 
\ the print-the-bucket task. We'll do it just 10 times and then quit:
( 1 sleep .bucket ) 10 times
bye ;</syntaxhighlight>
{{out}}<pre>[941,654,311,605,332,822,62,658,9,348] 4742
[289,98,710,698,183,490,675,688,793,118] 4742
[269,51,141,11,3,1284,1371,436,344,832] 4742
[1097,229,1097,307,421,25,85,676,188,617] 4742
[503,475,459,467,458,477,451,488,460,504] 4742
[480,498,484,460,481,464,467,488,481,439] 4742
[442,491,511,446,540,487,424,489,524,388] 4742
[3,306,114,88,185,366,2331,202,1138,9] 4742
[312,187,212,616,698,790,551,572,568,236] 4742
[473,474,475,474,474,473,476,475,473,475] 4742
[466,457,448,468,454,501,479,490,469,510] 4742
</pre>
 
=={{header|Ada}}==
<langsyntaxhighlight lang="ada">with Ada.Text_IO; use Ada.Text_IO;
with Ada.Numerics.Discrete_Random;
Line 103 ⟶ 214:
end;
end loop;
end Test_Updates;</langsyntaxhighlight>
The array of buckets is a protected object which controls access to its state. The task Equalize averages pairs of buckets. The task Mess_Up moves content of one bucket to another. The main task performs monitoring of the buckets state. Sample output:
<pre>
Line 120 ⟶ 231:
 
=={{header|AutoHotkey}}==
<langsyntaxhighlight AutoHotkeylang="autohotkey">Bucket := [], Buckets := 10, Originaltotal = 0
loop, %Buckets% {
Random, rnd, 0,50
Line 160 ⟶ 271:
Res.= ">"
return Res
}</langsyntaxhighlight>
 
=={{header|BBC BASIC}}==
{{works with|BBC BASIC for Windows}}
The BBC BASIC interpreter is single-threaded so the 'concurrent' tasks are implemented by timer events. In this context an 'atomic' update means one which takes place within a single BASIC statement, so it cannot be 'interrupted'. Two (or more) buckets can be updated atomically by making them RETURN parameters of a procedure.
<langsyntaxhighlight lang="bbcbasic"> INSTALL @lib$+"TIMERLIB"
DIM Buckets%(100)
Line 216 ⟶ 327:
PROC_killtimer(tid1%)
PROC_killtimer(tid2%)
ENDPROC</langsyntaxhighlight>
 
=={{header|C}}==
Line 224 ⟶ 335:
 
{{libheader|pthread}}
<langsyntaxhighlight lang="c">#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
Line 334 ⟶ 445:
for(i=0; i < N_BUCKETS; i++) pthread_mutex_destroy(bucket_mutex+i);
return EXIT_SUCCESS;
}</langsyntaxhighlight>
 
===With OpenMP===
Compiled with <code>gcc -std=c99 -fopenmp</code>. The <code>#pragma omp critical</code> ensures the following block is entered by one thread at a time.
<langsyntaxhighlight Clang="c">#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
Line 388 ⟶ 499:
 
return 0;
}</langsyntaxhighlight>Output:<syntaxhighlight lang="text">1000 1000 1000 1798 1000 1000 1000 1000 202 1000 Sum: 10000
595 800 2508 2750 470 1209 283 314 601 470 Sum: 10000
5 521 3339 1656 351 1038 1656 54 508 872 Sum: 10000
Line 395 ⟶ 506:
.
752 490 385 2118 1503 508 384 509 1110 2241 Sum: 10000
752 823 385 2118 1544 508 10 509 1110 2241 Sum: 10000</langsyntaxhighlight>
 
=={{header|C sharp|C#}}==
This C# implementation uses a class to hold the buckets and data associated with them. The ThreadSafeBuckets class implements thread-stability, and ensures that two threads cannot operate on the same data at the same time. Additionally, the class uses a seperate mutex for each bucket, allowing multiple operations to occur at once if they do not alter the same buckets.
 
Line 404 ⟶ 515:
- The previous implementation tracked a "swapped" state - which seems a harder way to tackle the problem. You need to acquire the locks in the correct order, not swap i and j
 
<langsyntaxhighlight lang="csharp">
using System; //Rand class
using System.Threading; //Thread, Mutex classes
Line 529 ⟶ 640:
}
}
}</langsyntaxhighlight>
 
Sample Output:
Line 541 ⟶ 652:
11 17 19 1 18 1 12 35 26 16 = 156
</pre>
 
=={{header|C++}}==
{{trans|C}}
 
{{works with|C++11}}
<syntaxhighlight lang="cpp">#include <algorithm>
#include <array>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
 
using namespace std;
 
constexpr int bucket_count = 15;
 
void equalizer(array<int, bucket_count>& buckets,
array<mutex, bucket_count>& bucket_mutex) {
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> dist_bucket(0, bucket_count - 1);
 
while (true) {
int from = dist_bucket(gen);
int to = dist_bucket(gen);
if (from != to) {
lock_guard<mutex> lock_first(bucket_mutex[min(from, to)]);
lock_guard<mutex> lock_second(bucket_mutex[max(from, to)]);
int diff = buckets[from] - buckets[to];
int amount = abs(diff / 2);
if (diff < 0) {
swap(from, to);
}
buckets[from] -= amount;
buckets[to] += amount;
}
}
}
 
void randomizer(array<int, bucket_count>& buckets,
array<mutex, bucket_count>& bucket_mutex) {
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> dist_bucket(0, bucket_count - 1);
 
while (true) {
int from = dist_bucket(gen);
int to = dist_bucket(gen);
if (from != to) {
lock_guard<mutex> lock_first(bucket_mutex[min(from, to)]);
lock_guard<mutex> lock_second(bucket_mutex[max(from, to)]);
uniform_int_distribution<> dist_amount(0, buckets[from]);
int amount = dist_amount(gen);
buckets[from] -= amount;
buckets[to] += amount;
}
}
}
 
void print_buckets(const array<int, bucket_count>& buckets) {
int total = 0;
for (const int& bucket : buckets) {
total += bucket;
cout << setw(3) << bucket << ' ';
}
cout << "= " << setw(3) << total << endl;
}
 
int main() {
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> dist(0, 99);
 
array<int, bucket_count> buckets;
array<mutex, bucket_count> bucket_mutex;
for (int& bucket : buckets) {
bucket = dist(gen);
}
print_buckets(buckets);
 
thread t_eq(equalizer, ref(buckets), ref(bucket_mutex));
thread t_rd(randomizer, ref(buckets), ref(bucket_mutex));
 
while (true) {
this_thread::sleep_for(chrono::seconds(1));
for (mutex& mutex : bucket_mutex) {
mutex.lock();
}
print_buckets(buckets);
for (mutex& mutex : bucket_mutex) {
mutex.unlock();
}
}
return 0;
}</syntaxhighlight>
 
=={{header|Clojure}}==
Function returning a new map containing altered values:
<langsyntaxhighlight lang="lisp">(defn xfer [m from to amt]
(let [{f-bal from t-bal to} m
f-bal (- f-bal amt)
Line 550 ⟶ 758:
(if (or (neg? f-bal) (neg? t-bal))
(throw (IllegalArgumentException. "Call results in negative balance."))
(assoc m from f-bal to t-bal))))</langsyntaxhighlight>
Since clojure data structures are immutable, atomic mutability occurs via a reference, in this case an atom:
<langsyntaxhighlight lang="lisp">(def *data* (atom {:a 100 :b 100})) ;; *data* is an atom holding a map
(swap! *data* xfer :a :b 50) ;; atomically results in *data* holding {:a 50 :b 150}</langsyntaxhighlight>
Now for the test:
<langsyntaxhighlight lang="lisp">(defn equalize [m a b]
(let [{a-val a b-val b} m
diff (- a-val b-val)
Line 576 ⟶ 784:
 
(.start thread-eq)
(.start thread-rand)</langsyntaxhighlight>
 
=={{header|Common Lisp}}==
Depends on libraries in Quicklisp. STMX is a library that provides Software Transactional Memory.
<syntaxhighlight lang="lisp">(ql:quickload '(:alexandria :stmx :bordeaux-threads))
 
(defpackage :atomic-updates
(:use :cl))
 
(in-package :atomic-updates)
 
(defvar *buckets* nil)
(defvar *running* nil)
 
(defun distribute (ratio a b)
"Atomically redistribute the values of buckets A and B by RATIO."
(stmx:atomic
(let* ((sum (+ (stmx:$ a) (stmx:$ b)))
(a2 (truncate (* ratio sum))))
(setf (stmx:$ a) a2)
(setf (stmx:$ b) (- sum a2)))))
 
(defun runner (ratio-func)
"Continously distribute to two different elements in *BUCKETS* with the
value returned from RATIO-FUNC."
(loop while *running*
do (let ((a (alexandria:random-elt *buckets*))
(b (alexandria:random-elt *buckets*)))
(unless (eq a b)
(distribute (funcall ratio-func) a b)))))
 
(defun print-buckets ()
"Atomically get the bucket values and print out their metrics."
(let ((buckets (stmx:atomic (map 'vector 'stmx:$ *buckets*))))
(format t "Buckets: ~a~%Sum: ~a~%" buckets (reduce '+ buckets))))
 
(defun scenario ()
(setf *buckets* (coerce (loop repeat 20 collect (stmx:tvar 10)) 'vector))
(setf *running* t)
(bt:make-thread (lambda () (runner (constantly 0.5))))
(bt:make-thread (lambda () (runner (lambda () (random 1.0))))))</syntaxhighlight>
{{out}}
<syntaxhighlight lang="lisp">ATOMIC-UPDATES> (scenario)
#<SB-THREAD:THREAD "Anonymous thread" RUNNING {10058441D3}>
ATOMIC-UPDATES> (loop repeat 3 do (print-buckets) (sleep 1))
Buckets: #(8 4 12 17 12 10 5 10 9 10 4 11 4 15 16 20 11 8 4 10)
Sum: 200
Buckets: #(2 12 24 7 8 3 13 6 8 31 0 9 7 11 12 8 8 12 15 4)
Sum: 200
Buckets: #(1 2 3 3 2 8 33 23 0 8 4 11 24 2 3 5 32 8 2 26)
Sum: 200
NIL</syntaxhighlight>
 
=={{header|D}}==
This implements a more scalable version than most of the other languages, by using a lock per bucket instead of a single lock for the whole array.
 
<langsyntaxhighlight lang="d">import std.stdio: writeln;
import std.conv: text;
import std.random: uniform, Xorshift;
Line 588 ⟶ 847:
import core.thread: Thread;
import core.sync.mutex: Mutex;
import core.time: durseconds;
 
__gshared uint transfersCount;
Line 656 ⟶ 915:
 
void randomize(size_t N)(Buckets!N data) {
immutable maxi = data.length - 1;
auto rng = Xorshift(1);
 
while (data.running) {
immutable i = uniform(0, maxidata.length, rng);
immutable j = uniform(0, maxidata.length, rng);
immutable amount = uniform!"[]"(0, 20, rng);
data.transfer(i, j, amount);
}
Line 668 ⟶ 926:
 
void equalize(size_t N)(Buckets!N data) {
immutable maxi = data.length - 1;
auto rng = Xorshift(1);
 
while (data.running) {
immutable i = uniform(0, maxidata.length, rng);
immutable j = uniform(0, maxidata.length, rng);
immutable a = data[i];
immutable b = data[j];
Line 687 ⟶ 944:
writeln(transfersCount, " ", data);
transfersCount = 0;
Thread.sleep(dur!"msecs"(1000)1.seconds);
}
data.running = false;
Line 698 ⟶ 955:
task!equalize(data).executeInNewThread();
task!display(data).executeInNewThread();
}</langsyntaxhighlight>
{{out}}
<pre>N. transfers, buckets, buckets sum:
Line 724 ⟶ 981:
This example uses a Java AWT window to display the current state of the buckets.
 
<langsyntaxhighlight lang="e">#!/usr/bin/env rune
pragma.syntax("0.9")
 
Line 846 ⟶ 1,103:
 
frame.show()
interp.waitAtTop(done)</langsyntaxhighlight>
 
=={{header|Erlang}}==
Line 863 ⟶ 1,120:
[0,11,7,0,4,16,7,0,10,0] = 55
</pre>
<syntaxhighlight lang="erlang">
<lang Erlang>
-module( atomic_updates ).
-export( [buckets/1, buckets_get/2, buckets_get_all/1, buckets_move_contents/4, task/0] ).
Line 952 ⟶ 1,209:
buckets_move_contents( Amount, From, To, Buckets ),
redistribute_loop( N, Buckets ).
</syntaxhighlight>
</lang>
 
=={{header|Euphoria}}==
<langsyntaxhighlight lang="euphoria">function move(sequence s, integer amount, integer src, integer dest)
if src < 1 or src > length(s) or dest < 1 or dest > length(s) or amount < 0 then
return -1
Line 1,023 ⟶ 1,280:
printf(1," sum: %d\n", {sum(buckets)})
task_yield()
end for</langsyntaxhighlight>
 
Output:
Line 1,056 ⟶ 1,313:
The Buckets class is thread safe and its private higher-order Lock function ensures that locks are taken out in order (to avoid deadlocks):
 
<langsyntaxhighlight lang="fsharp">
open System.Threading
 
Line 1,130 ⟶ 1,387:
Thread.Sleep 100
bucket.Print()
</syntaxhighlight>
</lang>
 
This program performs a million concurrent transfers. Typical output is:
 
<langsyntaxhighlight lang="fsharp">
[|100; 100; 100; 100; 100; 100; 100; 100; 100; 100|] = 1000
[|119; 61; 138; 115; 157; 54; 82; 58; 157; 59|] = 1000
Line 1,193 ⟶ 1,450:
[|208; 147; 18; 25; 178; 159; 23; 170; 36; 36|] = 1000
Press any key to continue . . .
</syntaxhighlight>
</lang>
 
=={{header|GoFreeBASIC}}==
{{trans|Run Basic}}
Four solutions presented here. All share the same data type declaration, as specified by the task, that supports get and transfer operations, and all share the same code to exercise the data type. This common code represents "sloppy/buggy/competing client" code, as discussed on the talk page.
<syntaxhighlight lang="freebasic">Randomize Timer
Dim Shared As Uinteger cubo(1 To 10), a, i
For i As Uinteger = 1 To 10
cubo(i) = Int(Rnd * 90)
Next i
 
Function Display(cadena As String) As Uinteger
Differences in the solutions are in the implementation of the data type and it's methods. This is where synchronization is managed and invariants are maintained.
Dim As Uinteger valor
===Channels===
Print cadena; Spc(2);
This is the straightforward solution suggested by the task description. It uses a Go channel for synchronization, but really uses the channel just as a mutex. A sync.Mutex could be trivially substituted.
For i As Uinteger = 1 To 10
valor += cubo(i)
Print Using "###"; cubo(i);
Next i
Print " Total:"; valor
Return valor
End Function
 
Sub Flatten(f As Uinteger)
Common code:
Dim As Uinteger f1 = Int((f / 10) + .5), f2
<lang go>package main
For i As Uinteger = 1 To 10
cubo(i) = f1
f2 += f1
Next i
cubo(10) += f - f2
End Sub
 
Sub Transfer(a1 As Uinteger, a2 As Uinteger)
import (
Dim As Uinteger temp = Int(Rnd * cubo(a1))
"fmt"
cubo(a1) -= temp
"math/rand"
cubo(a2) += temp
"time"
End Sub
)
 
a = Display(" Display:") ' show original array
// Data type required by task.
Flatten(a) ' flatten the array
type bucketList interface {
a = Display(" Flatten:") ' show flattened array
// Two operations required by task. Updater parameter not specified
Transfer(3, 5) ' transfer some amount from 3 to 5
// by task, but useful for displaying update counts as an indication
Display(" 19 from 3 to 5:") ' show transfer array
// that transfer operations are happening "as often as possible."
Sleep</syntaxhighlight>
bucketValue(bucket int) int
{{out}}
transfer(b1, b2, ammount, updater int)
<pre> Display: 8 77 51 38 76 47 43 16 1 1 Total: 358
Flatten: 36 36 36 36 36 36 36 36 36 34 Total: 358
19 from 3 to 5: 36 36 21 36 51 36 36 36 36 34 Total: 358</pre>
 
=={{header|FutureBasic}}==
// Operation not specified by task, but needed for synchronization.
<syntaxhighlight lang="futurebasic">
snapshot(bucketValues []int, transferCounts []int)
local fn PopulateArrayWithRandomNumbers
NSUInteger i
for i = 0 to 9
mda (i) = rnd(90)
next
end fn
 
local fn Display( title as CFStringRef ) as NSUInteger
// Operation not specified by task, but useful.
NSUInteger i, worth = 0
buckets() int // number of buckets
CFStringRef comma = @","
}
printf @"%@ [\b", title
// Total of all bucket values, declared as a const to demonstrate that
for i = 0 to 9
// it doesn't change.
worth += mda_integer (i)
const originalTotal = 1000
if i == 9 then comma = @""
printf @"%2lu%@\b", mda_integer (i), comma
next
printf @"] Sum = %lu", worth
end fn = worth
 
local fn Flatten( f as NSUInteger )
// Updater ids, used for maintaining transfer counts.
NSUInteger i, f1 = int((f / 10) + .5 ), f2 = 0, temp
const (
idOrder = iota
for i idChaos= 0 to 9
nUpdatersmda (i) = f1
) f2 += f1
next
temp = mda_integer (9)
func main() {
mda (9) = temp + f - f2
// Create a concrete object implementing the bucketList interface.
end fn
bl := newChList(10, originalTotal, nUpdaters)
 
local fn Transfer( a1 as NSUInteger, a2 as NSUInteger )
// Three concurrent tasks.
NSUInteger t, temp = int( rnd( mda_integer ( a1 ) ) )
go order(bl)
go chaos(bl)
t = mda_integer ( a1 ) : mda ( a1 ) = t -temp
buddha(bl)
t = mda_integer ( a2 ) : mda ( a2 ) = t +temp
}
end fn
 
NSUInteger a, i
// The concurrent tasks exercise the data operations by going through
// the bucketList interface. They do no explicit synchronization and
// are not responsible for maintaining invariants.
 
random
// Exercise (1.) required by task: make values more equal.
fn PopulateArrayWithRandomNumbers
func order(bl bucketList) {
a = fn Display( @" Initial array:" )
r := rand.New(rand.NewSource(time.Now().UnixNano()))
fn Flatten( a )
nBuckets := bl.buckets()
a = fn Display( @" Current values:" )
for {
fn Transfer( 3, 5 )
b1 := r.Intn(nBuckets)
fn Display( @" 19 from 3 to 5:" )
b2 := r.Intn(nBuckets)
v1 := bl.bucketValue(b1)
v2 := bl.bucketValue(b2)
if v1 > v2 {
bl.transfer(b1, b2, (v1-v2)/2, idOrder)
} else {
bl.transfer(b2, b1, (v2-v1)/2, idOrder)
}
}
}
 
HandleEvents
// Exercise (2.) required by task: redistribute values.
</syntaxhighlight>
func chaos(bl bucketList) {
{{output}}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
<pre>
nBuckets := bl.buckets()
Initial array: [28,73,90, 1,75,51,69,35,70,28] Sum = 520
for {
Current values: [52,52,52,52,52,52,52,52,52,52] Sum = 520
b1 := r.Intn(nBuckets)
19 from 3 to 5: [52,52,52,34,52,70,52,52,52,52] Sum = 520
b2 := r.Intn(nBuckets)
</pre>
bl.transfer(b1, b2, r.Intn(bl.bucketValue(b1)+1), idChaos)
}
}
 
=={{header|Go}}==
// Exercise (3.) requred by task: display total.
<syntaxhighlight lang="go">package main
func buddha(bl bucketList) {
nBuckets := bl.buckets()
s := make([]int, nBuckets)
tc := make([]int, nUpdaters)
var total, nTicks int
 
import (
fmt.Println("sum ---updates--- mean buckets")
"fmt"
tr := time.Tick(time.Second / 10)
for {"math/rand"
var sum int"sync"
<-tr"time"
)
bl.snapshot(s, tc)
for _, l := range s {
if l < 0 {
panic("sob") // invariant not preserved
}
sum += l
}
// Output number of updates per tick and cummulative mean
// updates per tick to demonstrate "as often as possible"
// of task exercises 1 and 2.
total += tc[0] + tc[1]
nTicks++
fmt.Printf("%d %6d %6d %7d %v\n", sum, tc[0], tc[1], total/nTicks, s)
if sum != originalTotal {
panic("weep") // invariant not preserved
}
}
}</lang>
Data type implementation:
<lang go>// chList (ch for channel-synchronized) is a concrete type implementing
// the bucketList interface. The bucketList interface declared methods,
// the struct type here declares data. chList methods are repsonsible
// for synchronization so they are goroutine-safe. They are also
// responsible for maintaining the invariants that the sum of all buckets
// stays constant and that no bucket value goes negative.
type chList struct {
b []int // bucket data specified by task
s chan bool // syncronization object
tc []int // a transfer count for each updater
}
 
const nBuckets = 10
// Constructor.
func newChList(nBuckets, initialSum, nUpdaters int) *chList {
bl := &chList{
b: make([]int, nBuckets),
s: make(chan bool, 1),
tc: make([]int, nUpdaters),
}
// Distribute initialSum across buckets.
for i, dist := nBuckets, initialSum; i > 0; {
v := dist / i
i--
bl.b[i] = v
dist -= v
}
// Synchronization is needed to maintain the invariant that the total
// of all bucket values stays the same. This is an implementation of
// the straightforward solution mentioned in the task description,
// ensuring that only one transfer happens at a time. Channel s
// holds a token. All methods must take the token from the channel
// before accessing data and then return the token when they are done.
// it is equivalent to a mutex. The constructor makes data available
// by initially dropping the token in the channel after all data is
// initialized.
bl.s <- true
return bl
}
 
type bucketList struct {
// Four methods implementing the bucketList interface.
b [nBuckets]int // bucket data specified by task
func (bl *chList) bucketValue(b int) int {
<-bl.s // get token before accessing data
r := bl.b[b]
bl.s <- true // return token
return r
}
 
func (bl *chList) // transfer(b1, b2,counts afor inteach updater, uxnot strictly required by int)task {but
// useful to show that the two updaters get fair chances to run.
if b1 == b2 { // null operation
tc return[2]int
}
// Get access.
<-bl.s
// Clamping maintains invariant that bucket values remain nonnegative.
if a > bl.b[b1] {
a = bl.b[b1]
}
// Transfer.
bl.b[b1] -= a
bl.b[b2] += a
bl.tc[ux]++ // increment transfer count
// Release "lock".
bl.s <- true
}
 
sync.Mutex // synchronization
func (bl *chList) snapshot(s []int, tc []int) {
<-bl.s
copy(s, bl.b)
copy(tc, bl.tc)
for i := range bl.tc {
bl.tc[i] = 0
}
bl.s <- true
}
 
// Updater ids, to track number of transfers by updater.
func (bl *chList) buckets() int {
// these can index bucketlist.tc for example.
return len(bl.b)
const (
}</lang>
idOrder = iota
Output shows constant total and lack of any negative bucket counts. It also shows that the order and chaos tasks are given roughly fair chances to run, and that updates are happening at a high rate, "as often as possible."
idChaos
<pre>
)
sum ---updates--- mean buckets
1000 26098 21980 48078 [57 106 120 119 129 82 74 90 95 128]
1000 34982 32218 57639 [3 92 42 88 45 89 133 69 219 220]
1000 21142 19716 52045 [75 104 92 66 130 130 83 115 130 75]
1000 19450 25747 50333 [30 69 29 185 124 156 122 124 6 155]
1000 22688 20442 48892 [126 102 8 99 145 102 130 121 122 45]
...
</pre>
 
const initialSum = 1000 // sum of all bucket values
===RWMutex===
There are two optimizations in this version. First, mutexes are somewhat faster than channels. Second, separate mutexes for each bucket allow the two transfer routines, "order" and "chaos", to update the bucket list simultaneously. I use one more lock for the whole list to pause transferring while printing. This lock is a RWMutex, and interestingly, the transfer routines lock it "R" mode when they want to write, and the buddha routine locks it "RW" when it only wants to read. This is because "R" represents the situation where I want to allow simultaneous operations—transfering, and "RW" represents the situation where I need exclusive access—taking a snapshot.
<lang go>// rwList, rw is for RWMutex-synchronized.
type rwList struct {
b []int // bucket data specified by task
 
// Syncronization objects.
m []sync.Mutex // mutex for each bucket
all sync.RWMutex // mutex for entire list, for snapshot operation
 
tc []int // a transfer count for each updater
}
 
// Constructor.
func newBucketList() *bucketList {
func newRwList(nBuckets, initialSum, nUpdaters int) *rwList {
blvar :=bl &rwList{bucketList
// Distribute initialSum across buckets.
b: make([]int, nBuckets),
m: make([]sync.Mutex, nBuckets),
tc: make([]int, nUpdaters),
}
for i, dist := nBuckets, initialSum; i > 0; {
v := dist / i
Line 1,425 ⟶ 1,598:
dist -= v
}
return &bl
}
 
// method 1 required by task, get current value of a bucket
// Four methods implementing the bucketList interface.
func (bl *rwListbucketList) bucketValue(b int) int {
bl.m[b].Lock() // lock onbefore bucketaccessing ensures read is atomicdata
r := bl.b[b]
bl.m[b].Unlock()
return r
}
 
// method 2 required by task
func (bl *rwList) transfer(b1, b2, a int, ux int) {
func (bl *bucketList) transfer(b1, b2, a int, ux int) {
if b1 == b2 { // null operation
// Get returnaccess.
}bl.Lock()
// Clamping maintains invariant that bucket values remain nonnegative.
// RLock on list allows other simultaneous transfers.
bl.all.RLock()
// Locking lowest bucket first prevents deadlock
// with multiple tasks working at the same time.
if b1 < b2 {
bl.m[b1].Lock()
bl.m[b2].Lock()
} else {
bl.m[b2].Lock()
bl.m[b1].Lock()
}
// clamp
if a > bl.b[b1] {
a = bl.b[b1]
}
// transferTransfer.
bl.b[b1] -= a
bl.b[b2] += a
bl.tc[ux]++ // increment transfer count
// releasebl.Unlock()
bl.m[b1].Unlock()
bl.m[b2].Unlock()
bl.all.RUnlock()
// With current Go, the program can hang without a call to gosched here.
// It seems that functions in the sync package don't touch the scheduler,
// (which is good) but we need to touch it here to give the channel
// operations in buddha a chance to run. (The current Go scheduler
// is basically cooperative rather than preemptive.)
runtime.Gosched()
}
 
// additional useful method
func (bl *rwList) snapshot(s []int, tc []int) {
func (bl *bucketList) snapshot(s *[nBuckets]int, tc *[2]int) {
bl.all.Lock() // RW lock on list prevents transfers during snap.
copy(s, bl.bLock()
copy(tc,*s = bl.tc)b
for*tc i := range bl.tc {
bl.tc = [i2]int{} =// clear transfer 0counts
} bl.Unlock()
}
bl.all.Unlock()
}
 
var bl = newBucketList()
func (bl *rwList) buckets() int {
return len(bl.b)
}</lang>
Thoughput can be seen to be relatively better than the channel version.
<pre>
sum ---updates--- mean buckets
1000 38640 39222 77862 [127 199 75 126 11 28 165 111 23 135]
1000 33164 35308 73167 [0 138 13 276 80 23 196 53 71 150]
1000 35407 36654 72798 [88 276 72 78 71 25 28 98 181 83]
1000 39081 40104 74395 [117 33 64 332 76 85 62 123 66 42]
1000 38356 39811 75149 [108 301 41 50 10 165 69 62 20 174]
</pre>
 
func main() {
===Lock-free===
// Three concurrent tasks.
This version uses no locking for the phase where the two clients are updating the buckets. Instead it watches for collisions and retries as needed.
go order() // make values closer to equal
<lang go>// lf for lock-free
go chaos() // arbitrarily redistribute values
type lfList struct {
buddha() // display total value and individual values of each bucket
b []int32
sync.RWMutex
tc []int
}
// Constructor.
func newLfList(nBuckets, initialSum, nUpdaters int) *lfList {
bl := &lfList{
b: make([]int32, nBuckets),
tc: make([]int, nUpdaters),
}
for i, dist := int32(nBuckets), int32(initialSum); i > 0; {
v := dist / i
i--
bl.b[i] = v
dist -= v
}
return bl
}
 
// The concurrent tasks exercise the data operations by calling bucketList
// Four methods implementing the bucketList interface.
// methods. The bucketList methods are "threadsafe", by which we really mean
func (bl *lfList) bucketValue(b int) int {
// goroutine-safe. The conconcurrent tasks then do no explicit synchronization
return int(atomic.LoadInt32(&bl.b[b]))
// and are not responsible for maintaining invariants.
}
 
// Exercise 1 required by task: make values more equal.
func (bl *lfList) transfer(b1, b2, a int, ux int) {
func order() {
if b1 == b2 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return
}
bl.RLock()
for {
tb1 := int32r.Intn(anBuckets)
v1b2 := atomicr.LoadInt32Intn(&bl.b[b1]nBuckets - 1)
if tb2 >= v1b1 {
t = v1b2++
}
ifv1 atomic:= bl.CompareAndSwapInt32bucketValue(&bl.b[b1], v1, v1-t) {
v2 := atomicbl.AddInt32bucketValue(&bl.b[b2], t)
if v1 > v2 break{
bl.transfer(b1, b2, (v1-v2)/2, idOrder)
} else {
bl.transfer(b2, b1, (v2-v1)/2, idOrder)
}
// else retry
}
bl.tc[ux]++
bl.RUnlock()
runtime.Gosched()
}
 
// Exercise 2 required by task: redistribute values.
func (bl *lfList) snapshot(s []int, tc []int) {
func chaos() {
bl.Lock()
r := rand.New(rand.NewSource(time.Now().Unix()))
for i, bv := range bl.b {
for {
s[i] = int(bv)
b1 := r.Intn(nBuckets)
b2 := r.Intn(nBuckets - 1)
if b2 >= b1 {
b2++
}
bl.transfer(b1, b2, r.Intn(bl.bucketValue(b1)+1), idChaos)
}
for i := range bl.tc {
tc[i], bl.tc[i] = bl.tc[i], 0
}
bl.Unlock()
}
 
// Exercise 3 requred by task: display total.
func (bl *lfList) buckets() int {
func buddha() {
return len(bl.b)
var s [nBuckets]int
}</lang>
var tc [2]int
Clearly this is the way to go when performance matters:
var total, nTicks int
<pre>
sum ---updates--- mean buckets
1000 83713 73128 156841 [80 57 136 144 137 88 132 88 58 80]
1000 79416 83022 159639 [0 132 2 29 89 79 27 281 181 180]
1000 78319 76803 158133 [114 88 106 37 142 70 43 191 18 191]
1000 74888 74702 155997 [184 195 30 112 71 112 70 68 36 122]
1000 81305 76426 156344 [67 34 66 308 168 27 3 168 29 130]
</pre>
===Monitor===
Finally, here is a channel based monitor pattern solution. This solution is worse than any of the above solutions both in terms of code complexity and run time performance. Seriously, don't use this unless you have a really good reason. It is here for reference because it predates the solutions above, it does work, and it shows a different way of doing things.
<lang go>// mnList (mn for monitor-synchronized) is a concrete type implementing
// the bucketList interface. The monitor is a goroutine, all communication
// with it is done through channels, which are the members of mnList.
// All data implementing the buckets is encapsulated in the monitor.
type mnList struct {
vrCh chan *valueReq
trCh chan *transferReq
srCh chan *snapReq
nbCh chan chan int
}
 
fmt.Println("sum ---updates--- mean buckets")
// Constructor makes channels and starts monitor.
tr := time.Tick(time.Second / 10)
func newMnList(nBuckets, initialSum, nUpdaters int) *mnList {
mn := &mnList{
make(chan *valueReq),
make(chan *transferReq),
make(chan *snapReq),
make(chan chan int),
}
go monitor(mn, nBuckets, initialSum, nUpdaters)
return mn
}
 
// Monitor goroutine ecapsulates data and enters a loop to handle requests.
// The loop handles one request at a time, thus serializing all access.
func monitor(mn *mnList, nBuckets, initialSum, nUpdaters int) {
// bucket representation
b := make([]int, nBuckets)
for i, dist := nBuckets, initialSum; i > 0; {
v := dist / i
i--
b[i] = v
dist -= v
}
// transfer count representation
count := make([]int, nUpdaters)
 
// monitor loop
for {
select {<-tr
bl.snapshot(&s, &tc)
// value request operation
casevar vrsum := <-mn.vrCh:int
for _, l := vr.resprange <-s b[vr.bucket]{
if l < 0 {
panic("sob") // transferinvariant not operationpreserved
case tr := <-mn.trCh:
// clamp
if tr.amount > b[tr.from] {
tr.amount = b[tr.from]
}
//sum transfer+= l
b[tr.from] -= tr.amount}
// Output number of b[tr.to]updates +=per tr.amounttick and cummulative mean
// updates per tick to demonstrate "as often as possible"
count[tr.updaterId]++
// of task exercises 1 and 2.
//total snap+= operationtc[0] + tc[1]
case sr := <-mn.srCh:nTicks++
fmt.Printf("%d %6d %6d %7d %3d\n", sum, tc[0], tc[1], total/nTicks, s)
copy(sr.bucketSnap, b)
if sum != initialSum copy(sr.countSnap, count){
forpanic("weep") i// :=invariant rangenot count {preserved
count[i] = 0
}
sr.resp <- true
 
// number of buckets
case nb := <-mn.nbCh:
nb <- nBuckets
}
}
}</syntaxhighlight>
}
{{out}}
 
type valueReq struct {
bucket int
resp chan int
}
 
func (mn *mnList) bucketValue(b int) int {
resp := make(chan int)
mn.vrCh <- &valueReq{b, resp}
return <-resp
}
 
type transferReq struct {
from, to int
amount int
updaterId int
}
 
func (mn *mnList) transfer(b1, b2, a, ux int) {
mn.trCh <- &transferReq{b1, b2, a, ux}
}
 
type snapReq struct {
bucketSnap []int
countSnap []int
resp chan bool
}
 
func (mn *mnList) snapshot(s []int, tc []int) {
resp := make(chan bool)
mn.srCh <- &snapReq{s, tc, resp}
<-resp
}
 
func (mn *mnList) buckets() int {
resp := make(chan int)
mn.nbCh <- resp
return <-resp
}</lang>
Output:
<pre>
sum ---updates--- mean buckets
1000 317832 137235 3407 5101 8508455067 [165100 86100 19100 88100 61100 252100 119100 86100 64100 60100]
1000 391239 339389 3732 592847 5661[ 85 266 895081 [4385 131 122 22037 173 19162 65 2080 6111 3 15762]
1000 509436 497362 2966 730831 4860[ 70 194 194 8575 62 [122 216 129193 100 15310 16 102 27616 100126 0119]
1000 512065 499038 1883 2982 7648800899 [100 8100 166100 54100 53100 94100 177100 273100 40100 35100]
1000 250590 121947 2946 715226 4467[ 47 271 760178 [28061 162 234 119199 63 5873 149 5458 100 63 5079]
...
</pre>
 
=={{header|Groovy}}==
Solution:
<langsyntaxhighlight lang="groovy">class Buckets {
 
def cells = []
Line 1,769 ⟶ 1,799:
}
Thread.sleep(500)
}</langsyntaxhighlight>
 
Output:
Line 1,801 ⟶ 1,831:
So, at any given time, the current value map is either in the MVar or being examined or replaced by one thread, but not both. The IntMap held by the MVar is a pure immutable data structure (<code>adjust</code> returns a modified version), so there is no problem from that the ''display'' task puts the value back before it is done printing.
 
<langsyntaxhighlight lang="haskell">module AtomicUpdates (main) where
 
import Control.Concurrent (forkIO, threadDelay)
Line 1,869 ⟶ 1,899:
forkIO (roughen buckets)
forkIO (smooth buckets)
display buckets</langsyntaxhighlight>
 
Sample output:
Line 1,891 ⟶ 1,921:
The following only works in Unicon:
 
<langsyntaxhighlight lang="unicon">global mtx
 
procedure main(A)
Line 1,923 ⟶ 1,953:
buckets[b1] -:= x
buckets[b2] +:= x
end</langsyntaxhighlight>
 
Sample run:
Line 1,943 ⟶ 1,973:
 
=={{header|Java}}==
{{works with|Java|8+}}
<lang java>import java.util.Arrays;
<syntaxhighlight lang="java">import java.util.RandomArrays;
import java.util.concurrent.ThreadLocalRandom;
 
public class AtomicUpdates {
 
{
public private static classfinal int NUM_BUCKETS = Buckets10;
 
{
privatepublic finalstatic int[]class data;Buckets {
private final int[] data;
 
public Buckets(int[] data)
public Buckets(int[] data) {
{
this.data = data.clone();
}
public int getBucket(int index)
{
synchronized (data)
{ return data[index]; }
}
public int transfer(int srcBucketIndex, int destBucketIndex, int amount)
{
if (amount == 0)
return 0;
// Negative transfers will happen in the opposite direction
if (amount < 0)
{
int tempIndex = srcBucketIndex;
srcBucketIndex = destBucketIndex;
destBucketIndex = tempIndex;
amount = -amount;
}
synchronized (data)
{
if (amount > data[srcBucketIndex])
amount = data[srcBucketIndex];
if (amount <= 0)
return 0;
data[srcBucketIndex] -= amount;
data[destBucketIndex] += amount;
return amount;
}
}
public int[] getBuckets()
{
synchronized (data)
{ return data.clone(); }
}
}
public static int getTotal(int[] values)
{
int totalValue = 0;
for (int i = values.length - 1; i >= 0; i--)
totalValue += values[i];
return totalValue;
}
public static void main(String[] args)
{
final int NUM_BUCKETS = 10;
Random rnd = new Random();
final int[] values = new int[NUM_BUCKETS];
for (int i = 0; i < values.length; i++)
values[i] = rnd.nextInt(10);
System.out.println("Initial Array: " + getTotal(values) + " " + Arrays.toString(values));
final Buckets buckets = new Buckets(values);
new Thread(new Runnable() {
public void run()
{
Random r = new Random();
while (true)
{
int srcBucketIndex = r.nextInt(NUM_BUCKETS);
int destBucketIndex = r.nextInt(NUM_BUCKETS);
int amount = (buckets.getBucket(srcBucketIndex) - buckets.getBucket(destBucketIndex)) >> 1;
if (amount != 0)
buckets.transfer(srcBucketIndex, destBucketIndex, amount);
}
}
 
}
public int getBucket(int index) {
).start();
synchronized (data) {
return data[index];
new Thread(new Runnable() {
public void run() }
{
Random r = new Random();
while (true)
{
int srcBucketIndex = r.nextInt(NUM_BUCKETS);
int destBucketIndex = r.nextInt(NUM_BUCKETS);
int srcBucketAmount = buckets.getBucket(srcBucketIndex);
int destBucketAmount = buckets.getBucket(destBucketIndex);
int amount = r.nextInt(srcBucketAmount + destBucketAmount + 1) - destBucketAmount;
if (amount != 0)
buckets.transfer(srcBucketIndex, destBucketIndex, amount);
}
}
}
).start();
 
public int transfer(int srcIndex, int dstIndex, int amount) {
while (true)
if (amount < 0)
{
throw new IllegalArgumentException("negative amount: " + amount);
long nextPrintTime = System.currentTimeMillis() + 3000;
long curTime; if (amount == 0)
return 0;
while ((curTime = System.currentTimeMillis()) < nextPrintTime)
 
{
try synchronized (data) {
{ Thread.sleep if (nextPrintTimedata[srcIndex] - curTime);amount < }0)
amount = data[srcIndex];
catch (InterruptedException e)
{ } if (data[dstIndex] + amount < 0)
amount = Integer.MAX_VALUE - data[dstIndex];
}
if (amount < 0)
int[] bucketValues = buckets.getBuckets();
throw new IllegalStateException();
System.out.println("Current values: " + getTotal(bucketValues) + " " + Arrays.toString(bucketValues));
data[srcIndex] -= amount;
data[dstIndex] += amount;
return amount;
}
}
 
public int[] getBuckets() {
synchronized (data) {
return data.clone();
}
}
}
}
}
</lang>
 
private static long getTotal(int[] values) {
{{works with|Java|8+}}
long total = 0;
<lang java5>import java.util.Arrays;
for (int value : values) {
import java.util.Optional;
total += value;
import java.util.Random;
}
import java.util.function.Consumer;
return total;
import java.util.function.Function;
}
import java.util.stream.IntStream;
import java.util.stream.Stream;
 
public interfacestatic void main(String[] Bucketsargs) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
public static Buckets new_(int[] data) {
return $.new_(data);
}
public static void main(String... arguments) {
$.main(arguments);
}
public int[] getBuckets();
 
int[] values = new int[NUM_BUCKETS];
public int getBucket(int index);
for (int i = 0; i < values.length; i++)
values[i] = rnd.nextInt() & Integer.MAX_VALUE;
System.out.println("Initial Array: " + getTotal(values) + " " + Arrays.toString(values));
 
Buckets buckets = new Buckets(values);
public int transfer(int srcBucketIndex, int destBucketIndex, int amount);
new Thread(() -> equalize(buckets), "equalizer").start();
new Thread(() -> transferRandomAmount(buckets), "transferrer").start();
new Thread(() -> print(buckets), "printer").start();
}
 
private static void transferRandomAmount(Buckets buckets) {
public enum $ {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
$$;
while (true) {
int srcIndex = rnd.nextInt(NUM_BUCKETS);
int dstIndex = rnd.nextInt(NUM_BUCKETS);
int amount = rnd.nextInt() & Integer.MAX_VALUE;
buckets.transfer(srcIndex, dstIndex, amount);
}
}
 
private static Bucketsvoid new_equalize(int[]Buckets databuckets) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
return (FunctionalBuckets) function -> {
synchronizedwhile (datatrue) {
return Optional int srcIndex = rnd.ofnextInt(dataNUM_BUCKETS);
int dstIndex = rnd.mapnextInt(functionNUM_BUCKETS);
int amount = (buckets.filtergetBucket(outputsrcIndex) -> outputbuckets.getBucket(dstIndex)) !=/ data)2;
.orElseGetif (()amount ->= data.clone()0)
buckets.transfer(srcIndex, dstIndex, amount);
;
}
};
}
 
private static void mainprint(String...Buckets argumentsbuckets) {
Stream.of(new Random while (true)) {
long nextPrintTime = System.currentTimeMillis() + 3000;
.parallel()
.map(r -> r.ints($.NUM_BUCKETS, 0, NUM_BUCKETS))long now;
while ((now = System.currentTimeMillis()) < nextPrintTime) {
.map(IntStream::toArray)
try {
.peek(bucketValues -> Stream.of(bucketValues)
.map(values -> "Initial values: " + getTotal(values) + " " + ArraysThread.toStringsleep(values)nextPrintTime - now);
.forEach } catch (System.out::printlnInterruptedException e) {
) return;
.map(Buckets::new_) }
.forEach( }
 
Stream.<Consumer<Buckets>>of(
$::processBuckets,int[] bucketValues = buckets.getBuckets();
System.out.println("Current values: " + getTotal(bucketValues) + " " + Arrays.toString(bucketValues));
$::displayBuckets
}
).reduce($ -> {}, Consumer::andThen)
)
;
}
}</syntaxhighlight>
 
{{out}}
@FunctionalInterface
<pre>Initial Array: 8345792262 [143255168, 196076270, 933397723, 1556699232, 1050802212, 538674858, 1196357020, 738586704, 726124301, 1265818774]
private static interface FunctionalBuckets extends Buckets {
Current values: 8345792262 [0, 1874588555, 1422104978, 1646554792, 272895092, 0, 1100055274, 562892928, 0, 1466700643]
public Object untypedUseData(Function<int[], Object> function);
Current values: 8345792262 [0, 938536756, 1022153269, 802097042, 834165196, 893056852, 1022153268, 985683168, 985683168, 862263543]
Current values: 8345792262 [828663081, 828663080, 800738961, 653833491, 926105549, 856587200, 1235929058, 653833491, 780719176, 780719175]
Current values: 8345792262 [834986940, 835010170, 833752099, 835010170, 834668841, 834620567, 833370581, 835083486, 834620567, 834668841]
Current values: 8345792262 [0, 249877205, 1201027166, 2147483647, 0, 966988101, 725353114, 107211829, 2147483647, 800367553]
Current values: 8345792262 [789241957, 389741912, 898370461, 1824723292, 389741912, 898370462, 1824723293, 434230374, 896648599, 0]
Current values: 8345792262 [290197046, 76068608, 2147483647, 185783029, 646610948, 187523099, 1387188383, 0, 2147483647, 1277453855]
Current values: 8345792262 [0, 0, 1594297983, 1972188797, 0, 2147483647, 0, 2147483647, 92403769, 391934419]
Current values: 8345792262 [330331828, 330331828, 2147483647, 515452290, 2010486407, 0, 515452290, 0, 348770325, 2147483647]
...</pre>
 
Commenting out either of the threads mutating the buckets shows that they work properly.
@SuppressWarnings("unchecked")
public default <OUTPUT> OUTPUT useData(Function<int[], OUTPUT> function) {
return (OUTPUT) untypedUseData(function::apply);
}
 
=={{header|Julia}}==
@Override
<syntaxhighlight lang="julia">using StatsBase
public default int[] getBuckets() {
return useData(Function.<int[]>identity());
}
 
function runall()
@Override
nbuckets = 16
public default int getBucket(int index) {
unfinish = true
return useData(data -> data[index]);
spinner = }ReentrantLock()
buckets = rand(1:99, nbuckets)
totaltrans = 0
 
bucketsum() = sum(buckets)
@Override
smallpause() = sleep(rand() / 2000)
public default int transfer(int originalSrcBucketIndex, int originalDestBucketIndex, int originalAmount) {
picktwo() = (samplepair(nbuckets)...)
return useData(data -> {
function equalizer()
int srcBucketIndex = originalSrcBucketIndex;
while unfinish
int destBucketIndex = originalDestBucketIndex;
int amount = originalAmount;smallpause()
if trylock(amount == 0spinner) {
return 0; i, j = picktwo()
} sm = buckets[i] + buckets[j]
// Negative transfers will happen in them opposite= directionfld(sm + 1, 2)
if (amount < 0) { buckets[i], buckets[j] = m, sm - m
int tempIndex totaltrans += srcBucketIndex;1
srcBucketIndex = destBucketIndex; unlock(spinner)
destBucketIndex = tempIndex;end
amount = -amount;end
}end
function redistributor()
if (amount > data[srcBucketIndex]) {
while unfinish
amount = data[srcBucketIndex];
} smallpause()
if trylock(amount <= 0spinner) {
return 0; i, j = picktwo()
} sm = buckets[i] + buckets[j]
data buckets[srcBucketIndexi] -= amount;rand(0:sm)
data buckets[destBucketIndexj] += amount;sm - buckets[i]
return amount; totaltrans += 1
} unlock(spinner);
} end
} end
end
function accountant()
count = 0
while count < 16
smallpause()
if trylock(spinner)
println("Current state of buckets: $buckets. Total in buckets: $(bucketsum())")
unlock(spinner)
count += 1
sleep(1)
end
end
unfinish = false
end
t = time()
@async equalizer()
@async redistributor()
@async accountant()
while unfinish sleep(0.25) end
println("Total transactions: $totaltrans ($(round(Int, totaltrans / (time() - t))) unlocks per second).")
end
 
runall()</syntaxhighlight>
private static final int NUM_BUCKETS = 10;
private static final int PRINT_DELAY = 3_000;
 
{{out}}
private static int getTotal(int[] values) {
<pre>Current state of buckets: [56, 26, 34, 57, 26, 25, 39, 91, 53, 46, 96, 67, 86, 49, 2, 85]. Total in buckets: 838
return Arrays.stream(values)
Current state of buckets: [62, 32, 90, 50, 9, 43, 16, 71, 67, 99, 22, 44, 63, 85, 78, 7]. Total in buckets: 838
.parallel()
Current state of buckets: [58, 25, 41, 30, 9, 79, 42, 43, 32, 66, 110, 123, 90, 35, 13, 42]. Total in buckets: 838
.sum()
Current state of buckets: [86, 63, 70, 21, 41, 69, 30, 29, 38, 40, 12, 28, 85, 13, 127, 86]. Total in buckets: 838
;
Current state of buckets: [45, 32, 26, 30, 45, 9, 86, 200, 31, 45, 9, 23, 60, 64, 79, 54]. Total in buckets: 838
}
Current state of buckets: [68, 16, 89, 104, 15, 35, 15, 23, 91, 92, 29, 27, 33, 21, 136, 44]. Total in buckets: 838
Current state of buckets: [13, 72, 8, 25, 27, 62, 134, 33, 78, 79, 7, 22, 132, 73, 12, 61]. Total in buckets: 838
Current state of buckets: [97, 78, 16, 90, 90, 69, 0, 22, 26, 84, 23, 22, 78, 69, 32, 42]. Total in buckets: 838
Current state of buckets: [3, 105, 99, 69, 70, 8, 50, 32, 17, 69, 53, 1, 68, 66, 64, 64]. Total in buckets: 838
Current state of buckets: [27, 181, 9, 5, 66, 16, 60, 56, 66, 140, 43, 29, 51, 59, 1, 29]. Total in buckets: 838
Current state of buckets: [45, 108, 45, 28, 58, 108, 86, 41, 45, 29, 57, 11, 54, 23, 52, 48]. Total in buckets: 838
Current state of buckets: [76, 45, 47, 75, 62, 34, 73, 27, 102, 64, 32, 51, 55, 32, 43, 20]. Total in buckets: 838
Current state of buckets: [35, 69, 41, 34, 29, 79, 82, 72, 71, 65, 34, 67, 68, 14, 33, 45]. Total in buckets: 838
Current state of buckets: [85, 53, 53, 26, 45, 53, 84, 99, 48, 50, 27, 52, 60, 79, 13, 11]. Total in buckets: 838
Current state of buckets: [49, 63, 24, 38, 64, 79, 75, 70, 69, 68, 50, 74, 12, 60, 6, 37]. Total in buckets: 838
Current state of buckets: [32, 20, 82, 70, 54, 41, 87, 15, 15, 44, 82, 55, 17, 33, 87, 104]. Total in buckets: 838
Total transactions: 26751 (1639 unlocks per second).</pre>
 
=={{header|Kotlin}}==
private static void equalizeBuckets(Buckets buckets) {
{{trans|Java}}
Random r = new Random();
<syntaxhighlight lang="scala">// version 1.2.0
while (true) {
int srcBucketIndex = r.nextInt($.NUM_BUCKETS);
int destBucketIndex = r.nextInt($.NUM_BUCKETS);
Stream.of(srcBucketIndex, destBucketIndex)
.map(buckets::getBucket)
.reduce((srcBucketAmount, destBucketAmount) ->
srcBucketAmount - destBucketAmount
)
.map(amount -> amount >> 1)
.filter(amount -> amount != 0)
.ifPresent(amount ->
buckets.transfer(srcBucketIndex, destBucketIndex, amount)
)
;
}
}
 
import java.util.concurrent.ThreadLocalRandom
private static void randomizeBuckets (Buckets buckets) {
import kotlin.concurrent.thread
Random r = new Random();
 
while (true) {
const val NUM_BUCKETS = 10
int srcBucketIndex = r.nextInt($.NUM_BUCKETS);
 
int destBucketIndex = r.nextInt($.NUM_BUCKETS);
class Buckets(data: IntArray) {
Stream.of(srcBucketIndex, destBucketIndex)
private val data = data.mapcopyOf(buckets::getBucket)
 
.reduce((srcBucketAmount, destBucketAmount) ->
operator fun get(index: Int) = synchronized(data) { data[index] }
r.nextInt(srcBucketAmount + destBucketAmount + 1) - destBucketAmount
 
)
fun transfer(srcIndex: Int, dstIndex: Int, .filter(amount: ->Int): amountInt != 0){
if .ifPresent(amount ->< 0) {
buckets.transferthrow IllegalArgumentException(srcBucketIndex,"Negative destBucketIndex,amount: $amount")
)}
;if (amount == 0) return 0
} synchronized(data) {
var a = amount
if (data[srcIndex] - a < 0) a = data[srcIndex]
if (data[dstIndex] + a < 0) a = Int.MAX_VALUE - data[dstIndex]
if (a < 0) throw IllegalStateException()
data[srcIndex] -= a
data[dstIndex] += a
return a
}
}
 
val buckets get() = synchronized(data) { data.copyOf() }
private static Runnable run(Runnable runnable) {
 
return runnable;
fun transferRandomAmount() {
val rnd = ThreadLocalRandom.current()
while (true) {
val srcIndex = rnd.nextInt(NUM_BUCKETS)
val dstIndex = rnd.nextInt(NUM_BUCKETS)
val amount = rnd.nextInt() and Int.MAX_VALUE
transfer(srcIndex, dstIndex, amount)
}
}
 
fun equalize() {
private static void processBuckets(Buckets buckets) {
val rnd = ThreadLocalRandom.current()
Stream.<Consumer<Buckets>>of(
$::equalizeBuckets,while (true) {
val srcIndex = rnd.nextInt(NUM_BUCKETS)
$::randomizeBuckets
val dstIndex = rnd.nextInt(NUM_BUCKETS)
)
val amount = (this[srcIndex] - this[dstIndex]) / 2
.parallel()
if (amount >= 0) transfer(srcIndex, dstIndex, amount)
.map(consumer -> run(() -> consumer.accept(buckets)))
.map(Thread::new)}
.forEach(Thread::start)
;
}
 
fun print() {
private static void displayBuckets(Buckets buckets) {
while (true) {
long val nextPrintTime = System.currentTimeMillis() + PRINT_DELAY;3000
long curTime; while (true) {
while ((curTime val now = System.currentTimeMillis()) < nextPrintTime) {
try { if (now >= nextPrintTime) break
Thread.sleep(nextPrintTime - curTime); try {
Thread.sleep(nextPrintTime - now)
}
catch (InterruptedException e) { }
catch (e: InterruptedException) {
return
}
}
val bucketValues = buckets
println("Current values: ${bucketValues.total} ${bucketValues.asList()}")
}
Stream.of(buckets)
.parallel()
.map(Buckets::getBuckets)
.map(values -> "Current values: " + getTotal(values) + " " + Arrays.toString(values))
.forEach(System.out::println)
;
}
}
}
}</lang>
 
val IntArray.total: Long get() {
Output:
var sum = 0L
<pre>Initial Array: 52 [0, 6, 6, 7, 4, 9, 8, 5, 2, 5]
for (d in this) sum += d
Current values: 52 [2, 7, 3, 1, 2, 12, 6, 3, 5, 11]
return sum
Current values: 52 [5, 6, 5, 5, 6, 5, 5, 5, 5, 5]
}
Current values: 52 [5, 6, 5, 5, 5, 5, 5, 5, 6, 5]
Current values: 52 [9, 3, 5, 3, 14, 0, 4, 4, 7, 3]</pre>
 
fun main(args: Array<String>) {
Commenting out either of the threads mutating the buckets shows that they work properly.
val rnd = ThreadLocalRandom.current()
val values = IntArray(NUM_BUCKETS) { rnd.nextInt() and Int.MAX_VALUE }
println("Initial array: ${values.total} ${values.asList()}")
val buckets = Buckets(values)
thread(name = "equalizer") { buckets.equalize() }
thread(name = "transferrer") { buckets.transferRandomAmount() }
thread(name = "printer") { buckets.print() }
}</syntaxhighlight>
 
Sample output:
<pre>
Initial array: 9412276676 [1252597313, 1908616225, 824662669, 972947315, 2126883821, 405179067, 693458796, 481375538, 396750085, 349805847]
Current values: 9412276676 [2147483647, 844064584, 983174119, 580879514, 1073741823, 666808378, 2147483647, 0, 484320482, 484320482]
Current values: 9412276676 [941221423, 941207347, 941304553, 941221422, 941235585, 941235585, 941225242, 941242321, 941157955, 941225243]
Current values: 9412276676 [941656114, 941197476, 941190372, 941203044, 941187119, 941177701, 941208610, 941038975, 941226893, 941190372]
Current values: 9412276676 [0, 202110459, 2147483647, 1901203310, 2147483647, 1489083519, 0, 234363721, 1290548373, 0]
Current values: 9412276676 [695300460, 2147483647, 1452183187, 2147483647, 0, 0, 0, 570277505, 252064583, 2147483647]
Current values: 9412276676 [941219147, 941226725, 941226725, 941238796, 941219147, 941247715, 941238795, 941234946, 941189734, 941234946]
Current values: 9412276676 [941306524, 941145153, 941241743, 940668167, 942314400, 941491117, 940668168, 941145153, 941306524, 940989727]
Current values: 9412276676 [945548859, 939149475, 935477311, 941294057, 944294715, 940031668, 940860151, 940662863, 940662862, 944294715]
Current values: 9412276676 [941254898, 941342907, 941188859, 941250824, 941250825, 940973864, 941078878, 941373381, 941373381, 941188859]
Current values: 9412276676 [941147294, 941232689, 941132597, 941330728, 941281708, 941236213, 941147294, 941265970, 941236214, 941265969]
......
</pre>
 
=={{header|Lasso}}==
Lasso thread objects are thread-safe by design.
<langsyntaxhighlight lang="lasso">define atomic => thread {
data
private buckets = staticarray_join(10, void),
Line 2,339 ⟶ 2,357:
stdoutnl(#buckets->asString + " -- total: " + #total)
}
stdoutnl(`ERROR: totals no longer match: ` + #initial_total + ', ' + #total)</langsyntaxhighlight>
 
{{out}}
Line 2,349 ⟶ 2,367:
staticarray(281, 717, 341, 716, 50, 17, 129, 247, 964, 197) -- total: 3659
staticarray(423, 509, 51, 458, 265, 423, 292, 458, 661, 119) -- total: 3659</pre>
 
 
=={{header|Logtalk}}==
The following example can be found in the Logtalk distribution and is used here with permission. Works when using SWI-Prolog, XSB, or YAP as the backend compiler.
<langsyntaxhighlight lang="logtalk">
:- object(buckets).
 
Line 2,461 ⟶ 2,478:
 
:- end_object.
</syntaxhighlight>
</lang>
 
Sample output:
 
<langsyntaxhighlight lang="logtalk">
?- buckets::start.
Sum of all bucket values: 52
Line 2,480 ⟶ 2,497:
[11,6,10,4,0,4,5,5,4,3]
true.
</syntaxhighlight>
</lang>
 
=={{header|Mathematica}} / {{header|Wolfram Language}}==
<langsyntaxhighlight Mathematicalang="mathematica">transfer[bucks_, src_, dest_, n_] :=
ReplacePart[
bucks, {src -> Max[bucks[[src]] - n, 0],
Line 2,503 ⟶ 2,520:
dest = RandomInteger[{1, 20}]},
bucks = transfer[bucks, src, dest,
RandomInteger[{1, bucks[[src]]}]]]; comp = True]]}];</langsyntaxhighlight>
{{out}}
<pre>Original sum: &lt;number&gt;
Current sum: &lt;same number, stays fixed&gt;</pre>
This simply uses a variable named <tt>comp</tt> to determine whether or not it is currently computing something.
 
=={{header|Nim}}==
We use Threads objects which are mapped to system threads. Access to buckets is protected by locks (one lock per bucket). We use also a lock to protect the random number generator which is not thread-safe.
 
The main thread sleeps during 10 seconds, then ask the threads to terminate. For this purpose, we could have used a simple boolean but we have rather chosen to send the termination message via a channel. So each thread receives the number of the channel to listen to and checks regularly if a message ask it to terminate.
 
The program must be compiled with option <code>--threads:on</code>.
 
<syntaxhighlight lang="nim">import locks
import math
import os
import random
 
const N = 10 # Number of buckets.
const MaxInit = 99 # Maximum initial value for buckets.
 
var buckets: array[1..N, Natural] # Array of buckets.
var bucketLocks: array[1..N, Lock] # Array of bucket locks.
var randomLock: Lock # Lock to protect the random number generator.
var terminate: array[3, Channel[bool]] # Used to ask threads to terminate.
 
#---------------------------------------------------------------------------------------------------
 
proc getTwoIndexes(): tuple[a, b: int] =
## Get two indexes from the random number generator.
 
result.a = rand(1..N)
result.b = rand(2..N)
if result.b == result.a: result.b = 1
 
#---------------------------------------------------------------------------------------------------
 
proc equalize(num: int) {.thread.} =
## Try to equalize two buckets.
 
var b1, b2: int # Bucket indexes.
 
while true:
 
# Select the two buckets to "equalize".
withLock randomLock:
(b1, b2) = getTwoIndexes()
if b1 > b2: swap b1, b2 # We want "b1 < b2" to avoid deadlocks.
 
# Perform equalization.
withLock bucketLocks[b1]:
withLock bucketLocks[b2]:
let target = (buckets[b1] + buckets[b2]) div 2
let delta = target - buckets[b1]
inc buckets[b1], delta
dec buckets[b2], delta
 
# Check termination.
let (available, stop) = tryRecv terminate[num]
if available and stop: break
 
#---------------------------------------------------------------------------------------------------
 
proc distribute(num: int) {.thread.} =
## Redistribute contents of two buckets.
 
var b1, b2: int # Bucket indexes.
var factor: float # Ratio used to compute the new value for "b1".
 
while true:
 
# Select the two buckets for redistribution and the redistribution factor.
withLock randomLock:
(b1, b2) = getTwoIndexes()
factor = rand(0.0..1.0)
if b1 > b2: swap b1, b2 # We want "b1 < b2" to avoid deadlocks..
 
# Perform redistribution.
withLock bucketLocks[b1]:
withLock bucketLocks[b2]:
let sum = buckets[b1] + buckets[b2]
let value = (sum.toFloat * factor).toInt
buckets[b1] = value
buckets[b2] = sum - value
 
# Check termination.
let (available, stop) = tryRecv terminate[num]
if available and stop: break
 
#---------------------------------------------------------------------------------------------------
 
proc display(num: int) {.thread.} =
## Display the content of buckets and the sum (which should be constant).
 
while true:
for i in 1..N: acquire bucketLocks[i]
echo buckets, " Total = ", sum(buckets)
for i in countdown(N, 1): release bucketLocks[i]
os.sleep(1000)
 
# Check termination.
let (available, stop) = tryRecv terminate[num]
if available and stop: break
 
#———————————————————————————————————————————————————————————————————————————————————————————————————
 
randomize()
 
# Initialize the buckets with a random value.
for bucket in buckets.mitems:
bucket = rand(1..MaxInit)
 
# Initialize the locks.
randomLock.initLock()
for lock in bucketLocks.mitems:
lock.initLock()
 
# Open the channels.
for c in terminate.mitems:
c.open()
 
# Create and launch the threads.
var tequal, tdist, tdisp: Thread[int]
tequal.createThread(equalize, 0)
tdist.createThread(distribute, 1)
tdisp.createThread(display, 2)
 
sleep(10000)
 
# Ask the threads to stop.
for c in terminate.mitems:
c.send(true)
 
joinThreads([tequal, tdist, tdisp])
 
# Free resources.
randomLock.deinitLock()
for lock in bucketLocks.mitems:
lock.deinitLock()
for c in terminate.mitems:
c.close()</syntaxhighlight>
 
{{out}}
<pre>Total = 588 [92, 63, 33, 68, 66, 37, 26, 66, 77, 60]
Total = 588 [91, 3, 41, 126, 34, 3, 25, 92, 13, 160]
Total = 588 [129, 9, 80, 6, 68, 8, 73, 45, 69, 101]
Total = 588 [87, 71, 144, 20, 11, 54, 72, 48, 63, 18]
Total = 588 [158, 71, 110, 51, 19, 60, 27, 31, 10, 51]
Total = 588 [97, 43, 5, 70, 71, 104, 25, 17, 112, 44]
Total = 588 [68, 50, 12, 51, 128, 8, 21, 143, 53, 54]
Total = 588 [31, 47, 156, 81, 69, 5, 28, 76, 66, 29]
Total = 588 [97, 3, 27, 82, 42, 120, 72, 74, 39, 32]
Total = 588 [30, 39, 79, 109, 62, 62, 13, 14, 54, 126]</pre>
 
=={{header|Oz}}==
Uses a lock for every bucket. Enforces a locking order to avoid deadlocks.
 
<langsyntaxhighlight lang="oz">declare
%%
%% INIT
Line 2,609 ⟶ 2,774:
thread for do {Smooth {Pick} {Pick}} end end
thread for do {Roughen {Pick} {Pick}} end end
for do {Display} {Time.delay 50} end</langsyntaxhighlight>
 
Sample output:
<langsyntaxhighlight lang="oz">buckets(50 50 50 50 50 50 50 50 50 50 ,,,) sum: 5000
buckets(24 68 58 43 78 85 43 66 14 48 ,,,) sum: 5000
buckets(36 33 59 38 39 23 55 51 43 45 ,,,) sum: 5000
Line 2,618 ⟶ 2,783:
buckets(51 51 49 50 51 51 51 49 49 49 ,,,) sum: 5000
buckets(43 28 27 60 77 41 36 48 72 70 ,,,) sum: 5000
...</langsyntaxhighlight>
 
=={{header|PARI/GP}}==
Line 2,624 ⟶ 2,789:
 
=={{header|Perl}}==
<langsyntaxhighlight lang="perl">use strict;
use 5.10.0;
 
Line 2,675 ⟶ 2,840:
};
 
$t1->join; $t2->join; $t3->join;</langsyntaxhighlight>
 
=={{header|Phix}}==
<!--<syntaxhighlight lang="phix">(notonline)-->
Requires Phix 0.6.7 or later (due 1st Sept 15)
<span style="color: #008080;">without</span> <span style="color: #008080;">js</span> <span style="color: #000080;font-style:italic;">-- (no threads or critical sections in JavaScript)</span>
<lang Phix>constant nBuckets = 20
<span style="color: #008080;">constant</span> <span style="color: #000000;">nBuckets</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">20</span>
sequence buckets = tagset(nBuckets) -- {1,2,3,..,20}
<span style="color: #004080;">sequence</span> <span style="color: #000000;">buckets</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">tagset</span><span style="color: #0000FF;">(</span><span style="color: #000000;">nBuckets</span><span style="color: #0000FF;">)</span> <span style="color: #000080;font-style:italic;">-- {1,2,3,..,20}</span>
constant bucket_cs = init_cs() -- critical section
<span style="color: #008080;">constant</span> <span style="color: #000000;">bucket_cs</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">init_cs</span><span style="color: #0000FF;">()</span> <span style="color: #000080;font-style:italic;">-- critical section</span>
atom equals = 0, rands = 0 -- operation counts
<span style="color: #004080;">atom</span> <span style="color: #000000;">equals</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">0</span><span style="color: #0000FF;">,</span> <span style="color: #000000;">rands</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">0</span> <span style="color: #000080;font-style:italic;">-- operation counts</span>
integer terminate = 0 -- control flag
<span style="color: #004080;">integer</span> <span style="color: #000000;">terminate</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">0</span> <span style="color: #000080;font-style:italic;">-- control flag</span>
 
procedure mythreads(integer eq)
<span style="color: #008080;">procedure</span> <span style="color: #000000;">mythreads</span><span style="color: #0000FF;">(</span><span style="color: #004080;">integer</span> <span style="color: #000000;">eq</span><span style="color: #0000FF;">)</span>
-- if eq then equalise else randomise
<span style="color: #000080;font-style:italic;">-- if eq then equalise else randomise</span>
integer b1,b2,amt
<span style="color: #004080;">integer</span> <span style="color: #000000;">b1</span><span style="color: #0000FF;">,</span><span style="color: #000000;">b2</span><span style="color: #0000FF;">,</span><span style="color: #000000;">amt</span>
while not terminate do
<span style="color: #008080;">while</span> <span style="color: #008080;">not</span> <span style="color: #000000;">terminate</span> <span style="color: #008080;">do</span>
b1 = rand(nBuckets)
<span style="color: #000000;">b1</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">rand</span><span style="color: #0000FF;">(</span><span style="color: #000000;">nBuckets</span><span style="color: #0000FF;">)</span>
b2 = rand(nBuckets)
<span style="color: #000000;">b2</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">rand</span><span style="color: #0000FF;">(</span><span style="color: #000000;">nBuckets</span><span style="color: #0000FF;">)</span>
if b1!=b2 then -- (test not actually needed)
<span style="color: #008080;">if</span> <span style="color: #000000;">b1</span><span style="color: #0000FF;">!=</span><span style="color: #000000;">b2</span> <span style="color: #008080;">then</span> <span style="color: #000080;font-style:italic;">-- (test not actually needed)</span>
enter_cs(bucket_cs)
<span style="color: #7060A8;">enter_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">bucket_cs</span><span style="color: #0000FF;">)</span>
if eq then
<span style="color: #008080;">if</span> <span style="color: #000000;">eq</span> <span style="color: #008080;">then</span>
amt = floor((buckets[b1]-buckets[b2])/2)
<span style="color: #000000;">amt</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">floor</span><span style="color: #0000FF;">((</span><span style="color: #000000;">buckets</span><span style="color: #0000FF;">[</span><span style="color: #000000;">b1</span><span style="color: #0000FF;">]-</span><span style="color: #000000;">buckets</span><span style="color: #0000FF;">[</span><span style="color: #000000;">b2</span><span style="color: #0000FF;">])/</span><span style="color: #000000;">2</span><span style="color: #0000FF;">)</span>
equals += 1
<span style="color: #000000;">equals</span> <span style="color: #0000FF;">+=</span> <span style="color: #000000;">1</span>
else
<span amt style="color: rand(buckets[b1]+1)-1#008080;">else</span>
<span style="color: #000000;">amt</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">rand</span><span style="color: #0000FF;">(</span><span style="color: #000000;">buckets</span><span style="color: #0000FF;">[</span><span style="color: #000000;">b1</span><span style="color: #0000FF;">]+</span><span style="color: #000000;">1</span><span style="color: #0000FF;">)-</span><span style="color: #000000;">1</span>
rands += 1
<span style="color: #000000;">rands</span> <span style="color: #0000FF;">+=</span> <span style="color: #000000;">1</span>
end if
<span style="color: #008080;">end</span> <span style="color: #008080;">if</span>
buckets[b1] -= amt
<span style="color: #000000;">buckets</span><span style="color: #0000FF;">[</span><span style="color: #000000;">b1</span><span style="color: #0000FF;">]</span> <span style="color: #0000FF;">-=</span> <span style="color: #000000;">amt</span>
buckets[b2] += amt
<span style="color: #000000;">buckets</span><span style="color: #0000FF;">[</span><span style="color: #000000;">b2</span><span style="color: #0000FF;">]</span> <span style="color: #0000FF;">+=</span> <span style="color: #000000;">amt</span>
leave_cs(bucket_cs)
<span style="color: #7060A8;">leave_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">bucket_cs</span><span style="color: #0000FF;">)</span>
end if
<span style="color: #008080;">end</span> <span style="color: #008080;">if</span>
end while
<span style="color: #008080;">end</span> <span style="color: #008080;">while</span>
exit_thread(0)
<span style="color: #000000;">exit_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">0</span><span style="color: #0000FF;">)</span>
end procedure
<span style="color: #008080;">end</span> <span style="color: #008080;">procedure</span>
 
procedure display()
<span style="color: #008080;">procedure</span> <span style="color: #7060A8;">display</span><span style="color: #0000FF;">()</span>
enter_cs(bucket_cs)
<span style="color: #7060A8;">enter_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">bucket_cs</span><span style="color: #0000FF;">)</span>
?{sum(buckets),equals,rands,buckets}
<span style="color: #0000FF;">?{</span><span style="color: #7060A8;">sum</span><span style="color: #0000FF;">(</span><span style="color: #000000;">buckets</span><span style="color: #0000FF;">),</span><span style="color: #000000;">equals</span><span style="color: #0000FF;">,</span><span style="color: #000000;">rands</span><span style="color: #0000FF;">,</span><span style="color: #000000;">buckets</span><span style="color: #0000FF;">}</span>
leave_cs(bucket_cs)
<span style="color: #7060A8;">leave_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">bucket_cs</span><span style="color: #0000FF;">)</span>
end procedure
<span style="color: #008080;">end</span> <span style="color: #008080;">procedure</span>
 
display()
<span style="color: #7060A8;">display</span><span style="color: #0000FF;">()</span>
 
constant threads = {create_thread(routine_id("mythreads"),{1}), -- equalise
<span style="color: #008080;">constant</span> <span style="color: #000000;">threads</span> <span style="color: #0000FF;">=</span> <span style="color: #0000FF;">{</span><span style="color: #000000;">create_thread</span><span style="color: #0000FF;">(</span><span style="color: #7060A8;">routine_id</span><span style="color: #0000FF;">(</span><span style="color: #008000;">"mythreads"</span><span style="color: #0000FF;">),{</span><span style="color: #000000;">1</span><span style="color: #0000FF;">}),</span> <span style="color: #000080;font-style:italic;">-- equalise</span>
create_thread(routine_id("mythreads"),{0})} -- randomise
<span style="color: #000000;">create_thread</span><span style="color: #0000FF;">(</span><span style="color: #7060A8;">routine_id</span><span style="color: #0000FF;">(</span><span style="color: #008000;">"mythreads"</span><span style="color: #0000FF;">),{</span><span style="color: #000000;">0</span><span style="color: #0000FF;">})}</span> <span style="color: #000080;font-style:italic;">-- randomise</span>
 
constant ESC = #1B
<span style="color: #008080;">constant</span> <span style="color: #000000;">ESC</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">#1B</span>
while not find(get_key(),{ESC,'q','Q'}) do
<span style="color: #008080;">while</span> <span style="color: #008080;">not</span> <span style="color: #7060A8;">find</span><span style="color: #0000FF;">(</span><span style="color: #7060A8;">get_key</span><span style="color: #0000FF;">(),{</span><span style="color: #000000;">ESC</span><span style="color: #0000FF;">,</span><span style="color: #008000;">'q'</span><span style="color: #0000FF;">,</span><span style="color: #008000;">'Q'</span><span style="color: #0000FF;">})</span> <span style="color: #008080;">do</span>
sleep(1)
<span style="color: #7060A8;">sleep</span><span style="color: #0000FF;">(</span><span style="color: #000000;">1</span><span style="color: #0000FF;">)</span>
display()
<span style="color: #7060A8;">display</span><span style="color: #0000FF;">()</span>
end while
<span style="color: #008080;">end</span> <span style="color: #008080;">while</span>
terminate = 1
<span style="color: #000000;">terminate</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">1</span>
wait_thread(threads)</lang>
<span style="color: #000000;">wait_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">threads</span><span style="color: #0000FF;">)</span>
<span style="color: #000000;">delete_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">bucket_cs</span><span style="color: #0000FF;">)</span>
<!--</syntaxhighlight>-->
{{out}}
<pre>
Line 2,744 ⟶ 2,913:
child processes to handle the tasks, as this is the standard way
for general PicoLisp applications.
<syntaxhighlight lang="picolisp">(seed (in "/dev/urandom" (rd 8)))
<lang PicoLisp>(de *Buckets . 15) # Number of buckets
 
(de *Buckets . 15) # Number of buckets
 
# E/R model
(class +Bucket +Entity)
(rel key (+Key +Number)) # Key 1 .. *Buckets
(rel val (+Number)) # Value 1 .. 999
 
 
# Start with an empty DB
(call 'rm "-f" "buckets.db") # Remove old DB (if any)
(pool "buckets.db") # Create new DB file
 
# Create new DB file
(pool (tmp "buckets.db"))
 
# Create *Buckets buckets with values between 1 and 999
Line 2,761 ⟶ 2,929:
(new T '(+Bucket) 'key K 'val (rand 1 999)) )
(commit)
 
 
# Pick a random bucket
(de pickBucket ()
(db 'key '+Bucket (rand 1 *Buckets)) )
 
# Create process
(de process (QuadFunction)
(unless (fork)
(seed *Pid) # Ensure local random sequence
(loop
(let (B1 (pickBucket) B2 (pickBucket)) # Pick two buckets 'B1' and 'B2'
(unless (== B1 B2) # Found two different ones?
(dbSync) # Atomic DB operation
(let (V1 (; B1 val) V2 (; B2 val)) # Get current values
(QuadFunction B1 V1 B2 V2) )
(commit 'upd) ) ) ) ) ) # Close transaction
 
 
# First process
(unless (fork)
(process
(seed *Pid) # Ensure local random sequence
(quote (B1 V1 B2 V2)
(condloop
(let (B1 (pickBucket) B2 (pickBucket)) # Pick two buckets 'B1' and 'B2'
((> V1 V2)
(dbSync) (dec> B1 'val) # Make them closer to equal # Atomic DB operation
(let (V1 (inc>; B2B1 'val) V2 (; B2 val)) # Get current values
((> V2 V1) (cond
((dec> B2V1 'valV2)
(incdec> B1 'val) ) ) ) ) # Make them closer to equal
(inc> B2 'val) )
((> V2 V1)
(dec> B2 'val)
(inc> B1 'val) ) ) )
(commit 'upd) # Close transaction
(wait 1) ) ) )
 
# Second process
(unless (fork)
(process
(seed *Pid) # Ensure local random sequence
(quote (B1 V1 B2 V2)
(condloop
(let (B1 (pickBucket) B2 (pickBucket)) # Pick two buckets 'B1' and 'B2'
((> V1 V2 0)
(inc>unless (== B1 'valB2) # RedistributeFound two different themones?
(dbSync) # Atomic DB operation
(dec> B2 'val) )
(let (>V1 (; B1 val) V2 V1(; 0B2 val)) # Get current values
(inc> B2 'val) (cond
(dec> B1 'val) ) ) ) ((> V1 V2 0)
(inc> B1 'val) # Redistribute them
(dec> B2 'val) )
((> V2 V1 0)
(inc> B2 'val)
(dec> B1 'val) ) ) )
(commit 'upd) # Close transaction
(wait 1) ) ) ) )
 
# Third process
(unless (fork)
(loop
(dbSync) let Lst (collect 'key '+Bucket) # AtomicGet DBall operationbuckets
(letfor This Lst (collect 'key '+Bucket) # GetPrint allcurrent bucketsvalues
(for This Lst # Print current values
(printsp (: val)) )
(prinl # and total sum
"-- Total: "
(sum '((This) (: val)) Lst) ) )
(wait 2000) ) ) # Sleep two seconds
(rollback)
(wait 2000) ) ) # Sleep two seconds
 
(wait)</langsyntaxhighlight>
Output:
<pre>70 236 582 30 395 215 525 653 502 825 129 769 722 440 708 -- Total: 6801
Line 2,827 ⟶ 2,992:
 
=={{header|PureBasic}}==
<langsyntaxhighlight PureBasiclang="purebasic">#Buckets=9
#TotalAmount=200
Global Dim Buckets(#Buckets)
Line 2,910 ⟶ 3,075:
Quit=#True ; Tell threads to shut down
WaitThread(Thread1): WaitThread(Thread2)
EndIf</langsyntaxhighlight>
 
=={{header|Python}}==
Line 2,917 ⟶ 3,082:
This code uses a ''threading.Lock'' to serialize access to the bucket set.
 
<langsyntaxhighlight lang="python">from __future__ import with_statement # required for Python 2.5
import threading
import random
Line 2,990 ⟶ 3,155:
# wait until all worker threads finish
t1.join()
t2.join()</langsyntaxhighlight>
 
Sample Output:
Line 3,004 ⟶ 3,169:
 
=={{header|Racket}}==
<langsyntaxhighlight lang="racket">#lang racket
 
(struct bucket (value [lock #:auto])
Line 3,104 ⟶ 3,269:
 
(thread (λ () (for ([_ (in-range 500000)]) (equalize (random 10) (random 10)))))
(thread (λ () (for ([_ (in-range 500000)]) (randomize (random 10) (random 10)))))</langsyntaxhighlight>
 
Sample output:
Line 3,122 ⟶ 3,287:
9 (171429). (27 169 298 9 26 184 134 27 110 16 - 1000)
10 (192857). (54 80 38 52 29 14 42 173 246 272 - 1000)
</pre>
 
=={{header|Raku}}==
(formerly Perl 6)
 
{{trans|Ruby}}
{{works with|Rakudo|2016.07}}
<syntaxhighlight lang="raku" line>#| A collection of non-negative integers, with atomic operations.
class BucketStore {
has $.elems is required;
has @!buckets = ^1024 .pick xx $!elems;
has $lock = Lock.new;
#| Returns an array with the contents of all buckets.
method buckets {
$lock.protect: { [@!buckets] }
}
#| Transfers $amount from bucket at index $from, to bucket at index $to.
method transfer ($amount, :$from!, :$to!) {
return if $from == $to;
$lock.protect: {
my $clamped = $amount min @!buckets[$from];
@!buckets[$from] -= $clamped;
@!buckets[$to] += $clamped;
}
}
}
 
# Create bucket store
my $bucket-store = BucketStore.new: elems => 8;
my $initial-sum = $bucket-store.buckets.sum;
 
# Start a thread to equalize buckets
Thread.start: {
loop {
my @buckets = $bucket-store.buckets;
# Pick 2 buckets, so that $to has not more than $from
my ($to, $from) = @buckets.keys.pick(2).sort({ @buckets[$_] });
# Transfer half of the difference, rounded down
$bucket-store.transfer: ([-] @buckets[$from, $to]) div 2, :$from, :$to;
}
}
 
# Start a thread to distribute values among buckets
Thread.start: {
loop {
my @buckets = $bucket-store.buckets;
# Pick 2 buckets
my ($to, $from) = @buckets.keys.pick(2);
# Transfer a random portion
$bucket-store.transfer: ^@buckets[$from] .pick, :$from, :$to;
}
}
 
# Loop to display buckets
loop {
sleep 1;
my @buckets = $bucket-store.buckets;
my $sum = @buckets.sum;
say "{@buckets.fmt: '%4d'}, total $sum";
if $sum != $initial-sum {
note "ERROR: Total changed from $initial-sum to $sum";
exit 1;
}
}</syntaxhighlight>
 
{{out}}
<pre>
23 52 831 195 1407 809 813 20, total 4150
1172 83 336 306 751 468 615 419, total 4150
734 103 1086 88 313 136 1252 438, total 4150
512 323 544 165 200 3 2155 248, total 4150
...
</pre>
 
=={{header|Ring}}==
<syntaxhighlight lang="ring">
# Project : Atomic updates
 
bucket = list(10)
f2 = 0
for i = 1 to 10
bucket[i] = floor(random(9)*10)
next
a = display("display:")
see nl
a = flatten(a)
see "" + a + nl
a = display("flatten:")
see nl
a = transfer(3,5)
see a + nl
see "19 from 3 to 5: "
a = display(a)
see nl
func display(a)
display = 0
see "" + a + " " + char(9)
for i = 1 to 10
display = display + bucket[i]
see "" + bucket[i] + " "
next
see " total:" + display
return display
func flatten(f)
f1 = floor((f / 10) + 0.5)
for i = 1 to 10
bucket[i] = f1
f2 = f2 + f1
next
bucket[10] = bucket[10] + f - f2
func transfer(a1,a2)
transfer = floor(random(9)/10 * bucket[a1])
bucket[a1] = bucket[a1] - transfer
bucket[a2] = bucket[a2] + transfer
</syntaxhighlight>
Output:
<pre>
display: 60 10 70 60 40 80 90 20 90 20 total:540
flatten: 54 54 54 54 54 54 54 54 54 54 total:540
19 from 3 to 5: 54 54 33 54 75 54 54 54 54 54 total:540
</pre>
 
=={{header|Ruby}}==
<langsyntaxhighlight Rubylang="ruby">require 'thread'
 
# A collection of buckets, filled with random non-negative integers.
Line 3,218 ⟶ 3,519:
exit 1
end
end</langsyntaxhighlight>
 
Sample Output:
Line 3,225 ⟶ 3,526:
455 455 455 455 454 454 455 455, total 3638
755 3 115 10 598 1326 515 316, total 3638 </pre>
 
=={{header|Run BASIC}}==
<langsyntaxhighlight lang="runbasic">DIM bucket(10)
FOR i = 1 TO 10 : bucket(i) = int(RND(0)*100) : NEXT
 
Line 3,259 ⟶ 3,561:
bucket(a1) = bucket(a1) - transfer
bucket(a2) = bucket(a2) + transfer
END FUNCTION</langsyntaxhighlight>
<pre> Display: 24 50 50 85 63 49 50 91 10 2 Total:474
Flatten: 47 47 47 47 47 47 47 47 47 51 Total:474
19 from 3 to 5: 47 47 28 47 66 47 47 47 47 51 Total:474</pre>
 
=={{header|Rust}}==
{{libheader|rand}}
<syntaxhighlight lang="rust">extern crate rand;
 
use std::sync::{Arc, Mutex};
use std::thread;
use std::cmp;
use std::time::Duration;
 
use rand::Rng;
use rand::distributions::{IndependentSample, Range};
 
trait Buckets {
fn equalize<R:Rng>(&mut self, rng: &mut R);
fn randomize<R:Rng>(&mut self, rng: &mut R);
fn print_state(&self);
}
 
impl Buckets for [i32] {
fn equalize<R:Rng>(&mut self, rng: &mut R) {
let range = Range::new(0,self.len()-1);
let src = range.ind_sample(rng);
let dst = range.ind_sample(rng);
if dst != src {
let amount = cmp::min(((dst + src) / 2) as i32, self[src]);
let multiplier = if amount >= 0 { -1 } else { 1 };
self[src] += amount * multiplier;
self[dst] -= amount * multiplier;
}
}
fn randomize<R:Rng>(&mut self, rng: &mut R) {
let ind_range = Range::new(0,self.len()-1);
let src = ind_range.ind_sample(rng);
let dst = ind_range.ind_sample(rng);
if dst != src {
let amount = cmp::min(Range::new(0,20).ind_sample(rng), self[src]);
self[src] -= amount;
self[dst] += amount;
 
}
}
fn print_state(&self) {
println!("{:?} = {}", self, self.iter().sum::<i32>());
}
}
 
fn main() {
let e_buckets = Arc::new(Mutex::new([10; 10]));
let r_buckets = e_buckets.clone();
let p_buckets = e_buckets.clone();
 
thread::spawn(move || {
let mut rng = rand::thread_rng();
loop {
let mut buckets = e_buckets.lock().unwrap();
buckets.equalize(&mut rng);
}
});
thread::spawn(move || {
let mut rng = rand::thread_rng();
loop {
let mut buckets = r_buckets.lock().unwrap();
buckets.randomize(&mut rng);
}
});
 
let sleep_time = Duration::new(1,0);
loop {
{
let buckets = p_buckets.lock().unwrap();
buckets.print_state();
}
thread::sleep(sleep_time);
}
}</syntaxhighlight>
 
=={{header|Scala}}==
<syntaxhighlight lang="scala">
<lang Scala>
object AtomicUpdates {
 
Line 3,341 ⟶ 3,719:
}
}
</syntaxhighlight>
</lang>
=={{header|Smalltalk}}==
{{works with|Smalltalk/X}}
<syntaxhighlight lang="smalltalk">NUM_BUCKETS := 10.
"create and preset with random data"
buckets := (1 to:NUM_BUCKETS)
collect:[:i | Random nextIntegerBetween:0 and:10000]
as:Array.
count_randomizations := 0.
count_equalizations := 0.
 
printSum :=
[
"the sum must be computed and printed while noone fiddles around"
|snapshot|
snapshot := buckets synchronized:[ buckets copy ].
Transcript showCR: e' {snapshot} sum={snapshot sum}'.
].
 
pickTwo :=
[:action |
"pick two pockets and eval action on it"
|p1 p2|
p1 := Random nextIntegerBetween:1 and:NUM_BUCKETS.
p2 := Random nextIntegerBetween:1 and:NUM_BUCKETS.
buckets synchronized:[ action value:p1 value:p2 ].
].
 
randomize :=
[
pickTwo value:[:p1 :p2 |
"take a random value from p1 and add to p2"
|howMuch|
howMuch := Random nextIntegerBetween:0 and:(buckets at:p1).
buckets at:p1 put:(buckets at:p1)-howMuch.
buckets at:p2 put:(buckets at:p2)+howMuch.
].
count_randomizations := count_randomizations + 1.
].
 
equalize :=
[
pickTwo value:[:p1 :p2 |
"average them"
|diff|
diff := ((buckets at:p1) - (buckets at:p2)) // 2.
buckets at:p1 put:(buckets at:p1)-diff.
buckets at:p2 put:(buckets at:p2)+diff.
].
count_equalizations := count_equalizations + 1.
].
 
"start the show"
randomizer := [ randomize loop ] fork.
equalizer := [ equalize loop ] fork.
 
"every 2 seconds, print the sum"
monitor := [
[
printSum value.
Delay waitFor:2 seconds.
] loop.
] fork.
 
"let it run for 10 seconds, then kill them all"
Delay waitFor:20 seconds.
randomizer terminate.
equalizer terminate.
monitor terminate.
 
Stdout printCR: e'performed {count_equalizations} equalizations and {count_randomizations} randomizations'.</syntaxhighlight>
{{out}}
<pre>#(3940 3940 3940 3940 3939 3940 3940 3940 3940 3939) sum=39398
#(3940 3939 3940 3940 3940 3940 3940 3940 3940 3939) sum=39398
#(3940 3939 3940 3939 3940 3940 3940 3940 3940 3940) sum=39398
#(326 90 19490 831 2668 4840 37 6285 441 4390) sum=39398
#(3940 3940 3939 3940 3940 3940 3940 3940 3940 3939) sum=39398
#(3940 3940 3939 3940 3940 3939 3940 3940 3940 3940) sum=39398
#(1073 499 8808 1094 457 7380 4447 12307 1526 1807) sum=39398
#(10073 494 3913 284 286 105 18599 437 1332 3875) sum=39398
#(7938 7 9691 1853 1709 3566 12374 459 1062 739) sum=39398
#(327 1185 790 9606 5667 477 1260 178 18474 1434) sum=39398
performed 3360635 equalizations and 3060706 randomizations</pre>
Due to the way the CPU is scheduled, there are periods where the equalizer is way ahead, and others, where the randomizer is. Thus, depending on when sampled, the buckets are well equalized at times (running a single core).
 
=={{header|Swift}}==
 
<syntaxhighlight lang="swift">import Foundation
 
final class AtomicBuckets: CustomStringConvertible {
var count: Int {
return buckets.count
}
 
var description: String {
return withBucketsLocked { "\(buckets)" }
}
 
var total: Int {
return withBucketsLocked { buckets.reduce(0, +) }
}
 
private let lock = DispatchSemaphore(value: 1)
 
private var buckets: [Int]
 
subscript(n: Int) -> Int {
return withBucketsLocked { buckets[n] }
}
 
init(with buckets: [Int]) {
self.buckets = buckets
}
 
func transfer(amount: Int, from: Int, to: Int) {
withBucketsLocked {
let transferAmount = buckets[from] >= amount ? amount : buckets[from]
 
buckets[from] -= transferAmount
buckets[to] += transferAmount
}
}
 
private func withBucketsLocked<T>(do: () -> T) -> T {
let ret: T
 
lock.wait()
ret = `do`()
lock.signal()
 
return ret
}
}
 
let bucks = AtomicBuckets(with: [21, 39, 40, 20])
let order = DispatchSource.makeTimerSource()
let chaos = DispatchSource.makeTimerSource()
let printer = DispatchSource.makeTimerSource()
 
printer.setEventHandler {
print("\(bucks) = \(bucks.total)")
}
 
printer.schedule(deadline: .now(), repeating: .seconds(1))
printer.activate()
 
order.setEventHandler {
let (b1, b2) = (Int.random(in: 0..<bucks.count), Int.random(in: 0..<bucks.count))
let (v1, v2) = (bucks[b1], bucks[b2])
 
guard v1 != v2 else {
return
}
 
if v1 > v2 {
bucks.transfer(amount: (v1 - v2) / 2, from: b1, to: b2)
} else {
bucks.transfer(amount: (v2 - v1) / 2, from: b2, to: b1)
}
}
 
order.schedule(deadline: .now(), repeating: .milliseconds(5))
order.activate()
 
chaos.setEventHandler {
let (b1, b2) = (Int.random(in: 0..<bucks.count), Int.random(in: 0..<bucks.count))
 
bucks.transfer(amount: Int.random(in: 0..<(bucks[b1] + 1)), from: b1, to: b2)
}
 
chaos.schedule(deadline: .now(), repeating: .milliseconds(5))
chaos.activate()
 
dispatchMain()</syntaxhighlight>
 
{{out}}
 
<pre>[21, 39, 40, 20] = 120
[14, 28, 46, 32] = 120
[25, 17, 38, 40] = 120
[5, 46, 69, 0] = 120
[22, 52, 24, 22] = 120
[11, 70, 20, 19] = 120
[18, 19, 46, 37] = 120</pre>
 
=={{header|Tcl}}==
Line 3,347 ⟶ 3,908:
<br>
{{works with|Tcl|8.5}}
<langsyntaxhighlight lang="tcl">package require Thread
package require Tk
 
Line 3,448 ⟶ 4,009:
tkwait window .
tsv::set still going 0
thread::broadcast thread::exit</langsyntaxhighlight>
 
=={{header|Wren}}==
{{libheader|Wren-math}}
This is based on the Kotlin entry but has been modified somewhat mainly due to the following factors.
 
Wren-cli doesn't have threads but uses Fibers for concurrent operations in combination with the Scheduler/Timer classes for asynchronous operations.
 
Fibers are cooperatively (rather than preemptively) scheduled and only one fiber can run at a time. Consequently, simultaneous operations are impossible and all operations are therefore atomic by their nature.
<syntaxhighlight lang="wren">import "random" for Random
import "scheduler" for Scheduler
import "timer" for Timer
import "./math" for Nums
 
var Rnd = Random.new()
 
var NUM_BUCKETS = 10
var MAX_VALUE = 9999
 
class Buckets {
construct new(data) {
_data = data.toList
_running = true
}
 
[index] { _data[index] }
 
transfer(srcIndex, dstIndex, amount) {
if (amount < 0) Fiber.abort("Negative amount: %(amount)")
if (amount == 0) return 0
var a = amount
if (_data[srcIndex] - a < 0) a = _data[srcIndex]
if (_data[dstIndex] + a < 0) a = MAX_VALUE - _data[dstIndex]
if (a < 0) Fiber.abort("Negative amount: %(a)")
_data[srcIndex] = _data[srcIndex] - a
_data[dstIndex] = _data[dstIndex] + a
return a
}
 
buckets { _data.toList }
 
transferRandomAmount() {
while (_running) {
var srcIndex = Rnd.int(NUM_BUCKETS)
var dstIndex = Rnd.int(NUM_BUCKETS)
var amount = Rnd.int(MAX_VALUE + 1)
transfer(srcIndex, dstIndex, amount)
Timer.sleep(1)
}
}
 
equalize() {
while (_running) {
var srcIndex = Rnd.int(NUM_BUCKETS)
var dstIndex = Rnd.int(NUM_BUCKETS)
var amount = ((this[srcIndex] - this[dstIndex])/2).truncate
if (amount >= 0) transfer(srcIndex, dstIndex, amount)
Timer.sleep(1)
}
}
 
stop() { _running = false }
 
print() {
Timer.sleep(1000) // one second delay between prints
var bucketValues = buckets
System.print("Current values: %(Nums.sum(bucketValues)) %(bucketValues)")
}
}
 
var values = List.filled(NUM_BUCKETS, 0)
for (i in 0...NUM_BUCKETS) values[i] = Rnd.int(MAX_VALUE + 1)
System.print("Initial array : %(Nums.sum(values)) %(values)")
var buckets = Buckets.new(values)
var count = 0
while (true) {
Scheduler.add {
buckets.equalize()
}
buckets.print()
Scheduler.add {
buckets.transferRandomAmount()
}
buckets.print()
count = count + 2
if (count == 10) { // stop after 10 prints, say
buckets.stop()
break
}
}</syntaxhighlight>
 
{{out}}
Sample run:
<pre>
Initial array : 54458 [3795, 7333, 1896, 7813, 5336, 3828, 7832, 2001, 4856, 9768]
Current values: 54458 [5446, 5446, 5445, 5445, 5446, 5446, 5446, 5446, 5446, 5446]
Current values: 54458 [7868, 0, 25620, 7867, 0, 6511, 0, 0, 3318, 3274]
Current values: 54458 [5290, 4692, 3744, 7575, 5290, 7575, 1804, 11419, 0, 7069]
Current values: 54458 [0, 0, 2285, 4534, 16194, 11666, 16195, 2608, 976, 0]
Current values: 54458 [3435, 9208, 0, 9054, 4589, 5898, 4866, 0, 12819, 4589]
Current values: 54458 [4122, 8125, 6552, 415, 5845, 8125, 415, 7159, 0, 13700]
Current values: 54458 [5134, 0, 0, 17435, 5135, 9417, 7871, 4299, 5167, 0]
Current values: 54458 [2592, 4867, 5708, 6754, 12923, 0, 10013, 6377, 2532, 2692]
Current values: 54458 [1574, 9609, 8627, 0, 1575, 229, 4157, 12565, 12973, 3149]
Current values: 54458 [4146, 213, 5928, 11141, 2865, 6928, 12598, 2286, 2052, 6301]
</pre>
 
=={{header|zkl}}==
Threads and thread safe objects (locks, lists, ints, etc) are built in.
<langsyntaxhighlight lang="zkl">class B{
const N=10;
var [const]
Line 3,480 ⟶ 4,146:
 
fcn threadA(b){ while(1) { b.transferArb(); } }
fcn threadE(b){ while(1) { b.transferEq(); } }</langsyntaxhighlight>
<langsyntaxhighlight lang="zkl">b:=B();
do(10){ threadA.launch(b); } do(10){ threadE.launch(b); }
 
Line 3,489 ⟶ 4,155:
vm.numThreads," threads");
Atomic.sleep(2.5);
}</langsyntaxhighlight>
{{out}}
<pre>
Line 3,506 ⟶ 4,172:
</pre>
Another solution, using a Pipe as a "holding tank". Pipes are thread safe queues. This code just moves the values to and from the pipe to synchronize changes. The use of this class is the same as above, just change b:=B() to b:=C();
<langsyntaxhighlight lang="zkl">class C{
const N=10;
var [const]
Line 3,535 ⟶ 4,201:
v2;
}
}</langsyntaxhighlight>
 
 
Line 3,545 ⟶ 4,211:
{{omit from|LaTeX}}
{{omit from|M4}}
{{omit from|ML/I}}
{{omit from|Make}}
{{omit from|ML/I}}
{{omit from|PlainTeX}}
{{omit from|TI-89 BASIC}} <!-- Does not have concurrency or background processes. -->
{{omit from|XSLT}}
{{omit from|Z80 Assembly|Has no LOCK instruction}}
9,476

edits