Parsing FlySight Log Files

Parsing Pipelines

FlySight V1

pyflysight.flysight_proc.load_flysight

load_flysight(filepath: Path, normalize_gps: bool = False) -> FlysightV1FlightLog

Parse the provided FlySight log into a DataFrame.

FlySight logs are assumed to contain 2 header rows, one for labels and the other for units.

The following derived columns are added to the output DataFrame:

  • elapsed_time
  • groundspeed (m/s)

If normalize_gps is True, the GPS track data is normalized to start at (0, 0).
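The two-row header and the derived columns described above can be sketched with the standard library alone. This is an illustration, not pyflysight's implementation: the sample data is made up, the helper name parse_two_header_csv is hypothetical, and it assumes groundspeed is the magnitude of the north & east velocity components (velN, velE).

```python
import csv
import io
import math
from datetime import datetime

# Hypothetical sample: one row of labels, one row of units, then data
SAMPLE = """time,lat,lon,velN,velE
,(deg),(deg),(m/s),(m/s)
2021-04-20T03:47:33.60Z,33.6568,-117.7337,3.0,4.0
2021-04-20T03:47:33.80Z,33.6569,-117.7336,6.0,8.0
"""


def parse_two_header_csv(text: str) -> tuple[dict[str, str], list[dict[str, float]]]:
    """Return (units keyed by column label, rows of derived values)."""
    reader = csv.reader(io.StringIO(text))
    labels = next(reader)  # First header row: column labels
    units = dict(zip(labels, next(reader)))  # Second header row: units

    rows = []
    start = None
    for raw in reader:
        record = dict(zip(labels, raw))
        stamp = datetime.strptime(record["time"], "%Y-%m-%dT%H:%M:%S.%fZ")
        if start is None:
            start = stamp  # Elapsed time is measured from the first record

        vel_n, vel_e = float(record["velN"]), float(record["velE"])
        rows.append(
            {
                "elapsed_time": (stamp - start).total_seconds(),
                # Assumed definition: horizontal speed from the N/E components
                "groundspeed": math.hypot(vel_n, vel_e),
            }
        )
    return units, rows


units, rows = parse_two_header_csv(SAMPLE)
```

Keeping the units row separate from the data rows is what allows the numeric columns to be parsed as floats without special-casing the second line.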

pyflysight.flysight_proc.batch_load_flysight

batch_load_flysight(
    top_dir: Path, pattern: str = "*.CSV", normalize_gps: bool = False
) -> dict[str, dict[str, FlysightV1FlightLog]]

Batch parse a directory of FlySight logs into a dictionary of DataFrames.

Because the FlySight hardware groups logs by date & the log CSV name does not contain date information, the date is inferred from the log's parent directory name & the output dictionary is of the form {log date: {log_time: DataFrame}}.

Log file discovery is not recursive by default; the pattern kwarg can be adjusted to support a recursive glob.

Warning

File case sensitivity is deferred to the OS; pattern is passed to glob as-is so matches may or may not be case-sensitive.

If normalize_gps is True, the GPS track data is normalized to start at (0, 0).
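As a rough illustration of the date grouping and of how a recursive pattern changes discovery, the sketch below builds a throwaway directory tree and groups the matching files by parent directory name. The helper group_logs_by_date and the directory & file names are hypothetical, and the paths stand in for the parsed logs that batch_load_flysight would return.

```python
import tempfile
from collections import defaultdict
from pathlib import Path


def group_logs_by_date(top_dir: Path, pattern: str = "*.CSV") -> dict[str, dict[str, Path]]:
    """Map {parent directory name: {file stem: path}} for every matching file."""
    grouped: dict[str, dict[str, Path]] = defaultdict(dict)
    for log_path in sorted(top_dir.glob(pattern)):
        # The date is taken from the parent directory, the time from the file name
        grouped[log_path.parent.name][log_path.stem] = log_path
    return dict(grouped)


with tempfile.TemporaryDirectory() as tmp:
    top = Path(tmp)
    # Assumed layout: one directory per date, one CSV per log
    for rel in ("21-04-20/04-30-20.CSV", "21-04-20/05-10-02.CSV", "21-05-25/04-03-00.CSV"):
        (top / rel).parent.mkdir(parents=True, exist_ok=True)
        (top / rel).touch()

    flat = group_logs_by_date(top)  # Non-recursive: only looks in top itself
    nested = group_logs_by_date(top, pattern="**/*.CSV")  # Recursive glob
```

With this layout the default pattern finds nothing when pointed at the top-level directory, while the "**/*.CSV" pattern picks up every log beneath it.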

FlySight V2

pyflysight.flysight_proc.parse_v2_log_directory

parse_v2_log_directory(
    log_directory: Path,
    prefer_processed: bool = False,
    normalize_gps: bool = False,
    sensor_filename: str = "SENSOR.CSV",
    track_filename: str = "TRACK.CSV",
) -> FlysightV2FlightLog

Data parsing pipeline for a directory of FlySight V2 logs.

The FlySight V2 outputs a timestamped (YY-mm-DD/HH-MM-SS/*) directory of files:

  • EVENT.CSV - Debugging output, optionally present based on firmware version
  • RAW.UBX - Raw UBlox sensor output
  • SENSOR.CSV - Onboard sensor data
  • TRACK.CSV - GPS sensor data

When utilizing this pipeline, an elapsed_time_sensor column is added to the track DataFrame, providing a synchronized elapsed time that can be used to align the sensor & track DataFrames.

If prefer_processed is True & a serialized FlysightV2FlightLog instance is discovered in the target directory, it is loaded rather than parsing the raw data files.

If normalize_gps is True, the GPS track data is normalized to start at (0, 0).

FlySight V1

Objects

pyflysight.flysight_proc.FlysightV1 dataclass

FlysightV1(
    sensor_info: dict[str, SensorInfo], flysight_type: FlysightType = FlysightType.VERSION_1
)

Store device metadata for a corresponding FlySight V1 data logger.

Note

Though it only has one sensor (GPS), column names and units are stored in a "GNSS" column in order to better align with the structure of the FlySight V2 data.

flysight_type class-attribute instance-attribute

flysight_type: FlysightType = VERSION_1

sensor_info instance-attribute

sensor_info: dict[str, SensorInfo]

pyflysight.flysight_proc.FlysightV1FlightLog dataclass

FlysightV1FlightLog(track_data: polars.DataFrame, device_info: FlysightV1)

device_info instance-attribute

device_info: FlysightV1

track_data instance-attribute

track_data: DataFrame

normalize_gps

normalize_gps(start_coord: tuple[float, float] = (0, 0)) -> None

Shift parsed GPS coordinates so they begin at the provided starting location.
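A minimal sketch of this kind of shift, assuming it is a plain translation of every (latitude, longitude) pair by the offset between the first point and start_coord. The helper shift_track is hypothetical and operates on a list of tuples rather than the track DataFrame.

```python
def shift_track(
    coords: list[tuple[float, float]], start_coord: tuple[float, float] = (0, 0)
) -> list[tuple[float, float]]:
    """Translate (lat, lon) pairs so the first pair lands on start_coord."""
    if not coords:
        return []

    # Offset that moves the first recorded point onto the requested start
    lat_offset = start_coord[0] - coords[0][0]
    lon_offset = start_coord[1] - coords[0][1]
    return [(lat + lat_offset, lon + lon_offset) for lat, lon in coords]


track = [(33.5, -117.5), (33.75, -117.25)]
shifted = shift_track(track)
```

The relative shape of the track is unchanged; only its origin moves, which is useful when the actual location should not be shared.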

FlySight V2

Objects

pyflysight.flysight_proc.SensorInfo

Bases: NamedTuple

Store sensor record column & unit information, assumed to be of equal length.

columns instance-attribute

columns: list[str]

id_ class-attribute instance-attribute

id_: str = ''

units instance-attribute

units: list[str]

pyflysight.flysight_proc.FlysightV2 dataclass

FlysightV2(
    firmware_version: str,
    device_id: str,
    session_id: str,
    sensor_info: dict[str, SensorInfo],
    flysight_type: FlysightType = FlysightType.VERSION_2,
    first_sensor_timestamp: float | None = None,
    ground_pressure_pa: int | float = 101325,
)

Store device metadata for a corresponding FlySight V2 data logger.

Sensor information is provided as a dictionary keyed by a sensor ID, assumed to be shared between the unit information contained in the header and each row of the sensor's records.

first_sensor_timestamp refers to the time value of the first data record & is used to calculate the running elapsed_time column during the parsing pipeline. This timestamp value must be set before elapsed time can be calculated; the provided parsing pipeline is expected to set it after the instance is created.

ground_pressure_pa is the atmospheric pressure at ground level, in Pascals, used by some pressure altitude calculations. This defaults to standard day sea level pressure.
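To show how a ground-level reference pressure can feed a pressure altitude calculation, here is a sketch using the standard atmosphere (ISA) troposphere relation. This is an assumption about the general technique, not necessarily the formula pyflysight uses, and the function name pressure_altitude_m is hypothetical.

```python
STD_SEA_LEVEL_PA = 101325  # Standard day sea level pressure, the documented default


def pressure_altitude_m(
    pressure_pa: float, ground_pressure_pa: float = STD_SEA_LEVEL_PA
) -> float:
    """Approximate height, in meters, above the reference pressure level."""
    # 44330.77 m is T0 / L (288.15 K / 0.0065 K/m)
    # 0.190263 is the ISA exponent R * L / (g * M)
    return 44330.77 * (1 - (pressure_pa / ground_pressure_pa) ** 0.190263)


at_reference = pressure_altitude_m(101325)  # Zero height at the reference pressure
near_1000_m = pressure_altitude_m(89874.6)  # ISA pressure at roughly 1000 m
```

Supplying the pressure measured on the ground in place of the default makes the result read approximately as height above ground rather than height above the standard sea level datum.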

device_id instance-attribute

device_id: str

firmware_version instance-attribute

firmware_version: str

first_sensor_timestamp class-attribute instance-attribute

first_sensor_timestamp: float | None = None

flysight_type class-attribute instance-attribute

flysight_type: FlysightType = VERSION_2

ground_pressure_pa class-attribute instance-attribute

ground_pressure_pa: int | float = 101325

sensor_info instance-attribute

sensor_info: dict[str, SensorInfo]

session_id instance-attribute

session_id: str

from_json classmethod

from_json(raw_json: dict[str, t.Any]) -> FlysightV2

Generate a new instance from a raw device data JSON file.

It is assumed that the JSON file is generated by FlysightV2FlightLog.to_csv; only minimal checking is done for JSON validity (top level keys match FlysightV2's fields).

pyflysight.flysight_proc.FlysightV2FlightLog dataclass

FlysightV2FlightLog(
    track_data: polars.DataFrame,
    sensor_data: SensorDataFrames,
    device_info: FlysightV2,
    _is_trimmed: bool = False,
)

device_info instance-attribute

device_info: FlysightV2

sensor_data instance-attribute

sensor_data: SensorDataFrames

track_data instance-attribute

track_data: DataFrame

filter_accel

filter_accel(
    filter_func: abc.Callable[[polars.Series], polars.Series], filter_derived: bool = False
) -> None

Filter the acceleration data columns using the specified filter function.

The derived total acceleration column is also recomputed using the filtered component data.

The filtering function is specified as a callable that accepts & returns a polars.Series object (i.e. a data column). Filtered data is saved to a set of new columns with a "_filt" suffix.

If filter_derived is True, the filter function is also applied to the derived total acceleration column.
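The filter-then-recompute flow can be sketched without any dependencies. In the sketch below plain lists stand in for polars.Series, the column names (ax, ay, az, total_accel_filt) are hypothetical, and the trailing moving average is only one example of a filter; a real filter_func would accept & return a polars.Series.

```python
import math


def moving_average(values: list[float], window: int = 3) -> list[float]:
    """Trailing moving average; early samples average over the values seen so far."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1) : i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


# Hypothetical column names standing in for the acceleration components
columns = {"ax": [0.0, 3.0, 6.0], "ay": [0.0, 4.0, 8.0], "az": [0.0, 0.0, 0.0]}

# Filtered copies are stored next to the originals with a "_filt" suffix
for name in ("ax", "ay", "az"):
    columns[f"{name}_filt"] = moving_average(columns[name])

# The derived total is recomputed from the filtered components
columns["total_accel_filt"] = [
    math.sqrt(x**2 + y**2 + z**2)
    for x, y, z in zip(columns["ax_filt"], columns["ay_filt"], columns["az_filt"])
]
```

Writing to new columns leaves the raw data untouched, so different filters can be compared against the same source values.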

filter_baro

filter_baro(
    filter_func: abc.Callable[[polars.Series], polars.Series], filter_derived: bool = False
) -> None

Filter the barometric pressure data column using the specified filter function.

The derived pressure altitude columns are also recomputed using the filtered component data.

The filtering function is specified as a callable that accepts & returns a polars.Series object (i.e. a data column). Filtered data is saved to a set of new columns with a "_filt" suffix.

If filter_derived is True, the filter function is also applied to the derived pressure altitude columns.

from_csv classmethod

from_csv(base_dir: Path) -> FlysightV2FlightLog

Generate a new instance from a directory of saved device data.

The specified base directory must contain only one child directory of device data, determined by the presence of a device_info.json file.

For example, given the file structure:

.
├── a/
│   └── b/
│       └── device_data_1
└── c/
    └── d/
        ├── device_data_2
        └── device_data_3

device_data_1 can be located using base_dir as /a/, /a/b/, or /a/b/device_data_1, but device_data_2 and device_data_3 must be located using base_dir as /c/d/device_data_2 and /c/d/device_data_3, respectively.

It is assumed that the data directory is generated by FlysightV2FlightLog.to_csv; minimal error checking is performed prior to attempting to reload the data.
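The single-child rule can be illustrated by searching for device_info.json and rejecting anything other than exactly one match. This is a sketch of the rule as described, not the library's code; find_device_data_dir is a hypothetical helper and the tree mirrors the example structure.

```python
import tempfile
from pathlib import Path


def find_device_data_dir(base_dir: Path) -> Path:
    """Return the single directory at or below base_dir holding device_info.json."""
    matches = [p.parent for p in base_dir.rglob("device_info.json")]
    if len(matches) != 1:
        raise ValueError(f"Expected 1 device data directory, found {len(matches)}")
    return matches[0]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for rel in ("a/b/device_data_1", "c/d/device_data_2", "c/d/device_data_3"):
        (root / rel).mkdir(parents=True)
        (root / rel / "device_info.json").touch()

    found = find_device_data_dir(root / "a").name  # One candidate below a/

    try:
        find_device_data_dir(root / "c")  # Two candidates below c/
        ambiguous = False
    except ValueError:
        ambiguous = True
```

Pointing the search at one of the device data directories themselves also resolves to a single match, which is why the two sibling directories have to be addressed directly.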

normalize_gps

normalize_gps(start_coord: tuple[float, float] = (0, 0)) -> None

Shift parsed GPS coordinates so they begin at the provided starting location.

to_csv

to_csv(base_dir: Path, normalize_gps: bool = False) -> None

Output logged data to a collection of CSV files relative to the provided base directory.

Sensor data is named by sensor name & nested under base_dir: base_dir/device_id/session_id/*. Note that any existing data in this directory will be overwritten.

If normalize_gps is True, the GPS track data is normalized to start at (0, 0).

trim_log

trim_log(elapsed_start: NUMERIC_T, elapsed_end: NUMERIC_T) -> None

Trim the sensor & track logs to data between the provided start and end elapsed times.

Note

The elapsed time column is re-normalized to the provided trim window.

Helpers

pyflysight.flysight_proc.parse_v2_track_data

parse_v2_track_data(log_filepath: Path) -> tuple[polars.DataFrame, FlysightV2]

Data parsing pipeline for a FlySight V2 track log CSV file.

Track data files should come off the FlySight as TRACK.CSV.

pyflysight.flysight_proc.parse_v2_sensor_data

parse_v2_sensor_data(log_filepath: Path) -> tuple[SensorDataFrames, FlysightV2]

Data parsing pipeline for a FlySight V2 sensor log CSV file.

Sensor data files should come off the FlySight as SENSOR.CSV.

pyflysight.flysight_proc.calculate_sync_delta

calculate_sync_delta(track_data: polars.DataFrame, time_sensor: polars.DataFrame) -> float

Calculate the time delta required, in seconds, to align the track & sensor data.

When added to the track data's elapsed time, the resulting elapsed time should align with the elapsed time recorded by the sensor data. Empirical checks seem to show that the sensor data typically begins prior to the first recorded GPS timestamp in the track data, so this value will typically be positive.

The sensor data contains regular time logs, given as (timestamp, GPS time of week, GPS week) (e.g. $TIME,60077.615,316515.000,2311), which can be used to calculate the GPS timestamp of the reading.

Note

I believe, but have not confirmed, that the U-Blox chip already accounts for leap seconds, so the correction is omitted from this calculation.
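The conversion from a time log to a GPS timestamp can be sketched as follows, counting weeks & seconds from the GPS epoch (1980-01-06). The helper parse_time_record is hypothetical, and, in line with the note above, no leap second correction is applied, so the result is simply labeled as UTC.

```python
from datetime import datetime, timedelta, timezone

GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)  # Start of GPS week 0


def parse_time_record(line: str) -> tuple[float, datetime]:
    """Split a $TIME record into (sensor timestamp, GPS datetime)."""
    tag, sensor_ts, time_of_week, week = line.strip().split(",")
    if tag != "$TIME":
        raise ValueError(f"Not a $TIME record: {line!r}")

    # Whole weeks since the epoch plus seconds into the current week;
    # leap seconds are deliberately not corrected for here
    gps_time = GPS_EPOCH + timedelta(weeks=int(week), seconds=float(time_of_week))
    return float(sensor_ts), gps_time


sensor_ts, gps_time = parse_time_record("$TIME,60077.615,316515.000,2311")
```

Pairing the sensor timestamp with the GPS datetime from the same record gives a common reference point between the sensor clock and the GPS timestamps in the track data.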