hw-kafka-client
Safe Haskell: None
Language: Haskell2010

Kafka.Consumer

Description

Module to consume messages from Kafka topics.

Here's an example of code to consume messages from a topic:

import Control.Exception (bracket)
import Control.Monad (replicateM_)
import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList ["localhost:9092"]
             <> groupId (ConsumerGroupId "consumer_example_group")
             <> noAutoCommit
             <> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
           <> offsetReset Earliest

-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
    res <- bracket mkConsumer clConsumer runHandler
    print res
    where
      mkConsumer = newConsumer consumerProps consumerSub
      clConsumer (Left err) = pure (Left err)
      clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
      runHandler (Left err) = pure (Left err)
      runHandler (Right kc) = processMessages kc

-- Example polling 10 times before stopping
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
    replicateM_ 10 $ do
      msg <- pollMessage kafka (Timeout 1000)
      putStrLn $ "Message: " <> show msg
      err <- commitAllOffsets OffsetCommit kafka
      putStrLn $ "Offsets: " <> maybe "Committed." show err
    pure $ Right ()

Documentation

data KafkaConsumer Source #

The main type for Kafka consumption, used e.g. to poll and commit messages.

Its constructor is intentionally not exposed; instead, use newConsumer to acquire a value of this type.

bitraverseM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v')) Source #

Deprecated: Not a concern of this library. Use bisequenceA <$> bimapM f g r

crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v' Source #

Deprecated: Not a concern of this library. Use bimap

crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v Source #

Deprecated: Not a concern of this library. Use first

crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v' Source #

Deprecated: Not a concern of this library. Use second

sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v) Source #

Deprecated: Not a concern of this library. Use bitraverse id pure

traverseFirst :: (Bitraversable t, Applicative f) => (k -> f k') -> t k v -> f (t k' v) Source #

Deprecated: Not a concern of this library. Use bitraverse f pure

traverseFirstM :: (Bitraversable t, Applicative f, Monad m) => (k -> m (f k')) -> t k v -> m (f (t k' v)) Source #

Deprecated: Not a concern of this library. Use bitraverse id pure <$> bitraverse f pure r

traverseM :: (Traversable t, Applicative f, Monad m) => (v -> m (f v')) -> t v -> m (f (t v')) Source #

Deprecated: Not a concern of this library. Use sequenceA <$> traverse f r
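
These replacements can be expressed directly against ConsumerRecord's Bifunctor and Bitraversable instances. A minimal sketch (the mapRecord*/traverseRecordKey names are just for illustration):

import Data.Bifunctor (bimap, first, second)
import Data.Bitraversable (bitraverse)
import Kafka.Consumer

-- crMapKV f g ~ bimap f g
mapRecord :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
mapRecord = bimap

-- crMapKey f ~ first f
mapRecordKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
mapRecordKey = first

-- crMapValue g ~ second g
mapRecordValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
mapRecordValue = second

-- traverseFirst f ~ bitraverse f pure
traverseRecordKey :: Applicative f => (k -> f k') -> ConsumerRecord k v -> f (ConsumerRecord k' v)
traverseRecordKey f = bitraverse f pure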

newtype ConsumerGroupId Source #

Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.

See Kafka documentation on consumer group
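
Since ConsumerGroupId has an IsString instance, the group ID can also be written as a string literal under OverloadedStrings. A minimal sketch (broker address and group name are placeholders):

{-# LANGUAGE OverloadedStrings #-}
import Kafka.Consumer

-- Both properties are equivalent; the second relies on the IsString instance.
propsExplicit, propsLiteral :: ConsumerProperties
propsExplicit = brokersList ["localhost:9092"]
             <> groupId (ConsumerGroupId "consumer_example_group")
propsLiteral  = brokersList ["localhost:9092"]
             <> groupId "consumer_example_group"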

Constructors

ConsumerGroupId 

Instances

IsString ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Generic ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep ConsumerGroupId

Show ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

Ord ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep ConsumerGroupId Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep ConsumerGroupId = D1 ('MetaData "ConsumerGroupId" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'True) (C1 ('MetaCons "ConsumerGroupId" 'PrefixI 'True) (S1 ('MetaSel ('Just "unConsumerGroupId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

data ConsumerRecord k v Source #

Represents a message received from Kafka (i.e. used in a consumer).
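
Since pollMessage returns records with Maybe ByteString key and value, a common first step is to decode them through the Bifunctor instance. A minimal sketch (lenient UTF-8 decoding is an arbitrary choice):

import Data.Bifunctor (bimap)
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text.Encoding as T
import qualified Data.Text.Encoding.Error as T
import Kafka.Consumer

-- Decode the raw key and value into Text; absent (Nothing) parts stay Nothing.
decodeRecord
  :: ConsumerRecord (Maybe ByteString) (Maybe ByteString)
  -> ConsumerRecord (Maybe Text) (Maybe Text)
decodeRecord = bimap (fmap decode) (fmap decode)
  where decode = T.decodeUtf8With T.lenientDecode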

Constructors

ConsumerRecord 

Fields

crTopic :: !TopicName
crPartition :: !PartitionId
crOffset :: !Offset
crTimestamp :: !Timestamp
crHeaders :: !Headers
crKey :: !k
crValue :: !v

Instances

Bifoldable ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bifold :: Monoid m => ConsumerRecord m m -> m #

bifoldMap :: Monoid m => (a -> m) -> (b -> m) -> ConsumerRecord a b -> m #

bifoldr :: (a -> c -> c) -> (b -> c -> c) -> c -> ConsumerRecord a b -> c #

bifoldl :: (c -> a -> c) -> (c -> b -> c) -> c -> ConsumerRecord a b -> c #

Bifunctor ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bimap :: (a -> b) -> (c -> d) -> ConsumerRecord a c -> ConsumerRecord b d #

first :: (a -> b) -> ConsumerRecord a c -> ConsumerRecord b c #

second :: (b -> c) -> ConsumerRecord a b -> ConsumerRecord a c #

Bitraversable ConsumerRecord Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

bitraverse :: Applicative f => (a -> f c) -> (b -> f d) -> ConsumerRecord a b -> f (ConsumerRecord c d) #

Functor (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

fmap :: (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b #

(<$) :: a -> ConsumerRecord k b -> ConsumerRecord k a #

Foldable (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

fold :: Monoid m => ConsumerRecord k m -> m #

foldMap :: Monoid m => (a -> m) -> ConsumerRecord k a -> m #

foldMap' :: Monoid m => (a -> m) -> ConsumerRecord k a -> m #

foldr :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldr' :: (a -> b -> b) -> b -> ConsumerRecord k a -> b #

foldl :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldl' :: (b -> a -> b) -> b -> ConsumerRecord k a -> b #

foldr1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

foldl1 :: (a -> a -> a) -> ConsumerRecord k a -> a #

toList :: ConsumerRecord k a -> [a] #

null :: ConsumerRecord k a -> Bool #

length :: ConsumerRecord k a -> Int #

elem :: Eq a => a -> ConsumerRecord k a -> Bool #

maximum :: Ord a => ConsumerRecord k a -> a #

minimum :: Ord a => ConsumerRecord k a -> a #

sum :: Num a => ConsumerRecord k a -> a #

product :: Num a => ConsumerRecord k a -> a #

Traversable (ConsumerRecord k) Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

traverse :: Applicative f => (a -> f b) -> ConsumerRecord k a -> f (ConsumerRecord k b) #

sequenceA :: Applicative f => ConsumerRecord k (f a) -> f (ConsumerRecord k a) #

mapM :: Monad m => (a -> m b) -> ConsumerRecord k a -> m (ConsumerRecord k b) #

sequence :: Monad m => ConsumerRecord k (m a) -> m (ConsumerRecord k a) #

Generic (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep (ConsumerRecord k v)

Methods

from :: ConsumerRecord k v -> Rep (ConsumerRecord k v) x #

to :: Rep (ConsumerRecord k v) x -> ConsumerRecord k v #

(Read k, Read v) => Read (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

(Show k, Show v) => Show (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

(Eq k, Eq v) => Eq (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep (ConsumerRecord k v) Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep (ConsumerRecord k v) = D1 ('MetaData "ConsumerRecord" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "ConsumerRecord" 'PrefixI 'True) ((S1 ('MetaSel ('Just "crTopic") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 TopicName) :*: (S1 ('MetaSel ('Just "crPartition") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 PartitionId) :*: S1 ('MetaSel ('Just "crOffset") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Offset))) :*: ((S1 ('MetaSel ('Just "crTimestamp") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Timestamp) :*: S1 ('MetaSel ('Just "crHeaders") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Headers)) :*: (S1 ('MetaSel ('Just "crKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 k) :*: S1 ('MetaSel ('Just "crValue") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 v)))))

newtype Offset Source #

A message offset in a partition

Constructors

Offset 

Fields

unOffset :: Int64

Instances

Generic Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep Offset

Methods

from :: Offset -> Rep Offset x #

to :: Rep Offset x -> Offset #

Read Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Show Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

Methods

(==) :: Offset -> Offset -> Bool #

(/=) :: Offset -> Offset -> Bool #

Ord Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Offset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Offset = D1 ('MetaData "Offset" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'True) (C1 ('MetaCons "Offset" 'PrefixI 'True) (S1 ('MetaSel ('Just "unOffset") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)))

data OffsetCommit Source #

Offset commit mode

Constructors

OffsetCommit

Forces the consumer to block until the broker's offset commit completes

OffsetCommitAsync

Offsets will be committed in a non-blocking way

Instances

Generic OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetCommit

Show OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetCommit Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetCommit = D1 ('MetaData "OffsetCommit" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "OffsetCommit" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "OffsetCommitAsync" 'PrefixI 'False) (U1 :: Type -> Type))

data OffsetReset Source #

Where to reset the offset when there is no initial offset in Kafka

See Kafka documentation on offset reset

Constructors

Earliest 
Latest 

Instances

Generic OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetReset

Show OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetReset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetReset = D1 ('MetaData "OffsetReset" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "Earliest" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Latest" 'PrefixI 'False) (U1 :: Type -> Type))

data OffsetStoreMethod Source #

Indicates the method of storing the offsets

Constructors

OffsetStoreBroker

Offsets are stored in the Kafka broker (preferred)

OffsetStoreFile FilePath OffsetStoreSync

Offsets are stored in a file (and synced to disk according to the sync policy)

Instances

Generic OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetStoreMethod

Show OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreMethod Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreMethod = D1 ('MetaData "OffsetStoreMethod" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "OffsetStoreBroker" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "OffsetStoreFile" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 FilePath) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 OffsetStoreSync)))

data OffsetStoreSync Source #

Indicates how offsets are to be synced to disk

Constructors

OffsetSyncDisable

Do not sync offsets (in Kafka: -1)

OffsetSyncImmediate

Sync immediately after each offset commit (in Kafka: 0)

OffsetSyncInterval Int

Sync after the specified interval (in milliseconds)
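
For illustration, value-level sketches of both storage methods (the file path and the 5000 ms interval are placeholders):

import Kafka.Consumer

-- Store offsets on the broker (preferred).
brokerStore :: OffsetStoreMethod
brokerStore = OffsetStoreBroker

-- Store offsets in a local file, synced to disk every 5 seconds.
fileStore :: OffsetStoreMethod
fileStore = OffsetStoreFile "/tmp/example-offsets" (OffsetSyncInterval 5000)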

Instances

Generic OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep OffsetStoreSync

Show OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreSync Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep OffsetStoreSync = D1 ('MetaData "OffsetStoreSync" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "OffsetSyncDisable" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "OffsetSyncImmediate" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "OffsetSyncInterval" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int))))

data PartitionOffset Source #

The partition offset

Instances

Generic PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep PartitionOffset

Show PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep PartitionOffset Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep PartitionOffset = D1 ('MetaData "PartitionOffset" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) ((C1 ('MetaCons "PartitionOffsetBeginning" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "PartitionOffsetEnd" 'PrefixI 'False) (U1 :: Type -> Type)) :+: (C1 ('MetaCons "PartitionOffset" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :+: (C1 ('MetaCons "PartitionOffsetStored" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "PartitionOffsetInvalid" 'PrefixI 'False) (U1 :: Type -> Type))))

data RebalanceEvent Source #

A set of events which happen during the rebalancing process

Constructors

RebalanceBeforeAssign [(TopicName, PartitionId)]

Happens before the Kafka client confirms the new assignment

RebalanceAssign [(TopicName, PartitionId)]

Happens after the new assignment is confirmed

RebalanceBeforeRevoke [(TopicName, PartitionId)]

Happens before the Kafka client confirms partition revocation

RebalanceRevoke [(TopicName, PartitionId)]

Happens after the revocation is confirmed
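
A minimal sketch of a handler that logs these events (the onRebalance name is illustrative; registering it, e.g. via a callback in ConsumerProperties, is not shown here):

import Kafka.Consumer

onRebalance :: RebalanceEvent -> IO ()
onRebalance evt = case evt of
  RebalanceBeforeAssign ps -> putStrLn $ "To be assigned: " <> show ps
  RebalanceAssign ps       -> putStrLn $ "Assigned: "       <> show ps
  RebalanceBeforeRevoke ps -> putStrLn $ "To be revoked: "  <> show ps
  RebalanceRevoke ps       -> putStrLn $ "Revoked: "        <> show ps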

Instances

Generic RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep RebalanceEvent

Show RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep RebalanceEvent Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep RebalanceEvent = D1 ('MetaData "RebalanceEvent" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) ((C1 ('MetaCons "RebalanceBeforeAssign" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [(TopicName, PartitionId)])) :+: C1 ('MetaCons "RebalanceAssign" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [(TopicName, PartitionId)]))) :+: (C1 ('MetaCons "RebalanceBeforeRevoke" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [(TopicName, PartitionId)])) :+: C1 ('MetaCons "RebalanceRevoke" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [(TopicName, PartitionId)]))))

data SubscribedPartitions Source #

The partitions a consumer is subscribed to

Constructors

SubscribedPartitions [PartitionId]

Subscribe only to the specified partitions

SubscribedPartitionsAll

Subscribe to all partitions

Instances

Generic SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep SubscribedPartitions

Show SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep SubscribedPartitions Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep SubscribedPartitions = D1 ('MetaData "SubscribedPartitions" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "SubscribedPartitions" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [PartitionId])) :+: C1 ('MetaCons "SubscribedPartitionsAll" 'PrefixI 'False) (U1 :: Type -> Type))

data Timestamp Source #

Consumer record timestamp

Instances

Generic Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep Timestamp

Read Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Show Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Timestamp Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep Timestamp = D1 ('MetaData "Timestamp" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "CreateTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Millis)) :+: (C1 ('MetaCons "LogAppendTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Millis)) :+: C1 ('MetaCons "NoTimestamp" 'PrefixI 'False) (U1 :: Type -> Type)))

data TopicPartition Source #

Kafka topic partition structure

Instances

Generic TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

Associated Types

type Rep TopicPartition

Show TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

Eq TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep TopicPartition Source # 
Instance details

Defined in Kafka.Consumer.Types

type Rep TopicPartition = D1 ('MetaData "TopicPartition" "Kafka.Consumer.Types" "hw-kafka-client-5.3.0-6F0nERM3YC6BKbbrjPfHDJ" 'False) (C1 ('MetaCons "TopicPartition" 'PrefixI 'True) (S1 ('MetaSel ('Just "tpTopicName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 TopicName) :*: (S1 ('MetaSel ('Just "tpPartition") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 PartitionId) :*: S1 ('MetaSel ('Just "tpOffset") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 PartitionOffset))))

runConsumer Source #

Arguments

:: ConsumerProperties 
-> Subscription 
-> (KafkaConsumer -> IO (Either KafkaError a))

A callback function to poll and handle messages

-> IO (Either KafkaError a) 

Deprecated: Use newConsumer/closeConsumer instead

Runs a high-level Kafka consumer. The provided callback is expected to call pollMessage when convenient.

newConsumer :: MonadIO m => ConsumerProperties -> Subscription -> m (Either KafkaError KafkaConsumer) Source #

Creates a KafkaConsumer. The consumer must be correctly released using closeConsumer.

assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Assigns the consumer to consume from the given topics, partitions, and offsets.
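
A minimal sketch assigning the consumer to partition 0 of a topic, starting from the beginning (topic name and partition are placeholders); a Nothing result indicates success:

import Kafka.Consumer

assignFromBeginning :: KafkaConsumer -> IO (Maybe KafkaError)
assignFromBeginning kc = assign kc
  [ TopicPartition
      { tpTopicName = TopicName "kafka-client-example-topic"
      , tpPartition = PartitionId 0
      , tpOffset    = PartitionOffsetBeginning
      }
  ]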

assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (Map TopicName [PartitionId])) Source #

Returns the current consumer's assignment

subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)]) Source #

Returns the current consumer's subscription

pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Pauses the specified partitions on the current consumer.

resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError Source #

Resumes the specified partitions on the current consumer.
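
A minimal sketch pausing and later resuming a single partition (topic and partition are placeholders); note that, unlike most operations here, these return a KafkaError directly rather than a Maybe KafkaError:

import Kafka.Consumer

pauseThenResume :: KafkaConsumer -> IO ()
pauseThenResume kc = do
  let parts = [(TopicName "kafka-client-example-topic", PartitionId 0)]
  pauseRes <- pausePartitions kc parts
  putStrLn $ "Pause: " <> show pauseRes
  -- ... do other work while the partition is paused ...
  resumeRes <- resumePartitions kc parts
  putStrLn $ "Resume: " <> show resumeRes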

committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieves committed offsets for the given topics and partitions.

position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition]) Source #

Retrieves current positions (the offset of the last consumed message + 1) for the currently running instance of the consumer. If the consumer hasn't received any messages for a given partition, PartitionOffsetInvalid is returned.

seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError) Source #

Deprecated: Use seekPartitions instead

Seeks to a particular offset for each provided TopicPartition

seekPartitions :: MonadIO m => KafkaConsumer -> [TopicPartition] -> Timeout -> m (Maybe KafkaError) Source #

Seeks the consumer, for each partition in the given list, to the offset specified in that TopicPartition's tpOffset field.
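
A minimal sketch seeking one partition back to the beginning, with an arbitrary 5-second timeout (names are placeholders):

import Kafka.Consumer

seekToBeginning :: KafkaConsumer -> IO (Maybe KafkaError)
seekToBeginning kc = seekPartitions kc
  [TopicPartition (TopicName "kafka-client-example-topic") (PartitionId 0) PartitionOffsetBeginning]
  (Timeout 5000)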

pollMessage Source #

Arguments

:: MonadIO m 
=> KafkaConsumer 
-> Timeout

the timeout, in milliseconds

-> m (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))

Left on error or timeout; Right on success

Polls a single message

pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO () Source #

Polls the provided kafka consumer for events.

Events will cause application provided callbacks to be called.

The Timeout argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events.

This function is called on each pollMessage and, if the runtime allows multithreading, it is also called periodically in a separate thread to ensure the callbacks are handled as soon as possible.

There is usually no need to call this function manually, except in special cases in a single-threaded environment where polling for events on each pollMessage is not frequent enough.

pollMessageBatch :: MonadIO m => KafkaConsumer -> Timeout -> BatchSize -> m [Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))] Source #

Polls up to BatchSize messages. Unlike pollMessage, this function does not return the usual "timeout" errors. An empty batch is returned when there are no messages available.

This API is not available when CallbackPollMode is set to CallbackPollModeSync.
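
A minimal sketch draining up to 100 messages per call (the batch size and timeout are arbitrary, and BatchSize is assumed to be the plain newtype wrapper over Int):

import Data.Either (partitionEithers)
import Kafka.Consumer

pollBatch :: KafkaConsumer -> IO ()
pollBatch kc = do
  batch <- pollMessageBatch kc (Timeout 1000) (BatchSize 100)
  let (errs, msgs) = partitionEithers batch
  mapM_ (putStrLn . ("Error: " <>) . show) errs
  putStrLn $ "Received " <> show (length msgs) <> " messages"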

commitOffsetMessage :: MonadIO m => OffsetCommit -> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Commits the message's offset on the broker for the message's partition.
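
A minimal sketch of per-message, at-least-once processing: poll one message, handle it, then synchronously commit its offset:

import Kafka.Consumer

processOne :: KafkaConsumer -> IO ()
processOne kc = do
  res <- pollMessage kc (Timeout 1000)
  case res of
    Left err  -> putStrLn $ "Poll error: " <> show err
    Right msg -> do
      putStrLn $ "Value: " <> show (crValue msg)
      commitRes <- commitOffsetMessage OffsetCommit kc msg
      putStrLn $ maybe "Committed." show commitRes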

commitAllOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError) Source #

Commits offsets for all currently assigned partitions.

commitPartitionsOffsets :: MonadIO m => OffsetCommit -> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Commits offsets for the specified partitions.

storeOffsets :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError) Source #

Stores the given offsets locally.

storeOffsetMessage :: MonadIO m => KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError) Source #

Stores the message's offset locally for the message's partition.

rewindConsumer :: MonadIO m => KafkaConsumer -> Timeout -> m (Maybe KafkaError) Source #

Rewinds the consumer's consume position to the last committed offsets for the current assignment. NOTE: follows https://github.com/edenhill/librdkafka/blob/master/examples/transactions.c#L166

closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError) Source #

Closes the consumer.

See newConsumer

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrIntr 
RdKafkaRespErrKeySerialization 
RdKafkaRespErrValueSerialization 
RdKafkaRespErrKeyDeserialization 
RdKafkaRespErrValueDeserialization 
RdKafkaRespErrPartial 
RdKafkaRespErrReadOnly 
RdKafkaRespErrNoent 
RdKafkaRespErrUnderflow 
RdKafkaRespErrInvalidType 
RdKafkaRespErrRetry 
RdKafkaRespErrPurgeQueue 
RdKafkaRespErrPurgeInflight 
RdKafkaRespErrFatal 
RdKafkaRespErrInconsistent 
RdKafkaRespErrGaplessGuarantee 
RdKafkaRespErrMaxPollExceeded 
RdKafkaRespErrUnknownBroker 
RdKafkaRespErrNotConfigured 
RdKafkaRespErrFenced 
RdKafkaRespErrApplication 
RdKafkaRespErrAssignmentLost 
RdKafkaRespErrNoop 
RdKafkaRespErrAutoOffsetReset 
RdKafkaRespErrLogTruncation 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrCoordinatorLoadInProgress 
RdKafkaRespErrCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinator 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrPolicyViolation 
RdKafkaRespErrOutOfOrderSequenceNumber 
RdKafkaRespErrDuplicateSequenceNumber 
RdKafkaRespErrInvalidProducerEpoch 
RdKafkaRespErrInvalidTxnState 
RdKafkaRespErrInvalidProducerIdMapping 
RdKafkaRespErrInvalidTransactionTimeout 
RdKafkaRespErrConcurrentTransactions 
RdKafkaRespErrTransactionCoordinatorFenced 
RdKafkaRespErrTransactionalIdAuthorizationFailed 
RdKafkaRespErrSecurityDisabled 
RdKafkaRespErrOperationNotAttempted 
RdKafkaRespErrKafkaStorageError 
RdKafkaRespErrLogDirNotFound 
RdKafkaRespErrSaslAuthenticationFailed 
RdKafkaRespErrUnknownProducerId 
RdKafkaRespErrReassignmentInProgress 
RdKafkaRespErrDelegationTokenAuthDisabled 
RdKafkaRespErrDelegationTokenNotFound 
RdKafkaRespErrDelegationTokenOwnerMismatch 
RdKafkaRespErrDelegationTokenRequestNotAllowed 
RdKafkaRespErrDelegationTokenAuthorizationFailed 
RdKafkaRespErrDelegationTokenExpired 
RdKafkaRespErrInvalidPrincipalType 
RdKafkaRespErrNonEmptyGroup 
RdKafkaRespErrGroupIdNotFound 
RdKafkaRespErrFetchSessionIdNotFound 
RdKafkaRespErrInvalidFetchSessionEpoch 
RdKafkaRespErrListenerNotFound 
RdKafkaRespErrTopicDeletionDisabled 
RdKafkaRespErrFencedLeaderEpoch 
RdKafkaRespErrUnknownLeaderEpoch 
RdKafkaRespErrUnsupportedCompressionType 
RdKafkaRespErrStaleBrokerEpoch 
RdKafkaRespErrOffsetNotAvailable 
RdKafkaRespErrMemberIdRequired 
RdKafkaRespErrPreferredLeaderNotAvailable 
RdKafkaRespErrGroupMaxSizeReached 
RdKafkaRespErrFencedInstanceId 
RdKafkaRespErrEligibleLeadersNotAvailable 
RdKafkaRespErrElectionNotNeeded 
RdKafkaRespErrNoReassignmentInProgress 
RdKafkaRespErrGroupSubscribedToTopic 
RdKafkaRespErrInvalidRecord 
RdKafkaRespErrUnstableOffsetCommit 
RdKafkaRespErrThrottlingQuotaExceeded 
RdKafkaRespErrProducerFenced 
RdKafkaRespErrResourceNotFound 
RdKafkaRespErrDuplicateResource 
RdKafkaRespErrUnacceptableCredential 
RdKafkaRespErrInconsistentVoterSet 
RdKafkaRespErrInvalidUpdateVersion 
RdKafkaRespErrFeatureUpdateFailed 
RdKafkaRespErrPrincipalDeserializationFailure 
RdKafkaRespErrEndAll