001/******************************************************************************* 002 * Copyright (c) 2017 Red Hat Inc and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Red Hat Inc - initial API and implementation 011 *******************************************************************************/ 012package org.eclipse.kapua.gateway.client.mqtt.fuse; 013 014import static java.util.Objects.requireNonNull; 015import static org.eclipse.kapua.gateway.client.utils.Strings.nonEmptyText; 016 017import java.net.URI; 018import java.nio.ByteBuffer; 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.Set; 024import java.util.concurrent.CompletableFuture; 025import java.util.concurrent.CompletionStage; 026import java.util.concurrent.Executors; 027import java.util.concurrent.ScheduledExecutorService; 028 029import org.eclipse.kapua.gateway.client.BinaryPayloadCodec; 030import org.eclipse.kapua.gateway.client.Credentials.UserAndPassword; 031import org.eclipse.kapua.gateway.client.Module; 032import org.eclipse.kapua.gateway.client.mqtt.MqttClient; 033import org.eclipse.kapua.gateway.client.mqtt.MqttMessageHandler; 034import org.eclipse.kapua.gateway.client.mqtt.MqttNamespace; 035import org.eclipse.kapua.gateway.client.mqtt.fuse.internal.Callbacks; 036import org.fusesource.hawtbuf.Buffer; 037import org.fusesource.hawtbuf.UTF8Buffer; 038import org.fusesource.mqtt.client.Callback; 039import org.fusesource.mqtt.client.CallbackConnection; 040import org.fusesource.mqtt.client.ExtendedListener; 041import org.fusesource.mqtt.client.MQTT; 042import org.fusesource.mqtt.client.Promise; 043import org.fusesource.mqtt.client.QoS; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047public class FuseClient extends MqttClient { 048 049 private static final Logger logger = LoggerFactory.getLogger(FuseClient.class); 050 051 public static class Builder extends MqttClient.Builder<Builder> { 052 053 @Override 054 protected Builder builder() { 055 return this; 056 } 057 058 @Override 059 public FuseClient build() throws Exception { 060 061 final URI broker = requireNonNull(broker(), "Broker must be set"); 062 final String clientId = nonEmptyText(clientId(), "clientId"); 063 064 final MqttNamespace namespace = requireNonNull(namespace(), "Namespace must be set"); 065 final BinaryPayloadCodec codec = requireNonNull(codec(), "Codec must be set"); 066 067 final MQTT mqtt = new MQTT(); 068 mqtt.setCleanSession(false); 069 mqtt.setHost(broker); 070 mqtt.setClientId(clientId); 071 072 final Object credentials = credentials(); 073 if (credentials == null) { 074 // none 075 } else if (credentials instanceof UserAndPassword) { 076 final UserAndPassword userAndPassword = (UserAndPassword) credentials; 077 mqtt.setUserName(userAndPassword.getUsername()); 078 mqtt.setPassword(userAndPassword.getPasswordAsString()); 079 } else { 080 throw new IllegalStateException( 081 String.format("Unknown credentials type: %s", credentials.getClass().getName())); 082 } 083 084 CallbackConnection connection = mqtt.callbackConnection(); 085 ScheduledExecutorService executor = createExecutor(clientId); 086 try { 087 final FuseClient result = new FuseClient(modules(), clientId, executor, namespace, codec, connection); 088 connection = null; 089 executor = null; 090 return result; 091 } finally { 092 if (executor != null) { 093 executor.shutdown(); 094 } 095 } 096 } 097 } 098 099 private static ScheduledExecutorService createExecutor(final String clientId) { 100 return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, clientId)); 101 } 102 103 private ExtendedListener listener = new ExtendedListener() { 104 105 @Override 106 public void onPublish(final UTF8Buffer topic, final Buffer body, final Runnable ack) { 107 onPublish(topic, body, new Callback<Callback<Void>>() { 108 109 @Override 110 public void onSuccess(Callback<Void> value) { 111 ack.run(); 112 } 113 114 @Override 115 public void onFailure(Throwable value) { 116 } 117 118 }); 119 } 120 121 @Override 122 public void onFailure(Throwable value) { 123 } 124 125 @Override 126 public void onDisconnected() { 127 handleDisconnected(); 128 } 129 130 @Override 131 public void onConnected() { 132 handleConnected(); 133 } 134 135 @Override 136 public void onPublish(final UTF8Buffer topic, final Buffer body, final Callback<Callback<Void>> ack) { 137 handleMessageArrived(topic.toString(), body, ack); 138 } 139 }; 140 141 private final CallbackConnection connection; 142 143 private final Map<String, MqttMessageHandler> subscriptions = new HashMap<>(); 144 145 private FuseClient(final Set<Module> modules, final String clientId, final ScheduledExecutorService executor, 146 final MqttNamespace namespace, final BinaryPayloadCodec codec, final CallbackConnection connection) { 147 148 super(executor, codec, namespace, clientId, modules); 149 150 this.connection = connection; 151 152 connection.listener(this.listener); 153 connection.connect(new Promise<>()); 154 } 155 156 @Override 157 public void close() { 158 connection.disconnect(null); 159 executor.shutdown(); 160 } 161 162 @Override 163 public void publishMqtt(final String topic, final ByteBuffer payload) { 164 this.connection.publish(Buffer.utf8(topic), new Buffer(payload), QoS.AT_LEAST_ONCE, false, null); 165 } 166 167 @Override 168 protected CompletionStage<?> subscribeMqtt(final String topic, final MqttMessageHandler messageHandler) { 169 synchronized (this) { 170 this.subscriptions.put(topic, messageHandler); 171 172 final CompletableFuture<byte[]> future = new CompletableFuture<>(); 173 connection.subscribe( 174 new org.fusesource.mqtt.client.Topic[] { 175 new org.fusesource.mqtt.client.Topic(topic, QoS.AT_LEAST_ONCE) }, 176 Callbacks.asCallback(future)); 177 178 return future; 179 } 180 } 181 182 @Override 183 protected void unsubscribeMqtt(final Set<String> mqttTopics) { 184 185 logger.info("Unsubscribe from: {}", mqttTopics); 186 187 final List<UTF8Buffer> topics = new ArrayList<>(mqttTopics.size()); 188 189 synchronized (this) { 190 for (final String topic : mqttTopics) { 191 if (subscriptions.remove(topic) != null) { 192 topics.add(new UTF8Buffer(topic)); 193 } 194 } 195 } 196 197 connection.unsubscribe(topics.toArray(new UTF8Buffer[topics.size()]), new Promise<>()); 198 } 199 200 protected void handleMessageArrived(final String topic, final Buffer payload, final Callback<Callback<Void>> ack) { 201 final MqttMessageHandler handler; 202 203 synchronized (this) { 204 handler = this.subscriptions.get(topic); 205 } 206 207 if (handler != null) { 208 try { 209 handler.handleMessage(topic, payload.toByteBuffer()); 210 ack.onSuccess(null); 211 } catch (Exception e) { 212 ack.onFailure(e); 213 } 214 } 215 } 216 217}