001package jmri.jmrix; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.DataInputStream; 005import java.io.IOException; 006import java.io.OutputStream; 007import java.util.*; 008import javax.swing.SwingUtilities; 009 010import jmri.InstanceManager; 011import jmri.ShutDownManager; 012import jmri.ShutDownTask; 013 014/** 015 * Abstract base for TrafficControllers in a Message/Reply protocol. 016 * <p> 017 * Two threads are used for the actual communication. The "Transmit" thread 018 * handles pushing characters to the port, and also changing the mode. The 019 * "Receive" thread converts characters from the input stream into replies. 020 * <p> 021 * The constructor registers a shutdown task to 022 * trigger the necessary cleanup code 023 * <p> 024 * The internal state machine handles changes of mode, automatic retry of 025 * certain messages, time outs, and sending poll messages when otherwise idle. 026 * <p> 027 * "Mode" refers to the state of the command station communications. "Normal" 028 * and "Programming" are the two modes, used if the command station requires 029 * messages to go back and forth between them. <br> 030 * 031 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram"> 032 * 033 * <p> 034 * The key methods for the basic operation are: 035 * <ul> 036 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)} 037 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data 038 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs 039 * information on e.g. the last message sent, that can be stored in member variables 040 * by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}. 041 * <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects 042 * </ul> 043 * <p> 044 * If your command station requires messages to go in and out of 045 * "programming mode", those should be provided by 046 * {@link #enterProgMode()} and {@link #enterNormalMode()}. 047 * <p> 048 * If you want to poll for information when the line is otherwise idle, 049 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}. 050 * 051 * @author Bob Jacobsen Copyright (C) 2003 052 * @author Paul Bender Copyright (C) 2004-2010 053 */ 054 055/* 056@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png 057 058 [*] --> IDLESTATE 059 IDLESTATE --> NOTIFIEDSTATE : sendMessage() 060 NOTIFIEDSTATE --> IDLESTATE : queue empty 061 062 NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message 063 064 WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE 065 WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE 066 067 WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply() 068 069 WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE 070 WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE 071 OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message 072 073 IDLESTATE --> POLLSTATE : transmitLoop()\nno work 074 POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it 075 POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send 076 077 WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply 078 AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message 079 080NOTIFIEDSTATE : Transmit thread wakes up and processes 081POLLSTATE : Transient while deciding to send poll 082OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change 083AUTORETRYSTATE : Transient while deciding to resend auto-retry message 084WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply 085WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply 086WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout 087 088Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry 089Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change. 090 091@enduml 092 */ 093 094public abstract class AbstractMRTrafficController { 095 096 private final Runnable shutDownTask = this::terminate; // retain for possible removal. 097 098 /** 099 * Create a new unnamed MRTrafficController. 100 */ 101 public AbstractMRTrafficController() { 102 log.debug("Creating AbstractMRTrafficController instance"); 103 mCurrentMode = NORMALMODE; 104 mCurrentState = IDLESTATE; 105 allowUnexpectedReply = false; 106 107 108 // We use a shutdown task here to make sure the connection is left 109 // in a clean state prior to exiting. This is required on systems 110 // which have a service mode to ensure we don't leave the system 111 // in an unusable state. Once the shutdown task executes, the connection 112 // must be considered permanently closed. 113 114 InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask); 115 } 116 117 private boolean synchronizeRx = true; 118 119 protected void setSynchronizeRx(boolean val) { 120 synchronizeRx = val; 121 } 122 123 protected boolean getSynchronizeRx() { 124 return synchronizeRx; 125 } 126 127 // The methods to implement the abstract Interface 128 129 protected final Vector<AbstractMRListener> cmdListeners = new Vector<>(); 130 131 protected synchronized void addListener(AbstractMRListener l) { 132 // add only if not already registered 133 if (l == null) { 134 throw new NullPointerException(); 135 } 136 if (!cmdListeners.contains(l)) { 137 cmdListeners.addElement(l); 138 } 139 } 140 141 protected synchronized void removeListener(AbstractMRListener l) { 142 if (cmdListeners.contains(l)) { 143 cmdListeners.removeElement(l); 144 } 145 } 146 147 /** 148 * Forward a Message to registered listeners. 149 * 150 * @param m Message to be forwarded intact 151 * @param notMe One (optional) listener to be skipped, usually because it's 152 * the originating object. 153 */ 154 @SuppressWarnings("unchecked") 155 protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) { 156 // make a copy of the listener vector to synchronized not needed for transmit 157 Vector<AbstractMRListener> v; 158 synchronized (this) { 159 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 160 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 161 } 162 // forward to all listeners 163 int cnt = v.size(); 164 for (int i = 0; i < cnt; i++) { 165 AbstractMRListener client = v.elementAt(i); 166 if (notMe != client) { 167 log.debug("notify message, client: {}", client); 168 try { 169 forwardMessage(client, m); 170 } catch (RuntimeException e) { 171 log.warn("notify: During message dispatch to {}", client, e); 172 } 173 } 174 } 175 } 176 177 /** 178 * Implement this to forward a specific message type to a protocol-specific 179 * listener interface. 180 * This puts the casting into the concrete class. 181 * @param client abstract listener. 182 * @param m message to forward. 183 */ 184 protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m); 185 186 /** 187 * Invoked if it's appropriate to do low-priority polling of the command 188 * station, this should return the next message to send, or null if the 189 * TrafficController should just sleep. 190 * @return Formatted poll message 191 */ 192 protected abstract AbstractMRMessage pollMessage(); 193 194 protected abstract AbstractMRListener pollReplyHandler(); 195 196 protected AbstractMRListener mLastSender = null; 197 198 protected volatile int mCurrentMode; 199 public static final int NORMALMODE = 1; 200 public static final int PROGRAMINGMODE = 4; 201 202 /** 203 * Set the system to programming mode. 204 * @see #enterNormalMode() 205 * 206 * @return any message that needs to be returned to the Command Station 207 * to change modes. If no message is needed, returns null. 208 */ 209 protected abstract AbstractMRMessage enterProgMode(); 210 211 /** 212 * Sets the system to normal mode during programming while in IDLESTATE. 213 * If {@link #programmerIdle()} returns true, enterNormalMode() is 214 * called after a timeout. 215 * @see #enterProgMode() 216 * 217 * @return any message that needs to be returned to the Command Station 218 * to change modes. If no message is needed, returns null. 219 */ 220 protected abstract AbstractMRMessage enterNormalMode(); 221 222 /** 223 * Check if the programmer is idle. 224 * Override in the system specific code if necessary (see notes for 225 * {@link #enterNormalMode()}. 226 * 227 * @return true if not busy programming 228 */ 229 protected boolean programmerIdle() { 230 return true; 231 } 232 233 /** 234 * Get the delay (wait time) after enabling the programming track. 235 * Override in subclass to add a longer delay. 236 * 237 * @return 0 as default delay 238 */ 239 protected int enterProgModeDelayTime() { 240 return 0; 241 } 242 243 protected volatile int mCurrentState; 244 public static final int IDLESTATE = 10; // nothing happened 245 public static final int NOTIFIEDSTATE = 15; // xmt notified, will next wake 246 public static final int WAITMSGREPLYSTATE = 25; // xmt has sent, await reply to message 247 public static final int WAITREPLYINPROGMODESTATE = 30; // xmt has done mode change, await reply 248 public static final int WAITREPLYINNORMMODESTATE = 35; // xmt has done mode change, await reply 249 public static final int OKSENDMSGSTATE = 40; // mode change reply here, send original msg 250 public static final int AUTORETRYSTATE = 45; // received message where automatic recovery may occur with a retransmission, re-send original msg 251 public static final int POLLSTATE = 50; // Send program mode or poll message 252 253 protected boolean allowUnexpectedReply; 254 255 /** 256 * Set whether the command station may send messages without a request 257 * sent to it. 258 * 259 * @param expected true to allow messages without a prior request 260 */ 261 protected void setAllowUnexpectedReply(boolean expected) { 262 allowUnexpectedReply = expected; 263 } 264 265 /** 266 * Forward a "Reply" from layout to registered listeners. 267 * 268 * @param r Reply to be forwarded intact 269 * @param dest One (optional) listener to be skipped, usually because it's 270 * the originating object. 271 */ 272 @SuppressWarnings("unchecked") 273 protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) { 274 // make a copy of the listener vector to synchronized (not needed for transmit?) 275 Vector<AbstractMRListener> v; 276 synchronized (this) { 277 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 278 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 279 } 280 // forward to all listeners 281 int cnt = v.size(); 282 for (int i = 0; i < cnt; i++) { 283 AbstractMRListener client = v.elementAt(i); 284 log.debug("notify reply, client: {}", client); 285 try { 286 //skip dest for now, we'll send the message to there last. 287 if (dest != client) { 288 forwardReply(client, r); 289 } 290 } catch (RuntimeException e) { 291 log.warn("notify: During reply dispatch to {}", client, e); 292 } 293 } 294 295 // forward to the last listener who sent a message 296 // this is done _second_ so monitoring can have already stored the reply 297 // before a response is sent 298 if (dest != null) { 299 log.debug("notify reply, dest: {}", dest); 300 forwardReply(dest, r); 301 } 302 } 303 304 protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m); 305 306 /** 307 * Messages to be transmitted. 308 */ 309 protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>(); 310 protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>(); 311 312 /** 313 * Forward message to the port. Messages are queued and then the 314 * transmission thread is notified. 315 * @see #forwardToPort(AbstractMRMessage, AbstractMRListener) 316 * 317 * @param m the message to send 318 * @param reply the Listener sending the message, often provided as 'this' 319 */ 320 protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) { 321 msgQueue.addLast(m); 322 listenerQueue.addLast(reply); 323 synchronized (xmtRunnable) { 324 if (mCurrentState == IDLESTATE) { 325 mCurrentState = NOTIFIEDSTATE; 326 xmtRunnable.notify(); 327 } 328 } 329 if (m != null) { 330 log.debug("just notified transmit thread with message {}", m); 331 } 332 } 333 334 /** 335 * Permanent loop for the transmit thread. 336 */ 337 protected void transmitLoop() { 338 log.debug("transmitLoop starts in {}", this); 339 340 // loop forever 341 while (!connectionError && !threadStopRequest) { 342 AbstractMRMessage m = null; 343 AbstractMRListener l = null; 344 // check for something to do 345 synchronized (this) { 346 if (!msgQueue.isEmpty()) { 347 // yes, something to do 348 m = msgQueue.getFirst(); 349 msgQueue.removeFirst(); 350 l = listenerQueue.getFirst(); 351 listenerQueue.removeFirst(); 352 mCurrentState = WAITMSGREPLYSTATE; 353 log.debug("transmit loop has something to do: {}", m); 354 } // release lock here to proceed in parallel 355 } 356 // if a message has been extracted, process it 357 if (m != null) { 358 // check for need to change mode 359 log.debug("Start msg, state = {}", mCurrentMode); 360 if (m.getNeededMode() != mCurrentMode) { 361 AbstractMRMessage modeMsg; 362 if (m.getNeededMode() == PROGRAMINGMODE) { 363 // change state to programming mode and send message 364 modeMsg = enterProgMode(); 365 if (modeMsg != null) { 366 mCurrentState = WAITREPLYINPROGMODESTATE; 367 log.debug("Enter Programming Mode"); 368 forwardToPort(modeMsg, null); 369 // wait for reply 370 transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted"); 371 } 372 } else { 373 // change state to normal and send message 374 modeMsg = enterNormalMode(); 375 if (modeMsg != null) { 376 mCurrentState = WAITREPLYINNORMMODESTATE; 377 log.debug("Enter Normal Mode"); 378 forwardToPort(modeMsg, null); 379 // wait for reply 380 transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted"); 381 } 382 } 383 if (modeMsg != null) { 384 checkReplyInDispatch(); 385 if (mCurrentState != OKSENDMSGSTATE) { 386 handleTimeout(modeMsg, l); 387 } 388 mCurrentState = WAITMSGREPLYSTATE; 389 } else { 390 // no mode message required, but the message 391 // needs a different mode 392 log.debug("Setting mode to: {}", m.getNeededMode()); 393 mCurrentMode = m.getNeededMode(); 394 } 395 } 396 forwardToPort(m, l); 397 // reply expected? 398 if (m.replyExpected()) { 399 log.debug("reply expected is true for message {}",m); 400 // wait for a reply, or eventually timeout 401 transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted"); 402 checkReplyInDispatch(); 403 if (mCurrentState == WAITMSGREPLYSTATE) { 404 handleTimeout(m, l); 405 } else if (mCurrentState == AUTORETRYSTATE) { 406 log.info("Message added back to queue: {}", m); 407 msgQueue.addFirst(m); 408 listenerQueue.addFirst(l); 409 synchronized (xmtRunnable) { 410 mCurrentState = IDLESTATE; 411 } 412 } else { 413 resetTimeout(m); 414 } 415 } // just continue to the next message from here 416 } else { 417 // nothing to do 418 if (mCurrentState != IDLESTATE) { 419 log.debug("Setting IDLESTATE"); 420 log.debug("Current Mode {}", mCurrentMode); 421 mCurrentState = IDLESTATE; 422 } 423 // wait for something to send 424 if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) { 425 try { 426 long startTime = Calendar.getInstance().getTimeInMillis(); 427 synchronized (xmtRunnable) { 428 xmtRunnable.wait(mWaitBeforePoll); 429 } 430 long endTime = Calendar.getInstance().getTimeInMillis(); 431 waitTimePoll = waitTimePoll + endTime - startTime; 432 } catch (InterruptedException e) { 433 Thread.currentThread().interrupt(); // retain if needed later 434 // end of transmit loop 435 break; 436 } 437 } 438 // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee 439 // the change of mCurrentState to one of the waiting for reply states. Therefore we need to synchronize. 440 synchronized (this) { 441 if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) { 442 log.error("left timeout in unexpected state: {}", mCurrentState); 443 } 444 if (mCurrentState == IDLESTATE) { 445 mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE 446 } 447 } 448 // went around with nothing to do; leave programming state if in it 449 if (mCurrentMode == PROGRAMINGMODE) { 450 log.debug("Timeout - in service mode"); 451 } 452 if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) { 453 log.debug("timeout causes leaving programming mode"); 454 mCurrentState = WAITREPLYINNORMMODESTATE; 455 AbstractMRMessage msg = enterNormalMode(); 456 // if the enterNormalMode() message is null, we 457 // don't want to try to send it to the port. 458 if (msg != null) { 459 forwardToPort(msg, null); 460 // wait for reply 461 transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode"); 462 checkReplyInDispatch(); 463 // exit program mode timeout? 464 if (mCurrentState == WAITREPLYINNORMMODESTATE) { 465 // entering normal mode via timeout 466 handleTimeout(msg, l); 467 mCurrentMode = NORMALMODE; 468 } 469 // and go around again 470 } 471 } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) { 472 // We may need to poll 473 AbstractMRMessage msg = pollMessage(); 474 if (msg != null) { 475 // yes, send that 476 log.debug("Sending poll, wait time {}", waitTimePoll); 477 mCurrentState = WAITMSGREPLYSTATE; 478 forwardToPort(msg, pollReplyHandler()); 479 // wait for reply 480 log.debug("Still waiting for reply"); 481 transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply"); 482 checkReplyInDispatch(); 483 // and go around again 484 if (mCurrentState == WAITMSGREPLYSTATE) { 485 handleTimeout(msg, l); 486 } else { 487 resetTimeout(msg); 488 } 489 } 490 waitTimePoll = 0; 491 } 492 // no messages, so back to idle 493 if (mCurrentState == POLLSTATE) { 494 mCurrentState = IDLESTATE; 495 } 496 } 497 } 498 } // end of transmit loop; go around again 499 500 protected void transmitWait(int waitTime, int state, String interruptMessage) { 501 // wait() can have spurious wakeup! 502 // so we protect by making sure the entire timeout time is used 503 long currentTime = Calendar.getInstance().getTimeInMillis(); 504 long endTime = currentTime + waitTime; 505 while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) { 506 long wait = endTime - currentTime; 507 try { 508 synchronized (xmtRunnable) { 509 // Do not wait if the current state has changed since we 510 // last set it. 511 if (mCurrentState != state) { 512 return; 513 } 514 xmtRunnable.wait(wait); // rcvr normally ends this w state change 515 } 516 } catch (InterruptedException e) { 517 Thread.currentThread().interrupt(); // retain if needed later 518 String[] packages = this.getClass().getName().split("\\."); 519 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 520 +(packages.length>=1 ? packages[packages.length-1] :""); 521 if (!threadStopRequest) { 522 log.error("{} in transmitWait(..) of {}", interruptMessage, name); 523 } else { 524 log.debug("during shutdown, {} in transmitWait(..) of {}", interruptMessage, name); 525 } 526 } 527 } 528 log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState); 529 } 530 531 // Dispatch control and timer 532 protected boolean replyInDispatch = false; // true when reply has been received but dispatch not completed 533 private int maxDispatchTime = 0; 534 private int warningMessageTime = DISPATCH_WARNING_TIME; 535 private static final int DISPATCH_WAIT_INTERVAL = 100; 536 private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded 537 private static final int WARN_NEXT_TIME = 1000; // report every second 538 539 private void checkReplyInDispatch() { 540 int loopCount = 0; 541 while (replyInDispatch) { 542 try { 543 synchronized (xmtRunnable) { 544 xmtRunnable.wait(DISPATCH_WAIT_INTERVAL); 545 } 546 } catch (InterruptedException e) { 547 Thread.currentThread().interrupt(); // retain if needed later 548 if (threadStopRequest) return; // don't log an error if closing. 549 String[] packages = this.getClass().getName().split("\\."); 550 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 551 +(packages.length>=1 ? packages[packages.length-1] :""); 552 log.error("transmitLoop interrupted in class {}", name); 553 } 554 loopCount++; 555 int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL; 556 if (currentDispatchTime > maxDispatchTime) { 557 maxDispatchTime = currentDispatchTime; 558 if (currentDispatchTime >= warningMessageTime) { 559 warningMessageTime = warningMessageTime + WARN_NEXT_TIME; 560 log.debug("Max dispatch time is now {}", currentDispatchTime); 561 } 562 } 563 } 564 } 565 566 /** 567 * Determine if the interface is down. 568 * 569 * @return timeoutFlag 570 */ 571 public boolean hasTimeouts() { 572 return timeoutFlag; 573 } 574 575 private boolean timeoutFlag = false; 576 private int timeouts = 0; 577 protected boolean flushReceiveChars = false; 578 579 protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) { 580 //log.debug("Timeout mCurrentState: {}", mCurrentState); 581 String[] packages = this.getClass().getName().split("\\."); 582 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 583 +(packages.length>=1 ? packages[packages.length-1] :""); 584 585 log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name); 586 timeouts++; 587 timeoutFlag = true; 588 flushReceiveChars = true; 589 } 590 591 protected void resetTimeout(AbstractMRMessage msg) { 592 if (timeouts > 0) { 593 log.debug("Reset timeout after {} timeouts", timeouts); 594 } 595 timeouts = 0; 596 timeoutFlag = false; 597 } 598 599 /** 600 * Add header to the outgoing byte stream. 601 * 602 * @param msg the output byte stream 603 * @param m Message results 604 * @return next location in the stream to fill 605 */ 606 protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) { 607 return 0; 608 } 609 610 protected int mWaitBeforePoll = 100; 611 protected long waitTimePoll = 0; 612 613 /** 614 * Add trailer to the outgoing byte stream. 615 * 616 * @param msg the output byte stream 617 * @param offset the first byte not yet used 618 * @param m output message to extend 619 */ 620 protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) { 621 if (!m.isBinary()) { 622 msg[offset] = 0x0d; 623 } 624 } 625 626 /** 627 * Determine how many bytes the entire message will take, including 628 * space for header and trailer. 629 * 630 * @param m the message to be sent 631 * @return number of bytes 632 */ 633 protected int lengthOfByteStream(AbstractMRMessage m) { 634 int len = m.getNumDataElements(); 635 int cr = 0; 636 if (!m.isBinary()) { 637 cr = 1; // space for return char 638 } 639 return len + cr; 640 } 641 642 protected boolean xmtException = false; 643 644 /** 645 * Actually transmit the next message to the port. 646 * @see #sendMessage(AbstractMRMessage, AbstractMRListener) 647 * 648 * @param m the message to send 649 * @param reply the Listener sending the message, often provided as 'this' 650 */ 651 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"}, 652 justification = "Two locks needed for synchronization here, this is OK") 653 protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 654 log.debug("forwardToPort message: [{}]", m); 655 // remember who sent this 656 mLastSender = reply; 657 658 // forward the message to the registered recipients, 659 // which includes the communications monitor, except the sender. 660 // Schedule notification via the Swing event queue to ensure order 661 log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback")); 662 Runnable r = new XmtNotifier(m, mLastSender, this); 663 SwingUtilities.invokeLater(r); 664 665 // stream to port in single write, as that's needed by serial 666 int byteLength = lengthOfByteStream(m); 667 byte[] msg= new byte[byteLength]; 668 log.debug("copying message, length = {}", byteLength); 669 // add header 670 int offset = addHeaderToOutput(msg, m); 671 672 // add data content 673 int len = m.getNumDataElements(); 674 log.debug("copying data to message, length = {}", len); 675 if (len > byteLength) { // happens somehow 676 log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len); 677 } 678 for (int i = 0; (i < len && i < byteLength); i++) { 679 msg[i + offset] = (byte) m.getElement(i); 680 } 681 // add trailer 682 addTrailerToOutput(msg, len + offset, m); 683 // and stream the bytes 684 try { 685 if (ostream != null) { 686 if (log.isDebugEnabled()) { 687 StringBuilder f = new StringBuilder(); 688 for (int i = 0; i < msg.length; i++) { 689 f.append(String.format("%02X ",0xFF & msg[i])); 690 } 691 log.debug("formatted message: {}", f.toString() ); 692 } 693 while (m.getRetries() >= 0) { 694 if (portReadyToSend(controller)) { 695 ostream.write(msg); 696 ostream.flush(); 697 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 698 break; 699 } else if (m.getRetries() >= 0) { 700 log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries()); 701 m.setRetries(m.getRetries() - 1); 702 try { 703 synchronized (xmtRunnable) { 704 xmtRunnable.wait(m.getTimeout()); 705 } 706 } catch (InterruptedException e) { 707 Thread.currentThread().interrupt(); // retain if needed later 708 log.error("retry wait interrupted"); 709 } 710 } else { 711 log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg)); 712 } 713 } 714 } else { // ostream is null 715 // no stream connected 716 connectionWarn(); 717 } 718 } catch (IOException | RuntimeException e) { 719 // TODO Currently there's no port recovery if an exception occurs 720 // must restart JMRI to clear xmtException. 721 xmtException = true; 722 portWarn(e); 723 } 724 } 725 726 protected void connectionWarn() { 727 log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception()); 728 } 729 730 protected void portWarn(Exception e) { 731 log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e); 732 } 733 734 protected boolean connectionError = false; 735 736 protected void portWarnTCP(Exception e) { 737 log.warn("Exception java net: ", e); 738 connectionError = true; 739 } 740 // methods to connect/disconnect to a source of data in an AbstractPortController 741 742 public AbstractPortController controller = null; 743 744 public boolean status() { 745 return (ostream != null && istream != null); 746 } 747 748 protected volatile Thread xmtThread = null; 749 protected volatile Thread rcvThread = null; 750 751 protected volatile Runnable xmtRunnable = null; 752 753 /** 754 * Make connection to an existing PortController object. 755 * 756 * @param p the PortController 757 */ 758 public void connectPort(AbstractPortController p) { 759 rcvException = false; 760 connectionError = false; 761 xmtException = false; 762 threadStopRequest = false; 763 try { 764 istream = p.getInputStream(); 765 ostream = p.getOutputStream(); 766 if (controller != null) { 767 log.warn("connectPort: connect called while connected"); 768 } else { 769 log.debug("connectPort invoked"); 770 } 771 controller = p; 772 // and start threads 773 xmtThread = jmri.util.ThreadingUtil.newThread( 774 xmtRunnable = new Runnable() { 775 @Override 776 public void run() { 777 try { 778 transmitLoop(); 779 } catch (ThreadDeath td) { 780 if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td); 781 // ThreadDeath must be thrown per Java API Javadocs 782 throw td; 783 } catch (Throwable e) { 784 if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e); 785 } 786 } 787 }); 788 789 String[] packages = this.getClass().getName().split("\\."); 790 xmtThread.setName( 791 (packages.length>=2 ? packages[packages.length-2]+"." :"") 792 +(packages.length>=1 ? packages[packages.length-1] :"") 793 +" Transmit thread"); 794 795 xmtThread.setDaemon(true); 796 xmtThread.setPriority(Thread.MAX_PRIORITY-1); //bump up the priority 797 xmtThread.start(); 798 799 rcvThread = jmri.util.ThreadingUtil.newThread( 800 new Runnable() { 801 @Override 802 public void run() { 803 receiveLoop(); 804 } 805 }); 806 rcvThread.setName( 807 (packages.length>=2 ? packages[packages.length-2]+"." :"") 808 +(packages.length>=1 ? packages[packages.length-1] :"") 809 +" Receive thread"); 810 811 rcvThread.setPriority(Thread.MAX_PRIORITY); //bump up the priority 812 rcvThread.setDaemon(true); 813 rcvThread.start(); 814 815 } catch (RuntimeException e) { 816 log.error("Failed to start up communications. Error was: ", e); 817 log.debug("Full trace:", e); 818 } 819 } 820 821 /** 822 * Get the port name for this connection from the TrafficController. 823 * 824 * @return the name of the port 825 */ 826 public String getPortName() { 827 return controller.getCurrentPortName(); 828 } 829 830 /** 831 * Break connection to existing PortController object. Once broken, attempts 832 * to send via "message" member will fail. 833 * 834 * @param p the PortController 835 */ 836 public void disconnectPort(AbstractPortController p) { 837 istream = null; 838 ostream = null; 839 if (controller != p) { 840 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 841 } 842 controller = null; 843 threadStopRequest = true; 844 } 845 846 /** 847 * Check if PortController object can be sent to. 848 * 849 * @param p the PortController 850 * @return true if ready, false otherwise May throw an Exception. 851 */ 852 public boolean portReadyToSend(AbstractPortController p) { 853 if (p != null && !xmtException && !rcvException) { 854 return true; 855 } else { 856 return false; 857 } 858 } 859 860 // data members to hold the streams 861 protected DataInputStream istream = null; 862 protected OutputStream ostream = null; 863 864 protected boolean rcvException = false; 865 866 protected int maxRcvExceptionCount = 100; 867 868 /** 869 * Handle incoming characters. This is a permanent loop, looking for input 870 * messages in character form on the stream connected to the PortController 871 * via {@link #connectPort(AbstractPortController)}. 872 * <p> 873 * Each turn of the loop is the receipt of a single message. 874 */ 875 public void receiveLoop() { 876 log.debug("receiveLoop starts in {}", this); 877 int errorCount = 0; 878 while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception 879 try { 880 handleOneIncomingReply(); 881 errorCount = 0; 882 } catch (java.io.InterruptedIOException e) { 883 // related to InterruptedException, catch first 884 break; 885 } catch (IOException e) { 886 rcvException = true; 887 reportReceiveLoopException(e); 888 break; 889 } catch (RuntimeException e1) { 890 log.error("Exception in receive loop: {}", e1.toString(), e1); 891 errorCount++; 892 if (errorCount == maxRcvExceptionCount) { 893 rcvException = true; 894 reportReceiveLoopException(e1); 895 } 896 } 897 } 898 if (!threadStopRequest) { // if e.g. unexpected end 899 ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN); 900 log.error("Exit from rcv loop in {}", this.getClass()); 901 recovery(); // see if you can restart 902 } 903 } 904 905 /** 906 * Disconnect and reset the current PortController. 907 * Invoked at abnormal ending of receiveLoop. 908 */ 909 protected final void recovery() { 910 AbstractPortController adapter = controller; 911 disconnectPort(controller); 912 adapter.recover(); 913 } 914 915 /** 916 * Report an error on the receive loop. Separated so tests can suppress, even 917 * though message is asynchronous. 918 * @param e Exception encountered at lower level to trigger error, or null 919 */ 920 protected void reportReceiveLoopException(Exception e) { 921 log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e); 922 jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN); 923 if (controller instanceof AbstractNetworkPortController) { 924 portWarnTCP(e); 925 } 926 } 927 928 protected abstract AbstractMRReply newReply(); 929 930 protected abstract boolean endOfMessage(AbstractMRReply r); 931 932 /** 933 * Dummy routine, to be filled by protocols that have to skip some 934 * start-of-message characters. 935 * @param istream input source 936 * @throws IOException from underlying operations 937 */ 938 protected void waitForStartOfReply(DataInputStream istream) throws IOException { 939 } 940 941 /** 942 * Read a single byte, protecting against various timeouts, etc. 943 * <p> 944 * When a port is set to have a receive timeout (via the 945 * {@link purejavacomm.SerialPort#enableReceiveTimeout(int)} method), some will return 946 * zero bytes or an EOFException at the end of the timeout. In that case, the read 947 * should be repeated to get the next real character. 948 * 949 * @param istream stream to read 950 * @return the byte read 951 * @throws java.io.IOException if unable to read 952 */ 953 protected byte readByteProtected(DataInputStream istream) throws IOException { 954 if (istream == null) { 955 throw new IOException("Input Stream NULL when reading"); 956 } 957 while (true) { // loop will repeat until character found 958 int nchars; 959 nchars = istream.read(rcvBuffer, 0, 1); 960 if (nchars == -1) { 961 // No more bytes can be read from the channel 962 throw new IOException("Connection not terminated normally"); 963 } 964 if (nchars > 0) { 965 return rcvBuffer[0]; 966 } 967 } 968 } 969 970 // Defined this way to reduce new object creation 971 private byte[] rcvBuffer = new byte[1]; 972 973 /** 974 * Get characters from the input source, and file a message. 975 * <p> 976 * Returns only when the message is complete. 977 * <p> 978 * Only used in the Receive thread. 979 * <p> 980 * Handles timeouts on read by ignoring zero-length reads. 981 * 982 * @param msg message to fill 983 * @param istream character source. 984 * @throws IOException when presented by the input source. 985 */ 986 protected void loadChars(AbstractMRReply msg, DataInputStream istream) 987 throws IOException { 988 int i; 989 for (i = 0; i < msg.maxSize(); i++) { 990 byte char1 = readByteProtected(istream); 991 log.trace("char: {} i: {}",(char1&0xFF),i); 992 // if there was a timeout, flush any char received and start over 993 if (flushReceiveChars) { 994 log.warn("timeout flushes receive buffer: {}", msg); 995 msg.flush(); 996 i = 0; // restart 997 flushReceiveChars = false; 998 } 999 if (canReceive()) { 1000 msg.setElement(i, char1); 1001 if (endOfMessage(msg)) { 1002 break; 1003 } 1004 } else { 1005 i--; // flush char 1006 log.error("unsolicited character received: {}", Integer.toHexString(char1)); 1007 } 1008 } 1009 } 1010 1011 /** 1012 * Override in the system specific code if necessary 1013 * 1014 * @return true if it is okay to buffer receive characters into a reply 1015 * message. When false, discard char received 1016 */ 1017 protected boolean canReceive() { 1018 return true; 1019 } 1020 1021 private int retransmitCount = 0; 1022 1023 /** 1024 * Executes a reply distribution action on the appropriate thread for JMRI. 1025 * @param r a runnable typically encapsulating a MRReply and the iteration code needed to 1026 * send it to all the listeners. 1027 */ 1028 protected void distributeReply(Runnable r) { 1029 try { 1030 if (synchronizeRx) { 1031 SwingUtilities.invokeAndWait(r); 1032 } else { 1033 SwingUtilities.invokeLater(r); 1034 } 1035 } catch (InterruptedException ie) { 1036 if (threadStopRequest) return; 1037 log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString()); 1038 } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) { 1039 log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString()); 1040 return; 1041 } 1042 log.debug("dispatch thread invoked"); 1043 } 1044 1045 /** 1046 * Handle each reply when complete. 1047 * <p> 1048 * (This is public for testing purposes) Runs in the "Receive" thread. 1049 * 1050 * @throws java.io.IOException on error. 1051 */ 1052 public void handleOneIncomingReply() throws IOException { 1053 // we sit in this until the message is complete, relying on 1054 // threading to let other stuff happen 1055 1056 // Create message off the right concrete class 1057 AbstractMRReply msg = newReply(); 1058 1059 // wait for start if needed 1060 waitForStartOfReply(istream); 1061 1062 // message exists, now fill it 1063 loadChars(msg, istream); 1064 1065 if (threadStopRequest) return; 1066 1067 // message is complete, dispatch it !! 1068 replyInDispatch = true; 1069 log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState); 1070 1071 // forward the message to the registered recipients, 1072 // which includes the communications monitor 1073 // return a notification via the Swing event queue to ensure proper thread 1074 Runnable r = new RcvNotifier(msg, mLastSender, this); 1075 distributeReply(r); 1076 1077 if (!msg.isUnsolicited()) { 1078 // effect on transmit: 1079 switch (mCurrentState) { 1080 case WAITMSGREPLYSTATE: { 1081 // check to see if the response was an error message we want 1082 // to automatically handle by re-queueing the last sent 1083 // message, otherwise go on to the next message 1084 if (msg.isRetransmittableErrorMsg()) { 1085 log.error("Automatic Recovery from Error Message: {}. Retransmitted {} times.", msg, retransmitCount); 1086 synchronized (xmtRunnable) { 1087 mCurrentState = AUTORETRYSTATE; 1088 if (retransmitCount > 0) { 1089 try { 1090 xmtRunnable.wait(retransmitCount * 100L); 1091 } catch (InterruptedException e) { 1092 Thread.currentThread().interrupt(); // retain if needed later 1093 } 1094 } 1095 replyInDispatch = false; 1096 xmtRunnable.notify(); 1097 retransmitCount++; 1098 } 1099 } else { 1100 // update state, and notify to continue 1101 synchronized (xmtRunnable) { 1102 mCurrentState = NOTIFIEDSTATE; 1103 replyInDispatch = false; 1104 xmtRunnable.notify(); 1105 retransmitCount = 0; 1106 } 1107 } 1108 break; 1109 } 1110 case WAITREPLYINPROGMODESTATE: { 1111 // entering programming mode 1112 mCurrentMode = PROGRAMINGMODE; 1113 replyInDispatch = false; 1114 1115 // check to see if we need to delay to allow decoders to become 1116 // responsive 1117 int warmUpDelay = enterProgModeDelayTime(); 1118 if (warmUpDelay != 0) { 1119 try { 1120 synchronized (xmtRunnable) { 1121 xmtRunnable.wait(warmUpDelay); 1122 } 1123 } catch (InterruptedException e) { 1124 Thread.currentThread().interrupt(); // retain if needed later 1125 } 1126 } 1127 // update state, and notify to continue 1128 synchronized (xmtRunnable) { 1129 mCurrentState = OKSENDMSGSTATE; 1130 xmtRunnable.notify(); 1131 } 1132 break; 1133 } 1134 case WAITREPLYINNORMMODESTATE: { 1135 // entering normal mode 1136 mCurrentMode = NORMALMODE; 1137 replyInDispatch = false; 1138 // update state, and notify to continue 1139 synchronized (xmtRunnable) { 1140 mCurrentState = OKSENDMSGSTATE; 1141 xmtRunnable.notify(); 1142 } 1143 break; 1144 } 1145 default: { 1146 replyInDispatch = false; 1147 if (allowUnexpectedReply) { 1148 log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg); 1149 synchronized (xmtRunnable) { 1150 // The transmit thread sometimes gets stuck 1151 // when unexpected replies are received. Notify 1152 // it to clear the block without a timeout. 1153 // (do not change the current state) 1154 //if(mCurrentState!=IDLESTATE) 1155 xmtRunnable.notify(); 1156 } 1157 } else { 1158 unexpectedReplyStateError(mCurrentState, msg.toString()); 1159 } 1160 } 1161 } 1162 // Unsolicited message 1163 } else { 1164 log.debug("Unsolicited Message Received {}", msg); 1165 1166 replyInDispatch = false; 1167 } 1168 } 1169 1170 /** 1171 * Log an error message for a message received in an unexpected state. 1172 * @param State message state. 1173 * @param msgString message string. 1174 */ 1175 protected void unexpectedReplyStateError(int State, String msgString) { 1176 String[] packages = this.getClass().getName().split("\\."); 1177 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 1178 +(packages.length>=1 ? packages[packages.length-1] :""); 1179 log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name); 1180 } 1181 1182 /** 1183 * for testing purposes, let us be able to find out 1184 * what the last sender was. 1185 * @return last sender, mLastSender. 1186 */ 1187 public AbstractMRListener getLastSender() { 1188 return mLastSender; 1189 } 1190 1191 protected void terminate() { 1192 log.debug("Cleanup Starts"); 1193 if (ostream == null) { 1194 return; // no connection established 1195 } 1196 AbstractMRMessage modeMsg = enterNormalMode(); 1197 if (modeMsg != null) { 1198 modeMsg.setRetries(100); // set the number of retries 1199 // high, just in case the interface 1200 // is busy when we try to send 1201 forwardToPort(modeMsg, null); 1202 // wait for reply 1203 try { 1204 if (xmtRunnable != null) { 1205 synchronized (xmtRunnable) { 1206 xmtRunnable.wait(modeMsg.getTimeout()); 1207 } 1208 } 1209 } catch (InterruptedException e) { 1210 Thread.currentThread().interrupt(); // retain if needed later 1211 log.error("transmit interrupted"); 1212 } 1213 } 1214 } 1215 1216 /** 1217 * Internal class to remember the Reply object and destination listener with 1218 * a reply is received. 1219 */ 1220 protected static class RcvNotifier implements Runnable { 1221 1222 AbstractMRReply mMsg; 1223 AbstractMRListener mDest; 1224 AbstractMRTrafficController mTc; 1225 1226 public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1227 AbstractMRTrafficController pTc) { 1228 mMsg = pMsg; 1229 mDest = pDest; 1230 mTc = pTc; 1231 } 1232 1233 @Override 1234 public void run() { 1235 log.debug("Delayed rcv notify starts"); 1236 mTc.notifyReply(mMsg, mDest); 1237 } 1238 } // end RcvNotifier 1239 1240 // allow creation of object outside package 1241 protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1242 AbstractMRTrafficController pTc) { 1243 return new RcvNotifier(pMsg, pDest, pTc); 1244 } 1245 1246 /** 1247 * Internal class to remember the Message object and destination listener 1248 * when a message is queued for notification. 1249 */ 1250 protected static class XmtNotifier implements Runnable { 1251 1252 AbstractMRMessage mMsg; 1253 AbstractMRListener mDest; 1254 AbstractMRTrafficController mTc; 1255 1256 public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest, 1257 AbstractMRTrafficController pTc) { 1258 mMsg = pMsg; 1259 mDest = pDest; 1260 mTc = pTc; 1261 } 1262 1263 @Override 1264 public void run() { 1265 log.debug("Delayed xmt notify starts"); 1266 mTc.notifyMessage(mMsg, mDest); 1267 } 1268 } // end XmtNotifier 1269 1270 /** 1271 * Terminate the receive and transmit threads. 1272 * <p> 1273 * This is intended to be used only by testing subclasses. 1274 */ 1275 public void terminateThreads() { 1276 threadStopRequest = true; 1277 if (xmtThread != null) { 1278 xmtThread.interrupt(); 1279 try { 1280 xmtThread.join(); 1281 } catch (InterruptedException ie){ 1282 // interrupted during cleanup. 1283 } 1284 } 1285 1286 if (rcvThread != null) { 1287 rcvThread.interrupt(); 1288 try { 1289 rcvThread.join(); 1290 } catch (InterruptedException ie){ 1291 // interrupted during cleanup. 1292 } 1293 } 1294 // we also need to remove the shutdown task. 1295 InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask); 1296 } 1297 1298 /** 1299 * Flag that threads should terminate as soon as they can. 1300 */ 1301 protected volatile boolean threadStopRequest = false; 1302 1303 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class); 1304 1305}