001package jmri.jmrix; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.DataInputStream; 005import java.io.IOException; 006import java.io.OutputStream; 007import java.util.*; 008import javax.swing.SwingUtilities; 009 010import jmri.InstanceManager; 011import jmri.ShutDownManager; 012import jmri.ShutDownTask; 013 014/** 015 * Abstract base for TrafficControllers in a Message/Reply protocol. 016 * <p> 017 * Two threads are used for the actual communication. The "Transmit" thread 018 * handles pushing characters to the port, and also changing the mode. The 019 * "Receive" thread converts characters from the input stream into replies. 020 * <p> 021 * The constructor registers a shutdown task to 022 * trigger the necessary cleanup code 023 * <p> 024 * The internal state machine handles changes of mode, automatic retry of 025 * certain messages, time outs, and sending poll messages when otherwise idle. 026 * <p> 027 * "Mode" refers to the state of the command station communications. "Normal" 028 * and "Programming" are the two modes, used if the command station requires 029 * messages to go back and forth between them. <br> 030 * 031 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram"> 032 * 033 * <p> 034 * The key methods for the basic operation are: 035 * <ul> 036 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)} 037 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data 038 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs 039 * information on e.g. the last message sent, that can be stored in member variables 040 * by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}. 
041 * <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects 042 * </ul> 043 * <p> 044 * If your command station requires messages to go in and out of 045 * "programming mode", those should be provided by 046 * {@link #enterProgMode()} and {@link #enterNormalMode()}. 047 * <p> 048 * If you want to poll for information when the line is otherwise idle, 049 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}. 050 * 051 * @author Bob Jacobsen Copyright (C) 2003 052 * @author Paul Bender Copyright (C) 2004-2010 053 */ 054 055/* 056@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png 057 058 [*] --> IDLESTATE 059 IDLESTATE --> NOTIFIEDSTATE : sendMessage() 060 NOTIFIEDSTATE --> IDLESTATE : queue empty 061 062 NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message 063 064 WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE 065 WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE 066 067 WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply() 068 069 WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE 070 WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE 071 OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message 072 073 IDLESTATE --> POLLSTATE : transmitLoop()\nno work 074 POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it 075 POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send 076 077 WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply 078 AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message 079 080NOTIFIEDSTATE : Transmit thread wakes up and processes 081POLLSTATE : Transient while deciding to send poll 
082OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change 083AUTORETRYSTATE : Transient while deciding to resend auto-retry message 084WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply 085WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply 086WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout 087 088Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry 089Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change. 090 091@enduml 092 */ 093 094public abstract class AbstractMRTrafficController { 095 096 private final Runnable shutDownTask = this::terminate; // retain for possible removal. 097 098 /** 099 * Create a new unnamed MRTrafficController. 100 */ 101 public AbstractMRTrafficController() { 102 log.debug("Creating AbstractMRTrafficController instance"); 103 mCurrentMode = NORMALMODE; 104 mCurrentState = IDLESTATE; 105 allowUnexpectedReply = false; 106 107 108 // We use a shutdown task here to make sure the connection is left 109 // in a clean state prior to exiting. This is required on systems 110 // which have a service mode to ensure we don't leave the system 111 // in an unusable state. Once the shutdown task executes, the connection 112 // must be considered permanently closed. 
113 114 InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask); 115 } 116 117 private boolean synchronizeRx = true; 118 119 protected void setSynchronizeRx(boolean val) { 120 synchronizeRx = val; 121 } 122 123 protected boolean getSynchronizeRx() { 124 return synchronizeRx; 125 } 126 127 // The methods to implement the abstract Interface 128 129 protected final Vector<AbstractMRListener> cmdListeners = new Vector<>(); 130 131 protected synchronized void addListener(AbstractMRListener l) { 132 // add only if not already registered 133 if (l == null) { 134 throw new NullPointerException(); 135 } 136 if (!cmdListeners.contains(l)) { 137 cmdListeners.addElement(l); 138 } 139 } 140 141 protected synchronized void removeListener(AbstractMRListener l) { 142 if (cmdListeners.contains(l)) { 143 cmdListeners.removeElement(l); 144 } 145 } 146 147 /** 148 * Forward a Message to registered listeners. 149 * 150 * @param m Message to be forwarded intact 151 * @param notMe One (optional) listener to be skipped, usually because it's 152 * the originating object. 153 */ 154 @SuppressWarnings("unchecked") 155 protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) { 156 // make a copy of the listener vector to synchronized not needed for transmit 157 Vector<AbstractMRListener> v; 158 synchronized (this) { 159 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 160 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 161 } 162 // forward to all listeners 163 int cnt = v.size(); 164 for (int i = 0; i < cnt; i++) { 165 AbstractMRListener client = v.elementAt(i); 166 if (notMe != client) { 167 log.debug("notify client: {}", client); 168 try { 169 forwardMessage(client, m); 170 } catch (RuntimeException e) { 171 log.warn("notify: During message dispatch to {}", client, e); 172 } 173 } 174 } 175 } 176 177 /** 178 * Implement this to forward a specific message type to a protocol-specific 179 * listener interface. 
* This puts the casting into the concrete class.
     * @param client abstract listener.
     * @param m message to forward.
     */
    protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m);

    /**
     * Invoked if it's appropriate to do low-priority polling of the command
     * station, this should return the next message to send, or null if the
     * TrafficController should just sleep.
     * @return Formatted poll message
     */
    protected abstract AbstractMRMessage pollMessage();

    /**
     * Provide the listener that accompanies a poll message. The transmit loop
     * passes the returned listener to
     * {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)} together
     * with the message obtained from {@link #pollMessage()}.
     *
     * @return the listener to associate with poll messages
     */
    protected abstract AbstractMRListener pollReplyHandler();

    /**
     * Listener given with the message most recently handed to
     * {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)};
     * may be null.
     */
    protected AbstractMRListener mLastSender = null;

    /**
     * Current communications mode; initialised to {@link #NORMALMODE} by the
     * constructor and updated by the transmit loop.
     */
    protected volatile int mCurrentMode;
    /** Mode value for normal operation. */
    public static final int NORMALMODE = 1;
    /** Mode value for programming (service mode) operation. */
    public static final int PROGRAMINGMODE = 4;

    /**
     * Set the system to programming mode.
     * @see #enterNormalMode()
     *
     * @return any message that needs to be returned to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterProgMode();

    /**
     * Sets the system to normal mode during programming while in IDLESTATE.
     * If {@link #programmerIdle()} returns true, enterNormalMode() is
     * called after a timeout.
     * @see #enterProgMode()
     *
     * @return any message that needs to be returned to the Command Station
     * to change modes. If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterNormalMode();

    /**
     * Check if the programmer is idle.
     * Override in the system specific code if necessary (see notes for
     * {@link #enterNormalMode()}).
     *
     * @return true if not busy programming
     */
    protected boolean programmerIdle() {
        return true;
    }

    /**
     * Get the delay (wait time) after enabling the programming track.
     * Override in subclass to add a longer delay.
236 * 237 * @return 0 as default delay 238 */ 239 protected int enterProgModeDelayTime() { 240 return 0; 241 } 242 243 protected volatile int mCurrentState; 244 public static final int IDLESTATE = 10; // nothing happened 245 public static final int NOTIFIEDSTATE = 15; // xmt notified, will next wake 246 public static final int WAITMSGREPLYSTATE = 25; // xmt has sent, await reply to message 247 public static final int WAITREPLYINPROGMODESTATE = 30; // xmt has done mode change, await reply 248 public static final int WAITREPLYINNORMMODESTATE = 35; // xmt has done mode change, await reply 249 public static final int OKSENDMSGSTATE = 40; // mode change reply here, send original msg 250 public static final int AUTORETRYSTATE = 45; // received message where automatic recovery may occur with a retransmission, re-send original msg 251 public static final int POLLSTATE = 50; // Send program mode or poll message 252 253 protected boolean allowUnexpectedReply; 254 255 /** 256 * Set whether the command station may send messages without a request 257 * sent to it. 258 * 259 * @param expected true to allow messages without a prior request 260 */ 261 protected void setAllowUnexpectedReply(boolean expected) { 262 allowUnexpectedReply = expected; 263 } 264 265 /** 266 * Forward a "Reply" from layout to registered listeners. 267 * 268 * @param r Reply to be forwarded intact 269 * @param dest One (optional) listener to be skipped, usually because it's 270 * the originating object. 271 */ 272 @SuppressWarnings("unchecked") 273 protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) { 274 // make a copy of the listener vector to synchronized (not needed for transmit?) 275 Vector<AbstractMRListener> v; 276 synchronized (this) { 277 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 
278 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 279 } 280 // forward to all listeners 281 int cnt = v.size(); 282 for (int i = 0; i < cnt; i++) { 283 AbstractMRListener client = v.elementAt(i); 284 log.debug("notify client: {}", client); 285 try { 286 //skip dest for now, we'll send the message to there last. 287 if (dest != client) { 288 forwardReply(client, r); 289 } 290 } catch (RuntimeException e) { 291 log.warn("notify: During reply dispatch to {}", client, e); 292 } 293 } 294 295 // forward to the last listener who sent a message 296 // this is done _second_ so monitoring can have already stored the reply 297 // before a response is sent 298 if (dest != null) { 299 forwardReply(dest, r); 300 } 301 } 302 303 protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m); 304 305 /** 306 * Messages to be transmitted. 307 */ 308 protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>(); 309 protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>(); 310 311 /** 312 * Forward message to the port. Messages are queued and then the 313 * transmission thread is notified. 314 * @see #forwardToPort(AbstractMRMessage, AbstractMRListener) 315 * 316 * @param m the message to send 317 * @param reply the Listener sending the message, often provided as 'this' 318 */ 319 protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) { 320 msgQueue.addLast(m); 321 listenerQueue.addLast(reply); 322 synchronized (xmtRunnable) { 323 if (mCurrentState == IDLESTATE) { 324 mCurrentState = NOTIFIEDSTATE; 325 xmtRunnable.notify(); 326 } 327 } 328 if (m != null) { 329 log.debug("just notified transmit thread with message {}", m); 330 } 331 } 332 333 /** 334 * Permanent loop for the transmit thread. 
335 */ 336 protected void transmitLoop() { 337 log.debug("transmitLoop starts in {}", this); 338 339 // loop forever 340 while (!connectionError && !threadStopRequest) { 341 AbstractMRMessage m = null; 342 AbstractMRListener l = null; 343 // check for something to do 344 synchronized (this) { 345 if (!msgQueue.isEmpty()) { 346 // yes, something to do 347 m = msgQueue.getFirst(); 348 msgQueue.removeFirst(); 349 l = listenerQueue.getFirst(); 350 listenerQueue.removeFirst(); 351 mCurrentState = WAITMSGREPLYSTATE; 352 log.debug("transmit loop has something to do: {}", m); 353 } // release lock here to proceed in parallel 354 } 355 // if a message has been extracted, process it 356 if (m != null) { 357 // check for need to change mode 358 log.debug("Start msg, state = {}", mCurrentMode); 359 if (m.getNeededMode() != mCurrentMode) { 360 AbstractMRMessage modeMsg; 361 if (m.getNeededMode() == PROGRAMINGMODE) { 362 // change state to programming mode and send message 363 modeMsg = enterProgMode(); 364 if (modeMsg != null) { 365 mCurrentState = WAITREPLYINPROGMODESTATE; 366 log.debug("Enter Programming Mode"); 367 forwardToPort(modeMsg, null); 368 // wait for reply 369 transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted"); 370 } 371 } else { 372 // change state to normal and send message 373 modeMsg = enterNormalMode(); 374 if (modeMsg != null) { 375 mCurrentState = WAITREPLYINNORMMODESTATE; 376 log.debug("Enter Normal Mode"); 377 forwardToPort(modeMsg, null); 378 // wait for reply 379 transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted"); 380 } 381 } 382 if (modeMsg != null) { 383 checkReplyInDispatch(); 384 if (mCurrentState != OKSENDMSGSTATE) { 385 handleTimeout(modeMsg, l); 386 } 387 mCurrentState = WAITMSGREPLYSTATE; 388 } else { 389 // no mode message required, but the message 390 // needs a different mode 391 log.debug("Setting mode to: {}", m.getNeededMode()); 392 mCurrentMode = 
m.getNeededMode(); 393 } 394 } 395 forwardToPort(m, l); 396 // reply expected? 397 if (m.replyExpected()) { 398 log.debug("reply expected is true for message {}",m); 399 // wait for a reply, or eventually timeout 400 transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted"); 401 checkReplyInDispatch(); 402 if (mCurrentState == WAITMSGREPLYSTATE) { 403 handleTimeout(m, l); 404 } else if (mCurrentState == AUTORETRYSTATE) { 405 log.info("Message added back to queue: {}", m); 406 msgQueue.addFirst(m); 407 listenerQueue.addFirst(l); 408 synchronized (xmtRunnable) { 409 mCurrentState = IDLESTATE; 410 } 411 } else { 412 resetTimeout(m); 413 } 414 } // just continue to the next message from here 415 } else { 416 // nothing to do 417 if (mCurrentState != IDLESTATE) { 418 log.debug("Setting IDLESTATE"); 419 log.debug("Current Mode {}", mCurrentMode); 420 mCurrentState = IDLESTATE; 421 } 422 // wait for something to send 423 if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) { 424 try { 425 long startTime = Calendar.getInstance().getTimeInMillis(); 426 synchronized (xmtRunnable) { 427 xmtRunnable.wait(mWaitBeforePoll); 428 } 429 long endTime = Calendar.getInstance().getTimeInMillis(); 430 waitTimePoll = waitTimePoll + endTime - startTime; 431 } catch (InterruptedException e) { 432 Thread.currentThread().interrupt(); // retain if needed later 433 // end of transmit loop 434 break; 435 } 436 } 437 // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee 438 // the change of mCurrentState to one of the waiting for reply states. Therefore we need to synchronize. 
439 synchronized (this) { 440 if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) { 441 log.error("left timeout in unexpected state: {}", mCurrentState); 442 } 443 if (mCurrentState == IDLESTATE) { 444 mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE 445 } 446 } 447 // went around with nothing to do; leave programming state if in it 448 if (mCurrentMode == PROGRAMINGMODE) { 449 log.debug("Timeout - in service mode"); 450 } 451 if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) { 452 log.debug("timeout causes leaving programming mode"); 453 mCurrentState = WAITREPLYINNORMMODESTATE; 454 AbstractMRMessage msg = enterNormalMode(); 455 // if the enterNormalMode() message is null, we 456 // don't want to try to send it to the port. 457 if (msg != null) { 458 forwardToPort(msg, null); 459 // wait for reply 460 transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode"); 461 checkReplyInDispatch(); 462 // exit program mode timeout? 
463 if (mCurrentState == WAITREPLYINNORMMODESTATE) { 464 // entering normal mode via timeout 465 handleTimeout(msg, l); 466 mCurrentMode = NORMALMODE; 467 } 468 // and go around again 469 } 470 } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) { 471 // We may need to poll 472 AbstractMRMessage msg = pollMessage(); 473 if (msg != null) { 474 // yes, send that 475 log.debug("Sending poll, wait time {}", Long.toString(waitTimePoll)); 476 mCurrentState = WAITMSGREPLYSTATE; 477 forwardToPort(msg, pollReplyHandler()); 478 // wait for reply 479 log.debug("Still waiting for reply"); 480 transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply"); 481 checkReplyInDispatch(); 482 // and go around again 483 if (mCurrentState == WAITMSGREPLYSTATE) { 484 handleTimeout(msg, l); 485 } else { 486 resetTimeout(msg); 487 } 488 } 489 waitTimePoll = 0; 490 } 491 // no messages, so back to idle 492 if (mCurrentState == POLLSTATE) { 493 mCurrentState = IDLESTATE; 494 } 495 } 496 } 497 } // end of transmit loop; go around again 498 499 protected void transmitWait(int waitTime, int state, String interruptMessage) { 500 // wait() can have spurious wakeup! 501 // so we protect by making sure the entire timeout time is used 502 long currentTime = Calendar.getInstance().getTimeInMillis(); 503 long endTime = currentTime + waitTime; 504 while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) { 505 long wait = endTime - currentTime; 506 try { 507 synchronized (xmtRunnable) { 508 // Do not wait if the current state has changed since we 509 // last set it. 510 if (mCurrentState != state) { 511 return; 512 } 513 xmtRunnable.wait(wait); // rcvr normally ends this w state change 514 } 515 } catch (InterruptedException e) { 516 Thread.currentThread().interrupt(); // retain if needed later 517 String[] packages = this.getClass().getName().split("\\."); 518 String name = (packages.length>=2 ? packages[packages.length-2]+"." 
:"") 519 +(packages.length>=1 ? packages[packages.length-1] :""); 520 if (!threadStopRequest) { 521 log.error("{} in transmitWait(..) of {}", interruptMessage, name); 522 } else { 523 log.debug("during shutdown, {} in transmitWait(..) of {}", interruptMessage, name); 524 } 525 } 526 } 527 log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState); 528 } 529 530 // Dispatch control and timer 531 protected boolean replyInDispatch = false; // true when reply has been received but dispatch not completed 532 private int maxDispatchTime = 0; 533 private int warningMessageTime = DISPATCH_WARNING_TIME; 534 private static final int DISPATCH_WAIT_INTERVAL = 100; 535 private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded 536 private static final int WARN_NEXT_TIME = 1000; // report every second 537 538 private void checkReplyInDispatch() { 539 int loopCount = 0; 540 while (replyInDispatch) { 541 try { 542 synchronized (xmtRunnable) { 543 xmtRunnable.wait(DISPATCH_WAIT_INTERVAL); 544 } 545 } catch (InterruptedException e) { 546 Thread.currentThread().interrupt(); // retain if needed later 547 if (threadStopRequest) return; // don't log an error if closing. 548 String[] packages = this.getClass().getName().split("\\."); 549 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 550 +(packages.length>=1 ? packages[packages.length-1] :""); 551 log.error("transmitLoop interrupted in class {}", name); 552 } 553 loopCount++; 554 int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL; 555 if (currentDispatchTime > maxDispatchTime) { 556 maxDispatchTime = currentDispatchTime; 557 if (currentDispatchTime >= warningMessageTime) { 558 warningMessageTime = warningMessageTime + WARN_NEXT_TIME; 559 log.debug("Max dispatch time is now {}", currentDispatchTime); 560 } 561 } 562 } 563 } 564 565 /** 566 * Determine if the interface is down. 
567 * 568 * @return timeoutFlag 569 */ 570 public boolean hasTimeouts() { 571 return timeoutFlag; 572 } 573 574 private boolean timeoutFlag = false; 575 private int timeouts = 0; 576 protected boolean flushReceiveChars = false; 577 578 protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) { 579 //log.debug("Timeout mCurrentState: {}", mCurrentState); 580 String[] packages = this.getClass().getName().split("\\."); 581 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 582 +(packages.length>=1 ? packages[packages.length-1] :""); 583 584 log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name); 585 timeouts++; 586 timeoutFlag = true; 587 flushReceiveChars = true; 588 } 589 590 protected void resetTimeout(AbstractMRMessage msg) { 591 if (timeouts > 0) { 592 log.debug("Reset timeout after {} timeouts", timeouts); 593 } 594 timeouts = 0; 595 timeoutFlag = false; 596 } 597 598 /** 599 * Add header to the outgoing byte stream. 600 * 601 * @param msg the output byte stream 602 * @param m Message results 603 * @return next location in the stream to fill 604 */ 605 protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) { 606 return 0; 607 } 608 609 protected int mWaitBeforePoll = 100; 610 protected long waitTimePoll = 0; 611 612 /** 613 * Add trailer to the outgoing byte stream. 614 * 615 * @param msg the output byte stream 616 * @param offset the first byte not yet used 617 * @param m output message to extend 618 */ 619 protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) { 620 if (!m.isBinary()) { 621 msg[offset] = 0x0d; 622 } 623 } 624 625 /** 626 * Determine how many bytes the entire message will take, including 627 * space for header and trailer. 
628 * 629 * @param m the message to be sent 630 * @return number of bytes 631 */ 632 protected int lengthOfByteStream(AbstractMRMessage m) { 633 int len = m.getNumDataElements(); 634 int cr = 0; 635 if (!m.isBinary()) { 636 cr = 1; // space for return char 637 } 638 return len + cr; 639 } 640 641 protected boolean xmtException = false; 642 643 /** 644 * Actually transmit the next message to the port. 645 * @see #sendMessage(AbstractMRMessage, AbstractMRListener) 646 * 647 * @param m the message to send 648 * @param reply the Listener sending the message, often provided as 'this' 649 */ 650 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"}, 651 justification = "Two locks needed for synchronization here, this is OK") 652 protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 653 log.debug("forwardToPort message: [{}]", m); 654 // remember who sent this 655 mLastSender = reply; 656 657 // forward the message to the registered recipients, 658 // which includes the communications monitor, except the sender. 
659 // Schedule notification via the Swing event queue to ensure order 660 Runnable r = new XmtNotifier(m, mLastSender, this); 661 SwingUtilities.invokeLater(r); 662 663 // stream to port in single write, as that's needed by serial 664 int byteLength = lengthOfByteStream(m); 665 byte[] msg= new byte[byteLength]; 666 log.debug("copying message, length = {}", byteLength); 667 // add header 668 int offset = addHeaderToOutput(msg, m); 669 670 // add data content 671 int len = m.getNumDataElements(); 672 log.debug("copying data to message, length = {}", len); 673 if (len > byteLength) { // happens somehow 674 log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len); 675 } 676 for (int i = 0; (i < len && i < byteLength); i++) { 677 msg[i + offset] = (byte) m.getElement(i); 678 } 679 // add trailer 680 addTrailerToOutput(msg, len + offset, m); 681 // and stream the bytes 682 try { 683 if (ostream != null) { 684 if (log.isDebugEnabled()) { 685 StringBuilder f = new StringBuilder("formatted message: "); 686 for (int i = 0; i < msg.length; i++) { 687 f.append(String.format("%02X ",0xFF & msg[i])); 688 } 689 log.debug(f.toString()); 690 } 691 while (m.getRetries() >= 0) { 692 if (portReadyToSend(controller)) { 693 ostream.write(msg); 694 ostream.flush(); 695 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 696 break; 697 } else if (m.getRetries() >= 0) { 698 log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries()); 699 m.setRetries(m.getRetries() - 1); 700 try { 701 synchronized (xmtRunnable) { 702 xmtRunnable.wait(m.getTimeout()); 703 } 704 } catch (InterruptedException e) { 705 Thread.currentThread().interrupt(); // retain if needed later 706 log.error("retry wait interrupted"); 707 } 708 } else { 709 log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg)); 710 } 711 } 712 } else { // ostream is null 713 // no stream connected 714 connectionWarn(); 715 } 716 } catch (IOException | 
RuntimeException e) { 717 // TODO Currently there's no port recovery if an exception occurs 718 // must restart JMRI to clear xmtException. 719 xmtException = true; 720 portWarn(e); 721 } 722 } 723 724 protected void connectionWarn() { 725 log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception()); 726 } 727 728 protected void portWarn(Exception e) { 729 log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e); 730 } 731 732 protected boolean connectionError = false; 733 734 protected void portWarnTCP(Exception e) { 735 log.warn("Exception java net: ", e); 736 connectionError = true; 737 } 738 // methods to connect/disconnect to a source of data in an AbstractPortController 739 740 public AbstractPortController controller = null; 741 742 public boolean status() { 743 return (ostream != null && istream != null); 744 } 745 746 protected volatile Thread xmtThread = null; 747 protected volatile Thread rcvThread = null; 748 749 protected volatile Runnable xmtRunnable = null; 750 751 /** 752 * Make connection to an existing PortController object. 
753 * 754 * @param p the PortController 755 */ 756 public void connectPort(AbstractPortController p) { 757 rcvException = false; 758 connectionError = false; 759 xmtException = false; 760 threadStopRequest = false; 761 try { 762 istream = p.getInputStream(); 763 ostream = p.getOutputStream(); 764 if (controller != null) { 765 log.warn("connectPort: connect called while connected"); 766 } else { 767 log.debug("connectPort invoked"); 768 } 769 controller = p; 770 // and start threads 771 xmtThread = jmri.util.ThreadingUtil.newThread( 772 xmtRunnable = new Runnable() { 773 @Override 774 public void run() { 775 try { 776 transmitLoop(); 777 } catch(ThreadDeath td) { 778 if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td); 779 // ThreadDeath must be thrown per Java API Javadocs 780 throw td; 781 } catch (Throwable e) { 782 if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e); 783 } 784 } 785 }); 786 787 String[] packages = this.getClass().getName().split("\\."); 788 xmtThread.setName( 789 (packages.length>=2 ? packages[packages.length-2]+"." :"") 790 +(packages.length>=1 ? packages[packages.length-1] :"") 791 +" Transmit thread"); 792 793 xmtThread.setDaemon(true); 794 xmtThread.setPriority(Thread.MAX_PRIORITY-1); //bump up the priority 795 xmtThread.start(); 796 797 rcvThread = jmri.util.ThreadingUtil.newThread( 798 new Runnable() { 799 @Override 800 public void run() { 801 receiveLoop(); 802 } 803 }); 804 rcvThread.setName( 805 (packages.length>=2 ? packages[packages.length-2]+"." :"") 806 +(packages.length>=1 ? packages[packages.length-1] :"") 807 +" Receive thread"); 808 809 rcvThread.setPriority(Thread.MAX_PRIORITY); //bump up the priority 810 rcvThread.setDaemon(true); 811 rcvThread.start(); 812 813 } catch (RuntimeException e) { 814 log.error("Failed to start up communications. 
Error was: ", e); 815 log.debug("Full trace:", e); 816 } 817 } 818 819 /** 820 * Get the port name for this connection from the TrafficController. 821 * 822 * @return the name of the port 823 */ 824 public String getPortName() { 825 return controller.getCurrentPortName(); 826 } 827 828 /** 829 * Break connection to existing PortController object. Once broken, attempts 830 * to send via "message" member will fail. 831 * 832 * @param p the PortController 833 */ 834 public void disconnectPort(AbstractPortController p) { 835 istream = null; 836 ostream = null; 837 if (controller != p) { 838 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 839 } 840 controller = null; 841 threadStopRequest=true; 842 } 843 844 /** 845 * Check if PortController object can be sent to. 846 * 847 * @param p the PortController 848 * @return true if ready, false otherwise May throw an Exception. 849 */ 850 public boolean portReadyToSend(AbstractPortController p) { 851 if (p != null && !xmtException && !rcvException) { 852 return true; 853 } else { 854 return false; 855 } 856 } 857 858 // data members to hold the streams 859 protected DataInputStream istream = null; 860 protected OutputStream ostream = null; 861 862 protected boolean rcvException = false; 863 864 protected int maxRcvExceptionCount = 100; 865 866 /** 867 * Handle incoming characters. This is a permanent loop, looking for input 868 * messages in character form on the stream connected to the PortController 869 * via {@link #connectPort(AbstractPortController)}. 870 * <p> 871 * Each turn of the loop is the receipt of a single message. 
872 */ 873 public void receiveLoop() { 874 log.debug("receiveLoop starts in {}", this); 875 int errorCount = 0; 876 while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception 877 try { 878 handleOneIncomingReply(); 879 errorCount = 0; 880 } catch (java.io.InterruptedIOException e) { 881 // related to InterruptedException, catch first 882 break; 883 } catch (IOException e) { 884 rcvException = true; 885 reportReceiveLoopException(e); 886 break; 887 } catch (RuntimeException e1) { 888 log.error("Exception in receive loop: {}", e1.toString(), e1); 889 errorCount++; 890 if (errorCount == maxRcvExceptionCount) { 891 rcvException = true; 892 reportReceiveLoopException(e1); 893 } 894 } 895 } 896 if (!threadStopRequest) { // if e.g. unexpected end 897 ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN); 898 log.error("Exit from rcv loop in {}", this.getClass()); 899 recovery(); // see if you can restart 900 } 901 } 902 903 /** 904 * Disconnect and reset the current PortController. 905 * Invoked at abnormal ending of receiveLoop. 906 */ 907 protected final void recovery() { 908 AbstractPortController adapter = controller; 909 disconnectPort(controller); 910 adapter.recover(); 911 } 912 913 /** 914 * Report an error on the receive loop. Separated so tests can suppress, even 915 * though message is asynchronous. 
916 * @param e Exception encountered at lower level to trigger error, or null 917 */ 918 protected void reportReceiveLoopException(Exception e) { 919 log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e); 920 jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN); 921 if (controller instanceof AbstractNetworkPortController) { 922 portWarnTCP(e); 923 } 924 } 925 926 protected abstract AbstractMRReply newReply(); 927 928 protected abstract boolean endOfMessage(AbstractMRReply r); 929 930 /** 931 * Dummy routine, to be filled by protocols that have to skip some 932 * start-of-message characters. 933 * @param istream input source 934 * @throws IOException from underlying operations 935 */ 936 protected void waitForStartOfReply(DataInputStream istream) throws IOException { 937 } 938 939 /** 940 * Read a single byte, protecting against various timeouts, etc. 941 * <p> 942 * When a port is set to have a receive timeout (via the 943 * {@link purejavacomm.SerialPort#enableReceiveTimeout(int)} method), some will return 944 * zero bytes or an EOFException at the end of the timeout. In that case, the read 945 * should be repeated to get the next real character. 
946 * 947 * @param istream stream to read 948 * @return the byte read 949 * @throws java.io.IOException if unable to read 950 */ 951 protected byte readByteProtected(DataInputStream istream) throws IOException { 952 if (istream == null) { 953 throw new IOException("Input Stream NULL when reading"); 954 } 955 while (true) { // loop will repeat until character found 956 int nchars; 957 nchars = istream.read(rcvBuffer, 0, 1); 958 if (nchars == -1) { 959 // No more bytes can be read from the channel 960 throw new IOException("Connection not terminated normally"); 961 } 962 if (nchars > 0) { 963 return rcvBuffer[0]; 964 } 965 } 966 } 967 968 // Defined this way to reduce new object creation 969 private byte[] rcvBuffer = new byte[1]; 970 971 /** 972 * Get characters from the input source, and file a message. 973 * <p> 974 * Returns only when the message is complete. 975 * <p> 976 * Only used in the Receive thread. 977 * <p> 978 * Handles timeouts on read by ignoring zero-length reads. 979 * 980 * @param msg message to fill 981 * @param istream character source. 982 * @throws IOException when presented by the input source. 
983 */ 984 protected void loadChars(AbstractMRReply msg, DataInputStream istream) 985 throws IOException { 986 int i; 987 for (i = 0; i < msg.maxSize(); i++) { 988 byte char1 = readByteProtected(istream); 989 log.trace("char: {} i: {}",(char1&0xFF),i); 990 // if there was a timeout, flush any char received and start over 991 if (flushReceiveChars) { 992 log.warn("timeout flushes receive buffer: {}", msg); 993 msg.flush(); 994 i = 0; // restart 995 flushReceiveChars = false; 996 } 997 if (canReceive()) { 998 msg.setElement(i, char1); 999 if (endOfMessage(msg)) { 1000 break; 1001 } 1002 } else { 1003 i--; // flush char 1004 log.error("unsolicited character received: {}", Integer.toHexString(char1)); 1005 } 1006 } 1007 } 1008 1009 /** 1010 * Override in the system specific code if necessary 1011 * 1012 * @return true if it is okay to buffer receive characters into a reply 1013 * message. When false, discard char received 1014 */ 1015 protected boolean canReceive() { 1016 return true; 1017 } 1018 1019 private int retransmitCount = 0; 1020 1021 /** 1022 * Executes a reply distribution action on the appropriate thread for JMRI. 1023 * @param r a runnable typically encapsulating a MRReply and the iteration code needed to 1024 * send it to all the listeners. 1025 */ 1026 protected void distributeReply(Runnable r) { 1027 try { 1028 if (synchronizeRx) { 1029 SwingUtilities.invokeAndWait(r); 1030 } else { 1031 SwingUtilities.invokeLater(r); 1032 } 1033 } catch (InterruptedException ie) { 1034 if(threadStopRequest) return; 1035 log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString()); 1036 } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) { 1037 log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString()); 1038 return; 1039 } 1040 log.debug("dispatch thread invoked"); 1041 } 1042 1043 /** 1044 * Handle each reply when complete. 1045 * <p> 1046 * (This is public for testing purposes) Runs in the "Receive" thread. 
*
     * @throws java.io.IOException on error.
     */
    public void handleOneIncomingReply() throws IOException {
        // we sit in this until the message is complete, relying on
        // threading to let other stuff happen

        // Create message off the right concrete class
        AbstractMRReply msg = newReply();

        // wait for start if needed
        waitForStartOfReply(istream);

        // message exists, now fill it
        loadChars(msg, istream);

        // a stop request raised while loading means the reply is dropped undelivered
        if (threadStopRequest) return;

        // message is complete, dispatch it !!
        replyInDispatch = true;
        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);

        // forward the message to the registered recipients,
        // which includes the communications monitor
        // return a notification via the Swing event queue to ensure proper thread
        Runnable r = new RcvNotifier(msg, mLastSender, this);
        distributeReply(r);

        // Only solicited replies move the transmit state machine; every branch
        // below clears replyInDispatch before it finishes.
        if (!msg.isUnsolicited()) {
            // effect on transmit:
            switch (mCurrentState) {
                case WAITMSGREPLYSTATE: {
                    // check to see if the response was an error message we want
                    // to automatically handle by re-queueing the last sent
                    // message, otherwise go on to the next message
                    if (msg.isRetransmittableErrorMsg()) {
                        log.error("Automatic Recovery from Error Message: {}. Retransmitted {} times.", msg, retransmitCount);
                        synchronized (xmtRunnable) {
                            mCurrentState = AUTORETRYSTATE;
                            // back off 100 ms per retry already made; the first
                            // retry (count 0) is not delayed at all
                            if (retransmitCount > 0) {
                                try {
                                    xmtRunnable.wait(retransmitCount * 100L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // retain if needed later
                                }
                            }
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount++;
                        }
                    } else {
                        // update state, and notify to continue
                        synchronized (xmtRunnable) {
                            mCurrentState = NOTIFIEDSTATE;
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            // any reply that is not a retransmittable error resets the retry back-off
                            retransmitCount = 0;
                        }
                    }
                    break;
                }
                case WAITREPLYINPROGMODESTATE: {
                    // entering programming mode
                    mCurrentMode = PROGRAMINGMODE;
                    replyInDispatch = false;

                    // check to see if we need to delay to allow decoders to become
                    // responsive
                    int warmUpDelay = enterProgModeDelayTime();
                    if (warmUpDelay != 0) {
                        try {
                            synchronized (xmtRunnable) {
                                xmtRunnable.wait(warmUpDelay);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // retain if needed later
                        }
                    }
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                case WAITREPLYINNORMMODESTATE: {
                    // entering normal mode
                    mCurrentMode = NORMALMODE;
                    replyInDispatch = false;
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                default: {
                    // a reply arrived in a state that was not waiting for one
                    replyInDispatch = false;
                    if (allowUnexpectedReply) {
                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
                        synchronized (xmtRunnable) {
                            // The transmit thread sometimes gets stuck
                            // when unexpected replies are received.  Notify
                            // it to clear the block without a timeout.
                            // (do not change the current state)
                            //if(mCurrentState!=IDLESTATE)
                            xmtRunnable.notify();
                        }
                    } else {
                        unexpectedReplyStateError(mCurrentState, msg.toString());
                    }
                }
            }
            // Unsolicited message
        } else {
            // state and mode are left untouched; only the dispatch flag is cleared
            log.debug("Unsolicited Message Received {}", msg);

            replyInDispatch = false;
        }
    }

    /**
     * Log an error message for a message received in an unexpected state.
     * <p>
     * The class is identified in the log by the last two dot-separated parts of
     * its fully qualified name (the last package segment and the class name).
     *
     * @param State message state.
     * @param msgString message string.
     */
    protected void unexpectedReplyStateError(int State, String msgString) {
        String[] packages = this.getClass().getName().split("\\.");
        // keep only "<last package segment>.<class name>" so the log line stays short
        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
                +(packages.length>=1 ? packages[packages.length-1] :"");
        log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
    }

    /**
     * for testing purposes, let us be able to find out
     * what the last sender was.
     * @return last sender, mLastSender.
1184 */ 1185 public AbstractMRListener getLastSender() { 1186 return mLastSender; 1187 } 1188 1189 // Override the finalize method for this class 1190 // to request termination, which might have happened 1191 // before in any case 1192 /** 1193 * finalize deprecated in Java 9, but not yet removed 1194 * @deprecated since Java 9 1195 */ 1196 @Override 1197 @Deprecated 1198 protected final void finalize() throws Throwable { 1199 terminate(); 1200 super.finalize(); 1201 } 1202 1203 protected void terminate() { 1204 log.debug("Cleanup Starts"); 1205 if (ostream == null) { 1206 return; // no connection established 1207 } 1208 AbstractMRMessage modeMsg = enterNormalMode(); 1209 if (modeMsg != null) { 1210 modeMsg.setRetries(100); // set the number of retries 1211 // high, just in case the interface 1212 // is busy when we try to send 1213 forwardToPort(modeMsg, null); 1214 // wait for reply 1215 try { 1216 if (xmtRunnable != null) { 1217 synchronized (xmtRunnable) { 1218 xmtRunnable.wait(modeMsg.getTimeout()); 1219 } 1220 } 1221 } catch (InterruptedException e) { 1222 Thread.currentThread().interrupt(); // retain if needed later 1223 log.error("transmit interrupted"); 1224 } 1225 } 1226 } 1227 1228 /** 1229 * Internal class to remember the Reply object and destination listener with 1230 * a reply is received. 
1231 */ 1232 protected static class RcvNotifier implements Runnable { 1233 1234 AbstractMRReply mMsg; 1235 AbstractMRListener mDest; 1236 AbstractMRTrafficController mTc; 1237 1238 public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1239 AbstractMRTrafficController pTc) { 1240 mMsg = pMsg; 1241 mDest = pDest; 1242 mTc = pTc; 1243 } 1244 1245 @Override 1246 public void run() { 1247 log.debug("Delayed rcv notify starts"); 1248 mTc.notifyReply(mMsg, mDest); 1249 } 1250 } // end RcvNotifier 1251 1252 // allow creation of object outside package 1253 protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1254 AbstractMRTrafficController pTc) { 1255 return new RcvNotifier(pMsg, pDest, pTc); 1256 } 1257 1258 /** 1259 * Internal class to remember the Message object and destination listener 1260 * when a message is queued for notification. 1261 */ 1262 protected static class XmtNotifier implements Runnable { 1263 1264 AbstractMRMessage mMsg; 1265 AbstractMRListener mDest; 1266 AbstractMRTrafficController mTc; 1267 1268 public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest, 1269 AbstractMRTrafficController pTc) { 1270 mMsg = pMsg; 1271 mDest = pDest; 1272 mTc = pTc; 1273 } 1274 1275 @Override 1276 public void run() { 1277 log.debug("Delayed xmt notify starts"); 1278 mTc.notifyMessage(mMsg, mDest); 1279 } 1280 } // end XmtNotifier 1281 1282 /** 1283 * Terminate the receive and transmit threads. 1284 * <p> 1285 * This is intended to be used only by testing subclasses. 1286 */ 1287 public void terminateThreads() { 1288 threadStopRequest = true; 1289 if (xmtThread != null) { 1290 xmtThread.interrupt(); 1291 try { 1292 xmtThread.join(); 1293 } catch (InterruptedException ie){ 1294 // interrupted during cleanup. 1295 } 1296 } 1297 1298 if (rcvThread != null) { 1299 rcvThread.interrupt(); 1300 try { 1301 rcvThread.join(); 1302 } catch (InterruptedException ie){ 1303 // interrupted during cleanup. 
1304 } 1305 } 1306 // we also need to remove the shutdown task. 1307 InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask); 1308 } 1309 1310 /** 1311 * Flag that threads should terminate as soon as they can. 1312 */ 1313 protected volatile boolean threadStopRequest = false; 1314 1315 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class); 1316 1317}