001package jmri.jmrix.loconet;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.LinkedList;
006import java.util.NoSuchElementException;
007import org.slf4j.Logger;
008import org.slf4j.LoggerFactory;
009
010/**
011 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
012 * side sends/receives LocoNetMessage objects. The connection to a
013 * LnPortController is via a pair of *Streams, which then carry sequences of
014 * characters for transmission.
015 * <p>
016 * Messages come to this via the main GUI thread, and are forwarded back to
017 * listeners in that same thread. Reception and transmission are handled in
018 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
019 * classes defined here. The thread priorities are:
020 * <ul>
021 *   <li> RcvHandler - at highest available priority
022 *   <li> XmtHandler - down one, which is assumed to be above the GUI
023 *   <li> (everything else)
024 * </ul>
025 * Some of the message formats used in this class are Copyright Digitrax, Inc.
026 * and used with permission as part of the JMRI project. That permission does
027 * not extend to uses in other software products. If you wish to use this code,
028 * algorithm or these message formats outside of JMRI, please contact Digitrax
029 * Inc for separate permission.
030 *
031 * @author Bob Jacobsen Copyright (C) 2001, 2018
032 * @author B. Milhaupt  Copyright (C) 2020
033 */
034public class LnPacketizer extends LnTrafficController {
035
036    /**
037     * True if the external hardware is not echoing messages, so we must.
038     */
039    protected boolean echo = false;  // true = echo messages here, instead of in hardware
040
041    public LnPacketizer(LocoNetSystemConnectionMemo m) {
042        // set the memo to point here
043        memo = m;
044        m.setLnTrafficController(this);
045    }
046
047    // The methods to implement the LocoNetInterface
048
049    /**
050     * {@inheritDoc}
051     */
052    @Override
053    public boolean status() {
054        boolean returnVal = ( ostream != null && istream != null
055                && xmtThread != null && xmtThread.isAlive() && xmtHandler != null
056                && rcvThread != null && rcvThread.isAlive() && rcvHandler != null
057                );
058        return returnVal;
059    }
060
061    /**
062     * Synchronized list used as a transmit queue.
063     */
064    protected LinkedList<byte[]> xmtList = new LinkedList<>();
065
066    /**
067     * XmtHandler (a local class) object to implement the transmit thread.
068     * <p>
069     * We create this object in startThreads() as each packetizer uses different handlers.
070     * So long as the object is created before using it to sync it works.
071     *
072     */
073    protected Runnable xmtHandler = null;
074
075    /**
076     * RcvHandler (a local class) object to implement the receive thread
077     */
078    protected Runnable rcvHandler;
079
080    /**
081     * Forward a preformatted LocoNetMessage to the actual interface.
082     * <p>
083     * Checksum is computed and overwritten here, then the message is converted
084     * to a byte array and queued for transmission.
085     *
086     * @param m Message to send; will be updated with CRC
087     */
088    @Override
089    public void sendLocoNetMessage(LocoNetMessage m) {
090
091        // update statistics
092        transmittedMsgCount++;
093
094        // set the error correcting code byte(s) before transmittal
095        m.setParity();
096
097        // stream to port in single write, as that's needed by serial
098        int len = m.getNumDataElements();
099        byte msg[] = new byte[len];
100        for (int i = 0; i < len; i++) {
101            msg[i] = (byte) m.getElement(i);
102        }
103
104        log.debug("queue LocoNet packet: {}", m);
105        // We need to queue the request and wake the xmit thread in an atomic operation
106        // But the thread might not be running, in which case the request is just
107        // queued up.
108        try {
109            synchronized (xmtHandler) {
110                xmtList.addLast(msg);
111                xmtHandler.notifyAll();
112            }
113        } catch (RuntimeException e) {
114            log.warn("passing to xmit: unexpected exception: ", e);
115        }
116    }
117
118    /**
119     * Implement abstract method to signal if there's a backlog of information
120     * waiting to be sent.
121     *
122     * @return true if busy, false if nothing waiting to send
123     */
124    @Override
125    public boolean isXmtBusy() {
126        if (controller == null) {
127            return false;
128        }
129
130        return (!controller.okToSend());
131    }
132
133    // methods to connect/disconnect to a source of data in a LnPortController
134
135    protected LnPortController controller = null;
136
137    /**
138     * Make connection to an existing LnPortController object.
139     *
140     * @param p Port controller for connected. Save this for a later disconnect
141     *          call
142     */
143    public void connectPort(LnPortController p) {
144        istream = p.getInputStream();
145        ostream = p.getOutputStream();
146        if (controller != null) {
147            log.warn("connectPort: connect called while connected");
148        }
149        controller = p;
150    }
151
152    /**
153     * Break connection to an existing LnPortController object. Once broken,
154     * attempts to send via "message" member will fail.
155     *
156     * @param p previously connected port
157     */
158    public void disconnectPort(LnPortController p) {
159        istream = null;
160        ostream = null;
161        if (controller != p) {
162            log.warn("disconnectPort: disconnect called from non-connected LnPortController");
163        }
164        controller = null;
165    }
166
167    // data members to hold the streams. These are public so the inner classes defined here
168    // can access them with a Java 1.1 compiler
169    public DataInputStream istream = null;
170    public OutputStream ostream = null;
171
172    /**
173     * Read a single byte, protecting against various timeouts, etc.
174     * <p>
175     * When a port is set to have a receive timeout (via the
176     * enableReceiveTimeout() method), some will return zero bytes or an
177     * EOFException at the end of the timeout. In that case, the read should be
178     * repeated to get the next real character.
179     *
180     * @param istream stream to read from
181     * @return buffer of received data
182     * @throws java.io.IOException failure during stream read
183     *
184     */
185    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
186        while (true) { // loop will repeat until character found
187            int nchars;
188            nchars = istream.read(rcvBuffer, 0, 1);
189            if (nchars > 0) {
190                return rcvBuffer[0];
191            }
192        }
193    }
194    // Defined this way to reduce new object creation
195    private final byte[] rcvBuffer = new byte[1];
196
197    /**
198     * Captive class to handle incoming characters. This is a permanent loop,
199     * looking for input messages in character form on the stream connected to
200     * the LnPortController via <code>connectPort</code>.
201     */
202    protected class RcvHandler implements Runnable {
203
204        /**
205         * Remember the LnPacketizer object
206         */
207        LnTrafficController trafficController;
208
209        public RcvHandler(LnTrafficController lt) {
210            trafficController = lt;
211        }
212
213        /**
214         * Handle incoming characters. This is a permanent loop, looking for
215         * input messages in character form on the stream connected to the
216         * LnPortController via <code>connectPort</code>. Terminates with the
217         * input stream breaking out of the try block.
218         */
219        @Override
220        public void run() {
221
222            int opCode;
223            while (!threadStopRequest) {   // loop until asked to stop
224                try {
225                    // start by looking for command -  skip if bit not set
226                    while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check
227                        if (log.isTraceEnabled()) { // avoid building string
228                            log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N
229                        }
230                    }
231                    // here opCode is OK. Create output message
232                    if (log.isTraceEnabled()) { // avoid building string
233                        log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N
234                    }
235                    LocoNetMessage msg = null;
236                    while (msg == null) {
237                        try {
238                            // Capture 2nd byte, always present
239                            int byte2 = readByteProtected(istream) & 0xFF;
240                            if (log.isTraceEnabled()) { // avoid building string
241                                log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N
242                            }                            // Decide length
243                            int len = 2;
244                            switch ((opCode & 0x60) >> 5) {
245                                case 0:
246                                    /* 2 byte message */
247
248                                    len = 2;
249                                    break;
250
251                                case 1:
252                                    /* 4 byte message */
253
254                                    len = 4;
255                                    break;
256
257                                case 2:
258                                    /* 6 byte message */
259
260                                    len = 6;
261                                    break;
262
263                                case 3:
264                                    /* N byte message */
265
266                                    if (byte2 < 2) {
267                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N
268                                    }
269                                    len = byte2;
270                                    break;
271                                default:
272                                    log.warn("Unhandled code: {}", (opCode & 0x60) >> 5);
273                                    break;
274                            }
275                            msg = new LocoNetMessage(len);
276                            // message exists, now fill it
277                            msg.setOpCode(opCode);
278                            msg.setElement(1, byte2);
279                            log.trace("len: {}", len); // NOI18N
280                            for (int i = 2; i < len; i++) {
281                                // check for message-blocking error
282                                int b = readByteProtected(istream) & 0xFF;
283                                if (log.isTraceEnabled()) {
284                                    log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N
285                                }
286                                if ((b & 0x80) != 0) {
287                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N
288                                    opCode = b;
289                                    throw new LocoNetMessageException();
290                                }
291                                msg.setElement(i, b);
292                            }
293                        } catch (LocoNetMessageException e) {
294                            // retry by destroying the existing message
295                            // opCode is set for the newly-started packet
296                            msg = null;
297                        }
298                    }
299                    // check parity
300                    if (!msg.checkParity()) {
301                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
302                        throw new LocoNetMessageException();
303                    }
304                    // message is complete, dispatch it !!
305                    {
306                        log.debug("queue message for notification: {}", msg);
307
308                        jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController));
309                    }
310
311                    // done with this one
312                } catch (LocoNetMessageException e) {
313                    // just let it ride for now
314                    log.warn("run: unexpected LocoNetMessageException", e); // NOI18N
315                } catch (java.io.EOFException e) {
316                    // posted from idle port when enableReceiveTimeout used
317                    log.trace("EOFException, is LocoNet serial I/O using timeouts?"); // NOI18N
318                } catch (java.io.IOException e) {
319                    // fired when write-end of HexFile reaches end
320                    log.debug("IOException, should only happen with HexFile", e); // NOI18N
321                    log.info("End of file"); // NOI18N
322                    disconnectPort(controller);
323                    return;
324                } // normally, we don't catch RuntimeException, but in this
325                  // permanently running loop it seems wise.
326                catch (RuntimeException e) {
327                    log.warn("run: unexpected Exception", e); // NOI18N
328                }
329            } // end of permanent loop
330        }
331    }
332
333    /**
334     * Captive class to notify of one message.
335     */
336    private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction {
337
338        public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) {
339            thisMsg = msg;
340            thisTc = trafficController;
341        }
342        LocoNetMessage thisMsg;
343        LnTrafficController thisTc;
344
345        /**
346         * {@inheritDoc}
347         */
348        @Override
349        public void run() {
350            thisTc.notify(thisMsg);
351        }
352    }
353
354    /**
355     * Captive class to handle transmission.
356     */
357    class XmtHandler implements Runnable {
358
359        /**
360         * Loops forever, looking for message to send and processing them.
361         */
362        @Override
363        public void run() {
364
365            while (!threadStopRequest) {   // loop until asked to stop
366                // any input?
367                try {
368                    // get content; failure is a NoSuchElementException
369                    log.trace("check for input"); // NOI18N
370                    byte msg[] = null;
371                    synchronized (this) {
372                        msg = xmtList.removeFirst();
373                    }
374
375                    // input - now send
376                    try {
377                        if (ostream != null) {
378                            if (log.isDebugEnabled()) { // avoid work if not needed
379                                if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N
380                                log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
381                            }
382                            ostream.write(msg);
383                            ostream.flush();
384                            if (log.isTraceEnabled()) { // avoid String building if not needed
385                                log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
386                            }
387                            messageTransmitted(msg);
388                        } else {
389                            // no stream connected
390                            log.warn("sendLocoNetMessage: no connection established"); // NOI18N
391                        }
392                    } catch (java.io.IOException e) {
393                        log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N
394                    }
395                } catch (NoSuchElementException e) {
396                    // message queue was empty, wait for input
397                    log.trace("start wait"); // NOI18N
398
399                    new jmri.util.WaitHandler(this); // handle synchronization, spurious wake, interruption
400
401                    log.trace("end wait"); // NOI18N
402                }
403            }
404        }
405    }
406
407    /**
408     * When a message is finally transmitted, forward it to listeners if echoing
409     * is needed.
410     *
411     * @param msg message sent
412     */
413    protected void messageTransmitted(byte[] msg) {
414        log.debug("message transmitted (echo {})", echo);
415        if (!echo) {
416            return;
417        }
418        // message is queued for transmit, echo it when needed
419        // return a notification via the queue to ensure end
420        javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg)));
421    }
422
423    static class Echo implements Runnable {
424
425        Echo(LnPacketizer t, LocoNetMessage m) {
426            myTc = t;
427            msgForLater = m;
428        }
429        LocoNetMessage msgForLater;
430        LnPacketizer myTc;
431
432        /**
433         * {@inheritDoc}
434         */
435        @Override
436        public void run() {
437            myTc.notify(msgForLater);
438        }
439    }
440
441    /**
442     * Invoked at startup to start the threads needed here.
443     */
444    public void startThreads() {
445        int priority = Thread.currentThread().getPriority();
446        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N
447                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);
448
449        // start the RcvHandler in a thread of its own
450        if (rcvHandler == null) {
451            rcvHandler = new RcvHandler(this);
452        }
453        rcvThread = new Thread(rcvHandler, "LocoNet receive handler"); // NOI18N
454        rcvThread.setDaemon(true);
455        rcvThread.setPriority(Thread.MAX_PRIORITY);
456        rcvThread.start();
457
458        if (xmtHandler == null) {
459            xmtHandler = new XmtHandler();
460        }
461        // make sure that the xmt priority is no lower than the current priority
462        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
463        // start the XmtHandler in a thread of its own
464        if (xmtThread == null) {
465            xmtThread = new Thread(xmtHandler, "LocoNet transmit handler"); // NOI18N
466        }
467        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
468        xmtThread.setDaemon(true);
469        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
470        xmtThread.start();
471
472        log.info("lnPacketizer Started");
473    }
474
475    protected Thread rcvThread;
476    protected Thread xmtThread;
477
478    /**
479     * {@inheritDoc}
480     */
481    @SuppressWarnings("deprecation") // Thread.stop()
482    @Override
483    public void dispose() {
484        if (xmtThread != null) {
485            xmtThread.stop(); // interrupt not sufficient?
486            try {
487                xmtThread.join();
488            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
489        }
490        if (rcvThread != null) {
491            rcvThread.stop(); // interrupt not sufficient, because jtermios hangs in select via purejavacomm.PureJavaSerialPort$2.read
492            try {
493                rcvThread.join();
494            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
495        }
496        super.dispose();
497    }
498
499    /**
500     * Terminate the receive and transmit threads.
501     * <p>
502     * This is intended to be used only by testing subclasses.
503     */
504    public void terminateThreads() {
505        threadStopRequest = true;
506        if (xmtThread != null) {
507            xmtThread.interrupt();
508            try {
509                xmtThread.join();
510            } catch (InterruptedException ie){
511                // interrupted during cleanup.
512            }
513        }
514
515        if (rcvThread != null) {
516            rcvThread.interrupt();
517            try {
518                rcvThread.join();
519            } catch (InterruptedException ie){
520                // interrupted during cleanup.
521            }
522        }
523    }
524
525    /**
526     * Flag that threads should terminate as soon as they can.
527     */
528    protected volatile boolean threadStopRequest = false;
529
530    private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class);
531
532}