Type: | Package |
Maintainer: | Gergely Daroczi <daroczig@rapporter.net> |
Author: | Gergely Daroczi <daroczig@rapporter.net> |
Title: | Amazon 'Kinesis' Consumer Application for Stream Processing |
Description: | Fetching data from Amazon 'Kinesis' Streams using the Java-based 'MultiLangDaemon' interacting with Amazon Web Services ('AWS') for easy stream processing from R. For more information on 'Kinesis', see https://aws.amazon.com/kinesis. |
Version: | 1.7.6 |
Date: | 2023-08-17 |
URL: | https://github.com/daroczig/AWR.Kinesis |
License: | AGPL-3 |
Imports: | AWR, logger, jsonlite, rJava |
RoxygenNote: | 7.2.3 |
Encoding: | UTF-8 |
NeedsCompilation: | no |
Packaged: | 2023-08-18 22:00:56 UTC; daroczig |
Repository: | CRAN |
Date/Publication: | 2023-08-19 00:02:34 UTC |
An R Kinesis Consumer
Description
Please find more details in the README.md
file.
Checkpoint at current or given sequence number
Description
Checkpoint at current or given sequence number
Usage
checkpoint(sequenceNumber)
Arguments
sequenceNumber |
optional |
Run Kinesis Consumer application
Description
Run Kinesis Consumer application
Usage
kinesis_consumer(
initialize,
processRecords,
shutdown,
checkpointing = TRUE,
updater,
logfile = tempfile()
)
Arguments
initialize |
optional function to be run on startup. Please note that the variables created inside of this function will not be available to eg |
processRecords |
function to process records taking a |
shutdown |
optional function to be run when finished processing all records in a shard |
checkpointing |
if set to |
updater |
optional list of list(s) including frequency (in minutes) and function to be run, most likely to update some objects in the parent or global namespace populated first in the |
logfile |
file path of the log file. To disable logging, set |
Note
Don't run this function directly, it is to be called by the MultiLangDaemon. See the package README for more details.
References
Examples
## Not run:
log_threshold(FATAL, namespace = 'AWR.Kinesis')
AWS.Kinesis::kinesis_consumer(
initialize = function() log_info('Loading some data'),
processRecords = function(records) log_info('Received some records from Kinesis'),
updater = list(list(1, function() log_info('Updating some data every minute')),
list(1/60, function() log_info('This is a high frequency updater call')))
)
## End(Not run)
Get record from a Kinesis Stream
Description
Get record from a Kinesis Stream
Usage
kinesis_get_records(
stream,
region = "us-west-1",
limit = 25,
shard_id,
iterator_type = c("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER",
"AFTER_SEQUENCE_NUMBER", "AT_TIMESTAMP"),
start_sequence_number,
start_timestamp
)
Arguments
stream |
stream name (string) |
region |
AWS region (string) |
limit |
number of records to fetch |
shard_id |
optional shard id - will pick a random active shard if left empty |
iterator_type |
shard iterator type |
start_sequence_number |
for |
start_timestamp |
for |
Value
character vector that you might want to post-process eg with jsonlite::stream_in
Note
Use this no more than getting sample data from a stream - it's not intended for prod usage.
References
Write a record to a Kinesis Stream
Description
Write a record to a Kinesis Stream
Usage
kinesis_put_record(stream, region = "us-west-1", data, partitionKey)
Arguments
stream |
stream name (string) |
region |
AWS region (string) |
data |
data blog (string) |
partitionKey |
determines which shard in the stream the data record is assigned to, eg username, stock symbol etc (string) |
Value
invisible list including the shard id and sequence number
References
Examples
## Not run:
df <- mtcars[1, ]
str(kinesis_put_record('test-AWR', data = jsonlite::toJSON(df), partitionKey = row.names(df)))
## End(Not run)
Read one non-empty line from stdin without any warnings printed to stdout
Description
Read one non-empty line from stdin without any warnings printed to stdout
Usage
read_line_from_stdin()
Value
string
Securely write a line to stdout with logging
Description
Securely write a line to stdout with logging
Usage
write_line_to_stdout(line)
Arguments
line |
string |