/*******************************************************************************
 * Copyright (c) 2017 Red Hat Inc and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Red Hat Inc - initial API and implementation
 *******************************************************************************/
package org.eclipse.kapua.gateway.client.spi;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import org.eclipse.kapua.gateway.client.Application;
import org.eclipse.kapua.gateway.client.Client;
import org.eclipse.kapua.gateway.client.Module;
import org.eclipse.kapua.gateway.client.ModuleContext;
import org.eclipse.kapua.gateway.client.Topic;
import org.eclipse.kapua.gateway.client.Transport;
import org.eclipse.kapua.gateway.client.utils.TransportAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link Client}.
 * <p>
 * This class keeps a registry of the applications created through it (keyed by
 * application ID), forwards life-cycle events to the registered {@link Module}s
 * and owns the {@link TransportAsync} instance returned by {@link #transport()}.
 * Access to the application registry is guarded by the monitor of this instance.
 * </p>
 */
public abstract class AbstractClient implements Client {

    private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);

    /**
     * Base builder which collects the modules of the client to build.
     *
     * @param <T>
     *            the concrete builder type, returned by the fluent methods
     */
    public abstract static class Builder<T extends Builder<T>> implements Client.Builder {

        /**
         * Provide the value returned by the fluent methods of this builder.
         * NOTE(review): presumably implementations return {@code this} - confirm against subclasses.
         *
         * @return the builder instance to continue the call chain with
         */
        protected abstract T builder();

        private final Set<Module> modules = new HashSet<>();

        /**
         * Add a module to the client being built.
         *
         * @param module
         *            the module to add, must not be {@code null}
         * @return the result of {@link #builder()}
         * @throws NullPointerException
         *             if {@code module} is {@code null}
         */
        public T module(final Module module) {
            Objects.requireNonNull(module);

            this.modules.add(module);
            return builder();
        }

        /**
         * Get the modules added so far.
         * <p>
         * The returned set is the live, internal set of this builder, not a copy.
         * </p>
         *
         * @return the set of modules, never {@code null}
         */
        public Set<Module> modules() {
            return this.modules;
        }
    }

    protected final ScheduledExecutorService executor;
    private final Set<Module> modules;

    private final TransportAsync transport;

    /** Registered applications by application ID, guarded by {@code this}. */
    private final Map<String, AbstractApplication> applications = new HashMap<>();

    /**
     * Create a new client instance.
     * <p>
     * The provided set of modules is copied. Each module is then initialized
     * with a {@link ModuleContext} giving access to this client. Note that this
     * happens while the instance is still under construction: constructors of
     * sub-classes have not completed when {@link Module#initialize(ModuleContext)}
     * is called.
     * </p>
     *
     * @param executor
     *            the executor of this client, also handed to the transport helper,
     *            must not be {@code null}
     * @param modules
     *            the modules to register with this client, must not be {@code null}
     * @throws NullPointerException
     *             if {@code executor} or {@code modules} is {@code null}
     */
    public AbstractClient(final ScheduledExecutorService executor, final Set<Module> modules) {
        // fail fast, instead of failing later on first use
        this.executor = Objects.requireNonNull(executor, "executor");
        this.modules = new HashSet<>(Objects.requireNonNull(modules, "modules"));

        this.transport = new TransportAsync(executor);

        fireModuleEvent(module -> module.initialize(new ModuleContext() {

            @Override
            public Client getClient() {
                return AbstractClient.this;
            }
        }));
    }

    @Override
    public Transport transport() {
        return this.transport;
    }

    /**
     * Call the consumer once for each registered module.
     * <p>
     * An exception thrown while processing one module is logged and does not
     * prevent the remaining modules from being processed.
     * </p>
     *
     * @param consumer
     *            the action to perform on each module
     */
    private void fireModuleEvent(final Consumer<Module> consumer) {
        for (final Module module : this.modules) {
            try {
                consumer.accept(module);
            } catch (final Exception e) {
                // a failing module is a problem worth noticing, same level as other failures in this class
                logger.warn("Failed to process module event", e);
            }
        }
    }

    /**
     * Tell all modules that an application was added.
     *
     * @param applicationId
     *            the ID of the added application
     */
    protected void notifyAddApplication(final String applicationId) {
        fireModuleEvent(module -> module.applicationAdded(applicationId));
    }

    /**
     * Tell all modules that an application was removed.
     *
     * @param applicationId
     *            the ID of the removed application
     */
    protected void notifyRemoveApplication(final String applicationId) {
        fireModuleEvent(module -> module.applicationRemoved(applicationId));
    }

    /**
     * Tell all modules, and after that the transport helper, that the
     * connection was established.
     */
    protected void notifyConnected() {
        fireModuleEvent(Module::connected);
        this.transport.handleConnected();
    }

    /**
     * Tell all modules, and after that the transport helper, that the
     * connection was lost.
     */
    protected void notifyDisconnected() {
        fireModuleEvent(Module::disconnected);
        this.transport.handleDisconnected();
    }

    /**
     * Handle the "connected" event.
     * <p>
     * Modules and the transport helper get notified first, then all registered
     * applications get notified while holding the lock of this instance.
     * </p>
     */
    protected void handleConnected() {
        logger.debug("Connected");

        notifyConnected();
        synchronized (this) {
            for (final AbstractApplication application : this.applications.values()) {
                application.handleConnected();
            }
        }
    }

    /**
     * Handle the "disconnected" event.
     * <p>
     * Modules and the transport helper get notified first, then all registered
     * applications get notified while holding the lock of this instance.
     * </p>
     */
    protected void handleDisconnected() {
        logger.debug("Disconnected");

        notifyDisconnected();
        synchronized (this) {
            for (final AbstractApplication application : this.applications.values()) {
                application.handleDisconnected();
            }
        }
    }

    /**
     * Create a builder for an application with the provided ID.
     * <p>
     * The application gets created and registered only when
     * {@link Application.Builder#build()} is called on the returned builder.
     * </p>
     *
     * @param applicationId
     *            the ID of the application to build
     * @return a new application builder
     */
    @Override
    public Application.Builder buildApplication(final String applicationId) {
        return new Application.Builder() {

            @Override
            public Application build() {
                return internalBuildApplication(this, applicationId);
            }
        };
    }

    /**
     * Create and register a new application.
     * <p>
     * The duplicate check, the creation, the registration and the notification
     * of the modules are all performed while holding the lock of this instance.
     * </p>
     *
     * @param builder
     *            the builder which requested the application
     * @param applicationId
     *            the ID of the new application
     * @return the newly created and registered application
     * @throws IllegalStateException
     *             if an application with the same ID is already registered
     */
    protected AbstractApplication internalBuildApplication(final Application.Builder builder, final String applicationId) {
        synchronized (this) {
            if (this.applications.containsKey(applicationId)) {
                throw new IllegalStateException(String.format("An application with the ID '%s' already exists", applicationId));
            }

            final AbstractApplication result = internalCreateApplication(builder, applicationId);

            this.applications.put(applicationId, result);
            notifyAddApplication(applicationId);

            return result;
        }
    }

    /**
     * Create the actual application instance. Called by
     * {@link #internalBuildApplication(Application.Builder, String)} while
     * holding the lock of this instance.
     *
     * @param builder
     *            the builder which requested the application
     * @param applicationId
     *            the ID of the new application
     * @return the new application instance
     */
    protected abstract AbstractApplication internalCreateApplication(final Application.Builder builder, final String applicationId);

    /**
     * Unsubscribe an application from a set of topics.
     *
     * @param applicationId
     *            the ID of the application
     * @param topics
     *            the topics to unsubscribe from
     * @throws Exception
     *             if unsubscribing fails
     */
    protected abstract void internalUnsubscribe(String applicationId, Collection<Topic> topics) throws Exception;

    /**
     * Close an application and remove it from this client.
     * <p>
     * Nothing happens unless the provided application is the one currently
     * registered for the provided ID. Otherwise the application is removed from
     * the registry, its topics get unsubscribed and
     * {@link #handleApplicationClosed(String, AbstractApplication)} is called.
     * A failure to unsubscribe is logged and does not prevent the close
     * handling.
     * </p>
     *
     * @param applicationId
     *            the ID of the application to close
     * @param topics
     *            the topics to unsubscribe from
     * @param application
     *            the application instance requesting to be closed
     */
    protected synchronized void internalCloseApplication(final String applicationId, final Set<Topic> topics, final AbstractApplication application) {
        // two-argument remove: only removes if the ID is still mapped to this very application
        if (!this.applications.remove(applicationId, application)) {
            return;
        }

        try {
            internalUnsubscribe(applicationId, topics);
        } catch (final Exception e) {
            logger.warn("Failed to unsubscribe on application close", e);
        }
        handleApplicationClosed(applicationId, application);
    }

    /**
     * Handle the fact that an application got closed and removed from the
     * registry. This implementation notifies all modules about the removal.
     *
     * @param applicationId
     *            the ID of the closed application
     * @param application
     *            the closed application
     */
    protected void handleApplicationClosed(final String applicationId, final AbstractApplication application) {
        notifyRemoveApplication(applicationId);
    }

}