001package jmri.jmrix;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.DataInputStream;
005import java.io.IOException;
006import java.io.OutputStream;
007import java.util.*;
008
009import javax.annotation.Nonnull;
010import javax.swing.SwingUtilities;
011
012import jmri.InstanceManager;
013import jmri.ShutDownManager;
014
015/**
016 * Abstract base for TrafficControllers in a Message/Reply protocol.
017 * <p>
018 * Two threads are used for the actual communication. The "Transmit" thread
019 * handles pushing characters to the port, and also changing the mode. The
020 * "Receive" thread converts characters from the input stream into replies.
021 * <p>
022 * The constructor registers a shutdown task to
023 * trigger the necessary cleanup code
024 * <p>
025 * The internal state machine handles changes of mode, automatic retry of
026 * certain messages, time outs, and sending poll messages when otherwise idle.
027 * <p>
028 * "Mode" refers to the state of the command station communications. "Normal"
029 * and "Programming" are the two modes, used if the command station requires
030 * messages to go back and forth between them. <br>
031 *
032 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram">
033 *
034 * <p>
035 * The key methods for the basic operation are:
036 * <ul>
037 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)}
038 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data
039 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs
040 *      information on e.g. the last message sent, that can be stored in member variables
041 *      by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}.
042 *  <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects
043 * </ul>
044 * <p>
045 * If your command station requires messages to go in and out of
046 * "programming mode", those should be provided by
047 * {@link #enterProgMode()} and {@link #enterNormalMode()}.
048 * <p>
049 * If you want to poll for information when the line is otherwise idle,
050 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}.
051 *
052 * @author Bob Jacobsen Copyright (C) 2003
053 * @author Paul Bender Copyright (C) 2004-2010
054 */
055
056/*
057@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png
058
059    [*] --> IDLESTATE
060    IDLESTATE --> NOTIFIEDSTATE : sendMessage()
061    NOTIFIEDSTATE --> IDLESTATE : queue empty
062
063    NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message
064
065    WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE
066    WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE
067
068    WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply()
069
070    WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE
071    WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE
072    OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message
073
074    IDLESTATE --> POLLSTATE : transmitLoop()\nno work
075    POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it
076    POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send
077
078    WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply
079    AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message
080
081NOTIFIEDSTATE : Transmit thread wakes up and processes
082POLLSTATE : Transient while deciding to send poll
083OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change
084AUTORETRYSTATE : Transient while deciding to resend auto-retry message
085WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply
086WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply
087WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout
088
089Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry
090Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change.
091
092@enduml
093 */
094
095public abstract class AbstractMRTrafficController {
096
    // Shutdown hook bound to this controller's terminate() method. It is held in
    // a field so the exact instance registered with the ShutDownManager in the
    // constructor stays reachable afterwards.
    private final Runnable shutDownTask = this::terminate; // retain for possible removal.
098
099    /**
100     * Create a new unnamed MRTrafficController.
101     */
102    public AbstractMRTrafficController() {
103        log.debug("Creating AbstractMRTrafficController instance");
104        mCurrentMode = NORMALMODE;
105        mCurrentState = IDLESTATE;
106        allowUnexpectedReply = false;
107
108
109        // We use a shutdown task here to make sure the connection is left
110        // in a clean state prior to exiting.  This is required on systems
111        // which have a service mode to ensure we don't leave the system
112        // in an unusable state. Once the shutdown task executes, the connection
113        // must be considered permanently closed.
114
115        InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask);
116    }
117
    // Receive-synchronization flag, true by default. Only its accessors are
    // visible in this part of the file; NOTE(review): presumably consulted by
    // the receive path - confirm where it is read.
    private boolean synchronizeRx = true;
119
120    protected void setSynchronizeRx(boolean val) {
121        synchronizeRx = val;
122    }
123
124    protected boolean getSynchronizeRx() {
125        return synchronizeRx;
126    }
127
128    // The methods to implement the abstract Interface
129
    // Registered listeners, in notification order. Console listeners are
    // inserted at index 0, all others are appended. The notify methods iterate
    // over a clone of this vector rather than the live list.
    protected final Vector<AbstractMRListener> cmdListeners = new Vector<>();
131
132    /**
133     * Add a Listener to the Listener list.
134     * @param l The Listener to be added, not null.
135     */
136    protected synchronized void addListener(AbstractMRListener l) {
137        // add only if not already registered
138        if (l == null) {
139            throw new NullPointerException();
140        }
141        if (!cmdListeners.contains(l)) {
142            cmdListeners.addElement(l);
143        }
144    }
145
146    /**
147     * Add a Listener to start of the Listener list.
148     * Intended for use only by system Consoles which may prefer notification
149     * before other objects have processed a Message and sent a Reply.
150     * @param l The Listener to be added, not null.
151     */
152    protected synchronized void addConsoleListener(@Nonnull AbstractMRListener l){
153        // add only if not already registered
154        if (!cmdListeners.contains(l)) {
155            cmdListeners.insertElementAt(l, 0);
156        }
157    }
158
159    /**
160     * Remove a Listener from the Listener list.
161     * The Listener will receive no further notifications.
162     * @param l The Listener to be removed.
163     */
164    protected synchronized void removeListener(AbstractMRListener l) {
165        if (cmdListeners.contains(l)) {
166            cmdListeners.removeElement(l);
167        }
168    }
169
170    /**
171     * Forward a Message to registered listeners.
172     *
173     * @param m     Message to be forwarded intact
174     * @param notMe One (optional) listener to be skipped, usually because it's
175     *              the originating object.
176     */
177    @SuppressWarnings("unchecked")
178    protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
179        // make a copy of the listener vector to synchronized not needed for transmit
180        Vector<AbstractMRListener> v;
181        synchronized (this) {
182            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
183            v = (Vector<AbstractMRListener>) cmdListeners.clone();
184        }
185        // forward to all listeners
186        int cnt = v.size();
187        for (int i = 0; i < cnt; i++) {
188            AbstractMRListener client = v.elementAt(i);
189            if (notMe != client) {
190                log.debug("notify message, client: {}", client);
191                try {
192                    forwardMessage(client, m);
193                } catch (RuntimeException e) {
194                    log.warn("notify: During message dispatch to {}", client, e);
195                }
196            }
197        }
198    }
199
    /**
     * Implement this to forward a specific message type to a protocol-specific
     * listener interface.
     * This puts the casting into the concrete class.
     * <p>
     * Called by {@link #notifyMessage(AbstractMRMessage, AbstractMRListener)}
     * once per listener; a RuntimeException thrown here is caught and logged
     * by that caller.
     *
     * @param client abstract listener.
     * @param m message to forward.
     */
    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);
208
    /**
     * Invoked if it's appropriate to do low-priority polling of the command
     * station, this should return the next message to send, or null if the
     * TrafficController should just sleep.
     * <p>
     * The transmit loop calls this only when the message queue is empty and
     * the controller is in {@link #NORMALMODE}.
     *
     * @return Formatted poll message, or null when there is nothing to poll
     */
    protected abstract AbstractMRMessage pollMessage();
216
    /**
     * Provide the listener associated with poll messages.
     * <p>
     * The transmit loop passes the returned listener to
     * {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)} together
     * with the message obtained from {@link #pollMessage()}.
     *
     * @return listener to record as the sender of a poll message
     */
    protected abstract AbstractMRListener pollReplyHandler();
218
    // Listener supplied with the most recent forwardToPort() call; may be null
    // (mode-change messages are forwarded with a null listener).
    protected AbstractMRListener mLastSender = null;

    // Current command-station mode, one of NORMALMODE or PROGRAMINGMODE.
    // Initialised to NORMALMODE by the constructor.
    protected volatile int mCurrentMode;
    public static final int NORMALMODE = 1;
    public static final int PROGRAMINGMODE = 4;
224
    /**
     * Set the system to programming mode.
     * <p>
     * Called by the transmit loop when the next message needs
     * {@link #PROGRAMINGMODE} and the controller is in a different mode.
     * @see #enterNormalMode()
     *
     * @return any message that needs to be sent to the Command Station
     * to change modes. If no message is needed, returns null; the transmit
     * loop then just records the new mode.
     */
    protected abstract AbstractMRMessage enterProgMode();
233
    /**
     * Sets the system to normal mode.
     * <p>
     * Called by the transmit loop when the next message needs a mode other
     * than programming mode and the current mode differs. Also called when
     * the loop finds nothing to send while in programming mode and
     * {@link #programmerIdle()} returns true.
     * @see #enterProgMode()
     *
     * @return any message that needs to be sent to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterNormalMode();
244
    /**
     * Check if the programmer is idle.
     * Override in the system specific code if necessary (see notes for
     * {@link #enterNormalMode()}).
     * <p>
     * The transmit loop only leaves programming mode on an idle pass when
     * this returns true.
     *
     * @return true if not busy programming; this default always returns true
     */
    protected boolean programmerIdle() {
        return true;
    }
255
    /**
     * Get the delay (wait time) after enabling the programming track.
     * Override in subclass to add a longer delay.
     * <p>
     * NOTE(review): the unit is not visible in this part of the file;
     * presumably milliseconds - confirm against the caller.
     *
     * @return 0 as default delay
     */
    protected int enterProgModeDelayTime() {
        return 0;
    }
265
    // Current state of the transmit state machine; always one of the *STATE
    // constants below. Initialised to IDLESTATE by the constructor.
    protected volatile int mCurrentState;
    public static final int IDLESTATE = 10;        // nothing happened
    public static final int NOTIFIEDSTATE = 15;    // xmt notified, will next wake
    public static final int WAITMSGREPLYSTATE = 25;  // xmt has sent, await reply to message
    public static final int WAITREPLYINPROGMODESTATE = 30;  // xmt has done mode change, await reply
    public static final int WAITREPLYINNORMMODESTATE = 35;  // xmt has done mode change, await reply
    public static final int OKSENDMSGSTATE = 40;        // mode change reply here, send original msg
    public static final int AUTORETRYSTATE = 45;        // received message where automatic recovery may occur with a retransmission, re-send original msg
    public static final int POLLSTATE = 50;   // Send program mode or poll message

    // True when the command station may send replies that were not requested;
    // false after construction. See setAllowUnexpectedReply(boolean).
    protected boolean allowUnexpectedReply;
277
278    /**
279     * Set whether the command station may send messages without a request
280     * sent to it.
281     *
282     * @param expected true to allow messages without a prior request
283     */
284    protected void setAllowUnexpectedReply(boolean expected) {
285        allowUnexpectedReply = expected;
286    }
287
288    /**
289     * Forward a "Reply" from layout to registered listeners.
290     *
291     * @param r    Reply to be forwarded intact
292     * @param dest One (optional) listener to be skipped, usually because it's
293     *             the originating object.
294     */
295    @SuppressWarnings("unchecked")
296    protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
297        // make a copy of the listener vector to synchronized (not needed for transmit?)
298        Vector<AbstractMRListener> v;
299        synchronized (this) {
300            // FIXME: unnecessary synchronized; the Vector IS already thread-safe.
301            v = (Vector<AbstractMRListener>) cmdListeners.clone();
302        }
303        // forward to all listeners
304        int cnt = v.size();
305        for (int i = 0; i < cnt; i++) {
306            AbstractMRListener client = v.elementAt(i);
307            log.debug("notify reply, client: {}", client);
308            try {
309                //skip dest for now, we'll send the message to there last.
310                if (dest != client) {
311                    forwardReply(client, r);
312                }
313            } catch (RuntimeException e) {
314                log.warn("notify: During reply dispatch to {}", client, e);
315            }
316        }
317
318        // forward to the last listener who sent a message
319        // this is done _second_ so monitoring can have already stored the reply
320        // before a response is sent
321        if (dest != null) {
322            log.debug("notify reply, dest: {}", dest);
323            forwardReply(dest, r);
324        }
325    }
326
    /**
     * Implement this to forward a specific reply type to a protocol-specific
     * listener interface.
     * Called by {@link #notifyReply(AbstractMRReply, AbstractMRListener)}.
     *
     * @param client abstract listener.
     * @param m reply to forward.
     */
    protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m);
328
    /**
     * Messages to be transmitted.
     * <p>
     * {@code msgQueue} and {@code listenerQueue} are parallel queues: entry
     * <i>n</i> of {@code listenerQueue} is the listener supplied with message
     * <i>n</i> of {@code msgQueue}. They are always added to and removed from
     * together.
     */
    protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>();
    protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>();
334
335    /**
336     * Forward message to the port. Messages are queued and then the
337     * transmission thread is notified.
338     * @see #forwardToPort(AbstractMRMessage, AbstractMRListener)
339     *
340     * @param m the message to send
341     * @param reply the Listener sending the message, often provided as 'this'
342     */
343    protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) {
344        msgQueue.addLast(m);
345        listenerQueue.addLast(reply);
346        synchronized (xmtRunnable) {
347            if (mCurrentState == IDLESTATE) {
348                mCurrentState = NOTIFIEDSTATE;
349                xmtRunnable.notify();
350            }
351        }
352        if (m != null) {
353            log.debug("just notified transmit thread with message {}", m);
354        }
355    }
356
357    /**
358     * Permanent loop for the transmit thread.
359     */
360    protected void transmitLoop() {
361        log.debug("transmitLoop starts in {}", this);
362
363        // loop forever
364        while (!connectionError && !threadStopRequest) {
365            AbstractMRMessage m = null;
366            AbstractMRListener l = null;
367            // check for something to do
368            synchronized (this) {
369                if (!msgQueue.isEmpty()) {
370                    // yes, something to do
371                    m = msgQueue.getFirst();
372                    msgQueue.removeFirst();
373                    l = listenerQueue.getFirst();
374                    listenerQueue.removeFirst();
375                    mCurrentState = WAITMSGREPLYSTATE;
376                    log.debug("transmit loop has something to do: {}", m);
377                }  // release lock here to proceed in parallel
378            }
379            // if a message has been extracted, process it
380            if (m != null) {
381                // check for need to change mode
382                log.debug("Start msg, state = {}", mCurrentMode);
383                if (m.getNeededMode() != mCurrentMode) {
384                    AbstractMRMessage modeMsg;
385                    if (m.getNeededMode() == PROGRAMINGMODE) {
386                        // change state to programming mode and send message
387                        modeMsg = enterProgMode();
388                        if (modeMsg != null) {
389                            mCurrentState = WAITREPLYINPROGMODESTATE;
390                            log.debug("Enter Programming Mode");
391                            forwardToPort(modeMsg, null);
392                            // wait for reply
393                            transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted");
394                        }
395                    } else {
396                        // change state to normal and send message
397                        modeMsg = enterNormalMode();
398                        if (modeMsg != null) {
399                            mCurrentState = WAITREPLYINNORMMODESTATE;
400                            log.debug("Enter Normal Mode");
401                            forwardToPort(modeMsg, null);
402                            // wait for reply
403                            transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted");
404                        }
405                    }
406                    if (modeMsg != null) {
407                        checkReplyInDispatch();
408                        if (mCurrentState != OKSENDMSGSTATE) {
409                            handleTimeout(modeMsg, l);
410                        }
411                        mCurrentState = WAITMSGREPLYSTATE;
412                    } else {
413                        // no mode message required, but the message
414                        // needs a different mode
415                        log.debug("Setting mode to: {}", m.getNeededMode());
416                        mCurrentMode = m.getNeededMode();
417                    }
418                }
419                forwardToPort(m, l);
420                // reply expected?
421                if (m.replyExpected()) {
422                    log.debug("reply expected is true for message {}",m);
423                    // wait for a reply, or eventually timeout
424                    transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted");
425                    checkReplyInDispatch();
426                    if (mCurrentState == WAITMSGREPLYSTATE) {
427                        handleTimeout(m, l);
428                    } else if (mCurrentState == AUTORETRYSTATE) {
429                        log.info("Message added back to queue: {}", m);
430                        msgQueue.addFirst(m);
431                        listenerQueue.addFirst(l);
432                        synchronized (xmtRunnable) {
433                            mCurrentState = IDLESTATE;
434                        }
435                    } else {
436                        resetTimeout(m);
437                    }
438                } // just continue to the next message from here
439            } else {
440                // nothing to do
441                if (mCurrentState != IDLESTATE) {
442                    log.debug("Setting IDLESTATE");
443                    log.debug("Current Mode {}", mCurrentMode);
444                    mCurrentState = IDLESTATE;
445                }
446                // wait for something to send
447                if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) {
448                    try {
449                        long startTime = Calendar.getInstance().getTimeInMillis();
450                        synchronized (xmtRunnable) {
451                            xmtRunnable.wait(mWaitBeforePoll);
452                        }
453                        long endTime = Calendar.getInstance().getTimeInMillis();
454                        waitTimePoll = waitTimePoll + endTime - startTime;
455                    } catch (InterruptedException e) {
456                        Thread.currentThread().interrupt(); // retain if needed later
457                        // end of transmit loop
458                        break;
459                    }
460                }
461                // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee
462                // the change of mCurrentState to one of the waiting for reply states.  Therefore we need to synchronize.
463                synchronized (this) {
464                    if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) {
465                        log.error("left timeout in unexpected state: {}", mCurrentState);
466                    }
467                    if (mCurrentState == IDLESTATE) {
468                        mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE
469                    }
470                }
471                // went around with nothing to do; leave programming state if in it
472                if (mCurrentMode == PROGRAMINGMODE) {
473                    log.debug("Timeout - in service mode");
474                }
475                if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) {
476                    log.debug("timeout causes leaving programming mode");
477                    mCurrentState = WAITREPLYINNORMMODESTATE;
478                    AbstractMRMessage msg = enterNormalMode();
479                    // if the enterNormalMode() message is null, we
480                    // don't want to try to send it to the port.
481                    if (msg != null) {
482                        forwardToPort(msg, null);
483                        // wait for reply
484                        transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode");
485                        checkReplyInDispatch();
486                        // exit program mode timeout?
487                        if (mCurrentState == WAITREPLYINNORMMODESTATE) {
488                            // entering normal mode via timeout
489                            handleTimeout(msg, l);
490                            mCurrentMode = NORMALMODE;
491                        }
492                        // and go around again
493                    }
494                } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) {
495                    // We may need to poll
496                    AbstractMRMessage msg = pollMessage();
497                    if (msg != null) {
498                        // yes, send that
499                        log.debug("Sending poll, wait time {}", waitTimePoll);
500                        mCurrentState = WAITMSGREPLYSTATE;
501                        forwardToPort(msg, pollReplyHandler());
502                        // wait for reply
503                        log.debug("Still waiting for reply");
504                        transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply");
505                        checkReplyInDispatch();
506                        // and go around again
507                        if (mCurrentState == WAITMSGREPLYSTATE) {
508                            handleTimeout(msg, l);
509                        } else {
510                            resetTimeout(msg);
511                        }
512                    }
513                    waitTimePoll = 0;
514                }
515                // no messages, so back to idle
516                if (mCurrentState == POLLSTATE) {
517                    mCurrentState = IDLESTATE;
518                }
519            }
520        }
521    }   // end of transmit loop; go around again
522
523    protected void transmitWait(int waitTime, int state, String interruptMessage) {
524        // wait() can have spurious wakeup!
525        // so we protect by making sure the entire timeout time is used
526        long currentTime = Calendar.getInstance().getTimeInMillis();
527        long endTime = currentTime + waitTime;
528        while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) {
529            long wait = endTime - currentTime;
530            try {
531                synchronized (xmtRunnable) {
532                    // Do not wait if the current state has changed since we
533                    // last set it.
534                    if (mCurrentState != state) {
535                        return;
536                    }
537                    xmtRunnable.wait(wait); // rcvr normally ends this w state change
538                }
539            } catch (InterruptedException e) {
540                Thread.currentThread().interrupt(); // retain if needed later
541                String[] packages = this.getClass().getName().split("\\.");
542                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
543                        +(packages.length>=1 ? packages[packages.length-1] :"");
544                if (!threadStopRequest) {
545                    log.error("{} in transmitWait(..) of {}", interruptMessage, name);
546                } else {
547                    log.debug("during shutdown, {}  in transmitWait(..) of {}", interruptMessage, name);
548                }
549            }
550        }
551        log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState);
552    }
553
    // Dispatch control and timer
    protected boolean replyInDispatch = false;          // true when reply has been received but dispatch not completed
    // Longest wait seen so far in checkReplyInDispatch(), in the same units as
    // DISPATCH_WAIT_INTERVAL (derived as loop count * interval).
    private int maxDispatchTime = 0;
    // Threshold at which the next "max dispatch time" log entry is produced;
    // raised by WARN_NEXT_TIME each time it is reached.
    private int warningMessageTime = DISPATCH_WARNING_TIME;
    // Length of one wait() slice in checkReplyInDispatch(); passed to
    // Object.wait(long), hence milliseconds.
    private static final int DISPATCH_WAIT_INTERVAL = 100;
    private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded
    private static final int WARN_NEXT_TIME = 1000;         // report every second
561
562    private void checkReplyInDispatch() {
563        int loopCount = 0;
564        while (replyInDispatch) {
565            try {
566                synchronized (xmtRunnable) {
567                    xmtRunnable.wait(DISPATCH_WAIT_INTERVAL);
568                }
569            } catch (InterruptedException e) {
570                Thread.currentThread().interrupt(); // retain if needed later
571                if (threadStopRequest) return; // don't log an error if closing.
572                String[] packages = this.getClass().getName().split("\\.");
573                String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
574                        +(packages.length>=1 ? packages[packages.length-1] :"");
575                log.error("transmitLoop interrupted in class {}", name);
576            }
577            loopCount++;
578            int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL;
579            if (currentDispatchTime > maxDispatchTime) {
580                maxDispatchTime = currentDispatchTime;
581                if (currentDispatchTime >= warningMessageTime) {
582                    warningMessageTime = warningMessageTime + WARN_NEXT_TIME;
583                    log.debug("Max dispatch time is now {}", currentDispatchTime);
584                }
585            }
586        }
587    }
588
589    /**
590     *  Determine if the interface is down.
591     *
592     *  @return timeoutFlag
593     */
594    public boolean hasTimeouts() {
595        return timeoutFlag;
596    }
597
    // Set by handleTimeout(), cleared by resetTimeout(); exposed via hasTimeouts().
    protected boolean timeoutFlag = false;
    // Number of timeouts since the last resetTimeout() call.
    protected int timeouts = 0;
    // Set to true by handleTimeout(). NOTE(review): presumably tells the receive
    // side to discard pending input - the reader is not visible here, confirm.
    protected boolean flushReceiveChars = false;
601
602    protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
603        //log.debug("Timeout mCurrentState: {}", mCurrentState);
604        warnOnTimeout(msg, l);
605        timeouts++;
606        timeoutFlag = true;
607        flushReceiveChars = true;
608    }
609
610    protected void warnOnTimeout(AbstractMRMessage msg, AbstractMRListener l) {
611        String[] packages = this.getClass().getName().split("\\.");
612        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
613                +(packages.length>=1 ? packages[packages.length-1] :"");
614
615        log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name);
616    }
617    
618    protected void resetTimeout(AbstractMRMessage msg) {
619        if (timeouts > 0) {
620            log.debug("Reset timeout after {} timeouts", timeouts);
621        }
622        timeouts = 0;
623        timeoutFlag = false;
624    }
625
    /**
     * Add header to the outgoing byte stream.
     * <p>
     * This default writes nothing and reports offset 0, so the message data
     * starts at the beginning of the buffer. The returned value is used by
     * {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)} as the
     * offset at which message data is copied in.
     *
     * @param msg the output byte stream
     * @param m Message results
     * @return next location in the stream to fill
     */
    protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) {
        return 0;
    }
636
    // Length of one idle wait in the transmit loop; passed to Object.wait(long),
    // hence milliseconds. Also the idle time that must accumulate before polling.
    protected int mWaitBeforePoll = 100;
    // Idle time accumulated by the transmit loop since it last reached the
    // polling branch, in milliseconds; reset to 0 there.
    protected long waitTimePoll = 0;
639
640    /**
641     * Add trailer to the outgoing byte stream.
642     *
643     * @param msg    the output byte stream
644     * @param offset the first byte not yet used
645     * @param m   output message to extend
646     */
647    protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) {
648        if (!m.isBinary()) {
649            msg[offset] = 0x0d;
650        }
651    }
652
653    /**
654     * Determine how many bytes the entire message will take, including
655     * space for header and trailer.
656     *
657     * @param m the message to be sent
658     * @return number of bytes
659     */
660    protected int lengthOfByteStream(AbstractMRMessage m) {
661        int len = m.getNumDataElements();
662        int cr = 0;
663        if (!m.isBinary()) {
664            cr = 1;  // space for return char
665        }
666        return len + cr;
667    }
668
    // NOTE(review): no reads or writes are visible in this part of the file;
    // presumably set when writing to the port fails - confirm against forwardToPort().
    protected boolean xmtException = false;
670
671    /**
672     * Actually transmit the next message to the port.
673     * @see #sendMessage(AbstractMRMessage, AbstractMRListener)
674     *
675     * @param m the message to send
676     * @param reply the Listener sending the message, often provided as 'this'
677     */
678    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"},
679            justification = "Two locks needed for synchronization here, this is OK")
680    protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
681        log.debug("forwardToPort message: [{}]", m);
682        // remember who sent this
683        mLastSender = reply;
684
685        // forward the message to the registered recipients,
686        // which includes the communications monitor, except the sender.
687        // Schedule notification via the Swing event queue to ensure order
688        log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback"));
689        Runnable r = new XmtNotifier(m, mLastSender, this);
690        SwingUtilities.invokeLater(r);
691
692        // stream to port in single write, as that's needed by serial
693        int byteLength = lengthOfByteStream(m);
694        byte[]  msg= new byte[byteLength];
695        log.debug("copying message, length = {}", byteLength);
696        // add header
697        int offset = addHeaderToOutput(msg, m);
698
699        // add data content
700        int len = m.getNumDataElements();
701        log.debug("copying data to message, length = {}", len);
702        if (len > byteLength) { // happens somehow
703            log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len);
704        }
705        for (int i = 0; (i < len && i < byteLength); i++) {
706            msg[i + offset] = (byte) m.getElement(i);
707        }
708        // add trailer
709        addTrailerToOutput(msg, len + offset, m);
710        // and stream the bytes
711        try {
712            if (ostream != null) {
713                if (log.isDebugEnabled()) {
714                    StringBuilder f = new StringBuilder();
715                    for (int i = 0; i < msg.length; i++) {
716                        f.append(String.format("%02X ",0xFF & msg[i]));
717                    }
718                    log.debug("formatted message: {}", f.toString() );
719                }
720                while (m.getRetries() >= 0) {
721                    if (portReadyToSend(controller)) {
722                        ostream.write(msg);
723                        ostream.flush();
724                        log.debug("written, msg timeout: {} mSec", m.getTimeout());
725                        break;
726                    } else if (m.getRetries() >= 0) {
727                        log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries());
728                        m.setRetries(m.getRetries() - 1);
729                        try {
730                            synchronized (xmtRunnable) {
731                                xmtRunnable.wait(m.getTimeout());
732                            }
733                        } catch (InterruptedException e) {
734                            Thread.currentThread().interrupt(); // retain if needed later
735                            log.error("retry wait interrupted");
736                        }
737                    } else {
738                        log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg));
739                    }
740                }
741            } else {  // ostream is null
742                // no stream connected
743                connectionWarn();
744            }
745        } catch (IOException | RuntimeException e) {
746            // TODO Currently there's no port recovery if an exception occurs
747            // must restart JMRI to clear xmtException.
748            xmtException = true;
749            portWarn(e);
750        }
751    }
752
753    protected void connectionWarn() {
754        log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception());
755    }
756
757    protected void portWarn(Exception e) {
758        log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e);
759    }
760
    /**
     * Set to true by {@link #portWarnTCP(Exception)}; cleared by
     * {@link #connectPort(AbstractPortController)}.
     */
    protected boolean connectionError = false;
762
763    protected void portWarnTCP(Exception e) {
764        log.warn("Exception java net: ", e);
765        connectionError = true;
766    }
767    // methods to connect/disconnect to a source of data in an AbstractPortController
768
    /**
     * The port controller currently connected; assigned in
     * {@link #connectPort(AbstractPortController)} and reset to null in
     * {@link #disconnectPort(AbstractPortController)}.
     */
    public AbstractPortController controller = null;
770
771    public boolean status() {
772        return (ostream != null && istream != null);
773    }
774
    /** Thread running the transmit loop; created and started in connectPort(). */
    protected volatile Thread xmtThread = null;
    /** Thread running receiveLoop(); created and started in connectPort(). */
    protected volatile Thread rcvThread = null;

    /**
     * Runnable executed by the transmit thread; created in connectPort().
     * Code in this class also uses this object as the monitor for
     * wait()/notify() between the receive and transmit sides.
     */
    protected volatile Runnable xmtRunnable = null;
779
780    /**
781     * Make connection to an existing PortController object.
782     *
783     * @param p the PortController
784     */
785    public void connectPort(AbstractPortController p) {
786        rcvException = false;
787        connectionError = false;
788        xmtException = false;
789        threadStopRequest = false;
790        try {
791            istream = p.getInputStream();
792            ostream = p.getOutputStream();
793            if (controller != null) {
794                log.warn("connectPort: connect called while connected");
795            } else {
796                log.debug("connectPort invoked");
797            }
798            controller = p;
799            // and start threads
800            xmtThread = jmri.util.ThreadingUtil.newThread(
801                xmtRunnable = new Runnable() {
802                    @Override
803                    public void run() {
804                        try {
805                            transmitLoop();
806                        } catch (ThreadDeath td) {
807                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td);
808                            // ThreadDeath must be thrown per Java API Javadocs
809                            throw td;
810                        } catch (Throwable e) {
811                            if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e);
812                        }
813                }
814            });
815
816            String[] packages = this.getClass().getName().split("\\.");
817            xmtThread.setName(
818                (packages.length>=2 ? packages[packages.length-2]+"." :"")
819                +(packages.length>=1 ? packages[packages.length-1] :"")
820                +" Transmit thread");
821
822            xmtThread.setDaemon(true);
823            xmtThread.setPriority(Thread.MAX_PRIORITY-1);      //bump up the priority
824            xmtThread.start();
825
826            rcvThread = jmri.util.ThreadingUtil.newThread(
827                new Runnable() {
828                    @Override
829                    public void run() {
830                        receiveLoop();
831                    }
832                });
833            rcvThread.setName(
834                (packages.length>=2 ? packages[packages.length-2]+"." :"")
835                +(packages.length>=1 ? packages[packages.length-1] :"")
836                +" Receive thread");
837
838            rcvThread.setPriority(Thread.MAX_PRIORITY);      //bump up the priority
839            rcvThread.setDaemon(true);
840            rcvThread.start();
841
842        } catch (RuntimeException e) {
843            log.error("Failed to start up communications. Error was: ", e);
844            log.debug("Full trace:", e);
845        }
846    }
847
848    /**
849     * Get the port name for this connection from the TrafficController.
850     *
851     * @return the name of the port
852     */
853    public String getPortName() {
854        return controller.getCurrentPortName();
855    }
856
857    /**
858     * Break connection to existing PortController object. Once broken, attempts
859     * to send via "message" member will fail.
860     *
861     * @param p the PortController
862     */
863    public void disconnectPort(AbstractPortController p) {
864        istream = null;
865        ostream = null;
866        if (controller != p) {
867            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
868        }
869        controller = null;
870        threadStopRequest = true;
871    }
872
873    /**
874     * Check if PortController object can be sent to.
875     *
876     * @param p the PortController
877     * @return true if ready, false otherwise May throw an Exception.
878     */
879    public boolean portReadyToSend(AbstractPortController p) {
880        if (p != null && !xmtException && !rcvException) {
881            return true;
882        } else {
883            return false;
884        }
885    }
886
    // data members to hold the streams
    /** Input stream taken from the port in connectPort(); null after disconnectPort(). */
    protected DataInputStream istream = null;
    /** Output stream taken from the port in connectPort(); null after disconnectPort(). */
    protected OutputStream ostream = null;

    /**
     * Set to true by receiveLoop() when it ends because of an IOException or
     * too many consecutive RuntimeExceptions; cleared by connectPort().
     */
    protected boolean rcvException = false;

    /** Number of consecutive RuntimeExceptions after which receiveLoop() gives up. */
    protected int maxRcvExceptionCount = 100;
894
895    /**
896     * Handle incoming characters. This is a permanent loop, looking for input
897     * messages in character form on the stream connected to the PortController
898     * via {@link #connectPort(AbstractPortController)}.
899     * <p>
900     * Each turn of the loop is the receipt of a single message.
901     */
902    public void receiveLoop() {
903        log.debug("receiveLoop starts in {}", this);
904        int errorCount = 0;
905        while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception
906            try {
907                handleOneIncomingReply();
908                errorCount = 0;
909            } catch (java.io.InterruptedIOException e) {
910                // related to InterruptedException, catch first
911                break;
912            } catch (IOException e) {
913                rcvException = true;
914                reportReceiveLoopException(e);
915                break;
916            } catch (RuntimeException e1) {
917                log.error("Exception in receive loop: {}", e1.toString(), e1);
918                errorCount++;
919                if (errorCount == maxRcvExceptionCount) {
920                    rcvException = true;
921                    reportReceiveLoopException(e1);
922                }
923            }
924        }
925        if (!threadStopRequest) { // if e.g. unexpected end
926            ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN);
927            log.debug("Exit from rcv loop in {}", this.getClass());
928            log.info("Exiting receive loop");
929            recovery(); // see if you can restart
930        }
931    }
932
933    /**
934     * Disconnect and reset the current PortController.
935     * Invoked at abnormal ending of receiveLoop.
936     */
937    protected final void recovery() {
938        AbstractPortController adapter = controller;
939        disconnectPort(controller);
940        adapter.recover();
941    }
942
943    /**
944     * Report an error on the receive loop. Separated so tests can suppress, even
945     * though message is asynchronous.
946     * @param e Exception encountered at lower level to trigger error, or null
947     */
948    protected void reportReceiveLoopException(Exception e) {
949        log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e);
950        jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN);
951        if (controller instanceof AbstractNetworkPortController) {
952            portWarnTCP(e);
953        }
954    }
955
    /**
     * Create an empty reply message, of the proper concrete type, to be
     * filled with incoming data.
     *
     * @return a new, empty reply
     */
    protected abstract AbstractMRReply newReply();

    /**
     * Decide whether the reply being received is complete. Called by
     * loadChars() after each character has been stored in the reply.
     *
     * @param r the reply as received so far
     * @return true if the reply is complete
     */
    protected abstract boolean endOfMessage(AbstractMRReply r);
959
    /**
     * Dummy routine, to be filled by protocols that have to skip some
     * start-of-message characters. This base implementation does nothing.
     * It is called by handleOneIncomingReply() before the reply is loaded.
     *
     * @param istream input source
     * @throws IOException from underlying operations
     */
    protected void waitForStartOfReply(DataInputStream istream) throws IOException {
        // intentionally empty
    }
968
969    /**
970     * Read a single byte, protecting against various timeouts, etc.
971     * <p>
972     * When a port is set to have a receive timeout, some will return
973     * zero bytes, an EOFException or a InterruptedIOException at the end of the timeout. 
974     * In that case, the read()
975     * should be repeated to get the next real character.
976     *
977     * @param istream stream to read
978     * @return the byte read
979     * @throws java.io.IOException if unable to read
980     */
981    protected byte readByteProtected(DataInputStream istream) throws IOException {
982        if (istream == null) {
983            throw new IOException("Input Stream NULL when reading");
984        }
985        while (true) { // loop will repeat until character found
986            int nchars;
987            // The istream should be configured so that the following
988            // read(..) call only blocks for a short time, e.g. 100msec, if no
989            // data is available.  It's OK if it 
990            // throws e.g. java.io.InterruptedIOException
991            // in that case, as the calling loop should just go around
992            // and request input again.  This semi-blocking behavior will
993            // let the terminateThreads() method end this thread cleanly.
994            nchars = istream.read(rcvBuffer, 0, 1);
995            if (nchars == -1) {
996                // No more bytes can be read from the channel
997                throw new IOException("Connection not terminated normally");
998            }
999            if (nchars > 0) {
1000                return rcvBuffer[0];
1001            }
1002        }
1003    }
1004
    // One-byte buffer that readByteProtected() reads into on every call.
    // Defined this way to reduce new object creation
    private byte[] rcvBuffer = new byte[1];
1007
1008    /**
1009     * Get characters from the input source, and file a message.
1010     * <p>
1011     * Returns only when the message is complete.
1012     * <p>
1013     * Only used in the Receive thread.
1014     * <p>
1015     * Handles timeouts on read by ignoring zero-length reads.
1016     *
1017     * @param msg     message to fill
1018     * @param istream character source.
1019     * @throws IOException when presented by the input source.
1020     */
1021    protected void loadChars(AbstractMRReply msg, DataInputStream istream)
1022            throws IOException {
1023        int i;
1024        for (i = 0; i < msg.maxSize(); i++) {
1025            byte char1 = readByteProtected(istream);
1026            log.trace("char: {} i: {}",(char1&0xFF),i);
1027            // if there was a timeout, flush any char received and start over
1028            if (flushReceiveChars) {
1029                log.warn("timeout flushes receive buffer: {}", msg);
1030                msg.flush();
1031                i = 0;  // restart
1032                flushReceiveChars = false;
1033            }
1034            if (canReceive()) {
1035                msg.setElement(i, char1);
1036                if (endOfMessage(msg)) {
1037                    break;
1038                }
1039            } else {
1040                i--; // flush char
1041                log.error("unsolicited character received: {}", Integer.toHexString(char1));
1042            }
1043        }
1044    }
1045
    /**
     * Override in the system specific code if necessary. This base
     * implementation always returns true.
     *
     * @return true if it is okay to buffer receive characters into a reply
     *         message. When false, discard char received
     */
    protected boolean canReceive() {
        return true;
    }
1055
    // Number of consecutive retransmittable error replies seen by
    // handleOneIncomingReply() while waiting for a message reply; reset to 0
    // by any other reply received in that state. Also scales the delay
    // (count * 100 ms) applied before the retry is signalled.
    private int retransmitCount = 0;
1057
1058    /**
1059     * Executes a reply distribution action on the appropriate thread for JMRI.
1060     * @param r a runnable typically encapsulating a MRReply and the iteration code needed to
1061     *          send it to all the listeners.
1062     */
1063    protected void distributeReply(Runnable r) {
1064        try {
1065            if (synchronizeRx) {
1066                SwingUtilities.invokeAndWait(r);
1067            } else {
1068                SwingUtilities.invokeLater(r);
1069            }
1070        } catch (InterruptedException ie) {
1071            if (threadStopRequest) return;
1072            log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString());
1073        } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) {
1074            log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString());
1075            return;
1076        }
1077        log.debug("dispatch thread invoked");
1078    }
1079
    /**
     * Handle each reply when complete.
     * <p>
     * Creates a new reply, fills it from the input stream, hands it to the
     * listeners through {@link #distributeReply(Runnable)} and then, for
     * solicited replies, updates the transmit state machine and notifies
     * whoever is waiting on {@code xmtRunnable}.
     * <p>
     * (This is public for testing purposes) Runs in the "Receive" thread.
     *
     * @throws java.io.IOException on error.
     */
    public void handleOneIncomingReply() throws IOException {
        // we sit in this until the message is complete, relying on
        // threading to let other stuff happen

        // Create message off the right concrete class
        AbstractMRReply msg = newReply();

        // wait for start if needed
        waitForStartOfReply(istream);

        // message exists, now fill it
        loadChars(msg, istream);

        // a stop was requested while reading: drop the reply without dispatching it
        if (threadStopRequest) return;

        // message is complete, dispatch it !!
        // (replyInDispatch is cleared again in every branch below)
        replyInDispatch = true;
        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);

        // forward the message to the registered recipients,
        // which includes the communications monitor
        // return a notification via the Swing event queue to ensure proper thread
        Runnable r = new RcvNotifier(msg, mLastSender, this);
        distributeReply(r);

        if (!msg.isUnsolicited()) {
            // effect on transmit:
            switch (mCurrentState) {
                case WAITMSGREPLYSTATE: {
                    // check to see if the response was an error message we want
                    // to automatically handle by re-queueing the last sent
                    // message, otherwise go on to the next message
                    if (msg.isRetransmittableErrorMsg()) {
                        log.error("Automatic Recovery from Error Message: {}.  Retransmitted {} times.", msg, retransmitCount);
                        synchronized (xmtRunnable) {
                            mCurrentState = AUTORETRYSTATE;
                            // after the first retry, delay by 100 ms per
                            // retransmission already made before notifying
                            if (retransmitCount > 0) {
                                try {
                                    xmtRunnable.wait(retransmitCount * 100L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // retain if needed later
                                }
                            }
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount++;
                        }
                    } else {
                        // update state, and notify to continue
                        synchronized (xmtRunnable) {
                            mCurrentState = NOTIFIEDSTATE;
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount = 0;
                        }
                    }
                    break;
                }
                case WAITREPLYINPROGMODESTATE: {
                    // entering programming mode
                    mCurrentMode = PROGRAMINGMODE;
                    replyInDispatch = false;

                    // check to see if we need to delay to allow decoders to become
                    // responsive
                    int warmUpDelay = enterProgModeDelayTime();
                    if (warmUpDelay != 0) {
                        try {
                            synchronized (xmtRunnable) {
                                xmtRunnable.wait(warmUpDelay);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // retain if needed later
                        }
                    }
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                case WAITREPLYINNORMMODESTATE: {
                    // entering normal mode
                    mCurrentMode = NORMALMODE;
                    replyInDispatch = false;
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                default: {
                    // a solicited reply arrived in a state that was not waiting for one
                    replyInDispatch = false;
                    if (allowUnexpectedReply) {
                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
                        synchronized (xmtRunnable) {
                            // The transmit thread sometimes gets stuck
                            // when unexpected replies are received.  Notify
                            // it to clear the block without a timeout.
                            // (do not change the current state)
                            //if(mCurrentState!=IDLESTATE)
                            xmtRunnable.notify();
                        }
                    } else {
                        unexpectedReplyStateError(mCurrentState, msg.toString());
                    }
                }
            }
            // Unsolicited message
        } else {
            // unsolicited replies do not change the transmit state
            log.debug("Unsolicited Message Received {}", msg);

            replyInDispatch = false;
        }
    }
1204
1205    /**
1206     * Log an error message for a message received in an unexpected state.
1207     * @param State message state.
1208     * @param msgString message string.
1209     */
1210    protected void unexpectedReplyStateError(int State, String msgString) {
1211       String[] packages = this.getClass().getName().split("\\.");
1212       String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
1213                     +(packages.length>=1 ? packages[packages.length-1] :"");
1214       log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
1215    }
1216
    /**
     * For testing purposes, let us be able to find out
     * what the last sender was.
     *
     * @return last sender, mLastSender.
     */
    public AbstractMRListener getLastSender() {
        return mLastSender;
    }
1225
1226    protected void terminate() {
1227        log.debug("Cleanup Starts");
1228        if (ostream == null) {
1229            return;    // no connection established
1230        }
1231        AbstractMRMessage modeMsg = enterNormalMode();
1232        if (modeMsg != null) {
1233            modeMsg.setRetries(100); // set the number of retries
1234            // high, just in case the interface
1235            // is busy when we try to send
1236            forwardToPort(modeMsg, null);
1237            // wait for reply
1238            try {
1239                if (xmtRunnable != null) {
1240                    synchronized (xmtRunnable) {
1241                        xmtRunnable.wait(modeMsg.getTimeout());
1242                    }
1243                }
1244            } catch (InterruptedException e) {
1245                Thread.currentThread().interrupt(); // retain if needed later
1246                log.error("transmit interrupted");
1247            }
1248        }
1249    }
1250
1251    /**
1252     * Internal class to remember the Reply object and destination listener with
1253     * a reply is received.
1254     */
1255    protected static class RcvNotifier implements Runnable {
1256
1257        AbstractMRReply mMsg;
1258        AbstractMRListener mDest;
1259        AbstractMRTrafficController mTc;
1260
1261        public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
1262                AbstractMRTrafficController pTc) {
1263            mMsg = pMsg;
1264            mDest = pDest;
1265            mTc = pTc;
1266        }
1267
1268        @Override
1269        public void run() {
1270            log.debug("Delayed rcv notify starts");
1271            mTc.notifyReply(mMsg, mDest);
1272        }
1273    } // end RcvNotifier
1274
    /**
     * Create a {@link RcvNotifier}; allows creation of the object from
     * outside this package.
     *
     * @param pMsg  the reply to distribute
     * @param pDest the listener passed along with the reply
     * @param pTc   the traffic controller that performs the notification
     * @return a new RcvNotifier holding the three arguments
     */
    protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest,
            AbstractMRTrafficController pTc) {
        return new RcvNotifier(pMsg, pDest, pTc);
    }
1280
1281    /**
1282     * Internal class to remember the Message object and destination listener
1283     * when a message is queued for notification.
1284     */
1285    protected static class XmtNotifier implements Runnable {
1286
1287        AbstractMRMessage mMsg;
1288        AbstractMRListener mDest;
1289        AbstractMRTrafficController mTc;
1290
1291        public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest,
1292                AbstractMRTrafficController pTc) {
1293            mMsg = pMsg;
1294            mDest = pDest;
1295            mTc = pTc;
1296        }
1297
1298        @Override
1299        public void run() {
1300            log.debug("Delayed xmt notify starts");
1301            mTc.notifyMessage(mMsg, mDest);
1302        }
1303    }  // end XmtNotifier
1304
1305    /**
1306     * Terminate the receive and transmit threads.
1307     * <p>
1308     * This is intended to be used only by testing subclasses.
1309     */
1310    public void terminateThreads() {
1311        threadStopRequest = true;
1312        if (xmtThread != null) {
1313            xmtThread.interrupt();
1314            try {
1315                xmtThread.join(150);
1316            } catch (InterruptedException ie){
1317                // interrupted during cleanup.
1318            }
1319        }
1320
1321        if (rcvThread != null) {
1322            rcvThread.interrupt();
1323            try {
1324                rcvThread.join(150);
1325            } catch (InterruptedException ie){
1326                // interrupted during cleanup.
1327            }
1328        }
1329        // we also need to remove the shutdown task.
1330        InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask);
1331    }
1332
    /**
     * Flag that threads should terminate as soon as they can.
     * Set by terminateThreads() and disconnectPort(); cleared by connectPort().
     */
    protected volatile boolean threadStopRequest = false;

    // logger shared by this class and its nested notifier classes
    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class);
1339
1340}