001package jmri.jmrix.dccpp.dccppovertcp;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.io.BufferedReader;
005import java.io.InputStreamReader;
006import java.util.LinkedList;
007import java.util.NoSuchElementException;
008import jmri.jmrix.dccpp.DCCppCommandStation;
009import jmri.jmrix.dccpp.DCCppListener;
010import jmri.jmrix.dccpp.DCCppMessage;
011import jmri.jmrix.dccpp.DCCppNetworkPortController;
012import jmri.jmrix.dccpp.DCCppPacketizer;
013import jmri.jmrix.dccpp.DCCppReply;
014import org.slf4j.Logger;
015import org.slf4j.LoggerFactory;
016
017import javax.annotation.concurrent.GuardedBy;
018
019/**
020 * Converts Stream-based I/O to/from DCC++ messages. The "DCCppInterface" side
021 * sends/receives DCCppMessage objects. The connection to a
022 * DCCppPortnetworkController is via a pair of *Streams, which then carry
023 * sequences of characters for transmission.
024 * <p>
025 * Messages come to this via the main GUI thread, and are forwarded back to
026 * listeners in that same thread. Reception and transmission are handled in
027 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
028 * classes defined here. The thread priorities are:
029 * <ul>
030 * <li> RcvHandler - at highest available priority
031 * <li> XmtHandler - down one, which is assumed to be above the GUI
032 * <li> (everything else)
033 * </ul>
034 *
035 * @author Bob Jacobsen Copyright (C) 2001
036 * @author Alex Shepherd Copyright (C) 2003, 2006
037 * @author Mark Underwood Copyright (C) 2015
038 *
039 * Based on jmri.jmrix.loconet.loconetovertcp.LnOverTcpPacketizer
040 *
041 */
042// TODO: Consider ditching the LocoNet-inherited "RECEIVE" and "SEND" prefixes
043// and just rely on the already-present "<" and ">" to mark start and end
044// of frame.  This would pretty much make DCCppOverTCP redundant with the
045// Network Port interface to the Base Station (that is, the "host" JMRI
046// application would look just like a Network Base Station to the "client" JMRI
047// application).
048//
049// However, at minimum, this would break backward compatibility for the interface,
050// so there is that to consider.  Probably best to do this sooner than later,
051// to minimize that impact.
052//
053public class DCCppOverTcpPacketizer extends DCCppPacketizer {
054
055    static final String OLD_RECEIVE_PREFIX = "RECEIVE ";
056    static final String OLD_SEND_PREFIX = "SEND";
057    static final String RECEIVE_PREFIX = "<";
058    static final String SEND_PREFIX = ""; // Making this an empty string on purpose.
059    static final String OLD_SERVER_VERSION_STRING = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility
060    static final String NEW_SERVER_VERSION_STRING = "VERSION DCC++ Server ";
061
062    boolean useOldPrefix = false;
063
064    protected BufferedReader istreamReader = null;
065
066    /**
067     * XmtHandler (a local class) object to implement the transmit thread
068     */
069    @GuardedBy ("xmtHandler")
070    final protected Runnable xmtHandler;
071
072    /**
073     * RcvHandler (a local class) object to implement the receive thread
074     */
075    protected Runnable rcvHandler;
076
077    /**
078     * Synchronized list used as a transmit queue.
079     */
080    @GuardedBy ("xmtHandler")
081    protected LinkedList<DCCppMessage> xmtList = new LinkedList<>();
082
083    @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification = "Only used during system initialization")
084    public DCCppOverTcpPacketizer(DCCppCommandStation cs) {
085        super(cs); // Don't need the command station (?)
086
087        xmtHandler = new XmtHandler();
088        rcvHandler = new RcvHandler(this);
089        log.debug("DCCppOverTcpPacketizer created.");
090    }
091
092    public DCCppNetworkPortController networkController = null;
093
094    public boolean isXmtBusy() {
095        return networkController != null;
096    }
097
098    /**
099     * Make connection to existing DCCppNetworkPortController object.
100     *
101     * @param p Port networkController for connected. Save this for a later
102     *          disconnect call
103     */
104    public void connectPort(DCCppNetworkPortController p) {
105        istream = p.getInputStream();
106        istreamReader = new BufferedReader(new InputStreamReader(istream));
107        ostream = p.getOutputStream();
108        if (networkController != null) {
109            log.warn("connectPort: connect called while connected");
110        }
111        networkController = p;
112    }
113
114    /**
115     * Break connection to existing DCCppNetworkPortController object. Once broken,
116     * attempts to send via "message" member will fail.
117     *
118     * @param p previously connected port
119     */
120    public void disconnectPort(DCCppNetworkPortController p) {
121        istream = null;
122        ostream = null;
123        if (networkController != p) {
124            log.warn("disconnectPort: disconnect called from non-connected DCCppNetworkPortController");
125        }
126        networkController = null;
127    }
128
129    /**
130     * Forward a preformatted DCCppMessage to the actual interface.
131     *
132     * Checksum is computed and overwritten here, then the message is converted
133     * to a byte array and queue for transmission
134     *
135     * @param m Message to send; will be updated with CRC
136     */
137    @Override
138    public void sendDCCppMessage(DCCppMessage m, DCCppListener reply) {
139        // update statistics
140        //transmittedMsgCount++;
141
142        log.debug("queue DCCpp packet: {}", m);
143        // in an atomic operation, queue the request and wake the xmit thread
144        try {
145            synchronized (xmtHandler) {
146                xmtList.addLast(m);
147                xmtHandler.notifyAll();
148            }
149        } catch (Exception e) {
150            log.warn("passing to xmit: unexpected exception: ", e);
151        }
152    }
153
154    /**
155     * Invoked at startup to start the threads needed here.
156     */
157    public void startThreads() {
158        int priority = Thread.currentThread().getPriority();
159        log.debug("startThreads current priority = {} max available {} default = {} min available = {}",
160                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);
161
162        // make sure that the xmt priority is no lower than the current priority
163        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
164        // start the XmtHandler in a thread of its own
165        Thread xmtThread;
166        synchronized (xmtHandler) { // never null at this point
167            xmtThread = new Thread(xmtHandler, "DCC++ transmit handler");
168        }
169        log.debug("Xmt thread starts at priority {}", xmtpriority);
170        xmtThread.setDaemon(true);
171        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
172        xmtThread.start();
173
174        // start the RcvHandler in a thread of its own
175        if (rcvHandler == null) {
176            rcvHandler = new RcvHandler(this);
177        }
178        Thread rcvThread = new Thread(rcvHandler, "DCC++ receive handler");
179        rcvThread.setDaemon(true);
180        rcvThread.setPriority(Thread.MAX_PRIORITY);
181        rcvThread.start();
182    }
183
184    /**
185     * Captive class to handle incoming characters. This is a permanent loop,
186     * looking for input messages in character form on the stream connected to
187     * the DCCppNetworkPortController via <code>connectPort</code>.
188     */
189    class RcvHandler implements Runnable {
190
191        /**
192         * nothing to do here
193         *
194         * @param lt the DCCppOverTcpPacketizer trafficController to run over
195         */
196        public RcvHandler(DCCppOverTcpPacketizer lt) {
197        }
198
199        // readline is deprecated, but there are no problems
200        // with multi-byte characters here.
201        @Override
202        public void run() {
203
204            String rxLine;
205            while (true) {   // loop permanently, program close will exit
206                try {
207                    // start by looking for a complete line
208
209                    if (istreamReader == null) {
210                        log.error("istreamReader not initialized!");
211                        return;
212                    }
213                    rxLine = istreamReader.readLine(); // Note: This uses BufferedReader for safer data handling
214                    if (rxLine == null) {
215                        log.warn("run: input stream returned null, exiting loop");
216                        return;
217                    }
218
219                    log.debug("Received: {}", rxLine);
220
221                    // Legacy support. If this message is the old JMRI version
222                    // handshake, flag us as in "old mode"
223                    if (rxLine.startsWith(OLD_SERVER_VERSION_STRING)) {
224                        useOldPrefix = true;
225                    }
226
227                    // Legacy support. If the old receive prefix is present
228                    // remove it.
229                    if (rxLine.startsWith(OLD_RECEIVE_PREFIX)) {
230                        final int trim = OLD_RECEIVE_PREFIX.length();
231                        rxLine = rxLine.substring(trim);
232                    }
233
234                    if (!rxLine.startsWith(RECEIVE_PREFIX)) {
235                        // Not a valid Tcp packet
236                        log.debug("Wrong Prefix: {}", rxLine);
237                        continue;
238                    }
239
240                    // Strip the prefix off.
241                    //final int trim = RECEIVE_PREFIX.length();
242                    //rxLine = rxLine.substring(trim);
243
244                    int firstidx = rxLine.indexOf("<");
245                    int lastidx = rxLine.lastIndexOf(">");
246                    log.debug("String {} Index1 {} Index 2{}", rxLine, firstidx, lastidx);
247
248                    // BUG FIX: Incoming DCCppOverTCP messages are already formatted for DCC++ and don't
249                    // need to be parsed. Indeed, trying to parse them will screw them up.
250                    // So instead, we de-deprecated the string constructor so that we can
251                    // directly create a DCCppReply from the incoming string without translation/parsing.
252
253                    //  Note: the substring call below also strips off the "< >"
254                    DCCppReply msg = DCCppReply.parseDCCppReply(rxLine.substring(rxLine.indexOf("<") + 1,
255                                                                                 rxLine.lastIndexOf(">")));
256
257                    if (!msg.isValidReplyFormat()) {
258                        log.warn("Invalid Reply Format: {}", msg.toString());
259                        continue;
260                    }
261                    // message is complete, dispatch it !!
262                    log.debug("queue reply for notification");
263
264                    final DCCppReply thisMsg = msg;
265                    //final DCCppPacketizer thisTc = trafficController;
266                    // return a notification via the queue to ensure end
267                    Runnable r = new Runnable() {
268                            final DCCppReply msgForLater = thisMsg;
269
270                            @Override
271                            public void run() {
272                                notifyReply(msgForLater, null);
273                            }
274                        };
275                    javax.swing.SwingUtilities.invokeLater(r);
276                    // done with this one
277                    //} catch (DCCppMessageException e) {
278                    // just let it ride for now
279                    //  log.warn("run: unexpected DCCppMessageException: ", e);
280                } catch (java.io.EOFException e) {
281                    // posted from idle port when enableReceiveTimeout used
282                    log.debug("EOFException, is DCC++ serial I/O using timeouts?");
283                } catch (java.io.IOException e) {
284                    // fired when write-end of HexFile reaches end
285                    log.debug("IOException, should only happen with HexFile: ", e);
286                    log.info("End of file");
287                    //  disconnectPort(networkController);
288                    return;
289                } // normally, we don't catch the unnamed Exception, but in this
290                // permanently running loop it seems wise.
291                catch (Exception e) {
292                    log.warn("run: unexpected Exception: ", e);
293                }
294            } // end of permanent loop
295        }
296    }
297
298    /**
299     * Captive class to handle transmission.
300     */
301    class XmtHandler implements Runnable {
302
303        @Override
304        public void run() {
305
306            while (!threadStopRequest) {   // loop until asked to stop
307                // any input?
308                try {
309                    // get content; failure is a NoSuchElementException
310                    log.debug("check for input");
311                    DCCppMessage msg;
312                    synchronized (xmtHandler) {
313                        msg = xmtList.removeFirst();
314                    }
315
316                    // input - now send
317                    try {
318                        if (ostream != null) {
319                            //Commented out as the original LnPortnetworkController always returned true.
320                            //if (!networkController.okToSend()) log.warn(DCCpp port not ready to receive"); // TCP, not RS232, so message is a real warning
321                            log.debug("start write to network stream");
322                            StringBuilder packet = new StringBuilder(msg.length() + SEND_PREFIX.length() + 2);
323                            if (useOldPrefix) {
324                                packet.append(OLD_SEND_PREFIX);
325                            }
326                            packet.append("<").append(msg.toString()).append(">");
327                            if (log.isDebugEnabled()) { // avoid building a String when not needed
328                                log.debug("Write to LbServer: {}", packet.toString());
329                            }
330                            packet.append("\r\n");
331                            ostream.write(packet.toString().getBytes());
332                            ostream.flush();
333                            log.debug("end write to stream");
334                        } else {
335                            // no stream connected
336                            log.warn("sendDCCppMessage: no connection established");
337                        }
338                    } catch (java.io.IOException e) {
339                        log.warn("sendDCCppMessage: IOException: {}", e.toString());
340                    }
341                } catch (NoSuchElementException e) {
342                    // message queue was empty, wait for input
343                    log.debug("start wait");
344
345                    new jmri.util.WaitHandler(this);  // handle synchronization, spurious wake, interruption
346
347                    log.debug("end wait");
348                }
349            }
350        }
351    }
352
353    /**
354     * Terminate the receive and transmit threads.
355     * <p>
356     * This is intended to be used only by testing subclasses.
357     */
358    @Override
359    public void terminateThreads() {
360        threadStopRequest = true;
361        if (xmtThread != null) {
362            xmtThread.interrupt();
363            try {
364                xmtThread.join();
365            } catch (InterruptedException ie){
366                // interrupted during cleanup.
367            }
368        }
369
370        if (rcvThread != null) {
371            rcvThread.interrupt();
372            try {
373                rcvThread.join();
374            } catch (InterruptedException ie){
375                // interrupted during cleanup.
376            }
377        }
378    }
379
380    private final static Logger log = LoggerFactory.getLogger(DCCppOverTcpPacketizer.class);
381
382}