001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.spi;
013
014import java.util.HashSet;
015import java.util.Set;
016import java.util.concurrent.CompletionStage;
017import java.util.concurrent.Executor;
018
019import org.eclipse.kapua.gateway.client.Application;
020import org.eclipse.kapua.gateway.client.ErrorHandler;
021import org.eclipse.kapua.gateway.client.MessageHandler;
022import org.eclipse.kapua.gateway.client.Payload;
023import org.eclipse.kapua.gateway.client.Topic;
024import org.eclipse.kapua.gateway.client.Transport;
025import org.eclipse.kapua.gateway.client.utils.TransportAsync;
026
027public abstract class AbstractApplication implements Application {
028
029    private final AbstractClient client;
030    protected final Set<Topic> subscriptions = new HashSet<>();
031    protected final String applicationId;
032    protected final TransportAsync transport;
033    private boolean closed;
034
035    public AbstractApplication(final AbstractClient client, final String applicationId, final Executor executor) {
036        this.client = client;
037        this.applicationId = applicationId;
038        this.transport = new TransportAsync(executor);
039    }
040
041    protected synchronized void handleConnected() {
042        if (closed) {
043            return;
044        }
045        this.transport.handleConnected();
046    }
047
048    protected synchronized void handleDisconnected() {
049        if (closed) {
050            return;
051        }
052        this.transport.handleDisconnected();
053    }
054
055    protected void checkClosed() {
056        if (closed) {
057            throw new IllegalStateException("Application is closed");
058        }
059    }
060
061    @Override
062    public synchronized Transport transport() {
063        checkClosed();
064        return this.transport;
065    }
066
067    @Override
068    public abstract AbstractData data(Topic topic);
069
070    @Override
071    public void close() throws Exception {
072        synchronized (this) {
073            if (closed) {
074                return;
075            }
076            closed = true;
077        }
078
079        client.internalCloseApplication(applicationId, subscriptions, this);
080    }
081
082    protected abstract void publish(Topic topic, Payload payload) throws Exception;
083
084    public CompletionStage<?> subscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler) throws Exception {
085        recordSubscription(topic);
086        return internalSubscribe(topic, handler, errorHandler);
087    }
088
089    private void recordSubscription(final Topic topic) {
090        subscriptions.add(topic);
091    }
092
093    protected abstract CompletionStage<?> internalSubscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler) throws Exception;
094}