/*******************************************************************************
 * Copyright (c) 2017 Red Hat Inc and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Red Hat Inc - initial API and implementation
 *******************************************************************************/
package org.eclipse.kapua.gateway.client.mqtt;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import org.eclipse.kapua.gateway.client.ErrorHandler;
import org.eclipse.kapua.gateway.client.MessageHandler;
import org.eclipse.kapua.gateway.client.Payload;
import org.eclipse.kapua.gateway.client.Topic;
import org.eclipse.kapua.gateway.client.spi.AbstractApplication;
import org.eclipse.kapua.gateway.client.spi.AbstractData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link AbstractApplication} that publishes and subscribes through an
 * {@link MqttClient}, using that client's codec to convert between
 * {@link Payload} instances and {@link ByteBuffer}s.
 */
public class MqttApplication extends AbstractApplication {

    private static final Logger logger = LoggerFactory.getLogger(MqttApplication.class);

    /**
     * The MQTT client used for all publish/subscribe operations. Assigned once
     * in the constructor and never replaced.
     */
    private final MqttClient client;

    /**
     * Creates a new application bound to the given MQTT client.
     *
     * @param client
     *            the client used for publishing and subscribing; also passed on
     *            to the superclass constructor
     * @param applicationId
     *            the application id, passed on to the superclass constructor
     * @param executor
     *            the executor, passed on to the superclass constructor
     */
    public MqttApplication(final MqttClient client, final String applicationId, final Executor executor) {
        super(client, applicationId, executor);
        this.client = client;
    }

    /**
     * Creates a new {@link AbstractData} instance for this application and the
     * given topic.
     *
     * @param topic
     *            the topic the returned instance is created for
     * @return a new {@link AbstractData} instance
     */
    @Override
    public AbstractData data(final Topic topic) {
        return new AbstractData(this, topic);
    }

    /**
     * Encodes the payload with the client's codec and publishes the result to
     * the given topic under this application's id.
     *
     * @param topic
     *            the topic to publish to
     * @param payload
     *            the payload to encode and publish
     * @throws Exception
     *             if encoding or publishing fails
     */
    protected void publish(Topic topic, Payload payload) throws Exception {
        logger.debug("Publishing values - {} -> {}", topic, payload.getValues());

        // NOTE(review): the meaning of the null second argument is defined by
        // the codec and is not visible from this class.
        final ByteBuffer buffer = client.getCodec().encode(payload, null);

        // flip() sets the limit to the current position and rewinds to zero;
        // presumably the codec leaves the position at the end of the written
        // data, so this prepares the buffer for being read by the client.
        buffer.flip();

        client.publish(applicationId, topic, buffer);
    }

    /**
     * Subscribes to the given topic through the MQTT client. Every message
     * delivered to the registered callback is decoded and handed to
     * {@code handler}; failures are routed to {@code errorHandler}.
     *
     * @param topic
     *            the topic to subscribe to
     * @param handler
     *            receives the decoded payload of each message
     * @param errorHandler
     *            invoked when decoding or handling a message throws an
     *            {@link Exception}
     * @return the completion stage returned by the client's subscribe call
     * @throws Exception
     *             if the subscribe call itself fails
     */
    @Override
    protected CompletionStage<?> internalSubscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler) throws Exception {
        return client.subscribe(applicationId, topic, (messageTopic, payload) -> {
            // Logs the subscription topic, not the per-message topic.
            logger.debug("Received message for: {}", topic);
            dispatch(handler, errorHandler, payload);
        });
    }

    /**
     * Passes a received buffer to {@link #handleMessage(MessageHandler, ByteBuffer)}
     * and reports any {@link Exception} it throws to the error handler.
     *
     * @param handler
     *            the message handler to forward the decoded payload to
     * @param errorHandler
     *            called with the exception and {@code null} as second argument
     *            when handling fails
     * @param buffer
     *            the raw message content
     * @throws Exception
     *             whatever the error handler throws; a non-{@link Exception}
     *             {@link Throwable} is wrapped in a new {@link Exception}
     */
    private void dispatch(final MessageHandler handler, final ErrorHandler<? extends Throwable> errorHandler, final ByteBuffer buffer) throws Exception {
        try {
            handleMessage(handler, buffer);
        } catch (final Exception handlingFailure) {
            try {
                errorHandler.handleError(handlingFailure, null);
            } catch (final Exception rethrown) {
                throw rethrown;
            } catch (final Throwable other) {
                // The callback may only throw Exception, so wrap anything else.
                throw new Exception(other);
            }
        }
    }

    /**
     * Decodes the buffer with the client's codec and forwards the resulting
     * payload to the handler.
     *
     * @param handler
     *            the handler receiving the decoded payload
     * @param buffer
     *            the raw message content to decode
     * @throws Exception
     *             if decoding fails or the handler throws
     */
    protected void handleMessage(final MessageHandler handler, final ByteBuffer buffer) throws Exception {
        final Payload payload = client.getCodec().decode(buffer);
        logger.debug("Received: {}", payload);
        handler.handleMessage(payload);
    }
}