001package jmri.jmrix;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.IOException;
006import java.io.OutputStream;
007import java.util.*;
008
009import javax.annotation.Nonnull;
010import javax.swing.SwingUtilities;
011
012import jmri.InstanceManager;
013import jmri.ShutDownManager;
014
015/**
016 * Abstract base for TrafficControllers in a Message/Reply protocol.
017 * <p>
018 * Two threads are used for the actual communication. The "Transmit" thread
019 * handles pushing characters to the port, and also changing the mode. The
020 * "Receive" thread converts characters from the input stream into replies.
021 * <p>
 * The constructor registers a shutdown task to
 * trigger the necessary cleanup code.
024 * <p>
025 * The internal state machine handles changes of mode, automatic retry of
026 * certain messages, time outs, and sending poll messages when otherwise idle.
027 * <p>
028 * "Mode" refers to the state of the command station communications. "Normal"
029 * and "Programming" are the two modes, used if the command station requires
030 * messages to go back and forth between them. <br>
031 *
032 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram">
033 *
034 * <p>
035 * The key methods for the basic operation are:
036 * <ul>
037 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)}
038 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data
039 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs
040 *      information on e.g. the last message sent, that can be stored in member variables
041 *      by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}.
042 *  <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects
043 * </ul>
044 * <p>
045 * If your command station requires messages to go in and out of
046 * "programming mode", those should be provided by
047 * {@link #enterProgMode()} and {@link #enterNormalMode()}.
048 * <p>
049 * If you want to poll for information when the line is otherwise idle,
050 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}.
051 *
052 * @author Bob Jacobsen Copyright (C) 2003
053 * @author Paul Bender Copyright (C) 2004-2010
054 */
055
056/*
057@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png
058
059    [*] --> IDLESTATE
060    IDLESTATE --> NOTIFIEDSTATE : sendMessage()
061    NOTIFIEDSTATE --> IDLESTATE : queue empty
062
063    NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message
064
065    WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE
066    WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE
067
068    WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply()
069
070    WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE
071    WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE
072    OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message
073
074    IDLESTATE --> POLLSTATE : transmitLoop()\nno work
075    POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it
076    POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send
077
078    WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply
079    AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message
080
081NOTIFIEDSTATE : Transmit thread wakes up and processes
082POLLSTATE : Transient while deciding to send poll
083OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change
084AUTORETRYSTATE : Transient while deciding to resend auto-retry message
085WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply
086WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply
087WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout
088
089Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry
090Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change.
091
092@enduml
093 */
094
095public abstract class AbstractMRTrafficController {
096
    // Shutdown hook bound to terminate(); the constructor registers this same
    // instance with the ShutDownManager.
    private final Runnable shutDownTask = this::terminate; // retain for possible removal.
098
099    /**
100     * Create a new unnamed MRTrafficController.
101     */
102    public AbstractMRTrafficController() {
103        log.debug("Creating AbstractMRTrafficController instance");
104        mCurrentMode = NORMALMODE;
105        mCurrentState = IDLESTATE;
106        allowUnexpectedReply = false;
107
108
109        // We use a shutdown task here to make sure the connection is left
110        // in a clean state prior to exiting.  This is required on systems
111        // which have a service mode to ensure we don't leave the system
112        // in an unusable state. Once the shutdown task executes, the connection
113        // must be considered permanently closed.
114
115        InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask);
116    }
117
118    private boolean synchronizeRx = true;
119
120    protected void setSynchronizeRx(boolean val) {
121        synchronizeRx = val;
122    }
123
124    protected boolean getSynchronizeRx() {
125        return synchronizeRx;
126    }
127
128    // The methods to implement the abstract Interface
129
    // Registered listeners in notification order: console listeners are
    // inserted at the front, all other listeners are appended.
    protected final Vector<AbstractMRListener> cmdListeners = new Vector<>();
131
132    /**
133     * Add a Listener to the Listener list.
134     * @param l The Listener to be added, not null.
135     */
136    protected synchronized void addListener(AbstractMRListener l) {
137        // add only if not already registered
138        if (l == null) {
139            throw new NullPointerException();
140        }
141        if (!cmdListeners.contains(l)) {
142            cmdListeners.addElement(l);
143        }
144    }
145
146    /**
147     * Add a Listener to start of the Listener list.
148     * Intended for use only by system Consoles which may prefer notification
149     * before other objects have processed a Message and sent a Reply.
150     * @param l The Listener to be added, not null.
151     */
152    protected synchronized void addConsoleListener(@Nonnull AbstractMRListener l){
153        // add only if not already registered
154        if (!cmdListeners.contains(l)) {
155            cmdListeners.insertElementAt(l, 0);
156        }
157    }
158
159    /**
160     * Remove a Listener from the Listener list.
161     * The Listener will receive no further notifications.
162     * @param l The Listener to be removed.
163     */
164    protected synchronized void removeListener(AbstractMRListener l) {
165        if (cmdListeners.contains(l)) {
166            cmdListeners.removeElement(l);
167        }
168    }
169
170    /**
171     * Forward a Message to registered listeners.
172     *
173     * @param m     Message to be forwarded intact
174     * @param notMe One (optional) listener to be skipped, usually because it's
175     *              the originating object.
176     */
177    @SuppressWarnings("unchecked")
178    protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
179        // make a copy of the listener vector to synchronized not needed for transmit
180        Vector<AbstractMRListener> v;
181        synchronized (this) {
182            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
183            v = (Vector<AbstractMRListener>) cmdListeners.clone();
184        }
185        // forward to all listeners
186        int cnt = v.size();
187        for (int i = 0; i < cnt; i++) {
188            AbstractMRListener client = v.elementAt(i);
189            if (notMe != client) {
190                log.debug("notify message, client: {}", client);
191                try {
192                    forwardMessage(client, m);
193                } catch (RuntimeException e) {
194                    log.warn("notify: During message dispatch to {}", client, e);
195                }
196            }
197        }
198    }
199
    /**
     * Implement this to forward a specific message type to a protocol-specific
     * listener interface.
     * This puts the casting into the concrete class.
     * Called by {@link #notifyMessage(AbstractMRMessage, AbstractMRListener)}
     * once for each listener to be notified.
     *
     * @param client abstract listener.
     * @param m message to forward.
     */
    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);
208
    /**
     * Invoked if it's appropriate to do low-priority polling of the command
     * station, this should return the next message to send, or null if the
     * TrafficController should just sleep.
     * The transmit loop calls this when it has nothing queued and the
     * current mode is {@link #NORMALMODE}.
     *
     * @return Formatted poll message, or null if there is nothing to poll
     */
    protected abstract AbstractMRMessage pollMessage();
216
    /**
     * Provide the listener that accompanies a poll message: the transmit loop
     * passes it to {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}
     * together with the message returned by {@link #pollMessage()}.
     *
     * @return listener to associate with poll messages
     */
    protected abstract AbstractMRListener pollReplyHandler();
218
    // Listener supplied with the most recent forwardToPort(...) call; may be null.
    protected AbstractMRListener mLastSender = null;

    // Current command station mode, one of NORMALMODE or PROGRAMINGMODE.
    protected volatile int mCurrentMode;
    public static final int NORMALMODE = 1;
    public static final int PROGRAMINGMODE = 4;
224
    /**
     * Set the system to programming mode.
     * The transmit loop calls this when the next message needs
     * {@link #PROGRAMINGMODE} and the controller is in a different mode.
     * @see #enterNormalMode()
     *
     * @return any message that needs to be sent to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterProgMode();
233
    /**
     * Set the system to normal mode.
     * The transmit loop calls this when the next message needs a mode other
     * than {@link #PROGRAMINGMODE} while a different mode is current, and
     * also when it finds nothing to send while in programming mode and
     * {@link #programmerIdle()} returns true.
     * @see #enterProgMode()
     *
     * @return any message that needs to be sent to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterNormalMode();
244
    /**
     * Check if the programmer is idle.
     * Override in the system specific code if necessary (see notes for
     * {@link #enterNormalMode()}).
     * This default implementation always reports idle.
     *
     * @return true if not busy programming
     */
    protected boolean programmerIdle() {
        return true;
    }
255
    /**
     * Get the delay (wait time) after enabling the programming track.
     * Override in subclass to add a longer delay.
     * NOTE(review): the unit is not visible in this section; presumably
     * milliseconds - confirm against the caller.
     *
     * @return 0 as default delay
     */
    protected int enterProgModeDelayTime() {
        return 0;
    }
265
    // Current state of the transmit state machine; holds one of the *STATE
    // constants below. Written by sendMessage() and the transmit loop in this
    // section.
    protected volatile int mCurrentState;
    public static final int IDLESTATE = 10;        // nothing happened
    public static final int NOTIFIEDSTATE = 15;    // xmt notified, will next wake
    public static final int WAITMSGREPLYSTATE = 25;  // xmt has sent, await reply to message
    public static final int WAITREPLYINPROGMODESTATE = 30;  // xmt has done mode change, await reply
    public static final int WAITREPLYINNORMMODESTATE = 35;  // xmt has done mode change, await reply
    public static final int OKSENDMSGSTATE = 40;        // mode change reply here, send original msg
    public static final int AUTORETRYSTATE = 45;        // received message where automatic recovery may occur with a retransmission, re-send original msg
    public static final int POLLSTATE = 50;   // Send program mode or poll message

    // True when the command station may send messages without a prior
    // request; initialised to false by the constructor.
    protected boolean allowUnexpectedReply;
277
278    /**
279     * Set whether the command station may send messages without a request
280     * sent to it.
281     *
282     * @param expected true to allow messages without a prior request
283     */
284    protected void setAllowUnexpectedReply(boolean expected) {
285        allowUnexpectedReply = expected;
286    }
287
288    /**
289     * Forward a "Reply" from layout to registered listeners.
290     *
291     * @param r    Reply to be forwarded intact
292     * @param dest One (optional) listener to be skipped, usually because it's
293     *             the originating object.
294     */
295    @SuppressWarnings("unchecked")
296    protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
297        // make a copy of the listener vector to synchronized (not needed for transmit?)
298        Vector<AbstractMRListener> v;
299        synchronized (this) {
300            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
301            v = (Vector<AbstractMRListener>) cmdListeners.clone();
302        }
303        // forward to all listeners
304        int cnt = v.size();
305        for (int i = 0; i < cnt; i++) {
306            AbstractMRListener client = v.elementAt(i);
307            log.debug("notify reply, client: {}", client);
308            try {
309                //skip dest for now, we'll send the message to there last.
310                if (dest != client) {
311                    forwardReply(client, r);
312                }
313            } catch (RuntimeException e) {
314                log.warn("notify: During reply dispatch to {}", client, e);
315            }
316        }
317
318        // forward to the last listener who sent a message
319        // this is done _second_ so monitoring can have already stored the reply
320        // before a response is sent
321        if (dest != null) {
322            log.debug("notify reply, dest: {}", dest);
323            forwardReply(dest, r);
324        }
325    }
326
    /**
     * Implement this to forward a specific reply type to a protocol-specific
     * listener interface.
     * Called by {@link #notifyReply(AbstractMRReply, AbstractMRListener)}
     * once for each listener to be notified.
     *
     * @param client abstract listener.
     * @param m reply to forward.
     */
    protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m);
328
    /**
     * Messages to be transmitted.
     * Entries are added by sendMessage() and removed by the transmit loop.
     */
    protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>();
    // Listener for each queued message, kept in the same order as msgQueue:
    // both queues are always added to and removed from together.
    protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>();
334
335    /**
336     * Forward message to the port. Messages are queued and then the
337     * transmission thread is notified.
338     * @see #forwardToPort(AbstractMRMessage, AbstractMRListener)
339     *
340     * @param m the message to send
341     * @param reply the Listener sending the message, often provided as 'this'
342     */
343    protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) {
344        msgQueue.addLast(m);
345        listenerQueue.addLast(reply);
346        synchronized (xmtRunnable) {
347            if (mCurrentState == IDLESTATE) {
348                mCurrentState = NOTIFIEDSTATE;
349                xmtRunnable.notify();
350            }
351        }
352        if (m != null) {
353            log.debug("just notified transmit thread with message {}", m);
354        }
355    }
356
357    /**
358     * Permanent loop for the transmit thread.
359     */
360    protected void transmitLoop() {
361        log.debug("transmitLoop starts in {}", this);
362
363        // loop forever
364        while (!connectionError && !threadStopRequest) {
365            AbstractMRMessage m = null;
366            AbstractMRListener l = null;
367            // check for something to do
368            synchronized (this) {
369                if (!msgQueue.isEmpty()) {
370                    // yes, something to do
371                    m = msgQueue.getFirst();
372                    msgQueue.removeFirst();
373                    l = listenerQueue.getFirst();
374                    listenerQueue.removeFirst();
375                    mCurrentState = WAITMSGREPLYSTATE;
376                    log.debug("transmit loop has something to do: {}", m);
377                }  // release lock here to proceed in parallel
378            }
379            // if a message has been extracted, process it
380            if (m != null) {
381                // check for need to change mode
382                log.debug("Start msg, state = {}", mCurrentMode);
383                if (m.getNeededMode() != mCurrentMode) {
384                    AbstractMRMessage modeMsg;
385                    if (m.getNeededMode() == PROGRAMINGMODE) {
386                        // change state to programming mode and send message
387                        modeMsg = enterProgMode();
388                        if (modeMsg != null) {
389                            mCurrentState = WAITREPLYINPROGMODESTATE;
390                            log.debug("Enter Programming Mode");
391                            forwardToPort(modeMsg, null);
392                            // wait for reply
393                            transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted");
394                        }
395                    } else {
396                        // change state to normal and send message
397                        modeMsg = enterNormalMode();
398                        if (modeMsg != null) {
399                            mCurrentState = WAITREPLYINNORMMODESTATE;
400                            log.debug("Enter Normal Mode");
401                            forwardToPort(modeMsg, null);
402                            // wait for reply
403                            transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted");
404                        }
405                    }
406                    if (modeMsg != null) {
407                        checkReplyInDispatch();
408                        if (mCurrentState != OKSENDMSGSTATE) {
409                            handleTimeout(modeMsg, l);
410                        }
411                        mCurrentState = WAITMSGREPLYSTATE;
412                    } else {
413                        // no mode message required, but the message
414                        // needs a different mode
415                        log.debug("Setting mode to: {}", m.getNeededMode());
416                        mCurrentMode = m.getNeededMode();
417                    }
418                }
419                forwardToPort(m, l);
420                // reply expected?
421                if (m.replyExpected()) {
422                    log.debug("reply expected is true for message {}",m);
423                    // wait for a reply, or eventually timeout
424                    transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted");
425                    checkReplyInDispatch();
426                    if (mCurrentState == WAITMSGREPLYSTATE) {
427                        handleTimeout(m, l);
428                    } else if (mCurrentState == AUTORETRYSTATE) {
429                        log.info("Message added back to queue: {}", m);
430                        msgQueue.addFirst(m);
431                        listenerQueue.addFirst(l);
432                        synchronized (xmtRunnable) {
433                            mCurrentState = IDLESTATE;
434                        }
435                    } else {
436                        resetTimeout(m);
437                    }
438                } // just continue to the next message from here
439            } else {
440                // nothing to do
441                if (mCurrentState != IDLESTATE) {
442                    log.debug("Setting IDLESTATE");
443                    log.debug("Current Mode {}", mCurrentMode);
444                    mCurrentState = IDLESTATE;
445                }
446                // wait for something to send
447                if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) {
448                    try {
449                        long startTime = Calendar.getInstance().getTimeInMillis();
450                        synchronized (xmtRunnable) {
451                            xmtRunnable.wait(mWaitBeforePoll);
452                        }
453                        long endTime = Calendar.getInstance().getTimeInMillis();
454                        waitTimePoll = waitTimePoll + endTime - startTime;
455                    } catch (InterruptedException e) {
456                        Thread.currentThread().interrupt(); // retain if needed later
457                        // end of transmit loop
458                        break;
459                    }
460                }
461                // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee
462                // the change of mCurrentState to one of the waiting for reply states.  Therefore we need to synchronize.
463                synchronized (this) {
464                    if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) {
465                        log.error("left timeout in unexpected state: {}", mCurrentState);
466                    }
467                    if (mCurrentState == IDLESTATE) {
468                        mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE
469                    }
470                }
471                // went around with nothing to do; leave programming state if in it
472                if (mCurrentMode == PROGRAMINGMODE) {
473                    log.debug("Timeout - in service mode");
474                }
475                if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) {
476                    log.debug("timeout causes leaving programming mode");
477                    mCurrentState = WAITREPLYINNORMMODESTATE;
478                    AbstractMRMessage msg = enterNormalMode();
479                    // if the enterNormalMode() message is null, we
480                    // don't want to try to send it to the port.
481                    if (msg != null) {
482                        forwardToPort(msg, null);
483                        // wait for reply
484                        transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode");
485                        checkReplyInDispatch();
486                        // exit program mode timeout?
487                        if (mCurrentState == WAITREPLYINNORMMODESTATE) {
488                            // entering normal mode via timeout
489                            handleTimeout(msg, l);
490                            mCurrentMode = NORMALMODE;
491                        }
492                        // and go around again
493                    }
494                } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) {
495                    // We may need to poll
496                    AbstractMRMessage msg = pollMessage();
497                    if (msg != null) {
498                        // yes, send that
499                        log.debug("Sending poll, wait time {}", waitTimePoll);
500                        mCurrentState = WAITMSGREPLYSTATE;
501                        forwardToPort(msg, pollReplyHandler());
502                        // wait for reply
503                        log.debug("Still waiting for reply");
504                        transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply");
505                        checkReplyInDispatch();
506                        // and go around again
507                        if (mCurrentState == WAITMSGREPLYSTATE) {
508                            handleTimeout(msg, l);
509                        } else {
510                            resetTimeout(msg);
511                        }
512                    }
513                    waitTimePoll = 0;
514                }
515                // no messages, so back to idle
516                if (mCurrentState == POLLSTATE) {
517                    mCurrentState = IDLESTATE;
518                }
519            }
520        }
521    }   // end of transmit loop; go around again
522
523    protected void transmitWait(int waitTime, int state, String interruptMessage) {
524        // wait() can have spurious wakeup!
525        // so we protect by making sure the entire timeout time is used
526        long currentTime = Calendar.getInstance().getTimeInMillis();
527        long endTime = currentTime + waitTime;
528        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
529            long wait = endTime - currentTime;
530            try {
531                synchronized (xmtRunnable) {
532                    // Do not wait if the current state has changed since we
533                    // last set it.
534                    if (mCurrentState != state) {
535                        return;
536                    }
537                    xmtRunnable.wait(wait); // rcvr normally ends this w state change
538                }
539            } catch (InterruptedException e) {
540                Thread.currentThread().interrupt(); // retain if needed later
541                String[] packages = this.getClass().getName().split("\\.");
542                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
543                        +(packages.length>=1 ? packages[packages.length-1] :"");
544                if (!threadStopRequest) {
545                    log.error("{} in transmitWait(..) of {}", interruptMessage, name);
546                } else {
547                    log.debug("during shutdown, {}  in transmitWait(..) of {}", interruptMessage, name);
548                }
549            }
550        }
551        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
552    }
553
    // Dispatch control and timer
    protected boolean replyInDispatch = false;          // true when reply has been received but dispatch not completed
    private int maxDispatchTime = 0;                    // longest dispatch wait seen so far, in the units of DISPATCH_WAIT_INTERVAL
    private int warningMessageTime = DISPATCH_WARNING_TIME; // next threshold at which a new maximum is reported
    private static final int DISPATCH_WAIT_INTERVAL = 100;  // length of one wait in checkReplyInDispatch(), passed to wait()
    private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded
    private static final int WARN_NEXT_TIME = 1000;         // report every second
561
562    private void checkReplyInDispatch() {
563        int loopCount = 0;
564        while (replyInDispatch) {
565            try {
566                synchronized (xmtRunnable) {
567                    xmtRunnable.wait(DISPATCH_WAIT_INTERVAL);
568                }
569            } catch (InterruptedException e) {
570                Thread.currentThread().interrupt(); // retain if needed later
571                if (threadStopRequest) return; // don't log an error if closing.
572                String[] packages = this.getClass().getName().split("\\.");
573                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
574                        +(packages.length>=1 ? packages[packages.length-1] :"");
575                log.error("transmitLoop interrupted in class {}", name);
576            }
577            loopCount++;
578            int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL;
579            if (currentDispatchTime > maxDispatchTime) {
580                maxDispatchTime = currentDispatchTime;
581                if (currentDispatchTime >= warningMessageTime) {
582                    warningMessageTime = warningMessageTime + WARN_NEXT_TIME;
583                    log.debug("Max dispatch time is now {}", currentDispatchTime);
584                }
585            }
586        }
587    }
588
    /**
     *  Determine if the interface is down.
     *  The flag is set by {@link #handleTimeout(AbstractMRMessage, AbstractMRListener)}
     *  and cleared by {@link #resetTimeout(AbstractMRMessage)}.
     *
     *  @return timeoutFlag, true if the most recent wait for a reply timed out
     */
    public boolean hasTimeouts() {
        return timeoutFlag;
    }
597
    // true after a timeout until resetTimeout() clears it
    protected boolean timeoutFlag = false;
    // number of timeouts counted since the last resetTimeout()
    protected int timeouts = 0;
    // set to true by handleTimeout().
    // NOTE(review): the reader of this flag is outside this section;
    // presumably the receive side discards pending input - confirm.
    protected boolean flushReceiveChars = false;
601
602    protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
603        //log.debug("Timeout mCurrentState: {}", mCurrentState);
604        warnOnTimeout(msg, l);
605        timeouts++;
606        timeoutFlag = true;
607        flushReceiveChars = true;
608    }
609
610    protected void warnOnTimeout(AbstractMRMessage msg, AbstractMRListener l) {
611        String[] packages = this.getClass().getName().split("\\.");
612        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
613                +(packages.length>=1 ? packages[packages.length-1] :"");
614
615        log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name);
616    }
617    
618    protected void resetTimeout(AbstractMRMessage msg) {
619        if (timeouts > 0) {
620            log.debug("Reset timeout after {} timeouts", timeouts);
621        }
622        timeouts = 0;
623        timeoutFlag = false;
624    }
625
    /**
     * Add header to the outgoing byte stream.
     * This default implementation adds nothing and returns 0; forwardToPort
     * uses the returned value as the offset at which it starts copying the
     * message data.
     *
     * @param msg the output byte stream
     * @param m Message results
     * @return next location in the stream to fill
     */
    protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) {
        return 0;
    }
636
    // time the transmit loop waits on its monitor when nothing is queued,
    // passed to wait(), i.e. milliseconds
    protected int mWaitBeforePoll = 100;
    // wait time accumulated by the transmit loop since the last poll
    // attempt; reset to 0 after each one
    protected long waitTimePoll = 0;
639
640    /**
641     * Add trailer to the outgoing byte stream.
642     *
643     * @param msg    the output byte stream
644     * @param offset the first byte not yet used
645     * @param m   output message to extend
646     */
647    protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) {
648        if (!m.isBinary()) {
649            msg[offset] = 0x0d;
650        }
651    }
652
653    /**
654     * Determine how many bytes the entire message will take, including
655     * space for header and trailer.
656     *
657     * @param m the message to be sent
658     * @return number of bytes
659     */
660    protected int lengthOfByteStream(AbstractMRMessage m) {
661        int len = m.getNumDataElements();
662        int cr = 0;
663        if (!m.isBinary()) {
664            cr = 1;  // space for return char
665        }
666        return len + cr;
667    }
668
    // NOTE(review): not written in this section; presumably set when
    // transmitting to the port fails - confirm against forwardToPort.
    protected boolean xmtException = false;
670
671    /**
672     * Actually transmit the next message to the port.
673     * @see #sendMessage(AbstractMRMessage, AbstractMRListener)
674     *
675     * @param m the message to send
676     * @param reply the Listener sending the message, often provided as 'this'
677     */
678    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"},
679            justification = "Two locks needed for synchronization here, this is OK")
680    protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
681        log.debug("forwardToPort message: [{}]", m);
682        // remember who sent this
683        mLastSender = reply;
684
685        // forward the message to the registered recipients,
686        // which includes the communications monitor, except the sender.
687        // Schedule notification via the Swing event queue to ensure order
688        log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback"));
689        Runnable r = new XmtNotifier(m, mLastSender, this);
690        SwingUtilities.invokeLater(r);
691
692        // stream to port in single write, as that's needed by serial
693        int byteLength = lengthOfByteStream(m);
694        byte[]  msg= new byte[byteLength];
695        log.debug("copying message, length = {}", byteLength);
696        // add header
697        int offset = addHeaderToOutput(msg, m);
698
699        // add data content
700        int len = m.getNumDataElements();
701        log.debug("copying data to message, length = {}", len);
702        if (len > byteLength) { // happens somehow
703            log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len);
704        }
705        for (int i = 0; (i < len && i < byteLength); i++) {
706            msg[i + offset] = (byte) m.getElement(i);
707        }
708        // add trailer
709        addTrailerToOutput(msg, len + offset, m);
710        // and stream the bytes
711        try {
712            if (ostream != null) {
713                if (log.isDebugEnabled()) {
714                    StringBuilder f = new StringBuilder();
715                    for (int i = 0; i < msg.length; i++) {
716                        f.append(String.format("%02X ",0xFF & msg[i]));
717                    }
718                    log.debug("formatted message: {}", f.toString() );
719                }
720                while (m.getRetries() >= 0) {
721                    if (portReadyToSend(controller)) {
722                        ostream.write(msg);
723                        ostream.flush();
724                        log.debug("written, msg timeout: {} mSec", m.getTimeout());
725                        break;
726                    } else if (m.getRetries() >= 0) {
727                        log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries());
728                        m.setRetries(m.getRetries() - 1);
729                        try {
730                            synchronized (xmtRunnable) {
731                                xmtRunnable.wait(m.getTimeout());
732                            }
733                        } catch (InterruptedException e) {
734                            Thread.currentThread().interrupt(); // retain if needed later
735                            log.error("retry wait interrupted");
736                        }
737                    } else {
738                        log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg));
739                    }
740                }
741            } else {  // ostream is null
742                // no stream connected
743                connectionWarn();
744            }
745        } catch (IOException | RuntimeException e) {
746            // TODO Currently there's no port recovery if an exception occurs
747            // must restart JMRI to clear xmtException.
748            xmtException = true;
749            portWarn(e);
750        }
751    }
752
753    protected void connectionWarn() {
754        log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception());
755    }
756
757    protected void portWarn(Exception e) {
758        log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e);
759    }
760
761    protected boolean connectionError = false;
762
763    protected void portWarnTCP(Exception e) {
764        log.warn("Exception java net: ", e);
765        connectionError = true;
766    }
    // methods to connect/disconnect to a source of data in an AbstractPortController

    // The port controller this traffic controller is connected to.
    // Assigned by connectPort and set back to null by disconnectPort.
    public AbstractPortController controller = null;
770
771    public boolean status() {
772        return (ostream != null && istream != null);
773    }
774
    // Transmit and receive threads; created and started by connectPort.
    protected volatile Thread xmtThread = null;
    protected volatile Thread rcvThread = null;

    // The Runnable executed by the transmit thread (assigned in connectPort).
    // It also serves as the monitor object this class waits on and notifies
    // to hand state changes over to the transmit side.
    protected volatile Runnable xmtRunnable = null;
779
780    /**
781     * Make connection to an existing PortController object.
782     *
783     * @param p the PortController
784     */
785    public void connectPort(AbstractPortController p) {
786        rcvException = false;
787        connectionError = false;
788        xmtException = false;
789        threadStopRequest = false;
790        try {
791            istream = p.getInputStream();
792            ostream = p.getOutputStream();
793            if (controller != null) {
794                log.warn("connectPort: connect called while connected");
795            } else {
796                log.debug("connectPort invoked");
797            }
798            controller = p;
799            // and start threads
800            xmtThread = jmri.util.ThreadingUtil.newThread(
801                xmtRunnable = new Runnable() {
802                    @Override
803                    public void run() {
804                        try {
805                            transmitLoop();
806                        } catch (ThreadDeath td) {
807                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td);
808                            // ThreadDeath must be thrown per Java API Javadocs
809                            throw td;
810                        } catch (Throwable e) {
811                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e);
812                        }
813                }
814            });
815
816            String[] packages = this.getClass().getName().split("\\.");
817            xmtThread.setName(
818                (packages.length>=2 ? packages[packages.length-2]+"." :"")
819                +(packages.length>=1 ? packages[packages.length-1] :"")
820                +" Transmit thread");
821
822            xmtThread.setDaemon(true);
823            xmtThread.setPriority(Thread.MAX_PRIORITY-1);      //bump up the priority
824            xmtThread.start();
825
826            rcvThread = jmri.util.ThreadingUtil.newThread(
827                new Runnable() {
828                    @Override
829                    public void run() {
830                        receiveLoop();
831                    }
832                });
833            rcvThread.setName(
834                (packages.length>=2 ? packages[packages.length-2]+"." :"")
835                +(packages.length>=1 ? packages[packages.length-1] :"")
836                +" Receive thread");
837
838            rcvThread.setPriority(Thread.MAX_PRIORITY);      //bump up the priority
839            rcvThread.setDaemon(true);
840            rcvThread.start();
841
842        } catch (RuntimeException e) {
843            log.error("Failed to start up communications. Error was: ", e);
844            log.debug("Full trace:", e);
845        }
846    }
847
848    /**
849     * Get the port name for this connection from the TrafficController.
850     *
851     * @return the name of the port
852     */
853    public String getPortName() {
854        return controller.getCurrentPortName();
855    }
856
857    /**
858     * Break connection to existing PortController object. Once broken, attempts
859     * to send via "message" member will fail.
860     *
861     * @param p the PortController
862     */
863    public void disconnectPort(AbstractPortController p) {
864        istream = null;
865        ostream = null;
866        if (controller != p) {
867            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
868        }
869        controller = null;
870        threadStopRequest = true;
871    }
872
873    /**
874     * Check if PortController object can be sent to.
875     *
876     * @param p the PortController
877     * @return true if ready, false otherwise May throw an Exception.
878     */
879    public boolean portReadyToSend(AbstractPortController p) {
880        if (p != null && !xmtException && !rcvException) {
881            return true;
882        } else {
883            return false;
884        }
885    }
886
    // data members to hold the streams
    // (taken from the port controller in connectPort, set to null by disconnectPort)
    protected DataInputStream istream = null;
    protected OutputStream ostream = null;

    // Set to true by receiveLoop when it gives up because of an IOException or
    // too many consecutive RuntimeExceptions; reset to false by connectPort.
    // While true, portReadyToSend reports the port as not ready.
    protected boolean rcvException = false;

    // Number of consecutive RuntimeExceptions receiveLoop tolerates before it
    // flags rcvException and leaves the loop.
    protected int maxRcvExceptionCount = 100;
894
895    /**
896     * Handle incoming characters. This is a permanent loop, looking for input
897     * messages in character form on the stream connected to the PortController
898     * via {@link #connectPort(AbstractPortController)}.
899     * <p>
900     * Each turn of the loop is the receipt of a single message.
901     */
902    public void receiveLoop() {
903        log.debug("receiveLoop starts in {}", this);
904        int errorCount = 0;
905        while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception
906            try {
907                handleOneIncomingReply();
908                errorCount = 0;
909            } catch (java.io.InterruptedIOException e) {
910                // related to InterruptedException, catch first
911                break;
912            } catch (IOException e) {
913                rcvException = true;
914                reportReceiveLoopException(e);
915                break;
916            } catch (RuntimeException e1) {
917                log.error("Exception in receive loop: {}", e1.toString(), e1);
918                errorCount++;
919                if (errorCount == maxRcvExceptionCount) {
920                    rcvException = true;
921                    reportReceiveLoopException(e1);
922                }
923            }
924        }
925        if (!threadStopRequest) { // if e.g. unexpected end
926            ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN);
927            log.debug("Exit from rcv loop in {}", this.getClass());
928            log.info("Exiting receive loop");
929            recovery(); // see if you can restart
930        }
931    }
932
933    /**
934     * Disconnect and reset the current PortController.
935     * Invoked at abnormal ending of receiveLoop.
936     */
937    protected final void recovery() {
938        AbstractPortController adapter = controller;
939        disconnectPort(controller);
940        adapter.recover();
941    }
942
943    /**
944     * Report an error on the receive loop. Separated so tests can suppress, even
945     * though message is asynchronous.
946     * @param e Exception encountered at lower level to trigger error, or null
947     */
948    protected void reportReceiveLoopException(Exception e) {
949        log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e);
950        jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN);
951        if (controller instanceof AbstractNetworkPortController) {
952            portWarnTCP(e);
953        }
954    }
955
    /**
     * Create an empty reply of the concrete type used by the protocol.
     * Called by {@link #handleOneIncomingReply()} at the start of each reply;
     * the returned object is then filled with the incoming characters.
     *
     * @return a new, empty reply
     */
    protected abstract AbstractMRReply newReply();

    /**
     * Decide whether the reply is complete.
     * Called by {@link #loadChars(AbstractMRReply, DataInputStream)} each
     * time a received character has been stored in the reply; returning true
     * stops the loading of further characters.
     *
     * @param r the reply as received so far
     * @return true if the reply is complete
     */
    protected abstract boolean endOfMessage(AbstractMRReply r);
959
    /**
     * Dummy routine, to be filled by protocols that have to skip some
     * start-of-message characters.
     * <p>
     * Called by {@link #handleOneIncomingReply()} before the reply is
     * loaded. This base implementation does nothing.
     *
     * @param istream input source
     * @throws IOException from underlying operations
     */
    protected void waitForStartOfReply(DataInputStream istream) throws IOException {
        // intentionally empty in the base class
    }
968
969    /**
970     * Read a single byte, protecting against various timeouts, etc.
971     * <p>
972     * When a port is set to have a receive timeout, some will return
973     * zero bytes or an EOFException at the end of the timeout. In that case, the read
974     * should be repeated to get the next real character.
975     *
976     * @param istream stream to read
977     * @return the byte read
978     * @throws java.io.IOException if unable to read
979     */
980    protected byte readByteProtected(DataInputStream istream) throws IOException {
981        if (istream == null) {
982            throw new IOException("Input Stream NULL when reading");
983        }
984        while (true) { // loop will repeat until character found
985            int nchars;
986            nchars = istream.read(rcvBuffer, 0, 1);
987            if (nchars == -1) {
988                // No more bytes can be read from the channel
989                throw new IOException("Connection not terminated normally");
990            }
991            if (nchars > 0) {
992                return rcvBuffer[0];
993            }
994        }
995    }
996
    // One-byte buffer that readByteProtected reads into.
    // Defined this way to reduce new object creation
    private byte[] rcvBuffer = new byte[1];
999
1000    /**
1001     * Get characters from the input source, and file a message.
1002     * <p>
1003     * Returns only when the message is complete.
1004     * <p>
1005     * Only used in the Receive thread.
1006     * <p>
1007     * Handles timeouts on read by ignoring zero-length reads.
1008     *
1009     * @param msg     message to fill
1010     * @param istream character source.
1011     * @throws IOException when presented by the input source.
1012     */
1013    protected void loadChars(AbstractMRReply msg, DataInputStream istream)
1014            throws IOException {
1015        int i;
1016        for (i = 0; i < msg.maxSize(); i++) {
1017            byte char1 = readByteProtected(istream);
1018            log.trace("char: {} i: {}",(char1&0xFF),i);
1019            // if there was a timeout, flush any char received and start over
1020            if (flushReceiveChars) {
1021                log.warn("timeout flushes receive buffer: {}", msg);
1022                msg.flush();
1023                i = 0;  // restart
1024                flushReceiveChars = false;
1025            }
1026            if (canReceive()) {
1027                msg.setElement(i, char1);
1028                if (endOfMessage(msg)) {
1029                    break;
1030                }
1031            } else {
1032                i--; // flush char
1033                log.error("unsolicited character received: {}", Integer.toHexString(char1));
1034            }
1035        }
1036    }
1037
    /**
     * Override in the system specific code if necessary.
     * <p>
     * Consulted by {@link #loadChars(AbstractMRReply, DataInputStream)} for
     * every character read. This base implementation always returns true.
     *
     * @return true if it is okay to buffer receive characters into a reply
     *         message. When false, discard char received
     */
    protected boolean canReceive() {
        return true;
    }
1047
    // Number of consecutive automatic retransmissions triggered in
    // handleOneIncomingReply. Scales the wait before the next retry
    // (retransmitCount * 100 ms) and is reset to 0 when a reply that is not a
    // retransmittable error arrives while waiting for a message reply.
    private int retransmitCount = 0;
1049
1050    /**
1051     * Executes a reply distribution action on the appropriate thread for JMRI.
1052     * @param r a runnable typically encapsulating a MRReply and the iteration code needed to
1053     *          send it to all the listeners.
1054     */
1055    protected void distributeReply(Runnable r) {
1056        try {
1057            if (synchronizeRx) {
1058                SwingUtilities.invokeAndWait(r);
1059            } else {
1060                SwingUtilities.invokeLater(r);
1061            }
1062        } catch (InterruptedException ie) {
1063            if (threadStopRequest) return;
1064            log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString());
1065        } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) {
1066            log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString());
1067            return;
1068        }
1069        log.debug("dispatch thread invoked");
1070    }
1071
    /**
     * Handle each reply when complete.
     * <p>
     * Reads one complete reply from the input stream, hands it to the
     * listeners through {@link #distributeReply(Runnable)}, and then, for
     * replies that are not unsolicited, updates the transmit state
     * ({@code mCurrentState} / {@code mCurrentMode}) and notifies the
     * transmit side via the {@code xmtRunnable} monitor.
     * {@code replyInDispatch} is set while this is going on and cleared again
     * on every path that gets past the dispatch.
     * <p>
     * (This is public for testing purposes) Runs in the "Receive" thread.
     *
     * @throws java.io.IOException on error.
     */
    public void handleOneIncomingReply() throws IOException {
        // we sit in this until the message is complete, relying on
        // threading to let other stuff happen

        // Create message off the right concrete class
        AbstractMRReply msg = newReply();

        // wait for start if needed
        waitForStartOfReply(istream);

        // message exists, now fill it
        loadChars(msg, istream);

        // a stop was requested while the reply was being read: drop it undispatched
        if (threadStopRequest) return;

        // message is complete, dispatch it !!
        replyInDispatch = true;
        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);

        // forward the message to the registered recipients,
        // which includes the communications monitor
        // return a notification via the Swing event queue to ensure proper thread
        Runnable r = new RcvNotifier(msg, mLastSender, this);
        distributeReply(r);

        if (!msg.isUnsolicited()) {
            // effect on transmit:
            switch (mCurrentState) {
                case WAITMSGREPLYSTATE: {
                    // check to see if the response was an error message we want
                    // to automatically handle by re-queueing the last sent
                    // message, otherwise go on to the next message
                    if (msg.isRetransmittableErrorMsg()) {
                        log.error("Automatic Recovery from Error Message: {}.  Retransmitted {} times.", msg, retransmitCount);
                        synchronized (xmtRunnable) {
                            mCurrentState = AUTORETRYSTATE;
                            // back off before the retry; the wait grows by
                            // 100 ms with every consecutive retransmission
                            if (retransmitCount > 0) {
                                try {
                                    xmtRunnable.wait(retransmitCount * 100L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // retain if needed later
                                }
                            }
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount++;
                        }
                    } else {
                        // update state, and notify to continue
                        synchronized (xmtRunnable) {
                            mCurrentState = NOTIFIEDSTATE;
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            // a normal reply ends any retransmission streak
                            retransmitCount = 0;
                        }
                    }
                    break;
                }
                case WAITREPLYINPROGMODESTATE: {
                    // entering programming mode
                    mCurrentMode = PROGRAMINGMODE;
                    replyInDispatch = false;

                    // check to see if we need to delay to allow decoders to become
                    // responsive
                    int warmUpDelay = enterProgModeDelayTime();
                    if (warmUpDelay != 0) {
                        try {
                            synchronized (xmtRunnable) {
                                xmtRunnable.wait(warmUpDelay);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // retain if needed later
                        }
                    }
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                case WAITREPLYINNORMMODESTATE: {
                    // entering normal mode
                    mCurrentMode = NORMALMODE;
                    replyInDispatch = false;
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                default: {
                    // reply arrived in a state that was not waiting for one
                    replyInDispatch = false;
                    if (allowUnexpectedReply) {
                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
                        synchronized (xmtRunnable) {
                            // The transmit thread sometimes gets stuck
                            // when unexpected replies are received.  Notify
                            // it to clear the block without a timeout.
                            // (do not change the current state)
                            //if(mCurrentState!=IDLESTATE)
                            xmtRunnable.notify();
                        }
                    } else {
                        unexpectedReplyStateError(mCurrentState, msg.toString());
                    }
                }
            }
            // Unsolicited message
        } else {
            // unsolicited replies leave the transmit state untouched
            log.debug("Unsolicited Message Received {}", msg);

            replyInDispatch = false;
        }
    }
1196
1197    /**
1198     * Log an error message for a message received in an unexpected state.
1199     * @param State message state.
1200     * @param msgString message string.
1201     */
1202    protected void unexpectedReplyStateError(int State, String msgString) {
1203       String[] packages = this.getClass().getName().split("\\.");
1204       String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
1205                     +(packages.length>=1 ? packages[packages.length-1] :"");
1206       log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
1207    }
1208
    /**
     * Get the listener recorded as the sender of the most recently
     * forwarded message. Provided for testing purposes.
     *
     * @return last sender, mLastSender.
     */
    public AbstractMRListener getLastSender() {
        return mLastSender;
    }
1217
1218    protected void terminate() {
1219        log.debug("Cleanup Starts");
1220        if (ostream == null) {
1221            return;    // no connection established
1222        }
1223        AbstractMRMessage modeMsg = enterNormalMode();
1224        if (modeMsg != null) {
1225            modeMsg.setRetries(100); // set the number of retries
1226            // high, just in case the interface
1227            // is busy when we try to send
1228            forwardToPort(modeMsg, null);
1229            // wait for reply
1230            try {
1231                if (xmtRunnable != null) {
1232                    synchronized (xmtRunnable) {
1233                        xmtRunnable.wait(modeMsg.getTimeout());
1234                    }
1235                }
1236            } catch (InterruptedException e) {
1237                Thread.currentThread().interrupt(); // retain if needed later
1238                log.error("transmit interrupted");
1239            }
1240        }
1241    }
1242
1243    /**
1244     * Internal class to remember the Reply object and destination listener with
1245     * a reply is received.
1246     */
1247    protected static class RcvNotifier implements Runnable {
1248
1249        AbstractMRReply mMsg;
1250        AbstractMRListener mDest;
1251        AbstractMRTrafficController mTc;
1252
1253        public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1254                AbstractMRTrafficController pTc) {
1255            mMsg = pMsg;
1256            mDest = pDest;
1257            mTc = pTc;
1258        }
1259
1260        @Override
1261        public void run() {
1262            log.debug("Delayed rcv notify starts");
1263            mTc.notifyReply(mMsg, mDest);
1264        }
1265    } // end RcvNotifier
1266
    /**
     * Create a {@link RcvNotifier}; allows creation of the object outside
     * this package.
     *
     * @param pMsg  the reply to deliver
     * @param pDest the destination listener
     * @param pTc   the traffic controller performing the notification
     * @return a new RcvNotifier holding the three arguments
     */
    protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
            AbstractMRTrafficController pTc) {
        return new RcvNotifier(pMsg, pDest, pTc);
    }
1272
1273    /**
1274     * Internal class to remember the Message object and destination listener
1275     * when a message is queued for notification.
1276     */
1277    protected static class XmtNotifier implements Runnable {
1278
1279        AbstractMRMessage mMsg;
1280        AbstractMRListener mDest;
1281        AbstractMRTrafficController mTc;
1282
1283        public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest,
1284                AbstractMRTrafficController pTc) {
1285            mMsg = pMsg;
1286            mDest = pDest;
1287            mTc = pTc;
1288        }
1289
1290        @Override
1291        public void run() {
1292            log.debug("Delayed xmt notify starts");
1293            mTc.notifyMessage(mMsg, mDest);
1294        }
1295    }  // end XmtNotifier
1296
1297    /**
1298     * Terminate the receive and transmit threads.
1299     * <p>
1300     * This is intended to be used only by testing subclasses.
1301     */
1302    public void terminateThreads() {
1303        threadStopRequest = true;
1304        if (xmtThread != null) {
1305            xmtThread.interrupt();
1306            try {
1307                xmtThread.join();
1308            } catch (InterruptedException ie){
1309                // interrupted during cleanup.
1310            }
1311        }
1312
1313        if (rcvThread != null) {
1314            rcvThread.interrupt();
1315            try {
1316                rcvThread.join();
1317            } catch (InterruptedException ie){
1318                // interrupted during cleanup.
1319            }
1320        }
1321        // we also need to remove the shutdown task.
1322        InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask);
1323    }
1324
    /**
     * Flag that threads should terminate as soon as they can.
     * <p>
     * Set by {@link #disconnectPort(AbstractPortController)} and
     * {@link #terminateThreads()}; cleared by
     * {@link #connectPort(AbstractPortController)}.
     */
    protected volatile boolean threadStopRequest = false;

    // Logger shared by this class and its nested notifier classes.
    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class);
1331
1332}