001package jmri.jmrix.sprog; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.Vector; 006import java.util.concurrent.BlockingQueue; 007import java.util.concurrent.LinkedBlockingQueue; 008 009import org.slf4j.Logger; 010import org.slf4j.LoggerFactory; 011 012import jmri.jmrix.AbstractPortController; 013import jmri.jmrix.sprog.SprogConstants.SprogState; 014import jmri.jmrix.sprog.serialdriver.SerialDriverAdapter; 015 016/** 017 * Converts Stream-based I/O to/from Sprog messages. The "SprogInterface" side 018 * sends/receives message objects. The connection to a SprogPortController is 019 * via a pair of *Streams, which then carry sequences of characters for 020 * transmission. Note that this processing is handled in an independent thread. 021 * <p> 022 * Rewritten during 4.11.x series. Create a high priority thread for the tc to 023 * move everything off the swing thread. Use a blocking queue to handle 024 * asynchronous messages from multiple sources. 025 * 026 * @author Bob Jacobsen Copyright (C) 2001 027 * @author Andrew Crosland Copyright (C) 2018 028 */ 029public class SprogTrafficController implements SprogInterface, 030 Runnable { 031 032 private SprogReply reply = new SprogReply(); 033 SprogListener lastSender = null; 034 private SprogState sprogState = SprogState.NORMAL; 035 private int lastId; 036 037 private Thread tcThread; 038 private final Object lock = new Object(); 039 private boolean replyAvailable = false; 040 // Make this public so it can be overridden by a script for debug 041 public int timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT; 042 043 /** 044 * Create a new SprogTrafficController instance. 045 * 046 * @param adaptermemo the associated SystemConnectionMemo 047 */ 048 @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data") 049 public SprogTrafficController(SprogSystemConnectionMemo adaptermemo) { 050 memo = adaptermemo; 051 init(); 052 } 053 054 private void init() { 055 // Set the timeout for communication with hardware 056 resetTimeout(); 057 058 tcThread = jmri.util.ThreadingUtil.newThread(this); 059 tcThread.setName("SPROG TC thread"); 060 tcThread.setPriority(Thread.MAX_PRIORITY-1); 061 tcThread.setDaemon(true); 062 log.debug("starting TC thread from {} ", this, jmri.util.LoggingUtil.shortenStacktrace(new Exception("traceback"),6)); 063 tcThread.start(); 064 } 065 066 // Methods to implement the Sprog Interface 067 068 protected Vector<SprogListener> cmdListeners = new Vector<SprogListener>(); 069 070 @Override 071 public boolean status() { 072 return (ostream != null && istream != null); 073 } 074 075 protected boolean isTcThreadAlive() { 076 return tcThread.isAlive(); 077 } 078 079 @Override 080 public synchronized void addSprogListener(SprogListener l) { 081 // add only if not already registered 082 if (l == null) { 083 throw new java.lang.NullPointerException(); 084 } 085 if (!cmdListeners.contains(l)) { 086 cmdListeners.addElement(l); 087 log.debug("SprogListener added to {} tc", memo.getUserName()); 088 } 089 } 090 091 @Override 092 public synchronized void removeSprogListener(SprogListener l) { 093 if (cmdListeners.contains(l)) { 094 cmdListeners.removeElement(l); 095 } 096 } 097 098 /** 099 * Reset timeout to default depending on current mode 100 */ 101 public void resetTimeout() { 102 if (memo.getSprogMode() == SprogConstants.SprogMode.OPS) { 103 timeout = SprogConstants.TC_OPS_REPLY_TIMEOUT; 104 } else { 105 timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT; 106 } 107 } 108 109 public void setTimeout(int t) { 110 timeout = t; 111 } 112 113 public SprogState getSprogState() { 114 return sprogState; 115 } 116 117 public void setSprogState(SprogState s) { 118 this.sprogState = s; 119 if (s == SprogState.V4BOOTMODE) { 120 // enable flow control - required for sprog v4 bootloader 121 var controller = getController(); 122 controller.setHandshake(jmri.jmrix.AbstractSerialPortController.FlowControl.RTSCTS); 123 124 } else { 125 // disable flow control 126 // removed Jan 2010 - this stops SPROG from sending. Could cause problems with 127 // serial Sprogs, but I have no way of testing: 128 // getController().setHandshake(false); 129 } 130 log.debug("Setting sprogState {}", s); 131 } 132 133 public boolean isNormalMode() { 134 return sprogState == SprogState.NORMAL; 135 } 136 137 public boolean isSIIBootMode() { 138 return sprogState == SprogState.SIIBOOTMODE; 139 } 140 141 public boolean isV4BootMode() { 142 return sprogState == SprogState.V4BOOTMODE; 143 } 144 145 @SuppressWarnings("unchecked") 146 private synchronized Vector<SprogListener> getCopyOfListeners() { 147 return (Vector<SprogListener>) cmdListeners.clone(); 148 149 } 150 151 protected synchronized void notifyMessage(SprogMessage m, SprogListener originator) { 152 for (SprogListener listener : this.getCopyOfListeners()) { 153 try { 154 // don't send it back to the originator! 155 if (listener != originator) { 156 // skip forwarding to the last sender for now, we'll get them later 157 if (lastSender != listener) { 158 listener.notifyMessage(m); 159 } 160 } 161 } catch (Exception e) { 162 log.warn("notify: During dispatch to {}", listener, e); 163 } 164 } 165 // forward to the last listener who sent a message 166 // this is done _second_ so monitoring can have already stored the reply 167 // before a response is sent 168 if (lastSender != null && lastSender != originator) { 169 lastSender.notifyMessage(m); 170 } 171 } 172 173 protected synchronized void notifyReply(SprogReply r) { 174 log.debug("notifyReply starts for later, last sender: {}", lastSender); 175 176 final Vector<SprogListener> listeners = this.getCopyOfListeners(); 177 final SprogReply replyForLater = r; 178 final SprogListener senderForLater = lastSender; 179 Runnable rl = () -> { 180 for (SprogListener listener : listeners) { 181 try { 182 // don't send message back to the originator! 183 // skip forwarding to the last sender for now, we'll get them later 184 if (senderForLater != listener) { 185 listener.notifyReply(replyForLater); 186 } 187 188 } catch (Exception e) { 189 log.warn("notify: During dispatch to {}", listener, e); 190 } 191 } 192 // forward to the last listener who sent a message 193 // this is done _second_ so monitoring can have already stored the reply 194 // before a response is sent 195 if (senderForLater != null) { 196 senderForLater.notifyReply(replyForLater); 197 } 198 }; 199 javax.swing.SwingUtilities.invokeLater(rl); 200 } 201 202 protected synchronized void notifyReply(SprogReply r, SprogListener lastSender) { 203 log.debug("notifyReply starts for later, last sender: {}", lastSender); 204 205 final Vector<SprogListener> listeners = this.getCopyOfListeners(); 206 final SprogReply replyForLater = r; 207 final SprogListener senderForLater = lastSender; 208 Runnable rl = () -> { 209 log.debug("notifyReply starts last sender: {}", senderForLater); 210 for (SprogListener listener : listeners) { 211 try { 212 //if is message don't send it back to the originator! 213 // skip forwarding to the last sender for now, we'll get them later 214 if (senderForLater != listener) { 215 log.debug("Notify listener: {} {}", listener, r.toString()); 216 listener.notifyReply(replyForLater); 217 } 218 219 } catch (Exception e) { 220 log.warn("notify: During dispatch to {}", listener, e); 221 } 222 } 223 224 // forward to the last listener who sent a message 225 // this is done _second_ so monitoring can have already stored the reply 226 // before a response is sent 227 if (senderForLater != null) { 228 log.debug("notify last sender: {} {}", senderForLater, replyForLater.toString()); 229 senderForLater.notifyReply(replyForLater); 230 } 231 }; 232 javax.swing.SwingUtilities.invokeLater(rl); 233 } 234 235 // A class to remember the message and who sent it 236 static private class MessageTuple { 237 private final SprogMessage message; 238 private final SprogListener listener; 239 240 public MessageTuple(SprogMessage m, SprogListener l) { 241 message = m; 242 listener = l; 243 } 244 245 // Copy constructor 246 public MessageTuple(MessageTuple mt) { 247 message = mt.message; 248 listener = mt.listener; 249 } 250 } 251 252 // The queue to hold messages being sent 253 BlockingQueue<MessageTuple> sendQueue = new LinkedBlockingQueue<MessageTuple>(); 254 255 /** 256 * Enqueue a preformatted message to be sent to the actual interface 257 * 258 * @param m The message to be forwarded 259 */ 260 public void sendSprogMessage(SprogMessage m) { 261 log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId()); 262 try { 263 sendQueue.add(new MessageTuple(m, null)); 264 } catch (Exception e) { 265 log.error("Could not add message to queue", e); 266 } 267 } 268 269 /** 270 * Enqueue a preformatted message to be sent to the actual interface 271 * 272 * @param m Message to send 273 * @param replyTo Who is sending the message 274 */ 275 @Override 276 public synchronized void sendSprogMessage(SprogMessage m, SprogListener replyTo) { 277 log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId()); 278 try { 279 sendQueue.add(new MessageTuple(m, replyTo)); 280 } catch (Exception e) { 281 log.error("Could not add message to queue", e); 282 } 283 } 284 285 /** 286 * Block until a message is available from the queue, send it to the interface 287 * and then block until reply is received or a timeout occurs. This will be 288 * a very long timeout to allow for page mode programming operations in SPROG 289 * programmer mode. 290 */ 291 @Override 292 public void run() { 293 MessageTuple messageToSend; 294 log.debug("Traffic controller queuing thread starts"); 295 while (true) { 296 log.debug("Traffic controller queue waiting"); 297 try { 298 messageToSend = new MessageTuple(sendQueue.take()); 299 } catch (InterruptedException e) { 300 log.debug("Thread interrupted while dequeuing message to send"); 301 return; 302 } 303 log.debug("Message dequeued {} id: {}", messageToSend.message, messageToSend.message.getId()); 304 // remember who sent this 305 lastSender = messageToSend.listener; 306 lastId = messageToSend.message.getId(); 307 // notify all _other_ listeners 308 notifyMessage(messageToSend.message, messageToSend.listener); 309 replyAvailable = false; 310 sendToInterface(messageToSend.message); 311 log.debug("Waiting {} for a reply", timeout); 312 try { 313 synchronized (lock) { 314 lock.wait(timeout); // Wait for notify 315 } 316 } catch (InterruptedException e) { 317 log.debug("waitingForReply interrupted"); 318 } 319 if (!replyAvailable) { 320 // Timed out 321 log.warn("Timeout waiting for reply from hardware in SprogState {}", sprogState); 322 } else { 323 log.debug("Notified of reply"); 324 } 325 } 326 } 327 328 /** 329 * Forward a preformatted message to the interface. 330 * 331 * @param m The message to be forwarded 332 */ 333 public void sendToInterface(SprogMessage m) { 334 // stream to port in single write, as that's needed by serial 335 try { 336 if (ostream != null) { 337 ostream.write(m.getFormattedMessage(sprogState)); 338 log.debug("sendSprogMessage written to ostream"); 339 } else { 340 // no stream connected 341 log.warn("sendMessage: no connection established"); 342 } 343 } catch (Exception e) { 344 log.warn("sendMessage: Exception: ", e); 345 } 346 } 347 348// methods to connect/disconnect to a source of data in a SprogPortController 349 private AbstractPortController controller = null; 350 351 /** 352 * Make connection to existing PortController object. 353 * 354 * @param p The port controller 355 */ 356 public void connectPort(AbstractPortController p) { 357 istream = p.getInputStream(); 358 ostream = p.getOutputStream(); 359 if (controller != null) { 360 log.warn("connectPort: connect called while connected"); 361 } 362 controller = p; 363 } 364 365 /** 366 * Get the port controller, as a SerialDriverAdapter. 367 * 368 * @return the port controller 369 */ 370 protected SerialDriverAdapter getController(){ 371 return (SerialDriverAdapter) controller; 372 } 373 374 /** 375 * Break connection to existing SprogPortController object. 376 * <p> 377 * Once broken, attempts to send via "message" member will fail. 378 * 379 * @param p the connection to break 380 */ 381 public void disconnectPort(AbstractPortController p) { 382 istream = null; 383 ostream = null; 384 if (controller != p) { 385 log.warn("disconnectPort: disconnect called from non-connected SprogPortController"); 386 } 387 controller = null; 388 } 389 390 static volatile protected SprogTrafficController self = null; 391 392 public void setAdapterMemo(SprogSystemConnectionMemo adaptermemo) { 393 memo = adaptermemo; 394 } 395 396 public SprogSystemConnectionMemo getAdapterMemo() { 397 return memo; 398 } 399 400 private SprogSystemConnectionMemo memo = null; 401 402 // data members to hold the streams 403 DataInputStream istream = null; 404 OutputStream ostream = null; 405 406 boolean endReply(SprogReply msg) { 407 return msg.endNormalReply() || msg.endBootReply(); 408 } 409 410 private boolean unsolicited; 411 412 /** 413 * Handle an incoming reply. 414 */ 415 public void handleOneIncomingReply() { 416 // we get here if data has been received and this method is explicitly invoked 417 // fill the current reply with any data received 418 int replyCurrentSize = reply.getNumDataElements(); 419 int i; 420 for (i = replyCurrentSize; i < SprogReply.maxSize - replyCurrentSize; i++) { 421 try { 422 if (istream.available() == 0) { 423 break; // nothing waiting to be read 424 } 425 byte char1 = istream.readByte(); 426 reply.setElement(i, char1); 427 428 } catch (Exception e) { 429 log.warn("Exception in DATA_AVAILABLE state", e); 430 } 431 if (endReply(reply)) { 432 sendreply(); 433 break; 434 } 435 } 436 } 437 438 /** 439 * Send the current reply - built using data from serialEvent. 440 */ 441 private void sendreply() { 442 //send the reply 443 log.debug("dispatch reply of length {} in SprogState {}", reply.getNumDataElements(), sprogState); 444 if (unsolicited) { 445 log.debug("Unsolicited Reply"); 446 reply.setUnsolicited(); 447 } 448 // Insert the id 449 reply.setId(lastId); 450 notifyReply(reply, lastSender); 451 log.debug("Notify() wait"); 452 replyAvailable = true; 453 synchronized(lock) { 454 lock.notifyAll(); 455 } 456 457 //Create a new reply, ready to be filled 458 reply = new SprogReply(); 459 } 460 461 public void dispose(){ 462 tcThread.interrupt(); 463 } 464 465 private final static Logger log = LoggerFactory.getLogger(SprogTrafficController.class); 466 467}