001package jmri.jmrix.dccpp.dccppovertcp; 002 003import java.io.BufferedReader; 004import java.io.IOException; 005import java.io.InputStreamReader; 006import java.io.OutputStream; 007import java.net.Socket; 008import java.util.LinkedList; 009import jmri.InstanceManager; 010import jmri.jmrix.dccpp.DCCppListener; 011import jmri.jmrix.dccpp.DCCppMessage; 012import jmri.jmrix.dccpp.DCCppReply; 013import jmri.jmrix.dccpp.DCCppSystemConnectionMemo; 014import org.slf4j.Logger; 015import org.slf4j.LoggerFactory; 016 017import javax.annotation.concurrent.GuardedBy; 018 019/** 020 * Implementation of the DCCppOverTcp LbServer Server Protocol. 021 * 022 * @author Alex Shepherd Copyright (C) 2006 023 * @author Mark Underwood Copyright (C) 2015 024 */ 025public final class ClientRxHandler extends Thread implements DCCppListener { 026 027 Socket clientSocket; 028 BufferedReader inStream; 029 OutputStream outStream; 030 @GuardedBy ("replyQueue") 031 final LinkedList<DCCppReply> replyQueue = new LinkedList<>(); // Init before Rx and Tx 032 033 Thread txThread; 034 String inString; 035 String remoteAddress; 036 DCCppMessage lastSentMessage; 037 private static final String oldSendPrefix = "SEND"; // lack of space is correct for legacy code 038 private static final String oldReceivePrefix = "RECEIVE "; // presence of space is correct for legacy code 039 private static final String sendPrefix = "<"; 040 private static final String oldServerVersionString = "VERSION JMRI Server "; // CAREFUL: Changing this could break backward compatibility 041 private static final String newServerVersionString = "VERSION DCC++ Server "; 042 boolean useOldPrefix = false; 043 044 public ClientRxHandler(String newRemoteAddress, Socket newSocket) { 045 clientSocket = newSocket; 046 setDaemon(true); 047 setPriority(Thread.MAX_PRIORITY); 048 remoteAddress = newRemoteAddress; 049 setName("ClientRxHandler:" + remoteAddress); 050 lastSentMessage = null; 051 start(); 052 } 053 054 @Override 055 public void run() { 056 057 DCCppSystemConnectionMemo memo = InstanceManager.getDefault(DCCppSystemConnectionMemo.class); 058 059 try { 060 txThread = new Thread(new ClientTxHandler(this)); 061 txThread.setDaemon(true); 062 txThread.setPriority(Thread.MAX_PRIORITY); 063 txThread.setName("ClientTxHandler:" + remoteAddress); 064 065 inStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 066 outStream = clientSocket.getOutputStream(); 067 068 memo.getDCCppTrafficController().addDCCppListener(~0, this); 069 070 txThread.start(); 071 072 while (!isInterrupted()) { 073 inString = inStream.readLine(); 074 if (inString == null) { 075 log.debug("ClientRxHandler: Remote Connection Closed"); 076 interrupt(); 077 } else { 078 log.debug("ClientRxHandler: Received: {}", inString); 079 080 // Check for the old server version string. If present, 081 // append the old-style prefixes to transmissions. 082 // Not sure this ever happens. Only the client sends 083 // the version string. 084 if (inString.startsWith(oldServerVersionString)) { 085 useOldPrefix = true; 086 } 087 // Legacy support: If the old prefix is there, delete it. 088 // Also, set the flag so we will start sending old-style 089 // prefixes. 090 if (inString.startsWith(oldSendPrefix)) { 091 useOldPrefix = true; 092 final int trim = oldSendPrefix.length(); 093 inString = inString.substring(trim); 094 log.debug("Adapted String: {}", inString); 095 } 096 // Check for the opening bracket 097 if (!inString.startsWith(sendPrefix)) { 098 log.debug("Invalid packet format: {}", inString); 099 continue; 100 } 101 102 // BUG FIX: Incoming DCCppOverTCP messages are already formatted for DCC++ and don't 103 // need to be parsed. Indeed, trying to parse them will screw them up. 104 // So instead, we de-deprecated the string constructor so that we can 105 // directly create a DCCppMessage from the incoming string without translation/parsing. 106 DCCppMessage msg = new DCCppMessage(inString.substring(inString.indexOf('<') + 1, 107 inString.lastIndexOf('>'))); 108 if (!msg.isValidMessageFormat()) { 109 log.warn("Unknown Message Format '{}', forwarding anyway", msg); 110// continue; 111 } 112 113 memo.getDCCppTrafficController().sendDCCppMessage(msg, null); 114 // Keep the message we just sent so we can ACK it when we hear 115 // the echo from the LocoBuffer 116 lastSentMessage = msg; 117 } 118 } 119 } catch (IOException ex) { 120 log.debug("ClientRxHandler: IO Exception: ", ex); 121 } 122 123 memo.getDCCppTrafficController().removeDCCppListener(~0, this); 124 txThread.interrupt(); 125 126 txThread = null; 127 inStream = null; 128 outStream = null; 129 synchronized (replyQueue) { 130 replyQueue.clear(); 131 } 132 133 try { 134 clientSocket.close(); 135 } catch (IOException ex1) { 136 log.trace("Exception while closing client socket",ex1); 137 } 138 139 InstanceManager.getDefault(Server.class).removeClient(this); 140 log.info("ClientRxHandler: Exiting"); 141 } 142 143 public void close() { 144 try { 145 clientSocket.close(); 146 } catch (IOException ex1) { 147 log.error("close, which closing clientSocket", ex1); 148 } 149 } 150 151 class ClientTxHandler implements Runnable { 152 153 DCCppReply msg; 154 StringBuilder outBuf; 155 Thread parentThread; 156 157 ClientTxHandler(Thread creator) { 158 parentThread = creator; 159 } 160 161 @Override 162 public void run() { 163 164 try { 165 outBuf = new StringBuilder(newServerVersionString); 166 outBuf.append(jmri.Version.name()).append("\r\n"); 167 outStream.write(outBuf.toString().getBytes()); 168 169 while (!isInterrupted()) { 170 msg = null; 171 172 synchronized (replyQueue) { 173 if (replyQueue.isEmpty()) { 174 replyQueue.wait(); 175 } 176 177 if (!replyQueue.isEmpty()) { 178 msg = replyQueue.removeFirst(); 179 log.debug("Prepping to send message: {}", msg); 180 } 181 } 182 183 if (msg != null) { 184 outBuf.setLength(0); 185 if (useOldPrefix) { 186 outBuf.append(oldReceivePrefix); 187 } 188 outBuf.append("<"); 189 outBuf.append(msg.toString()); 190 outBuf.append(">"); 191 log.debug("ClientTxHandler: Send: {}", outBuf); 192 outBuf.append("\r\n"); 193 outStream.write(outBuf.toString().getBytes()); 194 outStream.flush(); 195 } 196 } 197 } catch (IOException ex) { 198 log.error("ClientTxHandler: IO Exception"); 199 } catch (InterruptedException ex) { 200 Thread.currentThread().interrupt(); // retain if needed later 201 log.debug("ClientTxHandler: Interrupted Exception"); 202 } 203 // Interrupt the Parent to let it know we are exiting for some reason 204 parentThread.interrupt(); 205 206 parentThread = null; 207 msg = null; 208 outBuf = null; 209 log.info("ClientTxHandler: Exiting"); 210 } 211 } 212 213 @Override 214 public void message(DCCppMessage msg) { 215 // no need to handle outgoing messages 216 } 217 218 @Override 219 public void message(DCCppReply msg) { 220 synchronized (replyQueue) { 221 replyQueue.add(msg); 222 replyQueue.notifyAll(); 223 } 224 log.debug("Message added to queue: {}", msg); 225 } 226 227 @Override 228 public void notifyTimeout(DCCppMessage m) { 229 // ToDo : handle timeouts 230 } 231 232 private static final Logger log = LoggerFactory.getLogger(ClientRxHandler.class); 233 234}