001package jmri.jmrix.mqtt; 002 003import java.io.IOException; 004import java.util.*; 005 006import javax.annotation.Nonnull; 007 008import org.apiguardian.api.API; 009import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 010import org.eclipse.paho.client.mqttv3.MqttCallback; 011import org.eclipse.paho.client.mqttv3.MqttClient; 012import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 013import org.eclipse.paho.client.mqttv3.MqttException; 014import org.eclipse.paho.client.mqttv3.MqttMessage; 015import org.eclipse.paho.client.mqttv3.MqttTopic; 016import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; 017 018import org.slf4j.Logger; 019import org.slf4j.LoggerFactory; 020 021/** 022 * Communications adapter for Mqtt communications links. 023 * 024 * @author Lionel Jeanson 025 * @author Bob Jacobsen Copyright (c) 2019, 2023 026 */ 027@API(status=API.Status.MAINTAINED) 028public class MqttAdapter extends jmri.jmrix.AbstractNetworkPortController implements MqttCallback { 029 030 private final static String PROTOCOL = "tcp://"; 031 private final static String DEFAULT_BASETOPIC = Bundle.getMessage("TopicBase"); 032 033 // 0.1 to get it to the front of the list 034 private final static String MQTT_USERNAME_OPTION = "0.1"; 035 036 // 0.2 to get it to the front of the list 037 private final static String MQTT_PASSWORD_OPTION = "0.2"; 038 039 public boolean retained = true; // public for script access 040 public int qosflag = 2; // public for script access 041 042 /** 043 * Otherwise known as "Channel", this is prepended to the 044 * topic for all JMRI inward and outward communications. 045 * Typically set by preferences at startup. Changing it 046 * after startup might have no or bad effect. 047 */ 048 @API(status=API.Status.MAINTAINED) 049 public String baseTopic = DEFAULT_BASETOPIC; 050 051 HashMap<String, ArrayList<MqttEventListener>> mqttEventListeners = new HashMap<>(); 052 053 MqttClient mqttClient; 054 055 @API(status=API.Status.INTERNAL) 056 public MqttAdapter() { 057 super(new MqttSystemConnectionMemo()); 058 log.debug("Doing ctor..."); 059 060 options.put(MQTT_USERNAME_OPTION, new Option(Bundle.getMessage("MQTT_Username"), 061 new String[]{""}, Option.Type.TEXT)); 062 063 options.put(MQTT_PASSWORD_OPTION, new Option(Bundle.getMessage("MQTT_Password"), 064 new String[]{""}, Option.Type.PASSWORD)); 065 066 option2Name = "0 MQTTchannel"; // 0 to get it to the front of the list 067 options.put(option2Name, new Option(Bundle.getMessage("NameTopicBase"), 068 new String[]{baseTopic}, Option.Type.TEXT)); 069 070 options.put("10.3", new Option(Bundle.getMessage("NameTopicTurnoutSend"), 071 new String[]{Bundle.getMessage("TopicTurnoutSend")}, Option.Type.TEXT)); 072 options.put("10.5", new Option(Bundle.getMessage("NameTopicTurnoutRcv"), 073 new String[]{Bundle.getMessage("TopicTurnoutRcv")}, Option.Type.TEXT)); 074 075 076 options.put("11.3", new Option(Bundle.getMessage("NameTopicSensorSend"), 077 new String[]{Bundle.getMessage("TopicSensorSend")}, Option.Type.TEXT)); 078 options.put("11.5", new Option(Bundle.getMessage("NameTopicSensorRcv"), 079 new String[]{Bundle.getMessage("TopicSensorRcv")}, Option.Type.TEXT)); 080 081 options.put("12.3", new Option(Bundle.getMessage("NameTopicLightSend"), 082 new String[]{Bundle.getMessage("TopicLightSend")}, Option.Type.TEXT)); 083 options.put("12.5", new Option(Bundle.getMessage("NameTopicLightRcv"), 084 new String[]{Bundle.getMessage("TopicLightRcv")}, Option.Type.TEXT)); 085 086 options.put("13", new Option("Reporter topic :", new String[]{Bundle.getMessage("TopicReporter")}, Option.Type.TEXT)); 087 options.put("14", new Option("Signal Head topic :", new String[]{Bundle.getMessage("TopicSignalHead")}, Option.Type.TEXT)); 088 options.put("15", new Option("Signal Mast topic :", new String[]{Bundle.getMessage("TopicSignalMast")}, Option.Type.TEXT)); 089 options.put("16.3", new Option(Bundle.getMessage("NameTopicThrottleSend"), 090 new String[]{Bundle.getMessage("TopicThrottleSend")}, Option.Type.TEXT)); 091 options.put("16.5", new Option(Bundle.getMessage("NameTopicThrottleRcv"), 092 new String[]{Bundle.getMessage("TopicThrottleRcv")}, Option.Type.TEXT)); 093 options.put("17.3", new Option(Bundle.getMessage("NameTopicDirectionSend"), 094 new String[]{Bundle.getMessage("TopicDirectionSend")}, Option.Type.TEXT)); 095 options.put("17.5", new Option(Bundle.getMessage("NameTopicDirectionRcv"), 096 new String[]{Bundle.getMessage("TopicDirectionRcv")}, Option.Type.TEXT)); 097 options.put("18.3", new Option(Bundle.getMessage("NameTopicFunctionSend"), 098 new String[]{Bundle.getMessage("TopicFunctionSend")}, Option.Type.TEXT)); 099 options.put("18.5", new Option(Bundle.getMessage("NameTopicFunctionRcv"), 100 new String[]{Bundle.getMessage("TopicFunctionRcv")}, Option.Type.TEXT)); 101 options.put("19.3", new Option(Bundle.getMessage("NameTopicConsistSend"), 102 new String[]{Bundle.getMessage("TopicConsistSend")}, Option.Type.TEXT)); 103 options.put("20.3", new Option(Bundle.getMessage("NameTopicPowerSend"), 104 new String[]{Bundle.getMessage("TopicPowerSend")}, Option.Type.TEXT)); 105 options.put("20.5", new Option(Bundle.getMessage("NameTopicPowerRcv"), 106 new String[]{Bundle.getMessage("TopicPowerRcv")}, Option.Type.TEXT)); 107 108 options.put("LastWillTopic", new Option(Bundle.getMessage("NameTopicLastWill"), 109 new String[]{Bundle.getMessage("TopicLastWill")}, Option.Type.TEXT)); 110 options.put("LastWillMessage", new Option(Bundle.getMessage("NameMessageLastWill"), 111 new String[]{Bundle.getMessage("MessageLastWill")}, Option.Type.TEXT)); 112 allowConnectionRecovery = true; 113 114 } 115 116 public MqttConnectOptions getMqttConnectionOptions() { 117 118 // Setup the MQTT Connection Options 119 MqttConnectOptions mqttConnOpts = new MqttConnectOptions(); 120 mqttConnOpts.setCleanSession(true); 121 if ( getOptionState(MQTT_USERNAME_OPTION) != null 122 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) { 123 mqttConnOpts.setUserName(getOptionState(MQTT_USERNAME_OPTION)); 124 mqttConnOpts.setPassword(getOptionState(MQTT_PASSWORD_OPTION).toCharArray()); 125 } 126 127 //set Last Will 128 if (! getOptionState("LastWillTopic").isEmpty() 129 && ! getOptionState("LastWillMessage").isEmpty()) { 130 mqttConnOpts.setWill(baseTopic + getOptionState("LastWillTopic"), 131 getOptionState("LastWillMessage").getBytes(), 132 qosflag, 133 true); 134 } 135 136 return mqttConnOpts; 137 } 138 139 @Override 140 @API(status=API.Status.INTERNAL) 141 public void configure() { 142 log.debug("Doing configure..."); 143 mqttEventListeners = new HashMap<>(); 144 getSystemConnectionMemo().setMqttAdapter(this); 145 getSystemConnectionMemo().configureManagers(); 146 } 147 148 @Override 149 @API(status=API.Status.INTERNAL) 150 public void connect() throws IOException { 151 log.info("MQTT starting connect with MQTTchannel = \"{}\"", getOptionState(option2Name)); 152 153 try { 154 if ( getOptionState(option2Name)!= null && ! getOptionState(option2Name).trim().isEmpty()) { 155 baseTopic = getOptionState(option2Name); 156 } 157 158 // have to make that a valid choice, overriding the original above. This 159 // is ugly and temporary. 160 if (! DEFAULT_BASETOPIC.equals(baseTopic)) { 161 options.put(option2Name, new Option("MQTT channel: ", new String[]{baseTopic, DEFAULT_BASETOPIC})); 162 } 163 164 // generate a unique client ID based on the network ID and the system prefix of the MQTT connection. 165 String clientID = jmri.InstanceManager.getDefault(jmri.web.server.WebServerPreferences.class).getRailroadName(); 166 167 // ensure that only guaranteed valid characters are included in the client ID 168 clientID = clientID.replaceAll("[^A-Za-z0-9]", ""); 169 170 String clientIDsuffix = "JMRI" + Integer.toHexString(jmri.util.node.NodeIdentity.networkIdentity().hashCode()) .toUpperCase() + getSystemPrefix(); 171 172 // Trim railroad name to fit within MQTT client id 23 character limit. 173 if (clientID.length() > 23 - clientIDsuffix.length()) 174 clientID = clientID.substring(0,23 - clientIDsuffix.length()); 175 176 clientID = clientID + clientIDsuffix; 177 178 log.info("Connection {} is using a clientID of \"{}\"", getSystemPrefix(), clientID); 179 180 String tempdirName = jmri.util.FileUtil.getExternalFilename(jmri.util.FileUtil.PROFILE); 181 log.debug("will use {} as temporary directory", tempdirName); 182 183 mqttClient = getNewMqttClient(clientID, tempdirName); 184 185 if ((getOptionState(MQTT_USERNAME_OPTION) != null 186 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) 187 || ( ! getOptionState("LastWillTopic").isEmpty() 188 && ! getOptionState("LastWillMessage").isEmpty())) { 189 mqttClient.connect(getMqttConnectionOptions()); 190 191 } else { 192 mqttClient.connect(); 193 } 194 195 if ( ! getOptionState("LastWillTopic").isEmpty()) { 196 publish(getOptionState("LastWillTopic"), ""); 197 } 198 199 mqttClient.setCallback(this); 200 201 } catch (MqttException ex) { 202 throw new IOException("Can't create MQTT client", ex); 203 } 204 } 205 206 MqttClient getNewMqttClient(String clientID, String tempdirName) throws MqttException { 207 return new MqttClient(PROTOCOL + getCurrentPortName(), 208 clientID, new MqttDefaultFilePersistence(tempdirName)); 209 } 210 211 @Override 212 @API(status=API.Status.MAINTAINED) 213 public MqttSystemConnectionMemo getSystemConnectionMemo() { 214 return (MqttSystemConnectionMemo) super.getSystemConnectionMemo(); 215 } 216 217 @API(status=API.Status.MAINTAINED) 218 public void subscribe(String topic, MqttEventListener mel) { 219 if (mqttEventListeners == null || mqttClient == null) { 220 jmri.util.LoggingUtil.warnOnce(log, "Trying to subscribe before connect/configure is done"); 221 return; 222 } 223 try { 224 String fullTopic = baseTopic + topic; 225 if (mqttEventListeners.containsKey(fullTopic)) { 226 if (!mqttEventListeners.get(fullTopic).contains(mel)) { 227 mqttEventListeners.get(fullTopic).add(mel); 228 } 229 return; 230 } 231 ArrayList<MqttEventListener> mels = new ArrayList<>(); 232 mels.add(mel); 233 mqttEventListeners.put(fullTopic, mels); 234 mqttClient.subscribe(fullTopic); 235 log.debug("Subscribed : \"{}\"", fullTopic); 236 } catch (MqttException ex) { 237 log.error("Can't subscribe : ", ex); 238 } 239 } 240 241 @API(status=API.Status.MAINTAINED) 242 public void unsubscribe(String topic, MqttEventListener mel) { 243 String fullTopic = baseTopic + topic; 244 if (mqttEventListeners == null || mqttClient == null) { 245 jmri.util.LoggingUtil.warnOnce(log, "Trying to unsubscribe before connect/configure is done"); 246 return; 247 } 248 try { 249 mqttEventListeners.get(fullTopic).remove(mel); 250 } catch (NullPointerException e) { 251 // Not subscribed 252 log.debug("Unsubscribe but not subscribed: \"{}\"", fullTopic); 253 return; 254 } 255 if (mqttEventListeners.get(fullTopic).isEmpty()) { 256 try { 257 mqttClient.unsubscribe(fullTopic); 258 mqttEventListeners.remove(fullTopic); 259 log.debug("Unsubscribed : \"{}\"", fullTopic); 260 } catch (MqttException ex) { 261 log.error("Can't unsubscribe : ", ex); 262 } 263 } 264 } 265 266 @API(status=API.Status.MAINTAINED) 267 public void unsubscribeall(MqttEventListener mel) { 268 mqttEventListeners.keySet().forEach((t) -> { 269 unsubscribe(t, mel); 270 }); 271 } 272 273 /** 274 * Send a message over the existing link to a broker. 275 * @param topic The topic, which follows the channel and precedes the payload in the message 276 * @param payload The payload makes up the final part of the message 277 */ 278 @API(status=API.Status.MAINTAINED) 279 public void publish(@Nonnull String topic, @Nonnull byte[] payload) { 280 try { 281 String fullTopic = baseTopic + topic; 282 mqttClient.publish(fullTopic, payload, qosflag, retained); 283 } catch (MqttException ex) { 284 log.error("Can't publish : ", ex); 285 } 286 } 287 288 /** 289 * Send a message over the existing link to a broker. 290 * @param topic The topic, which follows the channel and precedes the payload in the message 291 * @param payload The payload makes up the final part of the message 292 */ 293 @API(status=API.Status.MAINTAINED) 294 public void publish(@Nonnull String topic, @Nonnull String payload) { 295 publish(topic, payload.getBytes()); 296 } 297 298 public MqttClient getMQttClient() { 299 return (mqttClient); 300 } 301 302 private void tryToReconnect(boolean showLogMessages) { 303 if (showLogMessages) log.warn("Try to reconnect"); 304 try { 305 if ((getOptionState(MQTT_USERNAME_OPTION) != null 306 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) 307 || ( ! getOptionState("LastWillTopic").isEmpty() 308 && ! getOptionState("LastWillMessage").isEmpty())) { 309 mqttClient.connect(getMqttConnectionOptions()); 310 } else { 311 mqttClient.connect(); 312 } 313 314 if (! getOptionState("LastWillTopic").isEmpty()) { 315 publish(getOptionState("LastWillTopic"), ""); 316 } 317 318 log.warn("Succeeded to reconnect"); 319 320 mqttClient.setCallback(this); 321 Set<String> set = new HashSet<>(mqttEventListeners.keySet()); 322 for (String t : set) { 323 mqttClient.subscribe(t); 324 } 325 } catch (MqttException ex) { 326 if (showLogMessages) log.error("Unable to reconnect", ex); 327 scheduleReconnectTimer(false); 328 } 329 } 330 331 private void scheduleReconnectTimer(boolean showLogMessages) { 332 jmri.util.TimerUtil.scheduleOnLayoutThread(new java.util.TimerTask() { 333 @Override 334 public void run() { 335 tryToReconnect(showLogMessages); 336 } 337 }, 500); 338 } 339 340 @Override 341 @API(status=API.Status.INTERNAL) 342 public void connectionLost(Throwable thrwbl) { 343 log.warn("Lost MQTT broker connection..."); 344 if (this.allowConnectionRecovery) { 345 log.info("...trying to reconnect repeatedly"); 346 scheduleReconnectTimer(true); 347 return; 348 } 349 log.error("Won't reconnect"); 350 } 351 352 @Override 353 @API(status=API.Status.INTERNAL) 354 public void messageArrived(String topic, MqttMessage mm) throws Exception { 355 log.debug("Message received, topic : {} - '{}'", topic, mm); 356 357 boolean found = false; 358 Map<String,ArrayList<MqttEventListener>> tempMap 359 = new HashMap<> (mqttEventListeners); // Avoid ConcurrentModificationException 360 for (Map.Entry<String,ArrayList<MqttEventListener>> e : tempMap.entrySet()) { 361 // does key match received topic, including wildcards? 362 if (MqttTopic.isMatched(e.getKey(), topic) ) { 363 found = true; 364 e.getValue().forEach((mel) -> { 365 try { 366 mel.notifyMqttMessage(topic, mm.toString()); 367 } 368 catch (Exception exception) { 369 log.error("MqttEventListener exception: ", exception); 370 } 371 }); 372 } 373 } 374 375 if (!found) { 376 log.error("No one subscribed to {}", topic); 377 throw new Exception("No subscriber for MQTT topic " + topic); 378 } 379 } 380 381 @Override 382 @API(status=API.Status.INTERNAL) 383 public void deliveryComplete(IMqttDeliveryToken imdt) { 384 log.debug("Message delivered"); 385 } 386 387 388 @Override 389 protected void closeConnection(){ 390 log.debug("Closing MqttAdapter"); 391 try { 392 mqttClient.disconnect(); 393 } 394 catch (Exception exception) { 395 log.error("MqttEventListener exception: ", exception); 396 } 397 398 } 399 400 @Override 401 public void dispose() { 402 log.debug("Disposing MqttAdapter"); 403 closeConnection(); 404 super.dispose(); 405 } 406 407 private final static Logger log = LoggerFactory.getLogger(MqttAdapter.class); 408 409}