001/******************************************************************************* 002 * Copyright (c) 2017 Red Hat Inc and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Red Hat Inc - initial API and implementation 011 *******************************************************************************/ 012package org.eclipse.kapua.gateway.client.spi; 013 014import static java.util.Objects.requireNonNull; 015 016import java.util.concurrent.CompletionStage; 017 018import org.eclipse.kapua.gateway.client.Data; 019import org.eclipse.kapua.gateway.client.ErrorHandler; 020import org.eclipse.kapua.gateway.client.MessageHandler; 021import org.eclipse.kapua.gateway.client.Payload; 022import org.eclipse.kapua.gateway.client.Topic; 023import org.slf4j.Logger; 024import org.slf4j.LoggerFactory; 025 026public class AbstractData implements Data { 027 028 private static final Logger logger = LoggerFactory.getLogger(AbstractData.class); 029 030 private AbstractApplication application; 031 private Topic topic; 032 033 public AbstractData(final AbstractApplication application, final Topic topic) { 034 this.application = application; 035 this.topic = topic; 036 } 037 038 @Override 039 public void send(final Payload payload) throws Exception { 040 this.application.publish(this.topic, payload); 041 } 042 043 @Override 044 public void subscribe(final MessageHandler handler, final ErrorHandler<? extends Throwable> errorHandler) throws Exception { 045 requireNonNull(handler); 046 requireNonNull(errorHandler); 047 048 logger.debug("Setting subscription for: {}", this.topic); 049 050 final CompletionStage<?> future = application.subscribe(this.topic, handler, errorHandler ); 051 future.toCompletableFuture().get(); 052 } 053}