Source code for gdas.readGNOME

###############################################################################
# Copyright (c) 2019, GNOME Collaboration.
# Produced at the University of Mainz
#
# Written by Joseph A. Smiga (jsmiga@uni-mainz.de)
#
# All rights reserved.
# This file is part of GDAS.
# For details, see https://gnome.pages.gitlab.rlp.net/gdas/
# For details about use and distribution, please read GDAS/LICENSE.
###############################################################################
# Code for reading GNOME data.
#
# Summary of contents:
# getDataFromFile()--------Gets magnetometer data from single file.
# getFListInRange()--------Gets list of files in time range.
# getDataInRange()---------Gets list of data from files in time range.
# getFileNameTime()--------Gets file time from its name.
# getFListFromDates()------Gets list of file names from dates.
# getStartTimes()----------Gets list of start times for set of data files.
# getSaneList()------------Finds which data files contain only 'sane' data.
# getCharacteristics()-----Get position/orientation of sensors.
# getSensorInfoFromFile()--Get position/orientation from file.
# getFalseOffset()---------Generate an offset for random sampling.
# falseFListInRange()------Calls getFListInRange() with an offset.
# offsetTimeSeries()-------Subtracts an offset from a TimeSeries(List).

import h5py
import time, calendar

from gwpy.timeseries import TimeSeries,TimeSeriesList
from astropy.time import Time as astroTime
from astropy.coordinates import EarthLocation,AltAz
import astropy.units as u

from os import listdir
from os.path import isfile, join

from numpy.random import random
from sympy import lambdify # needed for conversion function

__all__ = [
    "getDataFromFile", "getFListInRange", "getDataInRange", "getFileNameTime",
    "getFListFromDates", "getStartTimes", "getSaneList", "getCharacteristics",
    "getSensorInfoFromFile", "getFalseOffset", "falseFListInRange",
    "offsetTimeSeries"
]

__author__ = 'Joseph A. Smiga <jsmiga@uni-mainz.de>'

[docs]def getDataFromFile(fName, convert=True, dataCh='MagneticFields', saneCh='SanityChannel'): ''' Gets magnetometer data from file. fName: str Name of file convert: boolean (default: True) Whether to use conversion function from file. dataCh: string (default "MagneticFields") Name of data channel. saneCh: string (default "SanityChannel") Name of sanity channel. returns (data, sanity data) as astropy TimeSeries Note: must evaluate values in 'sanity' (e.g., using 'value' attribute) to get boolean ''' # seconds from (1970/01/01 00:00:00.0) to gps epoch (1980/01/06 00:00:19.0), ignoring leap-seconds gpsEpoch = 315964819. try: h5pyFile = h5py.File(fName,'r') saneList = h5pyFile[saneCh] dataList = h5pyFile[dataCh] # get mag field attributes attrs = dataList.attrs sampRate = float(attrs['SamplingRate(Hz)']) # parse time into unix time startT = calendar.timegm(time.strptime(attrs['Date']+' '+attrs['t0'], '%Y/%m/%d %H:%M:%S.%f')) startT-= gpsEpoch # adjust epoch to GPS epoch # get milliseconds decPt = attrs['t0'].rfind('.') if(decPt >= 0): # has decimal point startT += float('0'+attrs['t0'][decPt:]) # convert to time object. gwpy.TimeSeries prefers using gps time (not unix) so this will handle the conversion startT = astroTime(startT, format='gps') # GPS uses TAI time (no leap seconds) # get sanity attributes saneAttrs = saneList.attrs saneRate = float(saneAttrs['SamplingRate(Hz)']) saneStart = calendar.timegm(time.strptime(saneAttrs['Date']+' '+saneAttrs['t0'], '%Y/%m/%d %H:%M:%S.%f')) saneStart-= gpsEpoch # adjust epoch to GPS epoch # get milliseconds decPt = saneAttrs['t0'].rfind('.') if(decPt >= 0): # has decimal point saneStart += float('0'+saneAttrs['t0'][decPt:]) # convert to time object. gwpy.TimeSeries prefers using gps time (not unix) so this will handle the conversion saneStart= astroTime(saneStart, format='gps') # create data TimeSeries if(convert): convStr = attrs['MagFieldEq'] #string contatining conversion function unitLoc = convStr.find('[') # unit info is at end in [] if(unitLoc >= 0): # unit given convStr = convStr[:unitLoc] # get substring before units # get list of channels used in conversion chNames = [k.encode() for k in h5pyFile.iterkeys()] # list of channels to use in calibration for i in range(len(chNames)-1,-1,-1): # decrement so I can just pop off unused channels if(convStr.find(chNames[i]) < 0): # remove key that does not appear chNames.pop(i) chList = [None]*len(chNames) for i in range(len(chNames)): # store "value" (a numpy array) k = chNames[i] if(k==saneCh): chList[i] = saneList.value elif(k==dataCh): chList[i] = dataList.value else: chList[i] = h5pyFile[k].value convFunc = lambdify(chNames, convStr) # create symbolic function (default is to use numpy) dataTS = TimeSeries(convFunc(*chList), sample_rate=sampRate, epoch=startT) # put data in TimeSeries # convStr=convStr.replace('MagneticFields','dataTS') # relabel with correct variable # exec 'dataTS = '+convStr # dynamic execution to convert dataTS else: dataTS = TimeSeries(dataList, sample_rate=sampRate, epoch=startT) # put data in TimeSeries # create sanity TimeSeries saneTS = TimeSeries(saneList, sample_rate=saneRate, epoch=saneStart) finally: h5pyFile.close() return dataTS, saneTS
[docs]def getFListInRange(station, startTime, endTime, path='./', verbose=False): ''' Get list of file names for GNOME experiment within a time period. Data located in folder of the form 'path/station/yyyy/mm/dd/'. Time range uses the start time listed in the 'hdf5' file name. station: str Name of station. startTime: float (unix time), str Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values defaulted as 0) endTime: float (unix time), str Last time. Format same as startTime path: str (default './') Location of files verbose: bool (default False) Verbose output returns list of file names ''' # put date in consistant format # Note that the file names do not contain milliseconds, so "time" tuple is ok makeSU = True # Need to calculate start time in unix makeEU = True # Need to calculate end time in unix if(not isinstance(startTime,str)): # given unix time startUnix = startTime makeSU = False startTime = time.strftime('%Y-%m-%d-%H-%M-%S',time.gmtime(startTime)) if(not isinstance(endTime, str)): # given unix time endUnix = endTime makeEU = False endTime = time.strftime('%Y-%m-%d-%H-%M-%S',time.gmtime(endTime)) # Format start/end times (in array and unix time, if needed) startTList = [0.]*9 # date-time tuples (note that last 3 should never be changed) endTList = [0.]*9 startTime = str.split(startTime,'-') endTime = str.split(endTime,'-') startTList[:len(startTime)] = [int(t) if len(t)>0 else 0. for t in startTime] endTList[:len(endTime)] = [int(t) if len(t)>0 else 0. for t in endTime] if(makeSU): startUnix = calendar.timegm(startTList) if(makeEU): endUnix = calendar.timegm(endTList) # check for bad input if(startUnix > endUnix): raise ValueError('Bad input time range (check order):'+str(startTime)+' to '+str(endTime)) # create array of dates (used for folders) dummy = [0.]*9 dummy[0:3] = startTList[0:3] # start date (beginning of day) currTime = calendar.timegm(dummy) dates = [] while(currTime < endUnix): dates.append(time.strftime('%Y/%m/%d/',time.gmtime(currTime))) # path segment currTime += 86400 # add a day fList = [] #will hold list of files for i in range(len(dates)): firstDate = i==0 # use these bools to skip checks for middle dates lastDate = i==len(dates)-1 dataDir = join(path,station,dates[i]) #directory of files from date try: # get list of files (ignore, e.g., folders) foldFiles = [f for f in listdir(dataDir) if isfile(join(dataDir, f))] # add file to list if it is in time range # files like: fribourg01_20170102_122226.hdf5 for f in foldFiles: inRange = not (firstDate or lastDate) if(not inRange): # need to check fTime = f.split('_')[2].split('.')[0] # get time 'hhmmss' fTime = fTime[0:2]+':'+fTime[2:4]+':'+fTime[4:6] # time format hh:mm:ss # print(dates[i]+fTime, f) fTime = calendar.timegm(time.strptime(dates[i]+fTime, '%Y/%m/%d/%H:%M:%S')) # in unix if(fTime >= startUnix and fTime < endUnix): inRange = True # print('\t', inRange) if(inRange): # add file to list fList.append(join(dataDir, f)) # in case the file list is not sorted, look through all files. except OSError: if(verbose): print('getFListInRange() --- Data not found for: ' + dates[i]) return fList
[docs]def getDataInRange(station, startTime, endTime, sortTime=True, convert=True, path='./',dataCh='MagneticFields',saneCh='SanityChannel', verbose=False): ''' Get list of data in time range. Time range uses the start time listed in the 'hdf5' file name. station: str Name of station. startTime: float (unix time), str Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values defaulted as 0) endTime: float (unix time), str Last time. Format same as startTime sortTime: bool (default: True) Actively sort output by start time (using data in file) convert: boolean (default: True) Whether to use conversion function from file. path: str (default './') Location of files dataCh: string (default "MagneticFields") Name of data channel. saneCh: string (default "SanityChannel") Name of sanity channel. verbose: bool (default False) Verbose output returns (data, sanity, fileList). Data and sanity are astropy TimeSeriesList Note: must evaluate values in 'sanity' (e.g., using 'value' attribute) to get boolean Note: use, e.g., dataTSL.join(pad=float('nan'),gap='pad') to combine TimeSeriesList into single Time series. ''' if(verbose): print('getDataInRange() --- Finding files') fList = getFListInRange(station, startTime, endTime, path=path) numFiles = len(fList) # get data if(verbose): print('getDataInRange() --- Reading files') dataList = [None]*numFiles saneList = [None]*numFiles for i in range(numFiles): dataList[i],saneList[i] = getDataFromFile(fList[i],convert=convert,dataCh=dataCh,saneCh=saneCh) # sort if needed if(sortTime): if(verbose): print('getDataInRange() --- Sorting data') # do insertion sort (likely that list is sorted) sortIndex = range(numFiles) # sorted list of indices for sRange in range(1, numFiles): # sRange is size of sorted segment # note, sortIndex[sRange] = sRange insPtTime = dataList[sRange].epoch # for point being inserted insHere = sRange # place to insert point while(insHere > 0 and dataList[sortIndex[insHere-1]].epoch > insPtTime): insHere -= 1 # decrement until finding place to insert # insert point dummy1 = sRange # point being moved while(insHere <= sRange): dummy2 = sortIndex[insHere] sortIndex[insHere] = dummy1 dummy1 = dummy2 insHere+=1 else: sortIndex = range(numFiles) # put data in TimeSeriesList dataTSL = TimeSeriesList() saneTSL = TimeSeriesList() for i in sortIndex: dataTSL.append(dataList[i]) saneTSL.append(saneList[i]) return dataTSL, saneTSL, [fList[i] for i in sortIndex]
def getFileNameTime(fileName): ''' Gives unix time from file name (does not read file). Files of the form */yyyy/mm/dd/*hhmmss.hdf5, no '/' after/in last * fileName: str file name return unix time of file, according to name ''' fileComp = str.split(fileName,'/') fileComp = fileComp[-4:] # last 4 elements year = int(fileComp[0]) month = int(fileComp[1]) day = int(fileComp[2]) dayTime = fileComp[3][-11:-5] # hhmmss hour = int(dayTime[:2]) minute = int(dayTime[2:4]) second = int(dayTime[4:]) timeList = [year, month, day, hour, minute, second,0,0,0] # date-time tuples (note that last 3 should never be changed) return calendar.timegm(timeList) def getFListFromDates(station,dates,path='./', verbose=False): ''' Gets list of files from a date (relies on file organization: path/station/yyyy/mm/dd/). station: str Name of station used in file system dates: list<str> List of date strings (format: year-month-day) path: str (default './') Data path of folder holding station folders verbose: bool (default: False) print when file not found returns list of file names ''' if(isinstance(dates, str)): # make 'dates' an array if single val given dates = [dates] fList = [] #will hold list of files for date in dates: dArr = [int(s) for s in date.split('-') if s.isdigit()] #split date into integer array dataDir = join(path,station,"{0:04d}/{1:02d}/{2:02d}".format(dArr[0],dArr[1],dArr[2])) #directory of files from date # append list of files (exclude directories). Use full path try: fList.extend([join(dataDir, f) for f in listdir(dataDir) if isfile(join(dataDir, f))]) except OSError: if(verbose): print('Data not found for: ' + date) return fList def getStartTimes(fList): ''' Gets start time (in sec since UNIX epoch) for file list. fList: List<str> list of file names retruns list of start times ''' nFiles = len(fList) stTimes = [0.]*nFiles for i in range(nFiles): # if(not i%1000): #print progress # print(i,'/',nFiles) f = fList[i] h5pyFile= h5py.File(f,'r') magFieldAttrs = h5pyFile['MagneticFields'].attrs timeStr = magFieldAttrs['Date'] + ' ' + magFieldAttrs['t0'] + ' UTC' h5pyFile.close() # get time since epoch (in sec) stTimes[i] = calendar.timegm(time.strptime(timeStr, "%Y/%m/%d %H:%M:%S.%f %Z")) return stTimes def getSaneList(fList, working=None): ''' Gets list of good data, i.e., those with perfect samity channels fList: list<str> list of file names working: list<int> indices of working stations. If not specified, all stations are used. returns list of good files (indices) ''' saneList = [] if(not working): # no filter specified working = range(len(fList)) for i in range(len(fList)): fName = fList[i] h5pyFile = h5py.File(fName,'r') if(all(h5pyFile['SanityChannel'].value)): saneList.append(i) h5pyFile.close() return saneList def getCharacteristics(stations,sensTime,path='./'): ''' Get dictionaries with magnetometer information. stations: List<str> List of station names sensTime: float (unix time), str Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values defaulted as 0) path: str (default './') Location of files returns poistion (EarthLocation) and orientation (AltAz) dictionaries, respectively -- using astropy classes. If the sensor was off on this day, the next available data will be used. If no later data are available, the previous data are used. ''' # put date in consistant format # Note that the file names do not contain milliseconds, so "time" tuple is ok makeU = True # Need to calculate start time in unix if(not isinstance(sensTime, str)): # given unix time sensUnix = sensTime makeU = False sensTime = time.strftime('%Y-%m-%d-%H-%M-%S',time.gmtime(sensTime)) # Format start/end times (in array and unix time, if needed) sensTList = [0]*9 # date-time tuples (note that last 3 should never be changed) sensTime = str.split(sensTime,'-') sensTList[:len(sensTime)] = [int(t) if len(t)>0 else 0. for t in sensTime] if(makeU): sensUnix = calendar.timegm(sensTList) magPos = {} magOrient = {} # find file near specified time for st in stations: dataDir = join(path,st) relDate = 0 # Negative > Use latest (date before). Positive > Use earliest (date after) try: # check if sensor exists # get year folder foldFiles = listdir(dataDir) foldFiles.sort() # sort years i=0 # find earliest year on or after requested year (or just last year) while(i+1<len(foldFiles) and int(foldFiles[i]) < sensTList[0]): i+=1 dataDir = join(dataDir,foldFiles[i]) relDate = int(foldFiles[i]) - sensTList[0] # get month folder foldFiles = listdir(dataDir) foldFiles.sort() # sort months if(relDate > 0): dataDir = join(dataDir,foldFiles[0]) elif(relDate < 0): dataDir = join(dataDir,foldFiles[-1]) else: i=0 while(i+1<len(foldFiles) and int(foldFiles[i]) < sensTList[1]): i+=1 dataDir = join(dataDir,foldFiles[i]) relDate = int(foldFiles[i]) - sensTList[1] # get day folder foldFiles = listdir(dataDir) foldFiles.sort() # sort days if(relDate > 0): dataDir = join(dataDir,foldFiles[0]) elif(relDate < 0): dataDir = join(dataDir,foldFiles[-1]) else: i=0 while(i+1<len(foldFiles) and int(foldFiles[i]) < sensTList[2]): i+=1 dataDir = join(dataDir,foldFiles[i]) relDate = int(foldFiles[i]) - sensTList[2] # get list of files foldFiles = [f for f in listdir(dataDir) if isfile(join(dataDir, f))] if(len(foldFiles) <= 0): # empty folder raise Exception('Folder empty: '+dataDir) # files names: station_YYYYmmdd_HHMMSS.hdf5 fileTimes = [None]*len(foldFiles) for i in range(len(foldFiles)): # record file time (unix) year=int(foldFiles[i].split('_')[1][:4]) month=int(foldFiles[i].split('_')[1][4:6]) day=int(foldFiles[i].split('_')[1][6:8]) hour=int(foldFiles[i].split('_')[2][:2]) minute=int(foldFiles[i].split('_')[2][2:4]) sec=int(foldFiles[i].split('_')[2][4:6]) fileTimes[i] = calendar.timegm([year,month,day,hour,minute,sec,0,0,0]) # sort list of files by start time (insertion sort) sortList = range(len(foldFiles)) # foldFiles[sortList[:]] is sorted for i in range(1,len(foldFiles)): # first i are sorted # find location to insert sortList[i] j=i # will insert at jth location while(j>0 and fileTimes[i] < fileTimes[j-1]): j-=1 # decrement while index is at a strictly greater time while(j < i): # insert sortList[i] at j location dummy = sortList[j] sortList[j] = sortList[i] sortList[i] = dummy # move next insert to ith location j+=1 if(fileTimes[sortList[0]] >= sensUnix): # time is before list infoFile=foldFiles[sortList[0]] else: sPos = 0 ePos = len(sortList) # binary search for first file time before sensUnix while(sPos < ePos - 1): mPos = (sPos+ePos)//2 if(fileTimes[sortList[mPos]] < sensUnix): sPos = mPos else: ePos = mPos # timeList[sortList[ePos-1]] <= sensUnix < timeList[sortList[ePos]] (if defined) infoFile = foldFiles[sortList[ePos-1]] posOrient = getSensorInfoFromFile(join(dataDir,infoFile)) if(posOrient[0] is not None): magPos[st] = posOrient[0] if(posOrient[1] is not None): magOrient[st] = posOrient[1] except OSError: print("No/empty folder: "+dataDir) return magPos, magOrient def getSensorInfoFromFile(fileName): ''' Get position and orientation from data file. fileName: str Absolute/relative file name. returns (EarthLocation, AltAz) of magnetometer (using astropy classes). ''' try: h5pyFile = h5py.File(fileName,'r') attribs = h5pyFile['MagneticFields'].attrs try: lon = attribs['Longitude'] lat = attribs['Latitude'] alt = attribs['Altitude'] pos = EarthLocation.from_geodetic(lon, lat, height=alt) except KeyError: # attribute not available pos = None try: altDir = attribs['SensorDirAltitude(Deg)'] azDir = attribs['SensorDirAzimuth(Deg)'] orient = AltAz(az=azDir*u.deg, alt=altDir*u.deg) except KeyError: # attribute not available orient = None return pos, orient finally: h5pyFile.close() def getFalseOffset(falseStart, falseEnd, rangeStart, rangeEnd): ''' Generates a random time offset from a false start so that one can collect data in [rangeStart, rangeEnd], but with the length (falseEnd-falseStart). This is useful for collecting data from different sensors sampled from random times in a science run. falseStart: float (unix time), str Earliest artificial time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values defaulted as 0). falseEnd: float (unix time), str Last artificial time. Format same as falseStart. rangeStart: float (unix time), str Earliest time in actual data. Format same as falseStart. rangeEnd: float (unix time), str Last time in actual data. Format same as falseStart. returns offset (in sec) ''' # put dates in unix time # Note that the file names do not contain milliseconds, so "time" tuple is ok # falseStart -> Unix if(isinstance(falseStart,str)): # given as string tList = [0.]*9 # date-time tuples (note that last 3 should never be changed) strTList = falseStart.split('-') tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList] fsUnix = calendar.timegm(tList) else: fsUnix = falseStart # falseEnd -> Unix if(isinstance(falseEnd,str)): # given as string tList = [0.]*9 # date-time tuples (note that last 3 should never be changed) strTList = falseEnd.split('-') tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList] feUnix = calendar.timegm(tList) else: feUnix = falseEnd # rangeStart -> Unix if(isinstance(rangeStart,str)): # given as string tList = [0.]*9 # date-time tuples (note that last 3 should never be changed) strTList = rangeStart.split('-') tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList] rsUnix = calendar.timegm(tList) else: rsUnix = rangeStart # rangeEnd -> Unix if(isinstance(rangeEnd,str)): # given as string tList = [0.]*9 # date-time tuples (note that last 3 should never be changed) strTList = rangeEnd.split('-') tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList] reUnix = calendar.timegm(tList) else: reUnix = rangeEnd rangeSize = reUnix - rsUnix # size of range sampSize = feUnix - fsUnix # size of sample offMin = rsUnix - fsUnix # offset between range start and false start offSize = rangeSize - sampSize # range size for collecting return offMin + offSize*random() # random offset def falseFListInRange(station, falseStart, falseEnd, offset, path='./', verbose=False): ''' Get list of file names for GNOME experiment within a time period. The start times of the files will be in [falseStart+offset, falseEnd+offset). Data located in folder of the form 'path/station/yyyy/mm/dd/'. Time range uses the start time listed in the 'hdf5' file name. station: str Name of station. falseStart: float (unix time), str Earliest time. String formatted as 'yyyy-mm-dd-HH-MM-SS' (omitted values defaulted as 0). falseEnd: float (unix time), str Last time. Format same as falseStart. offset: float offset of start time (in sec). path: str (default './') Location of files. verbose: bool (default False) Verbose output. returns list of offset file names ''' # put dates in unix time # Note that the file names do not contain milliseconds, so "time" tuple is ok # falseStart -> Unix if(isinstance(falseStart,str)): # given as string tList = [0.]*9 # date-time tuples (note that last 3 should never be changed) strTList = falseStart.split('-') tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList] fsUnix = calendar.timegm(tList) else: fsUnix = falseStart # falseEnd -> Unix if(isinstance(falseEnd,str)): # given as string tList = [0.]*9 # date-time tuples (note that last 3 should never be changed) strTList = falseEnd.split('-') tList[:len(strTList)] = [int(t) if len(t)>0 else 0. for t in strTList] feUnix = calendar.timegm(tList) else: feUnix = falseEnd return getFListInRange(station, fsUnix+offset, feUnix+offset, path=path, verbose=verbose) def offsetTimeSeries(data, offset): ''' Subtracts a time offset to the data; i.e., time -> time-offset. data: gwpy.timeseries.TimeSeries(List) Time series with data. offset: float (in sec), astropy quantity Amount by which to offset the data. ''' if(not isinstance(offset, u.quantity.Quantity)): # float/int given offset = offset*u.s if(isinstance(data,TimeSeriesList)): for ts in data: ts.epoch -= offset else: data.epoch -= offset