001/******************************************************************************* 002 * Copyright (c) 2017 Red Hat Inc and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Red Hat Inc - initial API and implementation 011 *******************************************************************************/ 012package org.eclipse.kapua.gateway.client.kura; 013 014import java.nio.ByteBuffer; 015import java.util.Collection; 016import java.util.Collections; 017import java.util.HashMap; 018import java.util.HashSet; 019import java.util.Map; 020import java.util.Objects; 021import java.util.Set; 022import java.util.TreeSet; 023 024import org.eclipse.kapua.gateway.client.Client; 025import org.eclipse.kapua.gateway.client.Module; 026import org.eclipse.kapua.gateway.client.ModuleContext; 027import org.eclipse.kapua.gateway.client.kura.internal.Metrics; 028import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload; 029import org.eclipse.kapua.gateway.client.mqtt.MqttClient; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033public class KuraBirthCertificateModule implements Module { 034 035 private static final Logger logger = LoggerFactory.getLogger(KuraBirthCertificateModule.class); 036 037 @FunctionalInterface 038 public interface Provider { 039 040 public void provideData(Map<String, String> values); 041 042 public static final Provider JVM = new Provider() { 043 044 @Override 045 public void provideData(final Map<String, String> values) { 046 values.put("jvm_name", System.getProperty("java.vm.name")); 047 values.put("jvm_version", System.getProperty("java.version")); 048 049 values.put("os", System.getProperty("os.name")); 050 values.put("os_version", System.getProperty("os.version")); 051 values.put("os_arch", System.getProperty("os.arch")); 052 } 053 054 }; 055 056 public static final Provider RUNTIME = new Provider() { 057 058 @Override 059 public void provideData(final Map<String, String> values) { 060 values.put("available_processors", Integer.toString(Runtime.getRuntime().availableProcessors())); 061 values.put("total_memory", Long.toString(Runtime.getRuntime().totalMemory())); 062 } 063 064 }; 065 } 066 067 public static class Builder { 068 069 private final String accountName; 070 071 private final Set<Provider> providers = new HashSet<>(); 072 073 private Builder(final String accountName) { 074 this.accountName = accountName; 075 } 076 077 public Builder defaultProviders() { 078 this.providers.add(Provider.JVM); 079 this.providers.add(Provider.RUNTIME); 080 return this; 081 } 082 083 public Builder provider(final Provider provider) { 084 Objects.requireNonNull(provider); 085 this.providers.add(provider); 086 return this; 087 } 088 089 public Builder providers(final Collection<Provider> providers) { 090 Objects.requireNonNull(providers); 091 this.providers.addAll(providers); 092 return this; 093 } 094 095 public Set<Provider> providers() { 096 return Collections.unmodifiableSet(this.providers); 097 } 098 099 public KuraBirthCertificateModule build() { 100 return new KuraBirthCertificateModule(this.accountName, providers()); 101 } 102 } 103 104 public static Builder newBuilder(final String accountName) { 105 return new Builder(accountName); 106 } 107 108 private final Set<String> applications = new TreeSet<>(); 109 110 private MqttClient client; 111 112 private final String accountName; 113 114 private final Set<Provider> providers; 115 116 private KuraBirthCertificateModule(final String accountName, final Set<Provider> providers) { 117 this.accountName = accountName; 118 this.providers = new HashSet<>(providers); 119 } 120 121 @Override 122 public void applicationAdded(final String applicationId) { 123 logger.info("Application added: {}", applicationId); 124 if (this.applications.add(applicationId)) { 125 sendBirthCertificate(); 126 } 127 } 128 129 @Override 130 public void applicationRemoved(final String applicationId) { 131 logger.info("Application removed: {}", applicationId); 132 if (this.applications.remove(applicationId)) { 133 sendBirthCertificate(); 134 } 135 } 136 137 @Override 138 public void connected() { 139 sendBirthCertificate(); 140 } 141 142 @Override 143 public void initialize(final ModuleContext context) { 144 final Client client = context.getClient(); 145 if (!(client instanceof MqttClient)) { 146 throw new IllegalStateException(String.format("%s can only be used with an %s based instance", KuraBirthCertificateModule.class.getSimpleName(), MqttClient.class.getName())); 147 } 148 this.client = (MqttClient) client; 149 } 150 151 private void sendBirthCertificate() { 152 logger.debug("Sending birth certificate"); 153 154 final Map<String, String> values = new HashMap<>(); 155 156 for (final Provider provider : this.providers) { 157 provider.provideData(values); 158 } 159 160 values.put("application_ids", String.join(",", this.applications)); 161 162 // build payload 163 164 final KuraPayload.Builder builder = KuraPayload.newBuilder(); 165 Metrics.buildMetrics(builder, values); 166 final ByteBuffer buffer = ByteBuffer.wrap(builder.build().toByteArray()); 167 168 // publish MQTT payload 169 170 final String clientId = this.client.getMqttClientId(); 171 172 try { 173 this.client.publishMqtt(String.format("$EDC/%s/%s/MQTT/BIRTH", this.accountName, clientId), buffer); 174 } catch (final Exception e) { 175 logger.warn("Failed to publish birth certificate", e); 176 } 177 } 178 179}