Commit 0683399a authored by John Tourtellott
Browse files

Add SimulationAssetLocationIndex class, with put_sali.py script

* For tracking models uploaded to Cori.
* For now, the put_sali.py script inserts asset locations into girder
parent af991e8c
# Python files
*.pyc
__pycache__
# Dev venv folders
/dev/sali/sali*
# Girder key files
*.key
"""
Script to insert entry in Simulation Asset Location Index.
This script REQUIRES a key file with "url" and "apikey" fields.
"""
import argparse
import json
import girder_client
from girder_client import GirderClient, HttpError
from writers import sali
if __name__ == '__main__':
    # Command-line entry point: register one already-uploaded model file
    # in the Simulation Asset Location Index (SALI).
    parser = argparse.ArgumentParser(description='Add remote asset location to SALI')
    parser.add_argument('keyfile', help='json file with girder url and apikey strings')
    parser.add_argument('newt_sessionid', help='NEWT session id')
    parser.add_argument('modelfile', help='model file (path)')
    parser.add_argument('cori_path', help='path on Cori file system to save')
    parser.add_argument('-r', action='store_true', help='can replace existing cori_path')
    args = parser.parse_args()

    # Read keyfile
    with open(args.keyfile) as key_fp:
        js = json.load(key_fp)

    # Use .get() so that a missing field reaches the explicit checks below
    # (plain indexing would raise KeyError before they could run).
    girder_url = js.get('url')
    apikey = js.get('apikey')
    if girder_url is None:
        raise RuntimeError('key file missing girder_url')
    if apikey is None:
        raise RuntimeError('key file missing apikey')

    # Initialize girder client.
    # Local names deliberately differ from the imported "girder_client" and
    # "sali" modules so those modules are not shadowed.
    url = '%s/api/v1' % girder_url
    client = GirderClient(apiUrl=url)
    client.authenticate(apiKey=apikey)

    index = sali.SimulationAssetLocationIndex(client, args.newt_sessionid)
    # put() prints its own diagnostics and returns a success flag
    index.put(args.modelfile, args.cori_path, remote_machine='cori', can_replace=args.r)
    print('finis')
......@@ -33,6 +33,10 @@ from . import newtclient
reload(newtclient) # for dev
from .cumulusclient import CumulusClient
from .newtclient import NewtClient
from .sali import SimulationAssetLocationIndex
newt_url = 'https://newt.nersc.gov/newt'
# ---------------------------------------------------------------------
def submit_ace3p(scope, sim_item):
......@@ -75,6 +79,10 @@ def submit_ace3p(scope, sim_item):
# Initialize CumulusClient
scope.cumulus = create_cumulus_client(scope, sim_item)
# Check if simulation assets (model) are already available
# on the remote machine.
_check_asset_locations(scope)
# Create cluster
machine = get_string(sim_item, 'Machine')
print('machine', machine)
......@@ -126,15 +134,13 @@ def login_nersc(scope, sim_item):
Note that this method must *also* initialize NewtClient instance
'''
nersc_url = 'https://newt.nersc.gov/newt'
# Check if session id provided
credentials_item = sim_item.find('NERSCCredentials')
credentials_type = credentials_item.value()
if credentials_type == 'newt_sessionid':
session_item = credentials_item.find('NEWTSessionId')
session_id = session_item.value()
scope.nersc = NewtClient(nersc_url, session_id)
scope.nersc = NewtClient(newt_url, session_id)
return session_id
elif credentials_type != 'login':
......@@ -153,7 +159,7 @@ def login_nersc(scope, sim_item):
mfa = get_string(sim_item, 'NERSCMultfactorToken')
scope.nersc = NewtClient(nersc_url)
scope.nersc = NewtClient(newt_url)
r = scope.nersc.login(username, password+mfa)
return scope.nersc.get_sessionid()
......@@ -405,3 +411,31 @@ def get_string(group_item, name):
return None
return item.value(0)
# ---------------------------------------------------------------------
def _check_asset_locations(scope):
    """Replace one model file in the upload list with a remote symlink.

    Walks scope.files_to_upload looking for model files ('.gen' or '.ncdf').
    The first one that the Simulation Asset Location Index reports a remote
    path for is removed from scope.files_to_upload, and that remote path is
    stored in scope.symlink. Does nothing when scope.symlink is already set.

    Limitations: only files are considered (folders are a todo), and only
    ONE remote file (symlink) is supported.
    """
    if scope.symlink is not None:
        return  # a symlink is already in use

    model_extensions = ('.gen', '.ncdf')

    # Initialize Simulation Asset Location Index
    index = SimulationAssetLocationIndex(scope.cumulus._client, scope.newt_sessionid)

    for candidate in scope.files_to_upload:
        suffix = os.path.splitext(candidate)[1]
        if suffix in model_extensions:
            # Ask the location index whether this model is already remote
            found = index.query(candidate)
            if found is None:
                continue
            # Mutating the list here is safe only because the loop exits
            # immediately afterwards (one symlink at most, for now).
            scope.files_to_upload.remove(candidate)
            scope.symlink = found
            break
"""
Class using girder to track simulation resources uploaded to remote machines.
SALI = Simulation Asset Location Index
This first implementation creates a folder "SALI" under the user's Private folder,
and creates items to represent assets on the remote machine. The item name is set
to the local filename, and item meta data is used to store md5 sums and locations
for the local and remote files. An example metadata is:
{
"cori": {
"md5": "acada04583414ca8e5e646899e70e06d",
"path": "/scratch2/scratchdirs/johnt/cw18/pillbox4.ncdf"
},
"sources": [
{
"hostname": "turtleland4",
"md5": "c3fe41261f6276ed81b6652103d02e9b",
"path": "/home/john/projects/slac/git/smtk/data/model/3d/genesis/pillbox4.gen"
}
]
}
"""
import argparse
import hashlib
import json
import os
import socket
import requests
import girder_client
from girder_client import GirderClient, HttpError
NEWT_URL = 'https://newt.nersc.gov/newt'
class SimulationAssetLocationIndex(object):
    """Utility for tracking simulation resources uploaded to remote machines at NERSC.

    Stores info in girder instance: one item per asset, kept in a folder
    named "SALI" under the user's "Private" folder. The item name is the
    local file's basename; the item metadata holds the local source
    (hostname, path, md5) and, keyed by remote machine name, the remote
    path and md5.
    """

    def __init__(self, girder_client, newt_sessionid):
        """Find (or create) the SALI folder and prepare a NEWT http session.

        Args:
            girder_client: girder client instance used for all girder
                requests (presumably already authenticated - the
                constructor immediately requests 'user/me').
            newt_sessionid: NEWT session id, sent as the "newt_sessionid"
                cookie on every NEWT request.

        Raises:
            RuntimeError: if the user has no folder named "Private".
        """
        self._girder_client = girder_client
        self._sali_folder_id = None

        self._newt_requests = requests.Session()
        self._newt_requests.cookies.update(dict(newt_sessionid=newt_sessionid))

        # Get user's private folder
        user = self._girder_client.get('user/me')
        user_id = user['_id']
        gen = self._girder_client.listFolder(user_id, 'user', name='Private')
        private_folder = self._next_item(gen)
        if private_folder is None:
            raise RuntimeError('Failed to find Private folder for this user')
        private_folder_id = private_folder['_id']

        # Get SALI folder (create if needed)
        gen = self._girder_client.listFolder(private_folder_id, name='SALI')
        sali_folder = self._next_item(gen)
        if sali_folder is None:
            # Create folder now
            sali_folder = self._girder_client.createFolder(
                private_folder_id, 'SALI',
                description='Simulation Asset Location Index - first prototype')
            print('Created SALI folder, id {}'.format(sali_folder['_id']))
        self._sali_folder_id = sali_folder['_id']

    def query(self, local_location, remote_machine='cori', verify=True, return_all_metadata=False):
        """Checks for item in the SALI folder with the given model name.

        Args:
            local_location: local path; only its basename is used for lookup.
            remote_machine: metadata key holding the remote entry.
            verify: verification is NOT implemented yet; when True a warning
                is printed to say the asset was not verified.
            return_all_metadata: when True, return the item's whole metadata
                instead of just the remote path.

        Returns:
            None when no item matches. Otherwise the remote path recorded
            for remote_machine (None if there is none), or the item's
            metadata when return_all_metadata is set.
        """
        filename = os.path.basename(local_location)
        gen = self._girder_client.listItem(self._sali_folder_id, name=filename)
        item = self._next_item(gen)
        if item is None:
            return None

        raw_meta = item.get('meta')
        # Tolerate an item without metadata rather than failing on None.get()
        meta = raw_meta or {}
        remote_location = meta.get(remote_machine, {}).get('path')

        if verify:
            # TODO Verify local location md5sum
            # TODO Verify remote location exists
            # TODO Verify remote location md5sum
            print('Warning: simulation asset was NOT verified:', remote_location)

        if return_all_metadata:
            return raw_meta
        return remote_location

    def put(self, local_location, remote_location, remote_machine='cori', can_replace=False):
        """Adds item to SALI folder.

        Hashes the local file, checks through NEWT that the remote file
        exists and obtains its md5, then creates (or reuses) the girder item
        named after the local file and stores both locations as metadata.

        Args:
            local_location: path to the local model file.
            remote_location: path of the uploaded file on the remote machine.
            remote_machine: NEWT machine name; also the metadata key.
            can_replace: allow overwriting an item that already exists.

        Returns:
            Boolean indicating success. Failures are reported with print().
        """
        # Make sure that local file exists. isfile() rather than exists(),
        # because a directory cannot be opened and hashed below.
        if not os.path.isfile(local_location):
            print('Local file not found at {}'.format(local_location))
            return False
        filename = os.path.basename(local_location)

        # Get local md5 and hostname. Hash in chunks so that a large model
        # file is never held in memory all at once.
        digest = hashlib.md5()
        try:
            with open(local_location, 'rb') as fp:
                for chunk in iter(lambda: fp.read(1024 * 1024), b''):
                    digest.update(chunk)
        except (IOError, OSError):
            print('Error getting model file md5')
            return False
        local_md5 = digest.hexdigest()
        print('local md5:', local_md5)

        hostname = socket.gethostname()
        print('local hostname:', hostname)

        # NOTE(review): remote_location is interpolated into the command
        # strings unquoted, so a path containing spaces will not be seen as
        # a single argument - confirm how NEWT parses "executable".

        # Check remote file
        ls_result = self._run_remote_command(
            remote_machine, '/usr/bin/ls {}'.format(remote_location))
        print('ls command returned', ls_result)
        if ls_result.get('error'):
            print(ls_result['error'])
            return False

        # Get md5 for remote file
        md5_result = self._run_remote_command(
            remote_machine, '/usr/bin/md5sum {}'.format(remote_location))
        print('md5sum command returned', md5_result)
        if md5_result.get('error'):
            print(md5_result['error'])
            return False

        # Valid output is of the form "<md5sum> <path>"
        output = md5_result.get('output') or ''
        tokens = output.split()
        if not tokens:
            # Do not record an empty checksum in the index
            print('Unexpected md5sum output: {}'.format(output))
            return False
        remote_md5 = tokens[0]

        # Check if item for this filename is already in girder
        gen = self._girder_client.listItem(self._sali_folder_id, name=filename)
        item = self._next_item(gen)
        if item is not None and not can_replace:
            print('Item already exits for {}. Use can_replace flag to overwrite'.format(filename))
            return False

        source = dict(hostname=hostname, path=local_location, md5=local_md5)
        remote = dict(path=remote_location, md5=remote_md5)
        metadata = dict(sources=[source])
        metadata[remote_machine] = remote
        item = self._girder_client.createItem(
            self._sali_folder_id, filename, reuseExisting=True, metadata=metadata)
        print('createItem returned', item)
        return True

    def _run_remote_command(self, remote_machine, command):
        """Posts command to the NEWT command url for remote_machine; returns the parsed json reply."""
        url = '{}/command/{}'.format(NEWT_URL, remote_machine)
        data = {
            'executable': command,
            'loginenv': 'true',
        }
        r = self._newt_requests.post(url, data=data)
        return r.json()

    def _check_remote_file(self, remote_path, md5=None):
        """Verifies that remote_path exists, and checks md5 if specified.

        Placeholder: no check is performed yet, the method always succeeds.

        Returns boolean indicating success.
        """
        return True

    def _next_item(self, gen):
        """Returns next item from generator, or None if empty"""
        return next(gen, None)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment