Combine Pipeline

Updated:

Combine Advanced

Combine pipelines

Most of the time, a Combine pipeline just fetches some data, comes back with an array, and publishes that array into your app.

But one of the most important parts of Combine is that these publishers and subscribers can stay alive even past the first publish.

This chapter builds a fake publisher that publishes values every couple of seconds, and then we manipulate that data with some really cool, unique subscribers.

  • Basic Setup

When you download data from an API, it returns an array of data, and we can take that data and put it on the screen. (This looks like a regular GET API call: you fetch some data, and once it returns you are done — you have all of your data. That is the most common kind of fetch request.)

import SwiftUI
import Combine

// MARK: -  FETCHDATA
/// Fake data service that publishes a single array of strings shortly after it is created.
class AdvancedCombineDataService {
  // MARK: - PROPERTY
  /// Latest array of values; observers subscribe through `$basicPublisher`.
  @Published var basicPublisher: [String] = []

  // MARK: - INIT
  init() {
    publishFakeData()
  }

  // MARK: - FUNCTION
  /// Simulates an API response: assigns the array on the main queue one second from now.
  private func publishFakeData() {
    let fireTime: DispatchTime = .now() + 1.0
    DispatchQueue.main.asyncAfter(deadline: fireTime) {
      self.basicPublisher = ["one", "two", "three"]
    }
  }
}


// MARK: -  VIEWMODEL

/// View model that mirrors the data service's published array into `data`.
class AdvancedComineViewModel: ObservableObject {
  // MARK: - PROPERTY
  @Published var data: [String] = []
  /// Service instance created and owned by this view model.
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: - INIT
  init() {
    addSubscribers()
  }

  // MARK: - FUNCTION
  /// Observes `$basicPublisher` and replaces `data` with every array it emits.
  private func addSubscribers() {
    let subscription = dataService.$basicPublisher
      .sink(
        receiveCompletion: { completion in
          switch completion {
          case .failure(let error):
            print("ERROR: \(error.localizedDescription)")
          case .finished:
            break
          }
        },
        receiveValue: { [weak self] returnedValue in
          self?.data = returnedValue
        }
      )
    // Keep the subscription alive for as long as the view model lives.
    subscription.store(in: &cancellables)
  }
}

// MARK: -  VIEW
/// Screen that lists every string held by the view model.
struct AdvancedCombineBootCamp: View {
  // MARK: - PROPERTY
  @StateObject private var vm = AdvancedComineViewModel()

  // MARK: - BODY
  var body: some View {
    ScrollView {
      VStack {
        // One large, heavy-weight label per published value.
        ForEach(vm.data, id: \.self) { item in
          Text(item)
            .font(.largeTitle)
            .fontWeight(.black)
        }
      }
    }
  }
}

image

In the code above, the publisher stays alive after its first publish. In this example we get just one publish and then we are finished, but the publisher is still alive. So if anything were ever published here again, our whole flow would update, which is great.


Next, instead of holding the whole array here, let’s convert this to publish individual values.

There are two types of publishers (subjects) in Combine. The first is called a current-value publisher: if we didn’t have the @Published property wrapper, the way to write this would be to create a constant of type CurrentValueSubject, which we call currentValuePublisher. It behaves like the basic publisher above.

The second one is a passthrough publisher (PassthroughSubject). We don’t give it a starting value, because a PassthroughSubject works exactly the same way as the current-value publisher except that it does not actually hold a current value. So a PassthroughSubject can be a little more memory efficient: if the value you were holding was, say, an array of images, the stored current value could take up a lot of memory in your app. You don’t actually need to hold the current value in the publisher, because after we publish we are holding the values in the data array down here.

import SwiftUI
import Combine

// MARK: -  FETCHDATA
/// Fake data service exposing both subject flavours; publishes one string per second.
class AdvancedCombineDataService {
  // MARK: - PROPERTY
  // @Published var basicPublisher: String = "first publish"
  /// Subject created with the initial value "first publish".
  let currentValuePublisher = CurrentValueSubject<String, Error>("first publish")
  /// Subject created without any initial value.
  let passThroughPublisher = PassthroughSubject<String, Error>()

  // MARK: - INIT
  init() {
    publishFakeData()
  }

  // MARK: - FUNCTION
  /// Schedules one send per item on the main queue, delayed by the item's position in seconds.
  private func publishFakeData() {
    let items = ["one", "two", "three"]

    for (offset, item) in items.enumerated() {
      DispatchQueue.main.asyncAfter(deadline: .now() + Double(offset)) {
        // self.currentValuePublisher.send(item)
        self.passThroughPublisher.send(item)
      }
    }
  }
}

// MARK: -  VIEWMODEL

/// View model that appends every value sent by the passthrough publisher to `data`.
class AdvancedComineViewModel: ObservableObject {
  // MARK: - PROPERTY
  @Published var data: [String] = []
  /// Service instance created and owned by this view model.
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: - INIT
  init() {
    addSubscribers()
  }

  // MARK: - FUNCTION
  /// Subscribes to `passThroughPublisher`; failures are printed, values are appended.
  private func addSubscribers() {
    // dataService.currentValuePublisher
    dataService.passThroughPublisher
      .sink(
        receiveCompletion: { completion in
          if case .failure(let error) = completion {
            print("ERROR: \(error.localizedDescription)")
          }
        },
        receiveValue: { [weak self] returnedValue in
          self?.data.append(returnedValue)
        }
      )
      .store(in: &cancellables)
  }
}

// MARK: -  VIEW
/// Screen that lists every string held by the view model.
struct AdvancedCombineBootCamp: View {
  // MARK: - PROPERTY
  @StateObject private var vm = AdvancedComineViewModel()

  // MARK: - BODY
  var body: some View {
    ScrollView {
      VStack {
        // One large, heavy-weight label per published value.
        ForEach(vm.data, id: \.self) { item in
          Text(item)
            .font(.largeTitle)
            .fontWeight(.black)
        }
      }
    }
  }
}

스크린샷

Sequence Operations

  • first() : the only value that comes through the rest of our pipeline is the first publish
import SwiftUI
import Combine

// MARK: -  FETCHDATA
/// Fake data service that publishes the integers 0 through 10, one per second.
class AdvancedCombineDataService {
  // MARK: - PROPERTY
  // @Published var basicPublisher: String = "first publish"
  /// Subject created with the initial value "first publish".
  let currentValuePublisher = CurrentValueSubject<String, Error>("first publish")
  /// Subject created without any initial value; carries the integers.
  let passThroughPublisher = PassthroughSubject<Int, Error>()

  // MARK: - INIT
  init() {
    publishFakeData()
  }

  // MARK: - FUNCTION
  /// Schedules one send per item on the main queue, delayed by the item's position in seconds.
  private func publishFakeData() {
    let items: [Int] = Array(0..<11)

    for (offset, item) in items.enumerated() {
      DispatchQueue.main.asyncAfter(deadline: .now() + Double(offset)) {
        // self.currentValuePublisher.send(item)
        self.passThroughPublisher.send(item)
      }
    }
  }
}


// MARK: -  VIEWMODEL

/// View model for the `first()` example: only the first published value reaches `data`.
class AdvancedComineViewModel: ObservableObject {
  // MARK: - PROPERTY
  @Published var data: [String] = []
  /// Service instance created and owned by this view model.
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: - INIT
  init() {
    addSubscribers()
  }

  // MARK: - FUNCTION
  /// Subscribes to `passThroughPublisher`, keeps the first value, and appends it as a `String`.
  private func addSubscribers() {
    // dataService.currentValuePublisher
    dataService.passThroughPublisher
      // Sequence operation: the only value that comes through the rest of the
      // pipeline is the first publish. (This explanation was previously written
      // as bare text after `.first()`, which does not compile.)
      .first()
      .map { String($0) }
      .sink { completion in
        switch completion {
        case .finished:
          break
        case .failure(let error):
          print("ERROR: \(error.localizedDescription)")
        }
      } receiveValue: { [weak self] returnedValue in
        self?.data.append(returnedValue)
      }
      .store(in: &cancellables)
  }
}

// MARK: -  VIEW
/// Screen that lists every string held by the view model.
struct AdvancedCombineBootCamp: View {
  // MARK: - PROPERTY
  @StateObject private var vm = AdvancedComineViewModel()

  // MARK: - BODY
  var body: some View {
    ScrollView {
      VStack {
        // One large, heavy-weight label per published value.
        ForEach(vm.data, id: \.self) { item in
          Text(item)
            .font(.largeTitle)
            .fontWeight(.black)
        }
      }
    }
  }
}

image

  • .first(where: ) : the only value that comes through the rest of our pipeline is the first publish that satisfies the closure (in the example below, the first value greater than 4)
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, keeps the first value greater than 4, and appends it as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: pick out only the first publish that matches the condition.
    // (Plain `.first()` would instead let only the very first publish through.)
    .first(where: { value in value > 4 })
    .map { String($0) }
    .sink(
      receiveCompletion: { completion in
        if case .failure(let error) = completion {
          print("ERROR: \(error.localizedDescription)")
        }
      },
      receiveValue: { [weak self] returnedValue in
        self?.data.append(returnedValue)
      }
    )
    .store(in: &cancellables)
}

image

  • tryFirst(where: ) : It publishes the first item that satisfies the closure. The `try` tells us that this closure can throw errors, so if there is a reason to — maybe we get a certain publish — we can throw an error and show it on the screen or something like that.
// MARK: -  VIEWMODEL

/// View model for the `tryFirst(where:)` example: stores the first matching value, or the thrown error.
class AdvancedComineViewModel: ObservableObject {
  // MARK: - PROPERTY
  @Published var data: [String] = []
  /// Error text; stays empty until the pipeline completes with a failure.
  @Published var error: String = ""
  /// Service instance created and owned by this view model.
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: - INIT
  init() {
    addSubscribers()
  }

  // MARK: - FUNCTION
  /// Subscribes to `passThroughPublisher`; throws when the value 3 arrives, otherwise
  /// keeps the first value greater than 4 and appends it as a `String`.
  private func addSubscribers() {
    // dataService.currentValuePublisher
    dataService.passThroughPublisher
      // Sequence operation: the predicate may throw, which fails the pipeline.
      .tryFirst(where: {
        if $0 == 3 {
          throw URLError(.badServerResponse)
        }
        return $0 > 4
      })
      .map { String($0) }
      .sink { [weak self] completion in
        // Weak capture, matching `receiveValue`: the subscription is stored in
        // `cancellables`, so a strong `self` here would form a retain cycle.
        switch completion {
        case .finished:
          break
        case .failure(let error):
          self?.error = "ERROR: \(error)"
        }
      } receiveValue: { [weak self] returnedValue in
        self?.data.append(returnedValue)
      }
      .store(in: &cancellables)
  }
}

// MARK: -  VIEW
/// Screen that lists the view model's values, followed by its error text when there is one.
struct AdvancedCombineBootCamp: View {
  // MARK: - PROPERTY
  @StateObject private var vm = AdvancedComineViewModel()

  // MARK: - BODY
  var body: some View {
    ScrollView {
      VStack {
        // One large, heavy-weight label per published value.
        ForEach(vm.data, id: \.self) { item in
          Text(item)
            .font(.largeTitle)
            .fontWeight(.black)
        }
        // The error line appears only when the error string is non-empty.
        if vm.error.isEmpty == false {
          Text(vm.error)
        }
      }
    }
  }
}

image

  • last() : publishes the last item that gets published. But for last() we need to know somehow when the publisher is finished — which publish is the last one — because when we subscribe to the publisher it basically stays alive forever until we get some sort of completion
	// MARK: -  FUNCTION
/// Publishes 0 through 10 on the main queue, one value per second, then finishes the subject.
private func publishFakeData() {
  let items: [Int] = Array(0..<11)
  let finalIndex = items.indices.last

  for (index, item) in items.enumerated() {
    DispatchQueue.main.asyncAfter(deadline: .now() + Double(index)) {
      // self.currentValuePublisher.send(item)
      self.passThroughPublisher.send(item)

      // Send a completion after the final item so that last() can fire.
      if index == finalIndex {
        self.passThroughPublisher.send(completion: .finished)
      }
    }
  }
}
}

// MARK: -  VIEWMODEL

/// View model for the `last()` example: stores the final published value, or the failure text.
class AdvancedComineViewModel: ObservableObject {
  // MARK: - PROPERTY
  @Published var data: [String] = []
  /// Error text; stays empty until the pipeline completes with a failure.
  @Published var error: String = ""
  /// Service instance created and owned by this view model.
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: - INIT
  init() {
    addSubscribers()
  }

  // MARK: - FUNCTION
  /// Subscribes to `passThroughPublisher`, keeps only the last value, and appends it as a `String`.
  private func addSubscribers() {
    // dataService.currentValuePublisher
    dataService.passThroughPublisher
      // Sequence operation: needs the upstream completion to know which value is last.
      .last()
      .map { String($0) }
      .sink { [weak self] completion in
        // Weak capture, matching `receiveValue`: the subscription is stored in
        // `cancellables`, so a strong `self` here would form a retain cycle.
        switch completion {
        case .finished:
          break
        case .failure(let error):
          self?.error = "ERROR: \(error)"
        }
      } receiveValue: { [weak self] returnedValue in
        self?.data.append(returnedValue)
      }
      .store(in: &cancellables)
  }
}

image

  • last(where: {$0 < 4}) : This works through 0 ~ 10, considers only the values less than 4, and then, once the publisher finishes, shows the last of them (3) on the screen
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, keeps the last value below 4, and appends it as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: last published value that satisfies the predicate.
    .last(where: { $0 < 4 })
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
}

image

  • dropFirst() : literally the same thing our normal flow does, except we don’t get the first publish coming through. dropFirst() can be handy if we are using the basic publisher or currentValuePublisher and gave it a starting value — maybe a blank string — and we don’t want to publish that first blank string
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, skips the first value, and appends the rest as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: the first publish does not come through.
    .dropFirst()
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
}

스크린샷

// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, skips the first three values, and appends the rest as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: drop the publishes 0, 1, 2 -> start from 3 up to 10.
    .dropFirst(3)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • drop(while: ) : Omits elements from the upstream publisher until a given closure returns false, before republishing all remaining elements.
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, drops values while they are below 5, and appends the rest as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: while the value is less than five we continue to drop it.
    .drop(while: { $0 < 5 })
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • prefix(_ maxLength:) : Republishes elements up to specified maximum count.
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, keeps the first four values, and appends them as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: only the first 4 publishes are shown.
    .prefix(4)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • prefix(while: ) - The publisher finishes when the closure returns false
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, republishes values while they are below 5, and appends them as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: the publisher finishes when the closure returns false.
    .prefix(while: { $0 < 5 })
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • .output(at: Int) : do not confuse this integer with the actual integers we are publishing; if we were publishing strings or some custom data type it would still be an integer. It looks for the item that gets published at this index
/// Subscribes to `passThroughPublisher`, keeps the value published at index 5, and appends it as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: publish only the item at index 5.
    .output(at: 5)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

  • output(in: RangeExpression) : publishes the items whose indices fall within the given range
/// Subscribes to `passThroughPublisher`, keeps the values at indices 2 and 3, and appends them as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Sequence operation: indices 2 up to (not including) 4.
    .output(in: 2..<4)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷


Mathematic Operations

  • .max() : the published values are integers, which support mathematical operations, so we can find the maximum of the numbers in the publisher
	// MARK: -  FUNCTION
	/// Subscribes to `passThroughPublisher`, keeps the maximum value, and appends it as a `String`.
	private func addSubscribers() {
		// dataService.currentValuePublisher
		dataService.passThroughPublisher
			// Mathematic operation: maximum of the published integers.
			.max()
			.map { String($0) }
			.sink { [weak self] completion in
				// Weak capture, matching `receiveValue`: the subscription is stored in
				// `cancellables`, so a strong `self` here would form a retain cycle.
				switch completion {
				case .finished:
					break
				case .failure(let error):
					self?.error = "ERROR: \(error)"
				}
			} receiveValue: { [weak self] returnedValue in
				self?.data.append(returnedValue)
			}
			.store(in: &cancellables)
	}
}

image

  • .max(by:) : publishes the maximum value of the elements received from the upstream publisher, based on an ordering closure you specify. The values are compared step by step and the maximum number is returned
// MARK: -  FUNCTION
/// Publishes the fixed list below on the main queue, one value per second, then finishes the subject.
private func publishFakeData() {
  let items: [Int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 10]
  let finalIndex = items.indices.last

  for (index, item) in items.enumerated() {
    DispatchQueue.main.asyncAfter(deadline: .now() + Double(index)) {
      // self.currentValuePublisher.send(item)
      self.passThroughPublisher.send(item)

      // Send a completion after the final item (added to demonstrate last()).
      if index == finalIndex {
        self.passThroughPublisher.send(completion: .finished)
      }
    }
  }
}
}
// MARK: -  VIEWMODEL
/// View model for the `max(by:)` example: stores the maximum published value, or the failure text.
class AdvancedComineViewModel: ObservableObject {
  // MARK: - PROPERTY
  @Published var data: [String] = []
  /// Error text; stays empty until the pipeline completes with a failure.
  @Published var error: String = ""
  /// Service instance created and owned by this view model.
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: - INIT
  init() {
    addSubscribers()
  }

  // MARK: - FUNCTION
  /// Subscribes to `passThroughPublisher`, keeps the maximum according to the ordering
  /// closure, and appends it as a `String`.
  private func addSubscribers() {
    // dataService.currentValuePublisher
    dataService.passThroughPublisher
      // Mathematic operation: the closure returns true when its first argument orders before the second.
      .max(by: { int1, int2 in
        int1 < int2
      })
      .map { String($0) }
      .sink { [weak self] completion in
        // Weak capture, matching `receiveValue`: the subscription is stored in
        // `cancellables`, so a strong `self` here would form a retain cycle.
        switch completion {
        case .finished:
          break
        case .failure(let error):
          self?.error = "ERROR: \(error)"
        }
      } receiveValue: { [weak self] returnedValue in
        self?.data.append(returnedValue)
      }
      .store(in: &cancellables)
  }
}

image

  • min() : Same as max(), except it finds the minimum value
// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, keeps the minimum value, and appends it as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Mathematic operation: minimum of the published integers.
    .min()
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

스크린샷


Filter, Reducing Operations

  • map() : we can transform values to a certain type; here we are getting an integer and mapping it to a string
	// MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, converts each integer to a `String`, and appends it to `data`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: Int -> String.
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
  • tryMap(transform: (Int) throws -> T) : Same as map(), except that the closure can throw an error while transforming the integer
/// Subscribes to `passThroughPublisher`, maps each integer to a `String` (throwing on 5),
/// and appends the results to `data`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: a thrown error fails the pipeline.
    .tryMap { int -> String in
      if int == 5 {
        throw URLError(.badServerResponse)
      }
      return String(int)
    }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

  • compactMap(transform: (Int) -> T?) : compact mapping is basically the same thing as mapping, but if something doesn’t work we can just ignore that value — for example, something that could not be converted to the type we want. This is handy if we have some raw type coming through from the API and want to try to map it to our local type; if that fails, we just want to ignore that value and continue trying to map the rest of the publishes
	/// Subscribes to `passThroughPublisher`, maps each integer to a `String` while ignoring 5,
	/// and appends the results to `data`.
	private func addSubscribers() {
		// dataService.currentValuePublisher
		dataService.passThroughPublisher
			// Filter / reducing operation: returning nil drops the value instead of failing.
			.compactMap { int -> String? in
				if int == 5 {
					return nil
				}
				return String(int)
			}
			.sink { [weak self] completion in
				// Weak capture, matching `receiveValue`: the subscription is stored in
				// `cancellables`, so a strong `self` here would form a retain cycle.
				switch completion {
				case .finished:
					break
				case .failure(let error):
					self?.error = "ERROR: \(error)"
				}
			} receiveValue: { [weak self] returnedValue in
				self?.data.append(returnedValue)
			}
			.store(in: &cancellables)
	}

스크린샷

  • filter(isIncluded: (Int) -> Bool) : filter is pretty self-explanatory. We are going to filter for the values between three and seven
/// Subscribes to `passThroughPublisher`, keeps values strictly between 3 and 7, and appends them as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: show values greater than 3 and less than 7.
    .filter { ($0 > 3) && ($0 < 7) }
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

  • tryFilter(isIncluded: (Int) throws -> Bool) : tryFilter can then throw an error if you want

  • removeDuplicates() : This is a really useful one: if two values that get published back to back are exactly the same value, we can remove one of them because it is a duplicate. If you have a really complex app where a bunch of different things might be forcing publishes into this pipeline, you will probably have situations where two things cause the publisher to emit the same value.

removeDuplicates() only considers back-to-back publishes: if the equal values are not back to back, they are not considered duplicates.

/// Publishes a list containing repeated values (4, 4, 4 back to back and a later, separate 4)
/// to demonstrate `removeDuplicates()`.
private func publishFakeData() {
  let items: [Int] = [1, 2, 3, 4, 4, 4, 5, 6, 7, 4, 8, 9, 11, 10]

  // Same publishing loop as in the earlier examples: one value per second,
  // followed by a completion after the final item.
  for (index, item) in items.enumerated() {
    DispatchQueue.main.asyncAfter(deadline: .now() + Double(index)) {
      self.passThroughPublisher.send(item)

      if index == items.count - 1 {
        self.passThroughPublisher.send(completion: .finished)
      }
    }
  }
}

/// Subscribes to `passThroughPublisher`, removes back-to-back duplicates, and appends the values as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: only consecutive equal values count as duplicates.
    .removeDuplicates()
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • removeDuplicates(by:) : the closure receives the first and second publishes, so you can insert your own logic to compare the two of them (plain removeDuplicates() only removes values that are exactly equal)

  • replaceNil(with: T) : if a value that gets published is nil, we can replace it with this specified value, of whatever type we want to publish, instead of nil

/// Fake data service for the `replaceNil(with:)` example: publishes optional integers, some of them nil.
class AdvancedCombineDataService {
  // MARK: - PROPERTY
  // @Published var basicPublisher: String = "first publish"
  /// Subject created with the initial value "first publish".
  let currentValuePublisher = CurrentValueSubject<String, Error>("first publish")
  /// Subject created without any initial value; carries optional integers.
  let passThroughPublisher = PassthroughSubject<Int?, Error>()

  // MARK: - INIT
  init() {
    publishFakeData()
  }

  // MARK: - FUNCTION
  /// Publishes the list below on the main queue, one value per second, then finishes the subject.
  private func publishFakeData() {
    // The original literal had an empty slot between `nil` and `4` (a syntax error);
    // 3 is restored there, matching the 1...11 list used by the other examples
    // with 2 and 8 replaced by nil.
    let items: [Int?] = [1, nil, 3, 4, 5, 6, 7, nil, 9, 11, 10]

    for (index, item) in items.enumerated() {
      DispatchQueue.main.asyncAfter(deadline: .now() + Double(index)) {
        self.passThroughPublisher.send(item)

        if index == items.count - 1 {
          self.passThroughPublisher.send(completion: .finished)
        }
      }
    }
  }
}


  // MARK: -  FUNCTION
/// Subscribes to `passThroughPublisher`, substitutes 5 for nil values, and appends the values as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: nil publishes are replaced with 5.
    .replaceNil(with: 5)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • replaceEmpty(with: Int) : Similarly, we can also replace empty rather than nil: if the publisher finishes without publishing anything (an empty stream), we can replace that with some default value

  • replaceError(with: Int) : if somewhere in the pipeline we get an error, we can replace it with the default value

  • scan(initialResult: T, nextPartialResult: (T, Int) -> T) : Accumulates all previously published values into a single value. Here the starting number is 1, and each publish adds the next value from the array to the running sum

/// Subscribes to `passThroughPublisher`, publishes a running sum that starts at 1,
/// and appends each partial sum to `data` as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: accumulate previously published values.
    .scan(1, { existingValue, newValue in
      existingValue + newValue
    })
    // .scan(1, { $0 + $1 }) // short version
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

  • reduce() : Same logic as scan(), but reduce combines all of the publishes into one single ending value (just one published value)
/// Subscribes to `passThroughPublisher`, sums all values starting from 0, and appends the single total as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: one single ending value.
    .reduce(0, { $0 + $1 })
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

  • collect() : This collects all of the publishes and then publishes them all at once. Once the pipeline has finished, it publishes them all to the screen
	/// Subscribes to `passThroughPublisher`, collects every value into one array of `String`s,
	/// and assigns that array to `data`.
	private func addSubscribers() {
		// dataService.currentValuePublisher
		dataService.passThroughPublisher
			// Filter / reducing operation
			.map { String($0) }
			.collect() // collect all of the published values in one array
			.sink { [weak self] completion in
				// Weak capture, matching `receiveValue`: the subscription is stored in
				// `cancellables`, so a strong `self` here would form a retain cycle.
				switch completion {
				case .finished:
					break
				case .failure(let error):
					self?.error = "ERROR: \(error)"
				}
			} receiveValue: { [weak self] returnedValue in
				// The received value is now a whole array, so assign instead of append.
				self?.data = returnedValue
				// self?.data.append(returnedValue)
			}
			.store(in: &cancellables)
	}

image

  • collect(count: Int) : collects 3 publishes at a time and then displays them
/// Subscribes to `passThroughPublisher`, groups the values into arrays of three `String`s,
/// and appends each group's contents to `data`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation
    .map { String($0) }
    // .collect() // collect all of the published values in one array
    .collect(3) // collect 3 publishes, then display them
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(contentsOf: returnedValue)
      // self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷

  • allSatisfy(predicate: (Int) -> Bool) : publishes a single Boolean value that indicates whether all of the items that get published satisfy this criterion
/// Subscribes to `passThroughPublisher`, checks whether every value equals 5,
/// and appends the resulting Boolean to `data` as a `String`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Filter / reducing operation: a single Bool result.
    .allSatisfy { $0 == 5 }
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

스크린샷


Timing Operation

  • debounce() : We give debounce a specified amount of time. Debounce watches all of the publishes that come through the publisher, and it only publishes a value through here if at least that much time (0.75 seconds in the example below) passes between publishes

For example, we are debouncing for 0.75 seconds: a publish goes through the pipeline, but then debounce waits 0.75 seconds to see if there is another publish

The debounce operation is commonly used when working with text fields. Every time a user types a letter in a text field it probably publishes a value, and you can imagine that if the user types 10 letters quickly, your entire pipeline runs 10 times, once for each value. Instead, you can add a little debounce

/// Publishes 1, 2 and 3 on the main queue at 0 s, 0.5 s and 1.5 s, so the gaps between publishes differ.
private func publishFakeData() {
  // (delay in seconds, value) pairs. The `items` array that used to be declared
  // here was never read, so it has been removed.
  let schedule: [(delay: Double, value: Int)] = [(0, 1), (0.5, 2), (1.5, 3)]

  for entry in schedule {
    DispatchQueue.main.asyncAfter(deadline: .now() + entry.delay) {
      self.passThroughPublisher.send(entry.value)
    }
  }
}
}

/// Subscribes to `passThroughPublisher`, debounces by 0.75 seconds on the main queue,
/// and appends the surviving values as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Timing operation
    .debounce(for: 0.75, scheduler: DispatchQueue.main)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
}

image

  • delay() : delays the publishes coming through the pipeline by the given amount of time (2 seconds in the example below)
/// Subscribes to `passThroughPublisher`, delays by 2 seconds on the main queue,
/// and appends the values as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Timing operation
    .delay(for: 2, scheduler: DispatchQueue.main)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
  • measureInterval() : shows how much time passes between the publishes coming through the pipeline
/// Subscribes to `passThroughPublisher`, measures the interval between events on the main queue,
/// and appends each interval's textual form to `data`.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Timing operation
    .measureInterval(using: DispatchQueue.main)
    // The interpolation already yields a String, so the former second
    // `.map({ String($0) })` was a no-op and has been removed.
    .map { stride in
      "\(stride.timeInterval)"
    }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
}

image

  • throttle() : After the first publish we close the throttle, and we don’t open it back up until the interval has passed. Use this if you don’t want to publish constantly as soon as you get values — maybe you want to open the throttle only every five or ten seconds. This is a good way to batch your publishes if you need to
	/// Subscribes to `passThroughPublisher`, throttles to one value per 5 seconds (latest value),
	/// and appends the values as `String`s.
	private func addSubscribers() {
		// dataService.currentValuePublisher
		dataService.passThroughPublisher
			// Timing operation
			.throttle(for: 5, scheduler: DispatchQueue.main, latest: true)
			.map { String($0) }
			.sink { [weak self] completion in
				// Weak capture, matching `receiveValue`: the subscription is stored in
				// `cancellables`, so a strong `self` here would form a retain cycle.
				switch completion {
				case .finished:
					break
				case .failure(let error):
					self?.error = "ERROR: \(error)"
				}
			} receiveValue: { [weak self] returnedValue in
				self?.data.append(returnedValue)
			}
			.store(in: &cancellables)
	}

스크린샷

  • retry() : If this were an API call, we probably wouldn’t do it on the subscriber; we would probably do it directly on the publisher that is fetching our data. But if that publisher came back with an error, we could retry: if you get an error, retry and make that request one more time to see whether it comes back with a successful result
/// Subscribes to `passThroughPublisher` with up to 3 retries on failure,
/// and appends the values as `String`s.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Timing operation
    .retry(3)
    .map { String($0) }
    .sink { [weak self] completion in
      // Weak capture, matching `receiveValue`: the subscription is stored in
      // `cancellables`, so a strong `self` here would form a retain cycle.
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
  • timeout() : We consider the publisher to have timed out, and we stop listening and terminate publishing, if the time between values exceeds the given interval. If you are making external API calls, maybe you are downloading something from the Internet and that download is taking a really long time. You don’t want the app to wait forever for the fetch request to return, so you would add this operator instead.
/// Subscribes to the data service's pass-through publisher and appends every value
/// to `data` as a string, terminating the pipeline when no value arrives within
/// 0.75 seconds.
///
/// Both sink closures capture `self` weakly: the subscription is stored in
/// `cancellables`, which `self` owns, so a strong capture would create a retain cycle.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // Timing Operations
    // No `customError` is supplied, so per the Combine documentation a timeout
    // completes the pipeline normally (`.finished`) rather than with a failure.
    .timeout(0.75, scheduler: DispatchQueue.main)
    .map { String($0) }
    .sink { [weak self] completion in
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

Multiple Publishers and Subscribers

  • combineLatest() : We only want to publish the Int value if the latest value from our Bool publisher is true
/// Fake data source for the Combine demos: emits a fixed list of integers, one per
/// second, and drives two companion subjects alongside it.
class AdvancedCombineDataService {
  // MARK: -  PROPERTY
  // @Published var basicPublisher: String = "first publish"
  let currentValuePublisher = CurrentValueSubject<String, Error>("first publish")
  let passThroughPublisher = PassthroughSubject<Int, Error>()
  let boolPublisher = PassthroughSubject<Bool, Error>()
  let intPublisher = PassthroughSubject<Int, Error>()

  // MARK: -  INIT
  init() {
    publishFakeData()
  }

  // MARK: -  FUNCTION
  /// Schedules one emission per item on the main queue; the item at offset `k`
  /// is sent `k` seconds from now.
  private func publishFakeData() {
    let items: [Int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 10]
    let finalOffset = items.count - 1

    for (offset, item) in items.enumerated() {
      let delay = Double(offset)
      DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
        // self.currentValuePublisher.send(item)
        self.passThroughPublisher.send(item)

        // boolPublisher is true only for offsets 5, 6 and 7;
        // intPublisher emits only inside that same window.
        let isInsideWindow = (5...7).contains(offset)
        self.boolPublisher.send(isInsideWindow)
        if isInsideWindow {
          self.intPublisher.send(999)
        }

        // Finish the main publisher after the last item so that operators
        // such as last() have a completion to react to.
        if offset == finalOffset {
          self.passThroughPublisher.send(completion: .finished)
        }
      }
    }
  }
}
// MARK: -  VIEWMODEL
/// View model for the combineLatest demo: exposes the strings produced by the
/// pipeline (`data`) and the text of any failure (`error`).
class AdvancedComineViewModel: ObservableObject {
  // MARK: -  PROPERTY
  @Published var data: [String] = []
  @Published var error: String = ""
  // A fresh service instance owned by this view model (not a shared singleton).
  let dataService = AdvancedCombineDataService()

  var cancellables = Set<AnyCancellable>()

  // MARK: -  INIT
  init() {
    addSubscribers()
  }

  // MARK: -  FUNCTION
  /// Combines the latest values of the three service publishers and appends the
  /// integer (as a string) while the latest Bool is `true`, or "n/a" otherwise.
  ///
  /// Both sink closures capture `self` weakly: the subscription is stored in
  /// `cancellables`, which `self` owns, so a strong capture would create a retain cycle.
  private func addSubscribers() {
    // dataService.currentValuePublisher
    dataService.passThroughPublisher
      // MARK: - Add publisher pipeline
      // Multiple Publishers / Subscribers
      .combineLatest(dataService.boolPublisher, dataService.intPublisher)
      // The closure never yields nil, so a plain map is enough here.
      .map { (int1, bool, _) -> String in
        bool ? String(int1) : "n/a"
      }
      // combineLatest re-emits whenever any input publishes, so the same string can
      // arrive several times in a row; drop the consecutive repeats.
      .removeDuplicates()
      // .map({ String($0) })
      .sink { [weak self] completion in
        switch completion {
        case .finished:
          break
        case .failure(let error):
          self?.error = "ERROR: \(error)"
        }
      } receiveValue: { [weak self] returnedValue in
        self?.data.append(returnedValue)
      }
      .store(in: &cancellables)
  }
}

image

  • merge() : We can merge one publisher with another so that both feed the same pipeline. The requirement is that both publishers emit the same type of value. This is great if the same type of data comes from different sources (different classes, for example) and you want to merge it all together into one stream and then put that onto the screen
/// Merges the service's pass-through and int publishers into one stream and
/// appends every value to `data` as a string.
///
/// Both sink closures capture `self` weakly: the subscription is stored in
/// `cancellables`, which `self` owns, so a strong capture would create a retain cycle.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // MARK: - Add publisher pipeline
    // Multiple Publishers / Subscribers
    // Both publishers emit Int, which is what allows them to be merged.
    .merge(with: dataService.intPublisher)
    .map { String($0) }
    .sink { [weak self] completion in
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
}

image

  • zip() : Combines elements from two other publishers and delivers groups of elements as tuples. Each tuple takes one element from every publisher, so you basically have a one-to-one ratio between the publishers
/// Zips the service's pass-through, bool and int publishers and appends the
/// concatenated text of each resulting tuple to `data`.
///
/// Both sink closures capture `self` weakly: the subscription is stored in
/// `cancellables`, which `self` owns, so a strong capture would create a retain cycle.
private func addSubscribers() {
  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // MARK: - Add publisher pipeline
    // Multiple Publishers / Subscribers
    // A tuple is delivered only once every publisher has contributed one new element.
    .zip(dataService.boolPublisher, dataService.intPublisher)
    .map { (number, flag, extra) -> String in
      String(number) + flag.description + String(extra)
    }
    // .map({ String($0) })
    .sink { [weak self] completion in
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}

image

  • catch() : Handles errors from an upstream publisher by replacing it with another publisher. If we get an error from our first publisher, we can catch it and then use the replacement publisher for the rest of our data flow
/// Subscribes to the service's pass-through publisher, deliberately fails when the
/// value 3 arrives, and from then on continues with the int publisher instead.
/// Every value that gets through is appended to `data` as a string.
///
/// No closure in the pipeline captures `self` strongly: the subscription is stored
/// in `cancellables`, which `self` owns, so a strong capture would create a retain cycle.
private func addSubscribers() {
  // Resolved up front so the catch closure does not need to capture `self`.
  let fallbackPublisher = dataService.intPublisher

  // dataService.currentValuePublisher
  dataService.passThroughPublisher
    // MARK: - Add publisher pipeline
    // Multiple Publishers / Subscribers
    .tryMap { int -> Int in
      // Simulated failure that triggers the catch below.
      if int == 3 {
        throw URLError(.badServerResponse)
      }
      return int
    }
    .catch { _ in fallbackPublisher }
    .map { String($0) }
    .sink { [weak self] completion in
      switch completion {
      case .finished:
        break
      case .failure(let error):
        self?.error = "ERROR: \(error)"
      }
    } receiveValue: { [weak self] returnedValue in
      self?.data.append(returnedValue)
    }
    .store(in: &cancellables)
}
}

image

  • share()
// First subscription: every value, converted to text, is appended to `data`.
// `self` is captured weakly in both closures because the subscription is stored
// in `cancellables`, which `self` owns.
dataService.passThroughPublisher
  .map { String($0) }
  .sink { [weak self] completion in
    switch completion {
    case .finished:
      break
    case .failure(let error):
      self?.error = "ERROR: \(error)"
    }
  } receiveValue: { [weak self] returnedValue in
    self?.data.append(returnedValue)
  }
  .store(in: &cancellables)

// Second, independent subscription to the same publisher: records whether each
// value is greater than 5 in `dataBools`.
dataService.passThroughPublisher
  .map { $0 > 5 }
  .sink { [weak self] completion in
    switch completion {
    case .finished:
      break
    case .failure(let error):
      self?.error = "ERROR: \(error)"
    }
  } receiveValue: { [weak self] returnedValue in
    self?.dataBools.append(returnedValue)
  }
  .store(in: &cancellables)

image

This might not be very efficient or elegant, because with many subscribers the same pipeline code is repeated again and again. What we want instead is to listen and subscribe to this publisher only once, and then share the published values with two different locations


// One upstream pipeline (skipping the first three values) shared by every
// subscriber below instead of being rebuilt per subscriber.
let sharePublisher = dataService.passThroughPublisher
  .dropFirst(3)
  .share()

// Subscriber 1: values as text into `data`.
// `self` is captured weakly in all sink closures because the subscriptions are
// stored in `cancellables`, which `self` owns.
sharePublisher
  .map { String($0) }
  .sink { [weak self] completion in
    switch completion {
    case .finished:
      break
    case .failure(let error):
      self?.error = "ERROR: \(error)"
    }
  } receiveValue: { [weak self] returnedValue in
    self?.data.append(returnedValue)
  }
  .store(in: &cancellables)

// Subscriber 2: whether each value is greater than 5 into `dataBools`.
sharePublisher
  .map { $0 > 5 }
  .sink { [weak self] completion in
    switch completion {
    case .finished:
      break
    case .failure(let error):
      self?.error = "ERROR: \(error)"
    }
  } receiveValue: { [weak self] returnedValue in
    self?.dataBools.append(returnedValue)
  }
  .store(in: &cancellables)

image

  • multicast() : Provides a subject to deliver elements to multiple subscribers. We can basically route the published values through another publisher (the subject) and then determine programmatically when we actually want to connect to the upstream publisher
// multicast produces a connectable publisher: the subscribers below receive
// nothing until connect() is called on it.
let sharePublisher = dataService.passThroughPublisher
  .dropFirst(3)
  .share()
  .multicast {
    PassthroughSubject<Int, Error>()
  }

// Subscriber 1: values as text into `data`.
// `self` is captured weakly in all closures because the subscriptions are stored
// in `cancellables`, which `self` owns.
sharePublisher
  .map { String($0) }
  .sink { [weak self] completion in
    switch completion {
    case .finished:
      break
    case .failure(let error):
      self?.error = "ERROR: \(error)"
    }
  } receiveValue: { [weak self] returnedValue in
    self?.data.append(returnedValue)
  }
  .store(in: &cancellables)

// Subscriber 2: whether each value is greater than 5 into `dataBools`.
sharePublisher
  .map { $0 > 5 }
  .sink { [weak self] completion in
    switch completion {
    case .finished:
      break
    case .failure(let error):
      self?.error = "ERROR: \(error)"
    }
  } receiveValue: { [weak self] returnedValue in
    self?.dataBools.append(returnedValue)
  }
  .store(in: &cancellables)

// Connect three seconds later; only from that moment do values start to flow.
// If the owner is already gone by then there is nothing left to connect for.
DispatchQueue.main.asyncAfter(deadline: .now() + 3) { [weak self] in
  guard let self = self else { return }
  sharePublisher
    .connect()
    .store(in: &self.cancellables)
}
}

image

Custom publishers and Subscriber

We’re going to create a ViewModel, put our own custom timer in that ViewModel, and of course have the view update with that timer

Custom publishers and subscribers are going to be very useful when you build applications. Through this chapter you can see why we use publishers and subscribers in Combine: by using them efficiently we can keep all of the data in our App in sync.


// counter from 1 to 10 then stop counter
import SwiftUI
import Combine

// MARK: -  VIEWMODEL
/// View model that counts up once per second and stops its timer at 10.
class SubscriberViewModel: ObservableObject {
  // MARK: -  PROPERTY
  @Published var count: Int = 0
  // Stored subscriptions; each one can be cancelled at any time.
  var cancellables = Set<AnyCancellable>()

  // MARK: -  INIT
  init() {
    setUpTimer()
  }

  // MARK: -  FUNCTION
  /// Starts a one-second main run loop timer and increments `count` on every tick.
  func setUpTimer() {
    let ticks = Timer
      .publish(every: 1, on: .main, in: .common)
      .autoconnect()

    ticks
      // React to every tick; the published value drives what is shown on screen.
      .sink { [weak self] _ in
        guard let self = self else { return }
        self.count += 1

        // Keep ticking until the count reaches 10 ...
        guard self.count >= 10 else { return }
        // ... then cancel every stored subscription, which stops the timer.
        self.cancellables.forEach { $0.cancel() }
      }
      .store(in: &cancellables)
  }
}

/// Screen that displays the view model's running count.
struct SubscriberBootCamp: View {
  // MARK: -  PROPERTY
  @StateObject var vm = SubscriberViewModel()

  // MARK: -  BODY
  var body: some View {
    VStack {
      counterLabel
    } //: VSTACK
  }

  /// Large-title text showing the current count.
  private var counterLabel: some View {
    Text("\(vm.count)")
      .font(.largeTitle)
  }
}

스크린샷

import SwiftUI
import Combine

// MARK: -  VIEWMODEL
/// View model that counts seconds, validates the text field input, and decides
/// when the submit button may be used.
class SubscriberViewModel: ObservableObject {
  // MARK: -  PROPERTY
  @Published var count: Int = 0
  // Stored subscriptions; each one can be cancelled at any time.
  var cancellables = Set<AnyCancellable>()

  @Published var textFieldText: String = ""
  @Published var textIsValid: Bool = false

  @Published var showButton: Bool = false

  // MARK: -  INIT
  init() {
    setUpTimer()
    addTextFieldSubscriber()
    addButtonSubscriber()
  }

  // MARK: -  FUNCTION
  /// Starts a one-second main run loop timer and increments `count` on every tick.
  func setUpTimer() {
    let ticks = Timer
      .publish(every: 1, on: .main, in: .common)
      .autoconnect()

    ticks
      // React to every tick; the published value drives what is shown on screen.
      .sink { [weak self] _ in
        guard let self = self else { return }
        self.count += 1
      }
      .store(in: &cancellables)
  }

  /// Keeps `textIsValid` in sync with the text: valid means more than 3 characters.
  func addTextFieldSubscriber() {
    $textFieldText
      .map { $0.count > 3 }
      // .assign(to: \.textIsValid, on: self)
      .sink { [weak self] isValid in
        self?.textIsValid = isValid
      }
      .store(in: &cancellables)
  }

  /// Shows the button only while the text is valid and the count has reached 10.
  func addButtonSubscriber() {
    $textIsValid
      .combineLatest($count)
      .sink { [weak self] isValid, count in
        guard let self = self else { return }
        self.showButton = isValid && count >= 10
      }
      .store(in: &cancellables)
  }
}

/// Screen with the running count, a validated text field, and a submit button
/// that is enabled only while the view model allows it.
struct SubscriberBootCamp: View {
  // MARK: -  PROPERTY
  @StateObject var vm = SubscriberViewModel()

  // MARK: -  BODY
  var body: some View {
    VStack {
      Text("\(vm.count)")
        .font(.largeTitle)

      Text(vm.textIsValid.description)

      inputField

      submitButton
    } //: VSTACK
    .padding()
  }

  // MARK: -  SUBVIEWS
  /// Text field with the validation icons overlaid on its trailing edge.
  private var inputField: some View {
    TextField("Type somthing here...", text: $vm.textFieldText)
      .padding(.leading)
      .frame(height: 55)
      .font(.headline)
      .background(Color.gray.opacity(0.2).cornerRadius(10))
      .overlay(validationIcons, alignment: .trailing)
  }

  /// Red cross while non-empty text is invalid, green checkmark once it is valid.
  private var validationIcons: some View {
    let showsCross = !vm.textFieldText.isEmpty && !vm.textIsValid

    return ZStack {
      Image(systemName: "xmark")
        .foregroundColor(.red)
        .opacity(showsCross ? 1.0 : 0.0)

      Image(systemName: "checkmark")
        .foregroundColor(.green)
        .opacity(vm.textIsValid ? 1.0 : 0.0)
    } //: ZSTACK
    .font(.title)
    .padding(.trailing)
  }

  /// Submit button; dimmed and disabled until `showButton` is true.
  private var submitButton: some View {
    Button {

    } label: {
      Text("submit".uppercased())
        .font(.headline)
        .foregroundColor(.white)
        .frame(height: 55)
        .frame(maxWidth: .infinity)
        .background(Color.blue.cornerRadius(10))
        .opacity(vm.showButton ? 1.0 : 0.5)
    }
    .disabled(!vm.showButton)
  }
}

스크린샷


🗃 Reference


🗃 Reference

SwiftUI Thinking - https://www.youtube.com/watch?v=RUZcs0SWqnI&list=PLwvDm4Vfkdphc1LLLjCaEd87BEg07M97y&index=20

Categories:

Updated:

Leave a comment