Skip to content

Instantly share code, notes, and snippets.

@paulopatto
Last active May 12, 2016 12:24
Show Gist options
  • Save paulopatto/e0611685e6bd99732fd1b75106c20a57 to your computer and use it in GitHub Desktop.
Save paulopatto/e0611685e6bd99732fd1b75106c20a57 to your computer and use it in GitHub Desktop.
Introduction To Concurrent Programming: A Beginner's Guide
#
# Accounts.exs
#
defmodule Accounts do
def accounts(state) do
receive do
{:transfer, source, destination, amount} ->
accounts %{state | source => state[source] - amount , destination => state[destination] + amount}
{:amounts, accounts, sender } ->
send sender, {:amounts, for account <- accounts do
{account, state[account]}
end}
accounts(state)
end
end
def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do
if times > 0 do
send accounts, {:amounts, [source, destination], self}
receive do
{:amounts, amounts} ->
if amounts[source] + amounts[destination] != 500_000 do
Agent.update(inconsistencies, fn value -> value + 1 end)
end
end
send accounts, {:transfer, source, destination, amount}
transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies)
else
send sender, {:done, self}
end
end
end
accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end
{:ok, inconsistencies} = Agent.start(fn -> 0 end)
this = self
transfer1 = spawn fn ->
IO.puts "Transfer A started"
Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies)
IO.puts "Transfer A finished"
end
transfer2 = spawn fn ->
IO.puts "Transfer B started"
Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies)
IO.puts "Transfer B finished"
end
IO.puts "Waiting for transfers to be done"
receive do
{:done, ^transfer1} -> nil
end
receive do
{:done, ^transfer2} -> nil
end
send accounts, {:amounts, [:bob, :joe], self}
receive do
{:amounts, amounts} ->
IO.puts "Bob has in account: #{amounts[:bob]}"
IO.puts "Joe has in account: #{amounts[:joe]}"
IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}"
end
;
; agent-counter.clj
;
(def counter (agent 0))
(def attempts (atom 0))
(defn counter-increases[]
(dotimes [cnt 500000]
(send counter (fn [counter]
(swap! attempts inc)
(inc counter)))))
(def first-future (future (counter-increases)))
(def second-future (future (counter-increases)))
; wait for futures to complete
@first-future
@second-future
; wait for counter to be finished with updating
(await counter)
; print the value of the counter
(println "The counter is: " @counter)
(println "Number of attempts: " @attempts)
;
; atom-accounts-fixed.clj
;
(def accounts (atom {:bob 200000, :joe 300000}))
(def inconsistencies (atom 0))
(defn transfer [source destination amount]
(let [deref-accounts @accounts]
(if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000)
(swap! inconsistencies inc))
(swap! accounts
(fn [accs]
(update (update accs source - amount) destination + amount)))))
(defn first-transfer []
(dotimes [cnt 100000]
(transfer :bob :joe 2)))
(defn second-transfer []
(dotimes [cnt 100000]
(transfer :joe :bob 1)))
(def first-future (future (first-transfer)))
(def second-future (future (second-transfer)))
@first-future
@second-future
(println "Bob has in account: " (get @accounts :bob))
(println "Joe has in account: " (get @accounts :joe))
(println "Inconsistencies while transfer: " @inconsistencies)
;
; atom-acocunts.clj
;
(def bob (atom 200000))
(def joe (atom 300000))
(def inconsistencies (atom 0))
(defn transfer [source destination amount]
(if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc))
(swap! source - amount)
(swap! destination + amount))
(defn first-transfer []
(dotimes [cnt 100000]
(transfer bob joe 2)))
(defn second-transfer []
(dotimes [cnt 100000]
(transfer joe bob 1)))
(def first-future (future (first-transfer)))
(def second-future (future (second-transfer)))
@first-future
@second-future
(println "Bob has in account: " @bob)
(println "Joe has in account: " @joe)
(println "Inconsistencies while transfer: " @inconsistencies)
;
; atom-counter.clj
;
(def counter (atom 0))
(def attempts (atom 0))
(defn counter-increases[]
(dotimes [cnt 500000]
(swap! counter (fn [counter]
(swap! attempts inc) ; side effect DO NOT DO THIS
(inc counter)))))
(def first-future (future (counter-increases)))
(def second-future (future (counter-increases)))
; Wait for futures to complete
@first-future
@second-future
; Print value of the counter
(println "The counter is: " @counter)
(println "Number of attempts: " @attempts)
#
# Counting.exs
#
defmodule Counting do
def counter(value) do
receive do
{:get, sender} ->
send sender, {:counter, value}
counter value
{:set, new_value} -> counter(new_value)
end
end
def counting(sender, counter, times) do
if times > 0 do
send counter, {:get, self}
receive do
{:counter, value} -> send counter, {:set, value + 1}
end
counting(sender, counter, times - 1)
else
send sender, {:done, self}
end
end
end
counter = spawn fn -> Counting.counter 0 end
IO.puts "Starting counting processes"
this = self
counting1 = spawn fn ->
IO.puts "Counting A started"
Counting.counting this, counter, 500_000
IO.puts "Counting A finished"
end
counting2 = spawn fn ->
IO.puts "Counting B started"
Counting.counting this, counter, 500_000
IO.puts "Counting B finished"
end
IO.puts "Waiting for counting to be done"
receive do
{:done, ^counting1} -> nil
end
receive do
{:done, ^counting2} -> nil
end
send counter, {:get, self}
receive do
{:counter, value} -> IO.puts "Counter is: #{value}"
end
//
// Counting.java
//
public class Counting {
public static void main(String[] args) throws InterruptedException {
class Counter {
int counter = 0;
public void increment() { counter++; }
public int get() { return counter; }
}
final Counter counter = new Counter();
class CountingThread extends Thread {
public void run() {
for (int x = 0; x < 500000; x++) {
counter.increment();
}
}
}
CountingThread t1 = new CountingThread();
CountingThread t2 = new CountingThread();
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println(counter.get());
}
}
class Counting
def initialize
@counter = 0;
end
def increment
@counter += 1
end
def get
@counter
end
end
counter = Counting.new
t1 = Thread.new { 0.upto(500000).each { counter.increment; puts "[DEBUG] Counter##{counter.get}" } }
t2 = Thread.new { 0.upto(500000).each { counter.increment; puts "[DEBUG] Counter##{counter.get}" } }
t1.run; t2.run;
t1.join; t2.join;
puts "#{counter.get}"
//
// CountingBetter.java
//
import java.util.concurrent.atomic.AtomicInteger;
class CountingBetter {
public static void main(String[] args) throws InterruptedException {
final AtomicInteger counter = new AtomicInteger(0);
class CountingThread extends Thread {
public viod run() {
for (int i = 0; i < 500000; i++) {
counter.incrementAndGet();
}
}
}
CountingThread thread1 = new CountingThread();
CountingThread thread2 = new CoutningThread();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println(counter.get());
}
}
#
# CountingFixed.exs
#
defmodule Counting do
def counter(value) do
receive do
:increase -> counter(value + 1)
{:get, sender} ->
send sender, {:counter, value}
counter value
end
end
def counting(sender, counter, times) do
if times > 0 do
send counter, :increase
counting(sender, counter, times - 1)
else
send sender, {:done, self}
end
end
end
counter = spawn fn -> Counting.counter 0 end
IO.puts "Starting counting processes"
this = self
counting1 = spawn fn ->
IO.puts "Counting A started"
Counting.counting this, counter, 500_000
IO.puts "Counting A finished"
end
counting2 = spawn fn ->
IO.puts "Counting B started"
Counting.counting this, counter, 500_000
IO.puts "Counting B finished"
end
IO.puts "Waiting for counting to be done"
receive do
{:done, ^counting1} -> nil
end
receive do
{:done, ^counting2} -> nil
end
send counter, {:get, self}
receive do
{:counter, value} -> IO.puts "Counter is: #{value}"
end
//
// CountingFixed.java
//
public class CountingFixed {
public static main(String[] args) throws InterruptedException {
class Counter {
int counter = 0;
public synchronized void increase() { counter++; }
public synchronized int get() { return counter; }
}
final Counter counter = new Counter();
class CountingThread extends Thread {
public void run() {
for (int i = 0; i < 500000; i++) {
counter.increment();
}
}
}
CountingThread thread1 = new CountingThread();
CountingThread thread2 = new CountingThread();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println(counter.get());
}
}
#
# Deadlock.exs
#
defmodule Lock do
def loop(state) do
receive do
{:lock, sender} ->
case state do
[] ->
send sender, :locked
loop([sender])
_ ->
loop(state ++ [sender])
end
{:unlock, sender} ->
case state do
[] ->
loop(state)
[^sender | []] ->
loop([])
[^sender | [next | tail]] ->
send next, :locked
loop([next | tail])
_ ->
loop(state)
end
end
end
def lock(pid) do
send pid, {:lock, self}
receive do
:locked -> nil # This will block until we receive message
end
end
def unlock(pid) do
send pid, {:unlock, self}
end
def locking(first, second, times) do
if times > 0 do
lock(first)
lock(second)
unlock(second)
unlock(first)
locking(first, second, times - 1)
end
end
end
a_lock = spawn fn -> Lock.loop([]) end
b_lock = spawn fn -> Lock.loop([]) end
this = self
IO.puts "Locking A, B started"
spawn fn ->
Lock.locking(a_lock, b_lock, 1_000)
IO.puts "Locking A, B finished"
send this, :done
end
IO.puts "Locking B, A started"
spawn fn ->
Lock.locking(b_lock, a_lock, 1_000)
IO.puts "Locking B, A finished"
send this, :done
end
IO.puts "Waiting for locking to be done"
receive do
:done -> nil
end
receive do
:done -> nil
End
//
// Deadlock.java
//
public class Deadlock {
public static void main(String[] args) throws InterruptedException {
class Account {
int balance = 100;
public Account(int balance) { this.balance = balance; }
public synchronized void deposit(int amount) { balance += amount; }
public synchronized boolean withdraw(int amount) {
if (balance >= amount) {
balance -= amount;
return true;
}
return false;
}
public synchronized boolean transfer(Account destination, int amount) {
if (balance >= amount) {
balance -= amount;
synchronized(destination) {
destination.balance += amount;
};
return true;
}
return false;
}
public int getBalance() { return balance; }
}
final Account bob = new Account(200000);
final Account joe = new Account(300000);
class FirstTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
bob.transfer(joe, 2);
}
}
}
class SecondTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
joe.transfer(bob, 1);
}
}
}
FirstTransfer thread1 = new FirstTransfer();
SecondTransfer thread2 = new SecondTransfer();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println("Bob's balance: " + bob.getBalance());
System.out.println("Joe's balance: " + joe.getBalance());
}
}
#
# Deadlock fixed
#
defmodule Lock do
def loop(state) do
receive do
{:lock, sender} ->
case state do
[] ->
send sender, :locked
loop([sender])
_ ->
loop(state ++ [sender])
end
{:unlock, sender} ->
case state do
[] ->
loop(state)
[^sender | []] ->
loop([])
[^sender | [next | tail]] ->
send next, :locked
loop([next | tail])
_ ->
loop(state)
end
end
end
def lock(pid) do
send pid, {:lock, self}
receive do
:locked -> nil # This will block until we receive message
end
end
def unlock(pid) do
send pid, {:unlock, self}
end
def locking(first, second, times) do
if times > 0 do
lock(first)
lock(second)
unlock(second)
unlock(first)
locking(first, second, times - 1)
end
end
end
a_lock = spawn fn -> Lock.loop([]) end
b_lock = spawn fn -> Lock.loop([]) end
this = self
IO.puts "Locking A, B started"
spawn fn ->
Lock.locking(a_lock, b_lock, 1_000)
IO.puts "Locking A, B finished"
send this, :done
end
IO.puts "Locking A, B started"
spawn fn ->
Lock.locking(a_lock, b_lock, 1_000)
IO.puts "Locking A, B finished"
send this, :done
end
IO.puts "Waiting for locking to be done"
receive do
:done -> nil
end
receive do
:done -> nil
End
//
// DeadlockFixed.java
//
import java.util.concurrent.atomic.AtomicInteger;
public class DeadlockFixed {
public static void main(String[] args) throws InterruptedException {
final AtomicInteger counter = new AtomicInteger(0);
class Account {
int balance = 100;
int order;
public Account(int balance) {
this.balance = balance;
this.order = counter.getAndIncrement();
}
public synchronized void deposit(int amount) { balance += amount; }
public synchronized boolean withdraw(int amount) {
if (balance >= amount) {
balance -= amount;
return true;
}
return false;
}
public boolean transfer(Account destination, int amount) {
Account first;
Account second;
if (this.order < destination.order) {
first = this;
second = destination;
}
else {
first = destination;
second = this;
}
synchronized(first) {
synchronized(second) {
if (balance >= amount) {
balance -= amount;
destination.balance += amount;
return true;
}
return false;
}
}
}
public synchronized int getBalance() { return balance; }
}
final Account bob = new Account(200000);
final Account joe = new Account(300000);
class FirstTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
bob.transfer(joe, 2);
}
}
}
class SecondTransfer extends Thread {
public void run() {
for (int i = 0; i < 100000; i++) {
joe.transfer(bob, 1);
}
}
}
FirstTransfer thread1 = new FirstTransfer();
SecondTransfer thread2 = new SecondTransfer();
thread1.start(); thread2.start();
thread1.join(); thread2.join();
System.out.println("Bob's balance: " + bob.getBalance());
System.out.println("Joe's balance: " + joe.getBalance());
}
}
;
; fibonacci.clj
;
(defn fibonacci[a]
(if (<= a 2)
1
(+ (fibonacci (- a 1)) (fibonacci (- a 2)))))
(println "Start serial calculation")
(time (println "The result is: " (+ (fibonacci 36) (fibonacci 36))))
(println "Start parallel calculation")
(defn parallel-fibonacci[]
(def result-1 (future (fibonacci 36)))
(def result-2 (future (fibonacci 36)))
(+ @result-1 @result-2))
(time (println "The result is: " (parallel-fibonacci)))
;
; future.clj
;
(let [a (future
(println "Started A")
(Thread/sleep 1000)
(println "Finished A")
(+ 1 2))
b (future
(println "Started B")
(Thread/sleep 2000)
(println "Finished B")
(+ 3 4))]
(println "Waiting for futures")
(+ @a @b))
https://www.toptal.com/software/introduction-to-concurrent-programming
/
// NonDeteminism.java
//
public class NonDeterminism {
public static void main(String[] args) throws InterruptedException {
class Container {
public String value = "Empty";
}
final Container container = new Container();
class FastThread extends Thread {
public void run() {
container.value = "Fast";
}
}
class SlowThread extends Thread {
public void run() {
try {
Thread.sleep(50);
}
catch(Exception e) {}
container.value = "Slow";
}
}
FastThread fast = new FastThread();
SlowThread slow = new SlowThread();
fast.start(); slow.start();
fast.join(); slow.join();
System.out.println(container.value);
}
}
;
; promise-deadlock.clj
;
(def promise-result (promise))
(def future-result
(future
(println "The result is: " + @promise-result)
13))
(println "Future result is: " @future-result)
(deliver result 42)
;
; promise.clj
;
(def result (promise))
(future (println "The result is: " @result))
(Thread/sleep 2000)
(deliver result 42)

while $(sleep 3); do java Counting; done

;
; stm-accounts.clj
;
(def bob (ref 200000))
(def joe (ref 300000))
(def inconsistencies (atom 0))
(def attempts (atom 0))
(def transfers (agent 0))
(defn transfer [source destination amount]
(dosync
(swap! attempts inc) ; side effect DO NOT DO THIS
(send transfers inc)
(when (not= (+ @bob @joe) 500000)
(swap! inconsistencies inc)) ; side effect DO NOT DO THIS
(alter source - amount)
(alter destination + amount)))
(defn first-transfer []
(dotimes [cnt 100000]
(transfer bob joe 2)))
(defn second-transfer []
(dotimes [cnt 100000]
(transfer joe bob 1)))
(def first-future (future (first-transfer)))
(def second-future (future (second-transfer)))
@first-future
@second-future
(await transfers)
(println "Bob has in account: " @bob)
(println "Joe has in account: " @joe)
(println "Inconsistencies while transfer: " @inconsistencies)
(println "Attempts: " @attempts)
(println "Transfers: " @transfers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment