001/******************************************************************************* 002 * Copyright (c) 2017 Red Hat Inc and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Red Hat Inc - initial API and implementation 011 *******************************************************************************/ 012package org.eclipse.kapua.gateway.client.kura; 013 014import java.nio.ByteBuffer; 015import java.time.Instant; 016import java.util.Map; 017import java.util.Objects; 018 019import org.eclipse.kapua.gateway.client.BinaryPayloadCodec; 020import org.eclipse.kapua.gateway.client.Payload; 021import org.eclipse.kapua.gateway.client.kura.internal.Metrics; 022import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto; 023import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload; 024import org.eclipse.kapua.gateway.client.utils.Buffers; 025 026public class KuraBinaryPayloadCodec implements BinaryPayloadCodec { 027 028 public static class Builder { 029 030 public KuraBinaryPayloadCodec build() { 031 return new KuraBinaryPayloadCodec(); 032 } 033 } 034 035 private KuraBinaryPayloadCodec() { 036 } 037 038 @Override 039 public ByteBuffer encode(final Payload payload, final ByteBuffer buffer) throws Exception { 040 041 Objects.requireNonNull(payload); 042 043 final KuraPayloadProto.KuraPayload.Builder builder = KuraPayload.newBuilder(); 044 builder.setTimestamp(payload.getTimestamp().toEpochMilli()); 045 Metrics.buildMetrics(builder, payload.getValues()); 046 047 final byte[] data = builder.build().toByteArray(); 048 049 if (buffer == null) { 050 // create a wrapped buffer 051 return Buffers.wrap(data); 052 } else if (buffer.remaining() < data.length) { 053 // create a new, merged buffer 054 buffer.flip(); 055 final ByteBuffer newBuffer = ByteBuffer.allocate(buffer.remaining() + data.length); 056 newBuffer.put(buffer); 057 newBuffer.put(data); 058 return newBuffer; 059 } else { 060 buffer.put(data); 061 return buffer; 062 } 063 } 064 065 @Override 066 public Payload decode(final ByteBuffer buffer) throws Exception { 067 Objects.requireNonNull(buffer); 068 069 final KuraPayload payload = KuraPayload.parseFrom(Buffers.toByteArray(buffer)); 070 final Map<String, Object> values = Metrics.extractMetrics(payload); 071 return Payload.of(Instant.ofEpochMilli(payload.getTimestamp()), values); 072 } 073 074}