001package jmri.jmrix.loconet;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.concurrent.LinkedTransferQueue;
006import org.slf4j.Logger;
007import org.slf4j.LoggerFactory;
008
009/**
010 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
011 * side sends/receives LocoNetMessage objects. The connection to a
012 * LnPortController is via a pair of *Streams, which then carry sequences of
013 * characters for transmission.
014 * <p>
015 * Messages come to this via the main GUI thread, and are forwarded back to
016 * listeners in that same thread. Reception and transmission are handled in
017 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
018 * classes defined here. The thread priorities are:
019 * <ul>
020 *   <li> RcvHandler - at highest available priority
021 *   <li> XmtHandler - down one, which is assumed to be above the GUI
022 *   <li> (everything else)
023 * </ul>
024 * Some of the message formats used in this class are Copyright Digitrax, Inc.
025 * and used with permission as part of the JMRI project. That permission does
026 * not extend to uses in other software products. If you wish to use this code,
027 * algorithm or these message formats outside of JMRI, please contact Digitrax
028 * Inc for separate permission.
029 *
030 * @author Bob Jacobsen Copyright (C) 2001, 2018
031 * @author B. Milhaupt  Copyright (C) 2020
032 */
033public class LnPacketizer extends LnTrafficController {
034
035    /**
036     * True if the external hardware is not echoing messages, so we must.
037     */
038    protected boolean echo = false;  // true = echo messages here, instead of in hardware
039
040    public LnPacketizer(LocoNetSystemConnectionMemo m) {
041        // set the memo to point here
042        memo = m;
043        m.setLnTrafficController(this);
044    }
045
046    // The methods to implement the LocoNetInterface
047
048    /**
049     * {@inheritDoc}
050     */
051    @Override
052    public boolean status() {
053        boolean returnVal = ( ostream != null && istream != null
054                && xmtThread != null && xmtThread.isAlive() && xmtHandler != null
055                && rcvThread != null && rcvThread.isAlive() && rcvHandler != null
056                );
057        return returnVal;
058    }
059
060    /**
061     * Synchronized list used as a transmit queue.
062     */
063    protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>();
064
065    /**
066     * XmtHandler (a local class) object to implement the transmit thread.
067     * <p>
068     * We create this object in startThreads() as each packetizer uses different handlers.
069     * So long as the object is created before using it to sync it works.
070     *
071     */
072    protected Runnable xmtHandler = null;
073
074    /**
075     * RcvHandler (a local class) object to implement the receive thread
076     */
077    protected Runnable rcvHandler;
078
079    /**
080     * Forward a preformatted LocoNetMessage to the actual interface.
081     * <p>
082     * Checksum is computed and overwritten here, then the message is converted
083     * to a byte array and queued for transmission.
084     *
085     * @param m Message to send; will be updated with CRC
086     */
087    @Override
088    public void sendLocoNetMessage(LocoNetMessage m) {
089
090        // update statistics
091        transmittedMsgCount++;
092
093        // set the error correcting code byte(s) before transmittal
094        m.setParity();
095
096        // stream to port in single write, as that's needed by serial
097        int len = m.getNumDataElements();
098        byte msg[] = new byte[len];
099        for (int i = 0; i < len; i++) {
100            msg[i] = (byte) m.getElement(i);
101        }
102
103        log.debug("queue LocoNet packet: {}", m);
104        // We need to queue the request and wake the xmit thread in an atomic operation
105        // But the thread might not be running, in which case the request is just
106        // queued up.
107        try {
108            xmtList.add(msg);
109        } catch (RuntimeException e) {
110            log.warn("passing to xmit: unexpected exception: ", e);
111        }
112    }
113
114    /**
115     * Implement abstract method to signal if there's a backlog of information
116     * waiting to be sent.
117     *
118     * @return true if busy, false if nothing waiting to send
119     */
120    @Override
121    public boolean isXmtBusy() {
122        if (controller == null) {
123            return false;
124        }
125
126        return (!controller.okToSend());
127    }
128
129    // methods to connect/disconnect to a source of data in a LnPortController
130
131    protected LnPortController controller = null;
132
133    /**
134     * Make connection to an existing LnPortController object.
135     *
136     * @param p Port controller for connected. Save this for a later disconnect
137     *          call
138     */
139    public void connectPort(LnPortController p) {
140        istream = p.getInputStream();
141        ostream = p.getOutputStream();
142        if (controller != null) {
143            log.warn("connectPort: connect called while connected");
144        }
145        controller = p;
146    }
147
148    /**
149     * Break connection to an existing LnPortController object. Once broken,
150     * attempts to send via "message" member will fail.
151     *
152     * @param p previously connected port
153     */
154    public void disconnectPort(LnPortController p) {
155        istream = null;
156        ostream = null;
157        if (controller != p) {
158            log.warn("disconnectPort: disconnect called from non-connected LnPortController");
159        }
160        controller = null;
161    }
162
163    // data members to hold the streams. These are public so the inner classes defined here
164    // can access them with a Java 1.1 compiler
165    public DataInputStream istream = null;
166    public OutputStream ostream = null;
167
168    /**
169     * Read a single byte, protecting against various timeouts, etc.
170     * <p>
171     * When a port is set to have a receive timeout (via the
172     * enableReceiveTimeout() method), some will return zero bytes or an
173     * EOFException at the end of the timeout. In that case, the read should be
174     * repeated to get the next real character.
175     *
176     * @param istream stream to read from
177     * @return buffer of received data
178     * @throws java.io.IOException failure during stream read
179     *
180     */
181    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
182        while (true) { // loop will repeat until character found
183            int nchars;
184            nchars = istream.read(rcvBuffer, 0, 1);
185            if (nchars > 0) {
186                return rcvBuffer[0];
187            }
188        }
189    }
190    // Defined this way to reduce new object creation
191    private final byte[] rcvBuffer = new byte[1];
192
193    /**
194     * Captive class to handle incoming characters. This is a permanent loop,
195     * looking for input messages in character form on the stream connected to
196     * the LnPortController via <code>connectPort</code>.
197     */
198    protected class RcvHandler implements Runnable {
199
200        /**
201         * Remember the LnPacketizer object
202         */
203        LnTrafficController trafficController;
204
205        public RcvHandler(LnTrafficController lt) {
206            trafficController = lt;
207        }
208
209        /**
210         * Handle incoming characters. This is a permanent loop, looking for
211         * input messages in character form on the stream connected to the
212         * LnPortController via <code>connectPort</code>. Terminates with the
213         * input stream breaking out of the try block.
214         */
215        @Override
216        public void run() {
217
218            int opCode;
219            while (!threadStopRequest) {   // loop until asked to stop
220                try {
221                    // start by looking for command -  skip if bit not set
222                    while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check
223                        if (log.isTraceEnabled()) { // avoid building string
224                            log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N
225                        }
226                    }
227                    // here opCode is OK. Create output message
228                    if (log.isTraceEnabled()) { // avoid building string
229                        log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N
230                    }
231                    LocoNetMessage msg = null;
232                    while (msg == null) {
233                        try {
234                            // Capture 2nd byte, always present
235                            int byte2 = readByteProtected(istream) & 0xFF;
236                            if (log.isTraceEnabled()) { // avoid building string
237                                log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N
238                            }                            // Decide length
239                            int len = 2;
240                            switch ((opCode & 0x60) >> 5) {
241                                case 0:
242                                    /* 2 byte message */
243
244                                    len = 2;
245                                    break;
246
247                                case 1:
248                                    /* 4 byte message */
249
250                                    len = 4;
251                                    break;
252
253                                case 2:
254                                    /* 6 byte message */
255
256                                    len = 6;
257                                    break;
258
259                                case 3:
260                                    /* N byte message */
261
262                                    if (byte2 < 2) {
263                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N
264                                    }
265                                    len = byte2;
266                                    break;
267                                default:
268                                    log.warn("Unhandled code: {}", (opCode & 0x60) >> 5);
269                                    break;
270                            }
271                            msg = new LocoNetMessage(len);
272                            // message exists, now fill it
273                            msg.setOpCode(opCode);
274                            msg.setElement(1, byte2);
275                            log.trace("len: {}", len); // NOI18N
276                            for (int i = 2; i < len; i++) {
277                                // check for message-blocking error
278                                int b = readByteProtected(istream) & 0xFF;
279                                if (log.isTraceEnabled()) {
280                                    log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N
281                                }
282                                if ((b & 0x80) != 0) {
283                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N
284                                    opCode = b;
285                                    throw new LocoNetMessageException();
286                                }
287                                msg.setElement(i, b);
288                            }
289                        } catch (LocoNetMessageException e) {
290                            // retry by destroying the existing message
291                            // opCode is set for the newly-started packet
292                            msg = null;
293                        }
294                    }
295                    // check parity
296                    if (!msg.checkParity()) {
297                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
298                        throw new LocoNetMessageException();
299                    }
300                    // message is complete, dispatch it !!
301                    {
302                        log.debug("queue message for notification: {}", msg);
303
304                        jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController));
305                    }
306
307                    // done with this one
308                } catch (LocoNetMessageException e) {
309                    // just let it ride for now
310                    log.warn("run: unexpected LocoNetMessageException", e); // NOI18N
311                } catch (java.io.EOFException e) {
312                    // posted from idle port when enableReceiveTimeout used
313                    log.trace("EOFException, is LocoNet serial I/O using timeouts?"); // NOI18N
314                } catch (java.io.IOException e) {
315                    // fired when write-end of HexFile reaches end
316                    log.debug("IOException, should only happen with HexFile", e); // NOI18N
317                    log.info("End of file"); // NOI18N
318                    disconnectPort(controller);
319                    return;
320                } // normally, we don't catch RuntimeException, but in this
321                  // permanently running loop it seems wise.
322                catch (RuntimeException e) {
323                    log.warn("run: unexpected Exception", e); // NOI18N
324                }
325            } // end of permanent loop
326        }
327    }
328
329    /**
330     * Captive class to notify of one message.
331     */
332    private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction {
333
334        public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) {
335            thisMsg = msg;
336            thisTc = trafficController;
337        }
338        LocoNetMessage thisMsg;
339        LnTrafficController thisTc;
340
341        /**
342         * {@inheritDoc}
343         */
344        @Override
345        public void run() {
346            thisTc.notify(thisMsg);
347        }
348    }
349
350    /**
351     * Captive class to handle transmission.
352     */
353    class XmtHandler implements Runnable {
354
355        /**
356         * Loops forever, looking for message to send and processing them.
357         */
358        @Override
359        public void run() {
360
361            while (!threadStopRequest) {   // loop until asked to stop
362                // any input?
363                try {
364                    // get content; blocks until present
365                    log.trace("check for input"); // NOI18N
366
367                    byte msg[] = xmtList.take();
368
369                    // input - now send
370                    try {
371                        if (ostream != null) {
372                            if (log.isDebugEnabled()) { // avoid work if not needed
373                                if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N
374                                log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
375                            }
376                            ostream.write(msg);
377                            ostream.flush();
378                            if (log.isTraceEnabled()) { // avoid String building if not needed
379                                log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
380                            }
381                            messageTransmitted(msg);
382                        } else {
383                            // no stream connected
384                            log.warn("sendLocoNetMessage: no connection established"); // NOI18N
385                        }
386                    } catch (java.io.IOException e) {
387                        log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N
388                    }
389                } catch (InterruptedException ie) {
390                    return; // ending the thread
391                } catch (RuntimeException rt) {
392                    log.error("Exception on take() call", rt);
393                }
394            }
395        }
396    }
397
398    /**
399     * When a message is finally transmitted, forward it to listeners if echoing
400     * is needed.
401     *
402     * @param msg message sent
403     */
404    protected void messageTransmitted(byte[] msg) {
405        log.debug("message transmitted (echo {})", echo);
406        if (!echo) {
407            return;
408        }
409        // message is queued for transmit, echo it when needed
410        // return a notification via the queue to ensure end
411        javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg)));
412    }
413
414    static class Echo implements Runnable {
415
416        Echo(LnPacketizer t, LocoNetMessage m) {
417            myTc = t;
418            msgForLater = m;
419        }
420        LocoNetMessage msgForLater;
421        LnPacketizer myTc;
422
423        /**
424         * {@inheritDoc}
425         */
426        @Override
427        public void run() {
428            myTc.notify(msgForLater);
429        }
430    }
431
432    /**
433     * Invoked at startup to start the threads needed here.
434     */
435    public void startThreads() {
436        int priority = Thread.currentThread().getPriority();
437        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N
438                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);
439
440        // start the RcvHandler in a thread of its own
441        if (rcvHandler == null) {
442            rcvHandler = new RcvHandler(this);
443        }
444        rcvThread = new Thread(rcvHandler, "LocoNet receive handler"); // NOI18N
445        rcvThread.setDaemon(true);
446        rcvThread.setPriority(Thread.MAX_PRIORITY);
447        rcvThread.start();
448
449        if (xmtHandler == null) {
450            xmtHandler = new XmtHandler();
451        }
452        // make sure that the xmt priority is no lower than the current priority
453        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
454        // start the XmtHandler in a thread of its own
455        if (xmtThread == null) {
456            xmtThread = new Thread(xmtHandler, "LocoNet transmit handler"); // NOI18N
457        }
458        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
459        xmtThread.setDaemon(true);
460        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
461        xmtThread.start();
462
463        log.info("lnPacketizer Started");
464    }
465
466    protected Thread rcvThread;
467    protected Thread xmtThread;
468
469    /**
470     * {@inheritDoc}
471     */
472    @SuppressWarnings("deprecation") // Thread.stop()
473    @Override
474    public void dispose() {
475        if (xmtThread != null) {
476            xmtThread.stop(); // interrupt not sufficient?
477            try {
478                xmtThread.join();
479            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
480        }
481        if (rcvThread != null) {
482            rcvThread.stop(); // interrupt not sufficient with the previous serial library. Not known if OK with SerialComm?
483            try {
484                rcvThread.join();
485            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
486        }
487        super.dispose();
488    }
489
490    /**
491     * Terminate the receive and transmit threads.
492     * <p>
493     * This is intended to be used only by testing subclasses.
494     */
495    public void terminateThreads() {
496        threadStopRequest = true;
497        if (xmtThread != null) {
498            xmtThread.interrupt();
499            try {
500                xmtThread.join();
501            } catch (InterruptedException ie){
502                // interrupted during cleanup.
503            }
504        }
505
506        if (rcvThread != null) {
507            rcvThread.interrupt();
508            try {
509                rcvThread.join();
510            } catch (InterruptedException ie){
511                // interrupted during cleanup.
512            }
513        }
514    }
515
516    /**
517     * Flag that threads should terminate as soon as they can.
518     */
519    protected volatile boolean threadStopRequest = false;
520
521    private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class);
522
523}