001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client.kura;
013
014import java.nio.ByteBuffer;
015import java.time.Instant;
016import java.util.Map;
017import java.util.Objects;
018
019import org.eclipse.kapua.gateway.client.BinaryPayloadCodec;
020import org.eclipse.kapua.gateway.client.Payload;
021import org.eclipse.kapua.gateway.client.kura.internal.Metrics;
022import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto;
023import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload;
024import org.eclipse.kapua.gateway.client.utils.Buffers;
025
026public class KuraBinaryPayloadCodec implements BinaryPayloadCodec {
027
028    public static class Builder {
029
030        public KuraBinaryPayloadCodec build() {
031            return new KuraBinaryPayloadCodec();
032        }
033    }
034
035    private KuraBinaryPayloadCodec() {
036    }
037
038    @Override
039    public ByteBuffer encode(final Payload payload, final ByteBuffer buffer) throws Exception {
040
041        Objects.requireNonNull(payload);
042
043        final KuraPayloadProto.KuraPayload.Builder builder = KuraPayload.newBuilder();
044        builder.setTimestamp(payload.getTimestamp().toEpochMilli());
045        Metrics.buildMetrics(builder, payload.getValues());
046
047        final byte[] data = builder.build().toByteArray();
048
049        if (buffer == null) {
050            // create a wrapped buffer
051            return Buffers.wrap(data);
052        } else if (buffer.remaining() < data.length) {
053            // create a new, merged buffer
054            buffer.flip();
055            final ByteBuffer newBuffer = ByteBuffer.allocate(buffer.remaining() + data.length);
056            newBuffer.put(buffer);
057            newBuffer.put(data);
058            return newBuffer;
059        } else {
060            buffer.put(data);
061            return buffer;
062        }
063    }
064
065    @Override
066    public Payload decode(final ByteBuffer buffer) throws Exception {
067        Objects.requireNonNull(buffer);
068
069        final KuraPayload payload = KuraPayload.parseFrom(Buffers.toByteArray(buffer));
070        final Map<String, Object> values = Metrics.extractMetrics(payload);
071        return Payload.of(Instant.ofEpochMilli(payload.getTimestamp()), values);
072    }
073
074}