001package jmri.jmrix.dccpp.dccppovertcp; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.BufferedReader; 005import java.io.InputStreamReader; 006import java.util.LinkedList; 007import java.util.NoSuchElementException; 008import jmri.jmrix.dccpp.DCCppCommandStation; 009import jmri.jmrix.dccpp.DCCppListener; 010import jmri.jmrix.dccpp.DCCppMessage; 011import jmri.jmrix.dccpp.DCCppNetworkPortController; 012import jmri.jmrix.dccpp.DCCppPacketizer; 013import jmri.jmrix.dccpp.DCCppReply; 014import org.slf4j.Logger; 015import org.slf4j.LoggerFactory; 016 017import javax.annotation.concurrent.GuardedBy; 018 019/** 020 * Converts Stream-based I/O to/from DCC++ messages. The "DCCppInterface" side 021 * sends/receives DCCppMessage objects. The connection to a 022 * DCCppPortnetworkController is via a pair of *Streams, which then carry 023 * sequences of characters for transmission. 024 * <p> 025 * Messages come to this via the main GUI thread, and are forwarded back to 026 * listeners in that same thread. Reception and transmission are handled in 027 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 028 * classes defined here. The thread priorities are: 029 * <ul> 030 * <li> RcvHandler - at highest available priority 031 * <li> XmtHandler - down one, which is assumed to be above the GUI 032 * <li> (everything else) 033 * </ul> 034 * 035 * @author Bob Jacobsen Copyright (C) 2001 036 * @author Alex Shepherd Copyright (C) 2003, 2006 037 * @author Mark Underwood Copyright (C) 2015 038 * 039 * Based on jmri.jmrix.loconet.loconetovertcp.LnOverTcpPacketizer 040 * 041 */ 042// TODO: Consider ditching the LocoNet-inherited "RECEIVE" and "SEND" prefixes 043// and just rely on the already-present "<" and ">" to mark start and end 044// of frame. This would pretty much make DCCppOverTCP redundant with the 045// Network Port interface to the Base Station (that is, the "host" JMRI 046// application would look just like a Network Base Station to the "client" JMRI 047// application). 048// 049// However, at minimum, this would break backward compatibility for the interface, 050// so there is that to consider. Probably best to do this sooner than later, 051// to minimize that impact. 052// 053public class DCCppOverTcpPacketizer extends DCCppPacketizer { 054 055 static final String OLD_RECEIVE_PREFIX = "RECEIVE "; 056 static final String OLD_SEND_PREFIX = "SEND"; 057 static final String RECEIVE_PREFIX = "<"; 058 static final String SEND_PREFIX = ""; // Making this an empty string on purpose. 059 static final String OLD_SERVER_VERSION_STRING = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility 060 static final String NEW_SERVER_VERSION_STRING = "VERSION DCC++ Server "; 061 062 boolean useOldPrefix = false; 063 064 protected BufferedReader istreamReader = null; 065 066 /** 067 * XmtHandler (a local class) object to implement the transmit thread 068 */ 069 @GuardedBy ("xmtHandler") 070 final protected Runnable xmtHandler; 071 072 /** 073 * RcvHandler (a local class) object to implement the receive thread 074 */ 075 protected Runnable rcvHandler; 076 077 /** 078 * Synchronized list used as a transmit queue. 079 */ 080 @GuardedBy ("xmtHandler") 081 protected LinkedList<DCCppMessage> xmtList = new LinkedList<>(); 082 083 @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification = "Only used during system initialization") 084 public DCCppOverTcpPacketizer(DCCppCommandStation cs) { 085 super(cs); // Don't need the command station (?) 086 087 xmtHandler = new XmtHandler(); 088 rcvHandler = new RcvHandler(this); 089 log.debug("DCCppOverTcpPacketizer created."); 090 } 091 092 public DCCppNetworkPortController networkController = null; 093 094 public boolean isXmtBusy() { 095 return networkController != null; 096 } 097 098 /** 099 * Make connection to existing DCCppNetworkPortController object. 100 * 101 * @param p Port networkController for connected. Save this for a later 102 * disconnect call 103 */ 104 public void connectPort(DCCppNetworkPortController p) { 105 istream = p.getInputStream(); 106 istreamReader = new BufferedReader(new InputStreamReader(istream)); 107 ostream = p.getOutputStream(); 108 if (networkController != null) { 109 log.warn("connectPort: connect called while connected"); 110 } 111 networkController = p; 112 } 113 114 /** 115 * Break connection to existing DCCppNetworkPortController object. Once broken, 116 * attempts to send via "message" member will fail. 117 * 118 * @param p previously connected port 119 */ 120 public void disconnectPort(DCCppNetworkPortController p) { 121 istream = null; 122 ostream = null; 123 if (networkController != p) { 124 log.warn("disconnectPort: disconnect called from non-connected DCCppNetworkPortController"); 125 } 126 networkController = null; 127 } 128 129 /** 130 * Forward a preformatted DCCppMessage to the actual interface. 131 * 132 * Checksum is computed and overwritten here, then the message is converted 133 * to a byte array and queue for transmission 134 * 135 * @param m Message to send; will be updated with CRC 136 */ 137 @Override 138 public void sendDCCppMessage(DCCppMessage m, DCCppListener reply) { 139 // update statistics 140 //transmittedMsgCount++; 141 142 log.debug("queue DCCpp packet: {}", m); 143 // in an atomic operation, queue the request and wake the xmit thread 144 try { 145 synchronized (xmtHandler) { 146 xmtList.addLast(m); 147 xmtHandler.notifyAll(); 148 } 149 } catch (Exception e) { 150 log.warn("passing to xmit: unexpected exception: ", e); 151 } 152 } 153 154 /** 155 * Invoked at startup to start the threads needed here. 156 */ 157 public void startThreads() { 158 int priority = Thread.currentThread().getPriority(); 159 log.debug("startThreads current priority = {} max available {} default = {} min available = {}", 160 priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY); 161 162 // make sure that the xmt priority is no lower than the current priority 163 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY); 164 // start the XmtHandler in a thread of its own 165 Thread xmtThread; 166 synchronized (xmtHandler) { // never null at this point 167 xmtThread = new Thread(xmtHandler, "DCC++ transmit handler"); 168 } 169 log.debug("Xmt thread starts at priority {}", xmtpriority); 170 xmtThread.setDaemon(true); 171 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 172 xmtThread.start(); 173 174 // start the RcvHandler in a thread of its own 175 if (rcvHandler == null) { 176 rcvHandler = new RcvHandler(this); 177 } 178 Thread rcvThread = new Thread(rcvHandler, "DCC++ receive handler"); 179 rcvThread.setDaemon(true); 180 rcvThread.setPriority(Thread.MAX_PRIORITY); 181 rcvThread.start(); 182 } 183 184 /** 185 * Captive class to handle incoming characters. This is a permanent loop, 186 * looking for input messages in character form on the stream connected to 187 * the DCCppNetworkPortController via <code>connectPort</code>. 188 */ 189 class RcvHandler implements Runnable { 190 191 /** 192 * Remember the DCCppPacketizer object. 193 * 194 * @param lt the DCCppOverTcpPacketizer trafficController to run over 195 */ 196 public RcvHandler(DCCppOverTcpPacketizer lt) { 197 //trafficController = lt; 198 } 199 //DCCppOverTcpPacketizer trafficController; 200 201 // readline is deprecated, but there are no problems 202 // with multi-byte characters here. 203 @Override 204 public void run() { 205 206 String rxLine; 207 while (true) { // loop permanently, program close will exit 208 try { 209 // start by looking for a complete line 210 211 if (istreamReader == null) { 212 log.error("istreamReader not initialized!"); 213 return; 214 } 215 rxLine = istreamReader.readLine(); // Note: This uses BufferedReader for safer data handling 216 if (rxLine == null) { 217 log.warn("run: input stream returned null, exiting loop"); 218 return; 219 } 220 221 log.debug("Received: {}", rxLine); 222 223 // Legacy support. If this message is the old JMRI version 224 // handshake, flag us as in "old mode" 225 if (rxLine.startsWith(OLD_SERVER_VERSION_STRING)) { 226 useOldPrefix = true; 227 } 228 229 // Legacy support. If the old receive prefix is present 230 // remove it. 231 if (rxLine.startsWith(OLD_RECEIVE_PREFIX)) { 232 final int trim = OLD_RECEIVE_PREFIX.length(); 233 rxLine = rxLine.substring(trim); 234 } 235 236 if (!rxLine.startsWith(RECEIVE_PREFIX)) { 237 // Not a valid Tcp packet 238 log.debug("Wrong Prefix: {}", rxLine); 239 continue; 240 } 241 242 // Strip the prefix off. 243 //final int trim = RECEIVE_PREFIX.length(); 244 //rxLine = rxLine.substring(trim); 245 246 int firstidx = rxLine.indexOf("<"); 247 int lastidx = rxLine.lastIndexOf(">"); 248 log.debug("String {} Index1 {} Index 2{}", rxLine, firstidx, lastidx); 249 250 // BUG FIX: Incoming DCCppOverTCP messages are already formatted for DCC++ and don't 251 // need to be parsed. Indeed, trying to parse them will screw them up. 252 // So instead, we de-deprecated the string constructor so that we can 253 // directly create a DCCppReply from the incoming string without translation/parsing. 254 255 // Note: the substring call below also strips off the "< >" 256 DCCppReply msg = DCCppReply.parseDCCppReply(rxLine.substring(rxLine.indexOf("<") + 1, 257 rxLine.lastIndexOf(">"))); 258 //DCCppReply msg = new DCCppReply(rxLine.substring(rxLine.indexOf("<") + 1, 259 // rxLine.lastIndexOf(">"))); 260 261 if (!msg.isValidReplyFormat()) { 262 log.warn("Invalid Reply Format: {}", msg.toString()); 263 continue; 264 } 265 // message is complete, dispatch it !! 266 log.debug("queue reply for notification"); 267 268 final DCCppReply thisMsg = msg; 269 //final DCCppPacketizer thisTc = trafficController; 270 // return a notification via the queue to ensure end 271 Runnable r = new Runnable() { 272 final DCCppReply msgForLater = thisMsg; 273 274 @Override 275 public void run() { 276 notifyReply(msgForLater, null); 277 } 278 }; 279 javax.swing.SwingUtilities.invokeLater(r); 280 // done with this one 281 //} catch (DCCppMessageException e) { 282 // just let it ride for now 283 // log.warn("run: unexpected DCCppMessageException: ", e); 284 } catch (java.io.EOFException e) { 285 // posted from idle port when enableReceiveTimeout used 286 log.debug("EOFException, is DCC++ serial I/O using timeouts?"); 287 } catch (java.io.IOException e) { 288 // fired when write-end of HexFile reaches end 289 log.debug("IOException, should only happen with HexFile: ", e); 290 log.info("End of file"); 291 // disconnectPort(networkController); 292 return; 293 } // normally, we don't catch the unnamed Exception, but in this 294 // permanently running loop it seems wise. 295 catch (Exception e) { 296 log.warn("run: unexpected Exception: ", e); 297 } 298 } // end of permanent loop 299 } 300 } 301 302 /** 303 * Captive class to handle transmission. 304 */ 305 class XmtHandler implements Runnable { 306 307 @Override 308 public void run() { 309 310 while (!threadStopRequest) { // loop until asked to stop 311 // any input? 312 try { 313 // get content; failure is a NoSuchElementException 314 log.debug("check for input"); 315 DCCppMessage msg; 316 synchronized (xmtHandler) { 317 msg = xmtList.removeFirst(); 318 } 319 320 // input - now send 321 try { 322 if (ostream != null) { 323 //Commented out as the original LnPortnetworkController always returned true. 324 //if (!networkController.okToSend()) log.warn(DCCpp port not ready to receive"); // TCP, not RS232, so message is a real warning 325 log.debug("start write to network stream"); 326 StringBuilder packet = new StringBuilder(msg.length() + SEND_PREFIX.length() + 2); 327 if (useOldPrefix) { 328 packet.append(OLD_SEND_PREFIX); 329 } 330 packet.append("<").append(msg.toString()).append(">"); 331 if (log.isDebugEnabled()) { // avoid building a String when not needed 332 log.debug("Write to LbServer: {}", packet.toString()); 333 } 334 packet.append("\r\n"); 335 ostream.write(packet.toString().getBytes()); 336 ostream.flush(); 337 log.debug("end write to stream"); 338 } else { 339 // no stream connected 340 log.warn("sendDCCppMessage: no connection established"); 341 } 342 } catch (java.io.IOException e) { 343 log.warn("sendDCCppMessage: IOException: {}", e.toString()); 344 } 345 } catch (NoSuchElementException e) { 346 // message queue was empty, wait for input 347 log.debug("start wait"); 348 349 new jmri.util.WaitHandler(this); // handle synchronization, spurious wake, interruption 350 351 log.debug("end wait"); 352 } 353 } 354 } 355 } 356 357 /** 358 * Terminate the receive and transmit threads. 359 * <p> 360 * This is intended to be used only by testing subclasses. 361 */ 362 @Override 363 public void terminateThreads() { 364 threadStopRequest = true; 365 if (xmtThread != null) { 366 xmtThread.interrupt(); 367 try { 368 xmtThread.join(); 369 } catch (InterruptedException ie){ 370 // interrupted during cleanup. 371 } 372 } 373 374 if (rcvThread != null) { 375 rcvThread.interrupt(); 376 try { 377 rcvThread.join(); 378 } catch (InterruptedException ie){ 379 // interrupted during cleanup. 380 } 381 } 382 } 383 384 private final static Logger log = LoggerFactory.getLogger(DCCppOverTcpPacketizer.class); 385 386}