Automate file processing with Buckets and Objects Part II

Automate receiving files from various sources, processing them, and sending them to external destinations.

(Part II of a series on Putting Buckets and Objects to work. Part I, "Putting Oracle Buckets and Objects To Work", created the foundation for viewing and managing Buckets in an APEX application, unifying different data sources and the user interface in a scalable, versatile manner.)

Context

Depending on the quantity and complexity of data processed from and to external sources, achieving productivity, consistency, accuracy, and security can be challenging. Secure FTP (sFTP) is one channel that acts as a gateway for sending, receiving, and staging/storing files through Buckets and Objects. REST Data Sources and websites are other examples of data channels.

The source code presented below is intended for illustrative purposes and is not production ready. It does not include error handling, logging, or security authorization, and in some cases only snippets are shown, both for brevity and because standards differ among organizations.

Objective

Automate the receiving, sending, and processing of files utilizing OCI Buckets, Objects and APEX in a secure, consistent manner that increases productivity using sFTP as an example source and destination.

Benefits

  • Productivity: Increase productivity by automating data transmission and processing thereby reducing required effort.
  • Consistency and Accuracy: Increase consistency and accuracy with automated processes and workflow following the same rules each time.
  • Data Driven: Minimize "hard coding" with data driven processing based on Object Registry attributes. Changes to Object attributes such as database instance, process to run, whether it should be processed (an on/off switch), credentials, full URIs, etc. can be queried by processes in the database and are updateable through an APEX application instead of needing changes to program source code.
  • Flexibility: Adding new files or adjusting to modifications of Originator or Destination requirements are more likely handled in the user-interface, for example adding an entry for another Marketing file to import from the Market Analytics firm.
  • Insight: Access to the data through an APEX user-interface makes it easier to determine what processes are in place, log history, and overall relationships.

Solution

Let's begin with the architecture described in Part I of the series, "Putting Oracle Buckets and Objects To Work".

060_Bucket_architecture_Diagram.jpg

In brief, the table XE_Buckets unifies Buckets from two different regions using REST data sources and synchronization with tables that mirror REST data source structures and values. User-defined columns exist in XE_Buckets, not the synchronization tables. This Blog assumes that architecture is in place.

The architecture below picks up where Part I left off, integrating sFTP, OCI Buckets, Objects, processes, and an APEX application.

600_FTP Architecture.jpg

The "Data Originator" transmits files via sFTP to the sFTP Server. Oracle Command Line Interface (CLI) scripts running regularly on the sFTP Server forward the files to pre-designated OCI Buckets depending on file type, and remove them from the inbound sFTP folders to avoid duplication. Data sources and destinations could equally be REST services, websites, etc. Here sFTP is used as the example.

Inbound development, test, and production buckets named ftp_inbound_dev, ftp_inbound_test, and ftp_inbound, respectively, exist to receive the files. Each instance can have its own copy of a file so it can be used without impacting the others. Below is a sample excerpt from the XE_Buckets view.

620_ftp_Buckets.jpg

The Object Registry describes inbound and outbound files, user-defined attributes, and processing information. To be clear, there is one descriptive entry for each file type, not an entry for each actual file. The Object Log tracks the actual files themselves. The Registry makes processing as data driven as possible, enables process updating through an APEX interface, and provides a level of documentation.

640_Registry_Inbound.jpg

The Object Registry table structure is:

Create Table XE_OCI_OBJECT_REGISTRY
    (
    XE_OCI_OBJECT_REGISTRY_ID     NUMBER(20)          GENERATED BY DEFAULT ON NULL AS IDENTITY NOCACHE,
    XE_BUCKETS_ID                 NUMBER(20)          Constraint XE_OCI_OBJECT_REGISTRY_BU_NN NOT NULL,
    OP_OBJECT_NAME                VARCHAR2(255 CHAR)  Constraint XE_OCI_OBJECT_REGISTRY_NA_NN NOT NULL,
    OP_XE_BUCKETS_ID_MOVE_TO      NUMBER(20)          ,
    OP_CURRENT_DATABASE_CID       NUMBER(20)          Constraint XE_OCI_OBJECT_REGISTRY_CD_NN NOT NULL,
    OP_FILE_EXTENSION             VARCHAR2(50 CHAR)   ,
    OP_FILE_DATA_TYPE_CID         NUMBER(20)          , -- I.E. CENSUS, PAYROLL DATA, CREDS, ETC.
    OP_FILE_DIRECTION_CID         NUMBER(20)          ,
    OP_DATA_ROW_ACTION_CID        NUMBER(20)          , -- i.e. Delete, Import, Export, Noaction
    OP_SY_ENTITY_ID_SOURCE        NUMBER(20)          ,
    OP_SY_ENTITY_ID_DEST          NUMBER(20)          ,
    OP_TABLE_IMPORTED_TO          VARCHAR2(200 CHAR)  ,
    OP_TABLE_EXPORTED_FROM        VARCHAR2(200 CHAR)  ,
    OP_OBJECT_PROCESS_CID         NUMBER(20)          ,
    OP_PROCESS_YN                 VARCHAR2(1 CHAR)    ,
    OP_OVERWRITE_YN               VARCHAR2(1 CHAR)    ,
    OP_EFFECTIVE                  DATE                Default on Null Trunc(Current_Date) CONSTRAINT XE_OCI_OBJECT_REGISTRY_EE_NN NOT NULL,
    OP_EXPIRES                    DATE                Default on Null To_Date('12/31/2999','MM/DD/YYYY') CONSTRAINT XE_OCI_OBJECT_REGISTRY_EX_NN NOT NULL,
    SY_USER_ID_ADD                NUMBER(20)          CONSTRAINT XE_OCI_OBJECT_REGISTRY_UA_NN NOT NULL,
    SY_DATE_ADD                   DATE                CONSTRAINT XE_OCI_OBJECT_REGISTRY_DA_NN NOT NULL,
    SY_USER_ID_MOD                NUMBER(20)          ,
    SY_DATE_MOD                   DATE                ,
    Constraint XE_OCI_OBJECT_REGISTRY_BU_FK Foreign Key ( XE_BUCKETS_ID )           References XE_BUCKETS(XE_BUCKETS_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_CD_FK Foreign Key ( OP_CURRENT_DATABASE_CID ) References Codes_Value(Codes_Value_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_DT_FK Foreign Key ( OP_FILE_DATA_TYPE_CID )   References CODES_VALUE(CODES_VALUE_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_FD_FK Foreign Key ( OP_FILE_DIRECTION_CID )   References CODES_VALUE(CODES_VALUE_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_RA_FK Foreign Key ( OP_DATA_ROW_ACTION_CID )  References CODES_VALUE(CODES_VALUE_ID),
    Constraint XE_OCI_OBJECT_Registry_OP_FK Foreign Key ( OP_OBJECT_PROCESS_CID )   References Codes_Value(CODES_VALUE_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_VS_FK Foreign Key ( OP_SY_ENTITY_ID_SOURCE )  References SY_ENTITY(SY_ENTITY_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_VD_FK Foreign Key ( OP_SY_ENTITY_ID_DEST )    References SY_ENTITY(SY_ENTITY_ID),
    Constraint XE_OCI_OBJECT_REGISTRY_OP_PK Primary Key (XE_OCI_OBJECT_REGISTRY_ID)
    )
    /

Notes on the Registry columns are:

  • Processing needs vary across organizations. The objective here is to list possible user-defined columns that may be generally appropriate, and to inspire thought about what other attributes might be helpful in making the processing data driven and more easily maintained in your organization. Alternatively, these or other attributes could be tracked in a JSON-formatted column.
  • Columns ending in CID: "CID" columns contain primary keys of code values found in a Code Value view, as described in "Simplify managing codes and their values". In short, instead of having a separate table for each code and its values, Code Type, Code Description, and Code Value tables, together with a view and an APEX application user interface, manage them all in one place. Or, use your own methods for tracking codes and their values.
  • XE_Buckets_ID: Unique Bucket ID the object initially appears in.
  • OP_Object_Name: Case sensitive Object name. Use APEX_Util.URL_Encode() when referring to the item directly in the bucket to address the vagaries of file names.
  • OP_XE_Buckets_ID_Move_To: ID of Bucket where the Object should be moved to once processing is complete, removing it from Bucket XE_Buckets_ID to avoid duplicate processing. If Object names repeat, consider enabling Bucket Versioning to keep history.
  • OP_Current_Database_CID: Database instance this entry pertains to, e.g. Dev, Test, Prod. While creating an entry per instance for a single file leads to some duplication, it allows customization by instance. For example, files can have entries in Dev and Test before they are ready for Production, or exist only in Dev and Test because they contain dummy test data that would not be appropriate for Production. Alternatively, use a code value of "ANY" to indicate a match to all instances so that only one descriptive entry per file is needed. Instance-specific entries also make refreshing Dev and Test from a Prod data dump easier: attributes such as the starting and move-to Bucket IDs do not need to be changed to the Dev and Test Buckets after the refresh if each Instance already has its own entry. A file still in development and test simply has no Prod entry.
  • OP_File_Extension: File extension of the Object.
  • OP_File_Data_Type_CID: Describes the contents of the Object, e.g. Payroll, Inventory, Sales, etc.
  • OP_File_Direction_CID: Inbound, Outbound, Stationary, etc.
  • OP_Data_Row_Action_CID: Action to perform on the Object, e.g. Import, Export, Process, etc.
  • OP_SY_Entity_ID_Source: The Entity originating the data.
  • OP_SY_Entity_ID_Dest: The Entity receiving the data.
  • OP_Table_Imported_To: Internal table the data will be applied/added to.
  • OP_Table_Exported_From: Internal table or view originating the data.
  • OP_Object_Process_CID: Name or abbreviation of the Object Process, used by processing code to determine which set of actions to take on the data.
  • OP_Process_YN: Switch to turn automated processing on/off.
  • OP_OverWrite_YN: Whether to overwrite existing objects or content if they already exist (Yes/No).
  • OP_Effective and OP_Expires: Date range in which this entry is valid.
  • SY Add and Mod: Track the user and dates when this entry was originally created and last updated.
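
To illustrate how these attributes make processing data driven, here is a minimal sketch of two lookups a process might run. It uses only the table and columns defined above. The file name in the second statement is made up for illustration, and how your processes actually consume the Registry will differ.

```sql
-- Sketch: Registry entries that are switched on and currently effective.
Select XE_OCI_Object_Registry_ID,
       OP_Object_Name,
       XE_Buckets_ID,
       OP_XE_Buckets_ID_Move_To,
       OP_Table_Imported_To
  From XE_OCI_Object_Registry
 Where OP_Process_YN = 'Y'
   And Current_Date Between OP_Effective And OP_Expires
 Order By OP_Object_Name;

-- Sketch: encode an Object name before appending it to a Bucket URI.
-- 'Market Data 2024.csv' is a hypothetical file name.
Select APEX_Util.URL_Encode('Market Data 2024.csv') As Encoded_Object_Name
  From Dual;
```

Turning a file off then becomes a matter of setting OP_Process_YN to 'N' in the APEX form rather than changing source code.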

Manage the Object Registry entries with a Modal Dialog Form linked with the Object Registry ID.

660_Reg_Edit.jpg

The code column naming convention is OP_ [Code Abbreviation] _CID. The resulting Source SQL Query for the Database Instance (Current_Database) item as an example is:

  Select CV_Description, Codes_Value_ID
  From VW_CODES_CUR_FULL
  Where VC_CO_NAME_ABREV_CAPS = 'CURRENT_DATABASE' And
        Current_Date Between CV_Effective and CV_Expires And
        Current_Date Between CO_Effective and CO_Expires
  Order By CV_Description

With a Default Value of:

  Select Codes_Value_ID
  From VW_CODES_CUR_FULL
  Where VC_CO_NAME_ABREV_CAPS = 'CURRENT_DATABASE' And
        Current_Date Between CV_Effective and CV_Expires And
        Current_Date Between CO_Effective and CO_Expires
  Order By CV_Description
  Fetch first 1 rows only

The Table Imported To Source is a List of Values with a SQL Query of:

Select Table_Name Disp_Value, Table_Name
  From User_Tables
  Order by Table_Name

The Table Exported From Source is a List of Values with a SQL Query of:

Select Table_Name Disp_Value, Table_Name
  From User_Tables
Union
  Select View_Name Disp_Value, View_Name Table_Name
    From User_Views
Order By Table_Name;

XE_Buckets, XE_OCI_Object_Registry, and Codes_Value are tied together in the following View.

Create Or Replace View VW_XE_OCI_Object_Registry
  As
  Select
     XE_OCI_OBJECT_REGISTRY.XE_OCI_OBJECT_REGISTRY_ID
    ,XE_OCI_OBJECT_REGISTRY.XE_Buckets_ID
    ,VW_XE_Buckets.BU_Bucket_Name
    ,VW_XE_Buckets.BU_Credential
    ,VW_XE_Buckets.BU_Bucket_Type
    ,VW_XE_Buckets.BU_Bucket_Type_CID
    ,VW_XE_Buckets.BU_DB_Instance
    ,VW_XE_Buckets.BU_DB_Instance_CID
    ,VW_XE_Buckets.BU_FULL_URI
    ,(VW_XE_Buckets.BU_FULL_URI || XE_OCI_Object_Registry.OP_Object_Name) OP_FULL_Object_URI
    ,XE_OCI_OBJECT_REGISTRY.OP_OBJECT_NAME
    ,XE_OCI_Object_Registry.OP_Current_Database_CID
    ,CD_Current_Database.CV_Value_Character DC_Current_Database
    ,XE_OCI_Object_Registry.OP_XE_BUCKETS_ID_MOVE_TO
    ,XE_Buckets_Move.BU_Bucket_Name  BU_Bucket_Name_Move_To
    ,XE_Buckets_Move.BU_Credential  BU_Credential_Move_To
    ,XE_Buckets_Move.BU_FULL_URI    BU_Full_URI_Move_To
    ,(XE_Buckets_Move.BU_Full_URI || XE_OCI_Object_Registry.OP_Object_Name) OP_Full_Object_URI_Move_To
    ,XE_OCI_OBJECT_REGISTRY.OP_FILE_EXTENSION
    ,XE_OCI_OBJECT_REGISTRY.OP_FILE_DATA_TYPE_CID
    ,CD_File_Data_Type.CV_Value_Character DC_File_Data_Type
    ,XE_OCI_OBJECT_REGISTRY.OP_FILE_DIRECTION_CID
    ,CD_File_Direction.CV_Value_Character DC_File_Direction
    ,XE_OCI_OBJECT_REGISTRY.OP_DATA_ROW_ACTION_CID
    ,CD_Data_Row_Action.CV_Value_Character DC_Data_Row_Action
    ,XE_OCI_OBJECT_REGISTRY.OP_SY_ENTITY_ID_SOURCE
    ,Entity_Source.EN_Abbrev EN_Abbrev_Source
    ,XE_OCI_OBJECT_REGISTRY.OP_SY_ENTITY_ID_DEST
    ,Entity_Dest.EN_Abbrev EN_Abbrev_Dest
    ,XE_OCI_OBJECT_REGISTRY.OP_TABLE_IMPORTED_TO
    ,XE_OCI_OBJECT_REGISTRY.OP_TABLE_EXPORTED_FROM
    ,XE_OCI_OBJECT_REGISTRY.OP_Object_PROCESS_CID
    ,CD_Object_Process.CV_Value_Character DC_Object_Process
    ,XE_OCI_OBJECT_REGISTRY.OP_PROCESS_YN
    ,XE_OCI_OBJECT_REGISTRY.OP_OVERWRITE_YN
    ,XE_OCI_OBJECT_REGISTRY.OP_EFFECTIVE
    ,XE_OCI_OBJECT_REGISTRY.OP_EXPIRES
    ,XE_OCI_OBJECT_REGISTRY.SY_USER_ID_ADD
    ,XE_OCI_OBJECT_REGISTRY.SY_DATE_ADD
    ,XE_OCI_OBJECT_REGISTRY.SY_USER_ID_MOD
    ,XE_OCI_OBJECT_REGISTRY.SY_DATE_MOD

  From  XE_OCI_Object_Registry
       ,VW_XE_Buckets
       ,VW_XE_Buckets  XE_Buckets_Move
       ,Codes_Value       CD_Current_Database
       ,Codes_Value       CD_File_Data_Type
       ,Codes_Value       CD_File_Direction
       ,Codes_Value       CD_Data_Row_Action
       ,Codes_Value       CD_Object_Process
       ,SY_Entity         Entity_Source   -- alias must match the Select list and Where clause
       ,SY_Entity         Entity_Dest

  Where     XE_OCI_Object_Registry.XE_Buckets_ID            = VW_XE_Buckets.XE_Buckets_ID
        And Current_Date Between Trunc(VW_XE_Buckets.BU_Effective) and Trunc(VW_XE_Buckets.BU_Expires)
        And XE_OCI_Object_Registry.OP_XE_BUCKETS_ID_MOVE_TO = XE_Buckets_Move.XE_Buckets_ID (+)
        And XE_OCI_Object_Registry.OP_Current_Database_CID  = CD_Current_Database.Codes_Value_ID
        And XE_OCI_Object_Registry.OP_File_Data_Type_CID    = CD_File_Data_Type.Codes_Value_ID
        And XE_OCI_Object_Registry.OP_File_Direction_CID    = CD_File_Direction.Codes_Value_ID
        And XE_OCI_Object_Registry.OP_Data_Row_Action_CID   = CD_Data_Row_Action.Codes_Value_ID
        And XE_OCI_Object_Registry.OP_Object_Process_CID    = CD_Object_Process.Codes_Value_ID (+)
        And XE_OCI_Object_Registry.OP_SY_Entity_ID_Source  = Entity_Source.SY_Entity_ID
        And XE_OCI_Object_Registry.OP_SY_Entity_ID_Dest    = Entity_Dest.SY_Entity_ID
        And Current_Date Between OP_Effective and OP_Expires
/

Note that, in addition to translating linked attributes between tables, the view includes full Bucket and Object URIs and the appropriate credential names.
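
As a rough sketch of how a process might consume the view, the query below pulls everything needed to reach an Object and its move-to location in a single row. It uses only columns defined in the view above. The 'IMPORT' literal is the direction value tested by the processing procedure in this post, so substitute your own code values.

```sql
-- Sketch: one row per active inbound file type, with its credential and full URIs.
Select OP_Object_Name,
       BU_Credential,
       OP_Full_Object_URI,
       BU_Credential_Move_To,
       OP_Full_Object_URI_Move_To,
       OP_Table_Imported_To
  From VW_XE_OCI_Object_Registry
 Where OP_Process_YN     = 'Y'
   And DC_File_Direction = 'IMPORT'
 Order By OP_Object_Name;
```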

Next, create an Object Processing Log to track results. The Object Processing Log captures the actions taken, when they occurred, where the Object ended up, and other tracking information.

680_Process_Log.jpg

Table Creation:

Create Table XE_OCI_OBJECTS_LOG
    (
    XE_OCI_OBJECTS_LOG_ID     NUMBER(20)         GENERATED BY DEFAULT ON NULL AS IDENTITY NOCACHE,
    XE_OCI_OBJECT_REGISTRY_ID NUMBER(20)         ,
    LO_OBJECT_NAME            Varchar2(256 CHAR) CONSTRAINT XE_OCI_Objects_Log_ON_NN Not Null,
    LO_CHECKSUM               Varchar2(256 CHAR) CONSTRAINT XE_OCI_Objects_Log_CK_NN Not Null,
    LO_DATA_ROW_ACTION_CID    NUMBER(20)         ,  -- IGNORE, PROCESS ROW
    LO_DATA_ROW_STATUS_CID    NUMBER(20)         ,  -- NEEDS PROCESSING, ALREADY PROCESSED, READ ONLY, DUPLICATE IGNORE ALREADY PROCESSED, CAN DELETE
    LO_XE_BUCKETS_ID_CURRENT  Number(20)         ,
    LO_OBJECT_CREATED         DATE               ,
    LO_BATCH_ID               NUMBER(20)         ,
    LO_FILE_EXTENSION         VARCHAR2(50 CHAR)  ,
    LO_FILE_SIZE              Number(20)         ,
    LO_DATE_OBJECT_PROCESSED  DATE               ,
    LO_EFFECTIVE              DATE               Default on Null Trunc(Current_Date) CONSTRAINT XE_OCI_OBJECTS_LOG_EE_NN NOT NULL,
    LO_EXPIRES                DATE               Default on Null To_Date('12/31/2999','MM/DD/YYYY') CONSTRAINT XE_OCI_OBJECTS_LOG_EX_NN NOT NULL,
    SY_USER_ID_ADD            NUMBER(20)         CONSTRAINT XE_OCI_OBJECTS_LOG_UA_NN NOT NULL,
    SY_DATE_ADD               DATE               CONSTRAINT XE_OCI_OBJECTS_LOG_DA_NN NOT NULL,
    SY_USER_ID_MOD            NUMBER(20)         ,
    SY_DATE_MOD               DATE               ,
    VC_LO_OBJECT_NAME_CAPS    Generated always as (Upper(LO_OBJECT_NAME)),
    VC_LO_FILE_EXTNESION_CAPS Generated always as (Upper(LO_FILE_EXTENSION)),
    Constraint XE_OCI_OBJECTS_LOG_RE_FK Foreign Key ( XE_OCI_OBJECT_REGISTRY_ID ) References XE_OCI_OBJECT_REGISTRY(XE_OCI_OBJECT_REGISTRY_ID),
    Constraint XE_OCI_OBJECTS_LOG_RA_FK Foreign Key ( LO_DATA_ROW_ACTION_CID )    References CODES_VALUE(CODES_VALUE_ID),
    Constraint XE_OCI_OBJECTS_LOG_RS_FK Foreign Key ( LO_DATA_ROW_STATUS_CID )    References CODES_VALUE(CODES_VALUE_ID),
    Constraint XE_OCI_OBJECTS_LOG_BF_FK Foreign Key ( LO_XE_BUCKETS_ID_CURRENT )  References XE_Buckets(XE_BUCKETS_ID),
    Constraint XE_OCI_OBJECTS_LOG_BA_FK Foreign Key ( LO_BATCH_ID )               References SYBATCH(BATCH_ID),
    Constraint XE_OCI_OBJECTS_LOG_LO_PK Primary Key (XE_OCI_OBJECTS_LOG_ID)
    )
    /

Object Log View:

Create Or Replace View VW_XE_OCI_OBJECTS_LOG
  As
  Select
     XE_OCI_Objects_Log.XE_OCI_OBJECTS_LOG_ID    
    ,XE_OCI_Objects_Log.XE_OCI_OBJECT_REGISTRY_ID
    ,XE_OCI_Objects_Log.LO_OBJECT_NAME           
    ,XE_OCI_Objects_Log.LO_CHECKSUM              
    ,XE_OCI_Objects_Log.LO_DATA_ROW_ACTION_CID
    ,DC_DATA_ROW_ACTION.VC_CV_VALUE             LO_DATA_ROW_ACTION   
    ,XE_OCI_Objects_Log.LO_DATA_ROW_STATUS_CID   
    ,DC_DATA_ROW_STATUS.VC_CV_VALUE             LO_DATA_ROW_STATUS
    ,XE_OCI_Objects_Log.LO_XE_BUCKETS_ID_CURRENT 
    ,XE_BUCKETS.BU_Bucket_Name                  BU_Bucket_Name_Current
    ,XE_Buckets.BU_Bucket_Type                  BU_Bucket_Type_Current
    ,VW_XE_OCI_Object_Registry.BU_Bucket_Name   BU_Bucket_Name_Original
    ,VW_XE_OCI_Object_Registry.DC_FILE_DATA_TYPE
    ,VW_XE_OCI_Object_Registry.DC_Object_Process
    ,VW_XE_OCI_Object_Registry.OP_Process_YN 
    ,VW_XE_OCI_Object_Registry.BU_Bucket_Type   BU_Bucket_Type_Original
    ,XE_OCI_Objects_Log.LO_OBJECT_CREATED        
    ,XE_OCI_Objects_Log.LO_BATCH_ID
    ,XE_OCI_Objects_Log.LO_FILE_EXTENSION        
    ,XE_OCI_Objects_Log.LO_FILE_SIZE             
    ,XE_OCI_Objects_Log.LO_DATE_OBJECT_PROCESSED 
    ,XE_OCI_Objects_Log.LO_EFFECTIVE             
    ,XE_OCI_Objects_Log.LO_EXPIRES               
    ,XE_OCI_Objects_Log.SY_USER_ID_ADD           
    ,XE_OCI_Objects_Log.SY_DATE_ADD              
    ,XE_OCI_Objects_Log.SY_USER_ID_MOD           
    ,XE_OCI_Objects_Log.SY_DATE_MOD              
    ,XE_OCI_Objects_Log.VC_LO_OBJECT_NAME_CAPS   
    ,XE_OCI_Objects_Log.VC_LO_FILE_EXTNESION_CAPS

  From  XE_OCI_Objects_Log
       ,VW_XE_OCI_Object_Registry
       ,Codes_Value DC_DATA_ROW_ACTION
       ,Codes_Value DC_DATA_ROW_STATUS
       ,VW_XE_Buckets XE_Buckets

  Where
            XE_OCI_Objects_Log.LO_XE_Buckets_ID_Current = XE_Buckets.XE_Buckets_ID
        And XE_OCI_Objects_Log.XE_OCI_Object_Registry_ID= VW_XE_OCI_Object_Registry.XE_OCI_Object_Registry_ID
        And XE_OCI_Objects_Log.LO_Data_Row_Action_CID   = DC_Data_Row_Action.Codes_Value_ID
        And XE_OCI_Objects_Log.LO_Data_Row_Status_CID   = DC_Data_Row_Status.Codes_Value_ID
  ;
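
As a hedged example of the insight the Log provides, the two queries below list the last week of processed Objects and flag checksums that were logged more than once. Only columns defined above are used. The seven-day window is an arbitrary choice, and treating a repeated checksum as a possible duplicate delivery is an assumption about how you may want to use LO_CHECKSUM.

```sql
-- Sketch: Objects processed in the last seven days, newest first.
Select LO_Object_Name,
       BU_Bucket_Name_Original,
       BU_Bucket_Name_Current,
       LO_Data_Row_Status,
       LO_File_Size,
       LO_Date_Object_Processed
  From VW_XE_OCI_Objects_Log
 Where LO_Date_Object_Processed >= Trunc(Current_Date) - 7
 Order By LO_Date_Object_Processed Desc;

-- Sketch: checksums logged more than once (possible duplicate deliveries).
Select LO_Checksum,
       Count(*)                      As Times_Logged,
       Min(LO_Date_Object_Processed) As First_Processed,
       Max(LO_Date_Object_Processed) As Last_Processed
  From XE_OCI_Objects_Log
 Group By LO_Checksum
Having Count(*) > 1;
```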

Now for Import/Export processing. The approach below uses packages, procedures, and functions that review the Object Registry and process Objects, run either ad hoc or through APEX Application Shared Components Automations.

The possibilities seem endless... Processes can be scheduled through APEX Application Shared Components Automations, executed on an ad hoc basis, or triggered by Oracle Object Events. Functions could also be serverless (Oracle Serverless Functions), and an Oracle Workflow could be incorporated as well. But it is unlikely you would be interested in reading about all of that in one Blog :)

We will stay with the ad-hoc and Shared Components Automations model for purposes of illustration.

  -- For each XE_OCI_Object_Registry row for the current DB instance:
  --  Is the Object in the XE_Buckets_ID Bucket, Process_YN = Y, row effective, and
  --     same DB_Instance as current?
  --    - Yes
  --      Run the identified Object Process:
  --        Type is 'IMPORT'
  --          Import into designated OP_Table_Imported_To
  --          Copy Object to OP_XE_Buckets_ID_Move_To Bucket
  --          Delete from XE_Buckets_ID Bucket
  --        Type is 'EXPORT'
  --          Export to designated Bucket
  --        Insert entry into XE_OCI_Objects_Log
  --    - No
  --      Null;
  --
  Procedure Process_Objects( p_DC_File_Direction IN VW_XE_OCI_Object_Registry.DC_File_Direction%Type )
    Is
    v_Current_Database_CID      Number(20);
    v_Cloud_Object_Info         Pkg_XE_OCI_Object_Registry.t_Cloud_Object;
    v_Batch_ID                  XE_OCI_Objects_Log.LO_Batch_ID%Type;
    v_DC_File_Direction         VW_XE_OCI_Object_Registry.DC_File_Direction%Type;
    v_OP_File_Direction_CID     Codes_Value.Codes_Value_ID%Type;
    v_Processed_YN              Varchar2(1 CHAR);
    v_XE_OCI_Objects_Log        XE_OCI_Objects_Log%RowType;
    v_Message                   App_Error.Ae_Message%Type;
    e_Error                     Exception;

  Begin
    v_DC_File_Direction     := Upper(p_DC_File_Direction);
    -- Validate direction
    v_OP_File_Direction_CID := Pkg_Codes_Value.Qry_Code_Value_To_ID( p_CO_NAME_ABREV      => 'FILE_DIRECTION',
                                                                     p_CV_Value_Character => v_DC_File_Direction,
                                                                     p_Error_on_No_Find   => 'Y');

    -- Determine current database instance Codes_Value_ID
    v_Current_Database_CID  := Pkg_Codes_Value.Qry_Code_Value_To_ID( p_CO_Name_Abrev      => 'CURRENT_DATABASE',
                                                                     p_CV_Value_Character => Current_DB_Instance);

    For c_Object_Registry In (Select *
                                From VW_XE_OCI_Object_Registry
                               Where     Current_Date Between VW_XE_OCI_Object_Registry.OP_Effective and VW_XE_OCI_Object_Registry.OP_Expires
                                     And VW_XE_OCI_Object_Registry.OP_Process_YN = 'Y'
                                     And VW_XE_OCI_Object_Registry.DC_Object_Process Is Not Null
                                     And VW_XE_OCI_Object_Registry.DC_FILE_Direction = v_DC_FILE_Direction
                                     And VW_XE_OCI_Object_Registry.OP_Current_Database_CID = v_Current_Database_CID)
      Loop
        -- Reset per-row state so a previous iteration cannot cause a log entry for this row
        v_Processed_YN := 'N';
        v_Batch_ID     := Null;

        -- Does object exist in bucket?
        v_Cloud_Object_Info := Pkg_XE_OCI_Object_Registry.Qry_Cloud_Object_Info( p_XE_OCI_Object_Registry_ID => c_Object_Registry.XE_OCI_Object_Registry_ID);

        Case c_Object_Registry.DC_FILE_Direction
          When 'IMPORT' Then
            If v_Cloud_Object_Info.t_Object_Name Is Not Null then
              -- Object exists in Bucket.
              -- Insert Contents into appropriate table, and move object to processed bucket.
              v_Batch_ID     := Pkg_XE_OCI_Object_Registry.Import_Object_Contents( p_XE_OCI_Object_Registry_ID => c_Object_Registry.XE_OCI_Object_Registry_ID,
                                                                                   p_Error_On_No_Find          => 'Y');
              v_Processed_YN := 'Y';
            Else
              -- No Object file to import, so skip
              Null;
            End If;
          When 'EXPORT' Then
            -- Export Contents into appropriate Object.
            Pkg_XE_OCI_Object_Registry.Export_File_Contents( p_XE_OCI_Object_Registry_ID => c_Object_Registry.XE_OCI_Object_Registry_ID,
                                                             p_Error_On_No_Find          => 'Y');
            v_Processed_YN := 'Y';
          Else
            -- Either does not apply, No action necessary, or process not created for type yet.
            Null;
        End Case;

        If v_Processed_YN = 'Y' Then
          -- Log in XE_OCI_Objects_Log
          v_XE_OCI_Objects_Log.XE_OCI_OBJECT_REGISTRY_ID := c_Object_Registry.XE_OCI_Object_Registry_ID;
          v_XE_OCI_Objects_Log.LO_OBJECT_NAME            := c_Object_Registry.OP_Object_Name;
          v_XE_OCI_Objects_Log.LO_CHECKSUM               := v_Cloud_Object_Info.t_CheckSum;
          v_XE_OCI_Objects_Log.LO_DATA_ROW_ACTION_CID    := Pkg_Codes_Value.Qry_Code_Value_To_ID( p_CO_Name_Abrev      => 'DATA_ROW_ACTION',
                                                                                                  p_CV_Value_Character => 'IMPORT');
          v_XE_OCI_Objects_Log.LO_DATA_ROW_STATUS_CID    := Pkg_Codes_Value.Qry_Code_Value_To_ID( p_CO_Name_Abrev      => 'DATA_ROW_STATUS',
                                                                                                  p_CV_Value_Character => 'PROCESSED');
          v_XE_OCI_Objects_Log.LO_XE_BUCKETS_ID_CURRENT  := c_Object_Registry.OP_XE_Buckets_ID_Move_To;
          v_XE_OCI_Objects_Log.LO_OBJECT_CREATED         := Nvl(v_Cloud_Object_Info.t_Created,v_Cloud_Object_Info.t_Last_Modified);
          v_XE_OCI_Objects_Log.LO_BATCH_ID               := v_Batch_ID;
          v_XE_OCI_Objects_Log.LO_FILE_EXTENSION         := c_Object_Registry.OP_File_Extension;
          v_XE_OCI_Objects_Log.LO_FILE_SIZE              := v_Cloud_Object_Info.t_Bytes;
          v_XE_OCI_Objects_Log.LO_DATE_OBJECT_PROCESSED  := Current_Date;
          v_XE_OCI_Objects_Log.LO_EFFECTIVE              := Nvl(v_Cloud_Object_Info.t_Created,v_Cloud_Object_Info.t_Last_Modified);
          v_XE_OCI_Objects_Log.LO_EXPIRES                := Pkg_Codes_Value.g_Date_Infinity;
          -- Managed by Trigger and Virtual Column
          -- v_XE_OCI_Objects_Log.SY_USER_ID_ADD            := ;
          -- v_XE_OCI_Objects_Log.SY_DATE_ADD               := ;
          -- v_XE_OCI_Objects_Log.SY_USER_ID_MOD            := ;
          -- v_XE_OCI_Objects_Log.SY_DATE_MOD               := ;
          -- v_XE_OCI_Objects_Log.VC_LO_OBJECT_NAME_CAPS    := ;
          -- v_XE_OCI_Objects_Log.VC_LO_FILE_EXTNESION_CAPS := ;
          Pkg_XE_OCI_Objects_Log.Ins( p_XE_OCI_Objects_Log => v_XE_OCI_Objects_Log);
        End If;
      End Loop;
    Exception
      When e_Error Then
        -- Your error handling and logging here
        Raise;
      When Others Then
        -- Your error handling and logging here
        Raise;
  End Process_Objects;
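
Once compiled, the procedure can be run ad hoc from a SQL worksheet, or the same block can serve as the code of an APEX Automation "Execute Code" action. The sketch below assumes the procedure lives in a package called Pkg_XE_OCI_Object_Processing. That name is hypothetical, since the post does not say which package holds Process_Objects.

```sql
-- Sketch: ad hoc run, or the body of an APEX Automation "Execute Code" action.
-- Pkg_XE_OCI_Object_Processing is a hypothetical package name; use whichever
-- package Process_Objects is compiled into.
Begin
  Pkg_XE_OCI_Object_Processing.Process_Objects( p_DC_File_Direction => 'IMPORT');
  Pkg_XE_OCI_Object_Processing.Process_Objects( p_DC_File_Direction => 'EXPORT');
End;
/
```

Because direction is a parameter, the two calls could just as well sit in separate Automations with different schedules.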

Recognizing your code will naturally be different, perhaps very different, the detail of the packages, procedures, and functions will not be shown. Instead, here are some notes on the major components:

  • Direction: Parameter value determines direction of process, import or export. Allows segmentation of processing so automated import and export processing can be scheduled and handled differently.
  • Identify Current Database Instance Code Value: The current DB instance ID is compared to each Object Registry row so only rows associated with the current instance (e.g. Dev, Test, or Prod) are processed. Current_DB_Instance is a user-defined function that returns the current database name (the result of Select Ora_Database_Name From Dual).
  • Process Loop: Loops through each Object Registry View row that pertains to the current database instance, has OP_Process_YN set to (Y)es, and has a current effective/expires range.
  • Object Exists in Bucket? When importing, if the Object to be acted upon exists in the expected Bucket (XE_Buckets_ID) and OP_OverWrite_YN = 'Y', process it; otherwise proceed to the next Object.
  • Log: Create and Insert Log entry if a file was processed.
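
For reference, a minimal sketch of what the Current_DB_Instance function could look like, based only on the description above. It assumes the database name it returns matches a value stored under the CURRENT_DATABASE code. Your own version may map or normalize the name differently.

```sql
-- Sketch: return the current database name for lookup against the
-- CURRENT_DATABASE code values. Assumes the name matches a stored code value.
Create Or Replace Function Current_DB_Instance
  Return Varchar2
Is
  v_Database_Name Varchar2(256);
Begin
  Select Ora_Database_Name
    Into v_Database_Name
    From Dual;
  Return v_Database_Name;
End Current_DB_Instance;
/
```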

And some snippets for the more generic key code in the called procedures.

Get Object information from the Bucket and determine if Object exists upon return.

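Function Qry_Cloud_Object_Info( p_XE_OCI_Object_Registry_ID In XE_OCI_Object_Registry.XE_OCI_Object_Registry_ID%Type )
  Return Pkg_XE_OCI_Object_Registry.t_Cloud_Object
Is
  v_Obj                       Pkg_XE_OCI_Object_Registry.t_Cloud_Object;
  v_VW_XE_OCI_Object_Registry VW_XE_OCI_Object_Registry%RowType;
Begin
  -- Look up the Bucket URI and credential for this Registry entry
  v_VW_XE_OCI_Object_Registry := Pkg_XE_OCI_Object_Registry.Qry_View( p_XE_OCI_Object_Registry_ID => p_XE_OCI_Object_Registry_ID);

  Select Object_Name, Bytes, Checksum, Created, Last_Modified
    Into v_Obj.t_Object_Name, v_Obj.t_Bytes, v_Obj.t_CheckSum, v_Obj.t_Created, v_Obj.t_Last_Modified
    From DBMS_CLOUD.LIST_OBJECTS( Credential_Name => v_VW_XE_OCI_Object_Registry.BU_Credential,
                                  Location_URI    => v_VW_XE_OCI_Object_Registry.BU_Full_URI)
   Where Upper(Object_Name) = Upper(APEX_Util.URL_Encode(v_VW_XE_OCI_Object_Registry.OP_Object_Name));
  Return v_Obj;
Exception
  When No_Data_Found Then
    -- Object is not in the Bucket: return a record with a Null name so the caller can test for existence
    v_Obj.t_Object_Name := Null;
    Return v_Obj;
End Qry_Cloud_Object_Info;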

Import Contents by creating an external table on the particular Object.

  • If the external table already exists, drop it to accommodate any file or table definition changes that might have occurred.
  • Create the new table using Object Registry view information.
  • Import and process data in Object based on newly created external table. The process importing from the external table is not shown because it is process specific and would not apply in general.
  • The external table name is retrieved separately from the Object Registry even though the next procedure retrieves the full Object Registry information for the Object for two reasons. First, it allows the external name to be defined/customized in one place so if the naming convention changes in the future, it only needs to change in this function. Second, it returns a single value that could be needed separately elsewhere without returning the whole row.

    Pkg_XE_OCI_Object_Registry.Del_External_Process_Table(  p_XE_OCI_Object_Registry_ID => p_XE_OCI_Object_Registry_ID);

    -- Retrieve external table name for this object 
    v_External_Table_Name := Pkg_XE_OCI_Object_Registry.Qry_External_Table_Name( p_XE_OCI_Object_Registry_ID =>  p_XE_OCI_Object_Registry_ID);
    -- Get Object Registry information
    v_VW_Object_Registry  := Pkg_XE_OCI_Object_Registry.Qry_View( p_XE_OCI_Object_Registry_ID => p_XE_OCI_Object_Registry_ID);

    Case v_VW_Object_Registry.DC_Object_Process
    When 'IMP_MARKET_CSV' Then
      Begin
        -- Create External Table linked to Object in Bucket
        Dbms_Cloud.Create_External_Table(
          table_name => v_External_Table_Name,
          credential_name => v_VW_Object_Registry.BU_Credential,
          file_uri_list => v_VW_Object_Registry.OP_Full_Object_URI,
          column_list => 
                              'COL001    Varchar2(300 CHAR),
                               COL002    Varchar2(300 CHAR),
                               COL003    Varchar2(300 CHAR),
                               COL004    Varchar2(300 CHAR),
                               COL005    Varchar2(300 CHAR),
                               COL006    Varchar2(300 CHAR),
                               COL007    Varchar2(300 CHAR),
                               COL008    Varchar2(300 CHAR),
                               COL009    Varchar2(300 CHAR),
                               COL010    Varchar2(300 CHAR),
                               COL011    Varchar2(300 CHAR),
                               COL012    Varchar2(300 CHAR),
                               COL013    Varchar2(300 CHAR),
                               COL014    Varchar2(300 CHAR),
                               COL015    Varchar2(300 CHAR),
                               COL016    Varchar2(300 CHAR),
                               COL017    Varchar2(300 CHAR),
                               COL018    Varchar2(300 CHAR),
                               COL019    Varchar2(300 CHAR),
                               COL020    Varchar2(300 CHAR),
                               COL021    Varchar2(300 CHAR),
                               COL022    Varchar2(300 CHAR),
                               COL023    Varchar2(300 CHAR), 
                               COL024    Varchar2(300 CHAR)',
          format          => json_object('type' value 'csv', 'skipheaders' value '1')
        );

      End;
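
The import itself is process specific, but its general shape can be sketched. The block below is only an illustration under stated assumptions: the external table name EXT_MARKET_CSV and the staging table Stg_Market_Import are hypothetical, and only three of the 24 columns are copied. It validates the external table first and then copies rows with a plain Insert ... Select.

```plsql
Declare
  -- Hypothetical names, for illustration only
  v_Ext_Table_Name  Varchar2(128) := 'EXT_MARKET_CSV';
  v_Rows_Loaded     Pls_Integer;
Begin
  -- Confirm the rows in the Object can be read with the column list
  -- before loading anything; a rejected row raises an error here.
  Dbms_Cloud.Validate_External_Table(
    table_name    => v_Ext_Table_Name,
    stop_on_error => True);

  -- Copy rows into the (assumed) staging table. Dbms_Assert checks that
  -- the dynamically supplied name is an existing object.
  Execute Immediate
    'Insert Into Stg_Market_Import (Col001, Col002, Col003) ' ||
    'Select Col001, Col002, Col003 From ' ||
    Dbms_Assert.Sql_Object_Name(v_Ext_Table_Name);
  v_Rows_Loaded := Sql%RowCount;

  Dbms_Output.Put_Line(v_Rows_Loaded || ' rows staged from ' || v_Ext_Table_Name);
End;
/
```

Since every external column is a Varchar2, data type conversion and business rules would normally happen in a later step that reads from the staging table.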

After importing the Object's contents, move the Object to a "Processed" Bucket so it is not re-processed the next time the Import procedure runs. The move is done in three steps: get the contents, put them in the destination Bucket, then delete the original. (Moving the Object after export is the responsibility of the sFTP server CLI script, which does so after it gets the contents and FTPs them to the destination.)

    v_Object_Contents_Blob  := DBMS_CLOUD.GET_OBJECT(
                                                    credential_name      => v_VW_XE_OCI_Object_Registry.BU_Credential,
                                                    object_uri           => v_VW_XE_OCI_Object_Registry.OP_Full_Object_URI,
                                                    startoffset          => 0,
                                                    endoffset            => 0,
                                                    compression          => Null
                                                    );

    DBMS_CLOUD.PUT_OBJECT (
       credential_name      => v_VW_XE_OCI_Object_Registry.BU_Credential,
       object_uri           => v_VW_XE_OCI_Object_Registry.OP_Full_Object_URI_Move_To,
       contents             => v_Object_Contents_Blob);

    DBMS_CLOUD.DELETE_OBJECT (
       credential_name      => v_VW_XE_OCI_Object_Registry.BU_Credential,
       object_uri           => v_VW_XE_OCI_Object_Registry.OP_Full_Object_URI);
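
If the same three steps are needed in more than one place, they could be wrapped in a small helper. This is a sketch, not part of the original package: the procedure name Move_Object_Sketch and its parameter names are hypothetical, and it assumes the same credential can read the source Bucket and write the destination Bucket.

```plsql
Create Or Replace Procedure Move_Object_Sketch (   -- hypothetical name
  p_Credential_Name In Varchar2,
  p_Source_URI      In Varchar2,
  p_Target_URI      In Varchar2)
Is
  v_Contents Blob;
Begin
  -- 1. Read the whole Object into a Blob (offsets left at their defaults)
  v_Contents := Dbms_Cloud.Get_Object(
                  credential_name => p_Credential_Name,
                  object_uri      => p_Source_URI);

  -- 2. Write the copy to the destination Bucket
  Dbms_Cloud.Put_Object(
    credential_name => p_Credential_Name,
    object_uri      => p_Target_URI,
    contents        => v_Contents);

  -- 3. Delete the original only after the copy is written;
  --    if step 2 raises an error this line is never reached
  Dbms_Cloud.Delete_Object(
    credential_name => p_Credential_Name,
    object_uri      => p_Source_URI);
End Move_Object_Sketch;
/
```

A caller would pass the BU_Credential, OP_Full_Object_URI, and OP_Full_Object_URI_Move_To values from the Object Registry row.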

Exporting is similar: loop through the Object Registry, identify the Objects marked for export, and, instead of importing from an external table, create the Objects in their designated Buckets.

For example:

  v_Instance := Current_DB_Instance;
  v_VW_XE_OCI_Object_Registry := Pkg_XE_OCI_Object_Registry.Qry_View_Object_Name(  
                                      p_OP_Object_Name      =>  'FILE_TO_BE_SENT.xlsx',
                                      p_DC_Current_Database =>  v_Instance,
                                      p_DC_File_Direction   =>  'EXPORT');
  v_SQL  := 'Select * from ' || v_VW_XE_OCI_Object_Registry.OP_Table_Exported_From;

  l_context := apex_exec.open_query_context(
        p_location    => apex_exec.c_location_local_db,
        p_sql_query   => v_SQL );
  l_export := apex_data_export.export (
                        p_context   => l_context,
                        p_format    => apex_data_export.c_format_xlsx );
  apex_exec.close( l_context );
  DBMS_CLOUD.PUT_OBJECT(
     credential_name => v_VW_XE_OCI_Object_Registry.BU_Credential,
     object_uri => v_VW_XE_OCI_Object_Registry.OP_Full_Object_URI,
     contents => l_export.content_blob
     );
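
The single-file example can be extended to every export Object in the registry. The following is a sketch under assumptions: the view name VW_XE_OCI_Object_Registry is inferred from the variable name and may differ in your schema, and the column names are taken from the record fields and parameters used in this article. Dbms_Assert is added here to check the data driven table name before it is concatenated into the query.

```plsql
Declare
  l_context  apex_exec.t_context;
  l_export   apex_data_export.t_export;
Begin
  -- View and column names are assumptions based on the snippets in this article
  For r In (Select OP_Table_Exported_From, BU_Credential, OP_Full_Object_URI
              From VW_XE_OCI_Object_Registry
             Where DC_File_Direction = 'EXPORT')
  Loop
    -- Sql_Object_Name raises an error if the name is not an existing object
    l_context := apex_exec.open_query_context(
                   p_location  => apex_exec.c_location_local_db,
                   p_sql_query => 'Select * From ' ||
                                  Dbms_Assert.Sql_Object_Name(r.OP_Table_Exported_From));

    l_export := apex_data_export.export(
                   p_context => l_context,
                   p_format  => apex_data_export.c_format_xlsx);
    apex_exec.close(l_context);

    -- Create the Object in its designated Bucket
    Dbms_Cloud.Put_Object(
      credential_name => r.BU_Credential,
      object_uri      => r.OP_Full_Object_URI,
      contents        => l_export.content_blob);
  End Loop;
End;
/
```

A real version would likely also filter on the current database instance and the on/off processing switch held in the registry.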

A CLI script running on the sFTP server checks for Objects in the ftp_outbound Bucket and FTPs them to their destination as appropriate. After they are sent, the files are moved to the ftp_processed Bucket, completing the overall process.

Following is a sample, de-identified Perl script for the sFTP server. My system administrator prepared it; I would not have been able to write it myself.

#
# This script checks to see if there are outbound files ready for sftp transfer
# to [ftp-site].com.  The outbound buckets are:
# ftp_outbound, ftp_outbound_dev, and ftp_outbound_test
# If there are files, then this script will sftp the bucket files to
# [ftp site].com.  Then after the files are successfully sftp'd
# then the files will be transferred to the processed buckets which are:
# ftp_processed, ftp_processed_dev, and ftp_processed_test
#
# This script needs to run as a user that is configured to be
# able to run oci command line commands.

# Define parameters
$bucket_name = "";
@file_dir = ("ftpdev", "ftptest", "ftpprod");
$oci_file_out = "/tmp/ftp_xfer.exit";
$oci_dir_xfer = "/data/ftp_xfer";

$ftp_folder_dev = "/fl_home/arriving/Devobjects";
$ftp_folder_prod = "/fl_home/arriving";
$ftp_folder_test = "/fl_home/arriving/TestObjects";

# Initialize variables
$date = scalar localtime;
$file_date = `date +_%b-%d-%Y_%H:%M`;
chomp ($file_date);
chop ($hostname = `/bin/hostname`);
$mailcmd = "| /usr/sbin/sendmail -oi -t";
$mail_to="tester\@your-domain.com";
#$mail_to="FileProcess\@your-domain.com, tester\@your-domain.com";
#$mail_to="tester2\@your-domain.com, tester\@your-domain.com";
#$mail_to="FileProcess\@your-domain.com";

# Make sure the data directories exist
foreach $env_dir (@file_dir) {
  $return_code = system ("/bin/mkdir -p $oci_dir_xfer/$env_dir");
  if ($return_code) { &mail_abort ("Cannot create the data directories in $oci_dir_xfer/$env_dir: $!\n"); }
next; }

# Make sure the data directories have the proper file permissions
$return_code = system ("/bin/chown -R testname:testname $oci_dir_xfer");
#
if ($return_code) { &mail_abort ("Cannot modify the ownership on the data directories in $oci_dir_xfer: $!\n"); }
$return_code = system ("/bin/chmod -R 700 $oci_dir_xfer");
if ($return_code) { &mail_abort ("Cannot modify the permissions on the data directories in $oci_dir_xfer: $!\n"); }

# Delete any files in the data directories that were not previously deleted
# for any reason
foreach $env_dir (@file_dir) {
  $return_code = system ("/bin/rm -f $oci_dir_xfer/$env_dir/*");
  if ($return_code) { &mail_abort ("Cannot delete the files in $oci_dir_xfer/$env_dir: $!\n"); }
next; }

# Download all files in all buckets
foreach $env_dir (@file_dir) {
  # Strip the "ftp" prefix so dev/test map to the bucket names listed in the header
  ($env_suffix = $env_dir) =~ s/^ftp//;
  if ($env_dir eq "ftpprod") { $bucket_name = "ftp_outbound"; }
  else { $bucket_name = "ftp_outbound_$env_suffix"; }
  $return_code = system ("/bin/oci os object bulk-download --bucket-name $bucket_name --download-dir $oci_dir_xfer/$env_dir --no-overwrite > $oci_file_out 2>&1");
  if ($return_code) { &mail_abort ("Cannot download the files to $oci_dir_xfer/$env_dir from bucket $bucket_name: $!\n"); }
next; }

# Search all directories for any downloaded files
foreach $env_dir (@file_dir) {
  ($env_suffix = $env_dir) =~ s/^ftp//;
  if ($env_dir eq "ftpprod") {
    $src_bucket_name = "ftp_outbound";
    $dst_bucket_name = "ftp_processed"; }
  else {
    $src_bucket_name = "ftp_outbound_$env_suffix";
    $dst_bucket_name = "ftp_processed_$env_suffix"; }
  opendir(BUCKET_DIR,"$oci_dir_xfer/$env_dir") or &mail_abort ("Cannot open the bucket directory $oci_dir_xfer/$env_dir: $!");
  @bucket_files = readdir(BUCKET_DIR) or &mail_abort ("Unable to read directory $oci_dir_xfer/$env_dir: $!");
  closedir (BUCKET_DIR);
  foreach $bucket_file (@bucket_files) {
    next if ($bucket_file eq ".");  # Skip the current directory entry
    next if ($bucket_file eq ".."); # Skip the parent  directory entry
    # sftp the file to ftp.destination-site.com
    # $return_code = system ("/bin/sftp $username_arriving_dir/$sftp_file $remote_sftp_user\@$remote_sftp_server:$remote_sftp_dir/$username/$remote_rtp_dir/$sftp_file > $oci_file_out");
    # if ($return_code) { &mail_abort ("Cannot copy the file $username_arriving_dir/$sftp_file to $username_rtp_dir/$sftp_file: $!\n"); }
    # If the file was sftp'd successfully above then copy the file to
    # the processed bucket.
    $return_code = system ("/bin/oci os object copy --bucket-name $src_bucket_name --destination-bucket $dst_bucket_name --source-object-name $bucket_file > $oci_file_out 2>&1");
    if ($return_code) { &mail_abort ("Cannot copy $bucket_file from bucket $src_bucket_name to bucket $dst_bucket_name: $!\n"); }
    # If the file was copied successfully above then delete the file from
    # the outbound bucket.
    $return_code = system ("/bin/oci os object delete --bucket-name $src_bucket_name --object-name $bucket_file --force >> $oci_file_out 2>&1");
    if ($return_code) { &mail_abort ("Cannot delete $bucket_file from bucket $src_bucket_name: $!\n"); }
    unlink "$oci_dir_xfer/$env_dir/$bucket_file" or &mail_abort ("Cannot delete $oci_dir_xfer/$env_dir/$bucket_file: $!\n");
  next; # Goto the next file in the bucket's directory
  }
next; # Goto the next bucket's download directory
}

exit (0);

sub mail_notify {
  # Mail the appropriate people that a file has been received.
  my $file_name = shift;

  open (MAIL, $mailcmd);
  # Write the message headers and body up to the MSG terminator
  print MAIL <<MSG;
From: sftp\@$hostname
To: $mail_to
Subject: Originator File Received: $file_name
Date: $date

$file_name has been transferred to the sftp server.
MSG
  close MAIL;
}

sub mail_abort {
  # Mail the appropriate person that there was an error and then abort
  # processing the files.
  my $errormsg = shift;

  open (MAIL, $mailcmd);
  # Write the message headers and body up to the MSG terminator
  print MAIL <<MSG;
From: sftp\@$hostname
To: $mail_to
Subject: $0 on $hostname aborted processing!
Date: $date

$0 on $hostname on $date had a severe error!\n
$errormsg
...ABORTING...
MSG
  close MAIL;
  exit (1);
}

In summary, this scalable, versatile, data driven architecture helps deliver increased productivity, consistency, flexibility, and insight. While everyone's processes are different, the intent is to offer a framework and ideas you can build on.

I hope you find this information useful. Comments (and likes :) ) welcome!