001package jmri.jmrix.mqtt;
002
003import java.io.IOException;
004import java.util.*;
005
006import javax.annotation.Nonnull;
007
008import org.apiguardian.api.API;
009import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
010import org.eclipse.paho.client.mqttv3.MqttCallback;
011import org.eclipse.paho.client.mqttv3.MqttClient;
012import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
013import org.eclipse.paho.client.mqttv3.MqttException;
014import org.eclipse.paho.client.mqttv3.MqttMessage;
015import org.eclipse.paho.client.mqttv3.MqttTopic;
016import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
017
018import org.slf4j.Logger;
019import org.slf4j.LoggerFactory;
020
021/**
 * Communications adapter for MQTT communications links.
023 *
024 * @author Lionel Jeanson
025 * @author Bob Jacobsen   Copyright (c) 2019, 2023
026 */
027@API(status=API.Status.MAINTAINED)
028public class MqttAdapter extends jmri.jmrix.AbstractNetworkPortController implements MqttCallback {
029
030    private final static String PROTOCOL = "tcp://";
031    private final static String DEFAULT_BASETOPIC = Bundle.getMessage("TopicBase");
032
033    // 0.1 to get it to the front of the list
034    private final static String MQTT_USERNAME_OPTION = "0.1";
035
036    // 0.2 to get it to the front of the list
037    private final static String MQTT_PASSWORD_OPTION = "0.2";
038
039    public boolean retained = true;  // public for script access
040    public int      qosflag = 2;     // public for script access
041
042    /**
043     * Otherwise known as "Channel", this is prepended to the
044     * topic for all JMRI inward and outward communications.
045     * Typically set by preferences at startup.  Changing it
046     * after startup might have no or bad effect.
047     */
048    @API(status=API.Status.MAINTAINED)
049    public String baseTopic = DEFAULT_BASETOPIC;
050
051    HashMap<String, ArrayList<MqttEventListener>> mqttEventListeners = new HashMap<>();
052
053    MqttClient mqttClient;
054
055    @API(status=API.Status.INTERNAL)
056    public MqttAdapter() {
057        super(new MqttSystemConnectionMemo());
058        log.debug("Doing ctor...");
059
060        options.put(MQTT_USERNAME_OPTION, new Option(Bundle.getMessage("MQTT_Username"),
061                new String[]{""},  Option.Type.TEXT));
062
063        options.put(MQTT_PASSWORD_OPTION, new Option(Bundle.getMessage("MQTT_Password"),
064                new String[]{""},  Option.Type.PASSWORD));
065
066        option2Name = "0 MQTTchannel"; // 0 to get it to the front of the list
067        options.put(option2Name, new Option(Bundle.getMessage("NameTopicBase"),
068                                            new String[]{baseTopic}, Option.Type.TEXT));
069
070        options.put("10.3", new Option(Bundle.getMessage("NameTopicTurnoutSend"),
071                new String[]{Bundle.getMessage("TopicTurnoutSend")},  Option.Type.TEXT));
072        options.put("10.5", new Option(Bundle.getMessage("NameTopicTurnoutRcv"),
073                new String[]{Bundle.getMessage("TopicTurnoutRcv")},  Option.Type.TEXT));
074
075
076        options.put("11.3", new Option(Bundle.getMessage("NameTopicSensorSend"),
077                                            new String[]{Bundle.getMessage("TopicSensorSend")},   Option.Type.TEXT));
078        options.put("11.5", new Option(Bundle.getMessage("NameTopicSensorRcv"),
079                                            new String[]{Bundle.getMessage("TopicSensorRcv")},   Option.Type.TEXT));
080
081        options.put("12.3", new Option(Bundle.getMessage("NameTopicLightSend"),
082                                       new String[]{Bundle.getMessage("TopicLightSend")},  Option.Type.TEXT));
083        options.put("12.5", new Option(Bundle.getMessage("NameTopicLightRcv"),
084                                       new String[]{Bundle.getMessage("TopicLightRcv")},  Option.Type.TEXT));
085
086        options.put("13", new Option("Reporter topic :",    new String[]{Bundle.getMessage("TopicReporter")}, Option.Type.TEXT));
087        options.put("14", new Option("Signal Head topic :", new String[]{Bundle.getMessage("TopicSignalHead")}, Option.Type.TEXT));
088        options.put("15", new Option("Signal Mast topic :", new String[]{Bundle.getMessage("TopicSignalMast")}, Option.Type.TEXT));
089        options.put("16.3", new Option(Bundle.getMessage("NameTopicThrottleSend"),
090                                       new String[]{Bundle.getMessage("TopicThrottleSend")},  Option.Type.TEXT));
091        options.put("16.5", new Option(Bundle.getMessage("NameTopicThrottleRcv"),
092                                       new String[]{Bundle.getMessage("TopicThrottleRcv")},  Option.Type.TEXT));
093        options.put("17.3", new Option(Bundle.getMessage("NameTopicDirectionSend"),
094                                       new String[]{Bundle.getMessage("TopicDirectionSend")},  Option.Type.TEXT));
095        options.put("17.5", new Option(Bundle.getMessage("NameTopicDirectionRcv"),
096                                       new String[]{Bundle.getMessage("TopicDirectionRcv")},  Option.Type.TEXT));
097        options.put("18.3", new Option(Bundle.getMessage("NameTopicFunctionSend"),
098                                       new String[]{Bundle.getMessage("TopicFunctionSend")},  Option.Type.TEXT));
099        options.put("18.5", new Option(Bundle.getMessage("NameTopicFunctionRcv"),
100                                       new String[]{Bundle.getMessage("TopicFunctionRcv")},  Option.Type.TEXT));
101        options.put("19.3", new Option(Bundle.getMessage("NameTopicConsistSend"),
102                                       new String[]{Bundle.getMessage("TopicConsistSend")},  Option.Type.TEXT));
103        options.put("20.3", new Option(Bundle.getMessage("NameTopicPowerSend"),
104                                       new String[]{Bundle.getMessage("TopicPowerSend")},  Option.Type.TEXT));
105        options.put("20.5", new Option(Bundle.getMessage("NameTopicPowerRcv"),
106                                       new String[]{Bundle.getMessage("TopicPowerRcv")},  Option.Type.TEXT));
107
108        options.put("LastWillTopic", new Option(Bundle.getMessage("NameTopicLastWill"),
109                    new String[]{Bundle.getMessage("TopicLastWill")}, Option.Type.TEXT));
110        options.put("LastWillMessage", new Option(Bundle.getMessage("NameMessageLastWill"),
111                    new String[]{Bundle.getMessage("MessageLastWill")}, Option.Type.TEXT));
112        allowConnectionRecovery = true;
113
114    }
115
116    public MqttConnectOptions getMqttConnectionOptions() {
117
118        // Setup the MQTT Connection Options
119        MqttConnectOptions mqttConnOpts = new MqttConnectOptions();
120        mqttConnOpts.setCleanSession(true);
121        if ( getOptionState(MQTT_USERNAME_OPTION) != null
122                && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) {
123            mqttConnOpts.setUserName(getOptionState(MQTT_USERNAME_OPTION));
124            mqttConnOpts.setPassword(getOptionState(MQTT_PASSWORD_OPTION).toCharArray());
125        }
126
127        //set Last Will
128        if (! getOptionState("LastWillTopic").isEmpty()
129                && ! getOptionState("LastWillMessage").isEmpty()) {
130            mqttConnOpts.setWill(baseTopic + getOptionState("LastWillTopic"),
131                    getOptionState("LastWillMessage").getBytes(),
132                    qosflag,
133                    true);
134        }
135
136        return mqttConnOpts;
137    }
138
139    @Override
140    @API(status=API.Status.INTERNAL)
141    public void configure() {
142        log.debug("Doing configure...");
143        mqttEventListeners = new HashMap<>();
144        getSystemConnectionMemo().setMqttAdapter(this);
145        getSystemConnectionMemo().configureManagers();
146    }
147
148    @Override
149    @API(status=API.Status.INTERNAL)
150    public void connect() throws IOException {
151        log.info("MQTT starting connect with MQTTchannel = \"{}\"", getOptionState(option2Name));
152
153        try {
154            if ( getOptionState(option2Name)!= null && ! getOptionState(option2Name).trim().isEmpty()) {
155                baseTopic = getOptionState(option2Name);
156            }
157
158            // have to make that a valid choice, overriding the original above. This
159            // is ugly and temporary.
160            if (! DEFAULT_BASETOPIC.equals(baseTopic)) {
161                options.put(option2Name, new Option("MQTT channel: ", new String[]{baseTopic, DEFAULT_BASETOPIC}));
162            }
163
164            // generate a unique client ID based on the network ID and the system prefix of the MQTT connection.
165            String clientID = jmri.InstanceManager.getDefault(jmri.web.server.WebServerPreferences.class).getRailroadName();
166
167            // ensure that only guaranteed valid characters are included in the client ID
168            clientID = clientID.replaceAll("[^A-Za-z0-9]", "");
169
170            String clientIDsuffix = "JMRI" + Integer.toHexString(jmri.util.node.NodeIdentity.networkIdentity().hashCode()) .toUpperCase() + getSystemPrefix();
171
172            // Trim railroad name to fit within MQTT client id 23 character limit.
173            if (clientID.length() > 23 - clientIDsuffix.length())
174                clientID = clientID.substring(0,23 - clientIDsuffix.length());
175
176            clientID = clientID + clientIDsuffix;
177
178            log.info("Connection {} is using a clientID of \"{}\"", getSystemPrefix(), clientID);
179            
180            String tempdirName = jmri.util.FileUtil.getExternalFilename(jmri.util.FileUtil.PROFILE);
181            log.debug("will use {} as temporary directory", tempdirName);
182
183            mqttClient = getNewMqttClient(clientID, tempdirName);
184
185            if ((getOptionState(MQTT_USERNAME_OPTION) != null
186                    && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty())
187                    || ( ! getOptionState("LastWillTopic").isEmpty()
188                    && ! getOptionState("LastWillMessage").isEmpty())) {
189                mqttClient.connect(getMqttConnectionOptions());
190
191            } else {
192                mqttClient.connect();
193            }
194
195            if ( ! getOptionState("LastWillTopic").isEmpty()) {
196                publish(getOptionState("LastWillTopic"), "");
197            }
198
199            mqttClient.setCallback(this);
200
201        } catch (MqttException ex) {
202            throw new IOException("Can't create MQTT client", ex);
203        }
204    }
205
206    MqttClient getNewMqttClient(String clientID, String tempdirName) throws MqttException {
207        return new MqttClient(PROTOCOL + getCurrentPortName(),
208            clientID, new MqttDefaultFilePersistence(tempdirName));
209    }
210
211    @Override
212    @API(status=API.Status.MAINTAINED)
213    public MqttSystemConnectionMemo getSystemConnectionMemo() {
214        return (MqttSystemConnectionMemo) super.getSystemConnectionMemo();
215    }
216
217    @API(status=API.Status.MAINTAINED)
218    public void subscribe(String topic, MqttEventListener mel) {
219        if (mqttEventListeners == null || mqttClient == null) {
220            jmri.util.LoggingUtil.warnOnce(log, "Trying to subscribe before connect/configure is done");
221            return;
222        }
223        try {
224            String fullTopic = baseTopic + topic;
225            if (mqttEventListeners.containsKey(fullTopic)) {
226                if (!mqttEventListeners.get(fullTopic).contains(mel)) {
227                    mqttEventListeners.get(fullTopic).add(mel);
228                }
229                return;
230            }
231            ArrayList<MqttEventListener> mels = new ArrayList<>();
232            mels.add(mel);
233            mqttEventListeners.put(fullTopic, mels);
234            mqttClient.subscribe(fullTopic);
235            log.debug("Subscribed : \"{}\"", fullTopic);
236        } catch (MqttException ex) {
237            log.error("Can't subscribe : ", ex);
238        }
239    }
240
241    @API(status=API.Status.MAINTAINED)
242    public void unsubscribe(String topic, MqttEventListener mel) {
243        String fullTopic = baseTopic + topic;
244        if (mqttEventListeners == null || mqttClient == null) {
245            jmri.util.LoggingUtil.warnOnce(log, "Trying to unsubscribe before connect/configure is done");
246            return;
247        }
248        try {
249            mqttEventListeners.get(fullTopic).remove(mel);
250        } catch (NullPointerException e) {
251            // Not subscribed
252            log.debug("Unsubscribe but not subscribed: \"{}\"", fullTopic);
253            return;
254        }
255        if (mqttEventListeners.get(fullTopic).isEmpty()) {
256            try {
257                mqttClient.unsubscribe(fullTopic);
258                mqttEventListeners.remove(fullTopic);
259                log.debug("Unsubscribed : \"{}\"", fullTopic);
260            } catch (MqttException ex) {
261                log.error("Can't unsubscribe : ", ex);
262            }
263        }
264    }
265
266    @API(status=API.Status.MAINTAINED)
267    public void unsubscribeall(MqttEventListener mel) {
268        mqttEventListeners.keySet().forEach((t) -> {
269            unsubscribe(t, mel);
270        });
271    }
272
273    /**
274     * Send a message over the existing link to a broker.
275     * @param topic The topic, which follows the channel and precedes the payload in the message
276     * @param payload The payload makes up the final part of the message
277     */
278    @API(status=API.Status.MAINTAINED)
279    public void publish(@Nonnull String topic, @Nonnull byte[] payload) {
280        try {
281            String fullTopic = baseTopic + topic;
282            mqttClient.publish(fullTopic, payload, qosflag, retained);
283        } catch (MqttException ex) {
284            log.error("Can't publish : ", ex);
285        }
286    }
287
288    /**
289     * Send a message over the existing link to a broker.
290     * @param topic The topic, which follows the channel and precedes the payload in the message
291     * @param payload The payload makes up the final part of the message
292     */
293    @API(status=API.Status.MAINTAINED)
294    public void publish(@Nonnull String topic, @Nonnull String payload) {
295        publish(topic, payload.getBytes());
296    }
297
298    public MqttClient getMQttClient() {
299        return (mqttClient);
300    }
301
302    private void tryToReconnect(boolean showLogMessages) {
303        if (showLogMessages) log.warn("Try to reconnect");
304        try {
305             if ((getOptionState(MQTT_USERNAME_OPTION) != null
306                    && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty())
307                    || ( ! getOptionState("LastWillTopic").isEmpty()
308                    && ! getOptionState("LastWillMessage").isEmpty())) {
309                mqttClient.connect(getMqttConnectionOptions());
310            } else {
311                mqttClient.connect();
312            }
313
314            if (! getOptionState("LastWillTopic").isEmpty()) {
315                publish(getOptionState("LastWillTopic"), "");
316            }
317
318            log.warn("Succeeded to reconnect");
319
320            mqttClient.setCallback(this);
321            Set<String> set = new HashSet<>(mqttEventListeners.keySet());
322            for (String t : set) {
323                mqttClient.subscribe(t);
324            }
325        } catch (MqttException ex) {
326            if (showLogMessages) log.error("Unable to reconnect", ex);
327            scheduleReconnectTimer(false);
328        }
329    }
330
331    private void scheduleReconnectTimer(boolean showLogMessages) {
332        jmri.util.TimerUtil.scheduleOnLayoutThread(new java.util.TimerTask() {
333            @Override
334            public void run() {
335                tryToReconnect(showLogMessages);
336            }
337        }, 500);
338    }
339
340    @Override
341    @API(status=API.Status.INTERNAL)
342    public void connectionLost(Throwable thrwbl) {
343        log.warn("Lost MQTT broker connection...");
344        if (this.allowConnectionRecovery) {
345            log.info("...trying to reconnect repeatedly");
346            scheduleReconnectTimer(true);
347            return;
348        }
349        log.error("Won't reconnect");
350    }
351
352    @Override
353    @API(status=API.Status.INTERNAL)
354    public void messageArrived(String topic, MqttMessage mm) throws Exception {
355        log.debug("Message received, topic : {} - '{}'", topic, mm);
356
357        boolean found = false;
358        Map<String,ArrayList<MqttEventListener>> tempMap
359            = new HashMap<> (mqttEventListeners); // Avoid ConcurrentModificationException
360        for (Map.Entry<String,ArrayList<MqttEventListener>> e : tempMap.entrySet()) {
361            // does key match received topic, including wildcards?
362            if (MqttTopic.isMatched(e.getKey(), topic) ) {
363                found = true;
364                e.getValue().forEach((mel) -> {
365                    try {
366                        mel.notifyMqttMessage(topic, mm.toString());
367                    }
368                    catch (Exception exception) {
369                        log.error("MqttEventListener exception: ", exception);
370                    }
371                });
372            }
373        }
374
375        if (!found) {
376            log.error("No one subscribed to {}", topic);
377            throw new Exception("No subscriber for MQTT topic " + topic);
378        }
379    }
380
381    @Override
382    @API(status=API.Status.INTERNAL)
383    public void deliveryComplete(IMqttDeliveryToken imdt) {
384        log.debug("Message delivered");
385    }
386
387
388    @Override
389    protected void closeConnection(){
390       log.debug("Closing MqttAdapter");
391        try {
392            mqttClient.disconnect();
393        }
394        catch (Exception exception) {
395            log.error("MqttEventListener exception: ", exception);
396        }
397
398    }
399
400    @Override
401    public void dispose() {
402        log.debug("Disposing MqttAdapter");
403        closeConnection();
404        super.dispose();
405    }
406
407    private final static Logger log = LoggerFactory.getLogger(MqttAdapter.class);
408
409}