###############################################################################
# Copyright (c) 2019, GNOME Collaboration.
# Produced at the University of Mainz
#
# Written by Joseph A. Smiga (jsmiga@uni-mainz.de)
#
# All rights reserved.
# This file is part of GDAS.
# For details, see https://gnome.pages.gitlab.rlp.net/gdas/
# For details about use and distribution, please read GDAS/LICENSE.
###############################################################################
# Code for reading GNOME data.
#
# Summary of contents:
# getDataFromFile()--------Gets magnetometer data from single file.
# getFListInRange()--------Gets list of files in time range.
# getDataInRange()---------Gets list of data from files in time range.
# getFileNameTime()--------Gets file time from its name.
# getFListFromDates()------Gets list of file names from dates.
# getStartTimes()----------Gets list of start times for set of data files.
# getSaneList()------------Finds which data files contain only 'sane' data.
# getCharacteristics()-----Get position/orientation of sensors.
# getSensorInfoFromFile()--Get position/orientation from file.
# getFalseOffset()---------Generate an offset for random sampling.
# falseFListInRange()------Calls getFListInRange() with an offset.
# offsetTimeSeries()-------Subtracts an offset from a TimeSeries(List).
import h5py
import time, calendar
from gwpy.timeseries import TimeSeries,TimeSeriesList
from astropy.time import Time as astroTime
from astropy.coordinates import EarthLocation,AltAz
import astropy.units as u
from os import listdir
from os.path import isfile, join
from numpy.random import random
from sympy import lambdify # needed for conversion function
__all__ = [
"getDataFromFile", "getFListInRange", "getDataInRange", "getFileNameTime",
"getFListFromDates", "getStartTimes", "getSaneList", "getCharacteristics",
"getSensorInfoFromFile", "getFalseOffset", "falseFListInRange",
"offsetTimeSeries"
]
__author__ = 'Joseph A. Smiga <jsmiga@uni-mainz.de>'
[docs]def getDataFromFile(fName, convert=True, dataCh='MagneticFields',
saneCh='SanityChannel'):
'''
Gets magnetometer data from file.
fName: str
Name of file
convert: boolean (default: True)
Whether to use conversion function from file.
dataCh: string (default "MagneticFields")
Name of data channel.
saneCh: string (default "SanityChannel")
Name of sanity channel.
returns (data, sanity data) as astropy TimeSeries
Note: must evaluate values in 'sanity' (e.g., using 'value' attribute) to
get boolean
'''
# seconds from (1970/01/01 00:00:00.0) to gps epoch (1980/01/06 00:00:19.0), ignoring leap-seconds
gpsEpoch = 315964819.
try:
h5pyFile = h5py.File(fName,'r')
saneList = h5pyFile[saneCh]
dataList = h5pyFile[dataCh]
# get mag field attributes
attrs = dataList.attrs
sampRate = float(attrs['SamplingRate(Hz)'])
# parse time into unix time
startT = calendar.timegm(time.strptime(attrs['Date']+' '+attrs['t0'], '%Y/%m/%d %H:%M:%S.%f'))
startT-= gpsEpoch # adjust epoch to GPS epoch
# get milliseconds
decPt = attrs['t0'].rfind('.')
if(decPt >= 0): # has decimal point
startT += float('0'+attrs['t0'][decPt:])
# convert to time object. gwpy.TimeSeries prefers using gps time (not unix) so this will handle the conversion
startT = astroTime(startT, format='gps') # GPS uses TAI time (no leap seconds)
# get sanity attributes
saneAttrs = saneList.attrs
saneRate = float(saneAttrs['SamplingRate(Hz)'])
saneStart = calendar.timegm(time.strptime(saneAttrs['Date']+' '+saneAttrs['t0'], '%Y/%m/%d %H:%M:%S.%f'))
saneStart-= gpsEpoch # adjust epoch to GPS epoch
# get milliseconds
decPt = saneAttrs['t0'].rfind('.')
if(decPt >= 0): # has decimal point
saneStart += float('0'+saneAttrs['t0'][decPt:])
# convert to time object. gwpy.TimeSeries prefers using gps time (not unix) so this will handle the conversion
saneStart= astroTime(saneStart, format='gps')
# create data TimeSeries
if(convert):
convStr = attrs['MagFieldEq'] #string contatining conversion function
unitLoc = convStr.find('[') # unit info is at end in []
if(unitLoc >= 0): # unit given
convStr = convStr[:unitLoc] # get substring before units
# get list of channels used in conversion
chNames = [k.encode() for k in h5pyFile.iterkeys()] # list of channels to use in calibration
for i in range(len(chNames)-1,-1,-1): # decrement so I can just pop off unused channels
if(convStr.find(chNames[i]) < 0): # remove key that does not appear
chNames.pop(i)
chList = [None]*len(chNames)
for i in range(len(chNames)): # store "value" (a numpy array)
k = chNames[i]
if(k==saneCh):
chList[i] = saneList.value
elif(k==dataCh):
chList[i] = dataList.value
else:
chList[i] = h5pyFile[k].value
convFunc = lambdify(chNames, convStr) # create symbolic function (default is to use numpy)
dataTS = TimeSeries(convFunc(*chList), sample_rate=sampRate, epoch=startT) # put data in TimeSeries
# convStr=convStr.replace('MagneticFields','dataTS') # relabel with correct variable
# exec 'dataTS = '+convStr # dynamic execution to convert dataTS
else:
dataTS = TimeSeries(dataList, sample_rate=sampRate, epoch=startT) # put data in TimeSeries
# create sanity TimeSeries
saneTS = TimeSeries(saneList, sample_rate=saneRate, epoch=saneStart)
finally:
h5pyFile.close()
return dataTS, saneTS
[docs]def getFListInRange(station, startTime, endTime, path='./', verbose=False):
'''
Get list of file names for GNOME experiment within a time period.
Data located in folder of the form 'path/station/yyyy/mm/dd/'.
Time range uses the start time listed in the 'hdf5' file name.
station: str
Name of station.
startTime: float (unix time), str
Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values
defaulted as 0)
endTime: float (unix time), str
Last time. Format same as startTime
path: str (default './')
Location of files
verbose: bool (default False)
Verbose output
returns list of file names
'''
# put date in consistant format
# Note that the file names do not contain milliseconds, so "time" tuple is ok
makeSU = True # Need to calculate start time in unix
makeEU = True # Need to calculate end time in unix
if(not isinstance(startTime,str)): # given unix time
startUnix = startTime
makeSU = False
startTime = time.strftime('%Y-%m-%d-%H-%M-%S',time.gmtime(startTime))
if(not isinstance(endTime, str)): # given unix time
endUnix = endTime
makeEU = False
endTime = time.strftime('%Y-%m-%d-%H-%M-%S',time.gmtime(endTime))
# Format start/end times (in array and unix time, if needed)
startTList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
endTList = [0.]*9
startTime = str.split(startTime,'-')
endTime = str.split(endTime,'-')
startTList[:len(startTime)] = [int(t) if len(t)>0 else 0. for t in startTime]
endTList[:len(endTime)] = [int(t) if len(t)>0 else 0. for t in endTime]
if(makeSU):
startUnix = calendar.timegm(startTList)
if(makeEU):
endUnix = calendar.timegm(endTList)
# check for bad input
if(startUnix > endUnix):
raise ValueError('Bad input time range (check order):'+str(startTime)+' to '+str(endTime))
# create array of dates (used for folders)
dummy = [0.]*9
dummy[0:3] = startTList[0:3] # start date (beginning of day)
currTime = calendar.timegm(dummy)
dates = []
while(currTime < endUnix):
dates.append(time.strftime('%Y/%m/%d/',time.gmtime(currTime))) # path segment
currTime += 86400 # add a day
fList = [] #will hold list of files
for i in range(len(dates)):
firstDate = i==0 # use these bools to skip checks for middle dates
lastDate = i==len(dates)-1
dataDir = join(path,station,dates[i]) #directory of files from date
try:
# get list of files (ignore, e.g., folders)
foldFiles = [f for f in listdir(dataDir) if isfile(join(dataDir, f))]
# add file to list if it is in time range
# files like: fribourg01_20170102_122226.hdf5
for f in foldFiles:
inRange = not (firstDate or lastDate)
if(not inRange): # need to check
fTime = f.split('_')[2].split('.')[0] # get time 'hhmmss'
fTime = fTime[0:2]+':'+fTime[2:4]+':'+fTime[4:6] # time format hh:mm:ss
# print(dates[i]+fTime, f)
fTime = calendar.timegm(time.strptime(dates[i]+fTime, '%Y/%m/%d/%H:%M:%S')) # in unix
if(fTime >= startUnix and fTime < endUnix):
inRange = True
# print('\t', inRange)
if(inRange): # add file to list
fList.append(join(dataDir, f))
# in case the file list is not sorted, look through all files.
except OSError:
if(verbose):
print('getFListInRange() --- Data not found for: ' + dates[i])
return fList
[docs]def getDataInRange(station, startTime, endTime, sortTime=True, convert=True,
path='./',dataCh='MagneticFields',saneCh='SanityChannel',
verbose=False):
'''
Get list of data in time range. Time range uses the start time listed in the
'hdf5' file name.
station: str
Name of station.
startTime: float (unix time), str
Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS'
(omitted values defaulted as 0)
endTime: float (unix time), str
Last time. Format same as startTime
sortTime: bool (default: True)
Actively sort output by start time (using data in file)
convert: boolean (default: True)
Whether to use conversion function from file.
path: str (default './')
Location of files
dataCh: string (default "MagneticFields")
Name of data channel.
saneCh: string (default "SanityChannel")
Name of sanity channel.
verbose: bool (default False)
Verbose output
returns (data, sanity, fileList). Data and sanity are astropy TimeSeriesList
Note: must evaluate values in 'sanity' (e.g., using 'value' attribute) to
get boolean
Note: use, e.g., dataTSL.join(pad=float('nan'),gap='pad') to combine
TimeSeriesList into single Time series.
'''
if(verbose):
print('getDataInRange() --- Finding files')
fList = getFListInRange(station, startTime, endTime, path=path)
numFiles = len(fList)
# get data
if(verbose):
print('getDataInRange() --- Reading files')
dataList = [None]*numFiles
saneList = [None]*numFiles
for i in range(numFiles):
dataList[i],saneList[i] = getDataFromFile(fList[i],convert=convert,dataCh=dataCh,saneCh=saneCh)
# sort if needed
if(sortTime):
if(verbose):
print('getDataInRange() --- Sorting data')
# do insertion sort (likely that list is sorted)
sortIndex = range(numFiles) # sorted list of indices
for sRange in range(1, numFiles): # sRange is size of sorted segment
# note, sortIndex[sRange] = sRange
insPtTime = dataList[sRange].epoch # for point being inserted
insHere = sRange # place to insert point
while(insHere > 0 and dataList[sortIndex[insHere-1]].epoch > insPtTime):
insHere -= 1 # decrement until finding place to insert
# insert point
dummy1 = sRange # point being moved
while(insHere <= sRange):
dummy2 = sortIndex[insHere]
sortIndex[insHere] = dummy1
dummy1 = dummy2
insHere+=1
else:
sortIndex = range(numFiles)
# put data in TimeSeriesList
dataTSL = TimeSeriesList()
saneTSL = TimeSeriesList()
for i in sortIndex:
dataTSL.append(dataList[i])
saneTSL.append(saneList[i])
return dataTSL, saneTSL, [fList[i] for i in sortIndex]
def getFileNameTime(fileName):
'''
Gives unix time from file name (does not read file).
Files of the form */yyyy/mm/dd/*hhmmss.hdf5, no '/' after/in last *
fileName: str
file name
return unix time of file, according to name
'''
fileComp = str.split(fileName,'/')
fileComp = fileComp[-4:] # last 4 elements
year = int(fileComp[0])
month = int(fileComp[1])
day = int(fileComp[2])
dayTime = fileComp[3][-11:-5] # hhmmss
hour = int(dayTime[:2])
minute = int(dayTime[2:4])
second = int(dayTime[4:])
timeList = [year, month, day, hour, minute, second,0,0,0] # date-time tuples (note that last 3 should never be changed)
return calendar.timegm(timeList)
def getFListFromDates(station,dates,path='./', verbose=False):
'''
Gets list of files from a date (relies on file organization:
path/station/yyyy/mm/dd/).
station: str
Name of station used in file system
dates: list<str>
List of date strings (format: year-month-day)
path: str (default './')
Data path of folder holding station folders
verbose: bool (default: False)
print when file not found
returns list of file names
'''
if(isinstance(dates, str)): # make 'dates' an array if single val given
dates = [dates]
fList = [] #will hold list of files
for date in dates:
dArr = [int(s) for s in date.split('-') if s.isdigit()] #split date into integer array
dataDir = join(path,station,"{0:04d}/{1:02d}/{2:02d}".format(dArr[0],dArr[1],dArr[2])) #directory of files from date
# append list of files (exclude directories). Use full path
try:
fList.extend([join(dataDir, f) for f in listdir(dataDir) if isfile(join(dataDir, f))])
except OSError:
if(verbose):
print('Data not found for: ' + date)
return fList
def getStartTimes(fList):
'''
Gets start time (in sec since UNIX epoch) for file list.
fList: List<str>
list of file names
retruns list of start times
'''
nFiles = len(fList)
stTimes = [0.]*nFiles
for i in range(nFiles):
# if(not i%1000): #print progress
# print(i,'/',nFiles)
f = fList[i]
h5pyFile= h5py.File(f,'r')
magFieldAttrs = h5pyFile['MagneticFields'].attrs
timeStr = magFieldAttrs['Date'] + ' ' + magFieldAttrs['t0'] + ' UTC'
h5pyFile.close()
# get time since epoch (in sec)
stTimes[i] = calendar.timegm(time.strptime(timeStr, "%Y/%m/%d %H:%M:%S.%f %Z"))
return stTimes
def getSaneList(fList, working=None):
'''
Gets list of good data, i.e., those with perfect samity channels
fList: list<str>
list of file names
working: list<int>
indices of working stations. If not specified, all stations are used.
returns list of good files (indices)
'''
saneList = []
if(not working): # no filter specified
working = range(len(fList))
for i in range(len(fList)):
fName = fList[i]
h5pyFile = h5py.File(fName,'r')
if(all(h5pyFile['SanityChannel'].value)):
saneList.append(i)
h5pyFile.close()
return saneList
def getCharacteristics(stations,sensTime,path='./'):
'''
Get dictionaries with magnetometer information.
stations: List<str>
List of station names
sensTime: float (unix time), str
Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values
defaulted as 0)
path: str (default './')
Location of files
returns poistion (EarthLocation) and orientation (AltAz) dictionaries,
respectively -- using astropy classes.
If the sensor was off on this day, the next available data will be used.
If no later data are available, the previous data are used.
'''
# put date in consistant format
# Note that the file names do not contain milliseconds, so "time" tuple is ok
makeU = True # Need to calculate start time in unix
if(not isinstance(sensTime, str)): # given unix time
sensUnix = sensTime
makeU = False
sensTime = time.strftime('%Y-%m-%d-%H-%M-%S',time.gmtime(sensTime))
# Format start/end times (in array and unix time, if needed)
sensTList = [0]*9 # date-time tuples (note that last 3 should never be changed)
sensTime = str.split(sensTime,'-')
sensTList[:len(sensTime)] = [int(t) if len(t)>0 else 0. for t in sensTime]
if(makeU):
sensUnix = calendar.timegm(sensTList)
magPos = {}
magOrient = {}
# find file near specified time
for st in stations:
dataDir = join(path,st)
relDate = 0 # Negative > Use latest (date before). Positive > Use earliest (date after)
try: # check if sensor exists
# get year folder
foldFiles = listdir(dataDir)
foldFiles.sort() # sort years
i=0 # find earliest year on or after requested year (or just last year)
while(i+1<len(foldFiles) and int(foldFiles[i]) < sensTList[0]):
i+=1
dataDir = join(dataDir,foldFiles[i])
relDate = int(foldFiles[i]) - sensTList[0]
# get month folder
foldFiles = listdir(dataDir)
foldFiles.sort() # sort months
if(relDate > 0):
dataDir = join(dataDir,foldFiles[0])
elif(relDate < 0):
dataDir = join(dataDir,foldFiles[-1])
else:
i=0
while(i+1<len(foldFiles) and int(foldFiles[i]) < sensTList[1]):
i+=1
dataDir = join(dataDir,foldFiles[i])
relDate = int(foldFiles[i]) - sensTList[1]
# get day folder
foldFiles = listdir(dataDir)
foldFiles.sort() # sort days
if(relDate > 0):
dataDir = join(dataDir,foldFiles[0])
elif(relDate < 0):
dataDir = join(dataDir,foldFiles[-1])
else:
i=0
while(i+1<len(foldFiles) and int(foldFiles[i]) < sensTList[2]):
i+=1
dataDir = join(dataDir,foldFiles[i])
relDate = int(foldFiles[i]) - sensTList[2]
# get list of files
foldFiles = [f for f in listdir(dataDir) if isfile(join(dataDir, f))]
if(len(foldFiles) <= 0): # empty folder
raise Exception('Folder empty: '+dataDir)
# files names: station_YYYYmmdd_HHMMSS.hdf5
fileTimes = [None]*len(foldFiles)
for i in range(len(foldFiles)): # record file time (unix)
year=int(foldFiles[i].split('_')[1][:4])
month=int(foldFiles[i].split('_')[1][4:6])
day=int(foldFiles[i].split('_')[1][6:8])
hour=int(foldFiles[i].split('_')[2][:2])
minute=int(foldFiles[i].split('_')[2][2:4])
sec=int(foldFiles[i].split('_')[2][4:6])
fileTimes[i] = calendar.timegm([year,month,day,hour,minute,sec,0,0,0])
# sort list of files by start time (insertion sort)
sortList = range(len(foldFiles)) # foldFiles[sortList[:]] is sorted
for i in range(1,len(foldFiles)): # first i are sorted
# find location to insert sortList[i]
j=i # will insert at jth location
while(j>0 and fileTimes[i] < fileTimes[j-1]):
j-=1 # decrement while index is at a strictly greater time
while(j < i):
# insert sortList[i] at j location
dummy = sortList[j]
sortList[j] = sortList[i]
sortList[i] = dummy # move next insert to ith location
j+=1
if(fileTimes[sortList[0]] >= sensUnix):
# time is before list
infoFile=foldFiles[sortList[0]]
else:
sPos = 0
ePos = len(sortList)
# binary search for first file time before sensUnix
while(sPos < ePos - 1):
mPos = (sPos+ePos)//2
if(fileTimes[sortList[mPos]] < sensUnix):
sPos = mPos
else:
ePos = mPos
# timeList[sortList[ePos-1]] <= sensUnix < timeList[sortList[ePos]] (if defined)
infoFile = foldFiles[sortList[ePos-1]]
posOrient = getSensorInfoFromFile(join(dataDir,infoFile))
if(posOrient[0] is not None):
magPos[st] = posOrient[0]
if(posOrient[1] is not None):
magOrient[st] = posOrient[1]
except OSError:
print("No/empty folder: "+dataDir)
return magPos, magOrient
def getSensorInfoFromFile(fileName):
'''
Get position and orientation from data file.
fileName: str
Absolute/relative file name.
returns (EarthLocation, AltAz) of magnetometer (using astropy classes).
'''
try:
h5pyFile = h5py.File(fileName,'r')
attribs = h5pyFile['MagneticFields'].attrs
try:
lon = attribs['Longitude']
lat = attribs['Latitude']
alt = attribs['Altitude']
pos = EarthLocation.from_geodetic(lon, lat, height=alt)
except KeyError: # attribute not available
pos = None
try:
altDir = attribs['SensorDirAltitude(Deg)']
azDir = attribs['SensorDirAzimuth(Deg)']
orient = AltAz(az=azDir*u.deg, alt=altDir*u.deg)
except KeyError: # attribute not available
orient = None
return pos, orient
finally:
h5pyFile.close()
def getFalseOffset(falseStart, falseEnd, rangeStart, rangeEnd):
'''
Generates a random time offset from a false start so that one can collect
data in [rangeStart, rangeEnd], but with the length (falseEnd-falseStart).
This is useful for collecting data from different sensors sampled from
random times in a science run.
falseStart: float (unix time), str
Earliest artificial time. String formatted as 'yyyy-mm-dd-HH-MM-SS'
(omitted values defaulted as 0).
falseEnd: float (unix time), str
Last artificial time. Format same as falseStart.
rangeStart: float (unix time), str
Earliest time in actual data. Format same as falseStart.
rangeEnd: float (unix time), str
Last time in actual data. Format same as falseStart.
returns offset (in sec)
'''
# put dates in unix time
# Note that the file names do not contain milliseconds, so "time" tuple is ok
# falseStart -> Unix
if(isinstance(falseStart,str)): # given as string
tList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
strTList = falseStart.split('-')
tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList]
fsUnix = calendar.timegm(tList)
else:
fsUnix = falseStart
# falseEnd -> Unix
if(isinstance(falseEnd,str)): # given as string
tList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
strTList = falseEnd.split('-')
tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList]
feUnix = calendar.timegm(tList)
else:
feUnix = falseEnd
# rangeStart -> Unix
if(isinstance(rangeStart,str)): # given as string
tList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
strTList = rangeStart.split('-')
tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList]
rsUnix = calendar.timegm(tList)
else:
rsUnix = rangeStart
# rangeEnd -> Unix
if(isinstance(rangeEnd,str)): # given as string
tList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
strTList = rangeEnd.split('-')
tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList]
reUnix = calendar.timegm(tList)
else:
reUnix = rangeEnd
rangeSize = reUnix - rsUnix # size of range
sampSize = feUnix - fsUnix # size of sample
offMin = rsUnix - fsUnix # offset between range start and false start
offSize = rangeSize - sampSize # range size for collecting
return offMin + offSize*random() # random offset
def falseFListInRange(station, falseStart, falseEnd, offset, path='./',
verbose=False):
'''
Get list of file names for GNOME experiment within a time period. The start
times of the files will be in [falseStart+offset, falseEnd+offset).
Data located in folder of the form 'path/station/yyyy/mm/dd/'.
Time range uses the start time listed in the 'hdf5' file name.
station: str
Name of station.
falseStart: float (unix time), str
Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values
defaulted as 0).
falseEnd: float (unix time), str
Last time. Format same as falseStart.
offset: float
offset of start time (in sec).
path: str (default './')
Location of files.
verbose: bool (default False)
Verbose output.
returns list of offset file names
'''
# put dates in unix time
# Note that the file names do not contain milliseconds, so "time" tuple is ok
# falseStart -> Unix
if(isinstance(falseStart,str)): # given as string
tList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
strTList = falseStart.split('-')
tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList]
fsUnix = calendar.timegm(tList)
else:
fsUnix = falseStart
# falseEnd -> Unix
if(isinstance(falseEnd,str)): # given as string
tList = [0.]*9 # date-time tuples (note that last 3 should never be changed)
strTList = falseEnd.split('-')
tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList]
feUnix = calendar.timegm(tList)
else:
feUnix = falseEnd
return getFListInRange(station, fsUnix+offset, feUnix+offset, path=path, verbose=verbose)
def offsetTimeSeries(data, offset):
'''
Subtracts a time offset to the data; i.e., time -> time-offset.
data: gwpy.timeseries.TimeSeries(List)
Time series with data.
offset: float (in sec), astropy quantity
Amount by which to offset the data.
'''
if(not isinstance(offset, u.quantity.Quantity)): # float/int given
offset = offset*u.s
if(isinstance(data,TimeSeriesList)):
for ts in data:
ts.epoch -= offset
else:
data.epoch -= offset