001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.mqtt;
013
014import java.net.URI;
015import java.net.URISyntaxException;
016import java.nio.ByteBuffer;
017import java.util.Collection;
018import java.util.Objects;
019import java.util.Set;
020import java.util.concurrent.CompletionStage;
021import java.util.concurrent.ScheduledExecutorService;
022import java.util.stream.Collectors;
023
024import org.eclipse.kapua.gateway.client.Application;
025import org.eclipse.kapua.gateway.client.BinaryPayloadCodec;
026import org.eclipse.kapua.gateway.client.Credentials.UserAndPassword;
027import org.eclipse.kapua.gateway.client.Module;
028import org.eclipse.kapua.gateway.client.Topic;
029import org.eclipse.kapua.gateway.client.spi.AbstractApplication;
030import org.eclipse.kapua.gateway.client.spi.AbstractClient;
031
032public abstract class MqttClient extends AbstractClient {
033
034    public abstract static class Builder<T extends Builder<T>> extends AbstractClient.Builder<T> {
035
036        private MqttNamespace namespace;
037        private BinaryPayloadCodec codec;
038        private UserAndPassword userAndPassword;
039        private String clientId;
040        private URI broker;
041
042        public T codec(final BinaryPayloadCodec codec) {
043            this.codec = codec;
044            return builder();
045        }
046
047        public BinaryPayloadCodec codec() {
048            return this.codec;
049        }
050
051        public T namespace(final MqttNamespace namespace) {
052            this.namespace = namespace;
053            return builder();
054        }
055
056        public MqttNamespace namespace() {
057            return this.namespace;
058        }
059
060        public T clientId(final String clientId) {
061            this.clientId = clientId;
062            return builder();
063        }
064
065        public String clientId() {
066            return this.clientId;
067        }
068
069        public T credentials(final UserAndPassword userAndPassword) {
070            this.userAndPassword = userAndPassword;
071            return builder();
072        }
073
074        public T broker(final String broker) throws URISyntaxException {
075            Objects.requireNonNull(broker);
076            this.broker = new URI(broker);
077            return builder();
078        }
079        
080        public T broker(final URI broker) throws URISyntaxException {
081            Objects.requireNonNull(broker);
082            this.broker = broker;
083            return builder();
084        }
085
086        public URI broker() {
087            return this.broker;
088        }
089
090        public Object credentials() {
091            return this.userAndPassword;
092        }
093    }
094
095    private final String clientId;
096    private final BinaryPayloadCodec codec;
097    private MqttNamespace namespace;
098
099    public MqttClient(final ScheduledExecutorService executor, final BinaryPayloadCodec codec, final MqttNamespace namespace, final String clientId, final Set<Module> modules) {
100        super(executor, modules);
101        this.clientId = clientId;
102        this.codec = codec;
103        this.namespace = namespace;
104    }
105
106    protected void publish(String applicationId, Topic topic, ByteBuffer buffer) throws Exception {
107        final String mqttTopic = namespace.dataTopic(clientId, applicationId, topic);
108        publishMqtt(mqttTopic, buffer);
109    }
110
111    public abstract void publishMqtt(String topic, ByteBuffer payload) throws Exception;
112
113    protected abstract CompletionStage<?> subscribeMqtt(String topic, MqttMessageHandler messageHandler) throws Exception;
114
115    protected CompletionStage<?> subscribe(final String applicationId, final Topic topic, final MqttMessageHandler messageHandler) throws Exception {
116        final String mqttTopic = namespace.dataTopic(clientId, applicationId, topic);
117        return subscribeMqtt(mqttTopic, messageHandler);
118    }
119
120    @Override
121    protected void internalUnsubscribe(final String applicationId, final Collection<Topic> topics) throws Exception {
122        Set<String> mqttTopics = topics.stream().map(topic -> namespace.dataTopic(clientId, applicationId, topic)).collect(Collectors.toSet());
123        unsubscribeMqtt(mqttTopics);
124    }
125
126    protected abstract void unsubscribeMqtt(Set<String> mqttTopics) throws Exception;
127
128    public String getMqttClientId() {
129        return this.clientId;
130    }
131
132    @Override
133    protected AbstractApplication internalCreateApplication(final Application.Builder builder, final String applicationId) {
134        return new MqttApplication(this, applicationId, this.executor);
135    }
136
137    protected BinaryPayloadCodec getCodec() {
138        return this.codec;
139    }
140
141}