001package jmri.jmrix.loconet.loconetovertcp;
002
003import java.io.BufferedReader;
004import java.io.IOException;
005import java.io.InputStreamReader;
006import java.io.OutputStream;
007import java.net.Socket;
008import java.util.LinkedList;
009import java.util.StringTokenizer;
010import jmri.jmrix.loconet.LnTrafficController;
011import jmri.jmrix.loconet.LocoNetListener;
012import jmri.jmrix.loconet.LocoNetMessage;
013import org.slf4j.Logger;
014import org.slf4j.LoggerFactory;
015
016/**
017 * Implementation of the LocoNetOverTcp LbServer Server Protocol.
018 *
019 * @author Alex Shepherd Copyright (C) 2006
020 */
021public final class ClientRxHandler extends Thread implements LocoNetListener {
022
023    Socket clientSocket;
024    BufferedReader inStream;
025    OutputStream outStream;
026    final LinkedList<LocoNetMessage> msgQueue = new LinkedList<>();
027    volatile Thread txThread;
028    String inString;
029    String remoteAddress;
030    LocoNetMessage lastSentMessage;
031    LnTrafficController tc;
032
033    public ClientRxHandler(String newRemoteAddress, Socket newSocket, LnTrafficController _tc) {
034        tc = _tc;
035        clientSocket = newSocket;
036        setDaemon(true);
037        setPriority(Thread.MAX_PRIORITY);
038        remoteAddress = newRemoteAddress;
039        setName("ClientRxHandler:" + remoteAddress);
040        lastSentMessage = null;
041        start();
042    }
043
044    @Override
045    public void run() {
046
047        try {
048            inStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
049            outStream = clientSocket.getOutputStream();
050
051            tc.addLocoNetListener(~0, this);
052
053            txThread = new Thread(new ClientTxHandler(this));
054            txThread.setDaemon(true);
055            txThread.setPriority(Thread.MAX_PRIORITY);
056            txThread.setName("ClientTxHandler: " + remoteAddress);
057            txThread.start();
058
059            while (!isInterrupted()) {
060                inString = inStream.readLine();
061                if (inString == null) {
062                    log.debug("ClientRxHandler: Remote Connection Closed");
063                    interrupt();
064                } else {
065                    log.debug("ClientRxHandler: Received: {}", inString);
066
067                    StringTokenizer st = new StringTokenizer(inString);
068                    if (st.nextToken().equals("SEND")) {
069                        LocoNetMessage msg = null;
070                        int opCode = Integer.parseInt(st.nextToken(), 16);
071                        int byte2 = Integer.parseInt(st.nextToken(), 16);
072
073                        // Decide length
074                        switch ((opCode & 0x60) >> 5) {
075                            case 0: // 2 byte message
076
077                                msg = new LocoNetMessage(2);
078                                break;
079
080                            case 1: // 4 byte message
081
082                                msg = new LocoNetMessage(4);
083                                break;
084
085                            case 2: // 6 byte message
086
087                                msg = new LocoNetMessage(6);
088                                break;
089
090                            case 3: // N byte message
091
092                                if (byte2 < 2) {
093                                    log.error("ClientRxHandler: LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode));
094                                }
095                                msg = new LocoNetMessage(byte2);
096                                break;
097                            default:
098                                log.warn("Unhandled msg length: {}", (opCode & 0x60) >> 5);
099                                break;
100                        }
101                        if (msg == null) { // IDE may flag, spotbugs warns it can be null at this point, so keep this check
102                            log.error("msg is null!");
103                            return;
104                        }
105                        // message exists, now fill it
106                        msg.setOpCode(opCode);
107                        msg.setElement(1, byte2);
108                        int len = msg.getNumDataElements();
109                        //log.debug("len: "+len);
110
111                        for (int i = 2; i < len; i++) {
112                            int b = Integer.parseInt(st.nextToken(), 16);
113                            msg.setElement(i, b);
114                        }
115
116                        tc.sendLocoNetMessage(msg);
117                        // Keep the message we just sent so we can ACK it when we hear
118                        // the echo from the LocoBuffer
119                        lastSentMessage = msg;
120                    }
121                }
122            }
123        } catch (IOException ex) {
124            log.debug("ClientRxHandler: IO Exception: ", ex);
125        }
126        tc.removeLocoNetListener(~0, this);
127        if (txThread != null) txThread.interrupt();
128
129        txThread = null;
130        inStream = null;
131        outStream = null;
132        msgQueue.clear();
133
134        try {
135            clientSocket.close();
136        } catch (IOException ignore) {
137        }
138
139        LnTcpServer.getDefault().removeClient(this); // NPE here:
140        // log.info("ClientRxHandler: Exiting");
141    }
142
143    public void close() {
144        try {
145            clientSocket.close();
146        } catch (IOException ex1) {
147            log.error("close, which closing clientSocket", ex1);
148        }
149    }
150
151    class ClientTxHandler implements Runnable {
152
153        LocoNetMessage msg;
154        StringBuffer outBuf;
155        Thread parentThread;
156
157        ClientTxHandler(Thread creator) {
158            parentThread = creator;
159        }
160
161        @Override
162        public void run() {
163
164            try {
165                outBuf = new StringBuffer("VERSION JMRI Server ");
166                outBuf.append(jmri.Version.name()).append("\r\n");
167                outStream.write(outBuf.toString().getBytes());
168
169                while (!isInterrupted()) {
170                    msg = null;
171
172                    synchronized (msgQueue) {
173                        if (msgQueue.isEmpty()) {
174                            msgQueue.wait();
175                        }
176
177                        if (!msgQueue.isEmpty()) {
178                            msg = msgQueue.removeFirst();
179                        }
180                    }
181
182                    if (msg != null) {
183                        outBuf.setLength(0);
184                        outBuf.append("RECEIVE ");
185                        outBuf.append(msg.toString());
186                        log.debug("ClientTxHandler: Send: {}", outBuf.toString());
187                        outBuf.append("\r\n");
188                        // See if we are waiting for an echo of a sent message
189                        // and if it is append the Ack to the client
190                        if ((lastSentMessage != null) && lastSentMessage.equals(msg)) {
191                            lastSentMessage = null;
192                            outBuf.append("SENT OK\r\n");
193                        }
194                        outStream.write(outBuf.toString().getBytes());
195                        outStream.flush();
196                    }
197                }
198            } catch (IOException ex) {
199                log.error("ClientTxHandler: IO Exception");
200            } catch (InterruptedException ex) {
201                Thread.currentThread().interrupt(); // retain if needed later
202                log.debug("ClientTxHandler: Interrupted Exception");
203            }
204            // Interrupt the Parent to let it know we are exiting for some reason
205            parentThread.interrupt();
206
207            parentThread = null;
208            msg = null;
209            outBuf = null;
210            //log.info("ClientTxHandler: Exiting");
211        }
212    }
213
214    @Override
215    public void message(LocoNetMessage msg) {
216        synchronized (msgQueue) {
217            msgQueue.add(msg);
218            msgQueue.notify();
219        }
220    }
221
222    /**
223     * Kill this thread, usually for testing purposes
224     */
225    void dispose() {
226        try {
227            this.interrupt();
228            this.join();
229        } catch (InterruptedException ex) {
230            log.warn("dispose() interrupted");
231        }
232    }
233
234    private final static Logger log = LoggerFactory.getLogger(ClientRxHandler.class);
235
236}