001package jmri.jmrix.mqtt;
002
003import jmri.DccLocoAddress;
004import jmri.LocoAddress;
005import jmri.SpeedStepMode;
006import jmri.jmrix.AbstractThrottle;
007import org.slf4j.Logger;
008import org.slf4j.LoggerFactory;
009import javax.annotation.Nonnull;
010import java.util.regex.*;
011
012
013/**
014 * An implementation of AbstractThrottle with code specific to a MQTT
015 * connection.
016 *
017 * @author Dean Cording (C) 2023
018 */
019
020
021 public class MqttThrottle extends AbstractThrottle implements MqttEventListener{
022
023    private final MqttAdapter mqttAdapter;
024    @Nonnull
025    public String sendThrottleTopic = "cab/{0}/throttle";
026    @Nonnull
027    public String rcvThrottleTopic ="cab/{0}/throttle";
028    @Nonnull
029    public String sendDirectionTopic = "cab/{0}/direction";
030    @Nonnull
031    public String rcvDirectionTopic = "cab/{0}/direction";
032    @Nonnull
033    public String sendFunctionTopic = "cab/{0}/function/{1}";
034    @Nonnull
035    public String rcvFunctionTopic = "cab/{0}/function/{1}";
036
037    protected int address = -1;
038
039    private Pattern functionPattern;
040
041    private MqttConsistManager consistManager;
042
043   /**
044     * Constructor.
045     * @param memo system connection.
046     */
047
048    public MqttThrottle(MqttSystemConnectionMemo memo) {
049        super(memo);
050        mqttAdapter = memo.getMqttAdapter();
051        consistManager = memo.getConsistManager();
052
053        this.speedStepMode = SpeedStepMode.NMRA_DCC_128;
054
055        this.isForward = true; //loco should default to forward
056        log.debug("MqttThrottle constructor");
057    }
058
059
060
061    public MqttThrottle(MqttSystemConnectionMemo memo, String sendThrottleTopic, String rcvThrottleTopic,
062                    String sendDirectionTopic, String rcvDirectionTopic, String sendFunctionTopic,
063                    String rcvFunctionTopic) {
064        super(memo);
065        mqttAdapter = memo.getMqttAdapter();
066        consistManager = memo.getConsistManager();
067        this.sendThrottleTopic = sendThrottleTopic;
068        this.rcvThrottleTopic = rcvThrottleTopic;
069        this.sendDirectionTopic = sendDirectionTopic;
070        this.rcvDirectionTopic = rcvDirectionTopic;
071        this.sendFunctionTopic = sendFunctionTopic;
072        this.rcvFunctionTopic = rcvFunctionTopic;
073
074        this.speedStepMode = SpeedStepMode.NMRA_DCC_128;
075
076        this.isForward = true; //loco should default to forward
077        log.debug("MqttThrottle constructor");
078    }
079
080    /**
081     * Constructor.
082     * @param memo system connection
083     * @param sendThrottleTopic     MQTT topic for sending speed
084     * @param rcvThrottleTopic      MQTT topic for receiving speed
085     * @param sendDirectionTopic    MQTT topic for sending direction
086     * @param rcvDirectionTopic     MQTT topic for receiving direction
087     * @param sendFunctionTopic     MQTT topic for sending function values
088     * @param rcvFunctionTopic      MQTT topic for receiving function values
089     * @param address loco address to set on throttle
090     */
091    public MqttThrottle(MqttSystemConnectionMemo memo, String sendThrottleTopic, String rcvThrottleTopic,
092        String sendDirectionTopic, String rcvDirectionTopic, String sendFunctionTopic, String rcvFunctionTopic, LocoAddress address) {
093        super(memo);
094        mqttAdapter = memo.getMqttAdapter();
095        consistManager = memo.getConsistManager();
096        this.sendThrottleTopic = sendThrottleTopic;
097        this.rcvThrottleTopic = rcvThrottleTopic;
098        this.sendDirectionTopic = sendDirectionTopic;
099        this.rcvDirectionTopic = rcvDirectionTopic;
100        this.sendFunctionTopic = sendFunctionTopic;
101        this.rcvFunctionTopic = rcvFunctionTopic;
102
103        this.setDccAddress(address.getNumber());
104        this.speedStepMode = SpeedStepMode.NMRA_DCC_128;
105
106        this.isForward = true; //loco should default to forward
107
108        log.debug("MqttThrottle constructor called for address {}", address);
109    }
110
111
112    /**
113     * {@inheritDoc}
114     */
115    @Override
116    public void setSpeedSetting(float speed) {
117
118        super.setSpeedSetting(speed);
119
120        if (speed < 0) {
121            speed = 0;
122            // Send MQTT message
123            jmri.util.ThreadingUtil.runOnLayout(() -> {
124                mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), "STOP");
125            });
126            super.setSpeedSetting(0);
127            log.debug("sent address {} direction {}", address, "STOP");
128        }
129
130        int intSpeed = Math.round(speed * 100);
131        
132        // ensure non-zero input will result in non-zero output
133        if (speed > 0 && intSpeed == 0)
134        {
135            intSpeed = 1;
136        }
137
138        final String stringSpeed = String.valueOf(intSpeed);
139        
140        // Send MQTT message
141        jmri.util.ThreadingUtil.runOnLayout(() -> {
142
143            mqttAdapter.publish(this.sendThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), stringSpeed);
144        });
145        log.debug("sent address {} speed {}", address, intSpeed);
146
147
148    }
149
150    /**
151     * Set the direction
152     *
153     * @param forward true if forward; false otherwise
154     */
155    @Override
156    public void setIsForward(boolean forward) {
157
158        super.setIsForward(forward);
159         // Send MQTT message
160        jmri.util.ThreadingUtil.runOnLayout(() -> {
161            mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), (forward ? "FORWARD" : "REVERSE"));
162        });
163        log.debug("sent address {} direction {}", address, (forward ? "FORWARD" : "REVERSE"));
164
165    }
166
167    /**
168     * {@inheritDoc}
169     */
170    @Override
171    public void sendFunctionGroup(int functionNum, boolean momentary) {
172
173        // Send MQTT message
174        jmri.util.ThreadingUtil.runOnLayoutEventually(() -> {
175            mqttAdapter.publish(this.sendFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}",String.valueOf(functionNum)), (getFunction(functionNum) ? "ON" : "OFF"));
176        });
177
178        log.debug("sent address {} function {} {}", address, functionNum, (getFunction(functionNum) ? "ON" : "OFF"));
179
180    }
181
182
183    protected void throttleRelease() {
184
185        active = false;
186
187        // Send blank MQTT message to remove any persistent message
188        jmri.util.ThreadingUtil.runOnLayout(() -> {
189            mqttAdapter.publish(this.sendThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), "");
190            mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), "");
191
192            for (int functionNum = 0; functionNum < getFunctions().length; functionNum++) {
193                mqttAdapter.publish(this.sendFunctionTopic.replaceFirst("\\{0\\}",
194                    String.valueOf(address)).replaceFirst("\\{1\\}",String.valueOf(functionNum)), "");
195            }
196        });
197        consistManager.deactivateConsist(getLocoAddress());
198
199        mqttAdapter.unsubscribe(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this);
200        mqttAdapter.unsubscribe(this.rcvDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this);
201        mqttAdapter.unsubscribe(this.rcvFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}", "+"), this);
202
203
204    }
205
206    /**
207     * Dispose when finished with this object. After this, further usage of this
208     * Throttle object will result in a JmriException.
209     *
210     * This is quite problematic, because a using object doesn't know when it's
211     * the last user.
212     */
213    @Override
214    protected void throttleDispose() {
215        log.debug("throttleDispose {}", address);
216
217        finishRecord();
218    }
219
220
221
222    public int setDccAddress(int newaddress) {
223
224        if (address > 0) {
225            // Send blank MQTT message to remove any persistent message
226            jmri.util.ThreadingUtil.runOnLayout(() -> {
227                mqttAdapter.publish(this.sendThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), "");
228                mqttAdapter.publish(this.sendDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), "");
229
230                for (int functionNum = 0; functionNum < getFunctions().length; functionNum++) {
231                    mqttAdapter.publish(this.sendFunctionTopic.replaceFirst("\\{0\\}",
232                        String.valueOf(address)).replaceFirst("\\{1\\}",String.valueOf(functionNum)), "");
233                }
234            });
235
236            mqttAdapter.unsubscribe(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this);
237            mqttAdapter.unsubscribe(this.rcvDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this);
238            mqttAdapter.unsubscribe(this.rcvFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}", "+"), this);
239        }
240        address = newaddress;
241
242        mqttAdapter.subscribe(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this);
243        mqttAdapter.subscribe(this.rcvDirectionTopic.replaceFirst("\\{0\\}", String.valueOf(address)), this);
244        mqttAdapter.subscribe(this.rcvFunctionTopic.replaceFirst("\\{0\\}", String.valueOf(address)).replaceFirst("\\{1\\}", "+"), this);
245
246        consistManager.activateConsist(getLocoAddress());
247        setSpeedSetting(0);
248        setIsForward(true);
249
250        functionPattern = Pattern.compile(this.rcvFunctionTopic.replaceFirst("\\{0\\}",
251            String.valueOf(address)).replaceFirst("\\{1\\}", "(\\\\d+)"));
252
253        return address;
254    }
255
256    public int getDccAddress() {
257        return address;
258    }
259
260    @Override
261    public LocoAddress getLocoAddress() {
262        return new DccLocoAddress(address, MqttThrottleManager.isLongAddress(address));
263    }
264
265    @Override
266    public void notifyMqttMessage(String receivedTopic, String message) {
267
268        if (receivedTopic.endsWith(this.rcvThrottleTopic.replaceFirst("\\{0\\}", String.valueOf(address)))) {
269
270            Float speed ;
271
272            try {
273                speed = Math.max(0.0f,Math.min(Float.parseFloat(message)/100.0f,1.0f));
274            }
275            catch (Exception e){
276                if (message.length() != 0) {
277                    log.error("Invalid throttle speed: '{}'", message);
278                }
279                speed = -1.0f;
280            }
281
282            super.setSpeedSetting(speed);
283
284        } else if (receivedTopic.endsWith(this.rcvDirectionTopic.replaceFirst("\\{0\\}",
285                    String.valueOf(address)))) {
286            switch (message) {
287                case "FORWARD":
288                    super.setIsForward(true);
289                    break;
290                case "REVERSE":
291                    super.setIsForward(false);
292                    break;
293                case "STOP":
294                case "":
295                    super.setSpeedSetting(-1);
296                    break;
297                default:
298                    log.error("Invalid message {}", message);
299            }
300        } else {
301
302            Matcher functionMatcher = functionPattern.matcher(receivedTopic);
303            if (functionMatcher.matches()) {
304                updateFunction(Integer.parseInt(functionMatcher.group(1)),(message.equals("ON")));
305            }
306        }
307    }
308
309    // register for notification
310    private final static Logger log = LoggerFactory.getLogger(MqttThrottle.class);
311
312 }