001package jmri.jmrix.loconet; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.concurrent.LinkedTransferQueue; 006import org.slf4j.Logger; 007import org.slf4j.LoggerFactory; 008 009/** 010 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface" 011 * side sends/receives LocoNetMessage objects. The connection to a 012 * LnPortController is via a pair of *Streams, which then carry sequences of 013 * characters for transmission. 014 * <p> 015 * Messages come to this via the main GUI thread, and are forwarded back to 016 * listeners in that same thread. Reception and transmission are handled in 017 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal 018 * classes defined here. The thread priorities are: 019 * <ul> 020 * <li> RcvHandler - at highest available priority 021 * <li> XmtHandler - down one, which is assumed to be above the GUI 022 * <li> (everything else) 023 * </ul> 024 * Some of the message formats used in this class are Copyright Digitrax, Inc. 025 * and used with permission as part of the JMRI project. That permission does 026 * not extend to uses in other software products. If you wish to use this code, 027 * algorithm or these message formats outside of JMRI, please contact Digitrax 028 * Inc for separate permission. 029 * 030 * @author Bob Jacobsen Copyright (C) 2001, 2018 031 * @author B. Milhaupt Copyright (C) 2020 032 */ 033public class LnPacketizer extends LnTrafficController { 034 035 /** 036 * True if the external hardware is not echoing messages, so we must. 037 */ 038 protected boolean echo = false; // true = echo messages here, instead of in hardware 039 040 public LnPacketizer(LocoNetSystemConnectionMemo m) { 041 // set the memo to point here 042 memo = m; 043 m.setLnTrafficController(this); 044 } 045 046 // The methods to implement the LocoNetInterface 047 048 /** 049 * {@inheritDoc} 050 */ 051 @Override 052 public boolean status() { 053 boolean returnVal = ( ostream != null && istream != null 054 && xmtThread != null && xmtThread.isAlive() && xmtHandler != null 055 && rcvThread != null && rcvThread.isAlive() && rcvHandler != null 056 ); 057 return returnVal; 058 } 059 060 /** 061 * Synchronized list used as a transmit queue. 062 */ 063 protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>(); 064 065 /** 066 * XmtHandler (a local class) object to implement the transmit thread. 067 * <p> 068 * We create this object in startThreads() as each packetizer uses different handlers. 069 * So long as the object is created before using it to sync it works. 070 * 071 */ 072 protected Runnable xmtHandler = null; 073 074 /** 075 * RcvHandler (a local class) object to implement the receive thread 076 */ 077 protected Runnable rcvHandler; 078 079 /** 080 * Forward a preformatted LocoNetMessage to the actual interface. 081 * <p> 082 * Checksum is computed and overwritten here, then the message is converted 083 * to a byte array and queued for transmission. 084 * 085 * @param m Message to send; will be updated with CRC 086 */ 087 @Override 088 public void sendLocoNetMessage(LocoNetMessage m) { 089 090 // update statistics 091 transmittedMsgCount++; 092 093 // set the error correcting code byte(s) before transmittal 094 m.setParity(); 095 096 // stream to port in single write, as that's needed by serial 097 int len = m.getNumDataElements(); 098 byte msg[] = new byte[len]; 099 for (int i = 0; i < len; i++) { 100 msg[i] = (byte) m.getElement(i); 101 } 102 103 log.debug("queue LocoNet packet: {}", m); 104 // We need to queue the request and wake the xmit thread in an atomic operation 105 // But the thread might not be running, in which case the request is just 106 // queued up. 107 try { 108 xmtList.add(msg); 109 } catch (RuntimeException e) { 110 log.warn("passing to xmit: unexpected exception: ", e); 111 } 112 } 113 114 /** 115 * Implement abstract method to signal if there's a backlog of information 116 * waiting to be sent. 117 * 118 * @return true if busy, false if nothing waiting to send 119 */ 120 @Override 121 public boolean isXmtBusy() { 122 if (controller == null) { 123 return false; 124 } 125 126 return (!controller.okToSend()); 127 } 128 129 // methods to connect/disconnect to a source of data in a LnPortController 130 131 protected LnPortController controller = null; 132 133 /** 134 * Make connection to an existing LnPortController object. 135 * 136 * @param p Port controller for connected. Save this for a later disconnect 137 * call 138 */ 139 public void connectPort(LnPortController p) { 140 istream = p.getInputStream(); 141 ostream = p.getOutputStream(); 142 if (controller != null) { 143 log.warn("connectPort: connect called while connected"); 144 } 145 controller = p; 146 } 147 148 /** 149 * Break connection to an existing LnPortController object. Once broken, 150 * attempts to send via "message" member will fail. 151 * 152 * @param p previously connected port 153 */ 154 public void disconnectPort(LnPortController p) { 155 istream = null; 156 ostream = null; 157 if (controller != p) { 158 log.warn("disconnectPort: disconnect called from non-connected LnPortController"); 159 } 160 controller = null; 161 } 162 163 // data members to hold the streams. These are public so the inner classes defined here 164 // can access them with a Java 1.1 compiler 165 public DataInputStream istream = null; 166 public OutputStream ostream = null; 167 168 /** 169 * Read a single byte, protecting against various timeouts, etc. 170 * <p> 171 * When a port is set to have a receive timeout (via the 172 * enableReceiveTimeout() method), some will return zero bytes or an 173 * EOFException at the end of the timeout. In that case, the read should be 174 * repeated to get the next real character. 175 * 176 * @param istream stream to read from 177 * @return buffer of received data 178 * @throws java.io.IOException failure during stream read 179 * 180 */ 181 protected byte readByteProtected(DataInputStream istream) throws java.io.IOException { 182 while (true) { // loop will repeat until character found 183 int nchars; 184 nchars = istream.read(rcvBuffer, 0, 1); 185 if (nchars > 0) { 186 return rcvBuffer[0]; 187 } 188 } 189 } 190 // Defined this way to reduce new object creation 191 private final byte[] rcvBuffer = new byte[1]; 192 193 /** 194 * Captive class to handle incoming characters. This is a permanent loop, 195 * looking for input messages in character form on the stream connected to 196 * the LnPortController via <code>connectPort</code>. 197 */ 198 protected class RcvHandler implements Runnable { 199 200 /** 201 * Remember the LnPacketizer object 202 */ 203 LnTrafficController trafficController; 204 205 public RcvHandler(LnTrafficController lt) { 206 trafficController = lt; 207 } 208 209 /** 210 * Handle incoming characters. This is a permanent loop, looking for 211 * input messages in character form on the stream connected to the 212 * LnPortController via <code>connectPort</code>. Terminates with the 213 * input stream breaking out of the try block. 214 */ 215 @Override 216 public void run() { 217 218 int opCode; 219 while (!threadStopRequest) { // loop until asked to stop 220 try { 221 // start by looking for command - skip if bit not set 222 while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check 223 if (log.isTraceEnabled()) { // avoid building string 224 log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N 225 } 226 } 227 // here opCode is OK. Create output message 228 if (log.isTraceEnabled()) { // avoid building string 229 log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N 230 } 231 LocoNetMessage msg = null; 232 while (msg == null) { 233 try { 234 // Capture 2nd byte, always present 235 int byte2 = readByteProtected(istream) & 0xFF; 236 if (log.isTraceEnabled()) { // avoid building string 237 log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N 238 } // Decide length 239 int len = 2; 240 switch ((opCode & 0x60) >> 5) { 241 case 0: 242 /* 2 byte message */ 243 244 len = 2; 245 break; 246 247 case 1: 248 /* 4 byte message */ 249 250 len = 4; 251 break; 252 253 case 2: 254 /* 6 byte message */ 255 256 len = 6; 257 break; 258 259 case 3: 260 /* N byte message */ 261 262 if (byte2 < 2) { 263 log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N 264 } 265 len = byte2; 266 break; 267 default: 268 log.warn("Unhandled code: {}", (opCode & 0x60) >> 5); 269 break; 270 } 271 msg = new LocoNetMessage(len); 272 // message exists, now fill it 273 msg.setOpCode(opCode); 274 msg.setElement(1, byte2); 275 log.trace("len: {}", len); // NOI18N 276 for (int i = 2; i < len; i++) { 277 // check for message-blocking error 278 int b = readByteProtected(istream) & 0xFF; 279 if (log.isTraceEnabled()) { 280 log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N 281 } 282 if ((b & 0x80) != 0) { 283 log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N 284 opCode = b; 285 throw new LocoNetMessageException(); 286 } 287 msg.setElement(i, b); 288 } 289 } catch (LocoNetMessageException e) { 290 // retry by destroying the existing message 291 // opCode is set for the newly-started packet 292 msg = null; 293 } 294 } 295 // check parity 296 if (!msg.checkParity()) { 297 log.warn("Ignore LocoNet packet with bad checksum: {}", msg); 298 throw new LocoNetMessageException(); 299 } 300 // message is complete, dispatch it !! 301 { 302 log.debug("queue message for notification: {}", msg); 303 304 jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController)); 305 } 306 307 // done with this one 308 } catch (LocoNetMessageException e) { 309 // just let it ride for now 310 log.warn("run: unexpected LocoNetMessageException", e); // NOI18N 311 } catch (java.io.EOFException e) { 312 // posted from idle port when enableReceiveTimeout used 313 log.trace("EOFException, is LocoNet serial I/O using timeouts?"); // NOI18N 314 } catch (java.io.IOException e) { 315 // fired when write-end of HexFile reaches end 316 log.debug("IOException, should only happen with HexFile", e); // NOI18N 317 log.info("End of file"); // NOI18N 318 disconnectPort(controller); 319 return; 320 } // normally, we don't catch RuntimeException, but in this 321 // permanently running loop it seems wise. 322 catch (RuntimeException e) { 323 log.warn("run: unexpected Exception", e); // NOI18N 324 } 325 } // end of permanent loop 326 } 327 } 328 329 /** 330 * Captive class to notify of one message. 331 */ 332 private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction { 333 334 public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) { 335 thisMsg = msg; 336 thisTc = trafficController; 337 } 338 LocoNetMessage thisMsg; 339 LnTrafficController thisTc; 340 341 /** 342 * {@inheritDoc} 343 */ 344 @Override 345 public void run() { 346 thisTc.notify(thisMsg); 347 } 348 } 349 350 /** 351 * Captive class to handle transmission. 352 */ 353 class XmtHandler implements Runnable { 354 355 /** 356 * Loops forever, looking for message to send and processing them. 357 */ 358 @Override 359 public void run() { 360 361 while (!threadStopRequest) { // loop until asked to stop 362 // any input? 363 try { 364 // get content; blocks until present 365 log.trace("check for input"); // NOI18N 366 367 byte msg[] = xmtList.take(); 368 369 // input - now send 370 try { 371 if (ostream != null) { 372 if (log.isDebugEnabled()) { // avoid work if not needed 373 if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N 374 log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 375 } 376 ostream.write(msg); 377 ostream.flush(); 378 if (log.isTraceEnabled()) { // avoid String building if not needed 379 log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N 380 } 381 messageTransmitted(msg); 382 } else { 383 // no stream connected 384 log.warn("sendLocoNetMessage: no connection established"); // NOI18N 385 } 386 } catch (java.io.IOException e) { 387 log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N 388 } 389 } catch (InterruptedException ie) { 390 return; // ending the thread 391 } catch (RuntimeException rt) { 392 log.error("Exception on take() call", rt); 393 } 394 } 395 } 396 } 397 398 /** 399 * When a message is finally transmitted, forward it to listeners if echoing 400 * is needed. 401 * 402 * @param msg message sent 403 */ 404 protected void messageTransmitted(byte[] msg) { 405 log.debug("message transmitted (echo {})", echo); 406 if (!echo) { 407 return; 408 } 409 // message is queued for transmit, echo it when needed 410 // return a notification via the queue to ensure end 411 javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg))); 412 } 413 414 static class Echo implements Runnable { 415 416 Echo(LnPacketizer t, LocoNetMessage m) { 417 myTc = t; 418 msgForLater = m; 419 } 420 LocoNetMessage msgForLater; 421 LnPacketizer myTc; 422 423 /** 424 * {@inheritDoc} 425 */ 426 @Override 427 public void run() { 428 myTc.notify(msgForLater); 429 } 430 } 431 432 /** 433 * Invoked at startup to start the threads needed here. 434 */ 435 public void startThreads() { 436 int priority = Thread.currentThread().getPriority(); 437 log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N 438 priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY); 439 440 // start the RcvHandler in a thread of its own 441 if (rcvHandler == null) { 442 rcvHandler = new RcvHandler(this); 443 } 444 rcvThread = new Thread(rcvHandler, "LocoNet receive handler"); // NOI18N 445 rcvThread.setDaemon(true); 446 rcvThread.setPriority(Thread.MAX_PRIORITY); 447 rcvThread.start(); 448 449 if (xmtHandler == null) { 450 xmtHandler = new XmtHandler(); 451 } 452 // make sure that the xmt priority is no lower than the current priority 453 int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY); 454 // start the XmtHandler in a thread of its own 455 if (xmtThread == null) { 456 xmtThread = new Thread(xmtHandler, "LocoNet transmit handler"); // NOI18N 457 } 458 log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N 459 xmtThread.setDaemon(true); 460 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 461 xmtThread.start(); 462 463 log.info("lnPacketizer Started"); 464 } 465 466 protected Thread rcvThread; 467 protected Thread xmtThread; 468 469 /** 470 * {@inheritDoc} 471 */ 472 @SuppressWarnings("deprecation") // Thread.stop() 473 @Override 474 public void dispose() { 475 if (xmtThread != null) { 476 xmtThread.stop(); // interrupt not sufficient? 477 try { 478 xmtThread.join(); 479 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 480 } 481 if (rcvThread != null) { 482 rcvThread.stop(); // interrupt not sufficient with the previous serial library. Not known if OK with SerialComm? 483 try { 484 rcvThread.join(); 485 } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);} 486 } 487 super.dispose(); 488 } 489 490 /** 491 * Terminate the receive and transmit threads. 492 * <p> 493 * This is intended to be used only by testing subclasses. 494 */ 495 public void terminateThreads() { 496 threadStopRequest = true; 497 if (xmtThread != null) { 498 xmtThread.interrupt(); 499 try { 500 xmtThread.join(); 501 } catch (InterruptedException ie){ 502 // interrupted during cleanup. 503 } 504 } 505 506 if (rcvThread != null) { 507 rcvThread.interrupt(); 508 try { 509 rcvThread.join(); 510 } catch (InterruptedException ie){ 511 // interrupted during cleanup. 512 } 513 } 514 } 515 516 /** 517 * Flag that threads should terminate as soon as they can. 518 */ 519 protected volatile boolean threadStopRequest = false; 520 521 private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class); 522 523}