001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.mqtt;
013
014import java.nio.ByteBuffer;
015import java.util.concurrent.CompletionStage;
016import java.util.concurrent.Executor;
017
018import org.eclipse.kapua.gateway.client.ErrorHandler;
019import org.eclipse.kapua.gateway.client.MessageHandler;
020import org.eclipse.kapua.gateway.client.Payload;
021import org.eclipse.kapua.gateway.client.Topic;
022import org.eclipse.kapua.gateway.client.spi.AbstractApplication;
023import org.eclipse.kapua.gateway.client.spi.AbstractData;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027public class MqttApplication extends AbstractApplication {
028
029    private static final Logger logger = LoggerFactory.getLogger(MqttApplication.class);
030
031    private MqttClient client;
032
033    public MqttApplication(final MqttClient client, final String applicationId, final Executor executor) {
034        super(client, applicationId, executor);
035        this.client = client;
036    }
037
038    @Override
039    public AbstractData data(final Topic topic) {
040        return new AbstractData(this, topic);
041    }
042
043    protected void publish(Topic topic, Payload payload) throws Exception {
044        logger.debug("Publishing values - {} -> {}", topic, payload.getValues());
045
046        final ByteBuffer buffer = client.getCodec().encode(payload, null);
047        buffer.flip();
048
049        client.publish(applicationId, topic, buffer);
050    }
051
052    @Override
053    protected CompletionStage<?> internalSubscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler) throws Exception {
054        return client.subscribe(applicationId, topic, (messageTopic, payload) -> {
055            logger.debug("Received message for: {}", topic);
056            try {
057                MqttApplication.this.handleMessage(handler, payload);
058            } catch (final Exception e) {
059                try {
060                    errorHandler.handleError(e, null);
061                } catch (final Exception e1) {
062                    throw e1;
063                } catch (final Throwable e1) {
064                    throw new Exception(e1);
065                }
066            }
067        });
068    }
069
070    protected void handleMessage(final MessageHandler handler, final ByteBuffer buffer) throws Exception {
071        final Payload payload = client.getCodec().decode(buffer);
072        logger.debug("Received: {}", payload);
073        handler.handleMessage(payload);
074    }
075}