NtripClient.java
/* Copyright 2002-2024 CS GROUP
* Licensed to CS GROUP (CS) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* CS licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.orekit.gnss.metric.ntrip;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Proxy.Type;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.hipparchus.util.FastMath;
import org.orekit.errors.OrekitException;
import org.orekit.errors.OrekitMessages;
import org.orekit.gnss.metric.messages.ParsedMessage;
/** Source table for ntrip streams retrieval.
* <p>
* Note that all authentication is performed automatically by just
* calling the standard {@link Authenticator#setDefault(Authenticator)}
* method to set up an authenticator.
* </p>
* @author Luc Maisonobe
* @since 11.0
*/
public class NtripClient {
/** Default timeout for connections and reads (ms). */
public static final int DEFAULT_TIMEOUT = 10000;
/** Default port for ntrip communication. */
public static final int DEFAULT_PORT = 2101;
/** Default delay before we reconnect after connection close (s). */
public static final double DEFAULT_RECONNECT_DELAY = 1.0;
/** Default factor by which reconnection delay is multiplied after each attempt. */
public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
/** Default maximum number of reconnect a attempts without readin any data. */
public static final int DEFAULT_MAX_RECONNECT = 20;
/** Host header. */
private static final String HOST_HEADER_KEY = "Host";
/** User-agent header key. */
private static final String USER_AGENT_HEADER_KEY = "User-Agent";
/** User-agent header value. */
private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
/** Version header key. */
private static final String VERSION_HEADER_KEY = "Ntrip-Version";
/** Version header value. */
private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
/** Connection header key. */
private static final String CONNECTION_HEADER_KEY = "Connection";
/** Connection header value. */
private static final String CONNECTION_HEADER_VALUE = "close";
/** Flags header key. */
private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
/** Content type for source table. */
private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
/** Degrees to arc minutes conversion factor. */
private static final double DEG_TO_MINUTES = 60.0;
/** Caster host. */
private final String host;
/** Caster port. */
private final int port;
/** Delay before we reconnect after connection close. */
private double reconnectDelay;
/** Multiplication factor for reconnection delay. */
private double reconnectDelayFactor;
/** Max number of reconnections. */
private int maxRetries;
/** Timeout for connections and reads. */
private int timeout;
/** Proxy to use. */
private Proxy proxy;
/** NMEA GGA sentence (may be null). */
private AtomicReference<String> gga;
/** Observers for encoded messages. */
private final List<ObserverHolder> observers;
/** Monitors for data streams. */
private final Map<String, StreamMonitor> monitors;
/** Source table. */
private SourceTable sourceTable;
/** Executor for stream monitoring tasks. */
private ExecutorService executorService;
/** Build a client for NTRIP.
* <p>
* The default configuration uses default timeout, default reconnection
* parameters, no GPS fix and no proxy.
* </p>
* @param host caster host providing the source table
* @param port port to use for connection
* see {@link #DEFAULT_PORT}
*/
public NtripClient(final String host, final int port) {
this.host = host;
this.port = port;
this.observers = new ArrayList<>();
this.monitors = new HashMap<>();
setTimeout(DEFAULT_TIMEOUT);
setReconnectParameters(DEFAULT_RECONNECT_DELAY,
DEFAULT_RECONNECT_DELAY_FACTOR,
DEFAULT_MAX_RECONNECT);
setProxy(Type.DIRECT, null, -1);
this.gga = new AtomicReference<String>(null);
this.sourceTable = null;
this.executorService = null;
}
/** Get the caster host.
* @return caster host
*/
public String getHost() {
return host;
}
/** Get the port to use for connection.
* @return port to use for connection
*/
public int getPort() {
return port;
}
/** Set timeout for connections and reads.
* @param timeout timeout for connections and reads (ms)
*/
public void setTimeout(final int timeout) {
this.timeout = timeout;
}
/** Set Reconnect parameters.
* @param delay delay before we reconnect after connection close
* @param delayFactor factor by which reconnection delay is multiplied after each attempt
* @param max max number of reconnect a attempts without reading any data
*/
public void setReconnectParameters(final double delay,
final double delayFactor,
final int max) {
this.reconnectDelay = delay;
this.reconnectDelayFactor = delayFactor;
this.maxRetries = max;
}
/** Set proxy parameters.
* @param type proxy type
* @param proxyHost host name of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
* @param proxyPort port number of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
*/
public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
try {
if (type == Proxy.Type.DIRECT) {
// disable proxy
proxy = Proxy.NO_PROXY;
} else {
// enable proxy
final InetAddress hostAddress = InetAddress.getByName(proxyHost);
final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
proxy = new Proxy(type, proxyAddress);
}
} catch (UnknownHostException uhe) {
throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
}
}
/** Get proxy.
* @return proxy to use
*/
public Proxy getProxy() {
return proxy;
}
/** Set GPS fix data to send as NMEA sentence to Ntrip caster if required.
* @param hour hour of the fix (UTC time)
* @param minute minute of the fix (UTC time)
* @param second second of the fix (UTC time)
* @param latitude latitude (radians)
* @param longitude longitude (radians)
* @param ellAltitude altitude above ellipsoid (m)
* @param undulation height of the geoid above ellipsoid (m)
*/
public void setFix(final int hour, final int minute, final double second,
final double latitude, final double longitude, final double ellAltitude,
final double undulation) {
// convert latitude
final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
final int dLat = (int) FastMath.floor(latDeg);
final double mLat = DEG_TO_MINUTES * (latDeg - dLat);
final char cLat = latitude >= 0.0 ? 'N' : 'S';
// convert longitude
final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
final int dLon = (int) FastMath.floor(lonDeg);
final double mLon = DEG_TO_MINUTES * (lonDeg - dLon);
final char cLon = longitude >= 0.0 ? 'E' : 'W';
// build NMEA GGA sentence
final StringBuilder builder = new StringBuilder(82);
try (Formatter formatter = new Formatter(builder, Locale.US)) {
// dummy values
final int fixQuality = 1;
final int nbSat = 4;
final double hdop = 1.0;
// sentence body
formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
hour, minute, second,
dLat, mLat, cLat, dLon, mLon, cLon,
fixQuality, nbSat, hdop,
ellAltitude, undulation);
// checksum
byte sum = 0;
for (int i = 1; i < builder.length(); ++i) {
sum ^= builder.charAt(i);
}
formatter.format("*%02X", sum);
}
gga.set(builder.toString());
}
/** Get NMEA GGA sentence.
* @return NMEA GGA sentence (may be null)
*/
String getGGA() {
return gga.get();
}
/** Add an observer for an encoded messages.
* <p>
* If messages of the specified type have already been retrieved from
* a stream, the observer will be immediately notified with the last
* message from each mount point (in unspecified order) as a side effect
* of being added.
* </p>
* @param typeCode code for the message type (if set to 0, notification
* will be triggered regardless of message type)
* @param mountPoint mountPoint from which data must come (if null, notification
* will be triggered regardless of mount point)
* @param observer observer for this message type
*/
public void addObserver(final int typeCode, final String mountPoint,
final MessageObserver observer) {
// store the observer for future monitored mount points
observers.add(new ObserverHolder(typeCode, mountPoint, observer));
// check if we should also add it to already monitored mount points
for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
if (mountPoint == null || mountPoint.equals(entry.getKey())) {
entry.getValue().addObserver(typeCode, observer);
}
}
}
/** Get a sourcetable.
* @return source table from the caster
*/
public SourceTable getSourceTable() {
if (sourceTable == null) {
try {
// perform request
final HttpURLConnection connection = connect("");
final int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
} else if (responseCode != HttpURLConnection.HTTP_OK) {
throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
}
// for this request, we MUST get a source table
if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
}
final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
// parse source table records
try (InputStream is = connection.getInputStream();
InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr)) {
int lineNumber = 0;
for (String line = br.readLine(); line != null; line = br.readLine()) {
++lineNumber;
line = line.trim();
if (line.length() == 0) {
continue;
}
if (line.startsWith(RecordType.CAS.toString())) {
table.addCasterRecord(new CasterRecord(line));
} else if (line.startsWith(RecordType.NET.toString())) {
table.addNetworkRecord(new NetworkRecord(line));
} else if (line.startsWith(RecordType.STR.toString())) {
table.addDataStreamRecord(new DataStreamRecord(line));
} else if (line.startsWith("ENDSOURCETABLE")) {
// we have reached end of table
break;
} else {
throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
connection.getURL().getHost(), lineNumber, line);
}
}
}
sourceTable = table;
return table;
} catch (IOException | URISyntaxException e) {
throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
}
}
return sourceTable;
}
/** Connect to a mount point and start streaming data from it.
* <p>
* This method sets up an internal dedicated thread for continuously
* monitoring data incoming from a mount point. When new complete
* {@link ParsedMessage parsed messages} becomes available, the
* {@link MessageObserver observers} that have been registered
* using {@link #addObserver(int, String, MessageObserver) addObserver()}
* method will be notified about the message.
* </p>
* <p>
* This method must be called once for each stream to monitor.
* </p>
* @param mountPoint mount point providing the stream
* @param type messages type of the mount point
* @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
* @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
*/
public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
if (executorService == null) {
// lazy creation of executor service, with one thread for each possible data stream
executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
}
// safety check
if (monitors.containsKey(mountPoint)) {
throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
}
// create the monitor
final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
reconnectDelay, reconnectDelayFactor, maxRetries);
monitors.put(mountPoint, monitor);
// set up the already known observers
for (final ObserverHolder observerHolder : observers) {
if (observerHolder.mountPoint == null ||
observerHolder.mountPoint.equals(mountPoint)) {
monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
}
}
// start streaming data
executorService.execute(monitor);
}
/** Check if any of the streaming thread has thrown an exception.
* <p>
* If a streaming thread has thrown an exception, it will be rethrown here
* </p>
*/
public void checkException() {
// check if any of the stream got an exception
for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
final OrekitException exception = entry.getValue().getException();
if (exception != null) {
throw exception;
}
}
}
/** Stop streaming data from all connected mount points.
* <p>
* If an exception was encountered during data streaming, it will be rethrown here
* </p>
* @param time timeout for waiting underlying threads termination (ms)
*/
public void stopStreaming(final int time) {
// ask all monitors to stop retrieving data
for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
entry.getValue().stopMonitoring();
}
try {
// wait for proper ending
executorService.shutdown();
executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
// Restore interrupted state...
Thread.currentThread().interrupt();
}
checkException();
}
/** Connect to caster.
* @param mountPoint mount point (empty for getting sourcetable)
* @return performed connection
* @throws IOException if an I/O exception occurs during connection
* @throws URISyntaxException if the built URI is invalid
*/
HttpURLConnection connect(final String mountPoint)
throws IOException, URISyntaxException {
// set up connection
final String scheme = "http";
final URL casterURL = new URI(scheme, null, host, port, "/" + mountPoint, null, null).toURL();
final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
// common headers
connection.setRequestProperty(HOST_HEADER_KEY, host);
connection.setRequestProperty(VERSION_HEADER_KEY, VERSION_HEADER_VALUE);
connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
return connection;
}
/** Get an header from a response.
* @param connection connection to analyze
* @param key header key
* @return header value
*/
private String getHeaderValue(final URLConnection connection, final String key) {
final String value = connection.getHeaderField(key);
if (value == null) {
throw new OrekitException(OrekitMessages.MISSING_HEADER,
connection.getURL().getHost(), key);
}
return value;
}
/** Local holder for observers. */
private static class ObserverHolder {
/** Code for the message type. */
private final int typeCode;
/** Mount point. */
private final String mountPoint;
/** Observer to notify. */
private final MessageObserver observer;
/** Simple constructor.
* @param typeCode code for the message type
* @param mountPoint mountPoint from which data must come (if null, notification
* will be triggered regardless of mount point)
* @param observer observer for this message type
*/
ObserverHolder(final int typeCode, final String mountPoint,
final MessageObserver observer) {
this.typeCode = typeCode;
this.mountPoint = mountPoint;
this.observer = observer;
}
}
}