G: Control de dispositivos serie (o USB-serie): GPS

Para ejecutar este notebook necesitaremos instalar algunos paquetes (y poseer el hardware del que se habla!):

> conda install -c conda-forge tabulate

> conda install -c anaconda pyserial

> conda install -c anaconda pyaudio

Usaremos la biblioteca pySerial (documentación) para leer de un modulo receptor de GPS USB como el de la imagen. Dichos aparatos, que se pueden comprar muy baratos (menos de 10€).

_images/moduloGPS.jpg

Un puerto serie se maneja de manera muy similar a como funcionan los archivos normales. Podemos leer de ellos y escribir en ellos.

En un sistema UNIX este tipo de dispositivos crean en el sistema de archivos un fichero con una ruta similar a /dev/ttyUSBx o /dev/ttyACMx donde x puede variar. Por tanto una vez conectado el dispositivo podemos listar los ficheros de dicho directorio para ver si aparece ese puerto:

[1]:
ls /dev/tty*
/dev/tty    /dev/tty23  /dev/tty39  /dev/tty54      /dev/ttyS1   /dev/ttyS25
/dev/tty0   /dev/tty24  /dev/tty4   /dev/tty55      /dev/ttyS10  /dev/ttyS26
/dev/tty1   /dev/tty25  /dev/tty40  /dev/tty56      /dev/ttyS11  /dev/ttyS27
/dev/tty10  /dev/tty26  /dev/tty41  /dev/tty57      /dev/ttyS12  /dev/ttyS28
/dev/tty11  /dev/tty27  /dev/tty42  /dev/tty58      /dev/ttyS13  /dev/ttyS29
/dev/tty12  /dev/tty28  /dev/tty43  /dev/tty59      /dev/ttyS14  /dev/ttyS3
/dev/tty13  /dev/tty29  /dev/tty44  /dev/tty6       /dev/ttyS15  /dev/ttyS30
/dev/tty14  /dev/tty3   /dev/tty45  /dev/tty60      /dev/ttyS16  /dev/ttyS31
/dev/tty15  /dev/tty30  /dev/tty46  /dev/tty61      /dev/ttyS17  /dev/ttyS4
/dev/tty16  /dev/tty31  /dev/tty47  /dev/tty62      /dev/ttyS18  /dev/ttyS5
/dev/tty17  /dev/tty32  /dev/tty48  /dev/tty63      /dev/ttyS19  /dev/ttyS6
/dev/tty18  /dev/tty33  /dev/tty49  /dev/tty7       /dev/ttyS2   /dev/ttyS7
/dev/tty19  /dev/tty34  /dev/tty5   /dev/tty8       /dev/ttyS20  /dev/ttyS8
/dev/tty2   /dev/tty35  /dev/tty50  /dev/tty9       /dev/ttyS21  /dev/ttyS9
/dev/tty20  /dev/tty36  /dev/tty51  /dev/ttyACM0    /dev/ttyS22
/dev/tty21  /dev/tty37  /dev/tty52  /dev/ttyprintk  /dev/ttyS23
/dev/tty22  /dev/tty38  /dev/tty53  /dev/ttyS0      /dev/ttyS24

Estos dispositivos usualmente usan el protocolo NMEA. Podemos encontrar una descripción más detallada del mismo en este PDF.

Lo más básico que podemos hacer es abrir el puerto serie e ir leyendo los mensajes que nos envía el receptor:

[2]:
pathSerie = '/dev/ttyACM0'
[3]:
import serial
from IPython.display import display, clear_output, HTML

ser = serial.Serial(pathSerie, 9600)    # Abrimos el puerto a una velocidad de 9600 baudios (típico en estos dispositivos)

try:
    while True:                                  # Bucle infinito. Seguramente querrás pararlo en algún momento
        data = ser.readline()

        if data:
            print(data)
except KeyboardInterrupt:
    print("Fin ejecución")
b'$GPRMC,231249.00,V,,,,,,,,,,N*72\r\n'
b'$GPVTG,,,,,,,,,N*30\r\n'
b'$GPGGA,231249.00,,,,,0,00,99.99,,,,,,*69\r\n'
b'$GPGSA,A,1,,,,,,,,,,,,,99.99,99.99,99.99*30\r\n'
b'$GPGSV,2,1,05,02,,,36,03,,,31,06,,,37,11,,,42*7D\r\n'
b'$GPGSV,2,2,05,20,,,33*7E\r\n'
b'$GPGLL,,,,,231249.00,V,N*45\r\n'
b'$GPTXT,01,01,02,u-blox ag - www.u-blox.com*50\r\n'
b'$GPTXT,01,01,02,HW  UBX-G70xx   00070000 FF7FFFFFo*69\r\n'
b'$GPTXT,01,01,02,ROM CORE 1.00 (59842) Jun 27 2012 17:43:52*59\r\n'
b'$GPTXT,01,01,02,PROTVER 14.00*1E\r\n'
b'$GPTXT,01,01,02,ANTSUPERV=AC SD PDoS SR*20\r\n'
b'$GPTXT,01,01,02,ANTSTATUS=OK*3B\r\n'
b'$GPTXT,01,01,02,LLC FFFFFFFF-FFFFFFFF-FFFFFFFF-FFFFFFFF-FFFFFFFD*2C\r\n'
b'$GPRMC,231250.00,V,,,,,,,,,,N*7A\r\n'
b'$GPVTG,,,,,,,,,N*30\r\n'
b'$GPGGA,231250.00,,,,,0,00,99.99,,,,,,*61\r\n'
b'$GPGSA,A,1,,,,,,,,,,,,,99.99,99.99,99.99*30\r\n'
b'$GPGSV,2,1,05,02,,,36,03,,,30,06,,,37,11,,,42*7C\r\n'
b'$GPGSV,2,2,05,20,,,34*79\r\n'
b'$GPGLL,,,,,231250.00,V,N*4D\r\n'
b'$GPRMC,231251.00,V,,,,,,,,,,N*7B\r\n'
b'$GPVTG,,,,,,,,,N*30\r\n'
b'$GPGGA,231251.00,,,,,0,00,99.99,,,,,,*60\r\n'
b'$GPGSA,A,1,,,,,,,,,,,,,99.99,99.99,99.99*30\r\n'
b'$GPGSV,2,1,05,02,,,36,03,,,30,06,,,37,11,,,42*7C\r\n'
b'$GPGSV,2,2,05,20,,,34*79\r\n'
b'$GPGLL,,,,,231251.00,V,N*4C\r\n'
Fin ejecución

Vamos a crear unas pocas clases y funciones para mejorar la salida de información:

[4]:
import tabulate
import serial
from IPython import display
import time

lastGPRMC = ""    # Último mensaje de tipo GPRMC codificado como tabla bonita
lastGPGSV = ""    # Último mensaje de tipo GPSVC codificado como tabla bonita
lastDate = ""     # Última fecha recibida del GPS
lastTime = ""     # Última hora recibida del GPS

class printer(str):       # Clase auxiliar para imprimir las tablas de datos
    def __repr__(self):
        return self

def parseNMEA(line):                # Analiza una línea de mensaje del protocolo NMEA
    if line.startswith("$GPRMC"):
        parseNMEA_RMC(line)

    if line.startswith("$GPGSV"):
        parseNMEA_GSV(line)

def parseNMEA_RMC(line):            # Analiza una línea NMEA de tipo RMC y almacena su información en una tabla bonita.
                                    # Tb actualiza la última hora y fecha recibidas
    global lastGPRMC
    global lastDate
    global lastTime

    split = line.split(",")

    tableD = [["RMC", "UTC Time", "Latitude", "Longitude", "Date"],
             ["", split[1], split[3] + split[4], split[5] + split[6], split[9]]]

    table = tabulate.tabulate(tableD, tablefmt='grid')

    lastTime = split[1]
    lastDate = split[9]

    lastGPRMC = table

def parseNMEA_GSV(line):    # Analiza una línea NMEA de tipo GSV y almacena su información en una tabla bonita.
    global lastGPGSV

    split = line.split(",")

    tableD = [
               ["GSV", "Satellites in view"],
               ["", split[3]]
             ]

    table = tabulate.tabulate(tableD, tablefmt='grid')

    lastGPGSV = table

def imprimeDatos():        # Borra las últimas tablas de datos mostradas y pinta los datos actualizados
    global lastGPRMC
    global lastGPGSV

    display.clear_output()
    display.display(printer(lastGPRMC))
    display.display(printer(lastGPGSV))


def leeGPS():              # Lee del puerto serie los datos del GPS y los analiza.
    try:
        ser = serial.Serial(pathSerie, 9600)

        lastUpdate = time.time() - 1

        while True:
            data = ser.readline()

            if data:
                data = data.decode('ascii')
                parseNMEA(data)

                currentTime = time.time()

                if (currentTime - lastUpdate > 0.9):  # Si ha pasado más de un segundo desde la última impresión, imprime de nuevo
                    imprimeDatos()

                    lastUpdate = currentTime

    except KeyboardInterrupt:
        print("Fin ejecución")
[5]:
leeGPS()
+-----+-----------+-------------+--------------+--------+
| RMC | UTC Time  | Latitude    | Longitude    | Date   |
+-----+-----------+-------------+--------------+--------+
|     | 231432.00 | 3709.83273N | 00336.07721W | 290422 |
+-----+-----------+-------------+--------------+--------+
+-----+--------------------+
| GSV | Satellites in view |
+-----+--------------------+
|     | 07                 |
+-----+--------------------+
Fin ejecución

Produciendo sonidos para codificar la fecha y hora

Añadimos unas funciones más para reproducir unos sonidos que codifican la fecha y la hora recibida del GPS estilo «R2D2». Solo suena en los minutos «en punto» (segundos == 0).

[6]:
from IPython.display import Audio
import numpy as np

notes = np.arange(1, 12) * 440.0


def getNote(digito, volume = .1, sampling_rate=44100, duration = 0.1):  # Fabrica los samples de una nota determinada (0-10)
    global notes

    digito = int(digito)

    f = notes[digito]
    samples = volume * (np.sin(2*np.pi*np.arange(sampling_rate*duration)*f/sampling_rate)).astype(np.float32)

    return samples

def getNotes(datetime, volume):   # Genera los samples de una fecha y hora, cifra a cifra. Empieza por la nota 10 para marcar el comienzo
    samples = getNote(10, volume)

    for i in datetime:
        if (type(samples) is np.ndarray):
            samples = np.concatenate((samples, getNote(i, volume)))

    samples[-1] = 1.0    # Hack para evitar el que notebook normalice el volumen

    return samples

def playDateTime(date, time, volume=0.3):   # Reproduce los sonidos para codificar la fecha y hora
    time = time[:6]

    datetime = date + time

    samples = getNotes(datetime, volume)

    display.display(Audio(samples, rate=44100, autoplay=True))

lastSound = 0

def imprimeDatos():    # Sobreescribimos la función que pinta las tablas para que cada minuto en punto suene la codificación de la hora
    global lastGPRMC
    global lastGPGSV
    global lastTime
    global lastDate
    global lastSound

    currentTime = time.time()

    if currentTime - lastSound > 1.9:   # Evitamos que se borre la salida si estan sonando los pitidos
        display.clear_output()
        display.display(printer(lastGPRMC))
        display.display(printer(lastGPGSV))

        if lastTime[4:6] == "00":          # En los minutos en punto pita
            playDateTime(lastDate, lastTime, volume = 1)
            lastSound = currentTime

[7]:
leeGPS()
+-----+-----------+-------------+--------------+--------+
| RMC | UTC Time  | Latitude    | Longitude    | Date   |
+-----+-----------+-------------+--------------+--------+
|     | 231443.00 | 3709.83172N | 00336.07816W | 290422 |
+-----+-----------+-------------+--------------+--------+
+-----+--------------------+
| GSV | Satellites in view |
+-----+--------------------+
|     | 07                 |
+-----+--------------------+
Fin ejecución

Por último vamos a hacer unas rutinas que sean capaces de a partir de un audio en el que haya algunos pitidos decodificarlos y recuperar la fecha y hora en la que fueron emitidos.

[8]:
from numpy.fft import fftfreq, fft
import matplotlib.pyplot as plt

def buscaNota(notaBuscada, samples, tamanioMuestreo, pasos = 1, desde = 0, hasta = -1):
    '''
    Busca en toda la onda la frecuencia de la nota 10, que es la que estamos usando para señalizar el principio del mensaje
    '''

    global notes

    if hasta == -1:
        hasta = len(samples)

    frecuenciaBuscada=int(notes[notaBuscada]/(44100 / tamanioMuestreo))

    maxVal = 0
    maxPos = 0

    for pos in range(desde, hasta, pasos):       # Vamos analizando trocitos
        samp = samples[pos:pos+tamanioMuestreo]

        four = abs(fft(samp))[:int(tamanioMuestreo/2)]    # FFT

        m = round(four[frecuenciaBuscada], 5) # Redondeamos a 5 cifras decimales la frecuencia que buscamos

        if (m > maxVal):       # Si es la mayor actualizamos
            maxVal = m
            maxPos = pos
            #print(pos)
            #print(m)

    return maxPos



def queNota(samples):    # Analiza un trozo de muestras y devuelve la nota que está más presente de acuerdo a su FFT
    global notes

    tamanioMuestreo = len(samples)

    four = abs(fft(samples))[:int(len(samples)/2)]   # FFT

    max = 0
    selectedNote = 0

    for i, n in enumerate(notes):                           # Analizamos las frecuencias de las notas
        frecuenciaBuscada=int(n/(44100 / tamanioMuestreo))

        intensity = four[frecuenciaBuscada]

        if intensity > max:                                 # Nos quedamos con la más intensa
            max = intensity
            selectedNote = i

    return selectedNote


def decodifica(samples, tamanioMuestreo):                   # A partir de una muestra que debe contener todos los digitos del
                                                            # mensaje los decodificamos
    mensaje = ""

    for pos in range(0, len(samples), tamanioMuestreo):
        ss = samples[pos:pos + tamanioMuestreo]
        nota = queNota(ss)

        if (nota < 10):
            mensaje += str(nota)

    return mensaje



def localizaDecodifica(samples):            # En la muestra completa de sonido buscamos donde empieza el mensaje
                                            # y lo decodificamos

    notaBuscada = 10                        # La nota inicial del mensaje es la 10

    tamanioMuestreo = int(44100 * 0.1)

    newPos = buscaNota(notaBuscada, samples, int(tamanioMuestreo / 2), pasos = int(tamanioMuestreo/2))  # Buscamos rapidamente la zona donde está

    #print(newPos)

    newPos2 = buscaNota(notaBuscada, samples, tamanioMuestreo, pasos = 1, desde = newPos - tamanioMuestreo, hasta = newPos + tamanioMuestreo*2)  # Hacemos una busqueda fina cerca de la anterior posición

    #print(newPos2)

    samp = samples[newPos2:newPos2 + tamanioMuestreo*13]    # Recortamos el trozo que debe tener el mensaje completo

    mensaje = decodifica(samp, tamanioMuestreo)                       # Decodificamos

    return mensaje

Aquí probamos las rutinas anteriores generando un posible mensaje artificial:

[9]:
ruidoInicial = np.random.rand(67570) * 0.1    # Producimos un ruido blanco inicial
samples = getNote(10)                         # Producimos la nota de inicio
ruidoFinal = np.random.rand(143233) * 0.1     # Ruido blanco final

samples = np.concatenate((ruidoInicial, samples))

for i in range(0, 12):     # Producimos doce notas: 012345678901
    s = getNote(i%10)

    samples = np.concatenate((samples, s))

samples = np.concatenate((samples, ruidoFinal))


samples[-1] = 1                                # Pequeño hack porque el plugin de emitir audio normaliza el volumen

display.display(Audio(samples, rate=44100, autoplay=True))   # Reproducimos el mensaje

mensaje = localizaDecodifica(samples)

print(f"Mensaje localizado y decodificado: {mensaje}")
Mensaje localizado y decodificado: 012345678901

Repetimos lo de antes, pero con un mensaje grabado realmente en un fichero WAV. Se oye ruido de fondo real y hasta el tick del reloj de la cocina:

[10]:
from scipy.io.wavfile import read
import numpy as np
from IPython.display import Audio
from IPython import display

a = read("sonidos/sonido.wav")      # Leemos el fichero wav
data = np.array(a[1],dtype=float)


display.display(Audio(data, rate=44100, autoplay=True))  # Lo reproducimos

mensaje = localizaDecodifica(data)    # Obtenemos el mensaje

print(f"Mensaje localizado y decodificado: {mensaje}")
Mensaje localizado y decodificado: 270422234800