001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.spi;
013
014import java.util.Collection;
015import java.util.HashMap;
016import java.util.HashSet;
017import java.util.Map;
018import java.util.Objects;
019import java.util.Set;
020import java.util.concurrent.ScheduledExecutorService;
021import java.util.function.Consumer;
022
023import org.eclipse.kapua.gateway.client.Application;
024import org.eclipse.kapua.gateway.client.Client;
025import org.eclipse.kapua.gateway.client.Module;
026import org.eclipse.kapua.gateway.client.ModuleContext;
027import org.eclipse.kapua.gateway.client.Topic;
028import org.eclipse.kapua.gateway.client.Transport;
029import org.eclipse.kapua.gateway.client.utils.TransportAsync;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033public abstract class AbstractClient implements Client {
034
035    private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
036
037    public static abstract class Builder<T extends Builder<T>> implements Client.Builder {
038
039        protected abstract T builder();
040
041        private final Set<Module> modules = new HashSet<>();
042
043        public T module(final Module module) {
044            Objects.requireNonNull(module);
045
046            this.modules.add(module);
047            return builder();
048        }
049
050        public Set<Module> modules() {
051            return this.modules;
052        }
053    }
054
055    protected final ScheduledExecutorService executor;
056    private final Set<Module> modules;
057
058    private final TransportAsync transport;
059
060    private final Map<String, AbstractApplication> applications = new HashMap<>();
061
062    public AbstractClient(final ScheduledExecutorService executor, final Set<Module> modules) {
063        this.executor = executor;
064        this.modules = new HashSet<>(modules);
065
066        this.transport = new TransportAsync(executor);
067
068        fireModuleEvent(module -> module.initialize(new ModuleContext() {
069
070            @Override
071            public Client getClient() {
072                return AbstractClient.this;
073            }
074        }));
075    }
076
077    @Override
078    public Transport transport() {
079        return this.transport;
080    }
081
082    private void fireModuleEvent(final Consumer<Module> consumer) {
083        for (final Module module : this.modules) {
084            try {
085                consumer.accept(module);
086            } catch (final Exception e) {
087                logger.info("Failed to process module event", e);
088            }
089        }
090    }
091
092    protected void notifyAddApplication(final String applicationId) {
093        fireModuleEvent(module -> module.applicationAdded(applicationId));
094    }
095
096    protected void notifyRemoveApplication(final String applicationId) {
097        fireModuleEvent(module -> module.applicationRemoved(applicationId));
098    }
099
100    protected void notifyConnected() {
101        fireModuleEvent(Module::connected);
102        this.transport.handleConnected();
103    }
104
105    protected void notifyDisconnected() {
106        fireModuleEvent(Module::disconnected);
107        this.transport.handleDisconnected();
108    }
109
110    protected void handleConnected() {
111        logger.debug("Connected");
112
113        notifyConnected();
114        synchronized (this) {
115            this.applications.values().stream().forEach(app -> app.handleConnected());
116        }
117    }
118
119    protected void handleDisconnected() {
120        logger.debug("Disconnected");
121
122        notifyDisconnected();
123        synchronized (this) {
124            this.applications.values().stream().forEach(app -> app.handleDisconnected());
125        }
126    }
127
128    @Override
129    public Application.Builder buildApplication(final String applicationId) {
130        return new Application.Builder() {
131
132            @Override
133            public Application build() {
134                return internalBuildApplication(this, applicationId);
135            }
136        };
137    }
138
139    protected AbstractApplication internalBuildApplication(final Application.Builder builder, final String applicationId) {
140        synchronized (this) {
141            if (applications.containsKey(applicationId)) {
142                throw new IllegalStateException(String.format("An application with the ID '%s' already exists", applicationId));
143            }
144
145            final AbstractApplication result = internalCreateApplication(builder, applicationId);
146
147            this.applications.put(applicationId, result);
148            notifyAddApplication(applicationId);
149
150            return result;
151        }
152    }
153
154    protected abstract AbstractApplication internalCreateApplication(final Application.Builder builder, final String applicationId);
155
156    protected abstract void internalUnsubscribe(String applicationId, Collection<Topic> topics) throws Exception;
157
158    protected synchronized void internalCloseApplication(final String applicationId, Set<Topic> topics, final AbstractApplication application) {
159        if (this.applications.remove(applicationId, application)) {
160            try {
161                internalUnsubscribe(applicationId, topics);
162            } catch (Exception e) {
163                logger.warn("Failed to unsubscribe on application close", e);
164            }
165            handleApplicationClosed(applicationId, application);
166        }
167    }
168
169    protected void handleApplicationClosed(final String applicationId, final AbstractApplication application) {
170        notifyRemoveApplication(applicationId);
171    }
172
173}