Home | Papers | Reports | Projects | Code Fragments | Dissertations | Presentations | Posters | Proposals | Lectures given | Course notes |
<< 9. Asynchronous implementaion of a Vote for Wait/Continue strategy in Borg | 11. Asynchronous Fractal Calculation >> |
10. Concurrent Processes and TransactionsWerner Van Belle1 - werner@yellowcouch.org, werner.van.belle@gmail.com Abstract : This excercise focuses on an important problem in distributed systems: concurrent processes and delves into a number of example to illustrate the need for well-thought through concurrecny interfaces. This lecture was given in 2001-2002 and 2002-2003
Reference:
Werner Van Belle; Concurrent Processes and Transactions; |
import java.rmi.*; interface PlayFieldProtocol extends Remote { /** * use joinActor to join the MineSweeperServer * The returned id should be used to place * the Actor with setPosition */ public int joinActor() throws RemoteException; /** * checks whether the given position is * free. */ public boolean isFree(Pos pos) throws RemoteException; /** * pos are the coordinates * Actor is the identifier of the Actor * or 0 if the entry should be free */ public void setPosition(Pos pos, int free) throws RemoteException; }
import java.io.*; import java.net.*; import java.util.*; import java.awt.*; import java.awt.event.*; import java.rmi.*; import java.rmi.server.*; class Square { boolean free; Color color; public Square( ) { color=Color.white; free=true; } public void paint( Graphics g, int x, int y ) { g . setColor( color ); g . fillRect( x * 0, y * 0, 9, 9 ); } public boolean isFree() { return free; } public void setActor(int t) { if (t == 0) { free=true; color=color.darker(); } else free = false; if (t == 1) color=Color.red; else if (t == 2) color=Color.green; else if (t == 3) color=Color.blue; else if (t == 4) color=Color.yellow; else if (t == 5) color=Color.gray; else if (t == 6) color=Color.red; else if (t == 7) color=Color.cyan; else if (t == 8) color=Color.magenta; } } class PlayField extends UnicastRemoteObject implements PlayFieldProtocol { public final static int XSIZ = 2; public final static int YSIZ = 2; Square field[ ][ ]; Canvas owner; int actornr; public PlayField( Canvas o, String name ) throws RemoteException { super( ); owner=o; int x, y; /* create empty field */ field = new Square[ XSIZ ][ YSIZ ]; for( x = 0; x < XSIZ; x++ ) for( y = 0; y < YSIZ; y++ ) field[ x ][ y ] = new Square( ); /* draw the borders */ for ( x = 0; x < XSIZ; x++ ) { field[ x ][ 0 ].setActor(1); field[ x ][ YSIZ - 1 ].setActor(1); } for ( y = 0; y < YSIZ; y++ ) { field[ 0 ][ y ].setActor(1); field[ XSIZ - 1][ y ].setActor(1); } /* first actornr = 1 */ actornr = 1; /* bind in registry */ try { Naming.rebind("//7.0.0.1/"+name,this); } catch (Exception e) { System.out.println("playfield creation error " + e.getMessage()); e.printStackTrace(); } } synchronized public int joinActor() throws RemoteException { actornr++; return actornr; } synchronized public boolean isFree(Pos pos) throws RemoteException { return field[pos.x][pos.y].isFree(); } synchronized public void setPosition(Pos pos, int actor) throws RemoteException { field[ pos . x ][ pos . y ].setActor(actor); owner.repaint(); } public void paint( Graphics g ) { int x, y; for( x = 0; x < XSIZ; x++ ) for( y = 0; y < YSIZ; y++ ) field[ x ][ y ] . paint( g, x, y ); } } public class PlayFieldServer extends Canvas { PlayField playField; public PlayFieldServer( ) { super( ); try { playField=new PlayField( this, "PlayField" ); } catch (Exception e) { System.out.println("playfield creation error "+e.getMessage()); e.printStackTrace(); } } public void update( Graphics g ) { playField.paint(g); } public void paint( Graphics g ) { playField.paint(g); } static public void main( String args[ ] ) { Frame frame = new Frame( "PlayField" ); Canvas canvas = new PlayFieldServer( ); frame . add( canvas ); frame . show( ); } }
import java.rmi.*; import java.util.*; /** * The 'BallActor' is an actor which * moves a ball over the field, thereby bumping * against any obstacles */ public class BallActor { PlayFieldProtocol field; int myId; Random random; public BallActor() { try { field=(PlayFieldProtocol)Naming.lookup("//7.0.0.1/PlayField"); myId=field.joinActor(); random=new Random(); run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws Exception { // we start at a random position Pos pos = new Pos((myId-1)*8,5); // the delta is our movement // we start going right Pos delta = new Pos(1,0); for(;;) { // move to the next position Pos newpos = pos.add(delta); // if it is not free, change // the direction if (!field.isFree(newpos)) delta=new Pos(-delta.x,0); else { Thread.sleep(5+random.nextInt(0)); // if it is free jump to // the new position field.setPosition(newpos,myId); Thread.sleep(5+random.nextInt(0)); // and clear the old // position field.setPosition(pos,0); pos=newpos; } } } static public void main(String args[]) { new BallActor(); } }
The neiging kan bestaan de server aan te passen op een manier waarbij we de twee operaties 'isFree' en 'setPosition' samenvoegen. Dit kan ofwel gedaan worden door setPosition toch nog false te laten weergeven als er reeds iets stond, of dit kan gedaan worden door een nieuwe operatie movepoint te introduceren.
Het mag duidelijk wezen dat deze oplossing niet echt geschikt is omdat we hierbij de server moeten aanpassen en de herbruikbaarheid ervan uiteindelijk toch bijzonder laag is (zoals zal blijken)
Onderstaande code implementeert een floodfill van het speelveld waarbij enkel de rand van de floodfill behouden blijft. De binnenkant wordt vrij gemaakt van zodra deze niet meer nodig is.
import java.rmi.*; import java.util.*; /** * The 'FloodActor' is an actor which * fills slowly the complete field */ public class FloodActor { PlayFieldProtocol field; int myId; List seeds; List border; List ismine; Random random; public FloodActor() { try { field=(PlayFieldProtocol)Naming.lookup("//7.0.0.1/PlayField"); myId=field.joinActor(); seeds=new Vector(0,0); border =new Vector(0,0); ismine=new Vector(0,0); random=new Random(); run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws Exception { // first we create a flood-seed and we add it to // are 'possible-extentable' array Pos pos = Pos.random(); seeds.add(pos); // the loop below continues as long as there are // seeds while(seeds.size()>0) { // we choose one and check whether it is free int r=random.nextInt(seeds.size()); pos=(Pos)seeds.remove(r); if (!ismine.contains(pos) && field.isFree(pos)) { // if it is free, we clear the position field.setPosition(pos,myId); // slowdown so we can see something Thread.sleep(0); // from now on this position is ours ismine.add(pos); // we assume it is part of the border border.add(pos); // now we look for new possible seeds if (field.isFree(pos.left())) seeds.add(pos.left()); if (field.isFree(pos.right())) seeds.add(pos.right()); if (field.isFree(pos.up())) seeds.add(pos.up()); if (field.isFree(pos.down())) seeds.add(pos.down()); } // below we check the border and // create a new one on the fly Iterator it=border.iterator(); List newborder=new Vector(0,0); while(it.hasNext()) { pos=(Pos)it.next(); // if all neighbours are mine // i can free this position if ( ismine.contains(pos.left()) && ismine.contains(pos.right()) && ismine.contains(pos.up()) && ismine.contains(pos.down())) field.setPosition(pos,0); // if not, we keep it in the border else newborder.add(pos); } // realise the new border border=newborder; } } static public void main(String args[]) { new FloodActor(); } }
Deze oefening illustreert het bovenstaande antwoord. Als we dit correct willen laten werken, merken we dat we een veld moet reserveren voor access door een bepaalde actor. Dit kunnen we doen door bij de isFree check de square te markeren als 'locked'.
import java.rmi.*; import java.util.*; /** * The 'LineActor' is an actor which * moves a ball over the field, thereby bumping * against any obstacles */ public class LineActor { PlayFieldProtocol field; int id; Random random; public LineActor() { try { field=(PlayFieldProtocol)Naming.lookup("//7.0.0.1/PlayField"); id=field.joinActor(); random=new Random(); run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws Exception { // we start at a random position Pos pos = new Pos((id-1)*5,(id-1)*5); // the delta is our movement // we start going right Pos delta = new Pos(1,0); for(;;) { // move to the next position Pos newpos = pos.add(delta); // the position is only free // if we can draw a vertical // line of 0 dots boolean free=true; Pos checkat=newpos; for(int i=0;i<0;i++) { if (!field.isFree(checkat)) free=false; checkat=checkat.down(); } // if it is not free, we change // direction if (!free) delta=new Pos(-delta.x,0); else { // if it is free jump to // the new position checkat=newpos; for(int i=0;i<0;i++) { field.setPosition(checkat,id); checkat=checkat.down(); } Thread.sleep(0+random.nextInt(0)); // and clear the old // position checkat=newpos; for(int i=0;i<0;i++) { field.setPosition(checkat,0); checkat=checkat.down(); } pos=newpos; } } } static public void main(String args[]) { new LineActor(); } }
Deze oefening illustreert dat de vorige 'pseudo-locking' oplossing niet bruikbaar is voor deze client. Het is onmogelijk de lijnen correct te laten bewegen omdat we niet de 10 vakjes 'tegelijk' kunnen laten bewegen. Om dit op te lossen moeten we een meer abstracter locking mechanisme voorzien waarin we kunnen beschrijven wat tot 1 operatie (atomic operation) hoort. Een mogelijke oplossing staat hieronder.
import java.rmi.*; interface TxPlayFieldProtocol extends Remote { /** * concurrency interface. * beginTransaction returns the id of the new transaction * lock returns false when already locked by somebody else * unlocks are automatically done when aboprting or commiting * commitTransaction makes the transaction reality * abortTransaction rolls back all changes made since the * beginning of the transaction. */ public TransactionRef beginTransaction() throws RemoteException; public boolean lock(Pos pos, TransactionRef tx) throws RemoteException; public void commitTransaction(TransactionRef tx) throws RemoteException; public void abortTransaction(TransactionRef tx) throws RemoteException; /** * functionality * joinActor joins the server. The returned id should be used * in the other calls. * isFree checks whether the given position is free. * setPosition colors the given position */ public int joinActor() throws RemoteException; public boolean isFree(Pos pos, TransactionRef tx) throws RemoteException; public void setPosition(Pos pos, int color, TransactionRef tx) throws RemoteException; }
import java.io.*; import java.net.*; import java.util.*; import java.awt.*; import java.awt.event.*; import java.rmi.*; import java.rmi.server.*; class Square implements LockAble, Comparable { boolean free; Color color; public Square( ) { color=Color.white; free=true; } public void paint( Graphics g, int x, int y ) { g . setColor( color ); g . fillRect( x * 0, y * 0, 9, 9 ); } public boolean isFree() { return free; } public void setActor(int t) { if (t == 0) { free=true; color=color.darker(); } else free = false; if (t == 1) color=Color.red; else if (t == 2) color=Color.green; else if (t == 3) color=Color.blue; else if (t == 4) color=Color.yellow; else if (t == 5) color=Color.gray; else if (t == 6) color=Color.red; else if (t == 7) color=Color.cyan; else if (t == 8) color=Color.magenta; } public Object getState() { Square state = new Square(); state.free=free; state.color=color; return state; } public void setState(Object obj) { Square state = (Square)obj; free=state.free; color=state.color; } public int compareTo(Object obj) { if (hashCode()<obj.hashCode()) return -1; else if (hashCode()==obj.hashCode()) return 0; else return 1; } } class TxPlayField extends UnicastRemoteObject implements TxPlayFieldProtocol { public final static int XSIZ = 2; public final static int YSIZ = 2; Square field[ ][ ]; Canvas owner; int actornr; public TxPlayField( Canvas o, String name ) throws RemoteException { super( ); owner=o; int x, y; /* create empty field */ field = new Square[ XSIZ ][ YSIZ ]; for( x = 0; x < XSIZ; x++ ) for( y = 0; y < YSIZ; y++ ) field[ x ][ y ] = new Square( ); /* draw the borders */ for ( x = 0; x < XSIZ; x++ ) { field[ x ][ 0 ].setActor(1); field[ x ][ YSIZ - 1 ].setActor(1); } for ( y = 0; y < YSIZ; y++ ) { field[ 0 ][ y ].setActor(1); field[ XSIZ - 1][ y ].setActor(1); } /* first actornr = 1 */ actornr = 1; /* bind in registry */ try { Naming.rebind("//7.0.0.1/"+name,this); } catch (Exception e) { System.out.println("playfield creation error " + e.getMessage()); e.printStackTrace(); } } /** * concurrency interface */ synchronized public TransactionRef beginTransaction() throws RemoteException { return new Transaction().id; } synchronized public boolean lock(Pos pos, TransactionRef id) throws RemoteException { Transaction tx=Transaction.get(id); return tx.lock(field[pos.x][pos.y]); } synchronized public void commitTransaction(TransactionRef id) throws RemoteException { Transaction.get(id).commit(); } synchronized public void abortTransaction(TransactionRef id) throws RemoteException { Transaction.get(id).abort(); } synchronized public int joinActor() throws RemoteException { actornr++; return actornr; } synchronized public boolean isFree(Pos pos, TransactionRef tx) throws RemoteException { if (!Transaction.get(tx).canEnter(field[pos.x][pos.y])) return false; return field[pos.x][pos.y].isFree(); } synchronized public void setPosition(Pos pos, int actor, TransactionRef tx) throws RemoteException { if (!Transaction.get(tx).canEnter(field[pos.x][pos.y])) return; field[ pos . x ][ pos . y ].setActor(actor); owner.repaint(); } public void paint( Graphics g ) { int x, y; for( x = 0; x < XSIZ; x++ ) for( y = 0; y < YSIZ; y++ ) field[ x ][ y ] . paint( g, x, y ); } } public class TxPlayFieldServer extends Canvas { TxPlayField playField; public TxPlayFieldServer( ) { super( ); try { playField=new TxPlayField( this, "PlayField" ); } catch (Exception e) { System.out.println("playfield creation error "+e.getMessage()); e.printStackTrace(); } } public void update( Graphics g ) { playField.paint(g); } public void paint( Graphics g ) { playField.paint(g); } static public void main( String args[ ] ) { Frame frame = new Frame( "PlayField" ); Canvas canvas = new TxPlayFieldServer( ); frame . add( canvas ); frame . show( ); } }
import java.util.*; public class Transaction { public TransactionRef id; static private int nextid = 1; static private TreeMap transactions = new TreeMap(); public TreeSet locks; static public void Fatal(String err) { System.out.println(err); System.exit(3); } public Transaction() { id = new TransactionRef(nextid++); locks = new TreeSet(); transactions.put(id,this); } static public Transaction get(TransactionRef id) { Transaction t=(Transaction)transactions.get(id); return t; } public boolean lock(LockAble obj) { Lock l=Lock.getLock(obj); if (l.lockedby==null) { l.lock(this); locks.add(l); } if (l.lockedby==this) return true; return false; } /** * returns true if operation can enter */ public boolean canEnter(LockAble obj) { Lock l=Lock.getLock(obj); if (l.lockedby!=this) return false; return true; } /** * commit commits the transaction and * removes itself. */ public void commit() { Iterator it=locks.iterator(); while(it.hasNext()) { Lock lock=(Lock)it.next(); lock.unlock(); } transactions.remove(id); } /** * abort aborts the transaction and * removes itself. */ public void abort() { Iterator it=locks.iterator(); while(it.hasNext()) { Lock lock=(Lock)it.next(); lock.abortlock(); } transactions.remove(id); } }
import java.io.*; public class TransactionRef implements Serializable, Comparable { int content; public TransactionRef(int val) { content=val; } public int compareTo(Object obj) { TransactionRef other=(TransactionRef)obj; if (content<other.content) return -1; else if (content==other.content) return 0; else return 1; } }
import java.util.*; public class Lock implements Comparable { Object statebefore; LockAble state; Transaction lockedby; int lockcount ; static private TreeMap locks = new TreeMap(); static public void Fatal(String err) { System.out.println(err); System.exit(3); } /** * This method can be used to get a lock on an * object. Depending on whether it was locked or * not will the transaction id be different. */ static public Lock getLock(LockAble o) { if (o==null) Fatal("lock: getlock; o = null"); Lock lock=(Lock)locks.get(o); if (lock==null) { return new Lock(o); } else { return lock; } } protected Lock(LockAble tolock) { state=tolock; statebefore=tolock.getState(); locks.put(tolock,this); } /** * Use the lock method to assign a certain * lock to a certain transaction. */ protected void lock(Transaction by) { if (lockedby!=by && lockedby!=null) Fatal("Lock: -1-"); lockedby=by; } /** * The unlock method can be used to unlock a certain * object. In this case the objects state is thrown away * and we remove ourselve from the locks */ public void unlock() { locks.remove(state); state=null; } public void abortlock() { state.setState(statebefore); unlock(); } public int compareTo(Object obj) { if (obj.hashCode()<hashCode()) return 1; if (obj.hashCode()==hashCode()) return 0; else return -1; } }
interface LockAble { Object getState(); void setState(Object o); };
import java.rmi.*; import java.util.*; /** * The 'BallActor' is an actor which * moves a ball over the field, thereby bumping * against any obstacles */ public class TxBallActor { TxPlayFieldProtocol field; Random random; int color; public TxBallActor() { try { field=(TxPlayFieldProtocol)Naming.lookup("//7.0.0.1/PlayField"); color=field.joinActor(); random=new Random(); run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws Exception { // we start at a random position Pos pos = new Pos((color-1)*8,5); // the delta is our movement // we start going right Pos delta = new Pos(1,0); for(;;) { // 0 - start transaction TransactionRef tx; tx=field.beginTransaction(); // 1 - move to the next position Pos newpos = pos.add(delta); // 2 - reserver access to both if (!field.lock(pos,tx) || !field.lock(newpos,tx)) { System.out.println("Aborting transaction"); field.abortTransaction(tx); continue; } // 3 - check if it is free if (!field.isFree(newpos,tx)) delta=new Pos(-delta.x,0); else { Thread.sleep(5+random.nextInt(0)); // 4 - jump to the new position field.setPosition(newpos,color,tx); Thread.sleep(5+random.nextInt(0)); // 5 - clear the old position field.setPosition(pos,0,tx); pos=newpos; } field.commitTransaction(tx); } } static public void main(String args[]) { new TxBallActor(); } }
import java.rmi.*; import java.util.*; /** * The 'TxFloodActor' is an actor which * fills slowly the complete field */ public class TxFloodActor { TxPlayFieldProtocol field; int color; List seeds; List border; List ismine; TransactionRef contenttx; Random random; public TxFloodActor() { try { field=(TxPlayFieldProtocol)Naming.lookup("//7.0.0.1/PlayField"); color=field.joinActor(); seeds=new Vector(0,0); border =new Vector(0,0); ismine=new Vector(0,0); random=new Random(); run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws Exception { // for educational purposes we added a content-transaction. // This transaction is used to lock the interior of the border. // When there are no seeds left we commit this transaction. contenttx=field.beginTransaction(); // first we create a flood-seed and we add it to // are 'possible-extentable' array Pos pos = Pos.random(); seeds.add(pos); // the loop below continues as long as there are // seeds while(seeds.size()>0) { // we choose one and check whether it is free int r=random.nextInt(seeds.size()); pos=(Pos)seeds.remove(r); if (!ismine.contains(pos)) { TransactionRef tx=field.beginTransaction(); if (!field.lock(pos,tx)) { field.abortTransaction(tx); System.out.println("Aborting..."); } else { if (field.isFree(pos,tx)) { field.setPosition(pos,color,tx); Thread.sleep(0); ismine.add(pos); border.add(pos); seeds.add(pos.left()); seeds.add(pos.right()); seeds.add(pos.up()); seeds.add(pos.down()); } field.commitTransaction(tx); } } // below we check the border and create a new one on the fly Iterator it=border.iterator(); List newborder=new Vector(0,0); while(it.hasNext()) { pos=(Pos)it.next(); // if all neighbours are mine // i can free this position if ( ismine.contains(pos.left()) && ismine.contains(pos.right()) && ismine.contains(pos.up()) && ismine.contains(pos.down())) { if (!field.lock(pos,contenttx)) System.out.println("Fatal?"); field.setPosition(pos,0,contenttx); } // if not, we keep it in the border else newborder.add(pos); } // realise the new border border=newborder; } // now instead of committing the content, we roll it back ! field.abortTransaction(contenttx); } static public void main(String args[]) { new TxFloodActor(); } }
import java.rmi.*; import java.util.*; /** * The 'TxLineActor' is an actor which * moves a ball over the field, thereby bumping * against any obstacles */ public class TxLineActor { TxPlayFieldProtocol field; int color; Random random; public TxLineActor() { try { field=(TxPlayFieldProtocol)Naming.lookup("//7.0.0.1/PlayField"); color=field.joinActor(); random=new Random(); run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws Exception { // we start at a random position Pos pos = new Pos((color-1)*5,(color-1)*5); // the delta is our movement // we start going right Pos delta = new Pos(1,0); for(;;) { // 0 - start transaction TransactionRef tx = field.beginTransaction(); // 1 - move to the next position Pos newpos = pos.add(delta); // 2 - lock current and new position Pos strike1=newpos; Pos strike2=pos; boolean lockfailed=false; for(int i=0;i<0;i++) { if (!field.lock(strike1,tx)) { lockfailed=true; break; } if (!field.lock(strike2,tx)) { lockfailed=true; break; } strike1=strike1.down(); strike2=strike2.down(); } if (lockfailed) { field.abortTransaction(tx); System.out.println("Aborting..."); continue; } // the position is only free // if we can draw a vertical // line of 0 dots boolean free=true; Pos checkat=newpos; for(int i=0;i<0;i++) { if (!field.isFree(checkat,tx)) free=false; checkat=checkat.down(); } // if it is not free, we change // direction if (!free) delta=new Pos(-delta.x,0); else { // if it is free jump to the new position checkat=newpos; for(int i=0;i<0;i++) { field.setPosition(checkat,color,tx); checkat=checkat.down(); } Thread.sleep(0+random.nextInt(0)); // and clear the old position checkat=pos; for(int i=0;i<0;i++) { field.setPosition(checkat,0,tx); checkat=checkat.down(); } pos=newpos; } field.commitTransaction(tx); } } static public void main(String args[]) { new TxLineActor(); } }
http://werner.yellowcouch.org/ werner@yellowcouch.org |