001package jmri.jmrix;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.IOException;
006import java.io.OutputStream;
007import java.util.*;
008import javax.swing.SwingUtilities;
009
010import jmri.InstanceManager;
011import jmri.ShutDownManager;
012import jmri.ShutDownTask;
013
014/**
015 * Abstract base for TrafficControllers in a Message/Reply protocol.
016 * <p>
017 * Two threads are used for the actual communication. The "Transmit" thread
018 * handles pushing characters to the port, and also changing the mode. The
019 * "Receive" thread converts characters from the input stream into replies.
020 * <p>
021 * The constructor registers a shutdown task to
022 * trigger the necessary cleanup code
023 * <p>
024 * The internal state machine handles changes of mode, automatic retry of 
025 * certain messages, time outs, and sending poll messages when otherwise idle.
026 * <p>
027 * "Mode" refers to the state of the command station communications. "Normal" 
028 * and "Programming" are the two modes, used if the command station requires
029 * messages to go back and forth between them. <br>
030 *
031 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram">
032 * 
033 * <p>
034 * The key methods for the basic operation are:
035 * <ul>
036 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)}
037 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data
038 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs
039 *      information on e.g. the last message sent, that can be stored in member variables
040 *      by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}.
041 *  <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects
042 * </ul>
043 * <p>
044 * If your command station requires messages to go in and out of 
045 * "programming mode", those should be provided by 
046 * {@link #enterProgMode()} and {@link #enterNormalMode()}.
047 * <p>
048 * If you want to poll for information when the line is otherwise idle,
049 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}.
050 * 
051 * @author Bob Jacobsen Copyright (C) 2003
052 * @author Paul Bender Copyright (C) 2004-2010
053 */
054
055/*
056@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png
057
058    [*] --> IDLESTATE
059    IDLESTATE --> NOTIFIEDSTATE : sendMessage()
060    NOTIFIEDSTATE --> IDLESTATE : queue empty
061    
062    NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message
063    
064    WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE
065    WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE
066    
067    WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply()
068
069    WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE
070    WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE
071    OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message
072    
073    IDLESTATE --> POLLSTATE : transmitLoop()\nno work
074    POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it
075    POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send
076    
077    WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply
078    AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message 
079
080NOTIFIEDSTATE : Transmit thread wakes up and processes
081POLLSTATE : Transient while deciding to send poll
082OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change
083AUTORETRYSTATE : Transient while deciding to resend auto-retry message
084WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply
085WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply
086WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout
087
088Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry
089Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change.
090
091@enduml
092 */
093
094public abstract class AbstractMRTrafficController {
095
096    private final Runnable shutDownTask = this::terminate; // retain for possible removal.
097
098    /**
099     * Create a new unnamed MRTrafficController.
100     */
101    public AbstractMRTrafficController() {
102        log.debug("Creating AbstractMRTrafficController instance");
103        mCurrentMode = NORMALMODE;
104        mCurrentState = IDLESTATE;
105        allowUnexpectedReply = false;
106
107
108        // We use a shutdown task here to make sure the connection is left
109        // in a clean state prior to exiting.  This is required on systems
110        // which have a service mode to ensure we don't leave the system 
111        // in an unusable state. Once the shutdown task executes, the connection
112        // must be considered permanently closed.
113        
114        InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask);
115    }
116
117    private boolean synchronizeRx = true;
118    
119    protected void setSynchronizeRx(boolean val) {
120        synchronizeRx = val;
121    }
122
123    protected boolean getSynchronizeRx() {
124        return synchronizeRx;
125    }
126
127    // The methods to implement the abstract Interface
128
129    protected final Vector<AbstractMRListener> cmdListeners = new Vector<>();
130
131    protected synchronized void addListener(AbstractMRListener l) {
132        // add only if not already registered
133        if (l == null) {
134            throw new NullPointerException();
135        }
136        if (!cmdListeners.contains(l)) {
137            cmdListeners.addElement(l);
138        }
139    }
140
141    protected synchronized void removeListener(AbstractMRListener l) {
142        if (cmdListeners.contains(l)) {
143            cmdListeners.removeElement(l);
144        }
145    }
146
147    /**
148     * Forward a Message to registered listeners.
149     *
150     * @param m     Message to be forwarded intact
151     * @param notMe One (optional) listener to be skipped, usually because it's
152     *              the originating object.
153     */
154    @SuppressWarnings("unchecked")
155    protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
156        // make a copy of the listener vector to synchronized not needed for transmit
157        Vector<AbstractMRListener> v;
158        synchronized (this) {
159            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
160            v = (Vector<AbstractMRListener>) cmdListeners.clone();
161        }
162        // forward to all listeners
163        int cnt = v.size();
164        for (int i = 0; i < cnt; i++) {
165            AbstractMRListener client = v.elementAt(i);
166            if (notMe != client) {
167                log.debug("notify client: {}", client);
168                try {
169                    forwardMessage(client, m);
170                } catch (RuntimeException e) {
171                    log.warn("notify: During message dispatch to {}", client, e);
172                }
173            }
174        }
175    }
176
177    /**
178     * Implement this to forward a specific message type to a protocol-specific
179     * listener interface.
180     * This puts the casting into the concrete class.
181     * @param client abstract listener.
182     * @param m message to forward.
183     */
184    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);
185
186    /**
187     * Invoked if it's appropriate to do low-priority polling of the command
188     * station, this should return the next message to send, or null if the
189     * TrafficController should just sleep.
190     * @return Formatted poll message
191     */
192    protected abstract AbstractMRMessage pollMessage();
193
194    protected abstract AbstractMRListener pollReplyHandler();
195
196    protected AbstractMRListener mLastSender = null;
197
198    protected volatile int mCurrentMode;
199    public static final int NORMALMODE = 1;
200    public static final int PROGRAMINGMODE = 4;
201
202    /**
203     * Set the system to programming mode.
204     * @see #enterNormalMode()
205     *
206     * @return any message that needs to be returned to the Command Station
207     * to change modes. If no message is needed, returns null.
208     */
209    protected abstract AbstractMRMessage enterProgMode();
210
211    /**
212     * Sets the system to normal mode during programming while in IDLESTATE.
213     * If {@link #programmerIdle()} returns true, enterNormalMode() is
214     * called after a timeout.
215     * @see #enterProgMode()
216     *
217     * @return any message that needs to be returned to the Command Station
218     * to change modes. If no message is needed, returns null.
219     */
220    protected abstract AbstractMRMessage enterNormalMode();
221
222    /**
223     * Check if the programmer is idle.
224     * Override in the system specific code if necessary (see notes for
225     * {@link #enterNormalMode()}.
226     *
227     * @return true if not busy programming
228     */
229    protected boolean programmerIdle() {
230        return true;
231    }
232
233    /**
234     * Get the delay (wait time) after enabling the programming track.
235     * Override in subclass to add a longer delay.
236     *
237     * @return 0 as default delay
238     */
239    protected int enterProgModeDelayTime() {
240        return 0;
241    }
242
243    protected volatile int mCurrentState;
244    public static final int IDLESTATE = 10;        // nothing happened
245    public static final int NOTIFIEDSTATE = 15;    // xmt notified, will next wake
246    public static final int WAITMSGREPLYSTATE = 25;  // xmt has sent, await reply to message
247    public static final int WAITREPLYINPROGMODESTATE = 30;  // xmt has done mode change, await reply
248    public static final int WAITREPLYINNORMMODESTATE = 35;  // xmt has done mode change, await reply
249    public static final int OKSENDMSGSTATE = 40;        // mode change reply here, send original msg
250    public static final int AUTORETRYSTATE = 45;        // received message where automatic recovery may occur with a retransmission, re-send original msg
251    public static final int POLLSTATE = 50;   // Send program mode or poll message
252
253    protected boolean allowUnexpectedReply;
254
255    /**
256     * Set whether the command station may send messages without a request
257     * sent to it.
258     *
259     * @param expected true to allow messages without a prior request
260     */
261    protected void setAllowUnexpectedReply(boolean expected) {
262        allowUnexpectedReply = expected;
263    }
264
265    /**
266     * Forward a "Reply" from layout to registered listeners.
267     *
268     * @param r    Reply to be forwarded intact
269     * @param dest One (optional) listener to be skipped, usually because it's
270     *             the originating object.
271     */
272    @SuppressWarnings("unchecked")
273    protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
274        // make a copy of the listener vector to synchronized (not needed for transmit?)
275        Vector<AbstractMRListener> v;
276        synchronized (this) {
277            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
278            v = (Vector<AbstractMRListener>) cmdListeners.clone();
279        }
280        // forward to all listeners
281        int cnt = v.size();
282        for (int i = 0; i < cnt; i++) {
283            AbstractMRListener client = v.elementAt(i);
284            log.debug("notify client: {}", client);
285            try {
286                //skip dest for now, we'll send the message to there last.
287                if (dest != client) {
288                    forwardReply(client, r);
289                }
290            } catch (RuntimeException e) {
291                log.warn("notify: During reply dispatch to {}", client, e);
292            }
293        }
294
295        // forward to the last listener who sent a message
296        // this is done _second_ so monitoring can have already stored the reply
297        // before a response is sent
298        if (dest != null) {
299            forwardReply(dest, r);
300        }
301    }
302
303    protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m);
304
305    /**
306     * Messages to be transmitted.
307     */
308    protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>();
309    protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>();
310
311    /**
312     * Forward message to the port. Messages are queued and then the
313     * transmission thread is notified.
314     * @see #forwardToPort(AbstractMRMessage, AbstractMRListener)
315     *
316     * @param m the message to send
317     * @param reply the Listener sending the message, often provided as 'this'
318     */
319    protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) {
320        msgQueue.addLast(m);
321        listenerQueue.addLast(reply);
322        synchronized (xmtRunnable) {
323            if (mCurrentState == IDLESTATE) {
324                mCurrentState = NOTIFIEDSTATE;
325                xmtRunnable.notify();
326            }
327        }
328        if (m != null) {
329            log.debug("just notified transmit thread with message {}", m);
330        }
331    }
332
333    /**
334     * Permanent loop for the transmit thread.
335     */
336    protected void transmitLoop() {
337        log.debug("transmitLoop starts in {}", this);
338
339        // loop forever
340        while (!connectionError && !threadStopRequest) {
341            AbstractMRMessage m = null;
342            AbstractMRListener l = null;
343            // check for something to do
344            synchronized (this) {
345                if (!msgQueue.isEmpty()) {
346                    // yes, something to do
347                    m = msgQueue.getFirst();
348                    msgQueue.removeFirst();
349                    l = listenerQueue.getFirst();
350                    listenerQueue.removeFirst();
351                    mCurrentState = WAITMSGREPLYSTATE;
352                    log.debug("transmit loop has something to do: {}", m);
353                }  // release lock here to proceed in parallel
354            }
355            // if a message has been extracted, process it
356            if (m != null) {
357                // check for need to change mode
358                log.debug("Start msg, state = {}", mCurrentMode);
359                if (m.getNeededMode() != mCurrentMode) {
360                    AbstractMRMessage modeMsg;
361                    if (m.getNeededMode() == PROGRAMINGMODE) {
362                        // change state to programming mode and send message
363                        modeMsg = enterProgMode();
364                        if (modeMsg != null) {
365                            mCurrentState = WAITREPLYINPROGMODESTATE;
366                            log.debug("Enter Programming Mode");
367                            forwardToPort(modeMsg, null);
368                            // wait for reply
369                            transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted");
370                        }
371                    } else {
372                        // change state to normal and send message
373                        modeMsg = enterNormalMode();
374                        if (modeMsg != null) {
375                            mCurrentState = WAITREPLYINNORMMODESTATE;
376                            log.debug("Enter Normal Mode");
377                            forwardToPort(modeMsg, null);
378                            // wait for reply
379                            transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted");
380                        }
381                    }
382                    if (modeMsg != null) {
383                        checkReplyInDispatch();
384                        if (mCurrentState != OKSENDMSGSTATE) {
385                            handleTimeout(modeMsg, l);
386                        }
387                        mCurrentState = WAITMSGREPLYSTATE;
388                    } else {
389                        // no mode message required, but the message
390                        // needs a different mode
391                        log.debug("Setting mode to: {}", m.getNeededMode());
392                        mCurrentMode = m.getNeededMode();
393                    }
394                }
395                forwardToPort(m, l);
396                // reply expected?
397                if (m.replyExpected()) {
398                    log.debug("reply expected is true for message {}",m);
399                    // wait for a reply, or eventually timeout
400                    transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted");
401                    checkReplyInDispatch();
402                    if (mCurrentState == WAITMSGREPLYSTATE) {
403                        handleTimeout(m, l);
404                    } else if (mCurrentState == AUTORETRYSTATE) {
405                        log.info("Message added back to queue: {}", m);
406                        msgQueue.addFirst(m);
407                        listenerQueue.addFirst(l);
408                        synchronized (xmtRunnable) {
409                            mCurrentState = IDLESTATE;
410                        }
411                    } else {
412                        resetTimeout(m);
413                    }
414                } // just continue to the next message from here
415            } else {
416                // nothing to do
417                if (mCurrentState != IDLESTATE) {
418                    log.debug("Setting IDLESTATE");
419                    log.debug("Current Mode {}", mCurrentMode);
420                    mCurrentState = IDLESTATE;
421                }
422                // wait for something to send
423                if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) {
424                    try {
425                        long startTime = Calendar.getInstance().getTimeInMillis();
426                        synchronized (xmtRunnable) {
427                            xmtRunnable.wait(mWaitBeforePoll);
428                        }
429                        long endTime = Calendar.getInstance().getTimeInMillis();
430                        waitTimePoll = waitTimePoll + endTime - startTime;
431                    } catch (InterruptedException e) {
432                        Thread.currentThread().interrupt(); // retain if needed later
433                        // end of transmit loop
434                        break;
435                    }
436                }
437                // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee
438                // the change of mCurrentState to one of the waiting for reply states.  Therefore we need to synchronize.
439                synchronized (this) {
440                    if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) {
441                        log.error("left timeout in unexpected state: {}", mCurrentState);
442                    }
443                    if (mCurrentState == IDLESTATE) {
444                        mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE
445                    }
446                }
447                // went around with nothing to do; leave programming state if in it
448                if (mCurrentMode == PROGRAMINGMODE) {
449                    log.debug("Timeout - in service mode");
450                }
451                if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) {
452                    log.debug("timeout causes leaving programming mode");
453                    mCurrentState = WAITREPLYINNORMMODESTATE;
454                    AbstractMRMessage msg = enterNormalMode();
455                    // if the enterNormalMode() message is null, we
456                    // don't want to try to send it to the port.
457                    if (msg != null) {
458                        forwardToPort(msg, null);
459                        // wait for reply
460                        transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode");
461                        checkReplyInDispatch();
462                        // exit program mode timeout?
463                        if (mCurrentState == WAITREPLYINNORMMODESTATE) {
464                            // entering normal mode via timeout
465                            handleTimeout(msg, l);
466                            mCurrentMode = NORMALMODE;
467                        }
468                        // and go around again
469                    }
470                } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) {
471                    // We may need to poll
472                    AbstractMRMessage msg = pollMessage();
473                    if (msg != null) {
474                        // yes, send that
475                        log.debug("Sending poll, wait time {}", Long.toString(waitTimePoll));
476                        mCurrentState = WAITMSGREPLYSTATE;
477                        forwardToPort(msg, pollReplyHandler());
478                        // wait for reply
479                        log.debug("Still waiting for reply");
480                        transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply");
481                        checkReplyInDispatch();
482                        // and go around again
483                        if (mCurrentState == WAITMSGREPLYSTATE) {
484                            handleTimeout(msg, l);
485                        } else {
486                            resetTimeout(msg);
487                        }
488                    }
489                    waitTimePoll = 0;
490                }
491                // no messages, so back to idle
492                if (mCurrentState == POLLSTATE) {
493                    mCurrentState = IDLESTATE;
494                }
495            }
496        }
497    }   // end of transmit loop; go around again
498
499    protected void transmitWait(int waitTime, int state, String interruptMessage) {
500        // wait() can have spurious wakeup!
501        // so we protect by making sure the entire timeout time is used
502        long currentTime = Calendar.getInstance().getTimeInMillis();
503        long endTime = currentTime + waitTime;
504        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
505            long wait = endTime - currentTime;
506            try {
507                synchronized (xmtRunnable) {
508                    // Do not wait if the current state has changed since we
509                    // last set it.
510                    if (mCurrentState != state) {
511                        return;
512                    }
513                    xmtRunnable.wait(wait); // rcvr normally ends this w state change
514                }
515            } catch (InterruptedException e) {
516                Thread.currentThread().interrupt(); // retain if needed later
517                String[] packages = this.getClass().getName().split("\\.");
518                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
519                        +(packages.length>=1 ? packages[packages.length-1] :"");
520                if (!threadStopRequest) {
521                    log.error("{} in transmitWait(..) of {}", interruptMessage, name);
522                } else {
523                    log.debug("during shutdown, {}  in transmitWait(..) of {}", interruptMessage, name);
524                }
525            }
526        }
527        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
528    }
529
530    // Dispatch control and timer
531    protected boolean replyInDispatch = false;          // true when reply has been received but dispatch not completed
532    private int maxDispatchTime = 0;
533    private int warningMessageTime = DISPATCH_WARNING_TIME;
534    private static final int DISPATCH_WAIT_INTERVAL = 100;
535    private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded
536    private static final int WARN_NEXT_TIME = 1000;         // report every second
537
538    private void checkReplyInDispatch() {
539        int loopCount = 0;
540        while (replyInDispatch) {
541            try {
542                synchronized (xmtRunnable) {
543                    xmtRunnable.wait(DISPATCH_WAIT_INTERVAL);
544                }
545            } catch (InterruptedException e) {
546                Thread.currentThread().interrupt(); // retain if needed later
547                if (threadStopRequest) return; // don't log an error if closing.
548                String[] packages = this.getClass().getName().split("\\.");
549                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
550                        +(packages.length>=1 ? packages[packages.length-1] :"");
551                log.error("transmitLoop interrupted in class {}", name);
552            }
553            loopCount++;
554            int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL;
555            if (currentDispatchTime > maxDispatchTime) {
556                maxDispatchTime = currentDispatchTime;
557                if (currentDispatchTime >= warningMessageTime) {
558                    warningMessageTime = warningMessageTime + WARN_NEXT_TIME;
559                    log.debug("Max dispatch time is now {}", currentDispatchTime);
560                }
561            }
562        }
563    }
564
565    /**
566     *  Determine if the interface is down.
567     *
568     *  @return timeoutFlag
569     */
570    public boolean hasTimeouts() {
571        return timeoutFlag;
572    }
573
574    private boolean timeoutFlag = false;
575    private int timeouts = 0;
576    protected boolean flushReceiveChars = false;
577
578    protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
579        //log.debug("Timeout mCurrentState: {}", mCurrentState);
580        String[] packages = this.getClass().getName().split("\\.");
581        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
582                +(packages.length>=1 ? packages[packages.length-1] :"");
583
584        log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name);
585        timeouts++;
586        timeoutFlag = true;
587        flushReceiveChars = true;
588    }
589
590    protected void resetTimeout(AbstractMRMessage msg) {
591        if (timeouts > 0) {
592            log.debug("Reset timeout after {} timeouts", timeouts);
593        }
594        timeouts = 0;
595        timeoutFlag = false;
596    }
597
598    /**
599     * Add header to the outgoing byte stream.
600     *
601     * @param msg the output byte stream
602     * @param m Message results
603     * @return next location in the stream to fill
604     */
605    protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) {
606        return 0;
607    }
608
609    protected int mWaitBeforePoll = 100;
610    protected long waitTimePoll = 0;
611
612    /**
613     * Add trailer to the outgoing byte stream.
614     *
615     * @param msg    the output byte stream
616     * @param offset the first byte not yet used
617     * @param m   output message to extend
618     */
619    protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) {
620        if (!m.isBinary()) {
621            msg[offset] = 0x0d;
622        }
623    }
624
625    /**
626     * Determine how many bytes the entire message will take, including
627     * space for header and trailer.
628     *
629     * @param m the message to be sent
630     * @return number of bytes
631     */
632    protected int lengthOfByteStream(AbstractMRMessage m) {
633        int len = m.getNumDataElements();
634        int cr = 0;
635        if (!m.isBinary()) {
636            cr = 1;  // space for return char
637        }
638        return len + cr;
639    }
640
641    protected boolean xmtException = false;
642
643    /**
644     * Actually transmit the next message to the port.
645     * @see #sendMessage(AbstractMRMessage, AbstractMRListener)
646     *
647     * @param m the message to send
648     * @param reply the Listener sending the message, often provided as 'this'
649     */
650    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"},
651            justification = "Two locks needed for synchronization here, this is OK")
652    protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
653        log.debug("forwardToPort message: [{}]", m);
654        // remember who sent this
655        mLastSender = reply;
656
657        // forward the message to the registered recipients,
658        // which includes the communications monitor, except the sender.
659        // Schedule notification via the Swing event queue to ensure order
660        Runnable r = new XmtNotifier(m, mLastSender, this);
661        SwingUtilities.invokeLater(r);
662
663        // stream to port in single write, as that's needed by serial
664        int byteLength = lengthOfByteStream(m);
665        byte[]  msg= new byte[byteLength];
666        log.debug("copying message, length = {}", byteLength);
667        // add header
668        int offset = addHeaderToOutput(msg, m);
669
670        // add data content
671        int len = m.getNumDataElements();
672        log.debug("copying data to message, length = {}", len);
673        if (len > byteLength) { // happens somehow
674            log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len);
675        }
676        for (int i = 0; (i < len && i < byteLength); i++) {
677            msg[i + offset] = (byte) m.getElement(i);
678        }
679        // add trailer
680        addTrailerToOutput(msg, len + offset, m);
681        // and stream the bytes
682        try {
683            if (ostream != null) {
684                if (log.isDebugEnabled()) {
685                    StringBuilder f = new StringBuilder("formatted message: ");
686                    for (int i = 0; i < msg.length; i++) {
687                        f.append(String.format("%02X ",0xFF & msg[i]));
688                    }
689                    log.debug(f.toString());
690                }
691                while (m.getRetries() >= 0) {
692                    if (portReadyToSend(controller)) {
693                        ostream.write(msg);
694                        ostream.flush();
695                        log.debug("written, msg timeout: {} mSec", m.getTimeout());
696                        break;
697                    } else if (m.getRetries() >= 0) {
698                        log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries());
699                        m.setRetries(m.getRetries() - 1);
700                        try {
701                            synchronized (xmtRunnable) {
702                                xmtRunnable.wait(m.getTimeout());
703                            }
704                        } catch (InterruptedException e) {
705                            Thread.currentThread().interrupt(); // retain if needed later
706                            log.error("retry wait interrupted");
707                        }
708                    } else {
709                        log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg));
710                    }
711                }
712            } else {  // ostream is null
713                // no stream connected
714                connectionWarn();
715            }
716        } catch (IOException | RuntimeException e) {
717            // TODO Currently there's no port recovery if an exception occurs
718            // must restart JMRI to clear xmtException.
719            xmtException = true;
720            portWarn(e);
721        }
722    }
723
724    protected void connectionWarn() {
725        log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception());
726    }
727
728    protected void portWarn(Exception e) {
729        log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e);
730    }
731
732    protected boolean connectionError = false;
733
734    protected void portWarnTCP(Exception e) {
735        log.warn("Exception java net: ", e);
736        connectionError = true;
737    }
738    // methods to connect/disconnect to a source of data in an AbstractPortController
739
740    public AbstractPortController controller = null;
741
742    public boolean status() {
743        return (ostream != null && istream != null);
744    }
745
746    protected volatile Thread xmtThread = null;
747    protected volatile Thread rcvThread = null;
748
749    protected volatile Runnable xmtRunnable = null;
750
751    /**
752     * Make connection to an existing PortController object.
753     *
754     * @param p the PortController
755     */
756    public void connectPort(AbstractPortController p) {
757        rcvException = false;
758        connectionError = false;
759        xmtException = false;
760        threadStopRequest = false;
761        try {
762            istream = p.getInputStream();
763            ostream = p.getOutputStream();
764            if (controller != null) {
765                log.warn("connectPort: connect called while connected");
766            } else {
767                log.debug("connectPort invoked");
768            }
769            controller = p;
770            // and start threads
771            xmtThread = jmri.util.ThreadingUtil.newThread(
772                xmtRunnable = new Runnable() {
773                    @Override
774                    public void run() {
775                        try {
776                            transmitLoop();
777                        } catch(ThreadDeath td) {
778                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td);
779                            // ThreadDeath must be thrown per Java API Javadocs
780                            throw td;
781                        } catch (Throwable e) {
782                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e);
783                        }
784                }
785            });
786            
787            String[] packages = this.getClass().getName().split("\\.");
788            xmtThread.setName(
789                (packages.length>=2 ? packages[packages.length-2]+"." :"")
790                +(packages.length>=1 ? packages[packages.length-1] :"")
791                +" Transmit thread");
792
793            xmtThread.setDaemon(true);
794            xmtThread.setPriority(Thread.MAX_PRIORITY-1);      //bump up the priority
795            xmtThread.start();
796
797            rcvThread = jmri.util.ThreadingUtil.newThread(
798                new Runnable() {
799                    @Override
800                    public void run() {
801                        receiveLoop();
802                    }
803                });
804            rcvThread.setName(
805                (packages.length>=2 ? packages[packages.length-2]+"." :"")
806                +(packages.length>=1 ? packages[packages.length-1] :"")
807                +" Receive thread");
808
809            rcvThread.setPriority(Thread.MAX_PRIORITY);      //bump up the priority
810            rcvThread.setDaemon(true);
811            rcvThread.start();
812            
813        } catch (RuntimeException e) {
814            log.error("Failed to start up communications. Error was: ", e);
815            log.debug("Full trace:", e);
816        }
817    }
818
819    /**
820     * Get the port name for this connection from the TrafficController.
821     *
822     * @return the name of the port
823     */
824    public String getPortName() {
825        return controller.getCurrentPortName();
826    }
827
828    /**
829     * Break connection to existing PortController object. Once broken, attempts
830     * to send via "message" member will fail.
831     *
832     * @param p the PortController
833     */
834    public void disconnectPort(AbstractPortController p) {
835        istream = null;
836        ostream = null;
837        if (controller != p) {
838            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
839        }
840        controller = null;
841        threadStopRequest=true;
842    }
843
844    /**
845     * Check if PortController object can be sent to.
846     *
847     * @param p the PortController
848     * @return true if ready, false otherwise May throw an Exception.
849     */
850    public boolean portReadyToSend(AbstractPortController p) {
851        if (p != null && !xmtException && !rcvException) {
852            return true;
853        } else {
854            return false;
855        }
856    }
857
858    // data members to hold the streams
859    protected DataInputStream istream = null;
860    protected OutputStream ostream = null;
861
862    protected boolean rcvException = false;
863
864    protected int maxRcvExceptionCount = 100;
865
866    /**
867     * Handle incoming characters. This is a permanent loop, looking for input
868     * messages in character form on the stream connected to the PortController
869     * via {@link #connectPort(AbstractPortController)}.
870     * <p>
871     * Each turn of the loop is the receipt of a single message.
872     */
873    public void receiveLoop() {
874        log.debug("receiveLoop starts in {}", this);
875        int errorCount = 0;
876        while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception
877            try {
878                handleOneIncomingReply();
879                errorCount = 0;
880            } catch (java.io.InterruptedIOException e) {
881                // related to InterruptedException, catch first
882                break;
883            } catch (IOException e) {
884                rcvException = true;
885                reportReceiveLoopException(e);
886                break;
887            } catch (RuntimeException e1) {
888                log.error("Exception in receive loop: {}", e1.toString(), e1);
889                errorCount++;
890                if (errorCount == maxRcvExceptionCount) {
891                    rcvException = true;
892                    reportReceiveLoopException(e1);
893                }
894            }
895        }
896        if (!threadStopRequest) { // if e.g. unexpected end
897            ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN);
898            log.error("Exit from rcv loop in {}", this.getClass());
899            recovery(); // see if you can restart
900        }
901    }
902
903    /**
904     * Disconnect and reset the current PortController.
905     * Invoked at abnormal ending of receiveLoop.
906     */
907    protected final void recovery() {
908        AbstractPortController adapter = controller;
909        disconnectPort(controller);
910        adapter.recover();
911    }
912
913    /**
914     * Report an error on the receive loop. Separated so tests can suppress, even
915     * though message is asynchronous.
916     * @param e Exception encountered at lower level to trigger error, or null
917     */
918    protected void reportReceiveLoopException(Exception e) {
919        log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e);
920        jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN);
921        if (controller instanceof AbstractNetworkPortController) {
922            portWarnTCP(e);
923        }
924    }
925
926    protected abstract AbstractMRReply newReply();
927
928    protected abstract boolean endOfMessage(AbstractMRReply r);
929
930    /**
931     * Dummy routine, to be filled by protocols that have to skip some
932     * start-of-message characters.
933     * @param istream input source
934     * @throws IOException from underlying operations
935     */
936    protected void waitForStartOfReply(DataInputStream istream) throws IOException {
937    }
938
939    /**
940     * Read a single byte, protecting against various timeouts, etc.
941     * <p>
942     * When a port is set to have a receive timeout (via the
943     * {@link purejavacomm.SerialPort#enableReceiveTimeout(int)} method), some will return
944     * zero bytes or an EOFException at the end of the timeout. In that case, the read
945     * should be repeated to get the next real character.
946     *
947     * @param istream stream to read
948     * @return the byte read
949     * @throws java.io.IOException if unable to read
950     */
951    protected byte readByteProtected(DataInputStream istream) throws IOException {
952        if (istream == null) {
953            throw new IOException("Input Stream NULL when reading");
954        }
955        while (true) { // loop will repeat until character found
956            int nchars;
957            nchars = istream.read(rcvBuffer, 0, 1);
958            if (nchars == -1) {
959                // No more bytes can be read from the channel
960                throw new IOException("Connection not terminated normally");
961            }
962            if (nchars > 0) {
963                return rcvBuffer[0];
964            }
965        }
966    }
967
968    // Defined this way to reduce new object creation
969    private byte[] rcvBuffer = new byte[1];
970
971    /**
972     * Get characters from the input source, and file a message.
973     * <p>
974     * Returns only when the message is complete.
975     * <p>
976     * Only used in the Receive thread.
977     * <p>
978     * Handles timeouts on read by ignoring zero-length reads.
979     *
980     * @param msg     message to fill
981     * @param istream character source.
982     * @throws IOException when presented by the input source.
983     */
984    protected void loadChars(AbstractMRReply msg, DataInputStream istream)
985            throws IOException {
986        int i;
987        for (i = 0; i < msg.maxSize(); i++) {
988            byte char1 = readByteProtected(istream);
989            log.trace("char: {} i: {}",(char1&0xFF),i);
990            // if there was a timeout, flush any char received and start over
991            if (flushReceiveChars) {
992                log.warn("timeout flushes receive buffer: {}", msg);
993                msg.flush();
994                i = 0;  // restart
995                flushReceiveChars = false;
996            }
997            if (canReceive()) {
998                msg.setElement(i, char1);
999                if (endOfMessage(msg)) {
1000                    break;
1001                }
1002            } else {
1003                i--; // flush char
1004                log.error("unsolicited character received: {}", Integer.toHexString(char1));
1005            }
1006        }
1007    }
1008
1009    /**
1010     * Override in the system specific code if necessary
1011     *
1012     * @return true if it is okay to buffer receive characters into a reply
1013     *         message. When false, discard char received
1014     */
1015    protected boolean canReceive() {
1016        return true;
1017    }
1018
1019    private int retransmitCount = 0;
1020
1021    /**
1022     * Executes a reply distribution action on the appropriate thread for JMRI.
1023     * @param r a runnable typically encapsulating a MRReply and the iteration code needed to
1024     *          send it to all the listeners.
1025     */
1026    protected void distributeReply(Runnable r) {
1027        try {
1028            if (synchronizeRx) {
1029                SwingUtilities.invokeAndWait(r);
1030            } else {
1031                SwingUtilities.invokeLater(r);
1032            }
1033        } catch (InterruptedException ie) {
1034            if(threadStopRequest) return;
1035            log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString());
1036        } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) {
1037            log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString());
1038            return;
1039        }
1040        log.debug("dispatch thread invoked");
1041    }
1042
1043    /**
1044     * Handle each reply when complete.
1045     * <p>
1046     * (This is public for testing purposes) Runs in the "Receive" thread.
1047     *
1048     * @throws java.io.IOException on error.
1049     */
1050    public void handleOneIncomingReply() throws IOException {
1051        // we sit in this until the message is complete, relying on
1052        // threading to let other stuff happen
1053
1054        // Create message off the right concrete class
1055        AbstractMRReply msg = newReply();
1056
1057        // wait for start if needed
1058        waitForStartOfReply(istream);
1059
1060        // message exists, now fill it
1061        loadChars(msg, istream);
1062
1063        if (threadStopRequest) return;
1064        
1065        // message is complete, dispatch it !!
1066        replyInDispatch = true;
1067        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);
1068
1069        // forward the message to the registered recipients,
1070        // which includes the communications monitor
1071        // return a notification via the Swing event queue to ensure proper thread
1072        Runnable r = new RcvNotifier(msg, mLastSender, this);
1073        distributeReply(r);
1074
1075        if (!msg.isUnsolicited()) {
1076            // effect on transmit:
1077            switch (mCurrentState) {
1078                case WAITMSGREPLYSTATE: {
1079                    // check to see if the response was an error message we want
1080                    // to automatically handle by re-queueing the last sent
1081                    // message, otherwise go on to the next message
1082                    if (msg.isRetransmittableErrorMsg()) {
1083                        log.error("Automatic Recovery from Error Message: {}.  Retransmitted {} times.", msg, retransmitCount);
1084                        synchronized (xmtRunnable) {
1085                            mCurrentState = AUTORETRYSTATE;
1086                            if (retransmitCount > 0) {
1087                                try {
1088                                    xmtRunnable.wait(retransmitCount * 100L);
1089                                } catch (InterruptedException e) {
1090                                    Thread.currentThread().interrupt(); // retain if needed later
1091                                }
1092                            }
1093                            replyInDispatch = false;
1094                            xmtRunnable.notify();
1095                            retransmitCount++;
1096                        }
1097                    } else {
1098                        // update state, and notify to continue
1099                        synchronized (xmtRunnable) {
1100                            mCurrentState = NOTIFIEDSTATE;
1101                            replyInDispatch = false;
1102                            xmtRunnable.notify();
1103                            retransmitCount = 0;
1104                        }
1105                    }
1106                    break;
1107                }
1108                case WAITREPLYINPROGMODESTATE: {
1109                    // entering programming mode
1110                    mCurrentMode = PROGRAMINGMODE;
1111                    replyInDispatch = false;
1112
1113                    // check to see if we need to delay to allow decoders to become
1114                    // responsive
1115                    int warmUpDelay = enterProgModeDelayTime();
1116                    if (warmUpDelay != 0) {
1117                        try {
1118                            synchronized (xmtRunnable) {
1119                                xmtRunnable.wait(warmUpDelay);
1120                            }
1121                        } catch (InterruptedException e) {
1122                            Thread.currentThread().interrupt(); // retain if needed later
1123                        }
1124                    }
1125                    // update state, and notify to continue
1126                    synchronized (xmtRunnable) {
1127                        mCurrentState = OKSENDMSGSTATE;
1128                        xmtRunnable.notify();
1129                    }
1130                    break;
1131                }
1132                case WAITREPLYINNORMMODESTATE: {
1133                    // entering normal mode
1134                    mCurrentMode = NORMALMODE;
1135                    replyInDispatch = false;
1136                    // update state, and notify to continue
1137                    synchronized (xmtRunnable) {
1138                        mCurrentState = OKSENDMSGSTATE;
1139                        xmtRunnable.notify();
1140                    }
1141                    break;
1142                }
1143                default: {
1144                    replyInDispatch = false;
1145                    if (allowUnexpectedReply) {
1146                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
1147                        synchronized (xmtRunnable) {
1148                            // The transmit thread sometimes gets stuck
1149                            // when unexpected replies are received.  Notify
1150                            // it to clear the block without a timeout.
1151                            // (do not change the current state)
1152                            //if(mCurrentState!=IDLESTATE)
1153                            xmtRunnable.notify();
1154                        }
1155                    } else {
1156                        unexpectedReplyStateError(mCurrentState, msg.toString());
1157                    }
1158                }
1159            }
1160            // Unsolicited message
1161        } else {
1162            log.debug("Unsolicited Message Received {}", msg);
1163
1164            replyInDispatch = false;
1165        }
1166    }
1167
1168    /**
1169     * Log an error message for a message received in an unexpected state.
1170     * @param State message state.
1171     * @param msgString message string.
1172     */
1173    protected void unexpectedReplyStateError(int State, String msgString) {
1174       String[] packages = this.getClass().getName().split("\\.");
1175       String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
1176                     +(packages.length>=1 ? packages[packages.length-1] :"");
1177       log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
1178    }
1179
1180    /**
1181     * for testing purposes, let us be able to find out
1182     * what the last sender was.
1183     * @return last sender, mLastSender.
1184     */
1185    public AbstractMRListener getLastSender() {
1186        return mLastSender;
1187    }
1188
1189    // Override the finalize method for this class
1190    // to request termination, which might have happened
1191    // before in any case
1192    /**
1193     * finalize deprecated in Java 9, but not yet removed
1194     * @deprecated since Java 9
1195     */
1196    @Override
1197    @Deprecated
1198    protected final void finalize() throws Throwable {
1199        terminate();
1200        super.finalize();
1201    }
1202
1203    protected void terminate() {
1204        log.debug("Cleanup Starts");
1205        if (ostream == null) {
1206            return;    // no connection established
1207        }
1208        AbstractMRMessage modeMsg = enterNormalMode();
1209        if (modeMsg != null) {
1210            modeMsg.setRetries(100); // set the number of retries
1211            // high, just in case the interface
1212            // is busy when we try to send
1213            forwardToPort(modeMsg, null);
1214            // wait for reply
1215            try {
1216                if (xmtRunnable != null) {
1217                    synchronized (xmtRunnable) {
1218                        xmtRunnable.wait(modeMsg.getTimeout());
1219                    }
1220                }
1221            } catch (InterruptedException e) {
1222                Thread.currentThread().interrupt(); // retain if needed later
1223                log.error("transmit interrupted");
1224            }
1225        }
1226    }
1227
1228    /**
1229     * Internal class to remember the Reply object and destination listener with
1230     * a reply is received.
1231     */
1232    protected static class RcvNotifier implements Runnable {
1233
1234        AbstractMRReply mMsg;
1235        AbstractMRListener mDest;
1236        AbstractMRTrafficController mTc;
1237
1238        public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1239                AbstractMRTrafficController pTc) {
1240            mMsg = pMsg;
1241            mDest = pDest;
1242            mTc = pTc;
1243        }
1244
1245        @Override
1246        public void run() {
1247            log.debug("Delayed rcv notify starts");
1248            mTc.notifyReply(mMsg, mDest);
1249        }
1250    } // end RcvNotifier
1251
1252    // allow creation of object outside package
1253    protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1254            AbstractMRTrafficController pTc) {
1255        return new RcvNotifier(pMsg, pDest, pTc);
1256    }
1257
1258    /**
1259     * Internal class to remember the Message object and destination listener
1260     * when a message is queued for notification.
1261     */
1262    protected static class XmtNotifier implements Runnable {
1263
1264        AbstractMRMessage mMsg;
1265        AbstractMRListener mDest;
1266        AbstractMRTrafficController mTc;
1267
1268        public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest,
1269                AbstractMRTrafficController pTc) {
1270            mMsg = pMsg;
1271            mDest = pDest;
1272            mTc = pTc;
1273        }
1274
1275        @Override
1276        public void run() {
1277            log.debug("Delayed xmt notify starts");
1278            mTc.notifyMessage(mMsg, mDest);
1279        }
1280    }  // end XmtNotifier
1281
1282    /**
1283     * Terminate the receive and transmit threads.
1284     * <p>
1285     * This is intended to be used only by testing subclasses.
1286     */
1287    public void terminateThreads() {
1288        threadStopRequest = true;
1289        if (xmtThread != null) {
1290            xmtThread.interrupt();
1291            try {
1292                xmtThread.join();
1293            } catch (InterruptedException ie){
1294                // interrupted during cleanup.
1295            }
1296        }
1297        
1298        if (rcvThread != null) {
1299            rcvThread.interrupt();
1300            try {
1301                rcvThread.join();
1302            } catch (InterruptedException ie){
1303                // interrupted during cleanup.
1304            }
1305        }    
1306        // we also need to remove the shutdown task. 
1307        InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask);
1308    }
1309    
1310    /**
1311     * Flag that threads should terminate as soon as they can.
1312     */
1313    protected volatile boolean threadStopRequest = false;
1314    
1315    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class);
1316
1317}