001package jmri.jmrix;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.IOException;
006import java.io.OutputStream;
007import java.util.*;
008import javax.swing.SwingUtilities;
009
010import jmri.InstanceManager;
011import jmri.ShutDownManager;
012import jmri.ShutDownTask;
013
014/**
015 * Abstract base for TrafficControllers in a Message/Reply protocol.
016 * <p>
017 * Two threads are used for the actual communication. The "Transmit" thread
018 * handles pushing characters to the port, and also changing the mode. The
019 * "Receive" thread converts characters from the input stream into replies.
020 * <p>
 * The constructor registers a shutdown task to
 * trigger the necessary cleanup code.
023 * <p>
024 * The internal state machine handles changes of mode, automatic retry of
025 * certain messages, time outs, and sending poll messages when otherwise idle.
026 * <p>
027 * "Mode" refers to the state of the command station communications. "Normal"
028 * and "Programming" are the two modes, used if the command station requires
029 * messages to go back and forth between them. <br>
030 *
031 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram">
032 *
033 * <p>
034 * The key methods for the basic operation are:
035 * <ul>
036 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)}
037 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data
038 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs
039 *      information on e.g. the last message sent, that can be stored in member variables
040 *      by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}.
041 *  <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects
042 * </ul>
043 * <p>
044 * If your command station requires messages to go in and out of
045 * "programming mode", those should be provided by
046 * {@link #enterProgMode()} and {@link #enterNormalMode()}.
047 * <p>
048 * If you want to poll for information when the line is otherwise idle,
049 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}.
050 *
051 * @author Bob Jacobsen Copyright (C) 2003
052 * @author Paul Bender Copyright (C) 2004-2010
053 */
054
055/*
056@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png
057
058    [*] --> IDLESTATE
059    IDLESTATE --> NOTIFIEDSTATE : sendMessage()
060    NOTIFIEDSTATE --> IDLESTATE : queue empty
061
062    NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message
063
064    WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE
065    WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE
066
067    WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply()
068
069    WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE
070    WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE
071    OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message
072
073    IDLESTATE --> POLLSTATE : transmitLoop()\nno work
074    POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it
075    POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send
076
077    WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply
078    AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message
079
080NOTIFIEDSTATE : Transmit thread wakes up and processes
081POLLSTATE : Transient while deciding to send poll
082OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change
083AUTORETRYSTATE : Transient while deciding to resend auto-retry message
084WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply
085WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply
086WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout
087
088Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry
089Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change.
090
091@enduml
092 */
093
094public abstract class AbstractMRTrafficController {
095
096    private final Runnable shutDownTask = this::terminate; // retain for possible removal.
097
098    /**
099     * Create a new unnamed MRTrafficController.
100     */
101    public AbstractMRTrafficController() {
102        log.debug("Creating AbstractMRTrafficController instance");
103        mCurrentMode = NORMALMODE;
104        mCurrentState = IDLESTATE;
105        allowUnexpectedReply = false;
106
107
108        // We use a shutdown task here to make sure the connection is left
109        // in a clean state prior to exiting.  This is required on systems
110        // which have a service mode to ensure we don't leave the system
111        // in an unusable state. Once the shutdown task executes, the connection
112        // must be considered permanently closed.
113
114        InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask);
115    }
116
117    private boolean synchronizeRx = true;
118
    /**
     * Set the value later returned by {@link #getSynchronizeRx()}.
     * <p>
     * NOTE(review): the code that consumes this flag is not in this part of
     * the file; presumably it selects whether received replies are
     * dispatched synchronously - confirm against the receive code.
     *
     * @param val the new flag value; the field defaults to true
     */
    protected void setSynchronizeRx(boolean val) {
        synchronizeRx = val;
    }
122
    /**
     * Get the flag last set by {@link #setSynchronizeRx(boolean)}.
     *
     * @return the current flag value; true unless it has been changed
     */
    protected boolean getSynchronizeRx() {
        return synchronizeRx;
    }
126
127    // The methods to implement the abstract Interface
128
129    protected final Vector<AbstractMRListener> cmdListeners = new Vector<>();
130
131    protected synchronized void addListener(AbstractMRListener l) {
132        // add only if not already registered
133        if (l == null) {
134            throw new NullPointerException();
135        }
136        if (!cmdListeners.contains(l)) {
137            cmdListeners.addElement(l);
138        }
139    }
140
141    protected synchronized void removeListener(AbstractMRListener l) {
142        if (cmdListeners.contains(l)) {
143            cmdListeners.removeElement(l);
144        }
145    }
146
147    /**
148     * Forward a Message to registered listeners.
149     *
150     * @param m     Message to be forwarded intact
151     * @param notMe One (optional) listener to be skipped, usually because it's
152     *              the originating object.
153     */
154    @SuppressWarnings("unchecked")
155    protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
156        // make a copy of the listener vector to synchronized not needed for transmit
157        Vector<AbstractMRListener> v;
158        synchronized (this) {
159            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
160            v = (Vector<AbstractMRListener>) cmdListeners.clone();
161        }
162        // forward to all listeners
163        int cnt = v.size();
164        for (int i = 0; i < cnt; i++) {
165            AbstractMRListener client = v.elementAt(i);
166            if (notMe != client) {
167                log.debug("notify message, client: {}", client);
168                try {
169                    forwardMessage(client, m);
170                } catch (RuntimeException e) {
171                    log.warn("notify: During message dispatch to {}", client, e);
172                }
173            }
174        }
175    }
176
    /**
     * Forward a specific message type to a protocol-specific listener
     * interface.
     * <p>
     * Implemented by the concrete class so that the casts to the concrete
     * listener and message types live there. Called by
     * {@link #notifyMessage(AbstractMRMessage, AbstractMRListener)} once for
     * each listener that is to receive the message.
     *
     * @param client abstract listener.
     * @param m message to forward.
     */
    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);
185
    /**
     * Provide the next low-priority poll message.
     * <p>
     * Invoked by {@link #transmitLoop()} when the transmit queue is empty
     * and the controller is in {@link #NORMALMODE}. Return the next message
     * to send, or null if nothing should be sent on this pass.
     *
     * @return Formatted poll message, or null if there is nothing to poll
     */
    protected abstract AbstractMRMessage pollMessage();
193
    /**
     * Provide the listener that is recorded as the sender of a poll message.
     * <p>
     * {@link #transmitLoop()} passes the returned listener to
     * {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)} together
     * with the message obtained from {@link #pollMessage()}.
     *
     * @return the listener to associate with poll messages
     */
    protected abstract AbstractMRListener pollReplyHandler();
195
    // Listener passed to the most recent forwardToPort(..) call; may be null.
    protected AbstractMRListener mLastSender = null;

    // Current command station mode: NORMALMODE or PROGRAMINGMODE.
    protected volatile int mCurrentMode;
    // Mode value for normal operation; the constructor starts in this mode.
    public static final int NORMALMODE = 1;
    // Mode value for programming (service) mode.
    public static final int PROGRAMINGMODE = 4;
201
    /**
     * Provide the message that switches the command station to programming
     * mode.
     * <p>
     * Called by {@link #transmitLoop()} when the next message to send needs
     * {@link #PROGRAMINGMODE} and the controller is currently in a
     * different mode.
     * @see #enterNormalMode()
     *
     * @return the message that needs to be sent to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterProgMode();
210
    /**
     * Provide the message that switches the command station back to normal
     * mode.
     * <p>
     * Called by {@link #transmitLoop()} in two situations: when the next
     * message to send needs a mode other than the current one and that mode
     * is not {@link #PROGRAMINGMODE}; and when the transmit queue has been
     * empty for the idle wait while in programming mode and
     * {@link #programmerIdle()} returns true.
     * @see #enterProgMode()
     *
     * @return the message that needs to be sent to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterNormalMode();
221
    /**
     * Check if the programmer is idle.
     * <p>
     * {@link #transmitLoop()} only leaves programming mode on an idle
     * timeout when this returns true. This default implementation always
     * reports idle; override in the system specific code if necessary (see
     * notes for {@link #enterNormalMode()}).
     *
     * @return true if not busy programming
     */
    protected boolean programmerIdle() {
        return true;
    }
232
    /**
     * Get the delay (wait time) after enabling the programming track.
     * Override in subclass to add a longer delay.
     * <p>
     * NOTE(review): the caller is not in this part of the file; the unit is
     * presumably milliseconds - confirm where the value is consumed.
     *
     * @return 0 as default delay
     */
    protected int enterProgModeDelayTime() {
        return 0;
    }
242
    // Current state of the transmit state machine; always one of the
    // *STATE constants below.
    protected volatile int mCurrentState;
    public static final int IDLESTATE = 10;        // nothing queued, nothing outstanding
    public static final int NOTIFIEDSTATE = 15;    // sendMessage() queued a message and notified the transmit thread
    public static final int WAITMSGREPLYSTATE = 25;  // transmit thread has sent a message, awaiting its reply
    public static final int WAITREPLYINPROGMODESTATE = 30;  // sent request to enter programming mode, awaiting reply
    public static final int WAITREPLYINNORMMODESTATE = 35;  // sent request to enter normal mode, awaiting reply
    public static final int OKSENDMSGSTATE = 40;        // mode change reply arrived, the deferred original message can be sent
    public static final int AUTORETRYSTATE = 45;        // transmit loop puts the message back at the head of the queue to be re-sent
    public static final int POLLSTATE = 50;   // transient: idle wait is over, deciding whether to leave programming mode or poll
252
253    protected boolean allowUnexpectedReply;
254
    /**
     * Set whether the command station may send messages without a request
     * sent to it.
     * <p>
     * The constructor initializes this setting to false.
     *
     * @param expected true to allow messages without a prior request
     */
    protected void setAllowUnexpectedReply(boolean expected) {
        allowUnexpectedReply = expected;
    }
264
265    /**
266     * Forward a "Reply" from layout to registered listeners.
267     *
268     * @param r    Reply to be forwarded intact
269     * @param dest One (optional) listener to be skipped, usually because it's
270     *             the originating object.
271     */
272    @SuppressWarnings("unchecked")
273    protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
274        // make a copy of the listener vector to synchronized (not needed for transmit?)
275        Vector<AbstractMRListener> v;
276        synchronized (this) {
277            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
278            v = (Vector<AbstractMRListener>) cmdListeners.clone();
279        }
280        // forward to all listeners
281        int cnt = v.size();
282        for (int i = 0; i < cnt; i++) {
283            AbstractMRListener client = v.elementAt(i);
284            log.debug("notify reply, client: {}", client);
285            try {
286                //skip dest for now, we'll send the message to there last.
287                if (dest != client) {
288                    forwardReply(client, r);
289                }
290            } catch (RuntimeException e) {
291                log.warn("notify: During reply dispatch to {}", client, e);
292            }
293        }
294
295        // forward to the last listener who sent a message
296        // this is done _second_ so monitoring can have already stored the reply
297        // before a response is sent
298        if (dest != null) {
299            log.debug("notify reply, dest: {}", dest);
300            forwardReply(dest, r);
301        }
302    }
303
    /**
     * Forward a specific reply type to a protocol-specific listener
     * interface.
     * <p>
     * Called by {@link #notifyReply(AbstractMRReply, AbstractMRListener)}
     * once for each listener that is to receive the reply.
     *
     * @param client abstract listener.
     * @param m reply to forward.
     */
    protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m);
305
    /**
     * Messages to be transmitted.
     * <p>
     * sendMessage(..) appends to this queue and to {@code listenerQueue}
     * together, and transmitLoop() removes from both together, so the two
     * queues stay in step.
     */
    protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>();
    // Listener supplied with each queued message, kept parallel to msgQueue.
    protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>();
311
312    /**
313     * Forward message to the port. Messages are queued and then the
314     * transmission thread is notified.
315     * @see #forwardToPort(AbstractMRMessage, AbstractMRListener)
316     *
317     * @param m the message to send
318     * @param reply the Listener sending the message, often provided as 'this'
319     */
320    protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) {
321        msgQueue.addLast(m);
322        listenerQueue.addLast(reply);
323        synchronized (xmtRunnable) {
324            if (mCurrentState == IDLESTATE) {
325                mCurrentState = NOTIFIEDSTATE;
326                xmtRunnable.notify();
327            }
328        }
329        if (m != null) {
330            log.debug("just notified transmit thread with message {}", m);
331        }
332    }
333
334    /**
335     * Permanent loop for the transmit thread.
336     */
337    protected void transmitLoop() {
338        log.debug("transmitLoop starts in {}", this);
339
340        // loop forever
341        while (!connectionError && !threadStopRequest) {
342            AbstractMRMessage m = null;
343            AbstractMRListener l = null;
344            // check for something to do
345            synchronized (this) {
346                if (!msgQueue.isEmpty()) {
347                    // yes, something to do
348                    m = msgQueue.getFirst();
349                    msgQueue.removeFirst();
350                    l = listenerQueue.getFirst();
351                    listenerQueue.removeFirst();
352                    mCurrentState = WAITMSGREPLYSTATE;
353                    log.debug("transmit loop has something to do: {}", m);
354                }  // release lock here to proceed in parallel
355            }
356            // if a message has been extracted, process it
357            if (m != null) {
358                // check for need to change mode
359                log.debug("Start msg, state = {}", mCurrentMode);
360                if (m.getNeededMode() != mCurrentMode) {
361                    AbstractMRMessage modeMsg;
362                    if (m.getNeededMode() == PROGRAMINGMODE) {
363                        // change state to programming mode and send message
364                        modeMsg = enterProgMode();
365                        if (modeMsg != null) {
366                            mCurrentState = WAITREPLYINPROGMODESTATE;
367                            log.debug("Enter Programming Mode");
368                            forwardToPort(modeMsg, null);
369                            // wait for reply
370                            transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted");
371                        }
372                    } else {
373                        // change state to normal and send message
374                        modeMsg = enterNormalMode();
375                        if (modeMsg != null) {
376                            mCurrentState = WAITREPLYINNORMMODESTATE;
377                            log.debug("Enter Normal Mode");
378                            forwardToPort(modeMsg, null);
379                            // wait for reply
380                            transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted");
381                        }
382                    }
383                    if (modeMsg != null) {
384                        checkReplyInDispatch();
385                        if (mCurrentState != OKSENDMSGSTATE) {
386                            handleTimeout(modeMsg, l);
387                        }
388                        mCurrentState = WAITMSGREPLYSTATE;
389                    } else {
390                        // no mode message required, but the message
391                        // needs a different mode
392                        log.debug("Setting mode to: {}", m.getNeededMode());
393                        mCurrentMode = m.getNeededMode();
394                    }
395                }
396                forwardToPort(m, l);
397                // reply expected?
398                if (m.replyExpected()) {
399                    log.debug("reply expected is true for message {}",m);
400                    // wait for a reply, or eventually timeout
401                    transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted");
402                    checkReplyInDispatch();
403                    if (mCurrentState == WAITMSGREPLYSTATE) {
404                        handleTimeout(m, l);
405                    } else if (mCurrentState == AUTORETRYSTATE) {
406                        log.info("Message added back to queue: {}", m);
407                        msgQueue.addFirst(m);
408                        listenerQueue.addFirst(l);
409                        synchronized (xmtRunnable) {
410                            mCurrentState = IDLESTATE;
411                        }
412                    } else {
413                        resetTimeout(m);
414                    }
415                } // just continue to the next message from here
416            } else {
417                // nothing to do
418                if (mCurrentState != IDLESTATE) {
419                    log.debug("Setting IDLESTATE");
420                    log.debug("Current Mode {}", mCurrentMode);
421                    mCurrentState = IDLESTATE;
422                }
423                // wait for something to send
424                if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) {
425                    try {
426                        long startTime = Calendar.getInstance().getTimeInMillis();
427                        synchronized (xmtRunnable) {
428                            xmtRunnable.wait(mWaitBeforePoll);
429                        }
430                        long endTime = Calendar.getInstance().getTimeInMillis();
431                        waitTimePoll = waitTimePoll + endTime - startTime;
432                    } catch (InterruptedException e) {
433                        Thread.currentThread().interrupt(); // retain if needed later
434                        // end of transmit loop
435                        break;
436                    }
437                }
438                // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee
439                // the change of mCurrentState to one of the waiting for reply states.  Therefore we need to synchronize.
440                synchronized (this) {
441                    if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) {
442                        log.error("left timeout in unexpected state: {}", mCurrentState);
443                    }
444                    if (mCurrentState == IDLESTATE) {
445                        mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE
446                    }
447                }
448                // went around with nothing to do; leave programming state if in it
449                if (mCurrentMode == PROGRAMINGMODE) {
450                    log.debug("Timeout - in service mode");
451                }
452                if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) {
453                    log.debug("timeout causes leaving programming mode");
454                    mCurrentState = WAITREPLYINNORMMODESTATE;
455                    AbstractMRMessage msg = enterNormalMode();
456                    // if the enterNormalMode() message is null, we
457                    // don't want to try to send it to the port.
458                    if (msg != null) {
459                        forwardToPort(msg, null);
460                        // wait for reply
461                        transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode");
462                        checkReplyInDispatch();
463                        // exit program mode timeout?
464                        if (mCurrentState == WAITREPLYINNORMMODESTATE) {
465                            // entering normal mode via timeout
466                            handleTimeout(msg, l);
467                            mCurrentMode = NORMALMODE;
468                        }
469                        // and go around again
470                    }
471                } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) {
472                    // We may need to poll
473                    AbstractMRMessage msg = pollMessage();
474                    if (msg != null) {
475                        // yes, send that
476                        log.debug("Sending poll, wait time {}", waitTimePoll);
477                        mCurrentState = WAITMSGREPLYSTATE;
478                        forwardToPort(msg, pollReplyHandler());
479                        // wait for reply
480                        log.debug("Still waiting for reply");
481                        transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply");
482                        checkReplyInDispatch();
483                        // and go around again
484                        if (mCurrentState == WAITMSGREPLYSTATE) {
485                            handleTimeout(msg, l);
486                        } else {
487                            resetTimeout(msg);
488                        }
489                    }
490                    waitTimePoll = 0;
491                }
492                // no messages, so back to idle
493                if (mCurrentState == POLLSTATE) {
494                    mCurrentState = IDLESTATE;
495                }
496            }
497        }
498    }   // end of transmit loop; go around again
499
500    protected void transmitWait(int waitTime, int state, String interruptMessage) {
501        // wait() can have spurious wakeup!
502        // so we protect by making sure the entire timeout time is used
503        long currentTime = Calendar.getInstance().getTimeInMillis();
504        long endTime = currentTime + waitTime;
505        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
506            long wait = endTime - currentTime;
507            try {
508                synchronized (xmtRunnable) {
509                    // Do not wait if the current state has changed since we
510                    // last set it.
511                    if (mCurrentState != state) {
512                        return;
513                    }
514                    xmtRunnable.wait(wait); // rcvr normally ends this w state change
515                }
516            } catch (InterruptedException e) {
517                Thread.currentThread().interrupt(); // retain if needed later
518                String[] packages = this.getClass().getName().split("\\.");
519                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
520                        +(packages.length>=1 ? packages[packages.length-1] :"");
521                if (!threadStopRequest) {
522                    log.error("{} in transmitWait(..) of {}", interruptMessage, name);
523                } else {
524                    log.debug("during shutdown, {}  in transmitWait(..) of {}", interruptMessage, name);
525                }
526            }
527        }
528        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
529    }
530
    // Dispatch control and timer, used by checkReplyInDispatch()
    protected boolean replyInDispatch = false;          // true when reply has been received but dispatch not completed
    // longest dispatch wait seen so far, in ms (wait passes * DISPATCH_WAIT_INTERVAL)
    private int maxDispatchTime = 0;
    // dispatch time in ms at which the next "Max dispatch time" report is made
    private int warningMessageTime = DISPATCH_WARNING_TIME;
    // length in ms of one wait pass in checkReplyInDispatch()
    private static final int DISPATCH_WAIT_INTERVAL = 100;
    private static final int DISPATCH_WARNING_TIME = 12000; // report when max dispatch time first reaches this many ms
    private static final int WARN_NEXT_TIME = 1000;         // after each report, raise the reporting threshold by this many ms
538
539    private void checkReplyInDispatch() {
540        int loopCount = 0;
541        while (replyInDispatch) {
542            try {
543                synchronized (xmtRunnable) {
544                    xmtRunnable.wait(DISPATCH_WAIT_INTERVAL);
545                }
546            } catch (InterruptedException e) {
547                Thread.currentThread().interrupt(); // retain if needed later
548                if (threadStopRequest) return; // don't log an error if closing.
549                String[] packages = this.getClass().getName().split("\\.");
550                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
551                        +(packages.length>=1 ? packages[packages.length-1] :"");
552                log.error("transmitLoop interrupted in class {}", name);
553            }
554            loopCount++;
555            int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL;
556            if (currentDispatchTime > maxDispatchTime) {
557                maxDispatchTime = currentDispatchTime;
558                if (currentDispatchTime >= warningMessageTime) {
559                    warningMessageTime = warningMessageTime + WARN_NEXT_TIME;
560                    log.debug("Max dispatch time is now {}", currentDispatchTime);
561                }
562            }
563        }
564    }
565
    /**
     *  Determine if the interface is down.
     *  <p>
     *  The flag is set by
     *  {@link #handleTimeout(AbstractMRMessage, AbstractMRListener)} and
     *  cleared by {@link #resetTimeout(AbstractMRMessage)}, so this reports
     *  whether a timeout has occurred since the last reset.
     *
     *  @return timeoutFlag
     */
    public boolean hasTimeouts() {
        return timeoutFlag;
    }
574
    // true from a handleTimeout(..) call until the next resetTimeout(..)
    private boolean timeoutFlag = false;
    // number of handleTimeout(..) calls since the last resetTimeout(..)
    private int timeouts = 0;
    // set to true by handleTimeout(..); not cleared anywhere in this part of the file
    protected boolean flushReceiveChars = false;
578
579    protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
580        //log.debug("Timeout mCurrentState: {}", mCurrentState);
581        String[] packages = this.getClass().getName().split("\\.");
582        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
583                +(packages.length>=1 ? packages[packages.length-1] :"");
584
585        log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name);
586        timeouts++;
587        timeoutFlag = true;
588        flushReceiveChars = true;
589    }
590
591    protected void resetTimeout(AbstractMRMessage msg) {
592        if (timeouts > 0) {
593            log.debug("Reset timeout after {} timeouts", timeouts);
594        }
595        timeouts = 0;
596        timeoutFlag = false;
597    }
598
    /**
     * Add header to the outgoing byte stream.
     * <p>
     * This default implementation writes nothing and returns 0.
     * {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)} uses the
     * returned value as the index at which it starts copying the message
     * data.
     *
     * @param msg the output byte stream
     * @param m Message results
     * @return next location in the stream to fill
     */
    protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) {
        return 0;
    }
609
610    protected int mWaitBeforePoll = 100;
611    protected long waitTimePoll = 0;
612
613    /**
614     * Add trailer to the outgoing byte stream.
615     *
616     * @param msg    the output byte stream
617     * @param offset the first byte not yet used
618     * @param m   output message to extend
619     */
620    protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) {
621        if (!m.isBinary()) {
622            msg[offset] = 0x0d;
623        }
624    }
625
626    /**
627     * Determine how many bytes the entire message will take, including
628     * space for header and trailer.
629     *
630     * @param m the message to be sent
631     * @return number of bytes
632     */
633    protected int lengthOfByteStream(AbstractMRMessage m) {
634        int len = m.getNumDataElements();
635        int cr = 0;
636        if (!m.isBinary()) {
637            cr = 1;  // space for return char
638        }
639        return len + cr;
640    }
641
642    protected boolean xmtException = false;
643
644    /**
645     * Actually transmit the next message to the port.
646     * @see #sendMessage(AbstractMRMessage, AbstractMRListener)
647     *
648     * @param m the message to send
649     * @param reply the Listener sending the message, often provided as 'this'
650     */
651    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"},
652            justification = "Two locks needed for synchronization here, this is OK")
653    protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
654        log.debug("forwardToPort message: [{}]", m);
655        // remember who sent this
656        mLastSender = reply;
657
658        // forward the message to the registered recipients,
659        // which includes the communications monitor, except the sender.
660        // Schedule notification via the Swing event queue to ensure order
661        log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback"));
662        Runnable r = new XmtNotifier(m, mLastSender, this);
663        SwingUtilities.invokeLater(r);
664
665        // stream to port in single write, as that's needed by serial
666        int byteLength = lengthOfByteStream(m);
667        byte[]  msg= new byte[byteLength];
668        log.debug("copying message, length = {}", byteLength);
669        // add header
670        int offset = addHeaderToOutput(msg, m);
671
672        // add data content
673        int len = m.getNumDataElements();
674        log.debug("copying data to message, length = {}", len);
675        if (len > byteLength) { // happens somehow
676            log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len);
677        }
678        for (int i = 0; (i < len && i < byteLength); i++) {
679            msg[i + offset] = (byte) m.getElement(i);
680        }
681        // add trailer
682        addTrailerToOutput(msg, len + offset, m);
683        // and stream the bytes
684        try {
685            if (ostream != null) {
686                if (log.isDebugEnabled()) {
687                    StringBuilder f = new StringBuilder();
688                    for (int i = 0; i < msg.length; i++) {
689                        f.append(String.format("%02X ",0xFF & msg[i]));
690                    }
691                    log.debug("formatted message: {}", f.toString() );
692                }
693                while (m.getRetries() >= 0) {
694                    if (portReadyToSend(controller)) {
695                        ostream.write(msg);
696                        ostream.flush();
697                        log.debug("written, msg timeout: {} mSec", m.getTimeout());
698                        break;
699                    } else if (m.getRetries() >= 0) {
700                        log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries());
701                        m.setRetries(m.getRetries() - 1);
702                        try {
703                            synchronized (xmtRunnable) {
704                                xmtRunnable.wait(m.getTimeout());
705                            }
706                        } catch (InterruptedException e) {
707                            Thread.currentThread().interrupt(); // retain if needed later
708                            log.error("retry wait interrupted");
709                        }
710                    } else {
711                        log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg));
712                    }
713                }
714            } else {  // ostream is null
715                // no stream connected
716                connectionWarn();
717            }
718        } catch (IOException | RuntimeException e) {
719            // TODO Currently there's no port recovery if an exception occurs
720            // must restart JMRI to clear xmtException.
721            xmtException = true;
722            portWarn(e);
723        }
724    }
725
726    protected void connectionWarn() {
727        log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception());
728    }
729
730    protected void portWarn(Exception e) {
731        log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e);
732    }
733
    /**
     * Set to {@code true} by {@link #portWarnTCP(Exception)} and cleared again by
     * {@link #connectPort(AbstractPortController)}.
     */
    protected boolean connectionError = false;

    /**
     * Log a warning for the given exception and mark the connection as being
     * in error by setting {@link #connectionError}.
     *
     * @param e the exception to log; handed to the logger as the throwable
     */
    protected void portWarnTCP(Exception e) {
        log.warn("Exception java net: ", e);
        connectionError = true;
    }
    // methods to connect/disconnect to a source of data in an AbstractPortController

    /**
     * The port controller this traffic controller is currently attached to.
     * Assigned by {@link #connectPort(AbstractPortController)} and reset to
     * {@code null} by {@link #disconnectPort(AbstractPortController)}.
     */
    public AbstractPortController controller = null;
743
744    public boolean status() {
745        return (ostream != null && istream != null);
746    }
747
    /** Thread that executes {@link #xmtRunnable}; created and started in {@link #connectPort(AbstractPortController)}. */
    protected volatile Thread xmtThread = null;
    /** Thread that runs {@link #receiveLoop()}; created and started in {@link #connectPort(AbstractPortController)}. */
    protected volatile Thread rcvThread = null;

    /**
     * The Runnable executed by the transmit thread. This object is also the
     * monitor that the code in this class synchronizes on for its
     * {@code wait}/{@code notify} hand-offs.
     */
    protected volatile Runnable xmtRunnable = null;
752
753    /**
754     * Make connection to an existing PortController object.
755     *
756     * @param p the PortController
757     */
758    public void connectPort(AbstractPortController p) {
759        rcvException = false;
760        connectionError = false;
761        xmtException = false;
762        threadStopRequest = false;
763        try {
764            istream = p.getInputStream();
765            ostream = p.getOutputStream();
766            if (controller != null) {
767                log.warn("connectPort: connect called while connected");
768            } else {
769                log.debug("connectPort invoked");
770            }
771            controller = p;
772            // and start threads
773            xmtThread = jmri.util.ThreadingUtil.newThread(
774                xmtRunnable = new Runnable() {
775                    @Override
776                    public void run() {
777                        try {
778                            transmitLoop();
779                        } catch (ThreadDeath td) {
780                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td);
781                            // ThreadDeath must be thrown per Java API Javadocs
782                            throw td;
783                        } catch (Throwable e) {
784                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e);
785                        }
786                }
787            });
788
789            String[] packages = this.getClass().getName().split("\\.");
790            xmtThread.setName(
791                (packages.length>=2 ? packages[packages.length-2]+"." :"")
792                +(packages.length>=1 ? packages[packages.length-1] :"")
793                +" Transmit thread");
794
795            xmtThread.setDaemon(true);
796            xmtThread.setPriority(Thread.MAX_PRIORITY-1);      //bump up the priority
797            xmtThread.start();
798
799            rcvThread = jmri.util.ThreadingUtil.newThread(
800                new Runnable() {
801                    @Override
802                    public void run() {
803                        receiveLoop();
804                    }
805                });
806            rcvThread.setName(
807                (packages.length>=2 ? packages[packages.length-2]+"." :"")
808                +(packages.length>=1 ? packages[packages.length-1] :"")
809                +" Receive thread");
810
811            rcvThread.setPriority(Thread.MAX_PRIORITY);      //bump up the priority
812            rcvThread.setDaemon(true);
813            rcvThread.start();
814
815        } catch (RuntimeException e) {
816            log.error("Failed to start up communications. Error was: ", e);
817            log.debug("Full trace:", e);
818        }
819    }
820
821    /**
822     * Get the port name for this connection from the TrafficController.
823     *
824     * @return the name of the port
825     */
826    public String getPortName() {
827        return controller.getCurrentPortName();
828    }
829
830    /**
831     * Break connection to existing PortController object. Once broken, attempts
832     * to send via "message" member will fail.
833     *
834     * @param p the PortController
835     */
836    public void disconnectPort(AbstractPortController p) {
837        istream = null;
838        ostream = null;
839        if (controller != p) {
840            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
841        }
842        controller = null;
843        threadStopRequest = true;
844    }
845
846    /**
847     * Check if PortController object can be sent to.
848     *
849     * @param p the PortController
850     * @return true if ready, false otherwise May throw an Exception.
851     */
852    public boolean portReadyToSend(AbstractPortController p) {
853        if (p != null && !xmtException && !rcvException) {
854            return true;
855        } else {
856            return false;
857        }
858    }
859
    // data members to hold the streams
    /** Input stream of the attached port; {@code null} after {@link #disconnectPort(AbstractPortController)}. */
    protected DataInputStream istream = null;
    /** Output stream of the attached port; {@code null} after {@link #disconnectPort(AbstractPortController)}. */
    protected OutputStream ostream = null;

    /**
     * Set by {@link #receiveLoop()} when it gives up after an exception; while
     * true, {@link #portReadyToSend(AbstractPortController)} returns false.
     * Cleared by {@link #connectPort(AbstractPortController)}.
     */
    protected boolean rcvException = false;

    /**
     * Number of consecutive RuntimeExceptions {@link #receiveLoop()} tolerates
     * before it stops looping.
     */
    protected int maxRcvExceptionCount = 100;
867
868    /**
869     * Handle incoming characters. This is a permanent loop, looking for input
870     * messages in character form on the stream connected to the PortController
871     * via {@link #connectPort(AbstractPortController)}.
872     * <p>
873     * Each turn of the loop is the receipt of a single message.
874     */
875    public void receiveLoop() {
876        log.debug("receiveLoop starts in {}", this);
877        int errorCount = 0;
878        while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception
879            try {
880                handleOneIncomingReply();
881                errorCount = 0;
882            } catch (java.io.InterruptedIOException e) {
883                // related to InterruptedException, catch first
884                break;
885            } catch (IOException e) {
886                rcvException = true;
887                reportReceiveLoopException(e);
888                break;
889            } catch (RuntimeException e1) {
890                log.error("Exception in receive loop: {}", e1.toString(), e1);
891                errorCount++;
892                if (errorCount == maxRcvExceptionCount) {
893                    rcvException = true;
894                    reportReceiveLoopException(e1);
895                }
896            }
897        }
898        if (!threadStopRequest) { // if e.g. unexpected end
899            ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN);
900            log.error("Exit from rcv loop in {}", this.getClass());
901            recovery(); // see if you can restart
902        }
903    }
904
    /**
     * Disconnect and reset the current PortController.
     * Invoked at abnormal ending of receiveLoop.
     * <p>
     * The controller reference is saved first because
     * {@link #disconnectPort(AbstractPortController)} sets the
     * {@code controller} field to {@code null}; {@code recover()} is then
     * called on the saved reference.
     */
    protected final void recovery() {
        // keep a reference: disconnectPort(..) clears the controller field
        AbstractPortController adapter = controller;
        disconnectPort(controller);
        adapter.recover();
    }
914
915    /**
916     * Report an error on the receive loop. Separated so tests can suppress, even
917     * though message is asynchronous.
918     * @param e Exception encountered at lower level to trigger error, or null
919     */
920    protected void reportReceiveLoopException(Exception e) {
921        log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e);
922        jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN);
923        if (controller instanceof AbstractNetworkPortController) {
924            portWarnTCP(e);
925        }
926    }
927
    /**
     * Create an empty reply message of the proper concrete type, to be filled
     * with incoming data. Called by {@link #handleOneIncomingReply()} once per
     * incoming reply.
     *
     * @return a new reply object
     */
    protected abstract AbstractMRReply newReply();
929
    /**
     * Decide whether the reply being assembled is complete.
     * {@link #loadChars(AbstractMRReply, DataInputStream)} calls this after
     * each character it stores and stops reading when it returns true.
     *
     * @param r the reply as loaded so far
     * @return true if the reply is complete
     */
    protected abstract boolean endOfMessage(AbstractMRReply r);
931
    /**
     * Hook for protocols that have to skip some start-of-message characters.
     * <p>
     * This base implementation is empty and reads nothing from the stream.
     * {@link #handleOneIncomingReply()} calls it before loading the characters
     * of a reply.
     *
     * @param istream input source
     * @throws IOException from underlying operations
     */
    protected void waitForStartOfReply(DataInputStream istream) throws IOException {
        // intentionally empty in the base class
    }
940
941    /**
942     * Read a single byte, protecting against various timeouts, etc.
943     * <p>
944     * When a port is set to have a receive timeout (via the
945     * {@link purejavacomm.SerialPort#enableReceiveTimeout(int)} method), some will return
946     * zero bytes or an EOFException at the end of the timeout. In that case, the read
947     * should be repeated to get the next real character.
948     *
949     * @param istream stream to read
950     * @return the byte read
951     * @throws java.io.IOException if unable to read
952     */
953    protected byte readByteProtected(DataInputStream istream) throws IOException {
954        if (istream == null) {
955            throw new IOException("Input Stream NULL when reading");
956        }
957        while (true) { // loop will repeat until character found
958            int nchars;
959            nchars = istream.read(rcvBuffer, 0, 1);
960            if (nchars == -1) {
961                // No more bytes can be read from the channel
962                throw new IOException("Connection not terminated normally");
963            }
964            if (nchars > 0) {
965                return rcvBuffer[0];
966            }
967        }
968    }
969
    // One-byte buffer that readByteProtected(..) reads into.
    // Defined this way to reduce new object creation
    private byte[] rcvBuffer = new byte[1];
972
973    /**
974     * Get characters from the input source, and file a message.
975     * <p>
976     * Returns only when the message is complete.
977     * <p>
978     * Only used in the Receive thread.
979     * <p>
980     * Handles timeouts on read by ignoring zero-length reads.
981     *
982     * @param msg     message to fill
983     * @param istream character source.
984     * @throws IOException when presented by the input source.
985     */
986    protected void loadChars(AbstractMRReply msg, DataInputStream istream)
987            throws IOException {
988        int i;
989        for (i = 0; i < msg.maxSize(); i++) {
990            byte char1 = readByteProtected(istream);
991            log.trace("char: {} i: {}",(char1&0xFF),i);
992            // if there was a timeout, flush any char received and start over
993            if (flushReceiveChars) {
994                log.warn("timeout flushes receive buffer: {}", msg);
995                msg.flush();
996                i = 0;  // restart
997                flushReceiveChars = false;
998            }
999            if (canReceive()) {
1000                msg.setElement(i, char1);
1001                if (endOfMessage(msg)) {
1002                    break;
1003                }
1004            } else {
1005                i--; // flush char
1006                log.error("unsolicited character received: {}", Integer.toHexString(char1));
1007            }
1008        }
1009    }
1010
    /**
     * Tell {@link #loadChars(AbstractMRReply, DataInputStream)} whether a
     * received character may be stored in the reply being assembled.
     * <p>
     * This base implementation always returns true. Override in the system
     * specific code if necessary.
     *
     * @return true if it is okay to buffer receive characters into a reply
     *         message. When false, discard char received
     */
    protected boolean canReceive() {
        return true;
    }
1020
    // Count of automatic retransmissions: incremented in handleOneIncomingReply()
    // each time a retransmittable error reply arrives, reset to 0 by a normal
    // reply; also scales the delay (count * 100) waited before the next retry.
    private int retransmitCount = 0;
1022
1023    /**
1024     * Executes a reply distribution action on the appropriate thread for JMRI.
1025     * @param r a runnable typically encapsulating a MRReply and the iteration code needed to
1026     *          send it to all the listeners.
1027     */
1028    protected void distributeReply(Runnable r) {
1029        try {
1030            if (synchronizeRx) {
1031                SwingUtilities.invokeAndWait(r);
1032            } else {
1033                SwingUtilities.invokeLater(r);
1034            }
1035        } catch (InterruptedException ie) {
1036            if (threadStopRequest) return;
1037            log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString());
1038        } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) {
1039            log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString());
1040            return;
1041        }
1042        log.debug("dispatch thread invoked");
1043    }
1044
    /**
     * Handle each reply when complete.
     * <p>
     * (This is public for testing purposes) Runs in the "Receive" thread.
     * <p>
     * Sequence: create an empty reply, wait for the start of the reply if the
     * protocol needs that, load characters until the reply is complete, pass
     * the reply to the listeners via {@link #distributeReply(Runnable)}, and
     * finally, unless the reply is unsolicited, update the transmit state
     * according to the current state and notify on {@code xmtRunnable}.
     * Returns without dispatching if a thread stop has been requested by the
     * time the reply is loaded.
     *
     * @throws java.io.IOException on error.
     */
    public void handleOneIncomingReply() throws IOException {
        // we sit in this until the message is complete, relying on
        // threading to let other stuff happen

        // Create message off the right concrete class
        AbstractMRReply msg = newReply();

        // wait for start if needed
        waitForStartOfReply(istream);

        // message exists, now fill it
        loadChars(msg, istream);

        // do not dispatch anything once a stop has been requested
        if (threadStopRequest) return;

        // message is complete, dispatch it !!
        replyInDispatch = true;
        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);

        // forward the message to the registered recipients,
        // which includes the communications monitor
        // return a notification via the Swing event queue to ensure proper thread
        Runnable r = new RcvNotifier(msg, mLastSender, this);
        distributeReply(r);

        if (!msg.isUnsolicited()) {
            // effect on transmit:
            switch (mCurrentState) {
                case WAITMSGREPLYSTATE: {
                    // check to see if the response was an error message we want
                    // to automatically handle by re-queueing the last sent
                    // message, otherwise go on to the next message
                    if (msg.isRetransmittableErrorMsg()) {
                        log.error("Automatic Recovery from Error Message: {}.  Retransmitted {} times.", msg, retransmitCount);
                        synchronized (xmtRunnable) {
                            mCurrentState = AUTORETRYSTATE;
                            // from the second retry on, pause first; the pause grows
                            // by 100 per retry already made
                            if (retransmitCount > 0) {
                                try {
                                    xmtRunnable.wait(retransmitCount * 100L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // retain if needed later
                                }
                            }
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount++;
                        }
                    } else {
                        // update state, and notify to continue
                        synchronized (xmtRunnable) {
                            mCurrentState = NOTIFIEDSTATE;
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            // a normal reply ends any retry sequence
                            retransmitCount = 0;
                        }
                    }
                    break;
                }
                case WAITREPLYINPROGMODESTATE: {
                    // entering programming mode
                    mCurrentMode = PROGRAMINGMODE;
                    replyInDispatch = false;

                    // check to see if we need to delay to allow decoders to become
                    // responsive
                    int warmUpDelay = enterProgModeDelayTime();
                    if (warmUpDelay != 0) {
                        try {
                            synchronized (xmtRunnable) {
                                xmtRunnable.wait(warmUpDelay);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // retain if needed later
                        }
                    }
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                case WAITREPLYINNORMMODESTATE: {
                    // entering normal mode
                    mCurrentMode = NORMALMODE;
                    replyInDispatch = false;
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                default: {
                    // a solicited reply arrived in a state that was not waiting for one
                    replyInDispatch = false;
                    if (allowUnexpectedReply) {
                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
                        synchronized (xmtRunnable) {
                            // The transmit thread sometimes gets stuck
                            // when unexpected replies are received.  Notify
                            // it to clear the block without a timeout.
                            // (do not change the current state)
                            //if(mCurrentState!=IDLESTATE)
                            xmtRunnable.notify();
                        }
                    } else {
                        unexpectedReplyStateError(mCurrentState, msg.toString());
                    }
                }
            }
        } else {
            // Unsolicited message: logged only, the transmit state is left alone
            log.debug("Unsolicited Message Received {}", msg);

            replyInDispatch = false;
        }
    }
1169
1170    /**
1171     * Log an error message for a message received in an unexpected state.
1172     * @param State message state.
1173     * @param msgString message string.
1174     */
1175    protected void unexpectedReplyStateError(int State, String msgString) {
1176       String[] packages = this.getClass().getName().split("\\.");
1177       String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
1178                     +(packages.length>=1 ? packages[packages.length-1] :"");
1179       log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
1180    }
1181
    /**
     * For testing purposes, allow finding out what the last sender was.
     *
     * @return the current value of {@code mLastSender}
     */
    public AbstractMRListener getLastSender() {
        return mLastSender;
    }
1190
1191    protected void terminate() {
1192        log.debug("Cleanup Starts");
1193        if (ostream == null) {
1194            return;    // no connection established
1195        }
1196        AbstractMRMessage modeMsg = enterNormalMode();
1197        if (modeMsg != null) {
1198            modeMsg.setRetries(100); // set the number of retries
1199            // high, just in case the interface
1200            // is busy when we try to send
1201            forwardToPort(modeMsg, null);
1202            // wait for reply
1203            try {
1204                if (xmtRunnable != null) {
1205                    synchronized (xmtRunnable) {
1206                        xmtRunnable.wait(modeMsg.getTimeout());
1207                    }
1208                }
1209            } catch (InterruptedException e) {
1210                Thread.currentThread().interrupt(); // retain if needed later
1211                log.error("transmit interrupted");
1212            }
1213        }
1214    }
1215
1216    /**
1217     * Internal class to remember the Reply object and destination listener with
1218     * a reply is received.
1219     */
1220    protected static class RcvNotifier implements Runnable {
1221
1222        AbstractMRReply mMsg;
1223        AbstractMRListener mDest;
1224        AbstractMRTrafficController mTc;
1225
1226        public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1227                AbstractMRTrafficController pTc) {
1228            mMsg = pMsg;
1229            mDest = pDest;
1230            mTc = pTc;
1231        }
1232
1233        @Override
1234        public void run() {
1235            log.debug("Delayed rcv notify starts");
1236            mTc.notifyReply(mMsg, mDest);
1237        }
1238    } // end RcvNotifier
1239
    /**
     * Create a {@link RcvNotifier}; provided to allow creation of the object
     * outside this package.
     *
     * @param pMsg  the reply to deliver
     * @param pDest the listener to pass along with the reply
     * @param pTc   the traffic controller to notify through
     * @return a new RcvNotifier holding the three arguments
     */
    protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
            AbstractMRTrafficController pTc) {
        return new RcvNotifier(pMsg, pDest, pTc);
    }
1245
1246    /**
1247     * Internal class to remember the Message object and destination listener
1248     * when a message is queued for notification.
1249     */
1250    protected static class XmtNotifier implements Runnable {
1251
1252        AbstractMRMessage mMsg;
1253        AbstractMRListener mDest;
1254        AbstractMRTrafficController mTc;
1255
1256        public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest,
1257                AbstractMRTrafficController pTc) {
1258            mMsg = pMsg;
1259            mDest = pDest;
1260            mTc = pTc;
1261        }
1262
1263        @Override
1264        public void run() {
1265            log.debug("Delayed xmt notify starts");
1266            mTc.notifyMessage(mMsg, mDest);
1267        }
1268    }  // end XmtNotifier
1269
1270    /**
1271     * Terminate the receive and transmit threads.
1272     * <p>
1273     * This is intended to be used only by testing subclasses.
1274     */
1275    public void terminateThreads() {
1276        threadStopRequest = true;
1277        if (xmtThread != null) {
1278            xmtThread.interrupt();
1279            try {
1280                xmtThread.join();
1281            } catch (InterruptedException ie){
1282                // interrupted during cleanup.
1283            }
1284        }
1285
1286        if (rcvThread != null) {
1287            rcvThread.interrupt();
1288            try {
1289                rcvThread.join();
1290            } catch (InterruptedException ie){
1291                // interrupted during cleanup.
1292            }
1293        }
1294        // we also need to remove the shutdown task.
1295        InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask);
1296    }
1297
    /**
     * Flag that threads should terminate as soon as they can.
     * <p>
     * Raised by {@link #disconnectPort(AbstractPortController)} and
     * {@link #terminateThreads()}, and cleared again by
     * {@link #connectPort(AbstractPortController)}.
     */
    protected volatile boolean threadStopRequest = false;

    // logger shared by this class and its nested notifier classes
    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class);
1304
1305}