001package jmri.jmrix.loconet; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.LinkedList; 006import java.util.NoSuchElementException; 007import org.slf4j.Logger; 008import org.slf4j.LoggerFactory; 009 010/** 011 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface" 012 * side sends/receives LocoNetMessage objects. The connection to a 013 * LnPortController is via a pair of *Streams, which then carry sequences of 014 * characters for transmission. 015 * <p> 016 * Messages come to this via the main GUI thread, and are forwarded back to 017 * listeners in that same thread. Reception and transmission are handled in 018 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 019 * classes defined here. The thread priorities are: 020 * <ul> 021 * <li> RcvHandler - at highest available priority 022 * <li> XmtHandler - down one, which is assumed to be above the GUI 023 * <li> (everything else) 024 * </ul> 025 * Some of the message formats used in this class are Copyright Digitrax, Inc. 026 * and used with permission as part of the JMRI project. That permission does 027 * not extend to uses in other software products. If you wish to use this code, 028 * algorithm or these message formats outside of JMRI, please contact Digitrax 029 * Inc for separate permission. 030 * 031 * @author Bob Jacobsen Copyright (C) 2001, 2018 032 * @author B. Milhaupt Copyright (C) 2020 033 */ 034public class LnPacketizer extends LnTrafficController { 035 036 /** 037 * True if the external hardware is not echoing messages, so we must. 038 */ 039 protected boolean echo = false; // true = echo messages here, instead of in hardware 040 041 public LnPacketizer(LocoNetSystemConnectionMemo m) { 042 // set the memo to point here 043 memo = m; 044 m.setLnTrafficController(this); 045 } 046 047 // The methods to implement the LocoNetInterface 048 049 /** 050 * {@inheritDoc} 051 */ 052 @Override 053 public boolean status() { 054 boolean returnVal = ( ostream != null && istream != null 055 && xmtThread != null && xmtThread.isAlive() && xmtHandler != null 056 && rcvThread != null && rcvThread.isAlive() && rcvHandler != null 057 ); 058 return returnVal; 059 } 060 061 /** 062 * Synchronized list used as a transmit queue. 063 */ 064 protected LinkedList<byte[]> xmtList = new LinkedList<>(); 065 066 /** 067 * XmtHandler (a local class) object to implement the transmit thread. 068 * <p> 069 * We create this object in startThreads() as each packetizer uses different handlers. 070 * So long as the object is created before using it to sync it works. 071 * 072 */ 073 protected Runnable xmtHandler = null; 074 075 /** 076 * RcvHandler (a local class) object to implement the receive thread 077 */ 078 protected Runnable rcvHandler; 079 080 /** 081 * Forward a preformatted LocoNetMessage to the actual interface. 082 * <p> 083 * Checksum is computed and overwritten here, then the message is converted 084 * to a byte array and queued for transmission. 085 * 086 * @param m Message to send; will be updated with CRC 087 */ 088 @Override 089 public void sendLocoNetMessage(LocoNetMessage m) { 090 091 // update statistics 092 transmittedMsgCount++; 093 094 // set the error correcting code byte(s) before transmittal 095 m.setParity(); 096 097 // stream to port in single write, as that's needed by serial 098 int len = m.getNumDataElements(); 099 byte msg[] = new byte[len]; 100 for (int i = 0; i < len; i++) { 101 msg[i] = (byte) m.getElement(i); 102 } 103 104 log.debug("queue LocoNet packet: {}", m); 105 // We need to queue the request and wake the xmit thread in an atomic operation 106 // But the thread might not be running, in which case the request is just 107 // queued up. 108 try { 109 synchronized (xmtHandler) { 110 xmtList.addLast(msg); 111 xmtHandler.notifyAll(); 112 } 113 } catch (RuntimeException e) { 114 log.warn("passing to xmit: unexpected exception: ", e); 115 } 116 } 117 118 /** 119 * Implement abstract method to signal if there's a backlog of information 120 * waiting to be sent. 121 * 122 * @return true if busy, false if nothing waiting to send 123 */ 124 @Override 125 public boolean isXmtBusy() { 126 if (controller == null) { 127 return false; 128 } 129 130 return (!controller.okToSend()); 131 } 132 133 // methods to connect/disconnect to a source of data in a LnPortController 134 135 protected LnPortController controller = null; 136 137 /** 138 * Make connection to an existing LnPortController object. 139 * 140 * @param p Port controller for connected. Save this for a later disconnect 141 * call 142 */ 143 public void connectPort(LnPortController p) { 144 istream = p.getInputStream(); 145 ostream = p.getOutputStream(); 146 if (controller != null) { 147 log.warn("connectPort: connect called while connected"); 148 } 149 controller = p; 150 } 151 152 /** 153 * Break connection to an existing LnPortController object. Once broken, 154 * attempts to send via "message" member will fail. 155 * 156 * @param p previously connected port 157 */ 158 public void disconnectPort(LnPortController p) { 159 istream = null; 160 ostream = null; 161 if (controller != p) { 162 log.warn("disconnectPort: disconnect called from non-connected LnPortController"); 163 } 164 controller = null; 165 } 166 167 // data members to hold the streams. These are public so the inner classes defined here 168 // can access them with a Java 1.1 compiler 169 public DataInputStream istream = null; 170 public OutputStream ostream = null; 171 172 /** 173 * Read a single byte, protecting against various timeouts, etc. 174 * <p> 175 * When a port is set to have a receive timeout (via the 176 * enableReceiveTimeout() method), some will return zero bytes or an 177 * EOFException at the end of the timeout. In that case, the read should be 178 * repeated to get the next real character. 179 * 180 * @param istream stream to read from 181 * @return buffer of received data 182 * @throws java.io.IOException failure during stream read 183 * 184 */ 185 protected byte readByteProtected(DataInputStream istream) throws java.io.IOException { 186 while (true) { // loop will repeat until character found 187 int nchars; 188 nchars = istream.read(rcvBuffer, 0, 1); 189 if (nchars > 0) { 190 return rcvBuffer[0]; 191 } 192 } 193 } 194 // Defined this way to reduce new object creation 195 private final byte[] rcvBuffer = new byte[1]; 196 197 /** 198 * Captive class to handle incoming characters. This is a permanent loop, 199 * looking for input messages in character form on the stream connected to 200 * the LnPortController via <code>connectPort</code>. 201 */ 202 protected class RcvHandler implements Runnable { 203 204 /** 205 * Remember the LnPacketizer object 206 */ 207 LnTrafficController trafficController; 208 209 public RcvHandler(LnTrafficController lt) { 210 trafficController = lt; 211 } 212 213 /** 214 * Handle incoming characters. This is a permanent loop, looking for 215 * input messages in character form on the stream connected to the 216 * LnPortController via <code>connectPort</code>. Terminates with the 217 * input stream breaking out of the try block. 218 */ 219 @Override 220 public void run() { 221 222 int opCode; 223 while (!threadStopRequest) { // loop until asked to stop 224 try { 225 // start by looking for command - skip if bit not set 226 while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check 227 if (log.isTraceEnabled()) { // avoid building string 228 log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N 229 } 230 } 231 // here opCode is OK. Create output message 232 if (log.isTraceEnabled()) { // avoid building string 233 log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N 234 } 235 LocoNetMessage msg = null; 236 while (msg == null) { 237 try { 238 // Capture 2nd byte, always present 239 int byte2 = readByteProtected(istream) & 0xFF; 240 if (log.isTraceEnabled()) { // avoid building string 241 log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N 242 } // Decide length 243 int len = 2; 244 switch ((opCode & 0x60) >> 5) { 245 case 0: 246 /* 2 byte message */ 247 248 len = 2; 249 break; 250 251 case 1: 252 /* 4 byte message */ 253 254 len = 4; 255 break; 256 257 case 2: 258 /* 6 byte message */ 259 260 len = 6; 261 break; 262 263 case 3: 264 /* N byte message */ 265 266 if (byte2 < 2) { 267 log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N 268 } 269 len = byte2; 270 break; 271 default: 272 log.warn("Unhandled code: {}", (opCode & 0x60) >> 5); 273 break; 274 } 275 msg = new LocoNetMessage(len); 276 // message exists, now fill it 277 msg.setOpCode(opCode); 278 msg.setElement(1, byte2); 279 log.trace("len: {}", len); // NOI18N 280 for (int i = 2; i < len; i++) { 281 // check for message-blocking error 282 int b = readByteProtected(istream) & 0xFF; 283 if (log.isTraceEnabled()) { 284 log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N 285 } 286 if ((b & 0x80) != 0) { 287 log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N 288 opCode = b; 289 throw new LocoNetMessageException(); 290 } 291 msg.setElement(i, b); 292 } 293 } catch (LocoNetMessageException e) { 294 // retry by destroying the existing message 295 // opCode is set for the newly-started packet 296 msg = null; 297 } 298 } 299 // check parity 300 if (!msg.checkParity()) { 301 log.warn("Ignore LocoNet packet with bad checksum: {}", msg); 302 throw new LocoNetMessageException(); 303 } 304 // message is complete, dispatch it !! 305 { 306 log.debug("queue message for notification: {}", msg); 307 308 jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController)); 309 } 310 311 // done with this one 312 } catch (LocoNetMessageException e) { 313 // just let it ride for now 314 log.warn("run: unexpected LocoNetMessageException", e); // NOI18N 315 } catch (java.io.EOFException e) { 316 // posted from idle port when enableReceiveTimeout used 317 log.trace("EOFException, is LocoNet serial I/O using timeouts?"); // NOI18N 318 } catch (java.io.IOException e) { 319 // fired when write-end of HexFile reaches end 320 log.debug("IOException, should only happen with HexFile", e); // NOI18N 321 log.info("End of file"); // NOI18N 322 disconnectPort(controller); 323 return; 324 } // normally, we don't catch RuntimeException, but in this 325 // permanently running loop it seems wise. 326 catch (RuntimeException e) { 327 log.warn("run: unexpected Exception", e); // NOI18N 328 } 329 } // end of permanent loop 330 } 331 } 332 333 /** 334 * Captive class to notify of one message. 335 */ 336 private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction { 337 338 public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) { 339 thisMsg = msg; 340 thisTc = trafficController; 341 } 342 LocoNetMessage thisMsg; 343 LnTrafficController thisTc; 344 345 /** 346 * {@inheritDoc} 347 */ 348 @Override 349 public void run() { 350 thisTc.notify(thisMsg); 351 } 352 } 353 354 /** 355 * Captive class to handle transmission. 356 */ 357 class XmtHandler implements Runnable { 358 359 /** 360 * Loops forever, looking for message to send and processing them. 361 */ 362 @Override 363 public void run() { 364 365 while (!threadStopRequest) { // loop until asked to stop 366 // any input? 367 try { 368 // get content; failure is a NoSuchElementException 369 log.trace("check for input"); // NOI18N 370 byte msg[] = null; 371 synchronized (this) { 372 msg = xmtList.removeFirst(); 373 } 374 375 // input - now send 376 try { 377 if (ostream != null) { 378 if (log.isDebugEnabled()) { // avoid work if not needed 379 if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N 380 log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 381 } 382 ostream.write(msg); 383 ostream.flush(); 384 if (log.isTraceEnabled()) { // avoid String building if not needed 385 log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 386 } 387 messageTransmitted(msg); 388 } else { 389 // no stream connected 390 log.warn("sendLocoNetMessage: no connection established"); // NOI18N 391 } 392 } catch (java.io.IOException e) { 393 log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N 394 } 395 } catch (NoSuchElementException e) { 396 // message queue was empty, wait for input 397 log.trace("start wait"); // NOI18N 398 399 new jmri.util.WaitHandler(this); // handle synchronization, spurious wake, interruption 400 401 log.trace("end wait"); // NOI18N 402 } 403 } 404 } 405 } 406 407 /** 408 * When a message is finally transmitted, forward it to listeners if echoing 409 * is needed. 410 * 411 * @param msg message sent 412 */ 413 protected void messageTransmitted(byte[] msg) { 414 log.debug("message transmitted (echo {})", echo); 415 if (!echo) { 416 return; 417 } 418 // message is queued for transmit, echo it when needed 419 // return a notification via the queue to ensure end 420 javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg))); 421 } 422 423 static class Echo implements Runnable { 424 425 Echo(LnPacketizer t, LocoNetMessage m) { 426 myTc = t; 427 msgForLater = m; 428 } 429 LocoNetMessage msgForLater; 430 LnPacketizer myTc; 431 432 /** 433 * {@inheritDoc} 434 */ 435 @Override 436 public void run() { 437 myTc.notify(msgForLater); 438 } 439 } 440 441 /** 442 * Invoked at startup to start the threads needed here. 443 */ 444 public void startThreads() { 445 int priority = Thread.currentThread().getPriority(); 446 log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N 447 priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY); 448 449 // start the RcvHandler in a thread of its own 450 if (rcvHandler == null) { 451 rcvHandler = new RcvHandler(this); 452 } 453 rcvThread = new Thread(rcvHandler, "LocoNet receive handler"); // NOI18N 454 rcvThread.setDaemon(true); 455 rcvThread.setPriority(Thread.MAX_PRIORITY); 456 rcvThread.start(); 457 458 if (xmtHandler == null) { 459 xmtHandler = new XmtHandler(); 460 } 461 // make sure that the xmt priority is no lower than the current priority 462 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY); 463 // start the XmtHandler in a thread of its own 464 if (xmtThread == null) { 465 xmtThread = new Thread(xmtHandler, "LocoNet transmit handler"); // NOI18N 466 } 467 log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N 468 xmtThread.setDaemon(true); 469 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 470 xmtThread.start(); 471 472 log.info("lnPacketizer Started"); 473 } 474 475 protected Thread rcvThread; 476 protected Thread xmtThread; 477 478 /** 479 * {@inheritDoc} 480 */ 481 @SuppressWarnings("deprecation") // Thread.stop() 482 @Override 483 public void dispose() { 484 if (xmtThread != null) { 485 xmtThread.stop(); // interrupt not sufficient? 486 try { 487 xmtThread.join(); 488 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 489 } 490 if (rcvThread != null) { 491 rcvThread.stop(); // interrupt not sufficient, because jtermios hangs in select via purejavacomm.PureJavaSerialPort$2.read 492 try { 493 rcvThread.join(); 494 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 495 } 496 super.dispose(); 497 } 498 499 /** 500 * Terminate the receive and transmit threads. 501 * <p> 502 * This is intended to be used only by testing subclasses. 503 */ 504 public void terminateThreads() { 505 threadStopRequest = true; 506 if (xmtThread != null) { 507 xmtThread.interrupt(); 508 try { 509 xmtThread.join(); 510 } catch (InterruptedException ie){ 511 // interrupted during cleanup. 512 } 513 } 514 515 if (rcvThread != null) { 516 rcvThread.interrupt(); 517 try { 518 rcvThread.join(); 519 } catch (InterruptedException ie){ 520 // interrupted during cleanup. 521 } 522 } 523 } 524 525 /** 526 * Flag that threads should terminate as soon as they can. 527 */ 528 protected volatile boolean threadStopRequest = false; 529 530 private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class); 531 532}