001package jmri.jmrix;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.IOException;
006import java.io.OutputStream;
007import java.util.*;
008import javax.swing.SwingUtilities;
009
010import jmri.InstanceManager;
011import jmri.ShutDownManager;
012import jmri.ShutDownTask;
013
014/**
015 * Abstract base for TrafficControllers in a Message/Reply protocol.
016 * <p>
017 * Two threads are used for the actual communication. The "Transmit" thread
018 * handles pushing characters to the port, and also changing the mode. The
019 * "Receive" thread converts characters from the input stream into replies.
020 * <p>
 * The constructor registers a shutdown task that
 * triggers the necessary cleanup code.
023 * <p>
024 * The internal state machine handles changes of mode, automatic retry of
025 * certain messages, time outs, and sending poll messages when otherwise idle.
026 * <p>
027 * "Mode" refers to the state of the command station communications. "Normal"
028 * and "Programming" are the two modes, used if the command station requires
029 * messages to go back and forth between them. <br>
030 *
031 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram">
032 *
033 * <p>
034 * The key methods for the basic operation are:
035 * <ul>
036 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)}
037 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data
038 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs
039 *      information on e.g. the last message sent, that can be stored in member variables
040 *      by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}.
041 *  <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects
042 * </ul>
043 * <p>
044 * If your command station requires messages to go in and out of
045 * "programming mode", those should be provided by
046 * {@link #enterProgMode()} and {@link #enterNormalMode()}.
047 * <p>
048 * If you want to poll for information when the line is otherwise idle,
049 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}.
050 *
051 * @author Bob Jacobsen Copyright (C) 2003
052 * @author Paul Bender Copyright (C) 2004-2010
053 */
054
055/*
056@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png
057
058    [*] --> IDLESTATE
059    IDLESTATE --> NOTIFIEDSTATE : sendMessage()
060    NOTIFIEDSTATE --> IDLESTATE : queue empty
061
062    NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message
063
064    WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE
065    WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE
066
067    WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply()
068
069    WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE
070    WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE
071    OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message
072
073    IDLESTATE --> POLLSTATE : transmitLoop()\nno work
074    POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it
075    POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send
076
077    WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply
078    AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message
079
080NOTIFIEDSTATE : Transmit thread wakes up and processes
081POLLSTATE : Transient while deciding to send poll
082OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change
083AUTORETRYSTATE : Transient while deciding to resend auto-retry message
084WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply
085WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply
086WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout
087
088Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry
089Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change.
090
091@enduml
092 */
093
094public abstract class AbstractMRTrafficController {
095
096    private final Runnable shutDownTask = this::terminate; // retain for possible removal.
097
098    /**
099     * Create a new unnamed MRTrafficController.
100     */
101    public AbstractMRTrafficController() {
102        log.debug("Creating AbstractMRTrafficController instance");
103        mCurrentMode = NORMALMODE;
104        mCurrentState = IDLESTATE;
105        allowUnexpectedReply = false;
106
107
108        // We use a shutdown task here to make sure the connection is left
109        // in a clean state prior to exiting.  This is required on systems
110        // which have a service mode to ensure we don't leave the system
111        // in an unusable state. Once the shutdown task executes, the connection
112        // must be considered permanently closed.
113
114        InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask);
115    }
116
117    private boolean synchronizeRx = true;
118
119    protected void setSynchronizeRx(boolean val) {
120        synchronizeRx = val;
121    }
122
123    protected boolean getSynchronizeRx() {
124        return synchronizeRx;
125    }
126
127    // The methods to implement the abstract Interface
128
129    protected final Vector<AbstractMRListener> cmdListeners = new Vector<>();
130
131    protected synchronized void addListener(AbstractMRListener l) {
132        // add only if not already registered
133        if (l == null) {
134            throw new NullPointerException();
135        }
136        if (!cmdListeners.contains(l)) {
137            cmdListeners.addElement(l);
138        }
139    }
140
141    protected synchronized void removeListener(AbstractMRListener l) {
142        if (cmdListeners.contains(l)) {
143            cmdListeners.removeElement(l);
144        }
145    }
146
147    /**
148     * Forward a Message to registered listeners.
149     *
150     * @param m     Message to be forwarded intact
151     * @param notMe One (optional) listener to be skipped, usually because it's
152     *              the originating object.
153     */
154    @SuppressWarnings("unchecked")
155    protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
156        // make a copy of the listener vector to synchronized not needed for transmit
157        Vector<AbstractMRListener> v;
158        synchronized (this) {
159            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
160            v = (Vector<AbstractMRListener>) cmdListeners.clone();
161        }
162        // forward to all listeners
163        int cnt = v.size();
164        for (int i = 0; i < cnt; i++) {
165            AbstractMRListener client = v.elementAt(i);
166            if (notMe != client) {
167                log.debug("notify message, client: {}", client);
168                try {
169                    forwardMessage(client, m);
170                } catch (RuntimeException e) {
171                    log.warn("notify: During message dispatch to {}", client, e);
172                }
173            }
174        }
175    }
176
177    /**
178     * Implement this to forward a specific message type to a protocol-specific
179     * listener interface.
180     * This puts the casting into the concrete class.
181     * @param client abstract listener.
182     * @param m message to forward.
183     */
184    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);
185
186    /**
187     * Invoked if it's appropriate to do low-priority polling of the command
188     * station, this should return the next message to send, or null if the
189     * TrafficController should just sleep.
190     * @return Formatted poll message
191     */
192    protected abstract AbstractMRMessage pollMessage();
193
194    protected abstract AbstractMRListener pollReplyHandler();
195
196    protected AbstractMRListener mLastSender = null;
197
198    protected volatile int mCurrentMode;
199    public static final int NORMALMODE = 1;
200    public static final int PROGRAMINGMODE = 4;
201
202    /**
203     * Set the system to programming mode.
204     * @see #enterNormalMode()
205     *
206     * @return any message that needs to be returned to the Command Station
207     * to change modes. If no message is needed, returns null.
208     */
209    protected abstract AbstractMRMessage enterProgMode();
210
211    /**
212     * Sets the system to normal mode during programming while in IDLESTATE.
213     * If {@link #programmerIdle()} returns true, enterNormalMode() is
214     * called after a timeout.
215     * @see #enterProgMode()
216     *
217     * @return any message that needs to be returned to the Command Station
218     * to change modes. If no message is needed, returns null.
219     */
220    protected abstract AbstractMRMessage enterNormalMode();
221
222    /**
223     * Check if the programmer is idle.
224     * Override in the system specific code if necessary (see notes for
225     * {@link #enterNormalMode()}.
226     *
227     * @return true if not busy programming
228     */
229    protected boolean programmerIdle() {
230        return true;
231    }
232
233    /**
234     * Get the delay (wait time) after enabling the programming track.
235     * Override in subclass to add a longer delay.
236     *
237     * @return 0 as default delay
238     */
239    protected int enterProgModeDelayTime() {
240        return 0;
241    }
242
243    protected volatile int mCurrentState;
244    public static final int IDLESTATE = 10;        // nothing happened
245    public static final int NOTIFIEDSTATE = 15;    // xmt notified, will next wake
246    public static final int WAITMSGREPLYSTATE = 25;  // xmt has sent, await reply to message
247    public static final int WAITREPLYINPROGMODESTATE = 30;  // xmt has done mode change, await reply
248    public static final int WAITREPLYINNORMMODESTATE = 35;  // xmt has done mode change, await reply
249    public static final int OKSENDMSGSTATE = 40;        // mode change reply here, send original msg
250    public static final int AUTORETRYSTATE = 45;        // received message where automatic recovery may occur with a retransmission, re-send original msg
251    public static final int POLLSTATE = 50;   // Send program mode or poll message
252
253    protected boolean allowUnexpectedReply;
254
255    /**
256     * Set whether the command station may send messages without a request
257     * sent to it.
258     *
259     * @param expected true to allow messages without a prior request
260     */
261    protected void setAllowUnexpectedReply(boolean expected) {
262        allowUnexpectedReply = expected;
263    }
264
265    /**
266     * Forward a "Reply" from layout to registered listeners.
267     *
268     * @param r    Reply to be forwarded intact
269     * @param dest One (optional) listener to be skipped, usually because it's
270     *             the originating object.
271     */
272    @SuppressWarnings("unchecked")
273    protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
274        // make a copy of the listener vector to synchronized (not needed for transmit?)
275        Vector<AbstractMRListener> v;
276        synchronized (this) {
277            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
278            v = (Vector<AbstractMRListener>) cmdListeners.clone();
279        }
280        // forward to all listeners
281        int cnt = v.size();
282        for (int i = 0; i < cnt; i++) {
283            AbstractMRListener client = v.elementAt(i);
284            log.debug("notify reply, client: {}", client);
285            try {
286                //skip dest for now, we'll send the message to there last.
287                if (dest != client) {
288                    forwardReply(client, r);
289                }
290            } catch (RuntimeException e) {
291                log.warn("notify: During reply dispatch to {}", client, e);
292            }
293        }
294
295        // forward to the last listener who sent a message
296        // this is done _second_ so monitoring can have already stored the reply
297        // before a response is sent
298        if (dest != null) {
299            log.debug("notify reply, dest: {}", dest);
300            forwardReply(dest, r);
301        }
302    }
303
304    protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m);
305
306    /**
307     * Messages to be transmitted.
308     */
309    protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>();
310    protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>();
311
312    /**
313     * Forward message to the port. Messages are queued and then the
314     * transmission thread is notified.
315     * @see #forwardToPort(AbstractMRMessage, AbstractMRListener)
316     *
317     * @param m the message to send
318     * @param reply the Listener sending the message, often provided as 'this'
319     */
320    protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) {
321        msgQueue.addLast(m);
322        listenerQueue.addLast(reply);
323        synchronized (xmtRunnable) {
324            if (mCurrentState == IDLESTATE) {
325                mCurrentState = NOTIFIEDSTATE;
326                xmtRunnable.notify();
327            }
328        }
329        if (m != null) {
330            log.debug("just notified transmit thread with message {}", m);
331        }
332    }
333
334    /**
335     * Permanent loop for the transmit thread.
336     */
337    protected void transmitLoop() {
338        log.debug("transmitLoop starts in {}", this);
339
340        // loop forever
341        while (!connectionError && !threadStopRequest) {
342            AbstractMRMessage m = null;
343            AbstractMRListener l = null;
344            // check for something to do
345            synchronized (this) {
346                if (!msgQueue.isEmpty()) {
347                    // yes, something to do
348                    m = msgQueue.getFirst();
349                    msgQueue.removeFirst();
350                    l = listenerQueue.getFirst();
351                    listenerQueue.removeFirst();
352                    mCurrentState = WAITMSGREPLYSTATE;
353                    log.debug("transmit loop has something to do: {}", m);
354                }  // release lock here to proceed in parallel
355            }
356            // if a message has been extracted, process it
357            if (m != null) {
358                // check for need to change mode
359                log.debug("Start msg, state = {}", mCurrentMode);
360                if (m.getNeededMode() != mCurrentMode) {
361                    AbstractMRMessage modeMsg;
362                    if (m.getNeededMode() == PROGRAMINGMODE) {
363                        // change state to programming mode and send message
364                        modeMsg = enterProgMode();
365                        if (modeMsg != null) {
366                            mCurrentState = WAITREPLYINPROGMODESTATE;
367                            log.debug("Enter Programming Mode");
368                            forwardToPort(modeMsg, null);
369                            // wait for reply
370                            transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted");
371                        }
372                    } else {
373                        // change state to normal and send message
374                        modeMsg = enterNormalMode();
375                        if (modeMsg != null) {
376                            mCurrentState = WAITREPLYINNORMMODESTATE;
377                            log.debug("Enter Normal Mode");
378                            forwardToPort(modeMsg, null);
379                            // wait for reply
380                            transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted");
381                        }
382                    }
383                    if (modeMsg != null) {
384                        checkReplyInDispatch();
385                        if (mCurrentState != OKSENDMSGSTATE) {
386                            handleTimeout(modeMsg, l);
387                        }
388                        mCurrentState = WAITMSGREPLYSTATE;
389                    } else {
390                        // no mode message required, but the message
391                        // needs a different mode
392                        log.debug("Setting mode to: {}", m.getNeededMode());
393                        mCurrentMode = m.getNeededMode();
394                    }
395                }
396                forwardToPort(m, l);
397                // reply expected?
398                if (m.replyExpected()) {
399                    log.debug("reply expected is true for message {}",m);
400                    // wait for a reply, or eventually timeout
401                    transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted");
402                    checkReplyInDispatch();
403                    if (mCurrentState == WAITMSGREPLYSTATE) {
404                        handleTimeout(m, l);
405                    } else if (mCurrentState == AUTORETRYSTATE) {
406                        log.info("Message added back to queue: {}", m);
407                        msgQueue.addFirst(m);
408                        listenerQueue.addFirst(l);
409                        synchronized (xmtRunnable) {
410                            mCurrentState = IDLESTATE;
411                        }
412                    } else {
413                        resetTimeout(m);
414                    }
415                } // just continue to the next message from here
416            } else {
417                // nothing to do
418                if (mCurrentState != IDLESTATE) {
419                    log.debug("Setting IDLESTATE");
420                    log.debug("Current Mode {}", mCurrentMode);
421                    mCurrentState = IDLESTATE;
422                }
423                // wait for something to send
424                if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) {
425                    try {
426                        long startTime = Calendar.getInstance().getTimeInMillis();
427                        synchronized (xmtRunnable) {
428                            xmtRunnable.wait(mWaitBeforePoll);
429                        }
430                        long endTime = Calendar.getInstance().getTimeInMillis();
431                        waitTimePoll = waitTimePoll + endTime - startTime;
432                    } catch (InterruptedException e) {
433                        Thread.currentThread().interrupt(); // retain if needed later
434                        // end of transmit loop
435                        break;
436                    }
437                }
438                // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee
439                // the change of mCurrentState to one of the waiting for reply states.  Therefore we need to synchronize.
440                synchronized (this) {
441                    if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) {
442                        log.error("left timeout in unexpected state: {}", mCurrentState);
443                    }
444                    if (mCurrentState == IDLESTATE) {
445                        mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE
446                    }
447                }
448                // went around with nothing to do; leave programming state if in it
449                if (mCurrentMode == PROGRAMINGMODE) {
450                    log.debug("Timeout - in service mode");
451                }
452                if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) {
453                    log.debug("timeout causes leaving programming mode");
454                    mCurrentState = WAITREPLYINNORMMODESTATE;
455                    AbstractMRMessage msg = enterNormalMode();
456                    // if the enterNormalMode() message is null, we
457                    // don't want to try to send it to the port.
458                    if (msg != null) {
459                        forwardToPort(msg, null);
460                        // wait for reply
461                        transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode");
462                        checkReplyInDispatch();
463                        // exit program mode timeout?
464                        if (mCurrentState == WAITREPLYINNORMMODESTATE) {
465                            // entering normal mode via timeout
466                            handleTimeout(msg, l);
467                            mCurrentMode = NORMALMODE;
468                        }
469                        // and go around again
470                    }
471                } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) {
472                    // We may need to poll
473                    AbstractMRMessage msg = pollMessage();
474                    if (msg != null) {
475                        // yes, send that
476                        log.debug("Sending poll, wait time {}", waitTimePoll);
477                        mCurrentState = WAITMSGREPLYSTATE;
478                        forwardToPort(msg, pollReplyHandler());
479                        // wait for reply
480                        log.debug("Still waiting for reply");
481                        transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply");
482                        checkReplyInDispatch();
483                        // and go around again
484                        if (mCurrentState == WAITMSGREPLYSTATE) {
485                            handleTimeout(msg, l);
486                        } else {
487                            resetTimeout(msg);
488                        }
489                    }
490                    waitTimePoll = 0;
491                }
492                // no messages, so back to idle
493                if (mCurrentState == POLLSTATE) {
494                    mCurrentState = IDLESTATE;
495                }
496            }
497        }
498    }   // end of transmit loop; go around again
499
500    protected void transmitWait(int waitTime, int state, String interruptMessage) {
501        // wait() can have spurious wakeup!
502        // so we protect by making sure the entire timeout time is used
503        long currentTime = Calendar.getInstance().getTimeInMillis();
504        long endTime = currentTime + waitTime;
505        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
506            long wait = endTime - currentTime;
507            try {
508                synchronized (xmtRunnable) {
509                    // Do not wait if the current state has changed since we
510                    // last set it.
511                    if (mCurrentState != state) {
512                        return;
513                    }
514                    xmtRunnable.wait(wait); // rcvr normally ends this w state change
515                }
516            } catch (InterruptedException e) {
517                Thread.currentThread().interrupt(); // retain if needed later
518                String[] packages = this.getClass().getName().split("\\.");
519                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
520                        +(packages.length>=1 ? packages[packages.length-1] :"");
521                if (!threadStopRequest) {
522                    log.error("{} in transmitWait(..) of {}", interruptMessage, name);
523                } else {
524                    log.debug("during shutdown, {}  in transmitWait(..) of {}", interruptMessage, name);
525                }
526            }
527        }
528        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
529    }
530
531    // Dispatch control and timer
532    protected boolean replyInDispatch = false;          // true when reply has been received but dispatch not completed
533    private int maxDispatchTime = 0;
534    private int warningMessageTime = DISPATCH_WARNING_TIME;
535    private static final int DISPATCH_WAIT_INTERVAL = 100;
536    private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded
537    private static final int WARN_NEXT_TIME = 1000;         // report every second
538
539    private void checkReplyInDispatch() {
540        int loopCount = 0;
541        while (replyInDispatch) {
542            try {
543                synchronized (xmtRunnable) {
544                    xmtRunnable.wait(DISPATCH_WAIT_INTERVAL);
545                }
546            } catch (InterruptedException e) {
547                Thread.currentThread().interrupt(); // retain if needed later
548                if (threadStopRequest) return; // don't log an error if closing.
549                String[] packages = this.getClass().getName().split("\\.");
550                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
551                        +(packages.length>=1 ? packages[packages.length-1] :"");
552                log.error("transmitLoop interrupted in class {}", name);
553            }
554            loopCount++;
555            int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL;
556            if (currentDispatchTime > maxDispatchTime) {
557                maxDispatchTime = currentDispatchTime;
558                if (currentDispatchTime >= warningMessageTime) {
559                    warningMessageTime = warningMessageTime + WARN_NEXT_TIME;
560                    log.debug("Max dispatch time is now {}", currentDispatchTime);
561                }
562            }
563        }
564    }
565
566    /**
567     *  Determine if the interface is down.
568     *
569     *  @return timeoutFlag
570     */
571    public boolean hasTimeouts() {
572        return timeoutFlag;
573    }
574
575    private boolean timeoutFlag = false;
576    private int timeouts = 0;
577    protected boolean flushReceiveChars = false;
578
579    protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
580        //log.debug("Timeout mCurrentState: {}", mCurrentState);
581        String[] packages = this.getClass().getName().split("\\.");
582        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
583                +(packages.length>=1 ? packages[packages.length-1] :"");
584
585        log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name);
586        timeouts++;
587        timeoutFlag = true;
588        flushReceiveChars = true;
589    }
590
591    protected void resetTimeout(AbstractMRMessage msg) {
592        if (timeouts > 0) {
593            log.debug("Reset timeout after {} timeouts", timeouts);
594        }
595        timeouts = 0;
596        timeoutFlag = false;
597    }
598
599    /**
600     * Add header to the outgoing byte stream.
601     *
602     * @param msg the output byte stream
603     * @param m Message results
604     * @return next location in the stream to fill
605     */
606    protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) {
607        return 0;
608    }
609
610    protected int mWaitBeforePoll = 100;
611    protected long waitTimePoll = 0;
612
613    /**
614     * Add trailer to the outgoing byte stream.
615     *
616     * @param msg    the output byte stream
617     * @param offset the first byte not yet used
618     * @param m   output message to extend
619     */
620    protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) {
621        if (!m.isBinary()) {
622            msg[offset] = 0x0d;
623        }
624    }
625
626    /**
627     * Determine how many bytes the entire message will take, including
628     * space for header and trailer.
629     *
630     * @param m the message to be sent
631     * @return number of bytes
632     */
633    protected int lengthOfByteStream(AbstractMRMessage m) {
634        int len = m.getNumDataElements();
635        int cr = 0;
636        if (!m.isBinary()) {
637            cr = 1;  // space for return char
638        }
639        return len + cr;
640    }
641
642    protected boolean xmtException = false;
643
644    /**
645     * Actually transmit the next message to the port.
646     * @see #sendMessage(AbstractMRMessage, AbstractMRListener)
647     *
648     * @param m the message to send
649     * @param reply the Listener sending the message, often provided as 'this'
650     */
651    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"},
652            justification = "Two locks needed for synchronization here, this is OK")
653    protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
654        log.debug("forwardToPort message: [{}]", m);
655        // remember who sent this
656        mLastSender = reply;
657
658        // forward the message to the registered recipients,
659        // which includes the communications monitor, except the sender.
660        // Schedule notification via the Swing event queue to ensure order
661        Runnable r = new XmtNotifier(m, mLastSender, this);
662        SwingUtilities.invokeLater(r);
663
664        // stream to port in single write, as that's needed by serial
665        int byteLength = lengthOfByteStream(m);
666        byte[]  msg= new byte[byteLength];
667        log.debug("copying message, length = {}", byteLength);
668        // add header
669        int offset = addHeaderToOutput(msg, m);
670
671        // add data content
672        int len = m.getNumDataElements();
673        log.debug("copying data to message, length = {}", len);
674        if (len > byteLength) { // happens somehow
675            log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len);
676        }
677        for (int i = 0; (i < len && i < byteLength); i++) {
678            msg[i + offset] = (byte) m.getElement(i);
679        }
680        // add trailer
681        addTrailerToOutput(msg, len + offset, m);
682        // and stream the bytes
683        try {
684            if (ostream != null) {
685                if (log.isDebugEnabled()) {
686                    StringBuilder f = new StringBuilder();
687                    for (int i = 0; i < msg.length; i++) {
688                        f.append(String.format("%02X ",0xFF & msg[i]));
689                    }
690                    log.debug("formatted message: {}", f.toString() );
691                }
692                while (m.getRetries() >= 0) {
693                    if (portReadyToSend(controller)) {
694                        ostream.write(msg);
695                        ostream.flush();
696                        log.debug("written, msg timeout: {} mSec", m.getTimeout());
697                        break;
698                    } else if (m.getRetries() >= 0) {
699                        log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries());
700                        m.setRetries(m.getRetries() - 1);
701                        try {
702                            synchronized (xmtRunnable) {
703                                xmtRunnable.wait(m.getTimeout());
704                            }
705                        } catch (InterruptedException e) {
706                            Thread.currentThread().interrupt(); // retain if needed later
707                            log.error("retry wait interrupted");
708                        }
709                    } else {
710                        log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg));
711                    }
712                }
713            } else {  // ostream is null
714                // no stream connected
715                connectionWarn();
716            }
717        } catch (IOException | RuntimeException e) {
718            // TODO Currently there's no port recovery if an exception occurs
719            // must restart JMRI to clear xmtException.
720            xmtException = true;
721            portWarn(e);
722        }
723    }
724
725    protected void connectionWarn() {
726        log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception());
727    }
728
729    protected void portWarn(Exception e) {
730        log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e);
731    }
732
    /**
     * Set by {@link #portWarnTCP(Exception)}; cleared again by
     * {@link #connectPort(AbstractPortController)}.
     */
    protected boolean connectionError = false;

    /**
     * Log a warning for a failure on a network connection and
     * record it in {@link #connectionError}.
     *
     * @param e the exception that was encountered
     */
    protected void portWarnTCP(Exception e) {
        log.warn("Exception java net: ", e);
        connectionError = true;
    }
739    // methods to connect/disconnect to a source of data in an AbstractPortController
740
    /**
     * The port controller currently connected. Set by
     * {@link #connectPort(AbstractPortController)} and reset to null by
     * {@link #disconnectPort(AbstractPortController)}.
     */
    public AbstractPortController controller = null;
742
743    public boolean status() {
744        return (ostream != null && istream != null);
745    }
746
    // The transmit and receive threads, created and started by connectPort
    protected volatile Thread xmtThread = null;
    protected volatile Thread rcvThread = null;

    // The Runnable executed by the transmit thread; it is also the monitor
    // object that the wait/notify calls in this class synchronize on
    protected volatile Runnable xmtRunnable = null;
751
752    /**
753     * Make connection to an existing PortController object.
754     *
755     * @param p the PortController
756     */
757    public void connectPort(AbstractPortController p) {
758        rcvException = false;
759        connectionError = false;
760        xmtException = false;
761        threadStopRequest = false;
762        try {
763            istream = p.getInputStream();
764            ostream = p.getOutputStream();
765            if (controller != null) {
766                log.warn("connectPort: connect called while connected");
767            } else {
768                log.debug("connectPort invoked");
769            }
770            controller = p;
771            // and start threads
772            xmtThread = jmri.util.ThreadingUtil.newThread(
773                xmtRunnable = new Runnable() {
774                    @Override
775                    public void run() {
776                        try {
777                            transmitLoop();
778                        } catch (ThreadDeath td) {
779                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td);
780                            // ThreadDeath must be thrown per Java API Javadocs
781                            throw td;
782                        } catch (Throwable e) {
783                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e);
784                        }
785                }
786            });
787
788            String[] packages = this.getClass().getName().split("\\.");
789            xmtThread.setName(
790                (packages.length>=2 ? packages[packages.length-2]+"." :"")
791                +(packages.length>=1 ? packages[packages.length-1] :"")
792                +" Transmit thread");
793
794            xmtThread.setDaemon(true);
795            xmtThread.setPriority(Thread.MAX_PRIORITY-1);      //bump up the priority
796            xmtThread.start();
797
798            rcvThread = jmri.util.ThreadingUtil.newThread(
799                new Runnable() {
800                    @Override
801                    public void run() {
802                        receiveLoop();
803                    }
804                });
805            rcvThread.setName(
806                (packages.length>=2 ? packages[packages.length-2]+"." :"")
807                +(packages.length>=1 ? packages[packages.length-1] :"")
808                +" Receive thread");
809
810            rcvThread.setPriority(Thread.MAX_PRIORITY);      //bump up the priority
811            rcvThread.setDaemon(true);
812            rcvThread.start();
813
814        } catch (RuntimeException e) {
815            log.error("Failed to start up communications. Error was: ", e);
816            log.debug("Full trace:", e);
817        }
818    }
819
    /**
     * Get the port name for this connection from the TrafficController.
     * <p>
     * Delegates to the connected port controller, so this throws a
     * NullPointerException while no controller is connected (before
     * {@link #connectPort(AbstractPortController)} or after
     * {@link #disconnectPort(AbstractPortController)}).
     *
     * @return the name of the port
     */
    public String getPortName() {
        return controller.getCurrentPortName();
    }
828
829    /**
830     * Break connection to existing PortController object. Once broken, attempts
831     * to send via "message" member will fail.
832     *
833     * @param p the PortController
834     */
835    public void disconnectPort(AbstractPortController p) {
836        istream = null;
837        ostream = null;
838        if (controller != p) {
839            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
840        }
841        controller = null;
842        threadStopRequest = true;
843    }
844
845    /**
846     * Check if PortController object can be sent to.
847     *
848     * @param p the PortController
849     * @return true if ready, false otherwise May throw an Exception.
850     */
851    public boolean portReadyToSend(AbstractPortController p) {
852        if (p != null && !xmtException && !rcvException) {
853            return true;
854        } else {
855            return false;
856        }
857    }
858
    // data members to hold the streams; assigned in connectPort and
    // reset to null in disconnectPort
    protected DataInputStream istream = null;
    protected OutputStream ostream = null;

    // set by receiveLoop when receiving has failed; cleared in connectPort
    // and consulted by portReadyToSend
    protected boolean rcvException = false;

    // number of consecutive RuntimeExceptions receiveLoop accepts before it gives up
    protected int maxRcvExceptionCount = 100;
866
867    /**
868     * Handle incoming characters. This is a permanent loop, looking for input
869     * messages in character form on the stream connected to the PortController
870     * via {@link #connectPort(AbstractPortController)}.
871     * <p>
872     * Each turn of the loop is the receipt of a single message.
873     */
874    public void receiveLoop() {
875        log.debug("receiveLoop starts in {}", this);
876        int errorCount = 0;
877        while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception
878            try {
879                handleOneIncomingReply();
880                errorCount = 0;
881            } catch (java.io.InterruptedIOException e) {
882                // related to InterruptedException, catch first
883                break;
884            } catch (IOException e) {
885                rcvException = true;
886                reportReceiveLoopException(e);
887                break;
888            } catch (RuntimeException e1) {
889                log.error("Exception in receive loop: {}", e1.toString(), e1);
890                errorCount++;
891                if (errorCount == maxRcvExceptionCount) {
892                    rcvException = true;
893                    reportReceiveLoopException(e1);
894                }
895            }
896        }
897        if (!threadStopRequest) { // if e.g. unexpected end
898            ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN);
899            log.error("Exit from rcv loop in {}", this.getClass());
900            recovery(); // see if you can restart
901        }
902    }
903
904    /**
905     * Disconnect and reset the current PortController.
906     * Invoked at abnormal ending of receiveLoop.
907     */
908    protected final void recovery() {
909        AbstractPortController adapter = controller;
910        disconnectPort(controller);
911        adapter.recover();
912    }
913
914    /**
915     * Report an error on the receive loop. Separated so tests can suppress, even
916     * though message is asynchronous.
917     * @param e Exception encountered at lower level to trigger error, or null
918     */
919    protected void reportReceiveLoopException(Exception e) {
920        log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e);
921        jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN);
922        if (controller instanceof AbstractNetworkPortController) {
923            portWarnTCP(e);
924        }
925    }
926
    /**
     * Create an empty reply message, of the proper concrete type, to be
     * filled with incoming data.
     *
     * @return a new, empty reply
     */
    protected abstract AbstractMRReply newReply();

    /**
     * Decide whether the reply being assembled is complete. Called by
     * {@link #loadChars(AbstractMRReply, DataInputStream)} after each
     * character is stored; returning true ends the loading of that reply.
     *
     * @param r the reply as received so far
     * @return true if the reply is complete
     */
    protected abstract boolean endOfMessage(AbstractMRReply r);
930
    /**
     * Dummy routine, to be filled by protocols that have to skip some
     * start-of-message characters. This default implementation does nothing.
     * It is called by {@link #handleOneIncomingReply()} before the reply
     * is loaded.
     *
     * @param istream input source
     * @throws IOException from underlying operations
     */
    protected void waitForStartOfReply(DataInputStream istream) throws IOException {
    }
939
940    /**
941     * Read a single byte, protecting against various timeouts, etc.
942     * <p>
943     * When a port is set to have a receive timeout (via the
944     * {@link purejavacomm.SerialPort#enableReceiveTimeout(int)} method), some will return
945     * zero bytes or an EOFException at the end of the timeout. In that case, the read
946     * should be repeated to get the next real character.
947     *
948     * @param istream stream to read
949     * @return the byte read
950     * @throws java.io.IOException if unable to read
951     */
952    protected byte readByteProtected(DataInputStream istream) throws IOException {
953        if (istream == null) {
954            throw new IOException("Input Stream NULL when reading");
955        }
956        while (true) { // loop will repeat until character found
957            int nchars;
958            nchars = istream.read(rcvBuffer, 0, 1);
959            if (nchars == -1) {
960                // No more bytes can be read from the channel
961                throw new IOException("Connection not terminated normally");
962            }
963            if (nchars > 0) {
964                return rcvBuffer[0];
965            }
966        }
967    }
968
    // One-byte buffer that readByteProtected reads into.
    // Defined this way to reduce new object creation
    private byte[] rcvBuffer = new byte[1];
971
972    /**
973     * Get characters from the input source, and file a message.
974     * <p>
975     * Returns only when the message is complete.
976     * <p>
977     * Only used in the Receive thread.
978     * <p>
979     * Handles timeouts on read by ignoring zero-length reads.
980     *
981     * @param msg     message to fill
982     * @param istream character source.
983     * @throws IOException when presented by the input source.
984     */
985    protected void loadChars(AbstractMRReply msg, DataInputStream istream)
986            throws IOException {
987        int i;
988        for (i = 0; i < msg.maxSize(); i++) {
989            byte char1 = readByteProtected(istream);
990            log.trace("char: {} i: {}",(char1&0xFF),i);
991            // if there was a timeout, flush any char received and start over
992            if (flushReceiveChars) {
993                log.warn("timeout flushes receive buffer: {}", msg);
994                msg.flush();
995                i = 0;  // restart
996                flushReceiveChars = false;
997            }
998            if (canReceive()) {
999                msg.setElement(i, char1);
1000                if (endOfMessage(msg)) {
1001                    break;
1002                }
1003            } else {
1004                i--; // flush char
1005                log.error("unsolicited character received: {}", Integer.toHexString(char1));
1006            }
1007        }
1008    }
1009
    /**
     * Override in the system specific code if necessary. This default
     * implementation always returns true.
     *
     * @return true if it is okay to buffer receive characters into a reply
     *         message. When false, discard char received
     */
    protected boolean canReceive() {
        return true;
    }
1019
    // Number of automatic retransmissions since the last regular reply;
    // handleOneIncomingReply uses it to scale the delay before the next retry
    private int retransmitCount = 0;
1021
1022    /**
1023     * Executes a reply distribution action on the appropriate thread for JMRI.
1024     * @param r a runnable typically encapsulating a MRReply and the iteration code needed to
1025     *          send it to all the listeners.
1026     */
1027    protected void distributeReply(Runnable r) {
1028        try {
1029            if (synchronizeRx) {
1030                SwingUtilities.invokeAndWait(r);
1031            } else {
1032                SwingUtilities.invokeLater(r);
1033            }
1034        } catch (InterruptedException ie) {
1035            if (threadStopRequest) return;
1036            log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString());
1037        } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) {
1038            log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString());
1039            return;
1040        }
1041        log.debug("dispatch thread invoked");
1042    }
1043
    /**
     * Handle each reply when complete.
     * <p>
     * Reads one complete reply from the input stream, hands it to the
     * listeners via {@link #distributeReply(Runnable)} and then, unless the
     * reply is unsolicited, advances the transmit state machine and notifies
     * the monitor {@code xmtRunnable}.
     * <p>
     * (This is public for testing purposes) Runs in the "Receive" thread.
     *
     * @throws java.io.IOException on error.
     */
    public void handleOneIncomingReply() throws IOException {
        // we sit in this until the message is complete, relying on
        // threading to let other stuff happen

        // Create message off the right concrete class
        AbstractMRReply msg = newReply();

        // wait for start if needed
        waitForStartOfReply(istream);

        // message exists, now fill it
        loadChars(msg, istream);

        // a stop was requested while reading: drop the reply
        if (threadStopRequest) return;

        // message is complete, dispatch it !!
        replyInDispatch = true;
        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);

        // forward the message to the registered recipients,
        // which includes the communications monitor
        // return a notification via the Swing event queue to ensure proper thread
        Runnable r = new RcvNotifier(msg, mLastSender, this);
        distributeReply(r);

        if (!msg.isUnsolicited()) {
            // effect on transmit:
            switch (mCurrentState) {
                case WAITMSGREPLYSTATE: {
                    // check to see if the response was an error message we want
                    // to automatically handle by re-queueing the last sent
                    // message, otherwise go on to the next message
                    if (msg.isRetransmittableErrorMsg()) {
                        log.error("Automatic Recovery from Error Message: {}.  Retransmitted {} times.", msg, retransmitCount);
                        synchronized (xmtRunnable) {
                            mCurrentState = AUTORETRYSTATE;
                            // from the second consecutive retry on, pause first;
                            // the pause grows by 100 ms with every retry
                            if (retransmitCount > 0) {
                                try {
                                    xmtRunnable.wait(retransmitCount * 100L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // retain if needed later
                                }
                            }
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount++;
                        }
                    } else {
                        // update state, and notify to continue
                        synchronized (xmtRunnable) {
                            mCurrentState = NOTIFIEDSTATE;
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            // a regular reply ends the retry sequence
                            retransmitCount = 0;
                        }
                    }
                    break;
                }
                case WAITREPLYINPROGMODESTATE: {
                    // entering programming mode
                    mCurrentMode = PROGRAMINGMODE;
                    replyInDispatch = false;

                    // check to see if we need to delay to allow decoders to become
                    // responsive
                    int warmUpDelay = enterProgModeDelayTime();
                    if (warmUpDelay != 0) {
                        try {
                            synchronized (xmtRunnable) {
                                xmtRunnable.wait(warmUpDelay);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // retain if needed later
                        }
                    }
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                case WAITREPLYINNORMMODESTATE: {
                    // entering normal mode
                    mCurrentMode = NORMALMODE;
                    replyInDispatch = false;
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                default: {
                    // a solicited reply arrived in a state that was not waiting for one
                    replyInDispatch = false;
                    if (allowUnexpectedReply) {
                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
                        synchronized (xmtRunnable) {
                            // The transmit thread sometimes gets stuck
                            // when unexpected replies are received.  Notify
                            // it to clear the block without a timeout.
                            // (do not change the current state)
                            //if(mCurrentState!=IDLESTATE)
                            xmtRunnable.notify();
                        }
                    } else {
                        unexpectedReplyStateError(mCurrentState, msg.toString());
                    }
                }
            }
        } else {
            // Unsolicited message: no effect on the transmit state
            log.debug("Unsolicited Message Received {}", msg);

            replyInDispatch = false;
        }
    }
1168
1169    /**
1170     * Log an error message for a message received in an unexpected state.
1171     * @param State message state.
1172     * @param msgString message string.
1173     */
1174    protected void unexpectedReplyStateError(int State, String msgString) {
1175       String[] packages = this.getClass().getName().split("\\.");
1176       String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
1177                     +(packages.length>=1 ? packages[packages.length-1] :"");
1178       log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
1179    }
1180
    /**
     * Get the listener recorded as the last sender.
     * Provided for testing purposes.
     *
     * @return last sender, mLastSender.
     */
    public AbstractMRListener getLastSender() {
        return mLastSender;
    }
1189
1190    protected void terminate() {
1191        log.debug("Cleanup Starts");
1192        if (ostream == null) {
1193            return;    // no connection established
1194        }
1195        AbstractMRMessage modeMsg = enterNormalMode();
1196        if (modeMsg != null) {
1197            modeMsg.setRetries(100); // set the number of retries
1198            // high, just in case the interface
1199            // is busy when we try to send
1200            forwardToPort(modeMsg, null);
1201            // wait for reply
1202            try {
1203                if (xmtRunnable != null) {
1204                    synchronized (xmtRunnable) {
1205                        xmtRunnable.wait(modeMsg.getTimeout());
1206                    }
1207                }
1208            } catch (InterruptedException e) {
1209                Thread.currentThread().interrupt(); // retain if needed later
1210                log.error("transmit interrupted");
1211            }
1212        }
1213    }
1214
1215    /**
1216     * Internal class to remember the Reply object and destination listener with
1217     * a reply is received.
1218     */
1219    protected static class RcvNotifier implements Runnable {
1220
1221        AbstractMRReply mMsg;
1222        AbstractMRListener mDest;
1223        AbstractMRTrafficController mTc;
1224
1225        public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1226                AbstractMRTrafficController pTc) {
1227            mMsg = pMsg;
1228            mDest = pDest;
1229            mTc = pTc;
1230        }
1231
1232        @Override
1233        public void run() {
1234            log.debug("Delayed rcv notify starts");
1235            mTc.notifyReply(mMsg, mDest);
1236        }
1237    } // end RcvNotifier
1238
    /**
     * Create a {@link RcvNotifier}; allows creation of the object outside
     * this package.
     *
     * @param pMsg  the reply to deliver
     * @param pDest the listener passed along with the reply
     * @param pTc   the traffic controller doing the notification
     * @return a new RcvNotifier for the given arguments
     */
    protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
            AbstractMRTrafficController pTc) {
        return new RcvNotifier(pMsg, pDest, pTc);
    }
1244
1245    /**
1246     * Internal class to remember the Message object and destination listener
1247     * when a message is queued for notification.
1248     */
1249    protected static class XmtNotifier implements Runnable {
1250
1251        AbstractMRMessage mMsg;
1252        AbstractMRListener mDest;
1253        AbstractMRTrafficController mTc;
1254
1255        public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest,
1256                AbstractMRTrafficController pTc) {
1257            mMsg = pMsg;
1258            mDest = pDest;
1259            mTc = pTc;
1260        }
1261
1262        @Override
1263        public void run() {
1264            log.debug("Delayed xmt notify starts");
1265            mTc.notifyMessage(mMsg, mDest);
1266        }
1267    }  // end XmtNotifier
1268
1269    /**
1270     * Terminate the receive and transmit threads.
1271     * <p>
1272     * This is intended to be used only by testing subclasses.
1273     */
1274    public void terminateThreads() {
1275        threadStopRequest = true;
1276        if (xmtThread != null) {
1277            xmtThread.interrupt();
1278            try {
1279                xmtThread.join();
1280            } catch (InterruptedException ie){
1281                // interrupted during cleanup.
1282            }
1283        }
1284
1285        if (rcvThread != null) {
1286            rcvThread.interrupt();
1287            try {
1288                rcvThread.join();
1289            } catch (InterruptedException ie){
1290                // interrupted during cleanup.
1291            }
1292        }
1293        // we also need to remove the shutdown task.
1294        InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask);
1295    }
1296
    /**
     * Flag that threads should terminate as soon as they can.
     * <p>
     * Set by {@link #disconnectPort(AbstractPortController)} and
     * {@link #terminateThreads()}; cleared by
     * {@link #connectPort(AbstractPortController)}.
     */
    protected volatile boolean threadStopRequest = false;

    // logger shared by this class and its nested notifier classes
    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class);
1303
1304}