001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.mqtt.fuse;
013
014import static java.util.Objects.requireNonNull;
015import static org.eclipse.kapua.gateway.client.utils.Strings.nonEmptyText;
016
017import java.net.URI;
018import java.nio.ByteBuffer;
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.CompletableFuture;
025import java.util.concurrent.CompletionStage;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ScheduledExecutorService;
028
029import org.eclipse.kapua.gateway.client.BinaryPayloadCodec;
030import org.eclipse.kapua.gateway.client.Credentials.UserAndPassword;
031import org.eclipse.kapua.gateway.client.Module;
032import org.eclipse.kapua.gateway.client.mqtt.MqttClient;
033import org.eclipse.kapua.gateway.client.mqtt.MqttMessageHandler;
034import org.eclipse.kapua.gateway.client.mqtt.MqttNamespace;
035import org.eclipse.kapua.gateway.client.mqtt.fuse.internal.Callbacks;
036import org.fusesource.hawtbuf.Buffer;
037import org.fusesource.hawtbuf.UTF8Buffer;
038import org.fusesource.mqtt.client.Callback;
039import org.fusesource.mqtt.client.CallbackConnection;
040import org.fusesource.mqtt.client.ExtendedListener;
041import org.fusesource.mqtt.client.MQTT;
042import org.fusesource.mqtt.client.Promise;
043import org.fusesource.mqtt.client.QoS;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047public class FuseClient extends MqttClient {
048
049    private static final Logger logger = LoggerFactory.getLogger(FuseClient.class);
050
051    public static class Builder extends MqttClient.Builder<Builder> {
052
053        @Override
054        protected Builder builder() {
055            return this;
056        }
057
058        @Override
059        public FuseClient build() throws Exception {
060
061            final URI broker = requireNonNull(broker(), "Broker must be set");
062            final String clientId = nonEmptyText(clientId(), "clientId");
063
064            final MqttNamespace namespace = requireNonNull(namespace(), "Namespace must be set");
065            final BinaryPayloadCodec codec = requireNonNull(codec(), "Codec must be set");
066
067            final MQTT mqtt = new MQTT();
068            mqtt.setCleanSession(false);
069            mqtt.setHost(broker);
070            mqtt.setClientId(clientId);
071
072            final Object credentials = credentials();
073            if (credentials == null) {
074                // none
075            } else if (credentials instanceof UserAndPassword) {
076                final UserAndPassword userAndPassword = (UserAndPassword) credentials;
077                mqtt.setUserName(userAndPassword.getUsername());
078                mqtt.setPassword(userAndPassword.getPasswordAsString());
079            } else {
080                throw new IllegalStateException(
081                        String.format("Unknown credentials type: %s", credentials.getClass().getName()));
082            }
083
084            CallbackConnection connection = mqtt.callbackConnection();
085            ScheduledExecutorService executor = createExecutor(clientId);
086            try {
087                final FuseClient result = new FuseClient(modules(), clientId, executor, namespace, codec, connection);
088                connection = null;
089                executor = null;
090                return result;
091            } finally {
092                if (executor != null) {
093                    executor.shutdown();
094                }
095            }
096        }
097    }
098
099    private static ScheduledExecutorService createExecutor(final String clientId) {
100        return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, clientId));
101    }
102
103    private ExtendedListener listener = new ExtendedListener() {
104
105        @Override
106        public void onPublish(final UTF8Buffer topic, final Buffer body, final Runnable ack) {
107            onPublish(topic, body, new Callback<Callback<Void>>() {
108
109                @Override
110                public void onSuccess(Callback<Void> value) {
111                    ack.run();
112                }
113
114                @Override
115                public void onFailure(Throwable value) {
116                }
117
118            });
119        }
120
121        @Override
122        public void onFailure(Throwable value) {
123        }
124
125        @Override
126        public void onDisconnected() {
127            handleDisconnected();
128        }
129
130        @Override
131        public void onConnected() {
132            handleConnected();
133        }
134
135        @Override
136        public void onPublish(final UTF8Buffer topic, final Buffer body, final Callback<Callback<Void>> ack) {
137            handleMessageArrived(topic.toString(), body, ack);
138        }
139    };
140
141    private final CallbackConnection connection;
142
143    private final Map<String, MqttMessageHandler> subscriptions = new HashMap<>();
144
145    private FuseClient(final Set<Module> modules, final String clientId, final ScheduledExecutorService executor,
146            final MqttNamespace namespace, final BinaryPayloadCodec codec, final CallbackConnection connection) {
147
148        super(executor, codec, namespace, clientId, modules);
149
150        this.connection = connection;
151
152        connection.listener(this.listener);
153        connection.connect(new Promise<>());
154    }
155
156    @Override
157    public void close() {
158        connection.disconnect(null);
159        executor.shutdown();
160    }
161
162    @Override
163    public void publishMqtt(final String topic, final ByteBuffer payload) {
164        this.connection.publish(Buffer.utf8(topic), new Buffer(payload), QoS.AT_LEAST_ONCE, false, null);
165    }
166
167    @Override
168    protected CompletionStage<?> subscribeMqtt(final String topic, final MqttMessageHandler messageHandler) {
169        synchronized (this) {
170            this.subscriptions.put(topic, messageHandler);
171
172            final CompletableFuture<byte[]> future = new CompletableFuture<>();
173            connection.subscribe(
174                    new org.fusesource.mqtt.client.Topic[] {
175                            new org.fusesource.mqtt.client.Topic(topic, QoS.AT_LEAST_ONCE) },
176                    Callbacks.asCallback(future));
177
178            return future;
179        }
180    }
181
182    @Override
183    protected void unsubscribeMqtt(final Set<String> mqttTopics) {
184
185        logger.info("Unsubscribe from: {}", mqttTopics);
186
187        final List<UTF8Buffer> topics = new ArrayList<>(mqttTopics.size());
188
189        synchronized (this) {
190            for (final String topic : mqttTopics) {
191                if (subscriptions.remove(topic) != null) {
192                    topics.add(new UTF8Buffer(topic));
193                }
194            }
195        }
196
197        connection.unsubscribe(topics.toArray(new UTF8Buffer[topics.size()]), new Promise<>());
198    }
199
200    protected void handleMessageArrived(final String topic, final Buffer payload, final Callback<Callback<Void>> ack) {
201        final MqttMessageHandler handler;
202
203        synchronized (this) {
204            handler = this.subscriptions.get(topic);
205        }
206
207        if (handler != null) {
208            try {
209                handler.handleMessage(topic, payload.toByteBuffer());
210                ack.onSuccess(null);
211            } catch (Exception e) {
212                ack.onFailure(e);
213            }
214        }
215    }
216
217}