001package jmri.jmrix.dccpp.dccppovertcp;
002
003import java.io.BufferedReader;
004import java.io.IOException;
005import java.io.InputStreamReader;
006import java.io.OutputStream;
007import java.net.Socket;
008import java.util.LinkedList;
009import jmri.InstanceManager;
010import jmri.jmrix.dccpp.DCCppListener;
011import jmri.jmrix.dccpp.DCCppMessage;
012import jmri.jmrix.dccpp.DCCppReply;
013import jmri.jmrix.dccpp.DCCppSystemConnectionMemo;
014import org.slf4j.Logger;
015import org.slf4j.LoggerFactory;
016
017import javax.annotation.concurrent.GuardedBy;
018
019/**
020 * Implementation of the DCCppOverTcp LbServer Server Protocol.
021 *
022 * @author Alex Shepherd Copyright (C) 2006
023 * @author Mark Underwood Copyright (C) 2015
024 */
025public final class ClientRxHandler extends Thread implements DCCppListener {
026
027    Socket clientSocket;
028    BufferedReader inStream;
029    OutputStream outStream;
030    @GuardedBy ("replyQueue")
031    final LinkedList<DCCppReply> replyQueue = new LinkedList<>(); // Init before Rx and Tx
032
033    Thread txThread;
034    String inString;
035    String remoteAddress;
036    DCCppMessage lastSentMessage;
037    private static final String oldSendPrefix = "SEND"; // lack of space is correct for legacy code
038    private static final String oldReceivePrefix = "RECEIVE "; // presence of space is correct for legacy code
039    private static final String sendPrefix = "<";
040    private static final String oldServerVersionString = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility
041    private static final String newServerVersionString = "VERSION DCC++ Server ";
042    boolean useOldPrefix = false;
043
044    public ClientRxHandler(String newRemoteAddress, Socket newSocket) {
045        clientSocket = newSocket;
046        setDaemon(true);
047        setPriority(Thread.MAX_PRIORITY);
048        remoteAddress = newRemoteAddress;
049        setName("ClientRxHandler:" + remoteAddress);
050        lastSentMessage = null;
051        start();
052    }
053
054    @Override
055    public void run() {
056
057        DCCppSystemConnectionMemo memo = InstanceManager.getDefault(DCCppSystemConnectionMemo.class);
058
059        try {
060            txThread = new Thread(new ClientTxHandler(this));
061            txThread.setDaemon(true);
062            txThread.setPriority(Thread.MAX_PRIORITY);
063            txThread.setName("ClientTxHandler:" + remoteAddress);
064
065            inStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
066            outStream = clientSocket.getOutputStream();
067
068            memo.getDCCppTrafficController().addDCCppListener(~0, this);
069
070            txThread.start();
071
072            while (!isInterrupted()) {
073                inString = inStream.readLine();
074                if (inString == null) {
075                    log.debug("ClientRxHandler: Remote Connection Closed");
076                    interrupt();
077                } else {
078                    log.debug("ClientRxHandler: Received: {}", inString);
079
080                    // Check for the old server version string.  If present,
081                    // append the old-style prefixes to transmissions.
082                    // Not sure this ever happens. Only the client sends
083                    // the version string.
084                    if (inString.startsWith(oldServerVersionString)) {
085                        useOldPrefix = true;
086                    }
087                    // Legacy support: If the old prefix is there, delete it.
088                    // Also, set the flag so we will start sending old-style
089                    // prefixes.
090                    if (inString.startsWith(oldSendPrefix)) {
091                        useOldPrefix = true;
092                        final int trim = oldSendPrefix.length();
093                        inString = inString.substring(trim);
094                        log.debug("Adapted String: {}", inString);
095                    }
096                    // Check for the opening bracket
097                    if (!inString.startsWith(sendPrefix)) {
098                        log.debug("Invalid packet format: {}", inString);
099                        continue;
100                    }
101
102                    // BUG FIX: Incoming DCCppOverTCP messages are already formatted for DCC++ and don't
103                    // need to be parsed. Indeed, trying to parse them will screw them up.
104                    // So instead, we de-deprecated the string constructor so that we can
105                    // directly create a DCCppMessage from the incoming string without translation/parsing.
106                    DCCppMessage msg = new DCCppMessage(inString.substring(inString.indexOf('<') + 1,
107                            inString.lastIndexOf('>')));
108                    if (!msg.isValidMessageFormat()) {
109                        log.warn("Unknown Message Format '{}', forwarding anyway", msg);
110//                        continue;
111                    }
112
113                    memo.getDCCppTrafficController().sendDCCppMessage(msg, null);
114                    // Keep the message we just sent so we can ACK it when we hear
115                    // the echo from the LocoBuffer
116                    lastSentMessage = msg;
117                }
118            }
119        } catch (IOException ex) {
120            log.debug("ClientRxHandler: IO Exception: ", ex);
121        }
122
123        memo.getDCCppTrafficController().removeDCCppListener(~0, this);
124        txThread.interrupt();
125
126        txThread = null;
127        inStream = null;
128        outStream = null;
129        synchronized (replyQueue) {
130            replyQueue.clear();
131        }
132
133        try {
134            clientSocket.close();
135        } catch (IOException ex1) {
136            log.trace("Exception while closing client socket",ex1);
137        }
138
139        InstanceManager.getDefault(Server.class).removeClient(this);
140        log.info("ClientRxHandler: Exiting");
141    }
142
143    public void close() {
144        try {
145            clientSocket.close();
146        } catch (IOException ex1) {
147            log.error("close, which closing clientSocket", ex1);
148        }
149    }
150
151    class ClientTxHandler implements Runnable {
152
153        DCCppReply msg;
154        StringBuilder outBuf;
155        Thread parentThread;
156
157        ClientTxHandler(Thread creator) {
158            parentThread = creator;
159        }
160
161        @Override
162        public void run() {
163
164            try {
165                outBuf = new StringBuilder(newServerVersionString);
166                outBuf.append(jmri.Version.name()).append("\r\n");
167                outStream.write(outBuf.toString().getBytes());
168
169                while (!isInterrupted()) {
170                    msg = null;
171
172                    synchronized (replyQueue) {
173                        if (replyQueue.isEmpty()) {
174                            replyQueue.wait();
175                        }
176
177                        if (!replyQueue.isEmpty()) {
178                            msg = replyQueue.removeFirst();
179                            log.debug("Prepping to send message: {}", msg);
180                        }
181                    }
182
183                    if (msg != null) {
184                        outBuf.setLength(0);
185                        if (useOldPrefix) {
186                            outBuf.append(oldReceivePrefix);
187                        }
188                        outBuf.append("<");
189                        outBuf.append(msg.toString());
190                        outBuf.append(">");
191                        log.debug("ClientTxHandler: Send: {}", outBuf);
192                        outBuf.append("\r\n");
193                        outStream.write(outBuf.toString().getBytes());
194                        outStream.flush();
195                    }
196                }
197            } catch (IOException ex) {
198                log.error("ClientTxHandler: IO Exception");
199            } catch (InterruptedException ex) {
200                Thread.currentThread().interrupt(); // retain if needed later
201                log.debug("ClientTxHandler: Interrupted Exception");
202            }
203            // Interrupt the Parent to let it know we are exiting for some reason
204            parentThread.interrupt();
205
206            parentThread = null;
207            msg = null;
208            outBuf = null;
209            log.info("ClientTxHandler: Exiting");
210        }
211    }
212
213    @Override
214    public void message(DCCppMessage msg) {
215        // no need to handle outgoing messages
216    }
217
218    @Override
219    public void message(DCCppReply msg) {
220        synchronized (replyQueue) {
221            replyQueue.add(msg);
222            replyQueue.notifyAll();
223        }
224        log.debug("Message added to queue: {}", msg);
225    }
226
227    @Override
228    public void notifyTimeout(DCCppMessage m) {
229        // ToDo : handle timeouts
230    }
231
232    private static final Logger log = LoggerFactory.getLogger(ClientRxHandler.class);
233
234}