001/*******************************************************************************
002 * Copyright (c) 2017 Red Hat Inc and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Red Hat Inc - initial API and implementation
011 *******************************************************************************/
012package org.eclipse.kapua.gateway.client;
013
014import java.util.concurrent.Semaphore;
015import java.util.function.Consumer;
016
/**
 * A control interface on the underlying client transport.
 * <p>
 * <b>Note:</b> There is only one set of transport events available for the client.
 * Setting a new set of transport state listeners will clear the previously set listeners.
 * </p>
 */
public interface Transport {

    /**
     * Receiver for the connect and disconnect handlers configured through
     * {@link Transport#events(Consumer)}.
     * <p>
     * Both handlers are optional. With the default implementation of
     * {@link Transport#events(Consumer)} the handlers are captured once, right after
     * the configuring consumer returns; calling these methods later has no effect on
     * an already registered listener.
     * </p>
     */
    public interface TransportEvents {

        /**
         * Set the handler to run when the transport state is reported as connected.
         *
         * @param runnable
         *            the handler, may be {@code null} to not handle this event
         */
        public void connected(Runnable runnable);

        /**
         * Set the handler to run when the transport state is reported as disconnected.
         *
         * @param runnable
         *            the handler, may be {@code null} to not handle this event
         */
        public void disconnected(Runnable runnable);
    }

    /**
     * Set a state listener.
     *
     * <p>
     * The listener will be called immediately after setting with the
     * last known state.
     * </p>
     *
     * @param stateChange
     *            the listener to transport state changes
     */
    public void state(Consumer<Boolean> stateChange);

    /**
     * Atomically set a state listener built from simple runnables.
     *
     * <p>
     * This method is intended to be used with Java lambdas where each state change
     * (connected, disconnected) is mapped to one lambda. If the two handlers were
     * registered by separate calls, the state could change between setting the connect
     * and the disconnect handler, and there would be no way to properly report the
     * initial state.
     * </p>
     * <p>
     * Setting the event handlers using this method works by updating
     * the provided {@link TransportEvents} inside the provided consumer. The
     * consumer will only be called once inside this method. The handlers configured
     * by the consumer are then captured and registered as a single state listener
     * using {@link #state(Consumer)}. Changes made to the {@link TransportEvents}
     * instance after the consumer returned are not picked up.
     * </p>
     * <p>
     * A state reported as {@code null} is ignored and triggers neither handler.
     * </p>
     *
     * <pre>
     * client.transport().events( events {@code ->} {
     *   events.connected ( () {@code ->} System.out.println ("Connected") );
     *   events.disconnected ( () {@code ->} System.out.println ("Disconnected") );
     * });
     * </pre>
     *
     * @param events
     *            code to update the {@link TransportEvents}
     *
     */
    public default void events(final Consumer<TransportEvents> events) {
        class TransportEventsImpl implements TransportEvents {

            private Runnable connected;
            private Runnable disconnected;

            @Override
            public void connected(final Runnable runnable) {
                this.connected = runnable;
            }

            @Override
            public void disconnected(final Runnable runnable) {
                this.disconnected = runnable;
            }

        }

        final TransportEventsImpl impl = new TransportEventsImpl();

        events.accept(impl);

        // Capture the handlers once. The listener must not observe later changes to
        // "impl" (the caller may have kept a reference to it), and it must not read
        // mutable fields from whatever thread the transport uses for notifications.
        final Runnable onConnected = impl.connected;
        final Runnable onDisconnected = impl.disconnected;

        state(state -> {
            if (state == null) {
                // unknown state: neither connected nor disconnected, avoid unboxing a null
                return;
            }

            final Runnable handler = state ? onConnected : onDisconnected;
            if (handler != null) {
                handler.run();
            }
        });
    }

    /**
     * Wait for the connection to be established.
     * <p>
     * This installs a state listener on the transport and blocks the calling thread
     * until that listener is called with a connected state. A state reported as
     * {@code null} is treated as "not connected".
     * </p>
     * <p>
     * <b>Note:</b> This method will reset the transport listeners. The listener
     * installed by this method is not removed when the method returns.
     * </p>
     *
     * @param transport
     *            to wait on
     * @throws InterruptedException
     *             if the wait got interrupted
     */
    public static void waitForConnection(final Transport transport) throws InterruptedException {

        final Semaphore sem = new Semaphore(0);

        transport.state(state -> {
            // null-safe check, a null Boolean must not blow up inside the transport callback
            if (Boolean.TRUE.equals(state)) {
                sem.release();
            }
        });

        // blocks until the listener above has seen a connected state at least once
        sem.acquire();
    }
}