001package jmri.jmrix.roco.z21;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.net.DatagramPacket;
005import java.util.ArrayList;
006import java.util.Arrays;
007import java.util.List;
008
009import jmri.jmrix.*;
010import org.slf4j.Logger;
011import org.slf4j.LoggerFactory;
012
013/**
014 * Abstract base for TrafficControllers in a Message/Reply protocol.
015 *
016 * @author Paul Bender Copyright (C) 2014
017 */
018public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface {
019
020    private java.net.InetAddress host;
021    private int port;
022
023    public Z21TrafficController() {
024        super();
025        allowUnexpectedReply = true;
026    }
027
028    /**
029     * Implement this to forward a specific message type to a protocol-specific
030     * listener interface. This puts the casting into the concrete class.
031     */
032    @Override
033    protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) {
034        ((Z21Listener) client).message((Z21Message) m);
035    }
036
037    /**
038     * Implement this to forward a specific Reply type to a protocol-specific
039     * listener interface. This puts the casting into the concrete class.
040     */
041    @Override
042    protected void forwardReply(AbstractMRListener client, AbstractMRReply m) {
043        ((Z21Listener) client).reply((Z21Reply) m);
044    }
045
046    /**
047     * Invoked if it's appropriate to do low-priority polling of the command
048     * station, this should return the next message to send, or null if the TC
049     * should just sleep.
050     */
051    @Override
052    protected Z21Message pollMessage() {
053        return null;
054    }
055
056    @Override
057    protected Z21Listener pollReplyHandler() {
058        return null;
059    }
060
061    /**
062     * enterProgMode() and enterNormalMode() return any message that
063     * needs to be returned to the command station to change modes.
064     *
065     * @see #enterNormalMode()
066     * @return if no message is needed, you may return null.
067     *
068     * If the programmerIdle() function returns true, enterNormalMode() is
069     * called after a timeout while in IDLESTATE during programming to
070     * return the system to normal mode.
071     */
072    @Override
073    protected Z21Message enterProgMode() {
074        return null;
075    }
076
077    /**
078     * enterProgMode() and enterNormalMode() return any message that
079     * needs to be returned to the command station to change modes.
080     *
081     * @see #enterProgMode()
082     * @return if no message is needed, you may return null.
083     */
084    @Override
085    protected Z21Message enterNormalMode() {
086        return null;
087    }
088
089    /**
090     * Actually transmits the next message to the port.
091     */
092    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"},
093            justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.")
094    @Override
095    synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
096        if (log.isDebugEnabled()) {
097            log.debug("forwardToPort message: [{}]", m);
098        }
099        // remember who sent this
100        mLastSender = reply;
101
102        // forward the message to the registered recipients,
103        // which includes the communications monitor, except the sender.
104        // Schedule notification via the Swing event queue to ensure order
105        Runnable r = new XmtNotifier(m, mLastSender, this);
106        javax.swing.SwingUtilities.invokeLater(r);
107
108        // stream to port in single write, as that's needed by serial
109        byte[] msg = new byte[lengthOfByteStream(m)];
110        // add header
111        int offset = addHeaderToOutput(msg, m);
112
113        // add data content
114        int len = m.getNumDataElements();
115        for (int i = 0; i < len; i++) {
116            msg[i + offset] = (byte) m.getElement(i);
117        }
118        // add trailer
119        addTrailerToOutput(msg, len + offset, m);
120        // and send the bytes
121        try {
122            if (log.isDebugEnabled()) {
123                StringBuilder f = new StringBuilder("formatted message: ");
124                for (byte b : msg) {
125                    f.append(Integer.toHexString(0xFF & b));
126                    f.append(" ");
127                }
128                log.debug(new String(f));
129            }
130            while (m.getRetries() >= 0) {
131                if (portReadyToSend(controller)) {
132                    // create a datagram with the data from the
133                    // message.
134                    byte[] data = ((Z21Message) m).getBuffer();
135                    DatagramPacket sendPacket
136                            = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port);
137                    // and send it.
138                    ((Z21Adapter) controller).getSocket().send(sendPacket);
139                    log.debug("written, msg timeout: {} mSec", m.getTimeout());
140                    break;
141                } else if (m.getRetries() >= 0) {
142                    if (log.isDebugEnabled()) {
143                        StringBuilder b = new StringBuilder("Retry message: ");
144                        b.append(m.toString());
145                        b.append(" attempts remaining: ");
146                        b.append(m.getRetries());
147                        log.debug(new String(b));
148                    }
149                    m.setRetries(m.getRetries() - 1);
150                    try {
151                        synchronized (xmtRunnable) {
152                            xmtRunnable.wait(m.getTimeout());
153                        }
154                    } catch (InterruptedException e) {
155                        Thread.currentThread().interrupt(); // retain if needed later
156                        if(!threadStopRequest) {
157                           log.error("retry wait interrupted");
158                        } else {
159                           log.error("retry wait interrupted during thread stop");
160                        }
161                    }
162                } else {
163                    log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg));
164                }
165            }
166        } catch (Exception e) {
167            // TODO Currently there's no port recovery if an exception occurs
168            // must restart JMRI to clear xmtException.
169            xmtException = true;
170            portWarn(e);
171        }
172    }
173
174    @Override()
175    public boolean status() {
176        if (controller == null) {
177            return false;
178        } else {
179            return (controller.status());
180        }
181    }
182
183    /**
184     * Make connection to existing PortController object.
185     */
186    @Override
187    public void connectPort(AbstractPortController p) {
188        rcvException = false;
189        xmtException = false;
190        if (controller != null) {
191            log.warn("connectPort: connect called while connected");
192        } else {
193            log.debug("connectPort invoked");
194        }
195        if (!(p instanceof Z21Adapter)) {
196            throw new IllegalArgumentException("attempt to connect wrong port type");
197        }
198        controller = p;
199        try {
200            host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName());
201            port = ((Z21Adapter) controller).getPort();
202            ConnectionStatus.instance().setConnectionState(
203                    p.getSystemConnectionMemo().getUserName(),
204                    ((Z21Adapter) p).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_UP);
205        } catch (java.net.UnknownHostException uhe) {
206            log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName());
207            if (((Z21Adapter) p).getPort() != 0) {
208                ConnectionStatus.instance().setConnectionState(
209                        p.getSystemConnectionMemo().getUserName(),
210                        ((Z21Adapter) controller).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_DOWN);
211            } else {
212                ConnectionStatus.instance().setConnectionState(
213                        p.getSystemConnectionMemo().getUserName(),
214                        ((Z21Adapter) controller).getHostName(), ConnectionStatus.CONNECTION_DOWN);
215            }
216        }
217        // and start threads
218        xmtThread = new Thread(xmtRunnable = () -> {
219            try {
220                transmitLoop();
221            } catch (Throwable e) {
222                if(!threadStopRequest)
223                    log.error("Transmit thread terminated prematurely by: {}", e.toString(), e);
224                // ThreadDeath must be thrown per Java API JavaDocs
225                if (e instanceof ThreadDeath) {
226                    throw e;
227                }
228            }
229        });
230        xmtThread.setName("z21.Z21TrafficController Transmit thread");
231        xmtThread.start();
232        rcvThread = new Thread(this::receiveLoop);
233        rcvThread.setName("z21.Z21TrafficController Receive thread");
234        int xr = rcvThread.getPriority();
235        xr++;
236        rcvThread.setPriority(xr);      //bump up the priority
237        rcvThread.start();
238    }
239
240    /**
241     * Break connection to existing PortController object. Once broken, attempts
242     * to send via "message" member will fail.
243     */
244    @Override
245    public void disconnectPort(AbstractPortController p) {
246        if (controller != p) {
247            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
248        }
249        controller = null;
250    }
251
252    @Override
253    protected Z21Reply newReply() {
254        return new Z21Reply();
255    }
256
257    @Override
258    protected boolean endOfMessage(AbstractMRReply r) {
259        // since this is a UDP protocol, and each reply in the packet is complete,
260        // we don't check for end of message manually.
261        return true;
262    }
263
264    /**
265     * Handle each reply when complete.
266     * <p>
267     * (This is public for testing purposes) Runs in the "Receive" thread.
268     */
269    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"},
270            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.  Notify is used because Having more than one thread waiting on xmtRunnable is an error.")
271    @Override
272    public void handleOneIncomingReply() throws java.io.IOException {
273        // we sit in this until the message is complete, relying on
274        // threading to let other stuff happen
275
276        // create a buffer to hold the incoming data.
277        byte[] buffer = new byte[100];  // the size here just needs to be longer
278        // than the longest protocol message.  
279        // Otherwise, the receive will truncate.
280
281        // create the packet.
282        DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port);
283
284        // and wait to receive data in the packet.
285        try {
286            ((Z21Adapter) controller).getSocket().receive(receivePacket);
287        } catch (java.net.SocketException | NullPointerException se) {
288            // if we are waiting when the controller is disposed,
289            // a socket exception will be thrown.
290            log.debug("Socket exception during receive.  Connection Closed?");
291            rcvException = true;
292            return;
293        }
294        if (threadStopRequest) return;
295
296        // handle more than one reply in the same UDP packet.
297        List<Z21Reply> replies = new ArrayList<>();
298
299        int totalLength=receivePacket.getLength();
300        int consumed=0;
301
302        do {
303            int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8);
304            Z21Reply msg = new Z21Reply(buffer, length);
305
306            replies.add(msg);
307
308            buffer = Arrays.copyOfRange(buffer,length,buffer.length);
309            consumed +=length;
310            log.trace("total length: {} consumed {}",totalLength,consumed);
311        } while(totalLength>consumed);
312
313
314        // and then dispatch each reply
315        replies.forEach(this::dispatchReply);
316    }
317
318    private void dispatchReply(Z21Reply msg) {
319        // message is complete, dispatch it !!
320        replyInDispatch = true;
321        if (log.isDebugEnabled()) {
322            log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState);
323        }
324
325        // forward the message to the registered recipients,
326        // which includes the communications monitor
327        // return a notification via the Swing event queue to ensure proper thread
328        Runnable r = new RcvNotifier(msg, mLastSender, this);
329        try {
330            javax.swing.SwingUtilities.invokeAndWait(r);
331        } catch (InterruptedException ie) {
332            if(threadStopRequest) return;
333            log.error("Unexpected exception in invokeAndWait:{}", ie, ie);
334        } catch (Exception e) {
335            log.error("Unexpected exception in invokeAndWait:{}", e, e);
336        }
337        if (log.isDebugEnabled()) {
338            log.debug("dispatch thread invoked");
339        }
340
341        if (!msg.isUnsolicited()) {
342            // effect on transmit:
343            switch (mCurrentState) {
344                case WAITMSGREPLYSTATE: {
345                    // check to see if the response was an error message we want
346                    // to automatically handle by re-queueing the last sent
347                    // message, otherwise go on to the next message
348                    if (msg.isRetransmittableErrorMsg()) {
349                        if (log.isDebugEnabled()) {
350                            log.debug("Automatic Recovery from Error Message: +msg.toString()");
351                        }
352                        synchronized (xmtRunnable) {
353                            mCurrentState = AUTORETRYSTATE;
354                            replyInDispatch = false;
355                            xmtRunnable.notify();
356                        }
357                    } else {
358                        // update state, and notify to continue
359                        synchronized (xmtRunnable) {
360                            mCurrentState = NOTIFIEDSTATE;
361                            replyInDispatch = false;
362                            xmtRunnable.notify();
363                        }
364                    }
365                    break;
366                }
367                case WAITREPLYINPROGMODESTATE: {
368                    // entering programming mode
369                    mCurrentMode = PROGRAMINGMODE;
370                    replyInDispatch = false;
371
372                    // check to see if we need to delay to allow decoders to become
373                    // responsive
374                    int warmUpDelay = enterProgModeDelayTime();
375                    if (warmUpDelay != 0) {
376                        try {
377                            synchronized (xmtRunnable) {
378                                xmtRunnable.wait(warmUpDelay);
379                            }
380                        } catch (InterruptedException e) {
381                            Thread.currentThread().interrupt(); // retain if needed later
382                            if (threadStopRequest) return;
383                        }
384                    }
385                    // update state, and notify to continue
386                    synchronized (xmtRunnable) {
387                        mCurrentState = OKSENDMSGSTATE;
388                        xmtRunnable.notify();
389                    }
390                    break;
391                }
392                case WAITREPLYINNORMMODESTATE: {
393                    // entering normal mode
394                    mCurrentMode = NORMALMODE;
395                    replyInDispatch = false;
396                    // update state, and notify to continue
397                    synchronized (xmtRunnable) {
398                        mCurrentState = OKSENDMSGSTATE;
399                        xmtRunnable.notify();
400                    }
401                    break;
402                }
403                default: {
404                    replyInDispatch = false;
405                    if (allowUnexpectedReply) {
406                        if (log.isDebugEnabled()) {
407                            log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString());
408                        }
409                        synchronized (xmtRunnable) {
410                            // The transmit thread sometimes gets stuck
411                            // when unexpected replies are received.  Notify
412                            // it to clear the block without a timeout.
413                            // (do not change the current state)
414                            //if(mCurrentState!=IDLESTATE)
415                            xmtRunnable.notify();
416                        }
417                    } else {
418                        unexpectedReplyStateError(mCurrentState,msg.toString());
419                    }
420                }
421            }
422            // Unsolicited message
423        } else {
424            if (log.isDebugEnabled()) {
425                log.debug("Unsolicited Message Received {}", msg.toString());
426            }
427
428            replyInDispatch = false;
429        }
430    }
431
432    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"},
433            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.")
434    @Override
435    protected void terminate() {
436        if (controller == null) {
437            log.debug("terminate called while not connected");
438            return;
439        } else {
440            log.debug("Cleanup Starts");
441        }
442
443        Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage();
444        forwardToPort(logoffMessage, null);
445        // wait for reply
446        try {
447            if (xmtRunnable != null) {
448                synchronized (xmtRunnable) {
449                    xmtRunnable.wait(logoffMessage.getTimeout());
450                }
451            }
452        } catch (InterruptedException e) {
453            Thread.currentThread().interrupt(); // retain if needed later
454            log.error("transmit interrupted");
455        } finally {
456            // set the controller to null, even if terminate fails.
457            controller = null;
458        }
459    }
460
461    /**
462     * Terminate the receive and transmit threads.
463     * <p>
464     * This is intended to be used only by testing subclasses.
465     */
466    @Override
467    public void terminateThreads() {
468        threadStopRequest = true;
469        // ensure socket closed to end pending operations
470        if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close();
471        
472        // usual stop process
473        super.terminateThreads();
474    }
475
476    // The methods to implement the Z21Interface
477    @Override
478    public synchronized void addz21Listener(Z21Listener l) {
479        this.addListener(l);
480    }
481
482    @Override
483    public synchronized void removez21Listener(Z21Listener l) {
484        this.removeListener(l);
485    }
486
487    /**
488     * Forward a preformatted message to the actual interface.
489     */
490    @Override
491    public void sendz21Message(Z21Message m, Z21Listener reply) {
492        sendMessage(m, reply);
493    }
494
495    private final static Logger log = LoggerFactory.getLogger(Z21TrafficController.class);
496}