001package jmri.jmrix.xpa;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.OutputStream;
006import java.util.ArrayList;
007import java.util.LinkedList;
008import java.util.NoSuchElementException;
009import org.slf4j.Logger;
010import org.slf4j.LoggerFactory;
011
012/**
013 * Converts Stream-based I/O to/from Xpa messages. The "XpaInterface" side
014 * sends/receives message objects. The connection to an XpaPortController is via
015 * a pair of *Streams, which then carry sequences of characters for
016 * transmission. Note that this processing is handled in an independent thread.
017 *
018 * @author Paul Bender Copyright (C) 2004, 2016
019 */
020public class XpaTrafficController implements XpaInterface, Runnable {
021
022    // Linked list to store the transmit queue.
023    final LinkedList<byte[]> xmtList = new LinkedList<>();
024
025    /**
026     * (local class) object to implement the transmit thread
027     */
028    final XmtHandler xmtHandler = new XmtHandler();
029    Thread xmtThread = null;
030
031    /**
032     * Create a new XpaTrafficController instance.
033     */
034    public XpaTrafficController() {
035        if (log.isDebugEnabled()) {
036            log.debug("setting instance: {}", this);
037        }
038    }
039
040    /**
041     * Start the Transmit thread.
042     */
043    public void startTransmitThread() {
044        if (xmtThread == null) {
045            // Start the xmtHandler thread
046            xmtThread = new Thread(xmtHandler, "XPA transmit handler");
047            xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
048            xmtThread.start();
049        }
050    }
051
052    protected final ArrayList<XpaListener> cmdListeners = new ArrayList<>();
053
054    @Override
055    public boolean status() {
056        return (ostream != null && istream != null);
057    }
058
059    @Override
060    public synchronized void addXpaListener(XpaListener l) {
061        // add only if not already registered
062        if (l == null) {
063            throw new java.lang.NullPointerException();
064        }
065        if (!cmdListeners.contains(l)) {
066            cmdListeners.add(l);
067        }
068    }
069
070    @Override
071    public synchronized void removeXpaListener(XpaListener l) {
072        cmdListeners.remove(l);
073    }
074
075    /**
076     * Forward a XpaMessage to all registered XpaInterface listeners.
077     *
078     * @param m     the message to forward
079     * @param notMe registered listener not to forward the message to
080     */
081    protected void notifyMessage(XpaMessage m, XpaListener notMe) {
082        // make a copy of the listener vector to synchronized not needed for transmit
083        ArrayList<XpaListener> v;
084        synchronized (this) {
085            v = new ArrayList<>(cmdListeners);
086        }
087        // forward to all listeners
088        for (XpaListener client : v) {
089            if (notMe != client) {
090                if (log.isDebugEnabled()) {
091                    log.debug("notify client: {}", client);
092                }
093                try {
094                    client.message(m);
095                } catch (Exception e) {
096                    log.warn("notify: During dispatch to {}", client, e);
097                }
098            }
099        }
100    }
101
102    XpaListener lastSender = null;
103
104    protected void notifyReply(XpaMessage r) {
105        // make a copy of the listener vector to synchronized (not needed for transmit?)
106        ArrayList<XpaListener> v;
107        synchronized (this) {
108            v = new ArrayList<>(cmdListeners);
109        }
110        // forward to all listeners
111        for (XpaListener client : v) {
112            if (log.isDebugEnabled()) {
113                log.debug("notify client: {}", client);
114            }
115            try {
116                // Skip forwarding the message to the last sender until
117                // later.
118                if (lastSender != client) {
119                    client.reply(r);
120                }
121            } catch (Exception e) {
122                log.warn("notify: During dispatch to {}", client, e);
123            }
124        }
125
126        // forward to the last listener who send a message
127        // this is done _second_ so monitoring can have already stored the reply
128        // before a response is sent
129        if (lastSender != null) {
130            lastSender.reply(r);
131        }
132    }
133
134    /**
135     * Forward a pre-formatted message to the actual interface.
136     *
137     * @param m     the message to forward
138     * @param reply the listener to receive the reply
139     */
140    @SuppressFBWarnings(value = {"NO_NOTIFY_NOT_NOTIFYALL"},
141            justification = "Notify is used because Having more than one thread waiting on xmtHandler is an error.")
142    @Override
143    synchronized public void sendXpaMessage(XpaMessage m, XpaListener reply) {
144        if (log.isDebugEnabled()) {
145            log.debug("sendXpaMessage message: [{}]", m);
146        }
147        // remember who sent this
148        lastSender = reply;
149
150        // notify all _other_ listeners
151        notifyMessage(m, reply);
152
153        // stream to port in single write, as that's needed by serial
154        int len = m.getNumDataElements();
155        int cr = 1;  // space for carriage return linefeed
156
157        byte[] msg = new byte[len + cr];
158
159        for (int i = 0; i < len; i++) {
160            msg[i] = (byte) m.getElement(i);
161        }
162        msg[len] = 0x0d;
163
164        //queue the request to send, and notify the xmtHandler.
165        synchronized (xmtHandler) {
166            xmtList.addLast(msg);
167            xmtHandler.notify();
168        }
169
170    }
171
172    // methods to connect/disconnect to a source of data in a XpaPortController
173    private XpaPortController controller = null;
174
175    /**
176     * Make connection to existing PortController object.
177     *
178     * @param p controller for the port associated with this controller
179     */
180    public void connectPort(XpaPortController p) {
181        istream = p.getInputStream();
182        ostream = p.getOutputStream();
183        if (controller != null) {
184            log.warn("connectPort: connect called while connected");
185        }
186        controller = p;
187        // Send the initialization string to the port
188        this.sendXpaMessage(XpaMessage.getDefaultInitMsg(), null);
189    }
190
191    /**
192     * Break connection to existing XpaPortController object. Once broken,
193     * attempts to send via "message" member will fail.
194     *
195     * @param p controller for the port associated with this controller
196     */
197    public void disconnectPort(XpaPortController p) {
198        istream = null;
199        ostream = null;
200        if (controller != p) {
201            log.warn("disconnectPort: disconnect called from non-connected XpaPortController");
202        }
203        controller = null;
204    }
205
206    // data members to hold the streams
207    DataInputStream istream = null;
208    OutputStream ostream = null;
209
210    /**
211     * Handle incoming characters. This is a permanent loop, looking for input
212     * messages in character form on the stream connected to the PortController
213     * via <code>connectPort</code>. Terminates with the input stream breaking
214     * out of the try block.
215     */
216    @Override
217    public void run() {
218        while (true) {   // loop permanently, stream close will exit via exception
219            try {
220                handleOneIncomingReply();
221            } catch (java.io.IOException e) {
222                log.warn("run: Exception: {}", e.toString());
223            }
224        }
225    }
226
227    void handleOneIncomingReply() throws java.io.IOException {
228        // we sit in this until the message is complete, relying on
229        // threading to let other stuff happen
230
231        // Create output message
232        XpaMessage msg = new XpaMessage();
233        // message exists, now fill it
234        int i;
235        for (i = 0; i < XpaMessage.MAX_SIZE; i++) {
236            byte char1 = istream.readByte();
237            msg.setElement(i, char1);
238            //if (endReply(msg)) break;
239        }
240
241        // message is complete, dispatch it !!
242        if (log.isDebugEnabled()) {
243            log.debug("dispatch reply of length {}", i);
244        }
245        {
246            final XpaMessage thisMsg = msg;
247            final XpaTrafficController thisTc = this;
248            // return a notification via the queue to ensure end
249            Runnable r = new Runnable() {
250                final XpaMessage msgForLater = thisMsg;
251                final XpaTrafficController myTc = thisTc;
252
253                @Override
254                public void run() {
255                    log.debug("Delayed notify starts");
256                    myTc.notifyReply(msgForLater);
257                }
258            };
259            javax.swing.SwingUtilities.invokeLater(r);
260        }
261    }
262
263    /**
264     * Captive class to handle transmission.
265     */
266    class XmtHandler implements Runnable {
267
268        @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "NO_NOTIFY_NOT_NOTIFYALL"},
269                justification = "while loop controls access")
270        @Override
271        public void run() {
272            while (true) { //  loop forever
273                // Check to see if there is anything to send
274                try {
275                    // get content; failure is a NoSuchElementException
276                    if (log.isDebugEnabled()) {
277                        log.debug("check for input");
278                    }
279                    byte[] msg;
280                    synchronized (this) {
281                        msg = xmtList.removeFirst();
282                    }
283
284                    // Now send this to the port
285                    try {
286                        if (ostream != null) {
287                            if (log.isDebugEnabled()) {
288                                log.debug("write message: {}", java.util.Arrays.toString(msg));
289                            }
290                            synchronized (ostream) {
291                                ostream.write(msg);
292                                ostream.notify();
293                            }
294                        } else {
295                            // no stream connected
296                            log.warn("sendMessage: no connection established");
297                        }
298                    } catch (java.io.IOException e) {
299                        log.warn("sendMessage: Exception: {}", e.toString());
300                    }
301                } catch (NoSuchElementException e) {
302                    // message queue was empty, wait for input
303                    if (log.isDebugEnabled()) {
304                        log.debug("start wait");
305                    }
306                    try {
307                        synchronized (this) {
308                            wait();
309                        }
310                    } catch (java.lang.InterruptedException ei) {
311                        Thread.currentThread().interrupt(); // retain if needed later
312                    }
313                    if (log.isDebugEnabled()) {
314                        log.debug("end wait");
315                    }
316                }
317            }
318        }
319    }
320
321    private final static Logger log = LoggerFactory.getLogger(XpaTrafficController.class);
322
323}