G: Control de dispositivos serie (o USB-serie): GPS¶
Para ejecutar este notebook
necesitaremos instalar algunos paquetes (y poseer el hardware del que se habla!):
> conda install -c conda-forge tabulate
> conda install -c anaconda pyserial
> conda install -c anaconda pyaudio
Usaremos la biblioteca pySerial
(documentación) para leer de un modulo receptor de GPS USB como el de la imagen. Dichos aparatos, que se pueden comprar muy baratos (menos de 10€).

Un puerto serie se maneja de manera muy similar a como funcionan los archivos normales. Podemos leer de ellos y escribir en ellos.
En un sistema UNIX este tipo de dispositivos crean en el sistema de archivos un fichero con una ruta similar a /dev/ttyUSBx
o /dev/ttyACMx
donde x
puede variar. Por tanto una vez conectado el dispositivo podemos listar los ficheros de dicho directorio para ver si aparece ese puerto:
[1]:
ls /dev/tty*
/dev/tty /dev/tty23 /dev/tty39 /dev/tty54 /dev/ttyS1 /dev/ttyS25
/dev/tty0 /dev/tty24 /dev/tty4 /dev/tty55 /dev/ttyS10 /dev/ttyS26
/dev/tty1 /dev/tty25 /dev/tty40 /dev/tty56 /dev/ttyS11 /dev/ttyS27
/dev/tty10 /dev/tty26 /dev/tty41 /dev/tty57 /dev/ttyS12 /dev/ttyS28
/dev/tty11 /dev/tty27 /dev/tty42 /dev/tty58 /dev/ttyS13 /dev/ttyS29
/dev/tty12 /dev/tty28 /dev/tty43 /dev/tty59 /dev/ttyS14 /dev/ttyS3
/dev/tty13 /dev/tty29 /dev/tty44 /dev/tty6 /dev/ttyS15 /dev/ttyS30
/dev/tty14 /dev/tty3 /dev/tty45 /dev/tty60 /dev/ttyS16 /dev/ttyS31
/dev/tty15 /dev/tty30 /dev/tty46 /dev/tty61 /dev/ttyS17 /dev/ttyS4
/dev/tty16 /dev/tty31 /dev/tty47 /dev/tty62 /dev/ttyS18 /dev/ttyS5
/dev/tty17 /dev/tty32 /dev/tty48 /dev/tty63 /dev/ttyS19 /dev/ttyS6
/dev/tty18 /dev/tty33 /dev/tty49 /dev/tty7 /dev/ttyS2 /dev/ttyS7
/dev/tty19 /dev/tty34 /dev/tty5 /dev/tty8 /dev/ttyS20 /dev/ttyS8
/dev/tty2 /dev/tty35 /dev/tty50 /dev/tty9 /dev/ttyS21 /dev/ttyS9
/dev/tty20 /dev/tty36 /dev/tty51 /dev/ttyACM0 /dev/ttyS22
/dev/tty21 /dev/tty37 /dev/tty52 /dev/ttyprintk /dev/ttyS23
/dev/tty22 /dev/tty38 /dev/tty53 /dev/ttyS0 /dev/ttyS24
Estos dispositivos usualmente usan el protocolo NMEA. Podemos encontrar una descripción más detallada del mismo en este PDF.
Lo más básico que podemos hacer es abrir el puerto serie e ir leyendo los mensajes que nos envía el receptor:
[2]:
pathSerie = '/dev/ttyACM0'
[3]:
import serial
from IPython.display import display, clear_output, HTML
ser = serial.Serial(pathSerie, 9600) # Abrimos el puerto a una velocidad de 9600 baudios (típico en estos dispositivos)
try:
while True: # Bucle infinito. Seguramente querrás pararlo en algún momento
data = ser.readline()
if data:
print(data)
except KeyboardInterrupt:
print("Fin ejecución")
b'$GPRMC,231249.00,V,,,,,,,,,,N*72\r\n'
b'$GPVTG,,,,,,,,,N*30\r\n'
b'$GPGGA,231249.00,,,,,0,00,99.99,,,,,,*69\r\n'
b'$GPGSA,A,1,,,,,,,,,,,,,99.99,99.99,99.99*30\r\n'
b'$GPGSV,2,1,05,02,,,36,03,,,31,06,,,37,11,,,42*7D\r\n'
b'$GPGSV,2,2,05,20,,,33*7E\r\n'
b'$GPGLL,,,,,231249.00,V,N*45\r\n'
b'$GPTXT,01,01,02,u-blox ag - www.u-blox.com*50\r\n'
b'$GPTXT,01,01,02,HW UBX-G70xx 00070000 FF7FFFFFo*69\r\n'
b'$GPTXT,01,01,02,ROM CORE 1.00 (59842) Jun 27 2012 17:43:52*59\r\n'
b'$GPTXT,01,01,02,PROTVER 14.00*1E\r\n'
b'$GPTXT,01,01,02,ANTSUPERV=AC SD PDoS SR*20\r\n'
b'$GPTXT,01,01,02,ANTSTATUS=OK*3B\r\n'
b'$GPTXT,01,01,02,LLC FFFFFFFF-FFFFFFFF-FFFFFFFF-FFFFFFFF-FFFFFFFD*2C\r\n'
b'$GPRMC,231250.00,V,,,,,,,,,,N*7A\r\n'
b'$GPVTG,,,,,,,,,N*30\r\n'
b'$GPGGA,231250.00,,,,,0,00,99.99,,,,,,*61\r\n'
b'$GPGSA,A,1,,,,,,,,,,,,,99.99,99.99,99.99*30\r\n'
b'$GPGSV,2,1,05,02,,,36,03,,,30,06,,,37,11,,,42*7C\r\n'
b'$GPGSV,2,2,05,20,,,34*79\r\n'
b'$GPGLL,,,,,231250.00,V,N*4D\r\n'
b'$GPRMC,231251.00,V,,,,,,,,,,N*7B\r\n'
b'$GPVTG,,,,,,,,,N*30\r\n'
b'$GPGGA,231251.00,,,,,0,00,99.99,,,,,,*60\r\n'
b'$GPGSA,A,1,,,,,,,,,,,,,99.99,99.99,99.99*30\r\n'
b'$GPGSV,2,1,05,02,,,36,03,,,30,06,,,37,11,,,42*7C\r\n'
b'$GPGSV,2,2,05,20,,,34*79\r\n'
b'$GPGLL,,,,,231251.00,V,N*4C\r\n'
Fin ejecución
Vamos a crear unas pocas clases y funciones para mejorar la salida de información:
[4]:
import tabulate
import serial
from IPython import display
import time
lastGPRMC = "" # Último mensaje de tipo GPRMC codificado como tabla bonita
lastGPGSV = "" # Último mensaje de tipo GPSVC codificado como tabla bonita
lastDate = "" # Última fecha recibida del GPS
lastTime = "" # Última hora recibida del GPS
class printer(str): # Clase auxiliar para imprimir las tablas de datos
def __repr__(self):
return self
def parseNMEA(line): # Analiza una línea de mensaje del protocolo NMEA
if line.startswith("$GPRMC"):
parseNMEA_RMC(line)
if line.startswith("$GPGSV"):
parseNMEA_GSV(line)
def parseNMEA_RMC(line): # Analiza una línea NMEA de tipo RMC y almacena su información en una tabla bonita.
# Tb actualiza la última hora y fecha recibidas
global lastGPRMC
global lastDate
global lastTime
split = line.split(",")
tableD = [["RMC", "UTC Time", "Latitude", "Longitude", "Date"],
["", split[1], split[3] + split[4], split[5] + split[6], split[9]]]
table = tabulate.tabulate(tableD, tablefmt='grid')
lastTime = split[1]
lastDate = split[9]
lastGPRMC = table
def parseNMEA_GSV(line): # Analiza una línea NMEA de tipo GSV y almacena su información en una tabla bonita.
global lastGPGSV
split = line.split(",")
tableD = [
["GSV", "Satellites in view"],
["", split[3]]
]
table = tabulate.tabulate(tableD, tablefmt='grid')
lastGPGSV = table
def imprimeDatos(): # Borra las últimas tablas de datos mostradas y pinta los datos actualizados
global lastGPRMC
global lastGPGSV
display.clear_output()
display.display(printer(lastGPRMC))
display.display(printer(lastGPGSV))
def leeGPS(): # Lee del puerto serie los datos del GPS y los analiza.
try:
ser = serial.Serial(pathSerie, 9600)
lastUpdate = time.time() - 1
while True:
data = ser.readline()
if data:
data = data.decode('ascii')
parseNMEA(data)
currentTime = time.time()
if (currentTime - lastUpdate > 0.9): # Si ha pasado más de un segundo desde la última impresión, imprime de nuevo
imprimeDatos()
lastUpdate = currentTime
except KeyboardInterrupt:
print("Fin ejecución")
[5]:
leeGPS()
+-----+-----------+-------------+--------------+--------+
| RMC | UTC Time | Latitude | Longitude | Date |
+-----+-----------+-------------+--------------+--------+
| | 231432.00 | 3709.83273N | 00336.07721W | 290422 |
+-----+-----------+-------------+--------------+--------+
+-----+--------------------+
| GSV | Satellites in view |
+-----+--------------------+
| | 07 |
+-----+--------------------+
Fin ejecución
Produciendo sonidos para codificar la fecha y hora¶
Añadimos unas funciones más para reproducir unos sonidos que codifican la fecha y la hora recibida del GPS estilo «R2D2». Solo suena en los minutos «en punto» (segundos == 0).
[6]:
from IPython.display import Audio
import numpy as np
notes = np.arange(1, 12) * 440.0
def getNote(digito, volume = .1, sampling_rate=44100, duration = 0.1): # Fabrica los samples de una nota determinada (0-10)
global notes
digito = int(digito)
f = notes[digito]
samples = volume * (np.sin(2*np.pi*np.arange(sampling_rate*duration)*f/sampling_rate)).astype(np.float32)
return samples
def getNotes(datetime, volume): # Genera los samples de una fecha y hora, cifra a cifra. Empieza por la nota 10 para marcar el comienzo
samples = getNote(10, volume)
for i in datetime:
if (type(samples) is np.ndarray):
samples = np.concatenate((samples, getNote(i, volume)))
samples[-1] = 1.0 # Hack para evitar el que notebook normalice el volumen
return samples
def playDateTime(date, time, volume=0.3): # Reproduce los sonidos para codificar la fecha y hora
time = time[:6]
datetime = date + time
samples = getNotes(datetime, volume)
display.display(Audio(samples, rate=44100, autoplay=True))
lastSound = 0
def imprimeDatos(): # Sobreescribimos la función que pinta las tablas para que cada minuto en punto suene la codificación de la hora
global lastGPRMC
global lastGPGSV
global lastTime
global lastDate
global lastSound
currentTime = time.time()
if currentTime - lastSound > 1.9: # Evitamos que se borre la salida si estan sonando los pitidos
display.clear_output()
display.display(printer(lastGPRMC))
display.display(printer(lastGPGSV))
if lastTime[4:6] == "00": # En los minutos en punto pita
playDateTime(lastDate, lastTime, volume = 1)
lastSound = currentTime
[7]:
leeGPS()
+-----+-----------+-------------+--------------+--------+
| RMC | UTC Time | Latitude | Longitude | Date |
+-----+-----------+-------------+--------------+--------+
| | 231443.00 | 3709.83172N | 00336.07816W | 290422 |
+-----+-----------+-------------+--------------+--------+
+-----+--------------------+
| GSV | Satellites in view |
+-----+--------------------+
| | 07 |
+-----+--------------------+
Fin ejecución
Por último vamos a hacer unas rutinas que sean capaces de a partir de un audio en el que haya algunos pitidos decodificarlos y recuperar la fecha y hora en la que fueron emitidos.
[8]:
from numpy.fft import fftfreq, fft
import matplotlib.pyplot as plt
def buscaNota(notaBuscada, samples, tamanioMuestreo, pasos = 1, desde = 0, hasta = -1):
'''
Busca en toda la onda la frecuencia de la nota 10, que es la que estamos usando para señalizar el principio del mensaje
'''
global notes
if hasta == -1:
hasta = len(samples)
frecuenciaBuscada=int(notes[notaBuscada]/(44100 / tamanioMuestreo))
maxVal = 0
maxPos = 0
for pos in range(desde, hasta, pasos): # Vamos analizando trocitos
samp = samples[pos:pos+tamanioMuestreo]
four = abs(fft(samp))[:int(tamanioMuestreo/2)] # FFT
m = round(four[frecuenciaBuscada], 5) # Redondeamos a 5 cifras decimales la frecuencia que buscamos
if (m > maxVal): # Si es la mayor actualizamos
maxVal = m
maxPos = pos
#print(pos)
#print(m)
return maxPos
def queNota(samples): # Analiza un trozo de muestras y devuelve la nota que está más presente de acuerdo a su FFT
global notes
tamanioMuestreo = len(samples)
four = abs(fft(samples))[:int(len(samples)/2)] # FFT
max = 0
selectedNote = 0
for i, n in enumerate(notes): # Analizamos las frecuencias de las notas
frecuenciaBuscada=int(n/(44100 / tamanioMuestreo))
intensity = four[frecuenciaBuscada]
if intensity > max: # Nos quedamos con la más intensa
max = intensity
selectedNote = i
return selectedNote
def decodifica(samples, tamanioMuestreo): # A partir de una muestra que debe contener todos los digitos del
# mensaje los decodificamos
mensaje = ""
for pos in range(0, len(samples), tamanioMuestreo):
ss = samples[pos:pos + tamanioMuestreo]
nota = queNota(ss)
if (nota < 10):
mensaje += str(nota)
return mensaje
def localizaDecodifica(samples): # En la muestra completa de sonido buscamos donde empieza el mensaje
# y lo decodificamos
notaBuscada = 10 # La nota inicial del mensaje es la 10
tamanioMuestreo = int(44100 * 0.1)
newPos = buscaNota(notaBuscada, samples, int(tamanioMuestreo / 2), pasos = int(tamanioMuestreo/2)) # Buscamos rapidamente la zona donde está
#print(newPos)
newPos2 = buscaNota(notaBuscada, samples, tamanioMuestreo, pasos = 1, desde = newPos - tamanioMuestreo, hasta = newPos + tamanioMuestreo*2) # Hacemos una busqueda fina cerca de la anterior posición
#print(newPos2)
samp = samples[newPos2:newPos2 + tamanioMuestreo*13] # Recortamos el trozo que debe tener el mensaje completo
mensaje = decodifica(samp, tamanioMuestreo) # Decodificamos
return mensaje
Aquí probamos las rutinas anteriores generando un posible mensaje artificial:
[9]:
ruidoInicial = np.random.rand(67570) * 0.1 # Producimos un ruido blanco inicial
samples = getNote(10) # Producimos la nota de inicio
ruidoFinal = np.random.rand(143233) * 0.1 # Ruido blanco final
samples = np.concatenate((ruidoInicial, samples))
for i in range(0, 12): # Producimos doce notas: 012345678901
s = getNote(i%10)
samples = np.concatenate((samples, s))
samples = np.concatenate((samples, ruidoFinal))
samples[-1] = 1 # Pequeño hack porque el plugin de emitir audio normaliza el volumen
display.display(Audio(samples, rate=44100, autoplay=True)) # Reproducimos el mensaje
mensaje = localizaDecodifica(samples)
print(f"Mensaje localizado y decodificado: {mensaje}")
Mensaje localizado y decodificado: 012345678901
Repetimos lo de antes, pero con un mensaje grabado realmente en un fichero WAV
. Se oye ruido de fondo real y hasta el tick del reloj de la cocina:
[10]:
from scipy.io.wavfile import read
import numpy as np
from IPython.display import Audio
from IPython import display
a = read("sonidos/sonido.wav") # Leemos el fichero wav
data = np.array(a[1],dtype=float)
display.display(Audio(data, rate=44100, autoplay=True)) # Lo reproducimos
mensaje = localizaDecodifica(data) # Obtenemos el mensaje
print(f"Mensaje localizado y decodificado: {mensaje}")
Mensaje localizado y decodificado: 270422234800