001package jmri.jmrix.roco.z21;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.net.DatagramPacket;
005import java.util.ArrayList;
006import java.util.Arrays;
007import java.util.List;
008
009import jmri.jmrix.*;
010import jmri.util.StringUtil;
011import org.slf4j.Logger;
012import org.slf4j.LoggerFactory;
013
/**
 * Traffic controller for the Roco Z21: exchanges messages and replies with
 * the command station as UDP datagrams.
 *
 * @author Paul Bender Copyright (C) 2014
 */
019public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface {
020
021    private java.net.InetAddress host;
022    private int port;
023
024    public Z21TrafficController() {
025        super();
026        allowUnexpectedReply = true;
027    }
028
029    /**
030     * Implement this to forward a specific message type to a protocol-specific
031     * listener interface. This puts the casting into the concrete class.
032     */
033    @Override
034    protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) {
035        ((Z21Listener) client).message((Z21Message) m);
036    }
037
038    /**
039     * Implement this to forward a specific Reply type to a protocol-specific
040     * listener interface. This puts the casting into the concrete class.
041     */
042    @Override
043    protected void forwardReply(AbstractMRListener client, AbstractMRReply m) {
044        ((Z21Listener) client).reply((Z21Reply) m);
045    }
046
    /**
     * Invoked if it's appropriate to do low-priority polling of the command
     * station, this should return the next message to send, or null if the TC
     * should just sleep.
     * <p>
     * This implementation never supplies a poll message.
     *
     * @return always {@code null}
     */
    @Override
    protected Z21Message pollMessage() {
        return null;
    }
056
    /**
     * Supply the listener for replies to poll messages.
     * <p>
     * {@link #pollMessage()} never supplies a poll message, and this method
     * likewise supplies no listener.
     *
     * @return always {@code null}
     */
    @Override
    protected Z21Listener pollReplyHandler() {
        return null;
    }
061
    /**
     * enterProgMode() and enterNormalMode() return any message that
     * needs to be sent to the command station to change modes.
     * <p>
     * If the programmerIdle() function returns true, enterNormalMode() is
     * called after a timeout while in IDLESTATE during programming to
     * return the system to normal mode.
     * <p>
     * This implementation has no mode-change message to send.
     *
     * @return always {@code null}, meaning no message is needed
     * @see #enterNormalMode()
     */
    @Override
    protected Z21Message enterProgMode() {
        return null;
    }
077
    /**
     * enterProgMode() and enterNormalMode() return any message that
     * needs to be sent to the command station to change modes.
     * <p>
     * This implementation has no mode-change message to send.
     *
     * @return always {@code null}, meaning no message is needed
     * @see #enterProgMode()
     */
    @Override
    protected Z21Message enterNormalMode() {
        return null;
    }
089
090    /**
091     * Actually transmits the next message to the port.
092     */
093    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"},
094            justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.")
095    @Override
096    synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
097        if (log.isDebugEnabled()) {
098            log.debug("forwardToPort message: [{}]", m);
099        }
100        // remember who sent this
101        mLastSender = reply;
102
103        // forward the message to the registered recipients,
104        // which includes the communications monitor, except the sender.
105        // Schedule notification via the Swing event queue to ensure order
106        Runnable r = new XmtNotifier(m, mLastSender, this);
107        javax.swing.SwingUtilities.invokeLater(r);
108
109        // stream to port in single write, as that's needed by serial
110        byte[] msg = new byte[lengthOfByteStream(m)];
111        // add header
112        int offset = addHeaderToOutput(msg, m);
113
114        // add data content
115        int len = m.getNumDataElements();
116        for (int i = 0; i < len; i++) {
117            msg[i + offset] = (byte) m.getElement(i);
118        }
119        // add trailer
120        addTrailerToOutput(msg, len + offset, m);
121        // and send the bytes
122        try {
123            if (log.isDebugEnabled()) {
124                StringBuilder f = new StringBuilder("formatted message: ");
125                for (byte b : msg) {
126                    f.append(Integer.toHexString(0xFF & b));
127                    f.append(" ");
128                }
129                log.debug(new String(f));
130            }
131            while (m.getRetries() >= 0) {
132                if (portReadyToSend(controller)) {
133                    // create a datagram with the data from the
134                    // message.
135                    byte[] data = ((Z21Message) m).getBuffer();
136                    DatagramPacket sendPacket
137                            = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port);
138                    // and send it.
139                    ((Z21Adapter) controller).getSocket().send(sendPacket);
140                    log.debug("written, msg timeout: {} mSec", m.getTimeout());
141                    break;
142                } else if (m.getRetries() >= 0) {
143                    if (log.isDebugEnabled()) {
144                        StringBuilder b = new StringBuilder("Retry message: ");
145                        b.append(m.toString());
146                        b.append(" attempts remaining: ");
147                        b.append(m.getRetries());
148                        log.debug(new String(b));
149                    }
150                    m.setRetries(m.getRetries() - 1);
151                    try {
152                        synchronized (xmtRunnable) {
153                            xmtRunnable.wait(m.getTimeout());
154                        }
155                    } catch (InterruptedException e) {
156                        Thread.currentThread().interrupt(); // retain if needed later
157                        if(!threadStopRequest) {
158                           log.error("retry wait interrupted");
159                        } else {
160                           log.error("retry wait interrupted during thread stop");
161                        }
162                    }
163                } else {
164                    log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg));
165                }
166            }
167        } catch (Exception e) {
168            // TODO Currently there's no port recovery if an exception occurs
169            // must restart JMRI to clear xmtException.
170            xmtException = true;
171            portWarn(e);
172        }
173    }
174
175    @Override()
176    public boolean status() {
177        if (controller == null) {
178            return false;
179        } else {
180            return (controller.status());
181        }
182    }
183
184    /**
185     * Make connection to existing PortController object.
186     */
187    @Override
188    public void connectPort(AbstractPortController p) {
189        rcvException = false;
190        xmtException = false;
191        if (controller != null) {
192            log.warn("connectPort: connect called while connected");
193        } else {
194            log.debug("connectPort invoked");
195        }
196        if (!(p instanceof Z21Adapter)) {
197            throw new IllegalArgumentException("attempt to connect wrong port type");
198        }
199        controller = p;
200        try {
201            host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName());
202            port = ((Z21Adapter) controller).getPort();
203            ConnectionStatus.instance().setConnectionState(
204                    p.getSystemConnectionMemo().getUserName(),
205                    ((Z21Adapter) p).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_UP);
206        } catch (java.net.UnknownHostException uhe) {
207            log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName());
208            if (((Z21Adapter) p).getPort() != 0) {
209                ConnectionStatus.instance().setConnectionState(
210                        p.getSystemConnectionMemo().getUserName(),
211                        ((Z21Adapter) controller).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_DOWN);
212            } else {
213                ConnectionStatus.instance().setConnectionState(
214                        p.getSystemConnectionMemo().getUserName(),
215                        ((Z21Adapter) controller).getHostName(), ConnectionStatus.CONNECTION_DOWN);
216            }
217        }
218        // and start threads
219        xmtThread = new Thread(xmtRunnable = () -> {
220            try {
221                transmitLoop();
222            } catch (Throwable e) {
223                if(!threadStopRequest)
224                    log.error("Transmit thread terminated prematurely by: {}", e.toString(), e);
225                // ThreadDeath must be thrown per Java API JavaDocs
226                if (e instanceof ThreadDeath) {
227                    throw e;
228                }
229            }
230        });
231        xmtThread.setName("z21.Z21TrafficController Transmit thread");
232        xmtThread.start();
233        rcvThread = new Thread(this::receiveLoop);
234        rcvThread.setName("z21.Z21TrafficController Receive thread");
235        int xr = rcvThread.getPriority();
236        xr++;
237        rcvThread.setPriority(xr);      //bump up the priority
238        rcvThread.start();
239    }
240
241    /**
242     * Break connection to existing PortController object. Once broken, attempts
243     * to send via "message" member will fail.
244     */
245    @Override
246    public void disconnectPort(AbstractPortController p) {
247        if (controller != p) {
248            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
249        }
250        controller = null;
251    }
252
    /**
     * Create an empty reply object of the type used by this protocol.
     *
     * @return a new {@link Z21Reply} built with its no-argument constructor
     */
    @Override
    protected Z21Reply newReply() {
        return new Z21Reply();
    }
257
    /**
     * Decide whether a reply is complete.
     *
     * @param r the reply being assembled; not inspected by this
     *          implementation
     * @return always {@code true}
     */
    @Override
    protected boolean endOfMessage(AbstractMRReply r) {
        // since this is a UDP protocol, and each reply in the packet is complete,
        // we don't check for end of message manually.
        return true;
    }
264
265    /**
266     * Handle each reply when complete.
267     * <p>
268     * (This is public for testing purposes) Runs in the "Receive" thread.
269     */
270    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"},
271            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.  Notify is used because Having more than one thread waiting on xmtRunnable is an error.")
272    @Override
273    public void handleOneIncomingReply() throws java.io.IOException {
274        // we sit in this until the message is complete, relying on
275        // threading to let other stuff happen
276
277        // create a buffer to hold the incoming data.
278        byte[] buffer = new byte[100];  // the size here just needs to be longer
279        // than the longest protocol message.  
280        // Otherwise, the receive will truncate.
281
282        // create the packet.
283        DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port);
284
285        // and wait to receive data in the packet.
286        try {
287            ((Z21Adapter) controller).getSocket().receive(receivePacket);
288        } catch (java.net.SocketException | NullPointerException se) {
289            // if we are waiting when the controller is disposed,
290            // a socket exception will be thrown.
291            log.debug("Socket exception during receive.  Connection Closed?");
292            rcvException = true;
293            return;
294        }
295        if (threadStopRequest) return;
296
297        // handle more than one reply in the same UDP packet.
298        List<Z21Reply> replies = new ArrayList<>();
299
300        int totalLength=receivePacket.getLength();
301        int consumed=0;
302
303        do {
304            int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8);
305            Z21Reply msg = new Z21Reply(buffer, length);
306
307            replies.add(msg);
308
309            buffer = Arrays.copyOfRange(buffer,length,buffer.length);
310            consumed +=length;
311            log.trace("total length: {} consumed {}",totalLength,consumed);
312        } while(totalLength>consumed);
313
314
315        // and then dispatch each reply
316        replies.forEach(this::dispatchReply);
317    }
318
319    private void dispatchReply(Z21Reply msg) {
320        // message is complete, dispatch it !!
321        replyInDispatch = true;
322        if (log.isDebugEnabled()) {
323            log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState);
324        }
325
326        // forward the message to the registered recipients,
327        // which includes the communications monitor
328        // return a notification via the Swing event queue to ensure proper thread
329        Runnable r = new RcvNotifier(msg, mLastSender, this);
330        try {
331            javax.swing.SwingUtilities.invokeAndWait(r);
332        } catch (InterruptedException ie) {
333            if(threadStopRequest) return;
334            log.error("Unexpected exception in invokeAndWait:{}", ie, ie);
335        } catch (Exception e) {
336            log.error("Unexpected exception in invokeAndWait:{}", e, e);
337        }
338        if (log.isDebugEnabled()) {
339            log.debug("dispatch thread invoked");
340        }
341
342        if (!msg.isUnsolicited()) {
343            // effect on transmit:
344            switch (mCurrentState) {
345                case WAITMSGREPLYSTATE: {
346                    // check to see if the response was an error message we want
347                    // to automatically handle by re-queueing the last sent
348                    // message, otherwise go on to the next message
349                    if (msg.isRetransmittableErrorMsg()) {
350                        if (log.isDebugEnabled()) {
351                            log.debug("Automatic Recovery from Error Message: +msg.toString()");
352                        }
353                        synchronized (xmtRunnable) {
354                            mCurrentState = AUTORETRYSTATE;
355                            replyInDispatch = false;
356                            xmtRunnable.notify();
357                        }
358                    } else {
359                        // update state, and notify to continue
360                        synchronized (xmtRunnable) {
361                            mCurrentState = NOTIFIEDSTATE;
362                            replyInDispatch = false;
363                            xmtRunnable.notify();
364                        }
365                    }
366                    break;
367                }
368                case WAITREPLYINPROGMODESTATE: {
369                    // entering programming mode
370                    mCurrentMode = PROGRAMINGMODE;
371                    replyInDispatch = false;
372
373                    // check to see if we need to delay to allow decoders to become
374                    // responsive
375                    int warmUpDelay = enterProgModeDelayTime();
376                    if (warmUpDelay != 0) {
377                        try {
378                            synchronized (xmtRunnable) {
379                                xmtRunnable.wait(warmUpDelay);
380                            }
381                        } catch (InterruptedException e) {
382                            Thread.currentThread().interrupt(); // retain if needed later
383                            if (threadStopRequest) return;
384                        }
385                    }
386                    // update state, and notify to continue
387                    synchronized (xmtRunnable) {
388                        mCurrentState = OKSENDMSGSTATE;
389                        xmtRunnable.notify();
390                    }
391                    break;
392                }
393                case WAITREPLYINNORMMODESTATE: {
394                    // entering normal mode
395                    mCurrentMode = NORMALMODE;
396                    replyInDispatch = false;
397                    // update state, and notify to continue
398                    synchronized (xmtRunnable) {
399                        mCurrentState = OKSENDMSGSTATE;
400                        xmtRunnable.notify();
401                    }
402                    break;
403                }
404                default: {
405                    replyInDispatch = false;
406                    if (allowUnexpectedReply) {
407                        if (log.isDebugEnabled()) {
408                            log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString());
409                        }
410                        synchronized (xmtRunnable) {
411                            // The transmit thread sometimes gets stuck
412                            // when unexpected replies are received.  Notify
413                            // it to clear the block without a timeout.
414                            // (do not change the current state)
415                            //if(mCurrentState!=IDLESTATE)
416                            xmtRunnable.notify();
417                        }
418                    } else {
419                        unexpectedReplyStateError(mCurrentState,msg.toString());
420                    }
421                }
422            }
423            // Unsolicited message
424        } else {
425            if (log.isDebugEnabled()) {
426                log.debug("Unsolicited Message Received {}", msg.toString());
427            }
428
429            replyInDispatch = false;
430        }
431    }
432
433    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"},
434            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.")
435    @Override
436    protected void terminate() {
437        if (controller == null) {
438            log.debug("terminate called while not connected");
439            return;
440        } else {
441            log.debug("Cleanup Starts");
442        }
443
444        Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage();
445        forwardToPort(logoffMessage, null);
446        // wait for reply
447        try {
448            if (xmtRunnable != null) {
449                synchronized (xmtRunnable) {
450                    xmtRunnable.wait(logoffMessage.getTimeout());
451                }
452            }
453        } catch (InterruptedException e) {
454            Thread.currentThread().interrupt(); // retain if needed later
455            log.error("transmit interrupted");
456        } finally {
457            // set the controller to null, even if terminate fails.
458            controller = null;
459        }
460    }
461
462    /**
463     * Terminate the receive and transmit threads.
464     * <p>
465     * This is intended to be used only by testing subclasses.
466     */
467    @Override
468    public void terminateThreads() {
469        threadStopRequest = true;
470        // ensure socket closed to end pending operations
471        if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close();
472        
473        // usual stop process
474        super.terminateThreads();
475    }
476
477    // The methods to implement the Z21Interface
    /**
     * Register a Z21 listener by passing it to {@code addListener}.
     *
     * @param l the listener to add
     */
    @Override
    public synchronized void addz21Listener(Z21Listener l) {
        this.addListener(l);
    }
482
    /**
     * Unregister a Z21 listener by passing it to {@code removeListener}.
     *
     * @param l the listener to remove
     */
    @Override
    public synchronized void removez21Listener(Z21Listener l) {
        this.removeListener(l);
    }
487
    /**
     * Forward a preformatted message to the actual interface.
     * <p>
     * Delegates directly to {@code sendMessage}.
     *
     * @param m     the message to send
     * @param reply the listener passed along with the message as its sender
     */
    @Override
    public void sendz21Message(Z21Message m, Z21Listener reply) {
        sendMessage(m, reply);
    }
495
496    private final static Logger log = LoggerFactory.getLogger(Z21TrafficController.class);
497}