# # # This source code is subject to the license referenced at
# # # https://github.com/NRLMMD-GEOIPS.
"""Utilities for creating a database of tropical cyclone tracks."""
import logging
import sqlite3

from geoips.filenames.base_paths import PATHS as gpaths

LOG = logging.getLogger(__name__)
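
# Paths from the GeoIPS base_paths configuration: the sqlite database file
# for trackfile metadata, and the directory of TC deck files scanned by
# check_db when no filenames are passed.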
TC_DECKS_DB = gpaths["TC_DECKS_DB"]
TC_DECKS_DIR = gpaths["TC_DECKS_DIR"]


def open_tc_db(dbname=TC_DECKS_DB):
"""Open the TC Decks Database, create it if it doesn't exist."""
# Make sure the directory exists. If the db doesn't exist,
# the sqlite3.connect command will create it - which will
# fail if the directory doesn't exist.
from os.path import dirname as pathdirname
from geoips.filenames.base_paths import make_dirs
make_dirs(pathdirname(dbname))
conn = sqlite3.connect(dbname)
LOG.interactive("Opening tc db: %s", dbname)
conn_cursor = conn.cursor()
    # Try to create the table - if it already exists, the CREATE will fail;
    # we pass on the exception and return the already opened db.
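    # (sqlite3 also supports "CREATE TABLE IF NOT EXISTS", which would avoid
    # relying on the exception here.)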
try:
conn_cursor.execute(
"""CREATE TABLE tc_trackfiles
(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
filename text,
last_updated timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
storm_num integer,
storm_basin text,
start_datetime timestamp,
start_lat real,
start_lon real,
start_vmax real,
        start_name text,
vmax real,
end_datetime timestamp)"""
)
# Add in at some point?
# storm_start_datetime timestamp,
except sqlite3.OperationalError:
pass
return conn_cursor, conn


def check_db(filenames=None, process=False):
    """Check TC database for passed filenames.

    filenames is a list of filenames and directories. If a list element is a
    directory name, it is expanded to the list of files in that directory.
    """
from os.path import join as pathjoin
from glob import glob
if filenames is None:
filenames = glob(pathjoin(TC_DECKS_DIR, "*"))
updated_files = []
cc, conn = open_tc_db()
# We might want to rearrange this so we don't open up every file...
# Check timestamps first.
for filename in filenames:
try:
updated_files += update_fields(filename, cc, conn, process=process)
except IndexError:
LOG.warning("Failed parsing %s", filename)
continue
    conn.close()
LOG.interactive("%s updated storms", len(updated_files))
return updated_files


def update_fields(tc_trackfilename, cc, conn, process=False):
"""Update fields in TC track database with passed tc_trackfilename."""
# Must be of form similar to
# Gal912016.dat
import re
from datetime import datetime, timedelta
from os.path import basename as pathbasename
from os import stat as osstat
updated_files = []
LOG.info("Checking " + tc_trackfilename + " ... process " + str(process))
    # Check if we match the Gxxdddddd.dat or bxxdddddd.dat filename format,
    # optionally with a 10 digit datetime field before the extension
    # (e.g. Gal912016.dat or Gal912016.2016090806.dat).
    # If not, just return and don't do anything.
    if not re.compile(r"[Gb]\D\D\d{6}(\.\d{10})?\.dat").match(
        pathbasename(tc_trackfilename)
    ):
LOG.info("")
LOG.warning(
" DID NOT MATCH REQUIRED FILENAME FORMAT, SKIPPING: %s", tc_trackfilename
)
return []
# Get all fields for the database entry for the current filename
cc.execute("SELECT * FROM tc_trackfiles WHERE filename = ?", (tc_trackfilename,))
data = cc.fetchone()
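    # fetchone returns None when this trackfile is not yet in the database.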
file_timestamp = datetime.fromtimestamp(osstat(tc_trackfilename).st_mtime)
# Reads timestamp out as string - convert to datetime object.
# Check if timestamp on file is newer than timestamp in database -
# if not, just return and don't do anything.
if data:
try:
database_timestamp = datetime.strptime(
cc.execute(
"SELECT last_updated from tc_trackfiles WHERE filename = ?",
(tc_trackfilename,),
).fetchone()[0],
"%Y-%m-%d %H:%M:%S.%f",
)
        except ValueError as resp:
            LOG.exception("Failed on %s", tc_trackfilename)
            raise ValueError(f"FAILED ON {tc_trackfilename}: {resp}") from resp
if file_timestamp < database_timestamp:
LOG.info("")
LOG.interactive(
"%s already in %s and up to date, not doing anything",
tc_trackfilename,
TC_DECKS_DB,
)
return []
    with open(tc_trackfilename, "r") as fobj:
        lines = fobj.readlines()
    # Remove any whitespace from the loaded deck file lines - otherwise parsing
    # the datetime information downstream will fail.
    lines = [x.replace(" ", "") for x in lines]
start_line = lines[0].split(",")
# Start 24h prior to start in sectorfile, for initial processing
# storm_start_datetime = datetime.strptime(start_line[2],'%Y%m%d%H')
start_datetime = datetime.strptime(start_line[2], "%Y%m%d%H") - timedelta(hours=24)
end_datetime = datetime.strptime(lines[-1].split(",")[2], "%Y%m%d%H")
start_vmax = start_line[8]
vmax = 0
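    # ATCF deck fields are comma separated; by 0-based index: 0 basin,
    # 1 storm number, 2 synoptic time (YYYYMMDDHH), 4 objective technique
    # ("BEST" for best track lines), 8 max sustained wind in knots.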
for line in lines:
currv = line.split(",")[8]
track = line.split(",")[4]
if currv and track == "BEST" and float(currv) > vmax:
vmax = float(currv)
if data and database_timestamp < file_timestamp:
LOG.interactive("")
LOG.interactive(
"Updating start/end datetime and last_updated fields for "
+ tc_trackfilename
+ " in "
+ TC_DECKS_DB
)
old_start_datetime, old_end_datetime, old_vmax = cc.execute(
"SELECT start_datetime,end_datetime,vmax from tc_trackfiles WHERE "
"filename = ?",
(tc_trackfilename,),
).fetchone()
# Eventually add in storm_start_datetime
# old_storm_start_datetime,old_start_datetime,old_end_datetime,old_vmax =
# cc.execute("SELECT storm_start_datetime,start_datetime,end_datetime,vmax
# from tc_trackfiles WHERE filename = ?", (tc_trackfilename,)).fetchone()
        if old_start_datetime == start_datetime.strftime("%Y-%m-%d %H:%M:%S"):
            LOG.interactive(" UNCHANGED start_datetime: %s", old_start_datetime)
        else:
            LOG.interactive(
                " Old start_datetime: %s to new: %s",
                old_start_datetime,
                start_datetime.strftime("%Y-%m-%d %H:%M:%S"),
            )
updated_files += [tc_trackfilename]
# if old_storm_start_datetime ==
# storm_start_datetime.strftime('%Y-%m-%d %H:%M:%S'):
# LOG.info(' UNCHANGED storm_start_datetime: '+old_storm_start_datetime)
# else:
# LOG.info(' Old storm_start_datetime: '+old_storm_start_datetime+' to
# new: '+storm_start_datetime.strftime('%Y-%m-%d %H:%M:%S'))
        if old_end_datetime == end_datetime.strftime("%Y-%m-%d %H:%M:%S"):
            LOG.interactive(" UNCHANGED end_datetime: %s", old_end_datetime)
        else:
            LOG.interactive(
                " Old end_datetime: %s to new: %s",
                old_end_datetime,
                end_datetime.strftime("%Y-%m-%d %H:%M:%S"),
            )
updated_files += [tc_trackfilename]
        if database_timestamp == file_timestamp:
            LOG.interactive(
                " UNCHANGED last_updated: %s",
                database_timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            )
        else:
            LOG.interactive(
                " Old last_updated: %s to new: %s",
                database_timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                file_timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            )
updated_files += [tc_trackfilename]
        if old_vmax == vmax:
            LOG.interactive(" UNCHANGED vmax: %s", old_vmax)
        else:
            LOG.interactive(" Old vmax: %s to new: %s", old_vmax, vmax)
updated_files += [tc_trackfilename]
cc.execute(
"""UPDATE tc_trackfiles SET
last_updated=?,
start_datetime=?,
end_datetime=?,
vmax=?
WHERE filename = ?""",
# Eventually add in ?
# storm_start_datetime=?,
(
file_timestamp,
# storm_start_datetime,
start_datetime,
end_datetime,
str(vmax),
tc_trackfilename,
),
)
conn.commit()
return updated_files
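    # ATCF lat/lon fields are in tenths of a degree with an N/S or E/W
    # hemisphere suffix (e.g. 123N); strip the letter and apply a sign.
    # Values are stored as-is, still in tenths of a degree.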
start_lat = start_line[6].replace("N", "")
if "S" in start_lat:
start_lat = "-" + start_lat.replace("S", "")
start_lon = start_line[7].replace("E", "")
if "W" in start_lon:
start_lon = "-" + start_lon.replace("W", "")
storm_basin = start_line[0]
storm_num = start_line[1]
if re.compile(r"b\D\D\d\d\d\d\d\d\.dat").match(
pathbasename(tc_trackfilename)
) or re.compile(r"b\D\D\d\d\d\d\d\d\.\d\d\d\d\d\d\d\d\d\d.dat").match(
pathbasename(tc_trackfilename)
):
        # For the most part, b-deck fields line up fairly well with G-deck
        # fields, but the index of the storm name is wildly different. We could
        # use the b-deck parser to grab all this information instead, but this
        # is hopefully good enough.
start_name = start_line[27]
else:
# Keep the old logic for parsing the start_name out of the G-deck files
try:
start_name = start_line[48] + start_line[49]
except IndexError:
start_name = start_line[41]
if data is None:
# print ' Adding '+tc_trackfilename+' to '+TC_DECKS_DB
cc.execute(
"""insert into tc_trackfiles(
filename,
last_updated,
vmax,
storm_num,
storm_basin,
start_datetime,
start_lat,
start_lon,
start_vmax,
start_name,
        end_datetime) values(?,?,?,?,?,?,?,?,?,?,?)""",
# Eventually add in ?
# end_datetime) values(?, ?, ?,?, ?,?,?,?,?,?,?,?)''',
# storm_start_datetime,
(
tc_trackfilename,
file_timestamp,
str(vmax),
storm_num,
storm_basin,
# storm_start_datetime,
start_datetime,
start_lat,
start_lon,
start_vmax,
start_name,
end_datetime,
),
)
LOG.info("")
LOG.interactive(" Adding " + tc_trackfilename + " to " + TC_DECKS_DB)
updated_files += [tc_trackfilename]
conn.commit()
# This ONLY runs if it is a brand new storm file and we requested
# processing. Not used right now -- includes errors.
# if process:
# reprocess_storm(tc_trackfilename)
return updated_files
# The function below was commented out as it included errors, and is not used by GeoIPS
# at this time. 9/27/23
# def reprocess_storm(tc_trackfilename):
# """Reprocess storm tc_trackfilename, using info in TC tracks database."""
# datelist = [startdt.strftime("%Y%m%d")]
# for nn in range((enddt - startdt).days + 2):
# datelist += [(startdt + timedelta(nn)).strftime("%Y%m%d")]
# hourlist = []
# for ii in range(24):
# hourlist += [(enddt - timedelta(hours=ii)).strftime("%H")]
# hourlist.sort()
# # Do newest first
# datelist.sort(reverse=True)
# for sat, sensor in [
# ("gcom-w1", "amsr2"),
# ("gpm", "gmi"),
# ("npp", "viirs"),
# ("jpss-1", "viirs"),
# ("aqua", "modis"),
# ("terra", "modis"),
# ("himawari-8", "ahi"),
# ("goes-16", "abi"),
# ("goes-17", "abi"),
# ]:
# for datestr in datelist:
# process_overpass(
# sat,
# sensor,
# productlist=None,
# sectorlist=[startstormsect.name],
# sectorfiles=None,
# extra_dirs=None,
# sector_file=sector_file,
# datelist=[datestr],
# hourlist=hourlist,
# queue=os.getenv("DEFAULT_QUEUE"),
# mp_max_cpus=3,
# allstatic=False,
# alldynamic=True,
# # list=True will just list files and not actually run
# # list=True,
# list=False,
# quiet=True,
# start_datetime=startdt,
# end_datetime=enddt,
# )


def get_all_storms_from_db(
start_datetime,
end_datetime,
tc_spec_template=None,
trackfile_parser=None,
include_track_files=False,
):
"""Get all entries from all storms within a specific range of time from the TC DB.
Parameters
----------
start_datetime : datetime.datetime
Start time of desired range
end_datetime : datetime.datetime
End time of desired range
Returns
-------
list of pyresample Area Definitions
List of pyresample Area Definitions, each storm location that falls
within the desired time range.
Examples
--------
>>> startdt = datetime.strptime('20200216', '%Y%m%d')
>>> enddt = datetime.strptime('20200217', '%Y%m%d')
>>> get_storm_from_db(startdt, enddt)
"""
from os.path import exists as path_exists
return_area_defs = []
connection_cursor, connection = open_tc_db()
LOG.interactive(
"Getting all storms from tcdb from '%s' to '%s'",
start_datetime,
end_datetime,
)
LOG.info("connection: %s", connection)
try:
connection_cursor.execute(
"""SELECT filename from tc_trackfiles WHERE
end_datetime >= ?
AND start_datetime <= ?""",
(start_datetime, end_datetime),
)
    except sqlite3.InterfaceError as resp:
        LOG.exception(
            "%s error getting fields from database, returning empty list", resp
        )
return return_area_defs
deck_filenames = connection_cursor.fetchall()
from geoips.sector_utils.tc_tracks import trackfile_to_area_defs
for deck_filename in deck_filenames:
if deck_filename is not None:
            # fetchall returns each row as a 1-tuple; unpack the filename.
(deck_filename,) = deck_filename
LOG.info("deck_filename %s", deck_filename)
if not path_exists(deck_filename):
LOG.warning("Deck file does not exist! %s", deck_filename)
continue
area_defs = trackfile_to_area_defs(
deck_filename,
trackfile_parser=trackfile_parser,
tc_spec_template=tc_spec_template,
)
for area_def in area_defs:
                if start_datetime < area_def.sector_start_datetime < end_datetime:
if include_track_files:
return_area_defs += [(area_def, deck_filename)]
else:
return_area_defs += [area_def]
LOG.interactive("%s storms found in time range in tcdb!", len(return_area_defs))
    # Returns an empty list if no storms matched.
return return_area_defs
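

# Minimal usage sketch (assumes deck files already exist under TC_DECKS_DIR):
#
#     from datetime import datetime, timedelta
#
#     updated = check_db()  # scan TC_DECKS_DIR, add/refresh database entries
#     end = datetime.utcnow()
#     area_defs = get_all_storms_from_db(end - timedelta(days=1), end)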