Skip to content

Instantly share code, notes, and snippets.

@diegobernardes
Last active March 13, 2025 09:18
Show Gist options
  • Save diegobernardes/819c3f3a766aa618101b701410cca6ac to your computer and use it in GitHub Desktop.
Save diegobernardes/819c3f3a766aa618101b701410cca6ac to your computer and use it in GitHub Desktop.
defmodule NoSurprise.Protocol.H02 do
@moduledoc """
H02 is a protocol used by many brands. This implementation has been tested on:
- SinoTrack ST-902L
"""
@behaviour NoSurprise.Protocol
import NimbleParsec
header =
string("*")
|> ascii_string([?A..?Z, ?0..?9], min: 1)
|> lookahead(string(","))
|> ignore()
|> label("header")
identifier =
ignore(string(","))
|> ascii_string([?0..?9], min: 1)
|> lookahead(string(","))
|> unwrap_and_tag(:identifier)
|> label("identifier")
message_type =
ignore(string(","))
|> choice([string("V1"), string("V6")])
|> lookahead(string(","))
|> ignore
|> label("message_type")
time =
ignore(string(","))
|> ascii_string([?0..?9], 6)
|> lookahead(string(","))
|> unwrap_and_tag(:time)
|> label("time")
gps_status =
ignore(string(","))
|> choice([string("A"), string("V")])
|> map({:parse_gps_status, []})
|> lookahead(string(","))
|> unwrap_and_tag(:gps_status)
|> label("gps_status")
latitude =
ignore(string(","))
|> ascii_string([?0..?9, ?.], min: 1)
|> ignore(string(","))
|> choice([string("N"), string("S")])
|> reduce({:parse_coordinate, []})
|> lookahead(string(","))
|> unwrap_and_tag(:latitude)
|> label("latitude")
longitude =
ignore(string(","))
|> ascii_string([?0..?9, ?.], min: 1)
|> ignore(string(","))
|> choice([string("E"), string("W")])
|> reduce({:parse_coordinate, []})
|> lookahead(string(","))
|> unwrap_and_tag(:longitude)
|> label("longitude")
speed =
ignore(string(","))
|> ascii_string([?0..?9, ?.], min: 1)
|> map({String, :to_float, []})
|> lookahead(string(","))
|> unwrap_and_tag(:speed)
|> label("speed")
course =
ignore(string(","))
|> integer(min: 1)
|> lookahead(string(","))
|> unwrap_and_tag(:course)
|> label("course")
date =
ignore(string(","))
|> ascii_string([?0..?9], 6)
|> lookahead(string(","))
|> unwrap_and_tag(:date)
|> label("date")
trailer =
ignore(string(","))
|> ignore(repeat(lookahead_not(string("#")) |> utf8_char([])))
|> ignore(string("#"))
|> eos
|> label("trailer")
defparsecp(
:parse_message,
header
|> concat(identifier)
|> concat(message_type)
|> concat(time)
|> concat(gps_status)
|> concat(latitude)
|> concat(longitude)
|> concat(speed)
|> concat(course)
|> concat(date)
|> concat(trailer)
)
@doc """
Parse the message sent by the device. The timestamp is set as UTC given that the tracker does not send any timezone
information. The device should be configured to use the timezone UTC.
"""
@impl NoSurprise.Protocol
def parse(message) do
with {:ok, fields, "", _, _, _} <- parse_message(message),
{:ok, timestamp} <-
parse_timestamp(Keyword.get(fields, :date), Keyword.get(fields, :time)) do
fields =
fields
|> Keyword.delete(:time)
|> Keyword.delete(:date)
|> Keyword.put(:timestamp, timestamp)
message_position = struct(NoSurprise.MessagePosition, fields)
{:ok,
%{
device_identifier: Keyword.get(fields, :identifier),
message_positions: [message_position]
}}
else
{:error, _} ->
{:error, :message_invalid}
{:error, _, _, _, _, _} ->
{:error, :message_invalid}
end
end
defp parse_gps_status(value) do
case value do
"A" -> :valid
"V" -> :invalid
end
end
defp parse_coordinate([coordinate, hemisphere]) do
{degrees, minutes} = split_degrees_minutes(coordinate)
decimal_degrees = degrees + minutes / 60
case hemisphere do
"N" -> decimal_degrees
"E" -> decimal_degrees
"S" -> -decimal_degrees
"W" -> -decimal_degrees
end
end
defp split_degrees_minutes(raw) do
# The parser guarantees that we'll receive a valid float here.
{value, ""} = Float.parse(raw)
degrees = trunc(value / 100)
minutes = value - degrees * 100
{degrees, minutes}
end
defp parse_timestamp(date, time) do
day = String.slice(date, 0, 2)
month = String.slice(date, 2, 2)
year = "20" <> String.slice(date, 4, 2)
hour = String.slice(time, 0, 2)
minute = String.slice(time, 2, 2)
second = String.slice(time, 4, 2)
case DateTime.from_iso8601("#{year}-#{month}-#{day}T#{hour}:#{minute}:#{second}Z") do
{:ok, datetime, _offset} -> {:ok, datetime}
{:error, _} -> {:error, :invalid_timestamp}
end
end
end
defmodule NoSurprise.Protocol.H02Test do
use ExUnit.Case, async: true
alias NoSurprise.Protocol.H02
alias NoSurprise.MessagePosition
@message_position_v1_raw "*HQ,1234567890,V1,082747,A,3843.3267,N,00908.3482,W,089.00,123,010325,FBFFFBFF,111,22,33333,4444444#"
@message_position_v6_raw "*HQ,1234567890,V1,082747,A,3843.3267,N,00908.3482,W,089.00,123,010325,FBFFFBFF,111,22,33333,4444444,8944538532055310118F#"
@result %{
device_identifier: "1234567890",
message_positions: [
%MessagePosition{
gps_status: :valid,
latitude: 38.72211166666667,
longitude: -9.139136666666667,
speed: 89.0,
course: 123,
timestamp: ~U[2025-03-01 08:27:47Z]
}
]
}
test "valid position on message type V1" do
assert {:ok, @result} = H02.parse(@message_position_v1_raw)
end
test "valid position on message type V6" do
assert {:ok, @result} = H02.parse(@message_position_v6_raw)
end
test "missing '*'" do
assert {:error, :message_invalid} =
H02.parse(String.trim_leading(@message_position_v1_raw, "*"))
end
test "missing '#'" do
assert {:error, :message_invalid} =
H02.parse(String.trim_trailing(@message_position_v1_raw, "#"))
end
test "invalid fragments" do
assert {:error, :message_invalid} = H02.parse("*#")
end
test "invalid message type" do
message = String.replace(@message_position_v1_raw, "V1", "V3")
assert {:error, :message_invalid} = H02.parse(message)
end
test "invalid date" do
message =
@message_position_v1_raw
|> String.replace("092747", "999999")
|> String.replace("010325", "999999")
assert {:error, :message_invalid} = H02.parse(message)
end
test "invalid course" do
assert {:error, :message_invalid} =
H02.parse(String.replace(@message_position_v1_raw, "123", "abc"))
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment