diff --git a/generic/sftp/BB_SftpPollerBase-v1.0.qclass b/generic/sftp/BB_SftpPollerBase-v1.0.qclass index 23a18990603179625fc3e3f7448831ff288b87b6..1273696327539668ecf5a430ec3fc119fed02560 100644 --- a/generic/sftp/BB_SftpPollerBase-v1.0.qclass +++ b/generic/sftp/BB_SftpPollerBase-v1.0.qclass @@ -104,6 +104,8 @@ class BB_SftpPollerBase { This method then calls singleFileEventImpl() for further processing; the event argument to this method has the following additional keys if \c sftp-polling-file-connection is set: + - \c transfer_time: a relative date/time value giving the time it took to transfer the file + - \c tmp_filename: the temporary filename used to store the file until the post action - \c tmp_path: the current temporary path on the filesystem of the file - \c local_path: the target path for the file where the file will be moved to in the post event action */ @@ -127,10 +129,11 @@ class BB_SftpPollerBase { sftpclient.retrieveFile(event.filepath, tmp_path); event += { "transfer_time": now_us() - start, + "tmp_filename": tmp_filename, "tmp_path": tmp_path, + "local_path": local_path, }; - UserApi::logInfo("retrieved %y in %y", event.name, event.transfer_time); - event.local_path = local_path; + UserApi::logInfo("retrieved %y in %y (temp path %y)", event.name, event.transfer_time, tmp_path); } else { # if we have no local filesystem connection, then by default we retrieve the remote file and place the # data in the "data" key in the event hash @@ -155,13 +158,22 @@ class BB_SftpPollerBase { - \c data: this key is only present if \c sftp-polling-file-connection not set; in this case the remote file data is retrieved in memory and stored in this key as binary data - \c filepath: the remote filepath relative to SFTP root directory - - \c local_path: this key is set to a string giving the local path where the file's data is stored if - config item \c sftp-polling-file-connection is set + . + The following keys are included if config item \c sftp-polling-file-connection is set: + - \c transfer_time: a relative date/time value giving the time it took to transfer the file + - \c tmp_filename: the temporary filename used to store the file until the post action + - \c tmp_path: the current temporary path on the filesystem of the file + - \c local_path: this key is set to a string giving the local path where the file's data is stored + + @return the event hash plus the following keys: + - \c remote_status: either \c "MOVED" or \c "DELETED" depending on the action taken, if + \c sftp-polling-delete-file if false, then \c sftp-polling-move-target must be set, in which case also the + \c remote_move_target key is set to the path the remote file is moved to If config item \c sftp-polling-delete-file is true, then the remote file is deleted here. - Otherwise, if \c sftp-polling-move-target is true, then the remove file is moved to the target location on the - remove server here. + Otherwise, if \c sftp-polling-move-target is true, then the remote file is moved to the target location on the + remote server here. */ hash postSingleFileEvent(hash event) { # move to target filename @@ -176,7 +188,7 @@ class BB_SftpPollerBase { event.remote_status = RS_DELETED; return event; } - *string move_target = UserApi::getConfigItemValue("sftp-polling-move-target", event.name); + *string move_target = UserApi::getConfigItemValue("sftp-polling-move-target", event); if (!exists move_target) { throw "CONFIG-ERROR", sprintf("\"sftp-polling-delete-file\" is false but \"sftp-polling-move-target\" " "is not set; no post-processing of the SFTP files is possible; fix the configuration and try again"); diff --git a/test/job/sftp/bb-test-sftp-job.qtest b/test/job/sftp/bb-test-sftp-job.qtest index 57958ddb89af2e8a3bda2896d9ceeb012566b5d9..b8d3d16bc0f335e6f0877b177937b44861f8ed01 100755 --- a/test/job/sftp/bb-test-sftp-job.qtest +++ b/test/job/sftp/bb-test-sftp-job.qtest @@ -41,6 +41,7 @@ class BB_TestSftpJob inherits QorusJobTest { } constructor() : QorusJobTest(JobName, "1.0", \ARGV, MyOpts) { + addTestCase("SFTP job move test", \sftpJobMoveTest()); addTestCase("SFTP job FS test", \sftpJobFsTest()); addTestCase("SFTP job transfer test", \sftpJobTransferTest()); set_return_value(main()); @@ -124,6 +125,63 @@ private usageIntern(int offset = OffsetColumn) { qrest.del("remote/user/" + sftp_conn_name); } + sftpJobMoveTest() { + # setup move directory + makeMoveDirectory(); + hash job_config = { + "sftp-polling-delete-file": False, + "sftp-polling-move-target": "move/$local:{name}", + }; + map qrest.put("jobs/" + JobName + "/config/" + $1.key, {"value": $1.value}), job_config.pairIterator(); + on_exit { + map qrest.del("jobs/" + JobName + "/config/" + $1), keys job_config; + } + + # create example file + string file_path = writeSftpFile(); + string filename = basename(file_path); + + # make sure files that don't match the mask are not polled + TmpFile ignore_file("test-", ".ignore", sftp_tmpdir.path); + + RunJobResult job(OMQ::StatComplete); + exec(job); + + int jiid = job.getJobResult().job_instanceid; + hash results = qrest.get("jobresults/" + jiid).info[0]; + assertEq("MOVED", results.remote_status); + assertEq(OMQ::StatBlocked, results.order_status); + int wfiid = results.workflow_instanceid; + # wait for workflow order to be COMPLETE + WaitForWfiid wait(wfiid, NOTHING, 5s); + wait.run(self); + hash order = qrest.get("orders/" + wfiid); + assertEq(filename, order.keys.filename); + assertEq(filename, order.staticdata.name); + assertEq(filename, order.staticdata.filepath); + assertEq(Type::Date, order.staticdata.transfer_time.type()); + assertEq(Type::Date, order.staticdata.atime.type()); + assertEq(Type::Date, order.staticdata.mtime.type()); + assertEq("REGULAR", order.staticdata.type); + assertEq(TestData, order.staticdata."data".toString()); + + assertTrue(is_file(ignore_file.path)); + + # verify that the file was moved + if (sftp_url) { + SFTPClient sftp = getClient(); + assertEq(filename, sftp.list("move").files[0]); + } else { + assertTrue(is_file(sftp_tmpdir.path + DirSep + "move" + DirSep + filename)); + } + + # run the job again to ensure that no file is polled + exec(job); + jiid = job.getJobResult().job_instanceid; + results = qrest.get("jobresults/" + jiid); + assertEq((), results.info); + } + sftpJobFsTest() { string file_conn_name = get_random_string(20); @@ -158,6 +216,7 @@ private usageIntern(int offset = OffsetColumn) { int jiid = job.getJobResult().job_instanceid; hash results = qrest.get("jobresults/" + jiid).info[0]; + assertEq("DELETED", results.remote_status); assertEq(OMQ::StatBlocked, results.order_status); int wfiid = results.workflow_instanceid; # wait for workflow order to be COMPLETE @@ -211,26 +270,47 @@ private usageIntern(int offset = OffsetColumn) { int wfiid2 = results.workflow_instanceid; assertEq(wfiid, wfiid2); assertTrue(results.duplicate); + assertEq("DELETED", results.remote_status); assertEq(OMQ::StatComplete, results.order_status); } + + # run the job again to ensure that no file is polled + exec(job); + jiid = job.getJobResult().job_instanceid; + results = qrest.get("jobresults/" + jiid); + assertEq((), results.info); + } + + private makeMoveDirectory() { + if (sftp_url) { + SFTPClient sftp = getClient(); + sftp.mkdir("move"); + } else { + mkdir(sftp_tmpdir.path + DirSep + "move"); + } + } + + private SFTPClient getClient() { + SFTPClient sftp(sftp_url); + if (is_file(ENV.HOME + "/.ssh/id_rsa.old")) { + sftp.setKeys(ENV.HOME + "/.ssh/id_rsa.old"); + } else if (is_file(ENV.HOME + "/.ssh/id_rsa")) { + sftp.setKeys(ENV.HOME + "/.ssh/id_rsa"); + } + # change to target directory + hash url_info = parse_url(sftp_url); + if (url_info.path) { + sftp.chdir(url_info.path); + } + return sftp; } private string writeSftpFile(*string file_path) { if (sftp_url) { - SFTPClient sftp(sftp_url); - if (is_file(ENV.HOME + "/.ssh/id_rsa.old")) { - sftp.setKeys(ENV.HOME + "/.ssh/id_rsa.old"); - } else if (is_file(ENV.HOME + "/.ssh/id_rsa")) { - sftp.setKeys(ENV.HOME + "/.ssh/id_rsa"); - } + SFTPClient sftp = getClient(); if (!file_path) { file_path = sprintf("test-%s.txt", get_random_string()); } - # write to target directory - hash url_info = parse_url(sftp_url); - if (url_info.path) { - sftp.chdir(url_info.path); - } int bytes = sftp.putFile(TestData, file_path); if (m_options.verbose > 2) { printf("wrote %d bytes of %y to %y\n", bytes, file_path, sftp_url); @@ -257,17 +337,12 @@ private usageIntern(int offset = OffsetColumn) { # make sure files that don't match the mask are not polled TmpFile ignore_file("test-", ".ignore", sftp_tmpdir.path); - # start test workflow - qrest.put("workflows/" + WorkflowName + "/setAutostart", {"autostart": 1}); - on_exit { - qrest.put("workflows/" + WorkflowName + "/setAutostart", {"autostart": 0}); - } - RunJobResult job(OMQ::StatComplete); exec(job); int jiid = job.getJobResult().job_instanceid; hash results = qrest.get("jobresults/" + jiid).info[0]; + assertEq("DELETED", results.remote_status); assertEq(OMQ::StatBlocked, results.order_status); int wfiid = results.workflow_instanceid; # wait for workflow order to be COMPLETE @@ -294,6 +369,13 @@ private usageIntern(int offset = OffsetColumn) { int wfiid2 = results.workflow_instanceid; assertEq(wfiid, wfiid2); assertTrue(results.duplicate); + assertEq("DELETED", results.remote_status); assertEq(OMQ::StatComplete, results.order_status); + + # run the job again to ensure that no file is polled + exec(job); + jiid = job.getJobResult().job_instanceid; + results = qrest.get("jobresults/" + jiid); + assertEq((), results.info); } } \ No newline at end of file