001package jmri.jmrix.loconet; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.concurrent.LinkedTransferQueue; 006import org.slf4j.Logger; 007import org.slf4j.LoggerFactory; 008 009/** 010 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface" 011 * side sends/receives LocoNetMessage objects. The connection to a 012 * LnPortController is via a pair of *Streams, which then carry sequences of 013 * characters for transmission. 014 * <p> 015 * Messages come to this via the main GUI thread, and are forwarded back to 016 * listeners in that same thread. Reception and transmission are handled in 017 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 018 * classes defined here. The thread priorities are: 019 * <ul> 020 * <li> RcvHandler - at highest available priority 021 * <li> XmtHandler - down one, which is assumed to be above the GUI 022 * <li> (everything else) 023 * </ul> 024 * Some of the message formats used in this class are Copyright Digitrax, Inc. 025 * and used with permission as part of the JMRI project. That permission does 026 * not extend to uses in other software products. If you wish to use this code, 027 * algorithm or these message formats outside of JMRI, please contact Digitrax 028 * Inc for separate permission. 029 * 030 * @author Bob Jacobsen Copyright (C) 2001, 2018 031 * @author B. Milhaupt Copyright (C) 2020 032 */ 033public class LnPacketizer extends LnTrafficController { 034 035 /** 036 * True if the external hardware is not echoing messages, so we must. 037 */ 038 protected boolean echo = false; // true = echo messages here, instead of in hardware 039 040 public LnPacketizer(LocoNetSystemConnectionMemo m) { 041 // set the memo to point here 042 memo = m; 043 m.setLnTrafficController(this); 044 } 045 046 // The methods to implement the LocoNetInterface 047 048 /** 049 * {@inheritDoc} 050 */ 051 @Override 052 public boolean status() { 053 boolean returnVal = ( ostream != null && istream != null 054 && xmtThread != null && xmtThread.isAlive() && xmtHandler != null 055 && rcvThread != null && rcvThread.isAlive() && rcvHandler != null 056 ); 057 return returnVal; 058 } 059 060 /** 061 * Synchronized list used as a transmit queue. 062 */ 063 protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>(); 064 065 /** 066 * XmtHandler (a local class) object to implement the transmit thread. 067 * <p> 068 * We create this object in startThreads() as each packetizer uses different handlers. 069 * So long as the object is created before using it to sync it works. 070 * 071 */ 072 protected Runnable xmtHandler = null; 073 074 /** 075 * RcvHandler (a local class) object to implement the receive thread 076 */ 077 protected Runnable rcvHandler; 078 079 /** 080 * Forward a preformatted LocoNetMessage to the actual interface. 081 * <p> 082 * Checksum is computed and overwritten here, then the message is converted 083 * to a byte array and queued for transmission. 084 * 085 * @param m Message to send; will be updated with CRC 086 */ 087 @Override 088 public void sendLocoNetMessage(LocoNetMessage m) { 089 090 // update statistics 091 transmittedMsgCount++; 092 093 // set the error correcting code byte(s) before transmittal 094 m.setParity(); 095 096 // stream to port in single write, as that's needed by serial 097 int len = m.getNumDataElements(); 098 byte msg[] = new byte[len]; 099 for (int i = 0; i < len; i++) { 100 msg[i] = (byte) m.getElement(i); 101 } 102 103 log.debug("queue LocoNet packet: {}", m); 104 // We need to queue the request and wake the xmit thread in an atomic operation 105 // But the thread might not be running, in which case the request is just 106 // queued up. 107 try { 108 xmtList.add(msg); 109 } catch (RuntimeException e) { 110 log.warn("passing to xmit: unexpected exception: ", e); 111 } 112 } 113 114 /** 115 * Implement abstract method to signal if there's a backlog of information 116 * waiting to be sent. 117 * 118 * @return true if busy, false if nothing waiting to send 119 */ 120 @Override 121 public boolean isXmtBusy() { 122 if (controller == null) { 123 return false; 124 } 125 126 return (!controller.okToSend()); 127 } 128 129 // methods to connect/disconnect to a source of data in a LnPortController 130 131 protected LnPortController controller = null; 132 133 /** 134 * Make connection to an existing LnPortController object. 135 * 136 * @param p Port controller for connected. Save this for a later disconnect 137 * call 138 */ 139 public void connectPort(LnPortController p) { 140 istream = p.getInputStream(); 141 ostream = p.getOutputStream(); 142 if (controller != null) { 143 log.warn("connectPort: connect called while connected"); 144 } 145 controller = p; 146 } 147 148 /** 149 * Break connection to an existing LnPortController object. Once broken, 150 * attempts to send via "message" member will fail. 151 * 152 * @param p previously connected port 153 */ 154 public void disconnectPort(LnPortController p) { 155 istream = null; 156 ostream = null; 157 if (controller != p) { 158 log.warn("disconnectPort: disconnect called from non-connected LnPortController"); 159 } 160 controller = null; 161 } 162 163 // data members to hold the streams. These are public so the inner classes defined here 164 // can access them with a Java 1.1 compiler 165 public DataInputStream istream = null; 166 public OutputStream ostream = null; 167 168 /** 169 * Read a single byte, protecting against various timeouts, etc. 170 * <p> 171 * When a port is set to have a receive timeout (via the 172 * enableReceiveTimeout() method), some will return zero bytes or an 173 * EOFException at the end of the timeout. In that case, the read should be 174 * repeated to get the next real character. 175 * 176 * @param istream stream to read from 177 * @return buffer of received data 178 * @throws java.io.IOException failure during stream read 179 * 180 */ 181 protected byte readByteProtected(DataInputStream istream) throws java.io.IOException { 182 while (true) { // loop will repeat until character found 183 int nchars; 184 nchars = istream.read(rcvBuffer, 0, 1); 185 if (nchars > 0) { 186 return rcvBuffer[0]; 187 } 188 } 189 } 190 // Defined this way to reduce new object creation 191 private final byte[] rcvBuffer = new byte[1]; 192 193 /** 194 * Captive class to handle incoming characters. This is a permanent loop, 195 * looking for input messages in character form on the stream connected to 196 * the LnPortController via <code>connectPort</code>. 197 */ 198 protected class RcvHandler implements Runnable { 199 200 /** 201 * Remember the LnPacketizer object 202 */ 203 LnTrafficController trafficController; 204 205 public RcvHandler(LnTrafficController lt) { 206 trafficController = lt; 207 } 208 209 /** 210 * Handle incoming characters. This is a permanent loop, looking for 211 * input messages in character form on the stream connected to the 212 * LnPortController via <code>connectPort</code>. Terminates with the 213 * input stream breaking out of the try block. 214 */ 215 @Override 216 public void run() { 217 218 int opCode; 219 while (!threadStopRequest && ! Thread.interrupted() ) { // loop until asked to stop 220 try { 221 // start by looking for command - skip if bit not set 222 while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check 223 if (log.isTraceEnabled()) { // avoid building string 224 log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N 225 } 226 } 227 // here opCode is OK. Create output message 228 if (log.isTraceEnabled()) { // avoid building string 229 log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N 230 } 231 LocoNetMessage msg = null; 232 while (msg == null) { 233 try { 234 // Capture 2nd byte, always present 235 int byte2 = readByteProtected(istream) & 0xFF; 236 if (log.isTraceEnabled()) { // avoid building string 237 log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N 238 } // Decide length 239 int len = 2; 240 switch ((opCode & 0x60) >> 5) { 241 case 0: 242 /* 2 byte message */ 243 244 len = 2; 245 break; 246 247 case 1: 248 /* 4 byte message */ 249 250 len = 4; 251 break; 252 253 case 2: 254 /* 6 byte message */ 255 256 len = 6; 257 break; 258 259 case 3: 260 /* N byte message */ 261 262 if (byte2 < 2) { 263 log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N 264 } 265 len = byte2; 266 break; 267 default: 268 log.warn("Unhandled code: {}", (opCode & 0x60) >> 5); 269 break; 270 } 271 msg = new LocoNetMessage(len); 272 // message exists, now fill it 273 msg.setOpCode(opCode); 274 msg.setElement(1, byte2); 275 log.trace("len: {}", len); // NOI18N 276 for (int i = 2; i < len; i++) { 277 // check for message-blocking error 278 int b = readByteProtected(istream) & 0xFF; 279 if (log.isTraceEnabled()) { 280 log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N 281 } 282 if ((b & 0x80) != 0) { 283 log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N 284 opCode = b; 285 throw new LocoNetMessageException(); 286 } 287 msg.setElement(i, b); 288 } 289 } catch (LocoNetMessageException e) { 290 // retry by destroying the existing message 291 // opCode is set for the newly-started packet 292 msg = null; 293 } 294 } 295 // check parity 296 if (!msg.checkParity()) { 297 log.warn("Ignore LocoNet packet with bad checksum: {}", msg); 298 throw new LocoNetMessageException(); 299 } 300 // message is complete, dispatch it !! 301 { 302 log.debug("queue message for notification: {}", msg); 303 304 jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController)); 305 } 306 307 // done with this one 308 } catch (LocoNetMessageException e) { 309 // just let it ride for now 310 log.warn("run: unexpected LocoNetMessageException", e); // NOI18N 311 } catch (java.io.EOFException | com.fazecast.jSerialComm.SerialPortTimeoutException e) { 312 // posted from idle port when enableReceiveTimeout used 313 } catch (java.io.IOException e) { 314 // fired when write-end of HexFile reaches end 315 log.debug("IOException, should only happen with HexFile", e); // NOI18N 316 log.info("End of file"); // NOI18N 317 disconnectPort(controller); 318 return; 319 } // normally, we don't catch RuntimeException, but in this 320 // permanently running loop it seems wise. 321 catch (RuntimeException e) { 322 log.warn("run: unexpected Exception", e); // NOI18N 323 } 324 } // end of permanent loop 325 } 326 } 327 328 /** 329 * Captive class to notify of one message. 330 */ 331 private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction { 332 333 public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) { 334 thisMsg = msg; 335 thisTc = trafficController; 336 } 337 LocoNetMessage thisMsg; 338 LnTrafficController thisTc; 339 340 /** 341 * {@inheritDoc} 342 */ 343 @Override 344 public void run() { 345 thisTc.notify(thisMsg); 346 } 347 } 348 349 /** 350 * Captive class to handle transmission. 351 */ 352 class XmtHandler implements Runnable { 353 354 /** 355 * Loops forever, looking for message to send and processing them. 356 */ 357 @Override 358 public void run() { 359 360 while (!threadStopRequest) { // loop until asked to stop 361 // any input? 362 try { 363 // get content; blocks until present 364 log.trace("check for input"); // NOI18N 365 366 byte msg[] = xmtList.take(); 367 368 // input - now send 369 try { 370 if (ostream != null) { 371 if (log.isDebugEnabled()) { // avoid work if not needed 372 if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N 373 log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 374 } 375 ostream.write(msg); 376 ostream.flush(); 377 if (log.isTraceEnabled()) { // avoid String building if not needed 378 log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 379 } 380 messageTransmitted(msg); 381 } else { 382 // no stream connected 383 log.warn("sendLocoNetMessage: no connection established"); // NOI18N 384 } 385 } catch (java.io.IOException e) { 386 log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N 387 } 388 } catch (InterruptedException ie) { 389 return; // ending the thread 390 } catch (RuntimeException rt) { 391 log.error("Exception on take() call", rt); 392 } 393 } 394 } 395 } 396 397 /** 398 * When a message is finally transmitted, forward it to listeners if echoing 399 * is needed. 400 * 401 * @param msg message sent 402 */ 403 protected void messageTransmitted(byte[] msg) { 404 log.debug("message transmitted (echo {})", echo); 405 if (!echo) { 406 return; 407 } 408 // message is queued for transmit, echo it when needed 409 // return a notification via the queue to ensure end 410 javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg))); 411 } 412 413 static class Echo implements Runnable { 414 415 Echo(LnPacketizer t, LocoNetMessage m) { 416 myTc = t; 417 msgForLater = m; 418 } 419 LocoNetMessage msgForLater; 420 LnPacketizer myTc; 421 422 /** 423 * {@inheritDoc} 424 */ 425 @Override 426 public void run() { 427 myTc.notify(msgForLater); 428 } 429 } 430 431 /** 432 * Invoked at startup to start the threads needed here. 433 */ 434 public void startThreads() { 435 int priority = Thread.currentThread().getPriority(); 436 log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N 437 priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY); 438 439 // start the RcvHandler in a thread of its own 440 if (rcvHandler == null) { 441 rcvHandler = new RcvHandler(this); 442 } 443 rcvThread = new Thread(rcvHandler, "LocoNet receive handler"); // NOI18N 444 rcvThread.setDaemon(true); 445 rcvThread.setPriority(Thread.MAX_PRIORITY); 446 rcvThread.start(); 447 448 if (xmtHandler == null) { 449 xmtHandler = new XmtHandler(); 450 } 451 // make sure that the xmt priority is no lower than the current priority 452 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY); 453 // start the XmtHandler in a thread of its own 454 if (xmtThread == null) { 455 xmtThread = new Thread(xmtHandler, "LocoNet transmit handler"); // NOI18N 456 } 457 log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N 458 xmtThread.setDaemon(true); 459 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 460 xmtThread.start(); 461 462 log.info("lnPacketizer Started"); 463 } 464 465 protected Thread rcvThread; 466 protected Thread xmtThread; 467 468 /** 469 * {@inheritDoc} 470 */ 471 @SuppressWarnings("deprecation") // Thread.stop() 472 @Override 473 public void dispose() { 474 if (xmtThread != null) { 475 xmtThread.interrupt(); 476 try { 477 xmtThread.join(); 478 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 479 } 480 if (rcvThread != null) { 481 rcvThread.interrupt(); 482 try { 483 rcvThread.join(); 484 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 485 } 486 super.dispose(); 487 } 488 489 /** 490 * Terminate the receive and transmit threads. 491 * <p> 492 * This is intended to be used only by testing subclasses. 493 */ 494 public void terminateThreads() { 495 threadStopRequest = true; 496 if (xmtThread != null) { 497 xmtThread.interrupt(); 498 try { 499 xmtThread.join(); 500 } catch (InterruptedException ie){ 501 // interrupted during cleanup. 502 } 503 } 504 505 if (rcvThread != null) { 506 rcvThread.interrupt(); 507 try { 508 rcvThread.join(); 509 } catch (InterruptedException ie){ 510 // interrupted during cleanup. 511 } 512 } 513 } 514 515 /** 516 * Flag that threads should terminate as soon as they can. 517 */ 518 protected volatile boolean threadStopRequest = false; 519 520 private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class); 521 522}