001package jmri.jmrix.roco.z21; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.net.DatagramPacket; 005import java.util.ArrayList; 006import java.util.Arrays; 007import java.util.List; 008 009import jmri.jmrix.*; 010import org.slf4j.Logger; 011import org.slf4j.LoggerFactory; 012 013/** 014 * Abstract base for TrafficControllers in a Message/Reply protocol. 015 * 016 * @author Paul Bender Copyright (C) 2014 017 */ 018public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface { 019 020 private java.net.InetAddress host; 021 private int port; 022 023 public Z21TrafficController() { 024 super(); 025 allowUnexpectedReply = true; 026 } 027 028 /** 029 * Implement this to forward a specific message type to a protocol-specific 030 * listener interface. This puts the casting into the concrete class. 031 */ 032 @Override 033 protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) { 034 ((Z21Listener) client).message((Z21Message) m); 035 } 036 037 /** 038 * Implement this to forward a specific Reply type to a protocol-specific 039 * listener interface. This puts the casting into the concrete class. 040 */ 041 @Override 042 protected void forwardReply(AbstractMRListener client, AbstractMRReply m) { 043 ((Z21Listener) client).reply((Z21Reply) m); 044 } 045 046 /** 047 * Invoked if it's appropriate to do low-priority polling of the command 048 * station, this should return the next message to send, or null if the TC 049 * should just sleep. 050 */ 051 @Override 052 protected Z21Message pollMessage() { 053 return null; 054 } 055 056 @Override 057 protected Z21Listener pollReplyHandler() { 058 return null; 059 } 060 061 /** 062 * enterProgMode() and enterNormalMode() return any message that 063 * needs to be returned to the command station to change modes. 064 * 065 * @see #enterNormalMode() 066 * @return if no message is needed, you may return null. 067 * 068 * If the programmerIdle() function returns true, enterNormalMode() is 069 * called after a timeout while in IDLESTATE during programming to 070 * return the system to normal mode. 071 */ 072 @Override 073 protected Z21Message enterProgMode() { 074 return null; 075 } 076 077 /** 078 * enterProgMode() and enterNormalMode() return any message that 079 * needs to be returned to the command station to change modes. 080 * 081 * @see #enterProgMode() 082 * @return if no message is needed, you may return null. 083 */ 084 @Override 085 protected Z21Message enterNormalMode() { 086 return null; 087 } 088 089 /** 090 * Actually transmits the next message to the port. 091 */ 092 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"}, 093 justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.") 094 @Override 095 synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 096 if (log.isDebugEnabled()) { 097 log.debug("forwardToPort message: [{}]", m); 098 } 099 // remember who sent this 100 mLastSender = reply; 101 102 // forward the message to the registered recipients, 103 // which includes the communications monitor, except the sender. 104 // Schedule notification via the Swing event queue to ensure order 105 Runnable r = new XmtNotifier(m, mLastSender, this); 106 javax.swing.SwingUtilities.invokeLater(r); 107 108 // stream to port in single write, as that's needed by serial 109 byte[] msg = new byte[lengthOfByteStream(m)]; 110 // add header 111 int offset = addHeaderToOutput(msg, m); 112 113 // add data content 114 int len = m.getNumDataElements(); 115 for (int i = 0; i < len; i++) { 116 msg[i + offset] = (byte) m.getElement(i); 117 } 118 // add trailer 119 addTrailerToOutput(msg, len + offset, m); 120 // and send the bytes 121 try { 122 if (log.isDebugEnabled()) { 123 StringBuilder f = new StringBuilder("formatted message: "); 124 for (byte b : msg) { 125 f.append(Integer.toHexString(0xFF & b)); 126 f.append(" "); 127 } 128 log.debug(new String(f)); 129 } 130 while (m.getRetries() >= 0) { 131 if (portReadyToSend(controller)) { 132 // create a datagram with the data from the 133 // message. 134 byte[] data = ((Z21Message) m).getBuffer(); 135 DatagramPacket sendPacket 136 = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port); 137 // and send it. 138 ((Z21Adapter) controller).getSocket().send(sendPacket); 139 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 140 break; 141 } else if (m.getRetries() >= 0) { 142 if (log.isDebugEnabled()) { 143 StringBuilder b = new StringBuilder("Retry message: "); 144 b.append(m.toString()); 145 b.append(" attempts remaining: "); 146 b.append(m.getRetries()); 147 log.debug(new String(b)); 148 } 149 m.setRetries(m.getRetries() - 1); 150 try { 151 synchronized (xmtRunnable) { 152 xmtRunnable.wait(m.getTimeout()); 153 } 154 } catch (InterruptedException e) { 155 Thread.currentThread().interrupt(); // retain if needed later 156 if(!threadStopRequest) { 157 log.error("retry wait interrupted"); 158 } else { 159 log.error("retry wait interrupted during thread stop"); 160 } 161 } 162 } else { 163 log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg)); 164 } 165 } 166 } catch (Exception e) { 167 // TODO Currently there's no port recovery if an exception occurs 168 // must restart JMRI to clear xmtException. 169 xmtException = true; 170 portWarn(e); 171 } 172 } 173 174 @Override() 175 public boolean status() { 176 if (controller == null) { 177 return false; 178 } else { 179 return (controller.status()); 180 } 181 } 182 183 /** 184 * Make connection to existing PortController object. 185 */ 186 @Override 187 public void connectPort(AbstractPortController p) { 188 rcvException = false; 189 xmtException = false; 190 if (controller != null) { 191 log.warn("connectPort: connect called while connected"); 192 } else { 193 log.debug("connectPort invoked"); 194 } 195 if (!(p instanceof Z21Adapter)) { 196 throw new IllegalArgumentException("attempt to connect wrong port type"); 197 } 198 controller = p; 199 try { 200 host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName()); 201 port = ((Z21Adapter) controller).getPort(); 202 ConnectionStatus.instance().setConnectionState( 203 p.getSystemConnectionMemo().getUserName(), 204 ((Z21Adapter) p).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_UP); 205 } catch (java.net.UnknownHostException uhe) { 206 log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName()); 207 if (((Z21Adapter) p).getPort() != 0) { 208 ConnectionStatus.instance().setConnectionState( 209 p.getSystemConnectionMemo().getUserName(), 210 ((Z21Adapter) controller).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_DOWN); 211 } else { 212 ConnectionStatus.instance().setConnectionState( 213 p.getSystemConnectionMemo().getUserName(), 214 ((Z21Adapter) controller).getHostName(), ConnectionStatus.CONNECTION_DOWN); 215 } 216 } 217 // and start threads 218 xmtThread = new Thread(xmtRunnable = () -> { 219 try { 220 transmitLoop(); 221 } catch (Throwable e) { 222 if(!threadStopRequest) 223 log.error("Transmit thread terminated prematurely by: {}", e.toString(), e); 224 // ThreadDeath must be thrown per Java API JavaDocs 225 if (e instanceof ThreadDeath) { 226 throw e; 227 } 228 } 229 }); 230 xmtThread.setName("z21.Z21TrafficController Transmit thread"); 231 xmtThread.start(); 232 rcvThread = new Thread(this::receiveLoop); 233 rcvThread.setName("z21.Z21TrafficController Receive thread"); 234 int xr = rcvThread.getPriority(); 235 xr++; 236 rcvThread.setPriority(xr); //bump up the priority 237 rcvThread.start(); 238 } 239 240 /** 241 * Break connection to existing PortController object. Once broken, attempts 242 * to send via "message" member will fail. 243 */ 244 @Override 245 public void disconnectPort(AbstractPortController p) { 246 if (controller != p) { 247 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 248 } 249 controller = null; 250 } 251 252 @Override 253 protected Z21Reply newReply() { 254 return new Z21Reply(); 255 } 256 257 @Override 258 protected boolean endOfMessage(AbstractMRReply r) { 259 // since this is a UDP protocol, and each reply in the packet is complete, 260 // we don't check for end of message manually. 261 return true; 262 } 263 264 /** 265 * Handle each reply when complete. 266 * <p> 267 * (This is public for testing purposes) Runs in the "Receive" thread. 268 */ 269 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"}, 270 justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data. Notify is used because Having more than one thread waiting on xmtRunnable is an error.") 271 @Override 272 public void handleOneIncomingReply() throws java.io.IOException { 273 // we sit in this until the message is complete, relying on 274 // threading to let other stuff happen 275 276 // create a buffer to hold the incoming data. 277 byte[] buffer = new byte[100]; // the size here just needs to be longer 278 // than the longest protocol message. 279 // Otherwise, the receive will truncate. 280 281 // create the packet. 282 DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port); 283 284 // and wait to receive data in the packet. 285 try { 286 ((Z21Adapter) controller).getSocket().receive(receivePacket); 287 } catch (java.net.SocketException | NullPointerException se) { 288 // if we are waiting when the controller is disposed, 289 // a socket exception will be thrown. 290 log.debug("Socket exception during receive. Connection Closed?"); 291 rcvException = true; 292 return; 293 } 294 if (threadStopRequest) return; 295 296 // handle more than one reply in the same UDP packet. 297 List<Z21Reply> replies = new ArrayList<>(); 298 299 int totalLength=receivePacket.getLength(); 300 int consumed=0; 301 302 do { 303 int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8); 304 Z21Reply msg = new Z21Reply(buffer, length); 305 306 replies.add(msg); 307 308 buffer = Arrays.copyOfRange(buffer,length,buffer.length); 309 consumed +=length; 310 log.trace("total length: {} consumed {}",totalLength,consumed); 311 } while(totalLength>consumed); 312 313 314 // and then dispatch each reply 315 replies.forEach(this::dispatchReply); 316 } 317 318 private void dispatchReply(Z21Reply msg) { 319 // message is complete, dispatch it !! 320 replyInDispatch = true; 321 if (log.isDebugEnabled()) { 322 log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState); 323 } 324 325 // forward the message to the registered recipients, 326 // which includes the communications monitor 327 // return a notification via the Swing event queue to ensure proper thread 328 Runnable r = new RcvNotifier(msg, mLastSender, this); 329 try { 330 javax.swing.SwingUtilities.invokeAndWait(r); 331 } catch (InterruptedException ie) { 332 if(threadStopRequest) return; 333 log.error("Unexpected exception in invokeAndWait:{}", ie, ie); 334 } catch (Exception e) { 335 log.error("Unexpected exception in invokeAndWait:{}", e, e); 336 } 337 if (log.isDebugEnabled()) { 338 log.debug("dispatch thread invoked"); 339 } 340 341 if (!msg.isUnsolicited()) { 342 // effect on transmit: 343 switch (mCurrentState) { 344 case WAITMSGREPLYSTATE: { 345 // check to see if the response was an error message we want 346 // to automatically handle by re-queueing the last sent 347 // message, otherwise go on to the next message 348 if (msg.isRetransmittableErrorMsg()) { 349 if (log.isDebugEnabled()) { 350 log.debug("Automatic Recovery from Error Message: +msg.toString()"); 351 } 352 synchronized (xmtRunnable) { 353 mCurrentState = AUTORETRYSTATE; 354 replyInDispatch = false; 355 xmtRunnable.notify(); 356 } 357 } else { 358 // update state, and notify to continue 359 synchronized (xmtRunnable) { 360 mCurrentState = NOTIFIEDSTATE; 361 replyInDispatch = false; 362 xmtRunnable.notify(); 363 } 364 } 365 break; 366 } 367 case WAITREPLYINPROGMODESTATE: { 368 // entering programming mode 369 mCurrentMode = PROGRAMINGMODE; 370 replyInDispatch = false; 371 372 // check to see if we need to delay to allow decoders to become 373 // responsive 374 int warmUpDelay = enterProgModeDelayTime(); 375 if (warmUpDelay != 0) { 376 try { 377 synchronized (xmtRunnable) { 378 xmtRunnable.wait(warmUpDelay); 379 } 380 } catch (InterruptedException e) { 381 Thread.currentThread().interrupt(); // retain if needed later 382 if (threadStopRequest) return; 383 } 384 } 385 // update state, and notify to continue 386 synchronized (xmtRunnable) { 387 mCurrentState = OKSENDMSGSTATE; 388 xmtRunnable.notify(); 389 } 390 break; 391 } 392 case WAITREPLYINNORMMODESTATE: { 393 // entering normal mode 394 mCurrentMode = NORMALMODE; 395 replyInDispatch = false; 396 // update state, and notify to continue 397 synchronized (xmtRunnable) { 398 mCurrentState = OKSENDMSGSTATE; 399 xmtRunnable.notify(); 400 } 401 break; 402 } 403 default: { 404 replyInDispatch = false; 405 if (allowUnexpectedReply) { 406 if (log.isDebugEnabled()) { 407 log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString()); 408 } 409 synchronized (xmtRunnable) { 410 // The transmit thread sometimes gets stuck 411 // when unexpected replies are received. Notify 412 // it to clear the block without a timeout. 413 // (do not change the current state) 414 //if(mCurrentState!=IDLESTATE) 415 xmtRunnable.notify(); 416 } 417 } else { 418 unexpectedReplyStateError(mCurrentState,msg.toString()); 419 } 420 } 421 } 422 // Unsolicited message 423 } else { 424 if (log.isDebugEnabled()) { 425 log.debug("Unsolicited Message Received {}", msg.toString()); 426 } 427 428 replyInDispatch = false; 429 } 430 } 431 432 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"}, 433 justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.") 434 @Override 435 protected void terminate() { 436 if (controller == null) { 437 log.debug("terminate called while not connected"); 438 return; 439 } else { 440 log.debug("Cleanup Starts"); 441 } 442 443 Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage(); 444 forwardToPort(logoffMessage, null); 445 // wait for reply 446 try { 447 if (xmtRunnable != null) { 448 synchronized (xmtRunnable) { 449 xmtRunnable.wait(logoffMessage.getTimeout()); 450 } 451 } 452 } catch (InterruptedException e) { 453 Thread.currentThread().interrupt(); // retain if needed later 454 log.error("transmit interrupted"); 455 } finally { 456 // set the controller to null, even if terminate fails. 457 controller = null; 458 } 459 } 460 461 /** 462 * Terminate the receive and transmit threads. 463 * <p> 464 * This is intended to be used only by testing subclasses. 465 */ 466 @Override 467 public void terminateThreads() { 468 threadStopRequest = true; 469 // ensure socket closed to end pending operations 470 if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close(); 471 472 // usual stop process 473 super.terminateThreads(); 474 } 475 476 // The methods to implement the Z21Interface 477 @Override 478 public synchronized void addz21Listener(Z21Listener l) { 479 this.addListener(l); 480 } 481 482 @Override 483 public synchronized void removez21Listener(Z21Listener l) { 484 this.removeListener(l); 485 } 486 487 /** 488 * Forward a preformatted message to the actual interface. 489 */ 490 @Override 491 public void sendz21Message(Z21Message m, Z21Listener reply) { 492 sendMessage(m, reply); 493 } 494 495 private final static Logger log = LoggerFactory.getLogger(Z21TrafficController.class); 496}