github.com/pschou/go-flowfile
Version: 0.0.0-20230301020414-6ecc1a33f18b
Repository: https://github.com/pschou/go-flowfile.git
Documentation: pkg.go.dev

# README

go-flowfile

flowfile is a Go module providing a set of tools to interact with NiFi FlowFiles at a low level. It is tuned for streaming workloads, where memory and disk are often limited.

This module was built to be both simple to use and to operate at a low level (on the raw bytes), so that FlowFiles can be handled at wire speed. Here is an example of a basic filter-and-forward program:

package main

import (
  "log"
  "net/http"

  flowfile "github.com/pschou/go-flowfile"
)

func main() {
  // Create an endpoint to send FlowFiles to:
  txn, err := flowfile.NewHTTPTransaction("http://target:8080/contentListener", nil)
  if err != nil {
    log.Fatal(err)
  }

  // Set up a receiver to handle incoming FlowFiles
  myFilter := flowfile.NewHTTPFileReceiver(func(f *flowfile.File, w http.ResponseWriter, r *http.Request) error {
    if f.Attrs.Get("project") == "ProjectA" {
      return txn.Send(f) // Forward only ProjectA related FlowFiles
    }
    return nil // Drop the rest
  })
  http.Handle("/contentListener", myFilter)    // Add the listener to a path
  log.Fatal(http.ListenAndServe(":8080", nil)) // Start accepting connections
}

A slightly more complex example builds a NiFi routing program that forwards 1 of every 10 FlowFiles while keeping each bundle together in one POST:

package main

import (
  "crypto/tls"
  "log"
  "net/http"
  "sync/atomic"

  flowfile "github.com/pschou/go-flowfile"
)

// tlsConfig is a placeholder; supply real TLS settings for an HTTPS target.
var tlsConfig *tls.Config

func main() {
  txn, err := flowfile.NewHTTPTransaction("http://decimated:8080/contentListener", tlsConfig)
  if err != nil {
    log.Fatal(err)
  }

  var counter int64 // Shared by concurrent requests, so it is updated atomically
  myDecimator := flowfile.NewHTTPReceiver(func(s *flowfile.Scanner, w http.ResponseWriter, r *http.Request) {
    pw := txn.NewHTTPPostWriter()
    defer pw.Close() // Ensure the POST is sent when the transaction finishes.

    for s.Scan() {
      f := s.File()
      if atomic.AddInt64(&counter, 1)%10 == 1 { // Forward only 1 of every 10 Files
        if _, err := pw.Write(f); err != nil { // Something unexpected happened
          w.WriteHeader(http.StatusInternalServerError) // Return an error
          pw.Terminate()
          return
        }
      }
    }
    if err := s.Err(); err != nil {
      log.Println("Error:", err)
      w.WriteHeader(http.StatusInternalServerError)
      return
    }
    w.WriteHeader(http.StatusOK) // Drop the rest by claiming all is ok
  })

  http.Handle("/contentDecimator", myDecimator) // Add the listener to a path
  log.Fatal(http.ListenAndServe(":8080", nil))  // Start accepting connections
}

More examples can be found at https://pkg.go.dev/github.com/pschou/go-flowfile#pkg-examples

Early logic is key! When an incoming FlowFile is presented to the program, only its attributes are available at first, and these typically arrive in the first packet of the stream. A routing decision can therefore be made before the payload arrives, so the incoming and destination streams can be connected directly, avoiding local caches and enabling "fast-forwarding" of the original packets.

The decision logic can be as complex or as simple as one desires. One can consume on one or more ports or listening paths and send concurrently to as many downstream servers as desired.

For more documentation, go to https://pkg.go.dev/github.com/pschou/go-flowfile .

# Functions

Create a new File struct from an io.Reader with size.
NewFromDisk creates a new File struct from a file on disk.
NewHTTPFileReceiver interfaces with the built-in HTTP Handler and parses out the individual FlowFiles from a stream and sends them to a FlowFile handler.
NewHTTPReceiver interfaces with the built-in HTTP Handler and parses out the FlowFile stream and provides a FlowFile scanner to a FlowFile handler.
Create the HTTP sender and verify that the remote side is listening.
Create the HTTP sender without verifying remote is listening.
Create the HTTP sender and verify that the remote side is listening.
Create a new FlowFile reader, wrapping io.Reader for reading consecutive FlowFiles from a stream.
Create a new FlowFile reader, using a (chan *File) for reading consecutive FlowFiles from a channel.
Create a new FlowFile reader, for reading from a slice of FlowFiles.
Splits up a flowfile into count number of segments.
Splits up a flowfile into a number of segments with segmentSize.
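The arithmetic behind the two splitting strategies can be sketched as follows. This is not the module's implementation, and its function names are not shown here; `span`, `bySize`, and `byCount` are hypothetical names, and the rounding rule (ceiling division, with a shorter final segment) is an assumption.

```go
package main

import "fmt"

// span describes one segment as a byte offset and length within the payload.
type span struct{ Offset, Length int64 }

// bySize splits size bytes into consecutive spans of at most segmentSize
// bytes; the final span holds whatever remains.
func bySize(size, segmentSize int64) []span {
	if size <= 0 || segmentSize <= 0 {
		return nil
	}
	var out []span
	for off := int64(0); off < size; off += segmentSize {
		n := segmentSize
		if size-off < n {
			n = size - off
		}
		out = append(out, span{off, n})
	}
	return out
}

// byCount splits size bytes into at most count spans by rounding the
// per-segment size up, so every span except possibly the last is equal.
func byCount(size, count int64) []span {
	if size <= 0 || count <= 0 {
		return nil
	}
	segmentSize := (size + count - 1) / count // ceiling division
	return bySize(size, segmentSize)
}

func main() {
	fmt.Println(byCount(10, 3)) // [{0 4} {4 4} {8 2}]
	fmt.Println(bySize(10, 4))  // [{0 4} {4 4} {8 2}]
}
```

Expressing the count-based split in terms of the size-based one keeps a single loop responsible for the boundary handling.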

# Structs

A single attribute in a FlowFile header.
A File is a handler for either an incoming datafeed or outgoing datafeed of the contents of a file over a File connection.
Writer encapsulates the ability to write one or more flow files in one POST request.
Implements http.Handler and can be used with the GoLang built-in http module: https://pkg.go.dev/net/http#Handler.
The HTTP Sender will establish a NiFi handshake and ensure that the remote endpoint is listening and compatible with the current flow file format.
A wrapper around an io.Reader which parses out the flow files.

# Type aliases

A set of attributes in a FlowFile header.
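To make the idea of an attribute set concrete, here is a minimal sketch of an ordered key/value list with lookup and update. The lowercase types `attribute` and `attributes` are illustrative stand-ins and are not claimed to match the module's definitions; only the `Get` lookup mirrors the `f.Attrs.Get("project")` call used in the README.

```go
package main

import "fmt"

// attribute and attributes are illustrative stand-ins, not the module's types.
type attribute struct{ Name, Value string }
type attributes []attribute

// Get returns the value of the first attribute called name, or "" if absent.
func (a attributes) Get(name string) string {
	for _, kv := range a {
		if kv.Name == name {
			return kv.Value
		}
	}
	return ""
}

// Set replaces the value of an existing attribute or appends a new one,
// preserving the original ordering of the header.
func (a *attributes) Set(name, value string) {
	for i := range *a {
		if (*a)[i].Name == name {
			(*a)[i].Value = value
			return
		}
	}
	*a = append(*a, attribute{name, value})
}

func main() {
	var attrs attributes
	attrs.Set("filename", "report.csv")
	attrs.Set("project", "ProjectA")
	attrs.Set("project", "ProjectB") // overwrites in place
	fmt.Println(attrs.Get("project"), len(attrs)) // ProjectB 2
}
```

A slice keeps header order stable, which matters when attributes are written back to a stream; for headers with only a handful of entries, the linear scan in `Get` is usually cheap enough.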