001package jmri.jmrix.can.adapters.gridconnect;
002
003import java.io.DataInputStream;
004import java.io.DataOutputStream;
005import java.io.FilterInputStream;
006import java.io.IOException;
007import java.io.InputStream;
008import java.util.concurrent.BlockingQueue;
009import java.util.concurrent.LinkedBlockingQueue;
010
011import jmri.jmrix.ConnectionStatus;
012import jmri.jmrix.can.TrafficController;
013import org.slf4j.Logger;
014import org.slf4j.LoggerFactory;
015import purejavacomm.CommPortIdentifier;
016import purejavacomm.NoSuchPortException;
017import purejavacomm.PortInUseException;
018import purejavacomm.SerialPort;
019import purejavacomm.UnsupportedCommOperationException;
020
021/**
022 * Implements SerialPortAdapter for the GridConnect protocol.
023 *
024 * @author Bob Jacobsen Copyright (C) 2001, 2002
025 * @author Andrew Crosland Copyright (C) 2008
026 * @author Balazs Racz Copyright (C) 2017
027 */
028public class GcSerialDriverAdapter extends GcPortController {
029
030    protected SerialPort activeSerialPort = null;
031    protected int flowControl = purejavacomm.SerialPort.FLOWCONTROL_NONE;
032
033    /**
034     * Creates a new CAN GridConnect Network Driver Adapter.
035     */
036    public GcSerialDriverAdapter() {
037        super(new jmri.jmrix.can.CanSystemConnectionMemo());
038        option1Name = "Protocol"; // NOI18N
039        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
040                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
041        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
042        allowConnectionRecovery = true;
043    }
044
045    /**
046     * Creates a new CAN GridConnect Network Driver Adapter.
047     * <p>
048     * Allows for default systemPrefix other than "M".
049     * @param prefix System Prefix.
050     */
051    public GcSerialDriverAdapter(String prefix) {
052        super(new jmri.jmrix.can.CanSystemConnectionMemo(prefix));
053        option1Name = "Protocol"; // NOI18N
054        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
055                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
056        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
057        allowConnectionRecovery = true;
058    }
059
060    /**
061     * Creates a new CAN GridConnect Network Driver Adapter.
062     * <p>
063     * Allows for default systemPrefix other than "M".
064     * @param prefix System Prefix.
065     * @param flow flow control.
066     */
067    public GcSerialDriverAdapter(String prefix, int flow) {
068        super(new jmri.jmrix.can.CanSystemConnectionMemo(prefix));
069        option1Name = "Protocol"; // NOI18N
070        options.put(option1Name, new Option(Bundle.getMessage("ConnectionProtocol"),
071                jmri.jmrix.can.ConfigurationManager.getSystemOptions()));
072        this.manufacturerName = jmri.jmrix.merg.MergConnectionTypeList.MERG;
073        allowConnectionRecovery = true;
074        flowControl = flow;
075    }
076
077    /**
078     * {@inheritDoc}
079     */
080    @Override
081    public String openPort(String portName, String appName) {
082        // open the port, check ability to set moderators
083        try {
084            // get and open the primary port
085            CommPortIdentifier portID = CommPortIdentifier.getPortIdentifier(portName);
086            try {
087                activeSerialPort = (SerialPort) portID.open(appName, 2000);  // name of program, msec to wait
088            } catch (PortInUseException p) {
089                return handlePortBusy(p, portName, log);
090            }
091
092            // try to set it for communication via SerialDriver
093            try {
094                // find the baud rate value, configure comm options
095                int baud = currentBaudNumber(mBaudRate);
096                activeSerialPort.setSerialPortParams(baud, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
097            } catch (UnsupportedCommOperationException e) {
098                log.error("Cannot set serial parameters on port {}: {}", portName, e.getMessage());
099                return "Cannot set serial parameters on port " + portName + ": " + e.getMessage();
100            }
101
102            // Set requested flow control
103            configureLeadsAndFlowControl(activeSerialPort, flowControl);
104            activeSerialPort.enableReceiveTimeout(50);  // 50 mSec timeout before sending chars
105
106            // set timeout
107            // activeSerialPort.enableReceiveTimeout(1000);
108            log.debug("Serial timeout was observed as: {} {}",
109                    activeSerialPort.getReceiveTimeout(),
110                    activeSerialPort.isReceiveTimeoutEnabled());
111
112            // get and save stream
113            serialStream = activeSerialPort.getInputStream();
114
115            // purge contents, if any
116            purgeStream(serialStream);
117
118            // report status?
119            if (log.isInfoEnabled()) {
120                log.info("{} port opened at {} baud, sees  DTR: {} RTS: {} DSR: {} CTS: {}  CD: {}", portName, activeSerialPort.getBaudRate(), activeSerialPort.isDTR(), activeSerialPort.isRTS(), activeSerialPort.isDSR(), activeSerialPort.isCTS(), activeSerialPort.isCD());
121            }
122
123            opened = true;
124
125        } catch (NoSuchPortException p) {
126            return handlePortNotFound(p, portName, log);
127        } catch (UnsupportedCommOperationException | IOException ex) {
128            log.error("Unexpected exception while opening port {}", portName, ex);
129            return "Unexpected error while opening port " + portName + ": " + ex;
130        }
131
132        return null; // indicates OK return
133    }
134
135    /**
136     * Set up all of the other objects to operate with a CAN RS adapter
137     * connected to this port.
138     * {@inheritDoc}
139     */
140    @Override
141    public void configure() {
142        // Register the CAN traffic controller being used for this connection
143        //GcTrafficController.instance();
144        TrafficController tc = makeGcTrafficController();
145        this.getSystemConnectionMemo().setTrafficController(tc);
146
147        // Now connect to the traffic controller
148        log.debug("Connecting port");
149        tc.connectPort(this);
150
151        this.getSystemConnectionMemo().setProtocol(getOptionState(option1Name));
152
153        // do central protocol-specific configuration
154        //jmri.jmrix.can.ConfigurationManager.configure(getOptionState(option1Name));
155        this.getSystemConnectionMemo().configureManagers();
156    }
157    
158    /**
159     * {@inheritDoc}
160     * Reconnects to Traffic Controller.
161     * Updates connection status.
162     */
163    @Override
164    protected void resetupConnection() {
165        if (!getSystemConnectionMemo().getTrafficController().status()) {
166            getSystemConnectionMemo().getTrafficController().connectPort(this);
167            ConnectionStatus.instance().setConnectionState(getUserName(), getCurrentPortName(), 
168                ((getSystemConnectionMemo().getTrafficController().status() && status()) ? ConnectionStatus.CONNECTION_UP : ConnectionStatus.CONNECTION_DOWN));
169        }
170    }
171    
172    /**
173     * {@inheritDoc}
174     * 
175     * Closes serial streams.
176     */
177    @Override
178    protected void closeConnection(){
179        log.info("Closing connection {}.",getCurrentPortName());        
180        try {
181            if (serialStream!=null) {
182                serialStream.close();
183            }
184            serialStream = null;
185            if (bufferedStream!=null) {
186                bufferedStream.close();
187            }
188            bufferedStream = null;
189        }
190        catch ( IOException e ) {
191            log.error("unable to close {}",this.activeSerialPort.getName());
192        }
193        if (activeSerialPort!=null) {
194            activeSerialPort.close();
195        }
196        activeSerialPort = null;
197    }
198
199    /**
200     * Make a new GC Traffic Controller.
201     * @return new GridConnect Traffic Controller.
202     */
203    protected GcTrafficController makeGcTrafficController() {
204        return new GcTrafficController();
205    }
206
207    /**
208     * Helper class wrapping the input serial port's InputStream.
209     * <p>
210     * It starts a helper thread at high priority that reads the input serial 
211     * port as fast as it can, buffering all incoming data in memory in a queue.
212     * <p>
213     * The queue is unbounded and readers will get the data from the queue.
214     * <p>
215     * This class is thread-safe.
216     */
217    private static class AsyncBufferInputStream extends FilterInputStream {
218
219        private boolean active;
220        
221        /**
222         * Create new AsyncBufferInputStream.
223         * @param inputStream Input Stream.
224         * @param portName Port Name.
225         */
226        AsyncBufferInputStream(InputStream inputStream, String portName) {
227            super(inputStream);
228            this.portName = portName;
229            active = true;
230            Thread rt = jmri.util.ThreadingUtil.newThread(this::readThreadBody);
231            rt.setName("GcSerialPort InputBufferThread " + portName);
232            rt.setDaemon(true);
233            rt.setPriority(Thread.MAX_PRIORITY);
234            rt.start();
235        }
236
237        /**
238         * Helper function that tries to perform a read from the underlying port
239         * with a given maximum length.
240         *
241         * @param len how many bytes to request from the port. Setting this to 1
242         *            will apparently block the thread if there are zero bytes
243         *            available.
244         * @return a block of data read, or nullptr if fatal IO errors make
245         *         further use of this port impossible.
246         */
247        private BufferEntry tryRead(int len) {
248            BufferEntry tail = new BufferEntry();
249            try {
250                tail.data = new byte[len];
251                tail.len = in.read(tail.data, 0, len);
252                errorCount = 0;
253            } catch (IOException e) {
254                tail.e = e;
255                if (++errorCount > MAX_IO_ERRORS_TO_ABORT) {
256                    log.error("Closing read thread due to too many IO errors", e);
257                    return null;
258                } else {
259                    log.warn("Error reading serial port {}", portName, e);
260                }
261            } 
262            catch (purejavacomm.PureJavaIllegalStateException e) {
263                log.error("PureJavaIllegalStateException Illegal State, closing read thread.");
264                return null;
265            }
266            return tail;
267        }
268
269        /**
270         * Implementation of the buffering thread.
271         */
272        private void readThreadBody() {
273            BufferEntry tail;
274            while (active) {
275                // Try to read one byte to block the thread.
276                tail = tryRead(1);
277                if (tail == null) {
278                    return;
279                }
280                // NOTE: in order to reuse this class in a generic context, we need to add support
281                // for the underlying input stream persistently returning EOF. That does not
282                // happen on a serial port.
283                if (tail.len > 0 || tail.e != null) {
284                    readAhead.add(tail);
285                } else {
286                    continue;
287                }
288                // Read as many bytes as we have in large increments. REading 128 bytes is a good
289                // compromise between throughput (4 gridconnect packets per kernel IO) but not
290                // wasting a lot of memory if less data actually shows up.
291                do {
292                    tail = tryRead(128);
293                    if (tail == null) {
294                        return;
295                    }
296                    if (tail.len > 0 || tail.e != null) {
297                        readAhead.add(tail);
298                    } else {
299                        break;
300                    }
301                } while (true);
302            }
303        }
304
305        /**
306         * We queue objects of this class between the read thread and the actual
307         * read() methods.
308         */
309        private static class BufferEntry {
310
311            // data payload
312            byte[] data;
313            // how many bytes of data are filled in
314            int len = 0;
315            // an exception was caught reading the input stream
316            IOException e = null;
317        }
318
319        /**
320         * {@inheritDoc}
321         */
322        @Override
323        public int read() throws IOException {
324            throw new UnsupportedOperationException();
325        }
326
327        /**
328         * {@inheritDoc}
329         */
330        @Override
331        public int read(byte[] bytes) throws IOException {
332            throw new UnsupportedOperationException();
333        }
334
335        /**
336         * {@inheritDoc}
337         */
338        @Override
339        public synchronized int read(byte[] bytes, int skip, int len) throws IOException {
340            if (skip != 0) {
341                throw new UnsupportedOperationException();
342            }
343            if (head == null || headOfs >= head.len) {
344                try {
345                    head = readAhead.take();
346                } catch (InterruptedException e) {
347                    Thread.currentThread().interrupt();
348                }
349                if (head.e != null) {
350                    throw head.e;
351                }
352                headOfs = 0;
353                if (head.len < 0) {
354                    return -1;
355                }
356            }
357            int cp = head.len - headOfs;
358            if (cp > len) {
359                cp = len;
360            }
361            System.arraycopy(head.data, headOfs, bytes, 0, cp);
362            headOfs += cp;
363            return cp;
364        }
365
366        private final String portName;
367        // After this many consecutive read attempts resulting in an exception we will terminate
368        // the read thread and return the last exception to the reader.
369        private final static int MAX_IO_ERRORS_TO_ABORT = 10;
370        // Queue holding the buffered data.
371        private final BlockingQueue<BufferEntry> readAhead = new LinkedBlockingQueue<>();
372        // The last entry we got from the queue if there are still bytes we need to return from it.
373        BufferEntry head = null;
374        // Offset of next live byte in head.
375        int headOfs = 0;
376        // How many of the last consecutive read attempts have resulted in an exception.
377        int errorCount = 0;
378        
379        @Override
380        public void close() throws IOException {
381            active = false;
382            super.close();
383        }
384    }
385
386    /**
387     * Base class methods for the PortController interface.
388     * {@inheritDoc}
389     */
390    @Override
391    public DataInputStream getInputStream() {
392        if (!opened) {
393            log.error("getInputStream called before load(), stream not available");
394            return null;
395        }
396        synchronized (this) {
397            if (bufferedStream == null) {
398                bufferedStream = new AsyncBufferInputStream(serialStream, activeSerialPort.getName());
399            }
400            return new DataInputStream(bufferedStream);
401        }
402    }
403
404    /**
405     * {@inheritDoc}
406     */
407    @Override
408    public DataOutputStream getOutputStream() {
409        if (!opened) {
410            log.error("getOutputStream called before load(), stream not available");
411        }
412        try {
413            return new DataOutputStream(activeSerialPort.getOutputStream());
414        } catch (java.io.IOException e) {
415            log.error("getOutputStream exception: {}", e.getMessage());
416        }
417        return null;
418    }
419
420    /**
421     * {@inheritDoc}
422     *
423     * @return array of localized valid baud rates
424     */
425    @Override
426    public String[] validBaudRates() {
427        return new String[]{Bundle.getMessage("Baud57600"),
428                Bundle.getMessage("Baud115200"), Bundle.getMessage("Baud230400"),
429                Bundle.getMessage("Baud250000"), Bundle.getMessage("Baud333333"),
430                Bundle.getMessage("Baud460800")};
431    }
432
433    /**
434     * Get an array of valid baud rates.
435     *
436     * @return valid baud rates
437     */
438    @Override
439    public int[] validBaudNumbers() {
440        return new int[]{57600, 115200, 230400, 250000, 333333, 460800};
441    }
442
443    /**
444     * {@inheritDoc}
445     */
446    @Override
447    public int defaultBaudIndex() {
448        return 0;
449    }
450
451    /**
452     * Migration method.
453     * @return array of valid baud numbers.
454     * @deprecated since 4.16
455     */
456    @Deprecated
457    public int[] validBaudValues() {
458        return validBaudNumbers();
459    }
460
461    // private control members
462    InputStream serialStream = null;
463    // Stream wrapper that buffers the input bytes.
464    private InputStream bufferedStream = null;
465
466    private final static Logger log = LoggerFactory.getLogger(GcSerialDriverAdapter.class);
467
468}