001/******************************************************************************* 002 * Copyright (c) 2017 Red Hat Inc and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Red Hat Inc - initial API and implementation 011 *******************************************************************************/ 012package org.eclipse.kapua.gateway.client.mqtt; 013 014import java.net.URI; 015import java.net.URISyntaxException; 016import java.nio.ByteBuffer; 017import java.util.Collection; 018import java.util.Objects; 019import java.util.Set; 020import java.util.concurrent.CompletionStage; 021import java.util.concurrent.ScheduledExecutorService; 022import java.util.stream.Collectors; 023 024import org.eclipse.kapua.gateway.client.Application; 025import org.eclipse.kapua.gateway.client.BinaryPayloadCodec; 026import org.eclipse.kapua.gateway.client.Credentials.UserAndPassword; 027import org.eclipse.kapua.gateway.client.Module; 028import org.eclipse.kapua.gateway.client.Topic; 029import org.eclipse.kapua.gateway.client.spi.AbstractApplication; 030import org.eclipse.kapua.gateway.client.spi.AbstractClient; 031 032public abstract class MqttClient extends AbstractClient { 033 034 public abstract static class Builder<T extends Builder<T>> extends AbstractClient.Builder<T> { 035 036 private MqttNamespace namespace; 037 private BinaryPayloadCodec codec; 038 private UserAndPassword userAndPassword; 039 private String clientId; 040 private URI broker; 041 042 public T codec(final BinaryPayloadCodec codec) { 043 this.codec = codec; 044 return builder(); 045 } 046 047 public BinaryPayloadCodec codec() { 048 return this.codec; 049 } 050 051 public T namespace(final MqttNamespace namespace) { 052 this.namespace = namespace; 053 return builder(); 054 } 055 056 public MqttNamespace namespace() { 057 return this.namespace; 058 } 059 060 public T clientId(final String clientId) { 061 this.clientId = clientId; 062 return builder(); 063 } 064 065 public String clientId() { 066 return this.clientId; 067 } 068 069 public T credentials(final UserAndPassword userAndPassword) { 070 this.userAndPassword = userAndPassword; 071 return builder(); 072 } 073 074 public T broker(final String broker) throws URISyntaxException { 075 Objects.requireNonNull(broker); 076 this.broker = new URI(broker); 077 return builder(); 078 } 079 080 public T broker(final URI broker) throws URISyntaxException { 081 Objects.requireNonNull(broker); 082 this.broker = broker; 083 return builder(); 084 } 085 086 public URI broker() { 087 return this.broker; 088 } 089 090 public Object credentials() { 091 return this.userAndPassword; 092 } 093 } 094 095 private final String clientId; 096 private final BinaryPayloadCodec codec; 097 private MqttNamespace namespace; 098 099 public MqttClient(final ScheduledExecutorService executor, final BinaryPayloadCodec codec, final MqttNamespace namespace, final String clientId, final Set<Module> modules) { 100 super(executor, modules); 101 this.clientId = clientId; 102 this.codec = codec; 103 this.namespace = namespace; 104 } 105 106 protected void publish(String applicationId, Topic topic, ByteBuffer buffer) throws Exception { 107 final String mqttTopic = namespace.dataTopic(clientId, applicationId, topic); 108 publishMqtt(mqttTopic, buffer); 109 } 110 111 public abstract void publishMqtt(String topic, ByteBuffer payload) throws Exception; 112 113 protected abstract CompletionStage<?> subscribeMqtt(String topic, MqttMessageHandler messageHandler) throws Exception; 114 115 protected CompletionStage<?> subscribe(final String applicationId, final Topic topic, final MqttMessageHandler messageHandler) throws Exception { 116 final String mqttTopic = namespace.dataTopic(clientId, applicationId, topic); 117 return subscribeMqtt(mqttTopic, messageHandler); 118 } 119 120 @Override 121 protected void internalUnsubscribe(final String applicationId, final Collection<Topic> topics) throws Exception { 122 Set<String> mqttTopics = topics.stream().map(topic -> namespace.dataTopic(clientId, applicationId, topic)).collect(Collectors.toSet()); 123 unsubscribeMqtt(mqttTopics); 124 } 125 126 protected abstract void unsubscribeMqtt(Set<String> mqttTopics) throws Exception; 127 128 public String getMqttClientId() { 129 return this.clientId; 130 } 131 132 @Override 133 protected AbstractApplication internalCreateApplication(final Application.Builder builder, final String applicationId) { 134 return new MqttApplication(this, applicationId, this.executor); 135 } 136 137 protected BinaryPayloadCodec getCodec() { 138 return this.codec; 139 } 140 141}