001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.spi;
013
014import static java.util.Objects.requireNonNull;
015
016import java.util.concurrent.CompletionStage;
017
018import org.eclipse.kapua.gateway.client.Data;
019import org.eclipse.kapua.gateway.client.ErrorHandler;
020import org.eclipse.kapua.gateway.client.MessageHandler;
021import org.eclipse.kapua.gateway.client.Payload;
022import org.eclipse.kapua.gateway.client.Topic;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026public class AbstractData implements Data {
027
028    private static final Logger logger = LoggerFactory.getLogger(AbstractData.class);
029
030    private AbstractApplication application;
031    private Topic topic;
032
033    public AbstractData(final AbstractApplication application, final Topic topic) {
034        this.application = application;
035        this.topic = topic;
036    }
037
038    @Override
039    public void send(final Payload payload) throws Exception {
040        this.application.publish(this.topic, payload);
041    }
042
043    @Override
044    public void subscribe(final MessageHandler handler, final ErrorHandler<? extends Throwable> errorHandler) throws Exception {
045        requireNonNull(handler);
046        requireNonNull(errorHandler);
047
048        logger.debug("Setting subscription for: {}", this.topic);
049
050        final CompletionStage<?> future = application.subscribe(this.topic, handler, errorHandler );
051        future.toCompletableFuture().get();
052    }
053}