001package jmri.jmrix.loconet;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.concurrent.LinkedTransferQueue;
006import org.slf4j.Logger;
007import org.slf4j.LoggerFactory;
008
009/**
010 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
011 * side sends/receives LocoNetMessage objects. The connection to a
012 * LnPortController is via a pair of *Streams, which then carry sequences of
013 * characters for transmission.
014 * <p>
015 * Messages come to this via the main GUI thread, and are forwarded back to
016 * listeners in that same thread. Reception and transmission are handled in
017 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
018 * classes defined here. The thread priorities are:
019 * <ul>
020 *   <li> RcvHandler - at highest available priority
021 *   <li> XmtHandler - down one, which is assumed to be above the GUI
022 *   <li> (everything else)
023 * </ul>
024 * Some of the message formats used in this class are Copyright Digitrax, Inc.
025 * and used with permission as part of the JMRI project. That permission does
026 * not extend to uses in other software products. If you wish to use this code,
027 * algorithm or these message formats outside of JMRI, please contact Digitrax
028 * Inc for separate permission.
029 *
030 * @author Bob Jacobsen Copyright (C) 2001, 2018
031 * @author B. Milhaupt  Copyright (C) 2020
032 */
033public class LnPacketizer extends LnTrafficController {
034
035    /**
036     * True if the external hardware is not echoing messages, so we must.
037     */
038    protected boolean echo = false;  // true = echo messages here, instead of in hardware
039
040    public LnPacketizer(LocoNetSystemConnectionMemo m) {
041        // set the memo to point here
042        memo = m;
043        m.setLnTrafficController(this);
044    }
045
046    // The methods to implement the LocoNetInterface
047
048    /**
049     * {@inheritDoc}
050     */
051    @Override
052    public boolean status() {
053        boolean returnVal = ( ostream != null && istream != null
054                && xmtThread != null && xmtThread.isAlive() && xmtHandler != null
055                && rcvThread != null && rcvThread.isAlive() && rcvHandler != null
056                );
057        return returnVal;
058    }
059
060    /**
061     * Synchronized list used as a transmit queue.
062     */
063    protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>();
064
065    /**
066     * XmtHandler (a local class) object to implement the transmit thread.
067     * <p>
068     * We create this object in startThreads() as each packetizer uses different handlers.
069     * So long as the object is created before using it to sync it works.
070     *
071     */
072    protected Runnable xmtHandler = null;
073
074    /**
075     * RcvHandler (a local class) object to implement the receive thread
076     */
077    protected Runnable rcvHandler;
078
079    /**
080     * Forward a preformatted LocoNetMessage to the actual interface.
081     * <p>
082     * Checksum is computed and overwritten here, then the message is converted
083     * to a byte array and queued for transmission.
084     *
085     * @param m Message to send; will be updated with CRC
086     */
087    @Override
088    public void sendLocoNetMessage(LocoNetMessage m) {
089
090        // update statistics
091        transmittedMsgCount++;
092
093        // set the error correcting code byte(s) before transmittal
094        m.setParity();
095
096        // stream to port in single write, as that's needed by serial
097        int len = m.getNumDataElements();
098        byte msg[] = new byte[len];
099        for (int i = 0; i < len; i++) {
100            msg[i] = (byte) m.getElement(i);
101        }
102
103        log.debug("queue LocoNet packet: {}", m);
104        // We need to queue the request and wake the xmit thread in an atomic operation
105        // But the thread might not be running, in which case the request is just
106        // queued up.
107        try {
108            xmtList.add(msg);
109        } catch (RuntimeException e) {
110            log.warn("passing to xmit: unexpected exception: ", e);
111        }
112    }
113
114    /**
115     * Implement abstract method to signal if there's a backlog of information
116     * waiting to be sent.
117     *
118     * @return true if busy, false if nothing waiting to send
119     */
120    @Override
121    public boolean isXmtBusy() {
122        if (controller == null) {
123            return false;
124        }
125
126        return (!controller.okToSend());
127    }
128
129    // methods to connect/disconnect to a source of data in a LnPortController
130
131    protected LnPortController controller = null;
132
133    /**
134     * Make connection to an existing LnPortController object.
135     *
136     * @param p Port controller for connected. Save this for a later disconnect
137     *          call
138     */
139    public void connectPort(LnPortController p) {
140        istream = p.getInputStream();
141        ostream = p.getOutputStream();
142        if (controller != null) {
143            log.warn("connectPort: connect called while connected");
144        }
145        controller = p;
146    }
147
148    /**
149     * Break connection to an existing LnPortController object. Once broken,
150     * attempts to send via "message" member will fail.
151     *
152     * @param p previously connected port
153     */
154    public void disconnectPort(LnPortController p) {
155        istream = null;
156        ostream = null;
157        if (controller != p) {
158            log.warn("disconnectPort: disconnect called from non-connected LnPortController");
159        }
160        controller = null;
161    }
162
163    // data members to hold the streams. These are public so the inner classes defined here
164    // can access them with a Java 1.1 compiler
165    public DataInputStream istream = null;
166    public OutputStream ostream = null;
167
168    /**
169     * Read a single byte, protecting against various timeouts, etc.
170     * <p>
171     * When a port is set to have a receive timeout (via the
172     * enableReceiveTimeout() method), some will return zero bytes or an
173     * EOFException at the end of the timeout. In that case, the read should be
174     * repeated to get the next real character.
175     *
176     * @param istream stream to read from
177     * @return buffer of received data
178     * @throws java.io.IOException failure during stream read
179     *
180     */
181    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
182        while (true) { // loop will repeat until character found
183            int nchars;
184            nchars = istream.read(rcvBuffer, 0, 1);
185            if (nchars > 0) {
186                return rcvBuffer[0];
187            }
188        }
189    }
190    // Defined this way to reduce new object creation
191    private final byte[] rcvBuffer = new byte[1];
192
193    /**
194     * Captive class to handle incoming characters. This is a permanent loop,
195     * looking for input messages in character form on the stream connected to
196     * the LnPortController via <code>connectPort</code>.
197     */
198    protected class RcvHandler implements Runnable {
199
200        /**
201         * Remember the LnPacketizer object
202         */
203        LnTrafficController trafficController;
204
205        public RcvHandler(LnTrafficController lt) {
206            trafficController = lt;
207        }
208
209        /**
210         * Handle incoming characters. This is a permanent loop, looking for
211         * input messages in character form on the stream connected to the
212         * LnPortController via <code>connectPort</code>. Terminates with the
213         * input stream breaking out of the try block.
214         */
215        @Override
216        public void run() {
217
218            int opCode;
219            while (!threadStopRequest && ! Thread.interrupted() ) {   // loop until asked to stop
220                try {
221                    // start by looking for command -  skip if bit not set
222                    while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check
223                        if (log.isTraceEnabled()) { // avoid building string
224                            log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N
225                        }
226                    }
227                    // here opCode is OK. Create output message
228                    if (log.isTraceEnabled()) { // avoid building string
229                        log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N
230                    }
231                    LocoNetMessage msg = null;
232                    while (msg == null) {
233                        try {
234                            // Capture 2nd byte, always present
235                            int byte2 = readByteProtected(istream) & 0xFF;
236                            if (log.isTraceEnabled()) { // avoid building string
237                                log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N
238                            }                            // Decide length
239                            int len = 2;
240                            switch ((opCode & 0x60) >> 5) {
241                                case 0:
242                                    /* 2 byte message */
243
244                                    len = 2;
245                                    break;
246
247                                case 1:
248                                    /* 4 byte message */
249
250                                    len = 4;
251                                    break;
252
253                                case 2:
254                                    /* 6 byte message */
255
256                                    len = 6;
257                                    break;
258
259                                case 3:
260                                    /* N byte message */
261
262                                    if (byte2 < 2) {
263                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N
264                                    }
265                                    len = byte2;
266                                    break;
267                                default:
268                                    log.warn("Unhandled code: {}", (opCode & 0x60) >> 5);
269                                    break;
270                            }
271                            msg = new LocoNetMessage(len);
272                            // message exists, now fill it
273                            msg.setOpCode(opCode);
274                            msg.setElement(1, byte2);
275                            log.trace("len: {}", len); // NOI18N
276                            for (int i = 2; i < len; i++) {
277                                // check for message-blocking error
278                                int b = readByteProtected(istream) & 0xFF;
279                                if (log.isTraceEnabled()) {
280                                    log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N
281                                }
282                                if ((b & 0x80) != 0) {
283                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N
284                                    opCode = b;
285                                    throw new LocoNetMessageException();
286                                }
287                                msg.setElement(i, b);
288                            }
289                        } catch (LocoNetMessageException e) {
290                            // retry by destroying the existing message
291                            // opCode is set for the newly-started packet
292                            msg = null;
293                        }
294                    }
295                    // check parity
296                    if (!msg.checkParity()) {
297                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
298                        throw new LocoNetMessageException();
299                    }
300                    // message is complete, dispatch it !!
301                    {
302                        log.debug("queue message for notification: {}", msg);
303
304                        jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController));
305                    }
306
307                    // done with this one
308                } catch (LocoNetMessageException e) {
309                    // just let it ride for now
310                    log.warn("run: unexpected LocoNetMessageException", e); // NOI18N
311                } catch (java.io.EOFException | com.fazecast.jSerialComm.SerialPortTimeoutException e) {
312                    // posted from idle port when enableReceiveTimeout used
313                } catch (java.io.IOException e) {
314                    // fired when write-end of HexFile reaches end
315                    log.debug("IOException, should only happen with HexFile", e); // NOI18N
316                    log.info("End of file"); // NOI18N
317                    disconnectPort(controller);
318                    return;
319                } // normally, we don't catch RuntimeException, but in this
320                  // permanently running loop it seems wise.
321                catch (RuntimeException e) {
322                    log.warn("run: unexpected Exception", e); // NOI18N
323                }
324            } // end of permanent loop
325        }
326    }
327
328    /**
329     * Captive class to notify of one message.
330     */
331    private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction {
332
333        public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) {
334            thisMsg = msg;
335            thisTc = trafficController;
336        }
337        LocoNetMessage thisMsg;
338        LnTrafficController thisTc;
339
340        /**
341         * {@inheritDoc}
342         */
343        @Override
344        public void run() {
345            thisTc.notify(thisMsg);
346        }
347    }
348
349    /**
350     * Captive class to handle transmission.
351     */
352    class XmtHandler implements Runnable {
353
354        /**
355         * Loops forever, looking for message to send and processing them.
356         */
357        @Override
358        public void run() {
359
360            while (!threadStopRequest) {   // loop until asked to stop
361                // any input?
362                try {
363                    // get content; blocks until present
364                    log.trace("check for input"); // NOI18N
365
366                    byte msg[] = xmtList.take();
367
368                    // input - now send
369                    try {
370                        if (ostream != null) {
371                            if (log.isDebugEnabled()) { // avoid work if not needed
372                                if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N
373                                log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
374                            }
375                            ostream.write(msg);
376                            ostream.flush();
377                            if (log.isTraceEnabled()) { // avoid String building if not needed
378                                log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
379                            }
380                            messageTransmitted(msg);
381                        } else {
382                            // no stream connected
383                            log.warn("sendLocoNetMessage: no connection established"); // NOI18N
384                        }
385                    } catch (java.io.IOException e) {
386                        log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N
387                    }
388                } catch (InterruptedException ie) {
389                    return; // ending the thread
390                } catch (RuntimeException rt) {
391                    log.error("Exception on take() call", rt);
392                }
393            }
394        }
395    }
396
397    /**
398     * When a message is finally transmitted, forward it to listeners if echoing
399     * is needed.
400     *
401     * @param msg message sent
402     */
403    protected void messageTransmitted(byte[] msg) {
404        log.debug("message transmitted (echo {})", echo);
405        if (!echo) {
406            return;
407        }
408        // message is queued for transmit, echo it when needed
409        // return a notification via the queue to ensure end
410        javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg)));
411    }
412
413    static class Echo implements Runnable {
414
415        Echo(LnPacketizer t, LocoNetMessage m) {
416            myTc = t;
417            msgForLater = m;
418        }
419        LocoNetMessage msgForLater;
420        LnPacketizer myTc;
421
422        /**
423         * {@inheritDoc}
424         */
425        @Override
426        public void run() {
427            myTc.notify(msgForLater);
428        }
429    }
430
431    /**
432     * Invoked at startup to start the threads needed here.
433     */
434    public void startThreads() {
435        int priority = Thread.currentThread().getPriority();
436        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N
437                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);
438
439        // start the RcvHandler in a thread of its own
440        if (rcvHandler == null) {
441            rcvHandler = new RcvHandler(this);
442        }
443        rcvThread = new Thread(rcvHandler, "LocoNet receive handler"); // NOI18N
444        rcvThread.setDaemon(true);
445        rcvThread.setPriority(Thread.MAX_PRIORITY);
446        rcvThread.start();
447
448        if (xmtHandler == null) {
449            xmtHandler = new XmtHandler();
450        }
451        // make sure that the xmt priority is no lower than the current priority
452        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
453        // start the XmtHandler in a thread of its own
454        if (xmtThread == null) {
455            xmtThread = new Thread(xmtHandler, "LocoNet transmit handler"); // NOI18N
456        }
457        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
458        xmtThread.setDaemon(true);
459        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
460        xmtThread.start();
461
462        log.info("lnPacketizer Started");
463    }
464
465    protected Thread rcvThread;
466    protected Thread xmtThread;
467
468    /**
469     * {@inheritDoc}
470     */
471    @SuppressWarnings("deprecation") // Thread.stop()
472    @Override
473    public void dispose() {
474        if (xmtThread != null) {
475            xmtThread.interrupt();
476            try {
477                xmtThread.join();
478            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
479        }
480        if (rcvThread != null) {
481            rcvThread.interrupt();
482            try {
483                rcvThread.join();
484            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
485        }
486        super.dispose();
487    }
488
489    /**
490     * Terminate the receive and transmit threads.
491     * <p>
492     * This is intended to be used only by testing subclasses.
493     */
494    public void terminateThreads() {
495        threadStopRequest = true;
496        if (xmtThread != null) {
497            xmtThread.interrupt();
498            try {
499                xmtThread.join();
500            } catch (InterruptedException ie){
501                // interrupted during cleanup.
502            }
503        }
504
505        if (rcvThread != null) {
506            rcvThread.interrupt();
507            try {
508                rcvThread.join();
509            } catch (InterruptedException ie){
510                // interrupted during cleanup.
511            }
512        }
513    }
514
515    /**
516     * Flag that threads should terminate as soon as they can.
517     */
518    protected volatile boolean threadStopRequest = false;
519
520    private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class);
521
522}