001/******************************************************************************* 002 * Copyright (c) 2017 Red Hat Inc and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Red Hat Inc - initial API and implementation 011 *******************************************************************************/ 012package org.eclipse.kapua.gateway.client.spi; 013 014import java.util.HashSet; 015import java.util.Set; 016import java.util.concurrent.CompletionStage; 017import java.util.concurrent.Executor; 018 019import org.eclipse.kapua.gateway.client.Application; 020import org.eclipse.kapua.gateway.client.ErrorHandler; 021import org.eclipse.kapua.gateway.client.MessageHandler; 022import org.eclipse.kapua.gateway.client.Payload; 023import org.eclipse.kapua.gateway.client.Topic; 024import org.eclipse.kapua.gateway.client.Transport; 025import org.eclipse.kapua.gateway.client.utils.TransportAsync; 026 027public abstract class AbstractApplication implements Application { 028 029 private final AbstractClient client; 030 protected final Set<Topic> subscriptions = new HashSet<>(); 031 protected final String applicationId; 032 protected final TransportAsync transport; 033 private boolean closed; 034 035 public AbstractApplication(final AbstractClient client, final String applicationId, final Executor executor) { 036 this.client = client; 037 this.applicationId = applicationId; 038 this.transport = new TransportAsync(executor); 039 } 040 041 protected synchronized void handleConnected() { 042 if (closed) { 043 return; 044 } 045 this.transport.handleConnected(); 046 } 047 048 protected synchronized void handleDisconnected() { 049 if (closed) { 050 return; 051 } 052 this.transport.handleDisconnected(); 053 } 054 055 protected void checkClosed() { 056 if (closed) { 057 throw new IllegalStateException("Application is closed"); 058 } 059 } 060 061 @Override 062 public synchronized Transport transport() { 063 checkClosed(); 064 return this.transport; 065 } 066 067 @Override 068 public abstract AbstractData data(Topic topic); 069 070 @Override 071 public void close() throws Exception { 072 synchronized (this) { 073 if (closed) { 074 return; 075 } 076 closed = true; 077 } 078 079 client.internalCloseApplication(applicationId, subscriptions, this); 080 } 081 082 protected abstract void publish(Topic topic, Payload payload) throws Exception; 083 084 public CompletionStage<?> subscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler) throws Exception { 085 recordSubscription(topic); 086 return internalSubscribe(topic, handler, errorHandler); 087 } 088 089 private void recordSubscription(final Topic topic) { 090 subscriptions.add(topic); 091 } 092 093 protected abstract CompletionStage<?> internalSubscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler) throws Exception; 094}