001package jmri.jmrix.sprog;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.Vector;
006import java.util.concurrent.BlockingQueue;
007import java.util.concurrent.LinkedBlockingQueue;
008
009import org.slf4j.Logger;
010import org.slf4j.LoggerFactory;
011
012import jmri.jmrix.AbstractPortController;
013import jmri.jmrix.sprog.SprogConstants.SprogState;
014import jmri.jmrix.sprog.serialdriver.SerialDriverAdapter;
015
016/**
017 * Converts Stream-based I/O to/from Sprog messages. The "SprogInterface" side
018 * sends/receives message objects. The connection to a SprogPortController is
019 * via a pair of *Streams, which then carry sequences of characters for
020 * transmission. Note that this processing is handled in an independent thread.
021 * <p>
022 * Rewritten during 4.11.x series. Create a high priority thread for the tc to
023 * move everything off the swing thread. Use a blocking queue to handle
024 * asynchronous messages from multiple sources.
025 *
026 * @author Bob Jacobsen Copyright (C) 2001
027 * @author Andrew Crosland Copyright (C) 2018
028 */
029public class SprogTrafficController implements SprogInterface,
030        Runnable {
031
032    private SprogReply reply = new SprogReply();
033    SprogListener lastSender = null;
034    private SprogState sprogState = SprogState.NORMAL;
035    private int lastId;
036
037    private Thread tcThread;
038    private final Object lock = new Object();
039    private boolean replyAvailable = false;
040    // Make this public so it can be overridden by a script for debug
041    public int timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT;
042
043    /**
044     * Create a new SprogTrafficController instance.
045     *
046     * @param adaptermemo the associated SystemConnectionMemo
047     */
048    @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data")
049    public SprogTrafficController(SprogSystemConnectionMemo adaptermemo) {
050        memo = adaptermemo;
051        init();
052    }
053
054    private void init() {
055        // Set the timeout for communication with hardware
056        resetTimeout();
057
058        tcThread = jmri.util.ThreadingUtil.newThread(this);
059        tcThread.setName("SPROG TC thread");
060        tcThread.setPriority(Thread.MAX_PRIORITY-1);
061        tcThread.setDaemon(true);
062        log.debug("starting TC thread from {} ", this, jmri.util.LoggingUtil.shortenStacktrace(new Exception("traceback"),6));
063        tcThread.start();
064    }
065
066    // Methods to implement the Sprog Interface
067
068    protected Vector<SprogListener> cmdListeners = new Vector<SprogListener>();
069
070    @Override
071    public boolean status() {
072        return (ostream != null && istream != null);
073    }
074    
075    protected boolean isTcThreadAlive() {
076        return tcThread.isAlive();
077    }
078
079    @Override
080    public synchronized void addSprogListener(SprogListener l) {
081        // add only if not already registered
082        if (l == null) {
083            throw new java.lang.NullPointerException();
084        }
085        if (!cmdListeners.contains(l)) {
086            cmdListeners.addElement(l);
087            log.debug("SprogListener added to {} tc", memo.getUserName());
088        }
089    }
090
091    @Override
092    public synchronized void removeSprogListener(SprogListener l) {
093        if (cmdListeners.contains(l)) {
094            cmdListeners.removeElement(l);
095        }
096    }
097
098    /**
099     * Reset timeout to default depending on current mode
100     */
101    public void resetTimeout() {
102        if (memo.getSprogMode() == SprogConstants.SprogMode.OPS) {
103            timeout = SprogConstants.TC_OPS_REPLY_TIMEOUT;
104        } else {
105            timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT;
106        }
107    }
108
109    public void setTimeout(int t) {
110        timeout = t;
111    }
112
113    public SprogState getSprogState() {
114        return sprogState;
115    }
116
117    public void setSprogState(SprogState s) {
118        this.sprogState = s;
119        if (s == SprogState.V4BOOTMODE) {
120            // enable flow control - required for sprog v4 bootloader
121            var controller = getController();
122            controller.setHandshake(jmri.jmrix.AbstractSerialPortController.FlowControl.RTSCTS);
123
124        } else {
125            // disable flow control
126            // removed Jan 2010 - this stops SPROG from sending. Could cause problems with
127            // serial Sprogs, but I have no way of testing:
128            // getController().setHandshake(false);
129        }
130        log.debug("Setting sprogState {}", s);
131    }
132
133    public boolean isNormalMode() {
134        return sprogState == SprogState.NORMAL;
135    }
136
137    public boolean isSIIBootMode() {
138        return sprogState == SprogState.SIIBOOTMODE;
139    }
140
141    public boolean isV4BootMode() {
142        return sprogState == SprogState.V4BOOTMODE;
143    }
144
145    @SuppressWarnings("unchecked")
146    private synchronized Vector<SprogListener> getCopyOfListeners() {
147        return (Vector<SprogListener>) cmdListeners.clone();
148
149    }
150
151    protected synchronized void notifyMessage(SprogMessage m, SprogListener originator) {
152        for (SprogListener listener : this.getCopyOfListeners()) {
153            try {
154                // don't send it back to the originator!
155                if (listener != originator) {
156                    // skip forwarding to the last sender for now, we'll get them later
157                    if (lastSender != listener) {
158                        listener.notifyMessage(m);
159                    }
160                }
161            } catch (Exception e) {
162                log.warn("notify: During dispatch to {}", listener, e);
163            }
164        }
165        // forward to the last listener who sent a message
166        // this is done _second_ so monitoring can have already stored the reply
167        // before a response is sent
168        if (lastSender != null && lastSender != originator) {
169            lastSender.notifyMessage(m);
170        }
171    }
172
173    protected synchronized void notifyReply(SprogReply r) {
174        log.debug("notifyReply starts for later, last sender: {}", lastSender);
175
176        final Vector<SprogListener> listeners = this.getCopyOfListeners();
177        final SprogReply replyForLater = r;
178        final SprogListener senderForLater = lastSender;
179        Runnable rl = () -> {
180            for (SprogListener listener : listeners) {
181                try {
182                    // don't send message back to the originator!
183                    // skip forwarding to the last sender for now, we'll get them later
184                    if (senderForLater != listener) {
185                        listener.notifyReply(replyForLater);
186                    }
187
188                } catch (Exception e) {
189                    log.warn("notify: During dispatch to {}", listener, e);
190                }
191            }
192            // forward to the last listener who sent a message
193            // this is done _second_ so monitoring can have already stored the reply
194            // before a response is sent
195            if (senderForLater != null) {
196                senderForLater.notifyReply(replyForLater);
197            }
198        };
199        javax.swing.SwingUtilities.invokeLater(rl);
200    }
201
202    protected synchronized void notifyReply(SprogReply r, SprogListener lastSender) {
203        log.debug("notifyReply starts for later, last sender: {}", lastSender);
204
205        final Vector<SprogListener> listeners = this.getCopyOfListeners();
206        final SprogReply replyForLater = r;
207        final SprogListener senderForLater = lastSender;
208        Runnable rl = () -> {
209            log.debug("notifyReply starts last sender: {}", senderForLater);
210            for (SprogListener listener : listeners) {
211                try {
212                //if is message don't send it back to the originator!
213                    // skip forwarding to the last sender for now, we'll get them later
214                    if (senderForLater != listener) {
215                        log.debug("Notify listener: {} {}", listener, r.toString());
216                        listener.notifyReply(replyForLater);
217                    }
218
219                } catch (Exception e) {
220                    log.warn("notify: During dispatch to {}", listener, e);
221                }
222            }
223
224            // forward to the last listener who sent a message
225            // this is done _second_ so monitoring can have already stored the reply
226            // before a response is sent
227            if (senderForLater != null) {
228                log.debug("notify last sender: {} {}", senderForLater, replyForLater.toString());
229                senderForLater.notifyReply(replyForLater);
230            }
231        };
232        javax.swing.SwingUtilities.invokeLater(rl);
233    }
234
235    // A class to remember the message and who sent it
236    static private class MessageTuple {
237        private final SprogMessage message;
238        private final SprogListener listener;
239
240        public MessageTuple(SprogMessage m, SprogListener l) {
241            message = m;
242            listener = l;
243        }
244
245        // Copy constructor
246        public MessageTuple(MessageTuple mt) {
247            message = mt.message;
248            listener = mt.listener;
249        }
250    }
251
252    // The queue to hold messages being sent
253    BlockingQueue<MessageTuple> sendQueue = new LinkedBlockingQueue<MessageTuple>();
254
255    /**
256     * Enqueue a preformatted message to be sent to the actual interface
257     *
258     * @param m The message to be forwarded
259     */
260    public void sendSprogMessage(SprogMessage m) {
261        log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId());
262        try {
263            sendQueue.add(new MessageTuple(m, null));
264        } catch (Exception e) {
265            log.error("Could not add message to queue", e);
266        }
267    }
268
269    /**
270     * Enqueue a preformatted message to be sent to the actual interface
271     *
272     * @param m         Message to send
273     * @param replyTo   Who is sending the message
274     */
275    @Override
276    public synchronized void sendSprogMessage(SprogMessage m, SprogListener replyTo) {
277        log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId());
278        try {
279            sendQueue.add(new MessageTuple(m, replyTo));
280        } catch (Exception e) {
281            log.error("Could not add message to queue", e);
282        }
283    }
284
285    /**
286     * Block until a message is available from the queue, send it to the interface
287     * and then block until reply is received or a timeout occurs. This will be
288     * a very long timeout to allow for page mode programming operations in SPROG
289     * programmer mode.
290     */
291    @Override
292    public void run() {
293        MessageTuple messageToSend;
294        log.debug("Traffic controller queuing thread starts");
295        while (true) {
296            log.debug("Traffic controller queue waiting");
297            try {
298                messageToSend = new MessageTuple(sendQueue.take());
299            } catch (InterruptedException e) {
300                log.debug("Thread interrupted while dequeuing message to send");
301                return;
302            }
303            log.debug("Message dequeued {} id: {}", messageToSend.message, messageToSend.message.getId());
304            // remember who sent this
305            lastSender = messageToSend.listener;
306            lastId = messageToSend.message.getId();
307            // notify all _other_ listeners
308            notifyMessage(messageToSend.message, messageToSend.listener);
309            replyAvailable = false;
310            sendToInterface(messageToSend.message);
311            log.debug("Waiting {} for a reply", timeout);
312            try {
313                synchronized (lock) {
314                    lock.wait(timeout); // Wait for notify
315                }
316            } catch (InterruptedException e) {
317                log.debug("waitingForReply interrupted");
318            }
319            if (!replyAvailable) {
320                // Timed out
321                log.warn("Timeout waiting for reply from hardware in SprogState {}", sprogState);
322            } else {
323                log.debug("Notified of reply");
324            }
325        }
326    }
327
328    /**
329     * Forward a preformatted message to the interface.
330     *
331     * @param m The message to be forwarded
332     */
333    public void sendToInterface(SprogMessage m) {
334        // stream to port in single write, as that's needed by serial
335        try {
336            if (ostream != null) {
337                ostream.write(m.getFormattedMessage(sprogState));
338                log.debug("sendSprogMessage written to ostream");
339            } else {
340                // no stream connected
341                log.warn("sendMessage: no connection established");
342            }
343        } catch (Exception e) {
344            log.warn("sendMessage: Exception: ", e);
345        }
346    }
347
348// methods to connect/disconnect to a source of data in a SprogPortController
349    private AbstractPortController controller = null;
350
351    /**
352     * Make connection to existing PortController object.
353     *
354     * @param p The port controller
355     */
356    public void connectPort(AbstractPortController p) {
357        istream = p.getInputStream();
358        ostream = p.getOutputStream();
359        if (controller != null) {
360            log.warn("connectPort: connect called while connected");
361        }
362        controller = p;
363    }
364
365    /**
366     * Get the port controller, as a SerialDriverAdapter.
367     *
368     * @return the port controller
369     */
370    protected SerialDriverAdapter getController(){
371       return (SerialDriverAdapter) controller;
372    }
373
374    /**
375     * Break connection to existing SprogPortController object.
376     * <p>
377     * Once broken, attempts to send via "message" member will fail.
378     *
379     * @param p the connection to break
380     */
381    public void disconnectPort(AbstractPortController p) {
382        istream = null;
383        ostream = null;
384        if (controller != p) {
385            log.warn("disconnectPort: disconnect called from non-connected SprogPortController");
386        }
387        controller = null;
388    }
389
390    static volatile protected SprogTrafficController self = null;
391
392    public void setAdapterMemo(SprogSystemConnectionMemo adaptermemo) {
393        memo = adaptermemo;
394    }
395
396    public SprogSystemConnectionMemo getAdapterMemo() {
397        return memo;
398    }
399
400    private SprogSystemConnectionMemo memo = null;
401
402    // data members to hold the streams
403    DataInputStream istream = null;
404    OutputStream ostream = null;
405
406    boolean endReply(SprogReply msg) {
407        return msg.endNormalReply() || msg.endBootReply();
408    }
409
410    private boolean unsolicited;
411
412    /**
413     * Handle an incoming reply.
414     */
415    public void handleOneIncomingReply() {
416        // we get here if data has been received and this method is explicitly invoked
417        // fill the current reply with any data received
418        int replyCurrentSize = reply.getNumDataElements();
419        int i;
420        for (i = replyCurrentSize; i < SprogReply.maxSize - replyCurrentSize; i++) {
421            try {
422                if (istream.available() == 0) {
423                    break; // nothing waiting to be read
424                }
425                byte char1 = istream.readByte();
426                reply.setElement(i, char1);
427
428            } catch (Exception e) {
429                log.warn("Exception in DATA_AVAILABLE state", e);
430            }
431            if (endReply(reply)) {
432                sendreply();
433                break;
434            }
435        }
436    }
437
438    /**
439     * Send the current reply - built using data from serialEvent.
440     */
441    private void sendreply() {
442        //send the reply
443        log.debug("dispatch reply of length {} in SprogState {}", reply.getNumDataElements(), sprogState);
444        if (unsolicited) {
445            log.debug("Unsolicited Reply");
446            reply.setUnsolicited();
447        }
448        // Insert the id
449        reply.setId(lastId);
450        notifyReply(reply, lastSender);
451        log.debug("Notify() wait");
452        replyAvailable = true;
453        synchronized(lock) {
454            lock.notifyAll();
455        }
456
457        //Create a new reply, ready to be filled
458        reply = new SprogReply();
459    }
460
461    public void dispose(){
462       tcThread.interrupt();
463    }
464
465    private final static Logger log = LoggerFactory.getLogger(SprogTrafficController.class);
466
467}