Source code for /engineering/autohit-2003/src/autohit/common/channels/QueuedDrain.javaOriginal file QueuedDrain.java
   1 /**
   2  * AUTOHIT 2003
   3  * Copyright Erich P Gatejen (c) 1989,1997,2003,2004
   4  * 
   5  * This program is free software; you can redistribute it and/or modify 
   6  * it under the terms of the GNU General Public License as published by 
   7  * the Free Software Foundation; either version 2 of the License, or (at
   8  * your option) any later version.
   9  * This program is distributed in the hope that it will be useful, but
  10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
  11  * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  12  * more details.
  13  * 
  14  * You should have received a copy of the GNU General Public License along
  15  * with this program; if not, write to the Free Software Foundation, Inc.,
  16  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  17  *
  18  * Additional license information can be found in the documentation.
  19  * @author Erich P Gatejen
  20  */
  21 package autohit.common.channels;
  22 
  23 import java.util.LinkedList;
  24 import autohit.common.ProcessMonitor;
  25 
  26 /**
  27  * Basically, a queued drain. It is absolutely threadsafe. It can be used for
  28  * interprocess communication.
  29  * 
  30  * @author Erich P. Gatejen
  31  * @version 1.0 <i>Version History</i><code>EPG - Rewrite - 17Sep03</code>
  32  */
  33 public class QueuedDrain implements Drain {
  34 
  35 	/**
  36 	 * INTERNAL DATA */
  37 	private LinkedList fifo;
  38 	private ProcessMonitor monitor;
  39 	private String drainname;
  40 	private int sequence; // Let overflow wrap it
  41 
  42 	/**
  43 	 * Constructor */
  44 	public QueuedDrain() {
  45 		super();
  46 		drainname = "unnamed QueueDrain";
  47 		monitor = new ProcessMonitor();
  48 		fifo = new LinkedList();
  49 	}
  50 
  51 	/**
  52 	 * Initializer
  53 	 * 
  54 	 * @param name
  55 	 *           of the drain
  56 	 */
  57 	public void init(String name) throws ChannelException {
  58 		drainname = name;
  59 	}
  60 
  61 	/**
  62 	 * Post an item
  63 	 * 
  64 	 * @param a
  65 	 *           An atom containing the posted data
  66 	 * @return a receipt
  67 	 * @throws ChannelException
  68 	 */
  69 	public Receipt post(Atom a) throws ChannelException {
  70 
  71 		Receipt rr = null;
  72 
  73 		try {
  74 			monitor.waitlock();
  75 			fifo.add(a);
  76 
  77 			// Build the receipt
  78 			sequence++;
  79 			rr = new QueueReceipt(a.senderID, drainname, sequence);
  80 
  81 			// Let someone waiting on the fifo go
  82 			monitor.signal();
  83 
  84 		} catch (Exception ee) {
  85 			throw new ChannelException(
  86 				"Post to QueuedDrain failed due to exception.  message=" + ee.getMessage(),
  87 				ChannelException.CODE_CHANNEL_DRAIN_GENERAL_FAULT);
  88 		} finally {
  89 			try {
  90 				monitor.unlock();
  91 			} catch (Exception e) {
  92 				// Don't care
  93 			}
  94 		}
  95 		return rr;
  96 	}
  97 
  98 	/**
  99 	 * Is there something available.   There is no guarantee that between a
 100 	 * poll() and a get() that there will still be something there.
 101 	 * 
 102 	 * @return true if there is something in the queue, otherwise false
 103 	 */
 104 	public boolean poll() {
 105 		if (fifo.size() > 0)
 106 			return true;
 107 		return false;
 108 	}
 109 
 110 	/**
 111 	 * Get something from the queue. There is no guarantee that anything is
 112 	 * there. If there isn't, it will return a null.
 113 	 * 
 114 	 * @return An Atom or null
 115 	 * @throws ChannelException
 116 	 * @see Atom
 117 	 */
 118 	public Atom get() throws ChannelException {
 119 
 120 		Atom candidate = null;
 121 
 122 		try {
 123 			monitor.waitlock();
 124 			if (fifo.size() > 0) {
 125 				candidate = (Atom) fifo.removeFirst();
 126 			}
 127 
 128 		} catch (Exception ee) {
 129 			throw new ChannelException(
 130 				"FAULT in " + drainname + ".get().  message=" + ee.getMessage(),
 131 				ChannelException.CODE_CHANNEL_DRAIN_GENERAL_FAULT);
 132 
 133 		} finally {
 134 			try {
 135 				monitor.unlock();
 136 			} catch (Exception eee) {
 137 				// Don't care
 138 			}
 139 		}
 140 		return candidate;
 141 	}
 142 
 143 	/**
 144 	 * Block until there is something in the queue. It will always return
 145 	 * something
 146 	 * 
 147 	 * @return An Atom or null
 148 	 * @throws ChannelException
 149 	 * @see Atom
 150 	 */
 151 	public Atom block() throws ChannelException {
 152 		Atom candidate = this.get();
 153 
 154 		try {
 155 			while (candidate == null) {
 156 				monitor.waitSignal();
 157 				candidate = this.get();
 158 			}
 159 		} catch (InterruptedException ie) {
 160 			throw new ChannelException("Channel.QueueDrain.block() interrupted.", ChannelException.CODE_CHANNEL_INTERRUPTED);
 161 		}
 162 		return candidate;
 163 	}
 164 
 165 }