這是一個數位化的世界。隨著越來越多數位設備的使用,大量的資料也隨之產生,而這麼多的原始資料就需要進行整理、分析、產生新的洞見,資料才會變成資訊,進而促使我們行動。然而傳統的分析工具已經越來越無法有效的分析這些大量的資料,我們需要一種創新的方法來分析這一些大數據。
大數據工具和技術為高效能數據分析來更好的了解客戶需求、在市場中獲得競爭優勢和發展業務提供了機遇和挑戰。 資料管理架構已經從傳統的數據倉儲模型演變為更複雜的架構,以滿足更多的需求,例如即時和批次資料處理; 結構化和非結構化資料; 高頻交易等等。
AWS 提供了廣泛的託管服務平台,可幫助我們快速輕鬆地建立、保護和無縫擴展端到端(end-to-end)的大數據應用程序。 無論我們的應用程序需要及時串流還是批次資料處理,AWS 都能提供基礎設施和工具來處理我們的大數據專案。 無需採購硬體與軟體,無需維護和擴展基礎設施 — — 只需收集、存儲、處理和分析大數據所需的東西。 AWS 擁有一個分析解決方案的生態系統,專門設計用於處理不斷增長的資料量並深入了解我們的業務。
AWS的大數據分析優勢
分析大量資料需要強大的電腦運算能力,其資料大小會因輸入資料量和分析類型而異。 AWS的大數據服務的特性非常適合Pay-as-you-go的雲端計費模型,在這種模型中,應用程序可以根據需求輕鬆擴展和縮減。 隨著需求的變化,我們可以輕易的調整 AWS 上的環境(水平擴展或垂直擴展)以滿足業務需求,不需要等待額外的軟硬體購買時間或需要過度投資軟硬體的容量。
AWS提供了多種的工具與服務來支援這些大數據分析需求,下面將會一個個介紹其功用與特性。在AWS的大數據分析中會分為資料的四個階段,分別是收集,處理,儲存與分析(如下圖)。
我們將介紹以下工具來對應AWS大數據分析的四個階段。除了這四個階段,資料的視覺化與安全也同等重要,而我們也將一併介紹AWS關於這個方面的服務。
當然除了上述這些託管服務之外,我們也可以指使用EC2(VM)來建立自己的大數據分析平台。
資料收集階段
AWS在這個階段將資料收集分為三種
Kinesis
這是基於 Apache Kafka提供用來處理串流資料(streaming data)與即時分析的平台,當然我們也可以客製化自己的應用程式放在這上面運行。串流資料的種類可能是應用程式日誌,網站使用者的點擊資料,IoT裝置的監控資料等等,我們可以將這些資料存放到資料庫/資料湖/資料倉儲中甚至是我們自己的即時資料處理程式來處理這些資料。Kinesis適合用在串流處理框架(streaming processing frameworks),像是Spark, NiFi等等。由於這是託管式服務,在Kinesis處理的資料都會自動同步複製在AWS的三個AZ之間。
而Kinesis服務分為以下五個部分
1.Kinesis Data Streams — 構建處理或分析streaming data的自定義應用程序,適合low latency的streaming ingest。
2.Kinesis Video Streams — 構建處理或分析Video streaming的自定義應用程序,主要目的對串流影音與time-encoded的資料做分析。
3.Kinesis Data Firehose— 將資料傳送到AWS的其他服務,像是 S3, Redshift, Splunk與 Elasticsearch Service。
4.Kinesis Data Analytics — 使用Standard SQL與Apache Flink來處理與分析串流資料。
5.Managed Streaming for Kafka — 這是當Kinesis前面四個服務無法滿足你的需求時可以使用這一個服務。
而Kinesis提供了四種不同的方式來擷取(ingest),處理,並分析資料。
Kinesis Data Streams
使我們能夠構建即時處理或分析串流資料的自定義的應用程序。 Kinesis Data Streams 每小時可以從數十萬個來源(例如網站點擊流、金融交易、社交媒體來源、IT logs和位置追踪來存儲好幾 TB 的資料。
我們在開發程式與AWS Kinesis搭配的應用程序時,搭配著Kinesis Client Library(KCL)我們可以構建 Amazon Kinesis Application並使用streaming data來支援即時顯示的儀表板、產生告警以及進行動態定價和廣告。這一類搭配KCL的應用程式可以將資料寫入到AWS的 S3, DynamoDB, Redshift, Elasticsearch Service等(如下示意圖)
使用 AWS 控制台、API 或SDK,以"每秒 1 MB" 的block為單位預置data stream所需的輸入和輸出等級。 可以隨時向上或向下調整stream的大小,而無需重新啟動stream,也不會對將資料推送到stream的data source產生任何影響。 在幾秒鐘內,放入stream中的資料即可用於分析。
而在Kensis stream中, stream被分為數個 Shards(或稱Partitions)。Shard是唯一標識streams的順序與固定的處理容量。
上圖中可以看到,推送資料進到Shards的部分稱為producers,取出資料的部分稱為consumers。資料存放在Kinesis stream最高可以存放七天(預設是24小時),也就是說Kinesis stream的服務內部是有storage存放資料的(Kinesis Firehose則沒有)。也因為可以存放資料,所以資料是可以被reprocess/ replay 的。而在處理資料Consumers可以同時是多個應用程序,意思是同一個data stream同時可以服務多個consumer。並且因為需要服務多個應用程序(consumers)所以我們可以提高整個輸出資料的throughput。另外資料一旦進入Kinesis它就不能被刪除,直到資料的保留時間到期。
而Shard則是最小的計費單位,意思是我們使用的shard的數量越多費用就會拉高。但運作過程中我們可以依當時運作的需求對shard實行 reshard作業。而records在每個shard中是有排序的,哪甚麼是Records呢?可以參考下圖
由上圖可知,每筆record的組成為三個部分
Data Blob: 正在傳送的資料,並序列化(serialized)為bytes. 資料量最高可以到"1MB"。
Record Key : 與record一起發送,有助於在shard中對record進行分組。意思是 Same key = Same shard。而由於我們要避免某一個shard會處理到大部分甚至是全部的資料,所以key的設計必須是highly distributed,將資料平均分配到所有的shard中處理以防止 hot partition的問題發生。
Sequence number: 資料進入kinesis stream中會由kinesis來對每筆record添加sequence number來辨識每筆record的唯一性,但這個唯一性只存在於該shard中。
Kinesis Data Streams的限制
在使用Kinesis Data streams時我們需要知道它的限制性在哪裡
Producer: 每個shard可以"寫入"的資料量是"每秒1MB"或是"每秒 1000 messages".
傳統的Consumer : 每個shard對”所有的”consumers是"每秒2MB"的資料讀取量。或是每個shard對”所有的”consumers是"每秒5個 API calls".這類的consumer是對shard裡的資料做pull的動作。
增強型的(enhanced Fan-out)Consumer: 每個shard對每一個增強型的的consumer都有"每秒2MB"的資料讀取量。也就是說不會像是傳統型的consumer一樣shard的資料讀取是要被分享給所有的consumer的,每個consumer都可以有自己"2MB的資料專用資料讀取量"。在這個模式中不再需要API calls了,因為這時資料對應到consumer是屬於"push mode"。
資料留存時間: 預設是24小時,最高可以到七天。
Kinesis Producers
哪甚麼是Kinesis prodcuers呢?基本上就是將資料寫入到data stream,包含 Kinesis SDK / Kinesis Procedure Library(KPL)/ Kinesis Agent甚至是其他第三方的library,像是Spark, Log4J, Appenders, Flume, Kafka connect, NiFi等等
讓我們來看一下以下幾種Producer的使用方式與情境
Kinesis Producer SDK
在輸入資料時的API call有哪些是常用到的 — 例如 : PutRecord(一筆資料)或是PutRecords(很多筆資料),PutRecords通常使用在batching或是要增加throughput,同時這麼做也意味著我們只需要少量的 HTTP requests。每一個單一個HTTP request的PutRecords可以支援500筆records且每筆record可以有1MB的資料大小,但整體大小只能支援到5MB。另外還有 ProvisionedThroughExceeded,這是我們的資料超過限制後可以進行甚麼樣的處置。通常為這三種處理方式 — 1.)Retire with backoff, 2.)增加shards, 或是3.)確認我們的partition key的設計是正確的。
Kinesis Producer Library(KPL)
但KPL(Kinesis Producer Library)的功能沒有壓縮這一個,如果要做資料壓縮得要自己來。而KPL的資料必須使用KCL(Kinesis client library)或特別的函式庫才可以進行解碼。
上面提到我們可以聚合(aggregate)資料(KPL batching)來增加throughput(示意圖如下)。而我們也可以通過使用RecordMaxBufferedTime(預設為100)的參數來左右資料的delay.
雖然使用KPL的效益不少,但還是有一些狀況不要使用會比較好。因為我們可以使用RecordsMaxBufferedTime來加大資料處理量,但也因為這樣做資料處理時間就會增加(latency變高),效能變好了但資料處理也不會哪麼即時了。所以如果我們的整體服務是不容許資料處理有delay的話,哪麼可能就需要考慮直接使用 AWS SDK來將資料輸入Kinesis Data Streams(如下示意圖)。
Kinesis Agent
說白話一點,這就是將agent(Java-based,建立於KPL之上) 安裝在 Linux 的OS上的。它通常都是監控一些OS中的 Log檔案,並將這些Log傳送回 Kinesis data streams。
主要功能在於
傳統的Kinesis Consumers
哪甚麼是Kinesis consumer呢? 它可以是如下圖這些元件或服務
Kinesis Consumer SDK-讀取資料(GetRecords)
傳統的consumer的資料讀取是pull的方式。每個shard最高是2MB(每秒)的 aggregate throughput。GetRecords 返回最多 10MB 的資料(限制是 5 秒)或最多 10000 條records. 每個shard每秒最多 5 個 GetRecords API calls等於 200ms 延遲. 例如我們有5個 consumer Application,要從同一個shard從取得資料(pull mode)。意味著每個consumer 需要一秒鐘(200ms x 5 consumer)才能取到資料,並且資料會少於400 KB/s (2MB / 5 consumer)。如下示意圖
Kinesis Client Library(KCL)
這可以用在多種開發語言上,例如JAVA, Go, Python, Ruby, .NET等等。會從使用 KPL 生成的 Kinesis 中讀取records(de-aggregation)。另外KCL會使用稱為shard discovery的方式來將 consumer與shard分群。而Checkingpoint的功能是為了當 KCL的作業失敗時可以從中斷的地方繼續進行而不用從頭再來。而為了達到這個功能我們就需要搭配其它AWS的服務 — DynamoDB來記錄資料處理到哪一段,通常一個shard是一筆row。但使用DynamoDB時我們需要注意DynamoDB的WCU/RCU是不是足夠,不然就要用 On-Demand的方式。否則我們就會看到KCL的效能降低, "ExpiredIteratorException"意味著我們要增加WCU。
Kinesis Connector Library
這是舊版的JAVA library, 若可以我們用 KCL來代替。而connector library可以將資料寫入到如圖下所示的AWS服務中
我們除了可以使用函式庫將一些資料處理的功能放在AWS EC2上面外,也可以使用AWS Lamda服務來處理資料,並將資料寫到其他地方。若使用Lamda的方式則函式庫都已經內建在Lamdba中,Lambda扮演的是一個從 KPL將 record de-agreggate的功能。所以Lambda可以當作一個lightweight的ETL工具將資料寫入到 S3/DynamoDB/Redshift/ElasticSearch與其他任何地方。
增強型的(Enhanced Fan-out)Consumer
我們上面提到這一種增強型的Consumer。由於它是push mode(透過http/2)的方式,所以沒有傳統上consumer 資料傳輸的throughput需要平均分給Application consumer的狀況,每一個consumer都可以從每個shard中得到專屬的2MB ,而這樣子的push mode也讓延遲大約只有70ms左右。