1 /**
2 * AUTOHIT 2003
3 * Copyright Erich P Gatejen (c) 1989,1997,2003,2004
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or (at
8 * your option) any later version.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Additional license information can be found in the documentation.
19 * @author Erich P Gatejen
20 */
21 package autohit.common.channels;
22
23 import java.util.LinkedList;
24 import autohit.common.ProcessMonitor;
25
26 /**
27 * Basically, a queued drain. It is absolutely threadsafe. It can be used for
28 * interprocess communication.
29 *
30 * @author Erich P. Gatejen
31 * @version 1.0 <i>Version History</i><code>EPG - Rewrite - 17Sep03</code>
32 */
33 public class QueuedDrain implements Drain {
34
35 /**
36 * INTERNAL DATA */
37 private LinkedList fifo;
38 private ProcessMonitor monitor;
39 private String drainname;
40 private int sequence; // Let overflow wrap it
41
42 /**
43 * Constructor */
44 public QueuedDrain() {
45 super();
46 drainname = "unnamed QueueDrain";
47 monitor = new ProcessMonitor();
48 fifo = new LinkedList();
49 }
50
51 /**
52 * Initializer
53 *
54 * @param name
55 * of the drain
56 */
57 public void init(String name) throws ChannelException {
58 drainname = name;
59 }
60
61 /**
62 * Post an item
63 *
64 * @param a
65 * An atom containing the posted data
66 * @return a receipt
67 * @throws ChannelException
68 */
69 public Receipt post(Atom a) throws ChannelException {
70
71 Receipt rr = null;
72
73 try {
74 monitor.waitlock();
75 fifo.add(a);
76
77 // Build the receipt
78 sequence++;
79 rr = new QueueReceipt(a.senderID, drainname, sequence);
80
81 // Let someone waiting on the fifo go
82 monitor.signal();
83
84 } catch (Exception ee) {
85 throw new ChannelException(
86 "Post to QueuedDrain failed due to exception. message=" + ee.getMessage(),
87 ChannelException.CODE_CHANNEL_DRAIN_GENERAL_FAULT);
88 } finally {
89 try {
90 monitor.unlock();
91 } catch (Exception e) {
92 // Don't care
93 }
94 }
95 return rr;
96 }
97
98 /**
99 * Is there something available. There is no guarantee that between a
100 * poll() and a get() that there will still be something there.
101 *
102 * @return true if there is something in the queue, otherwise false
103 */
104 public boolean poll() {
105 if (fifo.size() > 0)
106 return true;
107 return false;
108 }
109
110 /**
111 * Get something from the queue. There is no guarantee that anything is
112 * there. If there isn't, it will return a null.
113 *
114 * @return An Atom or null
115 * @throws ChannelException
116 * @see Atom
117 */
118 public Atom get() throws ChannelException {
119
120 Atom candidate = null;
121
122 try {
123 monitor.waitlock();
124 if (fifo.size() > 0) {
125 candidate = (Atom) fifo.removeFirst();
126 }
127
128 } catch (Exception ee) {
129 throw new ChannelException(
130 "FAULT in " + drainname + ".get(). message=" + ee.getMessage(),
131 ChannelException.CODE_CHANNEL_DRAIN_GENERAL_FAULT);
132
133 } finally {
134 try {
135 monitor.unlock();
136 } catch (Exception eee) {
137 // Don't care
138 }
139 }
140 return candidate;
141 }
142
143 /**
144 * Block until there is something in the queue. It will always return
145 * something
146 *
147 * @return An Atom or null
148 * @throws ChannelException
149 * @see Atom
150 */
151 public Atom block() throws ChannelException {
152 Atom candidate = this.get();
153
154 try {
155 while (candidate == null) {
156 monitor.waitSignal();
157 candidate = this.get();
158 }
159 } catch (InterruptedException ie) {
160 throw new ChannelException("Channel.QueueDrain.block() interrupted.", ChannelException.CODE_CHANNEL_INTERRUPTED);
161 }
162 return candidate;
163 }
164
165 }
|