001package jmri.jmrix.can.adapters.gridconnect;
002
003import java.io.DataInputStream;
004import java.io.FilterInputStream;
005import java.io.IOException;
006import java.io.InputStream;
007import java.util.concurrent.BlockingQueue;
008import java.util.concurrent.LinkedBlockingQueue;
009
010import jmri.jmrix.ConnectionStatus;
011import jmri.jmrix.can.TrafficController;
012
013/**
014 * Implements SerialPortAdapter for the GridConnect protocol.
015 *
016 * @author Bob Jacobsen Copyright (C) 2001, 2002
017 * @author Andrew Crosland Copyright (C) 2008
018 * @author Balazs Racz Copyright (C) 2017
019 */
020public class GcSerialDriverAdapter extends GcPortController {
021
022    protected FlowControl flowControl = FlowControl.NONE; // disabled to start
023
024    /**
025     * Creates a new CAN GridConnect Network Driver Adapter.
026     */
027    public GcSerialDriverAdapter() {
028        super(new jmri.jmrix.can.CanSystemConnectionMemo());
029        option1Name = "Protocol"; // NOI18N
030        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
031                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
032        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
033        allowConnectionRecovery = true;
034    }
035
036    /**
037     * Creates a new CAN GridConnect Network Driver Adapter.
038     * <p>
039     * Allows for default systemPrefix other than "M".
040     * @param prefix System Prefix.
041     */
042    public GcSerialDriverAdapter(String prefix) {
043        super(new jmri.jmrix.can.CanSystemConnectionMemo(prefix));
044        option1Name = "Protocol"; // NOI18N
045        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
046                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
047        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
048        allowConnectionRecovery = true;
049    }
050
051    /**
052     * Creates a new CAN GridConnect Network Driver Adapter.
053     * <p>
054     * Allows for default systemPrefix other than "M".
055     * @param prefix System Prefix.
056     * @param flow flow control, true for RTS/CTS
057     */
058    public GcSerialDriverAdapter(String prefix, FlowControl flow) {
059        super(new jmri.jmrix.can.CanSystemConnectionMemo(prefix));
060        option1Name = "Protocol"; // NOI18N
061        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
062                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
063        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
064        allowConnectionRecovery = true;
065        flowControl = flow;
066    }
067
068    /**
069     * {@inheritDoc}
070     */
071    @Override
072    public String openPort(String portName, String appName) {
073
074        // get and open the primary port
075        currentSerialPort = activatePort(portName, log);
076        if (currentSerialPort == null) {
077            log.error("failed to connect CAN to {}", portName);
078            return Bundle.getMessage("SerialPortNotFound", portName);
079        }
080        log.info("Connecting CAN to {} {}", portName, currentSerialPort);
081        
082        // try to set it for communication via SerialDriver
083        // find the baud rate value, configure comm options
084        int baud = currentBaudNumber(mBaudRate);
085        setBaudRate(currentSerialPort, baud);
086        configureLeads(currentSerialPort, true, true);
087        localSetFlowControl();
088
089        // get and save stream
090        serialStream = currentSerialPort.getInputStream();
091        // this is referenced in several other methods, 
092        // so can't easily be removed.
093
094        // report status
095        reportPortStatus(log, portName);
096
097        opened = true;
098
099        return null; // indicates OK return
100    }
101
102    /** 
103     * Local set up the flow contro, here to allow override
104     */
105    protected void localSetFlowControl() {
106        setFlowControl(currentSerialPort, flowControl);
107    }
108
109    /**
110     * Set up all of the other objects to operate with a CAN RS adapter
111     * connected to this port.
112     * {@inheritDoc}
113     */
114    @Override
115    public void configure() {
116        // Register the CAN traffic controller being used for this connection
117        //GcTrafficController.instance();
118        TrafficController tc = makeGcTrafficController();
119        this.getSystemConnectionMemo().setTrafficController(tc);
120
121        // Now connect to the traffic controller
122        log.debug("Connecting port");
123        tc.connectPort(this);
124
125        this.getSystemConnectionMemo().setProtocol(getOptionState(option1Name));
126
127        // do central protocol-specific configuration
128        //jmri.jmrix.can.ConfigurationManager.configure(getOptionState(option1Name));
129        this.getSystemConnectionMemo().configureManagers();
130    }
131
132    /**
133     * {@inheritDoc}
134     * Reconnects to Traffic Controller.
135     * Updates connection status.
136     */
137    @Override
138    protected void resetupConnection() {
139        if (!getSystemConnectionMemo().getTrafficController().status()) {
140            getSystemConnectionMemo().getTrafficController().connectPort(this);
141            ConnectionStatus.instance().setConnectionState(getUserName(), getCurrentPortName(),
142                ((getSystemConnectionMemo().getTrafficController().status() && status()) ? ConnectionStatus.CONNECTION_UP : ConnectionStatus.CONNECTION_DOWN));
143        }
144    }
145
146    /**
147     * {@inheritDoc}
148     *
149     * Closes serial streams.
150     */
151    @Override
152    protected void closeConnection(){
153        log.info("Closing connection {}.",getCurrentPortName());
154        try {
155            if (serialStream!=null) {
156                serialStream.close();
157            }
158            serialStream = null;
159            if (bufferedStream!=null) {
160                bufferedStream.close();
161            }
162            bufferedStream = null;
163        }
164        catch ( IOException e ) {
165            log.error("unable to close {}",this.currentSerialPort);
166        }
167        if (currentSerialPort!=null) {
168            currentSerialPort.closePort();
169        }
170        currentSerialPort = null;
171    }
172
173    /**
174     * Make a new GC Traffic Controller.
175     * @return new GridConnect Traffic Controller.
176     */
177    protected GcTrafficController makeGcTrafficController() {
178        return new GcTrafficController();
179    }
180
181    /**
182     * Helper class wrapping the input serial port's InputStream.
183     * <p>
184     * It starts a helper thread at high priority that reads the input serial
185     * port as fast as it can, buffering all incoming data in memory in a queue.
186     * <p>
187     * The queue is unbounded and readers will get the data from the queue.
188     * <p>
189     * This class is thread-safe.
190     */
191    private static class AsyncBufferInputStream extends FilterInputStream {
192
193        private boolean active;
194
195        /**
196         * Create new AsyncBufferInputStream.
197         * @param inputStream Input Stream.
198         * @param portName Port Name.
199         */
200        AsyncBufferInputStream(InputStream inputStream, String portName) {
201            super(inputStream);
202            this.portName = portName;
203            active = true;
204            Thread rt = jmri.util.ThreadingUtil.newThread(this::readThreadBody);
205            rt.setName("GcSerialPort InputBufferThread " + portName);
206            rt.setDaemon(true);
207            rt.setPriority(Thread.MAX_PRIORITY);
208            rt.start();
209        }
210
211        /**
212         * Helper function that tries to perform a read from the underlying port
213         * with a given maximum length.
214         *
215         * @param maxLen how many bytes to request from the port. Setting this to 1
216         *            will apparently block the thread if there are zero bytes
217         *            available.
218         * @return a block of data read, or nullptr if fatal IO errors make
219         *         further use of this port impossible.
220         */
221        private BufferEntry tryRead(int maxLen) {
222            BufferEntry tail = new BufferEntry();
223            try {
224                // read what's available, up to maxLen, but always at least 1
225                int len = Math.max (1, Math.min(maxLen, in.available()));
226                tail.data = new byte[len];
227                tail.len = in.read(tail.data, 0, len);
228                errorCount = 0;
229            } catch (IOException e) {
230                tail.e = e;
231                if (++errorCount > MAX_IO_ERRORS_TO_ABORT) {
232                    log.error("Closing read thread due to too many IO errors", e);
233                    return null;
234                } else {
235                    log.debug("Error reading serial port {}", portName, e);
236                }
237            }
238            return tail;
239        }
240
241        /**
242         * Implementation of the buffering thread.
243         */
244        private void readThreadBody() {
245            BufferEntry tail;
246            while (active) {
247                // Try to read one byte to block the thread.
248                tail = tryRead(1);
249                if (tail == null) {
250                    return;
251                }
252                // NOTE: in order to reuse this class in a generic context, we need to add support
253                // for the underlying input stream persistently returning EOF. That does not
254                // happen on a serial port.
255                if (tail.len > 0 || tail.e != null) {
256                    readAhead.add(tail);
257                } else {
258                    continue;
259                }
260                // Read as many bytes as we have in large increments. Reading 128 bytes is a good
261                // compromise between throughput (4 gridconnect packets per kernel IO) but not
262                // wasting a lot of memory if less data actually shows up.
263                do {
264                    tail = tryRead(128);
265                    if (tail == null) {
266                        return;
267                    }
268                    if (tail.len > 0 || tail.e != null) {
269                        readAhead.add(tail);
270                    } else {
271                        break;
272                    }
273                } while (true);
274            }
275        }
276
277        /**
278         * We queue objects of this class between the read thread and the actual
279         * read() methods.
280         */
281        private static class BufferEntry {
282
283            // data payload
284            byte[] data;
285            // how many bytes of data are filled in
286            int len = 0;
287            // an exception was caught reading the input stream
288            IOException e = null;
289        }
290
291        /**
292         * {@inheritDoc}
293         */
294        @Override
295        public int read() throws IOException {
296            throw new UnsupportedOperationException();
297        }
298
299        /**
300         * {@inheritDoc}
301         */
302        @Override
303        public int read(byte[] bytes) throws IOException {
304            throw new UnsupportedOperationException();
305        }
306
307        /**
308         * {@inheritDoc}
309         */
310        @Override
311        public synchronized int read(byte[] bytes, int skip, int len) throws IOException {
312            if (skip != 0) {
313                throw new UnsupportedOperationException();
314            }
315            if (head == null || headOfs >= head.len) {
316                try {
317                    head = readAhead.take();
318                } catch (InterruptedException e) {
319                    Thread.currentThread().interrupt();
320                }
321                if (head.e != null) {
322                    throw head.e;
323                }
324                headOfs = 0;
325                if (head.len < 0) {
326                    return -1;
327                }
328            }
329            int cp = head.len - headOfs;
330            if (cp > len) {
331                cp = len;
332            }
333            System.arraycopy(head.data, headOfs, bytes, 0, cp);
334            headOfs += cp;
335            return cp;
336        }
337
338        private final String portName;
339        // After this many consecutive read attempts resulting in an exception we will terminate
340        // the read thread and return the last exception to the reader.
341        private final static int MAX_IO_ERRORS_TO_ABORT = 10;
342        // Queue holding the buffered data.
343        private final BlockingQueue<BufferEntry> readAhead = new LinkedBlockingQueue<>();
344        // The last entry we got from the queue if there are still bytes we need to return from it.
345        BufferEntry head = null;
346        // Offset of next live byte in head.
347        int headOfs = 0;
348        // How many of the last consecutive read attempts have resulted in an exception.
349        int errorCount = 0;
350
351        @Override
352        public void close() throws IOException {
353            active = false;
354            super.close();
355        }
356    }
357
358    /**
359     * Base class methods for the PortController interface.
360     * {@inheritDoc}
361     */
362    @Override
363    public DataInputStream getInputStream() {
364        if (!opened) {
365            log.error("getInputStream called before load(), stream not available");
366            return null;
367        }
368        synchronized (this) {
369            if (bufferedStream == null) {
370                bufferedStream = new AsyncBufferInputStream(serialStream, currentSerialPort.toString());
371            }
372            return new DataInputStream(bufferedStream);
373        }
374    }
375
376    /**
377     * {@inheritDoc}
378     *
379     * @return array of localized valid baud rates
380     */
381    @Override
382    public String[] validBaudRates() {
383        return new String[]{Bundle.getMessage("Baud57600"),
384                Bundle.getMessage("Baud115200"), Bundle.getMessage("Baud230400"),
385                Bundle.getMessage("Baud250000"), Bundle.getMessage("Baud333333"),
386                Bundle.getMessage("Baud460800")};
387    }
388
389    /**
390     * Get an array of valid baud rates.
391     *
392     * @return valid baud rates
393     */
394    @Override
395    public int[] validBaudNumbers() {
396        return new int[]{57600, 115200, 230400, 250000, 333333, 460800};
397    }
398
399    /**
400     * {@inheritDoc}
401     */
402    @Override
403    public int defaultBaudIndex() {
404        return 0;
405    }
406
407    // private control members
408    InputStream serialStream = null;
409    // Stream wrapper that buffers the input bytes.
410    private InputStream bufferedStream = null;
411
412    private final static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(GcSerialDriverAdapter.class);
413
414}